From 64e28b55d105e7233e2e742d941e83c39520ccb1 Mon Sep 17 00:00:00 2001 From: Yanxin Lu Date: Wed, 4 Mar 2026 14:23:50 -0800 Subject: [PATCH] Compute confidence from decision history instead of LLM --- scripts/email_processor/classifier.py | 67 +++++++++++------- scripts/email_processor/config.json | 3 +- scripts/email_processor/decision_store.py | 85 ++++++++++++++++------- scripts/email_processor/main.py | 62 ++++++++--------- 4 files changed, 133 insertions(+), 84 deletions(-) diff --git a/scripts/email_processor/classifier.py b/scripts/email_processor/classifier.py index 10cecef..41e58c8 100644 --- a/scripts/email_processor/classifier.py +++ b/scripts/email_processor/classifier.py @@ -5,7 +5,10 @@ Classifier - LLM-based email classification with learning. This module builds a rich prompt for the local Ollama model (Qwen3) that includes few-shot examples from past user decisions, per-sender statistics, and a list of known labels. The model returns a structured response with -an action, confidence score, summary, and reason. +an action, category tags, summary, and reason. + +Confidence is NOT produced by the LLM — it is computed externally from +decision history by decision_store.compute_confidence(). The prompt structure: 1. System instructions (action definitions) @@ -13,7 +16,7 @@ The prompt structure: 3. Sender statistics ("linkedin.com: deleted 8 times, kept 2 times") 4. Few-shot examples (top 5 most relevant past decisions) 5. The email to classify (subject, sender, recipient, body preview) - 6. Output format specification + 6. Output format specification (action, tags, summary, reason) """ import time @@ -24,6 +27,15 @@ import decision_store LOGS_DIR = Path(__file__).parent / "logs" +TAG_TAXONOMY = [ + "receipt", "invoice", "payment", "billing", + "shipping", "delivery", + "promotion", "discount", "marketing", "newsletter", + "notification", "security", "social", + "reminder", "confirmation", "update", "alert", + "personal", "account", "subscription", "travel", +] + def _build_prompt(email_data, config): """Assemble the full classification prompt with learning context. @@ -36,8 +48,8 @@ def _build_prompt(email_data, config): # Gather learning context from decision history examples = decision_store.get_relevant_examples(email_data, n=10) - sender_domain = decision_store._extract_domain(email_data.get("sender", "")) - sender_stats = decision_store.get_sender_stats(sender_domain) if sender_domain else {} + sender_email = decision_store._extract_email_address(email_data.get("sender", "")) + sender_stats = decision_store.get_sender_stats(sender_email) if sender_email else {} known_labels = decision_store.get_known_labels() # /no_think disables Qwen3's chain-of-thought, giving faster + shorter output @@ -63,7 +75,7 @@ def _build_prompt(email_data, config): stats_str = ", ".join( f"{action} {count} times" for action, count in sender_stats.items() ) - parts.append(f"\nHistory for {sender_domain}: {stats_str}\n") + parts.append(f"\nHistory for {sender_email}: {stats_str}\n") # Section 4: Few-shot examples (top 5 most relevant past decisions) if examples: @@ -86,10 +98,11 @@ def _build_prompt(email_data, config): ) # Section 6: Required output format + tags_list = ", ".join(TAG_TAXONOMY) parts.append( "Respond in this exact format (nothing else):\n" "Action: [delete|archive|keep|mark_read|label:]\n" - "Confidence: [0-100]\n" + f"Tags: [comma-separated tags from: {tags_list}]\n" "Summary: [one sentence summary of the email]\n" "Reason: [brief explanation for your classification]" ) @@ -97,18 +110,19 @@ def _build_prompt(email_data, config): return "\n".join(parts) -def _log_llm(prompt, output, email_data, action, confidence, duration): +def _log_llm(prompt, output, email_data, action, tags, duration): """Log the full LLM prompt and response to logs/llm_YYYY-MM-DD.log.""" LOGS_DIR.mkdir(exist_ok=True) log_file = LOGS_DIR / f"llm_{datetime.now().strftime('%Y-%m-%d')}.log" timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") subject = email_data.get("subject", "(No Subject)")[:60] sender = email_data.get("sender", "(Unknown)")[:60] + tags_str = ", ".join(tags) with open(log_file, "a", encoding="utf-8") as f: f.write(f"{'=' * 70}\n") f.write(f"[{timestamp}] {subject}\n") - f.write(f"From: {sender} | Result: {action} @ {confidence}% | {duration:.1f}s\n") + f.write(f"From: {sender} | Result: {action} tags=[{tags_str}] | {duration:.1f}s\n") f.write(f"{'-' * 70}\n") f.write(f"PROMPT:\n{prompt}\n") f.write(f"{'-' * 70}\n") @@ -121,17 +135,19 @@ def _parse_response(output): Expected format (one per line): Action: delete - Confidence: 92 + Tags: promotion, marketing, newsletter Summary: Promotional offer from retailer Reason: Clearly a marketing email with discount offer - Falls back to safe defaults (keep, 50% confidence) on parse failure. + Falls back to safe defaults (keep, empty tags) on parse failure. """ action = "keep" - confidence = 50 + tags = [] summary = "No summary" reason = "Unknown" + valid_tags = set(TAG_TAXONOMY) + for line in output.strip().split("\n"): line = line.strip() if line.startswith("Action:"): @@ -139,25 +155,26 @@ def _parse_response(output): valid_actions = {"delete", "archive", "keep", "mark_read"} if raw_action in valid_actions or raw_action.startswith("label:"): action = raw_action - elif line.startswith("Confidence:"): - try: - confidence = int(line.replace("Confidence:", "").strip().rstrip("%")) - confidence = max(0, min(100, confidence)) # clamp to 0-100 - except ValueError: - confidence = 50 + elif line.startswith("Tags:"): + raw_tags = line.replace("Tags:", "").strip() + tags = [ + t.strip().lower() + for t in raw_tags.split(",") + if t.strip().lower() in valid_tags + ] elif line.startswith("Summary:"): summary = line.replace("Summary:", "").strip()[:200] elif line.startswith("Reason:"): reason = line.replace("Reason:", "").strip() - return action, confidence, summary, reason + return action, tags, summary, reason def classify_email(email_data, config): """Classify an email using the local LLM with few-shot learning context. Connects to Ollama, sends the assembled prompt, and parses the response. - On any error, falls back to "keep" with 0% confidence so the email + On any error, falls back to "keep" with empty tags so the email gets queued for manual review rather than auto-acted upon. Args: @@ -165,7 +182,7 @@ def classify_email(email_data, config): config: full config dict (needs ollama.model and rules.max_body_length). Returns: - Tuple of (action, confidence, summary, reason, duration_seconds). + Tuple of (action, tags, summary, reason, duration_seconds). """ import ollama @@ -177,15 +194,15 @@ def classify_email(email_data, config): # Low temperature for consistent classification response = ollama.generate(model=model, prompt=prompt, options={"temperature": 0.1}) output = response["response"] - action, confidence, summary, reason = _parse_response(output) + action, tags, summary, reason = _parse_response(output) except Exception as e: - # On failure, default to "keep" with 0 confidence -> always queued + # On failure, default to "keep" with empty tags -> always queued output = f"ERROR: {e}" action = "keep" - confidence = 0 + tags = [] summary = "Classification failed" reason = f"error - {str(e)[:100]}" duration = time.time() - start_time - _log_llm(prompt, output, email_data, action, confidence, duration) - return action, confidence, summary, reason, duration + _log_llm(prompt, output, email_data, action, tags, duration) + return action, tags, summary, reason, duration diff --git a/scripts/email_processor/config.json b/scripts/email_processor/config.json index c24dfde..d258d74 100644 --- a/scripts/email_processor/config.json +++ b/scripts/email_processor/config.json @@ -8,7 +8,6 @@ "check_unseen_only": true }, "automation": { - "confidence_threshold": 75, - "bootstrap_min_decisions": 30 + "confidence_threshold": 85 } } diff --git a/scripts/email_processor/decision_store.py b/scripts/email_processor/decision_store.py index ac94ff4..4805350 100644 --- a/scripts/email_processor/decision_store.py +++ b/scripts/email_processor/decision_store.py @@ -71,7 +71,7 @@ def _extract_email_address(sender): # Public API # --------------------------------------------------------------------------- -def record_decision(email_data, action, source="user"): +def record_decision(email_data, action, source="user", tags=None): """Append a decision to the history file. Args: @@ -79,6 +79,7 @@ def record_decision(email_data, action, source="user"): action: one of "delete", "archive", "keep", "mark_read", or "label:". source: "user" (manual review) or "auto" (high-confidence). + tags: list of category tags from the classifier taxonomy. """ history = _load_history() entry = { @@ -90,6 +91,7 @@ def record_decision(email_data, action, source="user"): "summary": email_data.get("summary", ""), "action": action, "source": source, + "tags": tags or [], } history.append(entry) _save_history(history) @@ -99,10 +101,9 @@ def record_decision(email_data, action, source="user"): def get_relevant_examples(email_data, n=10): """Find the N most relevant past decisions for a given email. - Relevance is scored by three signals: - - Exact sender domain match: +3 points - - Recipient string match: +2 points - - Subject keyword overlap: +1 point per shared word + Relevance is scored by two signals: + - Exact sender email address match: +3 points + - Subject keyword overlap: +1 point per shared word Only entries with score > 0 are considered. Results are returned sorted by descending relevance. @@ -111,8 +112,7 @@ def get_relevant_examples(email_data, n=10): if not history: return [] - target_domain = _extract_domain(email_data.get("sender", "")) - target_recipient = email_data.get("recipient", "").lower() + target_email = _extract_email_address(email_data.get("sender", "")) target_words = ( set(re.findall(r"\w+", email_data.get("subject", "").lower())) - _STOP_WORDS ) @@ -121,15 +121,11 @@ def get_relevant_examples(email_data, n=10): for entry in history: score = 0 - # Signal 1: sender domain match - if target_domain and entry.get("sender_domain", "") == target_domain: + # Signal 1: sender email match + if target_email and _extract_email_address(entry.get("sender", "")) == target_email: score += 3 - # Signal 2: recipient substring match - if target_recipient and target_recipient in entry.get("recipient", "").lower(): - score += 2 - - # Signal 3: subject keyword overlap + # Signal 2: subject keyword overlap entry_words = ( set(re.findall(r"\w+", entry.get("subject", "").lower())) - _STOP_WORDS ) @@ -142,27 +138,64 @@ def get_relevant_examples(email_data, n=10): return [entry for _, entry in scored[:n]] -def get_sender_stats(sender_domain): - """Get action distribution for a sender domain. +def get_sender_stats(sender_email): + """Get action distribution for a sender email address. Returns a dict like {"delete": 5, "keep": 2, "archive": 1}. """ history = _load_history() actions = Counter() for entry in history: - if entry.get("sender_domain", "") == sender_domain: + if _extract_email_address(entry.get("sender", "")) == sender_email: actions[entry["action"]] += 1 return dict(actions) -def get_sender_history_count(sender_domain): - """Count total past decisions for a sender domain. +def compute_confidence(sender_email, action, tags): + """Compute confidence from decision history by matching email signatures. - Used by the scan command to decide whether there is enough history - to trust auto-actions for this sender. + A "signature" is (sender_email, tags). Past decisions match if they have + the same sender email AND at least 50% tag overlap with the current email. + + Confidence is based on two factors: + 1. Agreement: what fraction of matching decisions chose the same action. + 2. Match-count cap: limits confidence until enough history exists + (1 match -> max 10%, 5 matches -> 50%, 10+ -> 100%). + + Returns an integer 0-100. """ history = _load_history() - return sum(1 for e in history if e.get("sender_domain", "") == sender_domain) + if not history or not tags: + return 50 + + # Find past decisions with same sender and sufficient tag overlap + matches = [] + for entry in history: + entry_email = _extract_email_address(entry.get("sender", "")) + if entry_email != sender_email: + continue + + entry_tags = entry.get("tags", []) + if not entry_tags: + continue + + shared = len(set(tags) & set(entry_tags)) + min_len = min(len(tags), len(entry_tags)) + if min_len > 0 and shared / min_len >= 0.5: + matches.append(entry) + + if not matches: + return 50 + + # Agreement: fraction of matches with the same action + matching_action = sum(1 for m in matches if m["action"] == action) + total = len(matches) + agreement = round(matching_action / total * 100) + + # Cap by match count: each match adds 10% to the cap + cap = min(total * 10, 100) + + return min(agreement, cap) def get_known_labels(): @@ -194,13 +227,13 @@ def get_all_stats(): by_action = Counter(e["action"] for e in history) by_source = Counter(e["source"] for e in history) - # Top 10 sender domains by decision count - domain_counts = Counter(e.get("sender_domain", "") for e in history) - top_domains = domain_counts.most_common(10) + # Top 10 sender addresses by decision count + sender_counts = Counter(_extract_email_address(e.get("sender", "")) for e in history) + top_senders = sender_counts.most_common(10) return { "total": total, "by_action": dict(by_action), "by_source": dict(by_source), - "top_domains": top_domains, + "top_senders": top_senders, } diff --git a/scripts/email_processor/main.py b/scripts/email_processor/main.py index 7d5630a..30fa977 100644 --- a/scripts/email_processor/main.py +++ b/scripts/email_processor/main.py @@ -232,11 +232,11 @@ def save_pending(pending): json.dump(pending, f, indent=2, ensure_ascii=False) -def add_to_pending(email_data, summary, reason, action_suggestion, confidence): +def add_to_pending(email_data, summary, reason, action_suggestion, confidence, tags=None): """Add an email to the pending queue for manual review. - Stores the classifier's suggestion and confidence alongside the - email metadata so the user can see what the model thought. + Stores the classifier's suggestion, computed confidence, and tags + alongside the email metadata so the user can see what the model thought. """ pending = load_pending() @@ -254,6 +254,7 @@ def add_to_pending(email_data, summary, reason, action_suggestion, confidence): "reason": reason, "suggested_action": action_suggestion, "confidence": confidence, + "tags": tags or [], "email_date": email_data.get("date", ""), "status": "pending", "found_at": datetime.now().isoformat(), @@ -283,10 +284,10 @@ def log_result(log_file, email_data, action, detail, duration=None): def cmd_scan(config, recent=None, dry_run=False): """Fetch emails, classify each one, then auto-act or queue. - Auto-action is based on a single confidence threshold. When the - decision history has fewer than 20 entries, a higher threshold (95%) - is used to be conservative during the learning phase. Once enough - history accumulates, the configured threshold takes over. + Confidence is computed from decision history by matching the email's + signature (sender_email, tags) against past decisions. New/unknown + senders start at 50% (queued). Confidence grows as consistent history + accumulates. Args: config: full config dict. @@ -302,17 +303,7 @@ def cmd_scan(config, recent=None, dry_run=False): # Load automation threshold automation = config.get("automation", {}) - configured_threshold = automation.get("confidence_threshold", 75) - - # Adaptive threshold: be conservative when history is thin - stats = decision_store.get_all_stats() - total_decisions = stats["total"] if stats else 0 - bootstrap_min = automation.get("bootstrap_min_decisions", 20) - if total_decisions < bootstrap_min: - confidence_threshold = 95 - print(f"Learning phase ({total_decisions}/{bootstrap_min} decisions) — threshold: 95%\n") - else: - confidence_threshold = configured_threshold + confidence_threshold = automation.get("confidence_threshold", 75) # Fetch envelopes via himalaya if recent: @@ -354,12 +345,18 @@ def cmd_scan(config, recent=None, dry_run=False): email_data = build_email_data(envelope, body, config) print(f"{email_data['subject'][:55]}") - # Run the LLM classifier (includes few-shot examples from history) - action, confidence, summary, reason, duration = classifier.classify_email( + # Run the LLM classifier (returns tags instead of confidence) + action, tags, summary, reason, duration = classifier.classify_email( email_data, config ) + # Compute confidence from decision history + sender_email = decision_store._extract_email_address(email_data.get("sender", "")) + confidence = decision_store.compute_confidence(sender_email, action, tags) + + tags_str = ", ".join(tags) if tags else "(none)" print(f" -> {action} (confidence: {confidence}%, {duration:.1f}s)") + print(f" tags: [{tags_str}]") print(f" {reason[:80]}") # Auto-act if confidence meets threshold @@ -379,7 +376,7 @@ def cmd_scan(config, recent=None, dry_run=False): success = execute_action(eid, action) if success: decision_store.record_decision( - {**email_data, "summary": summary}, action, source="auto" + {**email_data, "summary": summary}, action, source="auto", tags=tags ) log_result(log_file, email_data, f"AUTO:{action}", reason, duration) print(f" ** AUTO-executed: {action}") @@ -388,11 +385,11 @@ def cmd_scan(config, recent=None, dry_run=False): # Himalaya action failed — fall back to queuing log_result(log_file, email_data, "AUTO_FAILED", reason, duration) print(f" !! Auto-action failed, queuing instead") - add_to_pending(email_data, summary, reason, action, confidence) + add_to_pending(email_data, summary, reason, action, confidence, tags) queued += 1 else: # Not enough confidence or history — queue for manual review - add_to_pending(email_data, summary, reason, action, confidence) + add_to_pending(email_data, summary, reason, action, confidence, tags) log_result(log_file, email_data, f"QUEUED:{action}@{confidence}%", reason, duration) print(f" -> Queued (confidence {confidence}% < {confidence_threshold}%)") queued += 1 @@ -440,11 +437,14 @@ def cmd_review_list(): for i, (msg_id, data) in enumerate(sorted_items, 1): suggested = data.get("suggested_action", "?") conf = data.get("confidence", "?") + tags = data.get("tags", []) + tags_str = ", ".join(tags) if tags else "(none)" print(f"\n {i}. [{msg_id}]") print(f" Subject: {data.get('subject', 'N/A')[:55]}") print(f" From: {data.get('sender', 'N/A')[:55]}") print(f" To: {data.get('recipient', 'N/A')[:40]}") print(f" Summary: {data.get('summary', 'N/A')[:70]}") + print(f" Tags: [{tags_str}]") print(f" Suggested: {suggested} ({conf}% confidence)") print(f"\n{'=' * 60}") @@ -496,7 +496,7 @@ def cmd_review_act(selector, action): success = execute_action(eid, action) if success: # Record decision for future learning - decision_store.record_decision(data, action, source="user") + decision_store.record_decision(data, action, source="user", tags=data.get("tags", [])) # Mark as done in pending queue pending = load_pending() @@ -540,7 +540,7 @@ def cmd_review_accept(): success = execute_action(eid, action) if success: - decision_store.record_decision(data, action, source="user") + decision_store.record_decision(data, action, source="user", tags=data.get("tags", [])) pending = load_pending() pending[msg_id]["status"] = "done" @@ -616,14 +616,14 @@ def cmd_stats(): for action, count in sorted(stats["by_action"].items(), key=lambda x: -x[1]): print(f" {action}: {count}") - # Top sender domains with per-domain action counts - print(f"\nTop sender domains:") - for domain, count in stats["top_domains"]: - domain_stats = decision_store.get_sender_stats(domain) + # Top sender addresses with per-sender action counts + print(f"\nTop senders:") + for sender, count in stats["top_senders"]: + sender_stats = decision_store.get_sender_stats(sender) detail = ", ".join( - f"{a}:{c}" for a, c in sorted(domain_stats.items(), key=lambda x: -x[1]) + f"{a}:{c}" for a, c in sorted(sender_stats.items(), key=lambda x: -x[1]) ) - print(f" {domain}: {count} ({detail})") + print(f" {sender}: {count} ({detail})") # Custom labels labels = decision_store.get_known_labels()