Merge branch 'main' into issue/143-feat-add-limit-pagination-to-paperdex-endpoint

This commit is contained in:
cal 2026-03-25 14:52:41 +00:00
commit 2c077d0fd3
26 changed files with 2413 additions and 495 deletions

View File

@ -1245,6 +1245,13 @@ refractor_card_state_index = ModelIndex(
)
RefractorCardState.add_index(refractor_card_state_index)
refractor_card_state_team_index = ModelIndex(
RefractorCardState,
(RefractorCardState.team,),
unique=False,
)
RefractorCardState.add_index(refractor_card_state_team_index)
class RefractorTierBoost(BaseModel):
track = ForeignKeyField(RefractorTrack)

View File

@ -8,16 +8,13 @@ from ..db_engine import Award, model_to_dict, DoesNotExist
from ..dependencies import oauth2_scheme, valid_token, PRIVATE_IN_SCHEMA
router = APIRouter(
prefix='/api/v2/awards',
tags=['awards']
)
router = APIRouter(prefix="/api/v2/awards", tags=["awards"])
class AwardModel(pydantic.BaseModel):
name: str
season: int
timing: str = 'In-Season'
timing: str = "In-Season"
card_id: Optional[int] = None
team_id: Optional[int] = None
image: Optional[str] = None
@ -28,15 +25,21 @@ class AwardReturnList(pydantic.BaseModel):
awards: list[AwardModel]
@router.get('')
@router.get("")
async def get_awards(
name: Optional[str] = None, season: Optional[int] = None, timing: Optional[str] = None,
card_id: Optional[int] = None, team_id: Optional[int] = None, image: Optional[str] = None,
csv: Optional[bool] = None):
name: Optional[str] = None,
season: Optional[int] = None,
timing: Optional[str] = None,
card_id: Optional[int] = None,
team_id: Optional[int] = None,
image: Optional[str] = None,
csv: Optional[bool] = None,
limit: int = 100,
):
all_awards = Award.select().order_by(Award.id)
if all_awards.count() == 0:
raise HTTPException(status_code=404, detail=f'There are no awards to filter')
raise HTTPException(status_code=404, detail="There are no awards to filter")
if name is not None:
all_awards = all_awards.where(Award.name == name)
@ -51,53 +54,74 @@ async def get_awards(
if image is not None:
all_awards = all_awards.where(Award.image == image)
limit = max(0, min(limit, 500))
total_count = all_awards.count() if not csv else 0
all_awards = all_awards.limit(limit)
if csv:
data_list = [['id', 'name', 'season', 'timing', 'card', 'team', 'image']]
data_list = [["id", "name", "season", "timing", "card", "team", "image"]]
for line in all_awards:
data_list.append([
line.id, line.name, line.season, line.timing, line.card, line.team, line.image
])
data_list.append(
[
line.id,
line.name,
line.season,
line.timing,
line.card,
line.team,
line.image,
]
)
return_val = DataFrame(data_list).to_csv(header=False, index=False)
return Response(content=return_val, media_type='text/csv')
return Response(content=return_val, media_type="text/csv")
else:
return_val = {'count': all_awards.count(), 'awards': []}
return_val = {"count": total_count, "awards": []}
for x in all_awards:
return_val['awards'].append(model_to_dict(x))
return_val["awards"].append(model_to_dict(x))
return return_val
@router.get('/{award_id}')
@router.get("/{award_id}")
async def get_one_award(award_id, csv: Optional[bool] = None):
try:
this_award = Award.get_by_id(award_id)
except DoesNotExist:
raise HTTPException(status_code=404, detail=f'No award found with id {award_id}')
raise HTTPException(
status_code=404, detail=f"No award found with id {award_id}"
)
if csv:
data_list = [
['id', 'name', 'season', 'timing', 'card', 'team', 'image'],
[this_award.id, this_award.name, this_award.season, this_award.timing, this_award.card,
this_award.team, this_award.image]
["id", "name", "season", "timing", "card", "team", "image"],
[
this_award.id,
this_award.name,
this_award.season,
this_award.timing,
this_award.card,
this_award.team,
this_award.image,
],
]
return_val = DataFrame(data_list).to_csv(header=False, index=False)
return Response(content=return_val, media_type='text/csv')
return Response(content=return_val, media_type="text/csv")
else:
return_val = model_to_dict(this_award)
return return_val
@router.post('', include_in_schema=PRIVATE_IN_SCHEMA)
@router.post("", include_in_schema=PRIVATE_IN_SCHEMA)
async def post_awards(award: AwardModel, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning('Bad Token: [REDACTED]')
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(
status_code=401,
detail='You are not authorized to post awards. This event has been logged.'
detail="You are not authorized to post awards. This event has been logged.",
)
this_award = Award(
@ -106,7 +130,7 @@ async def post_awards(award: AwardModel, token: str = Depends(oauth2_scheme)):
timing=award.season,
card_id=award.card_id,
team_id=award.team_id,
image=award.image
image=award.image,
)
saved = this_award.save()
@ -116,28 +140,30 @@ async def post_awards(award: AwardModel, token: str = Depends(oauth2_scheme)):
else:
raise HTTPException(
status_code=418,
detail='Well slap my ass and call me a teapot; I could not save that roster'
detail="Well slap my ass and call me a teapot; I could not save that roster",
)
@router.delete('/{award_id}', include_in_schema=PRIVATE_IN_SCHEMA)
@router.delete("/{award_id}", include_in_schema=PRIVATE_IN_SCHEMA)
async def delete_award(award_id, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning('Bad Token: [REDACTED]')
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(
status_code=401,
detail='You are not authorized to delete awards. This event has been logged.'
detail="You are not authorized to delete awards. This event has been logged.",
)
try:
this_award = Award.get_by_id(award_id)
except DoesNotExist:
raise HTTPException(status_code=404, detail=f'No award found with id {award_id}')
raise HTTPException(
status_code=404, detail=f"No award found with id {award_id}"
)
count = this_award.delete_instance()
if count == 1:
raise HTTPException(status_code=200, detail=f'Award {award_id} has been deleted')
raise HTTPException(
status_code=200, detail=f"Award {award_id} has been deleted"
)
else:
raise HTTPException(status_code=500, detail=f'Award {award_id} was not deleted')
raise HTTPException(status_code=500, detail=f"Award {award_id} was not deleted")

View File

@ -6,14 +6,20 @@ import logging
import pydantic
from pandas import DataFrame
from ..db_engine import db, BattingStat, model_to_dict, fn, Card, Player, Current, DoesNotExist
from ..db_engine import (
db,
BattingStat,
model_to_dict,
fn,
Card,
Player,
Current,
DoesNotExist,
)
from ..dependencies import oauth2_scheme, valid_token, PRIVATE_IN_SCHEMA
router = APIRouter(
prefix='/api/v2/batstats',
tags=['Pre-Season 7 Batting Stats']
)
router = APIRouter(prefix="/api/v2/batstats", tags=["Pre-Season 7 Batting Stats"])
class BatStat(pydantic.BaseModel):
@ -50,7 +56,7 @@ class BatStat(pydantic.BaseModel):
csc: Optional[int] = 0
week: int
season: int
created: Optional[int] = int(datetime.timestamp(datetime.now())*1000)
created: Optional[int] = int(datetime.timestamp(datetime.now()) * 1000)
game_id: int
@ -63,10 +69,20 @@ class BatStatReturnList(pydantic.BaseModel):
stats: list[BatStat]
@router.get('', response_model=BatStatReturnList)
@router.get("", response_model=BatStatReturnList)
async def get_batstats(
card_id: int = None, player_id: int = None, team_id: int = None, vs_team_id: int = None, week: int = None,
season: int = None, week_start: int = None, week_end: int = None, created: int = None, csv: bool = None):
card_id: int = None,
player_id: int = None,
team_id: int = None,
vs_team_id: int = None,
week: int = None,
season: int = None,
week_start: int = None,
week_end: int = None,
created: int = None,
csv: bool = None,
limit: Optional[int] = 100,
):
all_stats = BattingStat.select().join(Card).join(Player).order_by(BattingStat.id)
if season is not None:
@ -98,41 +114,123 @@ async def get_batstats(
# db.close()
# raise HTTPException(status_code=404, detail=f'No batting stats found')
limit = max(0, min(limit, 500))
total_count = all_stats.count() if not csv else 0
all_stats = all_stats.limit(limit)
if csv:
data_list = [['id', 'card_id', 'player_id', 'cardset', 'team', 'vs_team', 'pos', 'pa', 'ab', 'run', 'hit', 'rbi', 'double',
'triple', 'hr', 'bb', 'so', 'hbp', 'sac', 'ibb', 'gidp', 'sb', 'cs', 'bphr', 'bpfo', 'bp1b',
'bplo', 'xch', 'xhit', 'error', 'pb', 'sbc', 'csc', 'week', 'season', 'created', 'game_id', 'roster_num']]
data_list = [
[
"id",
"card_id",
"player_id",
"cardset",
"team",
"vs_team",
"pos",
"pa",
"ab",
"run",
"hit",
"rbi",
"double",
"triple",
"hr",
"bb",
"so",
"hbp",
"sac",
"ibb",
"gidp",
"sb",
"cs",
"bphr",
"bpfo",
"bp1b",
"bplo",
"xch",
"xhit",
"error",
"pb",
"sbc",
"csc",
"week",
"season",
"created",
"game_id",
"roster_num",
]
]
for line in all_stats:
data_list.append(
[
line.id, line.card.id, line.card.player.player_id, line.card.player.cardset.name, line.team.abbrev, line.vs_team.abbrev,
line.pos, line.pa, line.ab, line.run, line.hit, line.rbi, line.double, line.triple, line.hr,
line.bb, line.so, line.hbp, line.sac, line.ibb, line.gidp, line.sb, line.cs, line.bphr, line.bpfo,
line.bp1b, line.bplo, line.xch, line.xhit, line.error, line.pb, line.sbc, line.csc, line.week,
line.season, line.created, line.game_id, line.roster_num
line.id,
line.card.id,
line.card.player.player_id,
line.card.player.cardset.name,
line.team.abbrev,
line.vs_team.abbrev,
line.pos,
line.pa,
line.ab,
line.run,
line.hit,
line.rbi,
line.double,
line.triple,
line.hr,
line.bb,
line.so,
line.hbp,
line.sac,
line.ibb,
line.gidp,
line.sb,
line.cs,
line.bphr,
line.bpfo,
line.bp1b,
line.bplo,
line.xch,
line.xhit,
line.error,
line.pb,
line.sbc,
line.csc,
line.week,
line.season,
line.created,
line.game_id,
line.roster_num,
]
)
return_val = DataFrame(data_list).to_csv(header=False, index=False)
return Response(content=return_val, media_type='text/csv')
return Response(content=return_val, media_type="text/csv")
else:
return_val = {'count': all_stats.count(), 'stats': []}
return_val = {"count": total_count, "stats": []}
for x in all_stats:
return_val['stats'].append(model_to_dict(x, recurse=False))
return_val["stats"].append(model_to_dict(x, recurse=False))
return return_val
@router.get('/player/{player_id}', response_model=BatStat)
@router.get("/player/{player_id}", response_model=BatStat)
async def get_player_stats(
player_id: int, team_id: int = None, vs_team_id: int = None, week_start: int = None, week_end: int = None,
csv: bool = None):
all_stats = (BattingStat
.select(fn.COUNT(BattingStat.created).alias('game_count'))
.join(Card)
.group_by(BattingStat.card)
.where(BattingStat.card.player == player_id)).scalar()
player_id: int,
team_id: int = None,
vs_team_id: int = None,
week_start: int = None,
week_end: int = None,
csv: bool = None,
):
all_stats = (
BattingStat.select(fn.COUNT(BattingStat.created).alias("game_count"))
.join(Card)
.group_by(BattingStat.card)
.where(BattingStat.card.player == player_id)
).scalar()
if team_id is not None:
all_stats = all_stats.where(BattingStat.team_id == team_id)
@ -146,37 +244,82 @@ async def get_player_stats(
if csv:
data_list = [
[
'pa', 'ab', 'run', 'hit', 'rbi', 'double', 'triple', 'hr', 'bb', 'so', 'hbp', 'sac', 'ibb', 'gidp',
'sb', 'cs', 'bphr', 'bpfo', 'bp1b', 'bplo', 'xch', 'xhit', 'error', 'pb', 'sbc', 'csc',
],[
all_stats.pa_sum, all_stats.ab_sum, all_stats.run, all_stats.hit_sum, all_stats.rbi_sum,
all_stats.double_sum, all_stats.triple_sum, all_stats.hr_sum, all_stats.bb_sum, all_stats.so_sum,
all_stats.hbp_sum, all_stats.sac, all_stats.ibb_sum, all_stats.gidp_sum, all_stats.sb_sum,
all_stats.cs_sum, all_stats.bphr_sum, all_stats.bpfo_sum, all_stats.bp1b_sum, all_stats.bplo_sum,
all_stats.xch, all_stats.xhit_sum, all_stats.error_sum, all_stats.pb_sum, all_stats.sbc_sum,
all_stats.csc_sum
]
"pa",
"ab",
"run",
"hit",
"rbi",
"double",
"triple",
"hr",
"bb",
"so",
"hbp",
"sac",
"ibb",
"gidp",
"sb",
"cs",
"bphr",
"bpfo",
"bp1b",
"bplo",
"xch",
"xhit",
"error",
"pb",
"sbc",
"csc",
],
[
all_stats.pa_sum,
all_stats.ab_sum,
all_stats.run,
all_stats.hit_sum,
all_stats.rbi_sum,
all_stats.double_sum,
all_stats.triple_sum,
all_stats.hr_sum,
all_stats.bb_sum,
all_stats.so_sum,
all_stats.hbp_sum,
all_stats.sac,
all_stats.ibb_sum,
all_stats.gidp_sum,
all_stats.sb_sum,
all_stats.cs_sum,
all_stats.bphr_sum,
all_stats.bpfo_sum,
all_stats.bp1b_sum,
all_stats.bplo_sum,
all_stats.xch,
all_stats.xhit_sum,
all_stats.error_sum,
all_stats.pb_sum,
all_stats.sbc_sum,
all_stats.csc_sum,
],
]
return_val = DataFrame(data_list).to_csv(header=False, index=False)
return Response(content=return_val, media_type='text/csv')
return Response(content=return_val, media_type="text/csv")
else:
logging.debug(f'stat pull query: {all_stats}\n')
logging.debug(f"stat pull query: {all_stats}\n")
# logging.debug(f'result 0: {all_stats[0]}\n')
for x in all_stats:
logging.debug(f'this_line: {model_to_dict(x)}')
logging.debug(f"this_line: {model_to_dict(x)}")
return_val = model_to_dict(all_stats[0])
return return_val
@router.post('', include_in_schema=PRIVATE_IN_SCHEMA)
@router.post("", include_in_schema=PRIVATE_IN_SCHEMA)
async def post_batstats(stats: BattingStatModel, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning('Bad Token: [REDACTED]')
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(
status_code=401,
detail='You are not authorized to post stats. This event has been logged.'
detail="You are not authorized to post stats. This event has been logged.",
)
new_stats = []
@ -215,36 +358,40 @@ async def post_batstats(stats: BattingStatModel, token: str = Depends(oauth2_sch
csc=x.csc,
week=x.week,
season=x.season,
created=datetime.fromtimestamp(x.created / 1000) if x.created else datetime.now(),
game_id=x.game_id
created=datetime.fromtimestamp(x.created / 1000)
if x.created
else datetime.now(),
game_id=x.game_id,
)
new_stats.append(this_stat)
with db.atomic():
BattingStat.bulk_create(new_stats, batch_size=15)
raise HTTPException(status_code=200, detail=f'{len(new_stats)} batting lines have been added')
raise HTTPException(
status_code=200, detail=f"{len(new_stats)} batting lines have been added"
)
@router.delete('/{stat_id}', include_in_schema=PRIVATE_IN_SCHEMA)
@router.delete("/{stat_id}", include_in_schema=PRIVATE_IN_SCHEMA)
async def delete_batstat(stat_id, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning('Bad Token: [REDACTED]')
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(
status_code=401,
detail='You are not authorized to delete stats. This event has been logged.'
detail="You are not authorized to delete stats. This event has been logged.",
)
try:
this_stat = BattingStat.get_by_id(stat_id)
except DoesNotExist:
raise HTTPException(status_code=404, detail=f'No stat found with id {stat_id}')
raise HTTPException(status_code=404, detail=f"No stat found with id {stat_id}")
count = this_stat.delete_instance()
if count == 1:
raise HTTPException(status_code=200, detail=f'Stat {stat_id} has been deleted')
raise HTTPException(status_code=200, detail=f"Stat {stat_id} has been deleted")
else:
raise HTTPException(status_code=500, detail=f'Stat {stat_id} was not deleted')
raise HTTPException(status_code=500, detail=f"Stat {stat_id} was not deleted")
# @app.get('/api/v1/plays/batting')
@ -449,4 +596,3 @@ async def delete_batstat(stat_id, token: str = Depends(oauth2_scheme)):
# }
# db.close()
# return return_stats

View File

@ -145,6 +145,7 @@ async def get_card_ratings(
vs_hand: Literal["R", "L", "vR", "vL"] = None,
short_output: bool = False,
csv: bool = False,
limit: int = 100,
):
this_team = Team.get_or_none(Team.id == team_id)
logging.debug(f"Team: {this_team} / has_guide: {this_team.has_guide}")
@ -178,6 +179,9 @@ async def get_card_ratings(
)
all_ratings = all_ratings.where(BattingCardRatings.battingcard << set_cards)
total_count = all_ratings.count() if not csv else 0
all_ratings = all_ratings.limit(max(0, min(limit, 500)))
if csv:
# return_val = query_to_csv(all_ratings)
return_vals = [model_to_dict(x) for x in all_ratings]
@ -192,7 +196,7 @@ async def get_card_ratings(
else:
return_val = {
"count": all_ratings.count(),
"count": total_count,
"ratings": [
model_to_dict(x, recurse=not short_output) for x in all_ratings
],
@ -281,7 +285,7 @@ def get_scouting_dfs(cardset_id: list = None):
)
]
),
name=f"Arm OF",
name="Arm OF",
)
)
series_list.append(
@ -292,7 +296,7 @@ def get_scouting_dfs(cardset_id: list = None):
for x in positions.where(CardPosition.position == "C")
]
),
name=f"Arm C",
name="Arm C",
)
)
series_list.append(
@ -303,7 +307,7 @@ def get_scouting_dfs(cardset_id: list = None):
for x in positions.where(CardPosition.position == "C")
]
),
name=f"PB C",
name="PB C",
)
)
series_list.append(
@ -314,7 +318,7 @@ def get_scouting_dfs(cardset_id: list = None):
for x in positions.where(CardPosition.position == "C")
]
),
name=f"Throw C",
name="Throw C",
)
)
logging.debug(f"series_list: {series_list}")
@ -334,9 +338,9 @@ async def get_card_scouting(team_id: int, ts: str):
"https://ko-fi.com/manticorum/shop"
)
if os.path.isfile(f"storage/batting-ratings.csv"):
if os.path.isfile("storage/batting-ratings.csv"):
return FileResponse(
path=f"storage/batting-ratings.csv",
path="storage/batting-ratings.csv",
media_type="text/csv",
# headers=headers
)
@ -354,7 +358,7 @@ async def post_calc_scouting(token: str = Depends(oauth2_scheme)):
status_code=401, detail="You are not authorized to calculate card ratings."
)
logging.warning(f"Re-calculating batting ratings\n\n")
logging.warning("Re-calculating batting ratings\n\n")
output = get_scouting_dfs()
first = ["player_id", "player_name", "cardset_name", "rarity", "hand", "variant"]
@ -370,9 +374,9 @@ async def post_calc_scouting(token: str = Depends(oauth2_scheme)):
@router.get("/basic")
async def get_basic_scouting(cardset_id: list = Query(default=None)):
if os.path.isfile(f"storage/batting-basic.csv"):
if os.path.isfile("storage/batting-basic.csv"):
return FileResponse(
path=f"storage/batting-basic.csv",
path="storage/batting-basic.csv",
media_type="text/csv",
# headers=headers
)
@ -390,7 +394,7 @@ async def post_calc_basic(token: str = Depends(oauth2_scheme)):
status_code=401, detail="You are not authorized to calculate basic ratings."
)
logging.warning(f"Re-calculating basic batting ratings\n\n")
logging.warning("Re-calculating basic batting ratings\n\n")
raw_data = get_scouting_dfs()
logging.debug(f"output: {raw_data}")
@ -667,9 +671,11 @@ async def get_player_ratings(
if variant is not None:
all_cards = all_cards.where(BattingCard.variant << variant)
all_ratings = BattingCardRatings.select().where(
BattingCardRatings.battingcard << all_cards
).order_by(BattingCardRatings.id)
all_ratings = (
BattingCardRatings.select()
.where(BattingCardRatings.battingcard << all_cards)
.order_by(BattingCardRatings.id)
)
return_val = {
"count": all_ratings.count(),

View File

@ -8,10 +8,7 @@ from ..db_engine import Event, model_to_dict, fn, DoesNotExist
from ..dependencies import oauth2_scheme, valid_token
router = APIRouter(
prefix='/api/v2/events',
tags=['events']
)
router = APIRouter(prefix="/api/v2/events", tags=["events"])
class EventModel(pydantic.BaseModel):
@ -23,76 +20,102 @@ class EventModel(pydantic.BaseModel):
active: Optional[bool] = False
@router.get('')
@router.get("")
async def v1_events_get(
name: Optional[str] = None, in_desc: Optional[str] = None, active: Optional[bool] = None,
csv: Optional[bool] = None):
name: Optional[str] = None,
in_desc: Optional[str] = None,
active: Optional[bool] = None,
csv: Optional[bool] = None,
limit: Optional[int] = 100,
):
all_events = Event.select().order_by(Event.id)
if name is not None:
all_events = all_events.where(fn.Lower(Event.name) == name.lower())
if in_desc is not None:
all_events = all_events.where(
(fn.Lower(Event.short_desc).contains(in_desc.lower())) |
(fn.Lower(Event.long_desc).contains(in_desc.lower()))
(fn.Lower(Event.short_desc).contains(in_desc.lower()))
| (fn.Lower(Event.long_desc).contains(in_desc.lower()))
)
if active is not None:
all_events = all_events.where(Event.active == active)
total_count = all_events.count() if not csv else 0
all_events = all_events.limit(max(0, min(limit, 500)))
if csv:
data_list = [['id', 'name', 'short_desc', 'long_desc', 'url', 'thumbnail', 'active']]
data_list = [
["id", "name", "short_desc", "long_desc", "url", "thumbnail", "active"]
]
for line in all_events:
data_list.append(
[
line.id, line.name, line.short_desc, line.long_desc, line.url, line.thumbnail, line.active
line.id,
line.name,
line.short_desc,
line.long_desc,
line.url,
line.thumbnail,
line.active,
]
)
return_val = DataFrame(data_list).to_csv(header=False, index=False)
return Response(content=return_val, media_type='text/csv')
return Response(content=return_val, media_type="text/csv")
else:
return_val = {'count': all_events.count(), 'events': []}
return_val = {"count": total_count, "events": []}
for x in all_events:
return_val['events'].append(model_to_dict(x))
return_val["events"].append(model_to_dict(x))
return return_val
@router.get('/{event_id}')
@router.get("/{event_id}")
async def v1_events_get_one(event_id, csv: Optional[bool] = False):
try:
this_event = Event.get_by_id(event_id)
except DoesNotExist:
raise HTTPException(status_code=404, detail=f'No event found with id {event_id}')
raise HTTPException(
status_code=404, detail=f"No event found with id {event_id}"
)
if csv:
data_list = [
['id', 'name', 'short_desc', 'long_desc', 'url', 'thumbnail', 'active'],
[this_event.id, this_event.name, this_event.short_desc, this_event.long_desc, this_event.url,
this_event.thumbnail, this_event.active]
["id", "name", "short_desc", "long_desc", "url", "thumbnail", "active"],
[
this_event.id,
this_event.name,
this_event.short_desc,
this_event.long_desc,
this_event.url,
this_event.thumbnail,
this_event.active,
],
]
return_val = DataFrame(data_list).to_csv(header=False, index=False)
return Response(content=return_val, media_type='text/csv')
return Response(content=return_val, media_type="text/csv")
else:
return_val = model_to_dict(this_event)
return return_val
@router.post('')
@router.post("")
async def v1_events_post(event: EventModel, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning('Bad Token: [REDACTED]')
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(
status_code=401,
detail='You are not authorized to post events. This event has been logged.'
detail="You are not authorized to post events. This event has been logged.",
)
dupe_event = Event.get_or_none(Event.name == event.name)
if dupe_event:
raise HTTPException(status_code=400, detail=f'There is already an event using {event.name}')
raise HTTPException(
status_code=400, detail=f"There is already an event using {event.name}"
)
this_event = Event(
name=event.name,
@ -100,7 +123,7 @@ async def v1_events_post(event: EventModel, token: str = Depends(oauth2_scheme))
long_desc=event.long_desc,
url=event.url,
thumbnail=event.thumbnail,
active=event.active
active=event.active,
)
saved = this_event.save()
@ -110,25 +133,33 @@ async def v1_events_post(event: EventModel, token: str = Depends(oauth2_scheme))
else:
raise HTTPException(
status_code=418,
detail='Well slap my ass and call me a teapot; I could not save that cardset'
detail="Well slap my ass and call me a teapot; I could not save that cardset",
)
@router.patch('/{event_id}')
@router.patch("/{event_id}")
async def v1_events_patch(
event_id, name: Optional[str] = None, short_desc: Optional[str] = None, long_desc: Optional[str] = None,
url: Optional[str] = None, thumbnail: Optional[str] = None, active: Optional[bool] = None,
token: str = Depends(oauth2_scheme)):
event_id,
name: Optional[str] = None,
short_desc: Optional[str] = None,
long_desc: Optional[str] = None,
url: Optional[str] = None,
thumbnail: Optional[str] = None,
active: Optional[bool] = None,
token: str = Depends(oauth2_scheme),
):
if not valid_token(token):
logging.warning('Bad Token: [REDACTED]')
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(
status_code=401,
detail='You are not authorized to patch events. This event has been logged.'
detail="You are not authorized to patch events. This event has been logged.",
)
try:
this_event = Event.get_by_id(event_id)
except DoesNotExist:
raise HTTPException(status_code=404, detail=f'No event found with id {event_id}')
raise HTTPException(
status_code=404, detail=f"No event found with id {event_id}"
)
if name is not None:
this_event.name = name
@ -149,26 +180,30 @@ async def v1_events_patch(
else:
raise HTTPException(
status_code=418,
detail='Well slap my ass and call me a teapot; I could not save that event'
detail="Well slap my ass and call me a teapot; I could not save that event",
)
@router.delete('/{event_id}')
@router.delete("/{event_id}")
async def v1_events_delete(event_id, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning('Bad Token: [REDACTED]')
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(
status_code=401,
detail='You are not authorized to delete events. This event has been logged.'
detail="You are not authorized to delete events. This event has been logged.",
)
try:
this_event = Event.get_by_id(event_id)
except DoesNotExist:
raise HTTPException(status_code=404, detail=f'No event found with id {event_id}')
raise HTTPException(
status_code=404, detail=f"No event found with id {event_id}"
)
count = this_event.delete_instance()
if count == 1:
raise HTTPException(status_code=200, detail=f'Event {event_id} has been deleted')
raise HTTPException(
status_code=200, detail=f"Event {event_id} has been deleted"
)
else:
raise HTTPException(status_code=500, detail=f'Event {event_id} was not deleted')
raise HTTPException(status_code=500, detail=f"Event {event_id} was not deleted")

View File

@ -8,10 +8,7 @@ from ..db_engine import GameRewards, model_to_dict, DoesNotExist
from ..dependencies import oauth2_scheme, valid_token
router = APIRouter(
prefix='/api/v2/gamerewards',
tags=['gamerewards']
)
router = APIRouter(prefix="/api/v2/gamerewards", tags=["gamerewards"])
class GameRewardModel(pydantic.BaseModel):
@ -21,10 +18,15 @@ class GameRewardModel(pydantic.BaseModel):
money: Optional[int] = None
@router.get('')
@router.get("")
async def v1_gamerewards_get(
name: Optional[str] = None, pack_type_id: Optional[int] = None, player_id: Optional[int] = None,
money: Optional[int] = None, csv: Optional[bool] = None):
name: Optional[str] = None,
pack_type_id: Optional[int] = None,
player_id: Optional[int] = None,
money: Optional[int] = None,
csv: Optional[bool] = None,
limit: int = 100,
):
all_rewards = GameRewards.select().order_by(GameRewards.id)
# if all_rewards.count() == 0:
@ -40,61 +42,77 @@ async def v1_gamerewards_get(
if money is not None:
all_rewards = all_rewards.where(GameRewards.money == money)
limit = max(0, min(limit, 500))
total_count = all_rewards.count() if not csv else 0
all_rewards = all_rewards.limit(limit)
if csv:
data_list = [['id', 'pack_type_id', 'player_id', 'money']]
data_list = [["id", "pack_type_id", "player_id", "money"]]
for line in all_rewards:
data_list.append([
line.id, line.pack_type_id if line.pack_type else None, line.player_id if line.player else None,
line.money
])
data_list.append(
[
line.id,
line.pack_type_id if line.pack_type else None,
line.player_id if line.player else None,
line.money,
]
)
return_val = DataFrame(data_list).to_csv(header=False, index=False)
return Response(content=return_val, media_type='text/csv')
return Response(content=return_val, media_type="text/csv")
else:
return_val = {'count': all_rewards.count(), 'gamerewards': []}
return_val = {"count": total_count, "gamerewards": []}
for x in all_rewards:
return_val['gamerewards'].append(model_to_dict(x))
return_val["gamerewards"].append(model_to_dict(x))
return return_val
@router.get('/{gameaward_id}')
@router.get("/{gameaward_id}")
async def v1_gamerewards_get_one(gamereward_id, csv: Optional[bool] = None):
try:
this_game_reward = GameRewards.get_by_id(gamereward_id)
except DoesNotExist:
raise HTTPException(status_code=404, detail=f'No game reward found with id {gamereward_id}')
raise HTTPException(
status_code=404, detail=f"No game reward found with id {gamereward_id}"
)
if csv:
data_list = [
['id', 'pack_type_id', 'player_id', 'money'],
[this_game_reward.id, this_game_reward.pack_type_id if this_game_reward.pack_type else None,
this_game_reward.player_id if this_game_reward.player else None, this_game_reward.money]
["id", "pack_type_id", "player_id", "money"],
[
this_game_reward.id,
this_game_reward.pack_type_id if this_game_reward.pack_type else None,
this_game_reward.player_id if this_game_reward.player else None,
this_game_reward.money,
],
]
return_val = DataFrame(data_list).to_csv(header=False, index=False)
return Response(content=return_val, media_type='text/csv')
return Response(content=return_val, media_type="text/csv")
else:
return_val = model_to_dict(this_game_reward)
return return_val
@router.post('')
async def v1_gamerewards_post(game_reward: GameRewardModel, token: str = Depends(oauth2_scheme)):
@router.post("")
async def v1_gamerewards_post(
game_reward: GameRewardModel, token: str = Depends(oauth2_scheme)
):
if not valid_token(token):
logging.warning('Bad Token: [REDACTED]')
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(
status_code=401,
detail='You are not authorized to post game rewards. This event has been logged.'
detail="You are not authorized to post game rewards. This event has been logged.",
)
this_award = GameRewards(
name=game_reward.name,
pack_type_id=game_reward.pack_type_id,
player_id=game_reward.player_id,
money=game_reward.money
money=game_reward.money,
)
saved = this_award.save()
@ -104,24 +122,31 @@ async def v1_gamerewards_post(game_reward: GameRewardModel, token: str = Depends
else:
raise HTTPException(
status_code=418,
detail='Well slap my ass and call me a teapot; I could not save that roster'
detail="Well slap my ass and call me a teapot; I could not save that roster",
)
@router.patch('/{game_reward_id}')
@router.patch("/{game_reward_id}")
async def v1_gamerewards_patch(
game_reward_id: int, name: Optional[str] = None, pack_type_id: Optional[int] = None,
player_id: Optional[int] = None, money: Optional[int] = None, token: str = Depends(oauth2_scheme)):
game_reward_id: int,
name: Optional[str] = None,
pack_type_id: Optional[int] = None,
player_id: Optional[int] = None,
money: Optional[int] = None,
token: str = Depends(oauth2_scheme),
):
if not valid_token(token):
logging.warning('Bad Token: [REDACTED]')
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(
status_code=401,
detail='You are not authorized to patch gamerewards. This event has been logged.'
detail="You are not authorized to patch gamerewards. This event has been logged.",
)
try:
this_game_reward = GameRewards.get_by_id(game_reward_id)
except DoesNotExist:
raise HTTPException(status_code=404, detail=f'No game reward found with id {game_reward_id}')
raise HTTPException(
status_code=404, detail=f"No game reward found with id {game_reward_id}"
)
if name is not None:
this_game_reward.name = name
@ -147,27 +172,32 @@ async def v1_gamerewards_patch(
else:
raise HTTPException(
status_code=418,
detail='Well slap my ass and call me a teapot; I could not save that rarity'
detail="Well slap my ass and call me a teapot; I could not save that rarity",
)
@router.delete('/{gamereward_id}')
@router.delete("/{gamereward_id}")
async def v1_gamerewards_delete(gamereward_id, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning('Bad Token: [REDACTED]')
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(
status_code=401,
detail='You are not authorized to delete awards. This event has been logged.'
detail="You are not authorized to delete awards. This event has been logged.",
)
try:
this_award = GameRewards.get_by_id(gamereward_id)
except DoesNotExist:
raise HTTPException(status_code=404, detail=f'No award found with id {gamereward_id}')
raise HTTPException(
status_code=404, detail=f"No award found with id {gamereward_id}"
)
count = this_award.delete_instance()
if count == 1:
raise HTTPException(status_code=200, detail=f'Game Reward {gamereward_id} has been deleted')
raise HTTPException(
status_code=200, detail=f"Game Reward {gamereward_id} has been deleted"
)
else:
raise HTTPException(status_code=500, detail=f'Game Reward {gamereward_id} was not deleted')
raise HTTPException(
status_code=500, detail=f"Game Reward {gamereward_id} was not deleted"
)

View File

@ -30,6 +30,7 @@ async def v1_gauntletreward_get(
reward_id: list = Query(default=None),
win_num: Optional[int] = None,
loss_max: Optional[int] = None,
limit: int = 100,
):
all_rewards = GauntletReward.select().order_by(GauntletReward.id)
@ -46,7 +47,11 @@ async def v1_gauntletreward_get(
all_rewards = all_rewards.order_by(-GauntletReward.loss_max, GauntletReward.win_num)
return_val = {"count": all_rewards.count(), "rewards": []}
limit = max(0, min(limit, 500))
total_count = all_rewards.count()
all_rewards = all_rewards.limit(limit)
return_val = {"count": total_count, "rewards": []}
for x in all_rewards:
return_val["rewards"].append(model_to_dict(x))

View File

@ -8,10 +8,7 @@ from ..db_engine import GauntletRun, model_to_dict, DatabaseError, DoesNotExist
from ..dependencies import oauth2_scheme, valid_token
router = APIRouter(
prefix='/api/v2/gauntletruns',
tags=['notifs']
)
router = APIRouter(prefix="/api/v2/gauntletruns", tags=["notifs"])
class GauntletRunModel(pydantic.BaseModel):
@ -24,13 +21,25 @@ class GauntletRunModel(pydantic.BaseModel):
ended: Optional[int] = None
@router.get('')
@router.get("")
async def get_gauntletruns(
team_id: list = Query(default=None), wins: Optional[int] = None, wins_min: Optional[int] = None,
wins_max: Optional[int] = None, losses: Optional[int] = None, losses_min: Optional[int] = None,
losses_max: Optional[int] = None, gsheet: Optional[str] = None, created_after: Optional[int] = None,
created_before: Optional[int] = None, ended_after: Optional[int] = None, ended_before: Optional[int] = None,
is_active: Optional[bool] = None, gauntlet_id: list = Query(default=None), season: list = Query(default=None)):
team_id: list = Query(default=None),
wins: Optional[int] = None,
wins_min: Optional[int] = None,
wins_max: Optional[int] = None,
losses: Optional[int] = None,
losses_min: Optional[int] = None,
losses_max: Optional[int] = None,
gsheet: Optional[str] = None,
created_after: Optional[int] = None,
created_before: Optional[int] = None,
ended_after: Optional[int] = None,
ended_before: Optional[int] = None,
is_active: Optional[bool] = None,
gauntlet_id: list = Query(default=None),
season: list = Query(default=None),
limit: int = 100,
):
all_gauntlets = GauntletRun.select().order_by(GauntletRun.id)
if team_id is not None:
@ -73,39 +82,48 @@ async def get_gauntletruns(
if season is not None:
all_gauntlets = all_gauntlets.where(GauntletRun.team.season << season)
return_val = {'count': all_gauntlets.count(), 'runs': []}
for x in all_gauntlets:
return_val['runs'].append(model_to_dict(x))
limit = max(0, min(limit, 500))
return_val = {"count": all_gauntlets.count(), "runs": []}
for x in all_gauntlets.limit(limit):
return_val["runs"].append(model_to_dict(x))
return return_val
@router.get('/{gauntletrun_id}')
@router.get("/{gauntletrun_id}")
async def get_one_gauntletrun(gauntletrun_id):
try:
this_gauntlet = GauntletRun.get_by_id(gauntletrun_id)
except DoesNotExist:
raise HTTPException(status_code=404, detail=f'No gauntlet found with id {gauntletrun_id}')
raise HTTPException(
status_code=404, detail=f"No gauntlet found with id {gauntletrun_id}"
)
return_val = model_to_dict(this_gauntlet)
return return_val
@router.patch('/{gauntletrun_id}')
@router.patch("/{gauntletrun_id}")
async def patch_gauntletrun(
gauntletrun_id, team_id: Optional[int] = None, wins: Optional[int] = None, losses: Optional[int] = None,
gsheet: Optional[str] = None, created: Optional[bool] = None, ended: Optional[bool] = None,
token: str = Depends(oauth2_scheme)):
gauntletrun_id,
team_id: Optional[int] = None,
wins: Optional[int] = None,
losses: Optional[int] = None,
gsheet: Optional[str] = None,
created: Optional[bool] = None,
ended: Optional[bool] = None,
token: str = Depends(oauth2_scheme),
):
if not valid_token(token):
logging.warning('Bad Token: [REDACTED]')
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(
status_code=401,
detail='You are not authorized to patch gauntlet runs. This event has been logged.'
detail="You are not authorized to patch gauntlet runs. This event has been logged.",
)
this_run = GauntletRun.get_or_none(GauntletRun.id == gauntletrun_id)
if this_run is None:
raise KeyError(f'Gauntlet Run ID {gauntletrun_id} not found')
raise KeyError(f"Gauntlet Run ID {gauntletrun_id} not found")
if team_id is not None:
this_run.team_id = team_id
@ -130,41 +148,42 @@ async def patch_gauntletrun(
r_curr = model_to_dict(this_run)
return r_curr
else:
raise DatabaseError(f'Unable to patch gauntlet run {gauntletrun_id}')
raise DatabaseError(f"Unable to patch gauntlet run {gauntletrun_id}")
@router.post('')
async def post_gauntletrun(gauntletrun: GauntletRunModel, token: str = Depends(oauth2_scheme)):
@router.post("")
async def post_gauntletrun(
gauntletrun: GauntletRunModel, token: str = Depends(oauth2_scheme)
):
if not valid_token(token):
logging.warning('Bad Token: [REDACTED]')
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(
status_code=401,
detail='You are not authorized to post gauntlets. This event has been logged.'
detail="You are not authorized to post gauntlets. This event has been logged.",
)
run_data = gauntletrun.dict()
# Convert milliseconds timestamps to datetime for PostgreSQL
if run_data.get('created'):
run_data['created'] = datetime.fromtimestamp(run_data['created'] / 1000)
if run_data.get("created"):
run_data["created"] = datetime.fromtimestamp(run_data["created"] / 1000)
else:
run_data['created'] = datetime.now()
if run_data.get('ended'):
run_data['ended'] = datetime.fromtimestamp(run_data['ended'] / 1000)
run_data["created"] = datetime.now()
if run_data.get("ended"):
run_data["ended"] = datetime.fromtimestamp(run_data["ended"] / 1000)
else:
run_data['ended'] = None
run_data["ended"] = None
this_run = GauntletRun(**run_data)
if this_run.save():
r_run = model_to_dict(this_run)
return r_run
else:
raise DatabaseError(f'Unable to post gauntlet run')
raise DatabaseError("Unable to post gauntlet run")
@router.delete('/{gauntletrun_id}')
@router.delete("/{gauntletrun_id}")
async def delete_gauntletrun(gauntletrun_id):
if GauntletRun.delete_by_id(gauntletrun_id) == 1:
return f'Deleted gauntlet run ID {gauntletrun_id}'
raise DatabaseError(f'Unable to delete gauntlet run {gauntletrun_id}')
return f"Deleted gauntlet run ID {gauntletrun_id}"
raise DatabaseError(f"Unable to delete gauntlet run {gauntletrun_id}")

View File

@ -73,6 +73,7 @@ async def get_players(
key_mlbam: list = Query(default=None),
offense_col: list = Query(default=None),
csv: Optional[bool] = False,
limit: int = 100,
):
all_players = MlbPlayer.select().order_by(MlbPlayer.id)
@ -101,12 +102,15 @@ async def get_players(
if offense_col is not None:
all_players = all_players.where(MlbPlayer.offense_col << offense_col)
total_count = all_players.count() if not csv else 0
all_players = all_players.limit(max(0, min(limit, 500)))
if csv:
return_val = query_to_csv(all_players)
return Response(content=return_val, media_type="text/csv")
return_val = {
"count": all_players.count(),
"count": total_count,
"players": [model_to_dict(x) for x in all_players],
}
return return_val
@ -222,7 +226,7 @@ async def post_one_player(player: PlayerModel, token: str = Depends(oauth2_schem
| (MlbPlayer.key_bbref == player.key_bbref)
)
if dupes.count() > 0:
logging.info(f"POST /mlbplayers/one - dupes found:")
logging.info("POST /mlbplayers/one - dupes found:")
for x in dupes:
logging.info(f"{x}")
raise HTTPException(

View File

@ -9,10 +9,7 @@ from ..db_engine import Notification, model_to_dict, fn, DoesNotExist
from ..dependencies import oauth2_scheme, valid_token
router = APIRouter(
prefix='/api/v2/notifs',
tags=['notifs']
)
router = APIRouter(prefix="/api/v2/notifs", tags=["notifs"])
class NotifModel(pydantic.BaseModel):
@ -21,19 +18,30 @@ class NotifModel(pydantic.BaseModel):
desc: Optional[str] = None
field_name: str
message: str
about: Optional[str] = 'blank'
about: Optional[str] = "blank"
ack: Optional[bool] = False
@router.get('')
@router.get("")
async def get_notifs(
created_after: Optional[int] = None, title: Optional[str] = None, desc: Optional[str] = None,
field_name: Optional[str] = None, in_desc: Optional[str] = None, about: Optional[str] = None,
ack: Optional[bool] = None, csv: Optional[bool] = None):
created_after: Optional[int] = None,
title: Optional[str] = None,
desc: Optional[str] = None,
field_name: Optional[str] = None,
in_desc: Optional[str] = None,
about: Optional[str] = None,
ack: Optional[bool] = None,
csv: Optional[bool] = None,
limit: Optional[int] = 100,
):
if limit is not None:
limit = max(0, min(limit, 500))
all_notif = Notification.select().order_by(Notification.id)
if all_notif.count() == 0:
raise HTTPException(status_code=404, detail=f'There are no notifications to filter')
raise HTTPException(
status_code=404, detail="There are no notifications to filter"
)
if created_after is not None:
# Convert milliseconds timestamp to datetime for PostgreSQL comparison
@ -46,62 +54,90 @@ async def get_notifs(
if field_name is not None:
all_notif = all_notif.where(Notification.field_name == field_name)
if in_desc is not None:
all_notif = all_notif.where(fn.Lower(Notification.desc).contains(in_desc.lower()))
all_notif = all_notif.where(
fn.Lower(Notification.desc).contains(in_desc.lower())
)
if about is not None:
all_notif = all_notif.where(Notification.about == about)
if ack is not None:
all_notif = all_notif.where(Notification.ack == ack)
total_count = all_notif.count()
if limit is not None:
all_notif = all_notif.limit(limit)
if csv:
data_list = [['id', 'created', 'title', 'desc', 'field_name', 'message', 'about', 'ack']]
data_list = [
["id", "created", "title", "desc", "field_name", "message", "about", "ack"]
]
for line in all_notif:
data_list.append([
line.id, line.created, line.title, line.desc, line.field_name, line.message, line.about, line.ack
])
data_list.append(
[
line.id,
line.created,
line.title,
line.desc,
line.field_name,
line.message,
line.about,
line.ack,
]
)
return_val = DataFrame(data_list).to_csv(header=False, index=False)
return Response(content=return_val, media_type='text/csv')
return Response(content=return_val, media_type="text/csv")
else:
return_val = {'count': all_notif.count(), 'notifs': []}
return_val = {"count": total_count, "notifs": []}
for x in all_notif:
return_val['notifs'].append(model_to_dict(x))
return_val["notifs"].append(model_to_dict(x))
return return_val
@router.get('/{notif_id}')
@router.get("/{notif_id}")
async def get_one_notif(notif_id, csv: Optional[bool] = None):
try:
this_notif = Notification.get_by_id(notif_id)
except DoesNotExist:
raise HTTPException(status_code=404, detail=f'No notification found with id {notif_id}')
raise HTTPException(
status_code=404, detail=f"No notification found with id {notif_id}"
)
if csv:
data_list = [
['id', 'created', 'title', 'desc', 'field_name', 'message', 'about', 'ack'],
[this_notif.id, this_notif.created, this_notif.title, this_notif.desc, this_notif.field_name,
this_notif.message, this_notif.about, this_notif.ack]
["id", "created", "title", "desc", "field_name", "message", "about", "ack"],
[
this_notif.id,
this_notif.created,
this_notif.title,
this_notif.desc,
this_notif.field_name,
this_notif.message,
this_notif.about,
this_notif.ack,
],
]
return_val = DataFrame(data_list).to_csv(header=False, index=False)
return Response(content=return_val, media_type='text/csv')
return Response(content=return_val, media_type="text/csv")
else:
return_val = model_to_dict(this_notif)
return return_val
@router.post('')
@router.post("")
async def post_notif(notif: NotifModel, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning('Bad Token: [REDACTED]')
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(
status_code=401,
detail='You are not authorized to post notifications. This event has been logged.'
detail="You are not authorized to post notifications. This event has been logged.",
)
logging.info(f'new notif: {notif}')
logging.info(f"new notif: {notif}")
this_notif = Notification(
created=datetime.fromtimestamp(notif.created / 1000),
title=notif.title,
@ -118,25 +154,34 @@ async def post_notif(notif: NotifModel, token: str = Depends(oauth2_scheme)):
else:
raise HTTPException(
status_code=418,
detail='Well slap my ass and call me a teapot; I could not save that notification'
detail="Well slap my ass and call me a teapot; I could not save that notification",
)
@router.patch('/{notif_id}')
@router.patch("/{notif_id}")
async def patch_notif(
notif_id, created: Optional[int] = None, title: Optional[str] = None, desc: Optional[str] = None,
field_name: Optional[str] = None, message: Optional[str] = None, about: Optional[str] = None,
ack: Optional[bool] = None, token: str = Depends(oauth2_scheme)):
notif_id,
created: Optional[int] = None,
title: Optional[str] = None,
desc: Optional[str] = None,
field_name: Optional[str] = None,
message: Optional[str] = None,
about: Optional[str] = None,
ack: Optional[bool] = None,
token: str = Depends(oauth2_scheme),
):
if not valid_token(token):
logging.warning('Bad Token: [REDACTED]')
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(
status_code=401,
detail='You are not authorized to patch notifications. This event has been logged.'
detail="You are not authorized to patch notifications. This event has been logged.",
)
try:
this_notif = Notification.get_by_id(notif_id)
except DoesNotExist:
raise HTTPException(status_code=404, detail=f'No notification found with id {notif_id}')
raise HTTPException(
status_code=404, detail=f"No notification found with id {notif_id}"
)
if title is not None:
this_notif.title = title
@ -159,26 +204,32 @@ async def patch_notif(
else:
raise HTTPException(
status_code=418,
detail='Well slap my ass and call me a teapot; I could not save that rarity'
detail="Well slap my ass and call me a teapot; I could not save that rarity",
)
@router.delete('/{notif_id}')
@router.delete("/{notif_id}")
async def delete_notif(notif_id, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning('Bad Token: [REDACTED]')
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(
status_code=401,
detail='You are not authorized to delete notifications. This event has been logged.'
detail="You are not authorized to delete notifications. This event has been logged.",
)
try:
this_notif = Notification.get_by_id(notif_id)
except DoesNotExist:
raise HTTPException(status_code=404, detail=f'No notification found with id {notif_id}')
raise HTTPException(
status_code=404, detail=f"No notification found with id {notif_id}"
)
count = this_notif.delete_instance()
if count == 1:
raise HTTPException(status_code=200, detail=f'Notification {notif_id} has been deleted')
raise HTTPException(
status_code=200, detail=f"Notification {notif_id} has been deleted"
)
else:
raise HTTPException(status_code=500, detail=f'Notification {notif_id} was not deleted')
raise HTTPException(
status_code=500, detail=f"Notification {notif_id} was not deleted"
)

View File

@ -143,6 +143,7 @@ async def get_card_ratings(
short_output: bool = False,
csv: bool = False,
cardset_id: list = Query(default=None),
limit: int = 100,
token: str = Depends(oauth2_scheme),
):
if not valid_token(token):
@ -168,13 +169,16 @@ async def get_card_ratings(
)
all_ratings = all_ratings.where(PitchingCardRatings.pitchingcard << set_cards)
total_count = all_ratings.count() if not csv else 0
all_ratings = all_ratings.limit(max(0, min(limit, 500)))
if csv:
return_val = query_to_csv(all_ratings)
return Response(content=return_val, media_type="text/csv")
else:
return_val = {
"count": all_ratings.count(),
"count": total_count,
"ratings": [
model_to_dict(x, recurse=not short_output) for x in all_ratings
],
@ -231,10 +235,10 @@ def get_scouting_dfs(cardset_id: list = None):
series_list = [
pd.Series(
dict([(x.player.player_id, x.range) for x in positions]), name=f"Range P"
dict([(x.player.player_id, x.range) for x in positions]), name="Range P"
),
pd.Series(
dict([(x.player.player_id, x.error) for x in positions]), name=f"Error P"
dict([(x.player.player_id, x.error) for x in positions]), name="Error P"
),
]
logging.debug(f"series_list: {series_list}")
@ -274,7 +278,7 @@ async def post_calc_scouting(token: str = Depends(oauth2_scheme)):
status_code=401, detail="You are not authorized to calculate card ratings."
)
logging.warning(f"Re-calculating pitching ratings\n\n")
logging.warning("Re-calculating pitching ratings\n\n")
output = get_scouting_dfs()
first = ["player_id", "player_name", "cardset_name", "rarity", "hand", "variant"]
@ -310,7 +314,7 @@ async def post_calc_basic(token: str = Depends(oauth2_scheme)):
status_code=401, detail="You are not authorized to calculate basic ratings."
)
logging.warning(f"Re-calculating basic pitching ratings\n\n")
logging.warning("Re-calculating basic pitching ratings\n\n")
raw_data = get_scouting_dfs()
logging.debug(f"output: {raw_data}")

View File

@ -5,14 +5,19 @@ import logging
import pydantic
from pandas import DataFrame
from ..db_engine import db, PitchingStat, model_to_dict, Card, Player, Current, DoesNotExist
from ..db_engine import (
db,
PitchingStat,
model_to_dict,
Card,
Player,
Current,
DoesNotExist,
)
from ..dependencies import oauth2_scheme, valid_token
router = APIRouter(
prefix='/api/v2/pitstats',
tags=['pitstats']
)
router = APIRouter(prefix="/api/v2/pitstats", tags=["pitstats"])
class PitStat(pydantic.BaseModel):
@ -40,7 +45,7 @@ class PitStat(pydantic.BaseModel):
bsv: Optional[int] = 0
week: int
season: int
created: Optional[int] = int(datetime.timestamp(datetime.now())*1000)
created: Optional[int] = int(datetime.timestamp(datetime.now()) * 1000)
game_id: int
@ -48,13 +53,23 @@ class PitchingStatModel(pydantic.BaseModel):
stats: List[PitStat]
@router.get('')
@router.get("")
async def get_pit_stats(
card_id: int = None, player_id: int = None, team_id: int = None, vs_team_id: int = None, week: int = None,
season: int = None, week_start: int = None, week_end: int = None, created: int = None, gs: bool = None,
csv: bool = None):
card_id: int = None,
player_id: int = None,
team_id: int = None,
vs_team_id: int = None,
week: int = None,
season: int = None,
week_start: int = None,
week_end: int = None,
created: int = None,
gs: bool = None,
csv: bool = None,
limit: Optional[int] = 100,
):
all_stats = PitchingStat.select().join(Card).join(Player).order_by(PitchingStat.id)
logging.debug(f'pit query:\n\n{all_stats}')
logging.debug(f"pit query:\n\n{all_stats}")
if season is not None:
all_stats = all_stats.where(PitchingStat.season == season)
@ -83,43 +98,100 @@ async def get_pit_stats(
if gs is not None:
all_stats = all_stats.where(PitchingStat.gs == 1 if gs else 0)
total_count = all_stats.count() if not csv else 0
all_stats = all_stats.limit(max(0, min(limit, 500)))
# if all_stats.count() == 0:
# db.close()
# raise HTTPException(status_code=404, detail=f'No pitching stats found')
if csv:
data_list = [['id', 'card_id', 'player_id', 'cardset', 'team', 'vs_team', 'ip', 'hit', 'run', 'erun', 'so', 'bb', 'hbp',
'wp', 'balk', 'hr', 'ir', 'irs', 'gs', 'win', 'loss', 'hold', 'sv', 'bsv', 'week', 'season',
'created', 'game_id', 'roster_num']]
data_list = [
[
"id",
"card_id",
"player_id",
"cardset",
"team",
"vs_team",
"ip",
"hit",
"run",
"erun",
"so",
"bb",
"hbp",
"wp",
"balk",
"hr",
"ir",
"irs",
"gs",
"win",
"loss",
"hold",
"sv",
"bsv",
"week",
"season",
"created",
"game_id",
"roster_num",
]
]
for line in all_stats:
data_list.append(
[
line.id, line.card.id, line.card.player.player_id, line.card.player.cardset.name, line.team.abbrev,
line.vs_team.abbrev, line.ip, line.hit,
line.run, line.erun, line.so, line.bb, line.hbp, line.wp, line.balk, line.hr, line.ir, line.irs,
line.gs, line.win, line.loss, line.hold, line.sv, line.bsv, line.week, line.season, line.created,
line.game_id, line.roster_num
line.id,
line.card.id,
line.card.player.player_id,
line.card.player.cardset.name,
line.team.abbrev,
line.vs_team.abbrev,
line.ip,
line.hit,
line.run,
line.erun,
line.so,
line.bb,
line.hbp,
line.wp,
line.balk,
line.hr,
line.ir,
line.irs,
line.gs,
line.win,
line.loss,
line.hold,
line.sv,
line.bsv,
line.week,
line.season,
line.created,
line.game_id,
line.roster_num,
]
)
return_val = DataFrame(data_list).to_csv(header=False, index=False)
return Response(content=return_val, media_type='text/csv')
return Response(content=return_val, media_type="text/csv")
else:
return_val = {'count': all_stats.count(), 'stats': []}
return_val = {"count": total_count, "stats": []}
for x in all_stats:
return_val['stats'].append(model_to_dict(x, recurse=False))
return_val["stats"].append(model_to_dict(x, recurse=False))
return return_val
@router.post('')
@router.post("")
async def post_pitstat(stats: PitchingStatModel, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning('Bad Token: [REDACTED]')
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(
status_code=401,
detail='You are not authorized to post stats. This event has been logged.'
detail="You are not authorized to post stats. This event has been logged.",
)
new_stats = []
@ -149,33 +221,37 @@ async def post_pitstat(stats: PitchingStatModel, token: str = Depends(oauth2_sch
bsv=x.bsv,
week=x.week,
season=x.season,
created=datetime.fromtimestamp(x.created / 1000) if x.created else datetime.now(),
game_id=x.game_id
created=datetime.fromtimestamp(x.created / 1000)
if x.created
else datetime.now(),
game_id=x.game_id,
)
new_stats.append(this_stat)
with db.atomic():
PitchingStat.bulk_create(new_stats, batch_size=15)
raise HTTPException(status_code=200, detail=f'{len(new_stats)} pitching lines have been added')
raise HTTPException(
status_code=200, detail=f"{len(new_stats)} pitching lines have been added"
)
@router.delete('/{stat_id}')
@router.delete("/{stat_id}")
async def delete_pitstat(stat_id, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning('Bad Token: [REDACTED]')
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(
status_code=401,
detail='You are not authorized to delete stats. This event has been logged.'
detail="You are not authorized to delete stats. This event has been logged.",
)
try:
this_stat = PitchingStat.get_by_id(stat_id)
except DoesNotExist:
raise HTTPException(status_code=404, detail=f'No stat found with id {stat_id}')
raise HTTPException(status_code=404, detail=f"No stat found with id {stat_id}")
count = this_stat.delete_instance()
if count == 1:
raise HTTPException(status_code=200, detail=f'Stat {stat_id} has been deleted')
raise HTTPException(status_code=200, detail=f"Stat {stat_id} has been deleted")
else:
raise HTTPException(status_code=500, detail=f'Stat {stat_id} was not deleted')
raise HTTPException(status_code=500, detail=f"Stat {stat_id} was not deleted")

View File

@ -21,14 +21,15 @@ _NEXT_THRESHOLD_ATTR = {
}
def _build_card_state_response(state) -> dict:
def _build_card_state_response(state, player_name=None) -> dict:
"""Serialise a RefractorCardState into the standard API response shape.
Produces a flat dict with player_id and team_id as plain integers,
a nested 'track' dict with all threshold fields, and a computed
'next_threshold' field:
- For tiers 0-3: the threshold value for the tier immediately above.
- For tier 4 (fully evolved): None.
a nested 'track' dict with all threshold fields, and computed fields:
- 'next_threshold': threshold for the tier immediately above (None when fully evolved).
- 'progress_pct': current_value / next_threshold * 100, rounded to 1 decimal
(None when fully evolved or next_threshold is zero).
- 'player_name': included when passed (e.g. from a list join); omitted otherwise.
Uses model_to_dict(recurse=False) internally so FK fields are returned
as IDs rather than nested objects, then promotes the needed IDs up to
@ -40,7 +41,11 @@ def _build_card_state_response(state) -> dict:
next_attr = _NEXT_THRESHOLD_ATTR.get(state.current_tier)
next_threshold = getattr(track, next_attr) if next_attr else None
return {
progress_pct = None
if next_threshold is not None and next_threshold > 0:
progress_pct = round((state.current_value / next_threshold) * 100, 1)
result = {
"player_id": state.player_id,
"team_id": state.team_id,
"current_tier": state.current_tier,
@ -51,8 +56,14 @@ def _build_card_state_response(state) -> dict:
),
"track": track_dict,
"next_threshold": next_threshold,
"progress_pct": progress_pct,
}
if player_name is not None:
result["player_name"] = player_name
return result
@router.get("/tracks")
async def list_tracks(
@ -89,6 +100,118 @@ async def get_track(track_id: int, token: str = Depends(oauth2_scheme)):
return model_to_dict(track, recurse=False)
@router.get("/cards")
async def list_card_states(
team_id: int = Query(...),
card_type: Optional[str] = Query(default=None),
tier: Optional[int] = Query(default=None, ge=0, le=4),
season: Optional[int] = Query(default=None),
progress: Optional[str] = Query(default=None),
limit: int = Query(default=10, ge=1, le=100),
offset: int = Query(default=0, ge=0),
token: str = Depends(oauth2_scheme),
):
"""List RefractorCardState rows for a team, with optional filters and pagination.
Required:
team_id -- filter to this team's cards; returns empty list if team has no states
Optional filters:
card_type -- one of 'batter', 'sp', 'rp'; filters by RefractorTrack.card_type
tier -- filter by current_tier (0-4)
season -- filter to players who have batting or pitching season stats in that
season (EXISTS subquery against batting/pitching_season_stats)
progress -- 'close' = only cards within 80% of their next tier threshold;
fully evolved cards are always excluded from this filter
Pagination:
limit -- page size (1-100, default 10)
offset -- items to skip (default 0)
Response: {"count": N, "items": [...]}
count is the total matching rows before limit/offset.
Each item includes player_name and progress_pct in addition to the
standard single-card response fields.
Sort order: current_tier DESC, current_value DESC.
"""
if not valid_token(token):
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(status_code=401, detail="Unauthorized")
from ..db_engine import (
RefractorCardState,
RefractorTrack,
Player,
BattingSeasonStats,
PitchingSeasonStats,
fn,
Case,
JOIN,
)
query = (
RefractorCardState.select(RefractorCardState, RefractorTrack, Player)
.join(RefractorTrack)
.switch(RefractorCardState)
.join(
Player, JOIN.LEFT_OUTER, on=(RefractorCardState.player == Player.player_id)
)
.where(RefractorCardState.team == team_id)
.order_by(
RefractorCardState.current_tier.desc(),
RefractorCardState.current_value.desc(),
)
)
if card_type is not None:
query = query.where(RefractorTrack.card_type == card_type)
if tier is not None:
query = query.where(RefractorCardState.current_tier == tier)
if season is not None:
batter_exists = BattingSeasonStats.select().where(
(BattingSeasonStats.player == RefractorCardState.player)
& (BattingSeasonStats.team == RefractorCardState.team)
& (BattingSeasonStats.season == season)
)
pitcher_exists = PitchingSeasonStats.select().where(
(PitchingSeasonStats.player == RefractorCardState.player)
& (PitchingSeasonStats.team == RefractorCardState.team)
& (PitchingSeasonStats.season == season)
)
query = query.where(fn.EXISTS(batter_exists) | fn.EXISTS(pitcher_exists))
if progress == "close":
next_threshold_expr = Case(
RefractorCardState.current_tier,
(
(0, RefractorTrack.t1_threshold),
(1, RefractorTrack.t2_threshold),
(2, RefractorTrack.t3_threshold),
(3, RefractorTrack.t4_threshold),
),
None,
)
query = query.where(
(RefractorCardState.fully_evolved == False) # noqa: E712
& (RefractorCardState.current_value >= next_threshold_expr * 0.8)
)
total = query.count()
items = []
for state in query.offset(offset).limit(limit):
player_name = None
try:
player_name = state.player.p_name
except Exception:
pass
items.append(_build_card_state_response(state, player_name=player_name))
return {"count": total, "items": items}
@router.get("/cards/{card_id}")
async def get_card_state(card_id: int, token: str = Depends(oauth2_scheme)):
"""Return the RefractorCardState for a card identified by its Card.id.
@ -175,7 +298,7 @@ async def evaluate_game(game_id: int, token: str = Depends(oauth2_scheme)):
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(status_code=401, detail="Unauthorized")
from ..db_engine import RefractorCardState, RefractorTrack, Player, StratPlay
from ..db_engine import RefractorCardState, Player, StratPlay
from ..services.refractor_evaluator import evaluate_card
plays = list(StratPlay.select().where(StratPlay.game == game_id))

View File

@ -8,10 +8,7 @@ from ..db_engine import Result, model_to_dict, Team, DataError, DoesNotExist
from ..dependencies import oauth2_scheme, valid_token
router = APIRouter(
prefix='/api/v2/results',
tags=['results']
)
router = APIRouter(prefix="/api/v2/results", tags=["results"])
class ResultModel(pydantic.BaseModel):
@ -31,15 +28,29 @@ class ResultModel(pydantic.BaseModel):
game_type: str
@router.get('')
@router.get("")
async def get_results(
away_team_id: Optional[int] = None, home_team_id: Optional[int] = None, team_one_id: Optional[int] = None,
team_two_id: Optional[int] = None, away_score_min: Optional[int] = None, away_score_max: Optional[int] = None,
home_score_min: Optional[int] = None, home_score_max: Optional[int] = None, bothscore_min: Optional[int] = None,
bothscore_max: Optional[int] = None, season: Optional[int] = None, week: Optional[int] = None,
week_start: Optional[int] = None, week_end: Optional[int] = None, ranked: Optional[bool] = None,
short_game: Optional[bool] = None, game_type: Optional[str] = None, vs_ai: Optional[bool] = None,
csv: Optional[bool] = None):
away_team_id: Optional[int] = None,
home_team_id: Optional[int] = None,
team_one_id: Optional[int] = None,
team_two_id: Optional[int] = None,
away_score_min: Optional[int] = None,
away_score_max: Optional[int] = None,
home_score_min: Optional[int] = None,
home_score_max: Optional[int] = None,
bothscore_min: Optional[int] = None,
bothscore_max: Optional[int] = None,
season: Optional[int] = None,
week: Optional[int] = None,
week_start: Optional[int] = None,
week_end: Optional[int] = None,
ranked: Optional[bool] = None,
short_game: Optional[bool] = None,
game_type: Optional[str] = None,
vs_ai: Optional[bool] = None,
csv: Optional[bool] = None,
limit: int = 100,
):
all_results = Result.select()
# if all_results.count() == 0:
@ -51,28 +62,40 @@ async def get_results(
this_team = Team.get_by_id(away_team_id)
all_results = all_results.where(Result.away_team == this_team)
except DoesNotExist:
raise HTTPException(status_code=404, detail=f'No team found with id {away_team_id}')
raise HTTPException(
status_code=404, detail=f"No team found with id {away_team_id}"
)
if home_team_id is not None:
try:
this_team = Team.get_by_id(home_team_id)
all_results = all_results.where(Result.home_team == this_team)
except DoesNotExist:
raise HTTPException(status_code=404, detail=f'No team found with id {home_team_id}')
raise HTTPException(
status_code=404, detail=f"No team found with id {home_team_id}"
)
if team_one_id is not None:
try:
this_team = Team.get_by_id(team_one_id)
all_results = all_results.where((Result.home_team == this_team) | (Result.away_team == this_team))
all_results = all_results.where(
(Result.home_team == this_team) | (Result.away_team == this_team)
)
except DoesNotExist:
raise HTTPException(status_code=404, detail=f'No team found with id {team_one_id}')
raise HTTPException(
status_code=404, detail=f"No team found with id {team_one_id}"
)
if team_two_id is not None:
try:
this_team = Team.get_by_id(team_two_id)
all_results = all_results.where((Result.home_team == this_team) | (Result.away_team == this_team))
all_results = all_results.where(
(Result.home_team == this_team) | (Result.away_team == this_team)
)
except DoesNotExist:
raise HTTPException(status_code=404, detail=f'No team found with id {team_two_id}')
raise HTTPException(
status_code=404, detail=f"No team found with id {team_two_id}"
)
if away_score_min is not None:
all_results = all_results.where(Result.away_score >= away_score_min)
@ -87,10 +110,14 @@ async def get_results(
all_results = all_results.where(Result.home_score <= home_score_max)
if bothscore_min is not None:
all_results = all_results.where((Result.home_score >= bothscore_min) & (Result.away_score >= bothscore_min))
all_results = all_results.where(
(Result.home_score >= bothscore_min) & (Result.away_score >= bothscore_min)
)
if bothscore_max is not None:
all_results = all_results.where((Result.home_score <= bothscore_max) & (Result.away_score <= bothscore_max))
all_results = all_results.where(
(Result.home_score <= bothscore_max) & (Result.away_score <= bothscore_max)
)
if season is not None:
all_results = all_results.where(Result.season == season)
@ -114,6 +141,9 @@ async def get_results(
all_results = all_results.where(Result.game_type == game_type)
all_results = all_results.order_by(Result.id)
limit = max(0, min(limit, 500))
total_count = all_results.count() if not csv else 0
all_results = all_results.limit(limit)
# Not functional
# if vs_ai is not None:
# AwayTeam = Team.alias()
@ -134,60 +164,115 @@ async def get_results(
# logging.info(f'Result Query:\n\n{all_results}')
if csv:
data_list = [['id', 'away_abbrev', 'home_abbrev', 'away_score', 'home_score', 'away_tv', 'home_tv',
'game_type', 'season', 'week', 'short_game', 'ranked']]
data_list = [
[
"id",
"away_abbrev",
"home_abbrev",
"away_score",
"home_score",
"away_tv",
"home_tv",
"game_type",
"season",
"week",
"short_game",
"ranked",
]
]
for line in all_results:
data_list.append([
line.id, line.away_team.abbrev, line.home_team.abbrev, line.away_score, line.home_score,
line.away_team_value, line.home_team_value, line.game_type if line.game_type else 'minor-league',
line.season, line.week, line.short_game, line.ranked
])
data_list.append(
[
line.id,
line.away_team.abbrev,
line.home_team.abbrev,
line.away_score,
line.home_score,
line.away_team_value,
line.home_team_value,
line.game_type if line.game_type else "minor-league",
line.season,
line.week,
line.short_game,
line.ranked,
]
)
return_val = DataFrame(data_list).to_csv(header=False, index=False)
return Response(content=return_val, media_type='text/csv')
return Response(content=return_val, media_type="text/csv")
else:
return_val = {'count': all_results.count(), 'results': []}
return_val = {"count": total_count, "results": []}
for x in all_results:
return_val['results'].append(model_to_dict(x))
return_val["results"].append(model_to_dict(x))
return return_val
@router.get('/{result_id}')
@router.get("/{result_id}")
async def get_one_results(result_id, csv: Optional[bool] = None):
try:
this_result = Result.get_by_id(result_id)
except DoesNotExist:
raise HTTPException(status_code=404, detail=f'No result found with id {result_id}')
raise HTTPException(
status_code=404, detail=f"No result found with id {result_id}"
)
if csv:
data_list = [
['id', 'away_abbrev', 'home_abbrev', 'away_score', 'home_score', 'away_tv', 'home_tv', 'game_type',
'season', 'week', 'game_type'],
[this_result.id, this_result.away_team.abbrev, this_result.away_team.abbrev, this_result.away_score,
this_result.home_score, this_result.away_team_value, this_result.home_team_value,
this_result.game_type if this_result.game_type else 'minor-league',
this_result.season, this_result.week, this_result.game_type]
[
"id",
"away_abbrev",
"home_abbrev",
"away_score",
"home_score",
"away_tv",
"home_tv",
"game_type",
"season",
"week",
"game_type",
],
[
this_result.id,
this_result.away_team.abbrev,
this_result.away_team.abbrev,
this_result.away_score,
this_result.home_score,
this_result.away_team_value,
this_result.home_team_value,
this_result.game_type if this_result.game_type else "minor-league",
this_result.season,
this_result.week,
this_result.game_type,
],
]
return_val = DataFrame(data_list).to_csv(header=False, index=False)
return Response(content=return_val, media_type='text/csv')
return Response(content=return_val, media_type="text/csv")
else:
return_val = model_to_dict(this_result)
return return_val
@router.get('/team/{team_id}')
@router.get("/team/{team_id}")
async def get_team_results(
team_id: int, season: Optional[int] = None, week: Optional[int] = None, csv: Optional[bool] = False):
all_results = Result.select().where((Result.away_team_id == team_id) | (Result.home_team_id == team_id)).order_by(Result.id)
team_id: int,
season: Optional[int] = None,
week: Optional[int] = None,
csv: Optional[bool] = False,
):
all_results = (
Result.select()
.where((Result.away_team_id == team_id) | (Result.home_team_id == team_id))
.order_by(Result.id)
)
try:
this_team = Team.get_by_id(team_id)
except DoesNotExist as e:
logging.error(f'Unknown team id {team_id} trying to pull team results')
raise HTTPException(404, f'Team id {team_id} not found')
except DoesNotExist:
logging.error(f"Unknown team id {team_id} trying to pull team results")
raise HTTPException(404, f"Team id {team_id} not found")
if season is not None:
all_results = all_results.where(Result.season == season)
@ -224,31 +309,38 @@ async def get_team_results(
if csv:
data_list = [
['team_id', 'ranked_wins', 'ranked_losses', 'casual_wins', 'casual_losses', 'team_ranking'],
[team_id, r_wins, r_loss, c_wins, c_loss, this_team.ranking]
[
"team_id",
"ranked_wins",
"ranked_losses",
"casual_wins",
"casual_losses",
"team_ranking",
],
[team_id, r_wins, r_loss, c_wins, c_loss, this_team.ranking],
]
return_val = DataFrame(data_list).to_csv(header=False, index=False)
return Response(content=return_val, media_type='text/csv')
return Response(content=return_val, media_type="text/csv")
else:
return_val = {
'team': model_to_dict(this_team),
'ranked_wins': r_wins,
'ranked_losses': r_loss,
'casual_wins': c_wins,
'casual_losses': c_loss,
"team": model_to_dict(this_team),
"ranked_wins": r_wins,
"ranked_losses": r_loss,
"casual_wins": c_wins,
"casual_losses": c_loss,
}
return return_val
@router.post('')
@router.post("")
async def post_result(result: ResultModel, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning('Bad Token: [REDACTED]')
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(
status_code=401,
detail='You are not authorized to post results. This event has been logged.'
detail="You are not authorized to post results. This event has been logged.",
)
this_result = Result(**result.__dict__)
@ -256,24 +348,28 @@ async def post_result(result: ResultModel, token: str = Depends(oauth2_scheme)):
if result.ranked:
if not result.away_team_ranking:
error = f'Ranked game did not include away team ({result.away_team_id}) ranking.'
error = f"Ranked game did not include away team ({result.away_team_id}) ranking."
logging.error(error)
raise DataError(error)
if not result.home_team_ranking:
error = f'Ranked game did not include home team ({result.home_team_id}) ranking.'
error = f"Ranked game did not include home team ({result.home_team_id}) ranking."
logging.error(error)
raise DataError(error)
k_value = 20 if result.short_game else 60
ratio = (result.home_team_ranking - result.away_team_ranking) / 400
exp_score = 1 / (1 + (10 ** ratio))
exp_score = 1 / (1 + (10**ratio))
away_win = True if result.away_score > result.home_score else False
total_delta = k_value * exp_score
high_delta = total_delta * exp_score if exp_score > .5 else total_delta * (1 - exp_score)
high_delta = (
total_delta * exp_score
if exp_score > 0.5
else total_delta * (1 - exp_score)
)
low_delta = total_delta - high_delta
# exp_score > .5 means away team is favorite
if exp_score > .5 and away_win:
if exp_score > 0.5 and away_win:
final_delta = low_delta
away_delta = low_delta * 3
home_delta = -low_delta
@ -281,7 +377,7 @@ async def post_result(result: ResultModel, token: str = Depends(oauth2_scheme)):
final_delta = high_delta
away_delta = high_delta * 3
home_delta = -high_delta
elif exp_score <= .5 and not away_win:
elif exp_score <= 0.5 and not away_win:
final_delta = low_delta
away_delta = -low_delta
home_delta = low_delta * 3
@ -294,18 +390,20 @@ async def post_result(result: ResultModel, token: str = Depends(oauth2_scheme)):
away_delta = 0
home_delta = 0
logging.debug(f'/results ranking deltas\n\nk_value: {k_value} / ratio: {ratio} / '
f'exp_score: {exp_score} / away_win: {away_win} / total_delta: {total_delta} / '
f'high_delta: {high_delta} / low_delta: {low_delta} / final_delta: {final_delta} / ')
logging.debug(
f"/results ranking deltas\n\nk_value: {k_value} / ratio: {ratio} / "
f"exp_score: {exp_score} / away_win: {away_win} / total_delta: {total_delta} / "
f"high_delta: {high_delta} / low_delta: {low_delta} / final_delta: {final_delta} / "
)
away_team = Team.get_by_id(result.away_team_id)
away_team.ranking += away_delta
away_team.save()
logging.info(f'Just updated {away_team.abbrev} ranking to {away_team.ranking}')
logging.info(f"Just updated {away_team.abbrev} ranking to {away_team.ranking}")
home_team = Team.get_by_id(result.home_team_id)
home_team.ranking += home_delta
home_team.save()
logging.info(f'Just updated {home_team.abbrev} ranking to {home_team.ranking}')
logging.info(f"Just updated {home_team.abbrev} ranking to {home_team.ranking}")
if saved == 1:
return_val = model_to_dict(this_result)
@ -313,27 +411,38 @@ async def post_result(result: ResultModel, token: str = Depends(oauth2_scheme)):
else:
raise HTTPException(
status_code=418,
detail='Well slap my ass and call me a teapot; I could not save that roster'
detail="Well slap my ass and call me a teapot; I could not save that roster",
)
@router.patch('/{result_id}')
@router.patch("/{result_id}")
async def patch_result(
result_id, away_team_id: Optional[int] = None, home_team_id: Optional[int] = None,
away_score: Optional[int] = None, home_score: Optional[int] = None, away_team_value: Optional[int] = None,
home_team_value: Optional[int] = None, scorecard: Optional[str] = None, week: Optional[int] = None,
season: Optional[int] = None, short_game: Optional[bool] = None, game_type: Optional[str] = None,
token: str = Depends(oauth2_scheme)):
result_id,
away_team_id: Optional[int] = None,
home_team_id: Optional[int] = None,
away_score: Optional[int] = None,
home_score: Optional[int] = None,
away_team_value: Optional[int] = None,
home_team_value: Optional[int] = None,
scorecard: Optional[str] = None,
week: Optional[int] = None,
season: Optional[int] = None,
short_game: Optional[bool] = None,
game_type: Optional[str] = None,
token: str = Depends(oauth2_scheme),
):
if not valid_token(token):
logging.warning('Bad Token: [REDACTED]')
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(
status_code=401,
detail='You are not authorized to patch results. This event has been logged.'
detail="You are not authorized to patch results. This event has been logged.",
)
try:
this_result = Result.get_by_id(result_id)
except DoesNotExist:
raise HTTPException(status_code=404, detail=f'No result found with id {result_id}')
raise HTTPException(
status_code=404, detail=f"No result found with id {result_id}"
)
if away_team_id is not None:
this_result.away_team_id = away_team_id
@ -377,27 +486,32 @@ async def patch_result(
else:
raise HTTPException(
status_code=418,
detail='Well slap my ass and call me a teapot; I could not save that event'
detail="Well slap my ass and call me a teapot; I could not save that event",
)
@router.delete('/{result_id}')
@router.delete("/{result_id}")
async def delete_result(result_id, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning('Bad Token: [REDACTED]')
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(
status_code=401,
detail='You are not authorized to post results. This event has been logged.'
detail="You are not authorized to post results. This event has been logged.",
)
try:
this_result = Result.get_by_id(result_id)
except DoesNotExist:
raise HTTPException(status_code=404, detail=f'No result found with id {result_id}')
raise HTTPException(
status_code=404, detail=f"No result found with id {result_id}"
)
count = this_result.delete_instance()
if count == 1:
raise HTTPException(status_code=200, detail=f'Result {result_id} has been deleted')
raise HTTPException(
status_code=200, detail=f"Result {result_id} has been deleted"
)
else:
raise HTTPException(status_code=500, detail=f'Result {result_id} was not deleted')
raise HTTPException(
status_code=500, detail=f"Result {result_id} was not deleted"
)

View File

@ -9,10 +9,7 @@ from ..db_engine import Reward, model_to_dict, fn, DoesNotExist
from ..dependencies import oauth2_scheme, valid_token
router = APIRouter(
prefix='/api/v2/rewards',
tags=['rewards']
)
router = APIRouter(prefix="/api/v2/rewards", tags=["rewards"])
class RewardModel(pydantic.BaseModel):
@ -20,19 +17,23 @@ class RewardModel(pydantic.BaseModel):
season: int
week: int
team_id: int
created: Optional[int] = int(datetime.timestamp(datetime.now())*1000)
created: Optional[int] = int(datetime.timestamp(datetime.now()) * 1000)
@router.get('')
@router.get("")
async def get_rewards(
name: Optional[str] = None, in_name: Optional[str] = None, team_id: Optional[int] = None,
season: Optional[int] = None, week: Optional[int] = None, created_after: Optional[int] = None,
flat: Optional[bool] = False, csv: Optional[bool] = None):
name: Optional[str] = None,
in_name: Optional[str] = None,
team_id: Optional[int] = None,
season: Optional[int] = None,
week: Optional[int] = None,
created_after: Optional[int] = None,
flat: Optional[bool] = False,
csv: Optional[bool] = None,
limit: Optional[int] = 100,
):
all_rewards = Reward.select().order_by(Reward.id)
if all_rewards.count() == 0:
raise HTTPException(status_code=404, detail=f'There are no rewards to filter')
if name is not None:
all_rewards = all_rewards.where(fn.Lower(Reward.name) == name.lower())
if team_id is not None:
@ -48,63 +49,73 @@ async def get_rewards(
if week is not None:
all_rewards = all_rewards.where(Reward.week == week)
if all_rewards.count() == 0:
raise HTTPException(status_code=404, detail=f'No rewards found')
total_count = all_rewards.count()
if total_count == 0:
raise HTTPException(status_code=404, detail="No rewards found")
limit = max(0, min(limit, 500))
all_rewards = all_rewards.limit(limit)
if csv:
data_list = [['id', 'name', 'team', 'daily', 'created']]
data_list = [["id", "name", "team", "daily", "created"]]
for line in all_rewards:
data_list.append(
[
line.id, line.name, line.team.id, line.daily, line.created
]
[line.id, line.name, line.team.id, line.daily, line.created]
)
return_val = DataFrame(data_list).to_csv(header=False, index=False)
return Response(content=return_val, media_type='text/csv')
return Response(content=return_val, media_type="text/csv")
else:
return_val = {'count': all_rewards.count(), 'rewards': []}
return_val = {"count": total_count, "rewards": []}
for x in all_rewards:
return_val['rewards'].append(model_to_dict(x, recurse=not flat))
return_val["rewards"].append(model_to_dict(x, recurse=not flat))
return return_val
@router.get('/{reward_id}')
@router.get("/{reward_id}")
async def get_one_reward(reward_id, csv: Optional[bool] = False):
try:
this_reward = Reward.get_by_id(reward_id)
except DoesNotExist:
raise HTTPException(status_code=404, detail=f'No reward found with id {reward_id}')
raise HTTPException(
status_code=404, detail=f"No reward found with id {reward_id}"
)
if csv:
data_list = [
['id', 'name', 'card_count', 'description'],
[this_reward.id, this_reward.name, this_reward.team.id, this_reward.daily, this_reward.created]
["id", "name", "card_count", "description"],
[
this_reward.id,
this_reward.name,
this_reward.team.id,
this_reward.daily,
this_reward.created,
],
]
return_val = DataFrame(data_list).to_csv(header=False, index=False)
return Response(content=return_val, media_type='text/csv')
return Response(content=return_val, media_type="text/csv")
else:
return_val = model_to_dict(this_reward)
return return_val
@router.post('')
@router.post("")
async def post_rewards(reward: RewardModel, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning('Bad Token: [REDACTED]')
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(
status_code=401,
detail='You are not authorized to post rewards. This event has been logged.'
detail="You are not authorized to post rewards. This event has been logged.",
)
reward_data = reward.dict()
# Convert milliseconds timestamp to datetime for PostgreSQL
if reward_data.get('created'):
reward_data['created'] = datetime.fromtimestamp(reward_data['created'] / 1000)
if reward_data.get("created"):
reward_data["created"] = datetime.fromtimestamp(reward_data["created"] / 1000)
this_reward = Reward(**reward_data)
saved = this_reward.save()
@ -114,24 +125,30 @@ async def post_rewards(reward: RewardModel, token: str = Depends(oauth2_scheme))
else:
raise HTTPException(
status_code=418,
detail='Well slap my ass and call me a teapot; I could not save that cardset'
detail="Well slap my ass and call me a teapot; I could not save that cardset",
)
@router.patch('/{reward_id}')
@router.patch("/{reward_id}")
async def patch_reward(
reward_id, name: Optional[str] = None, team_id: Optional[int] = None, created: Optional[int] = None,
token: str = Depends(oauth2_scheme)):
reward_id,
name: Optional[str] = None,
team_id: Optional[int] = None,
created: Optional[int] = None,
token: str = Depends(oauth2_scheme),
):
if not valid_token(token):
logging.warning('Bad Token: [REDACTED]')
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(
status_code=401,
detail='You are not authorized to patch rewards. This event has been logged.'
detail="You are not authorized to patch rewards. This event has been logged.",
)
try:
this_reward = Reward.get_by_id(reward_id)
except DoesNotExist:
raise HTTPException(status_code=404, detail=f'No reward found with id {reward_id}')
raise HTTPException(
status_code=404, detail=f"No reward found with id {reward_id}"
)
if name is not None:
this_reward.name = name
@ -147,28 +164,32 @@ async def patch_reward(
else:
raise HTTPException(
status_code=418,
detail='Well slap my ass and call me a teapot; I could not save that rarity'
detail="Well slap my ass and call me a teapot; I could not save that rarity",
)
@router.delete('/{reward_id}')
@router.delete("/{reward_id}")
async def delete_reward(reward_id, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning('Bad Token: [REDACTED]')
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(
status_code=401,
detail='You are not authorized to delete rewards. This event has been logged.'
detail="You are not authorized to delete rewards. This event has been logged.",
)
try:
this_reward = Reward.get_by_id(reward_id)
except DoesNotExist:
raise HTTPException(status_code=404, detail=f'No reward found with id {reward_id}')
raise HTTPException(
status_code=404, detail=f"No reward found with id {reward_id}"
)
count = this_reward.delete_instance()
if count == 1:
raise HTTPException(status_code=200, detail=f'Reward {reward_id} has been deleted')
raise HTTPException(
status_code=200, detail=f"Reward {reward_id} has been deleted"
)
else:
raise HTTPException(status_code=500, detail=f'Reward {reward_id} was not deleted')
raise HTTPException(
status_code=500, detail=f"Reward {reward_id} was not deleted"
)

View File

@ -4,7 +4,7 @@ from typing import Optional
import logging
import pydantic
from ..db_engine import ScoutClaim, ScoutOpportunity, model_to_dict
from ..db_engine import ScoutClaim, model_to_dict
from ..dependencies import oauth2_scheme, valid_token
router = APIRouter(prefix="/api/v2/scout_claims", tags=["scout_claims"])
@ -18,7 +18,9 @@ class ScoutClaimModel(pydantic.BaseModel):
@router.get("")
async def get_scout_claims(
scout_opportunity_id: Optional[int] = None, claimed_by_team_id: Optional[int] = None
scout_opportunity_id: Optional[int] = None,
claimed_by_team_id: Optional[int] = None,
limit: Optional[int] = 100,
):
query = ScoutClaim.select().order_by(ScoutClaim.id)
@ -28,8 +30,14 @@ async def get_scout_claims(
if claimed_by_team_id is not None:
query = query.where(ScoutClaim.claimed_by_team_id == claimed_by_team_id)
total_count = query.count()
if limit is not None:
limit = max(0, min(limit, 500))
query = query.limit(limit)
results = [model_to_dict(x, recurse=False) for x in query]
return {"count": len(results), "results": results}
return {"count": total_count, "results": results}
@router.get("/{claim_id}")

View File

@ -5,7 +5,7 @@ from typing import Optional, List
import logging
import pydantic
from ..db_engine import ScoutOpportunity, ScoutClaim, model_to_dict, fn
from ..db_engine import ScoutOpportunity, ScoutClaim, model_to_dict
from ..dependencies import oauth2_scheme, valid_token
router = APIRouter(prefix="/api/v2/scout_opportunities", tags=["scout_opportunities"])
@ -32,8 +32,10 @@ async def get_scout_opportunities(
claimed: Optional[bool] = None,
expired_before: Optional[int] = None,
opener_team_id: Optional[int] = None,
limit: Optional[int] = 100,
):
limit = max(0, min(limit, 500))
query = ScoutOpportunity.select().order_by(ScoutOpportunity.id)
if opener_team_id is not None:
@ -50,8 +52,10 @@ async def get_scout_opportunities(
else:
query = query.where(ScoutOpportunity.id.not_in(claim_subquery))
total_count = query.count()
query = query.limit(limit)
results = [opportunity_to_dict(x, recurse=False) for x in query]
return {"count": len(results), "results": results}
return {"count": total_count, "results": results}
@router.get("/{opportunity_id}")

View File

@ -8,10 +8,7 @@ from ..db_engine import StratGame, model_to_dict, fn
from ..dependencies import oauth2_scheme, valid_token
router = APIRouter(
prefix='/api/v2/games',
tags=['games']
)
router = APIRouter(prefix="/api/v2/games", tags=["games"])
class GameModel(pydantic.BaseModel):
@ -35,13 +32,22 @@ class GameList(pydantic.BaseModel):
games: List[GameModel]
@router.get('')
@router.get("")
async def get_games(
season: list = Query(default=None), forfeit: Optional[bool] = None, away_team_id: list = Query(default=None),
home_team_id: list = Query(default=None), team1_id: list = Query(default=None),
team2_id: list = Query(default=None), game_type: list = Query(default=None), ranked: Optional[bool] = None,
short_game: Optional[bool] = None, csv: Optional[bool] = False, short_output: bool = False,
gauntlet_id: Optional[int] = None):
season: list = Query(default=None),
forfeit: Optional[bool] = None,
away_team_id: list = Query(default=None),
home_team_id: list = Query(default=None),
team1_id: list = Query(default=None),
team2_id: list = Query(default=None),
game_type: list = Query(default=None),
ranked: Optional[bool] = None,
short_game: Optional[bool] = None,
csv: Optional[bool] = False,
short_output: bool = False,
gauntlet_id: Optional[int] = None,
limit: int = 100,
):
all_games = StratGame.select().order_by(StratGame.id)
if season is not None:
@ -68,49 +74,71 @@ async def get_games(
if short_game is not None:
all_games = all_games.where(StratGame.short_game == short_game)
if gauntlet_id is not None:
all_games = all_games.where(StratGame.game_type.contains(f'gauntlet-{gauntlet_id}'))
all_games = all_games.where(
StratGame.game_type.contains(f"gauntlet-{gauntlet_id}")
)
total_count = all_games.count() if not csv else 0
all_games = all_games.limit(max(0, min(limit, 500)))
if csv:
return_vals = [model_to_dict(x) for x in all_games]
for x in return_vals:
x['away_abbrev'] = x['away_team']['abbrev']
x['home_abbrev'] = x['home_team']['abbrev']
del x['away_team'], x['home_team']
x["away_abbrev"] = x["away_team"]["abbrev"]
x["home_abbrev"] = x["home_team"]["abbrev"]
del x["away_team"], x["home_team"]
output = pd.DataFrame(return_vals)[[
'id', 'away_abbrev', 'home_abbrev', 'away_score', 'home_score', 'away_team_value', 'home_team_value',
'game_type', 'season', 'week', 'short_game', 'ranked'
]]
output = pd.DataFrame(return_vals)[
[
"id",
"away_abbrev",
"home_abbrev",
"away_score",
"home_score",
"away_team_value",
"home_team_value",
"game_type",
"season",
"week",
"short_game",
"ranked",
]
]
return Response(content=output.to_csv(index=False), media_type='text/csv')
return Response(content=output.to_csv(index=False), media_type="text/csv")
return_val = {'count': all_games.count(), 'games': [
model_to_dict(x, recurse=not short_output) for x in all_games
]}
return_val = {
"count": total_count,
"games": [model_to_dict(x, recurse=not short_output) for x in all_games],
}
return return_val
@router.get('/{game_id}')
@router.get("/{game_id}")
async def get_one_game(game_id: int):
this_game = StratGame.get_or_none(StratGame.id == game_id)
if not this_game:
raise HTTPException(status_code=404, detail=f'StratGame ID {game_id} not found')
raise HTTPException(status_code=404, detail=f"StratGame ID {game_id} not found")
g_result = model_to_dict(this_game)
return g_result
@router.patch('/{game_id}')
@router.patch("/{game_id}")
async def patch_game(
game_id: int, game_type: Optional[str] = None, away_score: Optional[int] = None,
home_score: Optional[int] = None, token: str = Depends(oauth2_scheme)):
game_id: int,
game_type: Optional[str] = None,
away_score: Optional[int] = None,
home_score: Optional[int] = None,
token: str = Depends(oauth2_scheme),
):
if not valid_token(token):
logging.warning('patch_game - Bad Token: [REDACTED]')
raise HTTPException(status_code=401, detail='Unauthorized')
logging.warning("patch_game - Bad Token: [REDACTED]")
raise HTTPException(status_code=401, detail="Unauthorized")
this_game = StratGame.get_or_none(StratGame.id == game_id)
if not this_game:
raise HTTPException(status_code=404, detail=f'StratGame ID {game_id} not found')
raise HTTPException(status_code=404, detail=f"StratGame ID {game_id} not found")
if away_score is not None:
this_game.away_score = away_score
@ -123,14 +151,14 @@ async def patch_game(
g_result = model_to_dict(this_game)
return g_result
else:
raise HTTPException(status_code=500, detail=f'Unable to patch game {game_id}')
raise HTTPException(status_code=500, detail=f"Unable to patch game {game_id}")
@router.post('')
@router.post("")
async def post_game(this_game: GameModel, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning('post_games - Bad Token: [REDACTED]')
raise HTTPException(status_code=401, detail='Unauthorized')
logging.warning("post_games - Bad Token: [REDACTED]")
raise HTTPException(status_code=401, detail="Unauthorized")
this_game = StratGame(**this_game.dict())
@ -141,25 +169,25 @@ async def post_game(this_game: GameModel, token: str = Depends(oauth2_scheme)):
else:
raise HTTPException(
status_code=418,
detail='Well slap my ass and call me a teapot; I could not save that game'
detail="Well slap my ass and call me a teapot; I could not save that game",
)
@router.delete('/{game_id}')
@router.delete("/{game_id}")
async def delete_game(game_id: int, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning('delete_game - Bad Token: [REDACTED]')
raise HTTPException(status_code=401, detail='Unauthorized')
logging.warning("delete_game - Bad Token: [REDACTED]")
raise HTTPException(status_code=401, detail="Unauthorized")
this_game = StratGame.get_or_none(StratGame.id == game_id)
if not this_game:
raise HTTPException(status_code=404, detail=f'StratGame ID {game_id} not found')
raise HTTPException(status_code=404, detail=f"StratGame ID {game_id} not found")
count = this_game.delete_instance()
if count == 1:
return f'StratGame {game_id} has been deleted'
return f"StratGame {game_id} has been deleted"
else:
raise HTTPException(status_code=500, detail=f'StratGame {game_id} could not be deleted')
raise HTTPException(
status_code=500, detail=f"StratGame {game_id} could not be deleted"
)

View File

@ -0,0 +1,19 @@
-- Migration: Add team_id index to refractor_card_state
-- Date: 2026-03-25
--
-- Adds a non-unique index on refractor_card_state.team_id to support the new
-- GET /api/v2/refractor/cards list endpoint, which filters by team as its
-- primary discriminator and is called on every /refractor status bot command.
--
-- The existing unique index is on (player_id, team_id) with player leading,
-- so team-only queries cannot use it efficiently.
BEGIN;
-- IF NOT EXISTS keeps the migration idempotent if it is re-applied.
-- NOTE(review): a plain CREATE INDEX blocks writes to the table while the
-- index builds; if refractor_card_state is large, consider CREATE INDEX
-- CONCURRENTLY run outside the transaction — confirm table size first.
CREATE INDEX IF NOT EXISTS idx_refractor_card_state_team
ON refractor_card_state (team_id);
COMMIT;
-- Rollback:
-- DROP INDEX IF EXISTS idx_refractor_card_state_team;

View File

@ -204,3 +204,120 @@ def test_tier_t3_boundary():
def test_tier_accepts_namespace_track():
"""tier_from_value must work with attribute-style track objects (Peewee models)."""
assert tier_from_value(37, track_ns("batter")) == 1
# ---------------------------------------------------------------------------
# T1-1: Negative singles guard in compute_batter_value
# ---------------------------------------------------------------------------
def test_batter_negative_singles_component():
    """An incoherent stat line (more XBH than hits) flows through unclamped.

    The batter formula derives singles = hits - doubles - triples - hr, so
    hits=1, doubles=1, triples=1, hr=0 yields singles = -1 — a physically
    impossible stat line but valid arithmetic input. No guard or clamp exists:

        singles = 1 - 1 - 1 - 0 = -1
        tb      = (-1)*1 + 1*2 + 1*3 + 0*4 = 4
        value   = pa + tb*2 = 0 + 4*2 = 8

    Pinning this documents the formula's actual behaviour; if a guard is
    added in the future, this test surfaces the change immediately.
    """
    incoherent_line = batter_stats(hits=1, doubles=1, triples=1, hr=0)
    # Negative singles are NOT floored at zero, so TB = 4 and value = 8.0.
    result = compute_batter_value(incoherent_line)
    assert result == 8.0, (
        f"Expected 8.0 (negative singles flows through unclamped), got {result}"
    )
def test_batter_negative_singles_is_not_clamped():
    """A singles value below zero must not be floored at zero by the formula.

    Were singles clamped to 0 for this stat line, total bases would be
    0*1 + 1*2 + 1*3 = 5 and the value would be 10.0. Asserting the computed
    value differs from 10.0 means a future `singles = max(0, ...)` refactor
    fails loudly here, surfacing the behaviour change.
    """
    stat_line = batter_stats(hits=1, doubles=1, triples=1, hr=0)
    unclamped_value = compute_batter_value(stat_line)
    clamped_value = 10.0  # tb = 0 + 2 + 3 = 5 if singles were clamped to 0
    assert unclamped_value != clamped_value, (
        "Formula appears to clamp negative singles — behaviour has changed"
    )
# ---------------------------------------------------------------------------
# T1-2: Tier boundary precision with float SP values
# ---------------------------------------------------------------------------
def test_sp_tier_just_below_t1_outs29():
    """29 outs → IP = 9.666..., strictly below the SP T1 threshold (10) → T0.

    Pitcher IP accumulates in thirds of an inning, so tier checks must cope
    with non-integer values. Truncating or rounding IP upward — or an
    inclusive comparison applied to the wrong side — could promote this
    value prematurely; it must stay at tier 0.
    """
    value = compute_sp_value(pitcher_stats(outs=29, strikeouts=0))
    assert value == pytest.approx(29 / 3)  # 9.6666...
    assert value < 10.0  # strictly below the T1 threshold
    assert tier_from_value(value, track_dict("sp")) == 0
def test_sp_tier_exactly_t1_outs30():
    """30 outs → IP = 10.0, exactly the SP T1 threshold → T1.

    The tier comparison must be an inclusive lower bound (>=); a
    strictly-greater-than check would misclassify this boundary value
    as T0 instead of promoting it.
    """
    value = compute_sp_value(pitcher_stats(outs=30, strikeouts=0))
    assert value == 10.0
    assert tier_from_value(value, track_dict("sp")) == 1
def test_sp_float_value_at_exact_t2_boundary():
    """outs=120 -> IP = 40.0, exactly the SP T2 threshold -> T2.

    Validates that every tier threshold — not just T1 — uses an inclusive
    lower-bound comparison for float values.
    """
    value = compute_sp_value(pitcher_stats(outs=120, strikeouts=0))
    assert value == 40.0
    assert tier_from_value(value, track_dict("sp")) == 2
def test_sp_float_value_just_below_t2():
    """outs=119 -> value = 39.666..., strictly below T2 (40) -> stays at T1.

    A value already past T1=10 but just under T2 must not be prematurely
    promoted by floating-point comparison imprecision.
    """
    value = compute_sp_value(pitcher_stats(outs=119, strikeouts=0))
    assert value == pytest.approx(119 / 3)  # 39.666...
    assert value < 40.0
    assert tier_from_value(value, track_dict("sp")) == 1

View File

@ -665,3 +665,136 @@ def test_auth_required_evaluate_game(client):
resp = client.post(f"/api/v2/refractor/evaluate-game/{game.id}")
assert resp.status_code == 401
# ---------------------------------------------------------------------------
# T1-3: evaluate-game with non-existent game_id
# ---------------------------------------------------------------------------
def test_evaluate_game_nonexistent_game_id(client):
    """POST evaluate-game for a game_id that has no StratGame row.

    The endpoint only queries StratPlay for the given game; zero plays
    produce an empty player set, so it answers 200 with
    {"evaluated": 0, "tier_ups": []} rather than 404 — a missing game is
    indistinguishable from a game nobody played in. If the contract is
    ever tightened to validate game existence, this test will flag it.
    """
    resp = client.post("/api/v2/refractor/evaluate-game/99999", headers=AUTH_HEADER)
    assert resp.status_code == 200
    body = resp.json()
    assert body["evaluated"] == 0
    assert body["tier_ups"] == []
# ---------------------------------------------------------------------------
# T2-3: evaluate-game with zero plays
# ---------------------------------------------------------------------------
def test_evaluate_game_zero_plays(client):
    """A real game with zero StratPlay rows evaluates nobody and returns 200.

    The endpoint derives its player list from plays. With none recorded it
    must return the empty-batch shape instead of raising a KeyError or
    producing an unexpected structure.
    """
    home = _make_team("ZP1", gmid=20101)
    away = _make_team("ZP2", gmid=20102)
    game = _make_game(home, away)
    # Deliberately: no plays for this game.
    resp = client.post(
        f"/api/v2/refractor/evaluate-game/{game.id}", headers=AUTH_HEADER
    )
    assert resp.status_code == 200
    body = resp.json()
    assert body["evaluated"] == 0
    assert body["tier_ups"] == []
# ---------------------------------------------------------------------------
# T2-9: Per-player error isolation in evaluate_game
# ---------------------------------------------------------------------------
def test_evaluate_game_error_isolation(client, monkeypatch):
    """One player's exception must not abort the rest of the batch.

    Two batters share a game and both have RefractorCardState rows, so
    neither is skipped by the "no state" guard. evaluate_card is patched
    on its source module — the router performs a function-local
    `from ..services.refractor_evaluator import evaluate_card`, which
    re-resolves on every call and therefore picks up the patch — to raise
    RuntimeError for the first batter and delegate to the real
    implementation for the second. The endpoint must answer 200 with
    evaluated == 1 (not 0 or 2) and no tier_ups from the failing player:
    the observable proof that per-player error isolation is working.
    """
    from app.services import refractor_evaluator

    team_a = _make_team("EI1", gmid=20111)
    team_b = _make_team("EI2", gmid=20112)
    bad_batter = _make_player("WP13 Fail Batter", pos="1B")
    good_batter = _make_player("WP13 Ok Batter", pos="1B")
    opp_pitcher = _make_player("WP13 EI Pitcher", pos="SP")
    game = _make_game(team_a, team_b)

    # Track/state rows keep both batters in the evaluation set.
    track = _make_track(name="EI Batter Track")
    _make_state(bad_batter, team_a, track)
    _make_state(good_batter, team_a, track)
    _make_play(game, 1, bad_batter, team_a, opp_pitcher, team_b, pa=1, ab=1, outs=1)
    _make_play(game, 2, good_batter, team_a, opp_pitcher, team_b, pa=1, ab=1, outs=1)

    # Capture the genuine implementation before installing the patch so
    # the non-failing batter goes through the real evaluator.
    real_evaluate = refractor_evaluator.evaluate_card
    calls = {"n": 0}
    failing_id = bad_batter.player_id

    def patched_evaluate(player_id, team_id, **kwargs):
        calls["n"] += 1
        if player_id == failing_id:
            raise RuntimeError("simulated per-player error")
        return real_evaluate(player_id, team_id, **kwargs)

    monkeypatch.setattr(
        "app.services.refractor_evaluator.evaluate_card", patched_evaluate
    )

    resp = client.post(
        f"/api/v2/refractor/evaluate-game/{game.id}", headers=AUTH_HEADER
    )
    assert resp.status_code == 200
    body = resp.json()
    # Exactly one success; the other was swallowed by the per-player handler.
    assert body["evaluated"] == 1
    # The failing player must be absent from tier_ups.
    assert failing_id not in [tu["player_id"] for tu in body["tier_ups"]]

View File

@ -325,6 +325,59 @@ class TestCareerTotals:
assert result["current_value"] == 50.0
class TestFullyEvolvedPersistence:
    """T2-1: fully_evolved=True survives stat rollback or absence.

    fully_evolved is a permanent achievement flag. The evaluator's
    no-regression rule (max(current, new)) keeps the tier pinned at 4,
    and the flag is re-derived from that protected tier — so neither
    removed nor insufficient stats can revoke it.
    """

    def test_fully_evolved_persists_when_stats_zeroed(self, batter_track):
        """T4 card with all stats removed keeps tier=4 and fully_evolved=True.

        With no season stats rows the evaluator computes value=0 and a
        raw tier of 0, but current_tier must stay at 4 and the flag must
        remain True — stats may be rolled back, corrected, or simply not
        yet imported.
        """
        _make_state(1, 1, batter_track, current_tier=4, current_value=900.0)
        # No stats rows at all: career totals aggregate to zero.
        result = _eval(1, 1)
        assert result["current_tier"] == 4, (
            f"Expected tier=4 (no regression), got {result['current_tier']}"
        )
        assert result["fully_evolved"] is True, (
            "fully_evolved was reset to False after re-evaluation with zero stats"
        )

    def test_fully_evolved_persists_with_partial_stats(self, batter_track):
        """T4 card keeps fully_evolved even when stats compute below T1.

        Confirms no-regression applies whether stats are zero or merely
        insufficient for the achieved tier.
        """
        _make_state(1, 1, batter_track, current_tier=4, current_value=900.0)
        # pa=30 -> value=30 < T1=37 -> raw computed tier would be 0.
        _make_stats(1, 1, 1, pa=30)
        result = _eval(1, 1)
        assert result["current_tier"] == 4
        assert result["fully_evolved"] is True
class TestMissingState:
"""ValueError when no card state exists for (player_id, team_id)."""
@ -359,3 +412,212 @@ class TestReturnShape:
assert isinstance(ts, str) and len(ts) > 0
# Must be parseable as a datetime
datetime.fromisoformat(ts)
class TestFullyEvolvedFlagCorrection:
    """T3-7: evaluate_card repairs a fully_evolved/tier mismatch.

    Corruption (a bad migration or external script) can leave
    fully_evolved=True while current_tier < 4. The evaluator must
    re-derive the flag from the freshly-computed tier (after the
    no-regression max()) rather than trust the stored value.
    """

    def test_fully_evolved_flag_corrected_when_tier_below_4(self, batter_track):
        """fully_evolved=True at tier=3 is corrected to False on evaluation.

        Stats give value=500 (T3 band: 448 <= 500 < 896), so the tier
        stays 3 via max(3, 3) and fully_evolved = (3 >= 4) = False. The
        repair must also be persisted on the database row, not merely
        reported in the return dict — corrupted state self-heals on the
        next evaluation without a separate migration.
        """
        # Inject the corrupted row: flag set while the tier is only 3.
        state = CardStateStub.create(
            player_id=1,
            team_id=1,
            track=batter_track,
            current_tier=3,
            current_value=500.0,
            fully_evolved=True,  # intentionally wrong
            last_evaluated_at=None,
        )
        # pa=500 with no hits -> value=500 -> tier 3, NOT 4.
        _make_stats(1, 1, 1, pa=500)
        result = _eval(1, 1)
        assert result["current_tier"] == 3, (
            f"Expected tier=3 after evaluation with value=500, got {result['current_tier']}"
        )
        assert result["fully_evolved"] is False, (
            "fully_evolved should have been corrected to False for tier=3, "
            f"got {result['fully_evolved']}"
        )
        # The database row must reflect the correction as well.
        reloaded = CardStateStub.get_by_id(state.id)
        assert reloaded.fully_evolved is False, (
            "fully_evolved was not persisted as False after correction"
        )

    def test_fully_evolved_flag_preserved_when_tier_reaches_4(self, batter_track):
        """Corrupted flag stays True when new stats legitimately reach T4.

        value=900 >= T4=896 -> new_tier=4; max(3, 4)=4; fully_evolved is
        re-derived as (4 >= 4) = True — correct regardless of what the
        stored flag said before evaluation.
        """
        CardStateStub.create(
            player_id=1,
            team_id=1,
            track=batter_track,
            current_tier=3,
            current_value=500.0,
            fully_evolved=True,  # stored flag; re-derived either way
            last_evaluated_at=None,
        )
        # pa=900 -> value=900 crosses the T4 threshold of 896.
        _make_stats(1, 1, 1, pa=900)
        result = _eval(1, 1)
        assert result["current_tier"] == 4, (
            f"Expected tier=4 for value=900, got {result['current_tier']}"
        )
        assert result["fully_evolved"] is True, (
            f"Expected fully_evolved=True for tier=4, got {result['fully_evolved']}"
        )
class TestMultiTeamStatIsolation:
    """T3-8: a refractor value counts only the state's own team's stats.

    The evaluator filters BattingSeasonStats by player_id AND team_id.
    A player with stats on two teams must therefore get an independent
    refractor progression per franchise — never a combined total.
    """

    def test_multi_team_same_season_stats_isolated(self, batter_track):
        """Same-season stats on two teams are evaluated independently.

        Team 1 has pa=80 and team 2 has pa=120 — both in the T1 band
        (37 <= v < 149). The combined 200 would cross T2=149, so either
        state landing on T2 reveals cross-team contamination. Inputs are
        deliberately chosen so the bug changes the tier, not just the
        value: a traded player must progress separately per franchise.
        """
        _make_stats(player_id=1, team_id=1, season=11, pa=80)
        _make_stats(player_id=1, team_id=2, season=11, pa=120)
        # Combined pa of 200 would be T2 — each team must see only its own.
        _make_state(player_id=1, team_id=1, track=batter_track)
        _make_state(player_id=1, team_id=2, track=batter_track)

        first = _eval(player_id=1, team_id=1)
        second = _eval(player_id=1, team_id=2)

        assert first["current_value"] == 80.0, (
            f"Team 1 value should be 80.0 (its own stats only), "
            f"got {first['current_value']}"
        )
        assert first["current_tier"] == 1, (
            f"Team 1 tier should be T1 for value=80, got {first['current_tier']}"
        )
        assert second["current_value"] == 120.0, (
            f"Team 2 value should be 120.0 (its own stats only), "
            f"got {second['current_value']}"
        )
        assert second["current_tier"] == 1, (
            f"Team 2 tier should be T1 for value=120, got {second['current_tier']}"
        )
        # If stats had been combined (value=200), a team would sit at T2.
        assert first["current_tier"] != 2 and second["current_tier"] != 2, (
            "At least one team was incorrectly assigned T2 — stats may have been combined"
        )

    def test_multi_team_different_seasons_isolated(self, batter_track):
        """Multi-season aggregation still respects the per-team boundary.

        Team 1 totals pa=160 across seasons 10+11; team 2 totals pa=180.
        Both land in T2 (149 <= v < 448). Cross-contamination would give
        340 for each — coincidentally still T2, which is why the value
        assertions (160/180 vs 340) are what catch this class of bug.
        """
        _make_stats(player_id=1, team_id=1, season=10, pa=90)
        _make_stats(player_id=1, team_id=1, season=11, pa=70)
        _make_stats(player_id=1, team_id=2, season=10, pa=100)
        _make_stats(player_id=1, team_id=2, season=11, pa=80)
        _make_state(player_id=1, team_id=1, track=batter_track)
        _make_state(player_id=1, team_id=2, track=batter_track)

        first = _eval(player_id=1, team_id=1)
        second = _eval(player_id=1, team_id=2)

        assert first["current_value"] == 160.0, (
            f"Team 1 multi-season value should be 160.0, got {first['current_value']}"
        )
        assert first["current_tier"] == 2, (
            f"Team 1 tier should be T2 for value=160, got {first['current_tier']}"
        )
        assert second["current_value"] == 180.0, (
            f"Team 2 multi-season value should be 180.0, got {second['current_value']}"
        )
        assert second["current_tier"] == 2, (
            f"Team 2 tier should be T2 for value=180, got {second['current_tier']}"
        )

View File

@ -158,6 +158,50 @@ class TestDetermineCardType:
# ---------------------------------------------------------------------------
class TestDetermineCardTypeEdgeCases:
    """T2-2: boundary inputs for _determine_card_type.

    Covers the PO-review edge cases: DH, C, 2B (plain batters), empty
    string, None, and the compound 'SP/RP' which contains both 'SP' and
    'RP' substrings — the implementation checks 'SP' first, so it
    resolves to 'sp'.
    """

    @pytest.mark.parametrize(
        "pos_1, expected",
        [
            # Plain batter positions
            ("DH", "batter"),
            ("C", "batter"),
            ("2B", "batter"),
            # Empty / None fall through to the batter default
            ("", "batter"),
            (None, "batter"),
            # 'SP' is tested before RP/CP, so the compound maps to 'sp'
            ("SP/RP", "sp"),
        ],
    )
    def test_position_mapping(self, pos_1, expected):
        """Each pos_1 value maps to its expected card_type string.

        None is tolerated via the `(player.pos_1 or "").upper()` guard in
        the implementation. card_type keys the RefractorTrack lookup, so a
        wrong mapping silently attaches the wrong thresholds to a player's
        whole refractor journey. Parametrization makes each edge case a
        separately reported failure.
        """
        actual = _determine_card_type(_FakePlayer(pos_1))
        assert actual == expected, (
            f"pos_1={pos_1!r}: expected {expected!r}, "
            f"got {actual!r}"
        )
class TestInitializeCardEvolution:
"""Integration tests for initialize_card_refractor against in-memory SQLite.

View File

@ -124,6 +124,89 @@ def test_seed_idempotent():
assert RefractorTrack.select().count() == 3
# ---------------------------------------------------------------------------
# T1-4: Seed threshold ordering invariant (t1 < t2 < t3 < t4 + all positive)
# ---------------------------------------------------------------------------
def test_seed_all_thresholds_strictly_ascending_after_seed():
    """Every seeded track must satisfy 0 < t1 < t2 < t3 < t4.

    The tier engine treats the four thresholds as partition points; a
    zero or out-of-order value makes tier assignment incorrect or
    undefined. This is the authoritative invariant guard: a bad JSON edit
    fails here loudly before any cards are affected. It complements the
    earlier test_seed_thresholds_ascending by folding ordering and
    positivity into one block with descriptive failure messages.
    """
    seed_refractor_tracks()
    for track in RefractorTrack.select():
        labelled = [
            ("t1", track.t1_threshold),
            ("t2", track.t2_threshold),
            ("t3", track.t3_threshold),
            ("t4", track.t4_threshold),
        ]
        for label, value in labelled:
            assert value > 0, (
                f"{track.name}: {label}_threshold={value} is not positive"
            )
        values = [value for _, value in labelled]
        # Strictly ascending == sorted with no duplicates.
        assert values == sorted(values) and len(set(values)) == 4, (
            f"{track.name}: thresholds are not strictly ascending: "
            f"t1={values[0]}, t2={values[1]}, "
            f"t3={values[2]}, t4={values[3]}"
        )
# ---------------------------------------------------------------------------
# T2-10: Duplicate card_type tracks guard
# ---------------------------------------------------------------------------
def test_seed_each_card_type_has_exactly_one_track():
    """Every card_type must map to exactly one RefractorTrack row.

    Tracks are looked up by card_type (e.g.
    RefractorTrack.get(card_type='batter')); a duplicate card_type makes
    Peewee raise MultipleObjectsReturned, crashing every pack opening and
    card evaluation for that type. Acts as a uniqueness contract against
    seed bugs and accidental DB drift.
    """
    seed_refractor_tracks()
    from peewee import fn as peewee_fn

    per_type_counts = (
        RefractorTrack.select(
            RefractorTrack.card_type, peewee_fn.COUNT(RefractorTrack.id).alias("cnt")
        )
        .group_by(RefractorTrack.card_type)
        .tuples()
    )
    for card_type, cnt in per_type_counts:
        assert cnt == 1, (
            f"card_type={card_type!r} has {cnt} tracks; expected exactly 1"
        )
def test_seed_updates_on_rerun(json_tracks):
"""A second seed call must restore any manually changed threshold to the JSON value.

View File

@ -34,12 +34,47 @@ Test matrix
test_get_card_404_no_state -- card with no RefractorCardState returns 404
test_duplicate_cards_share_state -- two cards same player+team return the same state row
test_auth_required -- missing token returns 401 on both endpoints
Tier 3 tests (T3-6) use a SQLite-backed TestClient and run without a PostgreSQL
connection. They test GET /api/v2/refractor/cards/{card_id} when the state row
has last_evaluated_at=None (card initialised but never evaluated).
test_get_card_state_last_evaluated_at_null -- last_evaluated_at: null in response
"""
import os
os.environ.setdefault("API_TOKEN", "test-token")
import pytest
from fastapi import FastAPI, Request
from fastapi.testclient import TestClient
from peewee import SqliteDatabase
from app.db_engine import (
BattingSeasonStats,
Card,
Cardset,
Decision,
Event,
MlbPlayer,
Pack,
PackType,
PitchingSeasonStats,
Player,
ProcessedGame,
Rarity,
RefractorCardState,
RefractorCosmetic,
RefractorTierBoost,
RefractorTrack,
Roster,
RosterSlot,
ScoutClaim,
ScoutOpportunity,
StratGame,
StratPlay,
Team,
)
POSTGRES_HOST = os.environ.get("POSTGRES_HOST")
_skip_no_pg = pytest.mark.skipif(
@ -607,3 +642,317 @@ def test_auth_required(client, seeded_data):
resp_card = client.get(f"/api/v2/refractor/cards/{card_id}")
assert resp_card.status_code == 401
# ===========================================================================
# SQLite-backed tests for T2-4, T2-5, T2-6, T3-6
#
# These tests use the same shared-memory SQLite pattern as test_postgame_refractor
# so they run without a PostgreSQL connection. They test the
# GET /api/v2/teams/{team_id}/refractors, POST /refractor/cards/{card_id}/evaluate,
# and GET /api/v2/refractor/cards/{card_id} endpoints in isolation.
# ===========================================================================
# Shared-memory SQLite database for this section's tests. URI mode with
# `cache=shared` lets every connection (fixtures, request middleware) see
# the same in-memory tables, so no PostgreSQL server is required.
_state_api_db = SqliteDatabase(
    "file:stateapitest?mode=memory&cache=shared",
    uri=True,
    pragmas={"foreign_keys": 1},
)
# All models bound to the SQLite test DB. Listed in dependency order
# (referenced tables before referencing ones) so create_tables succeeds
# with foreign_keys=1; the fixture drops them in reverse.
_STATE_API_MODELS = [
    Rarity,
    Event,
    Cardset,
    MlbPlayer,
    Player,
    Team,
    PackType,
    Pack,
    Card,
    Roster,
    RosterSlot,
    StratGame,
    StratPlay,
    Decision,
    ScoutOpportunity,
    ScoutClaim,
    BattingSeasonStats,
    PitchingSeasonStats,
    ProcessedGame,
    RefractorTrack,
    RefractorCardState,
    RefractorTierBoost,
    RefractorCosmetic,
]
@pytest.fixture
def setup_state_api_db():
    """Bind the state-api models to shared-memory SQLite and build tables.

    Intentionally not autouse — only the SQLite-backed tests in this
    section request it. Tables are dropped in reverse creation order on
    teardown.
    """
    _state_api_db.bind(_STATE_API_MODELS)
    _state_api_db.connect(reuse_if_open=True)
    _state_api_db.create_tables(_STATE_API_MODELS)
    yield _state_api_db
    _state_api_db.drop_tables(_STATE_API_MODELS[::-1], safe=True)
def _build_state_api_app() -> FastAPI:
    """Assemble a minimal app (teams + refractor routers) for SQLite tests."""
    from app.routers_v2.refractor import router as refractor_router
    from app.routers_v2.teams import router as teams_router

    test_app = FastAPI()

    @test_app.middleware("http")
    async def db_middleware(request: Request, call_next):
        # Make sure each request has an open connection to the shared DB.
        _state_api_db.connect(reuse_if_open=True)
        return await call_next(request)

    test_app.include_router(teams_router)
    test_app.include_router(refractor_router)
    return test_app
@pytest.fixture
def state_api_client(setup_state_api_db):
    """Yield a TestClient bound to the SQLite-backed state API app."""
    client = TestClient(_build_state_api_app())
    with client:
        yield client
# ---------------------------------------------------------------------------
# Helper factories for SQLite-backed tests
# ---------------------------------------------------------------------------
def _sa_make_rarity():
    """Return the shared test Rarity row, creating it on first use."""
    rarity, _created = Rarity.get_or_create(
        value=50, name="SA_Common", defaults={"color": "#aabbcc"}
    )
    return rarity
def _sa_make_cardset():
    """Return the shared test Cardset row, creating it on first use."""
    cardset, _created = Cardset.get_or_create(
        name="SA Test Set",
        defaults={"description": "state api test", "total_cards": 10},
    )
    return cardset
def _sa_make_team(abbrev: str, gmid: int) -> Team:
    """Create a minimal non-AI Team row for the state-api tests."""
    attrs = dict(
        abbrev=abbrev,
        sname=abbrev,
        lname=f"Team {abbrev}",
        gmid=gmid,
        gmname=f"gm_{abbrev.lower()}",
        gsheet="https://docs.google.com/sa_test",
        wallet=500,
        team_value=1000,
        collection_value=1000,
        season=11,
        is_ai=False,
    )
    return Team.create(**attrs)
def _sa_make_player(name: str, pos: str = "1B") -> Player:
    """Create a Player wired to the shared test rarity and cardset rows."""
    attrs = {
        "p_name": name,
        "pos_1": pos,
        "rarity": _sa_make_rarity(),
        "cardset": _sa_make_cardset(),
        "set_num": 1,
        "image": "https://example.com/sa.png",
        "mlbclub": "TST",
        "franchise": "TST",
        "description": f"sa test: {name}",
    }
    return Player.create(**attrs)
def _sa_make_track(card_type: str = "batter") -> RefractorTrack:
    """Get-or-create a refractor track with the standard test thresholds."""
    defaults = dict(
        card_type=card_type,
        formula="pa + tb * 2",
        t1_threshold=37,
        t2_threshold=149,
        t3_threshold=448,
        t4_threshold=896,
    )
    row, _created = RefractorTrack.get_or_create(
        name=f"SA {card_type} Track", defaults=defaults
    )
    return row
def _sa_make_pack(team: Team) -> Pack:
    """Create a Pack for *team*, creating the shared PackType on demand."""
    pack_type, _created = PackType.get_or_create(
        name="SA PackType",
        defaults={"cost": 100, "card_count": 5, "description": "sa test pack type"},
    )
    return Pack.create(team=team, pack_type=pack_type)
def _sa_make_card(player: Player, team: Team) -> Card:
    """Create a Card owned by *team* inside a freshly created pack."""
    return Card.create(player=player, team=team, pack=_sa_make_pack(team), value=0)
def _sa_make_state(player, team, track, current_tier=0, current_value=0.0):
    """Create a RefractorCardState row (never evaluated, not fully evolved)."""
    attrs = dict(
        player=player,
        team=team,
        track=track,
        current_tier=current_tier,
        current_value=current_value,
        fully_evolved=False,
        last_evaluated_at=None,
    )
    return RefractorCardState.create(**attrs)
# ---------------------------------------------------------------------------
# T2-4: GET /teams/{valid_team_id}/refractors — team exists, zero states
# ---------------------------------------------------------------------------
def test_team_refractors_zero_states(setup_state_api_db, state_api_client):
    """A team with no RefractorCardState rows yields count=0 and items=[].

    The endpoint joins RefractorCardState to RefractorTrack filtered by
    team_id; when the WHERE matches nothing the correct answer is an
    empty list — not 404 or 500. This is the base case for a freshly
    created team that has not opened any packs yet.
    """
    team = _sa_make_team("SA4", gmid=30041)
    resp = state_api_client.get(
        f"/api/v2/teams/{team.id}/refractors", headers=AUTH_HEADER
    )
    assert resp.status_code == 200
    payload = resp.json()
    assert payload["count"] == 0
    assert payload["items"] == []
# ---------------------------------------------------------------------------
# T2-5: GET /teams/99999/refractors — non-existent team
# ---------------------------------------------------------------------------
def test_team_refractors_nonexistent_team(setup_state_api_db, state_api_client):
    """A team_id with no Team row still returns 200 with an empty list.

    The endpoint filters RefractorCardState by team_id=99999 without
    validating that the Team itself exists, so a missing team behaves
    exactly like an empty one: {"count": 0, "items": []}. This pins the
    confirmed contract — if the implementation ever returns 404 for
    missing teams, this test will surface the change.
    """
    resp = state_api_client.get("/api/v2/teams/99999/refractors", headers=AUTH_HEADER)
    assert resp.status_code == 200
    payload = resp.json()
    # No state rows reference team 99999.
    assert payload["count"] == 0
    assert payload["items"] == []
# ---------------------------------------------------------------------------
# T2-6: POST /refractor/cards/{card_id}/evaluate — zero season stats → T0
# ---------------------------------------------------------------------------
def test_evaluate_card_zero_stats_stays_t0(setup_state_api_db, state_api_client):
    """Evaluating a card with zero season stats leaves it at T0 / value 0.0.

    A player who has never appeared in a game has no BattingSeasonStats
    rows; the evaluator aggregates all-zero totals, computes a batter
    value of 0.0, and tier_from_value(0.0) yields T0. The endpoint must
    return that result rather than crash on the empty aggregation.
    """
    team = _sa_make_team("SA6", gmid=30061)
    batter = _sa_make_player("SA6 Batter", pos="1B")
    track = _sa_make_track("batter")
    card = _sa_make_card(batter, team)
    _sa_make_state(batter, team, track, current_tier=0, current_value=0.0)
    # Intentionally: no BattingSeasonStats rows for this player+team.
    resp = state_api_client.post(
        f"/api/v2/refractor/cards/{card.id}/evaluate", headers=AUTH_HEADER
    )
    assert resp.status_code == 200
    payload = resp.json()
    assert payload["current_tier"] == 0
    assert payload["current_value"] == 0.0
# ---------------------------------------------------------------------------
# T3-6: GET /refractor/cards/{card_id} — last_evaluated_at is None
# ---------------------------------------------------------------------------
def test_get_card_state_last_evaluated_at_null(setup_state_api_db, state_api_client):
    """GET /refractor/cards/{card_id} serialises last_evaluated_at=None as null.

    What: Create a Player, Team, Card, and RefractorCardState whose
    last_evaluated_at is explicitly None (the state came from a pack-open
    hook and has never been through the evaluator). Fetch
    GET /api/v2/refractor/cards/{card_id} and check that:
    - the status is 200 (no 500 from calling .isoformat() on None);
    - the body contains the 'last_evaluated_at' key;
    - its value is JSON null (Python None after parsing).

    Why: _build_card_state_response serialises the field via
    `state.last_evaluated_at.isoformat() if state.last_evaluated_at else None`,
    and this test exercises the None branch. Callers need to distinguish
    "never evaluated" (null) from a real ISO-8601 timestamp, and the API
    must not crash on a freshly created, never-evaluated card.
    """
    squad = _sa_make_team("SA_T36", gmid=30360)
    batter = _sa_make_player("T36 Batter", pos="1B")
    batter_track = _sa_make_track("batter")
    card_row = _sa_make_card(batter, squad)

    # State with last_evaluated_at=None — a freshly initialised card that
    # has not yet been through the evaluator.
    RefractorCardState.create(
        player=batter,
        team=squad,
        track=batter_track,
        current_tier=0,
        current_value=0.0,
        fully_evolved=False,
        last_evaluated_at=None,
    )

    response = state_api_client.get(
        f"/api/v2/refractor/cards/{card_row.id}", headers=AUTH_HEADER
    )
    assert response.status_code == 200, (
        f"Expected 200, got {response.status_code}: {response.text}"
    )

    payload = response.json()
    # The key must be present even when the value is null.
    assert "last_evaluated_at" in payload, (
        "Response is missing the 'last_evaluated_at' key"
    )
    assert payload["last_evaluated_at"] is None, (
        f"Expected last_evaluated_at=null for un-evaluated card, "
        f"got {payload['last_evaluated_at']!r}"
    )

View File

@ -11,12 +11,47 @@ Tests auto-skip when POSTGRES_HOST is not set.
Test data is inserted via psycopg2 before the test module runs and deleted
afterwards so the tests are repeatable. ON CONFLICT keeps the table clean
even if a previous run did not complete teardown.
Tier 3 tests (T3-1) in this file use a SQLite-backed TestClient so they run
without a PostgreSQL connection. They test the card_type filter edge cases:
an unrecognised card_type string and an empty string should both return an
empty list (200 with count=0) rather than an error.
"""
import os
import pytest
from fastapi import FastAPI, Request
from fastapi.testclient import TestClient
from peewee import SqliteDatabase
os.environ.setdefault("API_TOKEN", "test-token")
from app.db_engine import ( # noqa: E402
BattingSeasonStats,
Card,
Cardset,
Decision,
Event,
MlbPlayer,
Pack,
PackType,
PitchingSeasonStats,
Player,
ProcessedGame,
Rarity,
RefractorCardState,
RefractorCosmetic,
RefractorTierBoost,
RefractorTrack,
Roster,
RosterSlot,
ScoutClaim,
ScoutOpportunity,
StratGame,
StratPlay,
Team,
)
POSTGRES_HOST = os.environ.get("POSTGRES_HOST")
_skip_no_pg = pytest.mark.skipif(
@ -130,3 +165,172 @@ def test_auth_required(client, seeded_tracks):
track_id = seeded_tracks[0]
resp_single = client.get(f"/api/v2/refractor/tracks/{track_id}")
assert resp_single.status_code == 401
# ===========================================================================
# SQLite-backed tests for T3-1: invalid card_type query parameter
#
# These tests run without a PostgreSQL connection. They verify that the
# card_type filter on GET /api/v2/refractor/tracks handles values that match
# no known track (an unrecognised string, an empty string) gracefully: the
# endpoint must return 200 with {"count": 0, "items": []}, not a 4xx/5xx.
# ===========================================================================
# Shared in-memory SQLite database for the T3-1 track-filter tests.
# The URI form with cache=shared lets the TestClient's request-handling
# thread see the same in-memory database that the fixture populates.
_track_api_db = SqliteDatabase(
    "file:trackapitest?mode=memory&cache=shared",
    uri=True,
    pragmas={"foreign_keys": 1},  # enforce FK constraints, matching Postgres
)
# All models bound to the test database before table creation. The
# teardown drops tables in reversed() order of this list, so dependents
# should appear after the models they reference.
_TRACK_API_MODELS = [
    Rarity,
    Event,
    Cardset,
    MlbPlayer,
    Player,
    Team,
    PackType,
    Pack,
    Card,
    Roster,
    RosterSlot,
    StratGame,
    StratPlay,
    Decision,
    ScoutOpportunity,
    ScoutClaim,
    BattingSeasonStats,
    PitchingSeasonStats,
    ProcessedGame,
    RefractorTrack,
    RefractorCardState,
    RefractorTierBoost,
    RefractorCosmetic,
]
@pytest.fixture(autouse=False)
def setup_track_api_db():
    """Bind the track-API models to shared-memory SQLite and build tables.

    Seeds exactly two tracks (card_type 'batter' and 'sp') so the filter
    tests query a non-empty table — proving the WHERE predicate is what
    excludes rows, rather than the table simply being empty.
    """
    _track_api_db.bind(_TRACK_API_MODELS)
    _track_api_db.connect(reuse_if_open=True)
    _track_api_db.create_tables(_TRACK_API_MODELS)

    # Seed two real tracks so the table is not empty.
    seed_rows = [
        (
            "T3-1 Batter Track",
            {
                "card_type": "batter",
                "formula": "pa + tb * 2",
                "t1_threshold": 37,
                "t2_threshold": 149,
                "t3_threshold": 448,
                "t4_threshold": 896,
            },
        ),
        (
            "T3-1 SP Track",
            {
                "card_type": "sp",
                "formula": "ip + k",
                "t1_threshold": 10,
                "t2_threshold": 40,
                "t3_threshold": 120,
                "t4_threshold": 240,
            },
        ),
    ]
    for track_name, track_defaults in seed_rows:
        RefractorTrack.get_or_create(name=track_name, defaults=track_defaults)

    yield _track_api_db

    _track_api_db.drop_tables(list(reversed(_TRACK_API_MODELS)), safe=True)
def _build_track_api_app() -> FastAPI:
    """Assemble a minimal FastAPI app exposing only the refractor router."""
    from app.routers_v2.refractor import router as refractor_router

    test_app = FastAPI()

    @test_app.middleware("http")
    async def db_middleware(request: Request, call_next):
        # Reuse the shared SQLite connection for every incoming request.
        _track_api_db.connect(reuse_if_open=True)
        return await call_next(request)

    test_app.include_router(refractor_router)
    return test_app
@pytest.fixture
def track_api_client(setup_track_api_db):
    """TestClient bound to the SQLite-backed app for the T3-1 filter tests."""
    client = TestClient(_build_track_api_app())
    with client:
        yield client
# ---------------------------------------------------------------------------
# T3-1a: card_type=foo (unrecognised value) returns empty list
# ---------------------------------------------------------------------------
def test_invalid_card_type_returns_empty_list(setup_track_api_db, track_api_client):
    """GET /tracks?card_type=foo answers 200 with count=0, not a 4xx/5xx.

    What: Filter the track list by a card_type ('foo') that matches no row
    in refractor_track. The table holds batter and sp tracks, so the
    expected result is an empty list — a full list would mean the filter
    was silently ignored.

    Why: When the parameter is not None the endpoint applies a
    `WHERE card_type == <value>` predicate. An unrecognised value is a
    legitimate no-match query, so the contract is an empty list rather
    than a validation error. Returning 422 or 500 here would break clients
    that probe for tracks by card type before knowing which types are
    registered.
    """
    response = track_api_client.get(
        "/api/v2/refractor/tracks?card_type=foo", headers=AUTH_HEADER
    )
    assert response.status_code == 200

    payload = response.json()
    assert payload["count"] == 0, (
        f"Expected count=0 for unknown card_type 'foo', got {payload['count']}"
    )
    assert payload["items"] == [], (
        f"Expected empty items list for unknown card_type 'foo', got {payload['items']}"
    )
# ---------------------------------------------------------------------------
# T3-1b: card_type= (empty string) returns empty list
# ---------------------------------------------------------------------------
def test_empty_string_card_type_returns_empty_list(
    setup_track_api_db, track_api_client
):
    """GET /tracks?card_type= (empty string) answers 200 with count=0.

    What: Send the card_type query parameter as an empty string. No track
    carries card_type='', so the response must be an empty list with
    count=0.

    Why: An empty string is not None — FastAPI passes it through as ''
    instead of treating the parameter as absent. The WHERE predicate
    `card_type == ''` matches nothing, which is the correct silent
    no-results behaviour. This guards against regressions where '' might
    be conflated with None/absent and accidentally return every track, or
    raise a server error.
    """
    response = track_api_client.get(
        "/api/v2/refractor/tracks?card_type=", headers=AUTH_HEADER
    )
    assert response.status_code == 200

    payload = response.json()
    assert payload["count"] == 0, (
        f"Expected count=0 for empty card_type string, got {payload['count']}"
    )
    assert payload["items"] == [], (
        f"Expected empty items list for empty card_type string, got {payload['items']}"
    )