paper-dynasty-database/app/routers_v2/teams.py
2023-11-20 01:00:10 -06:00

1160 lines
47 KiB
Python

import copy
from datetime import datetime
import pandas as pd
from fastapi import APIRouter, Depends, HTTPException, Response, Query
from typing import Optional, Literal
import logging
import pydantic
from pandas import DataFrame
from ..db_engine import db, Team, model_to_dict, fn, Pack, Card, Player, Paperdex, Notification, PackType, \
Rarity, Current, query_to_csv, complex_data_to_csv, CARDSETS, CardPosition, BattingCardRatings, BattingCard, \
PitchingCard, PitchingCardRatings, StratGame
from ..dependencies import oauth2_scheme, valid_token, LOG_DATA, int_timestamp
logging.basicConfig(
filename=LOG_DATA['filename'],
format=LOG_DATA['format'],
level=LOG_DATA['log_level']
)
router = APIRouter(
prefix='/api/v2/teams',
tags=['teams']
)
class TeamModel(pydantic.BaseModel):
abbrev: str
sname: str
lname: str
gmid: int
gmname: str
wallet: int = 0
gsheet: str
team_value: int = 0
collection_value: int = 0
logo: Optional[str] = None
color: Optional[str] = None
season: int
ps_shiny: Optional[int] = 0
ranking: Optional[int] = 1000
has_guide: Optional[bool] = False
is_ai: Optional[bool] = False
@router.get('')
async def get_teams(
season: Optional[int] = None, gm_id: Optional[int] = None, abbrev: Optional[str] = None,
tv_min: Optional[int] = None, tv_max: Optional[int] = None, cv_min: Optional[int] = None,
cv_max: Optional[int] = None, ps_shiny_min: Optional[int] = None, ps_shiny_max: Optional[int] = None,
ranking_min: Optional[int] = None, ranking_max: Optional[int] = None, has_guide: Optional[bool] = None,
sname: Optional[str] = None, lname: Optional[str] = None, is_ai: Optional[bool] = None,
event_id: Optional[int] = None, limit: Optional[int] = None, csv: Optional[bool] = False):
"""
Param: season: int
Param: team_abbrev: string
Param: owner_id: int
"""
if season:
all_teams = Team.select_season(season)
else:
all_teams = Team.select()
# if all_teams.count() == 0:
# db.close()
# raise HTTPException(status_code=404, detail=f'There are no teams to filter')
if gm_id is not None:
all_teams = all_teams.where(Team.gmid == gm_id)
if abbrev is not None:
all_teams = all_teams.where(fn.Lower(Team.abbrev) == abbrev.lower())
if sname is not None:
all_teams = all_teams.where(fn.Lower(Team.sname) == sname.lower())
if lname is not None:
all_teams = all_teams.where(fn.Lower(Team.lname) == lname.lower())
if tv_min is not None:
all_teams = all_teams.where(Team.team_value >= tv_min)
if tv_max is not None:
all_teams = all_teams.where(Team.team_value <= tv_max)
if cv_min is not None:
all_teams = all_teams.where(Team.collection_value >= cv_min)
if cv_max is not None:
all_teams = all_teams.where(Team.collection_value <= cv_max)
if ps_shiny_min is not None:
all_teams = all_teams.where(Team.career >= ps_shiny_min)
if ps_shiny_max is not None:
all_teams = all_teams.where(Team.career <= ps_shiny_max)
if ranking_min is not None:
all_teams = all_teams.where(Team.ranking >= ranking_min)
if ranking_max is not None:
all_teams = all_teams.where(Team.ranking <= ranking_max)
if ranking_max is not None:
all_teams = all_teams.where(Team.ranking <= ranking_max)
if has_guide is not None:
if not has_guide:
all_teams = all_teams.where(Team.has_guide == 0)
else:
all_teams = all_teams.where(Team.has_guide == 1)
if is_ai is not None:
all_teams = all_teams.where(Team.is_ai)
if event_id is not None:
all_teams = all_teams.where(Team.event_id == event_id)
if limit is not None:
all_teams = all_teams.limit(limit)
if csv:
return_val = query_to_csv(all_teams, exclude=[Team.career])
db.close()
return Response(content=return_val, media_type='text/csv')
else:
return_teams = {'count': all_teams.count(), 'teams': []}
for x in all_teams:
return_teams['teams'].append(model_to_dict(x))
db.close()
return return_teams
@router.get('/{team_id}')
async def get_one_team(team_id, inc_packs: bool = True, csv: Optional[bool] = False):
try:
this_team = Team.get_by_id(team_id)
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No team found with id {team_id}')
p_query = Pack.select().where((Pack.team == this_team) & (Pack.open_time.is_null(True)))
if csv:
data = model_to_dict(this_team)
data['sealed_packs'] = p_query.count()
return_val = complex_data_to_csv([data])
else:
return_val = model_to_dict(this_team)
if inc_packs:
return_val['sealed_packs'] = [model_to_dict(x) for x in p_query]
db.close()
return return_val
def get_scouting_dfs(allowed_players, position: str):
positions = CardPosition.select().where(
(CardPosition.player << allowed_players) & (CardPosition.position == position)
)
pos_players = [x.player.player_id for x in positions]
all_cards = BattingCard.select().where(BattingCard.player << pos_players)
all_ratings = BattingCardRatings.select().where(BattingCardRatings.battingcard << all_cards)
vl_query = all_ratings.where(BattingCardRatings.vs_hand == 'L')
vr_query = all_ratings.where(BattingCardRatings.vs_hand == 'R')
vl_vals = [model_to_dict(x) for x in vl_query]
for x in vl_vals:
x.update(x['battingcard'])
x['player_id'] = x['battingcard']['player']['player_id']
x['player_name'] = x['battingcard']['player']['p_name']
x['rarity'] = x['battingcard']['player']['rarity']['name']
x['cardset_id'] = x['battingcard']['player']['cardset']['id']
x['cardset_name'] = x['battingcard']['player']['cardset']['name']
del x['battingcard'], x['player']
vr_vals = [model_to_dict(x) for x in vr_query]
for x in vr_vals:
x['player_id'] = x['battingcard']['player']['player_id']
del x['battingcard']
vl = pd.DataFrame(vl_vals)
vr = pd.DataFrame(vr_vals)
bat_df = pd.merge(vl, vr, on='player_id', suffixes=('_vl', '_vr')).set_index('player_id', drop=False)
logging.info(f'cols:\n{list(bat_df.columns)}')
series_list = []
series_list.append(pd.Series(
dict([(x.player.player_id, x.range) for x in positions]),
name=f'Range {position}'
))
series_list.append(pd.Series(
dict([(x.player.player_id, x.error) for x in positions]),
name=f'Error {position}'
))
if position in ['LF', 'CF', 'RF']:
series_list.append(pd.Series(
dict([(x.player.player_id, x.arm) for x in positions]),
name=f'Arm OF'
))
elif position == 'C':
series_list.append(pd.Series(
dict([(x.player.player_id, x.arm) for x in positions]),
name=f'Arm C'
))
series_list.append(pd.Series(
dict([(x.player.player_id, x.pb) for x in positions]),
name=f'PB C'
))
series_list.append(pd.Series(
dict([(x.player.player_id, x.overthrow) for x in positions]),
name=f'Throw C'
))
db.close()
def get_total_ops(df_data):
ops_vl = df_data['obp_vl'] + df_data['slg_vl']
ops_vr = df_data['obp_vr'] + df_data['slg_vr']
return (ops_vr + ops_vl + min(ops_vl, ops_vr)) / 3
logging.debug(f'series_list: {series_list}')
ratings = bat_df.join(series_list)
ratings['total_OPS'] = ratings.apply(get_total_ops, axis=1)
return ratings
@router.get('/{team_id}/lineup/{difficulty_name}')
async def get_team_lineup(team_id: int, difficulty_name: str, pitcher_name: str, build_type: str):
"""
d_rank: int - 10: best overall, 9: prioritize range, 8: prioritize error
"""
this_team = Team.get_or_none(Team.id == team_id)
if this_team is None:
db.close()
raise HTTPException(status_code=404, detail=f'Team id {team_id} not found')
if difficulty_name not in CARDSETS.keys():
db.close()
raise HTTPException(status_code=400, detail=f'Difficulty name {difficulty_name} not a valid check')
# all_players = Player.select().where(
# (fn.Lower(Player.p_name) != pitcher_name.lower()) & (Player.mlbclub == this_team.lname)
# )
all_players = Player.select().where(Player.franchise == this_team.lname)
legal_players = all_players.where(Player.cardset_id << CARDSETS[difficulty_name]['primary'])
if 'secondary' in CARDSETS[difficulty_name]:
backup_players = all_players.where(Player.cardset_id << CARDSETS[difficulty_name]['secondary'])
else:
backup_players = None
logging.info(f'legal_players: {legal_players.count()}')
if backup_players is not None:
logging.info(f'backup_players: {backup_players.count()}')
player_names = []
starting_nine = {
'C': {'player': None, 'vl': None, 'vr': None, 'ops': 0},
'1B': {'player': None, 'vl': None, 'vr': None, 'ops': 0},
'2B': {'player': None, 'vl': None, 'vr': None, 'ops': 0},
'3B': {'player': None, 'vl': None, 'vr': None, 'ops': 0},
'SS': {'player': None, 'vl': None, 'vr': None, 'ops': 0},
'LF': {'player': None, 'vl': None, 'vr': None, 'ops': 0},
'CF': {'player': None, 'vl': None, 'vr': None, 'ops': 0},
'RF': {'player': None, 'vl': None, 'vr': None, 'ops': 0},
'DH': {'player': None, 'vl': None, 'vr': None, 'ops': 0}
}
def get_bratings(player_id):
this_bcard = BattingCard.get_or_none(BattingCard.player_id == player_id)
vl_ratings = BattingCardRatings.get_or_none(
BattingCardRatings.battingcard == this_bcard, BattingCardRatings.vs_hand == 'L'
)
vl_ops = vl_ratings.obp + vl_ratings.slg
vr_ratings = BattingCardRatings.get_or_none(
BattingCardRatings.battingcard == this_bcard, BattingCardRatings.vs_hand == 'R'
)
vr_ops = vr_ratings.obp + vr_ratings.slg
return model_to_dict(vl_ratings), model_to_dict(vr_ratings), (vl_ops + vr_ops + min(vl_ops, vr_ops)) / 3
# IDEA: Rank guys by their bat per-position and take the best one that meets a threshold of defensive ability
for position in starting_nine.keys():
if position == 'DH':
# all_bcards = BattingCard.select().where(BattingCard.player << legal_players)
# all_batters = BattingCardRatings.select().where(
# BattingCardRatings.battingcard << all_bcards
# ).order_by(BattingCardRatings.obp + BattingCardRatings.sl)
#
# for x in all_batters:
# if x.battingcard.player.p_name not in player_names:
# starting_nine['DH'] = x.battingcard.player
# break
logging.debug(f'Searching for a DH!')
dh_query = legal_players.order_by(Player.cost.desc())
for x in dh_query:
logging.debug(f'checking {x.p_name} for {position}')
if x.p_name not in player_names and 'P' not in x.pos_1:
logging.debug(f'adding!')
starting_nine['DH']['player'] = model_to_dict(x)
try:
vl, vr, total_ops = get_bratings(x.player_id)
except AttributeError as e:
logging.debug(f'Could not find batting lines')
else:
# starting_nine[position]['vl'] = vl
# starting_nine[position]['vr'] = vr
starting_nine[position]['vl'] = vl['obp'] + vl['slg']
starting_nine[position]['vr'] = vr['obp'] + vr['slg']
starting_nine['DH']['ops'] = total_ops
player_names.append(x.p_name)
break
if starting_nine['DH']['player'] is None:
dh_query = backup_players.order_by(Player.cost.desc())
for x in dh_query:
logging.debug(f'checking {x.p_name} for {position}')
if x.p_name not in player_names:
logging.debug(f'adding!')
starting_nine['DH']['player'] = model_to_dict(x)
try:
vl, vr, total_ops = get_bratings(x.player_id)
except AttributeError as e:
logging.debug(f'Could not find batting lines')
else:
vl, vr, total_ops = get_bratings(x.player_id)
starting_nine[position]['vl'] = vl['obp'] + vl['slg']
starting_nine[position]['vr'] = vr['obp'] + vr['slg']
starting_nine['DH']['ops'] = total_ops
player_names.append(x.p_name)
break
else:
pos_group = CardPosition.select().where(
(CardPosition.position == position) & (CardPosition.player << legal_players)
)
backup_group = CardPosition.select().where(
(CardPosition.position == position) & (CardPosition.player << backup_players)
).order_by(CardPosition.innings.desc())
if difficulty_name in ['minor-league', 'gauntlet-3']:
pos_group = pos_group.order_by(CardPosition.innings.desc())
for x in pos_group:
logging.debug(f'checking {x.player.p_name} for {position}')
if x.player.p_name not in player_names and x.player.p_name.lower() != pitcher_name:
logging.debug(f'adding!')
starting_nine[position]['player'] = model_to_dict(x.player)
vl, vr, total_ops = get_bratings(x.player.player_id)
starting_nine[position]['vl'] = vl
starting_nine[position]['vr'] = vr
# starting_nine[position]['vl'] = vl.obp_vl + vl.slg_vl
# starting_nine[position]['vr'] = vr.obp_vr + vr.slg_vr
starting_nine[position]['ops'] = total_ops
player_names.append(x.player.p_name)
break
elif difficulty_name in ['major-league', 'flashback', 'hall-of-fame']:
logging.debug(f'entering difficulty: {difficulty_name}')
eligible_cards = get_scouting_dfs(legal_players, position)
logging.debug(f'got dataframe:\n{eligible_cards}')
# if position == '1B':
# return Response(content=eligible_cards.to_csv(index=False), media_type='text/csv')
def rank_cards(df_data):
if position in ['C', 'SS', '2B', 'CF']:
r_mult = 0.05
e_mult = -0.01
else:
r_mult = 0.025
e_mult = -0.005
r_mod = (3 - df_data[f'Range {position}']) * r_mult
e_mod = df_data[f'Error {position}'] * e_mult
logging.debug(f'{df_data.player_name} total OPS: {df_data.total_OPS} / '
f'final OPS: {df_data.total_OPS + r_mod + e_mod}')
return df_data['total_OPS'] + r_mod + e_mod
if len(eligible_cards.index) >= 1:
eligible_cards['final_ops'] = eligible_cards.apply(rank_cards, axis=1)
logging.debug(f'final_ops:\n{eligible_cards["final_ops"]}')
eligible_cards.sort_values(by=['final_ops'], ascending=False, inplace=True)
this_row = None
for x in range(len(eligible_cards.index)):
if eligible_cards.iloc[x].player_name not in player_names:
this_row = eligible_cards.iloc[x]
break
if this_row is not None:
starting_nine[position]['player'] = model_to_dict(Player.get_by_id(this_row.player_id))
starting_nine[position]['vl'] = this_row.obp_vl + this_row.slg_vl
starting_nine[position]['vr'] = this_row.obp_vr + this_row.slg_vr
starting_nine[position]['ops'] = this_row.total_OPS
player_names.append(this_row.player_name)
logging.debug(f'pos_group: {pos_group}\n{starting_nine}\n{player_names}\n\n')
if starting_nine[position]['player'] is None:
for x in backup_group:
logging.info(f'checking {x.player.p_name} for {position}')
if x.player.p_name not in player_names and x.player.p_name.lower() != pitcher_name:
logging.debug(f'adding!')
starting_nine[position]['player'] = model_to_dict(x.player)
vl, vr, total_ops = get_bratings(x.player.player_id)
starting_nine[position]['vl'] = vl['obp'] + vl['slg']
starting_nine[position]['vr'] = vr['obp'] + vr['slg']
starting_nine[position]['ops'] = total_ops
player_names.append(x.player.p_name)
break
# all_bcards = BattingCard.select().where(BattingCard.player << starting_nine.values())
# all_ratings = BattingCardRatings.select().where(BattingCardRatings.battingcard << all_bcards)
#
# vl_query = all_ratings.where(BattingCardRatings.vs_hand == 'L')
# vr_query = all_ratings.where(BattingCardRatings.vs_hand == 'R')
#
# vl_vals = [model_to_dict(x) for x in vl_query]
# for x in vl_vals:
# x.update(x['battingcard'])
# x['player_id'] = x['battingcard']['player']['player_id']
# x['player_name'] = x['battingcard']['player']['p_name']
# x['rarity'] = x['battingcard']['player']['rarity']['name']
# x['cardset_id'] = x['battingcard']['player']['cardset']['id']
# x['cardset_name'] = x['battingcard']['player']['cardset']['name']
# del x['player']
#
# vr_vals = [model_to_dict(x) for x in vr_query]
# for x in vr_vals:
# x['player_id'] = x['battingcard']['player']['player_id']
# del x['battingcard']
#
# vl = pd.DataFrame(vl_vals)
# vr = pd.DataFrame(vr_vals)
# db.close()
#
# output = pd.merge(vl, vr, on='player_id', suffixes=('_vl', '_vr'))
#
# def get_total_ops(df_data):
# ops_vl = df_data['obp_vL'] + df_data['slg_vL']
# ops_vr = df_data['obp_vR'] + df_data['slg_vR']
# return (ops_vr + ops_vl + min(ops_vl, ops_vr)) / 3
# output['total_OPS'] = output.apply(get_total_ops, axis=1)
# output = output.sort_values(by=['total_OPS'], ascending=False)
sorted_nine = sorted(starting_nine.items(), key=lambda item: item[1]['ops'], reverse=True)
return {
'json': dict(sorted_nine),
'array': sorted_nine
}
def sort_pitchers(pitching_card_query) -> DataFrame | None:
all_s = [model_to_dict(x, recurse=False) for x in pitching_card_query]
if len(all_s) == 0:
logging.error(f'Empty pitching_card_query: {pitching_card_query}')
return None
pitcher_df = pd.DataFrame(all_s).set_index('player', drop=False)
logging.debug(f'pitcher_df: {pitcher_df}')
def get_total_ops(df_data):
vlval = PitchingCardRatings.get_or_none(
PitchingCardRatings.pitchingcard_id == df_data['id'], PitchingCardRatings.vs_hand == 'L')
vrval = PitchingCardRatings.get_or_none(
PitchingCardRatings.pitchingcard_id == df_data['id'], PitchingCardRatings.vs_hand == 'R')
ops_vl = vlval.obp + vlval.slg
ops_vr = vrval.obp + vrval.slg
return (ops_vr + ops_vl + min(ops_vl, ops_vr)) / 3
pitcher_df['total_ops'] = pitcher_df.apply(get_total_ops, axis=1)
return pitcher_df.sort_values(by='total_ops')
@router.get('/{team_id}/sp/{difficulty_name}')
async def get_team_sp(team_id: int, difficulty_name: str, sp_rank: int):
logging.info(f'get_team_sp - team_id: {team_id} / difficulty_name: {difficulty_name} / sp_rank: {sp_rank}')
this_team = Team.get_or_none(Team.id == team_id)
if this_team is None:
db.close()
raise HTTPException(status_code=404, detail=f'Team id {team_id} not found')
if difficulty_name not in CARDSETS.keys():
db.close()
raise HTTPException(status_code=400, detail=f'Difficulty name {difficulty_name} not a valid check')
all_players = Player.select().where(Player.mlbclub == this_team.lname)
legal_players = all_players.where(Player.cardset_id << CARDSETS[difficulty_name]['primary'])
if 'secondary' in CARDSETS[difficulty_name]:
backup_players = all_players.where(Player.cardset_id << CARDSETS[difficulty_name]['secondary'])
else:
backup_players = None
def sort_starters(starter_query) -> DataFrame | None:
all_s = [model_to_dict(x, recurse=False) for x in starter_query]
if len(all_s) == 0:
logging.error(f'Empty starter_query: {starter_query}')
return None
starter_df = pd.DataFrame(all_s).set_index('player', drop=False)
logging.debug(f'starter_df: {starter_df}')
def get_total_ops(df_data):
vlval = PitchingCardRatings.get_or_none(
PitchingCardRatings.pitchingcard_id == df_data['id'], PitchingCardRatings.vs_hand == 'L')
vrval = PitchingCardRatings.get_or_none(
PitchingCardRatings.pitchingcard_id == df_data['id'], PitchingCardRatings.vs_hand == 'R')
ops_vl = vlval.obp + vlval.slg
ops_vr = vrval.obp + vrval.slg
return (ops_vr + ops_vl + min(ops_vl, ops_vr)) / 3
starter_df['total_ops'] = starter_df.apply(get_total_ops, axis=1)
return starter_df.sort_values(by='total_ops')
# Find SP in primary cardsets
s_query = PitchingCard.select().join(Player).where(
(PitchingCard.player << legal_players) & (PitchingCard.starter_rating >= 4)
)
all_starters = sort_starters(s_query)
logging.debug(f'sorted: {all_starters}')
if all_starters is not None and len(all_starters.index) >= sp_rank:
this_player_id = all_starters.iloc[sp_rank - 1].player
this_player = model_to_dict(Player.get_by_id(this_player_id), recurse=False)
db.close()
return this_player
if all_starters is not None and len(all_starters.index) > 0:
this_player_id = all_starters.iloc[len(all_starters.index) - 1].player
this_player = model_to_dict(Player.get_by_id(this_player_id), recurse=False)
db.close()
return this_player
# Include backup cardsets
s_query = PitchingCard.select().where(
(PitchingCard.player << backup_players) & (PitchingCard.starter_rating >= 4)
)
all_starters = sort_starters(s_query)
logging.debug(f'sorted: {all_starters}')
if all_starters is not None and len(all_starters.index) >= sp_rank:
this_player_id = all_starters.iloc[sp_rank - 1].player
this_player = model_to_dict(Player.get_by_id(this_player_id), recurse=False)
db.close()
return this_player
if all_starters is not None and len(all_starters.index) > 0:
this_player_id = all_starters.iloc[len(all_starters.index) - 1].player
this_player = model_to_dict(Player.get_by_id(this_player_id), recurse=False)
db.close()
return this_player
raise HTTPException(status_code=500, detail=f'No SP #{sp_rank} found for Team {team_id}')
@router.get('/{team_id}/rp/{difficulty_name}')
async def get_team_rp(
team_id: int, difficulty_name: str, need: Literal['length', 'setup', 'closer', 'middle'],
used_pitcher_ids: list = Query(default=[])):
logging.info(f'get_team_rp - team_id: {team_id} / difficulty_name: {difficulty_name} / need: {need} '
f'/ used_pitcher_ids: {used_pitcher_ids}')
this_team = Team.get_or_none(Team.id == team_id)
if this_team is None:
db.close()
raise HTTPException(status_code=404, detail=f'Team id {team_id} not found')
if difficulty_name not in CARDSETS.keys():
db.close()
raise HTTPException(status_code=400, detail=f'Difficulty name {difficulty_name} not a valid check')
all_players = Player.select().where(
(Player.mlbclub == this_team.lname) & (Player.player_id.not_in(used_pitcher_ids))
)
legal_players = all_players.where(Player.cardset_id << CARDSETS[difficulty_name]['primary'])
if 'secondary' in CARDSETS[difficulty_name]:
backup_players = all_players.where(Player.cardset_id << CARDSETS[difficulty_name]['secondary'])
else:
backup_players = None
if need == 'closer':
for query in [PitchingCard.select().join(Player).where(
(PitchingCard.player << legal_players) & (PitchingCard.closer_rating >= 3) &
(PitchingCard.starter_rating == 1)),
PitchingCard.select().join(Player).where(
(PitchingCard.player << legal_players) & (PitchingCard.closer_rating >= 1) &
(PitchingCard.starter_rating == 1)),
PitchingCard.select().join(Player).where(
(PitchingCard.player << backup_players) & (PitchingCard.closer_rating >= 3) &
(PitchingCard.starter_rating == 1)),
PitchingCard.select().join(Player).where(
(PitchingCard.player << backup_players) & (PitchingCard.closer_rating >= 1) &
(PitchingCard.starter_rating == 1)),
PitchingCard.select().join(Player).where(
(PitchingCard.player << backup_players) & (PitchingCard.starter_rating < 4))
]:
all_relievers = sort_pitchers(query)
if all_relievers is not None:
this_player_id = all_relievers.iloc[0].player
this_player = model_to_dict(Player.get_by_id(this_player_id), recurse=False)
db.close()
return this_player
elif need == 'setup':
for query in [PitchingCard.select().join(Player).where(
(PitchingCard.player << legal_players) & (PitchingCard.starter_rating == 1)),
PitchingCard.select().join(Player).where(
(PitchingCard.player << backup_players) & (PitchingCard.starter_rating < 4))
]:
all_relievers = sort_pitchers(query)
if all_relievers is not None and len(all_relievers.index) >= 2:
this_player_id = all_relievers.iloc[1].player
this_player = model_to_dict(Player.get_by_id(this_player_id), recurse=False)
db.close()
return this_player
elif need == 'length' or len(used_pitcher_ids) > 4:
for query in [PitchingCard.select().join(Player).where(
(PitchingCard.player << legal_players) & (PitchingCard.relief_rating >= 3) &
(PitchingCard.starter_rating < 4)),
PitchingCard.select().join(Player).where(
(PitchingCard.player << legal_players) & (PitchingCard.relief_rating >= 2) &
(PitchingCard.starter_rating < 4)),
PitchingCard.select().join(Player).where(
(PitchingCard.player << backup_players) & (PitchingCard.relief_rating >= 2) &
(PitchingCard.starter_rating < 4))
]:
all_relievers = sort_pitchers(query)
if all_relievers is not None:
this_player_id = all_relievers.iloc[0].player
this_player = model_to_dict(Player.get_by_id(this_player_id), recurse=False)
db.close()
return this_player
elif need == 'middle':
for query in [PitchingCard.select().join(Player).where(
(PitchingCard.player << legal_players) & (PitchingCard.starter_rating == 1)),
PitchingCard.select().join(Player).where(
(PitchingCard.player << backup_players) & (PitchingCard.starter_rating < 4))
]:
all_relievers = sort_pitchers(query)
if all_relievers is not None and len(all_relievers.index) >= 3:
this_player_id = all_relievers.iloc[2].player
this_player = model_to_dict(Player.get_by_id(this_player_id), recurse=False)
db.close()
return this_player
logging.info(f'Falling to last chance pitcher')
all_relievers = sort_pitchers(
PitchingCard.select().join(Player).where(
(PitchingCard.player << backup_players) | (PitchingCard.player << legal_players)
)
)
if all_relievers is not None:
this_player_id = all_relievers.iloc[len(all_relievers.index) - 1].player
this_player = model_to_dict(Player.get_by_id(this_player_id), recurse=False)
db.close()
return this_player
@router.get('/{team_id}/season-record/{season}')
async def get_team_record(team_id: int, season: int):
all_games = StratGame.select().where(
((StratGame.away_team_id == team_id) | (StratGame.home_team_id == team_id)) & (StratGame.season == season)
)
template = {
'ARI': [0, 0, 0], 'ATL': [0, 0, 0], 'BAL': [0, 0, 0], 'BOS': [0, 0, 0], 'CHC': [0, 0, 0], 'CHW': [0, 0, 0],
'CIN': [0, 0, 0], 'CLE': [0, 0, 0], 'COL': [0, 0, 0], 'DET': [0, 0, 0],
'NYY': [0, 0, 0], 'TBR': [0, 0, 0], 'TOR': [0, 0, 0], 'PHI': [0, 0, 0], 'MIA': [0, 0, 0], 'NYM': [0, 0, 0],
'WSN': [0, 0, 0], 'MIN': [0, 0, 0], 'KCR': [0, 0, 0], 'HOU': [0, 0, 0],
'TEX': [0, 0, 0], 'SEA': [0, 0, 0], 'LAA': [0, 0, 0], 'OAK': [0, 0, 0], 'MIL': [0, 0, 0], 'PIT': [0, 0, 0],
'STL': [0, 0, 0], 'LAD': [0, 0, 0], 'SDP': [0, 0, 0], 'SFG': [0, 0, 0]
}
standings = {
'minor-league': copy.deepcopy(template),
'major-league': copy.deepcopy(template),
'hall-of-fame': copy.deepcopy(template),
'flashback': copy.deepcopy(template)
}
for game in all_games:
run_diff = game.away_score - game.home_score
if run_diff > 0: # Away team won
if game.away_team_id == team_id: # Human is away team
standings[game.game_type][game.home_team.abbrev][0] += 1
standings[game.game_type][game.home_team.abbrev][2] += run_diff
else: # Human is home team
standings[game.game_type][game.away_team.abbrev][1] += 1
standings[game.game_type][game.away_team.abbrev][2] -= run_diff
elif run_diff < 0: # Home team won
if game.away_team_id == team_id: # Human is away team
standings[game.game_type][game.home_team.abbrev][1] += 1
standings[game.game_type][game.home_team.abbrev][2] -= run_diff
else: # Human is home team
standings[game.game_type][game.away_team.abbrev][0] += 1
standings[game.game_type][game.away_team.abbrev][2] -= run_diff
# for lg_query in [minor_games, major_games, hof_games]:
# this_lg = copy.deepcopy(template)
# for x in range(1, 30):
# team_games = lg_query.where((StratGame.away_team_id == x) | (StratGame.home_team_id == x))
# for game in team_games:
db.close()
return standings
@router.get('/{team_id}/buy/players')
async def team_buy_players(team_id: int, ids: str, ts: str):
try:
this_team = Team.get_by_id(team_id)
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No team found with id {team_id}')
if ts != this_team.team_hash():
logging.warning(f'Bad Team Secret: {ts} ({this_team.team_hash()})')
db.close()
raise HTTPException(
status_code=401,
detail=f'You are not authorized to buy {this_team.abbrev} cards. This event has been logged.'
)
last_card = Card.select(Card.id).order_by(-Card.id).limit(1)
lc_id = last_card[0].id
all_ids = ids.split(',')
conf_message = ''
total_cost = 0
for player_id in all_ids:
if player_id != '':
try:
this_player = Player.get_by_id(player_id)
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No player found with id {player_id} /// '
f'{conf_message} purchased')
# check wallet balance
if this_team.wallet < this_player.cost:
logging.error(f'{this_player} was not purchased. {this_team.lname} only has {this_team.wallet}₼, but '
f'{this_player} costs {this_player.cost}₼.')
db.close()
raise HTTPException(
200,
detail=f'{this_player} was not purchased. {this_team.lname} only has {this_team.wallet}₼, but '
f'{this_player} costs {this_player.cost}₼. /// {conf_message} purchased'
)
# Create player card and update cost
buy_price = this_player.cost
total_cost += buy_price
this_card = Card(
player_id=this_player.player_id,
team_id=this_team.id,
value=buy_price
)
Paperdex.get_or_create(team_id=team_id, player_id=this_player.player_id)
this_card.save()
this_player.change_on_buy()
# Deduct card cost from team
logging.info(f'{this_team.abbrev} starting wallet: {this_team.wallet}')
this_team.wallet -= buy_price
this_team.save()
logging.info(f'{this_team.abbrev} ending wallet: {this_team.wallet}')
# Post a notification
if this_player.rarity.value >= 2:
new_notif = Notification(
created=int_timestamp(datetime.now()),
title=f'Price Change',
desc='Modified by buying and selling',
field_name=f'{this_player.description} '
f'{this_player.p_name if this_player.p_name not in this_player.description else ""}',
message=f'From {buy_price}₼ 📈 to **{this_player.cost}**₼',
about=f'Player-{this_player.player_id}'
)
new_notif.save()
conf_message += f'{buy_price}₼ for {this_player.rarity.name} {this_player.p_name} ' \
f'({this_player.cardset.name}), '
# sheets.post_new_cards(SHEETS_AUTH, lc_id)
raise HTTPException(status_code=200, detail=f'{conf_message} purchased. /// Total Cost: {total_cost}₼ /// '
f'Final Wallet: {this_team.wallet}')
@router.get('/{team_id}/buy/pack/{packtype_id}')
async def team_buy_packs(team_id: int, packtype_id: int, ts: str, quantity: Optional[int] = 1):
try:
this_packtype = PackType.get_by_id(packtype_id)
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No pack type found with id {packtype_id}')
try:
this_team = Team.get_by_id(team_id)
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No team found with id {team_id}')
if ts != this_team.team_hash():
logging.warning(f'Bad Team Secret: {ts} ({this_team.team_hash()})')
db.close()
logging.warning(f'team: {this_team} / pack_type: {this_packtype} / secret: {ts} / '
f'actual: {this_team.team_hash()}')
raise HTTPException(
status_code=401,
detail=f'You are not authorized to buy {this_team.abbrev} packs. This event has been logged.'
)
# check wallet balance
total_cost = this_packtype.cost * quantity
if this_team.wallet < total_cost:
db.close()
raise HTTPException(
200,
detail=f'{this_packtype} was not purchased. {this_team.lname} only has {this_team.wallet} bucks, but '
f'{this_packtype} costs {this_packtype.cost}.'
)
all_packs = []
for i in range(quantity):
all_packs.append(Pack(team_id=this_team.id, pack_type_id=this_packtype.id))
# Deduct card cost from team
logging.info(f'{this_team.abbrev} starting wallet: {this_team.wallet}')
this_team.wallet -= total_cost
this_team.save()
logging.info(f'{this_team.abbrev} ending wallet: {this_team.wallet}')
with db.atomic():
Pack.bulk_create(all_packs, batch_size=15)
db.close()
raise HTTPException(
status_code=200,
detail=f'Quantity {quantity} {this_packtype.name} pack{"s" if quantity > 1 else ""} have been purchased by '
f'{this_team.lname} for {total_cost} bucks. You may close this window.'
)
@router.get('/{team_id}/sell/cards')
async def team_sell_cards(team_id: int, ids: str, ts: str):
try:
this_team = Team.get_by_id(team_id)
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No team found with id {team_id}')
if ts != this_team.team_hash():
logging.warning(f'Bad Team Secret: {ts} ({this_team.team_hash()})')
db.close()
raise HTTPException(
status_code=401,
detail=f'You are not authorized to sell {this_team.abbrev} cards. This event has been logged.'
)
all_ids = ids.split(',')
del_ids = []
conf_message = ''
total_cost = 0
for card_id in all_ids:
if card_id != '':
try:
this_card = Card.get_by_id(card_id)
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No card found with id {card_id}')
del_ids.append(card_id)
this_player = this_card.player
if this_card.team != this_team:
raise HTTPException(status_code=401,
detail=f'Card id {card_id} ({this_player.p_name}) belongs to '
f'{this_card.team.abbrev} and cannot be sold. /// {conf_message} sold')
orig_price = this_player.cost
sell_price = round(this_player.cost * .5)
total_cost += sell_price
# credit selling team's wallet
if this_team.wallet is None:
this_team.wallet = sell_price
else:
this_team.wallet += sell_price
this_team.save()
# decrease price of player
this_player.change_on_sell()
this_card.delete_instance()
# post a notification
if this_player.rarity.value >= 2:
new_notif = Notification(
created=int_timestamp(datetime.now()),
title=f'Price Change',
desc='Modified by buying and selling',
field_name=f'{this_player.description} '
f'{this_player.p_name if this_player.p_name not in this_player.description else ""}',
message=f'From {orig_price}₼ 📉 to **{this_player.cost}**₼',
about=f'Player-{this_player.id}'
)
new_notif.save()
conf_message += f'{sell_price}₼ for {this_player.rarity.name} {this_player.p_name} ' \
f'({this_player.cardset.name}), '
# sheets.post_deletion(SHEETS_AUTH, del_ids)
raise HTTPException(status_code=200, detail=f'{conf_message} sold. /// Total Earned: {total_cost}₼ /// '
f'Final Wallet: {this_team.wallet}')
@router.get('/{team_id}/cards')
async def get_team_cards(team_id, csv: Optional[bool] = True):
"""
CSV output specifically targeting team roster sheet
Parameters
----------
team_id
csv
"""
try:
this_team = Team.get_by_id(team_id)
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No team found with id {team_id}')
if not csv:
db.close()
raise HTTPException(
status_code=400,
detail='The /teams/{team_id}/cards endpoint only supports csv output.'
)
all_cards = (Card
.select()
.join(Player)
.join(Rarity)
.where(Card.team == this_team)
.order_by(-Card.player.rarity.value, Card.player.p_name)
)
if all_cards.count() == 0:
db.close()
raise HTTPException(status_code=404, detail=f'No cards found')
card_vals = [model_to_dict(x) for x in all_cards]
db.close()
for x in card_vals:
x.update(x['player'])
x['player_id'] = x['player']['player_id']
x['player_name'] = x['player']['p_name']
x['cardset_id'] = x['player']['cardset']['id']
x['cardset_name'] = x['player']['cardset']['name']
x['rarity'] = x['player']['rarity']['name']
x['card_id'] = x['id']
card_df = pd.DataFrame(card_vals)
output = card_df[[
'cardset_name', 'player_name', 'rarity', 'image', 'image2', 'pos_1', 'pos_2', 'pos_3', 'pos_4', 'pos_5',
'pos_6', 'pos_7', 'pos_8', 'cost', 'mlbclub', 'franchise', 'fangr_id', 'bbref_id', 'player_id', 'card_id']]
return Response(content=pd.DataFrame(output).to_csv(index=False), media_type='text/csv')
@router.post('')
async def post_team(team: TeamModel, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning(f'Bad Token: {token}')
db.close()
raise HTTPException(
status_code=401,
detail='You are not authorized to post teams. This event has been logged.'
)
dupe_team = Team.get_or_none(Team.season == team.season, Team.abbrev == team.abbrev)
if dupe_team:
db.close()
raise HTTPException(status_code=400, detail=f'There is already a season {team.season} team using {team.abbrev}')
this_team = Team(
abbrev=team.abbrev,
sname=team.sname,
lname=team.lname,
gmid=team.gmid,
gmname=team.gmname,
wallet=team.wallet,
gsheet=team.gsheet,
team_value=team.team_value,
collection_value=team.collection_value,
logo=team.logo,
color=team.color,
ranking=team.ranking,
season=team.season,
career=team.ps_shiny,
has_guide=team.has_guide,
is_ai=team.is_ai
)
saved = this_team.save()
if saved == 1:
return_team = model_to_dict(this_team)
db.close()
return return_team
else:
raise HTTPException(status_code=418, detail='Well slap my ass and call me a teapot; I could not save that team')
@router.post('/new-season/{new_season}')
async def team_season_update(new_season: int, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning(f'Bad Token: {token}')
db.close()
raise HTTPException(
status_code=401,
detail='You are not authorized to post teams. This event has been logged.'
)
r_query = Team.update(ranking=1000, season=new_season, wallet=Team.wallet + 250).execute()
current = Current.latest()
current.season = new_season
current.save()
db.close()
return {'detail': f'Team rankings, season, and wallet updated for season {new_season}'}
@router.post('/{team_id}/money/{delta}')
async def team_update_money(team_id: int, delta: int, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning(f'Bad Token: {token}')
db.close()
raise HTTPException(
status_code=401,
detail='You are not authorized to adjust wallets. This event has been logged.'
)
try:
this_team = Team.get_by_id(team_id)
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No team found with id {team_id}')
this_team.wallet += delta
if this_team.save() == 1:
return_team = model_to_dict(this_team)
db.close()
return return_team
else:
raise HTTPException(status_code=418, detail='Well slap my ass and call me a teapot; I could not save that team')
@router.patch('/{team_id}')
async def patch_team(
team_id, sname: Optional[str] = None, lname: Optional[str] = None, gmid: Optional[int] = None,
gmname: Optional[str] = None, gsheet: Optional[str] = None, team_value: Optional[int] = None,
collection_value: Optional[int] = None, logo: Optional[str] = None, color: Optional[str] = None,
season: Optional[int] = None, ps_shiny: Optional[int] = None, wallet_delta: Optional[int] = None,
has_guide: Optional[bool] = None, is_ai: Optional[bool] = None, ranking: Optional[int] = None,
token: str = Depends(oauth2_scheme), abbrev: Optional[str] = None):
if not valid_token(token):
logging.warning(f'Bad Token: {token}')
db.close()
raise HTTPException(
status_code=401,
detail='You are not authorized to delete teams. This event has been logged.'
)
try:
this_team = Team.get_by_id(team_id)
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No team found with id {team_id}')
if abbrev is not None:
this_team.abbrev = abbrev
if sname is not None:
this_team.sname = sname
if lname is not None:
this_team.lname = lname
if gmid is not None:
this_team.gmid = gmid
if gmname is not None:
this_team.gmname = gmname
if gsheet is not None:
this_team.gsheet = gsheet
if team_value is not None:
this_team.team_value = team_value
if collection_value is not None:
this_team.collection_value = collection_value
if logo is not None:
this_team.logo = logo
if color is not None:
this_team.color = color
if season is not None:
this_team.season = season
if ps_shiny is not None:
this_team.career = ps_shiny
if ranking is not None:
this_team.ranking = ranking
if wallet_delta is not None:
this_team.wallet += wallet_delta
if has_guide is not None:
if has_guide:
this_team.has_guide = 1
else:
this_team.has_guide = 0
if is_ai is not None:
if is_ai:
this_team.is_ai = 1
else:
this_team.is_ai = 0
if this_team.save() == 1:
return_team = model_to_dict(this_team)
db.close()
return return_team
else:
raise HTTPException(status_code=418, detail='Well slap my ass and call me a teapot; I could not save that team')
@router.delete('/{team_id}')
async def delete_team(team_id, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning(f'Bad Token: {token}')
db.close()
raise HTTPException(
status_code=401,
detail='You are not authorized to delete teams. This event has been logged.'
)
try:
this_team = Team.get_by_id(team_id)
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No team found with id {team_id}')
count = this_team.delete_instance()
db.close()
if count == 1:
raise HTTPException(status_code=200, detail=f'Team {team_id} has been deleted')
else:
raise HTTPException(status_code=500, detail=f'Team {team_id} was not deleted')