164 lines
5.3 KiB
Python
164 lines
5.3 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, Query
|
|
from typing import List, Optional
|
|
import logging
|
|
import pydantic
|
|
|
|
from ..db_engine import db, Schedule, Team, model_to_dict, chunked
|
|
from ..dependencies import oauth2_scheme, valid_token, LOG_DATA, PRIVATE_IN_SCHEMA
|
|
|
|
logging.basicConfig(
|
|
filename=LOG_DATA['filename'],
|
|
format=LOG_DATA['format'],
|
|
level=LOG_DATA['log_level']
|
|
)
|
|
|
|
router = APIRouter(
|
|
prefix='/api/v3/schedules',
|
|
tags=['schedules']
|
|
)
|
|
|
|
|
|
class ScheduleModel(pydantic.BaseModel):
|
|
week: int
|
|
awayteam_id: int
|
|
hometeam_id: int
|
|
gamecount: int
|
|
season: int
|
|
|
|
|
|
class ScheduleList(pydantic.BaseModel):
|
|
schedules: List[ScheduleModel]
|
|
|
|
|
|
@router.get('')
|
|
async def get_schedules(
|
|
season: int, team_abbrev: list = Query(default=None), away_abbrev: list = Query(default=None),
|
|
home_abbrev: list = Query(default=None), week_start: Optional[int] = None, week_end: Optional[int] = None,
|
|
short_output: Optional[bool] = True):
|
|
all_sched = Schedule.select_season(season)
|
|
|
|
if team_abbrev is not None:
|
|
team_list = []
|
|
for x in team_abbrev:
|
|
team_list.append(Team.get_season(x, season))
|
|
all_sched = all_sched.where(
|
|
(Schedule.awayteam << team_list) | (Schedule.hometeam << team_list)
|
|
)
|
|
|
|
if away_abbrev is not None:
|
|
team_list = []
|
|
for x in away_abbrev:
|
|
team_list.append(Team.get_season(x, season))
|
|
all_sched = all_sched.where(Schedule.awayteam << team_list)
|
|
|
|
if home_abbrev is not None:
|
|
team_list = []
|
|
for x in home_abbrev:
|
|
team_list.append(Team.get_season(x, season))
|
|
all_sched = all_sched.where(Schedule.hometeam << team_list)
|
|
|
|
if week_start is not None:
|
|
all_sched = all_sched.where(Schedule.week >= week_start)
|
|
|
|
if week_end is not None:
|
|
all_sched = all_sched.where(Schedule.week <= week_end)
|
|
|
|
all_sched = all_sched.order_by(Schedule.id)
|
|
|
|
return_sched = {
|
|
'count': all_sched.count(),
|
|
'schedules': [model_to_dict(x, recurse=not short_output) for x in all_sched]
|
|
}
|
|
db.close()
|
|
return return_sched
|
|
|
|
|
|
@router.get('/{schedule_id}')
|
|
async def get_one_schedule(schedule_id: int):
|
|
this_sched = Schedule.get_or_none(Schedule.id == schedule_id)
|
|
if this_sched is not None:
|
|
r_sched = model_to_dict(this_sched)
|
|
else:
|
|
r_sched = None
|
|
db.close()
|
|
return r_sched
|
|
|
|
|
|
@router.patch('/{schedule_id}', include_in_schema=PRIVATE_IN_SCHEMA)
|
|
async def patch_schedule(
|
|
schedule_id: int, week: list = Query(default=None), awayteam_id: Optional[int] = None,
|
|
hometeam_id: Optional[int] = None, gamecount: Optional[int] = None, season: Optional[int] = None,
|
|
token: str = Depends(oauth2_scheme)):
|
|
if not valid_token(token):
|
|
logging.warning(f'patch_schedule - Bad Token: {token}')
|
|
raise HTTPException(status_code=401, detail='Unauthorized')
|
|
|
|
this_sched = Schedule.get_or_none(Schedule.id == schedule_id)
|
|
if this_sched is None:
|
|
raise HTTPException(status_code=404, detail=f'Schedule ID {schedule_id} not found')
|
|
|
|
if week is not None:
|
|
this_sched.week = week
|
|
|
|
if awayteam_id is not None:
|
|
this_sched.awayteam_id = awayteam_id
|
|
|
|
if hometeam_id is not None:
|
|
this_sched.hometeam_id = hometeam_id
|
|
|
|
if gamecount is not None:
|
|
this_sched.gamecount = gamecount
|
|
|
|
if season is not None:
|
|
this_sched.season = season
|
|
|
|
if this_sched.save() == 1:
|
|
r_sched = model_to_dict(this_sched)
|
|
db.close()
|
|
return r_sched
|
|
else:
|
|
db.close()
|
|
raise HTTPException(status_code=500, detail=f'Unable to patch schedule {schedule_id}')
|
|
|
|
|
|
@router.post('', include_in_schema=PRIVATE_IN_SCHEMA)
|
|
async def post_schedules(sched_list: ScheduleList, token: str = Depends(oauth2_scheme)):
|
|
if not valid_token(token):
|
|
logging.warning(f'post_schedules - Bad Token: {token}')
|
|
raise HTTPException(status_code=401, detail='Unauthorized')
|
|
|
|
new_sched = []
|
|
for x in sched_list.schedules:
|
|
if Team.get_or_none(Team.id == x.awayteam_id) is None:
|
|
raise HTTPException(status_code=404, detail=f'Team ID {x.awayteam_id} not found')
|
|
if Team.get_or_none(Team.id == x.hometeam_id) is None:
|
|
raise HTTPException(status_code=404, detail=f'Team ID {x.hometeam_id} not found')
|
|
|
|
new_sched.append(x.dict())
|
|
|
|
with db.atomic():
|
|
for batch in chunked(new_sched, 15):
|
|
Schedule.insert_many(batch).on_conflict_replace().execute()
|
|
db.close()
|
|
|
|
return f'Inserted {len(new_sched)} schedules'
|
|
|
|
|
|
@router.delete('/{schedule_id}', include_in_schema=PRIVATE_IN_SCHEMA)
|
|
async def delete_schedule(schedule_id: int, token: str = Depends(oauth2_scheme)):
|
|
if not valid_token(token):
|
|
logging.warning(f'delete_schedule - Bad Token: {token}')
|
|
raise HTTPException(status_code=401, detail='Unauthorized')
|
|
|
|
this_sched = Schedule.get_or_none(Schedule.id == schedule_id)
|
|
if this_sched is None:
|
|
raise HTTPException(status_code=404, detail=f'Schedule ID {schedule_id} not found')
|
|
|
|
count = this_sched.delete_instance()
|
|
db.close()
|
|
|
|
if count == 1:
|
|
return f'Schedule {this_sched} has been deleted'
|
|
else:
|
|
raise HTTPException(status_code=500, detail=f'Schedule {this_sched} could not be deleted')
|