contacts: extract into standalone skill, add himalaya wrapper for all email sends

Refactors the contacts system from being embedded in cal_tool.py into a
standalone contacts skill that serves as the single source of truth for
recipient validation across all outbound email paths.

- New skills/contacts/ skill: list, add, delete, resolve commands
- New skills/himalaya/scripts/himalaya.sh wrapper: validates To/Cc/Bcc
  recipients against contacts for message send, template send, and
  message write commands; passes everything else through unchanged
- cal_tool.py now delegates to contacts.py resolve instead of inline logic
- TOOLS.md updated: agent should use himalaya wrapper, not raw himalaya
This commit is contained in:
Yanxin Lu
2026-03-31 11:12:08 -07:00
parent cd1ee050ed
commit f05a84d8ca
10 changed files with 703 additions and 314 deletions

View File

@@ -10,9 +10,6 @@ Subcommands:
python calendar.py reply [options] # accept/decline/tentative
python calendar.py event list [options] # list/search calendar events
python calendar.py event delete [options] # delete an event by UID or summary
python calendar.py contact list # list all contacts
python calendar.py contact add [options] # add a contact
python calendar.py contact delete [options] # delete a contact
python calendar.py todo add [options] # create a VTODO task
python calendar.py todo list [options] # list pending tasks
python calendar.py todo edit [options] # edit a task's fields
@@ -42,8 +39,8 @@ from icalendar import Alarm, Calendar, Event, Todo, vCalAddress, vRecur, vText
DEFAULT_TIMEZONE = "America/Los_Angeles"
DEFAULT_FROM = "youlu@luyanxin.com"
CALENDAR_DIR = Path.home() / ".openclaw" / "workspace" / "calendars" / "home"
CONTACTS_DIR = Path.home() / ".openclaw" / "workspace" / "contacts" / "default"
TASKS_DIR = Path.home() / ".openclaw" / "workspace" / "calendars" / "tasks"
CONTACTS_SCRIPT = Path(__file__).resolve().parent.parent.parent / "contacts" / "scripts" / "contacts.py"
PRODID = "-//OpenClaw//Calendar//EN"
# RFC 5545 priority mapping
@@ -166,244 +163,25 @@ def _validate_rrule_dtstart(rrule_dict, dtstart):
# ---------------------------------------------------------------------------
# Contacts (vCard / CardDAV)
# Contacts (delegates to contacts skill)
# ---------------------------------------------------------------------------
def _parse_vcf(path):
"""Parse a .vcf file into a dict with fn, nickname, emails, uid."""
text = path.read_text(encoding="utf-8", errors="replace")
result = {"fn": "", "nickname": "", "emails": [], "uid": "", "path": path}
for line in text.splitlines():
line_upper = line.upper()
if line_upper.startswith("FN:"):
result["fn"] = line.split(":", 1)[1].strip()
elif line_upper.startswith("NICKNAME:"):
result["nickname"] = line.split(":", 1)[1].strip()
elif line_upper.startswith("UID:"):
result["uid"] = line.split(":", 1)[1].strip()
elif "EMAIL" in line_upper and ":" in line:
# Handle EMAIL;TYPE=WORK:addr and EMAIL:addr
parts = line.split(":", 1)
email_addr = parts[1].strip()
email_type = ""
param_part = parts[0].upper()
if "TYPE=" in param_part:
for param in param_part.split(";"):
if param.startswith("TYPE="):
email_type = param.split("=", 1)[1].lower()
result["emails"].append({"address": email_addr, "type": email_type})
return result
def _load_contacts():
"""Load all contacts from CONTACTS_DIR. Returns list of parsed contact dicts."""
if not CONTACTS_DIR.is_dir():
return []
contacts = []
for vcf_path in sorted(CONTACTS_DIR.glob("*.vcf")):
try:
contact = _parse_vcf(vcf_path)
if contact["fn"] or contact["emails"]:
contacts.append(contact)
except Exception:
continue
return contacts
def _resolve_recipient(to_str):
"""Resolve a --to value to an email address via contacts lookup.
"""Resolve a --to value to an email address via the contacts skill.
Supported formats:
--to "小橘子" → match by FN or NICKNAME, use sole email or error if multiple
--to "小橘子:work" → match by name, select email by TYPE
--to "user@example.com" → match by email address in contacts
Returns the resolved email address string, or exits with error + available contacts.
Delegates to contacts.py resolve, which validates against the contact list.
Exits with error if the address is not in contacts.
"""
contacts = _load_contacts()
# Format: "name:type"
if ":" in to_str and "@" not in to_str:
name_part, type_part = to_str.rsplit(":", 1)
type_part = type_part.lower()
else:
name_part = to_str
type_part = ""
# If it looks like a raw email, search contacts for it
if "@" in name_part:
for c in contacts:
for e in c["emails"]:
if e["address"].lower() == name_part.lower():
return e["address"]
_print_contact_error(f"Email '{name_part}' not found in contacts.", contacts)
sys.exit(1)
# Search by FN or NICKNAME (case-insensitive)
matches = []
for c in contacts:
if (c["fn"].lower() == name_part.lower() or
c["nickname"].lower() == name_part.lower()):
matches.append(c)
if not matches:
_print_contact_error(f"Contact '{name_part}' not found.", contacts)
sys.exit(1)
if len(matches) > 1:
_print_contact_error(f"Multiple contacts match '{name_part}'.", contacts)
sys.exit(1)
contact = matches[0]
emails = contact["emails"]
if not emails:
print(f"Error: Contact '{contact['fn']}' has no email addresses.", file=sys.stderr)
sys.exit(1)
# If type specified, filter by type
if type_part:
typed = [e for e in emails if e["type"] == type_part]
if not typed:
avail = ", ".join(f"{e['type'] or 'default'}={e['address']}" for e in emails)
print(f"Error: No '{type_part}' email for '{contact['fn']}'. Available: {avail}", file=sys.stderr)
sys.exit(1)
return typed[0]["address"]
# No type specified
if len(emails) == 1:
return emails[0]["address"]
# Multiple emails, require type qualifier
avail = ", ".join(f"{e['type'] or 'default'}={e['address']}" for e in emails)
print(
f"Error: '{contact['fn']}' has multiple emails. Specify type with '{name_part}:<type>'.\n"
f" Available: {avail}",
file=sys.stderr,
result = subprocess.run(
[sys.executable, str(CONTACTS_SCRIPT), "resolve", to_str],
capture_output=True, text=True,
)
sys.exit(1)
def _print_contact_error(message, contacts):
"""Print error with available contacts list."""
print(f"Error: {message}", file=sys.stderr)
if contacts:
print("Available contacts:", file=sys.stderr)
for c in contacts:
name = c["fn"]
nick = f" ({c['nickname']})" if c["nickname"] and c["nickname"] != c["fn"] else ""
for e in c["emails"]:
label = f" [{e['type']}]" if e["type"] else ""
print(f" {name}{nick}{label} <{e['address']}>", file=sys.stderr)
else:
print("No contacts found. Add contacts with: calendar.sh contact add", file=sys.stderr)
def _build_vcf(fn, emails, nickname="", uid=""):
"""Build a vCard 3.0 string."""
uid = uid or f"{uuid.uuid4()}@openclaw"
lines = [
"BEGIN:VCARD",
"VERSION:3.0",
f"PRODID:{PRODID}",
f"UID:{uid}",
f"FN:{fn}",
]
if nickname:
lines.append(f"NICKNAME:{nickname}")
for e in emails:
if e.get("type"):
lines.append(f"EMAIL;TYPE={e['type'].upper()}:{e['address']}")
else:
lines.append(f"EMAIL:{e['address']}")
lines.append("END:VCARD")
return "\n".join(lines) + "\n"
# ---------------------------------------------------------------------------
# Contact subcommands
# ---------------------------------------------------------------------------
def cmd_contact_list(args):
"""List all contacts."""
contacts = _load_contacts()
if not contacts:
print("No contacts found. Add contacts with: calendar.sh contact add")
return
for c in contacts:
name = c["fn"]
nick = f" ({c['nickname']})" if c["nickname"] and c["nickname"] != c["fn"] else ""
for e in c["emails"]:
label = f" [{e['type']}]" if e["type"] else ""
print(f" {name}{nick}{label} <{e['address']}>")
def cmd_contact_add(args):
"""Add a new contact (creates or updates a .vcf file)."""
CONTACTS_DIR.mkdir(parents=True, exist_ok=True)
emails = [{"address": args.email, "type": args.type or ""}]
nickname = args.nickname or ""
# Check if contact with same name already exists (update it)
existing = _load_contacts()
for c in existing:
if c["fn"].lower() == args.name.lower():
# Add email to existing contact if not duplicate
existing_addrs = {e["address"].lower() for e in c["emails"]}
if args.email.lower() in existing_addrs:
print(f"Contact '{args.name}' already has email {args.email}")
return
# Re-read and update
emails = c["emails"] + emails
nickname = nickname or c["nickname"]
vcf_str = _build_vcf(args.name, emails, nickname, c["uid"])
c["path"].write_text(vcf_str, encoding="utf-8")
print(f"Updated contact: {args.name} — added {args.email}")
_sync_calendar()
return
# New contact
uid = f"{uuid.uuid4()}@openclaw"
vcf_str = _build_vcf(args.name, emails, nickname, uid)
dest = CONTACTS_DIR / f"{uid}.vcf"
dest.write_text(vcf_str, encoding="utf-8")
print(f"Added contact: {args.name} <{args.email}>")
_sync_calendar()
def cmd_contact_delete(args):
"""Delete a contact."""
contacts = _load_contacts()
if not contacts:
print("No contacts found.", file=sys.stderr)
if result.returncode != 0:
# Print the contacts script's error output directly
print(result.stderr, end="", file=sys.stderr)
sys.exit(1)
matches = []
for c in contacts:
if args.name:
if (c["fn"].lower() == args.name.lower() or
c["nickname"].lower() == args.name.lower()):
matches.append(c)
elif args.uid:
if args.uid in c["uid"]:
matches.append(c)
if not matches:
target = args.name or args.uid
_print_contact_error(f"No contact matching '{target}'.", contacts)
sys.exit(1)
if len(matches) > 1:
print(f"Error: Multiple contacts match:", file=sys.stderr)
for c in matches:
print(f" {c['fn']} (uid: {c['uid']})", file=sys.stderr)
sys.exit(1)
contact = matches[0]
contact["path"].unlink()
print(f"Deleted contact: {contact['fn']}")
_sync_calendar()
return result.stdout.strip()
# ---------------------------------------------------------------------------
@@ -1147,25 +925,6 @@ def main():
edel_p.add_argument("--date", default="", help="Cancel single occurrence on this date (YYYY-MM-DD, for recurring events)")
edel_p.add_argument("--all", action="store_true", help="Delete entire recurring series (safety flag)")
# --- contact ---
contact_p = subparsers.add_parser("contact", help="Manage contacts")
contact_sub = contact_p.add_subparsers(dest="contact_command", required=True)
# contact list
contact_sub.add_parser("list", help="List all contacts")
# contact add
cadd_p = contact_sub.add_parser("add", help="Add a contact")
cadd_p.add_argument("--name", required=True, help="Display name (e.g. 小橘子)")
cadd_p.add_argument("--email", required=True, help="Email address")
cadd_p.add_argument("--nickname", default="", help="Nickname for lookup")
cadd_p.add_argument("--type", default="", help="Email type (work, home)")
# contact delete
cdel_p = contact_sub.add_parser("delete", help="Delete a contact")
cdel_p.add_argument("--name", default="", help="Contact name or nickname")
cdel_p.add_argument("--uid", default="", help="Contact UID")
# --- todo ---
todo_p = subparsers.add_parser("todo", help="Manage VTODO tasks")
todo_sub = todo_p.add_subparsers(dest="todo_command", required=True)
@@ -1214,13 +973,6 @@ def main():
cmd_event_list(args)
elif args.event_command == "delete":
cmd_event_delete(args)
elif args.command == "contact":
if args.contact_command == "list":
cmd_contact_list(args)
elif args.contact_command == "add":
cmd_contact_add(args)
elif args.contact_command == "delete":
cmd_contact_delete(args)
elif args.command == "todo":
if args.todo_command == "add":
cmd_todo_add(args)