major-domo-database/app/routers_v3/players.py
2023-07-17 23:09:04 -05:00

191 lines
6.8 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, Query, Response
from typing import List, Optional
import logging
import pydantic
from pandas import DataFrame
from ..db_engine import db, Player, model_to_dict, chunked, fn
from ..dependencies import oauth2_scheme, valid_token, LOG_DATA
# Configure the root logger from the shared LOG_DATA settings
# (target file, record format and level).
logging.basicConfig(filename=LOG_DATA['filename'], format=LOG_DATA['format'], level=LOG_DATA['log_level'])

# Router for the player endpoints; every route declared on it is served under
# the /api/v3/players prefix and tagged 'players'.
router = APIRouter(
    prefix='/api/v3/players',
    tags=['players']
)
# Pydantic schema for a single player payload. It is the request body of
# patch_player and the element type of PlayerList.players; both endpoints
# pass its .dict() straight to the Player table, so the field names here
# must match what Player accepts.
class PlayerModel(pydantic.BaseModel):
    # Required fields: a payload missing any of these fails validation.
    name: str
    wara: float
    image: str
    image2: Optional[str] = None
    team_id: int
    season: int
    pitcher_injury: Optional[int] = None
    # Position slots: only the first is required, slots 2-8 default to None.
    pos_1: str
    pos_2: Optional[str] = None
    pos_3: Optional[str] = None
    pos_4: Optional[str] = None
    pos_5: Optional[str] = None
    pos_6: Optional[str] = None
    pos_7: Optional[str] = None
    pos_8: Optional[str] = None
    # Remaining optional attributes, all defaulting to None when omitted.
    vanity_card: Optional[str] = None
    headshot: Optional[str] = None
    last_game: Optional[str] = None
    last_game2: Optional[str] = None
    # get_players' is_injured filter tests this column for NULL / NOT NULL.
    il_return: Optional[str] = None
    demotion_week: Optional[int] = None
    strat_code: Optional[str] = None
    bbref_id: Optional[str] = None
    injury_rating: Optional[str] = None
# Request body for post_players: a wrapper object holding the list of
# player payloads to insert in one call.
class PlayerList(pydantic.BaseModel):
    players: List[PlayerModel]
@router.get('')
async def get_players(
        season: Optional[int], name: Optional[str] = None, team_id: list = Query(default=None),
        pos: list = Query(default=None), strat_code: list = Query(default=None), is_injured: Optional[bool] = None,
        sort: Optional[str] = None, short_output: Optional[bool] = False, csv: Optional[bool] = False):
    """List players for a season, optionally filtered and sorted.

    Args:
        season: Season passed to ``Player.select_season``.
        name: Case-insensitive exact match on the player name.
        team_id: Keep only players whose ``team_id`` is in this list.
        pos: Keep only players with one of these positions (upper-cased
            before comparison) in any of ``pos_1`` .. ``pos_8``.
        strat_code: Case-insensitive match against any of these codes.
        is_injured: ``True`` keeps players with a non-null ``il_return``,
            ``False`` keeps players whose ``il_return`` is null, ``None``
            applies no injury filter.
        sort: One of ``cost-asc``, ``cost-desc``, ``name-asc``,
            ``name-desc``; any other value leaves the order unchanged.
        short_output: When true, related models are not expanded in the
            JSON output (``model_to_dict(recurse=False)``).
        csv: When true, respond with ``text/csv`` instead of JSON.

    Returns:
        A ``text/csv`` ``Response`` when ``csv`` is set, otherwise a dict
        with ``count`` and ``players`` keys.
    """
    csv_header = [
        'name', 'wara', 'image', 'image2', 'team', 'season', 'pitcher_injury', 'pos_1', 'pos_2', 'pos_3',
        'pos_4', 'pos_5', 'pos_6', 'pos_7', 'pos_8', 'last_game', 'last_game2', 'il_return', 'demotion_week',
        'headshot', 'vanity_card', 'strat_code', 'bbref_id', 'injury_rating', 'player_id'
    ]

    # try/finally guarantees the connection is released even when a query
    # or the serialisation below raises.
    try:
        all_players = Player.select_season(season)

        if team_id is not None:
            all_players = all_players.where(Player.team_id << team_id)

        if strat_code is not None:
            code_list = [x.lower() for x in strat_code]
            all_players = all_players.where(fn.Lower(Player.strat_code) << code_list)

        if name is not None:
            all_players = all_players.where(fn.lower(Player.name) == name.lower())

        if pos is not None:
            p_list = [x.upper() for x in pos]
            all_players = all_players.where(
                (Player.pos_1 << p_list) | (Player.pos_2 << p_list) | (Player.pos_3 << p_list) |
                (Player.pos_4 << p_list) | (Player.pos_5 << p_list) | (Player.pos_6 << p_list) |
                (Player.pos_7 << p_list) | (Player.pos_8 << p_list)
            )

        if is_injured is not None:
            # Honour the flag's value: previously is_injured=False applied the
            # same NOT NULL filter as True and returned injured players.
            all_players = all_players.where(Player.il_return.is_null(not is_injured))

        if sort is not None:
            sort_orders = {
                'cost-asc': Player.wara,
                'cost-desc': -Player.wara,
                'name-asc': Player.name,
                'name-desc': -Player.name,
            }
            ordering = sort_orders.get(sort)
            # Unrecognised sort keys are ignored, as before.
            if ordering is not None:
                all_players = all_players.order_by(ordering)

        logging.debug('csv: %s', csv)

        if csv:
            rows = [csv_header]
            for line in all_players:
                # strat_code is nullable; commas are masked so the code stays
                # in a single CSV cell for downstream consumers.
                masked_code = line.strat_code.replace(",", "-_-") if line.strat_code is not None else None
                rows.append(
                    [
                        line.name, line.wara, line.image, line.image2, line.team.abbrev, line.season,
                        line.pitcher_injury, line.pos_1, line.pos_2, line.pos_3, line.pos_4, line.pos_5,
                        line.pos_6, line.pos_7, line.pos_8, line.last_game, line.last_game2, line.il_return,
                        line.demotion_week, line.headshot, line.vanity_card, masked_code, line.bbref_id,
                        line.injury_rating, line.id
                    ]
                )
            csv_body = DataFrame(rows).to_csv(header=False, index=False)
            return Response(content=csv_body, media_type='text/csv')

        # Materialise once and derive the count from the result instead of
        # issuing a separate COUNT query.
        players = [model_to_dict(x, recurse=not short_output) for x in all_players]
        return {
            'count': len(players),
            'players': players
        }
    finally:
        db.close()
@router.get('/{player_id}')
async def get_one_player(player_id: int, short_output: Optional[bool] = False):
    """Return one player by primary key.

    Args:
        player_id: ``Player.id`` to look up.
        short_output: When true, related models are not expanded
            (``model_to_dict(recurse=False)``).

    Returns:
        The player as a dict, or ``None`` when no row has that id.
    """
    found = Player.get_or_none(Player.id == player_id)
    # A missing player is reported as a null body rather than an error.
    payload = model_to_dict(found, recurse=not short_output) if found else None
    db.close()
    return payload
@router.patch('/{player_id}')
async def patch_player(
        player_id: int, new_player: PlayerModel, token: str = Depends(oauth2_scheme)):
    """Overwrite an existing player's columns with the supplied payload.

    Every field of ``new_player`` is written, including optional fields left
    at their ``None`` default.

    Args:
        player_id: ``Player.id`` of the row to update.
        new_player: Replacement values for the row.
        token: Bearer token; must satisfy ``valid_token``.

    Returns:
        The updated player as a dict.

    Raises:
        HTTPException: 401 when the token is rejected, 404 when no player
            has ``player_id``.
    """
    if not valid_token(token):
        logging.warning(f'patch_player - Bad Token: {token}')
        raise HTTPException(status_code=401, detail='Unauthorized')

    # The existence check already touches the database, so the connection
    # must be released on the 404 path and on any failure, not only on success.
    try:
        if Player.get_or_none(Player.id == player_id) is None:
            raise HTTPException(status_code=404, detail=f'Player ID {player_id} not found')

        Player.update(**new_player.dict()).where(Player.id == player_id).execute()
        return model_to_dict(Player.get_by_id(player_id))
    finally:
        db.close()
@router.post('')
async def post_players(p_list: PlayerList, token: str = Depends(oauth2_scheme)):
    """Bulk-insert players, rejecting the whole request on a name clash.

    Each payload is first checked against existing rows with the same
    ``season`` and ``name``; if any match is found nothing is inserted.
    Otherwise the rows are written in batches of 15 inside one transaction.

    Args:
        p_list: Wrapper holding the player payloads to insert.
        token: Bearer token; must satisfy ``valid_token``.

    Returns:
        A message stating how many players were inserted.

    Raises:
        HTTPException: 401 when the token is rejected, 500 when a player
            with the same name already exists in that season.
    """
    if not valid_token(token):
        logging.warning(f'post_players - Bad Token: {token}')
        raise HTTPException(status_code=401, detail='Unauthorized')

    new_players = []
    # A single finally releases the connection on the duplicate path, on
    # success, and when a lookup or the bulk insert raises.
    try:
        for player in p_list.players:
            dupe = Player.get_or_none(Player.season == player.season, Player.name == player.name)
            if dupe:
                raise HTTPException(
                    status_code=500,
                    detail=f'Player name {player.name} already in use in Season {player.season}'
                )
            new_players.append(player.dict())

        with db.atomic():
            for batch in chunked(new_players, 15):
                Player.insert_many(batch).on_conflict_replace().execute()
    finally:
        db.close()

    return f'Inserted {len(new_players)} players'
@router.delete('/{player_id}')
async def delete_player(player_id: int, token: str = Depends(oauth2_scheme)):
    """Delete one player by primary key.

    Args:
        player_id: ``Player.id`` of the row to remove.
        token: Bearer token; must satisfy ``valid_token``.

    Returns:
        A confirmation message when exactly one row was deleted.

    Raises:
        HTTPException: 401 when the token is rejected, 404 when no player
            has ``player_id``, 500 when the delete did not remove exactly
            one row.
    """
    if not valid_token(token):
        logging.warning(f'delete_player - Bad Token: {token}')
        raise HTTPException(status_code=401, detail='Unauthorized')

    target = Player.get_or_none(Player.id == player_id)
    if not target:
        db.close()
        raise HTTPException(status_code=404, detail=f'Player ID {player_id} not found')

    rows_removed = target.delete_instance()
    db.close()

    # Anything other than a single removed row is treated as a failure.
    if rows_removed != 1:
        raise HTTPException(status_code=500, detail=f'Player {player_id} could not be deleted')
    return f'Player {player_id} has been deleted'