client-h2.js 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990
  1. 'use strict'
  2. const assert = require('node:assert')
  3. const { pipeline } = require('node:stream')
  4. const util = require('../core/util.js')
  5. const {
  6. RequestContentLengthMismatchError,
  7. RequestAbortedError,
  8. SocketError,
  9. InformationalError,
  10. InvalidArgumentError
  11. } = require('../core/errors.js')
  12. const {
  13. kUrl,
  14. kReset,
  15. kClient,
  16. kRunning,
  17. kPending,
  18. kQueue,
  19. kPendingIdx,
  20. kRunningIdx,
  21. kError,
  22. kSocket,
  23. kStrictContentLength,
  24. kOnError,
  25. kMaxConcurrentStreams,
  26. kPingInterval,
  27. kHTTP2Session,
  28. kHTTP2InitialWindowSize,
  29. kHTTP2ConnectionWindowSize,
  30. kResume,
  31. kSize,
  32. kHTTPContext,
  33. kClosed,
  34. kBodyTimeout,
  35. kEnableConnectProtocol,
  36. kRemoteSettings,
  37. kHTTP2Stream,
  38. kHTTP2SessionState
  39. } = require('../core/symbols.js')
  40. const { channels } = require('../core/diagnostics.js')
  41. const kOpenStreams = Symbol('open streams')
  42. let extractBody
  43. /** @type {import('http2')} */
  44. let http2
  45. try {
  46. http2 = require('node:http2')
  47. } catch {
  48. // @ts-ignore
  49. http2 = { constants: {} }
  50. }
  51. const {
  52. constants: {
  53. HTTP2_HEADER_AUTHORITY,
  54. HTTP2_HEADER_METHOD,
  55. HTTP2_HEADER_PATH,
  56. HTTP2_HEADER_SCHEME,
  57. HTTP2_HEADER_CONTENT_LENGTH,
  58. HTTP2_HEADER_EXPECT,
  59. HTTP2_HEADER_STATUS,
  60. HTTP2_HEADER_PROTOCOL,
  61. NGHTTP2_REFUSED_STREAM,
  62. NGHTTP2_CANCEL
  63. }
  64. } = http2
  65. function parseH2Headers (headers) {
  66. const result = []
  67. for (const [name, value] of Object.entries(headers)) {
  68. // h2 may concat the header value by array
  69. // e.g. Set-Cookie
  70. if (Array.isArray(value)) {
  71. for (const subvalue of value) {
  72. // we need to provide each header value of header name
  73. // because the headers handler expect name-value pair
  74. result.push(Buffer.from(name), Buffer.from(subvalue))
  75. }
  76. } else {
  77. result.push(Buffer.from(name), Buffer.from(value))
  78. }
  79. }
  80. return result
  81. }
  82. function connectH2 (client, socket) {
  83. client[kSocket] = socket
  84. const http2InitialWindowSize = client[kHTTP2InitialWindowSize]
  85. const http2ConnectionWindowSize = client[kHTTP2ConnectionWindowSize]
  86. const session = http2.connect(client[kUrl], {
  87. createConnection: () => socket,
  88. peerMaxConcurrentStreams: client[kMaxConcurrentStreams],
  89. settings: {
  90. // TODO(metcoder95): add support for PUSH
  91. enablePush: false,
  92. ...(http2InitialWindowSize != null ? { initialWindowSize: http2InitialWindowSize } : null)
  93. }
  94. })
  95. client[kSocket] = socket
  96. session[kOpenStreams] = 0
  97. session[kClient] = client
  98. session[kSocket] = socket
  99. session[kHTTP2SessionState] = {
  100. ping: {
  101. interval: client[kPingInterval] === 0 ? null : setInterval(onHttp2SendPing, client[kPingInterval], session).unref()
  102. }
  103. }
  104. // We set it to true by default in a best-effort; however once connected to an H2 server
  105. // we will check if extended CONNECT protocol is supported or not
  106. // and set this value accordingly.
  107. session[kEnableConnectProtocol] = false
  108. // States whether or not we have received the remote settings from the server
  109. session[kRemoteSettings] = false
  110. // Apply connection-level flow control once connected (if supported).
  111. if (http2ConnectionWindowSize) {
  112. util.addListener(session, 'connect', applyConnectionWindowSize.bind(session, http2ConnectionWindowSize))
  113. }
  114. util.addListener(session, 'error', onHttp2SessionError)
  115. util.addListener(session, 'frameError', onHttp2FrameError)
  116. util.addListener(session, 'end', onHttp2SessionEnd)
  117. util.addListener(session, 'goaway', onHttp2SessionGoAway)
  118. util.addListener(session, 'close', onHttp2SessionClose)
  119. util.addListener(session, 'remoteSettings', onHttp2RemoteSettings)
  120. // TODO (@metcoder95): implement SETTINGS support
  121. // util.addListener(session, 'localSettings', onHttp2RemoteSettings)
  122. session.unref()
  123. client[kHTTP2Session] = session
  124. socket[kHTTP2Session] = session
  125. util.addListener(socket, 'error', onHttp2SocketError)
  126. util.addListener(socket, 'end', onHttp2SocketEnd)
  127. util.addListener(socket, 'close', onHttp2SocketClose)
  128. socket[kClosed] = false
  129. socket.on('close', onSocketClose)
  130. return {
  131. version: 'h2',
  132. defaultPipelining: Infinity,
  133. /**
  134. * @param {import('../core/request.js')} request
  135. * @returns {boolean}
  136. */
  137. write (request) {
  138. return writeH2(client, request)
  139. },
  140. /**
  141. * @returns {void}
  142. */
  143. resume () {
  144. resumeH2(client)
  145. },
  146. /**
  147. * @param {Error | null} err
  148. * @param {() => void} callback
  149. */
  150. destroy (err, callback) {
  151. if (socket[kClosed]) {
  152. queueMicrotask(callback)
  153. } else {
  154. socket.destroy(err).on('close', callback)
  155. }
  156. },
  157. /**
  158. * @type {boolean}
  159. */
  160. get destroyed () {
  161. return socket.destroyed
  162. },
  163. /**
  164. * @param {import('../core/request.js')} request
  165. * @returns {boolean}
  166. */
  167. busy (request) {
  168. if (request != null) {
  169. if (client[kRunning] > 0) {
  170. // We are already processing requests
  171. // Non-idempotent request cannot be retried.
  172. // Ensure that no other requests are inflight and
  173. // could cause failure.
  174. if (request.idempotent === false) return true
  175. // Don't dispatch an upgrade until all preceding requests have completed.
  176. // Possibly, we do not have remote settings confirmed yet.
  177. if ((request.upgrade === 'websocket' || request.method === 'CONNECT') && session[kRemoteSettings] === false) return true
  178. // Request with stream or iterator body can error while other requests
  179. // are inflight and indirectly error those as well.
  180. // Ensure this doesn't happen by waiting for inflight
  181. // to complete before dispatching.
  182. // Request with stream or iterator body cannot be retried.
  183. // Ensure that no other requests are inflight and
  184. // could cause failure.
  185. if (util.bodyLength(request.body) !== 0 &&
  186. (util.isStream(request.body) || util.isAsyncIterable(request.body) || util.isFormDataLike(request.body))) return true
  187. } else {
  188. return (request.upgrade === 'websocket' || request.method === 'CONNECT') && session[kRemoteSettings] === false
  189. }
  190. }
  191. return false
  192. }
  193. }
  194. }
  195. function resumeH2 (client) {
  196. const socket = client[kSocket]
  197. if (socket?.destroyed === false) {
  198. if (client[kSize] === 0 || client[kMaxConcurrentStreams] === 0) {
  199. socket.unref()
  200. client[kHTTP2Session].unref()
  201. } else {
  202. socket.ref()
  203. client[kHTTP2Session].ref()
  204. }
  205. }
  206. }
  207. function applyConnectionWindowSize (connectionWindowSize) {
  208. try {
  209. if (typeof this.setLocalWindowSize === 'function') {
  210. this.setLocalWindowSize(connectionWindowSize)
  211. }
  212. } catch {
  213. // Best-effort only.
  214. }
  215. }
  216. function onHttp2RemoteSettings (settings) {
  217. // Fallbacks are a safe bet, remote setting will always override
  218. this[kClient][kMaxConcurrentStreams] = settings.maxConcurrentStreams ?? this[kClient][kMaxConcurrentStreams]
  219. /**
  220. * From RFC-8441
  221. * A sender MUST NOT send a SETTINGS_ENABLE_CONNECT_PROTOCOL parameter
  222. * with the value of 0 after previously sending a value of 1.
  223. */
  224. // Note: Cannot be tested in Node, it does not supports disabling the extended CONNECT protocol once enabled
  225. if (this[kRemoteSettings] === true && this[kEnableConnectProtocol] === true && settings.enableConnectProtocol === false) {
  226. const err = new InformationalError('HTTP/2: Server disabled extended CONNECT protocol against RFC-8441')
  227. this[kSocket][kError] = err
  228. this[kClient][kOnError](err)
  229. return
  230. }
  231. this[kEnableConnectProtocol] = settings.enableConnectProtocol ?? this[kEnableConnectProtocol]
  232. this[kRemoteSettings] = true
  233. this[kClient][kResume]()
  234. }
  235. function onHttp2SendPing (session) {
  236. const state = session[kHTTP2SessionState]
  237. if ((session.closed || session.destroyed) && state.ping.interval != null) {
  238. clearInterval(state.ping.interval)
  239. state.ping.interval = null
  240. return
  241. }
  242. // If no ping sent, do nothing
  243. session.ping(onPing.bind(session))
  244. function onPing (err, duration) {
  245. const client = this[kClient]
  246. const socket = this[kClient]
  247. if (err != null) {
  248. const error = new InformationalError(`HTTP/2: "PING" errored - type ${err.message}`)
  249. socket[kError] = error
  250. client[kOnError](error)
  251. } else {
  252. client.emit('ping', duration)
  253. }
  254. }
  255. }
  256. function onHttp2SessionError (err) {
  257. assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')
  258. this[kSocket][kError] = err
  259. this[kClient][kOnError](err)
  260. }
  261. function onHttp2FrameError (type, code, id) {
  262. if (id === 0) {
  263. const err = new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`)
  264. this[kSocket][kError] = err
  265. this[kClient][kOnError](err)
  266. }
  267. }
  268. function onHttp2SessionEnd () {
  269. const err = new SocketError('other side closed', util.getSocketInfo(this[kSocket]))
  270. this.destroy(err)
  271. util.destroy(this[kSocket], err)
  272. }
  273. /**
  274. * This is the root cause of #3011
  275. * We need to handle GOAWAY frames properly, and trigger the session close
  276. * along with the socket right away
  277. *
  278. * @this {import('http2').ClientHttp2Session}
  279. * @param {number} errorCode
  280. */
  281. function onHttp2SessionGoAway (errorCode) {
  282. // TODO(mcollina): Verify if GOAWAY implements the spec correctly:
  283. // https://datatracker.ietf.org/doc/html/rfc7540#section-6.8
  284. // Specifically, we do not verify the "valid" stream id.
  285. const err = this[kError] || new SocketError(`HTTP/2: "GOAWAY" frame received with code ${errorCode}`, util.getSocketInfo(this[kSocket]))
  286. const client = this[kClient]
  287. client[kSocket] = null
  288. client[kHTTPContext] = null
  289. // this is an HTTP2 session
  290. this.close()
  291. this[kHTTP2Session] = null
  292. util.destroy(this[kSocket], err)
  293. // Fail head of pipeline.
  294. if (client[kRunningIdx] < client[kQueue].length) {
  295. const request = client[kQueue][client[kRunningIdx]]
  296. client[kQueue][client[kRunningIdx]++] = null
  297. util.errorRequest(client, request, err)
  298. client[kPendingIdx] = client[kRunningIdx]
  299. }
  300. assert(client[kRunning] === 0)
  301. client.emit('disconnect', client[kUrl], [client], err)
  302. client.emit('connectionError', client[kUrl], [client], err)
  303. client[kResume]()
  304. }
  305. function onHttp2SessionClose () {
  306. const { [kClient]: client, [kHTTP2SessionState]: state } = this
  307. const { [kSocket]: socket } = client
  308. const err = this[kSocket][kError] || this[kError] || new SocketError('closed', util.getSocketInfo(socket))
  309. client[kSocket] = null
  310. client[kHTTPContext] = null
  311. if (state.ping.interval != null) {
  312. clearInterval(state.ping.interval)
  313. state.ping.interval = null
  314. }
  315. if (client.destroyed) {
  316. assert(client[kPending] === 0)
  317. // Fail entire queue.
  318. const requests = client[kQueue].splice(client[kRunningIdx])
  319. for (let i = 0; i < requests.length; i++) {
  320. const request = requests[i]
  321. util.errorRequest(client, request, err)
  322. }
  323. }
  324. }
  325. function onHttp2SocketClose () {
  326. const err = this[kError] || new SocketError('closed', util.getSocketInfo(this))
  327. const client = this[kHTTP2Session][kClient]
  328. client[kSocket] = null
  329. client[kHTTPContext] = null
  330. if (this[kHTTP2Session] !== null) {
  331. this[kHTTP2Session].destroy(err)
  332. }
  333. client[kPendingIdx] = client[kRunningIdx]
  334. assert(client[kRunning] === 0)
  335. client.emit('disconnect', client[kUrl], [client], err)
  336. client[kResume]()
  337. }
  338. function onHttp2SocketError (err) {
  339. assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')
  340. this[kError] = err
  341. this[kClient][kOnError](err)
  342. }
  343. function onHttp2SocketEnd () {
  344. util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this)))
  345. }
  346. function onSocketClose () {
  347. this[kClosed] = true
  348. }
  349. // https://www.rfc-editor.org/rfc/rfc7230#section-3.3.2
  350. function shouldSendContentLength (method) {
  351. return method !== 'GET' && method !== 'HEAD' && method !== 'OPTIONS' && method !== 'TRACE' && method !== 'CONNECT'
  352. }
  353. function writeH2 (client, request) {
  354. const requestTimeout = request.bodyTimeout ?? client[kBodyTimeout]
  355. const session = client[kHTTP2Session]
  356. const { method, path, host, upgrade, expectContinue, signal, protocol, headers: reqHeaders } = request
  357. let { body } = request
  358. if (upgrade != null && upgrade !== 'websocket') {
  359. util.errorRequest(client, request, new InvalidArgumentError(`Custom upgrade "${upgrade}" not supported over HTTP/2`))
  360. return false
  361. }
  362. const headers = {}
  363. for (let n = 0; n < reqHeaders.length; n += 2) {
  364. const key = reqHeaders[n + 0]
  365. const val = reqHeaders[n + 1]
  366. if (key === 'cookie') {
  367. if (headers[key] != null) {
  368. headers[key] = Array.isArray(headers[key]) ? (headers[key].push(val), headers[key]) : [headers[key], val]
  369. } else {
  370. headers[key] = val
  371. }
  372. continue
  373. }
  374. if (Array.isArray(val)) {
  375. for (let i = 0; i < val.length; i++) {
  376. if (headers[key]) {
  377. headers[key] += `, ${val[i]}`
  378. } else {
  379. headers[key] = val[i]
  380. }
  381. }
  382. } else if (headers[key]) {
  383. headers[key] += `, ${val}`
  384. } else {
  385. headers[key] = val
  386. }
  387. }
  388. /** @type {import('node:http2').ClientHttp2Stream} */
  389. let stream = null
  390. const { hostname, port } = client[kUrl]
  391. headers[HTTP2_HEADER_AUTHORITY] = host || `${hostname}${port ? `:${port}` : ''}`
  392. headers[HTTP2_HEADER_METHOD] = method
  393. const abort = (err) => {
  394. if (request.aborted || request.completed) {
  395. return
  396. }
  397. err = err || new RequestAbortedError()
  398. util.errorRequest(client, request, err)
  399. if (stream != null) {
  400. // Some chunks might still come after abort,
  401. // let's ignore them
  402. stream.removeAllListeners('data')
  403. // On Abort, we close the stream to send RST_STREAM frame
  404. stream.close()
  405. // We move the running index to the next request
  406. client[kOnError](err)
  407. client[kResume]()
  408. }
  409. // We do not destroy the socket as we can continue using the session
  410. // the stream gets destroyed and the session remains to create new streams
  411. util.destroy(body, err)
  412. }
  413. try {
  414. // We are already connected, streams are pending.
  415. // We can call on connect, and wait for abort
  416. request.onConnect(abort)
  417. } catch (err) {
  418. util.errorRequest(client, request, err)
  419. }
  420. if (request.aborted) {
  421. return false
  422. }
  423. if (upgrade || method === 'CONNECT') {
  424. session.ref()
  425. if (upgrade === 'websocket') {
  426. // We cannot upgrade to websocket if extended CONNECT protocol is not supported
  427. if (session[kEnableConnectProtocol] === false) {
  428. util.errorRequest(client, request, new InformationalError('HTTP/2: Extended CONNECT protocol not supported by server'))
  429. session.unref()
  430. return false
  431. }
  432. // We force the method to CONNECT
  433. // as per RFC-8441
  434. // https://datatracker.ietf.org/doc/html/rfc8441#section-4
  435. headers[HTTP2_HEADER_METHOD] = 'CONNECT'
  436. headers[HTTP2_HEADER_PROTOCOL] = 'websocket'
  437. // :path and :scheme headers must be omitted when sending CONNECT but set if extended-CONNECT
  438. headers[HTTP2_HEADER_PATH] = path
  439. if (protocol === 'ws:' || protocol === 'wss:') {
  440. headers[HTTP2_HEADER_SCHEME] = protocol === 'ws:' ? 'http' : 'https'
  441. } else {
  442. headers[HTTP2_HEADER_SCHEME] = protocol === 'http:' ? 'http' : 'https'
  443. }
  444. stream = session.request(headers, { endStream: false, signal })
  445. stream[kHTTP2Stream] = true
  446. stream.once('response', (headers, _flags) => {
  447. const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers
  448. request.onUpgrade(statusCode, parseH2Headers(realHeaders), stream)
  449. ++session[kOpenStreams]
  450. client[kQueue][client[kRunningIdx]++] = null
  451. })
  452. stream.on('error', () => {
  453. if (stream.rstCode === NGHTTP2_REFUSED_STREAM || stream.rstCode === NGHTTP2_CANCEL) {
  454. // NGHTTP2_REFUSED_STREAM (7) or NGHTTP2_CANCEL (8)
  455. // We do not treat those as errors as the server might
  456. // not support websockets and refuse the stream
  457. abort(new InformationalError(`HTTP/2: "stream error" received - code ${stream.rstCode}`))
  458. }
  459. })
  460. stream.once('close', () => {
  461. session[kOpenStreams] -= 1
  462. if (session[kOpenStreams] === 0) session.unref()
  463. })
  464. stream.setTimeout(requestTimeout)
  465. return true
  466. }
  467. // TODO: consolidate once we support CONNECT properly
  468. // NOTE: We are already connected, streams are pending, first request
  469. // will create a new stream. We trigger a request to create the stream and wait until
  470. // `ready` event is triggered
  471. // We disabled endStream to allow the user to write to the stream
  472. stream = session.request(headers, { endStream: false, signal })
  473. stream[kHTTP2Stream] = true
  474. stream.on('response', headers => {
  475. const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers
  476. request.onUpgrade(statusCode, parseH2Headers(realHeaders), stream)
  477. ++session[kOpenStreams]
  478. client[kQueue][client[kRunningIdx]++] = null
  479. })
  480. stream.once('close', () => {
  481. session[kOpenStreams] -= 1
  482. if (session[kOpenStreams] === 0) session.unref()
  483. })
  484. stream.setTimeout(requestTimeout)
  485. return true
  486. }
  487. // https://tools.ietf.org/html/rfc7540#section-8.3
  488. // :path and :scheme headers must be omitted when sending CONNECT
  489. headers[HTTP2_HEADER_PATH] = path
  490. headers[HTTP2_HEADER_SCHEME] = protocol === 'http:' ? 'http' : 'https'
  491. // https://tools.ietf.org/html/rfc7231#section-4.3.1
  492. // https://tools.ietf.org/html/rfc7231#section-4.3.2
  493. // https://tools.ietf.org/html/rfc7231#section-4.3.5
  494. // Sending a payload body on a request that does not
  495. // expect it can cause undefined behavior on some
  496. // servers and corrupt connection state. Do not
  497. // re-use the connection for further requests.
  498. const expectsPayload = (
  499. method === 'PUT' ||
  500. method === 'POST' ||
  501. method === 'PATCH'
  502. )
  503. if (body && typeof body.read === 'function') {
  504. // Try to read EOF in order to get length.
  505. body.read(0)
  506. }
  507. let contentLength = util.bodyLength(body)
  508. if (util.isFormDataLike(body)) {
  509. extractBody ??= require('../web/fetch/body.js').extractBody
  510. const [bodyStream, contentType] = extractBody(body)
  511. headers['content-type'] = contentType
  512. body = bodyStream.stream
  513. contentLength = bodyStream.length
  514. }
  515. if (contentLength == null) {
  516. contentLength = request.contentLength
  517. }
  518. if (!expectsPayload) {
  519. // https://tools.ietf.org/html/rfc7230#section-3.3.2
  520. // A user agent SHOULD NOT send a Content-Length header field when
  521. // the request message does not contain a payload body and the method
  522. // semantics do not anticipate such a body.
  523. // And for methods that don't expect a payload, omit Content-Length.
  524. contentLength = null
  525. }
  526. // https://github.com/nodejs/undici/issues/2046
  527. // A user agent may send a Content-Length header with 0 value, this should be allowed.
  528. if (shouldSendContentLength(method) && contentLength > 0 && request.contentLength != null && request.contentLength !== contentLength) {
  529. if (client[kStrictContentLength]) {
  530. util.errorRequest(client, request, new RequestContentLengthMismatchError())
  531. return false
  532. }
  533. process.emitWarning(new RequestContentLengthMismatchError())
  534. }
  535. if (contentLength != null) {
  536. assert(body || contentLength === 0, 'no body must not have content length')
  537. headers[HTTP2_HEADER_CONTENT_LENGTH] = `${contentLength}`
  538. }
  539. session.ref()
  540. if (channels.sendHeaders.hasSubscribers) {
  541. let header = ''
  542. for (const key in headers) {
  543. header += `${key}: ${headers[key]}\r\n`
  544. }
  545. channels.sendHeaders.publish({ request, headers: header, socket: session[kSocket] })
  546. }
  547. // TODO(metcoder95): add support for sending trailers
  548. const shouldEndStream = method === 'GET' || method === 'HEAD' || body === null
  549. if (expectContinue) {
  550. headers[HTTP2_HEADER_EXPECT] = '100-continue'
  551. stream = session.request(headers, { endStream: shouldEndStream, signal })
  552. stream[kHTTP2Stream] = true
  553. stream.once('continue', writeBodyH2)
  554. } else {
  555. stream = session.request(headers, {
  556. endStream: shouldEndStream,
  557. signal
  558. })
  559. stream[kHTTP2Stream] = true
  560. writeBodyH2()
  561. }
  562. // Increment counter as we have new streams open
  563. ++session[kOpenStreams]
  564. stream.setTimeout(requestTimeout)
  565. // Track whether we received a response (headers)
  566. let responseReceived = false
  567. stream.once('response', headers => {
  568. const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers
  569. request.onResponseStarted()
  570. responseReceived = true
  571. // Due to the stream nature, it is possible we face a race condition
  572. // where the stream has been assigned, but the request has been aborted
  573. // the request remains in-flight and headers hasn't been received yet
  574. // for those scenarios, best effort is to destroy the stream immediately
  575. // as there's no value to keep it open.
  576. if (request.aborted) {
  577. stream.removeAllListeners('data')
  578. return
  579. }
  580. if (request.onHeaders(Number(statusCode), parseH2Headers(realHeaders), stream.resume.bind(stream), '') === false) {
  581. stream.pause()
  582. }
  583. })
  584. stream.on('data', (chunk) => {
  585. if (request.onData(chunk) === false) {
  586. stream.pause()
  587. }
  588. })
  589. stream.once('end', () => {
  590. stream.removeAllListeners('data')
  591. // If we received a response, this is a normal completion
  592. if (responseReceived) {
  593. if (!request.aborted && !request.completed) {
  594. request.onComplete({})
  595. }
  596. client[kQueue][client[kRunningIdx]++] = null
  597. client[kResume]()
  598. } else {
  599. // Stream ended without receiving a response - this is an error
  600. // (e.g., server destroyed the stream before sending headers)
  601. abort(new InformationalError('HTTP/2: stream half-closed (remote)'))
  602. client[kQueue][client[kRunningIdx]++] = null
  603. client[kPendingIdx] = client[kRunningIdx]
  604. client[kResume]()
  605. }
  606. })
  607. stream.once('close', () => {
  608. stream.removeAllListeners('data')
  609. session[kOpenStreams] -= 1
  610. if (session[kOpenStreams] === 0) {
  611. session.unref()
  612. }
  613. })
  614. stream.once('error', function (err) {
  615. stream.removeAllListeners('data')
  616. abort(err)
  617. })
  618. stream.once('frameError', (type, code) => {
  619. stream.removeAllListeners('data')
  620. abort(new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`))
  621. })
  622. stream.on('aborted', () => {
  623. stream.removeAllListeners('data')
  624. })
  625. stream.on('timeout', () => {
  626. const err = new InformationalError(`HTTP/2: "stream timeout after ${requestTimeout}"`)
  627. stream.removeAllListeners('data')
  628. session[kOpenStreams] -= 1
  629. if (session[kOpenStreams] === 0) {
  630. session.unref()
  631. }
  632. abort(err)
  633. })
  634. stream.once('trailers', trailers => {
  635. if (request.aborted || request.completed) {
  636. return
  637. }
  638. request.onComplete(trailers)
  639. })
  640. return true
  641. function writeBodyH2 () {
  642. if (!body || contentLength === 0) {
  643. writeBuffer(
  644. abort,
  645. stream,
  646. null,
  647. client,
  648. request,
  649. client[kSocket],
  650. contentLength,
  651. expectsPayload
  652. )
  653. } else if (util.isBuffer(body)) {
  654. writeBuffer(
  655. abort,
  656. stream,
  657. body,
  658. client,
  659. request,
  660. client[kSocket],
  661. contentLength,
  662. expectsPayload
  663. )
  664. } else if (util.isBlobLike(body)) {
  665. if (typeof body.stream === 'function') {
  666. writeIterable(
  667. abort,
  668. stream,
  669. body.stream(),
  670. client,
  671. request,
  672. client[kSocket],
  673. contentLength,
  674. expectsPayload
  675. )
  676. } else {
  677. writeBlob(
  678. abort,
  679. stream,
  680. body,
  681. client,
  682. request,
  683. client[kSocket],
  684. contentLength,
  685. expectsPayload
  686. )
  687. }
  688. } else if (util.isStream(body)) {
  689. writeStream(
  690. abort,
  691. client[kSocket],
  692. expectsPayload,
  693. stream,
  694. body,
  695. client,
  696. request,
  697. contentLength
  698. )
  699. } else if (util.isIterable(body)) {
  700. writeIterable(
  701. abort,
  702. stream,
  703. body,
  704. client,
  705. request,
  706. client[kSocket],
  707. contentLength,
  708. expectsPayload
  709. )
  710. } else {
  711. assert(false)
  712. }
  713. }
  714. }
  715. function writeBuffer (abort, h2stream, body, client, request, socket, contentLength, expectsPayload) {
  716. try {
  717. if (body != null && util.isBuffer(body)) {
  718. assert(contentLength === body.byteLength, 'buffer body must have content length')
  719. h2stream.cork()
  720. h2stream.write(body)
  721. h2stream.uncork()
  722. h2stream.end()
  723. request.onBodySent(body)
  724. }
  725. if (!expectsPayload) {
  726. socket[kReset] = true
  727. }
  728. request.onRequestSent()
  729. client[kResume]()
  730. } catch (error) {
  731. abort(error)
  732. }
  733. }
  734. function writeStream (abort, socket, expectsPayload, h2stream, body, client, request, contentLength) {
  735. assert(contentLength !== 0 || client[kRunning] === 0, 'stream body cannot be pipelined')
  736. // For HTTP/2, is enough to pipe the stream
  737. const pipe = pipeline(
  738. body,
  739. h2stream,
  740. (err) => {
  741. if (err) {
  742. util.destroy(pipe, err)
  743. abort(err)
  744. } else {
  745. util.removeAllListeners(pipe)
  746. request.onRequestSent()
  747. if (!expectsPayload) {
  748. socket[kReset] = true
  749. }
  750. client[kResume]()
  751. }
  752. }
  753. )
  754. util.addListener(pipe, 'data', onPipeData)
  755. function onPipeData (chunk) {
  756. request.onBodySent(chunk)
  757. }
  758. }
  759. async function writeBlob (abort, h2stream, body, client, request, socket, contentLength, expectsPayload) {
  760. assert(contentLength === body.size, 'blob body must have content length')
  761. try {
  762. if (contentLength != null && contentLength !== body.size) {
  763. throw new RequestContentLengthMismatchError()
  764. }
  765. const buffer = Buffer.from(await body.arrayBuffer())
  766. h2stream.cork()
  767. h2stream.write(buffer)
  768. h2stream.uncork()
  769. h2stream.end()
  770. request.onBodySent(buffer)
  771. request.onRequestSent()
  772. if (!expectsPayload) {
  773. socket[kReset] = true
  774. }
  775. client[kResume]()
  776. } catch (err) {
  777. abort(err)
  778. }
  779. }
  780. async function writeIterable (abort, h2stream, body, client, request, socket, contentLength, expectsPayload) {
  781. assert(contentLength !== 0 || client[kRunning] === 0, 'iterator body cannot be pipelined')
  782. let callback = null
  783. function onDrain () {
  784. if (callback) {
  785. const cb = callback
  786. callback = null
  787. cb()
  788. }
  789. }
  790. const waitForDrain = () => new Promise((resolve, reject) => {
  791. assert(callback === null)
  792. if (socket[kError]) {
  793. reject(socket[kError])
  794. } else {
  795. callback = resolve
  796. }
  797. })
  798. h2stream
  799. .on('close', onDrain)
  800. .on('drain', onDrain)
  801. try {
  802. // It's up to the user to somehow abort the async iterable.
  803. for await (const chunk of body) {
  804. if (socket[kError]) {
  805. throw socket[kError]
  806. }
  807. const res = h2stream.write(chunk)
  808. request.onBodySent(chunk)
  809. if (!res) {
  810. await waitForDrain()
  811. }
  812. }
  813. h2stream.end()
  814. request.onRequestSent()
  815. if (!expectsPayload) {
  816. socket[kReset] = true
  817. }
  818. client[kResume]()
  819. } catch (err) {
  820. abort(err)
  821. } finally {
  822. h2stream
  823. .off('close', onDrain)
  824. .off('drain', onDrain)
  825. }
  826. }
  827. module.exports = connectH2