"""SQLModel table definitions and helpers for in-game state.

Defines the ManagerAi, Team, Game, Cardset, Player, Card, Lineup and Play
models backed by a local SQLite database, plus a few development helpers
for creating and inspecting test data.
"""
import datetime
import logging
from typing import Literal

import discord
import pydantic
from sqlalchemy import func
from sqlmodel import Session, SQLModel, create_engine, select, or_, Field, Relationship

from api_calls import db_get, db_post
from in_game.managerai_responses import JumpResponse


sqlite_url = 'sqlite:///storage/gameplay.db'
connect_args = {"check_same_thread": False}
engine = create_engine(sqlite_url, echo=False, connect_args=connect_args)
CACHE_LIMIT = 1209600  # in seconds


def _is_top_half(inning_half: str | None) -> bool:
    """Return True when ``inning_half`` names the top of an inning (any casing)."""
    return isinstance(inning_half, str) and inning_half.lower() == 'top'


def _is_bottom_half(inning_half: str | None) -> bool:
    """Return True when ``inning_half`` names the bottom of an inning (any casing)."""
    return isinstance(inning_half, str) and inning_half.lower() == 'bot'


class ManagerAiBase(SQLModel):
    """Tendency ratings for an AI manager; every rating defaults to 5."""

    id: int | None = Field(primary_key=True)
    name: str = Field(index=True)
    steal: int | None = Field(default=5)
    running: int | None = Field(default=5)
    hold: int | None = Field(default=5)
    catcher_throw: int | None = Field(default=5)
    uncapped_home: int | None = Field(default=5)
    uncapped_third: int | None = Field(default=5)
    uncapped_trail: int | None = Field(default=5)
    bullpen_matchup: int | None = Field(default=5)
    behind_aggression: int | None = Field(default=5)
    ahead_aggression: int | None = Field(default=5)
    decide_throw: int | None = Field(default=5)


class ManagerAi(ManagerAiBase, table=True):
    """Persisted AI manager profile with steal-decision logic."""

    @staticmethod
    def create_ai(session: Session | None = None):
        """Seed the default ManagerAi rows if the table is empty.

        Args:
            session: Session to use. When None, a short-lived session bound
                to the module-level engine is opened for the call.

        Returns:
            Always True.
        """
        def get_new_ai(this_session: Session):
            existing_ids = this_session.exec(select(ManagerAi.id)).all()
            if existing_ids:
                return

            logging.info('Creating ManagerAI records')
            this_session.add_all([
                ManagerAi(
                    name='Balanced'
                ),
                ManagerAi(
                    name='Yolo',
                    steal=10,
                    running=10,
                    hold=5,
                    catcher_throw=10,
                    uncapped_home=10,
                    uncapped_third=10,
                    uncapped_trail=10,
                    bullpen_matchup=3,
                    behind_aggression=10,
                    ahead_aggression=10,
                    decide_throw=10
                ),
                ManagerAi(
                    name='Safe',
                    steal=3,
                    running=3,
                    hold=8,
                    catcher_throw=5,
                    uncapped_home=5,
                    uncapped_third=3,
                    uncapped_trail=5,
                    bullpen_matchup=8,
                    behind_aggression=5,
                    ahead_aggression=1,
                    decide_throw=1
                ),
            ])
            # NOTE(review): source indentation was lost; the commit is assumed
            # to belong to the "no records yet" branch.
            this_session.commit()

        if session is None:
            with Session(engine) as new_session:
                get_new_ai(new_session)
        else:
            get_new_ai(session)

        return True

    def check_jump(self, to_base: Literal[2, 3, 4], num_outs: Literal[0, 1, 2], run_diff: int) -> JumpResponse | None:
        """Build the steal-attempt thresholds for this manager.

        Args:
            to_base: Base the runner would be stealing.
            num_outs: Outs at the start of the play.
            run_diff: Run differential passed in by the caller.

        Returns:
            A JumpResponse with ``min_safe`` and the auto-jump flags set
            according to this manager's ``steal`` rating.
        """
        this_resp = JumpResponse()
        steal = self.steal
        close_game = run_diff <= 5
        under_two_outs = num_outs < 2

        if to_base == 2:
            if steal == 10:
                this_resp.min_safe = 12 + num_outs
            elif steal > 8 and close_game:
                this_resp.min_safe = 13 + num_outs
            elif steal > 6 and close_game:
                this_resp.min_safe = 14 + num_outs
            elif steal > 4 and under_two_outs and close_game:
                this_resp.min_safe = 15 + num_outs
            elif steal > 2 and under_two_outs and close_game:
                this_resp.min_safe = 16 + num_outs
            else:
                # Previously rebound this_resp itself to an int, which broke
                # the attribute writes below and the return value.
                this_resp.min_safe = 17 + num_outs

            if steal > 7 and under_two_outs and close_game:
                this_resp.run_if_auto_jump = True
            elif steal < 5:
                this_resp.must_auto_jump = True

        elif to_base == 3:
            if steal == 10:
                this_resp.min_safe = 12 + num_outs
            elif steal > 6 and under_two_outs and close_game:
                this_resp.min_safe = 15 + num_outs
            else:
                this_resp.min_safe = None

            if steal == 10 and under_two_outs and close_game:
                this_resp.run_if_auto_jump = True
            elif steal <= 5:
                this_resp.must_auto_jump = True

        # NOTE(review): source indentation was lost; this branch is assumed to
        # be part of the outer to_base chain (i.e. any other base) - confirm.
        elif run_diff == -1:
            if steal == 10:
                this_resp.min_safe = 5
            elif steal > 5:
                this_resp.min_safe = 7

        return this_resp


class GameCardsetLink(SQLModel, table=True):
    """Association row linking a Game to a Cardset with a priority."""

    game_id: int | None = Field(default=None, foreign_key='game.id', primary_key=True)
    cardset_id: int | None = Field(default=None, foreign_key='cardset.id', primary_key=True)
    priority: int | None = Field(default=1, index=True)

    game: 'Game' = Relationship(back_populates='cardset_links')
    cardset: 'Cardset' = Relationship(back_populates='game_links')


class TeamBase(SQLModel):
    """Shared column definitions for a team."""

    id: int = Field(primary_key=True)
    abbrev: str = Field(index=True)
    sname: str
    lname: str
    gmid: int = Field(index=True)
    gmname: str
    gsheet: str
    wallet: int
    team_value: int
    collection_value: int
    logo: str | None = Field(default=None)
    color: str
    season: int
    career: int
    ranking: int
    has_guide: bool
    is_ai: bool
    # default_factory so each row gets its own timestamp instead of the
    # module import time.
    created: datetime.datetime | None = Field(default_factory=datetime.datetime.now)

    @property
    def description(self) -> str:
        """One-line summary: id, abbreviation, long name and AI/human flag."""
        return f'{self.id}. {self.abbrev} {self.lname}, {"is_ai" if self.is_ai else "human"}'


class Team(TeamBase, table=True):
    """Persisted team with its cards and lineups."""

    cards: list['Card'] = Relationship(back_populates='team', cascade_delete=True)
    lineups: list['Lineup'] = Relationship(back_populates='team', cascade_delete=True)
    # away_games: list['Game'] = Relationship(back_populates='away_team')
    # home_games: list['Game'] = Relationship(back_populates='home_team')


class Game(SQLModel, table=True):
    """A single game between an away team and a home team."""

    id: int | None = Field(default=None, primary_key=True)
    away_team_id: int = Field(foreign_key='team.id')
    home_team_id: int = Field(foreign_key='team.id')
    channel_id: int = Field(index=True)
    season: int
    active: bool | None = Field(default=True)
    is_pd: bool | None = Field(default=True)
    ranked: bool | None = Field(default=False)
    short_game: bool | None = Field(default=False)
    week_num: int | None = Field(default=None)
    game_num: int | None = Field(default=None)
    away_roster_id: int | None = Field(default=None)
    home_roster_id: int | None = Field(default=None)
    first_message: str | None = Field(default=None)
    ai_team: str | None = Field(default=None)
    game_type: str

    cardset_links: list[GameCardsetLink] = Relationship(back_populates='game', cascade_delete=True)
    away_team: Team = Relationship(
        # back_populates='away_games',
        # sa_relationship_kwargs={
        #     'primaryjoin': 'Game.away_team_id==Team.id',
        #     'foreign_keys': '[Game.away_team.id]',
        #     'lazy': 'joined'
        # }
        sa_relationship_kwargs=dict(foreign_keys="[Game.away_team_id]")
    )
    home_team: Team = Relationship(
        # back_populates='home_games',
        # sa_relationship_kwargs={
        #     'primaryjoin': 'Game.home_team_id==Team.id',
        #     'foreign_keys': '[Game.home_team.id]',
        #     'lazy': 'joined'
        # }
        sa_relationship_kwargs=dict(foreign_keys="[Game.home_team_id]")
    )
    lineups: list['Lineup'] = Relationship(back_populates='game', cascade_delete=True)
    plays: list['Play'] = Relationship(back_populates='game', cascade_delete=True)

    @property
    def cardset_param_string(self) -> str:
        """Query-string fragment listing priority-1 cardsets, then backups."""
        primary = ''.join(
            f'&cardset_id={link.cardset_id}'
            for link in self.cardset_links if link.priority == 1
        )
        backup = ''.join(
            f'&backup_cardset_id={link.cardset_id}'
            for link in self.cardset_links if link.priority != 1
        )
        return f'{primary}{backup}'

    def current_play_or_none(self, session: Session):
        """Return the Play with the highest id for this game, or None."""
        statement = select(Play).where(Play.game == self).order_by(Play.id.desc()).limit(1)
        return session.exec(statement).first()

    def get_scorebug_embed(self, session: Session, full_length: bool = True, classic: bool = True) -> discord.Embed:
        """Build the scorebug embed for this game.

        Args:
            session: Session used to look up the current play.
            full_length: Currently unused; kept for interface compatibility.
            classic: When True, add pitcher, batter, baserunner, catcher and
                AI-note fields in addition to the game state.

        Returns:
            The populated discord.Embed. When the game has no plays yet only
            the title is set.
        """
        gt_string = ' - Unlimited'
        if self.game_type == 'minor-league':
            gt_string = ' - Minor League'
        elif self.game_type == 'major-league':
            gt_string = ' - Major League'
        elif self.game_type == 'hall-of-fame':
            gt_string = ' - Hall of Fame'
        elif 'gauntlet' in self.game_type:
            gt_string = ' - Gauntlet'
        elif 'flashback' in self.game_type:
            gt_string = ' - Flashback'
        elif 'exhibition' in self.game_type:
            gt_string = ' - Exhibition'

        logging.info(f'gameplay_models - Game.get_scorebug_embed - this_game: {self} / gt_string: {gt_string}')

        embed = discord.Embed(
            title=f'{self.away_team.sname} @ {self.home_team.sname}{gt_string}',
            color=int('a6ce39', 16)
        )
        logging.info(f'gameplay_models - Game.get_scorebug_embed - embed: {embed}')

        curr_play = self.current_play_or_none(session)
        logging.info(f'gameplay_models - Game.get_scorebug_embed - curr_play: {curr_play}')

        if curr_play is None:
            return embed

        embed.add_field(
            name='Game State',
            value=curr_play.scorebug_ascii,
            inline=False
        )
        logging.info(f'gameplay_models - Game.get_scorebug_embed - embed post gamestate: {embed}')

        # NOTE(review): source indentation was lost; everything below is
        # assumed to be part of the "classic" layout - confirm.
        if not classic:
            return embed

        embed.add_field(
            name='Pitcher',
            value=curr_play.pitcher.player.name_card_link('pitching')
        )
        embed.add_field(
            name='Batter',
            value=curr_play.batter.player.name_card_link('batting')
        )
        logging.info(f'gameplay_models - Game.get_scorebug_embed - embed post batter: {embed}')

        # name_card_link is a method: it must be called, otherwise the bound
        # method's repr ends up in the embed text.
        baserunner_string = ''
        if curr_play.on_first is not None:
            baserunner_string += f'On First: {curr_play.on_first.player.name_card_link("batting")}\n'
        if curr_play.on_second is not None:
            baserunner_string += f'On Second: {curr_play.on_second.player.name_card_link("batting")}\n'
        if curr_play.on_third is not None:
            baserunner_string += f'On Third: {curr_play.on_third.player.name_card_link("batting")}'
        logging.info(f'gameplay_models - Game.get_scorebug_embed - baserunner_string: {baserunner_string}')

        if baserunner_string:
            embed.add_field(name=' ', value=' ', inline=False)
            embed.add_field(name='Baserunners', value=baserunner_string)
            embed.add_field(name='Catcher', value=curr_play.catcher.player.name_card_link('batting'))
        logging.info(f'gameplay_models - Game.get_scorebug_embed - embed post runners: {embed}')

        ai_note = curr_play.ai_note
        logging.info(f'gameplay_models - Game.get_scorebug_embed - ai_note: {ai_note}')
        if ai_note:
            gm_name = self.home_team.gmname if self.ai_team == 'home' else self.away_team.gmname
            embed.add_field(name=f'{gm_name} will...', value=ai_note, inline=False)

        return embed

    # @property
    # def game_prop(self) -> str:
    #     return f'Game {self.id} / Week {self.week_num} / Type {self.game_type}'


class CardsetBase(SQLModel):
    """Shared column definitions for a cardset."""

    id: int | None = Field(default=None, primary_key=True)
    name: str
    ranked_legal: bool | None = Field(default=False)


class Cardset(CardsetBase, table=True):
    """Persisted cardset with its game links and players."""

    game_links: list[GameCardsetLink] = Relationship(back_populates='cardset', cascade_delete=True)
    players: list['Player'] = Relationship(back_populates='cardset')


class PlayerBase(SQLModel):
    """Shared column definitions and card-URL helpers for a player."""

    id: int | None = Field(primary_key=True)
    name: str
    cost: int
    image: str
    mlbclub: str
    franchise: str
    cardset_id: int | None = Field(default=None, foreign_key='cardset.id')
    set_num: int
    rarity_id: int | None = Field(default=None)
    pos_1: str
    description: str
    quantity: int | None = Field(default=999)
    image2: str | None = Field(default=None)
    pos_2: str | None = Field(default=None)
    pos_3: str | None = Field(default=None)
    pos_4: str | None = Field(default=None)
    pos_5: str | None = Field(default=None)
    pos_6: str | None = Field(default=None)
    pos_7: str | None = Field(default=None)
    pos_8: str | None = Field(default=None)
    headshot: str | None = Field(default=None)
    vanity_card: str | None = Field(default=None)
    strat_code: str | None = Field(default=None)
    bbref_id: str | None = Field(default=None)
    fangr_id: str | None = Field(default=None)
    mlbplayer_id: int | None = Field(default=None)
    # default_factory so each row gets its own timestamp instead of the
    # module import time.
    created: datetime.datetime | None = Field(default_factory=datetime.datetime.now)

    @property
    def p_card_url(self):
        """URL containing 'pitching' from image/image2; falls back to image."""
        if 'pitching' in self.image:
            return self.image
        if self.image2 is not None and 'pitching' in self.image2:
            return self.image2

        logging.error(
            f'gameplay_models - PlayerBase - pitching card url not found for {self.id}. '
            f'{self.description} {self.name}'
        )
        return self.image

    @property
    def b_card_url(self):
        """URL containing 'batting' from image/image2; falls back to image."""
        if 'batting' in self.image:
            return self.image
        if self.image2 is not None and 'batting' in self.image2:
            return self.image2

        logging.error(
            f'gameplay_models - PlayerBase - batting card url not found for {self.id}. '
            f'{self.description} {self.name}'
        )
        return self.image

    def name_card_link(self, which: Literal['pitching', 'batting']):
        """Return a markdown link ``[name](card url)`` for the requested card."""
        card_url = self.p_card_url if which == 'pitching' else self.b_card_url
        return f'[{self.name}]({card_url})'


class Player(PlayerBase, table=True):
    """Persisted player with its cardset, cards and lineups."""

    cardset: Cardset = Relationship(back_populates='players')
    cards: list['Card'] = Relationship(back_populates='player', cascade_delete=True)
    lineups: list['Lineup'] = Relationship(back_populates='player', cascade_delete=True)


def player_description(player: Player = None, player_dict: dict = None) -> str:
    """Return "<description> <name>" for a Player or a player dict.

    Args:
        player: Player instance; takes precedence when both are given.
        player_dict: Mapping with a 'description' key and optionally a
            'name' or 'p_name' key.

    Raises:
        TypeError: If neither argument is provided.
    """
    if player is None and player_dict is None:
        err = 'One of "player" or "player_dict" must be included to get full description'
        logging.error(f'gameplay_models - player_description - {err}')
        raise TypeError(err)

    if player is not None:
        return f'{player.description} {player.name}'

    # Double quotes inside the f-string: reusing the outer quote character is
    # a SyntaxError on Python versions before 3.12.
    r_val = f'{player_dict["description"]}'
    if 'name' in player_dict:
        r_val += f' {player_dict["name"]}'
    elif 'p_name' in player_dict:
        r_val += f' {player_dict["p_name"]}'
    return r_val


class CardBase(SQLModel):
    """Shared column definitions for a card (a player owned by a team)."""

    id: int | None = Field(default=None, primary_key=True)
    player_id: int = Field(foreign_key='player.id', index=True, ondelete='CASCADE')
    team_id: int = Field(foreign_key='team.id', index=True, ondelete='CASCADE')
    variant: int | None = Field(default=0)
    # default_factory so each row gets its own timestamp instead of the
    # module import time.
    created: datetime.datetime | None = Field(default_factory=datetime.datetime.now)


class Card(CardBase, table=True):
    """Persisted card with its player, team and lineups."""

    player: Player = Relationship(back_populates='cards')
    team: Team = Relationship(back_populates='cards',)
    lineups: list['Lineup'] = Relationship(back_populates='card', cascade_delete=True)


class Lineup(SQLModel, table=True):
    """A lineup slot: one card at a position/batting order in a game."""

    id: int | None = Field(default=None, primary_key=True)
    position: str = Field(index=True)
    batting_order: int = Field(index=True)
    after_play: int | None = Field(default=0)
    replacing_id: int | None = Field(default=None)
    active: bool = Field(default=True, index=True)
    is_fatigued: bool | None = Field(default=None)

    game_id: int = Field(foreign_key='game.id', index=True, ondelete='CASCADE')
    game: Game = Relationship(back_populates='lineups')

    team_id: int = Field(foreign_key='team.id', index=True, ondelete='CASCADE')
    team: Team = Relationship(back_populates='lineups')

    player_id: int = Field(foreign_key='player.id', index=True, ondelete='CASCADE')
    player: Player = Relationship(back_populates='lineups')

    card_id: int = Field(foreign_key='card.id', index=True, ondelete='CASCADE')
    card: Card = Relationship(back_populates='lineups')

    # TODO: add function to return string value of game stats


class PlayBase(SQLModel):
    """Shared column definitions for a single play."""

    id: int | None = Field(default=None, primary_key=True)
    game_id: int = Field(foreign_key='game.id')
    play_num: int
    batter_id: int = Field(foreign_key='lineup.id')
    pitcher_id: int = Field(foreign_key='lineup.id')
    on_base_code: int = Field(default=0)
    # NOTE(review): the default 'away' matches neither the 'top' nor the
    # 'bot' values the Play properties test for - confirm intended values.
    inning_half: str = Field(default='away')
    inning_num: int = Field(default=1, ge=1)
    batting_order: int = Field(default=1, ge=1, le=9)
    starting_outs: int = Field(default=0, ge=0, le=2)
    away_score: int = Field(default=0)
    home_score: int = Field(default=0)
    batter_pos: str | None = Field(default=None)
    in_pow: bool = Field(default=False)

    on_first_id: int | None = Field(default=None, foreign_key='lineup.id')
    on_first_final: int | None = Field(default=None)
    on_second_id: int | None = Field(default=None, foreign_key='lineup.id')
    on_second_final: int | None = Field(default=None)
    on_third_id: int | None = Field(default=None, foreign_key='lineup.id')
    on_third_final: int | None = Field(default=None)
    batter_final: int | None = Field(default=None)

    pa: int = Field(default=0)
    ab: int = Field(default=0)
    run: int = Field(default=0)
    e_run: int = Field(default=0)
    hit: int = Field(default=0)
    rbi: int = Field(default=0)
    double: int = Field(default=0)
    triple: int = Field(default=0)
    homerun: int = Field(default=0)
    bb: int = Field(default=0)
    so: int = Field(default=0)
    hbp: int = Field(default=0)
    sac: int = Field(default=0)
    ibb: int = Field(default=0)
    gidp: int = Field(default=0)
    bphr: int = Field(default=0)
    bpfo: int = Field(default=0)
    bp1b: int = Field(default=0)
    bplo: int = Field(default=0)
    sb: int = Field(default=0)
    cs: int = Field(default=0)
    outs: int = Field(default=0)
    wpa: float = Field(default=0)
    re24: float = Field(default=0)

    catcher_id: int = Field(foreign_key='lineup.id')
    defender_id: int | None = Field(default=None, foreign_key='lineup.id')
    runner_id: int | None = Field(default=None, foreign_key='lineup.id')

    check_pos: str | None = Field(default=None)
    error: int = Field(default=0)
    wild_pitch: int = Field(default=0)
    passed_ball: int = Field(default=0)
    pick_off: int = Field(default=0)
    balk: int = Field(default=0)
    complete: bool = Field(default=False)
    locked: bool = Field(default=False)
    is_go_ahead: bool = Field(default=False)
    is_tied: bool = Field(default=False)
    is_new_inning: bool = Field(default=False)

    def ai_run_diff(self):
        """Return the score difference from the AI team's point of view.

        Reads ``self.game``, which is only defined on the Play table class.
        """
        if self.game.ai_team == 'away':
            return self.away_score - self.home_score
        return self.home_score - self.away_score


class Play(PlayBase, table=True):
    """Persisted play with relationships to its game and lineup slots."""

    game: Game = Relationship(back_populates='plays')
    batter: Lineup = Relationship(
        sa_relationship_kwargs=dict(foreign_keys="[Play.batter_id]")
    )
    pitcher: Lineup = Relationship(
        sa_relationship_kwargs=dict(foreign_keys="[Play.pitcher_id]")
    )
    on_first: Lineup = Relationship(
        sa_relationship_kwargs=dict(foreign_keys="[Play.on_first_id]")
    )
    on_second: Lineup = Relationship(
        sa_relationship_kwargs=dict(foreign_keys="[Play.on_second_id]")
    )
    on_third: Lineup = Relationship(
        sa_relationship_kwargs=dict(foreign_keys="[Play.on_third_id]")
    )
    catcher: Lineup = Relationship(
        sa_relationship_kwargs=dict(foreign_keys="[Play.catcher_id]")
    )
    defender: Lineup = Relationship(
        sa_relationship_kwargs=dict(foreign_keys="[Play.defender_id]")
    )
    runner: Lineup = Relationship(
        sa_relationship_kwargs=dict(foreign_keys="[Play.runner_id]")
    )

    @property
    def scorebug_ascii(self):
        """Code-block string showing teams, score, bases, inning and outs."""
        occupied = '●'
        unoccupied = '○'

        first_base = occupied if self.on_first else unoccupied
        second_base = occupied if self.on_second else unoccupied
        third_base = occupied if self.on_third else unoccupied

        # Case-insensitive so this agrees with ai_note, which tests 'top'.
        half = '▲' if _is_top_half(self.inning_half) else '▼'

        if self.game.active:
            inning = f'{half} {self.inning_num}'
            outs = f'{self.starting_outs} Out{"s" if self.starting_outs != 1 else ""}'
        else:
            final_inning = self.inning_num if _is_bottom_half(self.inning_half) else self.inning_num - 1
            inning = f'F/{final_inning}'
            outs = ''

        game_string = f'```\n' \
            f'{self.game.away_team.abbrev.replace("Gauntlet-", ""): ^5}{self.away_score: ^3} {second_base}{inning: >10}\n' \
            f'{self.game.home_team.abbrev.replace("Gauntlet-", ""): ^5}{self.home_score: ^3} {third_base} {first_base}{outs: >8}\n```'

        return game_string

    @property
    def pitching_ai_note(self) -> str:
        """Bullet list of defensive instructions for the AI's pitching side."""
        ai_note = ''

        # Holding Baserunners
        # NOTE(review): source indentation was lost; the last two branches are
        # attached to the outer check because they would be unreachable on
        # the inner chain - confirm.
        if self.starting_outs == 2 and self.on_base_code > 0:
            if self.on_base_code in [1, 2]:
                ai_note += '- hold the runner\n'
            elif self.on_base_code in [4, 7]:
                ai_note += '- hold the runners\n'
            elif self.on_base_code == 5:
                ai_note += '- hold the runner on first\n'
            elif self.on_base_code == 6:
                ai_note += '- hold the runner on second\n'
        elif self.on_base_code in [1, 5]:
            ai_note += '- hold the runner on 1st if they have ***** auto-jump\n'
        elif self.on_base_code == 2:
            ai_note += '- hold the runner on 2nd if safe range is 14+\n'

        # Defensive Alignment
        if self.on_third and self.starting_outs < 2:
            if self.on_first:
                ai_note += '- play the corners in\n'
            elif abs(self.away_score - self.home_score) <= 3:
                ai_note += '- play the whole infield in\n'
            else:
                ai_note += '- play the corners in\n'

        return ai_note

    @property
    def batting_ai_note(self) -> str:
        """Instructions for the AI's batting side (currently always empty)."""
        ai_note = ''
        # TODO: migrate Manager AI to their own local model
        return ai_note

    @property
    def ai_note(self) -> str:
        """Return the batting or pitching note depending on who is at bat."""
        # TODO: test these three functions with specific OBCs
        ai_is_away = self.game.ai_team == 'away'
        # In the top half the away side gets the batting note; otherwise the
        # home side does.
        ai_is_batting = ai_is_away if _is_top_half(self.inning_half) else not ai_is_away
        return self.batting_ai_note if ai_is_batting else self.pitching_ai_note


# ---------------------------------------------------------------------------
# BEGIN DEVELOPMENT HELPERS
# ---------------------------------------------------------------------------

def create_db_and_tables():
    """Create all tables on the module engine and seed the ManagerAi rows."""
    SQLModel.metadata.create_all(engine)
    ManagerAi.create_ai()


def create_test_games():
    """Insert two sample games with cardset links and nine-man lineups."""
    batting_slots = [
        (1, 'C'), (2, '1B'), (3, '2B'), (4, '3B'), (5, 'SS'),
        (6, 'LF'), (7, 'CF'), (8, 'RF'), (9, 'DH'),
    ]

    with Session(engine) as session:
        # game_type has no default and is a required column, so it must be
        # supplied for the insert to succeed.
        game_1 = Game(
            away_team_id=1,
            home_team_id=2,
            channel_id=1234,
            season=9,
            game_type='exhibition',
        )
        game_2 = Game(
            away_team_id=3,
            home_team_id=4,
            channel_id=5678,
            season=9,
            game_type='exhibition',
        )

        cardset_2024 = Cardset(name='2024 Season', ranked_legal=True)
        cardset_2022 = Cardset(name='2022 Season', ranked_legal=False)

        # Links and lineups are attached through their relationship arguments,
        # so adding the games below is what gets them into the session.
        GameCardsetLink(game=game_1, cardset=cardset_2024, priority=1)
        GameCardsetLink(game=game_1, cardset=cardset_2022, priority=2)
        GameCardsetLink(game=game_2, cardset=cardset_2024, priority=1)

        for game, team_ids, player_offset in ((game_1, (1, 2), 68), (game_2, (3, 4), 100)):
            for team_id in team_ids:
                for order, pos in batting_slots:
                    Lineup(
                        team_id=team_id,
                        card_id=order,
                        player_id=player_offset + order,
                        position=pos,
                        batting_order=order,
                        game=game,
                    )

        session.add(game_1)
        session.add(game_2)
        session.commit()


def select_speed_testing():
    """Time an in-memory lineup filter against the equivalent SQL query."""
    with Session(engine) as session:
        game_1 = session.exec(select(Game).where(Game.id == 1)).one()

        ss_search_start = datetime.datetime.now()
        man_ss = [x for x in game_1.lineups if x.position == 'SS' and x.active]
        ss_search_end = datetime.datetime.now()

        ss_query_start = datetime.datetime.now()
        # "== True" is required here: it builds a SQL expression.
        query_ss = session.exec(
            select(Lineup).where(Lineup.game == game_1, Lineup.position == 'SS', Lineup.active == True)
        ).all()
        ss_query_end = datetime.datetime.now()

        manual_time = ss_search_end - ss_search_start
        query_time = ss_query_end - ss_query_start

        print(f'Manual Shortstops: time: {manual_time.microseconds} ms / {man_ss}')
        print(f'Query Shortstops: time: {query_time.microseconds} ms / {query_ss}')
        print(f'Game: {game_1}')

        games = session.exec(select(Game).where(Game.active == True)).all()
        print(f'len(games): {len(games)}')


def select_all_testing():
    """Print every Team row."""
    with Session(engine) as session:
        game_search = session.exec(select(Team)).all()
        for game in game_search:
            print(f'Game: {game}')


def main():
    """Development entry point; uncomment the helper to run."""
    # create_db_and_tables()
    # create_test_games()
    select_speed_testing()
    # select_all_testing()


if __name__ == "__main__":
    main()