#!/usr/bin/env python3
"""
Contacts — Manage a local vCard contact list synced via CardDAV.

Used by the himalaya wrapper and calendar tool to validate recipient
addresses before sending. Prevents hallucinated email addresses.

Subcommands:
    contacts.py list                # list all contacts
    contacts.py add [options]       # add a contact
    contacts.py delete [options]    # delete a contact
    contacts.py resolve <query>     # resolve name / name:type / email to an address
"""

import argparse
import subprocess
import sys
import uuid
from pathlib import Path

# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------

CONTACTS_ROOT = Path.home() / ".openclaw" / "workspace" / "contacts"
DEFAULT_COLLECTION = "family"  # default address book for new contacts
PRODID = "-//OpenClaw//Contacts//EN"


# ---------------------------------------------------------------------------
# vCard parsing
# ---------------------------------------------------------------------------

def _unfold_lines(text):
    """Return the logical content lines of *text*.

    A physical line that starts with a space or tab is a folded continuation
    of the previous line; it is appended to that line with the leading
    whitespace character removed.
    """
    logical = []
    for raw in text.splitlines():
        if logical and raw[:1] in (" ", "\t"):
            logical[-1] += raw[1:]
        else:
            logical.append(raw)
    return logical


def _parse_vcf(path):
    """Parse a .vcf file into a dict with fn, nickname, emails, uid, path.

    Args:
        path: ``pathlib.Path`` of the vCard file to read.

    Returns:
        ``{"fn": str, "nickname": str, "uid": str, "path": path,
        "emails": [{"address": str, "type": str}, ...]}``. Missing properties
        are empty strings / an empty list. ``type`` is the lower-cased value
        of the last ``TYPE=`` parameter on the EMAIL line, or ``""``.

    Raises:
        OSError: if the file cannot be read.
    """
    text = path.read_text(encoding="utf-8", errors="replace")
    result = {"fn": "", "nickname": "", "emails": [], "uid": "", "path": path}

    for line in _unfold_lines(text):
        head, sep, value = line.partition(":")
        if not sep:
            continue

        # head is "[group.]NAME[;PARAM[=VALUE]]..."; match on the property
        # name itself so that e.g. "NOTE:send EMAIL to: x" is not mistaken
        # for an EMAIL property.
        name, *params = head.upper().split(";")
        name = name.rpartition(".")[2].strip()  # drop a group prefix like "item1."
        value = value.strip()

        if name == "FN":
            result["fn"] = value
        elif name == "NICKNAME":
            result["nickname"] = value
        elif name == "UID":
            result["uid"] = value
        elif name == "EMAIL":
            # Handle EMAIL;TYPE=WORK:addr and EMAIL:addr
            email_type = ""
            for param in params:
                if param.startswith("TYPE="):
                    email_type = param.split("=", 1)[1].lower()
            result["emails"].append({"address": value, "type": email_type})

    return result


def _load_contacts():
    """Load all contacts from all collections under CONTACTS_ROOT.

    Every sub-directory of CONTACTS_ROOT is treated as a collection and every
    ``*.vcf`` file inside it as one contact. Cards with neither an FN nor an
    EMAIL are ignored. Files that cannot be read are skipped with a warning
    on stderr so that one bad file does not block the whole tool.

    Returns:
        A list of contact dicts (see ``_parse_vcf``), ordered by collection
        directory and then by file name. Empty if CONTACTS_ROOT is missing.
    """
    if not CONTACTS_ROOT.is_dir():
        return []

    contacts = []
    for subdir in sorted(CONTACTS_ROOT.iterdir()):
        if not subdir.is_dir():
            continue
        for vcf_path in sorted(subdir.glob("*.vcf")):
            try:
                contact = _parse_vcf(vcf_path)
            except OSError as exc:
                print(
                    f"Warning: skipping unreadable contact file {vcf_path}: {exc}",
                    file=sys.stderr,
                )
                continue
            if contact["fn"] or contact["emails"]:
                contacts.append(contact)
    return contacts


def _build_vcf(fn, emails, nickname="", uid=""):
    """Build a vCard 3.0 string.

    Args:
        fn: Display name written as the FN property.
        emails: Iterable of ``{"address": str, "type": str}`` dicts; a
            non-empty type is upper-cased into a ``TYPE=`` parameter.
        nickname: Optional NICKNAME value; omitted when empty.
        uid: UID to use; a fresh ``<uuid4>@openclaw`` is generated when empty.

    Returns:
        The card text, one property per line, terminated by a newline.
    """
    uid = uid or f"{uuid.uuid4()}@openclaw"
    lines = [
        "BEGIN:VCARD",
        "VERSION:3.0",
        f"PRODID:{PRODID}",
        f"UID:{uid}",
        f"FN:{fn}",
    ]
    if nickname:
        lines.append(f"NICKNAME:{nickname}")
    for e in emails:
        if e.get("type"):
            lines.append(f"EMAIL;TYPE={e['type'].upper()}:{e['address']}")
        else:
            lines.append(f"EMAIL:{e['address']}")
    lines.append("END:VCARD")
    return "\n".join(lines) + "\n"


def _format_contact(c):
    """Format a contact for display.

    Returns one line per email address, ``  Name (nick) [type] <address>``,
    joined with newlines. The nickname is shown only when it differs from the
    name. A contact without any address is rendered as a single
    ``  Name (nick) (no email)`` line so it never shows up as a blank line.
    """
    name = c["fn"]
    nick = f" ({c['nickname']})" if c["nickname"] and c["nickname"] != c["fn"] else ""
    if not c["emails"]:
        return f"  {name}{nick} (no email)"
    lines = []
    for e in c["emails"]:
        label = f" [{e['type']}]" if e["type"] else ""
        lines.append(f"  {name}{nick}{label} <{e['address']}>")
    return "\n".join(lines)


def _print_contact_error(message, contacts):
    """Print an error to stderr followed by the available contacts.

    Only contacts that have at least one email address are listed.
    """
    print(f"Error: {message}", file=sys.stderr)
    if contacts:
        print("Available contacts:", file=sys.stderr)
        for c in contacts:
            if c["emails"]:
                print(_format_contact(c), file=sys.stderr)
    else:
        print("No contacts found. Add contacts with: contacts.sh add", file=sys.stderr)


def _sync_contacts():
    """Sync contacts to CardDAV server via vdirsyncer.

    Best effort: a failing or missing ``vdirsyncer`` only produces a warning.
    """
    try:
        subprocess.run(
            ["vdirsyncer", "sync"],
            capture_output=True,
            text=True,
            check=True,
        )
        print("Synced to CardDAV server")
    except (subprocess.CalledProcessError, FileNotFoundError):
        print("Warning: CardDAV sync failed (will retry on next heartbeat)")


# ---------------------------------------------------------------------------
# Resolve (used by himalaya wrapper and calendar tool)
# ---------------------------------------------------------------------------

def resolve(to_str):
    """Resolve a --to value to an email address via contacts lookup.

    Supported formats:
        "小橘子"            -> match by FN or NICKNAME, use sole email or
                              error if multiple
        "小橘子:work"       -> match by name, select email by TYPE
        "user@example.com" -> match by email address in contacts

    Matching is case-insensitive and ignores surrounding whitespace. An empty
    name never matches (otherwise it would match every contact that has no
    nickname).

    Returns the resolved email address string, or exits with error
    (``sys.exit(1)`` after printing a message to stderr).
    """
    contacts = _load_contacts()
    to_str = to_str.strip()

    # Format: "name:type"
    if ":" in to_str and "@" not in to_str:
        name_part, type_part = to_str.rsplit(":", 1)
        name_part = name_part.strip()
        type_part = type_part.strip().lower()
    else:
        name_part = to_str
        type_part = ""

    # If it looks like a raw email, search contacts for it
    if "@" in name_part:
        wanted_addr = name_part.lower()
        for c in contacts:
            for e in c["emails"]:
                if e["address"].lower() == wanted_addr:
                    return e["address"]
        _print_contact_error(f"Email '{name_part}' not found in contacts.", contacts)
        sys.exit(1)

    # Search by FN or NICKNAME (case-insensitive). An empty query is rejected
    # up front: "" would otherwise equal the empty nickname of most contacts.
    matches = []
    if name_part:
        wanted_name = name_part.lower()
        matches = [
            c for c in contacts
            if wanted_name in (c["fn"].lower(), c["nickname"].lower())
        ]

    if not matches:
        _print_contact_error(f"Contact '{name_part}' not found.", contacts)
        sys.exit(1)

    if len(matches) > 1:
        _print_contact_error(f"Multiple contacts match '{name_part}'.", contacts)
        sys.exit(1)

    contact = matches[0]
    emails = contact["emails"]

    if not emails:
        print(f"Error: Contact '{contact['fn']}' has no email addresses.", file=sys.stderr)
        sys.exit(1)

    # If type specified, filter by type
    if type_part:
        typed = [e for e in emails if e["type"] == type_part]
        if not typed:
            avail = ", ".join(f"{e['type'] or 'default'}={e['address']}" for e in emails)
            print(
                f"Error: No '{type_part}' email for '{contact['fn']}'. Available: {avail}",
                file=sys.stderr,
            )
            sys.exit(1)
        return typed[0]["address"]

    # No type specified
    if len(emails) == 1:
        return emails[0]["address"]

    # Multiple emails, require type qualifier
    avail = ", ".join(f"{e['type'] or 'default'}={e['address']}" for e in emails)
    print(
        f"Error: '{contact['fn']}' has multiple emails. "
        f"Specify type with '{name_part}:<type>'.\n"
        f"  Available: {avail}",
        file=sys.stderr,
    )
    sys.exit(1)


# ---------------------------------------------------------------------------
# Subcommands
# ---------------------------------------------------------------------------

def cmd_list(args):
    """List all contacts."""
    contacts = _load_contacts()
    if not contacts:
        print("No contacts found. Add contacts with: contacts.sh add")
        return
    for c in contacts:
        print(_format_contact(c))


def cmd_add(args):
    """Add a new contact (creates or updates a .vcf file).

    If a contact whose FN equals ``args.name`` (case-insensitive) already
    exists, the new address is appended to it unless it is already present;
    otherwise a new card is written to the default collection. A sync is
    triggered after every write.
    """
    target_dir = CONTACTS_ROOT / DEFAULT_COLLECTION
    target_dir.mkdir(parents=True, exist_ok=True)

    emails = [{"address": args.email, "type": args.type or ""}]
    nickname = args.nickname or ""

    # Check if contact with same name already exists (update it)
    existing = _load_contacts()
    for c in existing:
        if c["fn"].lower() != args.name.lower():
            continue

        # Add email to existing contact if not duplicate
        existing_addrs = {e["address"].lower() for e in c["emails"]}
        if args.email.lower() in existing_addrs:
            print(f"Contact '{args.name}' already has email {args.email}")
            return

        # NOTE(review): the card is rebuilt from the parsed fields only, so
        # any other properties present in the existing file are not carried
        # over by this rewrite.
        emails = c["emails"] + emails
        nickname = nickname or c["nickname"]
        vcf_str = _build_vcf(args.name, emails, nickname, c["uid"])
        c["path"].write_text(vcf_str, encoding="utf-8")
        print(f"Updated contact: {args.name} — added {args.email}")
        _sync_contacts()
        return

    # New contact
    uid = f"{uuid.uuid4()}@openclaw"
    vcf_str = _build_vcf(args.name, emails, nickname, uid)
    dest = target_dir / f"{uid}.vcf"
    dest.write_text(vcf_str, encoding="utf-8")
    print(f"Added contact: {args.name} <{args.email}>")
    _sync_contacts()


def cmd_delete(args):
    """Delete a contact.

    The contact is selected by ``--name`` (FN or NICKNAME, case-insensitive)
    or, when no name is given, by ``--uid`` (substring match on the UID).
    Exactly one contact must match; otherwise nothing is deleted and the
    process exits with status 1.
    """
    if not args.name and not args.uid:
        print("Error: --name or --uid is required", file=sys.stderr)
        sys.exit(1)

    contacts = _load_contacts()
    if not contacts:
        print("No contacts found.", file=sys.stderr)
        sys.exit(1)

    if args.name:
        wanted = args.name.lower()
        matches = [
            c for c in contacts
            if wanted in (c["fn"].lower(), c["nickname"].lower())
        ]
    else:
        matches = [c for c in contacts if args.uid in c["uid"]]

    if not matches:
        target = args.name or args.uid
        _print_contact_error(f"No contact matching '{target}'.", contacts)
        sys.exit(1)

    if len(matches) > 1:
        print("Error: Multiple contacts match:", file=sys.stderr)
        for c in matches:
            print(f"  {c['fn']} (uid: {c['uid']})", file=sys.stderr)
        sys.exit(1)

    contact = matches[0]
    contact["path"].unlink()
    print(f"Deleted contact: {contact['fn']}")
    _sync_contacts()


def cmd_resolve(args):
    """Resolve a name/alias to an email address. Prints the email to stdout."""
    email = resolve(args.query)
    print(email)


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------

def main():
    """Parse the command line and run the selected subcommand."""
    parser = argparse.ArgumentParser(description="Contact management with CardDAV sync")
    subparsers = parser.add_subparsers(dest="command", required=True)

    # list
    subparsers.add_parser("list", help="List all contacts")

    # add
    add_p = subparsers.add_parser("add", help="Add a contact")
    add_p.add_argument("--name", required=True, help="Display name (e.g. 小橘子)")
    add_p.add_argument("--email", required=True, help="Email address")
    add_p.add_argument("--nickname", default="", help="Nickname for lookup")
    add_p.add_argument("--type", default="", help="Email type (work, home)")

    # delete
    del_p = subparsers.add_parser("delete", help="Delete a contact")
    del_p.add_argument("--name", default="", help="Contact name or nickname")
    del_p.add_argument("--uid", default="", help="Contact UID")

    # resolve
    res_p = subparsers.add_parser("resolve", help="Resolve name to email address")
    res_p.add_argument("query", help="Contact name, name:type, or email to resolve")

    args = parser.parse_args()

    handlers = {
        "list": cmd_list,
        "add": cmd_add,
        "delete": cmd_delete,
        "resolve": cmd_resolve,
    }
    handlers[args.command](args)


if __name__ == "__main__":
    main()