deduplication-handler.js 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. 'use strict'
  2. /**
  3. * @typedef {import('../../types/dispatcher.d.ts').default.DispatchHandler} DispatchHandler
  4. */
  /**
   * Dispatch handler used for request deduplication.
   *
   * Every callback is forwarded live to a single "primary" handler while the
   * response status line, headers and body chunks are recorded. When the
   * response ends (or fails), every handler registered through
   * `addWaitingHandler` is driven through an equivalent callback sequence
   * built from the recorded data.
   *
   * @implements {DispatchHandler}
   */
  class DeduplicationHandler {
    /**
     * Handler that receives every event as it happens.
     * @type {DispatchHandler}
     */
    #primaryHandler

    /**
     * Handlers that are replayed to once the primary response settles.
     * @type {DispatchHandler[]}
     */
    #waitingHandlers = []

    /**
     * Private copies of the body chunks seen so far.
     * @type {Buffer[]}
     */
    #chunks = []

    /**
     * @type {number}
     */
    #statusCode = 0

    /**
     * @type {Record<string, string | string[]>}
     */
    #headers = {}

    /**
     * @type {string}
     */
    #statusMessage = ''

    /**
     * Set once the waiting handlers have been notified and `onComplete` has
     * run, so that a second terminal event cannot complete the request twice.
     * @type {boolean}
     */
    #settled = false

    /**
     * @type {(() => void) | null}
     */
    #onComplete = null

    /**
     * @param {DispatchHandler} primaryHandler The primary handler
     * @param {() => void} onComplete Callback invoked once, after the waiting
     *   handlers have been notified of the end or failure of the response
     */
    constructor (primaryHandler, onComplete) {
      this.#primaryHandler = primaryHandler
      this.#onComplete = onComplete
    }

    /**
     * Add a waiting handler that will receive the buffered response
     * @param {DispatchHandler} handler
     */
    addWaitingHandler (handler) {
      this.#waitingHandlers.push(handler)
    }

    /**
     * @param {import('../../types/dispatcher.d.ts').default.DispatchController} controller
     * @param {any} context
     */
    onRequestStart (controller, context) {
      this.#primaryHandler.onRequestStart?.(controller, context)
    }

    /**
     * Forwarded to the primary handler only.
     *
     * NOTE(review): waiting handlers are not notified and `onComplete` is not
     * invoked from this callback - confirm that upgrade requests are never
     * deduplicated by the caller.
     *
     * @param {import('../../types/dispatcher.d.ts').default.DispatchController} controller
     * @param {number} statusCode
     * @param {import('../../types/header.d.ts').IncomingHttpHeaders} headers
     * @param {Socket} socket
     */
    onRequestUpgrade (controller, statusCode, headers, socket) {
      this.#primaryHandler.onRequestUpgrade?.(controller, statusCode, headers, socket)
    }

    /**
     * Records the status line and headers for later replay, then forwards
     * them to the primary handler.
     *
     * @param {import('../../types/dispatcher.d.ts').default.DispatchController} controller
     * @param {number} statusCode
     * @param {Record<string, string | string[]>} headers
     * @param {string} statusMessage
     */
    onResponseStart (controller, statusCode, headers, statusMessage) {
      this.#statusCode = statusCode
      this.#headers = headers
      this.#statusMessage = statusMessage
      this.#primaryHandler.onResponseStart?.(controller, statusCode, headers, statusMessage)
    }

    /**
     * @param {import('../../types/dispatcher.d.ts').default.DispatchController} controller
     * @param {Buffer} chunk
     */
    onResponseData (controller, chunk) {
      // Buffer.from() stores a copy, so whatever the primary handler does
      // with `chunk` afterwards cannot alter the bytes kept for replay.
      this.#chunks.push(Buffer.from(chunk))
      this.#primaryHandler.onResponseData?.(controller, chunk)
    }

    /**
     * @param {import('../../types/dispatcher.d.ts').default.DispatchController} controller
     * @param {object} trailers
     */
    onResponseEnd (controller, trailers) {
      try {
        this.#primaryHandler.onResponseEnd?.(controller, trailers)
      } finally {
        // Runs even when the primary handler throws: the response itself is
        // complete, so the waiting handlers must not be left pending.
        this.#settle(() => this.#notifyWaitingHandlers(trailers))
      }
    }

    /**
     * @param {import('../../types/dispatcher.d.ts').default.DispatchController} controller
     * @param {Error} err
     */
    onResponseError (controller, err) {
      try {
        this.#primaryHandler.onResponseError?.(controller, err)
      } finally {
        this.#settle(() => this.#notifyWaitingHandlersError(err))
      }
    }

    /**
     * Runs `notify` followed by the `onComplete` callback, at most once per
     * instance. Later calls are ignored.
     *
     * @param {() => void} notify
     */
    #settle (notify) {
      if (this.#settled) {
        return
      }
      this.#settled = true
      notify()
      this.#onComplete?.()
    }

    /**
     * Builds the controller passed to a waiting handler. There is no live
     * stream behind it, so `pause`/`resume` do nothing and `paused` is always
     * false; `abort` only records state that the replay loop checks.
     *
     * @param {boolean} [initiallyAborted] Whether the controller starts aborted
     * @param {unknown} [initialReason] Reason reported while aborted
     * @returns {import('../../types/dispatcher.d.ts').default.DispatchController}
     */
    #createWaitingController (initiallyAborted = false, initialReason = null) {
      let aborted = initiallyAborted
      let reason = initialReason

      return {
        resume () {},
        pause () {},
        get paused () { return false },
        get aborted () { return aborted },
        get reason () { return reason },
        abort (abortReason) {
          // The first abort wins; an already-aborted controller keeps its reason.
          if (aborted) {
            return
          }
          aborted = true
          reason = abortReason ?? null
        }
      }
    }

    /**
     * Notify all waiting handlers with the buffered response.
     *
     * Each handler gets its own controller. A handler that calls
     * `controller.abort()` receives no further callbacks. All handlers are
     * given the same headers object, the same trailers object and the same
     * body Buffer instance.
     *
     * @param {object} [trailers] Trailers of the primary response
     */
    #notifyWaitingHandlers (trailers) {
      const body = Buffer.concat(this.#chunks)
      const replayTrailers = trailers ?? {}

      for (const handler of this.#waitingHandlers) {
        const waitingController = this.#createWaitingController()

        try {
          handler.onRequestStart?.(waitingController, null)
          if (waitingController.aborted) {
            continue
          }

          handler.onResponseStart?.(
            waitingController,
            this.#statusCode,
            this.#headers,
            this.#statusMessage
          )
          if (waitingController.aborted) {
            continue
          }

          if (body.length > 0) {
            handler.onResponseData?.(waitingController, body)
            if (waitingController.aborted) {
              continue
            }
          }

          handler.onResponseEnd?.(waitingController, replayTrailers)
        } catch {
          // Deliberately best-effort: a waiting handler that throws must not
          // prevent the remaining handlers from being notified.
        }
      }

      this.#waitingHandlers = []
      this.#chunks = []
    }

    /**
     * Notify all waiting handlers of an error. Each one sees `onRequestStart`
     * with an already-aborted controller whose reason is `err`, followed by
     * `onResponseError`.
     *
     * @param {Error} err
     */
    #notifyWaitingHandlersError (err) {
      for (const handler of this.#waitingHandlers) {
        const waitingController = this.#createWaitingController(true, err)

        try {
          handler.onRequestStart?.(waitingController, null)
          handler.onResponseError?.(waitingController, err)
        } catch {
          // Deliberately best-effort: a waiting handler that throws must not
          // prevent the remaining handlers from being notified.
        }
      }

      this.#waitingHandlers = []
      this.#chunks = []
    }
  }
  185. module.exports = DeduplicationHandler