Merge branch 'main' into fix/batch-cleanup-group-a
All checks were successful
Build Docker Image / build (pull_request) Successful in 1m27s

This commit is contained in:
Claude 2026-03-23 04:24:25 +00:00
commit 8740c65773
8 changed files with 2615 additions and 1343 deletions

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -12,65 +12,89 @@ import datetime
from sqlmodel import Session
from api_calls import db_get, db_post, db_patch, db_delete, get_team_by_abbrev
from helpers import (
ACTIVE_EVENT_LITERAL, PD_PLAYERS_ROLE_NAME, get_team_embed, get_team_by_owner,
legal_channel, Confirm, send_to_channel
ACTIVE_EVENT_LITERAL,
PD_PLAYERS_ROLE_NAME,
get_team_embed,
get_team_by_owner,
legal_channel,
Confirm,
send_to_channel,
)
from helpers.utils import get_roster_sheet, get_cal_user
from utilities.buttons import ask_with_buttons
from in_game.gameplay_models import engine
from in_game.gameplay_queries import get_team_or_none
logger = logging.getLogger('discord_app')
logger = logging.getLogger("discord_app")
# Try to import gauntlets module, provide fallback if not available
try:
import gauntlets
GAUNTLETS_AVAILABLE = True
except ImportError:
logger.warning("Gauntlets module not available - gauntlet commands will have limited functionality")
logger.warning(
"Gauntlets module not available - gauntlet commands will have limited functionality"
)
GAUNTLETS_AVAILABLE = False
gauntlets = None
class Gauntlet(commands.Cog):
"""Gauntlet game mode functionality for Paper Dynasty."""
def __init__(self, bot):
self.bot = bot
group_gauntlet = app_commands.Group(name='gauntlets', description='Check your progress or start a new Gauntlet')
group_gauntlet = app_commands.Group(
name="gauntlets", description="Check your progress or start a new Gauntlet"
)
@group_gauntlet.command(name='status', description='View status of current Gauntlet run')
@group_gauntlet.command(
name="status", description="View status of current Gauntlet run"
)
@app_commands.describe(
team_abbrev='To check the status of a team\'s active run, enter their abbreviation'
team_abbrev="To check the status of a team's active run, enter their abbreviation"
)
@app_commands.checks.has_any_role(PD_PLAYERS_ROLE_NAME)
async def gauntlet_run_command(
self, interaction: discord.Interaction, event_name: ACTIVE_EVENT_LITERAL, # type: ignore
team_abbrev: Optional[str] = None):
self,
interaction: discord.Interaction,
event_name: ACTIVE_EVENT_LITERAL, # type: ignore
team_abbrev: Optional[str] = None,
):
"""View status of current gauntlet run - corrected to match original business logic."""
await interaction.response.defer()
e_query = await db_get('events', params=[("name", event_name), ("active", True)])
if not e_query or e_query.get('count', 0) == 0:
await interaction.edit_original_response(content=f'Hmm...looks like that event is inactive.')
e_query = await db_get(
"events", params=[("name", event_name), ("active", True)]
)
if not e_query or e_query.get("count", 0) == 0:
await interaction.edit_original_response(
content=f"Hmm...looks like that event is inactive."
)
return
else:
this_event = e_query['events'][0]
this_event = e_query["events"][0]
this_run, this_team = None, None
if team_abbrev:
if 'Gauntlet-' not in team_abbrev:
team_abbrev = f'Gauntlet-{team_abbrev}'
t_query = await db_get('teams', params=[('abbrev', team_abbrev)])
if t_query and t_query.get('count', 0) != 0:
this_team = t_query['teams'][0]
r_query = await db_get('gauntletruns', params=[
('team_id', this_team['id']), ('is_active', True), ('gauntlet_id', this_event['id'])
])
if "Gauntlet-" not in team_abbrev:
team_abbrev = f"Gauntlet-{team_abbrev}"
t_query = await db_get("teams", params=[("abbrev", team_abbrev)])
if t_query and t_query.get("count", 0) != 0:
this_team = t_query["teams"][0]
r_query = await db_get(
"gauntletruns",
params=[
("team_id", this_team["id"]),
("is_active", True),
("gauntlet_id", this_event["id"]),
],
)
if r_query and r_query.get('count', 0) != 0:
this_run = r_query['runs'][0]
if r_query and r_query.get("count", 0) != 0:
this_run = r_query["runs"][0]
else:
await interaction.edit_original_response(
content=f'I do not see an active run for the {this_team["lname"]}.'
@ -78,7 +102,7 @@ class Gauntlet(commands.Cog):
return
else:
await interaction.edit_original_response(
content=f'I do not see an active run for {team_abbrev.upper()}.'
content=f"I do not see an active run for {team_abbrev.upper()}."
)
return
@ -86,127 +110,168 @@ class Gauntlet(commands.Cog):
if GAUNTLETS_AVAILABLE and gauntlets:
await interaction.edit_original_response(
content=None,
embed=await gauntlets.get_embed(this_run, this_event, this_team) # type: ignore
embed=await gauntlets.get_embed(this_run, this_event, this_team), # type: ignore
)
else:
await interaction.edit_original_response(
content='Gauntlet status unavailable - gauntlets module not loaded.'
content="Gauntlet status unavailable - gauntlets module not loaded."
)
@group_gauntlet.command(name='start', description='Start a new Gauntlet run')
@group_gauntlet.command(name="start", description="Start a new Gauntlet run")
@app_commands.checks.has_any_role(PD_PLAYERS_ROLE_NAME)
async def gauntlet_start_command(self, interaction: discord.Interaction):
"""Start a new gauntlet run."""
# Channel restriction - must be in a 'hello' channel (private channel)
if interaction.channel and hasattr(interaction.channel, 'name') and 'hello' not in str(interaction.channel.name):
if (
interaction.channel
and hasattr(interaction.channel, "name")
and "hello" not in str(interaction.channel.name)
):
await interaction.response.send_message(
content='The draft will probably take you about 15 minutes. Why don\'t you head to your private '
'channel to run the draft?',
ephemeral=True
content="The draft will probably take you about 15 minutes. Why don't you head to your private "
"channel to run the draft?",
ephemeral=True,
)
return
logger.info(f'Starting a gauntlet run for user {interaction.user.name}')
logger.info(f"Starting a gauntlet run for user {interaction.user.name}")
await interaction.response.defer()
with Session(engine) as session:
main_team = await get_team_or_none(session, gm_id=interaction.user.id, main_team=True)
draft_team = await get_team_or_none(session, gm_id=interaction.user.id, gauntlet_team=True)
main_team = await get_team_or_none(
session, gm_id=interaction.user.id, main_team=True
)
draft_team = await get_team_or_none(
session, gm_id=interaction.user.id, gauntlet_team=True
)
# Get active events
e_query = await db_get('events', params=[("active", True)])
if not e_query or e_query.get('count', 0) == 0:
await interaction.edit_original_response(content='Hmm...I don\'t see any active events.')
e_query = await db_get("events", params=[("active", True)])
if not e_query or e_query.get("count", 0) == 0:
await interaction.edit_original_response(
content="Hmm...I don't see any active events."
)
return
elif e_query.get('count', 0) == 1:
this_event = e_query['events'][0]
elif e_query.get("count", 0) == 1:
this_event = e_query["events"][0]
else:
event_choice = await ask_with_buttons(
interaction,
button_options=[x['name'] for x in e_query['events']],
question='Which event would you like to take on?',
button_options=[x["name"] for x in e_query["events"]],
question="Which event would you like to take on?",
timeout=3,
delete_question=False
delete_question=False,
)
this_event = [event for event in e_query['events'] if event['name'] == event_choice][0]
logger.info(f'this_event: {this_event}')
this_event = [
event
for event in e_query["events"]
if event["name"] == event_choice
][0]
logger.info(f"this_event: {this_event}")
first_flag = draft_team is None
if draft_team is not None:
r_query = await db_get(
'gauntletruns',
params=[('team_id', draft_team.id), ('gauntlet_id', this_event['id']), ('is_active', True)]
"gauntletruns",
params=[
("team_id", draft_team.id),
("gauntlet_id", this_event["id"]),
("is_active", True),
],
)
if r_query and r_query.get('count', 0) != 0:
if r_query and r_query.get("count", 0) != 0:
await interaction.edit_original_response(
content=f'Looks like you already have a {r_query["runs"][0]["gauntlet"]["name"]} run active! '
f'You can check it out with the `/gauntlets status` command.'
f"You can check it out with the `/gauntlets status` command."
)
return
try:
draft_embed = await gauntlets.run_draft(interaction, main_team, this_event, draft_team) # type: ignore
draft_embed = await gauntlets.run_draft(interaction, main_team, this_event, draft_team) # type: ignore
except ZeroDivisionError as e:
logger.error(
f'ZeroDivisionError in {this_event["name"]} draft for the {main_team.sname if main_team else "unknown"}: {e}'
)
await gauntlets.wipe_team(draft_team, interaction) # type: ignore
await interaction.followup.send(
content=f"Shoot - it looks like we ran into an issue running the draft. I had to clear it all out "
f"for now. I let {get_cal_user(interaction).mention} know what happened so he better "
f"fix it quick."
)
return
except Exception as e:
logger.error(f'Failed to run {this_event["name"]} draft for the {main_team.sname if main_team else "unknown"}: {e}')
await gauntlets.wipe_team(draft_team, interaction) # type: ignore
logger.error(
f'Failed to run {this_event["name"]} draft for the {main_team.sname if main_team else "unknown"}: {e}'
)
await gauntlets.wipe_team(draft_team, interaction) # type: ignore
await interaction.followup.send(
content=f'Shoot - it looks like we ran into an issue running the draft. I had to clear it all out '
f'for now. I let {get_cal_user(interaction).mention} know what happened so he better '
f'fix it quick.'
content=f"Shoot - it looks like we ran into an issue running the draft. I had to clear it all out "
f"for now. I let {get_cal_user(interaction).mention} know what happened so he better "
f"fix it quick."
)
return
if first_flag:
await interaction.followup.send(
f'Good luck, champ in the making! To start playing, follow these steps:\n\n'
f'1) Make a copy of the Team Sheet Template found in `/help-pd links`\n'
f'2) Run `/newsheet` to link it to your Gauntlet team\n'
f"Good luck, champ in the making! To start playing, follow these steps:\n\n"
f"1) Make a copy of the Team Sheet Template found in `/help-pd links`\n"
f"2) Run `/newsheet` to link it to your Gauntlet team\n"
f'3) Go play your first game with `/new-game gauntlet {this_event["name"]}`'
)
else:
await interaction.followup.send(
f'Good luck, champ in the making! In your team sheet, sync your cards with **Paper Dynasty** -> '
f'**Data Imports** -> **My Cards** then you can set your lineup here and you\'ll be ready to go!\n\n'
f'{get_roster_sheet(draft_team)}'
f"Good luck, champ in the making! In your team sheet, sync your cards with **Paper Dynasty** -> "
f"**Data Imports** -> **My Cards** then you can set your lineup here and you'll be ready to go!\n\n"
f"{get_roster_sheet(draft_team)}"
)
await send_to_channel(
bot=self.bot,
channel_name='pd-news-ticker',
channel_name="pd-news-ticker",
content=f'The {main_team.lname if main_team else "Unknown Team"} have entered the {this_event["name"]} Gauntlet!',
embed=draft_embed
embed=draft_embed,
)
@group_gauntlet.command(name='reset', description='Wipe your current team so you can re-draft')
@group_gauntlet.command(
name="reset", description="Wipe your current team so you can re-draft"
)
@app_commands.checks.has_any_role(PD_PLAYERS_ROLE_NAME)
async def gauntlet_reset_command(self, interaction: discord.Interaction, event_name: ACTIVE_EVENT_LITERAL): # type: ignore
async def gauntlet_reset_command(self, interaction: discord.Interaction, event_name: ACTIVE_EVENT_LITERAL): # type: ignore
"""Reset current gauntlet run."""
await interaction.response.defer()
main_team = await get_team_by_owner(interaction.user.id)
draft_team = await get_team_by_abbrev(f'Gauntlet-{main_team["abbrev"]}')
if draft_team is None:
await interaction.edit_original_response(
content='Hmm, I can\'t find a gauntlet team for you. Have you signed up already?')
content="Hmm, I can't find a gauntlet team for you. Have you signed up already?"
)
return
e_query = await db_get('events', params=[("name", event_name), ("active", True)])
if e_query['count'] == 0:
await interaction.edit_original_response(content='Hmm...looks like that event is inactive.')
e_query = await db_get(
"events", params=[("name", event_name), ("active", True)]
)
if e_query["count"] == 0:
await interaction.edit_original_response(
content="Hmm...looks like that event is inactive."
)
return
else:
this_event = e_query['events'][0]
this_event = e_query["events"][0]
r_query = await db_get('gauntletruns', params=[
('team_id', draft_team['id']), ('is_active', True), ('gauntlet_id', this_event['id'])
])
r_query = await db_get(
"gauntletruns",
params=[
("team_id", draft_team["id"]),
("is_active", True),
("gauntlet_id", this_event["id"]),
],
)
if r_query and r_query.get('count', 0) != 0:
this_run = r_query['runs'][0]
if r_query and r_query.get("count", 0) != 0:
this_run = r_query["runs"][0]
else:
await interaction.edit_original_response(
content=f'I do not see an active run for the {draft_team["lname"]}.'
@ -214,27 +279,24 @@ class Gauntlet(commands.Cog):
return
view = Confirm(responders=[interaction.user], timeout=60)
conf_string = f'Are you sure you want to wipe your active run?'
await interaction.edit_original_response(
content=conf_string,
view=view
)
conf_string = f"Are you sure you want to wipe your active run?"
await interaction.edit_original_response(content=conf_string, view=view)
await view.wait()
if view.value:
await gauntlets.end_run(this_run, this_event, draft_team, force_end=True) # type: ignore
await gauntlets.end_run(this_run, this_event, draft_team, force_end=True) # type: ignore
await interaction.edit_original_response(
content=f'Your {event_name} run has been reset. Run `/gauntlets start` to redraft!',
view=None
content=f"Your {event_name} run has been reset. Run `/gauntlets start` to redraft!",
view=None,
)
else:
await interaction.edit_original_response(
content=f'~~{conf_string}~~\n\nNo worries, I will leave it active.',
view=None
content=f"~~{conf_string}~~\n\nNo worries, I will leave it active.",
view=None,
)
async def setup(bot):
"""Setup function for the Gauntlet cog."""
await bot.add_cog(Gauntlet(bot))
await bot.add_cog(Gauntlet(bot))

View File

@ -4295,12 +4295,14 @@ async def complete_game(
else this_game.away_team
)
db_game = None
try:
db_game = await db_post("games", payload=game_data)
db_ready_plays = get_db_ready_plays(session, this_game, db_game["id"])
db_ready_decisions = get_db_ready_decisions(session, this_game, db_game["id"])
except Exception as e:
await roll_back(db_game["id"])
if db_game is not None:
await roll_back(db_game["id"])
log_exception(e, msg="Unable to post game to API, rolling back")
# Post game stats to API

View File

@ -4,6 +4,7 @@ Discord Utilities
This module contains Discord helper functions for channels, roles, embeds,
and other Discord-specific operations.
"""
import logging
import os
import asyncio
@ -13,19 +14,21 @@ import discord
from discord.ext import commands
from helpers.constants import SBA_COLOR, PD_SEASON, IMAGES
logger = logging.getLogger('discord_app')
logger = logging.getLogger("discord_app")
async def send_to_bothole(ctx, content, embed):
"""Send a message to the pd-bot-hole channel."""
await discord.utils.get(ctx.guild.text_channels, name='pd-bot-hole') \
.send(content=content, embed=embed)
await discord.utils.get(ctx.guild.text_channels, name="pd-bot-hole").send(
content=content, embed=embed
)
async def send_to_news(ctx, content, embed):
"""Send a message to the pd-news-ticker channel."""
await discord.utils.get(ctx.guild.text_channels, name='pd-news-ticker') \
.send(content=content, embed=embed)
await discord.utils.get(ctx.guild.text_channels, name="pd-news-ticker").send(
content=content, embed=embed
)
async def typing_pause(ctx, seconds=1):
@ -43,23 +46,20 @@ async def pause_then_type(ctx, message):
async def check_if_pdhole(ctx):
"""Check if the current channel is pd-bot-hole."""
if ctx.message.channel.name != 'pd-bot-hole':
await ctx.send('Slide on down to my bot-hole for running commands.')
await ctx.message.add_reaction('')
if ctx.message.channel.name != "pd-bot-hole":
await ctx.send("Slide on down to my bot-hole for running commands.")
await ctx.message.add_reaction("")
return False
return True
async def bad_channel(ctx):
"""Check if current channel is in the list of bad channels for commands."""
bad_channels = ['paper-dynasty-chat', 'pd-news-ticker']
bad_channels = ["paper-dynasty-chat", "pd-news-ticker"]
if ctx.message.channel.name in bad_channels:
await ctx.message.add_reaction('')
bot_hole = discord.utils.get(
ctx.guild.text_channels,
name=f'pd-bot-hole'
)
await ctx.send(f'Slide on down to the {bot_hole.mention} ;)')
await ctx.message.add_reaction("")
bot_hole = discord.utils.get(ctx.guild.text_channels, name=f"pd-bot-hole")
await ctx.send(f"Slide on down to the {bot_hole.mention} ;)")
return True
else:
return False
@ -68,14 +68,11 @@ async def bad_channel(ctx):
def get_channel(ctx, name) -> Optional[discord.TextChannel]:
"""Get a text channel by name."""
# Handle both Context and Interaction objects
guild = ctx.guild if hasattr(ctx, 'guild') else None
guild = ctx.guild if hasattr(ctx, "guild") else None
if not guild:
return None
channel = discord.utils.get(
guild.text_channels,
name=name
)
channel = discord.utils.get(guild.text_channels, name=name)
if channel:
return channel
return None
@ -87,7 +84,7 @@ async def get_emoji(ctx, name, return_empty=True):
emoji = await commands.converter.EmojiConverter().convert(ctx, name)
except:
if return_empty:
emoji = ''
emoji = ""
else:
return name
return emoji
@ -101,9 +98,13 @@ async def react_and_reply(ctx, reaction, message):
async def send_to_channel(bot, channel_name, content=None, embed=None):
"""Send a message to a specific channel by name or ID."""
guild = bot.get_guild(int(os.environ.get('GUILD_ID')))
guild_id = os.environ.get("GUILD_ID")
if not guild_id:
logger.error("GUILD_ID env var is not set")
return
guild = bot.get_guild(int(guild_id))
if not guild:
logger.error('Cannot send to channel - bot not logged in')
logger.error("Cannot send to channel - bot not logged in")
return
this_channel = discord.utils.get(guild.text_channels, name=channel_name)
@ -111,7 +112,7 @@ async def send_to_channel(bot, channel_name, content=None, embed=None):
if not this_channel:
this_channel = discord.utils.get(guild.text_channels, id=channel_name)
if not this_channel:
raise NameError(f'**{channel_name}** channel not found')
raise NameError(f"**{channel_name}** channel not found")
return await this_channel.send(content=content, embed=embed)
@ -128,14 +129,16 @@ async def get_or_create_role(ctx, role_name, mentionable=True):
def get_special_embed(special):
"""Create an embed for a special item."""
embed = discord.Embed(title=f'{special.name} - Special #{special.get_id()}',
color=discord.Color.random(),
description=f'{special.short_desc}')
embed.add_field(name='Description', value=f'{special.long_desc}', inline=False)
if special.thumbnail.lower() != 'none':
embed.set_thumbnail(url=f'{special.thumbnail}')
if special.url.lower() != 'none':
embed.set_image(url=f'{special.url}')
embed = discord.Embed(
title=f"{special.name} - Special #{special.get_id()}",
color=discord.Color.random(),
description=f"{special.short_desc}",
)
embed.add_field(name="Description", value=f"{special.long_desc}", inline=False)
if special.thumbnail.lower() != "none":
embed.set_thumbnail(url=f"{special.thumbnail}")
if special.url.lower() != "none":
embed.set_image(url=f"{special.url}")
return embed
@ -154,99 +157,125 @@ def get_team_embed(title, team=None, thumbnail: bool = True):
if team:
embed = discord.Embed(
title=title,
color=int(team["color"], 16) if team["color"] else int(SBA_COLOR, 16)
color=int(team["color"], 16) if team["color"] else int(SBA_COLOR, 16),
)
embed.set_footer(
text=f'Paper Dynasty Season {team["season"]}', icon_url=IMAGES["logo"]
)
embed.set_footer(text=f'Paper Dynasty Season {team["season"]}', icon_url=IMAGES['logo'])
if thumbnail:
embed.set_thumbnail(url=team["logo"] if team["logo"] else IMAGES['logo'])
embed.set_thumbnail(url=team["logo"] if team["logo"] else IMAGES["logo"])
else:
embed = discord.Embed(
title=title,
color=int(SBA_COLOR, 16)
embed = discord.Embed(title=title, color=int(SBA_COLOR, 16))
embed.set_footer(
text=f"Paper Dynasty Season {PD_SEASON}", icon_url=IMAGES["logo"]
)
embed.set_footer(text=f'Paper Dynasty Season {PD_SEASON}', icon_url=IMAGES['logo'])
if thumbnail:
embed.set_thumbnail(url=IMAGES['logo'])
embed.set_thumbnail(url=IMAGES["logo"])
return embed
async def create_channel_old(
ctx, channel_name: str, category_name: str, everyone_send=False, everyone_read=True, allowed_members=None,
allowed_roles=None):
ctx,
channel_name: str,
category_name: str,
everyone_send=False,
everyone_read=True,
allowed_members=None,
allowed_roles=None,
):
"""Create a text channel with specified permissions (legacy version)."""
this_category = discord.utils.get(ctx.guild.categories, name=category_name)
if not this_category:
raise ValueError(f'I couldn\'t find a category named **{category_name}**')
raise ValueError(f"I couldn't find a category named **{category_name}**")
overwrites = {
ctx.guild.me: discord.PermissionOverwrite(read_messages=True, send_messages=True),
ctx.guild.default_role: discord.PermissionOverwrite(read_messages=everyone_read, send_messages=everyone_send)
ctx.guild.me: discord.PermissionOverwrite(
read_messages=True, send_messages=True
),
ctx.guild.default_role: discord.PermissionOverwrite(
read_messages=everyone_read, send_messages=everyone_send
),
}
if allowed_members:
if isinstance(allowed_members, list):
for member in allowed_members:
overwrites[member] = discord.PermissionOverwrite(read_messages=True, send_messages=True)
overwrites[member] = discord.PermissionOverwrite(
read_messages=True, send_messages=True
)
if allowed_roles:
if isinstance(allowed_roles, list):
for role in allowed_roles:
overwrites[role] = discord.PermissionOverwrite(read_messages=True, send_messages=True)
overwrites[role] = discord.PermissionOverwrite(
read_messages=True, send_messages=True
)
this_channel = await ctx.guild.create_text_channel(
channel_name,
overwrites=overwrites,
category=this_category
channel_name, overwrites=overwrites, category=this_category
)
logger.info(f'Creating channel ({channel_name}) in ({category_name})')
logger.info(f"Creating channel ({channel_name}) in ({category_name})")
return this_channel
async def create_channel(
ctx, channel_name: str, category_name: str, everyone_send=False, everyone_read=True,
read_send_members: list = None, read_send_roles: list = None, read_only_roles: list = None):
ctx,
channel_name: str,
category_name: str,
everyone_send=False,
everyone_read=True,
read_send_members: list = None,
read_send_roles: list = None,
read_only_roles: list = None,
):
"""Create a text channel with specified permissions."""
# Handle both Context and Interaction objects
guild = ctx.guild if hasattr(ctx, 'guild') else None
guild = ctx.guild if hasattr(ctx, "guild") else None
if not guild:
raise ValueError(f'Unable to access guild from context object')
raise ValueError(f"Unable to access guild from context object")
# Get bot member - different for Context vs Interaction
if hasattr(ctx, 'me'): # Context object
if hasattr(ctx, "me"): # Context object
bot_member = ctx.me
elif hasattr(ctx, 'client'): # Interaction object
elif hasattr(ctx, "client"): # Interaction object
bot_member = guild.get_member(ctx.client.user.id)
else:
# Fallback - try to find bot member by getting the first member with bot=True
bot_member = next((m for m in guild.members if m.bot), None)
if not bot_member:
raise ValueError(f'Unable to find bot member in guild')
raise ValueError(f"Unable to find bot member in guild")
this_category = discord.utils.get(guild.categories, name=category_name)
if not this_category:
raise ValueError(f'I couldn\'t find a category named **{category_name}**')
raise ValueError(f"I couldn't find a category named **{category_name}**")
overwrites = {
bot_member: discord.PermissionOverwrite(read_messages=True, send_messages=True),
guild.default_role: discord.PermissionOverwrite(read_messages=everyone_read, send_messages=everyone_send)
guild.default_role: discord.PermissionOverwrite(
read_messages=everyone_read, send_messages=everyone_send
),
}
if read_send_members:
for member in read_send_members:
overwrites[member] = discord.PermissionOverwrite(read_messages=True, send_messages=True)
overwrites[member] = discord.PermissionOverwrite(
read_messages=True, send_messages=True
)
if read_send_roles:
for role in read_send_roles:
overwrites[role] = discord.PermissionOverwrite(read_messages=True, send_messages=True)
overwrites[role] = discord.PermissionOverwrite(
read_messages=True, send_messages=True
)
if read_only_roles:
for role in read_only_roles:
overwrites[role] = discord.PermissionOverwrite(read_messages=True, send_messages=False)
overwrites[role] = discord.PermissionOverwrite(
read_messages=True, send_messages=False
)
this_channel = await guild.create_text_channel(
channel_name,
overwrites=overwrites,
category=this_category
channel_name, overwrites=overwrites, category=this_category
)
logger.info(f'Creating channel ({channel_name}) in ({category_name})')
logger.info(f"Creating channel ({channel_name}) in ({category_name})")
return this_channel
return this_channel

View File

@ -4,6 +4,7 @@ Discord Utilities
This module contains Discord helper functions for channels, roles, embeds,
and other Discord-specific operations.
"""
import logging
import os
import asyncio
@ -13,19 +14,21 @@ import discord
from discord.ext import commands
from helpers.constants import SBA_COLOR, PD_SEASON, IMAGES
logger = logging.getLogger('discord_app')
logger = logging.getLogger("discord_app")
async def send_to_bothole(ctx, content, embed):
"""Send a message to the pd-bot-hole channel."""
await discord.utils.get(ctx.guild.text_channels, name='pd-bot-hole') \
.send(content=content, embed=embed)
await discord.utils.get(ctx.guild.text_channels, name="pd-bot-hole").send(
content=content, embed=embed
)
async def send_to_news(ctx, content, embed):
"""Send a message to the pd-news-ticker channel."""
await discord.utils.get(ctx.guild.text_channels, name='pd-news-ticker') \
.send(content=content, embed=embed)
await discord.utils.get(ctx.guild.text_channels, name="pd-news-ticker").send(
content=content, embed=embed
)
async def typing_pause(ctx, seconds=1):
@ -43,23 +46,20 @@ async def pause_then_type(ctx, message):
async def check_if_pdhole(ctx):
"""Check if the current channel is pd-bot-hole."""
if ctx.message.channel.name != 'pd-bot-hole':
await ctx.send('Slide on down to my bot-hole for running commands.')
await ctx.message.add_reaction('')
if ctx.message.channel.name != "pd-bot-hole":
await ctx.send("Slide on down to my bot-hole for running commands.")
await ctx.message.add_reaction("")
return False
return True
async def bad_channel(ctx):
"""Check if current channel is in the list of bad channels for commands."""
bad_channels = ['paper-dynasty-chat', 'pd-news-ticker']
bad_channels = ["paper-dynasty-chat", "pd-news-ticker"]
if ctx.message.channel.name in bad_channels:
await ctx.message.add_reaction('')
bot_hole = discord.utils.get(
ctx.guild.text_channels,
name=f'pd-bot-hole'
)
await ctx.send(f'Slide on down to the {bot_hole.mention} ;)')
await ctx.message.add_reaction("")
bot_hole = discord.utils.get(ctx.guild.text_channels, name=f"pd-bot-hole")
await ctx.send(f"Slide on down to the {bot_hole.mention} ;)")
return True
else:
return False
@ -68,14 +68,11 @@ async def bad_channel(ctx):
def get_channel(ctx, name) -> Optional[discord.TextChannel]:
"""Get a text channel by name."""
# Handle both Context and Interaction objects
guild = ctx.guild if hasattr(ctx, 'guild') else None
guild = ctx.guild if hasattr(ctx, "guild") else None
if not guild:
return None
channel = discord.utils.get(
guild.text_channels,
name=name
)
channel = discord.utils.get(guild.text_channels, name=name)
if channel:
return channel
return None
@ -87,7 +84,7 @@ async def get_emoji(ctx, name, return_empty=True):
emoji = await commands.converter.EmojiConverter().convert(ctx, name)
except:
if return_empty:
emoji = ''
emoji = ""
else:
return name
return emoji
@ -101,9 +98,13 @@ async def react_and_reply(ctx, reaction, message):
async def send_to_channel(bot, channel_name, content=None, embed=None):
"""Send a message to a specific channel by name or ID."""
guild = bot.get_guild(int(os.environ.get('GUILD_ID')))
guild_id = os.environ.get("GUILD_ID")
if not guild_id:
logger.error("GUILD_ID env var is not set")
return
guild = bot.get_guild(int(guild_id))
if not guild:
logger.error('Cannot send to channel - bot not logged in')
logger.error("Cannot send to channel - bot not logged in")
return
this_channel = discord.utils.get(guild.text_channels, name=channel_name)
@ -111,7 +112,7 @@ async def send_to_channel(bot, channel_name, content=None, embed=None):
if not this_channel:
this_channel = discord.utils.get(guild.text_channels, id=channel_name)
if not this_channel:
raise NameError(f'**{channel_name}** channel not found')
raise NameError(f"**{channel_name}** channel not found")
return await this_channel.send(content=content, embed=embed)
@ -128,14 +129,16 @@ async def get_or_create_role(ctx, role_name, mentionable=True):
def get_special_embed(special):
"""Create an embed for a special item."""
embed = discord.Embed(title=f'{special.name} - Special #{special.get_id()}',
color=discord.Color.random(),
description=f'{special.short_desc}')
embed.add_field(name='Description', value=f'{special.long_desc}', inline=False)
if special.thumbnail.lower() != 'none':
embed.set_thumbnail(url=f'{special.thumbnail}')
if special.url.lower() != 'none':
embed.set_image(url=f'{special.url}')
embed = discord.Embed(
title=f"{special.name} - Special #{special.get_id()}",
color=discord.Color.random(),
description=f"{special.short_desc}",
)
embed.add_field(name="Description", value=f"{special.long_desc}", inline=False)
if special.thumbnail.lower() != "none":
embed.set_thumbnail(url=f"{special.thumbnail}")
if special.url.lower() != "none":
embed.set_image(url=f"{special.url}")
return embed
@ -154,99 +157,125 @@ def get_team_embed(title, team=None, thumbnail: bool = True):
if team:
embed = discord.Embed(
title=title,
color=int(team["color"], 16) if team["color"] else int(SBA_COLOR, 16)
color=int(team["color"], 16) if team["color"] else int(SBA_COLOR, 16),
)
embed.set_footer(
text=f'Paper Dynasty Season {team["season"]}', icon_url=IMAGES["logo"]
)
embed.set_footer(text=f'Paper Dynasty Season {team["season"]}', icon_url=IMAGES['logo'])
if thumbnail:
embed.set_thumbnail(url=team["logo"] if team["logo"] else IMAGES['logo'])
embed.set_thumbnail(url=team["logo"] if team["logo"] else IMAGES["logo"])
else:
embed = discord.Embed(
title=title,
color=int(SBA_COLOR, 16)
embed = discord.Embed(title=title, color=int(SBA_COLOR, 16))
embed.set_footer(
text=f"Paper Dynasty Season {PD_SEASON}", icon_url=IMAGES["logo"]
)
embed.set_footer(text=f'Paper Dynasty Season {PD_SEASON}', icon_url=IMAGES['logo'])
if thumbnail:
embed.set_thumbnail(url=IMAGES['logo'])
embed.set_thumbnail(url=IMAGES["logo"])
return embed
async def create_channel_old(
ctx, channel_name: str, category_name: str, everyone_send=False, everyone_read=True, allowed_members=None,
allowed_roles=None):
ctx,
channel_name: str,
category_name: str,
everyone_send=False,
everyone_read=True,
allowed_members=None,
allowed_roles=None,
):
"""Create a text channel with specified permissions (legacy version)."""
this_category = discord.utils.get(ctx.guild.categories, name=category_name)
if not this_category:
raise ValueError(f'I couldn\'t find a category named **{category_name}**')
raise ValueError(f"I couldn't find a category named **{category_name}**")
overwrites = {
ctx.guild.me: discord.PermissionOverwrite(read_messages=True, send_messages=True),
ctx.guild.default_role: discord.PermissionOverwrite(read_messages=everyone_read, send_messages=everyone_send)
ctx.guild.me: discord.PermissionOverwrite(
read_messages=True, send_messages=True
),
ctx.guild.default_role: discord.PermissionOverwrite(
read_messages=everyone_read, send_messages=everyone_send
),
}
if allowed_members:
if isinstance(allowed_members, list):
for member in allowed_members:
overwrites[member] = discord.PermissionOverwrite(read_messages=True, send_messages=True)
overwrites[member] = discord.PermissionOverwrite(
read_messages=True, send_messages=True
)
if allowed_roles:
if isinstance(allowed_roles, list):
for role in allowed_roles:
overwrites[role] = discord.PermissionOverwrite(read_messages=True, send_messages=True)
overwrites[role] = discord.PermissionOverwrite(
read_messages=True, send_messages=True
)
this_channel = await ctx.guild.create_text_channel(
channel_name,
overwrites=overwrites,
category=this_category
channel_name, overwrites=overwrites, category=this_category
)
logger.info(f'Creating channel ({channel_name}) in ({category_name})')
logger.info(f"Creating channel ({channel_name}) in ({category_name})")
return this_channel
async def create_channel(
ctx, channel_name: str, category_name: str, everyone_send=False, everyone_read=True,
read_send_members: list = None, read_send_roles: list = None, read_only_roles: list = None):
ctx,
channel_name: str,
category_name: str,
everyone_send=False,
everyone_read=True,
read_send_members: list = None,
read_send_roles: list = None,
read_only_roles: list = None,
):
"""Create a text channel with specified permissions."""
# Handle both Context and Interaction objects
guild = ctx.guild if hasattr(ctx, 'guild') else None
guild = ctx.guild if hasattr(ctx, "guild") else None
if not guild:
raise ValueError(f'Unable to access guild from context object')
raise ValueError(f"Unable to access guild from context object")
# Get bot member - different for Context vs Interaction
if hasattr(ctx, 'me'): # Context object
if hasattr(ctx, "me"): # Context object
bot_member = ctx.me
elif hasattr(ctx, 'client'): # Interaction object
elif hasattr(ctx, "client"): # Interaction object
bot_member = guild.get_member(ctx.client.user.id)
else:
# Fallback - try to find bot member by getting the first member with bot=True
bot_member = next((m for m in guild.members if m.bot), None)
if not bot_member:
raise ValueError(f'Unable to find bot member in guild')
raise ValueError(f"Unable to find bot member in guild")
this_category = discord.utils.get(guild.categories, name=category_name)
if not this_category:
raise ValueError(f'I couldn\'t find a category named **{category_name}**')
raise ValueError(f"I couldn't find a category named **{category_name}**")
overwrites = {
bot_member: discord.PermissionOverwrite(read_messages=True, send_messages=True),
guild.default_role: discord.PermissionOverwrite(read_messages=everyone_read, send_messages=everyone_send)
guild.default_role: discord.PermissionOverwrite(
read_messages=everyone_read, send_messages=everyone_send
),
}
if read_send_members:
for member in read_send_members:
overwrites[member] = discord.PermissionOverwrite(read_messages=True, send_messages=True)
overwrites[member] = discord.PermissionOverwrite(
read_messages=True, send_messages=True
)
if read_send_roles:
for role in read_send_roles:
overwrites[role] = discord.PermissionOverwrite(read_messages=True, send_messages=True)
overwrites[role] = discord.PermissionOverwrite(
read_messages=True, send_messages=True
)
if read_only_roles:
for role in read_only_roles:
overwrites[role] = discord.PermissionOverwrite(read_messages=True, send_messages=False)
overwrites[role] = discord.PermissionOverwrite(
read_messages=True, send_messages=False
)
this_channel = await guild.create_text_channel(
channel_name,
overwrites=overwrites,
category=this_category
channel_name, overwrites=overwrites, category=this_category
)
logger.info(f'Creating channel ({channel_name}) in ({category_name})')
logger.info(f"Creating channel ({channel_name}) in ({category_name})")
return this_channel
return this_channel

View File

@ -0,0 +1,154 @@
"""
Tests for evolution tier completion notification embeds (WP-14).
These are pure unit tests no database or Discord bot connection required.
Each test constructs embeds and asserts on title, description, color, and
footer to verify the notification design spec is met.
"""
import discord
from utilities.evolution_notifications import (
TIER_COLORS,
build_tier_embeds,
tier_up_embed,
)
class TestTierUpEmbed:
"""Unit tests for tier_up_embed() — standard (T1T3) and fully-evolved (T4) paths."""
def test_tier_up_title(self):
"""Standard tier-up embeds must use the 'Evolution Tier Up!' title."""
embed = tier_up_embed(
"Mike Trout", tier=2, tier_name="Rising", track_name="Batter"
)
assert embed.title == "Evolution Tier Up!"
def test_tier_up_description_format(self):
"""Description must include player name, tier number, tier name, and track name."""
embed = tier_up_embed(
"Mike Trout", tier=2, tier_name="Rising", track_name="Batter"
)
assert (
embed.description
== "Mike Trout reached Tier 2 (Rising) on the Batter track"
)
def test_tier_up_color_matches_tier(self):
"""Each tier must map to its specified embed color."""
for tier, expected_color in TIER_COLORS.items():
if tier == 4:
continue # T4 handled in fully-evolved tests
embed = tier_up_embed(
"Test Player", tier=tier, tier_name="Name", track_name="Batter"
)
assert embed.color.value == expected_color, f"Tier {tier} color mismatch"
def test_tier_up_no_footer_for_standard_tiers(self):
"""Standard tier-up embeds (T1T3) must not have a footer."""
for tier in (1, 2, 3):
embed = tier_up_embed(
"Test Player", tier=tier, tier_name="Name", track_name="Batter"
)
assert embed.footer.text is None
class TestFullyEvolvedEmbed:
"""Unit tests for the fully-evolved (T4) embed — distinct title, description, and footer."""
def test_fully_evolved_title(self):
"""T4 embeds must use the 'FULLY EVOLVED!' title."""
embed = tier_up_embed(
"Mike Trout", tier=4, tier_name="Legendary", track_name="Batter"
)
assert embed.title == "FULLY EVOLVED!"
def test_fully_evolved_description(self):
"""T4 description must indicate maximum evolution without mentioning tier number."""
embed = tier_up_embed(
"Mike Trout", tier=4, tier_name="Legendary", track_name="Batter"
)
assert (
embed.description
== "Mike Trout has reached maximum evolution on the Batter track"
)
def test_fully_evolved_footer(self):
"""T4 embeds must include the Phase 2 teaser footer."""
embed = tier_up_embed(
"Mike Trout", tier=4, tier_name="Legendary", track_name="Batter"
)
assert embed.footer.text == "Rating boosts coming in a future update!"
def test_fully_evolved_color(self):
"""T4 embed color must be teal."""
embed = tier_up_embed(
"Mike Trout", tier=4, tier_name="Legendary", track_name="Batter"
)
assert embed.color.value == TIER_COLORS[4]
class TestBuildTierEmbeds:
"""Unit tests for build_tier_embeds() — list construction and edge cases."""
def test_no_tier_ups_returns_empty_list(self):
"""When no tier-ups occurred, build_tier_embeds must return an empty list."""
result = build_tier_embeds([])
assert result == []
def test_single_tier_up_returns_one_embed(self):
"""A single tier-up event must produce exactly one embed."""
tier_ups = [
{
"player_name": "Mike Trout",
"tier": 2,
"tier_name": "Rising",
"track_name": "Batter",
}
]
result = build_tier_embeds(tier_ups)
assert len(result) == 1
assert isinstance(result[0], discord.Embed)
def test_multiple_tier_ups_return_separate_embeds(self):
"""Multiple tier-up events in one game must produce one embed per event."""
tier_ups = [
{
"player_name": "Mike Trout",
"tier": 2,
"tier_name": "Rising",
"track_name": "Batter",
},
{
"player_name": "Sandy Koufax",
"tier": 3,
"tier_name": "Elite",
"track_name": "Starter",
},
]
result = build_tier_embeds(tier_ups)
assert len(result) == 2
assert (
result[0].description
== "Mike Trout reached Tier 2 (Rising) on the Batter track"
)
assert (
result[1].description
== "Sandy Koufax reached Tier 3 (Elite) on the Starter track"
)
def test_fully_evolved_in_batch(self):
"""A T4 event in a batch must produce a fully-evolved embed, not a standard one."""
tier_ups = [
{
"player_name": "Babe Ruth",
"tier": 4,
"tier_name": "Legendary",
"track_name": "Batter",
}
]
result = build_tier_embeds(tier_ups)
assert len(result) == 1
assert result[0].title == "FULLY EVOLVED!"
assert result[0].footer.text == "Rating boosts coming in a future update!"

View File

@ -0,0 +1,59 @@
import discord
# Tier colors as Discord embed color integers
TIER_COLORS = {
1: 0x57F287, # green
2: 0xF1C40F, # gold
3: 0x9B59B6, # purple
4: 0x1ABC9C, # teal
}
MAX_TIER = 4
def tier_up_embed(
player_name: str, tier: int, tier_name: str, track_name: str
) -> discord.Embed:
"""
Build a Discord embed for a single evolution tier-up event.
For tier 4 (fully evolved), uses a distinct title, description, and footer.
For tiers 13, uses the standard tier-up format.
"""
color = TIER_COLORS.get(tier, 0xFFFFFF)
if tier == MAX_TIER:
embed = discord.Embed(
title="FULLY EVOLVED!",
description=f"{player_name} has reached maximum evolution on the {track_name} track",
color=color,
)
embed.set_footer(text="Rating boosts coming in a future update!")
else:
embed = discord.Embed(
title="Evolution Tier Up!",
description=f"{player_name} reached Tier {tier} ({tier_name}) on the {track_name} track",
color=color,
)
return embed
def build_tier_embeds(tier_ups: list) -> list:
"""
Build a list of Discord embeds for all tier-up events in a game.
Each item in tier_ups should be a dict with keys:
player_name (str), tier (int), tier_name (str), track_name (str)
Returns an empty list if there are no tier-ups.
"""
return [
tier_up_embed(
player_name=t["player_name"],
tier=t["tier"],
tier_name=t["tier_name"],
track_name=t["track_name"],
)
for t in tier_ups
]