469 lines
18 KiB
Python
469 lines
18 KiB
Python
import copy
|
|
import logging
|
|
import random
|
|
|
|
import pydantic
|
|
from db_calls_gameplay import StratPlay, get_team_lineups
|
|
from db_calls import db_get, db_post
|
|
from peewee import *
|
|
from typing import Optional, Literal
|
|
|
|
# Local SQLite database backing the AI manager models defined in this module.
db = SqliteDatabase(
    'storage/ai-database.db',
    pragmas=dict(
        journal_mode='wal',      # write-ahead logging
        cache_size=-1 * 64000,   # negative value = size in KiB per SQLite docs (~64 MB)
        synchronous=0,           # 0 == OFF per SQLite docs
    ),
)
|
|
|
|
# Cardset filters, expressed as (query-key, cardset id) pairs, that the lineup
# and pitcher builders append to their `players` query parameters.
MINOR_CARDSET_PARAMS = [('cardset_id', set_id) for set_id in (1, 3)]
MAJOR_CARDSET_PARAMS = [('cardset_id', set_id) for set_id in (1, 3, 4, 5, 6)]
|
|
|
|
|
|
class BaseModel(Model):
    """Common peewee base class that binds every subclass to the module-level ``db``."""

    class Meta:
        database = db
|
|
|
|
|
|
class Lineup(BaseModel):
    """A stored nine-man batting order for one team, game level and pitcher hand.

    Each batting slot (one through nine) is a pair of columns: the card id
    occupying the slot and the position string that card plays.
    """

    team_id = IntegerField()
    game_level = CharField()  # 'minor', 'major', 'hof'
    vs_hand = CharField()  # 'left', 'right'

    # Batting slots 1-3
    bat_one_card = IntegerField()
    bat_one_pos = CharField()
    bat_two_card = IntegerField()
    bat_two_pos = CharField()
    bat_three_card = IntegerField()
    bat_three_pos = CharField()

    # Batting slots 4-6
    bat_four_card = IntegerField()
    bat_four_pos = CharField()
    bat_five_card = IntegerField()
    bat_five_pos = CharField()
    bat_six_card = IntegerField()
    bat_six_pos = CharField()

    # Batting slots 7-9
    bat_seven_card = IntegerField()
    bat_seven_pos = CharField()
    bat_eight_card = IntegerField()
    bat_eight_pos = CharField()
    bat_nine_card = IntegerField()
    bat_nine_pos = CharField()
|
|
|
|
|
|
class Rotation(BaseModel):
    """A stored five-man starting rotation (card ids) for one team and game level."""

    team_id = IntegerField()
    game_level = CharField()

    # Starting pitcher card ids, slots one through five
    sp_one_card = IntegerField()
    sp_two_card = IntegerField()
    sp_three_card = IntegerField()
    sp_four_card = IntegerField()
    sp_five_card = IntegerField()
|
|
|
|
|
|
class Bullpen(BaseModel):
    """Bullpen record keyed by team, game level and opposing hand.

    Only the identifying columns are declared here; no reliever slots are
    defined on this model yet.
    """

    team_id = IntegerField()
    game_level = CharField()  # 'minor', 'major', 'hof'
    vs_hand = CharField()  # 'left', 'right'
|
|
|
|
|
|
# def grade_player()
|
|
def batter_grading(vs_hand, rg_data):
    """Compute batting grades for one player from a ratings-guide row.

    Args:
        vs_hand: Suffix used to pick the handedness-specific ratings; the row
            is read at ``contact-<vs_hand>`` and ``power-<vs_hand>``.
        rg_data: Mapping holding those two ratings plus ``vision``, ``speed``,
            ``stealing``, ``reaction``, ``arm``, ``fielding`` and ``overall``.

    Returns:
        Dict with ``overall`` (copied from the row), ``raw-bat`` and ``blended``.

    Raises:
        KeyError: If any of the ratings listed above is missing from ``rg_data``.
    """
    contact = rg_data[f'contact-{vs_hand}']
    power = rg_data[f'power-{vs_hand}']

    # The weaker of contact/power is counted twice, so a lopsided bat grades
    # lower than a balanced one with the same total.
    raw_bat = (contact + power + min(contact, power)) / 3

    secondary_keys = ('vision', 'speed', 'stealing', 'reaction', 'arm', 'fielding')
    ranked_secondary = sorted((rg_data[key] for key in secondary_keys), reverse=True)
    best_four = ranked_secondary[:4]

    # Half of the blend is the bat (weight 4), half is the four best secondary tools.
    blended_rating = sum(best_four, raw_bat * 4) / 8

    grades = {'overall': rg_data['overall']}
    grades['raw-bat'] = raw_bat
    grades['blended'] = blended_rating
    return grades
|
|
|
|
|
|
def get_or_create_card(player: dict, team: dict) -> int:
    """Return the id of ``team``'s card for ``player``, creating the card if needed.

    The ``cards`` endpoint is queried by team id and player id. On a miss a
    new card (``pack_id`` 1) is posted and the lookup is repeated once.

    Args:
        player: Player record; ``player_id`` is used for the lookup and
            ``p_name`` for the error message.
        team: Team record; ``id`` is used for the lookup and ``lname`` for the
            error message.

    Returns:
        The ``id`` of the first matching card.

    Raises:
        DatabaseError: If no card can be found even after attempting to create one.
    """
    lookup_params = [('team_id', team['id']), ('player_id', player['player_id'])]

    for attempt in range(2):
        c_query = db_get('cards', params=lookup_params)
        if c_query['count'] > 0:
            return c_query['cards'][0]['id']

        # Only create on the first miss. A second POST would never be followed
        # by another lookup, so it could only leave a stray duplicate card
        # behind right before the failure is raised.
        if attempt == 0:
            db_post(
                'cards',
                payload={'cards': [
                    {'player_id': player['player_id'], 'team_id': team['id'], 'pack_id': 1}
                ]}
            )

    logging.error(f'Could not create card for {player["p_name"]} on the {team["lname"]}')
    raise DatabaseError(f'Could not create card for {player["p_name"]} on the {team["lname"]}')
|
|
|
|
|
|
# First attempt - pulls ratings guide info
|
|
def build_lineup_graded(team_object: dict, vs_hand: str, league_name: str, num_innings: int, batter_rg):
    """Pick one starter per lineup position using ratings-guide grades.

    All non-relief players for the club are pulled, every non-pitcher is
    graded with ``batter_grading`` and filed under each position he lists
    (everyone is DH-eligible). Positions are then filled scarcest-first: the
    best ``blended`` grade wins a fielding spot, the best ``raw-bat`` wins DH,
    and the chosen player is withdrawn from every remaining position.

    NOTE: batting-order construction is not implemented yet, so this function
    currently always returns an empty list; the selected players are only
    visible through the log output.

    Args:
        team_object: Team record; ``lname`` is used for the player query.
        vs_hand: Handedness suffix forwarded to ``batter_grading``.
        league_name: Currently unused.
        num_innings: Currently unused.
        batter_rg: Iterable of ratings-guide rows, each with a ``player_id``.

    Returns:
        The batting order list (currently always empty).

    Raises:
        ConnectionError: If the player query fails.
        StopIteration: If a player has no matching row in ``batter_rg``.
        IndexError: If a position has no eligible player left when it is filled.
    """
    field_positions = ('C', '1B', '2B', '3B', 'SS', 'LF', 'CF', 'RF')

    in_lineup = []  # selected {'position': ..., 'player': ...} entries

    # Candidate pool per position, keyed by lower-case position code. Insertion
    # order matters: it breaks ties when positions are sorted by scarcity below.
    players = {code.lower(): [] for code in field_positions}
    players['dh'] = []

    # Get all eligible players
    try:
        all_players = db_get(
            endpoint='players',
            params=[('mlbclub', team_object['lname']), ('pos_exclude', 'RP'), ('inc_dex', False)],
            timeout=10
        )['players']
    except ConnectionError as e:
        raise ConnectionError(f'Error pulling players for the {team_object["lname"]}. Cal help plz.') from e

    # Grade players and add to position lists
    for candidate in all_players:
        if candidate['pos_1'] in ('SP', 'RP'):
            continue

        this_guy = copy.deepcopy(candidate)
        rg_data = next(row for row in batter_rg if row['player_id'] == this_guy['player_id'])
        grading = batter_grading(vs_hand, rg_data)
        logging.info(f'player: {this_guy} / grading: {grading}')

        this_guy['overall'] = grading['overall']
        this_guy['raw-bat'] = grading['raw-bat']
        this_guy['blended'] = grading['blended']

        players['dh'].append(this_guy)
        listed_positions = [this_guy[f'pos_{slot}'] for slot in range(1, 9)]
        for code in field_positions:
            if code in listed_positions:
                players[code.lower()].append(this_guy)

    # Select players for lineup
    while players:
        # Sort players dict by position with fewest eligible players
        this_pass_data = sorted(players.items(), key=lambda item: len(item[1]))
        logging.info(f'this_pass_data: {this_pass_data}')

        # Pull one tuple ('<position>', [<player objects>])
        this_pos_data = this_pass_data[0]
        logging.info(f'this_pos_data: {this_pos_data}')
        pos_key, candidates = this_pos_data

        # Sort players at this position by blended rating (raw-bat for DH)
        rank_key = 'raw-bat' if pos_key == 'dh' else 'blended'
        this_pos = sorted(candidates, key=lambda item: item[rank_key], reverse=True)
        logging.info(f'this_pos: {this_pos}')

        # Add top player to the lineup
        chosen = this_pos[0]
        in_lineup.append({'position': pos_key.upper(), 'player': copy.deepcopy(chosen)})
        logging.info(f'adding player: {chosen}')
        logging.info(f'deleting position: {pos_key}')

        # Remove this position from consideration
        del players[pos_key]

        # Remove duplicate players (even across cardsets) once in lineup.
        # Each list is rebuilt rather than mutated in place: removing items
        # from a list while iterating it skips the element after every removal
        # and would leave some duplicates behind.
        for key in players:
            players[key] = [p for p in players[key] if p['strat_code'] != chosen['strat_code']]

    # Set batting order as list of lists: [ ['<pos>', <player_id>], ... ]
    # TODO: not implemented yet; `in_lineup` is never converted into an order.
    batting_order = []

    return batting_order
|
|
|
|
|
|
def build_lineup(team_object: dict, game_id: int, league_name: str, vs_hand: str = 'r') -> list:
    """Build a nine-man starting lineup for an AI team, most expensive players first.

    Position players for the club are pulled sorted by cost (descending).
    Each player, once per ``p_name``, takes the first open slot among his
    listed positions; if none is open he takes DH when it is still free,
    otherwise he is passed over. The nine starters are then split by cost into
    three groups of three, each group is shuffled, and the groups are
    concatenated to form the batting order.

    Args:
        team_object: Team record (``id``, ``lname`` and ``sname`` are read).
        game_id: Game id written into every lineup entry.
        league_name: ``'minor-league'`` and ``'gauntlet-1'`` use the minor
            cardsets, ``'major-league'`` the major cardsets; anything else
            only excludes cardset 2.
        vs_hand: Currently unused.

    Returns:
        Nine dicts with ``game_id``, ``team_id``, ``player_id``, ``card_id``,
        ``position``, ``batting_order`` (1-9) and ``after_play`` (0).

    Raises:
        ConnectionError: If the player query fails.
        ValueError: If any of the nine slots cannot be filled.
        DatabaseError: Propagated from ``get_or_create_card``.
    """
    slot_names = ('C', '1B', '2B', '3B', 'SS', 'LF', 'CF', 'RF', 'DH')
    players = dict.fromkeys(slot_names)  # slot -> chosen player (None while open)
    used_names = set()

    if league_name in ('minor-league', 'gauntlet-1'):
        set_params = MINOR_CARDSET_PARAMS
    elif league_name == 'major-league':
        set_params = MAJOR_CARDSET_PARAMS
    else:
        set_params = [('cardset_id_exclude', 2)]

    # Pull players sorted by current cost
    try:
        params = [('mlbclub', team_object['lname'])]
        params.extend(('pos_include', slot) for slot in slot_names)
        params.extend([('inc_dex', False), ('sort_by', 'cost-desc')])
        params.extend(set_params)
        all_players = db_get(
            endpoint='players',
            params=params,
            timeout=10
        )['players']
    except ConnectionError as e:
        raise ConnectionError(f'Error pulling batters for the {team_object["lname"]}. Cal help plz.') from e

    logging.info(f'build_lineup - eligible batter count: {len(all_players)}')

    # Choose starting nine & position
    # In order of cost:
    #   Try to add to pos_1; if unavailable, try remaining pos; if unavailable, add to DH
    #   If not possible, pass player and continue
    for guy in all_players:
        if guy['p_name'] in used_names:
            logging.info(f'build_lineup - {guy["p_name"]} already in lineup')
            continue

        placed = False
        for pos in [guy[f'pos_{slot}'] for slot in range(1, 9)]:
            # A missing or pitching position ends this player's usable list.
            if pos is None or pos in ('SP', 'RP', 'CP'):
                break
            # Skip codes that are not one of the nine lineup slots instead of
            # failing with a KeyError on the lookup below.
            if pos not in players:
                continue
            if players[pos] is None:
                players[pos] = guy
                used_names.add(guy["p_name"])
                placed = True
                break

        if placed:
            continue

        if players['DH'] is None:
            players['DH'] = guy
            used_names.add(guy["p_name"])
        else:
            logging.info(f'build_lineup - could not place {guy["p_name"]} in {team_object["sname"]} lineup')

    bad_pos = {x for x in players if players[x] is None}
    if bad_pos:
        raise ValueError(f'Could not find a {bad_pos} for the {team_object["sname"]}')

    logging.info(f'build_lineup - {players}')

    # Sort players into lineup
    # [ (<pos>, <player_obj>), (<pos>, <player_obj>), etc. ]
    sorted_players = sorted(players.items(), key=lambda x: x[1]['cost'], reverse=True)

    # Top, middle and bottom thirds by cost, each shuffled independently so the
    # order varies between games while the best bats stay near the top.
    batting_groups = [sorted_players[:3], sorted_players[3:6], sorted_players[6:]]
    for group in batting_groups:
        random.shuffle(group)

    lineups = []
    ordered_starters = [entry for group in batting_groups for entry in group]
    for batting_order, (position, player) in enumerate(ordered_starters, start=1):
        card_id = get_or_create_card(player, team_object)
        lineups.append({
            'game_id': game_id,
            'team_id': team_object['id'],
            'player_id': player['player_id'],
            'card_id': card_id,
            'position': position,
            'batting_order': batting_order,
            'after_play': 0
        })

    logging.info(f'build_lineup - final lineup: {lineups}')

    return lineups
|
|
|
|
|
|
def get_starting_pitcher(team_object: dict, game_id: int, is_home: bool, league_name: Optional[str] = None) -> dict:
    """Randomly choose a starting pitcher for an AI team, weighted toward the costliest.

    Up to five starters are pulled sorted by cost (descending). A d100 roll
    picks the rotation slot using these bands:

    * home: 1-30 -> #1, 31-55 -> #2, 56-75 -> #3, 76-90 -> #4, 91-100 -> #5
    * away: 1-50 -> #1, 51-75 -> #2, 76-85 -> #3, 86-95 -> #4, 96-100 -> #5

    If the rolled slot is not available the top starter is used instead.

    Args:
        team_object: Team record (``id``, ``lname`` and ``abbrev`` are read).
        game_id: Game id written into the returned lineup entry.
        is_home: Selects which probability bands apply.
        league_name: ``'minor-league'`` and ``'gauntlet-1'`` use the minor
            cardsets, ``'major-league'`` the major cardsets; anything else
            (including None) only excludes cardset 2.

    Returns:
        A lineup entry dict with ``position`` ``'P'``, ``batting_order`` 10
        and ``after_play`` 0.

    Raises:
        ConnectionError: If the player query fails.
        DatabaseError: If no starter is found, or propagated from
            ``get_or_create_card``.
    """
    if league_name in ('minor-league', 'gauntlet-1'):
        set_params = MINOR_CARDSET_PARAMS
    elif league_name == 'major-league':
        set_params = MAJOR_CARDSET_PARAMS
    else:
        set_params = [('cardset_id_exclude', 2)]

    # Pull starters sorted by current cost
    try:
        params = [
            ('mlbclub', team_object['lname']), ('pos_include', 'SP'), ('pos_exclude', 'RP'),
            ('inc_dex', False), ('sort_by', 'cost-desc'), ('limit', 5)
        ]
        params.extend(set_params)
        pitchers = db_get(
            endpoint='players',
            params=params,
            timeout=10
        )
    except ConnectionError as e:
        logging.error(f'Could not get pitchers for {team_object["lname"]}: {e}')
        raise ConnectionError(
            f'Error pulling starting pitchers for the {team_object["lname"]}. Cal help plz.'
        ) from e

    available = pitchers['count']
    # Log the reported pitcher count; len(pitchers) would be the number of keys
    # in the response dict.
    logging.info(f'build_lineup - eligible starting pitcher count: {available}')
    if available == 0:
        raise DatabaseError(f'Could not find any SP for {team_object["abbrev"]}. Seems like a Cal issue.')

    d_100 = random.randint(1, 100)

    # Upper bound of the d100 band for rotation slots #1-#4; a roll above the
    # last bound means slot #5.
    cutoffs = (30, 55, 75, 90) if is_home else (50, 75, 85, 95)

    for slot, ceiling in enumerate(cutoffs):
        if d_100 <= ceiling:
            # Fall back to the top starter when the rolled slot is not available.
            pick = slot if available > slot else 0
            break
    else:
        pick = 4 if available == 5 else 0

    starter = pitchers['players'][pick]

    # get player card; create one if none found
    card_id = get_or_create_card(starter, team_object)

    return {
        'game_id': game_id,
        'team_id': team_object['id'],
        'player_id': starter['player_id'],
        'card_id': card_id,
        'position': 'P',
        'batting_order': 10,
        'after_play': 0
    }
|
|
|
|
|
|
def get_relief_pitcher(
        this_play: StratPlay, ai_team: dict, used_pitchers: list, league_name: Optional[str] = None) -> dict:
    """Choose the next reliever for an AI team based on the current game state.

    Relievers are pulled sorted by cost (descending) in up to three attempts.
    The first attempt is situational:

    * before the 6th inning, starters are included as well (long man);
    * after the 8th, closers are included when the pitching team is tied or
      ahead, and excluded otherwise;
    * in innings 6-8, starters are excluded.

    Later attempts use the plain reliever query. Unless a situational include
    was applied, the game is past the 9th, or this is the last attempt, the
    top ``9 - inning_num`` pitchers in the list are skipped. The first
    remaining pitcher whose ``strat_code`` has not already been used wins.

    Args:
        this_play: Current play; ``inning_num``, ``inning_half``,
            ``home_score``, ``away_score``, ``play_num`` and ``game.id`` are read.
        ai_team: Team record (``id``, ``lname`` and ``sname`` are read).
        used_pitchers: Lineup entries (each with ``card_id``) of pitchers who
            have already appeared.
        league_name: ``'minor-league'`` or any name containing ``'gauntlet-1'``
            uses the minor cardsets, ``'major-league'`` the major cardsets;
            anything else (including None) only excludes cardset 2.

    Returns:
        A lineup entry dict with ``position`` ``'P'``, ``batting_order`` 10
        and ``after_play`` set to the previous play number.

    Raises:
        ConnectionError: If the player query fails.
        ValueError: If no unused reliever is found in three attempts.
        DatabaseError: Propagated from ``get_or_create_card``.
    """
    used_codes = [db_get('cards', object_id=x.card_id)["player"]["strat_code"] for x in used_pitchers]
    logging.info(f'get_rp - used_pitchers: {used_codes}')

    # The cardset filter does not change between attempts, so resolve it once.
    if league_name == 'minor-league':
        set_params = MINOR_CARDSET_PARAMS
    elif league_name == 'major-league':
        set_params = MAJOR_CARDSET_PARAMS
    elif league_name is not None and 'gauntlet-1' in league_name:
        # The None check matters: `in` on the default league_name=None raises TypeError.
        set_params = MINOR_CARDSET_PARAMS
    else:
        set_params = [('cardset_id_exclude', 2)]

    for attempts in range(1, 4):
        logging.info(f'TEST GAME - league_name: {league_name}')

        # Pull relievers sorted by current cost
        params = [
            ('mlbclub', ai_team['lname']), ('pos_include', 'RP'), ('inc_dex', False), ('sort_by', 'cost-desc')
        ]
        params.extend(set_params)

        alt_flag = False
        if attempts == 1:
            if this_play.inning_num < 6:
                # Try to get long man
                logging.info(f'get_rp - game {this_play.game.id} try for long man')
                params.append(('pos_include', 'SP'))
                alt_flag = True
            elif this_play.inning_num > 8:
                # Try to get closer: only when the team now pitching is tied or ahead
                home_pitching_ok = (
                    this_play.inning_half == 'top' and this_play.home_score >= this_play.away_score
                )
                away_pitching_ok = (
                    this_play.inning_half == 'bot' and this_play.away_score >= this_play.home_score
                )
                if home_pitching_ok or away_pitching_ok:
                    logging.info(f'get_rp - game {this_play.game.id} try for closer')
                    params.append(('pos_include', 'CP'))
                    alt_flag = True
                else:
                    params.append(('pos_exclude', 'CP'))
            else:
                # Innings 6-8: try to exclude long men
                logging.info(f'get_rp - game {this_play.game.id} try to exclude long men')
                params.append(('pos_exclude', 'SP'))

        try:
            pitchers = db_get(
                endpoint='players',
                params=params,
                timeout=10
            )
        except ConnectionError as e:
            logging.error(f'Could not get pitchers for {ai_team["lname"]}: {e}')
            raise ConnectionError(
                f'Error pulling relief pitchers for the {ai_team["lname"]}. Cal help plz.'
            ) from e

        if pitchers['count'] <= 0:
            continue

        # Earlier innings skip the most expensive arms, holding them back for
        # later; situational picks, extra innings and the final attempt
        # consider everyone.
        if alt_flag or this_play.inning_num > 9 or attempts > 2:
            start = 0
        else:
            start = 9 - this_play.inning_num

        for count, guy in enumerate(pitchers['players']):
            if count < start or guy['strat_code'] in used_codes:
                continue

            card_id = get_or_create_card(guy, ai_team)

            return {
                'game_id': this_play.game.id,
                'team_id': ai_team['id'],
                'player_id': guy['player_id'],
                'card_id': card_id,
                'position': 'P',
                'batting_order': 10,
                'after_play': this_play.play_num - 1
            }

    raise ValueError(f'Could not find a reliever for the {ai_team["sname"]}. Cal plz.')
|