from fastapi import APIRouter, Query
from typing import Optional, Literal
import logging

from ...db_engine import (
    db,
    SbaPlayer,
    StratPlay,
    StratGame,
    Team,
    Player,
    model_to_dict,
    fn,
    SQL,
)
from ...dependencies import handle_db_errors, add_cache_headers, cache_result
from .common import build_season_games

logger = logging.getLogger("discord_app")

router = APIRouter()

# Defaults for the pagination parameters; also used as the fallback when a
# direct (non-HTTP) caller passes ``None`` explicitly.
_DEFAULT_LIMIT = 200
_DEFAULT_PAGE = 1

# group_by values whose GROUP BY columns are exactly their SELECT columns and
# which need no extra JOIN.
_SIMPLE_GROUPINGS = frozenset(
    {
        "player",
        "team",
        "playerteam",
        "playerposition",
        "teamposition",
        "playergame",
        "playerpositiongame",
        "playerteamposition",
    }
)

# group_by values for which StratPlay.game is selected, so "newest"/"oldest"
# ordering by game is applied.
_GAME_SORTABLE_GROUPINGS = (
    "playergame",
    "playerpositiongame",
    "playerweek",
    "teamweek",
)


def _fielding_select_fields(group_by):
    """Return ``(defender_fields, catcher_fields)`` to SELECT for *group_by*.

    Unknown values (including ``None``) fall back to player + team columns.
    """
    fields_by_group = {
        "player": ([StratPlay.defender], [StratPlay.catcher]),
        "team": ([StratPlay.defender_team], [StratPlay.catcher_team]),
        "playerteam": (
            [StratPlay.defender, StratPlay.defender_team],
            [StratPlay.catcher, StratPlay.catcher_team],
        ),
        "playerposition": (
            [StratPlay.defender, StratPlay.check_pos],
            [StratPlay.catcher],
        ),
        "teamposition": (
            [StratPlay.defender_team, StratPlay.check_pos],
            [StratPlay.catcher_team],
        ),
        "playergame": (
            [StratPlay.defender, StratPlay.game],
            [StratPlay.catcher, StratPlay.game],
        ),
        "playerpositiongame": (
            [StratPlay.defender, StratPlay.check_pos, StratPlay.game],
            [StratPlay.catcher, StratPlay.game],
        ),
        "playerteamposition": (
            [StratPlay.defender, StratPlay.defender_team, StratPlay.check_pos],
            [StratPlay.catcher, StratPlay.catcher_team],
        ),
        "playerweek": (
            [StratPlay.defender, StratPlay.game],
            [StratPlay.catcher, StratPlay.game],
        ),
        "teamweek": (
            [StratPlay.defender_team, StratPlay.game],
            [StratPlay.catcher_team, StratPlay.game],
        ),
        "sbaplayer": ([Player.sbaplayer], [Player.sbaplayer]),
    }
    fallback = (
        [StratPlay.defender, StratPlay.defender_team],
        [StratPlay.catcher, StratPlay.catcher_team],
    )
    return fields_by_group.get(group_by, fallback)


def _apply_fielding_grouping(def_plays, cat_plays, group_by):
    """Add the JOINs and GROUP BY clauses for *group_by* to both queries."""
    if group_by in _SIMPLE_GROUPINGS:
        def_fields, cat_fields = _fielding_select_fields(group_by)
        return def_plays.group_by(*def_fields), cat_plays.group_by(*cat_fields)

    if group_by in ("playerweek", "teamweek"):
        if group_by == "playerweek":
            def_key, cat_key = StratPlay.defender, StratPlay.catcher
        else:
            def_key, cat_key = StratPlay.defender_team, StratPlay.catcher_team
        # Weekly totals group on the joined game's week rather than the game.
        def_plays = def_plays.join(StratGame).group_by(def_key, StratPlay.game.week)
        cat_plays = cat_plays.join(StratGame).group_by(cat_key, StratPlay.game.week)
        return def_plays, cat_plays

    if group_by == "sbaplayer":
        # Only players linked to an SbaPlayer can be aggregated this way.
        def_plays = (
            def_plays.join(Player, on=(StratPlay.defender == Player.id))
            .where(Player.sbaplayer.is_null(False))
            .group_by(Player.sbaplayer)
        )
        cat_plays = (
            cat_plays.join(Player, on=(StratPlay.catcher == Player.id))
            .where(Player.sbaplayer.is_null(False))
            .group_by(Player.sbaplayer)
        )
        return def_plays, cat_plays

    return def_plays, cat_plays


def _apply_fielding_sort(def_plays, sort, group_by):
    """Apply the ORDER BY for *sort*; unrecognised values leave the query as is."""
    orderings = {
        "player": StratPlay.defender,
        "team": StratPlay.defender_team,
        # The response reports wpa multiplied by -1, so ordering the raw sum
        # ascending yields a descending reported value (and vice versa).
        "wpa-desc": SQL("sum_wpa").asc(),
        "wpa-asc": SQL("sum_wpa").desc(),
        "ch-desc": SQL("sum_chances").desc(),
        "ch-asc": SQL("sum_chances").asc(),
    }
    if sort in orderings:
        return def_plays.order_by(orderings[sort])

    # Game ordering is skipped for groupings that do not select the game.
    if sort in ("newest", "oldest") and group_by in _GAME_SORTABLE_GROUPINGS:
        if sort == "newest":
            return def_plays.order_by(StratPlay.game.desc())
        return def_plays.order_by(StratPlay.game.asc())

    return def_plays


def _catcher_plays_for_row(cat_plays, row, group_by):
    """Narrow the catcher query to the group represented by defender *row*."""
    if group_by in ("player", "playerposition"):
        return cat_plays.where(StratPlay.catcher == row.defender)
    if group_by in ("team", "teamposition"):
        return cat_plays.where(StratPlay.catcher_team == row.defender_team)
    if group_by in ("playerteam", "playerteamposition"):
        return cat_plays.where(
            (StratPlay.catcher == row.defender)
            & (StratPlay.catcher_team == row.defender_team)
        )
    if group_by in ("playergame", "playerpositiongame"):
        return cat_plays.where(
            (StratPlay.catcher == row.defender) & (StratPlay.game == row.game)
        )
    if group_by == "playerweek":
        return cat_plays.where(
            (StratPlay.catcher == row.defender)
            & (StratPlay.game.week == row.game.week)
        )
    if group_by == "teamweek":
        narrowed = cat_plays.where(
            (StratPlay.catcher_team == row.defender_team)
            & (StratPlay.game.week == row.game.week)
        )
        # NOTE(review): this extra per-game filter has no counterpart in the
        # "playerweek" branch and limits catcher totals to a single game of
        # the week -- kept as-is, confirm whether it is intended.
        return narrowed.where(StratPlay.game == row.game)
    if group_by == "sbaplayer":
        return cat_plays.where(Player.sbaplayer == row.defender.sbaplayer)
    return cat_plays


@router.get("/fielding")
@handle_db_errors
@add_cache_headers(max_age=10 * 60)
@cache_result(ttl=5 * 60, key_prefix="plays-fielding")
async def get_fielding_totals(
    season: list = Query(default=None),
    week: list = Query(default=None),
    s_type: Literal["regular", "post", "total", None] = None,
    position: list = Query(default=None),
    player_id: list = Query(default=None),
    sbaplayer_id: list = Query(default=None),
    group_by: Literal[
        "team",
        "player",
        "playerteam",
        "playerposition",
        "teamposition",
        "playerpositiongame",
        "playergame",
        "playerteamposition",
        "playerweek",
        "teamweek",
        "sbaplayer",
    ] = "player",
    week_start: Optional[int] = None,
    week_end: Optional[int] = None,
    min_ch: Optional[int] = 1,
    team_id: list = Query(default=None),
    manager_id: list = Query(default=None),
    sort: Optional[str] = None,
    limit: Optional[int] = _DEFAULT_LIMIT,
    short_output: Optional[bool] = False,
    page_num: Optional[int] = _DEFAULT_PAGE,
):
    """Return aggregated fielding totals grouped according to ``group_by``.

    Defender plays are summed per group and merged with the catcher totals
    (stolen bases, caught stealing, passed balls, errors, wpa) of the same
    group.

    Args:
        season, week, s_type, week_start, week_end, manager_id: Passed to
            ``build_season_games`` to choose the games that are aggregated.
        position: Restrict defender plays to these ``check_pos`` values.
        player_id: Restrict to plays by these ``Player`` ids.
        sbaplayer_id: Restrict to plays by players with these sbaplayer ids.
        group_by: Grouping applied to both the defender and catcher queries.
        min_ch: Minimum ``SUM(pa)`` a defender group needs to be returned.
        team_id: Restrict to plays by these ``Team`` ids.
        sort: One of ``player``, ``team``, ``wpa-desc``, ``wpa-asc``,
            ``ch-desc``, ``ch-asc``, ``newest``, ``oldest``; any other value
            leaves the result unordered.
        limit: Page size; values below 1 are raised to 1 and ``None`` falls
            back to the default.
        short_output: Emit ids instead of full model dicts.
        page_num: 1-based page number; ``None`` falls back to the default.

    Returns:
        ``{"count": <rows in the paginated query>, "stats": [<row>, ...]}``.
    """
    season_games = build_season_games(
        season, week, s_type, week_start, week_end, manager_id
    )

    # SELECT columns depend on group_by so that they line up with GROUP BY.
    def_select_fields, cat_select_fields = _fielding_select_fields(group_by)

    def_plays = (
        StratPlay.select(
            *def_select_fields,
            fn.SUM(StratPlay.error).alias("sum_error"),
            fn.SUM(StratPlay.hit).alias("sum_hit"),
            fn.SUM(StratPlay.pa).alias("sum_chances"),
            fn.SUM(StratPlay.wpa).alias("sum_wpa"),
        )
        .where((StratPlay.game << season_games) & (StratPlay.defender.is_null(False)))
        .having(fn.SUM(StratPlay.pa) >= min_ch)
    )
    cat_plays = StratPlay.select(
        *cat_select_fields,
        fn.SUM(StratPlay.sb).alias("sum_sb"),
        fn.SUM(StratPlay.cs).alias("sum_cs"),
        fn.SUM(StratPlay.wpa).alias("sum_wpa"),
        fn.SUM(StratPlay.passed_ball).alias("sum_pb"),
        fn.SUM(StratPlay.error).alias("sum_error"),
    ).where((StratPlay.game << season_games) & (StratPlay.catcher.is_null(False)))

    if player_id is not None:
        all_players = Player.select().where(Player.id << player_id)
        def_plays = def_plays.where(StratPlay.defender << all_players)
        cat_plays = cat_plays.where(StratPlay.catcher << all_players)
    if sbaplayer_id is not None:
        sba_players = Player.select().where(Player.sbaplayer_id << sbaplayer_id)
        def_plays = def_plays.where(StratPlay.defender << sba_players)
        cat_plays = cat_plays.where(StratPlay.catcher << sba_players)
    if team_id is not None:
        all_teams = Team.select().where(Team.id << team_id)
        def_plays = def_plays.where(StratPlay.defender_team << all_teams)
        cat_plays = cat_plays.where(StratPlay.catcher_team << all_teams)
    if position is not None:
        # Position only applies to defender plays; catcher totals ignore it.
        def_plays = def_plays.where(StratPlay.check_pos << position)

    def_plays, cat_plays = _apply_fielding_grouping(def_plays, cat_plays, group_by)

    if sort is not None:
        def_plays = _apply_fielding_sort(def_plays, sort, group_by)

    # Both pagination arguments are Optional; comparing or paginating with
    # None would raise TypeError, so fall back to the defaults first.
    if limit is None:
        limit = _DEFAULT_LIMIT
    elif limit < 1:
        limit = 1
    if page_num is None:
        page_num = _DEFAULT_PAGE
    def_plays = def_plays.paginate(page_num, limit)

    logger.info(f"def_plays query: {def_plays}")

    return_stats = {"count": def_plays.count(), "stats": []}

    for x in def_plays:
        logger.info(f"this_play: {x}")

        this_pos = x.check_pos if "position" in group_by else "TOT"

        # One query per row: first() yields the matching catcher totals or
        # None, replacing a count() followed by an index lookup.
        cat_row = _catcher_plays_for_row(cat_plays, x, group_by).first()
        if cat_row is not None:
            sum_sb = cat_row.sum_sb
            sum_cs = cat_row.sum_cs
            sum_wpa = cat_row.sum_wpa
            sum_pb = cat_row.sum_pb
            sum_error = cat_row.sum_error + x.sum_error
        else:
            sum_sb = 0
            sum_cs = 0
            sum_wpa = 0
            sum_pb = 0
            sum_error = x.sum_error

        this_player = "TOT"
        if group_by == "sbaplayer":
            this_player = (
                x.defender.sbaplayer_id
                if short_output
                else model_to_dict(x.defender.sbaplayer, recurse=False)
            )
        elif "player" in group_by:
            this_player = (
                x.defender_id
                if short_output
                else model_to_dict(x.defender, recurse=False)
            )

        this_game = "TOT"
        if "game" in group_by:
            this_game = (
                x.game_id if short_output else model_to_dict(x.game, recurse=False)
            )

        this_week = "TOT"
        if group_by in ["playerweek", "teamweek"]:
            game_obj = getattr(x, "game", None)
            this_week = game_obj.week if game_obj else "TOT"

        # The team column is only selected for some groupings, so read it
        # defensively and report "TOT" when it is absent.
        defender_team_obj = getattr(x, "defender_team", None)
        team_info = "TOT"
        if defender_team_obj:
            team_info = (
                defender_team_obj.id
                if short_output
                else model_to_dict(defender_team_obj, recurse=False)
            )

        chances = x.sum_chances
        steal_attempts = sum_sb + sum_cs

        return_stats["stats"].append(
            {
                "player": this_player,
                "team": team_info,
                "pos": this_pos,
                "x-ch": chances,
                "hit": x.sum_hit,
                "error": sum_error,
                "sb-ch": steal_attempts,
                "sb": sum_sb,
                "cs": sum_cs,
                "pb": sum_pb,
                "wpa": (x.sum_wpa + sum_wpa) * -1,
                # min_ch=0 lets groups with zero chances through; report None
                # rather than dividing by zero (same convention as cs%).
                "wf%": (
                    (chances - (x.sum_error * 0.5) - (x.sum_hit * 0.75)) / chances
                    if chances
                    else None
                ),
                "cs%": sum_cs / steal_attempts if steal_attempts > 0 else None,
                "game": this_game,
                "week": this_week,
            }
        )

    db.close()
    return return_stats