Compare commits

..

1 Commits

Author SHA1 Message Date
Cal Corum
61f36353d8 perf: add caching for frequently-accessed stable data (#91)
Closes #91

- league_service.get_current_state(): @cached_single_item(ttl=60) — 60s Redis cache
- standings_service.get_league_standings(): in-memory dict cache with 10-minute TTL keyed by season
- player_service.get_free_agents(): @cached_api_call(ttl=300) — 5-minute Redis cache
- dice/rolls.py _get_channel_embed_color(): in-memory dict cache keyed by channel_id with 5-minute TTL, matching the autocomplete.py pattern from PR #100

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-31 19:41:31 +00:00
11 changed files with 693 additions and 743 deletions

View File

@ -3,7 +3,10 @@ Dice Rolling Commands
Implements slash commands for dice rolling functionality required for gameplay.
"""
import random
import time
from typing import Dict, Tuple
import discord
from discord.ext import commands
@ -13,7 +16,11 @@ from utils.logging import get_contextual_logger
from utils.decorators import logged_command
from utils.team_utils import get_user_major_league_team
from utils.text_utils import split_text_for_fields
from utils.dice_utils import DiceRoll, parse_and_roll_multiple_dice, parse_and_roll_single_dice
from utils.dice_utils import (
DiceRoll,
parse_and_roll_multiple_dice,
parse_and_roll_single_dice,
)
from views.embeds import EmbedColors, EmbedTemplate
from commands.dev.loaded_dice import get_and_consume_loaded_roll
from .chart_data import (
@ -33,26 +40,27 @@ from .chart_data import (
PITCHER_ERRORS,
)
# In-memory cache for channel embed color lookups: channel_id -> (color, cached_at)
_channel_color_cache: Dict[int, Tuple[int, float]] = {}
_CHANNEL_COLOR_CACHE_TTL = 300 # 5 minutes
class DiceRollCommands(commands.Cog):
"""Dice rolling command handlers for gameplay."""
def __init__(self, bot: commands.Bot):
self.bot = bot
self.logger = get_contextual_logger(f'{__name__}.DiceRollCommands')
self.logger = get_contextual_logger(f"{__name__}.DiceRollCommands")
@discord.app_commands.command(
name="roll",
description="Roll polyhedral dice using XdY notation (e.g., 2d6, 1d20, 3d8)"
description="Roll polyhedral dice using XdY notation (e.g., 2d6, 1d20, 3d8)",
)
@discord.app_commands.describe(
dice="Dice notation - single or multiple separated by semicolon (e.g., 2d6, 1d20;2d6;1d6)"
)
@logged_command("/roll")
async def roll_dice(
self,
interaction: discord.Interaction,
dice: str
):
async def roll_dice(self, interaction: discord.Interaction, dice: str):
"""Roll dice using standard XdY dice notation. Supports multiple rolls separated by semicolon."""
await interaction.response.defer()
@ -61,7 +69,7 @@ class DiceRollCommands(commands.Cog):
if not roll_results:
await interaction.followup.send(
"❌ Invalid dice notation. Use format like: 2d6, 1d20, or 1d6;2d6;1d20",
ephemeral=True
ephemeral=True,
)
return
@ -72,18 +80,24 @@ class DiceRollCommands(commands.Cog):
@commands.command(name="roll", aliases=["r", "dice"])
async def roll_dice_prefix(self, ctx: commands.Context, *, dice: str | None = None):
"""Roll dice using prefix commands (!roll, !r, !dice)."""
self.logger.info(f"Prefix roll command started by {ctx.author.display_name}", dice_input=dice)
self.logger.info(
f"Prefix roll command started by {ctx.author.display_name}", dice_input=dice
)
if dice is None:
self.logger.debug("No dice input provided")
await ctx.send("❌ Please provide dice notation. Usage: `!roll 2d6` or `!roll 1d6;2d6;1d20`")
await ctx.send(
"❌ Please provide dice notation. Usage: `!roll 2d6` or `!roll 1d6;2d6;1d20`"
)
return
# Parse and validate dice notation (supports multiple rolls)
roll_results = parse_and_roll_multiple_dice(dice)
if not roll_results:
self.logger.warning("Invalid dice notation provided", dice_input=dice)
await ctx.send("❌ Invalid dice notation. Use format like: 2d6, 1d20, or 1d6;2d6;1d20")
await ctx.send(
"❌ Invalid dice notation. Use format like: 2d6, 1d20, or 1d6;2d6;1d20"
)
return
self.logger.info(f"Dice rolled successfully", roll_count=len(roll_results))
@ -92,10 +106,7 @@ class DiceRollCommands(commands.Cog):
embed = self._create_multi_roll_embed(dice, roll_results, ctx.author)
await ctx.send(embed=embed)
@discord.app_commands.command(
name="d20",
description="Roll a single d20"
)
@discord.app_commands.command(name="d20", description="Roll a single d20")
@logged_command("/d20")
async def d20_dice(self, interaction: discord.Interaction):
"""Roll a single d20."""
@ -112,15 +123,14 @@ class DiceRollCommands(commands.Cog):
roll_results,
interaction.user,
set_author=False,
embed_color=embed_color
embed_color=embed_color,
)
embed.title = f'd20 roll for {interaction.user.display_name}'
embed.title = f"d20 roll for {interaction.user.display_name}"
await interaction.followup.send(embed=embed)
@discord.app_commands.command(
name="ab",
description="Roll baseball at-bat dice (1d6;2d6;1d20)"
name="ab", description="Roll baseball at-bat dice (1d6;2d6;1d20)"
)
@logged_command("/ab")
async def ab_dice(self, interaction: discord.Interaction):
@ -137,7 +147,7 @@ class DiceRollCommands(commands.Cog):
# Create DiceRoll objects from loaded values
# For 2d6, we split the total into two dice (arbitrary split that sums correctly)
d6_2a = min(loaded.d6_2_total - 1, 6) # First die (max 6)
d6_2b = loaded.d6_2_total - d6_2a # Second die gets remainder
d6_2b = loaded.d6_2_total - d6_2a # Second die gets remainder
roll_results = [
DiceRoll("1d6", 1, 6, [loaded.d6_1], loaded.d6_1),
DiceRoll("2d6", 2, 6, [d6_2a, d6_2b], loaded.d6_2_total),
@ -149,17 +159,19 @@ class DiceRollCommands(commands.Cog):
dice_notation = "1d6;2d6;1d20"
roll_results = parse_and_roll_multiple_dice(dice_notation)
injury_risk = (roll_results[0].total == 6) and (roll_results[1].total in [7, 8, 9, 10, 11, 12])
injury_risk = (roll_results[0].total == 6) and (
roll_results[1].total in [7, 8, 9, 10, 11, 12]
)
d6_total = roll_results[1].total
embed_title = 'At bat roll'
embed_title = "At bat roll"
if roll_results[2].total == 1:
embed_title = 'Wild pitch roll'
dice_notation = '1d20'
embed_title = "Wild pitch roll"
dice_notation = "1d20"
roll_results = [parse_and_roll_single_dice(dice_notation)]
elif roll_results[2].total == 2:
embed_title = 'PB roll'
dice_notation = '1d20'
embed_title = "PB roll"
dice_notation = "1d20"
roll_results = [parse_and_roll_single_dice(dice_notation)]
# Create embed for the roll results
@ -168,15 +180,15 @@ class DiceRollCommands(commands.Cog):
roll_results,
interaction.user,
set_author=False,
embed_color=embed_color
embed_color=embed_color,
)
embed.title = f'{embed_title} for {interaction.user.display_name}'
embed.title = f"{embed_title} for {interaction.user.display_name}"
if injury_risk and embed_title == 'At bat roll':
if injury_risk and embed_title == "At bat roll":
embed.add_field(
name=f'Check injury for pitcher injury rating {13 - d6_total}',
value='Oops! All injuries!',
inline=False
name=f"Check injury for pitcher injury rating {13 - d6_total}",
value="Oops! All injuries!",
inline=False,
)
await interaction.followup.send(embed=embed)
@ -188,35 +200,43 @@ class DiceRollCommands(commands.Cog):
team = await get_user_major_league_team(user_id=ctx.author.id)
embed_color = EmbedColors.PRIMARY
if team is not None and team.color is not None:
embed_color = int(team.color,16)
embed_color = int(team.color, 16)
# Use the standard baseball dice combination
dice_notation = "1d6;2d6;1d20"
roll_results = parse_and_roll_multiple_dice(dice_notation)
self.logger.info("At Bat dice rolled successfully", roll_count=len(roll_results))
self.logger.info(
"At Bat dice rolled successfully", roll_count=len(roll_results)
)
# Create embed for the roll results
embed = self._create_multi_roll_embed(dice_notation, roll_results, ctx.author, set_author=False, embed_color=embed_color)
embed.title = f'At bat roll for {ctx.author.display_name}'
embed = self._create_multi_roll_embed(
dice_notation,
roll_results,
ctx.author,
set_author=False,
embed_color=embed_color,
)
embed.title = f"At bat roll for {ctx.author.display_name}"
await ctx.send(embed=embed)
@discord.app_commands.command(
name="scout",
description="Roll weighted scouting dice (1d6;2d6;1d20) based on card type"
description="Roll weighted scouting dice (1d6;2d6;1d20) based on card type",
)
@discord.app_commands.describe(
card_type="Type of card being scouted"
@discord.app_commands.describe(card_type="Type of card being scouted")
@discord.app_commands.choices(
card_type=[
discord.app_commands.Choice(name="Batter", value="batter"),
discord.app_commands.Choice(name="Pitcher", value="pitcher"),
]
)
@discord.app_commands.choices(card_type=[
discord.app_commands.Choice(name="Batter", value="batter"),
discord.app_commands.Choice(name="Pitcher", value="pitcher")
])
@logged_command("/scout")
async def scout_dice(
self,
interaction: discord.Interaction,
card_type: discord.app_commands.Choice[str]
card_type: discord.app_commands.Choice[str],
):
"""Roll weighted scouting dice based on card type (batter or pitcher)."""
await interaction.response.defer()
@ -228,33 +248,35 @@ class DiceRollCommands(commands.Cog):
roll_results = self._roll_weighted_scout_dice(card_type_value)
# Create embed for the roll results
embed = self._create_multi_roll_embed("1d6;2d6;1d20", roll_results, interaction.user, set_author=False)
embed.title = f'Scouting roll for {interaction.user.display_name}'
embed = self._create_multi_roll_embed(
"1d6;2d6;1d20", roll_results, interaction.user, set_author=False
)
embed.title = f"Scouting roll for {interaction.user.display_name}"
await interaction.followup.send(embed=embed)
@discord.app_commands.command(
name="fielding",
description="Roll Super Advanced fielding dice for a defensive position"
description="Roll Super Advanced fielding dice for a defensive position",
)
@discord.app_commands.describe(
position="Defensive position"
@discord.app_commands.describe(position="Defensive position")
@discord.app_commands.choices(
position=[
discord.app_commands.Choice(name="Pitcher (P)", value="P"),
discord.app_commands.Choice(name="Catcher (C)", value="C"),
discord.app_commands.Choice(name="First Base (1B)", value="1B"),
discord.app_commands.Choice(name="Second Base (2B)", value="2B"),
discord.app_commands.Choice(name="Third Base (3B)", value="3B"),
discord.app_commands.Choice(name="Shortstop (SS)", value="SS"),
discord.app_commands.Choice(name="Left Field (LF)", value="LF"),
discord.app_commands.Choice(name="Center Field (CF)", value="CF"),
discord.app_commands.Choice(name="Right Field (RF)", value="RF"),
]
)
@discord.app_commands.choices(position=[
discord.app_commands.Choice(name="Pitcher (P)", value="P"),
discord.app_commands.Choice(name="Catcher (C)", value="C"),
discord.app_commands.Choice(name="First Base (1B)", value="1B"),
discord.app_commands.Choice(name="Second Base (2B)", value="2B"),
discord.app_commands.Choice(name="Third Base (3B)", value="3B"),
discord.app_commands.Choice(name="Shortstop (SS)", value="SS"),
discord.app_commands.Choice(name="Left Field (LF)", value="LF"),
discord.app_commands.Choice(name="Center Field (CF)", value="CF"),
discord.app_commands.Choice(name="Right Field (RF)", value="RF")
])
@logged_command("/fielding")
async def fielding_roll(
self,
interaction: discord.Interaction,
position: discord.app_commands.Choice[str]
position: discord.app_commands.Choice[str],
):
"""Roll Super Advanced fielding dice for a defensive position."""
await interaction.response.defer()
@ -268,16 +290,25 @@ class DiceRollCommands(commands.Cog):
roll_results = parse_and_roll_multiple_dice(dice_notation)
# Create fielding embed
embed = self._create_fielding_embed(pos_value, roll_results, interaction.user, embed_color)
embed = self._create_fielding_embed(
pos_value, roll_results, interaction.user, embed_color
)
await interaction.followup.send(embed=embed)
@commands.command(name="f", aliases=["fielding", "saf"])
async def fielding_roll_prefix(self, ctx: commands.Context, position: str | None = None):
async def fielding_roll_prefix(
self, ctx: commands.Context, position: str | None = None
):
"""Roll Super Advanced fielding dice using prefix commands (!f, !fielding, !saf)."""
self.logger.info(f"SA Fielding command started by {ctx.author.display_name}", position=position)
self.logger.info(
f"SA Fielding command started by {ctx.author.display_name}",
position=position,
)
if position is None:
await ctx.send("❌ Please specify a position. Usage: `!f 3B` or `!fielding SS`")
await ctx.send(
"❌ Please specify a position. Usage: `!f 3B` or `!fielding SS`"
)
return
# Parse and validate position
@ -290,15 +321,19 @@ class DiceRollCommands(commands.Cog):
dice_notation = "1d20;3d6;1d100"
roll_results = parse_and_roll_multiple_dice(dice_notation)
self.logger.info("SA Fielding dice rolled successfully", position=parsed_position, d20=roll_results[0].total, d6_total=roll_results[1].total)
self.logger.info(
"SA Fielding dice rolled successfully",
position=parsed_position,
d20=roll_results[0].total,
d6_total=roll_results[1].total,
)
# Create fielding embed
embed = self._create_fielding_embed(parsed_position, roll_results, ctx.author)
await ctx.send(embed=embed)
@discord.app_commands.command(
name="jump",
description="Roll for baserunner's jump before stealing"
name="jump", description="Roll for baserunner's jump before stealing"
)
@logged_command("/jump")
async def jump_dice(self, interaction: discord.Interaction):
@ -322,7 +357,7 @@ class DiceRollCommands(commands.Cog):
resolution_roll,
interaction.user,
embed_color,
show_author=False
show_author=False,
)
await interaction.followup.send(embed=embed)
@ -344,31 +379,47 @@ class DiceRollCommands(commands.Cog):
# Roll another 1d20 for pickoff/balk resolution
resolution_roll = random.randint(1, 20)
self.logger.info("Jump dice rolled successfully", check=check_roll, jump=jump_result.total if jump_result else None, resolution=resolution_roll)
self.logger.info(
"Jump dice rolled successfully",
check=check_roll,
jump=jump_result.total if jump_result else None,
resolution=resolution_roll,
)
# Create embed based on check roll
embed = self._create_jump_embed(
check_roll,
jump_result,
resolution_roll,
ctx.author,
embed_color
check_roll, jump_result, resolution_roll, ctx.author, embed_color
)
await ctx.send(embed=embed)
async def _get_channel_embed_color(self, interaction: discord.Interaction) -> int:
channel_id = interaction.channel_id
if channel_id is not None and channel_id in _channel_color_cache:
cached_color, cached_at = _channel_color_cache[channel_id]
if time.time() - cached_at < _CHANNEL_COLOR_CACHE_TTL:
return cached_color
# Check if channel is a type that has a name attribute (DMChannel doesn't have one)
if isinstance(interaction.channel, (discord.TextChannel, discord.VoiceChannel, discord.Thread)):
if isinstance(
interaction.channel,
(discord.TextChannel, discord.VoiceChannel, discord.Thread),
):
channel_starter = interaction.channel.name[:6]
if '-' in channel_starter:
abbrev = channel_starter.split('-')[0]
if "-" in channel_starter:
abbrev = channel_starter.split("-")[0]
channel_team = await team_service.get_team_by_abbrev(abbrev)
if channel_team is not None and channel_team.color is not None:
return int(channel_team.color,16)
color = int(channel_team.color, 16)
if channel_id is not None:
_channel_color_cache[channel_id] = (color, time.time())
return color
team = await get_user_major_league_team(user_id=interaction.user.id)
if team is not None and team.color is not None:
return int(team.color,16)
color = int(team.color, 16)
if channel_id is not None:
_channel_color_cache[channel_id] = (color, time.time())
return color
return EmbedColors.PRIMARY
@ -381,25 +432,44 @@ class DiceRollCommands(commands.Cog):
# Map common inputs to standard position names
position_map = {
'P': 'P', 'PITCHER': 'P',
'C': 'C', 'CATCHER': 'C',
'1': '1B', '1B': '1B', 'FIRST': '1B', 'FIRSTBASE': '1B',
'2': '2B', '2B': '2B', 'SECOND': '2B', 'SECONDBASE': '2B',
'3': '3B', '3B': '3B', 'THIRD': '3B', 'THIRDBASE': '3B',
'SS': 'SS', 'SHORT': 'SS', 'SHORTSTOP': 'SS',
'LF': 'LF', 'LEFT': 'LF', 'LEFTFIELD': 'LF',
'CF': 'CF', 'CENTER': 'CF', 'CENTERFIELD': 'CF',
'RF': 'RF', 'RIGHT': 'RF', 'RIGHTFIELD': 'RF'
"P": "P",
"PITCHER": "P",
"C": "C",
"CATCHER": "C",
"1": "1B",
"1B": "1B",
"FIRST": "1B",
"FIRSTBASE": "1B",
"2": "2B",
"2B": "2B",
"SECOND": "2B",
"SECONDBASE": "2B",
"3": "3B",
"3B": "3B",
"THIRD": "3B",
"THIRDBASE": "3B",
"SS": "SS",
"SHORT": "SS",
"SHORTSTOP": "SS",
"LF": "LF",
"LEFT": "LF",
"LEFTFIELD": "LF",
"CF": "CF",
"CENTER": "CF",
"CENTERFIELD": "CF",
"RF": "RF",
"RIGHT": "RF",
"RIGHTFIELD": "RF",
}
return position_map.get(pos)
def _create_fielding_embed(
self,
position: str,
roll_results: list[DiceRoll],
user: discord.User | discord.Member,
embed_color: int = EmbedColors.PRIMARY
self,
position: str,
roll_results: list[DiceRoll],
user: discord.User | discord.Member,
embed_color: int = EmbedColors.PRIMARY,
) -> discord.Embed:
"""Create an embed for fielding roll results."""
d20_result = roll_results[0].total
@ -409,15 +479,11 @@ class DiceRollCommands(commands.Cog):
# Create base embed
embed = EmbedTemplate.create_base_embed(
title=f"SA Fielding roll for {user.display_name}",
color=embed_color
title=f"SA Fielding roll for {user.display_name}", color=embed_color
)
# Set user info
embed.set_author(
name=user.display_name,
icon_url=user.display_avatar.url
)
embed.set_author(name=user.display_name, icon_url=user.display_avatar.url)
# Add dice results in standard format
dice_notation = "1d20;3d6"
@ -425,18 +491,14 @@ class DiceRollCommands(commands.Cog):
# Extract just the dice result part from the field
dice_field_value = embed_dice.fields[0].value
embed.add_field(
name="Dice Results",
value=dice_field_value,
inline=False
)
embed.add_field(name="Dice Results", value=dice_field_value, inline=False)
# Add fielding check summary
range_result = self._get_range_result(position, d20_result)
embed.add_field(
name=f"{position} Range Result",
value=f"```md\n 1 | 2 | 3 | 4 | 5\n{range_result}```",
inline=False
inline=False,
)
# Add rare play or error result
@ -457,19 +519,15 @@ class DiceRollCommands(commands.Cog):
field_name = base_field_name
# Add part indicator if multiple chunks
if len(result_chunks) > 1:
field_name += f" (Part {i+1}/{len(result_chunks)})"
field_name += f" (Part {i + 1}/{len(result_chunks)})"
embed.add_field(
name=field_name,
value=chunk,
inline=False
)
embed.add_field(name=field_name, value=chunk, inline=False)
# Add help commands
embed.add_field(
name="Help Commands",
value="Run /charts for full chart readout",
inline=False
inline=False,
)
# # Add references
@ -488,125 +546,115 @@ class DiceRollCommands(commands.Cog):
resolution_roll: int,
user: discord.User | discord.Member,
embed_color: int = EmbedColors.PRIMARY,
show_author: bool = True
show_author: bool = True,
) -> discord.Embed:
"""Create an embed for jump roll results."""
# Create base embed
embed = EmbedTemplate.create_base_embed(
title=f"Jump roll for {user.name}",
color=embed_color
title=f"Jump roll for {user.name}", color=embed_color
)
if show_author:
# Set user info
embed.set_author(
name=user.name,
icon_url=user.display_avatar.url
)
embed.set_author(name=user.name, icon_url=user.display_avatar.url)
# Check for pickoff or balk
if check_roll == 1:
# Pickoff attempt
embed.add_field(
name="Special",
value="```md\nCheck pickoff```",
inline=False
name="Special", value="```md\nCheck pickoff```", inline=False
)
embed.add_field(
name="Pickoff roll",
value=f"```md\n# {resolution_roll}\nDetails:[1d20 ({resolution_roll})]```",
inline=False
inline=False,
)
elif check_roll == 2:
# Balk
embed.add_field(
name="Special",
value="```md\nCheck balk```",
inline=False
)
embed.add_field(name="Special", value="```md\nCheck balk```", inline=False)
embed.add_field(
name="Balk roll",
value=f"```md\n# {resolution_roll}\nDetails:[1d20 ({resolution_roll})]```",
inline=False
inline=False,
)
else:
# Normal jump - show 2d6 result
if jump_result:
rolls_str = ' '.join(str(r) for r in jump_result.rolls)
rolls_str = " ".join(str(r) for r in jump_result.rolls)
embed.add_field(
name="Result",
value=f"```md\n# {jump_result.total}\nDetails:[2d6 ({rolls_str})]```",
inline=False
inline=False,
)
return embed
def _get_range_result(self, position: str, d20_roll: int) -> str:
"""Get the range result display for a position and d20 roll."""
if position == 'P':
if position == "P":
return self._get_pitcher_range(d20_roll)
elif position in ['1B', '2B', '3B', 'SS']:
elif position in ["1B", "2B", "3B", "SS"]:
return self._get_infield_range(d20_roll)
elif position in ['LF', 'CF', 'RF']:
elif position in ["LF", "CF", "RF"]:
return self._get_outfield_range(d20_roll)
elif position == 'C':
elif position == "C":
return self._get_catcher_range(d20_roll)
return "Unknown position"
def _get_infield_range(self, d20_roll: int) -> str:
"""Get infield range result based on d20 roll."""
return INFIELD_RANGES.get(d20_roll, 'Unknown')
return INFIELD_RANGES.get(d20_roll, "Unknown")
def _get_outfield_range(self, d20_roll: int) -> str:
"""Get outfield range result based on d20 roll."""
return OUTFIELD_RANGES.get(d20_roll, 'Unknown')
return OUTFIELD_RANGES.get(d20_roll, "Unknown")
def _get_catcher_range(self, d20_roll: int) -> str:
"""Get catcher range result based on d20 roll."""
return CATCHER_RANGES.get(d20_roll, 'Unknown')
return CATCHER_RANGES.get(d20_roll, "Unknown")
def _get_pitcher_range(self, d20_roll: int) -> str:
"""Get pitcher range result based on d20 roll."""
return PITCHER_RANGES.get(d20_roll, 'Unknown')
return PITCHER_RANGES.get(d20_roll, "Unknown")
def _get_rare_play(self, position: str, d20_total: int) -> str:
"""Get the rare play result for a position and d20 total"""
starter = 'Rare play! Take the range result from above and consult the chart below.\n\n'
if position == 'P':
starter = "Rare play! Take the range result from above and consult the chart below.\n\n"
if position == "P":
return starter + self._get_pitcher_rare_play(d20_total)
elif position == '1B':
elif position == "1B":
return starter + self._get_infield_rare_play(d20_total)
elif position == '2B':
elif position == "2B":
return starter + self._get_infield_rare_play(d20_total)
elif position == '3B':
elif position == "3B":
return starter + self._get_infield_rare_play(d20_total)
elif position == 'SS':
elif position == "SS":
return starter + self._get_infield_rare_play(d20_total)
elif position in ['LF', 'RF']:
elif position in ["LF", "RF"]:
return starter + self._get_outfield_rare_play(d20_total)
elif position == 'CF':
elif position == "CF":
return starter + self._get_outfield_rare_play(d20_total)
raise ValueError(f'Unknown position: {position}')
raise ValueError(f"Unknown position: {position}")
def _get_error_result(self, position: str, d6_total: int) -> str:
"""Get the error result for a position and 3d6 total."""
# Get the appropriate error chart
if position == 'P':
if position == "P":
return self._get_pitcher_error(d6_total)
elif position == '1B':
elif position == "1B":
return self._get_1b_error(d6_total)
elif position == '2B':
elif position == "2B":
return self._get_2b_error(d6_total)
elif position == '3B':
elif position == "3B":
return self._get_3b_error(d6_total)
elif position == 'SS':
elif position == "SS":
return self._get_ss_error(d6_total)
elif position in ['LF', 'RF']:
elif position in ["LF", "RF"]:
return self._get_corner_of_error(d6_total)
elif position == 'CF':
elif position == "CF":
return self._get_cf_error(d6_total)
elif position == 'C':
elif position == "C":
return self._get_catcher_error(d6_total)
# Should never reach here due to position validation, but follow "Raise or Return" pattern
@ -614,65 +662,64 @@ class DiceRollCommands(commands.Cog):
def _get_3b_error(self, d6_total: int) -> str:
"""Get 3B error result based on 3d6 total."""
return THIRD_BASE_ERRORS.get(d6_total, 'No error')
return THIRD_BASE_ERRORS.get(d6_total, "No error")
def _get_1b_error(self, d6_total: int) -> str:
"""Get 1B error result based on 3d6 total."""
return FIRST_BASE_ERRORS.get(d6_total, 'No error')
return FIRST_BASE_ERRORS.get(d6_total, "No error")
def _get_2b_error(self, d6_total: int) -> str:
"""Get 2B error result based on 3d6 total."""
return SECOND_BASE_ERRORS.get(d6_total, 'No error')
return SECOND_BASE_ERRORS.get(d6_total, "No error")
def _get_ss_error(self, d6_total: int) -> str:
"""Get SS error result based on 3d6 total."""
return SHORTSTOP_ERRORS.get(d6_total, 'No error')
return SHORTSTOP_ERRORS.get(d6_total, "No error")
def _get_corner_of_error(self, d6_total: int) -> str:
"""Get LF/RF error result based on 3d6 total."""
return CORNER_OUTFIELD_ERRORS.get(d6_total, 'No error')
return CORNER_OUTFIELD_ERRORS.get(d6_total, "No error")
def _get_cf_error(self, d6_total: int) -> str:
"""Get CF error result based on 3d6 total."""
return CENTER_FIELD_ERRORS.get(d6_total, 'No error')
return CENTER_FIELD_ERRORS.get(d6_total, "No error")
def _get_catcher_error(self, d6_total: int) -> str:
"""Get Catcher error result based on 3d6 total."""
return CATCHER_ERRORS.get(d6_total, 'No error')
return CATCHER_ERRORS.get(d6_total, "No error")
def _get_pitcher_error(self, d6_total: int) -> str:
"""Get Pitcher error result based on 3d6 total."""
return PITCHER_ERRORS.get(d6_total, 'No error')
return PITCHER_ERRORS.get(d6_total, "No error")
def _get_pitcher_rare_play(self, d20_total: int) -> str:
return (
f'**G3**: {INFIELD_X_CHART["g3"]["rp"]}\n'
f'**G2**: {INFIELD_X_CHART["g2"]["rp"]}\n'
f'**G1**: {INFIELD_X_CHART["g1"]["rp"]}\n'
f'**SI1**: {INFIELD_X_CHART["si1"]["rp"]}\n'
f"**G3**: {INFIELD_X_CHART['g3']['rp']}\n"
f"**G2**: {INFIELD_X_CHART['g2']['rp']}\n"
f"**G1**: {INFIELD_X_CHART['g1']['rp']}\n"
f"**SI1**: {INFIELD_X_CHART['si1']['rp']}\n"
)
def _get_infield_rare_play(self, d20_total: int) -> str:
return (
f'**G3**: {INFIELD_X_CHART["g3"]["rp"]}\n'
f'**G2**: {INFIELD_X_CHART["g2"]["rp"]}\n'
f'**G1**: {INFIELD_X_CHART["g1"]["rp"]}\n'
f'**SI1**: {INFIELD_X_CHART["si1"]["rp"]}\n'
f'**SI2**: {OUTFIELD_X_CHART["si2"]["rp"]}\n'
f"**G3**: {INFIELD_X_CHART['g3']['rp']}\n"
f"**G2**: {INFIELD_X_CHART['g2']['rp']}\n"
f"**G1**: {INFIELD_X_CHART['g1']['rp']}\n"
f"**SI1**: {INFIELD_X_CHART['si1']['rp']}\n"
f"**SI2**: {OUTFIELD_X_CHART['si2']['rp']}\n"
)
def _get_outfield_rare_play(self, d20_total: int) -> str:
return (
f'**F1**: {OUTFIELD_X_CHART["f1"]["rp"]}\n'
f'**F2**: {OUTFIELD_X_CHART["f2"]["rp"]}\n'
f'**F3**: {OUTFIELD_X_CHART["f3"]["rp"]}\n'
f'**SI2**: {OUTFIELD_X_CHART["si2"]["rp"]}\n'
f'**DO2**: {OUTFIELD_X_CHART["do2"]["rp"]}\n'
f'**DO3**: {OUTFIELD_X_CHART["do3"]["rp"]}\n'
f'**TR3**: {OUTFIELD_X_CHART["tr3"]["rp"]}\n'
f"**F1**: {OUTFIELD_X_CHART['f1']['rp']}\n"
f"**F2**: {OUTFIELD_X_CHART['f2']['rp']}\n"
f"**F3**: {OUTFIELD_X_CHART['f3']['rp']}\n"
f"**SI2**: {OUTFIELD_X_CHART['si2']['rp']}\n"
f"**DO2**: {OUTFIELD_X_CHART['do2']['rp']}\n"
f"**DO3**: {OUTFIELD_X_CHART['do3']['rp']}\n"
f"**TR3**: {OUTFIELD_X_CHART['tr3']['rp']}\n"
)
def _roll_weighted_scout_dice(self, card_type: str) -> list[DiceRoll]:
"""
Roll scouting dice with weighted first d6 based on card type.
@ -690,11 +737,11 @@ class DiceRollCommands(commands.Cog):
first_roll = random.randint(4, 6)
first_d6_result = DiceRoll(
dice_notation='1d6',
dice_notation="1d6",
num_dice=1,
die_sides=6,
rolls=[first_roll],
total=first_roll
total=first_roll,
)
# Second roll (2d6) - normal
@ -705,19 +752,20 @@ class DiceRollCommands(commands.Cog):
return [first_d6_result, second_result, third_result]
def _create_multi_roll_embed(self, dice_notation: str, roll_results: list[DiceRoll], user: discord.User | discord.Member, set_author: bool = True, embed_color: int = EmbedColors.PRIMARY) -> discord.Embed:
def _create_multi_roll_embed(
self,
dice_notation: str,
roll_results: list[DiceRoll],
user: discord.User | discord.Member,
set_author: bool = True,
embed_color: int = EmbedColors.PRIMARY,
) -> discord.Embed:
"""Create an embed for multiple dice roll results."""
embed = EmbedTemplate.create_base_embed(
title="🎲 Dice Roll",
color=embed_color
)
embed = EmbedTemplate.create_base_embed(title="🎲 Dice Roll", color=embed_color)
if set_author:
# Set user info
embed.set_author(
name=user.name,
icon_url=user.display_avatar.url
)
embed.set_author(name=user.name, icon_url=user.display_avatar.url)
# Create summary line with totals
totals = [str(result.total) for result in roll_results]
@ -735,19 +783,16 @@ class DiceRollCommands(commands.Cog):
roll_groups.append(str(rolls[0]))
else:
# Multiple dice: space-separated within the group
roll_groups.append(' '.join(str(r) for r in rolls))
roll_groups.append(" ".join(str(r) for r in rolls))
details = f"Details:[{';'.join(dice_notations)} ({' - '.join(roll_groups)})]"
# Set as description
embed.add_field(
name='Result',
value=f"```md\n{summary}\n{details}```"
)
embed.add_field(name="Result", value=f"```md\n{summary}\n{details}```")
return embed
async def setup(bot: commands.Bot):
"""Load the dice roll commands cog."""
await bot.add_cog(DiceRollCommands(bot))
await bot.add_cog(DiceRollCommands(bot))

View File

@ -4,7 +4,6 @@ Custom Commands Service for Discord Bot v2.0
Modern async service layer for managing custom commands with full type safety.
"""
import asyncio
import math
from datetime import UTC, datetime, timedelta
from typing import Optional, List, Any, Tuple
@ -120,8 +119,8 @@ class CustomCommandsService(BaseService[CustomCommand]):
content_length=len(content),
)
# Return command with creator info (use POST response directly)
return result.model_copy(update={"creator": creator})
# Return full command with creator info
return await self.get_command_by_name(name)
async def get_command_by_name(self, name: str) -> CustomCommand:
"""
@ -218,8 +217,7 @@ class CustomCommandsService(BaseService[CustomCommand]):
new_content_length=len(new_content),
)
# Return updated command with creator info (use PUT response directly)
return result.model_copy(update={"creator": command.creator})
return await self.get_command_by_name(name)
async def delete_command(
self, name: str, deleter_discord_id: int, force: bool = False
@ -468,28 +466,21 @@ class CustomCommandsService(BaseService[CustomCommand]):
commands_data = await self.get_items_with_params(params)
creators = await asyncio.gather(
*[
self.get_creator_by_id(cmd_data.creator_id)
for cmd_data in commands_data
],
return_exceptions=True,
)
commands = []
for cmd_data, creator in zip(commands_data, creators):
if isinstance(creator, BotException):
for cmd_data in commands_data:
try:
creator = await self.get_creator_by_id(cmd_data.creator_id)
commands.append(CustomCommand(**cmd_data.model_dump(), creator=creator))
except BotException as e:
# Handle missing creator gracefully
self.logger.warning(
"Skipping popular command with missing creator",
command_id=cmd_data.id,
command_name=cmd_data.name,
creator_id=cmd_data.creator_id,
error=str(creator),
error=str(e),
)
continue
if isinstance(creator, BaseException):
raise creator
commands.append(CustomCommand(**cmd_data.model_dump(), creator=creator))
return commands
@ -545,9 +536,7 @@ class CustomCommandsService(BaseService[CustomCommand]):
# Update username if it changed
if creator.username != username or creator.display_name != display_name:
await self._update_creator_info(creator.id, username, display_name)
creator = creator.model_copy(
update={"username": username, "display_name": display_name}
)
creator = await self.get_creator_by_discord_id(discord_id)
return creator
except BotException:
# Creator doesn't exist, create new one
@ -568,8 +557,7 @@ class CustomCommandsService(BaseService[CustomCommand]):
if not result:
raise BotException("Failed to create command creator")
# Return created creator directly from POST response
return CustomCommandCreator(**result)
return await self.get_creator_by_discord_id(discord_id)
async def get_creator_by_discord_id(self, discord_id: int) -> CustomCommandCreator:
"""Get creator by Discord ID.
@ -622,34 +610,31 @@ class CustomCommandsService(BaseService[CustomCommand]):
async def get_statistics(self) -> CustomCommandStats:
"""Get comprehensive statistics about custom commands."""
week_ago = datetime.now(UTC) - timedelta(days=7)
# Get basic counts
total_commands = await self._get_search_count([])
active_commands = await self._get_search_count([("is_active", True)])
total_creators = await self._get_creator_count()
(
total_commands,
active_commands,
total_creators,
all_commands,
popular_commands,
most_active_creator,
recent_count,
warning_count,
deletion_count,
) = await asyncio.gather(
self._get_search_count([]),
self._get_search_count([("is_active", True)]),
self._get_creator_count(),
self.get_items_with_params([("is_active", True)]),
self.get_popular_commands(limit=1),
self._get_most_active_creator(),
self._get_search_count(
[("created_at__gte", week_ago.isoformat()), ("is_active", True)]
),
self._get_commands_needing_warning_count(),
self._get_commands_eligible_for_deletion_count(),
# Get total uses
all_commands = await self.get_items_with_params([("is_active", True)])
total_uses = sum(cmd.use_count for cmd in all_commands)
# Get most popular command
popular_commands = await self.get_popular_commands(limit=1)
most_popular = popular_commands[0] if popular_commands else None
# Get most active creator
most_active_creator = await self._get_most_active_creator()
# Get recent commands count
week_ago = datetime.now(UTC) - timedelta(days=7)
recent_count = await self._get_search_count(
[("created_at__gte", week_ago.isoformat()), ("is_active", True)]
)
total_uses = sum(cmd.use_count for cmd in all_commands)
most_popular = popular_commands[0] if popular_commands else None
# Get cleanup statistics
warning_count = await self._get_commands_needing_warning_count()
deletion_count = await self._get_commands_eligible_for_deletion_count()
return CustomCommandStats(
total_commands=total_commands,
@ -677,28 +662,21 @@ class CustomCommandsService(BaseService[CustomCommand]):
commands_data = await self.get_items_with_params(params)
creators = await asyncio.gather(
*[
self.get_creator_by_id(cmd_data.creator_id)
for cmd_data in commands_data
],
return_exceptions=True,
)
commands = []
for cmd_data, creator in zip(commands_data, creators):
if isinstance(creator, BotException):
for cmd_data in commands_data:
try:
creator = await self.get_creator_by_id(cmd_data.creator_id)
commands.append(CustomCommand(**cmd_data.model_dump(), creator=creator))
except BotException as e:
# Handle missing creator gracefully
self.logger.warning(
"Skipping command with missing creator",
command_id=cmd_data.id,
command_name=cmd_data.name,
creator_id=cmd_data.creator_id,
error=str(creator),
error=str(e),
)
continue
if isinstance(creator, BaseException):
raise creator
commands.append(CustomCommand(**cmd_data.model_dump(), creator=creator))
return commands
@ -710,28 +688,21 @@ class CustomCommandsService(BaseService[CustomCommand]):
commands_data = await self.get_items_with_params(params)
creators = await asyncio.gather(
*[
self.get_creator_by_id(cmd_data.creator_id)
for cmd_data in commands_data
],
return_exceptions=True,
)
commands = []
for cmd_data, creator in zip(commands_data, creators):
if isinstance(creator, BotException):
for cmd_data in commands_data:
try:
creator = await self.get_creator_by_id(cmd_data.creator_id)
commands.append(CustomCommand(**cmd_data.model_dump(), creator=creator))
except BotException as e:
# Handle missing creator gracefully
self.logger.warning(
"Skipping command with missing creator",
command_id=cmd_data.id,
command_name=cmd_data.name,
creator_id=cmd_data.creator_id,
error=str(creator),
error=str(e),
)
continue
if isinstance(creator, BaseException):
raise creator
commands.append(CustomCommand(**cmd_data.model_dump(), creator=creator))
return commands

View File

@ -4,7 +4,6 @@ Decision Service
Manages pitching decision operations for game submission.
"""
import asyncio
from typing import List, Dict, Any, Optional, Tuple
from utils.logging import get_contextual_logger
@ -125,19 +124,22 @@ class DecisionService:
if int(decision.get("b_save", 0)) == 1:
bsv_ids.append(pitcher_id)
# Second pass: Fetch all Player objects in parallel
# Order: [wp_id, lp_id, sv_id, *hold_ids, *bsv_ids]; None IDs resolve immediately
ordered_ids = [wp_id, lp_id, sv_id] + hold_ids + bsv_ids
results = await asyncio.gather(
*[
player_service.get_player(pid) if pid else asyncio.sleep(0, result=None)
for pid in ordered_ids
]
)
# Second pass: Fetch Player objects
wp = await player_service.get_player(wp_id) if wp_id else None
lp = await player_service.get_player(lp_id) if lp_id else None
sv = await player_service.get_player(sv_id) if sv_id else None
wp, lp, sv = results[0], results[1], results[2]
holders = [p for p in results[3 : 3 + len(hold_ids)] if p]
blown_saves = [p for p in results[3 + len(hold_ids) :] if p]
holders = []
for hold_id in hold_ids:
holder = await player_service.get_player(hold_id)
if holder:
holders.append(holder)
blown_saves = []
for bsv_id in bsv_ids:
bsv = await player_service.get_player(bsv_id)
if bsv:
blown_saves.append(bsv)
return wp, lp, sv, holders, blown_saves

View File

@ -5,7 +5,6 @@ Modern async service layer for managing help commands with full type safety.
Allows admins and help editors to create custom help topics for league documentation,
resources, FAQs, links, and guides.
"""
from typing import Optional, List
from utils.logging import get_contextual_logger
@ -13,7 +12,7 @@ from models.help_command import (
HelpCommand,
HelpCommandSearchFilters,
HelpCommandSearchResult,
HelpCommandStats,
HelpCommandStats
)
from services.base_service import BaseService
from exceptions import BotException
@ -21,19 +20,16 @@ from exceptions import BotException
class HelpCommandNotFoundError(BotException):
"""Raised when a help command is not found."""
pass
class HelpCommandExistsError(BotException):
"""Raised when trying to create a help command that already exists."""
pass
class HelpCommandPermissionError(BotException):
"""Raised when user lacks permission for help command operation."""
pass
@ -41,8 +37,8 @@ class HelpCommandsService(BaseService[HelpCommand]):
"""Service for managing help commands."""
def __init__(self):
super().__init__(HelpCommand, "help_commands")
self.logger = get_contextual_logger(f"{__name__}.HelpCommandsService")
super().__init__(HelpCommand, 'help_commands')
self.logger = get_contextual_logger(f'{__name__}.HelpCommandsService')
self.logger.info("HelpCommandsService initialized")
# === Command CRUD Operations ===
@ -54,7 +50,7 @@ class HelpCommandsService(BaseService[HelpCommand]):
content: str,
creator_discord_id: str,
category: Optional[str] = None,
display_order: int = 0,
display_order: int = 0
) -> HelpCommand:
"""
Create a new help command.
@ -84,16 +80,14 @@ class HelpCommandsService(BaseService[HelpCommand]):
# Create help command data
help_data = {
"name": name.lower().strip(),
"title": title.strip(),
"content": content.strip(),
"category": category.lower().strip() if category else None,
"created_by_discord_id": str(
creator_discord_id
), # Convert to string for safe storage
"display_order": display_order,
"is_active": True,
"view_count": 0,
'name': name.lower().strip(),
'title': title.strip(),
'content': content.strip(),
'category': category.lower().strip() if category else None,
'created_by_discord_id': str(creator_discord_id), # Convert to string for safe storage
'display_order': display_order,
'is_active': True,
'view_count': 0
}
# Create via API
@ -101,18 +95,18 @@ class HelpCommandsService(BaseService[HelpCommand]):
if not result:
raise BotException("Failed to create help command")
self.logger.info(
"Help command created",
help_name=name,
creator_id=creator_discord_id,
category=category,
)
self.logger.info("Help command created",
help_name=name,
creator_id=creator_discord_id,
category=category)
# Return help command directly from POST response
return result
# Return full help command
return await self.get_help_by_name(name)
async def get_help_by_name(
self, name: str, include_inactive: bool = False
self,
name: str,
include_inactive: bool = False
) -> HelpCommand:
"""
Get a help command by name.
@ -132,12 +126,8 @@ class HelpCommandsService(BaseService[HelpCommand]):
try:
# Use the dedicated by_name endpoint for exact lookup
client = await self.get_client()
params = (
[("include_inactive", include_inactive)] if include_inactive else []
)
data = await client.get(
f"help_commands/by_name/{normalized_name}", params=params
)
params = [('include_inactive', include_inactive)] if include_inactive else []
data = await client.get(f'help_commands/by_name/{normalized_name}', params=params)
if not data:
raise HelpCommandNotFoundError(f"Help topic '{name}' not found")
@ -149,9 +139,9 @@ class HelpCommandsService(BaseService[HelpCommand]):
if "404" in str(e) or "not found" in str(e).lower():
raise HelpCommandNotFoundError(f"Help topic '{name}' not found")
else:
self.logger.error(
"Failed to get help command by name", help_name=name, error=e
)
self.logger.error("Failed to get help command by name",
help_name=name,
error=e)
raise BotException(f"Failed to retrieve help topic '{name}': {e}")
async def update_help(
@ -161,7 +151,7 @@ class HelpCommandsService(BaseService[HelpCommand]):
new_content: Optional[str] = None,
updater_discord_id: Optional[str] = None,
new_category: Optional[str] = None,
new_display_order: Optional[int] = None,
new_display_order: Optional[int] = None
) -> HelpCommand:
"""
Update an existing help command.
@ -186,42 +176,35 @@ class HelpCommandsService(BaseService[HelpCommand]):
update_data = {}
if new_title is not None:
update_data["title"] = new_title.strip()
update_data['title'] = new_title.strip()
if new_content is not None:
update_data["content"] = new_content.strip()
update_data['content'] = new_content.strip()
if new_category is not None:
update_data["category"] = (
new_category.lower().strip() if new_category else None
)
update_data['category'] = new_category.lower().strip() if new_category else None
if new_display_order is not None:
update_data["display_order"] = new_display_order
update_data['display_order'] = new_display_order
if updater_discord_id is not None:
update_data["last_modified_by"] = str(
updater_discord_id
) # Convert to string for safe storage
update_data['last_modified_by'] = str(updater_discord_id) # Convert to string for safe storage
if not update_data:
raise BotException("No fields to update")
# Update via API
client = await self.get_client()
result = await client.put(f"help_commands/{help_cmd.id}", update_data)
result = await client.put(f'help_commands/{help_cmd.id}', update_data)
if not result:
raise BotException("Failed to update help command")
self.logger.info(
"Help command updated",
help_name=name,
updater_id=updater_discord_id,
fields_updated=list(update_data.keys()),
)
self.logger.info("Help command updated",
help_name=name,
updater_id=updater_discord_id,
fields_updated=list(update_data.keys()))
# Return updated help command directly from PUT response
return self.model_class.from_api_data(result)
return await self.get_help_by_name(name)
async def delete_help(self, name: str) -> bool:
"""
@ -240,11 +223,11 @@ class HelpCommandsService(BaseService[HelpCommand]):
# Soft delete via API
client = await self.get_client()
await client.delete(f"help_commands/{help_cmd.id}")
await client.delete(f'help_commands/{help_cmd.id}')
self.logger.info(
"Help command soft deleted", help_name=name, help_id=help_cmd.id
)
self.logger.info("Help command soft deleted",
help_name=name,
help_id=help_cmd.id)
return True
@ -269,11 +252,13 @@ class HelpCommandsService(BaseService[HelpCommand]):
# Restore via API
client = await self.get_client()
result = await client.patch(f"help_commands/{help_cmd.id}/restore")
result = await client.patch(f'help_commands/{help_cmd.id}/restore')
if not result:
raise BotException("Failed to restore help command")
self.logger.info("Help command restored", help_name=name, help_id=help_cmd.id)
self.logger.info("Help command restored",
help_name=name,
help_id=help_cmd.id)
return self.model_class.from_api_data(result)
@ -294,9 +279,10 @@ class HelpCommandsService(BaseService[HelpCommand]):
try:
client = await self.get_client()
await client.patch(f"help_commands/by_name/{normalized_name}/view")
await client.patch(f'help_commands/by_name/{normalized_name}/view')
self.logger.debug("Help command view count incremented", help_name=name)
self.logger.debug("Help command view count incremented",
help_name=name)
# Return updated command
return await self.get_help_by_name(name)
@ -305,15 +291,16 @@ class HelpCommandsService(BaseService[HelpCommand]):
if "404" in str(e) or "not found" in str(e).lower():
raise HelpCommandNotFoundError(f"Help topic '{name}' not found")
else:
self.logger.error(
"Failed to increment view count", help_name=name, error=e
)
self.logger.error("Failed to increment view count",
help_name=name,
error=e)
raise BotException(f"Failed to increment view count for '{name}': {e}")
# === Search and Listing ===
async def search_help_commands(
self, filters: HelpCommandSearchFilters
self,
filters: HelpCommandSearchFilters
) -> HelpCommandSearchResult:
"""
Search for help commands with filtering and pagination.
@ -329,23 +316,23 @@ class HelpCommandsService(BaseService[HelpCommand]):
# Apply filters
if filters.name_contains:
params.append(("name", filters.name_contains)) # API will do ILIKE search
params.append(('name', filters.name_contains)) # API will do ILIKE search
if filters.category:
params.append(("category", filters.category))
params.append(('category', filters.category))
params.append(("is_active", filters.is_active))
params.append(('is_active', filters.is_active))
# Add sorting
params.append(("sort", filters.sort_by))
params.append(('sort', filters.sort_by))
# Add pagination
params.append(("page", filters.page))
params.append(("page_size", filters.page_size))
params.append(('page', filters.page))
params.append(('page_size', filters.page_size))
# Execute search via API
client = await self.get_client()
data = await client.get("help_commands", params=params)
data = await client.get('help_commands', params=params)
if not data:
return HelpCommandSearchResult(
@ -354,14 +341,14 @@ class HelpCommandsService(BaseService[HelpCommand]):
page=filters.page,
page_size=filters.page_size,
total_pages=0,
has_more=False,
has_more=False
)
# Extract response data
help_commands_data = data.get("help_commands", [])
total_count = data.get("total_count", 0)
total_pages = data.get("total_pages", 0)
has_more = data.get("has_more", False)
help_commands_data = data.get('help_commands', [])
total_count = data.get('total_count', 0)
total_pages = data.get('total_pages', 0)
has_more = data.get('has_more', False)
# Convert to HelpCommand objects
help_commands = []
@ -369,21 +356,15 @@ class HelpCommandsService(BaseService[HelpCommand]):
try:
help_commands.append(self.model_class.from_api_data(cmd_data))
except Exception as e:
self.logger.warning(
"Failed to create HelpCommand from API data",
help_id=cmd_data.get("id"),
error=e,
)
self.logger.warning("Failed to create HelpCommand from API data",
help_id=cmd_data.get('id'),
error=e)
continue
self.logger.debug(
"Help commands search completed",
total_results=total_count,
page=filters.page,
filters_applied=len(
[p for p in params if p[0] not in ["sort", "page", "page_size"]]
),
)
self.logger.debug("Help commands search completed",
total_results=total_count,
page=filters.page,
filters_applied=len([p for p in params if p[0] not in ['sort', 'page', 'page_size']]))
return HelpCommandSearchResult(
help_commands=help_commands,
@ -391,11 +372,13 @@ class HelpCommandsService(BaseService[HelpCommand]):
page=filters.page,
page_size=filters.page_size,
total_pages=total_pages,
has_more=has_more,
has_more=has_more
)
async def get_all_help_topics(
self, category: Optional[str] = None, include_inactive: bool = False
self,
category: Optional[str] = None,
include_inactive: bool = False
) -> List[HelpCommand]:
"""
Get all help topics, optionally filtered by category.
@ -410,36 +393,37 @@ class HelpCommandsService(BaseService[HelpCommand]):
params = []
if category:
params.append(("category", category))
params.append(('category', category))
params.append(("is_active", not include_inactive))
params.append(("sort", "display_order"))
params.append(("page_size", 100)) # Get all
params.append(('is_active', not include_inactive))
params.append(('sort', 'display_order'))
params.append(('page_size', 100)) # Get all
client = await self.get_client()
data = await client.get("help_commands", params=params)
data = await client.get('help_commands', params=params)
if not data:
return []
help_commands_data = data.get("help_commands", [])
help_commands_data = data.get('help_commands', [])
help_commands = []
for cmd_data in help_commands_data:
try:
help_commands.append(self.model_class.from_api_data(cmd_data))
except Exception as e:
self.logger.warning(
"Failed to create HelpCommand from API data",
help_id=cmd_data.get("id"),
error=e,
)
self.logger.warning("Failed to create HelpCommand from API data",
help_id=cmd_data.get('id'),
error=e)
continue
return help_commands
async def get_help_names_for_autocomplete(
self, partial_name: str = "", limit: int = 25, include_inactive: bool = False
self,
partial_name: str = "",
limit: int = 25,
include_inactive: bool = False
) -> List[str]:
"""
Get help command names for Discord autocomplete.
@ -455,28 +439,25 @@ class HelpCommandsService(BaseService[HelpCommand]):
try:
# Use the dedicated autocomplete endpoint
client = await self.get_client()
params = [("limit", limit)]
params = [('limit', limit)]
if partial_name:
params.append(("q", partial_name.lower()))
params.append(('q', partial_name.lower()))
result = await client.get("help_commands/autocomplete", params=params)
result = await client.get('help_commands/autocomplete', params=params)
# The autocomplete endpoint returns results with name, title, category
if isinstance(result, dict) and "results" in result:
return [item["name"] for item in result["results"]]
if isinstance(result, dict) and 'results' in result:
return [item['name'] for item in result['results']]
else:
self.logger.warning(
"Unexpected autocomplete response format", response=result
)
self.logger.warning("Unexpected autocomplete response format",
response=result)
return []
except Exception as e:
self.logger.error(
"Failed to get help names for autocomplete",
partial_name=partial_name,
error=e,
)
self.logger.error("Failed to get help names for autocomplete",
partial_name=partial_name,
error=e)
# Return empty list on error to not break Discord autocomplete
return []
@ -486,7 +467,7 @@ class HelpCommandsService(BaseService[HelpCommand]):
"""Get comprehensive statistics about help commands."""
try:
client = await self.get_client()
data = await client.get("help_commands/stats")
data = await client.get('help_commands/stats')
if not data:
return HelpCommandStats(
@ -494,25 +475,23 @@ class HelpCommandsService(BaseService[HelpCommand]):
active_commands=0,
total_views=0,
most_viewed_command=None,
recent_commands_count=0,
recent_commands_count=0
)
# Convert most_viewed_command if present
most_viewed = None
if data.get("most_viewed_command"):
if data.get('most_viewed_command'):
try:
most_viewed = self.model_class.from_api_data(
data["most_viewed_command"]
)
most_viewed = self.model_class.from_api_data(data['most_viewed_command'])
except Exception as e:
self.logger.warning("Failed to parse most viewed command", error=e)
return HelpCommandStats(
total_commands=data.get("total_commands", 0),
active_commands=data.get("active_commands", 0),
total_views=data.get("total_views", 0),
total_commands=data.get('total_commands', 0),
active_commands=data.get('active_commands', 0),
total_views=data.get('total_views', 0),
most_viewed_command=most_viewed,
recent_commands_count=data.get("recent_commands_count", 0),
recent_commands_count=data.get('recent_commands_count', 0)
)
except Exception as e:
@ -523,7 +502,7 @@ class HelpCommandsService(BaseService[HelpCommand]):
active_commands=0,
total_views=0,
most_viewed_command=None,
recent_commands_count=0,
recent_commands_count=0
)

View File

@ -3,6 +3,7 @@ League service for Discord Bot v2.0
Handles league-wide operations including current state, standings, and season information.
"""
import logging
from typing import Optional, List, Dict, Any
@ -10,25 +11,27 @@ from config import get_config
from services.base_service import BaseService
from models.current import Current
from exceptions import APIException
from utils.decorators import cached_single_item
logger = logging.getLogger(f'{__name__}.LeagueService')
logger = logging.getLogger(f"{__name__}.LeagueService")
class LeagueService(BaseService[Current]):
"""
Service for league-wide operations.
Features:
- Current league state retrieval
- Season standings
- League-wide statistics
"""
def __init__(self):
"""Initialize league service."""
super().__init__(Current, 'current')
super().__init__(Current, "current")
logger.debug("LeagueService initialized")
@cached_single_item(ttl=60)
async def get_current_state(self) -> Optional[Current]:
"""
Get the current league state including week, season, and settings.
@ -38,11 +41,13 @@ class LeagueService(BaseService[Current]):
"""
try:
client = await self.get_client()
data = await client.get('current')
data = await client.get("current")
if data:
current = Current.from_api_data(data)
logger.debug(f"Retrieved current state: Week {current.week}, Season {current.season}")
logger.debug(
f"Retrieved current state: Week {current.week}, Season {current.season}"
)
return current
logger.debug("No current state data found")
@ -53,9 +58,7 @@ class LeagueService(BaseService[Current]):
return None
async def update_current_state(
self,
week: Optional[int] = None,
freeze: Optional[bool] = None
self, week: Optional[int] = None, freeze: Optional[bool] = None
) -> Optional[Current]:
"""
Update current league state (week and/or freeze status).
@ -77,9 +80,9 @@ class LeagueService(BaseService[Current]):
# Build update data
update_data = {}
if week is not None:
update_data['week'] = week
update_data["week"] = week
if freeze is not None:
update_data['freeze'] = freeze
update_data["freeze"] = freeze
if not update_data:
logger.warning("update_current_state called with no updates")
@ -89,127 +92,152 @@ class LeagueService(BaseService[Current]):
# (Current table has one row per season, NOT a single row with id=1)
current = await self.get_current_state()
if not current:
logger.error("Cannot update current state - unable to fetch current state")
logger.error(
"Cannot update current state - unable to fetch current state"
)
return None
current_id = current.id
logger.debug(f"Updating current state id={current_id} (season {current.season})")
logger.debug(
f"Updating current state id={current_id} (season {current.season})"
)
# Use BaseService patch method
updated_current = await self.patch(current_id, update_data, use_query_params=True)
updated_current = await self.patch(
current_id, update_data, use_query_params=True
)
if updated_current:
logger.info(f"Updated current state id={current_id}: {update_data}")
return updated_current
else:
logger.error(f"Failed to update current state id={current_id} - patch returned None")
logger.error(
f"Failed to update current state id={current_id} - patch returned None"
)
return None
except Exception as e:
logger.error(f"Error updating current state: {e}")
raise APIException(f"Failed to update current state: {e}")
async def get_standings(self, season: Optional[int] = None) -> Optional[List[Dict[str, Any]]]:
async def get_standings(
self, season: Optional[int] = None
) -> Optional[List[Dict[str, Any]]]:
"""
Get league standings for a season.
Args:
season: Season number (defaults to current season)
Returns:
List of standings data or None if not available
"""
try:
season = season or get_config().sba_season
client = await self.get_client()
data = await client.get('standings', params=[('season', str(season))])
data = await client.get("standings", params=[("season", str(season))])
if data and isinstance(data, list):
logger.debug(f"Retrieved standings for season {season}: {len(data)} teams")
logger.debug(
f"Retrieved standings for season {season}: {len(data)} teams"
)
return data
elif data and isinstance(data, dict):
# Handle case where API returns a dict with standings array
standings_data = data.get('standings', data.get('items', []))
standings_data = data.get("standings", data.get("items", []))
if standings_data:
logger.debug(f"Retrieved standings for season {season}: {len(standings_data)} teams")
logger.debug(
f"Retrieved standings for season {season}: {len(standings_data)} teams"
)
return standings_data
logger.debug(f"No standings data found for season {season}")
return None
except Exception as e:
logger.error(f"Failed to get standings for season {season}: {e}")
return None
async def get_division_standings(self, division_id: int, season: Optional[int] = None) -> Optional[List[Dict[str, Any]]]:
async def get_division_standings(
self, division_id: int, season: Optional[int] = None
) -> Optional[List[Dict[str, Any]]]:
"""
Get standings for a specific division.
Args:
division_id: Division identifier
season: Season number (defaults to current season)
Returns:
List of division standings or None if not available
"""
try:
season = season or get_config().sba_season
client = await self.get_client()
data = await client.get(f'standings/division/{division_id}', params=[('season', str(season))])
data = await client.get(
f"standings/division/{division_id}", params=[("season", str(season))]
)
if data and isinstance(data, list):
logger.debug(f"Retrieved division {division_id} standings for season {season}: {len(data)} teams")
logger.debug(
f"Retrieved division {division_id} standings for season {season}: {len(data)} teams"
)
return data
logger.debug(f"No division standings found for division {division_id}, season {season}")
logger.debug(
f"No division standings found for division {division_id}, season {season}"
)
return None
except Exception as e:
logger.error(f"Failed to get division {division_id} standings: {e}")
return None
async def get_league_leaders(self, stat_type: str = 'batting', season: Optional[int] = None, limit: int = 10) -> Optional[List[Dict[str, Any]]]:
async def get_league_leaders(
self, stat_type: str = "batting", season: Optional[int] = None, limit: int = 10
) -> Optional[List[Dict[str, Any]]]:
"""
Get league leaders for a specific statistic category.
Args:
stat_type: Type of stats ('batting', 'pitching', 'fielding')
season: Season number (defaults to current season)
limit: Number of leaders to return
Returns:
List of league leaders or None if not available
"""
try:
season = season or get_config().sba_season
client = await self.get_client()
params = [
('season', str(season)),
('limit', str(limit))
]
data = await client.get(f'leaders/{stat_type}', params=params)
params = [("season", str(season)), ("limit", str(limit))]
data = await client.get(f"leaders/{stat_type}", params=params)
if data:
# Handle different response formats
if isinstance(data, list):
leaders = data
elif isinstance(data, dict):
leaders = data.get('leaders', data.get('items', data.get('results', [])))
leaders = data.get(
"leaders", data.get("items", data.get("results", []))
)
else:
leaders = []
logger.debug(f"Retrieved {stat_type} leaders for season {season}: {len(leaders)} players")
logger.debug(
f"Retrieved {stat_type} leaders for season {season}: {len(leaders)} players"
)
return leaders[:limit] # Ensure we don't exceed limit
logger.debug(f"No {stat_type} leaders found for season {season}")
return None
except Exception as e:
logger.error(f"Failed to get {stat_type} leaders for season {season}: {e}")
return None
# Global service instance
league_service = LeagueService()
league_service = LeagueService()

View File

@ -11,6 +11,7 @@ from config import get_config
from services.base_service import BaseService
from models.player import Player
from exceptions import APIException
from utils.decorators import cached_api_call
if TYPE_CHECKING:
from services.team_service import TeamService
@ -270,6 +271,7 @@ class PlayerService(BaseService[Player]):
logger.error(f"Error in fuzzy search for '{query}': {e}")
return []
@cached_api_call(ttl=300)
async def get_free_agents(self, season: int) -> List[Player]:
"""
Get all free agent players.
@ -372,10 +374,7 @@ class PlayerService(BaseService[Player]):
return None
async def update_player_team(
self,
player_id: int,
new_team_id: int,
dem_week: Optional[int] = None
self, player_id: int, new_team_id: int, dem_week: Optional[int] = None
) -> Optional[Player]:
"""
Update a player's team assignment.

View File

@ -5,7 +5,8 @@ Handles team standings retrieval and processing.
"""
import logging
from typing import Optional, List, Dict
import time
from typing import Optional, List, Dict, Tuple
from api.client import get_global_client
from models.standings import TeamStandings
@ -13,6 +14,10 @@ from exceptions import APIException
logger = logging.getLogger(f"{__name__}.StandingsService")
# In-memory cache for standings: season -> (standings_list, cached_at)
_standings_cache: Dict[int, Tuple[List[TeamStandings], float]] = {}
_STANDINGS_CACHE_TTL = 600 # 10 minutes
class StandingsService:
"""
@ -45,6 +50,13 @@ class StandingsService:
List of TeamStandings ordered by record
"""
try:
# Check in-memory cache first
if season in _standings_cache:
cached_standings, cached_at = _standings_cache[season]
if time.time() - cached_at < _STANDINGS_CACHE_TTL:
logger.debug(f"Cache hit for standings season {season}")
return cached_standings
client = await self.get_client()
params = [("season", str(season))]
@ -72,6 +84,10 @@ class StandingsService:
logger.info(
f"Retrieved standings for {len(standings)} teams in season {season}"
)
# Cache the result
_standings_cache[season] = (standings, time.time())
return standings
except Exception as e:

View File

@ -277,35 +277,6 @@ class TransactionBuilder:
Returns:
Tuple of (success: bool, error_message: str). If success is True, error_message is empty.
"""
# Fetch current state once if needed by FA lock or pending-transaction check
is_fa_pickup = (
move.from_roster == RosterType.FREE_AGENCY
and move.to_roster != RosterType.FREE_AGENCY
)
needs_current_state = is_fa_pickup or (
check_pending_transactions and next_week is None
)
current_week: Optional[int] = None
if needs_current_state:
try:
current_state = await league_service.get_current_state()
current_week = current_state.week if current_state else 1
except Exception as e:
logger.warning(f"Could not get current week: {e}")
current_week = 1
# Block adding players FROM Free Agency after the FA lock deadline
if is_fa_pickup and current_week is not None:
config = get_config()
if current_week >= config.fa_lock_week:
error_msg = (
f"Free agency is closed (week {current_week}, deadline was week {config.fa_lock_week}). "
f"Cannot add {move.player.name} from FA."
)
logger.warning(error_msg)
return False, error_msg
# Check if player is already in a move in this transaction builder
existing_move = self.get_move_for_player(move.player.id)
if existing_move:
@ -328,15 +299,23 @@ class TransactionBuilder:
return False, error_msg
# Check if player is already in another team's pending transaction for next week
# This prevents duplicate claims that would need to be resolved at freeze time
# Only applies to /dropadd (scheduled moves), not /ilmove (immediate moves)
if check_pending_transactions:
if next_week is None:
next_week = (current_week + 1) if current_week else 1
try:
current_state = await league_service.get_current_state()
next_week = (current_state.week + 1) if current_state else 1
except Exception as e:
logger.warning(
f"Could not get current week for pending transaction check: {e}"
)
next_week = 1
(
is_pending,
claiming_team,
) = await transaction_service.is_player_in_pending_transaction(
player_id=move.player.id, week=next_week, season=self.season
is_pending, claiming_team = (
await transaction_service.is_player_in_pending_transaction(
player_id=move.player.id, week=next_week, season=self.season
)
)
if is_pending:

View File

@ -3,7 +3,6 @@ Tests for Help Commands Service in Discord Bot v2.0
Comprehensive tests for help commands CRUD operations and business logic.
"""
import pytest
from datetime import datetime, timezone
from unittest.mock import AsyncMock
@ -11,13 +10,13 @@ from unittest.mock import AsyncMock
from services.help_commands_service import (
HelpCommandsService,
HelpCommandNotFoundError,
HelpCommandExistsError,
HelpCommandExistsError
)
from models.help_command import (
HelpCommand,
HelpCommandSearchFilters,
HelpCommandSearchResult,
HelpCommandStats,
HelpCommandStats
)
@ -27,17 +26,17 @@ def sample_help_command() -> HelpCommand:
now = datetime.now(timezone.utc)
return HelpCommand(
id=1,
name="trading-rules",
title="Trading Rules & Guidelines",
content="Complete trading rules for the league...",
category="rules",
created_by_discord_id="123456789",
name='trading-rules',
title='Trading Rules & Guidelines',
content='Complete trading rules for the league...',
category='rules',
created_by_discord_id='123456789',
created_at=now,
updated_at=None,
last_modified_by=None,
is_active=True,
view_count=100,
display_order=10,
display_order=10
)
@ -65,7 +64,6 @@ class TestHelpCommandsServiceInit:
# Multiple imports should return the same instance
from services.help_commands_service import help_commands_service as service2
assert help_commands_service is service2
def test_service_has_required_methods(self):
@ -73,22 +71,22 @@ class TestHelpCommandsServiceInit:
from services.help_commands_service import help_commands_service
# Core CRUD operations
assert hasattr(help_commands_service, "create_help")
assert hasattr(help_commands_service, "get_help_by_name")
assert hasattr(help_commands_service, "update_help")
assert hasattr(help_commands_service, "delete_help")
assert hasattr(help_commands_service, "restore_help")
assert hasattr(help_commands_service, 'create_help')
assert hasattr(help_commands_service, 'get_help_by_name')
assert hasattr(help_commands_service, 'update_help')
assert hasattr(help_commands_service, 'delete_help')
assert hasattr(help_commands_service, 'restore_help')
# Search and listing
assert hasattr(help_commands_service, "search_help_commands")
assert hasattr(help_commands_service, "get_all_help_topics")
assert hasattr(help_commands_service, "get_help_names_for_autocomplete")
assert hasattr(help_commands_service, 'search_help_commands')
assert hasattr(help_commands_service, 'get_all_help_topics')
assert hasattr(help_commands_service, 'get_help_names_for_autocomplete')
# View tracking
assert hasattr(help_commands_service, "increment_view_count")
assert hasattr(help_commands_service, 'increment_view_count')
# Statistics
assert hasattr(help_commands_service, "get_statistics")
assert hasattr(help_commands_service, 'get_statistics')
class TestHelpCommandsServiceCRUD:
@ -120,7 +118,7 @@ class TestHelpCommandsServiceCRUD:
last_modified_by=None,
is_active=True,
view_count=0,
display_order=data.get("display_order", 0),
display_order=data.get("display_order", 0)
)
return created_help
@ -132,8 +130,8 @@ class TestHelpCommandsServiceCRUD:
name="test-topic",
title="Test Topic",
content="This is test content for the help topic.",
creator_discord_id="123456789",
category="info",
creator_discord_id='123456789',
category="info"
)
assert isinstance(result, HelpCommand)
@ -143,48 +141,39 @@ class TestHelpCommandsServiceCRUD:
assert result.view_count == 0
@pytest.mark.asyncio
async def test_create_help_already_exists(
self, help_commands_service_instance, sample_help_command
):
async def test_create_help_already_exists(self, help_commands_service_instance, sample_help_command):
"""Test help command creation when topic already exists."""
# Mock topic already exists
async def mock_get_help_by_name(*args, **kwargs):
return sample_help_command
help_commands_service_instance.get_help_by_name = mock_get_help_by_name
with pytest.raises(
HelpCommandExistsError, match="Help topic 'trading-rules' already exists"
):
with pytest.raises(HelpCommandExistsError, match="Help topic 'trading-rules' already exists"):
await help_commands_service_instance.create_help(
name="trading-rules",
title="Trading Rules",
content="Rules content",
creator_discord_id="123456789",
creator_discord_id='123456789'
)
@pytest.mark.asyncio
async def test_get_help_by_name_success(
self, help_commands_service_instance, sample_help_command
):
async def test_get_help_by_name_success(self, help_commands_service_instance, sample_help_command):
"""Test successful help command retrieval."""
# Mock the API client to return proper data structure
help_data = {
"id": sample_help_command.id,
"name": sample_help_command.name,
"title": sample_help_command.title,
"content": sample_help_command.content,
"category": sample_help_command.category,
"created_by_discord_id": sample_help_command.created_by_discord_id,
"created_at": sample_help_command.created_at.isoformat(),
"updated_at": sample_help_command.updated_at.isoformat()
if sample_help_command.updated_at
else None,
"last_modified_by": sample_help_command.last_modified_by,
"is_active": sample_help_command.is_active,
"view_count": sample_help_command.view_count,
"display_order": sample_help_command.display_order,
'id': sample_help_command.id,
'name': sample_help_command.name,
'title': sample_help_command.title,
'content': sample_help_command.content,
'category': sample_help_command.category,
'created_by_discord_id': sample_help_command.created_by_discord_id,
'created_at': sample_help_command.created_at.isoformat(),
'updated_at': sample_help_command.updated_at.isoformat() if sample_help_command.updated_at else None,
'last_modified_by': sample_help_command.last_modified_by,
'is_active': sample_help_command.is_active,
'view_count': sample_help_command.view_count,
'display_order': sample_help_command.display_order
}
help_commands_service_instance._client.get.return_value = help_data
@ -202,61 +191,66 @@ class TestHelpCommandsServiceCRUD:
# Mock the API client to return None (not found)
help_commands_service_instance._client.get.return_value = None
with pytest.raises(
HelpCommandNotFoundError, match="Help topic 'nonexistent' not found"
):
with pytest.raises(HelpCommandNotFoundError, match="Help topic 'nonexistent' not found"):
await help_commands_service_instance.get_help_by_name("nonexistent")
@pytest.mark.asyncio
async def test_update_help_success(
self, help_commands_service_instance, sample_help_command
):
async def test_update_help_success(self, help_commands_service_instance, sample_help_command):
"""Test successful help command update."""
# Mock getting the existing help command
async def mock_get_help_by_name(name, include_inactive=False):
if name == "trading-rules":
return sample_help_command
raise HelpCommandNotFoundError(f"Help topic '{name}' not found")
# Mock the API update call returning the updated help command data directly
updated_data = {
"id": sample_help_command.id,
"name": sample_help_command.name,
"title": "Updated Trading Rules",
"content": "Updated content",
"category": sample_help_command.category,
"created_by_discord_id": sample_help_command.created_by_discord_id,
"created_at": sample_help_command.created_at.isoformat(),
"updated_at": datetime.now(timezone.utc).isoformat(),
"last_modified_by": "987654321",
"is_active": sample_help_command.is_active,
"view_count": sample_help_command.view_count,
"display_order": sample_help_command.display_order,
}
# Mock the API update call
async def mock_put(*args, **kwargs):
return updated_data
return True
help_commands_service_instance.get_help_by_name = mock_get_help_by_name
help_commands_service_instance._client.put = mock_put
# Update should call get_help_by_name again at the end, so mock it to return updated version
updated_help = HelpCommand(
id=sample_help_command.id,
name=sample_help_command.name,
title="Updated Trading Rules",
content="Updated content",
category=sample_help_command.category,
created_by_discord_id=sample_help_command.created_by_discord_id,
created_at=sample_help_command.created_at,
updated_at=datetime.now(timezone.utc),
last_modified_by='987654321',
is_active=sample_help_command.is_active,
view_count=sample_help_command.view_count,
display_order=sample_help_command.display_order
)
call_count = 0
async def mock_get_with_counter(name, include_inactive=False):
nonlocal call_count
call_count += 1
if call_count == 1:
return sample_help_command
else:
return updated_help
help_commands_service_instance.get_help_by_name = mock_get_with_counter
result = await help_commands_service_instance.update_help(
name="trading-rules",
new_title="Updated Trading Rules",
new_content="Updated content",
updater_discord_id="987654321",
updater_discord_id='987654321'
)
assert isinstance(result, HelpCommand)
assert result.title == "Updated Trading Rules"
@pytest.mark.asyncio
async def test_delete_help_success(
self, help_commands_service_instance, sample_help_command
):
async def test_delete_help_success(self, help_commands_service_instance, sample_help_command):
"""Test successful help command deletion (soft delete)."""
# Mock getting the help command
async def mock_get_help_by_name(name, include_inactive=False):
return sample_help_command
@ -278,12 +272,12 @@ class TestHelpCommandsServiceCRUD:
# Mock getting a deleted help command
deleted_help = HelpCommand(
id=1,
name="deleted-topic",
title="Deleted Topic",
content="Content",
created_by_discord_id="123456789",
name='deleted-topic',
title='Deleted Topic',
content='Content',
created_by_discord_id='123456789',
created_at=datetime.now(timezone.utc),
is_active=False,
is_active=False
)
async def mock_get_help_by_name(name, include_inactive=False):
@ -291,15 +285,15 @@ class TestHelpCommandsServiceCRUD:
# Mock the API restore call
restored_data = {
"id": deleted_help.id,
"name": deleted_help.name,
"title": deleted_help.title,
"content": deleted_help.content,
"created_by_discord_id": deleted_help.created_by_discord_id,
"created_at": deleted_help.created_at.isoformat(),
"is_active": True,
"view_count": 0,
"display_order": 0,
'id': deleted_help.id,
'name': deleted_help.name,
'title': deleted_help.title,
'content': deleted_help.content,
'created_by_discord_id': deleted_help.created_by_discord_id,
'created_at': deleted_help.created_at.isoformat(),
'is_active': True,
'view_count': 0,
'display_order': 0
}
help_commands_service_instance.get_help_by_name = mock_get_help_by_name
@ -318,30 +312,33 @@ class TestHelpCommandsServiceSearch:
async def test_search_help_commands(self, help_commands_service_instance):
"""Test searching for help commands with filters."""
filters = HelpCommandSearchFilters(
name_contains="trading", category="rules", page=1, page_size=10
name_contains='trading',
category='rules',
page=1,
page_size=10
)
# Mock API response
api_response = {
"help_commands": [
'help_commands': [
{
"id": 1,
"name": "trading-rules",
"title": "Trading Rules",
"content": "Content",
"category": "rules",
"created_by_discord_id": "123",
"created_at": datetime.now(timezone.utc).isoformat(),
"is_active": True,
"view_count": 100,
"display_order": 0,
'id': 1,
'name': 'trading-rules',
'title': 'Trading Rules',
'content': 'Content',
'category': 'rules',
'created_by_discord_id': '123',
'created_at': datetime.now(timezone.utc).isoformat(),
'is_active': True,
'view_count': 100,
'display_order': 0
}
],
"total_count": 1,
"page": 1,
"page_size": 10,
"total_pages": 1,
"has_more": False,
'total_count': 1,
'page': 1,
'page_size': 10,
'total_pages': 1,
'has_more': False
}
help_commands_service_instance._client.get.return_value = api_response
@ -351,33 +348,33 @@ class TestHelpCommandsServiceSearch:
assert isinstance(result, HelpCommandSearchResult)
assert len(result.help_commands) == 1
assert result.total_count == 1
assert result.help_commands[0].name == "trading-rules"
assert result.help_commands[0].name == 'trading-rules'
@pytest.mark.asyncio
async def test_get_all_help_topics(self, help_commands_service_instance):
"""Test getting all help topics."""
# Mock API response
api_response = {
"help_commands": [
'help_commands': [
{
"id": i,
"name": f"topic-{i}",
"title": f"Topic {i}",
"content": f"Content {i}",
"category": "rules" if i % 2 == 0 else "guides",
"created_by_discord_id": "123",
"created_at": datetime.now(timezone.utc).isoformat(),
"is_active": True,
"view_count": i * 10,
"display_order": i,
'id': i,
'name': f'topic-{i}',
'title': f'Topic {i}',
'content': f'Content {i}',
'category': 'rules' if i % 2 == 0 else 'guides',
'created_by_discord_id': '123',
'created_at': datetime.now(timezone.utc).isoformat(),
'is_active': True,
'view_count': i * 10,
'display_order': i
}
for i in range(1, 6)
],
"total_count": 5,
"page": 1,
"page_size": 100,
"total_pages": 1,
"has_more": False,
'total_count': 5,
'page': 1,
'page_size': 100,
'total_pages': 1,
'has_more': False
}
help_commands_service_instance._client.get.return_value = api_response
@ -389,45 +386,42 @@ class TestHelpCommandsServiceSearch:
assert all(isinstance(cmd, HelpCommand) for cmd in result)
@pytest.mark.asyncio
async def test_get_help_names_for_autocomplete(
self, help_commands_service_instance
):
async def test_get_help_names_for_autocomplete(self, help_commands_service_instance):
"""Test getting help names for autocomplete."""
# Mock API response
api_response = {
"results": [
'results': [
{
"name": "trading-rules",
"title": "Trading Rules",
"category": "rules",
'name': 'trading-rules',
'title': 'Trading Rules',
'category': 'rules'
},
{
"name": "trading-deadline",
"title": "Trading Deadline",
"category": "info",
},
'name': 'trading-deadline',
'title': 'Trading Deadline',
'category': 'info'
}
]
}
help_commands_service_instance._client.get.return_value = api_response
result = await help_commands_service_instance.get_help_names_for_autocomplete(
partial_name="trading", limit=25
partial_name='trading',
limit=25
)
assert isinstance(result, list)
assert len(result) == 2
assert "trading-rules" in result
assert "trading-deadline" in result
assert 'trading-rules' in result
assert 'trading-deadline' in result
class TestHelpCommandsServiceViewTracking:
"""Test view count tracking."""
@pytest.mark.asyncio
async def test_increment_view_count(
self, help_commands_service_instance, sample_help_command
):
async def test_increment_view_count(self, help_commands_service_instance, sample_help_command):
"""Test incrementing view count."""
# Mock the API patch call
help_commands_service_instance._client.patch = AsyncMock()
@ -443,7 +437,7 @@ class TestHelpCommandsServiceViewTracking:
created_at=sample_help_command.created_at,
is_active=sample_help_command.is_active,
view_count=sample_help_command.view_count + 1,
display_order=sample_help_command.display_order,
display_order=sample_help_command.display_order
)
async def mock_get_help_by_name(name, include_inactive=False):
@ -451,9 +445,7 @@ class TestHelpCommandsServiceViewTracking:
help_commands_service_instance.get_help_by_name = mock_get_help_by_name
result = await help_commands_service_instance.increment_view_count(
"trading-rules"
)
result = await help_commands_service_instance.increment_view_count("trading-rules")
assert isinstance(result, HelpCommand)
assert result.view_count == 101
@ -467,21 +459,21 @@ class TestHelpCommandsServiceStatistics:
"""Test getting help command statistics."""
# Mock API response
api_response = {
"total_commands": 50,
"active_commands": 45,
"total_views": 5000,
"most_viewed_command": {
"id": 1,
"name": "popular-topic",
"title": "Popular Topic",
"content": "Content",
"created_by_discord_id": "123",
"created_at": datetime.now(timezone.utc).isoformat(),
"is_active": True,
"view_count": 500,
"display_order": 0,
'total_commands': 50,
'active_commands': 45,
'total_views': 5000,
'most_viewed_command': {
'id': 1,
'name': 'popular-topic',
'title': 'Popular Topic',
'content': 'Content',
'created_by_discord_id': '123',
'created_at': datetime.now(timezone.utc).isoformat(),
'is_active': True,
'view_count': 500,
'display_order': 0
},
"recent_commands_count": 5,
'recent_commands_count': 5
}
help_commands_service_instance._client.get.return_value = api_response
@ -493,7 +485,7 @@ class TestHelpCommandsServiceStatistics:
assert result.active_commands == 45
assert result.total_views == 5000
assert result.most_viewed_command is not None
assert result.most_viewed_command.name == "popular-topic"
assert result.most_viewed_command.name == 'popular-topic'
assert result.recent_commands_count == 5
@ -506,9 +498,7 @@ class TestHelpCommandsServiceErrorHandling:
from exceptions import APIException, BotException
# Mock the API client to raise an APIException
help_commands_service_instance._client.get.side_effect = APIException(
"Connection error"
)
help_commands_service_instance._client.get.side_effect = APIException("Connection error")
with pytest.raises(BotException, match="Failed to retrieve help topic 'test'"):
await help_commands_service_instance.get_help_by_name("test")

View File

@ -115,13 +115,6 @@ class TestTransactionBuilder:
svc.get_current_roster.return_value = mock_roster
return svc
@pytest.fixture(autouse=True)
def mock_league_service(self):
"""Patch league_service for all tests so FA lock check uses week 10 (before deadline)."""
with patch("services.transaction_builder.league_service") as mock_ls:
mock_ls.get_current_state = AsyncMock(return_value=MagicMock(week=10))
yield mock_ls
@pytest.fixture
def builder(self, mock_team, mock_roster_service):
"""Create a TransactionBuilder for testing with injected roster service."""
@ -159,50 +152,6 @@ class TestTransactionBuilder:
assert builder.is_empty is False
assert move in builder.moves
@pytest.mark.asyncio
async def test_add_move_from_fa_blocked_after_deadline(self, builder, mock_player):
"""Test that adding a player FROM Free Agency is blocked after fa_lock_week."""
move = TransactionMove(
player=mock_player,
from_roster=RosterType.FREE_AGENCY,
to_roster=RosterType.MAJOR_LEAGUE,
to_team=builder.team,
)
with patch(
"services.transaction_builder.league_service"
) as mock_league_service:
mock_league_service.get_current_state = AsyncMock(
return_value=MagicMock(week=15)
)
success, error_message = await builder.add_move(
move, check_pending_transactions=False
)
assert success is False
assert "Free agency is closed" in error_message
assert builder.move_count == 0
@pytest.mark.asyncio
async def test_drop_to_fa_allowed_after_deadline(self, builder, mock_player):
"""Test that dropping a player TO Free Agency is still allowed after fa_lock_week."""
move = TransactionMove(
player=mock_player,
from_roster=RosterType.MAJOR_LEAGUE,
to_roster=RosterType.FREE_AGENCY,
from_team=builder.team,
)
# Drop to FA doesn't trigger the FA lock check (autouse fixture provides week 10)
success, error_message = await builder.add_move(
move, check_pending_transactions=False
)
assert success is True
assert error_message == ""
assert builder.move_count == 1
@pytest.mark.asyncio
async def test_add_duplicate_move_fails(self, builder, mock_player):
"""Test that adding duplicate moves for same player fails."""
@ -860,13 +809,6 @@ class TestPendingTransactionValidation:
"""Create a mock player for testing."""
return Player(id=12472, name="Test Player", wara=2.5, season=12, pos_1="OF")
@pytest.fixture(autouse=True)
def mock_league_service(self):
"""Patch league_service so FA lock check and week resolution use week 10."""
with patch("services.transaction_builder.league_service") as mock_ls:
mock_ls.get_current_state = AsyncMock(return_value=MagicMock(week=10))
yield mock_ls
@pytest.fixture
def builder(self, mock_team):
"""Create a TransactionBuilder for testing."""

View File

@ -344,7 +344,6 @@ class TradeAcceptanceView(discord.ui.View):
def __init__(self, builder: TradeBuilder):
super().__init__(timeout=3600.0) # 1 hour timeout
self.builder = builder
self._checked_teams: dict[int, Team] = {}
async def _get_user_team(self, interaction: discord.Interaction) -> Optional[Team]:
"""Get the team owned by the interacting user."""
@ -370,7 +369,6 @@ class TradeAcceptanceView(discord.ui.View):
)
return False
self._checked_teams[interaction.user.id] = user_team
return True
async def on_timeout(self) -> None:
@ -384,7 +382,7 @@ class TradeAcceptanceView(discord.ui.View):
self, interaction: discord.Interaction, button: discord.ui.Button
):
"""Handle accept button click."""
user_team = self._checked_teams.get(interaction.user.id)
user_team = await self._get_user_team(interaction)
if not user_team:
return
@ -419,7 +417,7 @@ class TradeAcceptanceView(discord.ui.View):
self, interaction: discord.Interaction, button: discord.ui.Button
):
"""Handle reject button click - moves trade back to DRAFT."""
user_team = self._checked_teams.get(interaction.user.id)
user_team = await self._get_user_team(interaction)
if not user_team:
return
@ -735,10 +733,10 @@ async def create_trade_embed(builder: TradeBuilder) -> discord.Embed:
Returns:
Discord embed with current trade state
"""
validation = await builder.validate_trade()
if builder.is_empty:
color = EmbedColors.SECONDARY
else:
validation = await builder.validate_trade()
color = EmbedColors.SUCCESS if validation.is_legal else EmbedColors.WARNING
embed = EmbedTemplate.create_base_embed(
@ -793,6 +791,7 @@ async def create_trade_embed(builder: TradeBuilder) -> discord.Embed:
inline=False,
)
validation = await builder.validate_trade()
if validation.is_legal:
status_text = "Trade appears legal"
else: