335 lines
12 KiB
Python
335 lines
12 KiB
Python
import datetime
|
|
import logging
|
|
from sqlmodel import Session, SQLModel, create_engine, select, or_, Field, Relationship
|
|
from sqlalchemy import func
|
|
|
|
from api_calls import db_get
|
|
from helpers import PD_SEASON
|
|
|
|
|
|
sqlite_url = 'sqlite:///storage/gameplay.db'
|
|
connect_args = {"check_same_thread": False}
|
|
engine = create_engine(sqlite_url, echo=False, connect_args=connect_args)
|
|
CACHE_LIMIT = 1209600 # in seconds
|
|
|
|
|
|
class GameCardsetLink(SQLModel, table=True):
|
|
game_id: int | None = Field(default=None, foreign_key='game.id', primary_key=True)
|
|
cardset_id: int | None = Field(default=None, foreign_key='cardset.id', primary_key=True)
|
|
priority: int | None = Field(default=1, index=True)
|
|
|
|
game: 'Game' = Relationship(back_populates='cardset_links')
|
|
cardset: 'Cardset' = Relationship(back_populates='game_links')
|
|
|
|
|
|
class Game(SQLModel, table=True):
|
|
id: int | None = Field(default=None, primary_key=True)
|
|
away_team_id: int
|
|
home_team_id: int
|
|
channel_id: int = Field(index=True)
|
|
season: int
|
|
active: bool | None = Field(default=True)
|
|
is_pd: bool | None = Field(default=True)
|
|
ranked: bool | None = Field(default=False)
|
|
short_game: bool | None = Field(default=False)
|
|
week_num: int | None = Field(default=None)
|
|
game_num: int | None = Field(default=None)
|
|
away_roster_id: int | None = Field(default=None)
|
|
home_roster_id: int | None = Field(default=None)
|
|
first_message: str | None = Field(default=None)
|
|
ai_team: str | None = Field(default=None)
|
|
game_type: str | None = Field(default=None)
|
|
|
|
cardset_links: list[GameCardsetLink] = Relationship(back_populates='game')
|
|
lineups: list['Lineup'] = Relationship(back_populates='game')
|
|
|
|
# @property
|
|
# def game_prop(self) -> str:
|
|
# return f'Game {self.id} / Week {self.week_num} / Type {self.game_type}'
|
|
|
|
|
|
class CardsetBase(SQLModel):
|
|
id: int | None = Field(default=None, primary_key=True)
|
|
name: str
|
|
ranked_legal: bool | None = Field(default=False)
|
|
|
|
|
|
class Cardset(CardsetBase, table=True):
|
|
game_links: list[GameCardsetLink] = Relationship(back_populates='cardset')
|
|
players: list['Player'] = Relationship(back_populates='cardset')
|
|
|
|
|
|
class Lineup(SQLModel, table=True):
|
|
id: int | None = Field(default=None, primary_key=True)
|
|
team_id: int
|
|
player_id: int
|
|
card_id: int
|
|
position: str = Field(index=True)
|
|
batting_order: int = Field(index=True)
|
|
after_play: int | None = Field(default=0)
|
|
replacing_id: int | None = Field(default=None)
|
|
active: bool = Field(default=True, index=True)
|
|
is_fatigued: bool | None = Field(default=None)
|
|
|
|
game_id: int = Field(foreign_key='game.id', index=True)
|
|
game: Game = Relationship(back_populates='lineups')
|
|
|
|
|
|
class TeamBase(SQLModel):
|
|
id: int = Field(primary_key=True)
|
|
abbrev: str = Field(index=True)
|
|
sname: str
|
|
lname: str
|
|
gmid: int = Field(index=True)
|
|
gmname: str
|
|
gsheet: str
|
|
wallet: int
|
|
team_value: int
|
|
collection_value: int
|
|
logo: str | None = Field(default=None)
|
|
color: str
|
|
season: int
|
|
career: int
|
|
ranking: int
|
|
has_guide: bool
|
|
is_ai: bool
|
|
created: datetime.datetime | None = Field(default=datetime.datetime.now())
|
|
|
|
@property
|
|
def description(self) -> str:
|
|
return f'{self.id}. {self.abbrev} {self.lname}, {"is_ai" if self.is_ai else "human"}'
|
|
|
|
|
|
class Team(TeamBase, table=True):
|
|
pass
|
|
|
|
|
|
async def get_team(
|
|
session: Session, team_id: int | None = None, gm_id: int | None = None, team_abbrev: str | None = None, skip_cache: bool = False) -> Team:
|
|
if team_id is None and gm_id is None and team_abbrev is None:
|
|
err = 'One of "team_id", "gm_id", or "team_abbrev" must be included in search'
|
|
logging.error(f'gameplay_models - get_team - {err}')
|
|
raise TypeError(err)
|
|
|
|
if not skip_cache:
|
|
if team_id is not None:
|
|
this_team = session.get(Team, team_id)
|
|
else:
|
|
if gm_id is not None:
|
|
statement = select(Team).where(Team.gmid == gm_id)
|
|
else:
|
|
statement = select(Team).where(func.lower(Team.abbrev) == team_abbrev.lower())
|
|
this_team = session.exec(statement).one_or_none()
|
|
|
|
if this_team is not None:
|
|
logging.debug(f'we found a team: {this_team} / created: {this_team.created}')
|
|
tdelta = datetime.datetime.now() - this_team.created
|
|
logging.debug(f'tdelta: {tdelta}')
|
|
if tdelta.total_seconds() < CACHE_LIMIT:
|
|
return this_team
|
|
else:
|
|
session.delete(this_team)
|
|
session.commit()
|
|
|
|
def cache_team(json_data: dict) -> Team:
|
|
# logging.info(f'gameplay_models - get_team - cache_team - writing a team to cache: {json_data}')
|
|
valid_team = TeamBase.model_validate(json_data, from_attributes=True)
|
|
# logging.info(f'gameplay_models - get_team - cache_team - valid_team: {valid_team}')
|
|
db_team = Team.model_validate(valid_team)
|
|
# logging.info(f'gameplay_models - get_team - cache_team - db_team: {db_team}')
|
|
session.add(db_team)
|
|
session.commit()
|
|
session.refresh(db_team)
|
|
return db_team
|
|
|
|
if team_id is not None:
|
|
t_query = await db_get('teams', object_id=team_id, params=[('inc_packs', False)])
|
|
if t_query is not None:
|
|
return cache_team(t_query)
|
|
|
|
elif gm_id is not None:
|
|
t_query = await db_get('teams', params=[('season', PD_SEASON), ('gm_id', gm_id)])
|
|
if t_query['count'] != 0:
|
|
for team in [x for x in t_query['teams'] if 'gauntlet' not in x['abbrev'].lower()]:
|
|
return cache_team(team)
|
|
|
|
elif team_abbrev is not None:
|
|
t_query = await db_get('teams', params=[('season', PD_SEASON), ('abbrev', team_abbrev)])
|
|
if t_query['count'] != 0:
|
|
for team in [x for x in t_query['teams'] if 'gauntlet' not in x['abbrev'].lower()]:
|
|
return cache_team(team)
|
|
|
|
err = 'Team not found'
|
|
logging.error(f'gameplay_models - get_team - {err}')
|
|
raise LookupError(err)
|
|
|
|
|
|
class PlayerBase(SQLModel):
|
|
id: int | None = Field(primary_key=True)
|
|
name: str
|
|
cost: int
|
|
image: str
|
|
mlbclub: str
|
|
franchise: str
|
|
cardset_id: int | None = Field(default=None, foreign_key='cardset.id')
|
|
set_num: int
|
|
rarity_id: int | None = Field(default=None)
|
|
pos_1: str
|
|
description: str
|
|
quantity: int | None = Field(default=999)
|
|
image2: str | None = Field(default=None)
|
|
pos_2: str | None = Field(default=None)
|
|
pos_3: str | None = Field(default=None)
|
|
pos_4: str | None = Field(default=None)
|
|
pos_5: str | None = Field(default=None)
|
|
pos_6: str | None = Field(default=None)
|
|
pos_7: str | None = Field(default=None)
|
|
pos_8: str | None = Field(default=None)
|
|
headshot: str | None = Field(default=None)
|
|
vanity_card: str | None = Field(default=None)
|
|
strat_code: str | None = Field(default=None)
|
|
bbref_id: str | None = Field(default=None)
|
|
fangr_id: str | None = Field(default=None)
|
|
mlbplayer_id: int | None = Field(default=None)
|
|
created: datetime.datetime | None = Field(default=datetime.datetime.now())
|
|
|
|
|
|
class Player(PlayerBase, table=True):
|
|
cardset: Cardset = Relationship(back_populates='players')
|
|
|
|
|
|
def player_description(player: Player = None, player_dict: dict = None) -> str:
|
|
if player is None and player_dict is None:
|
|
err = 'One of "player" or "player_dict" must be included to get full description'
|
|
logging.error(f'gameplay_models - player_description - {err}')
|
|
raise TypeError(err)
|
|
|
|
if player is not None:
|
|
return f'{player.description} {player.name}'
|
|
|
|
r_val = f'{player_dict['description']}'
|
|
if 'name' in player_dict:
|
|
r_val += f' {player_dict["name"]}'
|
|
elif 'p_name' in player_dict:
|
|
r_val += f' {player_dict["p_name"]}'
|
|
return r_val
|
|
|
|
|
|
async def get_player(session: Session, player_id: int, skip_cache: bool = False) -> Player:
|
|
if not skip_cache:
|
|
this_player = session.get(Player, player_id)
|
|
|
|
if this_player is not None:
|
|
logging.info(f'we found a cached player: {this_player} / created: {this_player.created}')
|
|
tdelta = datetime.datetime.now() - this_player.created
|
|
logging.debug(f'tdelta: {tdelta}')
|
|
if tdelta.total_seconds() < CACHE_LIMIT:
|
|
return this_player
|
|
else:
|
|
session.delete(this_player)
|
|
session.commit()
|
|
|
|
def cache_player(json_data: dict) -> Player:
|
|
valid_player = PlayerBase.model_validate(json_data, from_attributes=True)
|
|
db_player = Player.model_validate(valid_player)
|
|
session.add(db_player)
|
|
session.commit()
|
|
session.refresh(db_player)
|
|
return db_player
|
|
|
|
p_query = await db_get('players', object_id=player_id, params=[('inc_dex', False)])
|
|
if p_query is not None:
|
|
if 'id' not in p_query:
|
|
p_query['id'] = p_query['player_id']
|
|
if 'name' not in p_query:
|
|
p_query['name'] = p_query['p_name']
|
|
return cache_player(p_query)
|
|
|
|
err = 'Player not found'
|
|
logging.error(f'gameplay_models - get_player - {err}')
|
|
|
|
|
|
|
|
"""
|
|
BEGIN DEVELOPMENT HELPERS
|
|
"""
|
|
|
|
|
|
def create_db_and_tables():
|
|
SQLModel.metadata.create_all(engine)
|
|
|
|
|
|
def create_test_games():
|
|
with Session(engine) as session:
|
|
game_1 = Game(
|
|
away_team_id=1,
|
|
home_team_id=2,
|
|
channel_id=1234,
|
|
season=9,
|
|
)
|
|
game_2 = Game(
|
|
away_team_id=3,
|
|
home_team_id=4,
|
|
channel_id=5678,
|
|
season=9,
|
|
)
|
|
|
|
cardset_2024 = Cardset(name='2024 Season', ranked_legal=True)
|
|
cardset_2022 = Cardset(name='2022 Season', ranked_legal=False)
|
|
|
|
game_1_cardset_2024_link = GameCardsetLink(game=game_1, cardset=cardset_2024, priority=1)
|
|
game_1_cardset_2022_link = GameCardsetLink(game=game_1, cardset=cardset_2022, priority=2)
|
|
game_2_cardset_2024_link = GameCardsetLink(game=game_2, cardset=cardset_2024, priority=1)
|
|
|
|
for team_id in [1, 2]:
|
|
for (order, pos) in [(1, 'C'), (2, '1B'), (3, '2B'), (4, '3B'), (5, 'SS'), (6, 'LF'), (7, 'CF'), (8, 'RF'), (9, 'DH')]:
|
|
this_lineup = Lineup(team_id=team_id, card_id=order, player_id=68+order, position=pos, batting_order=order, game=game_1)
|
|
|
|
for team_id in [3, 4]:
|
|
for (order, pos) in [(1, 'C'), (2, '1B'), (3, '2B'), (4, '3B'), (5, 'SS'), (6, 'LF'), (7, 'CF'), (8, 'RF'), (9, 'DH')]:
|
|
this_lineup = Lineup(team_id=team_id, card_id=order, player_id=100+order, position=pos, batting_order=order, game=game_2)
|
|
|
|
session.add(game_1)
|
|
session.add(game_2)
|
|
session.commit()
|
|
|
|
|
|
def select_speed_testing():
|
|
with Session(engine) as session:
|
|
game_1 = session.exec(select(Game).where(Game.id == 1)).one()
|
|
ss_search_start = datetime.datetime.now()
|
|
man_ss = [x for x in game_1.lineups if x.position == 'SS' and x.active]
|
|
ss_search_end = datetime.datetime.now()
|
|
|
|
ss_query_start = datetime.datetime.now()
|
|
query_ss = session.exec(select(Lineup).where(Lineup.game == game_1, Lineup.position == 'SS', Lineup.active == True)).all()
|
|
ss_query_end = datetime.datetime.now()
|
|
|
|
manual_time = ss_search_end - ss_search_start
|
|
query_time = ss_query_end - ss_query_start
|
|
|
|
print(f'Manual Shortstops: time: {manual_time.microseconds} ms / {man_ss}')
|
|
print(f'Query Shortstops: time: {query_time.microseconds} ms / {query_ss}')
|
|
print(f'Game: {game_1}')
|
|
|
|
games = session.exec(select(Game).where(Game.active == True)).all()
|
|
print(f'len(games): {len(games)}')
|
|
|
|
|
|
def select_all_testing():
|
|
with Session(engine) as session:
|
|
game_search = session.exec(select(Team)).all()
|
|
for game in game_search:
|
|
print(f'Game: {game}')
|
|
|
|
|
|
def main():
|
|
# create_db_and_tables()
|
|
# create_test_games()
|
|
select_speed_testing()
|
|
# select_all_testing()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|