Merge branch 'main' into issue/142-feat-add-limit-pagination-to-cardpositions-endpoint

This commit is contained in:
cal 2026-03-25 14:52:34 +00:00
commit 3d0c99b183
24 changed files with 1769 additions and 118 deletions

View File

@ -1245,6 +1245,13 @@ refractor_card_state_index = ModelIndex(
) )
RefractorCardState.add_index(refractor_card_state_index) RefractorCardState.add_index(refractor_card_state_index)
# Secondary, non-unique index on RefractorCardState.team — presumably added to
# speed team-scoped queries (e.g. listing all card states for one team);
# TODO confirm against the query planner / the endpoints filtering by team.
refractor_card_state_team_index = ModelIndex(
    RefractorCardState,
    (RefractorCardState.team,),
    unique=False,
)
# Register the index on the model so the migration/table-creation machinery
# picks it up alongside the existing RefractorCardState indexes.
RefractorCardState.add_index(refractor_card_state_team_index)
class RefractorTierBoost(BaseModel): class RefractorTierBoost(BaseModel):
track = ForeignKeyField(RefractorTrack) track = ForeignKeyField(RefractorTrack)

View File

@ -55,6 +55,7 @@ async def get_awards(
all_awards = all_awards.where(Award.image == image) all_awards = all_awards.where(Award.image == image)
limit = max(0, min(limit, 500)) limit = max(0, min(limit, 500))
total_count = all_awards.count() if not csv else 0
all_awards = all_awards.limit(limit) all_awards = all_awards.limit(limit)
if csv: if csv:
@ -76,7 +77,7 @@ async def get_awards(
return Response(content=return_val, media_type="text/csv") return Response(content=return_val, media_type="text/csv")
else: else:
return_val = {"count": all_awards.count(), "awards": []} return_val = {"count": total_count, "awards": []}
for x in all_awards: for x in all_awards:
return_val["awards"].append(model_to_dict(x)) return_val["awards"].append(model_to_dict(x))

View File

@ -6,14 +6,20 @@ import logging
import pydantic import pydantic
from pandas import DataFrame from pandas import DataFrame
from ..db_engine import db, BattingStat, model_to_dict, fn, Card, Player, Current, DoesNotExist from ..db_engine import (
db,
BattingStat,
model_to_dict,
fn,
Card,
Player,
Current,
DoesNotExist,
)
from ..dependencies import oauth2_scheme, valid_token, PRIVATE_IN_SCHEMA from ..dependencies import oauth2_scheme, valid_token, PRIVATE_IN_SCHEMA
router = APIRouter( router = APIRouter(prefix="/api/v2/batstats", tags=["Pre-Season 7 Batting Stats"])
prefix='/api/v2/batstats',
tags=['Pre-Season 7 Batting Stats']
)
class BatStat(pydantic.BaseModel): class BatStat(pydantic.BaseModel):
@ -50,7 +56,7 @@ class BatStat(pydantic.BaseModel):
csc: Optional[int] = 0 csc: Optional[int] = 0
week: int week: int
season: int season: int
created: Optional[int] = int(datetime.timestamp(datetime.now())*1000) created: Optional[int] = int(datetime.timestamp(datetime.now()) * 1000)
game_id: int game_id: int
@ -63,11 +69,20 @@ class BatStatReturnList(pydantic.BaseModel):
stats: list[BatStat] stats: list[BatStat]
@router.get('', response_model=BatStatReturnList) @router.get("", response_model=BatStatReturnList)
async def get_batstats( async def get_batstats(
card_id: int = None, player_id: int = None, team_id: int = None, vs_team_id: int = None, week: int = None, card_id: int = None,
season: int = None, week_start: int = None, week_end: int = None, created: int = None, csv: bool = None, player_id: int = None,
limit: Optional[int] = 100): team_id: int = None,
vs_team_id: int = None,
week: int = None,
season: int = None,
week_start: int = None,
week_end: int = None,
created: int = None,
csv: bool = None,
limit: Optional[int] = 100,
):
all_stats = BattingStat.select().join(Card).join(Player).order_by(BattingStat.id) all_stats = BattingStat.select().join(Card).join(Player).order_by(BattingStat.id)
if season is not None: if season is not None:
@ -100,43 +115,122 @@ async def get_batstats(
# raise HTTPException(status_code=404, detail=f'No batting stats found') # raise HTTPException(status_code=404, detail=f'No batting stats found')
limit = max(0, min(limit, 500)) limit = max(0, min(limit, 500))
total_count = all_stats.count() if not csv else 0
all_stats = all_stats.limit(limit) all_stats = all_stats.limit(limit)
if csv: if csv:
data_list = [['id', 'card_id', 'player_id', 'cardset', 'team', 'vs_team', 'pos', 'pa', 'ab', 'run', 'hit', 'rbi', 'double', data_list = [
'triple', 'hr', 'bb', 'so', 'hbp', 'sac', 'ibb', 'gidp', 'sb', 'cs', 'bphr', 'bpfo', 'bp1b', [
'bplo', 'xch', 'xhit', 'error', 'pb', 'sbc', 'csc', 'week', 'season', 'created', 'game_id', 'roster_num']] "id",
"card_id",
"player_id",
"cardset",
"team",
"vs_team",
"pos",
"pa",
"ab",
"run",
"hit",
"rbi",
"double",
"triple",
"hr",
"bb",
"so",
"hbp",
"sac",
"ibb",
"gidp",
"sb",
"cs",
"bphr",
"bpfo",
"bp1b",
"bplo",
"xch",
"xhit",
"error",
"pb",
"sbc",
"csc",
"week",
"season",
"created",
"game_id",
"roster_num",
]
]
for line in all_stats: for line in all_stats:
data_list.append( data_list.append(
[ [
line.id, line.card.id, line.card.player.player_id, line.card.player.cardset.name, line.team.abbrev, line.vs_team.abbrev, line.id,
line.pos, line.pa, line.ab, line.run, line.hit, line.rbi, line.double, line.triple, line.hr, line.card.id,
line.bb, line.so, line.hbp, line.sac, line.ibb, line.gidp, line.sb, line.cs, line.bphr, line.bpfo, line.card.player.player_id,
line.bp1b, line.bplo, line.xch, line.xhit, line.error, line.pb, line.sbc, line.csc, line.week, line.card.player.cardset.name,
line.season, line.created, line.game_id, line.roster_num line.team.abbrev,
line.vs_team.abbrev,
line.pos,
line.pa,
line.ab,
line.run,
line.hit,
line.rbi,
line.double,
line.triple,
line.hr,
line.bb,
line.so,
line.hbp,
line.sac,
line.ibb,
line.gidp,
line.sb,
line.cs,
line.bphr,
line.bpfo,
line.bp1b,
line.bplo,
line.xch,
line.xhit,
line.error,
line.pb,
line.sbc,
line.csc,
line.week,
line.season,
line.created,
line.game_id,
line.roster_num,
] ]
) )
return_val = DataFrame(data_list).to_csv(header=False, index=False) return_val = DataFrame(data_list).to_csv(header=False, index=False)
return Response(content=return_val, media_type='text/csv') return Response(content=return_val, media_type="text/csv")
else: else:
return_val = {'count': all_stats.count(), 'stats': []} return_val = {"count": total_count, "stats": []}
for x in all_stats: for x in all_stats:
return_val['stats'].append(model_to_dict(x, recurse=False)) return_val["stats"].append(model_to_dict(x, recurse=False))
return return_val return return_val
@router.get('/player/{player_id}', response_model=BatStat) @router.get("/player/{player_id}", response_model=BatStat)
async def get_player_stats( async def get_player_stats(
player_id: int, team_id: int = None, vs_team_id: int = None, week_start: int = None, week_end: int = None, player_id: int,
csv: bool = None): team_id: int = None,
all_stats = (BattingStat vs_team_id: int = None,
.select(fn.COUNT(BattingStat.created).alias('game_count')) week_start: int = None,
.join(Card) week_end: int = None,
.group_by(BattingStat.card) csv: bool = None,
.where(BattingStat.card.player == player_id)).scalar() ):
all_stats = (
BattingStat.select(fn.COUNT(BattingStat.created).alias("game_count"))
.join(Card)
.group_by(BattingStat.card)
.where(BattingStat.card.player == player_id)
).scalar()
if team_id is not None: if team_id is not None:
all_stats = all_stats.where(BattingStat.team_id == team_id) all_stats = all_stats.where(BattingStat.team_id == team_id)
@ -150,37 +244,82 @@ async def get_player_stats(
if csv: if csv:
data_list = [ data_list = [
[ [
'pa', 'ab', 'run', 'hit', 'rbi', 'double', 'triple', 'hr', 'bb', 'so', 'hbp', 'sac', 'ibb', 'gidp', "pa",
'sb', 'cs', 'bphr', 'bpfo', 'bp1b', 'bplo', 'xch', 'xhit', 'error', 'pb', 'sbc', 'csc', "ab",
],[ "run",
all_stats.pa_sum, all_stats.ab_sum, all_stats.run, all_stats.hit_sum, all_stats.rbi_sum, "hit",
all_stats.double_sum, all_stats.triple_sum, all_stats.hr_sum, all_stats.bb_sum, all_stats.so_sum, "rbi",
all_stats.hbp_sum, all_stats.sac, all_stats.ibb_sum, all_stats.gidp_sum, all_stats.sb_sum, "double",
all_stats.cs_sum, all_stats.bphr_sum, all_stats.bpfo_sum, all_stats.bp1b_sum, all_stats.bplo_sum, "triple",
all_stats.xch, all_stats.xhit_sum, all_stats.error_sum, all_stats.pb_sum, all_stats.sbc_sum, "hr",
all_stats.csc_sum "bb",
] "so",
"hbp",
"sac",
"ibb",
"gidp",
"sb",
"cs",
"bphr",
"bpfo",
"bp1b",
"bplo",
"xch",
"xhit",
"error",
"pb",
"sbc",
"csc",
],
[
all_stats.pa_sum,
all_stats.ab_sum,
all_stats.run,
all_stats.hit_sum,
all_stats.rbi_sum,
all_stats.double_sum,
all_stats.triple_sum,
all_stats.hr_sum,
all_stats.bb_sum,
all_stats.so_sum,
all_stats.hbp_sum,
all_stats.sac,
all_stats.ibb_sum,
all_stats.gidp_sum,
all_stats.sb_sum,
all_stats.cs_sum,
all_stats.bphr_sum,
all_stats.bpfo_sum,
all_stats.bp1b_sum,
all_stats.bplo_sum,
all_stats.xch,
all_stats.xhit_sum,
all_stats.error_sum,
all_stats.pb_sum,
all_stats.sbc_sum,
all_stats.csc_sum,
],
] ]
return_val = DataFrame(data_list).to_csv(header=False, index=False) return_val = DataFrame(data_list).to_csv(header=False, index=False)
return Response(content=return_val, media_type='text/csv') return Response(content=return_val, media_type="text/csv")
else: else:
logging.debug(f'stat pull query: {all_stats}\n') logging.debug(f"stat pull query: {all_stats}\n")
# logging.debug(f'result 0: {all_stats[0]}\n') # logging.debug(f'result 0: {all_stats[0]}\n')
for x in all_stats: for x in all_stats:
logging.debug(f'this_line: {model_to_dict(x)}') logging.debug(f"this_line: {model_to_dict(x)}")
return_val = model_to_dict(all_stats[0]) return_val = model_to_dict(all_stats[0])
return return_val return return_val
@router.post('', include_in_schema=PRIVATE_IN_SCHEMA) @router.post("", include_in_schema=PRIVATE_IN_SCHEMA)
async def post_batstats(stats: BattingStatModel, token: str = Depends(oauth2_scheme)): async def post_batstats(stats: BattingStatModel, token: str = Depends(oauth2_scheme)):
if not valid_token(token): if not valid_token(token):
logging.warning('Bad Token: [REDACTED]') logging.warning("Bad Token: [REDACTED]")
raise HTTPException( raise HTTPException(
status_code=401, status_code=401,
detail='You are not authorized to post stats. This event has been logged.' detail="You are not authorized to post stats. This event has been logged.",
) )
new_stats = [] new_stats = []
@ -219,36 +358,40 @@ async def post_batstats(stats: BattingStatModel, token: str = Depends(oauth2_sch
csc=x.csc, csc=x.csc,
week=x.week, week=x.week,
season=x.season, season=x.season,
created=datetime.fromtimestamp(x.created / 1000) if x.created else datetime.now(), created=datetime.fromtimestamp(x.created / 1000)
game_id=x.game_id if x.created
else datetime.now(),
game_id=x.game_id,
) )
new_stats.append(this_stat) new_stats.append(this_stat)
with db.atomic(): with db.atomic():
BattingStat.bulk_create(new_stats, batch_size=15) BattingStat.bulk_create(new_stats, batch_size=15)
raise HTTPException(status_code=200, detail=f'{len(new_stats)} batting lines have been added') raise HTTPException(
status_code=200, detail=f"{len(new_stats)} batting lines have been added"
)
@router.delete('/{stat_id}', include_in_schema=PRIVATE_IN_SCHEMA) @router.delete("/{stat_id}", include_in_schema=PRIVATE_IN_SCHEMA)
async def delete_batstat(stat_id, token: str = Depends(oauth2_scheme)): async def delete_batstat(stat_id, token: str = Depends(oauth2_scheme)):
if not valid_token(token): if not valid_token(token):
logging.warning('Bad Token: [REDACTED]') logging.warning("Bad Token: [REDACTED]")
raise HTTPException( raise HTTPException(
status_code=401, status_code=401,
detail='You are not authorized to delete stats. This event has been logged.' detail="You are not authorized to delete stats. This event has been logged.",
) )
try: try:
this_stat = BattingStat.get_by_id(stat_id) this_stat = BattingStat.get_by_id(stat_id)
except DoesNotExist: except DoesNotExist:
raise HTTPException(status_code=404, detail=f'No stat found with id {stat_id}') raise HTTPException(status_code=404, detail=f"No stat found with id {stat_id}")
count = this_stat.delete_instance() count = this_stat.delete_instance()
if count == 1: if count == 1:
raise HTTPException(status_code=200, detail=f'Stat {stat_id} has been deleted') raise HTTPException(status_code=200, detail=f"Stat {stat_id} has been deleted")
else: else:
raise HTTPException(status_code=500, detail=f'Stat {stat_id} was not deleted') raise HTTPException(status_code=500, detail=f"Stat {stat_id} was not deleted")
# @app.get('/api/v1/plays/batting') # @app.get('/api/v1/plays/batting')
@ -453,4 +596,3 @@ async def delete_batstat(stat_id, token: str = Depends(oauth2_scheme)):
# } # }
# db.close() # db.close()
# return return_stats # return return_stats

View File

@ -179,6 +179,7 @@ async def get_card_ratings(
) )
all_ratings = all_ratings.where(BattingCardRatings.battingcard << set_cards) all_ratings = all_ratings.where(BattingCardRatings.battingcard << set_cards)
total_count = all_ratings.count() if not csv else 0
all_ratings = all_ratings.limit(max(0, min(limit, 500))) all_ratings = all_ratings.limit(max(0, min(limit, 500)))
if csv: if csv:
@ -195,7 +196,7 @@ async def get_card_ratings(
else: else:
return_val = { return_val = {
"count": all_ratings.count(), "count": total_count,
"ratings": [ "ratings": [
model_to_dict(x, recurse=not short_output) for x in all_ratings model_to_dict(x, recurse=not short_output) for x in all_ratings
], ],

View File

@ -8,10 +8,7 @@ from ..db_engine import Event, model_to_dict, fn, DoesNotExist
from ..dependencies import oauth2_scheme, valid_token from ..dependencies import oauth2_scheme, valid_token
router = APIRouter( router = APIRouter(prefix="/api/v2/events", tags=["events"])
prefix='/api/v2/events',
tags=['events']
)
class EventModel(pydantic.BaseModel): class EventModel(pydantic.BaseModel):
@ -23,78 +20,102 @@ class EventModel(pydantic.BaseModel):
active: Optional[bool] = False active: Optional[bool] = False
@router.get('') @router.get("")
async def v1_events_get( async def v1_events_get(
name: Optional[str] = None, in_desc: Optional[str] = None, active: Optional[bool] = None, name: Optional[str] = None,
csv: Optional[bool] = None, limit: Optional[int] = 100): in_desc: Optional[str] = None,
active: Optional[bool] = None,
csv: Optional[bool] = None,
limit: Optional[int] = 100,
):
all_events = Event.select().order_by(Event.id) all_events = Event.select().order_by(Event.id)
if name is not None: if name is not None:
all_events = all_events.where(fn.Lower(Event.name) == name.lower()) all_events = all_events.where(fn.Lower(Event.name) == name.lower())
if in_desc is not None: if in_desc is not None:
all_events = all_events.where( all_events = all_events.where(
(fn.Lower(Event.short_desc).contains(in_desc.lower())) | (fn.Lower(Event.short_desc).contains(in_desc.lower()))
(fn.Lower(Event.long_desc).contains(in_desc.lower())) | (fn.Lower(Event.long_desc).contains(in_desc.lower()))
) )
if active is not None: if active is not None:
all_events = all_events.where(Event.active == active) all_events = all_events.where(Event.active == active)
total_count = all_events.count() if not csv else 0
all_events = all_events.limit(max(0, min(limit, 500))) all_events = all_events.limit(max(0, min(limit, 500)))
if csv: if csv:
data_list = [['id', 'name', 'short_desc', 'long_desc', 'url', 'thumbnail', 'active']] data_list = [
["id", "name", "short_desc", "long_desc", "url", "thumbnail", "active"]
]
for line in all_events: for line in all_events:
data_list.append( data_list.append(
[ [
line.id, line.name, line.short_desc, line.long_desc, line.url, line.thumbnail, line.active line.id,
line.name,
line.short_desc,
line.long_desc,
line.url,
line.thumbnail,
line.active,
] ]
) )
return_val = DataFrame(data_list).to_csv(header=False, index=False) return_val = DataFrame(data_list).to_csv(header=False, index=False)
return Response(content=return_val, media_type='text/csv') return Response(content=return_val, media_type="text/csv")
else: else:
return_val = {'count': all_events.count(), 'events': []} return_val = {"count": total_count, "events": []}
for x in all_events: for x in all_events:
return_val['events'].append(model_to_dict(x)) return_val["events"].append(model_to_dict(x))
return return_val return return_val
@router.get('/{event_id}') @router.get("/{event_id}")
async def v1_events_get_one(event_id, csv: Optional[bool] = False): async def v1_events_get_one(event_id, csv: Optional[bool] = False):
try: try:
this_event = Event.get_by_id(event_id) this_event = Event.get_by_id(event_id)
except DoesNotExist: except DoesNotExist:
raise HTTPException(status_code=404, detail=f'No event found with id {event_id}') raise HTTPException(
status_code=404, detail=f"No event found with id {event_id}"
)
if csv: if csv:
data_list = [ data_list = [
['id', 'name', 'short_desc', 'long_desc', 'url', 'thumbnail', 'active'], ["id", "name", "short_desc", "long_desc", "url", "thumbnail", "active"],
[this_event.id, this_event.name, this_event.short_desc, this_event.long_desc, this_event.url, [
this_event.thumbnail, this_event.active] this_event.id,
this_event.name,
this_event.short_desc,
this_event.long_desc,
this_event.url,
this_event.thumbnail,
this_event.active,
],
] ]
return_val = DataFrame(data_list).to_csv(header=False, index=False) return_val = DataFrame(data_list).to_csv(header=False, index=False)
return Response(content=return_val, media_type='text/csv') return Response(content=return_val, media_type="text/csv")
else: else:
return_val = model_to_dict(this_event) return_val = model_to_dict(this_event)
return return_val return return_val
@router.post('') @router.post("")
async def v1_events_post(event: EventModel, token: str = Depends(oauth2_scheme)): async def v1_events_post(event: EventModel, token: str = Depends(oauth2_scheme)):
if not valid_token(token): if not valid_token(token):
logging.warning('Bad Token: [REDACTED]') logging.warning("Bad Token: [REDACTED]")
raise HTTPException( raise HTTPException(
status_code=401, status_code=401,
detail='You are not authorized to post events. This event has been logged.' detail="You are not authorized to post events. This event has been logged.",
) )
dupe_event = Event.get_or_none(Event.name == event.name) dupe_event = Event.get_or_none(Event.name == event.name)
if dupe_event: if dupe_event:
raise HTTPException(status_code=400, detail=f'There is already an event using {event.name}') raise HTTPException(
status_code=400, detail=f"There is already an event using {event.name}"
)
this_event = Event( this_event = Event(
name=event.name, name=event.name,
@ -102,7 +123,7 @@ async def v1_events_post(event: EventModel, token: str = Depends(oauth2_scheme))
long_desc=event.long_desc, long_desc=event.long_desc,
url=event.url, url=event.url,
thumbnail=event.thumbnail, thumbnail=event.thumbnail,
active=event.active active=event.active,
) )
saved = this_event.save() saved = this_event.save()
@ -112,25 +133,33 @@ async def v1_events_post(event: EventModel, token: str = Depends(oauth2_scheme))
else: else:
raise HTTPException( raise HTTPException(
status_code=418, status_code=418,
detail='Well slap my ass and call me a teapot; I could not save that cardset' detail="Well slap my ass and call me a teapot; I could not save that cardset",
) )
@router.patch('/{event_id}') @router.patch("/{event_id}")
async def v1_events_patch( async def v1_events_patch(
event_id, name: Optional[str] = None, short_desc: Optional[str] = None, long_desc: Optional[str] = None, event_id,
url: Optional[str] = None, thumbnail: Optional[str] = None, active: Optional[bool] = None, name: Optional[str] = None,
token: str = Depends(oauth2_scheme)): short_desc: Optional[str] = None,
long_desc: Optional[str] = None,
url: Optional[str] = None,
thumbnail: Optional[str] = None,
active: Optional[bool] = None,
token: str = Depends(oauth2_scheme),
):
if not valid_token(token): if not valid_token(token):
logging.warning('Bad Token: [REDACTED]') logging.warning("Bad Token: [REDACTED]")
raise HTTPException( raise HTTPException(
status_code=401, status_code=401,
detail='You are not authorized to patch events. This event has been logged.' detail="You are not authorized to patch events. This event has been logged.",
) )
try: try:
this_event = Event.get_by_id(event_id) this_event = Event.get_by_id(event_id)
except DoesNotExist: except DoesNotExist:
raise HTTPException(status_code=404, detail=f'No event found with id {event_id}') raise HTTPException(
status_code=404, detail=f"No event found with id {event_id}"
)
if name is not None: if name is not None:
this_event.name = name this_event.name = name
@ -151,26 +180,30 @@ async def v1_events_patch(
else: else:
raise HTTPException( raise HTTPException(
status_code=418, status_code=418,
detail='Well slap my ass and call me a teapot; I could not save that event' detail="Well slap my ass and call me a teapot; I could not save that event",
) )
@router.delete('/{event_id}') @router.delete("/{event_id}")
async def v1_events_delete(event_id, token: str = Depends(oauth2_scheme)): async def v1_events_delete(event_id, token: str = Depends(oauth2_scheme)):
if not valid_token(token): if not valid_token(token):
logging.warning('Bad Token: [REDACTED]') logging.warning("Bad Token: [REDACTED]")
raise HTTPException( raise HTTPException(
status_code=401, status_code=401,
detail='You are not authorized to delete events. This event has been logged.' detail="You are not authorized to delete events. This event has been logged.",
) )
try: try:
this_event = Event.get_by_id(event_id) this_event = Event.get_by_id(event_id)
except DoesNotExist: except DoesNotExist:
raise HTTPException(status_code=404, detail=f'No event found with id {event_id}') raise HTTPException(
status_code=404, detail=f"No event found with id {event_id}"
)
count = this_event.delete_instance() count = this_event.delete_instance()
if count == 1: if count == 1:
raise HTTPException(status_code=200, detail=f'Event {event_id} has been deleted') raise HTTPException(
status_code=200, detail=f"Event {event_id} has been deleted"
)
else: else:
raise HTTPException(status_code=500, detail=f'Event {event_id} was not deleted') raise HTTPException(status_code=500, detail=f"Event {event_id} was not deleted")

View File

@ -43,6 +43,7 @@ async def v1_gamerewards_get(
all_rewards = all_rewards.where(GameRewards.money == money) all_rewards = all_rewards.where(GameRewards.money == money)
limit = max(0, min(limit, 500)) limit = max(0, min(limit, 500))
total_count = all_rewards.count() if not csv else 0
all_rewards = all_rewards.limit(limit) all_rewards = all_rewards.limit(limit)
if csv: if csv:
@ -61,7 +62,7 @@ async def v1_gamerewards_get(
return Response(content=return_val, media_type="text/csv") return Response(content=return_val, media_type="text/csv")
else: else:
return_val = {"count": all_rewards.count(), "gamerewards": []} return_val = {"count": total_count, "gamerewards": []}
for x in all_rewards: for x in all_rewards:
return_val["gamerewards"].append(model_to_dict(x)) return_val["gamerewards"].append(model_to_dict(x))

View File

@ -48,9 +48,10 @@ async def v1_gauntletreward_get(
all_rewards = all_rewards.order_by(-GauntletReward.loss_max, GauntletReward.win_num) all_rewards = all_rewards.order_by(-GauntletReward.loss_max, GauntletReward.win_num)
limit = max(0, min(limit, 500)) limit = max(0, min(limit, 500))
total_count = all_rewards.count()
all_rewards = all_rewards.limit(limit) all_rewards = all_rewards.limit(limit)
return_val = {"count": all_rewards.count(), "rewards": []} return_val = {"count": total_count, "rewards": []}
for x in all_rewards: for x in all_rewards:
return_val["rewards"].append(model_to_dict(x)) return_val["rewards"].append(model_to_dict(x))

View File

@ -102,6 +102,7 @@ async def get_players(
if offense_col is not None: if offense_col is not None:
all_players = all_players.where(MlbPlayer.offense_col << offense_col) all_players = all_players.where(MlbPlayer.offense_col << offense_col)
total_count = all_players.count() if not csv else 0
all_players = all_players.limit(max(0, min(limit, 500))) all_players = all_players.limit(max(0, min(limit, 500)))
if csv: if csv:
@ -109,7 +110,7 @@ async def get_players(
return Response(content=return_val, media_type="text/csv") return Response(content=return_val, media_type="text/csv")
return_val = { return_val = {
"count": all_players.count(), "count": total_count,
"players": [model_to_dict(x) for x in all_players], "players": [model_to_dict(x) for x in all_players],
} }
return return_val return return_val

View File

@ -169,6 +169,7 @@ async def get_card_ratings(
) )
all_ratings = all_ratings.where(PitchingCardRatings.pitchingcard << set_cards) all_ratings = all_ratings.where(PitchingCardRatings.pitchingcard << set_cards)
total_count = all_ratings.count() if not csv else 0
all_ratings = all_ratings.limit(max(0, min(limit, 500))) all_ratings = all_ratings.limit(max(0, min(limit, 500)))
if csv: if csv:
@ -177,7 +178,7 @@ async def get_card_ratings(
else: else:
return_val = { return_val = {
"count": all_ratings.count(), "count": total_count,
"ratings": [ "ratings": [
model_to_dict(x, recurse=not short_output) for x in all_ratings model_to_dict(x, recurse=not short_output) for x in all_ratings
], ],

View File

@ -98,6 +98,7 @@ async def get_pit_stats(
if gs is not None: if gs is not None:
all_stats = all_stats.where(PitchingStat.gs == 1 if gs else 0) all_stats = all_stats.where(PitchingStat.gs == 1 if gs else 0)
total_count = all_stats.count() if not csv else 0
all_stats = all_stats.limit(max(0, min(limit, 500))) all_stats = all_stats.limit(max(0, min(limit, 500)))
# if all_stats.count() == 0: # if all_stats.count() == 0:
@ -177,7 +178,7 @@ async def get_pit_stats(
return Response(content=return_val, media_type="text/csv") return Response(content=return_val, media_type="text/csv")
else: else:
return_val = {"count": all_stats.count(), "stats": []} return_val = {"count": total_count, "stats": []}
for x in all_stats: for x in all_stats:
return_val["stats"].append(model_to_dict(x, recurse=False)) return_val["stats"].append(model_to_dict(x, recurse=False))

View File

@ -21,14 +21,15 @@ _NEXT_THRESHOLD_ATTR = {
} }
def _build_card_state_response(state) -> dict: def _build_card_state_response(state, player_name=None) -> dict:
"""Serialise a RefractorCardState into the standard API response shape. """Serialise a RefractorCardState into the standard API response shape.
Produces a flat dict with player_id and team_id as plain integers, Produces a flat dict with player_id and team_id as plain integers,
a nested 'track' dict with all threshold fields, and a computed a nested 'track' dict with all threshold fields, and computed fields:
'next_threshold' field: - 'next_threshold': threshold for the tier immediately above (None when fully evolved).
- For tiers 0-3: the threshold value for the tier immediately above. - 'progress_pct': current_value / next_threshold * 100, rounded to 1 decimal
- For tier 4 (fully evolved): None. (None when fully evolved or next_threshold is zero).
- 'player_name': included when passed (e.g. from a list join); omitted otherwise.
Uses model_to_dict(recurse=False) internally so FK fields are returned Uses model_to_dict(recurse=False) internally so FK fields are returned
as IDs rather than nested objects, then promotes the needed IDs up to as IDs rather than nested objects, then promotes the needed IDs up to
@ -40,7 +41,11 @@ def _build_card_state_response(state) -> dict:
next_attr = _NEXT_THRESHOLD_ATTR.get(state.current_tier) next_attr = _NEXT_THRESHOLD_ATTR.get(state.current_tier)
next_threshold = getattr(track, next_attr) if next_attr else None next_threshold = getattr(track, next_attr) if next_attr else None
return { progress_pct = None
if next_threshold is not None and next_threshold > 0:
progress_pct = round((state.current_value / next_threshold) * 100, 1)
result = {
"player_id": state.player_id, "player_id": state.player_id,
"team_id": state.team_id, "team_id": state.team_id,
"current_tier": state.current_tier, "current_tier": state.current_tier,
@ -51,8 +56,14 @@ def _build_card_state_response(state) -> dict:
), ),
"track": track_dict, "track": track_dict,
"next_threshold": next_threshold, "next_threshold": next_threshold,
"progress_pct": progress_pct,
} }
if player_name is not None:
result["player_name"] = player_name
return result
@router.get("/tracks") @router.get("/tracks")
async def list_tracks( async def list_tracks(
@ -89,6 +100,118 @@ async def get_track(track_id: int, token: str = Depends(oauth2_scheme)):
return model_to_dict(track, recurse=False) return model_to_dict(track, recurse=False)
@router.get("/cards")
async def list_card_states(
    team_id: int = Query(...),
    card_type: Optional[str] = Query(default=None),
    tier: Optional[int] = Query(default=None, ge=0, le=4),
    season: Optional[int] = Query(default=None),
    progress: Optional[str] = Query(default=None),
    limit: int = Query(default=10, ge=1, le=100),
    offset: int = Query(default=0, ge=0),
    token: str = Depends(oauth2_scheme),
) -> dict:
    """List RefractorCardState rows for a team, with optional filters and pagination.
    Required:
        team_id -- filter to this team's cards; returns empty list if team has no states
    Optional filters:
        card_type -- one of 'batter', 'sp', 'rp'; filters by RefractorTrack.card_type
        tier -- filter by current_tier (0-4)
        season -- filter to players who have batting or pitching season stats in that
                  season (EXISTS subquery against batting/pitching_season_stats)
        progress -- 'close' = only cards within 80% of their next tier threshold;
                    fully evolved cards are always excluded from this filter
    Pagination:
        limit -- page size (1-100, default 10)
        offset -- items to skip (default 0)
    Response: {"count": N, "items": [...]}
        count is the total matching rows before limit/offset.
        Each item includes player_name and progress_pct in addition to the
        standard single-card response fields.
    Sort order: current_tier DESC, current_value DESC.
    Raises:
        HTTPException 401 -- when the bearer token fails valid_token().
    """
    # Auth gate mirrors the other endpoints in this router: log and reject.
    if not valid_token(token):
        logging.warning("Bad Token: [REDACTED]")
        raise HTTPException(status_code=401, detail="Unauthorized")
    # Function-scope import — presumably to avoid an import cycle with
    # db_engine at module load time; TODO confirm this matches the pattern
    # used by the sibling endpoints in this file.
    from ..db_engine import (
        RefractorCardState,
        RefractorTrack,
        Player,
        BattingSeasonStats,
        PitchingSeasonStats,
        fn,
        Case,
        JOIN,
    )
    # Base query: card states for the team, with the track and (optionally
    # missing) player rows selected in the same round trip. Ordering puts the
    # most-evolved, highest-progress cards first.
    query = (
        RefractorCardState.select(RefractorCardState, RefractorTrack, Player)
        .join(RefractorTrack)
        .switch(RefractorCardState)
        # LEFT OUTER join: keep card states even when no Player row matches;
        # player_name is filled in as None for those below.
        .join(
            Player, JOIN.LEFT_OUTER, on=(RefractorCardState.player == Player.player_id)
        )
        .where(RefractorCardState.team == team_id)
        .order_by(
            RefractorCardState.current_tier.desc(),
            RefractorCardState.current_value.desc(),
        )
    )
    if card_type is not None:
        query = query.where(RefractorTrack.card_type == card_type)
    if tier is not None:
        query = query.where(RefractorCardState.current_tier == tier)
    if season is not None:
        # Correlated EXISTS subqueries: a card qualifies if the same
        # player/team pair has EITHER batting OR pitching stats that season.
        batter_exists = BattingSeasonStats.select().where(
            (BattingSeasonStats.player == RefractorCardState.player)
            & (BattingSeasonStats.team == RefractorCardState.team)
            & (BattingSeasonStats.season == season)
        )
        pitcher_exists = PitchingSeasonStats.select().where(
            (PitchingSeasonStats.player == RefractorCardState.player)
            & (PitchingSeasonStats.team == RefractorCardState.team)
            & (PitchingSeasonStats.season == season)
        )
        query = query.where(fn.EXISTS(batter_exists) | fn.EXISTS(pitcher_exists))
    if progress == "close":
        # SQL CASE mapping current_tier -> the threshold of the NEXT tier;
        # the default (no match, i.e. tier 4) is NULL, so the >= comparison
        # below is never true for fully-evolved cards — belt-and-braces with
        # the explicit fully_evolved == False filter.
        next_threshold_expr = Case(
            RefractorCardState.current_tier,
            (
                (0, RefractorTrack.t1_threshold),
                (1, RefractorTrack.t2_threshold),
                (2, RefractorTrack.t3_threshold),
                (3, RefractorTrack.t4_threshold),
            ),
            None,
        )
        # "close" = current_value within 80% of the next threshold.
        query = query.where(
            (RefractorCardState.fully_evolved == False)  # noqa: E712
            & (RefractorCardState.current_value >= next_threshold_expr * 0.8)
        )
    # Count BEFORE pagination so clients can compute total pages.
    total = query.count()
    items = []
    for state in query.offset(offset).limit(limit):
        player_name = None
        # Best-effort name lookup: the LEFT OUTER join means state.player can
        # be absent/None, and accessing it may raise (AttributeError, or a
        # peewee DoesNotExist — TODO confirm which); the broad except keeps
        # the listing working either way.
        try:
            player_name = state.player.p_name
        except Exception:
            pass
        items.append(_build_card_state_response(state, player_name=player_name))
    return {"count": total, "items": items}
@router.get("/cards/{card_id}") @router.get("/cards/{card_id}")
async def get_card_state(card_id: int, token: str = Depends(oauth2_scheme)): async def get_card_state(card_id: int, token: str = Depends(oauth2_scheme)):
"""Return the RefractorCardState for a card identified by its Card.id. """Return the RefractorCardState for a card identified by its Card.id.
@ -175,7 +298,7 @@ async def evaluate_game(game_id: int, token: str = Depends(oauth2_scheme)):
logging.warning("Bad Token: [REDACTED]") logging.warning("Bad Token: [REDACTED]")
raise HTTPException(status_code=401, detail="Unauthorized") raise HTTPException(status_code=401, detail="Unauthorized")
from ..db_engine import RefractorCardState, RefractorTrack, Player, StratPlay from ..db_engine import RefractorCardState, Player, StratPlay
from ..services.refractor_evaluator import evaluate_card from ..services.refractor_evaluator import evaluate_card
plays = list(StratPlay.select().where(StratPlay.game == game_id)) plays = list(StratPlay.select().where(StratPlay.game == game_id))

View File

@ -142,6 +142,7 @@ async def get_results(
all_results = all_results.order_by(Result.id) all_results = all_results.order_by(Result.id)
limit = max(0, min(limit, 500)) limit = max(0, min(limit, 500))
total_count = all_results.count() if not csv else 0
all_results = all_results.limit(limit) all_results = all_results.limit(limit)
# Not functional # Not functional
# if vs_ai is not None: # if vs_ai is not None:
@ -201,7 +202,7 @@ async def get_results(
return Response(content=return_val, media_type="text/csv") return Response(content=return_val, media_type="text/csv")
else: else:
return_val = {"count": all_results.count(), "results": []} return_val = {"count": total_count, "results": []}
for x in all_results: for x in all_results:
return_val["results"].append(model_to_dict(x)) return_val["results"].append(model_to_dict(x))

View File

@ -34,9 +34,6 @@ async def get_rewards(
): ):
all_rewards = Reward.select().order_by(Reward.id) all_rewards = Reward.select().order_by(Reward.id)
if all_rewards.count() == 0:
raise HTTPException(status_code=404, detail="There are no rewards to filter")
if name is not None: if name is not None:
all_rewards = all_rewards.where(fn.Lower(Reward.name) == name.lower()) all_rewards = all_rewards.where(fn.Lower(Reward.name) == name.lower())
if team_id is not None: if team_id is not None:
@ -52,7 +49,8 @@ async def get_rewards(
if week is not None: if week is not None:
all_rewards = all_rewards.where(Reward.week == week) all_rewards = all_rewards.where(Reward.week == week)
if all_rewards.count() == 0: total_count = all_rewards.count()
if total_count == 0:
raise HTTPException(status_code=404, detail="No rewards found") raise HTTPException(status_code=404, detail="No rewards found")
limit = max(0, min(limit, 500)) limit = max(0, min(limit, 500))
@ -69,7 +67,7 @@ async def get_rewards(
return Response(content=return_val, media_type="text/csv") return Response(content=return_val, media_type="text/csv")
else: else:
return_val = {"count": all_rewards.count(), "rewards": []} return_val = {"count": total_count, "rewards": []}
for x in all_rewards: for x in all_rewards:
return_val["rewards"].append(model_to_dict(x, recurse=not flat)) return_val["rewards"].append(model_to_dict(x, recurse=not flat))

View File

@ -30,12 +30,14 @@ async def get_scout_claims(
if claimed_by_team_id is not None: if claimed_by_team_id is not None:
query = query.where(ScoutClaim.claimed_by_team_id == claimed_by_team_id) query = query.where(ScoutClaim.claimed_by_team_id == claimed_by_team_id)
total_count = query.count()
if limit is not None: if limit is not None:
limit = max(0, min(limit, 500)) limit = max(0, min(limit, 500))
query = query.limit(limit) query = query.limit(limit)
results = [model_to_dict(x, recurse=False) for x in query] results = [model_to_dict(x, recurse=False) for x in query]
return {"count": len(results), "results": results} return {"count": total_count, "results": results}
@router.get("/{claim_id}") @router.get("/{claim_id}")

File diff suppressed because one or more lines are too long

View File

@ -78,6 +78,7 @@ async def get_games(
StratGame.game_type.contains(f"gauntlet-{gauntlet_id}") StratGame.game_type.contains(f"gauntlet-{gauntlet_id}")
) )
total_count = all_games.count() if not csv else 0
all_games = all_games.limit(max(0, min(limit, 500))) all_games = all_games.limit(max(0, min(limit, 500)))
if csv: if csv:
@ -107,7 +108,7 @@ async def get_games(
return Response(content=output.to_csv(index=False), media_type="text/csv") return Response(content=output.to_csv(index=False), media_type="text/csv")
return_val = { return_val = {
"count": all_games.count(), "count": total_count,
"games": [model_to_dict(x, recurse=not short_output) for x in all_games], "games": [model_to_dict(x, recurse=not short_output) for x in all_games],
} }
return return_val return return_val

View File

@ -0,0 +1,19 @@
-- Migration: Add team_id index to refractor_card_state
-- Date: 2026-03-25
--
-- Adds a non-unique index on refractor_card_state.team_id to support the new
-- GET /api/v2/refractor/cards list endpoint, which filters by team as its
-- primary discriminator and is called on every /refractor status bot command.
--
-- The existing unique index is on (player_id, team_id) with player leading,
-- so team-only queries cannot use it efficiently.
--
-- Idempotent: IF NOT EXISTS makes re-running this migration a no-op.

BEGIN;

CREATE INDEX IF NOT EXISTS idx_refractor_card_state_team
ON refractor_card_state (team_id);

COMMIT;

-- Rollback:
-- DROP INDEX IF EXISTS idx_refractor_card_state_team;

View File

@ -204,3 +204,120 @@ def test_tier_t3_boundary():
def test_tier_accepts_namespace_track(): def test_tier_accepts_namespace_track():
"""tier_from_value must work with attribute-style track objects (Peewee models).""" """tier_from_value must work with attribute-style track objects (Peewee models)."""
assert tier_from_value(37, track_ns("batter")) == 1 assert tier_from_value(37, track_ns("batter")) == 1
# ---------------------------------------------------------------------------
# T1-1: Negative singles guard in compute_batter_value
# ---------------------------------------------------------------------------
def test_batter_negative_singles_component():
    """An impossible stat line (hits < 2B+3B) flows through the formula unclamped.

    singles is derived as hits - doubles - triples - hr, so hits=1, doubles=1,
    triples=1, hr=0 yields singles = -1. No guard exists, so the negative
    value feeds straight into total bases:

        tb    = (-1)*1 + 1*2 + 1*3 + 0*4 = 4
        value = pa + tb*2 = 0 + 4*2 = 8

    This pins the current behaviour; if a clamp is ever added, this test
    fails and surfaces the change.
    """
    line = batter_stats(hits=1, doubles=1, triples=1, hr=0)
    # singles = -1 is NOT clamped, so TB = 4 and value = 8.0
    result = compute_batter_value(line)
    assert result == 8.0, (
        f"Expected 8.0 (negative singles flows through unclamped), got {result}"
    )
def test_batter_negative_singles_is_not_clamped():
    """Negative singles must NOT be floored at zero by the batter formula.

    With clamping (`singles = max(0, ...)`) the same stat line would give
    tb = 0*1 + 1*2 + 1*3 = 5 and value = 10.0. Asserting the computed value
    differs from 10.0 makes any future clamping refactor fail immediately.
    """
    line = batter_stats(hits=1, doubles=1, triples=1, hr=0)
    unclamped_value = compute_batter_value(line)
    clamped_value = 10.0  # what a max(0, singles) guard would produce
    assert unclamped_value != clamped_value, (
        "Formula appears to clamp negative singles — behaviour has changed"
    )
# ---------------------------------------------------------------------------
# T1-2: Tier boundary precision with float SP values
# ---------------------------------------------------------------------------
def test_sp_tier_just_below_t1_outs29():
    """29 outs -> IP = 9.666..., strictly under the SP T1 threshold of 10 -> T0.

    Pitcher IP accumulates in thirds, so boundary values are non-integer
    floats. An implementation that truncated or rounded IP upward would
    promote this card prematurely; tier_from_value must keep anything below
    10.0 at T0 and handle non-integer values correctly.
    """
    line = pitcher_stats(outs=29, strikeouts=0)
    computed = compute_sp_value(line)
    assert computed == pytest.approx(29 / 3)  # 9.6666...
    assert computed < 10.0  # strictly below the T1 threshold
    assert tier_from_value(computed, track_dict("sp")) == 0
def test_sp_tier_exactly_t1_outs30():
    """30 outs is exactly 10.0 IP, the SP T1 threshold -> must promote to T1.

    tier_from_value must use an inclusive (>=) lower-bound comparison; a
    strictly-greater-than check would leave this exact boundary value at T0.
    """
    line = pitcher_stats(outs=30, strikeouts=0)
    computed = compute_sp_value(line)
    assert computed == 10.0
    assert tier_from_value(computed, track_dict("sp")) == 1
def test_sp_float_value_at_exact_t2_boundary():
    """outs=120 -> IP=40.0, exactly the SP T2 threshold -> must resolve to T2.

    Extends the inclusive-boundary check beyond T1: every tier threshold
    must use a >= comparison for float values.
    """
    line = pitcher_stats(outs=120, strikeouts=0)
    computed = compute_sp_value(line)
    assert computed == 40.0
    assert tier_from_value(computed, track_dict("sp")) == 2
def test_sp_float_value_just_below_t2():
    """outs=119 -> IP=39.666..., strictly under T2=40 -> tier stays at 1.

    Confirms sub-threshold float values are not promoted early by
    floating-point comparison imprecision (the card is already past T1=10).
    """
    line = pitcher_stats(outs=119, strikeouts=0)
    computed = compute_sp_value(line)
    assert computed == pytest.approx(119 / 3)  # 39.666...
    assert computed < 40.0
    assert tier_from_value(computed, track_dict("sp")) == 1

View File

@ -665,3 +665,136 @@ def test_auth_required_evaluate_game(client):
resp = client.post(f"/api/v2/refractor/evaluate-game/{game.id}") resp = client.post(f"/api/v2/refractor/evaluate-game/{game.id}")
assert resp.status_code == 401 assert resp.status_code == 401
# ---------------------------------------------------------------------------
# T1-3: evaluate-game with non-existent game_id
# ---------------------------------------------------------------------------
def test_evaluate_game_nonexistent_game_id(client):
    """evaluate-game on an unknown game_id returns an empty 200 batch, not 404.

    No StratGame row with id=99999 exists, so the StratPlay query yields zero
    rows, the player-pairs set is empty, and nothing is evaluated. The
    endpoint deliberately treats a missing game as a valid empty outcome —
    there are simply no players to evaluate. If the implementation is ever
    changed to return 404 for missing games, this test flags the contract
    change.
    """
    response = client.post(
        "/api/v2/refractor/evaluate-game/99999", headers=AUTH_HEADER
    )
    assert response.status_code == 200
    payload = response.json()
    assert payload["evaluated"] == 0
    assert payload["tier_ups"] == []
# ---------------------------------------------------------------------------
# T2-3: evaluate-game with zero plays
# ---------------------------------------------------------------------------
def test_evaluate_game_zero_plays(client):
    """A game with zero StratPlay rows evaluates nobody and returns the empty shape.

    The endpoint derives its player list from StratPlay rows; with no plays
    there is nothing to evaluate. The response must still be the normal
    batch payload rather than a crash, KeyError, or unexpected structure.
    """
    home = _make_team("ZP1", gmid=20101)
    away = _make_team("ZP2", gmid=20102)
    empty_game = _make_game(home, away)
    # Deliberately create no StratPlay rows for this game.
    response = client.post(
        f"/api/v2/refractor/evaluate-game/{empty_game.id}", headers=AUTH_HEADER
    )
    assert response.status_code == 200
    payload = response.json()
    assert payload["evaluated"] == 0
    assert payload["tier_ups"] == []
# ---------------------------------------------------------------------------
# T2-9: Per-player error isolation in evaluate_game
# ---------------------------------------------------------------------------
def test_evaluate_game_error_isolation(client, monkeypatch):
    """An exception raised for one player does not abort the rest of the batch.

    What: Create two batters in the same game, both with RefractorCardState
    rows. Patch evaluate_card on its source module so it raises RuntimeError
    for one specific player (keyed on player_id, not call order) and
    delegates to the real implementation for everyone else. Verify the
    endpoint returns 200, evaluated==1 (not 0 or 2), and that the failing
    player contributes no tier_ups.

    Why: The evaluate-game loop catches per-player exceptions and logs them.
    If the isolation breaks, a single bad card would silently drop all
    evaluations for the rest of the game. The 'evaluated' count is the
    observable signal that error isolation is functioning.

    Implementation note: the router does
    `from ..services.refractor_evaluator import evaluate_card` inside the
    async function body, so the local import re-resolves on each call.
    Patching the function on its source module guarantees the handler picks
    up the patched version when the route executes.
    """
    from app.services import refractor_evaluator

    team_a = _make_team("EI1", gmid=20111)
    team_b = _make_team("EI2", gmid=20112)
    batter_fail = _make_player("WP13 Fail Batter", pos="1B")
    batter_ok = _make_player("WP13 Ok Batter", pos="1B")
    pitcher = _make_player("WP13 EI Pitcher", pos="SP")
    game = _make_game(team_a, team_b)
    # Both batters need a track/state so they are not skipped by the
    # "no state" guard before evaluate_card is ever called.
    track = _make_track(name="EI Batter Track")
    _make_state(batter_fail, team_a, track)
    _make_state(batter_ok, team_a, track)
    _make_play(game, 1, batter_fail, team_a, pitcher, team_b, pa=1, ab=1, outs=1)
    _make_play(game, 2, batter_ok, team_a, pitcher, team_b, pa=1, ab=1, outs=1)
    # Keep a handle on the real evaluate_card so the healthy player still
    # receives a genuine evaluation through the patch.
    real_evaluate = refractor_evaluator.evaluate_card
    call_count = {"n": 0}  # diagnostic only; failure is keyed on player_id below
    fail_player_id = batter_fail.player_id

    def patched_evaluate(player_id, team_id, **kwargs):
        # Fail deterministically for the designated player regardless of
        # the order in which the endpoint iterates players.
        call_count["n"] += 1
        if player_id == fail_player_id:
            raise RuntimeError("simulated per-player error")
        return real_evaluate(player_id, team_id, **kwargs)

    # Patch at the source module so the handler's function-local
    # `from ..services.refractor_evaluator import evaluate_card` re-resolves
    # to the patched callable on each request.
    monkeypatch.setattr(
        "app.services.refractor_evaluator.evaluate_card", patched_evaluate
    )
    resp = client.post(
        f"/api/v2/refractor/evaluate-game/{game.id}", headers=AUTH_HEADER
    )
    assert resp.status_code == 200
    data = resp.json()
    # One player succeeded; the other was caught by the exception handler.
    assert data["evaluated"] == 1
    # The failing player must not appear in tier_ups.
    failing_ids = [tu["player_id"] for tu in data["tier_ups"]]
    assert fail_player_id not in failing_ids

View File

@ -325,6 +325,59 @@ class TestCareerTotals:
assert result["current_value"] == 50.0 assert result["current_value"] == 50.0
class TestFullyEvolvedPersistence:
    """T2-1: fully_evolved=True is preserved even when stats drop or are absent."""

    def test_fully_evolved_persists_when_stats_zeroed(self, batter_track):
        """Card at T4/fully_evolved=True stays fully_evolved after stats are removed.

        What: Set up a RefractorCardState at tier=4 with fully_evolved=True.
        Then call evaluate_card with no season stats rows (zero career totals).
        The evaluator computes value=0 -> new_tier=0, but current_tier must
        stay at 4 (no regression) and fully_evolved must remain True.

        Why: fully_evolved is a permanent achievement flag -- it must not be
        revoked if a team's stats are rolled back, corrected, or simply not
        yet imported. The no-regression rule (max(current, new)) prevents
        tier demotion; this test confirms that fully_evolved follows the same
        protection.
        """
        # Seed a state already at T4 with a high stored value.
        _make_state(1, 1, batter_track, current_tier=4, current_value=900.0)
        # No stats rows -- career totals will be all zeros.
        # (deliberately no _make_stats call)
        result = _eval(1, 1)
        # The no-regression rule keeps tier at 4 despite a computed tier of 0.
        assert result["current_tier"] == 4, (
            f"Expected tier=4 (no regression), got {result['current_tier']}"
        )
        # fully_evolved must still be True since the final tier is >= 4.
        assert result["fully_evolved"] is True, (
            "fully_evolved was reset to False after re-evaluation with zero stats"
        )

    def test_fully_evolved_persists_with_partial_stats(self, batter_track):
        """Card at T4 stays fully_evolved even with stats below T1.

        What: Same setup as above but with a season stats row giving value=30
        (below T1=37). The computed tier would be 0, but current_tier must
        not regress from 4.

        Why: Validates that no-regression applies regardless of whether stats
        are zero or merely insufficient for the achieved tier.
        """
        _make_state(1, 1, batter_track, current_tier=4, current_value=900.0)
        # pa=30 -> value=30, which is below T1=37 -> computed tier=0
        _make_stats(1, 1, 1, pa=30)
        result = _eval(1, 1)
        assert result["current_tier"] == 4
        assert result["fully_evolved"] is True
class TestMissingState: class TestMissingState:
"""ValueError when no card state exists for (player_id, team_id).""" """ValueError when no card state exists for (player_id, team_id)."""
@ -359,3 +412,212 @@ class TestReturnShape:
assert isinstance(ts, str) and len(ts) > 0 assert isinstance(ts, str) and len(ts) > 0
# Must be parseable as a datetime # Must be parseable as a datetime
datetime.fromisoformat(ts) datetime.fromisoformat(ts)
class TestFullyEvolvedFlagCorrection:
    """T3-7: fully_evolved/tier mismatch is corrected by evaluate_card.

    A database corruption where fully_evolved=True but current_tier < 4 can
    occur if the flag was set incorrectly by a migration or external script.
    evaluate_card must re-derive fully_evolved from the freshly-computed tier
    (after the no-regression max() is applied), not trust the stored flag.
    """

    def test_fully_evolved_flag_corrected_when_tier_below_4(self, batter_track):
        """fully_evolved=True with current_tier=3 is corrected to False after evaluation.

        What: Manually set database state to fully_evolved=True, current_tier=3
        (a corruption scenario -- tier 3 cannot be "fully evolved" since T4 is
        the maximum tier). Provide stats that compute to a value in the T3
        range (value=500, which is >= T3=448 but < T4=896).

        After evaluate_card:
          - computed value = 500 -> new_tier = 3
          - no-regression: max(current_tier=3, new_tier=3) = 3 -> tier stays 3
          - fully_evolved = (3 >= 4) = False -> flag is corrected

        Why: The evaluator always recomputes fully_evolved from the final
        current_tier rather than preserving the stored flag. This ensures
        that a corrupted fully_evolved=True at tier<4 is silently repaired
        on the next evaluation without requiring a separate migration.
        """
        # Inject corruption: fully_evolved=True but tier=3.
        state = CardStateStub.create(
            player_id=1,
            team_id=1,
            track=batter_track,
            current_tier=3,
            current_value=500.0,
            fully_evolved=True,  # intentionally wrong
            last_evaluated_at=None,
        )
        # Stats that compute to value=500: pa=500, no hits -> value=500+0=500.
        # T3 threshold=448, T4 threshold=896 -> tier=3, NOT 4.
        _make_stats(1, 1, 1, pa=500)
        result = _eval(1, 1)
        assert result["current_tier"] == 3, (
            f"Expected tier=3 after evaluation with value=500, got {result['current_tier']}"
        )
        assert result["fully_evolved"] is False, (
            "fully_evolved should have been corrected to False for tier=3, "
            f"got {result['fully_evolved']}"
        )
        # Confirm the database row was updated (not just the return dict).
        state_reloaded = CardStateStub.get_by_id(state.id)
        assert state_reloaded.fully_evolved is False, (
            "fully_evolved was not persisted as False after correction"
        )

    def test_fully_evolved_flag_preserved_when_tier_reaches_4(self, batter_track):
        """fully_evolved=True with current_tier=3 stays True when new stats push to T4.

        What: Same corruption setup as above (fully_evolved=True, tier=3),
        but now provide stats with value=900 (>= T4=896).

        After evaluate_card:
          - computed value = 900 -> new_tier = 4
          - no-regression: max(current_tier=3, new_tier=4) = 4 -> advances to 4
          - fully_evolved = (4 >= 4) = True -> flag stays True (correctly)

        Why: Confirms the evaluator correctly sets fully_evolved=True when
        the re-computed tier legitimately reaches T4 -- regardless of whether
        the stored flag was already True before evaluation.
        """
        CardStateStub.create(
            player_id=1,
            team_id=1,
            track=batter_track,
            current_tier=3,
            current_value=500.0,
            fully_evolved=True,  # stored flag (will be re-derived)
            last_evaluated_at=None,
        )
        # pa=900 -> value=900 >= T4=896 -> new_tier=4
        _make_stats(1, 1, 1, pa=900)
        result = _eval(1, 1)
        assert result["current_tier"] == 4, (
            f"Expected tier=4 for value=900, got {result['current_tier']}"
        )
        assert result["fully_evolved"] is True, (
            f"Expected fully_evolved=True for tier=4, got {result['fully_evolved']}"
        )
class TestMultiTeamStatIsolation:
    """T3-8: A player's refractor value is isolated to a specific team's stats.

    The evaluator queries BattingSeasonStats WHERE player_id=? AND team_id=?.
    When a player has stats on two different teams in the same season, each
    team's RefractorCardState must reflect only that team's stats -- not a
    combined total.
    """

    def test_multi_team_same_season_stats_isolated(self, batter_track):
        """Each team's refractor value reflects only that team's stats, not combined.

        What: Create one player with BattingSeasonStats on team_id=1 (pa=80)
        and team_id=2 (pa=120) in the same season. Create a RefractorCardState
        for each team. Evaluate each team's card separately and verify:
          - Team 1 state: value = 80 -> tier = T1 (80 >= T1=37, < T2=149)
          - Team 2 state: value = 120 -> tier = T1 (120 >= T1=37, < T2=149)
          - Neither value equals the combined total (80+120=200 would be T2)

        Why: Confirms the `WHERE player_id=? AND team_id=?` filter in the
        evaluator is correctly applied. Without proper team isolation, the
        combined total of 200 would cross the T2 threshold (149) and both
        states would be incorrectly assigned to T2. This is a critical
        correctness requirement: a player traded between teams should have
        separate refractor progressions for their time with each franchise.
        """
        # Stats on team 1: pa=80 -> value=80 (T1: 37 <= 80 < 149)
        _make_stats(player_id=1, team_id=1, season=11, pa=80)
        # Stats on team 2: pa=120 -> value=120 (T1: 37 <= 120 < 149)
        _make_stats(player_id=1, team_id=2, season=11, pa=120)
        # Combined pa would be 200 -> value=200 -> T2 (149 <= 200 < 448);
        # each team must see only its own stats, never 200.
        _make_state(player_id=1, team_id=1, track=batter_track)
        _make_state(player_id=1, team_id=2, track=batter_track)
        result_team1 = _eval(player_id=1, team_id=1)
        result_team2 = _eval(player_id=1, team_id=2)
        # Team 1: only pa=80 counted -> value=80 -> T1
        assert result_team1["current_value"] == 80.0, (
            f"Team 1 value should be 80.0 (its own stats only), "
            f"got {result_team1['current_value']}"
        )
        assert result_team1["current_tier"] == 1, (
            f"Team 1 tier should be T1 for value=80, got {result_team1['current_tier']}"
        )
        # Team 2: only pa=120 counted -> value=120 -> T1
        assert result_team2["current_value"] == 120.0, (
            f"Team 2 value should be 120.0 (its own stats only), "
            f"got {result_team2['current_value']}"
        )
        assert result_team2["current_tier"] == 1, (
            f"Team 2 tier should be T1 for value=120, got {result_team2['current_tier']}"
        )
        # Sanity: neither team crossed T2 (which would happen if stats were combined).
        assert (
            result_team1["current_tier"] != 2 and result_team2["current_tier"] != 2
        ), (
            "At least one team was incorrectly assigned T2 — stats may have been combined"
        )

    def test_multi_team_different_seasons_isolated(self, batter_track):
        """Stats for the same player across multiple seasons remain per-team isolated.

        What: Same player with two seasons of stats for each of two teams:
          - team_id=1: season 10 pa=90, season 11 pa=70 -> combined=160
          - team_id=2: season 10 pa=100, season 11 pa=80 -> combined=180
        After evaluation:
          - Team 1: value=160 -> T2 (149 <= 160 < 448)
          - Team 2: value=180 -> T2 (149 <= 180 < 448)
        The test confirms that cross-team season aggregation does not bleed
        stats from team 2 into team 1's calculation or vice versa.

        Why: Multi-season aggregation and multi-team isolation must work
        together. A bug that incorrectly sums all player stats regardless
        of team would produce combined values of 340 -> T2, which would
        coincidentally pass a tier-only check, but the per-team values and
        tiers would be wrong. This test uses values where cross-contamination
        would produce a materially different value (340 vs 160/180), catching
        that class of bug.
        """
        # Team 1 stats: total pa=160 -> value=160 -> T2
        _make_stats(player_id=1, team_id=1, season=10, pa=90)
        _make_stats(player_id=1, team_id=1, season=11, pa=70)
        # Team 2 stats: total pa=180 -> value=180 -> T2
        _make_stats(player_id=1, team_id=2, season=10, pa=100)
        _make_stats(player_id=1, team_id=2, season=11, pa=80)
        _make_state(player_id=1, team_id=1, track=batter_track)
        _make_state(player_id=1, team_id=2, track=batter_track)
        result_team1 = _eval(player_id=1, team_id=1)
        result_team2 = _eval(player_id=1, team_id=2)
        assert result_team1["current_value"] == 160.0, (
            f"Team 1 multi-season value should be 160.0, got {result_team1['current_value']}"
        )
        assert result_team1["current_tier"] == 2, (
            f"Team 1 tier should be T2 for value=160, got {result_team1['current_tier']}"
        )
        assert result_team2["current_value"] == 180.0, (
            f"Team 2 multi-season value should be 180.0, got {result_team2['current_value']}"
        )
        assert result_team2["current_tier"] == 2, (
            f"Team 2 tier should be T2 for value=180, got {result_team2['current_tier']}"
        )

View File

@ -158,6 +158,50 @@ class TestDetermineCardType:
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
class TestDetermineCardTypeEdgeCases:
    """T2-2: boundary inputs for _determine_card_type.

    Exercises the edge cases from the PO review: plain batter positions
    (DH, C, 2B), the empty string and None (both fall through to the batter
    default), and the compound 'SP/RP', which contains both 'SP' and 'RP'
    substrings. Because the function tests for 'SP' before 'RP'/'CP',
    'SP/RP' resolves to 'sp'.
    """

    @pytest.mark.parametrize(
        "pos_1, expected",
        [
            # Plain batter positions
            ("DH", "batter"),
            ("C", "batter"),
            ("2B", "batter"),
            # Empty / None — fall through to batter default
            ("", "batter"),
            (None, "batter"),
            # Compound string containing 'SP' first — must resolve to 'sp'
            # because _determine_card_type checks "SP" in pos.upper() before RP/CP
            ("SP/RP", "sp"),
        ],
    )
    def test_position_mapping(self, pos_1, expected):
        """Each pos_1 value maps to its expected card_type string.

        None survives because the implementation guards with
        `(player.pos_1 or "").upper()`, so it defaults to 'batter'. The
        card_type string is the RefractorTrack lookup key: a wrong mapping
        silently assigns the wrong thresholds for a player's entire refractor
        journey. Parametrization keeps each edge case an independently
        reported failure.
        """
        fake = _FakePlayer(pos_1)
        assert _determine_card_type(fake) == expected, (
            f"pos_1={pos_1!r}: expected {expected!r}, "
            f"got {_determine_card_type(fake)!r}"
        )
class TestInitializeCardEvolution: class TestInitializeCardEvolution:
"""Integration tests for initialize_card_refractor against in-memory SQLite. """Integration tests for initialize_card_refractor against in-memory SQLite.

View File

@ -124,6 +124,89 @@ def test_seed_idempotent():
assert RefractorTrack.select().count() == 3 assert RefractorTrack.select().count() == 3
# ---------------------------------------------------------------------------
# T1-4: Seed threshold ordering invariant (t1 < t2 < t3 < t4 + all positive)
# ---------------------------------------------------------------------------
def test_seed_all_thresholds_strictly_ascending_after_seed():
    """Invariant guard: every seeded track has positive, strictly ascending thresholds.

    The tier engine partitions card values with these four numbers, so a
    zero or out-of-order threshold makes tier assignment incorrect or
    undefined. Run the seeder, then demand positivity and the full
    t1 < t2 < t3 < t4 chain for every row, so a bad JSON edit fails loudly
    before any cards are affected. (Complements the earlier
    test_seed_thresholds_ascending with combined ordering + positivity
    checks and descriptive failure messages.)
    """
    seed_refractor_tracks()
    for track in RefractorTrack.select():
        named_thresholds = (
            ("t1_threshold", track.t1_threshold),
            ("t2_threshold", track.t2_threshold),
            ("t3_threshold", track.t3_threshold),
            ("t4_threshold", track.t4_threshold),
        )
        for label, value in named_thresholds:
            assert value > 0, f"{track.name}: {label}={value} is not positive"
        assert (
            track.t1_threshold
            < track.t2_threshold
            < track.t3_threshold
            < track.t4_threshold
        ), (
            f"{track.name}: thresholds are not strictly ascending: "
            f"t1={track.t1_threshold}, t2={track.t2_threshold}, "
            f"t3={track.t3_threshold}, t4={track.t4_threshold}"
        )
# ---------------------------------------------------------------------------
# T2-10: Duplicate card_type tracks guard
# ---------------------------------------------------------------------------
def test_seed_each_card_type_has_exactly_one_track():
    """Uniqueness contract: no card_type may own more than one RefractorTrack row.

    Tracks are fetched by card_type (e.g. RefractorTrack.get(card_type='batter')).
    A duplicate card_type makes Peewee's .get() raise MultipleObjectsReturned,
    crashing every pack opening and card evaluation for that type. Count
    occurrences per card_type after seeding and demand exactly one row each,
    so seed bugs or accidental DB drift surface immediately.
    """
    seed_refractor_tracks()
    occurrences = {}
    for row in RefractorTrack.select(RefractorTrack.card_type):
        occurrences[row.card_type] = occurrences.get(row.card_type, 0) + 1
    for card_type, count in occurrences.items():
        assert count == 1, (
            f"card_type={card_type!r} has {count} tracks; expected exactly 1"
        )
def test_seed_updates_on_rerun(json_tracks): def test_seed_updates_on_rerun(json_tracks):
"""A second seed call must restore any manually changed threshold to the JSON value. """A second seed call must restore any manually changed threshold to the JSON value.

View File

@ -34,12 +34,47 @@ Test matrix
test_get_card_404_no_state -- card with no RefractorCardState returns 404 test_get_card_404_no_state -- card with no RefractorCardState returns 404
test_duplicate_cards_share_state -- two cards same player+team return the same state row test_duplicate_cards_share_state -- two cards same player+team return the same state row
test_auth_required -- missing token returns 401 on both endpoints test_auth_required -- missing token returns 401 on both endpoints
Tier 3 tests (T3-6) use a SQLite-backed TestClient and run without a PostgreSQL
connection. They test GET /api/v2/refractor/cards/{card_id} when the state row
has last_evaluated_at=None (card initialised but never evaluated).
test_get_card_state_last_evaluated_at_null -- last_evaluated_at: null in response
""" """
import os import os
os.environ.setdefault("API_TOKEN", "test-token")
import pytest import pytest
from fastapi import FastAPI, Request
from fastapi.testclient import TestClient from fastapi.testclient import TestClient
from peewee import SqliteDatabase
from app.db_engine import (
BattingSeasonStats,
Card,
Cardset,
Decision,
Event,
MlbPlayer,
Pack,
PackType,
PitchingSeasonStats,
Player,
ProcessedGame,
Rarity,
RefractorCardState,
RefractorCosmetic,
RefractorTierBoost,
RefractorTrack,
Roster,
RosterSlot,
ScoutClaim,
ScoutOpportunity,
StratGame,
StratPlay,
Team,
)
POSTGRES_HOST = os.environ.get("POSTGRES_HOST") POSTGRES_HOST = os.environ.get("POSTGRES_HOST")
_skip_no_pg = pytest.mark.skipif( _skip_no_pg = pytest.mark.skipif(
@ -607,3 +642,317 @@ def test_auth_required(client, seeded_data):
resp_card = client.get(f"/api/v2/refractor/cards/{card_id}") resp_card = client.get(f"/api/v2/refractor/cards/{card_id}")
assert resp_card.status_code == 401 assert resp_card.status_code == 401
# ===========================================================================
# SQLite-backed tests for T2-4, T2-5, T2-6, T3-6
#
# These tests use the same shared-memory SQLite pattern as test_postgame_refractor
# so they run without a PostgreSQL connection. They test the
# GET /api/v2/teams/{team_id}/refractors, POST /refractor/cards/{card_id}/evaluate,
# and GET /api/v2/refractor/cards/{card_id} endpoints in isolation.
# ===========================================================================
# Shared-memory SQLite database for the state-API tests.  The URI form with
# cache=shared lets every connection opened in this process see the same
# in-memory tables; the foreign_keys pragma turns on FK enforcement in SQLite.
_state_api_db = SqliteDatabase(
    "file:stateapitest?mode=memory&cache=shared",
    uri=True,
    pragmas={"foreign_keys": 1},
)
# Every model the SQLite-backed state-API tests touch.  Bound and created
# together in setup_state_api_db; teardown drops them in reverse list order.
_STATE_API_MODELS = [
    Rarity,
    Event,
    Cardset,
    MlbPlayer,
    Player,
    Team,
    PackType,
    Pack,
    Card,
    Roster,
    RosterSlot,
    StratGame,
    StratPlay,
    Decision,
    ScoutOpportunity,
    ScoutClaim,
    BattingSeasonStats,
    PitchingSeasonStats,
    ProcessedGame,
    RefractorTrack,
    RefractorCardState,
    RefractorTierBoost,
    RefractorCosmetic,
]
@pytest.fixture(autouse=False)
def setup_state_api_db():
    """Bind state-api test models to shared-memory SQLite and create tables.

    Not autouse — only the SQLite-backed tests in this section depend on it.
    Yields the database handle; teardown drops the tables (in reverse of the
    creation list) so each run starts from a clean schema.
    """
    _state_api_db.bind(_STATE_API_MODELS)
    _state_api_db.connect(reuse_if_open=True)
    _state_api_db.create_tables(_STATE_API_MODELS)
    yield _state_api_db
    _state_api_db.drop_tables(list(reversed(_STATE_API_MODELS)), safe=True)
def _build_state_api_app() -> FastAPI:
    """Build a minimal FastAPI app wiring the teams and refractor routers.

    Used only by the SQLite-backed state-API tests; the middleware ensures
    every request runs with an open connection to the shared-memory database.
    """
    from app.routers_v2.teams import router as teams_router
    from app.routers_v2.refractor import router as refractor_router

    api = FastAPI()

    @api.middleware("http")
    async def _ensure_db_connection(request: Request, call_next):
        # reuse_if_open makes this a no-op when the connection is already warm.
        _state_api_db.connect(reuse_if_open=True)
        return await call_next(request)

    for sub_router in (teams_router, refractor_router):
        api.include_router(sub_router)
    return api
@pytest.fixture
def state_api_client(setup_state_api_db):
    """Yield a TestClient bound to the SQLite-backed state-API app."""
    app = _build_state_api_app()
    with TestClient(app) as test_client:
        yield test_client
# ---------------------------------------------------------------------------
# Helper factories for SQLite-backed tests
# ---------------------------------------------------------------------------
def _sa_make_rarity():
    """Return the shared SA_Common rarity row, creating it on first use."""
    rarity, _created = Rarity.get_or_create(
        value=50,
        name="SA_Common",
        defaults={"color": "#aabbcc"},
    )
    return rarity
def _sa_make_cardset():
    """Return the shared "SA Test Set" cardset row, creating it on first use."""
    cardset, _created = Cardset.get_or_create(
        name="SA Test Set",
        defaults={"description": "state api test", "total_cards": 10},
    )
    return cardset
def _sa_make_team(abbrev: str, gmid: int) -> Team:
    """Create and return a fresh Team row keyed by *abbrev* and *gmid*."""
    team_fields = dict(
        abbrev=abbrev,
        sname=abbrev,
        lname=f"Team {abbrev}",
        gmid=gmid,
        gmname=f"gm_{abbrev.lower()}",
        gsheet="https://docs.google.com/sa_test",
        wallet=500,
        team_value=1000,
        collection_value=1000,
        season=11,
        is_ai=False,
    )
    return Team.create(**team_fields)
def _sa_make_player(name: str, pos: str = "1B") -> Player:
    """Create a Player row named *name* at position *pos* for the SA tests."""
    player_fields = dict(
        p_name=name,
        rarity=_sa_make_rarity(),
        cardset=_sa_make_cardset(),
        set_num=1,
        pos_1=pos,
        image="https://example.com/sa.png",
        mlbclub="TST",
        franchise="TST",
        description=f"sa test: {name}",
    )
    return Player.create(**player_fields)
def _sa_make_track(card_type: str = "batter") -> RefractorTrack:
    """Get or create the shared refractor track for *card_type*."""
    row, _created = RefractorTrack.get_or_create(
        name=f"SA {card_type} Track",
        defaults={
            "card_type": card_type,
            "formula": "pa + tb * 2",
            "t1_threshold": 37,
            "t2_threshold": 149,
            "t3_threshold": 448,
            "t4_threshold": 896,
        },
    )
    return row
def _sa_make_pack(team: Team) -> Pack:
    """Create a Pack for *team*, reusing the shared "SA PackType" row."""
    pack_type, _created = PackType.get_or_create(
        name="SA PackType",
        defaults={
            "cost": 100,
            "card_count": 5,
            "description": "sa test pack type",
        },
    )
    return Pack.create(team=team, pack_type=pack_type)
def _sa_make_card(player: Player, team: Team) -> Card:
    """Create a Card for *player* owned by *team*, backed by a fresh pack."""
    return Card.create(
        player=player,
        team=team,
        pack=_sa_make_pack(team),
        value=0,
    )
def _sa_make_state(player, team, track, current_tier=0, current_value=0.0):
    """Create a RefractorCardState that has never been through the evaluator.

    last_evaluated_at is always None and fully_evolved is always False;
    tier/value default to the freshly-initialised T0 state but may be
    overridden by the caller.
    """
    state_fields = dict(
        player=player,
        team=team,
        track=track,
        current_tier=current_tier,
        current_value=current_value,
        fully_evolved=False,
        last_evaluated_at=None,
    )
    return RefractorCardState.create(**state_fields)
# ---------------------------------------------------------------------------
# T2-4: GET /teams/{valid_team_id}/refractors — team exists, zero states
# ---------------------------------------------------------------------------
def test_team_refractors_zero_states(setup_state_api_db, state_api_client):
    """GET /teams/{id}/refractors returns an empty envelope for a stateless team.

    What: Create a Team with no associated RefractorCardState rows, call the
    endpoint, and verify the response is {"count": 0, "items": []}.
    Why: The endpoint uses a JOIN from RefractorCardState to RefractorTrack
    filtered by team_id. If the WHERE produces no rows, the correct response
    is an empty list with count=0, not a 404 or 500. This is the base case
    for a newly-created team that hasn't opened any packs yet.
    """
    fresh_team = _sa_make_team("SA4", gmid=30041)

    url = f"/api/v2/teams/{fresh_team.id}/refractors"
    response = state_api_client.get(url, headers=AUTH_HEADER)

    assert response.status_code == 200
    payload = response.json()
    assert payload["count"] == 0
    assert payload["items"] == []
# ---------------------------------------------------------------------------
# T2-5: GET /teams/99999/refractors — non-existent team
# ---------------------------------------------------------------------------
def test_team_refractors_nonexistent_team(setup_state_api_db, state_api_client):
    """GET /teams/99999/refractors for a missing team returns an empty 200.

    What: Call the endpoint with a team_id (99999) that has no Team row and
    no RefractorCardState rows.
    Why: Documents the confirmed behaviour: 200 with {"count": 0, "items": []}.
    The endpoint queries RefractorCardState WHERE team_id=99999; because no
    state rows reference that team, the result is an empty list. The endpoint
    does NOT validate that the Team row itself exists, so it does not return
    404. If the implementation is ever changed to validate team existence and
    return 404 for missing teams, this test will fail and surface the
    contract change.
    """
    response = state_api_client.get(
        "/api/v2/teams/99999/refractors", headers=AUTH_HEADER
    )

    assert response.status_code == 200
    payload = response.json()
    # No state rows reference team 99999, so the envelope is empty.
    assert payload["count"] == 0
    assert payload["items"] == []
# ---------------------------------------------------------------------------
# T2-6: POST /refractor/cards/{card_id}/evaluate — zero season stats → T0
# ---------------------------------------------------------------------------
def test_evaluate_card_zero_stats_stays_t0(setup_state_api_db, state_api_client):
    """POST /cards/{card_id}/evaluate for a card with no season stats stays at T0.

    What: Create a Player, Team, Card, and RefractorCardState but no
    BattingSeasonStats rows for that player+team, then call the evaluate
    endpoint. The response must show current_tier=0 and current_value=0.0.
    Why: A player who has never appeared in a game has zero career stats.
    The evaluator sums all stats rows (none) -> all-zero totals ->
    compute_batter_value(zeros) = 0.0 -> tier_from_value(0.0) = T0.
    Verifies the happy-path zero-stats case returns a valid response rather
    than crashing on an empty aggregation.
    """
    owner = _sa_make_team("SA6", gmid=30061)
    batter = _sa_make_player("SA6 Batter", pos="1B")
    batter_track = _sa_make_track("batter")
    card = _sa_make_card(batter, owner)
    _sa_make_state(batter, owner, batter_track, current_tier=0, current_value=0.0)
    # Deliberately create zero BattingSeasonStats rows for this player+team.

    response = state_api_client.post(
        f"/api/v2/refractor/cards/{card.id}/evaluate", headers=AUTH_HEADER
    )

    assert response.status_code == 200
    payload = response.json()
    assert payload["current_tier"] == 0
    assert payload["current_value"] == 0.0
# ---------------------------------------------------------------------------
# T3-6: GET /refractor/cards/{card_id} — last_evaluated_at is None
# ---------------------------------------------------------------------------
def test_get_card_state_last_evaluated_at_null(setup_state_api_db, state_api_client):
    """GET /refractor/cards/{card_id} returns last_evaluated_at: null for un-evaluated card.

    What: Create a Player, Team, Card, and RefractorCardState where
    last_evaluated_at is None (the state was initialised via a
    pack-open hook but has never been through the evaluator). Call
    GET /api/v2/refractor/cards/{card_id} and verify:
    - The response status is 200 (not a 500 crash from calling .isoformat() on None).
    - The response body contains the key 'last_evaluated_at'.
    - The value of 'last_evaluated_at' is JSON null (Python None after parsing).
    Why: The _build_card_state_response helper serialises last_evaluated_at
    with `state.last_evaluated_at.isoformat() if state.last_evaluated_at else None`.
    This test confirms that the None branch is exercised and the field is always
    present in the response envelope. Callers must be able to distinguish
    "never evaluated" (null) from a real ISO-8601 timestamp, and the API must
    not crash on a newly-created card that has not yet been evaluated.
    """
    team = _sa_make_team("SA_T36", gmid=30360)
    player = _sa_make_player("T36 Batter", pos="1B")
    track = _sa_make_track("batter")
    card = _sa_make_card(player, team)
    # Use the shared factory instead of duplicating RefractorCardState.create
    # inline: it creates the state with last_evaluated_at=None, simulating a
    # freshly initialised card that has not yet been through the evaluator.
    _sa_make_state(player, team, track, current_tier=0, current_value=0.0)
    resp = state_api_client.get(
        f"/api/v2/refractor/cards/{card.id}", headers=AUTH_HEADER
    )
    assert resp.status_code == 200, f"Expected 200, got {resp.status_code}: {resp.text}"
    data = resp.json()
    # 'last_evaluated_at' must be present as a key even when the value is null
    assert "last_evaluated_at" in data, (
        "Response is missing the 'last_evaluated_at' key"
    )
    assert data["last_evaluated_at"] is None, (
        f"Expected last_evaluated_at=null for un-evaluated card, "
        f"got {data['last_evaluated_at']!r}"
    )

View File

@ -11,12 +11,47 @@ Tests auto-skip when POSTGRES_HOST is not set.
Test data is inserted via psycopg2 before the test module runs and deleted Test data is inserted via psycopg2 before the test module runs and deleted
afterwards so the tests are repeatable. ON CONFLICT keeps the table clean afterwards so the tests are repeatable. ON CONFLICT keeps the table clean
even if a previous run did not complete teardown. even if a previous run did not complete teardown.
Tier 3 tests (T3-1) in this file use a SQLite-backed TestClient so they run
without a PostgreSQL connection. They test the card_type filter edge cases:
an unrecognised card_type string and an empty string should both return an
empty list (200 with count=0) rather than an error.
""" """
import os import os
import pytest import pytest
from fastapi import FastAPI, Request
from fastapi.testclient import TestClient from fastapi.testclient import TestClient
from peewee import SqliteDatabase
os.environ.setdefault("API_TOKEN", "test-token")
from app.db_engine import ( # noqa: E402
BattingSeasonStats,
Card,
Cardset,
Decision,
Event,
MlbPlayer,
Pack,
PackType,
PitchingSeasonStats,
Player,
ProcessedGame,
Rarity,
RefractorCardState,
RefractorCosmetic,
RefractorTierBoost,
RefractorTrack,
Roster,
RosterSlot,
ScoutClaim,
ScoutOpportunity,
StratGame,
StratPlay,
Team,
)
POSTGRES_HOST = os.environ.get("POSTGRES_HOST") POSTGRES_HOST = os.environ.get("POSTGRES_HOST")
_skip_no_pg = pytest.mark.skipif( _skip_no_pg = pytest.mark.skipif(
@ -130,3 +165,172 @@ def test_auth_required(client, seeded_tracks):
track_id = seeded_tracks[0] track_id = seeded_tracks[0]
resp_single = client.get(f"/api/v2/refractor/tracks/{track_id}") resp_single = client.get(f"/api/v2/refractor/tracks/{track_id}")
assert resp_single.status_code == 401 assert resp_single.status_code == 401
# ===========================================================================
# SQLite-backed tests for T3-1: invalid card_type query parameter
#
# These tests run without a PostgreSQL connection. They verify that the
# card_type filter on GET /api/v2/refractor/tracks handles values that match
# no known track (an unrecognised string, an empty string) gracefully: the
# endpoint must return 200 with {"count": 0, "items": []}, not a 4xx/5xx.
# ===========================================================================
# Shared-memory SQLite database for the T3-1 track-filter tests.  The URI form
# with cache=shared lets every connection opened in this process see the same
# in-memory tables; the foreign_keys pragma turns on FK enforcement in SQLite.
_track_api_db = SqliteDatabase(
    "file:trackapitest?mode=memory&cache=shared",
    uri=True,
    pragmas={"foreign_keys": 1},
)
# Every model the SQLite-backed T3-1 tests touch.  Bound and created together
# in setup_track_api_db; teardown drops them in reverse list order.
_TRACK_API_MODELS = [
    Rarity,
    Event,
    Cardset,
    MlbPlayer,
    Player,
    Team,
    PackType,
    Pack,
    Card,
    Roster,
    RosterSlot,
    StratGame,
    StratPlay,
    Decision,
    ScoutOpportunity,
    ScoutClaim,
    BattingSeasonStats,
    PitchingSeasonStats,
    ProcessedGame,
    RefractorTrack,
    RefractorCardState,
    RefractorTierBoost,
    RefractorCosmetic,
]
@pytest.fixture(autouse=False)
def setup_track_api_db():
    """Bind track-API test models to shared-memory SQLite and create tables.

    Inserts exactly two tracks (batter, sp) so the filter tests have a
    non-empty table to query against — confirming that the WHERE predicate
    excludes them rather than the table simply being empty.
    """
    _track_api_db.bind(_TRACK_API_MODELS)
    _track_api_db.connect(reuse_if_open=True)
    _track_api_db.create_tables(_TRACK_API_MODELS)
    # Seed two real tracks so the table is not empty.
    seed_tracks = (
        (
            "T3-1 Batter Track",
            {
                "card_type": "batter",
                "formula": "pa + tb * 2",
                "t1_threshold": 37,
                "t2_threshold": 149,
                "t3_threshold": 448,
                "t4_threshold": 896,
            },
        ),
        (
            "T3-1 SP Track",
            {
                "card_type": "sp",
                "formula": "ip + k",
                "t1_threshold": 10,
                "t2_threshold": 40,
                "t3_threshold": 120,
                "t4_threshold": 240,
            },
        ),
    )
    for track_name, track_defaults in seed_tracks:
        RefractorTrack.get_or_create(name=track_name, defaults=track_defaults)
    yield _track_api_db
    _track_api_db.drop_tables(list(reversed(_TRACK_API_MODELS)), safe=True)
def _build_track_api_app() -> FastAPI:
    """Build a FastAPI app exposing only the refractor router (T3-1 tests).

    The middleware ensures every request runs with an open connection to the
    shared-memory SQLite database.
    """
    from app.routers_v2.refractor import router as refractor_router

    api = FastAPI()

    @api.middleware("http")
    async def _ensure_db_connection(request: Request, call_next):
        # reuse_if_open makes this a no-op when the connection is already warm.
        _track_api_db.connect(reuse_if_open=True)
        return await call_next(request)

    api.include_router(refractor_router)
    return api
@pytest.fixture
def track_api_client(setup_track_api_db):
    """Yield a TestClient for the SQLite-backed T3-1 track filter tests."""
    app = _build_track_api_app()
    with TestClient(app) as test_client:
        yield test_client
# ---------------------------------------------------------------------------
# T3-1a: card_type=foo (unrecognised value) returns empty list
# ---------------------------------------------------------------------------
def test_invalid_card_type_returns_empty_list(setup_track_api_db, track_api_client):
    """GET /tracks?card_type=foo returns 200 with count=0, not a 4xx/5xx.

    What: Query the track list with a card_type value ('foo') that matches no
    row in refractor_track. The table contains batter and sp tracks, so the
    result must be an empty list rather than a full list (which would
    indicate the filter was ignored).
    Why: The endpoint applies `WHERE card_type == card_type` when the
    parameter is not None. An unrecognised value is a valid no-match query —
    the contract is an empty list, not a validation error. Returning a 422
    Unprocessable Entity or 500 here would break clients that probe for
    tracks by card type before knowing which types are registered.
    """
    response = track_api_client.get(
        "/api/v2/refractor/tracks?card_type=foo", headers=AUTH_HEADER
    )
    assert response.status_code == 200

    data = response.json()
    assert data["count"] == 0, (
        f"Expected count=0 for unknown card_type 'foo', got {data['count']}"
    )
    assert data["items"] == [], (
        f"Expected empty items list for unknown card_type 'foo', got {data['items']}"
    )
# ---------------------------------------------------------------------------
# T3-1b: card_type= (empty string) returns empty list
# ---------------------------------------------------------------------------
def test_empty_string_card_type_returns_empty_list(
    setup_track_api_db, track_api_client
):
    """GET /tracks?card_type= (empty string) returns 200 with count=0.

    What: Pass an empty string as the card_type query parameter. No track
    has card_type='' so the response must be an empty list with count=0.
    Why: An empty string is not None — FastAPI will pass it through as ''
    rather than treating it as an absent parameter. The WHERE predicate
    `card_type == ''` produces no matches, which is the correct silent
    no-results behaviour. This guards against regressions where an empty
    string might be mishandled as a None/absent value and accidentally
    return all tracks, or raise a server error.
    """
    response = track_api_client.get(
        "/api/v2/refractor/tracks?card_type=", headers=AUTH_HEADER
    )
    assert response.status_code == 200

    data = response.json()
    assert data["count"] == 0, (
        f"Expected count=0 for empty card_type string, got {data['count']}"
    )
    assert data["items"] == [], (
        f"Expected empty items list for empty card_type string, got {data['items']}"
    )