import logging from typing import Optional, Literal from fastapi import APIRouter, Query, Response from ...db_engine import ( db, SbaPlayer, StratPlay, StratGame, Team, Player, Decision, model_to_dict, fn, SQL, complex_data_to_csv, ) from ...dependencies import handle_db_errors, add_cache_headers, cache_result from .common import build_season_games router = APIRouter() logger = logging.getLogger("discord_app") @router.get("/pitching") @handle_db_errors @add_cache_headers(max_age=10 * 60) @cache_result(ttl=5 * 60, key_prefix="plays-batting") async def get_pitching_totals( season: list = Query(default=None), week: list = Query(default=None), s_type: Literal["regular", "post", "total", None] = None, player_id: list = Query(default=None), sbaplayer_id: list = Query(default=None), group_by: Literal[ "team", "player", "playerteam", "playergame", "teamgame", "league", "playerweek", "teamweek", "sbaplayer", ] = "player", min_pa: Optional[int] = 1, team_id: list = Query(default=None), manager_id: list = Query(default=None), obc: list = Query(default=None), risp: Optional[bool] = None, inning: list = Query(default=None), sort: Optional[str] = None, limit: Optional[int] = 200, short_output: Optional[bool] = False, csv: Optional[bool] = False, page_num: Optional[int] = 1, week_start: Optional[int] = None, week_end: Optional[int] = None, ): season_games = build_season_games( season, week, s_type, week_start, week_end, manager_id ) # Build SELECT fields conditionally based on group_by for pitching to match GROUP BY exactly pitch_select_fields = [] if group_by == "player": pitch_select_fields = [StratPlay.pitcher] elif group_by == "team": pitch_select_fields = [StratPlay.pitcher_team] elif group_by == "playerteam": pitch_select_fields = [StratPlay.pitcher, StratPlay.pitcher_team] elif group_by == "playergame": pitch_select_fields = [StratPlay.pitcher, StratPlay.game] elif group_by == "teamgame": pitch_select_fields = [StratPlay.pitcher_team, StratPlay.game] elif group_by == "playerweek": pitch_select_fields = [StratPlay.pitcher, StratPlay.game] elif group_by == "teamweek": pitch_select_fields = [StratPlay.pitcher_team, StratPlay.game] elif group_by == "sbaplayer": pitch_select_fields = [Player.sbaplayer] else: # Default case pitch_select_fields = [StratPlay.pitcher] # Build Peewee query for pitching stats pitch_plays = ( StratPlay.select( *pitch_select_fields, fn.SUM(StratPlay.pa).alias("sum_pa"), fn.SUM(StratPlay.ab).alias("sum_ab"), fn.SUM(StratPlay.run).alias("sum_run"), fn.SUM(StratPlay.hit).alias("sum_hit"), fn.SUM(StratPlay.rbi).alias("sum_rbi"), fn.SUM(StratPlay.double).alias("sum_double"), fn.SUM(StratPlay.triple).alias("sum_triple"), fn.SUM(StratPlay.homerun).alias("sum_hr"), fn.SUM(StratPlay.bb).alias("sum_bb"), fn.SUM(StratPlay.so).alias("sum_so"), fn.SUM(StratPlay.wpa).alias("sum_wpa"), fn.SUM(StratPlay.hbp).alias("sum_hbp"), fn.SUM(StratPlay.sac).alias("sum_sac"), fn.SUM(StratPlay.ibb).alias("sum_ibb"), fn.SUM(StratPlay.gidp).alias("sum_gidp"), fn.SUM(StratPlay.sb).alias("sum_sb"), fn.SUM(StratPlay.cs).alias("sum_cs"), fn.SUM(StratPlay.bphr).alias("sum_bphr"), fn.SUM(StratPlay.bpfo).alias("sum_bpfo"), fn.SUM(StratPlay.bp1b).alias("sum_bp1b"), fn.SUM(StratPlay.bplo).alias("sum_bplo"), fn.SUM(StratPlay.wild_pitch).alias("sum_wp"), fn.SUM(StratPlay.balk).alias("sum_balk"), fn.SUM(StratPlay.outs).alias("sum_outs"), fn.SUM(StratPlay.e_run).alias("sum_erun"), fn.SUM(StratPlay.re24_primary).alias("sum_repri"), ) .where((StratPlay.game << season_games) & (StratPlay.pitcher.is_null(False))) .having(fn.SUM(StratPlay.pa) >= min_pa) ) # Apply filters to the pitching query if player_id is not None: pitch_plays = pitch_plays.where(StratPlay.pitcher << player_id) if sbaplayer_id is not None: sba_players = Player.select().where(Player.sbaplayer_id << sbaplayer_id) pitch_plays = pitch_plays.where(StratPlay.pitcher << sba_players) if team_id is not None: pitch_plays = pitch_plays.where(StratPlay.pitcher_team << team_id) # Add JOINs for special group_by modes if group_by == "sbaplayer": pitch_plays = pitch_plays.join( Player, on=(StratPlay.pitcher == Player.id) ).where(Player.sbaplayer.is_null(False)) # Group by the fields if pitch_select_fields: pitch_plays = pitch_plays.group_by(*pitch_select_fields) # Apply sorting if sort is not None: if sort == "player": pitch_plays = pitch_plays.order_by(StratPlay.pitcher) elif sort == "team": pitch_plays = pitch_plays.order_by(StratPlay.pitcher_team) elif sort == "wpa-desc": pitch_plays = pitch_plays.order_by(SQL("sum_wpa").desc()) elif sort == "wpa-asc": pitch_plays = pitch_plays.order_by(SQL("sum_wpa").asc()) elif sort == "repri-desc": pitch_plays = pitch_plays.order_by(SQL("sum_repri").desc()) elif sort == "repri-asc": pitch_plays = pitch_plays.order_by(SQL("sum_repri").asc()) elif sort == "pa-desc": pitch_plays = pitch_plays.order_by(SQL("sum_pa").desc()) elif sort == "pa-asc": pitch_plays = pitch_plays.order_by(SQL("sum_pa").asc()) elif sort == "newest": if group_by in ["playergame", "teamgame"]: pitch_plays = pitch_plays.order_by(StratPlay.game.desc()) elif sort == "oldest": if group_by in ["playergame", "teamgame"]: pitch_plays = pitch_plays.order_by(StratPlay.game.asc()) if limit < 1: limit = 1 pitch_plays = pitch_plays.paginate(page_num, limit) # Execute the Peewee query return_stats = {"count": 0, "stats": []} for x in pitch_plays: # Extract basic stats from Peewee result tot_outs = x.sum_outs if x.sum_outs > 0 else 1 obp = (x.sum_hit + x.sum_bb + x.sum_hbp + x.sum_ibb) / x.sum_pa slg = ( x.sum_hr * 4 + x.sum_triple * 3 + x.sum_double * 2 + (x.sum_hit - x.sum_double - x.sum_triple - x.sum_hr) ) / max(x.sum_ab, 1) tot_bb = 0.1 if x.sum_bb == 0 else x.sum_bb # Handle player field based on grouping with safe access (similar to fielding) this_player = "TOT" if group_by == "sbaplayer": this_player = ( x.pitcher.sbaplayer_id if short_output else model_to_dict(x.pitcher.sbaplayer, recurse=False) ) elif "player" in group_by: try: this_player = ( x.pitcher_id if short_output else model_to_dict(x.pitcher, recurse=False) ) except Exception as e: logger.error( "Error extracting pitcher from query: {e}\n\nx: {x}", stack_info=True, ) # Handle team field based on grouping with safe access team_info = "TOT" if "team" in group_by and hasattr(x, "pitcher_team"): pitcher_team_obj = getattr(x, "pitcher_team", None) if pitcher_team_obj: team_info = ( pitcher_team_obj.id if short_output else model_to_dict(pitcher_team_obj, recurse=False) ) # Handle game field based on grouping with safe access this_game = "TOT" if "game" in group_by: this_game = ( x.game_id if short_output else model_to_dict(x.game, recurse=False) ) this_week = "TOT" if group_by in ["playerweek", "teamweek"]: game_obj = getattr(x, "game", None) this_week = game_obj.week if game_obj else "TOT" # Get Decision data for this specific grouping decision_query = Decision.select( fn.SUM(Decision.win).alias("sum_win"), fn.SUM(Decision.loss).alias("sum_loss"), fn.SUM(Decision.hold).alias("sum_hold"), fn.SUM(Decision.is_save).alias("sum_save"), fn.SUM(Decision.b_save).alias("sum_b_save"), fn.SUM(Decision.irunners).alias("sum_irunners"), fn.SUM(Decision.irunners_scored).alias("sum_irun_scored"), fn.SUM(Decision.is_start.cast("integer")).alias("sum_gs"), fn.COUNT(Decision.game_id).alias("sum_game"), ).where(Decision.game << season_games) # Apply same filters as main query based on grouping if group_by == "sbaplayer": sba_pitchers = Player.select(Player.id).where( Player.sbaplayer == x.pitcher.sbaplayer_id ) decision_query = decision_query.where(Decision.pitcher << sba_pitchers) elif "player" in group_by: decision_query = decision_query.where(Decision.pitcher == x.pitcher_id) if "team" in group_by and hasattr(x, "pitcher_team") and x.pitcher_team: # Filter by the team field in Decision table directly team_obj = getattr(x, "pitcher_team", None) if team_obj: decision_query = decision_query.where(Decision.team == team_obj.id) if "game" in group_by: decision_query = decision_query.where(Decision.game == x.game_id) # Execute decision query try: decision_result = decision_query.get() decision_data = { "sum_win": decision_result.sum_win or 0, "sum_loss": decision_result.sum_loss or 0, "sum_hold": decision_result.sum_hold or 0, "sum_save": decision_result.sum_save or 0, "sum_b_save": decision_result.sum_b_save or 0, "sum_irunners": decision_result.sum_irunners or 0, "sum_irun_scored": decision_result.sum_irun_scored or 0, "sum_gs": decision_result.sum_gs or 0, "sum_game": decision_result.sum_game or 0, } except Decision.DoesNotExist: # No decision data found for this grouping decision_data = { "sum_win": 0, "sum_loss": 0, "sum_hold": 0, "sum_save": 0, "sum_b_save": 0, "sum_irunners": 0, "sum_irun_scored": 0, "sum_gs": 0, "sum_game": 0, } return_stats["stats"].append( { "player": this_player, "team": team_info, "tbf": x.sum_pa, "outs": x.sum_outs, "games": decision_data["sum_game"], "gs": decision_data["sum_gs"], "win": decision_data["sum_win"], "loss": decision_data["sum_loss"], "hold": decision_data["sum_hold"], "save": decision_data["sum_save"], "bsave": decision_data["sum_b_save"], "ir": decision_data["sum_irunners"], "ir_sc": decision_data["sum_irun_scored"], "ab": x.sum_ab, "run": x.sum_run, "e_run": x.sum_erun, "hits": x.sum_hit, "double": x.sum_double, "triple": x.sum_triple, "hr": x.sum_hr, "bb": x.sum_bb, "so": x.sum_so, "hbp": x.sum_hbp, "sac": x.sum_sac, "ibb": x.sum_ibb, "gidp": x.sum_gidp, "sb": x.sum_sb, "cs": x.sum_cs, "bphr": x.sum_bphr, "bpfo": x.sum_bpfo, "bp1b": x.sum_bp1b, "bplo": x.sum_bplo, "wp": x.sum_wp, "balk": x.sum_balk, "wpa": x.sum_wpa * -1, "era": (x.sum_erun * 27) / tot_outs, "whip": ((x.sum_bb + x.sum_hit + x.sum_ibb) * 3) / tot_outs, "avg": x.sum_hit / max(x.sum_ab, 1), "obp": obp, "slg": slg, "ops": obp + slg, "woba": ( 0.69 * x.sum_bb + 0.72 * x.sum_hbp + 0.89 * (x.sum_hit - x.sum_double - x.sum_triple - x.sum_hr) + 1.27 * x.sum_double + 1.62 * x.sum_triple + 2.1 * x.sum_hr ) / max(x.sum_pa - x.sum_ibb, 1), "k/9": x.sum_so * 9 / (tot_outs / 3), "bb/9": x.sum_bb * 9 / (tot_outs / 3), "k/bb": x.sum_so / tot_bb, "game": this_game, "lob_2outs": 0, # Not available in current implementation "rbi%": 0, # Not available in current implementation "week": this_week, "re24_primary": x.sum_repri * -1 if x.sum_repri is not None else None, } ) return_stats["count"] = len(return_stats["stats"]) db.close() if csv: return Response( content=complex_data_to_csv(return_stats["stats"]), media_type="text/csv" ) return return_stats