Files
youlu-openclaw-workspace/scripts/news_digest/main.py
2026-02-22 10:51:58 -08:00

475 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""RSS News Digest — fetch feeds, store articles with full content in SQLite, and summarize via Ollama during fetch.
Recommended: run via ./run.sh, which uses `uv` to handle dependencies
automatically (no manual venv or pip install needed).
When an `ollama` key is present in config.json, each newly fetched article is
automatically summarized and the result is stored in the database. Ollama
latency provides natural rate limiting between HTTP requests; when Ollama is
not configured, a 1-second sleep is used instead.
Uses a requests.Session with automatic retries and browser-like headers to
handle transient HTTP errors (429/5xx). A configurable per-feed article cap
helps avoid overwhelming upstream servers.
Use ``--test`` to smoke-test feed fetching and/or Ollama summarization without
writing to the database.
"""
import argparse
import json
import logging
import sqlite3
import sys
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path
from time import mktime
import feedparser
import requests
from bs4 import BeautifulSoup
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
logger = logging.getLogger("news_digest")
# Hardcoded test articles for --test summary (one English, one Chinese)
_TEST_ARTICLES = [
{
"title": "Global Semiconductor Shortage Eases as New Factories Come Online",
"content": (
"The global chip shortage that disrupted industries from automotive to "
"consumer electronics is finally showing signs of relief. Major semiconductor "
"manufacturers including TSMC, Samsung, and Intel have begun production at new "
"fabrication plants in Arizona, Texas, and Japan. Industry analysts project that "
"global chip capacity will increase by 15% over the next 18 months, potentially "
"leading to a supply surplus in certain categories. The shift has already begun "
"to impact pricing, with memory chip costs dropping 12% in the last quarter."
),
},
{
"title": "中国新能源汽车出口量首次突破年度600万辆大关",
"content": (
"据中国汽车工业协会最新数据2025年中国新能源汽车出口量首次突破600万辆"
"同比增长38%。比亚迪、上汽、蔚来等品牌在东南亚、欧洲和南美市场持续扩张。"
"分析人士指出,中国在电池技术和供应链方面的优势使其产品在全球市场具有较强"
"竞争力,但欧盟加征的反补贴关税可能对未来增长构成挑战。"
),
},
]
def _build_session() -> requests.Session:
"""Create a requests session with automatic retries and browser-like headers."""
session = requests.Session()
retry = Retry(
total=3,
backoff_factor=1, # 1s, 2s, 4s between retries
status_forcelist=[429, 500, 502, 503, 504],
respect_retry_after_header=True,
)
adapter = HTTPAdapter(max_retries=retry)
session.mount("http://", adapter)
session.mount("https://", adapter)
session.headers.update({
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/131.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.9",
})
return session
_session = _build_session()
def load_config(path: str) -> dict:
with open(path, encoding="utf-8") as f:
return json.load(f)
def init_db(db_path: str) -> sqlite3.Connection:
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
conn.execute("""
CREATE TABLE IF NOT EXISTS articles (
id INTEGER PRIMARY KEY AUTOINCREMENT,
url TEXT UNIQUE NOT NULL,
title TEXT,
description TEXT,
content TEXT,
summary TEXT,
published_date TEXT,
fetched_date TEXT NOT NULL,
feed_name TEXT,
feed_url TEXT,
category TEXT,
author TEXT
)
""")
conn.commit()
return conn
def parse_article_date(entry) -> datetime | None:
for attr in ("published_parsed", "updated_parsed"):
parsed = getattr(entry, attr, None)
if parsed:
return datetime.fromtimestamp(mktime(parsed), tz=timezone.utc)
return None
def is_within_lookback(dt: datetime | None, hours: int) -> bool:
if dt is None:
return True
cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
return dt >= cutoff
def fetch_feed(url: str) -> list[dict]:
try:
resp = _session.get(url, timeout=30)
resp.raise_for_status()
raw = resp.content
except requests.RequestException as e:
logger.warning("Failed to fetch %s: %s", url, e)
return []
feed = feedparser.parse(raw)
if feed.bozo and not feed.entries:
logger.warning("Feed parse error for %s: %s", url, feed.bozo_exception)
return []
return feed.entries
def fetch_content(url: str) -> str | None:
try:
resp = _session.get(url, timeout=15)
resp.raise_for_status()
html = resp.content
except requests.RequestException as e:
logger.warning("Failed to fetch content from %s: %s", url, e)
return None
soup = BeautifulSoup(html, "html.parser")
# Remove non-content elements
for tag in soup.find_all(["script", "style", "nav", "header", "footer", "aside", "form"]):
tag.decompose()
# Try common article content containers first
article = (
soup.find("article")
or soup.find(attrs={"role": "main"})
or soup.find("main")
or soup.find(class_=lambda c: c and ("article" in c or "content" in c or "post" in c))
)
target = article if article else soup.body if soup.body else soup
text = target.get_text(separator="\n", strip=True)
# Collapse excessive blank lines
lines = [line for line in text.splitlines() if line.strip()]
return "\n".join(lines) if lines else None
def save_articles(conn: sqlite3.Connection, articles: list[dict]) -> list[str]:
"""Insert articles, return list of URLs that were newly inserted."""
new_urls = []
now = datetime.now(timezone.utc).isoformat()
for a in articles:
try:
conn.execute(
"""INSERT OR IGNORE INTO articles
(url, title, description, published_date, fetched_date,
feed_name, feed_url, category, author)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
a["url"],
a.get("title"),
a.get("description"),
a.get("published_date"),
now,
a.get("feed_name"),
a.get("feed_url"),
a.get("category"),
a.get("author"),
),
)
if conn.execute("SELECT changes()").fetchone()[0] > 0:
new_urls.append(a["url"])
except sqlite3.Error as e:
logger.warning("DB insert error for %s: %s", a.get("url"), e)
conn.commit()
return new_urls
def purge_old_articles(conn: sqlite3.Connection, days: int) -> int:
cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).isoformat()
conn.execute("DELETE FROM articles WHERE fetched_date < ?", (cutoff,))
deleted = conn.execute("SELECT changes()").fetchone()[0]
conn.commit()
return deleted
def get_recent_articles(conn: sqlite3.Connection, hours: int) -> list[dict]:
cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
rows = conn.execute(
"SELECT * FROM articles WHERE published_date >= ? OR published_date IS NULL ORDER BY id",
(cutoff,),
).fetchall()
return [dict(r) for r in rows]
def generate_summary(title: str, description: str | None, content: str | None, model: str, prompt: str) -> str | None:
try:
import ollama as ollama_lib
except ImportError:
logger.warning("ollama package not installed; skipping summary")
return None
body = content or description
article_text = f"Title: {title}"
if body:
article_text += f"\n\n{body}"
user_message = f"{prompt}\n\n{article_text}"
try:
response = ollama_lib.chat(
model=model,
messages=[{"role": "user", "content": user_message}],
)
return response["message"]["content"]
except Exception as e:
logger.warning("Ollama error for '%s': %s", title, e)
return None
def _run_test(mode: str, config: dict) -> None:
"""Run smoke tests for feed fetching, summarization, or both.
All JSON results go to stdout; status messages go to stderr.
"""
if mode not in ("", "feed", "summary"):
print(f"Unknown test mode: {mode!r} (use 'feed', 'summary', or omit)", file=sys.stderr)
sys.exit(1)
feed_article = None # may be populated by feed test for use in full mode
# --- Feed test ---
if mode in ("", "feed"):
print("=== Feed test ===", file=sys.stderr)
feeds = config.get("feeds", [])
enabled = [f for f in feeds if f.get("enabled", True)]
if not enabled:
print("FAIL: no enabled feeds in config", file=sys.stderr)
sys.exit(1)
feed_cfg = enabled[0]
url = feed_cfg["url"]
name = feed_cfg.get("name", url)
print(f"Fetching feed: {name} ({url})", file=sys.stderr)
entries = fetch_feed(url)
if not entries:
print("FAIL: no entries returned from feed", file=sys.stderr)
sys.exit(1)
entry = entries[0]
link = entry.get("link", "")
title = entry.get("title", "")
print(f"Fetching content: {link}", file=sys.stderr)
content = fetch_content(link) if link else None
result = {
"feed": name,
"title": title,
"url": link,
"content_length": len(content) if content else 0,
}
print(json.dumps(result, ensure_ascii=False, indent=2))
if content:
print("PASS: feed fetch", file=sys.stderr)
feed_article = {"title": title, "content": content}
else:
print("FAIL: could not fetch article content", file=sys.stderr)
if mode == "feed":
sys.exit(1)
# --- Summary test ---
if mode in ("", "summary"):
print("=== Summary test ===", file=sys.stderr)
ollama_cfg = config.get("ollama")
if not ollama_cfg:
print("FAIL: no 'ollama' key in config", file=sys.stderr)
sys.exit(1)
model = ollama_cfg.get("model", "kamekichi128/qwen3-4b-instruct-2507")
prompt = ollama_cfg.get("prompt", "Summarize the following news article in 2-3 concise sentences:")
# Build test inputs: hardcoded articles + fetched article (full mode only)
articles = list(_TEST_ARTICLES)
if feed_article:
articles.append(feed_article)
all_ok = True
for article in articles:
print(f"Summarizing: {article['title']}", file=sys.stderr)
summary = generate_summary(article["title"], None, article["content"], model, prompt)
result = {"title": article["title"], "summary": summary}
print(json.dumps(result, ensure_ascii=False, indent=2))
if not summary:
all_ok = False
if all_ok:
print("PASS: summary", file=sys.stderr)
else:
print("FAIL: one or more summaries failed", file=sys.stderr)
sys.exit(1)
def main():
parser = argparse.ArgumentParser(description="RSS News Digest")
parser.add_argument("-c", "--config", default="config.json", help="Config file path")
parser.add_argument("-d", "--database", default="news_digest.db", help="SQLite database path")
parser.add_argument("--hours", type=int, help="Override lookback hours")
parser.add_argument("-f", "--fields", default="id,title,url,published_date,fetched_date,feed_name", help="Comma-separated output fields")
parser.add_argument("--purge-only", action="store_true", help="Only purge old articles")
parser.add_argument("--no-fetch", action="store_true", help="Skip fetching feeds, only query stored articles")
parser.add_argument("-v", "--verbose", action="store_true", help="Debug logging to stderr")
parser.add_argument("--test", nargs="?", const="", metavar="MODE",
help="Smoke test: 'feed', 'summary', or omit for full pipeline")
args = parser.parse_args()
logging.basicConfig(
level=logging.DEBUG if args.verbose else logging.WARNING,
format="%(asctime)s %(levelname)s %(message)s",
stream=sys.stderr,
)
config_path = Path(args.config)
if not config_path.exists():
logger.error("Config file not found: %s", config_path)
sys.exit(1)
config = load_config(str(config_path))
# Handle --test before any DB operations
if args.test is not None:
_run_test(args.test, config)
return
settings = config.get("settings", {})
hours_lookback = args.hours or settings.get("hours_lookback", 24)
retention_days = settings.get("retention_days", 30)
max_per_feed = settings.get("max_articles_per_feed", 0)
conn = init_db(args.database)
# Purge old articles
deleted = purge_old_articles(conn, retention_days)
if deleted:
logger.info("Purged %d articles older than %d days", deleted, retention_days)
if args.purge_only:
logger.info("Purge-only mode; exiting")
conn.close()
return
# Fetch feeds
if not args.no_fetch:
feeds = config.get("feeds", [])
total_new = 0
# Read ollama config once for summarization during fetch
ollama_cfg = config.get("ollama")
if ollama_cfg:
ollama_model = ollama_cfg.get("model", "kamekichi128/qwen3-4b-instruct-2507")
ollama_prompt = ollama_cfg.get("prompt", "Summarize the following news article in 2-3 concise sentences:")
logger.debug("Ollama summarization enabled (model: %s)", ollama_model)
else:
ollama_model = ollama_prompt = None
logger.debug("Ollama not configured; skipping summarization")
for feed_cfg in feeds:
if not feed_cfg.get("enabled", True):
logger.debug("Skipping disabled feed: %s", feed_cfg.get("name"))
continue
url = feed_cfg["url"]
logger.debug("Fetching feed: %s (%s)", feed_cfg.get("name", url), url)
entries = fetch_feed(url)
logger.debug("Got %d entries from %s", len(entries), feed_cfg.get("name", url))
articles = []
for entry in entries:
pub_date = parse_article_date(entry)
if not is_within_lookback(pub_date, hours_lookback):
continue
link = entry.get("link", "")
if not link:
continue
articles.append({
"url": link,
"title": entry.get("title"),
"description": entry.get("summary"),
"published_date": pub_date.isoformat() if pub_date else None,
"feed_name": feed_cfg.get("name"),
"feed_url": url,
"category": feed_cfg.get("category"),
"author": entry.get("author"),
})
# Cap articles per feed to avoid flooding the DB and downstream fetches
if max_per_feed > 0:
articles = articles[:max_per_feed]
new_urls = save_articles(conn, articles)
total_new += len(new_urls)
logger.info("Feed '%s': %d new articles (of %d within lookback)",
feed_cfg.get("name", url), len(new_urls), len(articles))
# Fetch full content and optionally summarize newly inserted articles
for i, article_url in enumerate(new_urls):
if i > 0 and not ollama_cfg:
time.sleep(1) # rate limit when Ollama isn't providing natural delay
logger.debug("Fetching content: %s", article_url)
content = fetch_content(article_url)
summary = None
if ollama_cfg:
row = conn.execute(
"SELECT title, description FROM articles WHERE url = ?", (article_url,)
).fetchone()
if row:
summary = generate_summary(row["title"], row["description"], content, ollama_model, ollama_prompt)
if summary:
logger.debug("Generated summary for %s", article_url)
else:
if i > 0:
time.sleep(1) # fallback rate limit on summary failure
conn.execute(
"UPDATE articles SET content = ?, summary = ? WHERE url = ?",
(content, summary, article_url),
)
conn.commit()
logger.info("Total new articles saved: %d", total_new)
# Output recent articles
recent = get_recent_articles(conn, hours_lookback)
fields = [f.strip() for f in args.fields.split(",")]
output = [{k: article[k] for k in fields if k in article} for article in recent]
print(json.dumps(output, ensure_ascii=False, indent=2))
conn.close()
if __name__ == "__main__":
main()