handler.js 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557
  1. import fs from "node:fs/promises";
  2. import path from "node:path";
  3. import os from "node:os";
  4. import crypto from "node:crypto";
  5. const HOOK_KEY = "task-board-realtime";
  6. const DEFAULT_API_BASE = "http://127.0.0.1:3001";
  7. const DEFAULT_MAX_REMEMBERED = 3000;
  8. const DEFAULT_ADMIN_TOKEN = "dev-task-board-token";
  9. const SESSION_TAIL_BYTES = 2 * 1024 * 1024;
  10. const SESSION_USER_SCAN_LIMIT = 80;
  11. const DEFERRED_RECONCILE_DELAYS_MS = [1600, 4800];
  12. function nowIso() {
  13. return new Date().toISOString();
  14. }
  15. function safeTrim(value) {
  16. return typeof value === "string" ? value.trim() : "";
  17. }
  18. function oneLine(text) {
  19. return safeTrim(text).replace(/\s+/g, " ");
  20. }
  21. function truncate(text, maxChars) {
  22. if (!text) return "";
  23. if (text.length <= maxChars) return text;
  24. return `${text.slice(0, Math.max(1, maxChars - 1))}...`;
  25. }
  26. function stableHash(input) {
  27. return crypto.createHash("sha1").update(input).digest("hex").slice(0, 12);
  28. }
  29. function resolveStateDir() {
  30. const explicit = safeTrim(process.env.OPENCLAW_STATE_DIR);
  31. if (explicit) return explicit;
  32. return path.join(os.homedir(), ".openclaw");
  33. }
  34. function resolveConfigPath(stateDir) {
  35. const explicit = safeTrim(process.env.OPENCLAW_CONFIG_PATH);
  36. if (explicit) return explicit;
  37. return path.join(stateDir, "openclaw.json");
  38. }
  39. async function readJson(filePath, fallback) {
  40. try {
  41. const raw = await fs.readFile(filePath, "utf8");
  42. return JSON.parse(raw);
  43. } catch {
  44. return fallback;
  45. }
  46. }
  47. async function writeJsonAtomic(filePath, payload) {
  48. await fs.mkdir(path.dirname(filePath), { recursive: true });
  49. const tempPath = `${filePath}.${process.pid}.${Date.now()}.tmp`;
  50. await fs.writeFile(tempPath, `${JSON.stringify(payload, null, 2)}\n`, "utf8");
  51. await fs.rename(tempPath, filePath);
  52. }
  53. async function appendLog(stateDir, message) {
  54. try {
  55. const logPath = path.join(stateDir, "logs", "task-board-realtime.log");
  56. await fs.mkdir(path.dirname(logPath), { recursive: true });
  57. await fs.appendFile(logPath, `[${nowIso()}] ${message}\n`, "utf8");
  58. } catch {
  59. // Never fail hook execution on logging errors.
  60. }
  61. }
  62. function resolveHookConfig(config, stateDir) {
  63. const workspaceDir =
  64. safeTrim(config?.agents?.defaults?.workspace) ||
  65. safeTrim(config?.workspace?.dir) ||
  66. path.join(stateDir, "workspace");
  67. const cfg = config?.hooks?.internal?.entries?.[HOOK_KEY] ?? {};
  68. const apiBaseUrl =
  69. safeTrim(cfg.apiBaseUrl) ||
  70. safeTrim(process.env.TASK_BOARD_API_BASE) ||
  71. DEFAULT_API_BASE;
  72. const adminToken =
  73. safeTrim(cfg.adminToken) ||
  74. safeTrim(process.env.TASK_BOARD_ADMIN_TOKEN) ||
  75. DEFAULT_ADMIN_TOKEN;
  76. const stateFileRaw = safeTrim(cfg.stateFile);
  77. const stateFile = stateFileRaw
  78. ? path.isAbsolute(stateFileRaw)
  79. ? stateFileRaw
  80. : path.join(workspaceDir, stateFileRaw)
  81. : path.join(workspaceDir, "task-board", ".sync-state.json");
  82. const maxRemembered = Number.isFinite(cfg.maxRememberedMessages)
  83. ? Math.max(200, Math.floor(cfg.maxRememberedMessages))
  84. : DEFAULT_MAX_REMEMBERED;
  85. return {
  86. workspaceDir,
  87. apiBaseUrl: apiBaseUrl.replace(/\/+$/, ""),
  88. stateFile,
  89. adminToken,
  90. ignoreSlashCommands: cfg.ignoreSlashCommands !== false,
  91. maxRemembered,
  92. };
  93. }
  94. function normalizeState(state) {
  95. const safe = state && typeof state === "object" ? state : {};
  96. return {
  97. version: 1,
  98. sessions: safe.sessions && typeof safe.sessions === "object" ? safe.sessions : {},
  99. counters: safe.counters && typeof safe.counters === "object" ? safe.counters : {},
  100. messages: safe.messages && typeof safe.messages === "object" ? safe.messages : {},
  101. updatedAt: safeTrim(safe.updatedAt) || nowIso(),
  102. };
  103. }
  104. function cleanupSeenMessages(state, maxRemembered) {
  105. const entries = Object.entries(state.messages).sort((a, b) => {
  106. const ta = Date.parse(a[1]) || 0;
  107. const tb = Date.parse(b[1]) || 0;
  108. return tb - ta;
  109. });
  110. if (entries.length <= maxRemembered) return;
  111. for (const [key] of entries.slice(maxRemembered)) {
  112. delete state.messages[key];
  113. }
  114. }
  115. function toIsoFromEvent(event) {
  116. const contextTs = Number(event?.context?.timestamp);
  117. if (Number.isFinite(contextTs)) {
  118. return new Date(contextTs).toISOString();
  119. }
  120. if (event?.timestamp instanceof Date && Number.isFinite(event.timestamp.getTime())) {
  121. return event.timestamp.toISOString();
  122. }
  123. return nowIso();
  124. }
  125. async function requestJson(apiBaseUrl, urlPath, method = "GET", payload, extraHeaders = {}) {
  126. const controller = new AbortController();
  127. const timer = setTimeout(() => controller.abort(), 8000);
  128. try {
  129. const response = await fetch(`${apiBaseUrl}${urlPath}`, {
  130. method,
  131. headers: {
  132. "Content-Type": "application/json",
  133. ...extraHeaders,
  134. },
  135. body: payload === undefined ? undefined : JSON.stringify(payload),
  136. signal: controller.signal,
  137. });
  138. const data = await response.json().catch(() => ({}));
  139. if (!response.ok) {
  140. throw new Error(`${method} ${urlPath} failed: ${response.status} ${data.error || ""}`.trim());
  141. }
  142. return data;
  143. } finally {
  144. clearTimeout(timer);
  145. }
  146. }
  147. function buildMessageDedupKey(event, content, isoAt) {
  148. const sessionKey = safeTrim(event?.sessionKey) || "unknown-session";
  149. const messageId = safeTrim(event?.context?.messageId);
  150. if (messageId) {
  151. return `${sessionKey}::${messageId}`;
  152. }
  153. return `${sessionKey}::${stableHash(`${content}::${isoAt}`)}`;
  154. }
  155. function resolveAgentId(event) {
  156. const fromContext = safeTrim(event?.context?.agentId);
  157. const fromSessionKey = safeTrim(event?.sessionKey).split(":")[1] || "";
  158. return fromContext || fromSessionKey || "main";
  159. }
  160. async function resolveSessionIdFromRegistry(stateDir, event) {
  161. const sessionKey = safeTrim(event?.sessionKey);
  162. if (!sessionKey) return "";
  163. const agentId = resolveAgentId(event);
  164. const registryPath = path.join(stateDir, "agents", agentId, "sessions", "sessions.json");
  165. const registry = await readJson(registryPath, {});
  166. const direct = registry?.[sessionKey];
  167. if (safeTrim(direct?.sessionId)) return safeTrim(direct.sessionId);
  168. const lowered = sessionKey.toLowerCase();
  169. const loweredMatch = registry?.[lowered];
  170. if (safeTrim(loweredMatch?.sessionId)) return safeTrim(loweredMatch.sessionId);
  171. return "";
  172. }
  173. async function resolveSessionTranscriptPath(stateDir, event) {
  174. const explicitSessionId = safeTrim(event?.context?.sessionId);
  175. const sessionId = explicitSessionId || (await resolveSessionIdFromRegistry(stateDir, event));
  176. if (!sessionId) return "";
  177. const agentId = resolveAgentId(event);
  178. const sessionsDir = path.join(stateDir, "agents", agentId, "sessions");
  179. const exactPath = path.join(sessionsDir, `${sessionId}.jsonl`);
  180. try {
  181. const entries = await fs.readdir(sessionsDir, { withFileTypes: true });
  182. const prefixed = entries
  183. .filter((entry) => entry.isFile())
  184. .map((entry) => entry.name)
  185. .filter((name) => name.startsWith(`${sessionId}-`) && name.endsWith(".jsonl"));
  186. const candidates = [path.basename(exactPath), ...prefixed];
  187. if (!candidates.length) return "";
  188. const withStats = await Promise.all(
  189. candidates.map(async (name) => {
  190. const fullPath = path.join(sessionsDir, name);
  191. try {
  192. const stat = await fs.stat(fullPath);
  193. return { fullPath, mtimeMs: Number(stat.mtimeMs) || 0 };
  194. } catch {
  195. return null;
  196. }
  197. }),
  198. );
  199. const existing = withStats.filter(Boolean);
  200. if (!existing.length) return "";
  201. existing.sort((a, b) => b.mtimeMs - a.mtimeMs);
  202. return existing[0].fullPath;
  203. } catch {
  204. return "";
  205. }
  206. }
  207. function extractTextFromMessageContent(content) {
  208. if (!Array.isArray(content)) return "";
  209. const parts = [];
  210. for (const chunk of content) {
  211. if (!chunk || typeof chunk !== "object") continue;
  212. if (chunk.type === "text" && typeof chunk.text === "string") {
  213. parts.push(chunk.text);
  214. }
  215. }
  216. return oneLine(parts.join("\n"));
  217. }
  218. async function readFileTail(filePath, maxBytes) {
  219. const stat = await fs.stat(filePath);
  220. const size = Number(stat.size) || 0;
  221. const start = Math.max(0, size - maxBytes);
  222. const bytesToRead = size - start;
  223. if (bytesToRead <= 0) return "";
  224. const file = await fs.open(filePath, "r");
  225. try {
  226. const buffer = Buffer.alloc(bytesToRead);
  227. await file.read(buffer, 0, bytesToRead, start);
  228. return buffer.toString("utf8");
  229. } finally {
  230. await file.close();
  231. }
  232. }
  233. async function extractUserMessagesFromTranscript(stateDir, event) {
  234. const transcriptPath = await resolveSessionTranscriptPath(stateDir, event);
  235. if (!transcriptPath) return [];
  236. try {
  237. const text = await readFileTail(transcriptPath, SESSION_TAIL_BYTES);
  238. const lines = text.split("\n").map((line) => line.trim()).filter(Boolean);
  239. const messages = [];
  240. const seen = new Set();
  241. for (let i = 0; i < lines.length; i += 1) {
  242. const line = lines[i];
  243. let row;
  244. try {
  245. row = JSON.parse(line);
  246. } catch {
  247. continue;
  248. }
  249. if (row?.type !== "message") continue;
  250. if (row?.message?.role !== "user") continue;
  251. const content = extractTextFromMessageContent(row?.message?.content);
  252. if (!content) continue;
  253. const timestamp = Date.parse(row.timestamp || "");
  254. const id = safeTrim(row.id);
  255. const fallbackId = stableHash(`${safeTrim(row.timestamp)}::${content}`);
  256. const messageId = id || `bootstrap-${fallbackId}`;
  257. if (seen.has(messageId)) continue;
  258. seen.add(messageId);
  259. messages.push({
  260. messageId,
  261. content,
  262. timestamp: Number.isFinite(timestamp) ? timestamp : Date.now(),
  263. });
  264. }
  265. if (messages.length <= SESSION_USER_SCAN_LIMIT) {
  266. return messages;
  267. }
  268. return messages.slice(messages.length - SESSION_USER_SCAN_LIMIT);
  269. } catch (error) {
  270. await appendLog(stateDir, `WARN transcript parse failed: ${String(error.message || error)}`);
  271. }
  272. return [];
  273. }
  274. async function closeConversationIfPresent(event, runtime, state) {
  275. const sessionKey = safeTrim(event?.sessionKey);
  276. if (!sessionKey) return false;
  277. const sessionState = state.sessions[sessionKey];
  278. if (!sessionState?.rootTaskId) return false;
  279. const endTime = toIsoFromEvent(event);
  280. try {
  281. await requestJson(
  282. runtime.apiBaseUrl,
  283. `/api/admin/nodes/${encodeURIComponent(sessionState.rootTaskId)}`,
  284. "PUT",
  285. {
  286. status: "completed",
  287. timestamps: {
  288. completed: endTime,
  289. },
  290. notes: oneLine(`${safeTrim(sessionState.notes) || ""} Conversation closed by /${event.action}.`),
  291. },
  292. {
  293. Authorization: `Bearer ${runtime.adminToken}`,
  294. },
  295. );
  296. } catch (error) {
  297. await appendLog(runtime.stateDir, `WARN close root failed session=${sessionKey}: ${String(error.message || error)}`);
  298. }
  299. const lastIndex = Number(sessionState.index) || Number(state.counters[sessionKey]) || 0;
  300. state.counters[sessionKey] = Math.max(1, lastIndex);
  301. delete state.sessions[sessionKey];
  302. state.updatedAt = nowIso();
  303. return true;
  304. }
  305. async function ensureRootTask(event, runtime, state) {
  306. const sessionKey = safeTrim(event?.sessionKey);
  307. if (!sessionKey) return null;
  308. const existing = state.sessions[sessionKey];
  309. if (existing?.rootTaskId) {
  310. return existing;
  311. }
  312. const index = (Number(state.counters[sessionKey]) || 0) + 1;
  313. const preview = truncate(oneLine(event?.context?.content || ""), 26) || "new conversation";
  314. const rootName = `Conversation #${index} · ${preview}`;
  315. const startedAt = toIsoFromEvent(event);
  316. const created = await requestJson(
  317. runtime.apiBaseUrl,
  318. "/api/admin/nodes",
  319. "POST",
  320. {
  321. conversationId: sessionKey,
  322. name: rootName,
  323. nodeType: "prompt",
  324. status: "in-progress",
  325. content: oneLine(event?.context?.content || ""),
  326. timestamps: {
  327. created: startedAt,
  328. started: startedAt,
  329. },
  330. plannedApproach: "Auto-synced from OpenClaw conversation events.",
  331. notes: `sessionKey=${sessionKey}`,
  332. },
  333. {
  334. Authorization: `Bearer ${runtime.adminToken}`,
  335. },
  336. );
  337. const sessionState = {
  338. rootTaskId: String(created.id),
  339. index,
  340. startedAt,
  341. notes: `sessionKey=${sessionKey}`,
  342. };
  343. state.sessions[sessionKey] = sessionState;
  344. state.updatedAt = nowIso();
  345. return sessionState;
  346. }
  347. async function appendMessageChild(event, runtime, state, sessionState) {
  348. const content = oneLine(event?.context?.content || "");
  349. if (!content) return false;
  350. if (runtime.ignoreSlashCommands && content.startsWith("/")) return false;
  351. const atIso = toIsoFromEvent(event);
  352. const dedupKey = buildMessageDedupKey(event, content, atIso);
  353. if (state.messages[dedupKey]) return false;
  354. const channelId = safeTrim(event?.context?.channelId) || "unknown";
  355. const messageId = safeTrim(event?.context?.messageId) || "n/a";
  356. const childName = `Turn · ${truncate(content, 36)}`;
  357. const notes = [
  358. `content=${content}`,
  359. `messageId=${messageId}`,
  360. `channelId=${channelId}`,
  361. `sessionKey=${safeTrim(event?.sessionKey) || "unknown"}`,
  362. ].join("\n");
  363. await requestJson(
  364. runtime.apiBaseUrl,
  365. "/api/admin/nodes",
  366. "POST",
  367. {
  368. conversationId: safeTrim(event?.sessionKey) || "unknown-session",
  369. name: childName,
  370. parentId: sessionState.rootTaskId,
  371. nodeType: "thought",
  372. status: "completed",
  373. content,
  374. timestamps: {
  375. created: atIso,
  376. started: atIso,
  377. completed: atIso,
  378. },
  379. notes,
  380. },
  381. {
  382. Authorization: `Bearer ${runtime.adminToken}`,
  383. },
  384. );
  385. state.messages[dedupKey] = nowIso();
  386. cleanupSeenMessages(state, runtime.maxRemembered);
  387. state.updatedAt = nowIso();
  388. return true;
  389. }
  390. async function reconcileTranscriptMessages(event, runtime, state, stateDir) {
  391. let createdAny = false;
  392. const transcriptMessages = await extractUserMessagesFromTranscript(stateDir, event);
  393. if (!transcriptMessages.length) return false;
  394. for (const item of transcriptMessages) {
  395. const syntheticEvent = {
  396. ...event,
  397. type: "message",
  398. action: "received",
  399. context: {
  400. ...(event?.context || {}),
  401. content: item.content,
  402. messageId: item.messageId,
  403. timestamp: item.timestamp,
  404. channelId: "agent-bootstrap",
  405. },
  406. };
  407. const sessionState = await ensureRootTask(syntheticEvent, runtime, state);
  408. if (!sessionState) continue;
  409. const created = await appendMessageChild(syntheticEvent, runtime, state, sessionState);
  410. createdAny = createdAny || created;
  411. }
  412. return createdAny;
  413. }
  414. function scheduleDeferredReconcile(event, runtime) {
  415. if (event?.context?._taskBoardDeferred) return;
  416. for (const delayMs of DEFERRED_RECONCILE_DELAYS_MS) {
  417. const timer = setTimeout(() => {
  418. void (async () => {
  419. try {
  420. const deferredState = normalizeState(await readJson(runtime.stateFile, {}));
  421. const deferredEvent = {
  422. ...event,
  423. context: {
  424. ...(event?.context || {}),
  425. _taskBoardDeferred: true,
  426. },
  427. };
  428. const createdAny = await reconcileTranscriptMessages(deferredEvent, runtime, deferredState, runtime.stateDir);
  429. if (createdAny) {
  430. await writeJsonAtomic(runtime.stateFile, deferredState);
  431. }
  432. } catch (error) {
  433. const message = error instanceof Error ? error.message : String(error);
  434. await appendLog(runtime.stateDir, `WARN deferred reconcile failed: ${message}`);
  435. }
  436. })();
  437. }, delayMs);
  438. if (typeof timer?.unref === "function") {
  439. timer.unref();
  440. }
  441. }
  442. }
  443. export default async function taskBoardRealtimeHook(event) {
  444. const stateDir = resolveStateDir();
  445. const configPath = resolveConfigPath(stateDir);
  446. const config = await readJson(configPath, {});
  447. const runtime = resolveHookConfig(config, stateDir);
  448. runtime.stateDir = stateDir;
  449. const state = normalizeState(await readJson(runtime.stateFile, {}));
  450. try {
  451. if (event?.type === "command" && (event?.action === "new" || event?.action === "reset")) {
  452. const changed = await closeConversationIfPresent(event, runtime, state);
  453. if (changed) {
  454. await writeJsonAtomic(runtime.stateFile, state);
  455. }
  456. return;
  457. }
  458. const isMessageReceived = event?.type === "message" && event?.action === "received";
  459. const shouldScanTranscript =
  460. (event?.type === "agent" && event?.action === "bootstrap") ||
  461. (event?.type === "message" && event?.action === "sent") ||
  462. (event?.type === "command" && event?.action === "stop");
  463. if (!isMessageReceived) {
  464. if (!shouldScanTranscript) {
  465. return;
  466. }
  467. const createdAny = await reconcileTranscriptMessages(event, runtime, state, stateDir);
  468. if (createdAny) {
  469. await writeJsonAtomic(runtime.stateFile, state);
  470. }
  471. scheduleDeferredReconcile(event, runtime);
  472. return;
  473. }
  474. const sessionState = await ensureRootTask(event, runtime, state);
  475. if (!sessionState) return;
  476. const created = await appendMessageChild(event, runtime, state, sessionState);
  477. if (created) {
  478. await writeJsonAtomic(runtime.stateFile, state);
  479. }
  480. } catch (error) {
  481. const message = error instanceof Error ? error.message : String(error);
  482. await appendLog(stateDir, `ERROR event=${event?.type || "unknown"}:${event?.action || "unknown"} ${message}`);
  483. }
  484. }