| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216 |
- 'use strict'
- /**
- * @typedef {import('../../types/dispatcher.d.ts').default.DispatchHandler} DispatchHandler
- */
- /**
- * Handler that buffers response data and notifies multiple waiting handlers.
- * Used for request deduplication.
- *
- * @implements {DispatchHandler}
- */
- class DeduplicationHandler {
- /**
- * @type {DispatchHandler}
- */
- #primaryHandler
- /**
- * @type {DispatchHandler[]}
- */
- #waitingHandlers = []
- /**
- * @type {Buffer[]}
- */
- #chunks = []
- /**
- * @type {number}
- */
- #statusCode = 0
- /**
- * @type {Record<string, string | string[]>}
- */
- #headers = {}
- /**
- * @type {string}
- */
- #statusMessage = ''
- /**
- * @type {boolean}
- */
- #aborted = false
- /**
- * @type {import('../../types/dispatcher.d.ts').default.DispatchController | null}
- */
- #controller = null
- /**
- * @type {(() => void) | null}
- */
- #onComplete = null
- /**
- * @param {DispatchHandler} primaryHandler The primary handler
- * @param {() => void} onComplete Callback when request completes
- */
- constructor (primaryHandler, onComplete) {
- this.#primaryHandler = primaryHandler
- this.#onComplete = onComplete
- }
- /**
- * Add a waiting handler that will receive the buffered response
- * @param {DispatchHandler} handler
- */
- addWaitingHandler (handler) {
- this.#waitingHandlers.push(handler)
- }
- /**
- * @param {() => void} abort
- * @param {any} context
- */
- onRequestStart (controller, context) {
- this.#controller = controller
- this.#primaryHandler.onRequestStart?.(controller, context)
- }
- /**
- * @param {import('../../types/dispatcher.d.ts').default.DispatchController} controller
- * @param {number} statusCode
- * @param {import('../../types/header.d.ts').IncomingHttpHeaders} headers
- * @param {Socket} socket
- */
- onRequestUpgrade (controller, statusCode, headers, socket) {
- this.#primaryHandler.onRequestUpgrade?.(controller, statusCode, headers, socket)
- }
- /**
- * @param {import('../../types/dispatcher.d.ts').default.DispatchController} controller
- * @param {number} statusCode
- * @param {Record<string, string | string[]>} headers
- * @param {string} statusMessage
- */
- onResponseStart (controller, statusCode, headers, statusMessage) {
- this.#statusCode = statusCode
- this.#headers = headers
- this.#statusMessage = statusMessage
- this.#primaryHandler.onResponseStart?.(controller, statusCode, headers, statusMessage)
- }
- /**
- * @param {import('../../types/dispatcher.d.ts').default.DispatchController} controller
- * @param {Buffer} chunk
- */
- onResponseData (controller, chunk) {
- // Buffer the chunk for waiting handlers
- this.#chunks.push(Buffer.from(chunk))
- this.#primaryHandler.onResponseData?.(controller, chunk)
- }
- /**
- * @param {import('../../types/dispatcher.d.ts').default.DispatchController} controller
- * @param {object} trailers
- */
- onResponseEnd (controller, trailers) {
- this.#primaryHandler.onResponseEnd?.(controller, trailers)
- this.#notifyWaitingHandlers()
- this.#onComplete?.()
- }
- /**
- * @param {import('../../types/dispatcher.d.ts').default.DispatchController} controller
- * @param {Error} err
- */
- onResponseError (controller, err) {
- this.#aborted = true
- this.#primaryHandler.onResponseError?.(controller, err)
- this.#notifyWaitingHandlersError(err)
- this.#onComplete?.()
- }
- /**
- * Notify all waiting handlers with the buffered response
- */
- #notifyWaitingHandlers () {
- const body = Buffer.concat(this.#chunks)
- for (const handler of this.#waitingHandlers) {
- // Create a simple controller for each waiting handler
- const waitingController = {
- resume () {},
- pause () {},
- get paused () { return false },
- get aborted () { return false },
- get reason () { return null },
- abort () {}
- }
- try {
- handler.onRequestStart?.(waitingController, null)
- if (waitingController.aborted) {
- continue
- }
- handler.onResponseStart?.(
- waitingController,
- this.#statusCode,
- this.#headers,
- this.#statusMessage
- )
- if (waitingController.aborted) {
- continue
- }
- if (body.length > 0) {
- handler.onResponseData?.(waitingController, body)
- }
- handler.onResponseEnd?.(waitingController, {})
- } catch {
- // Ignore errors from waiting handlers
- }
- }
- this.#waitingHandlers = []
- this.#chunks = []
- }
- /**
- * Notify all waiting handlers of an error
- * @param {Error} err
- */
- #notifyWaitingHandlersError (err) {
- for (const handler of this.#waitingHandlers) {
- const waitingController = {
- resume () {},
- pause () {},
- get paused () { return false },
- get aborted () { return true },
- get reason () { return err },
- abort () {}
- }
- try {
- handler.onRequestStart?.(waitingController, null)
- handler.onResponseError?.(waitingController, err)
- } catch {
- // Ignore errors from waiting handlers
- }
- }
- this.#waitingHandlers = []
- this.#chunks = []
- }
- }
- module.exports = DeduplicationHandler
|