#!/usr/bin/env python3
"""
Classifier - LLM-based email classification with learning.

This module builds a rich prompt for the local Ollama model (Qwen3) that
includes few-shot examples from past user decisions, per-sender statistics,
and a list of known labels. The model returns a structured response with
an action, confidence score, summary, and reason.

The prompt structure:
1. System instructions (action definitions)
2. Known labels (so the model reuses them)
3. Sender statistics ("linkedin.com: deleted 8 times, kept 2 times")
4. Few-shot examples (top 5 most relevant past decisions)
5. The email to classify (subject, sender, recipient, body preview)
6. Output format specification
"""
|
import time
from datetime import datetime
from pathlib import Path

import decision_store  # local module: decision history, sender stats, known labels

# Daily LLM interaction logs are written beside this module (see _log_llm).
LOGS_DIR = Path(__file__).parent / "logs"
|
def _build_prompt(email_data, config):
    """Build the classification prompt, section by section.

    Each section supplies context that helps the model decide; sections
    with no backing data (e.g., a sender never seen before) are simply
    left out of the prompt.
    """
    limit = config.get("rules", {}).get("max_body_length", 1000)

    # Learning context pulled from the decision-history store.
    history = decision_store.get_relevant_examples(email_data, n=10)
    domain = decision_store._extract_domain(email_data.get("sender", ""))
    domain_stats = decision_store.get_sender_stats(domain) if domain else {}
    labels = decision_store.get_known_labels()

    # /no_think disables Qwen3's chain-of-thought, giving faster + shorter output
    sections = ["/no_think\n"]

    # Section 1: what each action means.
    sections.append(
        "You are an email classifier. Classify the email into one of these actions:\n"
        "- delete: Spam, ads, promotions, unwanted notifications\n"
        "- archive: Informational emails worth keeping but not needing attention "
        "(receipts, shipping updates, automated confirmations)\n"
        "- keep: Important emails that need attention or action (left unread in inbox)\n"
        "- mark_read: Low-priority, leave in inbox but mark as read\n"
        "- label:<name>: Categorize with a specific label\n"
    )

    # Section 2: labels seen before, so the model reuses rather than invents.
    if labels:
        sections.append(f"\nLabels used before: {', '.join(sorted(labels))}\n")

    # Section 3: per-domain history is a strong signal for repeat senders.
    if domain_stats:
        counts = [f"{action} {count} times" for action, count in domain_stats.items()]
        sections.append(f"\nHistory for {domain}: {', '.join(counts)}\n")

    # Section 4: few-shot examples from the most relevant past decisions.
    if history:
        sections.append("\n--- Past decisions (learn from these) ---")
        sections.extend(
            f"From: {past['sender'][:60]} | To: {past['recipient'][:40]} | "
            f"Subject: {past['subject'][:60]} -> {past['action']}"
            for past in history[:5]
        )
        sections.append("--- End examples ---\n")

    # Section 5: the email under classification (body truncated to the limit).
    preview = email_data.get("body", "")[:limit]
    sections.append(
        f"Now classify this email:\n"
        f"Subject: {email_data.get('subject', '(No Subject)')}\n"
        f"From: {email_data.get('sender', '(Unknown)')}\n"
        f"To: {email_data.get('recipient', '(Unknown)')}\n"
        f"Body: {preview}\n"
    )

    # Section 6: required output format.
    sections.append(
        "Respond in this exact format (nothing else):\n"
        "Action: [delete|archive|keep|mark_read|label:<name>]\n"
        "Confidence: [0-100]\n"
        "Summary: [one sentence summary of the email]\n"
        "Reason: [brief explanation for your classification]"
    )

    return "\n".join(sections)
|
|
|
|
def _log_llm(prompt, output, email_data, action, confidence, duration):
    """Append one full prompt/response exchange to logs/llm_YYYY-MM-DD.log."""
    LOGS_DIR.mkdir(exist_ok=True)
    log_path = LOGS_DIR / f"llm_{datetime.now().strftime('%Y-%m-%d')}.log"
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    subj = email_data.get("subject", "(No Subject)")[:60]
    frm = email_data.get("sender", "(Unknown)")[:60]

    rule = "=" * 70
    sep = "-" * 70
    entry = [
        f"{rule}\n",
        f"[{stamp}] {subj}\n",
        f"From: {frm} | Result: {action} @ {confidence}% | {duration:.1f}s\n",
        f"{sep}\n",
        f"PROMPT:\n{prompt}\n",
        f"{sep}\n",
        f"RESPONSE:\n{output}\n",
        f"{rule}\n\n",
    ]
    with open(log_path, "a", encoding="utf-8") as log:
        log.writelines(entry)
|
|
|
|
def _parse_response(output):
|
|
"""Parse the model's text response into structured fields.
|
|
|
|
Expected format (one per line):
|
|
Action: delete
|
|
Confidence: 92
|
|
Summary: Promotional offer from retailer
|
|
Reason: Clearly a marketing email with discount offer
|
|
|
|
Falls back to safe defaults (keep, 50% confidence) on parse failure.
|
|
"""
|
|
action = "keep"
|
|
confidence = 50
|
|
summary = "No summary"
|
|
reason = "Unknown"
|
|
|
|
for line in output.strip().split("\n"):
|
|
line = line.strip()
|
|
if line.startswith("Action:"):
|
|
raw_action = line.replace("Action:", "").strip().lower()
|
|
valid_actions = {"delete", "archive", "keep", "mark_read"}
|
|
if raw_action in valid_actions or raw_action.startswith("label:"):
|
|
action = raw_action
|
|
elif line.startswith("Confidence:"):
|
|
try:
|
|
confidence = int(line.replace("Confidence:", "").strip().rstrip("%"))
|
|
confidence = max(0, min(100, confidence)) # clamp to 0-100
|
|
except ValueError:
|
|
confidence = 50
|
|
elif line.startswith("Summary:"):
|
|
summary = line.replace("Summary:", "").strip()[:200]
|
|
elif line.startswith("Reason:"):
|
|
reason = line.replace("Reason:", "").strip()
|
|
|
|
return action, confidence, summary, reason
|
|
|
|
|
|
def classify_email(email_data, config):
    """Classify an email using the local LLM with few-shot learning context.

    Connects to Ollama, sends the assembled prompt, and parses the reply.
    On any error, falls back to "keep" with 0% confidence so the email
    gets queued for manual review rather than auto-acted upon.

    Args:
        email_data: dict with subject, sender, recipient, body keys.
        config: full config dict (needs ollama.model and rules.max_body_length).

    Returns:
        Tuple of (action, confidence, summary, reason, duration_seconds).
    """
    import ollama

    prompt = _build_prompt(email_data, config)
    model_name = config.get("ollama", {}).get("model", "kamekichi128/qwen3-4b-instruct-2507:latest")

    started = time.time()
    try:
        # Low temperature keeps classifications consistent run-to-run.
        reply = ollama.generate(model=model_name, prompt=prompt, options={"temperature": 0.1})
        output = reply["response"]
        action, confidence, summary, reason = _parse_response(output)
    except Exception as exc:
        # Fail safe: "keep" at 0% confidence always lands in the review queue.
        output = f"ERROR: {exc}"
        action = "keep"
        confidence = 0
        summary = "Classification failed"
        reason = f"error - {str(exc)[:100]}"

    elapsed = time.time() - started
    _log_llm(prompt, output, email_data, action, confidence, elapsed)
    return action, confidence, summary, reason, elapsed