claude-configs/skills/paper-dynasty/plan/cli.py
Cal Corum 0fa8486e93 Sync: update agents, paper-dynasty skills, sessions
- agents: issue-worker.md and pr-reviewer.md updated for standard
  branch naming (issue/<number>-<slug> instead of ai/<repo>#<number>)
- paper-dynasty: updated SKILL.md, generate_summary, smoke_test,
  validate_database scripts; added ecosystem_status.sh and plan/
- plugins: updated marketplace submodules and blocklist
- sessions: rotate session files, add session-analysis/
- settings: updated settings.json

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-23 23:03:10 -05:00

1029 lines
32 KiB
Python
Executable File

#!/usr/bin/env python3
"""
pd-plan — Paper Dynasty Initiative Tracker
A local SQLite-based CLI tool for tracking cross-project priorities
across the Paper Dynasty project ecosystem.
Usage:
python ~/.claude/skills/paper-dynasty/plan/cli.py [command] [options]
"""
import argparse
import json
import os
import sqlite3
import sys
import textwrap
from datetime import datetime, timezone
from pathlib import Path
# ---------------------------------------------------------------------------
# Database setup
# ---------------------------------------------------------------------------
DB_PATH = Path(__file__).parent / "initiatives.db"
PHASE_NAMES = {
1: "Foundation",
2: "Engagement",
3: "Growth",
}
VALID_STATUSES = ("backlog", "active", "in_progress", "blocked", "done")
VALID_SIZES = ("S", "M", "L", "XL")
VALID_IMPACTS = ("retention", "acquisition", "engagement")
def get_conn() -> sqlite3.Connection:
"""Return a connection with row_factory set for dict-like access."""
conn = sqlite3.connect(str(DB_PATH))
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA foreign_keys = ON")
return conn
def init_db(conn: sqlite3.Connection) -> None:
"""Create tables if they do not exist."""
conn.executescript(
"""
CREATE TABLE IF NOT EXISTS initiatives (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
description TEXT,
phase INTEGER NOT NULL DEFAULT 1,
status TEXT NOT NULL DEFAULT 'backlog',
priority INTEGER NOT NULL DEFAULT 50,
impact TEXT,
size TEXT,
repos TEXT,
linked_issues TEXT,
blocked_by TEXT,
owner TEXT,
notes TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS activity_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
initiative_id INTEGER NOT NULL,
action TEXT NOT NULL,
old_value TEXT,
new_value TEXT,
actor TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (initiative_id) REFERENCES initiatives(id)
);
"""
)
conn.commit()
def now_ts() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
def log_activity(
conn: sqlite3.Connection,
initiative_id: int,
action: str,
old_value: str | None,
new_value: str | None,
actor: str | None = None,
) -> None:
conn.execute(
"""
INSERT INTO activity_log (initiative_id, action, old_value, new_value, actor, created_at)
VALUES (?, ?, ?, ?, ?, ?)
""",
(initiative_id, action, old_value, new_value, actor, now_ts()),
)
# ---------------------------------------------------------------------------
# Output helpers
# ---------------------------------------------------------------------------
SEPARATOR = "\u2500"
DOUBLE_SEPARATOR = "\u2550"
def fmt_table(rows: list[dict], columns: list[tuple[str, str, int]]) -> str:
"""
Format a list of dicts as a plain-text table.
columns: list of (key, header, width) tuples
"""
if not rows:
return " (no results)"
header_parts = []
divider_parts = []
for key, header, width in columns:
header_parts.append(header.ljust(width))
divider_parts.append(SEPARATOR * width)
lines = [" " + " ".join(header_parts)]
lines.append(" " + " ".join(divider_parts))
for row in rows:
parts = []
for key, header, width in columns:
val = str(row[key] or "")
parts.append(val.ljust(width)[:width])
lines.append(" " + " ".join(parts))
return "\n".join(lines)
def relative_time(ts_str: str | None) -> str:
"""Return a human-readable relative timestamp like '2h ago'."""
if not ts_str:
return ""
try:
ts = datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
delta = datetime.now(timezone.utc) - ts
secs = int(delta.total_seconds())
if secs < 60:
return f"{secs}s ago"
if secs < 3600:
return f"{secs // 60}m ago"
if secs < 86400:
return f"{secs // 3600}h ago"
return f"{secs // 86400}d ago"
except ValueError:
return ts_str
# ---------------------------------------------------------------------------
# Command implementations
# ---------------------------------------------------------------------------
def cmd_list(args: argparse.Namespace, conn: sqlite3.Connection) -> None:
"""Show initiatives matching optional filters."""
clauses = []
params: list = []
if not args.all and args.status != "done":
if args.status:
clauses.append("status = ?")
params.append(args.status)
else:
clauses.append("status != 'done'")
elif args.status:
clauses.append("status = ?")
params.append(args.status)
if args.phase:
clauses.append("phase = ?")
params.append(args.phase)
if args.impact:
clauses.append("impact = ?")
params.append(args.impact)
if args.repo:
clauses.append("(',' || repos || ',' LIKE ?)")
params.append(f"%,{args.repo},%")
if args.owner:
clauses.append("owner = ?")
params.append(args.owner)
where = ("WHERE " + " AND ".join(clauses)) if clauses else ""
sql = f"SELECT * FROM initiatives {where} ORDER BY priority ASC, id ASC"
rows = [dict(r) for r in conn.execute(sql, params).fetchall()]
if args.json:
print(json.dumps(rows, indent=2, default=str))
return
columns = [
("id", "ID", 4),
("priority", "Pri", 4),
("phase", "Phase", 5),
("status", "Status", 12),
("impact", "Impact", 11),
("size", "Size", 4),
("title", "Title", 55),
]
print()
print(fmt_table(rows, columns))
print()
def cmd_next(args: argparse.Namespace, conn: sqlite3.Connection) -> None:
"""Show the highest-priority non-blocked, non-done initiative."""
clauses = ["status != 'done'", "status != 'blocked'"]
params: list = []
if args.repo:
clauses.append("(',' || repos || ',' LIKE ?)")
params.append(f"%,{args.repo},%")
if args.owner:
clauses.append("owner = ?")
params.append(args.owner)
where = "WHERE " + " AND ".join(clauses)
sql = f"SELECT * FROM initiatives {where} ORDER BY priority ASC, id ASC LIMIT 1"
row = conn.execute(sql, params).fetchone()
if args.json:
print(json.dumps(dict(row) if row else {}, indent=2, default=str))
return
if not row:
print("\n No eligible initiatives found.\n")
return
row = dict(row)
print()
print(f" Next initiative:")
print(f" #{row['id']} [P{row['priority']}] {row['title']}")
print(
f" Status: {row['status']} | Phase: {row['phase']} ({PHASE_NAMES.get(row['phase'], '?')})"
)
print(
f" Impact: {row['impact'] or '-'} | Size: {row['size'] or '-'} | Owner: {row['owner'] or '-'}"
)
if row.get("repos"):
print(f" Repos: {row['repos']}")
if row.get("linked_issues"):
print(f" Issues: {row['linked_issues']}")
if row.get("description"):
wrapped = textwrap.fill(
row["description"], width=70, initial_indent=" ", subsequent_indent=" "
)
print(f"\n{wrapped}")
print()
def cmd_show(args: argparse.Namespace, conn: sqlite3.Connection) -> None:
"""Show full details of a single initiative."""
row = conn.execute("SELECT * FROM initiatives WHERE id = ?", (args.id,)).fetchone()
if not row:
print(f" Error: initiative #{args.id} not found.", file=sys.stderr)
sys.exit(1)
row = dict(row)
logs = conn.execute(
"SELECT * FROM activity_log WHERE initiative_id = ? ORDER BY created_at DESC LIMIT 10",
(args.id,),
).fetchall()
if args.json:
out = dict(row)
out["activity"] = [dict(l) for l in logs]
print(json.dumps(out, indent=2, default=str))
return
phase_name = PHASE_NAMES.get(row["phase"], str(row["phase"]))
print()
print(f" Initiative #{row['id']}: {row['title']}")
print(f" " + DOUBLE_SEPARATOR * 60)
print(f" Phase: {row['phase']}{phase_name}")
print(f" Status: {row['status']}")
print(f" Priority: {row['priority']}")
print(f" Impact: {row['impact'] or '-'}")
print(f" Size: {row['size'] or '-'}")
print(f" Owner: {row['owner'] or '-'}")
print(f" Repos: {row['repos'] or '-'}")
print(f" Issues: {row['linked_issues'] or '-'}")
print(f" Blocked by: {row['blocked_by'] or '-'}")
print(f" Created: {row['created_at']}")
print(f" Updated: {row['updated_at']}")
if row.get("description"):
print()
print(" Description:")
wrapped = textwrap.fill(
row["description"],
width=70,
initial_indent=" ",
subsequent_indent=" ",
)
print(wrapped)
if row.get("notes"):
print()
print(" Notes:")
wrapped = textwrap.fill(
row["notes"], width=70, initial_indent=" ", subsequent_indent=" "
)
print(wrapped)
if logs:
print()
print(" Recent Activity:")
for log in logs:
log = dict(log)
age = relative_time(log["created_at"])
actor = f" ({log['actor']})" if log.get("actor") else ""
if log["action"] == "status_change":
print(
f" [{age}] status: {log['old_value']} -> {log['new_value']}{actor}"
)
elif log["action"] == "note":
note_preview = (log["new_value"] or "")[:60]
print(f' [{age}] note: "{note_preview}"{actor}')
else:
old = f" {log['old_value']} ->" if log.get("old_value") else ""
new = f" {log['new_value']}" if log.get("new_value") else ""
print(f" [{age}] {log['action']}:{old}{new}{actor}")
print()
def cmd_add(args: argparse.Namespace, conn: sqlite3.Connection) -> None:
"""Create a new initiative."""
ts = now_ts()
cursor = conn.execute(
"""
INSERT INTO initiatives
(title, description, phase, status, priority, impact, size,
repos, linked_issues, blocked_by, owner, notes, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
args.title,
args.description,
args.phase,
args.status,
args.priority,
args.impact,
args.size,
args.repos,
args.linked,
args.blocked_by,
args.owner,
None,
ts,
ts,
),
)
new_id = cursor.lastrowid
log_activity(conn, new_id, "created", None, args.title, args.actor)
conn.commit()
if args.json:
print(json.dumps({"id": new_id, "title": args.title}, indent=2))
return
print(f"\n Created initiative #{new_id}: {args.title}\n")
def cmd_update(args: argparse.Namespace, conn: sqlite3.Connection) -> None:
"""Update one or more fields of an initiative."""
row = conn.execute("SELECT * FROM initiatives WHERE id = ?", (args.id,)).fetchone()
if not row:
print(f" Error: initiative #{args.id} not found.", file=sys.stderr)
sys.exit(1)
row = dict(row)
updates: list[tuple[str, str | None, str | None]] = [] # (field, old, new)
set_clauses = []
params = []
def queue(field: str, new_val):
old_val = row.get(field)
if new_val is not None and str(new_val) != str(old_val or ""):
set_clauses.append(f"{field} = ?")
params.append(new_val)
updates.append((field, old_val, new_val))
if args.status is not None:
queue("status", args.status)
if args.priority is not None:
queue("priority", args.priority)
if args.phase is not None:
queue("phase", args.phase)
if args.impact is not None:
queue("impact", args.impact)
if args.size is not None:
queue("size", args.size)
if args.owner is not None:
queue("owner", args.owner)
if args.repos is not None:
queue("repos", args.repos)
if args.description is not None:
queue("description", args.description)
# --blocked-by replaces the field
if args.blocked_by is not None:
queue("blocked_by", args.blocked_by)
# --link appends to linked_issues
if args.link is not None:
existing = row.get("linked_issues") or ""
new_linked = (existing + "," + args.link).strip(",") if existing else args.link
queue("linked_issues", new_linked)
# --note appends to notes; also logs as 'note' action
if args.note is not None:
existing = row.get("notes") or ""
new_notes = (existing + "\n" + args.note).strip() if existing else args.note
set_clauses.append("notes = ?")
params.append(new_notes)
log_activity(conn, args.id, "note", None, args.note, args.actor)
if not set_clauses:
print(" Nothing to update.")
return
set_clauses.append("updated_at = ?")
params.append(now_ts())
params.append(args.id)
conn.execute(
f"UPDATE initiatives SET {', '.join(set_clauses)} WHERE id = ?",
params,
)
for field, old_val, new_val in updates:
if field == "status":
log_activity(
conn, args.id, "status_change", str(old_val), str(new_val), args.actor
)
else:
log_activity(
conn, args.id, f"update_{field}", str(old_val), str(new_val), args.actor
)
conn.commit()
if args.json:
updated_row = dict(
conn.execute(
"SELECT * FROM initiatives WHERE id = ?", (args.id,)
).fetchone()
)
print(json.dumps(updated_row, indent=2, default=str))
return
print(f"\n Updated initiative #{args.id}\n")
def cmd_done(args: argparse.Namespace, conn: sqlite3.Connection) -> None:
"""Mark an initiative as done."""
row = conn.execute(
"SELECT status FROM initiatives WHERE id = ?", (args.id,)
).fetchone()
if not row:
print(f" Error: initiative #{args.id} not found.", file=sys.stderr)
sys.exit(1)
old_status = row["status"]
conn.execute(
"UPDATE initiatives SET status = 'done', updated_at = ? WHERE id = ?",
(now_ts(), args.id),
)
log_activity(
conn, args.id, "status_change", old_status, "done", getattr(args, "actor", None)
)
conn.commit()
if args.json:
print(json.dumps({"id": args.id, "status": "done"}, indent=2))
return
print(f"\n Initiative #{args.id} marked as done.\n")
def cmd_summary(args: argparse.Namespace, conn: sqlite3.Connection) -> None:
"""Dashboard view for session startup."""
all_rows = [
dict(r)
for r in conn.execute(
"SELECT * FROM initiatives ORDER BY priority ASC"
).fetchall()
]
if args.json:
# Build structured summary
phases = {}
for row in all_rows:
p = row["phase"]
if p not in phases:
phases[p] = {}
s = row["status"]
phases[p][s] = phases[p].get(s, 0) + 1
top3 = [r for r in all_rows if r["status"] != "done"][:3]
recent_logs = [
dict(l)
for l in conn.execute(
"""
SELECT al.*, i.title
FROM activity_log al
JOIN initiatives i ON i.id = al.initiative_id
ORDER BY al.created_at DESC LIMIT 5
"""
).fetchall()
]
print(
json.dumps(
{
"phases": phases,
"top_priorities": top3,
"recent_activity": recent_logs,
},
indent=2,
default=str,
)
)
return
W = 50
print()
print(" Paper Dynasty Initiative Tracker")
print(" " + DOUBLE_SEPARATOR * W)
print()
for phase_num, phase_name in PHASE_NAMES.items():
phase_rows = [r for r in all_rows if r["phase"] == phase_num]
if not phase_rows:
continue
counts: dict[str, int] = {}
for r in phase_rows:
counts[r["status"]] = counts.get(r["status"], 0) + 1
parts = []
for s in ("in_progress", "active", "blocked", "backlog", "done"):
if counts.get(s, 0):
label = s.replace("_", " ").title()
parts.append(f"{label}: {counts[s]}")
print(f" Phase {phase_num}{phase_name}")
print(f" " + " | ".join(parts))
# Top 3 non-done initiatives
top = [r for r in all_rows if r["status"] != "done"][:3]
if top:
print()
print(" Top Priorities:")
for i, r in enumerate(top, 1):
impact = r["impact"] or "-"
size = r["size"] or "-"
print(
f" #{r['id']} [P{r['priority']}] {r['title']}{r['status']}, {impact}, {size}"
)
# Recently updated
recent_logs = conn.execute(
"""
SELECT al.*, i.title
FROM activity_log al
JOIN initiatives i ON i.id = al.initiative_id
ORDER BY al.created_at DESC LIMIT 5
"""
).fetchall()
if recent_logs:
print()
print(" Recently Updated:")
for log in recent_logs:
log = dict(log)
age = relative_time(log["created_at"])
if log["action"] == "status_change":
print(
f" #{log['initiative_id']} — status: {log['old_value']} -> {log['new_value']} ({age})"
)
elif log["action"] == "note":
preview = (log["new_value"] or "")[:50]
print(f' #{log["initiative_id"]} — note: "{preview}" ({age})')
else:
desc = log["action"].replace("update_", "")
new_v = log.get("new_value") or ""
print(f" #{log['initiative_id']}{desc}: {new_v[:40]} ({age})")
print()
# ---------------------------------------------------------------------------
# Seed data
# ---------------------------------------------------------------------------
SEED_INITIATIVES = [
# Phase 1
{
"title": "Complete card evolution / Refractor system",
"description": "Complete the card evolution mechanic (internally known as Refractor) across discord, database, and card-creation services.",
"phase": 1,
"status": "in_progress",
"priority": 10,
"impact": "retention",
"size": "M",
"repos": "discord,database,card-creation",
"linked_issues": "discord#87,discord#88,discord#89",
"blocked_by": None,
"owner": None,
"notes": None,
},
{
"title": "Clear Discord bug backlog",
"description": "Work through the accumulated bug backlog in the discord repo.",
"phase": 1,
"status": "done",
"priority": 15,
"impact": "retention",
"size": "S",
"repos": "discord",
"linked_issues": None,
"blocked_by": None,
"owner": "pd-discord",
"notes": None,
},
{
"title": "Clear card creation backlog",
"description": "Review and merge the 11 open PRs in the card-creation repo.",
"phase": 1,
"status": "active",
"priority": 20,
"impact": "retention",
"size": "S",
"repos": "card-creation",
"linked_issues": None,
"blocked_by": None,
"owner": None,
"notes": "11 PRs need review",
},
{
"title": "Database API stability — HTTPException fix",
"description": "Standardise HTTPException usage across the database API to fix inconsistent error responses.",
"phase": 1,
"status": "active",
"priority": 25,
"impact": "retention",
"size": "M",
"repos": "database",
"linked_issues": "database#16",
"blocked_by": None,
"owner": None,
"notes": None,
},
{
"title": "Database API — add test suite",
"description": "Add a comprehensive test suite to the database API service covering all endpoints.",
"phase": 1,
"status": "backlog",
"priority": 30,
"impact": "retention",
"size": "L",
"repos": "database",
"linked_issues": "database#28",
"blocked_by": None,
"owner": None,
"notes": None,
},
{
"title": "Remove legacy SQLite code",
"description": "Remove all remaining legacy SQLite code from the database service now that PostgreSQL migration is complete.",
"phase": 1,
"status": "backlog",
"priority": 35,
"impact": "retention",
"size": "S",
"repos": "database",
"linked_issues": "database#122,database#123,database#124",
"blocked_by": None,
"owner": None,
"notes": None,
},
{
"title": "Resolve ruff pre-commit hook",
"description": "Fix the ruff pre-commit hook that is blocking contributions to the discord repo.",
"phase": 1,
"status": "backlog",
"priority": 40,
"impact": "retention",
"size": "S",
"repos": "discord",
"linked_issues": "discord#108",
"blocked_by": None,
"owner": None,
"notes": None,
},
{
"title": "Refractor system rename",
"description": "Rename the evolution system to 'Refractor' across all user-facing surfaces in discord and database.",
"phase": 1,
"status": "backlog",
"priority": 12,
"impact": "retention",
"size": "M",
"repos": "discord,database",
"linked_issues": "discord#87,discord#88",
"blocked_by": "1",
"owner": None,
"notes": None,
},
# Phase 2
{
"title": "Enhanced pack opening experience",
"description": "Redesign the pack opening flow for improved engagement and excitement.",
"phase": 2,
"status": "backlog",
"priority": 50,
"impact": "engagement",
"size": "M",
"repos": "discord",
"linked_issues": None,
"blocked_by": None,
"owner": None,
"notes": None,
},
{
"title": "League / season structure",
"description": "Implement a league and season structure with rankings and rewards.",
"phase": 2,
"status": "backlog",
"priority": 55,
"impact": "engagement",
"size": "L",
"repos": "discord,database",
"linked_issues": None,
"blocked_by": None,
"owner": None,
"notes": None,
},
{
"title": "Content pipeline automation",
"description": "Automate the card content pipeline to reduce manual effort for new set releases.",
"phase": 2,
"status": "backlog",
"priority": 60,
"impact": "engagement",
"size": "M",
"repos": "discord,database",
"linked_issues": None,
"blocked_by": None,
"owner": None,
"notes": None,
},
{
"title": "Social features — trades and showcases",
"description": "Implement card trading between players and personal showcase/display features.",
"phase": 2,
"status": "backlog",
"priority": 65,
"impact": "engagement",
"size": "L",
"repos": "discord,database",
"linked_issues": None,
"blocked_by": None,
"owner": None,
"notes": None,
},
{
"title": "Gauntlet improvements",
"description": "Iterate on the Gauntlet game mode based on player feedback.",
"phase": 2,
"status": "backlog",
"priority": 70,
"impact": "engagement",
"size": "M",
"repos": "discord,database",
"linked_issues": None,
"blocked_by": None,
"owner": None,
"notes": None,
},
# Phase 3
{
"title": "Website revival",
"description": "Revive the Paper Dynasty website with current game info, leaderboards, and set details.",
"phase": 3,
"status": "backlog",
"priority": 75,
"impact": "acquisition",
"size": "L",
"repos": "website",
"linked_issues": None,
"blocked_by": None,
"owner": None,
"notes": None,
},
{
"title": "Web gameplay app",
"description": "Build a web-based gameplay client so players can participate without Discord.",
"phase": 3,
"status": "backlog",
"priority": 80,
"impact": "acquisition",
"size": "XL",
"repos": "database",
"linked_issues": None,
"blocked_by": None,
"owner": None,
"notes": None,
},
{
"title": "Onboarding flow",
"description": "Redesign the new player onboarding flow to reduce drop-off.",
"phase": 3,
"status": "backlog",
"priority": 85,
"impact": "acquisition",
"size": "M",
"repos": "discord",
"linked_issues": None,
"blocked_by": None,
"owner": None,
"notes": None,
},
{
"title": "Analytics instrumentation",
"description": "Add analytics instrumentation across discord and database to track player engagement.",
"phase": 3,
"status": "backlog",
"priority": 60,
"impact": "engagement",
"size": "M",
"repos": "discord,database",
"linked_issues": None,
"blocked_by": None,
"owner": None,
"notes": None,
},
]
def seed_db(conn: sqlite3.Connection) -> None:
"""Insert seed initiatives if the table is empty."""
count = conn.execute("SELECT COUNT(*) FROM initiatives").fetchone()[0]
if count > 0:
print(
f" Database already has {count} initiatives — skipping seed.",
file=sys.stderr,
)
return
ts = now_ts()
for item in SEED_INITIATIVES:
cursor = conn.execute(
"""
INSERT INTO initiatives
(title, description, phase, status, priority, impact, size,
repos, linked_issues, blocked_by, owner, notes, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
item["title"],
item.get("description"),
item["phase"],
item["status"],
item["priority"],
item.get("impact"),
item.get("size"),
item.get("repos"),
item.get("linked_issues"),
item.get("blocked_by"),
item.get("owner"),
item.get("notes"),
ts,
ts,
),
)
log_activity(conn, cursor.lastrowid, "created", None, item["title"], "seed")
conn.commit()
print(f" Seeded {len(SEED_INITIATIVES)} initiatives.", file=sys.stderr)
# ---------------------------------------------------------------------------
# Argument parsing
# ---------------------------------------------------------------------------
def build_parser() -> argparse.ArgumentParser:
# Common flags shared by all subcommands via a parent parser.
common = argparse.ArgumentParser(add_help=False)
common.add_argument("--json", action="store_true", help="Output as JSON")
common.add_argument("--actor", default=None, help="Actor name for activity log")
parser = argparse.ArgumentParser(
prog="pd-plan",
description="Paper Dynasty Initiative Tracker",
parents=[common],
)
sub = parser.add_subparsers(dest="command")
# list
p_list = sub.add_parser("list", help="List initiatives", parents=[common])
p_list.add_argument("--phase", type=int)
p_list.add_argument("--status")
p_list.add_argument("--impact")
p_list.add_argument("--repo")
p_list.add_argument("--owner")
p_list.add_argument("--all", action="store_true")
# next
p_next = sub.add_parser(
"next", help="Show next highest-priority initiative", parents=[common]
)
p_next.add_argument("--repo")
p_next.add_argument("--owner")
# show
p_show = sub.add_parser(
"show", help="Show full details of an initiative", parents=[common]
)
p_show.add_argument("id", type=int)
# add
p_add = sub.add_parser("add", help="Create a new initiative", parents=[common])
p_add.add_argument("title")
p_add.add_argument("--description", default=None)
p_add.add_argument("--phase", type=int, default=1)
p_add.add_argument("--status", default="backlog", choices=VALID_STATUSES)
p_add.add_argument("--priority", type=int, default=50)
p_add.add_argument("--impact", choices=VALID_IMPACTS)
p_add.add_argument("--size", choices=VALID_SIZES)
p_add.add_argument("--repos", default=None)
p_add.add_argument("--linked", default=None, dest="linked")
p_add.add_argument("--owner", default=None)
p_add.add_argument("--blocked-by", default=None, dest="blocked_by")
# update
p_update = sub.add_parser("update", help="Update an initiative", parents=[common])
p_update.add_argument("id", type=int)
p_update.add_argument("--status", choices=VALID_STATUSES)
p_update.add_argument("--priority", type=int)
p_update.add_argument("--phase", type=int)
p_update.add_argument("--impact", choices=VALID_IMPACTS)
p_update.add_argument("--size", choices=VALID_SIZES)
p_update.add_argument("--owner")
p_update.add_argument("--repos")
p_update.add_argument("--description")
p_update.add_argument("--blocked-by", dest="blocked_by")
p_update.add_argument("--link")
p_update.add_argument("--note")
# done
p_done = sub.add_parser("done", help="Mark an initiative as done", parents=[common])
p_done.add_argument("id", type=int)
# summary
sub.add_parser("summary", help="Dashboard view", parents=[common])
# seed (internal / bootstrap)
sub.add_parser("seed", help="Seed the database with initial data", parents=[common])
return parser
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main() -> None:
parser = build_parser()
args = parser.parse_args()
# Propagate top-level flags to sub-namespace
# (argparse puts them before the subcommand so we pass them manually)
with get_conn() as conn:
init_db(conn)
if args.command is None:
parser.print_help()
sys.exit(0)
# Ensure --json and --actor always have defaults when parsed by subparser.
if not hasattr(args, "json"):
args.json = False
if not hasattr(args, "actor"):
args.actor = None
if args.command == "list":
cmd_list(args, conn)
elif args.command == "next":
cmd_next(args, conn)
elif args.command == "show":
cmd_show(args, conn)
elif args.command == "add":
cmd_add(args, conn)
elif args.command == "update":
cmd_update(args, conn)
elif args.command == "done":
cmd_done(args, conn)
elif args.command == "summary":
cmd_summary(args, conn)
elif args.command == "seed":
seed_db(conn)
else:
parser.print_help()
sys.exit(1)
if __name__ == "__main__":
main()