import datetime import logging import math from typing import Literal import pydantic from sqlalchemy import func from api_calls import db_get, db_post from sqlmodel import col from in_game.gameplay_models import CACHE_LIMIT, BatterScouting, BatterScoutingBase, BattingCard, BattingCardBase, BattingRatings, BattingRatingsBase, Card, CardBase, Lineup, PitcherScouting, PitchingCard, PitchingCardBase, PitchingRatings, PitchingRatingsBase, Player, PlayerBase, PositionRating, PositionRatingBase, RosterLink, Session, Team, TeamBase, select, or_, Game, Play from exceptions import DatabaseError, PositionNotFoundException, log_exception, PlayNotFoundException logger = logging.getLogger('discord_app') class DecisionModel(pydantic.BaseModel): game_id: int season: int week: int pitcher_id: int pitcher_team_id: int win: int = 0 loss: int = 0 hold: int = 0 is_save: int = 0 is_start: bool = False b_save: int = 0 irunners: int = 0 irunners_scored: int = 0 rest_ip: float = 0 rest_required: int = 0 game_finished: int = 0 def get_batting_team(session: Session, this_play: Play) -> Team: return this_play.game.away_team if this_play.inning_half == 'top' else this_play.game.home_team def get_games_by_channel(session: Session, channel_id: int) -> list[Game]: logger.info(f'Getting games in channel {channel_id}') return session.exec(select(Game).where(Game.channel_id == channel_id, Game.active)).all() def get_channel_game_or_none(session: Session, channel_id: int) -> Game | None: logger.info(f'Getting one game from channel {channel_id}') all_games = get_games_by_channel(session, channel_id) if len(all_games) > 1: err = 'Too many games found in get_channel_game_or_none' logger.error(f'cogs.gameplay - get_channel_game_or_none - channel_id: {channel_id} / {err}') raise Exception(err) elif len(all_games) == 0: return None return all_games[0] def get_active_games_by_team(session: Session, team: Team) -> list[Game]: logger.info(f'Getting game for team {team.lname}') return session.exec(select(Game).where(Game.active, or_(Game.away_team_id == team.id, Game.home_team_id == team.id))).all() async def get_team_or_none( session: Session, team_id: int | None = None, gm_id: int | None = None, team_abbrev: str | None = None, skip_cache: bool = False) -> Team | None: logger.info(f'Getting team or none / team_id: {team_id} / gm_id: {gm_id} / team_abbrev: {team_abbrev} / skip_cache: {skip_cache}') if team_id is None and gm_id is None and team_abbrev is None: err = 'One of "team_id", "gm_id", or "team_abbrev" must be included in search' logger.error(f'gameplay_models - get_team - {err}') raise TypeError(err) if not skip_cache: if team_id is not None: this_team = session.get(Team, team_id) else: if gm_id is not None: statement = select(Team).where(Team.gmid == gm_id) else: statement = select(Team).where(func.lower(Team.abbrev) == team_abbrev.lower()) this_team = session.exec(statement).one_or_none() if this_team is not None: logger.debug(f'we found a team: {this_team} / created: {this_team.created}') tdelta = datetime.datetime.now() - this_team.created logger.debug(f'tdelta: {tdelta}') if tdelta.total_seconds() < CACHE_LIMIT: return this_team else: session.delete(this_team) session.commit() def cache_team(json_data: dict) -> Team: # logger.info(f'gameplay_models - get_team - cache_team - writing a team to cache: {json_data}') valid_team = TeamBase.model_validate(json_data, from_attributes=True) # logger.info(f'gameplay_models - get_team - cache_team - valid_team: {valid_team}') db_team = Team.model_validate(valid_team) # logger.info(f'gameplay_models - get_team - cache_team - db_team: {db_team}') session.add(db_team) session.commit() session.refresh(db_team) return db_team if team_id is not None: t_query = await db_get('teams', object_id=team_id, params=[('inc_packs', False)]) if t_query is not None: return cache_team(t_query) elif gm_id is not None: t_query = await db_get('teams', params=[('gm_id', gm_id)]) if t_query['count'] != 0: for team in [x for x in t_query['teams'] if 'gauntlet' not in x['abbrev'].lower()]: return cache_team(team) elif team_abbrev is not None: t_query = await db_get('teams', params=[('abbrev', team_abbrev)]) if t_query['count'] != 0: for team in [x for x in t_query['teams'] if 'gauntlet' not in x['abbrev'].lower()]: return cache_team(team) return None async def get_player_or_none(session: Session, player_id: int, skip_cache: bool = False) -> Player | None: logger.info(f'gameplay_models - get_player_or_none - player_id: {player_id}') if not skip_cache: this_player = session.get(Player, player_id) if this_player is not None: logger.info(f'we found a cached player: {this_player} / created: {this_player.created}') tdelta = datetime.datetime.now() - this_player.created logger.debug(f'tdelta: {tdelta}') if tdelta.total_seconds() < CACHE_LIMIT: return this_player else: session.delete(this_player) session.commit() def cache_player(json_data: dict) -> Player: logger.info(f'gameplay_models - get_player_or_none - cache_player - caching player data: {json_data}') valid_player = PlayerBase.model_validate(json_data, from_attributes=True) db_player = Player.model_validate(valid_player) session.add(db_player) session.commit() session.refresh(db_player) return db_player p_query = await db_get('players', object_id=player_id, params=[('inc_dex', False)]) if p_query is not None: if 'id' not in p_query: p_query['id'] = p_query['player_id'] if 'name' not in p_query: p_query['name'] = p_query['p_name'] return cache_player(p_query) return None async def get_batter_scouting_or_none(session: Session, card: Card, skip_cache: bool = False) -> BatterScouting | None: logger.info(f'Getting batting scouting for card ID #{card.id}: {card.player.name_with_desc}') if not skip_cache and card.batterscouting is not None: this_scouting = session.get(BatterScouting, card.batterscouting.id) if this_scouting is not None: logger.info(f'we found a cached scouting: {this_scouting} / created {this_scouting.created}') tdelta = datetime.datetime.now() - this_scouting.created logger.debug(f'tdelta: {tdelta}') if tdelta.total_seconds() < CACHE_LIMIT: return this_scouting else: session.delete(this_scouting) session.commit() def cache_scouting(batting_card: dict, ratings_vr: dict, ratings_vl: dict) -> BatterScouting: valid_bc = BattingCardBase.model_validate(batting_card, from_attributes=True) db_bc = BattingCard.model_validate(valid_bc) valid_vl = BattingRatingsBase.model_validate(ratings_vl, from_attributes=True) db_vl = BattingRatings.model_validate(valid_vl) valid_vr = BattingRatingsBase.model_validate(ratings_vr, from_attributes=True) db_vr = BattingRatings.model_validate(valid_vr) db_scouting = BatterScouting( battingcard=db_bc, ratings_vl=db_vl, ratings_vr=db_vr ) session.add(db_scouting) session.commit() session.refresh(db_scouting) return db_scouting s_query = await db_get(f'battingcardratings/player/{card.player.id}', none_okay=False) if s_query['count'] == 2: return cache_scouting( batting_card=s_query['ratings'][0]['battingcard'], ratings_vr=s_query['ratings'][0] if s_query['ratings'][0]['vs_hand'] == 'R' else s_query['ratings'][1], ratings_vl=s_query['ratings'][0] if s_query['ratings'][0]['vs_hand'] == 'L' else s_query['ratings'][1] ) return None async def get_pitcher_scouting_or_none(session: Session, card: Card, skip_cache: bool = False) -> PitcherScouting | None: logger.info(f'Getting pitching scouting for card ID #{card.id}: {card.player.name_with_desc}') if not skip_cache and card.pitcherscouting is not None: this_scouting = session.get(PitcherScouting, card.pitcherscouting.id) if this_scouting is not None: logger.info(f'we found a cached scouting: {this_scouting} / created {this_scouting.created}') tdelta = datetime.datetime.now() - this_scouting.created logger.debug(f'tdelta: {tdelta}') if tdelta.total_seconds() < CACHE_LIMIT: return this_scouting else: session.delete(this_scouting) session.commit() def cache_scouting(pitching_card: dict, ratings_vr: dict, ratings_vl: dict) -> PitcherScouting: valid_bc = PitchingCardBase.model_validate(pitching_card, from_attributes=True) db_bc = PitchingCard.model_validate(valid_bc) valid_vl = PitchingRatingsBase.model_validate(ratings_vl, from_attributes=True) db_vl = PitchingRatings.model_validate(valid_vl) valid_vr = PitchingRatingsBase.model_validate(ratings_vr, from_attributes=True) db_vr = PitchingRatings.model_validate(valid_vr) db_scouting = PitcherScouting( pitchingcard=db_bc, ratings_vl=db_vl, ratings_vr=db_vr ) session.add(db_scouting) session.commit() session.refresh(db_scouting) return db_scouting s_query = await db_get(f'pitchingcardratings/player/{card.player.id}', none_okay=False) if s_query['count'] == 2: scouting = cache_scouting( pitching_card=s_query['ratings'][0]['pitchingcard'], ratings_vr=s_query['ratings'][0] if s_query['ratings'][0]['vs_hand'] == 'R' else s_query['ratings'][1], ratings_vl=s_query['ratings'][0] if s_query['ratings'][0]['vs_hand'] == 'L' else s_query['ratings'][1] ) pos_rating = await get_position(session, card, 'P') return scouting return None # async def get_position_rating_or_none(session: Session, card: Card, position: str, skip_cache: bool = False) -> PositionRating | None: # logger.info(f'Getting position rating for card ID') # if not skip_cache: # ratings = session.exec(select(PositionRating).where(PositionRating.player == card.player, PositionRating.variant == card.variant, PositionRating.position == position).limit(1)).all() # """Test all of this; rebuild DB""" # if len(ratings) > 0: # logger.info(f'we found a cached position: {ratings[0]} / created {ratings[0].created}') # tdelta = datetime.datetime.now() - ratings[0].created # logger.debug(f'tdelta: {tdelta}') # if tdelta.total_seconds() < CACHE_LIMIT: # return ratings[0] # else: # session.delete(ratings[0]) # session.commit() # def cache_rating(json_data: dict) -> PositionRating: # valid_position = PositionRatingBase.model_validate(json_data, from_attributes=True) # db_position = PositionRating.model_validate(valid_position) # session.add(db_position) # session.commit() # session.refresh(db_position) # return db_position # p_query = await db_get('cardpositions', params=[('player_id', card.player.id)]) # if p_query['count'] > 0: # return cache_rating(p_query['positions'][0]) # return None def get_player_id_from_dict(json_data: dict) -> int: logger.info(f'Getting player from dict {json_data}') if 'player_id' in json_data: return json_data['player_id'] elif 'id' in json_data: return json_data['id'] log_exception(KeyError, 'Player ID could not be extracted from json data') def get_player_name_from_dict(json_data: dict) -> str: logger.info(f'Getting player from dict {json_data}') if 'name' in json_data: return json_data['name'] elif 'p_name' in json_data: return json_data['p_name'] log_exception(KeyError, 'Player name could not be extracted from json data') async def shared_get_scouting(session: Session, this_card: Card, which: Literal['batter', 'pitcher']): if which == 'batter': logger.info(f'Pulling batter scouting for {this_card.player.name_with_desc}') this_scouting = await get_batter_scouting_or_none(session, this_card) else: logger.info(f'Pulling pitcher scouting for {this_card.player.name_with_desc}') this_scouting = await get_pitcher_scouting_or_none(session, this_card) logger.info(f'this_scouting: {this_scouting}') return this_scouting async def get_position(session: Session, this_card: Card, position: Literal['P', 'C', '1B', '2B', '3B', 'SS', 'LF', 'CF', 'RF'], skip_cache: bool = False) -> PositionRating: logger.info(f'Pulling position rating for {this_card.player.name_with_desc} at {position}') if not skip_cache: this_pos = session.exec(select(PositionRating).where(PositionRating.player_id == this_card.player.id, PositionRating.position == position, PositionRating.variant == this_card.variant)).all() logger.info(f'Ratings found: {len(this_pos)}') if len(this_pos) > 0: logger.info(f'we found a cached position rating: {this_pos[0]} / created: {this_pos[0].created}') tdelta = datetime.datetime.now() - this_pos[0].created logger.debug(f'tdelta: {tdelta}') if tdelta.total_seconds() < CACHE_LIMIT: return this_pos[0] else: session.delete(this_pos[0]) session.commit() def cache_pos(json_data: dict) -> PositionRating: if 'id' in json_data: del json_data['id'] valid_pos = PositionRatingBase.model_validate(json_data, from_attributes=True) db_pos = PositionRating.model_validate(valid_pos) session.add(db_pos) session.commit() session.refresh(db_pos) return db_pos p_query = await db_get('cardpositions', params=[('player_id', this_card.player.id), ('position', position)]) if p_query['count'] > 0: json_data = p_query['positions'][0] json_data['player_id'] = get_player_id_from_dict(json_data['player']) this_pos = cache_pos(json_data) session.add(this_pos) session.commit() session.refresh(this_pos) return this_pos log_exception(PositionNotFoundException, f'{position} ratings not found for {this_card.player.name_with_desc}') async def get_or_create_ai_card(session: Session, player: Player, team: Team, skip_cache: bool = False, dev_mode: bool = False) -> Card: logger.info(f'Getting or creating card for {player.name_with_desc} on the {team.sname}') if not team.is_ai: err = f'Cannot create AI cards for human teams' logger.error(f'gameplay_models - get_or_create_ai_card: {err}') raise TypeError(err) logger.info(f'gameplay_models - get_or_create_ai_card - player.id: {player.id} / team.id: {team.id}') if not skip_cache: c_query = session.exec(select(Card).where(Card.player == player, Card.team == team)).all() if len(c_query) > 0: this_card = c_query[0] logger.info(f'we found a cached card: {this_card} / created: {this_card.created}') tdelta = datetime.datetime.now() - this_card.created logger.debug(f'tdelta: {tdelta}') if tdelta.total_seconds() < CACHE_LIMIT: return this_card else: session.delete(this_card) session.commit() async def pull_card(p: Player, t: Team): c_query = await db_get('cards', params=[('team_id', t.id), ('player_id', p.id)]) if c_query['count'] > 0: json_data = c_query['cards'][0] logger.info(f'gameplay_models - get_or_create_ai_card - pull_card - caching json_data: {json_data}') json_data['team_id'] = json_data['team']['id'] json_data['player_id'] = get_player_id_from_dict(json_data['player']) valid_card = CardBase.model_validate(c_query['cards'][0], from_attributes=True) db_card = Card.model_validate(valid_card) session.add(db_card) session.commit() session.refresh(db_card) return db_card else: return None this_card = await pull_card(player, team) if this_card is not None: if player.pos_1 not in ['SP', 'RP']: this_card.batterscouting = await shared_get_scouting(session, this_card, 'batter') else: this_card.pitcherscouting = await shared_get_scouting(session, this_card, 'pitcher') session.add(this_card) session.commit() session.refresh(this_card) return this_card logger.info(f'gameplay_models - get_or_create_ai_card: creating {player.description} {player.name} card for {team.abbrev}') if dev_mode: this_card = Card(player=player, team=team) session.add(this_card) session.commit() session.refresh(this_card) return this_card await db_post( 'cards', payload={'cards': [ {'player_id': player.id, 'team_id': team.id, 'pack_id': 1} ]} ) this_card = await pull_card(player, team) if this_card is not None: if player.pos_1 not in ['SP', 'RP']: this_card.batterscouting = await shared_get_scouting(session, this_card, 'batter') else: this_card.pitcherscouting = await shared_get_scouting(session, this_card, 'pitcher') session.add(this_card) session.commit() session.refresh(this_card) return this_card err = f'Could not create {player.name} card for {team.abbrev}' logger.error(f'gameplay_models - get_or_create_ai_card - {err}') raise LookupError(err) async def get_card_or_none(session: Session, card_id: int, skip_cache: bool = False) -> Card | None: logger.info(f'Getting card {card_id}') if not skip_cache: this_card = session.get(Card, card_id) if this_card is not None: logger.info(f'we found a cached card: {this_card} / created: {this_card.created}') tdelta = datetime.datetime.now() - this_card.created logger.debug(f'tdelta: {tdelta}') if tdelta.total_seconds() < CACHE_LIMIT: return this_card else: session.delete(this_card) session.commit() def cache_card(json_data: dict) -> Card: valid_card = CardBase.model_validate(json_data, from_attributes=True) db_card = Card.model_validate(valid_card) session.add(db_card) session.commit() session.refresh(db_card) return db_card c_query = await db_get('cards', object_id=card_id) if c_query is not None: c_query['team_id'] = c_query['team']['id'] c_query['player_id'] = get_player_id_from_dict(c_query['player']) this_player = await get_player_or_none(session, player_id=c_query['player_id']) this_team = await get_team_or_none(session, team_id=c_query['team_id']) if this_player is None: raise LookupError(f'Player ID {c_query["player_id"]} not found during card check') if this_team is None: raise LookupError(f'Team ID {c_query["team_id"]} not found during card check') logger.info(f'Caching card ID {card_id} now') this_card = cache_card(c_query) if this_player.pos_1 not in ['SP', 'RP']: this_card.batterscouting = await shared_get_scouting(session, this_card, 'batter') else: this_card.pitcherscouting = await shared_get_scouting(session, this_card, 'pitcher') session.add(this_card) session.commit() session.refresh(this_card) return this_card return None def get_game_lineups(session: Session, this_game: Game, specific_team: Team = None, is_active: bool = None) -> list[Lineup]: logger.info(f'Getting lineups for game {this_game.id} / specific_team: {specific_team} / is_active: {is_active}') st = select(Lineup).where(Lineup.game == this_game) if specific_team is not None: st = st.where(Lineup.team == specific_team) if is_active is not None: st = st.where(Lineup.active == is_active) return session.exec(st).all() def get_players_last_pa(session: Session, lineup_member: Lineup, none_okay: bool = False): logger.info(f'Getting last AB for {lineup_member.player.name_with_desc} on the {lineup_member.team.lname}') last_pa = session.exec(select(Play).where(Play.game == lineup_member.game, Play.batter == lineup_member).order_by(Play.id.desc()).limit(1)).all() if len(last_pa) == 1: return last_pa[0] else: if none_okay: return None else: log_exception(PlayNotFoundException, f'No play found for {lineup_member.player.name_with_desc}\'s last AB') def get_one_lineup(session: Session, this_game: Game, this_team: Team, active: bool = True, position: str = None, batting_order: int = None) -> Lineup: logger.info(f'Getting one lineup / this_game: {this_game.id} / this_team: {this_team.lname} / active: {active}, position: {position}, batting_order: {batting_order}') if position is None and batting_order is None: raise KeyError('Position or batting order must be provided for get_one_lineup') st = select(Lineup).where(Lineup.game == this_game, Lineup.team == this_team, Lineup.active == active) if position is not None: st = st.where(Lineup.position == position) else: st = st.where(Lineup.batting_order == batting_order) return session.exec(st).one() def get_last_team_play(session: Session, this_game: Game, this_team: Team, none_okay: bool = False): logger.info(f'Getting last play for the {this_team.lname} in game {this_game.id}') last_play = session.exec(select(Play).join(Lineup, onclause=Lineup.id == Play.batter_id).where(Play.game == this_game, Lineup.team == this_team).order_by(Play.id.desc()).limit(1)).all() if len(last_play) == 1: return last_play[0] else: if none_okay: return None else: log_exception(PlayNotFoundException, f'No last play found for the {this_team.sname}') def get_sorted_lineups(session: Session, this_game: Game, this_team: Team) -> list[Lineup]: logger.info(f'Getting sorted lineups for the {this_team.lname} in game {this_game.id}') custom_order = {'P': 1, 'C': 2, '1B': 3, '2B': 4, '3B': 5, 'SS': 6, 'LF': 7, 'CF': 8, 'RF': 9} all_lineups = session.exec(select(Lineup).where(Lineup.game == this_game, Lineup.active == True, Lineup.team == this_team)).all() sorted_lineups = sorted(all_lineups, key=lambda x: custom_order.get(x.position, float('inf'))) return sorted_lineups def get_db_ready_plays(session: Session, this_game: Game, db_game_id: int): logger.info(f'Getting db ready plays for game {this_game.id}') all_plays = session.exec(select(Play).where(Play.game == this_game)).all() obc_list = ['000', '001', '010', '100', '011', '101', '110', '111'] return_plays = [] for play in all_plays: dump = play.model_dump() dump['game_id'] = db_game_id dump['on_base_code'] = obc_list[play.on_base_code] dump['batter_id'] = play.batter.player.id dump['pitcher_id'] = play.pitcher.player.id dump['catcher_id'] = play.catcher.player.id if 'runner_id' in dump and dump['runner_id'] is not None: dump['runner_id'] = play.runner.player.id if 'defender_id' in dump and dump['defender_id'] is not None: dump['defender_id'] = play.defender.player.id if 'on_first_id' in dump and dump['on_first_id'] is not None: dump['on_first_id'] = play.on_first.player.id if 'on_second_id' in dump and dump['on_second_id'] is not None: dump['on_second_id'] = play.on_second.player.id if 'on_third_id' in dump and dump['on_third_id'] is not None: dump['on_third_id'] = play.on_third.player.id return_plays.append(dump) return {'plays': return_plays} def get_db_ready_decisions(session: Session, this_game: Game, db_game_id: int) -> list[DecisionModel]: logger.info(f'Game {this_game.id} | Getting db ready decisions for game {this_game.id}') winner = None loser = None save = None away_starter = None home_starter = None away_finisher = None home_finisher = None b_save = [] holds = [] away_pitcher = None home_pitcher = None decisions = { # { : DecisionModel } } final_inning = session.exec(select(func.max(Play.inning_num)).where(Play.game == this_game)).one() # Get starting pitchers and update this as a pointer for the play crawl for play in session.exec(select(Play).where(Play.game == this_game)).all(): logger.info(f'Game {this_game.id} | Crawling play #{play.play_num}') runs_scored = 0 if play.inning_half == 'top': if home_starter is None: logger.info(f'Game {this_game.id} | Setting home starter to {play.pitcher.player.name_with_desc} on play #{play.play_num}') home_starter = play.pitcher if home_finisher is None: logger.info(f'Game {this_game.id} | Setting home finisher to {play.pitcher.player.name_with_desc} on play #{play.play_num}') home_finisher = play.pitcher if home_pitcher != play.pitcher: logger.info(f'Game {this_game.id} | Setting home pitcher to {play.pitcher.player.name_with_desc} on play #{play.play_num}') home_pitcher = play.pitcher if save == play.pitcher: if play.home_score > play.away_score: if play.pitcher not in holds: logger.info(f'Game {this_game.id} | Appending {play.pitcher.player.name_with_desc} to holds on play #{play.play_num}') holds.append(play.pitcher) else: if play.pitcher not in b_save: logger.info(f'Game {this_game.id} | Appending {play.pitcher.player.name_with_desc} to blown saves on play #{play.play_num}') b_save.append(play.pitcher) elif play.home_score > play.away_score and play.home_score - play.away_score <= 3 and home_pitcher != home_starter and play.inning_num + 2 >= final_inning: logger.info(f'Game {this_game.id} | Setting {play.pitcher.player.name_with_desc} to save on play #{play.play_num}') save = home_pitcher elif play.inning_half == 'bot': if away_starter is None: logger.info(f'Game {this_game.id} | Setting away starter to {play.pitcher.player.name_with_desc} on play #{play.play_num}') away_starter = play.pitcher if away_finisher is None: logger.info(f'Game {this_game.id} | Setting away finisher to {play.pitcher.player.name_with_desc} on play #{play.play_num}') away_finisher = play.pitcher if away_pitcher != play.pitcher: logger.info(f'Game {this_game.id} | Setting away pitcher to {play.pitcher.player.name_with_desc} on play #{play.play_num}') away_pitcher = play.pitcher if save == play.pitcher: if play.away_score > play.home_score: if play.pitcher not in holds: logger.info(f'Game {this_game.id} | Appending {play.pitcher.player.name_with_desc} to holds on play #{play.play_num}') holds.append(play.pitcher) else: if play.pitcher not in b_save: logger.info(f'Game {this_game.id} | Appending {play.pitcher.player.name_with_desc} to blown saves on play #{play.play_num}') b_save.append(play.pitcher) if play.is_go_ahead: run_diff = play.home_score - play.away_score for x in [play.on_first_final, play.on_second_final, play.on_third_final, play.batter_final]: runs_scored += 1 if x == 4 else 0 if play.inning_half == 'top': run_diff -= runs_scored else: run_diff += runs_scored logger.info(f'run_diff for go-ahead: {run_diff}') logger.info(f'go-ahead play: {play}') count = 1 for runner, dest in [(play.on_third, play.on_third_final), (play.on_second, play.on_second_final), (play.on_first, play.on_first_final), (play.batter, play.batter_final)]: logger.info(f'Game {this_game.id} | Looking for go-ahead runner / runner, dest: {runner}, {dest} / count: {count}') if dest == 4 and count == abs(run_diff): winning_play = get_players_last_pa(session, runner) loser = winning_play.pitcher logger.info(f'Game {this_game.id} | Setting loser to {loser} on play #{play.play_num}') if save == loser: logger.info(f'Game {this_game.id} | Appending {loser} to blown saves on play #{play.play_num}') b_save.append(loser) winner = home_pitcher if play.inning_half == 'bot' else away_pitcher logger.info(f'Game {this_game.id} | Setting winner to {winner} on play #{play.play_num}') break count += 1 if winner is None: logger.info(f'Game {this_game.id} | Setting winner to {winner} by default on play #{play.play_num}') winner = home_pitcher if play.inning_half == 'bot' else away_pitcher if loser is None: logger.info(f'Game {this_game.id} | Setting loser to {loser} by default on play #{play.play_num}') loser = play.pitcher if play.is_tied and runs_scored == 0: logger.info(f'Game {this_game.id} | Clearing winner and loser on play #{play.play_num}') winner, loser = None, None if save is not None: logger.info(f'Game {this_game.id} | Appending current save pitcher {save.player.name_with_desc} to blown saves and clearing save on play #{play.play_num}') b_save.append(save) save = None if play.pitcher_id not in decisions: logger.info(f'Game {this_game.id} | Adding {play.pitcher.player.name} to decisions dict on play #{play.play_num}') decisions[play.pitcher.player_id] = DecisionModel( game_id=db_game_id, season=this_game.season, week=0 if this_game.week is None else this_game.week, pitcher_id=play.pitcher.player_id, pitcher_team_id=play.pitcher.team_id ) logger.info(f'winner: {winner} / loser: {loser}') decisions[winner.player_id].win = 1 decisions[loser.player_id].loss = 1 decisions[away_starter.player_id].is_start = True decisions[home_starter.player_id].is_start = True decisions[away_finisher.player_id].game_finished = 1 decisions[home_finisher.player_id].game_finished = 1 for lineup in holds: decisions[lineup.player_id].hold = 1 if save is not None: decisions[save.player_id].is_save = 1 decisions[save.player_id].hold = 0 for lineup in b_save: decisions[lineup.player_id].b_save = 1 decisions[lineup.player_id].save = 0 return [x.model_dump() for x in decisions.values()] async def post_game_rewards(session: Session, winning_team: Team, losing_team: Team, this_game: Game): wr_query = await db_get( 'gamerewards', params=[('name', f'{"Short" if this_game.short_game else "Full"} Game Win')]) lr_query = await db_get( 'gamerewards', params=[('name', f'{"Short" if this_game.short_game else "Full"} Game Loss')]) if not wr_query['count'] or not lr_query['count']: raise DatabaseError(f'Game rewards were not found. Leaving this game active.') win_reward = wr_query['gamerewards'][0] loss_reward = lr_query['gamerewards'][0] win_string = f'1x {win_reward["pack_type"]["name"]} Pack\n' # Post Campaign Team Choice packs if this_game.ai_team is not None and losing_team.is_ai and 'gauntlet' not in this_game.game_type and not this_game.short_game: g_query = await db_get('games', params=[('team1_id', winning_team.id), ('season', this_game.season), ('forfeit', False)]) win_points = 0 for x in g_query['games']: if (x['away_score'] > x['home_score'] and x['away_team']['id'] == winning_team.id) or (x['home_score'] > x['away_score'] and x['home_team']['id'] == winning_team.id): if x['game_type'] == 'minor-league': win_points += 1 elif x['game_type'] in ['major-league', 'flashback']: win_points += 2 elif x['game_type'] == 'hall-of-fame': win_points += 3 if this_game.game_type == 'minor-league': this_game_points = 1 elif this_game.game_type in ['major-league', 'flashback']: this_game_points = 2 else: this_game_points = 3 pre_game_points = win_points - this_game_points if math.floor(win_points / 6) > math.floor(pre_game_points / 6): await db_post('packs/one', payload={ 'team_id': winning_team.id, 'pack_type_id': 8, 'pack_team_id': losing_team.id }) win_string += f'1x {losing_team.abbrev} Team Choice pack\n' win_string += f'{win_reward["money"]}₼\n' loss_string = f'{loss_reward["money"]}₼\n' if 'gauntlet' in this_game.game_type: winning_abbrev = winning_team.abbrev.lower() if 'gauntlet' in winning_abbrev: winning_team = await get_team_or_none(session, team_abbrev=winning_abbrev.split('-')[1]) if winning_team is None: raise DatabaseError(f'Main team not found for {winning_abbrev}') losing_abbrev = losing_team.abbrev.lower() if 'gauntlet' in losing_abbrev: losing_team = await get_team_or_none(session, team_abbrev=losing_abbrev.split('-')[1]) if losing_team is None: raise DatabaseError(f'Main team not found for {losing_abbrev}') await db_post('packs/one', payload={'team_id': winning_team.id, 'pack_type_id': win_reward['pack_type']['id']}) await db_post(f'teams/{winning_team.id}/money/{win_reward["money"]}') await db_post(f'teams/{losing_team.id}/money/{loss_reward["money"]}') return win_string, loss_string def get_available_subs(session: Session, this_game: Game, this_team: Team) -> list[Card]: team_lineups = session.exec(select(Lineup).where(Lineup.game == this_game, Lineup.team == this_team)).all() used_card_ids = [x.card.id for x in team_lineups] all_roster_links = session.exec(select(RosterLink).where(RosterLink.game == this_game, RosterLink.team == this_team)).all() return [x.card for x in all_roster_links if x.card_id not in used_card_ids] def get_available_pitchers(session: Session, this_game: Game, this_team: Team, sort: Literal['starter-desc', 'closer-desc'] = 'closer-desc') -> list[Card]: logger.info(f'getting available pitchers for team {this_team.id} in game {this_game.id}') all_subs = get_available_subs(session, this_game, this_team) logger.info(f'all_subs: {all_subs}') pitchers = [x for x in all_subs if x.pitcherscouting is not None] logger.info(f'pitchers: {pitchers}') def sort_by_pow(this_card: Card): s_pow = this_card.pitcherscouting.pitchingcard.starter_rating r_pow = this_card.pitcherscouting.pitchingcard.relief_rating c_pow = this_card.pitcherscouting.pitchingcard.closer_rating if sort == 'starter-desc': r_val = (s_pow * 3) + r_pow else: r_val = (c_pow * 10) - (r_pow * 5) - (s_pow * 3) return r_val pitchers.sort(key=sort_by_pow, reverse=True) return pitchers