diff --git a/check_cards_and_upload.py b/check_cards_and_upload.py index 905e33c..2135c88 100644 --- a/check_cards_and_upload.py +++ b/check_cards_and_upload.py @@ -1,5 +1,6 @@ import asyncio import datetime +import functools import sys import boto3 @@ -14,6 +15,9 @@ HTML_CARDS = False # boolean to only check and not generate cards SKIP_ARMS = False SKIP_BATS = False +# Concurrency +CONCURRENCY = 8 # number of parallel card-processing tasks + # AWS Configuration AWS_BUCKET_NAME = "paper-dynasty" # Change to your bucket name AWS_REGION = "us-east-1" # Change to your region @@ -23,11 +27,11 @@ UPLOAD_TO_S3 = ( ) UPDATE_PLAYER_URLS = True # Set to False to skip player URL updates (testing) - STEP 6: Update player URLs -# Initialize S3 client +# Initialize S3 client (module-level; boto3 low-level clients are generally thread-safe, so one client can be shared by concurrent uploads) s3_client = boto3.client("s3", region_name=AWS_REGION) if UPLOAD_TO_S3 else None -async def fetch_card_image(session, card_url: str, timeout: int = 6) -> bytes: +async def fetch_card_image(session, card_url: str, timeout: int = 10) -> bytes: """ Fetch card image from URL and return raw bytes. 
@@ -134,165 +138,216 @@ async def main(args): # PD API base URL for card generation PD_API_URL = "https://pd.manticorum.com/api" + print(f"\nRelease date for cards: {release_date}") + print(f"S3 Upload: {'ENABLED' if UPLOAD_TO_S3 else 'DISABLED'}") + print(f"URL Update: {'ENABLED' if UPDATE_PLAYER_URLS else 'DISABLED'}") + print(f"Concurrency: {CONCURRENCY} parallel tasks\n") + + # Build filtered list respecting SKIP_ARMS, SKIP_BATS, START_ID, TEST_COUNT + max_count = TEST_COUNT if TEST_COUNT is not None else 9999 + filtered_players = [] + for x in all_players: + if len(filtered_players) >= max_count: + break + if "pitching" in x["image"] and SKIP_ARMS: + continue + if "batting" in x["image"] and SKIP_BATS: + continue + if START_ID is not None and START_ID > x["player_id"]: + continue + filtered_players.append(x) + + total = len(filtered_players) + logger.info(f"Processing {total} cards with concurrency={CONCURRENCY}") + + # Shared mutable state protected by locks errors = [] successes = [] uploads = [] url_updates = [] - cxn_error = False - count = -1 + completed = 0 + progress_lock = asyncio.Lock() + results_lock = asyncio.Lock() + start_time = datetime.datetime.now() + loop = asyncio.get_event_loop() + semaphore = asyncio.Semaphore(CONCURRENCY) - print(f"\nRelease date for cards: {release_date}") - print(f"S3 Upload: {'ENABLED' if UPLOAD_TO_S3 else 'DISABLED'}") - print(f"URL Update: {'ENABLED' if UPDATE_PLAYER_URLS else 'DISABLED'}\n") + async def report_progress(): + """Increment the completed counter and log/print every 20 completions.""" + nonlocal completed + async with progress_lock: + completed += 1 + if completed % 20 == 0 or completed == total: + print(f"Progress: {completed}/{total} cards processed") + logger.info(f"Progress: {completed}/{total} cards processed") - # Create persistent aiohttp session for all card fetches - async with aiohttp.ClientSession() as session: - for x in all_players: - if "pitching" in x["image"] and SKIP_ARMS: - pass - elif 
"batting" in x["image"] and SKIP_BATS: - pass - elif START_ID is not None and START_ID > x["player_id"]: - pass - elif "sombaseball" in x["image"]: - errors.append((x, f"Bad card url: {x['image']}")) + async def process_single_card(x: dict) -> None: + """ + Process one player entry under the semaphore: fetch card image(s), upload + to S3 (offloading the synchronous boto3 call to a thread pool), and + optionally patch the player record with the new S3 URL. + + Both the primary card (image) and the secondary card for two-way players + (image2) are handled. Failures are appended to the shared errors list + rather than re-raised so the overall batch continues. + """ + async with semaphore: + player_id = x["player_id"] + + # --- primary card --- + if "sombaseball" in x["image"]: + async with results_lock: + errors.append((x, f"Bad card url: {x['image']}")) + await report_progress() + return + + card_type = "pitching" if "pitching" in x["image"] else "batting" + pd_card_url = ( + f"{PD_API_URL}/v2/players/{player_id}/{card_type}card?d={release_date}" + ) + + if HTML_CARDS: + card_url = f"{pd_card_url}&html=true" + timeout = 2 else: - count += 1 - if count % 20 == 0: - print(f"Card #{count + 1} being pulled is {x['p_name']}...") - elif TEST_COUNT is not None and TEST_COUNT < count: - print("Done test run") - break + card_url = pd_card_url + timeout = 10 - # Determine card type from existing image URL - card_type = "pitching" if "pitching" in x["image"] else "batting" - - # Generate card URL from PD API (forces fresh generation from database) - pd_card_url = f"{PD_API_URL}/v2/players/{x['player_id']}/{card_type}card?d={release_date}" - - if HTML_CARDS: - card_url = f"{pd_card_url}&html=true" - timeout = 2 - else: - card_url = pd_card_url - timeout = 6 - - try: - # Upload to S3 if enabled - if UPLOAD_TO_S3 and not HTML_CARDS: - # Fetch card image bytes directly - image_bytes = await fetch_card_image( - session, card_url, timeout=timeout - ) - s3_url = upload_card_to_s3( + 
primary_ok = False + try: + if UPLOAD_TO_S3 and not HTML_CARDS: + image_bytes = await fetch_card_image( + session, card_url, timeout=timeout + ) + # boto3 is synchronous — offload to thread pool so the event + # loop is not blocked during the S3 PUT + s3_url = await loop.run_in_executor( + None, + functools.partial( + upload_card_to_s3, image_bytes, - x["player_id"], + player_id, card_type, release_date, cardset["id"], - ) - uploads.append((x["player_id"], card_type, s3_url)) + ), + ) + async with results_lock: + uploads.append((player_id, card_type, s3_url)) + + if UPDATE_PLAYER_URLS: + await db_patch( + "players", + object_id=player_id, + params=[("image", s3_url)], + ) + async with results_lock: + url_updates.append((player_id, card_type, s3_url)) + logger.info(f"Updated player {player_id} image URL to S3") + else: + # Just validate card exists (old behavior) + logger.info("calling the card url") + await url_get(card_url, timeout=timeout) + + primary_ok = True + + except ConnectionError as e: + logger.error(f"Connection error for player {player_id}: {e}") + async with results_lock: + errors.append((x, e)) + + except ValueError as e: + async with results_lock: + errors.append((x, e)) + + except Exception as e: + logger.error(f"S3 upload/update failed for player {player_id}: {e}") + async with results_lock: + errors.append((x, f"S3 error: {e}")) + + if not primary_ok: + await report_progress() + return + + # --- secondary card (two-way players) --- + if x["image2"] is not None: + if "sombaseball" in x["image2"]: + async with results_lock: + errors.append((x, f"Bad card url: {x['image2']}")) + await report_progress() + return + + card_type2 = "pitching" if "pitching" in x["image2"] else "batting" + pd_card_url2 = f"{PD_API_URL}/v2/players/{player_id}/{card_type2}card?d={release_date}" + + if HTML_CARDS: + card_url2 = f"{pd_card_url2}&html=true" + else: + card_url2 = pd_card_url2 + + try: + if UPLOAD_TO_S3 and not HTML_CARDS: + image_bytes2 = await fetch_card_image( 
+ session, card_url2, timeout=10 + ) + s3_url2 = await loop.run_in_executor( + None, + functools.partial( + upload_card_to_s3, + image_bytes2, + player_id, + card_type2, + release_date, + cardset["id"], + ), + ) + async with results_lock: + uploads.append((player_id, card_type2, s3_url2)) - # Update player record with new S3 URL if UPDATE_PLAYER_URLS: await db_patch( "players", object_id=x["player_id"], - params=[("image", s3_url)], - ) - url_updates.append((x["player_id"], card_type, s3_url)) - logger.info( - f"Updated player {x['player_id']} image URL to S3" + params=[("image2", s3_url2)], ) + async with results_lock: + url_updates.append((player_id, card_type2, s3_url2)) + logger.info(f"Updated player {player_id} image2 URL to S3") else: # Just validate card exists (old behavior) - logger.info("calling the card url") - resp = await url_get(card_url, timeout=timeout) + await url_get(card_url2, timeout=10) + + async with results_lock: + successes.append(x) except ConnectionError as e: - if cxn_error: - raise e - cxn_error = True - errors.append((x, e)) + logger.error(f"Connection error for player {player_id} image2: {e}") + async with results_lock: + errors.append((x, e)) except ValueError as e: - errors.append((x, e)) + async with results_lock: + errors.append((x, e)) except Exception as e: logger.error( - f"S3 upload/update failed for player {x['player_id']}: {e}" + f"S3 upload/update failed for player {player_id} image2: {e}" ) - errors.append((x, f"S3 error: {e}")) - continue + async with results_lock: + errors.append((x, f"S3 error (image2): {e}")) - # Handle image2 (dual-position players) - if x["image2"] is not None: - # Determine second card type - card_type2 = "pitching" if "pitching" in x["image2"] else "batting" - - # Generate card URL from PD API (forces fresh generation from database) - pd_card_url2 = f"{PD_API_URL}/v2/players/{x['player_id']}/{card_type2}card?d={release_date}" - - if HTML_CARDS: - card_url2 = f"{pd_card_url2}&html=true" - else: - 
card_url2 = pd_card_url2 - - if "sombaseball" in x["image2"]: - errors.append((x, f"Bad card url: {x['image2']}")) - else: - try: - if UPLOAD_TO_S3 and not HTML_CARDS: - # Fetch second card image bytes directly from PD API - image_bytes2 = await fetch_card_image( - session, card_url2, timeout=6 - ) - s3_url2 = upload_card_to_s3( - image_bytes2, - x["player_id"], - card_type2, - release_date, - cardset["id"], - ) - uploads.append((x["player_id"], card_type2, s3_url2)) - - # Update player record with new S3 URL for image2 - if UPDATE_PLAYER_URLS: - await db_patch( - "players", - object_id=x["player_id"], - params=[("image2", s3_url2)], - ) - url_updates.append( - (x["player_id"], card_type2, s3_url2) - ) - logger.info( - f"Updated player {x['player_id']} image2 URL to S3" - ) - else: - # Just validate card exists (old behavior) - resp = await url_get(card_url2, timeout=6) - - successes.append(x) - - except ConnectionError as e: - if cxn_error: - raise e - cxn_error = True - errors.append((x, e)) - - except ValueError as e: - errors.append((x, e)) - - except Exception as e: - logger.error( - f"S3 upload/update failed for player {x['player_id']} image2: {e}" - ) - errors.append((x, f"S3 error (image2): {e}")) - else: + else: + async with results_lock: successes.append(x) + await report_progress() + + # Create persistent aiohttp session shared across all concurrent tasks + async with aiohttp.ClientSession() as session: + tasks = [process_single_card(x) for x in filtered_players] + await asyncio.gather(*tasks, return_exceptions=True) + # Print summary print(f"\n{'=' * 60}") print("SUMMARY")