#!/usr/bin/env python3 """RSS News Digest — fetch feeds, store articles with full content in SQLite, optionally summarize via Ollama.""" import argparse import json import logging import sqlite3 import sys from datetime import datetime, timedelta, timezone from pathlib import Path from time import mktime from urllib.request import Request, urlopen from urllib.error import URLError import feedparser from bs4 import BeautifulSoup logger = logging.getLogger("news_digest") def load_config(path: str) -> dict: with open(path, encoding="utf-8") as f: return json.load(f) def init_db(db_path: str) -> sqlite3.Connection: conn = sqlite3.connect(db_path) conn.row_factory = sqlite3.Row conn.execute(""" CREATE TABLE IF NOT EXISTS articles ( id INTEGER PRIMARY KEY AUTOINCREMENT, url TEXT UNIQUE NOT NULL, title TEXT, description TEXT, content TEXT, published_date TEXT, fetched_date TEXT NOT NULL, feed_name TEXT, feed_url TEXT, category TEXT, author TEXT ) """) # Migrate: add content column if missing (existing DBs) try: conn.execute("ALTER TABLE articles ADD COLUMN content TEXT") except sqlite3.OperationalError: pass conn.commit() return conn def parse_article_date(entry) -> datetime | None: for attr in ("published_parsed", "updated_parsed"): parsed = getattr(entry, attr, None) if parsed: return datetime.fromtimestamp(mktime(parsed), tz=timezone.utc) return None def is_within_lookback(dt: datetime | None, hours: int) -> bool: if dt is None: return True cutoff = datetime.now(timezone.utc) - timedelta(hours=hours) return dt >= cutoff _USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko)" def fetch_feed(url: str) -> list[dict]: try: req = Request(url, headers={"User-Agent": _USER_AGENT}) with urlopen(req, timeout=30) as resp: raw = resp.read() except (URLError, OSError) as e: logger.warning("Failed to fetch %s: %s", url, e) return [] feed = feedparser.parse(raw) if feed.bozo and not feed.entries: logger.warning("Feed parse error for %s: %s", url, feed.bozo_exception) return [] return feed.entries def fetch_content(url: str) -> str | None: try: req = Request(url, headers={"User-Agent": _USER_AGENT}) with urlopen(req, timeout=15) as resp: html = resp.read() except (URLError, OSError) as e: logger.warning("Failed to fetch content from %s: %s", url, e) return None soup = BeautifulSoup(html, "html.parser") # Remove non-content elements for tag in soup.find_all(["script", "style", "nav", "header", "footer", "aside", "form"]): tag.decompose() # Try common article content containers first article = ( soup.find("article") or soup.find(attrs={"role": "main"}) or soup.find("main") or soup.find(class_=lambda c: c and ("article" in c or "content" in c or "post" in c)) ) target = article if article else soup.body if soup.body else soup text = target.get_text(separator="\n", strip=True) # Collapse excessive blank lines lines = [line for line in text.splitlines() if line.strip()] return "\n".join(lines) if lines else None def save_articles(conn: sqlite3.Connection, articles: list[dict]) -> list[str]: """Insert articles, return list of URLs that were newly inserted.""" new_urls = [] now = datetime.now(timezone.utc).isoformat() for a in articles: try: conn.execute( """INSERT OR IGNORE INTO articles (url, title, description, published_date, fetched_date, feed_name, feed_url, category, author) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""", ( a["url"], a.get("title"), a.get("description"), a.get("published_date"), now, a.get("feed_name"), a.get("feed_url"), a.get("category"), a.get("author"), ), ) if conn.execute("SELECT changes()").fetchone()[0] > 0: new_urls.append(a["url"]) except sqlite3.Error as e: logger.warning("DB insert error for %s: %s", a.get("url"), e) conn.commit() return new_urls def purge_old_articles(conn: sqlite3.Connection, days: int) -> int: cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).isoformat() conn.execute("DELETE FROM articles WHERE fetched_date < ?", (cutoff,)) deleted = conn.execute("SELECT changes()").fetchone()[0] conn.commit() return deleted def get_recent_articles(conn: sqlite3.Connection, hours: int) -> list[dict]: cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat() rows = conn.execute( "SELECT * FROM articles WHERE fetched_date >= ? ORDER BY id", (cutoff,) ).fetchall() return [dict(r) for r in rows] def get_articles_by_ids(conn: sqlite3.Connection, ids: list[int]) -> list[dict]: placeholders = ",".join("?" for _ in ids) rows = conn.execute( f"SELECT * FROM articles WHERE id IN ({placeholders}) ORDER BY id", ids ).fetchall() return [dict(r) for r in rows] def generate_summary(title: str, description: str | None, content: str | None, model: str, prompt: str) -> str | None: try: import ollama as ollama_lib except ImportError: logger.warning("ollama package not installed; skipping summary") return None body = content or description article_text = f"Title: {title}" if body: article_text += f"\n\n{body}" user_message = f"{prompt}\n\n{article_text}" try: response = ollama_lib.chat( model=model, messages=[{"role": "user", "content": user_message}], ) return response["message"]["content"] except Exception as e: logger.warning("Ollama error for '%s': %s", title, e) return None def main(): parser = argparse.ArgumentParser(description="RSS News Digest") parser.add_argument("-c", "--config", default="config.json", help="Config file path") parser.add_argument("-d", "--database", default="news_digest.db", help="SQLite database path") parser.add_argument("--hours", type=int, help="Override lookback hours") parser.add_argument("-f", "--fields", default="id,title,url", help="Comma-separated output fields") parser.add_argument("--digest", help="Article IDs to summarize (comma-separated, e.g. 1,3,7)") parser.add_argument("--purge-only", action="store_true", help="Only purge old articles") parser.add_argument("--no-fetch", action="store_true", help="Skip fetching feeds, only query stored articles") parser.add_argument("-v", "--verbose", action="store_true", help="Debug logging to stderr") args = parser.parse_args() logging.basicConfig( level=logging.DEBUG if args.verbose else logging.WARNING, format="%(asctime)s %(levelname)s %(message)s", stream=sys.stderr, ) config_path = Path(args.config) if not config_path.exists(): logger.error("Config file not found: %s", config_path) sys.exit(1) config = load_config(str(config_path)) settings = config.get("settings", {}) hours_lookback = args.hours or settings.get("hours_lookback", 24) retention_days = settings.get("retention_days", 30) conn = init_db(args.database) # Digest mode — summarize specified articles, then exit if args.digest: ollama_cfg = config.get("ollama", {}) model = ollama_cfg.get("model", "qwen3") prompt = ollama_cfg.get("prompt", "Summarize the following news article in 2-3 concise sentences:") ids = [int(x.strip()) for x in args.digest.split(",")] articles = get_articles_by_ids(conn, ids) if not articles: logger.warning("No articles found for IDs: %s", ids) results = [] for article in articles: logger.debug("Summarizing article %d: %s", article["id"], article["title"]) summary = generate_summary(article["title"], article.get("description"), article.get("content"), model, prompt) results.append({ "id": article["id"], "title": article["title"], "url": article["url"], "summary": summary, }) print(json.dumps(results, ensure_ascii=False, indent=2)) conn.close() return # Purge old articles deleted = purge_old_articles(conn, retention_days) if deleted: logger.info("Purged %d articles older than %d days", deleted, retention_days) if args.purge_only: logger.info("Purge-only mode; exiting") conn.close() return # Fetch feeds if not args.no_fetch: feeds = config.get("feeds", []) total_new = 0 for feed_cfg in feeds: if not feed_cfg.get("enabled", True): logger.debug("Skipping disabled feed: %s", feed_cfg.get("name")) continue url = feed_cfg["url"] logger.debug("Fetching feed: %s (%s)", feed_cfg.get("name", url), url) entries = fetch_feed(url) logger.debug("Got %d entries from %s", len(entries), feed_cfg.get("name", url)) articles = [] for entry in entries: pub_date = parse_article_date(entry) if not is_within_lookback(pub_date, hours_lookback): continue link = entry.get("link", "") if not link: continue articles.append({ "url": link, "title": entry.get("title"), "description": entry.get("summary"), "published_date": pub_date.isoformat() if pub_date else None, "feed_name": feed_cfg.get("name"), "feed_url": url, "category": feed_cfg.get("category"), "author": entry.get("author"), }) new_urls = save_articles(conn, articles) total_new += len(new_urls) logger.info("Feed '%s': %d new articles (of %d within lookback)", feed_cfg.get("name", url), len(new_urls), len(articles)) # Fetch full content for newly inserted articles for article_url in new_urls: logger.debug("Fetching content: %s", article_url) content = fetch_content(article_url) if content: conn.execute("UPDATE articles SET content = ? WHERE url = ?", (content, article_url)) logger.debug("Saved content (%d chars) for %s", len(content), article_url) conn.commit() logger.info("Total new articles saved: %d", total_new) # Output recent articles recent = get_recent_articles(conn, hours_lookback) fields = [f.strip() for f in args.fields.split(",")] output = [{k: article[k] for k in fields if k in article} for article in recent] print(json.dumps(output, ensure_ascii=False, indent=2)) conn.close() if __name__ == "__main__": main()