"""Build batter scouting CSVs from batting card ratings and card positions.

Run as a script, this pulls ratings and fielding positions through
``db_get``, merges them into one DataFrame indexed by ``player_id``, and
writes ``scouting/batting-basic.csv`` and ``scouting/batting-ratings.csv``.
"""

import asyncio
import copy
import datetime
import multiprocessing
from functools import partial
from typing import Literal, Optional

import pandas as pd

from db_calls import db_get
from exceptions import logger, log_exception


def log_time(
    which: Literal["start", "end"],
    message: str = "",
    print_to_console: bool = True,
    start_time: Optional[datetime.datetime] = None,
) -> Optional[datetime.datetime]:
    """Log the start or the end of a timed step.

    Args:
        which: ``"start"`` begins a timer; any other value is treated as the
            end of one.
        message: Text written to the log and, optionally, to stdout.
        print_to_console: When True, ``message`` is also printed.
        start_time: The value returned by the matching ``"start"`` call.
            Needed to end a timer.

    Returns:
        The current time for ``"start"``; ``None`` otherwise.

    Invalid argument combinations are reported through ``log_exception``
    (presumably it raises -- confirm against the ``exceptions`` module).
    """
    if print_to_console and not message:
        log_exception(
            KeyError, "A message must be included when print_to_console equals True"
        )

    if which == "start":
        logger.info(f"starting timer - {message}")
        if print_to_console:
            print(message)
        return datetime.datetime.now()

    if start_time is None:
        log_exception(
            KeyError, "start_time must be passed to log_time() when which equals 'end'"
        )
        return None

    elapsed = (datetime.datetime.now() - start_time).total_seconds()
    logger.info(f"ending timer - {message}: {elapsed:.2f}s\n")
    if print_to_console:
        print(f"{message}\n")
    return None


def build_series(label: str, code: str, pos_code: str, all_positions) -> pd.Series:
    """Return a Series of ``code`` values for every record at ``pos_code``.

    The Series is keyed by each record's ``["player"]["player_id"]`` and
    named ``"<label> <pos_code>"``.
    """
    logger.info(f"Building {label} series for {pos_code}")
    return pd.Series(
        {
            x["player"]["player_id"]: x[code]
            for x in all_positions
            if x["position"] == pos_code
        },
        name=f"{label} {pos_code}",
    )


def build_ranges(all_positions, pos_code) -> pd.Series:
    """Return the ``range`` values at ``pos_code`` as a ``Range <pos>`` Series."""
    return build_series("Range", "range", pos_code, all_positions)


def build_errors(all_positions, pos_code) -> pd.Series:
    """Return the ``error`` values at ``pos_code`` as an ``Error <pos>`` Series."""
    x = build_series("Error", "error", pos_code, all_positions)
    logger.info(f"error ratings:\n{x}")
    return x


def build_of_arms(all_positions, pos_code) -> pd.Series:
    """Return the ``arm`` values at ``pos_code`` as a Series named ``Arm OF``.

    The name does not include the position, so the per-position results can
    be combined into a single outfield-arm column.
    """
    logger.info(f"Building OF series for {pos_code}")
    return pd.Series(
        {
            x["player"]["player_id"]: x["arm"]
            for x in all_positions
            if x["position"] == pos_code
        },
        name="Arm OF",
    )


def build_c_arms(all_positions, pos_code) -> pd.Series:
    """Return the ``arm`` values at ``pos_code`` as an ``Arm <pos>`` Series."""
    x = build_series("Arm", "arm", pos_code, all_positions)
    logger.info(f"arm ratings:\n{x}")
    return x


def build_c_pb(all_positions, pos_code) -> pd.Series:
    """Return the ``pb`` values at ``pos_code`` as a ``PB <pos>`` Series."""
    return build_series("PB", "pb", pos_code, all_positions)


def build_c_throw(all_positions, pos_code) -> pd.Series:
    """Return the ``overthrow`` values at ``pos_code`` as a ``Throw <pos>`` Series."""
    return build_series("Throw", "overthrow", pos_code, all_positions)


async def fetch_data(data):
    """Await ``db_get`` for one ``(endpoint, params)`` pair and return its result."""
    start_time = log_time("start", print_to_console=False)
    this_query = await db_get(endpoint=data[0], params=data[1], timeout=120)
    log_time("end", print_to_console=False, start_time=start_time)
    return this_query


def _map_positions(builder, position_data, positions) -> list:
    """Run ``builder(position_data, pos)`` for each position in a worker pool."""
    with multiprocessing.Pool(processes=min(8, multiprocessing.cpu_count())) as pool:
        return pool.map(partial(builder, position_data), positions)


async def get_scouting_dfs(cardset_id: Optional[list] = None) -> pd.DataFrame:
    """Fetch ratings and positions and merge them into one batter DataFrame.

    Args:
        cardset_id: Cardset ids sent as repeated ``cardset_id`` query
            parameters. ``None`` or an empty list sends none.

    Returns:
        The vs-left and vs-right ratings merged on ``player_id`` (columns
        present on both sides get ``_vl`` / ``_vr`` suffixes), indexed by
        ``player_id``, with the fielding Series joined on as columns.
    """
    # The signature defaults to None; iterating None would raise TypeError.
    cardset_params = [("cardset_id", x) for x in (cardset_id or [])]
    # NOTE(review): hard-coded request values; `ts` looks like it may be a
    # token -- confirm it is safe to keep in source.
    ratings_params = [
        ("team_id", 31),
        ("ts", "s37136685556r6135248705"),
        *cardset_params,
    ]
    api_calls = [
        ("battingcardratings", [("vs_hand", "vL"), *ratings_params]),
        ("battingcardratings", [("vs_hand", "vR"), *ratings_params]),
        ("cardpositions", cardset_params),
    ]

    start_time = log_time(
        "start", message="Pulling all batting card ratings and positions"
    )
    api_data = await asyncio.gather(*(fetch_data(call) for call in api_calls))
    log_time(
        "end",
        f"Pulled {api_data[0]['count'] + api_data[1]['count']} batting card ratings and {api_data[2]['count']} positions",
        start_time=start_time,
    )

    start_time = log_time("start", message="Building base dataframes")
    vl_vals = api_data[0]["ratings"]
    for x in vl_vals:
        # Flatten the card's fields into the rating row; keys present in both
        # are overwritten by the card's values.
        x.update(x["battingcard"])
        player = x["battingcard"]["player"]
        x["player_id"] = player["player_id"]
        x["player_name"] = player["p_name"]
        x["rarity"] = player["rarity"]["name"]
        x["cardset_id"] = player["cardset"]["id"]
        x["cardset_name"] = player["cardset"]["name"]
        del x["battingcard"]
        del x["player"]

    vr_vals = api_data[1]["ratings"]
    for x in vr_vals:
        # Only the join key is kept from the card on the vs-right side.
        x["player_id"] = x["battingcard"]["player"]["player_id"]
        del x["battingcard"]

    vl = pd.DataFrame(vl_vals)
    vr = pd.DataFrame(vr_vals)
    log_time("end", "Base dataframes are complete", start_time=start_time)

    start_time = log_time("start", message="Building combined dataframe")
    bat_df = pd.merge(vl, vr, on="player_id", suffixes=("_vl", "_vr")).set_index(
        "player_id", drop=False
    )
    log_time("end", "Combined dataframe is complete", start_time=start_time)

    position_data = api_data[2]["positions"]
    positions = ["P", "C", "1B", "2B", "3B", "SS", "LF", "CF", "RF"]
    series_list = []

    start_time = log_time("start", message="Building range series")
    ranges = _map_positions(build_ranges, position_data, positions)
    series_list.extend(ranges)
    log_time("end", f"Processed {len(ranges)} position ranges", start_time=start_time)

    start_time = log_time("start", message="Building error series")
    errors = _map_positions(build_errors, position_data, positions)
    series_list.extend(errors)
    log_time("end", f"Processed {len(errors)} position errors", start_time=start_time)

    start_time = log_time("start", message="Building OF arm series")
    lf_arms = build_of_arms(position_data, "LF")
    cf_arms = build_of_arms(position_data, "CF")
    rf_arms = build_of_arms(position_data, "RF")
    # NOTE(review): with fill_value=0 and max, a negative arm for a player
    # missing from one of the three Series is replaced by 0 -- confirm this
    # is intended before changing it.
    combined_series = lf_arms.combine(cf_arms, max, fill_value=0)
    combined_series = combined_series.combine(rf_arms, max, fill_value=0)
    series_list.append(combined_series)
    log_time("end", f"Processed {len(combined_series)} OF arms", start_time=start_time)

    start_time = log_time("start", message="Building C arm series")
    c_arms = build_c_arms(position_data, "C")
    series_list.append(c_arms)
    log_time("end", f"Processed {len(c_arms)} catcher arms", start_time=start_time)

    start_time = log_time("start", message="Building C PB series")
    passed_ball = _map_positions(build_c_pb, position_data, ["C"])
    series_list.extend(passed_ball)
    log_time("end", f"Processed {len(passed_ball)} C PB series", start_time=start_time)

    start_time = log_time("start", message="Building C OT series")
    overthrows = _map_positions(build_c_throw, position_data, ["C"])
    series_list.extend(overthrows)
    log_time("end", f"Processed {len(overthrows)} C OT series", start_time=start_time)

    logger.info(f"series_list: {series_list}")
    return bat_df.join(series_list)


# Range columns that feed the Reaction score ("Range P" is not included).
_REACTION_RANGE_COLUMNS = (
    "Range C",
    "Range 1B",
    "Range 2B",
    "Range 3B",
    "Range SS",
    "Range LF",
    "Range CF",
    "Range RF",
)

# (position, multiplier) pairs for the infield branch of the Arm score.
_IF_RANGE_WEIGHTS = (("3B", 5), ("SS", 4), ("2B", 3), ("1B", 1))

# (column, multiplier) pairs subtracted from 100 for the Fielding score.
_IF_ERROR_WEIGHTS = (
    ("Error 3B", 2),
    ("Error SS", 0.75),
    ("Error 2B", 1.25),
    ("Error 1B", 2),
)
_OF_ERROR_WEIGHTS = (("Error LF", 2), ("Error CF", 0.75), ("Error RF", 1.25))


def _percentile_rank(raw: pd.Series) -> pd.Series:
    """Return the percentile rank of each value, scaled to 0-100 and rounded."""
    return round(raw.rank(pct=True) * 100)


def _write_csv(frame: pd.DataFrame, path: str) -> None:
    """Write ``frame`` to ``path`` as UTF-8 CSV without the index column."""
    # Passing the path lets pandas manage encoding and line endings instead
    # of relying on the platform defaults of a text-mode file handle.
    frame.to_csv(path, index=False, encoding="utf-8")


def _raw_speed(row):
    """Raw speed: running / 20 plus steal jump, +0.5 when steal_auto is truthy."""
    speed_raw = row["running"] / 20 + row["steal_jump"]
    if row["steal_auto"]:
        speed_raw += 0.5
    return speed_raw


def _raw_steal(row):
    """Raw steal: (steal_high / 20 + steal_low / 20) scaled by steal_jump."""
    return ((row["steal_high"] / 20) + (row["steal_low"] / 20)) * row["steal_jump"]


def _raw_reaction(row):
    """Raw reaction: sum of 10 ** (5 - range) over every non-null range column."""
    raw_total = 0
    for column in _REACTION_RANGE_COLUMNS:
        pos_range = row[column]
        if pd.notna(pos_range):
            raw_total += 10 ** (5 - pos_range)
    return raw_total


def _raw_arm(row):
    """Raw arm score; catcher takes priority, then outfield, then infield.

    Returns 1 when the row has no catcher arm and no outfield or infield
    range at all.
    """
    if pd.notna(row["Arm C"]):
        if row["Arm C"] == -5:
            return 100
        catcher_score = (
            20
            + ((10 - row["Arm C"]) * 3)
            + (20 - row["PB C"])
            + (20 - row["Throw C"])
            - row["Error C"]
        )
        return min(100, catcher_score)

    # The first outfield spot with a range, checked RF, CF, LF, picks the formula.
    of_pos = next(
        (pos for pos in ("RF", "CF", "LF") if pd.notna(row[f"Range {pos}"])), None
    )
    if of_pos is not None:
        arm = row["Arm OF"]
        of_raw = arm * -10 if arm < 0 else 5 - arm
        if of_pos == "RF":
            of_raw = of_raw * 1.5
            of_raw += (6 - row["Range RF"]) * 4
        elif of_pos == "CF":
            of_raw += (6 - row["Range CF"]) * 3
        else:
            of_raw = of_raw / 2
            of_raw += (6 - row["Range LF"]) * 2
        return of_raw

    infield_ranges = [
        (row[f"Range {pos}"], weight) for pos, weight in _IF_RANGE_WEIGHTS
    ]
    if any(pd.notna(pos_range) for pos_range, _ in infield_ranges):
        range_totals = 0
        for pos_range, weight in infield_ranges:
            if pd.notna(pos_range):
                range_totals += (6 - pos_range) * weight
        return 100 - (50 - range_totals)

    return 1


def _group_error_score(row, weights):
    """Return 100 minus weighted errors (floor 1), or None if all are null."""
    penalties = [
        row[column] * weight for column, weight in weights if pd.notna(row[column])
    ]
    if not penalties:
        return None
    score = 100
    for penalty in penalties:
        score -= penalty
    return max(1, score)


def _raw_fielding(row):
    """Raw fielding: mean of the infield, outfield and catcher error scores.

    Only groups with at least one non-null error value are averaged; a row
    with none yields 0.
    """
    scores = [
        _group_error_score(row, _IF_ERROR_WEIGHTS),
        _group_error_score(row, _OF_ERROR_WEIGHTS),
    ]
    if pd.notna(row["Error C"]):
        scores.append(
            max(100 - (row["Error C"] * 5) - row["Throw C"] - row["PB C"], 1)
        )
    present = [score for score in scores if score is not None]
    return sum(present) / max(len(present), 1)


def _raw_vision(row):
    """Raw vision: 5 x weighted (OBP - AVG) minus weighted strikeouts / 208.

    Each weighting is 0.67 for the vs-right column and 0.33 for the vs-left
    column.
    """
    return (
        (
            ((row["obp_vr"] * 0.67) + (row["obp_vl"] * 0.33))
            - ((row["avg_vr"] * 0.67) + (row["avg_vl"] * 0.33))
        )
        * 5
    ) - (((row["strikeout_vl"] * 0.33) + (row["strikeout_vr"] * 0.67)) / 208)


def _raw_rating(row):
    """Raw overall rating: defense x2, plus running, plus weighted offense x6."""
    return (
        ((row["Reaction"] + row["Arm"] + row["Fielding"]) * 2)
        + (row["Speed"] + row["Steal"])
        + (
            (
                ((row["Contact R"] + row["Power R"]) * 0.67)
                + ((row["Contact L"] + row["Power L"]) * 0.33)
                + row["Vision"]
            )
            * 6
        )
    )


async def post_calc_basic(batting_dfs: pd.DataFrame):
    """Add 0-100 percentile columns and write ``scouting/batting-basic.csv``.

    The percentile columns are added to ``batting_dfs`` in place, so callers
    that need the original frame should pass a copy.
    """

    def rank_rows(label: str, column: str, row_func) -> None:
        # Percentile-rank a score computed row by row.
        start_time = log_time("start", f"Beginning {label} calcs")
        batting_dfs[column] = _percentile_rank(batting_dfs.apply(row_func, axis=1))
        log_time("end", f"Done {label} calcs", start_time=start_time)

    def rank_column(label: str, source: str, column: str) -> None:
        # Percentile-rank an existing column as-is.
        start_time = log_time("start", f"Beginning {label} calcs")
        batting_dfs[column] = _percentile_rank(batting_dfs[source])
        log_time("end", f"Done {label} calcs", start_time=start_time)

    rank_rows("Speed", "Speed", _raw_speed)
    rank_rows("Stealing", "Steal", _raw_steal)
    rank_rows("Reaction", "Reaction", _raw_reaction)
    rank_rows("Arm", "Arm", _raw_arm)
    rank_rows("Fielding", "Fielding", _raw_fielding)

    rank_column("AVG vL", "avg_vl", "Contact L")
    rank_column("AVG vR", "avg_vr", "Contact R")
    rank_column("PWR vL", "slg_vl", "Power L")
    rank_column("PWR vR", "slg_vr", "Power R")

    rank_rows("Vision", "Vision", _raw_vision)
    # Rating reads the percentile columns above, so it must run last.
    rank_rows("Rating", "Rating", _raw_rating)

    start_time = log_time("start", "Beginning write to file")
    output = batting_dfs[
        [
            "player_id",
            "player_name",
            "Rating",
            "Contact R",
            "Contact L",
            "Power R",
            "Power L",
            "Vision",
            "Speed",
            "Steal",
            "Reaction",
            "Arm",
            "Fielding",
            "hand",
            "cardset_name",
        ]
    ]
    _write_csv(output, "scouting/batting-basic.csv")
    log_time("end", "Done writing to file", start_time=start_time)


async def post_calc_ratings(batting_dfs: pd.DataFrame):
    """Reorder the columns and write ``scouting/batting-ratings.csv``.

    The identifying columns come first; ``id_vl``, ``id_vr``, ``vs_hand_vl``
    and ``vs_hand_vr`` are dropped; every other column keeps its order.
    """
    start_time = log_time("start", "Beginning Ratings filtering")
    first = ["player_id", "player_name", "cardset_name", "rarity", "hand", "variant"]
    exclude = {*first, "id_vl", "id_vr", "vs_hand_vl", "vs_hand_vr"}
    output = batting_dfs[
        first + [col for col in batting_dfs.columns if col not in exclude]
    ]
    log_time("end", "Done filtering ratings", start_time=start_time)

    start_time = log_time("start", "Beginning write to file")
    _write_csv(output, "scouting/batting-ratings.csv")
    log_time("end", "Done writing to file", start_time=start_time)


async def main():
    """Pull the scouting data and produce both batter CSV reports."""
    start_time = log_time("start", "Pulling scouting data")
    overall_start_time = start_time

    batting_dfs = await get_scouting_dfs([])
    # Report the row count; interpolating the frame itself printed the whole
    # DataFrame.
    print(f"Received {len(batting_dfs)} rows")
    log_time("end", "Pulled scouting data", start_time=start_time)

    # Each report gets its own copy because post_calc_basic adds columns in place.
    start_time = log_time("start", "Beginning basic scouting")
    await post_calc_basic(copy.deepcopy(batting_dfs))
    log_time("end", "Completed basic scouting", start_time=start_time)

    start_time = log_time("start", "Beginning ratings guide")
    await post_calc_ratings(copy.deepcopy(batting_dfs))
    log_time("end", "Completed ratings guide", start_time=start_time)

    log_time(
        "end",
        "Total batter scouting",
        print_to_console=False,
        start_time=overall_start_time,
    )
    print("All done with batters!")


if __name__ == "__main__":
    asyncio.run(main())