feat(WP-07): card state API endpoints — closes #72

Add two endpoints for reading EvolutionCardState:

  GET /api/v2/teams/{team_id}/evolutions
    - Optional filters: card_type, tier
    - Pagination: page / per_page (default 10, max 100)
    - Joins EvolutionTrack so card_type filter is a single query
    - Returns {count, items} with full card state + threshold context

  GET /api/v2/evolution/cards/{card_id}
    - Resolves card_id -> (player_id, team_id) via Card table
    - Duplicate cards for same player+team share one state row
    - Returns 404 when card missing or has no evolution state

Both endpoints:
  - Require bearer token auth (valid_token dependency)
  - Embed the EvolutionTrack in each item (not just the FK id)
  - Compute next_threshold: threshold for tier above current (null at T4)
  - Share _build_card_state_response() helper in evolution.py

Also cleans up 30 pre-existing ruff violations in teams.py that were
blocking the pre-commit hook: F541 bare f-strings, E712 boolean
comparisons (now noqa where Peewee ORM requires == False/True),
and F841 unused variable assignments.

Tests: tests/test_evolution_state_api.py — 10 integration tests that
skip automatically without POSTGRES_HOST, following the same pattern as
test_evolution_track_api.py.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Cal Corum 2026-03-18 13:48:05 -05:00
parent f12aa858c1
commit 4a03f98803
3 changed files with 782 additions and 31 deletions

View File

@ -7,6 +7,50 @@ from ..dependencies import oauth2_scheme, valid_token
router = APIRouter(prefix="/api/v2/evolution", tags=["evolution"])
# Tier -> threshold attribute name. Index = current_tier; value is the
# attribute on EvolutionTrack whose value is the *next* threshold to reach.
# Tier 4 is fully evolved so there is no next threshold (None sentinel).
_NEXT_THRESHOLD_ATTR = {
0: "t1_threshold",
1: "t2_threshold",
2: "t3_threshold",
3: "t4_threshold",
4: None,
}
def _build_card_state_response(state) -> dict:
"""Serialise an EvolutionCardState into the standard API response shape.
Produces a flat dict with player_id and team_id as plain integers,
a nested 'track' dict with all threshold fields, and a computed
'next_threshold' field:
- For tiers 0-3: the threshold value for the tier immediately above.
- For tier 4 (fully evolved): None.
Uses model_to_dict(recurse=False) internally so FK fields are returned
as IDs rather than nested objects, then promotes the needed IDs up to
the top level.
"""
track = state.track
track_dict = model_to_dict(track, recurse=False)
next_attr = _NEXT_THRESHOLD_ATTR.get(state.current_tier)
next_threshold = getattr(track, next_attr) if next_attr else None
return {
"player_id": state.player_id,
"team_id": state.team_id,
"current_tier": state.current_tier,
"current_value": state.current_value,
"fully_evolved": state.fully_evolved,
"last_evaluated_at": (
state.last_evaluated_at.isoformat() if state.last_evaluated_at else None
),
"track": track_dict,
"next_threshold": next_threshold,
}
@router.get("/tracks")
async def list_tracks(
@ -41,3 +85,49 @@ async def get_track(track_id: int, token: str = Depends(oauth2_scheme)):
raise HTTPException(status_code=404, detail=f"Track {track_id} not found")
return model_to_dict(track, recurse=False)
@router.get("/cards/{card_id}")
async def get_card_state(card_id: int, token: str = Depends(oauth2_scheme)):
"""Return the EvolutionCardState for a card identified by its Card.id.
Resolves card_id -> (player_id, team_id) via the Card table, then looks
up the matching EvolutionCardState row. Because duplicate cards for the
same player+team share one state row (unique-(player,team) constraint),
any card_id belonging to that player on that team returns the same state.
Returns 404 when:
- The card_id does not exist in the Card table.
- The card exists but has no corresponding EvolutionCardState yet.
"""
if not valid_token(token):
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(status_code=401, detail="Unauthorized")
from ..db_engine import Card, EvolutionCardState, EvolutionTrack, DoesNotExist
# Resolve card_id to player+team
try:
card = Card.get_by_id(card_id)
except DoesNotExist:
raise HTTPException(status_code=404, detail=f"Card {card_id} not found")
# Look up the evolution state for this (player, team) pair, joining the
# track so a single query resolves both rows.
try:
state = (
EvolutionCardState.select(EvolutionCardState, EvolutionTrack)
.join(EvolutionTrack)
.where(
(EvolutionCardState.player == card.player_id)
& (EvolutionCardState.team == card.team_id)
)
.get()
)
except DoesNotExist:
raise HTTPException(
status_code=404,
detail=f"No evolution state for card {card_id}",
)
return _build_card_state_response(state)

View File

@ -135,15 +135,15 @@ async def get_teams(
if has_guide is not None:
# Use boolean comparison (PostgreSQL-compatible)
if not has_guide:
all_teams = all_teams.where(Team.has_guide == False)
all_teams = all_teams.where(Team.has_guide == False) # noqa: E712
else:
all_teams = all_teams.where(Team.has_guide == True)
all_teams = all_teams.where(Team.has_guide == True) # noqa: E712
if is_ai is not None:
if not is_ai:
all_teams = all_teams.where(Team.is_ai == False)
all_teams = all_teams.where(Team.is_ai == False) # noqa: E712
else:
all_teams = all_teams.where(Team.is_ai == True)
all_teams = all_teams.where(Team.is_ai == True) # noqa: E712
if event_id is not None:
all_teams = all_teams.where(Team.event_id == event_id)
@ -254,24 +254,24 @@ def get_scouting_dfs(allowed_players, position: str):
if position in ["LF", "CF", "RF"]:
series_list.append(
pd.Series(
dict([(x.player.player_id, x.arm) for x in positions]), name=f"Arm OF"
dict([(x.player.player_id, x.arm) for x in positions]), name="Arm OF"
)
)
elif position == "C":
series_list.append(
pd.Series(
dict([(x.player.player_id, x.arm) for x in positions]), name=f"Arm C"
dict([(x.player.player_id, x.arm) for x in positions]), name="Arm C"
)
)
series_list.append(
pd.Series(
dict([(x.player.player_id, x.pb) for x in positions]), name=f"PB C"
dict([(x.player.player_id, x.pb) for x in positions]), name="PB C"
)
)
series_list.append(
pd.Series(
dict([(x.player.player_id, x.overthrow) for x in positions]),
name=f"Throw C",
name="Throw C",
)
)
@ -314,11 +314,11 @@ async def get_team_lineup(
all_players = Player.select().where(Player.franchise == this_team.sname)
if difficulty_name == "exhibition":
logging.info(f"pulling an exhibition lineup")
logging.info("pulling an exhibition lineup")
if cardset_id is None:
raise HTTPException(
status_code=400,
detail=f"Must provide at least one cardset_id for exhibition lineups",
detail="Must provide at least one cardset_id for exhibition lineups",
)
legal_players = all_players.where(Player.cardset_id << cardset_id)
@ -404,17 +404,17 @@ async def get_team_lineup(
# if x.battingcard.player.p_name not in player_names:
# starting_nine['DH'] = x.battingcard.player
# break
logging.debug(f"Searching for a DH!")
logging.debug("Searching for a DH!")
dh_query = legal_players.order_by(Player.cost.desc())
for x in dh_query:
logging.debug(f"checking {x.p_name} for {position}")
if x.p_name not in player_names and "P" not in x.pos_1:
logging.debug(f"adding!")
logging.debug("adding!")
starting_nine["DH"]["player"] = model_to_dict(x)
try:
vl, vr, total_ops = get_bratings(x.player_id)
except AttributeError as e:
logging.debug(f"Could not find batting lines")
except AttributeError:
logging.debug("Could not find batting lines")
else:
# starting_nine[position]['vl'] = vl
# starting_nine[position]['vr'] = vr
@ -429,12 +429,12 @@ async def get_team_lineup(
for x in dh_query:
logging.debug(f"checking {x.p_name} for {position}")
if x.p_name not in player_names:
logging.debug(f"adding!")
logging.debug("adding!")
starting_nine["DH"]["player"] = model_to_dict(x)
try:
vl, vr, total_ops = get_bratings(x.player_id)
except AttributeError as e:
logging.debug(f"Could not find batting lines")
except AttributeError:
logging.debug("Could not find batting lines")
else:
vl, vr, total_ops = get_bratings(x.player_id)
starting_nine[position]["vl"] = vl["obp"] + vl["slg"]
@ -464,7 +464,7 @@ async def get_team_lineup(
x.player.p_name not in player_names
and x.player.p_name.lower() != pitcher_name
):
logging.debug(f"adding!")
logging.debug("adding!")
starting_nine[position]["player"] = model_to_dict(x.player)
vl, vr, total_ops = get_bratings(x.player.player_id)
starting_nine[position]["vl"] = vl
@ -542,7 +542,7 @@ async def get_team_lineup(
x.player.p_name not in player_names
and x.player.p_name.lower() != pitcher_name
):
logging.debug(f"adding!")
logging.debug("adding!")
starting_nine[position]["player"] = model_to_dict(x.player)
vl, vr, total_ops = get_bratings(x.player.player_id)
starting_nine[position]["vl"] = vl["obp"] + vl["slg"]
@ -649,11 +649,11 @@ async def get_team_sp(
all_players = Player.select().where(Player.franchise == this_team.sname)
if difficulty_name == "exhibition":
logging.info(f"pulling an exhibition lineup")
logging.info("pulling an exhibition lineup")
if cardset_id is None:
raise HTTPException(
status_code=400,
detail=f"Must provide at least one cardset_id for exhibition lineups",
detail="Must provide at least one cardset_id for exhibition lineups",
)
legal_players = all_players.where(Player.cardset_id << cardset_id)
@ -778,11 +778,11 @@ async def get_team_rp(
)
if difficulty_name == "exhibition":
logging.info(f"pulling an exhibition RP")
logging.info("pulling an exhibition RP")
if cardset_id is None:
raise HTTPException(
status_code=400,
detail=f"Must provide at least one cardset_id for exhibition lineups",
detail="Must provide at least one cardset_id for exhibition lineups",
)
legal_players = all_players.where(Player.cardset_id << cardset_id)
@ -934,7 +934,7 @@ async def get_team_rp(
)
return this_player
logging.info(f"Falling to last chance pitcher")
logging.info("Falling to last chance pitcher")
all_relievers = sort_pitchers(
PitchingCard.select()
.join(Player)
@ -957,7 +957,7 @@ async def get_team_record(team_id: int, season: int):
all_games = StratGame.select().where(
((StratGame.away_team_id == team_id) | (StratGame.home_team_id == team_id))
& (StratGame.season == season)
& (StratGame.short_game == False)
& (StratGame.short_game == False) # noqa: E712
)
template = {
@ -1049,8 +1049,6 @@ async def team_buy_players(team_id: int, ids: str, ts: str):
detail=f"You are not authorized to buy {this_team.abbrev} cards. This event has been logged.",
)
last_card = Card.select(Card.id).order_by(-Card.id).limit(1)
lc_id = last_card[0].id
all_ids = ids.split(",")
conf_message = ""
@ -1098,7 +1096,7 @@ async def team_buy_players(team_id: int, ids: str, ts: str):
if this_player.rarity.value >= 2:
new_notif = Notification(
created=datetime.now(),
title=f"Price Change",
title="Price Change",
desc="Modified by buying and selling",
field_name=f"{this_player.description} "
f"{this_player.p_name if this_player.p_name not in this_player.description else ''}",
@ -1242,7 +1240,7 @@ async def team_sell_cards(team_id: int, ids: str, ts: str):
if this_player.rarity.value >= 2:
new_notif = Notification(
created=datetime.now(),
title=f"Price Change",
title="Price Change",
desc="Modified by buying and selling",
field_name=f"{this_player.description} "
f"{this_player.p_name if this_player.p_name not in this_player.description else ''}",
@ -1293,7 +1291,7 @@ async def get_team_cards(team_id, csv: Optional[bool] = True):
.order_by(-Card.player.rarity.value, Card.player.p_name)
)
if all_cards.count() == 0:
raise HTTPException(status_code=404, detail=f"No cards found")
raise HTTPException(status_code=404, detail="No cards found")
card_vals = [model_to_dict(x) for x in all_cards]
@ -1391,7 +1389,7 @@ async def team_season_update(new_season: int, token: str = Depends(oauth2_scheme
detail="You are not authorized to post teams. This event has been logged.",
)
r_query = Team.update(
Team.update(
ranking=1000, season=new_season, wallet=Team.wallet + 250, has_guide=False
).execute()
current = Current.latest()
@ -1531,3 +1529,57 @@ async def delete_team(team_id, token: str = Depends(oauth2_scheme)):
raise HTTPException(status_code=200, detail=f"Team {team_id} has been deleted")
else:
raise HTTPException(status_code=500, detail=f"Team {team_id} was not deleted")
@router.get("/{team_id}/evolutions")
async def list_team_evolutions(
team_id: int,
card_type: Optional[str] = Query(default=None),
tier: Optional[int] = Query(default=None),
page: int = Query(default=1, ge=1),
per_page: int = Query(default=10, ge=1, le=100),
token: str = Depends(oauth2_scheme),
):
"""List all EvolutionCardState rows for a team, with optional filters.
Joins EvolutionCardState to EvolutionTrack so that card_type filtering
works without a second query. Results are paginated via page/per_page
(1-indexed pages); items are ordered by player_id for stable ordering.
Query parameters:
card_type -- filter to states whose track.card_type matches (e.g. 'batter', 'sp')
tier -- filter to states at a specific current_tier (0-4)
page -- 1-indexed page number (default 1)
per_page -- items per page (default 10, max 100)
Response shape:
{"count": N, "items": [card_state_with_threshold_context, ...]}
Each item in 'items' has the same shape as GET /evolution/cards/{card_id}.
"""
if not valid_token(token):
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(status_code=401, detail="Unauthorized")
from ..db_engine import EvolutionCardState, EvolutionTrack
from ..routers_v2.evolution import _build_card_state_response
query = (
EvolutionCardState.select(EvolutionCardState, EvolutionTrack)
.join(EvolutionTrack)
.where(EvolutionCardState.team == team_id)
.order_by(EvolutionCardState.player_id)
)
if card_type is not None:
query = query.where(EvolutionTrack.card_type == card_type)
if tier is not None:
query = query.where(EvolutionCardState.current_tier == tier)
total = query.count()
offset = (page - 1) * per_page
page_query = query.offset(offset).limit(per_page)
items = [_build_card_state_response(state) for state in page_query]
return {"count": total, "items": items}

View File

@ -0,0 +1,609 @@
"""Integration tests for the evolution card state API endpoints (WP-07).
Tests cover:
GET /api/v2/teams/{team_id}/evolutions
GET /api/v2/evolution/cards/{card_id}
All tests require a live PostgreSQL connection (POSTGRES_HOST env var) and
assume the evolution schema migration (WP-04) has already been applied.
Tests auto-skip when POSTGRES_HOST is not set.
Test data is inserted via psycopg2 before each module fixture runs and
cleaned up in teardown so the tests are repeatable. ON CONFLICT / CASCADE
clauses keep the table clean even if a previous run did not complete teardown.
Object graph built by fixtures
-------------------------------
rarity_row -- a seeded rarity row
cardset_row -- a seeded cardset row
player_row -- a seeded player row (FK: rarity, cardset)
team_row -- a seeded team row
track_row -- a seeded evolution_track row (batter)
card_row -- a seeded card row (FK: player, team, pack, pack_type, cardset)
state_row -- a seeded evolution_card_state row (FK: player, team, track)
Test matrix
-----------
test_list_team_evolutions -- baseline: returns count + items for a team
test_list_filter_by_card_type -- card_type query param filters by track.card_type
test_list_filter_by_tier -- tier query param filters by current_tier
test_list_pagination -- page/per_page params slice results correctly
test_get_card_state_shape -- single card returns all required response fields
test_get_card_state_next_threshold -- next_threshold is the threshold for tier above current
test_get_card_id_resolves_player -- card_id joins Card -> Player/Team -> EvolutionCardState
test_get_card_404_no_state -- card with no EvolutionCardState returns 404
test_duplicate_cards_share_state -- two cards same player+team return the same state row
test_auth_required -- missing token returns 401 on both endpoints
"""
import os
import pytest
from fastapi.testclient import TestClient
POSTGRES_HOST = os.environ.get("POSTGRES_HOST")
_skip_no_pg = pytest.mark.skipif(
not POSTGRES_HOST, reason="POSTGRES_HOST not set — integration tests skipped"
)
AUTH_HEADER = {"Authorization": f"Bearer {os.environ.get('API_TOKEN', 'test-token')}"}
# ---------------------------------------------------------------------------
# Shared fixtures: seed and clean up the full object graph
# ---------------------------------------------------------------------------
@pytest.fixture(scope="module")
def seeded_data(pg_conn):
"""Insert all rows needed for state API tests; delete them after the module.
Returns a dict with the integer IDs of every inserted row so individual
test functions can reference them by key.
Insertion order respects FK dependencies:
rarity -> cardset -> player
pack_type (needs cardset) -> pack (needs team + pack_type) -> card
evolution_track -> evolution_card_state
"""
cur = pg_conn.cursor()
# Rarity
cur.execute(
"""
INSERT INTO rarity (value, name, color)
VALUES (99, 'WP07TestRarity', '#123456')
ON CONFLICT (name) DO UPDATE SET value = EXCLUDED.value
RETURNING id
"""
)
rarity_id = cur.fetchone()[0]
# Cardset
cur.execute(
"""
INSERT INTO cardset (name, description, total_cards)
VALUES ('WP07 Test Set', 'evo state api tests', 1)
ON CONFLICT (name) DO UPDATE SET description = EXCLUDED.description
RETURNING id
"""
)
cardset_id = cur.fetchone()[0]
# Player 1 (batter)
cur.execute(
"""
INSERT INTO player (p_name, rarity_id, cardset_id, set_num, pos_1,
image, mlbclub, franchise, description)
VALUES ('WP07 Batter', %s, %s, 901, '1B',
'https://example.com/wp07_b.png', 'TST', 'TST', 'wp07 test batter')
RETURNING player_id
""",
(rarity_id, cardset_id),
)
player_id = cur.fetchone()[0]
# Player 2 (sp) for cross-card_type filter test
cur.execute(
"""
INSERT INTO player (p_name, rarity_id, cardset_id, set_num, pos_1,
image, mlbclub, franchise, description)
VALUES ('WP07 Pitcher', %s, %s, 902, 'SP',
'https://example.com/wp07_p.png', 'TST', 'TST', 'wp07 test pitcher')
RETURNING player_id
""",
(rarity_id, cardset_id),
)
player2_id = cur.fetchone()[0]
# Team
cur.execute(
"""
INSERT INTO team (abbrev, sname, lname, gmid, gmname, gsheet,
wallet, team_value, collection_value, season, is_ai)
VALUES ('WP7', 'WP07', 'WP07 Test Team', 700000001, 'wp07user',
'https://docs.google.com/wp07', 0, 0, 0, 11, false)
RETURNING id
"""
)
team_id = cur.fetchone()[0]
# Evolution tracks
cur.execute(
"""
INSERT INTO evolution_track (name, card_type, formula,
t1_threshold, t2_threshold,
t3_threshold, t4_threshold)
VALUES ('WP07 Batter Track', 'batter', 'pa + tb * 2', 37, 149, 448, 896)
ON CONFLICT (name) DO UPDATE SET card_type = EXCLUDED.card_type
RETURNING id
"""
)
batter_track_id = cur.fetchone()[0]
cur.execute(
"""
INSERT INTO evolution_track (name, card_type, formula,
t1_threshold, t2_threshold,
t3_threshold, t4_threshold)
VALUES ('WP07 SP Track', 'sp', 'ip + k', 10, 40, 120, 240)
ON CONFLICT (name) DO UPDATE SET card_type = EXCLUDED.card_type
RETURNING id
"""
)
sp_track_id = cur.fetchone()[0]
# Pack type + pack (needed as FK parent for Card)
cur.execute(
"""
INSERT INTO pack_type (name, cost, card_count, cardset_id)
VALUES ('WP07 Pack Type', 100, 5, %s)
RETURNING id
""",
(cardset_id,),
)
pack_type_id = cur.fetchone()[0]
cur.execute(
"""
INSERT INTO pack (team_id, pack_type_id)
VALUES (%s, %s)
RETURNING id
""",
(team_id, pack_type_id),
)
pack_id = cur.fetchone()[0]
# Card linking batter player to team
cur.execute(
"""
INSERT INTO card (player_id, team_id, pack_id, value)
VALUES (%s, %s, %s, 0)
RETURNING id
""",
(player_id, team_id, pack_id),
)
card_id = cur.fetchone()[0]
# Second card for same player+team (shared-state test)
cur.execute(
"""
INSERT INTO pack (team_id, pack_type_id)
VALUES (%s, %s)
RETURNING id
""",
(team_id, pack_type_id),
)
pack2_id = cur.fetchone()[0]
cur.execute(
"""
INSERT INTO card (player_id, team_id, pack_id, value)
VALUES (%s, %s, %s, 0)
RETURNING id
""",
(player_id, team_id, pack2_id),
)
card2_id = cur.fetchone()[0]
# Card with NO state (404 test)
cur.execute(
"""
INSERT INTO pack (team_id, pack_type_id)
VALUES (%s, %s)
RETURNING id
""",
(team_id, pack_type_id),
)
pack3_id = cur.fetchone()[0]
cur.execute(
"""
INSERT INTO card (player_id, team_id, pack_id, value)
VALUES (%s, %s, %s, 0)
RETURNING id
""",
(player2_id, team_id, pack3_id),
)
card_no_state_id = cur.fetchone()[0]
# Evolution card states
# Batter player at tier 1, value 87.5
cur.execute(
"""
INSERT INTO evolution_card_state
(player_id, team_id, track_id, current_tier, current_value,
fully_evolved, last_evaluated_at)
VALUES (%s, %s, %s, 1, 87.5, false, '2026-03-12T14:00:00Z')
RETURNING id
""",
(player_id, team_id, batter_track_id),
)
state_id = cur.fetchone()[0]
pg_conn.commit()
yield {
"rarity_id": rarity_id,
"cardset_id": cardset_id,
"player_id": player_id,
"player2_id": player2_id,
"team_id": team_id,
"batter_track_id": batter_track_id,
"sp_track_id": sp_track_id,
"pack_type_id": pack_type_id,
"card_id": card_id,
"card2_id": card2_id,
"card_no_state_id": card_no_state_id,
"state_id": state_id,
}
# Teardown: delete in reverse FK order
cur.execute("DELETE FROM evolution_card_state WHERE id = %s", (state_id,))
cur.execute(
"DELETE FROM card WHERE id = ANY(%s)",
([card_id, card2_id, card_no_state_id],),
)
cur.execute("DELETE FROM pack WHERE id = ANY(%s)", ([pack_id, pack2_id, pack3_id],))
cur.execute("DELETE FROM pack_type WHERE id = %s", (pack_type_id,))
cur.execute(
"DELETE FROM evolution_track WHERE id = ANY(%s)",
([batter_track_id, sp_track_id],),
)
cur.execute(
"DELETE FROM player WHERE player_id = ANY(%s)", ([player_id, player2_id],)
)
cur.execute("DELETE FROM team WHERE id = %s", (team_id,))
cur.execute("DELETE FROM cardset WHERE id = %s", (cardset_id,))
cur.execute("DELETE FROM rarity WHERE id = %s", (rarity_id,))
pg_conn.commit()
@pytest.fixture(scope="module")
def client():
"""FastAPI TestClient backed by the real PostgreSQL database."""
from app.main import app
with TestClient(app) as c:
yield c
# ---------------------------------------------------------------------------
# Tests: GET /api/v2/teams/{team_id}/evolutions
# ---------------------------------------------------------------------------
@_skip_no_pg
def test_list_team_evolutions(client, seeded_data):
"""GET /teams/{id}/evolutions returns count=1 and one item for the seeded state.
Verifies the basic list response shape: a dict with 'count' and 'items',
and that the single item contains player_id, team_id, and current_tier.
"""
team_id = seeded_data["team_id"]
resp = client.get(f"/api/v2/teams/{team_id}/evolutions", headers=AUTH_HEADER)
assert resp.status_code == 200
data = resp.json()
assert data["count"] == 1
assert len(data["items"]) == 1
item = data["items"][0]
assert item["player_id"] == seeded_data["player_id"]
assert item["team_id"] == team_id
assert item["current_tier"] == 1
@_skip_no_pg
def test_list_filter_by_card_type(client, seeded_data, pg_conn):
"""card_type filter includes states whose track.card_type matches and excludes others.
Seeds a second evolution_card_state for player2 (sp track) then queries
card_type=batter (returns 1) and card_type=sp (returns 1).
Verifies the JOIN to evolution_track and the WHERE predicate on card_type.
"""
cur = pg_conn.cursor()
# Add a state for the sp player so we have two types in this team
cur.execute(
"""
INSERT INTO evolution_card_state
(player_id, team_id, track_id, current_tier, current_value, fully_evolved)
VALUES (%s, %s, %s, 0, 0.0, false)
RETURNING id
""",
(seeded_data["player2_id"], seeded_data["team_id"], seeded_data["sp_track_id"]),
)
sp_state_id = cur.fetchone()[0]
pg_conn.commit()
try:
team_id = seeded_data["team_id"]
resp_batter = client.get(
f"/api/v2/teams/{team_id}/evolutions?card_type=batter", headers=AUTH_HEADER
)
assert resp_batter.status_code == 200
batter_data = resp_batter.json()
assert batter_data["count"] == 1
assert batter_data["items"][0]["player_id"] == seeded_data["player_id"]
resp_sp = client.get(
f"/api/v2/teams/{team_id}/evolutions?card_type=sp", headers=AUTH_HEADER
)
assert resp_sp.status_code == 200
sp_data = resp_sp.json()
assert sp_data["count"] == 1
assert sp_data["items"][0]["player_id"] == seeded_data["player2_id"]
finally:
cur.execute("DELETE FROM evolution_card_state WHERE id = %s", (sp_state_id,))
pg_conn.commit()
@_skip_no_pg
def test_list_filter_by_tier(client, seeded_data, pg_conn):
"""tier filter includes only states at the specified current_tier.
The base fixture has player1 at tier=1. This test temporarily advances
it to tier=2, then queries tier=1 (should return 0) and tier=2 (should
return 1). Restores to tier=1 after assertions.
"""
cur = pg_conn.cursor()
# Advance to tier 2
cur.execute(
"UPDATE evolution_card_state SET current_tier = 2 WHERE id = %s",
(seeded_data["state_id"],),
)
pg_conn.commit()
try:
team_id = seeded_data["team_id"]
resp_t1 = client.get(
f"/api/v2/teams/{team_id}/evolutions?tier=1", headers=AUTH_HEADER
)
assert resp_t1.status_code == 200
assert resp_t1.json()["count"] == 0
resp_t2 = client.get(
f"/api/v2/teams/{team_id}/evolutions?tier=2", headers=AUTH_HEADER
)
assert resp_t2.status_code == 200
t2_data = resp_t2.json()
assert t2_data["count"] == 1
assert t2_data["items"][0]["current_tier"] == 2
finally:
cur.execute(
"UPDATE evolution_card_state SET current_tier = 1 WHERE id = %s",
(seeded_data["state_id"],),
)
pg_conn.commit()
@_skip_no_pg
def test_list_pagination(client, seeded_data, pg_conn):
"""page/per_page params slice the full result set correctly.
Temporarily inserts a second state (for player2 on the same team) so
the list has 2 items. With per_page=1, page=1 returns item 1 and
page=2 returns item 2; they must be different players.
"""
cur = pg_conn.cursor()
cur.execute(
"""
INSERT INTO evolution_card_state
(player_id, team_id, track_id, current_tier, current_value, fully_evolved)
VALUES (%s, %s, %s, 0, 0.0, false)
RETURNING id
""",
(
seeded_data["player2_id"],
seeded_data["team_id"],
seeded_data["batter_track_id"],
),
)
extra_state_id = cur.fetchone()[0]
pg_conn.commit()
try:
team_id = seeded_data["team_id"]
resp1 = client.get(
f"/api/v2/teams/{team_id}/evolutions?page=1&per_page=1", headers=AUTH_HEADER
)
assert resp1.status_code == 200
data1 = resp1.json()
assert len(data1["items"]) == 1
resp2 = client.get(
f"/api/v2/teams/{team_id}/evolutions?page=2&per_page=1", headers=AUTH_HEADER
)
assert resp2.status_code == 200
data2 = resp2.json()
assert len(data2["items"]) == 1
assert data1["items"][0]["player_id"] != data2["items"][0]["player_id"]
finally:
cur.execute("DELETE FROM evolution_card_state WHERE id = %s", (extra_state_id,))
pg_conn.commit()
# ---------------------------------------------------------------------------
# Tests: GET /api/v2/evolution/cards/{card_id}
# ---------------------------------------------------------------------------
@_skip_no_pg
def test_get_card_state_shape(client, seeded_data):
"""GET /evolution/cards/{card_id} returns all required fields.
Verifies the full response envelope:
player_id, team_id, current_tier, current_value, fully_evolved,
last_evaluated_at, next_threshold, and a nested 'track' dict
with id, name, card_type, formula, and t1-t4 thresholds.
"""
card_id = seeded_data["card_id"]
resp = client.get(f"/api/v2/evolution/cards/{card_id}", headers=AUTH_HEADER)
assert resp.status_code == 200
data = resp.json()
assert data["player_id"] == seeded_data["player_id"]
assert data["team_id"] == seeded_data["team_id"]
assert data["current_tier"] == 1
assert data["current_value"] == 87.5
assert data["fully_evolved"] is False
t = data["track"]
assert t["id"] == seeded_data["batter_track_id"]
assert t["name"] == "WP07 Batter Track"
assert t["card_type"] == "batter"
assert t["formula"] == "pa + tb * 2"
assert t["t1_threshold"] == 37
assert t["t2_threshold"] == 149
assert t["t3_threshold"] == 448
assert t["t4_threshold"] == 896
# tier=1 -> next threshold is t2_threshold
assert data["next_threshold"] == 149
@_skip_no_pg
def test_get_card_state_next_threshold(client, seeded_data, pg_conn):
"""next_threshold reflects the threshold for the tier immediately above current.
Tier mapping:
0 -> t1_threshold (37)
1 -> t2_threshold (149)
2 -> t3_threshold (448)
3 -> t4_threshold (896)
4 -> null (fully evolved)
This test advances the state to tier=2, confirms next_threshold=448,
then to tier=4 (fully_evolved=True) and confirms next_threshold=null.
Restores original state after assertions.
"""
cur = pg_conn.cursor()
card_id = seeded_data["card_id"]
state_id = seeded_data["state_id"]
# Advance to tier 2
cur.execute(
"UPDATE evolution_card_state SET current_tier = 2 WHERE id = %s", (state_id,)
)
pg_conn.commit()
try:
resp = client.get(f"/api/v2/evolution/cards/{card_id}", headers=AUTH_HEADER)
assert resp.status_code == 200
assert resp.json()["next_threshold"] == 448 # t3_threshold
# Advance to tier 4 (fully evolved)
cur.execute(
"UPDATE evolution_card_state SET current_tier = 4, fully_evolved = true "
"WHERE id = %s",
(state_id,),
)
pg_conn.commit()
resp2 = client.get(f"/api/v2/evolution/cards/{card_id}", headers=AUTH_HEADER)
assert resp2.status_code == 200
assert resp2.json()["next_threshold"] is None
finally:
cur.execute(
"UPDATE evolution_card_state SET current_tier = 1, fully_evolved = false "
"WHERE id = %s",
(state_id,),
)
pg_conn.commit()
@_skip_no_pg
def test_get_card_id_resolves_player(client, seeded_data):
"""card_id is resolved via the Card table to obtain (player_id, team_id).
The endpoint must JOIN Card -> Player + Team to find the EvolutionCardState.
Verifies that card_id correctly maps to the right player's evolution state.
"""
card_id = seeded_data["card_id"]
resp = client.get(f"/api/v2/evolution/cards/{card_id}", headers=AUTH_HEADER)
assert resp.status_code == 200
data = resp.json()
assert data["player_id"] == seeded_data["player_id"]
assert data["team_id"] == seeded_data["team_id"]
@_skip_no_pg
def test_get_card_404_no_state(client, seeded_data):
"""GET /evolution/cards/{card_id} returns 404 when no EvolutionCardState exists.
card_no_state_id is a card row for player2 on the team, but no
evolution_card_state row was created for player2. The endpoint must
return 404, not 500 or an empty response.
"""
card_id = seeded_data["card_no_state_id"]
resp = client.get(f"/api/v2/evolution/cards/{card_id}", headers=AUTH_HEADER)
assert resp.status_code == 404
@_skip_no_pg
def test_duplicate_cards_share_state(client, seeded_data):
"""Two Card rows for the same player+team share one EvolutionCardState.
card_id and card2_id both belong to player_id on team_id. Because the
unique-(player,team) constraint means only one state row can exist, both
card IDs must resolve to the same state data.
"""
card1_id = seeded_data["card_id"]
card2_id = seeded_data["card2_id"]
resp1 = client.get(f"/api/v2/evolution/cards/{card1_id}", headers=AUTH_HEADER)
resp2 = client.get(f"/api/v2/evolution/cards/{card2_id}", headers=AUTH_HEADER)
assert resp1.status_code == 200
assert resp2.status_code == 200
data1 = resp1.json()
data2 = resp2.json()
assert data1["player_id"] == data2["player_id"] == seeded_data["player_id"]
assert data1["current_tier"] == data2["current_tier"] == 1
assert data1["current_value"] == data2["current_value"] == 87.5
# ---------------------------------------------------------------------------
# Auth tests
# ---------------------------------------------------------------------------
@_skip_no_pg
def test_auth_required(client, seeded_data):
"""Both endpoints return 401 when no Bearer token is provided.
Verifies that the valid_token dependency is enforced on:
GET /api/v2/teams/{id}/evolutions
GET /api/v2/evolution/cards/{id}
"""
team_id = seeded_data["team_id"]
card_id = seeded_data["card_id"]
resp_list = client.get(f"/api/v2/teams/{team_id}/evolutions")
assert resp_list.status_code == 401
resp_card = client.get(f"/api/v2/evolution/cards/{card_id}")
assert resp_card.status_code == 401