feat/stratplay-sbaplayer #17

Merged
cal merged 2 commits from feat/stratplay-sbaplayer into main 2026-02-17 23:56:42 +00:00
14 changed files with 2507 additions and 1377 deletions

View File

@ -11,8 +11,9 @@ docker-compose --profile sync up sync-prod # One-time production data sync
python migrations.py # Run migrations (SQL files in migrations/)
```
- **Dev server**: `10.10.0.42` | **Adminer**: `http://10.10.0.42:8080`
- **Dev server**: `10.10.0.42:814` / `ssh sba-db``cd container-data/dev-sba-database` | **Adminer**: `http://10.10.0.42:8080`
- **Production**: `ssh akamai``cd container-data/sba-database`
- **Deploy (dev/prod)**: `docker-compose pull && docker-compose down && docker-compose up -d` (on the remote server)
## Architecture
@ -23,10 +24,21 @@ python migrations.py # Run migrations (SQL files in migrat
- **POST models**: Use `Optional[int] = None` for `id` fields (DB auto-generates)
- **Logging**: Rotating file handler (`/tmp/sba-database.log`, 8MB max, 5 backups)
## Environment Variables
## Production Environment
- **Host**: `ssh akamai`
- **Path**: `~/container-data/sba-database`
- **Bot container**: `sba_postgres` (PostgreSQL) + `sba_db_api` (API) — check with `docker ps`
- **Other services on same host**: `major-domo-discord-app-1`, `sba_adminer`, `sba-website_sba-web_1`, `sba-ghost_sba-ghost_1`
- **Image**: `manticorum67/major-domo-database` (Docker Hub)
- **Version file**: `VERSION` — bump before merge to `main`
- **Health**: API Port 80 — `/health`, `/ready`, `/metrics`, `/diagnostics`
- **Env vars**: Set in `docker-compose.prod.yml` and passed to the container on startup (not stored in GitHub)
**Required**: `API_TOKEN`, `POSTGRES_HOST`, `POSTGRES_DB`, `POSTGRES_USER`, `POSTGRES_PASSWORD`
**Optional**: `POSTGRES_PORT` (5432), `LOG_LEVEL` (WARNING), `PRIVATE_IN_SCHEMA`
## Development Environment
- **Host**: `ssh sba-db`
- **Path**: `~/container-data/dev-sba-database`
- **Bot container**: `dev_sba_postgres` (PostgreSQL) + `dev_sba_db_api` (API) — check with `docker ps`
- **Image**: `manticorum67/major-domo-database:dev` (Docker Hub)
- **CI/CD**: Gitea Actions on PR to `main` — builds Docker image, auto-generates CalVer version (`YYYY.MM.BUILD`) on merge
@ -36,3 +48,4 @@ python migrations.py # Run migrations (SQL files in migrat
- PostgreSQL only (no SQLite fallback)
- Migrations are SQL files in `migrations/`, applied manually via psql
- API docs auto-generated at `/api/docs`
- **NEVER run or test against the local docker compose** — always deploy to and test against the dev server (`10.10.0.42:814`). The local container has no meaningful data.

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,15 @@
from fastapi import APIRouter
from .plays import router as plays_router
from .batting import router as batting_router
from .pitching import router as pitching_router
from .fielding import router as fielding_router
from .crud import router as crud_router
router = APIRouter(prefix="/api/v3/plays", tags=["plays"])
router.include_router(plays_router)
router.include_router(batting_router)
router.include_router(pitching_router)
router.include_router(fielding_router)
router.include_router(crud_router)

View File

@ -0,0 +1,598 @@
import logging
from typing import Literal, Optional
from fastapi import APIRouter, Query
from ...db_engine import (
SQL,
StratGame,
StratPlay,
Team,
Player,
db,
fn,
model_to_dict,
)
from ...dependencies import add_cache_headers, cache_result, handle_db_errors
from .common import build_season_games
router = APIRouter()
logger = logging.getLogger("discord_app")
@router.get("/batting")
@handle_db_errors
@add_cache_headers(max_age=10 * 60)
@cache_result(ttl=5 * 60, key_prefix="plays-batting")
async def get_batting_totals(
season: list = Query(default=None),
week: list = Query(default=None),
s_type: Literal["regular", "post", "total", None] = None,
position: list = Query(default=None),
player_id: list = Query(default=None),
sbaplayer_id: list = Query(default=None),
min_wpa: Optional[float] = -999,
max_wpa: Optional[float] = 999,
group_by: Literal[
"team",
"player",
"playerteam",
"playergame",
"teamgame",
"league",
"playerweek",
"teamweek",
"sbaplayer",
] = "player",
min_pa: Optional[int] = 1,
team_id: list = Query(default=None),
manager_id: list = Query(default=None),
obc: list = Query(default=None),
risp: Optional[bool] = None,
inning: list = Query(default=None),
sort: Optional[str] = None,
limit: Optional[int] = 200,
short_output: Optional[bool] = False,
page_num: Optional[int] = 1,
week_start: Optional[int] = None,
week_end: Optional[int] = None,
min_repri: Optional[int] = None,
):
season_games = build_season_games(
season, week, s_type, week_start, week_end, manager_id
)
# Build SELECT fields conditionally based on group_by
base_select_fields = [
fn.SUM(StratPlay.pa).alias("sum_pa"),
fn.SUM(StratPlay.ab).alias("sum_ab"),
fn.SUM(StratPlay.run).alias("sum_run"),
fn.SUM(StratPlay.hit).alias("sum_hit"),
fn.SUM(StratPlay.rbi).alias("sum_rbi"),
fn.SUM(StratPlay.double).alias("sum_double"),
fn.SUM(StratPlay.triple).alias("sum_triple"),
fn.SUM(StratPlay.homerun).alias("sum_hr"),
fn.SUM(StratPlay.bb).alias("sum_bb"),
fn.SUM(StratPlay.so).alias("sum_so"),
fn.SUM(StratPlay.hbp).alias("sum_hbp"),
fn.SUM(StratPlay.sac).alias("sum_sac"),
fn.SUM(StratPlay.ibb).alias("sum_ibb"),
fn.SUM(StratPlay.gidp).alias("sum_gidp"),
fn.SUM(StratPlay.sb).alias("sum_sb"),
fn.SUM(StratPlay.cs).alias("sum_cs"),
fn.SUM(StratPlay.bphr).alias("sum_bphr"),
fn.SUM(StratPlay.bpfo).alias("sum_bpfo"),
fn.SUM(StratPlay.bp1b).alias("sum_bp1b"),
fn.SUM(StratPlay.bplo).alias("sum_bplo"),
fn.SUM(StratPlay.wpa).alias("sum_wpa"),
fn.SUM(StratPlay.re24_primary).alias("sum_repri"),
fn.COUNT(StratPlay.on_first_final)
.filter(
StratPlay.on_first_final.is_null(False) & (StratPlay.on_first_final != 4)
)
.alias("count_lo1"),
fn.COUNT(StratPlay.on_second_final)
.filter(
StratPlay.on_second_final.is_null(False) & (StratPlay.on_second_final != 4)
)
.alias("count_lo2"),
fn.COUNT(StratPlay.on_third_final)
.filter(
StratPlay.on_third_final.is_null(False) & (StratPlay.on_third_final != 4)
)
.alias("count_lo3"),
fn.COUNT(StratPlay.on_first)
.filter(StratPlay.on_first.is_null(False))
.alias("count_runner1"),
fn.COUNT(StratPlay.on_second)
.filter(StratPlay.on_second.is_null(False))
.alias("count_runner2"),
fn.COUNT(StratPlay.on_third)
.filter(StratPlay.on_third.is_null(False))
.alias("count_runner3"),
fn.COUNT(StratPlay.on_first_final)
.filter(
StratPlay.on_first_final.is_null(False)
& (StratPlay.on_first_final != 4)
& (StratPlay.starting_outs + StratPlay.outs == 3)
)
.alias("count_lo1_3out"),
fn.COUNT(StratPlay.on_second_final)
.filter(
StratPlay.on_second_final.is_null(False)
& (StratPlay.on_second_final != 4)
& (StratPlay.starting_outs + StratPlay.outs == 3)
)
.alias("count_lo2_3out"),
fn.COUNT(StratPlay.on_third_final)
.filter(
StratPlay.on_third_final.is_null(False)
& (StratPlay.on_third_final != 4)
& (StratPlay.starting_outs + StratPlay.outs == 3)
)
.alias("count_lo3_3out"),
]
# Add player and team fields based on grouping type
if group_by in ["player", "playerteam", "playergame", "playerweek"]:
base_select_fields.insert(0, StratPlay.batter) # Add batter as first field
if group_by == "sbaplayer":
base_select_fields.insert(0, Player.sbaplayer)
if group_by in ["team", "playerteam", "teamgame", "teamweek"]:
base_select_fields.append(StratPlay.batter_team)
bat_plays = (
StratPlay.select(*base_select_fields)
.where((StratPlay.game << season_games) & (StratPlay.batter.is_null(False)))
.having((fn.SUM(StratPlay.pa) >= min_pa))
)
if min_repri is not None:
bat_plays = bat_plays.having(fn.SUM(StratPlay.re24_primary) >= min_repri)
# Build running plays SELECT fields conditionally
run_select_fields = [
fn.SUM(StratPlay.sb).alias("sum_sb"),
fn.SUM(StratPlay.cs).alias("sum_cs"),
fn.SUM(StratPlay.pick_off).alias("sum_pick"),
fn.SUM(StratPlay.wpa).alias("sum_wpa"),
fn.SUM(StratPlay.re24_running).alias("sum_rerun"),
]
if group_by in ["player", "playerteam", "playergame", "playerweek"]:
run_select_fields.insert(0, StratPlay.runner) # Add runner as first field
if group_by == "sbaplayer":
run_select_fields.insert(0, Player.sbaplayer)
if group_by in ["team", "playerteam", "teamgame", "teamweek"]:
run_select_fields.append(StratPlay.runner_team)
run_plays = StratPlay.select(*run_select_fields).where(
(StratPlay.game << season_games) & (StratPlay.runner.is_null(False))
)
# Build defensive plays SELECT fields conditionally
def_select_fields = [
fn.SUM(StratPlay.error).alias("sum_error"),
fn.SUM(StratPlay.hit).alias("sum_hit"),
fn.SUM(StratPlay.pa).alias("sum_chances"),
fn.SUM(StratPlay.wpa).alias("sum_wpa"),
]
if group_by in ["player", "playerteam", "playergame", "playerweek"]:
def_select_fields.insert(0, StratPlay.defender) # Add defender as first field
if group_by == "sbaplayer":
def_select_fields.insert(0, Player.sbaplayer)
if group_by in ["team", "playerteam", "teamgame", "teamweek"]:
def_select_fields.append(StratPlay.defender_team)
def_plays = StratPlay.select(*def_select_fields).where(
(StratPlay.game << season_games) & (StratPlay.defender.is_null(False))
)
if player_id is not None:
all_players = Player.select().where(Player.id << player_id)
bat_plays = bat_plays.where(StratPlay.batter << all_players)
run_plays = run_plays.where(StratPlay.runner << all_players)
def_plays = def_plays.where(StratPlay.defender << all_players)
if sbaplayer_id is not None:
sba_players = Player.select().where(Player.sbaplayer_id << sbaplayer_id)
bat_plays = bat_plays.where(StratPlay.batter << sba_players)
run_plays = run_plays.where(StratPlay.runner << sba_players)
def_plays = def_plays.where(StratPlay.defender << sba_players)
if team_id is not None:
all_teams = Team.select().where(Team.id << team_id)
bat_plays = bat_plays.where(StratPlay.batter_team << all_teams)
run_plays = run_plays.where(StratPlay.runner_team << all_teams)
def_plays = def_plays.where(StratPlay.defender_team << all_teams)
if position is not None:
bat_plays = bat_plays.where(StratPlay.batter_pos << position)
if obc is not None:
bat_plays = bat_plays.where(StratPlay.on_base_code << obc)
if risp is not None:
bat_plays = bat_plays.where(
StratPlay.on_base_code << ["100", "101", "110", "111", "010", "011"]
)
if inning is not None:
bat_plays = bat_plays.where(StratPlay.inning_num << inning)
# Initialize game_select_fields for use in GROUP BY
game_select_fields = []
# Add StratPlay.game to SELECT clause for group_by scenarios that need it
if group_by in ["playergame", "teamgame"]:
# For playergame/teamgame grouping, build appropriate SELECT fields
if group_by == "playergame":
game_select_fields = [
StratPlay.batter,
StratPlay.game,
StratPlay.batter_team,
]
else: # teamgame
game_select_fields = [StratPlay.batter_team, StratPlay.game]
game_bat_plays = (
StratPlay.select(
*game_select_fields,
fn.SUM(StratPlay.pa).alias("sum_pa"),
fn.SUM(StratPlay.ab).alias("sum_ab"),
fn.SUM(StratPlay.run).alias("sum_run"),
fn.SUM(StratPlay.hit).alias("sum_hit"),
fn.SUM(StratPlay.rbi).alias("sum_rbi"),
fn.SUM(StratPlay.double).alias("sum_double"),
fn.SUM(StratPlay.triple).alias("sum_triple"),
fn.SUM(StratPlay.homerun).alias("sum_hr"),
fn.SUM(StratPlay.bb).alias("sum_bb"),
fn.SUM(StratPlay.so).alias("sum_so"),
fn.SUM(StratPlay.hbp).alias("sum_hbp"),
fn.SUM(StratPlay.sac).alias("sum_sac"),
fn.SUM(StratPlay.ibb).alias("sum_ibb"),
fn.SUM(StratPlay.gidp).alias("sum_gidp"),
fn.SUM(StratPlay.sb).alias("sum_sb"),
fn.SUM(StratPlay.cs).alias("sum_cs"),
fn.SUM(StratPlay.bphr).alias("sum_bphr"),
fn.SUM(StratPlay.bpfo).alias("sum_bpfo"),
fn.SUM(StratPlay.bp1b).alias("sum_bp1b"),
fn.SUM(StratPlay.bplo).alias("sum_bplo"),
fn.SUM(StratPlay.wpa).alias("sum_wpa"),
fn.SUM(StratPlay.re24_primary).alias("sum_repri"),
fn.COUNT(StratPlay.on_first_final)
.filter(
StratPlay.on_first_final.is_null(False)
& (StratPlay.on_first_final != 4)
)
.alias("count_lo1"),
fn.COUNT(StratPlay.on_second_final)
.filter(
StratPlay.on_second_final.is_null(False)
& (StratPlay.on_second_final != 4)
)
.alias("count_lo2"),
fn.COUNT(StratPlay.on_third_final)
.filter(
StratPlay.on_third_final.is_null(False)
& (StratPlay.on_third_final != 4)
)
.alias("count_lo3"),
fn.COUNT(StratPlay.on_first)
.filter(StratPlay.on_first.is_null(False))
.alias("count_runner1"),
fn.COUNT(StratPlay.on_second)
.filter(StratPlay.on_second.is_null(False))
.alias("count_runner2"),
fn.COUNT(StratPlay.on_third)
.filter(StratPlay.on_third.is_null(False))
.alias("count_runner3"),
fn.COUNT(StratPlay.on_first_final)
.filter(
StratPlay.on_first_final.is_null(False)
& (StratPlay.on_first_final != 4)
& (StratPlay.starting_outs + StratPlay.outs == 3)
)
.alias("count_lo1_3out"),
fn.COUNT(StratPlay.on_second_final)
.filter(
StratPlay.on_second_final.is_null(False)
& (StratPlay.on_second_final != 4)
& (StratPlay.starting_outs + StratPlay.outs == 3)
)
.alias("count_lo2_3out"),
fn.COUNT(StratPlay.on_third_final)
.filter(
StratPlay.on_third_final.is_null(False)
& (StratPlay.on_third_final != 4)
& (StratPlay.starting_outs + StratPlay.outs == 3)
)
.alias("count_lo3_3out"),
)
.where((StratPlay.game << season_games) & (StratPlay.batter.is_null(False)))
.having((fn.SUM(StratPlay.pa) >= min_pa))
)
# Apply the same filters that were applied to bat_plays
if player_id is not None:
all_players = Player.select().where(Player.id << player_id)
game_bat_plays = game_bat_plays.where(StratPlay.batter << all_players)
if sbaplayer_id is not None:
sba_players = Player.select().where(Player.sbaplayer_id << sbaplayer_id)
game_bat_plays = game_bat_plays.where(StratPlay.batter << sba_players)
if team_id is not None:
all_teams = Team.select().where(Team.id << team_id)
game_bat_plays = game_bat_plays.where(StratPlay.batter_team << all_teams)
if position is not None:
game_bat_plays = game_bat_plays.where(StratPlay.batter_pos << position)
if obc is not None:
game_bat_plays = game_bat_plays.where(StratPlay.on_base_code << obc)
if risp is not None:
game_bat_plays = game_bat_plays.where(
StratPlay.on_base_code << ["100", "101", "110", "111", "010", "011"]
)
if inning is not None:
game_bat_plays = game_bat_plays.where(StratPlay.inning_num << inning)
if min_repri is not None:
game_bat_plays = game_bat_plays.having(
fn.SUM(StratPlay.re24_primary) >= min_repri
)
bat_plays = game_bat_plays
if group_by is not None:
if group_by == "player":
bat_plays = bat_plays.group_by(StratPlay.batter)
run_plays = run_plays.group_by(StratPlay.runner)
def_plays = def_plays.group_by(StratPlay.defender)
elif group_by == "team":
bat_plays = bat_plays.group_by(StratPlay.batter_team)
run_plays = run_plays.group_by(StratPlay.runner_team)
def_plays = def_plays.group_by(StratPlay.defender_team)
elif group_by == "playerteam":
bat_plays = bat_plays.group_by(StratPlay.batter, StratPlay.batter_team)
run_plays = run_plays.group_by(StratPlay.runner, StratPlay.runner_team)
def_plays = def_plays.group_by(StratPlay.defender, StratPlay.defender_team)
elif group_by == "playergame":
if game_select_fields:
bat_plays = bat_plays.group_by(*game_select_fields)
else:
bat_plays = bat_plays.group_by(StratPlay.batter, StratPlay.game)
run_plays = run_plays.group_by(StratPlay.runner, StratPlay.game)
def_plays = def_plays.group_by(StratPlay.defender, StratPlay.game)
elif group_by == "teamgame":
if game_select_fields:
bat_plays = bat_plays.group_by(*game_select_fields)
else:
bat_plays = bat_plays.group_by(StratPlay.batter_team, StratPlay.game)
run_plays = run_plays.group_by(StratPlay.runner_team, StratPlay.game)
elif group_by == "league":
bat_plays = bat_plays.join(StratGame)
bat_plays = bat_plays.group_by(StratPlay.game.season)
run_plays = run_plays.join(StratGame)
run_plays = run_plays.group_by(StratPlay.game.season)
elif group_by == "playerweek":
bat_plays = bat_plays.join(StratGame)
bat_plays = bat_plays.group_by(StratPlay.batter, StratPlay.game.week)
run_plays = run_plays.join(StratGame)
run_plays = run_plays.group_by(StratPlay.runner, StratPlay.game.week)
elif group_by == "teamweek":
bat_plays = bat_plays.join(StratGame)
bat_plays = bat_plays.group_by(StratPlay.batter_team, StratPlay.game.week)
run_plays = run_plays.join(StratGame)
run_plays = run_plays.group_by(StratPlay.runner_team, StratPlay.game.week)
elif group_by == "sbaplayer":
bat_plays = bat_plays.join(
Player, on=(StratPlay.batter == Player.id)
).where(Player.sbaplayer.is_null(False))
bat_plays = bat_plays.group_by(Player.sbaplayer)
run_plays = run_plays.join(
Player, on=(StratPlay.runner == Player.id)
).where(Player.sbaplayer.is_null(False))
run_plays = run_plays.group_by(Player.sbaplayer)
def_plays = def_plays.join(
Player, on=(StratPlay.defender == Player.id)
).where(Player.sbaplayer.is_null(False))
def_plays = def_plays.group_by(Player.sbaplayer)
if sort is not None:
if sort == "player":
bat_plays = bat_plays.order_by(StratPlay.batter)
run_plays = run_plays.order_by(StratPlay.runner)
def_plays = def_plays.order_by(StratPlay.defender)
elif sort == "team":
bat_plays = bat_plays.order_by(StratPlay.batter_team)
run_plays = run_plays.order_by(StratPlay.runner_team)
def_plays = def_plays.order_by(StratPlay.defender_team)
elif sort == "wpa-desc":
bat_plays = bat_plays.order_by(SQL("sum_wpa").desc())
elif sort == "wpa-asc":
bat_plays = bat_plays.order_by(SQL("sum_wpa").asc())
elif sort == "repri-desc":
bat_plays = bat_plays.order_by(SQL("sum_repri").desc())
elif sort == "repri-asc":
bat_plays = bat_plays.order_by(SQL("sum_repri").asc())
elif sort == "pa-desc":
bat_plays = bat_plays.order_by(SQL("sum_pa").desc())
elif sort == "pa-asc":
bat_plays = bat_plays.order_by(SQL("sum_pa").asc())
elif sort == "newest":
# For grouped queries, only sort by fields in GROUP BY clause
if group_by in ["playergame", "teamgame"]:
# StratPlay.game is in GROUP BY for these cases
bat_plays = bat_plays.order_by(StratPlay.game.desc())
run_plays = run_plays.order_by(StratPlay.game.desc())
# For other group_by values, skip game_id/play_num sorting since they're not in GROUP BY
elif sort == "oldest":
# For grouped queries, only sort by fields in GROUP BY clause
if group_by in ["playergame", "teamgame"]:
# StratPlay.game is in GROUP BY for these cases
bat_plays = bat_plays.order_by(StratPlay.game.asc())
run_plays = run_plays.order_by(StratPlay.game.asc())
# For other group_by values, skip game_id/play_num sorting since they're not in GROUP BY
if limit < 1:
limit = 1
bat_plays = bat_plays.paginate(page_num, limit)
logger.info(f"bat_plays query: {bat_plays}")
logger.info(f"run_plays query: {run_plays}")
return_stats = {"count": bat_plays.count(), "stats": []}
for x in bat_plays:
this_run = run_plays
if group_by == "player":
this_run = this_run.where(StratPlay.runner == x.batter)
elif group_by == "team":
this_run = this_run.where(StratPlay.batter_team == x.batter_team)
elif group_by == "playerteam":
this_run = this_run.where(
(StratPlay.runner == x.batter)
& (StratPlay.batter_team == x.batter_team)
)
elif group_by == "playergame":
this_run = this_run.where(
(StratPlay.runner == x.batter) & (StratPlay.game == x.game)
)
elif group_by == "teamgame":
this_run = this_run.where(
(StratPlay.batter_team == x.batter_team) & (StratPlay.game == x.game)
)
elif group_by == "playerweek":
this_run = this_run.where(
(StratPlay.runner == x.batter) & (StratPlay.game.week == x.game.week)
)
elif group_by == "teamweek":
this_run = this_run.where(
(StratPlay.batter_team == x.batter_team)
& (StratPlay.game.week == x.game.week)
)
elif group_by == "sbaplayer":
this_run = this_run.where(Player.sbaplayer == x.batter.sbaplayer)
if this_run.count() > 0:
sum_sb = this_run[0].sum_sb
sum_cs = this_run[0].sum_cs
run_wpa = this_run[0].sum_wpa
sum_rerun = this_run[0].sum_rerun
else:
sum_sb = 0
sum_cs = 0
run_wpa = 0
sum_rerun = 0
if group_by == "sbaplayer":
wpa_filter = Player.sbaplayer == x.batter.sbaplayer
repri_filter = Player.sbaplayer == x.batter.sbaplayer
else:
wpa_filter = StratPlay.batter == x.batter
repri_filter = StratPlay.batter == x.batter
this_wpa = bat_plays.where(
(StratPlay.wpa >= min_wpa) & (StratPlay.wpa <= max_wpa) & wpa_filter
)
if this_wpa.count() > 0:
sum_wpa = this_wpa[0].sum_wpa
else:
sum_wpa = 0
this_repri = bat_plays.where(repri_filter)
if this_wpa.count() > 0:
sum_repri = this_repri[0].sum_repri
else:
sum_repri = 0
tot_ab = x.sum_ab if x.sum_ab > 0 else 1
obp = (x.sum_hit + x.sum_bb + x.sum_hbp + x.sum_ibb) / x.sum_pa
slg = (
x.sum_hr * 4
+ x.sum_triple * 3
+ x.sum_double * 2
+ (x.sum_hit - x.sum_double - x.sum_triple - x.sum_hr)
) / tot_ab
this_game = "TOT"
if group_by in ["playergame", "teamgame"]:
this_game = (
x.game.id if short_output else model_to_dict(x.game, recurse=False)
)
this_week = "TOT"
if group_by in ["playerweek", "teamweek"]:
this_week = x.game.week
this_player = "TOT"
if group_by == "sbaplayer":
this_player = (
x.batter.sbaplayer_id
if short_output
else model_to_dict(x.batter.sbaplayer, recurse=False)
)
elif "player" in group_by:
this_player = (
x.batter_id if short_output else model_to_dict(x.batter, recurse=False)
)
lob_all_rate, lob_2outs_rate, rbi_rate = 0, 0, 0
if x.count_runner1 + x.count_runner2 + x.count_runner3 > 0:
lob_all_rate = (x.count_lo1 + x.count_lo2 + x.count_lo3) / (
x.count_runner1 + x.count_runner2 + x.count_runner3
)
rbi_rate = (x.sum_rbi - x.sum_hr) / (
x.count_runner1 + x.count_runner2 + x.count_runner3
)
# Handle team field based on grouping - set to 'TOT' when not grouping by team
if hasattr(x, "batter_team") and x.batter_team is not None:
team_info = (
x.batter_team_id
if short_output
else model_to_dict(x.batter_team, recurse=False)
)
else:
team_info = "TOT"
return_stats["stats"].append(
{
"player": this_player,
"team": team_info,
"pa": x.sum_pa,
"ab": x.sum_ab,
"run": x.sum_run,
"hit": x.sum_hit,
"rbi": x.sum_rbi,
"double": x.sum_double,
"triple": x.sum_triple,
"hr": x.sum_hr,
"bb": x.sum_bb,
"so": x.sum_so,
"hbp": x.sum_hbp,
"sac": x.sum_sac,
"ibb": x.sum_ibb,
"gidp": x.sum_gidp,
"sb": sum_sb,
"cs": sum_cs,
"bphr": x.sum_bphr,
"bpfo": x.sum_bpfo,
"bp1b": x.sum_bp1b,
"bplo": x.sum_bplo,
"wpa": sum_wpa + run_wpa,
"avg": x.sum_hit / tot_ab,
"obp": obp,
"slg": slg,
"ops": obp + slg,
"woba": (
0.69 * x.sum_bb
+ 0.72 * x.sum_hbp
+ 0.89 * (x.sum_hit - x.sum_double - x.sum_triple - x.sum_hr)
+ 1.27 * x.sum_double
+ 1.62 * x.sum_triple
+ 2.1 * x.sum_hr
)
/ max(x.sum_pa - x.sum_ibb, 1),
"game": this_game,
"lob_all": x.count_lo1 + x.count_lo2 + x.count_lo3,
"lob_all_rate": lob_all_rate,
"lob_2outs": x.count_lo1_3out + x.count_lo2_3out + x.count_lo3_3out,
"rbi%": rbi_rate,
"week": this_week,
"re24_primary": sum_repri,
# 're24_running': sum_rerun
}
)
db.close()
return return_stats

View File

@ -0,0 +1,37 @@
from fastapi import HTTPException
from ...db_engine import StratGame
def build_season_games(season, week, s_type, week_start, week_end, manager_id=None):
"""Build the filtered StratGame subquery used by all stats endpoints."""
season_games = StratGame.select()
if season is not None:
season_games = season_games.where(StratGame.season << season)
if week is not None and s_type is not None:
raise HTTPException(
status_code=400,
detail="Week and s_type parameters cannot be used in the same query",
)
if week is not None and (week_start is not None or week_end is not None):
raise HTTPException(
status_code=400,
detail="Week and week_start/week_end parameters cannot be used in the same query",
)
if week is not None:
season_games = season_games.where(StratGame.week << week)
if week_start is not None:
season_games = season_games.where(StratGame.week >= week_start)
if week_end is not None:
season_games = season_games.where(StratGame.week <= week_end)
if s_type is not None:
if s_type == "regular":
season_games = season_games.where(StratGame.week <= 18)
elif s_type == "post":
season_games = season_games.where(StratGame.week > 18)
if manager_id is not None:
season_games = season_games.where(
(StratGame.away_manager_id << manager_id)
| (StratGame.home_manager_id << manager_id)
)
return season_games

View File

@ -0,0 +1,159 @@
import logging
from fastapi import APIRouter, Depends, HTTPException
from ...db_engine import db, StratPlay, StratGame, model_to_dict, chunked
from ...dependencies import (
oauth2_scheme,
valid_token,
PRIVATE_IN_SCHEMA,
handle_db_errors,
)
from .models import PlayModel, PlayList
router = APIRouter()
logger = logging.getLogger("discord_app")
@router.get("/{play_id}")
@handle_db_errors
async def get_one_play(play_id: int):
if StratPlay.get_or_none(StratPlay.id == play_id) is None:
db.close()
raise HTTPException(status_code=404, detail=f"Play ID {play_id} not found")
r_play = model_to_dict(StratPlay.get_by_id(play_id))
db.close()
return r_play
@router.patch("/{play_id}", include_in_schema=PRIVATE_IN_SCHEMA)
@handle_db_errors
async def patch_play(
play_id: int, new_play: PlayModel, token: str = Depends(oauth2_scheme)
):
if not valid_token(token):
logger.warning(f"patch_play - Bad Token: {token}")
raise HTTPException(status_code=401, detail="Unauthorized")
if StratPlay.get_or_none(StratPlay.id == play_id) is None:
db.close()
raise HTTPException(status_code=404, detail=f"Play ID {play_id} not found")
StratPlay.update(**new_play.dict()).where(StratPlay.id == play_id).execute()
r_play = model_to_dict(StratPlay.get_by_id(play_id))
db.close()
return r_play
@router.post("/", include_in_schema=PRIVATE_IN_SCHEMA)
@handle_db_errors
async def post_plays(p_list: PlayList, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logger.warning(f"post_plays - Bad Token: {token}")
raise HTTPException(status_code=401, detail="Unauthorized")
new_plays = []
this_game = StratGame.get_or_none(StratGame.id == p_list.plays[0].game_id)
if this_game is None:
raise HTTPException(
status_code=404, detail=f"Game ID {p_list.plays[0].game_id} not found"
)
for play in p_list.plays:
this_play = play
this_play.inning_half = this_play.inning_half.lower()
top_half = this_play.inning_half == "top"
if this_play.batter_team_id is None and this_play.batter_id is not None:
this_play.batter_team_id = (
this_game.away_team.id if top_half else this_game.home_team.id
)
if this_play.pitcher_team_id is None:
this_play.pitcher_team_id = (
this_game.home_team.id if top_half else this_game.away_team.id
)
if this_play.catcher_id is not None:
this_play.catcher_team_id = (
this_game.home_team.id if top_half else this_game.away_team.id
)
if this_play.defender_id is not None:
this_play.defender_team_id = (
this_game.home_team.id if top_half else this_game.away_team.id
)
if this_play.runner_id is not None:
this_play.runner_team_id = (
this_game.away_team.id if top_half else this_game.home_team.id
)
if this_play.pa == 0:
this_play.batter_final = None
new_plays.append(this_play.dict())
with db.atomic():
for batch in chunked(new_plays, 20):
StratPlay.insert_many(batch).on_conflict_ignore().execute()
db.close()
return f"Inserted {len(new_plays)} plays"
@router.delete("/{play_id}", include_in_schema=PRIVATE_IN_SCHEMA)
@handle_db_errors
async def delete_play(play_id: int, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logger.warning(f"delete_play - Bad Token: {token}")
raise HTTPException(status_code=401, detail="Unauthorized")
this_play = StratPlay.get_or_none(StratPlay.id == play_id)
if not this_play:
db.close()
raise HTTPException(status_code=404, detail=f"Play ID {play_id} not found")
count = this_play.delete_instance()
db.close()
if count == 1:
return f"Play {play_id} has been deleted"
else:
raise HTTPException(
status_code=500, detail=f"Play {play_id} could not be deleted"
)
@router.delete("/game/{game_id}", include_in_schema=PRIVATE_IN_SCHEMA)
@handle_db_errors
async def delete_plays_game(game_id: int, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logger.warning(f"delete_plays_game - Bad Token: {token}")
raise HTTPException(status_code=401, detail="Unauthorized")
this_game = StratGame.get_or_none(StratGame.id == game_id)
if not this_game:
db.close()
raise HTTPException(status_code=404, detail=f"Game ID {game_id} not found")
count = StratPlay.delete().where(StratPlay.game == this_game).execute()
db.close()
if count > 0:
return f"Deleted {count} plays matching Game ID {game_id}"
else:
raise HTTPException(
status_code=500, detail=f"No plays matching Game ID {game_id} were deleted"
)
@router.post("/erun-check", include_in_schema=PRIVATE_IN_SCHEMA)
@handle_db_errors
async def post_erun_check(token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logger.warning(f"post_erun_check - Bad Token: {token}")
raise HTTPException(status_code=401, detail="Unauthorized")
all_plays = StratPlay.update(run=1).where(
(StratPlay.e_run == 1) & (StratPlay.run == 0)
)
count = all_plays.execute()
db.close()
return count

View File

@ -0,0 +1,365 @@
from fastapi import APIRouter, Query
from typing import Optional, Literal
import logging
from ...db_engine import (
db,
SbaPlayer,
StratPlay,
StratGame,
Team,
Player,
model_to_dict,
fn,
SQL,
)
from ...dependencies import handle_db_errors, add_cache_headers, cache_result
from .common import build_season_games
logger = logging.getLogger("discord_app")
router = APIRouter()
@router.get("/fielding")
@handle_db_errors
@add_cache_headers(max_age=10 * 60)
@cache_result(ttl=5 * 60, key_prefix="plays-fielding")
async def get_fielding_totals(
season: list = Query(default=None),
week: list = Query(default=None),
s_type: Literal["regular", "post", "total", None] = None,
position: list = Query(default=None),
player_id: list = Query(default=None),
sbaplayer_id: list = Query(default=None),
group_by: Literal[
"team",
"player",
"playerteam",
"playerposition",
"teamposition",
"playerpositiongame",
"playergame",
"playerteamposition",
"playerweek",
"teamweek",
"sbaplayer",
] = "player",
week_start: Optional[int] = None,
week_end: Optional[int] = None,
min_ch: Optional[int] = 1,
team_id: list = Query(default=None),
manager_id: list = Query(default=None),
sort: Optional[str] = None,
limit: Optional[int] = 200,
short_output: Optional[bool] = False,
page_num: Optional[int] = 1,
):
season_games = build_season_games(
season, week, s_type, week_start, week_end, manager_id
)
# Build SELECT fields conditionally based on group_by for fielding to match GROUP BY exactly
def_select_fields = []
cat_select_fields = []
if group_by == "player":
def_select_fields = [StratPlay.defender]
cat_select_fields = [StratPlay.catcher]
elif group_by == "team":
def_select_fields = [StratPlay.defender_team]
cat_select_fields = [StratPlay.catcher_team]
elif group_by == "playerteam":
def_select_fields = [StratPlay.defender, StratPlay.defender_team]
cat_select_fields = [StratPlay.catcher, StratPlay.catcher_team]
elif group_by == "playerposition":
def_select_fields = [StratPlay.defender, StratPlay.check_pos]
cat_select_fields = [StratPlay.catcher]
elif group_by == "teamposition":
def_select_fields = [StratPlay.defender_team, StratPlay.check_pos]
cat_select_fields = [StratPlay.catcher_team]
elif group_by == "playergame":
def_select_fields = [StratPlay.defender, StratPlay.game]
cat_select_fields = [StratPlay.catcher, StratPlay.game]
elif group_by == "playerpositiongame":
def_select_fields = [StratPlay.defender, StratPlay.check_pos, StratPlay.game]
cat_select_fields = [StratPlay.catcher, StratPlay.game]
elif group_by == "playerteamposition":
def_select_fields = [
StratPlay.defender,
StratPlay.defender_team,
StratPlay.check_pos,
]
cat_select_fields = [StratPlay.catcher, StratPlay.catcher_team]
elif group_by == "playerweek":
def_select_fields = [StratPlay.defender, StratPlay.game]
cat_select_fields = [StratPlay.catcher, StratPlay.game]
elif group_by == "teamweek":
def_select_fields = [StratPlay.defender_team, StratPlay.game]
cat_select_fields = [StratPlay.catcher_team, StratPlay.game]
elif group_by == "sbaplayer":
def_select_fields = [Player.sbaplayer]
cat_select_fields = [Player.sbaplayer]
else:
# Default case
def_select_fields = [StratPlay.defender, StratPlay.defender_team]
cat_select_fields = [StratPlay.catcher, StratPlay.catcher_team]
# Ensure def_select_fields is not empty
if not def_select_fields:
def_select_fields = [
StratPlay.defender,
StratPlay.defender_team,
StratPlay.check_pos,
]
def_plays = (
StratPlay.select(
*def_select_fields,
fn.SUM(StratPlay.error).alias("sum_error"),
fn.SUM(StratPlay.hit).alias("sum_hit"),
fn.SUM(StratPlay.pa).alias("sum_chances"),
fn.SUM(StratPlay.wpa).alias("sum_wpa"),
)
.where((StratPlay.game << season_games) & (StratPlay.defender.is_null(False)))
.having(fn.SUM(StratPlay.pa) >= min_ch)
)
# Ensure cat_select_fields is not empty
if not cat_select_fields:
cat_select_fields = [StratPlay.catcher, StratPlay.catcher_team]
cat_plays = StratPlay.select(
*cat_select_fields,
fn.SUM(StratPlay.sb).alias("sum_sb"),
fn.SUM(StratPlay.cs).alias("sum_cs"),
fn.SUM(StratPlay.wpa).alias("sum_wpa"),
fn.SUM(StratPlay.passed_ball).alias("sum_pb"),
fn.SUM(StratPlay.error).alias("sum_error"),
).where((StratPlay.game << season_games) & (StratPlay.catcher.is_null(False)))
if player_id is not None:
all_players = Player.select().where(Player.id << player_id)
def_plays = def_plays.where(StratPlay.defender << all_players)
cat_plays = cat_plays.where(StratPlay.catcher << all_players)
if sbaplayer_id is not None:
sba_players = Player.select().where(Player.sbaplayer_id << sbaplayer_id)
def_plays = def_plays.where(StratPlay.defender << sba_players)
cat_plays = cat_plays.where(StratPlay.catcher << sba_players)
if team_id is not None:
all_teams = Team.select().where(Team.id << team_id)
def_plays = def_plays.where(StratPlay.defender_team << all_teams)
cat_plays = cat_plays.where(StratPlay.catcher_team << all_teams)
if position is not None:
def_plays = def_plays.where(StratPlay.check_pos << position)
if group_by is not None:
if group_by == "player":
def_plays = def_plays.group_by(StratPlay.defender)
cat_plays = cat_plays.group_by(StratPlay.catcher)
elif group_by == "team":
def_plays = def_plays.group_by(StratPlay.defender_team)
cat_plays = cat_plays.group_by(StratPlay.catcher_team)
elif group_by == "playerteam":
def_plays = def_plays.group_by(StratPlay.defender, StratPlay.defender_team)
cat_plays = cat_plays.group_by(StratPlay.catcher, StratPlay.catcher_team)
elif group_by == "playerposition":
def_plays = def_plays.group_by(StratPlay.defender, StratPlay.check_pos)
cat_plays = cat_plays.group_by(StratPlay.catcher)
elif group_by == "teamposition":
def_plays = def_plays.group_by(StratPlay.defender_team, StratPlay.check_pos)
cat_plays = cat_plays.group_by(StratPlay.catcher_team)
elif group_by == "playergame":
def_plays = def_plays.group_by(StratPlay.defender, StratPlay.game)
cat_plays = cat_plays.group_by(StratPlay.catcher, StratPlay.game)
elif group_by == "playerpositiongame":
def_plays = def_plays.group_by(
StratPlay.defender, StratPlay.check_pos, StratPlay.game
)
cat_plays = cat_plays.group_by(StratPlay.catcher, StratPlay.game)
elif group_by == "playerteamposition":
def_plays = def_plays.group_by(
StratPlay.defender, StratPlay.defender_team, StratPlay.check_pos
)
cat_plays = cat_plays.group_by(StratPlay.catcher, StratPlay.catcher_team)
elif group_by == "playerweek":
def_plays = def_plays.join(StratGame)
def_plays = def_plays.group_by(StratPlay.defender, StratPlay.game.week)
cat_plays = cat_plays.join(StratGame)
cat_plays = cat_plays.group_by(StratPlay.catcher, StratPlay.game.week)
elif group_by == "teamweek":
def_plays = def_plays.join(StratGame)
def_plays = def_plays.group_by(StratPlay.defender_team, StratPlay.game.week)
cat_plays = cat_plays.join(StratGame)
cat_plays = cat_plays.group_by(StratPlay.catcher_team, StratPlay.game.week)
elif group_by == "sbaplayer":
def_plays = def_plays.join(
Player, on=(StratPlay.defender == Player.id)
).where(Player.sbaplayer.is_null(False))
def_plays = def_plays.group_by(Player.sbaplayer)
cat_plays = cat_plays.join(
Player, on=(StratPlay.catcher == Player.id)
).where(Player.sbaplayer.is_null(False))
cat_plays = cat_plays.group_by(Player.sbaplayer)
if sort is not None:
if sort == "player":
def_plays = def_plays.order_by(StratPlay.defender)
elif sort == "team":
def_plays = def_plays.order_by(StratPlay.defender_team)
elif sort == "wpa-desc":
def_plays = def_plays.order_by(SQL("sum_wpa").asc())
elif sort == "wpa-asc":
def_plays = def_plays.order_by(SQL("sum_wpa").desc())
elif sort == "ch-desc":
def_plays = def_plays.order_by(SQL("sum_chances").desc())
elif sort == "ch-asc":
def_plays = def_plays.order_by(SQL("sum_chances").asc())
elif sort == "newest":
# For grouped queries, only sort by fields in GROUP BY clause
if group_by in [
"playergame",
"playerpositiongame",
"playerweek",
"teamweek",
]:
# StratPlay.game is in GROUP BY for these cases
def_plays = def_plays.order_by(StratPlay.game.desc())
# For other group_by values, skip game_id/play_num sorting since they're not in GROUP BY
elif sort == "oldest":
# For grouped queries, only sort by fields in GROUP BY clause
if group_by in [
"playergame",
"playerpositiongame",
"playerweek",
"teamweek",
]:
# StratPlay.game is in GROUP BY for these cases
def_plays = def_plays.order_by(StratPlay.game.asc())
# For other group_by values, skip game_id/play_num sorting since they're not in GROUP BY
if limit < 1:
limit = 1
def_plays = def_plays.paginate(page_num, limit)
logger.info(f"def_plays query: {def_plays}")
return_stats = {"count": def_plays.count(), "stats": []}
for x in def_plays:
logger.info(f"this_play: {x}")
# this_cat = cat_plays.where(StratPlay.catcher == x.defender)
# if this_cat.count() > 0:
# sum_sb = this_cat[0].sum_sb
# sum_cs = this_cat[0].sum_cs
# sum_wpa = this_cat[0].sum_wpa
# sum_pb = this_cat[0].sum_pb
# sum_error = this_cat[0].sum_error + x.sum_error
# else:
# sum_sb = 0
# sum_cs = 0
# sum_wpa = 0
# sum_pb = 0
# sum_error = x.sum_error
this_pos = "TOT"
if "position" in group_by:
this_pos = x.check_pos
this_cat = cat_plays
if group_by in ["player", "playerposition"]:
this_cat = this_cat.where(StratPlay.catcher == x.defender)
elif group_by in ["team", "teamposition"]:
this_cat = this_cat.where(StratPlay.catcher_team == x.defender_team)
elif group_by in ["playerteam", "playerteamposition"]:
this_cat = this_cat.where(
(StratPlay.catcher == x.defender)
& (StratPlay.catcher_team == x.defender_team)
)
elif group_by in ["playergame", "playerpositiongame"]:
this_cat = this_cat.where(
(StratPlay.catcher == x.defender) & (StratPlay.game == x.game)
)
elif group_by == "playerweek":
this_cat = this_cat.where(
(StratPlay.catcher == x.defender) & (StratPlay.game.week == x.game.week)
)
elif group_by == "teamweek":
this_cat = this_cat.where(
(StratPlay.catcher_team == x.defender_team)
& (StratPlay.game.week == x.game.week)
)
this_cat = this_cat.where(StratPlay.game == x.game)
elif group_by == "sbaplayer":
this_cat = this_cat.where(Player.sbaplayer == x.defender.sbaplayer)
if this_cat.count() > 0:
sum_sb = this_cat[0].sum_sb
sum_cs = this_cat[0].sum_cs
sum_wpa = this_cat[0].sum_wpa
sum_pb = this_cat[0].sum_pb
sum_error = this_cat[0].sum_error + x.sum_error
else:
sum_sb = 0
sum_cs = 0
sum_wpa = 0
sum_pb = 0
sum_error = x.sum_error
this_player = "TOT"
if group_by == "sbaplayer":
this_player = (
x.defender.sbaplayer_id
if short_output
else model_to_dict(x.defender.sbaplayer, recurse=False)
)
elif "player" in group_by:
this_player = (
x.defender_id
if short_output
else model_to_dict(x.defender, recurse=False)
)
this_game = "TOT"
if "game" in group_by:
this_game = (
x.game_id if short_output else model_to_dict(x.game, recurse=False)
)
this_week = "TOT"
if group_by in ["playerweek", "teamweek"]:
game_obj = getattr(x, "game", None)
this_week = game_obj.week if game_obj else "TOT"
# Handle team field based on grouping with safe access
defender_team_obj = getattr(x, "defender_team", None)
team_info = "TOT"
if defender_team_obj:
team_info = (
defender_team_obj.id
if short_output
else model_to_dict(defender_team_obj, recurse=False)
)
return_stats["stats"].append(
{
"player": this_player,
"team": team_info,
"pos": this_pos,
"x-ch": x.sum_chances,
"hit": x.sum_hit,
"error": sum_error,
"sb-ch": sum_sb + sum_cs,
"sb": sum_sb,
"cs": sum_cs,
"pb": sum_pb,
"wpa": (x.sum_wpa + sum_wpa) * -1,
"wf%": (x.sum_chances - (x.sum_error * 0.5) - (x.sum_hit * 0.75))
/ x.sum_chances,
"cs%": sum_cs / (sum_sb + sum_cs) if (sum_sb + sum_cs) > 0 else None,
"game": this_game,
"week": this_week,
}
)
db.close()
return return_stats

View File

@ -0,0 +1,106 @@
from typing import List, Literal
from pydantic import BaseModel, validator
POS_LIST = Literal[
"C", "1B", "2B", "3B", "SS", "LF", "CF", "RF", "P", "DH", "PH", "PR", "GHOST"
]
class PlayModel(BaseModel):
game_id: int
play_num: int
batter_id: int = None
batter_team_id: int = None
pitcher_id: int
pitcher_team_id: int = None
on_base_code: str
inning_half: Literal["top", "bot", "Top", "Bot"]
inning_num: int
batting_order: int
starting_outs: int
away_score: int
home_score: int
batter_pos: POS_LIST = None
on_first_id: int = None
on_first_final: int = None
on_second_id: int = None
on_second_final: int = None
on_third_id: int = None
on_third_final: int = None
batter_final: int = None
pa: int = 0
ab: int = 0
e_run: int = 0
run: int = 0
hit: int = 0
rbi: int = 0
double: int = 0
triple: int = 0
homerun: int = 0
bb: int = 0
so: int = 0
hbp: int = 0
sac: int = 0
ibb: int = 0
gidp: int = 0
bphr: int = 0
bpfo: int = 0
bp1b: int = 0
bplo: int = 0
sb: int = 0
cs: int = 0
outs: int = 0
wpa: float = 0
catcher_id: int = None
catcher_team_id: int = None
defender_id: int = None
defender_team_id: int = None
runner_id: int = None
runner_team_id: int = None
check_pos: POS_LIST = None
error: int = 0
wild_pitch: int = 0
passed_ball: int = 0
pick_off: int = 0
balk: int = 0
is_go_ahead: bool = False
is_tied: bool = False
is_new_inning: bool = False
hand_batting: str = None
hand_pitching: str = None
re24_primary: float = None
re24_running: float = None
@validator("on_first_final")
def no_final_if_no_runner_one(cls, v, values):
if values["on_first_id"] is None:
return None
return v
@validator("on_second_final")
def no_final_if_no_runner_two(cls, v, values):
if values["on_second_id"] is None:
return None
return v
@validator("on_third_final")
def no_final_if_no_runner_three(cls, v, values):
if values["on_third_id"] is None:
return None
return v
@validator("batter_final")
def no_final_if_no_batter(cls, v, values):
if values["batter_id"] is None:
return None
return v
class PlayList(BaseModel):
plays: List[PlayModel]

View File

@ -0,0 +1,357 @@
import logging
from typing import Optional, Literal
from fastapi import APIRouter, Query, Response
from ...db_engine import (
db,
SbaPlayer,
StratPlay,
StratGame,
Team,
Player,
Decision,
model_to_dict,
fn,
SQL,
complex_data_to_csv,
)
from ...dependencies import handle_db_errors, add_cache_headers, cache_result
from .common import build_season_games
router = APIRouter()
logger = logging.getLogger("discord_app")
@router.get("/pitching")
@handle_db_errors
@add_cache_headers(max_age=10 * 60)
@cache_result(ttl=5 * 60, key_prefix="plays-batting")
async def get_pitching_totals(
season: list = Query(default=None),
week: list = Query(default=None),
s_type: Literal["regular", "post", "total", None] = None,
player_id: list = Query(default=None),
sbaplayer_id: list = Query(default=None),
group_by: Literal[
"team",
"player",
"playerteam",
"playergame",
"teamgame",
"league",
"playerweek",
"teamweek",
"sbaplayer",
] = "player",
min_pa: Optional[int] = 1,
team_id: list = Query(default=None),
manager_id: list = Query(default=None),
obc: list = Query(default=None),
risp: Optional[bool] = None,
inning: list = Query(default=None),
sort: Optional[str] = None,
limit: Optional[int] = 200,
short_output: Optional[bool] = False,
csv: Optional[bool] = False,
page_num: Optional[int] = 1,
week_start: Optional[int] = None,
week_end: Optional[int] = None,
):
season_games = build_season_games(
season, week, s_type, week_start, week_end, manager_id
)
# Build SELECT fields conditionally based on group_by for pitching to match GROUP BY exactly
pitch_select_fields = []
if group_by == "player":
pitch_select_fields = [StratPlay.pitcher]
elif group_by == "team":
pitch_select_fields = [StratPlay.pitcher_team]
elif group_by == "playerteam":
pitch_select_fields = [StratPlay.pitcher, StratPlay.pitcher_team]
elif group_by == "playergame":
pitch_select_fields = [StratPlay.pitcher, StratPlay.game]
elif group_by == "teamgame":
pitch_select_fields = [StratPlay.pitcher_team, StratPlay.game]
elif group_by == "playerweek":
pitch_select_fields = [StratPlay.pitcher, StratPlay.game]
elif group_by == "teamweek":
pitch_select_fields = [StratPlay.pitcher_team, StratPlay.game]
elif group_by == "sbaplayer":
pitch_select_fields = [Player.sbaplayer]
else:
# Default case
pitch_select_fields = [StratPlay.pitcher]
# Build Peewee query for pitching stats
pitch_plays = (
StratPlay.select(
*pitch_select_fields,
fn.SUM(StratPlay.pa).alias("sum_pa"),
fn.SUM(StratPlay.ab).alias("sum_ab"),
fn.SUM(StratPlay.run).alias("sum_run"),
fn.SUM(StratPlay.hit).alias("sum_hit"),
fn.SUM(StratPlay.rbi).alias("sum_rbi"),
fn.SUM(StratPlay.double).alias("sum_double"),
fn.SUM(StratPlay.triple).alias("sum_triple"),
fn.SUM(StratPlay.homerun).alias("sum_hr"),
fn.SUM(StratPlay.bb).alias("sum_bb"),
fn.SUM(StratPlay.so).alias("sum_so"),
fn.SUM(StratPlay.wpa).alias("sum_wpa"),
fn.SUM(StratPlay.hbp).alias("sum_hbp"),
fn.SUM(StratPlay.sac).alias("sum_sac"),
fn.SUM(StratPlay.ibb).alias("sum_ibb"),
fn.SUM(StratPlay.gidp).alias("sum_gidp"),
fn.SUM(StratPlay.sb).alias("sum_sb"),
fn.SUM(StratPlay.cs).alias("sum_cs"),
fn.SUM(StratPlay.bphr).alias("sum_bphr"),
fn.SUM(StratPlay.bpfo).alias("sum_bpfo"),
fn.SUM(StratPlay.bp1b).alias("sum_bp1b"),
fn.SUM(StratPlay.bplo).alias("sum_bplo"),
fn.SUM(StratPlay.wild_pitch).alias("sum_wp"),
fn.SUM(StratPlay.balk).alias("sum_balk"),
fn.SUM(StratPlay.outs).alias("sum_outs"),
fn.SUM(StratPlay.e_run).alias("sum_erun"),
fn.SUM(StratPlay.re24_primary).alias("sum_repri"),
)
.where((StratPlay.game << season_games) & (StratPlay.pitcher.is_null(False)))
.having(fn.SUM(StratPlay.pa) >= min_pa)
)
# Apply filters to the pitching query
if player_id is not None:
pitch_plays = pitch_plays.where(StratPlay.pitcher << player_id)
if sbaplayer_id is not None:
sba_players = Player.select().where(Player.sbaplayer_id << sbaplayer_id)
pitch_plays = pitch_plays.where(StratPlay.pitcher << sba_players)
if team_id is not None:
pitch_plays = pitch_plays.where(StratPlay.pitcher_team << team_id)
# Add JOINs for special group_by modes
if group_by == "sbaplayer":
pitch_plays = pitch_plays.join(
Player, on=(StratPlay.pitcher == Player.id)
).where(Player.sbaplayer.is_null(False))
# Group by the fields
if pitch_select_fields:
pitch_plays = pitch_plays.group_by(*pitch_select_fields)
# Apply sorting
if sort is not None:
if sort == "player":
pitch_plays = pitch_plays.order_by(StratPlay.pitcher)
elif sort == "team":
pitch_plays = pitch_plays.order_by(StratPlay.pitcher_team)
elif sort == "wpa-desc":
pitch_plays = pitch_plays.order_by(SQL("sum_wpa").desc())
elif sort == "wpa-asc":
pitch_plays = pitch_plays.order_by(SQL("sum_wpa").asc())
elif sort == "repri-desc":
pitch_plays = pitch_plays.order_by(SQL("sum_repri").desc())
elif sort == "repri-asc":
pitch_plays = pitch_plays.order_by(SQL("sum_repri").asc())
elif sort == "pa-desc":
pitch_plays = pitch_plays.order_by(SQL("sum_pa").desc())
elif sort == "pa-asc":
pitch_plays = pitch_plays.order_by(SQL("sum_pa").asc())
elif sort == "newest":
if group_by in ["playergame", "teamgame"]:
pitch_plays = pitch_plays.order_by(StratPlay.game.desc())
elif sort == "oldest":
if group_by in ["playergame", "teamgame"]:
pitch_plays = pitch_plays.order_by(StratPlay.game.asc())
if limit < 1:
limit = 1
pitch_plays = pitch_plays.paginate(page_num, limit)
# Execute the Peewee query
return_stats = {"count": 0, "stats": []}
for x in pitch_plays:
# Extract basic stats from Peewee result
tot_outs = x.sum_outs if x.sum_outs > 0 else 1
obp = (x.sum_hit + x.sum_bb + x.sum_hbp + x.sum_ibb) / x.sum_pa
slg = (
x.sum_hr * 4
+ x.sum_triple * 3
+ x.sum_double * 2
+ (x.sum_hit - x.sum_double - x.sum_triple - x.sum_hr)
) / max(x.sum_ab, 1)
tot_bb = 0.1 if x.sum_bb == 0 else x.sum_bb
# Handle player field based on grouping with safe access (similar to fielding)
this_player = "TOT"
if group_by == "sbaplayer":
this_player = (
x.pitcher.sbaplayer_id
if short_output
else model_to_dict(x.pitcher.sbaplayer, recurse=False)
)
elif "player" in group_by:
try:
this_player = (
x.pitcher_id
if short_output
else model_to_dict(x.pitcher, recurse=False)
)
except Exception as e:
logger.error(
"Error extracting pitcher from query: {e}\n\nx: {x}",
stack_info=True,
)
# Handle team field based on grouping with safe access
team_info = "TOT"
if "team" in group_by and hasattr(x, "pitcher_team"):
pitcher_team_obj = getattr(x, "pitcher_team", None)
if pitcher_team_obj:
team_info = (
pitcher_team_obj.id
if short_output
else model_to_dict(pitcher_team_obj, recurse=False)
)
# Handle game field based on grouping with safe access
this_game = "TOT"
if "game" in group_by:
this_game = (
x.game_id if short_output else model_to_dict(x.game, recurse=False)
)
this_week = "TOT"
if group_by in ["playerweek", "teamweek"]:
game_obj = getattr(x, "game", None)
this_week = game_obj.week if game_obj else "TOT"
# Get Decision data for this specific grouping
decision_query = Decision.select(
fn.SUM(Decision.win).alias("sum_win"),
fn.SUM(Decision.loss).alias("sum_loss"),
fn.SUM(Decision.hold).alias("sum_hold"),
fn.SUM(Decision.is_save).alias("sum_save"),
fn.SUM(Decision.b_save).alias("sum_b_save"),
fn.SUM(Decision.irunners).alias("sum_irunners"),
fn.SUM(Decision.irunners_scored).alias("sum_irun_scored"),
fn.SUM(Decision.is_start.cast("integer")).alias("sum_gs"),
fn.COUNT(Decision.game_id).alias("sum_game"),
).where(Decision.game << season_games)
# Apply same filters as main query based on grouping
if group_by == "sbaplayer":
sba_pitchers = Player.select(Player.id).where(
Player.sbaplayer == x.pitcher.sbaplayer_id
)
decision_query = decision_query.where(Decision.pitcher << sba_pitchers)
elif "player" in group_by:
decision_query = decision_query.where(Decision.pitcher == x.pitcher_id)
if "team" in group_by and hasattr(x, "pitcher_team") and x.pitcher_team:
# Filter by the team field in Decision table directly
team_obj = getattr(x, "pitcher_team", None)
if team_obj:
decision_query = decision_query.where(Decision.team == team_obj.id)
if "game" in group_by:
decision_query = decision_query.where(Decision.game == x.game_id)
# Execute decision query
try:
decision_result = decision_query.get()
decision_data = {
"sum_win": decision_result.sum_win or 0,
"sum_loss": decision_result.sum_loss or 0,
"sum_hold": decision_result.sum_hold or 0,
"sum_save": decision_result.sum_save or 0,
"sum_b_save": decision_result.sum_b_save or 0,
"sum_irunners": decision_result.sum_irunners or 0,
"sum_irun_scored": decision_result.sum_irun_scored or 0,
"sum_gs": decision_result.sum_gs or 0,
"sum_game": decision_result.sum_game or 0,
}
except Decision.DoesNotExist:
# No decision data found for this grouping
decision_data = {
"sum_win": 0,
"sum_loss": 0,
"sum_hold": 0,
"sum_save": 0,
"sum_b_save": 0,
"sum_irunners": 0,
"sum_irun_scored": 0,
"sum_gs": 0,
"sum_game": 0,
}
return_stats["stats"].append(
{
"player": this_player,
"team": team_info,
"tbf": x.sum_pa,
"outs": x.sum_outs,
"games": decision_data["sum_game"],
"gs": decision_data["sum_gs"],
"win": decision_data["sum_win"],
"loss": decision_data["sum_loss"],
"hold": decision_data["sum_hold"],
"save": decision_data["sum_save"],
"bsave": decision_data["sum_b_save"],
"ir": decision_data["sum_irunners"],
"ir_sc": decision_data["sum_irun_scored"],
"ab": x.sum_ab,
"run": x.sum_run,
"e_run": x.sum_erun,
"hits": x.sum_hit,
"double": x.sum_double,
"triple": x.sum_triple,
"hr": x.sum_hr,
"bb": x.sum_bb,
"so": x.sum_so,
"hbp": x.sum_hbp,
"sac": x.sum_sac,
"ibb": x.sum_ibb,
"gidp": x.sum_gidp,
"sb": x.sum_sb,
"cs": x.sum_cs,
"bphr": x.sum_bphr,
"bpfo": x.sum_bpfo,
"bp1b": x.sum_bp1b,
"bplo": x.sum_bplo,
"wp": x.sum_wp,
"balk": x.sum_balk,
"wpa": x.sum_wpa * -1,
"era": (x.sum_erun * 27) / tot_outs,
"whip": ((x.sum_bb + x.sum_hit + x.sum_ibb) * 3) / tot_outs,
"avg": x.sum_hit / max(x.sum_ab, 1),
"obp": obp,
"slg": slg,
"ops": obp + slg,
"woba": (
0.69 * x.sum_bb
+ 0.72 * x.sum_hbp
+ 0.89 * (x.sum_hit - x.sum_double - x.sum_triple - x.sum_hr)
+ 1.27 * x.sum_double
+ 1.62 * x.sum_triple
+ 2.1 * x.sum_hr
)
/ max(x.sum_pa - x.sum_ibb, 1),
"k/9": x.sum_so * 9 / (tot_outs / 3),
"bb/9": x.sum_bb * 9 / (tot_outs / 3),
"k/bb": x.sum_so / tot_bb,
"game": this_game,
"lob_2outs": 0, # Not available in current implementation
"rbi%": 0, # Not available in current implementation
"week": this_week,
"re24_primary": x.sum_repri * -1 if x.sum_repri is not None else None,
}
)
return_stats["count"] = len(return_stats["stats"])
db.close()
if csv:
return Response(
content=complex_data_to_csv(return_stats["stats"]), media_type="text/csv"
)
return return_stats

View File

@ -0,0 +1,214 @@
import logging
from typing import Optional, Literal
from fastapi import APIRouter, Query
from ...db_engine import (
db,
StratPlay,
StratGame,
Team,
Player,
model_to_dict,
fn,
)
from ...dependencies import (
handle_db_errors,
add_cache_headers,
cache_result,
)
logger = logging.getLogger("discord_app")
router = APIRouter()
@router.get("/")
@handle_db_errors
@add_cache_headers(max_age=10 * 60)
@cache_result(ttl=5 * 60, key_prefix="plays")
async def get_plays(
game_id: list = Query(default=None),
batter_id: list = Query(default=None),
season: list = Query(default=None),
week: list = Query(default=None),
has_defender: Optional[bool] = None,
has_catcher: Optional[bool] = None,
has_defender_or_catcher: Optional[bool] = None,
is_scoring_play: Optional[bool] = None,
pitcher_id: list = Query(default=None),
obc: list = Query(default=None),
inning: list = Query(default=None),
batting_order: list = Query(default=None),
starting_outs: list = Query(default=None),
batter_pos: list = Query(default=None),
catcher_id: list = Query(default=None),
defender_id: list = Query(default=None),
runner_id: list = Query(default=None),
offense_team_id: list = Query(default=None),
defense_team_id: list = Query(default=None),
hit: Optional[int] = None,
double: Optional[int] = None,
triple: Optional[int] = None,
homerun: Optional[int] = None,
play_num: list = Query(default=None),
error: list = Query(default=None),
sb: Optional[int] = None,
cs: Optional[int] = None,
manager_id: list = Query(default=None),
run: Optional[int] = None,
e_run: Optional[int] = None,
rbi: list = Query(default=None),
outs: list = Query(default=None),
wild_pitch: Optional[int] = None,
is_final_out: Optional[bool] = None,
is_go_ahead: Optional[bool] = None,
is_tied: Optional[bool] = None,
is_new_inning: Optional[bool] = None,
min_wpa: Optional[float] = None,
max_wpa: Optional[float] = None,
pitcher_team_id: list = Query(default=None),
short_output: Optional[bool] = False,
sort: Optional[str] = None,
limit: Optional[int] = 200,
page_num: Optional[int] = 1,
s_type: Literal["regular", "post", "total", None] = None,
):
all_plays = StratPlay.select()
if season is not None:
s_games = StratGame.select().where(StratGame.season << season)
all_plays = all_plays.where(StratPlay.game << s_games)
if week is not None:
w_games = StratGame.select().where(StratGame.week << week)
all_plays = all_plays.where(StratPlay.game << w_games)
if has_defender is not None:
all_plays = all_plays.where(StratPlay.defender.is_null(False))
if has_catcher is not None:
all_plays = all_plays.where(StratPlay.catcher.is_null(False))
if has_defender_or_catcher is not None:
all_plays = all_plays.where(
(StratPlay.catcher.is_null(False)) | (StratPlay.defender.is_null(False))
)
if game_id is not None:
all_plays = all_plays.where(StratPlay.game_id << game_id)
if batter_id is not None:
all_plays = all_plays.where(StratPlay.batter_id << batter_id)
if pitcher_id is not None:
all_plays = all_plays.where(StratPlay.pitcher_id << pitcher_id)
if obc is not None:
all_plays = all_plays.where(StratPlay.on_base_code << obc)
if inning is not None:
all_plays = all_plays.where(StratPlay.inning_num << inning)
if batting_order is not None:
all_plays = all_plays.where(StratPlay.batting_order << batting_order)
if starting_outs is not None:
all_plays = all_plays.where(StratPlay.starting_outs << starting_outs)
if batter_pos is not None:
all_plays = all_plays.where(StratPlay.batter_pos << batter_pos)
if catcher_id is not None:
all_plays = all_plays.where(StratPlay.catcher_id << catcher_id)
if defender_id is not None:
all_plays = all_plays.where(StratPlay.defender_id << defender_id)
if runner_id is not None:
all_plays = all_plays.where(StratPlay.runner_id << runner_id)
if pitcher_team_id is not None:
all_teams = Team.select().where(Team.id << pitcher_team_id)
all_plays = all_plays.where((StratPlay.pitcher_team << all_teams))
if offense_team_id is not None:
all_teams = Team.select().where(Team.id << offense_team_id)
all_plays = all_plays.where(
(StratPlay.batter_team << all_teams) | (StratPlay.runner_team << all_teams)
)
if defense_team_id is not None:
all_teams = Team.select().where(Team.id << defense_team_id)
all_plays = all_plays.where(
(StratPlay.catcher_team << all_teams)
| (StratPlay.defender_team << all_teams)
)
if hit is not None:
all_plays = all_plays.where(StratPlay.hit == hit)
if double is not None:
all_plays = all_plays.where(StratPlay.double == double)
if triple is not None:
all_plays = all_plays.where(StratPlay.triple == triple)
if homerun is not None:
all_plays = all_plays.where(StratPlay.homerun == homerun)
if sb is not None:
all_plays = all_plays.where(StratPlay.sb == sb)
if cs is not None:
all_plays = all_plays.where(StratPlay.cs == cs)
if wild_pitch is not None:
all_plays = all_plays.where(StratPlay.wild_pitch == wild_pitch)
if run is not None:
all_plays = all_plays.where(StratPlay.run == run)
if e_run is not None:
all_plays = all_plays.where(StratPlay.e_run == e_run)
if rbi is not None:
all_plays = all_plays.where(StratPlay.rbi << rbi)
if outs is not None:
all_plays = all_plays.where(StratPlay.outs << outs)
if error is not None:
all_plays = all_plays.where(StratPlay.error << error)
if manager_id is not None:
all_games = StratGame.select().where(
(StratGame.away_manager_id << manager_id)
| (StratGame.home_manager_id << manager_id)
)
all_plays = all_plays.where(StratPlay.game << all_games)
if is_final_out is not None:
all_plays = all_plays.where(StratPlay.starting_outs + StratPlay.outs == 3)
if is_go_ahead is not None:
all_plays = all_plays.where(StratPlay.is_go_ahead == is_go_ahead)
if is_tied is not None:
all_plays = all_plays.where(StratPlay.is_tied == is_tied)
if is_new_inning is not None:
all_plays = all_plays.where(StratPlay.is_new_inning == is_new_inning)
if is_scoring_play is not None:
all_plays = all_plays.where(
(StratPlay.on_first_final == 4)
| (StratPlay.on_second_final == 4)
| (StratPlay.on_third_final == 4)
| (StratPlay.batter_final == 4)
)
if min_wpa is not None:
all_plays = all_plays.where(StratPlay.wpa >= min_wpa)
if max_wpa is not None:
all_plays = all_plays.where(StratPlay.wpa <= max_wpa)
if play_num is not None:
all_plays = all_plays.where(StratPlay.play_num << play_num)
if s_type is not None:
season_games = StratGame.select()
if s_type == "regular":
season_games = season_games.where(StratGame.week <= 18)
elif s_type == "post":
season_games = season_games.where(StratGame.week > 18)
all_plays = all_plays.where(StratPlay.game << season_games)
if limit < 1:
limit = 1
bat_plays = all_plays.paginate(page_num, limit)
if sort == "wpa-desc":
all_plays = all_plays.order_by(-fn.ABS(StratPlay.wpa))
elif sort == "wpa-asc":
all_plays = all_plays.order_by(fn.ABS(StratPlay.wpa))
elif sort == "re24-desc":
all_plays = all_plays.order_by(-fn.ABS(StratPlay.re24_primary))
elif sort == "re24-asc":
all_plays = all_plays.order_by(fn.ABS(StratPlay.re24_primary))
elif sort == "newest":
all_plays = all_plays.order_by(
StratPlay.game_id.desc(), StratPlay.play_num.desc()
)
elif sort == "oldest":
all_plays = all_plays.order_by(StratPlay.game_id, StratPlay.play_num)
all_plays = all_plays.limit(limit)
return_plays = {
"count": all_plays.count(),
"plays": [model_to_dict(x, recurse=not short_output) for x in all_plays],
}
db.close()
return return_plays

View File

@ -0,0 +1,4 @@
{
"count": 0,
"stats": []
}

View File

@ -0,0 +1,4 @@
{
"count": 0,
"stats": []
}

View File

@ -0,0 +1,4 @@
{
"count": 0,
"stats": []
}

View File

@ -0,0 +1,627 @@
"""
Integration tests for stratplay router endpoints.
Hits the live dev API to verify all play routes return correct
status codes, response shapes, and data. Used as a before/after
regression suite when refactoring the stratplay router.
Usage:
# Capture baseline snapshots (run BEFORE refactor)
python -m pytest tests/integration/test_stratplay_routes.py --snapshot-update -v
# Verify against snapshots (run AFTER refactor)
python -m pytest tests/integration/test_stratplay_routes.py -v
"""
import json
import os
from pathlib import Path
import pytest
import requests
BASE_URL = os.environ.get("TEST_API_URL", "http://10.10.0.42:814")
SNAPSHOT_DIR = Path(__file__).parent / "snapshots"
@pytest.fixture(scope="module")
def api():
"""Verify the API is reachable before running tests."""
try:
r = requests.get(f"{BASE_URL}/api/v3/current", timeout=5)
r.raise_for_status()
except requests.ConnectionError:
pytest.skip(f"API not reachable at {BASE_URL}")
return BASE_URL
def save_snapshot(name: str, data: dict):
"""Save response data as a JSON snapshot."""
SNAPSHOT_DIR.mkdir(parents=True, exist_ok=True)
path = SNAPSHOT_DIR / f"{name}.json"
path.write_text(json.dumps(data, sort_keys=True, indent=2))
def load_snapshot(name: str) -> dict | None:
"""Load a previously saved snapshot, or None if it doesn't exist."""
path = SNAPSHOT_DIR / f"{name}.json"
if path.exists():
return json.loads(path.read_text())
return None
def snapshot_update_mode() -> bool:
"""Check if we're in snapshot update mode (baseline capture)."""
return os.environ.get("SNAPSHOT_UPDATE", "").lower() in ("1", "true", "yes")
# ---------------------------------------------------------------------------
# 1. Route registration (OpenAPI schema)
# ---------------------------------------------------------------------------
# Only public (GET) routes are visible in OpenAPI by default.
# POST/PATCH/DELETE use include_in_schema=PRIVATE_IN_SCHEMA which hides them
# unless PRIVATE_IN_SCHEMA env var is set. We test what's always visible.
EXPECTED_PLAY_ROUTES = {
"/api/v3/plays/": ["get"],
"/api/v3/plays/batting": ["get"],
"/api/v3/plays/pitching": ["get"],
"/api/v3/plays/fielding": ["get"],
"/api/v3/plays/{play_id}": ["get"],
}
class TestRouteRegistration:
def test_all_play_routes_exist(self, api):
"""Verify every expected play route is registered in the OpenAPI schema."""
r = requests.get(f"{api}/api/openapi.json", timeout=10)
assert r.status_code == 200
paths = r.json()["paths"]
for route, methods in EXPECTED_PLAY_ROUTES.items():
assert route in paths, f"Route {route} missing from OpenAPI schema"
for method in methods:
assert (
method in paths[route]
), f"Method {method.upper()} missing for {route}"
def test_play_routes_have_plays_tag(self, api):
"""All play routes should be tagged with 'plays'."""
r = requests.get(f"{api}/api/openapi.json", timeout=10)
paths = r.json()["paths"]
for route in EXPECTED_PLAY_ROUTES:
if route not in paths:
continue
for method, spec in paths[route].items():
if method in ("get", "post", "patch", "delete"):
tags = spec.get("tags", [])
assert (
"plays" in tags
), f"{method.upper()} {route} missing 'plays' tag, has {tags}"
@pytest.mark.post_deploy
@pytest.mark.skip(
reason="@cache_result decorator uses *args/**kwargs wrapper which hides "
"typed params from FastAPI's OpenAPI schema generation. "
"The parameter works functionally (see sbaplayer_filter tests)."
)
def test_sbaplayer_id_parameter_exists(self, api):
"""Verify sbaplayer_id query parameter is present on stats endpoints.
NOTE: Currently skipped because the @cache_result decorator's wrapper
uses *args/**kwargs, which prevents FastAPI from discovering typed
parameters for OpenAPI schema generation. This is a pre-existing issue
affecting all cached endpoints, not specific to sbaplayer_id.
"""
r = requests.get(f"{api}/api/openapi.json", timeout=10)
paths = r.json()["paths"]
for route in [
"/api/v3/plays/batting",
"/api/v3/plays/pitching",
"/api/v3/plays/fielding",
]:
params = paths[route]["get"].get("parameters", [])
param_names = [p["name"] for p in params]
assert (
"sbaplayer_id" in param_names
), f"sbaplayer_id parameter missing from {route}"
# ---------------------------------------------------------------------------
# 2. Generic plays query
# ---------------------------------------------------------------------------
# Peewee model_to_dict uses FK field names (game, pitcher) not _id suffixed
PLAY_REQUIRED_KEYS = {"id", "game", "play_num", "pitcher", "pa", "hit", "wpa"}
class TestGenericPlays:
def test_plays_basic(self, api):
"""GET /plays with season filter returns valid structure."""
r = requests.get(
f"{api}/api/v3/plays", params={"season": 12, "limit": 3}, timeout=10
)
assert r.status_code == 200
data = r.json()
assert "count" in data
assert "plays" in data
assert isinstance(data["count"], int)
assert isinstance(data["plays"], list)
def test_plays_have_expected_keys(self, api):
"""Each play object has the core required fields."""
r = requests.get(
f"{api}/api/v3/plays", params={"season": 12, "limit": 3}, timeout=10
)
data = r.json()
if data["count"] == 0:
pytest.skip("No play data for season 12")
for play in data["plays"]:
missing = PLAY_REQUIRED_KEYS - set(play.keys())
assert not missing, f"Play missing keys: {missing}"
def test_plays_empty_season(self, api):
"""Querying a nonexistent season returns 200 with empty results."""
r = requests.get(
f"{api}/api/v3/plays", params={"season": 99, "limit": 3}, timeout=10
)
assert r.status_code == 200
data = r.json()
assert data["count"] == 0
# ---------------------------------------------------------------------------
# 3. Batting stats
# ---------------------------------------------------------------------------
BATTING_REQUIRED_KEYS = {
"player",
"team",
"pa",
"ab",
"hit",
"hr",
"avg",
"obp",
"slg",
"ops",
"woba",
"wpa",
"re24_primary",
"run",
"rbi",
"bb",
"so",
}
class TestBattingStats:
def test_batting_basic(self, api):
"""GET /plays/batting returns valid structure."""
r = requests.get(
f"{api}/api/v3/plays/batting", params={"season": 12, "limit": 3}, timeout=10
)
assert r.status_code == 200
data = r.json()
assert "count" in data
assert "stats" in data
assert isinstance(data["stats"], list)
def test_batting_has_expected_keys(self, api):
"""Each batting stat has required computed fields."""
r = requests.get(
f"{api}/api/v3/plays/batting", params={"season": 12, "limit": 3}, timeout=10
)
data = r.json()
if not data["stats"]:
pytest.skip("No batting data")
for stat in data["stats"]:
missing = BATTING_REQUIRED_KEYS - set(stat.keys())
assert not missing, f"Batting stat missing keys: {missing}"
def test_batting_player_filter(self, api):
"""Filtering by player_id returns results for that player."""
r = requests.get(
f"{api}/api/v3/plays/batting",
params={"season": 12, "player_id": 1, "limit": 5},
timeout=10,
)
assert r.status_code == 200
@pytest.mark.post_deploy
def test_batting_sbaplayer_filter(self, api):
"""Filtering by sbaplayer_id returns 200. Requires feature branch deployed."""
r = requests.get(
f"{api}/api/v3/plays/batting",
params={"season": 12, "sbaplayer_id": 1, "limit": 5},
timeout=10,
)
assert r.status_code == 200
def test_batting_group_by_team(self, api):
"""Group by team sets player to 'TOT'."""
r = requests.get(
f"{api}/api/v3/plays/batting",
params={"season": 12, "group_by": "team", "limit": 3},
timeout=10,
)
assert r.status_code == 200
data = r.json()
if data["stats"]:
assert data["stats"][0]["player"] == "TOT"
def test_batting_short_output(self, api):
"""Short output returns player as int, not dict."""
r = requests.get(
f"{api}/api/v3/plays/batting",
params={"season": 12, "short_output": "true", "limit": 3},
timeout=10,
)
assert r.status_code == 200
data = r.json()
if data["stats"]:
assert isinstance(data["stats"][0]["player"], int)
def test_batting_snapshot(self, api):
"""Snapshot test: exact data match for a specific player+season query."""
params = {"season": 12, "player_id": 1}
r = requests.get(f"{api}/api/v3/plays/batting", params=params, timeout=10)
assert r.status_code == 200
data = r.json()
name = "batting_s12_p1"
if snapshot_update_mode():
save_snapshot(name, data)
return
expected = load_snapshot(name)
if expected is None:
pytest.skip("No snapshot saved yet; run with SNAPSHOT_UPDATE=1 first")
assert data == expected, f"Batting snapshot mismatch for {name}"
# ---------------------------------------------------------------------------
# 4. Pitching stats
# ---------------------------------------------------------------------------
PITCHING_REQUIRED_KEYS = {
"player",
"team",
"tbf",
"outs",
"era",
"whip",
"win",
"loss",
"k/9",
"bb/9",
"wpa",
"re24_primary",
"hits",
"so",
"bb",
}
class TestPitchingStats:
def test_pitching_basic(self, api):
"""GET /plays/pitching returns valid structure."""
r = requests.get(
f"{api}/api/v3/plays/pitching",
params={"season": 12, "limit": 3},
timeout=10,
)
assert r.status_code == 200
data = r.json()
assert "count" in data
assert "stats" in data
def test_pitching_has_expected_keys(self, api):
"""Each pitching stat has required computed fields."""
r = requests.get(
f"{api}/api/v3/plays/pitching",
params={"season": 12, "limit": 3},
timeout=10,
)
data = r.json()
if not data["stats"]:
pytest.skip("No pitching data")
for stat in data["stats"]:
missing = PITCHING_REQUIRED_KEYS - set(stat.keys())
assert not missing, f"Pitching stat missing keys: {missing}"
def test_pitching_player_filter(self, api):
"""Filtering by player_id returns 200."""
r = requests.get(
f"{api}/api/v3/plays/pitching",
params={"season": 12, "player_id": 1, "limit": 5},
timeout=10,
)
assert r.status_code == 200
@pytest.mark.post_deploy
def test_pitching_sbaplayer_filter(self, api):
"""Filtering by sbaplayer_id returns 200. Requires feature branch deployed."""
r = requests.get(
f"{api}/api/v3/plays/pitching",
params={"season": 12, "sbaplayer_id": 1, "limit": 5},
timeout=10,
)
assert r.status_code == 200
def test_pitching_group_by_team(self, api):
"""Group by team returns results."""
r = requests.get(
f"{api}/api/v3/plays/pitching",
params={"season": 12, "group_by": "team", "limit": 3},
timeout=10,
)
assert r.status_code == 200
def test_pitching_snapshot(self, api):
"""Snapshot test: exact data match for a specific player+season query."""
params = {"season": 12, "player_id": 1}
r = requests.get(f"{api}/api/v3/plays/pitching", params=params, timeout=10)
assert r.status_code == 200
data = r.json()
name = "pitching_s12_p1"
if snapshot_update_mode():
save_snapshot(name, data)
return
expected = load_snapshot(name)
if expected is None:
pytest.skip("No snapshot saved yet; run with SNAPSHOT_UPDATE=1 first")
assert data == expected, f"Pitching snapshot mismatch for {name}"
# ---------------------------------------------------------------------------
# 5. Fielding stats
# ---------------------------------------------------------------------------
FIELDING_REQUIRED_KEYS = {
"player",
"team",
"pos",
"x-ch",
"hit",
"error",
"sb",
"cs",
"pb",
"wpa",
"wf%",
}
class TestFieldingStats:
def test_fielding_basic(self, api):
"""GET /plays/fielding returns valid structure."""
r = requests.get(
f"{api}/api/v3/plays/fielding",
params={"season": 12, "limit": 3},
timeout=10,
)
assert r.status_code == 200
data = r.json()
assert "count" in data
assert "stats" in data
def test_fielding_has_expected_keys(self, api):
"""Each fielding stat has required fields."""
r = requests.get(
f"{api}/api/v3/plays/fielding",
params={"season": 12, "limit": 3},
timeout=10,
)
data = r.json()
if not data["stats"]:
pytest.skip("No fielding data")
for stat in data["stats"]:
missing = FIELDING_REQUIRED_KEYS - set(stat.keys())
assert not missing, f"Fielding stat missing keys: {missing}"
def test_fielding_player_filter(self, api):
"""Filtering by player_id returns 200."""
r = requests.get(
f"{api}/api/v3/plays/fielding",
params={"season": 12, "player_id": 1, "limit": 5},
timeout=10,
)
assert r.status_code == 200
@pytest.mark.post_deploy
def test_fielding_sbaplayer_filter(self, api):
"""Filtering by sbaplayer_id returns 200. Requires feature branch deployed."""
r = requests.get(
f"{api}/api/v3/plays/fielding",
params={"season": 12, "sbaplayer_id": 1, "limit": 5},
timeout=10,
)
assert r.status_code == 200
def test_fielding_position_filter(self, api):
"""Filtering by position returns 200."""
r = requests.get(
f"{api}/api/v3/plays/fielding",
params={"season": 12, "position": "C", "limit": 3},
timeout=10,
)
assert r.status_code == 200
def test_fielding_snapshot(self, api):
"""Snapshot test: exact data match for a specific player+season query."""
params = {"season": 12, "player_id": 1}
r = requests.get(f"{api}/api/v3/plays/fielding", params=params, timeout=10)
assert r.status_code == 200
data = r.json()
name = "fielding_s12_p1"
if snapshot_update_mode():
save_snapshot(name, data)
return
expected = load_snapshot(name)
if expected is None:
pytest.skip("No snapshot saved yet; run with SNAPSHOT_UPDATE=1 first")
assert data == expected, f"Fielding snapshot mismatch for {name}"
# ---------------------------------------------------------------------------
# 6. Single play CRUD (read-only)
# ---------------------------------------------------------------------------
class TestPlayCrud:
def test_get_single_play(self, api):
"""GET /plays/{id} returns a play with matching id."""
# First find a valid play ID from the data
r = requests.get(
f"{api}/api/v3/plays", params={"season": 12, "limit": 1}, timeout=10
)
data = r.json()
if not data["plays"]:
pytest.skip("No play data available")
play_id = data["plays"][0]["id"]
r = requests.get(f"{api}/api/v3/plays/{play_id}", timeout=10)
assert r.status_code == 200
result = r.json()
assert result["id"] == play_id
def test_get_nonexistent_play(self, api):
"""GET /plays/999999999 returns an error (wrapped by handle_db_errors)."""
r = requests.get(f"{api}/api/v3/plays/999999999", timeout=10)
# handle_db_errors wraps HTTPException as 500 with detail message
assert r.status_code == 500
assert "not found" in r.json().get("detail", "").lower()
# ---------------------------------------------------------------------------
# 7. Validation / error cases
# ---------------------------------------------------------------------------
class TestValidation:
def test_week_and_stype_conflict(self, api):
"""Using both week and s_type should return an error."""
r = requests.get(
f"{api}/api/v3/plays/batting",
params={"season": 12, "week": 1, "s_type": "regular"},
timeout=10,
)
# handle_db_errors wraps the 400 HTTPException as 500
assert r.status_code in (400, 500)
assert "cannot be used" in r.json().get("detail", "").lower()
def test_week_and_week_start_conflict(self, api):
"""Using both week and week_start should return an error."""
r = requests.get(
f"{api}/api/v3/plays/batting",
params={"season": 12, "week": 1, "week_start": 1},
timeout=10,
)
assert r.status_code in (400, 500)
assert "cannot be used" in r.json().get("detail", "").lower()
# ---------------------------------------------------------------------------
# 8. group_by=sbaplayer
# ---------------------------------------------------------------------------
class TestGroupBySbaPlayer:
"""Tests for group_by=sbaplayer which aggregates across all Player records
sharing the same SbaPlayer identity (career totals by real-world player)."""
@pytest.mark.post_deploy
def test_batting_sbaplayer_group_by(self, api):
"""GET /plays/batting?group_by=sbaplayer&sbaplayer_id=1 returns exactly 1 row
with SbaPlayer object in 'player' field and team='TOT'."""
r = requests.get(
f"{api}/api/v3/plays/batting",
params={"group_by": "sbaplayer", "sbaplayer_id": 1},
timeout=15,
)
assert r.status_code == 200
data = r.json()
assert data["count"] == 1, f"Expected 1 career row, got {data['count']}"
row = data["stats"][0]
assert isinstance(row["player"], dict), "player should be SbaPlayer dict"
assert "first_name" in row["player"]
assert "last_name" in row["player"]
assert row["team"] == "TOT"
@pytest.mark.post_deploy
def test_batting_sbaplayer_career_totals(self, api):
"""Career PA via group_by=sbaplayer should be >= any single season's PA."""
# Get career total
r_career = requests.get(
f"{api}/api/v3/plays/batting",
params={"group_by": "sbaplayer", "sbaplayer_id": 1},
timeout=15,
)
assert r_career.status_code == 200
career_pa = r_career.json()["stats"][0]["pa"]
# Get per-season rows
r_seasons = requests.get(
f"{api}/api/v3/plays/batting",
params={"group_by": "player", "sbaplayer_id": 1, "limit": 999},
timeout=15,
)
assert r_seasons.status_code == 200
season_pas = [s["pa"] for s in r_seasons.json()["stats"]]
assert career_pa >= max(
season_pas
), f"Career PA ({career_pa}) should be >= max season PA ({max(season_pas)})"
@pytest.mark.post_deploy
def test_batting_sbaplayer_short_output(self, api):
"""short_output=true with group_by=sbaplayer returns integer player field."""
r = requests.get(
f"{api}/api/v3/plays/batting",
params={
"group_by": "sbaplayer",
"sbaplayer_id": 1,
"short_output": "true",
},
timeout=15,
)
assert r.status_code == 200
data = r.json()
assert data["count"] == 1
assert isinstance(data["stats"][0]["player"], int)
@pytest.mark.post_deploy
def test_pitching_sbaplayer_group_by(self, api):
"""GET /plays/pitching?group_by=sbaplayer returns 200 with valid structure."""
r = requests.get(
f"{api}/api/v3/plays/pitching",
params={"group_by": "sbaplayer", "sbaplayer_id": 1, "min_pa": 1},
timeout=15,
)
assert r.status_code == 200
data = r.json()
if data["stats"]:
row = data["stats"][0]
assert isinstance(row["player"], dict)
assert row["team"] == "TOT"
@pytest.mark.post_deploy
def test_fielding_sbaplayer_group_by(self, api):
"""GET /plays/fielding?group_by=sbaplayer returns 200 with valid structure."""
r = requests.get(
f"{api}/api/v3/plays/fielding",
params={"group_by": "sbaplayer", "sbaplayer_id": 1, "min_ch": 1},
timeout=15,
)
assert r.status_code == 200
data = r.json()
if data["stats"]:
row = data["stats"][0]
assert isinstance(row["player"], dict)
assert row["team"] == "TOT"