Compare commits
1 Commits
main
...
ai/paper-d
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0d603ccd48 |
@ -1,5 +1,6 @@
|
||||
import asyncio
|
||||
import datetime
|
||||
import functools
|
||||
import sys
|
||||
import boto3
|
||||
|
||||
@ -14,6 +15,9 @@ HTML_CARDS = False # boolean to only check and not generate cards
|
||||
SKIP_ARMS = False
|
||||
SKIP_BATS = False
|
||||
|
||||
# Concurrency
|
||||
CONCURRENCY = 8 # number of parallel card-processing tasks
|
||||
|
||||
# AWS Configuration
|
||||
AWS_BUCKET_NAME = "paper-dynasty" # Change to your bucket name
|
||||
AWS_REGION = "us-east-1" # Change to your region
|
||||
@ -23,11 +27,11 @@ UPLOAD_TO_S3 = (
|
||||
)
|
||||
UPDATE_PLAYER_URLS = True # Set to False to skip player URL updates (testing) - STEP 6: Update player URLs
|
||||
|
||||
# Initialize S3 client
|
||||
# Initialize S3 client (module-level; boto3 client is thread-safe for concurrent reads)
|
||||
s3_client = boto3.client("s3", region_name=AWS_REGION) if UPLOAD_TO_S3 else None
|
||||
|
||||
|
||||
async def fetch_card_image(session, card_url: str, timeout: int = 6) -> bytes:
|
||||
async def fetch_card_image(session, card_url: str, timeout: int = 10) -> bytes:
|
||||
"""
|
||||
Fetch card image from URL and return raw bytes.
|
||||
|
||||
@ -134,165 +138,216 @@ async def main(args):
|
||||
# PD API base URL for card generation
|
||||
PD_API_URL = "https://pd.manticorum.com/api"
|
||||
|
||||
print(f"\nRelease date for cards: {release_date}")
|
||||
print(f"S3 Upload: {'ENABLED' if UPLOAD_TO_S3 else 'DISABLED'}")
|
||||
print(f"URL Update: {'ENABLED' if UPDATE_PLAYER_URLS else 'DISABLED'}")
|
||||
print(f"Concurrency: {CONCURRENCY} parallel tasks\n")
|
||||
|
||||
# Build filtered list respecting SKIP_ARMS, SKIP_BATS, START_ID, TEST_COUNT
|
||||
max_count = TEST_COUNT if TEST_COUNT is not None else 9999
|
||||
filtered_players = []
|
||||
for x in all_players:
|
||||
if len(filtered_players) >= max_count:
|
||||
break
|
||||
if "pitching" in x["image"] and SKIP_ARMS:
|
||||
continue
|
||||
if "batting" in x["image"] and SKIP_BATS:
|
||||
continue
|
||||
if START_ID is not None and START_ID > x["player_id"]:
|
||||
continue
|
||||
filtered_players.append(x)
|
||||
|
||||
total = len(filtered_players)
|
||||
logger.info(f"Processing {total} cards with concurrency={CONCURRENCY}")
|
||||
|
||||
# Shared mutable state protected by locks
|
||||
errors = []
|
||||
successes = []
|
||||
uploads = []
|
||||
url_updates = []
|
||||
cxn_error = False
|
||||
count = -1
|
||||
completed = 0
|
||||
progress_lock = asyncio.Lock()
|
||||
results_lock = asyncio.Lock()
|
||||
|
||||
start_time = datetime.datetime.now()
|
||||
loop = asyncio.get_event_loop()
|
||||
semaphore = asyncio.Semaphore(CONCURRENCY)
|
||||
|
||||
print(f"\nRelease date for cards: {release_date}")
|
||||
print(f"S3 Upload: {'ENABLED' if UPLOAD_TO_S3 else 'DISABLED'}")
|
||||
print(f"URL Update: {'ENABLED' if UPDATE_PLAYER_URLS else 'DISABLED'}\n")
|
||||
async def report_progress():
|
||||
"""Increment the completed counter and log/print every 20 completions."""
|
||||
nonlocal completed
|
||||
async with progress_lock:
|
||||
completed += 1
|
||||
if completed % 20 == 0 or completed == total:
|
||||
print(f"Progress: {completed}/{total} cards processed")
|
||||
logger.info(f"Progress: {completed}/{total} cards processed")
|
||||
|
||||
# Create persistent aiohttp session for all card fetches
|
||||
async with aiohttp.ClientSession() as session:
|
||||
for x in all_players:
|
||||
if "pitching" in x["image"] and SKIP_ARMS:
|
||||
pass
|
||||
elif "batting" in x["image"] and SKIP_BATS:
|
||||
pass
|
||||
elif START_ID is not None and START_ID > x["player_id"]:
|
||||
pass
|
||||
elif "sombaseball" in x["image"]:
|
||||
errors.append((x, f"Bad card url: {x['image']}"))
|
||||
async def process_single_card(x: dict) -> None:
|
||||
"""
|
||||
Process one player entry under the semaphore: fetch card image(s), upload
|
||||
to S3 (offloading the synchronous boto3 call to a thread pool), and
|
||||
optionally patch the player record with the new S3 URL.
|
||||
|
||||
Both the primary card (image) and the secondary card for two-way players
|
||||
(image2) are handled. Failures are appended to the shared errors list
|
||||
rather than re-raised so the overall batch continues.
|
||||
"""
|
||||
async with semaphore:
|
||||
player_id = x["player_id"]
|
||||
|
||||
# --- primary card ---
|
||||
if "sombaseball" in x["image"]:
|
||||
async with results_lock:
|
||||
errors.append((x, f"Bad card url: {x['image']}"))
|
||||
await report_progress()
|
||||
return
|
||||
|
||||
card_type = "pitching" if "pitching" in x["image"] else "batting"
|
||||
pd_card_url = (
|
||||
f"{PD_API_URL}/v2/players/{player_id}/{card_type}card?d={release_date}"
|
||||
)
|
||||
|
||||
if HTML_CARDS:
|
||||
card_url = f"{pd_card_url}&html=true"
|
||||
timeout = 2
|
||||
else:
|
||||
count += 1
|
||||
if count % 20 == 0:
|
||||
print(f"Card #{count + 1} being pulled is {x['p_name']}...")
|
||||
elif TEST_COUNT is not None and TEST_COUNT < count:
|
||||
print("Done test run")
|
||||
break
|
||||
card_url = pd_card_url
|
||||
timeout = 10
|
||||
|
||||
# Determine card type from existing image URL
|
||||
card_type = "pitching" if "pitching" in x["image"] else "batting"
|
||||
|
||||
# Generate card URL from PD API (forces fresh generation from database)
|
||||
pd_card_url = f"{PD_API_URL}/v2/players/{x['player_id']}/{card_type}card?d={release_date}"
|
||||
|
||||
if HTML_CARDS:
|
||||
card_url = f"{pd_card_url}&html=true"
|
||||
timeout = 2
|
||||
else:
|
||||
card_url = pd_card_url
|
||||
timeout = 6
|
||||
|
||||
try:
|
||||
# Upload to S3 if enabled
|
||||
if UPLOAD_TO_S3 and not HTML_CARDS:
|
||||
# Fetch card image bytes directly
|
||||
image_bytes = await fetch_card_image(
|
||||
session, card_url, timeout=timeout
|
||||
)
|
||||
s3_url = upload_card_to_s3(
|
||||
primary_ok = False
|
||||
try:
|
||||
if UPLOAD_TO_S3 and not HTML_CARDS:
|
||||
image_bytes = await fetch_card_image(
|
||||
session, card_url, timeout=timeout
|
||||
)
|
||||
# boto3 is synchronous — offload to thread pool so the event
|
||||
# loop is not blocked during the S3 PUT
|
||||
s3_url = await loop.run_in_executor(
|
||||
None,
|
||||
functools.partial(
|
||||
upload_card_to_s3,
|
||||
image_bytes,
|
||||
x["player_id"],
|
||||
player_id,
|
||||
card_type,
|
||||
release_date,
|
||||
cardset["id"],
|
||||
)
|
||||
uploads.append((x["player_id"], card_type, s3_url))
|
||||
),
|
||||
)
|
||||
async with results_lock:
|
||||
uploads.append((player_id, card_type, s3_url))
|
||||
|
||||
if UPDATE_PLAYER_URLS:
|
||||
await db_patch(
|
||||
"players",
|
||||
object_id=player_id,
|
||||
params=[("image", s3_url)],
|
||||
)
|
||||
async with results_lock:
|
||||
url_updates.append((player_id, card_type, s3_url))
|
||||
logger.info(f"Updated player {player_id} image URL to S3")
|
||||
else:
|
||||
# Just validate card exists (old behavior)
|
||||
logger.info("calling the card url")
|
||||
await url_get(card_url, timeout=timeout)
|
||||
|
||||
primary_ok = True
|
||||
|
||||
except ConnectionError as e:
|
||||
logger.error(f"Connection error for player {player_id}: {e}")
|
||||
async with results_lock:
|
||||
errors.append((x, e))
|
||||
|
||||
except ValueError as e:
|
||||
async with results_lock:
|
||||
errors.append((x, e))
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"S3 upload/update failed for player {player_id}: {e}")
|
||||
async with results_lock:
|
||||
errors.append((x, f"S3 error: {e}"))
|
||||
|
||||
if not primary_ok:
|
||||
await report_progress()
|
||||
return
|
||||
|
||||
# --- secondary card (two-way players) ---
|
||||
if x["image2"] is not None:
|
||||
if "sombaseball" in x["image2"]:
|
||||
async with results_lock:
|
||||
errors.append((x, f"Bad card url: {x['image2']}"))
|
||||
await report_progress()
|
||||
return
|
||||
|
||||
card_type2 = "pitching" if "pitching" in x["image2"] else "batting"
|
||||
pd_card_url2 = f"{PD_API_URL}/v2/players/{player_id}/{card_type2}card?d={release_date}"
|
||||
|
||||
if HTML_CARDS:
|
||||
card_url2 = f"{pd_card_url2}&html=true"
|
||||
else:
|
||||
card_url2 = pd_card_url2
|
||||
|
||||
try:
|
||||
if UPLOAD_TO_S3 and not HTML_CARDS:
|
||||
image_bytes2 = await fetch_card_image(
|
||||
session, card_url2, timeout=10
|
||||
)
|
||||
s3_url2 = await loop.run_in_executor(
|
||||
None,
|
||||
functools.partial(
|
||||
upload_card_to_s3,
|
||||
image_bytes2,
|
||||
player_id,
|
||||
card_type2,
|
||||
release_date,
|
||||
cardset["id"],
|
||||
),
|
||||
)
|
||||
async with results_lock:
|
||||
uploads.append((player_id, card_type2, s3_url2))
|
||||
|
||||
# Update player record with new S3 URL
|
||||
if UPDATE_PLAYER_URLS:
|
||||
await db_patch(
|
||||
"players",
|
||||
object_id=x["player_id"],
|
||||
params=[("image", s3_url)],
|
||||
)
|
||||
url_updates.append((x["player_id"], card_type, s3_url))
|
||||
logger.info(
|
||||
f"Updated player {x['player_id']} image URL to S3"
|
||||
params=[("image2", s3_url2)],
|
||||
)
|
||||
async with results_lock:
|
||||
url_updates.append((player_id, card_type2, s3_url2))
|
||||
logger.info(f"Updated player {player_id} image2 URL to S3")
|
||||
else:
|
||||
# Just validate card exists (old behavior)
|
||||
logger.info("calling the card url")
|
||||
resp = await url_get(card_url, timeout=timeout)
|
||||
await url_get(card_url2, timeout=10)
|
||||
|
||||
async with results_lock:
|
||||
successes.append(x)
|
||||
|
||||
except ConnectionError as e:
|
||||
if cxn_error:
|
||||
raise e
|
||||
cxn_error = True
|
||||
errors.append((x, e))
|
||||
logger.error(f"Connection error for player {player_id} image2: {e}")
|
||||
async with results_lock:
|
||||
errors.append((x, e))
|
||||
|
||||
except ValueError as e:
|
||||
errors.append((x, e))
|
||||
async with results_lock:
|
||||
errors.append((x, e))
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"S3 upload/update failed for player {x['player_id']}: {e}"
|
||||
f"S3 upload/update failed for player {player_id} image2: {e}"
|
||||
)
|
||||
errors.append((x, f"S3 error: {e}"))
|
||||
continue
|
||||
async with results_lock:
|
||||
errors.append((x, f"S3 error (image2): {e}"))
|
||||
|
||||
# Handle image2 (dual-position players)
|
||||
if x["image2"] is not None:
|
||||
# Determine second card type
|
||||
card_type2 = "pitching" if "pitching" in x["image2"] else "batting"
|
||||
|
||||
# Generate card URL from PD API (forces fresh generation from database)
|
||||
pd_card_url2 = f"{PD_API_URL}/v2/players/{x['player_id']}/{card_type2}card?d={release_date}"
|
||||
|
||||
if HTML_CARDS:
|
||||
card_url2 = f"{pd_card_url2}&html=true"
|
||||
else:
|
||||
card_url2 = pd_card_url2
|
||||
|
||||
if "sombaseball" in x["image2"]:
|
||||
errors.append((x, f"Bad card url: {x['image2']}"))
|
||||
else:
|
||||
try:
|
||||
if UPLOAD_TO_S3 and not HTML_CARDS:
|
||||
# Fetch second card image bytes directly from PD API
|
||||
image_bytes2 = await fetch_card_image(
|
||||
session, card_url2, timeout=6
|
||||
)
|
||||
s3_url2 = upload_card_to_s3(
|
||||
image_bytes2,
|
||||
x["player_id"],
|
||||
card_type2,
|
||||
release_date,
|
||||
cardset["id"],
|
||||
)
|
||||
uploads.append((x["player_id"], card_type2, s3_url2))
|
||||
|
||||
# Update player record with new S3 URL for image2
|
||||
if UPDATE_PLAYER_URLS:
|
||||
await db_patch(
|
||||
"players",
|
||||
object_id=x["player_id"],
|
||||
params=[("image2", s3_url2)],
|
||||
)
|
||||
url_updates.append(
|
||||
(x["player_id"], card_type2, s3_url2)
|
||||
)
|
||||
logger.info(
|
||||
f"Updated player {x['player_id']} image2 URL to S3"
|
||||
)
|
||||
else:
|
||||
# Just validate card exists (old behavior)
|
||||
resp = await url_get(card_url2, timeout=6)
|
||||
|
||||
successes.append(x)
|
||||
|
||||
except ConnectionError as e:
|
||||
if cxn_error:
|
||||
raise e
|
||||
cxn_error = True
|
||||
errors.append((x, e))
|
||||
|
||||
except ValueError as e:
|
||||
errors.append((x, e))
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"S3 upload/update failed for player {x['player_id']} image2: {e}"
|
||||
)
|
||||
errors.append((x, f"S3 error (image2): {e}"))
|
||||
else:
|
||||
else:
|
||||
async with results_lock:
|
||||
successes.append(x)
|
||||
|
||||
await report_progress()
|
||||
|
||||
# Create persistent aiohttp session shared across all concurrent tasks
|
||||
async with aiohttp.ClientSession() as session:
|
||||
tasks = [process_single_card(x) for x in filtered_players]
|
||||
await asyncio.gather(*tasks, return_exceptions=True)
|
||||
|
||||
# Print summary
|
||||
print(f"\n{'=' * 60}")
|
||||
print("SUMMARY")
|
||||
|
||||
Loading…
Reference in New Issue
Block a user