import datetime import logging import discord from sqlmodel import Session, SQLModel, create_engine, select, or_, Field, Relationship from sqlalchemy import func from api_calls import db_get, db_post sqlite_url = 'sqlite:///storage/gameplay.db' connect_args = {"check_same_thread": False} engine = create_engine(sqlite_url, echo=False, connect_args=connect_args) CACHE_LIMIT = 1209600 # in seconds class GameCardsetLink(SQLModel, table=True): game_id: int | None = Field(default=None, foreign_key='game.id', primary_key=True) cardset_id: int | None = Field(default=None, foreign_key='cardset.id', primary_key=True) priority: int | None = Field(default=1, index=True) game: 'Game' = Relationship(back_populates='cardset_links') cardset: 'Cardset' = Relationship(back_populates='game_links') class Game(SQLModel, table=True): id: int | None = Field(default=None, primary_key=True) away_team_id: int home_team_id: int channel_id: int = Field(index=True) season: int active: bool | None = Field(default=True) is_pd: bool | None = Field(default=True) ranked: bool | None = Field(default=False) short_game: bool | None = Field(default=False) week_num: int | None = Field(default=None) game_num: int | None = Field(default=None) away_roster_id: int | None = Field(default=None) home_roster_id: int | None = Field(default=None) first_message: str | None = Field(default=None) ai_team: str | None = Field(default=None) game_type: str | None = Field(default=None) cardset_links: list[GameCardsetLink] = Relationship(back_populates='game', cascade_delete=True) lineups: list['Lineup'] = Relationship(back_populates='game', cascade_delete=True) @property def cardset_param_string(self) -> str: pri_cardsets = '' back_cardsets = '' for link in self.cardset_links: if link.priority == 1: pri_cardsets += f'&cardset_id={link.cardset_id}' else: back_cardsets += f'&backup_cardset_id={link.cardset_id}' return f'{pri_cardsets}{back_cardsets}' # @property # def game_prop(self) -> str: # return f'Game {self.id} / Week {self.week_num} / Type {self.game_type}' class CardsetBase(SQLModel): id: int | None = Field(default=None, primary_key=True) name: str ranked_legal: bool | None = Field(default=False) class Cardset(CardsetBase, table=True): game_links: list[GameCardsetLink] = Relationship(back_populates='cardset', cascade_delete=True) players: list['Player'] = Relationship(back_populates='cardset') class TeamBase(SQLModel): id: int = Field(primary_key=True) abbrev: str = Field(index=True) sname: str lname: str gmid: int = Field(index=True) gmname: str gsheet: str wallet: int team_value: int collection_value: int logo: str | None = Field(default=None) color: str season: int career: int ranking: int has_guide: bool is_ai: bool created: datetime.datetime | None = Field(default=datetime.datetime.now()) @property def description(self) -> str: return f'{self.id}. {self.abbrev} {self.lname}, {"is_ai" if self.is_ai else "human"}' class Team(TeamBase, table=True): cards: list['Card'] = Relationship(back_populates='team', cascade_delete=True) lineups: list['Lineup'] = Relationship(back_populates='team', cascade_delete=True) async def get_team_or_none( session: Session, team_id: int | None = None, gm_id: int | None = None, team_abbrev: str | None = None, skip_cache: bool = False) -> Team | None: if team_id is None and gm_id is None and team_abbrev is None: err = 'One of "team_id", "gm_id", or "team_abbrev" must be included in search' logging.error(f'gameplay_models - get_team - {err}') raise TypeError(err) if not skip_cache: if team_id is not None: this_team = session.get(Team, team_id) else: if gm_id is not None: statement = select(Team).where(Team.gmid == gm_id) else: statement = select(Team).where(func.lower(Team.abbrev) == team_abbrev.lower()) this_team = session.exec(statement).one_or_none() if this_team is not None: logging.debug(f'we found a team: {this_team} / created: {this_team.created}') tdelta = datetime.datetime.now() - this_team.created logging.debug(f'tdelta: {tdelta}') if tdelta.total_seconds() < CACHE_LIMIT: return this_team else: session.delete(this_team) session.commit() def cache_team(json_data: dict) -> Team: # logging.info(f'gameplay_models - get_team - cache_team - writing a team to cache: {json_data}') valid_team = TeamBase.model_validate(json_data, from_attributes=True) # logging.info(f'gameplay_models - get_team - cache_team - valid_team: {valid_team}') db_team = Team.model_validate(valid_team) # logging.info(f'gameplay_models - get_team - cache_team - db_team: {db_team}') session.add(db_team) session.commit() session.refresh(db_team) return db_team if team_id is not None: t_query = await db_get('teams', object_id=team_id, params=[('inc_packs', False)]) if t_query is not None: return cache_team(t_query) elif gm_id is not None: t_query = await db_get('teams', params=[('gm_id', gm_id)]) if t_query['count'] != 0: for team in [x for x in t_query['teams'] if 'gauntlet' not in x['abbrev'].lower()]: return cache_team(team) elif team_abbrev is not None: t_query = await db_get('teams', params=[('abbrev', team_abbrev)]) if t_query['count'] != 0: for team in [x for x in t_query['teams'] if 'gauntlet' not in x['abbrev'].lower()]: return cache_team(team) return None class PlayerBase(SQLModel): id: int | None = Field(primary_key=True) name: str cost: int image: str mlbclub: str franchise: str cardset_id: int | None = Field(default=None, foreign_key='cardset.id') set_num: int rarity_id: int | None = Field(default=None) pos_1: str description: str quantity: int | None = Field(default=999) image2: str | None = Field(default=None) pos_2: str | None = Field(default=None) pos_3: str | None = Field(default=None) pos_4: str | None = Field(default=None) pos_5: str | None = Field(default=None) pos_6: str | None = Field(default=None) pos_7: str | None = Field(default=None) pos_8: str | None = Field(default=None) headshot: str | None = Field(default=None) vanity_card: str | None = Field(default=None) strat_code: str | None = Field(default=None) bbref_id: str | None = Field(default=None) fangr_id: str | None = Field(default=None) mlbplayer_id: int | None = Field(default=None) created: datetime.datetime | None = Field(default=datetime.datetime.now()) class Player(PlayerBase, table=True): cardset: Cardset = Relationship(back_populates='players') cards: list['Card'] = Relationship(back_populates='player', cascade_delete=True) lineups: list['Lineup'] = Relationship(back_populates='player', cascade_delete=True) def player_description(player: Player = None, player_dict: dict = None) -> str: if player is None and player_dict is None: err = 'One of "player" or "player_dict" must be included to get full description' logging.error(f'gameplay_models - player_description - {err}') raise TypeError(err) if player is not None: return f'{player.description} {player.name}' r_val = f'{player_dict['description']}' if 'name' in player_dict: r_val += f' {player_dict["name"]}' elif 'p_name' in player_dict: r_val += f' {player_dict["p_name"]}' return r_val async def get_player_or_none(session: Session, player_id: int, skip_cache: bool = False) -> Player | None: if not skip_cache: this_player = session.get(Player, player_id) if this_player is not None: logging.info(f'we found a cached player: {this_player} / created: {this_player.created}') tdelta = datetime.datetime.now() - this_player.created logging.debug(f'tdelta: {tdelta}') if tdelta.total_seconds() < CACHE_LIMIT: return this_player else: session.delete(this_player) session.commit() def cache_player(json_data: dict) -> Player: valid_player = PlayerBase.model_validate(json_data, from_attributes=True) db_player = Player.model_validate(valid_player) session.add(db_player) session.commit() session.refresh(db_player) return db_player p_query = await db_get('players', object_id=player_id, params=[('inc_dex', False)]) if p_query is not None: if 'id' not in p_query: p_query['id'] = p_query['player_id'] if 'name' not in p_query: p_query['name'] = p_query['p_name'] return cache_player(p_query) return None class CardBase(SQLModel): id: int | None = Field(default=None, primary_key=True) player_id: int = Field(foreign_key='player.id', index=True, ondelete='CASCADE') team_id: int = Field(foreign_key='team.id', index=True, ondelete='CASCADE') variant: int | None = Field(default=0) created: datetime.datetime | None = Field(default=datetime.datetime.now()) class Card(CardBase, table=True): player: Player = Relationship(back_populates='cards') team: Team = Relationship(back_populates='cards',) lineups: list['Lineup'] = Relationship(back_populates='card', cascade_delete=True) async def get_or_create_ai_card(session: Session, player: Player, team: Team) -> Card: if not team.is_ai: err = f'Cannot create AI cards for human teams' logging.error(f'gameplay_models - get_or_create_ai_card: {err}') raise TypeError(err) async def pull_card(player, team): c_query = await db_get('cards', params=[('team_id', team.id), ('player_id', player.id)]) if c_query['count'] > 0: valid_card = CardBase.model_validate(c_query['cards'][0], from_attributes=True) db_card = Card.model_validate(valid_card) session.add(db_card) session.commit() session.refresh(db_card) return db_card else: return None this_card = await pull_card(player, team) if this_card is not None: return this_card logging.info(f'gameplay_models - get_or_create_ai_card: creating {player.description} {player.name} card for {team.abbrev}') await db_post( 'cards', payload={'cards': [ {'player_id': player.id, 'team_id': team.id, 'pack_id': 1} ]} ) this_card = await pull_card(player, team) if this_card is not None: return this_card err = f'Could not create {player.name} card for {team.abbrev}' logging.error(f'gameplay_models - get_or_create_ai_card - {err}') raise LookupError(err) async def get_card_or_none(session: Session, card_id: int, skip_cache: bool = False) -> Card | None: if not skip_cache: this_card = session.get(Card, card_id) if this_card is not None: logging.info(f'we found a cached card: {this_card} / created: {this_card.created}') tdelta = datetime.datetime.now() - this_card.created logging.debug(f'tdelta: {tdelta}') if tdelta.total_seconds() < CACHE_LIMIT: return this_card else: session.delete(this_card) session.commit() def cache_card(json_data: dict) -> Card: valid_card = CardBase.model_validate(json_data, from_attributes=True) db_card = Card.model_validate(valid_card) session.add(db_card) session.commit() session.refresh(db_card) return db_card c_query = await db_get('cards', object_id=card_id) if c_query is not None: c_query['team_id'] = c_query['team']['id'] if 'player_id' in c_query['player']: c_query['player_id'] = c_query['player']['player_id'] else: c_query['player_id'] = c_query['player']['id'] return cache_card(c_query) return None class Lineup(SQLModel, table=True): id: int | None = Field(default=None, primary_key=True) position: str = Field(index=True) batting_order: int = Field(index=True) after_play: int | None = Field(default=0) replacing_id: int | None = Field(default=None) active: bool = Field(default=True, index=True) is_fatigued: bool | None = Field(default=None) game_id: int = Field(foreign_key='game.id', index=True, ondelete='CASCADE') game: Game = Relationship(back_populates='lineups') team_id: int = Field(foreign_key='team.id', index=True, ondelete='CASCADE') team: Team = Relationship(back_populates='lineups') player_id: int = Field(foreign_key='player.id', index=True, ondelete='CASCADE') player: Player = Relationship(back_populates='lineups') card_id: int = Field(foreign_key='card.id', index=True, ondelete='CASCADE') card: Card = Relationship(back_populates='lineups') """ BEGIN DEVELOPMENT HELPERS """ def create_db_and_tables(): SQLModel.metadata.create_all(engine) def create_test_games(): with Session(engine) as session: game_1 = Game( away_team_id=1, home_team_id=2, channel_id=1234, season=9, ) game_2 = Game( away_team_id=3, home_team_id=4, channel_id=5678, season=9, ) cardset_2024 = Cardset(name='2024 Season', ranked_legal=True) cardset_2022 = Cardset(name='2022 Season', ranked_legal=False) game_1_cardset_2024_link = GameCardsetLink(game=game_1, cardset=cardset_2024, priority=1) game_1_cardset_2022_link = GameCardsetLink(game=game_1, cardset=cardset_2022, priority=2) game_2_cardset_2024_link = GameCardsetLink(game=game_2, cardset=cardset_2024, priority=1) for team_id in [1, 2]: for (order, pos) in [(1, 'C'), (2, '1B'), (3, '2B'), (4, '3B'), (5, 'SS'), (6, 'LF'), (7, 'CF'), (8, 'RF'), (9, 'DH')]: this_lineup = Lineup(team_id=team_id, card_id=order, player_id=68+order, position=pos, batting_order=order, game=game_1) for team_id in [3, 4]: for (order, pos) in [(1, 'C'), (2, '1B'), (3, '2B'), (4, '3B'), (5, 'SS'), (6, 'LF'), (7, 'CF'), (8, 'RF'), (9, 'DH')]: this_lineup = Lineup(team_id=team_id, card_id=order, player_id=100+order, position=pos, batting_order=order, game=game_2) session.add(game_1) session.add(game_2) session.commit() def select_speed_testing(): with Session(engine) as session: game_1 = session.exec(select(Game).where(Game.id == 1)).one() ss_search_start = datetime.datetime.now() man_ss = [x for x in game_1.lineups if x.position == 'SS' and x.active] ss_search_end = datetime.datetime.now() ss_query_start = datetime.datetime.now() query_ss = session.exec(select(Lineup).where(Lineup.game == game_1, Lineup.position == 'SS', Lineup.active == True)).all() ss_query_end = datetime.datetime.now() manual_time = ss_search_end - ss_search_start query_time = ss_query_end - ss_query_start print(f'Manual Shortstops: time: {manual_time.microseconds} ms / {man_ss}') print(f'Query Shortstops: time: {query_time.microseconds} ms / {query_ss}') print(f'Game: {game_1}') games = session.exec(select(Game).where(Game.active == True)).all() print(f'len(games): {len(games)}') def select_all_testing(): with Session(engine) as session: game_search = session.exec(select(Team)).all() for game in game_search: print(f'Game: {game}') def main(): # create_db_and_tables() # create_test_games() select_speed_testing() # select_all_testing() if __name__ == "__main__": main()