paper-dynasty-discord/db_calls_gameplay.py
Cal Corum bfd72ae0f5 Update logging to RotatingFileHandler
Add auto game end
Calculate stats and decisions
Support raising instantiated exceptions
2024-11-09 23:14:54 -06:00

2471 lines
94 KiB
Python

import asyncio
import datetime
import logging
import random
import requests
import pydantic
from typing import Optional
from peewee import *
from playhouse.shortcuts import model_to_dict
from dataclasses import dataclass
from helpers import SBA_SEASON, PD_SEASON, get_player_url, get_sheets
from api_calls import db_get
from in_game.data_cache import get_pd_player, CardPosition, BattingCard, get_pd_team
# Local SQLite database backing every peewee model declared in this module.
db = SqliteDatabase(
    'storage/gameplay-legacy.db',
    pragmas={
        'journal_mode': 'wal',  # write-ahead logging
        'cache_size': -1 * 64000,  # negative values are KiB per the SQLite PRAGMA docs (~64 MB)
        'synchronous': 0  # synchronous=OFF per the SQLite PRAGMA docs
    }
)
# Base URL for the HTTP API queried by the get_sba_* helpers in this module.
SBA_DB_URL = 'http://database/api'
logger = logging.getLogger('discord_app')
def param_char(other_params):
    """Return the separator that should precede the next URL query parameter.

    Gives '&' when other_params is truthy (a query string has already been
    started) and '?' otherwise.
    """
    return '&' if other_params else '?'
def get_sba_team(id_or_abbrev, season=None):
    """Fetch one team from the SBa API by id or abbreviation.

    The season query parameter is appended only when season is truthy.
    Returns the decoded JSON body on HTTP 200; any other status is logged
    and re-raised as ValueError carrying the response text.
    """
    url = f'{SBA_DB_URL}/v1/teams/{id_or_abbrev}'
    if season:
        url = url + f'?season={season}'

    response = requests.get(url, timeout=3)
    if response.status_code != 200:
        logger.warning(response.text)
        raise ValueError(f'DB: {response.text}')
    return response.json()
def get_sba_player(id_or_name, season=None):
    """Fetch one player from the SBa API (v2) by id or name.

    The season query parameter is appended whenever season is not None.
    Returns the decoded JSON body on HTTP 200; any other status is logged
    and re-raised as ValueError carrying the response text.
    """
    url = f'{SBA_DB_URL}/v2/players/{id_or_name}'
    if season is not None:
        url = url + f'?season={season}'

    response = requests.get(url, timeout=3)
    if response.status_code != 200:
        logger.warning(response.text)
        raise ValueError(f'DB: {response.text}')
    return response.json()
def get_sba_team_by_owner(season, owner_id):
    """Fetch the single active SBa team owned by owner_id in the given season.

    Returns the team dict from the API response's 'teams' list.

    Raises:
        ValueError: if the API does not answer with HTTP 200, or if the
            response does not contain exactly one team.
    """
    resp = requests.get(f'{SBA_DB_URL}/v1/teams?season={season}&owner_id={owner_id}&active_only=True', timeout=3)
    if resp.status_code != 200:
        logger.warning(resp.text)
        raise ValueError(f'DB: {resp.text}')

    teams = resp.json()['teams']
    if len(teams) != 1:
        # Report the number of teams, not the number of keys in the response body.
        raise ValueError(f'One team requested, but {len(teams)} were returned')
    return teams[0]
# def pd_await db_get(endpoint: str, api_ver: int = 1, object_id: int = None, params: list = None, none_okay: bool = True):
# req_url = pd_get_req_url(endpoint, api_ver=api_ver, object_id=object_id, params=params)
# logger.info(f'get:\n{endpoint} id: {object_id} params: {params}')
#
# resp = requests.get(req_url, timeout=3)
# if resp.status_code == 200:
# data = resp.json()
# logger.info(f'return: {data}')
# return data
# elif none_okay:
# data = resp.json()
# logger.info(f'return: {data}')
# return None
# else:
# logger.warning(resp.text)
# raise ValueError(f'DB: {resp.text}')
# def pd_get_one_team(team_abbrev: str):
# team = pd_await db_get('teams', params=[('abbrev', team_abbrev), ('season', PD_SEASON)], none_okay=False)['teams'][0]
# return team
#
# # req_url = pd_get_req_url('teams', )
# #
# # resp = requests.get(req_url, timeout=3)
# # if resp.status_code == 200:
# # data = resp.json()
# # logger.info(f'return: {data}')
# # return data['teams'][0]
# # else:
# # logger.warning(resp.text)
# # raise ValueError(f'PD DB: {resp.text}')
# def pd_get_card_by_id(card_id: int):
# return pd_await db_get('cards', object_id=card_id, none_okay=False)
class DecisionModel(pydantic.BaseModel):
    """Pydantic schema describing one pitcher's decision record for a game.

    The five leading fields are required; every other field has a zero/False
    default.
    """
    game_id: int
    season: int
    week: int
    pitcher_id: int
    pitcher_team_id: int
    # Decision tallies, stored as integers rather than booleans.
    win: int = 0
    loss: int = 0
    hold: int = 0
    is_save: int = 0
    is_start: bool = False
    b_save: int = 0  # presumably "blown save" - TODO confirm
    irunners: int = 0  # presumably inherited runners - TODO confirm
    irunners_scored: int = 0
    rest_ip: float = 0
    rest_required: int = 0
class BaseModel(Model):
    """Common peewee base class that binds subclasses to the module-level db."""
    class Meta:
        database = db
class ManagerAi(BaseModel):
    """Peewee model holding a named set of AI manager tendencies.

    Every tendency is an integer that defaults to 5.
    NOTE(review): the valid range and the direction of the scale are not
    visible in this module - confirm against the code that reads these values.
    """
    name = CharField()
    steal = IntegerField(default=5)
    running = IntegerField(default=5)
    hold = IntegerField(default=5)
    catcher_throw = IntegerField(default=5)
    uncapped_home = IntegerField(default=5)
    uncapped_third = IntegerField(default=5)
    uncapped_trail = IntegerField(default=5)
    bullpen_matchup = IntegerField(default=5)
    behind_aggression = IntegerField(default=5)
    ahead_aggression = IntegerField(default=5)
    decide_throw = IntegerField(default=5)
def load_ai():
    """Insert the built-in ManagerAi profiles and return True.

    Each profile only names the row; every tendency column falls back to the
    model default.
    """
    default_profiles = [
        {'name': 'Basic'}
    ]
    for profile in default_profiles:
        ManagerAi.create(**profile)
    return True
def ai_batting(this_game, this_play) -> bool:
    """Return True when the AI-controlled team is the one currently batting.

    The away team bats in the 'Top' half and the home team in the 'Bot' half;
    any other inning_half value yields False.
    """
    batting_side = {'Top': 'away', 'Bot': 'home'}.get(this_play.inning_half)
    return batting_side is not None and this_game.ai_team == batting_side
# Runs at import time: make sure the ManagerAi table exists before it is used.
db.create_tables([ManagerAi])
class Game(BaseModel):
    """Peewee model for one game tracked by this module.

    Only the two team ids, channel_id and season are required; the remaining
    columns are nullable or carry defaults.
    """
    away_team_id = IntegerField()
    home_team_id = IntegerField()
    week_num = IntegerField(null=True)
    game_num = IntegerField(null=True)
    channel_id = IntegerField()  # presumably the Discord channel hosting the game - TODO confirm
    season = IntegerField()
    active = BooleanField(default=True)
    is_pd = BooleanField(default=False)  # selects the Paper Dynasty code paths (see get_game_team / player_link)
    ranked = BooleanField(default=True)
    short_game = BooleanField(default=False)
    away_roster_num = IntegerField(null=True)
    home_roster_num = IntegerField(null=True)
    first_message = IntegerField(null=True)
    ai_team = CharField(null=True)  # compared against 'away' / 'home' by ai_batting
    game_type = CharField(default='minor-league')
    cardset_ids = CharField(null=True)
    backup_cardset_ids = CharField(null=True)
    # TODO: add get_away_team and get_home_team that deals with SBa/PD and returns Team object
@dataclass
class StratGame:
    """Plain dataclass copy of a Game row.

    The field order matters: post_game and patch_game construct instances
    positionally, while other callers build them from model_to_dict(Game).
    """
    id: int
    away_team_id: int
    home_team_id: int
    channel_id: int
    season: int
    active: bool = True
    is_pd: bool = False
    ranked: bool = True
    short_game: bool = False
    week_num: Optional[int] = None
    game_num: Optional[int] = None
    away_roster_num: Optional[int] = None
    home_roster_num: Optional[int] = None
    first_message: Optional[int] = None
    ai_team: Optional[str] = None
    game_type: Optional[str] = None  # note: the Game model itself defaults this to 'minor-league'
    cardset_ids: Optional[str] = None
    backup_cardset_ids: Optional[str] = None
# Runs at import time: make sure the Game table exists before it is used.
db.create_tables([Game])
# class GameModel(pydantic.BaseModel):
# away_team_id: int
# home_team_id: int
# week_num: Optional[int] = None
# game_num: Optional[int] = None
# channel_id: int
# active: Optional[bool] = True
# is_pd: Optional[bool] = True
def count_team_games(team_id: int, active: bool = True):
    """Summarise the games in which team_id plays as either home or away.

    Only games whose active flag equals `active` are considered. Returns a
    dict with the number of matches under 'count' and the matching rows,
    converted with model_to_dict, under 'games'.
    """
    involves_team = (Game.away_team_id == team_id) | (Game.home_team_id == team_id)
    matching = Game.select().where(involves_team & (Game.active == active))
    return {'count': matching.count(), 'games': [model_to_dict(game) for game in matching]}
def post_game(game_dict: dict):
    """Insert a new Game row from game_dict and return it as a StratGame.

    game_dict is passed straight to Game.create as keyword arguments. The
    database connection is closed before returning.
    """
    created = Game.create(**game_dict)
    strat_game = StratGame(
        id=created.id,
        away_team_id=created.away_team_id,
        home_team_id=created.home_team_id,
        channel_id=created.channel_id,
        season=created.season,
        active=created.active,
        is_pd=created.is_pd,
        ranked=created.ranked,
        short_game=created.short_game,
        week_num=created.week_num,
        game_num=created.game_num,
        away_roster_num=created.away_roster_num,
        home_roster_num=created.home_roster_num,
        first_message=created.first_message,
        ai_team=created.ai_team,
        game_type=created.game_type,
        cardset_ids=created.cardset_ids,
        backup_cardset_ids=created.backup_cardset_ids,
    )
    db.close()
    return strat_game
def get_one_game(game_id=None, away_team_id=None, home_team_id=None, week_num=None, game_num=None,
                 channel_id=None, active=None) -> Optional[StratGame]:
    """Look up a single game and return it as a StratGame.

    When game_id is given the row is fetched by primary key and all other
    arguments are ignored (Game.get_by_id raises if the id does not exist).
    Otherwise every non-None argument is applied as an equality filter and the
    first matching row is used.

    Returns:
        The matching StratGame, or None when the filters match nothing.
    """
    if game_id is not None:
        found = Game.get_by_id(game_id)
    else:
        query = Game.select()
        filters = (
            (Game.away_team_id, away_team_id),
            (Game.home_team_id, home_team_id),
            (Game.week_num, week_num),
            (Game.game_num, game_num),
            (Game.channel_id, channel_id),
            (Game.active, active),
        )
        for column, wanted in filters:
            if wanted is not None:
                query = query.where(column == wanted)
        # Run the query once instead of separately for count, truthiness and indexing.
        found = next(iter(query), None)

    # Close the connection on both outcomes, not only when a game was found.
    return_game = StratGame(**model_to_dict(found)) if found is not None else None
    db.close()
    return return_game
async def get_game_team(
        game: StratGame, gm_id: int = None, team_abbrev: str = None, team_id: int = None,
        skip_cache: bool = False) -> dict:
    """Resolve a team for the given game from whichever identifier is supplied.

    For Paper Dynasty games (game.is_pd) team_id is tried first, then gm_id,
    then team_abbrev. For SBa games gm_id is tried first, then team_id, then
    team_abbrev.

    Raises:
        KeyError: if gm_id, team_abbrev and team_id are all falsy.
    """
    if not (gm_id or team_abbrev or team_id):
        raise KeyError(f'get_game_team requires either one of gm_id, team_abbrev, or team_id to not be None')

    logger.debug(f'getting game team for game {game.id} / gm_id: {gm_id} / '
                 f'tm_abbrev: {team_abbrev} / team_id: {team_id} / game: {game}')

    if not game.is_pd:
        if gm_id:
            return get_sba_team_by_owner(season=SBA_SEASON, owner_id=gm_id)
        if team_id:
            return get_sba_team(team_id)
        return get_sba_team(team_abbrev, season=SBA_SEASON)

    if team_id:
        return await get_pd_team(team_id, skip_cache=skip_cache)
    if gm_id:
        # NOTE(review): gm_id is handed to get_pd_team exactly like a team id - confirm this is intended.
        return await get_pd_team(gm_id, skip_cache=skip_cache)

    t_query = await db_get('teams', params=[('season', PD_SEASON), ('abbrev', team_abbrev)])
    return t_query['teams'][0]
def patch_game(
        game_id, away_team_id=None, home_team_id=None, week_num=None, game_num=None, channel_id=None, active=None,
        first_message=None, home_roster_num=None, away_roster_num=None, ai_team=None, cardset_ids=None,
        backup_cardset_ids=None):
    """Update the supplied columns of a Game row and return it as a StratGame.

    Arguments left as None are not touched, so a column cannot be reset to
    NULL through this function. The database connection is closed before
    returning.
    """
    this_game = Game.get_by_id(game_id)

    requested = {
        'away_team_id': away_team_id,
        'home_team_id': home_team_id,
        'week_num': week_num,
        'game_num': game_num,
        'channel_id': channel_id,
        'active': active,
        'first_message': first_message,
        'home_roster_num': home_roster_num,
        'away_roster_num': away_roster_num,
        'ai_team': ai_team,
        'cardset_ids': cardset_ids,
        'backup_cardset_ids': backup_cardset_ids,
    }
    for column, new_value in requested.items():
        if new_value is not None:
            setattr(this_game, column, new_value)
    this_game.save()

    strat_game = StratGame(
        id=this_game.id,
        away_team_id=this_game.away_team_id,
        home_team_id=this_game.home_team_id,
        channel_id=this_game.channel_id,
        season=this_game.season,
        active=this_game.active,
        is_pd=this_game.is_pd,
        ranked=this_game.ranked,
        short_game=this_game.short_game,
        week_num=this_game.week_num,
        game_num=this_game.game_num,
        away_roster_num=this_game.away_roster_num,
        home_roster_num=this_game.home_roster_num,
        first_message=this_game.first_message,
        ai_team=this_game.ai_team,
        game_type=this_game.game_type,
        cardset_ids=this_game.cardset_ids,
        backup_cardset_ids=this_game.backup_cardset_ids,
    )
    db.close()
    return strat_game
def get_last_game_ids(num_games: int = 10) -> list:
    """Return the ids of the num_games highest-id games, largest id first."""
    newest_first = Game.select(Game.id).order_by(-Game.id).limit(num_games)
    return [game.id for game in newest_first]
def get_active_games(limit: int = 10) -> list:
    """Return up to `limit` Game rows whose active flag is set.

    A negative limit is replaced with 1.
    """
    row_cap = 1 if limit < 0 else limit
    active_games = Game.select().where(Game.active).limit(row_cap)
    return list(active_games)
class Lineup(BaseModel):
    """Peewee model for one player's slot in a team's lineup for a game.

    Rows are deactivated rather than deleted when a substitution is made
    (see make_sub), and replacing_id on the new row points at the row it
    replaced.
    """
    game = ForeignKeyField(Game)
    team_id = IntegerField()
    player_id = IntegerField()
    card_id = IntegerField(null=True)  # Only needed for Paper Dynasty games
    position = CharField()
    batting_order = IntegerField()
    after_play = IntegerField()  # undo_subs deletes rows whose after_play exceeds the play being rolled back to
    replacing_id = IntegerField(null=True)  # id of the Lineup row this one replaced, if any
    active = BooleanField(default=True)
    variant = IntegerField(default=0)
    is_fatigued = BooleanField(null=True)
@dataclass
class StratLineup:
    """Plain dataclass copy of a Lineup row, with the game expanded to a StratGame.

    Instances are built by convert_stratlineup from model_to_dict(Lineup).
    """
    id: int
    game: StratGame
    team_id: int
    player_id: int
    position: str
    batting_order: int
    after_play: int
    replacing_id: Optional[int] = None
    active: bool = True
    card_id: Optional[int] = None
    variant: int = 0
    is_fatigued: Optional[bool] = None
def convert_stratlineup(lineup: Lineup) -> StratLineup:
    """Convert a Lineup model instance into a StratLineup dataclass.

    The nested game dict produced by model_to_dict is wrapped in a StratGame.
    """
    columns = model_to_dict(lineup)
    strat_game = StratGame(**columns.pop('game'))
    return StratLineup(game=strat_game, **columns)
# Runs at import time: make sure the Lineup table exists before it is used.
db.create_tables([Lineup])
def get_one_lineup(
        game_id: int, lineup_id: int = None, team_id: int = None, batting_order: int = None, position: str = None,
        card_id: int = None, active: bool = True, as_obj: bool = False) -> Optional[StratLineup]:
    """Fetch a single lineup entry for a game.

    Exactly one lookup strategy is used, in this order of precedence:
    lineup_id (primary key, ignores every other filter), card_id,
    batting_order (with team_id), then position (with team_id). All but the
    primary-key lookup also filter on `active`.

    Returns the raw Lineup model (possibly None) when as_obj is True,
    otherwise a StratLineup or None when nothing matched.

    Raises:
        KeyError: if batting_order, position, lineup_id and card_id are all falsy.
    """
    if not (batting_order or position or lineup_id or card_id):
        raise KeyError(f'One of batting_order, position, card_id , or lineup_id must not be None')

    if lineup_id:
        this_lineup = Lineup.get_by_id(lineup_id)
    else:
        if card_id:
            criteria = [Lineup.game_id == game_id, Lineup.card_id == card_id]
        elif batting_order:
            criteria = [
                Lineup.game_id == game_id, Lineup.team_id == team_id, Lineup.batting_order == batting_order
            ]
        else:
            criteria = [Lineup.game_id == game_id, Lineup.team_id == team_id, Lineup.position == position]
        this_lineup = Lineup.get_or_none(*criteria, Lineup.active == active)

    logger.debug(f'get_one_lineup / this_lineup: {this_lineup}')

    if as_obj:
        return this_lineup
    if not this_lineup:
        return None

    strat_lineup = convert_stratlineup(this_lineup)
    db.close()
    return strat_lineup
async def get_team_lineups(
        game_id: int, team_id: int, inc_inactive: bool = False, pitchers_only: bool = False, as_string: bool = True):
    """Return a team's lineup entries for a game, ordered by batting order.

    Inactive entries are excluded unless inc_inactive is True, and
    pitchers_only restricts the result to position 'P'.

    With as_string (the default) the result is one text line per entry in the
    form '<order>. <player link> - <position>'; otherwise it is a list of
    StratLineup objects. The database connection is closed before returning.
    """
    roster = Lineup.select().where(
        (Lineup.game_id == game_id) & (Lineup.team_id == team_id)
    ).order_by(Lineup.batting_order)
    if not inc_inactive:
        roster = roster.where(Lineup.active == True)
    if pitchers_only:
        roster = roster.where(Lineup.position == 'P')

    if not as_string:
        strat_lineups = [convert_stratlineup(member) for member in roster]
        db.close()
        return strat_lineups

    this_game = Game.get_by_id(game_id)
    rows = []
    for member in roster:
        player = await get_player(this_game, member)
        rows.append(f'{member.batting_order}. {player_link(this_game, player)} - {member.position}\n')
    db.close()
    return ''.join(rows)
def post_lineups(lineups: list):
    """Bulk-insert the given lineup rows inside one transaction, then close the connection."""
    bulk_insert = Lineup.insert_many(lineups)
    with db.atomic():
        bulk_insert.execute()
    db.close()
def patch_lineup(lineup_id, active: Optional[bool] = None, position: Optional[str] = None,
                 replacing_id: Optional[int] = None, is_fatigued: Optional[bool] = None) -> StratLineup:
    """Update the supplied columns of a Lineup row and return it as a StratLineup.

    Arguments left as None are not touched. The database connection is closed
    before returning.
    """
    this_lineup = Lineup.get_by_id(lineup_id)

    requested = {
        'active': active,
        'position': position,
        'replacing_id': replacing_id,
        'is_fatigued': is_fatigued,
    }
    for column, new_value in requested.items():
        if new_value is not None:
            setattr(this_lineup, column, new_value)
    this_lineup.save()

    strat_lineup = convert_stratlineup(this_lineup)
    db.close()
    return strat_lineup
def make_sub(lineup_dict: dict):
    """Substitute a new player into a lineup and patch the current play to match.

    lineup_dict is passed to Lineup.create and must contain at least game_id,
    team_id, batting_order and card_id (PD games) or player_id (other games).
    The active entry at the same batting order, if any, is deactivated and
    recorded as the new row's replacing_id.

    Returns the new entry as a StratLineup.

    Raises:
        DatabaseError: if the card/player already has a lineup row for this
            team in this game.
    """
    # Check for dupe player / card
    this_game = Game.get_by_id(lineup_dict['game_id'])
    if this_game.is_pd:
        player_conflict = Lineup.select().where(
            (Lineup.game_id == lineup_dict['game_id']) & (Lineup.team_id == lineup_dict['team_id']) &
            (Lineup.card_id == lineup_dict['card_id'])
        )
        db_error = f'This card is already in the lineup'
    else:
        player_conflict = Lineup.select().where(
            (Lineup.game_id == lineup_dict['game_id']) & (Lineup.team_id == lineup_dict['team_id']) &
            (Lineup.player_id == lineup_dict['player_id'])
        )
        db_error = f'This player is already in the lineup'
    # The conflict queries do not filter on Lineup.active, so an entry that
    # was already subbed out still counts as a conflict.
    if player_conflict.count():
        db.close()
        raise DatabaseError(db_error)
    subbed_player = Lineup.get_or_none(
        Lineup.game_id == lineup_dict['game_id'], Lineup.team_id == lineup_dict['team_id'],
        Lineup.batting_order == lineup_dict['batting_order'], Lineup.active == True
    )
    logger.debug(f'subbed_player: {subbed_player}')
    if subbed_player:
        # From here on subbed_player is the StratLineup returned by patch_lineup, not the Lineup model.
        subbed_player = patch_lineup(subbed_player.id, active=False)
        lineup_dict['replacing_id'] = subbed_player.id
    new_lineup = Lineup.create(**lineup_dict)
    # return_lineup = model_to_dict(new_lineup)
    return_value = convert_stratlineup(new_lineup)
    curr_play = get_current_play(lineup_dict['game_id'])
    logger.debug(f'\n\nreturn_value: {return_value}\n\ncurr_play: {curr_play}\n\n')
    # Check current play for updates
    if curr_play:
        # The new entry becomes the pitcher when it is listed at 'P', or when the
        # play has no pitcher yet and the new entry is not on the batter's team.
        if (not curr_play.pitcher and curr_play.batter.team_id != return_value.team_id) or return_value.position == 'P':
            patch_play(curr_play.id, pitcher_id=return_value.id)
        if subbed_player:
            # Swap the new entry into the first role the replaced player holds on the play.
            # Both sides of each comparison are StratLineup dataclasses, compared field by field.
            if curr_play.batter == subbed_player:
                patch_play(curr_play.id, batter_id=return_value.id)
            elif curr_play.pitcher == subbed_player:
                patch_play(curr_play.id, pitcher_id=return_value.id)
            elif curr_play.catcher == subbed_player:
                patch_play(curr_play.id, catcher_id=return_value.id)
            elif curr_play.on_first == subbed_player:
                patch_play(curr_play.id, on_first_id=return_value.id)
            elif curr_play.on_second == subbed_player:
                patch_play(curr_play.id, on_second_id=return_value.id)
            elif curr_play.on_third == subbed_player:
                patch_play(curr_play.id, on_third_id=return_value.id)
    db.close()
    return return_value
def undo_subs(game: StratGame, new_play_num: int):
    """Roll back every substitution made after play number new_play_num.

    Each lineup entry whose after_play is greater than new_play_num is
    deleted, and the entry it replaced (if any) is reactivated first.
    """
    logger.info(f'get new players')
    new_players = Lineup.select().where((Lineup.game_id == game.id) & (Lineup.after_play > new_play_num))
    logger.info(f'new_player count: {new_players.count()}')
    replacements = [(x.id, x.replacing_id) for x in new_players]

    for new_id, old_id in replacements:
        if old_id is None:
            # replacing_id is nullable: an entry added without replacing anyone has
            # nothing to reactivate, and get_one_lineup would raise KeyError for it.
            logger.info(f'{new_id} did not replace anyone; nothing to reactivate')
            continue
        logger.info(f'replacing {new_id} with {old_id}')
        old_player = get_one_lineup(game_id=game.id, lineup_id=old_id)
        logger.info(f'old_player: {old_player}')
        patch_lineup(old_player.id, active=True)
        logger.info(f'activated!')
    logger.info(f'done activating old players')

    Lineup.delete().where((Lineup.game_id == game.id) & (Lineup.after_play > new_play_num)).execute()
async def get_player(game, lineup_member, as_dict: bool = True):
    """Return the player behind a lineup entry via get_pd_player.

    Args:
        game: unused; kept so existing callers do not need to change.
        lineup_member: object exposing a player_id attribute.
        as_dict: forwarded unchanged to get_pd_player.

    The per-game-type lookups (cards endpoint / SBa player API) that used to
    follow the return statement could never execute and have been removed;
    the function's behaviour is unchanged.
    """
    return await get_pd_player(lineup_member.player_id, as_dict=as_dict)
def player_link(game, player):
    """Build the display link for a player in the context of a game.

    For Paper Dynasty games the result is a markdown link from the player's
    'p_name' to its 'image'; otherwise get_player_url(player) is returned.

    Raises:
        TypeError: if game is neither a Game nor a StratGame.
    """
    if not isinstance(game, (Game, StratGame)):
        raise TypeError(f'Cannot get player link; game is not a valid object')
    if game.is_pd:
        return f'[{player["p_name"]}]({player["image"]})'
    return get_player_url(player)
class Play(BaseModel):
    """Peewee model for a single play within a game.

    Holds the game situation when the play starts, the Lineup entries
    involved, the stat counters credited on the play, and status flags.
    """
    game = ForeignKeyField(Game)
    play_num = IntegerField()
    batter = ForeignKeyField(Lineup)
    batter_pos = CharField(default='DH')
    pitcher = ForeignKeyField(Lineup, null=True)
    on_base_code = IntegerField()  # used by get_dbready_plays as an index (0-7) into its base-state list
    inning_half = CharField()  # compared against 'Top' / 'Bot' by ai_batting
    inning_num = IntegerField()
    batting_order = IntegerField()
    starting_outs = IntegerField()
    away_score = IntegerField()
    home_score = IntegerField()
    in_pow = BooleanField(default=False)
    # Runners at the start of the play and their *_final values.
    # NOTE(review): *_final presumably records the base each runner ends on - confirm.
    on_first = ForeignKeyField(Lineup, null=True)
    on_first_final = IntegerField(null=True)
    on_second = ForeignKeyField(Lineup, null=True)
    on_second_final = IntegerField(null=True)
    on_third = ForeignKeyField(Lineup, null=True)
    on_third_final = IntegerField(null=True)
    batter_final = IntegerField(null=True)
    # Per-play stat counters; all default to 0.
    pa = IntegerField(default=0)
    ab = IntegerField(default=0)
    run = IntegerField(default=0)
    e_run = IntegerField(default=0)
    hit = IntegerField(default=0)
    rbi = IntegerField(default=0)
    double = IntegerField(default=0)
    triple = IntegerField(default=0)
    homerun = IntegerField(default=0)
    bb = IntegerField(default=0)
    so = IntegerField(default=0)
    hbp = IntegerField(default=0)
    sac = IntegerField(default=0)
    ibb = IntegerField(default=0)
    gidp = IntegerField(default=0)
    bphr = IntegerField(default=0)
    bpfo = IntegerField(default=0)
    bp1b = IntegerField(default=0)
    bplo = IntegerField(default=0)
    sb = IntegerField(default=0)
    cs = IntegerField(default=0)
    outs = IntegerField(default=0)
    wpa = FloatField(default=0.0)
    re24 = FloatField(default=0.0)
    # Other Lineup entries referenced by the play.
    catcher = ForeignKeyField(Lineup, null=True)
    defender = ForeignKeyField(Lineup, null=True)
    runner = ForeignKeyField(Lineup, null=True)
    check_pos = CharField(null=True)
    error = IntegerField(default=0)
    wild_pitch = IntegerField(default=0)
    passed_ball = IntegerField(default=0)
    pick_off = IntegerField(default=0)
    balk = IntegerField(default=0)
    # Status flags; get_current_play treats the play with complete == False as the current one.
    complete = BooleanField(default=False)
    locked = BooleanField(default=False)
    is_go_ahead = BooleanField(default=False)
    is_tied = BooleanField(default=False)
    is_new_inning = BooleanField(default=False)
@dataclass
class StratPlay:
    """Plain dataclass copy of a Play row.

    Built by convert_stratplay: the game becomes a StratGame and every
    populated Lineup reference becomes a StratLineup.
    """
    id: int
    game: StratGame
    play_num: int
    batter: StratLineup
    pitcher: Optional[StratLineup]  # nullable on the Play model
    on_base_code: int
    inning_half: str
    inning_num: int
    batting_order: int
    starting_outs: int
    away_score: int
    home_score: int
    batter_pos: Optional[str] = None
    in_pow: bool = False
    on_first: Optional[StratLineup] = None
    on_first_final: Optional[int] = None
    on_second: Optional[StratLineup] = None
    on_second_final: Optional[int] = None
    on_third: Optional[StratLineup] = None
    on_third_final: Optional[int] = None
    batter_final: Optional[int] = None
    pa: int = 0
    ab: int = 0
    run: int = 0
    e_run: int = 0
    hit: int = 0
    rbi: int = 0
    double: int = 0
    triple: int = 0
    homerun: int = 0
    bb: int = 0
    so: int = 0
    hbp: int = 0
    sac: int = 0
    ibb: int = 0
    gidp: int = 0
    bphr: int = 0
    bpfo: int = 0
    bp1b: int = 0
    bplo: int = 0
    sb: int = 0
    cs: int = 0
    outs: int = 0
    wpa: float = 0.0
    re24: float = 0.0
    catcher: Optional[StratLineup] = None
    defender: Optional[StratLineup] = None
    runner: Optional[StratLineup] = None
    check_pos: Optional[str] = None
    error: int = 0
    wild_pitch: int = 0
    passed_ball: int = 0
    pick_off: int = 0
    balk: int = 0
    complete: bool = False
    locked: bool = False
    is_go_ahead: bool = False
    is_tied: bool = False
    is_new_inning: bool = False

    def ai_run_diff(self):
        """Return the score margin from the AI team's point of view.

        Positive when the AI team leads. Any ai_team value other than 'away'
        (including None) is treated as the home team.
        """
        if self.game.ai_team == 'away':
            return self.away_score - self.home_score
        else:
            return self.home_score - self.away_score
def convert_stratplay(play: Play) -> StratPlay:
    """Convert a Play model instance into a StratPlay dataclass.

    The nested game dict becomes a StratGame, and each populated Lineup
    reference is converted with convert_stratlineup; empty references are
    passed through unchanged.
    """
    columns = model_to_dict(play)
    columns['game'] = StratGame(**columns['game'])

    lineup_roles = (
        'batter', 'pitcher', 'on_first', 'on_second', 'on_third', 'catcher', 'defender', 'runner'
    )
    for role in lineup_roles:
        if columns[role]:
            columns[role] = convert_stratlineup(getattr(play, role))

    return StratPlay(**columns)
# Runs at import time: make sure the Play table exists before it is used.
db.create_tables([Play])
def post_play(play_dict: dict) -> StratPlay:
    """Insert a new Play row from play_dict and return it as a StratPlay.

    The database connection is closed before returning.
    """
    logger.debug(f'play_dict: {play_dict}')
    strat_play = convert_stratplay(Play.create(**play_dict))
    db.close()
    return strat_play
def patch_play(
        play_id, batter_id: int = None, pitcher_id: int = None, catcher_id: int = None, locked: bool = None,
        pa: int = None, ab: int = None, hit: int = None, double: int = None, triple: int = None, homerun: int = None,
        outs: int = None, so: int = None, bp1b: int = None, bplo: int = None, bphr: int = None, bpfo: int = None,
        walk: int = None, hbp: int = None, ibb: int = None, sac: int = None, sb: int = None, is_go_ahead: bool = None,
        defender_id: int = None, check_pos: str = None, error: int = None, play_num: int = None, cs: int = None,
        on_first_id: int = None, on_first_final: int = None, on_second_id: int = None, on_second_final: int = None,
        on_third_id: int = None, on_third_final: int = None, starting_outs: int = None, runner_id: int = None,
        complete: bool = None, rbi: int = None, wp: int = None, pb: int = None, pick: int = None, balk: int = None,
        is_new_inning: bool = None, batter_final: int = None, in_pow: bool = None):
    """Update the supplied columns of a Play row and return it as a StratPlay.

    Arguments left as None are not touched. The arguments fall into four groups:

    * copied as-is when supplied (note walk -> bb, wp -> wild_pitch,
      pb -> passed_ball, pick -> pick_off);
    * clearable: a falsy value such as 0 or False stores NULL, a truthy
      value is stored as given (defender_id, the on_* runners and finals,
      batter_final);
    * check_pos: the string 'false' (any casing) stores NULL;
    * flags stored as 1/0 (is_go_ahead, is_new_inning, in_pow).

    The database connection is closed before returning.
    """
    this_play = Play.get_by_id(play_id)

    # (supplied value, Play attribute) - stored unchanged when not None.
    direct = (
        (batter_id, 'batter_id'), (pitcher_id, 'pitcher_id'), (catcher_id, 'catcher_id'),
        (runner_id, 'runner_id'), (locked, 'locked'), (complete, 'complete'),
        (pa, 'pa'), (ab, 'ab'), (hit, 'hit'), (rbi, 'rbi'),
        (double, 'double'), (triple, 'triple'), (homerun, 'homerun'),
        (starting_outs, 'starting_outs'), (outs, 'outs'), (so, 'so'),
        (bp1b, 'bp1b'), (bplo, 'bplo'), (bphr, 'bphr'), (bpfo, 'bpfo'),
        (walk, 'bb'), (hbp, 'hbp'), (ibb, 'ibb'), (sac, 'sac'), (sb, 'sb'), (cs, 'cs'),
        (wp, 'wild_pitch'), (pb, 'passed_ball'), (pick, 'pick_off'), (balk, 'balk'),
        (error, 'error'), (play_num, 'play_num'),
    )
    for value, attr in direct:
        if value is not None:
            setattr(this_play, attr, value)

    # (supplied value, attribute written when truthy, attribute set to None when falsy)
    clearable = (
        (defender_id, 'defender_id', 'defender_id'),
        (on_first_id, 'on_first_id', 'on_first'),
        (on_first_final, 'on_first_final', 'on_first_final'),
        (on_second_id, 'on_second_id', 'on_second'),
        (on_second_final, 'on_second_final', 'on_second_final'),
        (on_third_id, 'on_third_id', 'on_third'),
        (on_third_final, 'on_third_final', 'on_third_final'),
        (batter_final, 'batter_final', 'batter_final'),
    )
    for value, set_attr, clear_attr in clearable:
        if value is None:
            continue
        if value:
            setattr(this_play, set_attr, value)
        else:
            setattr(this_play, clear_attr, None)

    if check_pos is not None:
        this_play.check_pos = None if check_pos.lower() == 'false' else check_pos

    # Flags are normalised to the integers 1 / 0 before saving.
    flags = ((is_go_ahead, 'is_go_ahead'), (is_new_inning, 'is_new_inning'), (in_pow, 'in_pow'))
    for value, attr in flags:
        if value is not None:
            setattr(this_play, attr, 1 if value else 0)

    this_play.save()
    strat_play = convert_stratplay(this_play)
    db.close()
    return strat_play
def get_plays(game_id: int = None, pitcher_id: int = None):
    """Return the plays of a game, optionally restricted to one pitcher.

    The result is a dict with the number of matching plays under 'count' and
    the plays, converted to StratPlay, under 'plays'. The database connection
    is closed before returning.
    """
    query = Play.select().where(Play.game_id == game_id)
    if pitcher_id is not None:
        query = query.where(Play.pitcher_id == pitcher_id)

    total = query.count()
    result = {'count': total, 'plays': [convert_stratplay(play) for play in query]}
    db.close()
    return result
def get_play_by_num(game_id, play_num: int):
    """Return the play with the given play_num in a game as a StratPlay, or None.

    The database connection is closed only when a play was found.
    """
    match = Play.get_or_none(Play.game_id == game_id, Play.play_num == play_num)
    if match:
        strat_play = convert_stratplay(match)
        db.close()
        return strat_play
    return None
def get_latest_play(game_id):
    """Return the highest-id play of a game as a StratPlay, or None if the game has no plays.

    The database connection is closed only when a play was found.
    """
    newest = Play.select().where(Play.game_id == game_id).order_by(-Play.id).limit(1)
    if newest:
        strat_play = convert_stratplay(newest[0])
        db.close()
        return strat_play
    return None
def undo_play(game_id):
    """Delete the highest-id play of a game and return the number of rows removed.

    The database connection is closed before returning.
    """
    delete_newest = Play.delete().where(Play.game_id == game_id).order_by(-Play.id).limit(1)
    logger.debug(f'p_query: {delete_newest}')
    removed = delete_newest.execute()
    db.close()
    return removed
def get_current_play(game_id) -> Optional[StratPlay]:
    """Return the game's open (not yet complete) play as a StratPlay.

    If every play is complete, the most recent one is passed to complete_play
    (using its stored batter_final) and the lookup is retried.

    Returns:
        The open StratPlay, or None when the game has no plays at all or when
        no open play exists even after calling complete_play.
    """
    curr_play = Play.get_or_none(Play.game_id == game_id, Play.complete == False)
    if not curr_play:
        latest_play = Play.select().where(Play.game_id == game_id).order_by(-Play.id).limit(1)
        if not latest_play:
            return None
        complete_play(latest_play[0].id, batter_to_base=latest_play[0].batter_final)
        curr_play = Play.get_or_none(Play.game_id == game_id, Play.complete == False)
        if curr_play is None:
            # Converting None would raise; report "no current play" as the
            # Optional return type already allows.
            logger.warning(f'No open play found for game {game_id} after completing the latest play')
            db.close()
            return None

    return_play = convert_stratplay(curr_play)
    db.close()
    return return_play
def get_last_inning_end_play(game_id, inning_half, inning_num):
    """Return the highest-id play of the given half-inning as a StratPlay, or None.

    The database connection is closed before returning in either case.
    """
    half_inning = (
        (Play.game_id == game_id) & (Play.inning_half == inning_half) & (Play.inning_num == inning_num)
    )
    newest_first = Play.select().where(half_inning).order_by(-Play.id)
    last_play = convert_stratplay(newest_first[0]) if newest_first.count() > 0 else None
    db.close()
    return last_play
def add_run_last_player_ab(lineup: Lineup, is_erun: bool = True):
    """Credit a run to the highest-id play in which `lineup` was the batter.

    Sets run to 1 and e_run to 1 or 0 according to is_erun. If the lineup
    entry has never batted, an error is logged and nothing is changed.
    """
    try:
        last_ab = Play.select().where(Play.batter == lineup).order_by(-Play.id).get()
    except DoesNotExist:
        logger.error(f'Unable to apply run to Lineup {lineup}')
        return

    last_ab.run = 1
    last_ab.e_run = int(bool(is_erun))
    last_ab.save()
def get_dbready_plays(game_id: int, db_game_id: int):
    """Return the game's plays as dicts with extra flattened keys.

    Each play dict from model_to_dict gains player/team id keys for the
    pitcher and batter (and for the catcher, defender and runner when
    present), has game_id set to db_game_id, and has its integer on_base_code
    replaced by the matching three-character string.
    """
    prep_plays = [model_to_dict(play) for play in Play.select().where(Play.game_id == game_id)]
    db.close()

    obc_list = ['000', '001', '010', '100', '011', '101', '110', '111']
    for play in prep_plays:
        pitcher = play['pitcher']
        play['pitcher_id'] = pitcher['player_id']
        batter = play['batter']
        play['batter_id'] = batter['player_id']
        play['batter_team_id'] = batter['team_id']
        play['pitcher_team_id'] = pitcher['team_id']

        for role in ('catcher', 'defender', 'runner'):
            member = play[role]
            if member is not None:
                play[f'{role}_id'] = member['player_id']
                play[f'{role}_team_id'] = member['team_id']

        play['game_id'] = db_game_id
        play['on_base_code'] = obc_list[play['on_base_code']]

    logger.debug(f'all_plays:\n\n{prep_plays}\n')
    return prep_plays
class Bullpen(BaseModel):
    """Peewee model holding an AI team's bullpen role assignments.

    Only the closer and setup slots are required; the middle and long slots
    are nullable.
    """
    ai_team_id = IntegerField()
    closer_id = IntegerField()
    setup_id = IntegerField()
    middle_one_id = IntegerField(null=True)
    middle_two_id = IntegerField(null=True)
    middle_three_id = IntegerField(null=True)
    long_one_id = IntegerField(null=True)
    long_two_id = IntegerField(null=True)
    long_three_id = IntegerField(null=True)
    long_four_id = IntegerField(null=True)
    # NOTE(review): get_or_create_bullpen writes an integer millisecond timestamp into this DateTimeField.
    last_updated = DateTimeField()
# Runs at import time: make sure the Bullpen table exists before it is used.
db.create_tables([Bullpen])
@dataclass
class StratBullpen:
    """Plain dataclass copy of a Bullpen row, built from model_to_dict(Bullpen)."""
    id: int
    ai_team_id: int
    closer_id: int
    setup_id: int
    last_updated: int  # annotated int although the Bullpen column is a DateTimeField
    middle_one_id: Optional[int] = None
    middle_two_id: Optional[int] = None
    middle_three_id: Optional[int] = None
    long_one_id: Optional[int] = None
    long_two_id: Optional[int] = None
    long_three_id: Optional[int] = None
    long_four_id: Optional[int] = None
def convert_bullpen_to_strat(bullpen: Bullpen) -> StratBullpen:
    """Convert a Bullpen model instance into a StratBullpen dataclass."""
    return StratBullpen(**model_to_dict(bullpen))
def get_or_create_bullpen(ai_team, bot):
    """
    Returns the bullpen for an AI team, creating it from the team's roster sheet when none is stored.

    ai_team: team dict; 'id' selects the Bullpen row and 'gsheet' is the key of the sheet to open
    bot: handed to get_sheets() to obtain the sheets client

    Returns a StratBullpen.
    """
    this_pen = Bullpen.get_or_none(Bullpen.ai_team_id == ai_team['id'])

    # A stored row is always returned as-is. The three-day staleness refresh that used to follow
    # could never run for an existing row because of this early return; its leftover code is
    # dropped so that a missing row goes straight to creation instead of dereferencing None.
    if this_pen is not None:
        return convert_bullpen_to_strat(this_pen)

    sheets = get_sheets(bot)
    this_sheet = sheets.open_by_key(ai_team['gsheet'])
    r_sheet = this_sheet.worksheet_by_title('My Rosters')

    raw_cells = r_sheet.range('N30:N41')
    logger.debug(f'raw_cells: {raw_cells}')

    def optional_id(row: int):
        # Blank cells mean the relief slot is unused
        value = raw_cells[row][0].value
        return value if value != '' else None

    bullpen = Bullpen(
        ai_team_id=ai_team['id'],
        closer_id=raw_cells[0][0].value,
        setup_id=raw_cells[1][0].value,
        middle_one_id=optional_id(2),
        middle_two_id=optional_id(3),
        middle_three_id=optional_id(4),
        long_one_id=optional_id(5),
        long_two_id=optional_id(6),
        long_three_id=optional_id(7),
        long_four_id=optional_id(8),
        # Epoch milliseconds
        last_updated=int(datetime.datetime.timestamp(datetime.datetime.now())*1000)
    )
    bullpen.save()
    logger.debug(f'bullpen: {bullpen}')

    return convert_bullpen_to_strat(bullpen)
def advance_runners(play_id: int, num_bases: int, is_error: bool = False, only_forced: bool = False):
    """
    Records where each baserunner ends up on a play and tallies the RBIs

    play_id: id of the Play to update
    num_bases: number of bases the runners advance; 4 also scores the batter
    is_error: when True, runners that score do not add to the RBI total
    only_forced: when True, only runners pushed along by an occupied first base advance
    """
    play = Play.get_by_id(play_id)
    play.rbi = 0
    rbi_credit = 0 if is_error else 1

    def move_from_first():
        if num_bases > 2:
            play.on_first_final = 4
            play.rbi += rbi_credit
        elif num_bases == 2:
            play.on_first_final = 3
        elif num_bases == 1:
            play.on_first_final = 2
        else:
            play.on_first_final = 1

    def move_from_second():
        if num_bases > 1:
            play.on_second_final = 4
            play.rbi += rbi_credit
        elif num_bases == 1:
            play.on_second_final = 3
        else:
            play.on_second_final = 2

    # With first base empty nobody is forced, so every runner holds
    if only_forced and not play.on_first:
        if play.on_second:
            play.on_second_final = 2
        if play.on_third:
            play.on_third_final = 3
        play.save()
        db.close()
        return

    if only_forced:
        if play.on_second:
            # The runner on third is only forced when second is occupied as well
            if play.on_third and num_bases > 0:
                play.on_third_final = 4
                play.rbi += rbi_credit
            move_from_second()
        elif play.on_third:
            play.on_third_final = 3
        move_from_first()
    else:
        if play.on_third:
            if num_bases > 0:
                play.on_third_final = 4
                play.rbi += rbi_credit
            else:
                play.on_third_final = 3
        if play.on_second:
            move_from_second()
        if play.on_first:
            move_from_first()

    # The batter's own run always counts as an RBI here, even when is_error is set
    if num_bases == 4:
        play.batter_final = 4
        play.rbi += 1

    play.save()
    db.close()
def advance_one_runner(play_id: int, from_base: int, num_bases: int):
    """
    Moves the single runner starting on `from_base` forward by `num_bases`; RBIs are untouched

    Home is recorded as 4. Nothing is changed when `from_base` is not 1-3 or when `num_bases`
    does not move the runner at least one base.
    """
    play = Play.get_by_id(play_id)

    # (attribute to set, destination base), or None when the runner stays put
    target = None
    if from_base == 3:
        if num_bases > 0:
            target = ('on_third_final', 4)
    elif from_base == 2:
        if num_bases > 1:
            target = ('on_second_final', 4)
        elif num_bases == 1:
            target = ('on_second_final', 3)
    elif from_base == 1:
        if num_bases > 2:
            target = ('on_first_final', 4)
        elif num_bases == 2:
            target = ('on_first_final', 3)
        elif num_bases == 1:
            target = ('on_first_final', 2)

    if target is not None:
        setattr(play, *target)

    play.save()
    db.close()
def complete_play(play_id, batter_to_base: int = None):
    """
    Finalizes current play and sets base values for next play

    Marks the play complete, credits the runs scored on it, flags ties and lead changes, stores
    the play's RE24 value and then creates the following Play row.

    play_id: id of the Play to finalize
    batter_to_base: base the batter ends up on (1-3), 4 when the batter scores, or None when the
        batter does not reach base

    Returns the newly created next play, converted with convert_stratplay().
    """
    this_play = Play.get_by_id(play_id)

    this_play.locked = False
    this_play.complete = True
    this_play.save()

    logger.debug('starting the inning calc')
    new_inning_half = this_play.inning_half
    new_inning_num = this_play.inning_num
    # Baserunning-only events leave the same batter at the plate
    if this_play.runner or this_play.wild_pitch or this_play.passed_ball or this_play.pick_off or this_play.balk:
        new_batting_order = this_play.batting_order
    else:
        new_batting_order = this_play.batting_order + 1 if this_play.batting_order < 9 else 1
    new_bteam_id = this_play.batter.team_id
    new_pteam_id = this_play.pitcher.team_id
    new_starting_outs = this_play.starting_outs + this_play.outs
    # Single source of truth for "this play ended the half inning"; used for both the inning
    # rollover and the RE24 end state so the two can never disagree
    inning_over = new_starting_outs > 2
    new_on_first = None
    new_on_second = None
    new_on_third = None

    # patch to handle little league home runs TODO: standardize on just _on_final for these
    logger.debug(f'complete_play - this_play: {this_play}')
    if this_play.batter_final == 4 or batter_to_base == 4:
        this_play.run = 1
        score_increment = 1
        if not this_play.error:
            this_play.e_run = 1
    else:
        score_increment = 0
    logger.debug(f'complete_play - score_increment: {score_increment}')

    # A final base of 99 is cleared to None; a final base of 4 is a run scored
    if this_play.on_first_final == 99:
        this_play.on_first_final = None
    elif this_play.on_first_final == 4:
        score_increment += 1
        add_run_last_player_ab(this_play.on_first, this_play.error == 0)

    if this_play.on_second_final == 99:
        this_play.on_second_final = None
    elif this_play.on_second_final == 4:
        score_increment += 1
        add_run_last_player_ab(this_play.on_second, this_play.error == 0)

    if this_play.on_third_final == 99:
        this_play.on_third_final = None
    elif this_play.on_third_final == 4:
        score_increment += 1
        add_run_last_player_ab(this_play.on_third, this_play.error == 0)

    this_play.save()

    if inning_over:
        new_starting_outs = 0
        new_obc = 0
        if this_play.inning_half == 'Top':
            new_inning_half = 'Bot'
            new_bteam_id = this_play.game.home_team_id
            new_pteam_id = this_play.game.away_team_id
        else:
            new_inning_half = 'Top'
            new_inning_num += 1
            new_bteam_id = this_play.game.away_team_id
            new_pteam_id = this_play.game.home_team_id

        # The team coming to bat resumes from where its previous turn at bat ended
        if new_inning_num > 1:
            last_inning_play = get_last_inning_end_play(this_play.game.id, new_inning_half, new_inning_num - 1)
            if last_inning_play.runner or last_inning_play.pick_off:
                new_batting_order = last_inning_play.batting_order
            else:
                new_batting_order = last_inning_play.batting_order + 1 if last_inning_play.batting_order < 9 else 1
        else:
            new_batting_order = 1

    # Not an inning-ending play
    else:
        logger.debug('starting the obc calc')
        bases_occ = [False, False, False, False]

        # Set the occupied bases for the next play and lineup member occupying it
        for runner, base in [
            (this_play.on_first, this_play.on_first_final), (this_play.on_second, this_play.on_second_final),
            (this_play.on_third, this_play.on_third_final)
        ]:
            if base:
                bases_occ[base - 1] = True
                if base == 1:
                    new_on_first = runner
                elif base == 2:
                    new_on_second = runner
                elif base == 3:
                    new_on_third = runner

        # Set the batter-runner occupied base for the next play
        if batter_to_base:
            if batter_to_base == 1:
                bases_occ[0] = True
                new_on_first = this_play.batter
            elif batter_to_base == 2:
                bases_occ[1] = True
                new_on_second = this_play.batter
            elif batter_to_base == 3:
                bases_occ[2] = True
                new_on_third = this_play.batter
            this_play.batter_final = batter_to_base
            this_play.save()

        # Set the OBC based on the bases occupied
        if bases_occ[2]:
            if bases_occ[1]:
                if bases_occ[0]:
                    new_obc = 7
                else:
                    new_obc = 6
            elif bases_occ[0]:
                new_obc = 5
            else:
                new_obc = 3
        elif bases_occ[1]:
            if bases_occ[0]:
                new_obc = 4
            else:
                new_obc = 2
        elif bases_occ[0]:
            new_obc = 1
        else:
            new_obc = 0

    # Runs go to the team that was batting on this play
    if this_play.inning_half == 'Top':
        new_away_score = this_play.away_score + score_increment
        new_home_score = this_play.home_score
    else:
        new_away_score = this_play.away_score
        new_home_score = this_play.home_score + score_increment

    # A team score
    if score_increment:
        logger.debug(f'complete_play: \n\nscore_increment: {score_increment}\n\nnew home score: {new_home_score}\n\n'
                     f'new_away_score: {new_away_score}\n\nthis_play.away_score: {this_play.away_score}\n\n'
                     f'this_player.home_score: {this_play.home_score}')
        # Game is now tied
        if new_home_score == new_away_score:
            logger.debug(f'\n\nGame {this_play.game} is now tied\n\n')
            this_play.is_tied = 1

        # One team took the lead
        elif (this_play.away_score <= this_play.home_score) and (new_away_score > new_home_score):
            logger.debug(f'\n\nTeam {this_play.batter.team_id} took the lead\n\n')
            this_play.is_go_ahead = 1
            this_play.save()

        elif (this_play.home_score <= this_play.away_score) and (new_home_score > new_away_score):
            logger.debug(f'\n\nteam {this_play.batter.team_id} took the lead\n\n')
            this_play.is_go_ahead = 1
            this_play.save()

    # RE24 Calc: run expectancy keyed by on-base code, indexed by number of outs
    re_data = {
        0: [0.457, 0.231, 0.077],
        1: [0.793, 0.438, 0.171],
        2: [1.064, 0.596, 0.259],
        3: [1.340, 0.874, 0.287],
        4: [1.373, 0.772, 0.351],
        5: [1.687, 1.042, 0.406],
        6: [1.973, 1.311, 0.448],
        7: [2.295, 1.440, 0.618]
    }
    start_re24 = re_data[this_play.on_base_code][this_play.starting_outs]
    # No run expectancy remains once the half inning is over
    end_re24 = 0 if inning_over else re_data[new_obc][new_starting_outs]
    this_play.re24 = end_re24 - start_re24 + score_increment
    this_play.save()

    batter = get_one_lineup(this_play.game.id, team_id=new_bteam_id, batting_order=new_batting_order)
    batter_id = batter.id if batter else None
    pitcher = get_one_lineup(this_play.game_id, team_id=new_pteam_id, position='P')
    pitcher_id = pitcher.id if pitcher else None

    logger.debug('done the obc calc')
    next_play = Play.create(**{
        'game_id': this_play.game.id,
        'play_num': this_play.play_num + 1,
        'batter_id': batter_id,
        'pitcher_id': pitcher_id,
        'on_base_code': new_obc,
        'inning_half': new_inning_half,
        'inning_num': new_inning_num,
        'next_inning_num': new_inning_num,
        'batting_order': new_batting_order,
        'starting_outs': new_starting_outs,
        'away_score': new_away_score,
        'home_score': new_home_score,
        'on_first': new_on_first,
        'on_second': new_on_second,
        'on_third': new_on_third,
        'is_new_inning': 1 if new_inning_half != this_play.inning_half else 0
    })
    return_play = convert_stratplay(next_play)
    db.close()
    return return_play
def get_batting_stats(game_id, lineup_id: int = None, team_id: int = None):
    """
    Returns one dict of batting totals per batter who appears in the game's plays

    game_id: game whose plays are aggregated
    lineup_id: when given, only plays batted by this lineup entry are included
    team_id: when given (and lineup_id is not), only plays batted by this team are included

    Each dict carries the batter's own totals under 'pl_*' keys and the batting team's totals
    under 'tm_*' keys. The totals are window sums over the filtered plays.
    """
    batting_stats = Play.select(
        Play.batter,
        fn.SUM(Play.pa).over(partition_by=[Play.batter_id]).alias('pl_pa'),
        fn.SUM(Play.ab).over(partition_by=[Play.batter_id]).alias('pl_ab'),
        fn.SUM(Play.hit).over(partition_by=[Play.batter_id]).alias('pl_hit'),
        fn.SUM(Play.rbi).over(partition_by=[Play.batter_id]).alias('pl_rbi'),
        fn.SUM(Play.double).over(partition_by=[Play.batter_id]).alias('pl_double'),
        fn.SUM(Play.triple).over(partition_by=[Play.batter_id]).alias('pl_triple'),
        fn.SUM(Play.homerun).over(partition_by=[Play.batter_id]).alias('pl_homerun'),
        fn.SUM(Play.bb).over(partition_by=[Play.batter_id]).alias('pl_bb'),
        fn.SUM(Play.so).over(partition_by=[Play.batter_id]).alias('pl_so'),
        fn.SUM(Play.hbp).over(partition_by=[Play.batter_id]).alias('pl_hbp'),
        fn.SUM(Play.sac).over(partition_by=[Play.batter_id]).alias('pl_sac'),
        fn.SUM(Play.ibb).over(partition_by=[Play.batter_id]).alias('pl_ibb'),
        fn.SUM(Play.gidp).over(partition_by=[Play.batter_id]).alias('pl_gidp'),
        # The two runner-partitioned sums are kept in the query but are not used for the returned
        # 'pl_sb' / 'pl_cs'; see the per-batter counts below
        fn.SUM(Play.sb).over(partition_by=[Play.runner_id]).alias('pl_sb'),
        fn.SUM(Play.cs).over(partition_by=[Play.runner_id]).alias('pl_cs'),
        fn.SUM(Play.bphr).over(partition_by=[Play.batter_id]).alias('pl_bphr'),
        fn.SUM(Play.bpfo).over(partition_by=[Play.batter_id]).alias('pl_bpfo'),
        fn.SUM(Play.bp1b).over(partition_by=[Play.batter_id]).alias('pl_bp1b'),
        fn.SUM(Play.bplo).over(partition_by=[Play.batter_id]).alias('pl_bplo'),
        fn.SUM(Play.pa).over(partition_by=[Play.batter.team_id]).alias('tm_pa'),
        fn.SUM(Play.ab).over(partition_by=[Play.batter.team_id]).alias('tm_ab'),
        fn.SUM(Play.hit).over(partition_by=[Play.batter.team_id]).alias('tm_hit'),
        fn.SUM(Play.rbi).over(partition_by=[Play.batter.team_id]).alias('tm_rbi'),
        fn.SUM(Play.double).over(partition_by=[Play.batter.team_id]).alias('tm_double'),
        fn.SUM(Play.triple).over(partition_by=[Play.batter.team_id]).alias('tm_triple'),
        fn.SUM(Play.homerun).over(partition_by=[Play.batter.team_id]).alias('tm_homerun'),
        fn.SUM(Play.bb).over(partition_by=[Play.batter.team_id]).alias('tm_bb'),
        fn.SUM(Play.so).over(partition_by=[Play.batter.team_id]).alias('tm_so'),
        fn.SUM(Play.hbp).over(partition_by=[Play.batter.team_id]).alias('tm_hbp'),
        fn.SUM(Play.sac).over(partition_by=[Play.batter.team_id]).alias('tm_sac'),
        fn.SUM(Play.ibb).over(partition_by=[Play.batter.team_id]).alias('tm_ibb'),
        fn.SUM(Play.gidp).over(partition_by=[Play.batter.team_id]).alias('tm_gidp'),
        fn.SUM(Play.sb).over(partition_by=[Play.batter.team_id]).alias('tm_sb'),
        fn.SUM(Play.cs).over(partition_by=[Play.batter.team_id]).alias('tm_cs'),
        fn.SUM(Play.bphr).over(partition_by=[Play.batter.team_id]).alias('tm_bphr'),
        fn.SUM(Play.bpfo).over(partition_by=[Play.batter.team_id]).alias('tm_bpfo'),
        fn.SUM(Play.bp1b).over(partition_by=[Play.batter.team_id]).alias('tm_bp1b'),
        fn.SUM(Play.bplo).over(partition_by=[Play.batter.team_id]).alias('tm_bplo'),
    ).join(Lineup, on=Play.batter).where(Play.game_id == game_id)

    if lineup_id is not None:
        batting_stats = batting_stats.where(Play.batter_id == lineup_id)
    elif team_id is not None:
        batting_stats = batting_stats.where(Play.batter.team_id == team_id)

    # The window query yields one row per play, so keep only the first row seen for each batter
    done_batters = set()
    return_batters = []
    for x in batting_stats:
        if x.batter_id in done_batters:
            continue
        done_batters.add(x.batter_id)

        # A run is scored by whichever lineup entry reaches base 4, from any starting base
        runs_scored = Play.select(Play.pa).where(
            ((Play.on_first == x.batter) & (Play.on_first_final == 4)) |
            ((Play.on_second == x.batter) & (Play.on_second_final == 4)) |
            ((Play.on_third == x.batter) & (Play.on_third_final == 4)) |
            ((Play.batter == x.batter) & (Play.batter_final == 4))
        ).count()
        # Steals and caught-stealings happen on plays where this player is the runner, not the
        # batter, so they are counted directly rather than read from the batter-filtered rows
        stolen_bases = Play.select(Play.pa).where(
            (Play.runner == x.batter) & (Play.sb == 1)
        ).count()
        caught_stealing = Play.select(Play.pa).where(
            (Play.runner == x.batter) & (Play.cs == 1)
        ).count()

        return_batters.append({
            'batter_id': x.batter_id,
            'card_id': x.batter.card_id,
            'team_id': x.batter.team_id,
            'pos': x.batter.position,
            'pl_run': runs_scored,
            'pl_pa': x.pl_pa,
            'pl_ab': x.pl_ab,
            'pl_hit': x.pl_hit,
            'pl_rbi': x.pl_rbi,
            'pl_double': x.pl_double,
            'pl_triple': x.pl_triple,
            'pl_homerun': x.pl_homerun,
            'pl_bb': x.pl_bb,
            'pl_so': x.pl_so,
            'pl_hbp': x.pl_hbp,
            'pl_sac': x.pl_sac,
            'pl_ibb': x.pl_ibb,
            'pl_gidp': x.pl_gidp,
            'pl_sb': stolen_bases,
            'pl_cs': caught_stealing,
            'pl_bphr': x.pl_bphr,
            'pl_bpfo': x.pl_bpfo,
            'pl_bp1b': x.pl_bp1b,
            'pl_bplo': x.pl_bplo,
            'tm_pa': x.tm_pa,
            'tm_ab': x.tm_ab,
            'tm_hit': x.tm_hit,
            'tm_rbi': x.tm_rbi,
            'tm_double': x.tm_double,
            'tm_triple': x.tm_triple,
            'tm_homerun': x.tm_homerun,
            'tm_bb': x.tm_bb,
            'tm_so': x.tm_so,
            'tm_hbp': x.tm_hbp,
            'tm_sac': x.tm_sac,
            'tm_ibb': x.tm_ibb,
            'tm_gidp': x.tm_gidp,
            'tm_sb': x.tm_sb,
            'tm_cs': x.tm_cs,
            'tm_bphr': x.tm_bphr,
            'tm_bpfo': x.tm_bpfo,
            'tm_bp1b': x.tm_bp1b,
            'tm_bplo': x.tm_bplo,
        })

    db.close()
    return return_batters
def get_fielding_stats(game_id, lineup_id: int = None, team_id: int = None):
    """
    Returns fielding and catching totals for the game as a list of dicts

    game_id: game whose plays are aggregated
    lineup_id: when given, only this lineup entry's plays (as defender, and as catcher) are included
    team_id: when given (and lineup_id is not), only plays fielded by this team are included

    The list holds one dict per defender card followed by one dict per catcher. Both kinds share
    the same keys; the counters that do not apply to a row are 0.
    """
    fielding_stats = Play.select(
        Play.defender,
        Play.check_pos,
        fn.SUM(Play.error).over(partition_by=[Play.defender_id]).alias('pl_error'),
        fn.SUM(Play.hit).over(partition_by=[Play.defender_id]).alias('pl_xhit'),
        fn.COUNT(Play.defender).over(partition_by=[Play.defender_id]).alias('pl_xch'),
        fn.SUM(Play.error).over(partition_by=[Play.defender.team_id]).alias('tm_error'),
    ).join(Lineup, on=Play.defender).where(Play.game_id == game_id)

    if lineup_id is not None:
        fielding_stats = fielding_stats.where(Play.defender_id == lineup_id)
    elif team_id is not None:
        fielding_stats = fielding_stats.where(Play.defender.team_id == team_id)

    # The window queries yield one row per play, so only the first row per card / catcher is kept
    added_card_ids = set()
    all_stats = []
    for x in fielding_stats:
        card_id = x.defender.card_id
        if card_id in added_card_ids:
            continue
        added_card_ids.add(card_id)
        all_stats.append({
            'defender_id': x.defender_id,
            'card_id': card_id,
            'team_id': x.defender.team_id,
            'pos': x.check_pos,
            'pl_error': x.pl_error,
            'pl_xhit': x.pl_xhit,
            'pl_xch': x.pl_xch,
            'tm_error': x.tm_error,
            'pl_pb': 0,
            'pl_sbc': 0,
            'pl_csc': 0
        })

    catching_stats = Play.select(
        Play.catcher,
        fn.SUM(Play.passed_ball).over(partition_by=[Play.catcher_id]).alias('pl_pb'),
        fn.SUM(Play.sb).over(partition_by=[Play.catcher_id]).alias('pl_sbc'),
        fn.SUM(Play.cs).over(partition_by=[Play.catcher_id]).alias('pl_csc')
    ).join(Lineup, on=Play.catcher).where(Play.game_id == game_id)

    # Catching totals are filtered on the catcher column, not on the defender
    if lineup_id is not None:
        catching_stats = catching_stats.where(Play.catcher_id == lineup_id)
    elif team_id is not None:
        catching_stats = catching_stats.where(Play.catcher.team_id == team_id)

    added_catcher_ids = set()
    for x in catching_stats:
        if x.catcher_id in added_catcher_ids:
            continue
        added_catcher_ids.add(x.catcher_id)
        all_stats.append({
            'defender_id': x.catcher_id,
            'card_id': x.catcher.card_id,
            'team_id': x.catcher.team_id,
            'pos': 'C',
            'pl_error': 0,
            'pl_xhit': 0,
            'pl_xch': 0,
            'tm_error': 0,
            'pl_pb': x.pl_pb,
            'pl_sbc': x.pl_sbc,
            'pl_csc': x.pl_csc
        })

    logger.debug(f'fielding_stats: {all_stats}')
    db.close()
    return all_stats
# def new_get_batting_stats(game_id, lineup_id: int = None, team_id: int = None):
# return_stats = []
#
# if lineup_id is not None:
# batting_stats = Play.select(
# Play.batter,
# fn.SUM(Play.pa).over(partition_by=[Play.batter_id]).alias('pl_pa'),
# fn.SUM(Play.ab).over(partition_by=[Play.batter_id]).alias('pl_ab'),
# fn.SUM(Play.hit).over(partition_by=[Play.batter_id]).alias('pl_hit'),
# fn.SUM(Play.rbi).over(partition_by=[Play.batter_id]).alias('pl_rbi'),
# fn.SUM(Play.double).over(partition_by=[Play.batter_id]).alias('pl_double'),
# fn.SUM(Play.triple).over(partition_by=[Play.batter_id]).alias('pl_triple'),
# fn.SUM(Play.homerun).over(partition_by=[Play.batter_id]).alias('pl_homerun'),
# fn.SUM(Play.bb).over(partition_by=[Play.batter_id]).alias('pl_bb'),
# fn.SUM(Play.so).over(partition_by=[Play.batter_id]).alias('pl_so'),
# fn.SUM(Play.hbp).over(partition_by=[Play.batter_id]).alias('pl_hbp'),
# fn.SUM(Play.sac).over(partition_by=[Play.batter_id]).alias('pl_sac'),
# fn.SUM(Play.ibb).over(partition_by=[Play.batter_id]).alias('pl_ibb'),
# fn.SUM(Play.gidp).over(partition_by=[Play.batter_id]).alias('pl_gidp'),
# fn.SUM(Play.sb).over(partition_by=[Play.batter_id]).alias('pl_sb'),
# fn.SUM(Play.cs).over(partition_by=[Play.batter_id]).alias('pl_cs'),
# fn.SUM(Play.bphr).over(partition_by=[Play.batter_id]).alias('pl_bphr'),
# fn.SUM(Play.bpfo).over(partition_by=[Play.batter_id]).alias('pl_bpfo'),
# fn.SUM(Play.bp1b).over(partition_by=[Play.batter_id]).alias('pl_bp1b'),
# fn.SUM(Play.bplo).over(partition_by=[Play.batter_id]).alias('pl_bplo')
# ).join(Lineup, on=Play.batter).where((Play.game_id == game_id) & (Play.batter_id == lineup_id))
#
# done_batters = []
# for x in batting_stats:
# if x.batter.id not in done_batters:
# return_stats.append({
# 'batter_id': x.batter_id,
# 'pl_pa': x.pl_pa,
# 'pl_ab': x.pl_ab,
# 'pl_hit': x.pl_hit,
# 'pl_rbi': x.pl_rbi,
# 'pl_double': x.pl_double,
# 'pl_triple': x.pl_triple,
# 'pl_homerun': x.pl_homerun,
# 'pl_bb': x.pl_bb,
# 'pl_so': x.pl_so,
# 'pl_hbp': x.pl_hbp,
# 'pl_sac': x.pl_sac,
# 'pl_ibb': x.pl_ibb,
# 'pl_gidp': x.pl_gidp,
# 'pl_sb': x.pl_sb,
# 'pl_cs': x.pl_cs,
# 'pl_bphr': x.pl_bphr,
# 'pl_bpfo': x.pl_bpfo,
# 'pl_bp1b': x.pl_bp1b,
# 'pl_bplo': x.pl_bplo,
# })
# done_batters.append(x.batter.id)
# logger.info(f'batting stats: {return_stats}')
#
# elif team_id is not None:
# batting_stats = Play.select(
# fn.SUM(Play.pa).over(partition_by=[Play.batter.team_id]).alias('tm_pa'),
# fn.SUM(Play.ab).over(partition_by=[Play.batter.team_id]).alias('tm_ab'),
# fn.SUM(Play.hit).over(partition_by=[Play.batter.team_id]).alias('tm_hit'),
# fn.SUM(Play.rbi).over(partition_by=[Play.batter.team_id]).alias('tm_rbi'),
# fn.SUM(Play.double).over(partition_by=[Play.batter.team_id]).alias('tm_double'),
# fn.SUM(Play.triple).over(partition_by=[Play.batter.team_id]).alias('tm_triple'),
# fn.SUM(Play.homerun).over(partition_by=[Play.batter.team_id]).alias('tm_homerun'),
# fn.SUM(Play.bb).over(partition_by=[Play.batter.team_id]).alias('tm_bb'),
# fn.SUM(Play.so).over(partition_by=[Play.batter.team_id]).alias('tm_so'),
# fn.SUM(Play.hbp).over(partition_by=[Play.batter.team_id]).alias('tm_hbp'),
# fn.SUM(Play.sac).over(partition_by=[Play.batter.team_id]).alias('tm_sac'),
# fn.SUM(Play.ibb).over(partition_by=[Play.batter.team_id]).alias('tm_ibb'),
# fn.SUM(Play.gidp).over(partition_by=[Play.batter.team_id]).alias('tm_gidp'),
# fn.SUM(Play.sb).over(partition_by=[Play.batter.team_id]).alias('tm_sb'),
# fn.SUM(Play.cs).over(partition_by=[Play.batter.team_id]).alias('tm_ccs'),
# fn.SUM(Play.bphr).over(partition_by=[Play.batter.team_id]).alias('tm_bphr'),
# fn.SUM(Play.bpfo).over(partition_by=[Play.batter.team_id]).alias('tm_bpfo'),
# fn.SUM(Play.bp1b).over(partition_by=[Play.batter.team_id]).alias('tm_bp1b'),
# fn.SUM(Play.bplo).over(partition_by=[Play.batter.team_id]).alias('tm_bplo')
# ).join(Lineup, on=Play.batter).where((Play.game_id == game_id) & (Play.batter.team_id == team_id))
#
# logger.info(f'batting_stats count: {batting_stats.count()}')
# done_batters = []
# for x in batting_stats:
# if x.batter.id not in done_batters:
# return_stats.append({
# 'tm_pa': x.tm_pa,
# 'tm_ab': x.tm_ab,
# 'tm_hit': x.tm_hit,
# 'tm_rbi': x.tm_rbi,
# 'tm_double': x.tm_double,
# 'tm_triple': x.tm_triple,
# 'tm_homerun': x.tm_homerun,
# 'tm_bb': x.tm_bb,
# 'tm_so': x.tm_so,
# 'tm_hbp': x.tm_hbp,
# 'tm_sac': x.tm_sac,
# 'tm_ibb': x.tm_ibb,
# 'tm_gidp': x.tm_gidp,
# 'tm_sb': x.tm_sb,
# 'tm_cs': x.tm_cs,
# 'tm_bphr': x.tm_bphr,
# 'tm_bpfo': x.tm_bpfo,
# 'tm_bp1b': x.tm_bp1b,
# 'tm_bplo': x.tm_bplo,
# })
# done_batters.append(x.batter.id)
#
# else:
# raise DatabaseError(f'Either lineup_id or team_id must be specified.')
#
# db.close()
# return return_stats
# def get_final_batting_stats(game_id: int, team_id: int):
# team_lineups = Lineup.select().where(
# (Lineup.game_id == game_id) & (Lineup.team_id == team_id) & (Lineup.batting_order < 10)
# ).order_by(Lineup.batting_order)
# logger.info(f'team_lineups count: {team_lineups.count()} / lineups: {team_lineups}')
#
# all_stats = []
# for line in team_lineups:
# all_stats.append(get_batting_stats(game_id, lineup_id=line.id))
#
# return all_stats
def get_pitching_stats(
        game_id, lineup_id: int = None, team_id: int = None, in_pow: bool = None, in_innings: list = None):
    """
    Returns one dict of pitching totals per pitcher who appears in the game's plays

    game_id: game whose plays are aggregated
    lineup_id: when given, only plays thrown by this lineup entry are included
    team_id: when given, only plays thrown by this team are included and 'tm_eruns' is filled in
    in_pow: when given, only plays whose in_pow flag equals this value are included
    in_innings: inning numbers counted toward 'pl_in_runs'; defaults to innings 1 through 29

    Each dict carries the pitcher's own totals under 'pl_*' keys and the pitching team's totals
    under 'tm_*' keys. 'tm_eruns' is None unless team_id is given.
    """
    if in_innings is None:
        in_innings = list(range(1, 30))
    logger.info(f'db_calls_gameplay - get_pitching_stats - in_innings: {in_innings}')
    pitching_stats = Play.select(
        Play.pitcher,
        fn.SUM(Play.outs).over(partition_by=[Play.pitcher_id]).alias('pl_outs'),
        fn.SUM(Play.hit).over(partition_by=[Play.pitcher_id]).alias('pl_hit'),
        fn.COUNT(Play.on_first_final).filter(
            Play.on_first_final == 4).over(partition_by=[Play.pitcher_id]).alias('pl_run_first'),
        fn.COUNT(Play.on_second_final).filter(
            Play.on_second_final == 4).over(partition_by=[Play.pitcher_id]).alias('pl_run_second'),
        fn.COUNT(Play.on_third_final).filter(
            Play.on_third_final == 4).over(partition_by=[Play.pitcher_id]).alias('pl_run_third'),
        fn.COUNT(Play.batter_final).filter(
            Play.batter_final == 4).over(partition_by=[Play.pitcher_id]).alias('pl_run_batter'),
        fn.COUNT(Play.on_first_final).filter(
            (Play.on_first_final == 4) & (Play.inning_num << in_innings)).over(partition_by=[Play.pitcher_id]).alias('pl_in_run_first'),
        fn.COUNT(Play.on_second_final).filter(
            (Play.on_second_final == 4) & (Play.inning_num << in_innings)).over(partition_by=[Play.pitcher_id]).alias('pl_in_run_second'),
        fn.COUNT(Play.on_third_final).filter(
            (Play.on_third_final == 4) & (Play.inning_num << in_innings)).over(partition_by=[Play.pitcher_id]).alias('pl_in_run_third'),
        fn.COUNT(Play.batter_final).filter(
            (Play.batter_final == 4) & (Play.inning_num << in_innings)).over(partition_by=[Play.pitcher_id]).alias('pl_in_run_batter'),
        fn.SUM(Play.so).over(partition_by=[Play.pitcher_id]).alias('pl_so'),
        fn.SUM(Play.bb).over(partition_by=[Play.pitcher_id]).alias('pl_bb'),
        fn.SUM(Play.hbp).over(partition_by=[Play.pitcher_id]).alias('pl_hbp'),
        fn.SUM(Play.wild_pitch).over(partition_by=[Play.pitcher_id]).alias('pl_wild_pitch'),
        fn.SUM(Play.balk).over(partition_by=[Play.pitcher_id]).alias('pl_balk'),
        fn.SUM(Play.homerun).over(partition_by=[Play.pitcher_id]).alias('pl_homerun'),
        fn.SUM(Play.outs).over(partition_by=[Play.pitcher.team_id]).alias('tm_outs'),
        fn.SUM(Play.hit).over(partition_by=[Play.pitcher.team_id]).alias('tm_hit'),
        fn.COUNT(Play.on_first_final).filter(
            Play.on_first_final == 4).over(partition_by=[Play.pitcher.team_id]).alias('tm_run_first'),
        fn.COUNT(Play.on_second_final).filter(
            Play.on_second_final == 4).over(partition_by=[Play.pitcher.team_id]).alias('tm_run_second'),
        fn.COUNT(Play.on_third_final).filter(
            Play.on_third_final == 4).over(partition_by=[Play.pitcher.team_id]).alias('tm_run_third'),
        fn.COUNT(Play.batter_final).filter(
            Play.batter_final == 4).over(partition_by=[Play.pitcher.team_id]).alias('tm_run_batter'),
        fn.SUM(Play.so).over(partition_by=[Play.pitcher.team_id]).alias('tm_so'),
        fn.SUM(Play.bb).over(partition_by=[Play.pitcher.team_id]).alias('tm_bb'),
        fn.SUM(Play.hbp).over(partition_by=[Play.pitcher.team_id]).alias('tm_hbp'),
        fn.SUM(Play.wild_pitch).over(partition_by=[Play.pitcher.team_id]).alias('tm_wild_pitch'),
        fn.SUM(Play.balk).over(partition_by=[Play.pitcher.team_id]).alias('tm_balk'),
        fn.SUM(Play.homerun).over(partition_by=[Play.pitcher.team_id]).alias('tm_homerun'),
    ).join(Lineup, on=Play.pitcher).where(Play.game_id == game_id)
    logger.debug(f'db_calls_gameplay - get_pitching_stats - pitching_stats: {pitching_stats}')

    def count_earned_runs(scope):
        # Earned runs are counted one scoring slot at a time: a single combined
        # "any runner scored" query would count a play with several runs only once.
        total = 0
        for final_base in (Play.on_first_final, Play.on_second_final, Play.on_third_final, Play.batter_final):
            total += Play.select().where(
                (final_base == 4) & (Play.error == 0)
            ).join(Lineup, on=Play.pitcher).where((Play.game_id == game_id) & scope).count()
        return total

    if lineup_id is not None:
        pitching_stats = pitching_stats.where(Play.pitcher_id == lineup_id)

    if in_pow is not None:
        pitching_stats = pitching_stats.where(Play.in_pow == in_pow)

    tm_earned_runs = None
    if team_id is not None:
        pitching_stats = pitching_stats.where(Play.pitcher.team_id == team_id)
        tm_earned_runs = count_earned_runs(Play.pitcher.team_id == team_id)

    # The window query yields one row per play, so keep only the first row seen for each pitcher
    done_pitchers = set()
    return_pitchers = []
    for x in pitching_stats:
        if x.pitcher_id in done_pitchers:
            continue
        done_pitchers.add(x.pitcher_id)
        return_pitchers.append({
            'pitcher_id': x.pitcher_id,
            'card_id': x.pitcher.card_id,
            'team_id': x.pitcher.team_id,
            'pl_outs': x.pl_outs,
            'pl_hit': x.pl_hit,
            # Counted for this row's pitcher so that calls without lineup_id (e.g. a whole
            # team) still report each pitcher's own earned runs
            'pl_eruns': count_earned_runs(Play.pitcher_id == x.pitcher_id),
            'pl_runs': x.pl_run_first + x.pl_run_second + x.pl_run_third + x.pl_run_batter,
            'pl_in_runs': x.pl_in_run_first + x.pl_in_run_second + x.pl_in_run_third + x.pl_in_run_batter,
            'pl_so': x.pl_so,
            'pl_bb': x.pl_bb,
            'pl_hbp': x.pl_hbp,
            'pl_homerun': x.pl_homerun,
            'pl_wild_pitch': x.pl_wild_pitch,
            'pl_balk': x.pl_balk,
            'tm_outs': x.tm_outs,
            'tm_hit': x.tm_hit,
            'tm_eruns': tm_earned_runs,
            'tm_runs': x.tm_run_first + x.tm_run_second + x.tm_run_third + x.tm_run_batter,
            'tm_so': x.tm_so,
            'tm_bb': x.tm_bb,
            'tm_hbp': x.tm_hbp,
            'tm_homerun': x.tm_homerun,
            'tm_wild_pitch': x.tm_wild_pitch,
            'tm_balk': x.tm_balk,
            # A lineup entry with after_play == 0 was in the game from the first play
            'pl_gs': 1 if x.pitcher.after_play == 0 else 0
        })

    db.close()
    logger.debug(f'pitching stats: {return_pitchers}')
    return return_pitchers
def get_pitching_decisions(game: StratGame, db_game_id: int):
    """Crawl a game's plays and build one decision record per pitcher used.

    Both starting pitchers (the Lineup rows with position 'P' for each team)
    are seeded with ``is_start=True``. Every play of the game is then examined
    to follow pitching changes, the pitcher currently in line for a save, and
    the pitchers of record for the win and the loss.

    Args:
        game: Game whose plays are crawled; ``id``, ``season``, ``week_num``,
            ``away_team_id`` and ``home_team_id`` are read from it.
        db_game_id: Value stored as ``game_id`` on every DecisionModel.

    Returns:
        A list with one ``DecisionModel.dict()`` per pitcher seen in the game.

    Raises:
        AttributeError: If no winner or loser is set once the crawl finishes
            (e.g. the last flagged play left the game tied), because the
            win/loss assignment dereferences ``None``.
    """
    logger.debug(f'this game: {game}')

    def new_decision(pitcher, **extra):
        # Shared constructor so every record carries the same game metadata
        return DecisionModel(
            game_id=db_game_id,
            season=game.season,
            week=game.week_num,
            pitcher_id=pitcher.player_id,
            pitcher_team_id=pitcher.team_id,
            **extra
        )

    # Starting pitchers; these names are re-pointed at each team's current
    # pitcher as the play crawl encounters substitutions
    away_pitcher = Lineup.get(Lineup.game_id == game.id, Lineup.team_id == game.away_team_id, Lineup.position == 'P')
    home_pitcher = Lineup.get(Lineup.game_id == game.id, Lineup.team_id == game.home_team_id, Lineup.position == 'P')
    logger.debug(f'SPs: {away_pitcher} / {home_pitcher}')

    # { <player_id>: DecisionModel }
    decisions = {
        away_pitcher.player_id: new_decision(away_pitcher, is_start=True),
        home_pitcher.player_id: new_decision(home_pitcher, is_start=True),
    }

    winner = None
    loser = None
    save = None
    b_save = []
    holds = []

    for x in Play.select().where(Play.game_id == game.id):
        logger.debug(f'checking play num {x.play_num}')

        if x.inning_half == 'Top' and home_pitcher != x.pitcher:
            # Home pitching change: settle the outgoing pitcher's save chance
            if save == home_pitcher:
                if x.home_score > x.away_score:
                    holds.append(save)
                else:
                    b_save.append(save)
                save = None
            home_pitcher = x.pitcher
            # Entering with a lead of three runs or fewer opens a save chance
            if x.home_score > x.away_score and x.home_score - x.away_score <= 3:
                save = home_pitcher
        elif x.inning_half == 'Bot' and away_pitcher != x.pitcher:
            # Away pitching change: same bookkeeping from the other side
            if save == away_pitcher:
                if x.away_score > x.home_score:
                    holds.append(save)
                else:
                    b_save.append(save)
                save = None
            away_pitcher = x.pitcher
            if x.away_score > x.home_score and x.away_score - x.home_score <= 3:
                save = away_pitcher

        if x.is_go_ahead:
            logger.debug(f'is go ahead: {x}')
            # A final base of 4 means that runner (or the batter) scored
            if 4 in (x.on_third_final, x.on_second_final, x.on_first_final, x.batter_final):
                # The loss is charged to the pitcher on the mound for this play
                loser = x.pitcher
                if save == loser:
                    b_save.append(save)
                    save = None
                winner = home_pitcher if x.inning_half == 'Bot' else away_pitcher

        if x.is_tied:
            logger.debug(f'is tied: {x}')
            # A tie wipes the pitchers of record and blows any pending save
            winner, loser = None, None
            if save:
                b_save.append(save)
                save = None

        # Make sure both current pitchers have a record before moving on
        for pitcher in (home_pitcher, away_pitcher):
            if pitcher.player_id not in decisions:
                decisions[pitcher.player_id] = new_decision(pitcher)

    decisions[winner.player_id].win = 1
    decisions[loser.player_id].loss = 1
    if save is not None:
        decisions[save.player_id].is_save = 1
    for x in holds:
        decisions[x.player_id].hold = 1
    for x in b_save:
        decisions[x.player_id].b_save = 1

    return [x.dict() for x in decisions.values()]
# for count, pit_pair in enumerate(away_pitchers):
# """
# If starter & team won
# check for win condition
# if starter & team lost
# check for loss condition
# if reliever & team won
# check for save situation
# if save situation, check for blown save
# credit either BS or Save; if Save is not None, add Save to holds and replace with this pitcher
# if reliever & team lost
# check for loss condition
# """
# # If starter & team won
# if pit_pair[0].after_play == 0 and (last_play.away_score > last_play.home_score):
# if len(away_pitchers) == 1:
# winner = pit_pair[0]
# else:
# next_p_first = away_pitchers[count + 1][1]
# if next_p_first.away_score > next_p_first.home_score:
# winner = pit_pair[0]
# else:
# winner = away_pitchers[count + 1][0]
# # if starter & team lost
# elif pit_pair[0].after_play == 0:
# if len(away_pitchers) == 1:
# loser = pit_pair[0]
# else:
# next_p_first = away_pitchers[count + 1][1]
# if next_p_first.away_score < next_p_first.home_score:
# def get_final_scorebug(away_team, home_team, away_score, home_score):
# return f'```' \
# f'Team | R | H | E |\n' \
# f'{away_team["abbrev"].replace("Gauntlet-", ""): <4} | {away_stats["score"]: >2} | ' \
# f'{away_stats["hits"]: >2} | ' \
# f'{away_stats["f_lines"][0]["tm_error"] if away_stats["f_lines"] else 0: >2} |\n' \
# f'{home_team["abbrev"].replace("Gauntlet-", ""): <4} | {home_stats["score"]: >2} | ' \
# f'{home_stats["hits"]: >2} | ' \
# f'{home_stats["f_lines"][0]["tm_error"] if home_stats["f_lines"] else 0: >2} |\n' \
# f'```'
class StratManagerAi(pydantic.BaseModel):
    """Tendency ratings for an AI manager and the advice strings built from them.

    Rating Rule of Thumb:
        1: Least Aggressive
        5: Average
        10: Most Aggressive
    """
    id: int
    name: str
    steal: int = 5
    running: int = 5
    hold: int = 5
    catcher_throw: int = 5
    uncapped_home: int = 5
    uncapped_third: int = 5
    uncapped_trail: int = 5
    bullpen_matchup: int = 5
    behind_aggression: int = 5
    ahead_aggression: int = 5
    decide_throw: int = 5

    def check_jump(self, to_base: int, outs: int) -> Optional[str]:
        """Return the steal instruction to append to the AI note.

        Returns None unless ``to_base`` is 2 or 3. A ``steal`` rating of 10
        going to second returns a fixed auto-jump instruction; every other
        case picks a safe-range threshold from ``steal`` and shifts it by +1
        with two outs or -1 with none.
        """
        if to_base != 2 and to_base != 3:
            return None

        steal_base = 'attempt to steal'
        if self.steal == 10:
            if to_base == 2:
                return f'{steal_base} second if the runner has an ***** auto-jump or the safe range is 13+'
            steal_range = 13
        elif self.steal >= 8:
            steal_range = 14
        elif self.steal >= 5:
            steal_range = 15
        elif self.steal >= 3:
            steal_range = 16
        else:
            steal_range = 17

        if outs == 2:
            steal_range += 1
        elif outs == 0:
            steal_range -= 1

        return f'{steal_base} {"second" if to_base == 2 else "third"} if their safe range is {steal_range}+'

    def tag_from_second(self, outs: int) -> str:
        """Return the tag-up instruction for a runner on second.

        The threshold comes from ``running`` and is raised by 3 with two outs
        or lowered by 2 with none.
        """
        tag_base = 'attempt to tag up if their safe range is'
        if self.running >= 8:
            tag_range = 5
        elif self.running >= 5:
            tag_range = 10
        else:
            tag_range = 12

        if outs == 2:
            tag_range += 3
        elif outs == 0:
            tag_range -= 2

        return f'{tag_base} {tag_range}+'

    def tag_from_third(self, outs: int) -> str:
        """Return the tag-up instruction for a runner on third.

        The threshold comes from ``running`` and is lowered by 2 with two outs
        or raised by 2 with none.
        """
        tag_base = 'attempt to tag up if their safe range is'
        if self.running >= 8:
            tag_range = 8
        elif self.running >= 5:
            tag_range = 10
        else:
            tag_range = 12

        if outs == 2:
            tag_range -= 2
        elif outs == 0:
            tag_range += 2

        return f'{tag_base} {tag_range}+'

    def uncapped_advance(self, to_base: int, outs: int) -> str:
        """Return the instruction for an uncapped advance to ``to_base``.

        ``to_base == 3`` is driven by ``uncapped_third`` (ratings below 5 never
        try); any other value is treated as an advance home and is driven by
        ``uncapped_home``.
        """
        advance_base = 'attempt to advance if their safe range is'

        if to_base == 3:
            if self.uncapped_third >= 8:
                advance_range = 14
            elif self.uncapped_third >= 5:
                advance_range = 18
            else:
                return 'not attempt to advance'

            if outs == 2:
                advance_range += 2
        else:
            if self.uncapped_home >= 8:
                advance_range = 8
            elif self.uncapped_home >= 5:
                advance_range = 10
            else:
                advance_range = 12

            if outs == 2:
                advance_range -= 2
            elif outs == 0:
                advance_range += 3

        return f'{advance_base} {advance_range}+'

    def trail_advance(self, to_base: int, outs: int, sent_home: bool = False) -> str:
        """Return the advancement instruction for the trail runner.

        ``to_base`` is accepted but not consulted. When ``sent_home`` is true
        the answer depends on ``uncapped_trail`` and, for mid ratings, on
        whether there are two outs; otherwise only the threshold varies.
        """
        advance_base = 'attempt to advance if their safe range is'

        if sent_home:
            if self.uncapped_trail >= 8:
                return 'attempt to advance'
            elif self.uncapped_trail >= 5:
                if outs == 2:
                    return 'attempt to advance'
                advance_range = 14
            else:
                return 'not attempt to advance'
        else:
            if self.uncapped_trail >= 8:
                advance_range = 14
            else:
                advance_range = 16

        return f'{advance_base} {advance_range}+'

    def throw_lead_runner(self, to_base: int, outs: int) -> str:
        """Return the throw instruction; currently constant for all inputs."""
        return 'throw for the lead runner'

    def throw_which_runner(self, to_base: int, outs: int) -> str:
        """Return the throw instruction, conditional unless the lead runner is going home."""
        if to_base == 4:
            return 'throw for the lead runner'
        return 'throw for the lead runner if their safe range is 14-'

    def gb_decide_advance(self, starting_outs: int, run_lead: int) -> str:
        """Return the ground-ball advancement instruction.

        The threshold comes from ``running``, is adjusted for one out and for
        the run lead, and is capped at 20.
        """
        advance_base = 'attempt to advance if their safe range is'
        if self.running >= 8:
            advance_range = 10
        elif self.running >= 5:
            advance_range = 13
        else:
            advance_range = 16

        if starting_outs == 1:
            advance_range += 3

        if run_lead >= 4:
            advance_range -= 3
        elif run_lead < 0:
            advance_range += 3

        return f'{advance_base} {min(advance_range, 20)}+'

    def gb_decide_throw(self, starting_outs: int, run_lead: int) -> str:
        """Return the ground-ball throw instruction.

        The threshold comes from ``decide_throw``, is adjusted for one out and
        for the run lead, and is floored at 0.
        """
        throw_base = 'throw for the lead runner if their safe range is'
        if self.decide_throw >= 8:
            throw_range = 13
        elif self.decide_throw >= 5:
            throw_range = 10
        else:
            throw_range = 8

        if starting_outs == 1:
            throw_range -= 3

        if run_lead >= 4:
            throw_range -= 3
        elif run_lead < 0:
            throw_range += 3

        return f'{throw_base} {max(throw_range, 0)}-'

    def go_to_reliever(
            self, this_play, tot_allowed: int, is_starter: bool = False) -> bool:
        """Decide whether the AI should replace the current pitcher.

        Args:
            this_play: Current play; ``ai_run_diff()``, ``on_base_code``,
                ``starting_outs`` and ``inning_num`` are read from it.
            tot_allowed: Count supplied by the caller and compared against the
                lead — presumably what the current pitcher has allowed; TODO
                confirm against the caller.
            is_starter: Whether the current pitcher is the starter.

        Returns:
            False when one of the numbered "stay" rules applies (the rule
            number is logged), True otherwise.
        """
        run_lead = this_play.ai_run_diff()
        obc = this_play.on_base_code
        logger.info(f'db_calls_gameplay - StratManagerAi - ID: {self.id} - go_to_reliever: '
                    f'outs: {this_play.starting_outs}, obc: {obc}, run_lead: {run_lead}, '
                    f'tot_allowed: {tot_allowed}')
        lead_target = run_lead if is_starter else 3

        def keep_pitcher(code: int) -> bool:
            # Log which rule kept the pitcher in, then report "no change"
            logger.info(f'db_calls_gameplay - StratManagerAi - ID: {self.id} - go_to_reliever: False / code {code}')
            return False

        # Only relievers get the "two outs already" reprieve
        reliever_two_outs = this_play.starting_outs == 2 and not is_starter

        if tot_allowed < 5 and is_starter:
            return keep_pitcher(1)

        # AI up big
        elif run_lead > 5 or (run_lead > 2 and self.ahead_aggression > 5):
            if tot_allowed <= lead_target or obc <= 3 or reliever_two_outs:
                return keep_pitcher(2)

        elif run_lead > 2 or (run_lead >= 0 and self.ahead_aggression > 5):
            if tot_allowed < lead_target or obc <= 1 or reliever_two_outs:
                return keep_pitcher(3)

        elif run_lead >= 0 or (run_lead >= -2 and self.behind_aggression > 5):
            if tot_allowed < 5 or obc <= run_lead or reliever_two_outs:
                return keep_pitcher(4)

        elif run_lead >= -3 and self.behind_aggression > 5:
            if tot_allowed < 5 and obc <= 1:
                return keep_pitcher(5)

        elif run_lead <= -5:
            if is_starter and this_play.inning_num <= 3:
                return keep_pitcher(6)
            if this_play.starting_outs != 0:
                return keep_pitcher(7)

        return True
def convert_strat_manager(manager: ManagerAi) -> StratManagerAi:
    """Build a StratManagerAi from a ManagerAi row.

    The row is flattened with ``model_to_dict`` and its entries are handed to
    the StratManagerAi constructor as keyword arguments.
    """
    return StratManagerAi(**model_to_dict(manager))
def get_manager(game) -> Optional[StratManagerAi]:
    """Pick and load the AI manager profile for a game's AI-controlled team.

    The manager id is derived from today's day-of-month multiplied by the AI
    team's id, folded into the range 1..3.

    Args:
        game: Game object; ``ai_team``, ``home_team_id`` and ``away_team_id``
            are read from it.

    Returns:
        The converted StratManagerAi, or None when ``game.ai_team`` is falsy.

    Raises:
        KeyError: If the ManagerAi row cannot be loaded; the underlying
            exception is attached as the cause.
    """
    if not game.ai_team:
        return None

    team_id = game.home_team_id if game.ai_team == 'home' else game.away_team_id
    # For an integer operand ``% 3`` yields 0..2, so the id is always 1..3
    # and needs no further range check
    manager_ai_id = ((datetime.datetime.now().day * team_id) % 3) + 1
    logger.debug(f'manager id: {manager_ai_id} for game {game}')

    try:
        this_manager = ManagerAi.get_by_id(manager_ai_id)
    except Exception as e:
        e_message = f'Could not find manager id {manager_ai_id}'
        logger.error(f'{e_message}: {type(e)}: {e}')
        # Chain the original error so the traceback shows why the load failed
        raise KeyError('Could not find this AI manager\'s playbook') from e

    return convert_strat_manager(this_manager)
db.close()