#!/usr/bin/env python3
"""
Calendar — Send/reply to calendar invites and manage VTODO tasks via CalDAV.

Uses the icalendar library for proper RFC 5545 ICS generation and parsing.
Uses himalaya CLI for email delivery. Syncs to local CalDAV via vdirsyncer.

Subcommands:
    python calendar.py send [options]           # create and send an invite
    python calendar.py reply [options]          # accept/decline/tentative
    python calendar.py todo add [options]       # create a VTODO task
    python calendar.py todo list [options]      # list pending tasks
    python calendar.py todo complete [options]  # mark task as done
    python calendar.py todo delete [options]    # remove a task
    python calendar.py todo check               # daily digest for cron
"""

# NOTE(review): the usage text above calls this script calendar.py. If that is
# its real filename it can shadow the stdlib ``calendar`` module for imports
# resolved from the script's directory — confirm, and consider renaming.

import argparse
import json
import shutil
import subprocess
import sys
import tempfile
import uuid
from datetime import date, datetime, timedelta, timezone
from pathlib import Path

from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

from icalendar import Alarm, Calendar, Event, Todo, vCalAddress, vText

# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------

DEFAULT_TIMEZONE = "America/Los_Angeles"
DEFAULT_FROM = "youlu@luyanxin.com"
DEFAULT_OWNER_EMAIL = "mail@luyx.org"  # Always added as attendee
CALENDAR_DIR = Path.home() / ".openclaw" / "workspace" / "calendars" / "home"
TASKS_DIR = Path.home() / ".openclaw" / "workspace" / "calendars" / "tasks"
PRODID = "-//OpenClaw//Calendar//EN"

# RFC 5545 priority mapping
PRIORITY_MAP = {"high": 1, "medium": 5, "low": 9}
PRIORITY_LABELS = {1: "高", 5: "中", 9: "低"}

# Suffix accepted by ``todo add --alarm`` -> timedelta keyword argument.
_ALARM_UNITS = {"d": "days", "h": "hours", "m": "minutes"}

# Characters that must never reach a filename built from an event UID.
_UNSAFE_FILENAME_CHARS = str.maketrans({"/": "_", "\\": "_", "\0": "_"})


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _sync_calendar():
    """Sync local calendar to CalDAV server via vdirsyncer.

    Best effort: a failing or missing ``vdirsyncer`` only prints a warning.
    """
    try:
        subprocess.run(
            ["vdirsyncer", "sync"],
            capture_output=True,
            text=True,
            check=True,
        )
        print("Synced to CalDAV server")
    except (subprocess.CalledProcessError, FileNotFoundError):
        print("Warning: CalDAV sync failed (will retry on next heartbeat)")


def _send_email(email_str, account=None):
    """Send a raw MIME email via himalaya message send (stdin).

    Args:
        email_str: Complete MIME message as a string.
        account: Optional himalaya account name, passed as ``--account``.

    Raises:
        subprocess.CalledProcessError: himalaya exited with a non-zero status.
        FileNotFoundError: the ``himalaya`` executable could not be found.
    """
    cmd = ["himalaya"]
    if account:
        cmd += ["--account", account]
    cmd += ["message", "send"]
    subprocess.run(cmd, input=email_str, text=True, check=True)


def _build_calendar_email(from_addr, to_addr, subject, body, ics_bytes, method="REQUEST"):
    """Build a MIME email with a text/calendar attachment.

    Args:
        from_addr: Value for the From header.
        to_addr: Value for the To header (may be a comma-separated list).
        subject: Value for the Subject header.
        body: Plain-text body.
        ics_bytes: UTF-8 encoded iCalendar data to attach as ``invite.ics``.
        method: iTIP method advertised in the attachment's Content-Type.

    Returns:
        The serialized message as a string.
    """
    msg = MIMEMultipart('mixed')
    msg['From'] = from_addr
    msg['To'] = to_addr
    msg['Subject'] = subject

    msg.attach(MIMEText(body, 'plain', 'utf-8'))

    ics_part = MIMEBase('text', 'calendar', method=method, charset='utf-8')
    ics_part.set_payload(ics_bytes.decode('utf-8'))
    ics_part.add_header('Content-Disposition', 'attachment; filename="invite.ics"')
    msg.attach(ics_part)

    return msg.as_string()


def _strip_method(ics_bytes):
    """Remove METHOD property from ICS for CalDAV storage.

    CalDAV servers reject METHOD (it's an iTIP/email concept, not a storage one).

    Returns:
        The re-serialized calendar as bytes.
    """
    cal = Calendar.from_ical(ics_bytes)
    if "method" in cal:
        del cal["method"]
    return cal.to_ical()


def _parse_iso_datetime(dt_str):
    """Parse ISO 8601 datetime string to a naive datetime object.

    Raises:
        ValueError: *dt_str* matches neither accepted format.
    """
    # Handle both 2026-03-20T14:00:00 and 2026-03-20T14:00
    for fmt in ("%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M"):
        try:
            return datetime.strptime(dt_str, fmt)
        except ValueError:
            continue
    raise ValueError(f"Cannot parse datetime: {dt_str}")


def _parse_date(date_str):
    """Parse YYYY-MM-DD date string; raises ValueError on anything else."""
    return datetime.strptime(date_str, "%Y-%m-%d").date()


def _ics_filename(uid):
    """Return the ``<uid>.ics`` filename with path separators neutralised.

    UIDs can come from invites received by email, so they must not be able to
    point outside the calendar directory.
    """
    return f"{uid.translate(_UNSAFE_FILENAME_CHARS)}.ics"


def _fail(message):
    """Print *message* to stderr and exit with status 1."""
    print(message, file=sys.stderr)
    sys.exit(1)


# ---------------------------------------------------------------------------
# Send invite
# ---------------------------------------------------------------------------


def cmd_send(args):
    """Create and send a calendar invite.

    Builds a VEVENT (METHOD:REQUEST) with a 1-day DISPLAY alarm, emails it to
    all recipients plus the owner, then stores a METHOD-less copy in
    CALENDAR_DIR (when that directory exists) and triggers a sync. With
    ``--dry-run`` the ICS and email are only printed.
    """
    try:
        start = _parse_iso_datetime(args.start)
        end = _parse_iso_datetime(args.end)
    except ValueError as exc:
        _fail(f"Error: {exc}")
    # RFC 5545 requires DTEND to be later than DTSTART.
    if end <= start:
        _fail("Error: --end must be later than --start")

    # Ignore empty entries such as a trailing comma in --to.
    recipients = [addr.strip() for addr in args.to.split(",") if addr.strip()]
    if not recipients:
        _fail("Error: --to must contain at least one address")

    uid = args.uid or f"{uuid.uuid4()}@openclaw"
    organizer_name = args.organizer or args.sender

    # Build ICS
    cal = Calendar()
    cal.add("prodid", PRODID)
    cal.add("version", "2.0")
    cal.add("calscale", "GREGORIAN")
    cal.add("method", "REQUEST")

    event = Event()
    event.add("uid", uid)
    event.add("dtstamp", datetime.now(timezone.utc))
    event.add("dtstart", start, parameters={"TZID": args.timezone})
    event.add("dtend", end, parameters={"TZID": args.timezone})
    event.add("summary", args.summary)
    event.add("status", "CONFIRMED")
    event.add("sequence", 0)

    organizer = vCalAddress(f"mailto:{args.sender}")
    organizer.params["CN"] = vText(organizer_name)
    event.add("organizer", organizer)

    if args.location:
        event.add("location", args.location)
    if args.description:
        event.add("description", args.description)

    # Always include owner as attendee
    all_attendees = list(recipients)
    if DEFAULT_OWNER_EMAIL not in all_attendees:
        all_attendees.append(DEFAULT_OWNER_EMAIL)

    for addr in all_attendees:
        event.add("attendee", f"mailto:{addr}", parameters={
            "ROLE": "REQ-PARTICIPANT",
            "RSVP": "TRUE",
        })

    # 1-day reminder
    alarm = Alarm()
    alarm.add("action", "DISPLAY")
    alarm.add("description", f"Reminder: {args.summary}")
    alarm.add("trigger", timedelta(days=-1))
    event.add_component(alarm)

    cal.add_component(event)
    ics_bytes = cal.to_ical()

    # Build plain text body
    body = f"You're invited to: {args.summary}\n\nWhen: {args.start} - {args.end} ({args.timezone})"
    if args.location:
        body += f"\nWhere: {args.location}"
    if args.description:
        body += f"\n\n{args.description}"

    # Email goes to all attendees (including owner)
    all_to = ", ".join(all_attendees)

    # Build MIME email
    email_str = _build_calendar_email(args.sender, all_to, args.subject, body, ics_bytes, method="REQUEST")

    if args.dry_run:
        print("=== ICS Content ===")
        print(ics_bytes.decode())
        print("=== Email Message ===")
        print(email_str)
        return

    # Send email via himalaya message send (stdin)
    _send_email(email_str, args.account)
    print(f"Calendar invite sent to: {args.to}")

    # Save to local calendar (without METHOD for CalDAV compatibility)
    if CALENDAR_DIR.is_dir():
        dest = CALENDAR_DIR / _ics_filename(uid)
        dest.write_bytes(_strip_method(ics_bytes))
        print(f"Saved to local calendar: {dest}")
        _sync_calendar()


# ---------------------------------------------------------------------------
# Reply to invite
# ---------------------------------------------------------------------------

PARTSTAT_MAP = {
    "accept": "ACCEPTED",
    "accepted": "ACCEPTED",
    "decline": "DECLINED",
    "declined": "DECLINED",
    "tentative": "TENTATIVE",
}

SUBJECT_PREFIX = {
    "ACCEPTED": "Accepted",
    "DECLINED": "Declined",
    "TENTATIVE": "Tentative",
}


def _extract_ics_from_email(envelope_id, folder, account):
    """Download attachments from an email and find the .ics file.

    Attachments are downloaded into a freshly created private temporary
    directory, so files left behind by an earlier run can never be mistaken
    for this email's attachment.

    Returns:
        ``(ics_path, download_dir)``. The caller owns ``download_dir`` and
        must remove it when done.

    Exits with status 1 (after removing the directory) when no ``.ics`` file
    was downloaded or himalaya is not installed.
    """
    download_dir = Path(tempfile.mkdtemp(prefix="openclaw-ics-extract-"))

    cmd = ["himalaya"]
    if account:
        cmd += ["--account", account]
    cmd += ["attachment", "download", "--folder", folder, str(envelope_id), "--dir", str(download_dir)]

    download_error = ""
    try:
        subprocess.run(cmd, capture_output=True, text=True, check=True)
    except subprocess.CalledProcessError as exc:
        # some emails have no attachments; keep the reason for the error
        # message below instead of failing here
        download_error = (exc.stderr or "").strip()
    except FileNotFoundError:
        shutil.rmtree(download_dir, ignore_errors=True)
        _fail("Error: himalaya is not installed or not on PATH")

    # Sorted so the choice is deterministic when several .ics files exist.
    ics_files = sorted(download_dir.glob("*.ics"))
    if not ics_files:
        shutil.rmtree(download_dir, ignore_errors=True)
        print(f"Error: No .ics attachment found in envelope {envelope_id}", file=sys.stderr)
        if download_error:
            print(f"himalaya: {download_error}", file=sys.stderr)
        sys.exit(1)

    return ics_files[0], download_dir


def _send_reply(args, partstat, ics_path):
    """Build and send the iTIP REPLY for the invite stored at *ics_path*.

    On accept/tentative the original invite is also forwarded to the owner
    and saved to CALENDAR_DIR; on decline an existing local copy is removed.
    """
    ics_bytes = ics_path.read_bytes()

    # Parse original ICS
    try:
        original_cal = Calendar.from_ical(ics_bytes)
    except ValueError as exc:
        _fail(f"Error: Cannot parse ICS file: {exc}")

    # Find the first VEVENT
    original_event = next(
        (component for component in original_cal.walk() if component.name == "VEVENT"),
        None,
    )
    if not original_event:
        _fail("Error: No VEVENT found in ICS file")

    # Extract fields from original
    uid = str(original_event.get("uid", ""))
    summary = str(original_event.get("summary", ""))
    organizer = original_event.get("organizer")

    if not organizer:
        _fail("Error: No ORGANIZER found in ICS")

    # Strip the URI scheme regardless of its case ("mailto:", "MAILTO:", ...).
    organizer_email = str(organizer)
    if organizer_email.lower().startswith("mailto:"):
        organizer_email = organizer_email[len("mailto:"):]

    # Build reply calendar
    reply_cal = Calendar()
    reply_cal.add("prodid", PRODID)
    reply_cal.add("version", "2.0")
    reply_cal.add("calscale", "GREGORIAN")
    reply_cal.add("method", "REPLY")

    reply_event = Event()
    reply_event.add("uid", uid)
    reply_event.add("dtstamp", datetime.now(timezone.utc))

    # Copy timing from original
    if original_event.get("dtstart"):
        reply_event["dtstart"] = original_event["dtstart"]
    if original_event.get("dtend"):
        reply_event["dtend"] = original_event["dtend"]

    reply_event.add("summary", summary)
    reply_event["organizer"] = original_event["organizer"]
    reply_event.add("attendee", f"mailto:{args.sender}", parameters={
        "PARTSTAT": partstat,
        "RSVP": "FALSE",
    })

    # A missing or zero SEQUENCE is left out of the reply.
    if original_event.get("sequence"):
        reply_event.add("sequence", original_event.get("sequence"))

    reply_cal.add_component(reply_event)
    reply_ics_bytes = reply_cal.to_ical()

    # Build email
    prefix = SUBJECT_PREFIX[partstat]
    subject = f"{prefix}: {summary}"

    body = f"{prefix}: {summary}"
    if args.comment:
        body += f"\n\n{args.comment}"

    email_str = _build_calendar_email(args.sender, organizer_email, subject, body, reply_ics_bytes, method="REPLY")

    if args.dry_run:
        print("=== Original Event ===")
        print(f"Summary: {summary}")
        print(f"Organizer: {organizer_email}")
        print(f"Action: {partstat}")
        print()
        print("=== Reply ICS ===")
        print(reply_ics_bytes.decode())
        print("=== Email Message ===")
        print(email_str)
        return

    # Send reply
    _send_email(email_str, args.account)
    print(f"Calendar invite {partstat.lower()}: {summary} (replied to {organizer_email})")

    keeps_event = partstat in ("ACCEPTED", "TENTATIVE")

    # Forward invite to owner on accept/tentative (best effort)
    if keeps_event:
        fwd_body = f"{prefix}: {summary}"
        fwd_email = _build_calendar_email(
            args.sender, DEFAULT_OWNER_EMAIL,
            f"{prefix}: {summary}", fwd_body,
            ics_bytes, method="REQUEST",
        )
        try:
            _send_email(fwd_email, args.account)
            print(f"Forwarded invite to {DEFAULT_OWNER_EMAIL}")
        except (subprocess.CalledProcessError, FileNotFoundError):
            print(f"Warning: Failed to forward invite to {DEFAULT_OWNER_EMAIL}")

    # Save to / remove from local calendar
    if CALENDAR_DIR.is_dir():
        dest = CALENDAR_DIR / _ics_filename(uid)
        if keeps_event:
            # Save the original event to local calendar (without METHOD for CalDAV)
            dest.write_bytes(_strip_method(ics_bytes))
            print(f"Saved to local calendar: {dest}")
        elif partstat == "DECLINED" and dest.is_file():
            dest.unlink()
            print("Removed from local calendar")
        _sync_calendar()


def cmd_reply(args):
    """Accept, decline, or tentatively accept a calendar invite.

    The invite is taken from ``--envelope-id`` (attachment downloaded via
    himalaya) or from ``--ics-file``. Any temporary download directory is
    removed on every exit path, including errors and ``--dry-run``.
    """
    partstat = PARTSTAT_MAP.get(args.action.lower())
    if not partstat:
        _fail("Error: --action must be accept, decline, or tentative")

    # Get the ICS file
    cleanup_dir = None
    if args.envelope_id:
        ics_path, cleanup_dir = _extract_ics_from_email(args.envelope_id, args.folder, args.account)
    elif args.ics_file:
        ics_path = Path(args.ics_file)
        if not ics_path.is_file():
            _fail(f"Error: ICS file not found: {ics_path}")
    else:
        _fail("Error: --envelope-id or --ics-file is required")

    try:
        _send_reply(args, partstat, ics_path)
    finally:
        # Runs for normal return, sys.exit() and unexpected exceptions alike.
        if cleanup_dir:
            shutil.rmtree(cleanup_dir, ignore_errors=True)


# ---------------------------------------------------------------------------
# VTODO: todoman helpers
# ---------------------------------------------------------------------------


def _run_todoman(*todo_args):
    """Run a todoman command and return its stdout.

    Exits with status 1 when the ``todo`` executable is missing or returns a
    non-zero status.
    """
    cmd = ["todo", *todo_args]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except FileNotFoundError:
        _fail("Error: todoman ('todo') is not installed or not on PATH")
    if result.returncode != 0:
        _fail(f"Error: todoman failed: {result.stderr.strip()}")
    return result.stdout


def _todoman_list_json(*extra_args):
    """Get todos as JSON from todoman --porcelain.

    Returns:
        The decoded JSON output, or ``[]`` when todoman printed nothing.
    """
    output = _run_todoman("--porcelain", "list", *extra_args)
    if not output.strip():
        return []
    return json.loads(output)


def _days_until(due_val):
    """Days from today until due date (string or Unix timestamp).

    Negative means overdue. Returns None when *due_val* is empty or cannot
    be interpreted.
    """
    if not due_val:
        return None

    # Unix timestamp (int/float) from todoman --porcelain
    if isinstance(due_val, (int, float)):
        try:
            # Convert in local time: date.today() below is local too, and
            # mixing a UTC date with it is off by one day near midnight.
            due_date = datetime.fromtimestamp(due_val).date()
        except (ValueError, TypeError, OverflowError, OSError):
            return None
        return (due_date - date.today()).days

    # String formats (ISO or YYYY-MM-DD)
    try:
        due_date = datetime.fromisoformat(due_val).date()
    except (ValueError, TypeError):
        try:
            due_date = _parse_date(due_val)
        except (ValueError, TypeError):
            return None
    return (due_date - date.today()).days


def _urgency_label(days):
    """Urgency label with emoji, matching reminder_check.py style."""
    if days is None:
        return "❓ 日期未知"
    if days < 0:
        return f"🔴 逾期 {-days} 天"
    if days == 0:
        return "🔴 今天"
    if days == 1:
        return "🟡 明天"
    if days <= 3:
        return f"🟡 {days} 天后"
    return f"🟢 {days} 天后"


def _find_todo_id(match_str):
    """Find a todoman ID by matching summary text. Exits on 0 or >1 matches."""
    todos = _todoman_list_json("--sort", "due")
    matches = [t for t in todos if match_str in (t.get("summary") or "")]
    if not matches:
        _fail(f"Error: No todo matching '{match_str}'")
    if len(matches) > 1:
        print(f"Error: Multiple todos match '{match_str}':", file=sys.stderr)
        for t in matches:
            print(f" - {t.get('summary')} (id: {t.get('id')})", file=sys.stderr)
        sys.exit(1)
    return matches[0]["id"]


def _resolve_todo_id(args):
    """Resolve ``--uid`` / ``--match`` to a todoman numeric ID.

    todoman commands take numeric IDs, so a UID is looked up in the porcelain
    listing. An exact UID match wins; otherwise a substring match is accepted
    only if it is unique, so an ambiguous fragment can never complete or
    delete the wrong task. Exits with status 1 on no match, on an ambiguous
    match, or when neither option was given.
    """
    if args.uid:
        todos = _todoman_list_json("--sort", "due")
        exact = [t for t in todos if (t.get("uid") or "") == args.uid]
        candidates = exact or [t for t in todos if args.uid in (t.get("uid") or "")]
        if not candidates:
            _fail(f"Error: No todo with UID '{args.uid}'")
        if len(candidates) > 1:
            print(f"Error: Multiple todos match UID '{args.uid}':", file=sys.stderr)
            for t in candidates:
                print(f" - {t.get('summary')} (uid: {t.get('uid')})", file=sys.stderr)
            sys.exit(1)
        return candidates[0]["id"]
    if args.match:
        return _find_todo_id(args.match)
    _fail("Error: --uid or --match is required")


def _parse_alarm_trigger(alarm_str):
    """Convert an ``--alarm`` value such as 1d, 2h or 30m to a VALARM trigger.

    Returns:
        A negative timedelta (time before the due date). An empty value or an
        unknown unit falls back to one day before; the latter prints a warning.

    Exits with status 1 when the amount is not an integer.
    """
    default_trigger = timedelta(days=-1)  # default: 1 day before
    if not alarm_str:
        return default_trigger

    unit = _ALARM_UNITS.get(alarm_str[-1])
    if unit is None:
        print(
            f"Warning: unrecognized --alarm value '{alarm_str}', using 1d",
            file=sys.stderr,
        )
        return default_trigger

    try:
        amount = int(alarm_str[:-1])
    except ValueError:
        _fail(f"Error: invalid --alarm value '{alarm_str}' (expected e.g. 1d, 2h, 30m)")
    return timedelta(**{unit: -amount})


# ---------------------------------------------------------------------------
# VTODO: subcommands
# ---------------------------------------------------------------------------


def cmd_todo_add(args):
    """Create a VTODO and save to TASKS_DIR.

    The task is written to TASKS_DIR, synced, and emailed to the owner
    (emailing is best effort). With ``--dry-run`` the ICS and email are only
    printed.
    """
    TASKS_DIR.mkdir(parents=True, exist_ok=True)

    uid = f"{uuid.uuid4()}@openclaw"
    now = datetime.now(timezone.utc)

    # Parse due date (default: tomorrow)
    if args.due:
        try:
            due_date = _parse_date(args.due)
        except ValueError:
            _fail(f"Error: invalid --due date '{args.due}' (expected YYYY-MM-DD)")
    else:
        due_date = date.today() + timedelta(days=1)

    # Parse priority
    priority = PRIORITY_MAP.get(args.priority, 5)

    # Parse alarm trigger
    alarm_trigger = _parse_alarm_trigger(args.alarm)

    # Build VTODO calendar
    cal = Calendar()
    cal.add("prodid", PRODID)
    cal.add("version", "2.0")
    cal.add("method", "REQUEST")

    todo = Todo()
    todo.add("uid", uid)
    todo.add("dtstamp", now)
    todo.add("created", now)
    todo.add("summary", args.summary)
    todo.add("due", due_date)
    todo.add("priority", priority)
    todo.add("status", "NEEDS-ACTION")

    if args.description:
        todo.add("description", args.description)

    # VALARM reminder
    alarm = Alarm()
    alarm.add("action", "DISPLAY")
    alarm.add("description", f"Todo: {args.summary}")
    alarm.add("trigger", alarm_trigger)
    todo.add_component(alarm)

    cal.add_component(todo)
    ics_bytes = cal.to_ical()

    # Build email body
    prio_label = PRIORITY_LABELS.get(priority, "中")
    body = f"待办事项: {args.summary}\n截止日期: {due_date}\n优先级: {prio_label}"
    if args.description:
        body += f"\n\n{args.description}"

    # Build MIME email
    email_str = _build_calendar_email(
        DEFAULT_FROM, DEFAULT_OWNER_EMAIL,
        f"📋 待办: {args.summary}", body,
        ics_bytes, method="REQUEST",
    )

    if args.dry_run:
        print("=== ICS Content ===")
        print(ics_bytes.decode())
        print("=== Email Message ===")
        print(email_str)
        return

    # Save to TASKS_DIR (without METHOD for CalDAV)
    dest = TASKS_DIR / f"{uid}.ics"
    dest.write_bytes(_strip_method(ics_bytes))
    print(f"Todo created: {args.summary} (due: {due_date}, priority: {prio_label})")
    print(f"Saved to: {dest}")

    # Sync
    _sync_calendar()

    # Email the VTODO to owner; the task is already saved, so a mail failure
    # (including a missing himalaya binary) is only a warning.
    try:
        _send_email(email_str)
        print(f"Emailed todo to {DEFAULT_OWNER_EMAIL}")
    except (subprocess.CalledProcessError, FileNotFoundError):
        print(f"Warning: Failed to email todo to {DEFAULT_OWNER_EMAIL}")


def _format_todo_digest(todos):
    """Format todos into the Chinese priority-grouped digest.

    Priorities 1-3 are shown as high, 4-7 as medium, and everything else
    (including a missing or zero priority) as low.

    Returns string or None (when *todos* is empty).
    """
    if not todos:
        return None

    today_str = date.today().isoformat()
    lines = [f"📋 待办事项 ({today_str})", "=" * 50]

    # Group by priority
    groups = {1: [], 5: [], 9: []}
    for t in todos:
        prio = t.get("priority") or 0
        if 1 <= prio <= 3:
            groups[1].append(t)
        elif 4 <= prio <= 7:
            groups[5].append(t)
        else:
            groups[9].append(t)

    for prio, emoji, label in [(1, "🔴", "高优先级"), (5, "🟡", "中优先级"), (9, "🟢", "低优先级")]:
        items = groups[prio]
        if not items:
            continue
        lines.append(f"\n{emoji} {label}:")
        for t in items:
            summary = t.get("summary") or ""
            desc = t.get("description") or ""

            if t.get("is_completed", False):
                line = f" • ✅ {summary} (已完成)"
            else:
                urgency = _urgency_label(_days_until(t.get("due")))
                line = f" • {summary} ({urgency})"
            if desc:
                line += f" | {desc}"
            lines.append(line)

    lines.append("\n" + "=" * 50)
    return "\n".join(lines)


def cmd_todo_list(args):
    """List todos via todoman, optionally including completed ones."""
    extra = ["--sort", "priority"]
    if args.all:
        extra += ["--status", "ANY"]
    todos = _todoman_list_json(*extra)

    if not todos:
        print("No pending todos." if not args.all else "No todos found.")
        return

    output = _format_todo_digest(todos)
    if output:
        print(output)


def cmd_todo_complete(args):
    """Mark a todo as done via todoman, then sync."""
    todo_id = _resolve_todo_id(args)
    _run_todoman("done", str(todo_id))
    print(f"Completed todo #{todo_id}")
    _sync_calendar()


def cmd_todo_delete(args):
    """Delete a todo via todoman, then sync."""
    todo_id = _resolve_todo_id(args)
    _run_todoman("delete", "--yes", str(todo_id))
    print(f"Deleted todo #{todo_id}")
    _sync_calendar()


def cmd_todo_check(args):
    """Daily digest of pending todos (for cron). Silent when empty."""
    todos = _todoman_list_json("--sort", "priority")

    if not todos:
        return  # silent exit

    output = _format_todo_digest(todos)
    if output:
        print(output)


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------


def main():
    """Parse the command line and dispatch to the matching subcommand."""
    parser = argparse.ArgumentParser(description="Calendar and todo tool")
    subparsers = parser.add_subparsers(dest="command", required=True)

    # --- send ---
    send_p = subparsers.add_parser("send", help="Send a calendar invite")
    send_p.add_argument("--from", dest="sender", default=DEFAULT_FROM, help="Sender email")
    send_p.add_argument("--to", required=True, help="Recipient(s), comma-separated")
    send_p.add_argument("--subject", required=True, help="Email subject")
    send_p.add_argument("--summary", required=True, help="Event title")
    send_p.add_argument("--start", required=True, help="Start time (ISO 8601)")
    send_p.add_argument("--end", required=True, help="End time (ISO 8601)")
    send_p.add_argument("--timezone", default=DEFAULT_TIMEZONE, help="IANA timezone")
    send_p.add_argument("--location", default="", help="Event location")
    send_p.add_argument("--description", default="", help="Event description")
    send_p.add_argument("--organizer", default="", help="Organizer display name")
    send_p.add_argument("--uid", default="", help="Custom event UID")
    send_p.add_argument("--account", default="", help="Himalaya account")
    send_p.add_argument("--dry-run", action="store_true", help="Preview without sending")

    # --- reply ---
    reply_p = subparsers.add_parser("reply", help="Reply to a calendar invite")
    reply_p.add_argument("--from", dest="sender", default=DEFAULT_FROM, help="Your email")
    reply_p.add_argument("--action", required=True, help="accept, decline, or tentative")
    reply_p.add_argument("--envelope-id", default="", help="Himalaya envelope ID")
    reply_p.add_argument("--ics-file", default="", help="Path to .ics file")
    reply_p.add_argument("--account", default="", help="Himalaya account")
    reply_p.add_argument("--folder", default="INBOX", help="Himalaya folder")
    reply_p.add_argument("--comment", default="", help="Message to include in reply")
    reply_p.add_argument("--dry-run", action="store_true", help="Preview without sending")

    # --- todo ---
    todo_p = subparsers.add_parser("todo", help="Manage VTODO tasks")
    todo_sub = todo_p.add_subparsers(dest="todo_command", required=True)

    # todo add
    add_p = todo_sub.add_parser("add", help="Create a new todo")
    add_p.add_argument("--summary", required=True, help="Todo title")
    add_p.add_argument("--due", default="", help="Due date (YYYY-MM-DD, default: tomorrow)")
    add_p.add_argument("--priority", default="medium", choices=["high", "medium", "low"], help="Priority")
    add_p.add_argument("--description", default="", help="Notes / description")
    add_p.add_argument("--alarm", default="1d", help="Reminder trigger (e.g. 1d, 2h, 30m)")
    add_p.add_argument("--dry-run", action="store_true", help="Preview without saving")

    # todo list
    list_p = todo_sub.add_parser("list", help="List todos")
    list_p.add_argument("--all", action="store_true", help="Include completed todos")

    # todo complete
    comp_p = todo_sub.add_parser("complete", help="Mark a todo as done")
    comp_p.add_argument("--uid", default="", help="Todo UID")
    comp_p.add_argument("--match", default="", help="Fuzzy match on summary")

    # todo delete
    del_p = todo_sub.add_parser("delete", help="Delete a todo")
    del_p.add_argument("--uid", default="", help="Todo UID")
    del_p.add_argument("--match", default="", help="Fuzzy match on summary")

    # todo check
    todo_sub.add_parser("check", help="Daily digest (for cron)")

    args = parser.parse_args()

    handlers = {
        "send": cmd_send,
        "reply": cmd_reply,
    }
    todo_handlers = {
        "add": cmd_todo_add,
        "list": cmd_todo_list,
        "complete": cmd_todo_complete,
        "delete": cmd_todo_delete,
        "check": cmd_todo_check,
    }

    if args.command == "todo":
        handler = todo_handlers[args.todo_command]
    else:
        handler = handlers[args.command]
    handler(args)


if __name__ == "__main__":
    main()