major-domo-database/app/routers_v3/players.py
2023-05-05 11:06:11 -05:00

154 lines
4.9 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, Query
from typing import List, Optional
import logging
import pydantic
from ..db_engine import db, Player, model_to_dict, chunked
from ..dependencies import oauth2_scheme, valid_token
router = APIRouter(
prefix='/api/v3/players',
tags=['players']
)
class PlayerModel(pydantic.BaseModel):
name: str
wara: float
image: str
image2: Optional[str] = None
team_id: int
season: int
pitcher_injury: Optional[int] = None
pos_1: str
pos_2: Optional[str] = None
pos_3: Optional[str] = None
pos_4: Optional[str] = None
pos_5: Optional[str] = None
pos_6: Optional[str] = None
pos_7: Optional[str] = None
pos_8: Optional[str] = None
last_game: Optional[str] = None
last_game2: Optional[str] = None
il_return: Optional[str] = None
demotion_week: Optional[int] = None
strat_code: Optional[str] = None
bbref_id: Optional[str] = None
injury_rating: Optional[str] = None
class PlayerList(pydantic.BaseModel):
players: List[PlayerModel]
@router.get('')
async def get_players(
season: Optional[int], team_id: list = Query(default=None), pos: list = Query(default=None),
is_injured: Optional[bool] = None, sort: Optional[str] = None):
logging.info(f'team_id: {team_id}')
all_players = Player.select_season(season)
if team_id is not None:
all_players = all_players.where(Player.team_id << team_id)
if pos is not None:
p_list = [x.upper() for x in pos]
all_players = all_players.where(
(Player.pos_1 << p_list) | (Player.pos_2 << p_list) | (Player.pos_3 << p_list) | (Player.pos_4 << p_list) |
(Player.pos_5 << p_list) | (Player.pos_6 << p_list) | (Player.pos_7 << p_list) | (Player.pos_8 << p_list)
)
if is_injured is not None:
all_players = all_players.where(Player.il_return.is_null(False))
if sort is not None:
if sort == 'cost-asc':
all_players = all_players.order_by(Player.wara)
elif sort == 'cost-desc':
all_players = all_players.order_by(-Player.wara)
elif sort == 'name-asc':
all_players = all_players.order_by(Player.name)
elif sort == 'name-desc':
all_players = all_players.order_by(-Player.name)
return_players = {
'count': all_players.count(),
'players': [model_to_dict(x, recurse=False) for x in all_players]
}
db.close()
return return_players
@router.get('/{player_id}')
async def get_one_player(player_id: int, short_output: Optional[bool] = False):
this_player = Player.get_or_none(Player.id == player_id)
if this_player:
r_player = model_to_dict(this_player)
else:
r_player = None
db.close()
return r_player
@router.patch('/{player_id}')
async def patch_player(
player_id: int, new_player: PlayerModel, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning(f'patch_player - Bad Token: {token}')
raise HTTPException(status_code=401, detail='Unauthorized')
if Player.get_or_none(Player.id == player_id) is None:
raise HTTPException(status_code=404, detail=f'Player ID {player_id} not found')
Player.update(**new_player.dict()).where(Player.id == player_id).execute()
r_player = model_to_dict(Player.get_by_id(player_id))
db.close()
return r_player
@router.post('')
async def post_players(p_list: PlayerList, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning(f'post_players - Bad Token: {token}')
raise HTTPException(status_code=401, detail='Unauthorized')
new_players = []
for player in p_list.players:
dupe = Player.get_or_none(Player.season == player.season, Player.name == player.name)
if dupe:
db.close()
raise HTTPException(
status_code=500,
detail=f'Player name {player.name} already in use in Season {player.season}'
)
new_players.append(player.dict())
with db.atomic():
for batch in chunked(new_players, 15):
Player.insert_many(batch).on_conflict_replace().execute()
db.close()
return f'Inserted {len(new_players)} players'
@router.delete('/{player_id}')
async def delete_player(player_id: int, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning(f'delete_player - Bad Token: {token}')
raise HTTPException(status_code=401, detail='Unauthorized')
this_player = Player.get_or_none(Player.id == player_id)
if not this_player:
db.close()
raise HTTPException(status_code=404, detail=f'Player ID {player_id} not found')
count = this_player.delete_instance()
db.close()
if count == 1:
return f'Player {player_id} has been deleted'
else:
raise HTTPException(status_code=500, detail=f'Player {player_id} could not be deleted')