| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557 |
- import fs from "node:fs/promises";
- import path from "node:path";
- import os from "node:os";
- import crypto from "node:crypto";
- const HOOK_KEY = "task-board-realtime";
- const DEFAULT_API_BASE = "http://127.0.0.1:3001";
- const DEFAULT_MAX_REMEMBERED = 3000;
- const DEFAULT_ADMIN_TOKEN = "dev-task-board-token";
- const SESSION_TAIL_BYTES = 2 * 1024 * 1024;
- const SESSION_USER_SCAN_LIMIT = 80;
- const DEFERRED_RECONCILE_DELAYS_MS = [1600, 4800];
- function nowIso() {
- return new Date().toISOString();
- }
- function safeTrim(value) {
- return typeof value === "string" ? value.trim() : "";
- }
- function oneLine(text) {
- return safeTrim(text).replace(/\s+/g, " ");
- }
- function truncate(text, maxChars) {
- if (!text) return "";
- if (text.length <= maxChars) return text;
- return `${text.slice(0, Math.max(1, maxChars - 1))}...`;
- }
- function stableHash(input) {
- return crypto.createHash("sha1").update(input).digest("hex").slice(0, 12);
- }
- function resolveStateDir() {
- const explicit = safeTrim(process.env.OPENCLAW_STATE_DIR);
- if (explicit) return explicit;
- return path.join(os.homedir(), ".openclaw");
- }
- function resolveConfigPath(stateDir) {
- const explicit = safeTrim(process.env.OPENCLAW_CONFIG_PATH);
- if (explicit) return explicit;
- return path.join(stateDir, "openclaw.json");
- }
- async function readJson(filePath, fallback) {
- try {
- const raw = await fs.readFile(filePath, "utf8");
- return JSON.parse(raw);
- } catch {
- return fallback;
- }
- }
- async function writeJsonAtomic(filePath, payload) {
- await fs.mkdir(path.dirname(filePath), { recursive: true });
- const tempPath = `${filePath}.${process.pid}.${Date.now()}.tmp`;
- await fs.writeFile(tempPath, `${JSON.stringify(payload, null, 2)}\n`, "utf8");
- await fs.rename(tempPath, filePath);
- }
- async function appendLog(stateDir, message) {
- try {
- const logPath = path.join(stateDir, "logs", "task-board-realtime.log");
- await fs.mkdir(path.dirname(logPath), { recursive: true });
- await fs.appendFile(logPath, `[${nowIso()}] ${message}\n`, "utf8");
- } catch {
- // Never fail hook execution on logging errors.
- }
- }
- function resolveHookConfig(config, stateDir) {
- const workspaceDir =
- safeTrim(config?.agents?.defaults?.workspace) ||
- safeTrim(config?.workspace?.dir) ||
- path.join(stateDir, "workspace");
- const cfg = config?.hooks?.internal?.entries?.[HOOK_KEY] ?? {};
- const apiBaseUrl =
- safeTrim(cfg.apiBaseUrl) ||
- safeTrim(process.env.TASK_BOARD_API_BASE) ||
- DEFAULT_API_BASE;
- const adminToken =
- safeTrim(cfg.adminToken) ||
- safeTrim(process.env.TASK_BOARD_ADMIN_TOKEN) ||
- DEFAULT_ADMIN_TOKEN;
- const stateFileRaw = safeTrim(cfg.stateFile);
- const stateFile = stateFileRaw
- ? path.isAbsolute(stateFileRaw)
- ? stateFileRaw
- : path.join(workspaceDir, stateFileRaw)
- : path.join(workspaceDir, "task-board", ".sync-state.json");
- const maxRemembered = Number.isFinite(cfg.maxRememberedMessages)
- ? Math.max(200, Math.floor(cfg.maxRememberedMessages))
- : DEFAULT_MAX_REMEMBERED;
- return {
- workspaceDir,
- apiBaseUrl: apiBaseUrl.replace(/\/+$/, ""),
- stateFile,
- adminToken,
- ignoreSlashCommands: cfg.ignoreSlashCommands !== false,
- maxRemembered,
- };
- }
- function normalizeState(state) {
- const safe = state && typeof state === "object" ? state : {};
- return {
- version: 1,
- sessions: safe.sessions && typeof safe.sessions === "object" ? safe.sessions : {},
- counters: safe.counters && typeof safe.counters === "object" ? safe.counters : {},
- messages: safe.messages && typeof safe.messages === "object" ? safe.messages : {},
- updatedAt: safeTrim(safe.updatedAt) || nowIso(),
- };
- }
- function cleanupSeenMessages(state, maxRemembered) {
- const entries = Object.entries(state.messages).sort((a, b) => {
- const ta = Date.parse(a[1]) || 0;
- const tb = Date.parse(b[1]) || 0;
- return tb - ta;
- });
- if (entries.length <= maxRemembered) return;
- for (const [key] of entries.slice(maxRemembered)) {
- delete state.messages[key];
- }
- }
- function toIsoFromEvent(event) {
- const contextTs = Number(event?.context?.timestamp);
- if (Number.isFinite(contextTs)) {
- return new Date(contextTs).toISOString();
- }
- if (event?.timestamp instanceof Date && Number.isFinite(event.timestamp.getTime())) {
- return event.timestamp.toISOString();
- }
- return nowIso();
- }
- async function requestJson(apiBaseUrl, urlPath, method = "GET", payload, extraHeaders = {}) {
- const controller = new AbortController();
- const timer = setTimeout(() => controller.abort(), 8000);
- try {
- const response = await fetch(`${apiBaseUrl}${urlPath}`, {
- method,
- headers: {
- "Content-Type": "application/json",
- ...extraHeaders,
- },
- body: payload === undefined ? undefined : JSON.stringify(payload),
- signal: controller.signal,
- });
- const data = await response.json().catch(() => ({}));
- if (!response.ok) {
- throw new Error(`${method} ${urlPath} failed: ${response.status} ${data.error || ""}`.trim());
- }
- return data;
- } finally {
- clearTimeout(timer);
- }
- }
- function buildMessageDedupKey(event, content, isoAt) {
- const sessionKey = safeTrim(event?.sessionKey) || "unknown-session";
- const messageId = safeTrim(event?.context?.messageId);
- if (messageId) {
- return `${sessionKey}::${messageId}`;
- }
- return `${sessionKey}::${stableHash(`${content}::${isoAt}`)}`;
- }
- function resolveAgentId(event) {
- const fromContext = safeTrim(event?.context?.agentId);
- const fromSessionKey = safeTrim(event?.sessionKey).split(":")[1] || "";
- return fromContext || fromSessionKey || "main";
- }
- async function resolveSessionIdFromRegistry(stateDir, event) {
- const sessionKey = safeTrim(event?.sessionKey);
- if (!sessionKey) return "";
- const agentId = resolveAgentId(event);
- const registryPath = path.join(stateDir, "agents", agentId, "sessions", "sessions.json");
- const registry = await readJson(registryPath, {});
- const direct = registry?.[sessionKey];
- if (safeTrim(direct?.sessionId)) return safeTrim(direct.sessionId);
- const lowered = sessionKey.toLowerCase();
- const loweredMatch = registry?.[lowered];
- if (safeTrim(loweredMatch?.sessionId)) return safeTrim(loweredMatch.sessionId);
- return "";
- }
- async function resolveSessionTranscriptPath(stateDir, event) {
- const explicitSessionId = safeTrim(event?.context?.sessionId);
- const sessionId = explicitSessionId || (await resolveSessionIdFromRegistry(stateDir, event));
- if (!sessionId) return "";
- const agentId = resolveAgentId(event);
- const sessionsDir = path.join(stateDir, "agents", agentId, "sessions");
- const exactPath = path.join(sessionsDir, `${sessionId}.jsonl`);
- try {
- const entries = await fs.readdir(sessionsDir, { withFileTypes: true });
- const prefixed = entries
- .filter((entry) => entry.isFile())
- .map((entry) => entry.name)
- .filter((name) => name.startsWith(`${sessionId}-`) && name.endsWith(".jsonl"));
- const candidates = [path.basename(exactPath), ...prefixed];
- if (!candidates.length) return "";
- const withStats = await Promise.all(
- candidates.map(async (name) => {
- const fullPath = path.join(sessionsDir, name);
- try {
- const stat = await fs.stat(fullPath);
- return { fullPath, mtimeMs: Number(stat.mtimeMs) || 0 };
- } catch {
- return null;
- }
- }),
- );
- const existing = withStats.filter(Boolean);
- if (!existing.length) return "";
- existing.sort((a, b) => b.mtimeMs - a.mtimeMs);
- return existing[0].fullPath;
- } catch {
- return "";
- }
- }
- function extractTextFromMessageContent(content) {
- if (!Array.isArray(content)) return "";
- const parts = [];
- for (const chunk of content) {
- if (!chunk || typeof chunk !== "object") continue;
- if (chunk.type === "text" && typeof chunk.text === "string") {
- parts.push(chunk.text);
- }
- }
- return oneLine(parts.join("\n"));
- }
- async function readFileTail(filePath, maxBytes) {
- const stat = await fs.stat(filePath);
- const size = Number(stat.size) || 0;
- const start = Math.max(0, size - maxBytes);
- const bytesToRead = size - start;
- if (bytesToRead <= 0) return "";
- const file = await fs.open(filePath, "r");
- try {
- const buffer = Buffer.alloc(bytesToRead);
- await file.read(buffer, 0, bytesToRead, start);
- return buffer.toString("utf8");
- } finally {
- await file.close();
- }
- }
- async function extractUserMessagesFromTranscript(stateDir, event) {
- const transcriptPath = await resolveSessionTranscriptPath(stateDir, event);
- if (!transcriptPath) return [];
- try {
- const text = await readFileTail(transcriptPath, SESSION_TAIL_BYTES);
- const lines = text.split("\n").map((line) => line.trim()).filter(Boolean);
- const messages = [];
- const seen = new Set();
- for (let i = 0; i < lines.length; i += 1) {
- const line = lines[i];
- let row;
- try {
- row = JSON.parse(line);
- } catch {
- continue;
- }
- if (row?.type !== "message") continue;
- if (row?.message?.role !== "user") continue;
- const content = extractTextFromMessageContent(row?.message?.content);
- if (!content) continue;
- const timestamp = Date.parse(row.timestamp || "");
- const id = safeTrim(row.id);
- const fallbackId = stableHash(`${safeTrim(row.timestamp)}::${content}`);
- const messageId = id || `bootstrap-${fallbackId}`;
- if (seen.has(messageId)) continue;
- seen.add(messageId);
- messages.push({
- messageId,
- content,
- timestamp: Number.isFinite(timestamp) ? timestamp : Date.now(),
- });
- }
- if (messages.length <= SESSION_USER_SCAN_LIMIT) {
- return messages;
- }
- return messages.slice(messages.length - SESSION_USER_SCAN_LIMIT);
- } catch (error) {
- await appendLog(stateDir, `WARN transcript parse failed: ${String(error.message || error)}`);
- }
- return [];
- }
- async function closeConversationIfPresent(event, runtime, state) {
- const sessionKey = safeTrim(event?.sessionKey);
- if (!sessionKey) return false;
- const sessionState = state.sessions[sessionKey];
- if (!sessionState?.rootTaskId) return false;
- const endTime = toIsoFromEvent(event);
- try {
- await requestJson(
- runtime.apiBaseUrl,
- `/api/admin/nodes/${encodeURIComponent(sessionState.rootTaskId)}`,
- "PUT",
- {
- status: "completed",
- timestamps: {
- completed: endTime,
- },
- notes: oneLine(`${safeTrim(sessionState.notes) || ""} Conversation closed by /${event.action}.`),
- },
- {
- Authorization: `Bearer ${runtime.adminToken}`,
- },
- );
- } catch (error) {
- await appendLog(runtime.stateDir, `WARN close root failed session=${sessionKey}: ${String(error.message || error)}`);
- }
- const lastIndex = Number(sessionState.index) || Number(state.counters[sessionKey]) || 0;
- state.counters[sessionKey] = Math.max(1, lastIndex);
- delete state.sessions[sessionKey];
- state.updatedAt = nowIso();
- return true;
- }
- async function ensureRootTask(event, runtime, state) {
- const sessionKey = safeTrim(event?.sessionKey);
- if (!sessionKey) return null;
- const existing = state.sessions[sessionKey];
- if (existing?.rootTaskId) {
- return existing;
- }
- const index = (Number(state.counters[sessionKey]) || 0) + 1;
- const preview = truncate(oneLine(event?.context?.content || ""), 26) || "new conversation";
- const rootName = `Conversation #${index} · ${preview}`;
- const startedAt = toIsoFromEvent(event);
- const created = await requestJson(
- runtime.apiBaseUrl,
- "/api/admin/nodes",
- "POST",
- {
- conversationId: sessionKey,
- name: rootName,
- nodeType: "prompt",
- status: "in-progress",
- content: oneLine(event?.context?.content || ""),
- timestamps: {
- created: startedAt,
- started: startedAt,
- },
- plannedApproach: "Auto-synced from OpenClaw conversation events.",
- notes: `sessionKey=${sessionKey}`,
- },
- {
- Authorization: `Bearer ${runtime.adminToken}`,
- },
- );
- const sessionState = {
- rootTaskId: String(created.id),
- index,
- startedAt,
- notes: `sessionKey=${sessionKey}`,
- };
- state.sessions[sessionKey] = sessionState;
- state.updatedAt = nowIso();
- return sessionState;
- }
- async function appendMessageChild(event, runtime, state, sessionState) {
- const content = oneLine(event?.context?.content || "");
- if (!content) return false;
- if (runtime.ignoreSlashCommands && content.startsWith("/")) return false;
- const atIso = toIsoFromEvent(event);
- const dedupKey = buildMessageDedupKey(event, content, atIso);
- if (state.messages[dedupKey]) return false;
- const channelId = safeTrim(event?.context?.channelId) || "unknown";
- const messageId = safeTrim(event?.context?.messageId) || "n/a";
- const childName = `Turn · ${truncate(content, 36)}`;
- const notes = [
- `content=${content}`,
- `messageId=${messageId}`,
- `channelId=${channelId}`,
- `sessionKey=${safeTrim(event?.sessionKey) || "unknown"}`,
- ].join("\n");
- await requestJson(
- runtime.apiBaseUrl,
- "/api/admin/nodes",
- "POST",
- {
- conversationId: safeTrim(event?.sessionKey) || "unknown-session",
- name: childName,
- parentId: sessionState.rootTaskId,
- nodeType: "thought",
- status: "completed",
- content,
- timestamps: {
- created: atIso,
- started: atIso,
- completed: atIso,
- },
- notes,
- },
- {
- Authorization: `Bearer ${runtime.adminToken}`,
- },
- );
- state.messages[dedupKey] = nowIso();
- cleanupSeenMessages(state, runtime.maxRemembered);
- state.updatedAt = nowIso();
- return true;
- }
- async function reconcileTranscriptMessages(event, runtime, state, stateDir) {
- let createdAny = false;
- const transcriptMessages = await extractUserMessagesFromTranscript(stateDir, event);
- if (!transcriptMessages.length) return false;
- for (const item of transcriptMessages) {
- const syntheticEvent = {
- ...event,
- type: "message",
- action: "received",
- context: {
- ...(event?.context || {}),
- content: item.content,
- messageId: item.messageId,
- timestamp: item.timestamp,
- channelId: "agent-bootstrap",
- },
- };
- const sessionState = await ensureRootTask(syntheticEvent, runtime, state);
- if (!sessionState) continue;
- const created = await appendMessageChild(syntheticEvent, runtime, state, sessionState);
- createdAny = createdAny || created;
- }
- return createdAny;
- }
- function scheduleDeferredReconcile(event, runtime) {
- if (event?.context?._taskBoardDeferred) return;
- for (const delayMs of DEFERRED_RECONCILE_DELAYS_MS) {
- const timer = setTimeout(() => {
- void (async () => {
- try {
- const deferredState = normalizeState(await readJson(runtime.stateFile, {}));
- const deferredEvent = {
- ...event,
- context: {
- ...(event?.context || {}),
- _taskBoardDeferred: true,
- },
- };
- const createdAny = await reconcileTranscriptMessages(deferredEvent, runtime, deferredState, runtime.stateDir);
- if (createdAny) {
- await writeJsonAtomic(runtime.stateFile, deferredState);
- }
- } catch (error) {
- const message = error instanceof Error ? error.message : String(error);
- await appendLog(runtime.stateDir, `WARN deferred reconcile failed: ${message}`);
- }
- })();
- }, delayMs);
- if (typeof timer?.unref === "function") {
- timer.unref();
- }
- }
- }
- export default async function taskBoardRealtimeHook(event) {
- const stateDir = resolveStateDir();
- const configPath = resolveConfigPath(stateDir);
- const config = await readJson(configPath, {});
- const runtime = resolveHookConfig(config, stateDir);
- runtime.stateDir = stateDir;
- const state = normalizeState(await readJson(runtime.stateFile, {}));
- try {
- if (event?.type === "command" && (event?.action === "new" || event?.action === "reset")) {
- const changed = await closeConversationIfPresent(event, runtime, state);
- if (changed) {
- await writeJsonAtomic(runtime.stateFile, state);
- }
- return;
- }
- const isMessageReceived = event?.type === "message" && event?.action === "received";
- const shouldScanTranscript =
- (event?.type === "agent" && event?.action === "bootstrap") ||
- (event?.type === "message" && event?.action === "sent") ||
- (event?.type === "command" && event?.action === "stop");
- if (!isMessageReceived) {
- if (!shouldScanTranscript) {
- return;
- }
- const createdAny = await reconcileTranscriptMessages(event, runtime, state, stateDir);
- if (createdAny) {
- await writeJsonAtomic(runtime.stateFile, state);
- }
- scheduleDeferredReconcile(event, runtime);
- return;
- }
- const sessionState = await ensureRootTask(event, runtime, state);
- if (!sessionState) return;
- const created = await appendMessageChild(event, runtime, state, sessionState);
- if (created) {
- await writeJsonAtomic(runtime.stateFile, state);
- }
- } catch (error) {
- const message = error instanceof Error ? error.message : String(error);
- await appendLog(stateDir, `ERROR event=${event?.type || "unknown"}:${event?.action || "unknown"} ${message}`);
- }
- }
|