Finish loading utilities

Push queries from _models to _queries
This commit is contained in:
Cal Corum 2024-10-14 12:22:05 -05:00
parent 63e738d048
commit 428afe048e
7 changed files with 360 additions and 248 deletions

View File

@ -16,7 +16,7 @@ from in_game.game_helpers import PUBLIC_FIELDS_CATEGORY_NAME, legal_check
from in_game.gameplay_models import Lineup, Session, engine, get_card_or_none, player_description, select, Game, get_team_or_none
from in_game.gameplay_queries import get_channel_game_or_none, get_active_games_by_team
from utilities.confirm import Confirm
from utilities.buttons import Confirm
class Gameplay(commands.Cog):
@ -241,6 +241,35 @@ class Gameplay(commands.Cog):
view=None
)
@app_commands.command(name='read-lineup', description='Import a saved lineup for this channel\'s PD game.')
@app_commands.describe(
roster='Which roster to pull from your sheet?',
lineup='Which handedness lineup are you using?'
)
@app_commands.choices(
roster=[
Choice(name=1, value='Primary'),
Choice(name=2, value='Secondary'),
Choice(name=3, value='Ranked')
],
lineup=[
Choice(name=1, value='v Right'),
Choice(name=2, value='v Left')
]
)
@app_commands.checks.has_any_role(PD_PLAYERS_ROLE_NAME)
async def read_lineup_command(self, interaction: discord.Interaction, roster: Choice[int], lineup: Choice[int]):
await interaction.response.defer()
with Session(engine) as session:
this_game = get_channel_game_or_none(session, interaction.channel_id)
if this_game is None:
await interaction.edit_original_response(
content=f'Hm. I don\'t see a game going on in this channel. Am I drunk?'
)
return
async def setup(bot):
await bot.add_cog(Gameplay(bot))

View File

@ -52,64 +52,6 @@ class Team(TeamBase, table=True):
lineups: list['Lineup'] = Relationship(back_populates='team', cascade_delete=True)
async def get_team_or_none(
session: Session, team_id: int | None = None, gm_id: int | None = None, team_abbrev: str | None = None, skip_cache: bool = False) -> Team | None:
if team_id is None and gm_id is None and team_abbrev is None:
err = 'One of "team_id", "gm_id", or "team_abbrev" must be included in search'
logging.error(f'gameplay_models - get_team - {err}')
raise TypeError(err)
if not skip_cache:
if team_id is not None:
this_team = session.get(Team, team_id)
else:
if gm_id is not None:
statement = select(Team).where(Team.gmid == gm_id)
else:
statement = select(Team).where(func.lower(Team.abbrev) == team_abbrev.lower())
this_team = session.exec(statement).one_or_none()
if this_team is not None:
logging.debug(f'we found a team: {this_team} / created: {this_team.created}')
tdelta = datetime.datetime.now() - this_team.created
logging.debug(f'tdelta: {tdelta}')
if tdelta.total_seconds() < CACHE_LIMIT:
return this_team
else:
session.delete(this_team)
session.commit()
def cache_team(json_data: dict) -> Team:
# logging.info(f'gameplay_models - get_team - cache_team - writing a team to cache: {json_data}')
valid_team = TeamBase.model_validate(json_data, from_attributes=True)
# logging.info(f'gameplay_models - get_team - cache_team - valid_team: {valid_team}')
db_team = Team.model_validate(valid_team)
# logging.info(f'gameplay_models - get_team - cache_team - db_team: {db_team}')
session.add(db_team)
session.commit()
session.refresh(db_team)
return db_team
if team_id is not None:
t_query = await db_get('teams', object_id=team_id, params=[('inc_packs', False)])
if t_query is not None:
return cache_team(t_query)
elif gm_id is not None:
t_query = await db_get('teams', params=[('gm_id', gm_id)])
if t_query['count'] != 0:
for team in [x for x in t_query['teams'] if 'gauntlet' not in x['abbrev'].lower()]:
return cache_team(team)
elif team_abbrev is not None:
t_query = await db_get('teams', params=[('abbrev', team_abbrev)])
if t_query['count'] != 0:
for team in [x for x in t_query['teams'] if 'gauntlet' not in x['abbrev'].lower()]:
return cache_team(team)
return None
class Game(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
away_team_id: int
@ -236,51 +178,6 @@ def player_description(player: Player = None, player_dict: dict = None) -> str:
return r_val
async def get_player_or_none(session: Session, player_id: int, skip_cache: bool = False) -> Player | None:
logging.info(f'gameplay_models - get_player_or_none - player_id: {player_id}')
if not skip_cache:
this_player = session.get(Player, player_id)
if this_player is not None:
logging.info(f'we found a cached player: {this_player} / created: {this_player.created}')
tdelta = datetime.datetime.now() - this_player.created
logging.debug(f'tdelta: {tdelta}')
if tdelta.total_seconds() < CACHE_LIMIT:
return this_player
else:
session.delete(this_player)
session.commit()
def cache_player(json_data: dict) -> Player:
logging.info(f'gameplay_models - get_player_or_none - cache_player - caching player data: {json_data}')
valid_player = PlayerBase.model_validate(json_data, from_attributes=True)
db_player = Player.model_validate(valid_player)
session.add(db_player)
session.commit()
session.refresh(db_player)
return db_player
p_query = await db_get('players', object_id=player_id, params=[('inc_dex', False)])
if p_query is not None:
if 'id' not in p_query:
p_query['id'] = p_query['player_id']
if 'name' not in p_query:
p_query['name'] = p_query['p_name']
return cache_player(p_query)
return None
def get_player_id_from_dict(json_data: dict) -> int:
if 'player_id' in json_data:
return json_data['player_id']
elif 'id' in json_data:
return json_data['id']
err = 'Player ID could not be extracted from json data'
logging.error(f'{err}: {json_data}')
raise KeyError(err)
class CardBase(SQLModel):
id: int | None = Field(default=None, primary_key=True)
player_id: int = Field(foreign_key='player.id', index=True, ondelete='CASCADE')
@ -295,111 +192,6 @@ class Card(CardBase, table=True):
lineups: list['Lineup'] = Relationship(back_populates='card', cascade_delete=True)
async def get_or_create_ai_card(session: Session, player: Player, team: Team, skip_cache: bool = False, dev_mode: bool = False) -> Card:
if not team.is_ai:
err = f'Cannot create AI cards for human teams'
logging.error(f'gameplay_models - get_or_create_ai_card: {err}')
raise TypeError(err)
logging.info(f'gameplay_models - get_or_create_ai_card - player.id: {player.id} / team.id: {team.id}')
if not skip_cache:
c_query = session.exec(select(Card).where(Card.player == player, Card.team == team)).all()
if len(c_query) > 0:
this_card = c_query[0]
logging.info(f'we found a cached card: {this_card} / created: {this_card.created}')
tdelta = datetime.datetime.now() - this_card.created
logging.debug(f'tdelta: {tdelta}')
if tdelta.total_seconds() < CACHE_LIMIT:
return this_card
else:
session.delete(this_card)
session.commit()
async def pull_card(p: Player, t: Team):
c_query = await db_get('cards', params=[('team_id', t.id), ('player_id', p.id)])
if c_query['count'] > 0:
json_data = c_query['cards'][0]
logging.info(f'gameplay_models - get_or_create_ai_card - pull_card - caching json_data: {json_data}')
json_data['team_id'] = json_data['team']['id']
json_data['player_id'] = get_player_id_from_dict(json_data['player'])
valid_card = CardBase.model_validate(c_query['cards'][0], from_attributes=True)
db_card = Card.model_validate(valid_card)
session.add(db_card)
session.commit()
session.refresh(db_card)
return db_card
else:
return None
this_card = await pull_card(player, team)
if this_card is not None:
return this_card
logging.info(f'gameplay_models - get_or_create_ai_card: creating {player.description} {player.name} card for {team.abbrev}')
if dev_mode:
this_card = Card(player=player, team=team)
session.add(this_card)
session.commit()
session.refresh(this_card)
return this_card
await db_post(
'cards',
payload={'cards': [
{'player_id': player.id, 'team_id': team.id, 'pack_id': 1}
]}
)
this_card = await pull_card(player, team)
if this_card is not None:
return this_card
err = f'Could not create {player.name} card for {team.abbrev}'
logging.error(f'gameplay_models - get_or_create_ai_card - {err}')
raise LookupError(err)
async def get_card_or_none(session: Session, card_id: int, skip_cache: bool = False) -> Card | None:
if not skip_cache:
this_card = session.get(Card, card_id)
if this_card is not None:
logging.info(f'we found a cached card: {this_card} / created: {this_card.created}')
tdelta = datetime.datetime.now() - this_card.created
logging.debug(f'tdelta: {tdelta}')
if tdelta.total_seconds() < CACHE_LIMIT:
return this_card
else:
session.delete(this_card)
session.commit()
def cache_card(json_data: dict) -> Card:
valid_card = CardBase.model_validate(json_data, from_attributes=True)
db_card = Card.model_validate(valid_card)
session.add(db_card)
session.commit()
session.refresh(db_card)
return db_card
c_query = await db_get('cards', object_id=card_id)
if c_query is not None:
c_query['team_id'] = c_query['team']['id']
c_query['player_id'] = get_player_id_from_dict(c_query['player'])
this_player = await get_player_or_none(session, player_id=c_query['player_id'])
this_team = await get_team_or_none(session, team_id=c_query['team_id'])
if this_player is None:
raise LookupError(f'Player ID {c_query["player_id"]} not found during card check')
if this_team is None:
raise LookupError(f'Team ID {c_query["team_id"]} not found during card check')
return cache_card(c_query)
return None
class Lineup(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)

View File

@ -1,5 +1,9 @@
import datetime
import logging
from in_game.gameplay_models import Session, select, or_, Game
from sqlalchemy import func
from api_calls import db_get, db_post
from in_game.gameplay_models import CACHE_LIMIT, Card, CardBase, Player, PlayerBase, Session, Team, TeamBase, select, or_, Game
def get_games_by_channel(session: Session, channel_id: int) -> list[Game]:
@ -18,4 +22,213 @@ def get_channel_game_or_none(session: Session, channel_id: int) -> Game | None:
def get_active_games_by_team(session: Session, team_id: int) -> list[Game]:
return session.exec(select(Game).where(Game.active, or_(Game.away_team_id == team_id, Game.home_team_id == team_id))).all()
return session.exec(select(Game).where(Game.active, or_(Game.away_team_id == team_id, Game.home_team_id == team_id))).all()
async def get_team_or_none(
session: Session, team_id: int | None = None, gm_id: int | None = None, team_abbrev: str | None = None, skip_cache: bool = False) -> Team | None:
if team_id is None and gm_id is None and team_abbrev is None:
err = 'One of "team_id", "gm_id", or "team_abbrev" must be included in search'
logging.error(f'gameplay_models - get_team - {err}')
raise TypeError(err)
if not skip_cache:
if team_id is not None:
this_team = session.get(Team, team_id)
else:
if gm_id is not None:
statement = select(Team).where(Team.gmid == gm_id)
else:
statement = select(Team).where(func.lower(Team.abbrev) == team_abbrev.lower())
this_team = session.exec(statement).one_or_none()
if this_team is not None:
logging.debug(f'we found a team: {this_team} / created: {this_team.created}')
tdelta = datetime.datetime.now() - this_team.created
logging.debug(f'tdelta: {tdelta}')
if tdelta.total_seconds() < CACHE_LIMIT:
return this_team
else:
session.delete(this_team)
session.commit()
def cache_team(json_data: dict) -> Team:
# logging.info(f'gameplay_models - get_team - cache_team - writing a team to cache: {json_data}')
valid_team = TeamBase.model_validate(json_data, from_attributes=True)
# logging.info(f'gameplay_models - get_team - cache_team - valid_team: {valid_team}')
db_team = Team.model_validate(valid_team)
# logging.info(f'gameplay_models - get_team - cache_team - db_team: {db_team}')
session.add(db_team)
session.commit()
session.refresh(db_team)
return db_team
if team_id is not None:
t_query = await db_get('teams', object_id=team_id, params=[('inc_packs', False)])
if t_query is not None:
return cache_team(t_query)
elif gm_id is not None:
t_query = await db_get('teams', params=[('gm_id', gm_id)])
if t_query['count'] != 0:
for team in [x for x in t_query['teams'] if 'gauntlet' not in x['abbrev'].lower()]:
return cache_team(team)
elif team_abbrev is not None:
t_query = await db_get('teams', params=[('abbrev', team_abbrev)])
if t_query['count'] != 0:
for team in [x for x in t_query['teams'] if 'gauntlet' not in x['abbrev'].lower()]:
return cache_team(team)
return None
async def get_player_or_none(session: Session, player_id: int, skip_cache: bool = False) -> Player | None:
logging.info(f'gameplay_models - get_player_or_none - player_id: {player_id}')
if not skip_cache:
this_player = session.get(Player, player_id)
if this_player is not None:
logging.info(f'we found a cached player: {this_player} / created: {this_player.created}')
tdelta = datetime.datetime.now() - this_player.created
logging.debug(f'tdelta: {tdelta}')
if tdelta.total_seconds() < CACHE_LIMIT:
return this_player
else:
session.delete(this_player)
session.commit()
def cache_player(json_data: dict) -> Player:
logging.info(f'gameplay_models - get_player_or_none - cache_player - caching player data: {json_data}')
valid_player = PlayerBase.model_validate(json_data, from_attributes=True)
db_player = Player.model_validate(valid_player)
session.add(db_player)
session.commit()
session.refresh(db_player)
return db_player
p_query = await db_get('players', object_id=player_id, params=[('inc_dex', False)])
if p_query is not None:
if 'id' not in p_query:
p_query['id'] = p_query['player_id']
if 'name' not in p_query:
p_query['name'] = p_query['p_name']
return cache_player(p_query)
return None
def get_player_id_from_dict(json_data: dict) -> int:
if 'player_id' in json_data:
return json_data['player_id']
elif 'id' in json_data:
return json_data['id']
err = 'Player ID could not be extracted from json data'
logging.error(f'{err}: {json_data}')
raise KeyError(err)
async def get_or_create_ai_card(session: Session, player: Player, team: Team, skip_cache: bool = False, dev_mode: bool = False) -> Card:
if not team.is_ai:
err = f'Cannot create AI cards for human teams'
logging.error(f'gameplay_models - get_or_create_ai_card: {err}')
raise TypeError(err)
logging.info(f'gameplay_models - get_or_create_ai_card - player.id: {player.id} / team.id: {team.id}')
if not skip_cache:
c_query = session.exec(select(Card).where(Card.player == player, Card.team == team)).all()
if len(c_query) > 0:
this_card = c_query[0]
logging.info(f'we found a cached card: {this_card} / created: {this_card.created}')
tdelta = datetime.datetime.now() - this_card.created
logging.debug(f'tdelta: {tdelta}')
if tdelta.total_seconds() < CACHE_LIMIT:
return this_card
else:
session.delete(this_card)
session.commit()
async def pull_card(p: Player, t: Team):
c_query = await db_get('cards', params=[('team_id', t.id), ('player_id', p.id)])
if c_query['count'] > 0:
json_data = c_query['cards'][0]
logging.info(f'gameplay_models - get_or_create_ai_card - pull_card - caching json_data: {json_data}')
json_data['team_id'] = json_data['team']['id']
json_data['player_id'] = get_player_id_from_dict(json_data['player'])
valid_card = CardBase.model_validate(c_query['cards'][0], from_attributes=True)
db_card = Card.model_validate(valid_card)
session.add(db_card)
session.commit()
session.refresh(db_card)
return db_card
else:
return None
this_card = await pull_card(player, team)
if this_card is not None:
return this_card
logging.info(f'gameplay_models - get_or_create_ai_card: creating {player.description} {player.name} card for {team.abbrev}')
if dev_mode:
this_card = Card(player=player, team=team)
session.add(this_card)
session.commit()
session.refresh(this_card)
return this_card
await db_post(
'cards',
payload={'cards': [
{'player_id': player.id, 'team_id': team.id, 'pack_id': 1}
]}
)
this_card = await pull_card(player, team)
if this_card is not None:
return this_card
err = f'Could not create {player.name} card for {team.abbrev}'
logging.error(f'gameplay_models - get_or_create_ai_card - {err}')
raise LookupError(err)
async def get_card_or_none(session: Session, card_id: int, skip_cache: bool = False) -> Card | None:
if not skip_cache:
this_card = session.get(Card, card_id)
if this_card is not None:
logging.info(f'we found a cached card: {this_card} / created: {this_card.created}')
tdelta = datetime.datetime.now() - this_card.created
logging.debug(f'tdelta: {tdelta}')
if tdelta.total_seconds() < CACHE_LIMIT:
return this_card
else:
session.delete(this_card)
session.commit()
def cache_card(json_data: dict) -> Card:
valid_card = CardBase.model_validate(json_data, from_attributes=True)
db_card = Card.model_validate(valid_card)
session.add(db_card)
session.commit()
session.refresh(db_card)
return db_card
c_query = await db_get('cards', object_id=card_id)
if c_query is not None:
c_query['team_id'] = c_query['team']['id']
c_query['player_id'] = get_player_id_from_dict(c_query['player'])
this_player = await get_player_or_none(session, player_id=c_query['player_id'])
this_team = await get_team_or_none(session, team_id=c_query['team_id'])
if this_player is None:
raise LookupError(f'Player ID {c_query["player_id"]} not found during card check')
if this_team is None:
raise LookupError(f'Team ID {c_query["team_id"]} not found during card check')
return cache_card(c_query)
return None

View File

@ -2,7 +2,8 @@ import datetime
import pytest
from sqlmodel import Session
from in_game.gameplay_models import CACHE_LIMIT, Player, player_description, select, get_player_or_none, get_player_id_from_dict
from in_game.gameplay_models import CACHE_LIMIT, Player, player_description, select
from in_game.gameplay_queries import get_player_or_none, get_player_id_from_dict
from factory import session_fixture

View File

@ -1,7 +1,8 @@
import datetime
from sqlmodel import Session, select
from in_game.gameplay_models import Team, get_team_or_none, CACHE_LIMIT
from in_game.gameplay_models import Team, CACHE_LIMIT
from in_game.gameplay_queries import get_team_or_none
from factory import session_fixture, new_teams_fixture, pytest
def test_create_team(session: Session, new_teams: list[Team]):

111
utilities/buttons.py Normal file
View File

@ -0,0 +1,111 @@
import discord
from typing import Literal
class Confirm(discord.ui.View):
def __init__(self, responders: list, timeout: float = 300.0, label_type: Literal['yes', 'confirm'] = 'confirm'):
super().__init__(timeout=timeout)
if not isinstance(responders, list):
raise TypeError('responders must be a list')
self.value = None
self.responders = responders
if label_type == 'yes':
self.confirm.label = 'Yes'
self.cancel.label = 'No'
# When the confirm button is pressed, set the inner value to `True` and
# stop the View from listening to more input.
# We also send the user an ephemeral message that we're confirming their choice.
@discord.ui.button(label='Confirm', style=discord.ButtonStyle.green)
async def confirm(self, interaction: discord.Interaction, button: discord.ui.Button):
if interaction.user not in self.responders:
return
self.value = True
self.clear_items()
self.stop()
# This one is similar to the confirmation button except sets the inner value to `False`
@discord.ui.button(label='Cancel', style=discord.ButtonStyle.grey)
async def cancel(self, interaction: discord.Interaction, button: discord.ui.Button):
if interaction.user not in self.responders:
return
self.value = False
self.clear_items()
self.stop()
class ButtonOptions(discord.ui.View):
def __init__(self, responders: list, timeout: float = 300.0, labels=None):
super().__init__(timeout=timeout)
if not isinstance(responders, list):
raise TypeError('responders must be a list')
self.value = None
self.responders = responders
self.options = labels
for count, x in enumerate(labels):
if count == 0:
self.option1.label = x
if x is None or x.lower() == 'na' or x == 'N/A':
self.remove_item(self.option1)
if count == 1:
self.option2.label = x
if x is None or x.lower() == 'na' or x == 'N/A':
self.remove_item(self.option2)
if count == 2:
self.option3.label = x
if x is None or x.lower() == 'na' or x == 'N/A':
self.remove_item(self.option3)
if count == 3:
self.option4.label = x
if x is None or x.lower() == 'na' or x == 'N/A':
self.remove_item(self.option4)
if count == 4:
self.option5.label = x
if x is None or x.lower() == 'na' or x == 'N/A':
self.remove_item(self.option5)
@discord.ui.button(label='Option 1', style=discord.ButtonStyle.primary)
async def option1(self, interaction: discord.Interaction, button: discord.ui.Button):
if interaction.user not in self.responders:
return
self.value = self.options[0]
self.clear_items()
self.stop()
@discord.ui.button(label='Option 2', style=discord.ButtonStyle.primary)
async def option2(self, interaction: discord.Interaction, button: discord.ui.Button):
if interaction.user not in self.responders:
return
self.value = self.options[1]
self.clear_items()
self.stop()
@discord.ui.button(label='Option 3', style=discord.ButtonStyle.primary)
async def option3(self, interaction: discord.Interaction, button: discord.ui.Button):
if interaction.user not in self.responders:
return
self.value = self.options[2]
self.clear_items()
self.stop()
@discord.ui.button(label='Option 4', style=discord.ButtonStyle.primary)
async def option4(self, interaction: discord.Interaction, button: discord.ui.Button):
if interaction.user not in self.responders:
return
self.value = self.options[3]
self.clear_items()
self.stop()
@discord.ui.button(label='Option 5', style=discord.ButtonStyle.primary)
async def option5(self, interaction: discord.Interaction, button: discord.ui.Button):
if interaction.user not in self.responders:
return
self.value = self.options[4]
self.clear_items()
self.stop()

View File

@ -1,35 +0,0 @@
import discord
from typing import Literal
class Confirm(discord.ui.View):
def __init__(self, responders: list, timeout: float = 300.0, label_type: Literal['yes', 'confirm'] = 'confirm'):
super().__init__(timeout=timeout)
if not isinstance(responders, list):
raise TypeError('responders must be a list')
self.value = None
self.responders = responders
if label_type == 'yes':
self.confirm.label = 'Yes'
self.cancel.label = 'No'
# When the confirm button is pressed, set the inner value to `True` and
# stop the View from listening to more input.
# We also send the user an ephemeral message that we're confirming their choice.
@discord.ui.button(label='Confirm', style=discord.ButtonStyle.green)
async def confirm(self, interaction: discord.Interaction, button: discord.ui.Button):
if interaction.user not in self.responders:
return
self.value = True
self.clear_items()
self.stop()
# This one is similar to the confirmation button except sets the inner value to `False`
@discord.ui.button(label='Cancel', style=discord.ButtonStyle.grey)
async def cancel(self, interaction: discord.Interaction, button: discord.ui.Button):
if interaction.user not in self.responders:
return
self.value = False
self.clear_items()
self.stop()