247 lines
9.8 KiB
Python
247 lines
9.8 KiB
Python
import datetime
|
|
import logging
|
|
|
|
from sqlalchemy import func
|
|
from api_calls import db_get, db_post
|
|
from in_game.gameplay_models import CACHE_LIMIT, Card, CardBase, Lineup, Player, PlayerBase, Session, Team, TeamBase, select, or_, Game
|
|
|
|
|
|
def get_games_by_channel(session: Session, channel_id: int) -> list[Game]:
    """Return every active game recorded for a channel.

    Args:
        session: Open database session the query is executed on.
        channel_id: Value compared against ``Game.channel_id``.

    Returns:
        All ``Game`` rows whose ``channel_id`` matches and whose ``active``
        column is truthy; an empty list when nothing matches.
    """
    in_channel = Game.channel_id == channel_id
    statement = select(Game).where(in_channel, Game.active)
    results = session.exec(statement)
    return results.all()
|
|
|
|
|
|
def get_channel_game_or_none(session: Session, channel_id: int) -> Game | None:
    """Return the single active game for a channel, or ``None`` if there is none.

    Args:
        session: Open database session passed through to the lookup.
        channel_id: Channel whose active games are inspected.

    Returns:
        The only active game for the channel, or ``None`` when no active
        game exists.

    Raises:
        Exception: If more than one active game is found for the channel.
    """
    matches = get_games_by_channel(session, channel_id)
    found = len(matches)

    if found == 0:
        return None
    if found == 1:
        return matches[0]

    # More than one active game in the same channel: log it and refuse to guess.
    err = 'Too many games found in get_channel_game_or_none'
    logging.error(f'cogs.gameplay - get_channel_game_or_none - channel_id: {channel_id} / {err}')
    raise Exception(err)
|
|
|
|
|
|
def get_active_games_by_team(session: Session, team_id: int) -> list[Game]:
    """Return every active game in which the team is the home or away side.

    Args:
        session: Open database session the query is executed on.
        team_id: Value compared against both ``Game.away_team_id`` and
            ``Game.home_team_id``.

    Returns:
        All active ``Game`` rows involving the team; empty when none match.
    """
    team_is_playing = or_(Game.away_team_id == team_id, Game.home_team_id == team_id)
    statement = select(Game).where(Game.active, team_is_playing)
    return session.exec(statement).all()
|
|
|
|
|
|
async def get_team_or_none(
        session: Session, team_id: int | None = None, gm_id: int | None = None,
        team_abbrev: str | None = None, skip_cache: bool = False) -> Team | None:
    """Look a team up in the local cache, falling back to ``db_get``.

    Exactly one search key is used, in priority order ``team_id``, ``gm_id``,
    ``team_abbrev``. Unless ``skip_cache`` is set, the local ``Team`` table is
    checked first: a row younger than ``CACHE_LIMIT`` seconds is returned
    as-is, an older row is deleted before the remote lookup.

    Args:
        session: Open database session used for cache reads and writes.
        team_id: Primary key of the team.
        gm_id: Value matched against ``Team.gmid``.
        team_abbrev: Abbreviation, matched case-insensitively in the cache.
        skip_cache: When true, go straight to ``db_get`` without reading or
            expiring the cached row.

    Returns:
        The cached or freshly stored ``Team``, or ``None`` when the remote
        lookup yields nothing usable.

    Raises:
        TypeError: If none of ``team_id``, ``gm_id`` or ``team_abbrev`` is given.
    """
    if all(key is None for key in (team_id, gm_id, team_abbrev)):
        err = 'One of "team_id", "gm_id", or "team_abbrev" must be included in search'
        logging.error(f'gameplay_models - get_team - {err}')
        raise TypeError(err)

    def store_team(payload: dict) -> Team:
        """Validate a raw team payload and persist it in the local cache."""
        validated = TeamBase.model_validate(payload, from_attributes=True)
        row = Team.model_validate(validated)
        session.add(row)
        session.commit()
        session.refresh(row)
        return row

    if not skip_cache:
        if team_id is not None:
            cached = session.get(Team, team_id)
        elif gm_id is not None:
            by_gm = select(Team).where(Team.gmid == gm_id)
            cached = session.exec(by_gm).one_or_none()
        else:
            by_abbrev = select(Team).where(func.lower(Team.abbrev) == team_abbrev.lower())
            cached = session.exec(by_abbrev).one_or_none()

        if cached is not None:
            logging.debug(f'we found a team: {cached} / created: {cached.created}')
            # NOTE(review): assumes `created` is a naive datetime comparable with now() - confirm.
            age = datetime.datetime.now() - cached.created
            logging.debug(f'tdelta: {age}')
            if age.total_seconds() < CACHE_LIMIT:
                return cached

            # Stale entry: drop it so the fresh copy can be stored below.
            session.delete(cached)
            session.commit()

    if team_id is not None:
        fetched = await db_get('teams', object_id=team_id, params=[('inc_packs', False)])
        return None if fetched is None else store_team(fetched)

    # gm_id and abbrev searches share the same list-shaped response handling.
    search_param = ('gm_id', gm_id) if gm_id is not None else ('abbrev', team_abbrev)
    listing = await db_get('teams', params=[search_param])
    if listing['count'] == 0:
        return None

    # Teams whose abbreviation contains "gauntlet" are never selected; first remaining hit wins.
    candidates = [entry for entry in listing['teams'] if 'gauntlet' not in entry['abbrev'].lower()]
    return store_team(candidates[0]) if candidates else None
|
|
|
|
|
|
async def get_player_or_none(session: Session, player_id: int, skip_cache: bool = False) -> Player | None:
    """Look a player up in the local cache, falling back to ``db_get``.

    Unless ``skip_cache`` is set, a cached ``Player`` row younger than
    ``CACHE_LIMIT`` seconds is returned directly and an older one is deleted
    before the remote lookup.

    Args:
        session: Open database session used for cache reads and writes.
        player_id: Primary key of the player.
        skip_cache: When true, bypass the cached row entirely.

    Returns:
        The cached or freshly stored ``Player``, or ``None`` when ``db_get``
        returns ``None``.
    """
    logging.info(f'gameplay_models - get_player_or_none - player_id: {player_id}')

    cached = None if skip_cache else session.get(Player, player_id)
    if cached is not None:
        logging.info(f'we found a cached player: {cached} / created: {cached.created}')
        age = datetime.datetime.now() - cached.created
        logging.debug(f'tdelta: {age}')
        if age.total_seconds() < CACHE_LIMIT:
            return cached

        # Stale entry: remove it before storing the refreshed copy.
        session.delete(cached)
        session.commit()

    payload = await db_get('players', object_id=player_id, params=[('inc_dex', False)])
    if payload is None:
        return None

    # The payload may carry 'player_id'/'p_name' instead of 'id'/'name'; fill in the missing keys.
    if 'id' not in payload:
        payload['id'] = payload['player_id']
    if 'name' not in payload:
        payload['name'] = payload['p_name']

    logging.info(f'gameplay_models - get_player_or_none - cache_player - caching player data: {payload}')
    validated = PlayerBase.model_validate(payload, from_attributes=True)
    row = Player.model_validate(validated)
    session.add(row)
    session.commit()
    session.refresh(row)
    return row
|
|
|
|
|
|
def get_player_id_from_dict(json_data: dict) -> int:
    """Extract a player id from a payload, preferring ``player_id`` over ``id``.

    Args:
        json_data: Mapping expected to contain ``'player_id'`` or ``'id'``.

    Returns:
        The value stored under the first of those keys that is present.

    Raises:
        KeyError: If neither key is present (the payload is logged first).
    """
    for key in ('player_id', 'id'):
        if key in json_data:
            return json_data[key]

    err = 'Player ID could not be extracted from json data'
    logging.error(f'{err}: {json_data}')
    raise KeyError(err)
|
|
|
|
|
|
async def get_or_create_ai_card(session: Session, player: Player, team: Team, skip_cache: bool = False, dev_mode: bool = False) -> Card:
    """Return the card linking ``player`` to an AI ``team``, creating it if needed.

    Lookup order: local cache (unless ``skip_cache``), then ``db_get``. If
    neither has the card, it is created: locally only when ``dev_mode`` is
    set, otherwise through ``db_post`` followed by a second ``db_get``.

    Args:
        session: Open database session used for cache reads and writes.
        player: Player the card belongs to.
        team: Owning team; must have a truthy ``is_ai``.
        skip_cache: When true, bypass the cached card entirely.
        dev_mode: When true, create the card in the local table only and
            skip the ``db_post`` call.

    Returns:
        The cached, fetched or newly created ``Card``.

    Raises:
        TypeError: If ``team.is_ai`` is falsy.
        LookupError: If the card still cannot be fetched after ``db_post``.
    """
    if not team.is_ai:
        err = 'Cannot create AI cards for human teams'
        logging.error(f'gameplay_models - get_or_create_ai_card: {err}')
        raise TypeError(err)

    logging.info(f'gameplay_models - get_or_create_ai_card - player.id: {player.id} / team.id: {team.id}')

    if not skip_cache:
        cached_cards = session.exec(select(Card).where(Card.player == player, Card.team == team)).all()
        if cached_cards:
            cached = cached_cards[0]
            logging.info(f'we found a cached card: {cached} / created: {cached.created}')
            age = datetime.datetime.now() - cached.created
            logging.debug(f'tdelta: {age}')
            if age.total_seconds() < CACHE_LIMIT:
                return cached

            # Stale entry: remove it before storing the refreshed copy.
            session.delete(cached)
            session.commit()

    async def fetch_and_cache() -> Card | None:
        """Fetch this player/team card via db_get and store it locally, if one exists."""
        listing = await db_get('cards', params=[('team_id', team.id), ('player_id', player.id)])
        if not (listing['count'] > 0):
            return None

        raw = listing['cards'][0]
        logging.info(f'gameplay_models - get_or_create_ai_card - pull_card - caching json_data: {raw}')
        # Flatten the nested team/player objects into the id fields the card model reads.
        raw['team_id'] = raw['team']['id']
        raw['player_id'] = get_player_id_from_dict(raw['player'])

        row = Card.model_validate(CardBase.model_validate(raw, from_attributes=True))
        session.add(row)
        session.commit()
        session.refresh(row)
        return row

    existing = await fetch_and_cache()
    if existing is not None:
        return existing

    logging.info(f'gameplay_models - get_or_create_ai_card: creating {player.description} {player.name} card for {team.abbrev}')

    if dev_mode:
        local_card = Card(player=player, team=team)
        session.add(local_card)
        session.commit()
        session.refresh(local_card)
        return local_card

    new_card_payload = {'cards': [
        {'player_id': player.id, 'team_id': team.id, 'pack_id': 1}
    ]}
    await db_post('cards', payload=new_card_payload)

    created = await fetch_and_cache()
    if created is None:
        err = f'Could not create {player.name} card for {team.abbrev}'
        logging.error(f'gameplay_models - get_or_create_ai_card - {err}')
        raise LookupError(err)
    return created
|
|
|
|
|
|
async def get_card_or_none(session: Session, card_id: int, skip_cache: bool = False) -> Card | None:
    """Look a card up in the local cache, falling back to ``db_get``.

    Unless ``skip_cache`` is set, a cached ``Card`` row younger than
    ``CACHE_LIMIT`` seconds is returned directly and an older one is deleted
    before the remote lookup. On a remote hit, the card's player and team are
    resolved through ``get_player_or_none`` / ``get_team_or_none`` before the
    card itself is stored.

    Args:
        session: Open database session used for cache reads and writes.
        card_id: Primary key of the card.
        skip_cache: When true, bypass the cached row entirely.

    Returns:
        The cached or freshly stored ``Card``, or ``None`` when ``db_get``
        returns ``None``.

    Raises:
        LookupError: If the card's player or team cannot be resolved.
    """
    cached = session.get(Card, card_id) if not skip_cache else None
    if cached is not None:
        logging.info(f'we found a cached card: {cached} / created: {cached.created}')
        age = datetime.datetime.now() - cached.created
        logging.debug(f'tdelta: {age}')
        if age.total_seconds() < CACHE_LIMIT:
            return cached

        # Stale entry: remove it before storing the refreshed copy.
        session.delete(cached)
        session.commit()

    payload = await db_get('cards', object_id=card_id)
    if payload is None:
        return None

    # Flatten the nested team/player objects into the id fields the card model reads.
    payload['team_id'] = payload['team']['id']
    payload['player_id'] = get_player_id_from_dict(payload['player'])

    # Resolve both related records first (player, then team).
    # NOTE(review): presumably so the card's references exist locally - confirm.
    card_player = await get_player_or_none(session, player_id=payload['player_id'])
    card_team = await get_team_or_none(session, team_id=payload['team_id'])

    if card_player is None:
        raise LookupError(f'Player ID {payload["player_id"]} not found during card check')
    if card_team is None:
        raise LookupError(f'Team ID {payload["team_id"]} not found during card check')

    validated = CardBase.model_validate(payload, from_attributes=True)
    row = Card.model_validate(validated)
    session.add(row)
    session.commit()
    session.refresh(row)
    return row
|
|
|
|
|
|
def get_game_lineups(
        session: Session, this_game: Game, specific_team: Team | None = None,
        is_active: bool | None = None) -> list[Lineup]:
    """Return the lineup rows for a game, optionally narrowed by team and active flag.

    Args:
        session: Open database session the query is executed on.
        this_game: Game whose lineups are requested.
        specific_team: When given, only lineups for this team are returned.
        is_active: When given (``True`` or ``False``), only lineups whose
            ``active`` column equals it are returned; ``None`` applies no
            filter on ``active``.

    Returns:
        All matching ``Lineup`` rows; empty when none match.
    """
    statement = select(Lineup).where(Lineup.game == this_game)

    if specific_team is not None:
        statement = statement.where(Lineup.team == specific_team)

    # Compare against None explicitly so that is_active=False still filters.
    if is_active is not None:
        statement = statement.where(Lineup.active == is_active)

    return session.exec(statement).all()
|
|
|