Merge next-release into main #100

Merged
cal merged 28 commits from next-release into main 2026-03-17 19:17:34 +00:00
24 changed files with 1592 additions and 135 deletions

View File

@ -55,8 +55,6 @@ jobs:
context: .
push: true
tags: ${{ steps.tags.outputs.tags }}
cache-from: type=registry,ref=manticorum67/paper-dynasty-database:buildcache
cache-to: type=registry,ref=manticorum67/paper-dynasty-database:buildcache,mode=max
- name: Tag release
if: success() && github.ref == 'refs/heads/main'

View File

@ -1,41 +1,10 @@
FROM tiangolo/uvicorn-gunicorn-fastapi:latest
FROM python:3.11-slim-bookworm
WORKDIR /usr/src/app
# Chrome dependency Instalation
# RUN apt-get update && apt-get install -y \
# fonts-liberation \
# libasound2 \
# libatk-bridge2.0-0 \
# libatk1.0-0 \
# libatspi2.0-0 \
# libcups2 \
# libdbus-1-3 \
# libdrm2 \
# libgbm1 \
# libgtk-3-0 \
# # libgtk-4-1 \
# libnspr4 \
# libnss3 \
# libwayland-client0 \
# libxcomposite1 \
# libxdamage1 \
# libxfixes3 \
# libxkbcommon0 \
# libxrandr2 \
# xdg-utils \
# libu2f-udev \
# libvulkan1
# # Chrome instalation
# RUN curl -LO https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb
# RUN apt-get install -y ./google-chrome-stable_current_amd64.deb
# RUN rm google-chrome-stable_current_amd64.deb
# # Check chrome version
# RUN echo "Chrome: " && google-chrome --version
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt
RUN playwright install chromium
RUN playwright install-deps chromium
COPY ./app /app/app
COPY ./app /app/app

View File

@ -1050,8 +1050,113 @@ decision_index = ModelIndex(Decision, (Decision.game, Decision.pitcher), unique=
Decision.add_index(decision_index)
class BattingSeasonStats(BaseModel):
    """Per-player, per-team, per-season batting totals.

    Rows are keyed by (player, team, season) and updated additively by the
    season-stats post-game endpoint, which upserts per-game deltas.
    """

    player = ForeignKeyField(Player)
    team = ForeignKeyField(Team)
    season = IntegerField()
    # Counting stats; every new (player, team, season) row starts at zero.
    games = IntegerField(default=0)
    pa = IntegerField(default=0)  # plate appearances
    ab = IntegerField(default=0)  # at bats
    hits = IntegerField(default=0)
    doubles = IntegerField(default=0)
    triples = IntegerField(default=0)
    hr = IntegerField(default=0)  # home runs
    rbi = IntegerField(default=0)  # runs batted in
    runs = IntegerField(default=0)
    bb = IntegerField(default=0)  # walks (bases on balls)
    strikeouts = IntegerField(default=0)
    hbp = IntegerField(default=0)  # hit by pitch
    sac = IntegerField(default=0)  # sacrifices
    ibb = IntegerField(default=0)  # intentional walks
    gidp = IntegerField(default=0)  # grounded into double play
    sb = IntegerField(default=0)  # stolen bases
    cs = IntegerField(default=0)  # caught stealing
    # Bookkeeping: last game folded into these totals, and when.
    last_game = ForeignKeyField(StratGame, null=True)
    last_updated_at = DateTimeField(null=True)

    class Meta:
        database = db
        table_name = "batting_season_stats"
bss_unique_index = ModelIndex(
BattingSeasonStats,
(BattingSeasonStats.player, BattingSeasonStats.team, BattingSeasonStats.season),
unique=True,
)
BattingSeasonStats.add_index(bss_unique_index)
bss_team_season_index = ModelIndex(
BattingSeasonStats,
(BattingSeasonStats.team, BattingSeasonStats.season),
unique=False,
)
BattingSeasonStats.add_index(bss_team_season_index)
bss_player_season_index = ModelIndex(
BattingSeasonStats,
(BattingSeasonStats.player, BattingSeasonStats.season),
unique=False,
)
BattingSeasonStats.add_index(bss_player_season_index)
class PitchingSeasonStats(BaseModel):
    """Per-player, per-team, per-season pitching totals.

    Rows are keyed by (player, team, season) and updated additively by the
    season-stats post-game endpoint, which upserts per-game deltas.
    """

    player = ForeignKeyField(Player)
    team = ForeignKeyField(Team)
    season = IntegerField()
    games = IntegerField(default=0)
    games_started = IntegerField(default=0)
    # Innings pitched stored as total outs (3 per inning); see the
    # season-stats router's IP-to-outs conversion helper.
    outs = IntegerField(default=0)
    strikeouts = IntegerField(default=0)
    bb = IntegerField(default=0)  # walks allowed
    hits_allowed = IntegerField(default=0)
    runs_allowed = IntegerField(default=0)
    earned_runs = IntegerField(default=0)
    hr_allowed = IntegerField(default=0)
    hbp = IntegerField(default=0)  # batters hit by pitch
    wild_pitches = IntegerField(default=0)
    balks = IntegerField(default=0)
    wins = IntegerField(default=0)
    losses = IntegerField(default=0)
    holds = IntegerField(default=0)
    saves = IntegerField(default=0)
    blown_saves = IntegerField(default=0)
    # Bookkeeping: last game folded into these totals, and when.
    last_game = ForeignKeyField(StratGame, null=True)
    last_updated_at = DateTimeField(null=True)

    class Meta:
        database = db
        table_name = "pitching_season_stats"
pitss_unique_index = ModelIndex(
PitchingSeasonStats,
(PitchingSeasonStats.player, PitchingSeasonStats.team, PitchingSeasonStats.season),
unique=True,
)
PitchingSeasonStats.add_index(pitss_unique_index)
pitss_team_season_index = ModelIndex(
PitchingSeasonStats,
(PitchingSeasonStats.team, PitchingSeasonStats.season),
unique=False,
)
PitchingSeasonStats.add_index(pitss_team_season_index)
pitss_player_season_index = ModelIndex(
PitchingSeasonStats,
(PitchingSeasonStats.player, PitchingSeasonStats.season),
unique=False,
)
PitchingSeasonStats.add_index(pitss_player_season_index)
if not SKIP_TABLE_CREATION:
db.create_tables([StratGame, StratPlay, Decision], safe=True)
db.create_tables(
[StratGame, StratPlay, Decision, BattingSeasonStats, PitchingSeasonStats],
safe=True,
)
class ScoutOpportunity(BaseModel):

View File

@ -51,6 +51,7 @@ from .routers_v2 import (
stratplays,
scout_opportunities,
scout_claims,
evolution,
)
@ -105,6 +106,7 @@ app.include_router(stratplays.router)
app.include_router(decisions.router)
app.include_router(scout_opportunities.router)
app.include_router(scout_claims.router)
app.include_router(evolution.router)
@app.middleware("http")

0
app/models/__init__.py Normal file
View File

View File

@ -0,0 +1,7 @@
"""Season stats ORM models.
Models are defined in db_engine alongside all other Peewee models; this
module re-exports them so callers can import from `app.models.season_stats`.
"""
from ..db_engine import BattingSeasonStats, PitchingSeasonStats # noqa: F401

View File

@ -7,11 +7,7 @@ from pandas import DataFrame
from ..db_engine import db, Card, model_to_dict, Team, Player, Pack, Paperdex, CARDSETS, DoesNotExist
from ..dependencies import oauth2_scheme, valid_token
router = APIRouter(
prefix='/api/v2/cards',
tags=['cards']
)
router = APIRouter(prefix="/api/v2/cards", tags=["cards"])
class CardPydantic(pydantic.BaseModel):
@ -26,12 +22,20 @@ class CardModel(pydantic.BaseModel):
cards: List[CardPydantic]
@router.get('')
@router.get("")
async def get_cards(
player_id: Optional[int] = None, team_id: Optional[int] = None, pack_id: Optional[int] = None,
value: Optional[int] = None, min_value: Optional[int] = None, max_value: Optional[int] = None, variant: Optional[int] = None,
order_by: Optional[str] = None, limit: Optional[int] = None, dupes: Optional[bool] = None,
csv: Optional[bool] = None):
player_id: Optional[int] = None,
team_id: Optional[int] = None,
pack_id: Optional[int] = None,
value: Optional[int] = None,
min_value: Optional[int] = None,
max_value: Optional[int] = None,
variant: Optional[int] = None,
order_by: Optional[str] = None,
limit: Optional[int] = None,
dupes: Optional[bool] = None,
csv: Optional[bool] = None,
):
all_cards = Card.select()
# if all_cards.count() == 0:
@ -65,7 +69,7 @@ async def get_cards(
if max_value is not None:
all_cards = all_cards.where(Card.value <= max_value)
if order_by is not None:
if order_by.lower() == 'new':
if order_by.lower() == "new":
all_cards = all_cards.order_by(-Card.id)
else:
all_cards = all_cards.order_by(Card.id)
@ -73,8 +77,10 @@ async def get_cards(
all_cards = all_cards.limit(limit)
if dupes:
if team_id is None:
raise HTTPException(status_code=400, detail='Dupe checking must include a team_id')
logging.debug(f'dupe check')
raise HTTPException(
status_code=400, detail="Dupe checking must include a team_id"
)
logging.debug(f"dupe check")
p_query = Card.select(Card.player).where(Card.team_id == team_id)
seen = set()
dupes = []
@ -90,38 +96,52 @@ async def get_cards(
# raise HTTPException(status_code=404, detail=f'No cards found')
if csv:
data_list = [['id', 'player', 'cardset', 'rarity', 'team', 'pack', 'value']] #, 'variant']]
data_list = [
["id", "player", "cardset", "rarity", "team", "pack", "value"]
] # , 'variant']]
for line in all_cards:
data_list.append(
[
line.id, line.player.p_name, line.player.cardset, line.player.rarity, line.team.abbrev, line.pack,
line.id,
line.player.p_name,
line.player.cardset,
line.player.rarity,
line.team.abbrev,
line.pack,
line.value, # line.variant
]
)
return_val = DataFrame(data_list).to_csv(header=False, index=False)
return Response(content=return_val, media_type='text/csv')
return Response(content=return_val, media_type="text/csv")
else:
return_val = {'count': all_cards.count(), 'cards': []}
for x in all_cards:
card_list = list(all_cards)
player_ids = [c.player_id for c in card_list if c.player_id is not None]
dex_by_player = {}
if player_ids:
for row in Paperdex.select().where(Paperdex.player_id << player_ids):
dex_by_player.setdefault(row.player_id, []).append(row)
return_val = {"count": len(card_list), "cards": []}
for x in card_list:
this_record = model_to_dict(x)
logging.debug(f'this_record: {this_record}')
logging.debug(f"this_record: {this_record}")
this_dex = Paperdex.select().where(Paperdex.player == x)
this_record['player']['paperdex'] = {'count': this_dex.count(), 'paperdex': []}
for y in this_dex:
this_record['player']['paperdex']['paperdex'].append(model_to_dict(y, recurse=False))
entries = dex_by_player.get(x.player_id, [])
this_record["player"]["paperdex"] = {
"count": len(entries),
"paperdex": [model_to_dict(y, recurse=False) for y in entries],
}
return_val['cards'].append(this_record)
return_val["cards"].append(this_record)
# return_val['cards'].append(model_to_dict(x))
return return_val
@router.get('/{card_id}')
@router.get("/{card_id}")
async def v1_cards_get_one(card_id, csv: Optional[bool] = False):
try:
this_card = Card.get_by_id(card_id)
@ -130,25 +150,31 @@ async def v1_cards_get_one(card_id, csv: Optional[bool] = False):
if csv:
data_list = [
['id', 'player', 'team', 'pack', 'value'],
[this_card.id, this_card.player, this_card.team.abbrev, this_card.pack, this_card.value]
["id", "player", "team", "pack", "value"],
[
this_card.id,
this_card.player,
this_card.team.abbrev,
this_card.pack,
this_card.value,
],
]
return_val = DataFrame(data_list).to_csv(header=False, index=False)
return Response(content=return_val, media_type='text/csv')
return Response(content=return_val, media_type="text/csv")
else:
return_val = model_to_dict(this_card)
return return_val
@router.post('')
@router.post("")
async def v1_cards_post(cards: CardModel, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning('Bad Token: [REDACTED]')
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(
status_code=401,
detail='You are not authorized to post cards. This event has been logged.'
detail="You are not authorized to post cards. This event has been logged.",
)
last_card = Card.select(Card.id).order_by(-Card.id).limit(1)
lc_id = last_card[0].id
@ -157,7 +183,7 @@ async def v1_cards_post(cards: CardModel, token: str = Depends(oauth2_scheme)):
player_ids = []
inc_dex = True
this_team = Team.get_by_id(cards.cards[0].team_id)
if this_team.is_ai or 'Gauntlet' in this_team.abbrev:
if this_team.is_ai or "Gauntlet" in this_team.abbrev:
inc_dex = False
# new_dex = []
@ -177,11 +203,15 @@ async def v1_cards_post(cards: CardModel, token: str = Depends(oauth2_scheme)):
with db.atomic():
Card.bulk_create(new_cards, batch_size=15)
cost_query = Player.update(cost=Player.cost + 1).where(Player.player_id << player_ids)
cost_query = Player.update(cost=Player.cost + 1).where(
Player.player_id << player_ids
)
cost_query.execute()
# sheets.post_new_cards(SHEETS_AUTH, lc_id)
raise HTTPException(status_code=200, detail=f'{len(new_cards)} cards have been added')
raise HTTPException(
status_code=200, detail=f"{len(new_cards)} cards have been added"
)
# @router.post('/ai-update')
@ -198,21 +228,27 @@ async def v1_cards_post(cards: CardModel, token: str = Depends(oauth2_scheme)):
# raise HTTPException(status_code=200, detail=f'Just sent AI cards to sheets')
@router.post('/legal-check/{rarity_name}')
@router.post("/legal-check/{rarity_name}")
async def v1_cards_legal_check(
rarity_name: str, card_id: list = Query(default=None), token: str = Depends(oauth2_scheme)):
rarity_name: str,
card_id: list = Query(default=None),
token: str = Depends(oauth2_scheme),
):
if not valid_token(token):
logging.warning('Bad Token: [REDACTED]')
raise HTTPException(
status_code=401,
detail='Unauthorized'
)
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(status_code=401, detail="Unauthorized")
if rarity_name not in CARDSETS.keys():
return f'Rarity name {rarity_name} not a valid check'
return f"Rarity name {rarity_name} not a valid check"
# Handle case where card_id is passed as a stringified list
if card_id and len(card_id) == 1 and isinstance(card_id[0], str) and card_id[0].startswith('['):
if (
card_id
and len(card_id) == 1
and isinstance(card_id[0], str)
and card_id[0].startswith("[")
):
import ast
try:
card_id = [int(x) for x in ast.literal_eval(card_id[0])]
except (ValueError, SyntaxError):
@ -222,48 +258,51 @@ async def v1_cards_legal_check(
all_cards = Card.select().where(Card.id << card_id)
for x in all_cards:
if x.player.cardset_id not in CARDSETS[rarity_name]['human']:
if x.player.cardset_id not in CARDSETS[rarity_name]["human"]:
if x.player.p_name in x.player.description:
bad_cards.append(x.player.description)
else:
bad_cards.append(f'{x.player.description} {x.player.p_name}')
bad_cards.append(f"{x.player.description} {x.player.p_name}")
return {'count': len(bad_cards), 'bad_cards': bad_cards}
return {"count": len(bad_cards), "bad_cards": bad_cards}
@router.post('/post-update/{starting_id}')
@router.post("/post-update/{starting_id}")
async def v1_cards_post_update(starting_id: int, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning('Bad Token: [REDACTED]')
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(
status_code=401,
detail='You are not authorized to update card lists. This event has been logged.'
detail="You are not authorized to update card lists. This event has been logged.",
)
# sheets.post_new_cards(SHEETS_AUTH, starting_id)
raise HTTPException(status_code=200, detail=f'Just sent cards to sheets starting at ID {starting_id}')
raise HTTPException(
status_code=200,
detail=f"Just sent cards to sheets starting at ID {starting_id}",
)
@router.post('/post-delete')
@router.post("/post-delete")
async def v1_cards_post_delete(del_ids: str, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning('Bad Token: [REDACTED]')
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(
status_code=401,
detail='You are not authorized to delete card lists. This event has been logged.'
detail="You are not authorized to delete card lists. This event has been logged.",
)
logging.info(f'del_ids: {del_ids} / type: {type(del_ids)}')
logging.info(f"del_ids: {del_ids} / type: {type(del_ids)}")
# sheets.post_deletion(SHEETS_AUTH, del_ids.split(','))
@router.post('/wipe-team/{team_id}')
@router.post("/wipe-team/{team_id}")
async def v1_cards_wipe_team(team_id: int, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning('Bad Token: [REDACTED]')
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(
status_code=401,
detail='You are not authorized to wipe teams. This event has been logged.'
detail="You are not authorized to wipe teams. This event has been logged.",
)
try:
@ -273,19 +312,27 @@ async def v1_cards_wipe_team(team_id: int, token: str = Depends(oauth2_scheme)):
raise HTTPException(status_code=404, detail=f'Team {team_id} not found')
t_query = Card.update(team=None).where(Card.team == this_team).execute()
return f'Wiped {t_query} cards'
return f"Wiped {t_query} cards"
@router.patch('/{card_id}')
@router.patch("/{card_id}")
async def v1_cards_patch(
card_id, player_id: Optional[int] = None, team_id: Optional[int] = None, pack_id: Optional[int] = None,
value: Optional[int] = None, variant: Optional[int] = None, roster1_id: Optional[int] = None, roster2_id: Optional[int] = None,
roster3_id: Optional[int] = None, token: str = Depends(oauth2_scheme)):
card_id,
player_id: Optional[int] = None,
team_id: Optional[int] = None,
pack_id: Optional[int] = None,
value: Optional[int] = None,
variant: Optional[int] = None,
roster1_id: Optional[int] = None,
roster2_id: Optional[int] = None,
roster3_id: Optional[int] = None,
token: str = Depends(oauth2_scheme),
):
if not valid_token(token):
logging.warning('Bad Token: [REDACTED]')
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(
status_code=401,
detail='You are not authorized to patch cards. This event has been logged.'
detail="You are not authorized to patch cards. This event has been logged.",
)
try:
this_card = Card.get_by_id(card_id)
@ -318,17 +365,17 @@ async def v1_cards_patch(
else:
raise HTTPException(
status_code=418,
detail='Well slap my ass and call me a teapot; I could not save that rarity'
detail="Well slap my ass and call me a teapot; I could not save that rarity",
)
@router.delete('/{card_id}')
@router.delete("/{card_id}")
async def v1_cards_delete(card_id, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning('Bad Token: [REDACTED]')
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(
status_code=401,
detail='You are not authorized to delete packs. This event has been logged.'
detail="You are not authorized to delete packs. This event has been logged.",
)
try:
this_card = Card.get_by_id(card_id)
@ -338,6 +385,6 @@ async def v1_cards_delete(card_id, token: str = Depends(oauth2_scheme)):
count = this_card.delete_instance()
if count == 1:
raise HTTPException(status_code=200, detail=f'Card {card_id} has been deleted')
raise HTTPException(status_code=200, detail=f"Card {card_id} has been deleted")
else:
raise HTTPException(status_code=500, detail=f'Card {card_id} was not deleted')
raise HTTPException(status_code=500, detail=f"Card {card_id} was not deleted")

View File

@ -0,0 +1,43 @@
from fastapi import APIRouter, Depends, HTTPException, Query
import logging
from typing import Optional
from ..db_engine import model_to_dict
from ..dependencies import oauth2_scheme, valid_token
router = APIRouter(prefix="/api/v2/evolution", tags=["evolution"])
@router.get("/tracks")
async def list_tracks(
card_type: Optional[str] = Query(default=None),
token: str = Depends(oauth2_scheme),
):
if not valid_token(token):
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(status_code=401, detail="Unauthorized")
from ..db_engine import EvolutionTrack
query = EvolutionTrack.select()
if card_type is not None:
query = query.where(EvolutionTrack.card_type == card_type)
items = [model_to_dict(t, recurse=False) for t in query]
return {"count": len(items), "items": items}
@router.get("/tracks/{track_id}")
async def get_track(track_id: int, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(status_code=401, detail="Unauthorized")
from ..db_engine import EvolutionTrack
try:
track = EvolutionTrack.get_by_id(track_id)
except Exception:
raise HTTPException(status_code=404, detail=f"Track {track_id} not found")
return model_to_dict(track, recurse=False)

View File

@ -36,14 +36,3 @@ async def get_player_keys(player_id: list = Query(default=None)):
return_val = {"count": len(all_keys), "keys": [dict(x) for x in all_keys]}
return return_val
@router.post("/live-update/pitching")
def live_update_pitching(files: BattingFiles, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(
status_code=401, detail="You are not authorized to initiate live updates."
)
return files.dict()

View File

@ -0,0 +1,232 @@
"""Season stats API endpoints.
Covers WP-13 (Post-Game Callback Integration):
POST /api/v2/season-stats/update-game/{game_id}
Aggregates BattingStat and PitchingStat rows for a completed game and
increments the corresponding batting_season_stats / pitching_season_stats
rows via an additive upsert.
"""
import logging
from fastapi import APIRouter, Depends, HTTPException
from ..db_engine import db
from ..dependencies import oauth2_scheme, valid_token
router = APIRouter(prefix="/api/v2/season-stats", tags=["season-stats"])
def _ip_to_outs(ip: float) -> int:
"""Convert innings-pitched float (e.g. 6.1) to integer outs (e.g. 19).
Baseball stores IP as whole.partial where the fractional digit is outs
(0, 1, or 2), not tenths. 6.1 = 6 innings + 1 out = 19 outs.
"""
whole = int(ip)
partial = round((ip - whole) * 10)
return whole * 3 + partial
@router.post("/update-game/{game_id}")
async def update_game_season_stats(game_id: int, token: str = Depends(oauth2_scheme)):
"""Increment season stats with batting and pitching deltas from a game.
Queries BattingStat and PitchingStat rows for game_id, aggregates by
(player_id, team_id, season), then performs an additive ON CONFLICT upsert
into batting_season_stats and pitching_season_stats respectively.
Replaying the same game_id will double-count stats, so callers must ensure
this is only called once per game.
Response: {"updated": N} where N is the number of player rows touched.
"""
if not valid_token(token):
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(status_code=401, detail="Unauthorized")
updated = 0
# --- Batting ---
bat_rows = list(
db.execute_sql(
"""
SELECT c.player_id, bs.team_id, bs.season,
SUM(bs.pa), SUM(bs.ab), SUM(bs.run), SUM(bs.hit),
SUM(bs.double), SUM(bs.triple), SUM(bs.hr), SUM(bs.rbi),
SUM(bs.bb), SUM(bs.so), SUM(bs.hbp), SUM(bs.sac),
SUM(bs.ibb), SUM(bs.gidp), SUM(bs.sb), SUM(bs.cs)
FROM battingstat bs
JOIN card c ON bs.card_id = c.id
WHERE bs.game_id = %s
GROUP BY c.player_id, bs.team_id, bs.season
""",
(game_id,),
)
)
for row in bat_rows:
(
player_id,
team_id,
season,
pa,
ab,
runs,
hits,
doubles,
triples,
hr,
rbi,
bb,
strikeouts,
hbp,
sac,
ibb,
gidp,
sb,
cs,
) = row
db.execute_sql(
"""
INSERT INTO batting_season_stats
(player_id, team_id, season,
pa, ab, runs, hits, doubles, triples, hr, rbi,
bb, strikeouts, hbp, sac, ibb, gidp, sb, cs)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (player_id, team_id, season) DO UPDATE SET
pa = batting_season_stats.pa + EXCLUDED.pa,
ab = batting_season_stats.ab + EXCLUDED.ab,
runs = batting_season_stats.runs + EXCLUDED.runs,
hits = batting_season_stats.hits + EXCLUDED.hits,
doubles = batting_season_stats.doubles + EXCLUDED.doubles,
triples = batting_season_stats.triples + EXCLUDED.triples,
hr = batting_season_stats.hr + EXCLUDED.hr,
rbi = batting_season_stats.rbi + EXCLUDED.rbi,
bb = batting_season_stats.bb + EXCLUDED.bb,
strikeouts= batting_season_stats.strikeouts+ EXCLUDED.strikeouts,
hbp = batting_season_stats.hbp + EXCLUDED.hbp,
sac = batting_season_stats.sac + EXCLUDED.sac,
ibb = batting_season_stats.ibb + EXCLUDED.ibb,
gidp = batting_season_stats.gidp + EXCLUDED.gidp,
sb = batting_season_stats.sb + EXCLUDED.sb,
cs = batting_season_stats.cs + EXCLUDED.cs
""",
(
player_id,
team_id,
season,
pa,
ab,
runs,
hits,
doubles,
triples,
hr,
rbi,
bb,
strikeouts,
hbp,
sac,
ibb,
gidp,
sb,
cs,
),
)
updated += 1
# --- Pitching ---
pit_rows = list(
db.execute_sql(
"""
SELECT c.player_id, ps.team_id, ps.season,
SUM(ps.ip), SUM(ps.so), SUM(ps.hit), SUM(ps.run), SUM(ps.erun),
SUM(ps.bb), SUM(ps.hbp), SUM(ps.wp), SUM(ps.balk), SUM(ps.hr),
SUM(ps.gs), SUM(ps.win), SUM(ps.loss), SUM(ps.hold),
SUM(ps.sv), SUM(ps.bsv)
FROM pitchingstat ps
JOIN card c ON ps.card_id = c.id
WHERE ps.game_id = %s
GROUP BY c.player_id, ps.team_id, ps.season
""",
(game_id,),
)
)
for row in pit_rows:
(
player_id,
team_id,
season,
ip,
strikeouts,
hits_allowed,
runs_allowed,
earned_runs,
bb,
hbp,
wild_pitches,
balks,
hr_allowed,
games_started,
wins,
losses,
holds,
saves,
blown_saves,
) = row
outs = _ip_to_outs(float(ip))
db.execute_sql(
"""
INSERT INTO pitching_season_stats
(player_id, team_id, season,
outs, strikeouts, hits_allowed, runs_allowed, earned_runs,
bb, hbp, wild_pitches, balks, hr_allowed,
games_started, wins, losses, holds, saves, blown_saves)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (player_id, team_id, season) DO UPDATE SET
outs = pitching_season_stats.outs + EXCLUDED.outs,
strikeouts = pitching_season_stats.strikeouts + EXCLUDED.strikeouts,
hits_allowed= pitching_season_stats.hits_allowed+ EXCLUDED.hits_allowed,
runs_allowed= pitching_season_stats.runs_allowed+ EXCLUDED.runs_allowed,
earned_runs = pitching_season_stats.earned_runs + EXCLUDED.earned_runs,
bb = pitching_season_stats.bb + EXCLUDED.bb,
hbp = pitching_season_stats.hbp + EXCLUDED.hbp,
wild_pitches= pitching_season_stats.wild_pitches+ EXCLUDED.wild_pitches,
balks = pitching_season_stats.balks + EXCLUDED.balks,
hr_allowed = pitching_season_stats.hr_allowed + EXCLUDED.hr_allowed,
games_started= pitching_season_stats.games_started+ EXCLUDED.games_started,
wins = pitching_season_stats.wins + EXCLUDED.wins,
losses = pitching_season_stats.losses + EXCLUDED.losses,
holds = pitching_season_stats.holds + EXCLUDED.holds,
saves = pitching_season_stats.saves + EXCLUDED.saves,
blown_saves = pitching_season_stats.blown_saves + EXCLUDED.blown_saves
""",
(
player_id,
team_id,
season,
outs,
strikeouts,
hits_allowed,
runs_allowed,
earned_runs,
bb,
hbp,
wild_pitches,
balks,
hr_allowed,
games_started,
wins,
losses,
holds,
saves,
blown_saves,
),
)
updated += 1
logging.info(f"update-game/{game_id}: updated {updated} season stats rows")
return {"updated": updated}

View File

@ -617,8 +617,9 @@ def sort_pitchers(pitching_card_query) -> DataFrame | None:
return float("inf")
ops_vl = vlval.obp + vlval.slg
ops_vr = vrval.obp + vrval.slg
# TODO: should this be max??
return (ops_vr + ops_vl + min(ops_vl, ops_vr)) / 3
# Weight the weaker split (higher OPS allowed) so platoon weaknesses are penalized.
# Starters face both LHH and RHH, so vulnerability against either hand matters.
return (ops_vr + ops_vl + max(ops_vl, ops_vr)) / 3
pitcher_df["total_ops"] = pitcher_df.apply(get_total_ops, axis=1)
return pitcher_df.sort_values(by="total_ops")
@ -699,7 +700,8 @@ async def get_team_sp(
return float("inf")
ops_vl = vlval.obp + vlval.slg
ops_vr = vrval.obp + vrval.slg
return (ops_vr + ops_vl + min(ops_vl, ops_vr)) / 3
# Weight the weaker split (higher OPS allowed) so platoon weaknesses are penalized.
return (ops_vr + ops_vl + max(ops_vl, ops_vr)) / 3
starter_df["total_ops"] = starter_df.apply(get_total_ops, axis=1)
return starter_df.sort_values(by="total_ops")

0
app/seed/__init__.py Normal file
View File

View File

@ -0,0 +1,5 @@
[
{"name": "Batter", "card_type": "batter", "formula": "pa+tb*2", "t1": 37, "t2": 149, "t3": 448, "t4": 896},
{"name": "Starting Pitcher", "card_type": "sp", "formula": "ip+k", "t1": 10, "t2": 40, "t3": 120, "t4": 240},
{"name": "Relief Pitcher", "card_type": "rp", "formula": "ip+k", "t1": 3, "t2": 12, "t3": 35, "t4": 70}
]

View File

@ -0,0 +1,41 @@
"""Seed data fixture for EvolutionTrack.
Inserts the three universal evolution tracks (Batter, Starting Pitcher,
Relief Pitcher) if they do not already exist. Safe to call multiple times
thanks to get_or_create. Depends on WP-01 (EvolutionTrack model) to run.
"""
import json
import os
_JSON_PATH = os.path.join(os.path.dirname(__file__), "evolution_tracks.json")
def load_tracks():
    """Return the locked list of evolution track dicts from the JSON fixture.

    The fixture file lives next to this module (see ``_JSON_PATH``).
    """
    # Explicit encoding: JSON is defined as UTF-8, and the platform default
    # text encoding is not guaranteed to be UTF-8 (e.g. Windows cp1252).
    with open(_JSON_PATH, encoding="utf-8") as fh:
        return json.load(fh)
def seed(model_class=None):
    """Insert any evolution tracks that are missing from the database.

    Idempotent: rows are created via ``get_or_create`` keyed on
    ``card_type``, so repeated calls do not duplicate tracks.

    Args:
        model_class: Peewee model with get_or_create support. Defaults to
            ``app.db_engine.EvolutionTrack`` (imported lazily so this module
            can be imported before WP-01 lands).

    Returns:
        List of (instance, created) tuples from get_or_create.
    """
    if model_class is None:
        from app.db_engine import EvolutionTrack as model_class  # noqa: PLC0415
    return [
        model_class.get_or_create(card_type=fixture["card_type"], defaults=fixture)
        for fixture in load_tracks()
    ]

0
app/services/__init__.py Normal file
View File

View File

@ -0,0 +1,109 @@
"""Formula engine for evolution value computation (WP-09).
Three pure functions that compute a numeric evolution value from career stats,
plus helpers for formula dispatch and tier classification.
Stats attributes expected by each formula:
compute_batter_value: pa, hits, doubles, triples, hr (from BattingSeasonStats)
compute_sp_value: outs, strikeouts (from PitchingSeasonStats)
compute_rp_value: outs, strikeouts (from PitchingSeasonStats)
"""
from typing import Protocol
class BatterStats(Protocol):
    """Structural type for the batter stats consumed by compute_batter_value.

    Any object exposing these integer counting-stat attributes qualifies
    (e.g. a BattingSeasonStats row).
    """

    pa: int  # plate appearances
    hits: int
    doubles: int
    triples: int
    hr: int  # home runs
class PitcherStats(Protocol):
    """Structural type for the pitcher stats consumed by the SP/RP formulas.

    Any object exposing these integer attributes qualifies
    (e.g. a PitchingSeasonStats row).
    """

    outs: int  # innings pitched stored as total outs (3 per inning)
    strikeouts: int
# ---------------------------------------------------------------------------
# Core formula functions
# ---------------------------------------------------------------------------
def compute_batter_value(stats) -> float:
    """Batter evolution value: PA + 2 x total bases.

    Total bases weights each hit type (1B x1, 2B x2, 3B x3, HR x4);
    singles are derived as hits minus extra-base hits.
    """
    extra_base_hits = stats.doubles + stats.triples + stats.hr
    singles = stats.hits - extra_base_hits
    total_bases = singles + 2 * stats.doubles + 3 * stats.triples + 4 * stats.hr
    return float(stats.pa + 2 * total_bases)
def _pitcher_value(stats) -> float:
return stats.outs / 3 + stats.strikeouts
def compute_sp_value(stats) -> float:
"""IP + K where IP = outs / 3."""
return _pitcher_value(stats)
def compute_rp_value(stats) -> float:
"""IP + K (same formula as SP; thresholds differ)."""
return _pitcher_value(stats)
# ---------------------------------------------------------------------------
# Dispatch and tier helpers
# ---------------------------------------------------------------------------
# Card type -> formula function. A dict dispatch keeps the mapping data-driven.
_FORMULA_DISPATCH = {
    "batter": compute_batter_value,
    "sp": compute_sp_value,
    "rp": compute_rp_value,
}


def compute_value_for_track(card_type: str, stats) -> float:
    """Compute the evolution value for *stats* using the card-type formula.

    Args:
        card_type: One of 'batter', 'sp', 'rp'.
        stats: Object with the attributes required by the chosen formula.

    Raises:
        ValueError: If card_type is not recognised.
    """
    try:
        formula = _FORMULA_DISPATCH[card_type]
    except KeyError:
        raise ValueError(f"Unknown card_type: {card_type!r}") from None
    return formula(stats)
def tier_from_value(value: float, track) -> int:
    """Return the evolution tier (0-4) for a computed value against a track.

    Tier boundaries are inclusive on the lower end:
        T0: value < t1
        T1: t1 <= value < t2
        T2: t2 <= value < t3
        T3: t3 <= value < t4
        T4: value >= t4

    Args:
        value: Computed formula value.
        track: Object (or dict-like) with t1, t2, t3, t4 attributes/keys.
    """
    # Support both attribute-style (Peewee model) and dict (seed fixture).
    if isinstance(track, dict):
        t1, t2, t3, t4 = track["t1"], track["t2"], track["t3"], track["t4"]
    else:
        t1, t2, t3, t4 = track.t1, track.t2, track.t3, track.t4
    # Check from the top tier down; first threshold met wins.
    for tier, cutoff in ((4, t4), (3, t3), (2, t2), (1, t1)):
        if value >= cutoff:
            return tier
    return 0

5
pyproject.toml Normal file
View File

@ -0,0 +1,5 @@
[tool.ruff]
[tool.ruff.lint]
# db_engine.py uses `from peewee import *` throughout — a pre-existing
# codebase pattern. Suppress wildcard-import warnings for that file only.
per-file-ignores = { "app/db_engine.py" = ["F401", "F403", "F405"], "app/main.py" = ["E402", "F541"] }

View File

@ -1,15 +1,14 @@
pydantic==1.*
fastapi
uvicorn
peewee
psycopg2-binary # PostgreSQL adapter for Python
python-multipart
numpy<2
pandas
pygsheets
pybaseball
python-multipart
requests
html2image
jinja2
playwright
pydantic==1.10.21
fastapi==0.111.1
uvicorn==0.30.6
peewee==3.17.9
psycopg2-binary==2.9.9
python-multipart==0.0.9
numpy==1.26.4
pandas==2.2.3
pygsheets==2.0.6
pybaseball==2.2.7
requests==2.32.3
html2image==2.0.6
jinja2==3.1.4
playwright==1.45.1

0
tests/__init__.py Normal file
View File

14
tests/conftest.py Normal file
View File

@ -0,0 +1,14 @@
"""Pytest configuration for the paper-dynasty-database test suite.
Sets DATABASE_TYPE=postgresql before any app module is imported so that
db_engine.py sets SKIP_TABLE_CREATION=True and does not try to mutate the
production SQLite file during test collection. Each test module is
responsible for binding models to its own in-memory database.
"""
import os
os.environ["DATABASE_TYPE"] = "postgresql"
# Provide dummy credentials so PooledPostgresqlDatabase can be instantiated
# without raising a configuration error (it will not actually be used).
os.environ.setdefault("POSTGRES_PASSWORD", "test-dummy")

View File

@ -0,0 +1,119 @@
"""Tests for the evolution track seed data fixture (WP-03).
Unit tests verify the JSON fixture is correctly formed without touching any
database. The integration test binds a minimal in-memory EvolutionTrack
model (mirroring the schema WP-01 will add to db_engine) to an in-memory
SQLite database, calls seed(), and verifies idempotency.
"""
import pytest
from peewee import CharField, IntegerField, Model, SqliteDatabase
from app.seed.evolution_tracks import load_tracks, seed
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
# Shared in-memory database for this module; the _db fixture below creates
# and drops the stub table around each test.
_test_db = SqliteDatabase(":memory:")
class EvolutionTrackStub(Model):
    """Minimal EvolutionTrack model for integration tests.

    Mirrors the schema that WP-01 will add to db_engine so the integration
    test can run without WP-01 being merged.
    """

    # Human-readable track name (e.g. "Batter").
    name = CharField()
    # One track per card type; uniqueness is what makes seed()'s
    # get_or_create idempotent (see test_seed_is_idempotent).
    card_type = CharField(unique=True)
    # Formula expression string (e.g. "pa+tb*2") evaluated by the engine.
    formula = CharField()
    # Ascending tier thresholds t1 < t2 < t3 < t4 (verified by
    # test_all_thresholds_positive_and_ascending).
    t1 = IntegerField()
    t2 = IntegerField()
    t3 = IntegerField()
    t4 = IntegerField()

    class Meta:
        # Bind to the module-level in-memory database defined above.
        database = _test_db
        table_name = "evolution_track"
@pytest.fixture(autouse=True)
def _db():
    """Bind and create the stub table; drop it after each test."""
    # reuse_if_open avoids an error when the shared in-memory connection is
    # already open from a previous test in this module.
    _test_db.connect(reuse_if_open=True)
    _test_db.create_tables([EvolutionTrackStub])
    yield
    # Teardown: drop the table so every test starts from an empty schema.
    _test_db.drop_tables([EvolutionTrackStub])
# ---------------------------------------------------------------------------
# Unit tests — JSON fixture only, no database
# ---------------------------------------------------------------------------
def test_three_tracks_in_seed_data():
    """load_tracks() must return exactly 3 evolution tracks."""
    tracks = load_tracks()
    assert len(tracks) == 3


def test_card_types_are_exactly_batter_sp_rp():
    """The set of card_type values must be exactly {'batter', 'sp', 'rp'}."""
    observed = {track["card_type"] for track in load_tracks()}
    assert observed == {"batter", "sp", "rp"}


def test_all_thresholds_positive_and_ascending():
    """Each track must have t1 < t2 < t3 < t4, all positive."""
    for track in load_tracks():
        t1, t2, t3, t4 = track["t1"], track["t2"], track["t3"], track["t4"]
        assert t1 > 0
        assert t1 < t2 < t3 < t4


def test_all_tracks_have_non_empty_formula():
    """Every track must have a non-empty formula string."""
    for track in load_tracks():
        formula = track["formula"]
        assert isinstance(formula, str)
        assert formula.strip()


def test_tier_thresholds_match_locked_values():
    """Threshold values must exactly match the locked design spec."""
    # Locked thresholds from the WP-03 design spec, keyed by card_type.
    expected = {
        "batter": (37, 149, 448, 896),
        "sp": (10, 40, 120, 240),
        "rp": (3, 12, 35, 70),
    }
    by_type = {track["card_type"]: track for track in load_tracks()}
    for card_type, thresholds in expected.items():
        track = by_type[card_type]
        observed = (track["t1"], track["t2"], track["t3"], track["t4"])
        assert observed == thresholds
# ---------------------------------------------------------------------------
# Integration test — uses the stub model + in-memory SQLite
# ---------------------------------------------------------------------------
def test_seed_is_idempotent():
    """Calling seed() twice must not create duplicate rows (get_or_create).

    First call: all three tracks created (created=True for each).
    Second call: all three already exist (created=False for each).
    Both calls succeed without error.
    """
    first_pass = seed(model_class=EvolutionTrackStub)
    assert len(first_pass) == 3
    assert all(created for _, created in first_pass)

    second_pass = seed(model_class=EvolutionTrackStub)
    assert len(second_pass) == 3
    assert not any(created for _, created in second_pass)

    # get_or_create must leave exactly the three seeded rows behind.
    assert EvolutionTrackStub.select().count() == 3

View File

@ -0,0 +1,132 @@
"""Integration tests for the evolution track catalog API endpoints (WP-06).
Tests cover:
GET /api/v2/evolution/tracks
GET /api/v2/evolution/tracks/{track_id}
All tests require a live PostgreSQL connection (POSTGRES_HOST env var) and
assume the evolution schema migration (WP-04) has already been applied.
Tests auto-skip when POSTGRES_HOST is not set.
Test data is inserted via psycopg2 before the test module runs and deleted
afterwards so the tests are repeatable. ON CONFLICT keeps the table clean
even if a previous run did not complete teardown.
"""
import os
import pytest
from fastapi.testclient import TestClient
# Integration tests run only when a live database is reachable.
POSTGRES_HOST = os.environ.get("POSTGRES_HOST")
_skip_no_pg = pytest.mark.skipif(
    not POSTGRES_HOST, reason="POSTGRES_HOST not set — integration tests skipped"
)
# Bearer token sent with authorized requests; API_TOKEN from the environment
# overrides the 'test-token' default.
AUTH_HEADER = {"Authorization": f"Bearer {os.environ.get('API_TOKEN', 'test-token')}"}
# (name, card_type, formula, t1, t2, t3, t4) rows upserted by seeded_tracks.
_SEED_TRACKS = [
    ("Batter", "batter", "pa+tb*2", 37, 149, 448, 896),
    ("Starting Pitcher", "sp", "ip+k", 10, 40, 120, 240),
    ("Relief Pitcher", "rp", "ip+k", 3, 12, 35, 70),
]
@pytest.fixture(scope="module")
def seeded_tracks(pg_conn):
"""Insert three canonical evolution tracks; remove them after the module.
Uses ON CONFLICT DO UPDATE so the fixture is safe to run even if rows
already exist from a prior test run that did not clean up. Returns the
list of row IDs that were upserted.
"""
cur = pg_conn.cursor()
ids = []
for name, card_type, formula, t1, t2, t3, t4 in _SEED_TRACKS:
cur.execute(
"""
INSERT INTO evolution_track
(name, card_type, formula, t1_threshold, t2_threshold, t3_threshold, t4_threshold)
VALUES (%s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (card_type) DO UPDATE SET
name = EXCLUDED.name,
formula = EXCLUDED.formula,
t1_threshold = EXCLUDED.t1_threshold,
t2_threshold = EXCLUDED.t2_threshold,
t3_threshold = EXCLUDED.t3_threshold,
t4_threshold = EXCLUDED.t4_threshold
RETURNING id
""",
(name, card_type, formula, t1, t2, t3, t4),
)
ids.append(cur.fetchone()[0])
pg_conn.commit()
yield ids
cur.execute("DELETE FROM evolution_track WHERE id = ANY(%s)", (ids,))
pg_conn.commit()
@pytest.fixture(scope="module")
def client():
"""FastAPI TestClient backed by the real PostgreSQL database."""
from app.main import app
with TestClient(app) as c:
yield c
@_skip_no_pg
def test_list_tracks_returns_count_3(client, seeded_tracks):
    """GET /tracks returns all three tracks with count=3.

    After seeding batter/sp/rp, the table should have exactly those three
    rows (no other tracks are inserted by other test modules).
    """
    response = client.get("/api/v2/evolution/tracks", headers=AUTH_HEADER)
    assert response.status_code == 200
    payload = response.json()
    assert payload["count"] == 3
    assert len(payload["items"]) == 3


@_skip_no_pg
def test_filter_by_card_type(client, seeded_tracks):
    """card_type=sp filter returns exactly 1 track with card_type 'sp'."""
    response = client.get("/api/v2/evolution/tracks?card_type=sp", headers=AUTH_HEADER)
    assert response.status_code == 200
    payload = response.json()
    assert payload["count"] == 1
    assert payload["items"][0]["card_type"] == "sp"


@_skip_no_pg
def test_get_single_track_with_thresholds(client, seeded_tracks):
    """GET /tracks/{id} returns a track dict with formula and t1-t4 thresholds."""
    track_id = seeded_tracks[0]  # batter row is seeded first
    response = client.get(f"/api/v2/evolution/tracks/{track_id}", headers=AUTH_HEADER)
    assert response.status_code == 200
    payload = response.json()
    assert payload["card_type"] == "batter"
    assert payload["formula"] == "pa+tb*2"
    for key in ("t1_threshold", "t2_threshold", "t3_threshold", "t4_threshold"):
        assert key in payload, f"Missing field: {key}"
    assert payload["t1_threshold"] == 37
    assert payload["t4_threshold"] == 896


@_skip_no_pg
def test_404_for_nonexistent_track(client, seeded_tracks):
    """GET /tracks/999999 returns 404 when the track does not exist."""
    response = client.get("/api/v2/evolution/tracks/999999", headers=AUTH_HEADER)
    assert response.status_code == 404


@_skip_no_pg
def test_auth_required(client, seeded_tracks):
    """Requests without a Bearer token return 401 for both endpoints."""
    list_response = client.get("/api/v2/evolution/tracks")
    assert list_response.status_code == 401

    track_id = seeded_tracks[0]
    single_response = client.get(f"/api/v2/evolution/tracks/{track_id}")
    assert single_response.status_code == 401

View File

@ -0,0 +1,188 @@
"""Tests for the formula engine (WP-09).
Unit tests only — no database required. Stats inputs are simple namespace
objects whose attributes match what BattingSeasonStats/PitchingSeasonStats expose.
Tier thresholds used (from evolution_tracks.json seed data):
Batter: t1=37, t2=149, t3=448, t4=896
SP: t1=10, t2=40, t3=120, t4=240
RP: t1=3, t2=12, t3=35, t4=70
"""
from types import SimpleNamespace
import pytest
from app.services.formula_engine import (
compute_batter_value,
compute_rp_value,
compute_sp_value,
compute_value_for_track,
tier_from_value,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def batter_stats(**kwargs):
    """Build a minimal batter stats object with all fields defaulting to 0."""
    base = dict.fromkeys(("pa", "hits", "doubles", "triples", "hr"), 0)
    return SimpleNamespace(**{**base, **kwargs})


def pitcher_stats(**kwargs):
    """Build a minimal pitcher stats object with all fields defaulting to 0."""
    base = dict.fromkeys(("outs", "strikeouts"), 0)
    return SimpleNamespace(**{**base, **kwargs})


def track_dict(card_type: str) -> dict:
    """Return the locked threshold dict for a given card_type."""
    # (t1, t2, t3, t4) per card type, from evolution_tracks.json seed data.
    thresholds = {
        "batter": (37, 149, 448, 896),
        "sp": (10, 40, 120, 240),
        "rp": (3, 12, 35, 70),
    }
    t1, t2, t3, t4 = thresholds[card_type]
    return {"card_type": card_type, "t1": t1, "t2": t2, "t3": t3, "t4": t4}


def track_ns(card_type: str):
    """Return a namespace (attribute-style) track for a given card_type."""
    return SimpleNamespace(**track_dict(card_type))
# ---------------------------------------------------------------------------
# compute_batter_value
# ---------------------------------------------------------------------------
def test_batter_formula_single_and_double():
    """4 PA, 1 single, 1 double: PA=4, TB=1+2=3, value = 4 + 3×2 = 10."""
    assert compute_batter_value(batter_stats(pa=4, hits=2, doubles=1)) == 10.0


def test_batter_formula_no_hits():
    """4 PA, 0 hits: TB=0, value = 4 + 0 = 4."""
    assert compute_batter_value(batter_stats(pa=4)) == 4.0


def test_batter_formula_hr_heavy():
    """4 PA, 2 HR: TB = 0 singles + 4×2 = 8, value = 4 + 8×2 = 20."""
    assert compute_batter_value(batter_stats(pa=4, hits=2, hr=2)) == 20.0


# ---------------------------------------------------------------------------
# compute_sp_value / compute_rp_value
# ---------------------------------------------------------------------------
def test_sp_formula_standard():
    """18 outs + 5 K: IP = 18/3 = 6.0, value = 6.0 + 5 = 11.0."""
    assert compute_sp_value(pitcher_stats(outs=18, strikeouts=5)) == 11.0


def test_rp_formula_standard():
    """3 outs + 2 K: IP = 3/3 = 1.0, value = 1.0 + 2 = 3.0."""
    assert compute_rp_value(pitcher_stats(outs=3, strikeouts=2)) == 3.0


# ---------------------------------------------------------------------------
# Zero stats
# ---------------------------------------------------------------------------
def test_batter_zero_stats_returns_zero():
    """All-zero batter stats must return 0.0."""
    assert compute_batter_value(batter_stats()) == 0.0


def test_sp_zero_stats_returns_zero():
    """All-zero SP stats must return 0.0."""
    assert compute_sp_value(pitcher_stats()) == 0.0


def test_rp_zero_stats_returns_zero():
    """All-zero RP stats must return 0.0."""
    assert compute_rp_value(pitcher_stats()) == 0.0
# ---------------------------------------------------------------------------
# Formula dispatch by track name
# ---------------------------------------------------------------------------
def test_dispatch_batter():
    """compute_value_for_track('batter', ...) delegates to compute_batter_value."""
    stats = batter_stats(pa=4, hits=2, doubles=1)
    expected = compute_batter_value(stats)
    assert compute_value_for_track("batter", stats) == expected


def test_dispatch_sp():
    """compute_value_for_track('sp', ...) delegates to compute_sp_value."""
    stats = pitcher_stats(outs=18, strikeouts=5)
    expected = compute_sp_value(stats)
    assert compute_value_for_track("sp", stats) == expected


def test_dispatch_rp():
    """compute_value_for_track('rp', ...) delegates to compute_rp_value."""
    stats = pitcher_stats(outs=3, strikeouts=2)
    expected = compute_rp_value(stats)
    assert compute_value_for_track("rp", stats) == expected


def test_dispatch_unknown_raises():
    """An unrecognised card_type must raise ValueError."""
    with pytest.raises(ValueError, match="Unknown card_type"):
        compute_value_for_track("dh", batter_stats())
# ---------------------------------------------------------------------------
# tier_from_value — batter thresholds (t1=37, t2=149, t3=448, t4=896)
# ---------------------------------------------------------------------------
def test_tier_exact_t1_boundary():
    """value=37 is exactly t1 for batter → T1."""
    batter_track = track_dict("batter")
    assert tier_from_value(37, batter_track) == 1


def test_tier_just_below_t1():
    """value=36 is just below t1=37 for batter → T0."""
    batter_track = track_dict("batter")
    assert tier_from_value(36, batter_track) == 0


def test_tier_t4_boundary():
    """value=896 is exactly t4 for batter → T4."""
    batter_track = track_dict("batter")
    assert tier_from_value(896, batter_track) == 4


def test_tier_above_t4():
    """value above t4 still returns T4 (fully evolved)."""
    batter_track = track_dict("batter")
    assert tier_from_value(1000, batter_track) == 4


def test_tier_t2_boundary():
    """value=149 is exactly t2 for batter → T2."""
    batter_track = track_dict("batter")
    assert tier_from_value(149, batter_track) == 2


def test_tier_t3_boundary():
    """value=448 is exactly t3 for batter → T3."""
    batter_track = track_dict("batter")
    assert tier_from_value(448, batter_track) == 3


def test_tier_accepts_namespace_track():
    """tier_from_value must work with attribute-style track objects (Peewee models)."""
    assert tier_from_value(37, track_ns("batter")) == 1

View File

@ -0,0 +1,451 @@
"""Tests for BattingSeasonStats and PitchingSeasonStats Peewee models.
Unit tests verify model structure and defaults on unsaved instances without
touching a database. Integration tests use an in-memory SQLite database to
verify table creation, unique constraints, indexes, and the delta-update
(increment) pattern.
"""
import pytest
from peewee import SqliteDatabase, IntegrityError
from app.models.season_stats import BattingSeasonStats, PitchingSeasonStats
from app.db_engine import Rarity, Event, Cardset, MlbPlayer, Player, Team, StratGame
# Dependency order matters for FK resolution.
# Parents come first so create_tables() can resolve every ForeignKeyField
# target before the dependent table is created.
_TEST_MODELS = [
    Rarity,
    Event,
    Cardset,
    MlbPlayer,
    Player,
    Team,
    StratGame,
    BattingSeasonStats,
    PitchingSeasonStats,
]

# In-memory database with FK enforcement enabled, so the constraint tests
# below exercise SQLite's actual checks rather than passing silently.
_test_db = SqliteDatabase(":memory:", pragmas={"foreign_keys": 1})
@pytest.fixture(autouse=True)
def setup_test_db():
    """Bind all models to an in-memory SQLite database, create tables, and
    tear them down after each test so each test starts from a clean state."""
    _test_db.bind(_TEST_MODELS)
    _test_db.create_tables(_TEST_MODELS)
    # Yield the database handle so tests (e.g. the index tests) can run raw
    # PRAGMA queries against it.
    yield _test_db
    # Drop children before parents so FK references never dangle mid-teardown.
    _test_db.drop_tables(list(reversed(_TEST_MODELS)), safe=True)
# ── Fixture helpers ─────────────────────────────────────────────────────────
def make_rarity():
    """Create the single Rarity row the player factory depends on."""
    return Rarity.create(value=1, name="Common", color="#ffffff")


def make_cardset():
    """Create a Cardset row for the test player."""
    return Cardset.create(name="2025", description="2025 Season", total_cards=100)


def make_player(cardset, rarity, player_id=1):
    """Create a Player row; override player_id when two players are needed."""
    attrs = dict(
        player_id=player_id,
        p_name="Test Player",
        cost=100,
        image="test.png",
        mlbclub="BOS",
        franchise="Boston",
        cardset=cardset,
        set_num=1,
        rarity=rarity,
        pos_1="OF",
        description="Test",
    )
    return Player.create(**attrs)


def make_team(abbrev="TEST", gmid=123456789):
    """Create a Team row; callers override abbrev/gmid for distinct teams."""
    attrs = dict(
        abbrev=abbrev,
        sname=abbrev,
        lname=f"Team {abbrev}",
        gmid=gmid,
        gmname="testuser",
        gsheet="https://example.com",
        wallet=1000,
        team_value=1000,
        collection_value=1000,
        season=1,
    )
    return Team.create(**attrs)


def make_game(home_team, away_team, season=10):
    """Create a ranked StratGame between the two given teams."""
    return StratGame.create(
        season=season,
        game_type="ranked",
        away_team=away_team,
        home_team=home_team,
    )


def make_batting_stats(player, team, season=10, **kwargs):
    """Create a BattingSeasonStats row; stat columns are passed via kwargs."""
    return BattingSeasonStats.create(player=player, team=team, season=season, **kwargs)


def make_pitching_stats(player, team, season=10, **kwargs):
    """Create a PitchingSeasonStats row; stat columns are passed via kwargs."""
    return PitchingSeasonStats.create(player=player, team=team, season=season, **kwargs)
# ── Shared column-list constants ─────────────────────────────────────────────
# Aggregate batting columns every BattingSeasonStats row must expose.
_BATTING_STAT_COLS = [
    "games",
    "pa",
    "ab",
    "hits",
    "doubles",
    "triples",
    "hr",
    "rbi",
    "runs",
    "bb",
    "strikeouts",
    "hbp",
    "sac",
    "ibb",
    "gidp",
    "sb",
    "cs",
]
# Aggregate pitching columns every PitchingSeasonStats row must expose.
_PITCHING_STAT_COLS = [
    "games",
    "games_started",
    "outs",
    "strikeouts",
    "bb",
    "hits_allowed",
    "runs_allowed",
    "earned_runs",
    "hr_allowed",
    "hbp",
    "wild_pitches",
    "balks",
    "wins",
    "losses",
    "holds",
    "saves",
    "blown_saves",
]
# Identity triple that uniquely keys a season-stats row (see the
# unique-constraint tests below).
_KEY_COLS = ["player", "team", "season"]
# Bookkeeping columns updated alongside the aggregates.
_META_COLS = ["last_game", "last_updated_at"]
# ── Shared index helper ───────────────────────────────────────────────────────
def _get_index_columns(db_conn, table: str) -> set:
"""Return a set of frozensets, each being the column set of one index."""
indexes = db_conn.execute_sql(f"PRAGMA index_list({table})").fetchall()
result = set()
for idx in indexes:
idx_name = idx[1]
cols = db_conn.execute_sql(f"PRAGMA index_info({idx_name})").fetchall()
result.add(frozenset(col[2] for col in cols))
return result
# ── Unit: column completeness ────────────────────────────────────────────────
class TestBattingColumnCompleteness:
    """All required columns are present in BattingSeasonStats."""

    # Class-level aliases keep the method bodies parallel with the pitching
    # twin class, which uses the same structure with different column lists.
    EXPECTED_COLS = _BATTING_STAT_COLS
    KEY_COLS = _KEY_COLS
    META_COLS = _META_COLS

    def test_stat_columns_present(self):
        """All batting aggregate columns are present."""
        # Peewee maps declared column name -> Field in Model._meta.fields.
        fields = BattingSeasonStats._meta.fields
        for col in self.EXPECTED_COLS:
            assert col in fields, f"Missing batting column: {col}"

    def test_key_columns_present(self):
        """player, team, and season columns are present."""
        fields = BattingSeasonStats._meta.fields
        for col in self.KEY_COLS:
            assert col in fields, f"Missing key column: {col}"

    def test_meta_columns_present(self):
        """Meta columns last_game and last_updated_at are present."""
        fields = BattingSeasonStats._meta.fields
        for col in self.META_COLS:
            assert col in fields, f"Missing meta column: {col}"
class TestPitchingColumnCompleteness:
    """All required columns are present in PitchingSeasonStats."""

    # Same structure as the batting twin class, with pitching column lists.
    EXPECTED_COLS = _PITCHING_STAT_COLS
    KEY_COLS = _KEY_COLS
    META_COLS = _META_COLS

    def test_stat_columns_present(self):
        """All pitching aggregate columns are present."""
        # Peewee maps declared column name -> Field in Model._meta.fields.
        fields = PitchingSeasonStats._meta.fields
        for col in self.EXPECTED_COLS:
            assert col in fields, f"Missing pitching column: {col}"

    def test_key_columns_present(self):
        """player, team, and season columns are present."""
        fields = PitchingSeasonStats._meta.fields
        for col in self.KEY_COLS:
            assert col in fields, f"Missing key column: {col}"

    def test_meta_columns_present(self):
        """Meta columns last_game and last_updated_at are present."""
        fields = PitchingSeasonStats._meta.fields
        for col in self.META_COLS:
            assert col in fields, f"Missing meta column: {col}"
# ── Unit: default values ─────────────────────────────────────────────────────
class TestBattingDefaultValues:
    """All integer stat columns default to 0; nullable meta fields default to None."""

    INT_STAT_COLS = _BATTING_STAT_COLS

    def test_all_int_columns_default_to_zero(self):
        """Every integer stat column defaults to 0 on an unsaved instance."""
        # Instantiating without save() exercises only Peewee field defaults,
        # so no database is touched by this test.
        row = BattingSeasonStats()
        for col in self.INT_STAT_COLS:
            val = getattr(row, col)
            assert val == 0, f"Column {col!r} default is {val!r}, expected 0"

    def test_last_game_defaults_to_none(self):
        """last_game FK is nullable and defaults to None."""
        row = BattingSeasonStats()
        # Peewee exposes the raw FK column as <field>_id.
        assert row.last_game_id is None

    def test_last_updated_at_defaults_to_none(self):
        """last_updated_at defaults to None."""
        row = BattingSeasonStats()
        assert row.last_updated_at is None
class TestPitchingDefaultValues:
    """All integer stat columns default to 0; nullable meta fields default to None."""

    INT_STAT_COLS = _PITCHING_STAT_COLS

    def test_all_int_columns_default_to_zero(self):
        """Every integer stat column defaults to 0 on an unsaved instance."""
        # Instantiating without save() exercises only Peewee field defaults.
        row = PitchingSeasonStats()
        for col in self.INT_STAT_COLS:
            val = getattr(row, col)
            assert val == 0, f"Column {col!r} default is {val!r}, expected 0"

    def test_last_game_defaults_to_none(self):
        """last_game FK is nullable and defaults to None."""
        row = PitchingSeasonStats()
        # Peewee exposes the raw FK column as <field>_id.
        assert row.last_game_id is None

    def test_last_updated_at_defaults_to_none(self):
        """last_updated_at defaults to None."""
        row = PitchingSeasonStats()
        assert row.last_updated_at is None
# ── Integration: unique constraint ───────────────────────────────────────────
class TestBattingUniqueConstraint:
    """UNIQUE on (player_id, team_id, season) is enforced at the DB level."""

    def test_duplicate_raises(self):
        """Inserting a second row for the same (player, team, season) raises IntegrityError."""
        rarity, cardset = make_rarity(), make_cardset()
        player, team = make_player(cardset, rarity), make_team()
        # First insert succeeds; the identical key triple is then rejected.
        make_batting_stats(player, team, season=10)
        with pytest.raises(IntegrityError):
            make_batting_stats(player, team, season=10)

    def test_different_season_allowed(self):
        """Same (player, team) in a different season creates a separate row."""
        rarity, cardset = make_rarity(), make_cardset()
        player, team = make_player(cardset, rarity), make_team()
        make_batting_stats(player, team, season=10)
        second_row = make_batting_stats(player, team, season=11)
        assert second_row.id is not None

    def test_different_team_allowed(self):
        """Same (player, season) on a different team creates a separate row."""
        rarity, cardset = make_rarity(), make_cardset()
        player = make_player(cardset, rarity)
        first_team = make_team("TM1", gmid=111)
        second_team = make_team("TM2", gmid=222)
        make_batting_stats(player, first_team, season=10)
        second_row = make_batting_stats(player, second_team, season=10)
        assert second_row.id is not None
class TestPitchingUniqueConstraint:
    """UNIQUE on (player_id, team_id, season) is enforced at the DB level."""

    def test_duplicate_raises(self):
        """Inserting a second row for the same (player, team, season) raises IntegrityError."""
        rarity, cardset = make_rarity(), make_cardset()
        player, team = make_player(cardset, rarity), make_team()
        # First insert succeeds; the identical key triple is then rejected.
        make_pitching_stats(player, team, season=10)
        with pytest.raises(IntegrityError):
            make_pitching_stats(player, team, season=10)

    def test_different_season_allowed(self):
        """Same (player, team) in a different season creates a separate row."""
        rarity, cardset = make_rarity(), make_cardset()
        player, team = make_player(cardset, rarity), make_team()
        make_pitching_stats(player, team, season=10)
        second_row = make_pitching_stats(player, team, season=11)
        assert second_row.id is not None
# ── Integration: delta update pattern ───────────────────────────────────────
class TestBattingDeltaUpdate:
    """Batting stats can be incremented (delta update) without replacing existing values."""

    def test_increment_batting_stats(self):
        """Updating pa and hits increments correctly."""
        rarity, cardset = make_rarity(), make_cardset()
        player, team = make_player(cardset, rarity), make_team()
        row = make_batting_stats(player, team, season=10, pa=5, hits=2)

        # Delta update: add to the stored values rather than overwriting.
        key_match = (
            (BattingSeasonStats.player == player)
            & (BattingSeasonStats.team == team)
            & (BattingSeasonStats.season == 10)
        )
        BattingSeasonStats.update(
            pa=BattingSeasonStats.pa + 3,
            hits=BattingSeasonStats.hits + 1,
        ).where(key_match).execute()

        refreshed = BattingSeasonStats.get_by_id(row.id)
        assert refreshed.pa == 8
        assert refreshed.hits == 3

    def test_last_game_fk_is_nullable(self):
        """last_game FK can be set to a StratGame instance or left NULL."""
        rarity, cardset = make_rarity(), make_cardset()
        player, team = make_player(cardset, rarity), make_team()
        row = make_batting_stats(player, team, season=10)
        assert row.last_game_id is None

        game = make_game(home_team=team, away_team=team)
        query = BattingSeasonStats.update(last_game=game).where(
            BattingSeasonStats.id == row.id
        )
        query.execute()
        refreshed = BattingSeasonStats.get_by_id(row.id)
        assert refreshed.last_game_id == game.id
class TestPitchingDeltaUpdate:
    """Pitching stats can be incremented (delta update) without replacing existing values."""

    def test_increment_pitching_stats(self):
        """Updating outs and strikeouts increments correctly."""
        rarity, cardset = make_rarity(), make_cardset()
        player, team = make_player(cardset, rarity), make_team()
        row = make_pitching_stats(player, team, season=10, outs=9, strikeouts=3)

        # Delta update: add to the stored values rather than overwriting.
        key_match = (
            (PitchingSeasonStats.player == player)
            & (PitchingSeasonStats.team == team)
            & (PitchingSeasonStats.season == 10)
        )
        PitchingSeasonStats.update(
            outs=PitchingSeasonStats.outs + 6,
            strikeouts=PitchingSeasonStats.strikeouts + 2,
        ).where(key_match).execute()

        refreshed = PitchingSeasonStats.get_by_id(row.id)
        assert refreshed.outs == 15
        assert refreshed.strikeouts == 5

    def test_last_game_fk_is_nullable(self):
        """last_game FK can be set to a StratGame instance or left NULL."""
        rarity, cardset = make_rarity(), make_cardset()
        player, team = make_player(cardset, rarity), make_team()
        row = make_pitching_stats(player, team, season=10)
        assert row.last_game_id is None

        game = make_game(home_team=team, away_team=team)
        query = PitchingSeasonStats.update(last_game=game).where(
            PitchingSeasonStats.id == row.id
        )
        query.execute()
        refreshed = PitchingSeasonStats.get_by_id(row.id)
        assert refreshed.last_game_id == game.id
# ── Integration: index existence ─────────────────────────────────────────────
class TestBattingIndexExistence:
    """Required indexes exist on batting_season_stats."""

    def test_unique_index_on_player_team_season(self, setup_test_db):
        """A unique index covering (player_id, team_id, season) exists."""
        observed = _get_index_columns(setup_test_db, "batting_season_stats")
        assert frozenset({"player_id", "team_id", "season"}) in observed

    def test_index_on_team_season(self, setup_test_db):
        """An index covering (team_id, season) exists."""
        observed = _get_index_columns(setup_test_db, "batting_season_stats")
        assert frozenset({"team_id", "season"}) in observed

    def test_index_on_player_season(self, setup_test_db):
        """An index covering (player_id, season) exists."""
        observed = _get_index_columns(setup_test_db, "batting_season_stats")
        assert frozenset({"player_id", "season"}) in observed
class TestPitchingIndexExistence:
    """Required indexes exist on pitching_season_stats."""

    def test_unique_index_on_player_team_season(self, setup_test_db):
        """A unique index covering (player_id, team_id, season) exists."""
        observed = _get_index_columns(setup_test_db, "pitching_season_stats")
        assert frozenset({"player_id", "team_id", "season"}) in observed

    def test_index_on_team_season(self, setup_test_db):
        """An index covering (team_id, season) exists."""
        observed = _get_index_columns(setup_test_db, "pitching_season_stats")
        assert frozenset({"team_id", "season"}) in observed

    def test_index_on_player_season(self, setup_test_db):
        """An index covering (player_id, season) exists."""
        observed = _get_index_columns(setup_test_db, "pitching_season_stats")
        assert frozenset({"player_id", "season"}) in observed