import csv import json import db_calls import db_calls_gameplay from helpers import * from db_calls import * from discord import Member from discord.ext import commands, tasks from discord import app_commands import in_game # date = f'{datetime.datetime.now().year}-{datetime.datetime.now().month}-{datetime.datetime.now().day}' # logging.basicConfig( # filename=f'logs/{date}.log', # format='%(asctime)s - %(levelname)s - %(message)s', # level=logging.WARNING # ) class Admins(commands.Cog): def __init__(self, bot): self.bot = bot self.weekly_reset_done = False # async def cog_load(self): # await self.bot.change_presence(activity=discord.Game(name='strat | .help')) async def cog_command_error(self, ctx, error): await ctx.send(f'{error}') async def dev_startup(self): # Check for Paper Sluggers event e_query = await db_get('events', params=[('name', 'Paper Sluggers')]) if e_query is None: this_event = db_post( 'events', payload={ "name": "Paper Sluggers", "short_desc": f'Draft a team to win you ten games as we celebrate the introduction of the ' f'Mario Super Sluggers cardset to Paper Dynasty!', "long_desc": "", "url": f'https://cdn.discordapp.com/attachments/603421569972305921/1087862987215347805/' f'PD-Mario-Full.png', "active": True } ) else: this_event = e_query['events'][0] # Check for Game Rewards gr_query = await db_get('gamerewards', params=[('name', 'MVP Pack')]) if gr_query['count'] == 0: mv_pack = db_post( 'gamerewards', payload={ 'name': 'MVP', 'pack_type_id': 5 } ) else: mv_pack = gr_query['gamerewards'][0] gr_query = await db_get('gamerewards', params=[('name', 'All-Star Pack')]) if gr_query['count'] == 0: as_pack = db_post( 'gamerewards', payload={ 'name': 'All-Star Pack', 'pack_type_id': 6 } ) else: as_pack = gr_query['gamerewards'][0] gr_query = await db_get('gamerewards', params=[('name', 'Mario Pack')]) if gr_query['count'] == 0: m_pack = await db_post( 'gamerewards', payload={ 'name': 'Mario Pack', 'pack_type_id': 7 } ) else: m_pack = gr_query['gamerewards'][0] # Check for Gauntlet rewards gr_query = await db_get('gauntletrewards', params=[('gauntlet_id', this_event['id'])]) if gr_query['count'] == 0: await db_post( 'gauntletrewards', payload={ 'rewards': [ { 'name': '3 Wins', 'gauntlet_id': this_event['id'], 'reward_id': m_pack['id'], 'win_num': 3 }, { 'name': '6 Wins', 'gauntlet_id': this_event['id'], 'reward_id': as_pack['id'], 'win_num': 6 }, { 'name': '8 Wins', 'gauntlet_id': this_event['id'], 'reward_id': m_pack['id'], 'win_num': 8 }, { 'name': '10 Wins', 'gauntlet_id': this_event['id'], 'reward_id': mv_pack['id'], 'win_num': 10 }, { 'name': '10-0', 'gauntlet_id': this_event['id'], 'reward_id': m_pack['id'], 'win_num': 10, 'loss_max': 0 } ] } ) @commands.command(name='dev_startup', help='Run startup function') async def dev_startup_command(self, ctx): await self.dev_startup() await ctx.send(random_conf_gif()) group_give = app_commands.Group(name='give', description='Mod: Distribute packs or tokens') @group_give.command(name='packs') async def give_packs_subcommand( self, interaction: discord.Interaction, num_packs: int, pack_type: Literal['Standard', 'Premium', 'MVP'], team_abbrevs: str): if not owner_only(interaction): await interaction.response.send_message(random_no_gif()) return current = await db_get('current') await interaction.response.defer() p_query = await db_get('packtypes', params=[('name', pack_type)]) response = '' for x in team_abbrevs.split(' '): t_query = await db_get('teams', params=[('abbrev', x), ('season', current['season'])]) team = t_query['teams'][0] if team: total_packs = await give_packs(team, num_packs, pack_type=p_query['packtypes'][0]) response += f'Just gave {num_packs} {pack_type} pack{"s" if num_packs > 1 else ""} to the ' \ f'{team["sname"]}. They now have {total_packs["count"]} ' \ f'pack{"s" if total_packs["count"] > 1 else ""}.\n' elif x.upper() == 'LEAGUE': all_teams = await db_get('teams', params=[('season', current['season'])]) for y in all_teams['teams']: logging.warning(f'Giving {num_packs} pack(s) to team: {y["abbrev"]}') await give_packs(team, num_packs) response = f'Just gave all {all_teams["count"]} teams {num_packs} ' \ f'standard pack{"s" if num_packs > 1 else ""}!' else: await interaction.edit_original_response(content=f'Hmm...I\'m not sure who **{x.upper()}** is.') return logging.info(f'give info: {response}') await interaction.edit_original_response(content=f'{response if len(response) > 0 else "All done!"}') @commands.hybrid_command(name='post-guide', help='Mod: Post the ratings guide to team sheet') @commands.is_owner() async def post_guide_command(self, ctx, gm: Member): team = await get_team_by_owner(gm.id) t_query = await db_get('teams', params=[('gm_id', gm.id)]) if t_query['count'] == 0: await ctx.send(f'Huh...I don\'t see any teams for {gm.name}') for x in t_query['teams']: await db_patch('teams', object_id=x['id'], params=[('has_guide', True)]) await ctx.send(random_conf_gif()) @app_commands.command(name='add-player-card', description='Mod: Manually upload a new PD card') @app_commands.checks.has_any_role('Da Commish') async def new_manual_card_slash( self, interaction: discord.Interaction, player_type: Literal['batter', 'pitcher'], player_json: str, bc_or_pc_json: str, position_list: str, ratings_vl_json: str, ratings_vr_json: str): await interaction.response.defer() try: d_player = json.loads(player_json) except json.decoder.JSONDecodeError as e: await interaction.edit_original_response(content=f'RIP. Failed to process that player.') return try: d_bcpc = json.loads(bc_or_pc_json) except json.decoder.JSONDecodeError as e: await interaction.edit_original_response(content=f'RIP. Failed to process that {player_type} card.') return try: d_positions = json.loads(position_list) except json.decoder.JSONDecodeError as e: await interaction.edit_original_response(content=f'RIP. Failed to process the position data.') return try: d_ratings_vl = json.loads(ratings_vl_json) except json.decoder.JSONDecodeError as e: await interaction.edit_original_response(content=f'RIP. Failed to process the vL ratings.') return try: d_ratings_vr = json.loads(ratings_vr_json) except json.decoder.JSONDecodeError as e: await interaction.edit_original_response(content=f'RIP. Failed to process the vR ratings.') return logging.info(f'Data gathered:\n\n{d_player}\n\n{d_bcpc}\n\n{d_positions}\n\n{d_ratings_vl}\n\n{d_ratings_vr}') await interaction.edit_original_response( content='Just spit out the debug info to the log, but processing was successful!') @app_commands.command(name='reset-image', description='Force a refresh of a player\'s card images') async def reset_image(self, interaction: discord.Interaction, player_id: int): await interaction.response.defer() new_player = await db_post(f'players/{player_id}/image-reset') player_embed = await get_card_embeds(get_blank_team_card(new_player)) if not owner_only(interaction): await send_to_channel( self.bot, 'pd-network-news', content=f'{interaction.user.display_name} just refreshed {player_desc(new_player)}\'s card ' f'{await get_emoji(interaction, "prayge")}' ) await interaction.edit_original_response(content=None, embeds=player_embed) @commands.hybrid_command(name='sync-sheets', help='Mod: Sync AI team sheets') @commands.is_owner() async def sync_sheets_command(self, ctx): t_query = await db_get('teams', params=[('is_ai', True)]) response = await ctx.send(f'Alright, I\'m getting started...') sheets = get_sheets(self.bot) for count, team in enumerate(t_query['teams']): this_sheet = sheets.open_by_key(team['gsheet']) team_data = this_sheet.worksheet_by_title('Team Data') team_data.update_values( crange='B1:B2', values=[[f'{team["id"]}'], [f'\'{team_hash(team)}']] ) await response.edit(content=f'Just finished the {team["sname"]} ({count + 1}/{len(t_query["teams"])})...') await response.edit(content=f'All done!') @commands.command(name='update-rarity', help='Mod: Pull current rarities and update players') @commands.is_owner() async def update_rarity_command(self, ctx): await ctx.send(f'Oh boy, here I go sheetsing again!') rarities = { 'MVP': 1, 'All-Star': 2, 'Starter': 3, 'Reserve': 4, 'Replacement': 5, 'Hall of Fame': 99, 'HoF': 99 } def new_cost(player, new_rarity, old_rarity): old_cost = player['cost'] old_rarity = old_rarity new_rarity = new_rarity logging.info(f'old_rarity: {old_rarity} / new_rarity: {new_rarity}') if old_rarity == 1: if new_rarity == 2: return max(old_cost - 540, 100) elif new_rarity == 3: return max(old_cost - 720, 50) elif new_rarity == 4: return max(old_cost - 780, 15) elif new_rarity == 5: return max(old_cost - 800, 5) elif new_rarity == 99: return old_cost + 1600 elif old_rarity == 2: if new_rarity == 1: return old_cost + 540 elif new_rarity == 3: return max(old_cost - 180, 50) elif new_rarity == 4: return max(old_cost - 240, 15) elif new_rarity == 5: return max(old_cost - 260, 5) elif new_rarity == 99: return old_cost + 2140 elif old_rarity == 3: if new_rarity == 1: return old_cost + 720 elif new_rarity == 2: return old_cost + 180 elif new_rarity == 4: return max(old_cost - 60, 15) elif new_rarity == 5: return max(old_cost - 80, 5) elif new_rarity == 99: return old_cost + 2320 elif old_rarity == 4: if new_rarity == 1: return old_cost + 780 elif new_rarity == 2: return old_cost + 240 elif new_rarity == 3: return old_cost + 60 elif new_rarity == 5: return max(old_cost - 20, 5) elif new_rarity == 99: return old_cost + 2380 elif old_rarity == 5: if new_rarity == 1: return old_cost + 800 elif new_rarity == 2: return old_cost + 260 elif new_rarity == 3: return old_cost + 80 elif new_rarity == 4: return old_cost + 20 elif new_rarity == 99: return old_cost + 2400 elif old_rarity == 99: if new_rarity == 1: return max(old_cost - 1600, 800) elif new_rarity == 2: return max(old_cost - 2140, 100) elif new_rarity == 3: return max(old_cost - 2320, 50) elif new_rarity == 4: return max(old_cost - 2380, 15) elif new_rarity == 5: return max(old_cost - 2400, 5) raise KeyError(f'Could not find a cost update for {player["p_name"]} from {player["rarity"]["name"]} to ' f'{new_rarity}') await ctx.send(f'Running player updates...') errors = [] counter = 0 # Read player-json.csv with open('storage/player_json.csv') as csv_file: csv_reader = csv.reader(csv_file) # Per line, search for player by cardset_id and bbref_id for row in csv_reader: if counter > 50: pass p_query = await db_get('players', params=[('cardset_id', 9), ('bbref_id', row[4])]) if p_query['count'] > 0: this_player = p_query['players'][0] updates = [] if '0706' in this_player['image']: updates.append(('image', this_player['image'].replace('2023-0706', '2023-0802'))) if this_player['rarity']['name'] != row[9] and not \ (this_player['rarity']['name'] == 'Hall of Fame' and row[9] == 'HoF'): new_r = rarities[row[9]] updates.append( ('cost', new_cost(this_player, new_r, rarities[this_player['rarity']['name']])) ) updates.append( ('rarity_id', new_r) ) if this_player['pos_1'] != row[11]: updates.append(('pos_1', row[11])) try: if len(row[12]) > 0: if this_player['pos_2'] != row[12]: updates.append(('pos_2', row[12])) elif this_player['pos_2'] is not None: updates.append(('pos_2', False)) except IndexError: pass try: if len(row[13]) > 0: if this_player['pos_3'] != row[13]: updates.append(('pos_3', row[13])) elif this_player['pos_3'] is not None: updates.append(('pos_3', False)) except IndexError: pass try: if len(row[14]) > 0: if this_player['pos_4'] != row[14]: updates.append(('pos_4', row[14])) elif this_player['pos_4'] is not None: updates.append(('pos_4', False)) except IndexError: pass try: if len(row[15]) > 0: if this_player['pos_5'] != row[15]: updates.append(('pos_5', row[15])) elif this_player['pos_5'] is not None: updates.append(('pos_5', False)) except IndexError: pass try: if len(row[16]) > 0: if this_player['pos_6'] != row[16]: updates.append(('pos_6', row[16])) elif this_player['pos_6'] is not None: updates.append(('pos_6', False)) except IndexError: pass try: if len(row[17]) > 0: if this_player['pos_7'] != row[17]: updates.append(('pos_7', row[17])) elif this_player['pos_7'] is not None: updates.append(('pos_7', False)) except IndexError: pass try: if len(row[18]) > 0: if this_player['pos_8'] != row[18]: updates.append(('pos_5', row[18])) elif this_player['pos_8'] is not None: updates.append(('pos_8', False)) except IndexError: pass # Patch player with new rarity_id, cost, and positions 1 - 6 if len(updates) > 0: logging.info(f'Updating {row[1]} - params: {updates}') await db_patch( 'players', object_id=this_player['player_id'], params=updates ) else: errors.append(f'{row[1]} - {row[4]}') counter += 1 await ctx.send(f'All done!') if len(errors) > 0: e_string = "\n- ".join(errors) logging.error(f'update errors:\n{e_string}') await ctx.send(f'I encountered the following errors:\n\n{e_string}') @app_commands.command(name='reset-cache', description='Reset all cached player cards for gameplay') @app_commands.checks.has_any_role('Da Commish') async def reset_cache_command( self, interaction: discord.Interaction, player_cache: Optional[bool] = True, batting_cache: Optional[bool] = True, pitching_cache: Optional[bool] = True): await interaction.response.defer() if player_cache: in_game.data_cache.PLAYER_CACHE = {} if batting_cache: in_game.data_cache.BATTINGCARD_CACHE = {} if pitching_cache: in_game.data_cache.PITCHINGCARD_CACHE = {} await interaction.edit_original_response( content=random_gif(random.choice(['all done', 'yes sir', 'complete'])) ) @commands.command(name='tc', help='Mod: Test command') @commands.is_owner() async def test_choices_command(self, ctx): await ctx.send(f'Wiping AI dexes...') await db_post('paperdex/wipe-ai', timeout=15) await ctx.send(f'All done!') @commands.command(name='get-bc', help='Mod: Test batting card cache') @commands.is_owner() async def get_battingcard_command(self, ctx, player_id: int): await ctx.channel.send(f'Pulling the batting card for player ID: {player_id}') this_data = None async with ctx.channel.typing(): this_data = await in_game.data_cache.get_pd_battingcard(player_id) await ctx.channel.send(f'Dumping data here:\n\n{this_data}') @commands.command(name='get-pc', help='Mod: Test pitching card cache') @commands.is_owner() async def get_pitchngcard_command(self, ctx, player_id: int): await ctx.channel.send(f'Pulling the pitching card for player ID: {player_id}') this_data = None async with ctx.channel.typing(): this_data = await in_game.data_cache.get_pd_pitchingcard(player_id) await ctx.channel.send(f'Dumping data here:\n\n{this_data}') @commands.command(name='test-fatigue', help='Mod: Test the fatigue AI') @commands.is_owner() async def test_fatigue_command(self, ctx, play_id: int): this_play = db_calls_gameplay.convert_stratplay(db_calls_gameplay.Play.get_by_id(play_id)) is_fatigued = in_game.ai_manager.is_pitcher_fatigued(this_play) await ctx.channel.send(f'Checking fatigue for Play #{play_id} / ' f'Pitcher {"IS" if is_fatigued else "IS NOT"} fatigued') async def setup(bot): await bot.add_cog(Admins(bot))