refactor exceptions and continue gameplay cog rebuild

This commit is contained in:
Cal Corum 2024-10-24 15:32:07 -05:00
parent 6128792d09
commit 79efceb6dd
7 changed files with 286 additions and 118 deletions

View File

@ -8,7 +8,8 @@ from discord.ext import commands, tasks
import pygsheets
from api_calls import db_get
from command_logic.logic_gameplay import get_lineups_from_sheets
from command_logic.logic_gameplay import get_lineups_from_sheets, checks_log_interaction
from exceptions import GameNotFoundException, TeamNotFoundException, PlayNotFoundException, GameException
from helpers import PD_PLAYERS_ROLE_NAME, team_role, user_has_role, random_gif, random_from_list
# from in_game import ai_manager
@ -165,7 +166,7 @@ class Gameplay(commands.Cog):
legal_data = await legal_check([sp_card_id], difficulty_name=league.value)
if not legal_data['legal']:
await interaction.edit_original_response(
content=f'It looks like this is a Ranked Legal game and {player_description(player=human_sp_card.player)} is not legal in {league.name} games. You can start a new game once you pick a new SP.'
content=f'It looks like this is a Ranked Legal game and {human_sp_card.player.with_desc} is not legal in {league.name} games. You can start a new game once you pick a new SP.'
)
return
@ -192,7 +193,7 @@ class Gameplay(commands.Cog):
league.value
)
await interaction.edit_original_response(
content=f'The {ai_team.sname} are starting **{player_description(player=ai_sp_lineup.player)}**:\n\n{ai_sp_lineup.player.p_card_url}'
content=f'The {ai_team.sname} are starting **{ai_sp_lineup.player.with_desc}**:\n\n{ai_sp_lineup.player.p_card_url}'
)
# Get AI Lineup
@ -318,6 +319,29 @@ class Gameplay(commands.Cog):
await interaction.edit_original_response(content=None, embed=this_game.get_scorebug_embed(session))
@app_commands.command(name='gamestate', description='Post the current game state')
async def gamestate_command(self, interaction: discord.Interaction, include_lineups: bool = False):
await interaction.response.defer(ephemeral=True, thinking=True)
with Session(engine) as session:
this_game = get_channel_game_or_none(session, interaction.channel_id)
if this_game is None:
await interaction.edit_original_response(
content=f'Hm. I don\'t see a game going on in this channel. Am I drunk?'
)
return
await interaction.edit_original_response(
content=None,
embed=this_game.get_scorebug_embed(session, full_length=include_lineups)
)
group_log = app_commands.Group(name='log', description='Log a play in this channel\'s game')
@group_log.command(name='flyball', description='Flyballs: a, b, ballpark, bq, c')
async def log_flyball(self, interaction: discord.Interaction, flyball_type: Literal['a', 'b', 'ballpark', 'b?', 'c']):
with Session(engine) as session:
this_game, owner_team, this_play = await checks_log_interaction(session, interaction, command_name='flyball')
async def setup(bot):
await bot.add_cog(Gameplay(bot))

View File

@ -1,10 +1,12 @@
import logging
import discord
from sqlmodel import Session, select
from in_game.game_helpers import legal_check, CardLegalityException
from in_game.gameplay_models import Game, Lineup, Team
from in_game.gameplay_queries import get_card_or_none
from exceptions import *
from in_game.game_helpers import legal_check
from in_game.gameplay_models import Game, Lineup, Team, Play
from in_game.gameplay_queries import get_card_or_none, get_channel_game_or_none, get_team_or_none
async def get_lineups_from_sheets(session: Session, sheets, this_game: Game, this_team: Team, lineup_num: int, roster_num: int) -> list[Lineup]:
@ -84,4 +86,34 @@ async def get_lineups_from_sheets(session: Session, sheets, this_game: Game, thi
return all_lineups
async def checks_log_interaction(session: Session, interaction: discord.Interaction, command_name: str) -> tuple[Game, Team, Play]:
this_game = get_channel_game_or_none(session, interaction.channel_id)
if this_game is None:
raise GameNotFoundException('I don\'t see an active game in this channel.')
owner_team = await get_team_or_none(session, gm_id=interaction.user.id)
if owner_team is None:
logging.exception(f'{command_name} command: No team found for GM ID {interaction.user.id}')
raise TeamNotFoundException(f'Do I know you? I cannot find your team.')
if 'gauntlet' in this_game.game_type:
gauntlet_abbrev = f'Gauntlet-{owner_team.abbrev}'
owner_team = await get_team_or_none(session, team_abbrev=gauntlet_abbrev)
if owner_team is None:
logging.exception(f'{command_name} command: No gauntlet team found with abbrev {gauntlet_abbrev}')
raise TeamNotFoundException(f'Hm, I was not able to find a gauntlet team for you.')
if not owner_team['id'] in [this_game.away_team_id, this_game.home_team_id]:
logging.exception(f'{interaction.user.display_name} tried to run a command in Game {this_game.id} when they aren\'t a GM in the game.')
raise TeamNotFoundException('Bruh. Only GMs of the active teams can log plays.')
this_play = this_game.current_play_or_none(session)
if this_play is None:
logging.error(f'{command_name} command: No play found for Game ID {this_game.id} - attempting to initialize play')
this_play = this_game.initialize_play(session)
return this_game, owner_team, this_play

28
exceptions.py Normal file
View File

@ -0,0 +1,28 @@
import logging
def log_exception(e: Exception, msg: str = ''):
logging.debug(msg, stack_info=True)
raise e
class GameException(Exception):
pass
class LineupsMissingException(GameException):
pass
class CardLegalityException(GameException):
pass
class GameNotFoundException(GameException):
pass
class TeamNotFoundException(GameException):
pass
class PlayNotFoundException(GameException):
pass

View File

@ -14,10 +14,6 @@ from typing import Literal, Optional
PUBLIC_FIELDS_CATEGORY_NAME = 'Public Fields'
class CardLegalityException(Exception):
pass
def single_onestar(this_play: StratPlay, comp_play: bool = True):
patch_play(this_play.id, locked=True)
advance_runners(this_play.id, num_bases=1)

View File

@ -9,6 +9,7 @@ from sqlmodel import Session, SQLModel, create_engine, select, or_, Field, Relat
from sqlalchemy import func
from api_calls import db_get, db_post
from exceptions import *
from in_game.managerai_responses import JumpResponse
@ -34,103 +35,6 @@ class ManagerAiBase(SQLModel):
decide_throw: int | None = Field(default=5)
class ManagerAi(ManagerAiBase, table=True):
def create_ai(session: Session = None):
def get_new_ai(this_session: Session):
all_ai = session.exec(select(ManagerAi.id)).all()
if len(all_ai) == 0:
logging.info(f'Creating ManagerAI records')
new_ai = [
ManagerAi(
name='Balanced'
),
ManagerAi(
name='Yolo',
steal=10,
running=10,
hold=5,
catcher_throw=10,
uncapped_home=10,
uncapped_third=10,
uncapped_trail=10,
bullpen_matchup=3,
behind_aggression=10,
ahead_aggression=10,
decide_throw=10
),
ManagerAi(
name='Safe',
steal=3,
running=3,
hold=8,
catcher_throw=5,
uncapped_home=5,
uncapped_third=3,
uncapped_trail=5,
bullpen_matchup=8,
behind_aggression=5,
ahead_aggression=1,
decide_throw=1
)
]
for x in new_ai:
session.add(x)
session.commit()
if session is None:
with Session(engine) as session:
get_new_ai(session)
else:
get_new_ai(session)
return True
def check_jump(self, to_base: Literal[2, 3, 4], num_outs: Literal[0, 1, 2], run_diff: int) -> JumpResponse | None:
this_resp = JumpResponse()
if to_base == 2:
match self.steal:
case 10:
this_resp.min_safe = 12 + num_outs
case self.steal if self.steal > 8 and run_diff <= 5:
this_resp.min_safe = 13 + num_outs
case self.steal if self.steal > 6 and run_diff <= 5:
this_resp.min_safe = 14 + num_outs
case self.steal if self.steal > 4 and num_outs < 2 and run_diff <= 5:
this_resp.min_safe = 15 + num_outs
case self.steal if self.steal > 2 and num_outs < 2 and run_diff <= 5:
this_resp.min_safe = 16 + num_outs
case _:
this_resp = 17 + num_outs
if self.steal > 7 and num_outs < 2 and run_diff <= 5:
this_resp.run_if_auto_jump = True
elif self.steal < 5:
this_resp.must_auto_jump = True
elif to_base == 3:
match self.steal:
case 10:
this_resp.min_safe = 12 + num_outs
case self.steal if self.steal > 6 and num_outs < 2 and run_diff <= 5:
this_resp.min_safe = 15 + num_outs
case _:
this_resp.min_safe = None
if self.steal == 10 and num_outs < 2 and run_diff <= 5:
this_resp.run_if_auto_jump = True
elif self.steal <= 5:
this_resp.must_auto_jump = True
elif run_diff == -1:
match self.steal:
case self.steal if self.steal == 10:
this_resp.min_safe = 5
case self.steal if self.steal > 5:
this_resp.min_safe = 7
return this_resp
class GameCardsetLink(SQLModel, table=True):
game_id: int | None = Field(default=None, foreign_key='game.id', primary_key=True)
cardset_id: int | None = Field(default=None, foreign_key='cardset.id', primary_key=True)
@ -294,13 +198,175 @@ class Game(SQLModel, table=True):
if len(ai_note) > 0:
gm_name = self.home_team.gmname if self.ai_team == 'home' else self.away_team.gmname
embed.add_field(name=f'{gm_name} will...', value=ai_note, inline=False)
else:
embed.add_field(
name='Game State',
value='No plays found for this game',
inline=False
)
return embed
def initialize_play(self, session: Session):
existing_play = self.current_play_or_none(session)
if existing_play is not None:
return existing_play
leadoff_batter, home_pitcher, home_catcher = None, None, None
home_positions, away_positions = [], []
for line in [x for x in self.lineups if x.active]:
if line.team == self.away_team:
if line.position not in away_positions:
away_positions.append(line.position)
if line.batting_order == 1:
leadoff_batter = line
else:
if line.position not in home_positions:
home_positions.append(line.position)
if line.position == 'P':
home_pitcher = line
elif line.position == 'C':
home_catcher = line
if len(home_positions) != 10:
e_msg = f'Only {len(home_positions)} players found on home team'
log_exception(LineupsMissingException(e_msg), e_msg)
if len(away_positions) != 10:
e_msg = f'Only {len(away_positions)} players found on away team'
log_exception(LineupsMissingException(e_msg), e_msg)
if None in [leadoff_batter, home_pitcher, home_catcher]:
e_msg = f'Could not set the initial pitcher, catcher, and batter'
log_exception(LineupsMissingException(e_msg), e_msg)
new_play = Play(
game=self,
play_num=1,
batter=leadoff_batter,
pitcher=home_pitcher,
batter_pos=leadoff_batter.position,
catcher=home_catcher,
is_tied=True,
is_new_inning=True
)
session.add(new_play)
session.commit()
session.refresh(new_play)
new_play.init_ai()
return new_play
# @property
# def game_prop(self) -> str:
# return f'Game {self.id} / Week {self.week_num} / Type {self.game_type}'
class ManagerAi(ManagerAiBase, table=True):
plays: list['Play'] = Relationship(back_populates='managerai')
def create_ai(session: Session = None):
def get_new_ai(this_session: Session):
all_ai = session.exec(select(ManagerAi.id)).all()
if len(all_ai) == 0:
logging.info(f'Creating ManagerAI records')
new_ai = [
ManagerAi(
name='Balanced'
),
ManagerAi(
name='Yolo',
steal=10,
running=10,
hold=5,
catcher_throw=10,
uncapped_home=10,
uncapped_third=10,
uncapped_trail=10,
bullpen_matchup=3,
behind_aggression=10,
ahead_aggression=10,
decide_throw=10
),
ManagerAi(
name='Safe',
steal=3,
running=3,
hold=8,
catcher_throw=5,
uncapped_home=5,
uncapped_third=3,
uncapped_trail=5,
bullpen_matchup=8,
behind_aggression=5,
ahead_aggression=1,
decide_throw=1
)
]
for x in new_ai:
session.add(x)
session.commit()
if session is None:
with Session(engine) as session:
get_new_ai(session)
else:
get_new_ai(session)
return True
def check_jump(self, session: Session, this_game: Game, to_base: Literal[2, 3, 4]) -> JumpResponse | None:
this_resp = JumpResponse()
this_play = this_game.current_play_or_none(session)
if this_play is None:
raise KeyError(f'No game found while checking for jump')
num_outs = this_play.starting_outs
run_diff = this_play.away_score - this_play.home_score
if this_game.ai_team == 'home':
run_diff = run_diff * -1
if to_base == 2:
match self.steal:
case 10:
this_resp.min_safe = 12 + num_outs
case self.steal if self.steal > 8 and run_diff <= 5:
this_resp.min_safe = 13 + num_outs
case self.steal if self.steal > 6 and run_diff <= 5:
this_resp.min_safe = 14 + num_outs
case self.steal if self.steal > 4 and num_outs < 2 and run_diff <= 5:
this_resp.min_safe = 15 + num_outs
case self.steal if self.steal > 2 and num_outs < 2 and run_diff <= 5:
this_resp.min_safe = 16 + num_outs
case _:
this_resp = 17 + num_outs
if self.steal > 7 and num_outs < 2 and run_diff <= 5:
this_resp.run_if_auto_jump = True
elif self.steal < 5:
this_resp.must_auto_jump = True
elif to_base == 3:
match self.steal:
case 10:
this_resp.min_safe = 12 + num_outs
case self.steal if self.steal > 6 and num_outs < 2 and run_diff <= 5:
this_resp.min_safe = 15 + num_outs
case _:
this_resp.min_safe = None
if self.steal == 10 and num_outs < 2 and run_diff <= 5:
this_resp.run_if_auto_jump = True
elif self.steal <= 5:
this_resp.must_auto_jump = True
elif run_diff in [-1, 0]:
if self.steal == 10:
this_resp.min_safe = 5
elif self.steal > 5:
this_resp.min_safe = 7
elif this_play.inning_num > 7 and self.steal >= 5:
this_resp.min_safe = 6
return this_resp
class CardsetBase(SQLModel):
id: int | None = Field(default=None, primary_key=True)
@ -375,6 +441,10 @@ class Player(PlayerBase, table=True):
cards: list['Card'] = Relationship(back_populates='player', cascade_delete=True)
lineups: list['Lineup'] = Relationship(back_populates='player', cascade_delete=True)
@property
def with_desc(self):
return f'{self.description} {self.name}'
def player_description(player: Player = None, player_dict: dict = None) -> str:
if player is None and player_dict is None:
@ -496,6 +566,7 @@ class PlayBase(SQLModel):
is_go_ahead: bool = Field(default=False)
is_tied: bool = Field(default=False)
is_new_inning: bool = Field(default=False)
managerai_id: int | None = Field(default=None, foreign_key='managerai.id')
def ai_run_diff(self):
if self.game.ai_team == 'away':
@ -530,6 +601,17 @@ class Play(PlayBase, table=True):
runner: Lineup = Relationship(
sa_relationship_kwargs=dict(foreign_keys="[Play.runner_id]")
)
managerai: ManagerAi = Relationship(back_populates='plays')
def init_ai(self, session: Session):
id = ((datetime.datetime.now().day * self.batter.team.id) % 3) + 1
if id > 3 or id < 1:
self.managerai_id = 1
else:
self.managerai = id
session.add(self)
session.commit()
@property
def scorebug_ascii(self):
@ -569,7 +651,7 @@ class Play(PlayBase, table=True):
ai_note += f'- hold the runner on second\n'
elif self.on_base_code in [1, 5]:
ai_note += f'- hold the runner on 1st if they have ***** auto-jump\n'
elif self.on_base_code == 2:
elif self.on_base_code in [2, 4]:
ai_note += f'- hold the runner on 2nd if safe range is 14+\n'
# Defensive Alignment
@ -583,7 +665,6 @@ class Play(PlayBase, table=True):
return ai_note
@property
def batting_ai_note(self) -> str:
ai_note = '' # TODO: migrate Manager AI to their own local model

View File

@ -1,6 +1,6 @@
from sqlmodel import Session, select
from in_game.gameplay_models import ManagerAi
from in_game.gameplay_models import Game, Lineup, ManagerAi, Play
from factory import session_fixture
from in_game.managerai_responses import JumpResponse
@ -20,11 +20,18 @@ def test_check_jump(session: Session):
balanced_ai = session.exec(select(ManagerAi).where(ManagerAi.name == 'Balanced')).one()
aggressive_ai = session.exec(select(ManagerAi).where(ManagerAi.name == 'Yolo')).one()
bal_second_22 = balanced_ai.check_jump(to_base=2, num_outs=0)
agg_second_20 = aggressive_ai.check_jump(to_base=2, num_outs=0)
agg_second_22 = aggressive_ai.check_jump(to_base=2, num_outs=2)
this_game = session.get(Game, 1)
runner = session.get(Lineup, 5)
this_play = session.get(Play, 2)
assert bal_second_22 == JumpResponse(min_safe=15)
assert balanced_ai.check_jump(to_base=4, num_outs=2) is None
assert agg_second_20 == JumpResponse(min_safe=12, run_if_auto_jump=True)
assert agg_second_22.run_if_auto_jump == False
this_play.on_first = runner
assert this_play.starting_outs == 1
assert balanced_ai.check_jump(session, this_game, to_base=2) == JumpResponse(min_safe=16)
assert aggressive_ai.check_jump(session, this_game, to_base=2) == JumpResponse(min_safe=13, run_if_auto_jump=True)
this_play.on_third = runner
assert balanced_ai.check_jump(session, this_game, to_base=4) == JumpResponse(min_safe=None)
assert aggressive_ai.check_jump(session, this_game, to_base=4) == JumpResponse(min_safe=5)

View File

@ -46,7 +46,7 @@ def test_player_description(session: Session):
player_dict = {'player_id': player_2.id, 'p_name': player_2.name, 'description': player_2.description}
assert player_description(player=player_1) == f'2024 Player 0'
assert player_1.with_desc == f'2024 Player 0'
assert player_description(player_dict=player_dict) == f'Live Player 1'