Migadu exposes "family" and "business" address books via CardDAV. The contacts script now searches all subdirs under contacts/ and adds new contacts to the "family" collection by default.
350 lines
12 KiB
Python
Executable File
350 lines
12 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Contacts — Manage a local vCard contact list synced via CardDAV.
|
|
|
|
Used by the himalaya wrapper and calendar tool to validate recipient
|
|
addresses before sending. Prevents hallucinated email addresses.
|
|
|
|
Subcommands:
|
|
contacts.py list # list all contacts
|
|
contacts.py add [options] # add a contact
|
|
contacts.py delete [options] # delete a contact
|
|
contacts.py resolve <name|name:type|email> # resolve to email address
|
|
"""
|
|
|
|
import argparse
|
|
import subprocess
|
|
import sys
|
|
import uuid
|
|
from pathlib import Path
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Config
|
|
# ---------------------------------------------------------------------------
|
|
|
|
CONTACTS_ROOT = Path.home() / ".openclaw" / "workspace" / "contacts"
|
|
DEFAULT_COLLECTION = "family" # default address book for new contacts
|
|
PRODID = "-//OpenClaw//Contacts//EN"
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# vCard parsing
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _parse_vcf(path):
|
|
"""Parse a .vcf file into a dict with fn, nickname, emails, uid."""
|
|
text = path.read_text(encoding="utf-8", errors="replace")
|
|
result = {"fn": "", "nickname": "", "emails": [], "uid": "", "path": path}
|
|
for line in text.splitlines():
|
|
line_upper = line.upper()
|
|
if line_upper.startswith("FN:"):
|
|
result["fn"] = line.split(":", 1)[1].strip()
|
|
elif line_upper.startswith("NICKNAME:"):
|
|
result["nickname"] = line.split(":", 1)[1].strip()
|
|
elif line_upper.startswith("UID:"):
|
|
result["uid"] = line.split(":", 1)[1].strip()
|
|
elif "EMAIL" in line_upper and ":" in line:
|
|
# Handle EMAIL;TYPE=WORK:addr and EMAIL:addr
|
|
parts = line.split(":", 1)
|
|
email_addr = parts[1].strip()
|
|
email_type = ""
|
|
param_part = parts[0].upper()
|
|
if "TYPE=" in param_part:
|
|
for param in param_part.split(";"):
|
|
if param.startswith("TYPE="):
|
|
email_type = param.split("=", 1)[1].lower()
|
|
result["emails"].append({"address": email_addr, "type": email_type})
|
|
return result
|
|
|
|
|
|
def _load_contacts():
|
|
"""Load all contacts from all collections under CONTACTS_ROOT."""
|
|
if not CONTACTS_ROOT.is_dir():
|
|
return []
|
|
contacts = []
|
|
for subdir in sorted(CONTACTS_ROOT.iterdir()):
|
|
if not subdir.is_dir():
|
|
continue
|
|
for vcf_path in sorted(subdir.glob("*.vcf")):
|
|
try:
|
|
contact = _parse_vcf(vcf_path)
|
|
if contact["fn"] or contact["emails"]:
|
|
contacts.append(contact)
|
|
except Exception:
|
|
continue
|
|
return contacts
|
|
|
|
|
|
def _build_vcf(fn, emails, nickname="", uid=""):
|
|
"""Build a vCard 3.0 string."""
|
|
uid = uid or f"{uuid.uuid4()}@openclaw"
|
|
lines = [
|
|
"BEGIN:VCARD",
|
|
"VERSION:3.0",
|
|
f"PRODID:{PRODID}",
|
|
f"UID:{uid}",
|
|
f"FN:{fn}",
|
|
]
|
|
if nickname:
|
|
lines.append(f"NICKNAME:{nickname}")
|
|
for e in emails:
|
|
if e.get("type"):
|
|
lines.append(f"EMAIL;TYPE={e['type'].upper()}:{e['address']}")
|
|
else:
|
|
lines.append(f"EMAIL:{e['address']}")
|
|
lines.append("END:VCARD")
|
|
return "\n".join(lines) + "\n"
|
|
|
|
|
|
def _print_contact_error(message, contacts):
|
|
"""Print error with available contacts list."""
|
|
print(f"Error: {message}", file=sys.stderr)
|
|
if contacts:
|
|
print("Available contacts:", file=sys.stderr)
|
|
for c in contacts:
|
|
name = c["fn"]
|
|
nick = f" ({c['nickname']})" if c["nickname"] and c["nickname"] != c["fn"] else ""
|
|
for e in c["emails"]:
|
|
label = f" [{e['type']}]" if e["type"] else ""
|
|
print(f" {name}{nick}{label} <{e['address']}>", file=sys.stderr)
|
|
else:
|
|
print("No contacts found. Add contacts with: contacts.sh add", file=sys.stderr)
|
|
|
|
|
|
def _format_contact(c):
|
|
"""Format a contact for display."""
|
|
name = c["fn"]
|
|
nick = f" ({c['nickname']})" if c["nickname"] and c["nickname"] != c["fn"] else ""
|
|
lines = []
|
|
for e in c["emails"]:
|
|
label = f" [{e['type']}]" if e["type"] else ""
|
|
lines.append(f" {name}{nick}{label} <{e['address']}>")
|
|
return "\n".join(lines)
|
|
|
|
|
|
def _sync_contacts():
|
|
"""Sync contacts to CardDAV server via vdirsyncer."""
|
|
try:
|
|
subprocess.run(
|
|
["vdirsyncer", "sync"],
|
|
capture_output=True, text=True, check=True,
|
|
)
|
|
print("Synced to CardDAV server")
|
|
except (subprocess.CalledProcessError, FileNotFoundError):
|
|
print("Warning: CardDAV sync failed (will retry on next heartbeat)")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Resolve (used by himalaya wrapper and calendar tool)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def resolve(to_str):
|
|
"""Resolve a --to value to an email address via contacts lookup.
|
|
|
|
Supported formats:
|
|
"小橘子" -> match by FN or NICKNAME, use sole email or error if multiple
|
|
"小橘子:work" -> match by name, select email by TYPE
|
|
"user@example.com" -> match by email address in contacts
|
|
|
|
Returns the resolved email address string, or exits with error.
|
|
"""
|
|
contacts = _load_contacts()
|
|
|
|
# Format: "name:type"
|
|
if ":" in to_str and "@" not in to_str:
|
|
name_part, type_part = to_str.rsplit(":", 1)
|
|
type_part = type_part.lower()
|
|
else:
|
|
name_part = to_str
|
|
type_part = ""
|
|
|
|
# If it looks like a raw email, search contacts for it
|
|
if "@" in name_part:
|
|
for c in contacts:
|
|
for e in c["emails"]:
|
|
if e["address"].lower() == name_part.lower():
|
|
return e["address"]
|
|
_print_contact_error(f"Email '{name_part}' not found in contacts.", contacts)
|
|
sys.exit(1)
|
|
|
|
# Search by FN or NICKNAME (case-insensitive)
|
|
matches = []
|
|
for c in contacts:
|
|
if (c["fn"].lower() == name_part.lower() or
|
|
c["nickname"].lower() == name_part.lower()):
|
|
matches.append(c)
|
|
|
|
if not matches:
|
|
_print_contact_error(f"Contact '{name_part}' not found.", contacts)
|
|
sys.exit(1)
|
|
if len(matches) > 1:
|
|
_print_contact_error(f"Multiple contacts match '{name_part}'.", contacts)
|
|
sys.exit(1)
|
|
|
|
contact = matches[0]
|
|
emails = contact["emails"]
|
|
|
|
if not emails:
|
|
print(f"Error: Contact '{contact['fn']}' has no email addresses.", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
# If type specified, filter by type
|
|
if type_part:
|
|
typed = [e for e in emails if e["type"] == type_part]
|
|
if not typed:
|
|
avail = ", ".join(f"{e['type'] or 'default'}={e['address']}" for e in emails)
|
|
print(f"Error: No '{type_part}' email for '{contact['fn']}'. Available: {avail}", file=sys.stderr)
|
|
sys.exit(1)
|
|
return typed[0]["address"]
|
|
|
|
# No type specified
|
|
if len(emails) == 1:
|
|
return emails[0]["address"]
|
|
|
|
# Multiple emails, require type qualifier
|
|
avail = ", ".join(f"{e['type'] or 'default'}={e['address']}" for e in emails)
|
|
print(
|
|
f"Error: '{contact['fn']}' has multiple emails. Specify type with '{name_part}:<type>'.\n"
|
|
f" Available: {avail}",
|
|
file=sys.stderr,
|
|
)
|
|
sys.exit(1)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Subcommands
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def cmd_list(args):
|
|
"""List all contacts."""
|
|
contacts = _load_contacts()
|
|
if not contacts:
|
|
print("No contacts found. Add contacts with: contacts.sh add")
|
|
return
|
|
for c in contacts:
|
|
print(_format_contact(c))
|
|
|
|
|
|
def cmd_add(args):
|
|
"""Add a new contact (creates or updates a .vcf file)."""
|
|
target_dir = CONTACTS_ROOT / DEFAULT_COLLECTION
|
|
target_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
emails = [{"address": args.email, "type": args.type or ""}]
|
|
nickname = args.nickname or ""
|
|
|
|
# Check if contact with same name already exists (update it)
|
|
existing = _load_contacts()
|
|
for c in existing:
|
|
if c["fn"].lower() == args.name.lower():
|
|
# Add email to existing contact if not duplicate
|
|
existing_addrs = {e["address"].lower() for e in c["emails"]}
|
|
if args.email.lower() in existing_addrs:
|
|
print(f"Contact '{args.name}' already has email {args.email}")
|
|
return
|
|
# Re-read and update
|
|
emails = c["emails"] + emails
|
|
nickname = nickname or c["nickname"]
|
|
vcf_str = _build_vcf(args.name, emails, nickname, c["uid"])
|
|
c["path"].write_text(vcf_str, encoding="utf-8")
|
|
print(f"Updated contact: {args.name} — added {args.email}")
|
|
_sync_contacts()
|
|
return
|
|
|
|
# New contact
|
|
uid = f"{uuid.uuid4()}@openclaw"
|
|
vcf_str = _build_vcf(args.name, emails, nickname, uid)
|
|
dest = target_dir / f"{uid}.vcf"
|
|
dest.write_text(vcf_str, encoding="utf-8")
|
|
print(f"Added contact: {args.name} <{args.email}>")
|
|
_sync_contacts()
|
|
|
|
|
|
def cmd_delete(args):
|
|
"""Delete a contact."""
|
|
if not args.name and not args.uid:
|
|
print("Error: --name or --uid is required", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
contacts = _load_contacts()
|
|
if not contacts:
|
|
print("No contacts found.", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
matches = []
|
|
for c in contacts:
|
|
if args.name:
|
|
if (c["fn"].lower() == args.name.lower() or
|
|
c["nickname"].lower() == args.name.lower()):
|
|
matches.append(c)
|
|
elif args.uid:
|
|
if args.uid in c["uid"]:
|
|
matches.append(c)
|
|
|
|
if not matches:
|
|
target = args.name or args.uid
|
|
_print_contact_error(f"No contact matching '{target}'.", contacts)
|
|
sys.exit(1)
|
|
if len(matches) > 1:
|
|
print(f"Error: Multiple contacts match:", file=sys.stderr)
|
|
for c in matches:
|
|
print(f" {c['fn']} (uid: {c['uid']})", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
contact = matches[0]
|
|
contact["path"].unlink()
|
|
print(f"Deleted contact: {contact['fn']}")
|
|
_sync_contacts()
|
|
|
|
|
|
def cmd_resolve(args):
|
|
"""Resolve a name/alias to an email address. Prints the email to stdout."""
|
|
email = resolve(args.query)
|
|
print(email)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="Contact management with CardDAV sync")
|
|
subparsers = parser.add_subparsers(dest="command", required=True)
|
|
|
|
# list
|
|
subparsers.add_parser("list", help="List all contacts")
|
|
|
|
# add
|
|
add_p = subparsers.add_parser("add", help="Add a contact")
|
|
add_p.add_argument("--name", required=True, help="Display name (e.g. 小橘子)")
|
|
add_p.add_argument("--email", required=True, help="Email address")
|
|
add_p.add_argument("--nickname", default="", help="Nickname for lookup")
|
|
add_p.add_argument("--type", default="", help="Email type (work, home)")
|
|
|
|
# delete
|
|
del_p = subparsers.add_parser("delete", help="Delete a contact")
|
|
del_p.add_argument("--name", default="", help="Contact name or nickname")
|
|
del_p.add_argument("--uid", default="", help="Contact UID")
|
|
|
|
# resolve
|
|
res_p = subparsers.add_parser("resolve", help="Resolve name to email address")
|
|
res_p.add_argument("query", help="Contact name, name:type, or email to resolve")
|
|
|
|
args = parser.parse_args()
|
|
|
|
if args.command == "list":
|
|
cmd_list(args)
|
|
elif args.command == "add":
|
|
cmd_add(args)
|
|
elif args.command == "delete":
|
|
cmd_delete(args)
|
|
elif args.command == "resolve":
|
|
cmd_resolve(args)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|