paper-dynasty-card-creation/pitchers/creation.py
Cal Corum db2d81a6d1 CLAUDE: Add default OPS constants and type hints to improve code clarity
This commit adds default OPS value constants and type hints to key functions,
improving code documentation and IDE support.

## Changes Made

1. **Add default OPS constants** (creation_helpers.py)
   - DEFAULT_BATTER_OPS: Default OPS by rarity (1-5)
   - DEFAULT_STARTER_OPS: Default OPS-against for starters (99, 1-5)
   - DEFAULT_RELIEVER_OPS: Default OPS-against for relievers (99, 1-5)
   - Comprehensive comments explaining usage
   - Single source of truth for fallback values

2. **Update batters/creation.py**
   - Import DEFAULT_BATTER_OPS
   - Replace 6 hardcoded if-checks with clean loop over constants
   - Add type hints to post_player_updates function
   - Import Dict from typing

3. **Update pitchers/creation.py**
   - Import DEFAULT_STARTER_OPS and DEFAULT_RELIEVER_OPS
   - Replace 12 hardcoded if-checks with clean loops over constants
   - Add type hints to post_player_updates function
   - Import Dict from typing

4. **Add typing import** (creation_helpers.py)
   - Import Dict, List, Tuple, Optional for type hints
   - Enables type hints throughout helper functions

## Impact

### Before
```python
# Scattered hardcoded values (batters)
if 1 not in average_ops:
    average_ops[1] = 1.066
if 2 not in average_ops:
    average_ops[2] = 0.938
# ... 4 more if-checks

# Scattered hardcoded values (pitchers)
if 99 not in sp_average_ops:
    sp_average_ops[99] = 0.388
# ... 5 more if-checks for starters
# ... 6 more if-checks for relievers
```

### After
```python
# Clean, data-driven approach (batters)
for rarity, default_ops in DEFAULT_BATTER_OPS.items():
    if rarity not in average_ops:
        average_ops[rarity] = default_ops

# Clean, data-driven approach (pitchers)
for rarity, default_ops in DEFAULT_STARTER_OPS.items():
    if rarity not in sp_average_ops:
        sp_average_ops[rarity] = default_ops

for rarity, default_ops in DEFAULT_RELIEVER_OPS.items():
    if rarity not in rp_average_ops:
        rp_average_ops[rarity] = default_ops
```

### Benefits
- Eliminates 18 if-checks across batters and pitchers
- Single source of truth for default OPS values
- Easy to modify values (change constant, not scattered code)
- Self-documenting with clear constant names and comments
- Type hints improve IDE support and catch errors early
- Function signatures now document expected types
- Consistent with other recent refactorings

## Test Results
- 42/42 tests pass
- All existing functionality preserved
- 100% backward compatible

## Files Modified
- creation_helpers.py: +35 lines (3 constants + typing import)
- batters/creation.py: -4 lines net (cleaner code + type hints)
- pitchers/creation.py: -8 lines net (cleaner code + type hints)

**Net change:** More constants, less scattered magic numbers, better types.

Part of ongoing refactoring to reduce code fragility.
2025-10-31 23:28:49 -05:00

495 lines
22 KiB
Python

import datetime
import urllib.parse
from typing import Any, Dict, Optional

import pandas as pd

from creation_helpers import (
    get_all_pybaseball_ids, sanitize_name, CLUB_LIST, FRANCHISE_LIST, pd_players_df,
    mlbteam_and_franchise, NEW_PLAYER_COST, RARITY_BASE_COSTS,
    should_update_player_description, calculate_rarity_cost_adjustment,
    DEFAULT_STARTER_OPS, DEFAULT_RELIEVER_OPS
)
from db_calls import db_post, db_get, db_put, db_patch
from defenders import calcs_defense as cde
from . import calcs_pitcher as cpi
from exceptions import logger
from rarity_thresholds import get_pitcher_thresholds
def get_pitching_stats(
        file_path: Optional[str] = None, start_date: Optional[datetime.datetime] = None,
        end_date: Optional[datetime.datetime] = None, ignore_limits: bool = False) -> pd.DataFrame:
    """Load pitcher split stats from CSV files and merge them into one frame.

    Reads ``vlhh-basic.csv``, ``vrhh-basic.csv``, ``vlhh-rate.csv`` and
    ``vrhh-rate.csv``. ``file_path`` is used as a raw string prefix in front
    of those names, so a directory must include its trailing separator.

    Args:
        file_path: Prefix (normally a directory ending in a separator) where
            the four CSV files live.
        start_date: Unused; reserved for date-based pulls, which are not
            implemented.
        end_date: Unused; reserved for date-based pulls, which are not
            implemented.
        ignore_limits: When True, the minimum ``TBF`` filter drops to 1 for
            both splits instead of 20 (``vlhh`` files) and 40 (``vrhh`` files).

    Returns:
        The basic and rate splits inner-joined on ``playerId``. Columns from
        the two splits carry ``_vL`` / ``_vR`` suffixes, and rate columns that
        collide with basic columns additionally carry ``_rate``.

    Raises:
        LookupError: If ``file_path`` is None.
    """
    print('Reading pitching stats...')
    min_vl = 1 if ignore_limits else 20
    min_vr = 1 if ignore_limits else 40

    def read_split_pair(stat_group: str) -> pd.DataFrame:
        # Load one stat group for both splits, apply the TBF floors and join them.
        vl = pd.read_csv(f'{file_path}vlhh-{stat_group}.csv').query(f'TBF >= {min_vl}')
        vr = pd.read_csv(f'{file_path}vrhh-{stat_group}.csv').query(f'TBF >= {min_vr}')
        return pd.merge(vl, vr, on="playerId", suffixes=('_vL', '_vR'))

    if file_path is not None:
        total_basic = read_split_pair('basic')
        total_rate = read_split_pair('rate')
        return pd.merge(total_basic, total_rate, on="playerId", suffixes=('', '_rate'))

    # Only CSV input is supported; the message previously asked for "batting"
    # files, which was a copy-paste slip in this pitching module.
    raise LookupError('Date-based stat pulls not implemented, yet. Please provide pitching csv files.')
# vrb_url = f'https://www.fangraphs.com/leaders/splits-leaderboards?splitArr=6&splitArrPitch=&position=P' \
# f'&autoPt=false&splitTeams=false&statType=player&statgroup=1' \
# f'&startDate={start_date.year}-{start_date.month}-{start_date.day}' \
# f'&endDate={end_date.year}-{end_date.month}-{end_date.day}' \
# f'&players=&filter=&groupBy=season&sort=4,1&wxTemperature=&wxPressure=&wxAirDensity=' \
# f'&wxElevation=&wxWindSpeed='
# vrr_url = f'https://www.fangraphs.com/leaders/splits-leaderboards?splitArr=6&splitArrPitch=&position=P' \
# f'&autoPt=false&splitTeams=false&statType=player&statgroup=3' \
# f'&startDate={start_date.year}-{start_date.month}-{start_date.day}' \
# f'&endDate={end_date.year}-{end_date.month}-{end_date.day}' \
# f'&players=&filter=&groupBy=season&sort=4,1&wxTemperature=&wxPressure=&wxAirDensity=' \
# f'&wxElevation=&wxWindSpeed='
# vlb_url = f'https://www.fangraphs.com/leaders/splits-leaderboards?splitArr=5&splitArrPitch=&position=P' \
# f'&autoPt=false&splitTeams=false&statType=player&statgroup=1' \
# f'&startDate={start_date.year}-{start_date.month}-{start_date.day}' \
# f'&endDate={end_date.year}-{end_date.month}-{end_date.day}' \
# f'&players=&filter=&groupBy=season&sort=4,1&wxTemperature=&wxPressure=&wxAirDensity=' \
# f'&wxElevation=&wxWindSpeed='
# vlr_url = f'https://www.fangraphs.com/leaders/splits-leaderboards?splitArr=5&splitArrPitch=&position=P' \
# f'&autoPt=false&splitTeams=false&statType=player&statgroup=3' \
# f'&startDate={start_date.year}-{start_date.month}-{start_date.day}' \
# f'&endDate={end_date.year}-{end_date.month}-{end_date.day}' \
# f'&players=&filter=&groupBy=season&sort=4,1&wxTemperature=&wxPressure=&wxAirDensity=' \
# f'&wxElevation=&wxWindSpeed='
#
# soup = BeautifulSoup(requests.get(vrb_url).text, 'html.parser')
# time.sleep(3)
# table = soup.find('a', {'class': 'data-export'})
async def pd_pitchingcards_df(cardset_id: int):
    """Fetch a cardset's pitching cards from the API as a DataFrame.

    Queries the ``pitchingcards`` endpoint through ``db_get`` with
    ``short_output`` enabled.

    Returns:
        A DataFrame built from the response's ``cards`` list, with ``id``
        renamed to ``pitchingcard_id`` and ``player`` renamed to ``player_id``.

    Raises:
        ValueError: If the response reports a ``count`` of 0.
    """
    query_params = [('cardset_id', cardset_id), ('short_output', True)]
    response = await db_get('pitchingcards', params=query_params)

    if response['count'] == 0:
        raise ValueError('No pitching cards returned from Paper Dynasty API')

    cards = pd.DataFrame(response['cards'])
    return cards.rename(columns={'id': 'pitchingcard_id', 'player': 'player_id'})
async def pd_pitchingcardratings_df(
        cardset_id: int, season: int, pitching_cards: Optional[pd.DataFrame] = None):
    """Fetch pitching card ratings and derive ``total_OPS`` and ``new_rarity_id``.

    Pulls the vs-L and vs-R rows from the ``pitchingcardratings`` endpoint,
    inner-joins them on ``pitchingcard`` (suffixes ``_vL`` / ``_vR``) and
    renames that key to ``pitchingcard_id``.

    Args:
        cardset_id: Cardset whose ratings are requested.
        season: Passed to ``get_pitcher_thresholds`` to select rarity thresholds.
        pitching_cards: Optional frame with ``pitchingcard_id`` and
            ``starter_rating`` columns. When omitted, no starter rating is
            available and every row falls back to rarity 5.

    Returns:
        The merged ratings with ``total_OPS`` and ``new_rarity_id`` added.
        ``starter_rating`` is never part of the result.

    Raises:
        ValueError: If either split comes back with a ``count`` of 0.
    """
    vl_query = await db_get(
        'pitchingcardratings', params=[('cardset_id', cardset_id), ('vs_hand', 'L'), ('short_output', True)])
    vr_query = await db_get(
        'pitchingcardratings', params=[('cardset_id', cardset_id), ('vs_hand', 'R'), ('short_output', True)])
    if 0 in [vl_query['count'], vr_query['count']]:
        raise ValueError('No pitching card ratings returned from Paper Dynasty API')

    vl = pd.DataFrame(vl_query['ratings'])
    vr = pd.DataFrame(vr_query['ratings'])
    ratings = (pd.merge(vl, vr, on='pitchingcard', suffixes=('_vL', '_vR'))
               .rename(columns={'pitchingcard': 'pitchingcard_id'}))

    def get_total_ops(df_data):
        ops_vl = df_data['obp_vL'] + df_data['slg_vL']
        ops_vr = df_data['obp_vR'] + df_data['slg_vR']
        # The higher of the two split OPS values is counted twice.
        return (ops_vr + ops_vl + max(ops_vl, ops_vr)) / 3

    ratings['total_OPS'] = ratings.apply(get_total_ops, axis=1)

    # Get season-appropriate rarity thresholds
    thresholds = get_pitcher_thresholds(season)

    # Need starter_rating to determine rarity - merge with pitching cards if provided
    if pitching_cards is not None:
        ratings = pd.merge(
            ratings,
            pitching_cards[['pitchingcard_id', 'starter_rating']],
            on='pitchingcard_id',
            how='left'
        )

    # NOTE(review): source indentation was lost; the rarity calculation is
    # assumed to run whether or not pitching_cards was supplied - confirm.
    def new_rarity_id(df_data):
        starter_rating = df_data.get('starter_rating')
        if pd.isna(starter_rating):
            return 5  # Default to Common if no starter rating
        if starter_rating > 3:
            return thresholds.get_rarity_for_starter(df_data['total_OPS'])
        return thresholds.get_rarity_for_reliever(df_data['total_OPS'])

    ratings['new_rarity_id'] = ratings.apply(new_rarity_id, axis=1)

    # Drop starter_rating as it will be re-merged from pitching_cards in
    # post_player_updates. errors='ignore' keeps the default
    # pitching_cards=None call from raising KeyError on the absent column.
    return ratings.drop(columns=['starter_rating'], errors='ignore')
def match_player_lines(
        all_pitching: pd.DataFrame, all_players: pd.DataFrame, df_p: pd.DataFrame, is_custom: bool = False):
    """Attach player IDs, Paper Dynasty player rows and fielding data to the stats.

    Each row of ``all_pitching`` is resolved through ``get_all_pybaseball_ids``
    using its ``playerId`` and ``Name_vL``. The ID rows are left-joined to
    ``all_players`` on bbref ID, rows without a ``key_mlbam`` are dropped, the
    pitching stats are joined back on the fangraphs ID, and ``df_p`` is joined
    on the ``key_bbref`` index with ``_r`` appended to colliding columns.

    Returns:
        The combined frame, indexed by ``key_bbref`` (column retained).
    """
    def lookup_ids(row):
        return get_all_pybaseball_ids([row["playerId"]], 'fangraphs', is_custom, row['Name_vL'])

    print('Now pulling mlbam player IDs...')
    id_frame = all_pitching.apply(lookup_ids, axis=1)

    matched = id_frame.merge(all_players, how='left', left_on='key_bbref', right_on='bbref_id')
    # NaN never equals itself, so this keeps only rows that have a key_mlbam.
    matched = matched.query('key_mlbam == key_mlbam')
    matched = matched.set_index('key_bbref', drop=False)
    print('Matched mlbam to pd players.')

    with_stats = pd.merge(
        matched, all_pitching, left_on='key_fangraphs', right_on='playerId', sort=False)
    with_stats = with_stats.set_index('key_bbref', drop=False)

    return with_stats.join(df_p, rsuffix='_r')
async def create_new_players(
        final_pitching: pd.DataFrame, cardset: dict, card_base_url: str, release_dir: str, player_desc: str):
    """Create player records for stat rows that have no ``player_id`` yet.

    For each such row a ``players`` payload and an ``mlbplayers`` payload are
    built. The MLB player is looked up by ``key_bbref`` and created through
    ``mlbplayers/one`` only when the lookup returns nothing; the player is
    then posted. The returned ``player_id`` and ``p_name`` are written back
    into ``final_pitching`` in place, keyed by the row's index label.

    Returns:
        The number of player payloads that were built and posted.
    """
    player_payloads = []
    mlbplayer_payloads = {}

    def queue_new_pitcher(row):
        first = sanitize_name(row["name_first"]).title()
        last = sanitize_name(row["name_last"]).title()
        player_payloads.append({
            'p_name': f'{first} {last}',
            'cost': NEW_PLAYER_COST,
            'image': f'{card_base_url}/{row["player_id"]}/'
                     f'pitchingcard{urllib.parse.quote("?d=")}{release_dir}',
            'mlbclub': CLUB_LIST[row['Tm_vL']],
            'franchise': FRANCHISE_LIST[row['Tm_vL']],
            'cardset_id': cardset['id'],
            'set_num': int(float(row['key_fangraphs'])),
            'rarity_id': 99,
            'pos_1': 'P',
            'description': f'{player_desc}',
            'bbref_id': row.name,
            'fangr_id': int(float(row['key_fangraphs'])),
            'strat_code': int(float(row['key_mlbam']))
        })

        # Keyed by the row's index label so it can be found again from the
        # player payload's bbref_id.
        mlbplayer_payloads[row.name] = {
            'first_name': sanitize_name(row["name_first"]).title(),
            'last_name': sanitize_name(row["name_last"]).title(),
            'key_mlbam': int(float(row['key_mlbam'])),
            'key_fangraphs': int(float(row['key_fangraphs'])),
            'key_bbref': row['key_bbref'],
            'key_retro': row['key_retro']
        }

    unlinked = final_pitching[final_pitching['player_id'].isnull()]
    unlinked.apply(queue_new_pitcher, axis=1)
    print(f'Creating {len(player_payloads)} new players...')

    for payload in player_payloads:
        bbref_id = payload['bbref_id']
        mlb_query = await db_get('mlbplayers', params=[('key_bbref', bbref_id)])
        if mlb_query['count'] > 0:
            payload['mlbplayer_id'] = mlb_query['players'][0]['id']
        else:
            created_mlb = await db_post('mlbplayers/one', payload=mlbplayer_payloads[bbref_id])
            payload['mlbplayer_id'] = created_mlb['id']

        posted = await db_post('players', payload=payload)
        final_pitching.at[bbref_id, 'player_id'] = posted['player_id']
        final_pitching.at[bbref_id, 'p_name'] = posted['p_name']

    print(f'Player IDs linked to pitching stats.\n{len(final_pitching)} players remain\n')
    return len(player_payloads)
def get_stat_df(input_path: str, final_pitching: pd.DataFrame):
    """Join ``pitching.csv`` peripheral stats onto the matched pitcher frame.

    ``{input_path}pitching.csv`` is read, de-duplicated on ``Name-additional``
    (first occurrence kept) and indexed by that column. A ``pitch_hand``
    column is added: ``'L'`` when ``Name`` ends in ``*``, otherwise ``'R'``.

    Returns:
        ``final_pitching`` left-joined to the CSV data on its index, with
        ``_l`` appended to colliding ``final_pitching`` columns.
    """
    print('Reading pitching peripheral stats...')
    peripherals = pd.read_csv(f'{input_path}pitching.csv')
    peripherals = peripherals.drop_duplicates(subset=['Name-additional'], keep='first')
    peripherals = peripherals.set_index('Name-additional')

    # A trailing asterisk on the name marks the pitcher as left-handed.
    peripherals['pitch_hand'] = peripherals.apply(
        lambda row: 'L' if row['Name'][-1] == '*' else 'R', axis=1)

    combined = final_pitching.join(peripherals, lsuffix='_l')
    print(f'Stats are tallied\n{len(combined)} players remain\n')
    return combined
async def calculate_pitching_cards(pitching_stats: pd.DataFrame, cardset: dict, season_pct: float, post_pitchers: bool):
    """Build pitching card payloads, optionally post them, and merge card IDs back.

    One payload is built per row of ``pitching_stats``. A row whose payload
    raises while being built is logged and skipped. When ``post_pitchers`` is
    True the payloads are sent to ``pitchingcards`` with ``db_put``.

    Returns:
        ``pitching_stats`` left-joined on ``player_id`` to the cardset's
        pitching cards and re-indexed by ``key_bbref`` (column retained).
    """
    card_payloads = []

    def build_card(row):
        logger.info(f'Creating pitching card for {row["name_first"]} {row["name_last"]} / fg ID: {row["key_fangraphs"]}')
        pow_data = cde.pow_ratings(float(row['Inn_def']), row['GS'], row['G'])

        try:
            card = {
                "player_id": int(float(row['player_id'])),
                "key_bbref": row.name,
                "key_fangraphs": int(float(row['key_fangraphs'])),
                "key_mlbam": int(float(row['key_mlbam'])),
                "key_retro": row['key_retro'],
                "name_first": row["name_first"].title(),
                "name_last": row["name_last"].title(),
                "balk": cpi.balks(row['BK'], row['IP'], season_pct),
                "wild_pitch": cpi.wild_pitches(row['WP'], row['IP'], season_pct),
                "hold": cde.hold_pitcher(row['caught_stealing_perc'], int(row['pickoffs']), season_pct),
                "starter_rating": pow_data[0],
                "relief_rating": pow_data[1],
                "closer_rating": cpi.closer_rating(int(row['GF']), int(row['SV']), int(row['G'])),
                "hand": row['pitch_hand'],
                "batting": f'#1W{row["pitch_hand"]}-C'
            }
            card_payloads.append(card)
        except Exception as e:
            # Best effort: one bad row must not stop the rest of the batch.
            logger.error(f'Skipping fg ID {row["key_fangraphs"]} due to: {e}')

    print('Calculating pitching cards...')
    pitching_stats.apply(build_card, axis=1)
    print('Cards are complete.\n\nPosting cards now...')

    if post_pitchers:
        resp = await db_put('pitchingcards', payload={'cards': card_payloads}, timeout=30)
        print(f'Response: {resp}\n\nMatching pitching card database IDs to player stats...')

    # NOTE(review): source indentation was lost; the card-ID merge is assumed
    # to run whether or not cards were posted - confirm against the repo.
    card_ids = await pd_pitchingcards_df(cardset['id'])
    merged = pitching_stats.merge(card_ids, how='left', on='player_id')
    return merged.set_index('key_bbref', drop=False)
async def create_position(season_pct: float, pitching_stats: pd.DataFrame, post_pitchers: bool, df_p: pd.DataFrame):
    """Build a 'P' position payload for every pitcher and optionally post them.

    Pitchers whose ``key_bbref`` is in ``df_p``'s index get innings, range and
    error values computed from that fielding row. All other pitchers get a
    fixed placeholder line (innings 1, range 5, error 51).

    Args:
        season_pct: Passed through to the range and error calculations.
        pitching_stats: One row per pitcher; needs ``key_bbref`` and
            ``player_id``, plus ``p_name`` for rows with fielding data.
        post_pitchers: When True, the payloads are sent to ``cardpositions``
            with ``db_put``.
        df_p: Fielding data indexed by bbref ID. May be empty, in which case
            every pitcher gets the placeholder line.
    """
    pit_positions = []

    def create_pit_position(df_data):
        bbref_id = df_data["key_bbref"]

        if bbref_id in df_p.index:
            logger.debug(f'Running P stats for {df_data["p_name"]}')
            pit_positions.append({
                "player_id": int(df_data['player_id']),
                "position": 'P',
                "innings": float(df_p.at[bbref_id, 'Inn_def']),
                "range": cde.range_pitcher(
                    rs_value=int(df_p.at[bbref_id, 'bis_runs_total']),
                    season_pct=season_pct
                ),
                "error": cde.get_any_error(
                    pos_code='p',
                    errors=int(df_p.at[bbref_id, 'E_def']),
                    chances=int(df_p.at[bbref_id, 'chances']),
                    season_pct=season_pct
                )
            })
            return

        # No fielding row for this pitcher. The payload must carry the numeric
        # player_id; the old code converted key_bbref (a string ID) with int(),
        # which always raised, so no placeholder line was ever created.
        try:
            player_id = int(df_data['player_id'])
        except (KeyError, TypeError, ValueError) as e:
            logger.error(f'Could not create pitcher position for {bbref_id}: {e}')
            return

        pit_positions.append({
            "player_id": player_id,
            "position": 'P',
            "innings": 1,
            "range": 5,
            "error": 51
        })

    print('Calculating pitcher fielding lines now...')
    pitching_stats.apply(create_pit_position, axis=1)
    print('Fielding is complete.\n\nPosting positions now...')

    if post_pitchers:
        resp = await db_put('cardpositions', payload={'positions': pit_positions}, timeout=30)
        print(f'Response: {resp}\n')
async def calculate_pitcher_ratings(pitching_stats: pd.DataFrame, post_pitchers: bool):
    """Compute card ratings for every pitcher row and optionally post them.

    Each row is handed to ``cpi.get_pitcher_ratings`` and the results are
    collected into one list. A row that raises is logged and skipped. When
    ``post_pitchers`` is True the list is sent to ``pitchingcardratings``
    with ``db_put``.
    """
    rating_payloads = []

    def collect_ratings(row):
        logger.info(f'Calculating pitching card ratings for {row.name}')
        try:
            rating_payloads.extend(cpi.get_pitcher_ratings(row))
        except Exception:
            # Best effort: log the pitcher and carry on with the rest.
            logger.error(f'Could not create a pitching card for {row["key_fangraphs"]}')

    print('Calculating card ratings...')
    pitching_stats.apply(collect_ratings, axis=1)
    print('Ratings are complete\n\nPosting ratings now...')

    if not post_pitchers:
        return

    resp = await db_put('pitchingcardratings', payload={'ratings': rating_payloads}, timeout=30)
    print(f'Response: {resp}\n\nPulling all positions to set player positions...')
async def post_player_updates(
        cardset: Dict[str, Any],
        player_description: str,
        card_base_url: str,
        release_dir: str,
        is_liveseries: bool,
        post_players: bool,
        season: int
) -> int:
    """Work out and optionally send per-player updates for a cardset's pitchers.

    Players are joined to their pitching cards and ratings, a rarity and cost
    are derived from ``total_OPS``, and the resulting field changes are
    collected per player. They are sent with ``db_patch`` only when
    ``post_players`` is True.

    Args:
        cardset: Cardset record; ``id`` and ``name`` are read.
        player_description: Description to apply where
            ``should_update_player_description`` allows it.
        card_base_url: Base URL used to build each player's card image link.
        release_dir: Value appended after the encoded ``?d=`` in the image link.
        is_liveseries: When True, ``mlbclub`` and ``franchise`` are refreshed
            from ``mlbteam_and_franchise``.
        post_players: When True, the collected updates are sent to the API.
        season: Passed to the ratings loader to pick rarity thresholds.

    Returns:
        The number of players that have at least one queued update.
    """
    p_data = await pd_players_df(cardset['id'])

    # Use LEFT JOIN to keep all pitchers, even those without ratings
    pitching_cards = await pd_pitchingcards_df(cardset['id'])
    pitching_ratings = await pd_pitchingcardratings_df(cardset['id'], season, pitching_cards)
    total_ratings = pd.merge(
        pitching_cards,
        pitching_ratings,
        on='pitchingcard_id',
        how='left'  # Keep all pitching cards
    )

    # Assign default rarity (Common/5) for pitchers without ratings
    if 'new_rarity_id' not in total_ratings.columns:
        total_ratings['new_rarity_id'] = 5
    elif total_ratings['new_rarity_id'].isna().any():
        # NaN turns the column into floats; cast back so rarity_id is sent as
        # an integer (5) rather than a float (5.0).
        total_ratings['new_rarity_id'] = total_ratings['new_rarity_id'].fillna(5).astype(int)

    # Assign default total_OPS for pitchers without ratings (Common reliever default)
    if 'total_OPS' in total_ratings.columns:
        missing_ops = total_ratings[total_ratings['total_OPS'].isna()]
        if not missing_ops.empty:
            logger.warning(f"pitchers.creation.post_player_updates - {len(missing_ops)} pitchers missing total_OPS, assigning default 0.702: {missing_ops[['player_id', 'pitchingcard_id']].to_dict('records')}")
            total_ratings['total_OPS'] = total_ratings['total_OPS'].fillna(0.702)

    player_data = pd.merge(
        p_data,
        total_ratings,
        on='player_id'
    ).set_index('player_id', drop=False)
    del total_ratings

    def get_pids(df_data):
        return get_all_pybaseball_ids([df_data["bbref_id"]], 'bbref')

    ids_and_names = player_data.apply(get_pids, axis=1)
    player_data = (ids_and_names
                   .merge(player_data, how='left', left_on='key_bbref', right_on='bbref_id')
                   .query('key_mlbam == key_mlbam')  # keeps only rows that have a key_mlbam
                   .set_index('key_bbref', drop=False))

    player_updates = {}  # { <player_id> : [ (param pairs) ] }

    # Mean total_OPS per rarity, taken only from players whose rarity is
    # unchanged. Starters are starter_rating >= 4; everyone else is a reliever.
    sp_rarity_group = player_data.query('rarity == new_rarity_id and starter_rating >= 4').groupby('rarity')
    sp_average_ops = sp_rarity_group['total_OPS'].mean().to_dict()
    rp_rarity_group = player_data.query('rarity == new_rarity_id and starter_rating < 4').groupby('rarity')
    rp_average_ops = rp_rarity_group['total_OPS'].mean().to_dict()

    # Fill in missing rarity averages with defaults
    for rarity, default_ops in DEFAULT_STARTER_OPS.items():
        sp_average_ops.setdefault(rarity, default_ops)
    for rarity, default_ops in DEFAULT_RELIEVER_OPS.items():
        rp_average_ops.setdefault(rarity, default_ops)

    def get_player_updates(df_data):
        def avg_ops(rarity_id, starter_rating):
            if starter_rating >= 4:
                return sp_average_ops[rarity_id]
            return rp_average_ops[rarity_id]

        params = []

        # Check if description should be updated using extracted business logic
        if should_update_player_description(
                cardset_name=cardset['name'],
                player_cost=df_data['cost'],
                current_description=df_data['description'],
                new_description=player_description
        ):
            params = [('description', f'{player_description}')]
            logger.debug(
                f"pitchers.creation.post_player_updates - Setting description for player_id={df_data['player_id']}: "
                f"'{df_data['description']}' -> '{player_description}' (cost={df_data['cost']}, cardset={cardset['name']})"
            )
        else:
            logger.debug(
                f"pitchers.creation.post_player_updates - Skipping description update for player_id={df_data['player_id']}: "
                f"current='{df_data['description']}', proposed='{player_description}' (cost={df_data['cost']}, cardset={cardset['name']})"
            )

        if is_liveseries:
            team_data = mlbteam_and_franchise(int(float(df_data['key_mlbam'])))

            if df_data['mlbclub'] != team_data['mlbclub'] and team_data['mlbclub'] is not None:
                params.extend([('mlbclub', team_data['mlbclub'])])
            if df_data['franchise'] != team_data['franchise'] and team_data['franchise'] is not None:
                params.extend([('franchise', team_data['franchise'])])

        # NOTE(review): source indentation was lost; the image link is assumed
        # to be refreshed for every player, not only live-series - confirm.
        params.extend([('image', f'{card_base_url}/{df_data["player_id"]}/pitchingcard'
                                 f'{urllib.parse.quote("?d=")}{release_dir}')])

        if df_data['cost'] == NEW_PLAYER_COST:
            # Player still has the new-player cost: scale the rarity's base
            # cost by this pitcher's total_OPS relative to the rarity average.
            params.extend([
                ('cost',
                 round(RARITY_BASE_COSTS[df_data['new_rarity_id']] * df_data['total_OPS'] /
                       avg_ops(df_data['new_rarity_id'], df_data['starter_rating']))),
                ('rarity_id', df_data['new_rarity_id'])
            ])
        elif df_data['rarity'] != df_data['new_rarity_id']:
            # Calculate adjusted cost for rarity change using lookup table
            new_cost = calculate_rarity_cost_adjustment(
                old_rarity=df_data['rarity'],
                new_rarity=df_data['new_rarity_id'],
                old_cost=df_data['cost']
            )
            params.extend([('cost', new_cost), ('rarity_id', df_data['new_rarity_id'])])

        if params:
            player_updates.setdefault(df_data['player_id'], []).extend(params)

    player_data.apply(get_player_updates, axis=1)

    print(f'Sending {len(player_updates)} player updates to PD database...')
    if post_players:
        for player_id, params in player_updates.items():
            await db_patch('players', object_id=player_id, params=params)

    return len(player_updates)
async def run_pitchers(
        cardset: dict, input_path: str, card_base_url: str, season: int, release_directory: str,
        player_description: str, season_pct: float, post_players: bool, post_pitchers: bool, is_liveseries: bool,
        ignore_limits: bool, pull_fielding: bool = True, is_custom: bool = False):
    """Run the full pitcher pipeline for one cardset.

    Steps, in order: load Paper Dynasty players and the CSV pitching stats,
    load fielding data (or an empty frame when ``pull_fielding`` is False),
    match stat lines to players, create missing players when ``post_players``
    is True, add peripheral stats, then build cards, positions, ratings and
    player updates.

    Returns:
        A dict with ``tot_pitchers`` (row count of the final stats frame),
        ``new_pitchers`` (players created, 0 when not posting) and
        ``pitching_stats`` (the final stats frame).
    """
    print('Pulling PD player IDs...')
    pd_players = await pd_players_df(cardset['id'])

    all_stats = get_pitching_stats(file_path=input_path, ignore_limits=ignore_limits)
    print(f'Processed {len(all_stats)} pitchers\n')

    print('Pulling pitcher defense...')
    df_p = cde.get_bbref_fielding_df('p', season) if pull_fielding else pd.DataFrame()

    pit_step1 = match_player_lines(all_stats, pd_players, df_p, is_custom)

    new_pitchers = 0
    if post_players:
        new_pitchers = await create_new_players(
            pit_step1, cardset, card_base_url, release_directory, player_description
        )

    pitching_stats = get_stat_df(input_path, pit_step1)
    # Release the intermediate frames before the heavier card calculations.
    del all_stats, pit_step1

    pitching_stats = await calculate_pitching_cards(pitching_stats, cardset, season_pct, post_pitchers)
    await create_position(season_pct, pitching_stats, post_pitchers, df_p)
    await calculate_pitcher_ratings(pitching_stats, post_pitchers)
    await post_player_updates(
        cardset, player_description, card_base_url, release_directory, is_liveseries, post_players, season)

    summary = {
        'tot_pitchers': len(pitching_stats.index),
        'new_pitchers': new_pitchers,
        'pitching_stats': pitching_stats
    }
    return summary