Version control Claude Code configuration including: - Global instructions (CLAUDE.md) - User settings (settings.json) - Custom agents (architect, designer, engineer, etc.) - Custom skills (create-skill templates and workflows) Excludes session data, secrets, cache, and temporary files per .gitignore. Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
638 lines
20 KiB
Python
Executable File
638 lines
20 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Paper Dynasty API Client
|
|
|
|
Shared API client for all Paper Dynasty operations.
|
|
Provides methods for interacting with teams, players, cards, gauntlets, and more.
|
|
|
|
Environment Variables:
|
|
API_TOKEN: Bearer token for API authentication (required)
|
|
DATABASE: 'prod' or 'dev' (default: dev)
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
from typing import Optional, Dict, List, Any
|
|
import requests
|
|
|
|
|
|
class PaperDynastyAPI:
|
|
"""
|
|
Paper Dynasty API client for remote database access
|
|
|
|
Usage:
|
|
api = PaperDynastyAPI(environment='prod')
|
|
|
|
# Get a team
|
|
team = api.get_team(abbrev='SKB')
|
|
|
|
# List gauntlet runs
|
|
runs = api.list_gauntlet_runs(event_id=8, active_only=True)
|
|
|
|
# Wipe team cards
|
|
api.wipe_team_cards(team_id=464)
|
|
"""
|
|
|
|
def __init__(self, environment: str = 'dev', token: Optional[str] = None, verbose: bool = False):
|
|
"""
|
|
Initialize API client
|
|
|
|
Args:
|
|
environment: 'prod' or 'dev'
|
|
token: API token (defaults to API_TOKEN env var)
|
|
verbose: Print request/response details
|
|
"""
|
|
self.env = environment.lower()
|
|
self.base_url = (
|
|
'https://pd.manticorum.com/api'
|
|
if 'prod' in self.env
|
|
else 'https://pddev.manticorum.com/api'
|
|
)
|
|
self.token = token or os.getenv('API_TOKEN')
|
|
self.verbose = verbose
|
|
|
|
if not self.token:
|
|
raise ValueError(
|
|
"API_TOKEN environment variable required. "
|
|
"Set it with: export API_TOKEN='your-token-here'"
|
|
)
|
|
|
|
self.headers = {
|
|
'Authorization': f'Bearer {self.token}',
|
|
'Content-Type': 'application/json'
|
|
}
|
|
|
|
def _log(self, message: str):
|
|
"""Print message if verbose mode enabled"""
|
|
if self.verbose:
|
|
print(f"[API] {message}")
|
|
|
|
def _build_url(self, endpoint: str, api_ver: int = 2, object_id: Optional[int] = None, params: Optional[List] = None) -> str:
|
|
"""Build API URL with parameters"""
|
|
url = f'{self.base_url}/v{api_ver}/{endpoint}'
|
|
|
|
if object_id is not None:
|
|
url += f'/{object_id}'
|
|
|
|
if params:
|
|
param_strs = [f'{k}={v}' for k, v in params]
|
|
url += '?' + '&'.join(param_strs)
|
|
|
|
return url
|
|
|
|
# ====================
|
|
# Low-level HTTP methods
|
|
# ====================
|
|
|
|
def get(self, endpoint: str, object_id: Optional[int] = None, params: Optional[List] = None, timeout: int = 10) -> Dict:
|
|
"""GET request to API"""
|
|
url = self._build_url(endpoint, object_id=object_id, params=params)
|
|
self._log(f"GET {url}")
|
|
response = requests.get(url, headers=self.headers, timeout=timeout)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
def post(self, endpoint: str, payload: Optional[Dict] = None, timeout: int = 10) -> Any:
|
|
"""POST request to API"""
|
|
url = self._build_url(endpoint)
|
|
self._log(f"POST {url}")
|
|
response = requests.post(url, headers=self.headers, json=payload, timeout=timeout)
|
|
response.raise_for_status()
|
|
return response.json() if response.text else {}
|
|
|
|
def patch(self, endpoint: str, object_id: int, params: List, timeout: int = 10) -> Dict:
|
|
"""PATCH request to API"""
|
|
url = self._build_url(endpoint, object_id=object_id, params=params)
|
|
self._log(f"PATCH {url}")
|
|
response = requests.patch(url, headers=self.headers, timeout=timeout)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
def delete(self, endpoint: str, object_id: int, timeout: int = 10) -> str:
|
|
"""DELETE request to API"""
|
|
url = self._build_url(endpoint, object_id=object_id)
|
|
self._log(f"DELETE {url}")
|
|
response = requests.delete(url, headers=self.headers, timeout=timeout)
|
|
response.raise_for_status()
|
|
return response.text
|
|
|
|
# ====================
|
|
# Team Operations
|
|
# ====================
|
|
|
|
def get_team(self, team_id: Optional[int] = None, abbrev: Optional[str] = None) -> Dict:
|
|
"""
|
|
Get a team by ID or abbreviation
|
|
|
|
Args:
|
|
team_id: Team ID
|
|
abbrev: Team abbreviation (e.g., 'SKB')
|
|
|
|
Returns:
|
|
Team dict
|
|
"""
|
|
if team_id:
|
|
return self.get('teams', object_id=team_id)
|
|
elif abbrev:
|
|
result = self.get('teams', params=[('abbrev', abbrev.upper())])
|
|
teams = result.get('teams', [])
|
|
if not teams:
|
|
raise ValueError(f"Team '{abbrev}' not found")
|
|
return teams[0]
|
|
else:
|
|
raise ValueError("Must provide team_id or abbrev")
|
|
|
|
def list_teams(self, season: Optional[int] = None, event_id: Optional[int] = None) -> List[Dict]:
|
|
"""
|
|
List teams
|
|
|
|
Args:
|
|
season: Filter by season
|
|
event_id: Filter by event
|
|
|
|
Returns:
|
|
List of team dicts
|
|
"""
|
|
params = []
|
|
if season:
|
|
params.append(('season', season))
|
|
if event_id:
|
|
params.append(('event', event_id))
|
|
|
|
result = self.get('teams', params=params if params else None)
|
|
return result.get('teams', [])
|
|
|
|
# ====================
|
|
# Card Operations
|
|
# ====================
|
|
|
|
def wipe_team_cards(self, team_id: int) -> Any:
|
|
"""
|
|
Wipe all cards for a team (unassigns them)
|
|
|
|
Args:
|
|
team_id: Team ID
|
|
|
|
Returns:
|
|
API response
|
|
"""
|
|
return self.post(f'cards/wipe-team/{team_id}')
|
|
|
|
def list_cards(self, team_id: Optional[int] = None, player_id: Optional[int] = None) -> List[Dict]:
|
|
"""
|
|
List cards
|
|
|
|
Args:
|
|
team_id: Filter by team
|
|
player_id: Filter by player
|
|
|
|
Returns:
|
|
List of card dicts
|
|
"""
|
|
params = []
|
|
if team_id:
|
|
params.append(('team_id', team_id))
|
|
if player_id:
|
|
params.append(('player_id', player_id))
|
|
|
|
result = self.get('cards', params=params if params else None)
|
|
return result.get('cards', [])
|
|
|
|
# ====================
|
|
# Pack Operations
|
|
# ====================
|
|
|
|
def list_packs(
|
|
self,
|
|
team_id: Optional[int] = None,
|
|
opened: Optional[bool] = None,
|
|
new_to_old: bool = False,
|
|
limit: Optional[int] = None,
|
|
timeout: int = 10
|
|
) -> List[Dict]:
|
|
"""
|
|
List packs
|
|
|
|
Args:
|
|
team_id: Filter by team
|
|
opened: Filter by opened status (True=opened, False=unopened)
|
|
new_to_old: Sort newest to oldest (default: False)
|
|
limit: Maximum number of results (e.g., 200, 1000, 2000)
|
|
timeout: Request timeout in seconds (default: 10, increase for large queries)
|
|
|
|
Returns:
|
|
List of pack dicts
|
|
|
|
Examples:
|
|
# Get 200 most recently opened packs
|
|
packs = api.list_packs(opened=True, new_to_old=True, limit=200)
|
|
|
|
# Get unopened packs for a team
|
|
packs = api.list_packs(team_id=69, opened=False)
|
|
|
|
# Large query with extended timeout
|
|
packs = api.list_packs(opened=True, limit=2000, timeout=30)
|
|
"""
|
|
params = []
|
|
if team_id:
|
|
params.append(('team_id', team_id))
|
|
if opened is not None:
|
|
params.append(('opened', 'true' if opened else 'false'))
|
|
if new_to_old:
|
|
params.append(('new_to_old', 'true'))
|
|
if limit:
|
|
params.append(('limit', str(limit)))
|
|
|
|
result = self.get('packs', params=params if params else None, timeout=timeout)
|
|
return result.get('packs', [])
|
|
|
|
def delete_pack(self, pack_id: int) -> str:
|
|
"""
|
|
Delete a pack
|
|
|
|
Args:
|
|
pack_id: Pack ID
|
|
|
|
Returns:
|
|
Success message
|
|
"""
|
|
return self.delete('packs', object_id=pack_id)
|
|
|
|
def update_pack(self, pack_id: int, pack_cardset_id: Optional[int] = None,
|
|
pack_team_id: Optional[int] = None, pack_type_id: Optional[int] = None) -> Dict:
|
|
"""
|
|
Update pack properties (PATCH)
|
|
|
|
Args:
|
|
pack_id: Pack ID
|
|
pack_cardset_id: Update pack cardset (use -1 to clear)
|
|
pack_team_id: Update pack team (use -1 to clear)
|
|
pack_type_id: Update pack type
|
|
|
|
Returns:
|
|
Updated pack dict
|
|
|
|
Example:
|
|
# Fix missing cardset on Team Choice pack
|
|
api.update_pack(pack_id=21207, pack_cardset_id=27)
|
|
"""
|
|
params = []
|
|
if pack_cardset_id is not None:
|
|
params.append(('pack_cardset_id', pack_cardset_id))
|
|
if pack_team_id is not None:
|
|
params.append(('pack_team_id', pack_team_id))
|
|
if pack_type_id is not None:
|
|
params.append(('pack_type_id', pack_type_id))
|
|
|
|
return self.patch('packs', object_id=pack_id, params=params)
|
|
|
|
def create_packs(self, packs: List[Dict]) -> Any:
|
|
"""
|
|
Create packs (bulk distribution)
|
|
|
|
Args:
|
|
packs: List of pack dicts with keys: team_id, pack_type_id, pack_cardset_id
|
|
|
|
Returns:
|
|
API response
|
|
|
|
Example:
|
|
# Give 5 Standard packs to team 31
|
|
api.create_packs([
|
|
{'team_id': 31, 'pack_type_id': 1, 'pack_cardset_id': None}
|
|
for _ in range(5)
|
|
])
|
|
"""
|
|
payload = {'packs': packs}
|
|
return self.post('packs', payload=payload)
|
|
|
|
def get_packs_opened_today(self, limit: int = 2000, timeout: int = 30) -> Dict:
|
|
"""
|
|
Get analytics on packs opened today
|
|
|
|
Args:
|
|
limit: Number of recent packs to check (default: 2000)
|
|
timeout: Request timeout in seconds (default: 30)
|
|
|
|
Returns:
|
|
Dict with keys:
|
|
- total: Total packs opened today
|
|
- teams: List of dicts with team info and pack counts
|
|
- note: Warning if limit was reached
|
|
|
|
Example:
|
|
result = api.get_packs_opened_today()
|
|
print(f"{result['total']} packs opened by {len(result['teams'])} teams")
|
|
"""
|
|
from datetime import datetime, timezone
|
|
from collections import defaultdict
|
|
|
|
# Get recent opened packs
|
|
packs = self.list_packs(opened=True, new_to_old=True, limit=limit, timeout=timeout)
|
|
|
|
# Today's date (UTC)
|
|
today = datetime.now(timezone.utc).date()
|
|
|
|
# Count packs by team
|
|
teams_data = defaultdict(lambda: {'count': 0, 'abbrev': '', 'lname': '', 'first': None, 'last': None})
|
|
total = 0
|
|
|
|
for pack in packs:
|
|
if pack.get('open_time'):
|
|
try:
|
|
open_dt = datetime.fromtimestamp(pack['open_time'] / 1000, tz=timezone.utc)
|
|
|
|
if open_dt.date() == today:
|
|
total += 1
|
|
team_id = pack['team']['id']
|
|
teams_data[team_id]['abbrev'] = pack['team']['abbrev']
|
|
teams_data[team_id]['lname'] = pack['team']['lname']
|
|
teams_data[team_id]['count'] += 1
|
|
|
|
if teams_data[team_id]['first'] is None or open_dt < teams_data[team_id]['first']:
|
|
teams_data[team_id]['first'] = open_dt
|
|
if teams_data[team_id]['last'] is None or open_dt > teams_data[team_id]['last']:
|
|
teams_data[team_id]['last'] = open_dt
|
|
except Exception:
|
|
pass
|
|
|
|
# Format results
|
|
teams_list = []
|
|
for team_id, data in teams_data.items():
|
|
teams_list.append({
|
|
'team_id': team_id,
|
|
'abbrev': data['abbrev'],
|
|
'name': data['lname'],
|
|
'packs': data['count'],
|
|
'first_pack': data['first'].isoformat() if data['first'] else None,
|
|
'last_pack': data['last'].isoformat() if data['last'] else None
|
|
})
|
|
|
|
# Sort by pack count
|
|
teams_list.sort(key=lambda x: x['packs'], reverse=True)
|
|
|
|
result = {
|
|
'total': total,
|
|
'teams': teams_list,
|
|
'date': today.isoformat()
|
|
}
|
|
|
|
if len(packs) == limit:
|
|
result['note'] = f'Hit limit of {limit} packs - actual count may be higher'
|
|
|
|
return result
|
|
|
|
def distribute_packs(
|
|
self,
|
|
num_packs: int = 5,
|
|
exclude_team_abbrev: Optional[List[str]] = None,
|
|
pack_type_id: int = 1,
|
|
season: Optional[int] = None
|
|
) -> Dict:
|
|
"""
|
|
Distribute packs to all human-controlled teams
|
|
|
|
Args:
|
|
num_packs: Number of packs to give to each team (default: 5)
|
|
exclude_team_abbrev: List of team abbreviations to exclude (default: None)
|
|
pack_type_id: Pack type ID (default: 1 = Standard packs)
|
|
season: Season to distribute for (default: current season)
|
|
|
|
Returns:
|
|
Dict with keys:
|
|
- total_packs: Total packs distributed
|
|
- teams_count: Number of teams that received packs
|
|
- teams: List of teams that received packs
|
|
|
|
Example:
|
|
# Give 10 packs to all teams
|
|
result = api.distribute_packs(num_packs=10)
|
|
|
|
# Give 11 packs to all teams except CAR
|
|
result = api.distribute_packs(num_packs=11, exclude_team_abbrev=['CAR'])
|
|
"""
|
|
if exclude_team_abbrev is None:
|
|
exclude_team_abbrev = []
|
|
|
|
# Convert to uppercase for case-insensitive matching
|
|
exclude_team_abbrev = [abbrev.upper() for abbrev in exclude_team_abbrev]
|
|
|
|
# Get current season if not specified
|
|
if season is None:
|
|
current = self.get('current')
|
|
season = current['season']
|
|
|
|
self._log(f"Distributing {num_packs} packs to season {season} teams")
|
|
|
|
# Get all teams for season
|
|
all_teams = self.list_teams(season=season)
|
|
|
|
# Filter for human-controlled teams only
|
|
qualifying_teams = []
|
|
for team in all_teams:
|
|
if not team['is_ai'] and 'gauntlet' not in team['abbrev'].lower():
|
|
# Check if team is in exclusion list
|
|
if team['abbrev'].upper() in exclude_team_abbrev:
|
|
self._log(f"Excluding team {team['abbrev']}: {team['sname']}")
|
|
continue
|
|
qualifying_teams.append(team)
|
|
|
|
self._log(f"Found {len(qualifying_teams)} qualifying teams")
|
|
if exclude_team_abbrev:
|
|
self._log(f"Excluded teams: {', '.join(exclude_team_abbrev)}")
|
|
|
|
# Distribute packs to each team
|
|
total_packs = 0
|
|
for team in qualifying_teams:
|
|
self._log(f"Giving {num_packs} packs to {team['abbrev']} ({team['sname']})")
|
|
|
|
# Create pack payload
|
|
packs = [{
|
|
'team_id': team['id'],
|
|
'pack_type_id': pack_type_id,
|
|
'pack_cardset_id': None
|
|
} for _ in range(num_packs)]
|
|
|
|
try:
|
|
self.create_packs(packs)
|
|
total_packs += num_packs
|
|
self._log(f" ✓ Successfully gave {num_packs} packs to {team['abbrev']}")
|
|
except Exception as e:
|
|
self._log(f" ✗ Failed to give packs to {team['abbrev']}: {e}")
|
|
raise
|
|
|
|
result = {
|
|
'total_packs': total_packs,
|
|
'teams_count': len(qualifying_teams),
|
|
'teams': qualifying_teams
|
|
}
|
|
|
|
self._log(f"Distribution complete: {total_packs} packs to {len(qualifying_teams)} teams")
|
|
|
|
return result
|
|
|
|
# ====================
|
|
# Gauntlet Operations
|
|
# ====================
|
|
|
|
def list_gauntlet_runs(self, event_id: Optional[int] = None, team_id: Optional[int] = None, active_only: bool = False) -> List[Dict]:
|
|
"""
|
|
List gauntlet runs
|
|
|
|
Args:
|
|
event_id: Filter by event
|
|
team_id: Filter by team
|
|
active_only: Only show active runs
|
|
|
|
Returns:
|
|
List of run dicts
|
|
"""
|
|
params = []
|
|
if event_id:
|
|
params.append(('gauntlet_id', event_id))
|
|
if team_id:
|
|
params.append(('team_id', team_id))
|
|
if active_only:
|
|
params.append(('is_active', 'true'))
|
|
|
|
result = self.get('gauntletruns', params=params if params else None)
|
|
return result.get('runs', [])
|
|
|
|
def end_gauntlet_run(self, run_id: int) -> Dict:
|
|
"""
|
|
End a gauntlet run by setting ended timestamp
|
|
|
|
Args:
|
|
run_id: Run ID
|
|
|
|
Returns:
|
|
Updated run dict
|
|
"""
|
|
return self.patch('gauntletruns', object_id=run_id, params=[('ended', 'true')])
|
|
|
|
# ====================
|
|
# Player Operations
|
|
# ====================
|
|
|
|
def get_player(self, player_id: int) -> Dict:
|
|
"""
|
|
Get a player by ID
|
|
|
|
Args:
|
|
player_id: Player ID
|
|
|
|
Returns:
|
|
Player dict
|
|
"""
|
|
return self.get('players', object_id=player_id)
|
|
|
|
def list_players(self, cardset_id: Optional[int] = None, rarity: Optional[str] = None) -> List[Dict]:
|
|
"""
|
|
List players
|
|
|
|
Args:
|
|
cardset_id: Filter by cardset
|
|
rarity: Filter by rarity
|
|
|
|
Returns:
|
|
List of player dicts
|
|
"""
|
|
params = []
|
|
if cardset_id:
|
|
params.append(('cardset', cardset_id))
|
|
if rarity:
|
|
params.append(('rarity', rarity))
|
|
|
|
result = self.get('players', params=params if params else None)
|
|
return result.get('players', [])
|
|
|
|
# ====================
|
|
# Result/Stats Operations
|
|
# ====================
|
|
|
|
def list_results(self, season: Optional[int] = None, team_id: Optional[int] = None) -> List[Dict]:
|
|
"""
|
|
List game results
|
|
|
|
Args:
|
|
season: Filter by season
|
|
team_id: Filter by team
|
|
|
|
Returns:
|
|
List of result dicts
|
|
"""
|
|
params = []
|
|
if season:
|
|
params.append(('season', season))
|
|
if team_id:
|
|
params.append(('team_id', team_id))
|
|
|
|
result = self.get('results', params=params if params else None)
|
|
return result.get('results', [])
|
|
|
|
# ====================
|
|
# Helper Methods
|
|
# ====================
|
|
|
|
def find_gauntlet_teams(self, event_id: Optional[int] = None, active_only: bool = False) -> List[Dict]:
|
|
"""
|
|
Find gauntlet teams (teams with 'Gauntlet' in abbrev)
|
|
|
|
Args:
|
|
event_id: Filter by event
|
|
active_only: Only teams with active runs
|
|
|
|
Returns:
|
|
List of team dicts with run information
|
|
"""
|
|
if active_only:
|
|
# Get active runs, then get teams
|
|
runs = self.list_gauntlet_runs(event_id=event_id, active_only=True)
|
|
teams_with_runs = []
|
|
for run in runs:
|
|
team = run['team']
|
|
team['active_run'] = run
|
|
teams_with_runs.append(team)
|
|
return teams_with_runs
|
|
else:
|
|
# Get all teams with 'Gauntlet' in name
|
|
all_teams = self.list_teams()
|
|
gauntlet_teams = [t for t in all_teams if 'Gauntlet' in t.get('abbrev', '')]
|
|
|
|
# Optionally add run info
|
|
if event_id:
|
|
runs = self.list_gauntlet_runs(event_id=event_id)
|
|
run_by_team = {r['team']['id']: r for r in runs}
|
|
for team in gauntlet_teams:
|
|
if team['id'] in run_by_team:
|
|
team['run'] = run_by_team[team['id']]
|
|
|
|
return gauntlet_teams
|
|
|
|
|
|
def main():
|
|
"""Example usage"""
|
|
import argparse
|
|
|
|
parser = argparse.ArgumentParser(description='Paper Dynasty API Client')
|
|
parser.add_argument('--env', choices=['prod', 'dev'], default='dev', help='Environment')
|
|
parser.add_argument('--verbose', '-v', action='store_true', help='Verbose output')
|
|
args = parser.parse_args()
|
|
|
|
try:
|
|
api = PaperDynastyAPI(environment=args.env, verbose=args.verbose)
|
|
print(f"✓ Connected to {args.env.upper()} database: {api.base_url}")
|
|
|
|
# Example: List gauntlet teams
|
|
print("\nExample: Listing gauntlet teams...")
|
|
teams = api.find_gauntlet_teams(active_only=True)
|
|
print(f"Found {len(teams)} active gauntlet teams")
|
|
|
|
except ValueError as e:
|
|
print(f"❌ Error: {e}")
|
|
sys.exit(1)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|