import csv
import json

import db_calls_gameplay
from gauntlets import evolve_pokemon
from helpers import *
from discord import Member
from discord.ext import commands, tasks
from discord import app_commands

from api_calls import *
from helpers import *
import in_game
from in_game import ai_manager
from in_game.gameplay_models import (
    Play,
    Session,
    select,
    engine,
    Game,
    Cardset,
    Lineup,
    Team,
    Player,
)
from in_game.gameplay_queries import get_team_or_none, reset_cache

logger = logging.getLogger("discord_app")

# Rarity name (as written in the CSV / API) -> rarity id used by the API.
_RARITY_IDS = {
    "MVP": 1,
    "All-Star": 2,
    "Starter": 3,
    "Reserve": 4,
    "Replacement": 5,
    "Hall of Fame": 99,
    "HoF": 99,
}

# Relative cost level of each rarity id.  The cost change for a rarity move is
# the difference between the two levels (e.g. MVP -> All-Star is 260 - 800 = -540).
_RARITY_COST_LEVEL = {1: 800, 2: 260, 3: 80, 4: 20, 5: 0, 99: 2400}

# Minimum cost a player may end up with when *dropping* to the given rarity id.
_RARITY_COST_FLOOR = {1: 800, 2: 100, 3: 50, 4: 15, 5: 5}

# CSV column index -> player field for the secondary positions (pos_2..pos_8).
_SECONDARY_POSITION_COLUMNS = tuple(
    (column, f"pos_{slot}") for slot, column in enumerate(range(12, 19), start=2)
)


def _new_cost(player, new_rarity, old_rarity):
    """Return the player's cost after moving from old_rarity to new_rarity.

    Promotions add the level difference to the current cost; demotions subtract
    it and clamp the result to the floor of the new rarity.

    Raises:
        KeyError: if the two rarities are equal or either id is unknown.
    """
    logger.info(f"old_rarity: {old_rarity} / new_rarity: {new_rarity}")

    known = old_rarity in _RARITY_COST_LEVEL and new_rarity in _RARITY_COST_LEVEL
    if not known or old_rarity == new_rarity:
        raise KeyError(
            f'Could not find a cost update for {player["p_name"]} from {player["rarity"]["name"]} to '
            f"{new_rarity}"
        )

    old_level = _RARITY_COST_LEVEL[old_rarity]
    new_level = _RARITY_COST_LEVEL[new_rarity]
    adjusted = player["cost"] + new_level - old_level
    if new_level < old_level:
        return max(adjusted, _RARITY_COST_FLOOR[new_rarity])
    return adjusted


def _pack_payload(team_id, pack_type_id, cardset_id, num_packs):
    """Build the "packs" POST payload granting num_packs identical packs."""
    return {
        "packs": [
            {
                "team_id": team_id,
                "pack_type_id": pack_type_id,
                "pack_cardset_id": cardset_id,
            }
            for _ in range(num_packs)
        ]
    }


class Admins(commands.Cog):
    """Owner/moderator-only maintenance and debugging commands."""

    def __init__(self, bot):
        self.bot = bot
        self.weekly_reset_done = False

    # async def cog_load(self):
    #     await self.bot.change_presence(activity=discord.Game(name='strat | .help'))

    async def cog_command_error(self, ctx, error):
        """Echo any error raised by a prefix command in this cog to the channel."""
        await ctx.send(f"{error}")

    async def _ensure_game_reward(self, lookup_name, payload):
        """Return the game reward named lookup_name, creating it from payload if absent."""
        gr_query = await db_get("gamerewards", params=[("name", lookup_name)])
        if gr_query and gr_query["count"] > 0:
            return gr_query["gamerewards"][0]
        return await db_post("gamerewards", payload=payload)

    async def dev_startup(self):
        """Seed the Paper Sluggers event, its game rewards and its gauntlet rewards.

        Each record is only created when the corresponding lookup comes back empty.
        """
        # Check for Paper Sluggers event
        e_query = await db_get("events", params=[("name", "Paper Sluggers")])
        if not e_query or not e_query.get("events"):
            this_event = await db_post(
                "events",
                payload={
                    "name": "Paper Sluggers",
                    "short_desc": "Draft a team to win you ten games as we celebrate the introduction of the "
                    "Mario Super Sluggers cardset to Paper Dynasty!",
                    "long_desc": "",
                    "url": "https://cdn.discordapp.com/attachments/603421569972305921/1087862987215347805/"
                    "PD-Mario-Full.png",
                    "active": True,
                },
            )
        else:
            this_event = e_query["events"][0]

        # Check for Game Rewards
        # NOTE(review): the lookup uses "MVP Pack" but the record is created as
        # "MVP", so the lookup may never match a record created here -- confirm
        # which name is intended before changing either string.
        mv_pack = await self._ensure_game_reward(
            "MVP Pack", {"name": "MVP", "pack_type_id": 5}
        )
        as_pack = await self._ensure_game_reward(
            "All-Star Pack", {"name": "All-Star Pack", "pack_type_id": 6}
        )
        m_pack = await self._ensure_game_reward(
            "Mario Pack", {"name": "Mario Pack", "pack_type_id": 7}
        )

        # Check for Gauntlet rewards
        gr_query = await db_get(
            "gauntletrewards", params=[("gauntlet_id", this_event["id"])]
        )
        if gr_query["count"] != 0:
            return

        def reward(name, pack, win_num, **extra):
            return {
                "name": name,
                "gauntlet_id": this_event["id"],
                "reward_id": pack["id"],
                "win_num": win_num,
                **extra,
            }

        await db_post(
            "gauntletrewards",
            payload={
                "rewards": [
                    reward("3 Wins", m_pack, 3),
                    reward("6 Wins", as_pack, 6),
                    reward("8 Wins", m_pack, 8),
                    reward("10 Wins", mv_pack, 10),
                    reward("10-0", m_pack, 10, loss_max=0),
                ]
            },
        )

    @commands.command(name="dev_startup", help="Run startup function")
    async def dev_startup_command(self, ctx):
        """Run dev_startup and confirm with a gif."""
        await self.dev_startup()
        await ctx.send(random_conf_gif())

    group_give = app_commands.Group(
        name="give", description="Mod: Distribute packs or tokens"
    )

    @group_give.command(name="packs")
    async def give_packs_subcommand(
        self,
        interaction: discord.Interaction,
        team_abbrevs: str,
        num_packs: int,
        pack_type: Literal["Standard", "Premium", "MVP"],
        cardset_id: int = None,
    ):
        """Give num_packs packs of pack_type to each listed team.

        team_abbrevs is a whitespace-separated list of team abbreviations; the
        special value "LEAGUE" targets every non-AI, non-gauntlet team of the
        current season.

        Raises:
            KeyError: if no pack type named pack_type exists.
        """
        if not owner_only(interaction):
            await interaction.response.send_message(random_no_gif())
            return

        current = await db_get("current")
        await interaction.response.defer()

        p_query = await db_get("packtypes", params=[("name", pack_type)])
        if p_query["count"] == 0:
            raise KeyError("Packtype not found")
        this_packtype = p_query["packtypes"][0]

        c_id = None
        if cardset_id is not None:
            cardset = await db_get("cardsets", object_id=cardset_id, none_okay=False)
            c_id = cardset["id"]

        plural = "s" if num_packs > 1 else ""
        response = ""
        # split() (not split(" ")) so repeated spaces do not produce empty abbrevs
        for abbrev in team_abbrevs.split():
            if abbrev.upper() == "LEAGUE":
                all_teams = await db_get(
                    "teams", params=[("season", current["season"])]
                )
                total_teams = 0
                for league_team in all_teams["teams"]:
                    if league_team["is_ai"]:
                        continue
                    if "gauntlet" in league_team["abbrev"].lower():
                        continue
                    logger.warning(
                        f'Giving {num_packs} pack(s) to team: {league_team["abbrev"]}'
                    )
                    await db_post(
                        "packs",
                        payload=_pack_payload(
                            league_team["id"], this_packtype["id"], c_id, num_packs
                        ),
                    )
                    total_teams += 1
                response = (
                    f"Just gave all {total_teams} teams {num_packs} "
                    f"{pack_type} pack{plural}!"
                )
                continue

            t_query = await db_get(
                "teams", params=[("abbrev", abbrev), ("season", current["season"])]
            )
            if t_query["count"] == 0:
                await interaction.edit_original_response(
                    content=f"Hmm...I'm not sure who **{abbrev.upper()}** is."
                )
                return

            team = t_query["teams"][0]
            await db_post(
                "packs",
                payload=_pack_payload(
                    team["id"], this_packtype["id"], c_id, num_packs
                ),
            )
            response += (
                f"Just gave {num_packs} {pack_type} pack{plural} "
                f'to the {team["sname"]}.'
            )

        logger.info(f"give info: {response}")
        await interaction.edit_original_response(content=response or "All done!")

    @commands.hybrid_command(
        name="post-guide", help="Mod: Post the ratings guide to team sheet"
    )
    @commands.is_owner()
    async def post_guide_command(self, ctx, gm: Member):
        """Set has_guide on every team whose gm_id matches the given member."""
        t_query = await db_get("teams", params=[("gm_id", gm.id)])
        if t_query["count"] == 0:
            await ctx.send(f"Huh...I don't see any teams for {gm.name}")
            # Nothing was updated, so do not follow up with a confirmation gif.
            return

        for gm_team in t_query["teams"]:
            await db_patch(
                "teams", object_id=gm_team["id"], params=[("has_guide", True)]
            )

        await ctx.send(random_conf_gif())

    @app_commands.command(
        name="add-player-card", description="Mod: Manually upload a new PD card"
    )
    @app_commands.checks.has_any_role("Da Commish")
    async def new_manual_card_slash(
        self,
        interaction: discord.Interaction,
        player_type: Literal["batter", "pitcher"],
        player_json: str,
        bc_or_pc_json: str,
        position_list: str,
        ratings_vl_json: str,
        ratings_vr_json: str,
    ):
        """Parse the five JSON arguments and log them.

        Parsing stops at the first argument that is not valid JSON and reports
        which one failed.  Nothing is uploaded; the parsed data is only logged.
        """
        await interaction.response.defer()

        # (raw JSON text, message shown when it fails to parse), in argument order
        inputs = (
            (player_json, "RIP. Failed to process that player."),
            (bc_or_pc_json, f"RIP. Failed to process that {player_type} card."),
            (position_list, "RIP. Failed to process the position data."),
            (ratings_vl_json, "RIP. Failed to process the vL ratings."),
            (ratings_vr_json, "RIP. Failed to process the vR ratings."),
        )
        parsed = []
        for raw, failure_message in inputs:
            try:
                parsed.append(json.loads(raw))
            except json.JSONDecodeError:
                await interaction.edit_original_response(content=failure_message)
                return
        d_player, d_bcpc, d_positions, d_ratings_vl, d_ratings_vr = parsed

        logger.info(
            f"Data gathered:\n\n{d_player}\n\n{d_bcpc}\n\n{d_positions}\n\n{d_ratings_vl}\n\n{d_ratings_vr}"
        )
        await interaction.edit_original_response(
            content="Just spit out the debug info to the log, but processing was successful!"
        )

    @app_commands.command(
        name="reset-image", description="Force a refresh of a player's card images"
    )
    async def reset_image(self, interaction: discord.Interaction, player_id: int):
        """Request an image reset for player_id and show the resulting card embeds.

        When the caller does not pass owner_only, the refresh is also announced
        in the "pd-network-news" channel.
        """
        await interaction.response.defer()
        new_player = await db_post(f"players/{player_id}/image-reset")
        player_embed = await get_card_embeds(get_blank_team_card(new_player))

        if not owner_only(interaction):
            await send_to_channel(
                self.bot,
                "pd-network-news",
                content=f"{interaction.user.display_name} just refreshed {player_desc(new_player)}'s card "
                f'{await get_emoji(interaction, "prayge")}',
            )

        await interaction.edit_original_response(content=None, embeds=player_embed)

    @commands.hybrid_command(name="sync-sheets", help="Mod: Sync AI team sheets")
    @commands.is_owner()
    async def sync_sheets_command(self, ctx):
        """Write each AI team's id and hash into cells B1:B2 of its "Team Data" sheet."""
        t_query = await db_get("teams", params=[("is_ai", True)])
        ai_teams = t_query["teams"]

        response = await ctx.send("Alright, I'm getting started...")

        sheets = get_sheets(self.bot)
        for position, team in enumerate(ai_teams, start=1):
            this_sheet = sheets.open_by_key(team["gsheet"])
            team_data = this_sheet.worksheet_by_title("Team Data")
            team_data.update_values(
                crange="B1:B2", values=[[f'{team["id"]}'], [f"'{team_hash(team)}"]]
            )
            await response.edit(
                content=f'Just finished the {team["sname"]} ({position}/{len(ai_teams)})...'
            )

        await response.edit(content="All done!")

    @commands.command(
        name="update-rarity", help="Mod: Pull current rarities and update players"
    )
    @commands.is_owner()
    async def update_rarity_command(self, ctx):
        """Sync image, rarity, cost and positions of cardset 9 from storage/player_json.csv.

        Each CSV row is matched to a player by bbref id (column 4).  Rows with no
        matching player are collected and reported at the end.
        """
        await ctx.send("Oh boy, here I go sheetsing again!")
        await ctx.send("Running player updates...")

        errors = []

        # Read player-json.csv (newline="" as recommended by the csv module docs)
        with open("storage/player_json.csv", newline="") as csv_file:
            # Per line, search for player by cardset_id and bbref_id
            for row in csv.reader(csv_file):
                if not row:
                    # csv.reader yields [] for blank lines; nothing to look up
                    continue

                p_query = await db_get(
                    "players", params=[("cardset_id", 9), ("bbref_id", row[4])]
                )
                if p_query["count"] == 0:
                    errors.append(f"{row[1]} - {row[4]}")
                    continue

                this_player = p_query["players"][0]
                updates = []

                if "0706" in this_player["image"]:
                    updates.append(
                        (
                            "image",
                            this_player["image"].replace("2023-0706", "2023-0802"),
                        )
                    )

                current_rarity = this_player["rarity"]["name"]
                # "HoF" in the CSV is the same rarity as "Hall of Fame"
                same_hof = current_rarity == "Hall of Fame" and row[9] == "HoF"
                if current_rarity != row[9] and not same_hof:
                    new_r = _RARITY_IDS[row[9]]
                    updates.append(
                        (
                            "cost",
                            _new_cost(
                                this_player, new_r, _RARITY_IDS[current_rarity]
                            ),
                        )
                    )
                    updates.append(("rarity_id", new_r))

                if this_player["pos_1"] != row[11]:
                    updates.append(("pos_1", row[11]))

                for column, field in _SECONDARY_POSITION_COLUMNS:
                    if column >= len(row):
                        # Short row: leave this position untouched
                        continue
                    if row[column]:
                        if this_player[field] != row[column]:
                            updates.append((field, row[column]))
                    elif this_player[field] is not None:
                        # CSV has no position here but the player does: clear it
                        updates.append((field, False))

                # Patch player with new image, rarity_id, cost, and positions 1 - 8
                if updates:
                    logger.info(f"Updating {row[1]} - params: {updates}")
                    await db_patch(
                        "players",
                        object_id=this_player["player_id"],
                        params=updates,
                    )

        await ctx.send("All done!")
        if errors:
            e_string = "\n- ".join(errors)
            logger.error(f"update errors:\n{e_string}")
            await ctx.send(f"I encountered the following errors:\n\n{e_string}")

    @app_commands.command(
        name="reset-cache", description="Reset all cached player cards for gameplay"
    )
    @app_commands.checks.has_any_role("Da Commish")
    async def reset_cache_command(
        self,
        interaction: discord.Interaction,
        player_cache: Optional[bool] = True,
        scouting_cache: Optional[bool] = True,
        team_cache: bool = True,
    ):
        """Call reset_cache with the selected flags and reply with a random gif."""
        await interaction.response.defer()

        # if player_cache:
        #     in_game.data_cache.PLAYER_CACHE = {}
        # if batting_cache:
        #     in_game.data_cache.BATTINGCARD_CACHE = {}
        # if pitching_cache:
        #     in_game.data_cache.PITCHINGCARD_CACHE = {}
        # if team_cache:
        #     in_game.data_cache.TEAM_CACHE = {}
        with Session(engine) as session:
            reset_cache(
                session=session,
                players=player_cache,
                scouting=scouting_cache,
                team=team_cache,
            )

        gif_query = random.choice(["all done", "yes sir", "complete", "finished"])
        await interaction.edit_original_response(content=random_gif(gif_query))

    @commands.command(name="tc", help="Mod: Test command")
    @commands.is_owner()
    async def test_choices_command(self, ctx):
        """POST to paperdex/wipe-ai and report progress in the channel."""
        await ctx.send("Wiping AI dexes...")
        await db_post("paperdex/wipe-ai", timeout=15)
        await ctx.send("All done!")

    @commands.command(name="get-bc", help="Mod: Test batting card cache")
    @commands.is_owner()
    async def get_battingcard_command(self, ctx, player_id: int):
        """Fetch the batting card for player_id via the data cache and dump it."""
        await ctx.channel.send(f"Pulling the batting card for player ID: {player_id}")
        async with ctx.channel.typing():
            this_data = await in_game.data_cache.get_pd_battingcard(player_id)
            await ctx.channel.send(f"Dumping data here:\n\n{this_data}")

    @commands.command(name="get-pc", help="Mod: Test pitching card cache")
    @commands.is_owner()
    async def get_pitchngcard_command(self, ctx, player_id: int):
        """Fetch the pitching card for player_id via the data cache and dump it."""
        await ctx.channel.send(f"Pulling the pitching card for player ID: {player_id}")
        async with ctx.channel.typing():
            this_data = await in_game.data_cache.get_pd_pitchingcard(player_id)
            await ctx.channel.send(f"Dumping data here:\n\n{this_data}")

    @commands.command(name="test-fatigue", help="Mod: Test the fatigue AI")
    @commands.is_owner()
    async def test_fatigue_command(self, ctx, play_id: int):
        """Report whether the AI considers the pitcher of play_id fatigued."""
        raw_play = db_calls_gameplay.Play.get_by_id(play_id)
        this_play = db_calls_gameplay.convert_stratplay(raw_play)
        verdict = "IS" if in_game.ai_manager.is_pitcher_fatigued(this_play) else "IS NOT"
        await ctx.channel.send(
            f"Checking fatigue for Play #{play_id} / " f"Pitcher {verdict} fatigued"
        )

    @commands.command(
        name="test-exhibition", help="Mod: Test the lineup gen for exhibition games"
    )
    @commands.is_owner()
    async def test_exhibition_command(
        self,
        ctx,
        which: Literal["sp", "rp", "lineup"],
        team_id: int,
        cardset_ids: str,
        backup_cardset_ids: str,
    ):
        """Exercise exhibition roster generation; only which="sp" does anything."""
        if which != "sp":
            return

        await ctx.send(f"Fetching a SP for Team ID {team_id}...")
        this_pitcher = await ai_manager.get_starting_pitcher(
            {"id": team_id}, game_id=69, is_home=True, league_name="exhibition"
        )
        await ctx.send(f"Selected Pitcher:\n{this_pitcher}")

        # elif which == 'rp':
        #     await ctx.send(f'Fetching an RP for Team ID {team_id}...')

    @commands.command(
        name="test-dropdown", help="Mod: Test the custom dropdown objects"
    )
    @commands.is_owner()
    async def test_dropdown_command(self, ctx):
        """Post a sample multi-select dropdown that echoes the chosen values."""
        seasons = (
            ("2024", "17"),
            ("2018", "13"),
            ("2016", "11"),
            ("2008", "12"),
            ("2007", "07"),
            ("2006", "06"),
            ("2005", "05"),
            ("2004", "04"),
            ("2003", "03"),
            ("2002", "02"),
        )
        options = [
            discord.SelectOption(label=f"{year} Season", value=value)
            for year, value in seasons
        ]

        async def my_callback(interaction: discord.Interaction, values):
            verb = "s are" if len(values) > 1 else " is"
            await interaction.response.send_message(
                f'Your selection{verb}: {", ".join(values)}'
            )

        my_dropdown = Dropdown(
            option_list=options,
            placeholder="Select a cardset",
            callback=my_callback,
            max_values=8,
        )
        await ctx.send("Here is your dropdown:", view=DropdownView([my_dropdown]))

    @commands.command(name="db-check", help="Mod: Check cached db volume")
    @commands.is_owner()
    async def db_check_command(self, ctx):
        """Report how many rows of each gameplay model the local db holds."""
        message = await ctx.send(
            "I'll take a quick peek at the db and see what you've got..."
        )

        models = (
            ("Games", Game),
            ("Cardsets", Cardset),
            ("Lineups", Lineup),
            ("Teams", Team),
            ("Players", Player),
            ("Plays", Play),
        )
        lines = ["## Database Counts"]
        with Session(engine) as session:
            for label, model in models:
                ids = session.exec(select(model.id)).all()
                lines.append(f"{label}: {len(ids)}")

        await message.edit(content="\n".join(lines))

    @commands.command(name="test_evo", help="Mod: Test pokemon evolution")
    @commands.is_owner()
    async def test_evolution(self, ctx, team_abbrev: str):
        """Run evolve_pokemon for the team with the given abbreviation."""
        with Session(engine) as session:
            this_team = await get_team_or_none(session, team_abbrev=team_abbrev)
            if this_team is None:
                await ctx.send(
                    f"Hmm...I'm not sure who **{team_abbrev.upper()}** is."
                )
                return
            # Stay inside the session so the team object remains attached while
            # evolve_pokemon reads from it.
            await evolve_pokemon(this_team, ctx.channel, responders=[ctx.author])

    @app_commands.command(
        name="resend_scout",
        description="Admin: Resend a scout opportunity to pack-openings",
    )
    async def resend_scout(
        self, interaction: discord.Interaction, scout_opp_id: int, minutes: int = 60
    ):
        """Re-post a scout opportunity to #pack-openings with a fresh expiry.

        The embed and view expire `minutes` minutes from now.  Every failure
        (missing opportunity, cards, opener team or channel) is reported to the
        caller ephemerally and aborts the command.
        """
        if not owner_only(interaction):
            await interaction.response.send_message(random_no_gif())
            return

        await interaction.response.defer(ephemeral=True)

        # Imported here rather than at module level, as in the original code.
        import datetime
        from helpers.scouting import build_scout_embed
        from discord_ui.scout_view import ScoutView

        async def fail(text):
            await interaction.followup.send(text, ephemeral=True)

        # Fetch the scout opportunity
        scout_opp = await db_get(f"scout_opportunities/{scout_opp_id}")
        if not scout_opp:
            await fail(f"Scout opportunity {scout_opp_id} not found.")
            return

        # Fetch cards (card_ids may arrive JSON-encoded as a string)
        card_ids = scout_opp.get("card_ids", [])
        if isinstance(card_ids, str):
            card_ids = json.loads(card_ids)

        cards = []
        for card_id in card_ids:
            card = await db_get(f"cards/{card_id}")
            if card:
                cards.append(card)
        if not cards:
            await fail("Could not fetch any cards.")
            return

        # Get opener team (API returns it as a nested object)
        opener_team = scout_opp.get("opener_team")
        if not opener_team:
            await fail("Could not fetch opener team.")
            return

        # Find pack-openings channel
        channel = discord.utils.get(
            interaction.guild.text_channels, name="pack-openings"
        )
        if not channel:
            await fail("pack-openings channel not found.")
            return

        # Build with custom timeout
        expires_dt = datetime.datetime.now() + datetime.timedelta(minutes=minutes)
        expires_unix = int(expires_dt.timestamp())

        embed, card_lines = build_scout_embed(
            opener_team, cards, expires_unix=expires_unix
        )

        view = ScoutView(
            scout_opp_id=scout_opp_id,
            cards=cards,
            opener_team=opener_team,
            opener_user_id=0,
            bot=self.bot,
            expires_unix=expires_unix,
        )
        view.timeout = float(minutes * 60)
        view.card_lines = card_lines

        view.message = await channel.send(embed=embed, view=view)

        await interaction.followup.send(
            f"Scout opportunity #{scout_opp_id} posted to {channel.mention} with {minutes} min window.",
            ephemeral=True,
        )


async def setup(bot):
    """Extension entry point: register the Admins cog on the bot."""
    await bot.add_cog(Admins(bot))