Files
Yanxin Lu 71672b31ca email-processor: fix concurrency bugs and several other issues
- Add fcntl file locking around read-modify-write cycles on both
  decision_history.json and pending_emails.json to prevent data
  corruption from parallel processes
- Pass --page-size 500 to himalaya envelope list to avoid silently
  missing emails beyond the default first page
- Use ollama.Client(host=...) so the config.json host setting is
  actually respected
- Fall back to sender-only matching in compute_confidence when LLM
  returns no valid taxonomy tags, instead of always returning 50%
- Fix _format_address to return empty string instead of literal
  "None" or "[]" for missing address fields
2026-03-20 18:58:13 -07:00

275 lines
8.7 KiB
Python

#!/usr/bin/env python3
"""
Decision Store - Manages decision history for learning-based email classification.
This module persists every user and auto-made decision to a flat JSON file
(data/decision_history.json). Past decisions serve as few-shot examples
that are injected into the LLM prompt by classifier.py, enabling the
system to learn from user behavior over time.
Storage format: a JSON array of decision entries, each containing sender,
recipient, subject, summary, action taken, and whether it was a user or
auto decision.
"""
import fcntl
import json
import re
from contextlib import contextmanager
from datetime import datetime, timedelta
from pathlib import Path
from collections import Counter
# ---------------------------------------------------------------------------
# Paths
# ---------------------------------------------------------------------------
SCRIPT_DIR = Path(__file__).parent  # directory this module lives in
DATA_DIR = SCRIPT_DIR / "data"  # created on demand by file_lock() and _save_history()
HISTORY_FILE = DATA_DIR / "decision_history.json"  # flat JSON array of decision entries
# Stop-words excluded from subject keyword matching to reduce noise.
_STOP_WORDS = {"re", "fwd", "the", "a", "an", "is", "to", "for", "and", "or", "your", "you"}
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
@contextmanager
def file_lock(path):
    """Hold an exclusive advisory lock while the managed block runs.

    The lock is taken on a sidecar "<path>.lock" file rather than on the
    data file itself, so the data file can be rewritten while locked.
    The data directory is created first so the lock file can be opened.
    """
    DATA_DIR.mkdir(exist_ok=True)
    lock_file = open(str(path) + ".lock", "w")
    try:
        fcntl.flock(lock_file, fcntl.LOCK_EX)
        try:
            yield
        finally:
            fcntl.flock(lock_file, fcntl.LOCK_UN)
    finally:
        lock_file.close()
def _load_history():
    """Load the full decision history list from disk.

    Returns an empty list when the history file does not exist yet, or
    when it exists but contains no data (e.g. a previous write was
    interrupted before any content was flushed). Non-empty malformed
    JSON still raises ``json.JSONDecodeError`` so real corruption is
    surfaced rather than silently discarded.
    """
    if not HISTORY_FILE.exists():
        return []
    with open(HISTORY_FILE, "r", encoding="utf-8") as f:
        content = f.read()
    if not content.strip():
        # Zero-byte / whitespace-only file: treat as "no history yet"
        # instead of crashing every caller with a decode error.
        return []
    return json.loads(content)
def _save_history(history):
    """Persist the complete decision history list as pretty-printed JSON."""
    DATA_DIR.mkdir(exist_ok=True)
    serialized = json.dumps(history, indent=2, ensure_ascii=False)
    HISTORY_FILE.write_text(serialized, encoding="utf-8")
def extract_email_address(sender):
    """Return the bare, lower-cased email address found in *sender*.

    Falls back to the whole lower-cased input when no address-shaped
    substring is present.
    """
    found = re.search(r"([\w.+-]+@[\w.-]+)", sender)
    if found is None:
        return sender.lower()
    return found.group(1).lower()
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def record_decision(email_data, action, source="user", tags=None):
    """Append a decision to the history file.

    Args:
        email_data: dict with keys: sender, recipient, subject, summary.
        action: one of "delete", "archive", "keep", "mark_read",
            or "label:<name>".
        source: "user" (manual review) or "auto" (high-confidence).
        tags: list of category tags from the classifier taxonomy.

    Returns:
        The entry dict that was appended.
    """
    # Build the entry in the canonical key order used by the JSON file.
    entry = {"timestamp": datetime.now().isoformat(timespec="seconds")}
    for field in ("sender", "recipient", "subject", "summary"):
        entry[field] = email_data.get(field, "")
    entry["action"] = action
    entry["source"] = source
    entry["tags"] = tags or []
    # Read-modify-write under an exclusive lock so parallel processes
    # cannot clobber each other's appends.
    with file_lock(HISTORY_FILE):
        history = _load_history()
        history.append(entry)
        _save_history(history)
    return entry
def get_relevant_examples(email_data, n=5):
    """Return up to *n* past decisions most relevant to *email_data*.

    Relevance scoring:
      - exact sender email address match: +3 points
      - each shared non-stop-word subject keyword: +1 point

    Entries scoring zero are discarded; the rest come back sorted by
    descending relevance.
    """
    history = _load_history()
    if not history:
        return []

    def subject_words(text):
        # Tokenize a subject line, dropping common noise words.
        return set(re.findall(r"\w+", text.lower())) - _STOP_WORDS

    target_email = extract_email_address(email_data.get("sender", ""))
    target_words = subject_words(email_data.get("subject", ""))

    scored = []
    for entry in history:
        points = len(target_words & subject_words(entry.get("subject", "")))
        if target_email and extract_email_address(entry.get("sender", "")) == target_email:
            points += 3
        if points:
            scored.append((points, entry))
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [entry for _, entry in scored[:n]]
def get_sender_stats(sender_email):
    """Get action distribution for a sender email address.

    Returns a dict like {"delete": 5, "keep": 2, "archive": 1}.
    """
    tally = Counter(
        entry["action"]
        for entry in _load_history()
        if extract_email_address(entry.get("sender", "")) == sender_email
    )
    return dict(tally)
def compute_confidence(sender_email, action, tags):
    """Compute confidence from decision history by matching email signatures.

    A "signature" is (sender_email, tags): a past decision matches when
    its sender address is identical AND its tags overlap the current
    tags by at least 50% (relative to the shorter tag list). When the
    current email has no tags at all, sender identity alone decides.

    The score combines two factors:
      1. Agreement — the fraction of matches that chose *action*.
      2. A match-count cap — each match raises the ceiling by 10%,
         so sparse history cannot yield high confidence.

    Returns an integer 0-100; 50 means "no usable history".
    """
    history = _load_history()
    if not history:
        return 50

    def is_match(entry):
        # Same sender is mandatory; with tags present we also require
        # >= 50% overlap against the shorter of the two tag lists.
        if extract_email_address(entry.get("sender", "")) != sender_email:
            return False
        if not tags:
            # No valid taxonomy tags from the LLM: sender alone decides.
            return True
        entry_tags = entry.get("tags", [])
        if not entry_tags:
            return False
        shared = len(set(tags) & set(entry_tags))
        shortest = min(len(tags), len(entry_tags))
        return shortest > 0 and shared / shortest >= 0.5

    matches = [entry for entry in history if is_match(entry)]
    if not matches:
        return 50

    # Agreement percentage among matching decisions.
    agreement = round(sum(m["action"] == action for m in matches) / len(matches) * 100)
    # Confidence ceiling grows 10% per match, saturating at 100.
    cap = min(len(matches) * 10, 100)
    return min(agreement, cap)
def get_known_labels():
    """Return the set of all label names used in past "label:<name>" decisions.

    These are offered to the LLM so it can reuse existing labels rather
    than inventing new ones.
    """
    prefix = "label:"
    return {
        entry.get("action", "")[len(prefix):]
        for entry in _load_history()
        if entry.get("action", "").startswith(prefix)
    }
def get_recent_decisions(days=1):
    """Return recent decisions grouped by action.

    Args:
        days: number of days to look back (default 1 = today).

    Returns:
        dict mapping action -> list of entries, e.g.
        {"delete": [...], "archive": [...]}; empty dict when nothing
        falls inside the window.
    """
    history = _load_history()
    if not history:
        return {}
    cutoff = datetime.now() - timedelta(days=days)
    grouped = {}
    for entry in history:
        try:
            when = datetime.fromisoformat(entry["timestamp"])
        except (KeyError, ValueError):
            # Missing or malformed timestamp: skip rather than crash.
            continue
        if when < cutoff:
            continue
        grouped.setdefault(entry.get("action", "unknown"), []).append(entry)
    return grouped
def get_all_stats():
    """Compute aggregate statistics across the full decision history.

    Returns a dict with keys: total, by_action, by_source, top_senders
    (top 10 sender addresses by decision count), or None when the
    history is empty.
    """
    history = _load_history()
    if not history:
        return None
    by_action = Counter()
    by_source = Counter()
    sender_counts = Counter()
    # Single pass over the history, tallying all three distributions.
    for entry in history:
        by_action[entry["action"]] += 1
        by_source[entry["source"]] += 1
        sender_counts[extract_email_address(entry.get("sender", ""))] += 1
    return {
        "total": len(history),
        "by_action": dict(by_action),
        "by_source": dict(by_source),
        "top_senders": sender_counts.most_common(10),
    }