From 71672b31ca6aeb973d738da8dc79eaf24a1a3441 Mon Sep 17 00:00:00 2001 From: Yanxin Lu Date: Fri, 20 Mar 2026 18:58:13 -0700 Subject: [PATCH] email-processor: fix concurrency bugs and several other issues - Add fcntl file locking around read-modify-write cycles on both decision_history.json and pending_emails.json to prevent data corruption from parallel processes - Pass --page-size 500 to himalaya envelope list to avoid silently missing emails beyond the default first page - Use ollama.Client(host=...) so the config.json host setting is actually respected - Fall back to sender-only matching in compute_confidence when LLM returns no valid taxonomy tags, instead of always returning 50% - Fix _format_address to return empty string instead of literal "None" or "[]" for missing address fields --- scripts/email_processor/classifier.py | 7 +- scripts/email_processor/decision_store.py | 45 ++++++++---- scripts/email_processor/main.py | 85 +++++++++++------------ 3 files changed, 78 insertions(+), 59 deletions(-) diff --git a/scripts/email_processor/classifier.py b/scripts/email_processor/classifier.py index add125f..6bbea47 100644 --- a/scripts/email_processor/classifier.py +++ b/scripts/email_processor/classifier.py @@ -187,12 +187,15 @@ def classify_email(email_data, config): import ollama prompt = _build_prompt(email_data, config) - model = config.get("ollama", {}).get("model", "kamekichi128/qwen3-4b-instruct-2507:latest") + ollama_config = config.get("ollama", {}) + model = ollama_config.get("model", "kamekichi128/qwen3-4b-instruct-2507:latest") + host = ollama_config.get("host") + client = ollama.Client(host=host) if host else ollama.Client() start_time = time.time() try: # Low temperature for consistent classification - response = ollama.generate(model=model, prompt=prompt, options={"temperature": 0.1}) + response = client.generate(model=model, prompt=prompt, options={"temperature": 0.1}) output = response["response"] action, tags, summary, reason = _parse_response(output) except Exception as e: diff --git a/scripts/email_processor/decision_store.py b/scripts/email_processor/decision_store.py index 7e08446..03e4d31 100644 --- a/scripts/email_processor/decision_store.py +++ b/scripts/email_processor/decision_store.py @@ -12,8 +12,10 @@ recipient, subject, summary, action taken, and whether it was a user or auto decision. """ +import fcntl import json import re +from contextlib import contextmanager from datetime import datetime, timedelta from pathlib import Path from collections import Counter @@ -34,6 +36,19 @@ _STOP_WORDS = {"re", "fwd", "the", "a", "an", "is", "to", "for", "and", "or", "y # Internal helpers # --------------------------------------------------------------------------- +@contextmanager +def file_lock(path): + """Acquire an exclusive file lock for safe concurrent access.""" + DATA_DIR.mkdir(exist_ok=True) + lock_path = str(path) + ".lock" + with open(lock_path, "w") as lock_file: + fcntl.flock(lock_file, fcntl.LOCK_EX) + try: + yield + finally: + fcntl.flock(lock_file, fcntl.LOCK_UN) + + def _load_history(): """Load the full decision history list from disk.""" if not HISTORY_FILE.exists(): @@ -69,7 +84,6 @@ def record_decision(email_data, action, source="user", tags=None): source: "user" (manual review) or "auto" (high-confidence). tags: list of category tags from the classifier taxonomy. """ - history = _load_history() entry = { "timestamp": datetime.now().isoformat(timespec="seconds"), "sender": email_data.get("sender", ""), @@ -80,8 +94,10 @@ def record_decision(email_data, action, source="user", tags=None): "source": source, "tags": tags or [], } - history.append(entry) - _save_history(history) + with file_lock(HISTORY_FILE): + history = _load_history() + history.append(entry) + _save_history(history) return entry @@ -152,23 +168,28 @@ def compute_confidence(sender_email, action, tags): Returns an integer 0-100. """ history = _load_history() - if not history or not tags: + if not history: return 50 - # Find past decisions with same sender and sufficient tag overlap + # Find past decisions with same sender and sufficient tag overlap. + # If tags are empty (LLM returned no valid taxonomy tags), fall back + # to sender-only matching so history still contributes to confidence. matches = [] for entry in history: entry_email = extract_email_address(entry.get("sender", "")) if entry_email != sender_email: continue - entry_tags = entry.get("tags", []) - if not entry_tags: - continue - - shared = len(set(tags) & set(entry_tags)) - min_len = min(len(tags), len(entry_tags)) - if min_len > 0 and shared / min_len >= 0.5: + if tags: + entry_tags = entry.get("tags", []) + if not entry_tags: + continue + shared = len(set(tags) & set(entry_tags)) + min_len = min(len(tags), len(entry_tags)) + if min_len > 0 and shared / min_len >= 0.5: + matches.append(entry) + else: + # No tags to compare — match on sender alone matches.append(entry) if not matches: diff --git a/scripts/email_processor/main.py b/scripts/email_processor/main.py index 34c1379..e7760dc 100644 --- a/scripts/email_processor/main.py +++ b/scripts/email_processor/main.py @@ -100,7 +100,7 @@ def get_unseen_envelopes(): Returns a list of envelope dicts from himalaya's JSON output. Each has keys like: id, subject, from, to, date, flags. """ - return _himalaya_json("envelope", "list", "not", "flag", "seen") + return _himalaya_json("envelope", "list", "-s", "500", "not", "flag", "seen") def get_recent_envelopes(days): @@ -110,7 +110,7 @@ def get_recent_envelopes(days): bulk-classifying historical mail. """ since = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d") - return _himalaya_json("envelope", "list", "after", since) + return _himalaya_json("envelope", "list", "-s", "500", "after", since) def read_message(envelope_id): @@ -134,7 +134,9 @@ def _format_address(addr_field): name = first.get("name", "") addr = first.get("addr", "") return f"{name} <{addr}>" if name else addr - return str(addr_field) + elif isinstance(addr_field, str) and addr_field: + return addr_field + return "" def build_email_data(envelope, body, config): @@ -226,28 +228,28 @@ def add_to_pending(email_data, summary, reason, action_suggestion, confidence, t alongside the email metadata so the user can see what the model thought. Uses envelope_id as the primary identifier for review commands. """ - pending = load_pending() - # Generate a stable ID from envelope ID + subject eid = str(email_data["id"]) key = f"{eid}_{email_data['subject']}" msg_id = f"msg_{hashlib.md5(key.encode()).hexdigest()[:8]}" - pending[msg_id] = { - "envelope_id": eid, - "subject": email_data["subject"], - "sender": email_data["sender"], - "recipient": email_data.get("recipient", ""), - "summary": summary, - "reason": reason, - "suggested_action": action_suggestion, - "confidence": confidence, - "tags": tags or [], - "email_date": email_data.get("date", ""), - "status": "pending", - "found_at": datetime.now().isoformat(), - } - save_pending(pending) + with decision_store.file_lock(PENDING_FILE): + pending = load_pending() + pending[msg_id] = { + "envelope_id": eid, + "subject": email_data["subject"], + "sender": email_data["sender"], + "recipient": email_data.get("recipient", ""), + "summary": summary, + "reason": reason, + "suggested_action": action_suggestion, + "confidence": confidence, + "tags": tags or [], + "email_date": email_data.get("date", ""), + "status": "pending", + "found_at": datetime.now().isoformat(), + } + save_pending(pending) return msg_id @@ -287,10 +289,11 @@ def cmd_scan(config, recent=None, dry_run=False): print("=" * 50) # Clear done items from previous scan cycles - pending = load_pending() - cleared = {k: v for k, v in pending.items() if v.get("status") != "done"} - if len(cleared) < len(pending): - save_pending(cleared) + with decision_store.file_lock(PENDING_FILE): + pending = load_pending() + cleared = {k: v for k, v in pending.items() if v.get("status") != "done"} + if len(cleared) < len(pending): + save_pending(cleared) LOGS_DIR.mkdir(exist_ok=True) log_file = LOGS_DIR / f"{datetime.now().strftime('%Y-%m-%d')}.log" @@ -484,9 +487,6 @@ def cmd_review_act(selector, action): log_file = LOGS_DIR / f"{datetime.now().strftime('%Y-%m-%d')}.log" # Execute action on each target - pending = load_pending() - pending_dirty = False - for msg_id, data in targets: eid = data.get("envelope_id") if not eid: @@ -498,11 +498,13 @@ def cmd_review_act(selector, action): # Record decision for future learning decision_store.record_decision(data, action, source="user", tags=data.get("tags", [])) - # Mark as done in pending queue - pending[msg_id]["status"] = "done" - pending[msg_id]["action"] = action - pending[msg_id]["processed_at"] = datetime.now().isoformat() - pending_dirty = True + # Mark as done in pending queue (locked to avoid concurrent corruption) + with decision_store.file_lock(PENDING_FILE): + pending = load_pending() + pending[msg_id]["status"] = "done" + pending[msg_id]["action"] = action + pending[msg_id]["processed_at"] = datetime.now().isoformat() + save_pending(pending) log_result(log_file, data, f"REVIEW:{action}", data.get("reason", "")) print(f" {msg_id}: {action} -> OK ({data['subject'][:40]})") @@ -510,9 +512,6 @@ def cmd_review_act(selector, action): log_result(log_file, data, f"REVIEW_FAILED:{action}", data.get("reason", "")) print(f" {msg_id}: {action} -> FAILED") - if pending_dirty: - save_pending(pending) - def cmd_review_accept(): """Accept all classifier suggestions for pending emails. @@ -529,9 +528,6 @@ def cmd_review_accept(): LOGS_DIR.mkdir(exist_ok=True) log_file = LOGS_DIR / f"{datetime.now().strftime('%Y-%m-%d')}.log" - pending = load_pending() - pending_dirty = False - for msg_id, data in sorted_items: action = data.get("suggested_action") if not action: @@ -547,10 +543,12 @@ def cmd_review_accept(): if success: decision_store.record_decision(data, action, source="user", tags=data.get("tags", [])) - pending[msg_id]["status"] = "done" - pending[msg_id]["action"] = action - pending[msg_id]["processed_at"] = datetime.now().isoformat() - pending_dirty = True + with decision_store.file_lock(PENDING_FILE): + pending = load_pending() + pending[msg_id]["status"] = "done" + pending[msg_id]["action"] = action + pending[msg_id]["processed_at"] = datetime.now().isoformat() + save_pending(pending) log_result(log_file, data, f"ACCEPT:{action}", data.get("reason", "")) print(f" {msg_id}: {action} -> OK ({data['subject'][:40]})") @@ -558,9 +556,6 @@ def cmd_review_accept(): log_result(log_file, data, f"ACCEPT_FAILED:{action}", data.get("reason", "")) print(f" {msg_id}: {action} -> FAILED") - if pending_dirty: - save_pending(pending) - def _resolve_target(selector, sorted_items): """Resolve a selector (envelope_id or msg_id) to a (msg_id, data) tuple.