#!/usr/bin/env python3 """ Paper Dynasty API Client Shared API client for all Paper Dynasty operations. Provides methods for interacting with teams, players, cards, gauntlets, and more. Environment Variables: API_TOKEN: Bearer token for API authentication (required) DATABASE: 'prod' or 'dev' (default: dev) """ import os import sys from typing import Optional, Dict, List, Any import requests class PaperDynastyAPI: """ Paper Dynasty API client for remote database access Usage: api = PaperDynastyAPI(environment='prod') # Get a team team = api.get_team(abbrev='SKB') # List gauntlet runs runs = api.list_gauntlet_runs(event_id=8, active_only=True) # Wipe team cards api.wipe_team_cards(team_id=464) """ def __init__(self, environment: str = 'dev', token: Optional[str] = None, verbose: bool = False): """ Initialize API client Args: environment: 'prod' or 'dev' token: API token (defaults to API_TOKEN env var). Only required for write operations (POST/PATCH/DELETE). verbose: Print request/response details """ self.env = environment.lower() self.base_url = ( 'https://pd.manticorum.com/api' if 'prod' in self.env else 'https://pddev.manticorum.com/api' ) self.token = token or os.getenv('API_TOKEN') self.verbose = verbose self.headers = {'Content-Type': 'application/json'} if self.token: self.headers['Authorization'] = f'Bearer {self.token}' def _require_token(self): """Raise if no API token is set (needed for write operations)""" if not self.token: raise ValueError( "API_TOKEN required for write operations. " "Set it with: export API_TOKEN='your-token-here'" ) def _log(self, message: str): """Print message if verbose mode enabled""" if self.verbose: print(f"[API] {message}") def _build_url(self, endpoint: str, api_ver: int = 2, object_id: Optional[int] = None, params: Optional[List] = None) -> str: """Build API URL with parameters""" url = f'{self.base_url}/v{api_ver}/{endpoint}' if object_id is not None: url += f'/{object_id}' if params: param_strs = [f'{k}={v}' for k, v in params] url += '?' + '&'.join(param_strs) return url # ==================== # Low-level HTTP methods # ==================== def get(self, endpoint: str, object_id: Optional[int] = None, params: Optional[List] = None, timeout: int = 10) -> Dict: """GET request to API""" url = self._build_url(endpoint, object_id=object_id, params=params) self._log(f"GET {url}") response = requests.get(url, headers=self.headers, timeout=timeout) response.raise_for_status() return response.json() def post(self, endpoint: str, payload: Optional[Dict] = None, timeout: int = 10) -> Any: """POST request to API""" self._require_token() url = self._build_url(endpoint) self._log(f"POST {url}") response = requests.post(url, headers=self.headers, json=payload, timeout=timeout) response.raise_for_status() return response.json() if response.text else {} def patch(self, endpoint: str, object_id: int, params: List, timeout: int = 10) -> Dict: """PATCH request to API""" self._require_token() url = self._build_url(endpoint, object_id=object_id, params=params) self._log(f"PATCH {url}") response = requests.patch(url, headers=self.headers, timeout=timeout) response.raise_for_status() return response.json() def delete(self, endpoint: str, object_id: int, timeout: int = 10) -> str: """DELETE request to API""" self._require_token() url = self._build_url(endpoint, object_id=object_id) self._log(f"DELETE {url}") response = requests.delete(url, headers=self.headers, timeout=timeout) response.raise_for_status() return response.text # ==================== # Team Operations # ==================== def get_team(self, team_id: Optional[int] = None, abbrev: Optional[str] = None) -> Dict: """ Get a team by ID or abbreviation Args: team_id: Team ID abbrev: Team abbreviation (e.g., 'SKB') Returns: Team dict """ if team_id: return self.get('teams', object_id=team_id) elif abbrev: result = self.get('teams', params=[('abbrev', abbrev.upper())]) teams = result.get('teams', []) if not teams: raise ValueError(f"Team '{abbrev}' not found") return teams[0] else: raise ValueError("Must provide team_id or abbrev") def list_teams(self, season: Optional[int] = None, event_id: Optional[int] = None) -> List[Dict]: """ List teams Args: season: Filter by season event_id: Filter by event Returns: List of team dicts """ params = [] if season: params.append(('season', season)) if event_id: params.append(('event', event_id)) result = self.get('teams', params=params if params else None) return result.get('teams', []) # ==================== # Card Operations # ==================== def wipe_team_cards(self, team_id: int) -> Any: """ Wipe all cards for a team (unassigns them) Args: team_id: Team ID Returns: API response """ return self.post(f'cards/wipe-team/{team_id}') def list_cards(self, team_id: Optional[int] = None, player_id: Optional[int] = None) -> List[Dict]: """ List cards. At least one filter is required to avoid massive unfiltered queries. Args: team_id: Filter by team player_id: Filter by player Returns: List of card dicts """ if not team_id and not player_id: raise ValueError("list_cards requires at least one filter (team_id or player_id)") params = [] if team_id: params.append(('team_id', team_id)) if player_id: params.append(('player_id', player_id)) result = self.get('cards', params=params if params else None) return result.get('cards', []) # ==================== # Pack Operations # ==================== def list_packs( self, team_id: Optional[int] = None, opened: Optional[bool] = None, new_to_old: bool = False, limit: Optional[int] = None, timeout: int = 10 ) -> List[Dict]: """ List packs Args: team_id: Filter by team opened: Filter by opened status (True=opened, False=unopened) new_to_old: Sort newest to oldest (default: False) limit: Maximum number of results (e.g., 200, 1000, 2000) timeout: Request timeout in seconds (default: 10, increase for large queries) Returns: List of pack dicts Examples: # Get 200 most recently opened packs packs = api.list_packs(opened=True, new_to_old=True, limit=200) # Get unopened packs for a team packs = api.list_packs(team_id=69, opened=False) # Large query with extended timeout packs = api.list_packs(opened=True, limit=2000, timeout=30) """ if team_id is None and opened is None: raise ValueError("list_packs requires at least one filter (team_id or opened)") params = [] if team_id: params.append(('team_id', team_id)) if opened is not None: params.append(('opened', 'true' if opened else 'false')) if new_to_old: params.append(('new_to_old', 'true')) if limit: params.append(('limit', str(limit))) result = self.get('packs', params=params if params else None, timeout=timeout) return result.get('packs', []) def delete_pack(self, pack_id: int) -> str: """ Delete a pack Args: pack_id: Pack ID Returns: Success message """ return self.delete('packs', object_id=pack_id) def update_pack(self, pack_id: int, pack_cardset_id: Optional[int] = None, pack_team_id: Optional[int] = None, pack_type_id: Optional[int] = None) -> Dict: """ Update pack properties (PATCH) Args: pack_id: Pack ID pack_cardset_id: Update pack cardset (use -1 to clear) pack_team_id: Update pack team (use -1 to clear) pack_type_id: Update pack type Returns: Updated pack dict Example: # Fix missing cardset on Team Choice pack api.update_pack(pack_id=21207, pack_cardset_id=27) """ params = [] if pack_cardset_id is not None: params.append(('pack_cardset_id', pack_cardset_id)) if pack_team_id is not None: params.append(('pack_team_id', pack_team_id)) if pack_type_id is not None: params.append(('pack_type_id', pack_type_id)) return self.patch('packs', object_id=pack_id, params=params) def create_packs(self, packs: List[Dict]) -> Any: """ Create packs (bulk distribution) Args: packs: List of pack dicts with keys: team_id, pack_type_id, pack_cardset_id Returns: API response Example: # Give 5 Standard packs to team 31 api.create_packs([ {'team_id': 31, 'pack_type_id': 1, 'pack_cardset_id': None} for _ in range(5) ]) """ payload = {'packs': packs} return self.post('packs', payload=payload) def get_packs_opened_today(self, limit: int = 2000, timeout: int = 30) -> Dict: """ Get analytics on packs opened today Args: limit: Number of recent packs to check (default: 2000) timeout: Request timeout in seconds (default: 30) Returns: Dict with keys: - total: Total packs opened today - teams: List of dicts with team info and pack counts - note: Warning if limit was reached Example: result = api.get_packs_opened_today() print(f"{result['total']} packs opened by {len(result['teams'])} teams") """ from datetime import datetime, timezone from collections import defaultdict # Get recent opened packs packs = self.list_packs(opened=True, new_to_old=True, limit=limit, timeout=timeout) # Today's date (UTC) today = datetime.now(timezone.utc).date() # Count packs by team teams_data = defaultdict(lambda: {'count': 0, 'abbrev': '', 'lname': '', 'first': None, 'last': None}) total = 0 for pack in packs: if pack.get('open_time'): try: open_dt = datetime.fromtimestamp(pack['open_time'] / 1000, tz=timezone.utc) if open_dt.date() == today: total += 1 team_id = pack['team']['id'] teams_data[team_id]['abbrev'] = pack['team']['abbrev'] teams_data[team_id]['lname'] = pack['team']['lname'] teams_data[team_id]['count'] += 1 if teams_data[team_id]['first'] is None or open_dt < teams_data[team_id]['first']: teams_data[team_id]['first'] = open_dt if teams_data[team_id]['last'] is None or open_dt > teams_data[team_id]['last']: teams_data[team_id]['last'] = open_dt except Exception: pass # Format results teams_list = [] for team_id, data in teams_data.items(): teams_list.append({ 'team_id': team_id, 'abbrev': data['abbrev'], 'name': data['lname'], 'packs': data['count'], 'first_pack': data['first'].isoformat() if data['first'] else None, 'last_pack': data['last'].isoformat() if data['last'] else None }) # Sort by pack count teams_list.sort(key=lambda x: x['packs'], reverse=True) result = { 'total': total, 'teams': teams_list, 'date': today.isoformat() } if len(packs) == limit: result['note'] = f'Hit limit of {limit} packs - actual count may be higher' return result def distribute_packs( self, num_packs: int = 5, exclude_team_abbrev: Optional[List[str]] = None, pack_type_id: int = 1, season: Optional[int] = None ) -> Dict: """ Distribute packs to all human-controlled teams Args: num_packs: Number of packs to give to each team (default: 5) exclude_team_abbrev: List of team abbreviations to exclude (default: None) pack_type_id: Pack type ID (default: 1 = Standard packs) season: Season to distribute for (default: current season) Returns: Dict with keys: - total_packs: Total packs distributed - teams_count: Number of teams that received packs - teams: List of teams that received packs Example: # Give 10 packs to all teams result = api.distribute_packs(num_packs=10) # Give 11 packs to all teams except CAR result = api.distribute_packs(num_packs=11, exclude_team_abbrev=['CAR']) """ if exclude_team_abbrev is None: exclude_team_abbrev = [] # Convert to uppercase for case-insensitive matching exclude_team_abbrev = [abbrev.upper() for abbrev in exclude_team_abbrev] # Get current season if not specified if season is None: current = self.get('current') season = current['season'] self._log(f"Distributing {num_packs} packs to season {season} teams") # Get all teams for season all_teams = self.list_teams(season=season) # Filter for human-controlled teams only qualifying_teams = [] for team in all_teams: if not team['is_ai'] and 'gauntlet' not in team['abbrev'].lower(): # Check if team is in exclusion list if team['abbrev'].upper() in exclude_team_abbrev: self._log(f"Excluding team {team['abbrev']}: {team['sname']}") continue qualifying_teams.append(team) self._log(f"Found {len(qualifying_teams)} qualifying teams") if exclude_team_abbrev: self._log(f"Excluded teams: {', '.join(exclude_team_abbrev)}") # Distribute packs to each team total_packs = 0 for team in qualifying_teams: self._log(f"Giving {num_packs} packs to {team['abbrev']} ({team['sname']})") # Create pack payload packs = [{ 'team_id': team['id'], 'pack_type_id': pack_type_id, 'pack_cardset_id': None } for _ in range(num_packs)] try: self.create_packs(packs) total_packs += num_packs self._log(f" ✓ Successfully gave {num_packs} packs to {team['abbrev']}") except Exception as e: self._log(f" ✗ Failed to give packs to {team['abbrev']}: {e}") raise result = { 'total_packs': total_packs, 'teams_count': len(qualifying_teams), 'teams': qualifying_teams } self._log(f"Distribution complete: {total_packs} packs to {len(qualifying_teams)} teams") return result # ==================== # Gauntlet Operations # ==================== def list_gauntlet_runs(self, event_id: Optional[int] = None, team_id: Optional[int] = None, active_only: bool = False) -> List[Dict]: """ List gauntlet runs Args: event_id: Filter by event team_id: Filter by team active_only: Only show active runs Returns: List of run dicts """ params = [] if event_id: params.append(('gauntlet_id', event_id)) if team_id: params.append(('team_id', team_id)) if active_only: params.append(('is_active', 'true')) result = self.get('gauntletruns', params=params if params else None) return result.get('runs', []) def end_gauntlet_run(self, run_id: int) -> Dict: """ End a gauntlet run by setting ended timestamp Args: run_id: Run ID Returns: Updated run dict """ return self.patch('gauntletruns', object_id=run_id, params=[('ended', 'true')]) # ==================== # Player Operations # ==================== def get_player(self, player_id: int) -> Dict: """ Get a player by ID Args: player_id: Player ID Returns: Player dict """ return self.get('players', object_id=player_id) def list_players(self, cardset_id: Optional[int] = None, rarity: Optional[str] = None, timeout: int = 30) -> List[Dict]: """ List players. At least one filter is required to avoid massive unfiltered queries. Args: cardset_id: Filter by cardset rarity: Filter by rarity timeout: Request timeout in seconds (default: 30, player lists are large) Returns: List of player dicts """ if not cardset_id and not rarity: raise ValueError("list_players requires at least one filter (cardset_id or rarity)") params = [] if cardset_id: params.append(('cardset', cardset_id)) if rarity: params.append(('rarity', rarity)) result = self.get('players', params=params, timeout=timeout) return result.get('players', []) # ==================== # Result/Stats Operations # ==================== def list_results(self, season: Optional[int] = None, team_id: Optional[int] = None) -> List[Dict]: """ List game results. At least one filter is required to avoid massive unfiltered queries. Args: season: Filter by season team_id: Filter by team Returns: List of result dicts """ if not season and not team_id: raise ValueError("list_results requires at least one filter (season or team_id)") params = [] if season: params.append(('season', season)) if team_id: params.append(('team_id', team_id)) result = self.get('results', params=params if params else None) return result.get('results', []) # ==================== # Helper Methods # ==================== def find_gauntlet_teams(self, event_id: Optional[int] = None, active_only: bool = False) -> List[Dict]: """ Find gauntlet teams (teams with 'Gauntlet' in abbrev) Args: event_id: Filter by event active_only: Only teams with active runs Returns: List of team dicts with run information """ if active_only: # Get active runs, then get teams runs = self.list_gauntlet_runs(event_id=event_id, active_only=True) teams_with_runs = [] for run in runs: team = run['team'] team['active_run'] = run teams_with_runs.append(team) return teams_with_runs else: # Get all teams with 'Gauntlet' in name all_teams = self.list_teams() gauntlet_teams = [t for t in all_teams if 'Gauntlet' in t.get('abbrev', '')] # Optionally add run info if event_id: runs = self.list_gauntlet_runs(event_id=event_id) run_by_team = {r['team']['id']: r for r in runs} for team in gauntlet_teams: if team['id'] in run_by_team: team['run'] = run_by_team[team['id']] return gauntlet_teams def main(): """Example usage""" import argparse parser = argparse.ArgumentParser(description='Paper Dynasty API Client') parser.add_argument('--env', choices=['prod', 'dev'], default='dev', help='Environment') parser.add_argument('--verbose', '-v', action='store_true', help='Verbose output') args = parser.parse_args() try: api = PaperDynastyAPI(environment=args.env, verbose=args.verbose) print(f"✓ Connected to {args.env.upper()} database: {api.base_url}") # Example: List gauntlet teams print("\nExample: Listing gauntlet teams...") teams = api.find_gauntlet_teams(active_only=True) print(f"Found {len(teams)} active gauntlet teams") except ValueError as e: print(f"❌ Error: {e}") sys.exit(1) if __name__ == '__main__': main()