paper-dynasty-discord/cogs/players_new/team_management.py
Cal Corum 943dcc9b74 CLAUDE: Add get_context_user() helper for hybrid command compatibility
Created get_context_user() helper function to safely extract the user from
either Context or Interaction objects. This prevents AttributeError issues
when hybrid commands are invoked as slash commands.

Hybrid commands receive commands.Context (with .author) when invoked with
prefix commands, but discord.Interaction (with .user) when invoked as slash
commands. The helper function handles both cases transparently.

Updated all affected hybrid commands:
- /branding-pd (cogs/players.py, cogs/players_new/team_management.py)
- /pullroster (cogs/players.py, cogs/players_new/team_management.py)
- /newsheet (cogs/economy_new/team_setup.py)
- /lastpack (cogs/economy_new/packs.py)

This follows the same pattern as the owner_only() fix and provides a
consistent, maintainable solution for all hybrid commands.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-10 09:07:09 -06:00

352 lines
15 KiB
Python

# Team Management Module
# Contains team information and management functionality from the original players.py
from discord.ext import commands
from discord import app_commands
import discord
from typing import Optional
# Import specific utilities needed by this module
import logging
import pygsheets
import requests
from api_calls import db_get, db_post, db_patch, get_team_by_abbrev
from helpers import (
PD_PLAYERS_ROLE_NAME, IMAGES, get_channel, get_team_embed,
get_blank_team_card, get_team_by_owner, get_rosters,
legal_channel, team_summary_embed, SelectView,
is_ephemeral_channel, is_restricted_channel, can_send_message,
get_context_user
)
import helpers
from helpers.utils import get_roster_sheet
from helpers.constants import ALL_MLB_TEAMS
logger = logging.getLogger('discord_app')
class TeamManagement(commands.Cog):
"""Team information and management functionality for Paper Dynasty."""
def __init__(self, bot):
self.bot = bot
@app_commands.command(name='team', description='Show team overview and rosters')
@app_commands.checks.has_any_role(PD_PLAYERS_ROLE_NAME)
async def team_slash_command(self, interaction: discord.Interaction, team_abbrev: Optional[str] = None):
"""Display team overview and rosters."""
await interaction.response.defer()
if team_abbrev:
team = await get_team_by_abbrev(team_abbrev)
if not team:
await interaction.followup.send(f'Could not find team with abbreviation: {team_abbrev}')
return
else:
team = await get_team_by_owner(interaction.user.id)
if not team:
await interaction.followup.send('You don\'t have a team yet! Use `/newteam` to create one.')
return
# Create team summary embed
embed = await team_summary_embed(team, interaction, include_roster=True)
# Add roster information
try:
rosters = get_rosters(team, self.bot)
if rosters:
roster_text = ""
for i, roster in enumerate(rosters):
if roster and roster.get('name'):
card_count = len(roster.get('cards', []))
roster_text += f"**{roster.get('name', 'Unknown')}**: {card_count} players\n"
if roster_text:
embed.add_field(name="Saved Rosters", value=roster_text, inline=False)
except Exception as e:
logger.warning(f"Could not retrieve rosters for team {team.get('abbrev', 'Unknown')}: {e}")
# Add Google Sheet link if available
if team.get('gsheet') and team.get('gsheet') != 'None':
sheet_link = get_roster_sheet(team, allow_embed=True)
if sheet_link:
embed.add_field(name="Team Sheet", value=sheet_link, inline=False)
await interaction.followup.send(embed=embed)
@app_commands.command(name='update-player', description='Update a player\'s card to a specific MLB team')
@app_commands.checks.has_any_role(PD_PLAYERS_ROLE_NAME)
async def update_player_team(self, interaction: discord.Interaction, player_id: int):
"""Update player team functionality."""
owner_team = await get_team_by_owner(interaction.user.id)
if not owner_team:
await interaction.response.send_message(
'Thank you for offering to help - if you sign up for a team with /newteam I can let you post updates.',
ephemeral=True
)
return
if is_restricted_channel(interaction.channel):
await interaction.response.send_message(
f'Slide on down to #pd-bot-hole to run updates - thanks!',
ephemeral=True
)
return
await interaction.response.defer()
this_player = await db_get('players', object_id=player_id)
if not this_player:
await interaction.edit_original_response(content=f'No clue who that is.')
return
from helpers import get_card_embeds
embed = await get_card_embeds(get_blank_team_card(this_player))
await interaction.edit_original_response(content=None, embed=embed[0])
view = helpers.Confirm(responders=[interaction.user])
if can_send_message(interaction.channel):
question = await interaction.channel.send(
content='Is this the player you want to update?',
view=view
)
else:
question = await interaction.followup.send(
content='Is this the player you want to update?',
view=view
)
await view.wait()
if not view.value:
await question.edit(content='Okay, we\'ll leave it be.', view=None)
return
else:
await question.delete()
view = SelectView([
helpers.SelectUpdatePlayerTeam('AL', this_player, owner_team, self.bot),
helpers.SelectUpdatePlayerTeam('NL', this_player, owner_team, self.bot)
])
if can_send_message(interaction.channel):
await interaction.channel.send(content=None, view=view)
else:
await interaction.followup.send(content=None, view=view)
@commands.hybrid_command(name='branding-pd', help='Update your team branding')
@commands.has_any_role(PD_PLAYERS_ROLE_NAME)
@commands.check(legal_channel)
async def branding_command(self, ctx, team_logo_url: str = None, color: str = None,
short_name: str = None, full_name: str = None, *, branding_updates: Optional[str] = None):
"""Update team branding (logo, color, etc.)."""
team = await get_team_by_owner(get_context_user(ctx).id)
if not team:
await ctx.send('You don\'t have a team yet! Use `/newteam` to create one.')
return
# Handle both old interface (individual parameters) and new interface (parsed string)
params = []
updates = {}
# Check if using old interface (individual parameters)
if any([team_logo_url, color, short_name, full_name]):
if team_logo_url:
if team_logo_url.startswith('http') and (team_logo_url.endswith('.png') or
team_logo_url.endswith('.jpg') or
team_logo_url.endswith('.jpeg')):
params.append(('logo', team_logo_url))
updates['logo'] = team_logo_url
else:
await ctx.send('Logo must be a valid URL ending in .png, .jpg, or .jpeg')
return
if color:
clean_color = color.replace('#', '')
if len(clean_color) == 6 and all(c in '0123456789abcdefABCDEF' for c in clean_color):
params.append(('color', clean_color))
updates['color'] = clean_color
else:
await ctx.send('Invalid color format. Use hex format like #FF0000 or FF0000')
return
if short_name:
params.append(('sname', short_name))
updates['short_name'] = short_name
if full_name:
params.append(('lname', full_name))
updates['full_name'] = full_name
elif not branding_updates:
# Show current branding
embed = get_team_embed("Current Team Branding", team)
embed.add_field(name="Team Name", value=f"{team.get('lname', 'Unknown')} ({team.get('abbrev', 'UNK')})", inline=False)
embed.add_field(name="Short Name", value=team.get('sname', 'Unknown'), inline=True)
embed.add_field(name="GM Name", value=team.get('gmname', 'Unknown'), inline=True)
embed.add_field(name="Color", value=f"#{team.get('color', 'a6ce39')}", inline=True)
if team.get('logo'):
embed.add_field(name="Logo URL", value=team.get('logo', 'None'), inline=False)
embed.add_field(
name="How to Update",
value="Individual params: `/branding-pd team_logo_url color short_name full_name`\n"
"Or parsed: `/branding-pd branding_updates:color:#FF0000 logo:url`",
inline=False
)
await ctx.send(embed=embed)
return
else:
# Parse branding updates (new interface)
for update in branding_updates.split():
if ':' in update:
key, value = update.split(':', 1)
if key.lower() == 'color':
# Validate hex color
if value.startswith('#'):
value = value[1:]
if len(value) == 6 and all(c in '0123456789abcdefABCDEF' for c in value):
updates['color'] = value
else:
await ctx.send('Invalid color format. Use hex format like #FF0000 or FF0000')
return
elif key.lower() == 'logo':
if value.startswith('http') and (value.endswith('.png') or value.endswith('.jpg') or value.endswith('.jpeg')):
updates['logo'] = value
else:
await ctx.send('Logo must be a valid URL ending in .png, .jpg, or .jpeg')
return
if not updates:
await ctx.send('No valid updates found. Use `color:#hexcode` or `logo:url` format.')
return
# Apply updates
update_params = [(key, value) for key, value in updates.items()]
updated_team = await db_patch('teams', object_id=team.get('id'), params=update_params)
if updated_team:
embed = get_team_embed("Updated Team Branding", updated_team)
embed.add_field(name="Changes Applied", value="\n".join([f"**{k.title()}**: {v}" for k, v in updates.items()]), inline=False)
await ctx.send(embed=embed)
else:
await ctx.send('Failed to update team branding. Please try again.')
@commands.hybrid_command(name='pullroster', help='Pull saved rosters from your team Sheet',
aliases=['roster', 'rosters', 'pullrosters'])
@app_commands.describe(
specific_roster_num='Enter 1, 2, or 3 to only pull one roster; leave blank to pull all 3',
roster_name='The name of the roster to pull (alternative to roster number)'
)
@commands.has_any_role(PD_PLAYERS_ROLE_NAME)
@commands.check(legal_channel)
async def pull_roster_command(self, ctx, specific_roster_num: Optional[int] = None,
roster_name: Optional[str] = None):
"""Pull and display saved rosters from Google Sheets."""
team = await get_team_by_owner(get_context_user(ctx).id)
if not team:
await ctx.send('You don\'t have a team yet! Use `/newteam` to create one.')
return
if not team.get('gsheet') or team.get('gsheet') == 'None':
await ctx.send('You don\'t have a Google Sheet linked yet! Use `/newsheet` to link one.')
return
# Get rosters from sheet
try:
rosters = get_rosters(team, self.bot)
except Exception as e:
logger.error(f"Error getting rosters for {team.get('abbrev', 'Unknown')}: {e}")
await ctx.send('Could not retrieve rosters from your sheet.')
return
if not rosters or not any(rosters):
await ctx.send('Could not retrieve rosters from your sheet.')
return
# Original roster sync functionality (matches players_old.py business logic)
logger.debug(f'roster_data: {rosters}')
# Post roster team/card ids and throw error if db says no
synced_count = 0
for index, roster in enumerate(rosters):
logger.debug(f'index: {index} / roster: {roster}')
if (not specific_roster_num or specific_roster_num == index + 1) and roster:
if roster_name and roster.get('name', '').lower() != roster_name.lower():
continue
try:
this_roster = await db_post(
'rosters',
payload={
'team_id': team.get('id'), 'name': roster.get('name', 'Unknown'),
'roster_num': roster.get('roster_num'), 'card_ids': roster.get('cards', [])
}
)
synced_count += 1
logger.info(f"Synced roster '{roster.get('name', 'Unknown')}' for team {team.get('abbrev', 'Unknown')}")
except Exception as e:
logger.error(f"Failed to sync roster '{roster.get('name', 'Unknown')}': {e}")
if synced_count > 0:
# Import missing function
from helpers.random_content import random_conf_gif
await ctx.send(f"Successfully synced {synced_count} roster(s) to database!\n{random_conf_gif()}")
else:
await ctx.send("No rosters were synced. Check your sheet and try again.")
@commands.hybrid_command(name='ai-teams', help='Get list of AI teams and abbreviations')
@commands.has_any_role(PD_PLAYERS_ROLE_NAME)
@commands.check(legal_channel)
async def ai_teams_command(self, ctx):
"""Display list of AI teams and their abbreviations."""
# Get AI teams from database
ai_teams_query = await db_get('teams', params=[('is_ai', True)])
if not ai_teams_query or ai_teams_query.get('count', 0) == 0:
await ctx.send('No AI teams found.')
return
ai_teams = ai_teams_query.get('teams', [])
if not ai_teams:
await ctx.send('No AI teams found.')
return
# Sort teams alphabetically
ai_teams.sort(key=lambda x: x.get('abbrev', 'ZZZ'))
embed = discord.Embed(
title="AI Teams",
description="Available AI teams for gameplay",
color=0x1f8b4c
)
# Split teams into chunks for multiple fields
chunk_size = 15
for i in range(0, len(ai_teams), chunk_size):
chunk = ai_teams[i:i + chunk_size]
field_name = f"Teams {i+1}-{min(i+chunk_size, len(ai_teams))}"
field_value = ""
for team in chunk:
field_value += f"**{team.get('abbrev', 'UNK')}** - {team.get('sname', 'Unknown')}\n"
embed.add_field(name=field_name, value=field_value, inline=True)
embed.add_field(
name="Usage",
value="Use these abbreviations when playing games against AI teams",
inline=False
)
await ctx.send(embed=embed)
async def setup(bot):
"""Setup function for the TeamManagement cog."""
await bot.add_cog(TeamManagement(bot))