#!/usr/bin/env python3
"""RSS News Digest — fetch feeds, store articles with full content in SQLite, and summarize via Ollama during fetch.

Recommended: run via ./run.sh, which uses `uv` to handle dependencies
automatically (no manual venv or pip install needed).

When an `ollama` key is present in config.json, each newly fetched article is
automatically summarized and the result is stored in the database. Ollama
latency provides natural rate limiting between HTTP requests; when Ollama is
not configured, a 1-second sleep is used instead.

Uses a requests.Session with automatic retries and browser-like headers to
handle transient HTTP errors (429/5xx). A configurable per-feed article cap
helps avoid overwhelming upstream servers.

Use ``--test`` to smoke-test feed fetching and/or Ollama summarization without
writing to the database.
"""

import argparse
|
||
import json
|
||
import logging
|
||
import sqlite3
|
||
import sys
|
||
import time
|
||
from datetime import datetime, timedelta, timezone
|
||
from pathlib import Path
|
||
from time import mktime
|
||
|
||
import feedparser
|
||
import requests
|
||
from bs4 import BeautifulSoup
|
||
from requests.adapters import HTTPAdapter
|
||
from urllib3.util.retry import Retry
|
||
|
||
logger = logging.getLogger("news_digest")
|
||
|
||
# Hardcoded test articles for --test summary (one English, one Chinese)
|
||
_TEST_ARTICLES = [
|
||
{
|
||
"title": "Global Semiconductor Shortage Eases as New Factories Come Online",
|
||
"content": (
|
||
"The global chip shortage that disrupted industries from automotive to "
|
||
"consumer electronics is finally showing signs of relief. Major semiconductor "
|
||
"manufacturers including TSMC, Samsung, and Intel have begun production at new "
|
||
"fabrication plants in Arizona, Texas, and Japan. Industry analysts project that "
|
||
"global chip capacity will increase by 15% over the next 18 months, potentially "
|
||
"leading to a supply surplus in certain categories. The shift has already begun "
|
||
"to impact pricing, with memory chip costs dropping 12% in the last quarter."
|
||
),
|
||
},
|
||
{
|
||
"title": "中国新能源汽车出口量首次突破年度600万辆大关",
|
||
"content": (
|
||
"据中国汽车工业协会最新数据,2025年中国新能源汽车出口量首次突破600万辆,"
|
||
"同比增长38%。比亚迪、上汽、蔚来等品牌在东南亚、欧洲和南美市场持续扩张。"
|
||
"分析人士指出,中国在电池技术和供应链方面的优势使其产品在全球市场具有较强"
|
||
"竞争力,但欧盟加征的反补贴关税可能对未来增长构成挑战。"
|
||
),
|
||
},
|
||
]
|
||
|
||
|
||
def _build_session() -> requests.Session:
|
||
"""Create a requests session with automatic retries and browser-like headers."""
|
||
session = requests.Session()
|
||
retry = Retry(
|
||
total=3,
|
||
backoff_factor=1, # 1s, 2s, 4s between retries
|
||
status_forcelist=[429, 500, 502, 503, 504],
|
||
respect_retry_after_header=True,
|
||
)
|
||
adapter = HTTPAdapter(max_retries=retry)
|
||
session.mount("http://", adapter)
|
||
session.mount("https://", adapter)
|
||
session.headers.update({
|
||
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
|
||
"AppleWebKit/537.36 (KHTML, like Gecko) "
|
||
"Chrome/131.0.0.0 Safari/537.36",
|
||
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
|
||
"Accept-Language": "en-US,en;q=0.9",
|
||
})
|
||
return session
|
||
|
||
|
||
_session = _build_session()
|
||
|
||
|
||
def load_config(path: str) -> dict:
|
||
with open(path, encoding="utf-8") as f:
|
||
return json.load(f)
|
||
|
||
|
||
def init_db(db_path: str) -> sqlite3.Connection:
|
||
conn = sqlite3.connect(db_path)
|
||
conn.row_factory = sqlite3.Row
|
||
conn.execute("""
|
||
CREATE TABLE IF NOT EXISTS articles (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
url TEXT UNIQUE NOT NULL,
|
||
title TEXT,
|
||
description TEXT,
|
||
content TEXT,
|
||
summary TEXT,
|
||
published_date TEXT,
|
||
fetched_date TEXT NOT NULL,
|
||
feed_name TEXT,
|
||
feed_url TEXT,
|
||
category TEXT,
|
||
author TEXT
|
||
)
|
||
""")
|
||
conn.commit()
|
||
return conn
|
||
|
||
|
||
def parse_article_date(entry) -> datetime | None:
|
||
for attr in ("published_parsed", "updated_parsed"):
|
||
parsed = getattr(entry, attr, None)
|
||
if parsed:
|
||
return datetime.fromtimestamp(mktime(parsed), tz=timezone.utc)
|
||
return None
|
||
|
||
|
||
def is_within_lookback(dt: datetime | None, hours: int) -> bool:
|
||
if dt is None:
|
||
return True
|
||
cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
|
||
return dt >= cutoff
|
||
|
||
|
||
def fetch_feed(url: str) -> list[dict]:
|
||
try:
|
||
resp = _session.get(url, timeout=30)
|
||
resp.raise_for_status()
|
||
raw = resp.content
|
||
except requests.RequestException as e:
|
||
logger.warning("Failed to fetch %s: %s", url, e)
|
||
return []
|
||
|
||
feed = feedparser.parse(raw)
|
||
if feed.bozo and not feed.entries:
|
||
logger.warning("Feed parse error for %s: %s", url, feed.bozo_exception)
|
||
return []
|
||
return feed.entries
|
||
|
||
|
||
def fetch_content(url: str) -> str | None:
|
||
try:
|
||
resp = _session.get(url, timeout=15)
|
||
resp.raise_for_status()
|
||
html = resp.content
|
||
except requests.RequestException as e:
|
||
logger.warning("Failed to fetch content from %s: %s", url, e)
|
||
return None
|
||
|
||
soup = BeautifulSoup(html, "html.parser")
|
||
|
||
# Remove non-content elements
|
||
for tag in soup.find_all(["script", "style", "nav", "header", "footer", "aside", "form"]):
|
||
tag.decompose()
|
||
|
||
# Try common article content containers first
|
||
article = (
|
||
soup.find("article")
|
||
or soup.find(attrs={"role": "main"})
|
||
or soup.find("main")
|
||
or soup.find(class_=lambda c: c and ("article" in c or "content" in c or "post" in c))
|
||
)
|
||
|
||
target = article if article else soup.body if soup.body else soup
|
||
text = target.get_text(separator="\n", strip=True)
|
||
|
||
# Collapse excessive blank lines
|
||
lines = [line for line in text.splitlines() if line.strip()]
|
||
return "\n".join(lines) if lines else None
|
||
|
||
|
||
def save_articles(conn: sqlite3.Connection, articles: list[dict]) -> list[str]:
|
||
"""Insert articles, return list of URLs that were newly inserted."""
|
||
new_urls = []
|
||
now = datetime.now(timezone.utc).isoformat()
|
||
for a in articles:
|
||
try:
|
||
conn.execute(
|
||
"""INSERT OR IGNORE INTO articles
|
||
(url, title, description, published_date, fetched_date,
|
||
feed_name, feed_url, category, author)
|
||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
|
||
(
|
||
a["url"],
|
||
a.get("title"),
|
||
a.get("description"),
|
||
a.get("published_date"),
|
||
now,
|
||
a.get("feed_name"),
|
||
a.get("feed_url"),
|
||
a.get("category"),
|
||
a.get("author"),
|
||
),
|
||
)
|
||
if conn.execute("SELECT changes()").fetchone()[0] > 0:
|
||
new_urls.append(a["url"])
|
||
except sqlite3.Error as e:
|
||
logger.warning("DB insert error for %s: %s", a.get("url"), e)
|
||
conn.commit()
|
||
return new_urls
|
||
|
||
|
||
def purge_old_articles(conn: sqlite3.Connection, days: int) -> int:
|
||
cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).isoformat()
|
||
conn.execute("DELETE FROM articles WHERE fetched_date < ?", (cutoff,))
|
||
deleted = conn.execute("SELECT changes()").fetchone()[0]
|
||
conn.commit()
|
||
return deleted
|
||
|
||
|
||
def get_recent_articles(conn: sqlite3.Connection, hours: int) -> list[dict]:
|
||
cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
|
||
rows = conn.execute(
|
||
"SELECT * FROM articles WHERE published_date >= ? OR published_date IS NULL ORDER BY id",
|
||
(cutoff,),
|
||
).fetchall()
|
||
return [dict(r) for r in rows]
|
||
|
||
|
||
def generate_summary(title: str, description: str | None, content: str | None, model: str, prompt: str) -> str | None:
|
||
try:
|
||
import ollama as ollama_lib
|
||
except ImportError:
|
||
logger.warning("ollama package not installed; skipping summary")
|
||
return None
|
||
|
||
body = content or description
|
||
article_text = f"Title: {title}"
|
||
if body:
|
||
article_text += f"\n\n{body}"
|
||
user_message = f"{prompt}\n\n{article_text}"
|
||
|
||
try:
|
||
response = ollama_lib.chat(
|
||
model=model,
|
||
messages=[{"role": "user", "content": user_message}],
|
||
)
|
||
return response["message"]["content"]
|
||
except Exception as e:
|
||
logger.warning("Ollama error for '%s': %s", title, e)
|
||
return None
|
||
|
||
|
||
def _run_test(mode: str, config: dict) -> None:
|
||
"""Run smoke tests for feed fetching, summarization, or both.
|
||
|
||
All JSON results go to stdout; status messages go to stderr.
|
||
"""
|
||
if mode not in ("", "feed", "summary"):
|
||
print(f"Unknown test mode: {mode!r} (use 'feed', 'summary', or omit)", file=sys.stderr)
|
||
sys.exit(1)
|
||
|
||
feed_article = None # may be populated by feed test for use in full mode
|
||
|
||
# --- Feed test ---
|
||
if mode in ("", "feed"):
|
||
print("=== Feed test ===", file=sys.stderr)
|
||
feeds = config.get("feeds", [])
|
||
enabled = [f for f in feeds if f.get("enabled", True)]
|
||
if not enabled:
|
||
print("FAIL: no enabled feeds in config", file=sys.stderr)
|
||
sys.exit(1)
|
||
|
||
feed_cfg = enabled[0]
|
||
url = feed_cfg["url"]
|
||
name = feed_cfg.get("name", url)
|
||
print(f"Fetching feed: {name} ({url})", file=sys.stderr)
|
||
|
||
entries = fetch_feed(url)
|
||
if not entries:
|
||
print("FAIL: no entries returned from feed", file=sys.stderr)
|
||
sys.exit(1)
|
||
|
||
entry = entries[0]
|
||
link = entry.get("link", "")
|
||
title = entry.get("title", "")
|
||
print(f"Fetching content: {link}", file=sys.stderr)
|
||
content = fetch_content(link) if link else None
|
||
|
||
result = {
|
||
"feed": name,
|
||
"title": title,
|
||
"url": link,
|
||
"content_length": len(content) if content else 0,
|
||
}
|
||
print(json.dumps(result, ensure_ascii=False, indent=2))
|
||
|
||
if content:
|
||
print("PASS: feed fetch", file=sys.stderr)
|
||
feed_article = {"title": title, "content": content}
|
||
else:
|
||
print("FAIL: could not fetch article content", file=sys.stderr)
|
||
if mode == "feed":
|
||
sys.exit(1)
|
||
|
||
# --- Summary test ---
|
||
if mode in ("", "summary"):
|
||
print("=== Summary test ===", file=sys.stderr)
|
||
ollama_cfg = config.get("ollama")
|
||
if not ollama_cfg:
|
||
print("FAIL: no 'ollama' key in config", file=sys.stderr)
|
||
sys.exit(1)
|
||
|
||
model = ollama_cfg.get("model", "kamekichi128/qwen3-4b-instruct-2507")
|
||
prompt = ollama_cfg.get("prompt", "Summarize the following news article in 2-3 concise sentences (around 50 words):")
|
||
|
||
# Build test inputs: hardcoded articles + fetched article (full mode only)
|
||
articles = list(_TEST_ARTICLES)
|
||
if feed_article:
|
||
articles.append(feed_article)
|
||
|
||
all_ok = True
|
||
for article in articles:
|
||
print(f"Summarizing: {article['title']}", file=sys.stderr)
|
||
summary = generate_summary(article["title"], None, article["content"], model, prompt)
|
||
result = {"title": article["title"], "summary": summary}
|
||
print(json.dumps(result, ensure_ascii=False, indent=2))
|
||
if not summary:
|
||
all_ok = False
|
||
|
||
if all_ok:
|
||
print("PASS: summary", file=sys.stderr)
|
||
else:
|
||
print("FAIL: one or more summaries failed", file=sys.stderr)
|
||
sys.exit(1)
|
||
|
||
|
||
def main():
|
||
parser = argparse.ArgumentParser(description="RSS News Digest")
|
||
parser.add_argument("-c", "--config", default="config.json", help="Config file path")
|
||
parser.add_argument("-d", "--database", default="news_digest.db", help="SQLite database path")
|
||
parser.add_argument("--hours", type=int, help="Override lookback hours")
|
||
parser.add_argument("-f", "--fields", default="id,title,url,published_date,fetched_date,feed_name", help="Comma-separated output fields")
|
||
parser.add_argument("--purge-only", action="store_true", help="Only purge old articles")
|
||
parser.add_argument("--no-fetch", action="store_true", help="Skip fetching feeds, only query stored articles")
|
||
parser.add_argument("-v", "--verbose", action="store_true", help="Debug logging to stderr")
|
||
parser.add_argument("--test", nargs="?", const="", metavar="MODE",
|
||
help="Smoke test: 'feed', 'summary', or omit for full pipeline")
|
||
args = parser.parse_args()
|
||
|
||
logging.basicConfig(
|
||
level=logging.DEBUG if args.verbose else logging.WARNING,
|
||
format="%(asctime)s %(levelname)s %(message)s",
|
||
stream=sys.stderr,
|
||
)
|
||
|
||
config_path = Path(args.config)
|
||
if not config_path.exists():
|
||
logger.error("Config file not found: %s", config_path)
|
||
sys.exit(1)
|
||
|
||
config = load_config(str(config_path))
|
||
|
||
# Handle --test before any DB operations
|
||
if args.test is not None:
|
||
_run_test(args.test, config)
|
||
return
|
||
|
||
settings = config.get("settings", {})
|
||
hours_lookback = args.hours or settings.get("hours_lookback", 24)
|
||
retention_days = settings.get("retention_days", 30)
|
||
max_per_feed = settings.get("max_articles_per_feed", 0)
|
||
|
||
conn = init_db(args.database)
|
||
try:
|
||
# Purge old articles
|
||
deleted = purge_old_articles(conn, retention_days)
|
||
if deleted:
|
||
logger.info("Purged %d articles older than %d days", deleted, retention_days)
|
||
|
||
if args.purge_only:
|
||
logger.info("Purge-only mode; exiting")
|
||
return
|
||
|
||
# Fetch feeds
|
||
if not args.no_fetch:
|
||
feeds = config.get("feeds", [])
|
||
total_new = 0
|
||
|
||
# Read ollama config once for summarization during fetch
|
||
ollama_cfg = config.get("ollama")
|
||
if ollama_cfg:
|
||
ollama_model = ollama_cfg.get("model", "kamekichi128/qwen3-4b-instruct-2507")
|
||
ollama_prompt = ollama_cfg.get("prompt", "Summarize the following news article in 2-3 concise sentences (around 50 words):")
|
||
logger.debug("Ollama summarization enabled (model: %s)", ollama_model)
|
||
else:
|
||
ollama_model = ollama_prompt = None
|
||
logger.debug("Ollama not configured; skipping summarization")
|
||
|
||
for feed_cfg in feeds:
|
||
if not feed_cfg.get("enabled", True):
|
||
logger.debug("Skipping disabled feed: %s", feed_cfg.get("name"))
|
||
continue
|
||
|
||
url = feed_cfg["url"]
|
||
logger.debug("Fetching feed: %s (%s)", feed_cfg.get("name", url), url)
|
||
entries = fetch_feed(url)
|
||
logger.debug("Got %d entries from %s", len(entries), feed_cfg.get("name", url))
|
||
|
||
articles = []
|
||
for entry in entries:
|
||
pub_date = parse_article_date(entry)
|
||
if not is_within_lookback(pub_date, hours_lookback):
|
||
continue
|
||
|
||
link = entry.get("link", "")
|
||
if not link:
|
||
continue
|
||
|
||
articles.append({
|
||
"url": link,
|
||
"title": entry.get("title"),
|
||
"description": entry.get("summary"),
|
||
"published_date": pub_date.isoformat() if pub_date else None,
|
||
"feed_name": feed_cfg.get("name"),
|
||
"feed_url": url,
|
||
"category": feed_cfg.get("category"),
|
||
"author": entry.get("author"),
|
||
})
|
||
|
||
# Cap articles per feed to avoid flooding the DB and downstream fetches
|
||
if max_per_feed > 0:
|
||
articles = articles[:max_per_feed]
|
||
|
||
new_urls = save_articles(conn, articles)
|
||
total_new += len(new_urls)
|
||
logger.info("Feed '%s': %d new articles (of %d within lookback)",
|
||
feed_cfg.get("name", url), len(new_urls), len(articles))
|
||
|
||
# Fetch full content and optionally summarize newly inserted articles
|
||
for i, article_url in enumerate(new_urls):
|
||
if i > 0 and not ollama_cfg:
|
||
time.sleep(1) # rate limit when Ollama isn't providing natural delay
|
||
logger.debug("Fetching content: %s", article_url)
|
||
content = fetch_content(article_url)
|
||
summary = None
|
||
if ollama_cfg:
|
||
row = conn.execute(
|
||
"SELECT title, description FROM articles WHERE url = ?", (article_url,)
|
||
).fetchone()
|
||
if row:
|
||
summary = generate_summary(row["title"], row["description"], content, ollama_model, ollama_prompt)
|
||
if summary:
|
||
logger.debug("Generated summary for %s", article_url)
|
||
else:
|
||
if i > 0:
|
||
time.sleep(1) # fallback rate limit on summary failure
|
||
conn.execute(
|
||
"UPDATE articles SET content = ?, summary = ? WHERE url = ?",
|
||
(content, summary, article_url),
|
||
)
|
||
conn.commit()
|
||
|
||
logger.info("Total new articles saved: %d", total_new)
|
||
|
||
# Output recent articles
|
||
recent = get_recent_articles(conn, hours_lookback)
|
||
fields = [f.strip() for f in args.fields.split(",")]
|
||
output = [{k: article[k] for k in fields if k in article} for article in recent]
|
||
print(json.dumps(output, ensure_ascii=False, indent=2))
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|