paper-dynasty-card-creation/scouting_batters.py
Cal Corum 43aff3568f fix: increase API timeouts to prevent bulk query failures
db_calls.py default timeouts raised from 3s to 30s across all methods
(db_get, url_get, db_patch, db_post, db_put). scouting_batters.py
fetch_data now passes timeout=120 for large card rating queries.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 07:53:58 -05:00

529 lines
18 KiB
Python

import asyncio
import copy
import datetime
from functools import partial
import multiprocessing
from db_calls import db_get
from exceptions import logger, log_exception
from typing import Literal
import pandas as pd
def log_time(
    which: Literal["start", "end"],
    message: str = "",
    print_to_console: bool = True,
    start_time: datetime.datetime = None,
):
    """Log the start or end of a timed step.

    Args:
        which: "start" begins a timer; any other value is treated as the end
            of one.
        message: Text written to the log and, optionally, the console.
        print_to_console: When True the message is also printed. An empty
            message combined with True is reported through log_exception.
        start_time: Timestamp returned by the matching "start" call; needed
            to compute the elapsed seconds when ending a timer.

    Returns:
        The current datetime for "start"; None otherwise.
    """
    # NOTE(review): log_exception presumably raises the given exception type;
    # if it only logs, execution simply continues past these calls.
    if print_to_console and not len(message):
        log_exception(
            KeyError, "A message must be included when print_to_console equals True"
        )

    if which == "start":
        logger.info(f"starting timer - {message}")
        if print_to_console:
            print(message)
        return datetime.datetime.now()

    if start_time is None:
        log_exception(
            KeyError, "start_time must be passed to log_time() when which equals 'end'"
        )
        return None

    elapsed = (datetime.datetime.now() - start_time).total_seconds()
    logger.info(f"ending timer - {message}: {elapsed:.2f}s\n")
    if print_to_console:
        print(f"{message}\n")
    return None
def build_series(label: str, code: str, pos_code: str, all_positions):
    """Build a player_id-indexed Series of one rating for one position.

    Args:
        label: Human-readable prefix for the Series name (e.g. "Range").
        code: Key to read from each position record.
        pos_code: Only records whose "position" equals this value are used.
        all_positions: Iterable of position records; each must provide
            "position", the `code` key, and a nested "player"/"player_id".

    Returns:
        A pandas Series named "<label> <pos_code>" keyed by player_id.
    """
    logger.info(f"Building {label} series for {pos_code}")
    values_by_player = {
        record["player"]["player_id"]: record[code]
        for record in all_positions
        if record["position"] == pos_code
    }
    return pd.Series(values_by_player, name=f"{label} {pos_code}")
def build_ranges(all_positions, pos_code):
    """Return the "Range <pos_code>" Series read from each record's "range" key."""
    return build_series(
        label="Range", code="range", pos_code=pos_code, all_positions=all_positions
    )
def build_errors(all_positions, pos_code):
    """Return the "Error <pos_code>" Series and log its contents."""
    error_series = build_series(
        label="Error", code="error", pos_code=pos_code, all_positions=all_positions
    )
    logger.info(f"error ratings:\n{error_series}")
    return error_series
def build_of_arms(all_positions, pos_code):
    """Build a player_id-indexed Series of "arm" values for one outfield spot.

    The Series is always named "Arm OF" regardless of `pos_code`, so the
    per-position results share one column name when they are combined.
    """
    logger.info(f"Building OF series for {pos_code}")
    arm_by_player = {
        record["player"]["player_id"]: record["arm"]
        for record in all_positions
        if record["position"] == pos_code
    }
    return pd.Series(arm_by_player, name="Arm OF")
def build_c_arms(all_positions, pos_code):
    """Return the "Arm <pos_code>" Series and log its contents."""
    arm_series = build_series(
        label="Arm", code="arm", pos_code=pos_code, all_positions=all_positions
    )
    logger.info(f"arm ratings:\n{arm_series}")
    return arm_series
def build_c_pb(all_positions, pos_code):
    """Return the "PB <pos_code>" Series read from each record's "pb" key."""
    return build_series(
        label="PB", code="pb", pos_code=pos_code, all_positions=all_positions
    )
def build_c_throw(all_positions, pos_code):
    """Return the "Throw <pos_code>" Series read from each record's "overthrow" key."""
    return build_series(
        label="Throw", code="overthrow", pos_code=pos_code, all_positions=all_positions
    )
async def fetch_data(data):
    """Run one timed db_get call.

    Args:
        data: Indexable pair whose first item is the endpoint and whose
            second item is the query parameters.

    Returns:
        Whatever db_get returns for the request.
    """
    started = log_time("start", print_to_console=False)
    endpoint, params = data[0], data[1]
    # 120 s timeout: these bulk rating queries are large.
    response = await db_get(endpoint=endpoint, params=params, timeout=120)
    log_time("end", print_to_console=False, start_time=started)
    return response
def _map_positions(builder, position_data, pos_codes):
    """Run builder(position_data, pos_code) for each code in a worker pool."""
    with multiprocessing.Pool(processes=min(8, multiprocessing.cpu_count())) as pool:
        return pool.map(partial(builder, position_data), pos_codes)


def _best_of_arm(arms_by_position):
    """Collapse per-position OF arm Series into a single "Arm OF" Series.

    Takes the maximum of the values a player actually has. Missing positions
    are skipped rather than filled with 0, because a 0 fill would overwrite
    every negative arm for a player who does not appear at all three spots.
    """
    merged = pd.concat(arms_by_position, axis=1).astype(float)
    return merged.max(axis=1).rename("Arm OF")


async def get_scouting_dfs(cardset_id: list = None) -> pd.DataFrame:
    """Fetch batting ratings and positions and assemble one scouting frame.

    Three requests are issued concurrently: batting card ratings vs. left,
    batting card ratings vs. right, and card positions. The two rating sets
    are merged on player_id (overlapping columns get `_vl` / `_vr` suffixes)
    and the per-position fielding Series are joined onto the result.

    Args:
        cardset_id: Cardset ids to filter by. None or an empty list sends no
            cardset filter.

    Returns:
        DataFrame indexed by player_id (the column is kept as well) holding
        the merged rating columns plus "Range <pos>", "Error <pos>",
        "Arm OF", "Arm C", "PB C" and "Throw C".
    """
    # The declared default is None, which cannot be iterated directly.
    cardset_params = [("cardset_id", x) for x in (cardset_id or [])]
    # NOTE(review): team id and ts token are hard-coded request parameters.
    ratings_params = [
        ("team_id", 31),
        ("ts", "s37136685556r6135248705"),
        *cardset_params,
    ]
    api_calls = [
        ("battingcardratings", [("vs_hand", "vL"), *ratings_params]),
        ("battingcardratings", [("vs_hand", "vR"), *ratings_params]),
        ("cardpositions", cardset_params),
    ]

    start_time = log_time(
        "start", message="Pulling all batting card ratings and positions"
    )
    api_data = await asyncio.gather(*(fetch_data(call) for call in api_calls))
    log_time(
        "end",
        f"Pulled {api_data[0]['count'] + api_data[1]['count']} batting card ratings and {api_data[2]['count']} positions",
        start_time=start_time,
    )

    start_time = log_time("start", message="Building base dataframes")
    vl_vals = api_data[0]["ratings"]
    for rating in vl_vals:
        # Hoist the batting card's own fields and a few player attributes to
        # the top level so each rating becomes one flat row.
        rating.update(rating["battingcard"])
        player = rating["battingcard"]["player"]
        rating["player_id"] = player["player_id"]
        rating["player_name"] = player["p_name"]
        rating["rarity"] = player["rarity"]["name"]
        rating["cardset_id"] = player["cardset"]["id"]
        rating["cardset_name"] = player["cardset"]["name"]
        del rating["battingcard"]
        del rating["player"]

    vr_vals = api_data[1]["ratings"]
    for rating in vr_vals:
        # Only the join key is needed here; card fields come from the vL rows.
        rating["player_id"] = rating["battingcard"]["player"]["player_id"]
        del rating["battingcard"]

    vl = pd.DataFrame(vl_vals)
    vr = pd.DataFrame(vr_vals)
    log_time("end", "Base dataframes are complete", start_time=start_time)

    start_time = log_time("start", message="Building combined dataframe")
    bat_df = pd.merge(vl, vr, on="player_id", suffixes=("_vl", "_vr")).set_index(
        "player_id", drop=False
    )
    log_time("end", "Combined dataframe is complete", start_time=start_time)

    position_data = api_data[2]["positions"]
    positions = ["P", "C", "1B", "2B", "3B", "SS", "LF", "CF", "RF"]
    series_list = []

    start_time = log_time("start", message="Building range series")
    ranges = _map_positions(build_ranges, position_data, positions)
    series_list.extend(ranges)
    log_time("end", f"Processed {len(ranges)} position ranges", start_time=start_time)

    start_time = log_time("start", message="Building error series")
    errors = _map_positions(build_errors, position_data, positions)
    series_list.extend(errors)
    log_time("end", f"Processed {len(errors)} position errors", start_time=start_time)

    start_time = log_time("start", message="Building OF arm series")
    combined_series = _best_of_arm(
        {pos: build_of_arms(position_data, pos) for pos in ("LF", "CF", "RF")}
    )
    series_list.append(combined_series)
    log_time("end", f"Processed {len(combined_series)} OF arms", start_time=start_time)

    start_time = log_time("start", message="Building C arm series")
    c_arms = build_c_arms(position_data, "C")
    series_list.append(c_arms)
    log_time("end", f"Processed {len(c_arms)} catcher arms", start_time=start_time)

    start_time = log_time("start", message="Building C PB series")
    passed_ball = _map_positions(build_c_pb, position_data, ["C"])
    series_list.extend(passed_ball)
    log_time("end", f"Processed {len(passed_ball)} C PB series", start_time=start_time)

    start_time = log_time("start", message="Building C OT series")
    overthrows = _map_positions(build_c_throw, position_data, ["C"])
    series_list.extend(overthrows)
    log_time("end", f"Processed {len(overthrows)} C OT series", start_time=start_time)

    logger.info(f"series_list: {series_list}")
    # Each Series is indexed by player_id, matching bat_df's index.
    return bat_df.join(series_list)
# Range columns that feed the Reaction score (pitcher range is not included).
_REACTION_RANGE_COLUMNS = (
    "Range C",
    "Range 1B",
    "Range 2B",
    "Range 3B",
    "Range SS",
    "Range LF",
    "Range CF",
    "Range RF",
)

# (column, points deducted per error rating point), applied in this order.
_IF_ERROR_WEIGHTS = (
    ("Error 3B", 2),
    ("Error SS", 0.75),
    ("Error 2B", 1.25),
    ("Error 1B", 2),
)
_OF_ERROR_WEIGHTS = (
    ("Error LF", 2),
    ("Error CF", 0.75),
    ("Error RF", 1.25),
)

_BASIC_OUTPUT_COLUMNS = [
    "player_id",
    "player_name",
    "Rating",
    "Contact R",
    "Contact L",
    "Power R",
    "Power L",
    "Vision",
    "Speed",
    "Steal",
    "Reaction",
    "Arm",
    "Fielding",
    "hand",
    "cardset_name",
]


def _percentile(raw_series):
    """Convert raw scores into rounded 0-100 percentile ranks."""
    return round(raw_series.rank(pct=True) * 100)


def _raw_speed(row):
    """Running plus steal jump, with a 0.5 bonus when steal_auto is truthy."""
    speed = row["running"] / 20 + row["steal_jump"]
    if row["steal_auto"]:
        speed += 0.5
    return speed


def _raw_steal(row):
    """Scaled steal_high + steal_low, multiplied by steal_jump."""
    return ((row["steal_high"] / 20) + (row["steal_low"] / 20)) * row["steal_jump"]


def _raw_reaction(row):
    """Sum 10 ** (5 - range) over every position the player has a range for."""
    total = 0
    for column in _REACTION_RANGE_COLUMNS:
        pos_range = row[column]
        if pd.notna(pos_range):
            total += 10 ** (5 - pos_range)
    return total


def _catcher_arm_score(row):
    """Catcher arm score, or None when the row has no "Arm C" value."""
    if pd.isna(row["Arm C"]):
        return None
    if row["Arm C"] == -5:
        return 100
    score = (
        20
        + ((10 - row["Arm C"]) * 3)
        + (20 - row["PB C"])
        + (20 - row["Throw C"])
        - row["Error C"]
    )
    return min(100, score)


def _outfield_arm_score(row):
    """Outfield arm score from the first rated spot in RF, CF, LF order.

    Returns None when the player has no range at any outfield position.
    """
    if pd.notna(row["Range RF"]):
        of_pos = "RF"
    elif pd.notna(row["Range CF"]):
        of_pos = "CF"
    elif pd.notna(row["Range LF"]):
        of_pos = "LF"
    else:
        return None

    # Negative arms are scaled up by 10; non-negative arms score 5 - arm.
    if row["Arm OF"] < 0:
        score = row["Arm OF"] * -10
    else:
        score = 5 - row["Arm OF"]

    if of_pos == "RF":
        score = score * 1.5
        score += (6 - row["Range RF"]) * 4
    elif of_pos == "CF":
        score += (6 - row["Range CF"]) * 3
    else:
        score = score / 2
        score += (6 - row["Range LF"]) * 2
    return score


def _infield_arm_score(row):
    """Infield arm proxy built from infield ranges, or None with no IF range."""
    weighted_ranges = (
        ("Range 3B", 5),
        ("Range SS", 4),
        ("Range 2B", 3),
        ("Range 1B", 1),
    )
    if not any(pd.notna(row[column]) for column, _ in weighted_ranges):
        return None
    range_totals = 0
    for column, weight in weighted_ranges:
        if pd.notna(row[column]):
            range_totals += (6 - row[column]) * weight
    return 100 - (50 - range_totals)


def _raw_arm(row):
    """Pick the arm score by priority: catcher, then outfield, then infield.

    Rows with none of the three fall back to 1.
    """
    for scorer in (_catcher_arm_score, _outfield_arm_score, _infield_arm_score):
        score = scorer(row)
        if score is not None:
            return score
    return 1


def _group_error_score(row, weights):
    """100 minus weighted error ratings, floored at 1; None if nothing rated."""
    score = 100
    rated = False
    for column, weight in weights:
        if pd.notna(row[column]):
            score -= row[column] * weight
            rated = True
    return max(1, score) if rated else None


def _raw_fielding(row):
    """Average the infield, outfield and catcher error scores that apply."""
    if_error = _group_error_score(row, _IF_ERROR_WEIGHTS)
    of_error = _group_error_score(row, _OF_ERROR_WEIGHTS)
    c_error = None
    if pd.notna(row["Error C"]):
        c_error = max(100 - (row["Error C"] * 5) - row["Throw C"] - row["PB C"], 1)
    rated = [score for score in (if_error, of_error, c_error) if score is not None]
    # max(..., 1) keeps players with no fielding data at 0 instead of dividing by 0.
    return sum(rated) / max(len(rated), 1)


def _raw_vision(row):
    """Weighted (OBP - AVG) gap times 5, minus weighted strikeouts / 208."""
    return (
        (
            ((row["obp_vr"] * 0.67) + (row["obp_vl"] * 0.33))
            - ((row["avg_vr"] * 0.67) + (row["avg_vl"] * 0.33))
        )
        * 5
    ) - (((row["strikeout_vl"] * 0.33) + (row["strikeout_vr"] * 0.67)) / 208)


def _raw_rating(row):
    """Blend the percentile columns: defense x2, running x1, hitting x6."""
    return (
        ((row["Reaction"] + row["Arm"] + row["Fielding"]) * 2)
        + (row["Speed"] + row["Steal"])
        + (
            (
                ((row["Contact R"] + row["Power R"]) * 0.67)
                + ((row["Contact L"] + row["Power L"]) * 0.33)
                + row["Vision"]
            )
            * 6
        )
    )


async def post_calc_basic(batting_dfs: pd.DataFrame) -> None:
    """Add percentile scouting columns and write scouting/batting-basic.csv.

    Each score is computed as a raw value per row (or read from an existing
    column), ranked as a percentile across all rows, scaled to 0-100 and
    rounded. The frame is modified in place.

    Args:
        batting_dfs: Frame holding the merged vL/vR rating columns and the
            per-position Range / Error / Arm / PB / Throw columns.
    """
    # (log label, output column, row scorer or existing source column).
    # Order matters: "Rating" reads columns produced by the earlier steps.
    steps = (
        ("Speed", "Speed", _raw_speed),
        ("Stealing", "Steal", _raw_steal),
        ("Reaction", "Reaction", _raw_reaction),
        ("Arm", "Arm", _raw_arm),
        ("Fielding", "Fielding", _raw_fielding),
        ("AVG vL", "Contact L", "avg_vl"),
        ("AVG vR", "Contact R", "avg_vr"),
        ("PWR vL", "Power L", "slg_vl"),
        ("PWR vR", "Power R", "slg_vr"),
        ("Vision", "Vision", _raw_vision),
        ("Rating", "Rating", _raw_rating),
    )
    for label, column, source in steps:
        start_time = log_time("start", f"Beginning {label} calcs")
        if callable(source):
            raw_series = batting_dfs.apply(source, axis=1)
        else:
            raw_series = batting_dfs[source]
        batting_dfs[column] = _percentile(raw_series)
        log_time("end", f"Done {label} calcs", start_time=start_time)

    start_time = log_time("start", "Beginning write to file")
    # Let pandas own the file handle: pushing the to_csv() string through a
    # text-mode file doubles the carriage returns where os.linesep is "\r\n".
    batting_dfs[_BASIC_OUTPUT_COLUMNS].to_csv(
        "scouting/batting-basic.csv", index=False
    )
    log_time("end", "Done writing to file", start_time=start_time)
async def post_calc_ratings(batting_dfs: pd.DataFrame) -> None:
    """Write the full ratings guide to scouting/batting-ratings.csv.

    Identifying columns are moved to the front, the per-split id and vs_hand
    columns are dropped, and all remaining columns keep their existing order.

    Args:
        batting_dfs: Merged batting frame; must contain every column named in
            `leading` below.
    """
    start_time = log_time("start", "Beginning Ratings filtering")
    leading = ["player_id", "player_name", "cardset_name", "rarity", "hand", "variant"]
    excluded = {*leading, "id_vl", "id_vr", "vs_hand_vl", "vs_hand_vr"}
    remaining = [col for col in batting_dfs.columns if col not in excluded]
    output = batting_dfs[leading + remaining]
    log_time("end", "Done filtering ratings", start_time=start_time)

    start_time = log_time("start", "Beginning write to file")
    # Let pandas own the file handle: pushing the to_csv() string through a
    # text-mode file doubles the carriage returns where os.linesep is "\r\n".
    output.to_csv("scouting/batting-ratings.csv", index=False)
    log_time("end", "Done writing to file", start_time=start_time)
async def main():
    """Pull the batting scouting data and produce both CSV reports.

    Each report receives its own deep copy of the frame, so the columns that
    post_calc_basic adds do not appear in the ratings guide.
    """
    start_time = log_time("start", "Pulling scouting data")
    overall_start_time = start_time

    # An empty list sends no cardset filter with the requests.
    batting_dfs = await get_scouting_dfs([])
    # Report the row count; interpolating the frame itself dumps the whole table.
    print(f"Received {len(batting_dfs)} rows")
    log_time("end", "Pulled scouting data", start_time=start_time)

    start_time = log_time("start", "Beginning basic scouting")
    await post_calc_basic(copy.deepcopy(batting_dfs))
    log_time("end", "Completed basic scouting", start_time=start_time)

    start_time = log_time("start", "Beginning ratings guide")
    await post_calc_ratings(copy.deepcopy(batting_dfs))
    log_time("end", "Completed ratings guide", start_time=start_time)

    log_time(
        "end",
        "Total batter scouting",
        print_to_console=False,
        start_time=overall_start_time,
    )
    print("All done with batters!")
if __name__ == "__main__":
    # Keep the entry point guarded: this module uses multiprocessing.Pool, and
    # under the "spawn" start method worker processes re-import the module.
    asyncio.run(main())