import asyncio import datetime import logging import random import requests import pydantic from typing import Optional from peewee import * from playhouse.shortcuts import model_to_dict from dataclasses import dataclass from helpers import SBA_SEASON, PD_SEASON, get_player_url, get_sheets from db_calls import db_get from in_game.data_cache import get_pd_player, CardPosition, BattingCard, get_pd_team db = SqliteDatabase( 'storage/gameplay.db', pragmas={ 'journal_mode': 'wal', 'cache_size': -1 * 64000, 'synchronous': 0 } ) SBA_DB_URL = 'http://database/api' def param_char(other_params): if other_params: return '&' else: return '?' def get_sba_team(id_or_abbrev, season=None): req_url = f'{SBA_DB_URL}/v1/teams/{id_or_abbrev}' if season: req_url += f'?season={season}' resp = requests.get(req_url, timeout=3) if resp.status_code == 200: return resp.json() else: logging.warning(resp.text) raise ValueError(f'DB: {resp.text}') def get_sba_player(id_or_name, season=None): req_url = f'{SBA_DB_URL}/v2/players/{id_or_name}' if season is not None: req_url += f'?season={season}' resp = requests.get(req_url, timeout=3) if resp.status_code == 200: return resp.json() else: logging.warning(resp.text) raise ValueError(f'DB: {resp.text}') def get_sba_team_by_owner(season, owner_id): resp = requests.get(f'{SBA_DB_URL}/v1/teams?season={season}&owner_id={owner_id}&active_only=True', timeout=3) if resp.status_code == 200: full_resp = resp.json() if len(full_resp['teams']) != 1: raise ValueError(f'One team requested, but {len(full_resp)} were returned') else: return full_resp['teams'][0] else: logging.warning(resp.text) raise ValueError(f'DB: {resp.text}') # def pd_await db_get(endpoint: str, api_ver: int = 1, object_id: int = None, params: list = None, none_okay: bool = True): # req_url = pd_get_req_url(endpoint, api_ver=api_ver, object_id=object_id, params=params) # logging.info(f'get:\n{endpoint} id: {object_id} params: {params}') # # resp = requests.get(req_url, timeout=3) # if 
resp.status_code == 200: # data = resp.json() # logging.info(f'return: {data}') # return data # elif none_okay: # data = resp.json() # logging.info(f'return: {data}') # return None # else: # logging.warning(resp.text) # raise ValueError(f'DB: {resp.text}') # def pd_get_one_team(team_abbrev: str): # team = pd_await db_get('teams', params=[('abbrev', team_abbrev), ('season', PD_SEASON)], none_okay=False)['teams'][0] # return team # # # req_url = pd_get_req_url('teams', ) # # # # resp = requests.get(req_url, timeout=3) # # if resp.status_code == 200: # # data = resp.json() # # logging.info(f'return: {data}') # # return data['teams'][0] # # else: # # logging.warning(resp.text) # # raise ValueError(f'PD DB: {resp.text}') # def pd_get_card_by_id(card_id: int): # return pd_await db_get('cards', object_id=card_id, none_okay=False) class DecisionModel(pydantic.BaseModel): game_id: int season: int week: int pitcher_id: int pitcher_team_id: int win: int = 0 loss: int = 0 hold: int = 0 is_save: int = 0 is_start: bool = False b_save: int = 0 irunners: int = 0 irunners_scored: int = 0 rest_ip: float = 0 rest_required: int = 0 class BaseModel(Model): class Meta: database = db class ManagerAi(BaseModel): name = CharField() steal = IntegerField(default=5) running = IntegerField(default=5) hold = IntegerField(default=5) catcher_throw = IntegerField(default=5) uncapped_home = IntegerField(default=5) uncapped_third = IntegerField(default=5) uncapped_trail = IntegerField(default=5) bullpen_matchup = IntegerField(default=5) behind_aggression = IntegerField(default=5) ahead_aggression = IntegerField(default=5) decide_throw = IntegerField(default=5) def load_ai(): all_ai = [ {'name': 'Basic'} ] for x in all_ai: ManagerAi.create(**x) return True def ai_batting(this_game, this_play) -> bool: return (this_play.inning_half == 'Top' and this_game.ai_team == 'away') or \ (this_play.inning_half == 'Bot' and this_game.ai_team == 'home') db.create_tables([ManagerAi]) class Game(BaseModel): 
away_team_id = IntegerField() home_team_id = IntegerField() week_num = IntegerField(null=True) game_num = IntegerField(null=True) channel_id = IntegerField() season = IntegerField() active = BooleanField(default=True) is_pd = BooleanField(default=False) ranked = BooleanField(default=True) short_game = BooleanField(default=False) away_roster_num = IntegerField(null=True) home_roster_num = IntegerField(null=True) first_message = IntegerField(null=True) ai_team = CharField(null=True) game_type = CharField(default='minor-league') cardset_ids = CharField(null=True) backup_cardset_ids = CharField(null=True) # TODO: add get_away_team and get_home_team that deals with SBa/PD and returns Team object @dataclass class StratGame: id: int away_team_id: int home_team_id: int channel_id: int season: int active: bool = True is_pd: bool = False ranked: bool = True short_game: bool = False week_num: int = None game_num: int = None away_roster_num: int = None home_roster_num: int = None first_message: int = None ai_team: str = None game_type: str = None cardset_ids: str = None backup_cardset_ids: str = None db.create_tables([Game]) # class GameModel(pydantic.BaseModel): # away_team_id: int # home_team_id: int # week_num: Optional[int] = None # game_num: Optional[int] = None # channel_id: int # active: Optional[bool] = True # is_pd: Optional[bool] = True def count_team_games(team_id: int, active: bool = True): all_games = Game.select().where( ((Game.away_team_id == team_id) | (Game.home_team_id == team_id)) & (Game.active == active) ) return {'count': all_games.count(), 'games': [model_to_dict(x) for x in all_games]} def post_game(game_dict: dict): # game_model = GameModel.parse_obj(game_dict) # new_game = Game( # away_team_id=game_model.away_team_id, # home_team_id=game_model.home_team_id, # week_num=game_model.week_num, # game_num=game_model.game_num, # channel_id=game_model.channel_id, # active=game_model.active, # is_pd=game_model.is_pd # ) # new_game.save() new_game = 
Game.create(**game_dict) # return_game = model_to_dict(new_game) return_game = StratGame( new_game.id, new_game.away_team_id, new_game.home_team_id, new_game.channel_id, new_game.season, new_game.active, new_game.is_pd, new_game.ranked, new_game.short_game, new_game.week_num, new_game.game_num, new_game.away_roster_num, new_game.home_roster_num, new_game.first_message, new_game.ai_team, new_game.game_type, new_game.cardset_ids, new_game.backup_cardset_ids ) db.close() return return_game def get_one_game(game_id=None, away_team_id=None, home_team_id=None, week_num=None, game_num=None, channel_id=None, active=None) -> Optional[StratGame]: single_game, this_game = None, None if game_id is not None: single_game = Game.get_by_id(game_id) else: this_game = Game.select() if away_team_id is not None: this_game = this_game.where(Game.away_team_id == away_team_id) if home_team_id is not None: this_game = this_game.where(Game.home_team_id == home_team_id) if week_num is not None: this_game = this_game.where(Game.week_num == week_num) if game_num is not None: this_game = this_game.where(Game.game_num == game_num) if channel_id is not None: this_game = this_game.where(Game.channel_id == channel_id) if active is not None: this_game = this_game.where(Game.active == active) if single_game or this_game.count() > 0: # return_game = model_to_dict(this_game[0]) r_game = this_game[0] if this_game else single_game return_game = StratGame(**model_to_dict(r_game)) db.close() return return_game else: return None async def get_game_team( game: StratGame, gm_id: int = None, team_abbrev: str = None, team_id: int = None, skip_cache: bool = False) -> dict: if not gm_id and not team_abbrev and not team_id: raise KeyError(f'get_game_team requires either one of gm_id, team_abbrev, or team_id to not be None') logging.debug(f'getting game team for game {game.id} / gm_id: {gm_id} / ' f'tm_abbrev: {team_abbrev} / team_id: {team_id} / game: {game}') if game.is_pd: if team_id: return await 
get_pd_team(team_id, skip_cache=skip_cache) elif gm_id: return await get_pd_team(gm_id, skip_cache=skip_cache) # t_query = await db_get('teams', params=[('season', PD_SEASON), ('gm_id', gm_id)]) # return t_query['teams'][0] else: t_query = await db_get('teams', params=[('season', PD_SEASON), ('abbrev', team_abbrev)]) return t_query['teams'][0] else: if gm_id: return get_sba_team_by_owner(season=SBA_SEASON, owner_id=gm_id) elif team_id: return get_sba_team(team_id) else: return get_sba_team(team_abbrev, season=SBA_SEASON) def patch_game( game_id, away_team_id=None, home_team_id=None, week_num=None, game_num=None, channel_id=None, active=None, first_message=None, home_roster_num=None, away_roster_num=None, ai_team=None, cardset_ids=None, backup_cardset_ids=None): this_game = Game.get_by_id(game_id) if away_team_id is not None: this_game.away_team_id = away_team_id if home_team_id is not None: this_game.home_team_id = home_team_id if week_num is not None: this_game.week_num = week_num if game_num is not None: this_game.game_num = game_num if channel_id is not None: this_game.channel_id = channel_id if active is not None: this_game.active = active if first_message is not None: this_game.first_message = first_message if home_roster_num is not None: this_game.home_roster_num = home_roster_num if away_roster_num is not None: this_game.away_roster_num = away_roster_num if ai_team is not None: this_game.ai_team = ai_team if cardset_ids is not None: this_game.cardset_ids = cardset_ids if backup_cardset_ids is not None: this_game.backup_cardset_ids = backup_cardset_ids this_game.save() # return_game = model_to_dict(this_game) return_game = StratGame( this_game.id, this_game.away_team_id, this_game.home_team_id, this_game.channel_id, this_game.season, this_game.active, this_game.is_pd, this_game.ranked, this_game.short_game, this_game.week_num, this_game.game_num, this_game.away_roster_num, this_game.home_roster_num, this_game.first_message, this_game.ai_team, 
this_game.game_type, this_game.cardset_ids, this_game.backup_cardset_ids ) db.close() return return_game def get_last_game_ids(num_games: int = 10) -> list: last_games = Game.select(Game.id).order_by(-Game.id).limit(num_games) game_list = [x.id for x in last_games] return game_list def get_active_games(limit: int = 10) -> list: g_query = Game.select().where(Game.active) if limit < 0: limit = 1 g_query = g_query.limit(limit) return [x for x in g_query] class Lineup(BaseModel): game = ForeignKeyField(Game) team_id = IntegerField() player_id = IntegerField() card_id = IntegerField(null=True) # Only needed for Paper Dynasty games position = CharField() batting_order = IntegerField() after_play = IntegerField() replacing_id = IntegerField(null=True) active = BooleanField(default=True) variant = IntegerField(default=0) is_fatigued = BooleanField(null=True) @dataclass class StratLineup: id: int game: StratGame team_id: int player_id: int position: str batting_order: int after_play: int replacing_id: int = None active: bool = True card_id: int = None variant: int = 0 is_fatigued: bool = None def convert_stratlineup(lineup: Lineup) -> StratLineup: lineup_dict = model_to_dict(lineup) lineup_dict['game'] = StratGame(**lineup_dict['game']) return StratLineup(**lineup_dict) db.create_tables([Lineup]) def get_one_lineup( game_id: int, lineup_id: int = None, team_id: int = None, batting_order: int = None, position: str = None, card_id: int = None, active: bool = True, as_obj: bool = False) -> Optional[StratLineup]: if not batting_order and not position and not lineup_id and not card_id: raise KeyError(f'One of batting_order, position, card_id , or lineup_id must not be None') if lineup_id: this_lineup = Lineup.get_by_id(lineup_id) elif card_id: this_lineup = Lineup.get_or_none( Lineup.game_id == game_id, Lineup.card_id == card_id, Lineup.active == active ) elif batting_order: this_lineup = Lineup.get_or_none( Lineup.game_id == game_id, Lineup.team_id == team_id, 
Lineup.batting_order == batting_order, Lineup.active == active ) else: this_lineup = Lineup.get_or_none( Lineup.game_id == game_id, Lineup.team_id == team_id, Lineup.position == position, Lineup.active == active ) logging.debug(f'get_one_lineup / this_lineup: {this_lineup}') if as_obj: return this_lineup if not this_lineup: return None # return_value = model_to_dict(this_lineup) # return_value = StratLineup(**vars(this_lineup)["__data__"]) return_value = convert_stratlineup(this_lineup) # return_value = StratLineup( # game=StratGame(**model_to_dict(this_lineup.game)), # team_id=this_lineup.team_id, # player_id=this_lineup.player_id, # position=this_lineup.position, # batting_order=this_lineup.batting_order, # after_play=this_lineup.after_play, # active= # ) db.close() return return_value async def get_team_lineups( game_id: int, team_id: int, inc_inactive: bool = False, pitchers_only: bool = False, as_string: bool = True): all_lineups = Lineup.select().where( (Lineup.game_id == game_id) & (Lineup.team_id == team_id) ).order_by(Lineup.batting_order) if not inc_inactive: all_lineups = all_lineups.where(Lineup.active == True) if pitchers_only: all_lineups = all_lineups.where(Lineup.position == 'P') # if all_lineups.count() == 0: # return None if as_string: l_string = '' this_game = Game.get_by_id(game_id) for x in all_lineups: l_string += f'{x.batting_order}. 
{player_link(this_game, await get_player(this_game, x))} - {x.position}\n' db.close() return l_string return_lineups = [] for x in all_lineups: # return_lineups.append(model_to_dict(x)) return_lineups.append(convert_stratlineup(x)) db.close() return return_lineups def post_lineups(lineups: list): with db.atomic(): Lineup.insert_many(lineups).execute() db.close() def patch_lineup(lineup_id, active: Optional[bool] = None, position: Optional[str] = None, replacing_id: Optional[int] = None, is_fatigued: Optional[bool] = None) -> StratLineup: this_lineup = Lineup.get_by_id(lineup_id) if active is not None: this_lineup.active = active if position is not None: this_lineup.position = position if replacing_id is not None: this_lineup.replacing_id = replacing_id if is_fatigued is not None: this_lineup.is_fatigued = is_fatigued this_lineup.save() # return_lineup = model_to_dict(this_lineup) return_value = convert_stratlineup(this_lineup) db.close() return return_value def make_sub(lineup_dict: dict): # Check for dupe player / card this_game = Game.get_by_id(lineup_dict['game_id']) if this_game.is_pd: player_conflict = Lineup.select().where( (Lineup.game_id == lineup_dict['game_id']) & (Lineup.team_id == lineup_dict['team_id']) & (Lineup.card_id == lineup_dict['card_id']) ) db_error = f'This card is already in the lineup' else: player_conflict = Lineup.select().where( (Lineup.game_id == lineup_dict['game_id']) & (Lineup.team_id == lineup_dict['team_id']) & (Lineup.player_id == lineup_dict['player_id']) ) db_error = f'This player is already in the lineup' if player_conflict.count(): db.close() raise DatabaseError(db_error) subbed_player = Lineup.get_or_none( Lineup.game_id == lineup_dict['game_id'], Lineup.team_id == lineup_dict['team_id'], Lineup.batting_order == lineup_dict['batting_order'], Lineup.active == True ) logging.debug(f'subbed_player: {subbed_player}') if subbed_player: subbed_player = patch_lineup(subbed_player.id, active=False) lineup_dict['replacing_id'] = 
subbed_player.id new_lineup = Lineup.create(**lineup_dict) # return_lineup = model_to_dict(new_lineup) return_value = convert_stratlineup(new_lineup) curr_play = get_current_play(lineup_dict['game_id']) logging.debug(f'\n\nreturn_value: {return_value}\n\ncurr_play: {curr_play}\n\n') # Check current play for updates if curr_play: if (not curr_play.pitcher and curr_play.batter.team_id != return_value.team_id) or return_value.position == 'P': patch_play(curr_play.id, pitcher_id=return_value.id) if subbed_player: if curr_play.batter == subbed_player: patch_play(curr_play.id, batter_id=return_value.id) elif curr_play.pitcher == subbed_player: patch_play(curr_play.id, pitcher_id=return_value.id) elif curr_play.catcher == subbed_player: patch_play(curr_play.id, catcher_id=return_value.id) elif curr_play.on_first == subbed_player: patch_play(curr_play.id, on_first_id=return_value.id) elif curr_play.on_second == subbed_player: patch_play(curr_play.id, on_second_id=return_value.id) elif curr_play.on_third == subbed_player: patch_play(curr_play.id, on_third_id=return_value.id) db.close() return return_value def undo_subs(game: StratGame, new_play_num: int): logging.info(f'get new players') new_players = Lineup.select().where((Lineup.game_id == game.id) & (Lineup.after_play > new_play_num)) logging.info(f'new_player count: {new_players.count()}') replacements = [(x.id, x.replacing_id) for x in new_players] for x in replacements: logging.info(f'replacing {x[0]} with {x[1]}') old_player = get_one_lineup(game_id=game.id, lineup_id=x[1]) logging.info(f'old_player: {old_player}') patch_lineup(old_player.id, active=True) logging.info(f'activated!') logging.info(f'done activating old players') Lineup.delete().where((Lineup.game_id == game.id) & (Lineup.after_play > new_play_num)).execute() async def get_player(game, lineup_member, as_dict: bool = True): # await asyncio.sleep(0.1) return await get_pd_player(lineup_member.player_id, as_dict=as_dict) if isinstance(game, Game): if 
game.is_pd: this_card = await db_get(f'cards', object_id=lineup_member.card_id) player = this_card['player'] player['name'] = player['p_name'] player['team'] = this_card['team'] return player else: return get_sba_player(lineup_member.player_id) elif isinstance(game, StratGame): if game.is_pd: # card_id = lineup_member.card_id if isinstance(lineup_member, Lineup) else lineup_member['card_id'] card_id = lineup_member['card_id'] if isinstance(lineup_member, dict) else lineup_member.card_id this_card = await db_get(f'cards', object_id=card_id) player = this_card['player'] player['name'] = player['p_name'] player['team'] = this_card['team'] logging.debug(f'player: {player}') return player else: return get_sba_player(lineup_member.player_id) else: raise TypeError(f'Cannot get player; game is not a valid object') def player_link(game, player): if isinstance(game, Game): if game.is_pd: return f'[{player["p_name"]}]({player["image"]})' else: return get_player_url(player) elif isinstance(game, StratGame): if game.is_pd: return f'[{player["p_name"]}]({player["image"]})' else: return get_player_url(player) else: raise TypeError(f'Cannot get player link; game is not a valid object') class Play(BaseModel): game = ForeignKeyField(Game) play_num = IntegerField() batter = ForeignKeyField(Lineup) batter_pos = CharField(default='DH') pitcher = ForeignKeyField(Lineup, null=True) on_base_code = IntegerField() inning_half = CharField() inning_num = IntegerField() batting_order = IntegerField() starting_outs = IntegerField() away_score = IntegerField() home_score = IntegerField() in_pow = BooleanField(default=False) on_first = ForeignKeyField(Lineup, null=True) on_first_final = IntegerField(null=True) on_second = ForeignKeyField(Lineup, null=True) on_second_final = IntegerField(null=True) on_third = ForeignKeyField(Lineup, null=True) on_third_final = IntegerField(null=True) batter_final = IntegerField(null=True) pa = IntegerField(default=0) ab = IntegerField(default=0) run = 
IntegerField(default=0) e_run = IntegerField(default=0) hit = IntegerField(default=0) rbi = IntegerField(default=0) double = IntegerField(default=0) triple = IntegerField(default=0) homerun = IntegerField(default=0) bb = IntegerField(default=0) so = IntegerField(default=0) hbp = IntegerField(default=0) sac = IntegerField(default=0) ibb = IntegerField(default=0) gidp = IntegerField(default=0) bphr = IntegerField(default=0) bpfo = IntegerField(default=0) bp1b = IntegerField(default=0) bplo = IntegerField(default=0) sb = IntegerField(default=0) cs = IntegerField(default=0) outs = IntegerField(default=0) wpa = FloatField(default=0.0) re24 = FloatField(default=0.0) catcher = ForeignKeyField(Lineup, null=True) defender = ForeignKeyField(Lineup, null=True) runner = ForeignKeyField(Lineup, null=True) check_pos = CharField(null=True) error = IntegerField(default=0) wild_pitch = IntegerField(default=0) passed_ball = IntegerField(default=0) pick_off = IntegerField(default=0) balk = IntegerField(default=0) complete = BooleanField(default=False) locked = BooleanField(default=False) is_go_ahead = BooleanField(default=False) is_tied = BooleanField(default=False) is_new_inning = BooleanField(default=False) @dataclass class StratPlay: id: int game: StratGame play_num: int batter: StratLineup pitcher: StratLineup on_base_code: int inning_half: str inning_num: int batting_order: int starting_outs: int away_score: int home_score: int batter_pos: str = None in_pow: bool = False on_first: StratLineup = None on_first_final: int = None on_second: StratLineup = None on_second_final: int = None on_third: StratLineup = None on_third_final: int = None batter_final: int = None pa: int = 0 ab: int = 0 run: int = 0 e_run: int = 0 hit: int = 0 rbi: int = 0 double: int = 0 triple: int = 0 homerun: int = 0 bb: int = 0 so: int = 0 hbp: int = 0 sac: int = 0 ibb: int = 0 gidp: int = 0 bphr: int = 0 bpfo: int = 0 bp1b: int = 0 bplo: int = 0 sb: int = 0 cs: int = 0 outs: int = 0 wpa: float = 0.0 re24: 
float = 0.0 catcher: StratLineup = None defender: StratLineup = None runner: StratLineup = None check_pos: str = None error: int = 0 wild_pitch: int = 0 passed_ball: int = 0 pick_off: int = 0 balk: int = 0 complete: bool = False locked: bool = False is_go_ahead: bool = False is_tied: bool = False is_new_inning: bool = False def ai_run_diff(self): if self.game.ai_team == 'away': return self.away_score - self.home_score else: return self.home_score - self.away_score def convert_stratplay(play: Play) -> StratPlay: play_dict = model_to_dict(play) play_dict['game'] = StratGame(**play_dict['game']) if play_dict['batter']: play_dict['batter'] = convert_stratlineup(play.batter) if play_dict['pitcher']: play_dict['pitcher'] = convert_stratlineup(play.pitcher) if play_dict['on_first']: play_dict['on_first'] = convert_stratlineup(play.on_first) if play_dict['on_second']: play_dict['on_second'] = convert_stratlineup(play.on_second) if play_dict['on_third']: play_dict['on_third'] = convert_stratlineup(play.on_third) if play_dict['catcher']: play_dict['catcher'] = convert_stratlineup(play.catcher) if play_dict['defender']: play_dict['defender'] = convert_stratlineup(play.defender) if play_dict['runner']: play_dict['runner'] = convert_stratlineup(play.runner) return StratPlay(**play_dict) db.create_tables([Play]) def post_play(play_dict: dict) -> StratPlay: logging.debug(f'play_dict: {play_dict}') new_play = Play.create(**play_dict) # return_play = model_to_dict(new_play) return_play = convert_stratplay(new_play) db.close() return return_play def patch_play( play_id, batter_id: int = None, pitcher_id: int = None, catcher_id: int = None, locked: bool = None, pa: int = None, ab: int = None, hit: int = None, double: int = None, triple: int = None, homerun: int = None, outs: int = None, so: int = None, bp1b: int = None, bplo: int = None, bphr: int = None, bpfo: int = None, walk: int = None, hbp: int = None, ibb: int = None, sac: int = None, sb: int = None, is_go_ahead: bool = None, 
defender_id: int = None, check_pos: str = None, error: int = None, play_num: int = None, cs: int = None, on_first_id: int = None, on_first_final: int = None, on_second_id: int = None, on_second_final: int = None, on_third_id: int = None, on_third_final: int = None, starting_outs: int = None, runner_id: int = None, complete: bool = None, rbi: int = None, wp: int = None, pb: int = None, pick: int = None, balk: int = None, is_new_inning: bool = None, batter_final: int = None, in_pow: bool = None): this_play = Play.get_by_id(play_id) if batter_id is not None: this_play.batter_id = batter_id if pitcher_id is not None: this_play.pitcher_id = pitcher_id if catcher_id is not None: this_play.catcher_id = catcher_id if runner_id is not None: this_play.runner_id = runner_id if locked is not None: this_play.locked = locked if complete is not None: this_play.complete = complete if pa is not None: this_play.pa = pa if ab is not None: this_play.ab = ab if hit is not None: this_play.hit = hit if rbi is not None: this_play.rbi = rbi if double is not None: this_play.double = double if triple is not None: this_play.triple = triple if homerun is not None: this_play.homerun = homerun if starting_outs is not None: this_play.starting_outs = starting_outs if outs is not None: this_play.outs = outs if so is not None: this_play.so = so if bp1b is not None: this_play.bp1b = bp1b if bplo is not None: this_play.bplo = bplo if bphr is not None: this_play.bphr = bphr if bpfo is not None: this_play.bpfo = bpfo if walk is not None: this_play.bb = walk if hbp is not None: this_play.hbp = hbp if ibb is not None: this_play.ibb = ibb if sac is not None: this_play.sac = sac if sb is not None: this_play.sb = sb if cs is not None: this_play.cs = cs if wp is not None: this_play.wild_pitch = wp if pb is not None: this_play.passed_ball = pb if pick is not None: this_play.pick_off = pick if balk is not None: this_play.balk = balk if defender_id is not None: if not defender_id: this_play.defender_id = None 
else: this_play.defender_id = defender_id if check_pos is not None: if check_pos.lower() == 'false': this_play.check_pos = None else: this_play.check_pos = check_pos if error is not None: this_play.error = error if play_num is not None: this_play.play_num = play_num if on_first_id is not None: if not on_first_id: this_play.on_first = None else: this_play.on_first_id = on_first_id if on_first_final is not None: if not on_first_final: this_play.on_first_final = None else: this_play.on_first_final = on_first_final if on_second_id is not None: this_play.on_second_id = on_second_id if not on_second_id: this_play.on_second = None else: this_play.on_second_id = on_second_id if on_second_final is not None: if not on_second_final: this_play.on_second_final = None else: this_play.on_second_final = on_second_final if on_third_id is not None: this_play.on_third_id = on_third_id if not on_third_id: this_play.on_third = None else: this_play.on_third_id = on_third_id if on_third_final is not None: if not on_third_final: this_play.on_third_final = None else: this_play.on_third_final = on_third_final if batter_final is not None: if not batter_final: this_play.batter_final = None else: this_play.batter_final = batter_final if is_go_ahead is not None: if not is_go_ahead: this_play.is_go_ahead = 0 else: this_play.is_go_ahead = 1 if is_new_inning is not None: if not is_new_inning: this_play.is_new_inning = 0 else: this_play.is_new_inning = 1 if in_pow is not None: if not in_pow: this_play.in_pow = 0 else: this_play.in_pow = 1 this_play.save() # return_play = model_to_dict(this_play) return_play = convert_stratplay(this_play) db.close() return return_play def get_plays(game_id: int = None, pitcher_id: int = None): all_plays = Play.select().where(Play.game_id == game_id) if pitcher_id is not None: all_plays = all_plays.where(Play.pitcher_id == pitcher_id) return_plays = {'count': all_plays.count(), 'plays': []} for line in all_plays: return_plays['plays'].append(convert_stratplay(line)) 
db.close() return return_plays def get_play_by_num(game_id, play_num: int): latest_play = Play.get_or_none(Play.game_id == game_id, Play.play_num == play_num) if not latest_play: return None # return_play = model_to_dict(latest_play) return_play = convert_stratplay(latest_play) db.close() return return_play def get_latest_play(game_id): latest_play = Play.select().where(Play.game_id == game_id).order_by(-Play.id).limit(1) if not latest_play: return None # return_play = model_to_dict(latest_play[0]) return_play = convert_stratplay(latest_play[0]) db.close() return return_play def undo_play(game_id): p_query = Play.delete().where(Play.game_id == game_id).order_by(-Play.id).limit(1) logging.debug(f'p_query: {p_query}') count = p_query.execute() db.close() return count def get_current_play(game_id) -> Optional[StratPlay]: curr_play = Play.get_or_none(Play.game_id == game_id, Play.complete == False) if not curr_play: latest_play = Play.select().where(Play.game_id == game_id).order_by(-Play.id).limit(1) if not latest_play: return None else: complete_play(latest_play[0].id, batter_to_base=latest_play[0].batter_final) curr_play = Play.get_or_none(Play.game_id == game_id, Play.complete == False) # return_play = model_to_dict(curr_play) return_play = convert_stratplay(curr_play) db.close() return return_play def get_last_inning_end_play(game_id, inning_half, inning_num): this_play = Play.select().where( (Play.game_id == game_id) & (Play.inning_half == inning_half) & (Play.inning_num == inning_num) ).order_by(-Play.id) # return_play = model_to_dict(this_play[0]) if this_play.count() > 0: return_play = convert_stratplay(this_play[0]) else: return_play = None db.close() return return_play def add_run_last_player_ab(lineup: Lineup, is_erun: bool = True): try: last_ab = Play.select().where(Play.batter == lineup).order_by(-Play.id).get() except DoesNotExist as e: logging.error(f'Unable to apply run to Lineup {lineup}') else: last_ab.run = 1 last_ab.e_run = 1 if is_erun else 0 
def get_dbready_plays(game_id: int, db_game_id: int):
    """Return every play of a local game as a dict shaped for the remote database.

    Args:
        game_id: id of the game in the local gameplay database.
        db_game_id: id written into each play's ``game_id`` key in the result.

    Returns:
        A list of ``model_to_dict`` dicts, one per play, with flattened
        ``*_id`` / ``*_team_id`` keys added and ``on_base_code`` converted from
        its integer form to a three-character string.
    """
    all_plays = Play.select().where(Play.game_id == game_id)
    prep_plays = [model_to_dict(x) for x in all_plays]
    db.close()

    # Indexed by the integer on-base code stored on each play.
    obc_list = ['000', '001', '010', '100', '011', '101', '110', '111']

    for x in prep_plays:
        x['pitcher_id'] = x['pitcher']['player_id']
        x['batter_id'] = x['batter']['player_id']
        x['batter_team_id'] = x['batter']['team_id']
        x['pitcher_team_id'] = x['pitcher']['team_id']
        # These three lineup references are optional on a play.
        for role in ('catcher', 'defender', 'runner'):
            if x[role] is not None:
                x[f'{role}_id'] = x[role]['player_id']
                x[f'{role}_team_id'] = x[role]['team_id']
        x['game_id'] = db_game_id
        x['on_base_code'] = obc_list[x['on_base_code']]

    logging.debug(f'all_plays:\n\n{prep_plays}\n')
    return prep_plays


class Bullpen(BaseModel):
    """Cached bullpen role assignments for an AI-controlled team."""
    ai_team_id = IntegerField()
    closer_id = IntegerField()
    setup_id = IntegerField()
    middle_one_id = IntegerField(null=True)
    middle_two_id = IntegerField(null=True)
    middle_three_id = IntegerField(null=True)
    long_one_id = IntegerField(null=True)
    long_two_id = IntegerField(null=True)
    long_three_id = IntegerField(null=True)
    long_four_id = IntegerField(null=True)
    # get_or_create_bullpen writes an integer millisecond timestamp here.
    last_updated = DateTimeField()


db.create_tables([Bullpen])


@dataclass
class StratBullpen:
    """Plain-data copy of a ``Bullpen`` row (see ``convert_bullpen_to_strat``)."""
    id: int
    ai_team_id: int
    closer_id: int
    setup_id: int
    last_updated: int
    middle_one_id: Optional[int] = None
    middle_two_id: Optional[int] = None
    middle_three_id: Optional[int] = None
    long_one_id: Optional[int] = None
    long_two_id: Optional[int] = None
    long_three_id: Optional[int] = None
    long_four_id: Optional[int] = None


def convert_bullpen_to_strat(bullpen: Bullpen) -> StratBullpen:
    """Convert a ``Bullpen`` model instance into a ``StratBullpen`` dataclass."""
    bullpen_dict = model_to_dict(bullpen)
    return StratBullpen(**bullpen_dict)


def get_or_create_bullpen(ai_team, bot):
    """Return the cached bullpen for ``ai_team``, building it from the team sheet if absent.

    Args:
        ai_team: team dict; the ``'id'`` and ``'gsheet'`` keys are read.
        bot: passed through to ``get_sheets``.

    Returns:
        StratBullpen for the team.

    Note:
        An existing row is always returned as-is. The previous three-day
        staleness check sat after that early return, so it could only execute
        with ``this_pen`` being ``None`` (where it dereferenced
        ``this_pen.last_updated``); that unreachable/crashing code is removed.
    """
    this_pen = Bullpen.get_or_none(Bullpen.ai_team_id == ai_team['id'])
    if this_pen:
        return convert_bullpen_to_strat(this_pen)

    sheets = get_sheets(bot)
    this_sheet = sheets.open_by_key(ai_team['gsheet'])
    r_sheet = this_sheet.worksheet_by_title('My Rosters')
    bullpen_range = 'N30:N41'
    raw_cells = r_sheet.range(bullpen_range)
    logging.debug(f'raw_cells: {raw_cells}')

    def optional_cell(row: int):
        """Value of the first cell in ``row``, or None when the cell is blank."""
        value = raw_cells[row][0].value
        return value if value != '' else None

    bullpen = Bullpen(
        ai_team_id=ai_team['id'],
        closer_id=raw_cells[0][0].value,
        setup_id=raw_cells[1][0].value,
        middle_one_id=optional_cell(2),
        middle_two_id=optional_cell(3),
        middle_three_id=optional_cell(4),
        long_one_id=optional_cell(5),
        long_two_id=optional_cell(6),
        long_three_id=optional_cell(7),
        long_four_id=optional_cell(8),
        last_updated=int(datetime.datetime.timestamp(datetime.datetime.now())*1000)
    )
    bullpen.save()
    logging.debug(f'bullpen: {bullpen}')

    return convert_bullpen_to_strat(bullpen)


def advance_runners(play_id: int, num_bases: int, is_error: bool = False, only_forced: bool = False):
    """
    Advances runners and tallies RBIs

    Args:
        play_id: id of the Play to update.
        num_bases: bases each advancing runner moves; 4 also scores the batter.
        is_error: when True, runners that score are not credited as RBIs.
        only_forced: when True, only runners forced by a runner on first advance.
    """
    this_play = Play.get_by_id(play_id)
    this_play.rbi = 0

    def advance(base_attr: str, start: int) -> None:
        """Move the runner who started on ``start``; home is 4 and scores an RBI."""
        final = min(start + max(num_bases, 0), 4)
        setattr(this_play, f'{base_attr}_final', final)
        if final == 4 and not is_error:
            this_play.rbi += 1

    if only_forced:
        if not this_play.on_first:
            # Nobody is forced: runners hold, and the batter check below is skipped.
            if this_play.on_second:
                this_play.on_second_final = 2
            if this_play.on_third:
                this_play.on_third_final = 3
            this_play.save()
            db.close()
            return

        if this_play.on_second:
            # Third is only forced when second is occupied too; with
            # num_bases <= 0 the runner's final base is left untouched.
            if this_play.on_third and num_bases > 0:
                advance('on_third', 3)
            advance('on_second', 2)
        elif this_play.on_third:
            this_play.on_third_final = 3

        advance('on_first', 1)
    else:
        if this_play.on_third:
            advance('on_third', 3)
        if this_play.on_second:
            advance('on_second', 2)
        if this_play.on_first:
            advance('on_first', 1)

    if num_bases == 4:
        # The batter scores as well; this RBI is credited regardless of is_error.
        this_play.batter_final = 4
        this_play.rbi += 1

    this_play.save()
    db.close()


def advance_one_runner(play_id: int, from_base: int, num_bases: int):
    """
    Advances runner and excludes RBIs

    Args:
        play_id: id of the Play to update.
        from_base: base the runner started on (1, 2 or 3); other values change nothing.
        num_bases: bases to advance; values below 1 change nothing.
    """
    this_play = Play.get_by_id(play_id)

    final_attrs = {1: 'on_first_final', 2: 'on_second_final', 3: 'on_third_final'}
    final_attr = final_attrs.get(from_base)
    if final_attr is not None and num_bases > 0:
        # Home plate is recorded as base 4.
        setattr(this_play, final_attr, min(from_base + num_bases, 4))

    this_play.save()
    db.close()
def complete_play(play_id, batter_to_base: int = None):
    """
    Finalizes current play and sets base values for next play

    Args:
        play_id: id of the Play being completed.
        batter_to_base: base the batter reached (1-3), 4 if the batter scored,
            or None.

    Returns:
        The newly created next play, passed through ``convert_stratplay``.
    """
    this_play = Play.get_by_id(play_id)

    this_play.locked = False
    this_play.complete = True
    this_play.save()

    logging.debug('starting the inning calc')
    new_inning_half = this_play.inning_half
    new_inning_num = this_play.inning_num
    # Plays that did not finish the plate appearance keep the same batter up.
    if this_play.runner or this_play.wild_pitch or this_play.passed_ball or this_play.pick_off or this_play.balk:
        new_batting_order = this_play.batting_order
    else:
        new_batting_order = this_play.batting_order + 1 if this_play.batting_order < 9 else 1
    new_bteam_id = this_play.batter.team_id
    new_pteam_id = this_play.pitcher.team_id
    new_starting_outs = this_play.starting_outs + this_play.outs
    new_on_first = None
    new_on_second = None
    new_on_third = None

    # Single source of truth for "this play ended the half inning"; the RE24
    # end state below previously tested `== 3` while this branch tested `> 2`.
    inning_over = this_play.starting_outs + this_play.outs > 2

    # patch to handle little league home runs TODO: standardize on just _on_final for these
    logging.debug(f'complete_play - this_play: {this_play}')
    if this_play.batter_final == 4 or batter_to_base == 4:
        this_play.run = 1
        score_increment = 1
        if not this_play.error:
            this_play.e_run = 1
    else:
        score_increment = 0
    logging.debug(f'complete_play - score_increment: {score_increment}')

    # A final base of 99 is cleared to None; 4 means the runner scored.
    if this_play.on_first_final == 99:
        this_play.on_first_final = None
    elif this_play.on_first_final == 4:
        score_increment += 1
        add_run_last_player_ab(this_play.on_first, this_play.error == 0)

    if this_play.on_second_final == 99:
        this_play.on_second_final = None
    elif this_play.on_second_final == 4:
        score_increment += 1
        add_run_last_player_ab(this_play.on_second, this_play.error == 0)

    if this_play.on_third_final == 99:
        this_play.on_third_final = None
    elif this_play.on_third_final == 4:
        score_increment += 1
        add_run_last_player_ab(this_play.on_third, this_play.error == 0)

    this_play.save()

    if inning_over:
        new_starting_outs = 0
        new_obc = 0
        if this_play.inning_half == 'Top':
            new_inning_half = 'Bot'
            new_bteam_id = this_play.game.home_team_id
            new_pteam_id = this_play.game.away_team_id
        else:
            new_inning_half = 'Top'
            new_inning_num += 1
            new_bteam_id = this_play.game.away_team_id
            new_pteam_id = this_play.game.home_team_id

        if new_inning_num > 1:
            # Resume the lineup from where this side's previous inning ended.
            last_inning_play = get_last_inning_end_play(this_play.game.id, new_inning_half, new_inning_num - 1)
            if last_inning_play.runner or last_inning_play.pick_off:
                new_batting_order = last_inning_play.batting_order
            else:
                new_batting_order = last_inning_play.batting_order + 1 if last_inning_play.batting_order < 9 else 1
        else:
            new_batting_order = 1

    # Not an inning-ending play
    else:
        logging.debug('starting the obc calc')
        bases_occ = [False, False, False, False]

        # Set the occupied bases for the next play and lineup member occupying it
        for runner, base in [
            (this_play.on_first, this_play.on_first_final),
            (this_play.on_second, this_play.on_second_final),
            (this_play.on_third, this_play.on_third_final)
        ]:
            if base:
                bases_occ[base - 1] = True
                if base == 1:
                    new_on_first = runner
                elif base == 2:
                    new_on_second = runner
                elif base == 3:
                    new_on_third = runner

        # Set the batter-runner occupied base for the next play
        if batter_to_base:
            if batter_to_base == 1:
                bases_occ[0] = True
                new_on_first = this_play.batter
            elif batter_to_base == 2:
                bases_occ[1] = True
                new_on_second = this_play.batter
            elif batter_to_base == 3:
                bases_occ[2] = True
                new_on_third = this_play.batter
            this_play.batter_final = batter_to_base
            this_play.save()

        # Set the OBC based on the bases occupied: (first, second, third) -> code
        obc_by_bases = {
            (False, False, False): 0,
            (True, False, False): 1,
            (False, True, False): 2,
            (False, False, True): 3,
            (True, True, False): 4,
            (True, False, True): 5,
            (False, True, True): 6,
            (True, True, True): 7,
        }
        new_obc = obc_by_bases[tuple(bases_occ[:3])]

    if this_play.inning_half == 'Top':
        new_away_score = this_play.away_score + score_increment
        new_home_score = this_play.home_score
    else:
        new_away_score = this_play.away_score
        new_home_score = this_play.home_score + score_increment

    # A team score
    if score_increment:
        logging.debug(f'complete_play: \n\nscore_increment: {score_increment}\n\nnew home score: {new_home_score}\n\n'
                      f'new_away_score: {new_away_score}\n\nthis_play.away_score: {this_play.away_score}\n\n'
                      f'this_player.home_score: {this_play.home_score}')
        # Game is now tied
        if new_home_score == new_away_score:
            logging.debug(f'\n\nGame {this_play.game} is now tied\n\n')
            # Persisted by the save that follows the RE24 calculation.
            this_play.is_tied = 1

        # One team took the lead
        elif (this_play.away_score <= this_play.home_score) and (new_away_score > new_home_score):
            logging.debug(f'\n\nTeam {this_play.batter.team_id} took the lead\n\n')
            this_play.is_go_ahead = 1
            this_play.save()

        elif (this_play.home_score <= this_play.away_score) and (new_home_score > new_away_score):
            logging.debug(f'\n\nteam {this_play.batter.team_id} took the lead\n\n')
            this_play.is_go_ahead = 1
            this_play.save()

    # RE24 Calc: table indexed by on-base code, then by outs
    re_data = {
        0: [0.457, 0.231, 0.077],
        1: [0.793, 0.438, 0.171],
        2: [1.064, 0.596, 0.259],
        4: [1.373, 0.772, 0.351],
        3: [1.340, 0.874, 0.287],
        5: [1.687, 1.042, 0.406],
        6: [1.973, 1.311, 0.448],
        7: [2.295, 1.440, 0.618]
    }
    start_re24 = re_data[this_play.on_base_code][this_play.starting_outs]
    end_re24 = 0 if inning_over else re_data[new_obc][new_starting_outs]
    this_play.re24 = end_re24 - start_re24 + score_increment
    this_play.save()

    batter = get_one_lineup(this_play.game.id, team_id=new_bteam_id, batting_order=new_batting_order)
    batter_id = batter.id if batter else None
    pitcher = get_one_lineup(this_play.game_id, team_id=new_pteam_id, position='P')
    pitcher_id = pitcher.id if pitcher else None

    logging.debug('done the obc calc')
    next_play = Play.create(**{
        'game_id': this_play.game.id,
        'play_num': this_play.play_num + 1,
        'batter_id': batter_id,
        'pitcher_id': pitcher_id,
        'on_base_code': new_obc,
        'inning_half': new_inning_half,
        'inning_num': new_inning_num,
        'next_inning_num': new_inning_num,
        'batting_order': new_batting_order,
        'starting_outs': new_starting_outs,
        'away_score': new_away_score,
        'home_score': new_home_score,
        'on_first': new_on_first,
        'on_second': new_on_second,
        'on_third': new_on_third,
        'is_new_inning': 1 if new_inning_half != this_play.inning_half else 0
    })
    return_play = convert_stratplay(next_play)
    db.close()
    return return_play
def get_batting_stats(game_id, lineup_id: int = None, team_id: int = None):
    """Return per-batter batting lines, each carrying its team totals, for a game.

    Args:
        game_id: local game id.
        lineup_id: when given, restrict to plays batted by this lineup entry.
        team_id: used only when ``lineup_id`` is None; restrict to this team's batters.

    Returns:
        A list with one dict per distinct batter, holding ``pl_*`` player
        totals and ``tm_*`` team totals.
    """
    stat_names = (
        'pa', 'ab', 'hit', 'rbi', 'double', 'triple', 'homerun', 'bb', 'so', 'hbp', 'sac', 'ibb', 'gidp',
        'sb', 'cs', 'bphr', 'bpfo', 'bp1b', 'bplo'
    )
    # Stolen bases / caught stealing belong to Play.runner rather than the
    # batter, so they are counted per runner below, not with a batter window.
    runner_stats = ('sb', 'cs')

    columns = [Play.batter]
    for stat in stat_names:
        if stat not in runner_stats:
            columns.append(
                fn.SUM(getattr(Play, stat)).over(partition_by=[Play.batter_id]).alias(f'pl_{stat}')
            )
    for stat in stat_names:
        columns.append(
            fn.SUM(getattr(Play, stat)).over(partition_by=[Play.batter.team_id]).alias(f'tm_{stat}')
        )

    batting_stats = Play.select(*columns).join(Lineup, on=Play.batter).where(Play.game_id == game_id)

    if lineup_id is not None:
        batting_stats = batting_stats.where(Play.batter_id == lineup_id)
    elif team_id is not None:
        batting_stats = batting_stats.where(Play.batter.team_id == team_id)

    done_batters = set()
    return_batters = []
    for x in batting_stats:
        # The window query yields one row per play; keep the first per batter.
        if x.batter.id in done_batters:
            continue
        done_batters.add(x.batter.id)

        runs_scored = Play.select(Play.pa).where(
            ((Play.on_first == x.batter) & (Play.on_first_final == 4)) |
            ((Play.on_second == x.batter) & (Play.on_second_final == 4)) |
            ((Play.on_third == x.batter) & (Play.on_third_final == 4)) |
            ((Play.batter == x.batter) & (Play.batter_final == 4))
        ).count()
        stolen_bases = Play.select(Play.pa).where(
            (Play.runner == x.batter) & (Play.sb == 1)
        ).count()
        # Previously read from a SUM partitioned by runner_id on the batter's
        # own row, which does not describe this player; count it like pl_sb.
        # NOTE(review): assumes cs is recorded as 1 on the runner's play, as sb is.
        caught_stealing = Play.select(Play.pa).where(
            (Play.runner == x.batter) & (Play.cs == 1)
        ).count()
        runner_totals = {'sb': stolen_bases, 'cs': caught_stealing}

        line = {
            'batter_id': x.batter_id,
            'card_id': x.batter.card_id,
            'team_id': x.batter.team_id,
            'pos': x.batter.position,
            'pl_run': runs_scored,
        }
        for stat in stat_names:
            if stat in runner_totals:
                line[f'pl_{stat}'] = runner_totals[stat]
            else:
                line[f'pl_{stat}'] = getattr(x, f'pl_{stat}')
        for stat in stat_names:
            line[f'tm_{stat}'] = getattr(x, f'tm_{stat}')
        return_batters.append(line)

    db.close()
    return return_batters


def get_fielding_stats(game_id, lineup_id: int = None, team_id: int = None):
    """Return fielding lines for defenders, followed by catching lines, for a game.

    Args:
        game_id: local game id.
        lineup_id: when given, restrict to this lineup entry (as defender for
            the fielding rows, as catcher for the catching rows).
        team_id: used only when ``lineup_id`` is None; restrict to this team.

    Returns:
        A list of dicts sharing one key set: defender rows carry zeroed
        catching fields and catcher rows (``'pos': 'C'``) carry zeroed
        fielding fields.
    """
    fielding_stats = Play.select(
        Play.defender,
        Play.check_pos,
        fn.SUM(Play.error).over(partition_by=[Play.defender_id]).alias('pl_error'),
        fn.SUM(Play.hit).over(partition_by=[Play.defender_id]).alias('pl_xhit'),
        fn.COUNT(Play.defender).over(partition_by=[Play.defender_id]).alias('pl_xch'),
        fn.SUM(Play.error).over(partition_by=[Play.defender.team_id]).alias('tm_error'),
    ).join(Lineup, on=Play.defender).where(Play.game_id == game_id)

    if lineup_id is not None:
        fielding_stats = fielding_stats.where(Play.defender_id == lineup_id)
    elif team_id is not None:
        fielding_stats = fielding_stats.where(Play.defender.team_id == team_id)

    added_card_ids = set()
    all_stats = []
    for x in fielding_stats:
        if x.defender.card_id in added_card_ids:
            continue
        added_card_ids.add(x.defender.card_id)
        all_stats.append({
            'defender_id': x.defender_id,
            'card_id': x.defender.card_id,
            'team_id': x.defender.team_id,
            'pos': x.check_pos,
            'pl_error': x.pl_error,
            'pl_xhit': x.pl_xhit,
            'pl_xch': x.pl_xch,
            # Was x.pl_error, which reported the player's total as the team's.
            'tm_error': x.tm_error,
            'pl_pb': 0,
            'pl_sbc': 0,
            'pl_csc': 0
        })

    catching_stats = Play.select(
        Play.catcher,
        fn.SUM(Play.passed_ball).over(partition_by=[Play.catcher_id]).alias('pl_pb'),
        fn.SUM(Play.sb).over(partition_by=[Play.catcher_id]).alias('pl_sbc'),
        fn.SUM(Play.cs).over(partition_by=[Play.catcher_id]).alias('pl_csc')
    ).join(Lineup, on=Play.catcher).where(Play.game_id == game_id)

    # Filter on the catcher: this query joins Lineup through Play.catcher, and
    # filtering on defender_id selected plays by the wrong player.
    if lineup_id is not None:
        catching_stats = catching_stats.where(Play.catcher_id == lineup_id)
    elif team_id is not None:
        catching_stats = catching_stats.where(Play.catcher.team_id == team_id)

    added_catcher_ids = set()
    for x in catching_stats:
        # One row comes back per play; the window sums are already totals, so
        # emit each catcher once instead of once per play.
        if x.catcher_id in added_catcher_ids:
            continue
        added_catcher_ids.add(x.catcher_id)
        all_stats.append({
            'defender_id': x.catcher_id,
            'card_id': x.catcher.card_id,
            'team_id': x.catcher.team_id,
            'pos': 'C',
            'pl_error': 0,
            'pl_xhit': 0,
            'pl_xch': 0,
            'tm_error': 0,
            'pl_pb': x.pl_pb,
            'pl_sbc': x.pl_sbc,
            'pl_csc': x.pl_csc
        })

    logging.debug(f'fielding_stats: {all_stats}')
    db.close()
    return all_stats
fn.SUM(Play.hit).over(partition_by=[Play.batter_id]).alias('pl_hit'), # fn.SUM(Play.rbi).over(partition_by=[Play.batter_id]).alias('pl_rbi'), # fn.SUM(Play.double).over(partition_by=[Play.batter_id]).alias('pl_double'), # fn.SUM(Play.triple).over(partition_by=[Play.batter_id]).alias('pl_triple'), # fn.SUM(Play.homerun).over(partition_by=[Play.batter_id]).alias('pl_homerun'), # fn.SUM(Play.bb).over(partition_by=[Play.batter_id]).alias('pl_bb'), # fn.SUM(Play.so).over(partition_by=[Play.batter_id]).alias('pl_so'), # fn.SUM(Play.hbp).over(partition_by=[Play.batter_id]).alias('pl_hbp'), # fn.SUM(Play.sac).over(partition_by=[Play.batter_id]).alias('pl_sac'), # fn.SUM(Play.ibb).over(partition_by=[Play.batter_id]).alias('pl_ibb'), # fn.SUM(Play.gidp).over(partition_by=[Play.batter_id]).alias('pl_gidp'), # fn.SUM(Play.sb).over(partition_by=[Play.batter_id]).alias('pl_sb'), # fn.SUM(Play.cs).over(partition_by=[Play.batter_id]).alias('pl_cs'), # fn.SUM(Play.bphr).over(partition_by=[Play.batter_id]).alias('pl_bphr'), # fn.SUM(Play.bpfo).over(partition_by=[Play.batter_id]).alias('pl_bpfo'), # fn.SUM(Play.bp1b).over(partition_by=[Play.batter_id]).alias('pl_bp1b'), # fn.SUM(Play.bplo).over(partition_by=[Play.batter_id]).alias('pl_bplo') # ).join(Lineup, on=Play.batter).where((Play.game_id == game_id) & (Play.batter_id == lineup_id)) # # done_batters = [] # for x in batting_stats: # if x.batter.id not in done_batters: # return_stats.append({ # 'batter_id': x.batter_id, # 'pl_pa': x.pl_pa, # 'pl_ab': x.pl_ab, # 'pl_hit': x.pl_hit, # 'pl_rbi': x.pl_rbi, # 'pl_double': x.pl_double, # 'pl_triple': x.pl_triple, # 'pl_homerun': x.pl_homerun, # 'pl_bb': x.pl_bb, # 'pl_so': x.pl_so, # 'pl_hbp': x.pl_hbp, # 'pl_sac': x.pl_sac, # 'pl_ibb': x.pl_ibb, # 'pl_gidp': x.pl_gidp, # 'pl_sb': x.pl_sb, # 'pl_cs': x.pl_cs, # 'pl_bphr': x.pl_bphr, # 'pl_bpfo': x.pl_bpfo, # 'pl_bp1b': x.pl_bp1b, # 'pl_bplo': x.pl_bplo, # }) # done_batters.append(x.batter.id) # logging.info(f'batting stats: 
{return_stats}') # # elif team_id is not None: # batting_stats = Play.select( # fn.SUM(Play.pa).over(partition_by=[Play.batter.team_id]).alias('tm_pa'), # fn.SUM(Play.ab).over(partition_by=[Play.batter.team_id]).alias('tm_ab'), # fn.SUM(Play.hit).over(partition_by=[Play.batter.team_id]).alias('tm_hit'), # fn.SUM(Play.rbi).over(partition_by=[Play.batter.team_id]).alias('tm_rbi'), # fn.SUM(Play.double).over(partition_by=[Play.batter.team_id]).alias('tm_double'), # fn.SUM(Play.triple).over(partition_by=[Play.batter.team_id]).alias('tm_triple'), # fn.SUM(Play.homerun).over(partition_by=[Play.batter.team_id]).alias('tm_homerun'), # fn.SUM(Play.bb).over(partition_by=[Play.batter.team_id]).alias('tm_bb'), # fn.SUM(Play.so).over(partition_by=[Play.batter.team_id]).alias('tm_so'), # fn.SUM(Play.hbp).over(partition_by=[Play.batter.team_id]).alias('tm_hbp'), # fn.SUM(Play.sac).over(partition_by=[Play.batter.team_id]).alias('tm_sac'), # fn.SUM(Play.ibb).over(partition_by=[Play.batter.team_id]).alias('tm_ibb'), # fn.SUM(Play.gidp).over(partition_by=[Play.batter.team_id]).alias('tm_gidp'), # fn.SUM(Play.sb).over(partition_by=[Play.batter.team_id]).alias('tm_sb'), # fn.SUM(Play.cs).over(partition_by=[Play.batter.team_id]).alias('tm_ccs'), # fn.SUM(Play.bphr).over(partition_by=[Play.batter.team_id]).alias('tm_bphr'), # fn.SUM(Play.bpfo).over(partition_by=[Play.batter.team_id]).alias('tm_bpfo'), # fn.SUM(Play.bp1b).over(partition_by=[Play.batter.team_id]).alias('tm_bp1b'), # fn.SUM(Play.bplo).over(partition_by=[Play.batter.team_id]).alias('tm_bplo') # ).join(Lineup, on=Play.batter).where((Play.game_id == game_id) & (Play.batter.team_id == team_id)) # # logging.info(f'batting_stats count: {batting_stats.count()}') # done_batters = [] # for x in batting_stats: # if x.batter.id not in done_batters: # return_stats.append({ # 'tm_pa': x.tm_pa, # 'tm_ab': x.tm_ab, # 'tm_hit': x.tm_hit, # 'tm_rbi': x.tm_rbi, # 'tm_double': x.tm_double, # 'tm_triple': x.tm_triple, # 'tm_homerun': 
def _count_earned_runs(game_id, pitcher_filter):
    """Count runs scored on error-free plays of a game for plays matching ``pitcher_filter``."""
    total = 0
    for final_base in (Play.on_first_final, Play.on_second_final, Play.on_third_final, Play.batter_final):
        # Counted per base so a play on which several runners score counts each run.
        total += Play.select().where(
            (final_base == 4) & (Play.error == 0)
        ).join(Lineup, on=Play.pitcher).where((Play.game_id == game_id) & pitcher_filter).count()
    return total


def get_pitching_stats(
        game_id, lineup_id: int = None, team_id: int = None, in_pow: bool = None, in_innings: list = None):
    """Return per-pitcher pitching lines, each carrying its team totals, for a game.

    Args:
        game_id: local game id.
        lineup_id: when given, restrict to plays thrown by this lineup entry.
        team_id: when given, restrict to this team's pitchers and fill ``tm_eruns``.
        in_pow: when given, restrict to plays whose ``in_pow`` flag matches.
        in_innings: innings counted toward ``pl_in_runs``; defaults to 1-29.

    Returns:
        A list with one dict per distinct pitcher. ``tm_eruns`` is None unless
        ``team_id`` was supplied.
    """
    if in_innings is None:
        in_innings = [x for x in range(1, 30)]
    logging.info(f'db_calls_gameplay - get_pitching_stats - in_innings: {in_innings}')

    sum_stats = ('outs', 'hit', 'so', 'bb', 'hbp', 'wild_pitch', 'balk', 'homerun')
    run_fields = (
        ('first', Play.on_first_final),
        ('second', Play.on_second_final),
        ('third', Play.on_third_final),
        ('batter', Play.batter_final),
    )

    columns = [Play.pitcher]
    for prefix, partition in (('pl', Play.pitcher_id), ('tm', Play.pitcher.team_id)):
        for stat in sum_stats:
            columns.append(
                fn.SUM(getattr(Play, stat)).over(partition_by=[partition]).alias(f'{prefix}_{stat}')
            )
        for label, field in run_fields:
            columns.append(
                fn.COUNT(field).filter(field == 4).over(partition_by=[partition]).alias(f'{prefix}_run_{label}')
            )
    # Runs allowed within the requested innings only (player level).
    for label, field in run_fields:
        columns.append(
            fn.COUNT(field).filter(
                (field == 4) & (Play.inning_num << in_innings)
            ).over(partition_by=[Play.pitcher_id]).alias(f'pl_in_run_{label}')
        )

    pitching_stats = Play.select(*columns).join(Lineup, on=Play.pitcher).where(Play.game_id == game_id)
    logging.debug(f'db_calls_gameplay - get_pitching_stats - pitching_stats: {pitching_stats}')

    if lineup_id is not None:
        pitching_stats = pitching_stats.where(Play.pitcher_id == lineup_id)

    if in_pow is not None:
        pitching_stats = pitching_stats.where(Play.in_pow == in_pow)

    tm_earned_runs = None
    if team_id is not None:
        pitching_stats = pitching_stats.where(Play.pitcher.team_id == team_id)
        tm_earned_runs = _count_earned_runs(game_id, Play.pitcher.team_id == team_id)

    done_pitchers = set()
    return_pitchers = []
    for x in pitching_stats:
        # The window query yields one row per play; keep the first per pitcher.
        if x.pitcher.id in done_pitchers:
            continue
        done_pitchers.add(x.pitcher_id)

        # Counted for this row's pitcher. Previously counted once for
        # `lineup_id` (matching no play when it was None) and copied onto
        # every pitcher in the result.
        pl_earned_runs = _count_earned_runs(game_id, Play.pitcher_id == x.pitcher_id)

        return_pitchers.append({
            'pitcher_id': x.pitcher_id,
            'card_id': x.pitcher.card_id,
            'team_id': x.pitcher.team_id,
            'pl_outs': x.pl_outs,
            'pl_hit': x.pl_hit,
            'pl_eruns': pl_earned_runs,
            'pl_runs': x.pl_run_first + x.pl_run_second + x.pl_run_third + x.pl_run_batter,
            'pl_in_runs': x.pl_in_run_first + x.pl_in_run_second + x.pl_in_run_third + x.pl_in_run_batter,
            'pl_so': x.pl_so,
            'pl_bb': x.pl_bb,
            'pl_hbp': x.pl_hbp,
            'pl_homerun': x.pl_homerun,
            'pl_wild_pitch': x.pl_wild_pitch,
            'pl_balk': x.pl_balk,
            'tm_outs': x.tm_outs,
            'tm_hit': x.tm_hit,
            'tm_eruns': tm_earned_runs,
            'tm_runs': x.tm_run_first + x.tm_run_second + x.tm_run_third + x.tm_run_batter,
            'tm_so': x.tm_so,
            'tm_bb': x.tm_bb,
            'tm_hbp': x.tm_hbp,
            'tm_homerun': x.tm_homerun,
            'tm_wild_pitch': x.tm_wild_pitch,
            'tm_balk': x.tm_balk,
            'pl_gs': 1 if x.pitcher.after_play == 0 else 0
        })

    db.close()
    logging.debug(f'pitching stats: {return_pitchers}')
    return return_pitchers


def get_pitching_decisions(game: StratGame, db_game_id: int):
    """Walk a game's plays and assign win, loss, save, hold and blown-save decisions.

    Args:
        game: the local game; ``id``, ``away_team_id``, ``home_team_id``,
            ``season`` and ``week_num`` are read.
        db_game_id: game id written into every returned decision.

    Returns:
        A list of ``DecisionModel.dict()`` dicts, one per pitcher seen. When
        no go-ahead play leaves a winner or loser (for example the last
        scoring play tied the game), no win/loss is marked instead of raising.
    """
    logging.debug(f'this game: {game}')
    winner = None
    loser = None
    save = None
    b_save = []
    holds = []

    def new_decision(pitcher, is_start: bool = False) -> DecisionModel:
        """Blank decision record for a pitcher's lineup entry."""
        return DecisionModel(
            game_id=db_game_id,
            season=game.season,
            week=game.week_num,
            pitcher_id=pitcher.player_id,
            pitcher_team_id=pitcher.team_id,
            is_start=is_start
        )

    # Get starting pitchers and update this as a pointer for the play crawl
    away_pitcher = Lineup.get(Lineup.game_id == game.id, Lineup.team_id == game.away_team_id, Lineup.position == 'P')
    home_pitcher = Lineup.get(Lineup.game_id == game.id, Lineup.team_id == game.home_team_id, Lineup.position == 'P')
    logging.debug(f'SPs: {away_pitcher} / {home_pitcher}')

    # Keyed by player_id
    decisions = {
        away_pitcher.player_id: new_decision(away_pitcher, is_start=True),
        home_pitcher.player_id: new_decision(home_pitcher, is_start=True)
    }

    for x in Play.select().where(Play.game_id == game.id):
        logging.debug(f'checking play num {x.play_num}')

        # Pitching change: settle the outgoing pitcher's pending save as a
        # hold or blown save, then test the new pitcher for a save situation.
        if x.inning_half == 'Top' and home_pitcher != x.pitcher:
            if save == home_pitcher:
                if x.home_score > x.away_score:
                    holds.append(save)
                else:
                    b_save.append(save)
                save = None

            home_pitcher = x.pitcher

            if x.home_score > x.away_score and x.home_score - x.away_score <= 3:
                save = home_pitcher

        elif x.inning_half == 'Bot' and away_pitcher != x.pitcher:
            if save == away_pitcher:
                if x.away_score > x.home_score:
                    holds.append(save)
                else:
                    b_save.append(save)
                save = None

            away_pitcher = x.pitcher

            if x.away_score > x.home_score and x.away_score - x.home_score <= 3:
                save = away_pitcher

        if x.is_go_ahead:
            logging.debug(f'is go ahead: {x}')

            # The four per-base branches were identical, so any run crossing
            # the plate on a go-ahead play is handled the same way.
            if 4 in (x.on_third_final, x.on_second_final, x.on_first_final, x.batter_final):
                loser = x.pitcher
                if save == loser:
                    b_save.append(save)
                    save = None
                winner = home_pitcher if x.inning_half == 'Bot' else away_pitcher

        if x.is_tied:
            logging.debug(f'is tied: {x}')
            winner, loser = None, None
            if save:
                b_save.append(save)
                save = None

        if home_pitcher.player_id not in decisions:
            decisions[home_pitcher.player_id] = new_decision(home_pitcher)

        if away_pitcher.player_id not in decisions:
            decisions[away_pitcher.player_id] = new_decision(away_pitcher)

    logging.debug(f'\n\nWin: {winner}\nLose: {loser}\nSave: {save}\nBlown Save: {b_save}\nHolds: {holds}')

    # winner / loser stay None when no go-ahead play set them (or a later
    # tying play reset them); skip rather than dereference None.
    if winner is not None:
        decisions[winner.player_id].win = 1
    if loser is not None:
        decisions[loser.player_id].loss = 1
    if save is not None:
        decisions[save.player_id].is_save = 1
    for x in holds:
        decisions[x.player_id].hold = 1
    for x in b_save:
        decisions[x.player_id].b_save = 1

    return [x.dict() for x in decisions.values()]
# (last_play.away_score > last_play.home_score):
#         if len(away_pitchers) == 1:
#             winner = pit_pair[0]
#         else:
#             next_p_first = away_pitchers[count + 1][1]
#             if next_p_first.away_score > next_p_first.home_score:
#                 winner = pit_pair[0]
#             else:
#                 winner = away_pitchers[count + 1][0]
#
#     # if starter & team lost
#     elif pit_pair[0].after_play == 0:
#         if len(away_pitchers) == 1:
#             loser = pit_pair[0]
#         else:
#             next_p_first = away_pitchers[count + 1][1]
#             if next_p_first.away_score < next_p_first.home_score:


# def get_final_scorebug(away_team, home_team, away_score, home_score):
#     return f'```' \
#            f'Team | R | H | E |\n' \
#            f'{away_team["abbrev"].replace("Gauntlet-", ""): <4} | {away_stats["score"]: >2} | ' \
#            f'{away_stats["hits"]: >2} | ' \
#            f'{away_stats["f_lines"][0]["tm_error"] if away_stats["f_lines"] else 0: >2} |\n' \
#            f'{home_team["abbrev"].replace("Gauntlet-", ""): <4} | {home_stats["score"]: >2} | ' \
#            f'{home_stats["hits"]: >2} | ' \
#            f'{home_stats["f_lines"][0]["tm_error"] if home_stats["f_lines"] else 0: >2} |\n' \
#            f'```'


class StratManagerAi(pydantic.BaseModel):
    """Validated copy of a ManagerAi row plus the decision helpers built on it.

    Every tendency field is an integer rating that defaults to 5.

    Rating Rule of Thumb:
        1: Least Aggressive
        5: Average
        10: Most Aggressive
    """

    id: int
    name: str
    steal: int = 5
    running: int = 5
    hold: int = 5
    catcher_throw: int = 5
    uncapped_home: int = 5
    uncapped_third: int = 5
    uncapped_trail: int = 5
    bullpen_matchup: int = 5
    behind_aggression: int = 5
    ahead_aggression: int = 5
    decide_throw: int = 5

    # def __init__(self, **data) -> None:
    #     super().__init__(**data)
    #
    #     seed = random.randint(1, 100)
    #     if seed > 95:
    #         self.steal = 10
    #         self.running = 10
    #         self.uncapped_third = 10
    #         self.uncapped_home = 10
    #         self.uncapped_trail = 10
    #         self.behind_aggression = 10
    #         self.ahead_aggression = 10
    #     elif seed > 80:
    #         self.steal = 8
    #         self.running = 8
    #         self.uncapped_third = 8
    #         self.uncapped_home = 8
    #         self.uncapped_trail = 8
    #         self.behind_aggression = 8
    #         self.ahead_aggression = 5
    #     elif seed <= 40:
    #         self.steal = 3
    #         self.running = 3
    #         self.uncapped_third = 3
    #         self.uncapped_home = 3
    #         self.uncapped_trail = 3
    #         self.behind_aggression = 5
    #         self.ahead_aggression = 3
    #     elif seed <= 15:
    #         self.steal = 1
    #         self.running = 1
    #         self.uncapped_third = 1
    #         self.uncapped_home = 1
    #         self.uncapped_trail = 1
    #         self.behind_aggression = 3
    #         self.ahead_aggression = 1

    def check_jump(self, to_base: int, outs: int) -> Optional[str]:
        """Return the steal instruction to be appended to the AI note.

        Args:
            to_base: Base being stolen; only 2 and 3 produce an instruction.
            outs: Current outs. Two outs raises the required safe range by
                one, zero outs lowers it by one.

        Returns:
            The instruction text, or None when ``to_base`` is neither 2 nor 3.
        """
        if to_base not in (2, 3):
            return None

        steal_base = 'attempt to steal'
        if self.steal == 10:
            if to_base == 2:
                # Max-rated managers stealing second get a fixed message that
                # is returned before the outs adjustment below is applied.
                return f'{steal_base} second if the runner has an ***** auto-jump or the safe range is 13+'
            steal_range = 13
        elif self.steal >= 8:
            steal_range = 14
        elif self.steal >= 5:
            steal_range = 15
        elif self.steal >= 3:
            steal_range = 16
        else:
            steal_range = 17

        if outs == 2:
            steal_range += 1
        elif outs == 0:
            steal_range -= 1

        return f'{steal_base} {"second" if to_base == 2 else "third"} if their safe range is {steal_range}+'

    def tag_from_second(self, outs: int) -> str:
        """Return the instruction to be posted ahead of the tag-up message.

        The base threshold comes from ``running`` (5 / 10 / 12); two outs adds
        3 to it and zero outs subtracts 2.
        """
        tag_base = 'attempt to tag up if their safe range is'

        if self.running >= 8:
            tag_range = 5
        elif self.running >= 5:
            tag_range = 10
        else:
            tag_range = 12

        if outs == 2:
            tag_range += 3
        elif outs == 0:
            tag_range -= 2

        return f'{tag_base} {tag_range}+'

    def tag_from_third(self, outs: int) -> str:
        """Return the instruction to be posted with the tag-up message.

        The base threshold comes from ``running`` (8 / 10 / 12); two outs
        subtracts 2 from it and zero outs adds 2.
        """
        tag_base = 'attempt to tag up if their safe range is'

        if self.running >= 8:
            tag_range = 8
        elif self.running >= 5:
            tag_range = 10
        else:
            tag_range = 12

        if outs == 2:
            tag_range -= 2
        elif outs == 0:
            tag_range += 2

        return f'{tag_base} {tag_range}+'

    def uncapped_advance(self, to_base: int, outs: int) -> str:
        """Return the instruction to be posted with the advancement message.

        Args:
            to_base: 3 uses the ``uncapped_third`` rating; any other value
                uses the ``uncapped_home`` rating.
            outs: Current outs, used to shift the threshold.

        Returns:
            A safe-range instruction, or ``'not attempt to advance'`` when the
            runner is heading to third and ``uncapped_third`` is below 5.
        """
        advance_base = 'attempt to advance if their safe range is'

        if to_base == 3:
            if self.uncapped_third >= 8:
                advance_range = 14
            elif self.uncapped_third >= 5:
                advance_range = 18
            else:
                return 'not attempt to advance'

            if outs == 2:
                advance_range += 2
        else:
            if self.uncapped_home >= 8:
                advance_range = 8
            elif self.uncapped_home >= 5:
                advance_range = 10
            else:
                advance_range = 12

            if outs == 2:
                advance_range -= 2
            elif outs == 0:
                advance_range += 3

        return f'{advance_base} {advance_range}+'

    # def uncapped_advance_runner(self, this_play: StratPlay, to_base: int, runner: BattingCard, defender_pos: CardPosition, modifier: int = -1):
    #     total_mod = modifier + defender_pos.arm
    #     if to_base == 3:

    def trail_advance(self, to_base: int, outs: int, sent_home: bool = False) -> str:
        """Return the instruction to be posted with the advancement message.

        Args:
            to_base: Accepted for signature parity with the other advance
                helpers; not read by this method.
            outs: Current outs; only consulted when ``sent_home`` is True and
                ``uncapped_trail`` is in the 5-7 band.
            sent_home: Selects the first rule set below when True.

        Returns:
            An unconditional ``'attempt to advance'`` / ``'not attempt to
            advance'`` or a safe-range instruction.
        """
        advance_base = 'attempt to advance if their safe range is'

        if sent_home:
            if self.uncapped_trail >= 8:
                return 'attempt to advance'
            if self.uncapped_trail < 5:
                return 'not attempt to advance'
            if outs == 2:
                return 'attempt to advance'
            advance_range = 14
        elif self.uncapped_trail >= 8:
            advance_range = 14
        else:
            advance_range = 16

        return f'{advance_base} {advance_range}+'

    def throw_lead_runner(self, to_base: int, outs: int) -> str:
        """Return the instruction to be posted with the throw message.

        Both arguments are currently ignored; the answer is constant.
        """
        return 'throw for the lead runner'

    def throw_which_runner(self, to_base: int, outs: int) -> str:
        """Return the instruction to be posted with the throw message.

        When ``to_base`` is 4 the throw is unconditional; otherwise it is
        conditioned on a safe range of 14 or lower. ``outs`` is not read.
        """
        if to_base == 4:
            return 'throw for the lead runner'
        return 'throw for the lead runner if their safe range is 14-'

    def gb_decide_advance(self, starting_outs: int, run_lead: int):
        """Return the instruction to be posted with the advancement message.

        The threshold starts from ``running`` (10 / 13 / 16), gains 3 when
        ``starting_outs`` is 1, loses 3 when ``run_lead`` is 4 or more, gains
        3 when ``run_lead`` is negative, and is capped at 20.
        """
        advance_base = 'attempt to advance if their safe range is'

        if self.running >= 8:
            advance_range = 10
        elif self.running >= 5:
            advance_range = 13
        else:
            advance_range = 16

        if starting_outs == 1:
            advance_range += 3

        if run_lead >= 4:
            advance_range -= 3
        elif run_lead < 0:
            advance_range += 3

        return f'{advance_base} {min(advance_range, 20)}+'

    def gb_decide_throw(self, starting_outs: int, run_lead: int):
        """Return the instruction to be posted with the throw message.

        The threshold starts from ``decide_throw`` (13 / 10 / 8), loses 3 when
        ``starting_outs`` is 1, loses 3 when ``run_lead`` is 4 or more, gains
        3 when ``run_lead`` is negative, and is floored at 0.
        """
        throw_base = 'throw for the lead runner if their safe range is'

        if self.decide_throw >= 8:
            throw_range = 13
        elif self.decide_throw >= 5:
            throw_range = 10
        else:
            throw_range = 8

        if starting_outs == 1:
            throw_range -= 3

        if run_lead >= 4:
            throw_range -= 3
        elif run_lead < 0:
            throw_range += 3

        return f'{throw_base} {max(throw_range, 0)}-'

    def go_to_reliever(
            self, this_play, tot_allowed: int, is_starter: bool = False) -> bool:
        """Decide whether the AI should pull the current pitcher.

        Args:
            this_play: Play object providing ``ai_run_diff()``,
                ``on_base_code``, ``starting_outs`` and ``inning_num``.
            tot_allowed: Running total charged to the current pitcher
                (NOTE(review): exact stat is chosen by the caller - confirm).
            is_starter: True when the current pitcher started the game.

        Returns:
            False when one of the numbered "stay" rules matches (the rule
            number is written to the info log), True otherwise.
        """
        run_lead = this_play.ai_run_diff()
        obc = this_play.on_base_code
        log_prefix = f'db_calls_gameplay - StratManagerAi - ID: {self.id} - go_to_reliever: '
        logging.info(f'{log_prefix}outs: {this_play.starting_outs}, obc: {obc}, run_lead: {run_lead}, '
                     f'tot_allowed: {tot_allowed}')
        lead_target = run_lead if is_starter else 3
        two_out_reliever = this_play.starting_outs == 2 and not is_starter

        # Number of the first rule that says "leave him in", if any.
        stay_code = None

        # AI up big
        if tot_allowed < 5 and is_starter:
            stay_code = 1
        elif run_lead > 5 or (run_lead > 2 and self.ahead_aggression > 5):
            if tot_allowed <= lead_target or obc <= 3 or two_out_reliever:
                stay_code = 2
        elif run_lead > 2 or (run_lead >= 0 and self.ahead_aggression > 5):
            if tot_allowed < lead_target or obc <= 1 or two_out_reliever:
                stay_code = 3
        elif run_lead >= 0 or (run_lead >= -2 and self.behind_aggression > 5):
            if tot_allowed < 5 or obc <= run_lead or two_out_reliever:
                stay_code = 4
        elif run_lead >= -3 and self.behind_aggression > 5:
            if tot_allowed < 5 and obc <= 1:
                stay_code = 5
        elif run_lead <= -5:
            if is_starter and this_play.inning_num <= 3:
                stay_code = 6
            elif this_play.starting_outs != 0:
                stay_code = 7

        if stay_code is None:
            return True

        logging.info(f'{log_prefix}False / code {stay_code}')
        return False


def convert_strat_manager(manager: ManagerAi) -> StratManagerAi:
    """Build a StratManagerAi from a ManagerAi row via ``model_to_dict``."""
    return StratManagerAi(**model_to_dict(manager))


def get_manager(game) -> Optional[StratManagerAi]:
    """Pick and load the AI manager profile for a game.

    The manager id is derived from the current day of the month multiplied by
    the AI side's team id, modulo 3, plus 1.

    Args:
        game: Object exposing ``ai_team``, ``home_team_id`` and
            ``away_team_id``.

    Returns:
        The converted manager, or None when ``game.ai_team`` is falsy.

    Raises:
        KeyError: If the manager row cannot be loaded; the underlying
            exception is attached as ``__cause__``.
    """
    if not game.ai_team:
        return None

    # manager_ai_id = game.home_team_id if game.ai_team == 'home' else game.away_team_id
    # manager_ai_id = 1
    team_id = game.home_team_id if game.ai_team == 'home' else game.away_team_id
    manager_ai_id = ((datetime.datetime.now().day * team_id) % 3) + 1
    # Defensive clamp; with integer inputs the expression above is already 1-3.
    if manager_ai_id > 3 or manager_ai_id < 1:
        manager_ai_id = 1
    logging.debug(f'manager id: {manager_ai_id} for game {game}')

    try:
        this_manager = ManagerAi.get_by_id(manager_ai_id)
    except Exception as e:
        e_message = f'Could not find manager id {manager_ai_id}'
        logging.error(f'{e_message}: {type(e)}: {e}')
        # Chain the original error so the traceback shows why the lookup failed.
        raise KeyError('Could not find this AI manager\'s playbook') from e

    return convert_strat_manager(this_manager)


db.close()