eventsource-stream.js 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399
  1. 'use strict'
  2. const { Transform } = require('node:stream')
  3. const { isASCIINumber, isValidLastEventId } = require('./util')
  4. /**
  5. * @type {number[]} BOM
  6. */
  7. const BOM = [0xEF, 0xBB, 0xBF]
  8. /**
  9. * @type {10} LF
  10. */
  11. const LF = 0x0A
  12. /**
  13. * @type {13} CR
  14. */
  15. const CR = 0x0D
  16. /**
  17. * @type {58} COLON
  18. */
  19. const COLON = 0x3A
  20. /**
  21. * @type {32} SPACE
  22. */
  23. const SPACE = 0x20
  24. /**
  25. * @typedef {object} EventSourceStreamEvent
  26. * @type {object}
  27. * @property {string} [event] The event type.
  28. * @property {string} [data] The data of the message.
  29. * @property {string} [id] A unique ID for the event.
  30. * @property {string} [retry] The reconnection time, in milliseconds.
  31. */
  32. /**
  33. * @typedef eventSourceSettings
  34. * @type {object}
  35. * @property {string} [lastEventId] The last event ID received from the server.
  36. * @property {string} [origin] The origin of the event source.
  37. * @property {number} [reconnectionTime] The reconnection time, in milliseconds.
  38. */
  39. class EventSourceStream extends Transform {
  40. /**
  41. * @type {eventSourceSettings}
  42. */
  43. state
  44. /**
  45. * Leading byte-order-mark check.
  46. * @type {boolean}
  47. */
  48. checkBOM = true
  49. /**
  50. * @type {boolean}
  51. */
  52. crlfCheck = false
  53. /**
  54. * @type {boolean}
  55. */
  56. eventEndCheck = false
  57. /**
  58. * @type {Buffer|null}
  59. */
  60. buffer = null
  61. pos = 0
  62. event = {
  63. data: undefined,
  64. event: undefined,
  65. id: undefined,
  66. retry: undefined
  67. }
  68. /**
  69. * @param {object} options
  70. * @param {boolean} [options.readableObjectMode]
  71. * @param {eventSourceSettings} [options.eventSourceSettings]
  72. * @param {(chunk: any, encoding?: BufferEncoding | undefined) => boolean} [options.push]
  73. */
  74. constructor (options = {}) {
  75. // Enable object mode as EventSourceStream emits objects of shape
  76. // EventSourceStreamEvent
  77. options.readableObjectMode = true
  78. super(options)
  79. this.state = options.eventSourceSettings || {}
  80. if (options.push) {
  81. this.push = options.push
  82. }
  83. }
  84. /**
  85. * @param {Buffer} chunk
  86. * @param {string} _encoding
  87. * @param {Function} callback
  88. * @returns {void}
  89. */
  90. _transform (chunk, _encoding, callback) {
  91. if (chunk.length === 0) {
  92. callback()
  93. return
  94. }
  95. // Cache the chunk in the buffer, as the data might not be complete while
  96. // processing it
  97. // TODO: Investigate if there is a more performant way to handle
  98. // incoming chunks
  99. // see: https://github.com/nodejs/undici/issues/2630
  100. if (this.buffer) {
  101. this.buffer = Buffer.concat([this.buffer, chunk])
  102. } else {
  103. this.buffer = chunk
  104. }
  105. // Strip leading byte-order-mark if we opened the stream and started
  106. // the processing of the incoming data
  107. if (this.checkBOM) {
  108. switch (this.buffer.length) {
  109. case 1:
  110. // Check if the first byte is the same as the first byte of the BOM
  111. if (this.buffer[0] === BOM[0]) {
  112. // If it is, we need to wait for more data
  113. callback()
  114. return
  115. }
  116. // Set the checkBOM flag to false as we don't need to check for the
  117. // BOM anymore
  118. this.checkBOM = false
  119. // The buffer only contains one byte so we need to wait for more data
  120. callback()
  121. return
  122. case 2:
  123. // Check if the first two bytes are the same as the first two bytes
  124. // of the BOM
  125. if (
  126. this.buffer[0] === BOM[0] &&
  127. this.buffer[1] === BOM[1]
  128. ) {
  129. // If it is, we need to wait for more data, because the third byte
  130. // is needed to determine if it is the BOM or not
  131. callback()
  132. return
  133. }
  134. // Set the checkBOM flag to false as we don't need to check for the
  135. // BOM anymore
  136. this.checkBOM = false
  137. break
  138. case 3:
  139. // Check if the first three bytes are the same as the first three
  140. // bytes of the BOM
  141. if (
  142. this.buffer[0] === BOM[0] &&
  143. this.buffer[1] === BOM[1] &&
  144. this.buffer[2] === BOM[2]
  145. ) {
  146. // If it is, we can drop the buffered data, as it is only the BOM
  147. this.buffer = Buffer.alloc(0)
  148. // Set the checkBOM flag to false as we don't need to check for the
  149. // BOM anymore
  150. this.checkBOM = false
  151. // Await more data
  152. callback()
  153. return
  154. }
  155. // If it is not the BOM, we can start processing the data
  156. this.checkBOM = false
  157. break
  158. default:
  159. // The buffer is longer than 3 bytes, so we can drop the BOM if it is
  160. // present
  161. if (
  162. this.buffer[0] === BOM[0] &&
  163. this.buffer[1] === BOM[1] &&
  164. this.buffer[2] === BOM[2]
  165. ) {
  166. // Remove the BOM from the buffer
  167. this.buffer = this.buffer.subarray(3)
  168. }
  169. // Set the checkBOM flag to false as we don't need to check for the
  170. this.checkBOM = false
  171. break
  172. }
  173. }
  174. while (this.pos < this.buffer.length) {
  175. // If the previous line ended with an end-of-line, we need to check
  176. // if the next character is also an end-of-line.
  177. if (this.eventEndCheck) {
  178. // If the the current character is an end-of-line, then the event
  179. // is finished and we can process it
  180. // If the previous line ended with a carriage return, we need to
  181. // check if the current character is a line feed and remove it
  182. // from the buffer.
  183. if (this.crlfCheck) {
  184. // If the current character is a line feed, we can remove it
  185. // from the buffer and reset the crlfCheck flag
  186. if (this.buffer[this.pos] === LF) {
  187. this.buffer = this.buffer.subarray(this.pos + 1)
  188. this.pos = 0
  189. this.crlfCheck = false
  190. // It is possible that the line feed is not the end of the
  191. // event. We need to check if the next character is an
  192. // end-of-line character to determine if the event is
  193. // finished. We simply continue the loop to check the next
  194. // character.
  195. // As we removed the line feed from the buffer and set the
  196. // crlfCheck flag to false, we basically don't make any
  197. // distinction between a line feed and a carriage return.
  198. continue
  199. }
  200. this.crlfCheck = false
  201. }
  202. if (this.buffer[this.pos] === LF || this.buffer[this.pos] === CR) {
  203. // If the current character is a carriage return, we need to
  204. // set the crlfCheck flag to true, as we need to check if the
  205. // next character is a line feed so we can remove it from the
  206. // buffer
  207. if (this.buffer[this.pos] === CR) {
  208. this.crlfCheck = true
  209. }
  210. this.buffer = this.buffer.subarray(this.pos + 1)
  211. this.pos = 0
  212. if (
  213. this.event.data !== undefined || this.event.event || this.event.id !== undefined || this.event.retry) {
  214. this.processEvent(this.event)
  215. }
  216. this.clearEvent()
  217. continue
  218. }
  219. // If the current character is not an end-of-line, then the event
  220. // is not finished and we have to reset the eventEndCheck flag
  221. this.eventEndCheck = false
  222. continue
  223. }
  224. // If the current character is an end-of-line, we can process the
  225. // line
  226. if (this.buffer[this.pos] === LF || this.buffer[this.pos] === CR) {
  227. // If the current character is a carriage return, we need to
  228. // set the crlfCheck flag to true, as we need to check if the
  229. // next character is a line feed
  230. if (this.buffer[this.pos] === CR) {
  231. this.crlfCheck = true
  232. }
  233. // In any case, we can process the line as we reached an
  234. // end-of-line character
  235. this.parseLine(this.buffer.subarray(0, this.pos), this.event)
  236. // Remove the processed line from the buffer
  237. this.buffer = this.buffer.subarray(this.pos + 1)
  238. // Reset the position as we removed the processed line from the buffer
  239. this.pos = 0
  240. // A line was processed and this could be the end of the event. We need
  241. // to check if the next line is empty to determine if the event is
  242. // finished.
  243. this.eventEndCheck = true
  244. continue
  245. }
  246. this.pos++
  247. }
  248. callback()
  249. }
  250. /**
  251. * @param {Buffer} line
  252. * @param {EventSourceStreamEvent} event
  253. */
  254. parseLine (line, event) {
  255. // If the line is empty (a blank line)
  256. // Dispatch the event, as defined below.
  257. // This will be handled in the _transform method
  258. if (line.length === 0) {
  259. return
  260. }
  261. // If the line starts with a U+003A COLON character (:)
  262. // Ignore the line.
  263. const colonPosition = line.indexOf(COLON)
  264. if (colonPosition === 0) {
  265. return
  266. }
  267. let field = ''
  268. let value = ''
  269. // If the line contains a U+003A COLON character (:)
  270. if (colonPosition !== -1) {
  271. // Collect the characters on the line before the first U+003A COLON
  272. // character (:), and let field be that string.
  273. // TODO: Investigate if there is a more performant way to extract the
  274. // field
  275. // see: https://github.com/nodejs/undici/issues/2630
  276. field = line.subarray(0, colonPosition).toString('utf8')
  277. // Collect the characters on the line after the first U+003A COLON
  278. // character (:), and let value be that string.
  279. // If value starts with a U+0020 SPACE character, remove it from value.
  280. let valueStart = colonPosition + 1
  281. if (line[valueStart] === SPACE) {
  282. ++valueStart
  283. }
  284. // TODO: Investigate if there is a more performant way to extract the
  285. // value
  286. // see: https://github.com/nodejs/undici/issues/2630
  287. value = line.subarray(valueStart).toString('utf8')
  288. // Otherwise, the string is not empty but does not contain a U+003A COLON
  289. // character (:)
  290. } else {
  291. // Process the field using the steps described below, using the whole
  292. // line as the field name, and the empty string as the field value.
  293. field = line.toString('utf8')
  294. value = ''
  295. }
  296. // Modify the event with the field name and value. The value is also
  297. // decoded as UTF-8
  298. switch (field) {
  299. case 'data':
  300. if (event[field] === undefined) {
  301. event[field] = value
  302. } else {
  303. event[field] += `\n${value}`
  304. }
  305. break
  306. case 'retry':
  307. if (isASCIINumber(value)) {
  308. event[field] = value
  309. }
  310. break
  311. case 'id':
  312. if (isValidLastEventId(value)) {
  313. event[field] = value
  314. }
  315. break
  316. case 'event':
  317. if (value.length > 0) {
  318. event[field] = value
  319. }
  320. break
  321. }
  322. }
  323. /**
  324. * @param {EventSourceStreamEvent} event
  325. */
  326. processEvent (event) {
  327. if (event.retry && isASCIINumber(event.retry)) {
  328. this.state.reconnectionTime = parseInt(event.retry, 10)
  329. }
  330. if (event.id !== undefined && isValidLastEventId(event.id)) {
  331. this.state.lastEventId = event.id
  332. }
  333. // only dispatch event, when data is provided
  334. if (event.data !== undefined) {
  335. this.push({
  336. type: event.event || 'message',
  337. options: {
  338. data: event.data,
  339. lastEventId: this.state.lastEventId,
  340. origin: this.state.origin
  341. }
  342. })
  343. }
  344. }
  345. clearEvent () {
  346. this.event = {
  347. data: undefined,
  348. event: undefined,
  349. id: undefined,
  350. retry: undefined
  351. }
  352. }
  353. }
  354. module.exports = {
  355. EventSourceStream
  356. }