readable.js 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580
  1. 'use strict'
  2. const assert = require('node:assert')
  3. const { Readable } = require('node:stream')
  4. const { RequestAbortedError, NotSupportedError, InvalidArgumentError, AbortError } = require('../core/errors')
  5. const util = require('../core/util')
  6. const { ReadableStreamFrom } = require('../core/util')
  // Private symbol keys under which BodyReadable keeps its per-stream state.

  /** Pending body-mixin consumer (`Consume` record) or null. */
  const kConsume = Symbol('kConsume')
  /** True while a 'data' or 'readable' listener is attached. */
  const kReading = Symbol('kReading')
  /** Lazily created web ReadableStream view of the body. */
  const kBody = Symbol('kBody')
  /** Abort callback supplied to the constructor. */
  const kAbort = Symbol('kAbort')
  /** Content type string supplied to the constructor. */
  const kContentType = Symbol('kContentType')
  /** Content length supplied to the constructor, or null when not finite. */
  const kContentLength = Symbol('kContentLength')
  /** True once a 'data' or 'readable' listener has ever been attached. */
  const kUsed = Symbol('kUsed')
  /** Running total of the lengths of all chunks handed to push(). */
  const kBytesRead = Symbol('kBytesRead')

  /** Shared do-nothing callback. */
  const noop = () => {}
  /**
   * Node.js Readable that also offers the fetch `Body` mixin consumers
   * (`text()`, `json()`, `blob()`, `bytes()`, `arrayBuffer()`), plus `dump()`
   * for discarding a body.
   *
   * @class
   * @extends {Readable}
   * @see https://fetch.spec.whatwg.org/#body
   */
  class BodyReadable extends Readable {
    /**
     * @param {object} opts
     * @param {(this: Readable, size: number) => void} opts.resume Used as the stream's `read` implementation.
     * @param {() => (void | null)} opts.abort Invoked when the stream is destroyed with an error.
     * @param {string} [opts.contentType = '']
     * @param {number} [opts.contentLength]
     * @param {number} [opts.highWaterMark = 64 * 1024]
     */
    constructor ({
      resume,
      abort,
      contentType = '',
      contentLength,
      highWaterMark = 64 * 1024 // Same as nodejs fs streams.
    }) {
      super({ autoDestroy: true, read: resume, highWaterMark })

      this._readableState.dataEmitted = false

      this[kAbort] = abort

      /** @type {Consume | null} */
      this[kConsume] = null

      /** @type {number} */
      this[kBytesRead] = 0

      /** @type {ReadableStream|null} */
      this[kBody] = null

      /** @type {boolean} */
      this[kUsed] = false

      /** @type {string} */
      this[kContentType] = contentType

      // Anything that is not a finite number is stored as null.
      /** @type {number|null} */
      this[kContentLength] = Number.isFinite(contentLength) ? contentLength : null

      /**
       * Whether the stream is currently consumed through the Readable API.
       * Tracked as a flag so that push() does not have to look up 'data'
       * and 'readable' listeners on the hot path.
       *
       * @type {boolean}
       */
      this[kReading] = false
    }

    /**
     * @param {Error|null} err
     * @param {(error:(Error|null)) => void} callback
     * @returns {void}
     */
    _destroy (err, callback) {
      let reason = err

      // Destroying before 'end' without an explicit error counts as an abort.
      if (!reason && !this._readableState.endEmitted) {
        reason = new RequestAbortedError()
      }

      if (reason) {
        this[kAbort]()
      }

      if (this[kUsed]) {
        callback(reason)
        return
      }

      // Workaround for Node "bug". When the stream is destroyed in the same
      // tick it was created, a user who awaits a promise (i.e. a micro tick)
      // before installing an 'error' listener would never get the chance and
      // would always hit an unhandled exception. Deferring gives them time.
      setImmediate(callback, reason)
    }

    /**
     * @param {string|symbol} event
     * @param {(...args: any[]) => void} listener
     * @returns {this}
     */
    on (event, listener) {
      const readsData = event === 'data' || event === 'readable'

      if (readsData) {
        this[kReading] = true
        this[kUsed] = true
      }

      return super.on(event, listener)
    }

    /**
     * @param {string|symbol} event
     * @param {(...args: any[]) => void} listener
     * @returns {this}
     */
    addListener (event, listener) {
      return this.on(event, listener)
    }

    /**
     * @param {string|symbol} event
     * @param {(...args: any[]) => void} listener
     * @returns {this}
     */
    off (event, listener) {
      const result = super.off(event, listener)

      if (event !== 'data' && event !== 'readable') {
        return result
      }

      // Recompute the flag from whatever listeners are still attached.
      this[kReading] =
        this.listenerCount('data') > 0 || this.listenerCount('readable') > 0

      return result
    }

    /**
     * @param {string|symbol} event
     * @param {(...args: any[]) => void} listener
     * @returns {this}
     */
    removeListener (event, listener) {
      return this.off(event, listener)
    }

    /**
     * @param {Buffer|null} chunk
     * @returns {boolean}
     */
    push (chunk) {
      if (!chunk) {
        return super.push(chunk)
      }

      this[kBytesRead] += chunk.length

      const pending = this[kConsume]
      if (!pending) {
        return super.push(chunk)
      }

      // A body-mixin consumer is collecting chunks; only forward the chunk to
      // the Readable machinery when stream listeners are attached as well.
      consumePush(pending, chunk)
      if (this[kReading]) {
        return super.push(chunk)
      }
      return true
    }

    /**
     * Consumes the body and resolves with it as a string.
     *
     * @see https://fetch.spec.whatwg.org/#dom-body-text
     * @returns {Promise<string>}
     */
    text () {
      return consume(this, 'text')
    }

    /**
     * Consumes the body and resolves with the parsed JSON value.
     *
     * @see https://fetch.spec.whatwg.org/#dom-body-json
     * @returns {Promise<unknown>}
     */
    json () {
      return consume(this, 'json')
    }

    /**
     * Consumes the body and resolves with it as a Blob.
     *
     * @see https://fetch.spec.whatwg.org/#dom-body-blob
     * @returns {Promise<Blob>}
     */
    blob () {
      return consume(this, 'blob')
    }

    /**
     * Consumes the body and resolves with it as a Uint8Array.
     *
     * @see https://fetch.spec.whatwg.org/#dom-body-bytes
     * @returns {Promise<Uint8Array>}
     */
    bytes () {
      return consume(this, 'bytes')
    }

    /**
     * Consumes the body and resolves with it as an ArrayBuffer.
     *
     * @see https://fetch.spec.whatwg.org/#dom-body-arraybuffer
     * @returns {Promise<ArrayBuffer>}
     */
    arrayBuffer () {
      return consume(this, 'arrayBuffer')
    }

    /**
     * Not implemented; the returned promise always rejects.
     *
     * @see https://fetch.spec.whatwg.org/#dom-body-formdata
     * @throws {NotSupportedError}
     */
    async formData () {
      // TODO: Implement.
      throw new NotSupportedError()
    }

    /**
     * Whether the body has been disturbed, as reported by `util.isDisturbed`.
     *
     * @see https://fetch.spec.whatwg.org/#dom-body-bodyused
     * @readonly
     * @returns {boolean}
     */
    get bodyUsed () {
      return util.isDisturbed(this)
    }

    /**
     * Web ReadableStream view of this body, created on first access.
     *
     * @see https://fetch.spec.whatwg.org/#dom-body-body
     * @readonly
     * @returns {ReadableStream}
     */
    get body () {
      if (this[kBody]) {
        return this[kBody]
      }

      const webStream = ReadableStreamFrom(this)
      this[kBody] = webStream

      if (this[kConsume]) {
        // TODO: Is this the best way to force a lock?
        webStream.getReader() // Ensure stream is locked.
        assert(webStream.locked)
      }

      return webStream
    }

    /**
     * Dumps the response body by reading up to `limit` bytes and destroying
     * the stream once more than that has been seen.
     *
     * @param {object} opts
     * @param {number} [opts.limit = 131072] Number of bytes to read.
     * @param {AbortSignal} [opts.signal] An AbortSignal to cancel the dump.
     * @returns {Promise<null>} Settles when the stream emits 'close'.
     */
    dump (opts) {
      const signal = opts?.signal

      const isSignalLike = typeof signal === 'object' && 'aborted' in signal
      if (signal != null && !isSignalLike) {
        return Promise.reject(new InvalidArgumentError('signal must be an AbortSignal'))
      }

      // Falsy or non-finite limits fall back to the 128 KiB default.
      let limit = 128 * 1024
      if (opts?.limit && Number.isFinite(opts.limit)) {
        limit = opts.limit
      }

      if (signal?.aborted) {
        return Promise.reject(signal.reason ?? new AbortError())
      }

      if (this._readableState.closeEmitted) {
        return Promise.resolve(null)
      }

      return new Promise((resolve, reject) => {
        const declaredLength = this[kContentLength]
        const overLimit =
          (declaredLength && declaredLength > limit) || this[kBytesRead] > limit

        // No point in reading a body already known to exceed the limit.
        if (overLimit) {
          this.destroy(new AbortError())
        }

        if (!signal) {
          this.on('close', resolve)
        } else {
          const onAbort = () => {
            this.destroy(signal.reason ?? new AbortError())
          }
          signal.addEventListener('abort', onAbort)

          this.on('close', () => {
            signal.removeEventListener('abort', onAbort)
            if (!signal.aborted) {
              resolve(null)
              return
            }
            reject(signal.reason ?? new AbortError())
          })
        }

        // Errors are swallowed here; settlement is driven by 'close'.
        this.on('error', noop)
        this.on('data', () => {
          if (this[kBytesRead] > limit) {
            this.destroy()
          }
        })
        this.resume()
      })
    }

    /**
     * Records `encoding` on the readable state when it names an encoding
     * known to Buffer; any other value is ignored.
     *
     * @param {BufferEncoding} encoding
     * @returns {this}
     */
    setEncoding (encoding) {
      if (!Buffer.isEncoding(encoding)) {
        return this
      }
      this._readableState.encoding = encoding
      return this
    }
  }
  /**
   * Reports whether the body is locked: either its web stream view is locked
   * or a body-mixin consumer has been registered on it.
   *
   * @see https://streams.spec.whatwg.org/#readablestream-locked
   * @param {BodyReadable} bodyReadable
   * @returns {boolean}
   */
  function isLocked (bodyReadable) {
    if (bodyReadable[kBody]?.locked === true) {
      return true
    }
    // Consume is an implicit lock.
    return bodyReadable[kConsume] !== null
  }
  /**
   * Reports whether the body can no longer be consumed, i.e. it is either
   * disturbed (per `util.isDisturbed`) or locked.
   *
   * @see https://fetch.spec.whatwg.org/#body-unusable
   * @param {BodyReadable} bodyReadable
   * @returns {boolean}
   */
  function isUnusable (bodyReadable) {
    const disturbed = util.isDisturbed(bodyReadable)
    if (disturbed) {
      return disturbed
    }
    return isLocked(bodyReadable)
  }
  310. /**
  311. * @typedef {'text' | 'json' | 'blob' | 'bytes' | 'arrayBuffer'} ConsumeType
  312. */
  313. /**
  314. * @template {ConsumeType} T
  315. * @typedef {T extends 'text' ? string :
  316. * T extends 'json' ? unknown :
  317. * T extends 'blob' ? Blob :
  318. * T extends 'arrayBuffer' ? ArrayBuffer :
  319. * T extends 'bytes' ? Uint8Array :
  320. * never
  321. * } ConsumeReturnType
  322. */
  323. /**
  324. * @typedef {object} Consume
  325. * @property {ConsumeType} type
  326. * @property {BodyReadable} stream
  327. * @property {((value?: any) => void)} resolve
  328. * @property {((err: Error) => void)} reject
  329. * @property {number} length
  330. * @property {Buffer[]} body
  331. */
  /**
   * Starts consuming `stream` as `type` and returns a promise for the result.
   * An unusable stream rejects (with the stream's error when it has one,
   * otherwise a TypeError); a usable one gets a `Consume` record attached on
   * the next microtask and is then drained by `consumeStart`.
   *
   * @template {ConsumeType} T
   * @param {BodyReadable} stream
   * @param {T} type
   * @returns {Promise<ConsumeReturnType<T>>}
   */
  function consume (stream, type) {
    assert(!stream[kConsume])

    return new Promise((resolve, reject) => {
      if (isUnusable(stream)) {
        const rState = stream._readableState
        const closePending = rState.destroyed && rState.closeEmitted === false

        if (!closePending) {
          reject(rState.errored ?? new TypeError('unusable'))
          return
        }

        // Destroyed but not closed yet: settle on whichever of 'error' and
        // 'close' fires first.
        stream.on('error', reject)
        stream.on('close', () => {
          reject(new TypeError('unusable'))
        })
        return
      }

      queueMicrotask(() => {
        const pending = {
          type,
          stream,
          resolve,
          reject,
          length: 0,
          body: []
        }
        stream[kConsume] = pending

        stream.on('error', (err) => {
          consumeFinish(stream[kConsume], err)
        })
        stream.on('close', () => {
          // A non-null body means the consumer was never finished.
          if (stream[kConsume].body !== null) {
            consumeFinish(stream[kConsume], new RequestAbortedError())
          }
        })

        consumeStart(pending)
      })
    })
  }
  /**
   * Begins draining the stream of a freshly registered consumer: chunks
   * already sitting in the readable buffer are copied into the consumer, the
   * consumer is completed on 'end', and the stream is switched to flowing
   * mode and read until `read()` yields nothing more.
   *
   * Does nothing when the consumer has already been finished (its `body` is
   * null).
   *
   * @param {Consume} consume
   * @returns {void}
   */
  function consumeStart (consume) {
    if (consume.body === null) {
      return
    }

    // Keep a local reference: finishing the consumer nulls `consume.stream`.
    const { stream } = consume
    const state = stream._readableState

    if (state.bufferIndex) {
      // Entries before `bufferIndex` are skipped; only the rest is copied.
      const start = state.bufferIndex
      const end = state.buffer.length
      for (let n = start; n < end; n++) {
        consumePush(consume, state.buffer[n])
      }
    } else {
      for (const chunk of state.buffer) {
        consumePush(consume, chunk)
      }
    }

    if (state.endEmitted) {
      // This is a plain function, so there is no `this` to read the consumer
      // or the encoding from; use the arguments at hand instead.
      consumeEnd(consume, state.encoding)
      return
    }

    stream.on('end', () => {
      consumeEnd(consume, state.encoding)
    })

    stream.resume()

    while (stream.read() != null) {
      // Loop
    }
  }
  /**
   * Joins `chunks` and decodes them to a string, dropping a leading UTF-8
   * byte order mark (EF BB BF) when present.
   *
   * @param {Buffer[]} chunks
   * @param {number} length Total number of bytes to take from `chunks`.
   * @param {BufferEncoding} [encoding='utf8']
   * @returns {string}
   */
  function chunksDecode (chunks, length, encoding) {
    if (chunks.length === 0 || length === 0) {
      return ''
    }

    // A single chunk is used as is, without copying.
    let joined
    if (chunks.length === 1) {
      joined = chunks[0]
    } else {
      joined = Buffer.concat(chunks, length)
    }
    const total = joined.length

    // Skip BOM.
    const hasBom =
      total > 2 &&
      joined[0] === 0xef &&
      joined[1] === 0xbb &&
      joined[2] === 0xbf
    const start = hasBom ? 3 : 0

    const isUtf8 = !encoding || encoding === 'utf8' || encoding === 'utf-8'
    if (isUtf8) {
      return joined.utf8Slice(start, total)
    }
    return joined.subarray(start, total).toString(encoding)
  }
  /**
   * Copies `chunks` back to back into a newly allocated Uint8Array.
   *
   * @param {Buffer[]} chunks
   * @param {number} length Combined byte length of `chunks`.
   * @returns {Uint8Array}
   */
  function chunksConcat (chunks, length) {
    const isEmpty = chunks.length === 0 || length === 0
    if (isEmpty) {
      return new Uint8Array(0)
    }

    if (chunks.length === 1) {
      // fast-path
      return new Uint8Array(chunks[0])
    }

    // The result is a view over the whole ArrayBuffer of the allocation.
    const backing = Buffer.allocUnsafeSlow(length)
    const result = new Uint8Array(backing.buffer)

    let cursor = 0
    for (const chunk of chunks) {
      result.set(chunk, cursor)
      cursor += chunk.length
    }

    return result
  }
  /**
   * Converts the collected chunks into the representation the consumer asked
   * for, resolves its promise with it and finishes the consumer. If the
   * conversion throws (for instance on malformed JSON) the stream is
   * destroyed with that error instead.
   *
   * @param {Consume} consume
   * @param {BufferEncoding} encoding
   * @returns {void}
   */
  function consumeEnd (consume, encoding) {
    // Everything is read up front, before the consumer can be reset.
    const { type, body, resolve, stream, length } = consume

    try {
      switch (type) {
        case 'text':
          resolve(chunksDecode(body, length, encoding))
          break
        case 'json':
          resolve(JSON.parse(chunksDecode(body, length, encoding)))
          break
        case 'arrayBuffer':
          resolve(chunksConcat(body, length).buffer)
          break
        case 'blob':
          resolve(new Blob(body, { type: stream[kContentType] }))
          break
        case 'bytes':
          resolve(chunksConcat(body, length))
          break
      }

      consumeFinish(consume)
    } catch (err) {
      stream.destroy(err)
    }
  }
  /**
   * Appends `chunk` to the consumer's collected body and adds its length to
   * the consumer's byte count.
   *
   * @param {Consume} consume
   * @param {Buffer} chunk
   * @returns {void}
   */
  function consumePush (consume, chunk) {
    const size = chunk.length
    consume.length += size
    consume.body.push(chunk)
  }
  /**
   * Settles the consumer's promise (rejecting with `err` when one is given,
   * resolving otherwise) and then clears every field of the record. A
   * consumer whose `body` is already null has been finished before and is
   * left alone.
   *
   * @param {Consume} consume
   * @param {Error} [err]
   * @returns {void}
   */
  function consumeFinish (consume, err) {
    if (consume.body === null) {
      return
    }

    if (!err) {
      consume.resolve()
    } else {
      consume.reject(err)
    }

    // Reset the consume object to allow for garbage collection.
    Object.assign(consume, {
      type: null,
      stream: null,
      resolve: null,
      reject: null,
      length: 0,
      body: null
    })
  }
  511. module.exports = {
  512. Readable: BodyReadable,
  513. chunksDecode
  514. }