feat: add is_admin() helper to utils/permissions.py (#55)
All checks were successful
Build Docker Image / build (pull_request) Successful in 1m9s

Add centralized `is_admin(interaction)` helper that includes the
`isinstance(interaction.user, discord.Member)` guard, preventing
AttributeError in DM contexts.

Use it in `can_edit_player_image()` which previously accessed
`guild_permissions.administrator` directly without the isinstance
guard. Update the corresponding test to mock the user with
`spec=discord.Member` so the isinstance check passes.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Cal Corum 2026-03-04 22:33:31 -06:00
parent f7a65706a1
commit ed40b532b5
3 changed files with 154 additions and 108 deletions

View File

@ -4,6 +4,7 @@ Player Image Management Commands
Allows users to update player fancy card and headshot images for players
on teams they own. Admins can update any player's images.
"""
from typing import List, Tuple
import asyncio
import aiohttp
@ -20,10 +21,11 @@ from utils.decorators import logged_command
from views.embeds import EmbedColors, EmbedTemplate
from views.base import BaseView
from models.player import Player
from utils.permissions import is_admin
# URL Validation Functions
def validate_url_format(url: str) -> Tuple[bool, str]:
"""
Validate URL format for image links.
@ -40,17 +42,20 @@ def validate_url_format(url: str) -> Tuple[bool, str]:
return False, "URL too long (max 500 characters)"
# Protocol check
if not url.startswith(('http://', 'https://')):
if not url.startswith(("http://", "https://")):
return False, "URL must start with http:// or https://"
# Image extension check
valid_extensions = ('.jpg', '.jpeg', '.png', '.gif', '.webp')
valid_extensions = (".jpg", ".jpeg", ".png", ".gif", ".webp")
url_lower = url.lower()
# Check if URL ends with valid extension (ignore query params)
base_url = url_lower.split('?')[0] # Remove query parameters
base_url = url_lower.split("?")[0] # Remove query parameters
if not any(base_url.endswith(ext) for ext in valid_extensions):
return False, f"URL must end with a valid image extension: {', '.join(valid_extensions)}"
return (
False,
f"URL must end with a valid image extension: {', '.join(valid_extensions)}",
)
return True, ""
@ -68,14 +73,19 @@ async def check_url_accessibility(url: str) -> Tuple[bool, str]:
"""
try:
async with aiohttp.ClientSession() as session:
async with session.head(url, timeout=aiohttp.ClientTimeout(total=5)) as response:
async with session.head(
url, timeout=aiohttp.ClientTimeout(total=5)
) as response:
if response.status != 200:
return False, f"URL returned status {response.status}"
# Check content-type header
content_type = response.headers.get('content-type', '').lower()
if content_type and not content_type.startswith('image/'):
return False, f"URL does not return an image (content-type: {content_type})"
content_type = response.headers.get("content-type", "").lower()
if content_type and not content_type.startswith("image/"):
return (
False,
f"URL does not return an image (content-type: {content_type})",
)
return True, ""
@ -89,11 +99,9 @@ async def check_url_accessibility(url: str) -> Tuple[bool, str]:
# Permission Checking
async def can_edit_player_image(
interaction: discord.Interaction,
player: Player,
season: int,
logger
interaction: discord.Interaction, player: Player, season: int, logger
) -> Tuple[bool, str]:
"""
Check if user can edit player's image.
@ -109,7 +117,7 @@ async def can_edit_player_image(
If has permission, error_message is empty string
"""
# Admins can edit anyone
if interaction.user.guild_permissions.administrator:
if is_admin(interaction):
logger.debug("User is admin, granting permission", user_id=interaction.user.id)
return True, ""
@ -130,7 +138,7 @@ async def can_edit_player_image(
"User owns organization, granting permission",
user_id=interaction.user.id,
user_team=user_team.abbrev,
player_team=player.team.abbrev
player_team=player.team.abbrev,
)
return True, ""
@ -141,6 +149,7 @@ async def can_edit_player_image(
# Confirmation View
class ImageUpdateConfirmView(BaseView):
"""Confirmation view showing image preview before updating."""
@ -151,27 +160,33 @@ class ImageUpdateConfirmView(BaseView):
self.image_type = image_type
self.confirmed = False
@discord.ui.button(label="Confirm Update", style=discord.ButtonStyle.success, emoji="")
async def confirm_button(self, interaction: discord.Interaction, button: discord.ui.Button):
@discord.ui.button(
label="Confirm Update", style=discord.ButtonStyle.success, emoji=""
)
async def confirm_button(
self, interaction: discord.Interaction, button: discord.ui.Button
):
"""Confirm the image update."""
self.confirmed = True
# Disable all buttons
for item in self.children:
if hasattr(item, 'disabled'):
if hasattr(item, "disabled"):
item.disabled = True # type: ignore
await interaction.response.edit_message(view=self)
self.stop()
@discord.ui.button(label="Cancel", style=discord.ButtonStyle.secondary, emoji="")
async def cancel_button(self, interaction: discord.Interaction, button: discord.ui.Button):
async def cancel_button(
self, interaction: discord.Interaction, button: discord.ui.Button
):
"""Cancel the image update."""
self.confirmed = False
# Disable all buttons
for item in self.children:
if hasattr(item, 'disabled'):
if hasattr(item, "disabled"):
item.disabled = True # type: ignore
await interaction.response.edit_message(view=self)
@ -180,6 +195,7 @@ class ImageUpdateConfirmView(BaseView):
# Autocomplete
async def player_name_autocomplete(
interaction: discord.Interaction,
current: str,
@ -191,6 +207,7 @@ async def player_name_autocomplete(
try:
# Use the shared autocomplete utility with team prioritization
from utils.autocomplete import player_autocomplete
return await player_autocomplete(interaction, current)
except Exception:
# Return empty list on error to avoid breaking autocomplete
@ -199,27 +216,29 @@ async def player_name_autocomplete(
# Main Command Cog
class ImageCommands(commands.Cog):
"""Player image management command handlers."""
def __init__(self, bot: commands.Bot):
self.bot = bot
self.logger = get_contextual_logger(f'{__name__}.ImageCommands')
self.logger = get_contextual_logger(f"{__name__}.ImageCommands")
self.logger.info("ImageCommands cog initialized")
@app_commands.command(
name="set-image",
description="Update a player's fancy card or headshot image"
name="set-image", description="Update a player's fancy card or headshot image"
)
@app_commands.describe(
image_type="Type of image to update",
player_name="Player name (use autocomplete)",
image_url="Direct URL to the image file"
image_url="Direct URL to the image file",
)
@app_commands.choices(
image_type=[
app_commands.Choice(name="Fancy Card", value="fancy-card"),
app_commands.Choice(name="Headshot", value="headshot"),
]
)
@app_commands.choices(image_type=[
app_commands.Choice(name="Fancy Card", value="fancy-card"),
app_commands.Choice(name="Headshot", value="headshot")
])
@app_commands.autocomplete(player_name=player_name_autocomplete)
@logged_command("/set-image")
async def set_image(
@ -227,7 +246,7 @@ class ImageCommands(commands.Cog):
interaction: discord.Interaction,
image_type: app_commands.Choice[str],
player_name: str,
image_url: str
image_url: str,
):
"""Update a player's image (fancy card or headshot)."""
# Defer response for potentially slow operations
@ -242,7 +261,7 @@ class ImageCommands(commands.Cog):
"Image update requested",
user_id=interaction.user.id,
player_name=player_name,
image_type=img_type
image_type=img_type,
)
# Step 1: Validate URL format
@ -252,10 +271,10 @@ class ImageCommands(commands.Cog):
embed = EmbedTemplate.error(
title="Invalid URL Format",
description=f"{format_error}\n\n"
f"**Requirements:**\n"
f"• Must start with `http://` or `https://`\n"
f"• Must end with `.jpg`, `.jpeg`, `.png`, `.gif`, or `.webp`\n"
f"• Maximum 500 characters"
f"**Requirements:**\n"
f"• Must start with `http://` or `https://`\n"
f"• Must end with `.jpg`, `.jpeg`, `.png`, `.gif`, or `.webp`\n"
f"• Maximum 500 characters",
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
@ -268,24 +287,26 @@ class ImageCommands(commands.Cog):
embed = EmbedTemplate.error(
title="URL Not Accessible",
description=f"{access_error}\n\n"
f"**Please check:**\n"
f"• URL is correct and not expired\n"
f"• Image host is online\n"
f"• URL points directly to an image file\n"
f"• URL is publicly accessible"
f"**Please check:**\n"
f"• URL is correct and not expired\n"
f"• Image host is online\n"
f"• URL points directly to an image file\n"
f"• URL is publicly accessible",
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
# Step 3: Find player
self.logger.debug("Searching for player", player_name=player_name)
players = await player_service.get_players_by_name(player_name, get_config().sba_season)
players = await player_service.get_players_by_name(
player_name, get_config().sba_season
)
if not players:
self.logger.warning("Player not found", player_name=player_name)
embed = EmbedTemplate.error(
title="Player Not Found",
description=f"❌ No player found matching `{player_name}` in the current season."
description=f"❌ No player found matching `{player_name}` in the current season.",
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
@ -303,11 +324,13 @@ class ImageCommands(commands.Cog):
if player is None:
# Multiple candidates, ask user to be more specific
player_list = "\n".join([f"{p.name} ({p.primary_position})" for p in players[:10]])
player_list = "\n".join(
[f"{p.name} ({p.primary_position})" for p in players[:10]]
)
embed = EmbedTemplate.info(
title="Multiple Players Found",
description=f"🔍 Multiple players match `{player_name}`:\n\n{player_list}\n\n"
f"Please use the exact name from autocomplete."
f"Please use the exact name from autocomplete.",
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
@ -324,12 +347,12 @@ class ImageCommands(commands.Cog):
"Permission denied",
user_id=interaction.user.id,
player_id=player.id,
error=permission_error
error=permission_error,
)
embed = EmbedTemplate.error(
title="Permission Denied",
description=f"{permission_error}\n\n"
f"You can only update images for players on teams you own."
f"You can only update images for players on teams you own.",
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
@ -339,52 +362,46 @@ class ImageCommands(commands.Cog):
preview_embed = EmbedTemplate.create_base_embed(
title=f"🖼️ Update {display_name} for {player.name}",
description=f"Preview the new {display_name.lower()} below. Click **Confirm Update** to save this change.",
color=EmbedColors.INFO
color=EmbedColors.INFO,
)
# Add current image info
current_image = getattr(player, field_name, None)
if current_image:
preview_embed.add_field(
name="Current Image",
value="Will be replaced",
inline=True
name="Current Image", value="Will be replaced", inline=True
)
else:
preview_embed.add_field(
name="Current Image",
value="None set",
inline=True
)
preview_embed.add_field(name="Current Image", value="None set", inline=True)
# Add player info
preview_embed.add_field(
name="Player",
value=f"{player.name} ({player.primary_position})",
inline=True
inline=True,
)
if hasattr(player, 'team') and player.team:
preview_embed.add_field(
name="Team",
value=player.team.abbrev,
inline=True
)
if hasattr(player, "team") and player.team:
preview_embed.add_field(name="Team", value=player.team.abbrev, inline=True)
# Set the new image as thumbnail for preview
preview_embed.set_thumbnail(url=image_url)
preview_embed.set_footer(text="This preview shows how the image will appear. Confirm to save.")
preview_embed.set_footer(
text="This preview shows how the image will appear. Confirm to save."
)
# Create confirmation view
confirm_view = ImageUpdateConfirmView(
player=player,
image_url=image_url,
image_type=img_type,
user_id=interaction.user.id
user_id=interaction.user.id,
)
await interaction.followup.send(embed=preview_embed, view=confirm_view, ephemeral=True)
await interaction.followup.send(
embed=preview_embed, view=confirm_view, ephemeral=True
)
# Wait for confirmation
await confirm_view.wait()
@ -393,7 +410,7 @@ class ImageCommands(commands.Cog):
self.logger.info("Image update cancelled by user", player_id=player.id)
cancelled_embed = EmbedTemplate.info(
title="Update Cancelled",
description=f"No changes were made to {player.name}'s {display_name.lower()}."
description=f"No changes were made to {player.name}'s {display_name.lower()}.",
)
await interaction.edit_original_response(embed=cancelled_embed, view=None)
return
@ -403,7 +420,7 @@ class ImageCommands(commands.Cog):
"Updating player image",
player_id=player.id,
field=field_name,
url_length=len(image_url)
url_length=len(image_url),
)
update_data = {field_name: image_url}
@ -413,7 +430,7 @@ class ImageCommands(commands.Cog):
self.logger.error("Failed to update player", player_id=player.id)
error_embed = EmbedTemplate.error(
title="Update Failed",
description="❌ An error occurred while updating the player's image. Please try again."
description="❌ An error occurred while updating the player's image. Please try again.",
)
await interaction.edit_original_response(embed=error_embed, view=None)
return
@ -423,32 +440,24 @@ class ImageCommands(commands.Cog):
"Player image updated successfully",
player_id=player.id,
field=field_name,
user_id=interaction.user.id
user_id=interaction.user.id,
)
success_embed = EmbedTemplate.success(
title="Image Updated Successfully!",
description=f"**{display_name}** for **{player.name}** has been updated."
description=f"**{display_name}** for **{player.name}** has been updated.",
)
success_embed.add_field(
name="Player",
value=f"{player.name} ({player.primary_position})",
inline=True
inline=True,
)
if hasattr(player, 'team') and player.team:
success_embed.add_field(
name="Team",
value=player.team.abbrev,
inline=True
)
if hasattr(player, "team") and player.team:
success_embed.add_field(name="Team", value=player.team.abbrev, inline=True)
success_embed.add_field(
name="Image Type",
value=display_name,
inline=True
)
success_embed.add_field(name="Image Type", value=display_name, inline=True)
# Show the new image
success_embed.set_thumbnail(url=image_url)

View File

@ -3,17 +3,19 @@ Tests for player image management commands.
Covers URL validation, permission checking, and command execution.
"""
import pytest
import asyncio
from unittest.mock import MagicMock, patch
import aiohttp
import discord
from aioresponses import aioresponses
from commands.profile.images import (
validate_url_format,
check_url_accessibility,
can_edit_player_image,
ImageCommands
ImageCommands,
)
from tests.factories import PlayerFactory, TeamFactory
@ -94,7 +96,7 @@ class TestURLAccessibility:
url = "https://example.com/image.jpg"
with aioresponses() as m:
m.head(url, status=200, headers={'content-type': 'image/jpeg'})
m.head(url, status=200, headers={"content-type": "image/jpeg"})
is_accessible, error = await check_url_accessibility(url)
@ -118,7 +120,7 @@ class TestURLAccessibility:
url = "https://example.com/page.html"
with aioresponses() as m:
m.head(url, status=200, headers={'content-type': 'text/html'})
m.head(url, status=200, headers={"content-type": "text/html"})
is_accessible, error = await check_url_accessibility(url)
@ -157,6 +159,7 @@ class TestPermissionChecking:
async def test_admin_can_edit_any_player(self):
"""Test administrator can edit any player's images."""
mock_interaction = MagicMock()
mock_interaction.user = MagicMock(spec=discord.Member)
mock_interaction.user.id = 12345
mock_interaction.user.guild_permissions.administrator = True
@ -186,7 +189,9 @@ class TestPermissionChecking:
mock_logger = MagicMock()
with patch('commands.profile.images.team_service.get_teams_by_owner') as mock_get_teams:
with patch(
"commands.profile.images.team_service.get_teams_by_owner"
) as mock_get_teams:
mock_get_teams.return_value = [user_team]
has_permission, error = await can_edit_player_image(
@ -211,7 +216,9 @@ class TestPermissionChecking:
mock_logger = MagicMock()
with patch('commands.profile.images.team_service.get_teams_by_owner') as mock_get_teams:
with patch(
"commands.profile.images.team_service.get_teams_by_owner"
) as mock_get_teams:
mock_get_teams.return_value = [user_team]
has_permission, error = await can_edit_player_image(
@ -236,7 +243,9 @@ class TestPermissionChecking:
mock_logger = MagicMock()
with patch('commands.profile.images.team_service.get_teams_by_owner') as mock_get_teams:
with patch(
"commands.profile.images.team_service.get_teams_by_owner"
) as mock_get_teams:
mock_get_teams.return_value = [user_team]
has_permission, error = await can_edit_player_image(
@ -258,7 +267,9 @@ class TestPermissionChecking:
mock_logger = MagicMock()
with patch('commands.profile.images.team_service.get_teams_by_owner') as mock_get_teams:
with patch(
"commands.profile.images.team_service.get_teams_by_owner"
) as mock_get_teams:
mock_get_teams.return_value = []
has_permission, error = await can_edit_player_image(
@ -299,7 +310,7 @@ class TestImageCommandsIntegration:
async def test_set_image_command_structure(self, commands_cog):
"""Test that set_image command is properly configured."""
assert hasattr(commands_cog, 'set_image')
assert hasattr(commands_cog, "set_image")
assert commands_cog.set_image.name == "set-image"
async def test_fancy_card_updates_vanity_card_field(self, commands_cog):

View File

@ -7,6 +7,7 @@ servers and user types:
- @league_only: Only available in the league server
- @requires_team: User must have a team (works with global commands)
"""
import logging
from functools import wraps
from typing import Optional, Callable
@ -21,6 +22,7 @@ logger = logging.getLogger(__name__)
class PermissionError(Exception):
"""Raised when a user doesn't have permission to use a command."""
pass
@ -54,17 +56,16 @@ async def get_user_team(user_id: int) -> Optional[dict]:
# This call is automatically cached by TeamService
config = get_config()
team = await team_service.get_team_by_owner(
owner_id=user_id,
season=config.sba_season
owner_id=user_id, season=config.sba_season
)
if team:
logger.debug(f"User {user_id} has team: {team.lname}")
return {
'id': team.id,
'name': team.lname,
'abbrev': team.abbrev,
'season': team.season
"id": team.id,
"name": team.lname,
"abbrev": team.abbrev,
"season": team.season,
}
logger.debug(f"User {user_id} does not have a team")
@ -77,6 +78,18 @@ def is_league_server(guild_id: int) -> bool:
return guild_id == config.guild_id
def is_admin(interaction: discord.Interaction) -> bool:
"""Check if the interaction user is a server administrator.
Includes an isinstance guard for discord.Member so this is safe
to call from DM contexts (where guild_permissions is unavailable).
"""
return (
isinstance(interaction.user, discord.Member)
and interaction.user.guild_permissions.administrator
)
def league_only():
"""
Decorator to restrict a command to the league server only.
@ -87,14 +100,14 @@ def league_only():
async def team_command(self, interaction: discord.Interaction):
# Only executes in league server
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(self, interaction: discord.Interaction, *args, **kwargs):
# Check if in a guild
if not interaction.guild:
await interaction.response.send_message(
"❌ This command can only be used in a server.",
ephemeral=True
"❌ This command can only be used in a server.", ephemeral=True
)
return
@ -102,13 +115,14 @@ def league_only():
if not is_league_server(interaction.guild.id):
await interaction.response.send_message(
"❌ This command is only available in the SBa league server.",
ephemeral=True
ephemeral=True,
)
return
return await func(self, interaction, *args, **kwargs)
return wrapper
return decorator
@ -123,6 +137,7 @@ def requires_team():
async def mymoves_command(self, interaction: discord.Interaction):
# Only executes if user has a team
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(self, interaction: discord.Interaction, *args, **kwargs):
@ -133,29 +148,33 @@ def requires_team():
if team is None:
await interaction.response.send_message(
"❌ This command requires you to have a team in the SBa league. Contact an admin if you believe this is an error.",
ephemeral=True
ephemeral=True,
)
return
# Store team info in interaction for command to use
# This allows commands to access the team without another lookup
interaction.extras['user_team'] = team
interaction.extras["user_team"] = team
return await func(self, interaction, *args, **kwargs)
except Exception as e:
# Log the error for debugging
logger.error(f"Error checking team ownership for user {interaction.user.id}: {e}", exc_info=True)
logger.error(
f"Error checking team ownership for user {interaction.user.id}: {e}",
exc_info=True,
)
# Provide helpful error message to user
await interaction.response.send_message(
"❌ Unable to verify team ownership due to a temporary error. Please try again in a moment. "
"If this persists, contact an admin.",
ephemeral=True
ephemeral=True,
)
return
return wrapper
return decorator
@ -170,12 +189,14 @@ def global_command():
async def roll_command(self, interaction: discord.Interaction):
# Available in all servers
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(self, interaction: discord.Interaction, *args, **kwargs):
return await func(self, interaction, *args, **kwargs)
return wrapper
return decorator
@ -190,35 +211,35 @@ def admin_only():
async def sync_command(self, interaction: discord.Interaction):
# Only executes for admins
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(self, interaction: discord.Interaction, *args, **kwargs):
# Check if user is guild admin
if not interaction.guild:
await interaction.response.send_message(
"❌ This command can only be used in a server.",
ephemeral=True
"❌ This command can only be used in a server.", ephemeral=True
)
return
# Check if user has admin permissions
if not isinstance(interaction.user, discord.Member):
await interaction.response.send_message(
"❌ Unable to verify permissions.",
ephemeral=True
"❌ Unable to verify permissions.", ephemeral=True
)
return
if not interaction.user.guild_permissions.administrator:
await interaction.response.send_message(
"❌ This command requires administrator permissions.",
ephemeral=True
ephemeral=True,
)
return
return await func(self, interaction, *args, **kwargs)
return wrapper
return decorator
@ -241,6 +262,7 @@ def league_admin_only():
async def admin_sync_prefix(self, ctx: commands.Context):
# Only league server admins can use this
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(self, ctx_or_interaction, *args, **kwargs):
@ -254,6 +276,7 @@ def league_admin_only():
async def send_error(msg: str):
await ctx.send(msg)
else:
interaction = ctx_or_interaction
guild = interaction.guild
@ -269,7 +292,9 @@ def league_admin_only():
# Check if league server
if not is_league_server(guild.id):
await send_error("❌ This command is only available in the SBa league server.")
await send_error(
"❌ This command is only available in the SBa league server."
)
return
# Check admin permissions
@ -284,4 +309,5 @@ def league_admin_only():
return await func(self, ctx_or_interaction, *args, **kwargs)
return wrapper
return decorator