"""HTTP endpoints for Player records, mounted under ``/api/v3/players``."""

import logging
from typing import List, Optional

import pydantic
from fastapi import APIRouter, Depends, HTTPException, Query, Response
from pandas import DataFrame

from ..db_engine import db, Player, model_to_dict, chunked, fn
from ..dependencies import oauth2_scheme, valid_token, LOG_DATA

logging.basicConfig(
    filename=LOG_DATA['filename'],
    format=LOG_DATA['format'],
    level=LOG_DATA['log_level']
)

router = APIRouter(
    prefix='/api/v3/players',
    tags=['players']
)

# Column order of the CSV export; must stay aligned with _player_csv_row().
_CSV_HEADER = [
    'name', 'wara', 'image', 'image2', 'team', 'season', 'pitcher_injury', 'pos_1', 'pos_2', 'pos_3',
    'pos_4', 'pos_5', 'pos_6', 'pos_7', 'pos_8', 'last_game', 'last_game2', 'il_return', 'demotion_week',
    'headshot', 'vanity_card', 'strat_code', 'bbref_id', 'injury_rating', 'player_id'
]

# Number of rows sent per INSERT statement when bulk-creating players.
_INSERT_BATCH_SIZE = 15


class PlayerModel(pydantic.BaseModel):
    """Request body describing one player for the POST and PATCH endpoints."""

    name: str
    wara: float
    image: str
    image2: Optional[str] = None
    team_id: int
    season: int
    pitcher_injury: Optional[int] = None
    pos_1: str
    pos_2: Optional[str] = None
    pos_3: Optional[str] = None
    pos_4: Optional[str] = None
    pos_5: Optional[str] = None
    pos_6: Optional[str] = None
    pos_7: Optional[str] = None
    pos_8: Optional[str] = None
    vanity_card: Optional[str] = None
    headshot: Optional[str] = None
    last_game: Optional[str] = None
    last_game2: Optional[str] = None
    il_return: Optional[str] = None
    demotion_week: Optional[int] = None
    strat_code: Optional[str] = None
    bbref_id: Optional[str] = None
    injury_rating: Optional[str] = None


class PlayerList(pydantic.BaseModel):
    """Request body wrapping the list of players for the bulk POST endpoint."""

    players: List[PlayerModel]


def _require_valid_token(token: str, caller: str) -> None:
    """Raise a 401 HTTPException (and log a warning) unless ``valid_token(token)`` is truthy."""
    if not valid_token(token):
        # NOTE(review): the rejected token is written to the log verbatim - confirm this is intended.
        logging.warning(f'{caller} - Bad Token: {token}')
        raise HTTPException(status_code=401, detail='Unauthorized')


def _any_position_in(positions):
    """Build the OR-expression matching players with any pos_1..pos_8 slot in ``positions``."""
    pos_fields = (
        Player.pos_1, Player.pos_2, Player.pos_3, Player.pos_4,
        Player.pos_5, Player.pos_6, Player.pos_7, Player.pos_8
    )
    expression = pos_fields[0] << positions
    for field in pos_fields[1:]:
        expression = expression | (field << positions)
    return expression


def _apply_sort(query, sort: Optional[str]):
    """Order ``query`` by the requested sort key; unrecognised or missing keys leave it unchanged."""
    if sort == 'cost-asc':
        return query.order_by(Player.wara)
    if sort == 'cost-desc':
        return query.order_by(-Player.wara)
    if sort == 'name-asc':
        return query.order_by(Player.name)
    if sort == 'name-desc':
        return query.order_by(-Player.name)
    return query


def _player_csv_row(player) -> list:
    """Return one CSV row for ``player`` in ``_CSV_HEADER`` column order."""
    # strat_code is optional on the model, so guard the comma substitution
    # against None instead of raising AttributeError for the whole export.
    strat_code = player.strat_code
    if strat_code is not None:
        strat_code = strat_code.replace(",", "-_-")

    return [
        player.name, player.wara, player.image, player.image2, player.team.abbrev, player.season,
        player.pitcher_injury, player.pos_1, player.pos_2, player.pos_3, player.pos_4, player.pos_5,
        player.pos_6, player.pos_7, player.pos_8, player.last_game, player.last_game2,
        player.il_return, player.demotion_week, player.headshot, player.vanity_card, strat_code,
        player.bbref_id, player.injury_rating, player.id
    ]


@router.get('')
async def get_players(
        season: Optional[int], name: Optional[str] = None, team_id: list = Query(default=None),
        pos: list = Query(default=None), strat_code: list = Query(default=None),
        is_injured: Optional[bool] = None, sort: Optional[str] = None,
        short_output: Optional[bool] = False, csv: Optional[bool] = False):
    """List players for a season, optionally filtered and sorted.

    Args:
        season: Passed to ``Player.select_season`` to build the base query.
        name: Case-insensitive exact match on the player name.
        team_id: Keep only players whose ``team_id`` is in this list.
        pos: Keep players having any of these positions (upper-cased) in pos_1..pos_8.
        strat_code: Case-insensitive match of ``strat_code`` against this list.
        is_injured: ``True`` keeps players with an ``il_return`` set, ``False`` keeps
            players without one, ``None`` applies no injury filter.
        sort: One of ``cost-asc``, ``cost-desc``, ``name-asc``, ``name-desc``;
            any other value leaves the order unchanged.
        short_output: When true, related models are not expanded in the JSON output.
        csv: When true, respond with ``text/csv`` instead of JSON.

    Returns:
        A ``text/csv`` Response when ``csv`` is true, otherwise a dict with
        ``count`` and ``players`` keys.
    """
    try:
        all_players = Player.select_season(season)

        if team_id is not None:
            all_players = all_players.where(Player.team_id << team_id)

        if strat_code is not None:
            code_list = [code.lower() for code in strat_code]
            all_players = all_players.where(fn.Lower(Player.strat_code) << code_list)

        if name is not None:
            all_players = all_players.where(fn.lower(Player.name) == name.lower())

        if pos is not None:
            all_players = all_players.where(_any_position_in([p.upper() for p in pos]))

        if is_injured is not None:
            # A non-null il_return is what this endpoint treats as "injured".
            # Previously any supplied value (including false) selected injured players.
            all_players = all_players.where(Player.il_return.is_null(not is_injured))

        all_players = _apply_sort(all_players, sort)

        if csv:
            rows = [_CSV_HEADER]
            rows.extend(_player_csv_row(player) for player in all_players)
            csv_text = DataFrame(rows).to_csv(header=False, index=False)
            return Response(content=csv_text, media_type='text/csv')

        return {
            'count': all_players.count(),
            'players': [model_to_dict(x, recurse=not short_output) for x in all_players]
        }
    finally:
        # Close the connection on every exit path, including query/serialisation errors.
        db.close()


@router.get('/{player_id}')
async def get_one_player(player_id: int, short_output: Optional[bool] = False):
    """Return one player as a dict, or ``None`` when no player has ``player_id``.

    Args:
        player_id: Primary key of the player to fetch.
        short_output: When true, related models are not expanded in the output.
    """
    try:
        this_player = Player.get_or_none(Player.id == player_id)
        if this_player:
            return model_to_dict(this_player, recurse=not short_output)
        return None
    finally:
        db.close()


@router.patch('/{player_id}')
async def patch_player(
        player_id: int, new_player: PlayerModel, token: str = Depends(oauth2_scheme)):
    """Overwrite the stored fields of an existing player and return the updated record.

    Raises:
        HTTPException: 401 when the token is rejected, 404 when the player does not exist.
    """
    _require_valid_token(token, 'patch_player')

    try:
        if Player.get_or_none(Player.id == player_id) is None:
            raise HTTPException(status_code=404, detail=f'Player ID {player_id} not found')

        # NOTE(review): .dict() includes every model field, so optional fields
        # omitted from the request are written as None - confirm this is intended.
        Player.update(**new_player.dict()).where(Player.id == player_id).execute()
        return model_to_dict(Player.get_by_id(player_id))
    finally:
        db.close()


@router.post('')
async def post_players(p_list: PlayerList, token: str = Depends(oauth2_scheme)):
    """Bulk-insert players after checking none duplicates an existing name within its season.

    Returns:
        A confirmation string containing the number of submitted players.

    Raises:
        HTTPException: 401 when the token is rejected, 500 when a submitted
            name is already present in the same season.
    """
    _require_valid_token(token, 'post_players')

    try:
        new_players = []
        for player in p_list.players:
            dupe = Player.get_or_none(Player.season == player.season, Player.name == player.name)
            if dupe:
                raise HTTPException(
                    status_code=500,
                    detail=f'Player name {player.name} already in use in Season {player.season}'
                )
            new_players.append(player.dict())

        # All batches share one transaction so a failure leaves no partial insert.
        with db.atomic():
            for batch in chunked(new_players, _INSERT_BATCH_SIZE):
                Player.insert_many(batch).on_conflict_replace().execute()

        return f'Inserted {len(new_players)} players'
    finally:
        db.close()


@router.delete('/{player_id}')
async def delete_player(player_id: int, token: str = Depends(oauth2_scheme)):
    """Delete one player by id.

    Raises:
        HTTPException: 401 when the token is rejected, 404 when the player does
            not exist, 500 when the delete did not report exactly one row.
    """
    _require_valid_token(token, 'delete_player')

    try:
        this_player = Player.get_or_none(Player.id == player_id)
        if not this_player:
            raise HTTPException(status_code=404, detail=f'Player ID {player_id} not found')
        count = this_player.delete_instance()
    finally:
        db.close()

    if count == 1:
        return f'Player {player_id} has been deleted'
    raise HTTPException(status_code=500, detail=f'Player {player_id} could not be deleted')