New Position exception Pull scouting data with lineups More bunt types String validation on gameplay models AI Defensive alignment
800 lines
35 KiB
Python
800 lines
35 KiB
Python
import datetime
|
|
import logging
|
|
import math
|
|
from typing import Literal
|
|
|
|
import pydantic
|
|
from sqlalchemy import func
|
|
from api_calls import db_get, db_post
|
|
from in_game.gameplay_models import CACHE_LIMIT, BatterScouting, BatterScoutingBase, BattingCard, BattingCardBase, BattingRatings, BattingRatingsBase, Card, CardBase, Lineup, PitcherScouting, PitchingCard, PitchingCardBase, PitchingRatings, PitchingRatingsBase, Player, PlayerBase, PositionRating, PositionRatingBase, Session, Team, TeamBase, select, or_, Game, Play
|
|
from exceptions import DatabaseError, PositionNotFoundException, log_exception, PlayNotFoundException
|
|
|
|
|
|
logger = logging.getLogger('discord_app')
|
|
|
|
|
|
class DecisionModel(pydantic.BaseModel):
|
|
game_id: int
|
|
season: int
|
|
week: int
|
|
pitcher_id: int
|
|
pitcher_team_id: int
|
|
win: int = 0
|
|
loss: int = 0
|
|
hold: int = 0
|
|
is_save: int = 0
|
|
is_start: bool = False
|
|
b_save: int = 0
|
|
irunners: int = 0
|
|
irunners_scored: int = 0
|
|
rest_ip: float = 0
|
|
rest_required: int = 0
|
|
game_finished: int = 0
|
|
|
|
|
|
def get_games_by_channel(session: Session, channel_id: int) -> list[Game]:
|
|
logger.info(f'Getting games in channel {channel_id}')
|
|
return session.exec(select(Game).where(Game.channel_id == channel_id, Game.active)).all()
|
|
|
|
|
|
def get_channel_game_or_none(session: Session, channel_id: int) -> Game | None:
|
|
logger.info(f'Getting one game from channel {channel_id}')
|
|
all_games = get_games_by_channel(session, channel_id)
|
|
if len(all_games) > 1:
|
|
err = 'Too many games found in get_channel_game_or_none'
|
|
logger.error(f'cogs.gameplay - get_channel_game_or_none - channel_id: {channel_id} / {err}')
|
|
raise Exception(err)
|
|
elif len(all_games) == 0:
|
|
return None
|
|
return all_games[0]
|
|
|
|
|
|
def get_active_games_by_team(session: Session, team: Team) -> list[Game]:
|
|
logger.info(f'Getting game for team {team.lname}')
|
|
return session.exec(select(Game).where(Game.active, or_(Game.away_team_id == team.id, Game.home_team_id == team.id))).all()
|
|
|
|
|
|
async def get_team_or_none(
|
|
session: Session, team_id: int | None = None, gm_id: int | None = None, team_abbrev: str | None = None, skip_cache: bool = False) -> Team | None:
|
|
logger.info(f'Getting team or none / team_id: {team_id} / gm_id: {gm_id} / team_abbrev: {team_abbrev} / skip_cache: {skip_cache}')
|
|
if team_id is None and gm_id is None and team_abbrev is None:
|
|
err = 'One of "team_id", "gm_id", or "team_abbrev" must be included in search'
|
|
logger.error(f'gameplay_models - get_team - {err}')
|
|
raise TypeError(err)
|
|
|
|
if not skip_cache:
|
|
if team_id is not None:
|
|
this_team = session.get(Team, team_id)
|
|
else:
|
|
if gm_id is not None:
|
|
statement = select(Team).where(Team.gmid == gm_id)
|
|
else:
|
|
statement = select(Team).where(func.lower(Team.abbrev) == team_abbrev.lower())
|
|
this_team = session.exec(statement).one_or_none()
|
|
|
|
if this_team is not None:
|
|
logger.debug(f'we found a team: {this_team} / created: {this_team.created}')
|
|
tdelta = datetime.datetime.now() - this_team.created
|
|
logger.debug(f'tdelta: {tdelta}')
|
|
if tdelta.total_seconds() < CACHE_LIMIT:
|
|
return this_team
|
|
else:
|
|
session.delete(this_team)
|
|
session.commit()
|
|
|
|
def cache_team(json_data: dict) -> Team:
|
|
# logger.info(f'gameplay_models - get_team - cache_team - writing a team to cache: {json_data}')
|
|
valid_team = TeamBase.model_validate(json_data, from_attributes=True)
|
|
# logger.info(f'gameplay_models - get_team - cache_team - valid_team: {valid_team}')
|
|
db_team = Team.model_validate(valid_team)
|
|
# logger.info(f'gameplay_models - get_team - cache_team - db_team: {db_team}')
|
|
session.add(db_team)
|
|
session.commit()
|
|
session.refresh(db_team)
|
|
return db_team
|
|
|
|
if team_id is not None:
|
|
t_query = await db_get('teams', object_id=team_id, params=[('inc_packs', False)])
|
|
if t_query is not None:
|
|
return cache_team(t_query)
|
|
|
|
elif gm_id is not None:
|
|
t_query = await db_get('teams', params=[('gm_id', gm_id)])
|
|
if t_query['count'] != 0:
|
|
for team in [x for x in t_query['teams'] if 'gauntlet' not in x['abbrev'].lower()]:
|
|
return cache_team(team)
|
|
|
|
elif team_abbrev is not None:
|
|
t_query = await db_get('teams', params=[('abbrev', team_abbrev)])
|
|
if t_query['count'] != 0:
|
|
for team in [x for x in t_query['teams'] if 'gauntlet' not in x['abbrev'].lower()]:
|
|
return cache_team(team)
|
|
|
|
return None
|
|
|
|
|
|
async def get_player_or_none(session: Session, player_id: int, skip_cache: bool = False) -> Player | None:
|
|
logger.info(f'gameplay_models - get_player_or_none - player_id: {player_id}')
|
|
if not skip_cache:
|
|
this_player = session.get(Player, player_id)
|
|
|
|
if this_player is not None:
|
|
logger.info(f'we found a cached player: {this_player} / created: {this_player.created}')
|
|
tdelta = datetime.datetime.now() - this_player.created
|
|
logger.debug(f'tdelta: {tdelta}')
|
|
if tdelta.total_seconds() < CACHE_LIMIT:
|
|
return this_player
|
|
else:
|
|
session.delete(this_player)
|
|
session.commit()
|
|
|
|
def cache_player(json_data: dict) -> Player:
|
|
logger.info(f'gameplay_models - get_player_or_none - cache_player - caching player data: {json_data}')
|
|
valid_player = PlayerBase.model_validate(json_data, from_attributes=True)
|
|
db_player = Player.model_validate(valid_player)
|
|
session.add(db_player)
|
|
session.commit()
|
|
session.refresh(db_player)
|
|
return db_player
|
|
|
|
p_query = await db_get('players', object_id=player_id, params=[('inc_dex', False)])
|
|
if p_query is not None:
|
|
if 'id' not in p_query:
|
|
p_query['id'] = p_query['player_id']
|
|
if 'name' not in p_query:
|
|
p_query['name'] = p_query['p_name']
|
|
return cache_player(p_query)
|
|
|
|
return None
|
|
|
|
|
|
async def get_batter_scouting_or_none(session: Session, card: Card, skip_cache: bool = False) -> BatterScouting | None:
|
|
logger.info(f'Getting batting scouting for card ID #{card.id}: {card.player.name_with_desc}')
|
|
if not skip_cache and card.batterscouting is not None:
|
|
this_scouting = session.get(BatterScouting, card.batterscouting.id)
|
|
|
|
if this_scouting is not None:
|
|
logger.info(f'we found a cached scouting: {this_scouting} / created {this_scouting.created}')
|
|
tdelta = datetime.datetime.now() - this_scouting.created
|
|
logger.debug(f'tdelta: {tdelta}')
|
|
if tdelta.total_seconds() < CACHE_LIMIT:
|
|
return this_scouting
|
|
else:
|
|
session.delete(this_scouting)
|
|
session.commit()
|
|
|
|
def cache_scouting(batting_card: dict, ratings_vr: dict, ratings_vl: dict) -> BatterScouting:
|
|
valid_bc = BattingCardBase.model_validate(batting_card, from_attributes=True)
|
|
db_bc = BattingCard.model_validate(valid_bc)
|
|
|
|
valid_vl = BattingRatingsBase.model_validate(ratings_vl, from_attributes=True)
|
|
db_vl = BattingRatings.model_validate(valid_vl)
|
|
|
|
valid_vr = BattingRatingsBase.model_validate(ratings_vr, from_attributes=True)
|
|
db_vr = BattingRatings.model_validate(valid_vr)
|
|
|
|
db_scouting = BatterScouting(
|
|
battingcard=db_bc,
|
|
ratings_vl=db_vl,
|
|
ratings_vr=db_vr
|
|
)
|
|
|
|
session.add(db_scouting)
|
|
session.commit()
|
|
session.refresh(db_scouting)
|
|
return db_scouting
|
|
|
|
s_query = await db_get(f'battingcardratings/player/{card.player.id}', none_okay=False)
|
|
if s_query['count'] == 2:
|
|
return cache_scouting(
|
|
batting_card=s_query['ratings'][0]['battingcard'],
|
|
ratings_vr=s_query['ratings'][0] if s_query['ratings'][0]['vs_hand'] == 'R' else s_query['ratings'][1],
|
|
ratings_vl=s_query['ratings'][0] if s_query['ratings'][0]['vs_hand'] == 'L' else s_query['ratings'][1]
|
|
)
|
|
|
|
return None
|
|
|
|
|
|
async def get_pitcher_scouting_or_none(session: Session, card: Card, skip_cache: bool = False) -> PitcherScouting | None:
|
|
logger.info(f'Getting pitching scouting for card ID #{card.id}: {card.player.name_with_desc}')
|
|
if not skip_cache and card.pitcherscouting is not None:
|
|
this_scouting = session.get(PitcherScouting, card.pitcherscouting.id)
|
|
|
|
if this_scouting is not None:
|
|
logger.info(f'we found a cached scouting: {this_scouting} / created {this_scouting.created}')
|
|
tdelta = datetime.datetime.now() - this_scouting.created
|
|
logger.debug(f'tdelta: {tdelta}')
|
|
if tdelta.total_seconds() < CACHE_LIMIT:
|
|
return this_scouting
|
|
else:
|
|
session.delete(this_scouting)
|
|
session.commit()
|
|
|
|
def cache_scouting(pitching_card: dict, ratings_vr: dict, ratings_vl: dict) -> PitcherScouting:
|
|
valid_bc = PitchingCardBase.model_validate(pitching_card, from_attributes=True)
|
|
db_bc = PitchingCard.model_validate(valid_bc)
|
|
|
|
valid_vl = PitchingRatingsBase.model_validate(ratings_vl, from_attributes=True)
|
|
db_vl = PitchingRatings.model_validate(valid_vl)
|
|
|
|
valid_vr = PitchingRatingsBase.model_validate(ratings_vr, from_attributes=True)
|
|
db_vr = PitchingRatings.model_validate(valid_vr)
|
|
|
|
db_scouting = PitcherScouting(
|
|
pitchingcard=db_bc,
|
|
ratings_vl=db_vl,
|
|
ratings_vr=db_vr
|
|
)
|
|
|
|
session.add(db_scouting)
|
|
session.commit()
|
|
session.refresh(db_scouting)
|
|
return db_scouting
|
|
|
|
s_query = await db_get(f'pitchingcardratings/player/{card.player.id}', none_okay=False)
|
|
if s_query['count'] == 2:
|
|
scouting = cache_scouting(
|
|
pitching_card=s_query['ratings'][0]['pitchingcard'],
|
|
ratings_vr=s_query['ratings'][0] if s_query['ratings'][0]['vs_hand'] == 'R' else s_query['ratings'][1],
|
|
ratings_vl=s_query['ratings'][0] if s_query['ratings'][0]['vs_hand'] == 'L' else s_query['ratings'][1]
|
|
)
|
|
pos_rating = await get_and_cache_position(session, card, 'P')
|
|
return scouting
|
|
|
|
return None
|
|
|
|
|
|
# async def get_position_rating_or_none(session: Session, card: Card, position: str, skip_cache: bool = False) -> PositionRating | None:
|
|
# logger.info(f'Getting position rating for card ID')
|
|
# if not skip_cache:
|
|
# ratings = session.exec(select(PositionRating).where(PositionRating.player == card.player, PositionRating.variant == card.variant, PositionRating.position == position).limit(1)).all()
|
|
|
|
# """Test all of this; rebuild DB"""
|
|
|
|
# if len(ratings) > 0:
|
|
# logger.info(f'we found a cached position: {ratings[0]} / created {ratings[0].created}')
|
|
# tdelta = datetime.datetime.now() - ratings[0].created
|
|
# logger.debug(f'tdelta: {tdelta}')
|
|
# if tdelta.total_seconds() < CACHE_LIMIT:
|
|
# return ratings[0]
|
|
# else:
|
|
# session.delete(ratings[0])
|
|
# session.commit()
|
|
|
|
# def cache_rating(json_data: dict) -> PositionRating:
|
|
# valid_position = PositionRatingBase.model_validate(json_data, from_attributes=True)
|
|
# db_position = PositionRating.model_validate(valid_position)
|
|
# session.add(db_position)
|
|
# session.commit()
|
|
# session.refresh(db_position)
|
|
# return db_position
|
|
|
|
# p_query = await db_get('cardpositions', params=[('player_id', card.player.id)])
|
|
# if p_query['count'] > 0:
|
|
# return cache_rating(p_query['positions'][0])
|
|
|
|
# return None
|
|
|
|
|
|
def get_player_id_from_dict(json_data: dict) -> int:
|
|
logger.info(f'Getting player from dict {json_data}')
|
|
if 'player_id' in json_data:
|
|
return json_data['player_id']
|
|
elif 'id' in json_data:
|
|
return json_data['id']
|
|
log_exception(KeyError, 'Player ID could not be extracted from json data')
|
|
|
|
|
|
def get_player_name_from_dict(json_data: dict) -> str:
|
|
logger.info(f'Getting player from dict {json_data}')
|
|
if 'name' in json_data:
|
|
return json_data['name']
|
|
elif 'p_name' in json_data:
|
|
return json_data['p_name']
|
|
log_exception(KeyError, 'Player name could not be extracted from json data')
|
|
|
|
|
|
async def shared_get_scouting(session: Session, this_card: Card, which: Literal['batter', 'pitcher']):
|
|
if which == 'batter':
|
|
logger.info(f'Pulling batter scouting for {this_card.player.name_with_desc}')
|
|
this_scouting = await get_batter_scouting_or_none(session, this_card)
|
|
else:
|
|
logger.info(f'Pulling pitcher scouting for {this_card.player.name_with_desc}')
|
|
this_scouting = await get_pitcher_scouting_or_none(session, this_card)
|
|
|
|
logger.info(f'this_scouting: {this_scouting}')
|
|
return this_scouting
|
|
|
|
|
|
async def get_and_cache_position(session: Session, this_card: Card, position: Literal['P', 'C', '1B', '2B', '3B', 'SS', 'LF', 'CF', 'RF'], skip_cache: bool = False):
|
|
logger.info(f'Pulling position rating for {this_card.player.name_with_desc} at {position}')
|
|
if not skip_cache:
|
|
this_pos = session.exec(select(PositionRating).where(PositionRating.player_id == this_card.player.id, PositionRating.position == position, PositionRating.variant == this_card.variant)).all()
|
|
logger.info(f'Ratings found: {len(this_pos)}')
|
|
|
|
if len(this_pos) > 0:
|
|
logger.info(f'we found a cached position rating: {this_pos[0]} / created: {this_pos[0].created}')
|
|
tdelta = datetime.datetime.now() - this_pos[0].created
|
|
logger.debug(f'tdelta: {tdelta}')
|
|
if tdelta.total_seconds() < CACHE_LIMIT:
|
|
return this_pos[0]
|
|
else:
|
|
session.delete(this_pos[0])
|
|
session.commit()
|
|
|
|
def cache_pos(json_data: dict) -> PositionRating:
|
|
if 'id' in json_data:
|
|
del json_data['id']
|
|
valid_pos = PositionRatingBase.model_validate(json_data, from_attributes=True)
|
|
db_pos = PositionRating.model_validate(valid_pos)
|
|
session.add(db_pos)
|
|
session.commit()
|
|
session.refresh(db_pos)
|
|
return db_pos
|
|
|
|
p_query = await db_get('cardpositions', params=[('player_id', this_card.player.id), ('position', position)])
|
|
if p_query['count'] > 0:
|
|
json_data = p_query['positions'][0]
|
|
json_data['player_id'] = get_player_id_from_dict(json_data['player'])
|
|
this_pos = cache_pos(json_data)
|
|
|
|
session.add(this_pos)
|
|
session.commit()
|
|
session.refresh(this_pos)
|
|
|
|
return this_card
|
|
|
|
log_exception(PositionNotFoundException, f'{position} ratings not found for {this_card.player.name_with_desc}')
|
|
|
|
|
|
|
|
async def get_or_create_ai_card(session: Session, player: Player, team: Team, skip_cache: bool = False, dev_mode: bool = False) -> Card:
|
|
logger.info(f'Getting or creating card for {player.name_with_desc} on the {team.sname}')
|
|
if not team.is_ai:
|
|
err = f'Cannot create AI cards for human teams'
|
|
logger.error(f'gameplay_models - get_or_create_ai_card: {err}')
|
|
raise TypeError(err)
|
|
|
|
logger.info(f'gameplay_models - get_or_create_ai_card - player.id: {player.id} / team.id: {team.id}')
|
|
if not skip_cache:
|
|
c_query = session.exec(select(Card).where(Card.player == player, Card.team == team)).all()
|
|
|
|
if len(c_query) > 0:
|
|
this_card = c_query[0]
|
|
logger.info(f'we found a cached card: {this_card} / created: {this_card.created}')
|
|
tdelta = datetime.datetime.now() - this_card.created
|
|
logger.debug(f'tdelta: {tdelta}')
|
|
if tdelta.total_seconds() < CACHE_LIMIT:
|
|
return this_card
|
|
else:
|
|
session.delete(this_card)
|
|
session.commit()
|
|
|
|
async def pull_card(p: Player, t: Team):
|
|
c_query = await db_get('cards', params=[('team_id', t.id), ('player_id', p.id)])
|
|
if c_query['count'] > 0:
|
|
json_data = c_query['cards'][0]
|
|
logger.info(f'gameplay_models - get_or_create_ai_card - pull_card - caching json_data: {json_data}')
|
|
json_data['team_id'] = json_data['team']['id']
|
|
json_data['player_id'] = get_player_id_from_dict(json_data['player'])
|
|
valid_card = CardBase.model_validate(c_query['cards'][0], from_attributes=True)
|
|
db_card = Card.model_validate(valid_card)
|
|
session.add(db_card)
|
|
session.commit()
|
|
session.refresh(db_card)
|
|
return db_card
|
|
else:
|
|
return None
|
|
|
|
this_card = await pull_card(player, team)
|
|
if this_card is not None:
|
|
if player.pos_1 not in ['SP', 'RP']:
|
|
this_card.batterscouting = await shared_get_scouting(session, this_card, 'batter')
|
|
else:
|
|
this_card.pitcherscouting = await shared_get_scouting(session, this_card, 'pitcher')
|
|
|
|
session.add(this_card)
|
|
session.commit()
|
|
session.refresh(this_card)
|
|
|
|
return this_card
|
|
|
|
logger.info(f'gameplay_models - get_or_create_ai_card: creating {player.description} {player.name} card for {team.abbrev}')
|
|
|
|
if dev_mode:
|
|
this_card = Card(player=player, team=team)
|
|
session.add(this_card)
|
|
session.commit()
|
|
session.refresh(this_card)
|
|
return this_card
|
|
|
|
await db_post(
|
|
'cards',
|
|
payload={'cards': [
|
|
{'player_id': player.id, 'team_id': team.id, 'pack_id': 1}
|
|
]}
|
|
)
|
|
|
|
this_card = await pull_card(player, team)
|
|
if this_card is not None:
|
|
if player.pos_1 not in ['SP', 'RP']:
|
|
this_card.batterscouting = await shared_get_scouting(session, this_card, 'batter')
|
|
else:
|
|
this_card.pitcherscouting = await shared_get_scouting(session, this_card, 'pitcher')
|
|
|
|
session.add(this_card)
|
|
session.commit()
|
|
session.refresh(this_card)
|
|
|
|
return this_card
|
|
|
|
err = f'Could not create {player.name} card for {team.abbrev}'
|
|
logger.error(f'gameplay_models - get_or_create_ai_card - {err}')
|
|
raise LookupError(err)
|
|
|
|
|
|
async def get_card_or_none(session: Session, card_id: int, skip_cache: bool = False) -> Card | None:
|
|
logger.info(f'Getting card {card_id}')
|
|
if not skip_cache:
|
|
this_card = session.get(Card, card_id)
|
|
|
|
if this_card is not None:
|
|
logger.info(f'we found a cached card: {this_card} / created: {this_card.created}')
|
|
tdelta = datetime.datetime.now() - this_card.created
|
|
logger.debug(f'tdelta: {tdelta}')
|
|
if tdelta.total_seconds() < CACHE_LIMIT:
|
|
return this_card
|
|
else:
|
|
session.delete(this_card)
|
|
session.commit()
|
|
|
|
def cache_card(json_data: dict) -> Card:
|
|
valid_card = CardBase.model_validate(json_data, from_attributes=True)
|
|
db_card = Card.model_validate(valid_card)
|
|
session.add(db_card)
|
|
session.commit()
|
|
session.refresh(db_card)
|
|
return db_card
|
|
|
|
c_query = await db_get('cards', object_id=card_id)
|
|
if c_query is not None:
|
|
c_query['team_id'] = c_query['team']['id']
|
|
c_query['player_id'] = get_player_id_from_dict(c_query['player'])
|
|
|
|
this_player = await get_player_or_none(session, player_id=c_query['player_id'])
|
|
this_team = await get_team_or_none(session, team_id=c_query['team_id'])
|
|
|
|
if this_player is None:
|
|
raise LookupError(f'Player ID {c_query["player_id"]} not found during card check')
|
|
if this_team is None:
|
|
raise LookupError(f'Team ID {c_query["team_id"]} not found during card check')
|
|
|
|
logger.info(f'Caching card ID {card_id} now')
|
|
this_card = cache_card(c_query)
|
|
|
|
if this_player.pos_1 not in ['SP', 'RP']:
|
|
this_card.batterscouting = await shared_get_scouting(session, this_card, 'batter')
|
|
else:
|
|
this_card.pitcherscouting = await shared_get_scouting(session, this_card, 'pitcher')
|
|
|
|
session.add(this_card)
|
|
session.commit()
|
|
session.refresh(this_card)
|
|
|
|
return this_card
|
|
|
|
return None
|
|
|
|
|
|
def get_game_lineups(session: Session, this_game: Game, specific_team: Team = None, is_active: bool = None) -> list[Lineup]:
|
|
logger.info(f'Getting lineups for game {this_game.id} / specific_team: {specific_team} / is_active: {is_active}')
|
|
st = select(Lineup).where(Lineup.game == this_game)
|
|
|
|
if specific_team is not None:
|
|
st = st.where(Lineup.team == specific_team)
|
|
if is_active is not None:
|
|
st = st.where(Lineup.active == is_active)
|
|
|
|
return session.exec(st).all()
|
|
|
|
|
|
def get_players_last_pa(session: Session, lineup_member: Lineup, none_okay: bool = False):
|
|
logger.info(f'Getting last AB for {lineup_member.player.name_with_desc} on the {lineup_member.team.lname}')
|
|
last_pa = session.exec(select(Play).where(Play.game == lineup_member.game, Play.batter == lineup_member).order_by(Play.id.desc()).limit(1)).all()
|
|
if len(last_pa) == 1:
|
|
return last_pa[0]
|
|
else:
|
|
if none_okay:
|
|
return None
|
|
else:
|
|
log_exception(PlayNotFoundException, f'No play found for {lineup_member.player.name_with_desc}\'s last AB')
|
|
|
|
|
|
def get_one_lineup(session: Session, this_game: Game, this_team: Team, active: bool = True, position: str = None, batting_order: int = None) -> Lineup:
|
|
logger.info(f'Getting one lineup / this_game: {this_game.id} / this_team: {this_team.lname} / active: {active}, position: {position}, batting_order: {batting_order}')
|
|
if position is None and batting_order is None:
|
|
raise KeyError('Position or batting order must be provided for get_one_lineup')
|
|
|
|
st = select(Lineup).where(Lineup.game == this_game, Lineup.team == this_team, Lineup.active == active)
|
|
if position is not None:
|
|
st = st.where(Lineup.position == position)
|
|
else:
|
|
st = st.where(Lineup.batting_order == batting_order)
|
|
|
|
return session.exec(st).one()
|
|
|
|
|
|
def get_last_team_play(session: Session, this_game: Game, this_team: Team, none_okay: bool = False):
|
|
logger.info(f'Getting last play for the {this_team.lname} in game {this_game.id}')
|
|
last_play = session.exec(select(Play).join(Lineup, onclause=Lineup.id == Play.batter_id).where(Play.game == this_game, Lineup.team == this_team).order_by(Play.id.desc()).limit(1)).all()
|
|
|
|
if len(last_play) == 1:
|
|
return last_play[0]
|
|
else:
|
|
if none_okay:
|
|
return None
|
|
else:
|
|
log_exception(PlayNotFoundException, f'No last play found for the {this_team.sname}')
|
|
|
|
|
|
def get_sorted_lineups(session: Session, this_game: Game, this_team: Team) -> list[Lineup]:
|
|
logger.info(f'Getting sorted lineups for the {this_team.lname} in game {this_game.id}')
|
|
custom_order = {'P': 1, 'C': 2, '1B': 3, '2B': 4, '3B': 5, 'SS': 6, 'LF': 7, 'CF': 8, 'RF': 9}
|
|
|
|
all_lineups = session.exec(select(Lineup).where(Lineup.game == this_game, Lineup.active == True, Lineup.team == this_team)).all()
|
|
|
|
sorted_lineups = sorted(all_lineups, key=lambda x: custom_order.get(x.position, float('inf')))
|
|
return sorted_lineups
|
|
|
|
|
|
def get_db_ready_plays(session: Session, this_game: Game, db_game_id: int):
|
|
logger.info(f'Getting db ready plays for game {this_game.id}')
|
|
all_plays = session.exec(select(Play).where(Play.game == this_game)).all()
|
|
|
|
obc_list = ['000', '001', '010', '100', '011', '101', '110', '111']
|
|
|
|
return_plays = []
|
|
for play in all_plays:
|
|
dump = play.model_dump()
|
|
dump['game_id'] = db_game_id
|
|
dump['on_base_code'] = obc_list[play.on_base_code]
|
|
dump['batter_id'] = play.batter.player.id
|
|
dump['pitcher_id'] = play.pitcher.player.id
|
|
dump['catcher_id'] = play.catcher.player.id
|
|
if 'runner_id' in dump and dump['runner_id'] is not None:
|
|
dump['runner_id'] = play.runner.player.id
|
|
if 'defender_id' in dump and dump['defender_id'] is not None:
|
|
dump['defender_id'] = play.defender.player.id
|
|
if 'on_first_id' in dump and dump['on_first_id'] is not None:
|
|
dump['on_first_id'] = play.on_first.player.id
|
|
if 'on_second_id' in dump and dump['on_second_id'] is not None:
|
|
dump['on_second_id'] = play.on_second.player.id
|
|
if 'on_third_id' in dump and dump['on_third_id'] is not None:
|
|
dump['on_third_id'] = play.on_third.player.id
|
|
return_plays.append(dump)
|
|
|
|
return {'plays': return_plays}
|
|
|
|
|
|
def get_db_ready_decisions(session: Session, this_game: Game, db_game_id: int) -> list[DecisionModel]:
|
|
logger.info(f'Game {this_game.id} | Getting db ready decisions for game {this_game.id}')
|
|
winner = None
|
|
loser = None
|
|
save = None
|
|
away_starter = None
|
|
home_starter = None
|
|
away_finisher = None
|
|
home_finisher = None
|
|
b_save = []
|
|
holds = []
|
|
|
|
away_pitcher = None
|
|
home_pitcher = None
|
|
|
|
decisions = {
|
|
# { <player_id>: DecisionModel }
|
|
}
|
|
|
|
final_inning = session.exec(select(func.max(Play.inning_num)).where(Play.game == this_game)).one()
|
|
|
|
# Get starting pitchers and update this as a pointer for the play crawl
|
|
for play in session.exec(select(Play).where(Play.game == this_game)).all():
|
|
logger.info(f'Game {this_game.id} | Crawling play #{play.play_num}')
|
|
runs_scored = 0
|
|
if play.inning_half == 'top':
|
|
if home_starter is None:
|
|
logger.info(f'Game {this_game.id} | Setting home starter to {play.pitcher.player.name_with_desc} on play #{play.play_num}')
|
|
home_starter = play.pitcher
|
|
|
|
if home_finisher is None:
|
|
logger.info(f'Game {this_game.id} | Setting home finisher to {play.pitcher.player.name_with_desc} on play #{play.play_num}')
|
|
home_finisher = play.pitcher
|
|
|
|
if home_pitcher != play.pitcher:
|
|
logger.info(f'Game {this_game.id} | Setting home pitcher to {play.pitcher.player.name_with_desc} on play #{play.play_num}')
|
|
home_pitcher = play.pitcher
|
|
|
|
if save == play.pitcher:
|
|
if play.home_score > play.away_score:
|
|
if play.pitcher not in holds:
|
|
logger.info(f'Game {this_game.id} | Appending {play.pitcher.player.name_with_desc} to holds on play #{play.play_num}')
|
|
holds.append(play.pitcher)
|
|
else:
|
|
if play.pitcher not in b_save:
|
|
logger.info(f'Game {this_game.id} | Appending {play.pitcher.player.name_with_desc} to blown saves on play #{play.play_num}')
|
|
b_save.append(play.pitcher)
|
|
|
|
elif play.home_score > play.away_score and play.home_score - play.away_score <= 3 and home_pitcher != home_starter and play.inning_num + 2 >= final_inning:
|
|
logger.info(f'Game {this_game.id} | Setting {play.pitcher.player.name_with_desc} to save on play #{play.play_num}')
|
|
save = home_pitcher
|
|
|
|
elif play.inning_half == 'bot':
|
|
if away_starter is None:
|
|
logger.info(f'Game {this_game.id} | Setting away starter to {play.pitcher.player.name_with_desc} on play #{play.play_num}')
|
|
away_starter = play.pitcher
|
|
|
|
if away_finisher is None:
|
|
logger.info(f'Game {this_game.id} | Setting away finisher to {play.pitcher.player.name_with_desc} on play #{play.play_num}')
|
|
away_finisher = play.pitcher
|
|
|
|
if away_pitcher != play.pitcher:
|
|
logger.info(f'Game {this_game.id} | Setting away pitcher to {play.pitcher.player.name_with_desc} on play #{play.play_num}')
|
|
away_pitcher = play.pitcher
|
|
|
|
if save == play.pitcher:
|
|
if play.away_score > play.home_score:
|
|
if play.pitcher not in holds:
|
|
logger.info(f'Game {this_game.id} | Appending {play.pitcher.player.name_with_desc} to holds on play #{play.play_num}')
|
|
holds.append(play.pitcher)
|
|
else:
|
|
if play.pitcher not in b_save:
|
|
logger.info(f'Game {this_game.id} | Appending {play.pitcher.player.name_with_desc} to blown saves on play #{play.play_num}')
|
|
b_save.append(play.pitcher)
|
|
|
|
if play.is_go_ahead:
|
|
run_diff = play.home_score - play.away_score
|
|
for x in [play.on_first_final, play.on_second_final, play.on_third_final, play.batter_final]:
|
|
runs_scored += 1 if x == 4 else 0
|
|
|
|
if play.inning_half == 'top':
|
|
run_diff -= runs_scored
|
|
else:
|
|
run_diff += runs_scored
|
|
|
|
logger.info(f'run_diff for go-ahead: {run_diff}')
|
|
logger.info(f'go-ahead play: {play}')
|
|
count = 1
|
|
|
|
for runner, dest in [(play.on_third, play.on_third_final), (play.on_second, play.on_second_final), (play.on_first, play.on_first_final), (play.batter, play.batter_final)]:
|
|
logger.info(f'Game {this_game.id} | Looking for go-ahead runner / runner, dest: {runner}, {dest} / count: {count}')
|
|
if dest == 4 and count == abs(run_diff):
|
|
winning_play = get_players_last_pa(session, runner)
|
|
|
|
loser = winning_play.pitcher
|
|
logger.info(f'Game {this_game.id} | Setting loser to {loser} on play #{play.play_num}')
|
|
|
|
if save == loser:
|
|
logger.info(f'Game {this_game.id} | Appending {loser} to blown saves on play #{play.play_num}')
|
|
b_save.append(loser)
|
|
|
|
winner = home_pitcher if play.inning_half == 'bot' else away_pitcher
|
|
logger.info(f'Game {this_game.id} | Setting winner to {winner} on play #{play.play_num}')
|
|
break
|
|
|
|
count += 1
|
|
|
|
if winner is None:
|
|
logger.info(f'Game {this_game.id} | Setting winner to {winner} by default on play #{play.play_num}')
|
|
winner = home_pitcher if play.inning_half == 'bot' else away_pitcher
|
|
|
|
if loser is None:
|
|
logger.info(f'Game {this_game.id} | Setting loser to {loser} by default on play #{play.play_num}')
|
|
loser = play.pitcher
|
|
|
|
if play.is_tied and runs_scored == 0:
|
|
logger.info(f'Game {this_game.id} | Clearing winner and loser on play #{play.play_num}')
|
|
winner, loser = None, None
|
|
|
|
if save is not None:
|
|
logger.info(f'Game {this_game.id} | Appending current save pitcher {save.player.name_with_desc} to blown saves and clearing save on play #{play.play_num}')
|
|
b_save.append(save)
|
|
save = None
|
|
|
|
if play.pitcher_id not in decisions:
|
|
logger.info(f'Game {this_game.id} | Adding {play.pitcher.player.name} to decisions dict on play #{play.play_num}')
|
|
decisions[play.pitcher.player_id] = DecisionModel(
|
|
game_id=db_game_id,
|
|
season=this_game.season,
|
|
week=0 if this_game.week is None else this_game.week,
|
|
pitcher_id=play.pitcher.player_id,
|
|
pitcher_team_id=play.pitcher.team_id
|
|
)
|
|
|
|
logger.info(f'winner: {winner} / loser: {loser}')
|
|
decisions[winner.player_id].win = 1
|
|
decisions[loser.player_id].loss = 1
|
|
decisions[away_starter.player_id].is_start = True
|
|
decisions[home_starter.player_id].is_start = True
|
|
decisions[away_finisher.player_id].game_finished = 1
|
|
decisions[home_finisher.player_id].game_finished = 1
|
|
|
|
for lineup in holds:
|
|
decisions[lineup.player_id].hold = 1
|
|
|
|
if save is not None:
|
|
decisions[save.player_id].is_save = 1
|
|
decisions[save.player_id].hold = 0
|
|
|
|
for lineup in b_save:
|
|
decisions[lineup.player_id].b_save = 1
|
|
decisions[lineup.player_id].save = 0
|
|
|
|
return [x.model_dump() for x in decisions.values()]
|
|
|
|
|
|
async def post_game_rewards(session: Session, winning_team: Team, losing_team: Team, this_game: Game):
|
|
wr_query = await db_get(
|
|
'gamerewards', params=[('name', f'{"Short" if this_game.short_game else "Full"} Game Win')])
|
|
lr_query = await db_get(
|
|
'gamerewards', params=[('name', f'{"Short" if this_game.short_game else "Full"} Game Loss')])
|
|
if not wr_query['count'] or not lr_query['count']:
|
|
raise DatabaseError(f'Game rewards were not found. Leaving this game active.')
|
|
|
|
win_reward = wr_query['gamerewards'][0]
|
|
loss_reward = lr_query['gamerewards'][0]
|
|
win_string = f'1x {win_reward["pack_type"]["name"]} Pack\n'
|
|
|
|
# Post Campaign Team Choice packs
|
|
if this_game.ai_team is not None and losing_team.is_ai and 'gauntlet' not in this_game.game_type and not this_game.short_game:
|
|
g_query = await db_get('games', params=[('team1_id', winning_team.id), ('season', this_game.season), ('forfeit', False)])
|
|
|
|
win_points = 0
|
|
for x in g_query['games']:
|
|
if (x['away_score'] > x['home_score'] and x['away_team']['id'] == winning_team.id) or (x['home_score'] > x['away_score'] and x['home_team']['id'] == winning_team.id):
|
|
if x['game_type'] == 'minor-league':
|
|
win_points += 1
|
|
|
|
elif x['game_type'] in ['major-league', 'flashback']:
|
|
win_points += 2
|
|
|
|
elif x['game_type'] == 'hall-of-fame':
|
|
win_points += 3
|
|
|
|
if this_game.game_type == 'minor-league':
|
|
this_game_points = 1
|
|
elif this_game.game_type in ['major-league', 'flashback']:
|
|
this_game_points = 2
|
|
else:
|
|
this_game_points = 3
|
|
|
|
pre_game_points = win_points - this_game_points
|
|
if math.floor(win_points / 6) > math.floor(pre_game_points / 6):
|
|
await db_post('packs/one', payload={
|
|
'team_id': winning_team.id,
|
|
'pack_type_id': 8,
|
|
'pack_team_id': losing_team.id
|
|
})
|
|
win_string += f'1x {losing_team.abbrev} Team Choice pack\n'
|
|
|
|
win_string += f'{win_reward["money"]}₼\n'
|
|
loss_string = f'{loss_reward["money"]}₼\n'
|
|
|
|
if 'gauntlet' in this_game.game_type:
|
|
winning_abbrev = winning_team.abbrev.lower()
|
|
if 'gauntlet' in winning_abbrev:
|
|
winning_team = await get_team_or_none(session, team_abbrev=winning_abbrev.split('-')[1])
|
|
if winning_team is None:
|
|
raise DatabaseError(f'Main team not found for {winning_abbrev}')
|
|
|
|
losing_abbrev = losing_team.abbrev.lower()
|
|
if 'gauntlet' in losing_abbrev:
|
|
losing_team = await get_team_or_none(session, team_abbrev=losing_abbrev.split('-')[1])
|
|
if losing_team is None:
|
|
raise DatabaseError(f'Main team not found for {losing_abbrev}')
|
|
|
|
await db_post('packs/one', payload={'team_id': winning_team.id, 'pack_type_id': win_reward['pack_type']['id']})
|
|
await db_post(f'teams/{winning_team.id}/money/{win_reward["money"]}')
|
|
await db_post(f'teams/{losing_team.id}/money/{loss_reward["money"]}')
|
|
|
|
return win_string, loss_string
|