paper-dynasty-database/db_engine.py
Cal Corum 985a6ed2b0 Add default ORDER BY id to BaseModel.select() for PostgreSQL compatibility
PostgreSQL does not guarantee row order without ORDER BY, unlike SQLite
which implicitly returned rows by rowid. This caused bugs where queries
returned results in unexpected order (e.g., get_team_by_owner returning
gauntlet team instead of main team).

Override select() in BaseModel to add default ordering by id. Explicit
.order_by() calls will override this default.

Also mark legacy db_engine.py as deprecated.
2026-01-31 16:06:44 -06:00

899 lines
26 KiB
Python

import math
"""
DEPRECATED: This file is a legacy implementation from before the deployment
of /database/app/. The active codebase is now in /database/app/db_engine.py.
This file is kept for reference only and should not be used.
"""
from datetime import datetime
from typing import List
import logging
import os
from pandas import DataFrame
from peewee import *
from peewee import ModelSelect
from playhouse.shortcuts import model_to_dict
db = SqliteDatabase(
"storage/pd_master.db",
pragmas={"journal_mode": "wal", "cache_size": -1 * 64000, "synchronous": 0},
)
date = f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}"
log_level = logging.INFO if os.environ.get("LOG_LEVEL") == "INFO" else "WARN"
logging.basicConfig(
filename=f"logs/database/{date}.log",
format="%(asctime)s - database - %(levelname)s - %(message)s",
level=log_level,
)
def model_csv_headers(this_obj, exclude=None) -> List:
data = model_to_dict(this_obj, recurse=False, exclude=exclude)
return [x for x in data.keys()]
def model_to_csv(this_obj, exclude=None) -> List:
data = model_to_dict(this_obj, recurse=False, exclude=exclude)
return [x for x in data.values()]
def query_to_csv(all_items: ModelSelect, exclude=None):
if all_items.count() == 0:
data_list = [["No data found"]]
else:
data_list = [model_csv_headers(all_items[0], exclude=exclude)]
for x in all_items:
data_list.append(model_to_csv(x, exclude=exclude))
return DataFrame(data_list).to_csv(header=False, index=False)
def complex_data_to_csv(complex_data: List):
if len(complex_data) == 0:
data_list = [["No data found"]]
else:
data_list = [[x for x in complex_data[0].keys()]]
for line in complex_data:
logging.debug(f"line: {line}")
this_row = []
for key in line:
logging.debug(f"key: {key}")
if line[key] is None:
this_row.append("")
elif isinstance(line[key], dict):
if "name" in line[key]:
this_row.append(line[key]["name"])
elif "abbrev" in line[key]:
this_row.append(line[key]["abbrev"])
else:
this_row.append(line[key]["id"])
elif isinstance(line[key], int) and line[key] > 100000000:
this_row.append(f"'{line[key]}")
elif isinstance(line[key], str) and "," in line[key]:
this_row.append(line[key].replace(",", "-_-"))
else:
this_row.append(line[key])
data_list.append(this_row)
return DataFrame(data_list).to_csv(header=False, index=False)
class BaseModel(Model):
class Meta:
database = db
class Current(BaseModel):
season = IntegerField()
week = IntegerField(default=0)
gsheet_template = CharField()
gsheet_version = CharField()
live_scoreboard = IntegerField()
@staticmethod
def latest():
latest_current = Current.select().order_by(-Current.id).get()
return latest_current
db.create_tables([Current])
class Rarity(BaseModel):
value = IntegerField()
name = CharField(unique=True)
color = CharField()
def __str__(self):
return self.name
db.create_tables([Rarity])
class Event(BaseModel):
name = CharField()
short_desc = CharField(null=True)
url = CharField(null=True)
long_desc = CharField(null=True)
thumbnail = CharField(null=True)
active = BooleanField(default=False)
db.create_tables([Event])
class Cardset(BaseModel):
name = CharField()
description = CharField()
event = ForeignKeyField(Event, null=True)
for_purchase = BooleanField(default=True) # for_purchase
total_cards = IntegerField()
in_packs = BooleanField(default=True)
ranked_legal = BooleanField(default=True)
def __str__(self):
return self.name
db.create_tables([Cardset])
class MlbPlayer(BaseModel):
first_name = CharField()
last_name = CharField()
key_fangraphs = IntegerField(null=True)
key_bbref = CharField(null=True)
key_retro = CharField(null=True)
key_mlbam = IntegerField(null=True)
offense_col = IntegerField(default=1)
db.create_tables([MlbPlayer])
class Player(BaseModel):
player_id = IntegerField(primary_key=True)
p_name = CharField()
cost = IntegerField(default=0)
image = CharField()
image2 = CharField(null=True)
mlbclub = CharField()
franchise = CharField()
cardset = ForeignKeyField(Cardset)
set_num = IntegerField()
rarity = ForeignKeyField(Rarity)
pos_1 = CharField()
pos_2 = CharField(null=True)
pos_3 = CharField(null=True)
pos_4 = CharField(null=True)
pos_5 = CharField(null=True)
pos_6 = CharField(null=True)
pos_7 = CharField(null=True)
pos_8 = CharField(null=True)
headshot = CharField(null=True)
vanity_card = CharField(null=True)
strat_code = CharField(null=True)
bbref_id = CharField(null=True)
fangr_id = CharField(null=True)
description = CharField()
quantity = IntegerField(default=999)
mlbplayer = ForeignKeyField(MlbPlayer, null=True)
def __str__(self):
return f"{self.cardset} {self.p_name} ({self.rarity.name})"
# def __eq__(self, other):
# if self.cardset.id == other.cardset.id and self.name == other.name:
# return True
# else:
# return False
def __lt__(self, other):
if self.wara < other.wara:
return True
elif self.wara > other.wara:
return False
elif self.name < other.name:
return True
else:
return False
def get_all_pos(self):
all_pos = []
if self.pos_1 and self.pos_1 != "CP":
all_pos.append(self.pos_1)
if self.pos_2 and self.pos_2 != "CP":
all_pos.append(self.pos_2)
if self.pos_3 and self.pos_3 != "CP":
all_pos.append(self.pos_3)
if self.pos_4 and self.pos_4 != "CP":
all_pos.append(self.pos_4)
if self.pos_5 and self.pos_5 != "CP":
all_pos.append(self.pos_5)
if self.pos_6 and self.pos_6 != "CP":
all_pos.append(self.pos_6)
if self.pos_7 and self.pos_7 != "CP":
all_pos.append(self.pos_7)
if self.pos_8 and self.pos_8 != "CP":
all_pos.append(self.pos_8)
return all_pos
def change_on_sell(self):
# caps = {
# 'replacement': 15,
# 'reserve': 50,
# 'starter': 200,
# 'all-star': 750,
# 'mvp': 2500,
# 'hof': 999999999
# }
logging.info(f"{self.p_name} cost changing from: {self.cost}")
self.cost = max(math.floor(self.cost * 0.95), 1)
# if self.quantity != 999:
# self.quantity += 1
logging.info(f"{self.p_name} cost now: {self.cost}")
self.save()
def change_on_buy(self):
logging.info(f"{self.p_name} cost changing from: {self.cost}")
self.cost = math.ceil(self.cost * 1.1)
# if self.quantity != 999:
# self.quantity -= 1
logging.info(f"{self.p_name} cost now: {self.cost}")
self.save()
db.create_tables([Player])
class Team(BaseModel):
abbrev = CharField()
sname = CharField()
lname = CharField()
gmid = IntegerField()
gmname = CharField()
gsheet = CharField()
wallet = IntegerField()
team_value = IntegerField()
collection_value = IntegerField()
logo = CharField(null=True)
color = CharField(null=True)
season = IntegerField()
event = ForeignKeyField(Event, null=True)
career = IntegerField(default=0)
ranking = IntegerField(default=1000)
has_guide = BooleanField(default=False)
is_ai = IntegerField(null=True)
def __str__(self):
return f"S{self.season} {self.lname}"
@staticmethod
def get_by_owner(gmid, season=None):
if not season:
season = Current.get().season
team = Team.get_or_none((Team.gmid == gmid) & (Team.season == season))
if not team:
return None
return team
@staticmethod
def select_season(season=None):
if not season:
season = Current.get().season
return Team.select().where(Team.season == season)
@staticmethod
def get_season(abbrev, season=None):
if not season:
season = Current.get().season
return Team.get_or_none(Team.season == season, Team.abbrev == abbrev.upper())
def team_hash(self):
hash_string = f"{self.sname[-1]}{self.gmid / 6950123:.0f}{self.sname[-2]}{self.gmid / 42069123:.0f}"
logging.info(f"string: {hash_string}")
return hash_string
db.create_tables([Team])
class PackType(BaseModel):
name = CharField()
card_count = IntegerField()
description = CharField()
cost = IntegerField()
available = BooleanField(default=True)
db.create_tables([PackType])
class Pack(BaseModel):
team = ForeignKeyField(Team)
pack_type = ForeignKeyField(PackType)
pack_team = ForeignKeyField(Team, null=True)
pack_cardset = ForeignKeyField(Cardset, null=True)
open_time = DateTimeField(null=True)
db.create_tables([Pack])
class Card(BaseModel):
player = ForeignKeyField(Player, null=True)
team = ForeignKeyField(Team, null=True)
pack = ForeignKeyField(Pack, null=True)
value = IntegerField(default=0)
def __str__(self):
if self.player:
return f"{self.player} - {self.team.sname}"
else:
return f"Blank - {self.team.sname}"
@staticmethod
def select_season(season):
return Card.select().join(Team).where(Card.team.season == season)
db.create_tables([Card])
class Roster(BaseModel):
team = ForeignKeyField(Team)
name = CharField()
roster_num = IntegerField()
card_1 = ForeignKeyField(Card)
card_2 = ForeignKeyField(Card)
card_3 = ForeignKeyField(Card)
card_4 = ForeignKeyField(Card)
card_5 = ForeignKeyField(Card)
card_6 = ForeignKeyField(Card)
card_7 = ForeignKeyField(Card)
card_8 = ForeignKeyField(Card)
card_9 = ForeignKeyField(Card)
card_10 = ForeignKeyField(Card)
card_11 = ForeignKeyField(Card)
card_12 = ForeignKeyField(Card)
card_13 = ForeignKeyField(Card)
card_14 = ForeignKeyField(Card)
card_15 = ForeignKeyField(Card)
card_16 = ForeignKeyField(Card)
card_17 = ForeignKeyField(Card)
card_18 = ForeignKeyField(Card)
card_19 = ForeignKeyField(Card)
card_20 = ForeignKeyField(Card)
card_21 = ForeignKeyField(Card)
card_22 = ForeignKeyField(Card)
card_23 = ForeignKeyField(Card)
card_24 = ForeignKeyField(Card)
card_25 = ForeignKeyField(Card)
card_26 = ForeignKeyField(Card)
def __str__(self):
return f"{self.team} Roster"
# def get_cards(self, team):
# all_cards = Card.select().where(Card.roster == self)
# this_roster = []
# return [this_roster.card1, this_roster.card2, this_roster.card3, this_roster.card4, this_roster.card5,
# this_roster.card6, this_roster.card7, this_roster.card8, this_roster.card9, this_roster.card10,
# this_roster.card11, this_roster.card12, this_roster.card13, this_roster.card14, this_roster.card15,
# this_roster.card16, this_roster.card17, this_roster.card18, this_roster.card19, this_roster.card20,
# this_roster.card21, this_roster.card22, this_roster.card23, this_roster.card24, this_roster.card25,
# this_roster.card26]
class Result(BaseModel):
away_team = ForeignKeyField(Team)
home_team = ForeignKeyField(Team)
away_score = IntegerField()
home_score = IntegerField()
away_team_value = IntegerField(null=True)
home_team_value = IntegerField(null=True)
away_team_ranking = IntegerField(null=True)
home_team_ranking = IntegerField(null=True)
scorecard = CharField()
week = IntegerField()
season = IntegerField()
ranked = BooleanField()
short_game = BooleanField()
game_type = CharField(null=True)
@staticmethod
def select_season(season=None):
if not season:
season = Current.get().season
return Result.select().where(Result.season == season)
class BattingStat(BaseModel):
card = ForeignKeyField(Card)
team = ForeignKeyField(Team)
roster_num = IntegerField()
vs_team = ForeignKeyField(Team)
result = ForeignKeyField(Result, null=True)
pos = CharField()
pa = IntegerField()
ab = IntegerField()
run = IntegerField()
hit = IntegerField()
rbi = IntegerField()
double = IntegerField()
triple = IntegerField()
hr = IntegerField()
bb = IntegerField()
so = IntegerField()
hbp = IntegerField()
sac = IntegerField()
ibb = IntegerField()
gidp = IntegerField()
sb = IntegerField()
cs = IntegerField()
bphr = IntegerField()
bpfo = IntegerField()
bp1b = IntegerField()
bplo = IntegerField()
xch = IntegerField()
xhit = IntegerField()
error = IntegerField()
pb = IntegerField()
sbc = IntegerField()
csc = IntegerField()
week = IntegerField()
season = IntegerField()
created = DateTimeField()
game_id = IntegerField()
class PitchingStat(BaseModel):
card = ForeignKeyField(Card)
team = ForeignKeyField(Team)
roster_num = IntegerField()
vs_team = ForeignKeyField(Team)
result = ForeignKeyField(Result, null=True)
ip = FloatField()
hit = IntegerField()
run = IntegerField()
erun = IntegerField()
so = IntegerField()
bb = IntegerField()
hbp = IntegerField()
wp = IntegerField()
balk = IntegerField()
hr = IntegerField()
ir = IntegerField()
irs = IntegerField()
gs = IntegerField()
win = IntegerField()
loss = IntegerField()
hold = IntegerField()
sv = IntegerField()
bsv = IntegerField()
week = IntegerField()
season = IntegerField()
created = DateTimeField()
game_id = IntegerField()
class Award(BaseModel):
name = CharField()
season = IntegerField()
timing = CharField(default="In-Season")
card = ForeignKeyField(Card, null=True)
team = ForeignKeyField(Team, null=True)
image = CharField(null=True)
class Paperdex(BaseModel):
team = ForeignKeyField(Team)
player = ForeignKeyField(Player)
created = DateTimeField(default=datetime.now)
# def add_to_paperdex(self, team, cards: list):
# for x in players:
# if not isinstance(x, Card):
# raise TypeError(f'The Pokedex can only take a list of Player or Card objects')
#
# Paperdex.get_or_create(team=team, player=player)
class Reward(BaseModel):
name = CharField(null=True)
season = IntegerField()
week = IntegerField()
team = ForeignKeyField(Team)
created = DateTimeField()
class GameRewards(BaseModel):
name = CharField()
pack_type = ForeignKeyField(PackType, null=True)
player = ForeignKeyField(Player, null=True)
money = IntegerField(null=True)
class Notification(BaseModel):
created = DateTimeField()
title = CharField()
desc = CharField(null=True)
field_name = CharField()
message = CharField()
about = CharField() # f'{Topic}-{Object ID}'
ack = BooleanField(default=False)
class GauntletReward(BaseModel):
name = CharField()
gauntlet = ForeignKeyField(Event)
reward = ForeignKeyField(GameRewards)
win_num = IntegerField()
loss_max = IntegerField(default=1)
class GauntletRun(BaseModel):
team = ForeignKeyField(Team)
gauntlet = ForeignKeyField(Event)
wins = IntegerField(default=0)
losses = IntegerField(default=0)
gsheet = CharField(null=True)
created = DateTimeField(default=datetime.now)
ended = DateTimeField(null=True)
db.create_tables(
[
Roster,
BattingStat,
PitchingStat,
Result,
Award,
Paperdex,
Reward,
GameRewards,
Notification,
GauntletReward,
GauntletRun,
]
)
class BattingCard(BaseModel):
player = ForeignKeyField(Player)
variant = IntegerField()
steal_low = IntegerField()
steal_high = IntegerField()
steal_auto = BooleanField()
steal_jump = FloatField()
bunting = CharField()
hit_and_run = CharField()
running = IntegerField()
offense_col = IntegerField()
hand = CharField(default="R")
bc_index = ModelIndex(
BattingCard, (BattingCard.player, BattingCard.variant), unique=True
)
BattingCard.add_index(bc_index)
class BattingCardRatings(BaseModel):
battingcard = ForeignKeyField(BattingCard)
vs_hand = CharField(default="R")
pull_rate = FloatField()
center_rate = FloatField()
slap_rate = FloatField()
homerun = FloatField()
bp_homerun = FloatField()
triple = FloatField()
double_three = FloatField()
double_two = FloatField()
double_pull = FloatField()
single_two = FloatField()
single_one = FloatField()
single_center = FloatField()
bp_single = FloatField()
hbp = FloatField()
walk = FloatField()
strikeout = FloatField()
lineout = FloatField()
popout = FloatField()
flyout_a = FloatField()
flyout_bq = FloatField()
flyout_lf_b = FloatField()
flyout_rf_b = FloatField()
groundout_a = FloatField()
groundout_b = FloatField()
groundout_c = FloatField()
avg = FloatField(null=True)
obp = FloatField(null=True)
slg = FloatField(null=True)
bcr_index = ModelIndex(
BattingCardRatings,
(BattingCardRatings.battingcard, BattingCardRatings.vs_hand),
unique=True,
)
BattingCardRatings.add_index(bcr_index)
class PitchingCard(BaseModel):
player = ForeignKeyField(Player)
variant = IntegerField()
balk = IntegerField()
wild_pitch = IntegerField()
hold = IntegerField()
starter_rating = IntegerField()
relief_rating = IntegerField()
closer_rating = IntegerField(null=True)
batting = CharField(null=True)
offense_col = IntegerField()
hand = CharField(default="R")
pc_index = ModelIndex(
PitchingCard, (PitchingCard.player, PitchingCard.variant), unique=True
)
PitchingCard.add_index(pc_index)
class PitchingCardRatings(BaseModel):
pitchingcard = ForeignKeyField(PitchingCard)
vs_hand = CharField(default="R")
homerun = FloatField()
bp_homerun = FloatField()
triple = FloatField()
double_three = FloatField()
double_two = FloatField()
double_cf = FloatField()
single_two = FloatField()
single_one = FloatField()
single_center = FloatField()
bp_single = FloatField()
hbp = FloatField()
walk = FloatField()
strikeout = FloatField()
flyout_lf_b = FloatField()
flyout_cf_b = FloatField()
flyout_rf_b = FloatField()
groundout_a = FloatField()
groundout_b = FloatField()
xcheck_p = FloatField()
xcheck_c = FloatField()
xcheck_1b = FloatField()
xcheck_2b = FloatField()
xcheck_3b = FloatField()
xcheck_ss = FloatField()
xcheck_lf = FloatField()
xcheck_cf = FloatField()
xcheck_rf = FloatField()
avg = FloatField(null=True)
obp = FloatField(null=True)
slg = FloatField(null=True)
pcr_index = ModelIndex(
PitchingCardRatings,
(PitchingCardRatings.pitchingcard, PitchingCardRatings.vs_hand),
unique=True,
)
PitchingCardRatings.add_index(pcr_index)
class CardPosition(BaseModel):
player = ForeignKeyField(Player)
variant = IntegerField()
position = CharField()
innings = IntegerField()
range = IntegerField()
error = IntegerField()
arm = IntegerField(null=True)
pb = IntegerField(null=True)
overthrow = IntegerField(null=True)
pos_index = ModelIndex(
CardPosition,
(CardPosition.player, CardPosition.variant, CardPosition.position),
unique=True,
)
CardPosition.add_index(pos_index)
db.create_tables(
[BattingCard, BattingCardRatings, PitchingCard, PitchingCardRatings, CardPosition]
)
db.close()
# scout_db = SqliteDatabase(
# 'storage/card_creation.db',
# pragmas={
# 'journal_mode': 'wal',
# 'cache_size': -1 * 64000,
# 'synchronous': 0
# }
# )
#
#
# class BaseModelScout(Model):
# class Meta:
# database = scout_db
#
#
# class ScoutCardset(BaseModelScout):
# set_title = CharField()
# set_subtitle = CharField(null=True)
#
#
# class ScoutPlayer(BaseModelScout):
# sba_id = IntegerField(primary_key=True)
# name = CharField()
# fg_id = IntegerField()
# br_id = CharField()
# offense_col = IntegerField()
# hand = CharField(default='R')
#
#
# scout_db.create_tables([ScoutCardset, ScoutPlayer])
#
#
# class BatterRatings(BaseModelScout):
# id = CharField(unique=True, primary_key=True)
# player = ForeignKeyField(ScoutPlayer)
# cardset = ForeignKeyField(ScoutCardset)
# vs_hand = FloatField()
# is_prep = BooleanField()
# homerun = FloatField()
# bp_homerun = FloatField()
# triple = FloatField()
# double_three = FloatField()
# double_two = FloatField()
# double_pull = FloatField()
# single_two = FloatField()
# single_one = FloatField()
# single_center = FloatField()
# bp_single = FloatField()
# hbp = FloatField()
# walk = FloatField()
# strikeout = FloatField()
# lineout = FloatField()
# popout = FloatField()
# flyout_a = FloatField()
# flyout_bq = FloatField()
# flyout_lf_b = FloatField()
# flyout_rf_b = FloatField()
# groundout_a = FloatField()
# groundout_b = FloatField()
# groundout_c = FloatField()
# avg = FloatField(null=True)
# obp = FloatField(null=True)
# slg = FloatField(null=True)
#
#
# class PitcherRatings(BaseModelScout):
# id = CharField(unique=True, primary_key=True)
# player = ForeignKeyField(ScoutPlayer)
# cardset = ForeignKeyField(ScoutCardset)
# vs_hand = CharField()
# is_prep = BooleanField()
# homerun = FloatField()
# bp_homerun = FloatField()
# triple = FloatField()
# double_three = FloatField()
# double_two = FloatField()
# double_cf = FloatField()
# single_two = FloatField()
# single_one = FloatField()
# single_center = FloatField()
# bp_single = FloatField()
# hbp = FloatField()
# walk = FloatField()
# strikeout = FloatField()
# fo_slap = FloatField()
# fo_center = FloatField()
# groundout_a = FloatField()
# groundout_b = FloatField()
# xcheck_p = FloatField()
# xcheck_c = FloatField()
# xcheck_1b = FloatField()
# xcheck_2b = FloatField()
# xcheck_3b = FloatField()
# xcheck_ss = FloatField()
# xcheck_lf = FloatField()
# xcheck_cf = FloatField()
# xcheck_rf = FloatField()
# avg = FloatField(null=True)
# obp = FloatField(null=True)
# slg = FloatField(null=True)
#
#
# # scout_db.create_tables([BatterRatings, PitcherRatings])
#
#
# class CardColumns(BaseModelScout):
# id = CharField(unique=True, primary_key=True)
# player = ForeignKeyField(ScoutPlayer)
# hand = CharField()
# b_ratings = ForeignKeyField(BatterRatings, null=True)
# p_ratings = ForeignKeyField(PitcherRatings, null=True)
# one_dice = CharField()
# one_results = CharField()
# one_splits = CharField()
# two_dice = CharField()
# two_results = CharField()
# two_splits = CharField()
# three_dice = CharField()
# three_results = CharField()
# three_splits = CharField()
#
#
# class Position(BaseModelScout):
# player = ForeignKeyField(ScoutPlayer)
# cardset = ForeignKeyField(ScoutCardset)
# position = CharField()
# innings = IntegerField()
# range = IntegerField()
# error = IntegerField()
# arm = CharField(null=True)
# pb = IntegerField(null=True)
# overthrow = IntegerField(null=True)
#
#
# class BatterData(BaseModelScout):
# player = ForeignKeyField(ScoutPlayer)
# cardset = ForeignKeyField(ScoutCardset)
# stealing = CharField()
# st_low = IntegerField()
# st_high = IntegerField()
# st_auto = BooleanField()
# st_jump = FloatField()
# bunting = CharField(null=True)
# hit_and_run = CharField(null=True)
# running = CharField()
#
#
# class PitcherData(BaseModelScout):
# player = ForeignKeyField(ScoutPlayer)
# cardset = ForeignKeyField(ScoutCardset)
# balk = IntegerField(null=True)
# wild_pitch = IntegerField(null=True)
# hold = CharField()
# starter_rating = IntegerField()
# relief_rating = IntegerField()
# closer_rating = IntegerField(null=True)
# batting = CharField(null=True)
#
#
# scout_db.create_tables([CardColumns, Position, BatterData, PitcherData])
#
#
# class CardOutput(BaseModelScout):
# name = CharField()
# hand = CharField()
# positions = CharField()
# stealing = CharField()
# bunting = CharField()
# hitandrun = CharField()
# running = CharField()
#
#
# scout_db.close()