#!/usr/bin/env python3
"""
Decision Store - Manages decision history for learning-based email classification.

This module persists every user and auto-made decision to a flat JSON file
(data/decision_history.json). Past decisions serve as few-shot examples that
are injected into the LLM prompt by classifier.py, enabling the system to
learn from user behavior over time.

Storage format: a JSON array of decision entries, each containing sender,
recipient, subject, summary, action taken, and whether it was a user or auto
decision.
"""

import json
import re
from collections import Counter
from datetime import datetime
from pathlib import Path
from typing import Optional

# ---------------------------------------------------------------------------
# Paths
# ---------------------------------------------------------------------------

SCRIPT_DIR = Path(__file__).parent
DATA_DIR = SCRIPT_DIR / "data"
HISTORY_FILE = DATA_DIR / "decision_history.json"
PENDING_FILE = DATA_DIR / "pending_emails.json"

# Stop-words excluded from subject keyword matching to reduce noise.
_STOP_WORDS = {"re", "fwd", "the", "a", "an", "is", "to", "for", "and", "or", "your", "you"}

# Compiled once at import time; both are used inside per-entry loops.
# Group 0 is the full address, group 1 is the domain part.
_EMAIL_RE = re.compile(r"[\w.+-]+@([\w.-]+)")
_WORD_RE = re.compile(r"\w+")


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------

def _load_history() -> list:
    """Load the full decision history list from disk.

    Returns:
        The parsed content of the history file, or an empty list when the
        file does not exist yet.

    Raises:
        json.JSONDecodeError: if the file exists but is not valid JSON. This
            is deliberately not swallowed: returning an empty list here would
            let the next save overwrite (and lose) the existing history.
    """
    try:
        with open(HISTORY_FILE, "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        return []


def _save_history(history: list) -> None:
    """Write the full decision history list to disk.

    The data is first written to a sibling temporary file and then renamed
    over the history file, so a failure during serialization or writing
    leaves the previous history untouched instead of truncated.
    """
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    tmp_path = HISTORY_FILE.with_name(HISTORY_FILE.name + ".tmp")
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(history, f, indent=2, ensure_ascii=False)
        tmp_path.replace(HISTORY_FILE)
    finally:
        # Only still present if the write or the rename failed.
        if tmp_path.exists():
            tmp_path.unlink()


def _extract_domain(sender: Optional[str]) -> str:
    """Extract the lower-cased domain part from a sender string.

    Handles both a bare address (user@example.com) and a display name
    followed by the address in angle brackets. Returns "" when no address
    is found or when sender is None/empty.
    """
    match = _EMAIL_RE.search(sender or "")
    return match.group(1).lower() if match else ""


def _extract_email_address(sender: Optional[str]) -> str:
    """Extract the lower-cased full email address from a sender string.

    Falls back to the whole lower-cased input when it contains no address.
    None is treated as an empty string.
    """
    sender = sender or ""
    match = _EMAIL_RE.search(sender)
    return match.group(0).lower() if match else sender.lower()


def _subject_keywords(subject: Optional[str]) -> set:
    """Return the set of lower-cased subject words, minus stop-words."""
    return set(_WORD_RE.findall((subject or "").lower())) - _STOP_WORDS


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------

def record_decision(email_data: dict, action: str, source: str = "user",
                    tags: Optional[list] = None) -> dict:
    """Append a decision to the history file.

    Args:
        email_data: dict with keys: sender, recipient, subject, summary.
            Missing keys are stored as empty strings.
        action: one of "delete", "archive", "keep", "mark_read", or
            "label:NAME" where NAME is the label to apply.
        source: "user" (manual review) or "auto" (high-confidence).
        tags: list of category tags from the classifier taxonomy.

    Returns:
        The entry dict that was appended to the history.
    """
    history = _load_history()
    sender = email_data.get("sender", "")
    entry = {
        "timestamp": datetime.now().isoformat(timespec="seconds"),
        "sender": sender,
        "sender_domain": _extract_domain(sender),
        "recipient": email_data.get("recipient", ""),
        "subject": email_data.get("subject", ""),
        "summary": email_data.get("summary", ""),
        "action": action,
        "source": source,
        "tags": tags or [],
    }
    history.append(entry)
    _save_history(history)
    return entry


def get_relevant_examples(email_data: dict, n: int = 10) -> list:
    """Find the N most relevant past decisions for a given email.

    Relevance is scored by two signals:
      - Exact sender email address match: +3 points
      - Subject keyword overlap: +1 point per shared word

    Only entries with score > 0 are considered. Results are returned sorted
    by descending relevance; ties keep their order in the history file.

    Args:
        email_data: dict with (at least) "sender" and "subject" keys.
        n: maximum number of examples to return; values <= 0 yield [].
    """
    if n <= 0:
        # A negative n would otherwise slice "all but the last |n|" entries.
        return []

    history = _load_history()
    if not history:
        return []

    target_email = _extract_email_address(email_data.get("sender", ""))
    target_words = _subject_keywords(email_data.get("subject", ""))

    scored = []
    for entry in history:
        # Signal 2: subject keyword overlap
        score = len(target_words & _subject_keywords(entry.get("subject", "")))

        # Signal 1: sender email match (skipped when the target has no sender,
        # so sender-less emails do not all "match" each other)
        if target_email and _extract_email_address(entry.get("sender", "")) == target_email:
            score += 3

        if score > 0:
            scored.append((score, entry))

    # list.sort is stable, also with reverse=True, so ties keep file order.
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [entry for _, entry in scored[:n]]


def get_sender_stats(sender_email: str) -> dict:
    """Get action distribution for a sender email address.

    The argument is normalized the same way stored senders are (address
    extracted, lower-cased), so matching is case-insensitive and a full
    sender string with a display name is accepted too.

    Returns a dict like {"delete": 5, "keep": 2, "archive": 1}. Entries
    without an action are ignored.
    """
    target_email = _extract_email_address(sender_email)
    actions = Counter()
    for entry in _load_history():
        if _extract_email_address(entry.get("sender", "")) != target_email:
            continue
        action = entry.get("action")
        if action:
            actions[action] += 1
    return dict(actions)


def compute_confidence(sender_email: str, action: str, tags: Optional[list]) -> int:
    """Compute confidence from decision history by matching email signatures.

    A "signature" is (sender_email, tags). Past decisions match if they have
    the same sender email (compared case-insensitively) AND at least 50% tag
    overlap with the current email, measured against the smaller of the two
    distinct tag sets.

    Confidence is based on two factors:
      1. Agreement: what fraction of matching decisions chose the same action.
      2. Match-count cap: limits confidence until enough history exists
         (1 match -> max 10%, 5 matches -> 50%, 10+ -> 100%).

    Returns an integer 0-100. Returns the neutral value 50 when there is no
    history, no tags were given, or no past decision matches the signature.
    """
    history = _load_history()
    wanted_tags = set(tags or ())
    if not history or not wanted_tags:
        return 50

    target_email = _extract_email_address(sender_email)

    # Find past decisions with same sender and sufficient tag overlap
    matches = []
    for entry in history:
        if _extract_email_address(entry.get("sender", "")) != target_email:
            continue
        entry_tags = set(entry.get("tags") or ())
        if not entry_tags:
            continue
        # Both sides are de-duplicated so repeated tags cannot skew the ratio.
        shared = len(wanted_tags & entry_tags)
        if shared / min(len(wanted_tags), len(entry_tags)) >= 0.5:
            matches.append(entry)

    if not matches:
        return 50

    # Agreement: fraction of matches with the same action
    total = len(matches)
    matching_action = sum(1 for m in matches if m.get("action") == action)
    agreement = round(matching_action / total * 100)

    # Cap by match count: each match adds 10% to the cap
    cap = min(total * 10, 100)
    return min(agreement, cap)


def get_known_labels() -> set:
    """Return the set of all label names used in past "label:NAME" decisions.

    These are offered to the LLM so it can reuse existing labels rather than
    inventing new ones. A bare "label:" action with no name is skipped.
    """
    prefix = "label:"
    labels = set()
    for entry in _load_history():
        action = entry.get("action") or ""
        if action.startswith(prefix):
            name = action[len(prefix):]
            if name:
                labels.add(name)
    return labels


def get_all_stats() -> Optional[dict]:
    """Compute aggregate statistics across the full decision history.

    Returns a dict with keys:
      - total: number of recorded decisions
      - by_action: {action: count}
      - by_source: {source: count}
      - top_senders: list of (sender_address, count) tuples for the 10
        sender addresses with the most decisions

    Entries lacking an action or source are counted under "unknown".
    Returns None if history is empty.
    """
    history = _load_history()
    if not history:
        return None

    by_action = Counter(e.get("action") or "unknown" for e in history)
    by_source = Counter(e.get("source") or "unknown" for e in history)

    # Top 10 sender addresses by decision count
    sender_counts = Counter(_extract_email_address(e.get("sender", "")) for e in history)

    return {
        "total": len(history),
        "by_action": dict(by_action),
        "by_source": dict(by_source),
        "top_senders": sender_counts.most_common(10),
    }