""" Card image upload and management core logic. Business logic for uploading card images to AWS S3 and managing card URLs. """ import asyncio import datetime from typing import Optional import urllib.parse # These imports are resolved at runtime when called from CLI # since the CLI adds the parent directory to sys.path from db_calls import db_get, db_patch, db_post, url_get from exceptions import logger # AWS Configuration DEFAULT_AWS_BUCKET = "paper-dynasty" DEFAULT_AWS_REGION = "us-east-1" def get_s3_base_url( bucket: str = DEFAULT_AWS_BUCKET, region: str = DEFAULT_AWS_REGION ) -> str: """Get the S3 base URL for a bucket.""" return f"https://{bucket}.s3.{region}.amazonaws.com" async def fetch_card_image(session, card_url: str, timeout: int = 10) -> bytes: """ Fetch card image from URL and return raw bytes. Args: session: aiohttp ClientSession to use for the request card_url: URL to fetch the card from timeout: Request timeout in seconds Returns: Raw PNG image bytes """ import aiohttp async with session.get( card_url, timeout=aiohttp.ClientTimeout(total=timeout) ) as resp: if resp.status == 200: logger.info(f"Fetched card image from {card_url}") return await resp.read() else: error_text = await resp.text() logger.error(f"Failed to fetch card: {error_text}") raise ValueError(f"Card fetch error: {error_text}") def upload_card_to_s3( s3_client, image_data: bytes, player_id: int, card_type: str, release_date: str, cardset_id: int, bucket: str = DEFAULT_AWS_BUCKET, region: str = DEFAULT_AWS_REGION, ) -> str: """ Upload card image to S3 and return the S3 URL with cache-busting param. Args: s3_client: Boto3 S3 client image_data: Raw PNG image bytes player_id: Player ID card_type: 'batting' or 'pitching' release_date: Date string for cache busting (e.g., '2025-11-8') cardset_id: Cardset ID (will be zero-padded to 3 digits) bucket: S3 bucket name region: AWS region Returns: Full S3 URL with ?d= parameter """ # Format cardset_id with 3 digits and leading zeros cardset_str = f"{cardset_id:03d}" s3_key = f"cards/cardset-{cardset_str}/player-{player_id}/{card_type}card.png" s3_base_url = get_s3_base_url(bucket, region) try: s3_client.put_object( Bucket=bucket, Key=s3_key, Body=image_data, ContentType="image/png", CacheControl="public, max-age=300", # 5 minute cache Metadata={ "player-id": str(player_id), "card-type": card_type, "upload-date": datetime.datetime.now().isoformat(), }, ) # Return URL with cache-busting parameter s3_url = f"{s3_base_url}/{s3_key}?d={release_date}" logger.info(f"Uploaded {card_type} card for player {player_id} to S3: {s3_url}") return s3_url except Exception as e: logger.error(f"Failed to upload {card_type} card for player {player_id}: {e}") raise DEFAULT_PD_API_URL = "https://pd.manticorum.com/api" async def upload_cards_to_s3( cardset_name: str, start_id: Optional[int] = None, limit: Optional[int] = None, html_cards: bool = False, skip_batters: bool = False, skip_pitchers: bool = False, upload: bool = True, update_urls: bool = True, bucket: str = DEFAULT_AWS_BUCKET, region: str = DEFAULT_AWS_REGION, on_progress: callable = None, concurrency: int = 8, api_url: str = DEFAULT_PD_API_URL, ) -> dict: """ Upload card images to S3 for a cardset using concurrent async tasks. Cards are fetched and uploaded in parallel, bounded by ``concurrency`` semaphore slots. boto3 S3 calls (synchronous) are offloaded to a thread pool via ``loop.run_in_executor`` so they do not block the event loop. Individual card failures are collected and do NOT abort the batch; a summary is logged once all tasks complete. Args: cardset_name: Name of the cardset to process start_id: Player ID to start from (for resuming) limit: Maximum number of cards to process html_cards: Fetch HTML preview cards instead of PNG skip_batters: Skip batting cards skip_pitchers: Skip pitching cards upload: Actually upload to S3 update_urls: Update player URLs in database bucket: S3 bucket name region: AWS region on_progress: Callback function for progress updates concurrency: Number of parallel card-processing tasks (default 8) Returns: Dict with counts of errors, successes, uploads, url_updates """ import aiohttp import boto3 # Look up cardset c_query = await db_get("cardsets", params=[("name", cardset_name)]) if not c_query or c_query["count"] == 0: raise ValueError(f'Cardset "{cardset_name}" not found') cardset = c_query["cardsets"][0] # Get all players p_query = await db_get( "players", params=[ ("inc_dex", False), ("cardset_id", cardset["id"]), ("short_output", True), ], ) if not p_query or p_query["count"] == 0: raise ValueError("No players returned from Paper Dynasty API") all_players = p_query["players"] # Generate release date for cache busting (include timestamp for same-day updates) now = datetime.datetime.now() timestamp = int(now.timestamp()) release_date = f"{now.year}-{now.month}-{now.day}-{timestamp}" # PD API base URL for card generation (configurable for local rendering) PD_API_URL = api_url logger.info(f"Using API URL: {PD_API_URL}") # Initialize S3 client if uploading (boto3 client is thread-safe for reads; # we will call it from a thread pool so we create it once here) s3_client = boto3.client("s3", region_name=region) if upload else None # Build the filtered list of players to process, respecting start_id / limit max_count = limit or 9999 filtered_players = [] for x in all_players: if len(filtered_players) >= max_count: break if "pitching" in x["image"] and skip_pitchers: continue if "batting" in x["image"] and skip_batters: continue if start_id is not None and start_id > x["player_id"]: continue filtered_players.append(x) total = len(filtered_players) logger.info(f"Processing {total} cards with concurrency={concurrency}") # Shared mutable state protected by a lock errors = [] successes = [] uploads = [] url_updates = [] completed = 0 progress_lock = asyncio.Lock() results_lock = asyncio.Lock() loop = asyncio.get_running_loop() semaphore = asyncio.Semaphore(concurrency) async def report_progress(): """Increment the completed counter and log every 20 completions.""" nonlocal completed async with progress_lock: completed += 1 if completed % 20 == 0 or completed == total: logger.info(f"Progress: {completed}/{total} cards processed") if on_progress: on_progress(completed, f"{completed}/{total}") async def process_single_card(x: dict) -> None: """ Process one player entry: fetch card image(s), upload to S3, and optionally patch the player record with the new S3 URL. Both the primary card (image) and the secondary card for two-way players (image2) are handled here. Errors are appended to the shared ``errors`` list rather than re-raised so the batch continues. """ async with semaphore: player_id = x["player_id"] # --- primary card --- if "sombaseball" in x["image"]: async with results_lock: errors.append((x, f"Bad card url: {x['image']}")) await report_progress() return card_type = "pitching" if "pitching" in x["image"] else "batting" pd_card_url = ( f"{PD_API_URL}/v2/players/{player_id}/{card_type}card?d={release_date}" ) if html_cards: card_url = f"{pd_card_url}&html=true" timeout = 2 else: card_url = pd_card_url timeout = 10 primary_ok = False try: if upload and not html_cards: image_bytes = await fetch_card_image( session, card_url, timeout=timeout ) # boto3 is synchronous — offload to thread pool s3_url = await loop.run_in_executor( None, upload_card_to_s3, s3_client, image_bytes, player_id, card_type, release_date, cardset["id"], bucket, region, ) async with results_lock: uploads.append((player_id, card_type, s3_url)) if update_urls: await db_patch( "players", object_id=player_id, params=[("image", s3_url)], ) async with results_lock: url_updates.append((player_id, card_type, s3_url)) logger.info(f"Updated player {player_id} image URL to S3") else: logger.info(f"Validating card URL: {card_url}") await url_get(card_url, timeout=timeout) primary_ok = True except ConnectionError as e: logger.error(f"Connection error for player {player_id}: {e}") async with results_lock: errors.append((x, e)) except ValueError as e: async with results_lock: errors.append((x, e)) except Exception as e: logger.error(f"S3 upload/update failed for player {player_id}: {e}") async with results_lock: errors.append((x, f"S3 error: {e}")) if not primary_ok: await report_progress() return # --- secondary card (two-way players) --- if x["image2"] is not None: if "sombaseball" in x["image2"]: async with results_lock: errors.append((x, f"Bad card url: {x['image2']}")) await report_progress() return card_type2 = "pitching" if "pitching" in x["image2"] else "batting" pd_card_url2 = f"{PD_API_URL}/v2/players/{player_id}/{card_type2}card?d={release_date}" card_url2 = f"{pd_card_url2}&html=true" if html_cards else pd_card_url2 try: if upload and not html_cards: image_bytes2 = await fetch_card_image( session, card_url2, timeout=10 ) s3_url2 = await loop.run_in_executor( None, upload_card_to_s3, s3_client, image_bytes2, player_id, card_type2, release_date, cardset["id"], bucket, region, ) async with results_lock: uploads.append((player_id, card_type2, s3_url2)) if update_urls: await db_patch( "players", object_id=player_id, params=[("image2", s3_url2)], ) async with results_lock: url_updates.append((player_id, card_type2, s3_url2)) logger.info(f"Updated player {player_id} image2 URL to S3") else: await url_get(card_url2, timeout=10) async with results_lock: successes.append(x) except ConnectionError as e: logger.error(f"Connection error for player {player_id} image2: {e}") async with results_lock: errors.append((x, e)) except ValueError as e: async with results_lock: errors.append((x, e)) except Exception as e: logger.error( f"S3 upload/update failed for player {player_id} image2: {e}" ) async with results_lock: errors.append((x, f"S3 error (image2): {e}")) else: async with results_lock: successes.append(x) await report_progress() async with aiohttp.ClientSession() as session: tasks = [process_single_card(x) for x in filtered_players] await asyncio.gather(*tasks, return_exceptions=True) # Log final summary success_count = len(successes) error_count = len(errors) logger.info( f"Upload complete: {success_count} succeeded, {error_count} failed " f"out of {total} cards" ) if error_count: for player, err in errors: logger.warning( f" Failed: player {player.get('player_id', '?')} " f"({player.get('p_name', '?')}): {err}" ) return { "errors": errors, "successes": successes, "uploads": uploads, "url_updates": url_updates, "release_date": release_date, "cardset": cardset, } async def refresh_card_images( cardset_name: str, limit: Optional[int] = None, html_cards: bool = False, on_progress: callable = None, api_url: str = DEFAULT_PD_API_URL, ) -> dict: """ Refresh card images for a cardset by triggering regeneration. Args: cardset_name: Name of the cardset to process limit: Maximum number of cards to process html_cards: Fetch HTML preview cards instead of PNG on_progress: Callback function for progress updates Returns: Dict with counts of errors, successes """ # Look up cardset c_query = await db_get("cardsets", params=[("name", cardset_name)]) if not c_query or c_query["count"] == 0: raise ValueError(f'Cardset "{cardset_name}" not found') cardset = c_query["cardsets"][0] CARD_BASE_URL = f"{api_url}/v2/players" # Get all players p_query = await db_get( "players", params=[ ("inc_dex", False), ("cardset_id", cardset["id"]), ("short_output", True), ], ) if p_query["count"] == 0: raise ValueError("No players returned from Paper Dynasty API") all_players = p_query["players"] errors = [] successes = [] cxn_error = False count = 0 max_count = limit or 9999 start_time = datetime.datetime.now() # First pass: Reset URLs for players with old sombaseball URLs for x in all_players: if "sombaseball" in x["image"]: if on_progress: on_progress(count, f"{x['p_name']} - fixing old URL") release_dir = f"{start_time.year}-{start_time.month}-{start_time.day}" if x["pos_1"] in ["SP", "RP", "CP", "P"]: image_url = ( f"{CARD_BASE_URL}/{x['player_id']}/pitchingcard" f"{urllib.parse.quote('?d=')}{release_dir}" ) else: image_url = ( f"{CARD_BASE_URL}/{x['player_id']}/battingcard" f"{urllib.parse.quote('?d=')}{release_dir}" ) await db_patch( "players", object_id=x["player_id"], params=[("image", image_url)] ) else: count += 1 if on_progress and count % 20 == 0: on_progress(count, f"{x['p_name']} - resetting") if count >= max_count: break try: await db_post(f"players/{x['player_id']}/image-reset") except ConnectionError as e: if cxn_error: raise e cxn_error = True errors.append((x, e)) except ValueError as e: errors.append((x, e)) # Second pass: Fetch images to trigger regeneration count = 0 for x in all_players: if count >= max_count: break if html_cards: card_url = f"{x['image']}&html=true" timeout = 2 else: card_url = x["image"] timeout = 6 try: logger.info(f"Fetching card URL: {card_url}") await url_get(card_url, timeout=timeout) except ConnectionError as e: if cxn_error: raise e cxn_error = True errors.append((x, e)) except ValueError as e: errors.append((x, e)) else: # Handle image2 if x["image2"] is not None: if html_cards: card_url2 = f"{x['image2']}&html=true" else: card_url2 = x["image2"] if "sombaseball" in x["image2"]: errors.append((x, f"Bad card url: {x['image2']}")) else: try: await url_get(card_url2, timeout=6) successes.append(x) except ConnectionError as e: if cxn_error: raise e cxn_error = True errors.append((x, e)) except ValueError as e: errors.append((x, e)) else: successes.append(x) count += 1 return {"errors": errors, "successes": successes, "cardset": cardset} async def check_card_images( cardset_name: str, limit: Optional[int] = None, on_progress: callable = None, api_url: str = DEFAULT_PD_API_URL, ) -> dict: """ Check and validate card images without uploading. Args: cardset_name: Name of the cardset to check limit: Maximum number of cards to check on_progress: Callback function for progress updates Returns: Dict with counts of errors and successes """ # Look up cardset c_query = await db_get("cardsets", params=[("name", cardset_name)]) if not c_query or c_query["count"] == 0: raise ValueError(f'Cardset "{cardset_name}" not found') cardset = c_query["cardsets"][0] # Get all players p_query = await db_get( "players", params=[ ("inc_dex", False), ("cardset_id", cardset["id"]), ("short_output", True), ], ) if not p_query or p_query["count"] == 0: raise ValueError("No players returned from Paper Dynasty API") all_players = p_query["players"] # Generate release date for cache busting (include timestamp for same-day updates) now = datetime.datetime.now() timestamp = int(now.timestamp()) release_date = f"{now.year}-{now.month}-{now.day}-{timestamp}" PD_API_URL = api_url errors = [] successes = [] cxn_error = False count = 0 max_count = limit or 9999 for x in all_players: if count >= max_count: break if "sombaseball" in x["image"]: errors.append((x, f"Bad card url: {x['image']}")) continue count += 1 if on_progress and count % 20 == 0: on_progress(count, x["p_name"]) card_type = "pitching" if "pitching" in x["image"] else "batting" card_url = ( f"{PD_API_URL}/v2/players/{x['player_id']}/{card_type}card?d={release_date}" ) try: logger.info(f"Checking card URL: {card_url}") await url_get(card_url, timeout=6) successes.append(x) except ConnectionError as e: if cxn_error: raise e cxn_error = True errors.append((x, e)) except ValueError as e: errors.append((x, e)) return { "errors": errors, "successes": successes, "cardset": cardset, "release_date": release_date, }