import asyncio import datetime import sys import boto3 from io import BytesIO from creation_helpers import get_args from db_calls import db_get, db_patch, url_get from exceptions import logger # Configuration CARDSET_NAME = "2005 Live" START_ID = None # Integer to only start pulling cards at player_id START_ID TEST_COUNT = 9999 # integer to stop after TEST_COUNT calls HTML_CARDS = False # boolean to only check and not generate cards SKIP_ARMS = False SKIP_BATS = False # AWS Configuration AWS_BUCKET_NAME = "paper-dynasty" # Change to your bucket name AWS_REGION = "us-east-1" # Change to your region S3_BASE_URL = f"https://{AWS_BUCKET_NAME}.s3.{AWS_REGION}.amazonaws.com" UPLOAD_TO_S3 = ( True # Set to False to skip S3 upload (testing) - STEP 6: Upload validated cards ) UPDATE_PLAYER_URLS = True # Set to False to skip player URL updates (testing) - STEP 6: Update player URLs # Initialize S3 client s3_client = boto3.client("s3", region_name=AWS_REGION) if UPLOAD_TO_S3 else None async def fetch_card_image(session, card_url: str, timeout: int = 6) -> bytes: """ Fetch card image from URL and return raw bytes. Args: session: aiohttp ClientSession to use for the request card_url: URL to fetch the card from timeout: Request timeout in seconds Returns: Raw PNG image bytes """ import aiohttp async with session.get( card_url, timeout=aiohttp.ClientTimeout(total=timeout) ) as resp: if resp.status == 200: logger.info(f"Fetched card image from {card_url}") return await resp.read() else: error_text = await resp.text() logger.error(f"Failed to fetch card: {error_text}") raise ValueError(f"Card fetch error: {error_text}") def upload_card_to_s3( image_data: bytes, player_id: int, card_type: str, release_date: str, cardset_id: int, ) -> str: """ Upload card image to S3 and return the S3 URL with cache-busting param. Args: image_data: Raw PNG image bytes player_id: Player ID card_type: 'batting' or 'pitching' release_date: Date string for cache busting (e.g., '2025-11-8') cardset_id: Cardset ID (will be zero-padded to 3 digits) Returns: Full S3 URL with ?d= parameter """ # Format cardset_id with 3 digits and leading zeros cardset_str = f"{cardset_id:03d}" s3_key = f"cards/cardset-{cardset_str}/player-{player_id}/{card_type}card.png" try: s3_client.put_object( Bucket=AWS_BUCKET_NAME, Key=s3_key, Body=image_data, ContentType="image/png", CacheControl="public, max-age=300", # 5 minute cache Metadata={ "player-id": str(player_id), "card-type": card_type, "upload-date": datetime.datetime.now().isoformat(), }, ) # Return URL with cache-busting parameter s3_url = f"{S3_BASE_URL}/{s3_key}?d={release_date}" logger.info(f"Uploaded {card_type} card for player {player_id} to S3: {s3_url}") return s3_url except Exception as e: logger.error(f"Failed to upload {card_type} card for player {player_id}: {e}") raise async def main(args): import aiohttp print(f"Searching for cardset: {CARDSET_NAME}") c_query = await db_get("cardsets", params=[("name", CARDSET_NAME)]) if not c_query or c_query["count"] == 0: print(f"I do not see a cardset named {CARDSET_NAME}") return cardset = c_query["cardsets"][0] del c_query p_query = await db_get( "players", params=[ ("inc_dex", False), ("cardset_id", cardset["id"]), ("short_output", True), ], ) if not p_query or p_query["count"] == 0: raise ValueError("No players returned from Paper Dynasty API") all_players = p_query["players"] del p_query # Generate release date for cache busting (include timestamp for same-day updates) now = datetime.datetime.now() timestamp = int(now.timestamp()) release_date = f"{now.year}-{now.month}-{now.day}-{timestamp}" # PD API base URL for card generation PD_API_URL = "https://pd.manticorum.com/api" errors = [] successes = [] uploads = [] url_updates = [] cxn_error = False count = -1 start_time = datetime.datetime.now() print(f"\nRelease date for cards: {release_date}") print(f"S3 Upload: {'ENABLED' if UPLOAD_TO_S3 else 'DISABLED'}") print(f"URL Update: {'ENABLED' if UPDATE_PLAYER_URLS else 'DISABLED'}\n") # Create persistent aiohttp session for all card fetches async with aiohttp.ClientSession() as session: for x in all_players: if "pitching" in x["image"] and SKIP_ARMS: pass elif "batting" in x["image"] and SKIP_BATS: pass elif START_ID is not None and START_ID > x["player_id"]: pass elif "sombaseball" in x["image"]: errors.append((x, f"Bad card url: {x['image']}")) else: count += 1 if count % 20 == 0: print(f"Card #{count + 1} being pulled is {x['p_name']}...") elif TEST_COUNT is not None and TEST_COUNT < count: print(f"Done test run") break # Determine card type from existing image URL card_type = "pitching" if "pitching" in x["image"] else "batting" # Generate card URL from PD API (forces fresh generation from database) pd_card_url = f"{PD_API_URL}/v2/players/{x['player_id']}/{card_type}card?d={release_date}" if HTML_CARDS: card_url = f"{pd_card_url}&html=true" timeout = 2 else: card_url = pd_card_url timeout = 6 try: # Upload to S3 if enabled if UPLOAD_TO_S3 and not HTML_CARDS: # Fetch card image bytes directly image_bytes = await fetch_card_image( session, card_url, timeout=timeout ) s3_url = upload_card_to_s3( image_bytes, x["player_id"], card_type, release_date, cardset["id"], ) uploads.append((x["player_id"], card_type, s3_url)) # Update player record with new S3 URL if UPDATE_PLAYER_URLS: await db_patch( "players", object_id=x["player_id"], params=[("image", s3_url)], ) url_updates.append((x["player_id"], card_type, s3_url)) logger.info( f"Updated player {x['player_id']} image URL to S3" ) else: # Just validate card exists (old behavior) logger.info(f"calling the card url") resp = await url_get(card_url, timeout=timeout) except ConnectionError as e: if cxn_error: raise e cxn_error = True errors.append((x, e)) except ValueError as e: errors.append((x, e)) except Exception as e: logger.error( f"S3 upload/update failed for player {x['player_id']}: {e}" ) errors.append((x, f"S3 error: {e}")) continue # Handle image2 (dual-position players) if x["image2"] is not None: # Determine second card type card_type2 = "pitching" if "pitching" in x["image2"] else "batting" # Generate card URL from PD API (forces fresh generation from database) pd_card_url2 = f"{PD_API_URL}/v2/players/{x['player_id']}/{card_type2}card?d={release_date}" if HTML_CARDS: card_url2 = f"{pd_card_url2}&html=true" else: card_url2 = pd_card_url2 if "sombaseball" in x["image2"]: errors.append((x, f"Bad card url: {x['image2']}")) else: try: if UPLOAD_TO_S3 and not HTML_CARDS: # Fetch second card image bytes directly from PD API image_bytes2 = await fetch_card_image( session, card_url2, timeout=6 ) s3_url2 = upload_card_to_s3( image_bytes2, x["player_id"], card_type2, release_date, cardset["id"], ) uploads.append((x["player_id"], card_type2, s3_url2)) # Update player record with new S3 URL for image2 if UPDATE_PLAYER_URLS: await db_patch( "players", object_id=x["player_id"], params=[("image2", s3_url2)], ) url_updates.append( (x["player_id"], card_type2, s3_url2) ) logger.info( f"Updated player {x['player_id']} image2 URL to S3" ) else: # Just validate card exists (old behavior) resp = await url_get(card_url2, timeout=6) successes.append(x) except ConnectionError as e: if cxn_error: raise e cxn_error = True errors.append((x, e)) except ValueError as e: errors.append((x, e)) except Exception as e: logger.error( f"S3 upload/update failed for player {x['player_id']} image2: {e}" ) errors.append((x, f"S3 error (image2): {e}")) else: successes.append(x) # Print summary print(f"\n{'=' * 60}") print(f"SUMMARY") print(f"{'=' * 60}") if len(errors) > 0: logger.error(f"All Errors:") for x in errors: logger.error(f"ID {x[0]['player_id']} {x[0]['p_name']} - Error: {x[1]}") if len(successes) > 0: logger.debug(f"All Successes:") for x in successes: logger.info(f"ID {x['player_id']} {x['p_name']}") p_run_time = datetime.datetime.now() - start_time print(f"\nErrors: {len(errors)}") print(f"Successes: {len(successes)}") if UPLOAD_TO_S3: print(f"S3 Uploads: {len(uploads)}") if len(uploads) > 0: print(f" First upload: {uploads[0][2]}") if UPDATE_PLAYER_URLS: print(f"URL Updates: {len(url_updates)}") print(f"\nTotal runtime: {p_run_time.total_seconds():.2f} seconds") print(f"{'=' * 60}") if __name__ == "__main__": asyncio.run(main(sys.argv[1:]))