#!/usr/bin/env python3 """Fetch recent arXiv papers for RobotDaily domains.""" from __future__ import annotations import argparse import json import math import xml.etree.ElementTree as ET from collections import defaultdict from datetime import datetime, timedelta, timezone from typing import Any, Dict, List from urllib.parse import urlencode from urllib.request import urlopen from utils import ( LOCAL_TZ, build_arxiv_urls, canonical_arxiv_id, canonical_doi, canonical_doi_url, ensure_dir, log, normalize_space, now_local, write_json, ) ATOM_NS = {"atom": "http://www.w3.org/2005/Atom", "arxiv": "http://arxiv.org/schemas/atom"} API_URL = "https://export.arxiv.org/api/query" DOMAIN_CONFIGS: Dict[str, Dict[str, Any]] = { "embodied": { "label_zh": "具身智能", "query": "(cat:cs.RO OR cat:cs.AI OR cat:cs.LG OR cat:cs.CV) AND (all:robot OR all:embodied OR all:humanoid OR all:manipulation OR all:navigation OR all:locomotion OR all:grasp)", "categories": {"cs.RO": 3.0, "cs.AI": 1.2, "cs.LG": 0.8, "cs.CV": 0.5}, "keywords": { "embodied": 2.5, "robot": 2.5, "robotics": 2.0, "humanoid": 2.0, "manipulation": 2.0, "navigation": 1.8, "locomotion": 1.8, "grasp": 1.8, "grasping": 1.8, "sim2real": 1.6, "physical": 1.0, "contact-rich": 1.2, "real robot": 2.0, }, }, "representation": { "label_zh": "表征学习", "query": "(cat:cs.LG OR cat:cs.CV OR cat:cs.AI OR cat:cs.RO) AND (all:\"representation learning\" OR all:representation OR all:latent OR all:embedding OR all:\"world model\" OR all:\"self-supervised\")", "categories": {"cs.LG": 2.5, "cs.CV": 1.2, "cs.AI": 1.0, "cs.RO": 0.8}, "keywords": { "representation": 2.5, "representations": 2.5, "latent": 2.0, "embedding": 2.0, "feature": 1.3, "state space": 1.4, "world model": 2.0, "self-supervised": 1.8, "pretraining": 1.2, "tokenizer": 1.0, "object-centric": 1.4, }, }, "reinforcement": { "label_zh": "强化学习", "query": "(cat:cs.LG OR cat:cs.AI OR cat:cs.RO) AND (all:\"reinforcement learning\" OR all:\"offline reinforcement learning\" OR all:\"offline rl\" OR all:\"imitation learning\" OR all:\"policy optimization\" OR all:\"multi-agent reinforcement learning\")", "categories": {"cs.LG": 2.0, "cs.AI": 1.8, "cs.RO": 1.0}, "keywords": { "reinforcement learning": 2.8, "offline reinforcement learning": 2.6, "offline rl": 2.4, "policy optimization": 2.0, "policy gradient": 1.8, "actor-critic": 1.8, "imitation learning": 2.0, "multi-agent reinforcement learning": 2.2, "decision-making": 1.4, "control": 0.8, "trajectory": 0.8, "q-learning": 1.8, }, }, } APPLIED_KEYWORDS = { "real-world": 2.2, "real world": 2.2, "deployment": 2.0, "deployed": 1.6, "robot": 1.5, "robotic": 1.4, "system": 1.0, "benchmark": 0.9, "dataset": 0.9, "controller": 1.0, "hardware": 1.4, "field": 1.2, "navigation": 1.2, "manipulation": 1.2, "autonomous": 1.2, "assistive": 1.4, "human-robot": 1.6, "sim2real": 1.8, "simulation-to-real": 1.8, "real robot": 2.0, "open-world": 1.2, } INNOVATION_KEYWORDS = { "foundation model": 2.2, "world model": 1.8, "unified": 1.3, "generalist": 1.5, "scalable": 1.2, "multimodal": 1.2, "diffusion": 1.2, "cross-embodiment": 1.8, "self-supervised": 1.1, "zero-shot": 1.2, "few-shot": 1.0, "novel": 0.8, "first": 0.8, "new benchmark": 1.0, "data engine": 1.4, "reasoning": 1.0, } NEGATIVE_KEYWORDS = { "survey": -2.4, "review": -2.1, "tutorial": -2.4, "perspective": -1.6, } def build_date_clause(lookback_days: int) -> str: now = now_local() start_local = (now - timedelta(days=max(lookback_days, 1) - 1)).replace(hour=0, minute=0, second=0, microsecond=0) end_local = now.replace(hour=23, minute=59, second=0, microsecond=0) start_utc = start_local.astimezone(timezone.utc) end_utc = end_local.astimezone(timezone.utc) return f"submittedDate:[{start_utc.strftime('%Y%m%d%H%M')} TO {end_utc.strftime('%Y%m%d%H%M')}]" def build_query(domain: str, lookback_days: int, with_date: bool = True) -> str: base = DOMAIN_CONFIGS[domain]["query"] if not with_date: return base return f"({base}) AND {build_date_clause(lookback_days)}" def request_feed(query: str, start: int, max_results: int) -> str: params = urlencode( { "search_query": query, "sortBy": "submittedDate", "sortOrder": "descending", "start": start, "max_results": max_results, } ) url = f"{API_URL}?{params}" with urlopen(url, timeout=45) as response: return response.read().decode("utf-8", errors="ignore") def parse_entry(entry: ET.Element, query_domain: str) -> Dict[str, Any]: raw_id = entry.findtext("atom:id", default="", namespaces=ATOM_NS) arxiv_id = canonical_arxiv_id(raw_id) title = normalize_space(entry.findtext("atom:title", default="", namespaces=ATOM_NS)) summary = normalize_space(entry.findtext("atom:summary", default="", namespaces=ATOM_NS)) published = normalize_space(entry.findtext("atom:published", default="", namespaces=ATOM_NS)) updated = normalize_space(entry.findtext("atom:updated", default="", namespaces=ATOM_NS)) comment = normalize_space(entry.findtext("arxiv:comment", default="", namespaces=ATOM_NS)) journal_ref = normalize_space(entry.findtext("arxiv:journal_ref", default="", namespaces=ATOM_NS)) doi = canonical_doi(arxiv_id, entry.findtext("arxiv:doi", default="", namespaces=ATOM_NS)) authors = [normalize_space(author.findtext("atom:name", default="", namespaces=ATOM_NS)) for author in entry.findall("atom:author", ATOM_NS)] authors = [author for author in authors if author] categories = [cat.attrib.get("term", "") for cat in entry.findall("atom:category", ATOM_NS) if cat.attrib.get("term")] primary_category = normalize_space(entry.findtext("arxiv:primary_category", default="", namespaces=ATOM_NS)) if not primary_category: primary_category = entry.find("arxiv:primary_category", ATOM_NS).attrib.get("term", "") if entry.find("arxiv:primary_category", ATOM_NS) is not None else "" abs_url = "" pdf_url = "" for link in entry.findall("atom:link", ATOM_NS): href = link.attrib.get("href", "") title_attr = link.attrib.get("title", "") rel = link.attrib.get("rel", "") if title_attr == "pdf" or link.attrib.get("type") == "application/pdf": pdf_url = href elif rel == "alternate" and href: abs_url = href if not abs_url or not pdf_url: urls = build_arxiv_urls(arxiv_id) abs_url = abs_url or urls["abs_url"] pdf_url = pdf_url or urls["pdf_url"] published_dt = datetime.fromisoformat(published.replace("Z", "+00:00")) if published else None updated_dt = datetime.fromisoformat(updated.replace("Z", "+00:00")) if updated else None paper = { "arxiv_id": arxiv_id, "title": title, "summary": summary, "authors": authors, "published": published, "updated": updated, "published_local": published_dt.astimezone(LOCAL_TZ).strftime("%Y-%m-%d %H:%M") if published_dt else "", "updated_local": updated_dt.astimezone(LOCAL_TZ).strftime("%Y-%m-%d %H:%M") if updated_dt else "", "abs_url": abs_url, "pdf_url": pdf_url, "doi": doi, "doi_url": canonical_doi_url(arxiv_id, doi), "comment": comment, "journal_ref": journal_ref, "categories": categories, "primary_category": primary_category, "query_domains": [query_domain], } paper.update(score_paper(paper)) return paper def score_terms(text: str, weights: Dict[str, float]) -> Dict[str, Any]: matched: List[str] = [] score = 0.0 lowered = text.lower() for term, weight in weights.items(): if term in lowered: matched.append(term) score += weight return {"score": round(score, 3), "matched": matched} def score_domain_fit(paper: Dict[str, Any]) -> Dict[str, Any]: text = f"{paper.get('title', '')} {paper.get('summary', '')} {paper.get('comment', '')}".lower() domain_scores: Dict[str, float] = {} domain_matches: Dict[str, List[str]] = {} for domain, cfg in DOMAIN_CONFIGS.items(): keyword_result = score_terms(text, cfg["keywords"]) category_score = sum(cfg["categories"].get(category, 0.0) for category in paper.get("categories", [])) query_boost = 1.2 if domain in paper.get("query_domains", []) else 0.0 total = keyword_result["score"] + category_score + query_boost domain_scores[domain] = round(total, 3) domain_matches[domain] = keyword_result["matched"] best_domain = max(domain_scores.items(), key=lambda item: item[1])[0] return { "domain": best_domain, "domain_scores": domain_scores, "domain_matches": domain_matches, "score_domain_fit": round(domain_scores[best_domain], 3), } def score_paper(paper: Dict[str, Any]) -> Dict[str, Any]: text = f"{paper.get('title', '')} {paper.get('summary', '')} {paper.get('comment', '')} {paper.get('journal_ref', '')}".lower() domain_result = score_domain_fit(paper) applied_result = score_terms(text, APPLIED_KEYWORDS) innovation_result = score_terms(text, INNOVATION_KEYWORDS) negative_result = score_terms(text, NEGATIVE_KEYWORDS) recency_score = 0.0 published = paper.get("published") if published: try: published_dt = datetime.fromisoformat(published.replace("Z", "+00:00")).astimezone(LOCAL_TZ) age_hours = max((now_local() - published_dt).total_seconds() / 3600.0, 0.0) recency_score = max(0.0, 1.5 - min(age_hours / 24.0, 1.5)) except Exception: recency_score = 0.0 total_score = ( domain_result["score_domain_fit"] * 1.35 + applied_result["score"] * 1.25 + innovation_result["score"] + negative_result["score"] + recency_score ) return { **domain_result, "score_applied": round(applied_result["score"], 3), "score_innovation": round(innovation_result["score"], 3), "score_recency": round(recency_score, 3), "score_penalty": round(negative_result["score"], 3), "score_total": round(total_score, 3), "matched_applied_terms": applied_result["matched"], "matched_innovation_terms": innovation_result["matched"], "matched_negative_terms": negative_result["matched"], } def merge_papers(existing: Dict[str, Any], incoming: Dict[str, Any]) -> Dict[str, Any]: merged = dict(existing) merged["query_domains"] = sorted(set(existing.get("query_domains", [])) | set(incoming.get("query_domains", []))) merged["categories"] = sorted(set(existing.get("categories", [])) | set(incoming.get("categories", []))) if incoming.get("comment") and len(incoming["comment"]) > len(existing.get("comment", "")): merged["comment"] = incoming["comment"] if incoming.get("journal_ref") and not existing.get("journal_ref"): merged["journal_ref"] = incoming["journal_ref"] rescored = score_paper(merged) merged.update(rescored) return merged def fetch_candidates(lookback_days: int = 2, max_results_per_domain: int = 40) -> List[Dict[str, Any]]: papers_by_id: Dict[str, Dict[str, Any]] = {} for domain in DOMAIN_CONFIGS: query = build_query(domain, lookback_days, with_date=True) log(f"Fetching {domain} candidates from arXiv") feed = request_feed(query, start=0, max_results=max_results_per_domain) root = ET.fromstring(feed) entries = root.findall("atom:entry", ATOM_NS) if not entries: log(f"No dated results for {domain}; falling back to latest recent results without date filter") query = build_query(domain, lookback_days, with_date=False) feed = request_feed(query, start=0, max_results=max_results_per_domain) root = ET.fromstring(feed) entries = root.findall("atom:entry", ATOM_NS) for entry in entries: paper = parse_entry(entry, query_domain=domain) arxiv_id = paper["arxiv_id"] if not arxiv_id: continue if arxiv_id in papers_by_id: papers_by_id[arxiv_id] = merge_papers(papers_by_id[arxiv_id], paper) else: papers_by_id[arxiv_id] = paper papers = list(papers_by_id.values()) papers.sort(key=lambda item: (item.get("score_total", 0.0), item.get("published", "")), reverse=True) return papers def main() -> None: parser = argparse.ArgumentParser(description="Fetch daily arXiv candidates for RobotDaily") parser.add_argument("--lookback-days", type=int, default=2) parser.add_argument("--max-results-per-domain", type=int, default=40) parser.add_argument("--output", type=str, default="") args = parser.parse_args() papers = fetch_candidates( lookback_days=args.lookback_days, max_results_per_domain=args.max_results_per_domain, ) payload = { "generated_at": now_local().isoformat(), "count": len(papers), "papers": papers, } if args.output: write_json(args.output, payload) log(f"Saved {len(papers)} candidates to {args.output}") else: print(json.dumps(payload, ensure_ascii=False, indent=2)) if __name__ == "__main__": main()