CLAUDE: Add S3 upload script and cardset analysis tools

- Add check_cards_and_upload.py: Fetches card images from the API and uploads them to AWS S3
  - Uses persistent aiohttp session for efficient connection reuse
  - Supports cache-busting query parameters (?d=date) for Discord compatibility
  - S3 URL structure: cards/cardset-{id:03d}/player-{id}/{type}card.png
  - Configurable upload and player URL update flags

- Add analyze_cardset_rarity.py: Analyzes players by franchise and rarity
  - Groups batters, pitchers, and combined totals
  - Displays counts for all rarity tiers by franchise
  - Provides a comprehensive breakdown of cardset composition

- Add rank_pitching_staffs.py: Ranks teams 1-30 by pitching staff quality
  - Point system based on rarity tiers (HoF=5, MVP=4, AS=3, etc.)
  - Shows detailed rosters for top 5 and bottom 5 teams
  - Useful for balance analysis and cardset evaluation

- Update CLAUDE.md with documentation for the new scripts

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
Cal Corum 2025-11-09 06:11:16 -06:00
parent 6de22f7f1f
commit 96f3721780
4 changed files with 463 additions and 81 deletions

CLAUDE.md

@@ -24,7 +24,8 @@ This is a baseball card creation system for Paper Dynasty, a sports card simulat
- **live_series_update.py**: Main script for live season card updates (in-season cards)
- **retrosheet_data.py**: Main script for historical replay cardsets
- **refresh_cards.py**: Updates existing player card images and metadata
- **check_cards.py**: Validates card data and generates test outputs
- **check_cards_and_upload.py**: Fetches card images from the API and uploads them to AWS S3 with cache-busting URLs
- **scouting_batters.py** / **scouting_pitchers.py**: Generate scouting reports and ratings comparisons
## Common Commands
@@ -49,6 +50,17 @@ python scouting_batters.py # Generate batting scouting data
python scouting_pitchers.py # Generate pitching scouting data
```
### AWS S3 Card Upload
```bash
python check_cards_and_upload.py # Fetch cards from API and upload to S3
```
### Analysis and Reporting
```bash
python analyze_cardset_rarity.py # Analyze players by franchise and rarity (batters/pitchers/combined)
python rank_pitching_staffs.py # Rank teams 1-30 by pitching staff quality
```
## Data Input Requirements
### FanGraphs Data (place in data-input/[YEAR] [TYPE] Cardset/)
@@ -119,6 +131,24 @@ Before running retrosheet_data.py, verify these configuration settings:
4. Defense CSV files present and properly named
5. Running/pitching CSV files present
### AWS S3 Upload Settings (check_cards_and_upload.py)
- `CARDSET_NAME`: Target cardset name to fetch players from (e.g., "2005 Live")
- `START_ID`: Optional player_id to start from (useful for resuming uploads)
- `TEST_COUNT`: Limit number of cards to process (set to None for all cards)
- `HTML_CARDS`: Set to True to fetch HTML preview cards instead of PNG
- `UPLOAD_TO_S3`: Enable/disable S3 upload (True for production)
- `UPDATE_PLAYER_URLS`: Enable/disable updating player records with S3 URLs (careful - modifies database)
- `AWS_BUCKET_NAME`: S3 bucket name (default: 'paper-dynasty')
- `AWS_REGION`: AWS region (default: 'us-east-1')
**S3 URL Structure**: `cards/cardset-{cardset_id:03d}/player-{player_id}/{batting|pitching}card.png?d={release_date}`
- Uses zero-padded 3-digit cardset ID for consistent sorting
- Includes cache-busting query parameter with date (YYYY-M-D format)
- Uses persistent aiohttp session for efficient connection reuse
**AWS Credentials**: Requires AWS CLI configured with credentials (`~/.aws/credentials`) and appropriate IAM permissions:
- `s3:PutObject`, `s3:GetObject`, `s3:ListBucket` on the target bucket
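A minimal sketch of the key layout and cache-busted URL described above. `build_card_url` is a hypothetical helper shown only for illustration (the real logic lives inside `upload_card_to_s3`); the bucket and region defaults are taken from the script's configuration.
```python
# Sketch only: mirrors the S3 key/URL structure documented above.
AWS_BUCKET_NAME = 'paper-dynasty'
AWS_REGION = 'us-east-1'
S3_BASE_URL = f'https://{AWS_BUCKET_NAME}.s3.{AWS_REGION}.amazonaws.com'

def build_card_url(cardset_id: int, player_id: int, card_type: str, release_date: str) -> str:
    """Return the cache-busted card URL (card_type is 'batting' or 'pitching')."""
    s3_key = f'cards/cardset-{cardset_id:03d}/player-{player_id}/{card_type}card.png'
    return f'{S3_BASE_URL}/{s3_key}?d={release_date}'

# Example: cardset 27, player 12345, released 2025-11-9
print(build_card_url(27, 12345, 'batting', '2025-11-9'))
# -> https://paper-dynasty.s3.us-east-1.amazonaws.com/cards/cardset-027/player-12345/battingcard.png?d=2025-11-9
```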
## Important Notes
- The system uses D20-based probability mechanics where statistics are converted to chances out of 20
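  As a rough illustration only, assuming a simple linear mapping (the actual conversion in the card scripts may be more involved):
```python
# Illustrative only: a .300 rate expressed as chances out of 20.
batting_avg = 0.300
chances_out_of_20 = round(batting_avg * 20)  # 6 chances out of 20
```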

analyze_cardset_rarity.py (new file, 181 lines)

@@ -0,0 +1,181 @@
import asyncio
from collections import defaultdict
from db_calls import db_get
async def analyze_cardset_rarity(cardset_id: int = 27):
"""Analyze players by franchise and rarity for a given cardset."""
print(f'Fetching players from cardset {cardset_id}...\n')
# Fetch all players from the cardset
p_query = await db_get(
'players',
params=[('cardset_id', cardset_id), ('inc_dex', False)]
)
if not p_query or p_query['count'] == 0:
print(f'No players found for cardset {cardset_id}')
return
players = p_query['players']
print(f'Found {len(players)} players\n')
# First pass: collect all unique rarity names
all_rarities = set()
for player in players:
rarity_obj = player.get('rarity', {})
if isinstance(rarity_obj, dict):
rarity_name = rarity_obj.get('name')
if rarity_name:
all_rarities.add(rarity_name)
sorted_rarities = sorted(all_rarities, key=lambda x: (
{'Common': 0, 'Uncommon': 1, 'Rare': 2, 'Epic': 3, 'Legend': 4,
'Starter': 0, 'Bench': 1, 'All-Star': 2, 'MVP': 3, 'Hall of Fame': 4}.get(x, 99), x
))
print(f'Found rarities: {sorted_rarities}\n')
# Group players by franchise and rarity
franchise_data = defaultdict(lambda: {
'batters': defaultdict(int),
'pitchers': defaultdict(int),
'combined': defaultdict(int)
})
for player in players:
franchise = player.get('franchise', 'Unknown')
rarity_obj = player.get('rarity', {})
# Extract rarity name from rarity object
if isinstance(rarity_obj, dict):
rarity = rarity_obj.get('name', 'Unknown')
else:
rarity = str(rarity_obj) if rarity_obj else 'Unknown'
# Determine if batter or pitcher based on positions
positions = []
for i in range(1, 9):
pos = player.get(f'pos_{i}')
if pos:
positions.append(pos)
is_pitcher = any(pos in ['SP', 'RP', 'CP'] for pos in positions)
if is_pitcher:
franchise_data[franchise]['pitchers'][rarity] += 1
else:
franchise_data[franchise]['batters'][rarity] += 1
franchise_data[franchise]['combined'][rarity] += 1
# Sort franchises alphabetically
sorted_franchises = sorted(franchise_data.keys())
# Print batters
print('=' * 100)
print('BATTERS BY FRANCHISE AND RARITY')
print('=' * 100)
# Build header dynamically
header = f'{"Franchise":<20}'
for rarity in sorted_rarities:
header += f' {rarity:<12}'
header += f' {"Total":<10}'
print(header)
print('-' * 100)
batter_totals = defaultdict(int)
for franchise in sorted_franchises:
batters = franchise_data[franchise]['batters']
total = sum(batters.values())
if total > 0:
row = f'{franchise:<20}'
for rarity in sorted_rarities:
count = batters.get(rarity, 0)
row += f' {count:<12}'
batter_totals[rarity] += count
row += f' {total:<10}'
batter_totals['Total'] += total
print(row)
print('-' * 100)
total_row = f'{"TOTAL":<20}'
for rarity in sorted_rarities:
total_row += f' {batter_totals[rarity]:<12}'
total_row += f' {batter_totals["Total"]:<10}'
print(total_row)
# Print pitchers
print('\n' + '=' * 100)
print('PITCHERS BY FRANCHISE AND RARITY')
print('=' * 100)
header = f'{"Franchise":<20}'
for rarity in sorted_rarities:
header += f' {rarity:<12}'
header += f' {"Total":<10}'
print(header)
print('-' * 100)
pitcher_totals = defaultdict(int)
for franchise in sorted_franchises:
pitchers = franchise_data[franchise]['pitchers']
total = sum(pitchers.values())
if total > 0:
row = f'{franchise:<20}'
for rarity in sorted_rarities:
count = pitchers.get(rarity, 0)
row += f' {count:<12}'
pitcher_totals[rarity] += count
row += f' {total:<10}'
pitcher_totals['Total'] += total
print(row)
print('-' * 100)
total_row = f'{"TOTAL":<20}'
for rarity in sorted_rarities:
total_row += f' {pitcher_totals[rarity]:<12}'
total_row += f' {pitcher_totals["Total"]:<10}'
print(total_row)
# Print combined
print('\n' + '=' * 100)
print('COMBINED (BATTERS + PITCHERS) BY FRANCHISE AND RARITY')
print('=' * 100)
header = f'{"Franchise":<20}'
for rarity in sorted_rarities:
header += f' {rarity:<12}'
header += f' {"Total":<10}'
print(header)
print('-' * 100)
combined_totals = defaultdict(int)
for franchise in sorted_franchises:
combined = franchise_data[franchise]['combined']
total = sum(combined.values())
row = f'{franchise:<20}'
for rarity in sorted_rarities:
count = combined.get(rarity, 0)
row += f' {count:<12}'
combined_totals[rarity] += count
row += f' {total:<10}'
combined_totals['Total'] += total
print(row)
print('-' * 100)
total_row = f'{"TOTAL":<20}'
for rarity in sorted_rarities:
total_row += f' {combined_totals[rarity]:<12}'
total_row += f' {combined_totals["Total"]:<10}'
print(total_row)
print('=' * 100)
if __name__ == '__main__':
asyncio.run(analyze_cardset_rarity(27))

check_cards_and_upload.py

@@ -12,13 +12,13 @@ from exceptions import logger
# Configuration
CARDSET_NAME = '2005 Live'
START_ID = None # Integer to only start pulling cards at player_id START_ID
TEST_COUNT = 9999 # integer to stop after TEST_COUNT calls
TEST_COUNT = 5 # integer to stop after TEST_COUNT calls
HTML_CARDS = False # boolean to only check and not generate cards
SKIP_ARMS = False
SKIP_BATS = False
# AWS Configuration
AWS_BUCKET_NAME = 'your-pd-cards-bucket' # Change to your bucket name
AWS_BUCKET_NAME = 'paper-dynasty' # Change to your bucket name
AWS_REGION = 'us-east-1' # Change to your region
S3_BASE_URL = f'https://{AWS_BUCKET_NAME}.s3.{AWS_REGION}.amazonaws.com'
UPLOAD_TO_S3 = True # Set to False to skip S3 upload (testing)
@@ -28,7 +28,31 @@ UPDATE_PLAYER_URLS = True # Set to False to skip player URL updates (testing)
s3_client = boto3.client('s3', region_name=AWS_REGION) if UPLOAD_TO_S3 else None
def upload_card_to_s3(image_data: bytes, player_id: int, card_type: str, release_date: str) -> str:
async def fetch_card_image(session, card_url: str, timeout: int = 6) -> bytes:
"""
Fetch card image from URL and return raw bytes.
Args:
session: aiohttp ClientSession to use for the request
card_url: URL to fetch the card from
timeout: Request timeout in seconds
Returns:
Raw PNG image bytes
"""
import aiohttp
async with session.get(card_url, timeout=aiohttp.ClientTimeout(total=timeout)) as resp:
if resp.status == 200:
logger.info(f'Fetched card image from {card_url}')
return await resp.read()
else:
error_text = await resp.text()
logger.error(f'Failed to fetch card: {error_text}')
raise ValueError(f'Card fetch error: {error_text}')
def upload_card_to_s3(image_data: bytes, player_id: int, card_type: str, release_date: str, cardset_id: int) -> str:
"""
Upload card image to S3 and return the S3 URL with cache-busting param.
@@ -37,11 +61,14 @@ def upload_card_to_s3(image_data: bytes, player_id: int, card_type: str, release
player_id: Player ID
card_type: 'batting' or 'pitching'
release_date: Date string for cache busting (e.g., '2025-11-8')
cardset_id: Cardset ID (will be zero-padded to 3 digits)
Returns:
Full S3 URL with ?d= parameter
"""
s3_key = f'cards/player-{player_id}/{card_type}card.png'
# Format cardset_id with 3 digits and leading zeros
cardset_str = f'{cardset_id:03d}'
s3_key = f'cards/cardset-{cardset_str}/player-{player_id}/{card_type}card.png'
try:
s3_client.put_object(
@@ -68,10 +95,12 @@ def upload_card_to_s3(image_data: bytes, player_id: int, card_type: str, release
async def main(args):
import aiohttp
print(f'Searching for cardset: {CARDSET_NAME}')
c_query = await db_get('cardsets', params=[('name', CARDSET_NAME)])
if c_query['count'] == 0:
if not c_query or c_query['count'] == 0:
print(f'I do not see a cardset named {CARDSET_NAME}')
return
cardset = c_query['cardsets'][0]
@@ -81,8 +110,8 @@ async def main(args):
'players',
params=[('inc_dex', False), ('cardset_id', cardset['id']), ('short_output', True)]
)
if p_query['count'] == 0:
raise ValueError(f'No players returned from Paper Dynasty API')
if not p_query or p_query['count'] == 0:
raise ValueError('No players returned from Paper Dynasty API')
all_players = p_query['players']
del p_query
@@ -102,52 +131,41 @@ async def main(args):
print(f'S3 Upload: {"ENABLED" if UPLOAD_TO_S3 else "DISABLED"}')
print(f'URL Update: {"ENABLED" if UPDATE_PLAYER_URLS else "DISABLED"}\n')
for x in all_players:
if 'pitching' in x['image'] and SKIP_ARMS:
pass
elif 'batting' in x['image'] and SKIP_BATS:
pass
elif START_ID is not None and START_ID > x['player_id']:
pass
elif 'sombaseball' in x['image']:
errors.append((x, f'Bad card url: {x["image"]}'))
else:
count += 1
if count % 20 == 0:
print(f'Card #{count + 1} being pulled is {x["p_name"]}...')
elif TEST_COUNT is not None and TEST_COUNT < count:
print(f'Done test run')
break
if HTML_CARDS:
card_url = f'{x["image"]}&html=true'
timeout = 2
# Create persistent aiohttp session for all card fetches
async with aiohttp.ClientSession() as session:
for x in all_players:
if 'pitching' in x['image'] and SKIP_ARMS:
pass
elif 'batting' in x['image'] and SKIP_BATS:
pass
elif START_ID is not None and START_ID > x['player_id']:
pass
elif 'sombaseball' in x['image']:
errors.append((x, f'Bad card url: {x["image"]}'))
else:
card_url = x['image']
timeout = 6
count += 1
if count % 20 == 0:
print(f'Card #{count + 1} being pulled is {x["p_name"]}...')
elif TEST_COUNT is not None and TEST_COUNT < count:
print(f'Done test run')
break
try:
logger.info(f'calling the card url')
resp = await url_get(card_url, timeout=timeout)
if HTML_CARDS:
card_url = f'{x["image"]}&html=true'
timeout = 2
else:
card_url = x['image']
timeout = 6
except ConnectionError as e:
if cxn_error:
raise e
try:
# Determine card type from URL
card_type = 'pitching' if 'pitching' in x['image'] else 'batting'
cxn_error = True
errors.append((x, e))
except ValueError as e:
errors.append((x, e))
else:
# Determine card type from URL
card_type = 'pitching' if 'pitching' in x['image'] else 'batting'
# Upload to S3 if enabled
if UPLOAD_TO_S3 and not HTML_CARDS:
try:
s3_url = upload_card_to_s3(resp, x['player_id'], card_type, release_date)
# Upload to S3 if enabled
if UPLOAD_TO_S3 and not HTML_CARDS:
# Fetch card image bytes directly
image_bytes = await fetch_card_image(session, card_url, timeout=timeout)
s3_url = upload_card_to_s3(image_bytes, x['player_id'], card_type, release_date, cardset['id'])
uploads.append((x['player_id'], card_type, s3_url))
# Update player record with new S3 URL
@@ -157,58 +175,69 @@ async def main(args):
])
url_updates.append((x['player_id'], card_type, s3_url))
logger.info(f'Updated player {x["player_id"]} image URL to S3')
else:
# Just validate card exists (old behavior)
logger.info(f'calling the card url')
resp = await url_get(card_url, timeout=timeout)
except Exception as e:
logger.error(f'S3 upload/update failed for player {x["player_id"]}: {e}')
errors.append((x, f'S3 error: {e}'))
continue
except ConnectionError as e:
if cxn_error:
raise e
cxn_error = True
errors.append((x, e))
except ValueError as e:
errors.append((x, e))
except Exception as e:
logger.error(f'S3 upload/update failed for player {x["player_id"]}: {e}')
errors.append((x, f'S3 error: {e}'))
continue
# Handle image2 (dual-position players)
if x['image2'] is not None:
if HTML_CARDS:
card_url = f'{x["image2"]}&html=true'
card_url2 = f'{x["image2"]}&html=true'
else:
card_url = x['image2']
card_url2 = x['image2']
if 'sombaseball' in x['image2']:
errors.append((x, f'Bad card url: {x["image2"]}'))
else:
try:
resp = await url_get(card_url, timeout=6)
card_type2 = 'pitching' if 'pitching' in x['image2'] else 'batting'
if UPLOAD_TO_S3 and not HTML_CARDS:
# Fetch second card image bytes directly
image_bytes2 = await fetch_card_image(session, card_url2, timeout=6)
s3_url2 = upload_card_to_s3(image_bytes2, x['player_id'], card_type2, release_date, cardset['id'])
uploads.append((x['player_id'], card_type2, s3_url2))
# Update player record with new S3 URL for image2
if UPDATE_PLAYER_URLS:
await db_patch('players', object_id=x['player_id'], params=[
('image2', s3_url2)
])
url_updates.append((x['player_id'], card_type2, s3_url2))
logger.info(f'Updated player {x["player_id"]} image2 URL to S3')
else:
# Just validate card exists (old behavior)
resp = await url_get(card_url2, timeout=6)
successes.append(x)
except ConnectionError as e:
if cxn_error:
raise e
cxn_error = True
errors.append((x, e))
except ValueError as e:
errors.append((x, e))
else:
# Upload second card to S3
card_type2 = 'pitching' if 'pitching' in x['image2'] else 'batting'
if UPLOAD_TO_S3 and not HTML_CARDS:
try:
s3_url2 = upload_card_to_s3(resp, x['player_id'], card_type2, release_date)
uploads.append((x['player_id'], card_type2, s3_url2))
# Update player record with new S3 URL for image2
if UPDATE_PLAYER_URLS:
await db_patch('players', object_id=x['player_id'], params=[
('image2', s3_url2)
])
url_updates.append((x['player_id'], card_type2, s3_url2))
logger.info(f'Updated player {x["player_id"]} image2 URL to S3')
except Exception as e:
logger.error(f'S3 upload/update failed for player {x["player_id"]} image2: {e}')
errors.append((x, f'S3 error (image2): {e}'))
continue
successes.append(x)
except Exception as e:
logger.error(f'S3 upload/update failed for player {x["player_id"]} image2: {e}')
errors.append((x, f'S3 error (image2): {e}'))
else:
successes.append(x)

rank_pitching_staffs.py (new file, 142 lines)

@@ -0,0 +1,142 @@
import asyncio
from collections import defaultdict
from db_calls import db_get
async def rank_pitching_staffs(cardset_id: int = 27):
"""Rank teams by pitching staff quality based on player rarity."""
print(f'Fetching players from cardset {cardset_id}...\n')
# Fetch all players from the cardset
p_query = await db_get(
'players',
params=[('cardset_id', cardset_id), ('inc_dex', False)]
)
if not p_query or p_query['count'] == 0:
print(f'No players found for cardset {cardset_id}')
return
players = p_query['players']
print(f'Found {len(players)} players\n')
# Assign point values to each rarity
rarity_points = {
'Hall of Fame': 5,
'MVP': 4,
'All-Star': 3,
'Starter': 2,
'Reserve': 1,
'Replacement': 0.5
}
# Collect pitching staff data by franchise
franchise_pitching = defaultdict(lambda: {
'pitchers': [],
'total_points': 0,
'count': 0,
'avg_points': 0,
'rarities': defaultdict(int)
})
for player in players:
franchise = player.get('franchise', 'Unknown')
rarity_obj = player.get('rarity', {})
# Extract rarity name from rarity object
if isinstance(rarity_obj, dict):
rarity = rarity_obj.get('name', 'Unknown')
else:
rarity = str(rarity_obj) if rarity_obj else 'Unknown'
# Determine if pitcher based on positions
positions = []
for i in range(1, 9):
pos = player.get(f'pos_{i}')
if pos:
positions.append(pos)
is_pitcher = any(pos in ['SP', 'RP', 'CP'] for pos in positions)
if is_pitcher and franchise != 'Unknown':
points = rarity_points.get(rarity, 0)
franchise_pitching[franchise]['pitchers'].append({
'name': player.get('p_name'),
'rarity': rarity,
'points': points
})
franchise_pitching[franchise]['total_points'] += points
franchise_pitching[franchise]['count'] += 1
franchise_pitching[franchise]['rarities'][rarity] += 1
# Calculate average points per pitcher
for franchise in franchise_pitching:
count = franchise_pitching[franchise]['count']
if count > 0:
franchise_pitching[franchise]['avg_points'] = (
franchise_pitching[franchise]['total_points'] / count
)
# Sort franchises by total points (descending)
ranked_teams = sorted(
franchise_pitching.items(),
key=lambda x: (x[1]['total_points'], x[1]['avg_points']),
reverse=True
)
# Print rankings
print('=' * 120)
print('PITCHING STAFF RANKINGS (by total rarity points)')
print('=' * 120)
print(f'{"Rank":<6} {"Franchise":<25} {"Pitchers":<10} {"Total Pts":<12} {"Avg Pts":<10} {"HoF":<6} {"MVP":<6} {"AS":<6} {"STR":<6} {"RES":<6} {"REP":<6}')
print('-' * 120)
for rank, (franchise, data) in enumerate(ranked_teams, start=1):
hof = data['rarities'].get('Hall of Fame', 0)
mvp = data['rarities'].get('MVP', 0)
all_star = data['rarities'].get('All-Star', 0)
starter = data['rarities'].get('Starter', 0)
reserve = data['rarities'].get('Reserve', 0)
replacement = data['rarities'].get('Replacement', 0)
print(f'{rank:<6} {franchise:<25} {data["count"]:<10} {data["total_points"]:<12.1f} {data["avg_points"]:<10.2f} '
f'{hof:<6} {mvp:<6} {all_star:<6} {starter:<6} {reserve:<6} {replacement:<6}')
print('=' * 120)
# Print top 5 teams with details
print('\n' + '=' * 100)
print('TOP 5 PITCHING STAFFS - DETAILED ROSTERS')
print('=' * 100)
for rank, (franchise, data) in enumerate(ranked_teams[:5], start=1):
print(f'\n#{rank} - {franchise.upper()} ({data["total_points"]:.1f} points, {data["count"]} pitchers)')
print('-' * 100)
# Sort pitchers by points
sorted_pitchers = sorted(data['pitchers'], key=lambda x: x['points'], reverse=True)
for pitcher in sorted_pitchers:
print(f' {pitcher["name"]:<30} {pitcher["rarity"]:<15} ({pitcher["points"]} pts)')
# Print bottom 5 teams
print('\n' + '=' * 100)
print('BOTTOM 5 PITCHING STAFFS - DETAILED ROSTERS')
print('=' * 100)
for rank, (franchise, data) in enumerate(ranked_teams[-5:], start=len(ranked_teams)-4):
print(f'\n#{rank} - {franchise.upper()} ({data["total_points"]:.1f} points, {data["count"]} pitchers)')
print('-' * 100)
# Sort pitchers by points
sorted_pitchers = sorted(data['pitchers'], key=lambda x: x['points'], reverse=True)
for pitcher in sorted_pitchers:
print(f' {pitcher["name"]:<30} {pitcher["rarity"]:<15} ({pitcher["points"]} pts)')
print('\n' + '=' * 100)
if __name__ == '__main__':
asyncio.run(rank_pitching_staffs(27))