paper-dynasty-discord/in_game/gameplay_db.py
2024-10-12 02:08:05 -05:00

223 lines
7.7 KiB
Python

import datetime
import logging
from sqlmodel import Session, SQLModel, create_engine, select, Field, Relationship
from db_calls import db_get
from helpers import PD_SEASON
sqlite_url = 'sqlite:///storage/gameplay.db'
connect_args = {"check_same_thread": False}
engine = create_engine(sqlite_url, echo=False, connect_args=connect_args)
class GameCardsetLink(SQLModel, table=True):
game_id: int | None = Field(default=None, foreign_key='game.id', primary_key=True)
cardset_id: int | None = Field(default=None, foreign_key='cardset.id', primary_key=True)
priority: int | None = Field(default=1, index=True)
game: 'Game' = Relationship(back_populates='cardset_links')
cardset: 'Cardset' = Relationship(back_populates='game_links')
class Game(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
away_team_id: int
home_team_id: int
channel_id: int = Field(index=True)
season: int
active: bool | None = Field(default=True)
is_pd: bool | None = Field(default=True)
ranked: bool | None = Field(default=False)
short_game: bool | None = Field(default=False)
week_num: int | None = Field(default=None)
game_num: int | None = Field(default=None)
away_roster_id: int | None = Field(default=None)
home_roster_id: int | None = Field(default=None)
first_message: int | None = Field(default=None)
ai_team: str | None = Field(default=None)
game_type: str | None = Field(default=None)
cardset_links: list[GameCardsetLink] = Relationship(back_populates='game')
lineups: list['Lineup'] = Relationship(back_populates='game')
# @property
# def game_prop(self) -> str:
# return f'Game {self.id} / Week {self.week_num} / Type {self.game_type}'
class Cardset(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str
ranked_legal: bool | None = Field(default=False)
game_links: list[GameCardsetLink] = Relationship(back_populates='cardset')
class Lineup(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
team_id: int
player_id: int
card_id: int
position: str = Field(index=True)
batting_order: int = Field(index=True)
after_play: int | None = Field(default=0)
replacing_id: int | None = Field(default=None)
active: bool = Field(default=True, index=True)
is_fatigued: bool | None = Field(default=None)
game_id: int = Field(foreign_key='game.id', index=True)
game: Game = Relationship(back_populates='lineups')
class Team(SQLModel, table=True):
id: int = Field(primary_key=True)
abbrev: str = Field(index=True)
sname: str
lname: str
gmid: int = Field(index=True)
gmname: str
gsheet: str
wallet: int
team_value: int
collection_value: int
logo: str | None = Field(default=None)
color: str
season: int
career: int
ranking: int
has_guide: bool
is_ai: bool
created: datetime.datetime | None = Field(default=datetime.datetime.now())
async def get_team(
session: Session, team_id: int | None = None, gm_id: int | None = None, team_abbrev: str | None = None, skip_cache: bool = False) -> Team:
if team_id is None and gm_id is None and team_abbrev is None:
err = 'One of "team_id", "gm_id", or "team_abbrev" must be included in search'
logging.error(f'gameplay_db - get_team - {err}')
raise TypeError(err)
if not skip_cache:
if team_id is not None:
this_team = session.get(Team, team_id)
else:
if gm_id is not None:
statement = select(Team).where(Team.gmid == gm_id)
else:
statement = select(Team).where(Team.abbrev.lower() == team_abbrev.lower())
this_team = session.exec(statement).one_or_none
if this_team is not None:
tdelta = datetime.datetime.now() - this_team.created
if tdelta.total_seconds() < 1209600:
return this_team
else:
session.delete(this_team)
session.commit()
def cache_team(json_data: dict) -> Team:
db_team = Team.model_validate(t_query)
session.add(db_team)
session.commit()
session.refresh(db_team)
return db_team
if team_id is not None:
t_query = await db_get('teams', object_id=team_id, params=[('inc_packs', False)])
if t_query is not None:
return cache_team(t_query)
elif gm_id is not None:
t_query = await db_get('teams', params=[('season', PD_SEASON), ('gm_id', gm_id)])
if t_query['count'] != 0:
for team in [x for x in t_query['teams'] if 'gauntlet' not in x.abbrev.lower()]:
return cache_team(team)
elif team_abbrev is not None:
t_query = await db_get('teams', params=[('season', PD_SEASON), ('abbrev', team_abbrev)])
if t_query['count'] != 0:
for team in [x for x in t_query['teams'] if 'gauntlet' not in x.abbrev.lower()]:
return cache_team(team)
err = 'Team not found'
logging.error(f'gameplay_db - get_team - {err}')
raise LookupError(err)
"""
BEGIN DEVELOPMENT HELPERS
"""
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def create_test_games():
with Session(engine) as session:
game_1 = Game(
away_team_id=1,
home_team_id=2,
channel_id=1234,
season=9,
)
game_2 = Game(
away_team_id=3,
home_team_id=4,
channel_id=5678,
season=9,
)
cardset_2024 = Cardset(name='2024 Season', ranked_legal=True)
cardset_2022 = Cardset(name='2022 Season', ranked_legal=False)
game_1_cardset_2024_link = GameCardsetLink(game=game_1, cardset=cardset_2024, priority=1)
game_1_cardset_2022_link = GameCardsetLink(game=game_1, cardset=cardset_2022, priority=2)
game_2_cardset_2024_link = GameCardsetLink(game=game_2, cardset=cardset_2024, priority=1)
for team_id in [1, 2]:
for (order, pos) in [(1, 'C'), (2, '1B'), (3, '2B'), (4, '3B'), (5, 'SS'), (6, 'LF'), (7, 'CF'), (8, 'RF'), (9, 'DH')]:
this_lineup = Lineup(team_id=team_id, card_id=order, player_id=68+order, position=pos, batting_order=order, game=game_1)
for team_id in [3, 4]:
for (order, pos) in [(1, 'C'), (2, '1B'), (3, '2B'), (4, '3B'), (5, 'SS'), (6, 'LF'), (7, 'CF'), (8, 'RF'), (9, 'DH')]:
this_lineup = Lineup(team_id=team_id, card_id=order, player_id=100+order, position=pos, batting_order=order, game=game_2)
session.add(game_1)
session.add(game_2)
session.commit()
def select_speed_testing():
with Session(engine) as session:
game_1 = session.exec(select(Game).where(Game.id == 1)).one()
ss_search_start = datetime.datetime.now()
man_ss = [x for x in game_1.lineups if x.position == 'SS' and x.active]
ss_search_end = datetime.datetime.now()
ss_query_start = datetime.datetime.now()
query_ss = session.exec(select(Lineup).where(Lineup.game == game_1, Lineup.position == 'SS', Lineup.active == True)).all()
ss_query_end = datetime.datetime.now()
manual_time = ss_search_end - ss_search_start
query_time = ss_query_end - ss_query_start
print(f'Manual Shortstops: time: {manual_time.microseconds} ms / {man_ss}')
print(f'Query Shortstops: time: {query_time.microseconds} ms / {query_ss}')
print(f'Game: {game_1}')
games = session.exec(select(Game).where(Game.active == True)).all()
print(f'len(games): {len(games)}')
def main():
# create_db_and_tables()
# create_test_games()
select_speed_testing()
if __name__ == "__main__":
main()