#!/usr/bin/env python3
"""
Paper Dynasty API Client

Shared API client for all Paper Dynasty operations.
Provides methods for interacting with teams, players, cards, gauntlets, and more.

Environment Variables:
    API_TOKEN: Bearer token for API authentication. Only required for write
        operations (POST/PATCH/DELETE); used when no token is passed to the
        client constructor.
    DATABASE: 'prod' or 'dev' (default: dev). Not read by this module itself;
        the environment is selected via the ``environment`` constructor
        argument or the ``--env`` command-line flag.
"""

import argparse
import os
import sys
from datetime import date, datetime, timezone
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urlencode

import requests

PROD_BASE_URL = "https://pd.manticorum.com/api"
DEV_BASE_URL = "https://pddev.manticorum.com/api"

# Query parameters are passed around as an ordered list of (key, value) pairs.
QueryParams = List[Tuple[str, Any]]


class PaperDynastyAPI:
    """
    Paper Dynasty API client for remote database access

    Usage:
        api = PaperDynastyAPI(environment='prod')

        # Get a team
        team = api.get_team(abbrev='SKB')

        # List gauntlet runs
        runs = api.list_gauntlet_runs(event_id=8, active_only=True)

        # Wipe team cards
        api.wipe_team_cards(team_id=464)
    """

    def __init__(
        self,
        environment: str = "dev",
        token: Optional[str] = None,
        verbose: bool = False,
    ):
        """
        Initialize API client

        Args:
            environment: 'prod' or 'dev'. Matching is case-insensitive and by
                substring: any value containing "prod" selects the production
                URL, everything else selects the dev URL.
            token: API token (defaults to API_TOKEN env var). Only required
                for write operations (POST/PATCH/DELETE).
            verbose: Print request/response details
        """
        self.env = environment.lower()
        self.base_url = PROD_BASE_URL if "prod" in self.env else DEV_BASE_URL
        self.token = token or os.getenv("API_TOKEN")
        self.verbose = verbose
        self.headers = {"Content-Type": "application/json"}
        if self.token:
            self.headers["Authorization"] = f"Bearer {self.token}"

    def _require_token(self):
        """
        Raise if no API token is set (needed for write operations)

        Raises:
            ValueError: If neither the constructor nor API_TOKEN supplied a token
        """
        if not self.token:
            raise ValueError(
                "API_TOKEN required for write operations. "
                "Set it with: export API_TOKEN='your-token-here'"
            )

    def _log(self, message: str):
        """Print message if verbose mode enabled"""
        if self.verbose:
            print(f"[API] {message}")

    def _build_url(
        self,
        endpoint: str,
        api_ver: int = 2,
        object_id: Optional[int] = None,
        params: Optional[List] = None,
    ) -> str:
        """
        Build API URL with parameters

        Args:
            endpoint: Path below the versioned API root (e.g. 'teams')
            api_ver: API version number used in the '/v{n}/' path segment
            object_id: Optional object ID appended as a trailing path segment
            params: Optional list of (key, value) pairs for the query string

        Returns:
            Fully-qualified URL. Query keys and values are percent-encoded so
            that spaces, '&', '=' and similar characters cannot corrupt the
            query string.
        """
        url = f"{self.base_url}/v{api_ver}/{endpoint}"
        if object_id is not None:
            url += f"/{object_id}"
        if params:
            # Normalise to tuples: urlencode rejects a list of lists, which
            # the previous hand-rolled join accepted.
            url += "?" + urlencode([(key, value) for key, value in params])
        return url

    # ====================
    # Low-level HTTP methods
    # ====================

    def get(
        self,
        endpoint: str,
        object_id: Optional[int] = None,
        params: Optional[List] = None,
        timeout: int = 10,
    ) -> Dict:
        """
        GET request to API

        Args:
            endpoint: API endpoint (e.g. 'teams')
            object_id: Optional object ID appended to the path
            params: Optional list of (key, value) query parameters
            timeout: Request timeout in seconds

        Returns:
            Decoded JSON response body

        Raises:
            requests.HTTPError: If the response status indicates an error
        """
        url = self._build_url(endpoint, object_id=object_id, params=params)
        self._log(f"GET {url}")
        response = requests.get(url, headers=self.headers, timeout=timeout)
        response.raise_for_status()
        return response.json()

    def post(
        self, endpoint: str, payload: Optional[Dict] = None, timeout: int = 10
    ) -> Any:
        """
        POST request to API

        Args:
            endpoint: API endpoint
            payload: Optional JSON-serializable request body
            timeout: Request timeout in seconds

        Returns:
            Decoded JSON response body, or an empty dict when the response
            has no body

        Raises:
            ValueError: If no API token is configured
            requests.HTTPError: If the response status indicates an error
        """
        self._require_token()
        url = self._build_url(endpoint)
        self._log(f"POST {url}")
        response = requests.post(
            url, headers=self.headers, json=payload, timeout=timeout
        )
        response.raise_for_status()
        return response.json() if response.text else {}

    def patch(
        self, endpoint: str, object_id: int, params: List, timeout: int = 10
    ) -> Dict:
        """
        PATCH request to API

        The update is carried entirely in the query string; no request body
        is sent.

        Args:
            endpoint: API endpoint
            object_id: ID of the object to update
            params: List of (key, value) pairs describing the update
            timeout: Request timeout in seconds

        Returns:
            Decoded JSON response body

        Raises:
            ValueError: If no API token is configured
            requests.HTTPError: If the response status indicates an error
        """
        self._require_token()
        url = self._build_url(endpoint, object_id=object_id, params=params)
        self._log(f"PATCH {url}")
        response = requests.patch(url, headers=self.headers, timeout=timeout)
        response.raise_for_status()
        return response.json()

    def delete(self, endpoint: str, object_id: int, timeout: int = 10) -> str:
        """
        DELETE request to API

        Args:
            endpoint: API endpoint
            object_id: ID of the object to delete
            timeout: Request timeout in seconds

        Returns:
            Raw response text

        Raises:
            ValueError: If no API token is configured
            requests.HTTPError: If the response status indicates an error
        """
        self._require_token()
        url = self._build_url(endpoint, object_id=object_id)
        self._log(f"DELETE {url}")
        response = requests.delete(url, headers=self.headers, timeout=timeout)
        response.raise_for_status()
        return response.text

    # ====================
    # Team Operations
    # ====================

    def get_team(
        self, team_id: Optional[int] = None, abbrev: Optional[str] = None
    ) -> Dict:
        """
        Get a team by ID or abbreviation

        Args:
            team_id: Team ID (takes precedence when both are given)
            abbrev: Team abbreviation (e.g., 'SKB'); upper-cased before lookup

        Returns:
            Team dict

        Raises:
            ValueError: If neither argument is given, or no team matches abbrev
        """
        # `is not None` rather than truthiness so an ID of 0 is still treated
        # as "provided" instead of falling through to the error below.
        if team_id is not None:
            return self.get("teams", object_id=team_id)
        if abbrev:
            result = self.get("teams", params=[("abbrev", abbrev.upper())])
            teams = result.get("teams", [])
            if not teams:
                raise ValueError(f"Team '{abbrev}' not found")
            return teams[0]
        raise ValueError("Must provide team_id or abbrev")

    def list_teams(
        self, season: Optional[int] = None, event_id: Optional[int] = None
    ) -> List[Dict]:
        """
        List teams

        Args:
            season: Filter by season
            event_id: Filter by event

        Returns:
            List of team dicts
        """
        params = []
        if season is not None:
            params.append(("season", season))
        if event_id is not None:
            params.append(("event", event_id))
        result = self.get("teams", params=params or None)
        return result.get("teams", [])

    # ====================
    # Card Operations
    # ====================

    def wipe_team_cards(self, team_id: int) -> Any:
        """
        Wipe all cards for a team (unassigns them)

        Args:
            team_id: Team ID

        Returns:
            API response
        """
        return self.post(f"cards/wipe-team/{team_id}")

    def list_cards(
        self, team_id: Optional[int] = None, player_id: Optional[int] = None
    ) -> List[Dict]:
        """
        List cards. At least one filter is required to avoid massive unfiltered queries.

        Args:
            team_id: Filter by team
            player_id: Filter by player

        Returns:
            List of card dicts

        Raises:
            ValueError: If no filter is given
        """
        if team_id is None and player_id is None:
            raise ValueError(
                "list_cards requires at least one filter (team_id or player_id)"
            )
        params = []
        if team_id is not None:
            params.append(("team_id", team_id))
        if player_id is not None:
            params.append(("player_id", player_id))
        result = self.get("cards", params=params)
        return result.get("cards", [])

    # ====================
    # Pack Operations
    # ====================

    def list_packs(
        self,
        team_id: Optional[int] = None,
        opened: Optional[bool] = None,
        new_to_old: bool = False,
        limit: Optional[int] = None,
        timeout: int = 10,
    ) -> List[Dict]:
        """
        List packs

        Args:
            team_id: Filter by team
            opened: Filter by opened status (True=opened, False=unopened)
            new_to_old: Sort newest to oldest (default: False)
            limit: Maximum number of results (e.g., 200, 1000, 2000);
                None or 0 sends no limit
            timeout: Request timeout in seconds (default: 10, increase for large queries)

        Returns:
            List of pack dicts

        Raises:
            ValueError: If neither team_id nor opened is given

        Examples:
            # Get 200 most recently opened packs
            packs = api.list_packs(opened=True, new_to_old=True, limit=200)

            # Get unopened packs for a team
            packs = api.list_packs(team_id=69, opened=False)

            # Large query with extended timeout
            packs = api.list_packs(opened=True, limit=2000, timeout=30)
        """
        if team_id is None and opened is None:
            raise ValueError(
                "list_packs requires at least one filter (team_id or opened)"
            )
        params = []
        # Must mirror the guard above: a truthiness test here would let
        # team_id=0 pass the guard and then send an unfiltered query.
        if team_id is not None:
            params.append(("team_id", team_id))
        if opened is not None:
            params.append(("opened", "true" if opened else "false"))
        if new_to_old:
            params.append(("new_to_old", "true"))
        if limit:
            params.append(("limit", str(limit)))
        result = self.get("packs", params=params, timeout=timeout)
        return result.get("packs", [])

    def delete_pack(self, pack_id: int) -> str:
        """
        Delete a pack

        Args:
            pack_id: Pack ID

        Returns:
            Success message
        """
        return self.delete("packs", object_id=pack_id)

    def update_pack(
        self,
        pack_id: int,
        pack_cardset_id: Optional[int] = None,
        pack_team_id: Optional[int] = None,
        pack_type_id: Optional[int] = None,
    ) -> Dict:
        """
        Update pack properties (PATCH)

        Args:
            pack_id: Pack ID
            pack_cardset_id: Update pack cardset (use -1 to clear)
            pack_team_id: Update pack team (use -1 to clear)
            pack_type_id: Update pack type

        Returns:
            Updated pack dict

        Example:
            # Fix missing cardset on Team Choice pack
            api.update_pack(pack_id=21207, pack_cardset_id=27)
        """
        candidates = (
            ("pack_cardset_id", pack_cardset_id),
            ("pack_team_id", pack_team_id),
            ("pack_type_id", pack_type_id),
        )
        params = [(key, value) for key, value in candidates if value is not None]
        return self.patch("packs", object_id=pack_id, params=params)

    def create_packs(self, packs: List[Dict]) -> Any:
        """
        Create packs (bulk distribution)

        Args:
            packs: List of pack dicts with keys: team_id, pack_type_id, pack_cardset_id

        Returns:
            API response

        Example:
            # Give 5 Standard packs to team 31
            api.create_packs([
                {'team_id': 31, 'pack_type_id': 1, 'pack_cardset_id': None}
                for _ in range(5)
            ])
        """
        return self.post("packs", payload={"packs": packs})

    @staticmethod
    def _tally_packs_opened_on(
        packs: List[Dict], day: date
    ) -> Tuple[int, List[Dict]]:
        """
        Count packs opened on a given UTC date, grouped by team

        Args:
            packs: Pack dicts; 'open_time' is read as epoch milliseconds and
                'team' as a dict with 'id', 'abbrev' and 'lname'
            day: UTC calendar date to count

        Returns:
            Tuple of (total, teams, malformed):
                - total: Packs whose open_time falls on `day`
                - teams: Per-team summaries sorted by pack count, descending
                - malformed: Records skipped because open_time could not be
                  parsed, plus records counted in total whose team info was
                  missing or malformed (these are not attributed to any team)
        """
        total = 0
        malformed = 0
        per_team: Dict[Any, Dict[str, Any]] = {}

        for pack in packs:
            open_time = pack.get("open_time")
            if not open_time:
                continue
            try:
                opened_at = datetime.fromtimestamp(open_time / 1000, tz=timezone.utc)
            except (TypeError, ValueError, OverflowError, OSError):
                malformed += 1
                continue
            if opened_at.date() != day:
                continue

            total += 1
            try:
                team = pack["team"]
                team_id, abbrev, lname = team["id"], team["abbrev"], team["lname"]
                entry = per_team.setdefault(
                    team_id, {"count": 0, "first": opened_at, "last": opened_at}
                )
            except (KeyError, TypeError):
                # Read every field before touching per_team so a bad record
                # can never leave a half-filled team entry behind.
                malformed += 1
                continue

            entry["abbrev"] = abbrev
            entry["lname"] = lname
            entry["count"] += 1
            entry["first"] = min(entry["first"], opened_at)
            entry["last"] = max(entry["last"], opened_at)

        teams = [
            {
                "team_id": team_id,
                "abbrev": entry["abbrev"],
                "name": entry["lname"],
                "packs": entry["count"],
                "first_pack": entry["first"].isoformat(),
                "last_pack": entry["last"].isoformat(),
            }
            for team_id, entry in per_team.items()
        ]
        teams.sort(key=lambda summary: summary["packs"], reverse=True)
        return total, teams, malformed

    def get_packs_opened_today(self, limit: int = 2000, timeout: int = 30) -> Dict:
        """
        Get analytics on packs opened today

        "Today" is the current calendar date in UTC.

        Args:
            limit: Number of recent packs to check (default: 2000)
            timeout: Request timeout in seconds (default: 30)

        Returns:
            Dict with keys:
                - total: Total packs opened today
                - teams: List of dicts with team info and pack counts
                - date: Today's UTC date in ISO format
                - note: Warning if limit was reached (only present in that case)

        Example:
            result = api.get_packs_opened_today()
            print(f"{result['total']} packs opened by {len(result['teams'])} teams")
        """
        packs = self.list_packs(
            opened=True, new_to_old=True, limit=limit, timeout=timeout
        )
        today = datetime.now(timezone.utc).date()

        total, teams_list, malformed = self._tally_packs_opened_on(packs, today)
        if malformed:
            self._log(f"Ignored {malformed} malformed pack record(s)")

        result = {"total": total, "teams": teams_list, "date": today.isoformat()}
        if len(packs) == limit:
            result["note"] = f"Hit limit of {limit} packs - actual count may be higher"
        return result

    def distribute_packs(
        self,
        num_packs: int = 5,
        exclude_team_abbrev: Optional[List[str]] = None,
        pack_type_id: int = 1,
        season: Optional[int] = None,
        cardset_id: Optional[int] = None,
    ) -> Dict:
        """
        Distribute packs to all human-controlled teams

        A team qualifies when its 'is_ai' flag is falsy, its abbreviation does
        not contain "gauntlet" (case-insensitive) and it is not excluded.
        If creating packs fails for any team the exception is re-raised
        immediately; teams processed before the failure keep their packs.

        Args:
            num_packs: Number of packs to give to each team (default: 5)
            exclude_team_abbrev: List of team abbreviations to exclude (default: None)
            pack_type_id: Pack type ID (default: 1 = Standard packs)
            season: Season to distribute for (default: current season)
            cardset_id: Cardset ID for pack types that require it (e.g., Promo Choice = type 9)

        Returns:
            Dict with keys:
                - total_packs: Total packs distributed
                - teams_count: Number of teams that received packs
                - teams: List of teams that received packs

        Example:
            # Give 10 packs to all teams
            result = api.distribute_packs(num_packs=10)

            # Give 11 packs to all teams except CAR
            result = api.distribute_packs(num_packs=11, exclude_team_abbrev=['CAR'])
        """
        # Upper-case once for case-insensitive matching; keep the list for
        # stable log output and a set for membership tests.
        excluded_list = [abbrev.upper() for abbrev in (exclude_team_abbrev or [])]
        excluded = set(excluded_list)

        if season is None:
            season = self.get("current")["season"]

        self._log(f"Distributing {num_packs} packs to season {season} teams")

        qualifying_teams = []
        for team in self.list_teams(season=season):
            if team["is_ai"] or "gauntlet" in team["abbrev"].lower():
                continue
            if team["abbrev"].upper() in excluded:
                self._log(f"Excluding team {team['abbrev']}: {team['sname']}")
                continue
            qualifying_teams.append(team)

        self._log(f"Found {len(qualifying_teams)} qualifying teams")
        if excluded_list:
            self._log(f"Excluded teams: {', '.join(excluded_list)}")

        total_packs = 0
        for team in qualifying_teams:
            self._log(f"Giving {num_packs} packs to {team['abbrev']} ({team['sname']})")
            packs = [
                {
                    "team_id": team["id"],
                    "pack_type_id": pack_type_id,
                    "pack_cardset_id": cardset_id,
                }
                for _ in range(num_packs)
            ]
            try:
                self.create_packs(packs)
            except Exception as e:
                # Log for context, then propagate: the caller must know the
                # distribution stopped part-way through.
                self._log(f" ✗ Failed to give packs to {team['abbrev']}: {e}")
                raise
            total_packs += num_packs
            self._log(f" ✓ Successfully gave {num_packs} packs to {team['abbrev']}")

        self._log(
            f"Distribution complete: {total_packs} packs to {len(qualifying_teams)} teams"
        )
        return {
            "total_packs": total_packs,
            "teams_count": len(qualifying_teams),
            "teams": qualifying_teams,
        }

    # ====================
    # Gauntlet Operations
    # ====================

    def list_gauntlet_runs(
        self,
        event_id: Optional[int] = None,
        team_id: Optional[int] = None,
        active_only: bool = False,
    ) -> List[Dict]:
        """
        List gauntlet runs

        Args:
            event_id: Filter by event (sent as the 'gauntlet_id' parameter)
            team_id: Filter by team
            active_only: Only show active runs

        Returns:
            List of run dicts
        """
        params = []
        if event_id is not None:
            params.append(("gauntlet_id", event_id))
        if team_id is not None:
            params.append(("team_id", team_id))
        if active_only:
            params.append(("is_active", "true"))
        result = self.get("gauntletruns", params=params or None)
        return result.get("runs", [])

    def end_gauntlet_run(self, run_id: int) -> Dict:
        """
        End a gauntlet run by setting ended timestamp

        Args:
            run_id: Run ID

        Returns:
            Updated run dict
        """
        return self.patch("gauntletruns", object_id=run_id, params=[("ended", "true")])

    # ====================
    # Player Operations
    # ====================

    def get_player(self, player_id: int) -> Dict:
        """
        Get a player by ID

        Args:
            player_id: Player ID

        Returns:
            Player dict
        """
        return self.get("players", object_id=player_id)

    def list_players(
        self,
        cardset_id: Optional[int] = None,
        rarity: Optional[str] = None,
        timeout: int = 30,
    ) -> List[Dict]:
        """
        List players. At least one filter is required to avoid massive unfiltered queries.

        Args:
            cardset_id: Filter by cardset (sent as the 'cardset' parameter)
            rarity: Filter by rarity; an empty string counts as no filter
            timeout: Request timeout in seconds (default: 30, player lists are large)

        Returns:
            List of player dicts

        Raises:
            ValueError: If no filter is given
        """
        if cardset_id is None and not rarity:
            raise ValueError(
                "list_players requires at least one filter (cardset_id or rarity)"
            )
        params = []
        if cardset_id is not None:
            params.append(("cardset", cardset_id))
        if rarity:
            params.append(("rarity", rarity))
        result = self.get("players", params=params, timeout=timeout)
        return result.get("players", [])

    # ====================
    # Result/Stats Operations
    # ====================

    def list_results(
        self, season: Optional[int] = None, team_id: Optional[int] = None
    ) -> List[Dict]:
        """
        List game results. At least one filter is required to avoid massive unfiltered queries.

        Args:
            season: Filter by season
            team_id: Filter by team

        Returns:
            List of result dicts

        Raises:
            ValueError: If no filter is given
        """
        if season is None and team_id is None:
            raise ValueError(
                "list_results requires at least one filter (season or team_id)"
            )
        params = []
        if season is not None:
            params.append(("season", season))
        if team_id is not None:
            params.append(("team_id", team_id))
        result = self.get("results", params=params)
        return result.get("results", [])

    # ====================
    # Helper Methods
    # ====================

    def find_gauntlet_teams(
        self, event_id: Optional[int] = None, active_only: bool = False
    ) -> List[Dict]:
        """
        Find gauntlet teams (teams with 'Gauntlet' in abbrev, case-insensitive)

        Args:
            event_id: Filter by event
            active_only: Only teams with active runs

        Returns:
            List of team dicts with run information. With active_only=True
            each team carries its run under 'active_run'; otherwise, when
            event_id is given, teams that have a run for that event carry it
            under 'run'.
        """
        if active_only:
            teams_with_runs = []
            for run in self.list_gauntlet_runs(event_id=event_id, active_only=True):
                # Copy the team: attaching the run to run["team"] itself would
                # create a circular reference (team -> run -> team), which
                # breaks JSON serialization of the result.
                team = dict(run["team"])
                team["active_run"] = run
                teams_with_runs.append(team)
            return teams_with_runs

        # Case-insensitive to agree with the gauntlet test in
        # distribute_packs; `or ""` tolerates an explicit null abbrev.
        gauntlet_teams = [
            team
            for team in self.list_teams()
            if "gauntlet" in (team.get("abbrev") or "").lower()
        ]

        if event_id is not None:
            runs = self.list_gauntlet_runs(event_id=event_id)
            run_by_team = {run["team"]["id"]: run for run in runs}
            for team in gauntlet_teams:
                if team["id"] in run_by_team:
                    team["run"] = run_by_team[team["id"]]

        return gauntlet_teams


def main():
    """
    Example usage

    Builds a client for the environment chosen with --env and lists active
    gauntlet teams. Exits with status 1 on a configuration error or a failed
    API request.
    """
    parser = argparse.ArgumentParser(description="Paper Dynasty API Client")
    parser.add_argument(
        "--env", choices=["prod", "dev"], default="dev", help="Environment"
    )
    parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
    args = parser.parse_args()

    try:
        api = PaperDynastyAPI(environment=args.env, verbose=args.verbose)
        print(f"✓ Connected to {args.env.upper()} database: {api.base_url}")

        # Example: List gauntlet teams
        print("\nExample: Listing gauntlet teams...")
        teams = api.find_gauntlet_teams(active_only=True)
        print(f"Found {len(teams)} active gauntlet teams")
    except (ValueError, requests.RequestException) as e:
        # Network/HTTP failures are reported like configuration errors
        # instead of surfacing as a raw traceback.
        print(f"❌ Error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()