feat: concurrent upload pipeline for legacy script (WP-05) (#92)

Refactor check_cards_and_upload.py sequential loop to use asyncio.gather
+ Semaphore (CONCURRENCY=8), offload boto3 S3 calls to thread pool via
loop.run_in_executor, increase fetch timeout to 10s, and add progress
reporting every 20 completions.

Closes #92

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Cal Corum 2026-03-13 08:05:22 -05:00
parent 336014b689
commit 0d603ccd48

View File

@@ -1,5 +1,6 @@
import asyncio
import datetime
import functools
import sys
import boto3
@@ -14,6 +15,9 @@ HTML_CARDS = False # boolean to only check and not generate cards
SKIP_ARMS = False
SKIP_BATS = False
# Concurrency
CONCURRENCY = 8 # number of parallel card-processing tasks
# AWS Configuration
AWS_BUCKET_NAME = "paper-dynasty" # Change to your bucket name
AWS_REGION = "us-east-1" # Change to your region
@@ -23,11 +27,11 @@ UPLOAD_TO_S3 = (
)
UPDATE_PLAYER_URLS = True # Set to False to skip player URL updates (testing) - STEP 6: Update player URLs
# Initialize S3 client
# Initialize S3 client (module-level; boto3 client is thread-safe for concurrent reads)
s3_client = boto3.client("s3", region_name=AWS_REGION) if UPLOAD_TO_S3 else None
async def fetch_card_image(session, card_url: str, timeout: int = 6) -> bytes:
async def fetch_card_image(session, card_url: str, timeout: int = 10) -> bytes:
"""
Fetch card image from URL and return raw bytes.
@@ -134,165 +138,216 @@ async def main(args):
# PD API base URL for card generation
PD_API_URL = "https://pd.manticorum.com/api"
print(f"\nRelease date for cards: {release_date}")
print(f"S3 Upload: {'ENABLED' if UPLOAD_TO_S3 else 'DISABLED'}")
print(f"URL Update: {'ENABLED' if UPDATE_PLAYER_URLS else 'DISABLED'}")
print(f"Concurrency: {CONCURRENCY} parallel tasks\n")
# Build filtered list respecting SKIP_ARMS, SKIP_BATS, START_ID, TEST_COUNT
max_count = TEST_COUNT if TEST_COUNT is not None else 9999
filtered_players = []
for x in all_players:
if len(filtered_players) >= max_count:
break
if "pitching" in x["image"] and SKIP_ARMS:
continue
if "batting" in x["image"] and SKIP_BATS:
continue
if START_ID is not None and START_ID > x["player_id"]:
continue
filtered_players.append(x)
total = len(filtered_players)
logger.info(f"Processing {total} cards with concurrency={CONCURRENCY}")
# Shared mutable state protected by locks
errors = []
successes = []
uploads = []
url_updates = []
cxn_error = False
count = -1
completed = 0
progress_lock = asyncio.Lock()
results_lock = asyncio.Lock()
start_time = datetime.datetime.now()
loop = asyncio.get_event_loop()
semaphore = asyncio.Semaphore(CONCURRENCY)
print(f"\nRelease date for cards: {release_date}")
print(f"S3 Upload: {'ENABLED' if UPLOAD_TO_S3 else 'DISABLED'}")
print(f"URL Update: {'ENABLED' if UPDATE_PLAYER_URLS else 'DISABLED'}\n")
async def report_progress():
"""Increment the completed counter and log/print every 20 completions."""
nonlocal completed
async with progress_lock:
completed += 1
if completed % 20 == 0 or completed == total:
print(f"Progress: {completed}/{total} cards processed")
logger.info(f"Progress: {completed}/{total} cards processed")
# Create persistent aiohttp session for all card fetches
async with aiohttp.ClientSession() as session:
for x in all_players:
if "pitching" in x["image"] and SKIP_ARMS:
pass
elif "batting" in x["image"] and SKIP_BATS:
pass
elif START_ID is not None and START_ID > x["player_id"]:
pass
elif "sombaseball" in x["image"]:
errors.append((x, f"Bad card url: {x['image']}"))
async def process_single_card(x: dict) -> None:
"""
Process one player entry under the semaphore: fetch card image(s), upload
to S3 (offloading the synchronous boto3 call to a thread pool), and
optionally patch the player record with the new S3 URL.
Both the primary card (image) and the secondary card for two-way players
(image2) are handled. Failures are appended to the shared errors list
rather than re-raised so the overall batch continues.
"""
async with semaphore:
player_id = x["player_id"]
# --- primary card ---
if "sombaseball" in x["image"]:
async with results_lock:
errors.append((x, f"Bad card url: {x['image']}"))
await report_progress()
return
card_type = "pitching" if "pitching" in x["image"] else "batting"
pd_card_url = (
f"{PD_API_URL}/v2/players/{player_id}/{card_type}card?d={release_date}"
)
if HTML_CARDS:
card_url = f"{pd_card_url}&html=true"
timeout = 2
else:
count += 1
if count % 20 == 0:
print(f"Card #{count + 1} being pulled is {x['p_name']}...")
elif TEST_COUNT is not None and TEST_COUNT < count:
print("Done test run")
break
card_url = pd_card_url
timeout = 10
# Determine card type from existing image URL
card_type = "pitching" if "pitching" in x["image"] else "batting"
# Generate card URL from PD API (forces fresh generation from database)
pd_card_url = f"{PD_API_URL}/v2/players/{x['player_id']}/{card_type}card?d={release_date}"
if HTML_CARDS:
card_url = f"{pd_card_url}&html=true"
timeout = 2
else:
card_url = pd_card_url
timeout = 6
try:
# Upload to S3 if enabled
if UPLOAD_TO_S3 and not HTML_CARDS:
# Fetch card image bytes directly
image_bytes = await fetch_card_image(
session, card_url, timeout=timeout
)
s3_url = upload_card_to_s3(
primary_ok = False
try:
if UPLOAD_TO_S3 and not HTML_CARDS:
image_bytes = await fetch_card_image(
session, card_url, timeout=timeout
)
# boto3 is synchronous — offload to thread pool so the event
# loop is not blocked during the S3 PUT
s3_url = await loop.run_in_executor(
None,
functools.partial(
upload_card_to_s3,
image_bytes,
x["player_id"],
player_id,
card_type,
release_date,
cardset["id"],
)
uploads.append((x["player_id"], card_type, s3_url))
),
)
async with results_lock:
uploads.append((player_id, card_type, s3_url))
if UPDATE_PLAYER_URLS:
await db_patch(
"players",
object_id=player_id,
params=[("image", s3_url)],
)
async with results_lock:
url_updates.append((player_id, card_type, s3_url))
logger.info(f"Updated player {player_id} image URL to S3")
else:
# Just validate card exists (old behavior)
logger.info("calling the card url")
await url_get(card_url, timeout=timeout)
primary_ok = True
except ConnectionError as e:
logger.error(f"Connection error for player {player_id}: {e}")
async with results_lock:
errors.append((x, e))
except ValueError as e:
async with results_lock:
errors.append((x, e))
except Exception as e:
logger.error(f"S3 upload/update failed for player {player_id}: {e}")
async with results_lock:
errors.append((x, f"S3 error: {e}"))
if not primary_ok:
await report_progress()
return
# --- secondary card (two-way players) ---
if x["image2"] is not None:
if "sombaseball" in x["image2"]:
async with results_lock:
errors.append((x, f"Bad card url: {x['image2']}"))
await report_progress()
return
card_type2 = "pitching" if "pitching" in x["image2"] else "batting"
pd_card_url2 = f"{PD_API_URL}/v2/players/{player_id}/{card_type2}card?d={release_date}"
if HTML_CARDS:
card_url2 = f"{pd_card_url2}&html=true"
else:
card_url2 = pd_card_url2
try:
if UPLOAD_TO_S3 and not HTML_CARDS:
image_bytes2 = await fetch_card_image(
session, card_url2, timeout=10
)
s3_url2 = await loop.run_in_executor(
None,
functools.partial(
upload_card_to_s3,
image_bytes2,
player_id,
card_type2,
release_date,
cardset["id"],
),
)
async with results_lock:
uploads.append((player_id, card_type2, s3_url2))
# Update player record with new S3 URL
if UPDATE_PLAYER_URLS:
await db_patch(
"players",
object_id=x["player_id"],
params=[("image", s3_url)],
)
url_updates.append((x["player_id"], card_type, s3_url))
logger.info(
f"Updated player {x['player_id']} image URL to S3"
params=[("image2", s3_url2)],
)
async with results_lock:
url_updates.append((player_id, card_type2, s3_url2))
logger.info(f"Updated player {player_id} image2 URL to S3")
else:
# Just validate card exists (old behavior)
logger.info("calling the card url")
resp = await url_get(card_url, timeout=timeout)
await url_get(card_url2, timeout=10)
async with results_lock:
successes.append(x)
except ConnectionError as e:
if cxn_error:
raise e
cxn_error = True
errors.append((x, e))
logger.error(f"Connection error for player {player_id} image2: {e}")
async with results_lock:
errors.append((x, e))
except ValueError as e:
errors.append((x, e))
async with results_lock:
errors.append((x, e))
except Exception as e:
logger.error(
f"S3 upload/update failed for player {x['player_id']}: {e}"
f"S3 upload/update failed for player {player_id} image2: {e}"
)
errors.append((x, f"S3 error: {e}"))
continue
async with results_lock:
errors.append((x, f"S3 error (image2): {e}"))
# Handle image2 (dual-position players)
if x["image2"] is not None:
# Determine second card type
card_type2 = "pitching" if "pitching" in x["image2"] else "batting"
# Generate card URL from PD API (forces fresh generation from database)
pd_card_url2 = f"{PD_API_URL}/v2/players/{x['player_id']}/{card_type2}card?d={release_date}"
if HTML_CARDS:
card_url2 = f"{pd_card_url2}&html=true"
else:
card_url2 = pd_card_url2
if "sombaseball" in x["image2"]:
errors.append((x, f"Bad card url: {x['image2']}"))
else:
try:
if UPLOAD_TO_S3 and not HTML_CARDS:
# Fetch second card image bytes directly from PD API
image_bytes2 = await fetch_card_image(
session, card_url2, timeout=6
)
s3_url2 = upload_card_to_s3(
image_bytes2,
x["player_id"],
card_type2,
release_date,
cardset["id"],
)
uploads.append((x["player_id"], card_type2, s3_url2))
# Update player record with new S3 URL for image2
if UPDATE_PLAYER_URLS:
await db_patch(
"players",
object_id=x["player_id"],
params=[("image2", s3_url2)],
)
url_updates.append(
(x["player_id"], card_type2, s3_url2)
)
logger.info(
f"Updated player {x['player_id']} image2 URL to S3"
)
else:
# Just validate card exists (old behavior)
resp = await url_get(card_url2, timeout=6)
successes.append(x)
except ConnectionError as e:
if cxn_error:
raise e
cxn_error = True
errors.append((x, e))
except ValueError as e:
errors.append((x, e))
except Exception as e:
logger.error(
f"S3 upload/update failed for player {x['player_id']} image2: {e}"
)
errors.append((x, f"S3 error (image2): {e}"))
else:
else:
async with results_lock:
successes.append(x)
await report_progress()
# Create persistent aiohttp session shared across all concurrent tasks
async with aiohttp.ClientSession() as session:
tasks = [process_single_card(x) for x in filtered_players]
await asyncio.gather(*tasks, return_exceptions=True)
# Print summary
print(f"\n{'=' * 60}")
print("SUMMARY")