diff --git a/gauntlets.py b/gauntlets.py index 9b8beb6..2d89e5c 100644 --- a/gauntlets.py +++ b/gauntlets.py @@ -2472,7 +2472,13 @@ def build_gauntlet_recap_embed( if loss_max is not None else wins >= win_num ) - marker = "✅" if earned else "❌" if losses > (loss_max or 99) else "⬜" + marker = ( + "✅" + if earned + else "❌" + if loss_max is not None and losses > loss_max + else "⬜" + ) reward = r.get("reward", {}) if reward.get("money"): diff --git a/in_game/gameplay_queries.py b/in_game/gameplay_queries.py index 0836f26..789aebf 100644 --- a/in_game/gameplay_queries.py +++ b/in_game/gameplay_queries.py @@ -5,51 +5,13 @@ from typing import Literal import pydantic from sqlalchemy import func -from sqlalchemy.exc import NoResultFound from api_calls import db_get, db_post from sqlmodel import col, update -from in_game.gameplay_models import ( - CACHE_LIMIT, - BatterScouting, - BatterScoutingBase, - BattingCard, - BattingCardBase, - BattingRatings, - BattingRatingsBase, - Card, - CardBase, - Cardset, - CardsetBase, - GameCardsetLink, - Lineup, - PitcherScouting, - PitchingCard, - PitchingCardBase, - PitchingRatings, - PitchingRatingsBase, - Player, - PlayerBase, - PositionRating, - PositionRatingBase, - RosterLink, - Session, - Team, - TeamBase, - select, - or_, - Game, - Play, -) -from exceptions import ( - DatabaseError, - PositionNotFoundException, - log_errors, - log_exception, - PlayNotFoundException, -) +from in_game.gameplay_models import CACHE_LIMIT, BatterScouting, BatterScoutingBase, BattingCard, BattingCardBase, BattingRatings, BattingRatingsBase, Card, CardBase, Cardset, CardsetBase, GameCardsetLink, Lineup, PitcherScouting, PitchingCard, PitchingCardBase, PitchingRatings, PitchingRatingsBase, Player, PlayerBase, PositionRating, PositionRatingBase, RosterLink, Session, Team, TeamBase, select, or_, Game, Play +from exceptions import DatabaseError, PositionNotFoundException, log_errors, log_exception, PlayNotFoundException -logger = logging.getLogger("discord_app") +logger = logging.getLogger('discord_app') class DecisionModel(pydantic.BaseModel): @@ -72,28 +34,20 @@ class DecisionModel(pydantic.BaseModel): def get_batting_team(session: Session, this_play: Play) -> Team: - return ( - this_play.game.away_team - if this_play.inning_half == "top" - else this_play.game.home_team - ) + return this_play.game.away_team if this_play.inning_half == 'top' else this_play.game.home_team def get_games_by_channel(session: Session, channel_id: int) -> list[Game]: - logger.info(f"Getting games in channel {channel_id}") - return session.exec( - select(Game).where(Game.channel_id == channel_id, Game.active) - ).all() + logger.info(f'Getting games in channel {channel_id}') + return session.exec(select(Game).where(Game.channel_id == channel_id, Game.active)).all() def get_channel_game_or_none(session: Session, channel_id: int) -> Game | None: - logger.info(f"Getting one game from channel {channel_id}") + logger.info(f'Getting one game from channel {channel_id}') all_games = get_games_by_channel(session, channel_id) if len(all_games) > 1: - err = "Too many games found in get_channel_game_or_none" - logger.error( - f"cogs.gameplay - get_channel_game_or_none - channel_id: {channel_id} / {err}" - ) + err = 'Too many games found in get_channel_game_or_none' + logger.error(f'cogs.gameplay - get_channel_game_or_none - channel_id: {channel_id} / {err}') raise Exception(err) elif len(all_games) == 0: return None @@ -101,298 +55,224 @@ def get_channel_game_or_none(session: Session, channel_id: int) -> Game | None: def get_active_games_by_team(session: Session, team: Team) -> list[Game]: - logger.info(f"Getting game for team {team.lname}") - return session.exec( - select(Game).where( - Game.active, or_(Game.away_team_id == team.id, Game.home_team_id == team.id) - ) - ).all() + logger.info(f'Getting game for team {team.lname}') + return session.exec(select(Game).where(Game.active, or_(Game.away_team_id == team.id, Game.home_team_id == team.id))).all() async def get_team_or_none( - session: Session, - team_id: int | None = None, - gm_id: int | None = None, - team_abbrev: str | None = None, - skip_cache: bool = False, - main_team: bool = None, - gauntlet_team: bool = None, - include_packs: bool = False, -) -> Team | None: - logger.info( - f"Getting team or none / team_id: {team_id} / gm_id: {gm_id} / team_abbrev: {team_abbrev} / skip_cache: {skip_cache} / main_team: {main_team} / gauntlet_team: {gauntlet_team}" - ) - + session: Session, team_id: int | None = None, gm_id: int | None = None, team_abbrev: str | None = None, skip_cache: bool = False, main_team: bool = None, gauntlet_team: bool = None, include_packs: bool = False) -> Team | None: + logger.info(f'Getting team or none / team_id: {team_id} / gm_id: {gm_id} / team_abbrev: {team_abbrev} / skip_cache: {skip_cache} / main_team: {main_team} / gauntlet_team: {gauntlet_team}') + this_team = None if gm_id is not None: if main_team is None and gauntlet_team is None: main_team = True gauntlet_team = False elif main_team == gauntlet_team: - log_exception(KeyError, "Must select either main_team or gauntlet_team") - - logger.info(f"main_team: {main_team} / gauntlet_team: {gauntlet_team}") + log_exception(KeyError, 'Must select either main_team or gauntlet_team') + + logger.info(f'main_team: {main_team} / gauntlet_team: {gauntlet_team}') if team_id is None and gm_id is None and team_abbrev is None: - log_exception( - KeyError, - 'One of "team_id", "gm_id", or "team_abbrev" must be included in search', - ) - + log_exception(KeyError, 'One of "team_id", "gm_id", or "team_abbrev" must be included in search') + if not skip_cache: if team_id is not None: - logger.info(f"Getting team by team_id: {team_id}") + logger.info(f'Getting team by team_id: {team_id}') this_team = session.get(Team, team_id) else: if gm_id is not None: - logger.info(f"Getting team by gm_id: {gm_id}") + logger.info(f'Getting team by gm_id: {gm_id}') for team in session.exec(select(Team).where(Team.gmid == gm_id)).all(): - if ("gauntlet" in team.abbrev.lower() and gauntlet_team) or ( - "gauntlet" not in team.abbrev.lower() and main_team - ): - logger.info(f"Found the team: {team}") + if ('gauntlet' in team.abbrev.lower() and gauntlet_team) or ('gauntlet' not in team.abbrev.lower() and main_team): + logger.info(f'Found the team: {team}') this_team = team break - logger.info(f"post loop, this_team: {this_team}") + logger.info(f'post loop, this_team: {this_team}') else: - logger.info(f"Getting team by abbrev: {team_abbrev}") - this_team = session.exec( - select(Team).where(func.lower(Team.abbrev) == team_abbrev.lower()) - ).one_or_none() - + logger.info(f'Getting team by abbrev: {team_abbrev}') + this_team = session.exec(select(Team).where(func.lower(Team.abbrev) == team_abbrev.lower())).one_or_none() + if this_team is not None: - logger.info(f"we found a team: {this_team} / created: {this_team.created}") + logger.info(f'we found a team: {this_team} / created: {this_team.created}') tdelta = datetime.datetime.now() - this_team.created - logger.info(f"tdelta: {tdelta}") + logger.info(f'tdelta: {tdelta}') if tdelta.total_seconds() < CACHE_LIMIT: return this_team # else: # session.delete(this_team) # session.commit() - + def cache_team(json_data: dict) -> Team: - logger.info( - f"gameplay_queries - cache_team - writing a team to cache: {json_data}" - ) + logger.info(f'gameplay_queries - cache_team - writing a team to cache: {json_data}') valid_team = TeamBase.model_validate(json_data, from_attributes=True) - logger.info(f"gameplay_queries - cache_team - valid_team: {valid_team}") + logger.info(f'gameplay_queries - cache_team - valid_team: {valid_team}') db_team = Team.model_validate(valid_team) - logger.info(f"gameplay_queries - cache_team - db_team: {db_team}") - logger.info(f"Checking for existing team ID: {db_team.id}") + logger.info(f'gameplay_queries - cache_team - db_team: {db_team}') + logger.info(f'Checking for existing team ID: {db_team.id}') try: this_team = session.exec(select(Team).where(Team.id == db_team.id)).one() - logger.info(f"Found team: {this_team}\nUpdating with db team: {db_team}") - + logger.info(f'Found team: {this_team}\nUpdating with db team: {db_team}') + for key, value in db_team.model_dump(exclude_unset=True).items(): - logger.info(f"Setting key ({key}) to value ({value})") + logger.info(f'Setting key ({key}) to value ({value})') setattr(this_team, key, value) - logger.info(f"Set this_team to db_team") + logger.info(f'Set this_team to db_team') session.add(this_team) session.commit() - logger.info(f"Refreshing this_team") + logger.info(f'Refreshing this_team') session.refresh(this_team) return this_team - except NoResultFound: - logger.info(f"Team not found, adding to db") + except Exception: + logger.info(f'Team not found, adding to db') session.add(db_team) session.commit() session.refresh(db_team) return db_team - + if team_id is not None: - t_query = await db_get( - "teams", object_id=team_id, params=[("inc_packs", include_packs)] - ) + t_query = await db_get('teams', object_id=team_id, params=[('inc_packs', include_packs)]) if t_query is not None: return cache_team(t_query) - + elif gm_id is not None: - t_query = await db_get( - "teams", params=[("gm_id", gm_id), ("inc_packs", include_packs)] - ) - if t_query["count"] != 0: - for team in t_query["teams"]: - logger.info( - f"in t_query loop / team: {team} / gauntlet_team: {gauntlet_team} / main_team: {main_team}" - ) - if (gauntlet_team and "gauntlet" in team["abbrev"].lower()) or ( - main_team and "gauntlet" not in team["abbrev"].lower() - ): + t_query = await db_get('teams', params=[('gm_id', gm_id), ('inc_packs', include_packs)]) + if t_query['count'] != 0: + for team in t_query['teams']: + logger.info(f'in t_query loop / team: {team} / gauntlet_team: {gauntlet_team} / main_team: {main_team}') + if (gauntlet_team and 'gauntlet' in team['abbrev'].lower()) or (main_team and 'gauntlet' not in team['abbrev'].lower()): return cache_team(team) - + elif team_abbrev is not None: - t_query = await db_get( - "teams", params=[("abbrev", team_abbrev), ("inc_packs", include_packs)] - ) - if t_query["count"] != 0: - if "gauntlet" in team_abbrev.lower(): - return cache_team(t_query["teams"][0]) - - for team in [ - x for x in t_query["teams"] if "gauntlet" not in x["abbrev"].lower() - ]: + t_query = await db_get('teams', params=[('abbrev', team_abbrev), ('inc_packs', include_packs)]) + if t_query['count'] != 0: + if 'gauntlet' in team_abbrev.lower(): + return cache_team(t_query['teams'][0]) + + for team in [x for x in t_query['teams'] if 'gauntlet' not in x['abbrev'].lower()]: return cache_team(team) - - logger.warning(f"No team found") + + logger.warning(f'No team found') return None -async def get_cardset_or_none( - session: Session, cardset_id: int = None, cardset_name: str = None -): - logger.info( - f"Getting cardset or none / cardset_id: {cardset_id} / cardset_name: {cardset_name}" - ) +async def get_cardset_or_none(session: Session, cardset_id: int = None, cardset_name: str = None): + logger.info(f'Getting cardset or none / cardset_id: {cardset_id} / cardset_name: {cardset_name}') if cardset_id is None and cardset_name is None: - log_exception( - KeyError, 'One of "cardset_id" or "cardset_name" must be included in search' - ) - + log_exception(KeyError, 'One of "cardset_id" or "cardset_name" must be included in search') + if cardset_id is not None: - logger.info(f"Getting cardset by id: {cardset_id}") + logger.info(f'Getting cardset by id: {cardset_id}') this_cardset = session.get(Cardset, cardset_id) else: - logger.info(f"Getting cardset by name: {cardset_name}") - this_cardset = session.exec( - select(Cardset).where(func.lower(Cardset.name) == cardset_name.lower()) - ).one_or_none() - + logger.info(f'Getting cardset by name: {cardset_name}') + this_cardset = session.exec(select(Cardset).where(func.lower(Cardset.name) == cardset_name.lower())).one_or_none() + if this_cardset is not None: - logger.info(f"we found a cardset: {this_cardset}") + logger.info(f'we found a cardset: {this_cardset}') return this_cardset - + def cache_cardset(json_data: dict) -> Cardset: - logger.info( - f"gameplay_queries - cache_team - writing a team to cache: {json_data}" - ) + logger.info(f'gameplay_queries - cache_team - writing a team to cache: {json_data}') valid_cardset = CardsetBase.model_validate(json_data, from_attributes=True) - logger.info(f"gameplay_queries - cache_team - valid_cardset: {valid_cardset}") + logger.info(f'gameplay_queries - cache_team - valid_cardset: {valid_cardset}') db_cardset = Cardset.model_validate(valid_cardset) - logger.info(f"gameplay_queries - cache_team - db_cardset: {db_cardset}") + logger.info(f'gameplay_queries - cache_team - db_cardset: {db_cardset}') session.add(db_cardset) session.commit() session.refresh(db_cardset) return db_cardset - + if cardset_id is not None: - c_query = await db_get("cardsets", object_id=cardset_id) + c_query = await db_get('cardsets', object_id=cardset_id) if c_query is not None: return cache_cardset(c_query) - + elif cardset_name is not None: - c_query = await db_get("cardsets", params=[("name", cardset_name)]) - if c_query["count"] != 0: - return cache_cardset(c_query["cardsets"][0]) - - logger.warning(f"No cardset found") + c_query = await db_get('cardsets', params=[('name', cardset_name)]) + if c_query['count'] != 0: + return cache_cardset(c_query['cardsets'][0]) + + logger.warning(f'No cardset found') return None -async def get_player_or_none( - session: Session, player_id: int, skip_cache: bool = False -) -> Player | None: - logger.info( - f"gameplay_models - get_player_or_none - player_id: {player_id} / skip_cache: {skip_cache}" - ) +async def get_player_or_none(session: Session, player_id: int, skip_cache: bool = False) -> Player | None: + logger.info(f'gameplay_models - get_player_or_none - player_id: {player_id} / skip_cache: {skip_cache}') if not skip_cache: this_player = session.get(Player, player_id) if this_player is not None: - logger.info( - f"we found a cached player: {this_player}\ncreated: {this_player.created}" - ) + logger.info(f'we found a cached player: {this_player}\ncreated: {this_player.created}') tdelta = datetime.datetime.now() - this_player.created - logger.info(f"tdelta: {tdelta}") + logger.info(f'tdelta: {tdelta}') if tdelta.total_seconds() < CACHE_LIMIT: - logger.info(f"returning this player") + logger.info(f'returning this player') return this_player # else: # logger.warning('Deleting old player record') # session.delete(this_player) # session.commit() - + def cache_player(json_data: dict) -> Player: - logger.info( - f"gameplay_models - get_player_or_none - cache_player - caching player data: {json_data}" - ) + logger.info(f'gameplay_models - get_player_or_none - cache_player - caching player data: {json_data}') valid_player = PlayerBase.model_validate(json_data, from_attributes=True) db_player = Player.model_validate(valid_player) try: this_player = session.get(Player, player_id) - logger.info( - f"Found player: {this_player}\nUpdating with db_player: {db_player}" - ) - + logger.info(f'Found player: {this_player}\nUpdating with db_player: {db_player}') + for key, value in db_player.model_dump(exclude_unset=True).items(): - logger.info(f"Setting key ({key}) to value ({value})") + logger.info(f'Setting key ({key}) to value ({value})') setattr(this_player, key, value) - logger.info(f"Set this_player to db_player") + logger.info(f'Set this_player to db_player') session.add(this_player) session.commit() - logger.info(f"Refreshing this_player") + logger.info(f'Refreshing this_player') session.refresh(this_player) return this_player - except NoResultFound: + except Exception: session.add(db_player) session.commit() session.refresh(db_player) return db_player - - p_query = await db_get("players", object_id=player_id, params=[("inc_dex", False)]) + + p_query = await db_get('players', object_id=player_id, params=[('inc_dex', False)]) if p_query is not None: - if "id" not in p_query: - p_query["id"] = p_query["player_id"] - if "name" not in p_query: - p_query["name"] = p_query["p_name"] + if 'id' not in p_query: + p_query['id'] = p_query['player_id'] + if 'name' not in p_query: + p_query['name'] = p_query['p_name'] return cache_player(p_query) - + return None -async def get_batter_scouting_or_none( - session: Session, card: Card, skip_cache: bool = False -) -> BatterScouting | None: - logger.info( - f"Getting batting scouting for card ID #{card.id}: {card.player.name_with_desc} / skip_cache: {skip_cache}" - ) +async def get_batter_scouting_or_none(session: Session, card: Card, skip_cache: bool = False) -> BatterScouting | None: + logger.info(f'Getting batting scouting for card ID #{card.id}: {card.player.name_with_desc} / skip_cache: {skip_cache}') - s_query = await db_get( - f"battingcardratings/player/{card.player.id}?variant={card.variant}", - none_okay=False, - ) - if s_query["count"] != 2: - log_exception( - DatabaseError, f"Scouting for {card.player.name_with_desc} was not found." - ) + s_query = await db_get(f'battingcardratings/player/{card.player.id}?variant={card.variant}', none_okay=False) + if s_query['count'] != 2: + log_exception(DatabaseError, f'Scouting for {card.player.name_with_desc} was not found.') - bs_query = session.exec( - select(BatterScouting).where( - BatterScouting.battingcard_id == s_query["ratings"][0]["battingcard"]["id"] - ) - ).all() - logger.info(f"bs_query: {bs_query}") + bs_query = session.exec(select(BatterScouting).where(BatterScouting.battingcard_id == s_query['ratings'][0]['battingcard']['id'])).all() + logger.info(f'bs_query: {bs_query}') if len(bs_query) > 0: this_scouting = bs_query[0] - logger.info(f"this_scouting: {this_scouting}") - logger.info( - f"we found a cached scouting: {this_scouting} / created {this_scouting.created}" - ) + logger.info(f'this_scouting: {this_scouting}') + logger.info(f'we found a cached scouting: {this_scouting} / created {this_scouting.created}') tdelta = datetime.datetime.now() - this_scouting.created - logger.debug(f"tdelta: {tdelta}") - if tdelta.total_seconds() < CACHE_LIMIT and None not in [ - this_scouting.battingcard, - this_scouting.ratings_vl, - this_scouting.ratings_vr, - ]: - logger.info(f"returning cached scouting") + logger.debug(f'tdelta: {tdelta}') + if tdelta.total_seconds() < CACHE_LIMIT and None not in [this_scouting.battingcard, this_scouting.ratings_vl, this_scouting.ratings_vr]: + logger.info(f'returning cached scouting') return this_scouting else: - logger.info(f"Refreshing cache") + logger.info(f'Refreshing cache') # logger.info(f'deleting cached scouting') # session.delete(this_scouting.battingcard) # session.delete(this_scouting.ratings_vl) @@ -400,10 +280,8 @@ async def get_batter_scouting_or_none( # session.delete(this_scouting) # session.commit() - def cache_scouting( - batting_card: dict, ratings_vr: dict, ratings_vl: dict - ) -> BatterScouting: - logger.info(f"Beginning batter scouting cache process") + def cache_scouting(batting_card: dict, ratings_vr: dict, ratings_vl: dict) -> BatterScouting: + logger.info(f'Beginning batter scouting cache process') valid_bc = BattingCardBase.model_validate(batting_card, from_attributes=True) db_bc = BattingCard.model_validate(valid_bc) @@ -412,81 +290,71 @@ async def get_batter_scouting_or_none( valid_vr = BattingRatingsBase.model_validate(ratings_vr, from_attributes=True) db_vr = BattingRatings.model_validate(valid_vr) - logger.info(f"db_bc: {db_bc}\n\ndb_vl: {db_vl}\n\ndb_vr: {db_vr}") + logger.info(f'db_bc: {db_bc}\n\ndb_vl: {db_vl}\n\ndb_vr: {db_vr}') - logger.info(f"Checking for existing battingcard ID: {db_bc.id}") + logger.info(f'Checking for existing battingcard ID: {db_bc.id}') try: - this_card = session.exec( - select(BattingCard).where(BattingCard.id == db_bc.id) - ).one() - logger.info(f"Found card: {this_card}\nUpdating with db card: {db_bc}") - + this_card = session.exec(select(BattingCard).where(BattingCard.id == db_bc.id)).one() + logger.info(f'Found card: {this_card}\nUpdating with db card: {db_bc}') + for key, value in db_bc.model_dump(exclude_unset=True).items(): - logger.info(f"Setting key ({key}) to value ({value})") + logger.info(f'Setting key ({key}) to value ({value})') setattr(this_card, key, value) - logger.info(f"Set this_card to db_bc") + logger.info(f'Set this_card to db_bc') session.add(this_card) # session.commit() # logger.info(f'Refreshing this_card') # session.refresh(this_card) # return this_card - except NoResultFound: - logger.info(f"Card not found, adding to db") + except Exception: + logger.info(f'Card not found, adding to db') this_card = db_bc session.add(this_card) # session.commit() # session.refresh(db_card) # return db_card - logger.info(f"Checking for existing vl ratings ID: {db_vl.id}") + logger.info(f'Checking for existing vl ratings ID: {db_vl.id}') try: - this_vl_rating = session.exec( - select(BattingRatings).where(BattingRatings.id == db_vl.id) - ).one() - logger.info( - f"Found ratings: {this_vl_rating}\nUpdating with db ratings: {db_vl}" - ) - + this_vl_rating = session.exec(select(BattingRatings).where(BattingRatings.id == db_vl.id)).one() + logger.info(f'Found ratings: {this_vl_rating}\nUpdating with db ratings: {db_vl}') + for key, value in db_vl.model_dump(exclude_unset=True).items(): - logger.info(f"Setting key ({key}) to value ({value})") + logger.info(f'Setting key ({key}) to value ({value})') setattr(this_vl_rating, key, value) - logger.info(f"Set this_vr_rating to db_vl") + logger.info(f'Set this_vr_rating to db_vl') session.add(this_vl_rating) # session.commit() # logger.info(f'Refreshing this_card') # session.refresh(this_card) # return this_card - except NoResultFound: - logger.info(f"Card not found, adding to db") + except Exception: + logger.info(f'Card not found, adding to db') this_vl_rating = db_vl session.add(this_vl_rating) # session.commit() # session.refresh(db_card) # return db_card - logger.info(f"Checking for existing vr ratings ID: {db_vr.id}") + logger.info(f'Checking for existing vr ratings ID: {db_vr.id}') try: - this_vr_rating = session.exec( - select(BattingRatings).where(BattingRatings.id == db_vr.id) - ).one() - logger.info( - f"Found ratings: {this_vr_rating}\nUpdating with db ratings: {db_vr}" - ) - + this_vr_rating = session.exec(select(BattingRatings).where(BattingRatings.id == db_vr.id)).one() + logger.info(f'Found ratings: {this_vr_rating}\nUpdating with db ratings: {db_vr}') + for key, value in db_vr.model_dump(exclude_unset=True).items(): - logger.info(f"Setting key ({key}) to value ({value})") + logger.info(f'Setting key ({key}) to value ({value})') setattr(this_vr_rating, key, value) - logger.info(f"Set this_vr_rating to db_vl") + logger.info(f'Set this_vr_rating to db_vl') session.add(this_vr_rating) # session.commit() # logger.info(f'Refreshing this_card') # session.refresh(this_card) # return this_card - except NoResultFound: - logger.info(f"Card not found, adding to db") + except Exception: + logger.info(f'Card not found, adding to db') this_vr_rating = db_vr session.add(this_vr_rating) # session.commit() @@ -494,7 +362,9 @@ async def get_batter_scouting_or_none( # return db_card db_scouting = BatterScouting( - battingcard=this_card, ratings_vl=this_vl_rating, ratings_vr=this_vr_rating + battingcard=this_card, + ratings_vl=this_vl_rating, + ratings_vr=this_vr_rating ) # db_scouting = BatterScouting( @@ -502,81 +372,53 @@ async def get_batter_scouting_or_none( # ratings_vl=db_vl, # ratings_vr=db_vr # ) - + session.add(db_scouting) - logger.info(f"caching scouting") + logger.info(f'caching scouting') session.commit() session.refresh(db_scouting) - logger.info( - f"scouting id: {db_scouting.id} / battingcard: {db_scouting.battingcard.id} / vL: {db_scouting.ratings_vl.id} / vR: {db_scouting.ratings_vr.id}" - ) + logger.info(f'scouting id: {db_scouting.id} / battingcard: {db_scouting.battingcard.id} / vL: {db_scouting.ratings_vl.id} / vR: {db_scouting.ratings_vr.id}') return db_scouting - + return cache_scouting( - batting_card=s_query["ratings"][0]["battingcard"], - ratings_vr=s_query["ratings"][0] - if s_query["ratings"][0]["vs_hand"] == "R" - else s_query["ratings"][1], - ratings_vl=s_query["ratings"][0] - if s_query["ratings"][0]["vs_hand"] == "L" - else s_query["ratings"][1], + batting_card=s_query['ratings'][0]['battingcard'], + ratings_vr=s_query['ratings'][0] if s_query['ratings'][0]['vs_hand'] == 'R' else s_query['ratings'][1], + ratings_vl=s_query['ratings'][0] if s_query['ratings'][0]['vs_hand'] == 'L' else s_query['ratings'][1] ) -async def get_pitcher_scouting_or_none( - session: Session, card: Card, skip_cache: bool = False -) -> PitcherScouting | None: - logger.info( - f"Getting pitching scouting for card ID #{card.id}: {card.player.name_with_desc}" - ) +async def get_pitcher_scouting_or_none(session: Session, card: Card, skip_cache: bool = False) -> PitcherScouting | None: + logger.info(f'Getting pitching scouting for card ID #{card.id}: {card.player.name_with_desc}') + + s_query = await db_get(f'pitchingcardratings/player/{card.player.id}?variant={card.variant}', none_okay=False) + if s_query['count'] != 2: + log_exception(DatabaseError, f'Scouting for {card.player.name_with_desc} was not found.') - s_query = await db_get( - f"pitchingcardratings/player/{card.player.id}?variant={card.variant}", - none_okay=False, - ) - if s_query["count"] != 2: - log_exception( - DatabaseError, f"Scouting for {card.player.name_with_desc} was not found." - ) - - bs_query = session.exec( - select(PitcherScouting).where( - PitcherScouting.pitchingcard_id - == s_query["ratings"][0]["pitchingcard"]["id"] - ) - ).all() - logger.info(f"bs_query: {bs_query}") + bs_query = session.exec(select(PitcherScouting).where(PitcherScouting.pitchingcard_id == s_query['ratings'][0]['pitchingcard']['id'])).all() + logger.info(f'bs_query: {bs_query}') # this_scouting = session.get(PitcherScouting, s_query['ratings'][0]['pitchingcard']['id']) if len(bs_query) > 0: this_scouting = bs_query[0] - logger.info( - f"we found a cached scouting: {this_scouting} / created {this_scouting.created}" - ) + logger.info(f'we found a cached scouting: {this_scouting} / created {this_scouting.created}') tdelta = datetime.datetime.now() - this_scouting.created - logger.debug(f"tdelta: {tdelta}") + logger.debug(f'tdelta: {tdelta}') - if tdelta.total_seconds() < CACHE_LIMIT and None not in [ - this_scouting.pitchingcard, - this_scouting.ratings_vl, - this_scouting.ratings_vr, - ]: - logger.info(f"returning cached scouting") + if tdelta.total_seconds() < CACHE_LIMIT and None not in [this_scouting.pitchingcard, this_scouting.ratings_vl, this_scouting.ratings_vr]: + logger.info(f'returning cached scouting') return this_scouting - + else: - logger.info(f"Refreshing cache") + logger.info(f'Refreshing cache') # logger.info(f'deleting cached scouting') # session.delete(this_scouting.pitchingcard) # session.delete(this_scouting.ratings_vl) # session.delete(this_scouting.ratings_vr) # session.delete(this_scouting) # session.commit() - - def cache_scouting( - pitching_card: dict, ratings_vr: dict, ratings_vl: dict - ) -> PitcherScouting: - logger.info(f"Beginning pitcher scouting cache process") + + def cache_scouting(pitching_card: dict, ratings_vr: dict, ratings_vl: dict) -> PitcherScouting: + logger.info(f'Beginning pitcher scouting cache process') valid_bc = PitchingCardBase.model_validate(pitching_card, from_attributes=True) db_bc = PitchingCard.model_validate(valid_bc) @@ -585,81 +427,71 @@ async def get_pitcher_scouting_or_none( valid_vr = PitchingRatingsBase.model_validate(ratings_vr, from_attributes=True) db_vr = PitchingRatings.model_validate(valid_vr) - logger.info(f"db_bc: {db_bc}\n\ndb_vl: {db_vl}\n\ndb_vr: {db_vr}") + logger.info(f'db_bc: {db_bc}\n\ndb_vl: {db_vl}\n\ndb_vr: {db_vr}') - logger.info(f"Checking for existing battingcard ID: {db_bc.id}") + logger.info(f'Checking for existing battingcard ID: {db_bc.id}') try: - this_card = session.exec( - select(PitchingCard).where(PitchingCard.id == db_bc.id) - ).one() - logger.info(f"Found card: {this_card}\nUpdating with db card: {db_bc}") - + this_card = session.exec(select(PitchingCard).where(PitchingCard.id == db_bc.id)).one() + logger.info(f'Found card: {this_card}\nUpdating with db card: {db_bc}') + for key, value in db_bc.model_dump(exclude_unset=True).items(): - logger.info(f"Setting key ({key}) to value ({value})") + logger.info(f'Setting key ({key}) to value ({value})') setattr(this_card, key, value) - logger.info(f"Set this_card to db_bc") + logger.info(f'Set this_card to db_bc') session.add(this_card) # session.commit() # logger.info(f'Refreshing this_card') # session.refresh(this_card) # return this_card - except NoResultFound: - logger.info(f"Card not found, adding to db") + except Exception: + logger.info(f'Card not found, adding to db') this_card = db_bc session.add(this_card) # session.commit() # session.refresh(db_card) # return db_card - logger.info(f"Checking for existing vl ratings ID: {db_vl.id}") + logger.info(f'Checking for existing vl ratings ID: {db_vl.id}') try: - this_vl_rating = session.exec( - select(PitchingRatings).where(PitchingRatings.id == db_vl.id) - ).one() - logger.info( - f"Found ratings: {this_vl_rating}\nUpdating with db ratings: {db_vl}" - ) - + this_vl_rating = session.exec(select(PitchingRatings).where(PitchingRatings.id == db_vl.id)).one() + logger.info(f'Found ratings: {this_vl_rating}\nUpdating with db ratings: {db_vl}') + for key, value in db_vl.model_dump(exclude_unset=True).items(): - logger.info(f"Setting key ({key}) to value ({value})") + logger.info(f'Setting key ({key}) to value ({value})') setattr(this_vl_rating, key, value) - logger.info(f"Set this_vr_rating to db_vl") + logger.info(f'Set this_vr_rating to db_vl') session.add(this_vl_rating) # session.commit() # logger.info(f'Refreshing this_card') # session.refresh(this_card) # return this_card - except NoResultFound: - logger.info(f"Card not found, adding to db") + except Exception: + logger.info(f'Card not found, adding to db') this_vl_rating = db_vl session.add(this_vl_rating) # session.commit() # session.refresh(db_card) # return db_card - logger.info(f"Checking for existing vr ratings ID: {db_vr.id}") + logger.info(f'Checking for existing vr ratings ID: {db_vr.id}') try: - this_vr_rating = session.exec( - select(PitchingRatings).where(PitchingRatings.id == db_vr.id) - ).one() - logger.info( - f"Found ratings: {this_vr_rating}\nUpdating with db ratings: {db_vr}" - ) - + this_vr_rating = session.exec(select(PitchingRatings).where(PitchingRatings.id == db_vr.id)).one() + logger.info(f'Found ratings: {this_vr_rating}\nUpdating with db ratings: {db_vr}') + for key, value in db_vr.model_dump(exclude_unset=True).items(): - logger.info(f"Setting key ({key}) to value ({value})") + logger.info(f'Setting key ({key}) to value ({value})') setattr(this_vr_rating, key, value) - logger.info(f"Set this_vr_rating to db_vl") + logger.info(f'Set this_vr_rating to db_vl') session.add(this_vr_rating) # session.commit() # logger.info(f'Refreshing this_card') # session.refresh(this_card) # return this_card - except NoResultFound: - logger.info(f"Card not found, adding to db") + except Exception: + logger.info(f'Card not found, adding to db') this_vr_rating = db_vr session.add(this_vr_rating) # session.commit() @@ -667,111 +499,87 @@ async def get_pitcher_scouting_or_none( # return db_card db_scouting = PitcherScouting( - pitchingcard=this_card, ratings_vl=this_vl_rating, ratings_vr=this_vr_rating + pitchingcard=this_card, + ratings_vl=this_vl_rating, + ratings_vr=this_vr_rating ) - + session.add(db_scouting) - logger.info(f"caching scouting") + logger.info(f'caching scouting') session.commit() session.refresh(db_scouting) - logger.info( - f"scouting id: {db_scouting.id} / pitching: {db_scouting.pitchingcard.id} / vL: {db_scouting.ratings_vl.id} / vR: {db_scouting.ratings_vr.id}" - ) + logger.info(f'scouting id: {db_scouting.id} / pitching: {db_scouting.pitchingcard.id} / vL: {db_scouting.ratings_vl.id} / vR: {db_scouting.ratings_vr.id}') return db_scouting scouting = cache_scouting( - pitching_card=s_query["ratings"][0]["pitchingcard"], - ratings_vr=s_query["ratings"][0] - if s_query["ratings"][0]["vs_hand"] == "R" - else s_query["ratings"][1], - ratings_vl=s_query["ratings"][0] - if s_query["ratings"][0]["vs_hand"] == "L" - else s_query["ratings"][1], + pitching_card=s_query['ratings'][0]['pitchingcard'], + ratings_vr=s_query['ratings'][0] if s_query['ratings'][0]['vs_hand'] == 'R' else s_query['ratings'][1], + ratings_vl=s_query['ratings'][0] if s_query['ratings'][0]['vs_hand'] == 'L' else s_query['ratings'][1] ) - pos_rating = await get_position(session, card, "P") + pos_rating = await get_position(session, card, 'P') return scouting def get_player_id_from_dict(json_data: dict) -> int: - logger.info(f"Getting player from dict {json_data}") - if "player_id" in json_data: - return json_data["player_id"] - elif "id" in json_data: - return json_data["id"] - log_exception(KeyError, "Player ID could not be extracted from json data") + logger.info(f'Getting player from dict {json_data}') + if 'player_id' in json_data: + return json_data['player_id'] + elif 'id' in json_data: + return json_data['id'] + log_exception(KeyError, 'Player ID could not be extracted from json data') def get_player_name_from_dict(json_data: dict) -> str: - logger.info(f"Getting player from dict {json_data}") - if "name" in json_data: - return json_data["name"] - elif "p_name" in json_data: - return json_data["p_name"] - log_exception(KeyError, "Player name could not be extracted from json data") + logger.info(f'Getting player from dict {json_data}') + if 'name' in json_data: + return json_data['name'] + elif 'p_name' in json_data: + return json_data['p_name'] + log_exception(KeyError, 'Player name could not be extracted from json data') -async def shared_get_scouting( - session: Session, this_card: Card, which: Literal["batter", "pitcher"] -): - if which == "batter": - logger.info(f"Pulling batter scouting for {this_card.player.name_with_desc}") +async def shared_get_scouting(session: Session, this_card: Card, which: Literal['batter', 'pitcher']): + if which == 'batter': + logger.info(f'Pulling batter scouting for {this_card.player.name_with_desc}') this_scouting = await get_batter_scouting_or_none(session, this_card) else: - logger.info(f"Pulling pitcher scouting for {this_card.player.name_with_desc}") + logger.info(f'Pulling pitcher scouting for {this_card.player.name_with_desc}') this_scouting = await get_pitcher_scouting_or_none(session, this_card) - logger.info(f"this_scouting: {this_scouting}") + logger.info(f'this_scouting: {this_scouting}') return this_scouting -async def get_position( - session: Session, - this_card: Card, - position: Literal["P", "C", "1B", "2B", "3B", "SS", "LF", "CF", "RF"], - skip_cache: bool = False, -) -> PositionRating: - logger.info( - f"Pulling position rating for {this_card.player.name_with_desc} at {position} / skip_cache: {skip_cache}" - ) +async def get_position(session: Session, this_card: Card, position: Literal['P', 'C', '1B', '2B', '3B', 'SS', 'LF', 'CF', 'RF'], skip_cache: bool = False) -> PositionRating: + logger.info(f'Pulling position rating for {this_card.player.name_with_desc} at {position} / skip_cache: {skip_cache}') if not skip_cache: - this_pos = session.exec( - select(PositionRating).where( - PositionRating.player_id == this_card.player.id, - PositionRating.position == position, - PositionRating.variant == this_card.variant, - ) - ).all() - logger.info(f"Ratings found: {len(this_pos)}") + this_pos = session.exec(select(PositionRating).where(PositionRating.player_id == this_card.player.id, PositionRating.position == position, PositionRating.variant == this_card.variant)).all() + logger.info(f'Ratings found: {len(this_pos)}') if len(this_pos) > 0: - logger.info( - f"we found a cached position rating: {this_pos[0]} / created: {this_pos[0].created}" - ) + logger.info(f'we found a cached position rating: {this_pos[0]} / created: {this_pos[0].created}') tdelta = datetime.datetime.now() - this_pos[0].created - logger.debug(f"tdelta: {tdelta}") + logger.debug(f'tdelta: {tdelta}') if tdelta.total_seconds() < CACHE_LIMIT: return this_pos[0] else: session.delete(this_pos[0]) session.commit() - + def cache_pos(json_data: dict) -> PositionRating: - if "id" in json_data: - del json_data["id"] + if 'id' in json_data: + del json_data['id'] valid_pos = PositionRatingBase.model_validate(json_data, from_attributes=True) db_pos = PositionRating.model_validate(valid_pos) session.add(db_pos) session.commit() session.refresh(db_pos) return db_pos - - p_query = await db_get( - "cardpositions", - params=[("player_id", this_card.player.id), ("position", position)], - ) - if p_query["count"] > 0: - json_data = p_query["positions"][0] - json_data["player_id"] = get_player_id_from_dict(json_data["player"]) + + p_query = await db_get('cardpositions', params=[('player_id', this_card.player.id), ('position', position)]) + if p_query['count'] > 0: + json_data = p_query['positions'][0] + json_data['player_id'] = get_player_id_from_dict(json_data['player']) this_pos = cache_pos(json_data) session.add(this_pos) @@ -779,119 +587,86 @@ async def get_position( session.refresh(this_pos) return this_pos - - log_exception( - PositionNotFoundException, - f"{position} ratings not found for {this_card.player.name_with_desc}", - ) + + log_exception(PositionNotFoundException, f'{position} ratings not found for {this_card.player.name_with_desc}') -async def get_all_positions( - session: Session, this_card: Card, skip_cache: bool = False -) -> int: - logger.info( - f"Pulling all position ratings for {this_card.player.name_with_desc} / skip_cache: {skip_cache}" - ) +async def get_all_positions(session: Session, this_card: Card, skip_cache: bool = False) -> int: + logger.info(f'Pulling all position ratings for {this_card.player.name_with_desc} / skip_cache: {skip_cache}') if not skip_cache: - all_pos = session.exec( - select(PositionRating).where( - PositionRating.player_id == this_card.player.id, - PositionRating.variant == this_card.variant, - ) - ).all() - logger.info(f"Ratings found: {len(all_pos)}") + all_pos = session.exec(select(PositionRating).where(PositionRating.player_id == this_card.player.id, PositionRating.variant == this_card.variant)).all() + logger.info(f'Ratings found: {len(all_pos)}') should_repull = False for position in all_pos: - logger.info( - f"we found a cached position rating: {position} / created: {position.created}" - ) + logger.info(f'we found a cached position rating: {position} / created: {position.created}') tdelta = datetime.datetime.now() - position.created - logger.debug(f"tdelta: {tdelta}") + logger.debug(f'tdelta: {tdelta}') if tdelta.total_seconds() >= CACHE_LIMIT or datetime.datetime.now().day < 5: session.delete(position) session.commit() should_repull = True - + if not should_repull and len(all_pos) > 0: - logger.info(f"Returning {len(all_pos)}") + logger.info(f'Returning {len(all_pos)}') return len(all_pos) - p_query = await db_get("cardpositions", params=[("player_id", this_card.player.id)]) + p_query = await db_get('cardpositions', params=[('player_id', this_card.player.id)]) - if not p_query or p_query["count"] == 0: - logger.info(f"No positions received, returning 0") + if not p_query or p_query['count'] == 0: + logger.info(f'No positions received, returning 0') return 0 - - old_pos = session.exec( - select(PositionRating).where(PositionRating.player_id == this_card.player_id) - ).all() + + old_pos = session.exec(select(PositionRating).where(PositionRating.player_id == this_card.player_id)).all() for position in old_pos: - logger.info(f"Deleting orphaned position rating: {position}") + logger.info(f'Deleting orphaned position rating: {position}') session.delete(position) - + session.commit() - + def cache_pos(json_data: dict) -> PositionRating: - if "id" in json_data: - del json_data["id"] + if 'id' in json_data: + del json_data['id'] valid_pos = PositionRatingBase.model_validate(json_data, from_attributes=True) db_pos = PositionRating.model_validate(valid_pos) session.add(db_pos) session.commit() session.refresh(db_pos) return db_pos - + added_count = 0 - for json_data in p_query["positions"]: - logger.info(f"Processing: {json_data}") - json_data["player_id"] = get_player_id_from_dict(json_data["player"]) + for json_data in p_query['positions']: + logger.info(f'Processing: {json_data}') + json_data['player_id'] = get_player_id_from_dict(json_data['player']) this_pos = cache_pos(json_data) session.add(this_pos) added_count += 1 - + return added_count -async def get_or_create_ai_card( - session: Session, - player: Player, - team: Team, - skip_cache: bool = False, - dev_mode: bool = False, -) -> Card: - logger.info( - f"Getting or creating card for {player.name_with_desc} on the {team.sname} / skip_cache: {skip_cache}" - ) +async def get_or_create_ai_card(session: Session, player: Player, team: Team, skip_cache: bool = False, dev_mode: bool = False) -> Card: + logger.info(f'Getting or creating card for {player.name_with_desc} on the {team.sname} / skip_cache: {skip_cache}') if not team.is_ai: - err = f"Cannot create AI cards for human teams" - logger.error(f"gameplay_models - get_or_create_ai_card: {err}") + err = f'Cannot create AI cards for human teams' + logger.error(f'gameplay_models - get_or_create_ai_card: {err}') raise TypeError(err) - - logger.info( - f"gameplay_models - get_or_create_ai_card - player.id: {player.id} / team.id: {team.id}" - ) + + logger.info(f'gameplay_models - get_or_create_ai_card - player.id: {player.id} / team.id: {team.id}') if not skip_cache: - c_query = session.exec( - select(Card).where(Card.player == player, Card.team == team) - ).all() + c_query = session.exec(select(Card).where(Card.player == player, Card.team == team)).all() if len(c_query) > 0: this_card = c_query[0] - logger.info( - f"we found a cached card: {this_card} / created: {this_card.created}" - ) + logger.info(f'we found a cached card: {this_card} / created: {this_card.created}') tdelta = datetime.datetime.now() - this_card.created - logger.debug(f"tdelta: {tdelta}") - if tdelta.total_seconds() < CACHE_LIMIT and ( - this_card.pitcherscouting is not None - or this_card.batterscouting is not None - ): - logger.info(f"returning this_card") + logger.debug(f'tdelta: {tdelta}') + if tdelta.total_seconds() < CACHE_LIMIT and (this_card.pitcherscouting is not None or this_card.batterscouting is not None): + logger.info(f'returning this_card') return this_card # else: # logger.info(f'deleting card record') @@ -899,73 +674,58 @@ async def get_or_create_ai_card( # session.commit() async def pull_card(p: Player, t: Team): - c_query = await db_get("cards", params=[("team_id", t.id), ("player_id", p.id)]) - if c_query["count"] > 0: - json_data = c_query["cards"][0] - logger.info( - f"gameplay_models - get_or_create_ai_card - pull_card - caching json_data: {json_data}" - ) - json_data["team_id"] = json_data["team"]["id"] - json_data["player_id"] = get_player_id_from_dict(json_data["player"]) - valid_card = CardBase.model_validate( - c_query["cards"][0], from_attributes=True - ) + c_query = await db_get('cards', params=[('team_id', t.id), ('player_id', p.id)]) + if c_query['count'] > 0: + json_data = c_query['cards'][0] + logger.info(f'gameplay_models - get_or_create_ai_card - pull_card - caching json_data: {json_data}') + json_data['team_id'] = json_data['team']['id'] + json_data['player_id'] = get_player_id_from_dict(json_data['player']) + valid_card = CardBase.model_validate(c_query['cards'][0], from_attributes=True) db_card = Card.model_validate(valid_card) - logger.info(f"gameplay_queries - cache_team - db_card: {db_card}") - logger.info(f"Checking for existing card ID: {db_card.id}") + logger.info(f'gameplay_queries - cache_team - db_card: {db_card}') + logger.info(f'Checking for existing card ID: {db_card.id}') try: - this_card = session.exec( - select(Card).where(Card.id == db_card.id) - ).one() - logger.info( - f"Found card: {this_card}\nUpdating with db card: {db_card}" - ) - + this_card = session.exec(select(Card).where(Card.id == db_card.id)).one() + logger.info(f'Found card: {this_card}\nUpdating with db card: {db_card}') + for key, value in db_card.model_dump(exclude_unset=True).items(): - logger.info(f"Setting key ({key}) to value ({value})") + logger.info(f'Setting key ({key}) to value ({value})') setattr(this_card, key, value) - logger.info(f"Set this_card to db_card") + logger.info(f'Set this_card to db_card') session.add(this_card) session.commit() - logger.info(f"Refreshing this_card") + logger.info(f'Refreshing this_card') session.refresh(this_card) return this_card - except NoResultFound: - logger.info(f"Card not found, adding to db") + except Exception: + logger.info(f'Card not found, adding to db') session.add(db_card) session.commit() session.refresh(db_card) return db_card else: return None - + this_card = await pull_card(player, team) if this_card is not None: - if player.pos_1 not in ["SP", "RP"]: - this_card.batterscouting = await shared_get_scouting( - session, this_card, "batter" - ) + if player.pos_1 not in ['SP', 'RP']: + this_card.batterscouting = await shared_get_scouting(session, this_card, 'batter') else: - this_card.pitcherscouting = await shared_get_scouting( - session, this_card, "pitcher" - ) - + this_card.pitcherscouting = await shared_get_scouting(session, this_card, 'pitcher') + session.add(this_card) session.commit() session.refresh(this_card) return this_card - logger.info( - f"gameplay_models - get_or_create_ai_card: creating {player.description} {player.name} card for {team.abbrev}" - ) + logger.info(f'gameplay_models - get_or_create_ai_card: creating {player.description} {player.name} card for {team.abbrev}') if dev_mode: # Find next available ID since Card model has autoincrement=False from sqlmodel import func - max_id = session.exec(select(func.max(Card.id))).one() next_id = (max_id or 0) + 1 this_card = Card(id=next_id, player=player, team=team) @@ -975,51 +735,42 @@ async def get_or_create_ai_card( return this_card await db_post( - "cards", - payload={"cards": [{"player_id": player.id, "team_id": team.id, "pack_id": 1}]}, + 'cards', + payload={'cards': [ + {'player_id': player.id, 'team_id': team.id, 'pack_id': 1} + ]} ) - + this_card = await pull_card(player, team) if this_card is not None: - if player.pos_1 not in ["SP", "RP"]: - this_card.batterscouting = await shared_get_scouting( - session, this_card, "batter" - ) + if player.pos_1 not in ['SP', 'RP']: + this_card.batterscouting = await shared_get_scouting(session, this_card, 'batter') else: - this_card.pitcherscouting = await shared_get_scouting( - session, this_card, "pitcher" - ) - + this_card.pitcherscouting = await shared_get_scouting(session, this_card, 'pitcher') + session.add(this_card) session.commit() session.refresh(this_card) return this_card - - err = f"Could not create {player.name} card for {team.abbrev}" - logger.error(f"gameplay_models - get_or_create_ai_card - {err}") + + err = f'Could not create {player.name} card for {team.abbrev}' + logger.error(f'gameplay_models - get_or_create_ai_card - {err}') raise LookupError(err) @log_errors -async def get_card_or_none( - session: Session, card_id: int, skip_cache: bool = False -) -> Card | None: - logger.info(f"Getting card {card_id} / skip_cache: {skip_cache}") +async def get_card_or_none(session: Session, card_id: int, skip_cache: bool = False) -> Card | None: + logger.info(f'Getting card {card_id} / skip_cache: {skip_cache}') if not skip_cache: this_card = session.get(Card, card_id) if this_card is not None: - logger.info( - f"we found a cached card: {this_card} / created: {this_card.created}" - ) + logger.info(f'we found a cached card: {this_card} / created: {this_card.created}') tdelta = datetime.datetime.now() - this_card.created - logger.debug(f"tdelta: {tdelta}") - if tdelta.total_seconds() < CACHE_LIMIT and ( - this_card.pitcherscouting is not None - or this_card.batterscouting is not None - ): - logger.info(f"returning this_card") + logger.debug(f'tdelta: {tdelta}') + if tdelta.total_seconds() < CACHE_LIMIT and (this_card.pitcherscouting is not None or this_card.batterscouting is not None): + logger.info(f'returning this_card') return this_card # else: # logger.info(f'deleting this_card') @@ -1027,7 +778,7 @@ async def get_card_or_none( # session.delete(this_card.batterscouting) # except Exception as e: # logger.error(f'Could not delete batter scouting: {e}') - + # try: # session.delete(this_card.pitcherscouting) # except Exception as e: @@ -1035,183 +786,120 @@ async def get_card_or_none( # session.delete(this_card) # session.commit() - + def cache_card(json_data: dict) -> Card: valid_card = CardBase.model_validate(json_data, from_attributes=True) db_card = Card.model_validate(valid_card) - logger.info(f"gameplay_queries - cache_team - db_card: {db_card}") - logger.info(f"Checking for existing card ID: {db_card.id}") + logger.info(f'gameplay_queries - cache_team - db_card: {db_card}') + logger.info(f'Checking for existing card ID: {db_card.id}') try: this_card = session.exec(select(Card).where(Card.id == db_card.id)).one() - logger.info(f"Found card: {this_card}\nUpdating with db card: {db_card}") - + logger.info(f'Found card: {this_card}\nUpdating with db card: {db_card}') + # this_team = db_team for key, value in db_card.model_dump(exclude_unset=True).items(): - logger.info(f"Setting key ({key}) to value ({value})") + logger.info(f'Setting key ({key}) to value ({value})') setattr(this_card, key, value) - logger.info(f"Set this_card to db_card") + logger.info(f'Set this_card to db_card') session.add(this_card) session.commit() - logger.info(f"Refreshing this_card") + logger.info(f'Refreshing this_card') session.refresh(this_card) return this_card - except NoResultFound: - logger.info(f"Card not found, adding to db") + except Exception: + logger.info(f'Card not found, adding to db') session.add(db_card) session.commit() session.refresh(db_card) return db_card - - c_query = await db_get("cards", object_id=card_id) + + c_query = await db_get('cards', object_id=card_id) if c_query is not None: - c_query["team_id"] = c_query["team"]["id"] - c_query["player_id"] = get_player_id_from_dict(c_query["player"]) - - this_player = await get_player_or_none(session, player_id=c_query["player_id"]) - this_team = await get_team_or_none(session, team_id=c_query["team_id"]) + c_query['team_id'] = c_query['team']['id'] + c_query['player_id'] = get_player_id_from_dict(c_query['player']) + + this_player = await get_player_or_none(session, player_id=c_query['player_id']) + this_team = await get_team_or_none(session, team_id=c_query['team_id']) if this_player is None: - raise LookupError( - f"Player ID {c_query['player_id']} not found during card check" - ) + raise LookupError(f'Player ID {c_query["player_id"]} not found during card check') if this_team is None: - raise LookupError( - f"Team ID {c_query['team_id']} not found during card check" - ) + raise LookupError(f'Team ID {c_query["team_id"]} not found during card check') - logger.info(f"Caching card ID {card_id} now") + logger.info(f'Caching card ID {card_id} now') this_card = cache_card(c_query) - - logger.info(f"Card is cached, checking for scouting") - all_pos = [ - x - for x in [ - this_player.pos_1, - this_player.pos_2, - this_player.pos_3, - this_player.pos_3, - this_player.pos_4, - this_player.pos_5, - this_player.pos_6, - this_player.pos_7, - this_player.pos_8, - ] - if x is not None - ] - logger.info(f"All positions: {all_pos}") - if "SP" in all_pos or "RP" in all_pos: - logger.info(f"Pulling pitcher scouting") - this_card.pitcherscouting = await shared_get_scouting( - session, this_card, "pitcher" - ) - if any( - item in all_pos - for item in ["DH", "C", "1B", "2B", "3B", "SS", "LF", "CF", "RF"] - ): - logger.info(f"Pulling batter scouting") - this_card.batterscouting = await shared_get_scouting( - session, this_card, "batter" - ) - - logger.info(f"Updating this_card") + + logger.info(f'Card is cached, checking for scouting') + all_pos = [x for x in [this_player.pos_1, this_player.pos_2, this_player.pos_3, this_player.pos_3, this_player.pos_4, this_player.pos_5, this_player.pos_6, this_player.pos_7, this_player.pos_8] if x is not None] + logger.info(f'All positions: {all_pos}') + if 'SP' in all_pos or 'RP' in all_pos: + logger.info(f'Pulling pitcher scouting') + this_card.pitcherscouting = await shared_get_scouting(session, this_card, 'pitcher') + if any(item in all_pos for item in ['DH', 'C', '1B', '2B', '3B', 'SS', 'LF', 'CF', 'RF']): + logger.info(f'Pulling batter scouting') + this_card.batterscouting = await shared_get_scouting(session, this_card, 'batter') + + logger.info(f'Updating this_card') session.add(this_card) session.commit() - logger.info(f"Refreshing this_card") + logger.info(f'Refreshing this_card') session.refresh(this_card) - logger.info(f"this_card: {this_card}") + logger.info(f'this_card: {this_card}') return this_card return None -def get_game_lineups( - session: Session, - this_game: Game, - specific_team: Team = None, - is_active: bool = None, -) -> list[Lineup]: - logger.info( - f"Getting lineups for game {this_game.id} / specific_team: {specific_team} / is_active: {is_active}" - ) +def get_game_lineups(session: Session, this_game: Game, specific_team: Team = None, is_active: bool = None) -> list[Lineup]: + logger.info(f'Getting lineups for game {this_game.id} / specific_team: {specific_team} / is_active: {is_active}') st = select(Lineup).where(Lineup.game == this_game) - + if specific_team is not None: st = st.where(Lineup.team == specific_team) if is_active is not None: st = st.where(Lineup.active == is_active) - + return session.exec(st).all() -def get_players_last_pa( - session: Session, lineup_member: Lineup, none_okay: bool = False -): - logger.info( - f"Getting last AB for {lineup_member.player.name_with_desc} on the {lineup_member.team.lname}" - ) - last_pa = session.exec( - select(Play) - .where(Play.game == lineup_member.game, Play.batter == lineup_member) - .order_by(Play.play_num.desc()) - .limit(1) - ).all() +def get_players_last_pa(session: Session, lineup_member: Lineup, none_okay: bool = False): + logger.info(f'Getting last AB for {lineup_member.player.name_with_desc} on the {lineup_member.team.lname}') + last_pa = session.exec(select(Play).where(Play.game == lineup_member.game, Play.batter == lineup_member).order_by(Play.play_num.desc()).limit(1)).all() if len(last_pa) == 1: return last_pa[0] else: if none_okay: return None else: - log_exception( - PlayNotFoundException, - f"No play found for {lineup_member.player.name_with_desc}'s last AB", - ) + log_exception(PlayNotFoundException, f'No play found for {lineup_member.player.name_with_desc}\'s last AB') -def get_one_lineup( - session: Session, - this_game: Game, - this_team: Team, - active: bool = True, - position: str = None, - batting_order: int = None, -) -> Lineup: - logger.info( - f"Getting one lineup / this_game: {this_game.id} / this_team: {this_team.lname} / active: {active}, position: {position}, batting_order: {batting_order}" - ) +def get_one_lineup(session: Session, this_game: Game, this_team: Team, active: bool = True, position: str = None, batting_order: int = None) -> Lineup: + logger.info(f'Getting one lineup / this_game: {this_game.id} / this_team: {this_team.lname} / active: {active}, position: {position}, batting_order: {batting_order}') if position is None and batting_order is None: - raise KeyError("Position or batting order must be provided for get_one_lineup") - - st = select(Lineup).where( - Lineup.game == this_game, Lineup.team == this_team, Lineup.active == active - ) + raise KeyError('Position or batting order must be provided for get_one_lineup') + + st = select(Lineup).where(Lineup.game == this_game, Lineup.team == this_team, Lineup.active == active) if position is not None: st = st.where(Lineup.position == position) else: st = st.where(Lineup.batting_order == batting_order) - - logger.info(f"get_one_lineup query: {st}") + + logger.info(f'get_one_lineup query: {st}') compiled = st.compile(compile_kwargs={"literal_binds": True}) - logger.info(f"get_one_lineup literal SQL: {compiled}") + logger.info(f'get_one_lineup literal SQL: {compiled}') this_lineup = session.exec(st).one() - logger.info(f"Found lineup: {this_lineup}") + logger.info(f'Found lineup: {this_lineup}') return this_lineup -def get_last_team_play( - session: Session, this_game: Game, this_team: Team, none_okay: bool = False -): - logger.info(f"Getting last play for the {this_team.lname} in game {this_game.id}") - last_play = session.exec( - select(Play) - .join(Lineup, onclause=Lineup.id == Play.batter_id) - .where(Play.game == this_game, Lineup.team == this_team) - .order_by(Play.play_num.desc()) - .limit(1) - ).all() +def get_last_team_play(session: Session, this_game: Game, this_team: Team, none_okay: bool = False): + logger.info(f'Getting last play for the {this_team.lname} in game {this_game.id}') + last_play = session.exec(select(Play).join(Lineup, onclause=Lineup.id == Play.batter_id).where(Play.game == this_game, Lineup.team == this_team).order_by(Play.play_num.desc()).limit(1)).all() if len(last_play) == 1: return last_play[0] @@ -1219,78 +907,50 @@ def get_last_team_play( if none_okay: return None else: - log_exception( - PlayNotFoundException, f"No last play found for the {this_team.sname}" - ) + log_exception(PlayNotFoundException, f'No last play found for the {this_team.sname}') -def get_sorted_lineups( - session: Session, this_game: Game, this_team: Team -) -> list[Lineup]: - logger.info( - f"Getting sorted lineups for the {this_team.lname} in game {this_game.id}" - ) - custom_order = { - "P": 1, - "C": 2, - "1B": 3, - "2B": 4, - "3B": 5, - "SS": 6, - "LF": 7, - "CF": 8, - "RF": 9, - } +def get_sorted_lineups(session: Session, this_game: Game, this_team: Team) -> list[Lineup]: + logger.info(f'Getting sorted lineups for the {this_team.lname} in game {this_game.id}') + custom_order = {'P': 1, 'C': 2, '1B': 3, '2B': 4, '3B': 5, 'SS': 6, 'LF': 7, 'CF': 8, 'RF': 9} - all_lineups = session.exec( - select(Lineup).where( - Lineup.game == this_game, Lineup.active == True, Lineup.team == this_team - ) - ).all() + all_lineups = session.exec(select(Lineup).where(Lineup.game == this_game, Lineup.active == True, Lineup.team == this_team)).all() - sorted_lineups = sorted( - all_lineups, key=lambda x: custom_order.get(x.position, float("inf")) - ) + sorted_lineups = sorted(all_lineups, key=lambda x: custom_order.get(x.position, float('inf'))) return sorted_lineups def get_db_ready_plays(session: Session, this_game: Game, db_game_id: int): - logger.info(f"Getting db ready plays for game {this_game.id}") - all_plays = session.exec( - select(Play).where(Play.game == this_game).order_by(Play.play_num.desc()) - ).all() + logger.info(f'Getting db ready plays for game {this_game.id}') + all_plays = session.exec(select(Play).where(Play.game == this_game).order_by(Play.play_num.desc())).all() - obc_list = ["000", "001", "010", "100", "011", "101", "110", "111"] + obc_list = ['000', '001', '010', '100', '011', '101', '110', '111'] return_plays = [] for play in all_plays: dump = play.model_dump() - dump["game_id"] = db_game_id - dump["on_base_code"] = obc_list[play.on_base_code] - dump["batter_id"] = play.batter.player.id - dump["pitcher_id"] = play.pitcher.player.id - dump["catcher_id"] = play.catcher.player.id - if "runner_id" in dump and dump["runner_id"] is not None: - dump["runner_id"] = play.runner.player.id - if "defender_id" in dump and dump["defender_id"] is not None: - dump["defender_id"] = play.defender.player.id - if "on_first_id" in dump and dump["on_first_id"] is not None: - dump["on_first_id"] = play.on_first.player.id - if "on_second_id" in dump and dump["on_second_id"] is not None: - dump["on_second_id"] = play.on_second.player.id - if "on_third_id" in dump and dump["on_third_id"] is not None: - dump["on_third_id"] = play.on_third.player.id + dump['game_id'] = db_game_id + dump['on_base_code'] = obc_list[play.on_base_code] + dump['batter_id'] = play.batter.player.id + dump['pitcher_id'] = play.pitcher.player.id + dump['catcher_id'] = play.catcher.player.id + if 'runner_id' in dump and dump['runner_id'] is not None: + dump['runner_id'] = play.runner.player.id + if 'defender_id' in dump and dump['defender_id'] is not None: + dump['defender_id'] = play.defender.player.id + if 'on_first_id' in dump and dump['on_first_id'] is not None: + dump['on_first_id'] = play.on_first.player.id + if 'on_second_id' in dump and dump['on_second_id'] is not None: + dump['on_second_id'] = play.on_second.player.id + if 'on_third_id' in dump and dump['on_third_id'] is not None: + dump['on_third_id'] = play.on_third.player.id return_plays.append(dump) - - return {"plays": return_plays} + + return {'plays': return_plays} -def get_db_ready_decisions( - session: Session, this_game: Game, db_game_id: int -) -> list[DecisionModel]: - logger.info( - f"Game {this_game.id} | Getting db ready decisions for game {this_game.id}" - ) +def get_db_ready_decisions(session: Session, this_game: Game, db_game_id: int) -> list[DecisionModel]: + logger.info(f'Game {this_game.id} | Getting db ready decisions for game {this_game.id}') save = None away_starter = None home_starter = None @@ -1308,219 +968,146 @@ def get_db_ready_decisions( # { : DecisionModel } } - final_inning = session.exec( - select(func.max(Play.inning_num)).where(Play.game == this_game) - ).one() - away_starter = session.exec( - select(Lineup).where( - Lineup.game == this_game, - Lineup.team == this_game.away_team, - Lineup.position == "P", - Lineup.after_play == 0, - ) - ).one() + final_inning = session.exec(select(func.max(Play.inning_num)).where(Play.game == this_game)).one() + away_starter = session.exec(select(Lineup).where(Lineup.game == this_game, Lineup.team == this_game.away_team, Lineup.position == 'P', Lineup.after_play == 0)).one() away_pitcher = away_starter last_winner = None last_loser = None # Get starting pitchers and update this as a pointer for the play crawl - for play in session.exec( - select(Play).where(Play.game == this_game).order_by(Play.play_num) - ).all(): - logger.info(f"Game {this_game.id} | Crawling play #{play.play_num}") + for play in session.exec(select(Play).where(Play.game == this_game).order_by(Play.play_num)).all(): + logger.info(f'Game {this_game.id} | Crawling play #{play.play_num}') runs_scored = 0 - if play.inning_half == "top": + if play.inning_half == 'top': if home_starter is None: - logger.info( - f"Game {this_game.id} | Setting home starter to {play.pitcher.player.name_with_desc} on play #{play.play_num}" - ) + logger.info(f'Game {this_game.id} | Setting home starter to {play.pitcher.player.name_with_desc} on play #{play.play_num}') home_starter = play.pitcher if home_finisher is None: - logger.info( - f"Game {this_game.id} | Setting home finisher to {play.pitcher.player.name_with_desc} on play #{play.play_num}" - ) + logger.info(f'Game {this_game.id} | Setting home finisher to {play.pitcher.player.name_with_desc} on play #{play.play_num}') home_finisher = play.pitcher - + if home_pitcher != play.pitcher: - logger.info( - f"Game {this_game.id} | Setting home pitcher to {play.pitcher.player.name_with_desc} on play #{play.play_num}" - ) + logger.info(f'Game {this_game.id} | Setting home pitcher to {play.pitcher.player.name_with_desc} on play #{play.play_num}') home_pitcher = play.pitcher if save == play.pitcher: if play.home_score > play.away_score: if play.pitcher not in holds: - logger.info( - f"Game {this_game.id} | Appending {play.pitcher.player.name_with_desc} to holds on play #{play.play_num}" - ) + logger.info(f'Game {this_game.id} | Appending {play.pitcher.player.name_with_desc} to holds on play #{play.play_num}') holds.append(play.pitcher) else: if play.pitcher not in b_save: - logger.info( - f"Game {this_game.id} | Appending {play.pitcher.player.name_with_desc} to blown saves on play #{play.play_num}" - ) + logger.info(f'Game {this_game.id} | Appending {play.pitcher.player.name_with_desc} to blown saves on play #{play.play_num}') b_save.append(play.pitcher) - - elif ( - play.home_score > play.away_score - and play.home_score - play.away_score <= 3 - and home_pitcher != home_starter - and play.inning_num >= final_inning - 2 - ): - logger.info( - f"Game {this_game.id} | Setting {play.pitcher.player.name_with_desc} to save on play #{play.play_num}" - ) + + elif play.home_score > play.away_score and play.home_score - play.away_score <= 3 and home_pitcher != home_starter and play.inning_num >= final_inning - 2: + logger.info(f'Game {this_game.id} | Setting {play.pitcher.player.name_with_desc} to save on play #{play.play_num}') save = home_pitcher - - elif play.inning_half == "bot": + + elif play.inning_half == 'bot': if away_finisher is None: - logger.info( - f"Game {this_game.id} | Setting away finisher to {play.pitcher.player.name_with_desc} on play #{play.play_num}" - ) + logger.info(f'Game {this_game.id} | Setting away finisher to {play.pitcher.player.name_with_desc} on play #{play.play_num}') away_finisher = play.pitcher - + if away_pitcher != play.pitcher: - logger.info( - f"Game {this_game.id} | Setting away pitcher to {play.pitcher.player.name_with_desc} on play #{play.play_num}" - ) + logger.info(f'Game {this_game.id} | Setting away pitcher to {play.pitcher.player.name_with_desc} on play #{play.play_num}') away_pitcher = play.pitcher if save == play.pitcher: if play.away_score > play.home_score: if play.pitcher not in holds: - logger.info( - f"Game {this_game.id} | Appending {play.pitcher.player.name_with_desc} to holds on play #{play.play_num}" - ) + logger.info(f'Game {this_game.id} | Appending {play.pitcher.player.name_with_desc} to holds on play #{play.play_num}') holds.append(play.pitcher) else: if play.pitcher not in b_save: - logger.info( - f"Game {this_game.id} | Appending {play.pitcher.player.name_with_desc} to blown saves on play #{play.play_num}" - ) + logger.info(f'Game {this_game.id} | Appending {play.pitcher.player.name_with_desc} to blown saves on play #{play.play_num}') b_save.append(play.pitcher) - - elif ( - play.away_score > play.home_score - and play.away_score - play.home_score <= 3 - and away_pitcher != away_starter - and play.inning_num >= final_inning - 2 - ): - logger.info( - f"Game {this_game.id} | Setting {play.pitcher.player.name_with_desc} to save on play #{play.play_num}" - ) + + elif play.away_score > play.home_score and play.away_score - play.home_score <= 3 and away_pitcher != away_starter and play.inning_num >= final_inning - 2: + logger.info(f'Game {this_game.id} | Setting {play.pitcher.player.name_with_desc} to save on play #{play.play_num}') save = away_pitcher - + if play.is_go_ahead: run_diff = play.home_score - play.away_score - for x in [ - play.on_first_final, - play.on_second_final, - play.on_third_final, - play.batter_final, - ]: + for x in [play.on_first_final, play.on_second_final, play.on_third_final, play.batter_final]: runs_scored += 1 if x == 4 else 0 - if play.inning_half == "top": + if play.inning_half == 'top': run_diff -= runs_scored else: run_diff += runs_scored - logger.info(f"run_diff for go-ahead: {run_diff}") - logger.info(f"go-ahead play: {play}") + logger.info(f'run_diff for go-ahead: {run_diff}') + logger.info(f'go-ahead play: {play}') count = 1 - for runner, dest in [ - (play.on_third, play.on_third_final), - (play.on_second, play.on_second_final), - (play.on_first, play.on_first_final), - (play.batter, play.batter_final), - ]: - logger.info( - f"Game {this_game.id} | Looking for go-ahead runner / runner, dest: {runner}, {dest} / count: {count}" - ) + for runner, dest in [(play.on_third, play.on_third_final), (play.on_second, play.on_second_final), (play.on_first, play.on_first_final), (play.batter, play.batter_final)]: + logger.info(f'Game {this_game.id} | Looking for go-ahead runner / runner, dest: {runner}, {dest} / count: {count}') if dest == 4 and count == abs(run_diff): winning_play = get_players_last_pa(session, runner) loser = winning_play.pitcher - logger.info( - f"Game {this_game.id} | Setting loser to {loser} on play #{play.play_num}" - ) - + logger.info(f'Game {this_game.id} | Setting loser to {loser} on play #{play.play_num}') + if save == loser: - logger.info( - f"Game {this_game.id} | Appending {loser} to blown saves on play #{play.play_num}" - ) + logger.info(f'Game {this_game.id} | Appending {loser} to blown saves on play #{play.play_num}') b_save.append(loser) - - winner = home_pitcher if play.inning_half == "bot" else away_pitcher - logger.info( - f"Game {this_game.id} | Setting winner to {winner} on play #{play.play_num}" - ) + + winner = home_pitcher if play.inning_half == 'bot' else away_pitcher + logger.info(f'Game {this_game.id} | Setting winner to {winner} on play #{play.play_num}') break count += 1 - + if winner is None: - winner = home_pitcher if play.inning_half == "bot" else away_pitcher - logger.info( - f"Game {this_game.id} | Setting winner to {winner} by default on play #{play.play_num}" - ) - + winner = home_pitcher if play.inning_half == 'bot' else away_pitcher + logger.info(f'Game {this_game.id} | Setting winner to {winner} by default on play #{play.play_num}') + if loser is None: - logger.info( - f"Game {this_game.id} | Setting loser to {play.pitcher} by default on play #{play.play_num}" - ) + logger.info(f'Game {this_game.id} | Setting loser to {play.pitcher} by default on play #{play.play_num}') loser = play.pitcher - + if play.is_tied and runs_scored == 0: - logger.info( - f"Game {this_game.id} | Clearing winner and loser on play #{play.play_num}" - ) + logger.info(f'Game {this_game.id} | Clearing winner and loser on play #{play.play_num}') last_winner = winner last_loser = loser winner, loser = None, None if save is not None: - logger.info( - f"Game {this_game.id} | Appending current save pitcher {save.player.name_with_desc} to blown saves and clearing save on play #{play.play_num}" - ) + logger.info(f'Game {this_game.id} | Appending current save pitcher {save.player.name_with_desc} to blown saves and clearing save on play #{play.play_num}') b_save.append(save) save = None - + if play.pitcher.player_id not in decisions: - logger.info( - f"Game {this_game.id} | Adding {play.pitcher.player.name} to decisions dict on play #{play.play_num}" - ) + logger.info(f'Game {this_game.id} | Adding {play.pitcher.player.name} to decisions dict on play #{play.play_num}') decisions[play.pitcher.player_id] = DecisionModel( game_id=db_game_id, season=this_game.season, week=0 if this_game.week is None else this_game.week, pitcher_id=play.pitcher.player_id, - pitcher_team_id=play.pitcher.team_id, + pitcher_team_id=play.pitcher.team_id ) - + # After the play loop, determine winner/loser from final game state if still None - final_play = session.exec( - select(Play).where(Play.game == this_game).order_by(Play.play_num.desc()) - ).first() + final_play = session.exec(select(Play).where(Play.game == this_game).order_by(Play.play_num.desc())).first() if winner is None: if final_play.home_score > final_play.away_score: winner = home_finisher - logger.info(f"Setting winner to home_finisher: {winner}") + logger.info(f'Setting winner to home_finisher: {winner}') else: winner = away_finisher - logger.info(f"Setting winner to away_finisher: {winner}") + logger.info(f'Setting winner to away_finisher: {winner}') if loser is None: if final_play.home_score > final_play.away_score: loser = away_finisher - logger.info(f"Setting loser to away_finisher: {loser}") + logger.info(f'Setting loser to away_finisher: {loser}') else: loser = home_finisher - logger.info(f"Setting loser to home_finisher: {loser}") - - logger.info(f"winner: {winner} / loser: {loser}") + logger.info(f'Setting loser to home_finisher: {loser}') + + logger.info(f'winner: {winner} / loser: {loser}') if winner is not None: decisions[winner.player_id].win = 1 if loser is not None: @@ -1533,453 +1120,301 @@ def get_db_ready_decisions( decisions[away_finisher.player_id].game_finished = 1 if home_finisher is not None: decisions[home_finisher.player_id].game_finished = 1 - + for lineup in holds: decisions[lineup.player_id].hold = 1 - + if save is not None: decisions[save.player_id].is_save = 1 decisions[save.player_id].hold = 0 - + for lineup in b_save: decisions[lineup.player_id].b_save = 1 decisions[lineup.player_id].is_save = 0 - + return [x.model_dump() for x in decisions.values()] -async def post_game_rewards( - session: Session, winning_team: Team, losing_team: Team, this_game: Game -): +async def post_game_rewards(session: Session, winning_team: Team, losing_team: Team, this_game: Game): wr_query = await db_get( - "gamerewards", - params=[("name", f"{'Short' if this_game.short_game else 'Full'} Game Win")], - ) + 'gamerewards', params=[('name', f'{"Short" if this_game.short_game else "Full"} Game Win')]) lr_query = await db_get( - "gamerewards", - params=[("name", f"{'Short' if this_game.short_game else 'Full'} Game Loss")], - ) - if not wr_query["count"] or not lr_query["count"]: - raise DatabaseError(f"Game rewards were not found. Leaving this game active.") - - win_reward = wr_query["gamerewards"][0] - loss_reward = lr_query["gamerewards"][0] - win_string = f"1x {win_reward['pack_type']['name']} Pack\n" + 'gamerewards', params=[('name', f'{"Short" if this_game.short_game else "Full"} Game Loss')]) + if not wr_query['count'] or not lr_query['count']: + raise DatabaseError(f'Game rewards were not found. Leaving this game active.') + + win_reward = wr_query['gamerewards'][0] + loss_reward = lr_query['gamerewards'][0] + win_string = f'1x {win_reward["pack_type"]["name"]} Pack\n' # Post Campaign Team Choice packs - if ( - this_game.ai_team is not None - and losing_team.is_ai - and "gauntlet" not in this_game.game_type - and not this_game.short_game - ): - g_query = await db_get( - "games", - params=[ - ("team1_id", winning_team.id), - ("season", this_game.season), - ("forfeit", False), - ], - ) + if this_game.ai_team is not None and losing_team.is_ai and 'gauntlet' not in this_game.game_type and not this_game.short_game: + g_query = await db_get('games', params=[('team1_id', winning_team.id), ('season', this_game.season), ('forfeit', False)]) win_points = 0 - for x in g_query["games"]: - if ( - x["away_score"] > x["home_score"] - and x["away_team"]["id"] == winning_team.id - ) or ( - x["home_score"] > x["away_score"] - and x["home_team"]["id"] == winning_team.id - ): - if x["game_type"] == "minor-league": + for x in g_query['games']: + if (x['away_score'] > x['home_score'] and x['away_team']['id'] == winning_team.id) or (x['home_score'] > x['away_score'] and x['home_team']['id'] == winning_team.id): + if x['game_type'] == 'minor-league': win_points += 1 - elif x["game_type"] in ["major-league", "flashback"]: + elif x['game_type'] in ['major-league', 'flashback']: win_points += 2 - - elif x["game_type"] == "hall-of-fame": + + elif x['game_type'] == 'hall-of-fame': win_points += 3 - - if this_game.game_type == "minor-league": + + if this_game.game_type == 'minor-league': this_game_points = 1 - elif this_game.game_type in ["major-league", "flashback"]: + elif this_game.game_type in ['major-league', 'flashback']: this_game_points = 2 else: this_game_points = 3 pre_game_points = win_points - this_game_points if math.floor(win_points / 6) > math.floor(pre_game_points / 6): - await db_post( - "packs/one", - payload={ - "team_id": winning_team.id, - "pack_type_id": 8, - "pack_team_id": losing_team.id, - }, - ) - win_string += f"1x {losing_team.abbrev} Team Choice pack\n" + await db_post('packs/one', payload={ + 'team_id': winning_team.id, + 'pack_type_id': 8, + 'pack_team_id': losing_team.id + }) + win_string += f'1x {losing_team.abbrev} Team Choice pack\n' - win_string += f"{win_reward['money']}₼\n" - loss_string = f"{loss_reward['money']}₼\n" + win_string += f'{win_reward["money"]}₼\n' + loss_string = f'{loss_reward["money"]}₼\n' - if "gauntlet" in this_game.game_type: + if 'gauntlet' in this_game.game_type: winning_abbrev = winning_team.abbrev.lower() - if "gauntlet" in winning_abbrev: - winning_team = await get_team_or_none( - session, team_abbrev=winning_abbrev.split("-")[1] - ) + if 'gauntlet' in winning_abbrev: + winning_team = await get_team_or_none(session, team_abbrev=winning_abbrev.split('-')[1]) if winning_team is None: - raise DatabaseError(f"Main team not found for {winning_abbrev}") - + raise DatabaseError(f'Main team not found for {winning_abbrev}') + losing_abbrev = losing_team.abbrev.lower() - if "gauntlet" in losing_abbrev: - losing_team = await get_team_or_none( - session, team_abbrev=losing_abbrev.split("-")[1] - ) + if 'gauntlet' in losing_abbrev: + losing_team = await get_team_or_none(session, team_abbrev=losing_abbrev.split('-')[1]) if losing_team is None: - raise DatabaseError(f"Main team not found for {losing_abbrev}") + raise DatabaseError(f'Main team not found for {losing_abbrev}') - await db_post( - "packs/one", - payload={ - "team_id": winning_team.id, - "pack_type_id": win_reward["pack_type"]["id"], - }, - ) - await db_post(f"teams/{winning_team.id}/money/{win_reward['money']}") - await db_post(f"teams/{losing_team.id}/money/{loss_reward['money']}") + await db_post('packs/one', payload={'team_id': winning_team.id, 'pack_type_id': win_reward['pack_type']['id']}) + await db_post(f'teams/{winning_team.id}/money/{win_reward["money"]}') + await db_post(f'teams/{losing_team.id}/money/{loss_reward["money"]}') return win_string, loss_string -def get_available_subs( - session: Session, this_game: Game, this_team: Team -) -> list[Card]: - logger.info(f"Getting all available subs") - team_lineups = session.exec( - select(Lineup).where(Lineup.game == this_game, Lineup.team == this_team) - ).all() +def get_available_subs(session: Session, this_game: Game, this_team: Team) -> list[Card]: + logger.info(f'Getting all available subs') + team_lineups = session.exec(select(Lineup).where(Lineup.game == this_game, Lineup.team == this_team)).all() used_card_ids = [x.card.id for x in team_lineups] - logger.info(f"USED CARD IDS: {used_card_ids}") + logger.info(f'USED CARD IDS: {used_card_ids}') - all_roster_links = session.exec( - select(RosterLink).where( - RosterLink.game == this_game, RosterLink.team == this_team - ) - ).all() + all_roster_links = session.exec(select(RosterLink).where(RosterLink.game == this_game, RosterLink.team == this_team)).all() return [x.card for x in all_roster_links if x.card_id not in used_card_ids] -def get_available_pitchers( - session: Session, - this_game: Game, - this_team: Team, - sort: Literal["starter-desc", "closer-desc"] = "closer-desc", -) -> list[Card]: - logger.info( - f"getting available pitchers for team {this_team.id} in game {this_game.id}" - ) +def get_available_pitchers(session: Session, this_game: Game, this_team: Team, sort: Literal['starter-desc', 'closer-desc'] = 'closer-desc') -> list[Card]: + logger.info(f'getting available pitchers for team {this_team.id} in game {this_game.id}') all_subs = get_available_subs(session, this_game, this_team) - logger.info(f"all_subs: {all_subs}") + logger.info(f'all_subs: {all_subs}') pitchers = [x for x in all_subs if x.pitcherscouting is not None] - logger.info(f"pitchers: {pitchers}") - + logger.info(f'pitchers: {pitchers}') + def sort_by_pow(this_card: Card): s_pow = this_card.pitcherscouting.pitchingcard.starter_rating r_pow = this_card.pitcherscouting.pitchingcard.relief_rating - c_pow = ( - this_card.pitcherscouting.pitchingcard.closer_rating - if this_card.pitcherscouting.pitchingcard.closer_rating is not None - else 0 - ) - - if sort == "starter-desc": + c_pow = this_card.pitcherscouting.pitchingcard.closer_rating if this_card.pitcherscouting.pitchingcard.closer_rating is not None else 0 + + if sort == 'starter-desc': r_val = (s_pow * 3) + r_pow else: r_val = (c_pow * 10) - (r_pow * 5) - (s_pow * 3) - + return r_val - + pitchers.sort(key=sort_by_pow, reverse=True) return pitchers -def get_available_batters( - session: Session, this_game: Game, this_team: Team -) -> list[Card]: - logger.info( - f"getting available batters for team {this_team.id} in game {this_game.id}" - ) +def get_available_batters(session: Session, this_game: Game, this_team: Team) -> list[Card]: + logger.info(f'getting available batters for team {this_team.id} in game {this_game.id}') all_subs = get_available_subs(session, this_game, this_team) - logger.info(f"all_subs: {all_subs}") + logger.info(f'all_subs: {all_subs}') batters = [x for x in all_subs if x.batterscouting is not None] - logger.info(f"batters: {batters}") + logger.info(f'batters: {batters}') return batters def get_batter_card(this_card: Card = None, this_lineup: Lineup = None) -> BattingCard: if this_card is not None: - logger.info(f"Getting batter card for {this_card.player.name}") + logger.info(f'Getting batter card for {this_card.player.name}') return this_card.batterscouting.battingcard if this_lineup is not None: - logger.info(f"Getting batter card for {this_lineup.player.name}") + logger.info(f'Getting batter card for {this_lineup.player.name}') return this_lineup.card.batterscouting.battingcard - log_exception( - KeyError, "Either a Card or Lineup must be provided to get_batter_card" - ) + log_exception(KeyError, 'Either a Card or Lineup must be provided to get_batter_card') def get_batting_statline(session: Session, this_lineup: Lineup) -> str: - logger.info( - f"Getting batting statline for {this_lineup.player.name} in Game {this_lineup.game.id}" - ) + logger.info(f'Getting batting statline for {this_lineup.player.name} in Game {this_lineup.game.id}') - at_bats = session.exec( - select(func.count(Play.id)).where( - Play.game == this_lineup.game, - Play.batter == this_lineup, - Play.ab == 1, - Play.complete == True, - ) - ).one() - hits = session.exec( - select(func.count(Play.id)).where( + at_bats = session.exec(select(func.count(Play.id)).where( + Play.game == this_lineup.game, Play.batter == this_lineup, Play.ab == 1, Play.complete == True + )).one() + hits = session.exec(select(func.count(Play.id)).where( Play.game == this_lineup.game, Play.batter == this_lineup, Play.hit == 1 - ) - ).one() + )).one() - bat_string = f"{hits}-{at_bats}" - logger.info(f"at-bat bat_string: {bat_string}") + bat_string = f'{hits}-{at_bats}' + logger.info(f'at-bat bat_string: {bat_string}') - homeruns = session.exec( - select(func.count(Play.id)).where( + homeruns = session.exec(select(func.count(Play.id)).where( Play.game == this_lineup.game, Play.batter == this_lineup, Play.homerun == 1 - ) - ).one() + )).one() if homeruns > 0: - number_string = f"{homeruns} " if homeruns > 1 else "" - bat_string += f", {number_string}HR" + number_string = f'{homeruns} ' if homeruns > 1 else "" + bat_string += f', {number_string}HR' - triples = session.exec( - select(func.count(Play.id)).where( + triples = session.exec(select(func.count(Play.id)).where( Play.game == this_lineup.game, Play.batter == this_lineup, Play.triple == 1 - ) - ).one() + )).one() if triples > 0: - number_string = f"{triples} " if triples > 1 else "" - bat_string += f", {number_string}3B" - - doubles = session.exec( - select(func.count(Play.id)).where( + number_string = f'{triples} ' if triples > 1 else "" + bat_string += f', {number_string}3B' + + doubles = session.exec(select(func.count(Play.id)).where( Play.game == this_lineup.game, Play.batter == this_lineup, Play.double == 1 - ) - ).one() + )).one() if doubles > 0: - number_string = f"{doubles} " if doubles > 1 else "" - bat_string += f", {number_string}2B" - - stolenbases = session.exec( - select(func.count(Play.id)).where( + number_string = f'{doubles} ' if doubles > 1 else "" + bat_string += f', {number_string}2B' + + stolenbases = session.exec(select(func.count(Play.id)).where( Play.game == this_lineup.game, Play.runner == this_lineup, Play.sb == 1 - ) - ).one() + )).one() if stolenbases > 0: - number_string = f"{stolenbases} " if stolenbases > 1 else "" - bat_string += f", {number_string}SB" - - walks = session.exec( - select(func.count(Play.id)).where( + number_string = f'{stolenbases} ' if stolenbases > 1 else "" + bat_string += f', {number_string}SB' + + walks = session.exec(select(func.count(Play.id)).where( Play.game == this_lineup.game, Play.batter == this_lineup, Play.bb == 1 - ) - ).one() + )).one() if walks > 0: - number_string = f"{walks} " if walks > 1 else "" - bat_string += f", {number_string}BB" - - strikeouts = session.exec( - select(func.count(Play.id)).where( + number_string = f'{walks} ' if walks > 1 else "" + bat_string += f', {number_string}BB' + + strikeouts = session.exec(select(func.count(Play.id)).where( Play.game == this_lineup.game, Play.batter == this_lineup, Play.so == 1 - ) - ).one() + )).one() if strikeouts > 0: - number_string = f"{strikeouts} " if strikeouts > 1 else "" - bat_string += f", {number_string}K" + number_string = f'{strikeouts} ' if strikeouts > 1 else "" + bat_string += f', {number_string}K' + + logger.info(f'bat_string: {bat_string}') - logger.info(f"bat_string: {bat_string}") - - if bat_string == "0-0": - return "1st AB" + if bat_string == '0-0': + return '1st AB' else: return bat_string def get_pitching_statline(session: Session, this_lineup: Lineup) -> str: - logger.info( - f"Getting pitching statline for {this_lineup.player.name} in Game {this_lineup.game.id}" - ) + logger.info(f'Getting pitching statline for {this_lineup.player.name} in Game {this_lineup.game.id}') - outs = session.exec( - select(func.sum(Play.outs)).where( - Play.game == this_lineup.game, - Play.pitcher == this_lineup, - Play.complete == True, - ) - ).one() + outs = session.exec(select(func.sum(Play.outs)).where( + Play.game == this_lineup.game, Play.pitcher == this_lineup, Play.complete == True + )).one() if outs is None: - return "***N E W P I T C H E R***" + return '***N E W P I T C H E R***' whole_innings = math.floor(outs / 3) rem_outs = outs % 3 - pit_string = f"{whole_innings}.{rem_outs} IP" - logger.info(f"IP pit_string: {pit_string}") + pit_string = f'{whole_innings}.{rem_outs} IP' + logger.info(f'IP pit_string: {pit_string}') - runs = session.exec( - select(func.count(Play.id)).where( + runs = session.exec(select(func.count(Play.id)).where( Play.game == this_lineup.game, Play.pitcher == this_lineup, Play.run == 1 - ) - ).one() + )).one() if runs > 0: - number_string = f"{runs} " if runs > 1 else "" - pit_string += f", {number_string}R" + number_string = f'{runs} ' if runs > 1 else "" + pit_string += f', {number_string}R' - e_runs = session.exec( - select(func.count(Play.id)).where( + e_runs = session.exec(select(func.count(Play.id)).where( Play.game == this_lineup.game, Play.pitcher == this_lineup, Play.e_run == 1 - ) - ).one() + )).one() if e_runs != runs: - pit_string += f" ({e_runs} ER)" + pit_string += f' ({e_runs} ER)' - hits = session.exec( - select(func.count(Play.id)).where( + hits = session.exec(select(func.count(Play.id)).where( Play.game == this_lineup.game, Play.pitcher == this_lineup, Play.hit == 1 - ) - ).one() + )).one() if hits > 0: - pit_string += f", {hits} H" + pit_string += f', {hits} H' - walks = session.exec( - select(func.count(Play.id)).where( + walks = session.exec(select(func.count(Play.id)).where( Play.game == this_lineup.game, Play.pitcher == this_lineup, Play.bb == 1 - ) - ).one() + )).one() if walks > 0: - number_string = f"{walks} " if walks > 1 else "" - pit_string += f", {number_string}BB" + number_string = f'{walks} ' if walks > 1 else "" + pit_string += f', {number_string}BB' - strikeouts = session.exec( - select(func.count(Play.id)).where( + strikeouts = session.exec(select(func.count(Play.id)).where( Play.game == this_lineup.game, Play.pitcher == this_lineup, Play.so == 1 - ) - ).one() + )).one() if strikeouts > 0: - number_string = f"{strikeouts} " if strikeouts > 1 else "" - pit_string += f", {number_string}K" + number_string = f'{strikeouts} ' if strikeouts > 1 else "" + pit_string += f', {number_string}K' return pit_string def get_game_cardset_links(session: Session, this_game: Game) -> list[GameCardsetLink]: - logger.info(f"Getting game cardset links for game: {this_game}") - cardset_links = session.exec( - select(GameCardsetLink).where(GameCardsetLink.game == this_game) - ).all() - logger.info(f"links: {cardset_links}") + logger.info(f'Getting game cardset links for game: {this_game}') + cardset_links = session.exec(select(GameCardsetLink).where(GameCardsetLink.game == this_game)).all() + logger.info(f'links: {cardset_links}') return cardset_links -def get_plays_by_pitcher( - session: Session, this_game: Game, this_lineup: Lineup, reversed: bool = False -) -> list[Play]: - logger.info( - f"Getting all pitching plays for {this_lineup.card.player.name_with_desc}" - ) +def get_plays_by_pitcher(session: Session, this_game: Game, this_lineup: Lineup, reversed: bool = False) -> list[Play]: + logger.info(f'Getting all pitching plays for {this_lineup.card.player.name_with_desc}') statement = select(Play).where(Play.game == this_game, Play.pitcher == this_lineup) if reversed: statement = statement.order_by(Play.play_num.desc()) all_plays = session.exec(statement).all() - - logger.info(f"all_plays: {all_plays}") + + logger.info(f'all_plays: {all_plays}') return all_plays -def get_pitcher_runs_by_innings( - session: Session, this_game: Game, this_pitcher: Lineup, innings: list[int] -) -> int: - logger.info( - f"Checking runs for {this_pitcher.player.name_with_desc} in innings: {innings}" - ) - runs = session.exec( - select(func.count(Play.id)).where( - Play.game == this_game, - Play.pitcher == this_pitcher, - Play.run == 1, - Play.inning_num.in_(innings), - ) - ).one() - logger.info(f"runs: {runs}") +def get_pitcher_runs_by_innings(session: Session, this_game: Game, this_pitcher: Lineup, innings: list[int]) -> int: + logger.info(f'Checking runs for {this_pitcher.player.name_with_desc} in innings: {innings}') + runs = session.exec(select(func.count(Play.id)).where( + Play.game == this_game, Play.pitcher == this_pitcher, Play.run == 1, Play.inning_num.in_(innings) + )).one() + logger.info(f'runs: {runs}') return runs + - -def reset_cache( - session: Session, players: bool = True, scouting: bool = True, team: bool = True -): +def reset_cache(session: Session, players: bool = True, scouting: bool = True, team: bool = True): if players: - logger.warning(f"Resetting created date for Players") - session.exec( - update(Player).values( - created=datetime.datetime.now() - datetime.timedelta(days=365) - ) - ) + logger.warning(f'Resetting created date for Players') + session.exec(update(Player).values(created=datetime.datetime.now() - datetime.timedelta(days=365))) if scouting: - logger.warning(f"Resetting created date for scouting objects") - session.exec( - update(BattingCard).values( - created=datetime.datetime.now() - datetime.timedelta(days=365) - ) - ) - session.exec( - update(BatterScouting).values( - created=datetime.datetime.now() - datetime.timedelta(days=365) - ) - ) - session.exec( - update(PitchingCard).values( - created=datetime.datetime.now() - datetime.timedelta(days=365) - ) - ) - session.exec( - update(PitcherScouting).values( - created=datetime.datetime.now() - datetime.timedelta(days=365) - ) - ) - session.exec( - update(PositionRating).values( - created=datetime.datetime.now() - datetime.timedelta(days=365) - ) - ) - session.exec( - update(PitchingRatings).values( - created=datetime.datetime.now() - datetime.timedelta(days=365) - ) - ) - session.exec( - update(BattingRatings).values( - created=datetime.datetime.now() - datetime.timedelta(days=365) - ) - ) + logger.warning(f'Resetting created date for scouting objects') + session.exec(update(BattingCard).values(created=datetime.datetime.now() - datetime.timedelta(days=365))) + session.exec(update(BatterScouting).values(created=datetime.datetime.now() - datetime.timedelta(days=365))) + session.exec(update(PitchingCard).values(created=datetime.datetime.now() - datetime.timedelta(days=365))) + session.exec(update(PitcherScouting).values(created=datetime.datetime.now() - datetime.timedelta(days=365))) + session.exec(update(PositionRating).values(created=datetime.datetime.now() - datetime.timedelta(days=365))) + session.exec(update(PitchingRatings).values(created=datetime.datetime.now() - datetime.timedelta(days=365))) + session.exec(update(BattingRatings).values(created=datetime.datetime.now() - datetime.timedelta(days=365))) if team: - logger.warning(f"Resetting created date for Teams") - session.exec( - update(Team).values( - created=datetime.datetime.now() - datetime.timedelta(days=365) - ) - ) - + logger.warning(f'Resetting created date for Teams') + session.exec(update(Team).values(created=datetime.datetime.now() - datetime.timedelta(days=365))) + session.commit() diff --git a/tests/test_gauntlet_recap.py b/tests/test_gauntlet_recap.py index de5daa0..64fd144 100644 --- a/tests/test_gauntlet_recap.py +++ b/tests/test_gauntlet_recap.py @@ -204,11 +204,11 @@ class TestBuildGauntletRecapEmbed: make_run(wins=10, losses=1), make_event(), make_main_team(), make_rewards() ) prize_field = next(f for f in embed.fields if f.name == "Prize Distribution") - # The 10-0 bonus line must appear without ✅ (either ❌ or ⬜) + # The 10-0 bonus line must appear with ❌ — definitively missed, not pending (⬜) lines = prize_field.value.split("\n") bonus_line = next((line for line in lines if "10-0" in line), None) assert bonus_line is not None, "Expected a '10-0' line in prizes" - assert "✅" not in bonus_line + assert "❌" in bonus_line def test_empty_rewards_list_omits_prize_field(self): """When rewards is an empty list the Prize Distribution field must be omitted.