// websocketstream.js
  1. 'use strict'
  2. const { createDeferredPromise } = require('../../../util/promise')
  3. const { environmentSettingsObject } = require('../../fetch/util')
  4. const { states, opcodes, sentCloseFrameState } = require('../constants')
  5. const { webidl } = require('../../webidl')
  6. const { getURLRecord, isValidSubprotocol, isEstablished, utf8Decode } = require('../util')
  7. const { establishWebSocketConnection, failWebsocketConnection, closeWebSocketConnection } = require('../connection')
  8. const { channels } = require('../../../core/diagnostics')
  9. const { WebsocketFrameSend } = require('../frame')
  10. const { ByteParser } = require('../receiver')
  11. const { WebSocketError, createUnvalidatedWebSocketError } = require('./websocketerror')
  12. const { kEnumerableProperty } = require('../../../core/util')
  13. const { utf8DecodeBytes } = require('../../../encoding')
  // Process-wide latch: only the first constructor call emits the experimental warning.
  let emittedExperimentalWarning = false

  /**
   * Promise- and streams-based WebSocket client.
   *
   * The constructor starts the opening handshake. `opened` settles once the
   * connection is established (yielding the readable/writable pair) and
   * `closed` settles once the underlying socket has closed. All protocol
   * feedback reaches the instance through the private `#handler` object that
   * is handed to `establishWebSocketConnection`.
   */
  class WebSocketStream {
    // Each WebSocketStream object has an associated url, which is a URL record.
    // It is stored here already serialized (see step 5 of the constructor).
    /** @type {string} */
    #url

    // Each WebSocketStream object has an associated opened promise, which is a promise.
    /** @type {import('../../../util/promise').DeferredPromise} */
    #openedPromise

    // Each WebSocketStream object has an associated closed promise, which is a promise.
    /** @type {import('../../../util/promise').DeferredPromise} */
    #closedPromise

    // Each WebSocketStream object has an associated readable stream, which is a ReadableStream.
    /** @type {ReadableStream} */
    #readableStream

    /** @type {ReadableStreamDefaultController} */
    #readableStreamController

    // Set once the consumer cancels the readable side. A cancelled stream is
    // already closed, and calling close()/enqueue() on its controller throws.
    #readableCancelled = false

    // Each WebSocketStream object has an associated writable stream, which is a WritableStream.
    /** @type {WritableStream} */
    #writableStream

    // Kept so the writable side can be errored directly, even while a writer
    // holds the lock (WritableStream.prototype.abort() rejects in that case).
    /** @type {WritableStreamDefaultController} */
    #writableStreamController

    // Each WebSocketStream object has an associated boolean handshake aborted, which is initially false.
    #handshakeAborted = false

    /** @type {import('../websocket').Handler} */
    #handler = {
      // https://whatpr.org/websockets/48/7b748d3...d5570f3.html#feedback-to-websocket-stream-from-the-protocol
      onConnectionEstablished: (response, extensions) => this.#onConnectionEstablished(response, extensions),
      onMessage: (opcode, data) => this.#onMessage(opcode, data),
      onParserError: (err) => failWebsocketConnection(this.#handler, null, err.message),
      onParserDrain: () => this.#handler.socket.resume(),
      onSocketData: (chunk) => {
        // Pause the socket while the parser reports that it cannot take more data.
        if (!this.#parser.write(chunk)) {
          this.#handler.socket.pause()
        }
      },
      onSocketError: (err) => {
        this.#handler.readyState = states.CLOSING

        if (channels.socketError.hasSubscribers) {
          channels.socketError.publish(err)
        }

        this.#handler.socket.destroy()
      },
      onSocketClose: () => this.#onSocketClose(),
      onPing: () => {},
      onPong: () => {},

      readyState: states.CONNECTING,
      socket: null,
      closeState: new Set(),
      controller: null,
      wasEverConnected: false
    }

    /** @type {import('../receiver').ByteParser} */
    #parser

    /**
     * @param {string} url URL to connect to; resolved against the API base URL.
     * @param {{ protocols?: string[], signal?: AbortSignal | null } | null} [options]
     *   Optional settings. `null` and `undefined` both mean "no options".
     * @throws {DOMException} "SyntaxError" when `protocols` contains duplicates
     *   (compared case-insensitively) or a value rejected by `isValidSubprotocol`.
     */
    constructor (url, options = undefined) {
      if (!emittedExperimentalWarning) {
        process.emitWarning('WebSocketStream is experimental! Expect it to change at any time.', {
          code: 'UNDICI-WSS'
        })
        emittedExperimentalWarning = true
      }

      webidl.argumentLengthCheck(arguments, 1, 'WebSocketStream')

      url = webidl.converters.USVString(url)
      // Always run the dictionary converter so defaults are filled in. null is
      // normalised to undefined because the code below dereferences `options`.
      options = webidl.converters.WebSocketStreamOptions(options ?? undefined)

      // 1. Let baseURL be this's relevant settings object's API base URL.
      const baseURL = environmentSettingsObject.settingsObject.baseUrl

      // 2. Let urlRecord be the result of getting a URL record given url and baseURL.
      const urlRecord = getURLRecord(url, baseURL)

      // 3. Let protocols be options["protocols"] if it exists, otherwise an empty sequence.
      const protocols = options.protocols

      // 4. If any of the values in protocols occur more than once or otherwise fail to match the
      //    requirements for elements that comprise the value of `Sec-WebSocket-Protocol` fields as
      //    defined by The WebSocket Protocol, then throw a "SyntaxError" DOMException. [WSP]
      if (protocols.length !== new Set(protocols.map(p => p.toLowerCase())).size) {
        throw new DOMException('Invalid Sec-WebSocket-Protocol value', 'SyntaxError')
      }

      if (protocols.length > 0 && !protocols.every(p => isValidSubprotocol(p))) {
        throw new DOMException('Invalid Sec-WebSocket-Protocol value', 'SyntaxError')
      }

      // 5. Set this's url to urlRecord.
      this.#url = urlRecord.toString()

      // 6. Set this's opened promise and closed promise to new promises.
      this.#openedPromise = createDeferredPromise()
      this.#closedPromise = createDeferredPromise()

      // 7. Apply backpressure to the WebSocket.
      // TODO

      // 8. If options["signal"] exists,
      if (options.signal != null) {
        // 8.1. Let signal be options["signal"].
        const signal = options.signal

        // 8.2. If signal is aborted, then reject this's opened promise and closed promise with
        //      signal's abort reason and return.
        if (signal.aborted) {
          this.#openedPromise.reject(signal.reason)
          this.#closedPromise.reject(signal.reason)
          return
        }

        // 8.3. Add the following abort steps to signal:
        signal.addEventListener('abort', () => {
          // 8.3.1. If the WebSocket connection is not yet established: [WSP]
          if (!isEstablished(this.#handler.readyState)) {
            // 8.3.1.1. Fail the WebSocket connection.
            failWebsocketConnection(this.#handler)

            // Set this's ready state to CLOSING.
            this.#handler.readyState = states.CLOSING

            // Reject this's opened promise and closed promise with signal's abort reason.
            this.#openedPromise.reject(signal.reason)
            this.#closedPromise.reject(signal.reason)

            // Set this's handshake aborted to true.
            this.#handshakeAborted = true
          }
        }, { once: true })
      }

      // 9. Let client be this's relevant settings object.
      const client = environmentSettingsObject.settingsObject

      // 10. Run this step in parallel:
      // 10.1. Establish a WebSocket connection given urlRecord, protocols, and client. [FETCH]
      this.#handler.controller = establishWebSocketConnection(
        urlRecord,
        protocols,
        client,
        this.#handler,
        options
      )
    }

    // The url getter steps are to return this's url, serialized.
    get url () {
      return this.#url.toString()
    }

    // The opened getter steps are to return this's opened promise.
    get opened () {
      return this.#openedPromise.promise
    }

    // The closed getter steps are to return this's closed promise.
    get closed () {
      return this.#closedPromise.promise
    }

    /**
     * Starts the closing handshake.
     *
     * @param {{ closeCode?: number, reason?: string } | null} [closeInfo]
     *   `null` and `undefined` both mean "no close info".
     */
    close (closeInfo = undefined) {
      // As in the constructor: always convert, treating null as "not provided".
      closeInfo = webidl.converters.WebSocketCloseInfo(closeInfo ?? undefined)

      // 1. Let code be closeInfo["closeCode"] if present, or null otherwise.
      const code = closeInfo.closeCode ?? null

      // 2. Let reason be closeInfo["reason"].
      const reason = closeInfo.reason

      // 3. Close the WebSocket with this, code, and reason.
      closeWebSocketConnection(this.#handler, code, reason, true)
    }

    /**
     * Write algorithm of the writable stream: frames `chunk` and writes it to the socket.
     *
     * @param {string | ArrayBuffer | ArrayBufferView} chunk
     * @returns {Promise<undefined>} Settles when the socket write callback fires,
     *   or immediately when the closing handshake has already started.
     */
    #write (chunk) {
      // See /websockets/stream/tentative/write.any.html
      chunk = webidl.converters.WebSocketStreamWrite(chunk)

      // 1. Let promise be a new promise created in stream's relevant realm.
      const promise = createDeferredPromise()

      // 2. Let data be null.
      let data = null

      // 3. Let opcode be null.
      let opcode = null

      // 4. If chunk is a BufferSource,
      if (webidl.is.BufferSource(chunk)) {
        // 4.1. Set data to a copy of the bytes given chunk.
        data = new Uint8Array(ArrayBuffer.isView(chunk) ? new Uint8Array(chunk.buffer, chunk.byteOffset, chunk.byteLength) : chunk.slice())

        // 4.2. Set opcode to a binary frame opcode.
        opcode = opcodes.BINARY
      } else {
        // 5. Otherwise,
        // 5.1. Let string be the result of converting chunk to an IDL USVString.
        //      If this throws an exception, return a promise rejected with the exception.
        let string
        try {
          string = webidl.converters.DOMString(chunk)
        } catch (e) {
          promise.reject(e)
          return promise.promise
        }

        // 5.2. Set data to the result of UTF-8 encoding string.
        data = new TextEncoder().encode(string)

        // 5.3. Set opcode to a text frame opcode.
        opcode = opcodes.TEXT
      }

      // 6. In parallel,
      // 6.1. Wait until there is sufficient buffer space in stream to send the message.
      const { closeState } = this.#handler
      const closingHandshakeStarted =
        closeState.has(sentCloseFrameState.SENT) || closeState.has(sentCloseFrameState.RECEIVED)

      if (closingHandshakeStarted) {
        // Nothing is sent once the closing handshake has started, but the promise
        // must still settle (step 6.3) or the caller's write() would hang forever.
        promise.resolve(undefined)
        return promise.promise
      }

      // 6.2. If the closing handshake has not yet started, Send a WebSocket Message to stream
      //      comprised of data using opcode.
      const frame = new WebsocketFrameSend(data)

      this.#handler.socket.write(frame.createFrame(opcode), (err) => {
        // 6.3. Queue a global task on the WebSocket task source given stream's relevant
        //      global object to resolve promise with undefined.
        if (err) {
          promise.reject(err)
        } else {
          promise.resolve(undefined)
        }
      })

      return promise.promise
    }

    /** @type {import('../websocket').Handler['onConnectionEstablished']} */
    #onConnectionEstablished (response, parsedExtensions) {
      this.#handler.socket = response.socket

      const parser = new ByteParser(this.#handler, parsedExtensions)
      parser.on('drain', () => this.#handler.onParserDrain())
      parser.on('error', (err) => this.#handler.onParserError(err))

      this.#parser = parser

      // 1. Change stream's ready state to OPEN (1).
      this.#handler.readyState = states.OPEN

      // 2. Set stream's was ever connected to true.
      // This is done in the opening handshake.

      // 3. Let extensions be the extensions in use.
      const extensions = parsedExtensions ?? ''

      // 4. Let protocol be the subprotocol in use.
      const protocol = response.headersList.get('sec-websocket-protocol') ?? ''

      // 5. Let pullAlgorithm be an action that pulls bytes from stream.
      // 6. Let cancelAlgorithm be an action that cancels stream with reason, given reason.
      // 7. Let readable be a new ReadableStream.
      // 8. Set up readable with pullAlgorithm and cancelAlgorithm.
      const readable = new ReadableStream({
        start: (controller) => {
          this.#readableStreamController = controller
        },
        // NOTE(review): this enqueues whatever socket.read() returns, i.e. raw socket
        // chunks rather than parsed messages - confirm this is intended.
        pull (controller) {
          let chunk
          while (controller.desiredSize > 0 && (chunk = response.socket.read()) !== null) {
            controller.enqueue(chunk)
          }
        },
        cancel: (reason) => this.#cancel(reason)
      })

      // 9. Let writeAlgorithm be an action that writes chunk to stream, given chunk.
      // 10. Let closeAlgorithm be an action that closes stream.
      // 11. Let abortAlgorithm be an action that aborts stream with reason, given reason.
      // 12. Let writable be a new WritableStream.
      // 13. Set up writable with writeAlgorithm, closeAlgorithm, and abortAlgorithm.
      const writable = new WritableStream({
        start: (controller) => {
          this.#writableStreamController = controller
        },
        write: (chunk) => this.#write(chunk),
        close: () => closeWebSocketConnection(this.#handler, null, null),
        abort: (reason) => this.#closeUsingReason(reason)
      })

      // Set stream's readable stream to readable.
      this.#readableStream = readable

      // Set stream's writable stream to writable.
      this.#writableStream = writable

      // Resolve stream's opened promise with WebSocketOpenInfo
      // «[ "extensions" → extensions, "protocol" → protocol, "readable" → readable, "writable" → writable ]».
      this.#openedPromise.resolve({
        extensions,
        protocol,
        readable,
        writable
      })
    }

    /** @type {import('../websocket').Handler['onMessage']} */
    #onMessage (type, data) {
      // 1. If stream's ready state is not OPEN (1), then return.
      //    A cancelled readable is already closed; enqueueing into it would throw.
      if (this.#handler.readyState !== states.OPEN || this.#readableCancelled) {
        return
      }

      // 2. Let chunk be determined by switching on type:
      //    - type indicates that the data is Text
      //        a new DOMString containing data
      //    - type indicates that the data is Binary
      //        a new Uint8Array object, created in the relevant Realm of the
      //        WebSocketStream object, whose contents are data
      let chunk

      if (type === opcodes.TEXT) {
        try {
          chunk = utf8Decode(data)
        } catch {
          failWebsocketConnection(this.#handler, 'Received invalid UTF-8 in text frame.')
          return
        }
      } else if (type === opcodes.BINARY) {
        chunk = new Uint8Array(data.buffer, data.byteOffset, data.byteLength)
      }

      // 3. Enqueue chunk into stream's readable stream.
      this.#readableStreamController.enqueue(chunk)

      // 4. Apply backpressure to the WebSocket.
    }

    /** @type {import('../websocket').Handler['onSocketClose']} */
    #onSocketClose () {
      const wasClean =
        this.#handler.closeState.has(sentCloseFrameState.SENT) &&
        this.#handler.closeState.has(sentCloseFrameState.RECEIVED)

      // 1. Change the ready state to CLOSED (3).
      this.#handler.readyState = states.CLOSED

      // 2. If stream's handshake aborted is true, then return.
      if (this.#handshakeAborted) {
        return
      }

      // 3. If stream's was ever connected is false, then reject stream's opened promise with a
      //    new WebSocketError.
      if (!this.#handler.wasEverConnected) {
        this.#openedPromise.reject(new WebSocketError('Socket never opened'))
      }

      const result = this.#parser?.closingInfo

      // 4. Let code be the WebSocket connection close code.
      // https://datatracker.ietf.org/doc/html/rfc6455#section-7.1.5
      // If this Close control frame contains no status code, _The WebSocket
      // Connection Close Code_ is considered to be 1005. If _The WebSocket
      // Connection is Closed_ and no Close control frame was received by the
      // endpoint (such as could occur if the underlying transport connection
      // is lost), _The WebSocket Connection Close Code_ is considered to be
      // 1006.
      let code = result?.code ?? 1005

      if (!this.#handler.closeState.has(sentCloseFrameState.SENT) && !this.#handler.closeState.has(sentCloseFrameState.RECEIVED)) {
        code = 1006
      }

      // 5. Let reason be the result of applying UTF-8 decode without BOM to the WebSocket
      //    connection close reason.
      const reason = result?.reason == null ? '' : utf8DecodeBytes(Buffer.from(result.reason))

      // 6. If the connection was closed cleanly,
      if (wasClean) {
        // 6.1. Close stream's readable stream.
        //      Skipped when the consumer cancelled it: the stream is then already
        //      closed and controller.close() would throw a TypeError.
        if (!this.#readableCancelled) {
          this.#readableStreamController.close()
        }

        // 6.2. Error stream's writable stream with an "InvalidStateError" DOMException indicating
        //      that a closed WebSocketStream cannot be written to.
        //      Done through the controller so it also applies while a writer holds the lock;
        //      it is a no-op when the stream is already closed or errored.
        this.#writableStreamController.error(
          new DOMException('A closed WebSocketStream cannot be written to', 'InvalidStateError')
        )

        // 6.3. Resolve stream's closed promise with WebSocketCloseInfo
        //      «[ "closeCode" → code, "reason" → reason ]».
        this.#closedPromise.resolve({
          closeCode: code,
          reason
        })
      } else {
        // 7. Otherwise,

        // 7.1. Let error be a new WebSocketError whose closeCode is code and reason is reason.
        const error = createUnvalidatedWebSocketError('unclean close', code, reason)

        // 7.2. Error stream's readable stream with error.
        this.#readableStreamController?.error(error)

        // 7.3. Error stream's writable stream with error.
        //      WritableStream.prototype.abort() returns a rejected promise on a locked
        //      stream (an unhandled rejection), so the controller is used instead.
        this.#writableStreamController?.error(error)

        // 7.4. Reject stream's closed promise with error.
        this.#closedPromise.reject(error)
      }
    }

    /**
     * "Close using reason": closes the connection, taking the close code and
     * reason from `reason` when it is a WebSocketError.
     *
     * @param {unknown} reason
     */
    #closeUsingReason (reason) {
      // 1. Let code be null.
      let code = null

      // 2. Let reasonString be the empty string.
      let reasonString = ''

      // 3. If reason implements WebSocketError,
      if (webidl.is.WebSocketError(reason)) {
        // 3.1. Set code to reason's closeCode.
        code = reason.closeCode

        // 3.2. Set reasonString to reason's reason.
        reasonString = reason.reason
      }

      // 4. Close the WebSocket with stream, code, and reasonString. If this throws an exception,
      //    discard code and reasonString and close the WebSocket with stream.
      closeWebSocketConnection(this.#handler, code, reasonString)
    }

    // To cancel a WebSocketStream stream given reason, close using reason giving stream and reason.
    #cancel (reason) {
      // Remember the cancellation so later close()/enqueue() calls on the
      // (now closed) readable controller are skipped.
      this.#readableCancelled = true
      this.#closeUsingReason(reason)
    }
  }
  // Re-define the public members (url, opened, closed, close) on the prototype
  // with the shared kEnumerableProperty descriptor, and give instances the
  // "WebSocketStream" string tag used by Object.prototype.toString.
  Object.defineProperties(WebSocketStream.prototype, {
    ...Object.fromEntries(
      ['url', 'opened', 'closed', 'close'].map((member) => [member, kEnumerableProperty])
    ),
    [Symbol.toStringTag]: {
      configurable: true,
      enumerable: false,
      writable: false,
      value: 'WebSocketStream'
    }
  })
  // Dictionary converter for the constructor's `options` argument:
  //   protocols - sequence of USVString, defaults to a fresh empty array
  //   signal    - nullable AbortSignal, defaults to null
  webidl.converters.WebSocketStreamOptions = webidl.dictionaryConverter([
    {
      key: 'protocols',
      defaultValue () {
        return []
      },
      converter: webidl.sequenceConverter(webidl.converters.USVString)
    },
    {
      key: 'signal',
      defaultValue () {
        return null
      },
      converter: webidl.nullableConverter(webidl.converters.AbortSignal)
    }
  ])
  // Dictionary converter for the argument of close():
  //   closeCode - unsigned short converted with the EnforceRange attribute; no default
  //   reason    - USVString, defaults to the empty string
  webidl.converters.WebSocketCloseInfo = webidl.dictionaryConverter([
    {
      key: 'closeCode',
      converter (value) {
        // Looked up at call time, exactly like the other converters in this file.
        return webidl.converters['unsigned short'](value, webidl.attributes.EnforceRange)
      }
    },
    {
      key: 'reason',
      defaultValue () {
        return ''
      },
      converter: webidl.converters.USVString
    }
  ])
  // Converter for chunks written to the writable side: strings go through the
  // USVString converter, every other value through the BufferSource converter.
  webidl.converters.WebSocketStreamWrite = function (V) {
    return typeof V === 'string'
      ? webidl.converters.USVString(V)
      : webidl.converters.BufferSource(V)
  }
  400. module.exports = { WebSocketStream }