major-domo-database/app/db_engine.py
Cal Corum c451e02c52
All checks were successful
Build Docker Image / build (pull_request) Successful in 18m25s
fix: remove hardcoded fallback password from PostgreSQL connection
Raise RuntimeError on startup if POSTGRES_PASSWORD env var is not set,
instead of silently falling back to a known password in source code.

Closes #C2 from postgres migration review.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 23:15:07 -05:00

3035 lines
102 KiB
Python

import copy
import datetime
import logging
import math
import os
from typing import Literal, List, Optional
from pandas import DataFrame
from peewee import *
from peewee import ModelSelect
from playhouse.shortcuts import model_to_dict
# Database configuration - supports both SQLite and PostgreSQL.
# The backend is selected once, at import time, from the DATABASE_TYPE
# environment variable; any value other than "postgresql" (compared
# case-insensitively) falls through to the SQLite branch below.
DATABASE_TYPE = os.environ.get("DATABASE_TYPE", "sqlite")
if DATABASE_TYPE.lower() == "postgresql":
    # Only imported on the PostgreSQL path.
    from playhouse.pool import PooledPostgresqlDatabase

    # Refuse to start without a password instead of falling back to a default.
    # NOTE(review): only an unset variable is rejected; an empty string passes.
    _postgres_password = os.environ.get("POSTGRES_PASSWORD")
    if _postgres_password is None:
        raise RuntimeError(
            "POSTGRES_PASSWORD environment variable is not set. "
            "This variable is required when DATABASE_TYPE=postgresql."
        )
    # Every other connection setting has a default and may be overridden
    # through its POSTGRES_* environment variable.
    db = PooledPostgresqlDatabase(
        os.environ.get("POSTGRES_DB", "sba_master"),
        user=os.environ.get("POSTGRES_USER", "sba_admin"),
        password=_postgres_password,
        host=os.environ.get("POSTGRES_HOST", "sba_postgres"),
        port=int(os.environ.get("POSTGRES_PORT", "5432")),
        max_connections=20,
        stale_timeout=300,  # 5 minutes
        # NOTE(review): the playhouse.pool docs describe timeout=0 as "block
        # indefinitely when the pool is full" - confirm for the installed version.
        timeout=0,
        autoconnect=True,
        autorollback=True,  # Automatically rollback failed transactions
    )
else:
    # Default SQLite configuration (path is relative to the working directory)
    db = SqliteDatabase(
        "storage/sba_master.db",
        pragmas={"journal_mode": "wal", "cache_size": -1 * 64000, "synchronous": 0},
    )
# Date string built once at import time as year-month-day; the components are
# not zero-padded and the value does not change while the process keeps running.
date = f"{datetime.datetime.now().year}-{datetime.datetime.now().month}-{datetime.datetime.now().day}"
# Module logger; uses the fixed name "discord_app" rather than __name__.
logger = logging.getLogger("discord_app")
"""
Per season updates:
Result: regular_season & post_season - set season length
update_standings - confirm division alignments and records
Standings: recalculate - e_number function, set season length
- wildcard section, set league abbrevs
"""
def model_csv_headers(this_obj, exclude=None) -> List:
    """Return the field names of ``this_obj`` as a list (a CSV header row).

    The model is flattened with ``model_to_dict(..., recurse=False)``;
    ``exclude`` is forwarded to ``model_to_dict`` unchanged.
    """
    flattened = model_to_dict(this_obj, recurse=False, exclude=exclude)
    return list(flattened)
def model_to_csv(this_obj, exclude=None) -> List:
    """Return the field values of ``this_obj`` as a list (one CSV data row).

    Values come out in the same order as the names produced by
    ``model_csv_headers`` for the same ``exclude`` argument, because both
    read the same ``model_to_dict(..., recurse=False)`` mapping.
    """
    flattened = model_to_dict(this_obj, recurse=False, exclude=exclude)
    return list(flattened.values())
def query_to_csv(all_items: ModelSelect, exclude=None):
    """Render a select query as CSV text.

    The first line holds the field names of the first result and every
    following line holds one result's values. A query with no rows yields
    the single cell ``No data found``. ``exclude`` is forwarded to the
    per-row helpers.
    """
    if all_items.count() != 0:
        rows = [model_csv_headers(all_items[0], exclude=exclude)]
        rows.extend(model_to_csv(item, exclude=exclude) for item in all_items)
    else:
        rows = [["No data found"]]
    return DataFrame(rows).to_csv(header=False, index=False)
def complex_data_to_csv(complex_data: List):
    """Render a list of mappings as CSV text.

    The header row is taken from the keys of the first mapping; each data
    row is built from that mapping's own keys. An empty list yields the
    single cell ``No data found``.

    Cell conversion:
      * ``None`` becomes an empty string
      * a nested dict is replaced by its ``name``, else ``abbrev``, else ``id``
        (a dict with none of these raises ``KeyError``)
      * an int above 100000000 is written as text with a leading apostrophe
      * commas inside strings are replaced by ``-_-``
    """

    def _cell(value):
        # Convert one raw value to what is written into the CSV.
        if value is None:
            return ""
        if isinstance(value, dict):
            for label in ("name", "abbrev"):
                if label in value:
                    return value[label]
            return value["id"]
        if isinstance(value, int) and value > 100000000:
            return f"'{value}"
        if isinstance(value, str) and "," in value:
            return value.replace(",", "-_-")
        return value

    if len(complex_data) == 0:
        rows = [["No data found"]]
    else:
        rows = [list(complex_data[0].keys())]
        for line in complex_data:
            logger.info(f"line: {line}")
            this_row = []
            for key in line:
                logger.info(f"key: {key}")
                this_row.append(_cell(line[key]))
            rows.append(this_row)
    return DataFrame(rows).to_csv(header=False, index=False)
def per_season_weeks(season: int, s_type: Literal["regular", "post", "total"]):
    """Return the first and last week number of a season segment.

    Params: season, integer (season number)
            s_type, "regular", "post" or anything else (treated as "total")
    Return: dict with "start" and "end" week numbers (both inclusive)
    """
    if season == 1:
        regular_end, post_end = 20, 22
    elif season in (3, 4, 5, 6, 7):
        regular_end, post_end = 22, 25
    else:
        # Season 2, 8, and beyond
        regular_end, post_end = 18, 21

    if s_type == "regular":
        return {"start": 1, "end": regular_end}
    if s_type == "post":
        # The post season starts the week after the regular season ends.
        return {"start": regular_end + 1, "end": post_end}
    return {"start": 1, "end": post_end}
def win_pct(this_team_stan):
    """Return a sortable winning percentage for a standings row.

    The value is wins / (wins + losses) plus run_diff * 0.000001, so run
    differential only separates teams whose win ratios are otherwise equal
    or nearly equal. A team with no decisions gets 0.
    """
    decisions = this_team_stan.wins + this_team_stan.losses
    if decisions == 0:
        return 0
    run_diff_bonus = this_team_stan.run_diff * 0.000001
    return (this_team_stan.wins / decisions) + run_diff_bonus
def games_back(leader, chaser):
    """Return how many games ``chaser`` trails ``leader``.

    Computed as half the sum of the win gap and the loss gap; the result is
    negative when ``chaser`` is actually ahead.
    """
    win_gap = leader.wins - chaser.wins
    loss_gap = chaser.losses - leader.losses
    return (win_gap + loss_gap) / 2
def e_number(leader, chaser):
    """Return the elimination number of ``chaser`` relative to ``leader``.

    Computed as 73 - leader.wins - chaser.losses, floored at 0.
    NOTE(review): 73 is presumably tied to the season length (the module
    notes say to "set season length" here each season) - confirm.
    """
    return max(0, 73 - leader.wins - chaser.losses)
class BaseModel(Model):
    """Base class for the models in this module; binds them to the module-level ``db``."""

    class Meta:
        # ``db`` is the SQLite or PostgreSQL handle chosen at import time.
        database = db
class Current(BaseModel):
    """League state row: week, season, freeze flag, counters and key week numbers."""

    week = IntegerField(default=0)
    freeze = BooleanField(default=True)
    season = IntegerField()
    transcount = IntegerField(default=0)
    bstatcount = IntegerField(default=0, null=True)
    pstatcount = IntegerField(default=0, null=True)
    bet_week = IntegerField(default=0, null=True)
    trade_deadline = IntegerField()
    pick_trade_start = IntegerField()
    pick_trade_end = IntegerField()
    playoffs_begin = IntegerField()
    injury_count = IntegerField(null=True)

    @staticmethod
    def latest():
        """Return the ``Current`` row with the highest id.

        ``.get()`` raises ``Current.DoesNotExist`` when the table is empty.
        """
        newest_first = Current.select().order_by(Current.id.desc())
        return newest_first.get()
class Division(BaseModel):
    """A division (optionally inside a league) for one season, plus standings sorters."""

    division_name = CharField()
    division_abbrev = CharField()
    league_name = CharField(null=True)
    league_abbrev = CharField(null=True)
    season = IntegerField(default=0)

    def abbrev(self):
        """Return "<league_abbrev> <division_abbrev>", or just the division abbreviation."""
        prefix = ""
        if self.league_abbrev:
            prefix = self.league_abbrev + " "
        return f"{prefix}{self.division_abbrev}"

    def short_name(self):
        """Return "<league_abbrev> <division_name>", or just the division name."""
        prefix = ""
        if self.league_abbrev:
            prefix = self.league_abbrev + " "
        return f"{prefix}{self.division_name}"

    def full_name(self):
        """Return "<league_name> <division_name>", or just the division name."""
        prefix = ""
        if self.league_name:
            prefix = self.league_name + " "
        return f"{prefix}{self.division_name}"

    def sort_division(self, season):
        """Rank this division's standings rows and store division gaps on each row.

        Rows are ordered by ``win_pct`` (best first). The leader gets ``None``
        in all four gap columns; every other team gets its games-back and
        elimination number against the leader, and 99 as a placeholder in both
        wildcard columns. Each row is saved.

        This is the "one playoff team per division" rule set. An alternative
        "two playoff teams per division" variant used to sit here as
        commented-out code: it measured the leader against third place, gave
        second place a gap of 0 and measured everyone else against second place.
        """
        in_division = Standings.select_season(season).where(
            Standings.team.division == self
        )
        ranked = sorted(in_division, key=win_pct, reverse=True)

        for rank, team_stan in enumerate(ranked):
            if rank == 0:
                team_stan.div_gb = None
                team_stan.div_e_num = None
                team_stan.wc_gb = None
                team_stan.wc_e_num = None
            else:
                leader = ranked[0]
                team_stan.div_gb = games_back(leader, team_stan)
                team_stan.div_e_num = e_number(leader, team_stan)
                team_stan.wc_gb = 99
                team_stan.wc_e_num = 99
            team_stan.save()

    @staticmethod
    def sort_wildcard(season, league_abbrev):
        """Rank a league's wildcard candidates and store wildcard gaps on each row.

        Candidates are the season's standings rows whose ``wc_gb`` is not NULL
        and whose team belongs to a division with the given ``league_abbrev``.
        The top four by ``win_pct`` get a negated gap to the fifth-ranked team
        and no elimination number; the rest are measured against the
        fourth-ranked team. Each row is saved.

        NOTE(review): with one to four candidates the lookup of the
        fifth-ranked team raises IndexError. The original comment spoke of
        "two wildcard teams" while the code treats four as qualifying - confirm
        which is intended.
        """
        divisions = Division.select().where(Division.league_abbrev == league_abbrev)
        candidates = Standings.select_season(season).where(
            Standings.wc_gb.is_null(False) & (Standings.team.division << divisions)
        )
        ranked = sorted(candidates, key=win_pct, reverse=True)

        for rank, team_stan in enumerate(ranked):
            if rank < 4:
                # Inside the cut: lead over the first team out (index 4).
                team_stan.wc_gb = -games_back(team_stan, ranked[4])
                team_stan.wc_e_num = None
            else:
                # Outside the cut: deficit to the last team in (index 3).
                last_in = ranked[3]
                team_stan.wc_gb = games_back(last_in, team_stan)
                team_stan.wc_e_num = e_number(last_in, team_stan)
            team_stan.save()
class Manager(BaseModel):
    """A manager profile; ``name`` is unique, the remaining text fields are optional."""

    name = CharField(unique=True)
    image = CharField(null=True)
    headline = CharField(null=True)
    bio = CharField(null=True)
class Team(BaseModel):
    """A team for one season (rows are per-season; see the ``season`` column).

    Other code in this class looks up companion rosters by team abbreviation
    with a suffix appended (for example ``<abbrev>IL`` and ``<abbrev>MiL``).
    """

    abbrev = CharField()
    sname = CharField()
    lname = CharField()
    manager_legacy = CharField(null=True)
    division_legacy = CharField(null=True)
    gmid = CharField(max_length=20, null=True)  # Discord snowflake IDs as strings
    gmid2 = CharField(max_length=20, null=True)  # Discord snowflake IDs as strings
    manager1 = ForeignKeyField(Manager, null=True)
    manager2 = ForeignKeyField(Manager, null=True)
    division = ForeignKeyField(Division, null=True)
    mascot = CharField(null=True)
    stadium = CharField(null=True)
    gsheet = CharField(null=True)
    thumbnail = CharField(null=True)
    color = CharField(null=True)
    dice_color = CharField(null=True)
    season = IntegerField()
    auto_draft = BooleanField(null=True)
    salary_cap = FloatField(null=True)
@staticmethod
def select_season(num):
    """Return a query over every Team row whose ``season`` equals ``num``."""
    in_season = Team.season == num
    return Team.select().where(in_season)
@staticmethod
def get_by_owner(gmid, season):
    """Return the season's team owned by ``gmid``, or None.

    ``gmid`` is matched against the ``gmid`` column first; the ``gmid2``
    column is only queried when that finds nothing.
    """
    as_primary = Team.get_or_none(Team.gmid == gmid, Team.season == season)
    if as_primary:
        return as_primary
    as_secondary = Team.get_or_none(Team.gmid2 == gmid, Team.season == season)
    return as_secondary if as_secondary else None
@staticmethod
def get_season(name_or_abbrev, season):
    """Find a team in ``season`` by abbreviation, short name or long name.

    The three columns are tried in that order with case-insensitive
    comparison; the first hit is returned. Returns None when nothing matches.
    """
    upper_text = name_or_abbrev.upper()
    team = Team.get_or_none(fn.Upper(Team.abbrev) == upper_text, Team.season == season)
    if team:
        return team

    # Fall back to the name columns, short name before long name.
    for name_column in (Team.sname, Team.lname):
        team = Team.get_or_none(
            fn.Lower(name_column) == name_or_abbrev.lower(), Team.season == season
        )
        if team:
            break
    return team
def get_record(self, week):
    """Return this team's win/loss record through ``week`` (inclusive).

    Results are read from the season of the latest ``Current`` row, not from
    ``self.season``. A game counts as a win when this team's score is strictly
    higher than the opponent's, and as a loss when strictly lower.

    Return: dict with "w" (wins), "l" (losses) and "pct" (wins / decisions,
            or 0 when there are no decisions)
    """
    # Look the season up once; the original queried Current for each side.
    season = Current.latest().season
    through_week = Result.week <= week
    home_beat_away = Result.homescore > Result.awayscore
    away_beat_home = Result.awayscore > Result.homescore

    won = ((Result.hometeam == self) & home_beat_away) | (
        (Result.awayteam == self) & away_beat_home
    )
    lost = ((Result.awayteam == self) & home_beat_away) | (
        (Result.hometeam == self) & away_beat_home
    )

    # Each count() is a database round trip, so run it exactly once per side.
    wins = Result.select_season(season).where(won & through_week).count()
    losses = Result.select_season(season).where(lost & through_week).count()

    decisions = wins + losses
    pct = wins / decisions if decisions > 0 else 0
    return {"w": wins, "l": losses, "pct": pct}
def get_gms(self):
    """Return the team's GM ids as a list: ``[gmid]``, plus ``gmid2`` when it is set."""
    gm_ids = [self.gmid]
    if self.gmid2:
        gm_ids.append(self.gmid2)
    return gm_ids
def get_this_week(self):
    """Summarise this team's current rosters by position.

    Three rosters are read for ``self.season``: the players on this team
    (ordered by ``wara``), the players on the team abbreviated
    ``<abbrev>IL`` and the players on the team abbreviated ``<abbrev>MiL``.

    Return: dict with "active", "shortil" and "longil"; each value is a dict
            holding a count per position key, the summed "WARa" and the
            "players" list.

    A player listing both SP and RP is not counted under his positions;
    such players are added to SP until it reaches 5 and any left over are
    added to RP.
    """
    position_keys = (
        "C", "1B", "2B", "3B", "SS", "LF", "CF", "RF", "DH", "SP", "RP", "CP",
    )

    def summarise(players, skip_unknown_positions):
        # Tally one roster; see the method docstring for the SP/RP rule.
        roster = {key: 0 for key in position_keys}
        roster["WARa"] = 0
        roster["players"] = []
        swingmen = 0
        for guy in players:
            roster["WARa"] += guy.wara
            roster["players"].append(guy)
            positions = guy.get_positions()
            if "SP" in positions and "RP" in positions:
                swingmen += 1
                continue
            try:
                for pos in positions:
                    roster[pos] += 1
            except KeyError:
                # This happens for season 1 without player positions listed
                if not skip_unknown_positions:
                    raise
        if swingmen > 0 and roster["SP"] < 5:
            to_rotation = min(5 - roster["SP"], swingmen)
            roster["SP"] += to_rotation
            swingmen -= to_rotation
        if swingmen > 0:
            roster["RP"] += swingmen
        return roster

    season_players = Player.select_season(self.season)
    active_team = season_players.where(Player.team == self).order_by(Player.wara)
    # Only the active roster tolerates position strings that are not roster keys.
    active_roster = summarise(active_team, True)

    short_il = (
        Player.select_season(self.season)
        .join(Team)
        .where(Player.team.abbrev == f"{self.abbrev}IL")
    )
    short_roster = summarise(short_il, False)

    long_il = (
        Player.select_season(self.season)
        .join(Team)
        .where(Player.team.abbrev == f"{self.abbrev}MiL")
    )
    long_roster = summarise(long_il, False)

    return {"active": active_roster, "shortil": short_roster, "longil": long_roster}
def get_next_week(self):
    """Project this team's rosters for next week.

    Each roster starts from the players currently on it (season taken from
    the latest ``Current`` row) and is then adjusted by the non-cancelled
    ``Transaction`` rows of that season dated ``current.week + 1``: players
    leaving the roster's team are removed, players joining it are added.

    Return: dict with "active", "shortil" and "longil"; each value is a dict
            holding a count per position key, the summed "WARa" and the
            "players" list.

    A player listing both SP and RP is not counted under his positions;
    after the moves are applied such players are added to SP until it
    reaches 5 and any left over are added to RP.

    NOTE(review): the short-IL roster is read from ``<abbrev>SIL`` here while
    get_this_week reads ``<abbrev>IL``; the long-IL players are read from
    ``<abbrev>MiL`` but its transactions are matched against ``<abbrev>LIL``.
    These abbreviations are kept exactly as they were - confirm which are
    intended.
    """
    # Fetched once; the original re-queried Current for every transaction query.
    current = Current.latest()
    next_week = current.week + 1
    position_keys = (
        "C", "1B", "2B", "3B", "SS", "LF", "CF", "RF", "DH", "SP", "RP", "CP",
    )

    def adjust(roster, positions, step):
        # Add ``step`` to every listed position; report whether the player
        # is an SP/RP combo pitcher (who is not counted by position).
        if "SP" in positions and "RP" in positions:
            return True
        for pos in positions:
            roster[pos] += step
        return False

    def pending_moves(team_field, roster_team):
        # Next week's non-cancelled moves whose old/new team is roster_team.
        return Transaction.select_season(current.season).where(
            (team_field == roster_team)
            & (Transaction.week == next_week)
            & (Transaction.cancelled == 0)
        )

    def project(players, resolve_team):
        # Tally the current players, apply next week's moves, place combos.
        roster = {key: 0 for key in position_keys}
        roster["WARa"] = 0
        roster["players"] = []
        combo_pitchers = 0

        for guy in players:
            roster["WARa"] += guy.wara
            roster["players"].append(guy)
            if adjust(roster, guy.get_positions(), 1):
                combo_pitchers += 1

        roster_team = resolve_team()

        for move in pending_moves(Transaction.oldteam, roster_team):
            leaving = move.player
            if adjust(roster, leaving.get_positions(), -1):
                combo_pitchers -= 1
            roster["WARa"] -= leaving.wara
            try:
                roster["players"].remove(leaving)
            except ValueError:
                # The player was not on the roster that was just read.
                print(f"I could not drop {leaving.name}")

        for move in pending_moves(Transaction.newteam, roster_team):
            joining = move.player
            if adjust(roster, joining.get_positions(), 1):
                combo_pitchers += 1
            roster["WARa"] += joining.wara
            roster["players"].append(joining)

        if combo_pitchers > 0 and roster["SP"] < 5:
            to_rotation = min(5 - roster["SP"], combo_pitchers)
            roster["SP"] += to_rotation
            combo_pitchers -= to_rotation
        if combo_pitchers > 0:
            roster["RP"] += combo_pitchers
        return roster

    active_roster = project(
        Player.select_season(current.season).where(Player.team == self),
        lambda: self,
    )

    short_il = (
        Player.select_season(current.season)
        .join(Team)
        .where(Player.team.abbrev == f"{self.abbrev}SIL")
    )
    short_roster = project(
        short_il, lambda: Team.get_season(f"{self.abbrev}SIL", current.season)
    )

    long_il = (
        Player.select_season(current.season)
        .join(Team)
        .where(Player.team.abbrev == f"{self.abbrev}MiL")
    )
    long_roster = project(
        long_il, lambda: Team.get_season(f"{self.abbrev}LIL", current.season)
    )

    return {"active": active_roster, "shortil": short_roster, "longil": long_roster}
def run_pythag_last8(self):
    """Update Pythagorean wins/losses and the last-8 tally on this team's Standings row.

    Runs scored and allowed are summed over every StratGame with a non-NULL
    ``game_num`` in which this team was the away or the home side. The
    expected win ratio is RS**1.83 / (RS**1.83 + RA**1.83); it is taken as 1
    when no runs were allowed and as 0 when no runs were scored. That ratio
    is applied to the games played (wins + losses) on the standings row.

    The eight most recent such games (ordered by season, week and game_num,
    newest first) are then added to ``last8_wins`` / ``last8_losses``. The
    counters are incremented, not reset, by this method.

    Return: the result of ``save()`` on the standings row.
    Raises: AttributeError when this team has no Standings row.
    """
    team_stan = Standings.get_or_none(Standings.team == self)
    runs_scored, runs_allowed = 0, 0

    away_games = StratGame.select(
        fn.SUM(StratGame.away_score).alias("r_scored"),
        fn.SUM(StratGame.home_score).alias("r_allowed"),
    ).where((StratGame.away_team == self) & StratGame.game_num.is_null(False))
    if away_games.count() > 0:
        # SUM() over zero matching rows is NULL and arrives as None, which
        # used to raise TypeError on the addition; treat it as 0 runs.
        runs_scored += away_games[0].r_scored or 0
        runs_allowed += away_games[0].r_allowed or 0

    home_games = StratGame.select(
        fn.SUM(StratGame.home_score).alias("r_scored"),
        fn.SUM(StratGame.away_score).alias("r_allowed"),
    ).where((StratGame.home_team == self) & StratGame.game_num.is_null(False))
    if home_games.count() > 0:
        runs_scored += home_games[0].r_scored or 0
        runs_allowed += home_games[0].r_allowed or 0

    if runs_allowed == 0:
        pythag_win_pct = 1
    elif runs_scored == 0:
        pythag_win_pct = 0
    else:
        pythag_win_pct = runs_scored**1.83 / (
            (runs_scored**1.83) + (runs_allowed**1.83)
        )

    games_played = team_stan.wins + team_stan.losses
    team_stan.pythag_wins = round(games_played * pythag_win_pct)
    team_stan.pythag_losses = games_played - team_stan.pythag_wins

    last_games = (
        StratGame.select()
        .where(
            ((StratGame.home_team == self) | (StratGame.away_team == self))
            & (StratGame.game_num.is_null(False))
        )
        .order_by(-StratGame.season, -StratGame.week, -StratGame.game_num)
        .limit(8)
    )
    for game in last_games:
        home_won = game.home_score > game.away_score
        was_home = game.home_team == self
        # This team won if it was the home side of a home win or the away
        # side of anything else (a tie counts for the away side, as before).
        if home_won == bool(was_home):
            team_stan.last8_wins += 1
        else:
            team_stan.last8_losses += 1

    return team_stan.save()
class Result(BaseModel):
    """Final score of one game: week, game number, both teams and both scores."""

    week = IntegerField()
    game = IntegerField()
    awayteam = ForeignKeyField(Team)
    hometeam = ForeignKeyField(Team)
    awayscore = IntegerField()
    homescore = IntegerField()
    season = IntegerField()
    scorecard_url = CharField(null=True)

    @staticmethod
    def regular_season(num):
        """Return a query over season ``num``'s regular-season results.

        Only seasons 1 to 4 are known here; any other value returns None.
        NOTE(review): per_season_weeks() in this module knows later seasons -
        confirm whether this method is still meant to be used for them.
        """
        if num == 1:
            first_post_week = 21
        elif num == 2:
            first_post_week = 19
        elif num in (3, 4):
            first_post_week = 23
        else:
            return None
        return Result.select().where(
            (Result.season == num) & (Result.week < first_post_week)
        )

    @staticmethod
    def post_season(num):
        """Return a query over season ``num``'s post-season results.

        Only seasons 1 to 4 are known here; any other value returns None.
        """
        if num == 1:
            first_post_week = 21
        elif num == 2:
            first_post_week = 19
        elif num in (3, 4):
            first_post_week = 23
        else:
            return None
        return Result.select().where(
            (Result.season == num) & (Result.week >= first_post_week)
        )

    @staticmethod
    def select_season(num):
        """Return a query over every result whose ``season`` equals ``num``."""
        in_season = Result.season == num
        return Result.select().where(in_season)
# def update_standings(self):
# away_stan = Standings.get_season(self.awayteam)
# home_stan = Standings.get_season(self.hometeam)
# away_div = Division.get_by_id(self.awayteam.division.id)
# home_div = Division.get_by_id(self.hometeam.division.id)
#
# if self.homescore > self.awayscore:
# # - generic w/l & home/away w/l
# home_stan.wins += 1
# home_stan.home_wins += 1
# away_stan.losses += 1
# away_stan.away_losses += 1
#
# # - update streak wl and num
# if home_stan.streak_wl == 'w':
# home_stan.streak_num += 1
# else:
# home_stan.streak_wl = 'w'
# home_stan.streak_num = 1
#
# if away_stan.streak_wl == 'l':
# away_stan.streak_num += 1
# else:
# away_stan.streak_wl = 'l'
# away_stan.streak_num = 1
#
# # - if 1-run, tally accordingly
# if self.homescore == self.awayscore + 1:
# home_stan.one_run_wins += 1
# away_stan.one_run_losses += 1
#
# # Used for one league with 3 divisions
# # - update record v division
# # if away_div.division_abbrev == 'BE':
# # home_stan.div1_wins += 1
# # elif away_div.division_abbrev == 'DO':
# # home_stan.div2_wins += 1
# # else:
# # home_stan.div3_wins += 1
# #
# # if home_div.division_abbrev == 'BE':
# # away_stan.div1_losses += 1
# # elif home_div.division_abbrev == 'DO':
# # away_stan.div2_losses += 1
# # else:
# # away_stan.div3_losses += 1
#
# # Used for two league plus divisions
# if away_div.league_abbrev == 'AL':
# if away_div.division_abbrev == 'E':
# home_stan.div1_wins += 1
# else:
# home_stan.div2_wins += 1
# else:
# if away_div.division_abbrev == 'E':
# home_stan.div3_wins += 1
# else:
# home_stan.div4_wins += 1
#
# if home_div.league_abbrev == 'AL':
# if home_div.division_abbrev == 'E':
# away_stan.div1_losses += 1
# else:
# away_stan.div2_losses += 1
# else:
# if home_div.division_abbrev == 'E':
# away_stan.div3_losses += 1
# else:
# away_stan.div4_losses += 1
#
# # - adjust run_diff
# home_stan.run_diff += self.homescore - self.awayscore
# away_stan.run_diff -= self.homescore - self.awayscore
# else:
# # - generic w/l & home/away w/l
# home_stan.losses += 1
# home_stan.home_losses += 1
# away_stan.wins += 1
# away_stan.away_wins += 1
#
# # - update streak wl and num
# if home_stan.streak_wl == 'l':
# home_stan.streak_num += 1
# else:
# home_stan.streak_wl = 'l'
# home_stan.streak_num = 1
#
# if away_stan.streak_wl == 'w':
# away_stan.streak_num += 1
# else:
# away_stan.streak_wl = 'w'
# away_stan.streak_num = 1
#
# # - if 1-run, tally accordingly
# if self.awayscore == self.homescore + 1:
# home_stan.one_run_losses += 1
# away_stan.one_run_wins += 1
#
# # Used for one league with 3 divisions
# # - update record v division
# # if away_div.division_abbrev == 'BE':
# # home_stan.div1_losses += 1
# # elif away_div.division_abbrev == 'DO':
# # home_stan.div2_losses += 1
# # else:
# # home_stan.div3_losses += 1
# #
# # if home_div.division_abbrev == 'BE':
# # away_stan.div1_wins += 1
# # elif home_div.division_abbrev == 'DO':
# # away_stan.div2_wins += 1
# # else:
# # away_stan.div3_wins += 1
#
# # Used for two league plus divisions
# if away_div.league_abbrev == 'AL':
# if away_div.division_abbrev == 'E':
# home_stan.div1_losses += 1
# else:
# home_stan.div2_losses += 1
# else:
# if away_div.division_abbrev == 'E':
# home_stan.div3_losses += 1
# else:
# home_stan.div4_losses += 1
#
# if home_div.league_abbrev == 'AL':
# if home_div.division_abbrev == 'E':
# away_stan.div1_wins += 1
# else:
# away_stan.div2_wins += 1
# else:
# if home_div.division_abbrev == 'E':
# away_stan.div3_wins += 1
# else:
# away_stan.div4_wins += 1
#
# # - adjust run_diff
# home_stan.run_diff -= self.awayscore - self.homescore
# away_stan.run_diff += self.awayscore - self.homescore
#
# home_stan.save()
# away_stan.save()
class SbaPlayer(BaseModel):
    """A person record referenced by ``Player.sbaplayer``; holds a name and optional external keys."""

    first_name = CharField()
    last_name = CharField()
    # NOTE(review): the key_* columns look like external site identifiers
    # (FanGraphs, Baseball-Reference, Retrosheet, MLBAM) - confirm.
    key_fangraphs = IntegerField(null=True)
    key_bbref = CharField(null=True)
    key_retro = CharField(null=True)
    key_mlbam = IntegerField(null=True)
class Player(BaseModel):
    """A player for one season: team, WARa value, up to eight positions and card metadata."""

    name = CharField(max_length=500)
    wara = FloatField()
    image = CharField(max_length=1000)
    image2 = CharField(max_length=1000, null=True)
    team = ForeignKeyField(Team)
    season = IntegerField()
    pitcher_injury = IntegerField(null=True)
    pos_1 = CharField(max_length=5)
    pos_2 = CharField(max_length=5, null=True)
    pos_3 = CharField(max_length=5, null=True)
    pos_4 = CharField(max_length=5, null=True)
    pos_5 = CharField(max_length=5, null=True)
    pos_6 = CharField(max_length=5, null=True)
    pos_7 = CharField(max_length=5, null=True)
    pos_8 = CharField(max_length=5, null=True)
    last_game = CharField(max_length=20, null=True)
    last_game2 = CharField(max_length=20, null=True)
    il_return = CharField(max_length=20, null=True)
    demotion_week = IntegerField(null=True)
    headshot = CharField(max_length=500, null=True)
    vanity_card = CharField(max_length=500, null=True)
    strat_code = CharField(max_length=100, null=True)
    bbref_id = CharField(max_length=50, null=True)
    injury_rating = CharField(max_length=50, null=True)
    sbaplayer = ForeignKeyField(SbaPlayer, null=True)

    @staticmethod
    def select_season(num):
        """Return a query over every Player row whose ``season`` equals ``num``."""
        return Player.select().where(Player.season == num)

    @staticmethod
    def get_season(name, num):
        """
        Params: name, string (matched case-insensitively against Player.name)
                num, integer (season number)
        Return: the matching Player, or None when the lookup fails for any
                reason (the error is printed)
        """
        # The original returned from a ``finally`` block, which also swallowed
        # KeyboardInterrupt/SystemExit; only ordinary exceptions are caught now.
        try:
            return Player.get(
                fn.Lower(Player.name) == name.lower(), Player.season == num
            )
        except Exception as e:
            print(f"**Error** (db_engine player): {e}")
            return None

    def get_positions(self):
        """
        Params: None
        Return: List of positions (ex ['1b', '3b']), in pos_1..pos_8 order,
                skipping empty slots
        """
        slots = (
            self.pos_1,
            self.pos_2,
            self.pos_3,
            self.pos_4,
            self.pos_5,
            self.pos_6,
            self.pos_7,
            self.pos_8,
        )
        return [pos for pos in slots if pos]
class Schedule(BaseModel):
    """A scheduled series: week, away team, home team and number of games."""

    week = IntegerField()
    awayteam = ForeignKeyField(Team)
    hometeam = ForeignKeyField(Team)
    gamecount = IntegerField()
    season = IntegerField()

    @staticmethod
    def select_season(season):
        """Return a query over every Schedule row for ``season``."""
        in_season = Schedule.season == season
        return Schedule.select().where(in_season)
class Transaction(BaseModel):
    """A player move from ``oldteam`` to ``newteam`` in a given week, grouped by ``moveid``."""

    week = IntegerField()
    player = ForeignKeyField(Player)
    oldteam = ForeignKeyField(Team)
    newteam = ForeignKeyField(Team)
    season = IntegerField()
    moveid = CharField(max_length=50)
    cancelled = BooleanField(default=False)
    frozen = BooleanField(default=False)

    @staticmethod
    def select_season(num):
        """Return a query over every Transaction row whose ``season`` equals ``num``."""
        in_season = Transaction.season == num
        return Transaction.select().where(in_season)
class BattingStat(BaseModel):
    """Batting, baserunning and fielding counting stats for a player and team.

    Rows carry ``week``, ``game`` and ``season`` columns; the season helpers
    and team aggregations on this class filter and sum over them.
    NOTE(review): the short column names (bphr, xch, roba, ...) are not
    explained anywhere in this module - confirm their meaning before relying
    on them.
    """

    player = ForeignKeyField(Player)
    team = ForeignKeyField(Team)
    pos = CharField()
    # Batting columns summed by team_season()
    pa = IntegerField()
    ab = IntegerField()
    run = IntegerField()
    hit = IntegerField()
    rbi = IntegerField()
    double = IntegerField()
    triple = IntegerField()
    hr = IntegerField()
    bb = IntegerField()
    so = IntegerField()
    hbp = IntegerField()
    sac = IntegerField()
    ibb = IntegerField()
    gidp = IntegerField()
    sb = IntegerField()
    cs = IntegerField()
    bphr = IntegerField()
    bpfo = IntegerField()
    bp1b = IntegerField()
    bplo = IntegerField()
    xba = IntegerField()
    xbt = IntegerField()
    # xch, xhit, error, pb, sbc and csc are summed by team_fielding_season()
    xch = IntegerField()
    xhit = IntegerField()
    error = IntegerField()
    pb = IntegerField()
    sbc = IntegerField()
    csc = IntegerField()
    roba = IntegerField()
    robs = IntegerField()
    raa = IntegerField()
    rto = IntegerField()
    week = IntegerField()
    game = IntegerField()
    season = IntegerField()
@staticmethod
def combined_season(season):
    """
    Params: season, integer (season number)
    Return: ModelSelect object for every stat line of <season>, regular and
            post season together
    """
    in_season = BattingStat.season == season
    return BattingStat.select().where(in_season)
@staticmethod
def regular_season(season):
    """
    Params: season, integer (season number)
    Return: ModelSelect object for <season>'s regular season ordered by week,
            or None when season is below 1

    NOTE(review): every season above 2 is cut off before week 23 here, while
    per_season_weeks() in this module lists an 18-week regular season for
    seasons 8 and later - confirm which is current.
    """
    if season == 1:
        first_post_week = 21
    elif season == 2:
        first_post_week = 19
    elif season > 2:
        first_post_week = 23
    else:
        return None

    in_regular_season = (BattingStat.season == season) & (
        BattingStat.week < first_post_week
    )
    return BattingStat.select().where(in_regular_season).order_by(BattingStat.week)
@staticmethod
def post_season(season):
    """
    Params: season, integer (season number)
    Return: ModelSelect object for <season>'s post season, or None when
            season is below 1
    """
    if season == 1:
        first_post_week = 21
    elif season == 2:
        first_post_week = 19
    elif season > 2:
        first_post_week = 23
    else:
        return None

    in_post_season = (BattingStat.season == season) & (
        BattingStat.week >= first_post_week
    )
    return BattingStat.select().where(in_post_season)
@staticmethod
def team_season(team, season):
    """
    Params: team, Team whose regular-season batting lines are summed
            season, integer (season number)
    Return: dict of summed counting stats ("game", "pa", "ab", ...), the rate
            stats "ba", "obp", "slg", "woba" and "kpct" (all 0 when the team
            has no at-bats) and "rper9", runs scored per nine innings pitched
            by the team's players (0 when no innings are recorded)
    """
    row = (
        BattingStat.regular_season(season)
        .join(Player)
        .select(
            fn.SUM(BattingStat.pa).alias("pas"),
            fn.SUM(BattingStat.ab).alias("abs"),
            fn.SUM(BattingStat.run).alias("runs"),
            fn.SUM(BattingStat.hit).alias("hits"),
            fn.SUM(BattingStat.rbi).alias("rbis"),
            fn.SUM(BattingStat.double).alias("doubles"),
            fn.SUM(BattingStat.triple).alias("triples"),
            fn.SUM(BattingStat.hr).alias("hrs"),
            fn.SUM(BattingStat.bb).alias("bbs"),
            fn.SUM(BattingStat.so).alias("sos"),
            fn.SUM(BattingStat.hbp).alias("hbps"),
            fn.SUM(BattingStat.sac).alias("sacs"),
            fn.SUM(BattingStat.ibb).alias("ibbs"),
            fn.SUM(BattingStat.gidp).alias("gidps"),
            fn.SUM(BattingStat.sb).alias("sbs"),
            fn.SUM(BattingStat.cs).alias("css"),
            fn.SUM(BattingStat.bphr).alias("bphr"),
            fn.SUM(BattingStat.bpfo).alias("bpfo"),
            fn.SUM(BattingStat.bp1b).alias("bp1b"),
            fn.SUM(BattingStat.bplo).alias("bplo"),
            # fn.SUM(BattingStat.xba).alias('xba'),
            # fn.SUM(BattingStat.xbt).alias('xbt'),
            fn.COUNT(BattingStat.game).alias("games"),
        )
        .where(BattingStat.team == team)
    )[0]

    # SUM() is NULL (None) when no rows match, so every total falls back to 0.
    total = {
        "game": row.games or 0,
        "pa": row.pas or 0,
        "ab": row.abs or 0,
        "run": row.runs or 0,
        "hit": row.hits or 0,
        "rbi": row.rbis or 0,
        "double": row.doubles or 0,
        "triple": row.triples or 0,
        "hr": row.hrs or 0,
        "bb": row.bbs or 0,
        "so": row.sos or 0,
        "hbp": row.hbps or 0,
        "sac": row.sacs or 0,
        "ibb": row.ibbs or 0,
        "gidp": row.gidps or 0,
        "sb": row.sbs or 0,
        "cs": row.css or 0,
        "ba": 0,
        "obp": 0,
        "slg": 0,
        "woba": 0,
        "kpct": 0,
        "bphr": row.bphr or 0,
        "bpfo": row.bpfo or 0,
        "bp1b": row.bp1b or 0,
        "bplo": row.bplo or 0,
        # 'xba': row.xba or 0,
        # 'xbt': row.xbt or 0,
    }

    if total["ab"]:
        singles = total["hit"] - total["hr"] - total["triple"] - total["double"]
        total["ba"] = total["hit"] / total["ab"]
        total["obp"] = (
            total["bb"] + total["hit"] + total["hbp"] + total["ibb"]
        ) / total["pa"]
        total["slg"] = (
            (total["hr"] * 4) + (total["triple"] * 3) + (total["double"] * 2) + singles
        ) / total["ab"]
        total["woba"] = (
            (total["bb"] * 0.69)
            + (total["hbp"] * 0.722)
            + (total["double"] * 1.271)
            + (total["triple"] * 1.616)
            + (total["hr"] * 2.101)
            + (singles * 0.888)
        ) / (total["pa"] - total["ibb"])
        total["kpct"] = (total["so"] * 100) / total["ab"]

    total_innings = (
        PitchingStat.regular_season(season)
        .join(Player)
        .select(
            fn.SUM(PitchingStat.ip).alias("ips"),
        )
        .where(PitchingStat.player.team == team)
    )
    innings = total_innings[0].ips
    # The innings sum is None without pitching lines and may be 0; both used
    # to crash the division below.
    total["rper9"] = (total["run"] * 9) / innings if innings else 0
    return total
@staticmethod
def team_fielding_season(team, season):
    """Aggregate one team's regular-season fielding totals into a dict.

    Sums the ``xch``, ``xhit``, ``error``, ``pb``, ``sbc`` and ``csc``
    columns (and counts ``game``) over the rows produced by
    ``BattingStat.regular_season(season)`` that belong to ``team``.

    Returns:
        dict with keys ``game``, ``xch``, ``xhit``, ``error``, ``pb``,
        ``sbc``, ``csc``, ``wfpct`` and ``cspct``. Empty (NULL) sums are
        reported as 0, and each rate stays 0 when its denominator is 0.
    """
    # NOTE(review): BattingStat.regular_season() is not visible from here. If
    # it applies an ORDER BY the way PitchingStat.regular_season() does,
    # PostgreSQL will reject this aggregate query; clearing the ordering with
    # ``.order_by()`` would be the fix - confirm.
    aggregate_columns = (
        fn.SUM(BattingStat.xch).alias("xchs"),
        fn.SUM(BattingStat.xhit).alias("xhits"),
        fn.SUM(BattingStat.error).alias("errors"),
        # roba / robs / raa / rto sums are currently disabled.
        fn.SUM(BattingStat.pb).alias("pbs"),
        fn.SUM(BattingStat.sbc).alias("sbas"),
        fn.SUM(BattingStat.csc).alias("cscs"),
        fn.COUNT(BattingStat.game).alias("games"),
    )
    season_rows = BattingStat.regular_season(season)
    summed = season_rows.select(*aggregate_columns).where(BattingStat.team == team)[0]

    xch_total = summed.xchs or 0
    xhit_total = summed.xhits or 0
    error_total = summed.errors or 0
    sbc_total = summed.sbas or 0
    csc_total = summed.cscs or 0

    total = {
        "game": summed.games or 0,
        "xch": xch_total,
        "xhit": xhit_total,
        "error": error_total,
        "pb": summed.pbs or 0,
        "sbc": sbc_total,
        "csc": csc_total,
        "wfpct": 0,
        "cspct": 0,
    }

    # Weighted fielding pct: an error costs half a chance, an xhit three quarters.
    if xch_total > 0:
        total["wfpct"] = (
            xch_total - (error_total * 0.5) - (xhit_total * 0.75)
        ) / xch_total

    if sbc_total > 0:
        total["cspct"] = (csc_total / sbc_total) * 100

    return total
class PitchingStat(BaseModel):
    """Pitching stat line for one player and team, keyed by season, week and game."""

    player = ForeignKeyField(Player)
    team = ForeignKeyField(Team)
    ip = FloatField()
    hit = FloatField()
    run = FloatField()
    erun = FloatField()
    so = FloatField()
    bb = FloatField()
    hbp = FloatField()
    wp = FloatField()
    balk = FloatField()
    hr = FloatField()
    ir = FloatField()
    irs = FloatField()
    gs = FloatField()
    win = FloatField()
    loss = FloatField()
    hold = FloatField()
    sv = FloatField()
    bsv = FloatField()
    week = IntegerField()
    game = IntegerField()
    season = IntegerField()

    @staticmethod
    def select_season(season):
        """Return a query for every pitching line recorded in ``season``."""
        return PitchingStat.select().where(PitchingStat.season == season)

    @staticmethod
    def _post_season_start_week(season):
        """Return the first post-season week of ``season``, or None if season < 1."""
        if season == 1:
            return 21
        if season == 2:
            return 19
        if season > 2:
            return 23
        return None

    @staticmethod
    def regular_season(season):
        """Return the regular-season lines of ``season`` ordered by week.

        Regular season means weeks before the post-season start week
        (21 for season 1, 19 for season 2, 23 for later seasons).
        Returns None when ``season`` is less than 1.
        """
        cutoff = PitchingStat._post_season_start_week(season)
        if cutoff is None:
            return None
        return (
            PitchingStat.select()
            .where((PitchingStat.season == season) & (PitchingStat.week < cutoff))
            .order_by(PitchingStat.week)
        )

    @staticmethod
    def post_season(season):
        """Return the post-season lines of ``season`` ordered by week.

        Post season means weeks at or after the post-season start week
        (21 for season 1, 19 for season 2, 23 for later seasons).
        Returns None when ``season`` is less than 1.
        """
        cutoff = PitchingStat._post_season_start_week(season)
        if cutoff is None:
            return None
        return (
            PitchingStat.select()
            .where((PitchingStat.season == season) & (PitchingStat.week >= cutoff))
            .order_by(PitchingStat.week)
        )

    @staticmethod
    def team_season(team, season):
        """Aggregate a team's regular-season pitching totals into a dict.

        Returns:
            dict with the summed counting stats (``ip`` left as returned by
            the database, the others cast to int, NULL sums reported as 0)
            plus the derived ``wl%``, ``era`` and ``whip``, which stay 0 when
            their denominator is 0.

        Raises:
            AttributeError: when ``season`` is less than 1, because
                ``regular_season`` returns None for such seasons.
        """
        p_stats = (
            PitchingStat.regular_season(season)
            .select(
                fn.SUM(PitchingStat.ip).alias("ips"),
                fn.SUM(PitchingStat.hit).alias("hits"),
                fn.SUM(PitchingStat.run).alias("runs"),
                fn.SUM(PitchingStat.erun).alias("eruns"),
                fn.SUM(PitchingStat.so).alias("sos"),
                fn.SUM(PitchingStat.bb).alias("bbs"),
                fn.SUM(PitchingStat.hbp).alias("hbps"),
                fn.SUM(PitchingStat.wp).alias("wps"),
                fn.SUM(PitchingStat.ir).alias("ir"),
                fn.SUM(PitchingStat.irs).alias("irs"),
                fn.SUM(PitchingStat.balk).alias("balks"),
                fn.SUM(PitchingStat.hr).alias("hrs"),
                fn.COUNT(PitchingStat.game).alias("games"),
                fn.SUM(PitchingStat.gs).alias("gss"),
                fn.SUM(PitchingStat.win).alias("wins"),
                fn.SUM(PitchingStat.loss).alias("losses"),
                fn.SUM(PitchingStat.hold).alias("holds"),
                fn.SUM(PitchingStat.sv).alias("saves"),
                fn.SUM(PitchingStat.bsv).alias("bsaves"),
            )
            .where(PitchingStat.team == team)
            # regular_season() orders by week. PostgreSQL rejects ORDER BY on a
            # non-aggregated column in an ungrouped aggregate query, and the
            # ordering is meaningless for a single summary row, so clear it.
            .order_by()
        )
        row = p_stats[0]

        def as_int(value):
            # SUM() yields NULL (None) when no rows match; report that as 0.
            return int(value) if value else 0

        total = {
            "ip": row.ips if row.ips else 0,
            "hit": as_int(row.hits),
            "run": as_int(row.runs),
            "erun": as_int(row.eruns),
            "so": as_int(row.sos),
            "bb": as_int(row.bbs),
            "hbp": as_int(row.hbps),
            "wp": as_int(row.wps),
            "balk": as_int(row.balks),
            "hr": as_int(row.hrs),
            "game": as_int(row.games),
            "gs": as_int(row.gss),
            "win": as_int(row.wins),
            "loss": as_int(row.losses),
            "hold": as_int(row.holds),
            "sv": as_int(row.saves),
            "bsv": as_int(row.bsaves),
            "wl%": 0,
            "era": 0,
            "whip": 0,
            "ir": as_int(row.ir),
            "irs": as_int(row.irs),
        }

        if total["ip"]:
            total["era"] = (total["erun"] * 9) / total["ip"]
            total["whip"] = (total["bb"] + total["hit"]) / total["ip"]

        if total["win"] + total["loss"] > 0:
            total["wl%"] = total["win"] / (total["win"] + total["loss"])

        return total
class Standings(BaseModel):
    """Standings row for one team: overall, split, streak and per-division records."""

    team = ForeignKeyField(Team)
    wins = IntegerField(default=0)
    losses = IntegerField(default=0)
    run_diff = IntegerField(default=0)
    div_gb = FloatField(default=0.0, null=True)
    div_e_num = IntegerField(default=0, null=True)
    wc_gb = FloatField(default=99.0, null=True)
    wc_e_num = IntegerField(default=99, null=True)
    home_wins = IntegerField(default=0)
    home_losses = IntegerField(default=0)
    away_wins = IntegerField(default=0)
    away_losses = IntegerField(default=0)
    last8_wins = IntegerField(default=0)
    last8_losses = IntegerField(default=0)
    streak_wl = CharField(default="w")
    streak_num = IntegerField(default=0)
    one_run_wins = IntegerField(default=0)
    one_run_losses = IntegerField(default=0)
    pythag_wins = IntegerField(default=0)
    pythag_losses = IntegerField(default=0)
    div1_wins = IntegerField(default=0)
    div1_losses = IntegerField(default=0)
    div2_wins = IntegerField(default=0)
    div2_losses = IntegerField(default=0)
    div3_wins = IntegerField(default=0)
    div3_losses = IntegerField(default=0)
    div4_wins = IntegerField(default=0)
    div4_losses = IntegerField(default=0)

    @staticmethod
    def select_season(season):
        """Return a query for the standings rows whose team belongs to ``season``."""
        joined = Standings.select().join(Team)
        return joined.where(Standings.team.season == season)

    @staticmethod
    def get_season(team):
        """Return the standings row for ``team``, or None when there is none."""
        return Standings.get_or_none(Standings.team == team)

    @staticmethod
    def recalculate(season, full_wipe=True):
        """Rebuild the standings of ``season`` from its StratGame rows.

        With ``full_wipe`` the season's rows are deleted and blank rows are
        created for every team that has a division. Then every game is
        tallied, per-team pythag / last-8 figures are refreshed, and the
        divisions and the wildcard race are sorted.
        """
        division_teams = Team.select_season(season).where(Team.division.is_null(False))

        if full_wipe:
            # Drop the season's rows, then recreate one blank row per team.
            season_teams = Team.select().where(Team.season == season)
            Standings.delete().where(Standings.team << season_teams).execute()

            blank_rows = [Standings(team=club) for club in division_teams]
            with db.atomic():
                Standings.bulk_create(blank_rows)

        # Walk the games in schedule order so streaks come out right.
        season_games = (
            StratGame.select()
            .where(
                (StratGame.season == season)
                & (StratGame.week <= 18)
                & (StratGame.game_num.is_null(False))
            )
            .order_by(StratGame.week, StratGame.game_num)
        )
        for season_game in season_games:
            season_game.update_standings()

        # Pythag record and last-8 record per team.
        for club in division_teams:
            club.run_pythag_last8()

        # Sort each division, then the wildcard race.
        for division in Division.select().where(Division.season == season):
            division.sort_division(season)

        # One league. For two leagues, sort "AL" and "NL" separately instead.
        Division.sort_wildcard(season, "SBa")
class BattingCareer(BaseModel):
    """Career batting totals, one row per player name."""

    name = CharField()
    pa = FloatField(default=0)
    ab = FloatField(default=0)
    run = FloatField(default=0)
    hit = FloatField(default=0)
    rbi = FloatField(default=0)
    double = FloatField(default=0)
    triple = FloatField(default=0)
    hr = FloatField(default=0)
    bb = FloatField(default=0)
    so = FloatField(default=0)
    hbp = FloatField(default=0)
    sac = FloatField(default=0)
    ibb = FloatField(default=0)
    gidp = FloatField(default=0)
    sb = FloatField(default=0)
    cs = FloatField(default=0)
    bphr = FloatField(default=0)
    bpfo = FloatField(default=0)
    bp1b = FloatField(default=0)
    bplo = FloatField(default=0)
    xba = FloatField(default=0)
    xbt = FloatField(default=0)
    game = FloatField(default=0)

    @staticmethod
    def recalculate():
        """Rebuild every career row from the "Regular" BattingSeason rows.

        Seasons are grouped by player name and summed in memory, then the
        table is wiped and rewritten inside a single transaction, so a
        failure part-way through leaves the previous rows in place.
        """
        # ``game`` is summed as well; it used to be skipped, which left the
        # career ``game`` column at 0 after every rebuild.
        summed_columns = (
            "pa", "ab", "run", "hit", "rbi", "double", "triple", "hr",
            "bb", "so", "hbp", "sac", "ibb", "gidp", "sb", "cs",
            "bphr", "bpfo", "bp1b", "bplo", "xba", "xbt", "game",
        )

        # A dict preserves first-seen order, so rows are saved in the same
        # order the per-season lookup used to create them.
        careers = {}
        regular_seasons = BattingSeason.select().where(
            BattingSeason.season_type == "Regular"
        )
        for this_season in regular_seasons:
            player_name = this_season.player.name
            career = careers.get(player_name)
            if career is None:
                career = careers[player_name] = BattingCareer(name=player_name)
            for column in summed_columns:
                setattr(
                    career,
                    column,
                    getattr(career, column) + getattr(this_season, column),
                )

        with db.atomic():
            # One bulk DELETE instead of one DELETE per existing row.
            BattingCareer.delete().execute()
            for career in careers.values():
                career.save()
class PitchingCareer(BaseModel):
    """Career pitching totals, one row per player name."""

    name = CharField()
    ip = FloatField(default=0)
    hit = FloatField(default=0)
    run = FloatField(default=0)
    erun = FloatField(default=0)
    so = FloatField(default=0)
    bb = FloatField(default=0)
    hbp = FloatField(default=0)
    wp = FloatField(default=0)
    balk = FloatField(default=0)
    hr = FloatField(default=0)
    ir = FloatField(default=0)
    irs = FloatField(default=0)
    gs = FloatField(default=0)
    win = FloatField(default=0)
    loss = FloatField(default=0)
    hold = FloatField(default=0)
    sv = FloatField(default=0)
    bsv = FloatField(default=0)
    game = FloatField(default=0)

    @staticmethod
    def recalculate():
        """Rebuild every career row from the "Regular" PitchingSeason rows.

        Seasons are grouped by player name and summed in memory, then the
        table is wiped and rewritten inside a single transaction, so a
        failure part-way through leaves the previous rows in place.
        """
        # ``game`` is summed as well; it used to be skipped, which left the
        # career ``game`` column at 0 after every rebuild.
        summed_columns = (
            "ip", "hit", "run", "erun", "so", "bb", "hbp", "wp", "balk",
            "hr", "ir", "irs", "gs", "win", "loss", "hold", "sv", "bsv",
            "game",
        )

        # A dict preserves first-seen order, so rows are saved in the same
        # order the per-season lookup used to create them.
        careers = {}
        regular_seasons = PitchingSeason.select().where(
            PitchingSeason.season_type == "Regular"
        )
        for this_season in regular_seasons:
            player_name = this_season.player.name
            career = careers.get(player_name)
            if career is None:
                career = careers[player_name] = PitchingCareer(name=player_name)
            for column in summed_columns:
                setattr(
                    career,
                    column,
                    getattr(career, column) + getattr(this_season, column),
                )

        with db.atomic():
            # One bulk DELETE instead of one DELETE per existing row.
            PitchingCareer.delete().execute()
            for career in careers.values():
                career.save()
class FieldingCareer(BaseModel):
    """Career fielding totals, one row per (player name, position)."""

    name = CharField()
    pos = CharField()
    xch = IntegerField(default=0)
    xhit = IntegerField(default=0)
    error = IntegerField(default=0)
    pb = IntegerField(default=0)
    sbc = IntegerField(default=0)
    csc = IntegerField(default=0)
    roba = IntegerField(default=0)
    robs = IntegerField(default=0)
    raa = IntegerField(default=0)
    rto = IntegerField(default=0)
    game = IntegerField(default=0)

    @staticmethod
    def recalculate():
        """Rebuild every career row from the "Regular" FieldingSeason rows.

        Seasons are grouped by (player name, position) and summed in memory,
        then the table is wiped and rewritten inside a single transaction, so
        a failure part-way through leaves the previous rows in place.
        """
        # ``game`` is summed as well; it used to be skipped, which left the
        # career ``game`` column at 0 after every rebuild.
        summed_columns = (
            "xch", "xhit", "error", "pb", "sbc", "csc",
            "roba", "robs", "raa", "rto", "game",
        )

        # A dict preserves first-seen order, so rows are saved in the same
        # order the per-season lookup used to create them.
        careers = {}
        regular_seasons = FieldingSeason.select().where(
            FieldingSeason.season_type == "Regular"
        )
        for this_season in regular_seasons:
            key = (this_season.player.name, this_season.pos)
            career = careers.get(key)
            if career is None:
                career = careers[key] = FieldingCareer(name=key[0], pos=key[1])
            for column in summed_columns:
                setattr(
                    career,
                    column,
                    getattr(career, column) + getattr(this_season, column),
                )

        with db.atomic():
            # One bulk DELETE instead of one DELETE per existing row.
            FieldingCareer.delete().execute()
            for career in careers.values():
                career.save()
class BattingSeason(BaseModel):
    """Batting totals of one player for one season and season type."""

    player = ForeignKeyField(Player)
    season = IntegerField()
    season_type = CharField(default="Regular")
    career = ForeignKeyField(BattingCareer, null=True)
    pa = FloatField(default=0)
    ab = FloatField(default=0)
    run = FloatField(default=0)
    hit = FloatField(default=0)
    rbi = FloatField(default=0)
    double = FloatField(default=0)
    triple = FloatField(default=0)
    hr = FloatField(default=0)
    bb = FloatField(default=0)
    so = FloatField(default=0)
    hbp = FloatField(default=0)
    sac = FloatField(default=0)
    ibb = FloatField(default=0)
    gidp = FloatField(default=0)
    sb = FloatField(default=0)
    cs = FloatField(default=0)
    bphr = FloatField(default=0)
    bpfo = FloatField(default=0)
    bp1b = FloatField(default=0)
    bplo = FloatField(default=0)
    xba = FloatField(default=0)
    xbt = FloatField(default=0)
    game = FloatField(default=0)

    @staticmethod
    def select_season(season):
        """Return a query for every batting season row of ``season``."""
        return BattingSeason.select().where(BattingSeason.season == season)

    # A former static ``recalculate(season, manager_id)`` bulk rebuild was
    # retired in favour of the per-row method below.

    def recalculate(self):
        """Re-sum this row from the player's BattingStat lines and save it.

        Uses the regular-season lines when ``season_type`` is "Regular" and
        the post-season lines otherwise. Every counting column is reset to 0
        first and ``game`` counts the lines read.

        Returns:
            The number of stat lines matched, from a COUNT on the same query.
        """
        counted = (
            "pa", "ab", "run", "hit", "rbi", "double", "triple", "hr",
            "bb", "so", "hbp", "sac", "ibb", "gidp", "sb", "cs",
            "bphr", "bpfo", "bp1b", "bplo", "xba", "xbt",
        )
        for column in counted:
            setattr(self, column, 0)
        self.game = 0

        if self.season_type == "Regular":
            season_lines = BattingStat.regular_season(self.season)
        else:
            season_lines = BattingStat.post_season(self.season)
        all_stats = season_lines.where(BattingStat.player == self.player)

        for line in all_stats:
            for column in counted:
                setattr(self, column, getattr(self, column) + getattr(line, column))
            self.game += 1

        self.save()
        return all_stats.count()
class PitchingSeason(BaseModel):
    """Pitching totals of one player for one season and season type."""

    player = ForeignKeyField(Player)
    season = IntegerField()
    season_type = CharField(default="Regular")
    career = ForeignKeyField(PitchingCareer, null=True)
    ip = FloatField(default=0)
    hit = FloatField(default=0)
    run = FloatField(default=0)
    erun = FloatField(default=0)
    so = FloatField(default=0)
    bb = FloatField(default=0)
    hbp = FloatField(default=0)
    wp = FloatField(default=0)
    balk = FloatField(default=0)
    hr = FloatField(default=0)
    ir = FloatField(default=0)
    irs = FloatField(default=0)
    gs = FloatField(default=0)
    win = FloatField(default=0)
    loss = FloatField(default=0)
    hold = FloatField(default=0)
    sv = FloatField(default=0)
    bsv = FloatField(default=0)
    game = FloatField(default=0)

    @staticmethod
    def select_season(season):
        """Return a query for every pitching season row of ``season``."""
        return PitchingSeason.select().where(PitchingSeason.season == season)

    # A former static ``recalculate(season, manager_id)`` bulk rebuild was
    # retired in favour of the per-row method below.

    def recalculate(self):
        """Re-sum this row from the player's PitchingStat lines and save it.

        Uses the regular-season lines when ``season_type`` is "Regular" and
        the post-season lines otherwise. Every counting column is reset to 0
        first and ``game`` counts the lines read.

        Returns:
            The number of stat lines matched, from a COUNT on the same query.
        """
        counted = (
            "ip", "hit", "run", "erun", "so", "bb", "hbp", "wp", "balk",
            "hr", "ir", "irs", "gs", "win", "loss", "hold", "sv", "bsv",
        )
        for column in counted:
            setattr(self, column, 0)
        self.game = 0

        if self.season_type == "Regular":
            season_lines = PitchingStat.regular_season(self.season)
        else:
            season_lines = PitchingStat.post_season(self.season)
        all_stats = season_lines.where(PitchingStat.player == self.player)

        for line in all_stats:
            for column in counted:
                setattr(self, column, getattr(self, column) + getattr(line, column))
            self.game += 1

        self.save()
        return all_stats.count()
class FieldingSeason(BaseModel):
    """Fielding totals of one player at one position for a season and season type."""

    player = ForeignKeyField(Player)
    season = IntegerField()
    season_type = CharField(default="Regular")
    pos = CharField()
    career = ForeignKeyField(FieldingCareer, null=True)
    xch = IntegerField(default=0)
    xhit = IntegerField(default=0)
    error = IntegerField(default=0)
    pb = IntegerField(default=0)
    sbc = IntegerField(default=0)
    csc = IntegerField(default=0)
    roba = IntegerField(default=0)
    robs = IntegerField(default=0)
    raa = IntegerField(default=0)
    rto = IntegerField(default=0)
    game = IntegerField(default=0)

    @staticmethod
    def select_season(season):
        """Return a query for every fielding season row of ``season``."""
        return FieldingSeason.select().where(FieldingSeason.season == season)

    # A former static ``recalculate(season, manager_id)`` bulk rebuild was
    # retired in favour of the per-row method below.

    def recalculate(self):
        """Re-sum this row from the player's BattingStat lines at ``pos`` and save it.

        Uses the regular-season lines when ``season_type`` is "Regular" and
        the post-season lines otherwise. Every counting column is reset to 0
        first and ``game`` counts the lines read.

        Returns:
            The number of stat lines matched, from a COUNT on the same query.
        """
        counted = (
            "xch", "xhit", "error", "pb", "sbc", "csc",
            "roba", "robs", "raa", "rto",
        )
        for column in counted:
            setattr(self, column, 0)
        self.game = 0

        if self.season_type == "Regular":
            season_lines = BattingStat.regular_season(self.season)
        else:
            season_lines = BattingStat.post_season(self.season)
        all_stats = season_lines.where(
            (BattingStat.player == self.player) & (BattingStat.pos == self.pos)
        )

        for line in all_stats:
            for column in counted:
                setattr(self, column, getattr(self, column) + getattr(line, column))
            self.game += 1

        self.save()
        return all_stats.count()
class DraftPick(BaseModel):
    """A draft pick: round, season, original owner, current owner and optional player."""

    overall = IntegerField(null=True)
    round = IntegerField()
    origowner = ForeignKeyField(Team)
    owner = ForeignKeyField(Team)
    season = IntegerField()
    player = ForeignKeyField(Player, null=True)

    @staticmethod
    def select_season(num):
        """Return a query for every pick whose season equals ``num``."""
        in_season = DraftPick.season == num
        return DraftPick.select().where(in_season)

    @staticmethod
    def get_season(team, rd, num):
        """Return the round-``rd`` pick of season ``num`` originally owned by ``team``.

        Uses ``Model.get``, so the lookup raises when no pick matches.
        """
        criteria = (
            DraftPick.season == num,
            DraftPick.origowner == team,
            DraftPick.round == rd,
        )
        return DraftPick.get(*criteria)
class DraftData(BaseModel):
    """Draft state: current pick, timer flag, deadline and Discord channel ids."""

    currentpick = IntegerField()
    timer = BooleanField()
    pick_deadline = DateTimeField(null=True)
    result_channel = CharField(max_length=20, null=True)  # Discord channel ID as string
    ping_channel = CharField(max_length=20, null=True)  # Discord channel ID as string
    pick_minutes = IntegerField(null=True)
class Award(BaseModel):
    """A named award for a season; every recipient link is optional."""

    name = CharField()
    season = IntegerField()
    timing = CharField(default="In-Season")
    # Recipients: any of these may be left NULL.
    manager1 = ForeignKeyField(Manager, null=True)
    manager2 = ForeignKeyField(Manager, null=True)
    player = ForeignKeyField(Player, null=True)
    team = ForeignKeyField(Team, null=True)
    image = CharField(null=True)
class DiceRoll(BaseModel):
    """A recorded dice roll: who rolled, for which team and week, and the dice values."""

    season = IntegerField(default=12)  # Will be updated to current season when needed
    week = IntegerField(default=1)  # Will be updated to current week when needed
    team = ForeignKeyField(Team, null=True)
    roller = CharField(max_length=20)
    # Each dice column is nullable.
    dsix = IntegerField(null=True)
    twodsix = IntegerField(null=True)
    threedsix = IntegerField(null=True)
    dtwenty = IntegerField(null=True)
class DraftList(BaseModel):
    """One ranked player entry on a team's draft list for a season."""

    season = IntegerField()
    team = ForeignKeyField(Team)
    rank = IntegerField()
    player = ForeignKeyField(Player)
class Keeper(BaseModel):
    """Links a player to a team as a keeper for a season."""

    season = IntegerField()
    team = ForeignKeyField(Team)
    player = ForeignKeyField(Player)
class Injury(BaseModel):
    """An injury spell for a player, bounded by start and end week/game."""

    season = IntegerField()
    player = ForeignKeyField(Player)
    total_games = IntegerField()
    start_week = IntegerField()
    start_game = IntegerField()
    end_week = IntegerField()
    end_game = IntegerField()
    is_active = BooleanField(default=True)
class StratGame(BaseModel):
    """A scheduled or played game between an away team and a home team."""

    season = IntegerField()
    week = IntegerField()
    game_num = IntegerField(null=True)
    season_type = CharField(default="regular")
    away_team = ForeignKeyField(Team)
    home_team = ForeignKeyField(Team)
    away_score = IntegerField(null=True)
    home_score = IntegerField(null=True)
    away_manager = ForeignKeyField(Manager, null=True)
    home_manager = ForeignKeyField(Manager, null=True)
    scorecard_url = CharField(null=True)

    def update_standings(self):
        """Tally this game's result into both teams' Standings rows and save them.

        The home team wins when ``home_score`` is greater than ``away_score``;
        any other result is credited to the away team. Updates the overall and
        home/away records, streaks, one-run records, the record against the
        opponent's division and the run differential.
        """
        away_stan = Standings.get_season(self.away_team)
        home_stan = Standings.get_season(self.home_team)
        away_div = Division.get_by_id(self.away_team.division.id)
        home_div = Division.get_by_id(self.home_team.division.id)

        # Opponent's division abbreviation -> which divN_* columns to bump.
        # This is the one-league, four-division layout; an abbreviation that
        # is not listed leaves the division records untouched.
        # Layouts used previously:
        #   one league, 3 divisions: BE -> 1, DO -> 2, anything else -> 3
        #   two leagues: AL/E -> 1, AL/other -> 2, other league/E -> 3, rest -> 4
        division_slot = {"TC": 1, "ETSOS": 2, "APL": 3, "BBC": 4}

        def bump(standing, column):
            setattr(standing, column, getattr(standing, column) + 1)

        if self.home_score > self.away_score:
            winner, winner_div, winner_runs = home_stan, home_div, self.home_score
            loser, loser_div, loser_runs = away_stan, away_div, self.away_score
            home_stan.home_wins += 1
            away_stan.away_losses += 1
        else:
            winner, winner_div, winner_runs = away_stan, away_div, self.away_score
            loser, loser_div, loser_runs = home_stan, home_div, self.home_score
            home_stan.home_losses += 1
            away_stan.away_wins += 1

        # - generic w/l
        winner.wins += 1
        loser.losses += 1

        # - streak: extend it when it matches the result, otherwise restart at 1
        for standing, outcome in ((winner, "w"), (loser, "l")):
            if standing.streak_wl == outcome:
                standing.streak_num += 1
            else:
                standing.streak_wl = outcome
                standing.streak_num = 1

        # - one-run games
        if winner_runs == loser_runs + 1:
            winner.one_run_wins += 1
            loser.one_run_losses += 1

        # - record v division (keyed on the opponent's division)
        slot_for_winner = division_slot.get(loser_div.division_abbrev)
        if slot_for_winner is not None:
            bump(winner, f"div{slot_for_winner}_wins")
        slot_for_loser = division_slot.get(winner_div.division_abbrev)
        if slot_for_loser is not None:
            bump(loser, f"div{slot_for_loser}_losses")

        # - run differential
        margin = winner_runs - loser_runs
        winner.run_diff += margin
        loser.run_diff -= margin

        home_stan.save()
        away_stan.save()
class StratPlay(BaseModel):
    """One play within a StratGame: game situation, participants and outcome counters."""

    game = ForeignKeyField(StratGame)
    play_num = IntegerField()
    batter = ForeignKeyField(Player, null=True)
    batter_team = ForeignKeyField(Team, null=True)
    pitcher = ForeignKeyField(Player)
    pitcher_team = ForeignKeyField(Team)
    on_base_code = CharField()
    inning_half = CharField()
    inning_num = IntegerField()
    batting_order = IntegerField()
    starting_outs = IntegerField()
    away_score = IntegerField()
    home_score = IntegerField()
    batter_pos = CharField(null=True)
    # The <base>_final fields track the base each runner ends on after the
    # play (None if the runner is out)
    on_first = ForeignKeyField(Player, null=True)
    on_first_final = IntegerField(null=True)
    on_second = ForeignKeyField(Player, null=True)
    on_second_final = IntegerField(null=True)
    on_third = ForeignKeyField(Player, null=True)
    on_third_final = IntegerField(null=True)
    batter_final = IntegerField(null=True)
    # Outcome counters for the play; each defaults to 0
    pa = IntegerField(default=0)
    ab = IntegerField(default=0)
    e_run = IntegerField(default=0)
    run = IntegerField(default=0)
    hit = IntegerField(default=0)
    rbi = IntegerField(default=0)
    double = IntegerField(default=0)
    triple = IntegerField(default=0)
    homerun = IntegerField(default=0)
    bb = IntegerField(default=0)
    so = IntegerField(default=0)
    hbp = IntegerField(default=0)
    sac = IntegerField(default=0)
    ibb = IntegerField(default=0)
    gidp = IntegerField(default=0)
    bphr = IntegerField(default=0)
    bpfo = IntegerField(default=0)
    bp1b = IntegerField(default=0)
    bplo = IntegerField(default=0)
    sb = IntegerField(default=0)
    cs = IntegerField(default=0)
    outs = IntegerField(default=0)
    wpa = FloatField(default=0)
    # These <position> fields are only required if the play is an x-check or baserunning play
    catcher = ForeignKeyField(Player, null=True)
    catcher_team = ForeignKeyField(Team, null=True)
    defender = ForeignKeyField(Player, null=True)
    defender_team = ForeignKeyField(Team, null=True)
    runner = ForeignKeyField(Player, null=True)
    runner_team = ForeignKeyField(Team, null=True)
    check_pos = CharField(null=True)
    error = IntegerField(default=0)
    wild_pitch = IntegerField(default=0)
    passed_ball = IntegerField(default=0)
    pick_off = IntegerField(default=0)
    balk = IntegerField(default=0)
    is_go_ahead = BooleanField(default=False)
    is_tied = BooleanField(default=False)
    is_new_inning = BooleanField(default=False)
    # Season 9 addons
    hand_batting = CharField(null=True)
    hand_pitching = CharField(null=True)
    re24_primary = FloatField(null=True)
    re24_running = FloatField(null=True)
class Decision(BaseModel):
    """Pitcher decision record for one StratGame: result flags, inherited runners and rest."""

    game = ForeignKeyField(StratGame)
    # season / week / game_num are stored on the row alongside the game link
    season = IntegerField()
    week = IntegerField()
    game_num = IntegerField()
    pitcher = ForeignKeyField(Player)
    team = ForeignKeyField(Team, null=True)
    # Decision columns are stored as integers, not booleans
    win = IntegerField()
    loss = IntegerField()
    hold = IntegerField()
    is_save = IntegerField()
    b_save = IntegerField()
    irunners = IntegerField()
    irunners_scored = IntegerField()
    rest_ip = FloatField()
    rest_required = IntegerField()
    is_start = BooleanField(default=False)
class CustomCommandCreator(BaseModel):
    """Model for custom command creators.

    One row per Discord user (``discord_id`` is unique). Rows are referenced
    by ``CustomCommand.creator`` and reachable from here through the
    ``commands`` backref declared on that foreign key.
    """

    discord_id = CharField(max_length=20, unique=True)  # Discord snowflake ID as string
    username = CharField(max_length=32)
    display_name = CharField(max_length=32, null=True)  # optional
    created_at = DateTimeField()  # no default: must be supplied on insert

    # Counters start at zero. Nothing in this class updates them.
    # NOTE(review): presumably maintained by the calling service - confirm.
    total_commands = IntegerField(default=0)
    active_commands = IntegerField(default=0)

    class Meta:
        # Explicit table name instead of the one peewee derives from the class name.
        table_name = "custom_command_creators"
class CustomCommand(BaseModel):
    """Model for custom commands created by users.

    Besides the column definitions this class bundles the common lookup
    queries (all returned lazily as peewee select queries unless noted) and
    small helpers for usage tracking and for the JSON-encoded ``tags`` column.
    """

    name = CharField(max_length=32, unique=True)
    content = TextField()
    creator = ForeignKeyField(CustomCommandCreator, backref="commands")

    # Timestamps
    created_at = DateTimeField()
    updated_at = DateTimeField(null=True)
    last_used = DateTimeField(null=True)  # NULL until the command is first executed

    # Usage tracking
    use_count = IntegerField(default=0)
    warning_sent = BooleanField(default=False)

    # Metadata
    is_active = BooleanField(default=True)
    tags = TextField(null=True)  # JSON string for tags list

    class Meta:
        table_name = "custom_commands"

    @staticmethod
    def _unused_since(days: int):
        """Build the filter "active and not used within the last ``days`` days".

        A command that has never been used (``last_used`` is NULL) counts as
        unused. The cutoff is computed from the naive local clock at call time.
        """
        from datetime import datetime, timedelta

        cutoff_date = datetime.now() - timedelta(days=days)
        return (CustomCommand.is_active == True) & (
            (CustomCommand.last_used.is_null())
            | (CustomCommand.last_used < cutoff_date)
        )

    @staticmethod
    def get_by_name(name: str):
        """Get a custom command by name (case-insensitive).

        Returns the matching row or None. Inactive commands are NOT filtered
        out by this lookup.
        """
        return CustomCommand.get_or_none(fn.Lower(CustomCommand.name) == name.lower())

    @staticmethod
    def search_by_name(partial_name: str, limit: int = 25):
        """Search active commands whose name contains ``partial_name``.

        Matching is case-insensitive; results are ordered by name and capped
        at ``limit`` rows.
        """
        return (
            CustomCommand.select()
            .where(
                (CustomCommand.is_active == True)
                & (fn.Lower(CustomCommand.name).contains(partial_name.lower()))
            )
            .order_by(CustomCommand.name)
            .limit(limit)
        )

    @staticmethod
    def get_popular(limit: int = 10):
        """Get the ``limit`` most used active commands, highest use_count first."""
        return (
            CustomCommand.select()
            .where(CustomCommand.is_active == True)
            .order_by(CustomCommand.use_count.desc())
            .limit(limit)
        )

    @staticmethod
    def get_by_creator(creator_id: int, limit: int = 25, offset: int = 0):
        """Get one page of a creator's active commands, ordered by name.

        Args:
            creator_id: Primary key of the CustomCommandCreator row.
            limit: Page size.
            offset: Number of rows to skip before the page starts.
        """
        return (
            CustomCommand.select()
            .where(
                (CustomCommand.creator == creator_id)
                & (CustomCommand.is_active == True)
            )
            .order_by(CustomCommand.name)
            .limit(limit)
            .offset(offset)
        )

    @staticmethod
    def get_unused_commands(days: int = 60):
        """Get active commands that haven't been used in the specified days."""
        return CustomCommand.select().where(CustomCommand._unused_since(days))

    @staticmethod
    def get_commands_needing_warning():
        """Get commands that need deletion warning (60+ days unused, no warning sent)."""
        return CustomCommand.select().where(
            CustomCommand._unused_since(60),
            CustomCommand.warning_sent == False,
        )

    @staticmethod
    def get_commands_eligible_for_deletion():
        """Get commands eligible for deletion (90+ days unused)."""
        return CustomCommand.select().where(CustomCommand._unused_since(90))

    def execute(self):
        """Record one use of the command and persist it.

        Stamps ``last_used`` with the current naive local time, increments
        ``use_count`` and clears ``warning_sent``.
        """
        from datetime import datetime

        self.last_used = datetime.now()
        # NOTE(review): read-modify-write on the in-memory value; concurrent
        # executions of the same command can lose an increment.
        self.use_count += 1
        self.warning_sent = False  # Reset warning on use
        self.save()

    def mark_warning_sent(self):
        """Mark that a deletion warning has been sent and persist the flag."""
        self.warning_sent = True
        self.save()

    def get_tags_list(self):
        """Parse the ``tags`` JSON string into a list.

        Returns an empty list when ``tags`` is empty, is not valid JSON, or
        decodes to something other than a list, so callers can always iterate
        the result.
        """
        if not self.tags:
            return []

        import json

        try:
            parsed = json.loads(self.tags)
        except (TypeError, ValueError):
            # json.JSONDecodeError is a ValueError; TypeError covers a
            # non-string value having been assigned to the attribute.
            return []
        return parsed if isinstance(parsed, list) else []

    def set_tags_list(self, tags_list):
        """Store ``tags_list`` as a JSON string; an empty/None list stores NULL.

        Only the in-memory attribute is changed; the caller must ``save()``.
        """
        if tags_list:
            import json

            self.tags = json.dumps(tags_list)
        else:
            self.tags = None
class HelpCommand(BaseModel):
    """Admin-authored help topic, stored in the ``help_commands`` table.

    Topics are never removed by the helpers here: ``soft_delete`` and
    ``restore`` only flip ``is_active``, and most lookups hide inactive rows
    unless explicitly asked to include them.
    """

    name = CharField(max_length=32, unique=True)
    title = CharField(max_length=200)
    content = TextField()
    category = CharField(max_length=50, null=True)

    # Audit fields
    created_by_discord_id = CharField(max_length=20)  # Discord snowflake ID as string
    created_at = DateTimeField()
    updated_at = DateTimeField(null=True)
    last_modified_by = CharField(max_length=20, null=True)  # Discord snowflake ID as string

    # Status and metrics
    is_active = BooleanField(default=True)
    view_count = IntegerField(default=0)
    display_order = IntegerField(default=0)

    class Meta:
        table_name = "help_commands"

    @staticmethod
    def get_by_name(name: str, include_inactive: bool = False):
        """Return the topic whose name equals ``name`` ignoring case, or None."""
        filters = [fn.Lower(HelpCommand.name) == name.lower()]
        if not include_inactive:
            filters.append(HelpCommand.is_active == True)
        return HelpCommand.select().where(*filters).first()

    @staticmethod
    def search_by_name(partial_name: str, limit: int = 25):
        """Return up to ``limit`` active topics whose name contains ``partial_name``."""
        needle = partial_name.lower()
        matches = HelpCommand.select().where(
            HelpCommand.is_active == True,
            fn.Lower(HelpCommand.name).contains(needle),
        )
        ordered = matches.order_by(HelpCommand.display_order, HelpCommand.name)
        return ordered.limit(limit)

    @staticmethod
    def get_by_category(category: str, include_inactive: bool = False):
        """Return topics in ``category`` (case-insensitive), in display order."""
        filters = [fn.Lower(HelpCommand.category) == category.lower()]
        if not include_inactive:
            filters.append(HelpCommand.is_active == True)
        topics = HelpCommand.select().where(*filters)
        return topics.order_by(HelpCommand.display_order, HelpCommand.name)

    @staticmethod
    def get_all_active():
        """Return every active topic, ordered by display_order then name."""
        active_topics = HelpCommand.select().where(HelpCommand.is_active == True)
        return active_topics.order_by(HelpCommand.display_order, HelpCommand.name)

    @staticmethod
    def get_most_viewed(limit: int = 10):
        """Return the ``limit`` active topics with the highest view_count."""
        active_topics = HelpCommand.select().where(HelpCommand.is_active == True)
        return active_topics.order_by(HelpCommand.view_count.desc()).limit(limit)

    def increment_view_count(self):
        """Add one to this topic's view_count and persist the row."""
        self.view_count = self.view_count + 1
        self.save()

    def soft_delete(self):
        """Hide this topic by clearing is_active, then persist the row."""
        self.is_active = False
        self.save()

    def restore(self):
        """Bring a soft-deleted topic back by setting is_active, then persist."""
        self.is_active = True
        self.save()
class SeasonBattingStatsView(BaseModel):
    """Per-player, per-season batting line read from ``season_batting_stats_view``.

    The model declares no primary key (``primary_key = False``).
    NOTE(review): presumably backed by a database view and read-only - confirm.
    """

    name = CharField()
    player_id = IntegerField()
    sbaplayer_id = IntegerField()
    season = IntegerField()
    # NOTE(review): declared as CharField here but IntegerField on
    # SeasonBattingStats - confirm which matches the underlying relation.
    player_team_id = CharField()

    # Counting stats
    pa = IntegerField()
    ab = IntegerField()
    run = IntegerField()
    hit = IntegerField()
    double = IntegerField()
    triple = IntegerField()
    hr = IntegerField()
    rbi = IntegerField()
    sb = IntegerField()
    cs = IntegerField()
    bb = IntegerField()
    so = IntegerField()

    # Rate stats
    avg = FloatField()
    obp = FloatField()
    slg = FloatField()
    ops = FloatField()
    woba = FloatField()
    k_pct = FloatField()

    # Additional counting stats
    bphr = IntegerField()
    bpfo = IntegerField()
    bp1b = IntegerField()
    bplo = IntegerField()
    gidp = IntegerField()
    hbp = IntegerField()
    sac = IntegerField()
    ibb = IntegerField()

    class Meta:
        table_name = "season_batting_stats_view"
        primary_key = False

    @staticmethod
    def get_by_season(season):
        """Return a query for every row of the given season."""
        return SeasonBattingStatsView.select().where(
            SeasonBattingStatsView.season == season
        )

    @staticmethod
    def get_team_stats(season, team_id):
        """Return a query for one team's rows in the given season."""
        return SeasonBattingStatsView.select().where(
            SeasonBattingStatsView.season == season,
            SeasonBattingStatsView.player_team_id == team_id,
        )

    @staticmethod
    def get_top_hitters(season, stat="avg", limit=10, desc=True):
        """Get top hitters by specified stat (avg, hr, rbi, ops, etc.)

        Args:
            season: Season to filter by.
            stat: Name of the column to sort by. Anything that is not a
                column of this model falls back to ``avg``.
            limit: Maximum number of rows.
            desc: Sort descending if True, ascending if False.
        """
        # A bare getattr() default only covers *missing* attributes; names such
        # as "select" or "get_by_season" resolve to methods and would crash on
        # .desc(). Accept the lookup only when it is a real column.
        stat_field = getattr(SeasonBattingStatsView, stat, None)
        if not isinstance(stat_field, Field):
            stat_field = SeasonBattingStatsView.avg
        order_field = stat_field.desc() if desc else stat_field.asc()
        return (
            SeasonBattingStatsView.select()
            .where(SeasonBattingStatsView.season == season)
            .order_by(order_field)
            .limit(limit)
        )
class SeasonPitchingStats(BaseModel):
    """Per-player, per-season pitching line stored in ``seasonpitchingstats``.

    Rows are keyed by the composite (player, season). No column declares a
    default, and only ``sbaplayer`` is nullable.
    """

    player = ForeignKeyField(Player)
    sbaplayer = ForeignKeyField(SbaPlayer, null=True)
    team = ForeignKeyField(Team)
    season = IntegerField()
    name = CharField()
    # Plain integer/text columns kept alongside the ``team`` foreign key;
    # the team filters below use player_team_id, not ``team``.
    player_team_id = IntegerField()
    player_team_abbrev = CharField()

    # Counting stats
    tbf = IntegerField()
    outs = IntegerField()
    games = IntegerField()

    # Decision Info
    gs = IntegerField()
    win = IntegerField()
    loss = IntegerField()
    hold = IntegerField()
    saves = IntegerField()
    bsave = IntegerField()
    ir = IntegerField()
    irs = IntegerField()

    # Counting stats part 2
    ab = IntegerField()
    run = IntegerField()
    e_run = IntegerField()
    hits = IntegerField()
    double = IntegerField()
    triple = IntegerField()
    homerun = IntegerField()
    bb = IntegerField()
    so = IntegerField()
    hbp = IntegerField()
    sac = IntegerField()
    ibb = IntegerField()
    gidp = IntegerField()
    sb = IntegerField()
    cs = IntegerField()
    bphr = IntegerField()
    bpfo = IntegerField()
    bp1b = IntegerField()
    bplo = IntegerField()
    wp = IntegerField()
    balk = IntegerField()

    # Calculated stats
    wpa = FloatField()
    era = FloatField()
    whip = FloatField()
    avg = FloatField()
    obp = FloatField()
    slg = FloatField()
    ops = FloatField()
    woba = FloatField()
    hper9 = FloatField()
    kper9 = FloatField()
    bbper9 = FloatField()
    kperbb = FloatField()
    lob_2outs = FloatField()
    rbipercent = FloatField()
    re24 = FloatField()

    class Meta:
        table_name = "seasonpitchingstats"
        primary_key = CompositeKey("player", "season")

    @staticmethod
    def get_team_stats(season, team_id):
        """Return a query for one team's pitching rows in the given season."""
        return SeasonPitchingStats.select().where(
            SeasonPitchingStats.season == season,
            SeasonPitchingStats.player_team_id == team_id,
        )

    @staticmethod
    def get_top_pitchers(
        season: Optional[int] = None,
        stat: str = "era",
        limit: Optional[int] = 200,
        desc: bool = False,
        team_id: Optional[int] = None,
        player_id: Optional[int] = None,
        sbaplayer_id: Optional[int] = None,
        min_outs: Optional[int] = None,
        offset: int = 0,
    ):
        """
        Get top pitchers by specified stat with optional filtering.

        Args:
            season: Season to filter by (None for all seasons)
            stat: Column name to sort by (default: era). Anything that is not
                a column of this model falls back to era.
            limit: Maximum number of results (None, 0 or negative for no limit)
            desc: Sort descending if True, ascending if False (default False for ERA)
            team_id: Filter by team ID (matched against player_team_id)
            player_id: Filter by specific player ID
            sbaplayer_id: Filter by SBA player ID
            min_outs: Minimum outs pitched filter
            offset: Number of results to skip for pagination

        Returns:
            A lazily evaluated peewee select query.
        """
        # A bare getattr() default only covers *missing* attributes; names such
        # as "select" or "get_team_stats" resolve to methods and would crash on
        # .desc(). Accept the lookup only when it is a real column.
        stat_field = getattr(SeasonPitchingStats, stat, None)
        if not isinstance(stat_field, Field):
            stat_field = SeasonPitchingStats.era
        order_field = stat_field.desc() if desc else stat_field.asc()

        query = SeasonPitchingStats.select().order_by(order_field)

        # Apply filters (successive where() calls are AND-ed together)
        if season is not None:
            query = query.where(SeasonPitchingStats.season == season)
        if team_id is not None:
            query = query.where(SeasonPitchingStats.player_team_id == team_id)
        if player_id is not None:
            query = query.where(SeasonPitchingStats.player_id == player_id)
        if sbaplayer_id is not None:
            query = query.where(SeasonPitchingStats.sbaplayer_id == sbaplayer_id)
        if min_outs is not None:
            query = query.where(SeasonPitchingStats.outs >= min_outs)

        # Apply pagination
        if offset > 0:
            query = query.offset(offset)
        if limit is not None and limit > 0:
            query = query.limit(limit)

        return query
class SeasonBattingStats(BaseModel):
    """Per-player, per-season batting line stored in ``seasonbattingstats``.

    Rows are keyed by the composite (player, season). No column declares a
    default, and only ``sbaplayer`` is nullable.
    """

    player = ForeignKeyField(Player)
    sbaplayer = ForeignKeyField(SbaPlayer, null=True)
    team = ForeignKeyField(Team)
    season = IntegerField()
    name = CharField()
    # Plain integer/text columns kept alongside the ``team`` foreign key;
    # the team filters below use player_team_id, not ``team``.
    player_team_id = IntegerField()
    player_team_abbrev = CharField()

    # Counting stats
    pa = IntegerField()
    ab = IntegerField()
    run = IntegerField()
    hit = IntegerField()
    double = IntegerField()
    triple = IntegerField()
    homerun = IntegerField()
    rbi = IntegerField()
    bb = IntegerField()
    so = IntegerField()
    bphr = IntegerField()
    bpfo = IntegerField()
    bp1b = IntegerField()
    bplo = IntegerField()
    gidp = IntegerField()
    hbp = IntegerField()
    sac = IntegerField()
    ibb = IntegerField()

    # Calculating stats
    avg = FloatField()
    obp = FloatField()
    slg = FloatField()
    ops = FloatField()
    woba = FloatField()
    k_pct = FloatField()

    # Running stats
    sb = IntegerField()
    cs = IntegerField()

    class Meta:
        table_name = "seasonbattingstats"
        primary_key = CompositeKey("player", "season")

    @staticmethod
    def get_team_stats(season, team_id):
        """Return a query for one team's batting rows in the given season."""
        return SeasonBattingStats.select().where(
            SeasonBattingStats.season == season,
            SeasonBattingStats.player_team_id == team_id,
        )

    @staticmethod
    def get_top_hitters(
        season: Optional[int] = None,
        stat: str = "woba",
        limit: Optional[int] = 200,
        desc: bool = True,
        team_id: Optional[int] = None,
        player_id: Optional[int] = None,
        sbaplayer_id: Optional[int] = None,
        min_pa: Optional[int] = None,
        offset: int = 0,
    ):
        """
        Get top hitters by specified stat with optional filtering.

        Args:
            season: Season to filter by (None for all seasons)
            stat: Column name to sort by (default: woba). Anything that is not
                a column of this model falls back to woba.
            limit: Maximum number of results (None, 0 or negative for no limit)
            desc: Sort descending if True, ascending if False
            team_id: Filter by team ID (matched against player_team_id)
            player_id: Filter by specific player ID
            sbaplayer_id: Filter by SBA player ID
            min_pa: Minimum plate appearances filter
            offset: Number of results to skip for pagination

        Returns:
            A lazily evaluated peewee select query.
        """
        # A bare getattr() default only covers *missing* attributes; names such
        # as "select" or "get_team_stats" resolve to methods and would crash on
        # .desc(). Accept the lookup only when it is a real column.
        stat_field = getattr(SeasonBattingStats, stat, None)
        if not isinstance(stat_field, Field):
            stat_field = SeasonBattingStats.woba
        order_field = stat_field.desc() if desc else stat_field.asc()

        query = SeasonBattingStats.select().order_by(order_field)

        # Apply filters (successive where() calls are AND-ed together)
        if season is not None:
            query = query.where(SeasonBattingStats.season == season)
        if team_id is not None:
            query = query.where(SeasonBattingStats.player_team_id == team_id)
        if player_id is not None:
            query = query.where(SeasonBattingStats.player_id == player_id)
        if sbaplayer_id is not None:
            query = query.where(SeasonBattingStats.sbaplayer_id == sbaplayer_id)
        if min_pa is not None:
            query = query.where(SeasonBattingStats.pa >= min_pa)

        # Apply pagination
        if offset > 0:
            query = query.offset(offset)
        if limit is not None and limit > 0:
            query = query.limit(limit)

        return query
# class Streak(BaseModel):
# player = ForeignKeyField(Player)
# streak_type = CharField()
# start_season = IntegerField()
# start_week = IntegerField()
# start_game = IntegerField()
# end_season = IntegerField()
# end_week = IntegerField()
# end_game = IntegerField()
# game_length = IntegerField()
# active = BooleanField()
#
# def recalculate(self):
# # Pitcher streaks
# if self.streak_type in ['win', 'loss', 'save', 'scoreless']:
# all_stats = PitchingStat.select_season(self.start_season).where(
# (PitchingStat.player == self.player) & (PitchingStat.week >= self.start_week)
# )
# sorted_stats = sorted(all_stats, key=lambda x: f'{x.season:0>2}-{x.week:0>2}-{x.game:}')
#
# for line in sorted_stats:
# Table creation moved to migration scripts to avoid dependency issues
# db.create_tables([
# Current, Division, Manager, Team, Result, Player, Schedule, Transaction, BattingStat, PitchingStat, Standings,
# BattingCareer, PitchingCareer, FieldingCareer, Manager, Award, DiceRoll, DraftList, Keeper, StratGame, StratPlay,
# Injury, Decision, CustomCommandCreator, CustomCommand
# ])
# db.close()