paper-dynasty-discord/in_game/gameplay_models.py

410 lines
15 KiB
Python

import datetime
import logging
import discord
from sqlmodel import Session, SQLModel, create_engine, select, or_, Field, Relationship
from sqlalchemy import func
from api_calls import db_get, db_post
# Location of the local SQLite database used as the gameplay cache.
sqlite_url = 'sqlite:///storage/gameplay.db'
# Forwarded to the SQLite driver by create_engine; turns off its same-thread check.
connect_args = dict(check_same_thread=False)
engine = create_engine(sqlite_url, connect_args=connect_args, echo=False)
# Maximum age, in seconds, of a cached row before it is re-fetched (14 days).
CACHE_LIMIT = 14 * 24 * 60 * 60
class GameCardsetLink(SQLModel, table=True):
    """Association table linking a Game to a Cardset, with a priority value.

    Game.cardset_param_string treats priority 1 as a primary cardset and any
    other priority as a backup cardset.
    """

    # Composite primary key: one row per (game, cardset) pair.
    game_id: int | None = Field(primary_key=True, foreign_key='game.id', default=None)
    cardset_id: int | None = Field(primary_key=True, foreign_key='cardset.id', default=None)
    priority: int | None = Field(index=True, default=1)

    # Both sides are mirrored on the linked models via back_populates.
    game: 'Game' = Relationship(back_populates='cardset_links')
    cardset: 'Cardset' = Relationship(back_populates='game_links')
class Game(SQLModel, table=True):
    """Table model for one game: the two team ids, channel, season and flags."""

    id: int | None = Field(primary_key=True, default=None)
    away_team_id: int
    home_team_id: int
    channel_id: int = Field(index=True)
    season: int

    # Optional flags and metadata.
    active: bool | None = Field(default=True)
    is_pd: bool | None = Field(default=True)
    ranked: bool | None = Field(default=False)
    short_game: bool | None = Field(default=False)
    week_num: int | None = Field(default=None)
    game_num: int | None = Field(default=None)
    away_roster_id: int | None = Field(default=None)
    home_roster_id: int | None = Field(default=None)
    first_message: str | None = Field(default=None)
    ai_team: str | None = Field(default=None)
    game_type: str | None = Field(default=None)

    # Child rows; both relationships are declared with cascade_delete=True.
    cardset_links: list[GameCardsetLink] = Relationship(cascade_delete=True, back_populates='game')
    lineups: list['Lineup'] = Relationship(cascade_delete=True, back_populates='game')

    @property
    def cardset_param_string(self) -> str:
        """Return the cardset links rendered as a query-string fragment.

        Links with priority 1 become ``&cardset_id=<id>``; every other link
        becomes ``&backup_cardset_id=<id>``. All primary entries come first,
        followed by all backup entries. An empty string is returned when the
        game has no links.
        """
        primary_parts = []
        backup_parts = []
        for link in self.cardset_links:
            if link.priority == 1:
                primary_parts.append(f'&cardset_id={link.cardset_id}')
            else:
                backup_parts.append(f'&backup_cardset_id={link.cardset_id}')
        return ''.join(primary_parts + backup_parts)

    # @property
    # def game_prop(self) -> str:
    #     return f'Game {self.id} / Week {self.week_num} / Type {self.game_type}'
class CardsetBase(SQLModel):
    """Field definitions shared by cardset models (non-table base of Cardset)."""

    id: int | None = Field(primary_key=True, default=None)
    name: str
    ranked_legal: bool | None = Field(default=False)
class Cardset(CardsetBase, table=True):
    """Table model for a cardset, with its game links and players."""

    # Link rows are declared with cascade_delete=True; players are not.
    game_links: list[GameCardsetLink] = Relationship(cascade_delete=True, back_populates='cardset')
    players: list['Player'] = Relationship(back_populates='cardset')
class TeamBase(SQLModel):
    """Field definitions shared by team models (non-table base of Team).

    Used by get_team to validate team JSON before it is stored as a Team row.
    """

    id: int = Field(primary_key=True)
    abbrev: str = Field(index=True)
    sname: str
    lname: str
    gmid: int = Field(index=True)
    gmname: str
    gsheet: str
    wallet: int
    team_value: int
    collection_value: int
    logo: str | None = Field(default=None)
    color: str
    season: int
    career: int
    ranking: int
    has_guide: bool
    is_ai: bool
    # default_factory stamps each instance when it is built. A plain
    # default=datetime.datetime.now() is evaluated once at import time, which
    # gives every row the process start time and skews cache-age checks.
    created: datetime.datetime | None = Field(default_factory=datetime.datetime.now)

    @property
    def description(self) -> str:
        """Return a one-line summary: id, abbreviation, long name and AI/human."""
        return f'{self.id}. {self.abbrev} {self.lname}, {"is_ai" if self.is_ai else "human"}'
class Team(TeamBase, table=True):
    """Table model for a cached team, with its cards and lineups."""

    # Both child collections are declared with cascade_delete=True.
    cards: list['Card'] = Relationship(cascade_delete=True, back_populates='team')
    lineups: list['Lineup'] = Relationship(cascade_delete=True, back_populates='team')
async def get_team(
        session: Session, team_id: int | None = None, gm_id: int | None = None, team_abbrev: str | None = None,
        skip_cache: bool = False) -> Team:
    """Return a Team from the local cache, falling back to the remote API.

    Exactly one lookup key is used, in the order team_id, gm_id, team_abbrev.
    Unless skip_cache is set, a locally stored row younger than CACHE_LIMIT
    seconds is returned directly; an older row is deleted and re-fetched.

    Args:
        session: Open database session used for cache reads and writes.
        team_id: Team primary key.
        gm_id: Value matched against Team.gmid.
        team_abbrev: Team abbreviation, matched case-insensitively in the cache.
        skip_cache: When True, go straight to the API and refresh the cache.

    Returns:
        The cached or freshly stored Team.

    Raises:
        TypeError: If none of team_id, gm_id or team_abbrev is given.
        LookupError: If no matching team is found.
    """
    if team_id is None and gm_id is None and team_abbrev is None:
        err = 'One of "team_id", "gm_id", or "team_abbrev" must be included in search'
        logging.error(f'gameplay_models - get_team - {err}')
        raise TypeError(err)

    if not skip_cache:
        if team_id is not None:
            this_team = session.get(Team, team_id)
        else:
            if gm_id is not None:
                statement = select(Team).where(Team.gmid == gm_id)
            else:
                statement = select(Team).where(func.lower(Team.abbrev) == team_abbrev.lower())
            this_team = session.exec(statement).one_or_none()

        if this_team is not None:
            logging.debug(f'we found a team: {this_team} / created: {this_team.created}')
            tdelta = datetime.datetime.now() - this_team.created
            logging.debug(f'tdelta: {tdelta}')
            if tdelta.total_seconds() < CACHE_LIMIT:
                return this_team
            # Stale entry: drop it so the fresh API copy replaces it below.
            session.delete(this_team)
            session.commit()

    def cache_team(json_data: dict) -> Team:
        """Validate API JSON and upsert it into the local team table."""
        valid_team = TeamBase.model_validate(json_data, from_attributes=True)
        # merge() instead of add(): a row with this id may already exist
        # (skip_cache=True, or the cache lookup used a different key), and a
        # plain insert would then fail on the primary key.
        db_team = session.merge(Team.model_validate(valid_team))
        session.commit()
        session.refresh(db_team)
        return db_team

    if team_id is not None:
        t_query = await db_get('teams', object_id=team_id, params=[('inc_packs', False)])
        if t_query is not None:
            return cache_team(t_query)
    else:
        # The guard at the top guarantees one of gm_id / team_abbrev is set here.
        if gm_id is not None:
            search_params = [('gm_id', gm_id)]
        else:
            search_params = [('abbrev', team_abbrev)]
        t_query = await db_get('teams', params=search_params)
        # Same None guard as the team_id branch, so a missing response ends in
        # the LookupError below rather than a TypeError on subscripting.
        if t_query is not None and t_query['count'] != 0:
            for team in t_query['teams']:
                # Use the first result whose abbrev does not contain "gauntlet".
                if 'gauntlet' not in team['abbrev'].lower():
                    return cache_team(team)

    err = 'Team not found'
    logging.error(f'gameplay_models - get_team - {err}')
    raise LookupError(err)
class PlayerBase(SQLModel):
    """Field definitions shared by player models (non-table base of Player).

    Used by get_player to validate player JSON before it is stored as a
    Player row.
    """

    id: int | None = Field(primary_key=True)
    name: str
    cost: int
    image: str
    mlbclub: str
    franchise: str
    cardset_id: int | None = Field(default=None, foreign_key='cardset.id')
    set_num: int
    rarity_id: int | None = Field(default=None)
    pos_1: str
    description: str
    quantity: int | None = Field(default=999)
    image2: str | None = Field(default=None)
    pos_2: str | None = Field(default=None)
    pos_3: str | None = Field(default=None)
    pos_4: str | None = Field(default=None)
    pos_5: str | None = Field(default=None)
    pos_6: str | None = Field(default=None)
    pos_7: str | None = Field(default=None)
    pos_8: str | None = Field(default=None)
    headshot: str | None = Field(default=None)
    vanity_card: str | None = Field(default=None)
    strat_code: str | None = Field(default=None)
    bbref_id: str | None = Field(default=None)
    fangr_id: str | None = Field(default=None)
    mlbplayer_id: int | None = Field(default=None)
    # default_factory stamps each instance when it is built. A plain
    # default=datetime.datetime.now() is evaluated once at import time, which
    # gives every row the process start time and skews cache-age checks.
    created: datetime.datetime | None = Field(default_factory=datetime.datetime.now)
class Player(PlayerBase, table=True):
    """Table model for a cached player, with its cardset, cards and lineups."""

    cardset: Cardset = Relationship(back_populates='players')
    # Both child collections are declared with cascade_delete=True.
    cards: list['Card'] = Relationship(cascade_delete=True, back_populates='player')
    lineups: list['Lineup'] = Relationship(cascade_delete=True, back_populates='player')
def player_description(player: Player = None, player_dict: dict = None) -> str:
if player is None and player_dict is None:
err = 'One of "player" or "player_dict" must be included to get full description'
logging.error(f'gameplay_models - player_description - {err}')
raise TypeError(err)
if player is not None:
return f'{player.description} {player.name}'
r_val = f'{player_dict['description']}'
if 'name' in player_dict:
r_val += f' {player_dict["name"]}'
elif 'p_name' in player_dict:
r_val += f' {player_dict["p_name"]}'
return r_val
async def get_player(session: Session, player_id: int, skip_cache: bool = False) -> Player:
    """Return a Player from the local cache, falling back to the remote API.

    Unless skip_cache is set, a locally stored row younger than CACHE_LIMIT
    seconds is returned directly; an older row is deleted and re-fetched.

    Args:
        session: Open database session used for cache reads and writes.
        player_id: Player primary key.
        skip_cache: When True, go straight to the API and refresh the cache.

    Returns:
        The cached or freshly stored Player.

    Raises:
        LookupError: If the API returns no player for player_id.
    """
    if not skip_cache:
        this_player = session.get(Player, player_id)

        if this_player is not None:
            logging.info(f'we found a cached player: {this_player} / created: {this_player.created}')
            tdelta = datetime.datetime.now() - this_player.created
            logging.debug(f'tdelta: {tdelta}')
            if tdelta.total_seconds() < CACHE_LIMIT:
                return this_player
            # Stale entry: drop it so the fresh API copy replaces it below.
            session.delete(this_player)
            session.commit()

    def cache_player(json_data: dict) -> Player:
        """Validate API JSON and upsert it into the local player table."""
        valid_player = PlayerBase.model_validate(json_data, from_attributes=True)
        # merge() instead of add(): with skip_cache=True a row with this id may
        # already exist, and a plain insert would fail on the primary key.
        db_player = session.merge(Player.model_validate(valid_player))
        session.commit()
        session.refresh(db_player)
        return db_player

    p_query = await db_get('players', object_id=player_id, params=[('inc_dex', False)])
    if p_query is not None:
        # Fill in the model's key names when the payload uses the alternates.
        if 'id' not in p_query:
            p_query['id'] = p_query['player_id']
        if 'name' not in p_query:
            p_query['name'] = p_query['p_name']
        return cache_player(p_query)

    err = 'Player not found'
    logging.error(f'gameplay_models - get_player - {err}')
    # Raise instead of falling through to an implicit None: the signature
    # promises a Player, and get_team raises LookupError in the same situation.
    raise LookupError(err)
class CardBase(SQLModel):
    """Field definitions shared by card models (non-table base of Card)."""

    id: int | None = Field(default=None, primary_key=True)
    player_id: int = Field(foreign_key='player.id', index=True, ondelete='CASCADE')
    team_id: int = Field(foreign_key='team.id', index=True, ondelete='CASCADE')
    variant: int | None = Field(default=0)
    # default_factory stamps each instance when it is built. A plain
    # default=datetime.datetime.now() is evaluated once at import time, which
    # gives every row the process start time.
    created: datetime.datetime | None = Field(default_factory=datetime.datetime.now)
class Card(CardBase, table=True):
    """Table model for a card: one player owned by one team."""

    player: Player = Relationship(back_populates='cards')
    team: Team = Relationship(back_populates='cards')
    # Lineup rows are declared with cascade_delete=True.
    lineups: list['Lineup'] = Relationship(cascade_delete=True, back_populates='card')
async def get_or_create_ai_card(session: Session, player: Player, team: Team) -> Card:
    """Return the card for player on an AI team, creating it remotely if needed.

    The API is queried for an existing card. If none is found, one is posted
    and the query is repeated. The resulting card is stored locally.

    Args:
        session: Open database session used to store the card locally.
        player: Player the card represents.
        team: Owning team; must have is_ai set.

    Returns:
        The locally stored Card.

    Raises:
        TypeError: If team is not an AI team.
        LookupError: If the card still cannot be fetched after the create call.
    """
    if not team.is_ai:
        err = 'Cannot create AI cards for human teams'
        logging.error(f'gameplay_models - get_or_create_ai_card: {err}')
        raise TypeError(err)

    async def pull_card(player, team):
        """Fetch the first matching card from the API and upsert it locally."""
        c_query = await db_get('cards', params=[('team_id', team.id), ('player_id', player.id)])
        if c_query['count'] > 0:
            valid_card = CardBase.model_validate(c_query['cards'][0], from_attributes=True)
            # merge() instead of add(): a repeat call for the same player/team
            # returns the same card id, and a plain insert would fail on the
            # primary key once the card is already stored locally.
            db_card = session.merge(Card.model_validate(valid_card))
            session.commit()
            session.refresh(db_card)
            return db_card
        return None

    this_card = await pull_card(player, team)
    if this_card is not None:
        return this_card

    logging.info(f'gameplay_models - get_or_create_ai_card: creating {player.description} {player.name} card for {team.abbrev}')
    await db_post(
        'cards',
        payload={'cards': [
            {'player_id': player.id, 'team_id': team.id, 'pack_id': 1}
        ]}
    )

    # Fetch again so the newly created card is what gets stored and returned.
    this_card = await pull_card(player, team)
    if this_card is not None:
        return this_card

    err = f'Could not create {player.name} card for {team.abbrev}'
    logging.error(f'gameplay_models - get_or_create_ai_card - {err}')
    raise LookupError(err)
class Lineup(SQLModel, table=True):
    """Table model for one lineup slot: a card/player at a position in a game."""

    id: int | None = Field(primary_key=True, default=None)
    position: str = Field(index=True)
    batting_order: int = Field(index=True)
    after_play: int | None = Field(default=0)
    replacing_id: int | None = Field(default=None)
    active: bool = Field(index=True, default=True)
    is_fatigued: bool | None = Field(default=None)

    # Foreign keys, all declared with ondelete='CASCADE'.
    game_id: int = Field(index=True, foreign_key='game.id', ondelete='CASCADE')
    team_id: int = Field(index=True, foreign_key='team.id', ondelete='CASCADE')
    player_id: int = Field(index=True, foreign_key='player.id', ondelete='CASCADE')
    card_id: int = Field(index=True, foreign_key='card.id', ondelete='CASCADE')

    # Relationship side of each foreign key above.
    game: Game = Relationship(back_populates='lineups')
    team: Team = Relationship(back_populates='lineups')
    player: Player = Relationship(back_populates='lineups')
    card: Card = Relationship(back_populates='lineups')
"""
BEGIN DEVELOPMENT HELPERS
"""
def create_db_and_tables():
    """Create every table registered on SQLModel.metadata using the module engine."""
    SQLModel.metadata.create_all(bind=engine)
def create_test_games():
    """Insert two sample games, their cardset links and full lineups."""
    # Batting order paired with fielding position, shared by every sample lineup.
    batting_slots = [
        (1, 'C'), (2, '1B'), (3, '2B'), (4, '3B'), (5, 'SS'),
        (6, 'LF'), (7, 'CF'), (8, 'RF'), (9, 'DH'),
    ]

    with Session(engine) as session:
        game_1 = Game(away_team_id=1, home_team_id=2, channel_id=1234, season=9)
        game_2 = Game(away_team_id=3, home_team_id=4, channel_id=5678, season=9)

        cardset_2024 = Cardset(name='2024 Season', ranked_legal=True)
        cardset_2022 = Cardset(name='2022 Season', ranked_legal=False)

        # Links and lineups are not added to the session directly; they are
        # attached to their game through the relationship arguments
        # (presumably persisted with the game on commit; verify cascade).
        GameCardsetLink(game=game_1, cardset=cardset_2024, priority=1)
        GameCardsetLink(game=game_1, cardset=cardset_2022, priority=2)
        GameCardsetLink(game=game_2, cardset=cardset_2024, priority=1)

        # (game, team ids, offset added to the batting order to get player_id)
        lineup_specs = (
            (game_1, (1, 2), 68),
            (game_2, (3, 4), 100),
        )
        for game, team_ids, player_offset in lineup_specs:
            for team_id in team_ids:
                for order, pos in batting_slots:
                    Lineup(
                        team_id=team_id,
                        card_id=order,
                        player_id=player_offset + order,
                        position=pos,
                        batting_order=order,
                        game=game,
                    )

        session.add(game_1)
        session.add(game_2)
        session.commit()
def select_speed_testing():
    """Compare two ways of finding game 1's active shortstops and print timings.

    The first filters game_1.lineups in Python; the second runs a filtered
    select on Lineup. Elapsed time for each is printed in milliseconds, then
    the game itself and the number of active games.
    """
    with Session(engine) as session:
        game_1 = session.exec(select(Game).where(Game.id == 1)).one()

        ss_search_start = datetime.datetime.now()
        man_ss = [x for x in game_1.lineups if x.position == 'SS' and x.active]
        ss_search_end = datetime.datetime.now()

        ss_query_start = datetime.datetime.now()
        # "== True" is kept on purpose: it builds a SQL expression, not a Python bool.
        query_ss = session.exec(
            select(Lineup).where(Lineup.game == game_1, Lineup.position == 'SS', Lineup.active == True)
        ).all()
        ss_query_end = datetime.datetime.now()

        # total_seconds() covers the whole interval. timedelta.microseconds is
        # only the sub-second component, and it is microseconds, not "ms".
        manual_ms = (ss_search_end - ss_search_start).total_seconds() * 1000
        query_ms = (ss_query_end - ss_query_start).total_seconds() * 1000
        print(f'Manual Shortstops: time: {manual_ms:.3f} ms / {man_ss}')
        print(f'Query Shortstops: time: {query_ms:.3f} ms / {query_ss}')
        print(f'Game: {game_1}')

        games = session.exec(select(Game).where(Game.active == True)).all()
        print(f'len(games): {len(games)}')
def select_all_testing():
    """Print every row returned by an unfiltered select on Team."""
    with Session(engine) as session:
        rows = session.exec(select(Team)).all()
        # NOTE(review): the rows are Team records but the label says "Game";
        # confirm which was intended.
        for row in rows:
            print(f'Game: {row}')
def main():
    """Run the development helpers; (un)comment lines to choose which ones."""
    # create_db_and_tables()
    # create_test_games()
    select_speed_testing()
    # select_all_testing()
# Only run the development helpers when this file is executed as a script.
if __name__ == '__main__':
    main()