# Economy Team Setup Module # Contains team creation and Google Sheets integration from the original economy.py import logging from discord.ext import commands from discord import app_commands import discord import datetime import os from typing import Optional # Import specific utilities needed by this module import pygsheets from api_calls import db_get, db_post, db_patch, db_delete, get_team_by_abbrev from help_text import SHEET_SHARE_STEPS, HELP_SHEET_SCRIPTS from helpers.constants import PD_PLAYERS, ALL_MLB_TEAMS from helpers import ( get_team_by_owner, share_channel, get_role, get_cal_user, get_or_create_role, display_cards, give_packs, get_all_pos, get_sheets, refresh_sheet, post_ratings_guide, team_summary_embed, get_roster_sheet, Question, Confirm, ButtonOptions, legal_channel, get_channel, create_channel, get_context_user ) from api_calls import team_hash from helpers.discord_utils import get_team_embed, send_to_channel logger = logging.getLogger('discord_app') class TeamSetup(commands.Cog): """Team creation and Google Sheets integration functionality for Paper Dynasty.""" def __init__(self, bot): self.bot = bot @app_commands.command(name='newteam', description='Get your fresh team for a new season') @app_commands.checks.has_any_role(PD_PLAYERS) @app_commands.describe( gm_name='The fictional name of your team\'s GM', team_abbrev='2, 3, or 4 character abbreviation (e.g. WV, ATL, MAD)', team_full_name='City/location and name (e.g. Baltimore Orioles)', team_short_name='Name of team (e.g. Yankees)', mlb_anchor_team='2 or 3 character abbreviation of your anchor MLB team (e.g. NYM, MKE)', team_logo_url='[Optional] URL ending in .png or .jpg for your team logo', color='[Optional] Hex color code to highlight your team' ) async def new_team_slash( self, interaction: discord.Interaction, gm_name: str, team_abbrev: str, team_full_name: str, team_short_name: str, mlb_anchor_team: str, team_logo_url: str = None, color: str = None): owner_team = await get_team_by_owner(interaction.user.id) current = await db_get('current') # Check for existing team if owner_team and not os.environ.get('TESTING'): await interaction.response.send_message( f'Whoa there, bucko. I already have you down as GM of the {owner_team["sname"]}.' ) return # Check for duplicate team data dupes = await db_get('teams', params=[('abbrev', team_abbrev)]) if dupes['count']: await interaction.response.send_message( f'Yikes! {team_abbrev.upper()} is a popular abbreviation - it\'s already in use by the ' f'{dupes["teams"][0]["sname"]}. No worries, though, you can run the `/newteam` command again to get ' f'started!' ) return # Check for duplicate team data dupes = await db_get('teams', params=[('lname', team_full_name)]) if dupes['count']: await interaction.response.send_message( f'Yikes! {team_full_name.title()} is a popular name - it\'s already in use by ' f'{dupes["teams"][0]["abbrev"]}. No worries, though, you can run the `/newteam` command again to get ' f'started!' ) return # Get personal bot channel hello_channel = discord.utils.get( interaction.guild.text_channels, name=f'hello-{interaction.user.name.lower()}' ) if hello_channel: op_ch = hello_channel else: op_ch = await create_channel( interaction, channel_name=f'hello-{interaction.user.name}', category_name='Paper Dynasty Team', everyone_read=False, read_send_members=[interaction.user] ) await share_channel(op_ch, interaction.guild.me) await share_channel(op_ch, interaction.user) try: poke_role = get_role(interaction, 'Pokétwo') await share_channel(op_ch, poke_role, read_only=True) except Exception as e: logger.error(f'unable to share sheet with Poketwo') await interaction.response.send_message( f'Let\'s head down to your private channel: {op_ch.mention}', ephemeral=True ) await op_ch.send(f'Hey there, {interaction.user.mention}! I am Paper Domo - welcome to season ' f'{current["season"]} of Paper Dynasty! We\'ve got a lot of special updates in store for this ' f'season including live cards, throwback cards, and special events.') # Confirm user is happy with branding embed = get_team_embed( f'Branding Check', { 'logo': team_logo_url if team_logo_url else None, 'color': color if color else 'a6ce39', 'season': 4 } ) embed.add_field(name='GM Name', value=gm_name, inline=False) embed.add_field(name='Full Team Name', value=team_full_name) embed.add_field(name='Short Team Name', value=team_short_name) embed.add_field(name='Team Abbrev', value=team_abbrev.upper()) view = Confirm(responders=[interaction.user]) question = await op_ch.send('Are you happy with this branding? Don\'t worry - you can update it later!', embed=embed, view=view) await view.wait() if not view.value: await question.edit( content='~~Are you happy with this branding?~~\n\nI gotta go, but when you\'re ready to start again ' 'run the `/newteam` command again and we can get rolling! Hint: you can copy and paste the ' 'command from last time and make edits.', view=None ) return await question.edit( content='Looking good, champ in the making! Let\'s get you your starter team!', view=None ) team_choice = None if mlb_anchor_team.title() in ALL_MLB_TEAMS.keys(): team_choice = mlb_anchor_team.title() else: for x in ALL_MLB_TEAMS: if mlb_anchor_team.upper() in ALL_MLB_TEAMS[x] or mlb_anchor_team.title() in ALL_MLB_TEAMS[x]: team_choice = x break team_string = mlb_anchor_team logger.debug(f'team_string: {team_string} / team_choice: {team_choice}') if not team_choice: # Get MLB anchor team while True: prompt = f'I don\'t recognize **{team_string}**. I try to recognize abbreviations (BAL), ' \ f'short names (Orioles), and long names ("Baltimore Orioles").\n\nWhat MLB club would you ' \ f'like to use as your anchor team?' this_q = Question(self.bot, op_ch, prompt, 'text', 120) team_string = await this_q.ask([interaction.user]) if not team_string: await op_ch.send( f'Tell you hwat. You think on it and come back I gotta go, but when you\'re ready to start again ' 'run the `/newteam` command again and we can get rolling! Hint: you can copy and paste the ' 'command from last time and make edits.' ) return if team_string.title() in ALL_MLB_TEAMS.keys(): team_choice = team_string.title() break else: match = False for x in ALL_MLB_TEAMS: if team_string.upper() in ALL_MLB_TEAMS[x] or team_string.title() in ALL_MLB_TEAMS[x]: team_choice = x match = True break if not match: await op_ch.send(f'Got it!') team = await db_post('teams', payload={ 'abbrev': team_abbrev.upper(), 'sname': team_short_name, 'lname': team_full_name, 'gmid': interaction.user.id, 'gmname': gm_name, 'gsheet': 'None', 'season': current['season'], 'wallet': 100, 'color': color if color else 'a6ce39', 'logo': team_logo_url if team_logo_url else None }) if not team: await op_ch.send(f'Frick. {get_cal_user(interaction).mention}, can you help? I can\'t find this team.') return t_role = await get_or_create_role(interaction, f'{team_abbrev} - {team_full_name}') await interaction.user.add_roles(t_role) anchor_players = [] anchor_all_stars = await db_get( 'players/random', params=[ ('min_rarity', 3), ('max_rarity', 3), ('franchise', team_choice), ('pos_exclude', 'RP'), ('limit', 1), ('in_packs', True) ] ) anchor_starters = await db_get( 'players/random', params=[ ('min_rarity', 2), ('max_rarity', 2), ('franchise', team_choice), ('pos_exclude', 'RP'), ('limit', 2), ('in_packs', True) ] ) if not anchor_all_stars: await op_ch.send(f'I am so sorry, but the {team_choice} do not have an All-Star to ' f'provide as your anchor player. Let\'s start this process over - will you please ' f'run the `/newteam` command again with a new MLB club?\nHint: you can copy and paste the ' 'command from last time and make edits.') await db_delete('teams', object_id=team['id']) return if not anchor_starters or anchor_starters['count'] <= 1: await op_ch.send(f'I am so sorry, but the {team_choice} do not have two Starters to ' f'provide as your anchor players. Let\'s start this process over - will you please ' f'run the `/newteam` command again with a new MLB club?\nHint: you can copy and paste the ' 'command from last time and make edits.') await db_delete('teams', object_id=team['id']) return anchor_players.append(anchor_all_stars['players'][0]) anchor_players.append(anchor_starters['players'][0]) anchor_players.append(anchor_starters['players'][1]) this_pack = await db_post('packs/one', payload={'team_id': team['id'], 'pack_type_id': 2, 'open_time': datetime.datetime.timestamp(datetime.datetime.now())*1000}) roster_counts = { 'SP': 0, 'RP': 0, 'CP': 0, 'C': 0, '1B': 0, '2B': 0, '3B': 0, 'SS': 0, 'LF': 0, 'CF': 0, 'RF': 0, 'DH': 0, 'All-Star': 0, 'Starter': 0, 'Reserve': 0, 'Replacement': 0, } def update_roster_counts(players: list): for pl in players: roster_counts[pl['rarity']['name']] += 1 for x in get_all_pos(pl): roster_counts[x] += 1 logger.warning(f'Roster counts for {team["sname"]}: {roster_counts}') # Add anchor position coverage update_roster_counts(anchor_players) await db_post('cards', payload={'cards': [ {'player_id': x['player_id'], 'team_id': team['id'], 'pack_id': this_pack['id']} for x in anchor_players] }, timeout=10) # Get 10 pitchers to seed team five_sps = await db_get('players/random', params=[('pos_include', 'SP'), ('max_rarity', 1), ('limit', 5)]) five_rps = await db_get('players/random', params=[('pos_include', 'RP'), ('max_rarity', 1), ('limit', 5)]) team_sp = [x for x in five_sps['players']] team_rp = [x for x in five_rps['players']] update_roster_counts([*team_sp, *team_rp]) await db_post('cards', payload={'cards': [ {'player_id': x['player_id'], 'team_id': team['id'], 'pack_id': this_pack['id']} for x in [*team_sp, *team_rp]] }, timeout=10) # TODO: track reserve vs replacement and if rep < res, get rep, else get res # Collect infielders team_infielders = [] for pos in ['C', '1B', '2B', '3B', 'SS']: max_rar = 1 if roster_counts['Replacement'] < roster_counts['Reserve']: max_rar = 0 r_draw = await db_get( 'players/random', params=[('pos_include', pos), ('max_rarity', max_rar), ('limit', 2)], none_okay=False ) team_infielders.extend(r_draw['players']) update_roster_counts(team_infielders) await db_post('cards', payload={'cards': [ {'player_id': x['player_id'], 'team_id': team['id'], 'pack_id': this_pack['id']} for x in team_infielders] }, timeout=10) # Collect outfielders team_outfielders = [] for pos in ['LF', 'CF', 'RF']: max_rar = 1 if roster_counts['Replacement'] < roster_counts['Reserve']: max_rar = 0 r_draw = await db_get( 'players/random', params=[('pos_include', pos), ('max_rarity', max_rar), ('limit', 2)], none_okay=False ) team_outfielders.extend(r_draw['players']) update_roster_counts(team_outfielders) await db_post('cards', payload={'cards': [ {'player_id': x['player_id'], 'team_id': team['id'], 'pack_id': this_pack['id']} for x in team_outfielders] }, timeout=10) async with op_ch.typing(): done_anc = await display_cards( [{'player': x, 'team': team} for x in anchor_players], team, op_ch, interaction.user, self.bot, cust_message=f'Let\'s take a look at your three {team_choice} anchor players.\n' f'Press `Close Pack` to continue.', add_roster=False ) error_text = f'Yikes - I can\'t display the rest of your team. {get_cal_user(interaction).mention} plz halp' if not done_anc: await op_ch.send(error_text) async with op_ch.typing(): done_sp = await display_cards( [{'player': x, 'team': team} for x in team_sp], team, op_ch, interaction.user, self.bot, cust_message=f'Here are your starting pitchers.\n' f'Press `Close Pack` to continue.', add_roster=False ) if not done_sp: await op_ch.send(error_text) async with op_ch.typing(): done_rp = await display_cards( [{'player': x, 'team': team} for x in team_rp], team, op_ch, interaction.user, self.bot, cust_message=f'And now for your bullpen.\n' f'Press `Close Pack` to continue.', add_roster=False ) if not done_rp: await op_ch.send(error_text) async with op_ch.typing(): done_inf = await display_cards( [{'player': x, 'team': team} for x in team_infielders], team, op_ch, interaction.user, self.bot, cust_message=f'Next let\'s take a look at your infielders.\n' f'Press `Close Pack` to continue.', add_roster=False ) if not done_inf: await op_ch.send(error_text) async with op_ch.typing(): done_out = await display_cards( [{'player': x, 'team': team} for x in team_outfielders], team, op_ch, interaction.user, self.bot, cust_message=f'Now let\'s take a look at your outfielders.\n' f'Press `Close Pack` to continue.', add_roster=False ) if not done_out: await op_ch.send(error_text) await give_packs(team, 1) await op_ch.send( f'To get you started, I\'ve spotted you 100₼ and a pack of cards. You can rip that with the ' f'`/open` command once your google sheet is set up!' ) await op_ch.send( f'{t_role.mention}\n\n' f'There\'s your roster! We have one more step and you will be ready to play.\n\n{SHEET_SHARE_STEPS}\n\n' f'{get_roster_sheet({"gsheet": current["gsheet_template"]})}' ) new_team_embed = await team_summary_embed(team, interaction, include_roster=False) await send_to_channel( self.bot, "pd-network-news", content='A new challenger approaches...', embed=new_team_embed ) @commands.hybrid_command(name='newsheet', help='Link a new team sheet with your team') @commands.has_any_role(PD_PLAYERS) async def share_sheet_command( self, ctx, google_sheet_url: str, team_abbrev: Optional[str], copy_rosters: Optional[bool] = True): owner_team = await get_team_by_owner(get_context_user(ctx).id) if not owner_team: await ctx.send(f'I don\'t see a team for you, yet. You can sign up with the `/newteam` command!') return team = owner_team if team_abbrev and team_abbrev != owner_team['abbrev']: if get_context_user(ctx).id != 258104532423147520: await ctx.send(f'You can only update the team sheet for your own team, you goober.') return else: team = await get_team_by_abbrev(team_abbrev) current = await db_get('current') if current['gsheet_template'] in google_sheet_url: await ctx.send(f'Ope, looks like that is the template sheet. Would you please make a copy and then share?') return gauntlet_team = await get_team_by_abbrev(f'Gauntlet-{owner_team["abbrev"]}') if gauntlet_team: view = ButtonOptions([ctx.author], timeout=30, labels=['Main Team', 'Gauntlet Team', None, None, None]) question = await ctx.send(f'Is this sheet for your main PD team or your active Gauntlet team?', view=view) await view.wait() if not view.value: await question.edit( content=f'Okay you keep thinking on it and get back to me when you\'re ready.', view=None ) return elif view.value == 'Gauntlet Team': await question.delete() team = gauntlet_team sheets = get_sheets(self.bot) response = await ctx.send(f'I\'ll go grab that sheet...') try: new_sheet = sheets.open_by_url(google_sheet_url) except Exception as e: logger.error(f'Error accessing {team["abbrev"]} sheet: {e}') current = await db_get('current') await ctx.send(f'I wasn\'t able to access that sheet. Did you remember to share it with my PD email?' f'\n\nHere\'s a quick refresher:\n{SHEET_SHARE_STEPS}\n\n' f'{get_roster_sheet({"gsheet": current["gsheet_template"]})}') return team_data = new_sheet.worksheet_by_title('Team Data') if not gauntlet_team or owner_team != gauntlet_team: team_data.update_values( crange='B1:B2', values=[[f'{team["id"]}'], [f'{team_hash(team)}']] ) if copy_rosters and team['gsheet'].lower() != 'none': old_sheet = sheets.open_by_key(team['gsheet']) r_sheet = old_sheet.worksheet_by_title(f'My Rosters') roster_ids = r_sheet.range('B3:B80') lineups_data = r_sheet.range('H4:M26') new_r_data, new_l_data = [], [] for row in roster_ids: if row[0].value != '': new_r_data.append([int(row[0].value)]) else: new_r_data.append([None]) logger.debug(f'new_r_data: {new_r_data}') for row in lineups_data: logger.debug(f'row: {row}') new_l_data.append([ row[0].value if row[0].value != '' else None, int(row[1].value) if row[1].value != '' else None, row[2].value if row[2].value != '' else None, int(row[3].value) if row[3].value != '' else None, row[4].value if row[4].value != '' else None, int(row[5].value) if row[5].value != '' else None ]) logger.debug(f'new_l_data: {new_l_data}') new_r_sheet = new_sheet.worksheet_by_title(f'My Rosters') new_r_sheet.update_values( crange='B3:B80', values=new_r_data ) new_r_sheet.update_values( crange='H4:M26', values=new_l_data ) if team['has_guide']: post_ratings_guide(team, self.bot, this_sheet=new_sheet) team = await db_patch('teams', object_id=team['id'], params=[('gsheet', new_sheet.id)]) await refresh_sheet(team, self.bot, sheets) conf_message = f'Alright, your sheet is linked to your team - good luck' if owner_team == team: conf_message += ' this season!' else: conf_message += ' on your run!' conf_message += f'\n\n{HELP_SHEET_SCRIPTS}' await response.edit(content=f'{conf_message}') async def setup(bot): """Setup function for the TeamSetup cog.""" await bot.add_cog(TeamSetup(bot))