paper-dynasty-database/app/routers_v2/scout_opportunities.py
Cal Corum 5e182bedac feat: add scout_opportunities and scout_claims tables and API endpoints (#44)
Support the Discord bot's new scouting feature where players can scout
cards from other teams' opened packs. Stores opportunities with expiry
timestamps and tracks which teams claim which cards.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-05 03:45:38 +00:00

124 lines
3.9 KiB
Python

import json
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException
from typing import Optional, List
import logging
import pydantic
from ..db_engine import ScoutOpportunity, ScoutClaim, model_to_dict, fn
from ..dependencies import oauth2_scheme, valid_token
# Router for the scout-opportunity endpoints; every route declared in this
# module is served under the /api/v2/scout_opportunities prefix.
router = APIRouter(
    prefix="/api/v2/scout_opportunities",
    tags=["scout_opportunities"],
)
class ScoutOpportunityModel(pydantic.BaseModel):
    """Request body accepted when creating a scout opportunity."""

    # Optional id of the pack the opportunity relates to.
    pack_id: Optional[int] = None
    # Id of the team recorded as the opener.
    opener_team_id: int
    # Ids of the cards on offer; the POST handler stores this list as a
    # JSON-encoded string.
    card_ids: List[int]
    # Integer expiry timestamp, compared against the `expired_before` filter
    # of the list endpoint.
    # NOTE(review): presumably epoch milliseconds like `created` - confirm.
    expires_at: int
    # Creation timestamp; when omitted, the POST handler fills in the current
    # time in epoch milliseconds.
    created: Optional[int] = None
def opportunity_to_dict(opp, recurse=True):
    """Serialize a ScoutOpportunity, decoding its JSON-encoded card_ids.

    Args:
        opp: The ScoutOpportunity instance to serialize.
        recurse: Forwarded unchanged to ``model_to_dict``.

    Returns:
        The dict produced by ``model_to_dict``. If its ``card_ids`` entry is
        a string, that entry is replaced by the result of ``json.loads``;
        any other value is left untouched.
    """
    payload = model_to_dict(opp, recurse=recurse)
    raw_card_ids = payload.get("card_ids")
    # card_ids is written as a JSON string by the POST handler, so it is
    # decoded back into a Python value before being returned.
    if isinstance(raw_card_ids, str):
        payload["card_ids"] = json.loads(raw_card_ids)
    return payload
@router.get("")
async def get_scout_opportunities(
    claimed: Optional[bool] = None,
    expired_before: Optional[int] = None,
    opener_team_id: Optional[int] = None,
):
    """List scout opportunities, optionally narrowed by query parameters.

    Args:
        claimed: When True, keep only opportunities referenced by at least
            one ScoutClaim; when False, keep only those referenced by none;
            when None, do not filter on claims.
        expired_before: When given, keep only opportunities whose
            ``expires_at`` is strictly less than this value.
        opener_team_id: When given, keep only opportunities whose
            ``opener_team_id`` equals this value.

    Returns:
        A dict with ``count`` (number of matches) and ``results`` (the
        matches serialized with ``recurse=False``), ordered by id.
    """
    matches = ScoutOpportunity.select()

    if opener_team_id is not None:
        matches = matches.where(ScoutOpportunity.opener_team_id == opener_team_id)

    if expired_before is not None:
        matches = matches.where(ScoutOpportunity.expires_at < expired_before)

    if claimed is not None:
        # Opportunity references held by existing claims; membership in this
        # subquery decides whether an opportunity counts as claimed.
        claimed_refs = ScoutClaim.select(ScoutClaim.scout_opportunity)
        id_field = ScoutOpportunity.id
        membership = id_field.in_ if claimed else id_field.not_in
        matches = matches.where(membership(claimed_refs))

    matches = matches.order_by(ScoutOpportunity.id)

    serialized = [opportunity_to_dict(row, recurse=False) for row in matches]
    return {"count": len(serialized), "results": serialized}
@router.get("/{opportunity_id}")
async def get_one_scout_opportunity(opportunity_id: int):
    """Return a single scout opportunity by primary key.

    Args:
        opportunity_id: Id of the opportunity to fetch.

    Returns:
        The opportunity serialized by ``opportunity_to_dict``.

    Raises:
        HTTPException: 404 when no opportunity has the given id.
    """
    try:
        opp = ScoutOpportunity.get_by_id(opportunity_id)
    except ScoutOpportunity.DoesNotExist:
        # Only a missing row is a 404. Other failures (e.g. database errors)
        # propagate instead of being reported as "not found".
        raise HTTPException(
            status_code=404,
            detail=f"No scout opportunity found with id {opportunity_id}",
        ) from None
    return opportunity_to_dict(opp)
@router.post("")
async def post_scout_opportunity(
    opportunity: ScoutOpportunityModel, token: str = Depends(oauth2_scheme)
):
    """Create a scout opportunity from the request body.

    Args:
        opportunity: Validated request payload.
        token: Bearer token supplied through ``oauth2_scheme``.

    Returns:
        The newly saved opportunity serialized by ``opportunity_to_dict``.

    Raises:
        HTTPException: 401 when ``valid_token`` rejects the token; 418 when
            the save does not report exactly one row.
    """
    if not valid_token(token):
        logging.warning(f"Bad Token: {token}")
        raise HTTPException(
            status_code=401,
            detail="You are not authorized to post scout opportunities. This event has been logged.",
        )

    fields = opportunity.dict()
    # The card id list is persisted as a JSON-encoded string.
    fields["card_ids"] = json.dumps(fields["card_ids"])
    if fields["created"] is None:
        # Default the creation time to "now" in epoch milliseconds.
        fields["created"] = int(datetime.now().timestamp() * 1000)

    new_opp = ScoutOpportunity(**fields)
    if new_opp.save() != 1:
        raise HTTPException(status_code=418, detail="Could not save scout opportunity")
    return opportunity_to_dict(new_opp)
@router.delete("/{opportunity_id}")
async def delete_scout_opportunity(
    opportunity_id: int, token: str = Depends(oauth2_scheme)
):
    """Delete a scout opportunity by primary key.

    Args:
        opportunity_id: Id of the opportunity to delete.
        token: Bearer token supplied through ``oauth2_scheme``.

    Raises:
        HTTPException: 401 when ``valid_token`` rejects the token; 404 when
            no opportunity has the given id; 500 when the delete does not
            report exactly one row; 200 on success (see note below).
    """
    if not valid_token(token):
        logging.warning(f"Bad Token: {token}")
        raise HTTPException(
            status_code=401,
            detail="You are not authorized to delete scout opportunities. This event has been logged.",
        )

    try:
        opp = ScoutOpportunity.get_by_id(opportunity_id)
    except ScoutOpportunity.DoesNotExist:
        # Only a missing row is a 404. Other failures (e.g. database errors)
        # propagate instead of being reported as "not found".
        raise HTTPException(
            status_code=404,
            detail=f"No scout opportunity found with id {opportunity_id}",
        ) from None

    count = opp.delete_instance()
    if count != 1:
        raise HTTPException(
            status_code=500,
            detail=f"Scout opportunity {opportunity_id} was not deleted",
        )

    # Success is signalled by raising with status 200, as the original code
    # did, so the response keeps the {"detail": ...} body clients receive.
    raise HTTPException(
        status_code=200,
        detail=f"Scout opportunity {opportunity_id} has been deleted",
    )