- Add fcntl file locking around read-modify-write cycles on both decision_history.json and pending_emails.json to prevent data corruption from parallel processes
- Pass --page-size 500 to himalaya envelope list to avoid silently missing emails beyond the default first page
- Use ollama.Client(host=...) so the config.json host setting is actually respected
- Fall back to sender-only matching in compute_confidence when the LLM returns no valid taxonomy tags, instead of always returning 50%
- Fix _format_address to return an empty string instead of the literal "None" or "[]" for missing address fields
275 lines
8.7 KiB
Python
275 lines
8.7 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Decision Store - Manages decision history for learning-based email classification.
|
|
|
|
This module persists every user and auto-made decision to a flat JSON file
|
|
(data/decision_history.json). Past decisions serve as few-shot examples
|
|
that are injected into the LLM prompt by classifier.py, enabling the
|
|
system to learn from user behavior over time.
|
|
|
|
Storage format: a JSON array of decision entries, each containing sender,
|
|
recipient, subject, summary, action taken, and whether it was a user or
|
|
auto decision.
|
|
"""
|
|
|
|
import fcntl
|
|
import json
|
|
import re
|
|
from contextlib import contextmanager
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
from collections import Counter
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Paths
|
|
# ---------------------------------------------------------------------------
|
|
|
|
SCRIPT_DIR = Path(__file__).parent
|
|
DATA_DIR = SCRIPT_DIR / "data"
|
|
HISTORY_FILE = DATA_DIR / "decision_history.json"
|
|
|
|
# Stop-words excluded from subject keyword matching to reduce noise.
|
|
_STOP_WORDS = {"re", "fwd", "the", "a", "an", "is", "to", "for", "and", "or", "your", "you"}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Internal helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@contextmanager
def file_lock(path):
    """Context manager holding an exclusive flock while the caller runs.

    A sidecar "<path>.lock" file is the lock target so the data file
    itself is never truncated by the locking open(). The lock is always
    released (and the handle closed) on exit, even on exceptions.
    """
    DATA_DIR.mkdir(exist_ok=True)
    lock_file = open(str(path) + ".lock", "w")
    try:
        fcntl.flock(lock_file, fcntl.LOCK_EX)
        try:
            yield
        finally:
            fcntl.flock(lock_file, fcntl.LOCK_UN)
    finally:
        lock_file.close()
|
|
|
|
|
|
def _load_history():
    """Return the list of decision entries on disk, or [] if none exist."""
    if not HISTORY_FILE.exists():
        return []
    with HISTORY_FILE.open("r", encoding="utf-8") as fh:
        return json.load(fh)
|
|
|
|
|
|
def _save_history(history):
    """Persist the complete decision list, creating the data dir if needed."""
    DATA_DIR.mkdir(exist_ok=True)
    with HISTORY_FILE.open("w", encoding="utf-8") as fh:
        json.dump(history, fh, indent=2, ensure_ascii=False)
|
|
|
|
|
|
def extract_email_address(sender):
    """Pull the bare email address out of a sender display string, lowercased.

    Falls back to the whole lowercased input when no address-shaped
    substring is present.
    """
    found = re.search(r"([\w.+-]+@[\w.-]+)", sender)
    if found:
        return found.group(1).lower()
    return sender.lower()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Public API
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def record_decision(email_data, action, source="user", tags=None):
    """Persist one classification decision to the history file.

    Args:
        email_data: dict providing sender, recipient, subject, summary.
        action: "delete", "archive", "keep", "mark_read", or "label:<name>".
        source: "user" for manual review, "auto" for high-confidence runs.
        tags: classifier taxonomy tags for this email (optional).

    Returns:
        The entry dict that was appended to the history.
    """
    entry = {"timestamp": datetime.now().isoformat(timespec="seconds")}
    for field in ("sender", "recipient", "subject", "summary"):
        entry[field] = email_data.get(field, "")
    entry["action"] = action
    entry["source"] = source
    entry["tags"] = tags if tags else []

    # Hold the lock across the whole read-modify-write so parallel
    # processes cannot clobber each other's appends.
    with file_lock(HISTORY_FILE):
        existing = _load_history()
        existing.append(entry)
        _save_history(existing)
    return entry
|
|
|
|
|
|
def get_relevant_examples(email_data, n=5):
    """Return up to *n* past decisions most similar to the given email.

    Relevance scoring:
        +3 when the sender email address matches exactly,
        +1 for each non-stop-word the subjects share.

    Entries scoring zero are dropped; the remainder are returned in
    descending relevance order.
    """
    history = _load_history()
    if not history:
        return []

    sender_addr = extract_email_address(email_data.get("sender", ""))
    subject_words = (
        set(re.findall(r"\w+", email_data.get("subject", "").lower())) - _STOP_WORDS
    )

    def relevance(entry):
        points = 0
        # Signal 1: exact sender address match.
        if sender_addr and extract_email_address(entry.get("sender", "")) == sender_addr:
            points += 3
        # Signal 2: overlap of subject keywords after stop-word removal.
        other_words = (
            set(re.findall(r"\w+", entry.get("subject", "").lower())) - _STOP_WORDS
        )
        return points + len(subject_words & other_words)

    ranked = [(relevance(e), e) for e in history]
    ranked = [(score, e) for score, e in ranked if score > 0]
    ranked.sort(key=lambda pair: pair[0], reverse=True)
    return [e for _, e in ranked[:n]]
|
|
|
|
|
|
def get_sender_stats(sender_email):
    """Count how often each action was taken for a given sender address.

    Returns a mapping like {"delete": 5, "keep": 2, "archive": 1}.
    """
    tally = Counter(
        entry["action"]
        for entry in _load_history()
        if extract_email_address(entry.get("sender", "")) == sender_email
    )
    return dict(tally)
|
|
|
|
|
|
def compute_confidence(sender_email, action, tags):
    """Score confidence in *action* using matching past decisions.

    A past decision "matches" when its sender address equals
    *sender_email* and, if *tags* is non-empty, at least half of the
    shorter tag list is shared. When *tags* is empty (the LLM produced
    no valid taxonomy tags), sender identity alone qualifies an entry,
    so history still contributes to confidence.

    The result combines:
        1. Agreement: percentage of matches that chose this action.
        2. A cap of 10% per match (1 match -> 10%, 10+ -> 100%), so
           sparse history stays tentative.

    Returns an int in [0, 100]; 50 when there is nothing to go on.
    """
    history = _load_history()
    if not history:
        return 50

    current_tags = set(tags) if tags else set()
    matches = []
    for past in history:
        if extract_email_address(past.get("sender", "")) != sender_email:
            continue
        if not tags:
            # No tags to compare -- fall back to sender-only matching.
            matches.append(past)
            continue
        past_tags = past.get("tags", [])
        if not past_tags:
            continue
        overlap = len(current_tags & set(past_tags))
        shorter = min(len(tags), len(past_tags))
        # Require >= 50% overlap relative to the shorter tag list.
        if shorter > 0 and overlap / shorter >= 0.5:
            matches.append(past)

    if not matches:
        return 50

    # Agreement: fraction of matching decisions that picked this action.
    same_action = sum(1 for entry in matches if entry["action"] == action)
    agreement = round(same_action / len(matches) * 100)

    # Cap grows 10 points per match until enough history exists.
    cap = min(len(matches) * 10, 100)
    return min(agreement, cap)
|
|
|
|
|
|
def get_known_labels():
    """Collect every label name used in past "label:<name>" decisions.

    These are offered back to the LLM so it reuses existing labels
    rather than inventing near-duplicates.
    """
    prefix = "label:"
    return {
        entry.get("action", "")[len(prefix):]
        for entry in _load_history()
        if entry.get("action", "").startswith(prefix)
    }
|
|
|
|
|
|
def get_recent_decisions(days=1):
    """Group decisions from the last *days* days by the action taken.

    Args:
        days: look-back window in days (default 1 = today).

    Returns:
        {action: [entries...]} for entries whose timestamp parses and
        falls within the window; {} when nothing qualifies.
    """
    history = _load_history()
    if not history:
        return {}

    cutoff = datetime.now() - timedelta(days=days)
    by_action = {}
    for entry in history:
        try:
            when = datetime.fromisoformat(entry["timestamp"])
        except (KeyError, ValueError):
            # Entries with missing or malformed timestamps are skipped.
            continue
        if when < cutoff:
            continue
        by_action.setdefault(entry.get("action", "unknown"), []).append(entry)
    return by_action
|
|
|
|
|
|
def get_all_stats():
    """Summarize the entire decision history.

    Returns:
        {"total": int, "by_action": dict, "by_source": dict,
         "top_senders": [(email, count), ...]} -- or None when the
        history is empty.
    """
    history = _load_history()
    if not history:
        return None

    # Top 10 sender addresses by number of recorded decisions.
    addresses = (extract_email_address(e.get("sender", "")) for e in history)

    return {
        "total": len(history),
        "by_action": dict(Counter(e["action"] for e in history)),
        "by_source": dict(Counter(e["source"] for e in history)),
        "top_senders": Counter(addresses).most_common(10),
    }
|