import copy from datetime import datetime import pandas as pd from fastapi import APIRouter, Depends, HTTPException, Response, Query from typing import Optional, Literal import logging import pydantic from pandas import DataFrame from ..db_engine import ( db, Team, model_to_dict, fn, Pack, Card, Player, Paperdex, Notification, PackType, Rarity, Current, query_to_csv, complex_data_to_csv, CARDSETS, CardPosition, BattingCardRatings, BattingCard, PitchingCard, PitchingCardRatings, StratGame, LIVE_PROMO_CARDSET_ID, ) from ..dependencies import ( oauth2_scheme, valid_token, LOG_DATA, int_timestamp, PRIVATE_IN_SCHEMA, ) logging.basicConfig( filename=LOG_DATA["filename"], format=LOG_DATA["format"], level=LOG_DATA["log_level"], ) router = APIRouter(prefix="/api/v2/teams", tags=["teams"]) class TeamModel(pydantic.BaseModel): abbrev: str sname: str lname: str gmid: int gmname: str wallet: int = 0 gsheet: str team_value: int = 0 collection_value: int = 0 logo: Optional[str] = None color: Optional[str] = None season: int ps_shiny: Optional[int] = 0 ranking: Optional[int] = 1000 has_guide: Optional[bool] = False is_ai: Optional[bool] = False @router.get("") async def get_teams( season: Optional[int] = None, gm_id: Optional[int] = None, abbrev: Optional[str] = None, tv_min: Optional[int] = None, tv_max: Optional[int] = None, cv_min: Optional[int] = None, cv_max: Optional[int] = None, ps_shiny_min: Optional[int] = None, ps_shiny_max: Optional[int] = None, ranking_min: Optional[int] = None, ranking_max: Optional[int] = None, has_guide: Optional[bool] = None, sname: Optional[str] = None, lname: Optional[str] = None, is_ai: Optional[bool] = None, event_id: Optional[int] = None, limit: Optional[int] = None, csv: Optional[bool] = False, ): """ Param: season: int Param: team_abbrev: string Param: owner_id: int """ if season: all_teams = Team.select_season(season) else: all_teams = Team.select() # if all_teams.count() == 0: # db.close() # raise HTTPException(status_code=404, detail=f'There are no teams to filter') if gm_id is not None: all_teams = all_teams.where(Team.gmid == gm_id) if abbrev is not None: all_teams = all_teams.where(fn.Lower(Team.abbrev) == abbrev.lower()) if sname is not None: all_teams = all_teams.where(fn.Lower(Team.sname) == sname.lower()) if lname is not None: all_teams = all_teams.where(fn.Lower(Team.lname) == lname.lower()) if tv_min is not None: all_teams = all_teams.where(Team.team_value >= tv_min) if tv_max is not None: all_teams = all_teams.where(Team.team_value <= tv_max) if cv_min is not None: all_teams = all_teams.where(Team.collection_value >= cv_min) if cv_max is not None: all_teams = all_teams.where(Team.collection_value <= cv_max) if ps_shiny_min is not None: all_teams = all_teams.where(Team.career >= ps_shiny_min) if ps_shiny_max is not None: all_teams = all_teams.where(Team.career <= ps_shiny_max) if ranking_min is not None: all_teams = all_teams.where(Team.ranking >= ranking_min) if ranking_max is not None: all_teams = all_teams.where(Team.ranking <= ranking_max) if ranking_max is not None: all_teams = all_teams.where(Team.ranking <= ranking_max) if has_guide is not None: # Use boolean comparison (PostgreSQL-compatible) if not has_guide: all_teams = all_teams.where(Team.has_guide == False) else: all_teams = all_teams.where(Team.has_guide == True) if is_ai is not None: all_teams = all_teams.where(Team.is_ai) if event_id is not None: all_teams = all_teams.where(Team.event_id == event_id) # Default ordering for PostgreSQL compatibility all_teams = all_teams.order_by(Team.id) if limit is not None: all_teams = all_teams.limit(limit) if csv: return_val = query_to_csv(all_teams, exclude=[Team.career]) db.close() return Response(content=return_val, media_type="text/csv") else: return_teams = {"count": all_teams.count(), "teams": []} for x in all_teams: return_teams["teams"].append(model_to_dict(x)) db.close() return return_teams @router.get("/{team_id}") async def get_one_team(team_id, inc_packs: bool = True, csv: Optional[bool] = False): try: this_team = Team.get_by_id(team_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f"No team found with id {team_id}") p_query = Pack.select().where( (Pack.team == this_team) & (Pack.open_time.is_null(True)) ) if csv: data = model_to_dict(this_team) data["sealed_packs"] = p_query.count() return_val = complex_data_to_csv([data]) else: return_val = model_to_dict(this_team) if inc_packs: return_val["sealed_packs"] = [model_to_dict(x) for x in p_query] db.close() return return_val def get_scouting_dfs(allowed_players, position: str): logging.info(f"allowed_players: {allowed_players}\nposition: {position}") positions = CardPosition.select().where( (CardPosition.player << allowed_players) & (CardPosition.position == position) ) pos_players = [x.player.player_id for x in positions] logging.info(f"pos_players: {pos_players}") if len(pos_players) == 0: return None all_cards = BattingCard.select().where(BattingCard.player << pos_players) all_ratings = BattingCardRatings.select().where( BattingCardRatings.battingcard << all_cards ) vl_query = all_ratings.where(BattingCardRatings.vs_hand == "L") vr_query = all_ratings.where(BattingCardRatings.vs_hand == "R") vl_vals = [model_to_dict(x) for x in vl_query] for x in vl_vals: x.update(x["battingcard"]) x["player_id"] = x["battingcard"]["player"]["player_id"] x["player_name"] = x["battingcard"]["player"]["p_name"] x["rarity"] = x["battingcard"]["player"]["rarity"]["name"] x["cardset_id"] = x["battingcard"]["player"]["cardset"]["id"] x["cardset_name"] = x["battingcard"]["player"]["cardset"]["name"] del x["battingcard"], x["player"] vr_vals = [model_to_dict(x) for x in vr_query] for x in vr_vals: x["player_id"] = x["battingcard"]["player"]["player_id"] del x["battingcard"] vl = pd.DataFrame(vl_vals) vr = pd.DataFrame(vr_vals) bat_df = pd.merge(vl, vr, on="player_id", suffixes=("_vl", "_vr")).set_index( "player_id", drop=False ) logging.info(f"cols:\n{list(bat_df.columns)}") series_list = [] series_list.append( pd.Series( dict([(x.player.player_id, x.range) for x in positions]), name=f"Range {position}", ) ) series_list.append( pd.Series( dict([(x.player.player_id, x.error) for x in positions]), name=f"Error {position}", ) ) series_list.append( pd.Series( dict([(x.player.player_id, x.innings) for x in positions]), name=f"Innings {position}", ) ) if position in ["LF", "CF", "RF"]: series_list.append( pd.Series( dict([(x.player.player_id, x.arm) for x in positions]), name=f"Arm OF" ) ) elif position == "C": series_list.append( pd.Series( dict([(x.player.player_id, x.arm) for x in positions]), name=f"Arm C" ) ) series_list.append( pd.Series( dict([(x.player.player_id, x.pb) for x in positions]), name=f"PB C" ) ) series_list.append( pd.Series( dict([(x.player.player_id, x.overthrow) for x in positions]), name=f"Throw C", ) ) db.close() def get_total_ops(df_data): ops_vl = df_data["obp_vl"] + df_data["slg_vl"] ops_vr = df_data["obp_vr"] + df_data["slg_vr"] return (ops_vr + ops_vl + min(ops_vl, ops_vr)) / 3 logging.debug(f"series_list: {series_list}") ratings = bat_df.join(series_list) ratings["total_OPS"] = ratings.apply(get_total_ops, axis=1) return ratings @router.get("/{team_id}/lineup/{difficulty_name}") async def get_team_lineup( team_id: int, difficulty_name: str, pitcher_name: str, build_type: str, cardset_id: list = Query(default=None), backup_cardset_id: list = Query(default=None), ): """ d_rank: int - 10: best overall, 9: prioritize range, 8: prioritize error """ this_team = Team.get_or_none(Team.id == team_id) if this_team is None: db.close() raise HTTPException(status_code=404, detail=f"Team id {team_id} not found") if difficulty_name not in CARDSETS.keys() and difficulty_name != "exhibition": db.close() raise HTTPException( status_code=400, detail=f"Difficulty name {difficulty_name} not a valid check", ) # all_players = Player.select().where( # (fn.Lower(Player.p_name) != pitcher_name.lower()) & (Player.mlbclub == this_team.lname) # ) all_players = Player.select().where(Player.franchise == this_team.sname) if difficulty_name == "exhibition": logging.info(f"pulling an exhibition lineup") if cardset_id is None: db.close() raise HTTPException( status_code=400, detail=f"Must provide at least one cardset_id for exhibition lineups", ) legal_players = all_players.where(Player.cardset_id << cardset_id) if backup_cardset_id is not None: backup_players = all_players.where(Player.cardset_id << backup_cardset_id) else: backup_players = all_players.where( Player.cardset_id << CARDSETS["minor-league"]["primary"] ) else: legal_players = all_players.where( Player.cardset_id << CARDSETS[difficulty_name]["primary"] ) if "secondary" in CARDSETS[difficulty_name]: backup_players = all_players.where( Player.cardset_id << CARDSETS[difficulty_name]["secondary"] ) else: backup_players = None logging.info(f"legal_players: {legal_players.count()}") logging.info(f"legal query: {legal_players}") if backup_players is not None: logging.info(f"backup_players: {backup_players.count()}") player_names = [] starting_nine = { "C": {"player": None, "vl": None, "vr": None, "ops": 0}, "1B": {"player": None, "vl": None, "vr": None, "ops": 0}, "2B": {"player": None, "vl": None, "vr": None, "ops": 0}, "3B": {"player": None, "vl": None, "vr": None, "ops": 0}, "SS": {"player": None, "vl": None, "vr": None, "ops": 0}, "LF": {"player": None, "vl": None, "vr": None, "ops": 0}, "CF": {"player": None, "vl": None, "vr": None, "ops": 0}, "RF": {"player": None, "vl": None, "vr": None, "ops": 0}, "DH": {"player": None, "vl": None, "vr": None, "ops": 0}, } def get_bratings(player_id): this_bcard = BattingCard.get_or_none(BattingCard.player_id == player_id) vl_ratings = BattingCardRatings.get_or_none( BattingCardRatings.battingcard == this_bcard, BattingCardRatings.vs_hand == "L", ) vl_ops = vl_ratings.obp + vl_ratings.slg vr_ratings = BattingCardRatings.get_or_none( BattingCardRatings.battingcard == this_bcard, BattingCardRatings.vs_hand == "R", ) vr_ops = vr_ratings.obp + vr_ratings.slg return ( model_to_dict(vl_ratings), model_to_dict(vr_ratings), (vl_ops + vr_ops + min(vl_ops, vr_ops)) / 3, ) # IDEA: Rank guys by their bat per-position and take the best one that meets a threshold of defensive ability for position in starting_nine.keys(): if position == "DH": # all_bcards = BattingCard.select().where(BattingCard.player << legal_players) # all_batters = BattingCardRatings.select().where( # BattingCardRatings.battingcard << all_bcards # ).order_by(BattingCardRatings.obp + BattingCardRatings.sl) # # for x in all_batters: # if x.battingcard.player.p_name not in player_names: # starting_nine['DH'] = x.battingcard.player # break logging.debug(f"Searching for a DH!") dh_query = legal_players.order_by(Player.cost.desc()) for x in dh_query: logging.debug(f"checking {x.p_name} for {position}") if x.p_name not in player_names and "P" not in x.pos_1: logging.debug(f"adding!") starting_nine["DH"]["player"] = model_to_dict(x) try: vl, vr, total_ops = get_bratings(x.player_id) except AttributeError as e: logging.debug(f"Could not find batting lines") else: # starting_nine[position]['vl'] = vl # starting_nine[position]['vr'] = vr starting_nine[position]["vl"] = vl["obp"] + vl["slg"] starting_nine[position]["vr"] = vr["obp"] + vr["slg"] starting_nine["DH"]["ops"] = total_ops player_names.append(x.p_name) break if starting_nine["DH"]["player"] is None: dh_query = backup_players.order_by(Player.cost.desc()) for x in dh_query: logging.debug(f"checking {x.p_name} for {position}") if x.p_name not in player_names: logging.debug(f"adding!") starting_nine["DH"]["player"] = model_to_dict(x) try: vl, vr, total_ops = get_bratings(x.player_id) except AttributeError as e: logging.debug(f"Could not find batting lines") else: vl, vr, total_ops = get_bratings(x.player_id) starting_nine[position]["vl"] = vl["obp"] + vl["slg"] starting_nine[position]["vr"] = vr["obp"] + vr["slg"] starting_nine["DH"]["ops"] = total_ops player_names.append(x.p_name) break else: pos_group = CardPosition.select().where( (CardPosition.position == position) & (CardPosition.player << legal_players) ) backup_group = ( CardPosition.select() .where( (CardPosition.position == position) & (CardPosition.player << backup_players) ) .order_by(CardPosition.innings.desc()) ) if difficulty_name in ["minor-league", "gauntlet-3", "gauntlet-5"]: pos_group = pos_group.order_by(CardPosition.innings.desc()) for x in pos_group: logging.debug(f"checking {x.player.p_name} for {position}") if ( x.player.p_name not in player_names and x.player.p_name.lower() != pitcher_name ): logging.debug(f"adding!") starting_nine[position]["player"] = model_to_dict(x.player) vl, vr, total_ops = get_bratings(x.player.player_id) starting_nine[position]["vl"] = vl starting_nine[position]["vr"] = vr # starting_nine[position]['vl'] = vl.obp_vl + vl.slg_vl # starting_nine[position]['vr'] = vr.obp_vr + vr.slg_vr starting_nine[position]["ops"] = total_ops player_names.append(x.player.p_name) break # elif difficulty_name in ['major-league', 'flashback', 'hall-of-fame']: else: logging.debug(f"entering difficulty: {difficulty_name}") eligible_cards = get_scouting_dfs(legal_players, position) logging.debug(f"got dataframe:\n{eligible_cards}") # if position == '1B': # return Response(content=eligible_cards.to_csv(index=False), media_type='text/csv') def rank_cards(df_data): if position in ["C", "SS", "2B", "CF"]: r_mult = 0.05 e_mult = -0.01 else: r_mult = 0.025 e_mult = -0.005 r_mod = (3 - df_data[f"Range {position}"]) * r_mult e_mod = df_data[f"Error {position}"] * e_mult i_mult = df_data[f"Innings {position}"] / 1000 # final_ops = df_data['total_OPS'] + r_mod + e_mod # final_ops = (df_data['total_OPS'] * i_mult) + r_mod + e_mod final_ops = (df_data["total_OPS"] + r_mod + e_mod) * i_mult logging.debug( f"{df_data.player_name} total OPS: {df_data.total_OPS} / " f"final OPS: {final_ops}" ) return final_ops if eligible_cards is not None and len(eligible_cards.index) >= 1: eligible_cards["final_ops"] = eligible_cards.apply( rank_cards, axis=1 ) logging.debug(f"final_ops:\n{eligible_cards['final_ops']}") eligible_cards.sort_values( by=["final_ops"], ascending=False, inplace=True ) this_row = None for x in range(len(eligible_cards.index)): if eligible_cards.iloc[x].player_name not in player_names: this_row = eligible_cards.iloc[x] break if this_row is not None: starting_nine[position]["player"] = model_to_dict( Player.get_by_id(this_row.player_id) ) starting_nine[position]["vl"] = ( this_row.obp_vl + this_row.slg_vl ) starting_nine[position]["vr"] = ( this_row.obp_vr + this_row.slg_vr ) starting_nine[position]["ops"] = this_row.total_OPS player_names.append(this_row.player_name) logging.debug( f"pos_group: {pos_group}\n{starting_nine}\n{player_names}\n\n" ) if starting_nine[position]["player"] is None: for x in backup_group: logging.info(f"checking {x.player.p_name} for {position}") if ( x.player.p_name not in player_names and x.player.p_name.lower() != pitcher_name ): logging.debug(f"adding!") starting_nine[position]["player"] = model_to_dict(x.player) vl, vr, total_ops = get_bratings(x.player.player_id) starting_nine[position]["vl"] = vl["obp"] + vl["slg"] starting_nine[position]["vr"] = vr["obp"] + vr["slg"] starting_nine[position]["ops"] = total_ops player_names.append(x.player.p_name) break # all_bcards = BattingCard.select().where(BattingCard.player << starting_nine.values()) # all_ratings = BattingCardRatings.select().where(BattingCardRatings.battingcard << all_bcards) # # vl_query = all_ratings.where(BattingCardRatings.vs_hand == 'L') # vr_query = all_ratings.where(BattingCardRatings.vs_hand == 'R') # # vl_vals = [model_to_dict(x) for x in vl_query] # for x in vl_vals: # x.update(x['battingcard']) # x['player_id'] = x['battingcard']['player']['player_id'] # x['player_name'] = x['battingcard']['player']['p_name'] # x['rarity'] = x['battingcard']['player']['rarity']['name'] # x['cardset_id'] = x['battingcard']['player']['cardset']['id'] # x['cardset_name'] = x['battingcard']['player']['cardset']['name'] # del x['player'] # # vr_vals = [model_to_dict(x) for x in vr_query] # for x in vr_vals: # x['player_id'] = x['battingcard']['player']['player_id'] # del x['battingcard'] # # vl = pd.DataFrame(vl_vals) # vr = pd.DataFrame(vr_vals) # db.close() # # output = pd.merge(vl, vr, on='player_id', suffixes=('_vl', '_vr')) # # def get_total_ops(df_data): # ops_vl = df_data['obp_vL'] + df_data['slg_vL'] # ops_vr = df_data['obp_vR'] + df_data['slg_vR'] # return (ops_vr + ops_vl + min(ops_vl, ops_vr)) / 3 # output['total_OPS'] = output.apply(get_total_ops, axis=1) # output = output.sort_values(by=['total_OPS'], ascending=False) sorted_nine = sorted( starting_nine.items(), key=lambda item: item[1]["ops"], reverse=True ) return {"json": dict(sorted_nine), "array": sorted_nine} def sort_pitchers(pitching_card_query) -> DataFrame | None: all_s = [model_to_dict(x, recurse=False) for x in pitching_card_query] if len(all_s) == 0: logging.error(f"Empty pitching_card_query: {pitching_card_query}") return None pitcher_df = pd.DataFrame(all_s).set_index("player", drop=False) logging.debug(f"pitcher_df: {pitcher_df}") def get_total_ops(df_data): vlval = PitchingCardRatings.get_or_none( PitchingCardRatings.pitchingcard_id == df_data["id"], PitchingCardRatings.vs_hand == "L", ) vrval = PitchingCardRatings.get_or_none( PitchingCardRatings.pitchingcard_id == df_data["id"], PitchingCardRatings.vs_hand == "R", ) ops_vl = vlval.obp + vlval.slg ops_vr = vrval.obp + vrval.slg # TODO: should this be max?? return (ops_vr + ops_vl + min(ops_vl, ops_vr)) / 3 pitcher_df["total_ops"] = pitcher_df.apply(get_total_ops, axis=1) return pitcher_df.sort_values(by="total_ops") @router.get("/{team_id}/sp/{difficulty_name}") async def get_team_sp( team_id: int, difficulty_name: str, sp_rank: int, cardset_id: list = Query(default=None), backup_cardset_id: list = Query(default=None), ): logging.info( f"get_team_sp - team_id: {team_id} / difficulty_name: {difficulty_name} / sp_rank: {sp_rank}" ) this_team = Team.get_or_none(Team.id == team_id) if this_team is None: db.close() raise HTTPException(status_code=404, detail=f"Team id {team_id} not found") if difficulty_name not in CARDSETS.keys() and difficulty_name != "exhibition": db.close() raise HTTPException( status_code=400, detail=f"Difficulty name {difficulty_name} not a valid check", ) all_players = Player.select().where(Player.franchise == this_team.sname) if difficulty_name == "exhibition": logging.info(f"pulling an exhibition lineup") if cardset_id is None: db.close() raise HTTPException( status_code=400, detail=f"Must provide at least one cardset_id for exhibition lineups", ) legal_players = all_players.where(Player.cardset_id << cardset_id) if backup_cardset_id is not None: backup_players = all_players.where(Player.cardset_id << backup_cardset_id) else: backup_players = all_players.where( Player.cardset_id << CARDSETS["minor-league"]["primary"] ) else: legal_players = all_players.where( Player.cardset_id << CARDSETS[difficulty_name]["primary"] ) if "secondary" in CARDSETS[difficulty_name]: backup_players = all_players.where( Player.cardset_id << CARDSETS[difficulty_name]["secondary"] ) else: backup_players = None def sort_starters(starter_query) -> DataFrame | None: all_s = [model_to_dict(x, recurse=False) for x in starter_query] if len(all_s) == 0: logging.error(f"Empty starter_query: {starter_query}") return None starter_df = pd.DataFrame(all_s).set_index("player", drop=False) logging.debug(f"starter_df: {starter_df}") def get_total_ops(df_data): vlval = PitchingCardRatings.get_or_none( PitchingCardRatings.pitchingcard_id == df_data["id"], PitchingCardRatings.vs_hand == "L", ) vrval = PitchingCardRatings.get_or_none( PitchingCardRatings.pitchingcard_id == df_data["id"], PitchingCardRatings.vs_hand == "R", ) ops_vl = vlval.obp + vlval.slg ops_vr = vrval.obp + vrval.slg return (ops_vr + ops_vl + min(ops_vl, ops_vr)) / 3 starter_df["total_ops"] = starter_df.apply(get_total_ops, axis=1) return starter_df.sort_values(by="total_ops") # Find SP in primary cardsets s_query = ( PitchingCard.select() .join(Player) .where( (PitchingCard.player << legal_players) & (PitchingCard.starter_rating >= 4) ) ) all_starters = sort_starters(s_query) logging.debug(f"sorted: {all_starters}") if all_starters is not None and len(all_starters.index) >= sp_rank: this_player_id = all_starters.iloc[sp_rank - 1].player this_player = model_to_dict(Player.get_by_id(this_player_id), recurse=False) db.close() return this_player if all_starters is not None and len(all_starters.index) > 0: this_player_id = all_starters.iloc[len(all_starters.index) - 1].player this_player = model_to_dict(Player.get_by_id(this_player_id), recurse=False) db.close() return this_player # Include backup cardsets s_query = PitchingCard.select().where( (PitchingCard.player << backup_players) & (PitchingCard.starter_rating >= 4) ) all_starters = sort_starters(s_query) logging.debug(f"sorted: {all_starters}") if all_starters is not None and len(all_starters.index) >= sp_rank: this_player_id = all_starters.iloc[sp_rank - 1].player this_player = model_to_dict(Player.get_by_id(this_player_id), recurse=False) db.close() return this_player if all_starters is not None and len(all_starters.index) > 0: this_player_id = all_starters.iloc[len(all_starters.index) - 1].player this_player = model_to_dict(Player.get_by_id(this_player_id), recurse=False) db.close() return this_player raise HTTPException( status_code=400, detail=f"No SP #{sp_rank} found for Team {team_id}" ) @router.get("/{team_id}/rp/{difficulty_name}") async def get_team_rp( team_id: int, difficulty_name: str, need: Literal["length", "setup", "closer", "middle"], used_pitcher_ids: list = Query(default=[]), cardset_id: list = Query(default=None), backup_cardset_id: list = Query(default=None), ): logging.info( f"get_team_rp - team_id: {team_id} / difficulty_name: {difficulty_name} / need: {need} " f"/ used_pitcher_ids: {used_pitcher_ids}" ) this_team = Team.get_or_none(Team.id == team_id) if this_team is None: db.close() raise HTTPException(status_code=404, detail=f"Team id {team_id} not found") if difficulty_name not in CARDSETS.keys() and difficulty_name != "exhibition": db.close() raise HTTPException( status_code=400, detail=f"Difficulty name {difficulty_name} not a valid check", ) all_players = Player.select().where( (Player.franchise == this_team.sname) & (Player.player_id.not_in(used_pitcher_ids)) ) if difficulty_name == "exhibition": logging.info(f"pulling an exhibition RP") if cardset_id is None: db.close() raise HTTPException( status_code=400, detail=f"Must provide at least one cardset_id for exhibition lineups", ) legal_players = all_players.where(Player.cardset_id << cardset_id) if backup_cardset_id is not None: backup_players = all_players.where(Player.cardset_id << backup_cardset_id) else: backup_players = all_players.where( Player.cardset_id << CARDSETS["minor-league"]["primary"] ) else: legal_players = all_players.where( Player.cardset_id << CARDSETS[difficulty_name]["primary"] ) if "secondary" in CARDSETS[difficulty_name]: backup_players = all_players.where( Player.cardset_id << CARDSETS[difficulty_name]["secondary"] ) else: backup_players = None logging.info(f"legal_players: {legal_players.count()}") logging.info(f"legal query: {legal_players}") if need == "closer": for query in [ PitchingCard.select() .join(Player) .where( (PitchingCard.player << legal_players) & (PitchingCard.closer_rating >= 3) & (PitchingCard.starter_rating == 1) ), PitchingCard.select() .join(Player) .where( (PitchingCard.player << legal_players) & (PitchingCard.closer_rating >= 1) & (PitchingCard.starter_rating == 1) ), PitchingCard.select() .join(Player) .where( (PitchingCard.player << backup_players) & (PitchingCard.closer_rating >= 3) & (PitchingCard.starter_rating == 1) ), PitchingCard.select() .join(Player) .where( (PitchingCard.player << backup_players) & (PitchingCard.closer_rating >= 1) & (PitchingCard.starter_rating == 1) ), PitchingCard.select() .join(Player) .where( (PitchingCard.player << backup_players) & (PitchingCard.starter_rating < 4) ), ]: all_relievers = sort_pitchers(query) if all_relievers is not None: logging.info(f"RP query: {query}") this_player_id = all_relievers.iloc[0].player this_player = model_to_dict( Player.get_by_id(this_player_id), recurse=False ) db.close() return this_player elif need == "setup": for query in [ PitchingCard.select() .join(Player) .where( (PitchingCard.player << legal_players) & (PitchingCard.starter_rating == 1) ), PitchingCard.select() .join(Player) .where( (PitchingCard.player << backup_players) & (PitchingCard.starter_rating < 4) ), ]: all_relievers = sort_pitchers(query) if all_relievers is not None and len(all_relievers.index) >= 2: this_player_id = all_relievers.iloc[1].player this_player = model_to_dict( Player.get_by_id(this_player_id), recurse=False ) db.close() return this_player elif need == "length" or len(used_pitcher_ids) > 4: for query in [ PitchingCard.select() .join(Player) .where( (PitchingCard.player << legal_players) & (PitchingCard.relief_rating >= 3) & (PitchingCard.starter_rating < 4) ), PitchingCard.select() .join(Player) .where( (PitchingCard.player << legal_players) & (PitchingCard.relief_rating >= 2) & (PitchingCard.starter_rating < 4) ), PitchingCard.select() .join(Player) .where( (PitchingCard.player << backup_players) & (PitchingCard.relief_rating >= 2) & (PitchingCard.starter_rating < 4) ), ]: all_relievers = sort_pitchers(query) if all_relievers is not None: this_player_id = all_relievers.iloc[0].player this_player = model_to_dict( Player.get_by_id(this_player_id), recurse=False ) db.close() return this_player elif need == "middle": for query in [ PitchingCard.select() .join(Player) .where( (PitchingCard.player << legal_players) & (PitchingCard.starter_rating == 1) ), PitchingCard.select() .join(Player) .where( (PitchingCard.player << backup_players) & (PitchingCard.starter_rating < 4) ), ]: all_relievers = sort_pitchers(query) if all_relievers is not None and len(all_relievers.index) >= 3: this_player_id = all_relievers.iloc[2].player this_player = model_to_dict( Player.get_by_id(this_player_id), recurse=False ) db.close() return this_player logging.info(f"Falling to last chance pitcher") all_relievers = sort_pitchers( PitchingCard.select() .join(Player) .where( (PitchingCard.player << backup_players) | (PitchingCard.player << legal_players) ) ) if all_relievers is not None: this_player_id = all_relievers.iloc[len(all_relievers.index) - 1].player this_player = model_to_dict(Player.get_by_id(this_player_id), recurse=False) db.close() return this_player raise HTTPException(status_code=400, detail=f"No RP found for Team {team_id}") @router.get("/{team_id}/season-record/{season}") async def get_team_record(team_id: int, season: int): all_games = StratGame.select().where( ((StratGame.away_team_id == team_id) | (StratGame.home_team_id == team_id)) & (StratGame.season == season) & (StratGame.short_game == False) ) template = { "ARI": [0, 0, 0], "ATL": [0, 0, 0], "BAL": [0, 0, 0], "BOS": [0, 0, 0], "CHC": [0, 0, 0], "CHW": [0, 0, 0], "CIN": [0, 0, 0], "CLE": [0, 0, 0], "COL": [0, 0, 0], "DET": [0, 0, 0], "NYY": [0, 0, 0], "TBR": [0, 0, 0], "TOR": [0, 0, 0], "PHI": [0, 0, 0], "MIA": [0, 0, 0], "NYM": [0, 0, 0], "WSN": [0, 0, 0], "MIN": [0, 0, 0], "KCR": [0, 0, 0], "HOU": [0, 0, 0], "TEX": [0, 0, 0], "SEA": [0, 0, 0], "LAA": [0, 0, 0], "OAK": [0, 0, 0], "MIL": [0, 0, 0], "PIT": [0, 0, 0], "STL": [0, 0, 0], "LAD": [0, 0, 0], "SDP": [0, 0, 0], "SFG": [0, 0, 0], "ALAS": [0, 0, 0], "NLAS": [0, 0, 0], } standings = { "minor-league": copy.deepcopy(template), "major-league": copy.deepcopy(template), "hall-of-fame": copy.deepcopy(template), "flashback": copy.deepcopy(template), "unlimited": copy.deepcopy(template), "ranked": copy.deepcopy(template), "exhibition": copy.deepcopy(template), } for game in all_games: run_diff = game.away_score - game.home_score if run_diff > 0: # Away team won if game.away_team_id == team_id: # Human is away team standings[game.game_type][game.home_team.abbrev][0] += 1 standings[game.game_type][game.home_team.abbrev][2] += run_diff else: # Human is home team standings[game.game_type][game.away_team.abbrev][1] += 1 standings[game.game_type][game.away_team.abbrev][2] -= run_diff elif run_diff < 0: # Home team won if game.away_team_id == team_id: # Human is away team if game.home_team.abbrev not in standings[game.game_type]: standings[game.game_type][game.home_team.abbrev] = [0, 0, 0] standings[game.game_type][game.home_team.abbrev][1] += 1 standings[game.game_type][game.home_team.abbrev][2] -= run_diff else: # Human is home team if game.away_team.abbrev not in standings[game.game_type]: standings[game.game_type][game.away_team.abbrev] = [0, 0, 0] standings[game.game_type][game.away_team.abbrev][0] += 1 standings[game.game_type][game.away_team.abbrev][2] -= run_diff # for lg_query in [minor_games, major_games, hof_games]: # this_lg = copy.deepcopy(template) # for x in range(1, 30): # team_games = lg_query.where((StratGame.away_team_id == x) | (StratGame.home_team_id == x)) # for game in team_games: db.close() return standings @router.get("/{team_id}/buy/players", include_in_schema=PRIVATE_IN_SCHEMA) async def team_buy_players(team_id: int, ids: str, ts: str): try: this_team = Team.get_by_id(team_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f"No team found with id {team_id}") if ts != this_team.team_hash(): logging.warning(f"Bad Team Secret: {ts} ({this_team.team_hash()})") db.close() raise HTTPException( status_code=401, detail=f"You are not authorized to buy {this_team.abbrev} cards. This event has been logged.", ) last_card = Card.select(Card.id).order_by(-Card.id).limit(1) lc_id = last_card[0].id all_ids = ids.split(",") conf_message = "" total_cost = 0 for player_id in all_ids: if player_id != "": try: this_player = Player.get_by_id(player_id) except Exception: db.close() raise HTTPException( status_code=404, detail=f"No player found with id {player_id} /// " f"{conf_message} purchased", ) # check wallet balance if this_team.wallet < this_player.cost: logging.error( f"{this_player} was not purchased. {this_team.lname} only has {this_team.wallet}₼, but " f"{this_player} costs {this_player.cost}₼." ) db.close() raise HTTPException( 200, detail=f"{this_player} was not purchased. {this_team.lname} only has {this_team.wallet}₼, but " f"{this_player} costs {this_player.cost}₼. /// {conf_message} purchased", ) # Create player card and update cost buy_price = this_player.cost total_cost += buy_price this_card = Card( player_id=this_player.player_id, team_id=this_team.id, value=buy_price ) Paperdex.get_or_create(team_id=team_id, player_id=this_player.player_id) this_card.save() this_player.change_on_buy() # Deduct card cost from team logging.info(f"{this_team.abbrev} starting wallet: {this_team.wallet}") this_team.wallet -= buy_price this_team.save() logging.info(f"{this_team.abbrev} ending wallet: {this_team.wallet}") # Post a notification if this_player.rarity.value >= 2: new_notif = Notification( created=datetime.now(), title=f"Price Change", desc="Modified by buying and selling", field_name=f"{this_player.description} " f"{this_player.p_name if this_player.p_name not in this_player.description else ''}", message=f"From {buy_price}₼ 📈 to **{this_player.cost}**₼", about=f"Player-{this_player.player_id}", ) new_notif.save() conf_message += ( f"{buy_price}₼ for {this_player.rarity.name} {this_player.p_name} " f"({this_player.cardset.name}), " ) # sheets.post_new_cards(SHEETS_AUTH, lc_id) raise HTTPException( status_code=200, detail=f"{conf_message} purchased. /// Total Cost: {total_cost}₼ /// " f"Final Wallet: {this_team.wallet}", ) @router.get("/{team_id}/buy/pack/{packtype_id}", include_in_schema=PRIVATE_IN_SCHEMA) async def team_buy_packs( team_id: int, packtype_id: int, ts: str, quantity: Optional[int] = 1 ): try: this_packtype = PackType.get_by_id(packtype_id) except Exception: db.close() raise HTTPException( status_code=404, detail=f"No pack type found with id {packtype_id}" ) try: this_team = Team.get_by_id(team_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f"No team found with id {team_id}") if ts != this_team.team_hash(): logging.warning(f"Bad Team Secret: {ts} ({this_team.team_hash()})") db.close() logging.warning( f"team: {this_team} / pack_type: {this_packtype} / secret: {ts} / " f"actual: {this_team.team_hash()}" ) raise HTTPException( status_code=401, detail=f"You are not authorized to buy {this_team.abbrev} packs. This event has been logged.", ) # check wallet balance total_cost = this_packtype.cost * quantity if this_team.wallet < total_cost: db.close() raise HTTPException( 200, detail=f"{this_packtype} was not purchased. {this_team.lname} only has {this_team.wallet} bucks, but " f"{this_packtype} costs {this_packtype.cost}.", ) all_packs = [] cardset_id = None if packtype_id == 9: cardset_id = LIVE_PROMO_CARDSET_ID for i in range(quantity): all_packs.append( Pack( team_id=this_team.id, pack_type_id=this_packtype.id, pack_cardset_id=cardset_id, ) ) # Deduct card cost from team logging.info(f"{this_team.abbrev} starting wallet: {this_team.wallet}") this_team.wallet -= total_cost this_team.save() logging.info(f"{this_team.abbrev} ending wallet: {this_team.wallet}") with db.atomic(): Pack.bulk_create(all_packs, batch_size=15) db.close() raise HTTPException( status_code=200, detail=f"Quantity {quantity} {this_packtype.name} pack{'s' if quantity > 1 else ''} have been purchased by " f"{this_team.lname} for {total_cost} bucks. You may close this window.", ) @router.get("/{team_id}/sell/cards", include_in_schema=PRIVATE_IN_SCHEMA) async def team_sell_cards(team_id: int, ids: str, ts: str): try: this_team = Team.get_by_id(team_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f"No team found with id {team_id}") if ts != this_team.team_hash(): logging.warning(f"Bad Team Secret: {ts} ({this_team.team_hash()})") db.close() raise HTTPException( status_code=401, detail=f"You are not authorized to sell {this_team.abbrev} cards. This event has been logged.", ) all_ids = ids.split(",") del_ids = [] conf_message = "" total_cost = 0 for card_id in all_ids: if card_id != "": try: this_card = Card.get_by_id(card_id) except Exception: db.close() raise HTTPException( status_code=404, detail=f"No card found with id {card_id}" ) del_ids.append(card_id) this_player = this_card.player if this_card.team != this_team: raise HTTPException( status_code=401, detail=f"Card id {card_id} ({this_player.p_name}) belongs to " f"{this_card.team.abbrev} and cannot be sold. /// {conf_message} sold", ) orig_price = this_player.cost sell_price = round(this_player.cost * 0.5) total_cost += sell_price # credit selling team's wallet if this_team.wallet is None: this_team.wallet = sell_price else: this_team.wallet += sell_price this_team.save() # decrease price of player this_player.change_on_sell() this_card.delete_instance() # post a notification if this_player.rarity.value >= 2: new_notif = Notification( created=datetime.now(), title=f"Price Change", desc="Modified by buying and selling", field_name=f"{this_player.description} " f"{this_player.p_name if this_player.p_name not in this_player.description else ''}", message=f"From {orig_price}₼ 📉 to **{this_player.cost}**₼", about=f"Player-{this_player.id}", ) new_notif.save() conf_message += ( f"{sell_price}₼ for {this_player.rarity.name} {this_player.p_name} " f"({this_player.cardset.name}), " ) # sheets.post_deletion(SHEETS_AUTH, del_ids) raise HTTPException( status_code=200, detail=f"{conf_message} sold. /// Total Earned: {total_cost}₼ /// " f"Final Wallet: {this_team.wallet}", ) @router.get("/{team_id}/cards") async def get_team_cards(team_id, csv: Optional[bool] = True): """ CSV output specifically targeting team roster sheet Parameters ---------- team_id csv """ try: this_team = Team.get_by_id(team_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f"No team found with id {team_id}") if not csv: db.close() raise HTTPException( status_code=400, detail="The /teams/{team_id}/cards endpoint only supports csv output.", ) all_cards = ( Card.select() .join(Player) .join(Rarity) .where(Card.team == this_team) .order_by(-Card.player.rarity.value, Card.player.p_name) ) if all_cards.count() == 0: db.close() raise HTTPException(status_code=404, detail=f"No cards found") card_vals = [model_to_dict(x) for x in all_cards] db.close() for x in card_vals: x.update(x["player"]) x["player_id"] = x["player"]["player_id"] x["player_name"] = x["player"]["p_name"] x["cardset_id"] = x["player"]["cardset"]["id"] x["cardset_name"] = x["player"]["cardset"]["name"] x["rarity"] = x["player"]["rarity"]["name"] x["card_id"] = x["id"] card_df = pd.DataFrame(card_vals) output = card_df[ [ "cardset_name", "player_name", "rarity", "image", "image2", "pos_1", "pos_2", "pos_3", "pos_4", "pos_5", "pos_6", "pos_7", "pos_8", "cost", "mlbclub", "franchise", "fangr_id", "bbref_id", "player_id", "card_id", ] ] return Response( content=pd.DataFrame(output).to_csv(index=False), media_type="text/csv" ) @router.post("", include_in_schema=PRIVATE_IN_SCHEMA) async def post_team(team: TeamModel, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f"Bad Token: {token}") db.close() raise HTTPException( status_code=401, detail="You are not authorized to post teams. This event has been logged.", ) dupe_team = Team.get_or_none(Team.season == team.season, Team.abbrev == team.abbrev) if dupe_team: db.close() raise HTTPException( status_code=400, detail=f"There is already a season {team.season} team using {team.abbrev}", ) this_team = Team( abbrev=team.abbrev, sname=team.sname, lname=team.lname, gmid=team.gmid, gmname=team.gmname, wallet=team.wallet, gsheet=team.gsheet, team_value=team.team_value, collection_value=team.collection_value, logo=team.logo, color=team.color, ranking=team.ranking, season=team.season, career=team.ps_shiny, has_guide=team.has_guide, is_ai=team.is_ai, ) saved = this_team.save() if saved == 1: return_team = model_to_dict(this_team) db.close() return return_team else: raise HTTPException( status_code=418, detail="Well slap my ass and call me a teapot; I could not save that team", ) @router.post("/new-season/{new_season}", include_in_schema=PRIVATE_IN_SCHEMA) async def team_season_update(new_season: int, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f"Bad Token: {token}") db.close() raise HTTPException( status_code=401, detail="You are not authorized to post teams. This event has been logged.", ) r_query = Team.update( ranking=1000, season=new_season, wallet=Team.wallet + 250, has_guide=False ).execute() current = Current.latest() current.season = new_season current.save() db.close() return { "detail": f"Team rankings, season, guides, and wallets updated for season {new_season}" } @router.post("/{team_id}/money/{delta}", include_in_schema=PRIVATE_IN_SCHEMA) async def team_update_money( team_id: int, delta: int, token: str = Depends(oauth2_scheme) ): if not valid_token(token): logging.warning(f"Bad Token: {token}") db.close() raise HTTPException( status_code=401, detail="You are not authorized to adjust wallets. This event has been logged.", ) try: this_team = Team.get_by_id(team_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f"No team found with id {team_id}") this_team.wallet += delta if this_team.save() == 1: return_team = model_to_dict(this_team) db.close() return return_team else: raise HTTPException( status_code=418, detail="Well slap my ass and call me a teapot; I could not save that team", ) @router.patch("/{team_id}", include_in_schema=PRIVATE_IN_SCHEMA) async def patch_team( team_id, sname: Optional[str] = None, lname: Optional[str] = None, gmid: Optional[int] = None, gmname: Optional[str] = None, gsheet: Optional[str] = None, team_value: Optional[int] = None, collection_value: Optional[int] = None, logo: Optional[str] = None, color: Optional[str] = None, season: Optional[int] = None, ps_shiny: Optional[int] = None, wallet_delta: Optional[int] = None, has_guide: Optional[bool] = None, is_ai: Optional[bool] = None, ranking: Optional[int] = None, token: str = Depends(oauth2_scheme), abbrev: Optional[str] = None, ): if not valid_token(token): logging.warning(f"Bad Token: {token}") db.close() raise HTTPException( status_code=401, detail="You are not authorized to delete teams. This event has been logged.", ) try: this_team = Team.get_by_id(team_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f"No team found with id {team_id}") if abbrev is not None: this_team.abbrev = abbrev if sname is not None: this_team.sname = sname if lname is not None: this_team.lname = lname if gmid is not None: this_team.gmid = gmid if gmname is not None: this_team.gmname = gmname if gsheet is not None: this_team.gsheet = gsheet if team_value is not None: this_team.team_value = team_value if collection_value is not None: this_team.collection_value = collection_value if logo is not None: this_team.logo = logo if color is not None: this_team.color = color if season is not None: this_team.season = season if ps_shiny is not None: this_team.career = ps_shiny if ranking is not None: this_team.ranking = ranking if wallet_delta is not None: this_team.wallet += wallet_delta if has_guide is not None: if has_guide: this_team.has_guide = 1 else: this_team.has_guide = 0 if is_ai is not None: if is_ai: this_team.is_ai = 1 else: this_team.is_ai = 0 if this_team.save() == 1: return_team = model_to_dict(this_team) db.close() return return_team else: raise HTTPException( status_code=418, detail="Well slap my ass and call me a teapot; I could not save that team", ) @router.delete("/{team_id}", include_in_schema=PRIVATE_IN_SCHEMA) async def delete_team(team_id, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f"Bad Token: {token}") db.close() raise HTTPException( status_code=401, detail="You are not authorized to delete teams. This event has been logged.", ) try: this_team = Team.get_by_id(team_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f"No team found with id {team_id}") count = this_team.delete_instance() db.close() if count == 1: raise HTTPException(status_code=200, detail=f"Team {team_id} has been deleted") else: raise HTTPException(status_code=500, detail=f"Team {team_id} was not deleted")