#!/usr/bin/env python3
"""
Email Processor - Learning-based mailbox cleanup using Himalaya + Ollama.

Uses himalaya CLI for all IMAP operations (no raw imaplib, no stored
credentials). Uses a local Qwen3 model via Ollama for classification,
with few-shot learning from past user decisions.

All commands are non-interactive — they take arguments, mutate files on
disk, and exit. Suitable for cron (OpenClaw) and scripting.

Subcommands:
    python main.py scan                      # classify unseen emails
    python main.py scan --recent 30          # classify last 30 days
    python main.py scan --dry-run            # classify only, no changes
    python main.py scan --recent 7 --dry-run # combine both
    python main.py review list               # print pending queue
    python main.py review <id> <action>      # act on one email
    python main.py review all <action>       # act on all pending
    python main.py review accept             # accept all suggestions
    python main.py stats                     # show decision history
    python main.py digest                    # today's processed emails
    python main.py digest --recent 3         # last 3 days

Action mapping (what each classification does to the email):
    delete    -> himalaya message delete (moves to Trash)
    archive   -> himalaya message move Archive
    keep      -> no-op (leave unread in inbox)
    mark_read -> himalaya flag add seen
    label:X   -> himalaya message move X
"""

import json
import subprocess
import hashlib
import sys
from datetime import datetime, timedelta
from pathlib import Path

import classifier
import decision_store

# ---------------------------------------------------------------------------
# Paths — all relative to the script's own directory
# ---------------------------------------------------------------------------
SCRIPT_DIR = Path(__file__).parent
CONFIG_FILE = SCRIPT_DIR / "config.json"
LOGS_DIR = SCRIPT_DIR / "logs"
DATA_DIR = SCRIPT_DIR / "data"
PENDING_FILE = DATA_DIR / "pending_emails.json"


# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
def load_config():
    """Load config.json from the script directory.

    Only ollama, rules, and automation settings are needed — himalaya
    manages its own IMAP config separately.
    """
    with open(CONFIG_FILE, encoding="utf-8") as f:
        return json.load(f)


# ---------------------------------------------------------------------------
# Himalaya CLI wrappers
#
# All IMAP operations go through himalaya, which handles connection,
# auth, and protocol details. We call it as a subprocess and parse
# its JSON output.
# ---------------------------------------------------------------------------
def _himalaya(*args):
    """Run a himalaya command and return its stdout.

    Raises subprocess.CalledProcessError on failure (check=True); the
    captured stderr is available on the exception for error reporting.
    """
    result = subprocess.run(
        ["himalaya", *args],
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout


def _himalaya_json(*args):
    """Run a himalaya command with JSON output and return the parsed result."""
    return json.loads(_himalaya("-o", "json", *args))


# ---------------------------------------------------------------------------
# Email fetching via himalaya
# ---------------------------------------------------------------------------
def get_unseen_envelopes():
    """Fetch envelope metadata for all unseen emails in INBOX.

    Returns a list of envelope dicts from himalaya's JSON output.
    Each has keys like: id, subject, from, to, date, flags.
    """
    return _himalaya_json("envelope", "list", "-s", "500", "not", "flag", "seen")


def get_recent_envelopes(days):
    """Fetch envelope metadata for all emails from the last N days.

    Includes both read and unread emails — useful for testing and
    bulk-classifying historical mail.
    """
    since = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
    return _himalaya_json("envelope", "list", "-s", "500", "after", since)


def read_message(envelope_id):
    """Read the full message body without marking it as seen.

    The --preview flag prevents himalaya from adding the \\Seen flag,
    so the email stays unread for the actual action to handle.
    """
    # Read plain text, no headers, without marking as seen
    return _himalaya("message", "read", "--preview", "--no-headers", str(envelope_id))


def _format_address(addr_field):
    """Format a himalaya address field (dict, list, or string) into a display string.

    Returns "Name <addr>" when a display name is present, the bare
    address otherwise, or "" when the field is empty/unrecognized.
    """
    if isinstance(addr_field, dict):
        # `or ""` also normalizes explicit null values in the JSON.
        name = addr_field.get("name") or ""
        addr = addr_field.get("addr") or ""
        return f"{name} <{addr}>" if name else addr
    if isinstance(addr_field, list) and addr_field:
        # Recurse on the first entry so both dict and plain-string
        # elements are handled (the old code assumed dicts only).
        return _format_address(addr_field[0])
    if isinstance(addr_field, str):
        return addr_field
    return ""


def build_email_data(envelope, body, config):
    """Build the email_data dict expected by classifier and decision_store.

    Combines envelope metadata (from himalaya envelope list) with the
    message body (from himalaya message read), truncated to the
    configured rules.max_body_length (default 1000).
    """
    max_body = config.get("rules", {}).get("max_body_length", 1000)
    return {
        "id": str(envelope.get("id", "")),
        "subject": envelope.get("subject", "(No Subject)"),
        "sender": _format_address(envelope.get("from", {})),
        "recipient": _format_address(envelope.get("to", {})),
        "date": envelope.get("date", ""),
        "body": body[:max_body],
    }


# ---------------------------------------------------------------------------
# IMAP actions via himalaya
#
# Each function executes one himalaya command. Returns True on success.
# On failure, prints the error and returns False.
# ---------------------------------------------------------------------------
def execute_action(envelope_id, action):
    """Dispatch an action string to the appropriate himalaya command.

    Action mapping:
        "delete"    -> himalaya message delete
        "archive"   -> himalaya message move Archive
        "keep"      -> no-op (leave unread in inbox)
        "mark_read" -> himalaya flag add seen
        "label:X"   -> himalaya message move X

    Returns True on success, False on failure.
    """
    eid = str(envelope_id)
    try:
        if action == "delete":
            _himalaya("message", "delete", eid)
        elif action == "archive":
            _himalaya("message", "move", "Archive", eid)
        elif action == "keep":
            pass  # leave unread in inbox — no IMAP changes
        elif action == "mark_read":
            _himalaya("flag", "add", eid, "seen")
        elif action.startswith("label:"):
            folder = action[len("label:"):]
            _himalaya("message", "move", folder, eid)
        else:
            print(f"  Unknown action: {action}")
            return False
        return True
    except subprocess.CalledProcessError as e:
        # stderr is a str because _himalaya runs with capture_output + text.
        print(f"  Himalaya error: {e.stderr.strip()}")
        return False


# ---------------------------------------------------------------------------
# Pending queue — emails awaiting manual review
#
# Stored as a JSON dict in data/pending_emails.json, keyed by msg_id.
# Each entry tracks the envelope ID (for himalaya), classifier suggestion,
# and status (pending/done).
# ---------------------------------------------------------------------------
def load_pending():
    """Load the pending queue from disk; empty dict if it doesn't exist yet."""
    if not PENDING_FILE.exists():
        return {}
    with open(PENDING_FILE, "r", encoding="utf-8") as f:
        return json.load(f)


def save_pending(pending):
    """Write the pending queue to disk, creating the data dir if needed."""
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    with open(PENDING_FILE, "w", encoding="utf-8") as f:
        json.dump(pending, f, indent=2, ensure_ascii=False)


def add_to_pending(email_data, summary, reason, action_suggestion, confidence, tags=None):
    """Add an email to the pending queue for manual review.

    Stores the classifier's suggestion, computed confidence, and tags
    alongside the email metadata so the user can see what the model
    thought. Uses envelope_id as the primary identifier for review
    commands. Returns the generated msg_id.
    """
    # Generate a stable ID from envelope ID + subject
    eid = str(email_data["id"])
    key = f"{eid}_{email_data['subject']}"
    msg_id = f"msg_{hashlib.md5(key.encode()).hexdigest()[:8]}"

    # Lock around the read-modify-write so concurrent runs can't clobber
    # each other's queue entries.
    with decision_store.file_lock(PENDING_FILE):
        pending = load_pending()
        pending[msg_id] = {
            "envelope_id": eid,
            "subject": email_data["subject"],
            "sender": email_data["sender"],
            "recipient": email_data.get("recipient", ""),
            "summary": summary,
            "reason": reason,
            "suggested_action": action_suggestion,
            "confidence": confidence,
            "tags": tags or [],
            "email_date": email_data.get("date", ""),
            "status": "pending",
            "found_at": datetime.now().isoformat(),
        }
        save_pending(pending)
    return msg_id


# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
def log_result(log_file, email_data, action, detail, duration=None):
    """Append a one-line log entry for a processed email."""
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # `is not None` so a legitimate 0.0s duration is still logged
    # (plain truthiness silently dropped it).
    dur = f" ({duration:.1f}s)" if duration is not None else ""
    # Explicit UTF-8: subjects/senders may contain non-ASCII text and the
    # platform default encoding is not guaranteed to handle it.
    with open(log_file, "a", encoding="utf-8") as f:
        f.write(f"[{timestamp}] {action}{dur}: {email_data['subject'][:60]}\n")
        f.write(f"  From: {email_data['sender']}\n")
        f.write(f"  Detail: {detail}\n\n")


# ---------------------------------------------------------------------------
# Subcommand: scan
# ---------------------------------------------------------------------------
def cmd_scan(config, recent=None, dry_run=False):
    """Fetch emails, classify each one, then auto-act or queue.

    Confidence is computed from decision history by matching the email's
    signature (sender_email, tags) against past decisions. New/unknown
    senders start at 50% (queued). Confidence grows as consistent
    history accumulates.

    Args:
        config: full config dict.
        recent: if set, fetch emails from last N days (not just unseen).
        dry_run: if True, classify and print but skip all actions.
    """
    mode = "DRY RUN" if dry_run else "Scan"
    print(f"Email Processor - {mode}")
    print("=" * 50)

    # Clear done items from previous scan cycles
    with decision_store.file_lock(PENDING_FILE):
        pending = load_pending()
        cleared = {k: v for k, v in pending.items() if v.get("status") != "done"}
        if len(cleared) < len(pending):
            save_pending(cleared)

    LOGS_DIR.mkdir(exist_ok=True)
    log_file = LOGS_DIR / f"{datetime.now().strftime('%Y-%m-%d')}.log"

    # Load automation threshold
    automation = config.get("automation", {})
    confidence_threshold = automation.get("confidence_threshold", 85)

    # Fetch envelopes via himalaya
    if recent:
        envelopes = get_recent_envelopes(recent)
        print(f"Found {len(envelopes)} emails from last {recent} days\n")
    else:
        envelopes = get_unseen_envelopes()
        print(f"Found {len(envelopes)} unread emails\n")

    if not envelopes:
        print("No new emails to process.")
        return

    auto_acted = 0
    queued = 0
    skipped = 0

    # Reuse the cleared pending dict from above to skip already-queued emails
    pending_eids = {v.get("envelope_id") for v in cleared.values()
                    if v.get("status") == "pending"}

    for envelope in envelopes:
        eid = envelope.get("id", "?")

        # Skip emails already in the pending queue
        if str(eid) in pending_eids:
            print(f"[{eid}] (already pending, skipped)")
            skipped += 1
            continue
        # Track this eid so duplicates within the same envelope list are caught
        pending_eids.add(str(eid))

        print(f"[{eid}] ", end="", flush=True)

        # Read message body without marking as seen; an unreadable body is
        # not fatal — classify on envelope metadata alone.
        try:
            body = read_message(eid)
        except subprocess.CalledProcessError:
            body = ""

        email_data = build_email_data(envelope, body, config)
        print(f"{email_data['subject'][:55]}")
        print(f"  From: {email_data['sender'][:60]}")

        # Run the LLM classifier (returns tags instead of confidence)
        action, tags, summary, reason, duration = classifier.classify_email(
            email_data, config
        )

        # Compute confidence from decision history
        sender_email = decision_store.extract_email_address(email_data.get("sender", ""))
        confidence = decision_store.compute_confidence(sender_email, action, tags)

        tags_str = ", ".join(tags) if tags else "(none)"
        print(f"  -> {action} (confidence: {confidence}%, {duration:.1f}s)")
        print(f"  tags: [{tags_str}]")
        print(f"  {reason[:80]}")

        # Auto-act if confidence meets threshold
        can_auto = confidence >= confidence_threshold

        if dry_run:
            # Dry run: log what would happen, touch nothing
            log_result(log_file, email_data, f"DRYRUN:{action}@{confidence}%", reason, duration)
            if can_auto:
                print(f"  -> Would AUTO-execute: {action}")
                auto_acted += 1
            else:
                print(f"  -> Would queue for review")
                queued += 1
        elif can_auto:
            # Auto-execute the action via himalaya
            success = execute_action(eid, action)
            if success:
                decision_store.record_decision(
                    {**email_data, "summary": summary}, action, source="auto", tags=tags
                )
                log_result(log_file, email_data, f"AUTO:{action}", reason, duration)
                print(f"  ** AUTO-executed: {action}")
                auto_acted += 1
            else:
                # Himalaya action failed — fall back to queuing
                log_result(log_file, email_data, "AUTO_FAILED", reason, duration)
                print(f"  !! Auto-action failed, queuing instead")
                add_to_pending(email_data, summary, reason, action, confidence, tags)
                queued += 1
        else:
            # Not enough confidence or history — queue for manual review
            add_to_pending(email_data, summary, reason, action, confidence, tags)
            log_result(log_file, email_data, f"QUEUED:{action}@{confidence}%", reason, duration)
            print(f"  -> Queued (confidence {confidence}% < {confidence_threshold}%)")
            queued += 1

    # Print run summary
    print(f"\n{'=' * 50}")
    print(f"Processed: {len(envelopes)} emails")
    print(f"  Auto-acted: {auto_acted}")
    print(f"  Queued for review: {queued}")
    if skipped:
        print(f"  Skipped (already pending): {skipped}")
    print(f"\nRun 'python main.py review list' to see pending emails")


# ---------------------------------------------------------------------------
# Subcommand: review
#
# Non-interactive: each invocation takes arguments, acts, and exits.
# No input() calls. Compatible with cron and scripting.
# ---------------------------------------------------------------------------
def _pending_sort_key(item):
    """Sort key for pending items: numeric envelope_id, 0 if non-numeric.

    Himalaya IDs are normally numeric strings; the guard keeps a stray
    non-numeric ID from crashing the whole listing.
    """
    try:
        return int(item[1].get("envelope_id", 0))
    except (TypeError, ValueError):
        return 0


def _get_pending_items():
    """Return only pending (not done) items, sorted by envelope_id."""
    pending = load_pending()
    items = {k: v for k, v in pending.items() if v.get("status") == "pending"}
    return sorted(items.items(), key=_pending_sort_key)


def cmd_review_list():
    """Print the pending queue and exit.

    Shows each email with its envelope ID, subject, sender, summary, and
    the classifier's suggested action with confidence.
    """
    sorted_items = _get_pending_items()
    if not sorted_items:
        print("No pending emails to review.")
        return

    print(f"Pending emails: {len(sorted_items)}")
    print("=" * 60)
    for msg_id, data in sorted_items:
        eid = data.get("envelope_id", "?")
        suggested = data.get("suggested_action", "?")
        conf = data.get("confidence", "?")
        tags = data.get("tags", [])
        tags_str = ", ".join(tags) if tags else "(none)"
        print(f"\n[{eid}] {msg_id}")
        print(f"  Subject: {data.get('subject', 'N/A')[:55]}")
        print(f"  From: {data.get('sender', 'N/A')[:55]}")
        print(f"  To: {data.get('recipient', 'N/A')[:40]}")
        print(f"  Summary: {data.get('summary', 'N/A')[:70]}")
        print(f"  Tags: [{tags_str}]")
        print(f"  Suggested: {suggested} ({conf}% confidence)")

    print(f"\n{'=' * 60}")
    print("Usage:")
    print("  python main.py review <envelope_id> <action>")
    print("  python main.py review all <action>")
    print("  python main.py review accept")
    print("Actions: delete / archive / keep / mark_read / label:<folder>")


def _apply_pending_action(msg_id, data, action, log_file, log_prefix):
    """Execute one action on a pending email, record it, and mark it done.

    Shared by cmd_review_act and cmd_review_accept (previously duplicated).

    Args:
        msg_id: key of the entry in the pending queue.
        data: the pending-queue entry dict.
        action: action string to execute via himalaya.
        log_file: path of today's log file.
        log_prefix: "REVIEW" or "ACCEPT" — tags the log entry's origin.

    Returns True on success, False on skip/failure.
    """
    eid = data.get("envelope_id")
    if not eid:
        print(f"  {msg_id}: No envelope ID, skipping")
        return False

    success = execute_action(eid, action)
    if not success:
        log_result(log_file, data, f"{log_prefix}_FAILED:{action}", data.get("reason", ""))
        print(f"  {msg_id}: {action} -> FAILED")
        return False

    # Record decision for future learning
    decision_store.record_decision(data, action, source="user", tags=data.get("tags", []))

    # Mark as done in pending queue (locked to avoid concurrent corruption)
    with decision_store.file_lock(PENDING_FILE):
        pending = load_pending()
        pending[msg_id]["status"] = "done"
        pending[msg_id]["action"] = action
        pending[msg_id]["processed_at"] = datetime.now().isoformat()
        save_pending(pending)

    log_result(log_file, data, f"{log_prefix}:{action}", data.get("reason", ""))
    print(f"  {msg_id}: {action} -> OK ({data['subject'][:40]})")
    return True


def cmd_review_act(selector, action):
    """Execute an action on one or more pending emails.

    Args:
        selector: an envelope_id, a msg_id string, or "all".
        action: one of delete/archive/keep/mark_read/label:<folder>.
    """
    # Validate action
    valid_actions = {"delete", "archive", "keep", "mark_read"}
    if action not in valid_actions and not action.startswith("label:"):
        print(f"Invalid action: {action}")
        print(f"Valid: {', '.join(sorted(valid_actions))}, label:<folder>")
        sys.exit(1)

    sorted_items = _get_pending_items()
    if not sorted_items:
        print("No pending emails to review.")
        return

    # Resolve targets
    if selector == "all":
        targets = sorted_items
    else:
        target = _resolve_target(selector, sorted_items)
        if target is None:
            sys.exit(1)
        targets = [target]

    LOGS_DIR.mkdir(exist_ok=True)
    log_file = LOGS_DIR / f"{datetime.now().strftime('%Y-%m-%d')}.log"

    for msg_id, data in targets:
        _apply_pending_action(msg_id, data, action, log_file, "REVIEW")


def cmd_review_accept():
    """Accept all classifier suggestions for pending emails.

    For each pending email, executes the suggested_action that the
    classifier assigned during scan. Records each as a "user" decision
    since the user explicitly chose to accept.
    """
    sorted_items = _get_pending_items()
    if not sorted_items:
        print("No pending emails to review.")
        return

    LOGS_DIR.mkdir(exist_ok=True)
    log_file = LOGS_DIR / f"{datetime.now().strftime('%Y-%m-%d')}.log"

    for msg_id, data in sorted_items:
        action = data.get("suggested_action")
        if not action:
            print(f"  {msg_id}: No suggestion, skipping")
            continue
        _apply_pending_action(msg_id, data, action, log_file, "ACCEPT")


def _resolve_target(selector, sorted_items):
    """Resolve a selector (envelope_id or msg_id) to a (msg_id, data) tuple.

    Looks up by envelope_id first, then by msg_id string. Returns None
    and prints an error if the selector is invalid.
    """
    # Try as envelope_id
    for msg_id, data in sorted_items:
        if data.get("envelope_id") == selector:
            return (msg_id, data)
    # Try as msg_id
    for msg_id, data in sorted_items:
        if msg_id == selector:
            return (msg_id, data)

    print(f"Not found: {selector}")
    # str() guards against a missing/None envelope_id crashing the join.
    valid = [str(d.get("envelope_id")) for _, d in sorted_items]
    print(f"Valid envelope IDs: {', '.join(valid)}")
    return None


# ---------------------------------------------------------------------------
# Subcommand: stats
# ---------------------------------------------------------------------------
def cmd_stats():
    """Print a summary of the decision history.

    Shows total decisions, user vs. auto breakdown, action distribution,
    top senders, and custom labels.
    """
    stats = decision_store.get_all_stats()
    if not stats:
        print("No decision history yet.")
        print("Run 'python main.py scan' and 'python main.py review' to build history.")
        return

    print("Decision History Stats")
    print("=" * 50)
    print(f"Total decisions: {stats['total']}")

    # User vs. auto breakdown
    print(f"\nBy source:")
    for source, count in sorted(stats["by_source"].items()):
        pct = count / stats["total"] * 100
        print(f"  {source}: {count} ({pct:.0f}%)")
    auto = stats["by_source"].get("auto", 0)
    if stats["total"] > 0:
        print(f"  Automation rate: {auto / stats['total'] * 100:.0f}%")

    # Action distribution
    print(f"\nBy action:")
    for action, count in sorted(stats["by_action"].items(), key=lambda x: -x[1]):
        print(f"  {action}: {count}")

    # Top sender addresses with per-sender action counts
    print(f"\nTop senders:")
    for sender, count in stats["top_senders"]:
        sender_stats = decision_store.get_sender_stats(sender)
        detail = ", ".join(
            f"{a}:{c}" for a, c in sorted(sender_stats.items(), key=lambda x: -x[1])
        )
        print(f"  {sender}: {count} ({detail})")

    # Custom labels
    labels = decision_store.get_known_labels()
    if labels:
        print(f"\nKnown labels: {', '.join(sorted(labels))}")


# ---------------------------------------------------------------------------
# Subcommand: digest
# ---------------------------------------------------------------------------
def cmd_digest(days=1):
    """Print a compact summary of recently processed emails, grouped by action.

    Shows both auto and user decisions with a marker to distinguish them.

    Args:
        days: look-back window in days (1 = today).
    """
    grouped = decision_store.get_recent_decisions(days)
    if not grouped:
        period = "today" if days == 1 else f"last {days} days"
        print(f"No processed emails in this period ({period}).")
        return

    period_label = "today" if days == 1 else f"last {days} days"
    print(f"Email digest ({period_label})")
    print("=" * 40)

    # Map action names to display labels
    action_labels = {
        "delete": "Deleted",
        "archive": "Archived",
        "keep": "Kept",
        "mark_read": "Marked read",
    }

    total = 0
    auto_count = 0
    user_count = 0

    for action, entries in sorted(grouped.items()):
        # Custom labels fall back to e.g. "label:receipts" -> "Labeled Receipts"
        label = action_labels.get(action, action.replace("label:", "Labeled ").title())
        print(f"\n{label} ({len(entries)}):")
        for entry in entries:
            source = entry.get("source", "?")
            sender = entry.get("sender", "unknown")
            subject = entry.get("subject", "(no subject)")
            print(f"  [{source}] {sender}")
            print(f"    {subject}")
            total += 1
            if source == "auto":
                auto_count += 1
            else:
                user_count += 1

    print(f"\nTotal: {total} emails processed ({auto_count} auto, {user_count} user)")


# ---------------------------------------------------------------------------
# Entry point & argument parsing
#
# Simple hand-rolled parser — no external dependencies. Supports:
#   main.py [subcommand] [--recent N] [--dry-run] [review-args...]
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    args = sys.argv[1:]

    subcommand = "scan"
    recent = None
    dry_run = False
    extra_args = []  # positional arguments for the review subcommand

    # Parse args: flags may appear anywhere; the first bare word is the
    # subcommand, later bare words are passed through to the subcommand.
    i = 0
    while i < len(args):
        if args[i] == "--recent":
            if i + 1 >= len(args):
                # A trailing --recent used to fall through to the misleading
                # "Unknown flag" message; report the missing value instead.
                print("--recent requires a number")
                sys.exit(1)
            try:
                recent = int(args[i + 1])
            except ValueError:
                print(f"--recent requires a number, got: {args[i + 1]}")
                sys.exit(1)
            i += 2
        elif args[i] == "--dry-run":
            dry_run = True
            i += 1
        elif not args[i].startswith("--") and subcommand == "scan" and not extra_args:
            # First positional arg is the subcommand
            subcommand = args[i]
            i += 1
        elif not args[i].startswith("--"):
            # Remaining positional args go to the subcommand
            extra_args.append(args[i])
            i += 1
        else:
            print(f"Unknown flag: {args[i]}")
            sys.exit(1)

    config = load_config()

    if subcommand == "scan":
        cmd_scan(config, recent=recent, dry_run=dry_run)
    elif subcommand == "review":
        if not extra_args or extra_args[0] == "list":
            cmd_review_list()
        elif extra_args[0] == "accept":
            cmd_review_accept()
        elif len(extra_args) == 2:
            cmd_review_act(extra_args[0], extra_args[1])
        else:
            print("Usage:")
            print("  python main.py review list")
            print("  python main.py review <id> <action>")
            print("  python main.py review all <action>")
            print("  python main.py review accept")
            sys.exit(1)
    elif subcommand == "stats":
        cmd_stats()
    elif subcommand == "digest":
        cmd_digest(days=recent if recent else 1)
    else:
        print(f"Unknown subcommand: {subcommand}")
        print("Usage: python main.py [scan|review|stats|digest] [--recent N] [--dry-run]")
        sys.exit(1)