"""Fetch every card image for a cardset, upload it to S3, and repoint the player record.

For each player in ``CARDSET_NAME`` the script downloads the card image(s)
referenced by the player's ``image`` / ``image2`` fields, optionally uploads
them to S3, and optionally patches the player record with the new S3 URL.
"""
import asyncio
import datetime
import sys
from io import BytesIO

import boto3

from creation_helpers import get_args
from db_calls import db_get, db_patch, url_get
from exceptions import logger

# Configuration
CARDSET_NAME = '2005 Live'
START_ID = None  # Integer to only start pulling cards at player_id START_ID
TEST_COUNT = 9999  # integer to stop after TEST_COUNT calls
HTML_CARDS = False  # boolean to only check and not generate cards
SKIP_ARMS = False
SKIP_BATS = False

# AWS Configuration
AWS_BUCKET_NAME = 'your-pd-cards-bucket'  # Change to your bucket name
AWS_REGION = 'us-east-1'  # Change to your region
S3_BASE_URL = f'https://{AWS_BUCKET_NAME}.s3.{AWS_REGION}.amazonaws.com'
UPLOAD_TO_S3 = True  # Set to False to skip S3 upload (testing)
UPDATE_PLAYER_URLS = True  # Set to False to skip player URL updates (testing)

# Initialize S3 client (left as None when uploads are disabled)
s3_client = boto3.client('s3', region_name=AWS_REGION) if UPLOAD_TO_S3 else None


def upload_card_to_s3(image_data: bytes, player_id: int, card_type: str, release_date: str) -> str:
    """
    Upload card image to S3 and return the S3 URL with cache-busting param.

    Args:
        image_data: Raw PNG image bytes
        player_id: Player ID
        card_type: 'batting' or 'pitching'
        release_date: Date string for cache busting (e.g., '2025-11-8')

    Returns:
        Full S3 URL with ?d= parameter

    Raises:
        RuntimeError: If the module-level S3 client was never created
            (``UPLOAD_TO_S3`` is False).
        Exception: Any error raised by ``put_object`` is logged and re-raised.
    """
    s3_key = f'cards/player-{player_id}/{card_type}card.png'

    try:
        # Fail with a clear message instead of an AttributeError on None.
        if s3_client is None:
            raise RuntimeError('S3 client is not initialized (UPLOAD_TO_S3 is False)')

        s3_client.put_object(
            Bucket=AWS_BUCKET_NAME,
            Key=s3_key,
            Body=image_data,
            ContentType='image/png',
            CacheControl='public, max-age=300',  # 5 minute cache
            Metadata={
                'player-id': str(player_id),
                'card-type': card_type,
                'upload-date': datetime.datetime.now().isoformat(),
            },
        )
    except Exception as e:
        logger.error(f'Failed to upload {card_type} card for player {player_id}: {e}')
        raise

    # Return URL with cache-busting parameter
    s3_url = f'{S3_BASE_URL}/{s3_key}?d={release_date}'
    logger.info(f'Uploaded {card_type} card for player {player_id} to S3: {s3_url}')
    return s3_url


async def _upload_and_record(player, image_field, image_data, release_date, uploads, url_updates):
    """Upload one fetched card to S3 and, if enabled, patch the player's URL field.

    Args:
        player: Player dict; reads ``player_id`` and ``player[image_field]``.
        image_field: ``'image'`` or ``'image2'`` - the field whose card was fetched
            and which is patched with the new S3 URL.
        image_data: Response body returned by ``url_get`` for that card.
        release_date: Cache-busting date string passed through to the upload.
        uploads: List that receives ``(player_id, card_type, s3_url)`` once the
            upload succeeds (recorded before the DB patch is attempted).
        url_updates: List that receives the same tuple once the DB patch succeeds.

    Raises:
        Exception: Anything raised by the upload or by ``db_patch`` propagates.
    """
    # Card type is inferred from the source URL.
    card_type = 'pitching' if 'pitching' in player[image_field] else 'batting'
    player_id = player['player_id']

    s3_url = upload_card_to_s3(image_data, player_id, card_type, release_date)
    uploads.append((player_id, card_type, s3_url))

    # Update player record with new S3 URL
    if UPDATE_PLAYER_URLS:
        await db_patch('players', object_id=player_id, params=[(image_field, s3_url)])
        url_updates.append((player_id, card_type, s3_url))
        logger.info(f'Updated player {player_id} {image_field} URL to S3')


async def main(args):
    """Pull every card in ``CARDSET_NAME``, upload to S3, and print a summary.

    Args:
        args: Command-line arguments (currently unused).

    Raises:
        ValueError: If the players query for the cardset returns no players.
        ConnectionError: Re-raised on the second connection error seen during
            the run; the first one is recorded as a per-player error.
    """
    print(f'Searching for cardset: {CARDSET_NAME}')
    c_query = await db_get('cardsets', params=[('name', CARDSET_NAME)])

    if c_query['count'] == 0:
        print(f'I do not see a cardset named {CARDSET_NAME}')
        return
    cardset = c_query['cardsets'][0]
    del c_query

    p_query = await db_get(
        'players',
        params=[('inc_dex', False), ('cardset_id', cardset['id']), ('short_output', True)]
    )
    if p_query['count'] == 0:
        raise ValueError('No players returned from Paper Dynasty API')
    all_players = p_query['players']
    del p_query

    # Generate release date for cache busting
    now = datetime.datetime.now()
    release_date = f'{now.year}-{now.month}-{now.day}'

    errors = []
    successes = []
    uploads = []
    url_updates = []
    cxn_error = False
    count = -1
    start_time = datetime.datetime.now()
    do_upload = UPLOAD_TO_S3 and not HTML_CARDS

    print(f'\nRelease date for cards: {release_date}')
    print(f'S3 Upload: {"ENABLED" if UPLOAD_TO_S3 else "DISABLED"}')
    print(f'URL Update: {"ENABLED" if UPDATE_PLAYER_URLS else "DISABLED"}\n')

    for x in all_players:
        # Skip filters: excluded card type, or below the configured start id.
        if 'pitching' in x['image'] and SKIP_ARMS:
            continue
        if 'batting' in x['image'] and SKIP_BATS:
            continue
        if START_ID is not None and START_ID > x['player_id']:
            continue
        if 'sombaseball' in x['image']:
            errors.append((x, f'Bad card url: {x["image"]}'))
            continue

        count += 1
        # Checked on every card (independently of the progress print) so the
        # run stops after exactly TEST_COUNT cards have been pulled.
        if TEST_COUNT is not None and count >= TEST_COUNT:
            print('Done test run')
            break
        if count % 20 == 0:
            print(f'Card #{count + 1} being pulled is {x["p_name"]}...')

        if HTML_CARDS:
            card_url = f'{x["image"]}&html=true'
            timeout = 2
        else:
            card_url = x['image']
            timeout = 6

        try:
            logger.info('calling the card url')
            resp = await url_get(card_url, timeout=timeout)
        except ConnectionError as e:
            # Tolerate one connection error per run; abort on the second.
            if cxn_error:
                raise
            cxn_error = True
            errors.append((x, e))
            continue
        except ValueError as e:
            errors.append((x, e))
            continue

        # Upload to S3 if enabled
        if do_upload:
            try:
                await _upload_and_record(x, 'image', resp, release_date, uploads, url_updates)
            except Exception as e:
                logger.error(f'S3 upload/update failed for player {x["player_id"]}: {e}')
                errors.append((x, f'S3 error: {e}'))
                continue

        # Handle image2 (dual-position players)
        if x['image2'] is None:
            successes.append(x)
            continue
        if 'sombaseball' in x['image2']:
            errors.append((x, f'Bad card url: {x["image2"]}'))
            continue

        card_url = f'{x["image2"]}&html=true' if HTML_CARDS else x['image2']
        try:
            resp = await url_get(card_url, timeout=6)
        except ConnectionError as e:
            if cxn_error:
                raise
            cxn_error = True
            errors.append((x, e))
            continue
        except ValueError as e:
            errors.append((x, e))
            continue

        # Upload second card to S3
        if do_upload:
            try:
                await _upload_and_record(x, 'image2', resp, release_date, uploads, url_updates)
            except Exception as e:
                logger.error(f'S3 upload/update failed for player {x["player_id"]} image2: {e}')
                errors.append((x, f'S3 error (image2): {e}'))
                continue

        successes.append(x)

    # Print summary
    print(f'\n{"="*60}')
    print('SUMMARY')
    print(f'{"="*60}')

    if errors:
        logger.error('All Errors:')
        for player, reason in errors:
            logger.error(f'ID {player["player_id"]} {player["p_name"]} - Error: {reason}')

    if successes:
        logger.debug('All Successes:')
        for player in successes:
            logger.info(f'ID {player["player_id"]} {player["p_name"]}')

    p_run_time = datetime.datetime.now() - start_time

    print(f'\nErrors: {len(errors)}')
    print(f'Successes: {len(successes)}')

    if UPLOAD_TO_S3:
        print(f'S3 Uploads: {len(uploads)}')
        if uploads:
            print(f'  First upload: {uploads[0][2]}')

    if UPDATE_PLAYER_URLS:
        print(f'URL Updates: {len(url_updates)}')

    print(f'\nTotal runtime: {p_run_time.total_seconds():.2f} seconds')
    print(f'{"="*60}')


if __name__ == '__main__':
    asyncio.run(main(sys.argv[1:]))