Compute confidence from decision history instead of LLM

This commit is contained in:
Yanxin Lu
2026-03-04 14:23:50 -08:00
parent 720f4ef1ad
commit 64e28b55d1
4 changed files with 133 additions and 84 deletions

View File

@@ -5,7 +5,10 @@ Classifier - LLM-based email classification with learning.
This module builds a rich prompt for the local Ollama model (Qwen3) that
includes few-shot examples from past user decisions, per-sender statistics,
and a list of known labels. The model returns a structured response with
an action, confidence score, summary, and reason.
an action, category tags, summary, and reason.
Confidence is NOT produced by the LLM — it is computed externally from
decision history by decision_store.compute_confidence().
The prompt structure:
1. System instructions (action definitions)
@@ -13,7 +16,7 @@ The prompt structure:
3. Sender statistics ("linkedin.com: deleted 8 times, kept 2 times")
4. Few-shot examples (top 5 most relevant past decisions)
5. The email to classify (subject, sender, recipient, body preview)
6. Output format specification
6. Output format specification (action, tags, summary, reason)
"""
import time
@@ -24,6 +27,15 @@ import decision_store
LOGS_DIR = Path(__file__).parent / "logs"
# Closed vocabulary of category tags. _build_prompt lists these as the allowed
# values for the "Tags:" line of the model's response, and _parse_response
# drops any returned tag that is not in this list.
TAG_TAXONOMY = [
"receipt", "invoice", "payment", "billing",
"shipping", "delivery",
"promotion", "discount", "marketing", "newsletter",
"notification", "security", "social",
"reminder", "confirmation", "update", "alert",
"personal", "account", "subscription", "travel",
]
def _build_prompt(email_data, config):
"""Assemble the full classification prompt with learning context.
@@ -36,8 +48,8 @@ def _build_prompt(email_data, config):
# Gather learning context from decision history
examples = decision_store.get_relevant_examples(email_data, n=10)
sender_domain = decision_store._extract_domain(email_data.get("sender", ""))
sender_stats = decision_store.get_sender_stats(sender_domain) if sender_domain else {}
sender_email = decision_store._extract_email_address(email_data.get("sender", ""))
sender_stats = decision_store.get_sender_stats(sender_email) if sender_email else {}
known_labels = decision_store.get_known_labels()
# /no_think disables Qwen3's chain-of-thought, giving faster + shorter output
@@ -63,7 +75,7 @@ def _build_prompt(email_data, config):
stats_str = ", ".join(
f"{action} {count} times" for action, count in sender_stats.items()
)
parts.append(f"\nHistory for {sender_domain}: {stats_str}\n")
parts.append(f"\nHistory for {sender_email}: {stats_str}\n")
# Section 4: Few-shot examples (top 5 most relevant past decisions)
if examples:
@@ -86,10 +98,11 @@ def _build_prompt(email_data, config):
)
# Section 6: Required output format
tags_list = ", ".join(TAG_TAXONOMY)
parts.append(
"Respond in this exact format (nothing else):\n"
"Action: [delete|archive|keep|mark_read|label:<name>]\n"
"Confidence: [0-100]\n"
f"Tags: [comma-separated tags from: {tags_list}]\n"
"Summary: [one sentence summary of the email]\n"
"Reason: [brief explanation for your classification]"
)
@@ -97,18 +110,19 @@ def _build_prompt(email_data, config):
return "\n".join(parts)
def _log_llm(prompt, output, email_data, action, confidence, duration):
def _log_llm(prompt, output, email_data, action, tags, duration):
"""Log the full LLM prompt and response to logs/llm_YYYY-MM-DD.log."""
LOGS_DIR.mkdir(exist_ok=True)
log_file = LOGS_DIR / f"llm_{datetime.now().strftime('%Y-%m-%d')}.log"
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
subject = email_data.get("subject", "(No Subject)")[:60]
sender = email_data.get("sender", "(Unknown)")[:60]
tags_str = ", ".join(tags)
with open(log_file, "a", encoding="utf-8") as f:
f.write(f"{'=' * 70}\n")
f.write(f"[{timestamp}] {subject}\n")
f.write(f"From: {sender} | Result: {action} @ {confidence}% | {duration:.1f}s\n")
f.write(f"From: {sender} | Result: {action} tags=[{tags_str}] | {duration:.1f}s\n")
f.write(f"{'-' * 70}\n")
f.write(f"PROMPT:\n{prompt}\n")
f.write(f"{'-' * 70}\n")
@@ -121,17 +135,19 @@ def _parse_response(output):
Expected format (one per line):
Action: delete
Confidence: 92
Tags: promotion, marketing, newsletter
Summary: Promotional offer from retailer
Reason: Clearly a marketing email with discount offer
Falls back to safe defaults (keep, 50% confidence) on parse failure.
Falls back to safe defaults (keep, empty tags) on parse failure.
"""
action = "keep"
confidence = 50
tags = []
summary = "No summary"
reason = "Unknown"
valid_tags = set(TAG_TAXONOMY)
for line in output.strip().split("\n"):
line = line.strip()
if line.startswith("Action:"):
@@ -139,25 +155,26 @@ def _parse_response(output):
valid_actions = {"delete", "archive", "keep", "mark_read"}
if raw_action in valid_actions or raw_action.startswith("label:"):
action = raw_action
elif line.startswith("Confidence:"):
try:
confidence = int(line.replace("Confidence:", "").strip().rstrip("%"))
confidence = max(0, min(100, confidence)) # clamp to 0-100
except ValueError:
confidence = 50
elif line.startswith("Tags:"):
raw_tags = line.replace("Tags:", "").strip()
tags = [
t.strip().lower()
for t in raw_tags.split(",")
if t.strip().lower() in valid_tags
]
elif line.startswith("Summary:"):
summary = line.replace("Summary:", "").strip()[:200]
elif line.startswith("Reason:"):
reason = line.replace("Reason:", "").strip()
return action, confidence, summary, reason
return action, tags, summary, reason
def classify_email(email_data, config):
"""Classify an email using the local LLM with few-shot learning context.
Connects to Ollama, sends the assembled prompt, and parses the response.
On any error, falls back to "keep" with 0% confidence so the email
On any error, falls back to "keep" with empty tags so the email
gets queued for manual review rather than auto-acted upon.
Args:
@@ -165,7 +182,7 @@ def classify_email(email_data, config):
config: full config dict (needs ollama.model and rules.max_body_length).
Returns:
Tuple of (action, confidence, summary, reason, duration_seconds).
Tuple of (action, tags, summary, reason, duration_seconds).
"""
import ollama
@@ -177,15 +194,15 @@ def classify_email(email_data, config):
# Low temperature for consistent classification
response = ollama.generate(model=model, prompt=prompt, options={"temperature": 0.1})
output = response["response"]
action, confidence, summary, reason = _parse_response(output)
action, tags, summary, reason = _parse_response(output)
except Exception as e:
# On failure, default to "keep" with 0 confidence -> always queued
# On failure, default to "keep" with empty tags -> always queued
output = f"ERROR: {e}"
action = "keep"
confidence = 0
tags = []
summary = "Classification failed"
reason = f"error - {str(e)[:100]}"
duration = time.time() - start_time
_log_llm(prompt, output, email_data, action, confidence, duration)
return action, confidence, summary, reason, duration
_log_llm(prompt, output, email_data, action, tags, duration)
return action, tags, summary, reason, duration

View File

@@ -8,7 +8,6 @@
"check_unseen_only": true
},
"automation": {
"confidence_threshold": 75,
"bootstrap_min_decisions": 30
"confidence_threshold": 85
}
}

View File

@@ -71,7 +71,7 @@ def _extract_email_address(sender):
# Public API
# ---------------------------------------------------------------------------
def record_decision(email_data, action, source="user"):
def record_decision(email_data, action, source="user", tags=None):
"""Append a decision to the history file.
Args:
@@ -79,6 +79,7 @@ def record_decision(email_data, action, source="user"):
action: one of "delete", "archive", "keep", "mark_read",
or "label:<name>".
source: "user" (manual review) or "auto" (high-confidence).
tags: list of category tags from the classifier taxonomy.
"""
history = _load_history()
entry = {
@@ -90,6 +91,7 @@ def record_decision(email_data, action, source="user"):
"summary": email_data.get("summary", ""),
"action": action,
"source": source,
"tags": tags or [],
}
history.append(entry)
_save_history(history)
@@ -99,10 +101,9 @@ def record_decision(email_data, action, source="user"):
def get_relevant_examples(email_data, n=10):
"""Find the N most relevant past decisions for a given email.
Relevance is scored by three signals:
- Exact sender domain match: +3 points
- Recipient string match: +2 points
- Subject keyword overlap: +1 point per shared word
Relevance is scored by two signals:
- Exact sender email address match: +3 points
- Subject keyword overlap: +1 point per shared word
Only entries with score > 0 are considered. Results are returned
sorted by descending relevance.
@@ -111,8 +112,7 @@ def get_relevant_examples(email_data, n=10):
if not history:
return []
target_domain = _extract_domain(email_data.get("sender", ""))
target_recipient = email_data.get("recipient", "").lower()
target_email = _extract_email_address(email_data.get("sender", ""))
target_words = (
set(re.findall(r"\w+", email_data.get("subject", "").lower())) - _STOP_WORDS
)
@@ -121,15 +121,11 @@ def get_relevant_examples(email_data, n=10):
for entry in history:
score = 0
# Signal 1: sender domain match
if target_domain and entry.get("sender_domain", "") == target_domain:
# Signal 1: sender email match
if target_email and _extract_email_address(entry.get("sender", "")) == target_email:
score += 3
# Signal 2: recipient substring match
if target_recipient and target_recipient in entry.get("recipient", "").lower():
score += 2
# Signal 3: subject keyword overlap
# Signal 2: subject keyword overlap
entry_words = (
set(re.findall(r"\w+", entry.get("subject", "").lower())) - _STOP_WORDS
)
@@ -142,27 +138,64 @@ def get_relevant_examples(email_data, n=10):
return [entry for _, entry in scored[:n]]
def get_sender_stats(sender_email):
    """Tally the actions recorded for one sender email address.

    Each history entry whose "sender" field resolves (through
    _extract_email_address) to ``sender_email`` adds one to the count of
    that entry's action.

    Returns:
        A plain dict mapping action -> count, for example
        {"delete": 5, "keep": 2, "archive": 1}. Empty when no entry matches.
    """
    tally = Counter(
        record["action"]
        for record in _load_history()
        if _extract_email_address(record.get("sender", "")) == sender_email
    )
    return dict(tally)
def compute_confidence(sender_email, action, tags):
    """Compute confidence from decision history by matching email signatures.

    A "signature" is (sender_email, tags). A past decision matches when it
    has the same sender email AND shares at least 50% of the tags in the
    smaller of the two tag sets. Duplicate tags are ignored on both sides.

    Confidence is the lower of two factors:
        1. Agreement: percentage of matching decisions that chose ``action``.
        2. Match-count cap: 10% per matching decision, up to 100%
           (1 match -> max 10%, 5 matches -> 50%, 10+ -> 100%).

    Args:
        sender_email: sender address, compared with each entry's "sender"
            field after it has been passed through _extract_email_address.
        action: the action proposed for the current email.
        tags: category tags of the current email (may be empty or None).

    Returns:
        Integer 0-100. A neutral 50 is returned when there is no history,
        no tags, or no matching past decision.
    """
    history = _load_history()
    # Build the tag set once; it is loop-invariant and removes duplicates
    # that would otherwise inflate the overlap denominator.
    current_tags = set(tags or ())
    if not history or not current_tags:
        return 50

    # Find past decisions with same sender and sufficient tag overlap
    matches = []
    for entry in history:
        if _extract_email_address(entry.get("sender", "")) != sender_email:
            continue
        # Entries recorded without tags can never match a signature.
        entry_tags = set(entry.get("tags") or ())
        if not entry_tags:
            continue
        shared = len(current_tags & entry_tags)
        smaller = min(len(current_tags), len(entry_tags))
        # Both sets are non-empty here, so smaller > 0.
        if shared / smaller >= 0.5:
            matches.append(entry)

    if not matches:
        return 50

    # Agreement: fraction of matches with the same action. .get() keeps a
    # malformed entry from raising; it simply counts as a disagreement.
    total = len(matches)
    agreeing = sum(1 for m in matches if m.get("action") == action)
    agreement = round(agreeing / total * 100)

    # Cap by match count: each match adds 10% to the cap
    cap = min(total * 10, 100)
    return min(agreement, cap)
def get_known_labels():
@@ -194,13 +227,13 @@ def get_all_stats():
by_action = Counter(e["action"] for e in history)
by_source = Counter(e["source"] for e in history)
# Top 10 sender domains by decision count
domain_counts = Counter(e.get("sender_domain", "") for e in history)
top_domains = domain_counts.most_common(10)
# Top 10 sender addresses by decision count
sender_counts = Counter(_extract_email_address(e.get("sender", "")) for e in history)
top_senders = sender_counts.most_common(10)
return {
"total": total,
"by_action": dict(by_action),
"by_source": dict(by_source),
"top_domains": top_domains,
"top_senders": top_senders,
}

View File

@@ -232,11 +232,11 @@ def save_pending(pending):
json.dump(pending, f, indent=2, ensure_ascii=False)
def add_to_pending(email_data, summary, reason, action_suggestion, confidence):
def add_to_pending(email_data, summary, reason, action_suggestion, confidence, tags=None):
"""Add an email to the pending queue for manual review.
Stores the classifier's suggestion and confidence alongside the
email metadata so the user can see what the model thought.
Stores the classifier's suggestion, computed confidence, and tags
alongside the email metadata so the user can see what the model thought.
"""
pending = load_pending()
@@ -254,6 +254,7 @@ def add_to_pending(email_data, summary, reason, action_suggestion, confidence):
"reason": reason,
"suggested_action": action_suggestion,
"confidence": confidence,
"tags": tags or [],
"email_date": email_data.get("date", ""),
"status": "pending",
"found_at": datetime.now().isoformat(),
@@ -283,10 +284,10 @@ def log_result(log_file, email_data, action, detail, duration=None):
def cmd_scan(config, recent=None, dry_run=False):
"""Fetch emails, classify each one, then auto-act or queue.
Auto-action is based on a single confidence threshold. When the
decision history has fewer than 20 entries, a higher threshold (95%)
is used to be conservative during the learning phase. Once enough
history accumulates, the configured threshold takes over.
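Confidence is computed from decision history by matching the email's
signature (sender_email, tags) against past decisions. New/unknown
senders (no matching history, or no tags) get a neutral 50%. Once
matches exist, confidence is the share of matching decisions that agree
with the suggested action, capped at 10% per match, so it grows only as
consistent history accumulates. Emails below the configured threshold
are queued for manual review.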
Args:
config: full config dict.
@@ -302,17 +303,7 @@ def cmd_scan(config, recent=None, dry_run=False):
# Load automation threshold
automation = config.get("automation", {})
configured_threshold = automation.get("confidence_threshold", 75)
# Adaptive threshold: be conservative when history is thin
stats = decision_store.get_all_stats()
total_decisions = stats["total"] if stats else 0
bootstrap_min = automation.get("bootstrap_min_decisions", 20)
if total_decisions < bootstrap_min:
confidence_threshold = 95
print(f"Learning phase ({total_decisions}/{bootstrap_min} decisions) — threshold: 95%\n")
else:
confidence_threshold = configured_threshold
confidence_threshold = automation.get("confidence_threshold", 75)
# Fetch envelopes via himalaya
if recent:
@@ -354,12 +345,18 @@ def cmd_scan(config, recent=None, dry_run=False):
email_data = build_email_data(envelope, body, config)
print(f"{email_data['subject'][:55]}")
# Run the LLM classifier (includes few-shot examples from history)
action, confidence, summary, reason, duration = classifier.classify_email(
# Run the LLM classifier (returns tags instead of confidence)
action, tags, summary, reason, duration = classifier.classify_email(
email_data, config
)
# Compute confidence from decision history
sender_email = decision_store._extract_email_address(email_data.get("sender", ""))
confidence = decision_store.compute_confidence(sender_email, action, tags)
tags_str = ", ".join(tags) if tags else "(none)"
print(f" -> {action} (confidence: {confidence}%, {duration:.1f}s)")
print(f" tags: [{tags_str}]")
print(f" {reason[:80]}")
# Auto-act if confidence meets threshold
@@ -379,7 +376,7 @@ def cmd_scan(config, recent=None, dry_run=False):
success = execute_action(eid, action)
if success:
decision_store.record_decision(
{**email_data, "summary": summary}, action, source="auto"
{**email_data, "summary": summary}, action, source="auto", tags=tags
)
log_result(log_file, email_data, f"AUTO:{action}", reason, duration)
print(f" ** AUTO-executed: {action}")
@@ -388,11 +385,11 @@ def cmd_scan(config, recent=None, dry_run=False):
# Himalaya action failed — fall back to queuing
log_result(log_file, email_data, "AUTO_FAILED", reason, duration)
print(f" !! Auto-action failed, queuing instead")
add_to_pending(email_data, summary, reason, action, confidence)
add_to_pending(email_data, summary, reason, action, confidence, tags)
queued += 1
else:
# Not enough confidence or history — queue for manual review
add_to_pending(email_data, summary, reason, action, confidence)
add_to_pending(email_data, summary, reason, action, confidence, tags)
log_result(log_file, email_data, f"QUEUED:{action}@{confidence}%", reason, duration)
print(f" -> Queued (confidence {confidence}% < {confidence_threshold}%)")
queued += 1
@@ -440,11 +437,14 @@ def cmd_review_list():
for i, (msg_id, data) in enumerate(sorted_items, 1):
suggested = data.get("suggested_action", "?")
conf = data.get("confidence", "?")
tags = data.get("tags", [])
tags_str = ", ".join(tags) if tags else "(none)"
print(f"\n {i}. [{msg_id}]")
print(f" Subject: {data.get('subject', 'N/A')[:55]}")
print(f" From: {data.get('sender', 'N/A')[:55]}")
print(f" To: {data.get('recipient', 'N/A')[:40]}")
print(f" Summary: {data.get('summary', 'N/A')[:70]}")
print(f" Tags: [{tags_str}]")
print(f" Suggested: {suggested} ({conf}% confidence)")
print(f"\n{'=' * 60}")
@@ -496,7 +496,7 @@ def cmd_review_act(selector, action):
success = execute_action(eid, action)
if success:
# Record decision for future learning
decision_store.record_decision(data, action, source="user")
decision_store.record_decision(data, action, source="user", tags=data.get("tags", []))
# Mark as done in pending queue
pending = load_pending()
@@ -540,7 +540,7 @@ def cmd_review_accept():
success = execute_action(eid, action)
if success:
decision_store.record_decision(data, action, source="user")
decision_store.record_decision(data, action, source="user", tags=data.get("tags", []))
pending = load_pending()
pending[msg_id]["status"] = "done"
@@ -616,14 +616,14 @@ def cmd_stats():
for action, count in sorted(stats["by_action"].items(), key=lambda x: -x[1]):
print(f" {action}: {count}")
# Top sender domains with per-domain action counts
print(f"\nTop sender domains:")
for domain, count in stats["top_domains"]:
domain_stats = decision_store.get_sender_stats(domain)
# Top sender addresses with per-sender action counts
print(f"\nTop senders:")
for sender, count in stats["top_senders"]:
sender_stats = decision_store.get_sender_stats(sender)
detail = ", ".join(
f"{a}:{c}" for a, c in sorted(domain_stats.items(), key=lambda x: -x[1])
f"{a}:{c}" for a, c in sorted(sender_stats.items(), key=lambda x: -x[1])
)
print(f" {domain}: {count} ({detail})")
print(f" {sender}: {count} ({detail})")
# Custom labels
labels = decision_store.get_known_labels()