paper-dynasty-card-creation/pitchers/creation.py
Cal Corum 1d96223c78 fix: use player_id instead of key_bbref in create_pit_position() (#7)
Closes #7

The fallback branch of create_pit_position() used `int(df_data["key_bbref"])`
which always raises ValueError for string IDs like 'verlaju01'. The exception
was silently swallowed, causing pitchers without defensive stats to receive no
position record at all.

Fix: use `int(float(df_data["player_id"]))` to match the pattern used in
create_pitching_card() on the same file.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-20 23:03:43 -05:00

635 lines
24 KiB
Python

import datetime
import urllib.parse
from typing import Any, Dict

import pandas as pd

from creation_helpers import (
    get_all_pybaseball_ids,
    sanitize_name,
    CLUB_LIST,
    FRANCHISE_LIST,
    pd_players_df,
    mlbteam_and_franchise,
    NEW_PLAYER_COST,
    RARITY_BASE_COSTS,
    should_update_player_description,
    calculate_rarity_cost_adjustment,
    DEFAULT_STARTER_OPS,
    DEFAULT_RELIEVER_OPS,
)
from db_calls import db_post, db_get, db_put, db_patch
from defenders import calcs_defense as cde
from . import calcs_pitcher as cpi
from exceptions import logger
from rarity_thresholds import get_pitcher_thresholds
def get_pitching_stats(
    file_path: str = None,
    start_date: datetime.datetime = None,
    end_date: datetime.datetime = None,
    ignore_limits: bool = False,
):
    """Load the vs-LHH and vs-RHH pitching split CSVs and combine them.

    Four files are read by appending ``vlhh-basic.csv``, ``vrhh-basic.csv``,
    ``vlhh-rate.csv`` and ``vrhh-rate.csv`` directly onto ``file_path``, so
    ``file_path`` must already end with a path separator (or be a filename
    prefix). Each file is filtered on its ``TBF`` column before merging.

    Args:
        file_path: Prefix of the split CSV files. Required.
        start_date: Unused; reserved for the unimplemented date-based pull.
        end_date: Unused; reserved for the unimplemented date-based pull.
        ignore_limits: When True the minimum ``TBF`` is 1 for both splits;
            otherwise it is 20 for the vs-L files and 40 for the vs-R files.

    Returns:
        One DataFrame, inner-joined on ``playerId``. Columns from the basic
        files carry ``_vL`` / ``_vR`` suffixes; columns from the rate files
        that collide with those additionally carry a ``_rate`` suffix.

    Raises:
        LookupError: If ``file_path`` is None.
    """
    print("Reading pitching stats...")

    if file_path is None:
        # Only CSV input is supported. A FanGraphs splits-leaderboard scrape
        # keyed on start_date/end_date was sketched here but never finished.
        raise LookupError(
            "Date-based stat pulls not implemented, yet. Please provide pitching csv files."
        )

    min_vl = 1 if ignore_limits else 20
    min_vr = 1 if ignore_limits else 40

    def read_split(stem: str, min_tbf: int) -> pd.DataFrame:
        # One split file, limited to pitchers with enough batters faced.
        return pd.read_csv(f"{file_path}{stem}.csv").query(f"TBF >= {min_tbf}")

    total_basic = pd.merge(
        read_split("vlhh-basic", min_vl),
        read_split("vrhh-basic", min_vr),
        on="playerId",
        suffixes=("_vL", "_vR"),
    )
    total_rate = pd.merge(
        read_split("vlhh-rate", min_vl),
        read_split("vrhh-rate", min_vr),
        on="playerId",
        suffixes=("_vL", "_vR"),
    )
    return pd.merge(total_basic, total_rate, on="playerId", suffixes=("", "_rate"))
async def pd_pitchingcards_df(cardset_id: int):
    """Fetch a cardset's pitching cards from the API as a DataFrame.

    Args:
        cardset_id: Cardset whose ``pitchingcards`` records are requested
            (with ``short_output`` enabled).

    Returns:
        DataFrame built from the response's ``cards`` list, with ``id``
        renamed to ``pitchingcard_id`` and ``player`` renamed to
        ``player_id``.

    Raises:
        ValueError: If the response reports a ``count`` of 0.
    """
    query_params = [("cardset_id", cardset_id), ("short_output", True)]
    response = await db_get("pitchingcards", params=query_params)

    if response["count"] == 0:
        raise ValueError("No pitching cards returned from Paper Dynasty API")

    cards = pd.DataFrame(response["cards"])
    return cards.rename(columns={"id": "pitchingcard_id", "player": "player_id"})
async def pd_pitchingcardratings_df(
    cardset_id: int, season: int, pitching_cards: pd.DataFrame = None
):
    """Fetch vs-L / vs-R pitching card ratings and derive total OPS and rarity.

    Args:
        cardset_id: Cardset whose ``pitchingcardratings`` are requested, once
            per ``vs_hand`` value ("L" and "R").
        season: Passed to ``get_pitcher_thresholds`` to pick the rarity
            thresholds.
        pitching_cards: Optional frame with ``pitchingcard_id`` and
            ``starter_rating`` columns. When omitted, or when a card has no
            starter rating, that card's rarity defaults to 5.

    Returns:
        One row per pitching card present in both queries, with ``_vL`` /
        ``_vR`` suffixed rating columns plus ``total_OPS`` and
        ``new_rarity_id``. ``starter_rating`` is never included.

    Raises:
        ValueError: If either ratings query reports a ``count`` of 0.
    """
    vl_query = await db_get(
        "pitchingcardratings",
        params=[("cardset_id", cardset_id), ("vs_hand", "L"), ("short_output", True)],
    )
    vr_query = await db_get(
        "pitchingcardratings",
        params=[("cardset_id", cardset_id), ("vs_hand", "R"), ("short_output", True)],
    )
    if 0 in (vl_query["count"], vr_query["count"]):
        raise ValueError("No pitching card ratings returned from Paper Dynasty API")

    vl = pd.DataFrame(vl_query["ratings"])
    vr = pd.DataFrame(vr_query["ratings"])
    ratings = pd.merge(vl, vr, on="pitchingcard", suffixes=("_vL", "_vR")).rename(
        columns={"pitchingcard": "pitchingcard_id"}
    )

    def get_total_ops(df_data):
        ops_vl = df_data["obp_vL"] + df_data["slg_vL"]
        ops_vr = df_data["obp_vR"] + df_data["slg_vR"]
        # The higher (worse for a pitcher) split is counted twice.
        return (ops_vr + ops_vl + max(ops_vl, ops_vr)) / 3

    ratings["total_OPS"] = ratings.apply(get_total_ops, axis=1)

    # Get season-appropriate rarity thresholds
    thresholds = get_pitcher_thresholds(season)

    # Need starter_rating to determine rarity - merge with pitching cards if provided
    if pitching_cards is not None:
        ratings = pd.merge(
            ratings,
            pitching_cards[["pitchingcard_id", "starter_rating"]],
            on="pitchingcard_id",
            how="left",
        )

    def new_rarity_id(df_data):
        # .get() returns None when the column was never merged in.
        if pd.isna(df_data.get("starter_rating")):
            return 5  # Default to Common if no starter rating
        if df_data["starter_rating"] > 3:
            return thresholds.get_rarity_for_starter(df_data["total_OPS"])
        return thresholds.get_rarity_for_reliever(df_data["total_OPS"])

    ratings["new_rarity_id"] = ratings.apply(new_rarity_id, axis=1)

    # Drop starter_rating as it will be re-merged from pitching_cards in
    # post_player_updates. errors="ignore" keeps the pitching_cards=None path
    # (where the column was never added) from raising KeyError.
    ratings = ratings.drop(columns=["starter_rating"], errors="ignore")
    return ratings
def match_player_lines(
    all_pitching: pd.DataFrame,
    all_players: pd.DataFrame,
    df_p: pd.DataFrame,
    is_custom: bool = False,
):
    """Attach player IDs, PD player rows and fielding rows to the split stats.

    Args:
        all_pitching: Split stats with at least ``playerId`` and ``Name_vL``.
        all_players: PD players frame with a ``bbref_id`` column.
        df_p: Fielding frame joined on the ``key_bbref`` index; its
            overlapping columns receive an ``_r`` suffix.
        is_custom: Forwarded to ``get_all_pybaseball_ids``.

    Returns:
        DataFrame indexed by ``key_bbref`` (also kept as a column).
    """

    def lookup_ids(row):
        return get_all_pybaseball_ids(
            [row["playerId"]], "fangraphs", is_custom, row["Name_vL"]
        )

    print("Now pulling mlbam player IDs...")
    id_frame = all_pitching.apply(lookup_ids, axis=1)

    with_pd_players = id_frame.merge(
        all_players, how="left", left_on="key_bbref", right_on="bbref_id"
    )
    # A value that is not equal to itself is NaN; those rows are discarded.
    known_mlbam = with_pd_players.query("key_mlbam == key_mlbam")
    player_data = known_mlbam.set_index("key_bbref", drop=False)
    print("Matched mlbam to pd players.")

    with_stats = pd.merge(
        player_data,
        all_pitching,
        left_on="key_fangraphs",
        right_on="playerId",
        sort=False,
    )
    with_stats = with_stats.set_index("key_bbref", drop=False)

    return with_stats.join(df_p, rsuffix="_r")
async def create_new_players(
    final_pitching: pd.DataFrame,
    cardset: dict,
    card_base_url: str,
    release_dir: str,
    player_desc: str,
):
    """Create PD player records for every pitcher row lacking a ``player_id``.

    For each such row an ``mlbplayers`` record is looked up by ``key_bbref``
    (and created when none exists), then a ``players`` record is posted. The
    returned ``player_id`` and ``p_name`` are written back into
    ``final_pitching`` in place, addressed by the row's index label.

    Args:
        final_pitching: Pitcher frame indexed by bbref ID; mutated in place.
        cardset: Cardset dict; only ``id`` is read.
        card_base_url: Base of the card image URL.
        release_dir: Release directory appended to the image URL.
        player_desc: Description stored on each new player.

    Returns:
        The number of new players created.
    """
    player_payloads = []
    mlbplayer_payloads = {}

    def queue_new_pitcher(row):
        first = sanitize_name(row["name_first"]).title()
        last = sanitize_name(row["name_last"]).title()
        image_url = (
            f"{card_base_url}/{row['player_id']}/"
            f"pitchingcard{urllib.parse.quote('?d=')}{release_dir}"
        )
        team = row["Tm_vL"]
        mlbclub = CLUB_LIST[team]
        franchise = FRANCHISE_LIST[team]
        fangraphs_id = int(float(row["key_fangraphs"]))
        mlbam_id = int(float(row["key_mlbam"]))

        player_payloads.append(
            {
                "p_name": f"{first} {last}",
                "cost": NEW_PLAYER_COST,
                "image": image_url,
                "mlbclub": mlbclub,
                "franchise": franchise,
                "cardset_id": cardset["id"],
                "set_num": fangraphs_id,
                "rarity_id": 99,
                "pos_1": "P",
                "description": f"{player_desc}",
                "bbref_id": row.name,
                "fangr_id": fangraphs_id,
                "strat_code": mlbam_id,
            }
        )
        mlbplayer_payloads[row.name] = {
            "first_name": first,
            "last_name": last,
            "key_mlbam": mlbam_id,
            "key_fangraphs": fangraphs_id,
            "key_bbref": row["key_bbref"],
            "key_retro": row["key_retro"],
        }

    unlinked = final_pitching[final_pitching["player_id"].isnull()]
    unlinked.apply(queue_new_pitcher, axis=1)
    print(f"Creating {len(player_payloads)} new players...")

    for payload in player_payloads:
        bbref_id = payload["bbref_id"]
        existing = await db_get("mlbplayers", params=[("key_bbref", bbref_id)])
        if existing["count"] > 0:
            payload["mlbplayer_id"] = existing["players"][0]["id"]
        else:
            created_mlb = await db_post(
                "mlbplayers/one", payload=mlbplayer_payloads[bbref_id]
            )
            payload["mlbplayer_id"] = created_mlb["id"]

        created = await db_post("players", payload=payload)
        final_pitching.at[bbref_id, "player_id"] = created["player_id"]
        final_pitching.at[bbref_id, "p_name"] = created["p_name"]

    print(
        f"Player IDs linked to pitching stats.\n{len(final_pitching.values)} players remain\n"
    )
    return len(player_payloads)
def get_stat_df(input_path: str, final_pitching: pd.DataFrame):
    """Join the peripheral pitching stats from ``pitching.csv`` onto the pitchers.

    The CSV is read from ``f"{input_path}pitching.csv"``, de-duplicated on
    ``Name-additional`` (first row wins) and indexed by that column. A
    ``pitch_hand`` column is derived from ``Name``: "L" when the name ends
    with ``*``, otherwise "R".

    Args:
        input_path: Prefix placed directly in front of ``pitching.csv``.
        final_pitching: Pitcher frame whose index matches the CSV's
            ``Name-additional`` values; its overlapping columns receive an
            ``_l`` suffix.

    Returns:
        ``final_pitching`` left-joined with the CSV rows.
    """
    print("Reading pitching peripheral stats...")
    pit_data = (
        pd.read_csv(f"{input_path}pitching.csv")
        .drop_duplicates(subset=["Name-additional"], keep="first")
        .set_index("Name-additional")
    )

    # Vectorized so an empty file or a blank name cannot raise, which the
    # previous row-wise ``Name[-1]`` lookup did.
    throws_left = pit_data["Name"].astype(str).str.endswith("*")
    pit_data["pitch_hand"] = throws_left.map({True: "L", False: "R"})

    pitching_stats = final_pitching.join(pit_data, lsuffix="_l")
    print(f"Stats are tallied\n{len(pitching_stats.values)} players remain\n")
    return pitching_stats
async def calculate_pitching_cards(
    pitching_stats: pd.DataFrame, cardset: dict, season_pct: float, post_pitchers: bool
):
    """Build a pitching card per pitcher, optionally PUT them, and merge card IDs.

    Rows whose card cannot be built are logged and skipped. After the
    optional upload, the cardset's stored pitching cards are fetched and
    left-merged onto ``pitching_stats`` by ``player_id``.

    Args:
        pitching_stats: One row per pitcher, indexed by bbref ID.
        cardset: Cardset dict; only ``id`` is read.
        season_pct: Forwarded to the balk, wild-pitch and hold calculations.
        post_pitchers: When True the built cards are PUT to ``pitchingcards``.

    Returns:
        The merged frame, indexed by ``key_bbref`` (also kept as a column).
    """
    card_payloads = []

    def build_card(row):
        logger.info(
            f"Creating pitching card for {row['name_first']} {row['name_last']} / fg ID: {row['key_fangraphs']}"
        )
        # Computed outside the try: a failure here is not skipped, it propagates.
        endurance = cde.pow_ratings(float(row["Inn_def"]), row["GS"], row["G"])
        try:
            card = {
                "player_id": int(float(row["player_id"])),
                "key_bbref": row.name,
                "key_fangraphs": int(float(row["key_fangraphs"])),
                "key_mlbam": int(float(row["key_mlbam"])),
                "key_retro": row["key_retro"],
                "name_first": row["name_first"].title(),
                "name_last": row["name_last"].title(),
                "balk": cpi.balks(row["BK"], row["IP"], season_pct),
                "wild_pitch": cpi.wild_pitches(row["WP"], row["IP"], season_pct),
                "hold": cde.hold_pitcher(
                    row["caught_stealing_perc"], int(row["pickoffs"]), season_pct
                ),
                "starter_rating": endurance[0],
                "relief_rating": endurance[1],
                "closer_rating": cpi.closer_rating(
                    int(row["GF"]), int(row["SV"]), int(row["G"])
                ),
                "hand": row["pitch_hand"],
                "batting": f"#1W{row['pitch_hand']}-C",
            }
            card_payloads.append(card)
        except Exception as e:
            logger.error(f"Skipping fg ID {row['key_fangraphs']} due to: {e}")

    print("Calculating pitching cards...")
    pitching_stats.apply(build_card, axis=1)
    print("Cards are complete.\n\nPosting cards now...")

    if post_pitchers:
        put_response = await db_put(
            "pitchingcards", payload={"cards": card_payloads}, timeout=30
        )
        print(
            f"Response: {put_response}\n\nMatching pitching card database IDs to player stats..."
        )

    stored_cards = await pd_pitchingcards_df(cardset["id"])
    merged = pitching_stats.merge(stored_cards, how="left", on="player_id")
    return merged.set_index("key_bbref", drop=False)
async def create_position(
    season_pct: float,
    pitching_stats: pd.DataFrame,
    post_pitchers: bool,
    df_p: pd.DataFrame,
):
    """Build a "P" position record per pitcher and optionally PUT them.

    Pitchers whose ``key_bbref`` appears in ``df_p``'s index get innings,
    range and error computed from that fielding row. All other pitchers get
    a fixed default record (innings 1, range 5, error 51).

    Args:
        season_pct: Forwarded to the range and error calculations.
        pitching_stats: One row per pitcher with ``key_bbref``, ``player_id``
            and ``p_name`` columns.
        post_pitchers: When True the records are PUT to ``cardpositions``.
        df_p: Fielding frame indexed by bbref ID; may be empty.
    """
    pit_positions = []

    def create_pit_position(df_data):
        bbref_id = df_data["key_bbref"]

        if bbref_id in df_p.index:
            logger.debug(f"Running P stats for {df_data['p_name']}")
            pit_positions.append(
                {
                    # int(float(...)) matches the coercion used for player_id
                    # everywhere else in this module.
                    "player_id": int(float(df_data["player_id"])),
                    "position": "P",
                    "innings": float(df_p.at[bbref_id, "Inn_def"]),
                    "range": cde.range_pitcher(
                        rs_value=int(df_p.at[bbref_id, "bis_runs_total"]),
                        season_pct=season_pct,
                    ),
                    "error": cde.get_any_error(
                        pos_code="p",
                        errors=int(df_p.at[bbref_id, "E_def"]),
                        chances=int(df_p.at[bbref_id, "chances"]),
                        season_pct=season_pct,
                    ),
                }
            )
            return

        try:
            pit_positions.append(
                {
                    "player_id": int(float(df_data["player_id"])),
                    "position": "P",
                    "innings": 1,
                    "range": 5,
                    "error": 51,
                }
            )
        except Exception as e:
            # Include the cause so a skipped pitcher is diagnosable from the log.
            logger.error(
                f"Could not create pitcher position for {bbref_id} due to: {e}"
            )

    print("Calculating pitcher fielding lines now...")
    pitching_stats.apply(create_pit_position, axis=1)
    print("Fielding is complete.\n\nPosting positions now...")

    if post_pitchers:
        resp = await db_put(
            "cardpositions", payload={"positions": pit_positions}, timeout=30
        )
        print(f"Response: {resp}\n")
async def calculate_pitcher_ratings(pitching_stats: pd.DataFrame, post_pitchers: bool):
    """Compute pitching card ratings for every pitcher and optionally PUT them.

    Each row is handed to ``cpi.get_pitcher_ratings`` and the returned
    ratings are collected. A row that raises is logged and skipped.

    Args:
        pitching_stats: One row per pitcher.
        post_pitchers: When True the ratings are PUT to
            ``pitchingcardratings``.
    """
    pitching_ratings = []

    def create_pitching_card_ratings(df_data):
        logger.info(f"Calculating pitching card ratings for {df_data.name}")
        try:
            pitching_ratings.extend(cpi.get_pitcher_ratings(df_data))
        except Exception as e:
            # Log the cause as well, as create_pitching_card does, so a
            # skipped pitcher can be diagnosed from the log alone.
            logger.error(
                f"Could not create a pitching card for {df_data['key_fangraphs']} due to: {e}"
            )

    print("Calculating card ratings...")
    pitching_stats.apply(create_pitching_card_ratings, axis=1)
    print("Ratings are complete\n\nPosting ratings now...")

    if post_pitchers:
        resp = await db_put(
            "pitchingcardratings", payload={"ratings": pitching_ratings}, timeout=30
        )
        print(f"Response: {resp}\n\nPulling all positions to set player positions...")
async def post_player_updates(
    cardset: Dict[str, Any],
    player_description: str,
    card_base_url: str,
    release_dir: str,
    is_liveseries: bool,
    post_players: bool,
    season: int,
) -> int:
    """Work out per-player description, team, image, cost and rarity updates.

    Players, pitching cards and pitching card ratings for the cardset are
    merged, each row is compared against its newly computed rarity, and the
    resulting parameter lists are PATCHed to ``players`` when
    ``post_players`` is True.

    Args:
        cardset: Cardset dict; ``id`` and ``name`` are read.
        player_description: Candidate description; applied only when
            ``should_update_player_description`` approves.
        card_base_url: Base of the card image URL.
        release_dir: Release directory appended to the image URL.
        is_liveseries: When True, mlbclub and franchise are refreshed from
            ``mlbteam_and_franchise``.
        post_players: When True the updates are sent to the database.
        season: Forwarded to ``pd_pitchingcardratings_df``.

    Returns:
        The number of distinct players with at least one update.
    """
    # No set_index here: player_id must stay an ordinary column (not also an
    # index level) so the merge on "player_id" below is unambiguous.
    p_data = await pd_players_df(cardset["id"])

    # Use LEFT JOIN to keep all pitchers, even those without ratings
    pitching_cards = await pd_pitchingcards_df(cardset["id"])
    pitching_ratings = await pd_pitchingcardratings_df(
        cardset["id"], season, pitching_cards
    )
    total_ratings = pd.merge(
        pitching_cards,
        pitching_ratings,
        on="pitchingcard_id",
        how="left",  # Keep all pitching cards
    )

    # Assign default rarity (Common/5) for pitchers without ratings
    if "new_rarity_id" not in total_ratings.columns:
        total_ratings["new_rarity_id"] = 5
    elif total_ratings["new_rarity_id"].isna().any():
        total_ratings["new_rarity_id"] = total_ratings["new_rarity_id"].fillna(5)

    # Assign default total_OPS for pitchers without ratings (Common reliever default)
    if "total_OPS" in total_ratings.columns:
        missing_ops = total_ratings[total_ratings["total_OPS"].isna()]
        if not missing_ops.empty:
            logger.warning(
                f"pitchers.creation.post_player_updates - {len(missing_ops)} pitchers missing total_OPS, assigning default 0.702: {missing_ops[['player_id', 'pitchingcard_id']].to_dict('records')}"
            )
            total_ratings["total_OPS"] = total_ratings["total_OPS"].fillna(0.702)

    player_data = pd.merge(p_data, total_ratings, on="player_id").set_index(
        "player_id", drop=False
    )
    del total_ratings

    def get_pids(df_data):
        return get_all_pybaseball_ids([df_data["bbref_id"]], "bbref")

    ids_and_names = player_data.apply(get_pids, axis=1)
    player_data = (
        ids_and_names.merge(
            player_data, how="left", left_on="key_bbref", right_on="bbref_id"
        )
        # NaN is the only value not equal to itself: drop rows with no mlbam ID.
        .query("key_mlbam == key_mlbam")
        .set_index("key_bbref", drop=False)
    )

    player_updates = {}  # { <player_id> : [ (param pairs) ] }

    # Average total_OPS per rarity, taken only from players whose rarity is
    # not changing, split by starter (rating >= 4) and reliever.
    sp_average_ops = (
        player_data.query("rarity == new_rarity_id and starter_rating >= 4")
        .groupby("rarity")["total_OPS"]
        .mean()
        .to_dict()
    )
    rp_average_ops = (
        player_data.query("rarity == new_rarity_id and starter_rating < 4")
        .groupby("rarity")["total_OPS"]
        .mean()
        .to_dict()
    )

    # Fill in missing rarity averages with defaults
    for rarity, default_ops in DEFAULT_STARTER_OPS.items():
        sp_average_ops.setdefault(rarity, default_ops)
    for rarity, default_ops in DEFAULT_RELIEVER_OPS.items():
        rp_average_ops.setdefault(rarity, default_ops)

    def get_player_updates(df_data):
        params = []

        # Check if description should be updated using extracted business logic
        if should_update_player_description(
            cardset_name=cardset["name"],
            player_cost=df_data["cost"],
            current_description=df_data["description"],
            new_description=player_description,
        ):
            params = [("description", f"{player_description}")]
            logger.debug(
                f"pitchers.creation.post_player_updates - Setting description for player_id={df_data['player_id']}: "
                f"'{df_data['description']}' -> '{player_description}' (cost={df_data['cost']}, cardset={cardset['name']})"
            )
        else:
            logger.debug(
                f"pitchers.creation.post_player_updates - Skipping description update for player_id={df_data['player_id']}: "
                f"current='{df_data['description']}', proposed='{player_description}' (cost={df_data['cost']}, cardset={cardset['name']})"
            )

        if is_liveseries:
            team_data = mlbteam_and_franchise(int(float(df_data["key_mlbam"])))
            if (
                df_data["mlbclub"] != team_data["mlbclub"]
                and team_data["mlbclub"] is not None
            ):
                params.append(("mlbclub", team_data["mlbclub"]))
            if (
                df_data["franchise"] != team_data["franchise"]
                and team_data["franchise"] is not None
            ):
                params.append(("franchise", team_data["franchise"]))

        # The image URL is refreshed unconditionally.
        params.append(
            (
                "image",
                f"{card_base_url}/{df_data['player_id']}/pitchingcard"
                f"{urllib.parse.quote('?d=')}{release_dir}",
            )
        )

        if df_data["cost"] == NEW_PLAYER_COST:
            # New player: scale the rarity's base cost by this pitcher's
            # total_OPS relative to the role average for that rarity.
            role_average_ops = (
                sp_average_ops if df_data["starter_rating"] >= 4 else rp_average_ops
            )
            params.extend(
                [
                    (
                        "cost",
                        round(
                            RARITY_BASE_COSTS[df_data["new_rarity_id"]]
                            * df_data["total_OPS"]
                            / role_average_ops[df_data["new_rarity_id"]]
                        ),
                    ),
                    ("rarity_id", df_data["new_rarity_id"]),
                ]
            )
        elif df_data["rarity"] != df_data["new_rarity_id"]:
            # Calculate adjusted cost for rarity change using lookup table
            new_cost = calculate_rarity_cost_adjustment(
                old_rarity=df_data["rarity"],
                new_rarity=df_data["new_rarity_id"],
                old_cost=df_data["cost"],
            )
            params.extend([("cost", new_cost), ("rarity_id", df_data["new_rarity_id"])])

        if params:
            player_updates.setdefault(df_data.player_id, []).extend(params)

    player_data.apply(get_player_updates, axis=1)

    print(f"Sending {len(player_updates)} player updates to PD database...")
    if post_players:
        for player_id, params in player_updates.items():
            await db_patch("players", object_id=player_id, params=params)

    return len(player_updates)
async def run_pitchers(
    cardset: dict,
    input_path: str,
    card_base_url: str,
    season: int,
    release_directory: str,
    player_description: str,
    season_pct: float,
    post_players: bool,
    post_pitchers: bool,
    is_liveseries: bool,
    ignore_limits: bool,
    pull_fielding: bool = True,
    is_custom: bool = False,
):
    """Run the full pitcher pipeline for one cardset.

    Steps, in order: load PD players, read the split CSVs, load fielding
    (or use an empty frame when ``pull_fielding`` is False), match player
    IDs, create missing players (only when ``post_players``), join
    peripheral stats, then build cards, positions, ratings and player
    updates.

    Returns:
        Dict with ``tot_pitchers`` (row count of the final stats frame),
        ``new_pitchers`` (players created, 0 when ``post_players`` is False)
        and ``pitching_stats`` (the final stats frame).
    """
    print("Pulling PD player IDs...")
    existing_players = await pd_players_df(cardset["id"])

    raw_stats = get_pitching_stats(file_path=input_path, ignore_limits=ignore_limits)
    print(f"Processed {len(raw_stats.values)} pitchers\n")

    print("Pulling pitcher defense...")
    fielding = cde.get_bbref_fielding_df("p", season) if pull_fielding else pd.DataFrame()

    matched = match_player_lines(raw_stats, existing_players, fielding, is_custom)

    created_count = 0
    if post_players:
        created_count = await create_new_players(
            matched, cardset, card_base_url, release_directory, player_description
        )

    stats = get_stat_df(input_path, matched)
    del raw_stats, matched

    stats = await calculate_pitching_cards(stats, cardset, season_pct, post_pitchers)
    await create_position(season_pct, stats, post_pitchers, fielding)
    await calculate_pitcher_ratings(stats, post_pitchers)
    await post_player_updates(
        cardset,
        player_description,
        card_base_url,
        release_directory,
        is_liveseries,
        post_players,
        season,
    )

    summary = {
        "tot_pitchers": len(stats.index),
        "new_pitchers": created_count,
        "pitching_stats": stats,
    }
    return summary