from fastapi import APIRouter, Depends, HTTPException, Query, Response from typing import List, Optional, Literal import copy import logging import pydantic from ..db_engine import db, Team, Manager, Division, model_to_dict, chunked, fn, query_to_csv, Player from ..dependencies import add_cache_headers, cache_result, oauth2_scheme, valid_token, PRIVATE_IN_SCHEMA, handle_db_errors, invalidate_cache logger = logging.getLogger('discord_app') router = APIRouter( prefix='/api/v3/teams', tags=['teams'] ) class TeamModel(pydantic.BaseModel): abbrev: str sname: str lname: str gmid: Optional[int] = None gmid2: Optional[int] = None manager1_id: Optional[int] = None manager2_id: Optional[int] = None division_id: Optional[int] = None stadium: Optional[str] = None thumbnail: Optional[str] = None color: Optional[str] = None dice_color: Optional[str] = None season: int class TeamList(pydantic.BaseModel): teams: List[TeamModel] @router.get('') @handle_db_errors @cache_result(ttl=10*60, key_prefix='teams') async def get_teams( season: Optional[int] = None, owner_id: list = Query(default=None), manager_id: list = Query(default=None), team_abbrev: list = Query(default=None), active_only: Optional[bool] = False, short_output: Optional[bool] = False, csv: Optional[bool] = False): if season is not None: all_teams = Team.select_season(season) else: all_teams = Team.select() if manager_id is not None: managers = Manager.select().where(Manager.id << manager_id) all_teams = all_teams.where( (Team.manager1_id << managers) | (Team.manager2_id << managers) ) if owner_id: all_teams = all_teams.where((Team.gmid << owner_id) | (Team.gmid2 << owner_id)) if team_abbrev is not None: team_list = [x.lower() for x in team_abbrev] all_teams = all_teams.where(fn.lower(Team.abbrev) << team_list) if active_only: all_teams = all_teams.where( ~(Team.abbrev.endswith('IL')) & ~(Team.abbrev.endswith('MiL')) ) if csv: return_val = query_to_csv(all_teams, exclude=[Team.division_legacy, Team.mascot, Team.gsheet]) db.close() return Response(content=return_val, media_type='text/csv') return_teams = { 'count': all_teams.count(), 'teams': [model_to_dict(x, recurse=not short_output) for x in all_teams] } db.close() return return_teams @router.get('/{team_id}') @handle_db_errors # @add_cache_headers(max_age=60*60) @cache_result(ttl=30*60, key_prefix='team') async def get_one_team(team_id: int): this_team = Team.get_or_none(Team.id == team_id) if this_team: r_team = model_to_dict(this_team) else: r_team = None db.close() return r_team @router.get('/{team_id}/roster/{which}', include_in_schema=PRIVATE_IN_SCHEMA) @handle_db_errors # @add_cache_headers(max_age=60*60) @cache_result(ttl=30*60, key_prefix='team-roster') async def get_team_roster(team_id: int, which: Literal['current', 'next'], sort: Optional[str] = None): try: this_team = Team.get_by_id(team_id) except Exception as e: raise HTTPException(status_code=404, detail=f'Team ID {team_id} not found') if which == 'current': full_roster = this_team.get_this_week() else: full_roster = this_team.get_next_week() active_players = copy.deepcopy(full_roster['active']['players']) sil_players = copy.deepcopy(full_roster['shortil']['players']) lil_players = copy.deepcopy(full_roster['longil']['players']) full_roster['active']['players'] = [] full_roster['shortil']['players'] = [] full_roster['longil']['players'] = [] for player in active_players: full_roster['active']['players'].append(model_to_dict(player)) for player in sil_players: full_roster['shortil']['players'].append(model_to_dict(player)) for player in lil_players: full_roster['longil']['players'].append(model_to_dict(player)) if sort: if sort == 'wara-desc': full_roster['active']['players'].sort(key=lambda p: p["wara"], reverse=True) full_roster['active']['players'].sort(key=lambda p: p["wara"], reverse=True) full_roster['active']['players'].sort(key=lambda p: p["wara"], reverse=True) db.close() return full_roster @router.patch('/{team_id}', include_in_schema=PRIVATE_IN_SCHEMA) @handle_db_errors async def patch_team( team_id: int, manager1_id: Optional[int] = None, manager2_id: Optional[int] = None, gmid: Optional[int] = None, gmid2: Optional[int] = None, mascot: Optional[str] = None, stadium: Optional[str] = None, thumbnail: Optional[str] = None, color: Optional[str] = None, abbrev: Optional[str] = None, sname: Optional[str] = None, lname: Optional[str] = None, dice_color: Optional[str] = None, division_id: Optional[int] = None, token: str = Depends(oauth2_scheme)): if not valid_token(token): logger.warning(f'patch_team - Bad Token: {token}') raise HTTPException(status_code=401, detail='Unauthorized') this_team = Team.get_or_none(Team.id == team_id) if not this_team: return None if abbrev is not None: this_team.abbrev = abbrev if manager1_id is not None: if manager1_id == 0: this_team.manager1 = None else: this_manager = Manager.get_or_none(Manager.id == manager1_id) if not this_manager: db.close() raise HTTPException(status_code=404, detail=f'Manager ID {manager1_id} not found') this_team.manager1 = this_manager if manager2_id is not None: if manager2_id == 0: this_team.manager2 = None else: this_manager = Manager.get_or_none(Manager.id == manager2_id) if not this_manager: db.close() raise HTTPException(status_code=404, detail=f'Manager ID {manager2_id} not found') this_team.manager2 = this_manager if gmid is not None: this_team.gmid = gmid if gmid2 is not None: if gmid2 == 0: this_team.gmid2 = None else: this_team.gmid2 = gmid2 if mascot is not None: if mascot == 'False': this_team.mascot = None else: this_team.mascot = mascot if stadium is not None: this_team.stadium = stadium if thumbnail is not None: this_team.thumbnail = thumbnail if color is not None: this_team.color = color if dice_color is not None: this_team.dice_color = dice_color if sname is not None: this_team.sname = sname if lname is not None: this_team.lname = lname if division_id is not None: if division_id == 0: this_team.division = None else: this_division = Division.get_or_none(Division.id == division_id) if not this_division: db.close() raise HTTPException(status_code=404, detail=f'Division ID {division_id} not found') this_team.division = this_division if this_team.save(): r_team = model_to_dict(this_team) db.close() # Invalidate team-related cache entries invalidate_cache("teams*") invalidate_cache(f"team*{team_id}*") invalidate_cache("team-roster*") return r_team else: db.close() raise HTTPException(status_code=500, detail=f'Unable to patch team {team_id}') @router.post('', include_in_schema=PRIVATE_IN_SCHEMA) @handle_db_errors async def post_team(team_list: TeamList, token: str = Depends(oauth2_scheme)): if not valid_token(token): logger.warning(f'post_team - Bad Token: {token}') raise HTTPException(status_code=401, detail='Unauthorized') new_teams = [] for team in team_list.teams: dupe_team = Team.get_or_none(Team.season == team.season, Team.abbrev == team.abbrev) if dupe_team: db.close() raise HTTPException( status_code=500, detail=f'Team Abbrev {team.abbrev} already in use in Season {team.season}' ) if team.manager1_id and not Manager.get_or_none(Manager.id == team.manager1_id): db.close() raise HTTPException(status_code=404, detail=f'Manager ID {team.manager1_id} not found') if team.manager2_id and not Manager.get_or_none(Manager.id == team.manager2_id): db.close() raise HTTPException(status_code=404, detail=f'Manager ID {team.manager2_id} not found') if team.division_id and not Division.get_or_none(Division.id == team.division_id): db.close() raise HTTPException(status_code=404, detail=f'Division ID {team.division_id} not found') new_teams.append(team.dict()) with db.atomic(): for batch in chunked(new_teams, 15): Team.insert_many(batch).on_conflict_ignore().execute() db.close() # Invalidate team-related cache entries invalidate_cache("teams*") invalidate_cache("team*") invalidate_cache("team-roster*") return f'Inserted {len(new_teams)} teams' @router.delete('/{team_id}', include_in_schema=PRIVATE_IN_SCHEMA) @handle_db_errors async def delete_team(team_id: int, token: str = Depends(oauth2_scheme)): if not valid_token(token): logger.warning(f'delete_team - Bad Token: {token}') raise HTTPException(status_code=401, detail='Unauthorized') this_team = Team.get_or_none(Team.id == team_id) if not this_team: db.close() raise HTTPException(status_code=404, detail=f'Team ID {team_id} not found') count = this_team.delete_instance() db.close() if count == 1: # Invalidate team-related cache entries invalidate_cache("teams*") invalidate_cache(f"team*{team_id}*") invalidate_cache("team-roster*") return f'Team {team_id} has been deleted' else: raise HTTPException(status_code=500, detail=f'Team {team_id} could not be deleted')