from datetime import datetime import logging import os from db_engine import * from typing import Optional, List, Union from fastapi import FastAPI, HTTPException, Depends, Response, Query from fastapi.security import OAuth2PasswordBearer import pydantic import pygsheets import sheets from playhouse.shortcuts import model_to_dict from pandas import DataFrame raw_log_level = os.getenv('LOG_LEVEL') if raw_log_level == 'INFO': log_level = logging.INFO elif raw_log_level == 'WARN': log_level = logging.WARN else: log_level = logging.ERROR date = f'{datetime.now().year}-{datetime.now().month}-{datetime.now().day}' logging.basicConfig( filename=f'logs/{date}.log', format='%(asctime)s - %(levelname)s - %(message)s', level=log_level ) app = FastAPI() oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") DEFAULT_SEASON = 4 SHEETS_AUTH = pygsheets.authorize(service_file='storage/paper-dynasty-service-creds.json', retries=1) def valid_token(token): if token == os.environ.get('API_TOKEN'): return True else: return False def int_timestamp(datetime_obj: datetime) -> int: return int(datetime.timestamp(datetime_obj) * 1000) """ CURRENT ENDPOINTS """ class CurrentModel(pydantic.BaseModel): season: int week: int gsheet_template: str gsheet_version: str @app.get('/api/v1/current') async def v1_current_get(season: Optional[int] = None, csv: Optional[bool] = False): if season: current = Current.get_or_none(season=season) else: current = Current.latest() if csv: current_list = [ ['id', 'season', 'week'], [current.id, current.season, current.week] ] return_val = DataFrame(current_list).to_csv(header=False, index=False) db.close() return Response(content=return_val, media_type='text/csv') else: return_val = model_to_dict(current) db.close() return return_val @app.get('/api/v1/current/{current_id}') async def v1_current_get_one(current_id, csv: Optional[bool] = False): try: current = Current.get_by_id(current_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No current found with id {current_id}') if csv: current_list = [ ['id', 'season', 'week'], [current.id, current.season, current.week] ] return_val = DataFrame(current_list).to_csv(header=False, index=False) db.close() return Response(content=return_val, media_type='text/csv') else: return_val = model_to_dict(current) db.close() return return_val @app.post('/api/v1/current') async def v1_current_post(current: CurrentModel, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to post current. This event has been logged.' ) dupe_curr = Current.get_or_none(Current.season == current.season) if dupe_curr: db.close() raise HTTPException(status_code=400, detail=f'There is already a current for season {current.season}') this_curr = Current( season=current.season, week=current.week, gsheet_template=current.gsheet_template, gsheet_version=current.gsheet_version ) saved = this_curr.save() if saved == 1: return_val = model_to_dict(this_curr) db.close() return return_val else: raise HTTPException(status_code=418, detail='Well slap my ass and call me a teapot; I could not save that team') @app.patch('/api/v1/current/{current_id}') async def v1_current_patch( current_id: int, season: Optional[int] = None, week: Optional[int] = None, gsheet_template: Optional[str] = None, gsheet_version: Optional[str] = None, live_scoreboard: Optional[int] = None, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to patch current. This event has been logged.' ) try: current = Current.get_by_id(current_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No current found with id {current_id}') if season is not None: current.season = season if week is not None: current.week = week if gsheet_template is not None: current.gsheet_template = gsheet_template if gsheet_version is not None: current.gsheet_version = gsheet_version if live_scoreboard is not None: current.live_scoreboard = live_scoreboard if current.save() == 1: return_val = model_to_dict(current) db.close() return return_val else: raise HTTPException( status_code=418, detail='Well slap my ass and call me a teapot; I could not save that current' ) @app.delete('/api/v1/current/{current_id}') async def v1_current_delete(current_id, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to delete current. This event has been logged.' ) try: this_curr = Current.get_by_id(current_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No current found with id {current_id}') count = this_curr.delete_instance() db.close() if count == 1: raise HTTPException(status_code=200, detail=f'Current {current_id} has been deleted') else: raise HTTPException(status_code=500, detail=f'Current {current_id} was not deleted') """ TEAMS ENDPOINTS """ class TeamModel(pydantic.BaseModel): abbrev: str sname: str lname: str gmid: int gmname: str wallet: int = 0 gsheet: str team_value: int = 0 collection_value: int = 0 logo: Optional[str] = None color: Optional[str] = None season: int ps_shiny: Optional[int] = 0 ranking: Optional[int] = 1000 has_guide: Optional[bool] = False is_ai: Optional[bool] = False @app.get('/api/v1/teams') async def v1_teams_get( season: Optional[int] = None, gm_id: Optional[int] = None, abbrev: Optional[str] = None, tv_min: Optional[int] = None, tv_max: Optional[int] = None, cv_min: Optional[int] = None, cv_max: Optional[int] = None, ps_shiny_min: Optional[int] = None, ps_shiny_max: Optional[int] = None, ranking_min: Optional[int] = None, ranking_max: Optional[int] = None, has_guide: Optional[bool] = None, sname: Optional[str] = None, lname: Optional[str] = None, is_ai: Optional[bool] = None, limit: Optional[int] = None, csv: Optional[bool] = False): """ Param: season: int Param: team_abbrev: string Param: owner_id: int """ if season: all_teams = Team.select_season(season) else: all_teams = Team.select() # if all_teams.count() == 0: # db.close() # raise HTTPException(status_code=404, detail=f'There are no teams to filter') if gm_id is not None: all_teams = all_teams.where(Team.gmid == gm_id) if abbrev is not None: all_teams = all_teams.where(fn.Lower(Team.abbrev) == abbrev.lower()) if sname is not None: all_teams = all_teams.where(fn.Lower(Team.sname) == sname.lower()) if lname is not None: all_teams = all_teams.where(fn.Lower(Team.lname) == lname.lower()) if tv_min is not None: all_teams = all_teams.where(Team.team_value >= tv_min) if tv_max is not None: all_teams = all_teams.where(Team.team_value <= tv_max) if cv_min is not None: all_teams = all_teams.where(Team.collection_value >= cv_min) if cv_max is not None: all_teams = all_teams.where(Team.collection_value <= cv_max) if ps_shiny_min is not None: all_teams = all_teams.where(Team.career >= ps_shiny_min) if ps_shiny_max is not None: all_teams = all_teams.where(Team.career <= ps_shiny_max) if ranking_min is not None: all_teams = all_teams.where(Team.ranking >= ranking_min) if ranking_max is not None: all_teams = all_teams.where(Team.ranking <= ranking_max) if ranking_max is not None: all_teams = all_teams.where(Team.ranking <= ranking_max) if has_guide is not None: if not has_guide: all_teams = all_teams.where(Team.has_guide == 0) else: all_teams = all_teams.where(Team.has_guide == 1) if is_ai is not None: all_teams = all_teams.where(Team.is_ai) if limit is not None: all_teams = all_teams.limit(limit) # if all_teams.count() == 0: # db.close() # raise HTTPException(status_code=404, detail=f'No teams found') if csv: data_list = [[ 'id', 'abbrev', 'sname', 'lname', 'gmid', 'gmname', 'wallet', 'gsheet', 'team_value', 'collection_value', 'logo', 'color', 'season', 'ranking' ]] for line in all_teams: data_list.append( [ line.id, line.abbrev, line.sname, line.lname, line.gmid, line.gmname, line.wallet, line.gsheet, line.team_value, line.collection_value, line.logo, f'\'{line.color}', line.season, line.ranking ] ) return_val = DataFrame(data_list).to_csv(header=False, index=False) db.close() return Response(content=return_val, media_type='text/csv') else: return_teams = {'count': all_teams.count(), 'teams': []} for x in all_teams: return_teams['teams'].append(model_to_dict(x)) db.close() return return_teams @app.get('/api/v1/teams/{team_id}') async def v1_teams_get_one(team_id, csv: Optional[bool] = False): try: this_team = Team.get_by_id(team_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No team found with id {team_id}') if csv: team_packs = Pack.select().where((Pack.team == this_team) & (Pack.open_time.is_null(True))) data_list = [ ['id', 'abbrev', 'sname', 'lname', 'gmid', 'gmname', 'wallet', 'ranking', 'gsheet', 'sealed_packs', 'collection_value', 'logo', 'color', 'season'], [this_team.id, this_team.abbrev, this_team.sname, this_team.lname, this_team.gmid, this_team.gmname, this_team.wallet, this_team.ranking, this_team.gsheet, team_packs.count(), this_team.collection_value, this_team.logo, this_team.color, this_team.season] ] return_val = DataFrame(data_list).to_csv(header=False, index=False) db.close() return Response(content=return_val, media_type='text/csv') else: return_val = model_to_dict(this_team) db.close() return return_val @app.get('/api/v1/teams/{team_id}/buy/players') async def v1_team_cards_buy(team_id: int, ids: str, ts: int): try: this_team = Team.get_by_id(team_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No team found with id {team_id}') if ts != this_team.team_hash(): logging.warning(f'Bad Team Secret: {ts} ({this_team.team_hash()})') db.close() raise HTTPException( status_code=401, detail=f'You are not authorized to buy {this_team.abbrev} cards. This event has been logged.' ) last_card = Card.select(Card.id).order_by(-Card.id).limit(1) lc_id = last_card[0].id all_ids = ids.split(',') conf_message = '' total_cost = 0 for player_id in all_ids: if player_id != '': try: this_player = Player.get_by_id(player_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No player found with id {player_id} /// ' f'{conf_message} purchased') # check wallet balance if this_team.wallet < this_player.cost: logging.info(f'{this_player} was not purchased. {this_team.lname} only has {this_team.wallet}₼, but ' f'{this_player} costs {this_player.cost}₼.') db.close() raise HTTPException( 200, detail=f'{this_player} was not purchased. {this_team.lname} only has {this_team.wallet}₼, but ' f'{this_player} costs {this_player.cost}₼. /// {conf_message} purchased' ) # Create player card and update cost buy_price = this_player.cost total_cost += buy_price this_card = Card( player_id=this_player.player_id, team_id=this_team.id, value=buy_price ) Paperdex.get_or_create(team_id=team_id, player_id=this_player.player_id) this_card.save() this_player.change_on_buy() # Deduct card cost from team logging.info(f'{this_team.abbrev} starting wallet: {this_team.wallet}') this_team.wallet -= buy_price this_team.save() logging.info(f'{this_team.abbrev} ending wallet: {this_team.wallet}') # Post a notification if this_player.rarity.value >= 2: new_notif = Notification( created=int_timestamp(datetime.now()), title=f'Price Change', desc='Modified by buying and selling', field_name=f'{this_player.description}', message=f'From {buy_price}₼ 📈 to **{this_player.cost}**₼', about=f'Player-{this_player.player_id}' ) new_notif.save() conf_message += f'{buy_price}₼ for {this_player.rarity.name} {this_player.p_name} ' \ f'({this_player.cardset.name}), ' # sheets.post_new_cards(SHEETS_AUTH, lc_id) raise HTTPException(status_code=200, detail=f'{conf_message} purchased. /// Total Cost: {total_cost}₼ /// ' f'Final Wallet: {this_team.wallet}') @app.get('/api/v1/teams/{team_id}/buy/pack/{packtype_id}') async def v1_team_pack_buy(team_id: int, packtype_id: int, ts: int, quantity: Optional[int] = 1): try: this_packtype = PackType.get_by_id(packtype_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No pack type found with id {packtype_id}') try: this_team = Team.get_by_id(team_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No team found with id {team_id}') if ts != this_team.team_hash(): logging.warning(f'Bad Team Secret: {ts} ({this_team.team_hash()})') db.close() logging.warning(f'team: {this_team} / pack_type: {this_packtype} / secret: {ts} / ' f'actual: {this_team.team_hash()}') raise HTTPException( status_code=401, detail=f'You are not authorized to buy {this_team.abbrev} packs. This event has been logged.' ) # check wallet balance total_cost = this_packtype.cost * quantity if this_team.wallet < total_cost: db.close() raise HTTPException( 200, detail=f'{this_packtype} was not purchased. {this_team.lname} only has {this_team.wallet} bucks, but ' f'{this_packtype} costs {this_packtype.cost}.' ) all_packs = [] for i in range(quantity): all_packs.append(Pack(team_id=this_team.id, pack_type_id=this_packtype.id)) # Deduct card cost from team logging.info(f'{this_team.abbrev} starting wallet: {this_team.wallet}') this_team.wallet -= total_cost this_team.save() logging.info(f'{this_team.abbrev} ending wallet: {this_team.wallet}') with db.atomic(): Pack.bulk_create(all_packs, batch_size=15) db.close() raise HTTPException( status_code=200, detail=f'Quantity {quantity} {this_packtype.name} pack{"s" if quantity > 1 else ""} have been purchased by ' f'{this_team.lname} for {total_cost} bucks. You may close this window.' ) @app.get('/api/v1/teams/{team_id}/sell/cards') async def v1_team_cards_sell(team_id: int, ids: str, ts: int): try: this_team = Team.get_by_id(team_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No team found with id {team_id}') if ts != this_team.team_hash(): logging.warning(f'Bad Team Secret: {ts} ({this_team.team_hash()})') db.close() raise HTTPException( status_code=401, detail=f'You are not authorized to sell {this_team.abbrev} cards. This event has been logged.' ) all_ids = ids.split(',') del_ids = [] conf_message = '' total_cost = 0 for card_id in all_ids: if card_id != '': try: this_card = Card.get_by_id(card_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No card found with id {card_id}') del_ids.append(card_id) this_player = this_card.player if this_card.team != this_team: raise HTTPException(status_code=401, detail=f'Card id {card_id} ({this_player.p_name}) belongs to ' f'{this_card.team.abbrev} and cannot be sold. /// {conf_message} sold') orig_price = this_player.cost sell_price = round(this_player.cost * .5) total_cost += sell_price # credit selling team's wallet if this_team.wallet is None: this_team.wallet = sell_price else: this_team.wallet += sell_price this_team.save() # decrease price of player this_player.change_on_sell() this_card.delete_instance() # post a notification if this_player.rarity.value >= 2: new_notif = Notification( created=int_timestamp(datetime.now()), title=f'Price Change', desc='Modified by buying and selling', field_name=f'{this_player.description}', message=f'From {orig_price}₼ 📉 to **{this_player.cost}**₼', about=f'Player-{this_player.id}' ) new_notif.save() conf_message += f'{sell_price}₼ for {this_player.rarity.name} {this_player.p_name} ' \ f'({this_player.cardset.name}), ' # sheets.post_deletion(SHEETS_AUTH, del_ids) raise HTTPException(status_code=200, detail=f'{conf_message} sold. /// Total Earned: {total_cost}₼ /// ' f'Final Wallet: {this_team.wallet}') @app.get('/api/v1/teams/{team_id}/cards') async def v1_teams_cards_get(team_id, csv: Optional[bool] = True): """ CSV output specifically targeting team roster sheet Parameters ---------- team_id csv """ try: this_team = Team.get_by_id(team_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No team found with id {team_id}') if not csv: db.close() raise HTTPException( status_code=400, detail='The /teams/{team_id}/cards endpoint only supports csv output.' ) all_cards = (Card .select() .join(Player) .join(Rarity) .where(Card.team == this_team) .order_by(-Card.player.rarity.value, Card.player.p_name) ) if all_cards.count() == 0: db.close() raise HTTPException(status_code=404, detail=f'No cards found') data_list = [[ 'cardset', 'player', 'rarity', 'image', 'image2', 'pos_1', 'pos_2', 'pos_3', 'pos_4', 'pos_5', 'pos_6', 'pos_7', 'pos_8', 'cost', 'mlbclub', 'franchise', 'set_num', 'bbref_id', 'player_id', 'card_id' ]] for line in all_cards: data_list.append( [ line.player.cardset, line.player.p_name, line.player.rarity, line.player.image, line.player.image2, line.player.pos_1, line.player.pos_2, line.player.pos_3, line.player.pos_4, line.player.pos_5, line.player.pos_6, line.player.pos_7, line.player.pos_8, line.player.cost, line.player.mlbclub, line.player.franchise, line.player.set_num, line.player.bbref_id, line.player.player_id, line.id ] ) return_val = DataFrame(data_list).to_csv(header=False, index=False) db.close() return Response(content=return_val, media_type='text/csv') @app.post('/api/v1/teams') async def v1_teams_post(team: TeamModel, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to post teams. This event has been logged.' ) dupe_team = Team.get_or_none(Team.season == team.season, Team.abbrev == team.abbrev) if dupe_team: db.close() raise HTTPException(status_code=400, detail=f'There is already a season {team.season} team using {team.abbrev}') this_team = Team( abbrev=team.abbrev, sname=team.sname, lname=team.lname, gmid=team.gmid, gmname=team.gmname, wallet=team.wallet, gsheet=team.gsheet, team_value=team.team_value, collection_value=team.collection_value, logo=team.logo, color=team.color, ranking=team.ranking, season=team.season, career=team.ps_shiny, has_guide=team.has_guide, is_ai=team.is_ai ) saved = this_team.save() if saved == 1: return_team = model_to_dict(this_team) db.close() return return_team else: raise HTTPException(status_code=418, detail='Well slap my ass and call me a teapot; I could not save that team') @app.post('/api/v1/teams/{team_id}/money/{delta}') async def v1_teams_money_delta(team_id: int, delta: int, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to adjust wallets. This event has been logged.' ) try: this_team = Team.get_by_id(team_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No team found with id {team_id}') this_team.wallet += delta if this_team.save() == 1: return_team = model_to_dict(this_team) db.close() return return_team else: raise HTTPException(status_code=418, detail='Well slap my ass and call me a teapot; I could not save that team') @app.patch('/api/v1/teams/{team_id}') async def v1_teams_patch( team_id, sname: Optional[str] = None, lname: Optional[str] = None, gmid: Optional[int] = None, gmname: Optional[str] = None, gsheet: Optional[str] = None, team_value: Optional[int] = None, collection_value: Optional[int] = None, logo: Optional[str] = None, color: Optional[str] = None, season: Optional[int] = None, ps_shiny: Optional[int] = None, wallet_delta: Optional[int] = None, has_guide: Optional[bool] = None, is_ai: Optional[bool] = None, ranking: Optional[int] = None, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to delete teams. This event has been logged.' ) try: this_team = Team.get_by_id(team_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No team found with id {team_id}') if sname is not None: this_team.sname = sname if lname is not None: this_team.lname = lname if gmid is not None: this_team.gmid = gmid if gmname is not None: this_team.gmname = gmname if gsheet is not None: this_team.gsheet = gsheet if team_value is not None: this_team.team_value = team_value if collection_value is not None: this_team.collection_value = collection_value if logo is not None: this_team.logo = logo if color is not None: this_team.color = color if season is not None: this_team.season = season if ps_shiny is not None: this_team.career = ps_shiny if ranking is not None: this_team.ranking = ranking if wallet_delta is not None: this_team.wallet += wallet_delta if has_guide is not None: if has_guide: this_team.has_guide = 1 else: this_team.has_guide = 0 if is_ai is not None: if is_ai: this_team.is_ai = 1 else: this_team.is_ai = 0 if this_team.save() == 1: return_team = model_to_dict(this_team) db.close() return return_team else: raise HTTPException(status_code=418, detail='Well slap my ass and call me a teapot; I could not save that team') @app.delete('/api/v1/teams/{team_id}') async def v1_teams_delete(team_id, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to delete teams. This event has been logged.' ) try: this_team = Team.get_by_id(team_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No team found with id {team_id}') count = this_team.delete_instance() db.close() if count == 1: raise HTTPException(status_code=200, detail=f'Team {team_id} has been deleted') else: raise HTTPException(status_code=500, detail=f'Team {team_id} was not deleted') """ RARITY ENDPOINTS """ class RarityModel(pydantic.BaseModel): value: int name: str color: str @app.get('/api/v1/rarities') async def v1_rarities_get(value: Optional[int] = None, name: Optional[str] = None, min_value: Optional[int] = None, max_value: Optional[int] = None, csv: Optional[bool] = None): all_rarities = Rarity.select() if all_rarities.count() == 0: db.close() raise HTTPException(status_code=404, detail=f'There are no rarities to filter') if value is not None: all_rarities = all_rarities.where(Rarity.value == value) if name is not None: all_rarities = all_rarities.where(fn.Lower(Rarity.name) == name.lower()) if min_value is not None: all_rarities = all_rarities.where(Rarity.value >= min_value) if max_value is not None: all_rarities = all_rarities.where(Rarity.value <= max_value) if all_rarities.count() == 0: db.close() raise HTTPException(status_code=404, detail=f'No rarities found') if csv: data_list = [['id', 'value', 'name']] for line in all_rarities: data_list.append( [ line.id, line.value, line.name ] ) return_val = DataFrame(data_list).to_csv(header=False, index=False) db.close() return Response(content=return_val, media_type='text/csv') else: return_val = {'count': all_rarities.count(), 'rarities': []} for x in all_rarities: return_val['rarities'].append(model_to_dict(x)) db.close() return return_val @app.get('/api/v1/rarities/{rarity_id}') async def v1_rarities_get_one(rarity_id, csv: Optional[bool] = False): try: this_rarity = Rarity.get_by_id(rarity_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No rarity found with id {rarity_id}') if csv: data_list = [['id', 'value', 'name']] for line in this_rarity: data_list.append( [ line.id, line.value, line.name ] ) return_val = DataFrame(data_list).to_csv(header=False, index=False) db.close() return Response(content=return_val, media_type='text/csv') else: return_val = model_to_dict(this_rarity) db.close() return return_val @app.post('/api/v1/rarities') async def v1_rarities_post(rarity: RarityModel, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to post rarities. This event has been logged.' ) dupe_team = Rarity.get_or_none(Rarity.name) if dupe_team: db.close() raise HTTPException(status_code=400, detail=f'There is already a rarity using {rarity.name}') this_rarity = Rarity( value=rarity.value, name=rarity.name, color=rarity.color ) saved = this_rarity.save() if saved == 1: return_val = model_to_dict(this_rarity) db.close() return return_val else: raise HTTPException( status_code=418, detail='Well slap my ass and call me a teapot; I could not save that rarity' ) @app.patch('/api/v1/rarities/{rarity_id}') async def v1_rarities_patch( rarity_id, value: Optional[int] = None, name: Optional[str] = None, color: Optional[str] = None, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to patch rarities. This event has been logged.' ) try: this_rarity = Rarity.get_by_id(rarity_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No rarity found with id {rarity_id}') if value is not None: this_rarity.value = value if name is not None: this_rarity.name = name if color is not None: this_rarity.color = color if this_rarity.save() == 1: return_val = model_to_dict(this_rarity) db.close() return return_val else: raise HTTPException( status_code=418, detail='Well slap my ass and call me a teapot; I could not save that rarity' ) @app.delete('/api/v1/rarities/{rarity_id}') async def v1_rarities_delete(rarity_id, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to delete rarities. This event has been logged.' ) try: this_rarity = Rarity.get_by_id(rarity_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No rarity found with id {rarity_id}') count = this_rarity.delete_instance() db.close() if count == 1: raise HTTPException(status_code=200, detail=f'Rarity {rarity_id} has been deleted') else: raise HTTPException(status_code=500, detail=f'Rarity {rarity_id} was not deleted') """ CARDSET ENDPOINTS """ class CardsetModel(pydantic.BaseModel): name: str description: str event_id: Optional[int] = None in_packs: Optional[bool] = True total_cards: int = 0 for_purchase: Optional[bool] = True ranked_legal: Optional[bool] = True @app.get('/api/v1/cardsets') async def v1_cardsets_get( name: Optional[str] = None, in_desc: Optional[str] = None, event_id: Optional[int] = None, in_packs: Optional[bool] = None, ranked_legal: Optional[bool] = None, csv: Optional[bool] = None): all_cardsets = Cardset.select() if all_cardsets.count() == 0: db.close() raise HTTPException(status_code=404, detail=f'There are no cardsets to filter') if name is not None: all_cardsets = all_cardsets.where(fn.Lower(Cardset.name) == name.lower()) if in_desc is not None: all_cardsets = all_cardsets.where(fn.Lower(Cardset.description).contains(in_desc.lower())) if event_id is not None: try: this_event = Event.get_by_id(event_id) all_cardsets = all_cardsets.where(Cardset.event == this_event) except Exception as e: logging.error(f'Failed to find event {event_id}: {e}') raise HTTPException(status_code=404, detail=f'Event id {event_id} not found') if in_packs is not None: all_cardsets = all_cardsets.where(Cardset.in_packs == in_packs) if ranked_legal is not None: all_cardsets = all_cardsets.where(Cardset.ranked_legal == ranked_legal) if all_cardsets.count() == 0: db.close() raise HTTPException(status_code=404, detail=f'No cardsets found') if csv: data_list = [[ 'id', 'name', 'description', 'event_id', 'in_packs', 'for_purchase', 'total_cards', 'ranked_legal' ]] for line in all_cardsets: data_list.append( [ line.id, line.name, line.description, line.event.id if line.event else '', line.in_packs, line.for_purchase, line.total_cards, line.ranked_legal ] ) return_val = DataFrame(data_list).to_csv(header=False, index=False) db.close() return Response(content=return_val, media_type='text/csv') else: return_val = {'count': all_cardsets.count(), 'cardsets': []} for x in all_cardsets: return_val['cardsets'].append(model_to_dict(x)) db.close() return return_val @app.get('/api/v1/cardsets/{cardset_id}') async def v1_cardsets_get_one(cardset_id, csv: Optional[bool] = False): try: this_cardset = Cardset.get_by_id(cardset_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No cardset found with id {cardset_id}') if csv: data_list = [ ['id', 'name', 'description'], [this_cardset.id, this_cardset.name, this_cardset.description] ] return_val = DataFrame(data_list).to_csv(header=False, index=False) db.close() return Response(content=return_val, media_type='text/csv') else: return_val = model_to_dict(this_cardset) db.close() return return_val @app.post('/api/v1/cardsets') async def v1_cardsets_post(cardset: CardsetModel, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to post cardsets. This event has been logged.' ) dupe_set = Cardset.get_or_none(Cardset.name) if dupe_set: db.close() raise HTTPException(status_code=400, detail=f'There is already a cardset using {cardset.name}') this_cardset = Cardset(**cardset.__dict__) saved = this_cardset.save() if saved == 1: return_val = model_to_dict(this_cardset) db.close() return return_val else: raise HTTPException( status_code=418, detail='Well slap my ass and call me a teapot; I could not save that cardset' ) @app.patch('/api/v1/cardsets/{cardset_id}') async def v1_cardsets_patch( cardset_id, name: Optional[str] = None, description: Optional[str] = None, in_packs: Optional[bool] = None, for_purchase: Optional[bool] = None, total_cards: Optional[int] = None, ranked_legal: Optional[bool] = None, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to patch cardsets. This event has been logged.' ) try: this_cardset = Cardset.get_by_id(cardset_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No cardset found with id {cardset_id}') if name is not None: this_cardset.name = name if description is not None: this_cardset.description = description if in_packs is not None: this_cardset.in_packs = in_packs if for_purchase is not None: this_cardset.for_purchase = for_purchase if total_cards is not None: this_cardset.total_cards = total_cards if ranked_legal is not None: this_cardset.ranked_legal = ranked_legal if this_cardset.save() == 1: return_val = model_to_dict(this_cardset) db.close() return return_val else: raise HTTPException( status_code=418, detail='Well slap my ass and call me a teapot; I could not save that rarity' ) @app.delete('/api/v1/cardsets/{cardset_id}') async def v1_cardsets_delete(cardset_id, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to delete cardsets. This event has been logged.' ) try: this_cardset = Cardset.get_by_id(cardset_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No cardset found with id {cardset_id}') count = this_cardset.delete_instance() db.close() if count == 1: raise HTTPException(status_code=200, detail=f'Cardset {cardset_id} has been deleted') else: raise HTTPException(status_code=500, detail=f'Cardset {cardset_id} was not deleted') """ PLAYER ENDPOINTS """ class PlayerPydantic(pydantic.BaseModel): player_id: int p_name: str cost: int image: str image2: Optional[str] = None mlbclub: str franchise: str cardset_id: int set_num: int rarity_id: int pos_1: str pos_2: Optional[str] = None pos_3: Optional[str] = None pos_4: Optional[str] = None pos_5: Optional[str] = None pos_6: Optional[str] = None pos_7: Optional[str] = None pos_8: Optional[str] = None headshot: Optional[str] = None vanity_card: Optional[str] = None strat_code: Optional[str] = None bbref_id: Optional[str] = None fangr_id: Optional[str] = None description: str quantity: Optional[int] = 999 class PlayerModel(pydantic.BaseModel): players: List[PlayerPydantic] # NOT A TEMPLATE - BROKE MOLD FOR pos_exclude @app.get('/api/v1/players') async def v1_players_get( name: Optional[str] = None, value: Optional[int] = None, min_cost: Optional[int] = None, max_cost: Optional[int] = None, has_image2: Optional[bool] = None, mlbclub: Optional[str] = None, franchise: Optional[str] = None, cardset_id: list = Query(default=None), rarity_id: list = Query(default=None), pos_include: list = Query(default=None), pos_exclude: list = Query(default=None), has_headshot: Optional[bool] = None, has_vanity_card: Optional[bool] = None, strat_code: Optional[str] = None, bbref_id: Optional[str] = None, fangr_id: Optional[str] = None, inc_dex: Optional[bool] = True, in_desc: Optional[str] = None, flat: Optional[bool] = False, sort_by: Optional[str] = False, cardset_id_exclude: list = Query(default=None), limit: Optional[int] = None, csv: Optional[bool] = None): all_players = Player.select() if all_players.count() == 0: db.close() raise HTTPException(status_code=404, detail=f'There are no players to filter') if name is not None: all_players = all_players.where(fn.Lower(Player.p_name) == name.lower()) if value is not None: all_players = all_players.where(Player.cost == value) if min_cost is not None: all_players = all_players.where(Player.cost >= min_cost) if max_cost is not None: all_players = all_players.where(Player.cost <= max_cost) if has_image2 is not None: all_players = all_players.where(Player.image2.is_null(not has_image2)) if mlbclub is not None: all_players = all_players.where(fn.Lower(Player.mlbclub) == mlbclub.lower()) if franchise is not None: all_players = all_players.where(fn.Lower(Player.franchise) == franchise.lower()) if cardset_id is not None: all_players = all_players.where(Player.cardset_id << cardset_id) if cardset_id_exclude is not None: all_players = all_players.where(Player.cardset_id.not_in(cardset_id_exclude)) if rarity_id is not None: all_players = all_players.where(Player.rarity_id << rarity_id) if pos_include is not None: p_list = [x.upper() for x in pos_include] all_players = all_players.where( (Player.pos_1 << p_list) | (Player.pos_2 << p_list) | (Player.pos_3 << p_list) | (Player.pos_4 << p_list) | (Player.pos_5 << p_list) | (Player.pos_6 << p_list) | (Player.pos_7 << p_list) | (Player.pos_8 << p_list) ) if has_headshot is not None: all_players = all_players.where(Player.headshot.is_null(not has_headshot)) if has_vanity_card is not None: all_players = all_players.where(Player.vanity_card.is_null(not has_vanity_card)) if strat_code is not None: all_players = all_players.where(Player.strat_code == strat_code) if bbref_id is not None: all_players = all_players.where(Player.bbref_id == bbref_id) if fangr_id is not None: all_players = all_players.where(Player.fangr_id == fangr_id) if in_desc is not None: all_players = all_players.where(fn.Lower(Player.description).contains(in_desc.lower())) if sort_by is not None: if sort_by == 'cost-desc': all_players = all_players.order_by(-Player.cost) elif sort_by == 'cost-asc': all_players = all_players.order_by(Player.cost) elif sort_by == 'name-asc': all_players = all_players.order_by(Player.p_name) elif sort_by == 'name-desc': all_players = all_players.order_by(-Player.p_name) elif sort_by == 'rarity-desc': all_players = all_players.order_by(Player.rarity) elif sort_by == 'rarity-asc': all_players = all_players.order_by(-Player.rarity) final_players = [] # logging.info(f'pos_exclude: {type(pos_exclude)} - {pos_exclude} - is None: {pos_exclude is None}') for x in all_players: if pos_exclude is not None and set([x.upper() for x in pos_exclude]).intersection(x.get_all_pos()): pass else: final_players.append(x) if limit is not None and len(final_players) >= limit: break if len(final_players) == 0: db.close() raise HTTPException(status_code=404, detail=f'No players found') if csv: all_players.order_by(-Player.rarity.value, Player.p_name) data_list = [['id', 'name', 'value', 'image', 'image2', 'mlbclub', 'franchise', 'cardset', 'rarity', 'pos_1', 'pos_2', 'pos_3', 'pos_4', 'pos_5', 'pos_6', 'pos_7', 'pos_8', 'headshot', 'vanity_card', 'strat_code', 'bbref_id', 'description', 'for_purchase', 'ranked_legal']] for line in final_players: data_list.append( [ line.player_id, line.p_name, line.cost, line.image, line.image2, line.mlbclub, line.franchise, line.cardset, line.rarity, line.pos_1, line.pos_2, line.pos_3, line.pos_4, line.pos_5, line.pos_6, line.pos_7, line.pos_8, line.headshot, line.vanity_card, line.strat_code, line.bbref_id, line.description, line.cardset.for_purchase, line.cardset.ranked_legal # line.description, line.cardset.in_packs, line.quantity ] ) return_val = DataFrame(data_list).to_csv(header=False, index=False) db.close() return Response(content=return_val, media_type='text/csv') else: return_val = {'count': len(final_players), 'players': []} for x in final_players: this_record = model_to_dict(x, recurse=not flat) if inc_dex: this_dex = Paperdex.select().where(Paperdex.player == x) this_record['paperdex'] = {'count': this_dex.count(), 'paperdex': []} for y in this_dex: this_record['paperdex']['paperdex'].append(model_to_dict(y, recurse=False)) return_val['players'].append(this_record) # return_val['players'].append(model_to_dict(x, recurse=not flat)) db.close() return return_val @app.get('/api/v1/players/random') async def v1_players_get_random( min_cost: Optional[int] = None, max_cost: Optional[int] = None, in_packs: Optional[bool] = True, min_rarity: Optional[int] = None, max_rarity: Optional[int] = None, limit: Optional[int] = None, pos_include: Optional[str] = None, pos_exclude: Optional[str] = None, franchise: Optional[str] = None, mlbclub: Optional[str] = None, csv: Optional[bool] = None): all_players = (Player .select() .join(Cardset) .switch(Player) .join(Rarity) .order_by(fn.Random())) if min_cost is not None: all_players = all_players.where(Player.cost >= min_cost) if max_cost is not None: all_players = all_players.where(Player.cost <= max_cost) if in_packs is not None: if in_packs: all_players = all_players.where(Player.cardset.in_packs) if min_rarity is not None: all_players = all_players.where(Player.rarity.value >= min_rarity) if max_rarity is not None: all_players = all_players.where(Player.rarity.value <= max_rarity) if pos_include is not None: all_players = all_players.where( (fn.lower(Player.pos_1) == pos_include.lower()) | (fn.lower(Player.pos_2) == pos_include.lower()) | (fn.lower(Player.pos_3) == pos_include.lower()) | (fn.lower(Player.pos_4) == pos_include.lower()) | (fn.lower(Player.pos_5) == pos_include.lower()) | (fn.lower(Player.pos_6) == pos_include.lower()) | (fn.lower(Player.pos_7) == pos_include.lower()) | (fn.lower(Player.pos_8) == pos_include.lower()) ) if franchise is not None: all_players = all_players.where(fn.Lower(Player.franchise) == franchise.lower()) if mlbclub is not None: all_players = all_players.where(fn.Lower(Player.mlbclub) == mlbclub.lower()) if pos_exclude: final_players = [x for x in all_players if pos_exclude not in x.get_all_pos()] else: final_players = [x for x in all_players] if limit is not None: final_players = final_players[:limit] # if len(final_players) == 0: # db.close() # raise HTTPException(status_code=404, detail=f'No players found') if csv: data_list = [['id', 'name', 'cost', 'image', 'image2', 'mlbclub', 'franchise', 'cardset', 'rarity', 'pos_1', 'pos_2', 'pos_3', 'pos_4', 'pos_5', 'pos_6', 'pos_7', 'pos_8', 'headshot', 'vanity_card', 'strat_code', 'bbref_id', 'description']] for line in final_players: data_list.append( [ line.id, line.p_name, line.cost, line.image, line.image2, line.mlbclub, line.franchise, line.cardset.name, line.rarity.name, line.pos_1, line.pos_2, line.pos_3, line.pos_4, line.pos_5, line.pos_6, line.pos_7, line.pos_8, line.headshot, line.vanity_card, line.strat_code, line.bbref_id, line.description ] ) return_val = DataFrame(data_list).to_csv(header=False, index=False) db.close() return Response(content=return_val, media_type='text/csv') else: return_val = {'count': len(final_players), 'players': []} for x in final_players: this_record = model_to_dict(x) this_dex = Paperdex.select().where(Paperdex.player == x) this_record['paperdex'] = {'count': this_dex.count(), 'paperdex': []} for y in this_dex: this_record['paperdex']['paperdex'].append(model_to_dict(y, recurse=False)) return_val['players'].append(this_record) # return_val['players'].append(model_to_dict(x)) db.close() return return_val @app.get('/api/v1/players/{player_id}') async def v1_players_get_one(player_id, csv: Optional[bool] = False): try: this_player = Player.get_by_id(player_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No player found with id {player_id}') if csv: data_list = [['id', 'name', 'cost', 'image', 'image2', 'mlbclub', 'franchise', 'cardset', 'rarity', 'pos_1', 'pos_2', 'pos_3', 'pos_4', 'pos_5', 'pos_6', 'pos_7', 'pos_8', 'headshot', 'vanity_card', 'strat_code', 'bbref_id', 'description']] return_val = DataFrame(data_list).to_csv(header=False, index=False) data_list.append( [ this_player.id, this_player.p_name, this_player.cost, this_player.image, this_player.image2, this_player.mlbclub, this_player.franchise, this_player.cardset.name, this_player.rarity.name, this_player.pos_1, this_player.pos_2, this_player.pos_3, this_player.pos_4, this_player.pos_5, this_player.pos_6, this_player.pos_7, this_player.pos_8, this_player.headshot, this_player.vanity_card, this_player.strat_code, this_player.bbref_id, this_player.description ] ) db.close() return Response(content=return_val, media_type='text/csv') else: return_val = model_to_dict(this_player) this_dex = Paperdex.select().where(Paperdex.player == this_player) return_val['paperdex'] = {'count': this_dex.count(), 'paperdex': []} for x in this_dex: return_val['paperdex']['paperdex'].append(model_to_dict(x, recurse=False)) db.close() return return_val @app.patch('/api/v1/players/{player_id}') async def v1_players_patch( player_id, name: Optional[str] = None, image: Optional[str] = None, image2: Optional[str] = None, mlbclub: Optional[str] = None, franchise: Optional[str] = None, cardset_id: Optional[int] = None, rarity_id: Optional[int] = None, pos_1: Optional[str] = None, pos_2: Optional[str] = None, pos_3: Optional[str] = None, pos_4: Optional[str] = None, pos_5: Optional[str] = None, pos_6: Optional[str] = None, pos_7: Optional[str] = None, pos_8: Optional[str] = None, headshot: Optional[str] = None, vanity_card: Optional[str] = None, strat_code: Optional[str] = None, bbref_id: Optional[str] = None, description: Optional[str] = None, cost: Optional[int] = None, fangr_id: Optional[str] = None, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to patch players. This event has been logged.' ) try: this_player = Player.get_by_id(player_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No player found with id {player_id}') if cost is not None: this_player.cost = cost if name is not None: this_player.p_name = name if image is not None: this_player.image = image if image2 is not None: if image2.lower() == 'false': this_player.image2 = None else: this_player.image2 = image2 if mlbclub is not None: this_player.mlbclub = mlbclub if franchise is not None: this_player.franchise = franchise if cardset_id is not None: try: this_cardset = Cardset.get_by_id(cardset_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No cardset found with id {cardset_id}') this_player.cardset = this_cardset if rarity_id is not None: try: this_rarity = Rarity.get_by_id(rarity_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No rarity found with id {rarity_id}') this_player.rarity = this_rarity if pos_1 is not None: if pos_1 == 'False': this_player.pos_1 = None else: this_player.pos_1 = pos_1 if pos_2 is not None: if pos_2 == 'False': this_player.pos_2 = None else: this_player.pos_2 = pos_2 if pos_3 is not None: if pos_3 == 'False': this_player.pos_3 = None else: this_player.pos_3 = pos_3 if pos_4 is not None: if pos_4 == 'False': this_player.pos_4 = None else: this_player.pos_4 = pos_4 if pos_5 is not None: if pos_5 == 'False': this_player.pos_5 = None else: this_player.pos_5 = pos_5 if pos_6 is not None: if pos_6 == 'False': this_player.pos_6 = None else: this_player.pos_6 = pos_6 if pos_7 is not None: if pos_7 == 'False': this_player.pos_7 = None else: this_player.pos_7 = pos_7 if pos_8 is not None: if pos_8 == 'False': this_player.pos_8 = None else: this_player.pos_8 = pos_8 if headshot is not None: this_player.headshot = headshot if vanity_card is not None: this_player.vanity_card = vanity_card if strat_code is not None: this_player.strat_code = strat_code if bbref_id is not None: this_player.bbref_id = bbref_id if fangr_id is not None: this_player.fangr_id = fangr_id if description is not None: this_player.description = description if this_player.save() == 1: return_val = model_to_dict(this_player) db.close() return return_val else: raise HTTPException( status_code=418, detail='Well slap my ass and call me a teapot; I could not save that rarity' ) @app.post('/api/v1/players') async def v1_players_post(players: PlayerModel, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to post players. This event has been logged.' ) new_players = [] for x in players.players: # this_player = Player( # player_id=x.player_id, # p_name=x.p_name, # cost=x.cost, # image=x.image, # image2=x.image2, # mlbclub=x.mlbclub, # franchise=x.franchise, # cardset_id=x.cardset_id, # rarity_id=x.rarity_id, # set_num=x.set_num, # pos_1=x.pos_1, # pos_2=x.pos_2, # pos_3=x.pos_3, # pos_4=x.pos_4, # pos_5=x.pos_5, # pos_6=x.pos_6, # pos_7=x.pos_7, # pos_8=x.pos_8, # headshot=x.headshot, # vanity_card=x.vanity_card, # strat_code=x.strat_code, # fangr_id=x.fangr_id, # bbref_id=x.bbref_id, # description=x.description # ) # new_players.append(this_player) new_players.append({ 'player_id': x.player_id, 'p_name': x.p_name, 'cost': x.cost, 'image': x.image, 'image2': x.image2, 'mlbclub': x.mlbclub.title(), 'franchise': x.franchise.title(), 'cardset_id': x.cardset_id, 'rarity_id': x.rarity_id, 'set_num': x.set_num, 'pos_1': x.pos_1, 'pos_2': x.pos_2, 'pos_3': x.pos_3, 'pos_4': x.pos_4, 'pos_5': x.pos_5, 'pos_6': x.pos_6, 'pos_7': x.pos_7, 'pos_8': x.pos_8, 'headshot': x.headshot, 'vanity_card': x.vanity_card, 'strat_code': x.strat_code, 'fangr_id': x.fangr_id, 'bbref_id': x.bbref_id, 'description': x.description }) logging.info(f'new_players: {new_players}') with db.atomic(): # Player.bulk_create(new_players, batch_size=15) for batch in chunked(new_players, 15): logging.info(f'batch: {batch}') Player.insert_many(batch).on_conflict_replace().execute() db.close() # sheets.update_all_players(SHEETS_AUTH) raise HTTPException(status_code=200, detail=f'{len(new_players)} players have been added') # @app.put('/api/v1/players') # async def v1_players_put(players: PlayerModel, token: str = Depends(oauth2_scheme)): # if not valid_token(token): # logging.warning(f'Bad Token: {token}') # db.close() # raise HTTPException( # status_code=401, # detail='You are not authorized to post players. This event has been logged.' # ) # # new_players = [] # for x in players.players: # try: # this_player = Player.get_by_id(x.player_id) # except Exception as e: # new_players.append({ # 'player_id': x.player_id, # 'p_name': x.p_name, # 'cost': x.cost, # 'image': x.image, # 'image2': x.image2, # 'mlbclub': x.mlbclub.title(), # 'franchise': x.franchise.title(), # 'cardset_id': x.cardset_id, # 'rarity_id': x.rarity_id, # 'set_num': x.set_num, # 'pos_1': x.pos_1, # 'pos_2': x.pos_2, # 'pos_3': x.pos_3, # 'pos_4': x.pos_4, # 'pos_5': x.pos_5, # 'pos_6': x.pos_6, # 'pos_7': x.pos_7, # 'pos_8': x.pos_8, # 'headshot': x.headshot, # 'vanity_card': x.vanity_card, # 'strat_code': x.strat_code, # 'fangr_id': x.fangr_id, # 'bbref_id': x.bbref_id, # 'description': x.description # }) # finally: # # @app.patch('/api/v1/players') # async def v1_players_put(players: PlayerModel, token: str = Depends(oauth2_scheme)): # if not valid_token(token): # logging.warning(f'Bad Token: {token}') # db.close() # raise HTTPException( # status_code=401, # detail='You are not authorized to post players. This event has been logged.' # ) # # new_players = [] # for x in players.players: @app.delete('/api/v1/players/{player_id}') async def v1_players_delete(player_id, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to delete players. This event has been logged.' ) try: this_player = Player.get_by_id(player_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No player found with id {player_id}') count = this_player.delete_instance() db.close() if count == 1: raise HTTPException(status_code=200, detail=f'Player {player_id} has been deleted') else: raise HTTPException(status_code=500, detail=f'Player {player_id} was not deleted') """ PACKTYPE ENDPOINTS """ class PacktypeModel(pydantic.BaseModel): name: str card_count: int description: str cost: int available: Optional[bool] = True @app.get('/api/v1/packtypes') async def v1_packtypes_get( name: Optional[str] = None, card_count: Optional[int] = None, in_desc: Optional[str] = None, available: Optional[bool] = None, csv: Optional[bool] = None): all_packtypes = PackType.select() if all_packtypes.count() == 0: db.close() raise HTTPException(status_code=404, detail=f'There are no packtypes to filter') if name is not None: all_packtypes = all_packtypes.where(fn.Lower(PackType.name) == name.lower()) if card_count is not None: all_packtypes = all_packtypes.where(PackType.card_count == card_count) if in_desc is not None: all_packtypes = all_packtypes.where(fn.Lower(PackType.description).contains(in_desc.lower())) if available is not None: all_packtypes = all_packtypes.where(PackType.available == available) # if all_packtypes.count() == 0: # db.close() # raise HTTPException(status_code=404, detail=f'No packtypes found') if csv: data_list = [['id', 'name', 'card_count', 'description']] for line in all_packtypes: data_list.append( [ line.id, line.name, line.card_count, line.description ] ) return_val = DataFrame(data_list).to_csv(header=False, index=False) db.close() return Response(content=return_val, media_type='text/csv') else: return_val = {'count': all_packtypes.count(), 'packtypes': []} for x in all_packtypes: return_val['packtypes'].append(model_to_dict(x)) db.close() return return_val @app.get('/api/v1/packtypes/{packtype_id}') async def v1_packtypes_get_one(packtype_id, csv: Optional[bool] = False): try: this_packtype = PackType.get_by_id(packtype_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No packtype found with id {packtype_id}') if csv: data_list = [ ['id', 'name', 'card_count', 'description'], [this_packtype.id, this_packtype.name, this_packtype.card_count, this_packtype.description] ] return_val = DataFrame(data_list).to_csv(header=False, index=False) db.close() return Response(content=return_val, media_type='text/csv') else: return_val = model_to_dict(this_packtype) db.close() return return_val @app.post('/api/v1/packtypes') async def v1_packtypes_post(packtype: PacktypeModel, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to post packtypes. This event has been logged.' ) dupe_packtype = PackType.get_or_none(PackType.name == packtype.name) if dupe_packtype: db.close() raise HTTPException(status_code=400, detail=f'There is already a packtype using {packtype.name}') this_packtype = PackType( name=packtype.name, card_count=packtype.card_count, description=packtype.description, cost=packtype.cost, available=packtype.available ) saved = this_packtype.save() if saved == 1: return_val = model_to_dict(this_packtype) db.close() return return_val else: raise HTTPException( status_code=418, detail='Well slap my ass and call me a teapot; I could not save that cardset' ) @app.patch('/api/v1/packtypes/{packtype_id}') async def v1_packtypes_patch( packtype_id, name: Optional[str] = None, card_count: Optional[int] = None, description: Optional[str] = None, cost: Optional[int] = None, available: Optional[bool] = None, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to patch packtypes. This event has been logged.' ) try: this_packtype = PackType.get_by_id(packtype_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No packtype found with id {packtype_id}') if name is not None: this_packtype.name = name if card_count is not None: this_packtype.card_count = card_count if description is not None: this_packtype.description = description if cost is not None: this_packtype.cost = cost if available is not None: this_packtype.available = available if this_packtype.save() == 1: return_val = model_to_dict(this_packtype) db.close() return return_val else: raise HTTPException( status_code=418, detail='Well slap my ass and call me a teapot; I could not save that rarity' ) @app.delete('/api/v1/packtypes/{packtype_id}') async def v1_packtypes_delete(packtype_id, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to delete packtypes. This event has been logged.' ) try: this_packtype = PackType.get_by_id(packtype_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No packtype found with id {packtype_id}') count = this_packtype.delete_instance() db.close() if count == 1: raise HTTPException(status_code=200, detail=f'Packtype {packtype_id} has been deleted') else: raise HTTPException(status_code=500, detail=f'Packtype {packtype_id} was not deleted') """ PACK ENDPOINTS """ class PackPydantic(pydantic.BaseModel): team_id: int pack_type_id: int open_time: Optional[str] = None class PackModel(pydantic.BaseModel): packs: List[PackPydantic] @app.get('/api/v1/packs') async def v1_packs_get( team_id: Optional[int] = None, pack_type_id: Optional[int] = None, opened: Optional[bool] = None, limit: Optional[int] = None, new_to_old: Optional[bool] = None, csv: Optional[bool] = None): all_packs = Pack.select() if all_packs.count() == 0: db.close() raise HTTPException(status_code=404, detail=f'There are no packs to filter') if team_id is not None: try: this_team = Team.get_by_id(team_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No team found with id {team_id}') all_packs = all_packs.where(Pack.team == this_team) if pack_type_id is not None: try: this_pack_type = PackType.get_by_id(pack_type_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No pack type found with id {pack_type_id}') all_packs = all_packs.where(Pack.pack_type == this_pack_type) if opened is not None: all_packs = all_packs.where(Pack.open_time.is_null(not opened)) if limit is not None: all_packs = all_packs.limit(limit) if new_to_old is not None: all_packs = all_packs.order_by(-Pack.id) # if all_packs.count() == 0: # db.close() # raise HTTPException(status_code=404, detail=f'No packs found') if csv: data_list = [['id', 'team', 'pack_type', 'open_time']] for line in all_packs: data_list.append( [ line.id, line.team.abbrev, line.pack_type.name, datetime.fromtimestamp(line.open_time) if line.open_time else None ] ) return_val = DataFrame(data_list).to_csv(header=False, index=False) db.close() return Response(content=return_val, media_type='text/csv') else: return_val = {'count': all_packs.count(), 'packs': []} for x in all_packs: return_val['packs'].append(model_to_dict(x)) db.close() return return_val @app.get('/api/v1/packs/{pack_id}') async def v1_packs_get_one(pack_id, csv: Optional[bool] = False): try: this_pack = Pack.get_by_id(pack_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No pack found with id {pack_id}') if csv: data_list = [ ['id', 'team', 'pack_type', 'open_time'], [this_pack.id, this_pack.team.abbrev, this_pack.pack_type.name, datetime.fromtimestamp(this_pack.open_time) if this_pack.open_time else None] ] return_val = DataFrame(data_list).to_csv(header=False, index=False) db.close() return Response(content=return_val, media_type='text/csv') else: return_val = model_to_dict(this_pack) db.close() return return_val @app.post('/api/v1/packs') async def v1_packs_post(packs: PackModel, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to post packs. This event has been logged.' ) new_packs = [] for x in packs.packs: this_player = Pack( team_id=x.team_id, pack_type_id=x.pack_type_id, open_time=x.open_time if x.open_time != "" else None ) new_packs.append(this_player) with db.atomic(): Pack.bulk_create(new_packs, batch_size=15) db.close() raise HTTPException(status_code=200, detail=f'{len(new_packs)} packs have been added') @app.post('/api/v1/packs/one') async def v1_packs_post_one(pack: PackPydantic, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to post packs. This event has been logged.' ) this_pack = Pack( team_id=pack.team_id, pack_type_id=pack.pack_type_id, open_time=pack.open_time ) saved = this_pack.save() if saved == 1: return_val = model_to_dict(this_pack) db.close() return return_val else: raise HTTPException( status_code=418, detail='Well slap my ass and call me a teapot; I could not save that cardset' ) @app.patch('/api/v1/packs/{pack_id}') async def v1_packs_patch( pack_id, team_id: Optional[int] = None, pack_type_id: Optional[int] = None, open_time: Optional[int] = None, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to patch packs. This event has been logged.' ) try: this_pack = Pack.get_by_id(pack_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No pack found with id {pack_id}') if team_id is not None: this_pack.team_id = team_id if pack_type_id is not None: this_pack.pack_type_id = pack_type_id if open_time is not None: this_pack.open_time = open_time if this_pack.save() == 1: return_val = model_to_dict(this_pack) db.close() return return_val else: raise HTTPException( status_code=418, detail='Well slap my ass and call me a teapot; I could not save that rarity' ) @app.delete('/api/v1/packs/{pack_id}') async def v1_packs_delete(pack_id, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to delete packs. This event has been logged.' ) try: this_pack = Pack.get_by_id(pack_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No packs found with id {pack_id}') count = this_pack.delete_instance() db.close() if count == 1: raise HTTPException(status_code=200, detail=f'Pack {pack_id} has been deleted') else: raise HTTPException(status_code=500, detail=f'Pack {pack_id} was not deleted') """ CARD ENDPOINTS """ class CardPydantic(pydantic.BaseModel): player_id: int team_id: int pack_id: int value: Optional[int] = 0 class CardModel(pydantic.BaseModel): cards: List[CardPydantic] @app.get('/api/v1/cards') async def v1_cards_get( player_id: Optional[int] = None, team_id: Optional[int] = None, pack_id: Optional[int] = None, value: Optional[int] = None, min_value: Optional[int] = None, max_value: Optional[int] = None, order_by: Optional[str] = None, limit: Optional[int] = None, dupes: Optional[bool] = None, csv: Optional[bool] = None): all_cards = Card.select() # if all_cards.count() == 0: # db.close() # raise HTTPException(status_code=404, detail=f'There are no cards to filter') if team_id is not None: try: this_team = Team.get_by_id(team_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No team found with id {team_id}') all_cards = all_cards.where(Card.team == this_team) if player_id is not None: try: this_player = Player.get_by_id(player_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No player found with id {player_id}') all_cards = all_cards.where(Card.player == this_player) if pack_id is not None: try: this_pack = Pack.get_by_id(pack_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No pack found with id {pack_id}') all_cards = all_cards.where(Card.pack == this_pack) if value is not None: all_cards = all_cards.where(Card.value == value) if min_value is not None: all_cards = all_cards.where(Card.value >= min_value) if max_value is not None: all_cards = all_cards.where(Card.value <= max_value) if order_by is not None: if order_by.lower() == 'new': all_cards = all_cards.order_by(-Card.id) if limit is not None: all_cards = all_cards.limit(limit) # if all_cards.count() == 0: # db.close() # raise HTTPException(status_code=404, detail=f'No cards found') if csv: data_list = [['id', 'player', 'cardset', 'rarity', 'team', 'pack', 'value']] for line in all_cards: data_list.append( [ line.id, line.player.p_name, line.player.cardset, line.player.rarity, line.team.abbrev, line.pack, line.value ] ) return_val = DataFrame(data_list).to_csv(header=False, index=False) db.close() return Response(content=return_val, media_type='text/csv') else: return_val = {'count': all_cards.count(), 'cards': []} for x in all_cards: this_record = model_to_dict(x) logging.debug(f'this_record: {this_record}') this_dex = Paperdex.select().where(Paperdex.player == x) this_record['player']['paperdex'] = {'count': this_dex.count(), 'paperdex': []} for y in this_dex: this_record['player']['paperdex']['paperdex'].append(model_to_dict(y, recurse=False)) return_val['cards'].append(this_record) # return_val['cards'].append(model_to_dict(x)) db.close() return return_val @app.get('/api/v1/cards/{card_id}') async def v1_cards_get_one(card_id, csv: Optional[bool] = False): try: this_card = Card.get_by_id(card_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No card found with id {card_id}') if csv: data_list = [ ['id', 'player', 'team', 'pack', 'value', 'roster1', 'roster2', 'roster3'], [this_card.id, this_card.player, this_card.team.abbrev, this_card.pack, this_card.value, this_card.roster1.name, this_card.roster2.name, this_card.roster3.name] ] return_val = DataFrame(data_list).to_csv(header=False, index=False) db.close() return Response(content=return_val, media_type='text/csv') else: return_val = model_to_dict(this_card) db.close() return return_val @app.post('/api/v1/cards') async def v1_cards_post(cards: CardModel, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to post cards. This event has been logged.' ) last_card = Card.select(Card.id).order_by(-Card.id).limit(1) lc_id = last_card[0].id new_cards = [] player_ids = [] # new_dex = [] # now = int(datetime.timestamp(datetime.now()) * 1000) for x in cards.cards: this_card = Card( player_id=x.player_id, team_id=x.team_id, pack_id=x.pack_id, value=x.value ) Paperdex.get_or_create(team_id=x.team_id, player_id=x.player_id) player_ids.append(x.player_id) new_cards.append(this_card) with db.atomic(): Card.bulk_create(new_cards, batch_size=15) cost_query = Player.update(cost=Player.cost + 1).where(Player.player_id << player_ids) cost_query.execute() # sheets.post_new_cards(SHEETS_AUTH, lc_id) db.close() raise HTTPException(status_code=200, detail=f'{len(new_cards)} cards have been added') @app.post('/api/v1/cards/ai-update') async def v1_cards_ai_update(token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to update AI cards. This event has been logged.' ) sheets.send_ai_cards(SHEETS_AUTH) raise HTTPException(status_code=200, detail=f'Just sent AI cards to sheets') @app.post('/api/v1/cards/post-update/{starting_id}') async def v1_cards_post_update(starting_id: int, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to update card lists. This event has been logged.' ) # sheets.post_new_cards(SHEETS_AUTH, starting_id) db.close() raise HTTPException(status_code=200, detail=f'Just sent cards to sheets starting at ID {starting_id}') @app.post('/api/v1/cards/post-delete') async def v1_cards_post_delete(del_ids: str, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to delete card lists. This event has been logged.' ) logging.info(f'del_ids: {del_ids} / type: {type(del_ids)}') # sheets.post_deletion(SHEETS_AUTH, del_ids.split(',')) # @app.get('/api/v1/cards/{card_id}/sell') # async def v1_cards_sell(card_id, ts: int = None): # try: # this_card = Card.get_by_id(card_id) # except Exception: # db.close() # raise HTTPException(status_code=404, detail=f'No card found with id {card_id}') # # this_team = this_card.team # if ts != this_team.team_hash(): # logging.warning(f'Bad Team Secret: {ts} ({this_team.team_hash()})') # db.close() # raise HTTPException( # status_code=401, # detail=f'You are not authorized to sell {this_team.abbrev} cards. This event has been logged.' # ) # # this_player = this_card.player # orig_price = this_player.cost # sell_price = round(this_player.cost * .5) # # # credit selling team's wallet # if this_team.wallet is None: # this_team.wallet = sell_price # else: # this_team.wallet += sell_price # this_team.save() # # # decrease price of player # this_player.change_on_sell() # this_card.delete_instance() # # # post a notification # new_notif = Notification( # created=int_timestamp(datetime.now()), # title=f'Price Change', # desc='Modified by buying and selling', # field_name=f'{this_player.p_name}', # message=f'From {orig_price}₼ to **{this_player.cost}**₼', # about=f'Player-{this_player.id}' # ) # new_notif.save() # # raise HTTPException(status_code=200, detail=f'Card {card_id} has been sold for {sell_price} bucks') @app.patch('/api/v1/cards/{card_id}') async def v1_cards_patch( card_id, player_id: Optional[int] = None, team_id: Optional[int] = None, pack_id: Optional[int] = None, value: Optional[int] = None, roster1_id: Optional[int] = None, roster2_id: Optional[int] = None, roster3_id: Optional[int] = None, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to patch cards. This event has been logged.' ) try: this_card = Card.get_by_id(card_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No card found with id {card_id}') if player_id is not None: this_card.player_id = player_id if team_id is not None: this_card.team_id = team_id if pack_id is not None: this_card.pack_id = pack_id if value is not None: this_card.value = value if roster1_id is not None: this_card.roster1_id = roster1_id if roster2_id is not None: this_card.roster2_id = roster2_id if roster3_id is not None: this_card.roster3_id = roster3_id if this_card.save() == 1: return_val = model_to_dict(this_card) db.close() return return_val else: db.close() raise HTTPException( status_code=418, detail='Well slap my ass and call me a teapot; I could not save that rarity' ) @app.delete('/api/v1/cards/{card_id}') async def v1_cards_delete(card_id, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to delete packs. This event has been logged.' ) try: this_card = Card.get_by_id(card_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No cards found with id {card_id}') count = this_card.delete_instance() db.close() if count == 1: raise HTTPException(status_code=200, detail=f'Card {card_id} has been deleted') else: raise HTTPException(status_code=500, detail=f'Card {card_id} was not deleted') """ EVENTS ENDPOINTS """ class EventModel(pydantic.BaseModel): name: str short_desc: str long_desc: str url: Optional[str] = None thumbnail: Optional[str] = None active: Optional[bool] = False @app.get('/api/v1/events') async def v1_events_get( name: Optional[str] = None, in_desc: Optional[str] = None, active: Optional[bool] = None, csv: Optional[bool] = None): all_events = Event.select() if all_events.count() == 0: db.close() raise HTTPException(status_code=404, detail=f'There are no events to filter') if name is not None: all_events = all_events.where(fn.Lower(Event.name) == name.lower()) if in_desc is not None: all_events = all_events.where( (fn.Lower(Event.short_desc).contains(in_desc.lower())) | (fn.Lower(Event.long_desc).contains(in_desc.lower())) ) if active is not None: all_events = all_events.where(Event.active == active) if all_events.count() == 0: db.close() raise HTTPException(status_code=404, detail=f'No events found') if csv: data_list = [['id', 'name', 'short_desc', 'long_desc', 'url', 'thumbnail', 'active']] for line in all_events: data_list.append( [ line.id, line.name, line.short_desc, line.long_desc, line.url, line.thumbnail, line.active ] ) return_val = DataFrame(data_list).to_csv(header=False, index=False) db.close() return Response(content=return_val, media_type='text/csv') else: return_val = {'count': all_events.count(), 'events': []} for x in all_events: return_val['events'].append(model_to_dict(x)) db.close() return return_val @app.get('/api/v1/events/{event_id}') async def v1_events_get_one(event_id, csv: Optional[bool] = False): try: this_event = Event.get_by_id(event_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No event found with id {event_id}') if csv: data_list = [ ['id', 'name', 'short_desc', 'long_desc', 'url', 'thumbnail', 'active'], [this_event.id, this_event.name, this_event.short_desc, this_event.long_desc, this_event.url, this_event.thumbnail, this_event.active] ] return_val = DataFrame(data_list).to_csv(header=False, index=False) db.close() return Response(content=return_val, media_type='text/csv') else: return_val = model_to_dict(this_event) db.close() return return_val @app.post('/api/v1/events') async def v1_events_post(event: EventModel, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to post events. This event has been logged.' ) dupe_event = Event.get_or_none(Event.name == event.name) if dupe_event: db.close() raise HTTPException(status_code=400, detail=f'There is already an event using {event.name}') this_event = Event( name=event.name, short_desc=event.short_desc, long_desc=event.long_desc, url=event.url, thumbnail=event.thumbnail, active=event.active ) saved = this_event.save() if saved == 1: return_val = model_to_dict(this_event) db.close() return return_val else: db.close() raise HTTPException( status_code=418, detail='Well slap my ass and call me a teapot; I could not save that cardset' ) @app.patch('/api/v1/events/{event_id}') async def v1_events_patch( event_id, name: Optional[str] = None, short_desc: Optional[str] = None, long_desc: Optional[str] = None, url: Optional[str] = None, thumbnail: Optional[str] = None, active: Optional[bool] = None, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to patch events. This event has been logged.' ) try: this_event = Event.get_by_id(event_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No event found with id {event_id}') if name is not None: this_event.name = name if short_desc is not None: this_event.short_desc = short_desc if long_desc is not None: this_event.long_desc = long_desc if url is not None: this_event.url = url if thumbnail is not None: this_event.thumbnail = thumbnail if active is not None: this_event.active = active if this_event.save() == 1: return_val = model_to_dict(this_event) db.close() return return_val else: db.close() raise HTTPException( status_code=418, detail='Well slap my ass and call me a teapot; I could not save that event' ) @app.delete('/api/v1/events/{event_id}') async def v1_events_delete(event_id, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to delete events. This event has been logged.' ) try: this_event = Event.get_by_id(event_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No event found with id {event_id}') count = this_event.delete_instance() db.close() if count == 1: raise HTTPException(status_code=200, detail=f'Event {event_id} has been deleted') else: raise HTTPException(status_code=500, detail=f'Event {event_id} was not deleted') """ ROSTERS ENDPOINTS """ class RosterModel(pydantic.BaseModel): team_id: int name: Optional[str] = 'My Roster' roster_num: int card_ids: list @app.get('/api/v1/rosters') async def v1_rosters_get(team_id: Optional[int] = None, csv: Optional[bool] = None): all_rosters = Roster.select() # if all_rosters.count() == 0: # db.close() # raise HTTPException(status_code=404, detail=f'There are no rosters to filter') if team_id: try: this_team = Team.get_by_id(team_id) all_rosters = all_rosters.where(Roster.team == this_team) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No team found with id {team_id}') if csv: data_list = [['id', 'roster', 'team_id', 'team_abbrev']] for line in all_rosters: data_list.append([ line.id, line.name, line.team, line.team.abbrev ]) return_val = DataFrame(data_list).to_csv(header=False, index=False) db.close() return Response(content=return_val, media_type='text/csv') else: return_val = {'count': all_rosters.count(), 'rosters': []} for x in all_rosters: return_val['rosters'].append(model_to_dict(x)) db.close() return return_val @app.get('/api/v1/rosters/{roster_id}') async def v1_rosters_get_one(roster_id, csv: Optional[bool] = None): try: this_roster = Roster.get_by_id(roster_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No roster found with id {roster_id}') if csv: data_list = [ ['id', 'roster', 'team_id', 'team_abbrev'], [this_roster.id, this_roster.name, this_roster.team, this_roster.team.abbrev] ] return_val = DataFrame(data_list).to_csv(header=False, index=False) db.close() return Response(content=return_val, media_type='text/csv') else: return_val = model_to_dict(this_roster) db.close() return return_val @app.post('/api/v1/rosters') async def v1_rosters_post(roster: RosterModel, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to post rosters. This event has been logged.' ) c_query = Card.select().where(Card.id << roster.card_ids) logging.debug(f'c_query: {c_query}') for card in c_query: if card.team_id != roster.team_id: raise HTTPException( status_code=401, detail=f'Card ID {card.id} ({card.player.rarity.name} {card.player.p_name}) belongs to ' f'{card.team.abbrev} and cannot be added to your roster.' ) r_query = Roster.delete().where(Roster.team_id == roster.team_id, Roster.roster_num == roster.roster_num) logging.debug(f'r_query: {r_query}') r_query.execute() this_roster = Roster( team_id=roster.team_id, name=roster.name, roster_num=roster.roster_num, card_1_id=roster.card_ids[0], card_2_id=roster.card_ids[1], card_3_id=roster.card_ids[2], card_4_id=roster.card_ids[3], card_5_id=roster.card_ids[4], card_6_id=roster.card_ids[5], card_7_id=roster.card_ids[6], card_8_id=roster.card_ids[7], card_9_id=roster.card_ids[8], card_10_id=roster.card_ids[9], card_11_id=roster.card_ids[10], card_12_id=roster.card_ids[11], card_13_id=roster.card_ids[12], card_14_id=roster.card_ids[13], card_15_id=roster.card_ids[14], card_16_id=roster.card_ids[15], card_17_id=roster.card_ids[16], card_18_id=roster.card_ids[17], card_19_id=roster.card_ids[18], card_20_id=roster.card_ids[19], card_21_id=roster.card_ids[20], card_22_id=roster.card_ids[21], card_23_id=roster.card_ids[22], card_24_id=roster.card_ids[23], card_25_id=roster.card_ids[24], card_26_id=roster.card_ids[25], ) saved = this_roster.save() if saved == 1: return_val = model_to_dict(this_roster, recurse=False) db.close() return return_val else: db.close() raise HTTPException( status_code=418, detail='Well slap my ass and call me a teapot; I could not save that roster' ) @app.patch('/api/v1/rosters/{roster_id}') async def v1_rosters_patch( roster_id, team_id: Optional[int] = None, name: Optional[str] = None, roster_num: Optional[int] = None, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to patch rosters. This event has been logged.' ) try: this_roster = Roster.get_by_id(roster_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No roster found with id {roster_id}') if team_id is not None: this_roster.team_id = team_id if name is not None: this_roster.name = name if roster_num is not None: this_roster.roster_num = roster_num if this_roster.save() == 1: return_val = model_to_dict(this_roster) db.close() return return_val else: db.close() raise HTTPException( status_code=418, detail='Well slap my ass and call me a teapot; I could not save that event' ) @app.delete('/api/v1/rosters/{roster_id}') async def v1_rosters_delete(roster_id, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to delete rosters. This event has been logged.' ) try: this_roster = Roster.get_by_id(roster_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No roster found with id {roster_id}') count = this_roster.delete_instance() db.close() if count == 1: raise HTTPException(status_code=200, detail=f'Roster {roster_id} has been deleted') else: raise HTTPException(status_code=500, detail=f'Roster {roster_id} was not deleted') """ RESULTS ENDPOINTS """ class ResultModel(pydantic.BaseModel): away_team_id: int home_team_id: int away_score: int home_score: int away_team_value: Optional[int] = None home_team_value: Optional[int] = None away_team_ranking: Optional[int] = None home_team_ranking: Optional[int] = None scorecard: str week: int season: int ranked: bool short_game: bool game_type: str @app.get('/api/v1/results') async def v1_results_get( away_team_id: Optional[int] = None, home_team_id: Optional[int] = None, team_one_id: Optional[int] = None, team_two_id: Optional[int] = None, away_score_min: Optional[int] = None, away_score_max: Optional[int] = None, home_score_min: Optional[int] = None, home_score_max: Optional[int] = None, bothscore_min: Optional[int] = None, bothscore_max: Optional[int] = None, season: Optional[int] = None, week: Optional[int] = None, week_start: Optional[int] = None, week_end: Optional[int] = None, ranked: Optional[bool] = None, short_game: Optional[bool] = None, game_type: Optional[str] = None, vs_ai: Optional[bool] = None, csv: Optional[bool] = None): all_results = Result.select() # if all_results.count() == 0: # db.close() # raise HTTPException(status_code=404, detail=f'There are no results to filter') if away_team_id is not None: try: this_team = Team.get_by_id(away_team_id) all_results = all_results.where(Result.away_team == this_team) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No team found with id {away_team_id}') if home_team_id is not None: try: this_team = Team.get_by_id(home_team_id) all_results = all_results.where(Result.home_team == this_team) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No team found with id {home_team_id}') if team_one_id is not None: try: this_team = Team.get_by_id(team_one_id) all_results = all_results.where((Result.home_team == this_team) | (Result.away_team == this_team)) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No team found with id {team_one_id}') if team_two_id is not None: try: this_team = Team.get_by_id(team_two_id) all_results = all_results.where((Result.home_team == this_team) | (Result.away_team == this_team)) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No team found with id {team_two_id}') if away_score_min is not None: all_results = all_results.where(Result.away_score >= away_score_min) if away_score_max is not None: all_results = all_results.where(Result.away_score <= away_score_max) if home_score_min is not None: all_results = all_results.where(Result.home_score >= home_score_min) if home_score_max is not None: all_results = all_results.where(Result.home_score <= home_score_max) if bothscore_min is not None: all_results = all_results.where((Result.home_score >= bothscore_min) & (Result.away_score >= bothscore_min)) if bothscore_max is not None: all_results = all_results.where((Result.home_score <= bothscore_max) & (Result.away_score <= bothscore_max)) if season is not None: all_results = all_results.where(Result.season == season) if week is not None: all_results = all_results.where(Result.week == week) if ranked is not None: all_results = all_results.where(Result.ranked == ranked) if short_game is not None: all_results = all_results.where(Result.short_game == short_game) if week_start is not None: all_results = all_results.where(Result.week >= week_start) if week_end is not None: all_results = all_results.where(Result.week <= week_end) if game_type is not None: all_results = all_results.where(Result.game_type == game_type) all_results = all_results.order_by(Result.id) # Not functional # if vs_ai is not None: # AwayTeam = Team.alias() # all_results = all_results.join( # Team, on=Result.home_team # ).switch(Result).join( # Team, on=(AwayTeam.id == Result.away_team).alias('a_team') # ) # # if vs_ai: # all_results = all_results.where( # (Result.home_team.is_ai == 1) | (Result.a_team.is_ai == 1) # ) # else: # all_results = all_results.where( # (Result.home_team.is_ai == 0) & (Result.a_team.is_ai == 0) # ) # logging.info(f'Result Query:\n\n{all_results}') if csv: data_list = [['id', 'away_abbrev', 'home_abbrev', 'away_score', 'home_score', 'away_tv', 'home_tv', 'game_type', 'season', 'week', 'short_game', 'ranked']] for line in all_results: data_list.append([ line.id, line.away_team.abbrev, line.home_team.abbrev, line.away_score, line.home_score, line.away_team_value, line.home_team_value, line.game_type if line.game_type else 'minor-league', line.season, line.week, line.short_game, line.ranked ]) return_val = DataFrame(data_list).to_csv(header=False, index=False) db.close() return Response(content=return_val, media_type='text/csv') else: return_val = {'count': all_results.count(), 'results': []} for x in all_results: return_val['results'].append(model_to_dict(x)) db.close() return return_val @app.get('/api/v1/results/{result_id}') async def v1_results_get_one(result_id, csv: Optional[bool] = None): try: this_result = Result.get_by_id(result_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No result found with id {result_id}') if csv: data_list = [ ['id', 'away_abbrev', 'home_abbrev', 'away_score', 'home_score', 'away_tv', 'home_tv', 'game_type', 'season', 'week', 'game_type'], [this_result.id, this_result.away_team.abbrev, this_result.away_team.abbrev, this_result.away_score, this_result.home_score, this_result.away_team_value, this_result.home_team_value, this_result.game_type if this_result.game_type else 'minor-league', this_result.season, this_result.week, this_result.game_type] ] return_val = DataFrame(data_list).to_csv(header=False, index=False) db.close() return Response(content=return_val, media_type='text/csv') else: return_val = model_to_dict(this_result) db.close() return return_val @app.get('/api/v1/results/team/{team_id}') async def v1_results_team_get( team_id: int, season: Optional[int] = None, week: Optional[int] = None, csv: Optional[bool] = False): all_results = Result.select().where((Result.away_team_id == team_id) | (Result.home_team_id == team_id)) try: this_team = Team.get_by_id(team_id) except Exception as e: logging.error(f'Unknown team id {team_id} trying to pull team results') raise HTTPException(404, f'Team id {team_id} not found') if season is not None: all_results = all_results.where(Result.season == season) if week is not None: all_results = all_results.where(Result.week == week) r_wins, r_loss, c_wins, c_loss = 0, 0, 0, 0 for x in all_results: if x.away_team_id == team_id: if x.away_score > x.home_score: if x.ranked: r_wins += 1 else: c_wins += 1 else: if x.ranked: r_loss += 1 else: c_loss += 1 elif x.home_team_id == team_id: if x.away_score > x.home_score: if x.ranked: r_loss += 1 else: c_loss += 1 else: if x.ranked: r_wins += 1 else: c_wins += 1 if csv: data_list = [ ['team_id', 'ranked_wins', 'ranked_losses', 'casual_wins', 'casual_losses', 'team_ranking'], [team_id, r_wins, r_loss, c_wins, c_loss, this_team.ranking] ] return_val = DataFrame(data_list).to_csv(header=False, index=False) db.close() return Response(content=return_val, media_type='text/csv') else: return_val = { 'team': model_to_dict(this_team), 'ranked_wins': r_wins, 'ranked_losses': r_loss, 'casual_wins': c_wins, 'casual_losses': c_loss, } db.close() return return_val @app.post('/api/v1/results') async def v1_results_post(result: ResultModel, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to post results. This event has been logged.' ) this_result = Result(**result.__dict__) saved = this_result.save() if result.ranked: if not result.away_team_ranking: db.close() error = f'Ranked game did not include away team ({result.away_team_id}) ranking.' logging.error(error) raise DataError(error) if not result.home_team_ranking: db.close() error = f'Ranked game did not include home team ({result.home_team_id}) ranking.' logging.error(error) raise DataError(error) k_value = 20 if result.short_game else 60 ratio = (result.home_team_ranking - result.away_team_ranking) / 400 exp_score = 1 / (1 + (10 ** ratio)) away_win = True if result.away_score > result.home_score else False total_delta = k_value * exp_score high_delta = total_delta * exp_score if exp_score > .5 else total_delta * (1 - exp_score) low_delta = total_delta - high_delta # exp_score > .5 means away team is favorite if exp_score > .5 and away_win: final_delta = low_delta away_delta = low_delta * 3 home_delta = -low_delta elif away_win: final_delta = high_delta away_delta = high_delta * 3 home_delta = -high_delta elif exp_score <= .5 and not away_win: final_delta = low_delta away_delta = -low_delta home_delta = low_delta * 3 elif not away_win: final_delta = high_delta away_delta = -high_delta home_delta = high_delta * 3 else: final_delta = 0 away_delta = 0 home_delta = 0 logging.debug(f'/results ranking deltas\n\nk_value: {k_value} / ratio: {ratio} / ' f'exp_score: {exp_score} / away_win: {away_win} / total_delta: {total_delta} / ' f'high_delta: {high_delta} / low_delta: {low_delta} / final_delta: {final_delta} / ') away_team = Team.get_by_id(result.away_team_id) away_team.ranking += away_delta away_team.save() logging.info(f'Just updated {away_team.abbrev} ranking to {away_team.ranking}') home_team = Team.get_by_id(result.home_team_id) home_team.ranking += home_delta home_team.save() logging.info(f'Just updated {home_team.abbrev} ranking to {home_team.ranking}') if saved == 1: return_val = model_to_dict(this_result) db.close() return return_val else: db.close() raise HTTPException( status_code=418, detail='Well slap my ass and call me a teapot; I could not save that roster' ) @app.patch('/api/v1/results/{result_id}') async def v1_results_patch( result_id, away_team_id: Optional[int] = None, home_team_id: Optional[int] = None, away_score: Optional[int] = None, home_score: Optional[int] = None, away_team_value: Optional[int] = None, home_team_value: Optional[int] = None, scorecard: Optional[str] = None, week: Optional[int] = None, season: Optional[int] = None, short_game: Optional[bool] = None, game_type: Optional[str] = None, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to patch results. This event has been logged.' ) try: this_result = Result.get_by_id(result_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No result found with id {result_id}') if away_team_id is not None: this_result.away_team_id = away_team_id if home_team_id is not None: this_result.home_team_id = home_team_id if away_score is not None: this_result.away_score = away_score if home_score is not None: this_result.home_score = home_score if away_team_value is not None: this_result.away_team_value = away_team_value if home_team_value is not None: this_result.home_team_value = home_team_value if scorecard is not None: this_result.scorecard = scorecard if week is not None: this_result.week = week if season is not None: this_result.season = season if game_type is not None: this_result.game_type = game_type if short_game is not None: if not short_game: this_result.short_game = None else: this_result.short_game = short_game if this_result.save() == 1: return_val = model_to_dict(this_result) db.close() return return_val else: db.close() raise HTTPException( status_code=418, detail='Well slap my ass and call me a teapot; I could not save that event' ) @app.delete('/api/v1/results/{result_id}') async def v1_results_delete(result_id, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to post results. This event has been logged.' ) try: this_result = Result.get_by_id(result_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No result found with id {result_id}') count = this_result.delete_instance() db.close() if count == 1: raise HTTPException(status_code=200, detail=f'Result {result_id} has been deleted') else: raise HTTPException(status_code=500, detail=f'Result {result_id} was not deleted') """ AWARDS ENDPOINTS """ class AwardModel(pydantic.BaseModel): name: str season: int timing: str = 'In-Season' card_id: Optional[int] = None team_id: Optional[int] = None image: Optional[str] = None @app.get('/api/v1/awards') async def v1_awards_get( name: Optional[str] = None, season: Optional[int] = None, timing: Optional[str] = None, card_id: Optional[int] = None, team_id: Optional[int] = None, image: Optional[str] = None, csv: Optional[bool] = None): all_awards = Award.select() if all_awards.count() == 0: db.close() raise HTTPException(status_code=404, detail=f'There are no awards to filter') if name is not None: all_awards = all_awards.where(Award.name == name) if season is not None: all_awards = all_awards.where(Award.season == season) if timing is not None: all_awards = all_awards.where(Award.timing == timing) if card_id is not None: all_awards = all_awards.where(Award.card_id == card_id) if team_id is not None: all_awards = all_awards.where(Award.team_id == team_id) if image is not None: all_awards = all_awards.where(Award.image == image) if csv: data_list = [['id', 'name', 'season', 'timing', 'card', 'team', 'image']] for line in all_awards: data_list.append([ line.id, line.name, line.season, line.timing, line.card, line.team, line.image ]) return_val = DataFrame(data_list).to_csv(header=False, index=False) db.close() return Response(content=return_val, media_type='text/csv') else: return_val = {'count': all_awards.count(), 'awards': []} for x in all_awards: return_val['awards'].append(model_to_dict(x)) db.close() return return_val @app.get('/api/v1/awards/{award_id}') async def v1_awards_get_one(award_id, csv: Optional[bool] = None): try: this_award = Award.get_by_id(award_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No award found with id {award_id}') if csv: data_list = [ ['id', 'name', 'season', 'timing', 'card', 'team', 'image'], [this_award.id, this_award.name, this_award.season, this_award.timing, this_award.card, this_award.team, this_award.image] ] return_val = DataFrame(data_list).to_csv(header=False, index=False) db.close() return Response(content=return_val, media_type='text/csv') else: return_val = model_to_dict(this_award) db.close() return return_val @app.post('/api/v1/awards') async def v1_awards_post(award: AwardModel, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to post awards. This event has been logged.' ) this_award = Award( name=award.name, season=award.season, timing=award.season, card_id=award.card_id, team_id=award.team_id, image=award.image ) saved = this_award.save() if saved == 1: return_val = model_to_dict(this_award) db.close() return return_val else: db.close() raise HTTPException( status_code=418, detail='Well slap my ass and call me a teapot; I could not save that roster' ) @app.delete('/api/v1/awards/{award_id}') async def v1_awards_delete(award_id, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to delete awards. This event has been logged.' ) try: this_award = Award.get_by_id(award_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No award found with id {award_id}') count = this_award.delete_instance() db.close() if count == 1: raise HTTPException(status_code=200, detail=f'Award {award_id} has been deleted') else: raise HTTPException(status_code=500, detail=f'Award {award_id} was not deleted') """ REWARD ENDPOINTS """ class RewardModel(pydantic.BaseModel): name: str season: int week: int team_id: int created: Optional[int] = int(datetime.timestamp(datetime.now())*1000) @app.get('/api/v1/rewards') async def v1_rewards_get( name: Optional[str] = None, in_name: Optional[str] = None, team_id: Optional[int] = None, season: Optional[int] = None, week: Optional[int] = None, created_after: Optional[int] = None, flat: Optional[bool] = False, csv: Optional[bool] = None): all_rewards = Reward.select() if all_rewards.count() == 0: db.close() raise HTTPException(status_code=404, detail=f'There are no rewards to filter') if name is not None: all_rewards = all_rewards.where(fn.Lower(Reward.name) == name.lower()) if team_id is not None: all_rewards = all_rewards.where(Reward.team_id == team_id) if created_after is not None: all_rewards = all_rewards.where(Reward.created >= created_after) if in_name is not None: all_rewards = all_rewards.where(fn.Lower(Reward.name).contains(in_name.lower())) if season is not None: all_rewards = all_rewards.where(Reward.season == season) if week is not None: all_rewards = all_rewards.where(Reward.week == week) if all_rewards.count() == 0: db.close() raise HTTPException(status_code=404, detail=f'No rewards found') if csv: data_list = [['id', 'name', 'team', 'daily', 'created']] for line in all_rewards: data_list.append( [ line.id, line.name, line.team.id, line.daily, line.created ] ) return_val = DataFrame(data_list).to_csv(header=False, index=False) db.close() return Response(content=return_val, media_type='text/csv') else: return_val = {'count': all_rewards.count(), 'rewards': []} for x in all_rewards: return_val['rewards'].append(model_to_dict(x, recurse=not flat)) db.close() return return_val @app.get('/api/v1/rewards/{reward_id}') async def v1_rewards_get_one(reward_id, csv: Optional[bool] = False): try: this_reward = Reward.get_by_id(reward_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No reward found with id {reward_id}') if csv: data_list = [ ['id', 'name', 'card_count', 'description'], [this_reward.id, this_reward.name, this_reward.team.id, this_reward.daily, this_reward.created] ] return_val = DataFrame(data_list).to_csv(header=False, index=False) db.close() return Response(content=return_val, media_type='text/csv') else: return_val = model_to_dict(this_reward) db.close() return return_val @app.post('/api/v1/rewards') async def v1_rewards_post(reward: RewardModel, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to post rewards. This event has been logged.' ) this_reward = Reward(**reward.dict()) saved = this_reward.save() if saved == 1: return_val = model_to_dict(this_reward) db.close() return return_val else: raise HTTPException( status_code=418, detail='Well slap my ass and call me a teapot; I could not save that cardset' ) @app.patch('/api/v1/rewards/{reward_id}') async def v1_rewards_patch( reward_id, name: Optional[str] = None, team_id: Optional[int] = None, created: Optional[int] = None, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to patch rewards. This event has been logged.' ) try: this_reward = Reward.get_by_id(reward_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No reward found with id {reward_id}') if name is not None: this_reward.name = name if team_id is not None: this_reward.team_id = team_id if created is not None: this_reward.created = created if this_reward.save() == 1: return_val = model_to_dict(this_reward) db.close() return return_val else: raise HTTPException( status_code=418, detail='Well slap my ass and call me a teapot; I could not save that rarity' ) @app.delete('/api/v1/rewards/{reward_id}') async def v1_rewards_delete(reward_id, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to delete rewards. This event has been logged.' ) try: this_reward = Reward.get_by_id(reward_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No reward found with id {reward_id}') count = this_reward.delete_instance() db.close() if count == 1: raise HTTPException(status_code=200, detail=f'Reward {reward_id} has been deleted') else: raise HTTPException(status_code=500, detail=f'Reward {reward_id} was not deleted') """ BATTING STATS ENDPOINTS """ class BatStat(pydantic.BaseModel): card_id: int team_id: int roster_num: int vs_team_id: int pos: str pa: Optional[int] = 0 ab: Optional[int] = 0 run: Optional[int] = 0 hit: Optional[int] = 0 rbi: Optional[int] = 0 double: Optional[int] = 0 triple: Optional[int] = 0 hr: Optional[int] = 0 bb: Optional[int] = 0 so: Optional[int] = 0 hbp: Optional[int] = 0 sac: Optional[int] = 0 ibb: Optional[int] = 0 gidp: Optional[int] = 0 sb: Optional[int] = 0 cs: Optional[int] = 0 bphr: Optional[int] = 0 bpfo: Optional[int] = 0 bp1b: Optional[int] = 0 bplo: Optional[int] = 0 xch: Optional[int] = 0 xhit: Optional[int] = 0 error: Optional[int] = 0 pb: Optional[int] = 0 sbc: Optional[int] = 0 csc: Optional[int] = 0 week: int season: int created: Optional[int] = int(datetime.timestamp(datetime.now())*100000) game_id: int class BattingStatModel(pydantic.BaseModel): stats: List[BatStat] @app.get('/api/v1/batstats') async def v1_batstats_get( card_id: int = None, player_id: int = None, team_id: int = None, vs_team_id: int = None, week: int = None, season: int = None, week_start: int = None, week_end: int = None, created: int = None, csv: bool = None): all_stats = BattingStat.select().join(Card).join(Player) if card_id is not None: all_stats = all_stats.where(BattingStat.card_id == card_id) if player_id is not None: all_stats = all_stats.where(BattingStat.card.player.player_id == player_id) if team_id is not None: all_stats = all_stats.where(BattingStat.team_id == team_id) if vs_team_id is not None: all_stats = all_stats.where(BattingStat.vs_team_id == vs_team_id) if week is not None: all_stats = all_stats.where(BattingStat.week == week) if season is not None: all_stats = all_stats.where(BattingStat.season == season) if week_start is not None: all_stats = all_stats.where(BattingStat.week >= week_start) if week_end is not None: all_stats = all_stats.where(BattingStat.week <= week_end) if created is not None: all_stats = all_stats.where(BattingStat.created == created) # if all_stats.count() == 0: # db.close() # raise HTTPException(status_code=404, detail=f'No batting stats found') if csv: data_list = [['id', 'card_id', 'player_id', 'cardset', 'team', 'vs_team', 'pos', 'pa', 'ab', 'run', 'hit', 'rbi', 'double', 'triple', 'hr', 'bb', 'so', 'hbp', 'sac', 'ibb', 'gidp', 'sb', 'cs', 'bphr', 'bpfo', 'bp1b', 'bplo', 'xch', 'xhit', 'error', 'pb', 'sbc', 'csc', 'week', 'season', 'created', 'game_id', 'roster_num']] for line in all_stats: data_list.append( [ line.id, line.card.id, line.card.player.player_id, line.card.player.cardset.name, line.team.abbrev, line.vs_team.abbrev, line.pos, line.pa, line.ab, line.run, line.hit, line.rbi, line.double, line.triple, line.hr, line.bb, line.so, line.hbp, line.sac, line.ibb, line.gidp, line.sb, line.cs, line.bphr, line.bpfo, line.bp1b, line.bplo, line.xch, line.xhit, line.error, line.pb, line.sbc, line.csc, line.week, line.season, line.created, line.game_id, line.roster_num ] ) return_val = DataFrame(data_list).to_csv(header=False, index=False) db.close() return Response(content=return_val, media_type='text/csv') else: return_val = {'count': all_stats.count(), 'stats': []} for x in all_stats: return_val['stats'].append(model_to_dict(x, recurse=False)) db.close() return return_val @app.get('/api/v1/batstats/player/{player_id}') async def v1_batstats_get_card( player_id: int, team_id: int = None, vs_team_id: int = None, week_start: int = None, week_end: int = None, csv: bool = None): all_stats = (BattingStat .select(fn.COUNT(BattingStat.created).alias('game_count')) .join(Card) .group_by(BattingStat.card) .where(BattingStat.card.player == player_id)).scalar() if team_id is not None: all_stats = all_stats.where(BattingStat.team_id == team_id) if vs_team_id is not None: all_stats = all_stats.where(BattingStat.vs_team_id == vs_team_id) if week_start is not None: all_stats = all_stats.where(BattingStat.week >= week_start) if week_end is not None: all_stats = all_stats.where(BattingStat.week <= week_end) if csv: data_list = [ [ 'pa', 'ab', 'run', 'hit', 'rbi', 'double', 'triple', 'hr', 'bb', 'so', 'hbp', 'sac', 'ibb', 'gidp', 'sb', 'cs', 'bphr', 'bpfo', 'bp1b', 'bplo', 'xch', 'xhit', 'error', 'pb', 'sbc', 'csc', ],[ all_stats.pa_sum, all_stats.ab_sum, all_stats.run, all_stats.hit_sum, all_stats.rbi_sum, all_stats.double_sum, all_stats.triple_sum, all_stats.hr_sum, all_stats.bb_sum, all_stats.so_sum, all_stats.hbp_sum, all_stats.sac, all_stats.ibb_sum, all_stats.gidp_sum, all_stats.sb_sum, all_stats.cs_sum, all_stats.bphr_sum, all_stats.bpfo_sum, all_stats.bp1b_sum, all_stats.bplo_sum, all_stats.xch, all_stats.xhit_sum, all_stats.error_sum, all_stats.pb_sum, all_stats.sbc_sum, all_stats.csc_sum ] ] return_val = DataFrame(data_list).to_csv(header=False, index=False) db.close() return Response(content=return_val, media_type='text/csv') else: logging.debug(f'stat pull query: {all_stats}\n') # logging.debug(f'result 0: {all_stats[0]}\n') for x in all_stats: logging.debug(f'this_line: {model_to_dict(x)}') return_val = model_to_dict(all_stats[0]) db.close() return return_val @app.post('/api/v1/batstats') async def v1_batstats_post(stats: BattingStatModel, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to post stats. This event has been logged.' ) new_stats = [] for x in stats.stats: this_stat = BattingStat( card_id=x.card_id, team_id=x.team_id, roster_num=x.roster_num, vs_team_id=x.vs_team_id, pos=x.pos, pa=x.pa, ab=x.ab, run=x.run, hit=x.hit, rbi=x.rbi, double=x.double, triple=x.triple, hr=x.hr, bb=x.bb, so=x.so, hbp=x.hbp, sac=x.sac, ibb=x.ibb, gidp=x.gidp, sb=x.sb, cs=x.cs, bphr=x.bphr, bpfo=x.bpfo, bp1b=x.bp1b, bplo=x.bplo, xch=x.xch, xhit=x.xhit, error=x.error, pb=x.pb, sbc=x.sbc, csc=x.csc, week=x.week, season=x.season, created=x.created, game_id=x.game_id ) new_stats.append(this_stat) with db.atomic(): BattingStat.bulk_create(new_stats, batch_size=15) db.close() raise HTTPException(status_code=200, detail=f'{len(new_stats)} batting lines have been added') @app.delete('/api/v1/batstats/{stat_id}') async def v1_rewards_delete(stat_id, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to delete stats. This event has been logged.' ) try: this_reward = Reward.get_by_id(stat_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No stat found with id {stat_id}') count = this_reward.delete_instance() db.close() if count == 1: raise HTTPException(status_code=200, detail=f'Stat {stat_id} has been deleted') else: raise HTTPException(status_code=500, detail=f'Stat {stat_id} was not deleted') """ PITCHING STATS ENDPOINTS """ class PitStat(pydantic.BaseModel): card_id: int team_id: int vs_team_id: int roster_num: int ip: float hit: Optional[int] = 0 run: Optional[int] = 0 erun: Optional[int] = 0 so: Optional[int] = 0 bb: Optional[int] = 0 hbp: Optional[int] = 0 wp: Optional[int] = 0 balk: Optional[int] = 0 hr: Optional[int] = 0 ir: Optional[int] = 0 irs: Optional[int] = 0 gs: Optional[int] = 0 win: Optional[int] = 0 loss: Optional[int] = 0 hold: Optional[int] = 0 sv: Optional[int] = 0 bsv: Optional[int] = 0 week: int season: int created: Optional[int] = int(datetime.timestamp(datetime.now())*100000) game_id: int class PitchingStatModel(pydantic.BaseModel): stats: List[PitStat] @app.get('/api/v1/pitstats') async def v1_pitstats_get( card_id: int = None, player_id: int = None, team_id: int = None, vs_team_id: int = None, week: int = None, season: int = None, week_start: int = None, week_end: int = None, created: int = None, gs: bool = None, csv: bool = None): all_stats = PitchingStat.select().join(Card).join(Player) logging.debug(f'pit query:\n\n{all_stats}') if card_id is not None: all_stats = all_stats.where(PitchingStat.card_id == card_id) if player_id is not None: all_stats = all_stats.where(PitchingStat.card.player.player_id == player_id) if team_id is not None: all_stats = all_stats.where(PitchingStat.team_id == team_id) if vs_team_id is not None: all_stats = all_stats.where(PitchingStat.vs_team_id == vs_team_id) if week is not None: all_stats = all_stats.where(PitchingStat.week == week) if season is not None: all_stats = all_stats.where(PitchingStat.season == season) if week_start is not None: all_stats = all_stats.where(PitchingStat.week >= week_start) if week_end is not None: all_stats = all_stats.where(PitchingStat.week <= week_end) if created is not None: all_stats = all_stats.where(PitchingStat.created <= created) if gs is not None: all_stats = all_stats.where(PitchingStat.gs == 1 if gs else 0) # if all_stats.count() == 0: # db.close() # raise HTTPException(status_code=404, detail=f'No pitching stats found') if csv: data_list = [['id', 'card_id', 'player_id', 'cardset', 'team', 'vs_team', 'ip', 'hit', 'run', 'erun', 'so', 'bb', 'hbp', 'wp', 'balk', 'hr', 'ir', 'irs', 'gs', 'win', 'loss', 'hold', 'sv', 'bsv', 'week', 'season', 'created', 'game_id', 'roster_num']] for line in all_stats: data_list.append( [ line.id, line.card.id, line.card.player.player_id, line.card.player.cardset.name, line.team.abbrev, line.vs_team.abbrev, line.ip, line.hit, line.run, line.erun, line.so, line.bb, line.hbp, line.wp, line.balk, line.hr, line.ir, line.irs, line.gs, line.win, line.loss, line.hold, line.sv, line.bsv, line.week, line.season, line.created, line.game_id, line.roster_num ] ) return_val = DataFrame(data_list).to_csv(header=False, index=False) db.close() return Response(content=return_val, media_type='text/csv') else: return_val = {'count': all_stats.count(), 'stats': []} for x in all_stats: return_val['stats'].append(model_to_dict(x, recurse=False)) db.close() return return_val @app.post('/api/v1/pitstats') async def v1_batstats_post(stats: PitchingStatModel, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to post stats. This event has been logged.' ) new_stats = [] for x in stats.stats: this_stat = PitchingStat( card_id=x.card_id, team_id=x.team_id, vs_team_id=x.vs_team_id, roster_num=x.roster_num, ip=x.ip, hit=x.hit, run=x.run, erun=x.erun, so=x.so, bb=x.bb, hbp=x.hbp, wp=x.wp, balk=x.balk, hr=x.hr, ir=x.ir, irs=x.irs, gs=x.gs, win=x.win, loss=x.loss, hold=x.hold, sv=x.sv, bsv=x.bsv, week=x.week, season=x.season, created=x.created, game_id=x.game_id ) new_stats.append(this_stat) with db.atomic(): PitchingStat.bulk_create(new_stats, batch_size=15) db.close() raise HTTPException(status_code=200, detail=f'{len(new_stats)} pitching lines have been added') @app.delete('/api/v1/pitstats/{stat_id}') async def v1_rewards_delete(stat_id, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to delete stats. This event has been logged.' ) try: this_reward = Reward.get_by_id(stat_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No stat found with id {stat_id}') count = this_reward.delete_instance() db.close() if count == 1: raise HTTPException(status_code=200, detail=f'Stat {stat_id} has been deleted') else: raise HTTPException(status_code=500, detail=f'Stat {stat_id} was not deleted') """ NOTIFICATIONS ENDPOINTS """ class NotifModel(pydantic.BaseModel): created: int title: str desc: Optional[str] = None field_name: str message: str about: Optional[str] = 'blank' ack: Optional[bool] = False @app.get('/api/v1/notifs') async def v1_notifs_get( created_after: Optional[int] = None, title: Optional[str] = None, desc: Optional[str] = None, field_name: Optional[str] = None, in_desc: Optional[str] = None, about: Optional[str] = None, ack: Optional[bool] = None, csv: Optional[bool] = None): all_notif = Notification.select() if all_notif.count() == 0: db.close() raise HTTPException(status_code=404, detail=f'There are no notifications to filter') if created_after is not None: all_notif = all_notif.where(Notification.created < created_after) if title is not None: all_notif = all_notif.where(Notification.title == title) if desc is not None: all_notif = all_notif.where(Notification.desc == desc) if field_name is not None: all_notif = all_notif.where(Notification.field_name == field_name) if in_desc is not None: all_notif = all_notif.where(fn.Lower(Notification.desc).contains(in_desc.lower())) if about is not None: all_notif = all_notif.where(Notification.about == about) if ack is not None: all_notif = all_notif.where(Notification.ack == ack) if csv: data_list = [['id', 'created', 'title', 'desc', 'field_name', 'message', 'about', 'ack']] for line in all_notif: data_list.append([ line.id, line.created, line.title, line.desc, line.field_name, line.message, line.about, line.ack ]) return_val = DataFrame(data_list).to_csv(header=False, index=False) db.close() return Response(content=return_val, media_type='text/csv') else: return_val = {'count': all_notif.count(), 'notifs': []} for x in all_notif: return_val['notifs'].append(model_to_dict(x)) db.close() return return_val @app.get('/api/v1/notifs/{notif_id}') async def v1_notifs_get_one(notif_id, csv: Optional[bool] = None): try: this_notif = Notification.get_by_id(notif_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No notification found with id {notif_id}') if csv: data_list = [ ['id', 'created', 'title', 'desc', 'field_name', 'message', 'about', 'ack'], [this_notif.id, this_notif.created, this_notif.title, this_notif.desc, this_notif.field_name, this_notif.message, this_notif.about, this_notif.ack] ] return_val = DataFrame(data_list).to_csv(header=False, index=False) db.close() return Response(content=return_val, media_type='text/csv') else: return_val = model_to_dict(this_notif) db.close() return return_val @app.post('/api/v1/notifs') async def v1_notifs_post(notif: NotifModel, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to post notifications. This event has been logged.' ) logging.info(f'new notif: {notif}') this_notif = Notification( created=notif.created, title=notif.title, desc=notif.desc, field_name=notif.field_name, message=notif.message, about=notif.about, ) saved = this_notif.save() if saved == 1: return_val = model_to_dict(this_notif) db.close() return return_val else: db.close() raise HTTPException( status_code=418, detail='Well slap my ass and call me a teapot; I could not save that notification' ) @app.patch('/api/v1/notifs/{notif_id}') async def v1_rewards_patch( notif_id, created: Optional[int] = None, title: Optional[str] = None, desc: Optional[str] = None, field_name: Optional[str] = None, message: Optional[str] = None, about: Optional[str] = None, ack: Optional[bool] = None, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to patch notifications. This event has been logged.' ) try: this_notif = Notification.get_by_id(notif_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No notification found with id {notif_id}') if title is not None: this_notif.title = title if desc is not None: this_notif.desc = desc if field_name is not None: this_notif.field_name = field_name if message is not None: this_notif.message = message if about is not None: this_notif.about = about if ack is not None: this_notif.ack = ack if created is not None: this_notif.created = created if this_notif.save() == 1: return_val = model_to_dict(this_notif) db.close() return return_val else: raise HTTPException( status_code=418, detail='Well slap my ass and call me a teapot; I could not save that rarity' ) @app.delete('/api/v1/notifs/{notif_id}') async def v1_notifs_delete(notif_id, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to delete notifications. This event has been logged.' ) try: this_notif = Notification.get_by_id(notif_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No notification found with id {notif_id}') count = this_notif.delete_instance() db.close() if count == 1: raise HTTPException(status_code=200, detail=f'Notification {notif_id} has been deleted') else: raise HTTPException(status_code=500, detail=f'Notification {notif_id} was not deleted') """ PAPERDEX ENDPOINTS """ class PaperdexModel(pydantic.BaseModel): team_id: int player_id: int created: Optional[int] = int(datetime.timestamp(datetime.now())*1000) @app.get('/api/v1/paperdex') async def v1_paperdex_get( team_id: Optional[int] = None, player_id: Optional[int] = None, created_after: Optional[int] = None, cardset_id: Optional[int] = None, created_before: Optional[int] = None, flat: Optional[bool] = False, csv: Optional[bool] = None): all_dex = Paperdex.select().join(Player).join(Cardset) if all_dex.count() == 0: db.close() raise HTTPException(status_code=404, detail=f'There are no paperdex to filter') if team_id is not None: all_dex = all_dex.where(Paperdex.team_id == team_id) if player_id is not None: all_dex = all_dex.where(Paperdex.player_id == player_id) if cardset_id is not None: all_dex = all_dex.where(Paperdex.player.cardset.id == cardset_id) if created_after is not None: all_dex = all_dex.where(Paperdex.created >= created_after) if created_before is not None: all_dex = all_dex.where(Paperdex.created <= created_before) # if all_dex.count() == 0: # db.close() # raise HTTPException(status_code=404, detail=f'No paperdex found') if csv: data_list = [['id', 'team_id', 'player_id', 'created']] for line in all_dex: data_list.append( [ line.id, line.team.id, line.player.player_id, line.created ] ) return_val = DataFrame(data_list).to_csv(header=False, index=False) db.close() return Response(content=return_val, media_type='text/csv') else: return_val = {'count': all_dex.count(), 'paperdex': []} for x in all_dex: return_val['paperdex'].append(model_to_dict(x, recurse=not flat)) db.close() return return_val @app.get('/api/v1/paperdex/{paperdex_id}') async def v1_paperdex_get_one(paperdex_id, csv: Optional[bool] = False): try: this_dex = Paperdex.get_by_id(paperdex_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No paperdex found with id {paperdex_id}') if csv: data_list = [ ['id', 'team_id', 'player_id', 'created'], [this_dex.id, this_dex.team.id, this_dex.player.id, this_dex.created] ] return_val = DataFrame(data_list).to_csv(header=False, index=False) db.close() return Response(content=return_val, media_type='text/csv') else: return_val = model_to_dict(this_dex) db.close() return return_val @app.post('/api/v1/paperdex') async def v1_paperdex_post(paperdex: PaperdexModel, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to post paperdex. This event has been logged.' ) dupe_dex = Paperdex.get_or_none(Paperdex.team_id == paperdex.team_id, Paperdex.player_id == paperdex.player_id) if dupe_dex: return_val = model_to_dict(dupe_dex) db.close() return return_val this_dex = Paperdex( team_id=paperdex.team_id, player_id=paperdex.player_id, created=paperdex.created ) saved = this_dex.save() if saved == 1: return_val = model_to_dict(this_dex) db.close() return return_val else: raise HTTPException( status_code=418, detail='Well slap my ass and call me a teapot; I could not save that dex' ) @app.patch('/api/v1/paperdex/{paperdex_id}') async def v1_paperdex_patch( paperdex_id, team_id: Optional[int] = None, player_id: Optional[int] = None, created: Optional[int] = None, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to patch paperdex. This event has been logged.' ) try: this_dex = Paperdex.get_by_id(paperdex_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No paperdex found with id {paperdex_id}') if team_id is not None: this_dex.team_id = team_id if player_id is not None: this_dex.player_id = player_id if created is not None: this_dex.created = created if this_dex.save() == 1: return_val = model_to_dict(this_dex) db.close() return return_val else: raise HTTPException( status_code=418, detail='Well slap my ass and call me a teapot; I could not save that rarity' ) @app.delete('/api/v1/paperdex/{paperdex_id}') async def v1_paperdex_delete(paperdex_id, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to delete rewards. This event has been logged.' ) try: this_dex = Paperdex.get_by_id(paperdex_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No paperdex found with id {paperdex_id}') count = this_dex.delete_instance() db.close() if count == 1: raise HTTPException(status_code=200, detail=f'Paperdex {this_dex} has been deleted') else: raise HTTPException(status_code=500, detail=f'Paperdex {this_dex} was not deleted') """ GAMEREWARDS ENDPOINTS """ class GameRewardModel(pydantic.BaseModel): name: str pack_type_id: Optional[int] = None player_id: Optional[int] = None money: Optional[int] = None @app.get('/api/v1/gamerewards') async def v1_gamerewards_get( name: Optional[str] = None, pack_type_id: Optional[int] = None, player_id: Optional[int] = None, money: Optional[int] = None, csv: Optional[bool] = None): all_rewards = GameRewards.select() # if all_rewards.count() == 0: # db.close() # raise HTTPException(status_code=404, detail=f'There are no awards to filter') if name is not None: all_rewards = all_rewards.where(GameRewards.name == name) if pack_type_id is not None: all_rewards = all_rewards.where(GameRewards.pack_type_id == pack_type_id) if player_id is not None: all_rewards = all_rewards.where(GameRewards.player_id == player_id) if money is not None: all_rewards = all_rewards.where(GameRewards.money == money) if csv: data_list = [['id', 'pack_type_id', 'player_id', 'money']] for line in all_rewards: data_list.append([ line.id, line.pack_type_id if line.pack_type else None, line.player_id if line.player else None, line.money ]) return_val = DataFrame(data_list).to_csv(header=False, index=False) db.close() return Response(content=return_val, media_type='text/csv') else: return_val = {'count': all_rewards.count(), 'gamerewards': []} for x in all_rewards: return_val['gamerewards'].append(model_to_dict(x)) db.close() return return_val @app.get('/api/v1/gamerewards/{gameaward_id}') async def v1_gamerewards_get_one(gamereward_id, csv: Optional[bool] = None): try: this_game_reward = GameRewards.get_by_id(gamereward_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No game reward found with id {gamereward_id}') if csv: data_list = [ ['id', 'pack_type_id', 'player_id', 'money'], [this_game_reward.id, this_game_reward.pack_type_id if this_game_reward.pack_type else None, this_game_reward.player_id if this_game_reward.player else None, this_game_reward.money] ] return_val = DataFrame(data_list).to_csv(header=False, index=False) db.close() return Response(content=return_val, media_type='text/csv') else: return_val = model_to_dict(this_game_reward) db.close() return return_val @app.post('/api/v1/gamerewards') async def v1_gamerewards_post(game_reward: GameRewardModel, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to post game rewards. This event has been logged.' ) this_award = GameRewards( name=game_reward.name, pack_type_id=game_reward.pack_type_id, player_id=game_reward.player_id, money=game_reward.money ) saved = this_award.save() if saved == 1: return_val = model_to_dict(this_award) db.close() return return_val else: db.close() raise HTTPException( status_code=418, detail='Well slap my ass and call me a teapot; I could not save that roster' ) @app.patch('/api/v1/gamerewards/{game_reward_id}') async def v1_gamerewards_patch( game_reward_id: int, name: Optional[str] = None, pack_type_id: Optional[int] = None, player_id: Optional[int] = None, money: Optional[int] = None, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to patch gamerewards. This event has been logged.' ) try: this_game_reward = GameRewards.get_by_id(game_reward_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No game reward found with id {game_reward_id}') if name is not None: this_game_reward.name = name if pack_type_id is not None: if not pack_type_id: this_game_reward.pack_type_id = None else: this_game_reward.pack_type_id = pack_type_id if player_id is not None: if not player_id: this_game_reward.player_id = None else: this_game_reward.player_id = player_id if money is not None: if not money: this_game_reward.money = None else: this_game_reward.money = money if this_game_reward.save() == 1: return_val = model_to_dict(this_game_reward) db.close() return return_val else: raise HTTPException( status_code=418, detail='Well slap my ass and call me a teapot; I could not save that rarity' ) @app.delete('/api/v1/gamerewards/{gamereward_id}') async def v1_gamerewards_delete(gamereward_id, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to delete awards. This event has been logged.' ) try: this_award = GameRewards.get_by_id(gamereward_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No award found with id {gamereward_id}') count = this_award.delete_instance() db.close() if count == 1: raise HTTPException(status_code=200, detail=f'Game Reward {gamereward_id} has been deleted') else: raise HTTPException(status_code=500, detail=f'Game Reward {gamereward_id} was not deleted')