Review items now get a stable scan_index assigned during scan, so sequential review commands don't target wrong emails after earlier items are resolved. Indices reset on each new scan. Deduplicate tag taxonomy from 21 to 14 tags: drop invoice/payment (covered by billing), delivery (covered by shipping), discount/marketing (covered by promotion), and generic notification/update tags.
209 lines
7.9 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Classifier - LLM-based email classification with learning.
|
|
|
|
This module builds a rich prompt for the local Ollama model (Qwen3) that
|
|
includes few-shot examples from past user decisions, per-sender statistics,
|
|
and a list of known labels. The model returns a structured response with
|
|
an action, category tags, summary, and reason.
|
|
|
|
Confidence is NOT produced by the LLM — it is computed externally from
|
|
decision history by decision_store.compute_confidence().
|
|
|
|
The prompt structure:
|
|
1. System instructions (action definitions)
|
|
2. Known labels (so the model reuses them)
|
|
3. Sender statistics ("linkedin.com: deleted 8 times, kept 2 times")
|
|
4. Few-shot examples (top 5 most relevant past decisions)
|
|
5. The email to classify (subject, sender, recipient, body preview)
|
|
6. Output format specification (action, tags, summary, reason)
|
|
"""
|
|
|
|
import time
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
import decision_store
|
|
|
|
# Directory where per-day LLM interaction logs are written (created lazily
# by _log_llm on first write).
LOGS_DIR = Path(__file__).parent / "logs"

# Canonical category tags the model is allowed to assign. Kept deliberately
# small (14 tags) so the classifier reuses tags instead of inventing
# near-synonyms — e.g. "billing" covers invoices/payments, "shipping"
# covers deliveries, "promotion" covers discounts/marketing.
TAG_TAXONOMY = [
    "receipt",
    "billing",
    "shipping",
    "promotion",
    "newsletter",
    "security",
    "social",
    "reminder",
    "confirmation",
    "alert",
    "personal",
    "account",
    "subscription",
    "travel",
]
|
|
|
|
|
|
def _build_prompt(email_data, config):
    """Assemble the full classification prompt with learning context.

    The prompt is a sequence of sections, each adding context that helps
    the model decide. Sections with no backing data (e.g. a sender we have
    never seen) are simply skipped.

    Args:
        email_data: dict with subject, sender, recipient, body keys.
        config: full config dict; reads rules.max_body_length (default 1000).

    Returns:
        The complete prompt string, sections joined by newlines.
    """
    body_limit = config.get("rules", {}).get("max_body_length", 1000)

    # Learning context pulled from decision history.
    past_decisions = decision_store.get_relevant_examples(email_data, n=10)
    addr = decision_store._extract_email_address(email_data.get("sender", ""))
    stats = decision_store.get_sender_stats(addr) if addr else {}
    labels = decision_store.get_known_labels()

    # /no_think disables Qwen3's chain-of-thought, giving faster + shorter output
    sections = ["/no_think\n"]

    # Section 1: what each action means.
    sections.append(
        "You are an email classifier. Classify the email into one of these actions:\n"
        "- delete: Spam, ads, promotions, unwanted notifications\n"
        "- archive: Informational emails worth keeping but not needing attention "
        "(receipts, shipping updates, automated confirmations)\n"
        "- keep: Important emails that need attention or action (left unread in inbox)\n"
        "- mark_read: Low-priority, leave in inbox but mark as read\n"
        "- label:<name>: Categorize with a specific label\n"
    )

    # Section 2: previously-used labels, so the model reuses rather than invents.
    if labels:
        sections.append(f"\nLabels used before: {', '.join(sorted(labels))}\n")

    # Section 3: per-sender history — a strong signal for repeat senders.
    if stats:
        history = ", ".join(f"{action} {count} times" for action, count in stats.items())
        sections.append(f"\nHistory for {addr}: {history}\n")

    # Section 4: few-shot examples drawn from the most relevant past decisions.
    if past_decisions:
        sections.append("\n--- Past decisions (learn from these) ---")
        for past in past_decisions[:5]:
            sections.append(
                f"From: {past['sender'][:60]} | To: {past['recipient'][:40]} | "
                f"Subject: {past['subject'][:60]} -> {past['action']}"
            )
        sections.append("--- End examples ---\n")

    # Section 5: the email under classification, body truncated to the limit.
    preview = email_data.get("body", "")[:body_limit]
    sections.append(
        f"Now classify this email:\n"
        f"Subject: {email_data.get('subject', '(No Subject)')}\n"
        f"From: {email_data.get('sender', '(Unknown)')}\n"
        f"To: {email_data.get('recipient', '(Unknown)')}\n"
        f"Body: {preview}\n"
    )

    # Section 6: the exact output shape _parse_response expects back.
    tag_options = ", ".join(TAG_TAXONOMY)
    sections.append(
        "Respond in this exact format (nothing else):\n"
        "Action: [delete|archive|keep|mark_read|label:<name>]\n"
        f"Tags: [comma-separated tags from: {tag_options}] (at least 3, max 5)\n"
        "Summary: [one sentence summary of the email]\n"
        "Reason: [brief explanation for your classification]"
    )

    return "\n".join(sections)
|
|
|
|
|
|
def _log_llm(prompt, output, email_data, action, tags, duration):
    """Append one prompt/response record to logs/llm_YYYY-MM-DD.log.

    Each record is framed by '=' rules, with a '-' rule separating the
    header, the full prompt, and the raw model response. Log files roll
    over daily via the date in the filename.

    Args:
        prompt: full prompt text sent to the model.
        output: raw model response (or an ERROR string on failure).
        email_data: dict with subject/sender keys (truncated for the header).
        action: classification action chosen.
        tags: list of tag strings chosen.
        duration: elapsed classification time in seconds.
    """
    LOGS_DIR.mkdir(exist_ok=True)
    log_file = LOGS_DIR / f"llm_{datetime.now().strftime('%Y-%m-%d')}.log"
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    subject = email_data.get("subject", "(No Subject)")[:60]
    sender = email_data.get("sender", "(Unknown)")[:60]
    tags_str = ", ".join(tags)
    rule = "=" * 70
    sep = "-" * 70

    # Build the whole record first so it lands in the file as one write.
    entry = (
        f"{rule}\n"
        f"[{timestamp}] {subject}\n"
        f"From: {sender} | Result: {action} tags=[{tags_str}] | {duration:.1f}s\n"
        f"{sep}\n"
        f"PROMPT:\n{prompt}\n"
        f"{sep}\n"
        f"RESPONSE:\n{output}\n"
        f"{rule}\n\n"
    )
    with open(log_file, "a", encoding="utf-8") as f:
        f.write(entry)
|
|
|
|
|
|
def _parse_response(output):
    """Parse the model's text response into structured fields.

    Expected format (one field per line):
        Action: delete
        Tags: promotion, newsletter, alert
        Summary: Promotional offer from retailer
        Reason: Clearly a marketing email with discount offer

    Unknown actions and tags outside TAG_TAXONOMY are dropped; duplicate
    tags are removed while preserving the model's ordering. Falls back to
    safe defaults (keep, empty tags, placeholder summary/reason) when a
    field is missing or malformed.

    Args:
        output: raw response text from the LLM.

    Returns:
        Tuple of (action, tags, summary, reason).
    """
    action = "keep"
    tags = []
    summary = "No summary"
    reason = "Unknown"

    valid_tags = set(TAG_TAXONOMY)

    for line in output.strip().split("\n"):
        line = line.strip()
        # Strip only the leading field marker via slicing; str.replace would
        # also delete any later occurrence of the marker inside the value
        # (e.g. a summary that itself contains "Summary:").
        if line.startswith("Action:"):
            raw_action = line[len("Action:"):].strip().lower()
            valid_actions = {"delete", "archive", "keep", "mark_read"}
            if raw_action in valid_actions or raw_action.startswith("label:"):
                action = raw_action
        elif line.startswith("Tags:"):
            raw_tags = line[len("Tags:"):].strip()
            candidates = [
                t.strip().lower()
                for t in raw_tags.split(",")
                if t.strip().lower() in valid_tags
            ]
            # dict.fromkeys de-duplicates while keeping insertion order.
            tags = list(dict.fromkeys(candidates))
        elif line.startswith("Summary:"):
            summary = line[len("Summary:"):].strip()[:200]
        elif line.startswith("Reason:"):
            reason = line[len("Reason:"):].strip()

    return action, tags, summary, reason
|
|
|
|
|
|
def classify_email(email_data, config):
    """Classify an email using the local LLM with few-shot learning context.

    Builds the prompt, sends it to Ollama, and parses the structured reply.
    Any failure (connection error, bad response, parse problem upstream)
    degrades to the safe default — "keep" with no tags — so the email is
    queued for manual review instead of being auto-acted upon. Every call
    is logged via _log_llm, including failures.

    Args:
        email_data: dict with subject, sender, recipient, body keys.
        config: full config dict (needs ollama.model and rules.max_body_length).

    Returns:
        Tuple of (action, tags, summary, reason, duration_seconds).
    """
    import ollama

    prompt = _build_prompt(email_data, config)
    model_name = config.get("ollama", {}).get("model", "kamekichi128/qwen3-4b-instruct-2507:latest")

    started = time.time()
    try:
        # Low temperature for consistent classification
        reply = ollama.generate(model=model_name, prompt=prompt, options={"temperature": 0.1})
        raw = reply["response"]
        action, tags, summary, reason = _parse_response(raw)
    except Exception as exc:
        # Safe fallback: "keep" with empty tags -> always queued for review.
        raw = f"ERROR: {exc}"
        action = "keep"
        tags = []
        summary = "Classification failed"
        reason = f"error - {str(exc)[:100]}"

    elapsed = time.time() - started
    _log_llm(prompt, raw, email_data, action, tags, elapsed)
    return action, tags, summary, reason, elapsed
|