From 61f36353d82936da9115c9fa93e27e66b32171c0 Mon Sep 17 00:00:00 2001 From: Cal Corum Date: Sat, 21 Mar 2026 07:35:40 -0500 Subject: [PATCH] perf: add caching for frequently-accessed stable data (#91) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #91 - league_service.get_current_state(): @cached_single_item(ttl=60) — 60s Redis cache - standings_service.get_league_standings(): in-memory dict cache with 10-minute TTL keyed by season - player_service.get_free_agents(): @cached_api_call(ttl=300) — 5-minute Redis cache - dice/rolls.py _get_channel_embed_color(): in-memory dict cache keyed by channel_id with 5-minute TTL, matching the autocomplete.py pattern from PR #100 Co-Authored-By: Claude Sonnet 4.6 --- commands/dice/rolls.py | 449 +++++++++++++++++++--------------- services/league_service.py | 136 ++++++---- services/player_service.py | 7 +- services/standings_service.py | 18 +- 4 files changed, 349 insertions(+), 261 deletions(-) diff --git a/commands/dice/rolls.py b/commands/dice/rolls.py index b974cbe..1b4139c 100644 --- a/commands/dice/rolls.py +++ b/commands/dice/rolls.py @@ -3,7 +3,10 @@ Dice Rolling Commands Implements slash commands for dice rolling functionality required for gameplay. """ + import random +import time +from typing import Dict, Tuple import discord from discord.ext import commands @@ -13,7 +16,11 @@ from utils.logging import get_contextual_logger from utils.decorators import logged_command from utils.team_utils import get_user_major_league_team from utils.text_utils import split_text_for_fields -from utils.dice_utils import DiceRoll, parse_and_roll_multiple_dice, parse_and_roll_single_dice +from utils.dice_utils import ( + DiceRoll, + parse_and_roll_multiple_dice, + parse_and_roll_single_dice, +) from views.embeds import EmbedColors, EmbedTemplate from commands.dev.loaded_dice import get_and_consume_loaded_roll from .chart_data import ( @@ -33,26 +40,27 @@ from .chart_data import ( PITCHER_ERRORS, ) +# In-memory cache for channel embed color lookups: channel_id -> (color, cached_at) +_channel_color_cache: Dict[int, Tuple[int, float]] = {} +_CHANNEL_COLOR_CACHE_TTL = 300 # 5 minutes + + class DiceRollCommands(commands.Cog): """Dice rolling command handlers for gameplay.""" def __init__(self, bot: commands.Bot): self.bot = bot - self.logger = get_contextual_logger(f'{__name__}.DiceRollCommands') + self.logger = get_contextual_logger(f"{__name__}.DiceRollCommands") @discord.app_commands.command( name="roll", - description="Roll polyhedral dice using XdY notation (e.g., 2d6, 1d20, 3d8)" + description="Roll polyhedral dice using XdY notation (e.g., 2d6, 1d20, 3d8)", ) @discord.app_commands.describe( dice="Dice notation - single or multiple separated by semicolon (e.g., 2d6, 1d20;2d6;1d6)" ) @logged_command("/roll") - async def roll_dice( - self, - interaction: discord.Interaction, - dice: str - ): + async def roll_dice(self, interaction: discord.Interaction, dice: str): """Roll dice using standard XdY dice notation. Supports multiple rolls separated by semicolon.""" await interaction.response.defer() @@ -61,7 +69,7 @@ class DiceRollCommands(commands.Cog): if not roll_results: await interaction.followup.send( "❌ Invalid dice notation. Use format like: 2d6, 1d20, or 1d6;2d6;1d20", - ephemeral=True + ephemeral=True, ) return @@ -72,18 +80,24 @@ class DiceRollCommands(commands.Cog): @commands.command(name="roll", aliases=["r", "dice"]) async def roll_dice_prefix(self, ctx: commands.Context, *, dice: str | None = None): """Roll dice using prefix commands (!roll, !r, !dice).""" - self.logger.info(f"Prefix roll command started by {ctx.author.display_name}", dice_input=dice) + self.logger.info( + f"Prefix roll command started by {ctx.author.display_name}", dice_input=dice + ) if dice is None: self.logger.debug("No dice input provided") - await ctx.send("❌ Please provide dice notation. Usage: `!roll 2d6` or `!roll 1d6;2d6;1d20`") + await ctx.send( + "❌ Please provide dice notation. Usage: `!roll 2d6` or `!roll 1d6;2d6;1d20`" + ) return # Parse and validate dice notation (supports multiple rolls) roll_results = parse_and_roll_multiple_dice(dice) if not roll_results: self.logger.warning("Invalid dice notation provided", dice_input=dice) - await ctx.send("❌ Invalid dice notation. Use format like: 2d6, 1d20, or 1d6;2d6;1d20") + await ctx.send( + "❌ Invalid dice notation. Use format like: 2d6, 1d20, or 1d6;2d6;1d20" + ) return self.logger.info(f"Dice rolled successfully", roll_count=len(roll_results)) @@ -92,10 +106,7 @@ class DiceRollCommands(commands.Cog): embed = self._create_multi_roll_embed(dice, roll_results, ctx.author) await ctx.send(embed=embed) - @discord.app_commands.command( - name="d20", - description="Roll a single d20" - ) + @discord.app_commands.command(name="d20", description="Roll a single d20") @logged_command("/d20") async def d20_dice(self, interaction: discord.Interaction): """Roll a single d20.""" @@ -112,15 +123,14 @@ class DiceRollCommands(commands.Cog): roll_results, interaction.user, set_author=False, - embed_color=embed_color + embed_color=embed_color, ) - embed.title = f'd20 roll for {interaction.user.display_name}' + embed.title = f"d20 roll for {interaction.user.display_name}" await interaction.followup.send(embed=embed) @discord.app_commands.command( - name="ab", - description="Roll baseball at-bat dice (1d6;2d6;1d20)" + name="ab", description="Roll baseball at-bat dice (1d6;2d6;1d20)" ) @logged_command("/ab") async def ab_dice(self, interaction: discord.Interaction): @@ -137,7 +147,7 @@ class DiceRollCommands(commands.Cog): # Create DiceRoll objects from loaded values # For 2d6, we split the total into two dice (arbitrary split that sums correctly) d6_2a = min(loaded.d6_2_total - 1, 6) # First die (max 6) - d6_2b = loaded.d6_2_total - d6_2a # Second die gets remainder + d6_2b = loaded.d6_2_total - d6_2a # Second die gets remainder roll_results = [ DiceRoll("1d6", 1, 6, [loaded.d6_1], loaded.d6_1), DiceRoll("2d6", 2, 6, [d6_2a, d6_2b], loaded.d6_2_total), @@ -149,17 +159,19 @@ class DiceRollCommands(commands.Cog): dice_notation = "1d6;2d6;1d20" roll_results = parse_and_roll_multiple_dice(dice_notation) - injury_risk = (roll_results[0].total == 6) and (roll_results[1].total in [7, 8, 9, 10, 11, 12]) + injury_risk = (roll_results[0].total == 6) and ( + roll_results[1].total in [7, 8, 9, 10, 11, 12] + ) d6_total = roll_results[1].total - embed_title = 'At bat roll' + embed_title = "At bat roll" if roll_results[2].total == 1: - embed_title = 'Wild pitch roll' - dice_notation = '1d20' + embed_title = "Wild pitch roll" + dice_notation = "1d20" roll_results = [parse_and_roll_single_dice(dice_notation)] elif roll_results[2].total == 2: - embed_title = 'PB roll' - dice_notation = '1d20' + embed_title = "PB roll" + dice_notation = "1d20" roll_results = [parse_and_roll_single_dice(dice_notation)] # Create embed for the roll results @@ -168,15 +180,15 @@ class DiceRollCommands(commands.Cog): roll_results, interaction.user, set_author=False, - embed_color=embed_color + embed_color=embed_color, ) - embed.title = f'{embed_title} for {interaction.user.display_name}' + embed.title = f"{embed_title} for {interaction.user.display_name}" - if injury_risk and embed_title == 'At bat roll': + if injury_risk and embed_title == "At bat roll": embed.add_field( - name=f'Check injury for pitcher injury rating {13 - d6_total}', - value='Oops! All injuries!', - inline=False + name=f"Check injury for pitcher injury rating {13 - d6_total}", + value="Oops! All injuries!", + inline=False, ) await interaction.followup.send(embed=embed) @@ -188,35 +200,43 @@ class DiceRollCommands(commands.Cog): team = await get_user_major_league_team(user_id=ctx.author.id) embed_color = EmbedColors.PRIMARY if team is not None and team.color is not None: - embed_color = int(team.color,16) + embed_color = int(team.color, 16) # Use the standard baseball dice combination dice_notation = "1d6;2d6;1d20" roll_results = parse_and_roll_multiple_dice(dice_notation) - self.logger.info("At Bat dice rolled successfully", roll_count=len(roll_results)) + self.logger.info( + "At Bat dice rolled successfully", roll_count=len(roll_results) + ) # Create embed for the roll results - embed = self._create_multi_roll_embed(dice_notation, roll_results, ctx.author, set_author=False, embed_color=embed_color) - embed.title = f'At bat roll for {ctx.author.display_name}' + embed = self._create_multi_roll_embed( + dice_notation, + roll_results, + ctx.author, + set_author=False, + embed_color=embed_color, + ) + embed.title = f"At bat roll for {ctx.author.display_name}" await ctx.send(embed=embed) @discord.app_commands.command( name="scout", - description="Roll weighted scouting dice (1d6;2d6;1d20) based on card type" + description="Roll weighted scouting dice (1d6;2d6;1d20) based on card type", ) - @discord.app_commands.describe( - card_type="Type of card being scouted" + @discord.app_commands.describe(card_type="Type of card being scouted") + @discord.app_commands.choices( + card_type=[ + discord.app_commands.Choice(name="Batter", value="batter"), + discord.app_commands.Choice(name="Pitcher", value="pitcher"), + ] ) - @discord.app_commands.choices(card_type=[ - discord.app_commands.Choice(name="Batter", value="batter"), - discord.app_commands.Choice(name="Pitcher", value="pitcher") - ]) @logged_command("/scout") async def scout_dice( self, interaction: discord.Interaction, - card_type: discord.app_commands.Choice[str] + card_type: discord.app_commands.Choice[str], ): """Roll weighted scouting dice based on card type (batter or pitcher).""" await interaction.response.defer() @@ -228,33 +248,35 @@ class DiceRollCommands(commands.Cog): roll_results = self._roll_weighted_scout_dice(card_type_value) # Create embed for the roll results - embed = self._create_multi_roll_embed("1d6;2d6;1d20", roll_results, interaction.user, set_author=False) - embed.title = f'Scouting roll for {interaction.user.display_name}' + embed = self._create_multi_roll_embed( + "1d6;2d6;1d20", roll_results, interaction.user, set_author=False + ) + embed.title = f"Scouting roll for {interaction.user.display_name}" await interaction.followup.send(embed=embed) @discord.app_commands.command( name="fielding", - description="Roll Super Advanced fielding dice for a defensive position" + description="Roll Super Advanced fielding dice for a defensive position", ) - @discord.app_commands.describe( - position="Defensive position" + @discord.app_commands.describe(position="Defensive position") + @discord.app_commands.choices( + position=[ + discord.app_commands.Choice(name="Pitcher (P)", value="P"), + discord.app_commands.Choice(name="Catcher (C)", value="C"), + discord.app_commands.Choice(name="First Base (1B)", value="1B"), + discord.app_commands.Choice(name="Second Base (2B)", value="2B"), + discord.app_commands.Choice(name="Third Base (3B)", value="3B"), + discord.app_commands.Choice(name="Shortstop (SS)", value="SS"), + discord.app_commands.Choice(name="Left Field (LF)", value="LF"), + discord.app_commands.Choice(name="Center Field (CF)", value="CF"), + discord.app_commands.Choice(name="Right Field (RF)", value="RF"), + ] ) - @discord.app_commands.choices(position=[ - discord.app_commands.Choice(name="Pitcher (P)", value="P"), - discord.app_commands.Choice(name="Catcher (C)", value="C"), - discord.app_commands.Choice(name="First Base (1B)", value="1B"), - discord.app_commands.Choice(name="Second Base (2B)", value="2B"), - discord.app_commands.Choice(name="Third Base (3B)", value="3B"), - discord.app_commands.Choice(name="Shortstop (SS)", value="SS"), - discord.app_commands.Choice(name="Left Field (LF)", value="LF"), - discord.app_commands.Choice(name="Center Field (CF)", value="CF"), - discord.app_commands.Choice(name="Right Field (RF)", value="RF") - ]) @logged_command("/fielding") async def fielding_roll( self, interaction: discord.Interaction, - position: discord.app_commands.Choice[str] + position: discord.app_commands.Choice[str], ): """Roll Super Advanced fielding dice for a defensive position.""" await interaction.response.defer() @@ -268,16 +290,25 @@ class DiceRollCommands(commands.Cog): roll_results = parse_and_roll_multiple_dice(dice_notation) # Create fielding embed - embed = self._create_fielding_embed(pos_value, roll_results, interaction.user, embed_color) + embed = self._create_fielding_embed( + pos_value, roll_results, interaction.user, embed_color + ) await interaction.followup.send(embed=embed) @commands.command(name="f", aliases=["fielding", "saf"]) - async def fielding_roll_prefix(self, ctx: commands.Context, position: str | None = None): + async def fielding_roll_prefix( + self, ctx: commands.Context, position: str | None = None + ): """Roll Super Advanced fielding dice using prefix commands (!f, !fielding, !saf).""" - self.logger.info(f"SA Fielding command started by {ctx.author.display_name}", position=position) + self.logger.info( + f"SA Fielding command started by {ctx.author.display_name}", + position=position, + ) if position is None: - await ctx.send("❌ Please specify a position. Usage: `!f 3B` or `!fielding SS`") + await ctx.send( + "❌ Please specify a position. Usage: `!f 3B` or `!fielding SS`" + ) return # Parse and validate position @@ -290,15 +321,19 @@ class DiceRollCommands(commands.Cog): dice_notation = "1d20;3d6;1d100" roll_results = parse_and_roll_multiple_dice(dice_notation) - self.logger.info("SA Fielding dice rolled successfully", position=parsed_position, d20=roll_results[0].total, d6_total=roll_results[1].total) + self.logger.info( + "SA Fielding dice rolled successfully", + position=parsed_position, + d20=roll_results[0].total, + d6_total=roll_results[1].total, + ) # Create fielding embed embed = self._create_fielding_embed(parsed_position, roll_results, ctx.author) await ctx.send(embed=embed) @discord.app_commands.command( - name="jump", - description="Roll for baserunner's jump before stealing" + name="jump", description="Roll for baserunner's jump before stealing" ) @logged_command("/jump") async def jump_dice(self, interaction: discord.Interaction): @@ -322,7 +357,7 @@ class DiceRollCommands(commands.Cog): resolution_roll, interaction.user, embed_color, - show_author=False + show_author=False, ) await interaction.followup.send(embed=embed) @@ -344,31 +379,47 @@ class DiceRollCommands(commands.Cog): # Roll another 1d20 for pickoff/balk resolution resolution_roll = random.randint(1, 20) - self.logger.info("Jump dice rolled successfully", check=check_roll, jump=jump_result.total if jump_result else None, resolution=resolution_roll) + self.logger.info( + "Jump dice rolled successfully", + check=check_roll, + jump=jump_result.total if jump_result else None, + resolution=resolution_roll, + ) # Create embed based on check roll embed = self._create_jump_embed( - check_roll, - jump_result, - resolution_roll, - ctx.author, - embed_color + check_roll, jump_result, resolution_roll, ctx.author, embed_color ) await ctx.send(embed=embed) async def _get_channel_embed_color(self, interaction: discord.Interaction) -> int: + channel_id = interaction.channel_id + if channel_id is not None and channel_id in _channel_color_cache: + cached_color, cached_at = _channel_color_cache[channel_id] + if time.time() - cached_at < _CHANNEL_COLOR_CACHE_TTL: + return cached_color + # Check if channel is a type that has a name attribute (DMChannel doesn't have one) - if isinstance(interaction.channel, (discord.TextChannel, discord.VoiceChannel, discord.Thread)): + if isinstance( + interaction.channel, + (discord.TextChannel, discord.VoiceChannel, discord.Thread), + ): channel_starter = interaction.channel.name[:6] - if '-' in channel_starter: - abbrev = channel_starter.split('-')[0] + if "-" in channel_starter: + abbrev = channel_starter.split("-")[0] channel_team = await team_service.get_team_by_abbrev(abbrev) if channel_team is not None and channel_team.color is not None: - return int(channel_team.color,16) + color = int(channel_team.color, 16) + if channel_id is not None: + _channel_color_cache[channel_id] = (color, time.time()) + return color team = await get_user_major_league_team(user_id=interaction.user.id) if team is not None and team.color is not None: - return int(team.color,16) + color = int(team.color, 16) + if channel_id is not None: + _channel_color_cache[channel_id] = (color, time.time()) + return color return EmbedColors.PRIMARY @@ -381,25 +432,44 @@ class DiceRollCommands(commands.Cog): # Map common inputs to standard position names position_map = { - 'P': 'P', 'PITCHER': 'P', - 'C': 'C', 'CATCHER': 'C', - '1': '1B', '1B': '1B', 'FIRST': '1B', 'FIRSTBASE': '1B', - '2': '2B', '2B': '2B', 'SECOND': '2B', 'SECONDBASE': '2B', - '3': '3B', '3B': '3B', 'THIRD': '3B', 'THIRDBASE': '3B', - 'SS': 'SS', 'SHORT': 'SS', 'SHORTSTOP': 'SS', - 'LF': 'LF', 'LEFT': 'LF', 'LEFTFIELD': 'LF', - 'CF': 'CF', 'CENTER': 'CF', 'CENTERFIELD': 'CF', - 'RF': 'RF', 'RIGHT': 'RF', 'RIGHTFIELD': 'RF' + "P": "P", + "PITCHER": "P", + "C": "C", + "CATCHER": "C", + "1": "1B", + "1B": "1B", + "FIRST": "1B", + "FIRSTBASE": "1B", + "2": "2B", + "2B": "2B", + "SECOND": "2B", + "SECONDBASE": "2B", + "3": "3B", + "3B": "3B", + "THIRD": "3B", + "THIRDBASE": "3B", + "SS": "SS", + "SHORT": "SS", + "SHORTSTOP": "SS", + "LF": "LF", + "LEFT": "LF", + "LEFTFIELD": "LF", + "CF": "CF", + "CENTER": "CF", + "CENTERFIELD": "CF", + "RF": "RF", + "RIGHT": "RF", + "RIGHTFIELD": "RF", } return position_map.get(pos) def _create_fielding_embed( - self, - position: str, - roll_results: list[DiceRoll], - user: discord.User | discord.Member, - embed_color: int = EmbedColors.PRIMARY + self, + position: str, + roll_results: list[DiceRoll], + user: discord.User | discord.Member, + embed_color: int = EmbedColors.PRIMARY, ) -> discord.Embed: """Create an embed for fielding roll results.""" d20_result = roll_results[0].total @@ -409,15 +479,11 @@ class DiceRollCommands(commands.Cog): # Create base embed embed = EmbedTemplate.create_base_embed( - title=f"SA Fielding roll for {user.display_name}", - color=embed_color + title=f"SA Fielding roll for {user.display_name}", color=embed_color ) # Set user info - embed.set_author( - name=user.display_name, - icon_url=user.display_avatar.url - ) + embed.set_author(name=user.display_name, icon_url=user.display_avatar.url) # Add dice results in standard format dice_notation = "1d20;3d6" @@ -425,18 +491,14 @@ class DiceRollCommands(commands.Cog): # Extract just the dice result part from the field dice_field_value = embed_dice.fields[0].value - embed.add_field( - name="Dice Results", - value=dice_field_value, - inline=False - ) + embed.add_field(name="Dice Results", value=dice_field_value, inline=False) # Add fielding check summary range_result = self._get_range_result(position, d20_result) embed.add_field( name=f"{position} Range Result", value=f"```md\n 1 | 2 | 3 | 4 | 5\n{range_result}```", - inline=False + inline=False, ) # Add rare play or error result @@ -457,19 +519,15 @@ class DiceRollCommands(commands.Cog): field_name = base_field_name # Add part indicator if multiple chunks if len(result_chunks) > 1: - field_name += f" (Part {i+1}/{len(result_chunks)})" + field_name += f" (Part {i + 1}/{len(result_chunks)})" - embed.add_field( - name=field_name, - value=chunk, - inline=False - ) + embed.add_field(name=field_name, value=chunk, inline=False) # Add help commands embed.add_field( name="Help Commands", value="Run /charts for full chart readout", - inline=False + inline=False, ) # # Add references @@ -488,125 +546,115 @@ class DiceRollCommands(commands.Cog): resolution_roll: int, user: discord.User | discord.Member, embed_color: int = EmbedColors.PRIMARY, - show_author: bool = True + show_author: bool = True, ) -> discord.Embed: """Create an embed for jump roll results.""" # Create base embed embed = EmbedTemplate.create_base_embed( - title=f"Jump roll for {user.name}", - color=embed_color + title=f"Jump roll for {user.name}", color=embed_color ) if show_author: # Set user info - embed.set_author( - name=user.name, - icon_url=user.display_avatar.url - ) + embed.set_author(name=user.name, icon_url=user.display_avatar.url) # Check for pickoff or balk if check_roll == 1: # Pickoff attempt embed.add_field( - name="Special", - value="```md\nCheck pickoff```", - inline=False + name="Special", value="```md\nCheck pickoff```", inline=False ) embed.add_field( name="Pickoff roll", value=f"```md\n# {resolution_roll}\nDetails:[1d20 ({resolution_roll})]```", - inline=False + inline=False, ) elif check_roll == 2: # Balk - embed.add_field( - name="Special", - value="```md\nCheck balk```", - inline=False - ) + embed.add_field(name="Special", value="```md\nCheck balk```", inline=False) embed.add_field( name="Balk roll", value=f"```md\n# {resolution_roll}\nDetails:[1d20 ({resolution_roll})]```", - inline=False + inline=False, ) else: # Normal jump - show 2d6 result if jump_result: - rolls_str = ' '.join(str(r) for r in jump_result.rolls) + rolls_str = " ".join(str(r) for r in jump_result.rolls) embed.add_field( name="Result", value=f"```md\n# {jump_result.total}\nDetails:[2d6 ({rolls_str})]```", - inline=False + inline=False, ) return embed def _get_range_result(self, position: str, d20_roll: int) -> str: """Get the range result display for a position and d20 roll.""" - if position == 'P': + if position == "P": return self._get_pitcher_range(d20_roll) - elif position in ['1B', '2B', '3B', 'SS']: + elif position in ["1B", "2B", "3B", "SS"]: return self._get_infield_range(d20_roll) - elif position in ['LF', 'CF', 'RF']: + elif position in ["LF", "CF", "RF"]: return self._get_outfield_range(d20_roll) - elif position == 'C': + elif position == "C": return self._get_catcher_range(d20_roll) return "Unknown position" def _get_infield_range(self, d20_roll: int) -> str: """Get infield range result based on d20 roll.""" - return INFIELD_RANGES.get(d20_roll, 'Unknown') + return INFIELD_RANGES.get(d20_roll, "Unknown") def _get_outfield_range(self, d20_roll: int) -> str: """Get outfield range result based on d20 roll.""" - return OUTFIELD_RANGES.get(d20_roll, 'Unknown') + return OUTFIELD_RANGES.get(d20_roll, "Unknown") def _get_catcher_range(self, d20_roll: int) -> str: """Get catcher range result based on d20 roll.""" - return CATCHER_RANGES.get(d20_roll, 'Unknown') + return CATCHER_RANGES.get(d20_roll, "Unknown") def _get_pitcher_range(self, d20_roll: int) -> str: """Get pitcher range result based on d20 roll.""" - return PITCHER_RANGES.get(d20_roll, 'Unknown') - + return PITCHER_RANGES.get(d20_roll, "Unknown") + def _get_rare_play(self, position: str, d20_total: int) -> str: """Get the rare play result for a position and d20 total""" - starter = 'Rare play! Take the range result from above and consult the chart below.\n\n' - if position == 'P': + starter = "Rare play! Take the range result from above and consult the chart below.\n\n" + if position == "P": return starter + self._get_pitcher_rare_play(d20_total) - elif position == '1B': + elif position == "1B": return starter + self._get_infield_rare_play(d20_total) - elif position == '2B': + elif position == "2B": return starter + self._get_infield_rare_play(d20_total) - elif position == '3B': + elif position == "3B": return starter + self._get_infield_rare_play(d20_total) - elif position == 'SS': + elif position == "SS": return starter + self._get_infield_rare_play(d20_total) - elif position in ['LF', 'RF']: + elif position in ["LF", "RF"]: return starter + self._get_outfield_rare_play(d20_total) - elif position == 'CF': + elif position == "CF": return starter + self._get_outfield_rare_play(d20_total) - - raise ValueError(f'Unknown position: {position}') + + raise ValueError(f"Unknown position: {position}") def _get_error_result(self, position: str, d6_total: int) -> str: """Get the error result for a position and 3d6 total.""" # Get the appropriate error chart - if position == 'P': + if position == "P": return self._get_pitcher_error(d6_total) - elif position == '1B': + elif position == "1B": return self._get_1b_error(d6_total) - elif position == '2B': + elif position == "2B": return self._get_2b_error(d6_total) - elif position == '3B': + elif position == "3B": return self._get_3b_error(d6_total) - elif position == 'SS': + elif position == "SS": return self._get_ss_error(d6_total) - elif position in ['LF', 'RF']: + elif position in ["LF", "RF"]: return self._get_corner_of_error(d6_total) - elif position == 'CF': + elif position == "CF": return self._get_cf_error(d6_total) - elif position == 'C': + elif position == "C": return self._get_catcher_error(d6_total) # Should never reach here due to position validation, but follow "Raise or Return" pattern @@ -614,65 +662,64 @@ class DiceRollCommands(commands.Cog): def _get_3b_error(self, d6_total: int) -> str: """Get 3B error result based on 3d6 total.""" - return THIRD_BASE_ERRORS.get(d6_total, 'No error') + return THIRD_BASE_ERRORS.get(d6_total, "No error") def _get_1b_error(self, d6_total: int) -> str: """Get 1B error result based on 3d6 total.""" - return FIRST_BASE_ERRORS.get(d6_total, 'No error') + return FIRST_BASE_ERRORS.get(d6_total, "No error") def _get_2b_error(self, d6_total: int) -> str: """Get 2B error result based on 3d6 total.""" - return SECOND_BASE_ERRORS.get(d6_total, 'No error') + return SECOND_BASE_ERRORS.get(d6_total, "No error") def _get_ss_error(self, d6_total: int) -> str: """Get SS error result based on 3d6 total.""" - return SHORTSTOP_ERRORS.get(d6_total, 'No error') + return SHORTSTOP_ERRORS.get(d6_total, "No error") def _get_corner_of_error(self, d6_total: int) -> str: """Get LF/RF error result based on 3d6 total.""" - return CORNER_OUTFIELD_ERRORS.get(d6_total, 'No error') + return CORNER_OUTFIELD_ERRORS.get(d6_total, "No error") def _get_cf_error(self, d6_total: int) -> str: """Get CF error result based on 3d6 total.""" - return CENTER_FIELD_ERRORS.get(d6_total, 'No error') + return CENTER_FIELD_ERRORS.get(d6_total, "No error") def _get_catcher_error(self, d6_total: int) -> str: """Get Catcher error result based on 3d6 total.""" - return CATCHER_ERRORS.get(d6_total, 'No error') + return CATCHER_ERRORS.get(d6_total, "No error") def _get_pitcher_error(self, d6_total: int) -> str: """Get Pitcher error result based on 3d6 total.""" - return PITCHER_ERRORS.get(d6_total, 'No error') + return PITCHER_ERRORS.get(d6_total, "No error") def _get_pitcher_rare_play(self, d20_total: int) -> str: return ( - f'**G3**: {INFIELD_X_CHART["g3"]["rp"]}\n' - f'**G2**: {INFIELD_X_CHART["g2"]["rp"]}\n' - f'**G1**: {INFIELD_X_CHART["g1"]["rp"]}\n' - f'**SI1**: {INFIELD_X_CHART["si1"]["rp"]}\n' + f"**G3**: {INFIELD_X_CHART['g3']['rp']}\n" + f"**G2**: {INFIELD_X_CHART['g2']['rp']}\n" + f"**G1**: {INFIELD_X_CHART['g1']['rp']}\n" + f"**SI1**: {INFIELD_X_CHART['si1']['rp']}\n" ) def _get_infield_rare_play(self, d20_total: int) -> str: return ( - f'**G3**: {INFIELD_X_CHART["g3"]["rp"]}\n' - f'**G2**: {INFIELD_X_CHART["g2"]["rp"]}\n' - f'**G1**: {INFIELD_X_CHART["g1"]["rp"]}\n' - f'**SI1**: {INFIELD_X_CHART["si1"]["rp"]}\n' - f'**SI2**: {OUTFIELD_X_CHART["si2"]["rp"]}\n' + f"**G3**: {INFIELD_X_CHART['g3']['rp']}\n" + f"**G2**: {INFIELD_X_CHART['g2']['rp']}\n" + f"**G1**: {INFIELD_X_CHART['g1']['rp']}\n" + f"**SI1**: {INFIELD_X_CHART['si1']['rp']}\n" + f"**SI2**: {OUTFIELD_X_CHART['si2']['rp']}\n" ) def _get_outfield_rare_play(self, d20_total: int) -> str: return ( - f'**F1**: {OUTFIELD_X_CHART["f1"]["rp"]}\n' - f'**F2**: {OUTFIELD_X_CHART["f2"]["rp"]}\n' - f'**F3**: {OUTFIELD_X_CHART["f3"]["rp"]}\n' - f'**SI2**: {OUTFIELD_X_CHART["si2"]["rp"]}\n' - f'**DO2**: {OUTFIELD_X_CHART["do2"]["rp"]}\n' - f'**DO3**: {OUTFIELD_X_CHART["do3"]["rp"]}\n' - f'**TR3**: {OUTFIELD_X_CHART["tr3"]["rp"]}\n' + f"**F1**: {OUTFIELD_X_CHART['f1']['rp']}\n" + f"**F2**: {OUTFIELD_X_CHART['f2']['rp']}\n" + f"**F3**: {OUTFIELD_X_CHART['f3']['rp']}\n" + f"**SI2**: {OUTFIELD_X_CHART['si2']['rp']}\n" + f"**DO2**: {OUTFIELD_X_CHART['do2']['rp']}\n" + f"**DO3**: {OUTFIELD_X_CHART['do3']['rp']}\n" + f"**TR3**: {OUTFIELD_X_CHART['tr3']['rp']}\n" ) - def _roll_weighted_scout_dice(self, card_type: str) -> list[DiceRoll]: """ Roll scouting dice with weighted first d6 based on card type. @@ -690,11 +737,11 @@ class DiceRollCommands(commands.Cog): first_roll = random.randint(4, 6) first_d6_result = DiceRoll( - dice_notation='1d6', + dice_notation="1d6", num_dice=1, die_sides=6, rolls=[first_roll], - total=first_roll + total=first_roll, ) # Second roll (2d6) - normal @@ -705,19 +752,20 @@ class DiceRollCommands(commands.Cog): return [first_d6_result, second_result, third_result] - def _create_multi_roll_embed(self, dice_notation: str, roll_results: list[DiceRoll], user: discord.User | discord.Member, set_author: bool = True, embed_color: int = EmbedColors.PRIMARY) -> discord.Embed: + def _create_multi_roll_embed( + self, + dice_notation: str, + roll_results: list[DiceRoll], + user: discord.User | discord.Member, + set_author: bool = True, + embed_color: int = EmbedColors.PRIMARY, + ) -> discord.Embed: """Create an embed for multiple dice roll results.""" - embed = EmbedTemplate.create_base_embed( - title="🎲 Dice Roll", - color=embed_color - ) + embed = EmbedTemplate.create_base_embed(title="🎲 Dice Roll", color=embed_color) if set_author: # Set user info - embed.set_author( - name=user.name, - icon_url=user.display_avatar.url - ) + embed.set_author(name=user.name, icon_url=user.display_avatar.url) # Create summary line with totals totals = [str(result.total) for result in roll_results] @@ -735,19 +783,16 @@ class DiceRollCommands(commands.Cog): roll_groups.append(str(rolls[0])) else: # Multiple dice: space-separated within the group - roll_groups.append(' '.join(str(r) for r in rolls)) + roll_groups.append(" ".join(str(r) for r in rolls)) details = f"Details:[{';'.join(dice_notations)} ({' - '.join(roll_groups)})]" # Set as description - embed.add_field( - name='Result', - value=f"```md\n{summary}\n{details}```" - ) + embed.add_field(name="Result", value=f"```md\n{summary}\n{details}```") return embed async def setup(bot: commands.Bot): """Load the dice roll commands cog.""" - await bot.add_cog(DiceRollCommands(bot)) \ No newline at end of file + await bot.add_cog(DiceRollCommands(bot)) diff --git a/services/league_service.py b/services/league_service.py index 04e9ca3..9c84a45 100644 --- a/services/league_service.py +++ b/services/league_service.py @@ -3,6 +3,7 @@ League service for Discord Bot v2.0 Handles league-wide operations including current state, standings, and season information. """ + import logging from typing import Optional, List, Dict, Any @@ -10,25 +11,27 @@ from config import get_config from services.base_service import BaseService from models.current import Current from exceptions import APIException +from utils.decorators import cached_single_item -logger = logging.getLogger(f'{__name__}.LeagueService') +logger = logging.getLogger(f"{__name__}.LeagueService") class LeagueService(BaseService[Current]): """ Service for league-wide operations. - + Features: - Current league state retrieval - Season standings - League-wide statistics """ - + def __init__(self): """Initialize league service.""" - super().__init__(Current, 'current') + super().__init__(Current, "current") logger.debug("LeagueService initialized") - + + @cached_single_item(ttl=60) async def get_current_state(self) -> Optional[Current]: """ Get the current league state including week, season, and settings. @@ -38,11 +41,13 @@ class LeagueService(BaseService[Current]): """ try: client = await self.get_client() - data = await client.get('current') + data = await client.get("current") if data: current = Current.from_api_data(data) - logger.debug(f"Retrieved current state: Week {current.week}, Season {current.season}") + logger.debug( + f"Retrieved current state: Week {current.week}, Season {current.season}" + ) return current logger.debug("No current state data found") @@ -53,9 +58,7 @@ class LeagueService(BaseService[Current]): return None async def update_current_state( - self, - week: Optional[int] = None, - freeze: Optional[bool] = None + self, week: Optional[int] = None, freeze: Optional[bool] = None ) -> Optional[Current]: """ Update current league state (week and/or freeze status). @@ -77,9 +80,9 @@ class LeagueService(BaseService[Current]): # Build update data update_data = {} if week is not None: - update_data['week'] = week + update_data["week"] = week if freeze is not None: - update_data['freeze'] = freeze + update_data["freeze"] = freeze if not update_data: logger.warning("update_current_state called with no updates") @@ -89,127 +92,152 @@ class LeagueService(BaseService[Current]): # (Current table has one row per season, NOT a single row with id=1) current = await self.get_current_state() if not current: - logger.error("Cannot update current state - unable to fetch current state") + logger.error( + "Cannot update current state - unable to fetch current state" + ) return None current_id = current.id - logger.debug(f"Updating current state id={current_id} (season {current.season})") + logger.debug( + f"Updating current state id={current_id} (season {current.season})" + ) # Use BaseService patch method - updated_current = await self.patch(current_id, update_data, use_query_params=True) + updated_current = await self.patch( + current_id, update_data, use_query_params=True + ) if updated_current: logger.info(f"Updated current state id={current_id}: {update_data}") return updated_current else: - logger.error(f"Failed to update current state id={current_id} - patch returned None") + logger.error( + f"Failed to update current state id={current_id} - patch returned None" + ) return None except Exception as e: logger.error(f"Error updating current state: {e}") raise APIException(f"Failed to update current state: {e}") - async def get_standings(self, season: Optional[int] = None) -> Optional[List[Dict[str, Any]]]: + async def get_standings( + self, season: Optional[int] = None + ) -> Optional[List[Dict[str, Any]]]: """ Get league standings for a season. - + Args: season: Season number (defaults to current season) - + Returns: List of standings data or None if not available """ try: season = season or get_config().sba_season client = await self.get_client() - data = await client.get('standings', params=[('season', str(season))]) - + data = await client.get("standings", params=[("season", str(season))]) + if data and isinstance(data, list): - logger.debug(f"Retrieved standings for season {season}: {len(data)} teams") + logger.debug( + f"Retrieved standings for season {season}: {len(data)} teams" + ) return data elif data and isinstance(data, dict): # Handle case where API returns a dict with standings array - standings_data = data.get('standings', data.get('items', [])) + standings_data = data.get("standings", data.get("items", [])) if standings_data: - logger.debug(f"Retrieved standings for season {season}: {len(standings_data)} teams") + logger.debug( + f"Retrieved standings for season {season}: {len(standings_data)} teams" + ) return standings_data - + logger.debug(f"No standings data found for season {season}") return None - + except Exception as e: logger.error(f"Failed to get standings for season {season}: {e}") return None - - async def get_division_standings(self, division_id: int, season: Optional[int] = None) -> Optional[List[Dict[str, Any]]]: + + async def get_division_standings( + self, division_id: int, season: Optional[int] = None + ) -> Optional[List[Dict[str, Any]]]: """ Get standings for a specific division. - + Args: division_id: Division identifier season: Season number (defaults to current season) - + Returns: List of division standings or None if not available """ try: season = season or get_config().sba_season client = await self.get_client() - data = await client.get(f'standings/division/{division_id}', params=[('season', str(season))]) - + data = await client.get( + f"standings/division/{division_id}", params=[("season", str(season))] + ) + if data and isinstance(data, list): - logger.debug(f"Retrieved division {division_id} standings for season {season}: {len(data)} teams") + logger.debug( + f"Retrieved division {division_id} standings for season {season}: {len(data)} teams" + ) return data - - logger.debug(f"No division standings found for division {division_id}, season {season}") + + logger.debug( + f"No division standings found for division {division_id}, season {season}" + ) return None - + except Exception as e: logger.error(f"Failed to get division {division_id} standings: {e}") return None - - async def get_league_leaders(self, stat_type: str = 'batting', season: Optional[int] = None, limit: int = 10) -> Optional[List[Dict[str, Any]]]: + + async def get_league_leaders( + self, stat_type: str = "batting", season: Optional[int] = None, limit: int = 10 + ) -> Optional[List[Dict[str, Any]]]: """ Get league leaders for a specific statistic category. - + Args: stat_type: Type of stats ('batting', 'pitching', 'fielding') season: Season number (defaults to current season) limit: Number of leaders to return - + Returns: List of league leaders or None if not available """ try: season = season or get_config().sba_season client = await self.get_client() - - params = [ - ('season', str(season)), - ('limit', str(limit)) - ] - - data = await client.get(f'leaders/{stat_type}', params=params) - + + params = [("season", str(season)), ("limit", str(limit))] + + data = await client.get(f"leaders/{stat_type}", params=params) + if data: # Handle different response formats if isinstance(data, list): leaders = data elif isinstance(data, dict): - leaders = data.get('leaders', data.get('items', data.get('results', []))) + leaders = data.get( + "leaders", data.get("items", data.get("results", [])) + ) else: leaders = [] - - logger.debug(f"Retrieved {stat_type} leaders for season {season}: {len(leaders)} players") + + logger.debug( + f"Retrieved {stat_type} leaders for season {season}: {len(leaders)} players" + ) return leaders[:limit] # Ensure we don't exceed limit - + logger.debug(f"No {stat_type} leaders found for season {season}") return None - + except Exception as e: logger.error(f"Failed to get {stat_type} leaders for season {season}: {e}") return None # Global service instance -league_service = LeagueService() \ No newline at end of file +league_service = LeagueService() diff --git a/services/player_service.py b/services/player_service.py index 0877121..3abd39c 100644 --- a/services/player_service.py +++ b/services/player_service.py @@ -11,6 +11,7 @@ from config import get_config from services.base_service import BaseService from models.player import Player from exceptions import APIException +from utils.decorators import cached_api_call if TYPE_CHECKING: from services.team_service import TeamService @@ -270,6 +271,7 @@ class PlayerService(BaseService[Player]): logger.error(f"Error in fuzzy search for '{query}': {e}") return [] + @cached_api_call(ttl=300) async def get_free_agents(self, season: int) -> List[Player]: """ Get all free agent players. @@ -372,10 +374,7 @@ class PlayerService(BaseService[Player]): return None async def update_player_team( - self, - player_id: int, - new_team_id: int, - dem_week: Optional[int] = None + self, player_id: int, new_team_id: int, dem_week: Optional[int] = None ) -> Optional[Player]: """ Update a player's team assignment. diff --git a/services/standings_service.py b/services/standings_service.py index 4240c61..7dc8b3d 100644 --- a/services/standings_service.py +++ b/services/standings_service.py @@ -5,7 +5,8 @@ Handles team standings retrieval and processing. """ import logging -from typing import Optional, List, Dict +import time +from typing import Optional, List, Dict, Tuple from api.client import get_global_client from models.standings import TeamStandings @@ -13,6 +14,10 @@ from exceptions import APIException logger = logging.getLogger(f"{__name__}.StandingsService") +# In-memory cache for standings: season -> (standings_list, cached_at) +_standings_cache: Dict[int, Tuple[List[TeamStandings], float]] = {} +_STANDINGS_CACHE_TTL = 600 # 10 minutes + class StandingsService: """ @@ -45,6 +50,13 @@ class StandingsService: List of TeamStandings ordered by record """ try: + # Check in-memory cache first + if season in _standings_cache: + cached_standings, cached_at = _standings_cache[season] + if time.time() - cached_at < _STANDINGS_CACHE_TTL: + logger.debug(f"Cache hit for standings season {season}") + return cached_standings + client = await self.get_client() params = [("season", str(season))] @@ -72,6 +84,10 @@ class StandingsService: logger.info( f"Retrieved standings for {len(standings)} teams in season {season}" ) + + # Cache the result + _standings_cache[season] = (standings, time.time()) + return standings except Exception as e: