// sse.js
  1. "use strict";
  2. function safeJson(value) {
  3. try {
  4. return JSON.stringify(value);
  5. } catch {
  6. return "{}";
  7. }
  8. }
  9. class SseHub {
  10. constructor(options = {}) {
  11. this.clients = new Map();
  12. this.keepaliveMs = Number.isFinite(options.keepaliveMs) ? Math.max(5000, options.keepaliveMs) : 25000;
  13. }
  14. _setFor(conversationId) {
  15. if (!this.clients.has(conversationId)) {
  16. this.clients.set(conversationId, new Set());
  17. }
  18. return this.clients.get(conversationId);
  19. }
  20. subscribe(conversationId, req, res) {
  21. const key = conversationId || "*";
  22. const set = this._setFor(key);
  23. const client = { res, key };
  24. set.add(client);
  25. res.writeHead(200, {
  26. "Content-Type": "text/event-stream; charset=utf-8",
  27. "Cache-Control": "no-cache, no-transform",
  28. Connection: "keep-alive",
  29. "X-Accel-Buffering": "no",
  30. });
  31. res.write(": connected\n\n");
  32. const keepalive = setInterval(() => {
  33. try {
  34. res.write(": keepalive\n\n");
  35. } catch {
  36. this._unsubscribe(client, keepalive);
  37. }
  38. }, this.keepaliveMs);
  39. const cleanup = () => this._unsubscribe(client, keepalive);
  40. req.on("close", cleanup);
  41. req.on("error", cleanup);
  42. res.on("error", cleanup);
  43. }
  44. _unsubscribe(client, timer) {
  45. if (timer) clearInterval(timer);
  46. const set = this.clients.get(client.key);
  47. if (!set) return;
  48. set.delete(client);
  49. if (set.size === 0) {
  50. this.clients.delete(client.key);
  51. }
  52. }
  53. _broadcast(set, event, payload) {
  54. if (!set || set.size === 0) return;
  55. const data = safeJson(payload);
  56. for (const client of set) {
  57. try {
  58. client.res.write(`event: ${event}\n`);
  59. client.res.write(`data: ${data}\n\n`);
  60. } catch {
  61. // Ignore write failures; cleanup happens via close handler.
  62. }
  63. }
  64. }
  65. publishConversation(conversationId, payload) {
  66. const body = {
  67. conversationId,
  68. timestamp: new Date().toISOString(),
  69. ...payload,
  70. };
  71. this._broadcast(this.clients.get(conversationId), "conversation_update", body);
  72. this._broadcast(this.clients.get("*"), "conversation_update", body);
  73. }
  74. }
  75. module.exports = {
  76. SseHub,
  77. };