Compare commits

...

2 Commits

Author SHA1 Message Date
Yanxin Lu
da26f84947 Merge branch 'main' of ssh://git.luyanxin.com:8103/lyx/youlu-openclaw-workspace
merge
2026-03-20 18:59:19 -07:00
Yanxin Lu
71672b31ca email-processor: fix concurrency bugs and several other issues
- Add fcntl file locking around read-modify-write cycles on both
  decision_history.json and pending_emails.json to prevent data
  corruption from parallel processes
- Pass --page-size 500 to himalaya envelope list to avoid silently
  missing emails beyond the default first page
- Use ollama.Client(host=...) so the config.json host setting is
  actually respected
- Fall back to sender-only matching in compute_confidence when LLM
  returns no valid taxonomy tags, instead of always returning 50%
- Fix _format_address to return empty string instead of literal
  "None" or "[]" for missing address fields
2026-03-20 18:58:13 -07:00
3 changed files with 78 additions and 59 deletions

View File

@@ -187,12 +187,15 @@ def classify_email(email_data, config):
import ollama import ollama
prompt = _build_prompt(email_data, config) prompt = _build_prompt(email_data, config)
model = config.get("ollama", {}).get("model", "kamekichi128/qwen3-4b-instruct-2507:latest") ollama_config = config.get("ollama", {})
model = ollama_config.get("model", "kamekichi128/qwen3-4b-instruct-2507:latest")
host = ollama_config.get("host")
client = ollama.Client(host=host) if host else ollama.Client()
start_time = time.time() start_time = time.time()
try: try:
# Low temperature for consistent classification # Low temperature for consistent classification
response = ollama.generate(model=model, prompt=prompt, options={"temperature": 0.1}) response = client.generate(model=model, prompt=prompt, options={"temperature": 0.1})
output = response["response"] output = response["response"]
action, tags, summary, reason = _parse_response(output) action, tags, summary, reason = _parse_response(output)
except Exception as e: except Exception as e:

View File

@@ -12,8 +12,10 @@ recipient, subject, summary, action taken, and whether it was a user or
auto decision. auto decision.
""" """
import fcntl
import json import json
import re import re
from contextlib import contextmanager
from datetime import datetime, timedelta from datetime import datetime, timedelta
from pathlib import Path from pathlib import Path
from collections import Counter from collections import Counter
@@ -34,6 +36,19 @@ _STOP_WORDS = {"re", "fwd", "the", "a", "an", "is", "to", "for", "and", "or", "y
# Internal helpers # Internal helpers
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@contextmanager
def file_lock(path):
"""Acquire an exclusive file lock for safe concurrent access."""
DATA_DIR.mkdir(exist_ok=True)
lock_path = str(path) + ".lock"
with open(lock_path, "w") as lock_file:
fcntl.flock(lock_file, fcntl.LOCK_EX)
try:
yield
finally:
fcntl.flock(lock_file, fcntl.LOCK_UN)
def _load_history(): def _load_history():
"""Load the full decision history list from disk.""" """Load the full decision history list from disk."""
if not HISTORY_FILE.exists(): if not HISTORY_FILE.exists():
@@ -69,7 +84,6 @@ def record_decision(email_data, action, source="user", tags=None):
source: "user" (manual review) or "auto" (high-confidence). source: "user" (manual review) or "auto" (high-confidence).
tags: list of category tags from the classifier taxonomy. tags: list of category tags from the classifier taxonomy.
""" """
history = _load_history()
entry = { entry = {
"timestamp": datetime.now().isoformat(timespec="seconds"), "timestamp": datetime.now().isoformat(timespec="seconds"),
"sender": email_data.get("sender", ""), "sender": email_data.get("sender", ""),
@@ -80,6 +94,8 @@ def record_decision(email_data, action, source="user", tags=None):
"source": source, "source": source,
"tags": tags or [], "tags": tags or [],
} }
with file_lock(HISTORY_FILE):
history = _load_history()
history.append(entry) history.append(entry)
_save_history(history) _save_history(history)
return entry return entry
@@ -152,24 +168,29 @@ def compute_confidence(sender_email, action, tags):
Returns an integer 0-100. Returns an integer 0-100.
""" """
history = _load_history() history = _load_history()
if not history or not tags: if not history:
return 50 return 50
# Find past decisions with same sender and sufficient tag overlap # Find past decisions with same sender and sufficient tag overlap.
# If tags are empty (LLM returned no valid taxonomy tags), fall back
# to sender-only matching so history still contributes to confidence.
matches = [] matches = []
for entry in history: for entry in history:
entry_email = extract_email_address(entry.get("sender", "")) entry_email = extract_email_address(entry.get("sender", ""))
if entry_email != sender_email: if entry_email != sender_email:
continue continue
if tags:
entry_tags = entry.get("tags", []) entry_tags = entry.get("tags", [])
if not entry_tags: if not entry_tags:
continue continue
shared = len(set(tags) & set(entry_tags)) shared = len(set(tags) & set(entry_tags))
min_len = min(len(tags), len(entry_tags)) min_len = min(len(tags), len(entry_tags))
if min_len > 0 and shared / min_len >= 0.5: if min_len > 0 and shared / min_len >= 0.5:
matches.append(entry) matches.append(entry)
else:
# No tags to compare — match on sender alone
matches.append(entry)
if not matches: if not matches:
return 50 return 50

View File

@@ -100,7 +100,7 @@ def get_unseen_envelopes():
Returns a list of envelope dicts from himalaya's JSON output. Returns a list of envelope dicts from himalaya's JSON output.
Each has keys like: id, subject, from, to, date, flags. Each has keys like: id, subject, from, to, date, flags.
""" """
return _himalaya_json("envelope", "list", "not", "flag", "seen") return _himalaya_json("envelope", "list", "-s", "500", "not", "flag", "seen")
def get_recent_envelopes(days): def get_recent_envelopes(days):
@@ -110,7 +110,7 @@ def get_recent_envelopes(days):
bulk-classifying historical mail. bulk-classifying historical mail.
""" """
since = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d") since = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
return _himalaya_json("envelope", "list", "after", since) return _himalaya_json("envelope", "list", "-s", "500", "after", since)
def read_message(envelope_id): def read_message(envelope_id):
@@ -134,7 +134,9 @@ def _format_address(addr_field):
name = first.get("name", "") name = first.get("name", "")
addr = first.get("addr", "") addr = first.get("addr", "")
return f"{name} <{addr}>" if name else addr return f"{name} <{addr}>" if name else addr
return str(addr_field) elif isinstance(addr_field, str) and addr_field:
return addr_field
return ""
def build_email_data(envelope, body, config): def build_email_data(envelope, body, config):
@@ -226,13 +228,13 @@ def add_to_pending(email_data, summary, reason, action_suggestion, confidence, t
alongside the email metadata so the user can see what the model thought. alongside the email metadata so the user can see what the model thought.
Uses envelope_id as the primary identifier for review commands. Uses envelope_id as the primary identifier for review commands.
""" """
pending = load_pending()
# Generate a stable ID from envelope ID + subject # Generate a stable ID from envelope ID + subject
eid = str(email_data["id"]) eid = str(email_data["id"])
key = f"{eid}_{email_data['subject']}" key = f"{eid}_{email_data['subject']}"
msg_id = f"msg_{hashlib.md5(key.encode()).hexdigest()[:8]}" msg_id = f"msg_{hashlib.md5(key.encode()).hexdigest()[:8]}"
with decision_store.file_lock(PENDING_FILE):
pending = load_pending()
pending[msg_id] = { pending[msg_id] = {
"envelope_id": eid, "envelope_id": eid,
"subject": email_data["subject"], "subject": email_data["subject"],
@@ -287,6 +289,7 @@ def cmd_scan(config, recent=None, dry_run=False):
print("=" * 50) print("=" * 50)
# Clear done items from previous scan cycles # Clear done items from previous scan cycles
with decision_store.file_lock(PENDING_FILE):
pending = load_pending() pending = load_pending()
cleared = {k: v for k, v in pending.items() if v.get("status") != "done"} cleared = {k: v for k, v in pending.items() if v.get("status") != "done"}
if len(cleared) < len(pending): if len(cleared) < len(pending):
@@ -484,9 +487,6 @@ def cmd_review_act(selector, action):
log_file = LOGS_DIR / f"{datetime.now().strftime('%Y-%m-%d')}.log" log_file = LOGS_DIR / f"{datetime.now().strftime('%Y-%m-%d')}.log"
# Execute action on each target # Execute action on each target
pending = load_pending()
pending_dirty = False
for msg_id, data in targets: for msg_id, data in targets:
eid = data.get("envelope_id") eid = data.get("envelope_id")
if not eid: if not eid:
@@ -498,11 +498,13 @@ def cmd_review_act(selector, action):
# Record decision for future learning # Record decision for future learning
decision_store.record_decision(data, action, source="user", tags=data.get("tags", [])) decision_store.record_decision(data, action, source="user", tags=data.get("tags", []))
# Mark as done in pending queue # Mark as done in pending queue (locked to avoid concurrent corruption)
with decision_store.file_lock(PENDING_FILE):
pending = load_pending()
pending[msg_id]["status"] = "done" pending[msg_id]["status"] = "done"
pending[msg_id]["action"] = action pending[msg_id]["action"] = action
pending[msg_id]["processed_at"] = datetime.now().isoformat() pending[msg_id]["processed_at"] = datetime.now().isoformat()
pending_dirty = True save_pending(pending)
log_result(log_file, data, f"REVIEW:{action}", data.get("reason", "")) log_result(log_file, data, f"REVIEW:{action}", data.get("reason", ""))
print(f" {msg_id}: {action} -> OK ({data['subject'][:40]})") print(f" {msg_id}: {action} -> OK ({data['subject'][:40]})")
@@ -510,9 +512,6 @@ def cmd_review_act(selector, action):
log_result(log_file, data, f"REVIEW_FAILED:{action}", data.get("reason", "")) log_result(log_file, data, f"REVIEW_FAILED:{action}", data.get("reason", ""))
print(f" {msg_id}: {action} -> FAILED") print(f" {msg_id}: {action} -> FAILED")
if pending_dirty:
save_pending(pending)
def cmd_review_accept(): def cmd_review_accept():
"""Accept all classifier suggestions for pending emails. """Accept all classifier suggestions for pending emails.
@@ -529,9 +528,6 @@ def cmd_review_accept():
LOGS_DIR.mkdir(exist_ok=True) LOGS_DIR.mkdir(exist_ok=True)
log_file = LOGS_DIR / f"{datetime.now().strftime('%Y-%m-%d')}.log" log_file = LOGS_DIR / f"{datetime.now().strftime('%Y-%m-%d')}.log"
pending = load_pending()
pending_dirty = False
for msg_id, data in sorted_items: for msg_id, data in sorted_items:
action = data.get("suggested_action") action = data.get("suggested_action")
if not action: if not action:
@@ -547,10 +543,12 @@ def cmd_review_accept():
if success: if success:
decision_store.record_decision(data, action, source="user", tags=data.get("tags", [])) decision_store.record_decision(data, action, source="user", tags=data.get("tags", []))
with decision_store.file_lock(PENDING_FILE):
pending = load_pending()
pending[msg_id]["status"] = "done" pending[msg_id]["status"] = "done"
pending[msg_id]["action"] = action pending[msg_id]["action"] = action
pending[msg_id]["processed_at"] = datetime.now().isoformat() pending[msg_id]["processed_at"] = datetime.now().isoformat()
pending_dirty = True save_pending(pending)
log_result(log_file, data, f"ACCEPT:{action}", data.get("reason", "")) log_result(log_file, data, f"ACCEPT:{action}", data.get("reason", ""))
print(f" {msg_id}: {action} -> OK ({data['subject'][:40]})") print(f" {msg_id}: {action} -> OK ({data['subject'][:40]})")
@@ -558,9 +556,6 @@ def cmd_review_accept():
log_result(log_file, data, f"ACCEPT_FAILED:{action}", data.get("reason", "")) log_result(log_file, data, f"ACCEPT_FAILED:{action}", data.get("reason", ""))
print(f" {msg_id}: {action} -> FAILED") print(f" {msg_id}: {action} -> FAILED")
if pending_dirty:
save_pending(pending)
def _resolve_target(selector, sorted_items): def _resolve_target(selector, sorted_items):
"""Resolve a selector (envelope_id or msg_id) to a (msg_id, data) tuple. """Resolve a selector (envelope_id or msg_id) to a (msg_id, data) tuple.