paper-dynasty-card-creation/scouting_batters.py
2025-07-22 09:24:34 -05:00

427 lines
17 KiB
Python

import asyncio
import copy
import datetime
import multiprocessing
import os
import sys
from functools import partial
from typing import Literal

import pandas as pd

from db_calls import db_get
from exceptions import logger, log_exception
def log_time(which: Literal['start', 'end'], message: str = '', print_to_console: bool = True, start_time: datetime.datetime = None):
    """Log the start or end of a timed step, optionally echoing to stdout.

    With ``which == 'start'`` the message is logged (and printed when
    ``print_to_console`` is true) and the current time is returned so it can
    be handed back later as ``start_time``.

    For any other ``which`` value, ``start_time`` must be supplied; the
    elapsed seconds since ``start_time`` are logged and ``None`` is returned.

    Misuse (console output requested with an empty message, or an 'end' call
    without ``start_time``) is reported through ``log_exception`` with
    ``KeyError``.
    """
    if print_to_console and not len(message):
        log_exception(KeyError, 'A message must be included when print_to_console equals True')

    if which == 'start':
        logger.info(f'starting timer - {message}')
        if print_to_console:
            print(message)
        return datetime.datetime.now()

    if start_time is None:
        log_exception(KeyError, "start_time must be passed to log_time() when which equals 'end'")
        return None

    elapsed_seconds = (datetime.datetime.now() - start_time).total_seconds()
    logger.info(f'ending timer - {message}: {elapsed_seconds:.2f}s\n')
    if print_to_console:
        print(f'{message}\n')
    return None
def build_series(label: str, code: str, pos_code: str, all_positions):
    """Build a Series of one rating field for every player listed at a position.

    Args:
        label: Human-readable prefix used in the log line and the Series name.
        code: Key to read from each position record (e.g. the rating field).
        pos_code: Only records whose ``'position'`` equals this are included.
        all_positions: Iterable of position records; each is indexed by
            ``'position'``, ``code`` and ``['player']['player_id']``.

    Returns:
        A Series named ``'<label> <pos_code>'`` keyed by player id. If a
        player id appears more than once, the last record wins.
    """
    logger.info(f'Building {label} series for {pos_code}')
    values_by_player = {
        entry['player']['player_id']: entry[code]
        for entry in all_positions
        if entry['position'] == pos_code
    }
    return pd.Series(values_by_player, name=f'{label} {pos_code}')
def build_ranges(all_positions, pos_code):
    """Return the 'Range <pos_code>' Series read from each record's 'range' field."""
    return build_series(label='Range', code='range', pos_code=pos_code, all_positions=all_positions)
def build_errors(all_positions, pos_code):
    """Return the 'Error <pos_code>' Series from each record's 'error' field, logging the result."""
    error_series = build_series(label='Error', code='error', pos_code=pos_code, all_positions=all_positions)
    logger.info(f'error ratings:\n{error_series}')
    return error_series
def build_of_arms(all_positions, pos_code):
    """Build a Series of 'arm' values for players listed at one outfield position.

    Unlike the other builders, the Series is always named 'Arm OF' regardless
    of ``pos_code``, so the per-position results share a single column name.
    """
    logger.info(f'Building OF series for {pos_code}')
    arms_by_player = {
        entry['player']['player_id']: entry['arm']
        for entry in all_positions
        if entry['position'] == pos_code
    }
    return pd.Series(arms_by_player, name='Arm OF')
def build_c_arms(all_positions, pos_code):
    """Return the 'Arm <pos_code>' Series from each record's 'arm' field, logging the result."""
    arm_series = build_series(label='Arm', code='arm', pos_code=pos_code, all_positions=all_positions)
    logger.info(f'arm ratings:\n{arm_series}')
    return arm_series
def build_c_pb(all_positions, pos_code):
    """Return the 'PB <pos_code>' Series read from each record's 'pb' field."""
    return build_series(label='PB', code='pb', pos_code=pos_code, all_positions=all_positions)
def build_c_throw(all_positions, pos_code):
    """Return the 'Throw <pos_code>' Series read from each record's 'overthrow' field."""
    return build_series(label='Throw', code='overthrow', pos_code=pos_code, all_positions=all_positions)
async def fetch_data(data):
    """Run one ``db_get`` query and log how long it took.

    Args:
        data: Indexable pair whose first item is the endpoint and whose
            second item is the params passed to ``db_get``.

    Returns:
        Whatever ``db_get`` returns for that endpoint/params pair.
    """
    timer_started = log_time('start', print_to_console=False)
    endpoint, params = data[0], data[1]
    response = await db_get(endpoint=endpoint, params=params)
    log_time('end', print_to_console=False, start_time=timer_started)
    return response
def _max_ignoring_missing(left, right):
    """Return the larger of two values, treating a missing (NaN/None) value as absent."""
    if pd.isna(left):
        return right
    if pd.isna(right):
        return left
    return max(left, right)


def _map_over_positions(builder, position_data, positions):
    """Run ``builder(position_data, pos)`` for every position in a process pool."""
    with multiprocessing.Pool(processes=min(8, multiprocessing.cpu_count())) as pool:
        return pool.map(partial(builder, position_data), positions)


async def get_scouting_dfs(cardset_id: list = None) -> pd.DataFrame:
    """Pull batting ratings and positions and assemble one scouting DataFrame.

    Three queries are issued concurrently: batting card ratings vs. left,
    batting card ratings vs. right, and card positions. The two ratings sets
    are merged on ``player_id`` (overlapping columns get ``_vl`` / ``_vr``
    suffixes) and the result is joined with one Series per defensive rating:
    ``Range <pos>`` and ``Error <pos>`` for every position, ``Arm OF``,
    ``Arm C``, ``PB C`` and ``Throw C``.

    Args:
        cardset_id: Iterable of cardset ids to request. ``None`` sends no
            ``cardset_id`` parameter at all (how the API treats that is not
            visible here) instead of failing with a ``TypeError``.

    Returns:
        The combined DataFrame, indexed by ``player_id`` (also kept as a column).
    """
    cardset_ids = [] if cardset_id is None else list(cardset_id)
    cardset_params = [('cardset_id', x) for x in cardset_ids]
    ratings_params = [('team_id', 31), ('ts', 's37136685556r6135248705'), *cardset_params]
    api_calls = [
        ('battingcardratings', [('vs_hand', 'vL'), *ratings_params]),
        ('battingcardratings', [('vs_hand', 'vR'), *ratings_params]),
        ('cardpositions', cardset_params)
    ]

    start_time = log_time('start', message='Pulling all batting card ratings and positions')
    api_data = await asyncio.gather(*(fetch_data(call) for call in api_calls))
    # Counts are pulled out first: nesting same-type quotes inside an f-string
    # only parses on Python 3.12+.
    ratings_count = api_data[0]['count'] + api_data[1]['count']
    positions_count = api_data[2]['count']
    log_time(
        'end',
        f'Pulled {ratings_count} batting card ratings and {positions_count} positions',
        start_time=start_time
    )

    start_time = log_time('start', message='Building base dataframes')
    vl_vals = api_data[0]['ratings']
    for rating in vl_vals:
        card = rating['battingcard']
        player = card['player']
        # Flatten the card's own fields into the rating row, then lift the
        # player/cardset fields needed downstream and drop the nested dicts.
        rating.update(card)
        rating['player_id'] = player['player_id']
        rating['player_name'] = player['p_name']
        rating['rarity'] = player['rarity']['name']
        rating['cardset_id'] = player['cardset']['id']
        rating['cardset_name'] = player['cardset']['name']
        del rating['battingcard']
        del rating['player']

    vr_vals = api_data[1]['ratings']
    for rating in vr_vals:
        # The vR rows only need the join key; card fields come from the vL rows.
        rating['player_id'] = rating['battingcard']['player']['player_id']
        del rating['battingcard']

    vl = pd.DataFrame(vl_vals)
    vr = pd.DataFrame(vr_vals)
    log_time('end', 'Base dataframes are complete', start_time=start_time)

    start_time = log_time('start', message='Building combined dataframe')
    bat_df = pd.merge(vl, vr, on='player_id', suffixes=('_vl', '_vr')).set_index('player_id', drop=False)
    log_time('end', 'Combined dataframe is complete', start_time=start_time)

    position_data = api_data[2]['positions']
    positions = ['P', 'C', '1B', '2B', '3B', 'SS', 'LF', 'CF', 'RF']
    series_list = []

    start_time = log_time('start', message='Building range series')
    ranges = _map_over_positions(build_ranges, position_data, positions)
    series_list.extend(ranges)
    log_time('end', f'Processed {len(ranges)} position ranges', start_time=start_time)

    start_time = log_time('start', message='Building error series')
    errors = _map_over_positions(build_errors, position_data, positions)
    series_list.extend(errors)
    log_time('end', f'Processed {len(errors)} position errors', start_time=start_time)

    start_time = log_time('start', message='Building OF arm series')
    lf_arms = build_of_arms(position_data, 'LF')
    cf_arms = build_of_arms(position_data, 'CF')
    rf_arms = build_of_arms(position_data, 'RF')
    # Take the max arm across the OF positions a player actually has. Missing
    # positions are filled with NaN and skipped; filling with 0 (as before)
    # replaced every negative arm with 0 for players lacking any OF position.
    missing = float('nan')
    combined_series = lf_arms.combine(cf_arms, _max_ignoring_missing, fill_value=missing)
    combined_series = combined_series.combine(rf_arms, _max_ignoring_missing, fill_value=missing)
    series_list.extend([combined_series])
    log_time('end', f'Processed {len(combined_series)} OF arms', start_time=start_time)

    start_time = log_time('start', message='Building C arm series')
    c_arms = build_c_arms(position_data, 'C')
    series_list.extend([c_arms])
    log_time('end', f'Processed {len(c_arms)} catcher arms', start_time=start_time)

    # A process pool cannot parallelise a single task, so the two catcher-only
    # series are built in-process.
    start_time = log_time('start', message='Building C PB series')
    passed_ball = [build_c_pb(position_data, 'C')]
    series_list.extend(passed_ball)
    log_time('end', f'Processed {len(passed_ball)} C PB series', start_time=start_time)

    start_time = log_time('start', message='Building C OT series')
    overthrows = [build_c_throw(position_data, 'C')]
    series_list.extend(overthrows)
    log_time('end', f'Processed {len(overthrows)} C OT series', start_time=start_time)

    logger.info(f'series_list: {series_list}')
    return bat_df.join(series_list)
def _percentile_score(raw_values):
    """Return the percentile rank of each value, scaled by 100 and rounded."""
    return round(raw_values.rank(pct=True) * 100)


def _raw_speed(row):
    """Raw speed: running / 20 plus steal jump, with a 0.5 bonus when steal_auto is truthy."""
    speed_raw = row['running'] / 20 + row['steal_jump']
    if row['steal_auto']:
        speed_raw += 0.5
    return speed_raw


def _raw_steal(row):
    """Raw stealing: (steal_high / 20 + steal_low / 20) scaled by steal jump."""
    return ((row['steal_high'] / 20) + (row['steal_low'] / 20)) * row['steal_jump']


def _raw_reaction(row):
    """Raw reaction: sum of 10 ** (5 - range) over every non-pitcher position the player has."""
    raw_total = 0
    for pos in ('C', '1B', '2B', '3B', 'SS', 'LF', 'CF', 'RF'):
        pos_range = row[f'Range {pos}']
        if pd.notna(pos_range):
            raw_total += 10 ** (5 - pos_range)
    return raw_total


def _catcher_arm(row):
    """Raw arm value from the catcher ratings, or None when the player has no 'Arm C'."""
    if pd.isna(row['Arm C']):
        return None
    if row['Arm C'] == -5:
        return 100
    temp_arm = 20 + ((10 - row['Arm C']) * 3) + (20 - row['PB C']) + (20 - row['Throw C']) - row['Error C']
    return min(100, temp_arm)


def _outfield_arm(row):
    """Raw arm value from the outfield ratings, or None when the player has no OF range.

    The position used is the first one present among RF, CF, LF.
    """
    if pd.notna(row['Range RF']):
        of_pos = 'RF'
    elif pd.notna(row['Range CF']):
        of_pos = 'CF'
    elif pd.notna(row['Range LF']):
        of_pos = 'LF'
    else:
        return None

    if row['Arm OF'] < 0:
        of_raw = row['Arm OF'] * -10
    else:
        of_raw = (5 - row['Arm OF'])

    if of_pos == 'RF':
        of_raw = of_raw * 1.5
        of_raw += ((6 - row['Range RF']) * 4)
    elif of_pos == 'CF':
        of_raw += ((6 - row['Range CF']) * 3)
    else:
        of_raw = of_raw / 2
        of_raw += ((6 - row['Range LF']) * 2)
    return of_raw


def _infield_arm(row):
    """Raw arm value derived from infield ranges, or None when the player has no IF range."""
    range_totals = 0
    has_infield = False
    for pos, weight in (('3B', 5), ('SS', 4), ('2B', 3), ('1B', 1)):
        pos_range = row[f'Range {pos}']
        if pd.notna(pos_range):
            has_infield = True
            range_totals += ((6 - pos_range) * weight)
    if not has_infield:
        return None
    return 100 - (50 - range_totals)


def _raw_arm(row):
    """Raw arm: catcher value if available, else outfield, else infield, else 1."""
    for candidate in (_catcher_arm(row), _outfield_arm(row), _infield_arm(row)):
        if candidate is not None:
            return candidate
    return 1


def _error_group_score(row, weights):
    """Return 100 minus weighted error ratings (floored at 1), or None if no position is rated."""
    score = 100
    rated = False
    for pos, weight in weights:
        error_rating = row[f'Error {pos}']
        if pd.notna(error_rating):
            rated = True
            score -= (error_rating * weight)
    return max(1, score) if rated else None


def _raw_fielding(row):
    """Raw fielding: mean of the infield, outfield and catcher error scores the player has."""
    scores = []
    if_error = _error_group_score(row, (('3B', 2), ('SS', .75), ('2B', 1.25), ('1B', 2)))
    if if_error is not None:
        scores.append(if_error)
    of_error = _error_group_score(row, (('LF', 2), ('CF', .75), ('RF', 1.25)))
    if of_error is not None:
        scores.append(of_error)
    if pd.notna(row['Error C']):
        scores.append(max(100 - (row['Error C'] * 5) - row['Throw C'] - row['PB C'], 1))
    return sum(scores) / max(len(scores), 1)


def _raw_vision(row):
    """Raw vision: weighted (OBP - AVG) gap times 5, minus weighted strikeouts / 208."""
    weighted_obp = (row['obp_vr'] * 0.67) + (row['obp_vl'] * 0.33)
    weighted_avg = (row['avg_vr'] * 0.67) + (row['avg_vl'] * 0.33)
    weighted_strikeouts = (row['strikeout_vl'] * 0.33) + (row['strikeout_vr'] * 0.67)
    return ((weighted_obp - weighted_avg) * 5) - (weighted_strikeouts / 208)


def _raw_rating(row):
    """Raw overall rating combining the already-computed defence, running and hitting scores."""
    defence = (row['Reaction'] + row['Arm'] + row['Fielding']) * 2
    running = row['Speed'] + row['Steal']
    hitting = (
        ((row['Contact R'] + row['Power R']) * 0.67) +
        ((row['Contact L'] + row['Power L']) * 0.33) + row['Vision']
    ) * 6
    return defence + running + hitting


def _add_score(batting_dfs, label, column, source):
    """Add one percentile-score column, timing the step under ``label``.

    ``source`` is either a row function (applied with ``axis=1``) or the name
    of an existing column whose values are ranked directly.
    """
    start_time = log_time('start', f'Beginning {label} calcs')
    if callable(source):
        raw_series = batting_dfs.apply(source, axis=1)
    else:
        raw_series = batting_dfs[source]
    batting_dfs[column] = _percentile_score(raw_series)
    log_time('end', f'Done {label} calcs', start_time=start_time)


async def post_calc_basic(batting_dfs: pd.DataFrame):
    """Compute the basic scouting scores and write them to scouting/batting-basic.csv.

    Each score is the percentile rank (times 100, rounded) of a raw value
    within ``batting_dfs``. Score columns are added to ``batting_dfs`` in
    place; 'Rating' is computed last because it reads the other scores.

    Args:
        batting_dfs: Combined batting DataFrame containing the rating columns
            and the per-position 'Range', 'Error', 'Arm', 'PB' and 'Throw'
            columns read by the raw-score helpers.
    """
    # Order matters: 'Rating' depends on every column produced before it.
    steps = (
        ('Speed', 'Speed', _raw_speed),
        ('Stealing', 'Steal', _raw_steal),
        ('Reaction', 'Reaction', _raw_reaction),
        ('Arm', 'Arm', _raw_arm),
        ('Fielding', 'Fielding', _raw_fielding),
        ('AVG vL', 'Contact L', 'avg_vl'),
        ('AVG vR', 'Contact R', 'avg_vr'),
        ('PWR vL', 'Power L', 'slg_vl'),
        ('PWR vR', 'Power R', 'slg_vr'),
        ('Vision', 'Vision', _raw_vision),
        ('Rating', 'Rating', _raw_rating),
    )
    for label, column, source in steps:
        _add_score(batting_dfs, label, column, source)

    start_time = log_time('start', 'Beginning write to file')
    output = batting_dfs[[
        'player_id', 'player_name', 'Rating', 'Contact R', 'Contact L', 'Power R', 'Power L', 'Vision', 'Speed',
        'Steal', 'Reaction', 'Arm', 'Fielding', 'hand', 'cardset_name'
    ]]
    os.makedirs('scouting', exist_ok=True)
    # Let pandas own the file handle: rendering to a string and re-writing it
    # through a text-mode handle doubles the carriage returns on Windows.
    output.to_csv('scouting/batting-basic.csv', index=False)
    log_time('end', 'Done writing to file', start_time=start_time)
async def post_calc_ratings(batting_dfs: pd.DataFrame):
    """Write the full ratings guide to scouting/batting-ratings.csv.

    The identifying columns come first, followed by every remaining column in
    its existing order, except the per-split id and vs_hand columns, which are
    dropped.

    Args:
        batting_dfs: Combined batting DataFrame; must contain the columns
            listed in ``first``.
    """
    start_time = log_time('start', 'Beginning Ratings filtering')
    first = ['player_id', 'player_name', 'cardset_name', 'rarity', 'hand', 'variant']
    exclude = set(first + ['id_vl', 'id_vr', 'vs_hand_vl', 'vs_hand_vr'])
    remaining = [col for col in batting_dfs.columns if col not in exclude]
    output = batting_dfs[first + remaining]
    log_time('end', 'Done filtering ratings', start_time=start_time)

    start_time = log_time('start', 'Beginning write to file')
    os.makedirs('scouting', exist_ok=True)
    # Let pandas own the file handle: rendering to a string and re-writing it
    # through a text-mode handle doubles the carriage returns on Windows.
    output.to_csv('scouting/batting-ratings.csv', index=False)
    log_time('end', 'Done writing to file', start_time=start_time)
async def main():
    """Pull the scouting data, then produce the basic report and the ratings guide.

    Each report receives its own deep copy of the pulled DataFrame, so columns
    added by one report do not appear in the other.
    """
    start_time = log_time('start', 'Pulling scouting data')
    overall_start_time = start_time

    batting_dfs = await get_scouting_dfs(range(1, 28))
    # Report the row count; interpolating the DataFrame itself dumped the
    # whole table into the message.
    print(f'Received {len(batting_dfs)} rows')
    log_time('end', 'Pulled scouting data', start_time=start_time)

    start_time = log_time('start', 'Beginning basic scouting')
    await post_calc_basic(copy.deepcopy(batting_dfs))
    log_time('end', 'Completed basic scouting', start_time=start_time)

    start_time = log_time('start', 'Beginning ratings guide')
    await post_calc_ratings(copy.deepcopy(batting_dfs))
    log_time('end', 'Completed ratings guide', start_time=start_time)

    log_time('end', 'Total batter scouting', print_to_console=False, start_time=overall_start_time)
    print('All done with batters!')
if __name__ == '__main__':
    # Run the async pipeline only when this file is executed as a script,
    # so importing the module does not start any work.
    asyncio.run(main())