receiver.js 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444
  1. 'use strict'
  2. const { Writable } = require('node:stream')
  3. const assert = require('node:assert')
  4. const { parserStates, opcodes, states, emptyBuffer, sentCloseFrameState } = require('./constants')
  5. const {
  6. isValidStatusCode,
  7. isValidOpcode,
  8. websocketMessageReceived,
  9. utf8Decode,
  10. isControlFrame,
  11. isTextBinaryFrame,
  12. isContinuationFrame
  13. } = require('./util')
  14. const { failWebsocketConnection } = require('./connection')
  15. const { WebsocketFrameSend } = require('./frame')
  16. const { PerMessageDeflate } = require('./permessage-deflate')
  17. // This code was influenced by ws released under the MIT license.
  18. // Copyright (c) 2011 Einar Otto Stangvik <einaros@gmail.com>
  19. // Copyright (c) 2013 Arnout Kazemier and contributors
  20. // Copyright (c) 2016 Luigi Pinca and contributors
  /**
   * Incremental parser for incoming WebSocket frames.
   *
   * Raw socket chunks are written into this Writable. Each write buffers the
   * chunk and drives a small state machine (see `run`) that decodes frame
   * headers, extended payload lengths and payloads, replies to control frames
   * and hands completed messages to `websocketMessageReceived`.
   */
  class ByteParser extends Writable {
    /** @type {Buffer[]} Chunks received but not yet consumed, oldest first. */
    #buffers = []
    /** Sum of the byte lengths of the payloads currently held in #fragments. */
    #fragmentsBytes = 0
    /** Number of unconsumed bytes currently held across #buffers. */
    #byteOffset = 0
    /** While true, run() keeps iterating; cleared to pause or stop parsing. */
    #loop = false
    /** Which part of the current frame run() expects to read next. */
    #state = parserStates.INFO
    /** Header fields of the frame being parsed, plus per-message fields. */
    #info = {}
    /** @type {Buffer[]} Payloads of the data frames of the current message. */
    #fragments = []

    /** @type {Map<string, PerMessageDeflate>} */
    #extensions

    /** @type {import('./websocket').Handler} */
    #handler

    /**
     * @param {import('./websocket').Handler} handler
     * @param {Map<string, any>} [extensions] negotiated extensions; when
     *   omitted an empty Map is used.
     */
    constructor (handler, extensions) {
      super()

      this.#handler = handler
      this.#extensions = extensions == null ? new Map() : extensions

      // Swap the negotiated entry for a live PerMessageDeflate instance.
      if (this.#extensions.has('permessage-deflate')) {
        this.#extensions.set('permessage-deflate', new PerMessageDeflate(extensions))
      }
    }

    /**
     * Buffers the chunk and resumes parsing.
     * @param {Buffer} chunk
     * @param {() => void} callback
     */
    _write (chunk, _, callback) {
      this.#buffers.push(chunk)
      this.#byteOffset += chunk.length
      this.#loop = true

      this.run(callback)
    }

    /**
     * Runs whenever a new chunk is received.
     * Callback is called whenever there are no more chunks buffering,
     * or not enough bytes are buffered to parse.
     *
     * Note: most paths that fail the connection, and the path taken after a
     * Close frame has been handled, return without invoking the callback.
     *
     * @param {() => void} callback
     */
    run (callback) {
      while (this.#loop) {
        if (this.#state === parserStates.INFO) {
          // If there aren't enough bytes to parse the payload length, etc.
          if (this.#byteOffset < 2) {
            return callback()
          }

          const buffer = this.consume(2)
          const fin = (buffer[0] & 0x80) !== 0
          const opcode = buffer[0] & 0x0F
          const masked = (buffer[1] & 0x80) === 0x80

          // A frame that starts a fragmented message: FIN clear on a
          // non-continuation opcode.
          const fragmented = !fin && opcode !== opcodes.CONTINUATION
          const payloadLength = buffer[1] & 0x7F

          const rsv1 = buffer[0] & 0x40
          const rsv2 = buffer[0] & 0x20
          const rsv3 = buffer[0] & 0x10

          if (!isValidOpcode(opcode)) {
            failWebsocketConnection(this.#handler, 1002, 'Invalid opcode received')
            return callback()
          }

          if (masked) {
            failWebsocketConnection(this.#handler, 1002, 'Frame cannot be masked')
            return callback()
          }

          // MUST be 0 unless an extension is negotiated that defines meanings
          // for non-zero values. If a nonzero value is received and none of
          // the negotiated extensions defines the meaning of such a nonzero
          // value, the receiving endpoint MUST _Fail the WebSocket
          // Connection_.
          // This document allocates the RSV1 bit of the WebSocket header for
          // PMCEs and calls the bit the "Per-Message Compressed" bit. On a
          // WebSocket connection where a PMCE is in use, this bit indicates
          // whether a message is compressed or not.
          if (rsv1 !== 0 && !this.#extensions.has('permessage-deflate')) {
            failWebsocketConnection(this.#handler, 1002, 'Expected RSV1 to be clear.')
            return
          }

          if (rsv2 !== 0 || rsv3 !== 0) {
            failWebsocketConnection(this.#handler, 1002, 'RSV1, RSV2, RSV3 must be clear')
            return
          }

          if (fragmented && !isTextBinaryFrame(opcode)) {
            // Only text and binary frames can be fragmented
            failWebsocketConnection(this.#handler, 1002, 'Invalid frame type was fragmented.')
            return
          }

          // If we are already parsing a text/binary frame and do not receive either
          // a continuation frame or close frame, fail the connection.
          if (isTextBinaryFrame(opcode) && this.#fragments.length > 0) {
            failWebsocketConnection(this.#handler, 1002, 'Expected continuation frame')
            return
          }

          if (this.#info.fragmented && fragmented) {
            // A fragmented frame can't be fragmented itself
            failWebsocketConnection(this.#handler, 1002, 'Fragmented frame exceeded 125 bytes.')
            return
          }

          // "All control frames MUST have a payload length of 125 bytes or less
          // and MUST NOT be fragmented."
          if ((payloadLength > 125 || fragmented) && isControlFrame(opcode)) {
            failWebsocketConnection(this.#handler, 1002, 'Control frame either too large or fragmented')
            return
          }

          if (isContinuationFrame(opcode) && this.#fragments.length === 0 && !this.#info.compressed) {
            failWebsocketConnection(this.#handler, 1002, 'Unexpected continuation frame')
            return
          }

          // 0-125 is the literal length; 126 and 127 announce a 16-bit or
          // 64-bit extended length that follows the first two header bytes.
          if (payloadLength <= 125) {
            this.#info.payloadLength = payloadLength
            this.#state = parserStates.READ_DATA
          } else if (payloadLength === 126) {
            this.#state = parserStates.PAYLOADLENGTH_16
          } else if (payloadLength === 127) {
            this.#state = parserStates.PAYLOADLENGTH_64
          }

          // Message type and compression are only recorded from the first
          // data frame, so they survive continuation and control frames.
          if (isTextBinaryFrame(opcode)) {
            this.#info.binaryType = opcode
            this.#info.compressed = rsv1 !== 0
          }

          this.#info.opcode = opcode
          this.#info.masked = masked
          this.#info.fin = fin
          this.#info.fragmented = fragmented
        } else if (this.#state === parserStates.PAYLOADLENGTH_16) {
          if (this.#byteOffset < 2) {
            return callback()
          }

          const buffer = this.consume(2)

          this.#info.payloadLength = buffer.readUInt16BE(0)
          this.#state = parserStates.READ_DATA
        } else if (this.#state === parserStates.PAYLOADLENGTH_64) {
          if (this.#byteOffset < 8) {
            return callback()
          }

          const buffer = this.consume(8)
          const upper = buffer.readUInt32BE(0)
          const lower = buffer.readUInt32BE(4)

          // 2^31 is the maximum bytes an arraybuffer can contain
          // on 32-bit systems. Although, on 64-bit systems, this is
          // 2^53-1 bytes.
          // https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Errors/Invalid_array_length
          // https://source.chromium.org/chromium/chromium/src/+/main:v8/src/common/globals.h;drc=1946212ac0100668f14eb9e2843bdd846e510a1e;bpv=1;bpt=1;l=1275
          // https://source.chromium.org/chromium/chromium/src/+/main:v8/src/objects/js-array-buffer.h;l=34;drc=1946212ac0100668f14eb9e2843bdd846e510a1e
          //
          // `upper` holds the high 32 bits of the length, so any non-zero
          // value already means at least 2^32 bytes. Reject those, and any
          // low word above 2^31 - 1, instead of folding `upper` into the
          // length with a shift (which would under-report the size and
          // desynchronise the parser from the peer's frame boundaries).
          if (upper !== 0 || lower > 2 ** 31 - 1) {
            failWebsocketConnection(this.#handler, 1009, 'Received payload length > 2^31 bytes.')
            return
          }

          this.#info.payloadLength = lower
          this.#state = parserStates.READ_DATA
        } else if (this.#state === parserStates.READ_DATA) {
          if (this.#byteOffset < this.#info.payloadLength) {
            return callback()
          }

          const body = this.consume(this.#info.payloadLength)

          if (isControlFrame(this.#info.opcode)) {
            // parseControlFrame returns false when parsing must stop
            // (a Close frame or a failed connection).
            this.#loop = this.parseControlFrame(body)
            this.#state = parserStates.INFO
          } else {
            if (!this.#info.compressed) {
              this.writeFragments(body)

              // If the frame is not fragmented, a message has been received.
              // If the frame is fragmented, it will terminate with a fin bit set
              // and an opcode of 0 (continuation), therefore we handle that when
              // parsing continuation frames, not here.
              if (!this.#info.fragmented && this.#info.fin) {
                websocketMessageReceived(this.#handler, this.#info.binaryType, this.consumeFragments())
              }

              this.#state = parserStates.INFO
            } else {
              this.#extensions.get('permessage-deflate').decompress(body, this.#info.fin, (error, data) => {
                if (error) {
                  failWebsocketConnection(this.#handler, 1007, error.message)
                  return
                }

                this.writeFragments(data)

                if (!this.#info.fin) {
                  this.#state = parserStates.INFO
                  this.#loop = true
                  this.run(callback)
                  return
                }

                websocketMessageReceived(this.#handler, this.#info.binaryType, this.consumeFragments())

                this.#loop = true
                this.#state = parserStates.INFO
                this.run(callback)
              })

              // Pause here; the decompress callback above restarts the loop.
              this.#loop = false
              break
            }
          }
        }
      }
    }

    /**
     * Take n bytes from the buffered Buffers
     * @param {number} n
     * @returns {Buffer}
     * @throws {Error} if fewer than n bytes are currently buffered
     */
    consume (n) {
      if (n > this.#byteOffset) {
        throw new Error('Called consume() before buffers satiated.')
      } else if (n === 0) {
        return emptyBuffer
      }

      this.#byteOffset -= n

      const first = this.#buffers[0]

      if (first.length > n) {
        // replace with remaining buffer
        this.#buffers[0] = first.subarray(n, first.length)
        return first.subarray(0, n)
      } else if (first.length === n) {
        // perfect match
        return this.#buffers.shift()
      } else {
        // The request spans several chunks: copy them into one buffer.
        let offset = 0
        // If Buffer.allocUnsafe is used, extra copies will be made because the offset is non-zero.
        const buffer = Buffer.allocUnsafeSlow(n)

        while (offset !== n) {
          const next = this.#buffers[0]
          const length = next.length

          if (length + offset === n) {
            buffer.set(this.#buffers.shift(), offset)
            break
          } else if (length + offset > n) {
            // Take only what is needed and keep the remainder queued.
            buffer.set(next.subarray(0, n - offset), offset)
            this.#buffers[0] = next.subarray(n - offset)
            break
          } else {
            buffer.set(this.#buffers.shift(), offset)
            offset += length
          }
        }

        return buffer
      }
    }

    /**
     * Appends one frame payload to the message being assembled.
     * @param {Buffer} fragment
     */
    writeFragments (fragment) {
      this.#fragmentsBytes += fragment.length
      this.#fragments.push(fragment)
    }

    /**
     * Returns the assembled message payload and resets the fragment state.
     * @returns {Buffer}
     */
    consumeFragments () {
      const fragments = this.#fragments

      if (fragments.length === 1) {
        // single fragment
        this.#fragmentsBytes = 0
        return fragments.shift()
      }

      let offset = 0
      // If Buffer.allocUnsafe is used, extra copies will be made because the offset is non-zero.
      const output = Buffer.allocUnsafeSlow(this.#fragmentsBytes)

      for (let i = 0; i < fragments.length; ++i) {
        const buffer = fragments[i]
        output.set(buffer, offset)
        offset += buffer.length
      }

      this.#fragments = []
      this.#fragmentsBytes = 0

      return output
    }

    /**
     * Decodes the body of a Close frame.
     * @param {Buffer} data Close frame payload; must not be exactly 1 byte long.
     * @returns {{ code: number|undefined, reason: string, error: boolean }}
     *   When `error` is true, `code` and `reason` describe why the body was
     *   rejected rather than what the peer sent.
     */
    parseCloseBody (data) {
      assert(data.length !== 1)

      // https://datatracker.ietf.org/doc/html/rfc6455#section-7.1.5
      /** @type {number|undefined} */
      let code

      if (data.length >= 2) {
        // _The WebSocket Connection Close Code_ is
        // defined as the status code (Section 7.4) contained in the first Close
        // control frame received by the application
        code = data.readUInt16BE(0)
      }

      if (code !== undefined && !isValidStatusCode(code)) {
        return { code: 1002, reason: 'Invalid status code', error: true }
      }

      // https://datatracker.ietf.org/doc/html/rfc6455#section-7.1.6
      /** @type {Buffer} */
      let reason = data.subarray(2)

      // Remove BOM
      if (reason[0] === 0xEF && reason[1] === 0xBB && reason[2] === 0xBF) {
        reason = reason.subarray(3)
      }

      try {
        reason = utf8Decode(reason)
      } catch {
        return { code: 1007, reason: 'Invalid UTF-8', error: true }
      }

      return { code, reason, error: false }
    }

    /**
     * Parses control frames.
     * @param {Buffer} body
     * @returns {boolean} false after a Close frame (valid or not), true after
     *   any other control frame; `run` stores this value in its loop flag.
     */
    parseControlFrame (body) {
      const { opcode, payloadLength } = this.#info

      if (opcode === opcodes.CLOSE) {
        if (payloadLength === 1) {
          failWebsocketConnection(this.#handler, 1002, 'Received close frame with a 1-byte body.')
          return false
        }

        this.#info.closeInfo = this.parseCloseBody(body)

        if (this.#info.closeInfo.error) {
          const { code, reason } = this.#info.closeInfo

          failWebsocketConnection(this.#handler, code, reason)
          return false
        }

        // Upon receiving such a frame, the other peer sends a
        // Close frame in response, if it hasn't already sent one.
        if (!this.#handler.closeState.has(sentCloseFrameState.SENT) && !this.#handler.closeState.has(sentCloseFrameState.RECEIVED)) {
          // If an endpoint receives a Close frame and did not previously send a
          // Close frame, the endpoint MUST send a Close frame in response. (When
          // sending a Close frame in response, the endpoint typically echos the
          // status code it received.)
          let closeBody = emptyBuffer
          if (this.#info.closeInfo.code) {
            closeBody = Buffer.allocUnsafe(2)
            closeBody.writeUInt16BE(this.#info.closeInfo.code, 0)
          }
          const closeFrame = new WebsocketFrameSend(closeBody)

          this.#handler.socket.write(closeFrame.createFrame(opcodes.CLOSE))
          this.#handler.closeState.add(sentCloseFrameState.SENT)
        }

        // Upon either sending or receiving a Close control frame, it is said
        // that _The WebSocket Closing Handshake is Started_ and that the
        // WebSocket connection is in the CLOSING state.
        this.#handler.readyState = states.CLOSING
        this.#handler.closeState.add(sentCloseFrameState.RECEIVED)

        return false
      } else if (opcode === opcodes.PING) {
        // Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
        // response, unless it already received a Close frame.
        // A Pong frame sent in response to a Ping frame must have identical
        // "Application data"

        if (!this.#handler.closeState.has(sentCloseFrameState.RECEIVED)) {
          const frame = new WebsocketFrameSend(body)

          this.#handler.socket.write(frame.createFrame(opcodes.PONG))

          this.#handler.onPing(body)
        }
      } else if (opcode === opcodes.PONG) {
        // A Pong frame MAY be sent unsolicited. This serves as a
        // unidirectional heartbeat. A response to an unsolicited Pong frame is
        // not expected.
        this.#handler.onPong(body)
      }

      return true
    }

    /**
     * Result of parsing the most recent Close frame, or undefined if no Close
     * frame body has been parsed yet.
     */
    get closingInfo () {
      return this.#info.closeInfo
    }
  }
  // Public surface of this module: the ByteParser class defined above.
  module.exports = { ByteParser }