696 lines
24 KiB
Python
696 lines
24 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Email Processor - Learning-based mailbox cleanup using Himalaya + Ollama.
|
|
|
|
Uses himalaya CLI for all IMAP operations (no raw imaplib, no stored
|
|
credentials). Uses a local Qwen3 model via Ollama for classification,
|
|
with few-shot learning from past user decisions.
|
|
|
|
All commands are non-interactive — they take arguments, mutate files on
|
|
disk, and exit. Suitable for cron (OpenClaw) and scripting.
|
|
|
|
Subcommands:
|
|
python main.py scan # classify unseen emails
|
|
python main.py scan --recent 30 # classify last 30 days
|
|
python main.py scan --dry-run # classify only, no changes
|
|
python main.py scan --recent 7 --dry-run # combine both
|
|
python main.py review list # print pending queue
|
|
python main.py review <num-or-id> <action> # act on one email
|
|
python main.py review all <action> # act on all pending
|
|
python main.py review accept # accept all suggestions
|
|
python main.py stats # show decision history
|
|
|
|
Action mapping (what each classification does to the email):
|
|
delete -> himalaya message delete <id> (moves to Trash)
|
|
archive -> himalaya message move Archive <id>
|
|
keep -> no-op (leave unread in inbox)
|
|
mark_read -> himalaya flag add <id> seen
|
|
label:X -> himalaya message move <X> <id>
|
|
"""
|
|
|
|
import json
|
|
import subprocess
|
|
import hashlib
|
|
import sys
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
|
|
import classifier
|
|
import decision_store
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Paths — all relative to the script's own directory
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Directory containing this script; all config/data/log paths are relative to it.
SCRIPT_DIR = Path(__file__).parent
# Ollama / rules / automation settings (himalaya keeps its own IMAP config).
CONFIG_FILE = SCRIPT_DIR / "config.json"
# One log file per calendar day is appended here by scan/review commands.
LOGS_DIR = SCRIPT_DIR / "logs"
# Persistent state directory (holds the pending-review queue).
DATA_DIR = SCRIPT_DIR / "data"
# JSON dict of emails awaiting manual review, keyed by msg_id.
PENDING_FILE = DATA_DIR / "pending_emails.json"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Config
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def load_config():
    """Load config.json from the script directory.

    Only ollama, rules, and automation settings are needed — himalaya
    manages its own IMAP config separately.

    Returns:
        dict: the parsed configuration.

    Raises:
        FileNotFoundError: if config.json does not exist.
        json.JSONDecodeError: if the file is not valid JSON.
    """
    # Explicit encoding for consistency with the other file I/O in this
    # module — the platform default is not guaranteed to be UTF-8.
    with open(CONFIG_FILE, encoding="utf-8") as f:
        return json.load(f)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Himalaya CLI wrappers
|
|
#
|
|
# All IMAP operations go through himalaya, which handles connection,
|
|
# auth, and protocol details. We call it as a subprocess and parse
|
|
# its JSON output.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _himalaya(*args):
    """Invoke the himalaya CLI and return its captured stdout.

    Raises subprocess.CalledProcessError on a non-zero exit status.
    """
    command = ["himalaya"]
    command.extend(args)
    completed = subprocess.run(
        command,
        capture_output=True,
        text=True,
        check=True,
    )
    return completed.stdout
|
|
|
|
|
|
def _himalaya_json(*args):
    """Run a himalaya command with JSON output and return parsed result."""
    raw = _himalaya("-o", "json", *args)
    return json.loads(raw)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Email fetching via himalaya
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def get_unseen_envelopes():
    """Fetch envelope metadata for all unseen emails in INBOX.

    Returns a list of envelope dicts from himalaya's JSON output.
    Each has keys like: id, subject, from, to, date, flags.
    """
    # "not flag seen" is himalaya's envelope query for unread messages.
    query = ("envelope", "list", "not", "flag", "seen")
    return _himalaya_json(*query)
|
|
|
|
|
|
def get_recent_envelopes(days):
    """Fetch envelope metadata for all emails from the last N days.

    Includes both read and unread emails — useful for testing and
    bulk-classifying historical mail.
    """
    cutoff = datetime.now() - timedelta(days=days)
    return _himalaya_json("envelope", "list", "after", cutoff.strftime("%Y-%m-%d"))
|
|
|
|
|
|
def read_message(envelope_id):
    """Read the full message body without marking it as seen.

    The --preview flag prevents himalaya from adding the \\Seen flag,
    so the email stays unread for the actual action to handle.
    """
    # Plain-text body only, headers stripped, read flag left untouched.
    return _himalaya(
        "message", "read", "--preview", "--no-headers", str(envelope_id)
    )
|
|
|
|
|
|
def _format_address(value):
    """Render a himalaya address field as a display string.

    himalaya's envelope JSON represents an address either as a single
    object {"name": ..., "addr": ...}, a list of such objects, or a
    plain string. Dict (or first-of-list) entries become "Name <addr>",
    or just the address when no display name is present; anything else
    is stringified as-is.
    """
    if isinstance(value, dict):
        name = value.get("name", "")
        addr = value.get("addr", "")
        return f"{name} <{addr}>" if name else addr
    if isinstance(value, list) and value:
        first = value[0]
        name = first.get("name", "")
        addr = first.get("addr", "")
        return f"{name} <{addr}>" if name else addr
    return str(value)


def build_email_data(envelope, body, config):
    """Build the email_data dict expected by classifier and decision_store.

    Combines envelope metadata (from himalaya envelope list) with the
    message body (from himalaya message read).

    Args:
        envelope: one envelope dict from himalaya's JSON output.
        body: plain-text message body.
        config: full config dict; rules.max_body_length caps the body
            (default 1000 characters).

    Returns:
        dict with keys: id, subject, sender, recipient, date, body.
    """
    max_body = config.get("rules", {}).get("max_body_length", 1000)

    # The previously duplicated sender/recipient formatting now goes
    # through a single helper so both fields are handled identically.
    return {
        "id": str(envelope.get("id", "")),
        "subject": envelope.get("subject", "(No Subject)"),
        "sender": _format_address(envelope.get("from", {})),
        "recipient": _format_address(envelope.get("to", {})),
        "date": envelope.get("date", ""),
        "body": body[:max_body],
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# IMAP actions via himalaya
|
|
#
|
|
# Each function executes one himalaya command. Returns True on success.
|
|
# On failure, prints the error and returns False.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def execute_action(envelope_id, action):
    """Dispatch an action string to the appropriate himalaya command.

    Action mapping:
      "delete"    -> himalaya message delete <id>
      "archive"   -> himalaya message move Archive <id>
      "keep"      -> no-op (leave unread in inbox)
      "mark_read" -> himalaya flag add <id> seen
      "label:X"   -> himalaya message move <X> <id>

    Returns True on success, False on failure.
    """
    eid = str(envelope_id)
    # Fixed actions keyed by name; "keep" maps to None (deliberate no-op).
    dispatch = {
        "delete": ("message", "delete", eid),
        "archive": ("message", "move", "Archive", eid),
        "keep": None,
        "mark_read": ("flag", "add", eid, "seen"),
    }
    try:
        if action in dispatch:
            command = dispatch[action]
            if command is not None:
                _himalaya(*command)
        elif action.startswith("label:"):
            # Everything after the "label:" prefix is the target folder.
            _himalaya("message", "move", action[6:], eid)
        else:
            print(f" Unknown action: {action}")
            return False
        return True
    except subprocess.CalledProcessError as e:
        print(f" Himalaya error: {e.stderr.strip()}")
        return False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Pending queue — emails awaiting manual review
|
|
#
|
|
# Stored as a JSON dict in data/pending_emails.json, keyed by msg_id.
|
|
# Each entry tracks the envelope ID (for himalaya), classifier suggestion,
|
|
# and status (pending/done).
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def load_pending():
    """Load the pending queue from disk; empty dict when no file exists."""
    if PENDING_FILE.exists():
        with open(PENDING_FILE, "r", encoding="utf-8") as f:
            return json.load(f)
    return {}
|
|
|
|
|
|
def save_pending(pending):
    """Persist the pending queue to disk, creating data/ if needed."""
    DATA_DIR.mkdir(exist_ok=True)
    serialized = json.dumps(pending, indent=2, ensure_ascii=False)
    PENDING_FILE.write_text(serialized, encoding="utf-8")
|
|
|
|
|
|
def add_to_pending(email_data, summary, reason, action_suggestion, confidence):
    """Add an email to the pending queue for manual review.

    Stores the classifier's suggestion and confidence alongside the
    email metadata so the user can see what the model thought.
    """
    queue = load_pending()

    # Stable ID derived from envelope ID + subject.
    eid = str(email_data["id"])
    digest = hashlib.md5(f"{eid}_{email_data['subject']}".encode()).hexdigest()
    msg_id = f"msg_{digest[:8]}"

    queue[msg_id] = {
        "envelope_id": eid,
        "subject": email_data["subject"],
        "sender": email_data["sender"],
        "recipient": email_data.get("recipient", ""),
        "summary": summary,
        "reason": reason,
        "suggested_action": action_suggestion,
        "confidence": confidence,
        "email_date": email_data.get("date", ""),
        "status": "pending",
        "found_at": datetime.now().isoformat(),
    }
    save_pending(queue)
    return msg_id
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Logging
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def log_result(log_file, email_data, action, detail, duration=None):
    """Append a one-line log entry for a processed email.

    Args:
        log_file: path of the log file to append to.
        email_data: dict with at least "subject" and "sender" keys.
        action: short action tag (e.g. "AUTO:delete").
        detail: free-form detail line (usually the classifier's reason).
        duration: optional elapsed seconds to include in the entry.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # Check "is not None" so a legitimate 0.0s duration is still logged;
    # the previous truthiness test silently dropped zero durations.
    dur = f" ({duration:.1f}s)" if duration is not None else ""
    with open(log_file, "a", encoding="utf-8") as f:
        f.write(f"[{timestamp}] {action}{dur}: {email_data['subject'][:60]}\n")
        f.write(f" From: {email_data['sender']}\n")
        f.write(f" Detail: {detail}\n\n")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Subcommand: scan
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def cmd_scan(config, recent=None, dry_run=False):
    """Fetch emails, classify each one, then auto-act or queue.

    Auto-action is based on a single confidence threshold. When the
    decision history has fewer than 20 entries, a higher threshold (95%)
    is used to be conservative during the learning phase. Once enough
    history accumulates, the configured threshold takes over.

    Args:
        config: full config dict.
        recent: if set, fetch emails from last N days (not just unseen).
        dry_run: if True, classify and print but skip all actions.
    """
    mode = "DRY RUN" if dry_run else "Scan"
    print(f"Email Processor - {mode}")
    print("=" * 50)

    # One log file per calendar day.
    LOGS_DIR.mkdir(exist_ok=True)
    log_file = LOGS_DIR / f"{datetime.now().strftime('%Y-%m-%d')}.log"

    # Load automation threshold (default 75% when unconfigured)
    automation = config.get("automation", {})
    configured_threshold = automation.get("confidence_threshold", 75)

    # Adaptive threshold: be conservative when history is thin
    stats = decision_store.get_all_stats()
    total_decisions = stats["total"] if stats else 0
    bootstrap_min = automation.get("bootstrap_min_decisions", 20)
    if total_decisions < bootstrap_min:
        confidence_threshold = 95
        print(f"Learning phase ({total_decisions}/{bootstrap_min} decisions) — threshold: 95%\n")
    else:
        confidence_threshold = configured_threshold

    # Fetch envelopes via himalaya
    if recent:
        envelopes = get_recent_envelopes(recent)
        print(f"Found {len(envelopes)} emails from last {recent} days\n")
    else:
        envelopes = get_unseen_envelopes()
        print(f"Found {len(envelopes)} unread emails\n")

    if not envelopes:
        print("No new emails to process.")
        return

    # Per-run counters for the summary footer.
    auto_acted = 0
    queued = 0
    skipped = 0

    # Load pending queue once to skip already-queued emails
    pending = load_pending()
    pending_eids = {v.get("envelope_id") for v in pending.values() if v.get("status") == "pending"}

    for envelope in envelopes:
        eid = envelope.get("id", "?")

        # Skip emails already in the pending queue
        if str(eid) in pending_eids:
            print(f"[{eid}] (already pending, skipped)")
            skipped += 1
            continue

        print(f"[{eid}] ", end="", flush=True)

        # Read message body without marking as seen
        try:
            body = read_message(eid)
        except subprocess.CalledProcessError:
            # Body fetch failed — classify on envelope metadata alone.
            body = ""

        email_data = build_email_data(envelope, body, config)
        print(f"{email_data['subject'][:55]}")

        # Run the LLM classifier (includes few-shot examples from history)
        action, confidence, summary, reason, duration = classifier.classify_email(
            email_data, config
        )

        print(f" -> {action} (confidence: {confidence}%, {duration:.1f}s)")
        print(f" {reason[:80]}")

        # Auto-act if confidence meets threshold
        can_auto = confidence >= confidence_threshold

        if dry_run:
            # Dry run: log what would happen, touch nothing
            log_result(log_file, email_data, f"DRYRUN:{action}@{confidence}%", reason, duration)
            if can_auto:
                print(f" -> Would AUTO-execute: {action}")
                auto_acted += 1
            else:
                print(f" -> Would queue for review")
                queued += 1
        elif can_auto:
            # Auto-execute the action via himalaya
            success = execute_action(eid, action)
            if success:
                # Record as an "auto" decision so future few-shot prompts
                # can distinguish model choices from user choices.
                decision_store.record_decision(
                    {**email_data, "summary": summary}, action, source="auto"
                )
                log_result(log_file, email_data, f"AUTO:{action}", reason, duration)
                print(f" ** AUTO-executed: {action}")
                auto_acted += 1
            else:
                # Himalaya action failed — fall back to queuing
                log_result(log_file, email_data, "AUTO_FAILED", reason, duration)
                print(f" !! Auto-action failed, queuing instead")
                add_to_pending(email_data, summary, reason, action, confidence)
                queued += 1
        else:
            # Not enough confidence or history — queue for manual review
            add_to_pending(email_data, summary, reason, action, confidence)
            log_result(log_file, email_data, f"QUEUED:{action}@{confidence}%", reason, duration)
            print(f" -> Queued (confidence {confidence}% < {confidence_threshold}%)")
            queued += 1

    # Print run summary
    print(f"\n{'=' * 50}")
    print(f"Processed: {len(envelopes)} emails")
    print(f" Auto-acted: {auto_acted}")
    print(f" Queued for review: {queued}")
    if skipped:
        print(f" Skipped (already pending): {skipped}")
    print(f"\nRun 'python main.py review list' to see pending emails")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Subcommand: review
|
|
#
|
|
# Non-interactive: each invocation takes arguments, acts, and exits.
|
|
# No input() calls. Compatible with cron and scripting.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _get_pending_items():
    """Return only pending (not done) items, sorted by found_at."""
    queue = load_pending()
    still_open = [
        (msg_id, entry)
        for msg_id, entry in queue.items()
        if entry.get("status") == "pending"
    ]
    still_open.sort(key=lambda pair: pair[1].get("found_at", ""))
    return still_open
|
|
|
|
|
|
def cmd_review_list():
    """Print the pending queue and exit.

    Shows each email with its number, ID, subject, sender, summary,
    and the classifier's suggested action with confidence.
    """
    queue = _get_pending_items()

    if not queue:
        print("No pending emails to review.")
        return

    print(f"Pending emails: {len(queue)}")
    print("=" * 60)

    for num, (msg_id, entry) in enumerate(queue, start=1):
        print(f"\n {num}. [{msg_id}]")
        print(f" Subject: {entry.get('subject', 'N/A')[:55]}")
        print(f" From: {entry.get('sender', 'N/A')[:55]}")
        print(f" To: {entry.get('recipient', 'N/A')[:40]}")
        print(f" Summary: {entry.get('summary', 'N/A')[:70]}")
        suggested = entry.get("suggested_action", "?")
        conf = entry.get("confidence", "?")
        print(f" Suggested: {suggested} ({conf}% confidence)")

    print(f"\n{'=' * 60}")
    print("Usage:")
    print(" python main.py review <number> <action>")
    print(" python main.py review all <action>")
    print(" python main.py review accept")
    print("Actions: delete / archive / keep / mark_read / label:<name>")
|
|
|
|
|
|
def cmd_review_act(selector, action):
    """Execute an action on one or more pending emails.

    Args:
        selector: a 1-based number, a msg_id string, or "all".
        action: one of delete/archive/keep/mark_read/label:<name>.
    """
    # Reject anything that is neither a known action nor label:<name>.
    valid_actions = {"delete", "archive", "keep", "mark_read"}
    if action not in valid_actions and not action.startswith("label:"):
        print(f"Invalid action: {action}")
        print(f"Valid: {', '.join(sorted(valid_actions))}, label:<name>")
        sys.exit(1)

    queue = _get_pending_items()
    if not queue:
        print("No pending emails to review.")
        return

    # Resolve which pending entries the selector refers to.
    if selector == "all":
        targets = queue
    else:
        resolved = _resolve_target(selector, queue)
        if resolved is None:
            sys.exit(1)
        targets = [resolved]

    LOGS_DIR.mkdir(exist_ok=True)
    log_file = LOGS_DIR / f"{datetime.now().strftime('%Y-%m-%d')}.log"

    for msg_id, data in targets:
        eid = data.get("envelope_id") or data.get("imap_uid")
        if not eid:
            print(f" {msg_id}: No envelope ID, skipping")
            continue

        if execute_action(eid, action):
            # Record decision for future learning
            decision_store.record_decision(data, action, source="user")

            # Mark as done in pending queue (reload fresh each time so
            # earlier iterations' writes are not clobbered)
            pending = load_pending()
            pending[msg_id]["status"] = "done"
            pending[msg_id]["action"] = action
            pending[msg_id]["processed_at"] = datetime.now().isoformat()
            save_pending(pending)

            log_result(log_file, data, f"REVIEW:{action}", data.get("reason", ""))
            print(f" {msg_id}: {action} -> OK ({data['subject'][:40]})")
        else:
            log_result(log_file, data, f"REVIEW_FAILED:{action}", data.get("reason", ""))
            print(f" {msg_id}: {action} -> FAILED")
|
|
|
|
|
|
def cmd_review_accept():
    """Accept all classifier suggestions for pending emails.

    For each pending email, executes the suggested_action that the
    classifier assigned during scan. Records each as a "user" decision
    since the user explicitly chose to accept.
    """
    queue = _get_pending_items()
    if not queue:
        print("No pending emails to review.")
        return

    LOGS_DIR.mkdir(exist_ok=True)
    log_file = LOGS_DIR / f"{datetime.now().strftime('%Y-%m-%d')}.log"

    for msg_id, data in queue:
        action = data.get("suggested_action")
        if not action:
            print(f" {msg_id}: No suggestion, skipping")
            continue

        eid = data.get("envelope_id") or data.get("imap_uid")
        if not eid:
            print(f" {msg_id}: No envelope ID, skipping")
            continue

        if not execute_action(eid, action):
            log_result(log_file, data, f"ACCEPT_FAILED:{action}", data.get("reason", ""))
            print(f" {msg_id}: {action} -> FAILED")
            continue

        # Accepted — record as a user decision and close the queue entry.
        decision_store.record_decision(data, action, source="user")
        pending = load_pending()
        pending[msg_id]["status"] = "done"
        pending[msg_id]["action"] = action
        pending[msg_id]["processed_at"] = datetime.now().isoformat()
        save_pending(pending)

        log_result(log_file, data, f"ACCEPT:{action}", data.get("reason", ""))
        print(f" {msg_id}: {action} -> OK ({data['subject'][:40]})")
|
|
|
|
|
|
def _resolve_target(selector, sorted_items):
|
|
"""Resolve a selector (number or msg_id) to a (msg_id, data) tuple.
|
|
|
|
Returns None and prints an error if the selector is invalid.
|
|
"""
|
|
# Try as 1-based index
|
|
try:
|
|
idx = int(selector) - 1
|
|
if 0 <= idx < len(sorted_items):
|
|
return sorted_items[idx]
|
|
else:
|
|
print(f"Invalid number. Range: 1-{len(sorted_items)}")
|
|
return None
|
|
except ValueError:
|
|
pass
|
|
|
|
# Try as msg_id
|
|
for msg_id, data in sorted_items:
|
|
if msg_id == selector:
|
|
return (msg_id, data)
|
|
|
|
print(f"Not found: {selector}")
|
|
return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Subcommand: stats
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def cmd_stats():
    """Print a summary of the decision history.

    Shows total decisions, user vs. auto breakdown, action distribution,
    top sender domains, and custom labels.
    """
    stats = decision_store.get_all_stats()

    if not stats:
        print("No decision history yet.")
        print("Run 'python main.py scan' and 'python main.py review' to build history.")
        return

    total = stats["total"]
    print("Decision History Stats")
    print("=" * 50)
    print(f"Total decisions: {total}")

    # User vs. auto breakdown
    print(f"\nBy source:")
    for source, count in sorted(stats["by_source"].items()):
        print(f" {source}: {count} ({count / total * 100:.0f}%)")

    auto = stats["by_source"].get("auto", 0)
    if total > 0:
        print(f" Automation rate: {auto / total * 100:.0f}%")

    # Action distribution, most frequent first
    print(f"\nBy action:")
    for action, count in sorted(stats["by_action"].items(), key=lambda x: -x[1]):
        print(f" {action}: {count}")

    # Top sender domains with per-domain action counts
    print(f"\nTop sender domains:")
    for domain, count in stats["top_domains"]:
        per_action = decision_store.get_sender_stats(domain)
        detail = ", ".join(
            f"{a}:{c}" for a, c in sorted(per_action.items(), key=lambda x: -x[1])
        )
        print(f" {domain}: {count} ({detail})")

    # Custom labels
    labels = decision_store.get_known_labels()
    if labels:
        print(f"\nKnown labels: {', '.join(sorted(labels))}")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Entry point & argument parsing
|
|
#
|
|
# Simple hand-rolled parser — no external dependencies. Supports:
|
|
# main.py [subcommand] [--recent N] [--dry-run] [review-args...]
|
|
# ---------------------------------------------------------------------------
|
|
|
|
if __name__ == "__main__":
    # Hand-rolled argument parsing: the first non-flag token becomes the
    # subcommand; later non-flag tokens are passed through to it.
    args = sys.argv[1:]
    subcommand = "scan"  # default when no subcommand is given
    recent = None  # --recent N: fetch last N days instead of unseen
    dry_run = False  # --dry-run: classify and print, never act
    extra_args = []  # for review subcommand arguments

    # Parse args
    i = 0
    while i < len(args):
        if args[i] == "--recent" and i + 1 < len(args):
            # NOTE(review): int() raises an unhandled ValueError for a
            # non-numeric value — traceback instead of a usage message.
            recent = int(args[i + 1])
            i += 2
        elif args[i] == "--dry-run":
            dry_run = True
            i += 1
        elif not args[i].startswith("--") and subcommand == "scan" and not extra_args:
            # First positional arg is the subcommand
            subcommand = args[i]
            i += 1
        elif not args[i].startswith("--"):
            # Remaining positional args go to the subcommand
            extra_args.append(args[i])
            i += 1
        else:
            print(f"Unknown flag: {args[i]}")
            sys.exit(1)

    config = load_config()

    if subcommand == "scan":
        cmd_scan(config, recent=recent, dry_run=dry_run)

    elif subcommand == "review":
        # review dispatch: list (default), accept, or <selector> <action>
        if not extra_args or extra_args[0] == "list":
            cmd_review_list()
        elif extra_args[0] == "accept":
            cmd_review_accept()
        elif len(extra_args) == 2:
            cmd_review_act(extra_args[0], extra_args[1])
        else:
            print("Usage:")
            print(" python main.py review list")
            print(" python main.py review <number-or-id> <action>")
            print(" python main.py review all <action>")
            print(" python main.py review accept")
            sys.exit(1)

    elif subcommand == "stats":
        cmd_stats()

    else:
        print(f"Unknown subcommand: {subcommand}")
        print("Usage: python main.py [scan|review|stats] [--recent N] [--dry-run]")
        sys.exit(1)
|