client.js 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647
  1. 'use strict'
  2. const assert = require('node:assert')
  3. const net = require('node:net')
  4. const http = require('node:http')
  5. const util = require('../core/util.js')
  6. const { ClientStats } = require('../util/stats.js')
  7. const { channels } = require('../core/diagnostics.js')
  8. const Request = require('../core/request.js')
  9. const DispatcherBase = require('./dispatcher-base')
  10. const {
  11. InvalidArgumentError,
  12. InformationalError,
  13. ClientDestroyedError
  14. } = require('../core/errors.js')
  15. const buildConnector = require('../core/connect.js')
  16. const {
  17. kUrl,
  18. kServerName,
  19. kClient,
  20. kBusy,
  21. kConnect,
  22. kResuming,
  23. kRunning,
  24. kPending,
  25. kSize,
  26. kQueue,
  27. kConnected,
  28. kConnecting,
  29. kNeedDrain,
  30. kKeepAliveDefaultTimeout,
  31. kHostHeader,
  32. kPendingIdx,
  33. kRunningIdx,
  34. kError,
  35. kPipelining,
  36. kKeepAliveTimeoutValue,
  37. kMaxHeadersSize,
  38. kKeepAliveMaxTimeout,
  39. kKeepAliveTimeoutThreshold,
  40. kHeadersTimeout,
  41. kBodyTimeout,
  42. kStrictContentLength,
  43. kConnector,
  44. kMaxRequests,
  45. kCounter,
  46. kClose,
  47. kDestroy,
  48. kDispatch,
  49. kLocalAddress,
  50. kMaxResponseSize,
  51. kOnError,
  52. kHTTPContext,
  53. kMaxConcurrentStreams,
  54. kHTTP2InitialWindowSize,
  55. kHTTP2ConnectionWindowSize,
  56. kResume,
  57. kPingInterval
  58. } = require('../core/symbols.js')
  59. const connectH1 = require('./client-h1.js')
  60. const connectH2 = require('./client-h2.js')
  61. const kClosedResolve = Symbol('kClosedResolve')
  62. const getDefaultNodeMaxHeaderSize = http &&
  63. http.maxHeaderSize &&
  64. Number.isInteger(http.maxHeaderSize) &&
  65. http.maxHeaderSize > 0
  66. ? () => http.maxHeaderSize
  67. : () => { throw new InvalidArgumentError('http module not available or http.maxHeaderSize invalid') }
  68. const noop = () => {}
  69. function getPipelining (client) {
  70. return client[kPipelining] ?? client[kHTTPContext]?.defaultPipelining ?? 1
  71. }
  72. /**
  73. * @type {import('../../types/client.js').default}
  74. */
  75. class Client extends DispatcherBase {
  76. /**
  77. *
  78. * @param {string|URL} url
  79. * @param {import('../../types/client.js').Client.Options} options
  80. */
  81. constructor (url, {
  82. maxHeaderSize,
  83. headersTimeout,
  84. socketTimeout,
  85. requestTimeout,
  86. connectTimeout,
  87. bodyTimeout,
  88. idleTimeout,
  89. keepAlive,
  90. keepAliveTimeout,
  91. maxKeepAliveTimeout,
  92. keepAliveMaxTimeout,
  93. keepAliveTimeoutThreshold,
  94. socketPath,
  95. pipelining,
  96. tls,
  97. strictContentLength,
  98. maxCachedSessions,
  99. connect,
  100. maxRequestsPerClient,
  101. localAddress,
  102. maxResponseSize,
  103. autoSelectFamily,
  104. autoSelectFamilyAttemptTimeout,
  105. // h2
  106. maxConcurrentStreams,
  107. allowH2,
  108. useH2c,
  109. initialWindowSize,
  110. connectionWindowSize,
  111. pingInterval
  112. } = {}) {
  113. if (keepAlive !== undefined) {
  114. throw new InvalidArgumentError('unsupported keepAlive, use pipelining=0 instead')
  115. }
  116. if (socketTimeout !== undefined) {
  117. throw new InvalidArgumentError('unsupported socketTimeout, use headersTimeout & bodyTimeout instead')
  118. }
  119. if (requestTimeout !== undefined) {
  120. throw new InvalidArgumentError('unsupported requestTimeout, use headersTimeout & bodyTimeout instead')
  121. }
  122. if (idleTimeout !== undefined) {
  123. throw new InvalidArgumentError('unsupported idleTimeout, use keepAliveTimeout instead')
  124. }
  125. if (maxKeepAliveTimeout !== undefined) {
  126. throw new InvalidArgumentError('unsupported maxKeepAliveTimeout, use keepAliveMaxTimeout instead')
  127. }
  128. if (maxHeaderSize != null) {
  129. if (!Number.isInteger(maxHeaderSize) || maxHeaderSize < 1) {
  130. throw new InvalidArgumentError('invalid maxHeaderSize')
  131. }
  132. } else {
  133. // If maxHeaderSize is not provided, use the default value from the http module
  134. // or if that is not available, throw an error.
  135. maxHeaderSize = getDefaultNodeMaxHeaderSize()
  136. }
  137. if (socketPath != null && typeof socketPath !== 'string') {
  138. throw new InvalidArgumentError('invalid socketPath')
  139. }
  140. if (connectTimeout != null && (!Number.isFinite(connectTimeout) || connectTimeout < 0)) {
  141. throw new InvalidArgumentError('invalid connectTimeout')
  142. }
  143. if (keepAliveTimeout != null && (!Number.isFinite(keepAliveTimeout) || keepAliveTimeout <= 0)) {
  144. throw new InvalidArgumentError('invalid keepAliveTimeout')
  145. }
  146. if (keepAliveMaxTimeout != null && (!Number.isFinite(keepAliveMaxTimeout) || keepAliveMaxTimeout <= 0)) {
  147. throw new InvalidArgumentError('invalid keepAliveMaxTimeout')
  148. }
  149. if (keepAliveTimeoutThreshold != null && !Number.isFinite(keepAliveTimeoutThreshold)) {
  150. throw new InvalidArgumentError('invalid keepAliveTimeoutThreshold')
  151. }
  152. if (headersTimeout != null && (!Number.isInteger(headersTimeout) || headersTimeout < 0)) {
  153. throw new InvalidArgumentError('headersTimeout must be a positive integer or zero')
  154. }
  155. if (bodyTimeout != null && (!Number.isInteger(bodyTimeout) || bodyTimeout < 0)) {
  156. throw new InvalidArgumentError('bodyTimeout must be a positive integer or zero')
  157. }
  158. if (connect != null && typeof connect !== 'function' && typeof connect !== 'object') {
  159. throw new InvalidArgumentError('connect must be a function or an object')
  160. }
  161. if (maxRequestsPerClient != null && (!Number.isInteger(maxRequestsPerClient) || maxRequestsPerClient < 0)) {
  162. throw new InvalidArgumentError('maxRequestsPerClient must be a positive number')
  163. }
  164. if (localAddress != null && (typeof localAddress !== 'string' || net.isIP(localAddress) === 0)) {
  165. throw new InvalidArgumentError('localAddress must be valid string IP address')
  166. }
  167. if (maxResponseSize != null && (!Number.isInteger(maxResponseSize) || maxResponseSize < -1)) {
  168. throw new InvalidArgumentError('maxResponseSize must be a positive number')
  169. }
  170. if (
  171. autoSelectFamilyAttemptTimeout != null &&
  172. (!Number.isInteger(autoSelectFamilyAttemptTimeout) || autoSelectFamilyAttemptTimeout < -1)
  173. ) {
  174. throw new InvalidArgumentError('autoSelectFamilyAttemptTimeout must be a positive number')
  175. }
  176. // h2
  177. if (allowH2 != null && typeof allowH2 !== 'boolean') {
  178. throw new InvalidArgumentError('allowH2 must be a valid boolean value')
  179. }
  180. if (maxConcurrentStreams != null && (typeof maxConcurrentStreams !== 'number' || maxConcurrentStreams < 1)) {
  181. throw new InvalidArgumentError('maxConcurrentStreams must be a positive integer, greater than 0')
  182. }
  183. if (useH2c != null && typeof useH2c !== 'boolean') {
  184. throw new InvalidArgumentError('useH2c must be a valid boolean value')
  185. }
  186. if (initialWindowSize != null && (!Number.isInteger(initialWindowSize) || initialWindowSize < 1)) {
  187. throw new InvalidArgumentError('initialWindowSize must be a positive integer, greater than 0')
  188. }
  189. if (connectionWindowSize != null && (!Number.isInteger(connectionWindowSize) || connectionWindowSize < 1)) {
  190. throw new InvalidArgumentError('connectionWindowSize must be a positive integer, greater than 0')
  191. }
  192. if (pingInterval != null && (typeof pingInterval !== 'number' || !Number.isInteger(pingInterval) || pingInterval < 0)) {
  193. throw new InvalidArgumentError('pingInterval must be a positive integer, greater or equal to 0')
  194. }
  195. super()
  196. if (typeof connect !== 'function') {
  197. connect = buildConnector({
  198. ...tls,
  199. maxCachedSessions,
  200. allowH2,
  201. useH2c,
  202. socketPath,
  203. timeout: connectTimeout,
  204. ...(typeof autoSelectFamily === 'boolean' ? { autoSelectFamily, autoSelectFamilyAttemptTimeout } : undefined),
  205. ...connect
  206. })
  207. }
  208. this[kUrl] = util.parseOrigin(url)
  209. this[kConnector] = connect
  210. this[kPipelining] = pipelining != null ? pipelining : 1
  211. this[kMaxHeadersSize] = maxHeaderSize
  212. this[kKeepAliveDefaultTimeout] = keepAliveTimeout == null ? 4e3 : keepAliveTimeout
  213. this[kKeepAliveMaxTimeout] = keepAliveMaxTimeout == null ? 600e3 : keepAliveMaxTimeout
  214. this[kKeepAliveTimeoutThreshold] = keepAliveTimeoutThreshold == null ? 2e3 : keepAliveTimeoutThreshold
  215. this[kKeepAliveTimeoutValue] = this[kKeepAliveDefaultTimeout]
  216. this[kServerName] = null
  217. this[kLocalAddress] = localAddress != null ? localAddress : null
  218. this[kResuming] = 0 // 0, idle, 1, scheduled, 2 resuming
  219. this[kNeedDrain] = 0 // 0, idle, 1, scheduled, 2 resuming
  220. this[kHostHeader] = `host: ${this[kUrl].hostname}${this[kUrl].port ? `:${this[kUrl].port}` : ''}\r\n`
  221. this[kBodyTimeout] = bodyTimeout != null ? bodyTimeout : 300e3
  222. this[kHeadersTimeout] = headersTimeout != null ? headersTimeout : 300e3
  223. this[kStrictContentLength] = strictContentLength == null ? true : strictContentLength
  224. this[kMaxRequests] = maxRequestsPerClient
  225. this[kClosedResolve] = null
  226. this[kMaxResponseSize] = maxResponseSize > -1 ? maxResponseSize : -1
  227. this[kHTTPContext] = null
  228. // h2
  229. this[kMaxConcurrentStreams] = maxConcurrentStreams != null ? maxConcurrentStreams : 100 // Max peerConcurrentStreams for a Node h2 server
  230. // HTTP/2 window sizes are set to higher defaults than Node.js core for better performance:
  231. // - initialWindowSize: 262144 (256KB) vs Node.js default 65535 (64KB - 1)
  232. // Allows more data to be sent before requiring acknowledgment, improving throughput
  233. // especially on high-latency networks. This matches common production HTTP/2 servers.
  234. // - connectionWindowSize: 524288 (512KB) vs Node.js default (none set)
  235. // Provides better flow control for the entire connection across multiple streams.
  236. this[kHTTP2InitialWindowSize] = initialWindowSize != null ? initialWindowSize : 262144
  237. this[kHTTP2ConnectionWindowSize] = connectionWindowSize != null ? connectionWindowSize : 524288
  238. this[kPingInterval] = pingInterval != null ? pingInterval : 60e3 // Default ping interval for h2 - 1 minute
  239. // kQueue is built up of 3 sections separated by
  240. // the kRunningIdx and kPendingIdx indices.
  241. // | complete | running | pending |
  242. // ^ kRunningIdx ^ kPendingIdx ^ kQueue.length
  243. // kRunningIdx points to the first running element.
  244. // kPendingIdx points to the first pending element.
  245. // This implements a fast queue with an amortized
  246. // time of O(1).
  247. this[kQueue] = []
  248. this[kRunningIdx] = 0
  249. this[kPendingIdx] = 0
  250. this[kResume] = (sync) => resume(this, sync)
  251. this[kOnError] = (err) => onError(this, err)
  252. }
  253. get pipelining () {
  254. return this[kPipelining]
  255. }
  256. set pipelining (value) {
  257. this[kPipelining] = value
  258. this[kResume](true)
  259. }
  260. get stats () {
  261. return new ClientStats(this)
  262. }
  263. get [kPending] () {
  264. return this[kQueue].length - this[kPendingIdx]
  265. }
  266. get [kRunning] () {
  267. return this[kPendingIdx] - this[kRunningIdx]
  268. }
  269. get [kSize] () {
  270. return this[kQueue].length - this[kRunningIdx]
  271. }
  272. get [kConnected] () {
  273. return !!this[kHTTPContext] && !this[kConnecting] && !this[kHTTPContext].destroyed
  274. }
  275. get [kBusy] () {
  276. return Boolean(
  277. this[kHTTPContext]?.busy(null) ||
  278. (this[kSize] >= (getPipelining(this) || 1)) ||
  279. this[kPending] > 0
  280. )
  281. }
  282. [kConnect] (cb) {
  283. connect(this)
  284. this.once('connect', cb)
  285. }
  286. [kDispatch] (opts, handler) {
  287. const request = new Request(this[kUrl].origin, opts, handler)
  288. this[kQueue].push(request)
  289. if (this[kResuming]) {
  290. // Do nothing.
  291. } else if (util.bodyLength(request.body) == null && util.isIterable(request.body)) {
  292. // Wait a tick in case stream/iterator is ended in the same tick.
  293. this[kResuming] = 1
  294. queueMicrotask(() => resume(this))
  295. } else {
  296. this[kResume](true)
  297. }
  298. if (this[kResuming] && this[kNeedDrain] !== 2 && this[kBusy]) {
  299. this[kNeedDrain] = 2
  300. }
  301. return this[kNeedDrain] < 2
  302. }
  303. [kClose] () {
  304. // TODO: for H2 we need to gracefully flush the remaining enqueued
  305. // request and close each stream.
  306. return new Promise((resolve) => {
  307. if (this[kSize]) {
  308. this[kClosedResolve] = resolve
  309. } else {
  310. resolve(null)
  311. }
  312. })
  313. }
  314. [kDestroy] (err) {
  315. return new Promise((resolve) => {
  316. const requests = this[kQueue].splice(this[kPendingIdx])
  317. for (let i = 0; i < requests.length; i++) {
  318. const request = requests[i]
  319. util.errorRequest(this, request, err)
  320. }
  321. const callback = () => {
  322. if (this[kClosedResolve]) {
  323. // TODO (fix): Should we error here with ClientDestroyedError?
  324. this[kClosedResolve]()
  325. this[kClosedResolve] = null
  326. }
  327. resolve(null)
  328. }
  329. if (this[kHTTPContext]) {
  330. this[kHTTPContext].destroy(err, callback)
  331. this[kHTTPContext] = null
  332. } else {
  333. queueMicrotask(callback)
  334. }
  335. this[kResume]()
  336. })
  337. }
  338. }
  339. function onError (client, err) {
  340. if (
  341. client[kRunning] === 0 &&
  342. err.code !== 'UND_ERR_INFO' &&
  343. err.code !== 'UND_ERR_SOCKET'
  344. ) {
  345. // Error is not caused by running request and not a recoverable
  346. // socket error.
  347. assert(client[kPendingIdx] === client[kRunningIdx])
  348. const requests = client[kQueue].splice(client[kRunningIdx])
  349. for (let i = 0; i < requests.length; i++) {
  350. const request = requests[i]
  351. util.errorRequest(client, request, err)
  352. }
  353. assert(client[kSize] === 0)
  354. }
  355. }
  356. /**
  357. * @param {Client} client
  358. * @returns {void}
  359. */
  360. function connect (client) {
  361. assert(!client[kConnecting])
  362. assert(!client[kHTTPContext])
  363. let { host, hostname, protocol, port } = client[kUrl]
  364. // Resolve ipv6
  365. if (hostname[0] === '[') {
  366. const idx = hostname.indexOf(']')
  367. assert(idx !== -1)
  368. const ip = hostname.substring(1, idx)
  369. assert(net.isIPv6(ip))
  370. hostname = ip
  371. }
  372. client[kConnecting] = true
  373. if (channels.beforeConnect.hasSubscribers) {
  374. channels.beforeConnect.publish({
  375. connectParams: {
  376. host,
  377. hostname,
  378. protocol,
  379. port,
  380. version: client[kHTTPContext]?.version,
  381. servername: client[kServerName],
  382. localAddress: client[kLocalAddress]
  383. },
  384. connector: client[kConnector]
  385. })
  386. }
  387. client[kConnector]({
  388. host,
  389. hostname,
  390. protocol,
  391. port,
  392. servername: client[kServerName],
  393. localAddress: client[kLocalAddress]
  394. }, (err, socket) => {
  395. if (err) {
  396. handleConnectError(client, err, { host, hostname, protocol, port })
  397. client[kResume]()
  398. return
  399. }
  400. if (client.destroyed) {
  401. util.destroy(socket.on('error', noop), new ClientDestroyedError())
  402. client[kResume]()
  403. return
  404. }
  405. assert(socket)
  406. try {
  407. client[kHTTPContext] = socket.alpnProtocol === 'h2'
  408. ? connectH2(client, socket)
  409. : connectH1(client, socket)
  410. } catch (err) {
  411. socket.destroy().on('error', noop)
  412. handleConnectError(client, err, { host, hostname, protocol, port })
  413. client[kResume]()
  414. return
  415. }
  416. client[kConnecting] = false
  417. socket[kCounter] = 0
  418. socket[kMaxRequests] = client[kMaxRequests]
  419. socket[kClient] = client
  420. socket[kError] = null
  421. if (channels.connected.hasSubscribers) {
  422. channels.connected.publish({
  423. connectParams: {
  424. host,
  425. hostname,
  426. protocol,
  427. port,
  428. version: client[kHTTPContext]?.version,
  429. servername: client[kServerName],
  430. localAddress: client[kLocalAddress]
  431. },
  432. connector: client[kConnector],
  433. socket
  434. })
  435. }
  436. client.emit('connect', client[kUrl], [client])
  437. client[kResume]()
  438. })
  439. }
  440. function handleConnectError (client, err, { host, hostname, protocol, port }) {
  441. if (client.destroyed) {
  442. return
  443. }
  444. client[kConnecting] = false
  445. if (channels.connectError.hasSubscribers) {
  446. channels.connectError.publish({
  447. connectParams: {
  448. host,
  449. hostname,
  450. protocol,
  451. port,
  452. version: client[kHTTPContext]?.version,
  453. servername: client[kServerName],
  454. localAddress: client[kLocalAddress]
  455. },
  456. connector: client[kConnector],
  457. error: err
  458. })
  459. }
  460. if (err.code === 'ERR_TLS_CERT_ALTNAME_INVALID') {
  461. assert(client[kRunning] === 0)
  462. while (client[kPending] > 0 && client[kQueue][client[kPendingIdx]].servername === client[kServerName]) {
  463. const request = client[kQueue][client[kPendingIdx]++]
  464. util.errorRequest(client, request, err)
  465. }
  466. } else {
  467. onError(client, err)
  468. }
  469. client.emit('connectionError', client[kUrl], [client], err)
  470. }
  471. function emitDrain (client) {
  472. client[kNeedDrain] = 0
  473. client.emit('drain', client[kUrl], [client])
  474. }
  475. function resume (client, sync) {
  476. if (client[kResuming] === 2) {
  477. return
  478. }
  479. client[kResuming] = 2
  480. _resume(client, sync)
  481. client[kResuming] = 0
  482. if (client[kRunningIdx] > 256) {
  483. client[kQueue].splice(0, client[kRunningIdx])
  484. client[kPendingIdx] -= client[kRunningIdx]
  485. client[kRunningIdx] = 0
  486. }
  487. }
  488. function _resume (client, sync) {
  489. while (true) {
  490. if (client.destroyed) {
  491. assert(client[kPending] === 0)
  492. return
  493. }
  494. if (client[kClosedResolve] && !client[kSize]) {
  495. client[kClosedResolve]()
  496. client[kClosedResolve] = null
  497. return
  498. }
  499. if (client[kHTTPContext]) {
  500. client[kHTTPContext].resume()
  501. }
  502. if (client[kBusy]) {
  503. client[kNeedDrain] = 2
  504. } else if (client[kNeedDrain] === 2) {
  505. if (sync) {
  506. client[kNeedDrain] = 1
  507. queueMicrotask(() => emitDrain(client))
  508. } else {
  509. emitDrain(client)
  510. }
  511. continue
  512. }
  513. if (client[kPending] === 0) {
  514. return
  515. }
  516. if (client[kRunning] >= (getPipelining(client) || 1)) {
  517. return
  518. }
  519. const request = client[kQueue][client[kPendingIdx]]
  520. if (client[kUrl].protocol === 'https:' && client[kServerName] !== request.servername) {
  521. if (client[kRunning] > 0) {
  522. return
  523. }
  524. client[kServerName] = request.servername
  525. client[kHTTPContext]?.destroy(new InformationalError('servername changed'), () => {
  526. client[kHTTPContext] = null
  527. resume(client)
  528. })
  529. }
  530. if (client[kConnecting]) {
  531. return
  532. }
  533. if (!client[kHTTPContext]) {
  534. connect(client)
  535. return
  536. }
  537. if (client[kHTTPContext].destroyed) {
  538. return
  539. }
  540. if (client[kHTTPContext].busy(request)) {
  541. return
  542. }
  543. if (!request.aborted && client[kHTTPContext].write(request)) {
  544. client[kPendingIdx]++
  545. } else {
  546. client[kQueue].splice(client[kPendingIdx], 1)
  547. }
  548. }
  549. }
  550. module.exports = Client