"""
Card image upload and management core logic.

Business logic for uploading card images to AWS S3 and managing card URLs.
"""

import asyncio
import datetime
from typing import Callable, List, Optional, Tuple
import urllib.parse

# These imports are resolved at runtime when called from CLI
# since the CLI adds the parent directory to sys.path
from db_calls import db_get, db_patch, db_post, url_get
from exceptions import logger

# AWS Configuration
DEFAULT_AWS_BUCKET = "paper-dynasty"
DEFAULT_AWS_REGION = "us-east-1"

# PD API base URL used to request freshly generated cards
PD_API_URL = "https://pd.manticorum.com/api"

# Signature of the optional progress callback: (cards processed so far, label)
ProgressCallback = Callable[[int, str], None]


def get_s3_base_url(
    bucket: str = DEFAULT_AWS_BUCKET, region: str = DEFAULT_AWS_REGION
) -> str:
    """Get the S3 base URL for a bucket."""
    return f"https://{bucket}.s3.{region}.amazonaws.com"


def _cache_bust_stamp() -> str:
    """Build the ``?d=`` cache-busting value: ``Y-M-D-<unix timestamp>``.

    The timestamp suffix makes the value differ between runs on the same day.
    """
    now = datetime.datetime.now()
    return f"{now.year}-{now.month}-{now.day}-{int(now.timestamp())}"


async def _load_cardset_players(cardset_name: str) -> Tuple[dict, list]:
    """Look up a cardset by name and fetch its player records.

    Args:
        cardset_name: Name of the cardset to look up

    Returns:
        Tuple of (cardset record, list of player records)

    Raises:
        ValueError: If the cardset is not found or no players are returned
    """
    c_query = await db_get("cardsets", params=[("name", cardset_name)])
    if not c_query or c_query["count"] == 0:
        raise ValueError(f'Cardset "{cardset_name}" not found')
    cardset = c_query["cardsets"][0]

    p_query = await db_get(
        "players",
        params=[
            ("inc_dex", False),
            ("cardset_id", cardset["id"]),
            ("short_output", True),
        ],
    )
    # Guard against an empty/None response as well as a zero count
    if not p_query or p_query["count"] == 0:
        raise ValueError("No players returned from Paper Dynasty API")

    return cardset, p_query["players"]


async def fetch_card_image(session, card_url: str, timeout: int = 6) -> bytes:
    """
    Fetch card image from URL and return raw bytes.

    Args:
        session: aiohttp ClientSession to use for the request
        card_url: URL to fetch the card from
        timeout: Request timeout in seconds

    Returns:
        Raw PNG image bytes

    Raises:
        ValueError: If the response status is not 200
    """
    import aiohttp

    async with session.get(
        card_url, timeout=aiohttp.ClientTimeout(total=timeout)
    ) as resp:
        if resp.status == 200:
            logger.info(f"Fetched card image from {card_url}")
            return await resp.read()

        error_text = await resp.text()
        logger.error(f"Failed to fetch card: {error_text}")
        raise ValueError(f"Card fetch error: {error_text}")


def upload_card_to_s3(
    s3_client,
    image_data: bytes,
    player_id: int,
    card_type: str,
    release_date: str,
    cardset_id: int,
    bucket: str = DEFAULT_AWS_BUCKET,
    region: str = DEFAULT_AWS_REGION,
) -> str:
    """
    Upload card image to S3 and return the S3 URL with cache-busting param.

    Args:
        s3_client: Boto3 S3 client
        image_data: Raw PNG image bytes
        player_id: Player ID
        card_type: 'batting' or 'pitching'
        release_date: Date string for cache busting (e.g., '2025-11-8')
        cardset_id: Cardset ID (will be zero-padded to 3 digits)
        bucket: S3 bucket name
        region: AWS region

    Returns:
        Full S3 URL with ?d= parameter

    Raises:
        Exception: Any error raised by ``put_object`` is logged and re-raised
    """
    # Format cardset_id with 3 digits and leading zeros
    cardset_str = f"{cardset_id:03d}"
    s3_key = f"cards/cardset-{cardset_str}/player-{player_id}/{card_type}card.png"
    s3_base_url = get_s3_base_url(bucket, region)

    try:
        s3_client.put_object(
            Bucket=bucket,
            Key=s3_key,
            Body=image_data,
            ContentType="image/png",
            CacheControl="public, max-age=300",  # 5 minute cache
            Metadata={
                "player-id": str(player_id),
                "card-type": card_type,
                "upload-date": datetime.datetime.now().isoformat(),
            },
        )
    except Exception as e:
        logger.error(f"Failed to upload {card_type} card for player {player_id}: {e}")
        raise

    # Return URL with cache-busting parameter
    s3_url = f"{s3_base_url}/{s3_key}?d={release_date}"
    logger.info(f"Uploaded {card_type} card for player {player_id} to S3: {s3_url}")
    return s3_url


async def upload_cards_to_s3(
    cardset_name: str,
    start_id: Optional[int] = None,
    limit: Optional[int] = None,
    html_cards: bool = False,
    skip_batters: bool = False,
    skip_pitchers: bool = False,
    upload: bool = True,
    update_urls: bool = True,
    bucket: str = DEFAULT_AWS_BUCKET,
    region: str = DEFAULT_AWS_REGION,
    on_progress: Optional[ProgressCallback] = None,
) -> dict:
    """
    Upload card images to S3 for a cardset.

    For each player the primary card (``image``) is processed first; the
    secondary card (``image2``) is only attempted when the primary succeeded.
    A player is recorded in ``successes`` only if every card it has succeeded.

    Args:
        cardset_name: Name of the cardset to process
        start_id: Player ID to start from (for resuming)
        limit: Maximum number of cards to process (None or 0 means 9999)
        html_cards: Fetch HTML preview cards instead of PNG (validate only)
        skip_batters: Skip batting cards
        skip_pitchers: Skip pitching cards
        upload: Actually upload to S3
        update_urls: Update player URLs in database
        bucket: S3 bucket name
        region: AWS region
        on_progress: Callback invoked as ``on_progress(count, player_name)``
            every 20 processed players

    Returns:
        Dict with keys ``errors`` (list of (player, error) tuples),
        ``successes`` (list of players), ``uploads`` and ``url_updates``
        (lists of (player_id, card_type, s3_url) tuples), ``release_date``
        and ``cardset``.

    Raises:
        ValueError: If the cardset is not found or has no players
        ConnectionError: On the second connection error seen during the run
    """
    import aiohttp
    import boto3

    cardset, all_players = await _load_cardset_players(cardset_name)

    # Generate release date for cache busting (include timestamp for same-day updates)
    release_date = _cache_bust_stamp()

    # Initialize S3 client if uploading
    s3_client = boto3.client("s3", region_name=region) if upload else None

    errors = []
    successes = []
    uploads = []
    url_updates = []

    cxn_error = False  # tolerate one connection error; re-raise the second
    count = 0
    max_count = limit or 9999

    async with aiohttp.ClientSession() as session:
        for x in all_players:
            # Apply filters
            if "pitching" in x["image"] and skip_pitchers:
                continue
            if "batting" in x["image"] and skip_batters:
                continue
            if start_id is not None and start_id > x["player_id"]:
                continue
            if "sombaseball" in x["image"]:
                errors.append((x, f"Bad card url: {x['image']}"))
                continue
            if count >= max_count:
                break

            count += 1
            if on_progress and count % 20 == 0:
                on_progress(count, x["p_name"])

            # Determine card type from existing image URL
            card_type = "pitching" if "pitching" in x["image"] else "batting"

            # Generate card URL from PD API (forces fresh generation from database)
            pd_card_url = f"{PD_API_URL}/v2/players/{x['player_id']}/{card_type}card?d={release_date}"

            if html_cards:
                card_url = f"{pd_card_url}&html=true"
                timeout = 2
            else:
                card_url = pd_card_url
                timeout = 6

            try:
                if upload and not html_cards:
                    # Fetch card image bytes directly
                    image_bytes = await fetch_card_image(
                        session, card_url, timeout=timeout
                    )
                    s3_url = upload_card_to_s3(
                        s3_client,
                        image_bytes,
                        x["player_id"],
                        card_type,
                        release_date,
                        cardset["id"],
                        bucket,
                        region,
                    )
                    uploads.append((x["player_id"], card_type, s3_url))

                    # Update player record with new S3 URL
                    if update_urls:
                        await db_patch(
                            "players",
                            object_id=x["player_id"],
                            params=[("image", s3_url)],
                        )
                        url_updates.append((x["player_id"], card_type, s3_url))
                        logger.info(f"Updated player {x['player_id']} image URL to S3")
                else:
                    # Just validate card exists
                    logger.info(f"Validating card URL: {card_url}")
                    await url_get(card_url, timeout=timeout)

            # Every failure of the primary card skips the image2 handling
            # below, so a failed player is never also counted as a success.
            except ConnectionError as e:
                if cxn_error:
                    raise
                cxn_error = True
                errors.append((x, e))
                continue

            except ValueError as e:
                errors.append((x, e))
                continue

            except Exception as e:
                logger.error(
                    f"S3 upload/update failed for player {x['player_id']}: {e}"
                )
                errors.append((x, f"S3 error: {e}"))
                continue

            # Primary card succeeded and there is no secondary card
            if x["image2"] is None:
                successes.append(x)
                continue

            # Handle image2 (dual-position players)
            card_type2 = "pitching" if "pitching" in x["image2"] else "batting"
            pd_card_url2 = f"{PD_API_URL}/v2/players/{x['player_id']}/{card_type2}card?d={release_date}"

            if html_cards:
                card_url2 = f"{pd_card_url2}&html=true"
            else:
                card_url2 = pd_card_url2

            if "sombaseball" in x["image2"]:
                errors.append((x, f"Bad card url: {x['image2']}"))
                continue

            try:
                if upload and not html_cards:
                    image_bytes2 = await fetch_card_image(
                        session, card_url2, timeout=6
                    )
                    s3_url2 = upload_card_to_s3(
                        s3_client,
                        image_bytes2,
                        x["player_id"],
                        card_type2,
                        release_date,
                        cardset["id"],
                        bucket,
                        region,
                    )
                    uploads.append((x["player_id"], card_type2, s3_url2))

                    if update_urls:
                        await db_patch(
                            "players",
                            object_id=x["player_id"],
                            params=[("image2", s3_url2)],
                        )
                        url_updates.append(
                            (x["player_id"], card_type2, s3_url2)
                        )
                        logger.info(
                            f"Updated player {x['player_id']} image2 URL to S3"
                        )
                else:
                    await url_get(card_url2, timeout=6)

                successes.append(x)

            except ConnectionError as e:
                if cxn_error:
                    raise
                cxn_error = True
                errors.append((x, e))

            except ValueError as e:
                errors.append((x, e))

            except Exception as e:
                logger.error(
                    f"S3 upload/update failed for player {x['player_id']} image2: {e}"
                )
                errors.append((x, f"S3 error (image2): {e}"))

    return {
        "errors": errors,
        "successes": successes,
        "uploads": uploads,
        "url_updates": url_updates,
        "release_date": release_date,
        "cardset": cardset,
    }


async def refresh_card_images(
    cardset_name: str,
    limit: Optional[int] = None,
    html_cards: bool = False,
    on_progress: Optional[ProgressCallback] = None,
) -> dict:
    """
    Refresh card images for a cardset by triggering regeneration.

    Runs two passes over the cardset's players: the first rewrites old
    ``sombaseball`` image URLs and posts an image reset for every other
    player; the second fetches each player's card URL(s).

    Args:
        cardset_name: Name of the cardset to process
        limit: Maximum number of cards to process (None or 0 means 9999)
        html_cards: Fetch HTML preview cards instead of PNG
        on_progress: Callback invoked as ``on_progress(count, message)``

    Returns:
        Dict with keys ``errors`` (list of (player, error) tuples),
        ``successes`` (list of players) and ``cardset``.

    Raises:
        ValueError: If the cardset is not found or has no players
        ConnectionError: On the second connection error seen during the run
    """
    cardset, all_players = await _load_cardset_players(cardset_name)

    card_base_url = f"{PD_API_URL}/v2/players"

    errors = []
    successes = []
    cxn_error = False  # tolerate one connection error; re-raise the second
    count = 0
    max_count = limit or 9999
    start_time = datetime.datetime.now()

    # Loop-invariant pieces of the replacement URL.
    # NOTE(review): '?d=' is percent-encoded here, presumably because db_patch
    # sends the value inside a query string - confirm against db_calls.
    release_dir = f"{start_time.year}-{start_time.month}-{start_time.day}"
    encoded_query_prefix = urllib.parse.quote("?d=")

    # First pass: Reset URLs for players with old sombaseball URLs
    for x in all_players:
        if "sombaseball" in x["image"]:
            if on_progress:
                on_progress(count, f"{x['p_name']} - fixing old URL")

            is_pitcher = x["pos_1"] in ["SP", "RP", "CP", "P"]
            card_kind = "pitchingcard" if is_pitcher else "battingcard"
            image_url = (
                f"{card_base_url}/{x['player_id']}/{card_kind}"
                f"{encoded_query_prefix}{release_dir}"
            )
            await db_patch(
                "players", object_id=x["player_id"], params=[("image", image_url)]
            )
            continue

        # Check the limit before counting so exactly max_count players are
        # reset, matching the number fetched in the second pass.
        if count >= max_count:
            break

        count += 1
        if on_progress and count % 20 == 0:
            on_progress(count, f"{x['p_name']} - resetting")

        try:
            await db_post(f"players/{x['player_id']}/image-reset")
        except ConnectionError as e:
            if cxn_error:
                raise
            cxn_error = True
            errors.append((x, e))
        except ValueError as e:
            errors.append((x, e))

    # Second pass: Fetch images to trigger regeneration
    # NOTE(review): players patched in the first pass still carry their old
    # 'sombaseball' URL in x["image"] here - confirm whether that is intended.
    count = 0
    for x in all_players:
        if count >= max_count:
            break

        if html_cards:
            card_url = f"{x['image']}&html=true"
            timeout = 2
        else:
            card_url = x["image"]
            timeout = 6

        try:
            logger.info(f"Fetching card URL: {card_url}")
            await url_get(card_url, timeout=timeout)

        except ConnectionError as e:
            if cxn_error:
                raise
            cxn_error = True
            errors.append((x, e))

        except ValueError as e:
            errors.append((x, e))

        else:
            # Handle image2
            if x["image2"] is not None:
                if html_cards:
                    card_url2 = f"{x['image2']}&html=true"
                else:
                    card_url2 = x["image2"]

                if "sombaseball" in x["image2"]:
                    errors.append((x, f"Bad card url: {x['image2']}"))
                else:
                    try:
                        await url_get(card_url2, timeout=6)
                        successes.append(x)
                    except ConnectionError as e:
                        if cxn_error:
                            raise
                        cxn_error = True
                        errors.append((x, e))
                    except ValueError as e:
                        errors.append((x, e))
            else:
                successes.append(x)

        count += 1

    return {"errors": errors, "successes": successes, "cardset": cardset}


async def check_card_images(
    cardset_name: str,
    limit: Optional[int] = None,
    on_progress: Optional[ProgressCallback] = None,
) -> dict:
    """
    Check and validate card images without uploading.

    Only each player's primary card (derived from ``image``) is checked.

    Args:
        cardset_name: Name of the cardset to check
        limit: Maximum number of cards to check (None or 0 means 9999)
        on_progress: Callback invoked as ``on_progress(count, player_name)``
            every 20 checked players

    Returns:
        Dict with keys ``errors`` (list of (player, error) tuples),
        ``successes`` (list of players), ``cardset`` and ``release_date``.

    Raises:
        ValueError: If the cardset is not found or has no players
        ConnectionError: On the second connection error seen during the run
    """
    cardset, all_players = await _load_cardset_players(cardset_name)

    # Generate release date for cache busting (include timestamp for same-day updates)
    release_date = _cache_bust_stamp()

    errors = []
    successes = []
    cxn_error = False  # tolerate one connection error; re-raise the second
    count = 0
    max_count = limit or 9999

    for x in all_players:
        if count >= max_count:
            break

        if "sombaseball" in x["image"]:
            errors.append((x, f"Bad card url: {x['image']}"))
            continue

        count += 1
        if on_progress and count % 20 == 0:
            on_progress(count, x["p_name"])

        card_type = "pitching" if "pitching" in x["image"] else "batting"
        card_url = (
            f"{PD_API_URL}/v2/players/{x['player_id']}/{card_type}card?d={release_date}"
        )

        try:
            logger.info(f"Checking card URL: {card_url}")
            await url_get(card_url, timeout=6)
            successes.append(x)

        except ConnectionError as e:
            if cxn_error:
                raise
            cxn_error = True
            errors.append((x, e))

        except ValueError as e:
            errors.append((x, e))

    return {
        "errors": errors,
        "successes": successes,
        "cardset": cardset,
        "release_date": release_date,
    }