email-processor: fix concurrency bugs and several other issues
- Add fcntl file locking around read-modify-write cycles on both decision_history.json and pending_emails.json to prevent data corruption from parallel processes - Pass --page-size 500 to himalaya envelope list to avoid silently missing emails beyond the default first page - Use ollama.Client(host=...) so the config.json host setting is actually respected - Fall back to sender-only matching in compute_confidence when LLM returns no valid taxonomy tags, instead of always returning 50% - Fix _format_address to return empty string instead of literal "None" or "[]" for missing address fields
This commit is contained in:
@@ -187,12 +187,15 @@ def classify_email(email_data, config):
|
||||
import ollama
|
||||
|
||||
prompt = _build_prompt(email_data, config)
|
||||
model = config.get("ollama", {}).get("model", "kamekichi128/qwen3-4b-instruct-2507:latest")
|
||||
ollama_config = config.get("ollama", {})
|
||||
model = ollama_config.get("model", "kamekichi128/qwen3-4b-instruct-2507:latest")
|
||||
host = ollama_config.get("host")
|
||||
client = ollama.Client(host=host) if host else ollama.Client()
|
||||
|
||||
start_time = time.time()
|
||||
try:
|
||||
# Low temperature for consistent classification
|
||||
response = ollama.generate(model=model, prompt=prompt, options={"temperature": 0.1})
|
||||
response = client.generate(model=model, prompt=prompt, options={"temperature": 0.1})
|
||||
output = response["response"]
|
||||
action, tags, summary, reason = _parse_response(output)
|
||||
except Exception as e:
|
||||
|
||||
@@ -12,8 +12,10 @@ recipient, subject, summary, action taken, and whether it was a user or
|
||||
auto decision.
|
||||
"""
|
||||
|
||||
import fcntl
|
||||
import json
|
||||
import re
|
||||
from contextlib import contextmanager
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
from collections import Counter
|
||||
@@ -34,6 +36,19 @@ _STOP_WORDS = {"re", "fwd", "the", "a", "an", "is", "to", "for", "and", "or", "y
|
||||
# Internal helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@contextmanager
|
||||
def file_lock(path):
|
||||
"""Acquire an exclusive file lock for safe concurrent access."""
|
||||
DATA_DIR.mkdir(exist_ok=True)
|
||||
lock_path = str(path) + ".lock"
|
||||
with open(lock_path, "w") as lock_file:
|
||||
fcntl.flock(lock_file, fcntl.LOCK_EX)
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
fcntl.flock(lock_file, fcntl.LOCK_UN)
|
||||
|
||||
|
||||
def _load_history():
|
||||
"""Load the full decision history list from disk."""
|
||||
if not HISTORY_FILE.exists():
|
||||
@@ -69,7 +84,6 @@ def record_decision(email_data, action, source="user", tags=None):
|
||||
source: "user" (manual review) or "auto" (high-confidence).
|
||||
tags: list of category tags from the classifier taxonomy.
|
||||
"""
|
||||
history = _load_history()
|
||||
entry = {
|
||||
"timestamp": datetime.now().isoformat(timespec="seconds"),
|
||||
"sender": email_data.get("sender", ""),
|
||||
@@ -80,6 +94,8 @@ def record_decision(email_data, action, source="user", tags=None):
|
||||
"source": source,
|
||||
"tags": tags or [],
|
||||
}
|
||||
with file_lock(HISTORY_FILE):
|
||||
history = _load_history()
|
||||
history.append(entry)
|
||||
_save_history(history)
|
||||
return entry
|
||||
@@ -152,24 +168,29 @@ def compute_confidence(sender_email, action, tags):
|
||||
Returns an integer 0-100.
|
||||
"""
|
||||
history = _load_history()
|
||||
if not history or not tags:
|
||||
if not history:
|
||||
return 50
|
||||
|
||||
# Find past decisions with same sender and sufficient tag overlap
|
||||
# Find past decisions with same sender and sufficient tag overlap.
|
||||
# If tags are empty (LLM returned no valid taxonomy tags), fall back
|
||||
# to sender-only matching so history still contributes to confidence.
|
||||
matches = []
|
||||
for entry in history:
|
||||
entry_email = extract_email_address(entry.get("sender", ""))
|
||||
if entry_email != sender_email:
|
||||
continue
|
||||
|
||||
if tags:
|
||||
entry_tags = entry.get("tags", [])
|
||||
if not entry_tags:
|
||||
continue
|
||||
|
||||
shared = len(set(tags) & set(entry_tags))
|
||||
min_len = min(len(tags), len(entry_tags))
|
||||
if min_len > 0 and shared / min_len >= 0.5:
|
||||
matches.append(entry)
|
||||
else:
|
||||
# No tags to compare — match on sender alone
|
||||
matches.append(entry)
|
||||
|
||||
if not matches:
|
||||
return 50
|
||||
|
||||
@@ -100,7 +100,7 @@ def get_unseen_envelopes():
|
||||
Returns a list of envelope dicts from himalaya's JSON output.
|
||||
Each has keys like: id, subject, from, to, date, flags.
|
||||
"""
|
||||
return _himalaya_json("envelope", "list", "not", "flag", "seen")
|
||||
return _himalaya_json("envelope", "list", "-s", "500", "not", "flag", "seen")
|
||||
|
||||
|
||||
def get_recent_envelopes(days):
|
||||
@@ -110,7 +110,7 @@ def get_recent_envelopes(days):
|
||||
bulk-classifying historical mail.
|
||||
"""
|
||||
since = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
|
||||
return _himalaya_json("envelope", "list", "after", since)
|
||||
return _himalaya_json("envelope", "list", "-s", "500", "after", since)
|
||||
|
||||
|
||||
def read_message(envelope_id):
|
||||
@@ -134,7 +134,9 @@ def _format_address(addr_field):
|
||||
name = first.get("name", "")
|
||||
addr = first.get("addr", "")
|
||||
return f"{name} <{addr}>" if name else addr
|
||||
return str(addr_field)
|
||||
elif isinstance(addr_field, str) and addr_field:
|
||||
return addr_field
|
||||
return ""
|
||||
|
||||
|
||||
def build_email_data(envelope, body, config):
|
||||
@@ -226,13 +228,13 @@ def add_to_pending(email_data, summary, reason, action_suggestion, confidence, t
|
||||
alongside the email metadata so the user can see what the model thought.
|
||||
Uses envelope_id as the primary identifier for review commands.
|
||||
"""
|
||||
pending = load_pending()
|
||||
|
||||
# Generate a stable ID from envelope ID + subject
|
||||
eid = str(email_data["id"])
|
||||
key = f"{eid}_{email_data['subject']}"
|
||||
msg_id = f"msg_{hashlib.md5(key.encode()).hexdigest()[:8]}"
|
||||
|
||||
with decision_store.file_lock(PENDING_FILE):
|
||||
pending = load_pending()
|
||||
pending[msg_id] = {
|
||||
"envelope_id": eid,
|
||||
"subject": email_data["subject"],
|
||||
@@ -287,6 +289,7 @@ def cmd_scan(config, recent=None, dry_run=False):
|
||||
print("=" * 50)
|
||||
|
||||
# Clear done items from previous scan cycles
|
||||
with decision_store.file_lock(PENDING_FILE):
|
||||
pending = load_pending()
|
||||
cleared = {k: v for k, v in pending.items() if v.get("status") != "done"}
|
||||
if len(cleared) < len(pending):
|
||||
@@ -484,9 +487,6 @@ def cmd_review_act(selector, action):
|
||||
log_file = LOGS_DIR / f"{datetime.now().strftime('%Y-%m-%d')}.log"
|
||||
|
||||
# Execute action on each target
|
||||
pending = load_pending()
|
||||
pending_dirty = False
|
||||
|
||||
for msg_id, data in targets:
|
||||
eid = data.get("envelope_id")
|
||||
if not eid:
|
||||
@@ -498,11 +498,13 @@ def cmd_review_act(selector, action):
|
||||
# Record decision for future learning
|
||||
decision_store.record_decision(data, action, source="user", tags=data.get("tags", []))
|
||||
|
||||
# Mark as done in pending queue
|
||||
# Mark as done in pending queue (locked to avoid concurrent corruption)
|
||||
with decision_store.file_lock(PENDING_FILE):
|
||||
pending = load_pending()
|
||||
pending[msg_id]["status"] = "done"
|
||||
pending[msg_id]["action"] = action
|
||||
pending[msg_id]["processed_at"] = datetime.now().isoformat()
|
||||
pending_dirty = True
|
||||
save_pending(pending)
|
||||
|
||||
log_result(log_file, data, f"REVIEW:{action}", data.get("reason", ""))
|
||||
print(f" {msg_id}: {action} -> OK ({data['subject'][:40]})")
|
||||
@@ -510,9 +512,6 @@ def cmd_review_act(selector, action):
|
||||
log_result(log_file, data, f"REVIEW_FAILED:{action}", data.get("reason", ""))
|
||||
print(f" {msg_id}: {action} -> FAILED")
|
||||
|
||||
if pending_dirty:
|
||||
save_pending(pending)
|
||||
|
||||
|
||||
def cmd_review_accept():
|
||||
"""Accept all classifier suggestions for pending emails.
|
||||
@@ -529,9 +528,6 @@ def cmd_review_accept():
|
||||
LOGS_DIR.mkdir(exist_ok=True)
|
||||
log_file = LOGS_DIR / f"{datetime.now().strftime('%Y-%m-%d')}.log"
|
||||
|
||||
pending = load_pending()
|
||||
pending_dirty = False
|
||||
|
||||
for msg_id, data in sorted_items:
|
||||
action = data.get("suggested_action")
|
||||
if not action:
|
||||
@@ -547,10 +543,12 @@ def cmd_review_accept():
|
||||
if success:
|
||||
decision_store.record_decision(data, action, source="user", tags=data.get("tags", []))
|
||||
|
||||
with decision_store.file_lock(PENDING_FILE):
|
||||
pending = load_pending()
|
||||
pending[msg_id]["status"] = "done"
|
||||
pending[msg_id]["action"] = action
|
||||
pending[msg_id]["processed_at"] = datetime.now().isoformat()
|
||||
pending_dirty = True
|
||||
save_pending(pending)
|
||||
|
||||
log_result(log_file, data, f"ACCEPT:{action}", data.get("reason", ""))
|
||||
print(f" {msg_id}: {action} -> OK ({data['subject'][:40]})")
|
||||
@@ -558,9 +556,6 @@ def cmd_review_accept():
|
||||
log_result(log_file, data, f"ACCEPT_FAILED:{action}", data.get("reason", ""))
|
||||
print(f" {msg_id}: {action} -> FAILED")
|
||||
|
||||
if pending_dirty:
|
||||
save_pending(pending)
|
||||
|
||||
|
||||
def _resolve_target(selector, sorted_items):
|
||||
"""Resolve a selector (envelope_id or msg_id) to a (msg_id, data) tuple.
|
||||
|
||||
Reference in New Issue
Block a user