190 lines
6.6 KiB
Python
190 lines
6.6 KiB
Python
from datetime import datetime
|
|
from fastapi import APIRouter, Depends, HTTPException, Query
|
|
from typing import Optional
|
|
import logging
|
|
import pydantic
|
|
|
|
from ..db_engine import GauntletRun, model_to_dict, DatabaseError, DoesNotExist
|
|
from ..dependencies import oauth2_scheme, valid_token
|
|
|
|
|
|
# Router for the gauntlet-run endpoints; every route below is mounted under
# /api/v2/gauntletruns.
# NOTE(review): the OpenAPI tag is "notifs" although the prefix is
# "gauntletruns" — looks copied from another router; confirm it is intended.
router = APIRouter(prefix="/api/v2/gauntletruns", tags=["notifs"])
|
|
|
|
|
|
class GauntletRunModel(pydantic.BaseModel):
    """Request body accepted when creating a gauntlet run."""

    # Identifiers of the team making the run and the gauntlet being run.
    team_id: int
    gauntlet_id: int

    # Running record for this attempt; both default to zero.
    wins: Optional[int] = 0
    losses: Optional[int] = 0

    # Optional free-form string stored alongside the run.
    gsheet: Optional[str] = None

    # Integer timestamps; the POST handler in this module divides them by
    # 1000 before converting to datetime, i.e. it treats them as epoch
    # milliseconds. A missing/zero value means "not supplied".
    created: Optional[int] = None
    ended: Optional[int] = None
|
|
|
|
|
|
@router.get("")
async def get_gauntletruns(
    team_id: list = Query(default=None),
    wins: Optional[int] = None,
    wins_min: Optional[int] = None,
    wins_max: Optional[int] = None,
    losses: Optional[int] = None,
    losses_min: Optional[int] = None,
    losses_max: Optional[int] = None,
    gsheet: Optional[str] = None,
    created_after: Optional[int] = None,
    created_before: Optional[int] = None,
    ended_after: Optional[int] = None,
    ended_before: Optional[int] = None,
    is_active: Optional[bool] = None,
    gauntlet_id: list = Query(default=None),
    season: list = Query(default=None),
    limit: int = 100,
):
    """List gauntlet runs, ordered by id, narrowed by any supplied filters.

    Every filter is optional; a filter left as ``None`` is ignored and all
    supplied filters must match (each one is applied as an additional
    ``where`` clause).

    Args:
        team_id: Keep runs whose team id is in this list.
        wins / wins_min / wins_max: Exact, lower-bound and upper-bound
            (both inclusive) filters on the win count.
        losses / losses_min / losses_max: Same as above for losses.
        gsheet: Exact match on the stored gsheet value.
        created_after / created_before: Inclusive bounds on ``created``,
            given as epoch milliseconds.
        ended_after / ended_before: Inclusive bounds on ``ended``, given as
            epoch milliseconds.
        is_active: ``True`` keeps runs whose ``ended`` is NULL, ``False``
            keeps runs whose ``ended`` is set.
        gauntlet_id: Keep runs whose gauntlet id is in this list.
        season: Keep runs whose team's season is in this list.
        limit: Maximum number of runs returned; clamped to 0..500.

    Returns:
        ``{"count": <matching rows before the limit>, "runs": [<run dict>, ...]}``
    """

    def _ms_to_datetime(millis):
        # Incoming timestamps are epoch milliseconds; the columns are
        # compared against datetime values.
        return datetime.fromtimestamp(millis / 1000)

    # (supplied value, builder for the matching condition), in the order the
    # conditions are applied to the query.
    filter_specs = (
        (team_id, lambda v: GauntletRun.team_id << v),
        (wins, lambda v: GauntletRun.wins == v),
        (wins_min, lambda v: GauntletRun.wins >= v),
        (wins_max, lambda v: GauntletRun.wins <= v),
        (losses, lambda v: GauntletRun.losses == v),
        (losses_min, lambda v: GauntletRun.losses >= v),
        (losses_max, lambda v: GauntletRun.losses <= v),
        (gsheet, lambda v: GauntletRun.gsheet == v),
        (created_after, lambda v: GauntletRun.created >= _ms_to_datetime(v)),
        (created_before, lambda v: GauntletRun.created <= _ms_to_datetime(v)),
        (ended_after, lambda v: GauntletRun.ended >= _ms_to_datetime(v)),
        (ended_before, lambda v: GauntletRun.ended <= _ms_to_datetime(v)),
        # Active runs have NULL ended, not 0
        (is_active, lambda v: GauntletRun.ended.is_null(v is True)),
        (gauntlet_id, lambda v: GauntletRun.gauntlet_id << v),
        # NOTE(review): this references the related team's season without an
        # explicit join in this function — confirm the query resolves it.
        (season, lambda v: GauntletRun.team.season << v),
    )

    query = GauntletRun.select().order_by(GauntletRun.id)
    for supplied, build_condition in filter_specs:
        if supplied is not None:
            query = query.where(build_condition(supplied))

    # Clamp the page size so a caller cannot request an unbounded result set.
    limit = max(0, min(limit, 500))

    # The count reflects every matching row, not just the returned page.
    total = query.count()
    runs = [model_to_dict(run) for run in query.limit(limit)]
    return {"count": total, "runs": runs}
|
|
|
|
|
|
@router.get("/{gauntletrun_id}")
async def get_one_gauntletrun(gauntletrun_id):
    """Return the gauntlet run with the given id as a dict.

    Raises:
        HTTPException: 404 when no run with that id exists.
    """
    try:
        found_run = GauntletRun.get_by_id(gauntletrun_id)
    except DoesNotExist:
        raise HTTPException(
            status_code=404, detail=f"No gauntlet found with id {gauntletrun_id}"
        )
    else:
        return model_to_dict(found_run)
|
|
|
|
|
|
@router.patch("/{gauntletrun_id}")
async def patch_gauntletrun(
    gauntletrun_id,
    team_id: Optional[int] = None,
    wins: Optional[int] = None,
    losses: Optional[int] = None,
    gsheet: Optional[str] = None,
    created: Optional[bool] = None,
    ended: Optional[bool] = None,
    token: str = Depends(oauth2_scheme),
):
    """Update selected fields of an existing gauntlet run.

    Only parameters that are not ``None`` are applied.

    Args:
        gauntletrun_id: Id of the run to modify.
        team_id: New team id.
        wins: New win count.
        losses: New loss count.
        gsheet: New gsheet value.
        created: ``True`` stamps ``created`` with the current time,
            ``False`` clears it.
        ended: ``True`` stamps ``ended`` with the current time, ``False``
            clears it.
        token: Bearer token checked with ``valid_token``.

    Returns:
        The updated run as a dict.

    Raises:
        HTTPException: 401 when the token is rejected; 404 when the run
            does not exist.
        DatabaseError: When the save reports no rows written.
    """
    if not valid_token(token):
        logging.warning("Bad Token: [REDACTED]")
        raise HTTPException(
            status_code=401,
            detail="You are not authorized to patch gauntlet runs. This event has been logged.",
        )

    this_run = GauntletRun.get_or_none(GauntletRun.id == gauntletrun_id)
    if this_run is None:
        # A missing run is a client error; report it as 404 (as the GET
        # endpoint does) instead of letting a bare KeyError become a 500.
        raise HTTPException(
            status_code=404,
            detail=f"No gauntlet run found with id {gauntletrun_id}",
        )

    if team_id is not None:
        this_run.team_id = team_id
    if wins is not None:
        this_run.wins = wins
    if losses is not None:
        this_run.losses = losses
    if gsheet is not None:
        this_run.gsheet = gsheet
    # created / ended are boolean switches: True -> "now", False -> cleared.
    if created is not None:
        this_run.created = datetime.now() if created is True else None
    if ended is not None:
        this_run.ended = datetime.now() if ended is True else None

    if not this_run.save():
        raise DatabaseError(f"Unable to patch gauntlet run {gauntletrun_id}")
    return model_to_dict(this_run)
|
|
|
|
|
|
@router.post("")
async def post_gauntletrun(
    gauntletrun: GauntletRunModel, token: str = Depends(oauth2_scheme)
):
    """Create a gauntlet run from the request body and return it as a dict.

    ``created`` and ``ended`` arrive as epoch milliseconds. A missing or zero
    ``created`` is replaced with the current time; a missing or zero
    ``ended`` is stored as NULL.

    Raises:
        HTTPException: 401 when the token is rejected.
        DatabaseError: When the save reports no rows written.
    """
    if not valid_token(token):
        logging.warning("Bad Token: [REDACTED]")
        raise HTTPException(
            status_code=401,
            detail="You are not authorized to post gauntlets. This event has been logged.",
        )

    payload = gauntletrun.dict()

    # Convert milliseconds timestamps to datetime for PostgreSQL
    created_ms = payload.get("created")
    payload["created"] = (
        datetime.fromtimestamp(created_ms / 1000) if created_ms else datetime.now()
    )
    ended_ms = payload.get("ended")
    payload["ended"] = datetime.fromtimestamp(ended_ms / 1000) if ended_ms else None

    new_run = GauntletRun(**payload)
    if not new_run.save():
        raise DatabaseError("Unable to post gauntlet run")
    return model_to_dict(new_run)
|
|
|
|
|
|
@router.delete("/{gauntletrun_id}")
async def delete_gauntletrun(gauntletrun_id, token: str = Depends(oauth2_scheme)):
    """Delete the gauntlet run with the given id.

    Args:
        gauntletrun_id: Id of the run to delete.
        token: Bearer token checked with ``valid_token``. Deletion is a
            write operation, so it is guarded the same way as the PATCH and
            POST endpoints of this router.

    Returns:
        A confirmation string when exactly one row was deleted.

    Raises:
        HTTPException: 401 when the token is rejected.
        DatabaseError: When the delete did not remove exactly one row.
    """
    if not valid_token(token):
        logging.warning("Bad Token: [REDACTED]")
        raise HTTPException(
            status_code=401,
            detail="You are not authorized to delete gauntlet runs. This event has been logged.",
        )

    if GauntletRun.delete_by_id(gauntletrun_id) == 1:
        return f"Deleted gauntlet run ID {gauntletrun_id}"

    raise DatabaseError(f"Unable to delete gauntlet run {gauntletrun_id}")
|