#!/usr/bin/env python3
"""Publish RobotDaily digests to Discord via OpenClaw and optional Discord REST."""

from __future__ import annotations

import argparse
import json
import re
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Tuple
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen

from fetch_arxiv import DOMAIN_CONFIGS
from utils import (
    format_authors,
    log,
    normalize_space,
    now_local,
    read_json,
    run_command_json,
    truncate,
)

# Domains are published in this order; papers from other domains are skipped.
DOMAIN_ORDER = ["embodied", "representation", "reinforcement"]
DISCORD_API = "https://discord.com/api/v10"

# Upper bound on embeds sent in one message (Discord itself allows 10).
_EMBEDS_PER_MESSAGE = 4
# Discord caps the combined text of all embeds in one message at 6000
# characters; stay below that to leave a margin for counting differences.
_EMBED_CHAR_BUDGET = 5500


class PublishError(RuntimeError):
    """Raised when a digest cannot be delivered to Discord."""


def normalize_channel_name(name: str) -> str:
    """Return *name* as a channel slug.

    The name is passed through ``normalize_space``, leading ``#`` characters
    are stripped, the text is lower-cased and whitespace runs become ``-``.
    """
    text = normalize_space(name).lstrip("#").lower()
    return re.sub(r"\s+", "-", text)


class DiscordPublisher:
    """Deliver messages to Discord through the OpenClaw CLI and Discord REST.

    The OpenClaw CLI is used for listing channels/threads, creating threads
    and sending plain messages. Channel creation and embed messages go
    through the Discord REST API and therefore need ``bot_token``.
    """

    def __init__(
        self,
        *,
        openclaw_bin: str,
        account_id: str,
        mode: str,
        guild_id: str,
        parent_channel_id: str,
        target_channel_id: str,
        target_channel_name: str,
        category_id: str,
        bot_token: str,
        thread_auto_archive_min: int,
        dry_run: bool,
    ) -> None:
        """Store the delivery configuration; performs no I/O."""
        self.openclaw_bin = openclaw_bin
        self.account_id = account_id
        self.mode = mode
        self.guild_id = guild_id
        self.parent_channel_id = parent_channel_id
        self.target_channel_id = target_channel_id
        self.target_channel_name = target_channel_name
        self.category_id = category_id
        self.bot_token = bot_token
        self.thread_auto_archive_min = thread_auto_archive_min
        self.dry_run = dry_run

    def openclaw(self, *args: str) -> Dict[str, Any]:
        """Run ``<openclaw_bin> message <args...>`` for the discord channel.

        ``--channel discord --account <account_id> --json`` is always
        appended, plus ``--dry-run`` when the publisher is in dry-run mode.
        Returns whatever ``run_command_json`` returns for that command.
        """
        command = [
            self.openclaw_bin,
            "message",
            *args,
            "--channel",
            "discord",
            "--account",
            self.account_id,
            "--json",
        ]
        if self.dry_run:
            command.append("--dry-run")
        return run_command_json(command)

    def list_channels(self) -> List[Dict[str, Any]]:
        """Return the guild's channels, or ``[]`` when no guild id is set."""
        if not self.guild_id:
            return []
        payload = self.openclaw("channel", "list", "--guild-id", self.guild_id)
        return payload.get("payload", {}).get("channels", [])

    def list_threads(self) -> List[Dict[str, Any]]:
        """Return up to 100 threads (archived included) of the parent channel.

        Returns ``[]`` when either the guild id or parent channel id is unset.
        """
        if not self.guild_id or not self.parent_channel_id:
            return []
        payload = self.openclaw(
            "thread",
            "list",
            "--guild-id",
            self.guild_id,
            "--channel-id",
            self.parent_channel_id,
            "--include-archived",
            "--limit",
            "100",
        )
        return payload.get("payload", {}).get("threads", {}).get("threads", [])

    def find_existing_channel(self, name: str) -> Optional[str]:
        """Return the id of a channel matching *name*, or ``None``.

        A channel matches when its name equals the space-normalized *name*
        or when both names produce the same ``normalize_channel_name`` slug.
        """
        wanted_exact = normalize_space(name)
        wanted_normalized = normalize_channel_name(name)
        for channel in self.list_channels():
            current_name = str(channel.get("name", ""))
            if (
                current_name == wanted_exact
                or normalize_channel_name(current_name) == wanted_normalized
            ):
                return str(channel.get("id", ""))
        return None

    def find_existing_thread(self, name: str) -> Optional[str]:
        """Return the id of the thread named exactly *name*, or ``None``."""
        for thread in self.list_threads():
            if thread.get("name") == name:
                return str(thread.get("id", ""))
        return None

    def _post_json(self, path: str, body: Dict[str, Any]) -> Dict[str, Any]:
        """POST *body* as JSON to ``DISCORD_API + path`` using the bot token.

        Returns the decoded JSON response. ``HTTPError`` / ``URLError`` are
        left to the caller so each call site can word its own failure.
        """
        request = Request(
            url=f"{DISCORD_API}{path}",
            method="POST",
            data=json.dumps(body).encode("utf-8"),
            headers={
                "Authorization": f"Bot {self.bot_token}",
                "Content-Type": "application/json",
            },
        )
        with urlopen(request, timeout=30) as response:
            return json.loads(response.read().decode("utf-8", errors="ignore"))

    def create_channel_via_rest(self, name: str, topic: str = "") -> str:
        """Create a text channel in the guild and return its id.

        In dry-run mode nothing is sent and a placeholder id is returned.

        Raises:
            PublishError: if the guild id or bot token is missing, or the
                REST call fails (HTTP error or network failure).
        """
        if not self.guild_id:
            raise PublishError("channel 模式需要 DISCORD_GUILD_ID")
        slug = normalize_channel_name(name) or "robotdaily"
        if self.dry_run:
            return f"dry-run-channel-{slug}"
        if not self.bot_token:
            raise PublishError(
                "创建 Discord 频道需要 DISCORD_BOT_TOKEN;当前 OpenClaw CLI 版本没有公开 channel create 子命令"
            )

        # type 0 is what the original request sends for the new channel.
        body: Dict[str, Any] = {"name": slug, "type": 0}
        if self.category_id:
            body["parent_id"] = self.category_id
        if topic:
            body["topic"] = topic

        try:
            payload = self._post_json(f"/guilds/{self.guild_id}/channels", body)
        except HTTPError as exc:
            detail = exc.read().decode("utf-8", errors="ignore")
            raise PublishError(
                f"Discord channel create failed: {exc.code} {detail}"
            ) from exc
        except URLError as exc:
            # Network-level failure (DNS, refused connection, ...): surface it
            # as the module's own error type instead of a raw URLError.
            raise PublishError(
                f"Discord channel create failed: {exc.reason}"
            ) from exc
        return str(payload.get("id", ""))

    def create_thread(self, thread_name: str, opening_message: str) -> str:
        """Return the id of the thread *thread_name*, creating it if needed.

        An existing thread with the same name is reused. Otherwise the thread
        is created under the parent channel with *opening_message*.

        Raises:
            PublishError: if no parent channel id is configured, or OpenClaw
                returns no thread id outside dry-run mode.
        """
        if not self.parent_channel_id:
            raise PublishError("thread 模式需要 DISCORD_PARENT_CHANNEL_ID")
        existing = self.find_existing_thread(thread_name)
        if existing:
            return existing
        payload = self.openclaw(
            "thread",
            "create",
            "--target",
            f"channel:{self.parent_channel_id}",
            "--thread-name",
            thread_name,
            "--message",
            opening_message,
            "--auto-archive-min",
            str(self.thread_auto_archive_min),
        )
        thread = payload.get("payload", {}).get("thread", {})
        thread_id = str(thread.get("id", ""))
        if thread_id:
            return thread_id
        if self.dry_run:
            return f"dry-run-thread-{thread_name}"
        raise PublishError("OpenClaw thread create 没有返回 thread id")

    def create_fixed_channel(self, title: str) -> str:
        """Return the id of the fixed target channel, creating it if absent.

        The channel name comes from ``target_channel_name`` (default
        ``robotdaily``); *title*, truncated to 180, becomes the topic of a
        newly created channel.

        Raises:
            PublishError: if creation fails or yields an empty id.
        """
        channel_name = (
            normalize_channel_name(self.target_channel_name or "robotdaily")
            or "robotdaily"
        )
        existing = self.find_existing_channel(channel_name)
        if existing:
            return existing
        channel_id = self.create_channel_via_rest(
            channel_name, topic=truncate(title, 180)
        )
        if not channel_id:
            raise PublishError("Discord fixed channel create 返回空 id")
        return channel_id

    def create_or_resolve_target(self, title: str, opening_message: str) -> str:
        """Return the channel/thread id to publish into for ``self.mode``.

        Modes:
            existing-channel: use ``target_channel_id`` as-is.
            fixed-channel: find or create one channel with a fixed name.
            thread: find or create a per-day thread ``RobotDaily <date>``.
            channel: find or create a per-day channel ``<prefix>-<date>``.

        Raises:
            PublishError: on unknown mode or missing/failed configuration.
        """
        date_slug = now_local().strftime("%Y-%m-%d")

        if self.mode == "existing-channel":
            if not self.target_channel_id:
                raise PublishError(
                    "existing-channel 模式需要 DISCORD_TARGET_CHANNEL_ID"
                )
            return self.target_channel_id

        if self.mode == "fixed-channel":
            return self.create_fixed_channel(title)

        if self.mode == "thread":
            return self.create_thread(f"RobotDaily {date_slug}", opening_message)

        if self.mode == "channel":
            prefix = (
                normalize_channel_name(self.target_channel_name or "robotdaily")
                or "robotdaily"
            )
            channel_name = f"{prefix}-{date_slug}"
            existing = self.find_existing_channel(channel_name)
            if existing:
                return existing
            channel_id = self.create_channel_via_rest(
                channel_name, topic=truncate(title, 180)
            )
            if not channel_id:
                raise PublishError("Discord channel create 返回空 id")
            return channel_id

        raise PublishError(f"未知的投递模式: {self.mode}")

    def send_embeds_via_rest(
        self,
        target_channel_id: str,
        content: str = "",
        embeds: Optional[List[Dict[str, Any]]] = None,
    ) -> Dict[str, Any]:
        """Send a message with optional *content* and *embeds* via REST.

        In dry-run mode the call is only logged and a placeholder message
        dict is returned. Otherwise returns the decoded REST response.

        Raises:
            PublishError: if the bot token is missing or the REST call fails
                (HTTP error or network failure).
        """
        if self.dry_run:
            log(
                f"[Dry Run] send_embeds_via_rest: channel {target_channel_id}, embeds count {len(embeds or [])}"
            )
            return {"id": "dry-run-msg-id"}
        if not self.bot_token:
            raise PublishError("发送 Embed 需要 DISCORD_BOT_TOKEN")

        body: Dict[str, Any] = {}
        if content:
            body["content"] = content
        if embeds:
            body["embeds"] = embeds

        try:
            return self._post_json(f"/channels/{target_channel_id}/messages", body)
        except HTTPError as exc:
            detail = exc.read().decode("utf-8", errors="ignore")
            log(f"REST API 错误: {exc.code} {exc.reason} - {detail}")
            raise PublishError(
                f"创建 Discord 消息失败: {exc.code} {detail}"
            ) from exc
        except URLError as exc:
            # Network-level failure: report it the same way as HTTP failures.
            log(f"REST API 错误: {exc.reason}")
            raise PublishError(f"创建 Discord 消息失败: {exc.reason}") from exc

    def send_message(
        self, target_channel_id: str, message: str, media: str = ""
    ) -> Dict[str, Any]:
        """Send a plain message (optionally with *media*) through OpenClaw."""
        args = [
            "send",
            "--target",
            f"channel:{target_channel_id}",
            "--message",
            message,
        ]
        if media:
            args.extend(["--media", media])
        return self.openclaw(*args)


def build_opening_message(payload: Dict[str, Any]) -> str:
    """Build the one-line greeting with total and per-domain paper counts.

    Domains with a zero/missing count in ``payload["counts"]`` are omitted.
    """
    total = len(payload.get("papers", []))
    counts = payload.get("counts", {})
    parts = [f"老大早安~今天给你挑了 {total} 篇偏应用论文。"]
    for domain in DOMAIN_ORDER:
        count = counts.get(domain, 0)
        if count:
            parts.append(f"{DOMAIN_CONFIGS[domain]['label_zh']} {count} 篇")
    parts.append("下面每张卡片都带 DOI / arXiv / PDF,可直接点开读。")
    return " | ".join(parts)


def build_domain_header(domain: str, count: int) -> str:
    """Return the markdown heading shown above a domain's paper cards."""
    return f"## {DOMAIN_CONFIGS[domain]['label_zh']}({count} 篇)"


def build_paper_embed(paper: Dict[str, Any]) -> Dict[str, Any]:
    """Build one Discord embed dict describing *paper*.

    The title is capped at 256 characters and the description at 4096.
    ``url`` is only included when the paper has a non-empty ``abs_url``.
    """
    # A missing or null "tags" value is treated as "no tags"; show at most 6.
    tags = " ".join(f"`{tag}`" for tag in (paper.get("tags") or [])[:6])

    # Title max 256
    title = f"{paper.get('domain_rank', '?')}. {paper.get('title', '')}"
    if len(title) > 256:
        title = title[:253] + "..."

    description_lines = [
        f"**作者:** {format_authors(paper.get('authors', []), limit=4)}"
    ]
    if tags:
        description_lines.append(f"**关键词:** {tags}")

    brief = paper.get("brief_explanation_zh", "")
    if brief:
        description_lines.append(f"\n**💡 简析**\n{brief}")

    abstract = truncate(paper.get("translated_abstract_zh", ""), 700)
    if abstract:
        description_lines.append(f"\n**📖 摘要**\n{abstract}")

    links = [
        f"[{label}]({paper.get(key)})"
        for label, key in (("DOI", "doi_url"), ("arXiv", "abs_url"), ("PDF", "pdf_url"))
        if paper.get(key)
    ]
    if links:
        description_lines.append(f"\n**🔗 链接:** {' | '.join(links)}")

    description = "\n".join(description_lines)
    if len(description) > 4096:
        description = description[:4093] + "..."

    embed: Dict[str, Any] = {
        "title": title,
        "description": description,
        "color": 0x3498DB,
    }
    # An empty string is not a usable link target, so leave the key out
    # entirely when the paper has no abstract URL.
    abs_url = paper.get("abs_url")
    if abs_url:
        embed["url"] = abs_url
    return embed


def _chunk_embeds(
    pairs: List[Tuple[Dict[str, Any], Dict[str, Any]]],
    *,
    max_embeds: int = _EMBEDS_PER_MESSAGE,
    max_chars: int = _EMBED_CHAR_BUDGET,
) -> Iterator[List[Tuple[Dict[str, Any], Dict[str, Any]]]]:
    """Yield batches of ``(paper, embed)`` pairs that fit in one message.

    A batch holds at most *max_embeds* embeds and its combined title and
    description length stays within *max_chars*. A single oversized embed
    still gets a batch of its own so that nothing is dropped.
    """
    batch: List[Tuple[Dict[str, Any], Dict[str, Any]]] = []
    used = 0
    for paper, embed in pairs:
        size = len(str(embed.get("title", ""))) + len(
            str(embed.get("description", ""))
        )
        if batch and (len(batch) >= max_embeds or used + size > max_chars):
            yield batch
            batch, used = [], 0
        batch.append((paper, embed))
        used += size
    if batch:
        yield batch


def publish_digest(
    payload: Dict[str, Any],
    *,
    html_path: str = "",
    markdown_path: str = "",
    publisher: DiscordPublisher,
) -> str:
    """Publish the whole digest and return the target channel/thread id.

    Sends, in order: the opening message (with the HTML file attached when
    *html_path* is given), each domain's papers in ``DOMAIN_ORDER``, and
    finally the Markdown archive when *markdown_path* is given. Papers are
    sent as embeds through REST when the publisher has a bot token, or as
    plain "title + link" messages through OpenClaw otherwise.
    """
    opening_message = build_opening_message(payload)
    # The opening message doubles as the title/topic of a newly created target.
    target_channel_id = publisher.create_or_resolve_target(
        opening_message, opening_message
    )

    attached_message = (
        opening_message
        + "\n\n今天起换成了全新卡片式排版,直接在 Discord 里看中译摘要和核心内容啦!"
    )
    if html_path:
        publisher.send_message(target_channel_id, attached_message, media=html_path)
    else:
        publisher.send_message(target_channel_id, attached_message)

    grouped: Dict[str, List[Dict[str, Any]]] = {domain: [] for domain in DOMAIN_ORDER}
    skipped = 0
    for paper in payload.get("papers", []):
        bucket = grouped.get(paper.get("domain"))
        if bucket is None:
            # Only domains in DOMAIN_ORDER are published; count the rest so
            # the omission is visible in the log instead of silent.
            skipped += 1
            continue
        bucket.append(paper)
    if skipped:
        log(f"Skipped {skipped} paper(s) with a missing or unknown domain")

    for domain in DOMAIN_ORDER:
        papers = grouped[domain]
        if not papers:
            continue

        pairs = [(paper, build_paper_embed(paper)) for paper in papers]
        # Discord limit is 10 embeds per message; batches are kept smaller and
        # additionally bounded by the combined embed text size.
        for index, batch in enumerate(_chunk_embeds(pairs)):
            # Print domain header on the first chunk
            msg_content = build_domain_header(domain, len(papers)) if index == 0 else ""

            if publisher.bot_token:
                # Use REST API to send rich embeds
                publisher.send_embeds_via_rest(
                    target_channel_id,
                    content=msg_content,
                    embeds=[embed for _, embed in batch],
                )
                continue

            # Fallback to plain text if no bot token
            if msg_content:
                publisher.send_message(target_channel_id, msg_content)
            for paper, _ in batch:
                fallback_text = f"**{paper.get('title')}**\n{paper.get('abs_url')}"
                publisher.send_message(target_channel_id, fallback_text)

    if markdown_path:
        publisher.send_message(
            target_channel_id,
            "附一份 Markdown 归档版,桌面端检索会更方便。",
            media=markdown_path,
        )
    return target_channel_id


def main() -> None:
    """Parse CLI arguments, load the digest JSON and publish it."""
    parser = argparse.ArgumentParser(description="Publish RobotDaily digest to Discord")
    parser.add_argument("--input", required=True)
    parser.add_argument("--html", default="")
    parser.add_argument("--markdown", default="")
    parser.add_argument("--mode", default="thread")
    parser.add_argument("--openclaw-bin", default="openclaw")
    parser.add_argument("--account-id", default="codex")
    parser.add_argument("--guild-id", default="")
    parser.add_argument("--parent-channel-id", default="")
    parser.add_argument("--target-channel-id", default="")
    parser.add_argument("--target-channel-name", default="")
    parser.add_argument("--category-id", default="")
    parser.add_argument("--bot-token", default="")
    parser.add_argument("--thread-auto-archive-min", type=int, default=10080)
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()

    # A missing or empty input file yields an empty digest rather than a crash.
    payload = read_json(args.input, default={}) or {}
    publisher = DiscordPublisher(
        openclaw_bin=args.openclaw_bin,
        account_id=args.account_id,
        mode=args.mode,
        guild_id=args.guild_id,
        parent_channel_id=args.parent_channel_id,
        target_channel_id=args.target_channel_id,
        target_channel_name=args.target_channel_name,
        category_id=args.category_id,
        bot_token=args.bot_token,
        thread_auto_archive_min=args.thread_auto_archive_min,
        dry_run=args.dry_run,
    )
    target = publish_digest(
        payload,
        html_path=args.html,
        markdown_path=args.markdown,
        publisher=publisher,
    )
    log(f"Digest published to Discord target: {target}")


if __name__ == "__main__":
    main()