import asyncio
import datetime
import functools
import os
import sys

import boto3

from db_calls import db_get, db_patch, url_get
from exceptions import logger

# Configuration
CARDSET_NAME = "2005 Live"
START_ID = None  # Integer to only start pulling cards at player_id START_ID
TEST_COUNT = 9999  # integer to stop after TEST_COUNT calls
HTML_CARDS = False  # boolean to only check and not generate cards
SKIP_ARMS = False
SKIP_BATS = False

# Concurrency
CONCURRENCY = 8  # number of parallel card-processing tasks

# AWS Configuration
AWS_BUCKET_NAME = "paper-dynasty"  # Change to your bucket name
AWS_REGION = "us-east-1"  # Change to your region
S3_BASE_URL = f"https://{AWS_BUCKET_NAME}.s3.{AWS_REGION}.amazonaws.com"
UPLOAD_TO_S3 = (
    True  # Set to False to skip S3 upload (testing) - STEP 6: Upload validated cards
)
UPDATE_PLAYER_URLS = True  # Set to False to skip player URL updates (testing) - STEP 6: Update player URLs

# Initialize S3 client (module-level; boto3 client is thread-safe for concurrent reads)
s3_client = boto3.client("s3", region_name=AWS_REGION) if UPLOAD_TO_S3 else None


async def fetch_card_image(session, card_url: str, timeout: int = 10) -> bytes:
    """
    Fetch card image from URL and return raw bytes.

    Args:
        session: aiohttp ClientSession to use for the request
        card_url: URL to fetch the card from
        timeout: Request timeout in seconds

    Returns:
        Raw PNG image bytes

    Raises:
        ValueError: if the server responds with a non-200 status.
    """
    import aiohttp

    async with session.get(
        card_url, timeout=aiohttp.ClientTimeout(total=timeout)
    ) as resp:
        if resp.status == 200:
            logger.info(f"Fetched card image from {card_url}")
            return await resp.read()
        else:
            error_text = await resp.text()
            logger.error(f"Failed to fetch card: {error_text}")
            raise ValueError(f"Card fetch error: {error_text}")


def upload_card_to_s3(
    image_data: bytes,
    player_id: int,
    card_type: str,
    release_date: str,
    cardset_id: int,
) -> str:
    """
    Upload card image to S3 and return the S3 URL with cache-busting param.

    Args:
        image_data: Raw PNG image bytes
        player_id: Player ID
        card_type: 'batting' or 'pitching'
        release_date: Date string for cache busting (e.g., '2025-11-8')
        cardset_id: Cardset ID (will be zero-padded to 3 digits)

    Returns:
        Full S3 URL with ?d= parameter

    Raises:
        Exception: re-raises whatever boto3 raises on a failed PUT,
            after logging it.
    """
    # Format cardset_id with 3 digits and leading zeros
    cardset_str = f"{cardset_id:03d}"
    s3_key = f"cards/cardset-{cardset_str}/player-{player_id}/{card_type}card.png"

    try:
        s3_client.put_object(
            Bucket=AWS_BUCKET_NAME,
            Key=s3_key,
            Body=image_data,
            ContentType="image/png",
            CacheControl="public, max-age=300",  # 5 minute cache
            Metadata={
                "player-id": str(player_id),
                "card-type": card_type,
                "upload-date": datetime.datetime.now().isoformat(),
            },
        )
        # Return URL with cache-busting parameter
        s3_url = f"{S3_BASE_URL}/{s3_key}?d={release_date}"
        logger.info(f"Uploaded {card_type} card for player {player_id} to S3: {s3_url}")
        return s3_url
    except Exception as e:
        logger.error(f"Failed to upload {card_type} card for player {player_id}: {e}")
        raise


async def main(args):
    """
    Generate, upload, and register card images for every player in the
    cardset named CARDSET_NAME, processing up to CONCURRENCY players at
    a time over a shared aiohttp session.

    Args:
        args: command-line argument list (currently unused).

    Raises:
        ValueError: if the players query returns no rows.
    """
    import aiohttp

    print(f"Searching for cardset: {CARDSET_NAME}")
    c_query = await db_get("cardsets", params=[("name", CARDSET_NAME)])
    if not c_query or c_query["count"] == 0:
        print(f"I do not see a cardset named {CARDSET_NAME}")
        return
    cardset = c_query["cardsets"][0]
    del c_query

    p_query = await db_get(
        "players",
        params=[
            ("inc_dex", False),
            ("cardset_id", cardset["id"]),
            ("short_output", True),
        ],
    )
    if not p_query or p_query["count"] == 0:
        raise ValueError("No players returned from Paper Dynasty API")
    all_players = p_query["players"]
    del p_query

    # Generate release date for cache busting (include timestamp for same-day updates)
    now = datetime.datetime.now()
    timestamp = int(now.timestamp())
    release_date = f"{now.year}-{now.month}-{now.day}-{timestamp}"

    # PD API base URL for card generation (override with PD_API_URL env var for local rendering)
    PD_API_URL = os.environ.get("PD_API_URL", "https://pd.manticorum.com/api")

    print(f"\nRelease date for cards: {release_date}")
    print(f"API URL: {PD_API_URL}")
    print(f"S3 Upload: {'ENABLED' if UPLOAD_TO_S3 else 'DISABLED'}")
    print(f"URL Update: {'ENABLED' if UPDATE_PLAYER_URLS else 'DISABLED'}")
    print(f"Concurrency: {CONCURRENCY} parallel tasks\n")

    # Build filtered list respecting SKIP_ARMS, SKIP_BATS, START_ID, TEST_COUNT
    max_count = TEST_COUNT if TEST_COUNT is not None else 9999
    filtered_players = []
    for x in all_players:
        if len(filtered_players) >= max_count:
            break
        if "pitching" in x["image"] and SKIP_ARMS:
            continue
        if "batting" in x["image"] and SKIP_BATS:
            continue
        if START_ID is not None and START_ID > x["player_id"]:
            continue
        filtered_players.append(x)

    total = len(filtered_players)
    logger.info(f"Processing {total} cards with concurrency={CONCURRENCY}")

    # Shared mutable state protected by locks
    errors = []
    successes = []
    uploads = []
    url_updates = []
    completed = 0
    progress_lock = asyncio.Lock()
    results_lock = asyncio.Lock()
    start_time = datetime.datetime.now()
    loop = asyncio.get_running_loop()
    semaphore = asyncio.Semaphore(CONCURRENCY)

    async def report_progress():
        """Increment the completed counter and log/print every 20 completions."""
        nonlocal completed
        async with progress_lock:
            completed += 1
            if completed % 20 == 0 or completed == total:
                print(f"Progress: {completed}/{total} cards processed")
                logger.info(f"Progress: {completed}/{total} cards processed")

    async def process_single_card(x: dict) -> None:
        """
        Process one player entry under the semaphore: fetch card image(s),
        upload to S3 (offloading the synchronous boto3 call to a thread
        pool), and optionally patch the player record with the new S3 URL.
        Both the primary card (image) and the secondary card for two-way
        players (image2) are handled. Failures are appended to the shared
        errors list rather than re-raised so the overall batch continues.
        """
        async with semaphore:
            player_id = x["player_id"]

            # --- primary card ---
            if "sombaseball" in x["image"]:
                async with results_lock:
                    errors.append((x, f"Bad card url: {x['image']}"))
                await report_progress()
                return

            card_type = "pitching" if "pitching" in x["image"] else "batting"
            pd_card_url = (
                f"{PD_API_URL}/v2/players/{player_id}/{card_type}card?d={release_date}"
            )
            if HTML_CARDS:
                card_url = f"{pd_card_url}&html=true"
                timeout = 2
            else:
                card_url = pd_card_url
                timeout = 10

            primary_ok = False
            try:
                if UPLOAD_TO_S3 and not HTML_CARDS:
                    image_bytes = await fetch_card_image(
                        session, card_url, timeout=timeout
                    )
                    # boto3 is synchronous — offload to thread pool so the event
                    # loop is not blocked during the S3 PUT
                    s3_url = await loop.run_in_executor(
                        None,
                        functools.partial(
                            upload_card_to_s3,
                            image_bytes,
                            player_id,
                            card_type,
                            release_date,
                            cardset["id"],
                        ),
                    )
                    async with results_lock:
                        uploads.append((player_id, card_type, s3_url))
                    if UPDATE_PLAYER_URLS:
                        await db_patch(
                            "players",
                            object_id=player_id,
                            params=[("image", s3_url)],
                        )
                        async with results_lock:
                            url_updates.append((player_id, card_type, s3_url))
                        logger.info(f"Updated player {player_id} image URL to S3")
                else:
                    # Just validate card exists (old behavior)
                    logger.info("calling the card url")
                    await url_get(card_url, timeout=timeout)
                primary_ok = True
            except ConnectionError as e:
                logger.error(f"Connection error for player {player_id}: {e}")
                async with results_lock:
                    errors.append((x, e))
            except ValueError as e:
                async with results_lock:
                    errors.append((x, e))
            except Exception as e:
                logger.error(f"S3 upload/update failed for player {player_id}: {e}")
                async with results_lock:
                    errors.append((x, f"S3 error: {e}"))

            if not primary_ok:
                await report_progress()
                return

            # --- secondary card (two-way players) ---
            if x["image2"] is not None:
                if "sombaseball" in x["image2"]:
                    async with results_lock:
                        errors.append((x, f"Bad card url: {x['image2']}"))
                    await report_progress()
                    return

                card_type2 = "pitching" if "pitching" in x["image2"] else "batting"
                pd_card_url2 = f"{PD_API_URL}/v2/players/{player_id}/{card_type2}card?d={release_date}"
                if HTML_CARDS:
                    card_url2 = f"{pd_card_url2}&html=true"
                else:
                    card_url2 = pd_card_url2

                try:
                    if UPLOAD_TO_S3 and not HTML_CARDS:
                        image_bytes2 = await fetch_card_image(
                            session, card_url2, timeout=10
                        )
                        s3_url2 = await loop.run_in_executor(
                            None,
                            functools.partial(
                                upload_card_to_s3,
                                image_bytes2,
                                player_id,
                                card_type2,
                                release_date,
                                cardset["id"],
                            ),
                        )
                        async with results_lock:
                            uploads.append((player_id, card_type2, s3_url2))
                        if UPDATE_PLAYER_URLS:
                            await db_patch(
                                "players",
                                object_id=x["player_id"],
                                params=[("image2", s3_url2)],
                            )
                            async with results_lock:
                                url_updates.append((player_id, card_type2, s3_url2))
                            logger.info(f"Updated player {player_id} image2 URL to S3")
                    else:
                        # Just validate card exists (old behavior)
                        # NOTE(review): primary HTML validation uses timeout=2 but
                        # this path uses 10 — confirm whether that asymmetry is
                        # intentional before changing it.
                        await url_get(card_url2, timeout=10)
                    async with results_lock:
                        successes.append(x)
                except ConnectionError as e:
                    logger.error(f"Connection error for player {player_id} image2: {e}")
                    async with results_lock:
                        errors.append((x, e))
                except ValueError as e:
                    async with results_lock:
                        errors.append((x, e))
                except Exception as e:
                    logger.error(
                        f"S3 upload/update failed for player {player_id} image2: {e}"
                    )
                    async with results_lock:
                        errors.append((x, f"S3 error (image2): {e}"))
            else:
                async with results_lock:
                    successes.append(x)

            await report_progress()

    # Create persistent aiohttp session shared across all concurrent tasks
    async with aiohttp.ClientSession() as session:
        tasks = [process_single_card(x) for x in filtered_players]
        results = await asyncio.gather(*tasks, return_exceptions=True)

    # FIX: return_exceptions=True previously discarded any exception that
    # escaped a task (gather's return value was never inspected), so a crashed
    # task vanished silently and never hit the errors list or the progress
    # counter. Surface those failures in the log so they are visible.
    for player, result in zip(filtered_players, results):
        if isinstance(result, BaseException):
            logger.error(
                f"Unhandled task error for player {player['player_id']}: {result}"
            )

    # Print summary
    print(f"\n{'=' * 60}")
    print("SUMMARY")
    print(f"{'=' * 60}")
    if len(errors) > 0:
        logger.error("All Errors:")
        for entry in errors:
            logger.error(
                f"ID {entry[0]['player_id']} {entry[0]['p_name']} - Error: {entry[1]}"
            )
    if len(successes) > 0:
        logger.debug("All Successes:")
        for player in successes:
            logger.info(f"ID {player['player_id']} {player['p_name']}")
    p_run_time = datetime.datetime.now() - start_time

    print(f"\nErrors: {len(errors)}")
    print(f"Successes: {len(successes)}")
    if UPLOAD_TO_S3:
        print(f"S3 Uploads: {len(uploads)}")
        if len(uploads) > 0:
            print(f"  First upload: {uploads[0][2]}")
    if UPDATE_PLAYER_URLS:
        print(f"URL Updates: {len(url_updates)}")
    print(f"\nTotal runtime: {p_run_time.total_seconds():.2f} seconds")
    print(f"{'=' * 60}")


if __name__ == "__main__":
    asyncio.run(main(sys.argv[1:]))