from datetime import datetime from fastapi import APIRouter, Depends, HTTPException, Response from typing import Optional import logging import pydantic from pandas import DataFrame from ..db_engine import db, Team, model_to_dict, fn, Pack, Card, Player, Paperdex, Notification, PackType, \ Rarity, Current, query_to_csv, complex_data_to_csv from ..dependencies import oauth2_scheme, valid_token, LOG_DATA, int_timestamp logging.basicConfig( filename=LOG_DATA['filename'], format=LOG_DATA['format'], level=LOG_DATA['log_level'] ) router = APIRouter( prefix='/api/v2/teams', tags=['teams'] ) class TeamModel(pydantic.BaseModel): abbrev: str sname: str lname: str gmid: int gmname: str wallet: int = 0 gsheet: str team_value: int = 0 collection_value: int = 0 logo: Optional[str] = None color: Optional[str] = None season: int ps_shiny: Optional[int] = 0 ranking: Optional[int] = 1000 has_guide: Optional[bool] = False is_ai: Optional[bool] = False @router.get('') async def get_teams( season: Optional[int] = None, gm_id: Optional[int] = None, abbrev: Optional[str] = None, tv_min: Optional[int] = None, tv_max: Optional[int] = None, cv_min: Optional[int] = None, cv_max: Optional[int] = None, ps_shiny_min: Optional[int] = None, ps_shiny_max: Optional[int] = None, ranking_min: Optional[int] = None, ranking_max: Optional[int] = None, has_guide: Optional[bool] = None, sname: Optional[str] = None, lname: Optional[str] = None, is_ai: Optional[bool] = None, event_id: Optional[int] = None, limit: Optional[int] = None, csv: Optional[bool] = False): """ Param: season: int Param: team_abbrev: string Param: owner_id: int """ if season: all_teams = Team.select_season(season) else: all_teams = Team.select() # if all_teams.count() == 0: # db.close() # raise HTTPException(status_code=404, detail=f'There are no teams to filter') if gm_id is not None: all_teams = all_teams.where(Team.gmid == gm_id) if abbrev is not None: all_teams = all_teams.where(fn.Lower(Team.abbrev) == abbrev.lower()) if sname is not None: all_teams = all_teams.where(fn.Lower(Team.sname) == sname.lower()) if lname is not None: all_teams = all_teams.where(fn.Lower(Team.lname) == lname.lower()) if tv_min is not None: all_teams = all_teams.where(Team.team_value >= tv_min) if tv_max is not None: all_teams = all_teams.where(Team.team_value <= tv_max) if cv_min is not None: all_teams = all_teams.where(Team.collection_value >= cv_min) if cv_max is not None: all_teams = all_teams.where(Team.collection_value <= cv_max) if ps_shiny_min is not None: all_teams = all_teams.where(Team.career >= ps_shiny_min) if ps_shiny_max is not None: all_teams = all_teams.where(Team.career <= ps_shiny_max) if ranking_min is not None: all_teams = all_teams.where(Team.ranking >= ranking_min) if ranking_max is not None: all_teams = all_teams.where(Team.ranking <= ranking_max) if ranking_max is not None: all_teams = all_teams.where(Team.ranking <= ranking_max) if has_guide is not None: if not has_guide: all_teams = all_teams.where(Team.has_guide == 0) else: all_teams = all_teams.where(Team.has_guide == 1) if is_ai is not None: all_teams = all_teams.where(Team.is_ai) if event_id is not None: all_teams = all_teams.where(Team.event_id == event_id) if limit is not None: all_teams = all_teams.limit(limit) if csv: return_val = query_to_csv(all_teams, exclude=[Team.career]) db.close() return Response(content=return_val, media_type='text/csv') else: return_teams = {'count': all_teams.count(), 'teams': []} for x in all_teams: return_teams['teams'].append(model_to_dict(x)) db.close() return return_teams @router.get('/{team_id}') async def get_one_team(team_id, csv: Optional[bool] = False): try: this_team = Team.get_by_id(team_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No team found with id {team_id}') p_query = Pack.select().where((Pack.team == this_team) & (Pack.open_time.is_null(True))) if csv: data = model_to_dict(this_team) data['sealed_packs'] = p_query.count() return_val = complex_data_to_csv([data]) else: return_val = model_to_dict(this_team) return_val['sealed_packs'] = [model_to_dict(x) for x in p_query] db.close() return return_val @router.get('/{team_id}/buy/players') async def team_buy_players(team_id: int, ids: str, ts: str): try: this_team = Team.get_by_id(team_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No team found with id {team_id}') if ts != this_team.team_hash(): logging.warning(f'Bad Team Secret: {ts} ({this_team.team_hash()})') db.close() raise HTTPException( status_code=401, detail=f'You are not authorized to buy {this_team.abbrev} cards. This event has been logged.' ) last_card = Card.select(Card.id).order_by(-Card.id).limit(1) lc_id = last_card[0].id all_ids = ids.split(',') conf_message = '' total_cost = 0 for player_id in all_ids: if player_id != '': try: this_player = Player.get_by_id(player_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No player found with id {player_id} /// ' f'{conf_message} purchased') # check wallet balance if this_team.wallet < this_player.cost: logging.info(f'{this_player} was not purchased. {this_team.lname} only has {this_team.wallet}₼, but ' f'{this_player} costs {this_player.cost}₼.') db.close() raise HTTPException( 200, detail=f'{this_player} was not purchased. {this_team.lname} only has {this_team.wallet}₼, but ' f'{this_player} costs {this_player.cost}₼. /// {conf_message} purchased' ) # Create player card and update cost buy_price = this_player.cost total_cost += buy_price this_card = Card( player_id=this_player.player_id, team_id=this_team.id, value=buy_price ) Paperdex.get_or_create(team_id=team_id, player_id=this_player.player_id) this_card.save() this_player.change_on_buy() # Deduct card cost from team logging.info(f'{this_team.abbrev} starting wallet: {this_team.wallet}') this_team.wallet -= buy_price this_team.save() logging.info(f'{this_team.abbrev} ending wallet: {this_team.wallet}') # Post a notification if this_player.rarity.value >= 2: new_notif = Notification( created=int_timestamp(datetime.now()), title=f'Price Change', desc='Modified by buying and selling', field_name=f'{this_player.description}', message=f'From {buy_price}₼ 📈 to **{this_player.cost}**₼', about=f'Player-{this_player.player_id}' ) new_notif.save() conf_message += f'{buy_price}₼ for {this_player.rarity.name} {this_player.p_name} ' \ f'({this_player.cardset.name}), ' # sheets.post_new_cards(SHEETS_AUTH, lc_id) raise HTTPException(status_code=200, detail=f'{conf_message} purchased. /// Total Cost: {total_cost}₼ /// ' f'Final Wallet: {this_team.wallet}') @router.get('/{team_id}/buy/pack/{packtype_id}') async def team_buy_packs(team_id: int, packtype_id: int, ts: str, quantity: Optional[int] = 1): try: this_packtype = PackType.get_by_id(packtype_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No pack type found with id {packtype_id}') try: this_team = Team.get_by_id(team_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No team found with id {team_id}') if ts != this_team.team_hash(): logging.warning(f'Bad Team Secret: {ts} ({this_team.team_hash()})') db.close() logging.warning(f'team: {this_team} / pack_type: {this_packtype} / secret: {ts} / ' f'actual: {this_team.team_hash()}') raise HTTPException( status_code=401, detail=f'You are not authorized to buy {this_team.abbrev} packs. This event has been logged.' ) # check wallet balance total_cost = this_packtype.cost * quantity if this_team.wallet < total_cost: db.close() raise HTTPException( 200, detail=f'{this_packtype} was not purchased. {this_team.lname} only has {this_team.wallet} bucks, but ' f'{this_packtype} costs {this_packtype.cost}.' ) all_packs = [] for i in range(quantity): all_packs.append(Pack(team_id=this_team.id, pack_type_id=this_packtype.id)) # Deduct card cost from team logging.info(f'{this_team.abbrev} starting wallet: {this_team.wallet}') this_team.wallet -= total_cost this_team.save() logging.info(f'{this_team.abbrev} ending wallet: {this_team.wallet}') with db.atomic(): Pack.bulk_create(all_packs, batch_size=15) db.close() raise HTTPException( status_code=200, detail=f'Quantity {quantity} {this_packtype.name} pack{"s" if quantity > 1 else ""} have been purchased by ' f'{this_team.lname} for {total_cost} bucks. You may close this window.' ) @router.get('/{team_id}/sell/cards') async def team_sell_cards(team_id: int, ids: str, ts: str): try: this_team = Team.get_by_id(team_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No team found with id {team_id}') if ts != this_team.team_hash(): logging.warning(f'Bad Team Secret: {ts} ({this_team.team_hash()})') db.close() raise HTTPException( status_code=401, detail=f'You are not authorized to sell {this_team.abbrev} cards. This event has been logged.' ) all_ids = ids.split(',') del_ids = [] conf_message = '' total_cost = 0 for card_id in all_ids: if card_id != '': try: this_card = Card.get_by_id(card_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No card found with id {card_id}') del_ids.append(card_id) this_player = this_card.player if this_card.team != this_team: raise HTTPException(status_code=401, detail=f'Card id {card_id} ({this_player.p_name}) belongs to ' f'{this_card.team.abbrev} and cannot be sold. /// {conf_message} sold') orig_price = this_player.cost sell_price = round(this_player.cost * .5) total_cost += sell_price # credit selling team's wallet if this_team.wallet is None: this_team.wallet = sell_price else: this_team.wallet += sell_price this_team.save() # decrease price of player this_player.change_on_sell() this_card.delete_instance() # post a notification if this_player.rarity.value >= 2: new_notif = Notification( created=int_timestamp(datetime.now()), title=f'Price Change', desc='Modified by buying and selling', field_name=f'{this_player.description}', message=f'From {orig_price}₼ 📉 to **{this_player.cost}**₼', about=f'Player-{this_player.id}' ) new_notif.save() conf_message += f'{sell_price}₼ for {this_player.rarity.name} {this_player.p_name} ' \ f'({this_player.cardset.name}), ' # sheets.post_deletion(SHEETS_AUTH, del_ids) raise HTTPException(status_code=200, detail=f'{conf_message} sold. /// Total Earned: {total_cost}₼ /// ' f'Final Wallet: {this_team.wallet}') @router.get('/{team_id}/cards') async def get_team_cards(team_id, csv: Optional[bool] = True): """ CSV output specifically targeting team roster sheet Parameters ---------- team_id csv """ try: this_team = Team.get_by_id(team_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No team found with id {team_id}') if not csv: db.close() raise HTTPException( status_code=400, detail='The /teams/{team_id}/cards endpoint only supports csv output.' ) all_cards = (Card .select() .join(Player) .join(Rarity) .where(Card.team == this_team) .order_by(-Card.player.rarity.value, Card.player.p_name) ) if all_cards.count() == 0: db.close() raise HTTPException(status_code=404, detail=f'No cards found') data_list = [[ 'cardset', 'player', 'rarity', 'image', 'image2', 'pos_1', 'pos_2', 'pos_3', 'pos_4', 'pos_5', 'pos_6', 'pos_7', 'pos_8', 'cost', 'mlbclub', 'franchise', 'set_num', 'bbref_id', 'player_id', 'card_id' ]] for line in all_cards: data_list.append( [ line.player.cardset, line.player.p_name, line.player.rarity, line.player.image, line.player.image2, line.player.pos_1, line.player.pos_2, line.player.pos_3, line.player.pos_4, line.player.pos_5, line.player.pos_6, line.player.pos_7, line.player.pos_8, line.player.cost, line.player.mlbclub, line.player.franchise, line.player.set_num, line.player.bbref_id, line.player.player_id, line.id ] ) return_val = DataFrame(data_list).to_csv(header=False, index=False) db.close() return Response(content=return_val, media_type='text/csv') @router.post('') async def post_team(team: TeamModel, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to post teams. This event has been logged.' ) dupe_team = Team.get_or_none(Team.season == team.season, Team.abbrev == team.abbrev) if dupe_team: db.close() raise HTTPException(status_code=400, detail=f'There is already a season {team.season} team using {team.abbrev}') this_team = Team( abbrev=team.abbrev, sname=team.sname, lname=team.lname, gmid=team.gmid, gmname=team.gmname, wallet=team.wallet, gsheet=team.gsheet, team_value=team.team_value, collection_value=team.collection_value, logo=team.logo, color=team.color, ranking=team.ranking, season=team.season, career=team.ps_shiny, has_guide=team.has_guide, is_ai=team.is_ai ) saved = this_team.save() if saved == 1: return_team = model_to_dict(this_team) db.close() return return_team else: raise HTTPException(status_code=418, detail='Well slap my ass and call me a teapot; I could not save that team') @router.post('/new-season/{new_season}') async def team_season_update(new_season: int, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to post teams. This event has been logged.' ) r_query = Team.update(ranking=1000, season=new_season, wallet=Team.wallet + 250).execute() current = Current.latest() current.season = new_season current.save() db.close() return {'detail': f'Team rankings, season, and wallet updated for season {new_season}'} @router.post('/{team_id}/money/{delta}') async def team_update_money(team_id: int, delta: int, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to adjust wallets. This event has been logged.' ) try: this_team = Team.get_by_id(team_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No team found with id {team_id}') this_team.wallet += delta if this_team.save() == 1: return_team = model_to_dict(this_team) db.close() return return_team else: raise HTTPException(status_code=418, detail='Well slap my ass and call me a teapot; I could not save that team') @router.patch('/{team_id}') async def patch_team( team_id, sname: Optional[str] = None, lname: Optional[str] = None, gmid: Optional[int] = None, gmname: Optional[str] = None, gsheet: Optional[str] = None, team_value: Optional[int] = None, collection_value: Optional[int] = None, logo: Optional[str] = None, color: Optional[str] = None, season: Optional[int] = None, ps_shiny: Optional[int] = None, wallet_delta: Optional[int] = None, has_guide: Optional[bool] = None, is_ai: Optional[bool] = None, ranking: Optional[int] = None, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to delete teams. This event has been logged.' ) try: this_team = Team.get_by_id(team_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No team found with id {team_id}') if sname is not None: this_team.sname = sname if lname is not None: this_team.lname = lname if gmid is not None: this_team.gmid = gmid if gmname is not None: this_team.gmname = gmname if gsheet is not None: this_team.gsheet = gsheet if team_value is not None: this_team.team_value = team_value if collection_value is not None: this_team.collection_value = collection_value if logo is not None: this_team.logo = logo if color is not None: this_team.color = color if season is not None: this_team.season = season if ps_shiny is not None: this_team.career = ps_shiny if ranking is not None: this_team.ranking = ranking if wallet_delta is not None: this_team.wallet += wallet_delta if has_guide is not None: if has_guide: this_team.has_guide = 1 else: this_team.has_guide = 0 if is_ai is not None: if is_ai: this_team.is_ai = 1 else: this_team.is_ai = 0 if this_team.save() == 1: return_team = model_to_dict(this_team) db.close() return return_team else: raise HTTPException(status_code=418, detail='Well slap my ass and call me a teapot; I could not save that team') @router.delete('/{team_id}') async def delete_team(team_id, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to delete teams. This event has been logged.' ) try: this_team = Team.get_by_id(team_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No team found with id {team_id}') count = this_team.delete_instance() db.close() if count == 1: raise HTTPException(status_code=200, detail=f'Team {team_id} has been deleted') else: raise HTTPException(status_code=500, detail=f'Team {team_id} was not deleted')