paper-dynasty-database/app/routers_v2/pitchingcardratings.py
Cal Corum 0cba52cea5 PostgreSQL migration: Complete code preparation phase
- Add db_helpers.py with cross-database upsert functions for SQLite/PostgreSQL
- Replace 12 on_conflict_replace() calls with PostgreSQL-compatible upserts
- Add unique indexes: StratPlay(game, play_num), Decision(game, pitcher)
- Add max_length to Team model fields (abbrev, sname, lname)
- Fix boolean comparison in teams.py (== 0/1 to == False/True)
- Create migrate_to_postgres.py with ID-preserving migration logic
- Create audit_sqlite.py for pre-migration data integrity checks
- Add PROJECT_PLAN.json for migration tracking
- Add .secrets/ to .gitignore for credentials

Audit results: 658,963 records across 29 tables, 2,390 orphaned stats (expected)

Based on Major Domo migration lessons learned (33 issues resolved there)
2026-01-25 23:05:54 -06:00

603 lines
19 KiB
Python

import os
from fastapi import APIRouter, Depends, HTTPException, Query, Response
from fastapi.responses import FileResponse
from typing import Literal, Optional, List
import logging
import pandas as pd
import pydantic
from pydantic import validator, root_validator
from ..db_engine import (
db,
PitchingCardRatings,
model_to_dict,
chunked,
PitchingCard,
Player,
query_to_csv,
Team,
CardPosition,
)
from ..db_helpers import upsert_pitching_card_ratings
from ..dependencies import oauth2_scheme, valid_token, LOG_DATA
# Configure the root logger from the shared LOG_DATA settings (file, format, level).
logging.basicConfig(
    filename=LOG_DATA["filename"],
    format=LOG_DATA["format"],
    level=LOG_DATA["log_level"],
)

# Every route declared in this module is served under /api/v2/pitchingcardratings.
router = APIRouter(prefix="/api/v2/pitchingcardratings", tags=["pitchingcardratings"])

# CSV files written by the /calculate/scouting and /calculate/basic endpoints
# and served back by /scouting and /basic respectively.
RATINGS_FILE = "storage/pitching-ratings.csv"
BASIC_FILE = "storage/pitching-basic.csv"
# Every per-outcome chance column on a card; together they must total 108.
_CHANCE_FIELDS = (
    "homerun",
    "bp_homerun",
    "triple",
    "double_three",
    "double_two",
    "double_cf",
    "single_two",
    "single_one",
    "single_center",
    "bp_single",
    "hbp",
    "walk",
    "strikeout",
    "flyout_lf_b",
    "flyout_cf_b",
    "flyout_rf_b",
    "groundout_a",
    "groundout_b",
    "xcheck_p",
    "xcheck_c",
    "xcheck_1b",
    "xcheck_2b",
    "xcheck_3b",
    "xcheck_ss",
    "xcheck_lf",
    "xcheck_cf",
    "xcheck_rf",
)


def _chance(values, field):
    """Return the already-validated value of ``field``, or 0.0 if it is absent.

    Pydantic leaves a field out of ``values`` when that field failed its own
    validation, so a plain ``values[field]`` would raise ``KeyError`` and turn
    a client error into an unhandled server error.
    """
    return values.get(field, 0.0)


class PitchingCardRatingsModel(pydantic.BaseModel):
    """Request schema for one pitching card's ratings against one batter hand.

    The chance fields default to 0.0 and must sum (after rounding) to 108.
    ``avg``, ``obp`` and ``slg`` are always recomputed from the chance fields;
    any value supplied by the client for them is discarded.
    """

    pitchingcard_id: int
    vs_hand: Literal["R", "L", "vR", "vL"]
    homerun: float = 0.0
    bp_homerun: float = 0.0
    triple: float = 0.0
    double_three: float = 0.0
    double_two: float = 0.0
    double_cf: float = 0.0
    single_two: float = 0.0
    single_one: float = 0.0
    single_center: float = 0.0
    bp_single: float = 0.0
    hbp: float = 0.0
    walk: float = 0.0
    strikeout: float = 0.0
    flyout_lf_b: float = 0.0
    flyout_cf_b: float = 0.0
    flyout_rf_b: float = 0.0
    groundout_a: float = 0.0
    groundout_b: float = 0.0
    xcheck_p: float = 0.0
    xcheck_c: float = 0.0
    xcheck_1b: float = 0.0
    xcheck_2b: float = 0.0
    xcheck_3b: float = 0.0
    xcheck_ss: float = 0.0
    xcheck_lf: float = 0.0
    xcheck_cf: float = 0.0
    xcheck_rf: float = 0.0
    avg: float = 0.0
    obp: float = 0.0
    slg: float = 0.0

    @validator("avg", always=True)
    def avg_validator(cls, v, values, **kwargs):
        """Derive avg: hit chances out of 108, with the bp_* chances counted at half."""
        return (
            _chance(values, "homerun")
            + _chance(values, "bp_homerun") / 2
            + _chance(values, "triple")
            + _chance(values, "double_three")
            + _chance(values, "double_two")
            + _chance(values, "double_cf")
            + _chance(values, "single_two")
            + _chance(values, "single_one")
            + _chance(values, "single_center")
            + _chance(values, "bp_single") / 2
        ) / 108

    @validator("obp", always=True)
    def obp_validator(cls, v, values, **kwargs):
        """Derive obp: hbp and walk chances out of 108, added to the derived avg."""
        return (
            (_chance(values, "hbp") + _chance(values, "walk")) / 108
        ) + _chance(values, "avg")

    @validator("slg", always=True)
    def slg_validator(cls, v, values, **kwargs):
        """Derive slg: each hit chance weighted by its multiplier, out of 108."""
        return (
            _chance(values, "homerun") * 4
            + _chance(values, "bp_homerun") * 2
            + _chance(values, "triple") * 3
            + _chance(values, "double_three") * 2
            + _chance(values, "double_two") * 2
            + _chance(values, "double_cf") * 2
            + _chance(values, "single_two")
            + _chance(values, "single_one")
            + _chance(values, "single_center")
            + _chance(values, "bp_single") / 2
        ) / 108

    @root_validator(skip_on_failure=True)
    def validate_chance_total(cls, values):
        """Reject the card unless its chance fields round to exactly 108.

        Raises:
            ValueError: when the rounded total of all chance fields is not 108.
        """
        # skip_on_failure guarantees every field validated, so direct indexing is safe.
        total_chances = sum(values[field] for field in _CHANCE_FIELDS)
        if round(total_chances) != 108:
            raise ValueError("Must have exactly 108 chances on the card")
        return values
class RatingsList(pydantic.BaseModel):
    """Request body wrapper holding a batch of pitching card ratings."""

    # Each entry is validated as a PitchingCardRatingsModel.
    ratings: List[PitchingCardRatingsModel]
@router.get("")
async def get_card_ratings(
    pitchingcard_id: list = Query(default=None),
    vs_hand: Literal["R", "L", "vR", "vL"] = None,
    short_output: bool = False,
    csv: bool = False,
    cardset_id: list = Query(default=None),
    token: str = Depends(oauth2_scheme),
):
    """List pitching card ratings, optionally filtered, as JSON or CSV.

    Args:
        pitchingcard_id: Restrict results to these pitching card ids.
        vs_hand: Restrict results to one batter hand; only the final character
            of the value is used for the comparison.
        short_output: When true, related models are not expanded in the JSON.
        csv: When true, return the result as a text/csv response.
        cardset_id: Restrict results to cards of players in these cardsets.
        token: Bearer token checked with ``valid_token``.

    Raises:
        HTTPException: 401 when the token is not valid.
    """
    if not valid_token(token):
        logging.warning(f"Bad Token: {token}")
        db.close()
        raise HTTPException(
            status_code=401, detail="You are not authorized to pull card ratings."
        )

    query = PitchingCardRatings.select()

    if pitchingcard_id is not None:
        query = query.where(PitchingCardRatings.pitchingcard_id << pitchingcard_id)

    if vs_hand is not None:
        hand = vs_hand[-1]
        query = query.where(PitchingCardRatings.vs_hand == hand)

    if cardset_id is not None:
        players_in_sets = Player.select(Player.player_id).where(
            Player.cardset_id << cardset_id
        )
        cards_in_sets = PitchingCard.select(PitchingCard.id).where(
            PitchingCard.player << players_in_sets
        )
        query = query.where(PitchingCardRatings.pitchingcard << cards_in_sets)

    if csv:
        csv_body = query_to_csv(query)
        db.close()
        return Response(content=csv_body, media_type="text/csv")

    total = query.count()
    rows = [model_to_dict(rating, recurse=not short_output) for rating in query]
    db.close()
    return {"count": total, "ratings": rows}
def get_scouting_dfs(cardset_id: list = None):
    """Build the combined pitching scouting DataFrame.

    Ratings versus left- and right-handed batters are loaded separately and
    merged on ``player_id`` (overlapping columns get ``_vl`` / ``_vr``
    suffixes). The pitcher's "Range P" and "Error P" fielding values are then
    joined on.

    Args:
        cardset_id: Optional list of cardset ids; when given, only players in
            those cardsets are included.

    Returns:
        A pandas DataFrame indexed by ``player_id``.
    """
    ratings_query = PitchingCardRatings.select()
    if cardset_id is not None:
        players_in_sets = Player.select(Player.player_id).where(
            Player.cardset_id << cardset_id
        )
        cards_in_sets = PitchingCard.select(PitchingCard.id).where(
            PitchingCard.player << players_in_sets
        )
        ratings_query = ratings_query.where(
            PitchingCardRatings.pitchingcard << cards_in_sets
        )

    left_query = ratings_query.where(PitchingCardRatings.vs_hand == "L")
    right_query = ratings_query.where(PitchingCardRatings.vs_hand == "R")

    # The vs-left rows carry the card and player details for the merged frame.
    left_rows = [model_to_dict(rating) for rating in left_query]
    for row in left_rows:
        row.update(row["pitchingcard"])
        card = row["pitchingcard"]
        player = card["player"]
        row["player_id"] = player["player_id"]
        row["player_name"] = player["p_name"]
        row["rarity"] = player["rarity"]["name"]
        row["cardset_id"] = player["cardset"]["id"]
        row["cardset_name"] = player["cardset"]["name"]
        row["starter_rating"] = card["starter_rating"]
        row["relief_rating"] = card["relief_rating"]
        row["closer_rating"] = card["closer_rating"]
        del row["pitchingcard"]
        del row["player"]

    # The vs-right rows only need the merge key besides their own rating columns.
    right_rows = [model_to_dict(rating) for rating in right_query]
    for row in right_rows:
        row["player_id"] = row["pitchingcard"]["player"]["player_id"]
        del row["pitchingcard"]

    left_df = pd.DataFrame(left_rows)
    right_df = pd.DataFrame(right_rows)
    merged = pd.merge(left_df, right_df, on="player_id", suffixes=("_vl", "_vr"))
    pit_df = merged.set_index("player_id", drop=False)
    logging.debug(f"pit_df: {pit_df}")

    positions = CardPosition.select().where(CardPosition.position == "P")
    if cardset_id is not None:
        players_in_sets = Player.select(Player.player_id).where(
            Player.cardset_id << cardset_id
        )
        positions = positions.where(CardPosition.player << players_in_sets)

    range_by_player = {pos.player.player_id: pos.range for pos in positions}
    error_by_player = {pos.player.player_id: pos.error for pos in positions}
    series_list = [
        pd.Series(range_by_player, name=f"Range P"),
        pd.Series(error_by_player, name=f"Error P"),
    ]
    db.close()
    logging.debug(f"series_list: {series_list}")

    return pit_df.join(series_list)
@router.get("/scouting")
async def get_card_scouting(team_id: int, ts: str):
    """Serve the pre-computed pitching scouting CSV to a team with the guide enabled.

    Args:
        team_id: Id of the requesting team.
        ts: Value that must equal the team's ``team_hash()``.

    Returns:
        The ratings CSV as a file response, or a plain explanatory string when
        the team is unknown, the hash does not match, or the guide is disabled.

    Raises:
        HTTPException: 400 when the scouting file has not been generated.
    """
    this_team = Team.get_or_none(Team.id == team_id)
    # getattr keeps this log line safe when no team matched the id.
    logging.debug(
        f"Team: {this_team} / has_guide: {getattr(this_team, 'has_guide', None)}"
    )

    if this_team is None or ts != this_team.team_hash() or this_team.has_guide != 1:
        logging.warning(f"Team_id {team_id} attempted to pull ratings")
        db.close()
        return (
            "Your team does not have the ratings guide enabled. If you have purchased a copy ping Cal to "
            "make sure it is enabled on your team. If you are interested you can pick it up here (thank you!): "
            "https://ko-fi.com/manticorum/shop"
        )

    # No further database access is needed; release the connection like the other endpoints do.
    db.close()

    if os.path.isfile(RATINGS_FILE):
        return FileResponse(
            path=RATINGS_FILE,
            media_type="text/csv",
        )

    raise HTTPException(
        status_code=400, detail="Go pester Cal - the scouting file is missing"
    )
@router.post("/calculate/scouting")
async def post_calc_scouting(token: str = Depends(oauth2_scheme)):
    """Rebuild the full pitching scouting CSV, store it on disk and return it.

    Args:
        token: Bearer token checked with ``valid_token``.

    Returns:
        A text/csv response containing the regenerated file contents.

    Raises:
        HTTPException: 401 when the token is not valid.
    """
    if not valid_token(token):
        logging.warning(f"Bad Token: {token}")
        db.close()
        raise HTTPException(
            status_code=401, detail="You are not authorized to calculate card ratings."
        )

    logging.warning(f"Re-calculating pitching ratings\n\n")
    scouting = get_scouting_dfs()

    # Identity columns lead the file; per-side ids and hands are left out entirely.
    leading = ["player_id", "player_name", "cardset_name", "rarity", "hand", "variant"]
    skipped = leading + ["id_vl", "id_vr", "vs_hand_vl", "vs_hand_vr"]
    trailing = [column for column in scouting.columns if column not in skipped]
    scouting = scouting[leading + trailing]

    csv_text = pd.DataFrame(scouting).to_csv(index=False)
    with open(RATINGS_FILE, "w") as handle:
        handle.write(csv_text)

    return Response(content=csv_text, media_type="text/csv")
@router.get("/basic")
async def get_basic_scouting():
    """Serve the pre-computed basic pitching ratings CSV.

    Raises:
        HTTPException: 400 when the basic ratings file does not exist on disk.
    """
    if not os.path.isfile(BASIC_FILE):
        raise HTTPException(
            status_code=400, detail="Go pester Cal - the scouting file is missing"
        )
    return FileResponse(path=BASIC_FILE, media_type="text/csv")
def _percentile_rank(raw_scores, ascending=True):
    """Turn a Series of raw scores into rounded 0-100 percentile ranks."""
    return round(raw_scores.rank(pct=True, ascending=ascending) * 100)


def _raw_control(row, side):
    """Raw control score for one side ("vl" or "vr"): fewer free passes and wild pitches score higher."""
    return ((1 - (row[f"obp_{side}"] - row[f"avg_{side}"])) * 100) + (
        1 - (row["wild_pitch"] / 20)
    )


def _raw_stuff(row, side):
    """Raw stuff score for one side ("vl" or "vr"): less slugging and fewer homers score higher."""
    # NOTE(review): slugging is added twice (double weight) - confirm this is intended.
    return 10 - (
        row[f"slg_{side}"]
        + row[f"slg_{side}"]
        + ((row[f"homerun_{side}"] + row[f"bp_homerun_{side}"]) / 108)
    )


def _raw_fielding(row):
    """Raw fielding score computed from the pitcher's range and error values."""
    return ((6 - row["Range P"]) * 10) + (50 - row["Error P"])


def _raw_stamina(row):
    """Raw stamina score from the larger of the starter/relief ratings and on-base allowed."""
    # A missing rating counts as -1 so the role that is actually present wins.
    spow = row["starter_rating"] if pd.notna(row["starter_rating"]) else -1
    rpow = row["relief_rating"] if pd.notna(row["relief_rating"]) else -1
    this_pow = spow if spow > rpow else rpow
    return (
        ((this_pow * (row["obp_vr"] * (2 / 3))) + (this_pow * (row["obp_vl"] / 3)))
        * 4.5
    ) + this_pow


def _raw_hit(row):
    """Raw hit-prevention score: one minus the average allowed, weighted 2/3 vR and 1/3 vL."""
    # Both weighted terms must be summed before subtracting, so that a higher
    # average against either side lowers the score.
    return 1 - ((row["avg_vr"] * (2 / 3)) + (row["avg_vl"] / 3))


def _raw_k(row):
    """Raw strikeout rate, weighted 2/3 vR and 1/3 vL."""
    return ((row["strikeout_vr"] / 108) * (2 / 3)) + ((row["strikeout_vl"] / 108) / 3)


def _raw_bb(row):
    """Raw walk rate, weighted 2/3 vR and 1/3 vL (ranked descending by the caller)."""
    return ((row["walk_vr"] / 108) * (2 / 3)) + ((row["walk_vl"] / 108) / 3)


def _raw_hr(row):
    """Raw homer-prevention score: one minus the homer rate, weighted 2/3 vR and 1/3 vL."""
    return 1 - (
        (((row["homerun_vr"] + row["bp_homerun_vr"]) / 108) * (2 / 3))
        + (((row["homerun_vl"] + row["bp_homerun_vl"]) / 108) / 3)
    )


def _raw_rating(row):
    """Raw overall rating: a weighted sum of the percentile columns computed earlier."""
    # NOTE(review): starters and relievers share this single formula - confirm
    # whether role-specific weights were intended.
    return (
        ((row["H/9"] + row["K/9"] + row["BB/9"] + row["HR/9"]) * 5)
        + (row["Fielding"])
        + (row["Stamina"] * 5)
        + (((row["Stuff L"] / 3) + (row["Stuff R"] * (2 / 3))) * 4)
        + (((row["Control L"] / 3) + (row["Control R"] * (2 / 3))) * 2)
    )


@router.post("/calculate/basic")
async def post_calc_basic(token: str = Depends(oauth2_scheme)):
    """Rebuild the basic pitching ratings CSV, store it on disk and return it.

    Each category is a raw per-pitcher score converted to a 0-100 percentile
    rank across all pitchers in the scouting frame. "Rating" is built last
    because it combines the other percentile columns.

    Args:
        token: Bearer token checked with ``valid_token``.

    Returns:
        A text/csv response containing the regenerated file contents.

    Raises:
        HTTPException: 401 when the token is not valid.
    """
    if not valid_token(token):
        logging.warning(f"Bad Token: {token}")
        db.close()
        raise HTTPException(
            status_code=401, detail="You are not authorized to calculate basic ratings."
        )

    logging.warning(f"Re-calculating basic pitching ratings\n\n")
    raw_data = get_scouting_dfs()
    logging.debug(f"output: {raw_data}")

    raw_data["Control L"] = _percentile_rank(
        raw_data.apply(_raw_control, axis=1, args=("vl",))
    )
    raw_data["Control R"] = _percentile_rank(
        raw_data.apply(_raw_control, axis=1, args=("vr",))
    )
    raw_data["Stuff L"] = _percentile_rank(
        raw_data.apply(_raw_stuff, axis=1, args=("vl",))
    )
    raw_data["Stuff R"] = _percentile_rank(
        raw_data.apply(_raw_stuff, axis=1, args=("vr",))
    )

    fielding_raw = raw_data.apply(_raw_fielding, axis=1)
    logging.debug(f"max fld: {fielding_raw.max()} / min fld: {fielding_raw.min()}")
    raw_data["Fielding"] = _percentile_rank(fielding_raw)

    raw_data["Stamina"] = _percentile_rank(raw_data.apply(_raw_stamina, axis=1))
    raw_data["H/9"] = _percentile_rank(raw_data.apply(_raw_hit, axis=1))
    raw_data["K/9"] = _percentile_rank(raw_data.apply(_raw_k, axis=1))
    # Fewer walks is better, so the lowest walk rate gets the highest percentile.
    raw_data["BB/9"] = _percentile_rank(
        raw_data.apply(_raw_bb, axis=1), ascending=False
    )
    raw_data["HR/9"] = _percentile_rank(raw_data.apply(_raw_hr, axis=1))

    # Depends on every percentile column assigned above.
    raw_data["Rating"] = _percentile_rank(raw_data.apply(_raw_rating, axis=1))

    output = raw_data[
        [
            "player_id",
            "player_name",
            "Rating",
            "Control R",
            "Control L",
            "Stuff R",
            "Stuff L",
            "Stamina",
            "Fielding",
            "H/9",
            "K/9",
            "BB/9",
            "HR/9",
            "hand",
            "cardset_name",
        ]
    ]

    csv_file = pd.DataFrame(output).to_csv(index=False)
    with open(BASIC_FILE, "w") as file:
        file.write(csv_file)

    return Response(content=csv_file, media_type="text/csv")
@router.get("/{ratings_id}")
async def get_one_rating(ratings_id: int, token: str = Depends(oauth2_scheme)):
    """Return a single pitching card rating row as a dict.

    Args:
        ratings_id: Primary key of the rating row.
        token: Bearer token checked with ``valid_token``.

    Raises:
        HTTPException: 401 when the token is not valid; 404 when no row has
            the given id.
    """
    if not valid_token(token):
        logging.warning(f"Bad Token: {token}")
        db.close()
        raise HTTPException(
            status_code=401, detail="You are not authorized to pull card ratings."
        )

    rating = PitchingCardRatings.get_or_none(PitchingCardRatings.id == ratings_id)
    if rating is not None:
        payload = model_to_dict(rating)
        db.close()
        return payload

    db.close()
    raise HTTPException(
        status_code=404, detail=f"PitchingCardRating id {ratings_id} not found"
    )
@router.get("/player/{player_id}")
async def get_player_ratings(
    player_id: int, variant: list = Query(default=None), short_output: bool = False
):
    """Return all ratings for a player's pitching cards.

    Args:
        player_id: Id of the player whose pitching cards are looked up.
        variant: Optional list of card variants to restrict the lookup to.
        short_output: When true, related models are not expanded in the output.

    Returns:
        A dict with the row ``count`` and the list of ``ratings``.
    """
    player_cards = PitchingCard.select().where(PitchingCard.player_id == player_id)
    player_cards = player_cards.order_by(PitchingCard.variant)
    if variant is not None:
        player_cards = player_cards.where(PitchingCard.variant << variant)

    card_ratings = PitchingCardRatings.select().where(
        PitchingCardRatings.pitchingcard << player_cards
    )

    total = card_ratings.count()
    rows = [model_to_dict(rating, recurse=not short_output) for rating in card_ratings]
    db.close()
    return {"count": total, "ratings": rows}
@router.put("")
async def put_ratings(ratings: RatingsList, token: str = Depends(oauth2_scheme)):
    """Create or update a batch of pitching card ratings.

    A rating whose (pitchingcard_id, vs_hand) pair already exists is updated in
    place; all others are collected and written through the upsert helper.

    Args:
        ratings: The batch of ratings to store.
        token: Bearer token checked with ``valid_token``.

    Returns:
        A summary string with the number of updated and newly added ratings.

    Raises:
        HTTPException: 401 when the token is not valid.
    """
    if not valid_token(token):
        logging.warning(f"Bad Token: {token}")
        db.close()
        raise HTTPException(
            status_code=401, detail="You are not authorized to post card ratings."
        )

    pending_inserts = []
    updated_rows = 0

    for rating in ratings.ratings:
        same_card_and_hand = (
            PitchingCardRatings.pitchingcard_id == rating.pitchingcard_id
        ) & (PitchingCardRatings.vs_hand == rating.vs_hand)
        try:
            # Existence probe only; the fetched row itself is not used.
            PitchingCardRatings.get(same_card_and_hand)
            update_query = PitchingCardRatings.update(rating.dict()).where(
                same_card_and_hand
            )
            updated_rows += update_query.execute()
        except PitchingCardRatings.DoesNotExist:
            pending_inserts.append(rating.dict())

    with db.atomic():
        # The helper performs the insert in a way that works on PostgreSQL too.
        upsert_pitching_card_ratings(pending_inserts, batch_size=30)

    db.close()
    return f"Updated ratings: {updated_rows}; new ratings: {len(pending_inserts)}"
@router.delete("/{ratings_id}")
async def delete_rating(ratings_id: int, token: str = Depends(oauth2_scheme)):
    """Delete a single pitching card rating row.

    Args:
        ratings_id: Primary key of the rating row to delete.
        token: Bearer token checked with ``valid_token``.

    Returns:
        A confirmation string when exactly one row was deleted.

    Raises:
        HTTPException: 401 when the token is not valid; 404 when no row has
            the given id; 500 when the delete did not remove exactly one row.
    """
    if not valid_token(token):
        logging.warning(f"Bad Token: {token}")
        db.close()
        raise HTTPException(
            status_code=401, detail="You are not authorized to post card ratings."
        )

    rating = PitchingCardRatings.get_or_none(PitchingCardRatings.id == ratings_id)
    if rating is None:
        db.close()
        raise HTTPException(
            status_code=404, detail=f"PitchingCardRating id {ratings_id} not found"
        )

    deleted_rows = rating.delete_instance()
    db.close()

    if deleted_rows != 1:
        raise HTTPException(
            status_code=500, detail=f"Rating {rating} could not be deleted"
        )
    return f"Rating {rating} has been deleted"