major-domo-database/app/routers_v3/stratplay.py
2025-10-27 14:57:14 -05:00

1374 lines
63 KiB
Python

import copy
from itertools import groupby
from tokenize import group
from fastapi import APIRouter, Depends, HTTPException, Query, Response
from typing import List, Optional, Literal
import logging
from pydantic import BaseModel, validator
from ..db_engine import db, StratPlay, StratGame, Team, Player, Decision, model_to_dict, chunked, fn, SQL, \
complex_data_to_csv
from ..dependencies import add_cache_headers, cache_result, oauth2_scheme, valid_token, PRIVATE_IN_SCHEMA, handle_db_errors
# Module logger; uses the shared 'discord_app' logger name.
logger = logging.getLogger('discord_app')

# All routes declared in this module are mounted under /api/v3/plays.
router = APIRouter(prefix='/api/v3/plays', tags=['plays'])

# Values accepted by the position fields of PlayModel (batter_pos, check_pos).
POS_LIST = Literal[
    'C', '1B', '2B', '3B', 'SS', 'LF', 'CF', 'RF', 'P', 'DH', 'PH', 'PR', 'GHOST'
]
class PlayModel(BaseModel):
    """Request schema for a single play.

    Field order matters: each ``*_final`` validator reads the matching
    ``*_id`` field from ``values``, which only contains fields declared
    (and successfully validated) earlier in the class body.
    """

    # --- Game context -----------------------------------------------------
    game_id: int
    play_num: int
    batter_id: Optional[int] = None
    batter_team_id: Optional[int] = None
    pitcher_id: int
    pitcher_team_id: Optional[int] = None
    on_base_code: str
    inning_half: Literal['top', 'bot', 'Top', 'Bot']
    inning_num: int
    batting_order: int
    starting_outs: int
    away_score: int
    home_score: int
    batter_pos: Optional[POS_LIST] = None

    # --- Base runners and where everybody ended up -------------------------
    on_first_id: Optional[int] = None
    on_first_final: Optional[int] = None
    on_second_id: Optional[int] = None
    on_second_final: Optional[int] = None
    on_third_id: Optional[int] = None
    on_third_final: Optional[int] = None
    batter_final: Optional[int] = None

    # --- Per-play counting stats (all default to 0) -------------------------
    pa: int = 0
    ab: int = 0
    e_run: int = 0
    run: int = 0
    hit: int = 0
    rbi: int = 0
    double: int = 0
    triple: int = 0
    homerun: int = 0
    bb: int = 0
    so: int = 0
    hbp: int = 0
    sac: int = 0
    ibb: int = 0
    gidp: int = 0
    bphr: int = 0
    bpfo: int = 0
    bp1b: int = 0
    bplo: int = 0
    sb: int = 0
    cs: int = 0
    outs: int = 0
    wpa: float = 0

    # --- Other participants -------------------------------------------------
    catcher_id: Optional[int] = None
    catcher_team_id: Optional[int] = None
    defender_id: Optional[int] = None
    defender_team_id: Optional[int] = None
    runner_id: Optional[int] = None
    runner_team_id: Optional[int] = None
    check_pos: Optional[POS_LIST] = None

    # --- Miscellaneous events and flags -------------------------------------
    error: int = 0
    wild_pitch: int = 0
    passed_ball: int = 0
    pick_off: int = 0
    balk: int = 0
    is_go_ahead: bool = False
    is_tied: bool = False
    is_new_inning: bool = False
    hand_batting: Optional[str] = None
    hand_pitching: Optional[str] = None
    re24_primary: Optional[float] = None
    re24_running: Optional[float] = None

    # The validators below use values.get(): a field that failed its own
    # validation is missing from ``values``, and indexing it would raise
    # KeyError instead of surfacing the original validation error.

    @validator('on_first_final')
    def no_final_if_no_runner_one(cls, v, values):
        """Drop ``on_first_final`` when there is no runner on first."""
        if values.get('on_first_id') is None:
            return None
        return v

    @validator('on_second_final')
    def no_final_if_no_runner_two(cls, v, values):
        """Drop ``on_second_final`` when there is no runner on second."""
        if values.get('on_second_id') is None:
            return None
        return v

    @validator('on_third_final')
    def no_final_if_no_runner_three(cls, v, values):
        """Drop ``on_third_final`` when there is no runner on third."""
        if values.get('on_third_id') is None:
            return None
        return v

    @validator('batter_final')
    def no_final_if_no_batter(cls, v, values):
        """Drop ``batter_final`` when the play has no batter."""
        if values.get('batter_id') is None:
            return None
        return v
class PlayList(BaseModel):
    """Request wrapper carrying a batch of plays under the ``plays`` key."""

    plays: List[PlayModel]
@router.get('')
@handle_db_errors
@add_cache_headers(max_age=10*60)
@cache_result(ttl=5*60, key_prefix='plays')
async def get_plays(
        game_id: list = Query(default=None), batter_id: list = Query(default=None), season: list = Query(default=None),
        week: list = Query(default=None), has_defender: Optional[bool] = None, has_catcher: Optional[bool] = None,
        has_defender_or_catcher: Optional[bool] = None, is_scoring_play: Optional[bool] = None,
        pitcher_id: list = Query(default=None), obc: list = Query(default=None), inning: list = Query(default=None),
        batting_order: list = Query(default=None), starting_outs: list = Query(default=None),
        batter_pos: list = Query(default=None), catcher_id: list = Query(default=None),
        defender_id: list = Query(default=None), runner_id: list = Query(default=None),
        offense_team_id: list = Query(default=None), defense_team_id: list = Query(default=None),
        hit: Optional[int] = None, double: Optional[int] = None, triple: Optional[int] = None,
        homerun: Optional[int] = None, play_num: list = Query(default=None), error: list = Query(default=None),
        sb: Optional[int] = None, cs: Optional[int] = None, manager_id: list = Query(default=None),
        run: Optional[int] = None, e_run: Optional[int] = None, rbi: list = Query(default=None),
        outs: list = Query(default=None), wild_pitch: Optional[int] = None, is_final_out: Optional[bool] = None,
        is_go_ahead: Optional[bool] = None, is_tied: Optional[bool] = None, is_new_inning: Optional[bool] = None,
        min_wpa: Optional[float] = None, max_wpa: Optional[float] = None, pitcher_team_id: list = Query(default=None),
        short_output: Optional[bool] = False, sort: Optional[str] = None, limit: Optional[int] = 200,
        page_num: Optional[int] = 1, s_type: Literal['regular', 'post', 'total', None] = None):
    """Return individual plays matching the supplied filters.

    Every filter is optional and all supplied filters are AND-ed together.
    List parameters match with SQL ``IN``; scalar stat parameters match by
    equality; ``min_wpa`` / ``max_wpa`` bound ``StratPlay.wpa`` inclusively.

    ``sort`` accepts ``wpa-desc``, ``wpa-asc``, ``re24-desc``, ``re24-asc``
    (all ordered by absolute value), ``newest`` and ``oldest``; any other
    value leaves the query unordered.

    ``s_type`` of ``regular`` keeps games with ``week <= 18``, ``post`` keeps
    ``week > 18``; ``total`` applies no week restriction.

    Results are paginated: ``limit`` rows per page (minimum 1) starting at
    page ``page_num`` (minimum 1).

    Returns:
        ``{'count': <rows on this page>, 'plays': [<play dict>, ...]}``.
        Related models are expanded unless ``short_output`` is true.
    """
    all_plays = StratPlay.select()

    # --- Filters expressed through the parent game --------------------------
    if season is not None:
        s_games = StratGame.select().where(StratGame.season << season)
        all_plays = all_plays.where(StratPlay.game << s_games)
    if week is not None:
        w_games = StratGame.select().where(StratGame.week << week)
        all_plays = all_plays.where(StratPlay.game << w_games)
    if manager_id is not None:
        all_games = StratGame.select().where(
            (StratGame.away_manager_id << manager_id) | (StratGame.home_manager_id << manager_id)
        )
        all_plays = all_plays.where(StratPlay.game << all_games)
    if s_type is not None:
        season_games = StratGame.select()
        if s_type == 'regular':
            season_games = season_games.where(StratGame.week <= 18)
        elif s_type == 'post':
            season_games = season_games.where(StratGame.week > 18)
        all_plays = all_plays.where(StratPlay.game << season_games)

    # --- Presence / situation flags -------------------------------------------
    # NOTE(review): these five flags apply their filter whenever a value is
    # supplied; the value itself is not consulted, so False filters exactly
    # like True. Kept as-is to avoid changing results for existing callers.
    if has_defender is not None:
        all_plays = all_plays.where(StratPlay.defender.is_null(False))
    if has_catcher is not None:
        all_plays = all_plays.where(StratPlay.catcher.is_null(False))
    if has_defender_or_catcher is not None:
        all_plays = all_plays.where(
            (StratPlay.catcher.is_null(False)) | (StratPlay.defender.is_null(False))
        )
    if is_final_out is not None:
        all_plays = all_plays.where(StratPlay.starting_outs + StratPlay.outs == 3)
    if is_scoring_play is not None:
        # A final base of 4 means the runner (or batter) came around to score.
        all_plays = all_plays.where(
            (StratPlay.on_first_final == 4) | (StratPlay.on_second_final == 4) | (StratPlay.on_third_final == 4) |
            (StratPlay.batter_final == 4)
        )

    # --- List parameters: column IN (values) ----------------------------------
    membership_filters = (
        (StratPlay.game_id, game_id),
        (StratPlay.batter_id, batter_id),
        (StratPlay.pitcher_id, pitcher_id),
        (StratPlay.on_base_code, obc),
        (StratPlay.inning_num, inning),
        (StratPlay.batting_order, batting_order),
        (StratPlay.starting_outs, starting_outs),
        (StratPlay.batter_pos, batter_pos),
        (StratPlay.catcher_id, catcher_id),
        (StratPlay.defender_id, defender_id),
        (StratPlay.runner_id, runner_id),
        (StratPlay.rbi, rbi),
        (StratPlay.outs, outs),
        (StratPlay.error, error),
        (StratPlay.play_num, play_num),
    )
    for column, wanted in membership_filters:
        if wanted is not None:
            all_plays = all_plays.where(column << wanted)

    # --- Team filters ---------------------------------------------------------
    if pitcher_team_id is not None:
        all_teams = Team.select().where(Team.id << pitcher_team_id)
        all_plays = all_plays.where(StratPlay.pitcher_team << all_teams)
    if offense_team_id is not None:
        all_teams = Team.select().where(Team.id << offense_team_id)
        all_plays = all_plays.where(
            (StratPlay.batter_team << all_teams) | (StratPlay.runner_team << all_teams)
        )
    if defense_team_id is not None:
        all_teams = Team.select().where(Team.id << defense_team_id)
        all_plays = all_plays.where(
            (StratPlay.catcher_team << all_teams) | (StratPlay.defender_team << all_teams)
        )

    # --- Scalar parameters: column == value -----------------------------------
    equality_filters = (
        (StratPlay.hit, hit),
        (StratPlay.double, double),
        (StratPlay.triple, triple),
        (StratPlay.homerun, homerun),
        (StratPlay.sb, sb),
        (StratPlay.cs, cs),
        (StratPlay.wild_pitch, wild_pitch),
        (StratPlay.run, run),
        (StratPlay.e_run, e_run),
        (StratPlay.is_go_ahead, is_go_ahead),
        (StratPlay.is_tied, is_tied),
        (StratPlay.is_new_inning, is_new_inning),
    )
    for column, wanted in equality_filters:
        if wanted is not None:
            all_plays = all_plays.where(column == wanted)

    if min_wpa is not None:
        all_plays = all_plays.where(StratPlay.wpa >= min_wpa)
    if max_wpa is not None:
        all_plays = all_plays.where(StratPlay.wpa <= max_wpa)

    # --- Ordering ---------------------------------------------------------------
    sort_orders = {
        'wpa-desc': (-fn.ABS(StratPlay.wpa),),
        'wpa-asc': (fn.ABS(StratPlay.wpa),),
        're24-desc': (-fn.ABS(StratPlay.re24_primary),),
        're24-asc': (fn.ABS(StratPlay.re24_primary),),
        'newest': (StratPlay.game_id.desc(), StratPlay.play_num.desc()),
        'oldest': (StratPlay.game_id, StratPlay.play_num),
    }
    if sort in sort_orders:
        all_plays = all_plays.order_by(*sort_orders[sort])

    # --- Pagination -------------------------------------------------------------
    # Previously the paginated query was stored in an unused variable and only
    # .limit() was applied, so page_num had no effect. Page 1 is unchanged.
    if limit < 1:
        limit = 1
    if page_num < 1:
        page_num = 1
    all_plays = all_plays.paginate(page_num, limit)

    return_plays = {
        'count': all_plays.count(),
        'plays': [model_to_dict(x, recurse=not short_output) for x in all_plays]
    }
    db.close()
    return return_plays
def _batting_aggregate_fields():
    """Build the aggregate SELECT expressions shared by all batting-totals queries."""

    def stranded(final_field, alias, third_out=False):
        # Counts plays where the runner ended somewhere other than home
        # (final != 4); optionally only plays that recorded the third out.
        condition = final_field.is_null(False) & (final_field != 4)
        if third_out:
            condition = condition & (StratPlay.starting_outs + StratPlay.outs == 3)
        return fn.COUNT(final_field).filter(condition).alias(alias)

    def occupied(runner_field, alias):
        # Counts plays that started with a runner on the given base.
        return fn.COUNT(runner_field).filter(runner_field.is_null(False)).alias(alias)

    summed_columns = (
        (StratPlay.pa, 'sum_pa'), (StratPlay.ab, 'sum_ab'), (StratPlay.run, 'sum_run'),
        (StratPlay.hit, 'sum_hit'), (StratPlay.rbi, 'sum_rbi'), (StratPlay.double, 'sum_double'),
        (StratPlay.triple, 'sum_triple'), (StratPlay.homerun, 'sum_hr'), (StratPlay.bb, 'sum_bb'),
        (StratPlay.so, 'sum_so'), (StratPlay.hbp, 'sum_hbp'), (StratPlay.sac, 'sum_sac'),
        (StratPlay.ibb, 'sum_ibb'), (StratPlay.gidp, 'sum_gidp'), (StratPlay.sb, 'sum_sb'),
        (StratPlay.cs, 'sum_cs'), (StratPlay.bphr, 'sum_bphr'), (StratPlay.bpfo, 'sum_bpfo'),
        (StratPlay.bp1b, 'sum_bp1b'), (StratPlay.bplo, 'sum_bplo'), (StratPlay.wpa, 'sum_wpa'),
        (StratPlay.re24_primary, 'sum_repri'),
    )
    fields = [fn.SUM(column).alias(alias) for column, alias in summed_columns]
    fields += [
        stranded(StratPlay.on_first_final, 'count_lo1'),
        stranded(StratPlay.on_second_final, 'count_lo2'),
        stranded(StratPlay.on_third_final, 'count_lo3'),
        occupied(StratPlay.on_first, 'count_runner1'),
        occupied(StratPlay.on_second, 'count_runner2'),
        occupied(StratPlay.on_third, 'count_runner3'),
        stranded(StratPlay.on_first_final, 'count_lo1_3out', third_out=True),
        stranded(StratPlay.on_second_final, 'count_lo2_3out', third_out=True),
        stranded(StratPlay.on_third_final, 'count_lo3_3out', third_out=True),
    ]
    return fields


@router.get('/batting')
@handle_db_errors
@add_cache_headers(max_age=10*60)
@cache_result(ttl=5*60, key_prefix='plays-batting')
async def get_batting_totals(
        season: list = Query(default=None), week: list = Query(default=None),
        s_type: Literal['regular', 'post', 'total', None] = None, position: list = Query(default=None),
        player_id: list = Query(default=None), min_wpa: Optional[float] = -999, max_wpa: Optional[float] = 999,
        group_by: Literal['team', 'player', 'playerteam', 'playergame', 'teamgame', 'league', 'playerweek',
                          'teamweek'] = 'player',
        min_pa: Optional[int] = 1, team_id: list = Query(default=None), manager_id: list = Query(default=None),
        obc: list = Query(default=None), risp: Optional[bool] = None, inning: list = Query(default=None),
        sort: Optional[str] = None, limit: Optional[int] = 200, short_output: Optional[bool] = False,
        page_num: Optional[int] = 1, week_start: Optional[int] = None, week_end: Optional[int] = None,
        min_repri: Optional[int] = None):
    """Return aggregated batting lines grouped according to ``group_by``.

    Games are selected by ``season``, ``week`` (or ``week_start`` /
    ``week_end``), ``s_type`` (``regular``: week <= 18, ``post``: week > 18)
    and ``manager_id``. Plays are then narrowed by ``player_id``, ``team_id``,
    ``position``, ``obc``, ``risp`` and ``inning``; groups must reach
    ``min_pa`` plate appearances and, when given, ``min_repri`` summed RE24.

    Stolen-base, caught-stealing and running WPA figures come from a second
    query over plays where the grouped player/team appears as the runner.

    ``sort`` accepts ``player``, ``team``, ``wpa-desc/asc``,
    ``repri-desc/asc``, ``pa-desc/asc``, and ``newest`` / ``oldest`` (the last
    two only take effect for the per-game groupings).

    Raises:
        HTTPException: 400 when ``week`` is combined with ``s_type`` or with
            ``week_start`` / ``week_end``.

    Returns:
        ``{'count': <groups on this page>, 'stats': [<stat line>, ...]}``.
    """
    # --- Select the games in scope ------------------------------------------
    season_games = StratGame.select()
    if season is not None:
        season_games = season_games.where(StratGame.season << season)
    if week is not None and s_type is not None:
        raise HTTPException(status_code=400, detail='Week and s_type parameters cannot be used in the same query')
    if week is not None and (week_start is not None or week_end is not None):
        raise HTTPException(
            status_code=400, detail='Week and week_start/week_end parameters cannot be used in the same query')
    if week is not None:
        season_games = season_games.where(StratGame.week << week)
    if week_start is not None:
        season_games = season_games.where(StratGame.week >= week_start)
    if week_end is not None:
        season_games = season_games.where(StratGame.week <= week_end)
    if s_type == 'regular':
        season_games = season_games.where(StratGame.week <= 18)
    elif s_type == 'post':
        season_games = season_games.where(StratGame.week > 18)
    if manager_id is not None:
        season_games = season_games.where(
            (StratGame.away_manager_id << manager_id) | (StratGame.home_manager_id << manager_id)
        )

    player_groupings = ('player', 'playerteam', 'playergame', 'playerweek')
    team_groupings = ('team', 'playerteam', 'teamgame', 'teamweek')

    # --- Batting query --------------------------------------------------------
    # Per-game groupings select (and later group by) exactly these columns so
    # that SELECT and GROUP BY agree; other groupings add batter/batter_team
    # around the aggregates.
    game_select_fields = []
    if group_by == 'playergame':
        game_select_fields = [StratPlay.batter, StratPlay.game, StratPlay.batter_team]
    elif group_by == 'teamgame':
        game_select_fields = [StratPlay.batter_team, StratPlay.game]

    if game_select_fields:
        bat_select_fields = game_select_fields + _batting_aggregate_fields()
    else:
        bat_select_fields = _batting_aggregate_fields()
        if group_by in player_groupings:
            bat_select_fields.insert(0, StratPlay.batter)
        if group_by in team_groupings:
            bat_select_fields.append(StratPlay.batter_team)

    bat_plays = (
        StratPlay
        .select(*bat_select_fields)
        .where((StratPlay.game << season_games) & (StratPlay.batter.is_null(False)))
        .having(fn.SUM(StratPlay.pa) >= min_pa)
    )
    if min_repri is not None:
        bat_plays = bat_plays.having(fn.SUM(StratPlay.re24_primary) >= min_repri)

    # --- Baserunning query ------------------------------------------------------
    run_select_fields = [
        fn.SUM(StratPlay.sb).alias('sum_sb'),
        fn.SUM(StratPlay.cs).alias('sum_cs'), fn.SUM(StratPlay.pick_off).alias('sum_pick'),
        fn.SUM(StratPlay.wpa).alias('sum_wpa'), fn.SUM(StratPlay.re24_running).alias('sum_rerun')
    ]
    if group_by in player_groupings:
        run_select_fields.insert(0, StratPlay.runner)
    if group_by in team_groupings:
        run_select_fields.append(StratPlay.runner_team)
    run_plays = (
        StratPlay
        .select(*run_select_fields)
        .where((StratPlay.game << season_games) & (StratPlay.runner.is_null(False)))
    )

    # --- Play-level filters -------------------------------------------------------
    if player_id is not None:
        all_players = Player.select().where(Player.id << player_id)
        bat_plays = bat_plays.where(StratPlay.batter << all_players)
        run_plays = run_plays.where(StratPlay.runner << all_players)
    if team_id is not None:
        all_teams = Team.select().where(Team.id << team_id)
        bat_plays = bat_plays.where(StratPlay.batter_team << all_teams)
        run_plays = run_plays.where(StratPlay.runner_team << all_teams)
    if position is not None:
        bat_plays = bat_plays.where(StratPlay.batter_pos << position)
    if obc is not None:
        bat_plays = bat_plays.where(StratPlay.on_base_code << obc)
    if risp is not None:
        # NOTE(review): applied whenever risp is supplied, True or False alike.
        bat_plays = bat_plays.where(StratPlay.on_base_code << ['100', '101', '110', '111', '010', '011'])
    if inning is not None:
        bat_plays = bat_plays.where(StratPlay.inning_num << inning)

    # --- Grouping -----------------------------------------------------------------
    if group_by == 'player':
        bat_plays = bat_plays.group_by(StratPlay.batter)
        run_plays = run_plays.group_by(StratPlay.runner)
    elif group_by == 'team':
        bat_plays = bat_plays.group_by(StratPlay.batter_team)
        run_plays = run_plays.group_by(StratPlay.runner_team)
    elif group_by == 'playerteam':
        bat_plays = bat_plays.group_by(StratPlay.batter, StratPlay.batter_team)
        run_plays = run_plays.group_by(StratPlay.runner, StratPlay.runner_team)
    elif group_by == 'playergame':
        bat_plays = bat_plays.group_by(*game_select_fields)
        run_plays = run_plays.group_by(StratPlay.runner, StratPlay.game)
    elif group_by == 'teamgame':
        bat_plays = bat_plays.group_by(*game_select_fields)
        run_plays = run_plays.group_by(StratPlay.runner_team, StratPlay.game)
    elif group_by == 'league':
        bat_plays = bat_plays.join(StratGame).group_by(StratPlay.game.season)
        run_plays = run_plays.join(StratGame).group_by(StratPlay.game.season)
    elif group_by == 'playerweek':
        bat_plays = bat_plays.join(StratGame).group_by(StratPlay.batter, StratPlay.game.week)
        run_plays = run_plays.join(StratGame).group_by(StratPlay.runner, StratPlay.game.week)
    elif group_by == 'teamweek':
        bat_plays = bat_plays.join(StratGame).group_by(StratPlay.batter_team, StratPlay.game.week)
        run_plays = run_plays.join(StratGame).group_by(StratPlay.runner_team, StratPlay.game.week)

    # --- Ordering -----------------------------------------------------------------
    if sort == 'player':
        bat_plays = bat_plays.order_by(StratPlay.batter)
        run_plays = run_plays.order_by(StratPlay.runner)
    elif sort == 'team':
        bat_plays = bat_plays.order_by(StratPlay.batter_team)
        run_plays = run_plays.order_by(StratPlay.runner_team)
    elif sort == 'wpa-desc':
        bat_plays = bat_plays.order_by(SQL('sum_wpa').desc())
    elif sort == 'wpa-asc':
        bat_plays = bat_plays.order_by(SQL('sum_wpa').asc())
    elif sort == 'repri-desc':
        bat_plays = bat_plays.order_by(SQL('sum_repri').desc())
    elif sort == 'repri-asc':
        bat_plays = bat_plays.order_by(SQL('sum_repri').asc())
    elif sort == 'pa-desc':
        bat_plays = bat_plays.order_by(SQL('sum_pa').desc())
    elif sort == 'pa-asc':
        bat_plays = bat_plays.order_by(SQL('sum_pa').asc())
    elif sort in ('newest', 'oldest') and group_by in ('playergame', 'teamgame'):
        # Only the per-game groupings have StratPlay.game in GROUP BY, so only
        # they can be ordered by game; other groupings ignore newest/oldest.
        game_order = StratPlay.game.desc() if sort == 'newest' else StratPlay.game.asc()
        bat_plays = bat_plays.order_by(game_order)
        run_plays = run_plays.order_by(game_order)

    if limit < 1:
        limit = 1
    bat_plays = bat_plays.paginate(page_num, limit)

    logger.info(f'bat_plays query: {bat_plays}')
    logger.info(f'run_plays query: {run_plays}')

    return_stats = {
        'count': bat_plays.count(),
        'stats': []
    }

    for x in bat_plays:
        # Narrow the running totals to the same group as this batting row.
        # NOTE(review): for the week groupings StratPlay.game is not in the
        # SELECT list, so x.game may not be populated here - confirm.
        this_run = run_plays
        if group_by == 'player':
            this_run = this_run.where(StratPlay.runner == x.batter)
        elif group_by == 'team':
            this_run = this_run.where(StratPlay.batter_team == x.batter_team)
        elif group_by == 'playerteam':
            this_run = this_run.where((StratPlay.runner == x.batter) & (StratPlay.batter_team == x.batter_team))
        elif group_by == 'playergame':
            this_run = this_run.where((StratPlay.runner == x.batter) & (StratPlay.game == x.game))
        elif group_by == 'teamgame':
            this_run = this_run.where((StratPlay.batter_team == x.batter_team) & (StratPlay.game == x.game))
        elif group_by == 'playerweek':
            this_run = this_run.where((StratPlay.runner == x.batter) & (StratPlay.game.week == x.game.week))
        elif group_by == 'teamweek':
            this_run = this_run.where((StratPlay.batter_team == x.batter_team) & (StratPlay.game.week == x.game.week))

        if this_run.count() > 0:
            run_row = this_run[0]
            sum_sb = run_row.sum_sb
            sum_cs = run_row.sum_cs
            run_wpa = run_row.sum_wpa
        else:
            sum_sb = 0
            sum_cs = 0
            run_wpa = 0

        # Batting WPA restricted to plays inside the [min_wpa, max_wpa] window.
        # NOTE(review): both re-queries below narrow by batter only, not by
        # the team/game/week of the current row - confirm that is intended.
        this_wpa = bat_plays.where(
            (StratPlay.wpa >= min_wpa) & (StratPlay.wpa <= max_wpa) & (StratPlay.batter == x.batter)
        )
        sum_wpa = this_wpa[0].sum_wpa if this_wpa.count() > 0 else 0

        # Checks this_repri's own row count; it used to test this_wpa, which
        # zeroed RE24 whenever the WPA window excluded the batter.
        this_repri = bat_plays.where(StratPlay.batter == x.batter)
        sum_repri = this_repri[0].sum_repri if this_repri.count() > 0 else 0

        # Denominators are floored at 1 so empty groups (possible with
        # min_pa=0) produce 0.0 rates instead of ZeroDivisionError.
        tot_ab = x.sum_ab if x.sum_ab > 0 else 1
        tot_pa = x.sum_pa if x.sum_pa > 0 else 1
        singles = x.sum_hit - x.sum_double - x.sum_triple - x.sum_hr
        obp = (x.sum_hit + x.sum_bb + x.sum_hbp + x.sum_ibb) / tot_pa
        slg = (x.sum_hr * 4 + x.sum_triple * 3 + x.sum_double * 2 + singles) / tot_ab

        this_game = 'TOT'
        if group_by in ['playergame', 'teamgame']:
            this_game = x.game.id if short_output else model_to_dict(x.game, recurse=False)

        this_week = 'TOT'
        if group_by in ['playerweek', 'teamweek']:
            this_week = x.game.week

        this_player = 'TOT'
        if 'player' in group_by:
            this_player = x.batter_id if short_output else model_to_dict(x.batter, recurse=False)

        runners_on = x.count_runner1 + x.count_runner2 + x.count_runner3
        left_on_base = x.count_lo1 + x.count_lo2 + x.count_lo3
        lob_all_rate, rbi_rate = 0, 0
        if runners_on > 0:
            lob_all_rate = left_on_base / runners_on
            # Home runs are subtracted from RBI before dividing by runners on.
            rbi_rate = (x.sum_rbi - x.sum_hr) / runners_on

        # Team is reported as 'TOT' when the row carries no batter_team.
        if hasattr(x, 'batter_team') and x.batter_team is not None:
            team_info = x.batter_team_id if short_output else model_to_dict(x.batter_team, recurse=False)
        else:
            team_info = 'TOT'

        # re24_running (sum_rerun) is aggregated by run_plays but is not
        # currently part of the response.
        return_stats['stats'].append({
            'player': this_player,
            'team': team_info,
            'pa': x.sum_pa,
            'ab': x.sum_ab,
            'run': x.sum_run,
            'hit': x.sum_hit,
            'rbi': x.sum_rbi,
            'double': x.sum_double,
            'triple': x.sum_triple,
            'hr': x.sum_hr,
            'bb': x.sum_bb,
            'so': x.sum_so,
            'hbp': x.sum_hbp,
            'sac': x.sum_sac,
            'ibb': x.sum_ibb,
            'gidp': x.sum_gidp,
            'sb': sum_sb,
            'cs': sum_cs,
            'bphr': x.sum_bphr,
            'bpfo': x.sum_bpfo,
            'bp1b': x.sum_bp1b,
            'bplo': x.sum_bplo,
            'wpa': sum_wpa + run_wpa,
            'avg': x.sum_hit / tot_ab,
            'obp': obp,
            'slg': slg,
            'ops': obp + slg,
            'woba': (.69 * x.sum_bb + .72 * x.sum_hbp + .89 * singles +
                     1.27 * x.sum_double + 1.62 * x.sum_triple + 2.1 * x.sum_hr) / max(x.sum_pa - x.sum_ibb, 1),
            'game': this_game,
            'lob_all': left_on_base,
            'lob_all_rate': lob_all_rate,
            'lob_2outs': x.count_lo1_3out + x.count_lo2_3out + x.count_lo3_3out,
            'rbi%': rbi_rate,
            'week': this_week,
            're24_primary': sum_repri,
        })

    db.close()
    return return_stats
@router.get('/pitching')
@handle_db_errors
@add_cache_headers(max_age=10*60)
# Prefix names this endpoint; it previously reused the batting label 'plays-batting'.
@cache_result(ttl=5*60, key_prefix='plays-pitching')
async def get_pitching_totals(
        season: list = Query(default=None), week: list = Query(default=None),
        s_type: Literal['regular', 'post', 'total', None] = None, player_id: list = Query(default=None),
        group_by: Literal['team', 'player', 'playerteam', 'playergame', 'teamgame', 'league', 'playerweek',
                          'teamweek'] = 'player',
        min_pa: Optional[int] = 1, team_id: list = Query(default=None), manager_id: list = Query(default=None),
        obc: list = Query(default=None), risp: Optional[bool] = None, inning: list = Query(default=None),
        sort: Optional[str] = None, limit: Optional[int] = 200, short_output: Optional[bool] = False,
        csv: Optional[bool] = False, page_num: Optional[int] = 1, week_start: Optional[int] = None,
        week_end: Optional[int] = None):
    """Aggregate StratPlay rows into pitching totals, joined with Decision totals.

    Args:
        season, week, manager_id: narrow the set of games that plays are drawn from.
        s_type: 'regular' keeps weeks <= 18, 'post' keeps weeks > 18; any other value
            applies no week filter.
        week_start, week_end: inclusive week bounds.
        player_id, team_id: restrict plays to these pitchers / pitching teams.
        group_by: selects the SELECT/GROUP BY columns of the play query.
        min_pa: minimum summed plate appearances a group needs to be returned.
        sort: one of 'player', 'team', 'wpa-desc', 'wpa-asc', 'repri-desc', 'repri-asc',
            'pa-desc', 'pa-asc', 'newest', 'oldest'; other values leave the query unordered.
        limit, page_num: pagination; limits below 1 are raised to 1.
        short_output: report player/team/game as IDs instead of serialized rows.
        csv: return the stats list as a text/csv response.
        obc, risp, inning: accepted but not applied anywhere in this handler.

    Returns:
        ``{'count': int, 'stats': [...]}``, or a CSV ``Response`` when ``csv`` is set.

    Raises:
        HTTPException: 400 when ``week`` is combined with ``s_type`` or with
            ``week_start``/``week_end``.
    """
    season_games = StratGame.select()
    if season is not None:
        season_games = season_games.where(StratGame.season << season)
    if week is not None and s_type is not None:
        raise HTTPException(status_code=400, detail='Week and s_type parameters cannot be used in the same query')
    if week is not None and (week_start is not None or week_end is not None):
        raise HTTPException(
            status_code=400, detail='Week and week_start/week_end parameters cannot be used in the same query')
    if week is not None:
        season_games = season_games.where(StratGame.week << week)
    if week_start is not None:
        season_games = season_games.where(StratGame.week >= week_start)
    if week_end is not None:
        season_games = season_games.where(StratGame.week <= week_end)
    if s_type == 'regular':
        season_games = season_games.where(StratGame.week <= 18)
    elif s_type == 'post':
        season_games = season_games.where(StratGame.week > 18)
    if manager_id is not None:
        season_games = season_games.where(
            (StratGame.away_manager_id << manager_id) | (StratGame.home_manager_id << manager_id)
        )

    # SELECT columns double as the GROUP BY columns, so they must match exactly.
    # The week groupings select the game column, i.e. they produce one row per game.
    # NOTE(review): 'league' has no entry and falls back to grouping by pitcher while each
    # row reports player/team as 'TOT' - confirm whether a single league-wide row was intended.
    fields_by_grouping = {
        'player': [StratPlay.pitcher],
        'team': [StratPlay.pitcher_team],
        'playerteam': [StratPlay.pitcher, StratPlay.pitcher_team],
        'playergame': [StratPlay.pitcher, StratPlay.game],
        'teamgame': [StratPlay.pitcher_team, StratPlay.game],
        'playerweek': [StratPlay.pitcher, StratPlay.game],
        'teamweek': [StratPlay.pitcher_team, StratPlay.game],
    }
    pitch_select_fields = fields_by_grouping.get(group_by, [StratPlay.pitcher])

    by_player = 'player' in group_by
    by_team = 'team' in group_by
    by_game = 'game' in group_by
    by_week = group_by in ('playerweek', 'teamweek')

    pitch_plays = (
        StratPlay
        .select(*pitch_select_fields,
                fn.SUM(StratPlay.pa).alias('sum_pa'),
                fn.SUM(StratPlay.ab).alias('sum_ab'),
                fn.SUM(StratPlay.run).alias('sum_run'),
                fn.SUM(StratPlay.hit).alias('sum_hit'),
                fn.SUM(StratPlay.rbi).alias('sum_rbi'),
                fn.SUM(StratPlay.double).alias('sum_double'),
                fn.SUM(StratPlay.triple).alias('sum_triple'),
                fn.SUM(StratPlay.homerun).alias('sum_hr'),
                fn.SUM(StratPlay.bb).alias('sum_bb'),
                fn.SUM(StratPlay.so).alias('sum_so'),
                fn.SUM(StratPlay.wpa).alias('sum_wpa'),
                fn.SUM(StratPlay.hbp).alias('sum_hbp'),
                fn.SUM(StratPlay.sac).alias('sum_sac'),
                fn.SUM(StratPlay.ibb).alias('sum_ibb'),
                fn.SUM(StratPlay.gidp).alias('sum_gidp'),
                fn.SUM(StratPlay.sb).alias('sum_sb'),
                fn.SUM(StratPlay.cs).alias('sum_cs'),
                fn.SUM(StratPlay.bphr).alias('sum_bphr'),
                fn.SUM(StratPlay.bpfo).alias('sum_bpfo'),
                fn.SUM(StratPlay.bp1b).alias('sum_bp1b'),
                fn.SUM(StratPlay.bplo).alias('sum_bplo'),
                fn.SUM(StratPlay.wild_pitch).alias('sum_wp'),
                fn.SUM(StratPlay.balk).alias('sum_balk'),
                fn.SUM(StratPlay.outs).alias('sum_outs'),
                fn.SUM(StratPlay.e_run).alias('sum_erun'),
                fn.SUM(StratPlay.re24_primary).alias('sum_repri'))
        .where((StratPlay.game << season_games) & (StratPlay.pitcher.is_null(False)))
        .having(fn.SUM(StratPlay.pa) >= min_pa)
    )

    if player_id is not None:
        pitch_plays = pitch_plays.where(StratPlay.pitcher << player_id)
    if team_id is not None:
        pitch_plays = pitch_plays.where(StratPlay.pitcher_team << team_id)

    if pitch_select_fields:
        pitch_plays = pitch_plays.group_by(*pitch_select_fields)

    # NOTE(review): the wpa/repri orderings use the raw sums, but the response reports
    # those sums negated, so 'wpa-desc' lists the lowest reported wpa first - confirm.
    simple_orderings = {
        'player': StratPlay.pitcher,
        'team': StratPlay.pitcher_team,
        'wpa-desc': SQL('sum_wpa').desc(),
        'wpa-asc': SQL('sum_wpa').asc(),
        'repri-desc': SQL('sum_repri').desc(),
        'repri-asc': SQL('sum_repri').asc(),
        'pa-desc': SQL('sum_pa').desc(),
        'pa-asc': SQL('sum_pa').asc(),
    }
    if sort in simple_orderings:
        pitch_plays = pitch_plays.order_by(simple_orderings[sort])
    elif sort in ('newest', 'oldest') and group_by in ('playergame', 'teamgame'):
        # Game ordering is only applied to the explicit per-game groupings.
        game_order = StratPlay.game.desc() if sort == 'newest' else StratPlay.game.asc()
        pitch_plays = pitch_plays.order_by(game_order)

    if limit < 1:
        limit = 1
    pitch_plays = pitch_plays.paginate(page_num, limit)

    decision_keys = ('sum_win', 'sum_loss', 'sum_hold', 'sum_save', 'sum_b_save',
                     'sum_irunners', 'sum_irun_scored', 'sum_gs', 'sum_game')
    return_stats = {
        'count': 0,
        'stats': []
    }

    for x in pitch_plays:
        # Denominators are floored so groups with no outs / PA / AB cannot divide by zero.
        tot_outs = x.sum_outs if x.sum_outs > 0 else 1
        singles = x.sum_hit - x.sum_double - x.sum_triple - x.sum_hr
        obp = (x.sum_hit + x.sum_bb + x.sum_hbp + x.sum_ibb) / max(x.sum_pa, 1)
        slg = (x.sum_hr * 4 + x.sum_triple * 3 + x.sum_double * 2 + singles) / max(x.sum_ab, 1)
        tot_bb = 0.1 if x.sum_bb == 0 else x.sum_bb

        this_player = 'TOT'
        if by_player:
            try:
                this_player = x.pitcher_id if short_output else model_to_dict(x.pitcher, recurse=False)
            except Exception as e:
                # Best effort: keep the row with a 'TOT' player, but record why.
                logger.error(f'Error extracting pitcher from query: {e}\n\nx: {x}', stack_info=True)

        pitcher_team_obj = getattr(x, 'pitcher_team', None) if by_team else None
        team_info = 'TOT'
        if pitcher_team_obj:
            team_info = pitcher_team_obj.id if short_output else model_to_dict(pitcher_team_obj, recurse=False)

        this_game = 'TOT'
        if by_game:
            this_game = x.game_id if short_output else model_to_dict(x.game, recurse=False)

        this_week = 'TOT'
        if by_week:
            game_obj = getattr(x, 'game', None)
            this_week = game_obj.week if game_obj else 'TOT'

        # Decision totals for the same slice this play row covers.
        decision_query = Decision.select(
            fn.SUM(Decision.win).alias('sum_win'),
            fn.SUM(Decision.loss).alias('sum_loss'),
            fn.SUM(Decision.hold).alias('sum_hold'),
            fn.SUM(Decision.is_save).alias('sum_save'),
            fn.SUM(Decision.b_save).alias('sum_b_save'),
            fn.SUM(Decision.irunners).alias('sum_irunners'),
            fn.SUM(Decision.irunners_scored).alias('sum_irun_scored'),
            fn.SUM(Decision.is_start.cast('integer')).alias('sum_gs'),
            fn.COUNT(Decision.game_id).alias('sum_game')
        ).where(Decision.game << season_games)

        if by_player:
            decision_query = decision_query.where(Decision.pitcher == x.pitcher_id)
        if pitcher_team_obj:
            decision_query = decision_query.where(Decision.team == pitcher_team_obj.id)
        if by_game or by_week:
            # Week groupings are grouped on the game column too, so each row is one game;
            # without this filter every such row repeated the totals across all games.
            decision_query = decision_query.where(Decision.game == x.game_id)

        try:
            decision_result = decision_query.get()
            decision_data = {key: getattr(decision_result, key) or 0 for key in decision_keys}
        except Decision.DoesNotExist:
            decision_data = dict.fromkeys(decision_keys, 0)

        return_stats['stats'].append({
            'player': this_player,
            'team': team_info,
            'tbf': x.sum_pa,
            'outs': x.sum_outs,
            'games': decision_data['sum_game'],
            'gs': decision_data['sum_gs'],
            'win': decision_data['sum_win'],
            'loss': decision_data['sum_loss'],
            'hold': decision_data['sum_hold'],
            'save': decision_data['sum_save'],
            'bsave': decision_data['sum_b_save'],
            'ir': decision_data['sum_irunners'],
            'ir_sc': decision_data['sum_irun_scored'],
            'ab': x.sum_ab,
            'run': x.sum_run,
            'e_run': x.sum_erun,
            'hits': x.sum_hit,
            'double': x.sum_double,
            'triple': x.sum_triple,
            'hr': x.sum_hr,
            'bb': x.sum_bb,
            'so': x.sum_so,
            'hbp': x.sum_hbp,
            'sac': x.sum_sac,
            'ibb': x.sum_ibb,
            'gidp': x.sum_gidp,
            'sb': x.sum_sb,
            'cs': x.sum_cs,
            'bphr': x.sum_bphr,
            'bpfo': x.sum_bpfo,
            'bp1b': x.sum_bp1b,
            'bplo': x.sum_bplo,
            'wp': x.sum_wp,
            'balk': x.sum_balk,
            'wpa': x.sum_wpa * -1,
            'era': (x.sum_erun * 27) / tot_outs,
            'whip': ((x.sum_bb + x.sum_hit + x.sum_ibb) * 3) / tot_outs,
            'avg': x.sum_hit / max(x.sum_ab, 1),
            'obp': obp,
            'slg': slg,
            'ops': obp + slg,
            'woba': (.69 * x.sum_bb + .72 * x.sum_hbp + .89 * singles +
                     1.27 * x.sum_double + 1.62 * x.sum_triple + 2.1 * x.sum_hr) / max(x.sum_pa - x.sum_ibb, 1),
            'k/9': x.sum_so * 9 / (tot_outs / 3),
            'bb/9': x.sum_bb * 9 / (tot_outs / 3),
            'k/bb': x.sum_so / tot_bb,
            'game': this_game,
            'lob_2outs': 0,  # Not available in current implementation
            'rbi%': 0,  # Not available in current implementation
            'week': this_week,
            're24_primary': x.sum_repri * -1 if x.sum_repri is not None else None
        })

    return_stats['count'] = len(return_stats['stats'])
    db.close()
    if csv:
        return Response(content=complex_data_to_csv(return_stats['stats']), media_type='text/csv')
    return return_stats
@router.get('/fielding')
@handle_db_errors
@add_cache_headers(max_age=10*60)
@cache_result(ttl=5*60, key_prefix='plays-fielding')
async def get_fielding_totals(
season: list = Query(default=None), week: list = Query(default=None),
s_type: Literal['regular', 'post', 'total', None] = None, position: list = Query(default=None),
player_id: list = Query(default=None),
group_by: Literal['team', 'player', 'playerteam', 'playerposition', 'teamposition', 'playerpositiongame',
'playergame', 'playerteamposition', 'playerweek', 'teamweek'] = 'player',
week_start: Optional[int] = None, week_end: Optional[int] = None,
min_ch: Optional[int] = 1, team_id: list = Query(default=None), manager_id: list = Query(default=None),
sort: Optional[str] = None, limit: Optional[int] = 200, short_output: Optional[bool] = False,
page_num: Optional[int] = 1):
# Aggregates StratPlay rows into fielding totals.
#
# Two aggregate queries are built over the same set of games:
#   * def_plays - rows with a defender; sums error, hit, pa (reported as chances) and wpa.
#   * cat_plays - rows with a catcher; sums sb, cs, wpa, passed_ball and error.
# Each defender row is then matched with a catcher row and emitted as one stats entry.
#
# Returns {'count': ..., 'stats': [...]}.
# Raises HTTPException 400 when `week` is combined with `s_type` or with
# `week_start`/`week_end`.
#
# NOTE(review): this view of the file has no indentation, so block nesting is read from
# the statements alone; the review notes below flag the spots where that matters.

# --- Select the games that plays are drawn from ---
season_games = StratGame.select()
if season is not None:
season_games = season_games.where(StratGame.season << season)
if week is not None and s_type is not None:
raise HTTPException(status_code=400, detail=f'Week and s_type parameters cannot be used in the same query')
if week is not None and (week_start is not None or week_end is not None):
raise HTTPException(
status_code=400, detail=f'Week and week_start/week_end parameters cannot be used in the same query')
if week is not None:
season_games = season_games.where(StratGame.week << week)
if week_start is not None:
season_games = season_games.where(StratGame.week >= week_start)
if week_end is not None:
season_games = season_games.where(StratGame.week <= week_end)
# 'regular' keeps weeks <= 18 and 'post' keeps weeks > 18; 'total' adds no week filter.
if s_type is not None:
if s_type == 'regular':
season_games = season_games.where(StratGame.week <= 18)
elif s_type == 'post':
season_games = season_games.where(StratGame.week > 18)
# A game matches when any listed manager is on either side of it.
if manager_id is not None:
season_games = season_games.where(
(StratGame.away_manager_id << manager_id) | (StratGame.home_manager_id << manager_id)
)
# Build SELECT fields conditionally based on group_by for fielding to match GROUP BY exactly
# The catcher field lists never include check_pos, so position groupings collapse to
# the plain player/team columns on the catcher side.
def_select_fields = []
cat_select_fields = []
if group_by == 'player':
def_select_fields = [StratPlay.defender]
cat_select_fields = [StratPlay.catcher]
elif group_by == 'team':
def_select_fields = [StratPlay.defender_team]
cat_select_fields = [StratPlay.catcher_team]
elif group_by == 'playerteam':
def_select_fields = [StratPlay.defender, StratPlay.defender_team]
cat_select_fields = [StratPlay.catcher, StratPlay.catcher_team]
elif group_by == 'playerposition':
def_select_fields = [StratPlay.defender, StratPlay.check_pos]
cat_select_fields = [StratPlay.catcher]
elif group_by == 'teamposition':
def_select_fields = [StratPlay.defender_team, StratPlay.check_pos]
cat_select_fields = [StratPlay.catcher_team]
elif group_by == 'playergame':
def_select_fields = [StratPlay.defender, StratPlay.game]
cat_select_fields = [StratPlay.catcher, StratPlay.game]
elif group_by == 'playerpositiongame':
def_select_fields = [StratPlay.defender, StratPlay.check_pos, StratPlay.game]
cat_select_fields = [StratPlay.catcher, StratPlay.game]
elif group_by == 'playerteamposition':
def_select_fields = [StratPlay.defender, StratPlay.defender_team, StratPlay.check_pos]
cat_select_fields = [StratPlay.catcher, StratPlay.catcher_team]
elif group_by == 'playerweek':
def_select_fields = [StratPlay.defender, StratPlay.game]
cat_select_fields = [StratPlay.catcher, StratPlay.game]
elif group_by == 'teamweek':
def_select_fields = [StratPlay.defender_team, StratPlay.game]
cat_select_fields = [StratPlay.catcher_team, StratPlay.game]
else:
# Default case
def_select_fields = [StratPlay.defender, StratPlay.defender_team]
cat_select_fields = [StratPlay.catcher, StratPlay.catcher_team]
# Ensure def_select_fields is not empty
# (every branch above assigns a non-empty list, so this is a safety net only)
if not def_select_fields:
def_select_fields = [StratPlay.defender, StratPlay.defender_team, StratPlay.check_pos]
# Defender aggregates; groups whose summed pa is below min_ch are dropped by HAVING.
def_plays = (
StratPlay
.select(*def_select_fields,
fn.SUM(StratPlay.error).alias('sum_error'),
fn.SUM(StratPlay.hit).alias('sum_hit'), fn.SUM(StratPlay.pa).alias('sum_chances'),
fn.SUM(StratPlay.wpa).alias('sum_wpa'))
.where((StratPlay.game << season_games) & (StratPlay.defender.is_null(False)))
.having(fn.SUM(StratPlay.pa) >= min_ch)
)
# Ensure cat_select_fields is not empty
if not cat_select_fields:
cat_select_fields = [StratPlay.catcher, StratPlay.catcher_team]
# Catcher aggregates; unlike def_plays this query has no HAVING and is never
# sorted or paginated below.
cat_plays = (
StratPlay
.select(*cat_select_fields, fn.SUM(StratPlay.sb).alias('sum_sb'),
fn.SUM(StratPlay.cs).alias('sum_cs'), fn.SUM(StratPlay.wpa).alias('sum_wpa'),
fn.SUM(StratPlay.passed_ball).alias('sum_pb'), fn.SUM(StratPlay.error).alias('sum_error'))
.where((StratPlay.game << season_games) & (StratPlay.catcher.is_null(False)))
)
# --- Optional filters; player and team filters apply to both queries ---
if player_id is not None:
all_players = Player.select().where(Player.id << player_id)
def_plays = def_plays.where(StratPlay.defender << all_players)
cat_plays = cat_plays.where(StratPlay.catcher << all_players)
if team_id is not None:
all_teams = Team.select().where(Team.id << team_id)
def_plays = def_plays.where(StratPlay.defender_team << all_teams)
cat_plays = cat_plays.where(StratPlay.catcher_team << all_teams)
# The position filter only applies to the defender query.
if position is not None:
def_plays = def_plays.where(StratPlay.check_pos << position)
# --- GROUP BY, mirroring the select-field branches above ---
if group_by is not None:
if group_by == 'player':
def_plays = def_plays.group_by(StratPlay.defender)
cat_plays = cat_plays.group_by(StratPlay.catcher)
elif group_by == 'team':
def_plays = def_plays.group_by(StratPlay.defender_team)
cat_plays = cat_plays.group_by(StratPlay.catcher_team)
elif group_by == 'playerteam':
def_plays = def_plays.group_by(StratPlay.defender, StratPlay.defender_team)
cat_plays = cat_plays.group_by(StratPlay.catcher, StratPlay.catcher_team)
elif group_by == 'playerposition':
def_plays = def_plays.group_by(StratPlay.defender, StratPlay.check_pos)
cat_plays = cat_plays.group_by(StratPlay.catcher)
elif group_by == 'teamposition':
def_plays = def_plays.group_by(StratPlay.defender_team, StratPlay.check_pos)
cat_plays = cat_plays.group_by(StratPlay.catcher_team)
elif group_by == 'playergame':
def_plays = def_plays.group_by(StratPlay.defender, StratPlay.game)
cat_plays = cat_plays.group_by(StratPlay.catcher, StratPlay.game)
elif group_by == 'playerpositiongame':
def_plays = def_plays.group_by(StratPlay.defender, StratPlay.check_pos, StratPlay.game)
cat_plays = cat_plays.group_by(StratPlay.catcher, StratPlay.game)
elif group_by == 'playerteamposition':
def_plays = def_plays.group_by(StratPlay.defender, StratPlay.defender_team, StratPlay.check_pos)
cat_plays = cat_plays.group_by(StratPlay.catcher, StratPlay.catcher_team)
# NOTE(review): the two week groupings SELECT StratPlay.game but GROUP BY
# StratPlay.game.week, so here the selected and grouped columns differ - confirm the
# database in use accepts this.
elif group_by == 'playerweek':
def_plays = def_plays.join(StratGame)
def_plays = def_plays.group_by(StratPlay.defender, StratPlay.game.week)
cat_plays = cat_plays.join(StratGame)
cat_plays = cat_plays.group_by(StratPlay.catcher, StratPlay.game.week)
elif group_by == 'teamweek':
def_plays = def_plays.join(StratGame)
def_plays = def_plays.group_by(StratPlay.defender_team, StratPlay.game.week)
cat_plays = cat_plays.join(StratGame)
cat_plays = cat_plays.group_by(StratPlay.catcher_team, StratPlay.game.week)
# --- Sorting (defender query only); unrecognised sort values leave the order unset ---
if sort is not None:
if sort == 'player':
def_plays = def_plays.order_by(StratPlay.defender)
elif sort == 'team':
def_plays = def_plays.order_by(StratPlay.defender_team)
# The response reports wpa multiplied by -1, so 'wpa-desc' orders the raw sum
# ascending and 'wpa-asc' orders it descending - presumably deliberate.
elif sort == 'wpa-desc':
def_plays = def_plays.order_by(SQL('sum_wpa').asc())
elif sort == 'wpa-asc':
def_plays = def_plays.order_by(SQL('sum_wpa').desc())
elif sort == 'ch-desc':
def_plays = def_plays.order_by(SQL('sum_chances').desc())
elif sort == 'ch-asc':
def_plays = def_plays.order_by(SQL('sum_chances').asc())
elif sort == 'newest':
# For grouped queries, only sort by fields in GROUP BY clause
if group_by in ['playergame', 'playerpositiongame', 'playerweek', 'teamweek']:
# StratPlay.game is in GROUP BY for these cases
def_plays = def_plays.order_by(StratPlay.game.desc())
# For other group_by values, skip game_id/play_num sorting since they're not in GROUP BY
elif sort == 'oldest':
# For grouped queries, only sort by fields in GROUP BY clause
if group_by in ['playergame', 'playerpositiongame', 'playerweek', 'teamweek']:
# StratPlay.game is in GROUP BY for these cases
def_plays = def_plays.order_by(StratPlay.game.asc())
# For other group_by values, skip game_id/play_num sorting since they're not in GROUP BY
# Limits below 1 are raised to 1 before paginating.
if limit < 1:
limit = 1
def_plays = def_plays.paginate(page_num, limit)
logger.info(f'def_plays query: {def_plays}')
# 'count' is taken from the already-paginated defender query.
return_stats = {
'count': def_plays.count(),
'stats': []
}
for x in def_plays:
logger.info(f'this_play: {x}')
# this_cat = cat_plays.where(StratPlay.catcher == x.defender)
# if this_cat.count() > 0:
# sum_sb = this_cat[0].sum_sb
# sum_cs = this_cat[0].sum_cs
# sum_wpa = this_cat[0].sum_wpa
# sum_pb = this_cat[0].sum_pb
# sum_error = this_cat[0].sum_error + x.sum_error
# else:
# sum_sb = 0
# sum_cs = 0
# sum_wpa = 0
# sum_pb = 0
# sum_error = x.sum_error
this_pos = 'TOT'
if 'position' in group_by:
this_pos = x.check_pos
# Narrow the catcher aggregate to the same player/team/game slice as this defender row.
# Each iteration issues its own catcher queries (count() plus the indexed reads below).
this_cat = cat_plays
if group_by in ['player', 'playerposition']:
this_cat = this_cat.where(StratPlay.catcher == x.defender)
elif group_by in ['team', 'teamposition']:
this_cat = this_cat.where(StratPlay.catcher_team == x.defender_team)
elif group_by in ['playerteam', 'playerteamposition']:
this_cat = this_cat.where((StratPlay.catcher == x.defender) & (StratPlay.catcher_team == x.defender_team))
elif group_by in ['playergame', 'playerpositiongame']:
this_cat = this_cat.where((StratPlay.catcher == x.defender) & (StratPlay.game == x.game))
elif group_by == 'playerweek':
this_cat = this_cat.where((StratPlay.catcher == x.defender) & (StratPlay.game.week == x.game.week))
elif group_by == 'teamweek':
this_cat = this_cat.where((StratPlay.catcher_team == x.defender_team) & (StratPlay.game.week == x.game.week))
# NOTE(review): whether the next filter belongs to the 'teamweek' branch or runs for
# every group_by cannot be told from this flattened view. If it runs unconditionally,
# x.game is read even for groupings that never selected the game column - confirm.
this_cat = this_cat.where(StratPlay.game == x.game)
# Fold the catcher totals into this row; only the first matching catcher row is used.
if this_cat.count() > 0:
sum_sb = this_cat[0].sum_sb
sum_cs = this_cat[0].sum_cs
sum_wpa = this_cat[0].sum_wpa
sum_pb = this_cat[0].sum_pb
sum_error = this_cat[0].sum_error + x.sum_error
else:
sum_sb = 0
sum_cs = 0
sum_wpa = 0
sum_pb = 0
sum_error = x.sum_error
# Dimensions not part of the grouping are reported as the literal 'TOT'.
this_player = 'TOT'
if 'player' in group_by:
this_player = x.defender_id if short_output else model_to_dict(x.defender, recurse=False)
this_game = 'TOT'
if 'game' in group_by:
this_game = x.game_id if short_output else model_to_dict(x.game, recurse=False)
this_week = 'TOT'
if group_by in ['playerweek', 'teamweek']:
game_obj = getattr(x, 'game', None)
this_week = game_obj.week if game_obj else 'TOT'
# Handle team field based on grouping with safe access
defender_team_obj = getattr(x, 'defender_team', None)
team_info = 'TOT'
if defender_team_obj:
team_info = defender_team_obj.id if short_output else model_to_dict(defender_team_obj, recurse=False)
# 'error' is defender plus catcher errors, while 'wf%' uses the defender-only
# x.sum_error: (chances - 0.5 * errors - 0.75 * hits) / chances.
# NOTE(review): 'wf%' divides by x.sum_chances, which can be 0 when min_ch is 0.
# 'cs%' is None when there were no stolen-base attempts.
return_stats['stats'].append({
'player': this_player,
'team': team_info,
'pos': this_pos,
'x-ch': x.sum_chances,
'hit': x.sum_hit,
'error': sum_error,
'sb-ch': sum_sb + sum_cs,
'sb': sum_sb,
'cs': sum_cs,
'pb': sum_pb,
'wpa': (x.sum_wpa + sum_wpa) * -1,
'wf%': (x.sum_chances - (x.sum_error * .5) - (x.sum_hit * .75)) / x.sum_chances,
'cs%': sum_cs / (sum_sb + sum_cs) if (sum_sb + sum_cs) > 0 else None,
'game': this_game,
'week': this_week
})
db.close()
return return_stats
@router.get('/{play_id}')
@handle_db_errors
async def get_one_play(play_id: int):
    """Return the play with ``play_id`` serialized as a dict.

    Raises:
        HTTPException: 404 when no play has that ID.
    """
    # A single lookup serves both the existence check and the response, so the row
    # cannot disappear between the check and the read.
    this_play = StratPlay.get_or_none(StratPlay.id == play_id)
    if this_play is None:
        db.close()
        raise HTTPException(status_code=404, detail=f'Play ID {play_id} not found')

    r_play = model_to_dict(this_play)
    db.close()
    return r_play
@router.patch('/{play_id}', include_in_schema=PRIVATE_IN_SCHEMA)
@handle_db_errors
async def patch_play(play_id: int, new_play: PlayModel, token: str = Depends(oauth2_scheme)):
    """Overwrite the play ``play_id`` with every field of ``new_play``.

    Returns the updated play serialized as a dict.

    Raises:
        HTTPException: 401 when the token is rejected, 404 when the play is unknown.
    """
    token_ok = valid_token(token)
    if not token_ok:
        logger.warning(f'patch_play - Bad Token: {token}')
        raise HTTPException(status_code=401, detail='Unauthorized')

    existing_play = StratPlay.get_or_none(StratPlay.id == play_id)
    if existing_play is None:
        db.close()
        raise HTTPException(status_code=404, detail=f'Play ID {play_id} not found')

    # Every model field is written, including those left at their defaults.
    field_values = new_play.dict()
    update_query = StratPlay.update(**field_values).where(StratPlay.id == play_id)
    update_query.execute()

    # Re-read the row so the response reflects what is now stored.
    refreshed_play = StratPlay.get_by_id(play_id)
    r_play = model_to_dict(refreshed_play)
    db.close()
    return r_play
@router.post('', include_in_schema=PRIVATE_IN_SCHEMA)
@handle_db_errors
async def post_plays(p_list: PlayList, token: str = Depends(oauth2_scheme)):
    """Bulk-insert plays, filling in team IDs that can be derived from the game.

    The game is looked up from the first play only, and its home/away teams are used
    for every play in the payload.
    NOTE(review): a payload mixing several games would get the first game's team IDs -
    confirm callers always post one game at a time.

    Returns:
        A message with the number of plays submitted for insert (conflicting rows are
        ignored by the insert itself).

    Raises:
        HTTPException: 401 when the token is rejected, 400 when the payload holds no
            plays, 404 when the first play's game is unknown.
    """
    if not valid_token(token):
        logger.warning(f'post_plays - Bad Token: {token}')
        raise HTTPException(status_code=401, detail='Unauthorized')

    # Guard the plays[0] access below; an empty payload used to raise IndexError.
    if not p_list.plays:
        raise HTTPException(status_code=400, detail='No plays were provided')

    game_id = p_list.plays[0].game_id
    this_game = StratGame.get_or_none(StratGame.id == game_id)
    if this_game is None:
        db.close()
        raise HTTPException(status_code=404, detail=f'Game ID {game_id} not found')

    new_plays = []
    for this_play in p_list.plays:
        this_play.inning_half = this_play.inning_half.lower()
        top_half = this_play.inning_half == 'top'

        # Top of the inning: away team bats, home team is in the field.
        if this_play.batter_team_id is None and this_play.batter_id is not None:
            this_play.batter_team_id = this_game.away_team.id if top_half else this_game.home_team.id
        if this_play.pitcher_team_id is None:
            this_play.pitcher_team_id = this_game.home_team.id if top_half else this_game.away_team.id
        # Catcher, defender and runner team IDs are always recomputed when the player is set.
        if this_play.catcher_id is not None:
            this_play.catcher_team_id = this_game.home_team.id if top_half else this_game.away_team.id
        if this_play.defender_id is not None:
            this_play.defender_team_id = this_game.home_team.id if top_half else this_game.away_team.id
        if this_play.runner_id is not None:
            this_play.runner_team_id = this_game.away_team.id if top_half else this_game.home_team.id

        # No plate appearance means the batter has no final base to record.
        if this_play.pa == 0:
            this_play.batter_final = None

        new_plays.append(this_play.dict())

    with db.atomic():
        for batch in chunked(new_plays, 20):
            StratPlay.insert_many(batch).on_conflict_ignore().execute()
    db.close()

    return f'Inserted {len(new_plays)} plays'
@router.delete('/{play_id}', include_in_schema=PRIVATE_IN_SCHEMA)
@handle_db_errors
async def delete_play(play_id: int, token: str = Depends(oauth2_scheme)):
    """Delete a single play by ID and return a confirmation message.

    Raises:
        HTTPException: 401 when the token is rejected, 404 when the play is unknown,
            500 when the delete does not report exactly one removed row.
    """
    if not valid_token(token):
        logger.warning(f'delete_play - Bad Token: {token}')
        raise HTTPException(status_code=401, detail='Unauthorized')

    doomed_play = StratPlay.get_or_none(StratPlay.id == play_id)
    if not doomed_play:
        db.close()
        raise HTTPException(status_code=404, detail=f'Play ID {play_id} not found')

    rows_removed = doomed_play.delete_instance()
    db.close()

    if rows_removed != 1:
        raise HTTPException(status_code=500, detail=f'Play {play_id} could not be deleted')
    return f'Play {play_id} has been deleted'
@router.delete('/game/{game_id}', include_in_schema=PRIVATE_IN_SCHEMA)
@handle_db_errors
async def delete_plays_game(game_id: int, token: str = Depends(oauth2_scheme)):
    """Delete every play belonging to ``game_id`` and report how many were removed.

    Raises:
        HTTPException: 401 when the token is rejected, 404 when the game is unknown,
            500 when the delete removed no rows.
    """
    if not valid_token(token):
        logger.warning(f'delete_plays_game - Bad Token: {token}')
        raise HTTPException(status_code=401, detail='Unauthorized')

    target_game = StratGame.get_or_none(StratGame.id == game_id)
    if not target_game:
        db.close()
        raise HTTPException(status_code=404, detail=f'Game ID {game_id} not found')

    delete_query = StratPlay.delete().where(StratPlay.game == target_game)
    rows_removed = delete_query.execute()
    db.close()

    if rows_removed > 0:
        return f'Deleted {rows_removed} plays matching Game ID {game_id}'
    # A known game with nothing to delete is reported as a server error.
    raise HTTPException(status_code=500, detail=f'No plays matching Game ID {game_id} were deleted')
@router.post('/erun-check', include_in_schema=PRIVATE_IN_SCHEMA)
@handle_db_errors
async def post_erun_check(token: str = Depends(oauth2_scheme)):
    """Set ``run`` to 1 on plays that have ``e_run == 1`` but ``run == 0``.

    Returns the value reported by executing the update.

    Raises:
        HTTPException: 401 when the token is rejected.
    """
    if not valid_token(token):
        logger.warning(f'post_erun_check - Bad Token: {token}')
        raise HTTPException(status_code=401, detail='Unauthorized')

    # An earned run recorded without a run is inconsistent; this repairs those rows.
    inconsistent = (StratPlay.e_run == 1) & (StratPlay.run == 0)
    count = StratPlay.update(run=1).where(inconsistent).execute()
    db.close()
    return count