import copy import logging import random import pydantic from db_calls_gameplay import StratPlay, get_team_lineups from db_calls import db_get, db_post from peewee import * from typing import Optional, Literal db = SqliteDatabase( 'storage/ai-database.db', pragmas={ 'journal_mode': 'wal', 'cache_size': -1 * 64000, 'synchronous': 0 } ) MINOR_CARDSET_PARAMS = [('cardset_id', 1), ('cardset_id', 3)] MAJOR_CARDSET_PARAMS = [('cardset_id', 1), ('cardset_id', 3), ('cardset_id', 4), ('cardset_id', 5), ('cardset_id', 6)] class BaseModel(Model): class Meta: database = db class Lineup(BaseModel): team_id = IntegerField() game_level = CharField() # 'minor', 'major', 'hof' vs_hand = CharField() # 'left', 'right' bat_one_card = IntegerField() bat_one_pos = CharField() bat_two_card = IntegerField() bat_two_pos = CharField() bat_three_card = IntegerField() bat_three_pos = CharField() bat_four_card = IntegerField() bat_four_pos = CharField() bat_five_card = IntegerField() bat_five_pos = CharField() bat_six_card = IntegerField() bat_six_pos = CharField() bat_seven_card = IntegerField() bat_seven_pos = CharField() bat_eight_card = IntegerField() bat_eight_pos = CharField() bat_nine_card = IntegerField() bat_nine_pos = CharField() class Rotation(BaseModel): team_id = IntegerField() game_level = CharField() sp_one_card = IntegerField() sp_two_card = IntegerField() sp_three_card = IntegerField() sp_four_card = IntegerField() sp_five_card = IntegerField() class Bullpen(BaseModel): team_id = IntegerField() game_level = CharField() # 'minor', 'major', 'hof' vs_hand = CharField() # 'left', 'right' # def grade_player() def batter_grading(vs_hand, rg_data): raw_bat = (rg_data[f'contact-{vs_hand}'] + rg_data[f'power-{vs_hand}'] + min(rg_data[f'contact-{vs_hand}'], rg_data[f'power-{vs_hand}'])) / 3 other_metrics = [ rg_data['vision'], rg_data['speed'], rg_data['stealing'], rg_data['reaction'], rg_data['arm'], rg_data['fielding'] ] blended_rating = sum(sorted(other_metrics, reverse=True)[:4], raw_bat * 4) / 8 return { 'overall': rg_data['overall'], 'raw-bat': raw_bat, 'blended': blended_rating } def get_or_create_card(player: dict, team: dict) -> int: # get player card; create one if none found z = 0 card_id = None while z < 2 and card_id is None: z += 1 c_query = db_get('cards', params=[('team_id', team['id']), ('player_id', player['player_id'])]) if c_query['count'] > 0: card_id = c_query['cards'][0]['id'] else: db_post( 'cards', payload={'cards': [ {'player_id': player['player_id'], 'team_id': team['id'], 'pack_id': 1} ]} ) if card_id is None: logging.error(f'Could not create card for {player["p_name"]} on the {team["lname"]}') return card_id # First attempt - pulls ratings guide info def build_lineup_graded(team_object: dict, vs_hand: str, league_name: str, num_innings: int, batter_rg): in_lineup = [] # player ids for quick checking players = { 'c': [], '1b': [], '2b': [], '3b': [], 'ss': [], 'lf': [], 'cf': [], 'rf': [], 'dh': [] } # Get all eligible players try: all_players = db_get( endpoint='players', params=[('mlbclub', team_object['lname']), ('pos_exclude', 'RP'), ('inc_dex', False)], timeout=10 )['players'] except ConnectionError as e: raise ConnectionError(f'Error pulling players for the {team_object["lname"]}. Cal help plz.') # Grade players and add to position lists for x in all_players: if x['pos_1'] not in ['SP', 'RP']: this_guy = copy.deepcopy(x) rg_data = next(x for x in batter_rg if x['player_id'] == this_guy['player_id']) grading = batter_grading(vs_hand, rg_data) logging.info(f'player: {this_guy} / grading: {grading}') this_guy['overall'] = grading['overall'] this_guy['raw-bat'] = grading['raw-bat'] this_guy['blended'] = grading['blended'] players['dh'].append(this_guy) if 'C' in [this_guy['pos_1'], this_guy['pos_2'], this_guy['pos_3'], this_guy['pos_4'], this_guy['pos_5'], this_guy['pos_6'], this_guy['pos_7'], this_guy['pos_8']]: players['c'].append(this_guy) if '1B' in [this_guy['pos_1'], this_guy['pos_2'], this_guy['pos_3'], this_guy['pos_4'], this_guy['pos_5'], this_guy['pos_6'], this_guy['pos_7'], this_guy['pos_8']]: players['1b'].append(this_guy) if '2B' in [this_guy['pos_1'], this_guy['pos_2'], this_guy['pos_3'], this_guy['pos_4'], this_guy['pos_5'], this_guy['pos_6'], this_guy['pos_7'], this_guy['pos_8']]: players['2b'].append(this_guy) if '3B' in [this_guy['pos_1'], this_guy['pos_2'], this_guy['pos_3'], this_guy['pos_4'], this_guy['pos_5'], this_guy['pos_6'], this_guy['pos_7'], this_guy['pos_8']]: players['3b'].append(this_guy) if 'SS' in [this_guy['pos_1'], this_guy['pos_2'], this_guy['pos_3'], this_guy['pos_4'], this_guy['pos_5'], this_guy['pos_6'], this_guy['pos_7'], this_guy['pos_8']]: players['ss'].append(this_guy) if 'LF' in [this_guy['pos_1'], this_guy['pos_2'], this_guy['pos_3'], this_guy['pos_4'], this_guy['pos_5'], this_guy['pos_6'], this_guy['pos_7'], this_guy['pos_8']]: players['lf'].append(this_guy) if 'CF' in [this_guy['pos_1'], this_guy['pos_2'], this_guy['pos_3'], this_guy['pos_4'], this_guy['pos_5'], this_guy['pos_6'], this_guy['pos_7'], this_guy['pos_8']]: players['cf'].append(this_guy) if 'RF' in [this_guy['pos_1'], this_guy['pos_2'], this_guy['pos_3'], this_guy['pos_4'], this_guy['pos_5'], this_guy['pos_6'], this_guy['pos_7'], this_guy['pos_8']]: players['rf'].append(this_guy) # Select players for lineup while len(players) > 0: # Sort players dict by position with fewest eligible players this_pass_data = sorted(players.items(), key=lambda item: len(item[1])) logging.info(f'this_pass_data: {this_pass_data}') # Pull one tuple ('', []) this_pos_data = this_pass_data[0] logging.info(f'this_pos_data: {this_pos_data}') # Sort players at this position by blended rating (raw-bat for DH) if this_pos_data[0] == 'dh': this_pos = sorted(this_pos_data[1], key=lambda item: item['raw-bat'], reverse=True) else: this_pos = sorted(this_pos_data[1], key=lambda item: item['blended'], reverse=True) logging.info(f'this_pos: {this_pos}') # Add top player to the lineup in_lineup.append({'position': copy.deepcopy(this_pos_data[0].upper()), 'player': copy.deepcopy(this_pos[0])}) logging.info(f'adding player: {this_pos[0]}') logging.info(f'deleting position: {this_pos_data[0]}') # Remove this position from consideration del players[this_pos_data[0]] for key in players: for x in players[key]: # Remove duplicate players (even across cardsets) once in lineup if x['strat_code'] == this_pos[0]['strat_code']: players[key].remove(x) # Set batting order as list of lists: [ ['', ], ... ] batting_order = [] return batting_order def build_lineup(team_object: dict, game_id: int, league_name: str, vs_hand: str = 'r') -> list: players = { 'C': None, '1B': None, '2B': None, '3B': None, 'SS': None, 'LF': None, 'CF': None, 'RF': None, 'DH': None } p_names = [] set_params = [('cardset_id_exclude', 2)] if league_name == 'minor-league': set_params = MINOR_CARDSET_PARAMS elif league_name == 'major-league': set_params = MAJOR_CARDSET_PARAMS # Pull players sorted by current cost try: params = [ ('mlbclub', team_object['lname']), ('pos_include', 'C'), ('pos_include', '1B'), ('pos_include', '2B'), ('pos_include', '3B'), ('pos_include', 'SS'), ('pos_include', 'LF'), ('pos_include', 'CF'), ('pos_include', 'RF'), ('pos_include', 'DH'), ('inc_dex', False), ('sort_by', 'cost-desc') ] params.extend(set_params) all_players = db_get( endpoint='players', params=params, timeout=10 )['players'] except ConnectionError as e: raise ConnectionError(f'Error pulling batters for the {team_object["lname"]}. Cal help plz.') logging.info(f'build_lineup - eligible batter count: {len(all_players)}') # Choose starting nine & position # In order of cost: # Try to add to pos_1; if unavailable, try remaining pos; if unavailable, add to DH # If all pos unavailable; note player's pos_1 and check if incumbent can move; if so, check for new incumbent # and recurse # If not possible, pass player and continue for guy in all_players: placed = False if guy['p_name'] not in p_names: for pos in [ guy['pos_1'], guy['pos_2'], guy['pos_3'], guy['pos_4'], guy['pos_5'], guy['pos_6'], guy['pos_7'], guy['pos_8'] ]: if pos is None or pos in ['SP', 'RP', 'CP']: break elif players[pos] is None: players[pos] = guy p_names.append(guy["p_name"]) placed = True break if not placed: if players['DH'] is None: players['DH'] = guy p_names.append(guy["p_name"]) else: logging.info(f'build_lineup - could not place {guy["p_name"]} in {team_object["sname"]} lineup') else: logging.info(f'build_lineup - {guy["p_name"]} already in lineup') if None in players.values(): bad_pos = {x for x in players if players[x] is None} raise ValueError(f'Could not find a {bad_pos} for the {team_object["sname"]}') logging.info(f'build_lineup - {players}') # Sort players into lineup # Pseudo-random? Patterns? Something to mix up lineups # 421356789 or [123 (rand)][456 (rand)][789 (rand)] (optional: chance to flip 3/4 and 6/7) lineups = [] # [ (, ), (, ), etc. ] sorted_players = sorted(players.items(), key=lambda x: x[1]['cost'], reverse=True) grp_1 = sorted_players[:3] grp_2 = sorted_players[3:6] grp_3 = sorted_players[6:] random.shuffle(grp_1) random.shuffle(grp_2) random.shuffle(grp_3) i = 1 for x in [grp_1, grp_2, grp_3]: for y in x: # get player card; create one if none found z = 0 card_id = None while z < 2 and card_id is None: z += 1 c_query = db_get('cards', params=[('team_id', team_object['id']), ('player_id', y[1]['player_id'])]) if c_query['count'] > 0: card_id = c_query['cards'][0]['id'] else: db_post( 'cards', payload={'cards': [ {'player_id': y[1]['player_id'], 'team_id': team_object['id'], 'pack_id': 1} ]} ) if card_id is None: logging.error(f'Could not create card for {y[1]["p_name"]} on the {team_object["lname"]}') lineups.append({ 'game_id': game_id, 'team_id': team_object['id'], 'player_id': y[1]['player_id'], 'card_id': card_id, 'position': y[0], 'batting_order': i, 'after_play': 0 }) i += 1 logging.info(f'build_lineup - final lineup: {lineups}') return lineups def get_starting_pitcher(team_object: dict, game_id: int, is_home: bool, league_name: str = None) -> dict: set_params = [('cardset_id_exclude', 2)] if league_name == 'minor-league': set_params = MINOR_CARDSET_PARAMS elif league_name == 'major-league': set_params = MAJOR_CARDSET_PARAMS # Pull starters sorted by current cost try: params = [ ('mlbclub', team_object['lname']), ('pos_include', 'SP'), ('pos_exclude', 'RP'), ('inc_dex', False), ('sort_by', 'cost-desc'), ('limit', 5) ] params.extend(set_params) pitchers = db_get( endpoint='players', params=params, timeout=10 ) except ConnectionError as e: logging.error(f'Could not get pitchers for {team_object["lname"]}: {e}') raise ConnectionError(f'Error pulling starting pitchers for the {team_object["lname"]}. Cal help plz.') logging.info(f'build_lineup - eligible starting pitcher count: {len(pitchers)}') if pitchers['count'] == 0: raise DatabaseError(f'Could not find any SP for {team_object["abbrev"]}. Seems like a Cal issue.') d_100 = random.randint(1, 100) starter = None if is_home: if d_100 <= 30: starter = pitchers['players'][0] elif d_100 <= 55 and pitchers['count'] > 1: starter = pitchers['players'][1] elif d_100 <= 75 and pitchers['count'] > 2: starter = pitchers['players'][2] elif d_100 <= 90 and pitchers['count'] > 3: starter = pitchers['players'][3] elif pitchers['count'] == 5: starter = pitchers['players'][4] else: starter = pitchers['players'][0] else: if d_100 <= 50: starter = pitchers['players'][0] elif d_100 <= 75 and pitchers['count'] > 1: starter = pitchers['players'][1] elif d_100 <= 85 and pitchers['count'] > 2: starter = pitchers['players'][2] elif d_100 <= 95 and pitchers['count'] > 3: starter = pitchers['players'][3] elif pitchers['count'] == 5: starter = pitchers['players'][4] else: starter = pitchers['players'][0] # get player card; create one if none found z = 0 card_id = None while z < 2 and card_id is None: z += 1 c_query = db_get('cards', params=[('team_id', team_object['id']), ('player_id', starter['player_id'])]) if c_query['count'] > 0: card_id = c_query['cards'][0]['id'] else: db_post( 'cards', payload={'cards': [ {'player_id': starter['player_id'], 'team_id': team_object['id'], 'pack_id': 1} ]} ) if card_id is None: logging.error(f'Could not create card for {starter["p_name"]} on the {team_object["lname"]}') return { 'game_id': game_id, 'team_id': team_object['id'], 'player_id': starter['player_id'], 'card_id': card_id, 'position': 'P', 'batting_order': 10, 'after_play': 0 } def get_relief_pitcher(this_play: StratPlay, ai_team: dict, used_pitchers: list, league_name: str = None) -> dict: used_codes = [db_get('cards', object_id=x.card_id)["player"]["strat_code"] for x in used_pitchers] logging.info(f'get_rp - used_pitchers: {used_codes}') reliever = None attempts = 0 while reliever is None: attempts += 1 if attempts > 3: raise ValueError(f'Could not find a reliever for the {ai_team["sname"]}. Cal plz.') set_params = [('cardset_id_exclude', 2)] if league_name == 'minor-league': set_params = MINOR_CARDSET_PARAMS elif league_name == 'major-league': set_params = MAJOR_CARDSET_PARAMS # Pull relievers sorted by current cost params = [ ('mlbclub', ai_team['lname']), ('pos_include', 'RP'), ('inc_dex', False), ('sort_by', 'cost-desc') ] params.extend(set_params) alt_flag = False if attempts == 1: # Try to get long man if this_play.inning_num < 6: logging.info(f'get_rp - game {this_play.game.id} try for long man') params.append(('pos_include', 'SP')) alt_flag = True # Try to get closer elif this_play.inning_num > 8: if (this_play.inning_half == 'top' and this_play.home_score >= this_play.away_score) or \ (this_play.inning_half == 'bot' and this_play.away_score >= this_play.home_score): logging.info(f'get_rp - game {this_play.game.id} try for closer') params.append(('pos_include', 'CP')) alt_flag = True else: params.append(('pos_exclude', 'CP')) # Try to exclude long men elif attempts == 1: logging.info(f'get_rp - game {this_play.game.id} try to exclude long men') params.append(('pos_exclude', 'SP')) try: pitchers = db_get( endpoint='players', params=params, timeout=10 ) except ConnectionError as e: logging.error(f'Could not get pitchers for {ai_team["lname"]}: {e}') raise ConnectionError(f'Error pulling starting pitchers for the {ai_team["lname"]}. Cal help plz.') if pitchers['count'] > 0: if alt_flag or this_play.inning_num > 9 or attempts > 2: start = 0 else: start = 9 - this_play.inning_num for count, guy in enumerate(pitchers['players']): if count >= start and guy['strat_code'] not in used_codes: card_id = get_or_create_card(guy, ai_team) return { 'game_id': this_play.game.id, 'team_id': ai_team['id'], 'player_id': guy['player_id'], 'card_id': card_id, 'position': 'P', 'batting_order': 10, 'after_play': this_play.play_num - 1 }