paper-dynasty-database/app/routers_v2/teams.py
Cal Corum 583bde73a9 feat(WP-07): card state API endpoints — closes #72
Add two endpoints for reading EvolutionCardState:

  GET /api/v2/teams/{team_id}/evolutions
    - Optional filters: card_type, tier
    - Pagination: page / per_page (default 10, max 100)
    - Joins EvolutionTrack so card_type filter is a single query
    - Returns {count, items} with full card state + threshold context

  GET /api/v2/evolution/cards/{card_id}
    - Resolves card_id -> (player_id, team_id) via Card table
    - Duplicate cards for same player+team share one state row
    - Returns 404 when card missing or has no evolution state

Both endpoints:
  - Require bearer token auth (valid_token dependency)
  - Embed the EvolutionTrack in each item (not just the FK id)
  - Compute next_threshold: threshold for tier above current (null at T4)
  - Share _build_card_state_response() helper in evolution.py

Also cleans up 30 pre-existing ruff violations in teams.py that were
blocking the pre-commit hook: F541 bare f-strings, E712 boolean
comparisons (now noqa where Peewee ORM requires == False/True),
and F841 unused variable assignments.

Tests: tests/test_evolution_state_api.py — 10 integration tests that
skip automatically without POSTGRES_HOST, following the same pattern as
test_evolution_track_api.py.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-18 15:33:02 -05:00

1586 lines
57 KiB
Python

import copy
from datetime import datetime
import pandas as pd
from fastapi import APIRouter, Depends, HTTPException, Response, Query
from typing import Optional, Literal
import logging
import pydantic
from pandas import DataFrame
from ..db_engine import (
db,
Team,
model_to_dict,
fn,
Pack,
Card,
Player,
Paperdex,
Notification,
PackType,
Rarity,
Current,
query_to_csv,
complex_data_to_csv,
CARDSETS,
CardPosition,
BattingCardRatings,
BattingCard,
PitchingCard,
PitchingCardRatings,
StratGame,
LIVE_PROMO_CARDSET_ID,
DoesNotExist,
)
from ..dependencies import (
oauth2_scheme,
valid_token,
PRIVATE_IN_SCHEMA,
)
# Router for the team endpoints in this module; every route registered on it
# is served under the /api/v2/teams prefix and tagged "teams".
router = APIRouter(prefix="/api/v2/teams", tags=["teams"])
class TeamModel(pydantic.BaseModel):
    """Pydantic schema carrying the attributes of a team.

    ``abbrev``, ``sname``, ``lname``, ``gmid``, ``gmname``, ``gsheet`` and
    ``season`` are required; every other field falls back to the default
    declared below.
    """

    # NOTE(review): presumably the request body for the team create/update
    # routes further down the file -- confirm against those handlers.
    abbrev: str
    sname: str
    lname: str
    gmid: int
    gmname: str
    wallet: int = 0
    gsheet: str
    team_value: int = 0
    collection_value: int = 0
    logo: Optional[str] = None
    color: Optional[str] = None
    season: int
    # NOTE(review): get_teams applies its ps_shiny_* filters to Team.career, so
    # this field presumably maps onto that column -- confirm.
    ps_shiny: Optional[int] = 0
    ranking: Optional[int] = 1000
    has_guide: Optional[bool] = False
    is_ai: Optional[bool] = False
@router.get("")
async def get_teams(
    season: Optional[int] = None,
    gm_id: Optional[int] = None,
    abbrev: Optional[str] = None,
    tv_min: Optional[int] = None,
    tv_max: Optional[int] = None,
    cv_min: Optional[int] = None,
    cv_max: Optional[int] = None,
    ps_shiny_min: Optional[int] = None,
    ps_shiny_max: Optional[int] = None,
    ranking_min: Optional[int] = None,
    ranking_max: Optional[int] = None,
    has_guide: Optional[bool] = None,
    sname: Optional[str] = None,
    lname: Optional[str] = None,
    is_ai: Optional[bool] = None,
    event_id: Optional[int] = None,
    limit: Optional[int] = None,
    csv: Optional[bool] = False,
):
    """List teams, narrowed by whichever query filters are supplied.

    A truthy ``season`` starts from ``Team.select_season(season)``; otherwise
    every team is considered. Each remaining filter is skipped when left as
    ``None``:

    * ``gm_id``, ``event_id``: exact match.
    * ``abbrev``, ``sname``, ``lname``: case-insensitive exact match.
    * ``tv_*``, ``cv_*``, ``ranking_*``: inclusive bounds on team value,
      collection value and ranking.
    * ``ps_shiny_*``: inclusive bounds applied to ``Team.career``.
    * ``has_guide``, ``is_ai``: boolean match.

    Results are ordered by team id and capped at ``limit`` when it is given.

    Returns a ``text/csv`` response (``Team.career`` column excluded) when
    ``csv`` is truthy, otherwise ``{"count": <int>, "teams": [<team dict>]}``.
    """
    team_query = Team.select_season(season) if season else Team.select()

    # (supplied value, predicate builder) pairs, applied in this order. A
    # builder only runs when its value was actually supplied.
    criteria = (
        (gm_id, lambda v: Team.gmid == v),
        (abbrev, lambda v: fn.Lower(Team.abbrev) == v.lower()),
        (sname, lambda v: fn.Lower(Team.sname) == v.lower()),
        (lname, lambda v: fn.Lower(Team.lname) == v.lower()),
        (tv_min, lambda v: Team.team_value >= v),
        (tv_max, lambda v: Team.team_value <= v),
        (cv_min, lambda v: Team.collection_value >= v),
        (cv_max, lambda v: Team.collection_value <= v),
        (ps_shiny_min, lambda v: Team.career >= v),
        (ps_shiny_max, lambda v: Team.career <= v),
        (ranking_min, lambda v: Team.ranking >= v),
        (ranking_max, lambda v: Team.ranking <= v),
        # Explicit boolean comparison keeps the generated SQL PostgreSQL-friendly.
        (has_guide, lambda v: Team.has_guide == bool(v)),
        (is_ai, lambda v: Team.is_ai == bool(v)),
        (event_id, lambda v: Team.event_id == v),
    )
    for supplied, build_predicate in criteria:
        if supplied is not None:
            team_query = team_query.where(build_predicate(supplied))

    # Deterministic ordering (PostgreSQL gives no implicit row order).
    team_query = team_query.order_by(Team.id)
    if limit is not None:
        team_query = team_query.limit(limit)

    if csv:
        csv_body = query_to_csv(team_query, exclude=[Team.career])
        return Response(content=csv_body, media_type="text/csv")

    return {
        "count": team_query.count(),
        "teams": [model_to_dict(team) for team in team_query],
    }
@router.get("/{team_id}")
async def get_one_team(
    team_id: int, inc_packs: bool = True, csv: Optional[bool] = False
):
    """Fetch a single team by primary key.

    Args:
        team_id: Id of the team to load.
        inc_packs: In JSON mode, attach the team's unopened packs (those whose
            ``open_time`` is null) under ``sealed_packs``. Ignored in CSV mode.
        csv: When truthy, respond with ``text/csv`` in which ``sealed_packs``
            is the *count* of unopened packs.

    Returns:
        A ``text/csv`` ``Response`` in CSV mode, otherwise the team as a dict.

    Raises:
        HTTPException: 404 when no team has ``team_id``.
    """
    try:
        this_team = Team.get_by_id(team_id)
    except DoesNotExist:
        raise HTTPException(status_code=404, detail=f"No team found with id {team_id}")

    sealed_packs = Pack.select().where(
        (Pack.team == this_team) & (Pack.open_time.is_null(True))
    )

    if csv:
        data = model_to_dict(this_team)
        data["sealed_packs"] = sealed_packs.count()
        # Wrap the CSV text in a Response, as get_teams does; returning the
        # bare string would have it JSON-encoded instead of served as text/csv.
        return Response(content=complex_data_to_csv([data]), media_type="text/csv")

    team_data = model_to_dict(this_team)
    if inc_packs:
        team_data["sealed_packs"] = [model_to_dict(x) for x in sealed_packs]
    return team_data
def get_scouting_dfs(allowed_players, position: str):
    """Build a scouting DataFrame for the players who can field ``position``.

    Each row joins a player's batting ratings vs. left-handers (``*_vl``
    columns) and vs. right-handers (``*_vr`` columns) with that player's
    fielding numbers at ``position`` and a blended ``total_OPS``.

    Args:
        allowed_players: Players (query or collection usable on the right-hand
            side of Peewee's ``<<``) to consider.
        position: Defensive position to match against ``CardPosition.position``.

    Returns:
        A DataFrame indexed by ``player_id``, or ``None`` when no allowed
        player has a card at ``position`` or no batting ratings exist for
        either handedness split.
    """
    logging.info(f"allowed_players: {allowed_players}\nposition: {position}")
    positions = CardPosition.select().where(
        (CardPosition.player << allowed_players) & (CardPosition.position == position)
    )
    pos_players = [x.player.player_id for x in positions]
    logging.info(f"pos_players: {pos_players}")
    if not pos_players:
        return None

    all_cards = BattingCard.select().where(BattingCard.player << pos_players)
    all_ratings = BattingCardRatings.select().where(
        BattingCardRatings.battingcard << all_cards
    )
    vl_query = all_ratings.where(BattingCardRatings.vs_hand == "L")
    vr_query = all_ratings.where(BattingCardRatings.vs_hand == "R")

    # The vs-L rows carry the descriptive columns: flatten the nested card
    # dict into the row, lift the player/cardset labels, drop the nesting.
    vl_vals = [model_to_dict(x) for x in vl_query]
    for x in vl_vals:
        x.update(x["battingcard"])
        x["player_id"] = x["battingcard"]["player"]["player_id"]
        x["player_name"] = x["battingcard"]["player"]["p_name"]
        x["rarity"] = x["battingcard"]["player"]["rarity"]["name"]
        x["cardset_id"] = x["battingcard"]["player"]["cardset"]["id"]
        x["cardset_name"] = x["battingcard"]["player"]["cardset"]["name"]
        del x["battingcard"], x["player"]

    # The vs-R rows only need the join key plus their own rating columns.
    vr_vals = [model_to_dict(x) for x in vr_query]
    for x in vr_vals:
        x["player_id"] = x["battingcard"]["player"]["player_id"]
        del x["battingcard"]

    if not vl_vals or not vr_vals:
        # An empty list yields a column-less DataFrame, and merging on
        # "player_id" would then raise KeyError. Report "nothing to scout" the
        # same way the no-players case above does.
        logging.info(f"No batting ratings found for position {position}")
        return None

    vl = pd.DataFrame(vl_vals)
    vr = pd.DataFrame(vr_vals)
    bat_df = pd.merge(vl, vr, on="player_id", suffixes=("_vl", "_vr")).set_index(
        "player_id", drop=False
    )
    logging.info(f"cols:\n{list(bat_df.columns)}")

    # (CardPosition attribute, output column name) for every fielding column.
    fielding_columns = [
        ("range", f"Range {position}"),
        ("error", f"Error {position}"),
        ("innings", f"Innings {position}"),
    ]
    if position in ["LF", "CF", "RF"]:
        fielding_columns.append(("arm", "Arm OF"))
    elif position == "C":
        fielding_columns += [("arm", "Arm C"), ("pb", "PB C"), ("overthrow", "Throw C")]

    series_list = [
        pd.Series(
            {x.player.player_id: getattr(x, attr) for x in positions},
            name=column_name,
        )
        for attr, column_name in fielding_columns
    ]

    def get_total_ops(df_data):
        # Average of both splits with the lower one counted twice.
        ops_vl = df_data["obp_vl"] + df_data["slg_vl"]
        ops_vr = df_data["obp_vr"] + df_data["slg_vr"]
        return (ops_vr + ops_vl + min(ops_vl, ops_vr)) / 3

    logging.debug(f"series_list: {series_list}")
    ratings = bat_df.join(series_list)
    ratings["total_OPS"] = ratings.apply(get_total_ops, axis=1)
    return ratings
@router.get("/{team_id}/lineup/{difficulty_name}")
async def get_team_lineup(
    team_id: int,
    difficulty_name: str,
    pitcher_name: str,
    build_type: str,
    cardset_id: list = Query(default=None),
    backup_cardset_id: list = Query(default=None),
):
    """Assemble a nine-slot batting lineup from a team's franchise players.

    Candidates are ``Player`` rows whose ``franchise`` equals the team's
    ``sname``. The primary pool is limited to the ``"primary"`` cardsets that
    ``CARDSETS`` lists for ``difficulty_name`` (or to ``cardset_id`` for
    ``"exhibition"``). A backup pool (``"secondary"`` cardsets,
    ``backup_cardset_id``, or the minor-league primary cardsets for exhibition)
    is consulted only for slots the primary pool could not fill, and may be
    absent.

    Slot selection, in dict order (C, 1B, 2B, 3B, SS, LF, CF, RF, DH):

    * Fielding slots for ``minor-league`` / ``gauntlet-3`` / ``gauntlet-5``:
      the unused player with the most innings at the position.
    * Fielding slots for every other difficulty: the unused player with the
      best defence-adjusted OPS (see ``rank_cards``).
    * DH: the most expensive unused primary-pool player whose ``pos_1`` does
      not contain ``"P"``.

    Args:
        team_id: Team whose franchise supplies the players.
        difficulty_name: A key of ``CARDSETS`` or ``"exhibition"``.
        pitcher_name: Name of the starting pitcher; excluded (case-insensitive)
            from the innings-based and backup fielding picks.
        build_type: Accepted for API compatibility; not read by this function.
        cardset_id: Primary cardset ids, required for ``"exhibition"``.
        backup_cardset_id: Backup cardset ids for ``"exhibition"``.

    Returns:
        ``{"json": {...}, "array": [...]}`` -- the same slots as a dict and as
        a list of ``(position, slot)`` pairs, both sorted by ``ops`` descending.

    Raises:
        HTTPException: 404 for an unknown team; 400 for an unknown difficulty
            or an exhibition request without ``cardset_id``.
    """
    this_team = Team.get_or_none(Team.id == team_id)
    if this_team is None:
        raise HTTPException(status_code=404, detail=f"Team id {team_id} not found")

    if difficulty_name not in CARDSETS and difficulty_name != "exhibition":
        raise HTTPException(
            status_code=400,
            detail=f"Difficulty name {difficulty_name} not a valid check",
        )

    all_players = Player.select().where(Player.franchise == this_team.sname)

    if difficulty_name == "exhibition":
        logging.info("pulling an exhibition lineup")
        if cardset_id is None:
            raise HTTPException(
                status_code=400,
                detail="Must provide at least one cardset_id for exhibition lineups",
            )
        legal_players = all_players.where(Player.cardset_id << cardset_id)
        if backup_cardset_id is not None:
            backup_players = all_players.where(Player.cardset_id << backup_cardset_id)
        else:
            backup_players = all_players.where(
                Player.cardset_id << CARDSETS["minor-league"]["primary"]
            )
    else:
        legal_players = all_players.where(
            Player.cardset_id << CARDSETS[difficulty_name]["primary"]
        )
        if "secondary" in CARDSETS[difficulty_name]:
            backup_players = all_players.where(
                Player.cardset_id << CARDSETS[difficulty_name]["secondary"]
            )
        else:
            backup_players = None

    logging.info(f"legal_players: {legal_players.count()}")
    logging.info(f"legal query: {legal_players}")
    if backup_players is not None:
        logging.info(f"backup_players: {backup_players.count()}")

    # Lower-case once so the pitcher exclusion is case-insensitive on BOTH
    # sides; comparing a lowered p_name to the raw parameter never matched
    # mixed-case input.
    pitcher_key = pitcher_name.lower()

    player_names = []
    starting_nine = {
        slot_name: {"player": None, "vl": None, "vr": None, "ops": 0}
        for slot_name in ("C", "1B", "2B", "3B", "SS", "LF", "CF", "RF", "DH")
    }

    # Batch-fetch BattingCards and ratings for all candidate players to avoid
    # per-player DB round trips inside the lineup construction loop below.
    if backup_players is not None:
        _batch_bcards = BattingCard.select().where(
            (BattingCard.player << legal_players)
            | (BattingCard.player << backup_players)
        )
    else:
        _batch_bcards = BattingCard.select().where(BattingCard.player << legal_players)
    _batting_cards_by_player = {bc.player_id: bc for bc in _batch_bcards}
    _all_bratings = (
        BattingCardRatings.select().where(
            BattingCardRatings.battingcard << list(_batting_cards_by_player.values())
        )
        if _batting_cards_by_player
        else []
    )
    _ratings_by_card_hand = {}
    for _r in _all_bratings:
        _ratings_by_card_hand.setdefault(_r.battingcard_id, {})[_r.vs_hand] = _r

    def get_bratings(player_id):
        """Return (vs-L dict, vs-R dict, blended OPS) for ``player_id``.

        Raises AttributeError when either split is missing for the player.
        """
        this_bcard = _batting_cards_by_player.get(player_id)
        card_ratings = (
            _ratings_by_card_hand.get(this_bcard.id, {}) if this_bcard else {}
        )
        vl_ratings = card_ratings.get("L")
        vr_ratings = card_ratings.get("R")
        vl_ops = vl_ratings.obp + vl_ratings.slg
        vr_ops = vr_ratings.obp + vr_ratings.slg
        return (
            model_to_dict(vl_ratings),
            model_to_dict(vr_ratings),
            (vl_ops + vr_ops + min(vl_ops, vr_ops)) / 3,
        )

    # IDEA: Rank guys by their bat per-position and take the best one that meets a threshold of defensive ability
    for position in starting_nine:
        slot = starting_nine[position]

        if position == "DH":
            logging.debug("Searching for a DH!")
            for x in legal_players.order_by(Player.cost.desc()):
                logging.debug(f"checking {x.p_name} for {position}")
                if x.p_name not in player_names and "P" not in x.pos_1:
                    logging.debug("adding!")
                    slot["player"] = model_to_dict(x)
                    try:
                        vl, vr, total_ops = get_bratings(x.player_id)
                    except AttributeError:
                        # The player keeps the slot even without batting lines.
                        logging.debug("Could not find batting lines")
                    else:
                        slot["vl"] = vl["obp"] + vl["slg"]
                        slot["vr"] = vr["obp"] + vr["slg"]
                        slot["ops"] = total_ops
                    player_names.append(x.p_name)
                    break

            # Fall back to the backup pool, which may not exist for this
            # difficulty (no "secondary" cardsets configured).
            if slot["player"] is None and backup_players is not None:
                for x in backup_players.order_by(Player.cost.desc()):
                    logging.debug(f"checking {x.p_name} for {position}")
                    if x.p_name not in player_names:
                        logging.debug("adding!")
                        slot["player"] = model_to_dict(x)
                        try:
                            vl, vr, total_ops = get_bratings(x.player_id)
                        except AttributeError:
                            logging.debug("Could not find batting lines")
                        else:
                            slot["vl"] = vl["obp"] + vl["slg"]
                            slot["vr"] = vr["obp"] + vr["slg"]
                            slot["ops"] = total_ops
                        player_names.append(x.p_name)
                        break
        else:
            pos_group = CardPosition.select().where(
                (CardPosition.position == position)
                & (CardPosition.player << legal_players)
            )

            if difficulty_name in ["minor-league", "gauntlet-3", "gauntlet-5"]:
                # Innings-based pick: most-used fielder at the position first.
                pos_group = pos_group.order_by(CardPosition.innings.desc())
                for x in pos_group:
                    logging.debug(f"checking {x.player.p_name} for {position}")
                    if (
                        x.player.p_name not in player_names
                        and x.player.p_name.lower() != pitcher_key
                    ):
                        logging.debug("adding!")
                        slot["player"] = model_to_dict(x.player)
                        vl, vr, total_ops = get_bratings(x.player.player_id)
                        # NOTE(review): this branch reports the full rating
                        # dicts while every other branch reports OPS sums;
                        # kept as-is to preserve the response shape.
                        slot["vl"] = vl
                        slot["vr"] = vr
                        slot["ops"] = total_ops
                        player_names.append(x.player.p_name)
                        break
            else:
                logging.debug(f"entering difficulty: {difficulty_name}")
                eligible_cards = get_scouting_dfs(legal_players, position)
                logging.debug(f"got dataframe:\n{eligible_cards}")

                def rank_cards(df_data):
                    """Defence-adjusted OPS, scaled by innings at the position."""
                    # Premium defensive positions weigh range/error twice as much.
                    if position in ["C", "SS", "2B", "CF"]:
                        r_mult = 0.05
                        e_mult = -0.01
                    else:
                        r_mult = 0.025
                        e_mult = -0.005
                    r_mod = (3 - df_data[f"Range {position}"]) * r_mult
                    e_mod = df_data[f"Error {position}"] * e_mult
                    i_mult = df_data[f"Innings {position}"] / 1000
                    final_ops = (df_data["total_OPS"] + r_mod + e_mod) * i_mult
                    logging.debug(
                        f"{df_data.player_name} total OPS: {df_data.total_OPS} / "
                        f"final OPS: {final_ops}"
                    )
                    return final_ops

                if eligible_cards is not None and len(eligible_cards.index) >= 1:
                    eligible_cards["final_ops"] = eligible_cards.apply(
                        rank_cards, axis=1
                    )
                    logging.debug(f"final_ops:\n{eligible_cards['final_ops']}")
                    eligible_cards.sort_values(
                        by=["final_ops"], ascending=False, inplace=True
                    )

                    # Best-ranked card whose player is not already in the lineup.
                    this_row = next(
                        (
                            row
                            for _, row in eligible_cards.iterrows()
                            if row.player_name not in player_names
                        ),
                        None,
                    )
                    if this_row is not None:
                        slot["player"] = model_to_dict(
                            Player.get_by_id(this_row.player_id)
                        )
                        slot["vl"] = this_row.obp_vl + this_row.slg_vl
                        slot["vr"] = this_row.obp_vr + this_row.slg_vr
                        slot["ops"] = this_row.total_OPS
                        player_names.append(this_row.player_name)

            logging.debug(
                f"pos_group: {pos_group}\n{starting_nine}\n{player_names}\n\n"
            )

            # Backup pool, only when one exists and the slot is still empty.
            if slot["player"] is None and backup_players is not None:
                backup_group = (
                    CardPosition.select()
                    .where(
                        (CardPosition.position == position)
                        & (CardPosition.player << backup_players)
                    )
                    .order_by(CardPosition.innings.desc())
                )
                for x in backup_group:
                    logging.info(f"checking {x.player.p_name} for {position}")
                    if (
                        x.player.p_name not in player_names
                        and x.player.p_name.lower() != pitcher_key
                    ):
                        logging.debug("adding!")
                        slot["player"] = model_to_dict(x.player)
                        vl, vr, total_ops = get_bratings(x.player.player_id)
                        slot["vl"] = vl["obp"] + vl["slg"]
                        slot["vr"] = vr["obp"] + vr["slg"]
                        slot["ops"] = total_ops
                        player_names.append(x.player.p_name)
                        break

    sorted_nine = sorted(
        starting_nine.items(), key=lambda item: item[1]["ops"], reverse=True
    )
    return {"json": dict(sorted_nine), "array": sorted_nine}
def sort_pitchers(pitching_card_query) -> DataFrame | None:
all_s = [model_to_dict(x, recurse=False) for x in pitching_card_query]
if len(all_s) == 0:
logging.error(f"Empty pitching_card_query: {pitching_card_query}")
return None
pitcher_df = pd.DataFrame(all_s).set_index("player", drop=False)
logging.debug(f"pitcher_df: {pitcher_df}")
card_ids = pitcher_df["id"].tolist()
ratings_map = {
(r.pitchingcard_id, r.vs_hand): r
for r in PitchingCardRatings.select().where(
(PitchingCardRatings.pitchingcard_id << card_ids)
& (PitchingCardRatings.vs_hand << ["L", "R"])
)
}
def get_total_ops(df_data):
vlval = ratings_map.get((df_data["id"], "L"))
vrval = ratings_map.get((df_data["id"], "R"))
if vlval is None or vrval is None:
return float("inf")
ops_vl = vlval.obp + vlval.slg
ops_vr = vrval.obp + vrval.slg
# Weight the weaker split (higher OPS allowed) so platoon weaknesses are penalized.
# Starters face both LHH and RHH, so vulnerability against either hand matters.
return (ops_vr + ops_vl + max(ops_vl, ops_vr)) / 3
pitcher_df["total_ops"] = pitcher_df.apply(get_total_ops, axis=1)
return pitcher_df.sort_values(by="total_ops")
@router.get("/{team_id}/sp/{difficulty_name}")
async def get_team_sp(
    team_id: int,
    difficulty_name: str,
    sp_rank: int,
    cardset_id: list = Query(default=None),
    backup_cardset_id: list = Query(default=None),
):
    """Pick the ``sp_rank``-th best starting pitcher for a team's franchise.

    Starters are pitching cards with ``starter_rating >= 4``, ranked by
    ``sort_pitchers`` (lowest blended OPS allowed first). The primary cardset
    pool is tried first; the backup pool, when one exists, is tried only if
    the primary pool has no starters at all. When a pool has starters but
    fewer than ``sp_rank``, its last-ranked starter is returned.

    Args:
        team_id: Team whose franchise supplies the players.
        difficulty_name: A key of ``CARDSETS`` or ``"exhibition"``.
        sp_rank: 1-based rank of the starter wanted.
        cardset_id: Primary cardset ids, required for ``"exhibition"``.
        backup_cardset_id: Backup cardset ids for ``"exhibition"``.

    Returns:
        The chosen player as a non-recursive dict.

    Raises:
        HTTPException: 404 for an unknown team; 400 for an unknown difficulty,
            an exhibition request without ``cardset_id``, or when neither pool
            contains a starter.
    """
    logging.info(
        f"get_team_sp - team_id: {team_id} / difficulty_name: {difficulty_name} / sp_rank: {sp_rank}"
    )
    this_team = Team.get_or_none(Team.id == team_id)
    if this_team is None:
        raise HTTPException(status_code=404, detail=f"Team id {team_id} not found")

    if difficulty_name not in CARDSETS and difficulty_name != "exhibition":
        raise HTTPException(
            status_code=400,
            detail=f"Difficulty name {difficulty_name} not a valid check",
        )

    all_players = Player.select().where(Player.franchise == this_team.sname)

    if difficulty_name == "exhibition":
        logging.info("pulling an exhibition lineup")
        if cardset_id is None:
            raise HTTPException(
                status_code=400,
                detail="Must provide at least one cardset_id for exhibition lineups",
            )
        legal_players = all_players.where(Player.cardset_id << cardset_id)
        if backup_cardset_id is not None:
            backup_players = all_players.where(Player.cardset_id << backup_cardset_id)
        else:
            backup_players = all_players.where(
                Player.cardset_id << CARDSETS["minor-league"]["primary"]
            )
    else:
        legal_players = all_players.where(
            Player.cardset_id << CARDSETS[difficulty_name]["primary"]
        )
        if "secondary" in CARDSETS[difficulty_name]:
            backup_players = all_players.where(
                Player.cardset_id << CARDSETS[difficulty_name]["secondary"]
            )
        else:
            backup_players = None

    # Primary cardsets first, then backup cardsets.
    starter_queries = [
        PitchingCard.select()
        .join(Player)
        .where(
            (PitchingCard.player << legal_players) & (PitchingCard.starter_rating >= 4)
        )
    ]
    # The backup pool is absent for difficulties without "secondary" cardsets;
    # building an IN-clause against None would be an invalid query.
    if backup_players is not None:
        starter_queries.append(
            PitchingCard.select().where(
                (PitchingCard.player << backup_players)
                & (PitchingCard.starter_rating >= 4)
            )
        )

    for s_query in starter_queries:
        all_starters = sort_pitchers(s_query)
        logging.debug(f"sorted: {all_starters}")
        if all_starters is None:
            continue
        # Requested rank, or the last-ranked starter when the pool is shorter.
        pick = min(sp_rank, len(all_starters.index)) - 1
        this_player_id = all_starters.iloc[pick].player
        return model_to_dict(Player.get_by_id(this_player_id), recurse=False)

    raise HTTPException(
        status_code=400, detail=f"No SP #{sp_rank} found for Team {team_id}"
    )
@router.get("/{team_id}/rp/{difficulty_name}")
async def get_team_rp(
    team_id: int,
    difficulty_name: str,
    need: Literal["length", "setup", "closer", "middle"],
    used_pitcher_ids: list = Query(default=[]),
    cardset_id: list = Query(default=None),
    backup_cardset_id: list = Query(default=None),
):
    """Pick a relief pitcher for a team's franchise to fill a bullpen role.

    Players listed in ``used_pitcher_ids`` are never considered. For the
    requested role an ordered list of tiers is tried; each tier is a player
    pool plus rating conditions, ranked by ``sort_pitchers`` (lowest blended
    OPS allowed first). The first tier with enough pitchers supplies the pick:

    * ``closer``: best pitcher of the tier.
    * ``setup``: second best.
    * ``length`` (also used for ``middle`` once more than four pitchers have
      been used): best pitcher of the tier.
    * ``middle``: third best.

    If no tier qualifies, the last-ranked pitcher across both pools is
    returned as a last resort.

    Args:
        team_id: Team whose franchise supplies the players.
        difficulty_name: A key of ``CARDSETS`` or ``"exhibition"``.
        need: Bullpen role to fill.
        used_pitcher_ids: Player ids already used and therefore excluded.
        cardset_id: Primary cardset ids, required for ``"exhibition"``.
        backup_cardset_id: Backup cardset ids for ``"exhibition"``.

    Returns:
        The chosen player as a non-recursive dict.

    Raises:
        HTTPException: 404 for an unknown team; 400 for an unknown difficulty,
            an exhibition request without ``cardset_id``, or when no pitcher
            is available at all.
    """
    logging.info(
        f"get_team_rp - team_id: {team_id} / difficulty_name: {difficulty_name} / need: {need} "
        f"/ used_pitcher_ids: {used_pitcher_ids}"
    )
    this_team = Team.get_or_none(Team.id == team_id)
    if this_team is None:
        raise HTTPException(status_code=404, detail=f"Team id {team_id} not found")

    if difficulty_name not in CARDSETS and difficulty_name != "exhibition":
        raise HTTPException(
            status_code=400,
            detail=f"Difficulty name {difficulty_name} not a valid check",
        )

    all_players = Player.select().where(
        (Player.franchise == this_team.sname)
        & (Player.player_id.not_in(used_pitcher_ids))
    )

    if difficulty_name == "exhibition":
        logging.info("pulling an exhibition RP")
        if cardset_id is None:
            raise HTTPException(
                status_code=400,
                detail="Must provide at least one cardset_id for exhibition lineups",
            )
        legal_players = all_players.where(Player.cardset_id << cardset_id)
        if backup_cardset_id is not None:
            backup_players = all_players.where(Player.cardset_id << backup_cardset_id)
        else:
            backup_players = all_players.where(
                Player.cardset_id << CARDSETS["minor-league"]["primary"]
            )
    else:
        legal_players = all_players.where(
            Player.cardset_id << CARDSETS[difficulty_name]["primary"]
        )
        if "secondary" in CARDSETS[difficulty_name]:
            backup_players = all_players.where(
                Player.cardset_id << CARDSETS[difficulty_name]["secondary"]
            )
        else:
            backup_players = None

    logging.info(f"legal_players: {legal_players.count()}")
    logging.info(f"legal query: {legal_players}")

    def pitchers_from(pool, *conditions):
        """PitchingCard query for cards of ``pool`` players meeting all ``conditions``."""
        clause = PitchingCard.player << pool
        for condition in conditions:
            clause = clause & condition
        return PitchingCard.select().join(Player).where(clause)

    # Each tier is (player pool, *rating conditions); pick_index is the
    # 0-based rank taken from the first tier that has that many pitchers.
    tiers = []
    pick_index = 0
    if need == "closer":
        pick_index = 0
        tiers = [
            (
                legal_players,
                PitchingCard.closer_rating >= 3,
                PitchingCard.starter_rating == 1,
            ),
            (
                legal_players,
                PitchingCard.closer_rating >= 1,
                PitchingCard.starter_rating == 1,
            ),
            (
                backup_players,
                PitchingCard.closer_rating >= 3,
                PitchingCard.starter_rating == 1,
            ),
            (
                backup_players,
                PitchingCard.closer_rating >= 1,
                PitchingCard.starter_rating == 1,
            ),
            (backup_players, PitchingCard.starter_rating < 4),
        ]
    elif need == "setup":
        pick_index = 1
        tiers = [
            (legal_players, PitchingCard.starter_rating == 1),
            (backup_players, PitchingCard.starter_rating < 4),
        ]
    elif need == "length" or len(used_pitcher_ids) > 4:
        pick_index = 0
        tiers = [
            (
                legal_players,
                PitchingCard.relief_rating >= 3,
                PitchingCard.starter_rating < 4,
            ),
            (
                legal_players,
                PitchingCard.relief_rating >= 2,
                PitchingCard.starter_rating < 4,
            ),
            (
                backup_players,
                PitchingCard.relief_rating >= 2,
                PitchingCard.starter_rating < 4,
            ),
        ]
    elif need == "middle":
        pick_index = 2
        tiers = [
            (legal_players, PitchingCard.starter_rating == 1),
            (backup_players, PitchingCard.starter_rating < 4),
        ]

    for pool, *conditions in tiers:
        if pool is None:
            # No backup pool for this difficulty; an IN-clause against None
            # would be an invalid query, so skip the tier.
            continue
        query = pitchers_from(pool, *conditions)
        all_relievers = sort_pitchers(query)
        if all_relievers is not None and len(all_relievers.index) > pick_index:
            logging.info(f"RP query: {query}")
            this_player_id = all_relievers.iloc[pick_index].player
            return model_to_dict(Player.get_by_id(this_player_id), recurse=False)

    logging.info("Falling to last chance pitcher")
    last_chance = PitchingCard.player << legal_players
    if backup_players is not None:
        last_chance = (PitchingCard.player << backup_players) | last_chance
    all_relievers = sort_pitchers(
        PitchingCard.select().join(Player).where(last_chance)
    )

    if all_relievers is not None:
        # Last-ranked pitcher of whatever is left.
        this_player_id = all_relievers.iloc[len(all_relievers.index) - 1].player
        return model_to_dict(Player.get_by_id(this_player_id), recurse=False)

    raise HTTPException(status_code=400, detail=f"No RP found for Team {team_id}")
@router.get("/{team_id}/season-record/{season}")
async def get_team_record(team_id: int, season: int):
    """Summarise a team's season results against each opponent, per game type.

    Only games with ``short_game`` false in which the team was home or away
    are counted; tied games are ignored.

    Args:
        team_id: Team whose record is being built.
        season: Season to report on.

    Returns:
        ``{game_type: {opponent_abbrev: [wins, losses, run_differential]}}``
        from ``team_id``'s point of view. The listed game types and opponent
        abbreviations are always present (zeroed when unplayed); any other
        game type or opponent met in the data is added on demand.
    """
    all_games = StratGame.select().where(
        ((StratGame.away_team_id == team_id) | (StratGame.home_team_id == team_id))
        & (StratGame.season == season)
        & (StratGame.short_game == False)  # noqa: E712
    )

    # [wins, losses, run differential] per opponent.
    template = {
        "ARI": [0, 0, 0],
        "ATL": [0, 0, 0],
        "BAL": [0, 0, 0],
        "BOS": [0, 0, 0],
        "CHC": [0, 0, 0],
        "CHW": [0, 0, 0],
        "CIN": [0, 0, 0],
        "CLE": [0, 0, 0],
        "COL": [0, 0, 0],
        "DET": [0, 0, 0],
        "NYY": [0, 0, 0],
        "TBR": [0, 0, 0],
        "TOR": [0, 0, 0],
        "PHI": [0, 0, 0],
        "MIA": [0, 0, 0],
        "NYM": [0, 0, 0],
        "WSN": [0, 0, 0],
        "MIN": [0, 0, 0],
        "KCR": [0, 0, 0],
        "HOU": [0, 0, 0],
        "TEX": [0, 0, 0],
        "SEA": [0, 0, 0],
        "LAA": [0, 0, 0],
        "OAK": [0, 0, 0],
        "MIL": [0, 0, 0],
        "PIT": [0, 0, 0],
        "STL": [0, 0, 0],
        "LAD": [0, 0, 0],
        "SDP": [0, 0, 0],
        "SFG": [0, 0, 0],
        "ALAS": [0, 0, 0],
        "NLAS": [0, 0, 0],
    }
    standings = {
        game_type: copy.deepcopy(template)
        for game_type in (
            "minor-league",
            "major-league",
            "hall-of-fame",
            "flashback",
            "unlimited",
            "ranked",
            "exhibition",
        )
    }

    for game in all_games:
        run_diff = game.away_score - game.home_score
        if run_diff == 0:
            continue  # ties are not recorded

        # Express the result from team_id's side: a positive margin is a win.
        if game.away_team_id == team_id:
            opponent, margin = game.home_team, run_diff
        else:
            opponent, margin = game.away_team, -run_diff

        # Create missing rows on demand so an unlisted game type or opponent
        # abbreviation cannot raise KeyError, whichever side won.
        by_opponent = standings.setdefault(game.game_type, copy.deepcopy(template))
        record = by_opponent.setdefault(opponent.abbrev, [0, 0, 0])

        record[0 if margin > 0 else 1] += 1
        # Adding the signed margin lowers the differential on every loss,
        # including losses as the away team (previously counted as positive).
        record[2] += margin

    return standings
@router.get("/{team_id}/buy/players", include_in_schema=PRIVATE_IN_SCHEMA)
async def team_buy_players(team_id: int, ids: str, ts: str):
    """Buy one card per listed player for a team, paying from its wallet.

    Parameters
    ----------
    team_id
        Primary key of the purchasing team.
    ids
        Comma-separated player ids; empty entries are skipped.
    ts
        Team secret; must equal ``Team.team_hash()`` for the team.

    Raises
    ------
    HTTPException
        404 when the team or a player is not found, 401 when ``ts`` does not
        match, 200 when the wallet cannot cover a player, and 200 again as
        the final confirmation. Error details list what was bought so far.
    """
    try:
        buyer = Team.get_by_id(team_id)
    except DoesNotExist:
        raise HTTPException(status_code=404, detail=f"No team found with id {team_id}")

    if ts != buyer.team_hash():
        logging.warning(f"Bad Team Secret: {ts} ({buyer.team_hash()})")
        raise HTTPException(
            status_code=401,
            detail=f"You are not authorized to buy {buyer.abbrev} cards. This event has been logged.",
        )

    receipt = ""
    spent = 0
    for player_id in ids.split(","):
        # Tolerate stray commas in the id list.
        if player_id == "":
            continue

        try:
            player = Player.get_by_id(player_id)
        except DoesNotExist:
            raise HTTPException(
                status_code=404,
                detail=f"No player found with id {player_id} /// {receipt} purchased",
            )

        # The wallet must cover the player's current cost.
        if buyer.wallet < player.cost:
            shortfall = (
                f"{player} was not purchased. {buyer.lname} only has {buyer.wallet}₼, but "
                f"{player} costs {player.cost}₼."
            )
            logging.error(shortfall)
            raise HTTPException(200, detail=f"{shortfall} /// {receipt} purchased")

        # Capture the price before change_on_buy() adjusts the player's cost.
        price_paid = player.cost
        spent += price_paid
        new_card = Card(
            player_id=player.player_id, team_id=buyer.id, value=price_paid
        )
        Paperdex.get_or_create(team_id=team_id, player_id=player.player_id)
        new_card.save()
        player.change_on_buy()

        # Charge the team for the card.
        logging.info(f"{buyer.abbrev} starting wallet: {buyer.wallet}")
        buyer.wallet -= price_paid
        buyer.save()
        logging.info(f"{buyer.abbrev} ending wallet: {buyer.wallet}")

        # Announce the price change for rarity values of 2 and above.
        if player.rarity.value >= 2:
            name_suffix = "" if player.p_name in player.description else player.p_name
            Notification(
                created=datetime.now(),
                title="Price Change",
                desc="Modified by buying and selling",
                field_name=f"{player.description} {name_suffix}",
                message=f"From {price_paid}₼ 📈 to **{player.cost}**₼",
                about=f"Player-{player.player_id}",
            ).save()

        receipt += (
            f"{price_paid}₼ for {player.rarity.name} {player.p_name} "
            f"({player.cardset.name}), "
        )

    # The confirmation is delivered as an HTTPException with status 200.
    raise HTTPException(
        status_code=200,
        detail=f"{receipt} purchased. /// Total Cost: {spent}₼ /// "
        f"Final Wallet: {buyer.wallet}",
    )
@router.get("/{team_id}/buy/pack/{packtype_id}", include_in_schema=PRIVATE_IN_SCHEMA)
async def team_buy_packs(
    team_id: int, packtype_id: int, ts: str, quantity: Optional[int] = 1
):
    """Buy ``quantity`` packs of one pack type for a team.

    Parameters
    ----------
    team_id
        Primary key of the purchasing team.
    packtype_id
        Primary key of the pack type being bought.
    ts
        Team secret; must equal ``Team.team_hash()`` for the team.
    quantity
        Number of packs to buy. ``None`` is treated as 1; values below 1 are
        rejected.

    Raises
    ------
    HTTPException
        404 when the pack type or team is not found, 401 when ``ts`` does not
        match, 400 when ``quantity`` is below 1, 200 when the wallet cannot
        cover the purchase, and 200 again as the final confirmation.
    """
    try:
        this_packtype = PackType.get_by_id(packtype_id)
    except DoesNotExist:
        raise HTTPException(
            status_code=404, detail=f"No pack type found with id {packtype_id}"
        )

    try:
        this_team = Team.get_by_id(team_id)
    except DoesNotExist:
        raise HTTPException(status_code=404, detail=f"No team found with id {team_id}")

    if ts != this_team.team_hash():
        logging.warning(f"Bad Team Secret: {ts} ({this_team.team_hash()})")
        logging.warning(
            f"team: {this_team} / pack_type: {this_packtype} / secret: {ts} / "
            f"actual: {this_team.team_hash()}"
        )
        raise HTTPException(
            status_code=401,
            detail=f"You are not authorized to buy {this_team.abbrev} packs. This event has been logged.",
        )

    if quantity is None:
        quantity = 1
    # A zero or negative quantity would create no packs, and a negative one
    # would make total_cost negative and *credit* the wallet below.
    if quantity < 1:
        raise HTTPException(
            status_code=400,
            detail=f"Quantity must be at least 1; received {quantity}.",
        )

    # check wallet balance
    total_cost = this_packtype.cost * quantity
    if this_team.wallet < total_cost:
        raise HTTPException(
            200,
            detail=f"{this_packtype} was not purchased. {this_team.lname} only has {this_team.wallet} bucks, but "
            f"{this_packtype} costs {this_packtype.cost}.",
        )

    # Pack type 9 is created against the live promo cardset; every other
    # pack type leaves the pack's cardset unset.
    cardset_id = LIVE_PROMO_CARDSET_ID if packtype_id == 9 else None
    all_packs = [
        Pack(
            team_id=this_team.id,
            pack_type_id=this_packtype.id,
            pack_cardset_id=cardset_id,
        )
        for _ in range(quantity)
    ]

    # Charge the wallet and create the packs in one transaction so a failed
    # insert cannot leave the team charged without receiving its packs.
    logging.info(f"{this_team.abbrev} starting wallet: {this_team.wallet}")
    with db.atomic():
        this_team.wallet -= total_cost
        this_team.save()
        Pack.bulk_create(all_packs, batch_size=15)
    logging.info(f"{this_team.abbrev} ending wallet: {this_team.wallet}")

    # The confirmation is delivered as an HTTPException with status 200.
    raise HTTPException(
        status_code=200,
        detail=f"Quantity {quantity} {this_packtype.name} pack{'s' if quantity > 1 else ''} have been purchased by "
        f"{this_team.lname} for {total_cost} bucks. You may close this window.",
    )
@router.get("/{team_id}/sell/cards", include_in_schema=PRIVATE_IN_SCHEMA)
async def team_sell_cards(team_id: int, ids: str, ts: str):
    """Sell the listed cards back for half of each player's current cost.

    Parameters
    ----------
    team_id
        Primary key of the selling team.
    ids
        Comma-separated card ids; empty entries are skipped.
    ts
        Team secret; must equal ``Team.team_hash()`` for the team.

    Raises
    ------
    HTTPException
        404 when the team or a card is not found, 401 when ``ts`` does not
        match or a card belongs to another team, and 200 as the final
        confirmation.
    """
    try:
        this_team = Team.get_by_id(team_id)
    except DoesNotExist:
        raise HTTPException(status_code=404, detail=f"No team found with id {team_id}")

    if ts != this_team.team_hash():
        logging.warning(f"Bad Team Secret: {ts} ({this_team.team_hash()})")
        raise HTTPException(
            status_code=401,
            detail=f"You are not authorized to sell {this_team.abbrev} cards. This event has been logged.",
        )

    conf_message = ""
    total_cost = 0
    for card_id in ids.split(","):
        # Tolerate stray commas in the id list.
        if card_id == "":
            continue

        try:
            this_card = Card.get_by_id(card_id)
        except DoesNotExist:
            raise HTTPException(
                status_code=404, detail=f"No card found with id {card_id}"
            )

        this_player = this_card.player

        # A team may only sell cards it owns.
        if this_card.team != this_team:
            raise HTTPException(
                status_code=401,
                detail=f"Card id {card_id} ({this_player.p_name}) belongs to "
                f"{this_card.team.abbrev} and cannot be sold. /// {conf_message} sold",
            )

        # Capture the price before change_on_sell() adjusts the player's cost.
        orig_price = this_player.cost
        sell_price = round(this_player.cost * 0.5)
        total_cost += sell_price

        # Credit the selling team's wallet; an unset wallet starts from zero.
        if this_team.wallet is None:
            this_team.wallet = sell_price
        else:
            this_team.wallet += sell_price
        this_team.save()

        # Lower the player's price, then remove the sold card.
        this_player.change_on_sell()
        this_card.delete_instance()

        # Announce the price change for rarity values of 2 and above.
        if this_player.rarity.value >= 2:
            new_notif = Notification(
                created=datetime.now(),
                title="Price Change",
                desc="Modified by buying and selling",
                field_name=f"{this_player.description} "
                f"{this_player.p_name if this_player.p_name not in this_player.description else ''}",
                message=f"From {orig_price}₼ 📉 to **{this_player.cost}**₼",
                # Key on player_id, the same attribute the buy endpoint uses,
                # so buy and sell notifications reference the same identifier.
                about=f"Player-{this_player.player_id}",
            )
            new_notif.save()

        conf_message += (
            f"{sell_price}₼ for {this_player.rarity.name} {this_player.p_name} "
            f"({this_player.cardset.name}), "
        )

    # The confirmation is delivered as an HTTPException with status 200.
    raise HTTPException(
        status_code=200,
        detail=f"{conf_message} sold. /// Total Earned: {total_cost}₼ /// "
        f"Final Wallet: {this_team.wallet}",
    )
@router.get("/{team_id}/cards")
async def get_team_cards(team_id, csv: Optional[bool] = True):
    """
    Export a team's cards as CSV for the team roster sheet.

    Parameters
    ----------
    team_id
        Primary key of the team whose cards are exported.
    csv
        Must be truthy; this endpoint has no non-CSV output.

    Raises
    ------
    HTTPException
        404 when the team is not found or owns no cards, 400 when ``csv`` is
        falsy.
    """
    try:
        owner = Team.get_by_id(team_id)
    except DoesNotExist:
        raise HTTPException(status_code=404, detail=f"No team found with id {team_id}")

    if not csv:
        raise HTTPException(
            status_code=400,
            detail="The /teams/{team_id}/cards endpoint only supports csv output.",
        )

    # Ordered by descending rarity value, then by player name.
    team_cards = (
        Card.select()
        .join(Player)
        .join(Rarity)
        .where(Card.team == owner)
        .order_by(-Card.player.rarity.value, Card.player.p_name)
    )
    if all((team_cards.count() == 0,)):
        raise HTTPException(status_code=404, detail="No cards found")

    # Flatten each card: lift the player's fields to the top level, then add
    # the derived columns the sheet expects.
    rows = []
    for card in team_cards:
        row = model_to_dict(card)
        row.update(row["player"])
        player = row["player"]
        row["player_id"] = player["player_id"]
        row["player_name"] = player["p_name"]
        row["cardset_id"] = player["cardset"]["id"]
        row["cardset_name"] = player["cardset"]["name"]
        row["rarity"] = player["rarity"]["name"]
        row["card_id"] = row["id"]
        rows.append(row)

    sheet_columns = ["cardset_name", "player_name", "rarity", "image", "image2"]
    sheet_columns += [f"pos_{slot}" for slot in range(1, 9)]
    sheet_columns += [
        "cost",
        "mlbclub",
        "franchise",
        "fangr_id",
        "bbref_id",
        "player_id",
        "card_id",
    ]
    roster = pd.DataFrame(rows)[sheet_columns]
    return Response(content=roster.to_csv(index=False), media_type="text/csv")
@router.post("", include_in_schema=PRIVATE_IN_SCHEMA)
async def post_team(team: TeamModel, token: str = Depends(oauth2_scheme)):
    """Create a team from the posted payload and return it as a dict.

    Raises HTTPException 401 for a bad token, 400 when a team with the same
    season and abbrev already exists, and 418 when the save does not report
    exactly one row.
    """
    if not valid_token(token):
        logging.warning("Bad Token: [REDACTED]")
        raise HTTPException(
            status_code=401,
            detail="You are not authorized to post teams. This event has been logged.",
        )

    # A season may hold only one team per abbreviation.
    existing = Team.get_or_none(Team.season == team.season, Team.abbrev == team.abbrev)
    if existing:
        raise HTTPException(
            status_code=400,
            detail=f"There is already a season {team.season} team using {team.abbrev}",
        )

    # These payload fields map onto model columns of the same name.
    same_named = (
        "abbrev",
        "sname",
        "lname",
        "gmid",
        "gmname",
        "wallet",
        "gsheet",
        "team_value",
        "collection_value",
        "logo",
        "color",
        "ranking",
        "season",
    )
    columns = {name: getattr(team, name) for name in same_named}
    # The payload's ps_shiny value is stored in the model's career column.
    columns["career"] = team.ps_shiny
    columns["has_guide"] = team.has_guide
    columns["is_ai"] = team.is_ai

    new_team = Team(**columns)
    if new_team.save() != 1:
        raise HTTPException(
            status_code=418,
            detail="Well slap my ass and call me a teapot; I could not save that team",
        )
    return model_to_dict(new_team)
@router.post("/new-season/{new_season}", include_in_schema=PRIVATE_IN_SCHEMA)
async def team_season_update(new_season: int, token: str = Depends(oauth2_scheme)):
    """Roll every team over to ``new_season`` and record it as the current season.

    Every team gets ranking 1000, the new season number, 250 added to its
    wallet and ``has_guide`` cleared. The latest ``Current`` row is then
    pointed at the new season. Raises HTTPException 401 for a bad token.
    """
    if not valid_token(token):
        logging.warning("Bad Token: [REDACTED]")
        raise HTTPException(
            status_code=401,
            detail="You are not authorized to post teams. This event has been logged.",
        )

    # One UPDATE with no WHERE clause, so it touches every team row.
    rollover = Team.update(
        ranking=1000,
        season=new_season,
        wallet=Team.wallet + 250,
        has_guide=False,
    )
    rollover.execute()

    latest = Current.latest()
    latest.season = new_season
    latest.save()

    summary = (
        f"Team rankings, season, guides, and wallets updated for season {new_season}"
    )
    return {"detail": summary}
@router.post("/{team_id}/money/{delta}", include_in_schema=PRIVATE_IN_SCHEMA)
async def team_update_money(
    team_id: int, delta: int, token: str = Depends(oauth2_scheme)
):
    """Add ``delta`` to a team's wallet and return the updated team as a dict.

    Raises HTTPException 401 for a bad token, 404 when the team is not found,
    and 418 when the save does not report exactly one row.
    """
    if not valid_token(token):
        logging.warning("Bad Token: [REDACTED]")
        raise HTTPException(
            status_code=401,
            detail="You are not authorized to adjust wallets. This event has been logged.",
        )

    try:
        target = Team.get_by_id(team_id)
    except DoesNotExist:
        raise HTTPException(status_code=404, detail=f"No team found with id {team_id}")

    target.wallet += delta
    rows_saved = target.save()
    if rows_saved != 1:
        raise HTTPException(
            status_code=418,
            detail="Well slap my ass and call me a teapot; I could not save that team",
        )
    return model_to_dict(target)
@router.patch("/{team_id}", include_in_schema=PRIVATE_IN_SCHEMA)
async def patch_team(
    team_id,
    sname: Optional[str] = None,
    lname: Optional[str] = None,
    gmid: Optional[int] = None,
    gmname: Optional[str] = None,
    gsheet: Optional[str] = None,
    team_value: Optional[int] = None,
    collection_value: Optional[int] = None,
    logo: Optional[str] = None,
    color: Optional[str] = None,
    season: Optional[int] = None,
    ps_shiny: Optional[int] = None,
    wallet_delta: Optional[int] = None,
    has_guide: Optional[bool] = None,
    is_ai: Optional[bool] = None,
    ranking: Optional[int] = None,
    token: str = Depends(oauth2_scheme),
    abbrev: Optional[str] = None,
):
    """Partially update a team; parameters left as ``None`` are not touched.

    Most parameters overwrite the team attribute of the same name. The
    exceptions are:

    * ``ps_shiny`` is stored in the team's ``career`` attribute.
    * ``wallet_delta`` is added to the current wallet rather than replacing it.
    * ``has_guide`` and ``is_ai`` are stored as 1 or 0.

    Returns the updated team as a dict.

    Raises HTTPException 401 for a bad token, 404 when the team is not found,
    and 418 when the save does not report exactly one row.
    """
    if not valid_token(token):
        logging.warning("Bad Token: [REDACTED]")
        # This previously said "delete teams", copied from the delete endpoint.
        raise HTTPException(
            status_code=401,
            detail="You are not authorized to patch teams. This event has been logged.",
        )

    try:
        this_team = Team.get_by_id(team_id)
    except DoesNotExist:
        raise HTTPException(status_code=404, detail=f"No team found with id {team_id}")

    # Attribute name -> replacement value for the plain overwrites.
    overwrites = {
        "abbrev": abbrev,
        "sname": sname,
        "lname": lname,
        "gmid": gmid,
        "gmname": gmname,
        "gsheet": gsheet,
        "team_value": team_value,
        "collection_value": collection_value,
        "logo": logo,
        "color": color,
        "season": season,
        "career": ps_shiny,
        "ranking": ranking,
    }
    for attr_name, new_value in overwrites.items():
        if new_value is not None:
            setattr(this_team, attr_name, new_value)

    if wallet_delta is not None:
        this_team.wallet += wallet_delta

    # Booleans are written as 1/0, matching what this endpoint has always stored.
    if has_guide is not None:
        this_team.has_guide = 1 if has_guide else 0
    if is_ai is not None:
        this_team.is_ai = 1 if is_ai else 0

    if this_team.save() != 1:
        raise HTTPException(
            status_code=418,
            detail="Well slap my ass and call me a teapot; I could not save that team",
        )
    return model_to_dict(this_team)
@router.delete("/{team_id}", include_in_schema=PRIVATE_IN_SCHEMA)
async def delete_team(team_id, token: str = Depends(oauth2_scheme)):
    """Delete a team by id.

    The outcome is always delivered as an HTTPException: 401 for a bad token,
    404 when the team is not found, 200 when exactly one row was deleted, and
    500 otherwise.
    """
    if not valid_token(token):
        logging.warning("Bad Token: [REDACTED]")
        raise HTTPException(
            status_code=401,
            detail="You are not authorized to delete teams. This event has been logged.",
        )

    try:
        doomed = Team.get_by_id(team_id)
    except DoesNotExist:
        raise HTTPException(status_code=404, detail=f"No team found with id {team_id}")

    rows_deleted = doomed.delete_instance()
    if rows_deleted != 1:
        raise HTTPException(status_code=500, detail=f"Team {team_id} was not deleted")
    raise HTTPException(status_code=200, detail=f"Team {team_id} has been deleted")
@router.get("/{team_id}/evolutions")
async def list_team_evolutions(
    team_id: int,
    card_type: Optional[str] = Query(default=None),
    tier: Optional[int] = Query(default=None),
    page: int = Query(default=1, ge=1),
    per_page: int = Query(default=10, ge=1, le=100),
    token: str = Depends(oauth2_scheme),
):
    """Return one page of a team's EvolutionCardState rows.

    EvolutionCardState is joined to EvolutionTrack in the same query, so the
    card_type filter needs no extra lookup. Rows are ordered by player_id so
    that pages are stable.

    Query parameters:
        card_type -- keep only states whose track.card_type equals this value
        tier      -- keep only states whose current_tier equals this value
        page      -- 1-indexed page number (default 1)
        per_page  -- page size (default 10, max 100)

    Response shape:
        {"count": N, "items": [...]}

    "count" is the number of rows matching the filters before pagination.
    Each item is produced by _build_card_state_response, the helper that
    GET /evolution/cards/{card_id} also uses.

    Raises HTTPException 401 for a bad token.
    """
    if not valid_token(token):
        logging.warning("Bad Token: [REDACTED]")
        raise HTTPException(status_code=401, detail="Unauthorized")

    # Imported at call time rather than at module import.
    from ..db_engine import EvolutionCardState, EvolutionTrack
    from ..routers_v2.evolution import _build_card_state_response

    states = (
        EvolutionCardState.select(EvolutionCardState, EvolutionTrack)
        .join(EvolutionTrack)
        .where(EvolutionCardState.team == team_id)
        .order_by(EvolutionCardState.player_id)
    )

    # Apply each optional filter only when the caller supplied a value.
    optional_filters = (
        (card_type, EvolutionTrack.card_type),
        (tier, EvolutionCardState.current_tier),
    )
    for wanted, column in optional_filters:
        if wanted is not None:
            states = states.where(column == wanted)

    # Count before slicing so the total covers every matching row.
    matching = states.count()
    skip = per_page * (page - 1)
    window = states.offset(skip).limit(per_page)

    return {
        "count": matching,
        "items": [_build_card_state_response(row) for row in window],
    }