diff --git a/scripts/email_processor/README.md b/scripts/email_processor/README.md index 3f21dc8..f10f1f7 100644 --- a/scripts/email_processor/README.md +++ b/scripts/email_processor/README.md @@ -35,11 +35,11 @@ The system separates **classification** (what the LLM does) from **confidence** ``` 1. [msg_f1d43ea3] Subject: New jobs matching your profile From: LinkedIn - Tags: [promotion, social, notification] + Tags: [promotion, social, newsletter] Suggested: delete (50%) 2. [msg_60c56a87] Subject: Your order shipped From: Amazon - Tags: [shipping, confirmation, notification] + Tags: [shipping, confirmation, receipt] Suggested: archive (50%) ``` @@ -147,16 +147,16 @@ Example: sender `noreply@example.com` has 8 entries with action `delete` and 4 e Look at the subject lines, summaries, and current tags of the entries that got different actions. Identify the pattern — what makes the "delete" emails different from the "keep" emails? Example: -- Deleted emails: subjects like "50% off sale", "Weekly deals" → tags: `[promotion, notification, newsletter]` -- Kept emails: subjects like "Your password was changed", "New login from Chrome" → tags: `[security, notification, update]` +- Deleted emails: subjects like "50% off sale", "Weekly deals" → tags: `[promotion, account, newsletter]` +- Kept emails: subjects like "Your password was changed", "New login from Chrome" → tags: `[security, account, alert]` -The shared tag `notification` is causing these to match as the same signature, dragging confidence down. +The shared tag `account` is causing these to match as the same signature, dragging confidence down. **Step 3: Determine if a new tag would fix it.** -Ask: is there a category that applies to one group but not the other? In the example above, an `account` tag would distinguish password/login emails from promotional emails. Check if the tag already exists in `TAG_TAXONOMY` in `classifier.py` — it might just be that the LLM isn't using it consistently. +Ask: is there a category that applies to one group but not the other? In the example above, the LLM is assigning `account` to both promotional and security emails from the same service. Check if the problem is LLM consistency (the tag exists but the model uses it too broadly) or a missing tag (no existing tag can distinguish the two types). -If the tag already exists: the problem is LLM consistency, not the taxonomy. Consider adjusting the prompt or few-shot examples. +If the tag exists but is overused: the problem is LLM consistency, not the taxonomy. Consider adjusting the prompt or few-shot examples. If the tag doesn't exist: propose a new tag. @@ -169,7 +169,7 @@ Before adding, check that the new tag: **Step 5: Add the tag to `TAG_TAXONOMY` in `classifier.py`.** -Add the new tag to the `TAG_TAXONOMY` list in `classifier.py:30-38`. Keep the list organized by category. The LLM prompt automatically picks up the updated list on the next scan. +Add the new tag to the `TAG_TAXONOMY` list in `classifier.py:30-37`. Keep the list organized by category. The LLM prompt automatically picks up the updated list on the next scan. **Step 6: Decide whether to wipe history.** @@ -191,7 +191,7 @@ Check the logs for the affected senders: - **Only add tags, never rename.** Renaming `billing` to `finance` means old entries with `billing` never match new entries with `finance`. If you must rename, keep both in the taxonomy. - **Avoid deleting tags.** Old entries with deleted tags become slightly less useful (fewer matching tags) but don't cause incorrect matches. Only delete a tag if it's actively causing confusion (e.g., the LLM uses it inconsistently and it's hurting overlap calculations). -- **Keep the taxonomy small.** More tags = more choices for the LLM = more inconsistency. The taxonomy should have the minimum number of tags needed to distinguish email types that deserve different actions. 20-30 tags is a reasonable range. +- **Keep the taxonomy small.** More tags = more choices for the LLM = more inconsistency. The taxonomy should have the minimum number of tags needed to distinguish email types that deserve different actions. 10-20 tags is a reasonable range. ## Configuration @@ -298,7 +298,7 @@ The top 5 most relevant examples are injected into the prompt as few-shot demons ### Fixed tag taxonomy -Tags are defined in `classifier.py` as `TAG_TAXONOMY` — a manually curated list of 21 categories. The LLM must pick from this list (invalid tags are silently dropped). The taxonomy should stay fixed to keep history matching stable. See "Refining the Tag Taxonomy" above for when and how to update it. +Tags are defined in `classifier.py` as `TAG_TAXONOMY` — a manually curated list of 14 categories. The LLM must pick from this list (invalid tags are silently dropped). The taxonomy should stay fixed to keep history matching stable. See "Refining the Tag Taxonomy" above for when and how to update it. ### `keep` means unread diff --git a/scripts/email_processor/classifier.py b/scripts/email_processor/classifier.py index d9de378..add125f 100644 --- a/scripts/email_processor/classifier.py +++ b/scripts/email_processor/classifier.py @@ -47,8 +47,8 @@ def _build_prompt(email_data, config): max_body = config.get("rules", {}).get("max_body_length", 1000) # Gather learning context from decision history - examples = decision_store.get_relevant_examples(email_data, n=10) - sender_email = decision_store._extract_email_address(email_data.get("sender", "")) + examples = decision_store.get_relevant_examples(email_data, n=5) + sender_email = decision_store.extract_email_address(email_data.get("sender", "")) sender_stats = decision_store.get_sender_stats(sender_email) if sender_email else {} known_labels = decision_store.get_known_labels() @@ -80,7 +80,7 @@ def _build_prompt(email_data, config): # Section 4: Few-shot examples (top 5 most relevant past decisions) if examples: parts.append("\n--- Past decisions (learn from these) ---") - for ex in examples[:5]: + for ex in examples: parts.append( f"From: {ex['sender'][:60]} | To: {ex['recipient'][:40]} | " f"Subject: {ex['subject'][:60]} -> {ex['action']}" @@ -135,9 +135,9 @@ def _parse_response(output): Expected format (one per line): Action: delete - Tags: promotion, marketing, newsletter + Tags: promotion, newsletter, social Summary: Promotional offer from retailer - Reason: Clearly a marketing email with discount offer + Reason: Clearly a promotional email with discount offer Falls back to safe defaults (keep, empty tags) on parse failure. """ diff --git a/scripts/email_processor/config.json b/scripts/email_processor/config.json index d258d74..4dc3391 100644 --- a/scripts/email_processor/config.json +++ b/scripts/email_processor/config.json @@ -4,8 +4,7 @@ "model": "kamekichi128/qwen3-4b-instruct-2507:latest" }, "rules": { - "max_body_length": 1000, - "check_unseen_only": true + "max_body_length": 1000 }, "automation": { "confidence_threshold": 85 diff --git a/scripts/email_processor/decision_store.py b/scripts/email_processor/decision_store.py index 4805350..2542373 100644 --- a/scripts/email_processor/decision_store.py +++ b/scripts/email_processor/decision_store.py @@ -25,7 +25,6 @@ from collections import Counter SCRIPT_DIR = Path(__file__).parent DATA_DIR = SCRIPT_DIR / "data" HISTORY_FILE = DATA_DIR / "decision_history.json" -PENDING_FILE = DATA_DIR / "pending_emails.json" # Stop-words excluded from subject keyword matching to reduce noise. _STOP_WORDS = {"re", "fwd", "the", "a", "an", "is", "to", "for", "and", "or", "your", "you"} @@ -50,18 +49,7 @@ def _save_history(history): json.dump(history, f, indent=2, ensure_ascii=False) -def _extract_domain(sender): - """Extract the domain part from a sender string. - - Handles formats like: - "Display Name " - user@example.com - """ - match = re.search(r"[\w.+-]+@([\w.-]+)", sender) - return match.group(1).lower() if match else "" - - -def _extract_email_address(sender): +def extract_email_address(sender): """Extract the full email address from a sender string.""" match = re.search(r"([\w.+-]+@[\w.-]+)", sender) return match.group(1).lower() if match else sender.lower() @@ -85,7 +73,6 @@ def record_decision(email_data, action, source="user", tags=None): entry = { "timestamp": datetime.now().isoformat(timespec="seconds"), "sender": email_data.get("sender", ""), - "sender_domain": _extract_domain(email_data.get("sender", "")), "recipient": email_data.get("recipient", ""), "subject": email_data.get("subject", ""), "summary": email_data.get("summary", ""), @@ -98,7 +85,7 @@ def record_decision(email_data, action, source="user", tags=None): return entry -def get_relevant_examples(email_data, n=10): +def get_relevant_examples(email_data, n=5): """Find the N most relevant past decisions for a given email. Relevance is scored by two signals: @@ -112,7 +99,7 @@ def get_relevant_examples(email_data, n=10): if not history: return [] - target_email = _extract_email_address(email_data.get("sender", "")) + target_email = extract_email_address(email_data.get("sender", "")) target_words = ( set(re.findall(r"\w+", email_data.get("subject", "").lower())) - _STOP_WORDS ) @@ -122,7 +109,7 @@ def get_relevant_examples(email_data, n=10): score = 0 # Signal 1: sender email match - if target_email and _extract_email_address(entry.get("sender", "")) == target_email: + if target_email and extract_email_address(entry.get("sender", "")) == target_email: score += 3 # Signal 2: subject keyword overlap @@ -146,7 +133,7 @@ def get_sender_stats(sender_email): history = _load_history() actions = Counter() for entry in history: - if _extract_email_address(entry.get("sender", "")) == sender_email: + if extract_email_address(entry.get("sender", "")) == sender_email: actions[entry["action"]] += 1 return dict(actions) @@ -171,7 +158,7 @@ def compute_confidence(sender_email, action, tags): # Find past decisions with same sender and sufficient tag overlap matches = [] for entry in history: - entry_email = _extract_email_address(entry.get("sender", "")) + entry_email = extract_email_address(entry.get("sender", "")) if entry_email != sender_email: continue @@ -216,7 +203,7 @@ def get_known_labels(): def get_all_stats(): """Compute aggregate statistics across the full decision history. - Returns a dict with keys: total, by_action, by_source, top_domains. + Returns a dict with keys: total, by_action, by_source, top_senders. Returns None if history is empty. """ history = _load_history() @@ -228,7 +215,7 @@ def get_all_stats(): by_source = Counter(e["source"] for e in history) # Top 10 sender addresses by decision count - sender_counts = Counter(_extract_email_address(e.get("sender", "")) for e in history) + sender_counts = Counter(extract_email_address(e.get("sender", "")) for e in history) top_senders = sender_counts.most_common(10) return { diff --git a/scripts/email_processor/main.py b/scripts/email_processor/main.py index afaf45d..596922f 100644 --- a/scripts/email_processor/main.py +++ b/scripts/email_processor/main.py @@ -121,6 +121,20 @@ def read_message(envelope_id): return _himalaya("message", "read", "--preview", "--no-headers", str(envelope_id)) +def _format_address(addr_field): + """Format a himalaya address field (dict, list, or string) into a display string.""" + if isinstance(addr_field, dict): + name = addr_field.get("name", "") + addr = addr_field.get("addr", "") + return f"{name} <{addr}>" if name else addr + elif isinstance(addr_field, list) and addr_field: + first = addr_field[0] + name = first.get("name", "") + addr = first.get("addr", "") + return f"{name} <{addr}>" if name else addr + return str(addr_field) + + def build_email_data(envelope, body, config): """Build the email_data dict expected by classifier and decision_store. @@ -129,40 +143,11 @@ def build_email_data(envelope, body, config): """ max_body = config.get("rules", {}).get("max_body_length", 1000) - # himalaya envelope JSON uses "from" as a nested object or string - sender = envelope.get("from", {}) - if isinstance(sender, dict): - # Format: {"name": "Display Name", "addr": "user@example.com"} - name = sender.get("name", "") - addr = sender.get("addr", "") - sender_str = f"{name} <{addr}>" if name else addr - elif isinstance(sender, list) and sender: - first = sender[0] - name = first.get("name", "") - addr = first.get("addr", "") - sender_str = f"{name} <{addr}>" if name else addr - else: - sender_str = str(sender) - - # Same for "to" - to = envelope.get("to", {}) - if isinstance(to, dict): - name = to.get("name", "") - addr = to.get("addr", "") - to_str = f"{name} <{addr}>" if name else addr - elif isinstance(to, list) and to: - first = to[0] - name = first.get("name", "") - addr = first.get("addr", "") - to_str = f"{name} <{addr}>" if name else addr - else: - to_str = str(to) - return { "id": str(envelope.get("id", "")), "subject": envelope.get("subject", "(No Subject)"), - "sender": sender_str, - "recipient": to_str, + "sender": _format_address(envelope.get("from", {})), + "recipient": _format_address(envelope.get("to", {})), "date": envelope.get("date", ""), "body": body[:max_body], } @@ -322,7 +307,7 @@ def cmd_scan(config, recent=None, dry_run=False): # Load automation threshold automation = config.get("automation", {}) - confidence_threshold = automation.get("confidence_threshold", 75) + confidence_threshold = automation.get("confidence_threshold", 85) # Fetch envelopes via himalaya if recent: @@ -340,9 +325,8 @@ def cmd_scan(config, recent=None, dry_run=False): queued = 0 skipped = 0 - # Load pending queue once to skip already-queued emails - pending = load_pending() - pending_eids = {v.get("envelope_id") for v in pending.values() if v.get("status") == "pending"} + # Reuse the cleared pending dict from above to skip already-queued emails + pending_eids = {v.get("envelope_id") for v in cleared.values() if v.get("status") == "pending"} for envelope in envelopes: eid = envelope.get("id", "?") @@ -353,6 +337,9 @@ def cmd_scan(config, recent=None, dry_run=False): skipped += 1 continue + # Track this eid so duplicates within the same envelope list are caught + pending_eids.add(str(eid)) + print(f"[{eid}] ", end="", flush=True) # Read message body without marking as seen @@ -370,7 +357,7 @@ def cmd_scan(config, recent=None, dry_run=False): ) # Compute confidence from decision history - sender_email = decision_store._extract_email_address(email_data.get("sender", "")) + sender_email = decision_store.extract_email_address(email_data.get("sender", "")) confidence = decision_store.compute_confidence(sender_email, action, tags) tags_str = ", ".join(tags) if tags else "(none)" @@ -479,7 +466,7 @@ def cmd_review_act(selector, action): """Execute an action on one or more pending emails. Args: - selector: a 1-based number, a msg_id string, or "all". + selector: a scan_index number, a msg_id string, or "all". action: one of delete/archive/keep/mark_read/label:. """ # Validate action @@ -507,8 +494,11 @@ def cmd_review_act(selector, action): log_file = LOGS_DIR / f"{datetime.now().strftime('%Y-%m-%d')}.log" # Execute action on each target + pending = load_pending() + pending_dirty = False + for msg_id, data in targets: - eid = data.get("envelope_id") or data.get("imap_uid") + eid = data.get("envelope_id") if not eid: print(f" {msg_id}: No envelope ID, skipping") continue @@ -519,11 +509,10 @@ def cmd_review_act(selector, action): decision_store.record_decision(data, action, source="user", tags=data.get("tags", [])) # Mark as done in pending queue - pending = load_pending() pending[msg_id]["status"] = "done" pending[msg_id]["action"] = action pending[msg_id]["processed_at"] = datetime.now().isoformat() - save_pending(pending) + pending_dirty = True log_result(log_file, data, f"REVIEW:{action}", data.get("reason", "")) print(f" {msg_id}: {action} -> OK ({data['subject'][:40]})") @@ -531,6 +520,9 @@ def cmd_review_act(selector, action): log_result(log_file, data, f"REVIEW_FAILED:{action}", data.get("reason", "")) print(f" {msg_id}: {action} -> FAILED") + if pending_dirty: + save_pending(pending) + def cmd_review_accept(): """Accept all classifier suggestions for pending emails. @@ -547,13 +539,16 @@ def cmd_review_accept(): LOGS_DIR.mkdir(exist_ok=True) log_file = LOGS_DIR / f"{datetime.now().strftime('%Y-%m-%d')}.log" + pending = load_pending() + pending_dirty = False + for msg_id, data in sorted_items: action = data.get("suggested_action") if not action: print(f" {msg_id}: No suggestion, skipping") continue - eid = data.get("envelope_id") or data.get("imap_uid") + eid = data.get("envelope_id") if not eid: print(f" {msg_id}: No envelope ID, skipping") continue @@ -562,11 +557,10 @@ def cmd_review_accept(): if success: decision_store.record_decision(data, action, source="user", tags=data.get("tags", [])) - pending = load_pending() pending[msg_id]["status"] = "done" pending[msg_id]["action"] = action pending[msg_id]["processed_at"] = datetime.now().isoformat() - save_pending(pending) + pending_dirty = True log_result(log_file, data, f"ACCEPT:{action}", data.get("reason", "")) print(f" {msg_id}: {action} -> OK ({data['subject'][:40]})") @@ -574,6 +568,9 @@ def cmd_review_accept(): log_result(log_file, data, f"ACCEPT_FAILED:{action}", data.get("reason", "")) print(f" {msg_id}: {action} -> FAILED") + if pending_dirty: + save_pending(pending) + def _resolve_target(selector, sorted_items): """Resolve a selector (scan_index number or msg_id) to a (msg_id, data) tuple. @@ -611,7 +608,7 @@ def cmd_stats(): """Print a summary of the decision history. Shows total decisions, user vs. auto breakdown, action distribution, - top sender domains, and custom labels. + top senders, and custom labels. """ stats = decision_store.get_all_stats() @@ -672,7 +669,11 @@ if __name__ == "__main__": i = 0 while i < len(args): if args[i] == "--recent" and i + 1 < len(args): - recent = int(args[i + 1]) + try: + recent = int(args[i + 1]) + except ValueError: + print(f"--recent requires a number, got: {args[i + 1]}") + sys.exit(1) i += 2 elif args[i] == "--dry-run": dry_run = True