Files
youlu-openclaw-workspace/scripts/email_processor/classifier.py

209 lines
8.0 KiB
Python

#!/usr/bin/env python3
"""
Classifier - LLM-based email classification with learning.
This module builds a rich prompt for the local Ollama model (Qwen3) that
includes few-shot examples from past user decisions, per-sender statistics,
and a list of known labels. The model returns a structured response with
an action, category tags, summary, and reason.
Confidence is NOT produced by the LLM — it is computed externally from
decision history by decision_store.compute_confidence().
The prompt structure:
1. System instructions (action definitions)
2. Known labels (so the model reuses them)
3. Sender statistics ("linkedin.com: deleted 8 times, kept 2 times")
4. Few-shot examples (top 5 most relevant past decisions)
5. The email to classify (subject, sender, recipient, body preview)
6. Output format specification (action, tags, summary, reason)
"""
import time
from datetime import datetime
from pathlib import Path
import decision_store
# Directory (next to this script) where daily LLM prompt/response logs are written.
LOGS_DIR = Path(__file__).parent / "logs"

# Closed vocabulary of category tags the model is allowed to assign.
# The prompt lists these to the model, and any tag outside this list is
# discarded during response parsing.
TAG_TAXONOMY = [
    "receipt", "invoice", "payment", "billing",
    "shipping", "delivery",
    "promotion", "discount", "marketing", "newsletter",
    "notification", "security", "social",
    "reminder", "confirmation", "update", "alert",
    "personal", "account", "subscription", "travel",
]
def _build_prompt(email_data, config):
    """Construct the full classification prompt, enriched with learning context.

    The prompt is assembled section by section; sections with no supporting
    data (e.g. an unseen sender with no history) are simply left out so the
    prompt stays as short as possible.

    Args:
        email_data: dict with subject, sender, recipient, body keys.
        config: full config dict (reads rules.max_body_length, default 1000).

    Returns:
        The complete prompt string for the LLM.
    """
    body_limit = config.get("rules", {}).get("max_body_length", 1000)

    # Learning context pulled from the decision history.
    past_examples = decision_store.get_relevant_examples(email_data, n=10)
    addr = decision_store._extract_email_address(email_data.get("sender", ""))
    history = decision_store.get_sender_stats(addr) if addr else {}
    labels = decision_store.get_known_labels()

    # "/no_think" switches off Qwen3's chain-of-thought for faster, shorter output.
    sections = ["/no_think\n"]

    # 1) What each action means.
    sections.append(
        "You are an email classifier. Classify the email into one of these actions:\n"
        "- delete: Spam, ads, promotions, unwanted notifications\n"
        "- archive: Informational emails worth keeping but not needing attention "
        "(receipts, shipping updates, automated confirmations)\n"
        "- keep: Important emails that need attention or action (left unread in inbox)\n"
        "- mark_read: Low-priority, leave in inbox but mark as read\n"
        "- label:<name>: Categorize with a specific label\n"
    )

    # 2) Previously used labels, so the model reuses them instead of inventing new ones.
    if labels:
        sections.append(f"\nLabels used before: {', '.join(sorted(labels))}\n")

    # 3) Per-sender decision counts — a strong signal for repeat senders.
    if history:
        counts = ", ".join(
            f"{act} {cnt} times" for act, cnt in history.items()
        )
        sections.append(f"\nHistory for {addr}: {counts}\n")

    # 4) Up to five few-shot examples drawn from the most relevant past decisions.
    if past_examples:
        sections.append("\n--- Past decisions (learn from these) ---")
        sections.extend(
            f"From: {ex['sender'][:60]} | To: {ex['recipient'][:40]} | "
            f"Subject: {ex['subject'][:60]} -> {ex['action']}"
            for ex in past_examples[:5]
        )
        sections.append("--- End examples ---\n")

    # 5) The email itself, with the body truncated to the configured limit.
    sections.append(
        f"Now classify this email:\n"
        f"Subject: {email_data.get('subject', '(No Subject)')}\n"
        f"From: {email_data.get('sender', '(Unknown)')}\n"
        f"To: {email_data.get('recipient', '(Unknown)')}\n"
        f"Body: {email_data.get('body', '')[:body_limit]}\n"
    )

    # 6) The exact response shape that _parse_response expects back.
    sections.append(
        "Respond in this exact format (nothing else):\n"
        "Action: [delete|archive|keep|mark_read|label:<name>]\n"
        f"Tags: [comma-separated tags from: {', '.join(TAG_TAXONOMY)}] (at least 3, max 5)\n"
        "Summary: [one sentence summary of the email]\n"
        "Reason: [brief explanation for your classification]"
    )
    return "\n".join(sections)
def _log_llm(prompt, output, email_data, action, tags, duration):
    """Append the full LLM prompt and response to logs/llm_YYYY-MM-DD.log.

    Each entry is a visually delimited record: a header line with timestamp,
    subject, sender and result, followed by the verbatim prompt and response.
    """
    LOGS_DIR.mkdir(exist_ok=True)
    day = datetime.now().strftime("%Y-%m-%d")
    target = LOGS_DIR / f"llm_{day}.log"
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    subj = email_data.get("subject", "(No Subject)")[:60]
    who = email_data.get("sender", "(Unknown)")[:60]
    rule = "=" * 70
    divider = "-" * 70
    # Build the whole record first, then write it in a single call.
    record = (
        f"{rule}\n"
        f"[{stamp}] {subj}\n"
        f"From: {who} | Result: {action} tags=[{', '.join(tags)}] | {duration:.1f}s\n"
        f"{divider}\n"
        f"PROMPT:\n{prompt}\n"
        f"{divider}\n"
        f"RESPONSE:\n{output}\n"
        f"{rule}\n\n"
    )
    with open(target, "a", encoding="utf-8") as fh:
        fh.write(record)
def _parse_response(output):
    """Parse the model's text response into structured fields.

    Expected format (one field per line):
        Action: delete
        Tags: promotion, marketing, newsletter
        Summary: Promotional offer from retailer
        Reason: Clearly a marketing email with discount offer

    Unknown actions and tags outside TAG_TAXONOMY are discarded; missing or
    unparseable fields fall back to safe defaults ("keep", empty tags) so a
    garbled response never triggers an automatic action.

    Args:
        output: raw text returned by the model.

    Returns:
        Tuple of (action, tags, summary, reason).
    """
    action = "keep"
    tags = []
    summary = "No summary"
    reason = "Unknown"
    valid_tags = set(TAG_TAXONOMY)
    # Hoisted out of the loop: the set of non-label actions never changes.
    valid_actions = {"delete", "archive", "keep", "mark_read"}
    for line in output.strip().split("\n"):
        line = line.strip()
        # NOTE: slice off only the leading field marker. The previous
        # str.replace() approach removed EVERY occurrence of the marker,
        # mangling values that happened to contain it (e.g. a summary
        # containing the literal text "Summary:").
        if line.startswith("Action:"):
            raw_action = line[len("Action:"):].strip().lower()
            if raw_action in valid_actions or raw_action.startswith("label:"):
                action = raw_action
        elif line.startswith("Tags:"):
            raw_tags = line[len("Tags:"):].strip()
            tags = [
                t.strip().lower()
                for t in raw_tags.split(",")
                if t.strip().lower() in valid_tags
            ]
        elif line.startswith("Summary:"):
            # Cap the summary at 200 chars to keep downstream display sane.
            summary = line[len("Summary:"):].strip()[:200]
        elif line.startswith("Reason:"):
            reason = line[len("Reason:"):].strip()
    return action, tags, summary, reason
def classify_email(email_data, config):
    """Classify an email via the local Ollama LLM with learning context.

    Builds the few-shot prompt, queries Ollama, and parses the structured
    reply. Any failure degrades to ("keep", empty tags) so the email is
    queued for manual review instead of being acted on automatically.
    Every call — success or failure — is logged via _log_llm.

    Args:
        email_data: dict with subject, sender, recipient, body keys.
        config: full config dict (needs ollama.model and rules.max_body_length).

    Returns:
        Tuple of (action, tags, summary, reason, duration_seconds).
    """
    # Imported lazily so the module loads even where ollama is absent.
    import ollama

    prompt = _build_prompt(email_data, config)
    model_name = config.get("ollama", {}).get("model", "kamekichi128/qwen3-4b-instruct-2507:latest")
    started = time.time()
    try:
        # temperature 0.1: near-deterministic output for stable classification
        reply = ollama.generate(model=model_name, prompt=prompt, options={"temperature": 0.1})
        output = reply["response"]
        action, tags, summary, reason = _parse_response(output)
    except Exception as exc:
        # Degrade safely: "keep" with no tags means the email is always queued.
        output = f"ERROR: {exc}"
        action, tags = "keep", []
        summary = "Classification failed"
        reason = f"error - {str(exc)[:100]}"
    elapsed = time.time() - started
    _log_llm(prompt, output, email_data, action, tags, elapsed)
    return action, tags, summary, reason, elapsed