Compare commits
2 Commits
6bea16d391
...
da26f84947
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
da26f84947 | ||
|
|
71672b31ca |
@@ -187,12 +187,15 @@ def classify_email(email_data, config):
|
|||||||
import ollama
|
import ollama
|
||||||
|
|
||||||
prompt = _build_prompt(email_data, config)
|
prompt = _build_prompt(email_data, config)
|
||||||
model = config.get("ollama", {}).get("model", "kamekichi128/qwen3-4b-instruct-2507:latest")
|
ollama_config = config.get("ollama", {})
|
||||||
|
model = ollama_config.get("model", "kamekichi128/qwen3-4b-instruct-2507:latest")
|
||||||
|
host = ollama_config.get("host")
|
||||||
|
client = ollama.Client(host=host) if host else ollama.Client()
|
||||||
|
|
||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
try:
|
try:
|
||||||
# Low temperature for consistent classification
|
# Low temperature for consistent classification
|
||||||
response = ollama.generate(model=model, prompt=prompt, options={"temperature": 0.1})
|
response = client.generate(model=model, prompt=prompt, options={"temperature": 0.1})
|
||||||
output = response["response"]
|
output = response["response"]
|
||||||
action, tags, summary, reason = _parse_response(output)
|
action, tags, summary, reason = _parse_response(output)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|||||||
@@ -12,8 +12,10 @@ recipient, subject, summary, action taken, and whether it was a user or
|
|||||||
auto decision.
|
auto decision.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import fcntl
|
||||||
import json
|
import json
|
||||||
import re
|
import re
|
||||||
|
from contextlib import contextmanager
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from collections import Counter
|
from collections import Counter
|
||||||
@@ -34,6 +36,19 @@ _STOP_WORDS = {"re", "fwd", "the", "a", "an", "is", "to", "for", "and", "or", "y
|
|||||||
# Internal helpers
|
# Internal helpers
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@contextmanager
def file_lock(path):
    """Hold an exclusive ``flock`` on ``<path>.lock`` while the with-body runs.

    The lock is taken on a sidecar file named after ``path`` with a
    ``.lock`` suffix, not on ``path`` itself. ``DATA_DIR`` is created
    first if it does not exist yet.

    Args:
        path: File whose read-modify-write cycle should be serialized;
            anything accepted by ``str()`` (e.g. ``str`` or ``Path``).

    Yields:
        None. The lock is held for the duration of the ``with`` body.
    """
    DATA_DIR.mkdir(exist_ok=True)
    sidecar = str(path) + ".lock"
    handle = open(sidecar, "w")
    try:
        # LOCK_EX is requested without LOCK_NB.
        fcntl.flock(handle, fcntl.LOCK_EX)
        try:
            yield
        finally:
            # Release explicitly even if the with-body raised.
            fcntl.flock(handle, fcntl.LOCK_UN)
    finally:
        handle.close()
|
||||||
|
|
||||||
|
|
||||||
def _load_history():
|
def _load_history():
|
||||||
"""Load the full decision history list from disk."""
|
"""Load the full decision history list from disk."""
|
||||||
if not HISTORY_FILE.exists():
|
if not HISTORY_FILE.exists():
|
||||||
@@ -69,7 +84,6 @@ def record_decision(email_data, action, source="user", tags=None):
|
|||||||
source: "user" (manual review) or "auto" (high-confidence).
|
source: "user" (manual review) or "auto" (high-confidence).
|
||||||
tags: list of category tags from the classifier taxonomy.
|
tags: list of category tags from the classifier taxonomy.
|
||||||
"""
|
"""
|
||||||
history = _load_history()
|
|
||||||
entry = {
|
entry = {
|
||||||
"timestamp": datetime.now().isoformat(timespec="seconds"),
|
"timestamp": datetime.now().isoformat(timespec="seconds"),
|
||||||
"sender": email_data.get("sender", ""),
|
"sender": email_data.get("sender", ""),
|
||||||
@@ -80,8 +94,10 @@ def record_decision(email_data, action, source="user", tags=None):
|
|||||||
"source": source,
|
"source": source,
|
||||||
"tags": tags or [],
|
"tags": tags or [],
|
||||||
}
|
}
|
||||||
history.append(entry)
|
with file_lock(HISTORY_FILE):
|
||||||
_save_history(history)
|
history = _load_history()
|
||||||
|
history.append(entry)
|
||||||
|
_save_history(history)
|
||||||
return entry
|
return entry
|
||||||
|
|
||||||
|
|
||||||
@@ -152,23 +168,28 @@ def compute_confidence(sender_email, action, tags):
|
|||||||
Returns an integer 0-100.
|
Returns an integer 0-100.
|
||||||
"""
|
"""
|
||||||
history = _load_history()
|
history = _load_history()
|
||||||
if not history or not tags:
|
if not history:
|
||||||
return 50
|
return 50
|
||||||
|
|
||||||
# Find past decisions with same sender and sufficient tag overlap
|
# Find past decisions with same sender and sufficient tag overlap.
|
||||||
|
# If tags are empty (LLM returned no valid taxonomy tags), fall back
|
||||||
|
# to sender-only matching so history still contributes to confidence.
|
||||||
matches = []
|
matches = []
|
||||||
for entry in history:
|
for entry in history:
|
||||||
entry_email = extract_email_address(entry.get("sender", ""))
|
entry_email = extract_email_address(entry.get("sender", ""))
|
||||||
if entry_email != sender_email:
|
if entry_email != sender_email:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
entry_tags = entry.get("tags", [])
|
if tags:
|
||||||
if not entry_tags:
|
entry_tags = entry.get("tags", [])
|
||||||
continue
|
if not entry_tags:
|
||||||
|
continue
|
||||||
shared = len(set(tags) & set(entry_tags))
|
shared = len(set(tags) & set(entry_tags))
|
||||||
min_len = min(len(tags), len(entry_tags))
|
min_len = min(len(tags), len(entry_tags))
|
||||||
if min_len > 0 and shared / min_len >= 0.5:
|
if min_len > 0 and shared / min_len >= 0.5:
|
||||||
|
matches.append(entry)
|
||||||
|
else:
|
||||||
|
# No tags to compare — match on sender alone
|
||||||
matches.append(entry)
|
matches.append(entry)
|
||||||
|
|
||||||
if not matches:
|
if not matches:
|
||||||
|
|||||||
@@ -100,7 +100,7 @@ def get_unseen_envelopes():
|
|||||||
Returns a list of envelope dicts from himalaya's JSON output.
|
Returns a list of envelope dicts from himalaya's JSON output.
|
||||||
Each has keys like: id, subject, from, to, date, flags.
|
Each has keys like: id, subject, from, to, date, flags.
|
||||||
"""
|
"""
|
||||||
return _himalaya_json("envelope", "list", "not", "flag", "seen")
|
return _himalaya_json("envelope", "list", "-s", "500", "not", "flag", "seen")
|
||||||
|
|
||||||
|
|
||||||
def get_recent_envelopes(days):
|
def get_recent_envelopes(days):
|
||||||
@@ -110,7 +110,7 @@ def get_recent_envelopes(days):
|
|||||||
bulk-classifying historical mail.
|
bulk-classifying historical mail.
|
||||||
"""
|
"""
|
||||||
since = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
|
since = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
|
||||||
return _himalaya_json("envelope", "list", "after", since)
|
return _himalaya_json("envelope", "list", "-s", "500", "after", since)
|
||||||
|
|
||||||
|
|
||||||
def read_message(envelope_id):
|
def read_message(envelope_id):
|
||||||
@@ -134,7 +134,9 @@ def _format_address(addr_field):
|
|||||||
name = first.get("name", "")
|
name = first.get("name", "")
|
||||||
addr = first.get("addr", "")
|
addr = first.get("addr", "")
|
||||||
return f"{name} <{addr}>" if name else addr
|
return f"{name} <{addr}>" if name else addr
|
||||||
return str(addr_field)
|
elif isinstance(addr_field, str) and addr_field:
|
||||||
|
return addr_field
|
||||||
|
return ""
|
||||||
|
|
||||||
|
|
||||||
def build_email_data(envelope, body, config):
|
def build_email_data(envelope, body, config):
|
||||||
@@ -226,28 +228,28 @@ def add_to_pending(email_data, summary, reason, action_suggestion, confidence, t
|
|||||||
alongside the email metadata so the user can see what the model thought.
|
alongside the email metadata so the user can see what the model thought.
|
||||||
Uses envelope_id as the primary identifier for review commands.
|
Uses envelope_id as the primary identifier for review commands.
|
||||||
"""
|
"""
|
||||||
pending = load_pending()
|
|
||||||
|
|
||||||
# Generate a stable ID from envelope ID + subject
|
# Generate a stable ID from envelope ID + subject
|
||||||
eid = str(email_data["id"])
|
eid = str(email_data["id"])
|
||||||
key = f"{eid}_{email_data['subject']}"
|
key = f"{eid}_{email_data['subject']}"
|
||||||
msg_id = f"msg_{hashlib.md5(key.encode()).hexdigest()[:8]}"
|
msg_id = f"msg_{hashlib.md5(key.encode()).hexdigest()[:8]}"
|
||||||
|
|
||||||
pending[msg_id] = {
|
with decision_store.file_lock(PENDING_FILE):
|
||||||
"envelope_id": eid,
|
pending = load_pending()
|
||||||
"subject": email_data["subject"],
|
pending[msg_id] = {
|
||||||
"sender": email_data["sender"],
|
"envelope_id": eid,
|
||||||
"recipient": email_data.get("recipient", ""),
|
"subject": email_data["subject"],
|
||||||
"summary": summary,
|
"sender": email_data["sender"],
|
||||||
"reason": reason,
|
"recipient": email_data.get("recipient", ""),
|
||||||
"suggested_action": action_suggestion,
|
"summary": summary,
|
||||||
"confidence": confidence,
|
"reason": reason,
|
||||||
"tags": tags or [],
|
"suggested_action": action_suggestion,
|
||||||
"email_date": email_data.get("date", ""),
|
"confidence": confidence,
|
||||||
"status": "pending",
|
"tags": tags or [],
|
||||||
"found_at": datetime.now().isoformat(),
|
"email_date": email_data.get("date", ""),
|
||||||
}
|
"status": "pending",
|
||||||
save_pending(pending)
|
"found_at": datetime.now().isoformat(),
|
||||||
|
}
|
||||||
|
save_pending(pending)
|
||||||
return msg_id
|
return msg_id
|
||||||
|
|
||||||
|
|
||||||
@@ -287,10 +289,11 @@ def cmd_scan(config, recent=None, dry_run=False):
|
|||||||
print("=" * 50)
|
print("=" * 50)
|
||||||
|
|
||||||
# Clear done items from previous scan cycles
|
# Clear done items from previous scan cycles
|
||||||
pending = load_pending()
|
with decision_store.file_lock(PENDING_FILE):
|
||||||
cleared = {k: v for k, v in pending.items() if v.get("status") != "done"}
|
pending = load_pending()
|
||||||
if len(cleared) < len(pending):
|
cleared = {k: v for k, v in pending.items() if v.get("status") != "done"}
|
||||||
save_pending(cleared)
|
if len(cleared) < len(pending):
|
||||||
|
save_pending(cleared)
|
||||||
|
|
||||||
LOGS_DIR.mkdir(exist_ok=True)
|
LOGS_DIR.mkdir(exist_ok=True)
|
||||||
log_file = LOGS_DIR / f"{datetime.now().strftime('%Y-%m-%d')}.log"
|
log_file = LOGS_DIR / f"{datetime.now().strftime('%Y-%m-%d')}.log"
|
||||||
@@ -484,9 +487,6 @@ def cmd_review_act(selector, action):
|
|||||||
log_file = LOGS_DIR / f"{datetime.now().strftime('%Y-%m-%d')}.log"
|
log_file = LOGS_DIR / f"{datetime.now().strftime('%Y-%m-%d')}.log"
|
||||||
|
|
||||||
# Execute action on each target
|
# Execute action on each target
|
||||||
pending = load_pending()
|
|
||||||
pending_dirty = False
|
|
||||||
|
|
||||||
for msg_id, data in targets:
|
for msg_id, data in targets:
|
||||||
eid = data.get("envelope_id")
|
eid = data.get("envelope_id")
|
||||||
if not eid:
|
if not eid:
|
||||||
@@ -498,11 +498,13 @@ def cmd_review_act(selector, action):
|
|||||||
# Record decision for future learning
|
# Record decision for future learning
|
||||||
decision_store.record_decision(data, action, source="user", tags=data.get("tags", []))
|
decision_store.record_decision(data, action, source="user", tags=data.get("tags", []))
|
||||||
|
|
||||||
# Mark as done in pending queue
|
# Mark as done in pending queue (locked to avoid concurrent corruption)
|
||||||
pending[msg_id]["status"] = "done"
|
with decision_store.file_lock(PENDING_FILE):
|
||||||
pending[msg_id]["action"] = action
|
pending = load_pending()
|
||||||
pending[msg_id]["processed_at"] = datetime.now().isoformat()
|
pending[msg_id]["status"] = "done"
|
||||||
pending_dirty = True
|
pending[msg_id]["action"] = action
|
||||||
|
pending[msg_id]["processed_at"] = datetime.now().isoformat()
|
||||||
|
save_pending(pending)
|
||||||
|
|
||||||
log_result(log_file, data, f"REVIEW:{action}", data.get("reason", ""))
|
log_result(log_file, data, f"REVIEW:{action}", data.get("reason", ""))
|
||||||
print(f" {msg_id}: {action} -> OK ({data['subject'][:40]})")
|
print(f" {msg_id}: {action} -> OK ({data['subject'][:40]})")
|
||||||
@@ -510,9 +512,6 @@ def cmd_review_act(selector, action):
|
|||||||
log_result(log_file, data, f"REVIEW_FAILED:{action}", data.get("reason", ""))
|
log_result(log_file, data, f"REVIEW_FAILED:{action}", data.get("reason", ""))
|
||||||
print(f" {msg_id}: {action} -> FAILED")
|
print(f" {msg_id}: {action} -> FAILED")
|
||||||
|
|
||||||
if pending_dirty:
|
|
||||||
save_pending(pending)
|
|
||||||
|
|
||||||
|
|
||||||
def cmd_review_accept():
|
def cmd_review_accept():
|
||||||
"""Accept all classifier suggestions for pending emails.
|
"""Accept all classifier suggestions for pending emails.
|
||||||
@@ -529,9 +528,6 @@ def cmd_review_accept():
|
|||||||
LOGS_DIR.mkdir(exist_ok=True)
|
LOGS_DIR.mkdir(exist_ok=True)
|
||||||
log_file = LOGS_DIR / f"{datetime.now().strftime('%Y-%m-%d')}.log"
|
log_file = LOGS_DIR / f"{datetime.now().strftime('%Y-%m-%d')}.log"
|
||||||
|
|
||||||
pending = load_pending()
|
|
||||||
pending_dirty = False
|
|
||||||
|
|
||||||
for msg_id, data in sorted_items:
|
for msg_id, data in sorted_items:
|
||||||
action = data.get("suggested_action")
|
action = data.get("suggested_action")
|
||||||
if not action:
|
if not action:
|
||||||
@@ -547,10 +543,12 @@ def cmd_review_accept():
|
|||||||
if success:
|
if success:
|
||||||
decision_store.record_decision(data, action, source="user", tags=data.get("tags", []))
|
decision_store.record_decision(data, action, source="user", tags=data.get("tags", []))
|
||||||
|
|
||||||
pending[msg_id]["status"] = "done"
|
with decision_store.file_lock(PENDING_FILE):
|
||||||
pending[msg_id]["action"] = action
|
pending = load_pending()
|
||||||
pending[msg_id]["processed_at"] = datetime.now().isoformat()
|
pending[msg_id]["status"] = "done"
|
||||||
pending_dirty = True
|
pending[msg_id]["action"] = action
|
||||||
|
pending[msg_id]["processed_at"] = datetime.now().isoformat()
|
||||||
|
save_pending(pending)
|
||||||
|
|
||||||
log_result(log_file, data, f"ACCEPT:{action}", data.get("reason", ""))
|
log_result(log_file, data, f"ACCEPT:{action}", data.get("reason", ""))
|
||||||
print(f" {msg_id}: {action} -> OK ({data['subject'][:40]})")
|
print(f" {msg_id}: {action} -> OK ({data['subject'][:40]})")
|
||||||
@@ -558,9 +556,6 @@ def cmd_review_accept():
|
|||||||
log_result(log_file, data, f"ACCEPT_FAILED:{action}", data.get("reason", ""))
|
log_result(log_file, data, f"ACCEPT_FAILED:{action}", data.get("reason", ""))
|
||||||
print(f" {msg_id}: {action} -> FAILED")
|
print(f" {msg_id}: {action} -> FAILED")
|
||||||
|
|
||||||
if pending_dirty:
|
|
||||||
save_pending(pending)
|
|
||||||
|
|
||||||
|
|
||||||
def _resolve_target(selector, sorted_items):
|
def _resolve_target(selector, sorted_items):
|
||||||
"""Resolve a selector (envelope_id or msg_id) to a (msg_id, data) tuple.
|
"""Resolve a selector (envelope_id or msg_id) to a (msg_id, data) tuple.
|
||||||
|
|||||||
Reference in New Issue
Block a user