api-upgrade.js 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. 'use strict'
  2. const { InvalidArgumentError, SocketError } = require('../core/errors')
  3. const { AsyncResource } = require('node:async_hooks')
  4. const assert = require('node:assert')
  5. const util = require('../core/util')
  6. const { kHTTP2Stream } = require('../core/symbols')
  7. const { addSignal, removeSignal } = require('./abort-signal')
  8. class UpgradeHandler extends AsyncResource {
  9. constructor (opts, callback) {
  10. if (!opts || typeof opts !== 'object') {
  11. throw new InvalidArgumentError('invalid opts')
  12. }
  13. if (typeof callback !== 'function') {
  14. throw new InvalidArgumentError('invalid callback')
  15. }
  16. const { signal, opaque, responseHeaders } = opts
  17. if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') {
  18. throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget')
  19. }
  20. super('UNDICI_UPGRADE')
  21. this.responseHeaders = responseHeaders || null
  22. this.opaque = opaque || null
  23. this.callback = callback
  24. this.abort = null
  25. this.context = null
  26. addSignal(this, signal)
  27. }
  28. onConnect (abort, context) {
  29. if (this.reason) {
  30. abort(this.reason)
  31. return
  32. }
  33. assert(this.callback)
  34. this.abort = abort
  35. this.context = null
  36. }
  37. onHeaders () {
  38. throw new SocketError('bad upgrade', null)
  39. }
  40. onUpgrade (statusCode, rawHeaders, socket) {
  41. assert(socket[kHTTP2Stream] === true ? statusCode === 200 : statusCode === 101)
  42. const { callback, opaque, context } = this
  43. removeSignal(this)
  44. this.callback = null
  45. const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)
  46. this.runInAsyncScope(callback, null, null, {
  47. headers,
  48. socket,
  49. opaque,
  50. context
  51. })
  52. }
  53. onError (err) {
  54. const { callback, opaque } = this
  55. removeSignal(this)
  56. if (callback) {
  57. this.callback = null
  58. queueMicrotask(() => {
  59. this.runInAsyncScope(callback, null, err, { opaque })
  60. })
  61. }
  62. }
  63. }
  64. function upgrade (opts, callback) {
  65. if (callback === undefined) {
  66. return new Promise((resolve, reject) => {
  67. upgrade.call(this, opts, (err, data) => {
  68. return err ? reject(err) : resolve(data)
  69. })
  70. })
  71. }
  72. try {
  73. const upgradeHandler = new UpgradeHandler(opts, callback)
  74. const upgradeOpts = {
  75. ...opts,
  76. method: opts.method || 'GET',
  77. upgrade: opts.protocol || 'Websocket'
  78. }
  79. this.dispatch(upgradeOpts, upgradeHandler)
  80. } catch (err) {
  81. if (typeof callback !== 'function') {
  82. throw err
  83. }
  84. const opaque = opts?.opaque
  85. queueMicrotask(() => callback(err, { opaque }))
  86. }
  87. }
// The module's only export is the `upgrade` function; it reads `this.dispatch`,
// so it is meant to be called with a dispatcher as `this`.
module.exports = upgrade