Created get_context_user() helper function to safely extract the user from either Context or Interaction objects. This prevents AttributeError issues when hybrid commands are invoked as slash commands. Hybrid commands receive commands.Context (with .author) when invoked with prefix commands, but discord.Interaction (with .user) when invoked as slash commands. The helper function handles both cases transparently. Updated all affected hybrid commands: - /branding-pd (cogs/players.py, cogs/players_new/team_management.py) - /pullroster (cogs/players.py, cogs/players_new/team_management.py) - /newsheet (cogs/economy_new/team_setup.py) - /lastpack (cogs/economy_new/packs.py) This follows the same pattern as the owner_only() fix and provides a consistent, maintainable solution for all hybrid commands. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
352 lines
15 KiB
Python
352 lines
15 KiB
Python
# Team Management Module
|
|
# Contains team information and management functionality from the original players.py
|
|
|
|
from discord.ext import commands
|
|
from discord import app_commands
|
|
import discord
|
|
from typing import Optional
|
|
|
|
# Import specific utilities needed by this module
|
|
import logging
|
|
import pygsheets
|
|
import requests
|
|
from api_calls import db_get, db_post, db_patch, get_team_by_abbrev
|
|
from helpers import (
|
|
PD_PLAYERS_ROLE_NAME, IMAGES, get_channel, get_team_embed,
|
|
get_blank_team_card, get_team_by_owner, get_rosters,
|
|
legal_channel, team_summary_embed, SelectView,
|
|
is_ephemeral_channel, is_restricted_channel, can_send_message,
|
|
get_context_user
|
|
)
|
|
import helpers
|
|
from helpers.utils import get_roster_sheet
|
|
from helpers.constants import ALL_MLB_TEAMS
|
|
|
|
logger = logging.getLogger('discord_app')
|
|
|
|
|
|
class TeamManagement(commands.Cog):
|
|
"""Team information and management functionality for Paper Dynasty."""
|
|
|
|
def __init__(self, bot):
|
|
self.bot = bot
|
|
|
|
@app_commands.command(name='team', description='Show team overview and rosters')
|
|
@app_commands.checks.has_any_role(PD_PLAYERS_ROLE_NAME)
|
|
async def team_slash_command(self, interaction: discord.Interaction, team_abbrev: Optional[str] = None):
|
|
"""Display team overview and rosters."""
|
|
|
|
await interaction.response.defer()
|
|
|
|
if team_abbrev:
|
|
team = await get_team_by_abbrev(team_abbrev)
|
|
if not team:
|
|
await interaction.followup.send(f'Could not find team with abbreviation: {team_abbrev}')
|
|
return
|
|
else:
|
|
team = await get_team_by_owner(interaction.user.id)
|
|
if not team:
|
|
await interaction.followup.send('You don\'t have a team yet! Use `/newteam` to create one.')
|
|
return
|
|
|
|
# Create team summary embed
|
|
embed = await team_summary_embed(team, interaction, include_roster=True)
|
|
|
|
# Add roster information
|
|
try:
|
|
rosters = get_rosters(team, self.bot)
|
|
if rosters:
|
|
roster_text = ""
|
|
for i, roster in enumerate(rosters):
|
|
if roster and roster.get('name'):
|
|
card_count = len(roster.get('cards', []))
|
|
roster_text += f"**{roster.get('name', 'Unknown')}**: {card_count} players\n"
|
|
|
|
if roster_text:
|
|
embed.add_field(name="Saved Rosters", value=roster_text, inline=False)
|
|
except Exception as e:
|
|
logger.warning(f"Could not retrieve rosters for team {team.get('abbrev', 'Unknown')}: {e}")
|
|
|
|
# Add Google Sheet link if available
|
|
if team.get('gsheet') and team.get('gsheet') != 'None':
|
|
sheet_link = get_roster_sheet(team, allow_embed=True)
|
|
if sheet_link:
|
|
embed.add_field(name="Team Sheet", value=sheet_link, inline=False)
|
|
|
|
await interaction.followup.send(embed=embed)
|
|
|
|
@app_commands.command(name='update-player', description='Update a player\'s card to a specific MLB team')
|
|
@app_commands.checks.has_any_role(PD_PLAYERS_ROLE_NAME)
|
|
async def update_player_team(self, interaction: discord.Interaction, player_id: int):
|
|
"""Update player team functionality."""
|
|
|
|
owner_team = await get_team_by_owner(interaction.user.id)
|
|
if not owner_team:
|
|
await interaction.response.send_message(
|
|
'Thank you for offering to help - if you sign up for a team with /newteam I can let you post updates.',
|
|
ephemeral=True
|
|
)
|
|
return
|
|
|
|
if is_restricted_channel(interaction.channel):
|
|
await interaction.response.send_message(
|
|
f'Slide on down to #pd-bot-hole to run updates - thanks!',
|
|
ephemeral=True
|
|
)
|
|
return
|
|
|
|
await interaction.response.defer()
|
|
|
|
this_player = await db_get('players', object_id=player_id)
|
|
if not this_player:
|
|
await interaction.edit_original_response(content=f'No clue who that is.')
|
|
return
|
|
|
|
from helpers import get_card_embeds
|
|
embed = await get_card_embeds(get_blank_team_card(this_player))
|
|
await interaction.edit_original_response(content=None, embed=embed[0])
|
|
|
|
view = helpers.Confirm(responders=[interaction.user])
|
|
if can_send_message(interaction.channel):
|
|
question = await interaction.channel.send(
|
|
content='Is this the player you want to update?',
|
|
view=view
|
|
)
|
|
else:
|
|
question = await interaction.followup.send(
|
|
content='Is this the player you want to update?',
|
|
view=view
|
|
)
|
|
await view.wait()
|
|
|
|
if not view.value:
|
|
await question.edit(content='Okay, we\'ll leave it be.', view=None)
|
|
return
|
|
else:
|
|
await question.delete()
|
|
|
|
view = SelectView([
|
|
helpers.SelectUpdatePlayerTeam('AL', this_player, owner_team, self.bot),
|
|
helpers.SelectUpdatePlayerTeam('NL', this_player, owner_team, self.bot)
|
|
])
|
|
if can_send_message(interaction.channel):
|
|
await interaction.channel.send(content=None, view=view)
|
|
else:
|
|
await interaction.followup.send(content=None, view=view)
|
|
|
|
@commands.hybrid_command(name='branding-pd', help='Update your team branding')
|
|
@commands.has_any_role(PD_PLAYERS_ROLE_NAME)
|
|
@commands.check(legal_channel)
|
|
async def branding_command(self, ctx, team_logo_url: str = None, color: str = None,
|
|
short_name: str = None, full_name: str = None, *, branding_updates: Optional[str] = None):
|
|
"""Update team branding (logo, color, etc.)."""
|
|
|
|
team = await get_team_by_owner(get_context_user(ctx).id)
|
|
if not team:
|
|
await ctx.send('You don\'t have a team yet! Use `/newteam` to create one.')
|
|
return
|
|
|
|
# Handle both old interface (individual parameters) and new interface (parsed string)
|
|
params = []
|
|
updates = {}
|
|
|
|
# Check if using old interface (individual parameters)
|
|
if any([team_logo_url, color, short_name, full_name]):
|
|
if team_logo_url:
|
|
if team_logo_url.startswith('http') and (team_logo_url.endswith('.png') or
|
|
team_logo_url.endswith('.jpg') or
|
|
team_logo_url.endswith('.jpeg')):
|
|
params.append(('logo', team_logo_url))
|
|
updates['logo'] = team_logo_url
|
|
else:
|
|
await ctx.send('Logo must be a valid URL ending in .png, .jpg, or .jpeg')
|
|
return
|
|
|
|
if color:
|
|
clean_color = color.replace('#', '')
|
|
if len(clean_color) == 6 and all(c in '0123456789abcdefABCDEF' for c in clean_color):
|
|
params.append(('color', clean_color))
|
|
updates['color'] = clean_color
|
|
else:
|
|
await ctx.send('Invalid color format. Use hex format like #FF0000 or FF0000')
|
|
return
|
|
|
|
if short_name:
|
|
params.append(('sname', short_name))
|
|
updates['short_name'] = short_name
|
|
|
|
if full_name:
|
|
params.append(('lname', full_name))
|
|
updates['full_name'] = full_name
|
|
|
|
elif not branding_updates:
|
|
# Show current branding
|
|
embed = get_team_embed("Current Team Branding", team)
|
|
embed.add_field(name="Team Name", value=f"{team.get('lname', 'Unknown')} ({team.get('abbrev', 'UNK')})", inline=False)
|
|
embed.add_field(name="Short Name", value=team.get('sname', 'Unknown'), inline=True)
|
|
embed.add_field(name="GM Name", value=team.get('gmname', 'Unknown'), inline=True)
|
|
embed.add_field(name="Color", value=f"#{team.get('color', 'a6ce39')}", inline=True)
|
|
|
|
if team.get('logo'):
|
|
embed.add_field(name="Logo URL", value=team.get('logo', 'None'), inline=False)
|
|
|
|
embed.add_field(
|
|
name="How to Update",
|
|
value="Individual params: `/branding-pd team_logo_url color short_name full_name`\n"
|
|
"Or parsed: `/branding-pd branding_updates:color:#FF0000 logo:url`",
|
|
inline=False
|
|
)
|
|
|
|
await ctx.send(embed=embed)
|
|
return
|
|
else:
|
|
# Parse branding updates (new interface)
|
|
for update in branding_updates.split():
|
|
if ':' in update:
|
|
key, value = update.split(':', 1)
|
|
if key.lower() == 'color':
|
|
# Validate hex color
|
|
if value.startswith('#'):
|
|
value = value[1:]
|
|
if len(value) == 6 and all(c in '0123456789abcdefABCDEF' for c in value):
|
|
updates['color'] = value
|
|
else:
|
|
await ctx.send('Invalid color format. Use hex format like #FF0000 or FF0000')
|
|
return
|
|
elif key.lower() == 'logo':
|
|
if value.startswith('http') and (value.endswith('.png') or value.endswith('.jpg') or value.endswith('.jpeg')):
|
|
updates['logo'] = value
|
|
else:
|
|
await ctx.send('Logo must be a valid URL ending in .png, .jpg, or .jpeg')
|
|
return
|
|
|
|
if not updates:
|
|
await ctx.send('No valid updates found. Use `color:#hexcode` or `logo:url` format.')
|
|
return
|
|
|
|
# Apply updates
|
|
update_params = [(key, value) for key, value in updates.items()]
|
|
updated_team = await db_patch('teams', object_id=team.get('id'), params=update_params)
|
|
|
|
if updated_team:
|
|
embed = get_team_embed("Updated Team Branding", updated_team)
|
|
embed.add_field(name="Changes Applied", value="\n".join([f"**{k.title()}**: {v}" for k, v in updates.items()]), inline=False)
|
|
await ctx.send(embed=embed)
|
|
else:
|
|
await ctx.send('Failed to update team branding. Please try again.')
|
|
|
|
@commands.hybrid_command(name='pullroster', help='Pull saved rosters from your team Sheet',
|
|
aliases=['roster', 'rosters', 'pullrosters'])
|
|
@app_commands.describe(
|
|
specific_roster_num='Enter 1, 2, or 3 to only pull one roster; leave blank to pull all 3',
|
|
roster_name='The name of the roster to pull (alternative to roster number)'
|
|
)
|
|
@commands.has_any_role(PD_PLAYERS_ROLE_NAME)
|
|
@commands.check(legal_channel)
|
|
async def pull_roster_command(self, ctx, specific_roster_num: Optional[int] = None,
|
|
roster_name: Optional[str] = None):
|
|
"""Pull and display saved rosters from Google Sheets."""
|
|
|
|
team = await get_team_by_owner(get_context_user(ctx).id)
|
|
if not team:
|
|
await ctx.send('You don\'t have a team yet! Use `/newteam` to create one.')
|
|
return
|
|
|
|
if not team.get('gsheet') or team.get('gsheet') == 'None':
|
|
await ctx.send('You don\'t have a Google Sheet linked yet! Use `/newsheet` to link one.')
|
|
return
|
|
|
|
# Get rosters from sheet
|
|
try:
|
|
rosters = get_rosters(team, self.bot)
|
|
except Exception as e:
|
|
logger.error(f"Error getting rosters for {team.get('abbrev', 'Unknown')}: {e}")
|
|
await ctx.send('Could not retrieve rosters from your sheet.')
|
|
return
|
|
|
|
if not rosters or not any(rosters):
|
|
await ctx.send('Could not retrieve rosters from your sheet.')
|
|
return
|
|
|
|
# Original roster sync functionality (matches players_old.py business logic)
|
|
logger.debug(f'roster_data: {rosters}')
|
|
|
|
# Post roster team/card ids and throw error if db says no
|
|
synced_count = 0
|
|
for index, roster in enumerate(rosters):
|
|
logger.debug(f'index: {index} / roster: {roster}')
|
|
if (not specific_roster_num or specific_roster_num == index + 1) and roster:
|
|
if roster_name and roster.get('name', '').lower() != roster_name.lower():
|
|
continue
|
|
|
|
try:
|
|
this_roster = await db_post(
|
|
'rosters',
|
|
payload={
|
|
'team_id': team.get('id'), 'name': roster.get('name', 'Unknown'),
|
|
'roster_num': roster.get('roster_num'), 'card_ids': roster.get('cards', [])
|
|
}
|
|
)
|
|
synced_count += 1
|
|
logger.info(f"Synced roster '{roster.get('name', 'Unknown')}' for team {team.get('abbrev', 'Unknown')}")
|
|
except Exception as e:
|
|
logger.error(f"Failed to sync roster '{roster.get('name', 'Unknown')}': {e}")
|
|
|
|
if synced_count > 0:
|
|
# Import missing function
|
|
from helpers.random_content import random_conf_gif
|
|
await ctx.send(f"Successfully synced {synced_count} roster(s) to database!\n{random_conf_gif()}")
|
|
else:
|
|
await ctx.send("No rosters were synced. Check your sheet and try again.")
|
|
|
|
@commands.hybrid_command(name='ai-teams', help='Get list of AI teams and abbreviations')
|
|
@commands.has_any_role(PD_PLAYERS_ROLE_NAME)
|
|
@commands.check(legal_channel)
|
|
async def ai_teams_command(self, ctx):
|
|
"""Display list of AI teams and their abbreviations."""
|
|
|
|
# Get AI teams from database
|
|
ai_teams_query = await db_get('teams', params=[('is_ai', True)])
|
|
|
|
if not ai_teams_query or ai_teams_query.get('count', 0) == 0:
|
|
await ctx.send('No AI teams found.')
|
|
return
|
|
|
|
ai_teams = ai_teams_query.get('teams', [])
|
|
if not ai_teams:
|
|
await ctx.send('No AI teams found.')
|
|
return
|
|
|
|
# Sort teams alphabetically
|
|
ai_teams.sort(key=lambda x: x.get('abbrev', 'ZZZ'))
|
|
|
|
embed = discord.Embed(
|
|
title="AI Teams",
|
|
description="Available AI teams for gameplay",
|
|
color=0x1f8b4c
|
|
)
|
|
|
|
# Split teams into chunks for multiple fields
|
|
chunk_size = 15
|
|
for i in range(0, len(ai_teams), chunk_size):
|
|
chunk = ai_teams[i:i + chunk_size]
|
|
field_name = f"Teams {i+1}-{min(i+chunk_size, len(ai_teams))}"
|
|
field_value = ""
|
|
|
|
for team in chunk:
|
|
field_value += f"**{team.get('abbrev', 'UNK')}** - {team.get('sname', 'Unknown')}\n"
|
|
|
|
embed.add_field(name=field_name, value=field_value, inline=True)
|
|
|
|
embed.add_field(
|
|
name="Usage",
|
|
value="Use these abbreviations when playing games against AI teams",
|
|
inline=False
|
|
)
|
|
|
|
await ctx.send(embed=embed)
|
|
|
|
|
|
async def setup(bot):
|
|
"""Setup function for the TeamManagement cog."""
|
|
await bot.add_cog(TeamManagement(bot)) |