paper-dynasty-card-creation/check_cards_and_upload.py
Cal Corum 8bddf31bf6 feat: configurable API URL for local high-concurrency card rendering
Allow upload scripts to target a local API server instead of the remote
production server, enabling 32x+ concurrency for dramatically faster
full-cardset uploads (~30-45s vs ~2-3min for 800 cards).

- pd_cards/core/upload.py: add api_url param to upload_cards_to_s3(),
  refresh_card_images(), and check_card_images()
- pd_cards/commands/upload.py: add --api-url CLI option to upload s3
- check_cards_and_upload.py: read PD_API_URL env var with prod fallback
- Update CLAUDE.md, CLI reference, and Phase 0 project plan docs

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-16 10:27:16 -05:00

387 lines
14 KiB
Python

import asyncio
import datetime
import functools
import os
import sys
import boto3
from db_calls import db_get, db_patch, url_get
from exceptions import logger
# Configuration
CARDSET_NAME = "2005 Live"  # cardset looked up by name via the cardsets API
START_ID = None # Integer to only start pulling cards at player_id START_ID
TEST_COUNT = 9999 # integer to stop after TEST_COUNT calls
HTML_CARDS = False # boolean to only check and not generate cards
SKIP_ARMS = False  # boolean: skip players whose image is a pitching card
SKIP_BATS = False  # boolean: skip players whose image is a batting card
# Concurrency
CONCURRENCY = 8 # number of parallel card-processing tasks
# AWS Configuration
AWS_BUCKET_NAME = "paper-dynasty" # Change to your bucket name
AWS_REGION = "us-east-1" # Change to your region
S3_BASE_URL = f"https://{AWS_BUCKET_NAME}.s3.{AWS_REGION}.amazonaws.com"
UPLOAD_TO_S3 = (
    True # Set to False to skip S3 upload (testing) - STEP 6: Upload validated cards
)
UPDATE_PLAYER_URLS = True # Set to False to skip player URL updates (testing) - STEP 6: Update player URLs
# Initialize S3 client (module-level; boto3 client is thread-safe for concurrent reads)
# NOTE: s3_client is None when UPLOAD_TO_S3 is False — upload_card_to_s3 must
# not be called in that mode.
s3_client = boto3.client("s3", region_name=AWS_REGION) if UPLOAD_TO_S3 else None
async def fetch_card_image(session, card_url: str, timeout: int = 10) -> bytes:
    """Download a rendered card PNG and return its raw bytes.

    Args:
        session: aiohttp ClientSession used to issue the request.
        card_url: Fully-qualified URL of the card image.
        timeout: Total request timeout in seconds.

    Returns:
        Raw PNG image bytes.

    Raises:
        ValueError: If the server responds with a non-200 status.
    """
    import aiohttp

    request_timeout = aiohttp.ClientTimeout(total=timeout)
    async with session.get(card_url, timeout=request_timeout) as resp:
        # Anything but 200 is treated as a fetch failure; surface the
        # response body in the exception so the caller can log it.
        if resp.status != 200:
            error_text = await resp.text()
            logger.error(f"Failed to fetch card: {error_text}")
            raise ValueError(f"Card fetch error: {error_text}")
        logger.info(f"Fetched card image from {card_url}")
        return await resp.read()
def upload_card_to_s3(
    image_data: bytes,
    player_id: int,
    card_type: str,
    release_date: str,
    cardset_id: int,
) -> str:
    """
    Upload a card PNG to S3 and return its public URL with a cache-busting
    query parameter.

    Args:
        image_data: Raw PNG image bytes.
        player_id: Player ID.
        card_type: 'batting' or 'pitching'.
        release_date: Date string appended as ?d= for cache busting
            (e.g., '2025-11-8').
        cardset_id: Cardset ID (zero-padded to 3 digits in the S3 key).

    Returns:
        Full S3 URL with ?d= parameter.

    Raises:
        Exception: Re-raises any boto3 error after logging it.
    """
    # Key layout: cards/cardset-NNN/player-<id>/<type>card.png
    s3_key = (
        f"cards/cardset-{cardset_id:03d}/player-{player_id}/{card_type}card.png"
    )
    try:
        object_metadata = {
            "player-id": str(player_id),
            "card-type": card_type,
            "upload-date": datetime.datetime.now().isoformat(),
        }
        s3_client.put_object(
            Bucket=AWS_BUCKET_NAME,
            Key=s3_key,
            Body=image_data,
            ContentType="image/png",
            CacheControl="public, max-age=300", # 5 minute cache
            Metadata=object_metadata,
        )
        # Return URL with cache-busting parameter
        s3_url = f"{S3_BASE_URL}/{s3_key}?d={release_date}"
        logger.info(f"Uploaded {card_type} card for player {player_id} to S3: {s3_url}")
        return s3_url
    except Exception as e:
        logger.error(f"Failed to upload {card_type} card for player {player_id}: {e}")
        raise
async def main(args):
    """Render/validate every card in CARDSET_NAME via the PD API, optionally
    upload the PNGs to S3, and optionally patch each player record with the
    new S3 URL. Work is fanned out across CONCURRENCY parallel tasks.

    Args:
        args: CLI arguments (currently unused).
    """
    import aiohttp

    print(f"Searching for cardset: {CARDSET_NAME}")
    # Resolve the cardset record by name; bail out quietly if it is missing.
    c_query = await db_get("cardsets", params=[("name", CARDSET_NAME)])
    if not c_query or c_query["count"] == 0:
        print(f"I do not see a cardset named {CARDSET_NAME}")
        return
    cardset = c_query["cardsets"][0]
    del c_query
    # Pull the full player list for the cardset (short_output = minimal fields).
    p_query = await db_get(
        "players",
        params=[
            ("inc_dex", False),
            ("cardset_id", cardset["id"]),
            ("short_output", True),
        ],
    )
    if not p_query or p_query["count"] == 0:
        raise ValueError("No players returned from Paper Dynasty API")
    all_players = p_query["players"]
    del p_query
    # Generate release date for cache busting (include timestamp for same-day updates)
    now = datetime.datetime.now()
    timestamp = int(now.timestamp())
    release_date = f"{now.year}-{now.month}-{now.day}-{timestamp}"
    # PD API base URL for card generation (override with PD_API_URL env var for local rendering)
    PD_API_URL = os.environ.get("PD_API_URL", "https://pd.manticorum.com/api")
    print(f"\nRelease date for cards: {release_date}")
    print(f"API URL: {PD_API_URL}")
    print(f"S3 Upload: {'ENABLED' if UPLOAD_TO_S3 else 'DISABLED'}")
    print(f"URL Update: {'ENABLED' if UPDATE_PLAYER_URLS else 'DISABLED'}")
    print(f"Concurrency: {CONCURRENCY} parallel tasks\n")
    # Build filtered list respecting SKIP_ARMS, SKIP_BATS, START_ID, TEST_COUNT
    max_count = TEST_COUNT if TEST_COUNT is not None else 9999
    filtered_players = []
    for x in all_players:
        if len(filtered_players) >= max_count:
            break
        if "pitching" in x["image"] and SKIP_ARMS:
            continue
        if "batting" in x["image"] and SKIP_BATS:
            continue
        if START_ID is not None and START_ID > x["player_id"]:
            continue
        filtered_players.append(x)
    total = len(filtered_players)
    logger.info(f"Processing {total} cards with concurrency={CONCURRENCY}")
    # Shared mutable state protected by locks
    errors = []  # (player dict, error) pairs for the summary
    successes = []  # player dicts fully processed without error
    uploads = []  # (player_id, card_type, s3_url) per S3 PUT
    url_updates = []  # (player_id, card_type, s3_url) per DB patch
    completed = 0
    progress_lock = asyncio.Lock()
    results_lock = asyncio.Lock()
    start_time = datetime.datetime.now()
    loop = asyncio.get_running_loop()
    semaphore = asyncio.Semaphore(CONCURRENCY)

    async def report_progress():
        """Increment the completed counter and log/print every 20 completions."""
        nonlocal completed
        async with progress_lock:
            completed += 1
            if completed % 20 == 0 or completed == total:
                print(f"Progress: {completed}/{total} cards processed")
                logger.info(f"Progress: {completed}/{total} cards processed")

    async def process_single_card(x: dict) -> None:
        """
        Process one player entry under the semaphore: fetch card image(s), upload
        to S3 (offloading the synchronous boto3 call to a thread pool), and
        optionally patch the player record with the new S3 URL.

        Both the primary card (image) and the secondary card for two-way players
        (image2) are handled. Failures are appended to the shared errors list
        rather than re-raised so the overall batch continues.
        """
        async with semaphore:
            player_id = x["player_id"]
            # --- primary card ---
            # Legacy "sombaseball" URLs cannot be regenerated; record and skip.
            if "sombaseball" in x["image"]:
                async with results_lock:
                    errors.append((x, f"Bad card url: {x['image']}"))
                await report_progress()
                return
            card_type = "pitching" if "pitching" in x["image"] else "batting"
            pd_card_url = (
                f"{PD_API_URL}/v2/players/{player_id}/{card_type}card?d={release_date}"
            )
            if HTML_CARDS:
                # HTML existence check is cheap, so a short timeout suffices.
                card_url = f"{pd_card_url}&html=true"
                timeout = 2
            else:
                card_url = pd_card_url
                timeout = 10
            primary_ok = False
            try:
                if UPLOAD_TO_S3 and not HTML_CARDS:
                    image_bytes = await fetch_card_image(
                        session, card_url, timeout=timeout
                    )
                    # boto3 is synchronous — offload to thread pool so the event
                    # loop is not blocked during the S3 PUT
                    s3_url = await loop.run_in_executor(
                        None,
                        functools.partial(
                            upload_card_to_s3,
                            image_bytes,
                            player_id,
                            card_type,
                            release_date,
                            cardset["id"],
                        ),
                    )
                    async with results_lock:
                        uploads.append((player_id, card_type, s3_url))
                    if UPDATE_PLAYER_URLS:
                        await db_patch(
                            "players",
                            object_id=player_id,
                            params=[("image", s3_url)],
                        )
                        async with results_lock:
                            url_updates.append((player_id, card_type, s3_url))
                        logger.info(f"Updated player {player_id} image URL to S3")
                else:
                    # Just validate card exists (old behavior)
                    logger.info("calling the card url")
                    await url_get(card_url, timeout=timeout)
                # Marks success of either path (upload or validate) so the
                # secondary card below is still processed.
                primary_ok = True
            except ConnectionError as e:
                logger.error(f"Connection error for player {player_id}: {e}")
                async with results_lock:
                    errors.append((x, e))
            except ValueError as e:
                async with results_lock:
                    errors.append((x, e))
            except Exception as e:
                logger.error(f"S3 upload/update failed for player {player_id}: {e}")
                async with results_lock:
                    errors.append((x, f"S3 error: {e}"))
            if not primary_ok:
                # Primary card failed: count it as done and skip image2.
                await report_progress()
                return
            # --- secondary card (two-way players) ---
            if x["image2"] is not None:
                if "sombaseball" in x["image2"]:
                    async with results_lock:
                        errors.append((x, f"Bad card url: {x['image2']}"))
                    await report_progress()
                    return
                card_type2 = "pitching" if "pitching" in x["image2"] else "batting"
                pd_card_url2 = f"{PD_API_URL}/v2/players/{player_id}/{card_type2}card?d={release_date}"
                if HTML_CARDS:
                    card_url2 = f"{pd_card_url2}&html=true"
                else:
                    card_url2 = pd_card_url2
                try:
                    if UPLOAD_TO_S3 and not HTML_CARDS:
                        image_bytes2 = await fetch_card_image(
                            session, card_url2, timeout=10
                        )
                        s3_url2 = await loop.run_in_executor(
                            None,
                            functools.partial(
                                upload_card_to_s3,
                                image_bytes2,
                                player_id,
                                card_type2,
                                release_date,
                                cardset["id"],
                            ),
                        )
                        async with results_lock:
                            uploads.append((player_id, card_type2, s3_url2))
                        if UPDATE_PLAYER_URLS:
                            await db_patch(
                                "players",
                                object_id=x["player_id"],
                                params=[("image2", s3_url2)],
                            )
                            async with results_lock:
                                url_updates.append((player_id, card_type2, s3_url2))
                            logger.info(f"Updated player {player_id} image2 URL to S3")
                    else:
                        # Just validate card exists (old behavior)
                        await url_get(card_url2, timeout=10)
                    # Both cards done — only now is the player a full success.
                    async with results_lock:
                        successes.append(x)
                except ConnectionError as e:
                    logger.error(f"Connection error for player {player_id} image2: {e}")
                    async with results_lock:
                        errors.append((x, e))
                except ValueError as e:
                    async with results_lock:
                        errors.append((x, e))
                except Exception as e:
                    logger.error(
                        f"S3 upload/update failed for player {player_id} image2: {e}"
                    )
                    async with results_lock:
                        errors.append((x, f"S3 error (image2): {e}"))
            else:
                # Single-card player: primary success is a full success.
                async with results_lock:
                    successes.append(x)
            await report_progress()

    # Create persistent aiohttp session shared across all concurrent tasks
    async with aiohttp.ClientSession() as session:
        tasks = [process_single_card(x) for x in filtered_players]
        # return_exceptions=True keeps one crashed task from cancelling the rest.
        await asyncio.gather(*tasks, return_exceptions=True)
    # Print summary
    print(f"\n{'=' * 60}")
    print("SUMMARY")
    print(f"{'=' * 60}")
    if len(errors) > 0:
        logger.error("All Errors:")
        for x in errors:
            logger.error(f"ID {x[0]['player_id']} {x[0]['p_name']} - Error: {x[1]}")
    if len(successes) > 0:
        logger.debug("All Successes:")
        for x in successes:
            logger.info(f"ID {x['player_id']} {x['p_name']}")
    p_run_time = datetime.datetime.now() - start_time
    print(f"\nErrors: {len(errors)}")
    print(f"Successes: {len(successes)}")
    if UPLOAD_TO_S3:
        print(f"S3 Uploads: {len(uploads)}")
        if len(uploads) > 0:
            print(f" First upload: {uploads[0][2]}")
    if UPDATE_PLAYER_URLS:
        print(f"URL Updates: {len(url_updates)}")
    print(f"\nTotal runtime: {p_run_time.total_seconds():.2f} seconds")
    print(f"{'=' * 60}")
if __name__ == "__main__":
    # Run the full check/upload pipeline; CLI args are forwarded (currently unused).
    asyncio.run(main(sys.argv[1:]))