paper-dynasty-database/app/routers_v2/players.py
2023-09-25 00:19:26 -05:00

643 lines
25 KiB
Python

import os.path
from fastapi import APIRouter, Depends, HTTPException, Request, Response, Query
from fastapi.responses import FileResponse
from fastapi.templating import Jinja2Templates
from html2image import Html2Image
from typing import Optional, List
import logging
import pydantic
from pandas import DataFrame
from ..card_creation import get_batter_card_html, get_batter_card_data
from ..db_engine import db, Player, model_to_dict, fn, chunked, Paperdex, Cardset, Rarity, BattingCard, \
BattingCardRatings
from ..dependencies import oauth2_scheme, valid_token, LOG_DATA
# Configure the root logger (file, format, level) from the shared LOG_DATA settings.
logging.basicConfig(
    filename=LOG_DATA['filename'],
    format=LOG_DATA['format'],
    level=LOG_DATA['log_level']
)
# Every route declared in this module is mounted under /api/v2/players and tagged 'players'.
router = APIRouter(
    prefix='/api/v2/players',
    tags=['players']
)
# Jinja2 template loader; used by the batting-card endpoint to render "player_card.html".
templates = Jinja2Templates(directory="storage/templates")
class PlayerPydantic(pydantic.BaseModel):
    """Request-body schema for a single player record.

    Used directly as the body of the POST endpoint and as the element type of
    the bulk PUT payload.  Fields without a default are required.
    """
    # Defaults to None, so the annotation must admit None as well; the POST
    # endpoint overwrites this with a server-assigned id.
    player_id: Optional[int] = None
    p_name: str
    cost: int
    image: str
    image2: Optional[str] = None
    mlbclub: str
    franchise: str
    cardset_id: int
    set_num: int
    rarity_id: int
    # Only the first position is required; pos_2..pos_8 may be omitted.
    pos_1: str
    pos_2: Optional[str] = None
    pos_3: Optional[str] = None
    pos_4: Optional[str] = None
    pos_5: Optional[str] = None
    pos_6: Optional[str] = None
    pos_7: Optional[str] = None
    pos_8: Optional[str] = None
    headshot: Optional[str] = None
    vanity_card: Optional[str] = None
    strat_code: Optional[str] = None
    bbref_id: Optional[str] = None
    fangr_id: Optional[str] = None
    description: str
    quantity: Optional[int] = 999
class PlayerModel(pydantic.BaseModel):
    """Request-body wrapper carrying a list of players for the bulk PUT endpoint."""
    # Each entry is validated against PlayerPydantic.
    players: List[PlayerPydantic]
@router.get('')
async def get_players(
        name: Optional[str] = None, value: Optional[int] = None, min_cost: Optional[int] = None,
        max_cost: Optional[int] = None, has_image2: Optional[bool] = None, mlbclub: Optional[str] = None,
        franchise: Optional[str] = None, cardset_id: list = Query(default=None), rarity_id: list = Query(default=None),
        pos_include: list = Query(default=None), pos_exclude: list = Query(default=None),
        has_headshot: Optional[bool] = None, has_vanity_card: Optional[bool] = None, strat_code: Optional[str] = None,
        bbref_id: Optional[str] = None, fangr_id: Optional[str] = None, inc_dex: Optional[bool] = True,
        in_desc: Optional[str] = None, flat: Optional[bool] = False, sort_by: Optional[str] = None,
        cardset_id_exclude: list = Query(default=None), limit: Optional[int] = None, csv: Optional[bool] = None,
        short_output: Optional[bool] = False):
    """List players, optionally filtered, sorted and limited.

    Every filter is optional; the ones supplied are AND-ed together.
    ``name``, ``mlbclub`` and ``franchise`` are compared case-insensitively,
    ``in_desc`` is a case-insensitive substring match on the description, and
    ``pos_include`` / ``pos_exclude`` values are upper-cased before use.
    ``pos_exclude`` is applied in Python (via ``Player.get_all_pos()``) after
    the database query, and ``limit`` caps the number of players kept.

    ``sort_by`` accepts 'cost-desc', 'cost-asc', 'name-asc', 'name-desc',
    'rarity-desc' or 'rarity-asc'; any other value leaves the order untouched.

    Returns:
        A ``text/csv`` response when ``csv`` is truthy; otherwise a dict
        ``{'count': int, 'players': [...]}``.  When ``inc_dex`` is truthy each
        player dict gains a ``'paperdex'`` entry with its Paperdex rows.
        ``flat`` or ``short_output`` disables recursion into related models.

    Raises:
        HTTPException: 404 when the Player table is empty.
    """
    try:
        all_players = Player.select()
        if all_players.count() == 0:
            raise HTTPException(status_code=404, detail='There are no players to filter')

        if name is not None:
            all_players = all_players.where(fn.Lower(Player.p_name) == name.lower())
        if value is not None:
            all_players = all_players.where(Player.cost == value)
        if min_cost is not None:
            all_players = all_players.where(Player.cost >= min_cost)
        if max_cost is not None:
            all_players = all_players.where(Player.cost <= max_cost)
        if has_image2 is not None:
            all_players = all_players.where(Player.image2.is_null(not has_image2))
        if mlbclub is not None:
            all_players = all_players.where(fn.Lower(Player.mlbclub) == mlbclub.lower())
        if franchise is not None:
            all_players = all_players.where(fn.Lower(Player.franchise) == franchise.lower())
        if cardset_id is not None:
            all_players = all_players.where(Player.cardset_id << cardset_id)
        if cardset_id_exclude is not None:
            all_players = all_players.where(Player.cardset_id.not_in(cardset_id_exclude))
        if rarity_id is not None:
            all_players = all_players.where(Player.rarity_id << rarity_id)
        if pos_include is not None:
            # A player matches when any of its eight position slots is in the list.
            p_list = [pos.upper() for pos in pos_include]
            all_players = all_players.where(
                (Player.pos_1 << p_list) | (Player.pos_2 << p_list) | (Player.pos_3 << p_list) |
                (Player.pos_4 << p_list) | (Player.pos_5 << p_list) | (Player.pos_6 << p_list) |
                (Player.pos_7 << p_list) | (Player.pos_8 << p_list)
            )
        if has_headshot is not None:
            all_players = all_players.where(Player.headshot.is_null(not has_headshot))
        if has_vanity_card is not None:
            all_players = all_players.where(Player.vanity_card.is_null(not has_vanity_card))
        if strat_code is not None:
            all_players = all_players.where(Player.strat_code == strat_code)
        if bbref_id is not None:
            all_players = all_players.where(Player.bbref_id == bbref_id)
        if fangr_id is not None:
            all_players = all_players.where(Player.fangr_id == fangr_id)
        if in_desc is not None:
            all_players = all_players.where(fn.Lower(Player.description).contains(in_desc.lower()))

        # NOTE(review): 'rarity-desc' sorts ascending on the rarity foreign key and
        # 'rarity-asc' descending.  Kept exactly as found -- presumably lower rarity
        # keys denote rarer cards; confirm against the Rarity table.
        orderings = {
            'cost-desc': -Player.cost,
            'cost-asc': Player.cost,
            'name-asc': Player.p_name,
            'name-desc': -Player.p_name,
            'rarity-desc': Player.rarity,
            'rarity-asc': -Player.rarity,
        }
        ordering = orderings.get(sort_by)
        if ordering is not None:
            all_players = all_players.order_by(ordering)

        # Build the exclusion set once instead of once per row.
        excluded = {pos.upper() for pos in pos_exclude} if pos_exclude is not None else set()
        final_players = []
        for player in all_players:
            if excluded and excluded.intersection(player.get_all_pos()):
                continue
            final_players.append(player)
            if limit is not None and len(final_players) >= limit:
                break

        if csv:
            # Rows are emitted in the order selected above (sort_by, or the query's default order).
            data_list = [[
                'id', 'name', 'value', 'image', 'image2', 'mlbclub', 'franchise', 'cardset', 'rarity', 'pos_1',
                'pos_2', 'pos_3', 'pos_4', 'pos_5', 'pos_6', 'pos_7', 'pos_8', 'headshot', 'vanity_card',
                'strat_code', 'bbref_id', 'description', 'for_purchase', 'ranked_legal'
            ]]
            for line in final_players:
                # cardset and rarity are passed as model instances; their CSV text is
                # whatever str() yields for them -- presumably the primary key; confirm.
                data_list.append([
                    line.player_id, line.p_name, line.cost, line.image, line.image2, line.mlbclub, line.franchise,
                    line.cardset, line.rarity, line.pos_1, line.pos_2, line.pos_3, line.pos_4, line.pos_5,
                    line.pos_6, line.pos_7, line.pos_8, line.headshot, line.vanity_card, line.strat_code,
                    line.bbref_id, line.description, line.cardset.for_purchase, line.cardset.ranked_legal
                ])
            return Response(
                content=DataFrame(data_list).to_csv(header=False, index=False),
                media_type='text/csv'
            )

        return_val = {'count': len(final_players), 'players': []}
        for player in final_players:
            this_record = model_to_dict(player, recurse=not (flat or short_output))
            if inc_dex:
                this_dex = Paperdex.select().where(Paperdex.player == player)
                this_record['paperdex'] = {
                    'count': this_dex.count(),
                    'paperdex': [model_to_dict(entry, recurse=False) for entry in this_dex]
                }
            return_val['players'].append(this_record)
        return return_val
    finally:
        # Release the connection on every exit path, including unexpected errors.
        db.close()
@router.get('/random')
async def get_random_player(
        min_cost: Optional[int] = None, max_cost: Optional[int] = None, in_packs: Optional[bool] = None,
        min_rarity: Optional[int] = None, max_rarity: Optional[int] = None, limit: Optional[int] = None,
        pos_include: Optional[str] = None, pos_exclude: Optional[str] = None, franchise: Optional[str] = None,
        mlbclub: Optional[str] = None, cardset_id: list = Query(default=None), pos_inc: list = Query(default=None),
        pos_exc: list = Query(default=None), csv: Optional[bool] = None):
    """Return players in random order, optionally filtered and limited.

    The query joins Cardset and Rarity and orders by the database's random
    function.  ``pos_include`` (single value, case-insensitive) and ``pos_inc``
    (list, upper-cased) keep players having a matching position slot.
    ``pos_exclude`` (single value, compared exactly as given) and ``pos_exc``
    (list, upper-cased) drop players whose ``get_all_pos()`` contains an
    excluded position; when both are supplied, both are applied.

    Returns:
        A ``text/csv`` response when ``csv`` is truthy; otherwise a dict
        ``{'count': int, 'players': [...]}`` where every player dict carries
        a ``'paperdex'`` entry with its Paperdex rows.
    """
    try:
        all_players = (
            Player
            .select()
            .join(Cardset)
            .switch(Player)
            .join(Rarity)
            .order_by(fn.Random())
        )

        if min_cost is not None:
            all_players = all_players.where(Player.cost >= min_cost)
        if max_cost is not None:
            all_players = all_players.where(Player.cost <= max_cost)
        if in_packs:
            # Only a truthy in_packs filters; False and None both mean "no filter".
            all_players = all_players.where(Player.cardset.in_packs)
        if min_rarity is not None:
            all_players = all_players.where(Player.rarity.value >= min_rarity)
        if max_rarity is not None:
            all_players = all_players.where(Player.rarity.value <= max_rarity)

        pos_fields = (
            Player.pos_1, Player.pos_2, Player.pos_3, Player.pos_4,
            Player.pos_5, Player.pos_6, Player.pos_7, Player.pos_8
        )
        if pos_include is not None:
            wanted = pos_include.lower()
            clause = fn.lower(pos_fields[0]) == wanted
            for field in pos_fields[1:]:
                clause = clause | (fn.lower(field) == wanted)
            all_players = all_players.where(clause)
        if franchise is not None:
            all_players = all_players.where(fn.Lower(Player.franchise) == franchise.lower())
        if mlbclub is not None:
            all_players = all_players.where(fn.Lower(Player.mlbclub) == mlbclub.lower())
        if cardset_id is not None:
            all_players = all_players.where(Player.cardset_id << cardset_id)
        if pos_inc is not None:
            p_list = [pos.upper() for pos in pos_inc]
            clause = pos_fields[0] << p_list
            for field in pos_fields[1:]:
                clause = clause | (field << p_list)
            all_players = all_players.where(clause)

        # Merge both exclusion parameters so that supplying the two together no
        # longer disables exclusion altogether.
        # NOTE(review): pos_exclude is matched exactly as given (no case folding),
        # unlike pos_exc -- confirm whether that asymmetry is intended.
        excluded = set()
        if pos_exclude is not None:
            excluded.add(pos_exclude)
        if pos_exc is not None:
            excluded.update(pos.upper() for pos in pos_exc)

        if excluded:
            final_players = []
            for player in all_players:
                if limit is not None and len(final_players) >= limit:
                    break
                if not excluded.intersection(player.get_all_pos()):
                    final_players.append(player)
        else:
            final_players = all_players
        if limit is not None:
            final_players = final_players[:limit]

        if csv:
            data_list = [[
                'id', 'name', 'cost', 'image', 'image2', 'mlbclub', 'franchise', 'cardset', 'rarity', 'pos_1',
                'pos_2', 'pos_3', 'pos_4', 'pos_5', 'pos_6', 'pos_7', 'pos_8', 'headshot', 'vanity_card',
                'strat_code', 'bbref_id', 'description'
            ]]
            for line in final_players:
                # The key attribute is player_id everywhere else in this module
                # (e.g. Player.player_id in the POST handler), so use it here too.
                data_list.append([
                    line.player_id, line.p_name, line.cost, line.image, line.image2,
                    line.mlbclub, line.franchise, line.cardset.name, line.rarity.name,
                    line.pos_1, line.pos_2, line.pos_3, line.pos_4, line.pos_5,
                    line.pos_6, line.pos_7, line.pos_8, line.headshot, line.vanity_card,
                    line.strat_code, line.bbref_id, line.description
                ])
            return Response(
                content=DataFrame(data_list).to_csv(header=False, index=False),
                media_type='text/csv'
            )

        return_val = {'count': len(final_players), 'players': []}
        for player in final_players:
            this_record = model_to_dict(player)
            this_dex = Paperdex.select().where(Paperdex.player == player)
            this_record['paperdex'] = {
                'count': this_dex.count(),
                'paperdex': [model_to_dict(entry, recurse=False) for entry in this_dex]
            }
            return_val['players'].append(this_record)
        return return_val
    finally:
        # Release the connection on every exit path, including unexpected errors.
        db.close()
@router.get('/{player_id}')
async def get_one_player(player_id, csv: Optional[bool] = False):
    """Fetch a single player by id.

    Returns:
        A ``text/csv`` response (header row plus the player's row) when
        ``csv`` is truthy; otherwise the player as a dict with an added
        ``'paperdex'`` entry holding its Paperdex rows.

    Raises:
        HTTPException: 404 when the lookup by ``player_id`` fails.
    """
    try:
        try:
            this_player = Player.get_by_id(player_id)
        except Exception:
            raise HTTPException(status_code=404, detail=f'No player found with id {player_id}')

        if csv:
            data_list = [
                [
                    'id', 'name', 'cost', 'image', 'image2', 'mlbclub', 'franchise', 'cardset', 'rarity', 'pos_1',
                    'pos_2', 'pos_3', 'pos_4', 'pos_5', 'pos_6', 'pos_7', 'pos_8', 'headshot', 'vanity_card',
                    'strat_code', 'bbref_id', 'description'
                ],
                # The key attribute is player_id everywhere else in this module
                # (e.g. Player.player_id in the POST handler), so use it here too.
                [
                    this_player.player_id, this_player.p_name, this_player.cost, this_player.image,
                    this_player.image2, this_player.mlbclub, this_player.franchise, this_player.cardset.name,
                    this_player.rarity.name, this_player.pos_1, this_player.pos_2, this_player.pos_3,
                    this_player.pos_4, this_player.pos_5, this_player.pos_6, this_player.pos_7, this_player.pos_8,
                    this_player.headshot, this_player.vanity_card, this_player.strat_code, this_player.bbref_id,
                    this_player.description
                ]
            ]
            # Serialise only after the data row is in place; converting first
            # would yield a CSV containing nothing but the header.
            return Response(
                content=DataFrame(data_list).to_csv(header=False, index=False),
                media_type='text/csv'
            )

        return_val = model_to_dict(this_player)
        this_dex = Paperdex.select().where(Paperdex.player == this_player)
        return_val['paperdex'] = {
            'count': this_dex.count(),
            'paperdex': [model_to_dict(entry, recurse=False) for entry in this_dex]
        }
        return return_val
    finally:
        # Release the connection on every exit path, including unexpected errors.
        db.close()
@router.get('/{player_id}/batting-card')
async def get_player_card(
        request: Request, player_id: int, variant: int = 0, d: Optional[str] = None, html: Optional[bool] = False):
    """Serve a player's batting card as a PNG (or as HTML when ``html`` is set).

    A previously rendered image at ``storage/cards/{player_id}-{d}-v{variant}.png``
    is returned directly unless HTML was requested.  Otherwise the card
    template is rendered and, for image requests, screenshotted with
    Html2Image into that same file name.

    Returns:
        A PNG ``FileResponse``, or the rendered template response when
        ``html`` is truthy.

    Raises:
        HTTPException: 404 when the player, the batting card for ``variant``,
            or either of its vs-L / vs-R ratings rows cannot be found.
    """
    # NOTE(review): `d` comes straight from the query string and is interpolated
    # into a file name; consider restricting it to a safe character set.
    card_filename = f'{player_id}-{d}-v{variant}.png'
    card_path = f'storage/cards/{card_filename}'

    try:
        # Reuse the cached render when one exists, unless the caller wants HTML.
        if os.path.isfile(card_path) and html is False:
            return FileResponse(path=card_path, media_type='image/png')

        try:
            this_player = Player.get_by_id(player_id)
        except Exception:
            raise HTTPException(status_code=404, detail=f'No player found with id {player_id}')

        this_bc = BattingCard.get_or_none(BattingCard.player == this_player, BattingCard.variant == variant)
        if this_bc is None:
            raise HTTPException(
                status_code=404, detail=f'Batting card not found for id {player_id}, variant {variant}')

        # The ratings are currently only checked for existence; the template
        # context below still carries placeholder result strings.
        rating_vl = BattingCardRatings.get_or_none(
            BattingCardRatings.battingcard == this_bc, BattingCardRatings.vs_hand == 'L')
        rating_vr = BattingCardRatings.get_or_none(
            BattingCardRatings.battingcard == this_bc, BattingCardRatings.vs_hand == 'R')
        if rating_vl is None or rating_vr is None:
            raise HTTPException(status_code=404, detail=f'Ratings not found for batting card {this_bc.id}')

        card_data = {
            'player': this_player,
            'card_type': 'batter',
            'results_vl_one': 'Big Dongs',
            'results_vl_two': 'Lesser Dongs',
            'results_vl_three': 'Sad Dongs',
            'results_vr_one': 'Light Dongs',
            'results_vr_two': 'Hefty Dongs',
            'results_vr_three': 'Obese Dongs',
            'request': request
        }
        html_response = templates.TemplateResponse("player_card.html", card_data)
        if html:
            return html_response

        logging.debug(f'body:\n{html_response.body.decode("UTF-8")}')
        # The renderer is only needed for image output, so build it after the
        # HTML early return above.
        hti = Html2Image(
            browser='chromium',
            size=(1200, 600),
            output_path='storage/cards',
            custom_flags=['--no-sandbox', '--disable-remote-debugging', '--headless', '--disable-gpu',
                          '--disable-software-rasterizer', '--disable-dev-shm-usage']
        )
        saved_paths = hti.screenshot(
            html_str=str(html_response.body.decode("UTF-8")),
            save_as=card_filename
        )
        return FileResponse(path=saved_paths[0], media_type='image/png')
    finally:
        # Close the connection on every exit path; the two "not found" branches
        # for the card and its ratings previously skipped this.
        db.close()
@router.patch('/{player_id}')
async def v1_players_patch(
        player_id, name: Optional[str] = None, image: Optional[str] = None, image2: Optional[str] = None,
        mlbclub: Optional[str] = None, franchise: Optional[str] = None, cardset_id: Optional[int] = None,
        rarity_id: Optional[int] = None, pos_1: Optional[str] = None, pos_2: Optional[str] = None,
        pos_3: Optional[str] = None, pos_4: Optional[str] = None, pos_5: Optional[str] = None,
        pos_6: Optional[str] = None, pos_7: Optional[str] = None, pos_8: Optional[str] = None,
        headshot: Optional[str] = None, vanity_card: Optional[str] = None, strat_code: Optional[str] = None,
        bbref_id: Optional[str] = None, description: Optional[str] = None, cost: Optional[int] = None,
        fangr_id: Optional[str] = None, token: str = Depends(oauth2_scheme)):
    """Partially update one player; only the parameters supplied are changed.

    Sentinel strings clear a field: ``image2`` equal to 'false' (any case)
    and any ``pos_N`` equal to exactly 'False' set that field to None.
    ``cardset_id`` and ``rarity_id`` are resolved to their records first.

    Returns:
        The updated player as a dict.

    Raises:
        HTTPException: 401 for an invalid token; 404 when the player, cardset
            or rarity is not found; 418 when the save does not report exactly
            one modified row.
    """
    try:
        if not valid_token(token):
            logging.warning(f'Bad Token: {token}')
            raise HTTPException(
                status_code=401,
                detail='You are not authorized to patch players. This event has been logged.'
            )

        try:
            this_player = Player.get_by_id(player_id)
        except Exception:
            raise HTTPException(status_code=404, detail=f'No player found with id {player_id}')

        # Fields copied verbatim whenever a value was provided.
        simple_updates = {
            'cost': cost, 'p_name': name, 'image': image, 'mlbclub': mlbclub, 'franchise': franchise,
            'headshot': headshot, 'vanity_card': vanity_card, 'strat_code': strat_code,
            'bbref_id': bbref_id, 'fangr_id': fangr_id, 'description': description
        }
        for field_name, new_value in simple_updates.items():
            if new_value is not None:
                setattr(this_player, field_name, new_value)

        if image2 is not None:
            this_player.image2 = None if image2.lower() == 'false' else image2

        if cardset_id is not None:
            try:
                this_player.cardset = Cardset.get_by_id(cardset_id)
            except Exception:
                raise HTTPException(status_code=404, detail=f'No cardset found with id {cardset_id}')
        if rarity_id is not None:
            try:
                this_player.rarity = Rarity.get_by_id(rarity_id)
            except Exception:
                raise HTTPException(status_code=404, detail=f'No rarity found with id {rarity_id}')

        # All eight position slots share the same rule: 'False' clears the slot.
        position_updates = {
            'pos_1': pos_1, 'pos_2': pos_2, 'pos_3': pos_3, 'pos_4': pos_4,
            'pos_5': pos_5, 'pos_6': pos_6, 'pos_7': pos_7, 'pos_8': pos_8
        }
        for field_name, new_value in position_updates.items():
            if new_value is not None:
                setattr(this_player, field_name, None if new_value == 'False' else new_value)

        if this_player.save() != 1:
            raise HTTPException(
                status_code=418,
                detail='Well slap my ass and call me a teapot; I could not save that player'
            )
        return model_to_dict(this_player)
    finally:
        # Release the connection on every exit path, including the failed-save one.
        db.close()
@router.put('')
async def put_players(players: PlayerModel, token: str = Depends(oauth2_scheme)):
    """Bulk insert-or-replace players from the request body.

    Each submitted player is turned into a row dict (``mlbclub`` and
    ``franchise`` title-cased) and written in batches of 15 inside a single
    atomic block using insert-with-conflict-replace.

    Raises:
        HTTPException: 401 for an invalid token.  On success a status-200
            HTTPException carrying the number of players processed is raised,
            which is how this endpoint reports its result.
    """
    if not valid_token(token):
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to post players. This event has been logged.'
        )

    # Column order is kept stable because the row dicts are written to the log.
    copied_fields = (
        'player_id', 'p_name', 'cost', 'image', 'image2', 'mlbclub', 'franchise', 'cardset_id', 'rarity_id',
        'set_num', 'pos_1', 'pos_2', 'pos_3', 'pos_4', 'pos_5', 'pos_6', 'pos_7', 'pos_8', 'headshot',
        'vanity_card', 'strat_code', 'fangr_id', 'bbref_id', 'description'
    )
    new_players = []
    for entry in players.players:
        row = {field_name: getattr(entry, field_name) for field_name in copied_fields}
        row['mlbclub'] = entry.mlbclub.title()
        row['franchise'] = entry.franchise.title()
        new_players.append(row)
    logging.info(f'new_players: {new_players}')

    with db.atomic():
        for batch in chunked(new_players, 15):
            logging.info(f'batch: {batch}')
            Player.insert_many(batch).on_conflict_replace().execute()
    db.close()

    raise HTTPException(status_code=200, detail=f'{len(new_players)} players have been added')
@router.post('')
async def post_players(new_player: PlayerPydantic, token: str = Depends(oauth2_scheme)):
    """Create a single player, assigning it the next free ``player_id``.

    A player is rejected as a duplicate when an existing row has the same
    ``bbref_id`` and ``cardset_id``.  The new id is one more than the highest
    existing ``player_id``, or 1 when the table is empty; any ``player_id``
    supplied in the body is overwritten.

    Returns:
        The stored player as a dict.

    Raises:
        HTTPException: 401 for an invalid token; 400 for a duplicate.
    """
    try:
        if not valid_token(token):
            logging.warning(f'Bad Token: {token}')
            raise HTTPException(
                status_code=401,
                detail='You are not authorized to post players. This event has been logged.'
            )

        # NOTE(review): when bbref_id is omitted this compares against None --
        # confirm that treating such players as duplicates is intended.
        dupe_query = Player.select().where(
            (Player.bbref_id == new_player.bbref_id) & (Player.cardset_id == new_player.cardset_id)
        )
        if dupe_query.count() != 0:
            raise HTTPException(
                status_code=400,
                detail=f'This appears to be a duplicate with player {dupe_query[0].player_id}'
            )

        # Materialise the "highest id" row so an empty table yields an empty
        # list rather than an IndexError on [0].
        newest = list(Player.select(Player.player_id).order_by(-Player.player_id).limit(1))
        new_player.player_id = newest[0].player_id + 1 if newest else 1

        p_id = Player.insert(new_player.dict()).execute()
        return model_to_dict(Player.get_by_id(p_id))
    finally:
        # Release the connection on every exit path, including unexpected errors.
        db.close()
@router.delete('/{player_id}')
async def delete_player(player_id, token: str = Depends(oauth2_scheme)):
    """Delete one player by id.

    The outcome is always reported by raising an HTTPException: 401 for an
    invalid token, 404 when the lookup fails, 200 when exactly one row was
    deleted and 500 otherwise.
    """
    authorized = valid_token(token)
    if not authorized:
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to delete players. This event has been logged.'
        )

    try:
        target = Player.get_by_id(player_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No player found with id {player_id}')

    rows_removed = target.delete_instance()
    db.close()

    # Anything other than exactly one removed row is reported as a failure.
    if rows_removed != 1:
        raise HTTPException(status_code=500, detail=f'Player {player_id} was not deleted')
    raise HTTPException(status_code=200, detail=f'Player {player_id} has been deleted')