import fs from "node:fs/promises";
import path from "node:path";
import os from "node:os";
import crypto from "node:crypto";

// Key of this hook's entry under `hooks.internal.entries` in the config file.
const HOOK_KEY = "task-board-realtime";
const DEFAULT_API_BASE = "http://127.0.0.1:3001";
const DEFAULT_MAX_REMEMBERED = 3000;
const DEFAULT_ADMIN_TOKEN = "dev-task-board-token";
// How much of the end of a session transcript is scanned for user messages.
const SESSION_TAIL_BYTES = 2 * 1024 * 1024;
// Upper bound on the number of (most recent) transcript user messages reconciled per scan.
const SESSION_USER_SCAN_LIMIT = 80;
// Delays of the follow-up transcript scans scheduled after a scan-triggering event.
const DEFERRED_RECONCILE_DELAYS_MS = [1600, 4800];
// Abort an API request that has not completed within this many milliseconds.
const REQUEST_TIMEOUT_MS = 8000;

/** @returns {string} The current time as an ISO-8601 string. */
function nowIso() {
  return new Date().toISOString();
}

/**
 * @param {unknown} value
 * @returns {string} The trimmed string, or "" when `value` is not a string.
 */
function safeTrim(value) {
  return typeof value === "string" ? value.trim() : "";
}

/**
 * Collapses every run of whitespace (including newlines) into a single space.
 * @param {unknown} text
 * @returns {string}
 */
function oneLine(text) {
  return safeTrim(text).replace(/\s+/g, " ");
}

/**
 * Shortens `text` and appends "..." when it is longer than `maxChars`.
 * Note: the kept prefix is `maxChars - 1` characters (at least 1), so a
 * truncated result can be up to `maxChars + 2` characters long.
 * @param {string} text
 * @param {number} maxChars
 * @returns {string}
 */
function truncate(text, maxChars) {
  if (!text) return "";
  if (text.length <= maxChars) return text;
  return `${text.slice(0, Math.max(1, maxChars - 1))}...`;
}

/**
 * @param {string} input
 * @returns {string} First 12 hex characters of the SHA-1 digest of `input`.
 */
function stableHash(input) {
  return crypto.createHash("sha1").update(input).digest("hex").slice(0, 12);
}

/** @returns {string} `OPENCLAW_STATE_DIR` when set, otherwise `~/.openclaw`. */
function resolveStateDir() {
  const explicit = safeTrim(process.env.OPENCLAW_STATE_DIR);
  if (explicit) return explicit;
  return path.join(os.homedir(), ".openclaw");
}

/**
 * @param {string} stateDir
 * @returns {string} `OPENCLAW_CONFIG_PATH` when set, otherwise `<stateDir>/openclaw.json`.
 */
function resolveConfigPath(stateDir) {
  const explicit = safeTrim(process.env.OPENCLAW_CONFIG_PATH);
  if (explicit) return explicit;
  return path.join(stateDir, "openclaw.json");
}

/**
 * Reads and parses a JSON file.
 * @param {string} filePath
 * @param {*} fallback Returned when the file cannot be read or parsed.
 * @returns {Promise<*>}
 */
async function readJson(filePath, fallback) {
  try {
    const raw = await fs.readFile(filePath, "utf8");
    return JSON.parse(raw);
  } catch {
    return fallback;
  }
}

/**
 * Writes `payload` as pretty-printed JSON by writing a temp file next to the
 * target and renaming it over `filePath`.
 * @param {string} filePath
 * @param {*} payload
 * @returns {Promise<void>}
 * @throws Re-throws any write/rename error after removing the temp file.
 */
async function writeJsonAtomic(filePath, payload) {
  await fs.mkdir(path.dirname(filePath), { recursive: true });
  const tempPath = `${filePath}.${process.pid}.${Date.now()}.tmp`;
  try {
    await fs.writeFile(tempPath, `${JSON.stringify(payload, null, 2)}\n`, "utf8");
    await fs.rename(tempPath, filePath);
  } catch (error) {
    // Do not leave a stray temp file behind when the write or rename fails.
    await fs.rm(tempPath, { force: true }).catch(() => {});
    throw error;
  }
}

/**
 * Appends a timestamped line to `<stateDir>/logs/task-board-realtime.log`.
 * @param {string} stateDir
 * @param {string} message
 * @returns {Promise<void>}
 */
async function appendLog(stateDir, message) {
  try {
    const logPath = path.join(stateDir, "logs", "task-board-realtime.log");
    await fs.mkdir(path.dirname(logPath), { recursive: true });
    await fs.appendFile(logPath, `[${nowIso()}] ${message}\n`, "utf8");
  } catch {
    // Never fail hook execution on logging errors.
  }
}

/**
 * Builds the runtime settings from the parsed config file, environment
 * variables and built-in defaults (in that order of precedence).
 * @param {object} config Parsed config file contents (may be empty).
 * @param {string} stateDir
 * @returns {{workspaceDir: string, apiBaseUrl: string, stateFile: string,
 *   adminToken: string, ignoreSlashCommands: boolean, maxRemembered: number}}
 */
function resolveHookConfig(config, stateDir) {
  const workspaceDir =
    safeTrim(config?.agents?.defaults?.workspace) ||
    safeTrim(config?.workspace?.dir) ||
    path.join(stateDir, "workspace");
  const cfg = config?.hooks?.internal?.entries?.[HOOK_KEY] ?? {};
  const apiBaseUrl =
    safeTrim(cfg.apiBaseUrl) || safeTrim(process.env.TASK_BOARD_API_BASE) || DEFAULT_API_BASE;
  const adminToken =
    safeTrim(cfg.adminToken) ||
    safeTrim(process.env.TASK_BOARD_ADMIN_TOKEN) ||
    DEFAULT_ADMIN_TOKEN;
  const stateFileRaw = safeTrim(cfg.stateFile);
  // A relative `stateFile` is resolved against the workspace directory.
  const stateFile = stateFileRaw
    ? path.isAbsolute(stateFileRaw)
      ? stateFileRaw
      : path.join(workspaceDir, stateFileRaw)
    : path.join(workspaceDir, "task-board", ".sync-state.json");
  const maxRemembered = Number.isFinite(cfg.maxRememberedMessages)
    ? Math.max(200, Math.floor(cfg.maxRememberedMessages))
    : DEFAULT_MAX_REMEMBERED;
  return {
    workspaceDir,
    // Strip trailing slashes so URL paths can be appended directly.
    apiBaseUrl: apiBaseUrl.replace(/\/+$/, ""),
    stateFile,
    adminToken,
    ignoreSlashCommands: cfg.ignoreSlashCommands !== false,
    maxRemembered,
  };
}

/**
 * Coerces whatever was read from the state file into the expected shape.
 * @param {*} state
 * @returns {{version: number, sessions: object, counters: object, messages: object, updatedAt: string}}
 */
function normalizeState(state) {
  const safe = state && typeof state === "object" ? state : {};
  return {
    version: 1,
    sessions: safe.sessions && typeof safe.sessions === "object" ? safe.sessions : {},
    counters: safe.counters && typeof safe.counters === "object" ? safe.counters : {},
    messages: safe.messages && typeof safe.messages === "object" ? safe.messages : {},
    updatedAt: safeTrim(safe.updatedAt) || nowIso(),
  };
}

/**
 * Drops the oldest dedup entries so that at most `maxRemembered` remain.
 * Entries are ordered by their stored ISO timestamp; unparsable values sort as oldest.
 * @param {{messages: Object<string, string>}} state Mutated in place.
 * @param {number} maxRemembered
 */
function cleanupSeenMessages(state, maxRemembered) {
  const entries = Object.entries(state.messages);
  // Skip the sort entirely while under the limit.
  if (entries.length <= maxRemembered) return;
  entries.sort((a, b) => {
    const ta = Date.parse(a[1]) || 0;
    const tb = Date.parse(b[1]) || 0;
    return tb - ta;
  });
  for (const [key] of entries.slice(maxRemembered)) {
    delete state.messages[key];
  }
}

/**
 * Interprets a context timestamp as epoch milliseconds.
 * @param {unknown} value
 * @returns {number} NaN when `value` carries no usable timestamp
 *   (null, undefined, blank string, other types).
 */
function coerceEpochMs(value) {
  if (typeof value === "number") return value;
  if (value instanceof Date) return value.getTime();
  if (typeof value === "string" && value.trim() !== "") return Number(value);
  return Number.NaN;
}

/**
 * Picks the event time: `event.context.timestamp`, then `event.timestamp`
 * (when it is a valid Date), then the current time.
 * @param {object} event
 * @returns {string} ISO-8601 timestamp.
 */
function toIsoFromEvent(event) {
  // A missing/null/blank timestamp must not coerce to 0 (the 1970 epoch), and
  // an out-of-range value must not make toISOString() throw.
  const fromContext = new Date(coerceEpochMs(event?.context?.timestamp));
  if (Number.isFinite(fromContext.getTime())) {
    return fromContext.toISOString();
  }
  if (event?.timestamp instanceof Date && Number.isFinite(event.timestamp.getTime())) {
    return event.timestamp.toISOString();
  }
  return nowIso();
}

/**
 * Sends a JSON request to the task-board API and returns the parsed response body.
 * @param {string} apiBaseUrl Base URL without a trailing slash.
 * @param {string} urlPath Path appended to `apiBaseUrl`.
 * @param {string} [method="GET"]
 * @param {*} [payload] Sent as the JSON body when not `undefined`.
 * @param {Object<string, string>} [extraHeaders={}]
 * @returns {Promise<*>} Parsed body, or `{}` when the body is not valid JSON.
 * @throws {Error} On a non-2xx status; also rejects when the request is
 *   aborted after REQUEST_TIMEOUT_MS.
 */
async function requestJson(apiBaseUrl, urlPath, method = "GET", payload, extraHeaders = {}) {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), REQUEST_TIMEOUT_MS);
  try {
    const response = await fetch(`${apiBaseUrl}${urlPath}`, {
      method,
      headers: {
        "Content-Type": "application/json",
        ...extraHeaders,
      },
      body: payload === undefined ? undefined : JSON.stringify(payload),
      signal: controller.signal,
    });
    const data = await response.json().catch(() => ({}));
    if (!response.ok) {
      // `data` may legitimately be null (a JSON `null` body); don't let that mask the HTTP error.
      throw new Error(`${method} ${urlPath} failed: ${response.status} ${data?.error || ""}`.trim());
    }
    return data;
  } finally {
    clearTimeout(timer);
  }
}

/**
 * Builds the key under which a message is remembered in `state.messages`.
 * Uses the message id when present, otherwise a hash of content and time.
 * @param {object} event
 * @param {string} content
 * @param {string} isoAt
 * @returns {string}
 */
function buildMessageDedupKey(event, content, isoAt) {
  const sessionKey = safeTrim(event?.sessionKey) || "unknown-session";
  const messageId = safeTrim(event?.context?.messageId);
  if (messageId) {
    return `${sessionKey}::${messageId}`;
  }
  return `${sessionKey}::${stableHash(`${content}::${isoAt}`)}`;
}

/**
 * @param {object} event
 * @returns {string} `context.agentId`, else the second ":"-separated segment
 *   of the session key, else "main".
 */
function resolveAgentId(event) {
  const fromContext = safeTrim(event?.context?.agentId);
  const fromSessionKey = safeTrim(event?.sessionKey).split(":")[1] || "";
  return fromContext || fromSessionKey || "main";
}

/**
 * Looks the session key up (exact, then lower-cased) in the agent's
 * `sessions.json` registry.
 * @param {string} stateDir
 * @param {object} event
 * @returns {Promise<string>} The session id, or "" when not found.
 */
async function resolveSessionIdFromRegistry(stateDir, event) {
  const sessionKey = safeTrim(event?.sessionKey);
  if (!sessionKey) return "";
  const agentId = resolveAgentId(event);
  const registryPath = path.join(stateDir, "agents", agentId, "sessions", "sessions.json");
  const registry = await readJson(registryPath, {});
  const direct = registry?.[sessionKey];
  if (safeTrim(direct?.sessionId)) return safeTrim(direct.sessionId);
  const lowered = sessionKey.toLowerCase();
  const loweredMatch = registry?.[lowered];
  if (safeTrim(loweredMatch?.sessionId)) return safeTrim(loweredMatch.sessionId);
  return "";
}

/**
 * Finds the transcript file for the event's session: `<sessionId>.jsonl` or
 * any `<sessionId>-*.jsonl` in the agent's sessions directory, whichever was
 * modified most recently.
 * @param {string} stateDir
 * @param {object} event
 * @returns {Promise<string>} Absolute/joined path, or "" when none exists.
 */
async function resolveSessionTranscriptPath(stateDir, event) {
  const explicitSessionId = safeTrim(event?.context?.sessionId);
  const sessionId = explicitSessionId || (await resolveSessionIdFromRegistry(stateDir, event));
  if (!sessionId) return "";
  const agentId = resolveAgentId(event);
  const sessionsDir = path.join(stateDir, "agents", agentId, "sessions");
  try {
    const entries = await fs.readdir(sessionsDir, { withFileTypes: true });
    const prefixed = entries
      .filter((entry) => entry.isFile())
      .map((entry) => entry.name)
      .filter((name) => name.startsWith(`${sessionId}-`) && name.endsWith(".jsonl"));
    // The exact name is always a candidate; stat() below weeds it out if absent.
    const candidates = [`${sessionId}.jsonl`, ...prefixed];
    const withStats = await Promise.all(
      candidates.map(async (name) => {
        const fullPath = path.join(sessionsDir, name);
        try {
          const stat = await fs.stat(fullPath);
          return { fullPath, mtimeMs: Number(stat.mtimeMs) || 0 };
        } catch {
          return null;
        }
      }),
    );
    const existing = withStats.filter(Boolean);
    if (!existing.length) return "";
    existing.sort((a, b) => b.mtimeMs - a.mtimeMs);
    return existing[0].fullPath;
  } catch {
    return "";
  }
}

/**
 * Joins the `text` chunks of a message content array into one line.
 * @param {*} content Expected to be an array of `{type, text}` chunks.
 * @returns {string} "" when `content` is not an array or has no text chunks.
 */
function extractTextFromMessageContent(content) {
  if (!Array.isArray(content)) return "";
  const parts = [];
  for (const chunk of content) {
    if (!chunk || typeof chunk !== "object") continue;
    if (chunk.type === "text" && typeof chunk.text === "string") {
      parts.push(chunk.text);
    }
  }
  return oneLine(parts.join("\n"));
}

/**
 * Reads at most the last `maxBytes` bytes of a file as UTF-8 text.
 * The returned text may start in the middle of a line.
 * @param {string} filePath
 * @param {number} maxBytes
 * @returns {Promise<string>}
 */
async function readFileTail(filePath, maxBytes) {
  const stat = await fs.stat(filePath);
  const size = Number(stat.size) || 0;
  const start = Math.max(0, size - maxBytes);
  const bytesToRead = size - start;
  if (bytesToRead <= 0) return "";
  const file = await fs.open(filePath, "r");
  try {
    const buffer = Buffer.alloc(bytesToRead);
    const { bytesRead } = await file.read(buffer, 0, bytesToRead, start);
    // Only decode what was actually read so a short read adds no NUL padding.
    return buffer.toString("utf8", 0, bytesRead);
  } finally {
    await file.close();
  }
}

/**
 * Extracts the most recent user messages from the tail of the session
 * transcript (JSONL rows with `type: "message"` and `message.role: "user"`).
 * @param {string} stateDir
 * @param {object} event
 * @returns {Promise<Array<{messageId: string, content: string, timestamp: number}>>}
 *   At most SESSION_USER_SCAN_LIMIT entries in transcript order; [] when no
 *   transcript is found or reading fails (the failure is logged).
 */
async function extractUserMessagesFromTranscript(stateDir, event) {
  const transcriptPath = await resolveSessionTranscriptPath(stateDir, event);
  if (!transcriptPath) return [];
  try {
    const text = await readFileTail(transcriptPath, SESSION_TAIL_BYTES);
    const lines = text
      .split("\n")
      .map((line) => line.trim())
      .filter(Boolean);
    const messages = [];
    const seen = new Set();
    for (const line of lines) {
      let row;
      try {
        row = JSON.parse(line);
      } catch {
        // Unparsable rows (e.g. a first line cut by the tail read) are skipped.
        continue;
      }
      if (row?.type !== "message") continue;
      if (row?.message?.role !== "user") continue;
      const content = extractTextFromMessageContent(row?.message?.content);
      if (!content) continue;
      const timestamp = Date.parse(row.timestamp || "");
      const id = safeTrim(row.id);
      // Rows without an id get a synthetic one derived from timestamp and content.
      const fallbackId = stableHash(`${safeTrim(row.timestamp)}::${content}`);
      const messageId = id || `bootstrap-${fallbackId}`;
      if (seen.has(messageId)) continue;
      seen.add(messageId);
      messages.push({
        messageId,
        content,
        timestamp: Number.isFinite(timestamp) ? timestamp : Date.now(),
      });
    }
    if (messages.length <= SESSION_USER_SCAN_LIMIT) {
      return messages;
    }
    return messages.slice(messages.length - SESSION_USER_SCAN_LIMIT);
  } catch (error) {
    await appendLog(stateDir, `WARN transcript parse failed: ${String(error.message || error)}`);
  }
  return [];
}

/**
 * Marks the session's root task as completed on the API (best effort) and
 * forgets the session locally, remembering its index in `state.counters`.
 * @param {object} event
 * @param {object} runtime
 * @param {object} state Mutated in place.
 * @returns {Promise<boolean>} true when a tracked session was removed from `state`.
 */
async function closeConversationIfPresent(event, runtime, state) {
  const sessionKey = safeTrim(event?.sessionKey);
  if (!sessionKey) return false;
  const sessionState = state.sessions[sessionKey];
  if (!sessionState?.rootTaskId) return false;
  const endTime = toIsoFromEvent(event);
  try {
    await requestJson(
      runtime.apiBaseUrl,
      `/api/admin/nodes/${encodeURIComponent(sessionState.rootTaskId)}`,
      "PUT",
      {
        status: "completed",
        timestamps: {
          completed: endTime,
        },
        notes: oneLine(`${safeTrim(sessionState.notes) || ""} Conversation closed by /${event.action}.`),
      },
      {
        Authorization: `Bearer ${runtime.adminToken}`,
      },
    );
  } catch (error) {
    // A failed API update is logged only; the session is still closed locally.
    await appendLog(
      runtime.stateDir,
      `WARN close root failed session=${sessionKey}: ${String(error.message || error)}`,
    );
  }
  const lastIndex = Number(sessionState.index) || Number(state.counters[sessionKey]) || 0;
  state.counters[sessionKey] = Math.max(1, lastIndex);
  delete state.sessions[sessionKey];
  state.updatedAt = nowIso();
  return true;
}

/**
 * @param {object} state
 * @param {object} event
 * @returns {boolean} Whether `state` already tracks a root task for the event's session.
 */
function hasRootTask(state, event) {
  const sessionKey = safeTrim(event?.sessionKey);
  return Boolean(sessionKey && state.sessions[sessionKey]?.rootTaskId);
}

/**
 * Returns the tracked root task for the event's session, creating it on the
 * API (and recording it in `state.sessions`) when there is none yet.
 * @param {object} event
 * @param {object} runtime
 * @param {object} state Mutated in place when a root task is created.
 * @returns {Promise<object|null>} The session state, or null when the event has no session key.
 * @throws {Error} When the create request fails or its response carries no `id`.
 */
async function ensureRootTask(event, runtime, state) {
  const sessionKey = safeTrim(event?.sessionKey);
  if (!sessionKey) return null;
  const existing = state.sessions[sessionKey];
  if (existing?.rootTaskId) {
    return existing;
  }
  const index = (Number(state.counters[sessionKey]) || 0) + 1;
  const preview = truncate(oneLine(event?.context?.content || ""), 26) || "new conversation";
  const rootName = `Conversation #${index} · ${preview}`;
  const startedAt = toIsoFromEvent(event);
  const created = await requestJson(
    runtime.apiBaseUrl,
    "/api/admin/nodes",
    "POST",
    {
      conversationId: sessionKey,
      name: rootName,
      nodeType: "prompt",
      status: "in-progress",
      content: oneLine(event?.context?.content || ""),
      timestamps: {
        created: startedAt,
        started: startedAt,
      },
      plannedApproach: "Auto-synced from OpenClaw conversation events.",
      notes: `sessionKey=${sessionKey}`,
    },
    {
      Authorization: `Bearer ${runtime.adminToken}`,
    },
  );
  // Without an id the root would be stored as "undefined" and used as a parent id later.
  if (created?.id === undefined || created?.id === null) {
    throw new Error("POST /api/admin/nodes returned no node id");
  }
  const sessionState = {
    rootTaskId: String(created.id),
    index,
    startedAt,
    notes: `sessionKey=${sessionKey}`,
  };
  state.sessions[sessionKey] = sessionState;
  state.updatedAt = nowIso();
  return sessionState;
}

/**
 * Creates a child node for the event's message under the session's root task
 * and remembers the message in `state.messages`.
 * @param {object} event
 * @param {object} runtime
 * @param {object} state Mutated in place when a child is created.
 * @param {{rootTaskId: string}} sessionState
 * @returns {Promise<boolean>} false when the message is empty, an ignored
 *   slash command, or already seen; true when a child node was created.
 */
async function appendMessageChild(event, runtime, state, sessionState) {
  const content = oneLine(event?.context?.content || "");
  if (!content) return false;
  if (runtime.ignoreSlashCommands && content.startsWith("/")) return false;
  const atIso = toIsoFromEvent(event);
  const dedupKey = buildMessageDedupKey(event, content, atIso);
  if (state.messages[dedupKey]) return false;
  const channelId = safeTrim(event?.context?.channelId) || "unknown";
  const messageId = safeTrim(event?.context?.messageId) || "n/a";
  const childName = `Turn · ${truncate(content, 36)}`;
  const notes = [
    `content=${content}`,
    `messageId=${messageId}`,
    `channelId=${channelId}`,
    `sessionKey=${safeTrim(event?.sessionKey) || "unknown"}`,
  ].join("\n");
  await requestJson(
    runtime.apiBaseUrl,
    "/api/admin/nodes",
    "POST",
    {
      conversationId: safeTrim(event?.sessionKey) || "unknown-session",
      name: childName,
      parentId: sessionState.rootTaskId,
      nodeType: "thought",
      status: "completed",
      content,
      timestamps: {
        created: atIso,
        started: atIso,
        completed: atIso,
      },
      notes,
    },
    {
      Authorization: `Bearer ${runtime.adminToken}`,
    },
  );
  state.messages[dedupKey] = nowIso();
  cleanupSeenMessages(state, runtime.maxRemembered);
  state.updatedAt = nowIso();
  return true;
}

/**
 * Replays the user messages found in the session transcript as synthetic
 * "message received" events so that unseen ones are added to the board.
 * @param {object} event
 * @param {object} runtime
 * @param {object} state Mutated in place.
 * @param {string} stateDir
 * @returns {Promise<boolean>} true when `state` changed and must be persisted
 *   (a root task and/or at least one child node was created).
 */
async function reconcileTranscriptMessages(event, runtime, state, stateDir) {
  const transcriptMessages = await extractUserMessagesFromTranscript(stateDir, event);
  if (!transcriptMessages.length) return false;
  let changed = false;
  for (const item of transcriptMessages) {
    const syntheticEvent = {
      ...event,
      type: "message",
      action: "received",
      context: {
        ...(event?.context || {}),
        content: item.content,
        messageId: item.messageId,
        timestamp: item.timestamp,
        channelId: "agent-bootstrap",
      },
    };
    const hadRoot = hasRootTask(state, syntheticEvent);
    const sessionState = await ensureRootTask(syntheticEvent, runtime, state);
    if (!sessionState) continue;
    // A freshly created root must be persisted even if every message is
    // skipped; otherwise each later scan would create another root task.
    if (!hadRoot) changed = true;
    const created = await appendMessageChild(syntheticEvent, runtime, state, sessionState);
    changed = changed || created;
  }
  return changed;
}

/**
 * Schedules follow-up transcript scans (one per DEFERRED_RECONCILE_DELAYS_MS
 * entry). Each scan re-reads the state file, reconciles, and writes the state
 * back when it changed. Does nothing for events already marked as deferred.
 * @param {object} event
 * @param {object} runtime Must carry `stateFile` and `stateDir`.
 */
function scheduleDeferredReconcile(event, runtime) {
  if (event?.context?._taskBoardDeferred) return;
  for (const delayMs of DEFERRED_RECONCILE_DELAYS_MS) {
    const timer = setTimeout(() => {
      void (async () => {
        try {
          const deferredState = normalizeState(await readJson(runtime.stateFile, {}));
          const deferredEvent = {
            ...event,
            context: {
              ...(event?.context || {}),
              _taskBoardDeferred: true,
            },
          };
          const changed = await reconcileTranscriptMessages(
            deferredEvent,
            runtime,
            deferredState,
            runtime.stateDir,
          );
          if (changed) {
            await writeJsonAtomic(runtime.stateFile, deferredState);
          }
        } catch (error) {
          const message = error instanceof Error ? error.message : String(error);
          await appendLog(runtime.stateDir, `WARN deferred reconcile failed: ${message}`);
        }
      })();
    }, delayMs);
    // Do not keep the process alive just for a pending reconcile.
    if (typeof timer?.unref === "function") {
      timer.unref();
    }
  }
}

/**
 * Hook entry point: mirrors conversation events onto the task board.
 *
 * - `command:new` / `command:reset` close the session's root task.
 * - `message:received` ensures a root task exists and appends the message as a child.
 * - `agent:bootstrap`, `message:sent` and `command:stop` reconcile the session
 *   transcript now and again after the deferred delays.
 * - Every other event is ignored.
 *
 * Errors are written to the hook log and never thrown to the caller.
 * @param {object} event
 * @returns {Promise<void>}
 */
export default async function taskBoardRealtimeHook(event) {
  const stateDir = resolveStateDir();
  const configPath = resolveConfigPath(stateDir);
  const config = await readJson(configPath, {});
  const runtime = resolveHookConfig(config, stateDir);
  runtime.stateDir = stateDir;
  const state = normalizeState(await readJson(runtime.stateFile, {}));
  try {
    if (event?.type === "command" && (event?.action === "new" || event?.action === "reset")) {
      const changed = await closeConversationIfPresent(event, runtime, state);
      if (changed) {
        await writeJsonAtomic(runtime.stateFile, state);
      }
      return;
    }
    const isMessageReceived = event?.type === "message" && event?.action === "received";
    const shouldScanTranscript =
      (event?.type === "agent" && event?.action === "bootstrap") ||
      (event?.type === "message" && event?.action === "sent") ||
      (event?.type === "command" && event?.action === "stop");
    if (!isMessageReceived) {
      if (!shouldScanTranscript) {
        return;
      }
      const changed = await reconcileTranscriptMessages(event, runtime, state, stateDir);
      if (changed) {
        await writeJsonAtomic(runtime.stateFile, state);
      }
      scheduleDeferredReconcile(event, runtime);
      return;
    }
    const hadRoot = hasRootTask(state, event);
    const sessionState = await ensureRootTask(event, runtime, state);
    if (!sessionState) return;
    const created = await appendMessageChild(event, runtime, state, sessionState);
    // Persist when a root task was just created, even if the message itself was
    // skipped (e.g. an ignored slash command), so the root is not created again.
    if (created || !hadRoot) {
      await writeJsonAtomic(runtime.stateFile, state);
    }
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    await appendLog(
      stateDir,
      `ERROR event=${event?.type || "unknown"}:${event?.action || "unknown"} ${message}`,
    );
  }
}