#!/usr/bin/env python3
"""Shared helpers for the RobotDaily arXiv digest skill."""

from __future__ import annotations

import json
import os
import re
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from zoneinfo import ZoneInfo

# Directory layout is derived from this file's location: the skill directory
# is one level above the directory that contains this module.
SKILL_DIR = Path(__file__).resolve().parents[1]
ROOT_DIR = SKILL_DIR.parent
DEFAULT_OUTPUT_DIR = SKILL_DIR / "output"
DEFAULT_LOG_DIR = SKILL_DIR / "logs"
LOCAL_TZ = ZoneInfo("Asia/Shanghai")
OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://127.0.0.1:11434/api/generate")


def now_local() -> datetime:
    """Return the current time as an aware datetime in ``LOCAL_TZ``."""
    return datetime.now(LOCAL_TZ)


def log(message: str) -> None:
    """Write *message* to stderr prefixed with the local ``HH:MM:SS`` time."""
    timestamp = now_local().strftime("%H:%M:%S")
    print(f"[{timestamp}] {message}", file=sys.stderr)


def ensure_dir(path: Path | str) -> Path:
    """Create *path* (and any missing parents) and return it as a ``Path``."""
    path_obj = Path(path)
    path_obj.mkdir(parents=True, exist_ok=True)
    return path_obj


def normalize_space(text: str) -> str:
    """Collapse every whitespace run in *text* to one space and trim the ends.

    Falsy input (``None``, ``""``) yields ``""``; other values are passed
    through ``str()`` first.
    """
    return re.sub(r"\s+", " ", str(text or "")).strip()


def slugify(text: str) -> str:
    """Return a lowercase ASCII slug for *text*.

    Runs of characters other than ASCII letters and digits become a single
    ``-``; leading/trailing dashes are removed. When nothing remains the
    fallback slug ``"digest"`` is returned.
    """
    slug = re.sub(r"[^a-zA-Z0-9]+", "-", str(text or "").strip().lower()).strip("-")
    return slug or "digest"


def canonical_arxiv_id(raw: str) -> str:
    """Reduce an arXiv reference to a bare identifier without version suffix.

    Keeps only the text after the last ``/``, removes any ``arXiv:`` prefix
    and strips a trailing ``vN``. Empty input yields ``""``.

    NOTE(review): because only the final path segment is kept, an old-style
    identifier such as ``hep-th/9901001`` loses its category here — confirm
    whether such identifiers can reach this helper.
    """
    text = normalize_space(raw)
    if not text:
        return ""
    text = text.rsplit("/", 1)[-1]
    text = text.replace("arXiv:", "")
    return re.sub(r"v\d+$", "", text)


def canonical_doi(arxiv_id: str, doi: str = "") -> str:
    """Return a bare DOI, preferring *doi* and falling back to *arxiv_id*.

    A non-empty *doi* has ``https://doi.org/``, ``http://doi.org/`` and
    ``doi:`` removed. Otherwise the DOI is built as
    ``10.48550/arXiv.<id>`` from the canonical arXiv id, or ``""`` when that
    id is empty too.
    """
    clean = normalize_space(doi)
    if clean:
        clean = clean.replace("https://doi.org/", "").replace("http://doi.org/", "")
        clean = clean.replace("doi:", "")
        return clean.strip()
    arxiv_clean = canonical_arxiv_id(arxiv_id)
    return f"10.48550/arXiv.{arxiv_clean}" if arxiv_clean else ""


def canonical_doi_url(arxiv_id: str, doi: str = "") -> str:
    """Return the ``https://doi.org/`` URL for the canonical DOI, or ``""``."""
    clean_doi = canonical_doi(arxiv_id, doi)
    return f"https://doi.org/{clean_doi}" if clean_doi else ""


def build_arxiv_urls(arxiv_id: str) -> Dict[str, str]:
    """Return ``abs_url`` and ``pdf_url`` for *arxiv_id*.

    Both values are empty strings when the id canonicalizes to ``""``.
    """
    clean = canonical_arxiv_id(arxiv_id)
    if not clean:
        return {"abs_url": "", "pdf_url": ""}
    return {
        "abs_url": f"https://arxiv.org/abs/{clean}",
        "pdf_url": f"https://arxiv.org/pdf/{clean}.pdf",
    }


def read_json(path: Path | str, default: Any = None) -> Any:
    """Load UTF-8 JSON from *path*; return *default* when the path is absent.

    Parse errors from ``json.load`` are not caught here.
    """
    path_obj = Path(path)
    if not path_obj.exists():
        return default
    with path_obj.open("r", encoding="utf-8") as handle:
        return json.load(handle)


def write_json(path: Path | str, data: Any) -> Path:
    """Serialize *data* to *path* as indented UTF-8 JSON and return the path.

    The parent directory is created when missing; non-ASCII characters are
    written verbatim (``ensure_ascii=False``).
    """
    path_obj = Path(path)
    ensure_dir(path_obj.parent)
    with path_obj.open("w", encoding="utf-8") as handle:
        json.dump(data, handle, ensure_ascii=False, indent=2)
    return path_obj


def write_text(path: Path | str, content: str) -> Path:
    """Write *content* to *path* as UTF-8, creating the parent directory."""
    path_obj = Path(path)
    ensure_dir(path_obj.parent)
    path_obj.write_text(content, encoding="utf-8")
    return path_obj


def load_env(env_file: Path | str | None = None) -> Dict[str, str]:
    """Return a copy of ``os.environ`` extended with values from a .env file.

    *env_file* defaults to ``SKILL_DIR / ".env"``. Blank lines, lines that
    start with ``#`` and lines without ``=`` are skipped. Surrounding double
    and single quotes are stripped from values. Variables already present in
    the process environment are never overridden by the file.
    """
    env = dict(os.environ)
    env_path = Path(env_file) if env_file else SKILL_DIR / ".env"
    if env_path.exists():
        for line in env_path.read_text(encoding="utf-8").splitlines():
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, value = line.split("=", 1)
            key = key.strip()
            value = value.strip().strip('"').strip("'")
            # setdefault: the real environment wins over the file.
            env.setdefault(key, value)
    return env


def extract_json_object(text: str) -> Optional[Dict[str, Any]]:
    """Extract the JSON object embedded in *text*, if there is one.

    The candidate is the span from the first ``{`` to the last ``}``
    (greedy match across newlines). Returns ``None`` when *text* is empty,
    no such span exists, the span is not valid JSON, or the decoded value is
    not a dict.
    """
    if not text:
        return None
    match = re.search(r"\{.*\}", text, re.DOTALL)
    if not match:
        return None
    try:
        payload = json.loads(match.group(0))
    except (ValueError, RecursionError):
        # json.JSONDecodeError is a ValueError; RecursionError covers
        # pathologically nested input. Both mean "no usable object".
        return None
    return payload if isinstance(payload, dict) else None


def ollama_generate_json(prompt: str, model: str, timeout: int = 120) -> Optional[Dict[str, Any]]:
    """POST *prompt* to ``OLLAMA_URL`` and return the JSON object it answers with.

    The request asks for a non-streamed, JSON-formatted completion. The
    ``response`` field of the reply is run through ``extract_json_object``.
    Every failure (HTTP error, connection error, timeout, malformed reply)
    is logged and reported as ``None`` instead of being raised.
    """
    body = json.dumps(
        {
            "model": model,
            "prompt": prompt,
            "stream": False,
            "format": "json",
            "think": False,
            "options": {"temperature": 0.1, "num_predict": 800},
        }
    ).encode("utf-8")
    request = Request(url=OLLAMA_URL, data=body, method="POST")
    request.add_header("Content-Type", "application/json")
    try:
        with urlopen(request, timeout=timeout) as response:
            payload = json.loads(response.read().decode("utf-8", errors="ignore"))
        return extract_json_object(payload.get("response", ""))
    except HTTPError as exc:
        # HTTPError subclasses URLError, so it must be handled first in order
        # to include the response body in the log line.
        detail = ""
        try:
            detail = exc.read().decode("utf-8", errors="ignore")
        except Exception:
            detail = ""
        log(f"Ollama request failed: {exc} {detail}".strip())
        return None
    except (URLError, TimeoutError) as exc:
        log(f"Ollama request failed: {exc}")
        return None
    except Exception as exc:
        # Best-effort boundary: anything else (bad JSON, unexpected payload
        # shape) is logged and turned into None.
        log(f"Ollama parse failed: {exc}")
        return None


class CommandError(RuntimeError):
    """Raised when a subprocess fails or prints output that is not valid JSON."""


def run_command(args: List[str], cwd: Path | str | None = None) -> subprocess.CompletedProcess[str]:
    """Run *args* without a shell, capturing stdout/stderr as text.

    Raises:
        CommandError: when the exit code is non-zero. The message is the
            stripped stderr, else the stripped stdout, else
            ``"exit code N"``.
    """
    result = subprocess.run(
        args,
        cwd=str(cwd) if cwd else None,
        capture_output=True,
        text=True,
        check=False,
    )
    if result.returncode != 0:
        stderr = result.stderr.strip()
        stdout = result.stdout.strip()
        detail = stderr or stdout or f"exit code {result.returncode}"
        raise CommandError(detail)
    return result


def run_command_json(args: List[str], cwd: Path | str | None = None) -> Dict[str, Any]:
    """Run *args* and decode its stdout as JSON.

    Empty stdout yields ``{}``. When stdout as a whole is not valid JSON, the
    span from the first ``{`` to the last ``}`` is tried, which tolerates
    extra text printed around the payload.

    Raises:
        CommandError: when the command fails or no valid JSON can be
            recovered; the message carries the first 300 characters of
            stdout.
    """
    result = run_command(args, cwd=cwd)
    stdout = result.stdout.strip()
    if not stdout:
        return {}
    try:
        return json.loads(stdout)
    except json.JSONDecodeError:
        start = stdout.find("{")
        end = stdout.rfind("}")
        if start != -1 and end != -1 and end > start:
            snippet = stdout[start : end + 1]
            try:
                return json.loads(snippet)
            except json.JSONDecodeError as exc:
                raise CommandError(f"Invalid JSON output: {exc}: {stdout[:300]}") from exc
        raise CommandError(f"Invalid JSON output: {stdout[:300]}")


def chunk_lines(lines: Iterable[str], limit: int = 1800) -> List[str]:
    """Pack *lines* into newline-joined chunks of at most *limit* characters.

    Lines are never split: a single line longer than *limit* becomes a chunk
    of its own that exceeds the limit. Empty input yields ``[]``.
    """
    chunks: List[str] = []
    current: List[str] = []
    current_len = 0
    for line in lines:
        safe_line = str(line)
        # +1 accounts for the "\n" that joins this line to the previous one.
        extra = len(safe_line) + (1 if current else 0)
        if current and current_len + extra > limit:
            chunks.append("\n".join(current))
            current = [safe_line]
            current_len = len(safe_line)
            continue
        current.append(safe_line)
        current_len += extra
    if current:
        chunks.append("\n".join(current))
    return chunks


def format_authors(authors: List[str], limit: int = 4) -> str:
    """Join author names with ``", "``, showing at most *limit* of them.

    Names are whitespace-normalized and blank entries dropped. When more than
    *limit* names remain, the first *limit* are shown followed by a suffix
    stating how many were omitted.
    """
    items = [name for name in map(normalize_space, authors) if name]
    if len(items) <= limit:
        return ", ".join(items)
    hidden = len(items) - limit
    return f"{', '.join(items[:limit])} 等另外{hidden}人"


def truncate(text: str, limit: int) -> str:
    """Shorten whitespace-normalized *text* to at most *limit* characters.

    Text that already fits is returned unchanged. Otherwise it is cut to
    ``limit - 1`` characters, right-stripped, and ``…`` is appended. A
    non-positive *limit* yields ``""`` (a negative slice bound would
    otherwise produce a result longer than the limit).
    """
    clean = normalize_space(text)
    if len(clean) <= limit:
        return clean
    if limit <= 0:
        return ""
    return clean[: limit - 1].rstrip() + "…"


# Single-pass translation table, so the "&" introduced by one replacement can
# never be escaped a second time.
_HTML_ESCAPES = str.maketrans(
    {
        "&": "&amp;",
        "<": "&lt;",
        ">": "&gt;",
        '"': "&quot;",
    }
)


def html_escape(text: str) -> str:
    """Escape ``&``, ``<``, ``>`` and ``"`` in *text* for use in HTML.

    Falsy input yields ``""``; other values are passed through ``str()``
    first. Single quotes are left untouched.
    """
    return str(text or "").translate(_HTML_ESCAPES)