agent.js 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. 'use strict'
  2. const { InvalidArgumentError, MaxOriginsReachedError } = require('../core/errors')
  3. const { kClients, kRunning, kClose, kDestroy, kDispatch, kUrl } = require('../core/symbols')
  4. const DispatcherBase = require('./dispatcher-base')
  5. const Pool = require('./pool')
  6. const Client = require('./client')
  7. const util = require('../core/util')
  // Module-private symbol keys under which Agent keeps its internal state.
  // kOnConnect / kOnDisconnect / kOnConnectionError / kOnDrain each hold a
  // callback (created in the Agent constructor) that re-emits the matching
  // event on the agent with the agent prepended to the targets list.
  8. const kOnConnect = Symbol('onConnect')
  9. const kOnDisconnect = Symbol('onDisconnect')
  10. const kOnConnectionError = Symbol('onConnectionError')
  11. const kOnDrain = Symbol('onDrain')
  // Holds the factory function used to build the dispatcher for a new origin.
  12. const kFactory = Symbol('factory')
  // Holds the options object (including maxOrigins and connect) handed to the factory.
  13. const kOptions = Symbol('options')
  // Holds the Set of origin keys that is checked against maxOrigins on dispatch.
  14. const kOrigins = Symbol('origins')
  /**
   * Default dispatcher factory used by Agent when no custom factory is given.
   *
   * Builds a single-connection `Client` when the options explicitly request
   * exactly one connection, and a `Pool` in every other case (including when
   * no options are supplied).
   *
   * @param {string|URL} origin Origin the dispatcher will talk to.
   * @param {object} [opts] Options forwarded unchanged to the constructor.
   * @returns {Client|Pool} The newly created dispatcher.
   */
  function defaultFactory (origin, opts) {
    const wantsSingleConnection = Boolean(opts) && opts.connections === 1

    if (wantsSingleConnection) {
      return new Client(origin, opts)
    }

    return new Pool(origin, opts)
  }
  /**
   * Dispatcher that routes every request to a per-origin dispatcher.
   *
   * Per-origin dispatchers are created lazily through a factory the first time
   * an origin is dispatched to, cached in a Map keyed by the stringified
   * origin, and closed again once they have no open connections left. The
   * number of distinct origins tracked at once is bounded by `maxOrigins`.
   */
  class Agent extends DispatcherBase {
    /**
     * @param {object} [options]
     * @param {Function} [options.factory=defaultFactory] Called as
     *   `factory(origin, options)` to build the dispatcher for a new origin.
     * @param {number} [options.maxOrigins=Infinity] Maximum number of distinct
     *   origins tracked at the same time; must be a number greater than 0.
     * @param {Function|object} [options.connect] Connector function or
     *   connector options; forwarded to the factory as part of the options.
     * @throws {InvalidArgumentError} If `factory` is not a function, `connect`
     *   is neither a function nor an object, or `maxOrigins` is not a number
     *   greater than 0.
     */
    constructor ({ factory = defaultFactory, maxOrigins = Infinity, connect, ...options } = {}) {
      if (typeof factory !== 'function') {
        throw new InvalidArgumentError('factory must be a function.')
      }

      if (connect != null && typeof connect !== 'function' && typeof connect !== 'object') {
        throw new InvalidArgumentError('connect must be a function or an object')
      }

      if (typeof maxOrigins !== 'number' || Number.isNaN(maxOrigins) || maxOrigins <= 0) {
        throw new InvalidArgumentError('maxOrigins must be a number greater than 0')
      }

      super()

      // Shallow-copy connector options so later mutation of the caller's
      // object does not leak into this agent.
      if (connect && typeof connect !== 'function') {
        connect = { ...connect }
      }

      this[kOptions] = { ...util.deepClone(options), maxOrigins, connect }
      this[kFactory] = factory
      // origin key -> { count: number of currently connected sockets, dispatcher }
      this[kClients] = new Map()
      // origin keys that currently count towards maxOrigins
      this[kOrigins] = new Set()

      // Event forwarders: re-emit on the agent with the agent itself prepended
      // to the list of targets.
      this[kOnDrain] = (origin, targets) => {
        this.emit('drain', origin, [this, ...targets])
      }

      this[kOnConnect] = (origin, targets) => {
        this.emit('connect', origin, [this, ...targets])
      }

      this[kOnDisconnect] = (origin, targets, err) => {
        this.emit('disconnect', origin, [this, ...targets], err)
      }

      this[kOnConnectionError] = (origin, targets, err) => {
        this.emit('connectionError', origin, [this, ...targets], err)
      }
    }

    /**
     * Sum of the `kRunning` values of all cached per-origin dispatchers.
     *
     * @returns {number}
     */
    get [kRunning] () {
      let ret = 0
      for (const { dispatcher } of this[kClients].values()) {
        ret += dispatcher[kRunning]
      }
      return ret
    }

    /**
     * Dispatches a request through the dispatcher for `opts.origin`, creating
     * and caching that dispatcher first when none exists yet.
     *
     * @param {object} opts Dispatch options; `opts.origin` must be a non-empty
     *   string or a URL.
     * @param {object} handler Handler passed through to the per-origin dispatcher.
     * @returns {*} Whatever the per-origin dispatcher's `dispatch` returns.
     * @throws {InvalidArgumentError} If `opts.origin` is missing or of the wrong type.
     * @throws {MaxOriginsReachedError} If the origin is new and `maxOrigins`
     *   origins are already tracked.
     */
    [kDispatch] (opts, handler) {
      let key
      if (opts.origin && (typeof opts.origin === 'string' || opts.origin instanceof URL)) {
        key = String(opts.origin)
      } else {
        throw new InvalidArgumentError('opts.origin must be a non-empty string or URL.')
      }

      if (this[kOrigins].size >= this[kOptions].maxOrigins && !this[kOrigins].has(key)) {
        throw new MaxOriginsReachedError()
      }

      const result = this[kClients].get(key)
      let dispatcher = result && result.dispatcher

      if (!dispatcher) {
        // Called when a connection of this origin's dispatcher goes away
        // (`connected === true`) or fails to be established
        // (`connected === false`). Evicts and closes the dispatcher once it
        // has no open connections left.
        const closeClientIfUnused = (connected) => {
          const result = this[kClients].get(key)
          if (!result) {
            return
          }

          if (connected) result.count -= 1

          if (result.count <= 0) {
            this[kClients].delete(key)
            // Release the origin slot only together with the dispatcher.
            // Dropping it while the dispatcher is still cached would let the
            // agent exceed maxOrigins and could make later dispatches to this
            // still-live origin fail with MaxOriginsReachedError.
            this[kOrigins].delete(key)
            if (!result.dispatcher.destroyed) {
              result.dispatcher.close()
            }
          }
        }

        dispatcher = this[kFactory](opts.origin, this[kOptions])
          .on('drain', this[kOnDrain])
          .on('connect', (origin, targets) => {
            const result = this[kClients].get(key)
            if (result) {
              result.count += 1
            }
            this[kOnConnect](origin, targets)
          })
          .on('disconnect', (origin, targets, err) => {
            closeClientIfUnused(true)
            this[kOnDisconnect](origin, targets, err)
          })
          .on('connectionError', (origin, targets, err) => {
            closeClientIfUnused(false)
            this[kOnConnectionError](origin, targets, err)
          })

        this[kClients].set(key, { count: 0, dispatcher })
        this[kOrigins].add(key)
      }

      return dispatcher.dispatch(opts, handler)
    }

    /**
     * Closes every cached per-origin dispatcher and forgets all of them.
     *
     * @returns {Promise<Array>} Settles once every dispatcher's `close()` has settled.
     */
    [kClose] () {
      const closePromises = []
      for (const { dispatcher } of this[kClients].values()) {
        closePromises.push(dispatcher.close())
      }
      this[kClients].clear()
      // Keep origin bookkeeping in sync with the (now empty) dispatcher cache.
      this[kOrigins].clear()

      return Promise.all(closePromises)
    }

    /**
     * Destroys every cached per-origin dispatcher and forgets all of them.
     *
     * @param {Error} [err] Passed to each dispatcher's `destroy()`.
     * @returns {Promise<Array>} Settles once every dispatcher's `destroy()` has settled.
     */
    [kDestroy] (err) {
      const destroyPromises = []
      for (const { dispatcher } of this[kClients].values()) {
        destroyPromises.push(dispatcher.destroy(err))
      }
      this[kClients].clear()
      // Keep origin bookkeeping in sync with the (now empty) dispatcher cache.
      this[kOrigins].clear()

      return Promise.all(destroyPromises)
    }

    /**
     * Statistics of all cached dispatchers that expose a `stats` property,
     * keyed by the `origin` of each dispatcher's URL.
     *
     * @returns {Object<string, *>}
     */
    get stats () {
      const allClientStats = {}
      for (const { dispatcher } of this[kClients].values()) {
        if (dispatcher.stats) {
          allClientStats[dispatcher[kUrl].origin] = dispatcher.stats
        }
      }
      return allClientStats
    }
  }
  133. module.exports = Agent