perf: add caching for frequently-accessed stable data (#91) #120

Open
Claude wants to merge 1 commits from ai/major-domo-v2-91 into main
4 changed files with 349 additions and 261 deletions

View File

@ -3,7 +3,10 @@ Dice Rolling Commands
Implements slash commands for dice rolling functionality required for gameplay.
"""
import random
import time
from typing import Dict, Tuple
import discord
from discord.ext import commands
@ -13,7 +16,11 @@ from utils.logging import get_contextual_logger
from utils.decorators import logged_command
from utils.team_utils import get_user_major_league_team
from utils.text_utils import split_text_for_fields
from utils.dice_utils import DiceRoll, parse_and_roll_multiple_dice, parse_and_roll_single_dice
from utils.dice_utils import (
DiceRoll,
parse_and_roll_multiple_dice,
parse_and_roll_single_dice,
)
from views.embeds import EmbedColors, EmbedTemplate
from commands.dev.loaded_dice import get_and_consume_loaded_roll
from .chart_data import (
@ -33,26 +40,27 @@ from .chart_data import (
PITCHER_ERRORS,
)
# In-memory cache for channel embed color lookups: channel_id -> (color, cached_at)
_channel_color_cache: Dict[int, Tuple[int, float]] = {}
_CHANNEL_COLOR_CACHE_TTL = 300 # 5 minutes
class DiceRollCommands(commands.Cog):
"""Dice rolling command handlers for gameplay."""
def __init__(self, bot: commands.Bot):
self.bot = bot
self.logger = get_contextual_logger(f'{__name__}.DiceRollCommands')
self.logger = get_contextual_logger(f"{__name__}.DiceRollCommands")
@discord.app_commands.command(
name="roll",
description="Roll polyhedral dice using XdY notation (e.g., 2d6, 1d20, 3d8)"
description="Roll polyhedral dice using XdY notation (e.g., 2d6, 1d20, 3d8)",
)
@discord.app_commands.describe(
dice="Dice notation - single or multiple separated by semicolon (e.g., 2d6, 1d20;2d6;1d6)"
)
@logged_command("/roll")
async def roll_dice(
self,
interaction: discord.Interaction,
dice: str
):
async def roll_dice(self, interaction: discord.Interaction, dice: str):
"""Roll dice using standard XdY dice notation. Supports multiple rolls separated by semicolon."""
await interaction.response.defer()
@ -61,7 +69,7 @@ class DiceRollCommands(commands.Cog):
if not roll_results:
await interaction.followup.send(
"❌ Invalid dice notation. Use format like: 2d6, 1d20, or 1d6;2d6;1d20",
ephemeral=True
ephemeral=True,
)
return
@ -72,18 +80,24 @@ class DiceRollCommands(commands.Cog):
@commands.command(name="roll", aliases=["r", "dice"])
async def roll_dice_prefix(self, ctx: commands.Context, *, dice: str | None = None):
"""Roll dice using prefix commands (!roll, !r, !dice)."""
self.logger.info(f"Prefix roll command started by {ctx.author.display_name}", dice_input=dice)
self.logger.info(
f"Prefix roll command started by {ctx.author.display_name}", dice_input=dice
)
if dice is None:
self.logger.debug("No dice input provided")
await ctx.send("❌ Please provide dice notation. Usage: `!roll 2d6` or `!roll 1d6;2d6;1d20`")
await ctx.send(
"❌ Please provide dice notation. Usage: `!roll 2d6` or `!roll 1d6;2d6;1d20`"
)
return
# Parse and validate dice notation (supports multiple rolls)
roll_results = parse_and_roll_multiple_dice(dice)
if not roll_results:
self.logger.warning("Invalid dice notation provided", dice_input=dice)
await ctx.send("❌ Invalid dice notation. Use format like: 2d6, 1d20, or 1d6;2d6;1d20")
await ctx.send(
"❌ Invalid dice notation. Use format like: 2d6, 1d20, or 1d6;2d6;1d20"
)
return
self.logger.info(f"Dice rolled successfully", roll_count=len(roll_results))
@ -92,10 +106,7 @@ class DiceRollCommands(commands.Cog):
embed = self._create_multi_roll_embed(dice, roll_results, ctx.author)
await ctx.send(embed=embed)
@discord.app_commands.command(
name="d20",
description="Roll a single d20"
)
@discord.app_commands.command(name="d20", description="Roll a single d20")
@logged_command("/d20")
async def d20_dice(self, interaction: discord.Interaction):
"""Roll a single d20."""
@ -112,15 +123,14 @@ class DiceRollCommands(commands.Cog):
roll_results,
interaction.user,
set_author=False,
embed_color=embed_color
embed_color=embed_color,
)
embed.title = f'd20 roll for {interaction.user.display_name}'
embed.title = f"d20 roll for {interaction.user.display_name}"
await interaction.followup.send(embed=embed)
@discord.app_commands.command(
name="ab",
description="Roll baseball at-bat dice (1d6;2d6;1d20)"
name="ab", description="Roll baseball at-bat dice (1d6;2d6;1d20)"
)
@logged_command("/ab")
async def ab_dice(self, interaction: discord.Interaction):
@ -137,7 +147,7 @@ class DiceRollCommands(commands.Cog):
# Create DiceRoll objects from loaded values
# For 2d6, we split the total into two dice (arbitrary split that sums correctly)
d6_2a = min(loaded.d6_2_total - 1, 6) # First die (max 6)
d6_2b = loaded.d6_2_total - d6_2a # Second die gets remainder
d6_2b = loaded.d6_2_total - d6_2a # Second die gets remainder
roll_results = [
DiceRoll("1d6", 1, 6, [loaded.d6_1], loaded.d6_1),
DiceRoll("2d6", 2, 6, [d6_2a, d6_2b], loaded.d6_2_total),
@ -149,17 +159,19 @@ class DiceRollCommands(commands.Cog):
dice_notation = "1d6;2d6;1d20"
roll_results = parse_and_roll_multiple_dice(dice_notation)
injury_risk = (roll_results[0].total == 6) and (roll_results[1].total in [7, 8, 9, 10, 11, 12])
injury_risk = (roll_results[0].total == 6) and (
roll_results[1].total in [7, 8, 9, 10, 11, 12]
)
d6_total = roll_results[1].total
embed_title = 'At bat roll'
embed_title = "At bat roll"
if roll_results[2].total == 1:
embed_title = 'Wild pitch roll'
dice_notation = '1d20'
embed_title = "Wild pitch roll"
dice_notation = "1d20"
roll_results = [parse_and_roll_single_dice(dice_notation)]
elif roll_results[2].total == 2:
embed_title = 'PB roll'
dice_notation = '1d20'
embed_title = "PB roll"
dice_notation = "1d20"
roll_results = [parse_and_roll_single_dice(dice_notation)]
# Create embed for the roll results
@ -168,15 +180,15 @@ class DiceRollCommands(commands.Cog):
roll_results,
interaction.user,
set_author=False,
embed_color=embed_color
embed_color=embed_color,
)
embed.title = f'{embed_title} for {interaction.user.display_name}'
embed.title = f"{embed_title} for {interaction.user.display_name}"
if injury_risk and embed_title == 'At bat roll':
if injury_risk and embed_title == "At bat roll":
embed.add_field(
name=f'Check injury for pitcher injury rating {13 - d6_total}',
value='Oops! All injuries!',
inline=False
name=f"Check injury for pitcher injury rating {13 - d6_total}",
value="Oops! All injuries!",
inline=False,
)
await interaction.followup.send(embed=embed)
@ -188,35 +200,43 @@ class DiceRollCommands(commands.Cog):
team = await get_user_major_league_team(user_id=ctx.author.id)
embed_color = EmbedColors.PRIMARY
if team is not None and team.color is not None:
embed_color = int(team.color,16)
embed_color = int(team.color, 16)
# Use the standard baseball dice combination
dice_notation = "1d6;2d6;1d20"
roll_results = parse_and_roll_multiple_dice(dice_notation)
self.logger.info("At Bat dice rolled successfully", roll_count=len(roll_results))
self.logger.info(
"At Bat dice rolled successfully", roll_count=len(roll_results)
)
# Create embed for the roll results
embed = self._create_multi_roll_embed(dice_notation, roll_results, ctx.author, set_author=False, embed_color=embed_color)
embed.title = f'At bat roll for {ctx.author.display_name}'
embed = self._create_multi_roll_embed(
dice_notation,
roll_results,
ctx.author,
set_author=False,
embed_color=embed_color,
)
embed.title = f"At bat roll for {ctx.author.display_name}"
await ctx.send(embed=embed)
@discord.app_commands.command(
name="scout",
description="Roll weighted scouting dice (1d6;2d6;1d20) based on card type"
description="Roll weighted scouting dice (1d6;2d6;1d20) based on card type",
)
@discord.app_commands.describe(
card_type="Type of card being scouted"
@discord.app_commands.describe(card_type="Type of card being scouted")
@discord.app_commands.choices(
card_type=[
discord.app_commands.Choice(name="Batter", value="batter"),
discord.app_commands.Choice(name="Pitcher", value="pitcher"),
]
)
@discord.app_commands.choices(card_type=[
discord.app_commands.Choice(name="Batter", value="batter"),
discord.app_commands.Choice(name="Pitcher", value="pitcher")
])
@logged_command("/scout")
async def scout_dice(
self,
interaction: discord.Interaction,
card_type: discord.app_commands.Choice[str]
card_type: discord.app_commands.Choice[str],
):
"""Roll weighted scouting dice based on card type (batter or pitcher)."""
await interaction.response.defer()
@ -228,33 +248,35 @@ class DiceRollCommands(commands.Cog):
roll_results = self._roll_weighted_scout_dice(card_type_value)
# Create embed for the roll results
embed = self._create_multi_roll_embed("1d6;2d6;1d20", roll_results, interaction.user, set_author=False)
embed.title = f'Scouting roll for {interaction.user.display_name}'
embed = self._create_multi_roll_embed(
"1d6;2d6;1d20", roll_results, interaction.user, set_author=False
)
embed.title = f"Scouting roll for {interaction.user.display_name}"
await interaction.followup.send(embed=embed)
@discord.app_commands.command(
name="fielding",
description="Roll Super Advanced fielding dice for a defensive position"
description="Roll Super Advanced fielding dice for a defensive position",
)
@discord.app_commands.describe(
position="Defensive position"
@discord.app_commands.describe(position="Defensive position")
@discord.app_commands.choices(
position=[
discord.app_commands.Choice(name="Pitcher (P)", value="P"),
discord.app_commands.Choice(name="Catcher (C)", value="C"),
discord.app_commands.Choice(name="First Base (1B)", value="1B"),
discord.app_commands.Choice(name="Second Base (2B)", value="2B"),
discord.app_commands.Choice(name="Third Base (3B)", value="3B"),
discord.app_commands.Choice(name="Shortstop (SS)", value="SS"),
discord.app_commands.Choice(name="Left Field (LF)", value="LF"),
discord.app_commands.Choice(name="Center Field (CF)", value="CF"),
discord.app_commands.Choice(name="Right Field (RF)", value="RF"),
]
)
@discord.app_commands.choices(position=[
discord.app_commands.Choice(name="Pitcher (P)", value="P"),
discord.app_commands.Choice(name="Catcher (C)", value="C"),
discord.app_commands.Choice(name="First Base (1B)", value="1B"),
discord.app_commands.Choice(name="Second Base (2B)", value="2B"),
discord.app_commands.Choice(name="Third Base (3B)", value="3B"),
discord.app_commands.Choice(name="Shortstop (SS)", value="SS"),
discord.app_commands.Choice(name="Left Field (LF)", value="LF"),
discord.app_commands.Choice(name="Center Field (CF)", value="CF"),
discord.app_commands.Choice(name="Right Field (RF)", value="RF")
])
@logged_command("/fielding")
async def fielding_roll(
self,
interaction: discord.Interaction,
position: discord.app_commands.Choice[str]
position: discord.app_commands.Choice[str],
):
"""Roll Super Advanced fielding dice for a defensive position."""
await interaction.response.defer()
@ -268,16 +290,25 @@ class DiceRollCommands(commands.Cog):
roll_results = parse_and_roll_multiple_dice(dice_notation)
# Create fielding embed
embed = self._create_fielding_embed(pos_value, roll_results, interaction.user, embed_color)
embed = self._create_fielding_embed(
pos_value, roll_results, interaction.user, embed_color
)
await interaction.followup.send(embed=embed)
@commands.command(name="f", aliases=["fielding", "saf"])
async def fielding_roll_prefix(self, ctx: commands.Context, position: str | None = None):
async def fielding_roll_prefix(
self, ctx: commands.Context, position: str | None = None
):
"""Roll Super Advanced fielding dice using prefix commands (!f, !fielding, !saf)."""
self.logger.info(f"SA Fielding command started by {ctx.author.display_name}", position=position)
self.logger.info(
f"SA Fielding command started by {ctx.author.display_name}",
position=position,
)
if position is None:
await ctx.send("❌ Please specify a position. Usage: `!f 3B` or `!fielding SS`")
await ctx.send(
"❌ Please specify a position. Usage: `!f 3B` or `!fielding SS`"
)
return
# Parse and validate position
@ -290,15 +321,19 @@ class DiceRollCommands(commands.Cog):
dice_notation = "1d20;3d6;1d100"
roll_results = parse_and_roll_multiple_dice(dice_notation)
self.logger.info("SA Fielding dice rolled successfully", position=parsed_position, d20=roll_results[0].total, d6_total=roll_results[1].total)
self.logger.info(
"SA Fielding dice rolled successfully",
position=parsed_position,
d20=roll_results[0].total,
d6_total=roll_results[1].total,
)
# Create fielding embed
embed = self._create_fielding_embed(parsed_position, roll_results, ctx.author)
await ctx.send(embed=embed)
@discord.app_commands.command(
name="jump",
description="Roll for baserunner's jump before stealing"
name="jump", description="Roll for baserunner's jump before stealing"
)
@logged_command("/jump")
async def jump_dice(self, interaction: discord.Interaction):
@ -322,7 +357,7 @@ class DiceRollCommands(commands.Cog):
resolution_roll,
interaction.user,
embed_color,
show_author=False
show_author=False,
)
await interaction.followup.send(embed=embed)
@ -344,31 +379,47 @@ class DiceRollCommands(commands.Cog):
# Roll another 1d20 for pickoff/balk resolution
resolution_roll = random.randint(1, 20)
self.logger.info("Jump dice rolled successfully", check=check_roll, jump=jump_result.total if jump_result else None, resolution=resolution_roll)
self.logger.info(
"Jump dice rolled successfully",
check=check_roll,
jump=jump_result.total if jump_result else None,
resolution=resolution_roll,
)
# Create embed based on check roll
embed = self._create_jump_embed(
check_roll,
jump_result,
resolution_roll,
ctx.author,
embed_color
check_roll, jump_result, resolution_roll, ctx.author, embed_color
)
await ctx.send(embed=embed)
async def _get_channel_embed_color(self, interaction: discord.Interaction) -> int:
channel_id = interaction.channel_id
if channel_id is not None and channel_id in _channel_color_cache:
cached_color, cached_at = _channel_color_cache[channel_id]
if time.time() - cached_at < _CHANNEL_COLOR_CACHE_TTL:
return cached_color
# Check if channel is a type that has a name attribute (DMChannel doesn't have one)
if isinstance(interaction.channel, (discord.TextChannel, discord.VoiceChannel, discord.Thread)):
if isinstance(
interaction.channel,
(discord.TextChannel, discord.VoiceChannel, discord.Thread),
):
channel_starter = interaction.channel.name[:6]
if '-' in channel_starter:
abbrev = channel_starter.split('-')[0]
if "-" in channel_starter:
abbrev = channel_starter.split("-")[0]
channel_team = await team_service.get_team_by_abbrev(abbrev)
if channel_team is not None and channel_team.color is not None:
return int(channel_team.color,16)
color = int(channel_team.color, 16)
if channel_id is not None:
_channel_color_cache[channel_id] = (color, time.time())
return color
team = await get_user_major_league_team(user_id=interaction.user.id)
if team is not None and team.color is not None:
return int(team.color,16)
color = int(team.color, 16)
if channel_id is not None:
_channel_color_cache[channel_id] = (color, time.time())
return color
return EmbedColors.PRIMARY
@ -381,25 +432,44 @@ class DiceRollCommands(commands.Cog):
# Map common inputs to standard position names
position_map = {
'P': 'P', 'PITCHER': 'P',
'C': 'C', 'CATCHER': 'C',
'1': '1B', '1B': '1B', 'FIRST': '1B', 'FIRSTBASE': '1B',
'2': '2B', '2B': '2B', 'SECOND': '2B', 'SECONDBASE': '2B',
'3': '3B', '3B': '3B', 'THIRD': '3B', 'THIRDBASE': '3B',
'SS': 'SS', 'SHORT': 'SS', 'SHORTSTOP': 'SS',
'LF': 'LF', 'LEFT': 'LF', 'LEFTFIELD': 'LF',
'CF': 'CF', 'CENTER': 'CF', 'CENTERFIELD': 'CF',
'RF': 'RF', 'RIGHT': 'RF', 'RIGHTFIELD': 'RF'
"P": "P",
"PITCHER": "P",
"C": "C",
"CATCHER": "C",
"1": "1B",
"1B": "1B",
"FIRST": "1B",
"FIRSTBASE": "1B",
"2": "2B",
"2B": "2B",
"SECOND": "2B",
"SECONDBASE": "2B",
"3": "3B",
"3B": "3B",
"THIRD": "3B",
"THIRDBASE": "3B",
"SS": "SS",
"SHORT": "SS",
"SHORTSTOP": "SS",
"LF": "LF",
"LEFT": "LF",
"LEFTFIELD": "LF",
"CF": "CF",
"CENTER": "CF",
"CENTERFIELD": "CF",
"RF": "RF",
"RIGHT": "RF",
"RIGHTFIELD": "RF",
}
return position_map.get(pos)
def _create_fielding_embed(
self,
position: str,
roll_results: list[DiceRoll],
user: discord.User | discord.Member,
embed_color: int = EmbedColors.PRIMARY
self,
position: str,
roll_results: list[DiceRoll],
user: discord.User | discord.Member,
embed_color: int = EmbedColors.PRIMARY,
) -> discord.Embed:
"""Create an embed for fielding roll results."""
d20_result = roll_results[0].total
@ -409,15 +479,11 @@ class DiceRollCommands(commands.Cog):
# Create base embed
embed = EmbedTemplate.create_base_embed(
title=f"SA Fielding roll for {user.display_name}",
color=embed_color
title=f"SA Fielding roll for {user.display_name}", color=embed_color
)
# Set user info
embed.set_author(
name=user.display_name,
icon_url=user.display_avatar.url
)
embed.set_author(name=user.display_name, icon_url=user.display_avatar.url)
# Add dice results in standard format
dice_notation = "1d20;3d6"
@ -425,18 +491,14 @@ class DiceRollCommands(commands.Cog):
# Extract just the dice result part from the field
dice_field_value = embed_dice.fields[0].value
embed.add_field(
name="Dice Results",
value=dice_field_value,
inline=False
)
embed.add_field(name="Dice Results", value=dice_field_value, inline=False)
# Add fielding check summary
range_result = self._get_range_result(position, d20_result)
embed.add_field(
name=f"{position} Range Result",
value=f"```md\n 1 | 2 | 3 | 4 | 5\n{range_result}```",
inline=False
inline=False,
)
# Add rare play or error result
@ -457,19 +519,15 @@ class DiceRollCommands(commands.Cog):
field_name = base_field_name
# Add part indicator if multiple chunks
if len(result_chunks) > 1:
field_name += f" (Part {i+1}/{len(result_chunks)})"
field_name += f" (Part {i + 1}/{len(result_chunks)})"
embed.add_field(
name=field_name,
value=chunk,
inline=False
)
embed.add_field(name=field_name, value=chunk, inline=False)
# Add help commands
embed.add_field(
name="Help Commands",
value="Run /charts for full chart readout",
inline=False
inline=False,
)
# # Add references
@ -488,125 +546,115 @@ class DiceRollCommands(commands.Cog):
resolution_roll: int,
user: discord.User | discord.Member,
embed_color: int = EmbedColors.PRIMARY,
show_author: bool = True
show_author: bool = True,
) -> discord.Embed:
"""Create an embed for jump roll results."""
# Create base embed
embed = EmbedTemplate.create_base_embed(
title=f"Jump roll for {user.name}",
color=embed_color
title=f"Jump roll for {user.name}", color=embed_color
)
if show_author:
# Set user info
embed.set_author(
name=user.name,
icon_url=user.display_avatar.url
)
embed.set_author(name=user.name, icon_url=user.display_avatar.url)
# Check for pickoff or balk
if check_roll == 1:
# Pickoff attempt
embed.add_field(
name="Special",
value="```md\nCheck pickoff```",
inline=False
name="Special", value="```md\nCheck pickoff```", inline=False
)
embed.add_field(
name="Pickoff roll",
value=f"```md\n# {resolution_roll}\nDetails:[1d20 ({resolution_roll})]```",
inline=False
inline=False,
)
elif check_roll == 2:
# Balk
embed.add_field(
name="Special",
value="```md\nCheck balk```",
inline=False
)
embed.add_field(name="Special", value="```md\nCheck balk```", inline=False)
embed.add_field(
name="Balk roll",
value=f"```md\n# {resolution_roll}\nDetails:[1d20 ({resolution_roll})]```",
inline=False
inline=False,
)
else:
# Normal jump - show 2d6 result
if jump_result:
rolls_str = ' '.join(str(r) for r in jump_result.rolls)
rolls_str = " ".join(str(r) for r in jump_result.rolls)
embed.add_field(
name="Result",
value=f"```md\n# {jump_result.total}\nDetails:[2d6 ({rolls_str})]```",
inline=False
inline=False,
)
return embed
def _get_range_result(self, position: str, d20_roll: int) -> str:
"""Get the range result display for a position and d20 roll."""
if position == 'P':
if position == "P":
return self._get_pitcher_range(d20_roll)
elif position in ['1B', '2B', '3B', 'SS']:
elif position in ["1B", "2B", "3B", "SS"]:
return self._get_infield_range(d20_roll)
elif position in ['LF', 'CF', 'RF']:
elif position in ["LF", "CF", "RF"]:
return self._get_outfield_range(d20_roll)
elif position == 'C':
elif position == "C":
return self._get_catcher_range(d20_roll)
return "Unknown position"
def _get_infield_range(self, d20_roll: int) -> str:
"""Get infield range result based on d20 roll."""
return INFIELD_RANGES.get(d20_roll, 'Unknown')
return INFIELD_RANGES.get(d20_roll, "Unknown")
def _get_outfield_range(self, d20_roll: int) -> str:
"""Get outfield range result based on d20 roll."""
return OUTFIELD_RANGES.get(d20_roll, 'Unknown')
return OUTFIELD_RANGES.get(d20_roll, "Unknown")
def _get_catcher_range(self, d20_roll: int) -> str:
"""Get catcher range result based on d20 roll."""
return CATCHER_RANGES.get(d20_roll, 'Unknown')
return CATCHER_RANGES.get(d20_roll, "Unknown")
def _get_pitcher_range(self, d20_roll: int) -> str:
"""Get pitcher range result based on d20 roll."""
return PITCHER_RANGES.get(d20_roll, 'Unknown')
return PITCHER_RANGES.get(d20_roll, "Unknown")
def _get_rare_play(self, position: str, d20_total: int) -> str:
"""Get the rare play result for a position and d20 total"""
starter = 'Rare play! Take the range result from above and consult the chart below.\n\n'
if position == 'P':
starter = "Rare play! Take the range result from above and consult the chart below.\n\n"
if position == "P":
return starter + self._get_pitcher_rare_play(d20_total)
elif position == '1B':
elif position == "1B":
return starter + self._get_infield_rare_play(d20_total)
elif position == '2B':
elif position == "2B":
return starter + self._get_infield_rare_play(d20_total)
elif position == '3B':
elif position == "3B":
return starter + self._get_infield_rare_play(d20_total)
elif position == 'SS':
elif position == "SS":
return starter + self._get_infield_rare_play(d20_total)
elif position in ['LF', 'RF']:
elif position in ["LF", "RF"]:
return starter + self._get_outfield_rare_play(d20_total)
elif position == 'CF':
elif position == "CF":
return starter + self._get_outfield_rare_play(d20_total)
raise ValueError(f'Unknown position: {position}')
raise ValueError(f"Unknown position: {position}")
def _get_error_result(self, position: str, d6_total: int) -> str:
"""Get the error result for a position and 3d6 total."""
# Get the appropriate error chart
if position == 'P':
if position == "P":
return self._get_pitcher_error(d6_total)
elif position == '1B':
elif position == "1B":
return self._get_1b_error(d6_total)
elif position == '2B':
elif position == "2B":
return self._get_2b_error(d6_total)
elif position == '3B':
elif position == "3B":
return self._get_3b_error(d6_total)
elif position == 'SS':
elif position == "SS":
return self._get_ss_error(d6_total)
elif position in ['LF', 'RF']:
elif position in ["LF", "RF"]:
return self._get_corner_of_error(d6_total)
elif position == 'CF':
elif position == "CF":
return self._get_cf_error(d6_total)
elif position == 'C':
elif position == "C":
return self._get_catcher_error(d6_total)
# Should never reach here due to position validation, but follow "Raise or Return" pattern
@ -614,65 +662,64 @@ class DiceRollCommands(commands.Cog):
def _get_3b_error(self, d6_total: int) -> str:
"""Get 3B error result based on 3d6 total."""
return THIRD_BASE_ERRORS.get(d6_total, 'No error')
return THIRD_BASE_ERRORS.get(d6_total, "No error")
def _get_1b_error(self, d6_total: int) -> str:
"""Get 1B error result based on 3d6 total."""
return FIRST_BASE_ERRORS.get(d6_total, 'No error')
return FIRST_BASE_ERRORS.get(d6_total, "No error")
def _get_2b_error(self, d6_total: int) -> str:
"""Get 2B error result based on 3d6 total."""
return SECOND_BASE_ERRORS.get(d6_total, 'No error')
return SECOND_BASE_ERRORS.get(d6_total, "No error")
def _get_ss_error(self, d6_total: int) -> str:
"""Get SS error result based on 3d6 total."""
return SHORTSTOP_ERRORS.get(d6_total, 'No error')
return SHORTSTOP_ERRORS.get(d6_total, "No error")
def _get_corner_of_error(self, d6_total: int) -> str:
"""Get LF/RF error result based on 3d6 total."""
return CORNER_OUTFIELD_ERRORS.get(d6_total, 'No error')
return CORNER_OUTFIELD_ERRORS.get(d6_total, "No error")
def _get_cf_error(self, d6_total: int) -> str:
"""Get CF error result based on 3d6 total."""
return CENTER_FIELD_ERRORS.get(d6_total, 'No error')
return CENTER_FIELD_ERRORS.get(d6_total, "No error")
def _get_catcher_error(self, d6_total: int) -> str:
"""Get Catcher error result based on 3d6 total."""
return CATCHER_ERRORS.get(d6_total, 'No error')
return CATCHER_ERRORS.get(d6_total, "No error")
def _get_pitcher_error(self, d6_total: int) -> str:
"""Get Pitcher error result based on 3d6 total."""
return PITCHER_ERRORS.get(d6_total, 'No error')
return PITCHER_ERRORS.get(d6_total, "No error")
def _get_pitcher_rare_play(self, d20_total: int) -> str:
return (
f'**G3**: {INFIELD_X_CHART["g3"]["rp"]}\n'
f'**G2**: {INFIELD_X_CHART["g2"]["rp"]}\n'
f'**G1**: {INFIELD_X_CHART["g1"]["rp"]}\n'
f'**SI1**: {INFIELD_X_CHART["si1"]["rp"]}\n'
f"**G3**: {INFIELD_X_CHART['g3']['rp']}\n"
f"**G2**: {INFIELD_X_CHART['g2']['rp']}\n"
f"**G1**: {INFIELD_X_CHART['g1']['rp']}\n"
f"**SI1**: {INFIELD_X_CHART['si1']['rp']}\n"
)
def _get_infield_rare_play(self, d20_total: int) -> str:
return (
f'**G3**: {INFIELD_X_CHART["g3"]["rp"]}\n'
f'**G2**: {INFIELD_X_CHART["g2"]["rp"]}\n'
f'**G1**: {INFIELD_X_CHART["g1"]["rp"]}\n'
f'**SI1**: {INFIELD_X_CHART["si1"]["rp"]}\n'
f'**SI2**: {OUTFIELD_X_CHART["si2"]["rp"]}\n'
f"**G3**: {INFIELD_X_CHART['g3']['rp']}\n"
f"**G2**: {INFIELD_X_CHART['g2']['rp']}\n"
f"**G1**: {INFIELD_X_CHART['g1']['rp']}\n"
f"**SI1**: {INFIELD_X_CHART['si1']['rp']}\n"
f"**SI2**: {OUTFIELD_X_CHART['si2']['rp']}\n"
)
def _get_outfield_rare_play(self, d20_total: int) -> str:
return (
f'**F1**: {OUTFIELD_X_CHART["f1"]["rp"]}\n'
f'**F2**: {OUTFIELD_X_CHART["f2"]["rp"]}\n'
f'**F3**: {OUTFIELD_X_CHART["f3"]["rp"]}\n'
f'**SI2**: {OUTFIELD_X_CHART["si2"]["rp"]}\n'
f'**DO2**: {OUTFIELD_X_CHART["do2"]["rp"]}\n'
f'**DO3**: {OUTFIELD_X_CHART["do3"]["rp"]}\n'
f'**TR3**: {OUTFIELD_X_CHART["tr3"]["rp"]}\n'
f"**F1**: {OUTFIELD_X_CHART['f1']['rp']}\n"
f"**F2**: {OUTFIELD_X_CHART['f2']['rp']}\n"
f"**F3**: {OUTFIELD_X_CHART['f3']['rp']}\n"
f"**SI2**: {OUTFIELD_X_CHART['si2']['rp']}\n"
f"**DO2**: {OUTFIELD_X_CHART['do2']['rp']}\n"
f"**DO3**: {OUTFIELD_X_CHART['do3']['rp']}\n"
f"**TR3**: {OUTFIELD_X_CHART['tr3']['rp']}\n"
)
def _roll_weighted_scout_dice(self, card_type: str) -> list[DiceRoll]:
"""
Roll scouting dice with weighted first d6 based on card type.
@ -690,11 +737,11 @@ class DiceRollCommands(commands.Cog):
first_roll = random.randint(4, 6)
first_d6_result = DiceRoll(
dice_notation='1d6',
dice_notation="1d6",
num_dice=1,
die_sides=6,
rolls=[first_roll],
total=first_roll
total=first_roll,
)
# Second roll (2d6) - normal
@ -705,19 +752,20 @@ class DiceRollCommands(commands.Cog):
return [first_d6_result, second_result, third_result]
def _create_multi_roll_embed(self, dice_notation: str, roll_results: list[DiceRoll], user: discord.User | discord.Member, set_author: bool = True, embed_color: int = EmbedColors.PRIMARY) -> discord.Embed:
def _create_multi_roll_embed(
self,
dice_notation: str,
roll_results: list[DiceRoll],
user: discord.User | discord.Member,
set_author: bool = True,
embed_color: int = EmbedColors.PRIMARY,
) -> discord.Embed:
"""Create an embed for multiple dice roll results."""
embed = EmbedTemplate.create_base_embed(
title="🎲 Dice Roll",
color=embed_color
)
embed = EmbedTemplate.create_base_embed(title="🎲 Dice Roll", color=embed_color)
if set_author:
# Set user info
embed.set_author(
name=user.name,
icon_url=user.display_avatar.url
)
embed.set_author(name=user.name, icon_url=user.display_avatar.url)
# Create summary line with totals
totals = [str(result.total) for result in roll_results]
@ -735,19 +783,16 @@ class DiceRollCommands(commands.Cog):
roll_groups.append(str(rolls[0]))
else:
# Multiple dice: space-separated within the group
roll_groups.append(' '.join(str(r) for r in rolls))
roll_groups.append(" ".join(str(r) for r in rolls))
details = f"Details:[{';'.join(dice_notations)} ({' - '.join(roll_groups)})]"
# Set as description
embed.add_field(
name='Result',
value=f"```md\n{summary}\n{details}```"
)
embed.add_field(name="Result", value=f"```md\n{summary}\n{details}```")
return embed
async def setup(bot: commands.Bot):
"""Load the dice roll commands cog."""
await bot.add_cog(DiceRollCommands(bot))
await bot.add_cog(DiceRollCommands(bot))

View File

@ -3,6 +3,7 @@ League service for Discord Bot v2.0
Handles league-wide operations including current state, standings, and season information.
"""
import logging
from typing import Optional, List, Dict, Any
@ -10,25 +11,27 @@ from config import get_config
from services.base_service import BaseService
from models.current import Current
from exceptions import APIException
from utils.decorators import cached_single_item
logger = logging.getLogger(f'{__name__}.LeagueService')
logger = logging.getLogger(f"{__name__}.LeagueService")
class LeagueService(BaseService[Current]):
"""
Service for league-wide operations.
Features:
- Current league state retrieval
- Season standings
- League-wide statistics
"""
def __init__(self):
"""Initialize league service."""
super().__init__(Current, 'current')
super().__init__(Current, "current")
logger.debug("LeagueService initialized")
@cached_single_item(ttl=60)
async def get_current_state(self) -> Optional[Current]:
"""
Get the current league state including week, season, and settings.
@ -38,11 +41,13 @@ class LeagueService(BaseService[Current]):
"""
try:
client = await self.get_client()
data = await client.get('current')
data = await client.get("current")
if data:
current = Current.from_api_data(data)
logger.debug(f"Retrieved current state: Week {current.week}, Season {current.season}")
logger.debug(
f"Retrieved current state: Week {current.week}, Season {current.season}"
)
return current
logger.debug("No current state data found")
@ -53,9 +58,7 @@ class LeagueService(BaseService[Current]):
return None
async def update_current_state(
self,
week: Optional[int] = None,
freeze: Optional[bool] = None
self, week: Optional[int] = None, freeze: Optional[bool] = None
) -> Optional[Current]:
"""
Update current league state (week and/or freeze status).
@ -77,9 +80,9 @@ class LeagueService(BaseService[Current]):
# Build update data
update_data = {}
if week is not None:
update_data['week'] = week
update_data["week"] = week
if freeze is not None:
update_data['freeze'] = freeze
update_data["freeze"] = freeze
if not update_data:
logger.warning("update_current_state called with no updates")
@ -89,127 +92,152 @@ class LeagueService(BaseService[Current]):
# (Current table has one row per season, NOT a single row with id=1)
current = await self.get_current_state()
if not current:
logger.error("Cannot update current state - unable to fetch current state")
logger.error(
"Cannot update current state - unable to fetch current state"
)
return None
current_id = current.id
logger.debug(f"Updating current state id={current_id} (season {current.season})")
logger.debug(
f"Updating current state id={current_id} (season {current.season})"
)
# Use BaseService patch method
updated_current = await self.patch(current_id, update_data, use_query_params=True)
updated_current = await self.patch(
current_id, update_data, use_query_params=True
)
if updated_current:
logger.info(f"Updated current state id={current_id}: {update_data}")
return updated_current
else:
logger.error(f"Failed to update current state id={current_id} - patch returned None")
logger.error(
f"Failed to update current state id={current_id} - patch returned None"
)
return None
except Exception as e:
logger.error(f"Error updating current state: {e}")
raise APIException(f"Failed to update current state: {e}")
async def get_standings(self, season: Optional[int] = None) -> Optional[List[Dict[str, Any]]]:
async def get_standings(
self, season: Optional[int] = None
) -> Optional[List[Dict[str, Any]]]:
"""
Get league standings for a season.
Args:
season: Season number (defaults to current season)
Returns:
List of standings data or None if not available
"""
try:
season = season or get_config().sba_season
client = await self.get_client()
data = await client.get('standings', params=[('season', str(season))])
data = await client.get("standings", params=[("season", str(season))])
if data and isinstance(data, list):
logger.debug(f"Retrieved standings for season {season}: {len(data)} teams")
logger.debug(
f"Retrieved standings for season {season}: {len(data)} teams"
)
return data
elif data and isinstance(data, dict):
# Handle case where API returns a dict with standings array
standings_data = data.get('standings', data.get('items', []))
standings_data = data.get("standings", data.get("items", []))
if standings_data:
logger.debug(f"Retrieved standings for season {season}: {len(standings_data)} teams")
logger.debug(
f"Retrieved standings for season {season}: {len(standings_data)} teams"
)
return standings_data
logger.debug(f"No standings data found for season {season}")
return None
except Exception as e:
logger.error(f"Failed to get standings for season {season}: {e}")
return None
async def get_division_standings(self, division_id: int, season: Optional[int] = None) -> Optional[List[Dict[str, Any]]]:
async def get_division_standings(
self, division_id: int, season: Optional[int] = None
) -> Optional[List[Dict[str, Any]]]:
"""
Get standings for a specific division.
Args:
division_id: Division identifier
season: Season number (defaults to current season)
Returns:
List of division standings or None if not available
"""
try:
season = season or get_config().sba_season
client = await self.get_client()
data = await client.get(f'standings/division/{division_id}', params=[('season', str(season))])
data = await client.get(
f"standings/division/{division_id}", params=[("season", str(season))]
)
if data and isinstance(data, list):
logger.debug(f"Retrieved division {division_id} standings for season {season}: {len(data)} teams")
logger.debug(
f"Retrieved division {division_id} standings for season {season}: {len(data)} teams"
)
return data
logger.debug(f"No division standings found for division {division_id}, season {season}")
logger.debug(
f"No division standings found for division {division_id}, season {season}"
)
return None
except Exception as e:
logger.error(f"Failed to get division {division_id} standings: {e}")
return None
async def get_league_leaders(self, stat_type: str = 'batting', season: Optional[int] = None, limit: int = 10) -> Optional[List[Dict[str, Any]]]:
async def get_league_leaders(
self, stat_type: str = "batting", season: Optional[int] = None, limit: int = 10
) -> Optional[List[Dict[str, Any]]]:
"""
Get league leaders for a specific statistic category.
Args:
stat_type: Type of stats ('batting', 'pitching', 'fielding')
season: Season number (defaults to current season)
limit: Number of leaders to return
Returns:
List of league leaders or None if not available
"""
try:
season = season or get_config().sba_season
client = await self.get_client()
params = [
('season', str(season)),
('limit', str(limit))
]
data = await client.get(f'leaders/{stat_type}', params=params)
params = [("season", str(season)), ("limit", str(limit))]
data = await client.get(f"leaders/{stat_type}", params=params)
if data:
# Handle different response formats
if isinstance(data, list):
leaders = data
elif isinstance(data, dict):
leaders = data.get('leaders', data.get('items', data.get('results', [])))
leaders = data.get(
"leaders", data.get("items", data.get("results", []))
)
else:
leaders = []
logger.debug(f"Retrieved {stat_type} leaders for season {season}: {len(leaders)} players")
logger.debug(
f"Retrieved {stat_type} leaders for season {season}: {len(leaders)} players"
)
return leaders[:limit] # Ensure we don't exceed limit
logger.debug(f"No {stat_type} leaders found for season {season}")
return None
except Exception as e:
logger.error(f"Failed to get {stat_type} leaders for season {season}: {e}")
return None
# Global service instance
league_service = LeagueService()
league_service = LeagueService()

View File

@ -11,6 +11,7 @@ from config import get_config
from services.base_service import BaseService
from models.player import Player
from exceptions import APIException
from utils.decorators import cached_api_call
if TYPE_CHECKING:
from services.team_service import TeamService
@ -270,6 +271,7 @@ class PlayerService(BaseService[Player]):
logger.error(f"Error in fuzzy search for '{query}': {e}")
return []
@cached_api_call(ttl=300)
async def get_free_agents(self, season: int) -> List[Player]:
"""
Get all free agent players.
@ -372,10 +374,7 @@ class PlayerService(BaseService[Player]):
return None
async def update_player_team(
self,
player_id: int,
new_team_id: int,
dem_week: Optional[int] = None
self, player_id: int, new_team_id: int, dem_week: Optional[int] = None
) -> Optional[Player]:
"""
Update a player's team assignment.

View File

@ -5,7 +5,8 @@ Handles team standings retrieval and processing.
"""
import logging
from typing import Optional, List, Dict
import time
from typing import Optional, List, Dict, Tuple
from api.client import get_global_client
from models.standings import TeamStandings
@ -13,6 +14,10 @@ from exceptions import APIException
logger = logging.getLogger(f"{__name__}.StandingsService")
# In-memory cache for standings: season -> (standings_list, cached_at)
_standings_cache: Dict[int, Tuple[List[TeamStandings], float]] = {}
_STANDINGS_CACHE_TTL = 600 # 10 minutes
class StandingsService:
"""
@ -45,6 +50,13 @@ class StandingsService:
List of TeamStandings ordered by record
"""
try:
# Check in-memory cache first
if season in _standings_cache:
cached_standings, cached_at = _standings_cache[season]
if time.time() - cached_at < _STANDINGS_CACHE_TTL:
logger.debug(f"Cache hit for standings season {season}")
return cached_standings
client = await self.get_client()
params = [("season", str(season))]
@ -72,6 +84,10 @@ class StandingsService:
logger.info(
f"Retrieved standings for {len(standings)} teams in season {season}"
)
# Cache the result
_standings_cache[season] = (standings, time.time())
return standings
except Exception as e: