paper-dynasty-card-creation/custom_cards/interactive_creator.py
2025-11-14 09:51:04 -06:00

647 lines
26 KiB
Python

"""
Interactive custom card creation workflow for Paper Dynasty.
This script guides users through creating fictional player cards using
baseball archetypes with iterative review and refinement.
"""
import asyncio
import sys
from typing import Dict, Any, Literal, Optional
from datetime import datetime
import boto3
import aiohttp
from custom_cards.archetype_definitions import (
BATTER_ARCHETYPES,
PITCHER_ARCHETYPES,
BatterArchetype,
PitcherArchetype,
describe_archetypes,
get_archetype,
)
from custom_cards.archetype_calculator import (
BatterRatingCalculator,
PitcherRatingCalculator,
calculate_total_ops,
)
from creation_helpers import (
get_all_pybaseball_ids,
sanitize_name,
CLUB_LIST,
mlbteam_and_franchise,
)
from db_calls import db_get, db_post, db_put, db_patch, url_get
from exceptions import logger
# AWS configuration: the bucket that stores rendered card images, the region
# it lives in, and the virtual-hosted-style base URL derived from the two.
AWS_BUCKET_NAME = 'paper-dynasty'
AWS_REGION = 'us-east-1'
S3_BASE_URL = 'https://{}.s3.{}.amazonaws.com'.format(AWS_BUCKET_NAME, AWS_REGION)
class CustomCardCreator:
    """Interactive workflow for creating custom fictional player cards.

    The workflow is driven from :meth:`run`: it selects (or creates) a target
    cardset, then repeatedly walks the user through choosing an archetype,
    entering player details, reviewing the calculated ratings, writing the
    database records and finally rendering/uploading the card image.
    """

    # Positions a batter may be assigned (matches the list shown in the prompt).
    BATTER_POSITIONS = ('C', '1B', '2B', '3B', 'SS', 'LF', 'CF', 'RF', 'DH')
    # Upper bound on the number of positions kept for one batter.
    MAX_POSITIONS = 8
    # Base URL of the API that renders card images.
    API_BASE_URL = 'https://pd.manticorum.com/api/v2'

    def __init__(self):
        """Initialize the creator with no cardset selected and an S3 client."""
        self.cardset = None
        self.season = None
        self.player_description = None
        self.s3_client = boto3.client('s3', region_name=AWS_REGION)

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _confirm(prompt: str) -> bool:
        """Return True when the user answers the prompt with 'y' or 'yes'."""
        return input(prompt).strip().lower() in ('y', 'yes')

    @staticmethod
    def _prompt_nonempty(prompt: str) -> str:
        """Prompt until the user enters a non-blank value; return it stripped."""
        while True:
            value = input(prompt).strip()
            if value:
                return value
            print("A value is required.")

    @staticmethod
    def _prompt_int(prompt: str) -> int:
        """Prompt until the user enters something ``int()`` accepts."""
        while True:
            try:
                return int(input(prompt).strip())
            except ValueError:
                print("Invalid input. Please enter a number.")

    @staticmethod
    def _prompt_index(prompt: str, count: int) -> int:
        """Prompt for a 1-based menu number and return it as a 0-based index.

        Keeps asking until the number is within ``1..count``.
        """
        while True:
            choice = input(prompt).strip()
            try:
                idx = int(choice) - 1
            except ValueError:
                print("Invalid input. Please enter a number.")
                continue
            if 0 <= idx < count:
                return idx
            print(f"Please enter a number between 1 and {count}.")

    @staticmethod
    def _custom_bbref_id(player_info: dict) -> str:
        """Build the fake bbref id used for custom players.

        The id is ``custom_<last name lowercased><first initial lowercased>01``.
        Slicing (``[:1]``) is used for the initial so an empty first name does
        not raise ``IndexError``.
        """
        last = player_info['name_last'].lower()
        initial = player_info['name_first'][:1].lower()
        return f"custom_{last}{initial}01"

    @classmethod
    def _api_card_url(cls, player_id: int, card_type: str, release_date: str) -> str:
        """Return the API URL that renders a player's card image."""
        return f"{cls.API_BASE_URL}/players/{player_id}/{card_type}card?d={release_date}"

    @staticmethod
    def _s3_key(cardset_id: int, player_id: int, card_type: str) -> str:
        """Return the S3 object key for a card (cardset id zero-padded to 3 digits)."""
        return f'cards/cardset-{cardset_id:03d}/player-{player_id}/{card_type}card.png'

    @classmethod
    def _parse_positions(cls, raw: str) -> list:
        """Parse a space-separated position string into a validated list.

        Input is upper-cased, de-duplicated (first occurrence wins, so the
        primary position stays first) and capped at ``MAX_POSITIONS`` entries.

        Raises:
            ValueError: if no position was entered or any entry is not in
                ``BATTER_POSITIONS``.
        """
        tokens = list(dict.fromkeys(raw.upper().split()))
        if not tokens:
            raise ValueError("Please enter at least one position.")
        unknown = [tok for tok in tokens if tok not in cls.BATTER_POSITIONS]
        if unknown:
            raise ValueError(f"Unknown position(s): {' '.join(unknown)}")
        return tokens[:cls.MAX_POSITIONS]

    def _calculate_card_data(self, player_type: Literal['batter', 'pitcher'],
                             archetype: "BatterArchetype | PitcherArchetype") -> dict:
        """Calculate the initial card data for an archetype.

        Returns a dict with a ``'ratings'`` entry and, for batters, a
        ``'baserunning'`` entry. Ratings are calculated against a temporary
        card id of 0; the real id is filled in when the card is created.
        """
        if player_type == 'batter':
            calc = BatterRatingCalculator(archetype)
            return {
                'ratings': calc.calculate_ratings(battingcard_id=0),  # Temp ID
                'baserunning': calc.calculate_baserunning(),
            }
        calc = PitcherRatingCalculator(archetype)
        return {'ratings': calc.calculate_ratings(pitchingcard_id=0)}  # Temp ID

    # ------------------------------------------------------------------
    # Top-level workflow
    # ------------------------------------------------------------------
    async def run(self):
        """Main workflow entry point: set up the cardset, then create players."""
        print("\n" + "="*70)
        print("PAPER DYNASTY - CUSTOM CARD CREATOR")
        print("="*70)
        print("\nCreate fictional player cards using baseball archetypes.")
        print("You'll define a player profile, review the calculated ratings,")
        print("and iteratively tweak until satisfied.\n")

        # Step 1: Setup cardset
        await self.setup_cardset()

        # Step 2: Create players until the user quits or declines another
        while True:
            player_type = self.prompt_player_type()
            if player_type == 'quit':
                break
            await self.create_player(player_type)
            if not self._confirm("\nCreate another custom player? (yes/no): "):
                break

        print("\n" + "="*70)
        print("Custom card creation complete!")
        print("="*70 + "\n")

    async def setup_cardset(self):
        """Select the target cardset by name, offering to create it if missing.

        Sets ``self.cardset``, ``self.season`` and ``self.player_description``.
        Exits the process if the cardset does not exist and the user declines
        to create it.
        """
        print("\n" + "-"*70)
        print("CARDSET SETUP")
        print("-"*70)
        cardset_name = input("\nEnter cardset name (e.g., 'Custom Players 2025'): ").strip()

        # Search for existing cardset
        c_query = await db_get('cardsets', params=[('name', cardset_name)])
        if c_query['count'] == 0:
            # Cardset doesn't exist - ask if they want to create it
            print(f"\nCardset '{cardset_name}' not found.")
            if not self._confirm("Create new cardset? (yes/no): "):
                print("Cannot proceed without a cardset. Exiting.")
                sys.exit(0)

            # Get cardset details (re-prompting on a non-numeric season
            # instead of crashing with ValueError)
            season = self._prompt_int("Enter season year (e.g., 2025): ")
            ranked_legal = self._confirm("Ranked legal? (yes/no): ")
            in_packs = self._confirm("Available in packs? (yes/no): ")

            cardset_payload = {
                'name': cardset_name,
                'season': season,
                'ranked_legal': ranked_legal,
                'in_packs': in_packs,
            }
            new_cardset = await db_post('cardsets', payload=cardset_payload)
            self.cardset = new_cardset
            print(f"\nCreated new cardset: {cardset_name} (ID: {new_cardset['id']})")
        else:
            self.cardset = c_query['cardsets'][0]
            print(f"\nUsing existing cardset: {cardset_name} (ID: {self.cardset['id']})")

        # Set season and player description (description defaults to the name)
        self.season = self.cardset.get('season', datetime.now().year)
        self.player_description = (
            input(f"\nPlayer description (default: '{cardset_name}'): ").strip() or cardset_name
        )
        print(f"\nCardset ID: {self.cardset['id']}")
        print(f"Season: {self.season}")
        print(f"Player Description: {self.player_description}")

    def prompt_player_type(self) -> Literal['batter', 'pitcher', 'quit']:
        """Prompt until the user picks batter, pitcher or quit."""
        print("\n" + "-"*70)
        print("PLAYER TYPE")
        print("-"*70)
        print("\n1. Batter")
        print("2. Pitcher")
        print("3. Quit")
        menu = {'1': 'batter', '2': 'pitcher', '3': 'quit'}
        while True:
            choice = input("\nSelect player type (1-3): ").strip()
            if choice in menu:
                return menu[choice]
            print("Invalid choice. Please enter 1, 2, or 3.")

    async def create_player(self, player_type: Literal['batter', 'pitcher']):
        """Create one custom player of the specified type.

        Loops over archetype selection, player info and rating review until
        the user accepts a set of ratings, then writes the database records
        exactly once. A "start over" during review restarts this loop rather
        than recursing, so no records are written for abandoned attempts.
        """
        while True:
            print("\n" + "="*70)
            print(f"CREATING CUSTOM {player_type.upper()}")
            print("="*70)

            # Step 1: Choose archetype
            archetype = self.choose_archetype(player_type)
            # Step 2: Get player info
            player_info = self.get_player_info(player_type, archetype)
            # Step 3: Calculate initial ratings
            card_data = self._calculate_card_data(player_type, archetype)
            # Step 4: Review and tweak loop (None means "start over")
            final_data = await self.review_and_tweak(player_type, player_info, archetype, card_data)
            if final_data is not None:
                break

        # Step 5: Create records in database
        await self.create_database_records(player_type, player_info, final_data)
        print(f"\n{player_info['name_first']} {player_info['name_last']} created successfully!")

    def choose_archetype(self, player_type: Literal['batter', 'pitcher']) -> "BatterArchetype | PitcherArchetype":
        """Show the archetype menu for the player type and return the pick."""
        print(describe_archetypes(player_type))
        archetypes = BATTER_ARCHETYPES if player_type == 'batter' else PITCHER_ARCHETYPES
        archetype_keys = list(archetypes.keys())
        idx = self._prompt_index(
            f"\nSelect archetype (1-{len(archetype_keys)}): ", len(archetype_keys)
        )
        return archetypes[archetype_keys[idx]]

    def get_player_info(self, player_type: Literal['batter', 'pitcher'],
                        archetype: "BatterArchetype | PitcherArchetype") -> dict:
        """Collect name, handedness, team and positions for the player.

        Returns a dict with keys ``name_first``, ``name_last``, ``hand``,
        ``team_abbrev``, ``mlb_team_id``, ``franchise_id`` and ``positions``.
        """
        print("\n" + "-"*70)
        print("PLAYER INFORMATION")
        print("-"*70)

        # Names are required: the custom bbref id is derived from them.
        name_first = self._prompt_nonempty("\nFirst name: ").title()
        name_last = self._prompt_nonempty("Last name: ").title()

        # Hand (an unrecognised choice falls back to right-handed)
        if player_type == 'batter':
            print("\nBatting hand:")
            print("1. Right (R)")
            print("2. Left (L)")
            print("3. Switch (S)")
            hand_choice = input("Select (1-3): ").strip()
            hand = {'1': 'R', '2': 'L', '3': 'S'}.get(hand_choice, 'R')
        else:
            print("\nThrowing hand:")
            print("1. Right (R)")
            print("2. Left (L)")
            hand_choice = input("Select (1-2): ").strip()
            hand = {'1': 'R', '2': 'L'}.get(hand_choice, 'R')

        # Team: print the sorted clubs three per row, then pick by number
        team_clubs = sorted(CLUB_LIST.keys())
        print("\nMLB Team:")
        for i, team in enumerate(team_clubs, 1):
            if i % 3 == 1:
                print()
            print(f"{i:2d}. {team:25s}", end="")
        print("\n")
        team_abbrev = team_clubs[
            self._prompt_index(f"Select team (1-{len(team_clubs)}): ", len(team_clubs))
        ]
        mlb_team_id, franchise_id = mlbteam_and_franchise(team_abbrev)

        # Positions
        if player_type == 'batter':
            print("\nDefensive positions (primary first):")
            print("Enter positions separated by spaces (e.g., 'CF RF DH')")
            print("Available: C 1B 2B 3B SS LF CF RF DH")
            while True:
                try:
                    positions = self._parse_positions(input("Positions: "))
                    break
                except ValueError as e:
                    print(e)
        else:
            positions = ['P']  # Pitchers always have P

        return {
            'name_first': name_first,
            'name_last': name_last,
            'hand': hand,
            'team_abbrev': team_abbrev,
            'mlb_team_id': mlb_team_id,
            'franchise_id': franchise_id,
            'positions': positions,
        }

    async def review_and_tweak(
        self,
        player_type: Literal['batter', 'pitcher'],
        player_info: dict,
        archetype: "BatterArchetype | PitcherArchetype",
        card_data: dict
    ) -> Optional[dict]:
        """Review calculated ratings and allow iterative tweaking.

        Returns:
            The accepted card data, or ``None`` when the user chooses to start
            over with a different archetype. The caller is responsible for
            restarting; this method never creates a player itself.
        """
        print("\n" + "="*70)
        print("REVIEW & TWEAK RATINGS")
        print("="*70)
        while True:
            # Display current ratings
            self.display_ratings(player_type, player_info, card_data)

            # Show options
            print("\nOptions:")
            print("1. Accept these ratings")
            print("2. Tweak archetype percentages")
            print("3. Manually adjust specific ratings")
            print("4. Start over with different archetype")
            choice = input("\nSelect (1-4): ").strip()

            if choice == '1':
                # Accept
                return card_data
            elif choice == '2':
                # Tweak archetype
                card_data = await self.tweak_archetype(player_type, archetype, card_data)
            elif choice == '3':
                # Manual adjustments
                card_data = await self.manual_adjustments(player_type, card_data)
            elif choice == '4':
                # Start over: signal the caller instead of recursing into
                # create_player, which would write one player and then hand
                # None back to the outer call.
                return None
            else:
                print("Invalid choice.")

    def display_ratings(self, player_type: Literal['batter', 'pitcher'], player_info: dict, card_data: dict):
        """Print the current ratings in a readable format.

        Expects ``card_data['ratings']`` to hold at least two rating dicts
        (the first two are passed to ``calculate_total_ops``).
        """
        name = f"{player_info['name_first']} {player_info['name_last']}"
        print(f"\n{name} ({player_info['hand']}) - {player_info['team_abbrev']}")
        print("-"*70)
        ratings = card_data['ratings']
        opponent = 'HP' if player_type == 'batter' else 'HB'

        # One section per rating entry (each entry carries its 'vs_hand')
        for rating in ratings:
            vs_hand = rating['vs_hand']
            print(f"\nVS {vs_hand}{opponent}:")
            print(f" AVG: {rating['avg']:.3f} OBP: {rating['obp']:.3f} SLG: {rating['slg']:.3f} OPS: {rating['obp']+rating['slg']:.3f}")

            # Hit distribution
            total_hits = (rating['homerun'] + rating['bp_homerun'] + rating['triple'] +
                          rating['double_three'] + rating['double_two'] + rating['double_pull'] +
                          rating['single_two'] + rating['single_one'] + rating['single_center'] +
                          rating['bp_single'])
            doubles = rating['double_pull'] + rating['double_two'] + rating['double_three']
            singles = (total_hits - rating['homerun'] - rating['bp_homerun'] - rating['triple']
                       - rating['double_pull'] - rating['double_two'] - rating['double_three'])
            print(f" Hits: {total_hits:.1f} (HR: {rating['homerun']:.1f} 3B: {rating['triple']:.1f} 2B: {doubles:.1f} 1B: {singles:.1f})")

            # Walk / strikeout
            print(f" BB: {rating['walk']:.1f} HBP: {rating['hbp']:.1f} K: {rating['strikeout']:.1f}")

            # Batted-ball outs
            flyouts = (rating['flyout_a'] + rating['flyout_bq'] +
                       rating['flyout_lf_b'] + rating['flyout_rf_b'])
            groundouts = rating['groundout_a'] + rating['groundout_b'] + rating['groundout_c']
            outs = rating['lineout'] + rating['popout'] + flyouts + groundouts
            print(f" Outs: {outs:.1f} (K: {rating['strikeout']:.1f} LD: {rating['lineout']:.1f} FB: {flyouts:.1f} GB: {groundouts:.1f})")

        # Calculate and display total OPS
        is_pitcher = (player_type == 'pitcher')
        total_ops = calculate_total_ops(ratings[0], ratings[1], is_pitcher)
        print(f"\n{'Total OPS' if player_type == 'batter' else 'Total OPS Against'}: {total_ops:.3f}")

        # Show baserunning for batters
        if player_type == 'batter' and 'baserunning' in card_data:
            br = card_data['baserunning']
            print("\nBaserunning:")
            print(f" Steal: {br['steal_low']}-{br['steal_high']} (Auto: {br['steal_auto']}, Jump: {br['steal_jump']})")
            print(f" Running: {br['running']}/10 Hit-and-Run: {br['hit_and_run']}/10")

    async def tweak_archetype(self, player_type: Literal['batter', 'pitcher'],
                              archetype: "BatterArchetype | PitcherArchetype", card_data: dict) -> dict:
        """Placeholder for archetype percentage tweaking; returns card_data unchanged."""
        print("\n" + "-"*70)
        print("TWEAK ARCHETYPE")
        print("-"*70)
        print("\nAdjust key percentages (press Enter to keep current value):\n")
        # TODO: Implement percentage tweaking
        # For now, return unchanged
        print("(Feature coming soon - manual adjustments available in option 3)")
        return card_data

    async def manual_adjustments(self, player_type: Literal['batter', 'pitcher'], card_data: dict) -> dict:
        """Placeholder for manual D20 rating edits; returns card_data unchanged."""
        print("\n" + "-"*70)
        print("MANUAL ADJUSTMENTS")
        print("-"*70)
        print("\nDirectly edit D20 chances (must sum to 108):\n")
        # TODO: Implement manual adjustments
        # For now, return unchanged
        print("(Feature coming soon)")
        return card_data

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------
    async def create_database_records(self, player_type: Literal['batter', 'pitcher'], player_info: dict, card_data: dict):
        """Create Player, Card, Ratings, and Position records in the database.

        After the records exist, the card image is generated and uploaded via
        :meth:`image_generation_and_upload`.
        """
        print("\n" + "-"*70)
        print("CREATING DATABASE RECORDS")
        print("-"*70)

        # Step 1: Create/verify MLBPlayer record
        # For custom players, we use a fake bbref_id based on name
        bbref_id = self._custom_bbref_id(player_info)
        mlb_query = await db_get('mlbplayers', params=[('key_bbref', bbref_id)])
        if mlb_query['count'] > 0:
            mlbplayer_id = mlb_query['players'][0]['id']
            print(f"✓ Using existing MLBPlayer record (ID: {mlbplayer_id})")
        else:
            mlbplayer_payload = {
                'key_bbref': bbref_id,
                'key_fangraphs': 0,  # Custom player, no real ID
                'key_mlbam': 0,
                'key_retro': '',
                'name_first': player_info['name_first'],
                'name_last': player_info['name_last'],
                'mlb_team_id': player_info['mlb_team_id'],
            }
            new_mlbplayer = await db_post('mlbplayers/one', payload=mlbplayer_payload)
            mlbplayer_id = new_mlbplayer['id']
            print(f"✓ Created MLBPlayer record (ID: {mlbplayer_id})")

        # Step 2: Create Player record. The image URL embeds the player id,
        # which is only known after the POST, so it is PATCHed in afterwards.
        now = datetime.now()
        release_date = f"{now.year}-{now.month}-{now.day}"
        player_payload = {
            'p_name': f"{player_info['name_first']} {player_info['name_last']}",
            'bbref_id': bbref_id,
            'fangraphs_id': 0,
            'mlbam_id': 0,
            'retrosheet_id': '',
            'hand': player_info['hand'],
            'mlb_team_id': player_info['mlb_team_id'],
            'franchise_id': player_info['franchise_id'],
            'cardset': self.cardset['id'],
            'player_description': self.player_description,
            'is_custom': True,
            'mlbplayer_id': mlbplayer_id,
        }
        new_player = await db_post('players', payload=player_payload)
        player_id = new_player['player_id']
        print(f"✓ Created Player record (ID: {player_id})")

        # Now add the image URL via PATCH
        card_type = 'batting' if player_type == 'batter' else 'pitching'
        image_url = self._api_card_url(player_id, card_type, release_date)
        await db_patch('players', object_id=player_id, params=[('image', image_url)])
        print("✓ Updated Player with image URL")

        # Step 3: Create Card and Ratings
        if player_type == 'batter':
            await self.create_batting_card(player_id, player_info, card_data)
        else:
            await self.create_pitching_card(player_id, player_info, card_data)

        # Step 4: Create CardPosition records
        await self.create_positions(player_id, player_info['positions'], card_data)
        print(f"\n✓ All database records created for player ID {player_id}")

        # Step 5: Generate card image and upload to S3
        await self.image_generation_and_upload(player_id, player_type, release_date)

    async def create_batting_card(self, player_id: int, player_info: dict, card_data: dict):
        """Create the batting card, then its ratings keyed to the new card id.

        Note: each dict in ``card_data['ratings']`` is updated in place with
        the real ``battingcard_id``.
        """
        baserunning = card_data['baserunning']
        batting_card_payload = {
            'cards': [{
                'player_id': player_id,
                'key_bbref': self._custom_bbref_id(player_info),
                'key_fangraphs': 0,
                'key_mlbam': 0,
                'key_retro': '',
                'name_first': player_info['name_first'],
                'name_last': player_info['name_last'],
                'steal_low': baserunning['steal_low'],
                'steal_high': baserunning['steal_high'],
                'steal_auto': baserunning['steal_auto'],
                'steal_jump': baserunning['steal_jump'],
                'hit_and_run': baserunning['hit_and_run'],
                'running': baserunning['running'],
                'hand': player_info['hand'],
            }]
        }
        await db_put('battingcards', payload=batting_card_payload, timeout=10)
        print("✓ Created batting card")

        # Look the card up again to learn the id assigned to it
        bc_query = await db_get('battingcards', params=[('player_id', player_id)])
        battingcard_id = bc_query['cards'][0]['id']

        # Create ratings (replace the temporary card id with the real one)
        ratings = card_data['ratings']
        for rating in ratings:
            rating['battingcard_id'] = battingcard_id
        await db_put('battingcardratings', payload={'ratings': ratings}, timeout=10)
        print("✓ Created batting ratings")

    async def create_pitching_card(self, player_id: int, player_info: dict, card_data: dict):
        """Create the pitching card, then its ratings keyed to the new card id.

        Starter/relief/closer ratings are fixed defaults for now. Each dict in
        ``card_data['ratings']`` is updated in place with the real
        ``pitchingcard_id``.
        """
        pitching_card_payload = {
            'cards': [{
                'player_id': player_id,
                'key_bbref': self._custom_bbref_id(player_info),
                'key_fangraphs': 0,
                'key_mlbam': 0,
                'key_retro': '',
                'name_first': player_info['name_first'],
                'name_last': player_info['name_last'],
                'hand': player_info['hand'],
                'starter_rating': 5,  # TODO: Get from archetype
                'relief_rating': 5,  # TODO: Get from archetype
                'closer_rating': None,  # TODO: Get from archetype
            }]
        }
        await db_put('pitchingcards', payload=pitching_card_payload, timeout=10)
        print("✓ Created pitching card")

        # Look the card up again to learn the id assigned to it
        pc_query = await db_get('pitchingcards', params=[('player_id', player_id)])
        pitchingcard_id = pc_query['cards'][0]['id']

        # Create ratings (replace the temporary card id with the real one)
        ratings = card_data['ratings']
        for rating in ratings:
            rating['pitchingcard_id'] = pitchingcard_id
        await db_put('pitchingcardratings', payload={'ratings': ratings}, timeout=10)
        print("✓ Created pitching ratings")

    async def create_positions(self, player_id: int, positions: list[str], card_data: dict):
        """Create one CardPosition record per entry in ``positions``.

        ``card_data`` is accepted for call-site compatibility but not used.
        """
        positions_payload = {
            'positions': [
                {'player_id': player_id, 'position': pos}
                for pos in positions
            ]
        }
        await db_post('cardpositions', payload=positions_payload)
        print(f"✓ Created {len(positions)} position record(s)")

    # ------------------------------------------------------------------
    # Card image
    # ------------------------------------------------------------------
    async def image_generation_and_upload(self, player_id: int, player_type: Literal['batter', 'pitcher'], release_date: str):
        """
        Generate card image from API and upload to S3.

        This implements the 4-step workflow:
        1. GET Player.image to trigger card generation
        2. Fetch the generated PNG image bytes
        3. Upload to S3 with proper metadata
        4. PATCH player record with S3 URL

        Failures are logged and reported as warnings rather than raised: by
        this point the player records already exist.
        """
        print("\n" + "-"*70)
        print("GENERATING & UPLOADING CARD IMAGE")
        print("-"*70)
        card_type = 'batting' if player_type == 'batter' else 'pitching'
        api_image_url = self._api_card_url(player_id, card_type, release_date)
        try:
            # Step 1 & 2: Trigger card generation and fetch image bytes
            async with aiohttp.ClientSession() as session:
                async with session.get(api_image_url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                    if resp.status != 200:
                        error_text = await resp.text()
                        raise ValueError(f"Card generation failed (HTTP {resp.status}): {error_text}")
                    image_bytes = await resp.read()
                    image_size_kb = len(image_bytes) / 1024
                    print("✓ Triggered card generation at API")
                    print(f"✓ Fetched card image ({image_size_kb:.1f} KB)")
                    logger.info(f"Fetched {card_type} card for player {player_id}: {image_size_kb:.1f} KB")

            # Step 3: Upload to S3
            s3_url = self.upload_card_to_s3(
                image_bytes,
                player_id,
                card_type,
                release_date,
                self.cardset['id']
            )
            print(f"✓ Uploaded to S3: {self._s3_key(self.cardset['id'], player_id, card_type)}")

            # Step 4: Update player record with S3 URL
            await db_patch('players', object_id=player_id, params=[('image', s3_url)])
            print("✓ Updated player record with S3 URL")
            logger.info(f"Updated player {player_id} with S3 URL: {s3_url}")
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            # A ClientTimeout expiry raises asyncio.TimeoutError, which is not
            # an aiohttp.ClientError; catch it here so a slow API is reported
            # as a network problem rather than as an S3 failure.
            error_msg = f"Network error during card generation: {e}"
            logger.error(error_msg)
            print(f"\n⚠ WARNING: {error_msg}")
            print(" Player created but card image not uploaded to S3.")
            print(" Manual upload may be needed later.")
        except ValueError as e:
            error_msg = f"Card generation error: {e}"
            logger.error(error_msg)
            print(f"\n⚠ WARNING: {error_msg}")
            print(" Player created but card image not uploaded to S3.")
            print(" Check player record and card data.")
        except Exception as e:
            # Deliberate best-effort boundary: anything else is reported as an
            # upload failure and the player keeps the API image URL.
            error_msg = f"S3 upload error: {e}"
            logger.error(error_msg)
            print(f"\n⚠ WARNING: {error_msg}")
            print(" Card generated but S3 upload failed.")
            print(" Player still has API image URL.")

    def upload_card_to_s3(self, image_data: bytes, player_id: int, card_type: str, release_date: str, cardset_id: int) -> str:
        """
        Upload card image to S3 and return the S3 URL with cache-busting param.

        Args:
            image_data: Raw PNG image bytes
            player_id: Player ID
            card_type: 'batting' or 'pitching'
            release_date: Date string for cache busting (appended as ?d=)
            cardset_id: Cardset ID (will be zero-padded to 3 digits)

        Returns:
            Full S3 URL with ?d= parameter
        """
        s3_key = self._s3_key(cardset_id, player_id, card_type)
        self.s3_client.put_object(
            Bucket=AWS_BUCKET_NAME,
            Key=s3_key,
            Body=image_data,
            ContentType='image/png',
            CacheControl='public, max-age=300',  # 5 minute cache
            Metadata={
                'player-id': str(player_id),
                'card-type': card_type,
                'upload-date': datetime.now().isoformat()
            }
        )
        # Return URL with cache-busting parameter
        s3_url = f'{S3_BASE_URL}/{s3_key}?d={release_date}'
        logger.info(f'Uploaded {card_type} card for player {player_id} to S3: {s3_url}')
        return s3_url
async def main():
    """Build a CustomCardCreator and run its interactive workflow."""
    await CustomCardCreator().run()


if __name__ == "__main__":
    # Start the event loop only when executed as a script, not on import.
    asyncio.run(main())