paper-dynasty-card-creation/batters/creation.py
Cal Corum cb471d8057 CLAUDE: Extract rarity cost adjustment logic into data-driven function
This commit eliminates 150+ lines of duplicated, error-prone nested if/elif
logic by extracting rarity cost calculations into a lookup table and function.

## Changes Made

1. **Add RARITY_COST_ADJUSTMENTS lookup table** (creation_helpers.py)
   - Maps (old_rarity, new_rarity) → (cost_adjustment, minimum_cost)
   - Covers all 30 possible rarity transitions
   - Self-documenting with comments for each rarity tier
   - Single source of truth for all cost adjustments

2. **Add calculate_rarity_cost_adjustment() function** (creation_helpers.py)
   - Takes old_rarity, new_rarity, and old_cost
   - Returns the new cost with adjustments and minimums applied
   - Includes a comprehensive docstring with examples
   - Handles edge cases (same rarity, undefined transitions)
   - Logs warnings for undefined transitions
   - See the sketch after this list for the general shape of the table and function

3. **Update batters/creation.py**
   - Import calculate_rarity_cost_adjustment
   - Replace 75-line nested if/elif block with 7-line function call
   - Identical behavior, much cleaner code

4. **Update pitchers/creation.py**
   - Import calculate_rarity_cost_adjustment
   - Replace 75-line nested if/elif block with 7-line function call
   - Eliminates duplication between batters and pitchers

5. **Add comprehensive tests** (tests/test_rarity_cost_adjustments.py)
   - 22 tests covering all scenarios
   - Tests individual transitions (Diamond→Gold, Common→Bronze, etc.)
   - Tests all upward and downward transitions
   - Tests minimum cost enforcement
   - Tests edge cases (zero cost, very high cost, negative cost)
   - Tests symmetry (up then down returns close to original)
   - A brief test sketch follows after this list
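For orientation, here is a minimal sketch of the shape these two additions take. The transition values, rarity IDs, and minimums below are placeholders, not the numbers shipped in creation_helpers.py; only the function signature and the (cost_adjustment, minimum_cost) tuple structure come from the changes described above.

```python
import logging

logger = logging.getLogger(__name__)

# Sketch only: each (old_rarity, new_rarity) pair maps to a
# (cost_adjustment, minimum_cost) tuple. Values here are illustrative.
RARITY_COST_ADJUSTMENTS = {
    (1, 2): (-200, 400),   # e.g. Diamond -> Gold
    (2, 1): (200, 600),    # e.g. Gold -> Diamond
    (5, 4): (50, 100),     # e.g. Common -> Bronze
    # ... one entry per remaining transition ...
}


def calculate_rarity_cost_adjustment(old_rarity: int, new_rarity: int, old_cost: int) -> int:
    """Return the adjusted cost when a card changes rarity tiers (sketch)."""
    if old_rarity == new_rarity:
        # Edge case: no tier change, leave the cost alone
        return old_cost
    transition = RARITY_COST_ADJUSTMENTS.get((old_rarity, new_rarity))
    if transition is None:
        # Undefined transition: warn and fall back to the existing cost
        logger.warning(f'No cost adjustment defined for rarity {old_rarity} -> {new_rarity}')
        return old_cost
    adjustment, minimum_cost = transition
    return max(old_cost + adjustment, minimum_cost)
```

Keeping the numbers in one table is what lets the call sites in batters/creation.py and pitchers/creation.py collapse to the short calculate_rarity_cost_adjustment(...) call visible further down in this file.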
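In the same spirit, a hedged sketch of the kind of tests item 5 describes. The assertions, expected values, and tolerances are illustrative, not copied from tests/test_rarity_cost_adjustments.py; they assume the same-rarity and minimum-cost behavior outlined above.

```python
from creation_helpers import calculate_rarity_cost_adjustment


def test_same_rarity_keeps_cost():
    # Edge case: assuming no tier change means no adjustment
    assert calculate_rarity_cost_adjustment(old_rarity=3, new_rarity=3, old_cost=500) == 500


def test_downgrade_respects_minimum_cost():
    # Minimum-cost enforcement: even a deep downgrade should not fall below the tier floor
    adjusted = calculate_rarity_cost_adjustment(old_rarity=1, new_rarity=5, old_cost=10)
    assert adjusted >= 0  # the real test asserts the tier's configured minimum


def test_round_trip_is_roughly_symmetric():
    # Symmetry: moving up a tier and back down should land near the original cost
    up = calculate_rarity_cost_adjustment(old_rarity=3, new_rarity=2, old_cost=400)
    down = calculate_rarity_cost_adjustment(old_rarity=2, new_rarity=3, old_cost=up)
    assert abs(down - 400) <= 50  # tolerance is illustrative
```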

## Impact

### Lines Eliminated
- **Batters:** 75 lines → 7 lines (~91% reduction)
- **Pitchers:** 75 lines → 7 lines (~91% reduction)
- **Total:** 150 lines of nested logic eliminated

### Benefits
- Eliminates 150+ lines of duplicated code
- Data-driven approach makes adjustments clear and modifiable
- Single source of truth prevents inconsistencies
- Independently testable business logic
- 22 comprehensive tests ensure correctness
- Easy to add new rarity tiers or modify costs
- Reduced risk of typos in magic numbers

## Test Results
- 22/22 new tests pass
- All existing tests still pass
- 100% backward compatible: identical behavior

## Files Modified
- creation_helpers.py: +104 lines (table + function + docs)
- batters/creation.py: -68 lines (replaced nested logic)
- pitchers/creation.py: -68 lines (replaced nested logic)
- tests/test_rarity_cost_adjustments.py: +174 lines (new tests)

**Net change:** 150+ lines of complex logic replaced with a simple,
tested, data-driven approach.

Part of ongoing refactoring to reduce code fragility.
2025-10-31 22:49:35 -05:00


import datetime
import urllib.parse
import pandas as pd
import numpy as np
from creation_helpers import (
    get_all_pybaseball_ids, sanitize_name, CLUB_LIST, FRANCHISE_LIST, pd_players_df,
    mlbteam_and_franchise, get_hand, NEW_PLAYER_COST, RARITY_BASE_COSTS,
    should_update_player_description, calculate_rarity_cost_adjustment
)
from db_calls import db_post, db_get, db_put, db_patch
from . import calcs_batter as cba
from defenders import calcs_defense as cde
from exceptions import logger
from rarity_thresholds import get_batter_thresholds


async def pd_battingcards_df(cardset_id: int):
    bc_query = await db_get('battingcards', params=[('cardset_id', cardset_id), ('short_output', True)])
    if bc_query['count'] == 0:
        raise ValueError(f'No batting cards returned from Paper Dynasty API')
    return pd.DataFrame(bc_query['cards']).rename(columns={'id': 'battingcard_id', 'player': 'player_id'})


async def pd_battingcardratings_df(cardset_id: int, season: int):
    vl_query = await db_get(
        'battingcardratings', params=[
            ('cardset_id', cardset_id), ('vs_hand', 'L'), ('short_output', True), ('team_id', 31),
            ('ts', 's37136685556r6135248705')])
    vr_query = await db_get(
        'battingcardratings', params=[
            ('cardset_id', cardset_id), ('vs_hand', 'R'), ('short_output', True), ('team_id', 31),
            ('ts', 's37136685556r6135248705')])
    if 0 in [vl_query['count'], vr_query['count']]:
        raise ValueError(f'No batting card ratings returned from Paper Dynasty API')
    vl = pd.DataFrame(vl_query['ratings'])
    vr = pd.DataFrame(vr_query['ratings'])
    ratings = (pd.merge(vl, vr, on='battingcard', suffixes=('_vL', '_vR'))
               .rename(columns={'battingcard': 'battingcard_id'}))

    def get_total_ops(df_data):
        ops_vl = df_data['obp_vL'] + df_data['slg_vL']
        ops_vr = df_data['obp_vR'] + df_data['slg_vR']
        return (ops_vr + ops_vl + min(ops_vl, ops_vr)) / 3

    ratings['total_OPS'] = ratings.apply(get_total_ops, axis=1)
    # Get season-appropriate rarity thresholds
    thresholds = get_batter_thresholds(season)

    def new_rarity_id(df_data):
        return thresholds.get_rarity(df_data['total_OPS'])

    ratings['new_rarity_id'] = ratings.apply(new_rarity_id, axis=1)
    return ratings
    # return pd.DataFrame(bcr_query['ratings']).rename(columns={'battingcard': 'battingcard_id'})


def get_batting_stats(
        file_path: str = None, start_date: datetime.datetime = None, end_date: datetime.datetime = None,
        ignore_limits: bool = False):
    min_vl = 20 if not ignore_limits else 1
    min_vr = 40 if not ignore_limits else 1
    if file_path is not None:
        vl_basic = pd.read_csv(f'{file_path}vlhp-basic.csv').query(f'PA >= {min_vl}')
        vr_basic = pd.read_csv(f'{file_path}vrhp-basic.csv').query(f'PA >= {min_vr}')
        total_basic = pd.merge(vl_basic, vr_basic, on="playerId", suffixes=('_vL', '_vR'))
        vl_rate = pd.read_csv(f'{file_path}vlhp-rate.csv').query(f'PA >= {min_vl}')
        vr_rate = pd.read_csv(f'{file_path}vrhp-rate.csv').query(f'PA >= {min_vr}')
        total_rate = pd.merge(vl_rate, vr_rate, on="playerId", suffixes=('_vL', '_vR'))
        return pd.merge(total_basic, total_rate, on="playerId", suffixes=('', '_rate'))
    else:
        raise LookupError(f'Date-based stat pulls not implemented, yet. Please provide batting csv files.')


def match_player_lines(all_batting: pd.DataFrame, all_players: pd.DataFrame, is_custom: bool = False):
    def get_pids(df_data):
        return get_all_pybaseball_ids([df_data["playerId"]], 'fangraphs', is_custom, df_data['Name_vL'])

    print(f'Now pulling mlbam player IDs...')
    ids_and_names = all_batting.apply(get_pids, axis=1)
    player_data = (ids_and_names
                   .merge(all_players, how='left', left_on='key_bbref', right_on='bbref_id')
                   .query('key_mlbam == key_mlbam')
                   .set_index('key_bbref', drop=False))
    print(f'Matched mlbam to pd players.')
    final_batting = pd.merge(
        player_data, all_batting, left_on='key_fangraphs', right_on='playerId', sort=False
    ).set_index('key_bbref', drop=False)
    return final_batting


async def create_new_players(
        final_batting: pd.DataFrame, cardset: dict, card_base_url: str, release_dir: str, player_desc: str):
    new_players = []
    new_mlbplayers = {}

    def create_batters(df_data):
        f_name = sanitize_name(df_data["name_first"]).title()
        l_name = sanitize_name(df_data["name_last"]).title()
        new_players.append({
            'p_name': f'{f_name} {l_name}',
            'cost': NEW_PLAYER_COST,
            'image': f'{card_base_url}/{df_data["player_id"]}/battingcard'
                     f'{urllib.parse.quote("?d=")}{release_dir}',
            'mlbclub': CLUB_LIST[df_data['Tm_vL']],
            'franchise': FRANCHISE_LIST[df_data['Tm_vL']],
            'cardset_id': cardset['id'],
            'set_num': int(float(df_data['key_fangraphs'])),
            'rarity_id': 99,
            'pos_1': 'DH',
            'description': f'{player_desc}',
            'bbref_id': df_data.name,
            'fangr_id': int(float(df_data['key_fangraphs'])),
            'strat_code': int(float(df_data['key_mlbam']))
        })
        new_mlbplayers[df_data.name] = {
            'first_name': sanitize_name(df_data["name_first"]).title(),
            'last_name': sanitize_name(df_data["name_last"]).title(),
            'key_mlbam': int(float(df_data['key_mlbam'])),
            'key_fangraphs': int(float(df_data['key_fangraphs'])),
            'key_bbref': df_data['key_bbref'],
            'key_retro': df_data['key_retro']
        }

    final_batting[final_batting['player_id'].isnull()].apply(create_batters, axis=1)
    print(f'Creating {len(new_players)} new players...')
    for x in new_players:
        mlb_query = await db_get('mlbplayers', params=[('key_bbref', x['bbref_id'])])
        if mlb_query['count'] > 0:
            x['mlbplayer_id'] = mlb_query['players'][0]['id']
        else:
            new_mlb = await db_post('mlbplayers/one', payload=new_mlbplayers[x['bbref_id']])
            x['mlbplayer_id'] = new_mlb['id']
        this_player = await db_post('players', payload=x)
        final_batting.at[x['bbref_id'], 'player_id'] = this_player['player_id']
        final_batting.at[x['bbref_id'], 'p_name'] = this_player['p_name']
    print(f'Player IDs linked to batting stats.\n{len(final_batting.values)} players remain\n')
    return len(new_players)


def get_run_stat_df(final_batting: pd.DataFrame, input_path: str):
    print(f'Reading baserunning stats...')
    run_data = (pd.read_csv(f'{input_path}running.csv')
                .set_index('Name-additional'))
    run_data['bat_hand'] = run_data.apply(get_hand, axis=1)
    offense_stats = final_batting.join(run_data)
    print(f'Stats are tallied\n{len(offense_stats.values)} players remain\n\nCollecting defensive data from bbref...')
    return offense_stats


async def calculate_batting_cards(offense_stats: pd.DataFrame, cardset: dict, season_pct: float, post_batters: bool):
    batting_cards = []

    def create_batting_card(df_data):
        logger.info(df_data['player_id'])
        try:
            s_data = cba.stealing(
                chances=int(df_data['SBO']),
                sb2s=int(df_data['SB2']),
                cs2s=int(df_data['CS2']),
                sb3s=int(df_data['SB3']),
                cs3s=int(df_data['CS3']),
                season_pct=season_pct
            )
        except ValueError as e:
            print(f'Stealing error for *{df_data.name}*: {e}')
            logger.error(e)
            s_data = [0, 0, 0, 0]
        batting_cards.append({
            "player_id": df_data['player_id'],
            "key_bbref": df_data.name,
            "key_fangraphs": int(float(df_data['key_fangraphs'])),
            "key_mlbam": df_data['key_mlbam'],
            "key_retro": df_data['key_retro'],
            "name_first": df_data["name_first"].title(),
            "name_last": df_data["name_last"].title(),
            "steal_low": s_data[0],
            "steal_high": s_data[1],
            "steal_auto": s_data[2],
            "steal_jump": s_data[3],
            "hit_and_run": cba.hit_and_run(
                df_data['AB_vL'], df_data['AB_vR'], df_data['H_vL'], df_data['H_vR'],
                df_data['HR_vL'], df_data['HR_vR'], df_data['SO_vL'], df_data['SO_vR']
            ),
            "running": cba.running(df_data['XBT%']),
            "hand": df_data['bat_hand']
        })

    print(f'Calculating batting cards...')
    offense_stats.apply(create_batting_card, axis=1)
    print(f'Cards are complete.\n\nPosting cards now...')
    if post_batters:
        resp = await db_put('battingcards', payload={'cards': batting_cards}, timeout=30)
        print(f'Response: {resp}\n\nMatching batting card database IDs to player stats...')
    offense_stats = pd.merge(
        offense_stats, await pd_battingcards_df(cardset['id']), on='player_id').set_index('key_bbref', drop=False)
    return offense_stats


async def calculate_batting_ratings(offense_stats: pd.DataFrame, to_post: bool):
    batting_ratings = []

    def create_batting_card_ratings(df_data):
        logger.debug(f'Calculating card ratings for {df_data.name}')
        batting_ratings.extend(cba.get_batter_ratings(df_data))

    print(f'Calculating card ratings...')
    offense_stats.apply(create_batting_card_ratings, axis=1)
    print(f'Ratings are complete\n\nPosting ratings now...')
    if to_post:
        resp = await db_put('battingcardratings', payload={'ratings': batting_ratings}, timeout=30)
        print(f'Response: {resp}\n\nPulling fresh PD player data...')
    return len(batting_ratings)


async def post_player_updates(
        cardset: dict, card_base_url: str, release_dir: str, player_desc: str, is_liveseries: bool, to_post: bool,
        is_custom: bool, season: int):
    """
    Update player metadata after card creation (costs, rarities, descriptions, teams, images).

    Process:
    1. Pull fresh pd_players and batting cards/ratings
    2. Calculate total OPS and assign rarity_id
    3. For NEW players (cost == NEW_PLAYER_COST):
       - Set cost = RARITY_BASE_COSTS[rarity] * total_OPS / average_ops[rarity]
       - Set rarity_id
    4. For existing players:
       - Update costs if rarity changed
       - Update descriptions (promo cardsets: only new cards; regular: all except PotM)
       - Update team/franchise if live series
       - Update image URLs

    Returns:
        Number of player updates sent to database
    """
    p_data = await pd_players_df(cardset['id'])
    p_data = p_data.set_index('player_id', drop=False)
    # Use LEFT JOIN to keep all batters, even those without ratings
    batting_cards = await pd_battingcards_df(cardset['id'])
    batting_ratings = await pd_battingcardratings_df(cardset['id'], season)
    total_ratings = pd.merge(
        batting_cards,
        batting_ratings,
        on='battingcard_id',
        how='left'  # Keep all batting cards
    )
    # Assign default rarity (Common/5) for players without ratings
    if 'new_rarity_id' not in total_ratings.columns:
        total_ratings['new_rarity_id'] = 5
    total_ratings['new_rarity_id'] = (
        total_ratings['new_rarity_id']
        .replace(r'^\s*$', np.nan, regex=True)
        .fillna(5)
        .astype('Int64')  # optional: keep it as nullable integer type
    )
    # Assign default total_OPS for players without ratings (Common rarity default)
    if 'total_OPS' in total_ratings.columns:
        missing_ops = total_ratings[total_ratings['total_OPS'].isna()]
        if not missing_ops.empty:
            logger.warning(
                f"batters.creation.post_player_updates - {len(missing_ops)} players missing total_OPS, "
                f"assigning default 0.612: {missing_ops[['player_id', 'battingcard_id']].to_dict('records')}")
        total_ratings['total_OPS'] = total_ratings['total_OPS'].fillna(0.612)
    player_data = pd.merge(
        p_data,
        total_ratings,
        on='player_id'
    ).set_index('player_id', drop=False)
    del total_ratings

    def get_pids(df_data):
        if is_custom:
            return get_all_pybaseball_ids([df_data["fangr_id"]], 'fangraphs', is_custom)
        else:
            return get_all_pybaseball_ids([df_data["bbref_id"]], 'bbref')

    ids_and_names = player_data.apply(get_pids, axis=1)
    player_data = (ids_and_names
                   .merge(player_data, how='left', left_on='key_bbref', right_on='bbref_id')
                   .query('key_mlbam == key_mlbam')
                   .set_index('key_bbref', drop=False))
    player_updates = {}  # { <player_id> : [ (param pairs) ] }
    rarity_group = player_data.query('rarity == new_rarity_id').groupby('rarity')
    average_ops = rarity_group['total_OPS'].mean().to_dict()
    if 1 not in average_ops:
        average_ops[1] = 1.066
    if 2 not in average_ops:
        average_ops[2] = 0.938
    if 3 not in average_ops:
        average_ops[3] = 0.844
    if 4 not in average_ops:
        average_ops[4] = 0.752
    if 5 not in average_ops:
        average_ops[5] = 0.612

    def get_player_updates(df_data):
        params = []
        # Check if description should be updated using extracted business logic
        if should_update_player_description(
                cardset_name=cardset['name'],
                player_cost=df_data['cost'],
                current_description=df_data['description'],
                new_description=player_desc
        ):
            params = [('description', f'{player_desc}')]
            logger.debug(
                f"batters.creation.post_player_updates - Setting description for player_id={df_data['player_id']}: "
                f"'{df_data['description']}' -> '{player_desc}' (cost={df_data['cost']}, cardset={cardset['name']})"
            )
        else:
            logger.debug(
                f"batters.creation.post_player_updates - Skipping description update for player_id={df_data['player_id']}: "
                f"current='{df_data['description']}', proposed='{player_desc}' (cost={df_data['cost']}, cardset={cardset['name']})"
            )
        if is_liveseries:
            team_data = mlbteam_and_franchise(int(float(df_data['key_mlbam'])))
            if df_data['mlbclub'] != team_data['mlbclub'] and team_data['mlbclub'] is not None:
                params.extend([('mlbclub', team_data['mlbclub'])])
            if df_data['franchise'] != team_data['franchise'] and team_data['franchise'] is not None:
                params.extend([('franchise', team_data['franchise'])])
        # if release_directory not in df_data['image']:
        params.extend([('image', f'{card_base_url}/{df_data["player_id"]}/battingcard'
                                 f'{urllib.parse.quote("?d=")}{release_dir}')])
        if df_data['cost'] == NEW_PLAYER_COST:
            params.extend([
                ('cost',
                 round(RARITY_BASE_COSTS[df_data['new_rarity_id']] * df_data['total_OPS'] /
                       average_ops[df_data['new_rarity_id']])),
                ('rarity_id', df_data['new_rarity_id'])
            ])
        elif df_data['rarity'] != df_data['new_rarity_id']:
            # Calculate adjusted cost for rarity change using lookup table
            new_cost = calculate_rarity_cost_adjustment(
                old_rarity=df_data['rarity'],
                new_rarity=df_data['new_rarity_id'],
                old_cost=df_data['cost']
            )
            params.extend([('cost', new_cost), ('rarity_id', df_data['new_rarity_id'])])
        if len(params) > 0:
            if df_data.player_id not in player_updates.keys():
                player_updates[df_data.player_id] = params
            else:
                player_updates[df_data.player_id].extend(params)

    player_data.apply(get_player_updates, axis=1)
    print(f'Sending {len(player_updates)} player updates to PD database...')
    if to_post:
        for x in player_updates:
            await db_patch('players', object_id=x, params=player_updates[x])
    return len(player_updates)


async def run_batter_fielding(season: int, offense_stats: pd.DataFrame, season_pct: float, post_batters: bool):
    print(f'Pulling catcher defense...')
    df_c = cde.get_bbref_fielding_df('c', season)
    print(f'Pulling first base defense...')
    df_1b = cde.get_bbref_fielding_df('1b', season)
    print(f'Pulling second base defense...')
    df_2b = cde.get_bbref_fielding_df('2b', season)
    print(f'Pulling third base defense...')
    df_3b = cde.get_bbref_fielding_df('3b', season)
    print(f'Pulling shortstop defense...')
    df_ss = cde.get_bbref_fielding_df('ss', season)
    print(f'Pulling left field defense...')
    df_lf = cde.get_bbref_fielding_df('lf', season)
    print(f'Pulling center field defense...')
    df_cf = cde.get_bbref_fielding_df('cf', season)
    print(f'Pulling right field defense...')
    df_rf = cde.get_bbref_fielding_df('rf', season)
    print(f'Pulling outfield defense...')
    df_of = cde.get_bbref_fielding_df('of', season)
    print(f'Positions data is retrieved')
    await cde.create_positions(
        offense_stats, season_pct, post_batters, df_c, df_1b, df_2b, df_3b, df_ss, df_lf, df_cf, df_rf, df_of
    )


async def run_batters(
        cardset: dict, input_path: str, post_players: bool, card_base_url: str, release_directory: str,
        player_description: str, season_pct: float, post_batters: bool, pull_fielding: bool, season: int,
        is_liveseries: bool, ignore_limits: bool, is_custom: bool = False):
    print(f'Pulling PD player IDs...')
    pd_players = await pd_players_df(cardset['id'])
    print('Reading batting stats...')
    all_stats = get_batting_stats(file_path=input_path, ignore_limits=ignore_limits)
    print(f'Processed {len(all_stats.values)} batters\n')
    bat_step1 = match_player_lines(all_stats, pd_players, is_custom)
    if post_players:
        new_batters = await create_new_players(
            bat_step1, cardset, card_base_url, release_directory, player_description
        )
    else:
        new_batters = 0
    # Custom Cardsets
    if cardset['id'] in [16]:
        offense_stats = pd.merge(
            bat_step1, await pd_battingcards_df(cardset['id']), on='player_id').set_index('key_bbref', drop=False)
    else:
        bat_step2 = get_run_stat_df(bat_step1, input_path)
        offense_stats = await calculate_batting_cards(bat_step2, cardset, season_pct, post_batters)
        del bat_step2
    del bat_step1, all_stats
    await calculate_batting_ratings(offense_stats, post_batters)
    if pull_fielding:
        await run_batter_fielding(season, offense_stats, season_pct, post_batters)
    await post_player_updates(
        cardset, card_base_url, release_directory, player_description, is_liveseries, post_batters, is_custom, season
    )
    return {
        'tot_batters': len(offense_stats.index),
        'new_batters': new_batters
    }