major-domo-database/app/routers_v3/teams.py
2023-09-15 00:03:23 -05:00

261 lines
9.0 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, Query, Response
from typing import List, Optional, Literal
import copy
import logging
import pydantic
from ..db_engine import db, Team, Manager, Division, model_to_dict, chunked, fn, query_to_csv
from ..dependencies import oauth2_scheme, valid_token, LOG_DATA
# Configure the root logger from the shared LOG_DATA settings
# (target file name, record format and log level).
logging.basicConfig(
    filename=LOG_DATA['filename'],
    format=LOG_DATA['format'],
    level=LOG_DATA['log_level']
)

# Router for every endpoint in this module: paths are prefixed with
# /api/v3/teams and tagged 'teams'.
router = APIRouter(
    prefix='/api/v3/teams',
    tags=['teams']
)
# Request-body schema for one team. post_team validates each instance and
# bulk-inserts its .dict() form into the Team table.
class TeamModel(pydantic.BaseModel):
    abbrev: str  # team abbreviation; post_team rejects an abbrev already used in the same season
    sname: str  # presumably the short display name -- TODO confirm
    lname: str  # presumably the long display name -- TODO confirm
    gmid: Optional[int] = None  # get_teams matches its owner_id filter against gmid / gmid2
    gmid2: Optional[int] = None
    manager1_id: Optional[int] = None  # when set, post_team requires a Manager row with this id
    manager2_id: Optional[int] = None  # when set, post_team requires a Manager row with this id
    division_id: Optional[int] = None  # when set, post_team requires a Division row with this id
    stadium: Optional[str] = None
    thumbnail: Optional[str] = None
    color: Optional[str] = None
    dice_color: Optional[str] = None
    season: int  # combined with abbrev for the duplicate check in post_team
# Request-body wrapper accepted by post_team: a batch of teams to insert.
class TeamList(pydantic.BaseModel):
    teams: List[TeamModel]  # iterated in order by post_team
@router.get('')
async def get_teams(
        season: Optional[int] = None, owner_id: list = Query(default=None), manager_id: list = Query(default=None),
        team_abbrev: list = Query(default=None), active_only: Optional[bool] = False,
        short_output: Optional[bool] = False, csv: Optional[bool] = False):
    # List teams, narrowed by whichever optional filters were supplied.
    #
    #   season       - start from Team.select_season(season) instead of all teams
    #   manager_id   - keep teams whose manager1_id or manager2_id is one of these Manager ids
    #   owner_id     - keep teams whose gmid or gmid2 is in this list
    #   team_abbrev  - keep teams whose lower-cased abbrev is in this list (lower-cased too)
    #   active_only  - drop teams whose abbrev ends with 'IL' or 'MiL'
    #   short_output - passed to model_to_dict as recurse=not short_output
    #   csv          - return the result as a text/csv response instead of JSON
    #
    # Returns a text/csv Response when csv is set, otherwise
    # {'count': <int>, 'teams': [<team dict>, ...]}.
    query = Team.select() if season is None else Team.select_season(season)

    if manager_id is not None:
        matching_managers = Manager.select().where(Manager.id << manager_id)
        led_by_first = Team.manager1_id << matching_managers
        led_by_second = Team.manager2_id << matching_managers
        query = query.where(led_by_first | led_by_second)

    if owner_id:
        owned_as_primary = Team.gmid << owner_id
        owned_as_secondary = Team.gmid2 << owner_id
        query = query.where(owned_as_primary | owned_as_secondary)

    if team_abbrev is not None:
        wanted_abbrevs = [abbrev.lower() for abbrev in team_abbrev]
        query = query.where(fn.lower(Team.abbrev) << wanted_abbrevs)

    if active_only:
        not_il = ~(Team.abbrev.endswith('IL'))
        not_mil = ~(Team.abbrev.endswith('MiL'))
        query = query.where(not_il & not_mil)

    if csv:
        # These three columns are left out of the CSV export.
        csv_body = query_to_csv(query, exclude=[Team.division_legacy, Team.mascot, Team.gsheet])
        db.close()
        return Response(content=csv_body, media_type='text/csv')

    # Count first, then serialize, matching the order the queries are issued in.
    team_count = query.count()
    team_dicts = [model_to_dict(team, recurse=not short_output) for team in query]
    db.close()
    return {'count': team_count, 'teams': team_dicts}
@router.get('/{team_id}')
async def get_one_team(team_id: int):
    # Look up a single team by primary key.
    # Returns the team as a dict, or None (JSON null) when no such team exists.
    found = Team.get_or_none(Team.id == team_id)
    payload = model_to_dict(found) if found else None
    db.close()
    return payload
@router.get('/{team_id}/roster/{which}')
async def get_team_roster(team_id: int, which: Literal['current', 'next'], sort: Optional[str] = None):
    # Return a team's roster for the current or the next week.
    #
    #   team_id - primary key of the team; a failed lookup yields a 404
    #   which   - 'current' uses Team.get_this_week(), 'next' uses Team.get_next_week()
    #   sort    - only 'wara-desc' is recognised (each group sorted by "wara", highest
    #             first); any other value leaves the order untouched
    #
    # Returns the roster structure with the player objects in the 'active',
    # 'shortil' and 'longil' groups replaced by their dict form.
    try:
        this_team = Team.get_by_id(team_id)
    except Exception:
        # Any lookup failure is reported as "not found", as before; release the
        # connection first like every other error path in this module does.
        db.close()
        raise HTTPException(status_code=404, detail=f'Team ID {team_id} not found')

    if which == 'current':
        full_roster = this_team.get_this_week()
    else:
        full_roster = this_team.get_next_week()

    roster_groups = ('active', 'shortil', 'longil')

    # Rebinding the 'players' key never mutates the source list, so no copy is needed.
    for group in roster_groups:
        full_roster[group]['players'] = [model_to_dict(player) for player in full_roster[group]['players']]

    if sort == 'wara-desc':
        # Sort every group, not just 'active'.
        for group in roster_groups:
            full_roster[group]['players'].sort(key=lambda p: p["wara"], reverse=True)

    db.close()
    return full_roster
def _fetch_or_404(model, record_id: int, detail: str):
    # Return the row of `model` with the given id, or close the DB and raise a 404 with `detail`.
    record = model.get_or_none(model.id == record_id)
    if not record:
        db.close()
        raise HTTPException(status_code=404, detail=detail)
    return record


@router.patch('/{team_id}')
async def patch_team(
        team_id: int, manager1_id: Optional[int] = None, manager2_id: Optional[int] = None, gmid: Optional[int] = None,
        gmid2: Optional[int] = None, mascot: Optional[str] = None, stadium: Optional[str] = None,
        thumbnail: Optional[str] = None, color: Optional[str] = None, abbrev: Optional[str] = None,
        sname: Optional[str] = None, lname: Optional[str] = None, dice_color: Optional[str] = None,
        division_id: Optional[int] = None, token: str = Depends(oauth2_scheme)):
    # Partially update one team; only parameters that are not None are applied.
    #
    # Clearing conventions:
    #   manager1_id / manager2_id / division_id / gmid2 == 0 -> the field is set to None
    #   mascot == 'False'                                    -> mascot is set to None
    #
    # Returns the updated team as a dict, or None (JSON null) when team_id does not exist.
    # Raises 401 on a bad token, 404 when a referenced Manager or Division does not
    # exist, and 500 when the save reports no change.
    if not valid_token(token):
        logging.warning(f'patch_team - Bad Token: {token}')
        raise HTTPException(status_code=401, detail='Unauthorized')

    this_team = Team.get_or_none(Team.id == team_id)
    if not this_team:
        # Release the connection on this exit path too.
        db.close()
        return None

    # Related rows are resolved in this order so the first missing one decides the 404.
    if manager1_id is not None:
        if manager1_id == 0:
            this_team.manager1 = None
        else:
            this_team.manager1 = _fetch_or_404(Manager, manager1_id, f'Manager ID {manager1_id} not found')

    if manager2_id is not None:
        if manager2_id == 0:
            this_team.manager2 = None
        else:
            this_team.manager2 = _fetch_or_404(Manager, manager2_id, f'Manager ID {manager2_id} not found')

    if division_id is not None:
        if division_id == 0:
            this_team.division = None
        else:
            this_team.division = _fetch_or_404(Division, division_id, f'Division ID {division_id} not found')

    if gmid2 is not None:
        this_team.gmid2 = None if gmid2 == 0 else gmid2

    if mascot is not None:
        this_team.mascot = None if mascot == 'False' else mascot

    # Plain columns: copied through unchanged whenever a value was supplied.
    plain_updates = {
        'abbrev': abbrev,
        'gmid': gmid,
        'stadium': stadium,
        'thumbnail': thumbnail,
        'color': color,
        'dice_color': dice_color,
        'sname': sname,
        'lname': lname,
    }
    for field_name, value in plain_updates.items():
        if value is not None:
            setattr(this_team, field_name, value)

    if this_team.save():
        r_team = model_to_dict(this_team)
        db.close()
        return r_team

    db.close()
    raise HTTPException(status_code=500, detail=f'Unable to patch team {team_id}')
@router.post('')
async def post_team(team_list: TeamList, token: str = Depends(oauth2_scheme)):
    # Validate and bulk-insert a batch of teams.
    #
    # Each team is checked before anything is written: its (season, abbrev)
    # pair must not already exist, and any manager1_id / manager2_id /
    # division_id that is set must reference an existing row.
    #
    # Returns a confirmation string with the number of teams inserted.
    # Raises 401 on a bad token, 500 on a duplicate abbrev, 404 on a missing
    # Manager or Division.
    if not valid_token(token):
        logging.warning(f'post_team - Bad Token: {token}')
        raise HTTPException(status_code=401, detail='Unauthorized')

    new_teams = []

    for team in team_list.teams:
        same_season = Team.season == team.season
        same_abbrev = Team.abbrev == team.abbrev
        if Team.get_or_none(same_season, same_abbrev):
            db.close()
            raise HTTPException(
                status_code=500, detail=f'Team Abbrev {team.abbrev} already in use in Season {team.season}'
            )

        if team.manager1_id:
            if not Manager.get_or_none(Manager.id == team.manager1_id):
                db.close()
                raise HTTPException(status_code=404, detail=f'Manager ID {team.manager1_id} not found')

        if team.manager2_id:
            if not Manager.get_or_none(Manager.id == team.manager2_id):
                db.close()
                raise HTTPException(status_code=404, detail=f'Manager ID {team.manager2_id} not found')

        if team.division_id:
            if not Division.get_or_none(Division.id == team.division_id):
                db.close()
                raise HTTPException(status_code=404, detail=f'Division ID {team.division_id} not found')

        new_teams.append(team.dict())

    # One transaction for the whole request, written in batches of 15 rows.
    with db.atomic():
        for rows in chunked(new_teams, 15):
            insert_stmt = Team.insert_many(rows).on_conflict_replace()
            insert_stmt.execute()

    db.close()
    return f'Inserted {len(new_teams)} teams'
@router.delete('/{team_id}')
async def delete_team(team_id: int, token: str = Depends(oauth2_scheme)):
    # Delete one team by primary key.
    #
    # Registered with the DELETE method: a second GET on '/{team_id}' would be
    # shadowed by the read route declared earlier for the same path and could
    # never be reached.
    #
    # Returns a confirmation string. Raises 401 on a bad token, 404 when the
    # team does not exist, and 500 when the delete does not report exactly one row.
    if not valid_token(token):
        logging.warning(f'delete_team - Bad Token: {token}')
        raise HTTPException(status_code=401, detail='Unauthorized')

    this_team = Team.get_or_none(Team.id == team_id)
    if not this_team:
        db.close()
        raise HTTPException(status_code=404, detail=f'Team ID {team_id} not found')

    count = this_team.delete_instance()
    db.close()

    if count == 1:
        return f'Team {team_id} has been deleted'
    raise HTTPException(status_code=500, detail=f'Team {team_id} could not be deleted')