#!/usr/bin/env python3
"""
Decision Store - Manages decision history for learning-based email classification.

This module persists every user and auto-made decision to a flat JSON file
(data/decision_history.json). Past decisions serve as few-shot examples that
are injected into the LLM prompt by classifier.py, enabling the system to
learn from user behavior over time.

Storage format: a JSON array of decision entries, each containing sender,
recipient, subject, summary, action taken, and whether it was a user or
auto decision.
"""

import json
import re
from collections import Counter
from datetime import datetime
from pathlib import Path

# ---------------------------------------------------------------------------
# Paths
# ---------------------------------------------------------------------------

SCRIPT_DIR = Path(__file__).parent
DATA_DIR = SCRIPT_DIR / "data"
HISTORY_FILE = DATA_DIR / "decision_history.json"
PENDING_FILE = DATA_DIR / "pending_emails.json"

# Stop-words excluded from subject keyword matching to reduce noise.
_STOP_WORDS = {"re", "fwd", "the", "a", "an", "is", "to", "for", "and", "or", "your", "you"}

# Compiled once at import time; both are used inside per-entry loops.
_EMAIL_RE = re.compile(r"([\w.+-]+)@([\w.-]+)")
_WORD_RE = re.compile(r"\w+")

# Prefix that marks a labelling action, e.g. "label:receipts".
_LABEL_PREFIX = "label:"


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------

def _load_history():
    """Load the full decision history list from disk.

    Returns:
        The list of decision entries, or an empty list when the history
        file does not exist yet.

    Raises:
        ValueError: if the file does not contain a JSON array. Malformed
            JSON raises json.JSONDecodeError, which is a ValueError
            subclass. Nothing is swallowed here on purpose: returning []
            for a damaged file would let the next save overwrite it.
    """
    try:
        with open(HISTORY_FILE, "r", encoding="utf-8") as f:
            history = json.load(f)
    except FileNotFoundError:
        return []
    if not isinstance(history, list):
        raise ValueError(
            f"{HISTORY_FILE} must contain a JSON array, "
            f"found {type(history).__name__}"
        )
    return history


def _save_history(history):
    """Write the full decision history list to disk.

    The data is written to a sibling temporary file which is then renamed
    over the real file, so a failure while serialising does not leave a
    truncated history behind.
    """
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    tmp_file = HISTORY_FILE.with_name(HISTORY_FILE.name + ".tmp")
    try:
        with open(tmp_file, "w", encoding="utf-8") as f:
            json.dump(history, f, indent=2, ensure_ascii=False)
        tmp_file.replace(HISTORY_FILE)
    finally:
        # Only still present when the dump or the rename failed.
        if tmp_file.exists():
            tmp_file.unlink()


def _extract_domain(sender):
    """Extract the lower-cased domain part from a sender string.

    Handles formats like:
        "Display Name <user@example.com>"
        user@example.com

    Returns "" when the string is empty/None or contains no address.
    """
    match = _EMAIL_RE.search(sender or "")
    return match.group(2).lower() if match else ""


def _extract_email_address(sender):
    """Extract the lower-cased full email address from a sender string.

    Falls back to the whole lower-cased input when no address is found;
    a None sender is treated as "".
    """
    sender = sender or ""
    match = _EMAIL_RE.search(sender)
    return match.group(0).lower() if match else sender.lower()


def _subject_keywords(subject):
    """Return the set of lower-cased subject words minus the stop-words."""
    return set(_WORD_RE.findall((subject or "").lower())) - _STOP_WORDS


def _build_entry(email_data, action, source):
    """Build one history entry dict (timestamped now) without touching disk."""
    sender = email_data.get("sender", "")
    return {
        "timestamp": datetime.now().isoformat(timespec="seconds"),
        "sender": sender,
        "sender_domain": _extract_domain(sender),
        "recipient": email_data.get("recipient", ""),
        "subject": email_data.get("subject", ""),
        "summary": email_data.get("summary", ""),
        "action": action,
        "source": source,
    }


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------

def record_decision(email_data, action, source="user"):
    """Append a decision to the history file.

    Args:
        email_data: dict with keys: sender, recipient, subject, summary.
            Missing keys are stored as "".
        action: one of "delete", "archive", "keep", "mark_read",
            or "label:<name>".
        source: "user" (manual review) or "auto" (high-confidence).

    Returns:
        The entry dict that was appended.
    """
    history = _load_history()
    entry = _build_entry(email_data, action, source)
    history.append(entry)
    _save_history(history)
    return entry


def get_relevant_examples(email_data, n=10):
    """Find the N most relevant past decisions for a given email.

    Relevance is scored by three signals:
      - Exact sender domain match:  +3 points
      - Recipient match:            +2 points (the target recipient occurs
        as a substring of the stored recipient, case-insensitively)
      - Subject keyword overlap:    +1 point per shared non-stop-word

    Only entries with score > 0 are considered. Results are returned sorted
    by descending relevance; the sort is stable, so equally scored entries
    keep their order in the history file.

    Args:
        email_data: dict with optional keys sender, recipient, subject.
            Missing or None values simply contribute no score.
        n: maximum number of examples to return. None returns every
            matching entry; zero or a negative number returns [].
    """
    if n is not None and n <= 0:
        return []

    history = _load_history()
    if not history:
        return []

    target_domain = _extract_domain(email_data.get("sender"))
    target_recipient = (email_data.get("recipient") or "").lower()
    target_words = _subject_keywords(email_data.get("subject"))

    scored = []
    for entry in history:
        score = 0

        # Signal 1: sender domain match
        if target_domain and entry.get("sender_domain", "") == target_domain:
            score += 3

        # Signal 2: recipient substring match
        entry_recipient = (entry.get("recipient") or "").lower()
        if target_recipient and target_recipient in entry_recipient:
            score += 2

        # Signal 3: subject keyword overlap
        if target_words:
            score += len(target_words & _subject_keywords(entry.get("subject")))

        if score > 0:
            scored.append((score, entry))

    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [entry for _, entry in scored[:n]]


def get_sender_stats(sender_domain):
    """Get action distribution for a sender domain.

    Returns a dict like {"delete": 5, "keep": 2, "archive": 1}.
    Entries whose action is missing or null are ignored.
    """
    actions = Counter()
    for entry in _load_history():
        if entry.get("sender_domain", "") != sender_domain:
            continue
        action = entry.get("action")
        if action is not None:
            actions[action] += 1
    return dict(actions)


def get_sender_history_count(sender_domain):
    """Count total past decisions for a sender domain.

    Used by the scan command to decide whether there is enough history
    to trust auto-actions for this sender.
    """
    return sum(
        1 for entry in _load_history()
        if entry.get("sender_domain", "") == sender_domain
    )


def get_known_labels():
    """Return the set of all label names used in past "label:<name>" decisions.

    These are offered to the LLM so it can reuse existing labels rather than
    inventing new ones. A bare "label:" action with no name is skipped.
    """
    labels = set()
    for entry in _load_history():
        action = entry.get("action") or ""
        if action.startswith(_LABEL_PREFIX):
            name = action[len(_LABEL_PREFIX):]
            if name:
                labels.add(name)
    return labels


def get_all_stats():
    """Compute aggregate statistics across the full decision history.

    Returns a dict with keys:
        total:       number of entries in the history
        by_action:   {action: count}
        by_source:   {source: count}
        top_domains: up to 10 (sender_domain, count) tuples, most frequent
                     first; entries without a domain are counted under ""

    Entries whose action (or source) is missing or null are left out of
    by_action (or by_source) but still count towards total.

    Returns None if history is empty.
    """
    history = _load_history()
    if not history:
        return None

    by_action = Counter()
    by_source = Counter()
    domain_counts = Counter()
    for entry in history:
        action = entry.get("action")
        if action is not None:
            by_action[action] += 1
        source = entry.get("source")
        if source is not None:
            by_source[source] += 1
        domain_counts[entry.get("sender_domain", "")] += 1

    return {
        "total": len(history),
        "by_action": dict(by_action),
        "by_source": dict(by_source),
        "top_domains": domain_counts.most_common(10),
    }


# ---------------------------------------------------------------------------
# Migration
# ---------------------------------------------------------------------------

def migrate_pending():
    """One-time migration: import 'done' entries from pending_emails.json.

    Converts old-style action names ("archived" -> "archive", etc.) and
    records them as user decisions in the history file. Entries that are
    not marked done, or that have no action, are skipped.

    Not idempotent: running it again appends the same decisions a second
    time, so run it once only.

    All migrated entries are written with a single load and a single save of
    the history file, so the migration is applied entirely or not at all.

    Returns:
        The number of decisions migrated (0 when the pending file is absent).
    """
    try:
        with open(PENDING_FILE, "r", encoding="utf-8") as f:
            pending = json.load(f)
    except FileNotFoundError:
        print("No pending_emails.json found, nothing to migrate.")
        return 0

    # Map old action names to new ones
    action_map = {
        "archived": "archive",
        "kept": "keep",
        "deleted": "delete",
    }

    new_entries = []
    for data in pending.values():
        if data.get("status") != "done":
            continue

        old_action = data.get("action", "")
        action = action_map.get(old_action, old_action)
        if not action:
            continue

        # _build_entry only reads sender/recipient/subject/summary from data.
        new_entries.append(_build_entry(data, action, "user"))

    if new_entries:
        history = _load_history()
        history.extend(new_entries)
        _save_history(history)

    migrated = len(new_entries)
    print(f"Migrated {migrated} decisions from pending_emails.json")
    return migrated