"""Build batting cards and batting-card ratings for a Paper Dynasty cardset.

The script is driven by ``key=value`` command-line tokens:

* ``cardset_name`` (required) - name of the cardset to look up through the API.
* ``games_played`` (required) - integer from 1 to 162; used to compute the
  fraction of the season played.
* ``season`` (optional) - season year; defaults to the first four characters
  of the cardset name.

Input statistics are read from CSV files under
``data-input/<cardset name> Cardset/``.
"""
import asyncio
import copy
import csv
import datetime
import logging
import random
import sys
from typing import Literal

import html5lib
import pandas as pd
import pybaseball as pb
import pydantic
import requests
from bs4 import BeautifulSoup

import calcs_batter as cba
import calcs_defense as cde
import calcs_pitcher as cpi
from db_calls import db_get, db_put, db_post

# Take a single timestamp so year/month/day cannot straddle midnight.
_today = datetime.datetime.now()
date = f'{_today.year}-{_today.month}-{_today.day}'
log_level = logging.INFO
logging.basicConfig(
    filename=f'logs/{date}.log',
    format='%(asctime)s - card-creation - %(levelname)s - %(message)s',
    level=log_level
)
CARD_BASE_URL = 'https://sombaseball.ddns.net/cards/pd'

# Single-pass character map used by sanitize_name(); None deletes the character.
_NAME_TRANSLATION = str.maketrans({
    "é": "e",
    "á": "a",
    ".": None,
    "Á": "A",
    "ñ": "n",
    "ó": "o",
    "í": "i",
    "ú": "u",
})


def sanitize_name(start_name: str) -> str:
    """Return *start_name* with periods removed and a fixed set of accented letters made ASCII.

    Only the characters listed in ``_NAME_TRANSLATION`` are affected; any other
    character is returned unchanged.
    """
    return start_name.translate(_NAME_TRANSLATION)


def get_args(args):
    """Parse ``key=value`` tokens into a dict of strings.

    Args:
        args: Iterable of strings, each of the form ``key=value``. Only the
            first ``=`` separates key from value, so values may contain ``=``.

    Returns:
        Dict mapping each key to its (string) value.

    Raises:
        TypeError: If a token contains no ``=``.
        ValueError: If the same key appears more than once.
    """
    logging.info('Process arguments: %s', args)
    final_args = {}
    for x in args:
        # partition() splits on the first '=' only; a plain split('=') blew up
        # with an unpacking error whenever the value itself contained '='.
        key, separator, value = x.partition("=")
        if not separator:
            raise TypeError(f'Invalid = argument: {x}')

        logging.info('key: %s / value: %s', key, value)

        if key in final_args:
            raise ValueError(f'Duplicate argument: {key}')

        final_args[key] = value
    return final_args


# class BattingStat(pydantic.BaseModel):
#     fg_id: int
#     vs_hand: Literal['L', 'R']
#     pa: int
#     hit: int
#     single: int
#     double: int
#     triple: int
#     homerun: int
#     rbi: int
#     bb: int
#     ibb: int
#     so: int
#     hbp: int
#     gidp: int
#     sb: int
#     cs: int
#     avg: float
#     hard_rate: float = None
#     med_rate: float = None
#     soft_rate: float = None
#     ifh_rate: float = None
#     hr_per_fb: float = None
#     ld_rate: float = None
#     iffb_rate: float = None
#     fb_rate: float = None
#     pull_rate: float = None
#     center_rate: float = None
#     oppo_rate: float = None


async def main(args):
    """Run the batting-card pipeline for one cardset.

    Steps, in order: look up the cardset, read and merge the split batting
    CSVs, map FanGraphs ids to other id systems via pybaseball, link them to
    the cardset's existing players, join baserunning stats, then compute
    batting cards and batting-card ratings.

    Args:
        args: ``key=value`` tokens as accepted by :func:`get_args`. Must contain
            ``cardset_name`` and ``games_played``; may contain ``season``.

    Raises:
        KeyError: If ``cardset_name`` or ``games_played`` is missing.
        ValueError: If ``games_played`` (or ``season``) is not an integer, or
            if the players query returns no rows.
    """
    arg_data = get_args(args)

    # cardset_name = input(f'What is the name of this Cardset? ')
    cardset_name = arg_data['cardset_name']
    print(f'Searching for cardset: {cardset_name}')
    c_query = await db_get('cardsets', params=[('name', cardset_name)])

    if c_query['count'] == 0:
        print(f'I do not see a cardset named {cardset_name}')
        return
    cardset = c_query['cardsets'][0]

    # Keep season an int on both paths; it used to be a str when passed on the
    # command line and an int when derived from the cardset name.
    if 'season' in arg_data:
        season = int(arg_data['season'])
    else:
        season = int(cardset['name'][:4])

    game_count = int(arg_data['games_played'])
    if game_count < 1 or game_count > 162:
        print('Game count has to be between 1 and 162.')
        return
    season_pct = game_count / 162
    print(f'Cardset ID: {cardset["id"]} / Season: {season}\nGame count: {game_count} / Season %: {season_pct}\n')

    start_time = datetime.datetime.now()
    # Reuse start_time so month and day come from the same instant.
    release_directory = f'{season}-{start_time.month}{start_time.day}'
    input_path = f'data-input/{cardset["name"]} Cardset/'

    print('Reading batting stats...')
    # Minimum plate appearances: 20 in the "vlhp" files, 40 in the "vrhp" files.
    vl_basic = pd.read_csv(f'{input_path}vlhp-basic.csv').query('PA >= 20')
    vr_basic = pd.read_csv(f'{input_path}vrhp-basic.csv').query('PA >= 40')
    total_basic = pd.merge(vl_basic, vr_basic, on="playerId", suffixes=('_vL', '_vR'))

    vl_rate = pd.read_csv(f'{input_path}vlhp-rate.csv').query('PA >= 20')
    vr_rate = pd.read_csv(f'{input_path}vrhp-rate.csv').query('PA >= 40')
    total_rate = pd.merge(vl_rate, vr_rate, on="playerId", suffixes=('_vL', '_vR'))

    all_batting = pd.merge(total_basic, total_rate, on="playerId", suffixes=('', '_rate'))
    del vl_basic, vr_basic, total_basic, vl_rate, vr_rate, total_rate
    print(f'Processed {len(all_batting)} batters\n')

    def get_hand(df_data):
        """Map the trailing marker on ``Name`` to a hand: '*' -> 'L', '#' -> 'S', otherwise 'R'."""
        name = df_data['Name']
        # endswith() is safe on an empty string, unlike indexing [-1].
        if name.endswith('*'):
            return 'L'
        if name.endswith('#'):
            return 'S'
        return 'R'

    print('Pulling PD player IDs...')
    p_query = await db_get('players', params=[('inc_dex', False), ('cardset_id', cardset['id'])])
    if p_query['count'] == 0:
        raise ValueError('No players returned from Paper Dynasty API')
    pd_players = pd.DataFrame(p_query['players']).rename(columns={'bbref_id': 'key_bbref'})

    print('Now pulling mlbam player IDs...')
    # One bulk reverse lookup instead of one call per row. Ids with no match are
    # simply absent from the result, and drop_duplicates keeps the first match
    # per FanGraphs id (the per-row version took row 0 of each lookup).
    ids_and_names = (
        pb.playerid_reverse_lookup(all_batting['playerId'].tolist(), key_type='fangraphs')
        .drop_duplicates(subset='key_fangraphs')
        .reset_index(drop=True)
    )

    # NOTE(review): merge() defaults to an inner join, so every surviving row
    # already matches a player returned by the API. The "create new players"
    # step below can then only fire for rows whose player_id is null in the API
    # data. If unmatched batters are meant to be created, this likely needs
    # how='left' - confirm before changing, because it would trigger POSTs.
    player_data = (ids_and_names
                   .merge(pd_players, left_on='key_bbref', right_on='key_bbref')
                   .query('key_mlbam == key_mlbam')  # NaN != NaN, so this drops rows without an mlbam id
                   .set_index('key_bbref', drop=False))
    print('Matched mlbam to pd players.')

    new_players = []

    def create_players(df_data):
        """Append a new-player payload for the given player_data row to ``new_players``."""
        f_name = sanitize_name(df_data["name_first"]).title()
        l_name = sanitize_name(df_data["name_last"]).title()
        new_players.append({
            'p_name': f'{f_name} {l_name}',
            'cost': 99999,
            'image': f'{CARD_BASE_URL}/{release_directory}/{f_name.lower()}-{l_name.lower()}.png',
            'mlbclub': 'None',
            'franchise': 'None',
            'cardset_id': cardset['id'],
            'set_num': df_data['key_fangraphs'],
            'rarity_id': 99,
            'pos_1': 'DH',
            'description': f'Live {f_name} {l_name}',
            'bbref_id': df_data.name,  # row label is key_bbref (see set_index above)
            'fangr_id': int(float(df_data['key_fangraphs']))
        })

    player_data[player_data['player_id'].isnull()].apply(create_players, axis=1)
    print(f'Creating {len(new_players)} new players...')
    for x in new_players:
        this_player = await db_post('players', payload=x)
        # Write the ids assigned by the API back onto the matching row.
        player_data.at[x['bbref_id'], 'player_id'] = this_player['player_id']
        player_data.at[x['bbref_id'], 'p_name'] = this_player['p_name']

    final_batting = pd.merge(
        player_data, all_batting, left_on='key_fangraphs', right_on='playerId', sort=False
    ).set_index('key_bbref', drop=False)
    del ids_and_names, all_batting, pd_players
    print(f'Player IDs linked to batting stats.\n{len(final_batting)} players remain\n')

    print('Reading baserunning stats...')
    run_data = (pd.read_csv(f'{input_path}running.csv')
                .set_index('Name-additional'))
    run_data['bat_hand'] = run_data.apply(get_hand, axis=1)
    # Index-on-index join: key_bbref on the left, 'Name-additional' on the right.
    offense_stats = final_batting.join(run_data)
    del final_batting, run_data
    print(f'Stats are tallied\n{len(offense_stats)} players remain\n\nCollecting defensive data from bbref...')

    # print(f'Pulling pitcher defense...')
    # df_p = cde.get_bbref_fielding_df('p', season)
    # print(f'Pulling catcher defense...')
    # df_c = cde.get_bbref_fielding_df('c', season)
    # print(f'Pulling first base defense...')
    # df_1b = cde.get_bbref_fielding_df('1b', season)
    # print(f'Pulling second base defense...')
    # df_2b = cde.get_bbref_fielding_df('2b', season)
    # print(f'Pulling third base defense...')
    # df_3b = cde.get_bbref_fielding_df('3b', season)
    # print(f'Pulling short stop defense...')
    # df_ss = cde.get_bbref_fielding_df('ss', season)
    # print(f'Pulling left field defense...')
    # df_lf = cde.get_bbref_fielding_df('lf', season)
    # print(f'Pulling center field defense...')
    # df_cf = cde.get_bbref_fielding_df('cf', season)
    # print(f'Pulling right field defense...')
    # df_rf = cde.get_bbref_fielding_df('rf', season)
    # print(f'Pulling outfield defense...')
    # df_of = cde.get_bbref_fielding_df('of', season)
    print('Positions data is retrieved')

    batting_cards = []

    def create_batting_card(df_data):
        """Append one batting-card payload for the given offense_stats row to ``batting_cards``."""
        s_data = cba.stealing(
            chances=df_data['SBO'],
            sb2s=df_data['SB2'],
            cs2s=df_data['CS2'],
            sb3s=df_data['SB3'],
            cs3s=df_data['CS3'],
            season_pct=season_pct
        )
        batting_cards.append({
            "player_id": df_data['player_id'],
            "key_bbref": df_data.name,
            "key_fangraphs": df_data['key_fangraphs'],
            "key_mlbam": df_data['key_mlbam'],
            "key_retro": df_data['key_retro'],
            "name_first": df_data["name_first"].title(),
            "name_last": df_data["name_last"].title(),
            "steal_low": s_data[0],
            "steal_high": s_data[1],
            "steal_auto": s_data[2],
            "steal_jump": s_data[3],
            "hit_and_run": cba.hit_and_run(
                df_data['AB_vL'], df_data['AB_vR'], df_data['H_vL'], df_data['H_vR'],
                df_data['HR_vL'], df_data['HR_vR'], df_data['SO_vL'], df_data['SO_vR']
            ),
            "running": cba.running(df_data['XBT%']),
            "hand": df_data['bat_hand']
        })

    print('Calculating batting cards...')
    offense_stats.apply(create_batting_card, axis=1)
    print('Cards are complete.\n\nPosting cards now...')
    # NOTE: the PUT below is currently disabled, so nothing is actually posted here.
    # resp = await db_put('battingcards', payload={'cards': batting_cards}, timeout=30)
    # print(f'Response: {resp}\n')

    position_payload = []

    # def create_positions(df_data):
    #     for pos_data in [(df_1b, '1b'), (df_2b, '2b'), (df_3b, '3b'), (df_ss, 'ss')]:
    #         if df_data.name in pos_data[0].index:
    #             logging.debug(f'Running {pos_data[1]} stats for {player_data.at[df_data.name, "p_name"]}')
    #             position_payload.append({
    #                 "player_id": int(player_data.at[df_data.name, 'player_id']),
    #                 "position": pos_data[1].upper(),
    #                 "innings": float(pos_data[0].at[df_data.name, 'Inn_def']),
    #                 "range": cde.get_if_range(
    #                     pos_code=pos_data[1],
    #                     tz_runs=int(pos_data[0].at[df_data.name, 'tz_runs_total']),
    #                     r_dp=0,
    #                     season_pct=season_pct
    #                 ),
    #                 "error": cde.get_any_error(
    #                     pos_code=pos_data[1],
    #                     errors=int(pos_data[0].at[df_data.name, 'E_def']),
    #                     chances=int(pos_data[0].at[df_data.name, 'chances']),
    #                     season_pct=season_pct
    #                 )
    #             })
    #
    #     of_arms = []
    #     of_payloads = []
    #     for pos_data in [(df_lf, 'lf'), (df_cf, 'cf'), (df_rf, 'rf')]:
    #         if df_data.name in pos_data[0].index:
    #             of_payloads.append({
    #                 "player_id": int(player_data.at[df_data.name, 'player_id']),
    #                 "position": pos_data[1].upper(),
    #                 "innings": float(pos_data[0].at[df_data.name, 'Inn_def']),
    #                 "range": cde.get_of_range(
    #                     pos_code=pos_data[1],
    #                     tz_runs=int(pos_data[0].at[df_data.name, 'tz_runs_total']),
    #                     season_pct=season_pct
    #                 )
    #             })
    #             of_arms.append(int(pos_data[0].at[df_data.name, 'bis_runs_outfield']))
    #
    #     if df_data.name in df_of.index and len(of_arms) > 0 and len(of_payloads) > 0:
    #         error_rating = cde.get_any_error(
    #             pos_code=pos_data[1],
    #             errors=int(df_of.at[df_data.name, 'E_def']),
    #             chances=int(df_of.at[df_data.name, 'chances']),
    #             season_pct=season_pct
    #         )
    #         arm_rating = cde.arm_outfield(of_arms)
    #         for f in of_payloads:
    #             f['error'] = error_rating
    #             f['arm'] = arm_rating
    #             position_payload.append(f)
    #
    #     if df_data.name in df_c.index:
    #         if df_c.at[df_data.name, 'SB'] + df_c.at[df_data.name, 'CS'] == 0:
    #             arm_rating = 3
    #         else:
    #             arm_rating = cde.arm_catcher(
    #                 cs_pct=df_c.at[df_data.name, 'caught_stealing_perc'],
    #                 raa=int(df_c.at[df_data.name, 'bis_runs_catcher_sb']),
    #                 season_pct=season_pct
    #             )
    #         position_payload.append({
    #             "player_id": int(player_data.at[df_data.name, 'player_id']),
    #             "position": 'C',
    #             "innings": float(df_c.at[df_data.name, 'Inn_def']),
    #             "range": cde.range_catcher(
    #                 rs_value=int(df_c.at[df_data.name, 'tz_runs_catcher']),
    #                 season_pct=season_pct
    #             ),
    #             "error": cde.get_any_error(
    #                 pos_code='c',
    #                 errors=int(df_c.at[df_data.name, 'E_def']),
    #                 chances=int(df_c.at[df_data.name, 'chances']),
    #                 season_pct=season_pct
    #             ),
    #             "arm": arm_rating,
    #             "pb": cde.pb_catcher(
    #                 pb=int(df_c.at[df_data.name, 'PB']),
    #                 innings=int(float(df_c.at[df_data.name, 'Inn_def'])),
    #                 season_pct=season_pct
    #             ),
    #             "overthrow": cde.ot_catcher(
    #                 errors=int(df_c.at[df_data.name, 'E_def']),
    #                 chances=int(df_c.at[df_data.name, 'chances']),
    #                 season_pct=season_pct
    #             )
    #         })
    #
    # print(f'Calculating fielding lines now...')
    # offense_stats.apply(create_positions, axis=1)
    # print(f'Fielding is complete.\n\nPosting positions now...')
    # resp = await db_put('cardpositions', payload={'positions': position_payload}, timeout=30)
    # print(f'Response: {resp}\n')

    batting_ratings = []

    def create_batting_card_ratings(df_data):
        """Extend ``batting_ratings`` with whatever cba.get_batter_ratings returns for this row."""
        logging.info('Calculating card ratings for %s', df_data.name)
        batting_ratings.extend(cba.get_batter_ratings(df_data))

    print('Calculating card ratings...')
    offense_stats.apply(create_batting_card_ratings, axis=1)
    print('Ratings are complete\n\nPosting ratings now...')
    # NOTE: the PUT below is currently disabled, so the ratings are computed but not posted.
    # resp = await db_put('battingcardratings', payload={'ratings': batting_ratings}, timeout=30)

    # Update player record with positions, rarity, cost
    # Cost only changes if starting cost is 99999 or calculated rarity is different than current

    run_time = datetime.datetime.now() - start_time
    print(f'Total batting cards: {len(batting_cards)}\nNew cardset batters: {len(new_players)}\n'
          f'Program runtime: {round(run_time.total_seconds())} seconds')


if __name__ == '__main__':
    asyncio.run(main(sys.argv[1:]))