Fix check_cards_and_upload.py to regenerate cards from PD API

- Changed card URL generation to fetch from PD API endpoint
  (/v2/players/{id}/battingcard) instead of existing S3 URL
- This ensures database changes (like cardpositions) are reflected
  in regenerated card images
- Added fix_cardpositions.py utility for regenerating batter positions
  without re-running full retrosheet_data.py script

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Cal Corum 2025-12-07 17:05:47 -06:00
parent 9d9c507e84
commit b10cfcfd09
2 changed files with 317 additions and 10 deletions

View File

@@ -10,7 +10,7 @@ from exceptions import logger

 # Configuration
-CARDSET_NAME = '2005 Promos'
+CARDSET_NAME = '2005 Live'
 START_ID = None  # Integer to only start pulling cards at player_id START_ID
 TEST_COUNT = 9999  # integer to stop after TEST_COUNT calls
 HTML_CARDS = False  # boolean to only check and not generate cards
@@ -119,6 +119,9 @@ async def main(args):
     now = datetime.datetime.now()
     release_date = f'{now.year}-{now.month}-{now.day}'
+
+    # PD API base URL for card generation
+    PD_API_URL = 'https://pd.manticorum.com/api'
     errors = []
     successes = []
     uploads = []
@@ -150,16 +153,20 @@ async def main(args):
             print(f'Done test run')
             break

+        # Determine card type from existing image URL
+        card_type = 'pitching' if 'pitching' in x['image'] else 'batting'
+        # Generate card URL from PD API (forces fresh generation from database)
+        pd_card_url = f'{PD_API_URL}/v2/players/{x["player_id"]}/{card_type}card?d={release_date}'
+
         if HTML_CARDS:
-            card_url = f'{x["image"]}&html=true'
+            card_url = f'{pd_card_url}&html=true'
             timeout = 2
         else:
-            card_url = x['image']
+            card_url = pd_card_url
             timeout = 6

         try:
-            # Determine card type from URL
-            card_type = 'pitching' if 'pitching' in x['image'] else 'batting'
-
             # Upload to S3 if enabled
             if UPLOAD_TO_S3 and not HTML_CARDS:
@@ -196,19 +203,23 @@ async def main(args):
             # Handle image2 (dual-position players)
             if x['image2'] is not None:
+                # Determine second card type
+                card_type2 = 'pitching' if 'pitching' in x['image2'] else 'batting'
+                # Generate card URL from PD API (forces fresh generation from database)
+                pd_card_url2 = f'{PD_API_URL}/v2/players/{x["player_id"]}/{card_type2}card?d={release_date}'
+
                 if HTML_CARDS:
-                    card_url2 = f'{x["image2"]}&html=true'
+                    card_url2 = f'{pd_card_url2}&html=true'
                 else:
-                    card_url2 = x['image2']
+                    card_url2 = pd_card_url2

                 if 'sombaseball' in x['image2']:
                     errors.append((x, f'Bad card url: {x["image2"]}'))
                 else:
                     try:
-                        card_type2 = 'pitching' if 'pitching' in x['image2'] else 'batting'
                         if UPLOAD_TO_S3 and not HTML_CARDS:
-                            # Fetch second card image bytes directly
+                            # Fetch second card image bytes directly from PD API
                             image_bytes2 = await fetch_card_image(session, card_url2, timeout=6)
                             s3_url2 = upload_card_to_s3(image_bytes2, x['player_id'], card_type2, release_date, cardset['id'])
                             uploads.append((x['player_id'], card_type2, s3_url2))

296
fix_cardpositions.py Normal file
View File

@ -0,0 +1,296 @@
"""
Fix script to regenerate cardpositions for cardset 27 batters.
This addresses the bug where batter positions were deleted but never recreated
due to script interruption on 2025-12-07.
"""
import asyncio
import pandas as pd
from db_calls import db_get, db_put, db_delete
from exceptions import logger
import defenders.calcs_defense as cde
# Configuration
CARDSET_ID = 27
DATA_INPUT_FILE_PATH = 'data-input/2005 Live Cardset/'
async def get_batters_from_api():
    """Fetch all batters (players with battingcards) from the API for cardset 27.

    Returns a (possibly empty) list of player dicts; a player counts as a
    batter when its ``image`` URL contains the substring ``'batting'``.
    """
    print(f'Fetching batters from cardset {CARDSET_ID}...')

    # Pull every player in the cardset (short output is enough: we only need
    # ids and image URLs here).
    response = await db_get(
        'players',
        params=[('cardset_id', CARDSET_ID), ('short_output', True)],
    )
    if not response or response.get('count', 0) == 0:
        print('No players found!')
        return []

    # Keep only players whose card image marks them as batters.
    batters = []
    for player in response['players']:
        if 'batting' in player.get('image', ''):
            batters.append(player)

    print(f'Found {len(batters)} batters')
    return batters
def calc_positions_for_batters(batters: list) -> pd.DataFrame:
    """Calculate position data for all batters using defense CSV files.

    For each batter (identified by ``bbref_id``) this looks the player up in
    one CSV per fielding position and builds cardposition payload dicts via
    the ``cde`` rating helpers.  Batters with no usable defensive data get a
    single DH record.  Returns a DataFrame with one row per (player, position);
    rows for different positions carry different keys (e.g. only OF rows have
    ``arm``), so absent fields become NaN in the frame.
    """
    print(f'Loading defense CSV files from {DATA_INPUT_FILE_PATH}...')
    # One CSV per position, indexed by the Baseball-Reference player key.
    df_c = pd.read_csv(f'{DATA_INPUT_FILE_PATH}defense_c.csv').set_index('key_bbref')
    df_1b = pd.read_csv(f'{DATA_INPUT_FILE_PATH}defense_1b.csv').set_index('key_bbref')
    df_2b = pd.read_csv(f'{DATA_INPUT_FILE_PATH}defense_2b.csv').set_index('key_bbref')
    df_3b = pd.read_csv(f'{DATA_INPUT_FILE_PATH}defense_3b.csv').set_index('key_bbref')
    df_ss = pd.read_csv(f'{DATA_INPUT_FILE_PATH}defense_ss.csv').set_index('key_bbref')
    df_lf = pd.read_csv(f'{DATA_INPUT_FILE_PATH}defense_lf.csv').set_index('key_bbref')
    df_cf = pd.read_csv(f'{DATA_INPUT_FILE_PATH}defense_cf.csv').set_index('key_bbref')
    df_rf = pd.read_csv(f'{DATA_INPUT_FILE_PATH}defense_rf.csv').set_index('key_bbref')
    df_of = pd.read_csv(f'{DATA_INPUT_FILE_PATH}defense_of.csv').set_index('key_bbref')

    all_pos = []
    season_pct = 1.0  # full-season data, so no partial-season scaling

    for batter in batters:
        key_bbref = batter.get('bbref_id')
        player_id = batter.get('player_id')
        player_name = batter.get('p_name', 'Unknown')
        if not key_bbref:
            # Without a bbref key we cannot join against the defense CSVs.
            print(f' Warning: No bbref_id for player {player_id} ({player_name})')
            continue

        no_data = True  # flips to False once any defensive record is produced

        # Process infield positions
        for pos_df, position in [(df_1b, '1b'), (df_2b, '2b'), (df_3b, '3b'), (df_ss, 'ss')]:
            if key_bbref in pos_df.index:
                try:
                    if 'bis_runs_total' in pos_df.columns:
                        # Blend TZ and BIS runs, weighted toward the lower of
                        # the two (the min term counts a third time).
                        average_range = (int(pos_df.at[key_bbref, 'tz_runs_total']) +
                                         int(pos_df.at[key_bbref, 'bis_runs_total']) +
                                         min(
                                             int(pos_df.at[key_bbref, 'tz_runs_total']),
                                             int(pos_df.at[key_bbref, 'bis_runs_total'])
                                         )) / 3
                    else:
                        average_range = pos_df.at[key_bbref, 'tz_runs_total']
                    # Skip cameo appearances: require at least 10 defensive innings.
                    if float(pos_df.at[key_bbref, 'Inn_def']) >= 10.0:
                        all_pos.append({
                            "player_id": player_id,
                            "position": position.upper(),
                            "innings": float(pos_df.at[key_bbref, 'Inn_def']),
                            "range": cde.get_if_range(
                                pos_code=position,
                                tz_runs=round(average_range),
                                r_dp=0,  # NOTE(review): double plays not sourced here — TODO confirm intended
                                season_pct=season_pct
                            ),
                            "error": cde.get_any_error(
                                pos_code=position,
                                errors=int(pos_df.at[key_bbref, 'E_def']),
                                chances=int(pos_df.at[key_bbref, 'chances']),
                                season_pct=season_pct
                            )
                        })
                        no_data = False
                except Exception as e:
                    # Best-effort per position: a bad CSV row should not kill the run.
                    logger.info(f'Infield position failed for {player_name}: {e}')

        # Process outfield positions.  Arm/error are shared across LF/CF/RF,
        # so payloads are staged and finished after this loop.
        of_arms = []
        of_payloads = []
        for pos_df, position in [(df_lf, 'lf'), (df_cf, 'cf'), (df_rf, 'rf')]:
            if key_bbref in pos_df.index:
                try:
                    if 'bis_runs_total' in pos_df.columns:
                        # Same TZ/BIS blend as the infield branch above.
                        average_range = (int(pos_df.at[key_bbref, 'tz_runs_total']) +
                                         int(pos_df.at[key_bbref, 'bis_runs_total']) +
                                         min(
                                             int(pos_df.at[key_bbref, 'tz_runs_total']),
                                             int(pos_df.at[key_bbref, 'bis_runs_total'])
                                         )) / 3
                    else:
                        average_range = pos_df.at[key_bbref, 'tz_runs_total']
                    if float(pos_df.at[key_bbref, 'Inn_def']) >= 10.0:
                        of_payloads.append({
                            "player_id": player_id,
                            "position": position.upper(),
                            "innings": float(pos_df.at[key_bbref, 'Inn_def']),
                            "range": cde.get_of_range(
                                pos_code=position,
                                tz_runs=round(average_range),
                                season_pct=season_pct
                            )
                        })
                        # Prefer the BIS outfield-runs column when present.
                        of_run_rating = 'bis_runs_outfield' if 'bis_runs_outfield' in pos_df.columns else 'tz_runs_total'
                        of_arms.append(int(pos_df.at[key_bbref, of_run_rating]))
                        no_data = False
                except Exception as e:
                    logger.info(f'Outfield position failed for {player_name}: {e}')

        # Add arm/error to outfield positions (single combined-OF CSV row).
        if key_bbref in df_of.index and len(of_arms) > 0 and len(of_payloads) > 0:
            try:
                error_rating = cde.get_any_error(
                    pos_code='of',
                    errors=int(df_of.at[key_bbref, 'E_def']),
                    chances=int(df_of.at[key_bbref, 'chances']),
                    season_pct=season_pct
                )
                arm_rating = cde.arm_outfield(of_arms)
                for f in of_payloads:
                    f['error'] = error_rating
                    f['arm'] = arm_rating
                    all_pos.append(f)
            except Exception as e:
                logger.info(f'Outfield arm/error failed for {player_name}: {e}')

        # Process catcher
        if key_bbref in df_c.index:
            try:
                # `in df_c` tests COLUMN labels on a DataFrame, i.e. "does the
                # BIS SB-runs column exist in this season's CSV".
                run_rating = 'bis_runs_catcher_sb' if 'bis_runs_catcher_sb' in df_c else 'tz_runs_catcher'
                if df_c.at[key_bbref, 'SB'] + df_c.at[key_bbref, 'CS'] == 0:
                    # No stolen-base attempts against: use a neutral arm rating.
                    arm_rating = 3
                else:
                    arm_rating = cde.arm_catcher(
                        cs_pct=df_c.at[key_bbref, 'caught_stealing_perc'],
                        raa=int(df_c.at[key_bbref, run_rating]),
                        season_pct=season_pct
                    )
                if float(df_c.at[key_bbref, 'Inn_def']) >= 10.0:
                    all_pos.append({
                        "player_id": player_id,
                        "position": 'C',
                        "innings": float(df_c.at[key_bbref, 'Inn_def']),
                        "range": cde.range_catcher(
                            rs_value=int(df_c.at[key_bbref, 'tz_runs_catcher']),
                            season_pct=season_pct
                        ),
                        "error": cde.get_any_error(
                            pos_code='c',
                            errors=int(df_c.at[key_bbref, 'E_def']),
                            chances=int(df_c.at[key_bbref, 'chances']),
                            season_pct=season_pct
                        ),
                        "arm": arm_rating,
                        "pb": cde.pb_catcher(
                            pb=int(df_c.at[key_bbref, 'PB']),
                            innings=int(float(df_c.at[key_bbref, 'Inn_def'])),
                            season_pct=season_pct
                        ),
                        "overthrow": cde.ot_catcher(
                            errors=int(df_c.at[key_bbref, 'E_def']),
                            chances=int(df_c.at[key_bbref, 'chances']),
                            season_pct=season_pct
                        )
                    })
                    no_data = False
            except Exception as e:
                logger.info(f'Catcher position failed for {player_name}: {e}')

        # DH fallback if no defensive data
        if no_data:
            all_pos.append({
                "player_id": player_id,
                "position": 'DH',
                "innings": 100  # Default innings for DH
            })

    print(f'Calculated {len(all_pos)} position records for {len(batters)} batters')
    return pd.DataFrame(all_pos)
async def delete_batter_positions(batter_player_ids: list):
    """Delete existing batter cardpositions for cardset 27.

    Only positions belonging to players in ``batter_player_ids`` are removed;
    pitcher positions are deliberately left alone.  Returns the count of
    records actually deleted.
    """
    print(f'Fetching existing cardpositions for cardset {CARDSET_ID}...')
    current = await db_get('cardpositions', params=[('cardset_id', CARDSET_ID)])
    if not current or current.get('count', 0) == 0:
        print('No existing positions found')
        return 0

    # Restrict the delete list to batters so pitcher positions survive.
    batter_ids = set(batter_player_ids)
    targets = []
    for record in current['positions']:
        if record['player']['player_id'] in batter_ids:
            targets.append(record)
    print(f'Found {len(targets)} batter positions to delete (keeping pitcher positions)')

    removed = 0
    for record in targets:
        try:
            await db_delete('cardpositions', object_id=record['id'], timeout=1)
        except Exception as exc:
            # Best effort: log and keep going so one bad record doesn't abort the fix.
            print(f' Warning: Failed to delete position {record["id"]}: {exc}')
        else:
            removed += 1
            if removed % 50 == 0:
                print(f' Deleted {removed}/{len(targets)} positions...')

    print(f'Deleted {removed} batter positions')
    return removed
# Numeric payload fields that are genuinely fractional and must stay floats.
_FLOAT_FIELDS = {'innings'}


def _payload_from_row(row: pd.Series) -> dict:
    """Convert one position DataFrame row into a JSON-safe payload dict.

    Drops NaN entries (fields that don't apply to this position, e.g. ``arm``
    on an infield row) and undoes pandas' NaN-driven int->float64 coercion:
    rating fields such as range/error/arm/pb/overthrow are produced as ints
    by the calc step but arrive here as e.g. ``3.0`` because other rows hold
    NaN in those columns.  The original code only re-cast ``player_id``; this
    applies the same fix to every integral non-float field.
    """
    payload = {}
    for key, value in row.dropna().to_dict().items():
        if isinstance(value, float) and key not in _FLOAT_FIELDS and value.is_integer():
            payload[key] = int(value)
        else:
            payload[key] = value
    # player_id must always be an int, exactly as before.
    payload['player_id'] = int(row['player_id'])
    return payload


async def post_positions(pos_df: pd.DataFrame):
    """POST the new cardpositions to the API.

    Returns True when the API acknowledged the payload, False otherwise.
    """
    all_pos = [_payload_from_row(row) for _, row in pos_df.iterrows()]
    print(f'POSTing {len(all_pos)} cardpositions...')
    resp = await db_put('cardpositions', payload={'positions': all_pos}, timeout=10)
    if resp is not None:
        print(f'Successfully posted positions: {resp}')
        return True
    else:
        print('Failed to post positions!')
        return False
async def main():
    """Run the full fix: fetch batters, wipe their positions, rebuild, POST."""
    banner = '=' * 60
    print(banner)
    print('CARDPOSITIONS FIX SCRIPT')
    print(banner)
    print(f'Target: Cardset {CARDSET_ID} (2005 Live)')
    print()

    # Step 1: pull the batter list for the cardset.
    batters = await get_batters_from_api()
    if not batters:
        print('No batters to process!')
        return
    batter_ids = [entry['player_id'] for entry in batters]

    # Step 2: remove their stale cardpositions (pitchers untouched).
    await delete_batter_positions(batter_ids)

    # Step 3: recompute positions from the defense CSVs.
    position_frame = calc_positions_for_batters(batters)

    # Step 4: push the fresh records to the API.
    posted_ok = await post_positions(position_frame)

    print()
    print(banner)
    if posted_ok:
        print('FIX COMPLETE - Batter positions have been regenerated!')
    else:
        print('FIX FAILED - Check logs for errors')
    print(banner)


if __name__ == '__main__':
    asyncio.run(main())