| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249 |
- #!/usr/bin/env python3
- """Shared helpers for the RobotDaily arXiv digest skill."""
- from __future__ import annotations
- import json
- import os
- import re
- import subprocess
- import sys
- from datetime import datetime
- from pathlib import Path
- from typing import Any, Dict, Iterable, List, Optional
- from urllib.error import HTTPError, URLError
- from urllib.request import Request, urlopen
- from zoneinfo import ZoneInfo
# Directory layout: SKILL_DIR is the parent of the directory holding this file.
SKILL_DIR = Path(__file__).resolve().parents[1]
ROOT_DIR = SKILL_DIR.parent
DEFAULT_OUTPUT_DIR = SKILL_DIR.joinpath("output")
DEFAULT_LOG_DIR = SKILL_DIR.joinpath("logs")

# Time zone used by log() and now_local().
LOCAL_TZ = ZoneInfo("Asia/Shanghai")

# Ollama generate endpoint; the OLLAMA_URL environment variable overrides it.
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://127.0.0.1:11434/api/generate")
def log(message: str) -> None:
    """Print *message* to stderr, prefixed with the current HH:MM:SS in LOCAL_TZ."""
    clock = datetime.now(tz=LOCAL_TZ).strftime("%H:%M:%S")
    print(f"[{clock}] {message}", file=sys.stderr)
def now_local() -> datetime:
    """Return the current time as a timezone-aware datetime in LOCAL_TZ."""
    return datetime.now(tz=LOCAL_TZ)
def ensure_dir(path: Path | str) -> Path:
    """Create *path* (and any missing parents) as a directory and return it.

    An already existing directory is not an error.
    """
    directory = Path(path)
    directory.mkdir(exist_ok=True, parents=True)
    return directory
def normalize_space(text: str) -> str:
    """Collapse every whitespace run in *text* to one space and trim the ends.

    Falsy input (``None``, ``""``, ``0``) yields an empty string; other
    values are converted with ``str()`` first.
    """
    value = str(text or "")
    collapsed = re.sub(r"\s+", " ", value)
    return collapsed.strip()
def slugify(text: str) -> str:
    """Turn *text* into a lowercase, hyphen-separated ASCII slug.

    Every run of characters outside ``[a-zA-Z0-9]`` becomes a single ``-``,
    and leading/trailing hyphens are removed. If nothing is left, the
    fallback slug ``"digest"`` is returned.
    """
    lowered = str(text or "").strip().lower()
    hyphenated = re.sub(r"[^a-zA-Z0-9]+", "-", lowered)
    trimmed = hyphenated.strip("-")
    if trimmed:
        return trimmed
    return "digest"
def canonical_arxiv_id(raw: str) -> str:
    """Reduce an arXiv reference to its bare, version-less identifier.

    Accepts a plain id (``2401.12345v2``), an ``arXiv:``-prefixed id (any
    letter case), or an abs/pdf URL, including ``.pdf`` URLs with a
    trailing slash, query string or fragment.

    Args:
        raw: Identifier or URL; falsy values are treated as empty.

    Returns:
        The identifier without URL path, ``arXiv:`` prefix, ``.pdf`` suffix
        or ``vN`` version suffix, or ``""`` when *raw* is empty.
    """
    text = normalize_space(raw)
    if not text:
        return ""
    # Drop any query string / fragment (e.g. "?context=cs.RO") before parsing.
    text = re.split(r"[?#]", text, maxsplit=1)[0]
    # A trailing slash would otherwise leave an empty last path segment.
    text = text.rstrip("/")
    # NOTE(review): old-style identifiers (archive/YYMMNNN) keep only the
    # numeric part because everything before the last slash is dropped --
    # confirm whether any caller needs the archive prefix.
    text = text.rsplit("/", 1)[-1]
    text = re.sub(r"arxiv:", "", text, flags=re.IGNORECASE).strip()
    # PDF links end in ".pdf"; remove it so the version suffix is at the end.
    text = re.sub(r"\.pdf$", "", text, flags=re.IGNORECASE)
    return re.sub(r"v\d+$", "", text)
def canonical_doi(arxiv_id: str, doi: str = "") -> str:
    """Return a bare DOI, preferring *doi* and falling back to the arXiv DOI.

    Args:
        arxiv_id: arXiv identifier or URL, used only when *doi* is empty.
        doi: Optional DOI, possibly written as a ``doi.org`` / ``dx.doi.org``
            resolver URL or with a ``doi:`` prefix (any letter case).

    Returns:
        *doi* with its leading resolver URL / ``doi:`` prefix removed when
        *doi* is non-empty; otherwise ``10.48550/arXiv.<id>`` built from
        *arxiv_id*, or ``""`` when neither input yields a value.
    """
    clean = normalize_space(doi)
    if clean:
        # Strip only leading prefixes, case-insensitively, so that text
        # inside the DOI suffix itself is never altered.
        prefix_pattern = r"^(?:https?://(?:dx\.)?doi\.org/|doi:\s*)+"
        return re.sub(prefix_pattern, "", clean, flags=re.IGNORECASE).strip()
    arxiv_clean = canonical_arxiv_id(arxiv_id)
    return f"10.48550/arXiv.{arxiv_clean}" if arxiv_clean else ""
def canonical_doi_url(arxiv_id: str, doi: str = "") -> str:
    """Return the ``https://doi.org/`` URL for the DOI chosen by canonical_doi().

    Returns an empty string when canonical_doi() produces no DOI.
    """
    doi_value = canonical_doi(arxiv_id, doi)
    if not doi_value:
        return ""
    return f"https://doi.org/{doi_value}"
def build_arxiv_urls(arxiv_id: str) -> Dict[str, str]:
    """Build the abstract and PDF URLs for an arXiv identifier.

    Returns:
        A dict with keys ``abs_url`` and ``pdf_url``. Both values are empty
        strings when *arxiv_id* canonicalizes to nothing.
    """
    paper_id = canonical_arxiv_id(arxiv_id)
    abs_url = f"https://arxiv.org/abs/{paper_id}" if paper_id else ""
    pdf_url = f"https://arxiv.org/pdf/{paper_id}.pdf" if paper_id else ""
    return {"abs_url": abs_url, "pdf_url": pdf_url}
def read_json(path: Path | str, default: Any = None) -> Any:
    """Load and return the JSON document stored at *path* (UTF-8).

    Returns *default* when the path does not exist. Malformed JSON is not
    handled here; the decoder's exception propagates to the caller.
    """
    source = Path(path)
    if source.exists():
        return json.loads(source.read_text(encoding="utf-8"))
    return default
def write_json(path: Path | str, data: Any) -> Path:
    """Serialize *data* as indented UTF-8 JSON to *path* and return the path.

    The parent directory is created first if needed. Non-ASCII characters
    are written as-is rather than escaped.
    """
    target = Path(path)
    ensure_dir(target.parent)
    with target.open(mode="w", encoding="utf-8") as sink:
        json.dump(data, sink, indent=2, ensure_ascii=False)
    return target
def write_text(path: Path | str, content: str) -> Path:
    """Write *content* to *path* as UTF-8, creating parent directories first.

    Returns:
        The destination as a ``Path``.
    """
    target = Path(path)
    ensure_dir(target.parent)
    target.write_text(content, encoding="utf-8")
    return target
def load_env(env_file: Path | str | None = None) -> Dict[str, str]:
    """Return a copy of the process environment, topped up from a dotenv file.

    Args:
        env_file: File to read; defaults to ``.env`` inside SKILL_DIR.

    Returns:
        A new dict. Keys already present in ``os.environ`` keep their value;
        the file only supplies keys that are missing. Blank lines, lines
        starting with ``#`` and lines without ``=`` are skipped. Surrounding
        whitespace and quote characters are stripped from each value.
    """
    merged = dict(os.environ)
    source = Path(env_file) if env_file else SKILL_DIR / ".env"
    if not source.exists():
        return merged
    for raw_line in source.read_text(encoding="utf-8").splitlines():
        entry = raw_line.strip()
        if not entry or entry.startswith("#"):
            continue
        name, separator, raw_value = entry.partition("=")
        if not separator:
            continue
        # setdefault: a value from the real environment always wins.
        merged.setdefault(name.strip(), raw_value.strip().strip('"').strip("'"))
    return merged
def extract_json_object(text: str) -> Optional[Dict[str, Any]]:
    """Extract the first JSON object embedded in *text*.

    Parsing starts at the first ``{`` and stops at the end of that object,
    so prose or stray braces after the object do not break extraction.

    Args:
        text: Arbitrary text that may contain a JSON object.

    Returns:
        The decoded object as a dict, or ``None`` when *text* is empty, is
        not a string, contains no ``{``, or the text starting at the first
        ``{`` is not valid JSON.
    """
    if not text or not isinstance(text, str):
        return None
    start = text.find("{")
    if start == -1:
        return None
    try:
        # raw_decode parses exactly one JSON value and ignores what follows.
        payload, _ = json.JSONDecoder().raw_decode(text, start)
    except (ValueError, RecursionError):
        # ValueError covers JSONDecodeError; RecursionError covers input
        # nested too deeply. This helper never raises on bad input.
        return None
    return payload if isinstance(payload, dict) else None
def ollama_generate_json(prompt: str, model: str, timeout: int = 120) -> Optional[Dict[str, Any]]:
    """POST *prompt* to the Ollama endpoint and return the JSON object it produced.

    The request is non-streaming, asks for JSON output, sets ``think`` to
    false, and uses temperature 0.1 with ``num_predict`` 800.

    Args:
        prompt: Prompt text sent to the model.
        model: Model name placed in the request body.
        timeout: Timeout in seconds passed to ``urlopen``.

    Returns:
        The dict extracted from the reply's ``response`` field, or ``None``
        when the request fails or the reply cannot be parsed. Failures are
        logged via log() rather than raised.
    """
    generation_options = {"temperature": 0.1, "num_predict": 800}
    request_fields = {
        "model": model,
        "prompt": prompt,
        "stream": False,
        "format": "json",
        "think": False,
        "options": generation_options,
    }
    request = Request(
        url=OLLAMA_URL,
        data=json.dumps(request_fields).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urlopen(request, timeout=timeout) as response:
            raw_reply = response.read()
            envelope = json.loads(raw_reply.decode("utf-8", errors="ignore"))
            return extract_json_object(envelope.get("response", ""))
    except HTTPError as exc:
        # The error body is optional extra context; reading it must not fail.
        try:
            detail = exc.read().decode("utf-8", errors="ignore")
        except Exception:
            detail = ""
        log(f"Ollama request failed: {exc} {detail}".strip())
    except (URLError, TimeoutError) as exc:
        log(f"Ollama request failed: {exc}")
    except Exception as exc:
        log(f"Ollama parse failed: {exc}")
    return None
class CommandError(RuntimeError):
    """Raised when an external command exits non-zero or prints invalid JSON."""
def run_command(args: List[str], cwd: Path | str | None = None) -> subprocess.CompletedProcess[str]:
    """Run *args* as a subprocess with captured text output.

    Args:
        args: Command and arguments, passed to ``subprocess.run`` as a list.
        cwd: Optional working directory for the child process.

    Returns:
        The ``CompletedProcess`` when the command exits with status 0.

    Raises:
        CommandError: On a non-zero exit. The message is the stripped
            stderr, else the stripped stdout, else ``exit code N``.
    """
    workdir = str(cwd) if cwd else None
    completed = subprocess.run(
        args,
        cwd=workdir,
        capture_output=True,
        text=True,
        check=False,
    )
    if completed.returncode == 0:
        return completed
    detail = (
        completed.stderr.strip()
        or completed.stdout.strip()
        or f"exit code {completed.returncode}"
    )
    raise CommandError(detail)
def run_command_json(args: List[str], cwd: Path | str | None = None) -> Dict[str, Any]:
    """Run a command via run_command() and decode its stdout as JSON.

    Returns:
        ``{}`` when stdout is empty. Otherwise the decoded stdout. If the
        whole output is not valid JSON, the text between the first ``{``
        and the last ``}`` is decoded instead.

    Raises:
        CommandError: When the command fails, or when neither the full
            output nor the brace-delimited portion is valid JSON. The
            message includes the first 300 characters of stdout.
    """
    output = run_command(args, cwd=cwd).stdout.strip()
    if not output:
        return {}
    try:
        return json.loads(output)
    except json.JSONDecodeError:
        first = output.find("{")
        last = output.rfind("}")
        if first == -1 or last <= first:
            raise CommandError(f"Invalid JSON output: {output[:300]}")
        try:
            return json.loads(output[first : last + 1])
        except json.JSONDecodeError as exc:
            raise CommandError(f"Invalid JSON output: {exc}: {output[:300]}") from exc
def chunk_lines(lines: Iterable[str], limit: int = 1800) -> List[str]:
    """Group *lines* into newline-joined chunks of at most *limit* characters.

    Lines are never split: a single line longer than *limit* becomes a
    chunk of its own that exceeds the limit. Items are converted with
    ``str()`` before measuring.

    Args:
        lines: Lines to pack, in order.
        limit: Maximum chunk length, counting the joining newlines.

    Returns:
        The chunks in input order; an empty list for empty input.
    """
    chunks: List[str] = []
    pending: List[str] = []
    size = 0
    for item in lines:
        piece = str(item)
        # "+ 1" is the newline that would join this piece to the chunk.
        if pending and size + 1 + len(piece) > limit:
            chunks.append("\n".join(pending))
            pending, size = [], 0
        size += len(piece) + (1 if pending else 0)
        pending.append(piece)
    if pending:
        chunks.append("\n".join(pending))
    return chunks
def format_authors(authors: List[str], limit: int = 4) -> str:
    """Join author names with commas, showing at most *limit* of them.

    Names are whitespace-normalized and empty names are dropped. When more
    than *limit* names remain, the first *limit* are listed, followed by a
    Chinese suffix giving the number of names left out.
    """
    names = [name for author in authors if (name := normalize_space(author))]
    overflow = len(names) - limit
    if overflow <= 0:
        return ", ".join(names)
    return f"{', '.join(names[:limit])} 等另外{overflow}人"
def truncate(text: str, limit: int) -> str:
    """Whitespace-normalize *text* and shorten it to at most *limit* characters.

    Args:
        text: Text to shorten; whitespace is normalized first.
        limit: Maximum length of the result, including the ellipsis.

    Returns:
        The normalized text when it fits. Otherwise its first ``limit - 1``
        characters with trailing whitespace removed, plus ``…``. An empty
        string when *limit* is zero or negative.
    """
    clean = normalize_space(text)
    if limit <= 0:
        # Without this guard, `clean[: limit - 1]` would use a negative
        # index and return almost the whole text plus an ellipsis.
        return ""
    if len(clean) <= limit:
        return clean
    return clean[: limit - 1].rstrip() + "…"
def html_escape(text: str) -> str:
    """Escape ``&``, ``<``, ``>`` and ``"`` in *text* for safe use in HTML.

    Args:
        text: Value to escape; falsy values are treated as an empty string
            and other values are converted with ``str()``.

    Returns:
        The text with the four characters replaced by ``&amp;``, ``&lt;``,
        ``&gt;`` and ``&quot;``. Single quotes are left unchanged.
    """
    return (
        str(text or "")
        # "&" must go first so the entities added below are not re-escaped.
        .replace("&", "&amp;")
        .replace("<", "&lt;")
        .replace(">", "&gt;")
        .replace('"', "&quot;")
    )
|