#!/usr/bin/env python3
"""
Email Processor - Auto filter ads using local Qwen3
Moves ad emails to Trash folder (not permanently deleted)
"""

import email
import hashlib
import imaplib
import json
import os
import sys
import time
from datetime import datetime
from email.header import decode_header, make_header
from pathlib import Path

# Config
SCRIPT_DIR = Path(__file__).parent
CONFIG_FILE = SCRIPT_DIR / "config.json"
LOGS_DIR = SCRIPT_DIR / "logs"
DATA_DIR = SCRIPT_DIR / "data"
PENDING_FILE = DATA_DIR / "pending_emails.json"

# Number of body characters kept per email (also the amount sent to the model).
BODY_PREVIEW_CHARS = 300

# Prefix of the analysis string produced when the model call itself failed.
# main() matches on this exact prefix to decide whether to retry later.
ANALYSIS_ERROR_PREFIX = "KEEP: error - "


def load_config():
    """Load configuration from CONFIG_FILE and return it as a dict."""
    with open(CONFIG_FILE, encoding='utf-8') as f:
        return json.load(f)


def connect_imap(config):
    """Open an SSL IMAP connection and log in.

    Args:
        config: Parsed configuration; reads ``config['imap']`` keys
            ``host``, ``port``, ``email`` and ``password``.

    Returns:
        A logged-in ``imaplib.IMAP4_SSL`` instance.
    """
    imap_config = config['imap']
    mail = imaplib.IMAP4_SSL(imap_config['host'], imap_config['port'])
    try:
        mail.login(imap_config['email'], imap_config['password'])
    except Exception:
        # Do not leak the open socket when authentication fails.
        mail.shutdown()
        raise
    return mail


def get_unseen_emails(mail):
    """Select INBOX and return the list of unseen message identifiers (bytes)."""
    mail.select('INBOX')
    _, search_data = mail.search(None, 'UNSEEN')
    return search_data[0].split()


def _decode_header_value(raw_value, fallback):
    """Return a header as readable text, decoding RFC 2047 encoded words."""
    if not raw_value:
        return fallback
    try:
        decoded = str(make_header(decode_header(raw_value)))
    except (LookupError, UnicodeError, ValueError):
        # Unknown charset or malformed encoded word: keep the raw header text.
        return str(raw_value)
    return decoded or fallback


def _decode_text_payload(part):
    """Decode a message part to text; return None when it has no payload."""
    payload = part.get_payload(decode=True)
    if payload is None:
        return None
    charset = part.get_content_charset() or 'utf-8'
    try:
        return payload.decode(charset, errors='ignore')
    except LookupError:
        # Charset name declared by the sender is unknown to Python.
        return payload.decode('utf-8', errors='ignore')


def fetch_email(mail, email_id):
    """Fetch one email and extract the fields used for analysis.

    The message is fetched with ``BODY.PEEK[]`` so that the server does not
    implicitly set the ``\\Seen`` flag; callers decide explicitly when an
    email counts as read.

    Args:
        mail: IMAP connection with a mailbox selected.
        email_id: Message identifier as returned by ``get_unseen_emails``.

    Returns:
        Dict with keys ``id``, ``subject``, ``sender``, ``recipient``,
        ``date`` and ``body`` (body truncated to BODY_PREVIEW_CHARS).

    Raises:
        ValueError: If the server returned no data for the message.
    """
    _, msg_data = mail.fetch(email_id, '(BODY.PEEK[])')
    if not msg_data or not isinstance(msg_data[0], tuple):
        raise ValueError(f"No message data returned for email {email_id!r}")
    msg = email.message_from_bytes(msg_data[0][1])

    subject = _decode_header_value(msg['Subject'], '(No Subject)')
    sender = _decode_header_value(msg['From'], '(Unknown)')
    recipient = _decode_header_value(msg['To'], '(Unknown)')
    date = msg['Date'] or datetime.now().isoformat()

    # Use the first text/plain part that actually carries a payload.
    body = ""
    if msg.is_multipart():
        for part in msg.walk():
            if part.get_content_type() != "text/plain":
                continue
            text = _decode_text_payload(part)
            if text is not None:
                body = text
                break
    else:
        body = _decode_text_payload(msg) or ""

    return {
        'id': email_id,
        'subject': subject,
        'sender': sender,
        'recipient': recipient,
        'date': date,
        'body': body[:BODY_PREVIEW_CHARS],  # Limit body length
    }


def parse_analysis_output(output):
    """Parse the model's ``IsAD/Summary/Reason`` reply.

    Field names are matched case-insensitively and leading whitespace is
    ignored. When a field appears more than once the last occurrence wins.

    Returns:
        Tuple ``(is_ad, summary, reason)``; missing fields default to
        ``False``, ``"No summary"`` and ``"Unknown"``.
    """
    is_ad = False
    summary = "No summary"
    reason = "Unknown"

    for line in output.strip().split('\n'):
        key, sep, value = line.strip().partition(':')
        if not sep:
            continue
        key = key.strip().lower()
        value = value.strip()
        if key == 'isad':
            is_ad = 'YES' in value.upper()
        elif key == 'summary':
            summary = value[:200]
        elif key == 'reason':
            reason = value

    return is_ad, summary, reason


def analyze_with_qwen3(email_data, config):
    """Analyze email with local Qwen3 using official library.

    Args:
        email_data: Dict as returned by ``fetch_email``.
        config: Parsed configuration; reads ``config['ollama']['model']``
            (default ``qwen3:4b``).

    Returns:
        Tuple ``(result, summary, is_ad, duration)``. ``result`` starts with
        ``"AD: "`` or ``"KEEP: "``; when the model call raised it starts with
        ANALYSIS_ERROR_PREFIX. ``duration`` is the elapsed time in seconds.
    """
    # Imported lazily so the rest of the module works without ollama installed.
    import ollama

    prompt = f"""Analyze this email and provide two pieces of information:

1. Is this an advertisement/promotional email?
2. Summarize the email in one sentence

Email details:
Subject: {email_data['subject']}
Sender: {email_data['sender']}
Body: {email_data['body'][:300]}

Respond in this exact format:
IsAD: [YES or NO]
Summary: [one sentence summary]
Reason: [brief explanation]
"""

    start_time = time.time()
    model = config['ollama'].get('model', 'qwen3:4b')

    try:
        response = ollama.generate(model=model, prompt=prompt, options={'temperature': 0.1})
        is_ad, summary, reason = parse_analysis_output(response['response'])
        result = f"AD: {reason}" if is_ad else f"KEEP: {reason}"
    except Exception as e:
        # Any failure means "keep the email"; main() retries it on the next run.
        result = f"{ANALYSIS_ERROR_PREFIX}{str(e)[:100]}"
        summary = "Analysis failed"
        is_ad = False

    duration = time.time() - start_time
    return result, summary, is_ad, duration


def move_to_trash(mail, email_id, trash_folder='Trash'):
    """Move email to the trash folder (copy, then flag the original deleted).

    Args:
        mail: IMAP connection with the source mailbox selected.
        email_id: Message identifier to move.
        trash_folder: Destination mailbox name.

    Returns:
        True only if both the copy and the ``\\Deleted`` flag succeeded.
    """
    copy_status = mail.copy(email_id, trash_folder)[0]
    if copy_status != 'OK':
        return False
    store_status = mail.store(email_id, '+FLAGS', '\\Deleted')[0]
    return store_status == 'OK'


def log_result(log_file, email_data, analysis, action, duration=None):
    """Append one processing result to the log file.

    Args:
        log_file: Path of the log file (opened in append mode).
        email_data: Dict with at least ``subject`` and ``sender``.
        analysis: Analysis string returned by ``analyze_with_qwen3``.
        action: Short action label, e.g. ``MOVED_TO_TRASH``.
        duration: Model time in seconds, or None to omit it.
    """
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    # Compare against None so that a legitimate 0.0 duration is still logged.
    duration_str = f" ({duration:.1f}s)" if duration is not None else ""

    # Subjects and senders can contain any Unicode text; do not rely on the
    # platform default encoding.
    with open(log_file, 'a', encoding='utf-8') as f:
        f.write(f"[{timestamp}] {action}{duration_str}: {email_data['subject'][:60]}\n")
        f.write(f" From: {email_data['sender']}\n")
        f.write(f" Analysis: {analysis}\n\n")


def load_pending():
    """Load pending emails from JSON file; return {} when it does not exist."""
    if not PENDING_FILE.exists():
        return {}
    with open(PENDING_FILE, 'r', encoding='utf-8') as f:
        return json.load(f)


def save_pending(pending):
    """Save pending emails to JSON file, creating DATA_DIR if needed."""
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    with open(PENDING_FILE, 'w', encoding='utf-8') as f:
        json.dump(pending, f, indent=2, ensure_ascii=False)


def add_to_pending(email_data, summary, imap_uid, recipient):
    """Add email to pending queue and return its generated queue ID.

    Args:
        email_data: Dict as returned by ``fetch_email``.
        summary: One-sentence summary produced by the model.
        imap_uid: Message identifier stored with the entry.
            NOTE(review): main() passes the identifier returned by
            ``mail.search``, which looks like a sequence number rather than
            a stable IMAP UID; confirm what the queue consumer expects.
        recipient: Recipient address stored with the entry.

    Returns:
        The ``msg_<8 hex chars>`` key under which the entry was stored.
    """
    pending = load_pending()

    # Build the hash input separately: nesting the same quote type inside an
    # f-string is a syntax error before Python 3.12.
    id_source = f"{imap_uid}_{email_data['subject']}"
    msg_id = f"msg_{hashlib.md5(id_source.encode()).hexdigest()[:8]}"

    email_date = email_data.get('date', datetime.now().isoformat())

    pending[msg_id] = {
        "imap_uid": str(imap_uid),
        "subject": email_data['subject'],
        "sender": email_data['sender'],
        "recipient": recipient,
        "summary": summary,
        "email_date": email_date,
        "status": "pending",
        "found_at": datetime.now().isoformat(),
    }

    save_pending(pending)
    return msg_id


def main():
    """Main processing function.

    Fetches unseen INBOX emails, classifies each with the local model, moves
    ads to Trash, and queues everything else as pending. Exits with status 1
    on any unexpected error.
    """
    # NOTE(review): the symbol prefixes in the messages below look like
    # mis-encoded emoji; they are left untouched here - confirm the intended
    # characters before changing them.
    print("šŸ“§ Email Processor Starting...")

    config = load_config()

    LOGS_DIR.mkdir(exist_ok=True)
    log_file = LOGS_DIR / f"{datetime.now().strftime('%Y-%m-%d')}.log"

    mail = None
    try:
        print("Connecting to IMAP...")
        mail = connect_imap(config)
        print("āœ… Connected")

        email_ids = get_unseen_emails(mail)
        print(f"Found {len(email_ids)} unread emails")

        if not email_ids:
            print("No new emails to process")
            return

        processed = 0
        moved_to_trash = 0
        added_to_pending = 0

        for email_id in email_ids:
            print(f"\nProcessing email {email_id.decode()}...")

            email_data = fetch_email(mail, email_id)
            print(f" Subject: {email_data['subject'][:50]}")

            # One model call covers both ad detection and summary.
            analysis, summary, is_ad, duration = analyze_with_qwen3(email_data, config)
            print(f" Analysis: {analysis[:100]}")
            print(f" Summary: {summary[:60]}...")
            print(f" Qwen3 time: {duration:.1f}s")

            # Match the exact failure prefix; a plain substring test would
            # also fire on a model-written reason that mentions "error -".
            if analysis.startswith(ANALYSIS_ERROR_PREFIX):
                # fetch_email does not set \Seen, so the email stays unread
                # and is picked up again by the next UNSEEN search.
                print(f" -> Analysis failed, keeping unread for retry")
                log_result(log_file, email_data, analysis, "FAILED_RETRY", duration)
                continue

            if is_ad:
                print(" -> Moving to Trash")
                if move_to_trash(mail, email_id):
                    log_result(log_file, email_data, analysis, "MOVED_TO_TRASH", duration)
                    moved_to_trash += 1
                else:
                    log_result(log_file, email_data, analysis, "MOVE_FAILED", duration)
            else:
                print(" -> Adding to pending queue")
                msg_internal_id = add_to_pending(
                    email_data,
                    summary,
                    email_id.decode(),
                    email_data.get('recipient', 'youlu@luyanxin.com')
                )

                # Mark as read (so it won't be processed again)
                mail.store(email_id, '+FLAGS', '\\Seen')

                log_result(log_file, email_data, analysis, f"ADDED_TO_PENDING ({msg_internal_id})", duration)
                added_to_pending += 1

            processed += 1

        # Permanently remove the originals flagged \Deleted by move_to_trash.
        mail.expunge()

        print(f"\n{'='*50}")
        print(f"Total emails checked: {len(email_ids)}")
        print(f"Successfully processed: {processed} emails")
        print(f" - Moved to trash (ads): {moved_to_trash}")
        print(f" - Added to pending queue: {added_to_pending}")
        print(f"Failed (will retry next time): {len(email_ids) - processed}")
        print(f"\nšŸ“ Pending queue: {PENDING_FILE}")
        print(f"šŸ“ Log: {log_file}")
        print(f"\nšŸ’” Run 'python process_queue.py' to view and process pending emails")

    except Exception as e:
        print(f"āŒ Error: {e}")
        sys.exit(1)
    finally:
        # Always release the connection, including on the error path.
        if mail is not None:
            try:
                mail.logout()
            except (imaplib.IMAP4.error, OSError):
                # Best effort: the connection may already be gone.
                pass


if __name__ == "__main__":
    main()