calendar: use todoman for VTODO list/complete/delete/check
Replaces custom .ics parsing with todoman CLI (--porcelain for JSON). todo add still uses icalendar directly (needs ICS creation + email). Updates MIGRATION.md with todoman install/config instructions.
This commit is contained in:
@@ -386,50 +386,44 @@ def cmd_reply(args):
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# VTODO: helpers
|
||||
# VTODO: todoman helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _load_todos():
|
||||
"""Load all VTODO items from TASKS_DIR. Returns list of (path, vtodo) tuples."""
|
||||
if not TASKS_DIR.is_dir():
|
||||
def _run_todoman(*todo_args):
|
||||
"""Run a todoman command and return its stdout."""
|
||||
cmd = ["todo"] + list(todo_args)
|
||||
result = subprocess.run(cmd, capture_output=True, text=True)
|
||||
if result.returncode != 0:
|
||||
print(f"Error: todoman failed: {result.stderr.strip()}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
return result.stdout
|
||||
|
||||
|
||||
def _todoman_list_json(*extra_args):
|
||||
"""Get todos as JSON from todoman --porcelain."""
|
||||
cmd = ["todo", "--porcelain", "list"] + list(extra_args)
|
||||
result = subprocess.run(cmd, capture_output=True, text=True)
|
||||
if result.returncode != 0:
|
||||
print(f"Error: todoman failed: {result.stderr.strip()}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
if not result.stdout.strip():
|
||||
return []
|
||||
todos = []
|
||||
for ics_path in TASKS_DIR.glob("*.ics"):
|
||||
import json
|
||||
return json.loads(result.stdout)
|
||||
|
||||
|
||||
def _days_until(due_str):
|
||||
"""Days from today until due date string. Negative means overdue."""
|
||||
if not due_str:
|
||||
return None
|
||||
try:
|
||||
due_date = datetime.fromisoformat(due_str).date()
|
||||
except (ValueError, TypeError):
|
||||
try:
|
||||
cal = Calendar.from_ical(ics_path.read_bytes())
|
||||
for component in cal.walk():
|
||||
if component.name == "VTODO":
|
||||
todos.append((ics_path, component))
|
||||
break
|
||||
except Exception:
|
||||
continue
|
||||
return todos
|
||||
|
||||
|
||||
def _get_due_date(vtodo):
|
||||
"""Extract due date from a VTODO as a date object, or None."""
|
||||
due = vtodo.get("due")
|
||||
if due is None:
|
||||
return None
|
||||
dt = due.dt
|
||||
if isinstance(dt, datetime):
|
||||
return dt.date()
|
||||
return dt
|
||||
|
||||
|
||||
def _get_priority_int(vtodo):
|
||||
"""Get priority as int (1=high, 5=medium, 9=low). Default 5."""
|
||||
p = vtodo.get("priority")
|
||||
if p is None:
|
||||
return 5
|
||||
return int(p)
|
||||
|
||||
|
||||
def _days_until(due_date):
|
||||
"""Days from today until due_date. Negative means overdue."""
|
||||
if due_date is None:
|
||||
return None
|
||||
due_date = _parse_date(due_str)
|
||||
except ValueError:
|
||||
return None
|
||||
return (due_date - date.today()).days
|
||||
|
||||
|
||||
@@ -449,42 +443,19 @@ def _urgency_label(days):
|
||||
return f"🟢 {days} 天后"
|
||||
|
||||
|
||||
def _find_todo_by_match(todos, match_str):
|
||||
"""Find a single todo by fuzzy match on SUMMARY. Exits on 0 or >1 matches."""
|
||||
matches = []
|
||||
for path, vtodo in todos:
|
||||
summary = str(vtodo.get("summary", ""))
|
||||
if match_str in summary:
|
||||
matches.append((path, vtodo))
|
||||
def _find_todo_id(match_str):
|
||||
"""Find a todoman ID by matching summary text. Exits on 0 or >1 matches."""
|
||||
todos = _todoman_list_json("--sort", "due")
|
||||
matches = [t for t in todos if match_str in (t.get("summary") or "")]
|
||||
if not matches:
|
||||
print(f"Error: No todo matching '{match_str}'", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
if len(matches) > 1:
|
||||
print(f"Error: Multiple todos match '{match_str}':", file=sys.stderr)
|
||||
for _, vt in matches:
|
||||
print(f" - {vt.get('summary')} (UID: {vt.get('uid')})", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
return matches[0]
|
||||
|
||||
|
||||
def _find_todo_by_uid(todos, uid):
|
||||
"""Find a todo by UID. Exits if not found."""
|
||||
for path, vtodo in todos:
|
||||
if str(vtodo.get("uid", "")) == uid:
|
||||
return path, vtodo
|
||||
print(f"Error: No todo with UID '{uid}'", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def _resolve_todo(args, todos):
|
||||
"""Resolve a todo from --uid or --match args."""
|
||||
if args.uid:
|
||||
return _find_todo_by_uid(todos, args.uid)
|
||||
elif args.match:
|
||||
return _find_todo_by_match(todos, args.match)
|
||||
else:
|
||||
print("Error: --uid or --match is required", file=sys.stderr)
|
||||
for t in matches:
|
||||
print(f" - {t.get('summary')} (id: {t.get('id')})", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
return matches[0]["id"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -584,152 +555,116 @@ def cmd_todo_add(args):
|
||||
print(f"Warning: Failed to email todo to {DEFAULT_OWNER_EMAIL}")
|
||||
|
||||
|
||||
def cmd_todo_list(args):
|
||||
"""List todos, optionally including completed ones."""
|
||||
todos = _load_todos()
|
||||
def _format_todo_digest(todos):
|
||||
"""Format todos into the Chinese priority-grouped digest. Returns string or None."""
|
||||
if not todos:
|
||||
print("No todos found.")
|
||||
return
|
||||
return None
|
||||
|
||||
# Filter
|
||||
if not args.all:
|
||||
todos = [(p, vt) for p, vt in todos
|
||||
if str(vt.get("status", "NEEDS-ACTION")) in ("NEEDS-ACTION", "IN-PROCESS")]
|
||||
|
||||
if not todos:
|
||||
print("No pending todos.")
|
||||
return
|
||||
|
||||
# Sort by priority then due date
|
||||
def sort_key(item):
|
||||
_, vt = item
|
||||
prio = _get_priority_int(vt)
|
||||
due = _get_due_date(vt)
|
||||
return (prio, due or date.max)
|
||||
|
||||
todos.sort(key=sort_key)
|
||||
|
||||
# Format output
|
||||
today_str = date.today().isoformat()
|
||||
print(f"📋 待办事项 ({today_str})")
|
||||
print("=" * 50)
|
||||
lines = [f"📋 待办事项 ({today_str})", "=" * 50]
|
||||
|
||||
# Group by priority
|
||||
groups = {1: [], 5: [], 9: []}
|
||||
for path, vt in todos:
|
||||
prio = _get_priority_int(vt)
|
||||
# Bucket into nearest standard priority
|
||||
if prio <= 3:
|
||||
groups[1].append((path, vt))
|
||||
elif prio <= 7:
|
||||
groups[5].append((path, vt))
|
||||
for t in todos:
|
||||
prio = t.get("priority") or 0
|
||||
if 1 <= prio <= 3:
|
||||
groups[1].append(t)
|
||||
elif 4 <= prio <= 7:
|
||||
groups[5].append(t)
|
||||
else:
|
||||
groups[9].append((path, vt))
|
||||
groups[9].append(t)
|
||||
|
||||
for prio, emoji, label in [(1, "🔴", "高优先级"), (5, "🟡", "中优先级"), (9, "🟢", "低优先级")]:
|
||||
items = groups[prio]
|
||||
if not items:
|
||||
continue
|
||||
print(f"\n{emoji} {label}:")
|
||||
for _, vt in items:
|
||||
summary = str(vt.get("summary", ""))
|
||||
due = _get_due_date(vt)
|
||||
lines.append(f"\n{emoji} {label}:")
|
||||
for t in items:
|
||||
summary = t.get("summary") or ""
|
||||
due = t.get("due")
|
||||
days = _days_until(due)
|
||||
urgency = _urgency_label(days)
|
||||
status = str(vt.get("status", ""))
|
||||
desc = str(vt.get("description", ""))
|
||||
desc = t.get("description") or ""
|
||||
is_completed = t.get("is_completed", False)
|
||||
|
||||
line = f" • {summary} ({urgency})"
|
||||
if status == "COMPLETED":
|
||||
if is_completed:
|
||||
line = f" • ✅ {summary} (已完成)"
|
||||
else:
|
||||
line = f" • {summary} ({urgency})"
|
||||
if desc:
|
||||
line += f" | {desc}"
|
||||
print(line)
|
||||
lines.append(line)
|
||||
|
||||
print("\n" + "=" * 50)
|
||||
lines.append("\n" + "=" * 50)
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def cmd_todo_list(args):
|
||||
"""List todos via todoman, optionally including completed ones."""
|
||||
extra = ["--sort", "priority"]
|
||||
if args.all:
|
||||
extra += ["--status", "ANY"]
|
||||
todos = _todoman_list_json(*extra)
|
||||
|
||||
if not todos:
|
||||
print("No pending todos." if not args.all else "No todos found.")
|
||||
return
|
||||
|
||||
output = _format_todo_digest(todos)
|
||||
if output:
|
||||
print(output)
|
||||
|
||||
|
||||
def cmd_todo_complete(args):
|
||||
"""Mark a todo as COMPLETED."""
|
||||
todos = _load_todos()
|
||||
path, vtodo = _resolve_todo(args, todos)
|
||||
"""Mark a todo as done via todoman."""
|
||||
if args.uid:
|
||||
# todoman uses numeric IDs, not UIDs — search by UID in summary fallback
|
||||
todos = _todoman_list_json("--sort", "due")
|
||||
match = [t for t in todos if args.uid in (t.get("uid") or "")]
|
||||
if not match:
|
||||
print(f"Error: No todo with UID '{args.uid}'", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
todo_id = match[0]["id"]
|
||||
elif args.match:
|
||||
todo_id = _find_todo_id(args.match)
|
||||
else:
|
||||
print("Error: --uid or --match is required", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
# Read, modify, rewrite
|
||||
cal = Calendar.from_ical(path.read_bytes())
|
||||
for component in cal.walk():
|
||||
if component.name == "VTODO":
|
||||
component["status"] = "COMPLETED"
|
||||
component.add("completed", datetime.now(timezone.utc))
|
||||
break
|
||||
|
||||
path.write_bytes(cal.to_ical())
|
||||
summary = str(vtodo.get("summary", ""))
|
||||
print(f"Completed: {summary}")
|
||||
_run_todoman("done", str(todo_id))
|
||||
print(f"Completed todo #{todo_id}")
|
||||
_sync_calendar()
|
||||
|
||||
|
||||
def cmd_todo_delete(args):
|
||||
"""Delete a todo .ics file."""
|
||||
todos = _load_todos()
|
||||
path, vtodo = _resolve_todo(args, todos)
|
||||
"""Delete a todo via todoman."""
|
||||
if args.uid:
|
||||
todos = _todoman_list_json("--sort", "due")
|
||||
match = [t for t in todos if args.uid in (t.get("uid") or "")]
|
||||
if not match:
|
||||
print(f"Error: No todo with UID '{args.uid}'", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
todo_id = match[0]["id"]
|
||||
elif args.match:
|
||||
todo_id = _find_todo_id(args.match)
|
||||
else:
|
||||
print("Error: --uid or --match is required", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
summary = str(vtodo.get("summary", ""))
|
||||
path.unlink()
|
||||
print(f"Deleted: {summary}")
|
||||
_run_todoman("delete", str(todo_id))
|
||||
print(f"Deleted todo #{todo_id}")
|
||||
_sync_calendar()
|
||||
|
||||
|
||||
def cmd_todo_check(args):
|
||||
"""Daily digest of pending todos (for cron). Silent when empty."""
|
||||
todos = _load_todos()
|
||||
# Filter to NEEDS-ACTION only
|
||||
pending = [(p, vt) for p, vt in todos
|
||||
if str(vt.get("status", "NEEDS-ACTION")) in ("NEEDS-ACTION", "IN-PROCESS")]
|
||||
|
||||
if not pending:
|
||||
todos = _todoman_list_json("--sort", "priority")
|
||||
if not todos:
|
||||
return # silent exit
|
||||
|
||||
# Sort by priority then due date
|
||||
def sort_key(item):
|
||||
_, vt = item
|
||||
prio = _get_priority_int(vt)
|
||||
due = _get_due_date(vt)
|
||||
return (prio, due or date.max)
|
||||
|
||||
pending.sort(key=sort_key)
|
||||
|
||||
# Format output (same style as todo list but without footer)
|
||||
today_str = date.today().isoformat()
|
||||
print(f"📋 待办事项 ({today_str})")
|
||||
print("=" * 50)
|
||||
|
||||
groups = {1: [], 5: [], 9: []}
|
||||
for path, vt in pending:
|
||||
prio = _get_priority_int(vt)
|
||||
if prio <= 3:
|
||||
groups[1].append((path, vt))
|
||||
elif prio <= 7:
|
||||
groups[5].append((path, vt))
|
||||
else:
|
||||
groups[9].append((path, vt))
|
||||
|
||||
for prio, emoji, label in [(1, "🔴", "高优先级"), (5, "🟡", "中优先级"), (9, "🟢", "低优先级")]:
|
||||
items = groups[prio]
|
||||
if not items:
|
||||
continue
|
||||
print(f"\n{emoji} {label}:")
|
||||
for _, vt in items:
|
||||
summary = str(vt.get("summary", ""))
|
||||
due = _get_due_date(vt)
|
||||
days = _days_until(due)
|
||||
urgency = _urgency_label(days)
|
||||
desc = str(vt.get("description", ""))
|
||||
line = f" • {summary} ({urgency})"
|
||||
if desc:
|
||||
line += f" | {desc}"
|
||||
print(line)
|
||||
|
||||
print("\n" + "=" * 50)
|
||||
output = _format_todo_digest(todos)
|
||||
if output:
|
||||
print(output)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
Reference in New Issue
Block a user