claude-configs/skills/paper-dynasty/api_client.py
Cal Corum 8642bb539a Migrate Gitea ops to MCP, update Paper Dynasty skill, sync plugins
- CLAUDE.md + commit-push-pr: prefer gitea-mcp over tea CLI
- Paper Dynasty: updated api_client, cli, distribute_packs
- New skill: resume-tailoring
- Plugins: updated marketplaces, blocklist, install counts
- Settings and MCP config updates

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 18:59:49 -06:00

732 lines
22 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Paper Dynasty API Client
Shared API client for all Paper Dynasty operations.
Provides methods for interacting with teams, players, cards, gauntlets, and more.
Environment Variables:
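API_TOKEN: Bearer token for API authentication (required only for write operations: POST/PATCH/DELETE; read-only GET requests work without it)
DATABASE: 'prod' or 'dev' (default: dev). NOTE: this module does not appear to read DATABASE itself; the target environment is chosen via the `environment` argument of PaperDynastyAPI (or `--env` when run as a script).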
"""
import os
import sys
from collections import defaultdict
from datetime import datetime, timezone
from typing import Optional, Dict, List, Any
from urllib.parse import urlencode

import requests
class PaperDynastyAPI:
    """
    Paper Dynasty API client for remote database access.

    Read operations (GET) work without a token; write operations
    (POST/PATCH/DELETE) raise ValueError when no token is configured.

    Usage:
        api = PaperDynastyAPI(environment='prod')

        # Get a team
        team = api.get_team(abbrev='SKB')

        # List gauntlet runs
        runs = api.list_gauntlet_runs(event_id=8, active_only=True)

        # Wipe team cards
        api.wipe_team_cards(team_id=464)
    """

    # Environment names that select the production API. Anything else falls
    # back to the dev API so that a typo can never hit production by accident.
    _PROD_ENVIRONMENTS = frozenset({"prod", "production"})

    def __init__(
        self,
        environment: str = "dev",
        token: Optional[str] = None,
        verbose: bool = False,
    ):
        """
        Initialize API client.

        Args:
            environment: 'prod' or 'dev' (case-insensitive). 'production' is
                accepted as an alias for 'prod'; any other value selects dev.
            token: API token (defaults to API_TOKEN env var). Only required
                for write operations (POST/PATCH/DELETE).
            verbose: Print request/response details
        """
        self.env = environment.strip().lower()
        # Exact match, not substring: "nonprod"/"preprod" must NOT resolve to
        # the production database.
        self.base_url = (
            "https://pd.manticorum.com/api"
            if self.env in self._PROD_ENVIRONMENTS
            else "https://pddev.manticorum.com/api"
        )
        self.token = token or os.getenv("API_TOKEN")
        self.verbose = verbose
        self.headers = {"Content-Type": "application/json"}
        if self.token:
            self.headers["Authorization"] = f"Bearer {self.token}"

    def _require_token(self):
        """
        Raise if no API token is set (needed for write operations).

        Raises:
            ValueError: If neither the `token` argument nor API_TOKEN was set.
        """
        if not self.token:
            raise ValueError(
                "API_TOKEN required for write operations. "
                "Set it with: export API_TOKEN='your-token-here'"
            )

    def _log(self, message: str):
        """Print message if verbose mode enabled."""
        if self.verbose:
            print(f"[API] {message}")

    def _build_url(
        self,
        endpoint: str,
        api_ver: int = 2,
        object_id: Optional[int] = None,
        params: Optional[List] = None,
    ) -> str:
        """
        Build API URL with parameters.

        Args:
            endpoint: Path below the versioned root (e.g. 'teams')
            api_ver: API version number used in the '/v{n}/' path segment
            object_id: Optional object ID appended as a path segment
            params: Optional list of (key, value) pairs for the query string

        Returns:
            Fully-qualified URL string
        """
        url = f"{self.base_url}/v{api_ver}/{endpoint}"
        if object_id is not None:
            url += f"/{object_id}"
        if params:
            # urlencode percent-escapes keys and values, so spaces, '&' or '='
            # inside a value cannot corrupt the query string.
            url += "?" + urlencode(params)
        return url

    # ====================
    # Low-level HTTP methods
    # ====================

    def get(
        self,
        endpoint: str,
        object_id: Optional[int] = None,
        params: Optional[List] = None,
        timeout: int = 10,
    ) -> Dict:
        """
        GET request to API.

        Args:
            endpoint: Path below the versioned root
            object_id: Optional object ID appended to the path
            params: Optional list of (key, value) query parameters
            timeout: Request timeout in seconds

        Returns:
            Decoded JSON response body

        Raises:
            requests.HTTPError: If the response status is an error
        """
        url = self._build_url(endpoint, object_id=object_id, params=params)
        self._log(f"GET {url}")
        response = requests.get(url, headers=self.headers, timeout=timeout)
        response.raise_for_status()
        return response.json()

    def post(
        self, endpoint: str, payload: Optional[Dict] = None, timeout: int = 10
    ) -> Any:
        """
        POST request to API (requires a token).

        Args:
            endpoint: Path below the versioned root
            payload: Optional JSON-serializable request body
            timeout: Request timeout in seconds

        Returns:
            Decoded JSON response body, or {} when the body is empty

        Raises:
            ValueError: If no API token is configured
            requests.HTTPError: If the response status is an error
        """
        self._require_token()
        url = self._build_url(endpoint)
        self._log(f"POST {url}")
        response = requests.post(
            url, headers=self.headers, json=payload, timeout=timeout
        )
        response.raise_for_status()
        return response.json() if response.text else {}

    def patch(
        self, endpoint: str, object_id: int, params: List, timeout: int = 10
    ) -> Dict:
        """
        PATCH request to API (requires a token).

        The changes are sent as query parameters; no request body is sent.

        Args:
            endpoint: Path below the versioned root
            object_id: ID of the object to update
            params: List of (key, value) pairs describing the update
            timeout: Request timeout in seconds

        Returns:
            Decoded JSON response body

        Raises:
            ValueError: If no API token is configured
            requests.HTTPError: If the response status is an error
        """
        self._require_token()
        url = self._build_url(endpoint, object_id=object_id, params=params)
        self._log(f"PATCH {url}")
        response = requests.patch(url, headers=self.headers, timeout=timeout)
        response.raise_for_status()
        return response.json()

    def delete(self, endpoint: str, object_id: int, timeout: int = 10) -> str:
        """
        DELETE request to API (requires a token).

        Args:
            endpoint: Path below the versioned root
            object_id: ID of the object to delete
            timeout: Request timeout in seconds

        Returns:
            Raw response text

        Raises:
            ValueError: If no API token is configured
            requests.HTTPError: If the response status is an error
        """
        self._require_token()
        url = self._build_url(endpoint, object_id=object_id)
        self._log(f"DELETE {url}")
        response = requests.delete(url, headers=self.headers, timeout=timeout)
        response.raise_for_status()
        return response.text

    # ====================
    # Team Operations
    # ====================

    def get_team(
        self, team_id: Optional[int] = None, abbrev: Optional[str] = None
    ) -> Dict:
        """
        Get a team by ID or abbreviation.

        When both are given, team_id takes precedence.

        Args:
            team_id: Team ID
            abbrev: Team abbreviation (e.g., 'SKB'); upper-cased before lookup

        Returns:
            Team dict

        Raises:
            ValueError: If neither argument is given, or no team matches abbrev
        """
        if team_id:
            return self.get("teams", object_id=team_id)
        elif abbrev:
            result = self.get("teams", params=[("abbrev", abbrev.upper())])
            teams = result.get("teams", [])
            if not teams:
                raise ValueError(f"Team '{abbrev}' not found")
            return teams[0]
        else:
            raise ValueError("Must provide team_id or abbrev")

    def list_teams(
        self, season: Optional[int] = None, event_id: Optional[int] = None
    ) -> List[Dict]:
        """
        List teams.

        Args:
            season: Filter by season
            event_id: Filter by event (sent as the 'event' query parameter)

        Returns:
            List of team dicts
        """
        params = []
        if season:
            params.append(("season", season))
        if event_id:
            params.append(("event", event_id))
        result = self.get("teams", params=params if params else None)
        return result.get("teams", [])

    # ====================
    # Card Operations
    # ====================

    def wipe_team_cards(self, team_id: int) -> Any:
        """
        Wipe all cards for a team (unassigns them).

        Args:
            team_id: Team ID

        Returns:
            API response
        """
        return self.post(f"cards/wipe-team/{team_id}")

    def list_cards(
        self, team_id: Optional[int] = None, player_id: Optional[int] = None
    ) -> List[Dict]:
        """
        List cards. At least one filter is required to avoid massive unfiltered queries.

        Args:
            team_id: Filter by team
            player_id: Filter by player

        Returns:
            List of card dicts

        Raises:
            ValueError: If no filter is given
        """
        if not team_id and not player_id:
            raise ValueError(
                "list_cards requires at least one filter (team_id or player_id)"
            )
        params = []
        if team_id:
            params.append(("team_id", team_id))
        if player_id:
            params.append(("player_id", player_id))
        result = self.get("cards", params=params if params else None)
        return result.get("cards", [])

    # ====================
    # Pack Operations
    # ====================

    def list_packs(
        self,
        team_id: Optional[int] = None,
        opened: Optional[bool] = None,
        new_to_old: bool = False,
        limit: Optional[int] = None,
        timeout: int = 10,
    ) -> List[Dict]:
        """
        List packs.

        Args:
            team_id: Filter by team
            opened: Filter by opened status (True=opened, False=unopened)
            new_to_old: Sort newest to oldest (default: False)
            limit: Maximum number of results (e.g., 200, 1000, 2000)
            timeout: Request timeout in seconds (default: 10, increase for large queries)

        Returns:
            List of pack dicts

        Raises:
            ValueError: If neither team_id nor opened is given

        Examples:
            # Get 200 most recently opened packs
            packs = api.list_packs(opened=True, new_to_old=True, limit=200)

            # Get unopened packs for a team
            packs = api.list_packs(team_id=69, opened=False)

            # Large query with extended timeout
            packs = api.list_packs(opened=True, limit=2000, timeout=30)
        """
        if team_id is None and opened is None:
            raise ValueError(
                "list_packs requires at least one filter (team_id or opened)"
            )
        params = []
        # Must mirror the guard above (`is None`): a truthiness test would drop
        # team_id=0 after it passed the guard and send an unfiltered query.
        if team_id is not None:
            params.append(("team_id", team_id))
        if opened is not None:
            params.append(("opened", "true" if opened else "false"))
        if new_to_old:
            params.append(("new_to_old", "true"))
        if limit:
            params.append(("limit", str(limit)))
        result = self.get("packs", params=params if params else None, timeout=timeout)
        return result.get("packs", [])

    def delete_pack(self, pack_id: int) -> str:
        """
        Delete a pack.

        Args:
            pack_id: Pack ID

        Returns:
            Raw response text from the API
        """
        return self.delete("packs", object_id=pack_id)

    def update_pack(
        self,
        pack_id: int,
        pack_cardset_id: Optional[int] = None,
        pack_team_id: Optional[int] = None,
        pack_type_id: Optional[int] = None,
    ) -> Dict:
        """
        Update pack properties (PATCH).

        Only arguments that are not None are sent.

        Args:
            pack_id: Pack ID
            pack_cardset_id: Update pack cardset (use -1 to clear)
            pack_team_id: Update pack team (use -1 to clear)
            pack_type_id: Update pack type

        Returns:
            Updated pack dict

        Example:
            # Fix missing cardset on Team Choice pack
            api.update_pack(pack_id=21207, pack_cardset_id=27)
        """
        candidates = (
            ("pack_cardset_id", pack_cardset_id),
            ("pack_team_id", pack_team_id),
            ("pack_type_id", pack_type_id),
        )
        params = [(key, value) for key, value in candidates if value is not None]
        return self.patch("packs", object_id=pack_id, params=params)

    def create_packs(self, packs: List[Dict]) -> Any:
        """
        Create packs (bulk distribution).

        Args:
            packs: List of pack dicts with keys: team_id, pack_type_id, pack_cardset_id

        Returns:
            API response

        Example:
            # Give 5 Standard packs to team 31
            api.create_packs([
                {'team_id': 31, 'pack_type_id': 1, 'pack_cardset_id': None}
                for _ in range(5)
            ])
        """
        payload = {"packs": packs}
        return self.post("packs", payload=payload)

    def get_packs_opened_today(self, limit: int = 2000, timeout: int = 30) -> Dict:
        """
        Get analytics on packs opened today (UTC).

        Fetches the most recently opened packs and counts those whose
        `open_time` falls on today's UTC date. Pack records that are missing
        fields or carry an unparseable timestamp are skipped (and reported in
        verbose mode) rather than counted.

        Args:
            limit: Number of recent packs to check (default: 2000)
            timeout: Request timeout in seconds (default: 30)

        Returns:
            Dict with keys:
            - total: Total packs opened today
            - teams: List of dicts with team info and pack counts,
              sorted by pack count descending
            - date: Today's UTC date in ISO format
            - note: Warning if limit was reached (only present in that case)

        Example:
            result = api.get_packs_opened_today()
            print(f"{result['total']} packs opened by {len(result['teams'])} teams")
        """
        # Get recent opened packs
        packs = self.list_packs(
            opened=True, new_to_old=True, limit=limit, timeout=timeout
        )

        # Today's date (UTC)
        today = datetime.now(timezone.utc).date()

        # Count packs by team
        teams_data = defaultdict(
            lambda: {"count": 0, "abbrev": "", "lname": "", "first": None, "last": None}
        )
        total = 0
        for pack in packs:
            open_time = pack.get("open_time")
            if not open_time:
                continue
            try:
                # open_time is divided by 1000 before conversion, i.e. it is
                # treated as a millisecond epoch timestamp.
                open_dt = datetime.fromtimestamp(open_time / 1000, tz=timezone.utc)
                if open_dt.date() != today:
                    continue
                # Read every required field BEFORE touching the counters so a
                # malformed record cannot inflate `total` or leave a
                # half-populated team entry behind.
                team = pack["team"]
                team_id = team["id"]
                abbrev = team["abbrev"]
                lname = team["lname"]
            except (KeyError, TypeError, ValueError, OverflowError, OSError) as exc:
                self._log(f"Skipping malformed pack record: {exc!r}")
                continue

            total += 1
            entry = teams_data[team_id]
            entry["abbrev"] = abbrev
            entry["lname"] = lname
            entry["count"] += 1
            if entry["first"] is None or open_dt < entry["first"]:
                entry["first"] = open_dt
            if entry["last"] is None or open_dt > entry["last"]:
                entry["last"] = open_dt

        # Format results
        teams_list = [
            {
                "team_id": team_id,
                "abbrev": data["abbrev"],
                "name": data["lname"],
                "packs": data["count"],
                "first_pack": data["first"].isoformat() if data["first"] else None,
                "last_pack": data["last"].isoformat() if data["last"] else None,
            }
            for team_id, data in teams_data.items()
        ]

        # Sort by pack count
        teams_list.sort(key=lambda x: x["packs"], reverse=True)

        result = {"total": total, "teams": teams_list, "date": today.isoformat()}
        # A full page means older packs from today may have been cut off.
        if len(packs) == limit:
            result["note"] = f"Hit limit of {limit} packs - actual count may be higher"
        return result

    def distribute_packs(
        self,
        num_packs: int = 5,
        exclude_team_abbrev: Optional[List[str]] = None,
        pack_type_id: int = 1,
        season: Optional[int] = None,
        cardset_id: Optional[int] = None,
    ) -> Dict:
        """
        Distribute packs to all human-controlled teams.

        A team qualifies when its `is_ai` flag is falsy, its abbreviation does
        not contain 'gauntlet' (case-insensitive), and it is not excluded.
        Distribution stops at the first failure and the error is re-raised;
        teams processed before the failure keep their packs.

        Args:
            num_packs: Number of packs to give to each team (default: 5); must be >= 1
            exclude_team_abbrev: List of team abbreviations to exclude (default: None)
            pack_type_id: Pack type ID (default: 1 = Standard packs)
            season: Season to distribute for (default: current season)
            cardset_id: Cardset ID for pack types that require it (e.g., Promo Choice = type 9)

        Returns:
            Dict with keys:
            - total_packs: Total packs distributed
            - teams_count: Number of teams that received packs
            - teams: List of teams that received packs

        Raises:
            ValueError: If num_packs is less than 1 or no API token is configured

        Example:
            # Give 10 packs to all teams
            result = api.distribute_packs(num_packs=10)

            # Give 11 packs to all teams except CAR
            result = api.distribute_packs(num_packs=11, exclude_team_abbrev=['CAR'])
        """
        # A non-positive count would POST empty pack lists and (for negative
        # values) report a negative total, so reject it up front.
        if num_packs < 1:
            raise ValueError("num_packs must be a positive integer")
        # Fail before any request is made rather than after the team lookup.
        self._require_token()

        # Convert to uppercase for case-insensitive matching
        exclude_team_abbrev = [abbrev.upper() for abbrev in exclude_team_abbrev or []]
        excluded = set(exclude_team_abbrev)

        # Get current season if not specified
        if season is None:
            current = self.get("current")
            season = current["season"]
        self._log(f"Distributing {num_packs} packs to season {season} teams")

        # Get all teams for season
        all_teams = self.list_teams(season=season)

        # Filter for human-controlled teams only
        qualifying_teams = []
        for team in all_teams:
            if team["is_ai"] or "gauntlet" in team["abbrev"].lower():
                continue
            # Check if team is in exclusion list
            if team["abbrev"].upper() in excluded:
                self._log(f"Excluding team {team['abbrev']}: {team['sname']}")
                continue
            qualifying_teams.append(team)
        self._log(f"Found {len(qualifying_teams)} qualifying teams")
        if exclude_team_abbrev:
            self._log(f"Excluded teams: {', '.join(exclude_team_abbrev)}")

        # Distribute packs to each team
        total_packs = 0
        for team in qualifying_teams:
            self._log(f"Giving {num_packs} packs to {team['abbrev']} ({team['sname']})")
            # Create pack payload
            packs = [
                {
                    "team_id": team["id"],
                    "pack_type_id": pack_type_id,
                    "pack_cardset_id": cardset_id,
                }
                for _ in range(num_packs)
            ]
            try:
                self.create_packs(packs)
                total_packs += num_packs
                self._log(
                    f" ✓ Successfully gave {num_packs} packs to {team['abbrev']}"
                )
            except Exception as e:
                self._log(f" ✗ Failed to give packs to {team['abbrev']}: {e}")
                raise

        result = {
            "total_packs": total_packs,
            "teams_count": len(qualifying_teams),
            "teams": qualifying_teams,
        }
        self._log(
            f"Distribution complete: {total_packs} packs to {len(qualifying_teams)} teams"
        )
        return result

    # ====================
    # Gauntlet Operations
    # ====================

    def list_gauntlet_runs(
        self,
        event_id: Optional[int] = None,
        team_id: Optional[int] = None,
        active_only: bool = False,
    ) -> List[Dict]:
        """
        List gauntlet runs.

        Args:
            event_id: Filter by event (sent as the 'gauntlet_id' query parameter)
            team_id: Filter by team
            active_only: Only show active runs

        Returns:
            List of run dicts
        """
        params = []
        if event_id:
            params.append(("gauntlet_id", event_id))
        if team_id:
            params.append(("team_id", team_id))
        if active_only:
            params.append(("is_active", "true"))
        result = self.get("gauntletruns", params=params if params else None)
        return result.get("runs", [])

    def end_gauntlet_run(self, run_id: int) -> Dict:
        """
        End a gauntlet run by sending a PATCH with ended=true.

        Args:
            run_id: Run ID

        Returns:
            Updated run dict
        """
        return self.patch("gauntletruns", object_id=run_id, params=[("ended", "true")])

    # ====================
    # Player Operations
    # ====================

    def get_player(self, player_id: int) -> Dict:
        """
        Get a player by ID.

        Args:
            player_id: Player ID

        Returns:
            Player dict
        """
        return self.get("players", object_id=player_id)

    def list_players(
        self,
        cardset_id: Optional[int] = None,
        rarity: Optional[str] = None,
        timeout: int = 30,
    ) -> List[Dict]:
        """
        List players. At least one filter is required to avoid massive unfiltered queries.

        Args:
            cardset_id: Filter by cardset (sent as the 'cardset' query parameter)
            rarity: Filter by rarity
            timeout: Request timeout in seconds (default: 30, player lists are large)

        Returns:
            List of player dicts

        Raises:
            ValueError: If no filter is given
        """
        if not cardset_id and not rarity:
            raise ValueError(
                "list_players requires at least one filter (cardset_id or rarity)"
            )
        params = []
        if cardset_id:
            params.append(("cardset", cardset_id))
        if rarity:
            params.append(("rarity", rarity))
        result = self.get("players", params=params, timeout=timeout)
        return result.get("players", [])

    # ====================
    # Result/Stats Operations
    # ====================

    def list_results(
        self, season: Optional[int] = None, team_id: Optional[int] = None
    ) -> List[Dict]:
        """
        List game results. At least one filter is required to avoid massive unfiltered queries.

        Args:
            season: Filter by season
            team_id: Filter by team

        Returns:
            List of result dicts

        Raises:
            ValueError: If no filter is given
        """
        if not season and not team_id:
            raise ValueError(
                "list_results requires at least one filter (season or team_id)"
            )
        params = []
        if season:
            params.append(("season", season))
        if team_id:
            params.append(("team_id", team_id))
        result = self.get("results", params=params if params else None)
        return result.get("results", [])

    # ====================
    # Helper Methods
    # ====================

    def find_gauntlet_teams(
        self, event_id: Optional[int] = None, active_only: bool = False
    ) -> List[Dict]:
        """
        Find gauntlet teams.

        With active_only=True the teams come from the active gauntlet runs.
        Otherwise all teams are listed and filtered to those whose abbreviation
        contains 'gauntlet' (case-insensitive, matching the rule that
        distribute_packs uses to exclude gauntlet teams).

        Args:
            event_id: Filter by event
            active_only: Only teams with active runs

        Returns:
            List of team dicts with run information: an 'active_run' key when
            active_only is True, otherwise a 'run' key for teams that have a
            run in the given event
        """
        if active_only:
            # Get active runs, then get teams
            runs = self.list_gauntlet_runs(event_id=event_id, active_only=True)
            teams_with_runs = []
            for run in runs:
                # Copy the team so attaching the run does not mutate
                # run["team"]; mutating in place would create a
                # run -> team -> run cycle that breaks json.dumps.
                team = dict(run["team"])
                team["active_run"] = run
                teams_with_runs.append(team)
            return teams_with_runs

        # Get all teams with 'Gauntlet' in abbrev
        all_teams = self.list_teams()
        gauntlet_teams = [
            t for t in all_teams if "gauntlet" in (t.get("abbrev") or "").lower()
        ]

        # Optionally add run info
        if event_id:
            runs = self.list_gauntlet_runs(event_id=event_id)
            run_by_team = {r["team"]["id"]: r for r in runs}
            for team in gauntlet_teams:
                if team["id"] in run_by_team:
                    team["run"] = run_by_team[team["id"]]
        return gauntlet_teams
def main():
    """
    Example usage: connect to the selected environment and list active gauntlet teams.

    Parses `--env` (prod/dev, default dev) and `--verbose`, builds a client,
    and prints how many teams currently have an active gauntlet run. Exits
    with status 1 when the client reports a ValueError or the HTTP request
    fails.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Paper Dynasty API Client")
    parser.add_argument(
        "--env", choices=["prod", "dev"], default="dev", help="Environment"
    )
    parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
    args = parser.parse_args()

    try:
        api = PaperDynastyAPI(environment=args.env, verbose=args.verbose)
        print(f"✓ Connected to {args.env.upper()} database: {api.base_url}")

        # Example: List gauntlet teams
        print("\nExample: Listing gauntlet teams...")
        teams = api.find_gauntlet_teams(active_only=True)
        print(f"Found {len(teams)} active gauntlet teams")
    except (ValueError, requests.RequestException) as e:
        # Network and HTTP failures surface as requests exceptions; report
        # them the same way as client-side ValueErrors instead of letting a
        # raw traceback escape.
        print(f"❌ Error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()