import copy import logging import random import pydantic from db_calls_gameplay import StratPlay, StratGame, get_one_lineup, get_manager, get_team_lineups, \ get_last_inning_end_play, make_sub, get_player from db_calls import db_get, db_post from peewee import * from typing import Optional, Literal db = SqliteDatabase( 'storage/ai-database.db', pragmas={ 'journal_mode': 'wal', 'cache_size': -1 * 64000, 'synchronous': 0 } ) # 2018, Mario MINOR_CARDSET_PARAMS = [('cardset_id', 8), ('cardset_id', 13)] # 2018, 2023, Mario, 2013 MAJOR_CARDSET_PARAMS = [ ('cardset_id', 9), ('cardset_id', 8), ('cardset_id', 6), ('cardset_id', 13) ] # 2008, 2012, 2013, 2016, 2019, 2021, 2022, 2023, Mario HOF_CARDSET_PARAMS = [ ('cardset_id', 1), ('cardset_id', 3), ('cardset_id', 4), ('cardset_id', 5), ('cardset_id', 6), ('cardset_id', 7), ('cardset_id', 8), ('cardset_id', 9), ('cardset_id', 10), ('cardset_id', 11), ('cardset_id', 12), ('cardset_id', 13) ] # 2008, 2012, 2013, 2016 GAUNTLET2_PARAMS = [ ('cardset_id', 8), ('cardset_id', 12), ('cardset_id', 6), ('cardset_id', 7), ('cardset_id', 11) ] class BaseModel(Model): class Meta: database = db class Lineup(BaseModel): team_id = IntegerField() game_level = CharField() # 'minor', 'major', 'hof' vs_hand = CharField() # 'left', 'right' bat_one_card = IntegerField() bat_one_pos = CharField() bat_two_card = IntegerField() bat_two_pos = CharField() bat_three_card = IntegerField() bat_three_pos = CharField() bat_four_card = IntegerField() bat_four_pos = CharField() bat_five_card = IntegerField() bat_five_pos = CharField() bat_six_card = IntegerField() bat_six_pos = CharField() bat_seven_card = IntegerField() bat_seven_pos = CharField() bat_eight_card = IntegerField() bat_eight_pos = CharField() bat_nine_card = IntegerField() bat_nine_pos = CharField() class Rotation(BaseModel): team_id = IntegerField() game_level = CharField() sp_one_card = IntegerField() sp_two_card = IntegerField() sp_three_card = IntegerField() sp_four_card = IntegerField() sp_five_card = IntegerField() class Bullpen(BaseModel): team_id = IntegerField() game_level = CharField() # 'minor', 'major', 'hof' vs_hand = CharField() # 'left', 'right' # def grade_player() def batter_grading(vs_hand, rg_data): raw_bat = (rg_data[f'contact-{vs_hand}'] + rg_data[f'power-{vs_hand}'] + min(rg_data[f'contact-{vs_hand}'], rg_data[f'power-{vs_hand}'])) / 3 other_metrics = [ rg_data['vision'], rg_data['speed'], rg_data['stealing'], rg_data['reaction'], rg_data['arm'], rg_data['fielding'] ] blended_rating = sum(sorted(other_metrics, reverse=True)[:4], raw_bat * 4) / 8 return { 'overall': rg_data['overall'], 'raw-bat': raw_bat, 'blended': blended_rating } async def get_or_create_card(player: dict, team: dict) -> int: # get player card; create one if none found z = 0 card_id = None while z < 2 and card_id is None: z += 1 c_query = await db_get('cards', params=[('team_id', team['id']), ('player_id', player['player_id'])]) if c_query['count'] > 0: card_id = c_query['cards'][0]['id'] else: await db_post( 'cards', payload={'cards': [ {'player_id': player['player_id'], 'team_id': team['id'], 'pack_id': 1} ]} ) if card_id is None: logging.error(f'Could not create card for {player["p_name"]} on the {team["lname"]}') raise DatabaseError(f'Could not create card for {player["p_name"]} on the {team["lname"]}') return card_id # First attempt - pulls ratings guide info async def build_lineup_graded(team_object: dict, vs_hand: str, league_name: str, num_innings: int, batter_rg): in_lineup = [] # player ids for quick checking players = { 'c': [], '1b': [], '2b': [], '3b': [], 'ss': [], 'lf': [], 'cf': [], 'rf': [], 'dh': [] } # Get all eligible players try: p_query = await db_get( endpoint='players', params=[('mlbclub', team_object['lname']), ('pos_exclude', 'RP'), ('inc_dex', False)], timeout=10 ) all_players = p_query['players'] except ConnectionError as e: raise ConnectionError(f'Error pulling players for the {team_object["lname"]}. Cal help plz.') # Grade players and add to position lists for x in all_players: if x['pos_1'] not in ['SP', 'RP']: this_guy = copy.deepcopy(x) rg_data = next(x for x in batter_rg if x['player_id'] == this_guy['player_id']) grading = batter_grading(vs_hand, rg_data) logging.info(f'player: {this_guy} / grading: {grading}') this_guy['overall'] = grading['overall'] this_guy['raw-bat'] = grading['raw-bat'] this_guy['blended'] = grading['blended'] players['dh'].append(this_guy) if 'C' in [this_guy['pos_1'], this_guy['pos_2'], this_guy['pos_3'], this_guy['pos_4'], this_guy['pos_5'], this_guy['pos_6'], this_guy['pos_7'], this_guy['pos_8']]: players['c'].append(this_guy) if '1B' in [this_guy['pos_1'], this_guy['pos_2'], this_guy['pos_3'], this_guy['pos_4'], this_guy['pos_5'], this_guy['pos_6'], this_guy['pos_7'], this_guy['pos_8']]: players['1b'].append(this_guy) if '2B' in [this_guy['pos_1'], this_guy['pos_2'], this_guy['pos_3'], this_guy['pos_4'], this_guy['pos_5'], this_guy['pos_6'], this_guy['pos_7'], this_guy['pos_8']]: players['2b'].append(this_guy) if '3B' in [this_guy['pos_1'], this_guy['pos_2'], this_guy['pos_3'], this_guy['pos_4'], this_guy['pos_5'], this_guy['pos_6'], this_guy['pos_7'], this_guy['pos_8']]: players['3b'].append(this_guy) if 'SS' in [this_guy['pos_1'], this_guy['pos_2'], this_guy['pos_3'], this_guy['pos_4'], this_guy['pos_5'], this_guy['pos_6'], this_guy['pos_7'], this_guy['pos_8']]: players['ss'].append(this_guy) if 'LF' in [this_guy['pos_1'], this_guy['pos_2'], this_guy['pos_3'], this_guy['pos_4'], this_guy['pos_5'], this_guy['pos_6'], this_guy['pos_7'], this_guy['pos_8']]: players['lf'].append(this_guy) if 'CF' in [this_guy['pos_1'], this_guy['pos_2'], this_guy['pos_3'], this_guy['pos_4'], this_guy['pos_5'], this_guy['pos_6'], this_guy['pos_7'], this_guy['pos_8']]: players['cf'].append(this_guy) if 'RF' in [this_guy['pos_1'], this_guy['pos_2'], this_guy['pos_3'], this_guy['pos_4'], this_guy['pos_5'], this_guy['pos_6'], this_guy['pos_7'], this_guy['pos_8']]: players['rf'].append(this_guy) # Select players for lineup while len(players) > 0: # Sort players dict by position with fewest eligible players this_pass_data = sorted(players.items(), key=lambda item: len(item[1])) logging.info(f'this_pass_data: {this_pass_data}') # Pull one tuple ('', []) this_pos_data = this_pass_data[0] logging.info(f'this_pos_data: {this_pos_data}') # Sort players at this position by blended rating (raw-bat for DH) if this_pos_data[0] == 'dh': this_pos = sorted(this_pos_data[1], key=lambda item: item['raw-bat'], reverse=True) else: this_pos = sorted(this_pos_data[1], key=lambda item: item['blended'], reverse=True) logging.info(f'this_pos: {this_pos}') # Add top player to the lineup in_lineup.append({'position': copy.deepcopy(this_pos_data[0].upper()), 'player': copy.deepcopy(this_pos[0])}) logging.info(f'adding player: {this_pos[0]}') logging.info(f'deleting position: {this_pos_data[0]}') # Remove this position from consideration del players[this_pos_data[0]] for key in players: for x in players[key]: # Remove duplicate players (even across cardsets) once in lineup if x['strat_code'] == this_pos[0]['strat_code']: players[key].remove(x) # Set batting order as list of lists: [ ['', ], ... ] batting_order = [] return batting_order async def build_lineup(team_object: dict, game_id: int, league_name: str, sp_name: str, vs_hand: str = 'r') -> list: # players = { # 'C': None, # '1B': None, # '2B': None, # '3B': None, # 'SS': None, # 'LF': None, # 'CF': None, # 'RF': None, # 'DH': None # } # p_names = [] # rest_name = None # # set_params = [('cardset_id_exclude', 2)] # if team_object['id'] == 58: # set_params = [] # elif league_name == 'minor-league': # set_params = copy.deepcopy(MINOR_CARDSET_PARAMS) # elif league_name == 'major-league': # set_params = copy.deepcopy(MAJOR_CARDSET_PARAMS) # elif league_name == 'hall-of-fame': # set_params = copy.deepcopy(HOF_CARDSET_PARAMS) # elif 'gauntlet-1' in league_name: # set_params = copy.deepcopy(MINOR_CARDSET_PARAMS) # elif 'gauntlet-2' in league_name: # set_params = random.sample(GAUNTLET2_PARAMS, 2) # # # Pull players sorted by current cost # try: # params = [ # ('mlbclub', team_object['lname']), ('pos_include', 'C'), ('pos_include', '1B'), ('pos_include', '2B'), # ('pos_include', '3B'), ('pos_include', 'SS'), ('pos_include', 'LF'), ('pos_include', 'CF'), # ('pos_include', 'RF'), ('pos_include', 'DH'), ('inc_dex', False), ('sort_by', 'cost-desc'), ('limit', 50) # ] # params.extend(set_params) # p_query = await db_get( # endpoint='players', # params=params, # timeout=10 # ) # all_players = p_query['players'] # except ConnectionError as e: # raise ConnectionError(f'Error pulling batters for the {team_object["lname"]}. Cal help plz.') # # logging.info(f'build_lineup - eligible batter count: {len(all_players)}') # # # Choose starting nine & position # # In order of cost: # # Try to add to pos_1; if unavailable, try remaining pos; if unavailable, add to DH # # If all pos unavailable; note player's pos_1 and check if incumbent can move; if so, check for new incumbent # # and recurse # # If not possible, pass player and continue # for guy in all_players: # placed = False # if guy['p_name'] not in p_names and guy['p_name'] != rest_name: # for pos in [ # guy['pos_1'], guy['pos_2'], guy['pos_3'], guy['pos_4'], guy['pos_5'], guy['pos_6'], guy['pos_7'], # guy['pos_8'] # ]: # if pos is None or pos in ['SP', 'RP', 'CP']: # break # # if random.randint(1, 10) == 10 and rest_name is None and False: # logging.info(f'Resting {guy["p_name"]} in game {game_id}') # rest_name = guy['p_name'] # elif players[pos] is None: # players[pos] = guy # p_names.append(guy["p_name"]) # placed = True # break # # if not placed and rest_name != guy['p_name']: # if players['DH'] is None: # players['DH'] = guy # p_names.append(guy["p_name"]) # else: # logging.info(f'build_lineup - could not place {guy["p_name"]} in {team_object["sname"]} lineup') # else: # logging.info(f'build_lineup - {guy["p_name"]} already in lineup') # # if None in players.values(): # bad_pos = {x for x in players if players[x] is None} # raise ValueError(f'Could not find a {bad_pos} for the {team_object["sname"]}') # # logging.info(f'build_lineup - {players}') # # # Sort players into lineup # # Pseudo-random? Patterns? Something to mix up lineups # # 421356789 or [123 (rand)][456 (rand)][789 (rand)] (optional: chance to flip 3/4 and 6/7) # # # [ (, ), (, ), etc. ] # sorted_players = sorted(players.items(), key=lambda x: x[1]['cost'], reverse=True) build_type = 'fun' l_query = await db_get( f'teams/{team_object["id"]}/lineup/{league_name}?pitcher_name={sp_name}&build_type={build_type}', timeout=6 ) sorted_players = l_query['array'] logging.info(f'sorted_players: {sorted_players}') grp_1 = sorted_players[:3] grp_2 = sorted_players[3:6] grp_3 = sorted_players[6:] random.shuffle(grp_1) random.shuffle(grp_2) random.shuffle(grp_3) lineups = [] i = 1 for x in [grp_1, grp_2, grp_3]: logging.debug(f'group: {x}') for y in x: logging.debug(f'y: {y}') card_id = await get_or_create_card(y[1]['player'], team_object) lineups.append({ 'game_id': game_id, 'team_id': team_object['id'], 'player_id': y[1]['player']['player_id'], 'card_id': card_id, 'position': y[0], 'batting_order': i, 'after_play': 0 }) i += 1 logging.info(f'build_lineup - final lineup: {lineups}') return lineups async def get_starting_pitcher(team_object: dict, game_id: int, is_home: bool, league_name: str = None) -> dict: # set_params = [('cardset_id_exclude', 2)] # if league_name == 'minor-league': # set_params = copy.deepcopy(MINOR_CARDSET_PARAMS) # elif league_name == 'major-league': # set_params = copy.deepcopy(MAJOR_CARDSET_PARAMS) # elif league_name == 'hall-of-fame': # set_params = copy.deepcopy(HOF_CARDSET_PARAMS) # elif league_name == 'gauntlet-1': # set_params = copy.deepcopy(MINOR_CARDSET_PARAMS) # elif 'gauntlet-2' in league_name: # set_params = random.sample(GAUNTLET2_PARAMS) # # params = [ # ('mlbclub', team_object['lname']), ('pos_include', 'SP'), ('pos_exclude', 'RP'), # ('inc_dex', False), ('sort_by', 'cost-desc'), ('limit', 5) # ] # counter = 0 # logging.info(f'params: {params}') # while counter < 2: # logging.info(f'counter: {counter}') # counter += 1 # # Pull starters sorted by current cost # logging.info(f'counter: {counter} / params: {params}') # try: # params.extend(set_params) # pitchers = await db_get( # endpoint='players', # params=params, # timeout=10 # ) # logging.info(f'pitchers: {pitchers}') # except ConnectionError as e: # logging.error(f'Could not get pitchers for {team_object["lname"]}: {e}') # raise ConnectionError(f'Error pulling starting pitchers for the {team_object["lname"]}. Cal help plz.') # # if pitchers['count'] == 0: # logging.info(f'pitchers is None') # del params # params = [ # ('mlbclub', team_object['lname']), ('pos_include', 'SP'), # ('inc_dex', False), ('sort_by', 'cost-desc'), ('limit', 5) # ] # else: # break # # logging.info(f'build_lineup - eligible starting pitcher count: {len(pitchers)}') # if pitchers['count'] == 0: # raise DatabaseError(f'Could not find any SP for {team_object["abbrev"]}. Seems like a Cal issue.') d_100 = random.randint(1, 100) if is_home: if d_100 <= 30: sp_rank = 1 elif d_100 <= 55: sp_rank = 2 elif d_100 <= 75: sp_rank = 3 elif d_100 <= 90: sp_rank = 4 else: sp_rank = 5 else: if d_100 <= 50: sp_rank = 1 elif d_100 <= 75: sp_rank = 2 elif d_100 <= 85: sp_rank = 3 elif d_100 <= 95: sp_rank = 4 else: sp_rank = 5 starter = await db_get(f'teams/{team_object["id"]}/sp/{league_name}?sp_rank={sp_rank}') # get player card; create one if none found card_id = await get_or_create_card(starter, team_object) return { 'game_id': game_id, 'team_id': team_object['id'], 'player_id': starter['player_id'], 'card_id': card_id, 'position': 'P', 'batting_order': 10, 'after_play': 0 } async def get_relief_pitcher(this_play: StratPlay, ai_team: dict, league_name: str = None) -> dict: used_ids = [] used_players = await get_team_lineups( game_id=this_play.game.id, team_id=ai_team['id'], inc_inactive=True, as_string=False, pitchers_only=True ) for x in used_players: used_ids.append(f'{x.player_id}') logging.debug(f'used ids: {used_ids}') id_string = "&used_pitcher_ids=".join(used_ids) this_game = this_play.game ai_score = this_play.away_score if this_game.away_team_id == ai_team['id'] else this_play.home_score human_score = this_play.home_score if this_game.away_team_id == ai_team['id'] else this_play.away_score logging.debug(f'scores - ai: {ai_score} / human: {human_score}') if abs(ai_score - human_score) >= 7: need = 'length' elif this_play.inning_num >= 9 and abs(ai_score - human_score) <= 3: need = 'closer' elif this_play.inning_num in [7, 8] and abs(ai_score - human_score) <= 3: need = 'setup' elif abs(ai_score - human_score) <= 3: need = 'middle' else: need = 'length' logging.debug(f'need: {need}') rp_query = await db_get(f'teams/{ai_team["id"]}/rp/{league_name.split("-run")[0]}' f'?need={need}&used_pitcher_ids={id_string}') card_id = await get_or_create_card(rp_query, ai_team) return { 'game_id': this_play.game.id, 'team_id': ai_team['id'], 'player_id': rp_query['player_id'], 'card_id': card_id, 'position': 'P', 'batting_order': 10, 'after_play': this_play.play_num - 1 } """ END NEW GET RP """ # used_codes = [] # used_players = await get_team_lineups( # game_id=this_play.game.id, team_id=ai_team['id'], inc_inactive=True, as_string=False # ) # for x in used_players: # c_query = await db_get('cards', object_id=x.card_id) # used_codes.append(c_query["player"]["strat_code"]) # logging.info(f'get_rp - used_players: {used_codes}') # # reliever = None # attempts = 0 # while reliever is None: # attempts += 1 # if attempts > 3: # raise ValueError(f'Could not find a reliever for the {ai_team["sname"]}. Cal plz.') # # set_params = [('cardset_id_exclude', 2)] # if league_name == 'minor-league': # set_params = copy.deepcopy(MINOR_CARDSET_PARAMS) # elif league_name == 'major-league': # set_params = copy.deepcopy(MAJOR_CARDSET_PARAMS) # elif league_name == 'hall-of-fame': # set_params = copy.deepcopy(HOF_CARDSET_PARAMS) # elif 'gauntlet-1' in league_name: # set_params = copy.deepcopy(MINOR_CARDSET_PARAMS) # elif 'gauntlet-2' in league_name: # set_params = copy.deepcopy(GAUNTLET2_PARAMS) # # # Pull relievers sorted by current cost # params = [ # ('mlbclub', ai_team['lname']), ('pos_include', 'RP'), ('inc_dex', False), ('sort_by', 'cost-desc'), # ('limit', 15) # ] # params.extend(set_params) # # use_best = False # if attempts == 1: # # Try to get long man # if this_play.inning_num < 6: # logging.info(f'get_rp - game {this_play.game.id} try for long man') # params.append(('pos_include', 'SP')) # # use_best = True # # # Try to get closer # elif this_play.inning_num > 8: # if (this_play.inning_half == 'top' and this_play.home_score >= this_play.away_score) or \ # (this_play.inning_half == 'bot' and this_play.away_score >= this_play.home_score): # logging.info(f'get_rp - game {this_play.game.id} try for closer') # params.append(('pos_include', 'CP')) # use_best = True # else: # params.append(('pos_exclude', 'CP')) # # # Try to exclude long men # elif attempts == 1: # logging.info(f'get_rp - game {this_play.game.id} try to exclude long men') # params.append(('pos_exclude', 'SP')) # # try: # pitchers = await db_get( # endpoint='players', # params=params, # timeout=10 # ) # except ConnectionError as e: # logging.error(f'Could not get pitchers for {ai_team["lname"]}: {e}') # raise ConnectionError(f'Error pulling starting pitchers for the {ai_team["lname"]}. Cal help plz.') # # if pitchers['count'] > 0: # if use_best or this_play.inning_num > 9 or attempts > 2: # start = 0 # else: # start = 9 - this_play.inning_num # # for count, guy in enumerate(pitchers['players']): # if count >= start and guy['strat_code'] not in used_codes: # card_id = await get_or_create_card(guy, ai_team) # # return { # 'game_id': this_play.game.id, # 'team_id': ai_team['id'], # 'player_id': guy['player_id'], # 'card_id': card_id, # 'position': 'P', # 'batting_order': 10, # 'after_play': this_play.play_num - 1 # } def get_pitcher(this_game: StratGame, this_play: StratPlay): p_team_id = this_game.home_team_id if this_play.inning_half == 'top': p_team_id = this_game.away_team_id return get_one_lineup(this_game.id, team_id=p_team_id, position='P', active=True) async def pitching_ai_note(this_play: StratPlay, this_pitcher: dict): this_ai = get_manager(this_play.game) gm_name = f'{this_pitcher["team"]["gmname"]}' used_pitchers = await get_team_lineups( game_id=this_play.game.id, team_id=this_pitcher["team"]['id'], inc_inactive=True, pitchers_only=True, as_string=False ) last_inning_ender = get_last_inning_end_play( this_play.game.id, this_play.inning_half, this_play.inning_num - 1 ) ai_note = '' pitcher = this_pitcher # Pitcher Substitutions new_pitcher = None if last_inning_ender and last_inning_ender.pitcher != this_play.pitcher: logging.debug(f'{this_pitcher["team"]["sname"]} not making a change.') ai_note += f'- have {this_pitcher["p_name"]} finish the inning\n' elif this_play.is_new_inning and this_play.game.short_game and this_play.inning_num != 1: logging.debug(f'{this_pitcher["team"]["sname"]} going the to pen.') if len(used_pitchers) < 8: make_sub(await get_relief_pitcher( this_play, this_pitcher['team'], this_play.game.game_type )) pitcher = await get_player(this_play.game, get_pitcher(this_play.game, this_play)) new_pitcher = pitcher else: ai_note += f'- continue with auto-fatigued {this_pitcher["p_name"]}\n' else: if len(used_pitchers) == 1: ai_note += f'- go to the pen if the pitcher fatigues __and has allowed 5+ baserunners__ ' \ f'(`/log ai-pitcher-sub`)\n' elif len(used_pitchers) < 8: ai_note += f'- go to the pen if the pitcher fatigues (`/log ai-pitcher-sub`)\n' else: ai_note += f' - continue with {this_pitcher["p_name"]}\n' # Holding Baserunners if this_play.starting_outs == 2 and this_play.on_base_code > 0: if this_play.on_base_code in [1, 2]: ai_note += f'- hold the runner\n' elif this_play.on_base_code in [4, 7]: ai_note += f'- hold the runners\n' elif this_play.on_base_code == 5: ai_note += f'- hold the runner on first\n' elif this_play.on_base_code == 6: ai_note += f'- hold the runner on second\n' elif this_play.on_base_code in [1, 5]: ai_note += f'- hold the runner on 1st if they have ***** auto-jump\n' elif this_play.on_base_code == 2: ai_note += f'- hold the runner on 2nd if safe range is 14+\n' # Defensive Alignment if this_play.on_third and this_play.starting_outs < 2: if this_play.on_first: ai_note += f'- play the corners in\n' elif abs(this_play.away_score - this_play.home_score) <= 3: ai_note += f'- play the whole infield in\n' else: ai_note += f'- play the corners in\n' return { 'note': ai_note, 'pitcher': pitcher, 'gm_name': gm_name, 'sub': new_pitcher } def batting_ai_note(this_play: StratPlay, this_batter: dict): this_ai = get_manager(this_play.game) ai_note = '' gm_name = f'{this_batter["team"]["gmname"]}' if this_play.on_first and not this_play.on_second: ai_note += f'- {this_ai.check_jump(2, this_play.starting_outs)}\n' elif this_play.on_second and not this_play.on_third: ai_note += f'- {this_ai.check_jump(3, this_play.starting_outs)}\n' return { 'note': ai_note, 'batter': this_batter, 'gm_name': gm_name }