import os.path import base64 from fastapi import APIRouter, Depends, HTTPException, Request, Response, Query from fastapi.responses import FileResponse from fastapi.templating import Jinja2Templates from html2image import Html2Image from typing import Optional, List, Literal import logging import pydantic from pandas import DataFrame from ..card_creation import get_batter_card_data, get_pitcher_card_data from ..db_engine import db, Player, model_to_dict, fn, chunked, Paperdex, Cardset, Rarity, BattingCard, \ BattingCardRatings, PitchingCard, PitchingCardRatings, CardPosition from ..dependencies import oauth2_scheme, valid_token, LOG_DATA logging.basicConfig( filename=LOG_DATA['filename'], format=LOG_DATA['format'], level=LOG_DATA['log_level'] ) router = APIRouter( prefix='/api/v2/players', tags=['players'] ) templates = Jinja2Templates(directory="storage/templates") class PlayerPydantic(pydantic.BaseModel): player_id: int = None p_name: str cost: int image: str image2: Optional[str] = None mlbclub: str franchise: str cardset_id: int set_num: int rarity_id: int pos_1: str pos_2: Optional[str] = None pos_3: Optional[str] = None pos_4: Optional[str] = None pos_5: Optional[str] = None pos_6: Optional[str] = None pos_7: Optional[str] = None pos_8: Optional[str] = None headshot: Optional[str] = None vanity_card: Optional[str] = None strat_code: Optional[str] = None bbref_id: Optional[str] = None fangr_id: Optional[str] = None description: str quantity: Optional[int] = 999 class PlayerModel(pydantic.BaseModel): players: List[PlayerPydantic] @router.get('') async def get_players( name: Optional[str] = None, value: Optional[int] = None, min_cost: Optional[int] = None, max_cost: Optional[int] = None, has_image2: Optional[bool] = None, mlbclub: Optional[str] = None, franchise: Optional[str] = None, cardset_id: list = Query(default=None), rarity_id: list = Query(default=None), pos_include: list = Query(default=None), pos_exclude: list = Query(default=None), has_headshot: Optional[bool] = None, has_vanity_card: Optional[bool] = None, strat_code: Optional[str] = None, bbref_id: Optional[str] = None, fangr_id: Optional[str] = None, inc_dex: Optional[bool] = True, in_desc: Optional[str] = None, flat: Optional[bool] = False, sort_by: Optional[str] = False, cardset_id_exclude: list = Query(default=None), limit: Optional[int] = None, csv: Optional[bool] = None, short_output: Optional[bool] = False): all_players = Player.select() if all_players.count() == 0: db.close() raise HTTPException(status_code=404, detail=f'There are no players to filter') if name is not None: all_players = all_players.where(fn.Lower(Player.p_name) == name.lower()) if value is not None: all_players = all_players.where(Player.cost == value) if min_cost is not None: all_players = all_players.where(Player.cost >= min_cost) if max_cost is not None: all_players = all_players.where(Player.cost <= max_cost) if has_image2 is not None: all_players = all_players.where(Player.image2.is_null(not has_image2)) if mlbclub is not None: all_players = all_players.where(fn.Lower(Player.mlbclub) == mlbclub.lower()) if franchise is not None: all_players = all_players.where(fn.Lower(Player.franchise) == franchise.lower()) if cardset_id is not None: all_players = all_players.where(Player.cardset_id << cardset_id) if cardset_id_exclude is not None: all_players = all_players.where(Player.cardset_id.not_in(cardset_id_exclude)) if rarity_id is not None: all_players = all_players.where(Player.rarity_id << rarity_id) if pos_include is not None: p_list = [x.upper() for x in pos_include] all_players = all_players.where( (Player.pos_1 << p_list) | (Player.pos_2 << p_list) | (Player.pos_3 << p_list) | (Player.pos_4 << p_list) | (Player.pos_5 << p_list) | (Player.pos_6 << p_list) | (Player.pos_7 << p_list) | (Player.pos_8 << p_list) ) if has_headshot is not None: all_players = all_players.where(Player.headshot.is_null(not has_headshot)) if has_vanity_card is not None: all_players = all_players.where(Player.vanity_card.is_null(not has_vanity_card)) if strat_code is not None: all_players = all_players.where(Player.strat_code == strat_code) if bbref_id is not None: all_players = all_players.where(Player.bbref_id == bbref_id) if fangr_id is not None: all_players = all_players.where(Player.fangr_id == fangr_id) if in_desc is not None: all_players = all_players.where(fn.Lower(Player.description).contains(in_desc.lower())) if sort_by is not None: if sort_by == 'cost-desc': all_players = all_players.order_by(-Player.cost) elif sort_by == 'cost-asc': all_players = all_players.order_by(Player.cost) elif sort_by == 'name-asc': all_players = all_players.order_by(Player.p_name) elif sort_by == 'name-desc': all_players = all_players.order_by(-Player.p_name) elif sort_by == 'rarity-desc': all_players = all_players.order_by(Player.rarity) elif sort_by == 'rarity-asc': all_players = all_players.order_by(-Player.rarity) final_players = [] # logging.info(f'pos_exclude: {type(pos_exclude)} - {pos_exclude} - is None: {pos_exclude is None}') for x in all_players: if pos_exclude is not None and set([x.upper() for x in pos_exclude]).intersection(x.get_all_pos()): pass else: final_players.append(x) if limit is not None and len(final_players) >= limit: break # if len(final_players) == 0: # db.close() # raise HTTPException(status_code=404, detail=f'No players found') if csv: all_players.order_by(-Player.rarity.value, Player.p_name) data_list = [['id', 'name', 'value', 'image', 'image2', 'mlbclub', 'franchise', 'cardset', 'rarity', 'pos_1', 'pos_2', 'pos_3', 'pos_4', 'pos_5', 'pos_6', 'pos_7', 'pos_8', 'headshot', 'vanity_card', 'strat_code', 'bbref_id', 'description', 'for_purchase', 'ranked_legal']] for line in final_players: data_list.append( [ line.player_id, line.p_name, line.cost, line.image, line.image2, line.mlbclub, line.franchise, line.cardset, line.rarity, line.pos_1, line.pos_2, line.pos_3, line.pos_4, line.pos_5, line.pos_6, line.pos_7, line.pos_8, line.headshot, line.vanity_card, line.strat_code, line.bbref_id, line.description, line.cardset.for_purchase, line.cardset.ranked_legal # line.description, line.cardset.in_packs, line.quantity ] ) return_val = DataFrame(data_list).to_csv(header=False, index=False) db.close() return Response(content=return_val, media_type='text/csv') else: return_val = {'count': len(final_players), 'players': []} for x in final_players: this_record = model_to_dict(x, recurse=not (flat or short_output)) if inc_dex: this_dex = Paperdex.select().where(Paperdex.player == x) this_record['paperdex'] = {'count': this_dex.count(), 'paperdex': []} for y in this_dex: this_record['paperdex']['paperdex'].append(model_to_dict(y, recurse=False)) return_val['players'].append(this_record) # return_val['players'].append(model_to_dict(x, recurse=not flat)) db.close() return return_val @router.get('/random') async def get_random_player( min_cost: Optional[int] = None, max_cost: Optional[int] = None, in_packs: Optional[bool] = None, min_rarity: Optional[int] = None, max_rarity: Optional[int] = None, limit: Optional[int] = None, pos_include: Optional[str] = None, pos_exclude: Optional[str] = None, franchise: Optional[str] = None, mlbclub: Optional[str] = None, cardset_id: list = Query(default=None), pos_inc: list = Query(default=None), pos_exc: list = Query(default=None), csv: Optional[bool] = None): all_players = (Player .select() .join(Cardset) .switch(Player) .join(Rarity) .order_by(fn.Random())) if min_cost is not None: all_players = all_players.where(Player.cost >= min_cost) if max_cost is not None: all_players = all_players.where(Player.cost <= max_cost) if in_packs is not None: if in_packs: all_players = all_players.where(Player.cardset.in_packs) if min_rarity is not None: all_players = all_players.where(Player.rarity.value >= min_rarity) if max_rarity is not None: all_players = all_players.where(Player.rarity.value <= max_rarity) if pos_include is not None: all_players = all_players.where( (fn.lower(Player.pos_1) == pos_include.lower()) | (fn.lower(Player.pos_2) == pos_include.lower()) | (fn.lower(Player.pos_3) == pos_include.lower()) | (fn.lower(Player.pos_4) == pos_include.lower()) | (fn.lower(Player.pos_5) == pos_include.lower()) | (fn.lower(Player.pos_6) == pos_include.lower()) | (fn.lower(Player.pos_7) == pos_include.lower()) | (fn.lower(Player.pos_8) == pos_include.lower()) ) if franchise is not None: all_players = all_players.where(fn.Lower(Player.franchise) == franchise.lower()) if mlbclub is not None: all_players = all_players.where(fn.Lower(Player.mlbclub) == mlbclub.lower()) if cardset_id is not None: all_players = all_players.where(Player.cardset_id << cardset_id) if pos_inc is not None: p_list = [x.upper() for x in pos_inc] all_players = all_players.where( (Player.pos_1 << p_list) | (Player.pos_2 << p_list) | (Player.pos_3 << p_list) | (Player.pos_4 << p_list) | (Player.pos_5 << p_list) | (Player.pos_6 << p_list) | (Player.pos_7 << p_list) | (Player.pos_8 << p_list) ) # if pos_exc is not None: # p_list = [x.upper() for x in pos_exc] # logging.info(f'starting query: {all_players}\n\np_list: {p_list}\n\n') # all_players = all_players.where( # Player.pos_1.not_in(p_list) & Player.pos_2.not_in(p_list) & Player.pos_3.not_in(p_list) & # Player.pos_4.not_in(p_list) & Player.pos_5.not_in(p_list) & Player.pos_6.not_in(p_list) & # Player.pos_7.not_in(p_list) & Player.pos_8.not_in(p_list) # ) # logging.info(f'post pos query: {all_players}') if pos_exclude is not None and pos_exc is None: final_players = [x for x in all_players if pos_exclude not in x.get_all_pos()] elif pos_exc is not None and pos_exclude is None: final_players = [] p_list = [x.upper() for x in pos_exc] for x in all_players: if limit is not None and len(final_players) >= limit: break if not set(p_list).intersection(x.get_all_pos()): final_players.append(x) else: final_players = all_players if limit is not None: final_players = final_players[:limit] # if len(final_players) == 0: # db.close() # raise HTTPException(status_code=404, detail=f'No players found') if csv: data_list = [['id', 'name', 'cost', 'image', 'image2', 'mlbclub', 'franchise', 'cardset', 'rarity', 'pos_1', 'pos_2', 'pos_3', 'pos_4', 'pos_5', 'pos_6', 'pos_7', 'pos_8', 'headshot', 'vanity_card', 'strat_code', 'bbref_id', 'description']] for line in final_players: data_list.append( [ line.id, line.p_name, line.cost, line.image, line.image2, line.mlbclub, line.franchise, line.cardset.name, line.rarity.name, line.pos_1, line.pos_2, line.pos_3, line.pos_4, line.pos_5, line.pos_6, line.pos_7, line.pos_8, line.headshot, line.vanity_card, line.strat_code, line.bbref_id, line.description ] ) return_val = DataFrame(data_list).to_csv(header=False, index=False) db.close() return Response(content=return_val, media_type='text/csv') else: return_val = {'count': len(final_players), 'players': []} for x in final_players: this_record = model_to_dict(x) this_dex = Paperdex.select().where(Paperdex.player == x) this_record['paperdex'] = {'count': this_dex.count(), 'paperdex': []} for y in this_dex: this_record['paperdex']['paperdex'].append(model_to_dict(y, recurse=False)) return_val['players'].append(this_record) # return_val['players'].append(model_to_dict(x)) db.close() return return_val @router.get('/{player_id}') async def get_one_player(player_id, csv: Optional[bool] = False): try: this_player = Player.get_by_id(player_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No player found with id {player_id}') if csv: data_list = [['id', 'name', 'cost', 'image', 'image2', 'mlbclub', 'franchise', 'cardset', 'rarity', 'pos_1', 'pos_2', 'pos_3', 'pos_4', 'pos_5', 'pos_6', 'pos_7', 'pos_8', 'headshot', 'vanity_card', 'strat_code', 'bbref_id', 'description']] return_val = DataFrame(data_list).to_csv(header=False, index=False) data_list.append( [ this_player.id, this_player.p_name, this_player.cost, this_player.image, this_player.image2, this_player.mlbclub, this_player.franchise, this_player.cardset.name, this_player.rarity.name, this_player.pos_1, this_player.pos_2, this_player.pos_3, this_player.pos_4, this_player.pos_5, this_player.pos_6, this_player.pos_7, this_player.pos_8, this_player.headshot, this_player.vanity_card, this_player.strat_code, this_player.bbref_id, this_player.description ] ) db.close() return Response(content=return_val, media_type='text/csv') else: return_val = model_to_dict(this_player) this_dex = Paperdex.select().where(Paperdex.player == this_player) return_val['paperdex'] = {'count': this_dex.count(), 'paperdex': []} for x in this_dex: return_val['paperdex']['paperdex'].append(model_to_dict(x, recurse=False)) db.close() return return_val @router.get('/{player_id}/{card_type}card') async def get_batter_card( request: Request, player_id: int, card_type: Literal['batting', 'pitching'], variant: int = 0, d: str = None, html: Optional[bool] = False): try: this_player = Player.get_by_id(player_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No player found with id {player_id}') if os.path.isfile(f'storage/cards/cardset-{this_player.cardset.id}/{player_id}-{d}-v{variant}.png') and html is False: db.close() return FileResponse( path=f'storage/cards/cardset-{this_player.cardset.id}/{player_id}-{d}-v{variant}.png', media_type='image/png' ) all_pos = CardPosition.select().where(CardPosition.player == this_player).order_by(CardPosition.innings.desc()) if card_type == 'batting': this_bc = BattingCard.get_or_none(BattingCard.player == this_player, BattingCard.variant == variant) if this_bc is None: raise HTTPException(status_code=404, detail=f'Batting card not found for id {player_id}, variant {variant}') rating_vl = BattingCardRatings.get_or_none( BattingCardRatings.battingcard == this_bc, BattingCardRatings.vs_hand == 'L') rating_vr = BattingCardRatings.get_or_none( BattingCardRatings.battingcard == this_bc, BattingCardRatings.vs_hand == 'R') if None in [rating_vr, rating_vl]: raise HTTPException(status_code=404, detail=f'Ratings not found for batting card {this_bc.id}') card_data = get_batter_card_data(this_player, this_bc, rating_vl, rating_vr, all_pos) card_data['request'] = request html_response = templates.TemplateResponse("player_card.html", card_data) else: this_pc = PitchingCard.get_or_none(PitchingCard.player == this_player, PitchingCard.variant == variant) if this_pc is None: raise HTTPException( status_code=404, detail=f'Pitching card not found for id {player_id}, variant {variant}') rating_vl = PitchingCardRatings.get_or_none( PitchingCardRatings.pitchingcard == this_pc, PitchingCardRatings.vs_hand == 'L') rating_vr = PitchingCardRatings.get_or_none( PitchingCardRatings.pitchingcard == this_pc, PitchingCardRatings.vs_hand == 'R') if None in [rating_vr, rating_vl]: raise HTTPException(status_code=404, detail=f'Ratings not found for pitching card {this_pc.id}') card_data = get_pitcher_card_data(this_player, this_pc, rating_vl, rating_vr, all_pos) card_data['request'] = request html_response = templates.TemplateResponse("player_card.html", card_data) if html: db.close() return html_response updates = 0 if card_type == 'batting': updates += BattingCardRatings.update(card_data['new_ratings_vl'].dict()).where( (BattingCardRatings.id == rating_vl.id) ).execute() updates += BattingCardRatings.update(card_data['new_ratings_vr'].dict()).where( (BattingCardRatings.id == rating_vr.id) ).execute() else: updates += PitchingCardRatings.update(card_data['new_ratings_vl'].dict()).where( (PitchingCardRatings.id == rating_vl.id) ).execute() updates += PitchingCardRatings.update(card_data['new_ratings_vr'].dict()).where( (PitchingCardRatings.id == rating_vr.id) ).execute() logging.info(f'Rating updates: {updates}') hti = Html2Image( browser='chromium', size=(1200, 600), output_path=f'storage/cards/cardset-{this_player.cardset.id}/', custom_flags=['--no-sandbox', '--disable-remote-debugging', '--headless', '--disable-gpu', '--disable-software-rasterizer', '--disable-dev-shm-usage'] ) logging.debug(f'body:\n{html_response.body.decode("UTF-8")}') x = hti.screenshot( html_str=str(html_response.body.decode("UTF-8")), save_as=f'{player_id}-{d}-v{variant}.png' ) db.close() return FileResponse(path=x[0], media_type='image/png') # @router.get('/{player_id}/pitchingcard') # async def get_pitcher_card( # request: Request, player_id: int, variant: int = 0, d: str = None, html: Optional[bool] = False) @router.patch('/{player_id}') async def v1_players_patch( player_id, name: Optional[str] = None, image: Optional[str] = None, image2: Optional[str] = None, mlbclub: Optional[str] = None, franchise: Optional[str] = None, cardset_id: Optional[int] = None, rarity_id: Optional[int] = None, pos_1: Optional[str] = None, pos_2: Optional[str] = None, pos_3: Optional[str] = None, pos_4: Optional[str] = None, pos_5: Optional[str] = None, pos_6: Optional[str] = None, pos_7: Optional[str] = None, pos_8: Optional[str] = None, headshot: Optional[str] = None, vanity_card: Optional[str] = None, strat_code: Optional[str] = None, bbref_id: Optional[str] = None, description: Optional[str] = None, cost: Optional[int] = None, fangr_id: Optional[str] = None, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to patch players. This event has been logged.' ) try: this_player = Player.get_by_id(player_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No player found with id {player_id}') if cost is not None: this_player.cost = cost if name is not None: this_player.p_name = name if image is not None: this_player.image = image if image2 is not None: if image2.lower() == 'false': this_player.image2 = None else: this_player.image2 = image2 if mlbclub is not None: this_player.mlbclub = mlbclub if franchise is not None: this_player.franchise = franchise if cardset_id is not None: try: this_cardset = Cardset.get_by_id(cardset_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No cardset found with id {cardset_id}') this_player.cardset = this_cardset if rarity_id is not None: try: this_rarity = Rarity.get_by_id(rarity_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No rarity found with id {rarity_id}') this_player.rarity = this_rarity if pos_1 is not None: if pos_1 in ['None', 'False', '']: this_player.pos_1 = None else: this_player.pos_1 = pos_1 if pos_2 is not None: if pos_2 in ['None', 'False', '']: this_player.pos_2 = None else: this_player.pos_2 = pos_2 if pos_3 is not None: if pos_3 in ['None', 'False', '']: this_player.pos_3 = None else: this_player.pos_3 = pos_3 if pos_4 is not None: if pos_4 in ['None', 'False', '']: this_player.pos_4 = None else: this_player.pos_4 = pos_4 if pos_5 is not None: if pos_5 in ['None', 'False', '']: this_player.pos_5 = None else: this_player.pos_5 = pos_5 if pos_6 is not None: if pos_6 in ['None', 'False', '']: this_player.pos_6 = None else: this_player.pos_6 = pos_6 if pos_7 is not None: if pos_7 in ['None', 'False', '']: this_player.pos_7 = None else: this_player.pos_7 = pos_7 if pos_8 is not None: if pos_8 in ['None', 'False', '']: this_player.pos_8 = None else: this_player.pos_8 = pos_8 if headshot is not None: this_player.headshot = headshot if vanity_card is not None: this_player.vanity_card = vanity_card if strat_code is not None: this_player.strat_code = strat_code if bbref_id is not None: this_player.bbref_id = bbref_id if fangr_id is not None: this_player.fangr_id = fangr_id if description is not None: this_player.description = description if this_player.save() == 1: return_val = model_to_dict(this_player) db.close() return return_val else: raise HTTPException( status_code=418, detail='Well slap my ass and call me a teapot; I could not save that rarity' ) @router.put('') async def put_players(players: PlayerModel, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to post players. This event has been logged.' ) new_players = [] for x in players.players: # this_player = Player( # player_id=x.player_id, # p_name=x.p_name, # cost=x.cost, # image=x.image, # image2=x.image2, # mlbclub=x.mlbclub, # franchise=x.franchise, # cardset_id=x.cardset_id, # rarity_id=x.rarity_id, # set_num=x.set_num, # pos_1=x.pos_1, # pos_2=x.pos_2, # pos_3=x.pos_3, # pos_4=x.pos_4, # pos_5=x.pos_5, # pos_6=x.pos_6, # pos_7=x.pos_7, # pos_8=x.pos_8, # headshot=x.headshot, # vanity_card=x.vanity_card, # strat_code=x.strat_code, # fangr_id=x.fangr_id, # bbref_id=x.bbref_id, # description=x.description # ) # new_players.append(this_player) new_players.append({ 'player_id': x.player_id, 'p_name': x.p_name, 'cost': x.cost, 'image': x.image, 'image2': x.image2, 'mlbclub': x.mlbclub.title(), 'franchise': x.franchise.title(), 'cardset_id': x.cardset_id, 'rarity_id': x.rarity_id, 'set_num': x.set_num, 'pos_1': x.pos_1, 'pos_2': x.pos_2, 'pos_3': x.pos_3, 'pos_4': x.pos_4, 'pos_5': x.pos_5, 'pos_6': x.pos_6, 'pos_7': x.pos_7, 'pos_8': x.pos_8, 'headshot': x.headshot, 'vanity_card': x.vanity_card, 'strat_code': x.strat_code, 'fangr_id': x.fangr_id, 'bbref_id': x.bbref_id, 'description': x.description }) logging.info(f'new_players: {new_players}') with db.atomic(): # Player.bulk_create(new_players, batch_size=15) for batch in chunked(new_players, 15): logging.info(f'batch: {batch}') Player.insert_many(batch).on_conflict_replace().execute() db.close() # sheets.update_all_players(SHEETS_AUTH) raise HTTPException(status_code=200, detail=f'{len(new_players)} players have been added') @router.post('') async def post_players(new_player: PlayerPydantic, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to post players. This event has been logged.' ) dupe_query = Player.select().where( (Player.bbref_id == new_player.bbref_id) & (Player.cardset_id == new_player.cardset_id) ) if dupe_query.count() != 0: db.close() raise HTTPException( status_code=400, detail=f'This appears to be a duplicate with player {dupe_query[0].player_id}' ) p_query = Player.select(Player.player_id).order_by(-Player.player_id).limit(1) new_id = p_query[0].player_id + 1 new_player.player_id = new_id p_id = Player.insert(new_player.dict()).execute() return_val = model_to_dict(Player.get_by_id(p_id)) db.close() return return_val @router.delete('/{player_id}') async def delete_player(player_id, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to delete players. This event has been logged.' ) try: this_player = Player.get_by_id(player_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No player found with id {player_id}') count = this_player.delete_instance() db.close() if count == 1: raise HTTPException(status_code=200, detail=f'Player {player_id} has been deleted') else: raise HTTPException(status_code=500, detail=f'Player {player_id} was not deleted')