// round-robin-pool.js (3.6 KB)

  'use strict'

  // Pool plumbing (base class and the symbol-keyed hooks it exposes).
  const {
    PoolBase,
    kClients,
    kNeedDrain,
    kAddClient,
    kGetDispatcher,
    kRemoveClient
  } = require('./pool-base')
  const Client = require('./client')
  const {
    InvalidArgumentError
  } = require('../core/errors')
  const util = require('../core/util')
  const { kUrl } = require('../core/symbols')
  const buildConnector = require('../core/connect')

  // Module-private symbol keys under which RoundRobinPool keeps its per-instance state.
  const kOptions = Symbol('options') // options object handed to the client factory
  const kConnections = Symbol('connections') // client limit, or null when unlimited
  const kFactory = Symbol('factory') // function that builds a new client
  const kIndex = Symbol('index') // rotation cursor: index of the client visited last
  /**
   * Default client factory: builds one `Client` per call.
   *
   * @param {*} origin - Forwarded unchanged as the first `Client` constructor argument.
   * @param {object} opts - Forwarded unchanged as the second `Client` constructor argument.
   * @returns {Client} A newly constructed client.
   */
  function defaultFactory (origin, opts) {
    return new Client(origin, opts)
  }
  /**
   * Pool that hands out its clients in rotating (round-robin) order.
   *
   * Each dispatcher lookup advances a cursor through the client list and returns
   * the first client that is neither past its TTL nor waiting for drain. When no
   * such client exists, a new one is created as long as the connection limit
   * allows it.
   */
  class RoundRobinPool extends PoolBase {
    /**
     * @param {*} origin - Origin to connect to; normalised with `util.parseOrigin`.
     * @param {object} [opts]
     * @param {number|null} [opts.connections] - Maximum number of clients. `null`,
     *   `undefined` and `0` all mean "no limit".
     * @param {Function} [opts.factory] - Called as `factory(origin, options)` to build
     *   each client. Defaults to `defaultFactory`.
     * @param {Function|object} [opts.connect] - A connector function used as-is, or an
     *   object whose properties override the connector options assembled below.
     * @param {number} [opts.connectTimeout] - Passed to `buildConnector` as `timeout`.
     * @param {object} [opts.tls] - Spread into the `buildConnector` options.
     * @param {number} [opts.maxCachedSessions] - Passed to `buildConnector`.
     * @param {string} [opts.socketPath] - Passed to `buildConnector`.
     * @param {boolean} [opts.autoSelectFamily] - Passed to `buildConnector` (together
     *   with `autoSelectFamilyAttemptTimeout`) only when it is a boolean.
     * @param {number} [opts.autoSelectFamilyAttemptTimeout] - See `autoSelectFamily`.
     * @param {boolean} [opts.allowH2] - Passed to `buildConnector` and to the factory.
     * @param {number} [opts.clientTtl] - When greater than zero, a client whose `ttl`
     *   stamp is more than this many milliseconds old is removed instead of reused.
     * @throws {InvalidArgumentError} If `connections`, `factory` or `connect` is invalid.
     */
    constructor (origin, {
      connections,
      factory = defaultFactory,
      connect,
      connectTimeout,
      tls,
      maxCachedSessions,
      socketPath,
      autoSelectFamily,
      autoSelectFamilyAttemptTimeout,
      allowH2,
      clientTtl,
      ...options
    } = {}) {
      if (connections != null && (!Number.isFinite(connections) || connections < 0)) {
        throw new InvalidArgumentError('invalid connections')
      }

      if (typeof factory !== 'function') {
        throw new InvalidArgumentError('factory must be a function.')
      }

      if (connect != null && typeof connect !== 'function' && typeof connect !== 'object') {
        throw new InvalidArgumentError('connect must be a function or an object')
      }

      // Anything other than a ready-made connector function is turned into one.
      // `...connect` comes last so an options object overrides the individual settings.
      if (typeof connect !== 'function') {
        connect = buildConnector({
          ...tls,
          maxCachedSessions,
          allowH2,
          socketPath,
          timeout: connectTimeout,
          ...(typeof autoSelectFamily === 'boolean' ? { autoSelectFamily, autoSelectFamilyAttemptTimeout } : undefined),
          ...connect
        })
      }

      super()

      this[kConnections] = connections || null
      this[kUrl] = util.parseOrigin(origin)
      this[kOptions] = { ...util.deepClone(options), connect, allowH2, clientTtl }
      // Replace the deep-cloned interceptors with a shallow copy of the caller's object.
      this[kOptions].interceptors = options.interceptors
        ? { ...options.interceptors }
        : undefined
      this[kFactory] = factory
      // -1 so that the first lookup lands on index 0.
      this[kIndex] = -1

      // Stamp every target of a 'connect' event with the current time; the stamp is
      // what the TTL check in kGetDispatcher compares against.
      this.on('connect', (origin, targets) => {
        if (clientTtl != null && clientTtl > 0) {
          for (const target of targets) {
            Object.assign(target, { ttl: Date.now() })
          }
        }
      })

      // Drop every target of a 'connectionError' event from the client list. The
      // entry is spliced out directly; kRemoveClient is not involved.
      this.on('connectionError', (origin, targets, error) => {
        for (const target of targets) {
          const idx = this[kClients].indexOf(target)
          if (idx !== -1) {
            this[kClients].splice(idx, 1)
          }
        }
      })
    }

    /**
     * Picks the client for the next request.
     *
     * @returns {object|undefined} The next usable client in rotation, a newly created
     *   client when none is usable and the limit permits one, or `undefined` when
     *   every client is busy and the pool is at its limit.
     */
    [kGetDispatcher] () {
      const clientTtl = this[kOptions].clientTtl
      const ttlEnabled = clientTtl != null && clientTtl > 0

      // Visit each client that exists right now at most once. The list length is
      // re-read on every step rather than cached: kRemoveClient is implemented in
      // PoolBase and may drop the client from the list synchronously, in which case
      // a cached length would index past the end of the list.
      let remaining = this[kClients].length
      while (remaining > 0 && this[kClients].length > 0) {
        remaining--
        this[kIndex] = (this[kIndex] + 1) % this[kClients].length
        const client = this[kClients][this[kIndex]]

        // Stale client (TTL expired): remove it and move on to the next one.
        if (ttlEnabled && client.ttl && (Date.now() - client.ttl) > clientTtl) {
          this[kRemoveClient](client)
          // If the removal already took effect, the following client slid into this
          // slot; step the cursor back so that client is not skipped.
          if (this[kClients][this[kIndex]] !== client) {
            this[kIndex]--
          }
          continue
        }

        // Return client if it's not draining
        if (!client[kNeedDrain]) {
          return client
        }
      }

      // Nothing usable (or no clients yet): create a client if the limit allows it.
      // The live length is used so that slots freed above count as available.
      if (!this[kConnections] || this[kClients].length < this[kConnections]) {
        const dispatcher = this[kFactory](this[kUrl], this[kOptions])
        this[kAddClient](dispatcher)
        return dispatcher
      }
    }
  }
  // The class is the module's sole export.
  module.exports = RoundRobinPool