327 lines
11 KiB
Python
327 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
"""RSS News Digest — fetch feeds, store articles with full content in SQLite, optionally summarize via Ollama."""
|
|
|
|
import argparse
|
|
import json
|
|
import logging
|
|
import sqlite3
|
|
import sys
|
|
from datetime import datetime, timedelta, timezone
|
|
from pathlib import Path
|
|
from time import mktime
|
|
from urllib.request import Request, urlopen
|
|
from urllib.error import URLError
|
|
|
|
import feedparser
|
|
from bs4 import BeautifulSoup
|
|
|
|
logger = logging.getLogger("news_digest")
|
|
|
|
|
|
def load_config(path: str) -> dict:
    """Read the JSON configuration file at *path* and return it as a dict."""
    raw = Path(path).read_text(encoding="utf-8")
    return json.loads(raw)
|
|
|
|
|
|
def init_db(db_path: str) -> sqlite3.Connection:
    """Open (creating if necessary) the articles database and return the connection.

    Rows are returned as sqlite3.Row so callers can access columns by name.
    """
    connection = sqlite3.connect(db_path)
    connection.row_factory = sqlite3.Row
    schema = """
        CREATE TABLE IF NOT EXISTS articles (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            url TEXT UNIQUE NOT NULL,
            title TEXT,
            description TEXT,
            content TEXT,
            published_date TEXT,
            fetched_date TEXT NOT NULL,
            feed_name TEXT,
            feed_url TEXT,
            category TEXT,
            author TEXT
        )
    """
    connection.execute(schema)
    # Migration for databases created before the content column existed;
    # the ALTER fails harmlessly (OperationalError) when the column is present.
    try:
        connection.execute("ALTER TABLE articles ADD COLUMN content TEXT")
    except sqlite3.OperationalError:
        pass
    connection.commit()
    return connection
|
|
|
|
|
|
def parse_article_date(entry) -> datetime | None:
|
|
for attr in ("published_parsed", "updated_parsed"):
|
|
parsed = getattr(entry, attr, None)
|
|
if parsed:
|
|
return datetime.fromtimestamp(mktime(parsed), tz=timezone.utc)
|
|
return None
|
|
|
|
|
|
def is_within_lookback(dt: datetime | None, hours: int) -> bool:
|
|
if dt is None:
|
|
return True
|
|
cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
|
|
return dt >= cutoff
|
|
|
|
|
|
_USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko)"
|
|
|
|
|
|
def fetch_feed(url: str) -> list[dict]:
    """Download an RSS/Atom feed and return its parsed entries.

    Returns an empty list on network failure or when the payload cannot be
    parsed at all; partial (bozo) feeds that still yielded entries are kept.
    """
    request = Request(url, headers={"User-Agent": _USER_AGENT})
    try:
        with urlopen(request, timeout=30) as response:
            payload = response.read()
    except (URLError, OSError) as exc:
        logger.warning("Failed to fetch %s: %s", url, exc)
        return []

    parsed = feedparser.parse(payload)
    # bozo flags malformed XML; tolerate it as long as entries were recovered.
    if parsed.bozo and not parsed.entries:
        logger.warning("Feed parse error for %s: %s", url, parsed.bozo_exception)
        return []
    return parsed.entries
|
|
|
|
|
|
def fetch_content(url: str) -> str | None:
    """Fetch an article page and return its main body text, or None on failure.

    Extraction is heuristic: boilerplate tags are stripped, then the first
    plausible content container is used, falling back to <body> or the whole
    document.
    """
    request = Request(url, headers={"User-Agent": _USER_AGENT})
    try:
        with urlopen(request, timeout=15) as response:
            page = response.read()
    except (URLError, OSError) as exc:
        logger.warning("Failed to fetch content from %s: %s", url, exc)
        return None

    soup = BeautifulSoup(page, "html.parser")

    # Strip elements that never contain article prose.
    for node in soup.find_all(["script", "style", "nav", "header", "footer", "aside", "form"]):
        node.decompose()

    # Prefer semantic containers before falling back to class-name heuristics.
    container = soup.find("article")
    if not container:
        container = soup.find(attrs={"role": "main"})
    if not container:
        container = soup.find("main")
    if not container:
        container = soup.find(class_=lambda c: c and ("article" in c or "content" in c or "post" in c))

    scope = container or soup.body or soup
    raw_text = scope.get_text(separator="\n", strip=True)

    # Collapse the output by dropping blank lines entirely.
    kept = [ln for ln in raw_text.splitlines() if ln.strip()]
    return "\n".join(kept) if kept else None
|
|
|
|
|
|
def save_articles(conn: sqlite3.Connection, articles: list[dict]) -> list[str]:
    """Insert articles, return list of URLs that were newly inserted.

    Duplicates (by the url UNIQUE constraint) are skipped via INSERT OR
    IGNORE.  Each article dict must carry "url"; every other field is
    optional and stored as NULL when absent.
    """
    new_urls: list[str] = []
    now = datetime.now(timezone.utc).isoformat()
    for a in articles:
        try:
            cur = conn.execute(
                """INSERT OR IGNORE INTO articles
                   (url, title, description, published_date, fetched_date,
                    feed_name, feed_url, category, author)
                   VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                (
                    a["url"],
                    a.get("title"),
                    a.get("description"),
                    a.get("published_date"),
                    now,
                    a.get("feed_name"),
                    a.get("feed_url"),
                    a.get("category"),
                    a.get("author"),
                ),
            )
            # cursor.rowcount is 1 only when the row was actually inserted.
            # This replaces the previous "SELECT changes()" round trip, which
            # reports the changes of the *last* statement on the connection
            # and could therefore misattribute an unrelated statement's count.
            if cur.rowcount > 0:
                new_urls.append(a["url"])
        except sqlite3.Error as e:
            logger.warning("DB insert error for %s: %s", a.get("url"), e)
    conn.commit()
    return new_urls
|
|
|
|
|
|
def purge_old_articles(conn: sqlite3.Connection, days: int) -> int:
    """Delete articles fetched more than *days* days ago; return the count deleted.

    Comparison is lexicographic on the stored ISO-8601 fetched_date strings,
    which is correct because all timestamps are written in the same UTC
    isoformat by save_articles.
    """
    cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).isoformat()
    cur = conn.execute("DELETE FROM articles WHERE fetched_date < ?", (cutoff,))
    conn.commit()
    # rowcount of the DELETE cursor is the number of rows it removed; the
    # previous "SELECT changes()" round trip was an extra statement that
    # reports whatever statement ran last on the connection.
    return cur.rowcount
|
|
|
|
|
|
def get_recent_articles(conn: sqlite3.Connection, hours: int) -> list[dict]:
    """Return all articles fetched within the last *hours* hours, oldest id first."""
    earliest = datetime.now(timezone.utc) - timedelta(hours=hours)
    cursor = conn.execute(
        "SELECT * FROM articles WHERE fetched_date >= ? ORDER BY id",
        (earliest.isoformat(),),
    )
    return [dict(row) for row in cursor.fetchall()]
|
|
|
|
|
|
def get_articles_by_ids(conn: sqlite3.Connection, ids: list[int]) -> list[dict]:
    """Fetch the articles whose ids appear in *ids*, ordered by id.

    Returns [] immediately for an empty id list: without the guard the
    query would render as "... WHERE id IN ()", which is a SQLite syntax
    error and raised sqlite3.OperationalError.
    """
    if not ids:
        return []
    placeholders = ",".join("?" for _ in ids)
    rows = conn.execute(
        f"SELECT * FROM articles WHERE id IN ({placeholders}) ORDER BY id", ids
    ).fetchall()
    return [dict(r) for r in rows]
|
|
|
|
|
|
def generate_summary(title: str, description: str | None, content: str | None, model: str, prompt: str) -> str | None:
    """Summarize one article via a local Ollama model.

    Returns the model's reply text, or None when the ollama package is
    missing or the chat call fails for any reason (logged, never raised).
    """
    try:
        import ollama as ollama_lib
    except ImportError:
        logger.warning("ollama package not installed; skipping summary")
        return None

    # Prefer the scraped full text; fall back to the feed description.
    pieces = [f"Title: {title}"]
    body = content or description
    if body:
        pieces.append(body)
    user_message = prompt + "\n\n" + "\n\n".join(pieces)

    try:
        reply = ollama_lib.chat(
            model=model,
            messages=[{"role": "user", "content": user_message}],
        )
    except Exception as e:
        logger.warning("Ollama error for '%s': %s", title, e)
        return None
    return reply["message"]["content"]
|
|
|
|
|
|
def main() -> None:
    """CLI entry point.

    Modes (mutually exclusive by flag precedence):
      --digest IDS   summarize stored articles via Ollama, print JSON, exit;
      --purge-only   delete expired rows and exit;
      default        purge, fetch feeds (unless --no-fetch), print recent
                     articles as JSON on stdout.
    """
    parser = argparse.ArgumentParser(description="RSS News Digest")
    parser.add_argument("-c", "--config", default="config.json", help="Config file path")
    parser.add_argument("-d", "--database", default="news_digest.db", help="SQLite database path")
    parser.add_argument("--hours", type=int, help="Override lookback hours")
    parser.add_argument("-f", "--fields", default="id,title,url", help="Comma-separated output fields")
    parser.add_argument("--digest", help="Article IDs to summarize (comma-separated, e.g. 1,3,7)")
    parser.add_argument("--purge-only", action="store_true", help="Only purge old articles")
    parser.add_argument("--no-fetch", action="store_true", help="Skip fetching feeds, only query stored articles")
    parser.add_argument("-v", "--verbose", action="store_true", help="Debug logging to stderr")
    args = parser.parse_args()

    # Logging goes to stderr so stdout carries only the JSON printed below.
    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.WARNING,
        format="%(asctime)s %(levelname)s %(message)s",
        stream=sys.stderr,
    )

    config_path = Path(args.config)
    if not config_path.exists():
        logger.error("Config file not found: %s", config_path)
        sys.exit(1)

    config = load_config(str(config_path))
    settings = config.get("settings", {})
    # NOTE(review): `or` makes an explicit `--hours 0` fall through to the
    # config value — confirm whether 0 should be an accepted override.
    hours_lookback = args.hours or settings.get("hours_lookback", 24)
    retention_days = settings.get("retention_days", 30)

    conn = init_db(args.database)

    # Digest mode — summarize specified articles, then exit
    if args.digest:
        ollama_cfg = config.get("ollama", {})
        model = ollama_cfg.get("model", "qwen3")
        prompt = ollama_cfg.get("prompt", "Summarize the following news article in 2-3 concise sentences:")

        # NOTE(review): int() raises ValueError on malformed --digest input
        # (e.g. "1,x"); currently uncaught, so it aborts with a traceback.
        ids = [int(x.strip()) for x in args.digest.split(",")]
        articles = get_articles_by_ids(conn, ids)

        if not articles:
            logger.warning("No articles found for IDs: %s", ids)

        results = []
        for article in articles:
            logger.debug("Summarizing article %d: %s", article["id"], article["title"])
            summary = generate_summary(article["title"], article.get("description"), article.get("content"), model, prompt)
            # summary may be None (Ollama unavailable/error); kept in output as null.
            results.append({
                "id": article["id"],
                "title": article["title"],
                "url": article["url"],
                "summary": summary,
            })

        print(json.dumps(results, ensure_ascii=False, indent=2))
        conn.close()
        return

    # Purge old articles
    deleted = purge_old_articles(conn, retention_days)
    if deleted:
        logger.info("Purged %d articles older than %d days", deleted, retention_days)

    if args.purge_only:
        logger.info("Purge-only mode; exiting")
        conn.close()
        return

    # Fetch feeds
    if not args.no_fetch:
        feeds = config.get("feeds", [])
        total_new = 0

        for feed_cfg in feeds:
            # Feeds are enabled by default unless the config says otherwise.
            if not feed_cfg.get("enabled", True):
                logger.debug("Skipping disabled feed: %s", feed_cfg.get("name"))
                continue

            url = feed_cfg["url"]
            logger.debug("Fetching feed: %s (%s)", feed_cfg.get("name", url), url)
            entries = fetch_feed(url)
            logger.debug("Got %d entries from %s", len(entries), feed_cfg.get("name", url))

            articles = []
            for entry in entries:
                pub_date = parse_article_date(entry)
                # Undated entries pass the filter (is_within_lookback returns
                # True for None), so feeds without timestamps are not dropped.
                if not is_within_lookback(pub_date, hours_lookback):
                    continue

                link = entry.get("link", "")
                if not link:
                    continue

                articles.append({
                    "url": link,
                    "title": entry.get("title"),
                    "description": entry.get("summary"),
                    "published_date": pub_date.isoformat() if pub_date else None,
                    "feed_name": feed_cfg.get("name"),
                    "feed_url": url,
                    "category": feed_cfg.get("category"),
                    "author": entry.get("author"),
                })

            new_urls = save_articles(conn, articles)
            total_new += len(new_urls)
            logger.info("Feed '%s': %d new articles (of %d within lookback)",
                        feed_cfg.get("name", url), len(new_urls), len(articles))

            # Fetch full content for newly inserted articles
            for article_url in new_urls:
                logger.debug("Fetching content: %s", article_url)
                content = fetch_content(article_url)
                if content:
                    conn.execute("UPDATE articles SET content = ? WHERE url = ?", (content, article_url))
                    logger.debug("Saved content (%d chars) for %s", len(content), article_url)
            # One commit per feed so a crash mid-run keeps earlier feeds' content.
            conn.commit()

        logger.info("Total new articles saved: %d", total_new)

    # Output recent articles
    recent = get_recent_articles(conn, hours_lookback)
    fields = [f.strip() for f in args.fields.split(",")]
    # Unknown field names are silently dropped rather than raising KeyError.
    output = [{k: article[k] for k in fields if k in article} for article in recent]
    print(json.dumps(output, ensure_ascii=False, indent=2))

    conn.close()
|
|
|
|
|
|
# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|