paper-dynasty-discord/db_calls_gameplay.py
Cal Corum c0af0c3d32
All checks were successful
Build Docker Image / build (pull_request) Successful in 1m19s
fix: pack rarity targeting, StratGame methods, HR detection (#20 #21 #22)
- Fix pack distribution to use exact rarity targeting (rarity=0 for
  Replacement, rarity=1 for Reserve) instead of max_rarity=1 which
  matched both tiers; applied to cogs/economy.py and
  cogs/economy_new/team_setup.py

- Add get_away_team() and get_home_team() async methods to StratGame
  dataclass, delegating to get_game_team() with the appropriate
  team_id; remove stale TODO comment from Game model

- Standardize home-run detection in complete_play(): set
  batter_final = batter_to_base when not None before the HR check,
  then only check batter_final == 4 (removes redundant batter_to_base
  path and the patch comment)

Closes #20, Closes #21, Closes #22

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-22 23:31:16 -05:00

2825 lines
99 KiB
Python

import asyncio
import datetime
import logging
import random
import requests
import pydantic
from typing import Optional
from peewee import *
from playhouse.shortcuts import model_to_dict
from dataclasses import dataclass
from helpers import SBA_SEASON, PD_SEASON, get_player_url, get_sheets
from api_calls import db_get
from in_game.data_cache import get_pd_player, CardPosition, BattingCard, get_pd_team
# Local SQLite database holding in-progress gameplay state (games, lineups,
# plays, bullpens). Pragmas: write-ahead logging, a negative cache_size (SQLite
# interprets negative values as KiB, so roughly 64 MB), and synchronous=0 (OFF).
db = SqliteDatabase(
    "storage/gameplay-legacy.db",
    pragmas={"journal_mode": "wal", "cache_size": -1 * 64000, "synchronous": 0},
)
# Base URL used by the get_sba_* helpers in this module.
SBA_DB_URL = "http://database/api"
# Shared application logger.
logger = logging.getLogger("discord_app")
def param_char(other_params):
    """Return the query-string separator to use before the next parameter.

    Returns ``"&"`` when ``other_params`` is truthy (parameters already
    present) and ``"?"`` otherwise.
    """
    return "&" if other_params else "?"
def get_sba_team(id_or_abbrev, season=None):
    """Fetch one team from the SBA API by id or abbreviation.

    Args:
        id_or_abbrev: Value placed in the ``/v1/teams/<value>`` path.
        season: Optional season; appended as ``?season=`` only when truthy.

    Returns:
        The decoded JSON body of a 200 response.

    Raises:
        ValueError: For any non-200 response (the body is logged first).
    """
    url = f"{SBA_DB_URL}/v1/teams/{id_or_abbrev}"
    if season:
        url = f"{url}?season={season}"

    response = requests.get(url, timeout=3)
    if response.status_code != 200:
        logger.warning(response.text)
        raise ValueError(f"DB: {response.text}")
    return response.json()
def get_sba_player(id_or_name, season=None):
    """Fetch one player from the SBA API by id or name.

    Args:
        id_or_name: Value placed in the ``/v2/players/<value>`` path.
        season: Optional season; appended as ``?season=`` whenever it is not
            ``None`` (so ``0`` is still sent).

    Returns:
        The decoded JSON body of a 200 response.

    Raises:
        ValueError: For any non-200 response (the body is logged first).
    """
    url = f"{SBA_DB_URL}/v2/players/{id_or_name}"
    if season is not None:
        url = f"{url}?season={season}"

    response = requests.get(url, timeout=3)
    if response.status_code != 200:
        logger.warning(response.text)
        raise ValueError(f"DB: {response.text}")
    return response.json()
def get_sba_team_by_owner(season, owner_id):
    """Fetch the single active SBA team owned by ``owner_id`` in ``season``.

    Returns:
        The one team dict from the response's ``"teams"`` list.

    Raises:
        ValueError: If the response is not 200, or if the number of teams
            returned is anything other than exactly one.
    """
    resp = requests.get(
        f"{SBA_DB_URL}/v1/teams?season={season}&owner_id={owner_id}&active_only=True",
        timeout=3,
    )
    if resp.status_code != 200:
        logger.warning(resp.text)
        raise ValueError(f"DB: {resp.text}")

    teams = resp.json()["teams"]
    if len(teams) != 1:
        # Report the number of teams, not the number of keys in the payload.
        raise ValueError(f"One team requested, but {len(teams)} were returned")
    return teams[0]
# def pd_await db_get(endpoint: str, api_ver: int = 1, object_id: int = None, params: list = None, none_okay: bool = True):
# req_url = pd_get_req_url(endpoint, api_ver=api_ver, object_id=object_id, params=params)
# logger.info(f'get:\n{endpoint} id: {object_id} params: {params}')
#
# resp = requests.get(req_url, timeout=3)
# if resp.status_code == 200:
# data = resp.json()
# logger.info(f'return: {data}')
# return data
# elif none_okay:
# data = resp.json()
# logger.info(f'return: {data}')
# return None
# else:
# logger.warning(resp.text)
# raise ValueError(f'DB: {resp.text}')
# def pd_get_one_team(team_abbrev: str):
# team = pd_await db_get('teams', params=[('abbrev', team_abbrev), ('season', PD_SEASON)], none_okay=False)['teams'][0]
# return team
#
# # req_url = pd_get_req_url('teams', )
# #
# # resp = requests.get(req_url, timeout=3)
# # if resp.status_code == 200:
# # data = resp.json()
# # logger.info(f'return: {data}')
# # return data['teams'][0]
# # else:
# # logger.warning(resp.text)
# # raise ValueError(f'PD DB: {resp.text}')
# def pd_get_card_by_id(card_id: int):
# return pd_await db_get('cards', object_id=card_id, none_okay=False)
class DecisionModel(pydantic.BaseModel):
    """Pydantic schema for one pitcher's per-game record.

    The game/season/week/pitcher identifiers are required; every tally and
    flag below them has a zero/False default.
    """

    # Required identifiers.
    game_id: int
    season: int
    week: int
    pitcher_id: int
    pitcher_team_id: int
    # Optional tallies and flags (presumably pitching decisions - TODO confirm
    # exact semantics against the consumer of this model).
    win: int = 0
    loss: int = 0
    hold: int = 0
    is_save: int = 0
    is_start: bool = False
    b_save: int = 0
    irunners: int = 0
    irunners_scored: int = 0
    rest_ip: float = 0
    rest_required: int = 0
class BaseModel(Model):
    """Peewee base class binding every table in this module to ``db``.

    Not to be confused with ``pydantic.BaseModel``, which this module also
    uses (always via its qualified name).
    """

    class Meta:
        database = db
class ManagerAi(BaseModel):
    """Named AI-manager profile; each integer setting defaults to 5.

    NOTE(review): the scale and meaning of each setting are not defined in
    this part of the file - confirm against the code that reads them.
    """

    name = CharField()
    steal = IntegerField(default=5)
    running = IntegerField(default=5)
    hold = IntegerField(default=5)
    catcher_throw = IntegerField(default=5)
    uncapped_home = IntegerField(default=5)
    uncapped_third = IntegerField(default=5)
    uncapped_trail = IntegerField(default=5)
    bullpen_matchup = IntegerField(default=5)
    behind_aggression = IntegerField(default=5)
    ahead_aggression = IntegerField(default=5)
    decide_throw = IntegerField(default=5)
def load_ai():
    """Insert the built-in ManagerAi profiles and return ``True``.

    Currently a single profile named "Basic" is created, with every other
    column left at its model default. Each call inserts a new row.
    """
    seed_profiles = ({"name": "Basic"},)
    for profile_fields in seed_profiles:
        ManagerAi.create(**profile_fields)
    return True
def ai_batting(this_game, this_play) -> bool:
    """Tell whether the AI-controlled team is at bat for ``this_play``.

    The away side bats in the "Top" half and the home side in the "Bot" half;
    the result is True only when ``this_game.ai_team`` names the batting side.
    """
    half = this_play.inning_half
    ai_side = this_game.ai_team
    if half == "Top":
        return ai_side == "away"
    if half == "Bot":
        return ai_side == "home"
    return False
# Runs at import time so the ManagerAi table exists before first use.
db.create_tables([ManagerAi])
class Game(BaseModel):
    """Peewee table row for one game tracked by the bot."""

    away_team_id = IntegerField()
    home_team_id = IntegerField()
    week_num = IntegerField(null=True)
    game_num = IntegerField(null=True)
    channel_id = IntegerField()
    season = IntegerField()
    active = BooleanField(default=True)
    # True selects the Paper Dynasty data sources in get_game_team / player_link.
    is_pd = BooleanField(default=False)
    ranked = BooleanField(default=True)
    short_game = BooleanField(default=False)
    away_roster_num = IntegerField(null=True)
    home_roster_num = IntegerField(null=True)
    first_message = IntegerField(null=True)
    # Compared against "away" / "home" by ai_batting and StratPlay.ai_run_diff.
    ai_team = CharField(null=True)
    game_type = CharField(default="minor-league")
    cardset_ids = CharField(null=True)
    backup_cardset_ids = CharField(null=True)
@dataclass
class StratGame:
    """Plain-data snapshot of a ``Game`` row, detached from the database.

    Field order matters: post_game and patch_game build instances
    positionally.
    """

    id: int
    away_team_id: int
    home_team_id: int
    channel_id: int
    season: int
    active: bool = True
    is_pd: bool = False
    ranked: bool = True
    short_game: bool = False
    week_num: Optional[int] = None
    game_num: Optional[int] = None
    away_roster_num: Optional[int] = None
    home_roster_num: Optional[int] = None
    first_message: Optional[int] = None
    ai_team: Optional[str] = None
    game_type: Optional[str] = None
    cardset_ids: Optional[str] = None
    backup_cardset_ids: Optional[str] = None

    async def get_away_team(self):
        """Return the away team via get_game_team using ``away_team_id``."""
        return await get_game_team(self, team_id=self.away_team_id)

    async def get_home_team(self):
        """Return the home team via get_game_team using ``home_team_id``."""
        return await get_game_team(self, team_id=self.home_team_id)
# Runs at import time so the Game table exists before first use.
db.create_tables([Game])
# class GameModel(pydantic.BaseModel):
# away_team_id: int
# home_team_id: int
# week_num: Optional[int] = None
# game_num: Optional[int] = None
# channel_id: int
# active: Optional[bool] = True
# is_pd: Optional[bool] = True
def count_team_games(team_id: int, active: bool = True):
    """Count and list games in which ``team_id`` is the home or away team.

    Args:
        team_id: Team to look for on either side of the game.
        active: Only games whose ``active`` flag equals this value are matched.

    Returns:
        ``{"count": <int>, "games": [<game dict>, ...]}``.
    """
    team_is_playing = (Game.away_team_id == team_id) | (Game.home_team_id == team_id)
    matching_games = Game.select().where(team_is_playing & (Game.active == active))
    return {
        "count": matching_games.count(),
        "games": [model_to_dict(game_row) for game_row in matching_games],
    }
def post_game(game_dict: dict):
    """Insert a new Game row and return it as a detached StratGame.

    Args:
        game_dict: Column values passed straight to ``Game.create``.

    Returns:
        A StratGame mirroring the stored row. The database connection is
        closed before returning.
    """
    created = Game.create(**game_dict)
    snapshot = StratGame(
        id=created.id,
        away_team_id=created.away_team_id,
        home_team_id=created.home_team_id,
        channel_id=created.channel_id,
        season=created.season,
        active=created.active,
        is_pd=created.is_pd,
        ranked=created.ranked,
        short_game=created.short_game,
        week_num=created.week_num,
        game_num=created.game_num,
        away_roster_num=created.away_roster_num,
        home_roster_num=created.home_roster_num,
        first_message=created.first_message,
        ai_team=created.ai_team,
        game_type=created.game_type,
        cardset_ids=created.cardset_ids,
        backup_cardset_ids=created.backup_cardset_ids,
    )
    db.close()
    return snapshot
def get_one_game(
    game_id=None,
    away_team_id=None,
    home_team_id=None,
    week_num=None,
    game_num=None,
    channel_id=None,
    active=None,
) -> Optional[StratGame]:
    """Look up a single game by primary key or by a combination of filters.

    When ``game_id`` is given it wins and the other arguments are ignored
    (``Game.get_by_id`` raises if the id does not exist). Otherwise every
    non-None argument becomes an equality filter and the first matching row
    is returned.

    Returns:
        A StratGame for the match, or ``None`` when the filters match nothing.
        The connection is closed only on the success path.
    """
    if game_id is not None:
        record = Game.get_by_id(game_id)
    else:
        column_filters = (
            (Game.away_team_id, away_team_id),
            (Game.home_team_id, home_team_id),
            (Game.week_num, week_num),
            (Game.game_num, game_num),
            (Game.channel_id, channel_id),
            (Game.active, active),
        )
        candidates = Game.select()
        for column, wanted in column_filters:
            if wanted is not None:
                candidates = candidates.where(column == wanted)
        if not candidates.count() > 0:
            return None
        record = candidates[0]

    found = StratGame(**model_to_dict(record))
    db.close()
    return found
async def get_game_team(
    game: StratGame,
    gm_id: int = None,
    team_abbrev: str = None,
    team_id: int = None,
    skip_cache: bool = False,
) -> dict:
    """Resolve a team dict for ``game`` from a GM id, abbreviation or team id.

    Paper Dynasty games (``game.is_pd``) prefer ``team_id``, then ``gm_id``,
    then ``team_abbrev``. Other games prefer ``gm_id``, then ``team_id``,
    then ``team_abbrev``.

    Args:
        game: Game whose ``is_pd`` flag selects the data source.
        gm_id: Owner / GM identifier.
        team_abbrev: Team abbreviation.
        team_id: Team identifier.
        skip_cache: Forwarded to ``get_pd_team`` for Paper Dynasty lookups.

    Raises:
        KeyError: If all three identifiers are falsy.

    NOTE(review): on the Paper Dynasty path ``gm_id`` is handed to
    ``get_pd_team`` in the same position as a team id - confirm that helper
    accepts a GM id.
    """
    if not (gm_id or team_abbrev or team_id):
        raise KeyError(
            "get_game_team requires either one of gm_id, team_abbrev, or team_id to not be None"
        )

    logger.debug(
        f"getting game team for game {game.id} / gm_id: {gm_id} / "
        f"tm_abbrev: {team_abbrev} / team_id: {team_id} / game: {game}"
    )

    if not game.is_pd:
        if gm_id:
            return get_sba_team_by_owner(season=SBA_SEASON, owner_id=gm_id)
        if team_id:
            return get_sba_team(team_id)
        return get_sba_team(team_abbrev, season=SBA_SEASON)

    if team_id:
        return await get_pd_team(team_id, skip_cache=skip_cache)
    if gm_id:
        return await get_pd_team(gm_id, skip_cache=skip_cache)

    team_query = await db_get(
        "teams", params=[("season", PD_SEASON), ("abbrev", team_abbrev)]
    )
    return team_query["teams"][0]
def patch_game(
    game_id,
    away_team_id=None,
    home_team_id=None,
    week_num=None,
    game_num=None,
    channel_id=None,
    active=None,
    first_message=None,
    home_roster_num=None,
    away_roster_num=None,
    ai_team=None,
    cardset_ids=None,
    backup_cardset_ids=None,
):
    """Update selected columns of a Game row and return the result.

    Every keyword left as ``None`` is untouched, so a column cannot be reset
    to NULL through this function.

    Returns:
        A StratGame reflecting the saved row. The connection is closed before
        returning.
    """
    game_row = Game.get_by_id(game_id)

    requested_changes = {
        "away_team_id": away_team_id,
        "home_team_id": home_team_id,
        "week_num": week_num,
        "game_num": game_num,
        "channel_id": channel_id,
        "active": active,
        "first_message": first_message,
        "home_roster_num": home_roster_num,
        "away_roster_num": away_roster_num,
        "ai_team": ai_team,
        "cardset_ids": cardset_ids,
        "backup_cardset_ids": backup_cardset_ids,
    }
    for column_name, new_value in requested_changes.items():
        if new_value is not None:
            setattr(game_row, column_name, new_value)
    game_row.save()

    snapshot = StratGame(
        id=game_row.id,
        away_team_id=game_row.away_team_id,
        home_team_id=game_row.home_team_id,
        channel_id=game_row.channel_id,
        season=game_row.season,
        active=game_row.active,
        is_pd=game_row.is_pd,
        ranked=game_row.ranked,
        short_game=game_row.short_game,
        week_num=game_row.week_num,
        game_num=game_row.game_num,
        away_roster_num=game_row.away_roster_num,
        home_roster_num=game_row.home_roster_num,
        first_message=game_row.first_message,
        ai_team=game_row.ai_team,
        game_type=game_row.game_type,
        cardset_ids=game_row.cardset_ids,
        backup_cardset_ids=game_row.backup_cardset_ids,
    )
    db.close()
    return snapshot
def get_last_game_ids(num_games: int = 10) -> list:
    """Return the ids of the ``num_games`` highest-id games, newest first."""
    newest_first = Game.select(Game.id).order_by(-Game.id).limit(num_games)
    return [game_row.id for game_row in newest_first]
def get_active_games(limit: int = 10) -> list:
    """Return up to ``limit`` Game rows whose ``active`` flag is set.

    A negative ``limit`` is replaced with 1 before it is applied.
    """
    effective_limit = 1 if limit < 0 else limit
    active_games = Game.select().where(Game.active).limit(effective_limit)
    return list(active_games)
class Lineup(BaseModel):
    """Peewee table row for one player's slot in a team's lineup for a game."""

    game = ForeignKeyField(Game)
    team_id = IntegerField()
    player_id = IntegerField()
    card_id = IntegerField(null=True)  # Only needed for Paper Dynasty games
    position = CharField()
    batting_order = IntegerField()
    # Compared against a play number by undo_subs to find later substitutions.
    after_play = IntegerField()
    # Id of the Lineup row this entry replaced (set by make_sub); NULL otherwise.
    replacing_id = IntegerField(null=True)
    active = BooleanField(default=True)
    variant = IntegerField(default=0)
    is_fatigued = BooleanField(null=True)
@dataclass
class StratLineup:
    """Plain-data snapshot of a ``Lineup`` row with its game as a StratGame."""

    id: int
    game: StratGame
    team_id: int
    player_id: int
    position: str
    batting_order: int
    after_play: int
    replacing_id: Optional[int] = None
    active: bool = True
    card_id: Optional[int] = None
    variant: int = 0
    is_fatigued: Optional[bool] = None
def convert_stratlineup(lineup: Lineup) -> StratLineup:
    """Convert a Lineup row into a StratLineup, wrapping its game as a StratGame."""
    fields = model_to_dict(lineup)
    game_snapshot = StratGame(**fields.pop("game"))
    return StratLineup(game=game_snapshot, **fields)
# Runs at import time so the Lineup table exists before first use.
db.create_tables([Lineup])
def get_one_lineup(
    game_id: int,
    lineup_id: int = None,
    team_id: int = None,
    batting_order: int = None,
    position: str = None,
    card_id: int = None,
    active: bool = True,
    as_obj: bool = False,
) -> Optional[StratLineup]:
    """Fetch one lineup entry by id, card, batting order or position.

    Selector precedence is ``lineup_id`` > ``card_id`` > ``batting_order`` >
    ``position``. A ``lineup_id`` lookup ignores ``game_id`` and ``active``;
    the card lookup ignores ``team_id``.

    Args:
        as_obj: When True, return the raw Lineup row (or ``None``) without
            converting it or closing the connection.

    Returns:
        A StratLineup, the raw row when ``as_obj`` is set, or ``None`` when
        nothing matches.

    Raises:
        KeyError: If none of the four selectors is truthy.
    """
    if not (batting_order or position or lineup_id or card_id):
        raise KeyError(
            "One of batting_order, position, card_id , or lineup_id must not be None"
        )

    if lineup_id:
        row = Lineup.get_by_id(lineup_id)
    else:
        if card_id:
            selector = [Lineup.card_id == card_id]
        elif batting_order:
            selector = [
                Lineup.team_id == team_id,
                Lineup.batting_order == batting_order,
            ]
        else:
            selector = [Lineup.team_id == team_id, Lineup.position == position]
        row = Lineup.get_or_none(
            Lineup.game_id == game_id, *selector, Lineup.active == active
        )

    logger.debug(f"get_one_lineup / this_lineup: {row}")

    if as_obj:
        return row
    if not row:
        return None

    converted = convert_stratlineup(row)
    db.close()
    return converted
async def get_team_lineups(
    game_id: int,
    team_id: int,
    inc_inactive: bool = False,
    pitchers_only: bool = False,
    as_string: bool = True,
):
    """Return a team's lineup for a game, ordered by batting order.

    Args:
        inc_inactive: Include entries whose ``active`` flag is False.
        pitchers_only: Restrict the result to entries at position "P".
        as_string: When True, return one text line per entry of the form
            ``"<order>. <player link> - <position>"``; otherwise return a
            list of StratLineup objects.

    The connection is closed before returning in both modes.
    """
    entries = (
        Lineup.select()
        .where((Lineup.game_id == game_id) & (Lineup.team_id == team_id))
        .order_by(Lineup.batting_order)
    )
    if not inc_inactive:
        entries = entries.where(Lineup.active == True)
    if pitchers_only:
        entries = entries.where(Lineup.position == "P")

    if not as_string:
        converted = [convert_stratlineup(entry) for entry in entries]
        db.close()
        return converted

    this_game = Game.get_by_id(game_id)
    text_lines = []
    for entry in entries:
        player = await get_player(this_game, entry)
        text_lines.append(
            f"{entry.batting_order}. {player_link(this_game, player)} - {entry.position}\n"
        )
    db.close()
    return "".join(text_lines)
def post_lineups(lineups: list):
    """Bulk-insert lineup rows inside one transaction, then close the connection.

    Args:
        lineups: Row dicts accepted by ``Lineup.insert_many``.
    """
    bulk_insert = Lineup.insert_many(lineups)
    with db.atomic():
        bulk_insert.execute()
    db.close()
def patch_lineup(
    lineup_id,
    active: Optional[bool] = None,
    position: Optional[str] = None,
    replacing_id: Optional[int] = None,
    is_fatigued: Optional[bool] = None,
) -> StratLineup:
    """Update selected columns of a Lineup row and return it as a StratLineup.

    Arguments left as ``None`` are not modified. The connection is closed
    before returning.
    """
    row = Lineup.get_by_id(lineup_id)

    requested_changes = (
        ("active", active),
        ("position", position),
        ("replacing_id", replacing_id),
        ("is_fatigued", is_fatigued),
    )
    for column_name, new_value in requested_changes:
        if new_value is not None:
            setattr(row, column_name, new_value)
    row.save()

    converted = convert_stratlineup(row)
    db.close()
    return converted
def make_sub(lineup_dict: dict):
    """Insert a substitute into a lineup and patch the current play to match.

    Steps:
      1. Reject the substitution if the card (Paper Dynasty games) or player
         (other games) already has any lineup row for this team and game.
      2. Deactivate whoever is active in the same batting-order slot and
         record that row's id as ``replacing_id`` on the new entry.
      3. Create the new Lineup row.
      4. If a current play exists, point its pitcher at the new entry when
         the new entry is a pitcher (or the play has no pitcher and the new
         entry is on the fielding side), and swap the new entry in for the
         first role of the play held by the replaced entry.

    Args:
        lineup_dict: Column values for the new Lineup row; mutated in place
            when ``replacing_id`` is filled in.

    Returns:
        The new entry as a StratLineup.

    Raises:
        DatabaseError: If the card/player is already in the lineup.
    """
    game_id = lineup_dict["game_id"]
    this_game = Game.get_by_id(game_id)
    team_id = lineup_dict["team_id"]

    # Duplicate check keys on card for Paper Dynasty games, player otherwise.
    if this_game.is_pd:
        same_identity = Lineup.card_id == lineup_dict["card_id"]
        db_error = "This card is already in the lineup"
    else:
        same_identity = Lineup.player_id == lineup_dict["player_id"]
        db_error = "This player is already in the lineup"

    player_conflict = Lineup.select().where(
        (Lineup.game_id == game_id) & (Lineup.team_id == team_id) & same_identity
    )
    if player_conflict.count():
        db.close()
        raise DatabaseError(db_error)

    outgoing = Lineup.get_or_none(
        Lineup.game_id == game_id,
        Lineup.team_id == team_id,
        Lineup.batting_order == lineup_dict["batting_order"],
        Lineup.active == True,
    )
    logger.debug(f"subbed_player: {outgoing}")

    if outgoing:
        # From here on `outgoing` is the deactivated StratLineup snapshot.
        outgoing = patch_lineup(outgoing.id, active=False)
        lineup_dict["replacing_id"] = outgoing.id

    incoming = convert_stratlineup(Lineup.create(**lineup_dict))
    curr_play = get_current_play(game_id)
    logger.debug(f"\n\nreturn_value: {incoming}\n\ncurr_play: {curr_play}\n\n")

    if curr_play:
        fielding_side_needs_pitcher = (
            not curr_play.pitcher and curr_play.batter.team_id != incoming.team_id
        )
        if fielding_side_needs_pitcher or incoming.position == "P":
            patch_play(curr_play.id, pitcher_id=incoming.id)

        if outgoing:
            # Only the first matching role is patched, in this order.
            play_roles = (
                "batter",
                "pitcher",
                "catcher",
                "on_first",
                "on_second",
                "on_third",
            )
            for role in play_roles:
                if getattr(curr_play, role) == outgoing:
                    patch_play(curr_play.id, **{f"{role}_id": incoming.id})
                    break

    db.close()
    return incoming
def undo_subs(game: StratGame, new_play_num: int):
    """Roll back every substitution made after play ``new_play_num``.

    Each lineup row of ``game`` whose ``after_play`` exceeds ``new_play_num``
    is removed, and the row it replaced (if any) is reactivated first.

    Rows with no ``replacing_id`` (make_sub only sets it when somebody was
    actually displaced) have nothing to reactivate and are simply deleted;
    previously such a row made get_one_lineup raise KeyError, which aborted
    the undo before the delete ran.
    """
    logger.info(f"get new players")
    new_players = Lineup.select().where(
        (Lineup.game_id == game.id) & (Lineup.after_play > new_play_num)
    )
    logger.info(f"new_player count: {new_players.count()}")

    # Materialize the pairs first so the rows are not re-queried mid-update.
    replacements = [(x.id, x.replacing_id) for x in new_players]
    for new_id, replaced_id in replacements:
        if replaced_id is None:
            logger.info(f"lineup {new_id} replaced nobody; nothing to reactivate")
            continue
        logger.info(f"replacing {new_id} with {replaced_id}")
        old_player = get_one_lineup(game_id=game.id, lineup_id=replaced_id)
        logger.info(f"old_player: {old_player}")
        patch_lineup(old_player.id, active=True)
        logger.info(f"activated!")
    logger.info(f"done activating old players")

    Lineup.delete().where(
        (Lineup.game_id == game.id) & (Lineup.after_play > new_play_num)
    ).execute()
async def get_player(game, lineup_member, as_dict: bool = True):
    """Return the player for a lineup entry via the Paper Dynasty player cache.

    Args:
        game: Unused; kept so existing callers' signatures keep working.
        lineup_member: Object exposing a ``player_id`` attribute.
        as_dict: Forwarded to ``get_pd_player``.

    The function previously carried per-game-type lookup branches after an
    unconditional ``return``; they could never run and have been removed, so
    behavior is unchanged.
    """
    return await get_pd_player(lineup_member.player_id, as_dict=as_dict)
def player_link(game, player):
    """Build a link for ``player`` appropriate to the game's type.

    Paper Dynasty games get a markdown link of the player's ``p_name`` to its
    ``image``; other games defer to ``get_player_url``.

    Raises:
        TypeError: If ``game`` is neither a Game nor a StratGame.
    """
    if not isinstance(game, (Game, StratGame)):
        raise TypeError("Cannot get player link; game is not a valid object")
    if game.is_pd:
        return f"[{player['p_name']}]({player['image']})"
    return get_player_url(player)
class Play(BaseModel):
    """Peewee table row for one play of a game.

    Holds the situation at the start of the play, who was involved, where
    each runner ended up, and the stat tallies credited on the play.
    """

    # --- Situation at the start of the play ---
    game = ForeignKeyField(Game)
    play_num = IntegerField()
    batter = ForeignKeyField(Lineup)
    batter_pos = CharField(default="DH")
    pitcher = ForeignKeyField(Lineup, null=True)
    # Integer 0-7; get_dbready_plays maps it through an 8-entry string table.
    on_base_code = IntegerField()
    # "Top" or "Bot" (see ai_batting).
    inning_half = CharField()
    inning_num = IntegerField()
    batting_order = IntegerField()
    starting_outs = IntegerField()
    away_score = IntegerField()
    home_score = IntegerField()
    in_pow = BooleanField(default=False)
    # --- Runners and where each finished (NULL when not applicable) ---
    on_first = ForeignKeyField(Lineup, null=True)
    on_first_final = IntegerField(null=True)
    on_second = ForeignKeyField(Lineup, null=True)
    on_second_final = IntegerField(null=True)
    on_third = ForeignKeyField(Lineup, null=True)
    on_third_final = IntegerField(null=True)
    batter_final = IntegerField(null=True)
    # --- Stat tallies for the play, all defaulting to 0 ---
    pa = IntegerField(default=0)
    ab = IntegerField(default=0)
    run = IntegerField(default=0)
    e_run = IntegerField(default=0)
    hit = IntegerField(default=0)
    rbi = IntegerField(default=0)
    double = IntegerField(default=0)
    triple = IntegerField(default=0)
    homerun = IntegerField(default=0)
    bb = IntegerField(default=0)
    so = IntegerField(default=0)
    hbp = IntegerField(default=0)
    sac = IntegerField(default=0)
    ibb = IntegerField(default=0)
    gidp = IntegerField(default=0)
    bphr = IntegerField(default=0)
    bpfo = IntegerField(default=0)
    bp1b = IntegerField(default=0)
    bplo = IntegerField(default=0)
    sb = IntegerField(default=0)
    cs = IntegerField(default=0)
    outs = IntegerField(default=0)
    wpa = FloatField(default=0.0)
    re24 = FloatField(default=0.0)
    # --- Other participants ---
    catcher = ForeignKeyField(Lineup, null=True)
    defender = ForeignKeyField(Lineup, null=True)
    runner = ForeignKeyField(Lineup, null=True)
    check_pos = CharField(null=True)
    error = IntegerField(default=0)
    wild_pitch = IntegerField(default=0)
    passed_ball = IntegerField(default=0)
    pick_off = IntegerField(default=0)
    balk = IntegerField(default=0)
    # --- State flags ---
    # get_current_play treats the play with complete == False as current.
    complete = BooleanField(default=False)
    locked = BooleanField(default=False)
    is_go_ahead = BooleanField(default=False)
    is_tied = BooleanField(default=False)
    is_new_inning = BooleanField(default=False)
@dataclass
class StratPlay:
    """Plain-data snapshot of a ``Play`` row.

    Foreign keys are carried as StratGame / StratLineup objects (see
    convert_stratplay); nullable ones may be ``None``.
    """

    id: int
    game: StratGame
    play_num: int
    batter: StratLineup
    # Play.pitcher is nullable, so this may be None despite being required.
    pitcher: Optional[StratLineup]
    on_base_code: int
    inning_half: str
    inning_num: int
    batting_order: int
    starting_outs: int
    away_score: int
    home_score: int
    batter_pos: Optional[str] = None
    in_pow: bool = False
    on_first: Optional[StratLineup] = None
    on_first_final: Optional[int] = None
    on_second: Optional[StratLineup] = None
    on_second_final: Optional[int] = None
    on_third: Optional[StratLineup] = None
    on_third_final: Optional[int] = None
    batter_final: Optional[int] = None
    pa: int = 0
    ab: int = 0
    run: int = 0
    e_run: int = 0
    hit: int = 0
    rbi: int = 0
    double: int = 0
    triple: int = 0
    homerun: int = 0
    bb: int = 0
    so: int = 0
    hbp: int = 0
    sac: int = 0
    ibb: int = 0
    gidp: int = 0
    bphr: int = 0
    bpfo: int = 0
    bp1b: int = 0
    bplo: int = 0
    sb: int = 0
    cs: int = 0
    outs: int = 0
    wpa: float = 0.0
    re24: float = 0.0
    catcher: Optional[StratLineup] = None
    defender: Optional[StratLineup] = None
    runner: Optional[StratLineup] = None
    check_pos: Optional[str] = None
    error: int = 0
    wild_pitch: int = 0
    passed_ball: int = 0
    pick_off: int = 0
    balk: int = 0
    complete: bool = False
    locked: bool = False
    is_go_ahead: bool = False
    is_tied: bool = False
    is_new_inning: bool = False

    def ai_run_diff(self):
        """Return the score margin from the AI team's point of view.

        Away minus home when ``game.ai_team`` is "away"; home minus away for
        any other value (including ``None``).
        """
        if self.game.ai_team == "away":
            return self.away_score - self.home_score
        else:
            return self.home_score - self.away_score
def convert_stratplay(play: Play) -> StratPlay:
    """Convert a Play row into a StratPlay.

    The game becomes a StratGame, and every populated lineup reference
    becomes a StratLineup; empty references are passed through unchanged.
    """
    lineup_roles = (
        "batter",
        "pitcher",
        "on_first",
        "on_second",
        "on_third",
        "catcher",
        "defender",
        "runner",
    )
    fields = model_to_dict(play)
    fields["game"] = StratGame(**fields["game"])
    for role in lineup_roles:
        if fields[role]:
            fields[role] = convert_stratlineup(getattr(play, role))
    return StratPlay(**fields)
# Runs at import time so the Play table exists before first use.
db.create_tables([Play])
def post_play(play_dict: dict) -> StratPlay:
    """Insert a Play row from ``play_dict`` and return it as a StratPlay.

    The connection is closed before returning.
    """
    logger.debug(f"play_dict: {play_dict}")
    created = convert_stratplay(Play.create(**play_dict))
    db.close()
    return created
def patch_play(
    play_id,
    batter_id: int = None,
    pitcher_id: int = None,
    catcher_id: int = None,
    locked: bool = None,
    pa: int = None,
    ab: int = None,
    hit: int = None,
    double: int = None,
    triple: int = None,
    homerun: int = None,
    outs: int = None,
    so: int = None,
    bp1b: int = None,
    bplo: int = None,
    bphr: int = None,
    bpfo: int = None,
    walk: int = None,
    hbp: int = None,
    ibb: int = None,
    sac: int = None,
    sb: int = None,
    is_go_ahead: bool = None,
    defender_id: int = None,
    check_pos: str = None,
    error: int = None,
    play_num: int = None,
    cs: int = None,
    on_first_id: int = None,
    on_first_final: int = None,
    on_second_id: int = None,
    on_second_final: int = None,
    on_third_id: int = None,
    on_third_final: int = None,
    starting_outs: int = None,
    runner_id: int = None,
    complete: bool = None,
    rbi: int = None,
    wp: int = None,
    pb: int = None,
    pick: int = None,
    balk: int = None,
    is_new_inning: bool = None,
    batter_final: int = None,
    in_pow: bool = None,
):
    """Update selected columns of a Play row and return it as a StratPlay.

    Any argument left as ``None`` is not modified. The remaining arguments
    fall into five groups:

    * Copied as given: ids, stat tallies, ``locked``, ``complete``, etc.
      Four are renamed on the way in: ``walk`` -> ``bb``,
      ``wp`` -> ``wild_pitch``, ``pb`` -> ``passed_ball``,
      ``pick`` -> ``pick_off``.
    * Clearable: ``defender_id``, the three ``on_*_final`` values and
      ``batter_final`` store NULL when passed a falsy non-None value
      (e.g. ``0`` or ``False``).
    * ``check_pos``: the string "false" (any case) stores NULL.
    * Runner ids (``on_first_id`` etc.): a falsy non-None value removes the
      runner; any other value sets it.
    * Boolean flags (``is_go_ahead``, ``is_new_inning``, ``in_pow``) are
      stored as 1 or 0.

    The connection is closed before returning.
    """
    play_row = Play.get_by_id(play_id)

    # (column on Play, supplied value) - stored verbatim when supplied.
    verbatim = (
        ("batter_id", batter_id),
        ("pitcher_id", pitcher_id),
        ("catcher_id", catcher_id),
        ("runner_id", runner_id),
        ("locked", locked),
        ("complete", complete),
        ("pa", pa),
        ("ab", ab),
        ("hit", hit),
        ("rbi", rbi),
        ("double", double),
        ("triple", triple),
        ("homerun", homerun),
        ("starting_outs", starting_outs),
        ("outs", outs),
        ("so", so),
        ("bp1b", bp1b),
        ("bplo", bplo),
        ("bphr", bphr),
        ("bpfo", bpfo),
        ("bb", walk),
        ("hbp", hbp),
        ("ibb", ibb),
        ("sac", sac),
        ("sb", sb),
        ("cs", cs),
        ("wild_pitch", wp),
        ("passed_ball", pb),
        ("pick_off", pick),
        ("balk", balk),
        ("error", error),
        ("play_num", play_num),
    )
    for column, supplied in verbatim:
        if supplied is not None:
            setattr(play_row, column, supplied)

    # Falsy-but-supplied values clear the column instead of storing 0/False.
    clearable = (
        ("defender_id", defender_id),
        ("on_first_final", on_first_final),
        ("on_second_final", on_second_final),
        ("on_third_final", on_third_final),
        ("batter_final", batter_final),
    )
    for column, supplied in clearable:
        if supplied is not None:
            setattr(play_row, column, supplied if supplied else None)

    if check_pos is not None:
        play_row.check_pos = None if check_pos.lower() == "false" else check_pos

    # Runner slots: a falsy id empties the base, otherwise the id is stored.
    runner_slots = (
        ("on_first", on_first_id),
        ("on_second", on_second_id),
        ("on_third", on_third_id),
    )
    for base, lineup_id in runner_slots:
        if lineup_id is None:
            continue
        if lineup_id:
            setattr(play_row, f"{base}_id", lineup_id)
        else:
            setattr(play_row, base, None)

    # Flags are normalized to 1/0.
    flags = (
        ("is_go_ahead", is_go_ahead),
        ("is_new_inning", is_new_inning),
        ("in_pow", in_pow),
    )
    for column, supplied in flags:
        if supplied is not None:
            setattr(play_row, column, 1 if supplied else 0)

    play_row.save()
    updated = convert_stratplay(play_row)
    db.close()
    return updated
def get_plays(game_id: int = None, pitcher_id: int = None):
    """Return every play of a game, optionally limited to one pitcher.

    Returns:
        ``{"count": <int>, "plays": [StratPlay, ...]}``. The connection is
        closed before returning.
    """
    query = Play.select().where(Play.game_id == game_id)
    if pitcher_id is not None:
        query = query.where(Play.pitcher_id == pitcher_id)

    result = {"count": query.count(), "plays": []}
    result["plays"].extend(convert_stratplay(play_row) for play_row in query)
    db.close()
    return result
def get_play_by_num(game_id, play_num: int):
    """Return the play numbered ``play_num`` in a game, or ``None`` if absent.

    The connection is closed only when a play is found.
    """
    play_row = Play.get_or_none(Play.game_id == game_id, Play.play_num == play_num)
    if not play_row:
        return None
    found = convert_stratplay(play_row)
    db.close()
    return found
def get_latest_play(game_id):
    """Return the highest-id play of a game as a StratPlay, or ``None``.

    The connection is closed only when a play is found.
    """
    newest = Play.select().where(Play.game_id == game_id).order_by(-Play.id).limit(1)
    if not newest:
        return None
    found = convert_stratplay(newest[0])
    db.close()
    return found
def undo_play(game_id):
    """Delete the highest-id play of a game and return the delete's row count.

    NOTE(review): this relies on DELETE ... ORDER BY ... LIMIT being accepted
    by the SQLite build in use - confirm.
    """
    delete_newest = (
        Play.delete().where(Play.game_id == game_id).order_by(-Play.id).limit(1)
    )
    logger.debug(f"p_query: {delete_newest}")
    rows_deleted = delete_newest.execute()
    db.close()
    return rows_deleted
def get_current_play(game_id) -> Optional[StratPlay]:
    """Return the game's incomplete play, completing the latest one if needed.

    If no play is marked incomplete, the most recent play is passed to
    ``complete_play`` and the incomplete play is looked up again.

    Returns:
        The current play as a StratPlay, or ``None`` when the game has no
        plays at all.

    NOTE(review): if ``complete_play`` does not leave an incomplete play
    behind, the conversion below receives ``None`` - confirm that cannot
    happen.
    """
    open_play = Play.get_or_none(Play.game_id == game_id, Play.complete == False)

    if not open_play:
        newest = (
            Play.select().where(Play.game_id == game_id).order_by(-Play.id).limit(1)
        )
        if not newest:
            return None
        last_play = newest[0]
        complete_play(last_play.id, batter_to_base=last_play.batter_final)
        open_play = Play.get_or_none(
            Play.game_id == game_id, Play.complete == False
        )

    current = convert_stratplay(open_play)
    db.close()
    return current
def get_last_inning_end_play(game_id, inning_half, inning_num):
    """Return the highest-id play of the given half-inning, or ``None``.

    The connection is closed before returning in either case.
    """
    in_half_inning = (
        (Play.game_id == game_id)
        & (Play.inning_half == inning_half)
        & (Play.inning_num == inning_num)
    )
    newest_first = Play.select().where(in_half_inning).order_by(-Play.id)

    last_play = (
        convert_stratplay(newest_first[0]) if newest_first.count() > 0 else None
    )
    db.close()
    return last_play
def add_run_last_player_ab(lineup: Lineup, is_erun: bool = True):
    """Credit a run on the most recent play in which ``lineup`` batted.

    Sets ``run`` to 1 and ``e_run`` to 1 or 0 according to ``is_erun``. If the
    lineup entry has never batted, the failure is logged and nothing changes.
    """
    batted_plays = Play.select().where(Play.batter == lineup).order_by(-Play.id)
    try:
        last_ab = batted_plays.get()
    except DoesNotExist:
        logger.error(f"Unable to apply run to Lineup {lineup}")
        return

    last_ab.run = 1
    last_ab.e_run = 1 if is_erun else 0
    last_ab.save()
def get_dbready_plays(game_id: int, db_game_id: int):
    """Export a game's plays as dicts flattened for the remote database.

    For each play: pitcher and batter player/team ids are lifted to top-level
    keys, as are the catcher/defender/runner ids when present; ``game_id`` is
    set to ``db_game_id``; and the integer ``on_base_code`` is replaced by
    its three-character string form.

    Args:
        game_id: Local game whose plays are exported.
        db_game_id: Game id to stamp on every exported play.
    """
    prep_plays = [
        model_to_dict(play_row)
        for play_row in Play.select().where(Play.game_id == game_id)
    ]
    db.close()

    obc_list = ["000", "001", "010", "100", "011", "101", "110", "111"]
    optional_roles = ("catcher", "defender", "runner")

    for play in prep_plays:
        pitcher, batter = play["pitcher"], play["batter"]
        play["pitcher_id"] = pitcher["player_id"]
        play["batter_id"] = batter["player_id"]
        play["batter_team_id"] = batter["team_id"]
        play["pitcher_team_id"] = pitcher["team_id"]

        for role in optional_roles:
            member = play[role]
            if member is not None:
                play[f"{role}_id"] = member["player_id"]
                play[f"{role}_team_id"] = member["team_id"]

        play["game_id"] = db_game_id
        play["on_base_code"] = obc_list[play["on_base_code"]]

    logger.debug(f"all_plays:\n\n{prep_plays}\n")
    return prep_plays
class Bullpen(BaseModel):
    """Peewee table row holding an AI team's bullpen role assignments.

    Closer and setup are required; the middle and long slots are optional.
    """

    ai_team_id = IntegerField()
    closer_id = IntegerField()
    setup_id = IntegerField()
    middle_one_id = IntegerField(null=True)
    middle_two_id = IntegerField(null=True)
    middle_three_id = IntegerField(null=True)
    long_one_id = IntegerField(null=True)
    long_two_id = IntegerField(null=True)
    long_three_id = IntegerField(null=True)
    long_four_id = IntegerField(null=True)
    # get_or_create_bullpen writes an integer millisecond timestamp here,
    # despite the DateTimeField column type.
    last_updated = DateTimeField()
# Runs at import time so the Bullpen table exists before first use.
db.create_tables([Bullpen])
@dataclass
class StratBullpen:
    """Plain-data snapshot of a ``Bullpen`` row."""

    id: int
    ai_team_id: int
    closer_id: int
    setup_id: int
    # Integer millisecond timestamp as written by get_or_create_bullpen.
    last_updated: int
    middle_one_id: Optional[int] = None
    middle_two_id: Optional[int] = None
    middle_three_id: Optional[int] = None
    long_one_id: Optional[int] = None
    long_two_id: Optional[int] = None
    long_three_id: Optional[int] = None
    long_four_id: Optional[int] = None
def convert_bullpen_to_strat(bullpen: Bullpen) -> StratBullpen:
    """
    Converts a Bullpen model instance into a StratBullpen dataclass

    The model is dumped with model_to_dict and its keys are passed straight
    through as StratBullpen keyword arguments.
    """
    return StratBullpen(**model_to_dict(bullpen))
def get_or_create_bullpen(ai_team, bot):
    """
    Returns the stored bullpen for an AI team, creating it from the team sheet if missing

    Args:
        ai_team: mapping providing at least the "id" and "gsheet" keys
        bot: passed through to get_sheets to obtain the sheets client

    Returns:
        StratBullpen for the team

    An existing row is always returned as-is. When no row exists, the
    bullpen is read from cells N30:N41 of the "My Rosters" worksheet,
    saved, and returned.
    """
    this_pen = Bullpen.get_or_none(Bullpen.ai_team_id == ai_team["id"])

    if this_pen:
        # NOTE(review): the previous code had a three-day staleness check and
        # delete_instance() after this return. They were unreachable for an
        # existing row and dereferenced None (AttributeError) for a missing
        # one, so they were removed. Reinstate a refresh policy here if stored
        # bullpens are supposed to expire.
        return convert_bullpen_to_strat(this_pen)

    sheets = get_sheets(bot)
    this_sheet = sheets.open_by_key(ai_team["gsheet"])
    r_sheet = this_sheet.worksheet_by_title("My Rosters")
    raw_cells = r_sheet.range("N30:N41")
    logger.debug(f"raw_cells: {raw_cells}")

    def optional_id(row_index):
        # Blank sheet cells mean the relief slot is not filled
        value = raw_cells[row_index][0].value
        return value if value != "" else None

    bullpen = Bullpen(
        ai_team_id=ai_team["id"],
        closer_id=raw_cells[0][0].value,
        setup_id=raw_cells[1][0].value,
        middle_one_id=optional_id(2),
        middle_two_id=optional_id(3),
        middle_three_id=optional_id(4),
        long_one_id=optional_id(5),
        long_two_id=optional_id(6),
        long_three_id=optional_id(7),
        long_four_id=optional_id(8),
        # Stored as an integer millisecond timestamp
        last_updated=int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000),
    )
    bullpen.save()
    logger.debug(f"bullpen: {bullpen}")

    return convert_bullpen_to_strat(bullpen)
def advance_runners(
    play_id: int, num_bases: int, is_error: bool = False, only_forced: bool = False
):
    """
    Advances runners and tallies RBIs

    Loads the play, resets its rbi to 0, writes the on_*_final value for the
    occupied bases (1-3 = base the runner ends on, 4 = runner scores), then
    saves the play and closes the database connection.

    Args:
        play_id: primary key of the Play to update
        num_bases: number of bases each advancing runner moves; 4 also
            scores the batter (batter_final = 4) and credits one RBI
        is_error: when True, runners that score do not add to rbi (the
            batter's own run on num_bases == 4 is still credited)
        only_forced: when True, only runners forced by the batter advance
    """
    this_play = Play.get_by_id(play_id)
    this_play.rbi = 0
    if only_forced:
        # Nobody on first: no runner is forced, so everyone holds and the
        # function exits early (num_bases == 4 handling below is skipped)
        if not this_play.on_first:
            if this_play.on_second:
                this_play.on_second_final = 2
            if this_play.on_third:
                this_play.on_third_final = 3
            this_play.save()
            db.close()
            return
        # First is occupied: the runner on second is forced as well
        if this_play.on_second:
            # Third is only forced when second is also occupied
            # NOTE(review): on_third_final is left untouched here when
            # num_bases is 0 - confirm that is intended
            if this_play.on_third:
                if num_bases > 0:
                    this_play.on_third_final = 4
                    this_play.rbi += 1 if not is_error else 0
            if num_bases > 1:
                this_play.on_second_final = 4
                this_play.rbi += 1 if not is_error else 0
            elif num_bases == 1:
                this_play.on_second_final = 3
            else:
                this_play.on_second_final = 2
        else:
            # Second is empty, so a runner on third is not forced and holds
            if this_play.on_third:
                this_play.on_third_final = 3
        # The runner on first always moves with the batter
        if num_bases > 2:
            this_play.on_first_final = 4
            this_play.rbi += 1 if not is_error else 0
        elif num_bases == 2:
            this_play.on_first_final = 3
        elif num_bases == 1:
            this_play.on_first_final = 2
        else:
            this_play.on_first_final = 1
    else:
        # Every runner advances num_bases, scoring once they reach home
        if this_play.on_third:
            if num_bases > 0:
                this_play.on_third_final = 4
                this_play.rbi += 1 if not is_error else 0
            else:
                this_play.on_third_final = 3
        if this_play.on_second:
            if num_bases > 1:
                this_play.on_second_final = 4
                this_play.rbi += 1 if not is_error else 0
            elif num_bases == 1:
                this_play.on_second_final = 3
            else:
                this_play.on_second_final = 2
        if this_play.on_first:
            if num_bases > 2:
                this_play.on_first_final = 4
                this_play.rbi += 1 if not is_error else 0
            elif num_bases == 2:
                this_play.on_first_final = 3
            elif num_bases == 1:
                this_play.on_first_final = 2
            else:
                this_play.on_first_final = 1
    # Four bases scores the batter too; this RBI is credited regardless of is_error
    if num_bases == 4:
        this_play.batter_final = 4
        this_play.rbi += 1
    this_play.save()
    db.close()
def advance_one_runner(play_id: int, from_base: int, num_bases: int):
    """
    Advances runner and excludes RBIs

    Only the runner starting on `from_base` (1, 2 or 3) is moved; the
    matching on_*_final field is set to the base reached, or 4 when the
    runner scores. Any other `from_base`, or an advance that matches no
    rule, leaves the play's base fields untouched. The play is always
    saved and the database connection closed.
    """
    this_play = Play.get_by_id(play_id)

    # from_base -> (field to write, advance beyond which the runner scores,
    #               exact advance -> base reached short of home)
    advance_rules = {
        1: ("on_first_final", 2, {2: 3, 1: 2}),
        2: ("on_second_final", 1, {1: 3}),
        3: ("on_third_final", 0, {}),
    }

    rule = advance_rules.get(from_base)
    if rule is not None:
        final_field, scores_beyond, short_of_home = rule
        if num_bases > scores_beyond:
            landing = 4
        else:
            landing = short_of_home.get(num_bases)
        if landing is not None:
            setattr(this_play, final_field, landing)

    this_play.save()
    db.close()
def complete_play(play_id, batter_to_base: int = None):
    """
    Finalizes current play and sets base values for next play

    Marks the play complete and unlocked, credits runs for the batter and
    for any runner whose final base is 4, flags tying / go-ahead plays,
    stores the play's RE24 value, and creates the following Play row with
    the next batter, pitcher, inning, outs, score and base runners.

    Args:
        play_id: primary key of the Play to finalize
        batter_to_base: base the batter ends on (1-3), or 4 when the batter
            scores; None leaves the stored batter_final untouched

    Returns:
        The newly created next play, converted with convert_stratplay
    """
    this_play = Play.get_by_id(play_id)
    this_play.locked = False
    this_play.complete = True
    this_play.save()

    logger.debug("starting the inning calc")
    new_inning_half = this_play.inning_half
    new_inning_num = this_play.inning_num

    # Plays that do not finish the plate appearance keep the same batter up
    if (
        this_play.runner
        or this_play.wild_pitch
        or this_play.passed_ball
        or this_play.pick_off
        or this_play.balk
    ):
        new_batting_order = this_play.batting_order
    else:
        new_batting_order = (
            this_play.batting_order + 1 if this_play.batting_order < 9 else 1
        )

    new_bteam_id = this_play.batter.team_id
    new_pteam_id = this_play.pitcher.team_id
    new_starting_outs = this_play.starting_outs + this_play.outs
    # One flag drives both the inning change and the RE24 end state. The
    # previous code tested "> 2" for the former but "== 3" for the latter, so
    # a play pushing the out total past 3 was given a non-zero end expectancy.
    inning_over = new_starting_outs > 2
    new_on_first = None
    new_on_second = None
    new_on_third = None

    if batter_to_base is not None:
        this_play.batter_final = batter_to_base
    logger.debug(f"complete_play - this_play: {this_play}")

    score_increment = 0
    if this_play.batter_final == 4:
        this_play.run = 1
        score_increment = 1
        if not this_play.error:
            this_play.e_run = 1
    logger.debug(f"complete_play - score_increment: {score_increment}")

    # A final base of 99 clears the runner's final base; 4 means the runner
    # scored and the run is credited to that runner's most recent at-bat
    for runner_attr, final_attr in (
        ("on_first", "on_first_final"),
        ("on_second", "on_second_final"),
        ("on_third", "on_third_final"),
    ):
        final_base = getattr(this_play, final_attr)
        if final_base == 99:
            setattr(this_play, final_attr, None)
        elif final_base == 4:
            score_increment += 1
            add_run_last_player_ab(
                getattr(this_play, runner_attr), this_play.error == 0
            )
    this_play.save()

    if inning_over:
        new_starting_outs = 0
        new_obc = 0
        if this_play.inning_half == "Top":
            new_inning_half = "Bot"
            new_bteam_id = this_play.game.home_team_id
            new_pteam_id = this_play.game.away_team_id
        else:
            new_inning_half = "Top"
            new_inning_num += 1
            new_bteam_id = this_play.game.away_team_id
            new_pteam_id = this_play.game.home_team_id

        # The side coming to bat resumes from where its previous inning ended
        if new_inning_num > 1:
            last_inning_play = get_last_inning_end_play(
                this_play.game.id, new_inning_half, new_inning_num - 1
            )
            if last_inning_play.runner or last_inning_play.pick_off:
                new_batting_order = last_inning_play.batting_order
            else:
                new_batting_order = (
                    last_inning_play.batting_order + 1
                    if last_inning_play.batting_order < 9
                    else 1
                )
        else:
            new_batting_order = 1
    # Not an inning-ending play
    else:
        logger.debug("starting the obc calc")
        bases_occ = [False, False, False, False]

        # Set the occupied bases for the next play and lineup member occupying it
        for runner, base in [
            (this_play.on_first, this_play.on_first_final),
            (this_play.on_second, this_play.on_second_final),
            (this_play.on_third, this_play.on_third_final),
        ]:
            if base:
                bases_occ[base - 1] = True
                if base == 1:
                    new_on_first = runner
                elif base == 2:
                    new_on_second = runner
                elif base == 3:
                    new_on_third = runner

        # Set the batter-runner occupied base for the next play
        if batter_to_base:
            if batter_to_base == 1:
                bases_occ[0] = True
                new_on_first = this_play.batter
            elif batter_to_base == 2:
                bases_occ[1] = True
                new_on_second = this_play.batter
            elif batter_to_base == 3:
                bases_occ[2] = True
                new_on_third = this_play.batter
            this_play.batter_final = batter_to_base
            this_play.save()

        # Set the OBC based on the bases occupied: (first, second, third) -> code
        obc_by_bases = {
            (False, False, False): 0,
            (True, False, False): 1,
            (False, True, False): 2,
            (False, False, True): 3,
            (True, True, False): 4,
            (True, False, True): 5,
            (False, True, True): 6,
            (True, True, True): 7,
        }
        new_obc = obc_by_bases[(bases_occ[0], bases_occ[1], bases_occ[2])]

    # Runs go to the team batting in this half inning
    if this_play.inning_half == "Top":
        new_away_score = this_play.away_score + score_increment
        new_home_score = this_play.home_score
    else:
        new_away_score = this_play.away_score
        new_home_score = this_play.home_score + score_increment

    # A team score
    if score_increment:
        logger.debug(
            f"complete_play: \n\nscore_increment: {score_increment}\n\nnew home score: {new_home_score}\n\n"
            f"new_away_score: {new_away_score}\n\nthis_play.away_score: {this_play.away_score}\n\n"
            f"this_player.home_score: {this_play.home_score}"
        )
        # Game is now tied
        if new_home_score == new_away_score:
            logger.debug(f"\n\nGame {this_play.game} is now tied\n\n")
            this_play.is_tied = 1
        # One team took the lead
        elif (this_play.away_score <= this_play.home_score) and (
            new_away_score > new_home_score
        ):
            logger.debug(f"\n\nTeam {this_play.batter.team_id} took the lead\n\n")
            this_play.is_go_ahead = 1
            this_play.save()
        elif (this_play.home_score <= this_play.away_score) and (
            new_home_score > new_away_score
        ):
            logger.debug(f"\n\nteam {this_play.batter.team_id} took the lead\n\n")
            this_play.is_go_ahead = 1
            this_play.save()

    # RE24 Calc: lookup keyed by on-base code, indexed by outs (0, 1, 2)
    re_data = {
        0: [0.457, 0.231, 0.077],
        1: [0.793, 0.438, 0.171],
        2: [1.064, 0.596, 0.259],
        3: [1.340, 0.874, 0.287],
        4: [1.373, 0.772, 0.351],
        5: [1.687, 1.042, 0.406],
        6: [1.973, 1.311, 0.448],
        7: [2.295, 1.440, 0.618],
    }
    start_re24 = re_data[this_play.on_base_code][this_play.starting_outs]
    # An ended inning has no remaining expectancy
    end_re24 = 0 if inning_over else re_data[new_obc][new_starting_outs]
    this_play.re24 = end_re24 - start_re24 + score_increment
    this_play.save()

    batter = get_one_lineup(
        this_play.game.id, team_id=new_bteam_id, batting_order=new_batting_order
    )
    batter_id = batter.id if batter else None
    pitcher = get_one_lineup(this_play.game_id, team_id=new_pteam_id, position="P")
    pitcher_id = pitcher.id if pitcher else None

    logger.debug("done the obc calc")
    next_play = Play.create(
        game_id=this_play.game.id,
        play_num=this_play.play_num + 1,
        batter_id=batter_id,
        pitcher_id=pitcher_id,
        on_base_code=new_obc,
        inning_half=new_inning_half,
        inning_num=new_inning_num,
        next_inning_num=new_inning_num,
        batting_order=new_batting_order,
        starting_outs=new_starting_outs,
        away_score=new_away_score,
        home_score=new_home_score,
        on_first=new_on_first,
        on_second=new_on_second,
        on_third=new_on_third,
        is_new_inning=1 if new_inning_half != this_play.inning_half else 0,
    )
    return_play = convert_stratplay(next_play)
    db.close()
    return return_play
def get_batting_stats(game_id, lineup_id: int = None, team_id: int = None):
    """
    Returns one batting line per batter in a game, with team totals attached

    Args:
        game_id: local game whose plays are aggregated
        lineup_id: when given, only this batter's plays are included
        team_id: when given (and lineup_id is not), only plays batted by
            this team are included

    Returns:
        List of dicts, one per batter, holding identifying fields
        (batter_id, card_id, team_id, pos), pl_run, a pl_<stat> entry per
        counting stat, and the matching tm_<stat> team totals.
    """
    # Counting stats, in the order they appear in each returned line
    stat_names = (
        "pa", "ab", "hit", "rbi", "double", "triple", "homerun", "bb", "so",
        "hbp", "sac", "ibb", "gidp", "sb", "cs", "bphr", "bpfo", "bp1b", "bplo",
    )
    # Steals and caught-stealings belong to the runner, not the batter at the
    # plate, so the player totals for these come from per-runner counts below
    runner_stats = ("sb", "cs")

    columns = [Play.batter]
    for name in stat_names:
        if name in runner_stats:
            continue
        columns.append(
            fn.SUM(getattr(Play, name))
            .over(partition_by=[Play.batter_id])
            .alias(f"pl_{name}")
        )
    for name in stat_names:
        columns.append(
            fn.SUM(getattr(Play, name))
            .over(partition_by=[Play.batter.team_id])
            .alias(f"tm_{name}")
        )

    batting_stats = (
        Play.select(*columns)
        .join(Lineup, on=Play.batter)
        .where(Play.game_id == game_id)
    )
    if lineup_id is not None:
        batting_stats = batting_stats.where(Play.batter_id == lineup_id)
    elif team_id is not None:
        batting_stats = batting_stats.where(Play.batter.team_id == team_id)

    # The window query yields one row per play; keep the first row per batter
    done_batters = set()
    return_batters = []
    for x in batting_stats:
        if x.batter_id in done_batters:
            continue
        done_batters.add(x.batter_id)

        runs_scored = (
            Play.select(Play.pa)
            .where(
                ((Play.on_first == x.batter) & (Play.on_first_final == 4))
                | ((Play.on_second == x.batter) & (Play.on_second_final == 4))
                | ((Play.on_third == x.batter) & (Play.on_third_final == 4))
                | ((Play.batter == x.batter) & (Play.batter_final == 4))
            )
            .count()
        )
        stolen_bases = (
            Play.select(Play.pa)
            .where((Play.runner == x.batter) & (Play.sb == 1))
            .count()
        )
        # Previously read from a window partitioned by runner_id on the
        # batter's row, which reported another runner's total; count it the
        # same way as stolen bases instead
        caught_stealing = (
            Play.select(Play.pa)
            .where((Play.runner == x.batter) & (Play.cs == 1))
            .count()
        )
        runner_totals = {"sb": stolen_bases, "cs": caught_stealing}

        line = {
            "batter_id": x.batter_id,
            "card_id": x.batter.card_id,
            "team_id": x.batter.team_id,
            "pos": x.batter.position,
            "pl_run": runs_scored,
        }
        for name in stat_names:
            if name in runner_totals:
                line[f"pl_{name}"] = runner_totals[name]
            else:
                line[f"pl_{name}"] = getattr(x, f"pl_{name}")
        for name in stat_names:
            line[f"tm_{name}"] = getattr(x, f"tm_{name}")
        return_batters.append(line)

    db.close()
    return return_batters
def get_fielding_stats(game_id, lineup_id: int = None, team_id: int = None):
    """
    Returns fielding and catching lines for a game

    Args:
        game_id: local game whose plays are aggregated
        lineup_id: when given, restrict to this defender / catcher
        team_id: when given (and lineup_id is not), restrict to this team

    Returns:
        List of dicts sharing one key set. Defender lines carry pl_error,
        pl_xhit, pl_xch and tm_error with zeroed catching fields; catcher
        lines (pos "C") carry pl_pb, pl_sbc and pl_csc with zeroed fielding
        fields.
    """
    fielding_stats = (
        Play.select(
            Play.defender,
            Play.check_pos,
            fn.SUM(Play.error).over(partition_by=[Play.defender_id]).alias("pl_error"),
            fn.SUM(Play.hit).over(partition_by=[Play.defender_id]).alias("pl_xhit"),
            fn.COUNT(Play.defender)
            .over(partition_by=[Play.defender_id])
            .alias("pl_xch"),
            fn.SUM(Play.error)
            .over(partition_by=[Play.defender.team_id])
            .alias("tm_error"),
        )
        .join(Lineup, on=Play.defender)
        .where(Play.game_id == game_id)
    )
    if lineup_id is not None:
        fielding_stats = fielding_stats.where(Play.defender_id == lineup_id)
    elif team_id is not None:
        fielding_stats = fielding_stats.where(Play.defender.team_id == team_id)

    # One row comes back per play; keep the first row seen for each card
    added_card_ids = set()
    all_stats = []
    for x in fielding_stats:
        card_id = x.defender.card_id
        if card_id in added_card_ids:
            continue
        added_card_ids.add(card_id)
        all_stats.append(
            {
                "defender_id": x.defender_id,
                "card_id": card_id,
                "team_id": x.defender.team_id,
                "pos": x.check_pos,
                "pl_error": x.pl_error,
                "pl_xhit": x.pl_xhit,
                "pl_xch": x.pl_xch,
                # Was x.pl_error, which left the tm_error window unused
                "tm_error": x.tm_error,
                "pl_pb": 0,
                "pl_sbc": 0,
                "pl_csc": 0,
            }
        )

    catching_stats = (
        Play.select(
            Play.catcher,
            fn.SUM(Play.passed_ball)
            .over(partition_by=[Play.catcher_id])
            .alias("pl_pb"),
            fn.SUM(Play.sb).over(partition_by=[Play.catcher_id]).alias("pl_sbc"),
            fn.SUM(Play.cs).over(partition_by=[Play.catcher_id]).alias("pl_csc"),
        )
        .join(Lineup, on=Play.catcher)
        .where(Play.game_id == game_id)
    )
    # Catching lines are filtered on the catcher; the previous code filtered
    # on Play.defender_id, so a lineup_id query only matched plays where the
    # requested player was also the defender on that play
    if lineup_id is not None:
        catching_stats = catching_stats.where(Play.catcher_id == lineup_id)
    elif team_id is not None:
        catching_stats = catching_stats.where(Play.catcher.team_id == team_id)

    # NOTE(review): unlike the defender loop, catcher rows are not
    # de-duplicated, so a catcher appears once per matching play - confirm
    # callers expect that
    for x in catching_stats:
        all_stats.append(
            {
                "defender_id": x.catcher_id,
                "card_id": x.catcher.card_id,
                "team_id": x.catcher.team_id,
                "pos": "C",
                "pl_error": 0,
                "pl_xhit": 0,
                "pl_xch": 0,
                "tm_error": 0,
                "pl_pb": x.pl_pb,
                "pl_sbc": x.pl_sbc,
                "pl_csc": x.pl_csc,
            }
        )

    logger.debug(f"fielding_stats: {all_stats}")
    db.close()
    return all_stats
# def new_get_batting_stats(game_id, lineup_id: int = None, team_id: int = None):
# return_stats = []
#
# if lineup_id is not None:
# batting_stats = Play.select(
# Play.batter,
# fn.SUM(Play.pa).over(partition_by=[Play.batter_id]).alias('pl_pa'),
# fn.SUM(Play.ab).over(partition_by=[Play.batter_id]).alias('pl_ab'),
# fn.SUM(Play.hit).over(partition_by=[Play.batter_id]).alias('pl_hit'),
# fn.SUM(Play.rbi).over(partition_by=[Play.batter_id]).alias('pl_rbi'),
# fn.SUM(Play.double).over(partition_by=[Play.batter_id]).alias('pl_double'),
# fn.SUM(Play.triple).over(partition_by=[Play.batter_id]).alias('pl_triple'),
# fn.SUM(Play.homerun).over(partition_by=[Play.batter_id]).alias('pl_homerun'),
# fn.SUM(Play.bb).over(partition_by=[Play.batter_id]).alias('pl_bb'),
# fn.SUM(Play.so).over(partition_by=[Play.batter_id]).alias('pl_so'),
# fn.SUM(Play.hbp).over(partition_by=[Play.batter_id]).alias('pl_hbp'),
# fn.SUM(Play.sac).over(partition_by=[Play.batter_id]).alias('pl_sac'),
# fn.SUM(Play.ibb).over(partition_by=[Play.batter_id]).alias('pl_ibb'),
# fn.SUM(Play.gidp).over(partition_by=[Play.batter_id]).alias('pl_gidp'),
# fn.SUM(Play.sb).over(partition_by=[Play.batter_id]).alias('pl_sb'),
# fn.SUM(Play.cs).over(partition_by=[Play.batter_id]).alias('pl_cs'),
# fn.SUM(Play.bphr).over(partition_by=[Play.batter_id]).alias('pl_bphr'),
# fn.SUM(Play.bpfo).over(partition_by=[Play.batter_id]).alias('pl_bpfo'),
# fn.SUM(Play.bp1b).over(partition_by=[Play.batter_id]).alias('pl_bp1b'),
# fn.SUM(Play.bplo).over(partition_by=[Play.batter_id]).alias('pl_bplo')
# ).join(Lineup, on=Play.batter).where((Play.game_id == game_id) & (Play.batter_id == lineup_id))
#
# done_batters = []
# for x in batting_stats:
# if x.batter.id not in done_batters:
# return_stats.append({
# 'batter_id': x.batter_id,
# 'pl_pa': x.pl_pa,
# 'pl_ab': x.pl_ab,
# 'pl_hit': x.pl_hit,
# 'pl_rbi': x.pl_rbi,
# 'pl_double': x.pl_double,
# 'pl_triple': x.pl_triple,
# 'pl_homerun': x.pl_homerun,
# 'pl_bb': x.pl_bb,
# 'pl_so': x.pl_so,
# 'pl_hbp': x.pl_hbp,
# 'pl_sac': x.pl_sac,
# 'pl_ibb': x.pl_ibb,
# 'pl_gidp': x.pl_gidp,
# 'pl_sb': x.pl_sb,
# 'pl_cs': x.pl_cs,
# 'pl_bphr': x.pl_bphr,
# 'pl_bpfo': x.pl_bpfo,
# 'pl_bp1b': x.pl_bp1b,
# 'pl_bplo': x.pl_bplo,
# })
# done_batters.append(x.batter.id)
# logger.info(f'batting stats: {return_stats}')
#
# elif team_id is not None:
# batting_stats = Play.select(
# fn.SUM(Play.pa).over(partition_by=[Play.batter.team_id]).alias('tm_pa'),
# fn.SUM(Play.ab).over(partition_by=[Play.batter.team_id]).alias('tm_ab'),
# fn.SUM(Play.hit).over(partition_by=[Play.batter.team_id]).alias('tm_hit'),
# fn.SUM(Play.rbi).over(partition_by=[Play.batter.team_id]).alias('tm_rbi'),
# fn.SUM(Play.double).over(partition_by=[Play.batter.team_id]).alias('tm_double'),
# fn.SUM(Play.triple).over(partition_by=[Play.batter.team_id]).alias('tm_triple'),
# fn.SUM(Play.homerun).over(partition_by=[Play.batter.team_id]).alias('tm_homerun'),
# fn.SUM(Play.bb).over(partition_by=[Play.batter.team_id]).alias('tm_bb'),
# fn.SUM(Play.so).over(partition_by=[Play.batter.team_id]).alias('tm_so'),
# fn.SUM(Play.hbp).over(partition_by=[Play.batter.team_id]).alias('tm_hbp'),
# fn.SUM(Play.sac).over(partition_by=[Play.batter.team_id]).alias('tm_sac'),
# fn.SUM(Play.ibb).over(partition_by=[Play.batter.team_id]).alias('tm_ibb'),
# fn.SUM(Play.gidp).over(partition_by=[Play.batter.team_id]).alias('tm_gidp'),
# fn.SUM(Play.sb).over(partition_by=[Play.batter.team_id]).alias('tm_sb'),
# fn.SUM(Play.cs).over(partition_by=[Play.batter.team_id]).alias('tm_ccs'),
# fn.SUM(Play.bphr).over(partition_by=[Play.batter.team_id]).alias('tm_bphr'),
# fn.SUM(Play.bpfo).over(partition_by=[Play.batter.team_id]).alias('tm_bpfo'),
# fn.SUM(Play.bp1b).over(partition_by=[Play.batter.team_id]).alias('tm_bp1b'),
# fn.SUM(Play.bplo).over(partition_by=[Play.batter.team_id]).alias('tm_bplo')
# ).join(Lineup, on=Play.batter).where((Play.game_id == game_id) & (Play.batter.team_id == team_id))
#
# logger.info(f'batting_stats count: {batting_stats.count()}')
# done_batters = []
# for x in batting_stats:
# if x.batter.id not in done_batters:
# return_stats.append({
# 'tm_pa': x.tm_pa,
# 'tm_ab': x.tm_ab,
# 'tm_hit': x.tm_hit,
# 'tm_rbi': x.tm_rbi,
# 'tm_double': x.tm_double,
# 'tm_triple': x.tm_triple,
# 'tm_homerun': x.tm_homerun,
# 'tm_bb': x.tm_bb,
# 'tm_so': x.tm_so,
# 'tm_hbp': x.tm_hbp,
# 'tm_sac': x.tm_sac,
# 'tm_ibb': x.tm_ibb,
# 'tm_gidp': x.tm_gidp,
# 'tm_sb': x.tm_sb,
# 'tm_cs': x.tm_cs,
# 'tm_bphr': x.tm_bphr,
# 'tm_bpfo': x.tm_bpfo,
# 'tm_bp1b': x.tm_bp1b,
# 'tm_bplo': x.tm_bplo,
# })
# done_batters.append(x.batter.id)
#
# else:
# raise DatabaseError(f'Either lineup_id or team_id must be specified.')
#
# db.close()
# return return_stats
# def get_final_batting_stats(game_id: int, team_id: int):
# team_lineups = Lineup.select().where(
# (Lineup.game_id == game_id) & (Lineup.team_id == team_id) & (Lineup.batting_order < 10)
# ).order_by(Lineup.batting_order)
# logger.info(f'team_lineups count: {team_lineups.count()} / lineups: {team_lineups}')
#
# all_stats = []
# for line in team_lineups:
# all_stats.append(get_batting_stats(game_id, lineup_id=line.id))
#
# return all_stats
def _count_earned_run_plays(game_id, *filters):
    """Counts runner/batter scoring entries on error-free plays of a game, narrowed by `filters`."""
    total = 0
    for final_field in (
        Play.on_first_final,
        Play.on_second_final,
        Play.on_third_final,
        Play.batter_final,
    ):
        query = (
            Play.select()
            .where((final_field == 4) & (Play.error == 0))
            .join(Lineup, on=Play.pitcher)
            .where(Play.game_id == game_id)
        )
        for condition in filters:
            query = query.where(condition)
        total += query.count()
    return total


def get_pitching_stats(
    game_id,
    lineup_id: int = None,
    team_id: int = None,
    in_pow: bool = None,
    in_innings: list = None,
):
    """
    Returns one pitching line per pitcher in a game, with team totals attached

    Args:
        game_id: local game whose plays are aggregated
        lineup_id: when given, only this pitcher's plays are included
        team_id: when given, only plays pitched by this team are included
            and tm_eruns is populated (otherwise it is None)
        in_pow: when given, only plays whose in_pow flag matches are included
        in_innings: innings counted towards pl_in_runs; defaults to 1-29

    Returns:
        List of dicts, one per pitcher, with pl_* player totals, tm_* team
        totals and pl_gs (1 when the pitcher's lineup after_play is 0).
    """
    if in_innings is None:
        in_innings = list(range(1, 30))
    logger.info(f"db_calls_gameplay - get_pitching_stats - in_innings: {in_innings}")

    by_pitcher = [Play.pitcher_id]
    by_team = [Play.pitcher.team_id]
    # Suffix used in the run-count aliases -> field holding that runner's final base
    run_sources = (
        ("first", Play.on_first_final),
        ("second", Play.on_second_final),
        ("third", Play.on_third_final),
        ("batter", Play.batter_final),
    )
    player_sums = ("outs", "hit", "so", "bb", "hbp", "wild_pitch", "balk", "homerun")

    def summed(name, partition, prefix):
        return (
            fn.SUM(getattr(Play, name))
            .over(partition_by=partition)
            .alias(f"{prefix}_{name}")
        )

    def runs_allowed(field, partition, alias, extra=None):
        # A final base of 4 means that runner scored on the play
        scored = field == 4
        if extra is not None:
            scored = scored & extra
        return fn.COUNT(field).filter(scored).over(partition_by=partition).alias(alias)

    columns = [Play.pitcher]
    columns += [summed(name, by_pitcher, "pl") for name in player_sums]
    columns += [
        runs_allowed(field, by_pitcher, f"pl_run_{suffix}")
        for suffix, field in run_sources
    ]
    # "<<" is peewee's IN operator
    columns += [
        runs_allowed(
            field, by_pitcher, f"pl_in_run_{suffix}", Play.inning_num << in_innings
        )
        for suffix, field in run_sources
    ]
    columns += [summed(name, by_team, "tm") for name in player_sums]
    columns += [
        runs_allowed(field, by_team, f"tm_run_{suffix}")
        for suffix, field in run_sources
    ]

    pitching_stats = (
        Play.select(*columns)
        .join(Lineup, on=Play.pitcher)
        .where(Play.game_id == game_id)
    )
    logger.debug(
        f"db_calls_gameplay - get_pitching_stats - pitching_stats: {pitching_stats}"
    )

    if lineup_id is not None:
        pitching_stats = pitching_stats.where(Play.pitcher_id == lineup_id)
    if in_pow is not None:
        pitching_stats = pitching_stats.where(Play.in_pow == in_pow)

    tm_earned_runs = None
    if team_id is not None:
        pitching_stats = pitching_stats.where(Play.pitcher.team_id == team_id)
        tm_earned_runs = _count_earned_run_plays(
            game_id, Play.pitcher.team_id == team_id
        )

    # The window query yields one row per play; keep the first row per pitcher
    done_pitchers = set()
    return_pitchers = []
    for x in pitching_stats:
        if x.pitcher_id in done_pitchers:
            continue
        done_pitchers.add(x.pitcher_id)

        # Counted per returned pitcher. The previous code always filtered on
        # lineup_id, which produced 0 for everyone when lineup_id was None
        # and one shared total otherwise.
        pl_earned_runs = _count_earned_run_plays(
            game_id, Play.pitcher_id == x.pitcher_id
        )

        return_pitchers.append(
            {
                "pitcher_id": x.pitcher_id,
                "card_id": x.pitcher.card_id,
                "team_id": x.pitcher.team_id,
                "pl_outs": x.pl_outs,
                "pl_hit": x.pl_hit,
                "pl_eruns": pl_earned_runs,
                "pl_runs": x.pl_run_first
                + x.pl_run_second
                + x.pl_run_third
                + x.pl_run_batter,
                "pl_in_runs": x.pl_in_run_first
                + x.pl_in_run_second
                + x.pl_in_run_third
                + x.pl_in_run_batter,
                "pl_so": x.pl_so,
                "pl_bb": x.pl_bb,
                "pl_hbp": x.pl_hbp,
                "pl_homerun": x.pl_homerun,
                "pl_wild_pitch": x.pl_wild_pitch,
                "pl_balk": x.pl_balk,
                "tm_outs": x.tm_outs,
                "tm_hit": x.tm_hit,
                "tm_eruns": tm_earned_runs,
                "tm_runs": x.tm_run_first
                + x.tm_run_second
                + x.tm_run_third
                + x.tm_run_batter,
                "tm_so": x.tm_so,
                "tm_bb": x.tm_bb,
                "tm_hbp": x.tm_hbp,
                "tm_homerun": x.tm_homerun,
                "tm_wild_pitch": x.tm_wild_pitch,
                "tm_balk": x.tm_balk,
                "pl_gs": 1 if x.pitcher.after_play == 0 else 0,
            }
        )

    db.close()
    logger.debug(f"pitching stats: {return_pitchers}")
    return return_pitchers
def get_pitching_decisions(game: StratGame, db_game_id: int):
    """Crawl a game's plays and assign pitching decisions.

    Every play of ``game`` is visited while tracking the pitcher currently
    working for each side. From that crawl the win, loss, save, holds and
    blown saves are derived and written onto one ``DecisionModel`` per
    pitcher that appeared.

    Args:
        game: Game being scored. ``id``, ``season``, ``week_num``,
            ``away_team_id`` and ``home_team_id`` are read.
        db_game_id: Value stored as ``game_id`` on every decision.

    Returns:
        A list with ``DecisionModel.dict()`` for each pitcher, starters first.

    Raises:
        AttributeError: If no go-ahead play left ``winner``/``loser`` set
            (both stay ``None``, so ``.player_id`` cannot be read).
    """
    logger.debug(f"this game: {game}")

    winner = None
    loser = None
    save = None  # current save candidate, if any
    b_save = []
    holds = []

    def new_decision(pitcher, **extra):
        # One place to build a decision so starters and relievers stay in sync.
        return DecisionModel(
            game_id=db_game_id,
            season=game.season,
            week=game.week_num,
            pitcher_id=pitcher.player_id,
            pitcher_team_id=pitcher.team_id,
            **extra,
        )

    # Starting pitchers; these two names are advanced as the crawl meets
    # pitching changes, so they always point at each side's current pitcher.
    away_pitcher = Lineup.get(
        Lineup.game_id == game.id,
        Lineup.team_id == game.away_team_id,
        Lineup.position == "P",
    )
    home_pitcher = Lineup.get(
        Lineup.game_id == game.id,
        Lineup.team_id == game.home_team_id,
        Lineup.position == "P",
    )
    logger.debug(f"SPs: {away_pitcher} / {home_pitcher}")

    # { <player_id>: DecisionModel }
    decisions = {
        away_pitcher.player_id: new_decision(away_pitcher, is_start=True),
        home_pitcher.player_id: new_decision(home_pitcher, is_start=True),
    }

    for play in Play.select().where(Play.game_id == game.id):
        logger.debug(f"checking play num {play.play_num}")

        if play.inning_half == "Top" and home_pitcher != play.pitcher:
            # Home pitching change: settle the outgoing save candidate first.
            if save == home_pitcher:
                if play.home_score > play.away_score:
                    holds.append(save)
                else:
                    b_save.append(save)
                save = None
            home_pitcher = play.pitcher
            # Entering with a lead of three runs or fewer opens a save chance.
            if (
                play.home_score > play.away_score
                and play.home_score - play.away_score <= 3
            ):
                save = home_pitcher
        elif play.inning_half == "Bot" and away_pitcher != play.pitcher:
            # Away pitching change: mirror image of the branch above.
            if save == away_pitcher:
                if play.away_score > play.home_score:
                    holds.append(save)
                else:
                    b_save.append(save)
                save = None
            away_pitcher = play.pitcher
            if (
                play.away_score > play.home_score
                and play.away_score - play.home_score <= 3
            ):
                save = away_pitcher

        if play.is_go_ahead:
            logger.debug(f"is go ahead: {play}")
            # The pitcher of record on the play takes the loss no matter which
            # runner scored. (An earlier draft charged the pitcher who faced
            # the winning run in that runner's last plate appearance.)
            if (
                play.on_third_final == 4
                or play.on_second_final == 4
                or play.on_first_final == 4
                or play.batter_final == 4
            ):
                loser = play.pitcher
                if save == loser:
                    b_save.append(save)
                    save = None
                winner = home_pitcher if play.inning_half == "Bot" else away_pitcher

        if play.is_tied:
            logger.debug(f"is tied: {play}")
            # A tie wipes out the pending decisions and blows any open save.
            winner, loser = None, None
            if save:
                b_save.append(save)
                save = None

        # Make sure every pitcher seen so far has a decision record.
        for pitcher in (home_pitcher, away_pitcher):
            if pitcher.player_id not in decisions:
                decisions[pitcher.player_id] = new_decision(pitcher)

    decisions[winner.player_id].win = 1
    decisions[loser.player_id].loss = 1
    if save is not None:
        decisions[save.player_id].is_save = 1
    for held in holds:
        decisions[held.player_id].hold = 1
    for blown in b_save:
        decisions[blown.player_id].b_save = 1

    return [decision.dict() for decision in decisions.values()]
# for count, pit_pair in enumerate(away_pitchers):
# """
# If starter & team won
# check for win condition
# if starter & team lost
# check for loss condition
# if reliever & team won
# check for save situation
# if save situation, check for blown save
# credit either BS or Save; if Save is not None, add Save to holds and replace with this pitcher
# if reliever & team lost
# check for loss condition
# """
# # If starter & team won
# if pit_pair[0].after_play == 0 and (last_play.away_score > last_play.home_score):
# if len(away_pitchers) == 1:
# winner = pit_pair[0]
# else:
# next_p_first = away_pitchers[count + 1][1]
# if next_p_first.away_score > next_p_first.home_score:
# winner = pit_pair[0]
# else:
# winner = away_pitchers[count + 1][0]
# # if starter & team lost
# elif pit_pair[0].after_play == 0:
# if len(away_pitchers) == 1:
# loser = pit_pair[0]
# else:
# next_p_first = away_pitchers[count + 1][1]
# if next_p_first.away_score < next_p_first.home_score:
# def get_final_scorebug(away_team, home_team, away_score, home_score):
# return f'```' \
# f'Team | R | H | E |\n' \
# f'{away_team["abbrev"].replace("Gauntlet-", ""): <4} | {away_stats["score"]: >2} | ' \
# f'{away_stats["hits"]: >2} | ' \
# f'{away_stats["f_lines"][0]["tm_error"] if away_stats["f_lines"] else 0: >2} |\n' \
# f'{home_team["abbrev"].replace("Gauntlet-", ""): <4} | {home_stats["score"]: >2} | ' \
# f'{home_stats["hits"]: >2} | ' \
# f'{home_stats["f_lines"][0]["tm_error"] if home_stats["f_lines"] else 0: >2} |\n' \
# f'```'
def _pick_by_rating(rating, aggressive, average, passive):
    """Return the value matching a tendency rating: 8+, 5-7, or below 5."""
    if rating >= 8:
        return aggressive
    if rating >= 5:
        return average
    return passive


class StratManagerAi(pydantic.BaseModel):
    """Tendencies of an AI manager, used to phrase in-game instructions.

    Rating rule of thumb for the tendency fields:
        1: Least Aggressive
        5: Average
        10: Most Aggressive
    """

    id: int
    name: str
    steal: int = 5
    running: int = 5
    hold: int = 5
    catcher_throw: int = 5
    uncapped_home: int = 5
    uncapped_third: int = 5
    uncapped_trail: int = 5
    bullpen_matchup: int = 5
    behind_aggression: int = 5
    ahead_aggression: int = 5
    decide_throw: int = 5

    # NOTE: an earlier draft randomized several of these ratings in __init__;
    # that code was disabled, so values come only from the supplied data or
    # the defaults above.

    def check_jump(self, to_base: int, outs: int) -> Optional[str]:
        """Return the steal instruction for the AI note.

        ``None`` is returned unless ``to_base`` is second (2) or third (3).
        """
        if to_base not in (2, 3):
            return None

        stealing_second = to_base == 2
        rating = self.steal

        if rating == 10:
            if stealing_second:
                return "attempt to steal second if the runner has an ***** auto-jump or the safe range is 13+"
            threshold = 13
        else:
            # First floor the rating reaches wins; 17 is the fallback.
            threshold = next(
                (
                    safe_range
                    for floor, safe_range in ((8, 14), (5, 15), (3, 16))
                    if rating >= floor
                ),
                17,
            )

        # More cautious with two outs, bolder with none.
        if outs == 2:
            threshold += 1
        elif outs == 0:
            threshold -= 1

        target = "second" if stealing_second else "third"
        return f"attempt to steal {target} if their safe range is {threshold}+"

    def tag_from_second(self, outs: int) -> str:
        """Return the text posted ahead of the tag-up message (runner on second)."""
        adjustment = 3 if outs == 2 else -2 if outs == 0 else 0
        threshold = _pick_by_rating(self.running, 5, 10, 12) + adjustment
        return f"attempt to tag up if their safe range is {threshold}+"

    def tag_from_third(self, outs: int) -> str:
        """Return the text posted with the tag-up message (runner on third)."""
        adjustment = -2 if outs == 2 else 2 if outs == 0 else 0
        threshold = _pick_by_rating(self.running, 8, 10, 12) + adjustment
        return f"attempt to tag up if their safe range is {threshold}+"

    def uncapped_advance(self, to_base: int, outs: int) -> str:
        """Return the text posted with the advancement message.

        ``to_base == 3`` uses the ``uncapped_third`` rating; any other value
        is treated as an advance home and uses ``uncapped_home``.
        """
        if to_base == 3:
            rating = self.uncapped_third
            if rating < 5:
                return "not attempt to advance"
            threshold = 14 if rating >= 8 else 18
            if outs == 2:
                threshold += 2
        else:
            adjustment = -2 if outs == 2 else 3 if outs == 0 else 0
            threshold = _pick_by_rating(self.uncapped_home, 8, 10, 12) + adjustment

        return f"attempt to advance if their safe range is {threshold}+"

    def trail_advance(self, to_base: int, outs: int, sent_home: bool = False) -> str:
        """Return the text posted with the trail runner's advancement message."""
        rating = self.uncapped_trail

        if not sent_home:
            threshold = 14 if rating >= 8 else 16
        elif rating >= 8 or (rating >= 5 and outs == 2):
            # Lead runner was sent home: go without checking a safe range.
            return "attempt to advance"
        elif rating >= 5:
            threshold = 14
        else:
            return "not attempt to advance"

        return f"attempt to advance if their safe range is {threshold}+"

    def throw_lead_runner(self, to_base: int, outs: int) -> str:
        """Return the text posted with the throw message."""
        return "throw for the lead runner"

    def throw_which_runner(self, to_base: int, outs: int) -> str:
        """Return the text posted with the throw message."""
        if to_base != 4:
            return "throw for the lead runner if their safe range is 14-"
        return "throw for the lead runner"

    def gb_decide_advance(self, starting_outs: int, run_lead: int):
        """Return the text posted with the ground-ball advancement message."""
        threshold = _pick_by_rating(self.running, 10, 13, 16)
        if starting_outs == 1:
            threshold += 3
        if run_lead >= 4:
            threshold -= 3
        elif run_lead < 0:
            threshold += 3

        # The posted safe range is capped at 20.
        return f"attempt to advance if their safe range is {min(threshold, 20)}+"

    def gb_decide_throw(self, starting_outs: int, run_lead: int):
        """Return the text posted with the ground-ball throw message."""
        threshold = _pick_by_rating(self.decide_throw, 13, 10, 8)
        if starting_outs == 1:
            threshold -= 3
        if run_lead >= 4:
            threshold -= 3
        elif run_lead < 0:
            threshold += 3

        # The posted safe range never drops below 0.
        return f"throw for the lead runner if their safe range is {max(threshold, 0)}-"

    def go_to_reliever(
        self, this_play, tot_allowed: int, is_starter: bool = False
    ) -> bool:
        """Decide whether the AI should replace its current pitcher.

        Args:
            this_play: Current play. ``ai_run_diff()``, ``on_base_code`` and
                ``starting_outs`` are read, plus ``inning_num`` when trailing
                by five or more.
            tot_allowed: Count compared against the stay/go thresholds.
            is_starter: Whether the current pitcher is the starter.

        Returns:
            ``False`` when one of the numbered "stay" rules applies (each is
            logged with its code), otherwise ``True``.
        """
        run_lead = this_play.ai_run_diff()
        obc = this_play.on_base_code
        log_prefix = (
            f"db_calls_gameplay - StratManagerAi - ID: {self.id} - go_to_reliever: "
        )
        logger.info(
            f"{log_prefix}outs: {this_play.starting_outs}, obc: {obc}, "
            f"run_lead: {run_lead}, tot_allowed: {tot_allowed}"
        )

        def keep_pitcher(code: int) -> bool:
            # Log which rule kept the pitcher in, then report "no change".
            logger.info(f"{log_prefix}False / code {code}")
            return False

        lead_target = run_lead if is_starter else 3
        reliever_two_out = this_play.starting_outs == 2 and not is_starter
        bold_ahead = self.ahead_aggression > 5
        bold_behind = self.behind_aggression > 5

        if tot_allowed < 5 and is_starter:
            return keep_pitcher(1)

        # AI up big
        if run_lead > 5 or (run_lead > 2 and bold_ahead):
            if tot_allowed <= lead_target or obc <= 3 or reliever_two_out:
                return keep_pitcher(2)
        elif run_lead > 2 or (run_lead >= 0 and bold_ahead):
            if tot_allowed < lead_target or obc <= 1 or reliever_two_out:
                return keep_pitcher(3)
        elif run_lead >= 0 or (run_lead >= -2 and bold_behind):
            if tot_allowed < 5 or obc <= run_lead or reliever_two_out:
                return keep_pitcher(4)
        elif run_lead >= -3 and bold_behind:
            if tot_allowed < 5 and obc <= 1:
                return keep_pitcher(5)
        elif run_lead <= -5:
            if is_starter and this_play.inning_num <= 3:
                return keep_pitcher(6)
            if this_play.starting_outs != 0:
                return keep_pitcher(7)

        return True
def convert_strat_manager(manager: ManagerAi) -> StratManagerAi:
    """Build a ``StratManagerAi`` from a ``ManagerAi`` database model.

    The model is flattened with ``model_to_dict`` and its entries are passed
    to ``StratManagerAi`` as keyword arguments.
    """
    return StratManagerAi(**model_to_dict(manager))
def get_manager(game) -> Optional[StratManagerAi]:
    """Select the AI manager playbook for ``game``.

    The manager id (1-3) is derived from the current day of the month and the
    AI team's id, so a given team keeps the same manager for the whole day.

    Args:
        game: Game object; ``ai_team``, ``home_team_id`` and ``away_team_id``
            are read.

    Returns:
        ``None`` when ``game.ai_team`` is falsy, otherwise the matching
        ``StratManagerAi``.

    Raises:
        KeyError: If the selected ``ManagerAi`` record cannot be loaded. The
            underlying exception is attached as ``__cause__``.
    """
    if not game.ai_team:
        return None

    team_id = game.home_team_id if game.ai_team == "home" else game.away_team_id
    manager_ai_id = ((datetime.datetime.now().day * team_id) % 3) + 1
    # Defensive clamp: for integer ids the modulo above already yields 1-3.
    if manager_ai_id > 3 or manager_ai_id < 1:
        manager_ai_id = 1
    logger.debug(f"manager id: {manager_ai_id} for game {game}")

    try:
        this_manager = ManagerAi.get_by_id(manager_ai_id)
    except Exception as e:
        e_message = f"Could not find manager id {manager_ai_id}"
        logger.error(f"{e_message}: {type(e)}: {e}")
        # Chain the original error so the real cause is not lost.
        raise KeyError("Could not find this AI manager's playbook") from e

    return convert_strat_manager(this_manager)
db.close()