paper-dynasty-card-creation/pd_cards/core/upload.py
Cal Corum 8bddf31bf6 feat: configurable API URL for local high-concurrency card rendering
Allow upload scripts to target a local API server instead of the remote
production server, enabling 32x+ concurrency for dramatically faster
full-cardset uploads (~30-45s vs ~2-3min for 800 cards).

- pd_cards/core/upload.py: add api_url param to upload_cards_to_s3(),
  refresh_card_images(), and check_card_images()
- pd_cards/commands/upload.py: add --api-url CLI option to upload s3
- check_cards_and_upload.py: read PD_API_URL env var with prod fallback
- Update CLAUDE.md, CLI reference, and Phase 0 project plan docs

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-16 10:27:16 -05:00

632 lines
21 KiB
Python

"""
Card image upload and management core logic.
Business logic for uploading card images to AWS S3 and managing card URLs.
"""
import asyncio
import datetime
from typing import Optional
import urllib.parse
# These imports are resolved at runtime when called from CLI
# since the CLI adds the parent directory to sys.path
from db_calls import db_get, db_patch, db_post, url_get
from exceptions import logger
# AWS Configuration
DEFAULT_AWS_BUCKET = "paper-dynasty"
DEFAULT_AWS_REGION = "us-east-1"


def get_s3_base_url(
    bucket: str = DEFAULT_AWS_BUCKET, region: str = DEFAULT_AWS_REGION
) -> str:
    """Return the virtual-hosted-style HTTPS base URL for an S3 bucket."""
    host = f"{bucket}.s3.{region}.amazonaws.com"
    return f"https://{host}"
async def fetch_card_image(session, card_url: str, timeout: int = 10) -> bytes:
    """
    Fetch a card image over HTTP and return its raw bytes.

    Args:
        session: aiohttp ClientSession used to issue the GET request
        card_url: URL of the card image to download
        timeout: Total request timeout in seconds

    Returns:
        Raw PNG image bytes

    Raises:
        ValueError: if the server responds with a non-200 status
    """
    import aiohttp

    request_timeout = aiohttp.ClientTimeout(total=timeout)
    async with session.get(card_url, timeout=request_timeout) as resp:
        # Any non-200 response is treated as a fetch failure.
        if resp.status != 200:
            error_text = await resp.text()
            logger.error(f"Failed to fetch card: {error_text}")
            raise ValueError(f"Card fetch error: {error_text}")
        logger.info(f"Fetched card image from {card_url}")
        return await resp.read()
def upload_card_to_s3(
    s3_client,
    image_data: bytes,
    player_id: int,
    card_type: str,
    release_date: str,
    cardset_id: int,
    bucket: str = DEFAULT_AWS_BUCKET,
    region: str = DEFAULT_AWS_REGION,
) -> str:
    """
    Upload a single card image to S3 and return its URL.

    The object key encodes the cardset (zero-padded to three digits), the
    player id, and the card type; the returned URL carries a ``?d=`` query
    parameter so browsers re-fetch after a re-release.

    Args:
        s3_client: Boto3 S3 client
        image_data: Raw PNG image bytes
        player_id: Player ID
        card_type: 'batting' or 'pitching'
        release_date: Date string for cache busting (e.g., '2025-11-8')
        cardset_id: Cardset ID (will be zero-padded to 3 digits)
        bucket: S3 bucket name
        region: AWS region

    Returns:
        Full S3 URL with ?d= parameter

    Raises:
        Exception: re-raised from boto3 after logging the failure
    """
    object_key = (
        f"cards/cardset-{cardset_id:03d}/player-{player_id}/{card_type}card.png"
    )
    base_url = get_s3_base_url(bucket, region)

    try:
        s3_client.put_object(
            Bucket=bucket,
            Key=object_key,
            Body=image_data,
            ContentType="image/png",
            CacheControl="public, max-age=300",  # 5 minute cache
            Metadata={
                "player-id": str(player_id),
                "card-type": card_type,
                "upload-date": datetime.datetime.now().isoformat(),
            },
        )
    except Exception as e:
        logger.error(f"Failed to upload {card_type} card for player {player_id}: {e}")
        raise

    # Append the cache-busting parameter before handing the URL back.
    full_url = f"{base_url}/{object_key}?d={release_date}"
    logger.info(f"Uploaded {card_type} card for player {player_id} to S3: {full_url}")
    return full_url
# Default (production) Paper Dynasty API base URL. Callers can override it via
# the api_url parameter on the functions below, e.g. to point at a local
# rendering server for high-concurrency uploads.
DEFAULT_PD_API_URL = "https://pd.manticorum.com/api"
async def upload_cards_to_s3(
    cardset_name: str,
    start_id: Optional[int] = None,
    limit: Optional[int] = None,
    html_cards: bool = False,
    skip_batters: bool = False,
    skip_pitchers: bool = False,
    upload: bool = True,
    update_urls: bool = True,
    bucket: str = DEFAULT_AWS_BUCKET,
    region: str = DEFAULT_AWS_REGION,
    on_progress: callable = None,
    concurrency: int = 8,
    api_url: str = DEFAULT_PD_API_URL,
) -> dict:
    """
    Upload card images to S3 for a cardset using concurrent async tasks.

    Cards are fetched and uploaded in parallel, bounded by ``concurrency``
    semaphore slots. boto3 S3 calls (synchronous) are offloaded to a thread
    pool via ``loop.run_in_executor`` so they do not block the event loop.

    Individual card failures are collected and do NOT abort the batch;
    a summary is logged once all tasks complete.

    Args:
        cardset_name: Name of the cardset to process
        start_id: Player ID to start from (for resuming)
        limit: Maximum number of cards to process
        html_cards: Fetch HTML preview cards instead of PNG
        skip_batters: Skip batting cards
        skip_pitchers: Skip pitching cards
        upload: Actually upload to S3
        update_urls: Update player URLs in database
        bucket: S3 bucket name
        region: AWS region
        on_progress: Callback function for progress updates
        concurrency: Number of parallel card-processing tasks (default 8)
        api_url: Base URL of the card-rendering API; defaults to production.
            Pass a local server URL to render at higher concurrency.

    Returns:
        Dict with counts of errors, successes, uploads, url_updates
    """
    import aiohttp
    import boto3

    # Look up cardset
    c_query = await db_get("cardsets", params=[("name", cardset_name)])
    if not c_query or c_query["count"] == 0:
        raise ValueError(f'Cardset "{cardset_name}" not found')
    cardset = c_query["cardsets"][0]

    # Get all players
    p_query = await db_get(
        "players",
        params=[
            ("inc_dex", False),
            ("cardset_id", cardset["id"]),
            ("short_output", True),
        ],
    )
    if not p_query or p_query["count"] == 0:
        raise ValueError("No players returned from Paper Dynasty API")
    all_players = p_query["players"]

    # Generate release date for cache busting (include timestamp for same-day updates)
    now = datetime.datetime.now()
    timestamp = int(now.timestamp())
    release_date = f"{now.year}-{now.month}-{now.day}-{timestamp}"

    # PD API base URL for card generation (configurable for local rendering)
    PD_API_URL = api_url
    logger.info(f"Using API URL: {PD_API_URL}")

    # Initialize S3 client if uploading (boto3 client is thread-safe for reads;
    # we will call it from a thread pool so we create it once here)
    s3_client = boto3.client("s3", region_name=region) if upload else None

    # Build the filtered list of players to process, respecting start_id / limit
    max_count = limit or 9999
    filtered_players = []
    for x in all_players:
        if len(filtered_players) >= max_count:
            break
        # Skip by card type (the player's stored image URL encodes the type)
        if "pitching" in x["image"] and skip_pitchers:
            continue
        if "batting" in x["image"] and skip_batters:
            continue
        # Resume support: ignore players below the requested starting id
        if start_id is not None and start_id > x["player_id"]:
            continue
        filtered_players.append(x)
    total = len(filtered_players)
    logger.info(f"Processing {total} cards with concurrency={concurrency}")

    # Shared mutable state protected by a lock
    errors = []
    successes = []
    uploads = []
    url_updates = []
    completed = 0
    progress_lock = asyncio.Lock()
    results_lock = asyncio.Lock()
    loop = asyncio.get_running_loop()
    semaphore = asyncio.Semaphore(concurrency)

    async def report_progress():
        """Increment the completed counter and log every 20 completions."""
        nonlocal completed
        async with progress_lock:
            completed += 1
            if completed % 20 == 0 or completed == total:
                logger.info(f"Progress: {completed}/{total} cards processed")
            if on_progress:
                on_progress(completed, f"{completed}/{total}")

    async def process_single_card(x: dict) -> None:
        """
        Process one player entry: fetch card image(s), upload to S3, and
        optionally patch the player record with the new S3 URL.

        Both the primary card (image) and the secondary card for two-way
        players (image2) are handled here. Errors are appended to the
        shared ``errors`` list rather than re-raised so the batch continues.
        Every exit path calls report_progress() exactly once.
        """
        async with semaphore:
            player_id = x["player_id"]

            # --- primary card ---
            # Legacy sombaseball URLs cannot be regenerated — record and bail.
            if "sombaseball" in x["image"]:
                async with results_lock:
                    errors.append((x, f"Bad card url: {x['image']}"))
                await report_progress()
                return

            card_type = "pitching" if "pitching" in x["image"] else "batting"
            pd_card_url = (
                f"{PD_API_URL}/v2/players/{player_id}/{card_type}card?d={release_date}"
            )
            if html_cards:
                # HTML previews render fast, so a short timeout suffices
                card_url = f"{pd_card_url}&html=true"
                timeout = 2
            else:
                card_url = pd_card_url
                timeout = 10

            primary_ok = False
            try:
                if upload and not html_cards:
                    image_bytes = await fetch_card_image(
                        session, card_url, timeout=timeout
                    )
                    # boto3 is synchronous — offload to thread pool
                    s3_url = await loop.run_in_executor(
                        None,
                        upload_card_to_s3,
                        s3_client,
                        image_bytes,
                        player_id,
                        card_type,
                        release_date,
                        cardset["id"],
                        bucket,
                        region,
                    )
                    async with results_lock:
                        uploads.append((player_id, card_type, s3_url))
                    if update_urls:
                        await db_patch(
                            "players",
                            object_id=player_id,
                            params=[("image", s3_url)],
                        )
                        async with results_lock:
                            url_updates.append((player_id, card_type, s3_url))
                        logger.info(f"Updated player {player_id} image URL to S3")
                else:
                    # Dry-run / HTML mode: just verify the card URL renders
                    logger.info(f"Validating card URL: {card_url}")
                    await url_get(card_url, timeout=timeout)
                primary_ok = True
            except ConnectionError as e:
                logger.error(f"Connection error for player {player_id}: {e}")
                async with results_lock:
                    errors.append((x, e))
            except ValueError as e:
                async with results_lock:
                    errors.append((x, e))
            except Exception as e:
                logger.error(f"S3 upload/update failed for player {player_id}: {e}")
                async with results_lock:
                    errors.append((x, f"S3 error: {e}"))

            # Do not attempt the secondary card when the primary failed.
            if not primary_ok:
                await report_progress()
                return

            # --- secondary card (two-way players) ---
            if x["image2"] is not None:
                if "sombaseball" in x["image2"]:
                    async with results_lock:
                        errors.append((x, f"Bad card url: {x['image2']}"))
                    await report_progress()
                    return
                card_type2 = "pitching" if "pitching" in x["image2"] else "batting"
                pd_card_url2 = f"{PD_API_URL}/v2/players/{player_id}/{card_type2}card?d={release_date}"
                card_url2 = f"{pd_card_url2}&html=true" if html_cards else pd_card_url2
                try:
                    if upload and not html_cards:
                        image_bytes2 = await fetch_card_image(
                            session, card_url2, timeout=10
                        )
                        s3_url2 = await loop.run_in_executor(
                            None,
                            upload_card_to_s3,
                            s3_client,
                            image_bytes2,
                            player_id,
                            card_type2,
                            release_date,
                            cardset["id"],
                            bucket,
                            region,
                        )
                        async with results_lock:
                            uploads.append((player_id, card_type2, s3_url2))
                        if update_urls:
                            await db_patch(
                                "players",
                                object_id=player_id,
                                params=[("image2", s3_url2)],
                            )
                            async with results_lock:
                                url_updates.append((player_id, card_type2, s3_url2))
                            logger.info(f"Updated player {player_id} image2 URL to S3")
                    else:
                        await url_get(card_url2, timeout=10)
                    # Player counts as a success only after both cards are done
                    async with results_lock:
                        successes.append(x)
                except ConnectionError as e:
                    logger.error(f"Connection error for player {player_id} image2: {e}")
                    async with results_lock:
                        errors.append((x, e))
                except ValueError as e:
                    async with results_lock:
                        errors.append((x, e))
                except Exception as e:
                    logger.error(
                        f"S3 upload/update failed for player {player_id} image2: {e}"
                    )
                    async with results_lock:
                        errors.append((x, f"S3 error (image2): {e}"))
            else:
                async with results_lock:
                    successes.append(x)

            await report_progress()

    # One shared session for all tasks; gather with return_exceptions so a
    # crashed task cannot cancel its siblings.
    async with aiohttp.ClientSession() as session:
        tasks = [process_single_card(x) for x in filtered_players]
        await asyncio.gather(*tasks, return_exceptions=True)

    # Log final summary
    success_count = len(successes)
    error_count = len(errors)
    logger.info(
        f"Upload complete: {success_count} succeeded, {error_count} failed "
        f"out of {total} cards"
    )
    if error_count:
        for player, err in errors:
            logger.warning(
                f"  Failed: player {player.get('player_id', '?')} "
                f"({player.get('p_name', '?')}): {err}"
            )

    return {
        "errors": errors,
        "successes": successes,
        "uploads": uploads,
        "url_updates": url_updates,
        "release_date": release_date,
        "cardset": cardset,
    }
async def refresh_card_images(
    cardset_name: str,
    limit: Optional[int] = None,
    html_cards: bool = False,
    on_progress: callable = None,
    api_url: str = DEFAULT_PD_API_URL,
) -> dict:
    """
    Refresh card images for a cardset by triggering regeneration.

    Runs two passes over the cardset's players:
      1. Players whose stored image URL still points at a legacy
         "sombaseball" host get the URL rewritten to the rendering API;
         all other players receive an image-reset POST.
      2. Each player's card URL (and image2 for two-way players) is fetched
         to trigger regeneration on the server.

    One ConnectionError per run is tolerated and recorded; any subsequent
    ConnectionError aborts the run, since the API is presumably down.

    Args:
        cardset_name: Name of the cardset to process
        limit: Maximum number of cards to process
        html_cards: Fetch HTML preview cards instead of PNG
        on_progress: Callback function for progress updates
        api_url: Base URL of the card-rendering API (defaults to production)

    Returns:
        Dict with errors, successes, and the cardset record
    """
    # Look up cardset
    c_query = await db_get("cardsets", params=[("name", cardset_name)])
    if not c_query or c_query["count"] == 0:
        raise ValueError(f'Cardset "{cardset_name}" not found')
    cardset = c_query["cardsets"][0]
    CARD_BASE_URL = f"{api_url}/v2/players"

    # Get all players
    p_query = await db_get(
        "players",
        params=[
            ("inc_dex", False),
            ("cardset_id", cardset["id"]),
            ("short_output", True),
        ],
    )
    # Fix: guard against a None response as well as an empty result, matching
    # upload_cards_to_s3 / check_card_images (previously a None response
    # raised TypeError instead of this clear error).
    if not p_query or p_query["count"] == 0:
        raise ValueError("No players returned from Paper Dynasty API")
    all_players = p_query["players"]

    errors = []
    successes = []
    cxn_error = False  # one connection error is tolerated; the next aborts
    count = 0
    max_count = limit or 9999
    start_time = datetime.datetime.now()

    # First pass: Reset URLs for players with old sombaseball URLs
    for x in all_players:
        if "sombaseball" in x["image"]:
            if on_progress:
                on_progress(count, f"{x['p_name']} - fixing old URL")
            release_dir = f"{start_time.year}-{start_time.month}-{start_time.day}"
            # NOTE(review): '?d=' is percent-encoded into the stored URL
            # (%3Fd%3D) — preserved as-is; confirm downstream consumers
            # expect the encoded form.
            if x["pos_1"] in ["SP", "RP", "CP", "P"]:
                image_url = (
                    f"{CARD_BASE_URL}/{x['player_id']}/pitchingcard"
                    f"{urllib.parse.quote('?d=')}{release_dir}"
                )
            else:
                image_url = (
                    f"{CARD_BASE_URL}/{x['player_id']}/battingcard"
                    f"{urllib.parse.quote('?d=')}{release_dir}"
                )
            await db_patch(
                "players", object_id=x["player_id"], params=[("image", image_url)]
            )
        else:
            count += 1
            if on_progress and count % 20 == 0:
                on_progress(count, f"{x['p_name']} - resetting")
            if count >= max_count:
                break
            try:
                await db_post(f"players/{x['player_id']}/image-reset")
            except ConnectionError as e:
                if cxn_error:
                    raise e
                cxn_error = True
                errors.append((x, e))
            except ValueError as e:
                errors.append((x, e))

    # Second pass: Fetch images to trigger regeneration
    count = 0
    for x in all_players:
        if count >= max_count:
            break
        if html_cards:
            # HTML previews render fast, so a short timeout suffices
            card_url = f"{x['image']}&html=true"
            timeout = 2
        else:
            card_url = x["image"]
            timeout = 6
        try:
            logger.info(f"Fetching card URL: {card_url}")
            await url_get(card_url, timeout=timeout)
        except ConnectionError as e:
            if cxn_error:
                raise e
            cxn_error = True
            errors.append((x, e))
        except ValueError as e:
            errors.append((x, e))
        else:
            # Handle image2 (two-way players)
            if x["image2"] is not None:
                if html_cards:
                    card_url2 = f"{x['image2']}&html=true"
                else:
                    card_url2 = x["image2"]
                if "sombaseball" in x["image2"]:
                    errors.append((x, f"Bad card url: {x['image2']}"))
                else:
                    try:
                        await url_get(card_url2, timeout=6)
                        successes.append(x)
                    except ConnectionError as e:
                        if cxn_error:
                            raise e
                        cxn_error = True
                        errors.append((x, e))
                    except ValueError as e:
                        errors.append((x, e))
            else:
                successes.append(x)
            # Only successfully fetched primary cards count toward the limit
            count += 1

    return {"errors": errors, "successes": successes, "cardset": cardset}
async def check_card_images(
    cardset_name: str,
    limit: Optional[int] = None,
    on_progress: callable = None,
    api_url: str = DEFAULT_PD_API_URL,
) -> dict:
    """
    Check and validate card images without uploading.

    Each player's card URL on the rendering API is fetched with a short
    timeout. Failures are collected rather than aborting, except that one
    ConnectionError is tolerated per run and any subsequent ConnectionError
    aborts (the API is presumably down).

    Args:
        cardset_name: Name of the cardset to check
        limit: Maximum number of cards to check
        on_progress: Callback function for progress updates
        api_url: Base URL of the card-rendering API (defaults to production)

    Returns:
        Dict with errors, successes, the cardset record, and release_date
    """
    # Look up cardset
    c_query = await db_get("cardsets", params=[("name", cardset_name)])
    if not c_query or c_query["count"] == 0:
        raise ValueError(f'Cardset "{cardset_name}" not found')
    cardset = c_query["cardsets"][0]

    # Get all players
    p_query = await db_get(
        "players",
        params=[
            ("inc_dex", False),
            ("cardset_id", cardset["id"]),
            ("short_output", True),
        ],
    )
    if not p_query or p_query["count"] == 0:
        raise ValueError("No players returned from Paper Dynasty API")
    all_players = p_query["players"]

    # Generate release date for cache busting (include timestamp for same-day updates)
    now = datetime.datetime.now()
    timestamp = int(now.timestamp())
    release_date = f"{now.year}-{now.month}-{now.day}-{timestamp}"

    errors = []
    successes = []
    cxn_error = False  # one connection error is tolerated; the next aborts
    count = 0
    max_count = limit or 9999

    for x in all_players:
        if count >= max_count:
            break
        # Legacy sombaseball URLs cannot be re-rendered — record and skip;
        # these do not count toward the limit.
        if "sombaseball" in x["image"]:
            errors.append((x, f"Bad card url: {x['image']}"))
            continue
        count += 1
        if on_progress and count % 20 == 0:
            on_progress(count, x["p_name"])
        card_type = "pitching" if "pitching" in x["image"] else "batting"
        # Use api_url directly (the original kept a redundant local alias).
        card_url = (
            f"{api_url}/v2/players/{x['player_id']}/{card_type}card?d={release_date}"
        )
        try:
            logger.info(f"Checking card URL: {card_url}")
            await url_get(card_url, timeout=6)
            successes.append(x)
        except ConnectionError as e:
            if cxn_error:
                raise e
            cxn_error = True
            errors.append((x, e))
        except ValueError as e:
            errors.append((x, e))

    return {
        "errors": errors,
        "successes": successes,
        "cardset": cardset,
        "release_date": release_date,
    }