paper-dynasty-database/db_engine.py
Cal Corum 8c039dedf8 Fix DateTimeField defaults for PostgreSQL compatibility
Paperdex and GauntletRun models used int timestamps as defaults which
worked in SQLite but fail in PostgreSQL. Changed to datetime.now.
2026-01-31 15:56:33 -06:00

892 lines
25 KiB
Python

import math
from datetime import datetime
from typing import List
import logging
import os
from pandas import DataFrame
from peewee import *
from peewee import ModelSelect
from playhouse.shortcuts import model_to_dict
# SQLite connection shared by every model in this module (bound through
# BaseModel.Meta). Pragmas: write-ahead-log journal, a negative cache_size
# (SQLite interprets negative values as KiB, so roughly 64 MB), and
# synchronous=0 (OFF).
db = SqliteDatabase(
    "storage/pd_master.db",
    pragmas={"journal_mode": "wal", "cache_size": -1 * 64000, "synchronous": 0},
)

# Date stamp used in the log file name. datetime.now() is sampled exactly once
# so the year/month/day parts cannot disagree when the import straddles
# midnight. The parts are not zero-padded (e.g. "2026-1-31"), which keeps the
# file names identical to those produced before.
date = "{0.year}-{0.month}-{0.day}".format(datetime.now())

# INFO only when the LOG_LEVEL environment variable is exactly "INFO";
# any other value (or no value) falls back to the level name "WARN".
log_level = logging.INFO if os.environ.get("LOG_LEVEL") == "INFO" else "WARN"

# NOTE(review): the logs/database/ directory is not created here — presumably
# it is provisioned by deployment; confirm.
logging.basicConfig(
    filename=f"logs/database/{date}.log",
    format="%(asctime)s - database - %(levelname)s - %(message)s",
    level=log_level,
)
def model_csv_headers(this_obj, exclude=None) -> List:
    """Return the field names of a model instance, for use as a CSV header row.

    The instance is converted with ``model_to_dict(recurse=False)`` so related
    models are not expanded; ``exclude`` is forwarded to that call unchanged.
    """
    as_dict = model_to_dict(this_obj, recurse=False, exclude=exclude)
    # Iterating a dict yields its keys in insertion order.
    return list(as_dict)
def model_to_csv(this_obj, exclude=None) -> List:
    """Return the field values of a model instance, for use as one CSV data row.

    The instance is converted with ``model_to_dict(recurse=False)`` so related
    models are not expanded; ``exclude`` is forwarded to that call unchanged.
    Values come back in the same order as ``model_csv_headers`` lists the names.
    """
    as_dict = model_to_dict(this_obj, recurse=False, exclude=exclude)
    return list(as_dict.values())
def query_to_csv(all_items: ModelSelect, exclude=None):
    """Render a model query as CSV text.

    The first line holds the field names of the first result; each following
    line holds one result's field values. An empty query produces the single
    line ``No data found``. ``exclude`` is forwarded to the per-row helpers.

    Returns:
        The CSV document as a string (no pandas header or index column).
    """
    if all_items.count() == 0:
        rows = [["No data found"]]
    else:
        # Header row is derived from the first record only.
        rows = [model_csv_headers(all_items[0], exclude=exclude)]
        rows.extend(model_to_csv(item, exclude=exclude) for item in all_items)

    frame = DataFrame(rows)
    return frame.to_csv(header=False, index=False)
def _csv_cell(value):
    """Convert one record value into the cell written to the CSV."""
    if value is None:
        return ""
    if isinstance(value, dict):
        # Nested record: show its "name", else its "abbrev", else its "id".
        label = next((k for k in ("name", "abbrev") if k in value), "id")
        return value[label]
    if isinstance(value, int) and value > 100000000:
        # Large integers get a leading apostrophe — presumably so spreadsheet
        # tools keep them as text; confirm with consumers of the export.
        return f"'{value}"
    if isinstance(value, str) and "," in value:
        # Commas inside text are swapped for the "-_-" placeholder.
        return value.replace(",", "-_-")
    return value


def complex_data_to_csv(complex_data: List):
    """Render a list of dict records as CSV text.

    The header row is the key list of the first record. Each data row is built
    by walking that record's own keys in order, so records are assumed to share
    the same keys in the same order. An empty list produces the single line
    ``No data found``.

    Returns:
        The CSV document as a string (no pandas header or index column).
    """
    if not len(complex_data):
        rows = [["No data found"]]
    else:
        rows = [list(complex_data[0].keys())]
        for record in complex_data:
            logging.debug(f"line: {record}")
            cells = []
            for field in record:
                logging.debug(f"key: {field}")
                cells.append(_csv_cell(record[field]))
            rows.append(cells)

    frame = DataFrame(rows)
    return frame.to_csv(header=False, index=False)
class BaseModel(Model):
    """Common base for every model in this module; binds them to ``db``."""

    class Meta:
        # All subclasses inherit this database binding.
        database = db
class Current(BaseModel):
    """Row holding the current season/week and related settings."""

    season = IntegerField()
    week = IntegerField(default=0)
    gsheet_template = CharField()
    gsheet_version = CharField()
    live_scoreboard = IntegerField()

    @staticmethod
    def latest():
        """Return the Current row with the highest id.

        Uses ``.get()``, so peewee raises ``Current.DoesNotExist`` when the
        table has no rows.
        """
        newest_first = Current.select().order_by(Current.id.desc())
        return newest_first.get()


# Executed at import time.
db.create_tables([Current])
class Rarity(BaseModel):
    """Rarity tier; referenced by ``Player.rarity``."""

    value = IntegerField()
    name = CharField(unique=True)  # no two rarities may share a name
    color = CharField()

    def __str__(self):
        """Return the rarity's name."""
        return self.name


# Executed at import time.
db.create_tables([Rarity])
class Event(BaseModel):
    """Event record; referenced by Cardset, Team, GauntletReward and GauntletRun."""

    name = CharField()
    # Descriptive/display fields are all optional.
    short_desc = CharField(null=True)
    url = CharField(null=True)
    long_desc = CharField(null=True)
    thumbnail = CharField(null=True)
    active = BooleanField(default=False)  # new events start inactive


# Executed at import time.
db.create_tables([Event])
class Cardset(BaseModel):
    """A set of cards; referenced by ``Player.cardset`` and ``Pack.pack_cardset``."""

    name = CharField()
    description = CharField()
    event = ForeignKeyField(Event, null=True)  # optional link to an Event
    for_purchase = BooleanField(default=True)  # defaults to purchasable
    total_cards = IntegerField()
    in_packs = BooleanField(default=True)
    ranked_legal = BooleanField(default=True)

    def __str__(self):
        """Return the cardset's name."""
        return self.name


# Executed at import time.
db.create_tables([Cardset])
class MlbPlayer(BaseModel):
    """A real-world player record; optionally referenced by ``Player.mlbplayer``."""

    first_name = CharField()
    last_name = CharField()
    # Optional external identifiers — presumably FanGraphs, Baseball-Reference,
    # Retrosheet and MLBAM ids respectively; TODO confirm.
    key_fangraphs = IntegerField(null=True)
    key_bbref = CharField(null=True)
    key_retro = CharField(null=True)
    key_mlbam = IntegerField(null=True)
    offense_col = IntegerField(default=1)


# Executed at import time.
db.create_tables([MlbPlayer])
class Player(BaseModel):
    """A player card definition with its cost, cardset, rarity and positions.

    ``player_id`` is an explicit integer primary key (not auto-assigned by an
    implicit ``id`` column).
    """

    player_id = IntegerField(primary_key=True)
    p_name = CharField()
    cost = IntegerField(default=0)
    image = CharField()
    image2 = CharField(null=True)
    mlbclub = CharField()
    franchise = CharField()
    cardset = ForeignKeyField(Cardset)
    set_num = IntegerField()
    rarity = ForeignKeyField(Rarity)
    # Up to eight position slots; only the first is required.
    pos_1 = CharField()
    pos_2 = CharField(null=True)
    pos_3 = CharField(null=True)
    pos_4 = CharField(null=True)
    pos_5 = CharField(null=True)
    pos_6 = CharField(null=True)
    pos_7 = CharField(null=True)
    pos_8 = CharField(null=True)
    headshot = CharField(null=True)
    vanity_card = CharField(null=True)
    strat_code = CharField(null=True)
    bbref_id = CharField(null=True)
    fangr_id = CharField(null=True)
    description = CharField()
    quantity = IntegerField(default=999)
    mlbplayer = ForeignKeyField(MlbPlayer, null=True)

    def __str__(self):
        """Return "<cardset> <player name> (<rarity name>)"."""
        return f"{self.cardset} {self.p_name} ({self.rarity.name})"

    # def __eq__(self, other):
    #     if self.cardset.id == other.cardset.id and self.name == other.name:
    #         return True
    #     else:
    #         return False

    def __lt__(self, other):
        """Order by ``wara`` first, then by ``name`` as the tie-breaker.

        NOTE(review): neither ``wara`` nor ``name`` is a field declared on
        Player (the name column is ``p_name``). Unless callers attach those
        attributes to the instances, this comparison raises AttributeError —
        confirm the intended sort keys.
        """
        if self.wara < other.wara:
            return True
        if self.wara > other.wara:
            return False
        return bool(self.name < other.name)

    def get_all_pos(self):
        """Return the filled position slots in order, skipping any "CP" entry."""
        slots = (
            self.pos_1,
            self.pos_2,
            self.pos_3,
            self.pos_4,
            self.pos_5,
            self.pos_6,
            self.pos_7,
            self.pos_8,
        )
        return [pos for pos in slots if pos and pos != "CP"]

    def change_on_sell(self):
        """Lower the cost by 5% (rounded down, never below 1) and save the row."""
        # caps = {
        #     'replacement': 15,
        #     'reserve': 50,
        #     'starter': 200,
        #     'all-star': 750,
        #     'mvp': 2500,
        #     'hof': 999999999
        # }
        logging.info(f"{self.p_name} cost changing from: {self.cost}")
        discounted = math.floor(self.cost * 0.95)
        self.cost = max(discounted, 1)
        # if self.quantity != 999:
        #     self.quantity += 1
        logging.info(f"{self.p_name} cost now: {self.cost}")
        self.save()

    def change_on_buy(self):
        """Raise the cost by 10% (rounded up) and save the row."""
        logging.info(f"{self.p_name} cost changing from: {self.cost}")
        marked_up = self.cost * 1.1
        self.cost = math.ceil(marked_up)
        # if self.quantity != 999:
        #     self.quantity -= 1
        logging.info(f"{self.p_name} cost now: {self.cost}")
        self.save()


# Executed at import time.
db.create_tables([Player])
class Team(BaseModel):
    """A team for one season, with its owner id, wallet and ranking data."""

    abbrev = CharField()
    sname = CharField()
    lname = CharField()
    gmid = IntegerField()
    gmname = CharField()
    gsheet = CharField()
    wallet = IntegerField()
    team_value = IntegerField()
    collection_value = IntegerField()
    logo = CharField(null=True)
    color = CharField(null=True)
    season = IntegerField()
    event = ForeignKeyField(Event, null=True)
    career = IntegerField(default=0)
    ranking = IntegerField(default=1000)
    has_guide = BooleanField(default=False)
    is_ai = IntegerField(null=True)

    def __str__(self):
        """Return "S<season> <long name>"."""
        return f"S{self.season} {self.lname}"

    # NOTE(review): the three lookups below default the season from
    # Current.get() rather than Current.latest(); confirm the two agree when
    # more than one Current row exists.

    @staticmethod
    def get_by_owner(gmid, season=None):
        """Return the team with this ``gmid`` in ``season``, or None.

        A falsy ``season`` is replaced by the season of ``Current.get()``.
        """
        season = season or Current.get().season
        owned = Team.get_or_none((Team.gmid == gmid) & (Team.season == season))
        return owned or None

    @staticmethod
    def select_season(season=None):
        """Return a query for all teams in ``season``.

        A falsy ``season`` is replaced by the season of ``Current.get()``.
        """
        season = season or Current.get().season
        return Team.select().where(Team.season == season)

    @staticmethod
    def get_season(abbrev, season=None):
        """Return the team whose abbrev matches ``abbrev.upper()`` in ``season``, or None.

        A falsy ``season`` is replaced by the season of ``Current.get()``.
        """
        season = season or Current.get().season
        wanted_abbrev = abbrev.upper()
        return Team.get_or_none(Team.season == season, Team.abbrev == wanted_abbrev)

    def team_hash(self):
        """Build, log and return a short string derived from ``sname`` and ``gmid``.

        The pieces are, in order: the last character of ``sname``, ``gmid``
        divided by 6950123, the second-to-last character of ``sname``, and
        ``gmid`` divided by 42069123 (both quotients rounded to whole numbers).
        """
        pieces = (
            self.sname[-1],
            f"{self.gmid / 6950123:.0f}",
            self.sname[-2],
            f"{self.gmid / 42069123:.0f}",
        )
        hash_string = "".join(pieces)
        logging.info(f"string: {hash_string}")
        return hash_string


# Executed at import time.
db.create_tables([Team])
class PackType(BaseModel):
    """A kind of pack; referenced by ``Pack.pack_type`` and ``GameRewards.pack_type``."""

    name = CharField()
    card_count = IntegerField()
    description = CharField()
    cost = IntegerField()
    available = BooleanField(default=True)  # new pack types start available


# Executed at import time.
db.create_tables([PackType])
class Pack(BaseModel):
    """A single pack belonging to a team."""

    team = ForeignKeyField(Team)  # required: the team the pack belongs to
    pack_type = ForeignKeyField(PackType)
    # Optional second Team link and optional Cardset link — presumably they
    # restrict the pack's contents to one team / one cardset; TODO confirm.
    pack_team = ForeignKeyField(Team, null=True)
    pack_cardset = ForeignKeyField(Cardset, null=True)
    # Nullable timestamp — presumably NULL until the pack is opened; confirm.
    open_time = DateTimeField(null=True)


# Executed at import time.
db.create_tables([Pack])
class Card(BaseModel):
    """One card instance, optionally linked to a player, a team and a pack."""

    # All three links are nullable, so any of them may be None on a row.
    player = ForeignKeyField(Player, null=True)
    team = ForeignKeyField(Team, null=True)
    pack = ForeignKeyField(Pack, null=True)
    value = IntegerField(default=0)

    def __str__(self):
        """Return "<player> - <team short name>".

        A missing player is shown as "Blank". A missing team is shown as
        "No Team" instead of raising AttributeError on ``None.sname``, since
        the ``team`` column is nullable.
        """
        player_label = self.player if self.player else "Blank"
        team_label = self.team.sname if self.team else "No Team"
        return f"{player_label} - {team_label}"

    @staticmethod
    def select_season(season):
        """Return a query for all cards whose team belongs to ``season``."""
        return Card.select().join(Team).where(Card.team.season == season)


# Executed at import time.
db.create_tables([Card])
class Roster(BaseModel):
    """A named, numbered roster for a team made of 26 required card slots."""

    team = ForeignKeyField(Team)
    name = CharField()
    roster_num = IntegerField()
    # Slots card_1 .. card_26: every slot is a non-nullable link to Card.
    card_1 = ForeignKeyField(Card)
    card_2 = ForeignKeyField(Card)
    card_3 = ForeignKeyField(Card)
    card_4 = ForeignKeyField(Card)
    card_5 = ForeignKeyField(Card)
    card_6 = ForeignKeyField(Card)
    card_7 = ForeignKeyField(Card)
    card_8 = ForeignKeyField(Card)
    card_9 = ForeignKeyField(Card)
    card_10 = ForeignKeyField(Card)
    card_11 = ForeignKeyField(Card)
    card_12 = ForeignKeyField(Card)
    card_13 = ForeignKeyField(Card)
    card_14 = ForeignKeyField(Card)
    card_15 = ForeignKeyField(Card)
    card_16 = ForeignKeyField(Card)
    card_17 = ForeignKeyField(Card)
    card_18 = ForeignKeyField(Card)
    card_19 = ForeignKeyField(Card)
    card_20 = ForeignKeyField(Card)
    card_21 = ForeignKeyField(Card)
    card_22 = ForeignKeyField(Card)
    card_23 = ForeignKeyField(Card)
    card_24 = ForeignKeyField(Card)
    card_25 = ForeignKeyField(Card)
    card_26 = ForeignKeyField(Card)

    def __str__(self):
        """Return "<team> Roster"."""
        return f"{self.team} Roster"

    # Disabled helper kept for reference; it references attributes (card1..card26,
    # Card.roster) that do not match the fields declared above.
    # def get_cards(self, team):
    #     all_cards = Card.select().where(Card.roster == self)
    #     this_roster = []
    #     return [this_roster.card1, this_roster.card2, this_roster.card3, this_roster.card4, this_roster.card5,
    #             this_roster.card6, this_roster.card7, this_roster.card8, this_roster.card9, this_roster.card10,
    #             this_roster.card11, this_roster.card12, this_roster.card13, this_roster.card14, this_roster.card15,
    #             this_roster.card16, this_roster.card17, this_roster.card18, this_roster.card19, this_roster.card20,
    #             this_roster.card21, this_roster.card22, this_roster.card23, this_roster.card24, this_roster.card25,
    #             this_roster.card26]
class Result(BaseModel):
    """The outcome of one game between an away team and a home team."""

    away_team = ForeignKeyField(Team)
    home_team = ForeignKeyField(Team)
    away_score = IntegerField()
    home_score = IntegerField()
    # Optional per-team value and ranking figures stored with the result.
    away_team_value = IntegerField(null=True)
    home_team_value = IntegerField(null=True)
    away_team_ranking = IntegerField(null=True)
    home_team_ranking = IntegerField(null=True)
    scorecard = CharField()
    week = IntegerField()
    season = IntegerField()
    ranked = BooleanField()
    short_game = BooleanField()
    game_type = CharField(null=True)

    @staticmethod
    def select_season(season=None):
        """Return a query for all results in ``season``.

        A falsy ``season`` is replaced by the season of ``Current.get()``.
        """
        season = season or Current.get().season
        return Result.select().where(Result.season == season)
class BattingStat(BaseModel):
    """Per-card batting/fielding stat line, optionally linked to a Result."""

    card = ForeignKeyField(Card)
    team = ForeignKeyField(Team)
    roster_num = IntegerField()
    vs_team = ForeignKeyField(Team)  # second Team link: the opponent
    result = ForeignKeyField(Result, null=True)  # may be unset
    pos = CharField()
    # Required integer counters. The abbreviations presumably follow standard
    # baseball box-score usage — TODO confirm the less common ones (bphr, bpfo,
    # bp1b, bplo, xch, xhit, sbc, csc).
    pa = IntegerField()
    ab = IntegerField()
    run = IntegerField()
    hit = IntegerField()
    rbi = IntegerField()
    double = IntegerField()
    triple = IntegerField()
    hr = IntegerField()
    bb = IntegerField()
    so = IntegerField()
    hbp = IntegerField()
    sac = IntegerField()
    ibb = IntegerField()
    gidp = IntegerField()
    sb = IntegerField()
    cs = IntegerField()
    bphr = IntegerField()
    bpfo = IntegerField()
    bp1b = IntegerField()
    bplo = IntegerField()
    xch = IntegerField()
    xhit = IntegerField()
    error = IntegerField()
    pb = IntegerField()
    sbc = IntegerField()
    csc = IntegerField()
    week = IntegerField()
    season = IntegerField()
    created = DateTimeField()  # no default: the caller must supply the timestamp
    game_id = IntegerField()
class PitchingStat(BaseModel):
    """Per-card pitching stat line, optionally linked to a Result."""

    card = ForeignKeyField(Card)
    team = ForeignKeyField(Team)
    roster_num = IntegerField()
    vs_team = ForeignKeyField(Team)  # second Team link: the opponent
    result = ForeignKeyField(Result, null=True)  # may be unset
    ip = FloatField()  # the only float column; all other stats are integers
    # Required integer counters. The abbreviations presumably follow standard
    # baseball box-score usage — TODO confirm (e.g. erun, ir, irs, bsv).
    hit = IntegerField()
    run = IntegerField()
    erun = IntegerField()
    so = IntegerField()
    bb = IntegerField()
    hbp = IntegerField()
    wp = IntegerField()
    balk = IntegerField()
    hr = IntegerField()
    ir = IntegerField()
    irs = IntegerField()
    gs = IntegerField()
    win = IntegerField()
    loss = IntegerField()
    hold = IntegerField()
    sv = IntegerField()
    bsv = IntegerField()
    week = IntegerField()
    season = IntegerField()
    created = DateTimeField()  # no default: the caller must supply the timestamp
    game_id = IntegerField()
class Award(BaseModel):
    """A named award for a season, optionally tied to a card and/or a team."""

    name = CharField()
    season = IntegerField()
    timing = CharField(default="In-Season")
    card = ForeignKeyField(Card, null=True)
    team = ForeignKeyField(Team, null=True)
    image = CharField(null=True)
class Paperdex(BaseModel):
    """Links a team to a player, with the time the link was created."""

    team = ForeignKeyField(Team)
    player = ForeignKeyField(Player)
    # The callable itself is passed (not its result), so the timestamp is
    # computed for each new row rather than once at import.
    created = DateTimeField(default=datetime.now)

    # Disabled helper kept for reference.
    # def add_to_paperdex(self, team, cards: list):
    #     for x in players:
    #         if not isinstance(x, Card):
    #             raise TypeError(f'The Pokedex can only take a list of Player or Card objects')
    #
    #         Paperdex.get_or_create(team=team, player=player)
class Reward(BaseModel):
    """A reward record for a team in a given season and week."""

    name = CharField(null=True)
    season = IntegerField()
    week = IntegerField()
    team = ForeignKeyField(Team)
    created = DateTimeField()  # no default: the caller must supply the timestamp
class GameRewards(BaseModel):
    """A named reward definition; referenced by ``GauntletReward.reward``."""

    name = CharField()
    # All three payload columns are optional — presumably a reward fills in
    # whichever of pack type, player or money applies; TODO confirm.
    pack_type = ForeignKeyField(PackType, null=True)
    player = ForeignKeyField(Player, null=True)
    money = IntegerField(null=True)
class Notification(BaseModel):
    """A notification message with an acknowledgement flag."""

    created = DateTimeField()  # no default: the caller must supply the timestamp
    title = CharField()
    desc = CharField(null=True)
    field_name = CharField()
    message = CharField()
    about = CharField()  # formatted as f'{Topic}-{Object ID}'
    ack = BooleanField(default=False)  # starts unacknowledged
class GauntletReward(BaseModel):
    """Associates a GameRewards entry with an Event (the gauntlet) and a win count."""

    name = CharField()
    gauntlet = ForeignKeyField(Event)
    reward = ForeignKeyField(GameRewards)
    win_num = IntegerField()
    # Presumably the reward only applies with at most this many losses;
    # TODO confirm against the code that grants rewards.
    loss_max = IntegerField(default=1)
class GauntletRun(BaseModel):
    """One team's run through a gauntlet Event, tracking wins and losses."""

    team = ForeignKeyField(Team)
    gauntlet = ForeignKeyField(Event)
    wins = IntegerField(default=0)
    losses = IntegerField(default=0)
    gsheet = CharField(null=True)
    # The callable itself is passed (not its result), so the timestamp is
    # computed for each new row rather than once at import.
    created = DateTimeField(default=datetime.now)
    # Nullable — presumably NULL while the run is still in progress; confirm.
    ended = DateTimeField(null=True)
# Create the tables for the models defined since the last create_tables call.
# Executed at import time.
db.create_tables(
    [
        Roster,
        BattingStat,
        PitchingStat,
        Result,
        Award,
        Paperdex,
        Reward,
        GameRewards,
        Notification,
        GauntletReward,
        GauntletRun,
    ]
)
class BattingCard(BaseModel):
    """Batting card attributes for one variant of a Player."""

    player = ForeignKeyField(Player)
    variant = IntegerField()
    steal_low = IntegerField()
    steal_high = IntegerField()
    steal_auto = BooleanField()
    steal_jump = FloatField()
    bunting = CharField()
    hit_and_run = CharField()
    running = IntegerField()
    offense_col = IntegerField()
    hand = CharField(default="R")


# Composite unique index: at most one BattingCard per (player, variant) pair.
bc_index = ModelIndex(
    BattingCard, (BattingCard.player, BattingCard.variant), unique=True
)
BattingCard.add_index(bc_index)
class BattingCardRatings(BaseModel):
    """Outcome ratings for a BattingCard against one pitcher hand (``vs_hand``)."""

    battingcard = ForeignKeyField(BattingCard)
    vs_hand = CharField(default="R")
    pull_rate = FloatField()
    center_rate = FloatField()
    slap_rate = FloatField()
    # Required float value per outcome type; units/scale are not visible in
    # this module — TODO confirm with the card-generation code.
    homerun = FloatField()
    bp_homerun = FloatField()
    triple = FloatField()
    double_three = FloatField()
    double_two = FloatField()
    double_pull = FloatField()
    single_two = FloatField()
    single_one = FloatField()
    single_center = FloatField()
    bp_single = FloatField()
    hbp = FloatField()
    walk = FloatField()
    strikeout = FloatField()
    lineout = FloatField()
    popout = FloatField()
    flyout_a = FloatField()
    flyout_bq = FloatField()
    flyout_lf_b = FloatField()
    flyout_rf_b = FloatField()
    groundout_a = FloatField()
    groundout_b = FloatField()
    groundout_c = FloatField()
    # Optional summary figures.
    avg = FloatField(null=True)
    obp = FloatField(null=True)
    slg = FloatField(null=True)


# Composite unique index: at most one ratings row per (battingcard, vs_hand).
bcr_index = ModelIndex(
    BattingCardRatings,
    (BattingCardRatings.battingcard, BattingCardRatings.vs_hand),
    unique=True,
)
BattingCardRatings.add_index(bcr_index)
class PitchingCard(BaseModel):
    """Pitching card attributes for one variant of a Player."""

    player = ForeignKeyField(Player)
    variant = IntegerField()
    balk = IntegerField()
    wild_pitch = IntegerField()
    hold = IntegerField()
    starter_rating = IntegerField()
    relief_rating = IntegerField()
    closer_rating = IntegerField(null=True)  # optional, unlike starter/relief
    batting = CharField(null=True)
    offense_col = IntegerField()
    hand = CharField(default="R")


# Composite unique index: at most one PitchingCard per (player, variant) pair.
pc_index = ModelIndex(
    PitchingCard, (PitchingCard.player, PitchingCard.variant), unique=True
)
PitchingCard.add_index(pc_index)
class PitchingCardRatings(BaseModel):
    """Outcome ratings for a PitchingCard against one batter hand (``vs_hand``)."""

    pitchingcard = ForeignKeyField(PitchingCard)
    vs_hand = CharField(default="R")
    # Required float value per outcome type; units/scale are not visible in
    # this module — TODO confirm with the card-generation code.
    homerun = FloatField()
    bp_homerun = FloatField()
    triple = FloatField()
    double_three = FloatField()
    double_two = FloatField()
    double_cf = FloatField()
    single_two = FloatField()
    single_one = FloatField()
    single_center = FloatField()
    bp_single = FloatField()
    hbp = FloatField()
    walk = FloatField()
    strikeout = FloatField()
    flyout_lf_b = FloatField()
    flyout_cf_b = FloatField()
    flyout_rf_b = FloatField()
    groundout_a = FloatField()
    groundout_b = FloatField()
    # One xcheck_* column per fielding position (p, c, 1b, 2b, 3b, ss, lf, cf, rf).
    xcheck_p = FloatField()
    xcheck_c = FloatField()
    xcheck_1b = FloatField()
    xcheck_2b = FloatField()
    xcheck_3b = FloatField()
    xcheck_ss = FloatField()
    xcheck_lf = FloatField()
    xcheck_cf = FloatField()
    xcheck_rf = FloatField()
    # Optional summary figures.
    avg = FloatField(null=True)
    obp = FloatField(null=True)
    slg = FloatField(null=True)


# Composite unique index: at most one ratings row per (pitchingcard, vs_hand).
pcr_index = ModelIndex(
    PitchingCardRatings,
    (PitchingCardRatings.pitchingcard, PitchingCardRatings.vs_hand),
    unique=True,
)
PitchingCardRatings.add_index(pcr_index)
class CardPosition(BaseModel):
    """Fielding ratings for one Player variant at one position."""

    player = ForeignKeyField(Player)
    variant = IntegerField()
    position = CharField()
    innings = IntegerField()
    range = IntegerField()
    error = IntegerField()
    # Optional ratings — presumably only meaningful for some positions; confirm.
    arm = IntegerField(null=True)
    pb = IntegerField(null=True)
    overthrow = IntegerField(null=True)


# Composite unique index: at most one row per (player, variant, position).
pos_index = ModelIndex(
    CardPosition,
    (CardPosition.player, CardPosition.variant, CardPosition.position),
    unique=True,
)
CardPosition.add_index(pos_index)
# Create the card-detail tables (and the indexes registered on them above).
# Executed at import time.
db.create_tables(
    [BattingCard, BattingCardRatings, PitchingCard, PitchingCardRatings, CardPosition]
)
# Close the connection once schema setup is done — presumably callers reopen
# it on demand; confirm against the application's connection handling.
db.close()
# scout_db = SqliteDatabase(
# 'storage/card_creation.db',
# pragmas={
# 'journal_mode': 'wal',
# 'cache_size': -1 * 64000,
# 'synchronous': 0
# }
# )
#
#
# class BaseModelScout(Model):
# class Meta:
# database = scout_db
#
#
# class ScoutCardset(BaseModelScout):
# set_title = CharField()
# set_subtitle = CharField(null=True)
#
#
# class ScoutPlayer(BaseModelScout):
# sba_id = IntegerField(primary_key=True)
# name = CharField()
# fg_id = IntegerField()
# br_id = CharField()
# offense_col = IntegerField()
# hand = CharField(default='R')
#
#
# scout_db.create_tables([ScoutCardset, ScoutPlayer])
#
#
# class BatterRatings(BaseModelScout):
# id = CharField(unique=True, primary_key=True)
# player = ForeignKeyField(ScoutPlayer)
# cardset = ForeignKeyField(ScoutCardset)
# vs_hand = FloatField()
# is_prep = BooleanField()
# homerun = FloatField()
# bp_homerun = FloatField()
# triple = FloatField()
# double_three = FloatField()
# double_two = FloatField()
# double_pull = FloatField()
# single_two = FloatField()
# single_one = FloatField()
# single_center = FloatField()
# bp_single = FloatField()
# hbp = FloatField()
# walk = FloatField()
# strikeout = FloatField()
# lineout = FloatField()
# popout = FloatField()
# flyout_a = FloatField()
# flyout_bq = FloatField()
# flyout_lf_b = FloatField()
# flyout_rf_b = FloatField()
# groundout_a = FloatField()
# groundout_b = FloatField()
# groundout_c = FloatField()
# avg = FloatField(null=True)
# obp = FloatField(null=True)
# slg = FloatField(null=True)
#
#
# class PitcherRatings(BaseModelScout):
# id = CharField(unique=True, primary_key=True)
# player = ForeignKeyField(ScoutPlayer)
# cardset = ForeignKeyField(ScoutCardset)
# vs_hand = CharField()
# is_prep = BooleanField()
# homerun = FloatField()
# bp_homerun = FloatField()
# triple = FloatField()
# double_three = FloatField()
# double_two = FloatField()
# double_cf = FloatField()
# single_two = FloatField()
# single_one = FloatField()
# single_center = FloatField()
# bp_single = FloatField()
# hbp = FloatField()
# walk = FloatField()
# strikeout = FloatField()
# fo_slap = FloatField()
# fo_center = FloatField()
# groundout_a = FloatField()
# groundout_b = FloatField()
# xcheck_p = FloatField()
# xcheck_c = FloatField()
# xcheck_1b = FloatField()
# xcheck_2b = FloatField()
# xcheck_3b = FloatField()
# xcheck_ss = FloatField()
# xcheck_lf = FloatField()
# xcheck_cf = FloatField()
# xcheck_rf = FloatField()
# avg = FloatField(null=True)
# obp = FloatField(null=True)
# slg = FloatField(null=True)
#
#
# # scout_db.create_tables([BatterRatings, PitcherRatings])
#
#
# class CardColumns(BaseModelScout):
# id = CharField(unique=True, primary_key=True)
# player = ForeignKeyField(ScoutPlayer)
# hand = CharField()
# b_ratings = ForeignKeyField(BatterRatings, null=True)
# p_ratings = ForeignKeyField(PitcherRatings, null=True)
# one_dice = CharField()
# one_results = CharField()
# one_splits = CharField()
# two_dice = CharField()
# two_results = CharField()
# two_splits = CharField()
# three_dice = CharField()
# three_results = CharField()
# three_splits = CharField()
#
#
# class Position(BaseModelScout):
# player = ForeignKeyField(ScoutPlayer)
# cardset = ForeignKeyField(ScoutCardset)
# position = CharField()
# innings = IntegerField()
# range = IntegerField()
# error = IntegerField()
# arm = CharField(null=True)
# pb = IntegerField(null=True)
# overthrow = IntegerField(null=True)
#
#
# class BatterData(BaseModelScout):
# player = ForeignKeyField(ScoutPlayer)
# cardset = ForeignKeyField(ScoutCardset)
# stealing = CharField()
# st_low = IntegerField()
# st_high = IntegerField()
# st_auto = BooleanField()
# st_jump = FloatField()
# bunting = CharField(null=True)
# hit_and_run = CharField(null=True)
# running = CharField()
#
#
# class PitcherData(BaseModelScout):
# player = ForeignKeyField(ScoutPlayer)
# cardset = ForeignKeyField(ScoutCardset)
# balk = IntegerField(null=True)
# wild_pitch = IntegerField(null=True)
# hold = CharField()
# starter_rating = IntegerField()
# relief_rating = IntegerField()
# closer_rating = IntegerField(null=True)
# batting = CharField(null=True)
#
#
# scout_db.create_tables([CardColumns, Position, BatterData, PitcherData])
#
#
# class CardOutput(BaseModelScout):
# name = CharField()
# hand = CharField()
# positions = CharField()
# stealing = CharField()
# bunting = CharField()
# hitandrun = CharField()
# running = CharField()
#
#
# scout_db.close()