perf: add caching for frequently-accessed stable data (#91)

Closes #91

- league_service.get_current_state(): @cached_single_item(ttl=60) — 60s Redis cache
- standings_service.get_league_standings(): in-memory dict cache with 10-minute TTL keyed by season
- player_service.get_free_agents(): @cached_api_call(ttl=300) — 5-minute Redis cache
- dice/rolls.py _get_channel_embed_color(): in-memory dict cache keyed by channel_id with 5-minute TTL, matching the autocomplete.py pattern from PR #100

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Cal Corum 2026-03-21 07:35:40 -05:00
parent 6889499fff
commit 327570e8cc
4 changed files with 349 additions and 261 deletions

View File

@ -3,7 +3,10 @@ Dice Rolling Commands
Implements slash commands for dice rolling functionality required for gameplay. Implements slash commands for dice rolling functionality required for gameplay.
""" """
import random import random
import time
from typing import Dict, Tuple
import discord import discord
from discord.ext import commands from discord.ext import commands
@ -13,7 +16,11 @@ from utils.logging import get_contextual_logger
from utils.decorators import logged_command from utils.decorators import logged_command
from utils.team_utils import get_user_major_league_team from utils.team_utils import get_user_major_league_team
from utils.text_utils import split_text_for_fields from utils.text_utils import split_text_for_fields
from utils.dice_utils import DiceRoll, parse_and_roll_multiple_dice, parse_and_roll_single_dice from utils.dice_utils import (
DiceRoll,
parse_and_roll_multiple_dice,
parse_and_roll_single_dice,
)
from views.embeds import EmbedColors, EmbedTemplate from views.embeds import EmbedColors, EmbedTemplate
from commands.dev.loaded_dice import get_and_consume_loaded_roll from commands.dev.loaded_dice import get_and_consume_loaded_roll
from .chart_data import ( from .chart_data import (
@ -33,26 +40,27 @@ from .chart_data import (
PITCHER_ERRORS, PITCHER_ERRORS,
) )
# In-memory cache for channel embed color lookups: channel_id -> (color, cached_at)
_channel_color_cache: Dict[int, Tuple[int, float]] = {}
_CHANNEL_COLOR_CACHE_TTL = 300 # 5 minutes
class DiceRollCommands(commands.Cog): class DiceRollCommands(commands.Cog):
"""Dice rolling command handlers for gameplay.""" """Dice rolling command handlers for gameplay."""
def __init__(self, bot: commands.Bot): def __init__(self, bot: commands.Bot):
self.bot = bot self.bot = bot
self.logger = get_contextual_logger(f'{__name__}.DiceRollCommands') self.logger = get_contextual_logger(f"{__name__}.DiceRollCommands")
@discord.app_commands.command( @discord.app_commands.command(
name="roll", name="roll",
description="Roll polyhedral dice using XdY notation (e.g., 2d6, 1d20, 3d8)" description="Roll polyhedral dice using XdY notation (e.g., 2d6, 1d20, 3d8)",
) )
@discord.app_commands.describe( @discord.app_commands.describe(
dice="Dice notation - single or multiple separated by semicolon (e.g., 2d6, 1d20;2d6;1d6)" dice="Dice notation - single or multiple separated by semicolon (e.g., 2d6, 1d20;2d6;1d6)"
) )
@logged_command("/roll") @logged_command("/roll")
async def roll_dice( async def roll_dice(self, interaction: discord.Interaction, dice: str):
self,
interaction: discord.Interaction,
dice: str
):
"""Roll dice using standard XdY dice notation. Supports multiple rolls separated by semicolon.""" """Roll dice using standard XdY dice notation. Supports multiple rolls separated by semicolon."""
await interaction.response.defer() await interaction.response.defer()
@ -61,7 +69,7 @@ class DiceRollCommands(commands.Cog):
if not roll_results: if not roll_results:
await interaction.followup.send( await interaction.followup.send(
"❌ Invalid dice notation. Use format like: 2d6, 1d20, or 1d6;2d6;1d20", "❌ Invalid dice notation. Use format like: 2d6, 1d20, or 1d6;2d6;1d20",
ephemeral=True ephemeral=True,
) )
return return
@ -72,18 +80,24 @@ class DiceRollCommands(commands.Cog):
@commands.command(name="roll", aliases=["r", "dice"]) @commands.command(name="roll", aliases=["r", "dice"])
async def roll_dice_prefix(self, ctx: commands.Context, *, dice: str | None = None): async def roll_dice_prefix(self, ctx: commands.Context, *, dice: str | None = None):
"""Roll dice using prefix commands (!roll, !r, !dice).""" """Roll dice using prefix commands (!roll, !r, !dice)."""
self.logger.info(f"Prefix roll command started by {ctx.author.display_name}", dice_input=dice) self.logger.info(
f"Prefix roll command started by {ctx.author.display_name}", dice_input=dice
)
if dice is None: if dice is None:
self.logger.debug("No dice input provided") self.logger.debug("No dice input provided")
await ctx.send("❌ Please provide dice notation. Usage: `!roll 2d6` or `!roll 1d6;2d6;1d20`") await ctx.send(
"❌ Please provide dice notation. Usage: `!roll 2d6` or `!roll 1d6;2d6;1d20`"
)
return return
# Parse and validate dice notation (supports multiple rolls) # Parse and validate dice notation (supports multiple rolls)
roll_results = parse_and_roll_multiple_dice(dice) roll_results = parse_and_roll_multiple_dice(dice)
if not roll_results: if not roll_results:
self.logger.warning("Invalid dice notation provided", dice_input=dice) self.logger.warning("Invalid dice notation provided", dice_input=dice)
await ctx.send("❌ Invalid dice notation. Use format like: 2d6, 1d20, or 1d6;2d6;1d20") await ctx.send(
"❌ Invalid dice notation. Use format like: 2d6, 1d20, or 1d6;2d6;1d20"
)
return return
self.logger.info(f"Dice rolled successfully", roll_count=len(roll_results)) self.logger.info(f"Dice rolled successfully", roll_count=len(roll_results))
@ -92,10 +106,7 @@ class DiceRollCommands(commands.Cog):
embed = self._create_multi_roll_embed(dice, roll_results, ctx.author) embed = self._create_multi_roll_embed(dice, roll_results, ctx.author)
await ctx.send(embed=embed) await ctx.send(embed=embed)
@discord.app_commands.command( @discord.app_commands.command(name="d20", description="Roll a single d20")
name="d20",
description="Roll a single d20"
)
@logged_command("/d20") @logged_command("/d20")
async def d20_dice(self, interaction: discord.Interaction): async def d20_dice(self, interaction: discord.Interaction):
"""Roll a single d20.""" """Roll a single d20."""
@ -112,15 +123,14 @@ class DiceRollCommands(commands.Cog):
roll_results, roll_results,
interaction.user, interaction.user,
set_author=False, set_author=False,
embed_color=embed_color embed_color=embed_color,
) )
embed.title = f'd20 roll for {interaction.user.display_name}' embed.title = f"d20 roll for {interaction.user.display_name}"
await interaction.followup.send(embed=embed) await interaction.followup.send(embed=embed)
@discord.app_commands.command( @discord.app_commands.command(
name="ab", name="ab", description="Roll baseball at-bat dice (1d6;2d6;1d20)"
description="Roll baseball at-bat dice (1d6;2d6;1d20)"
) )
@logged_command("/ab") @logged_command("/ab")
async def ab_dice(self, interaction: discord.Interaction): async def ab_dice(self, interaction: discord.Interaction):
@ -137,7 +147,7 @@ class DiceRollCommands(commands.Cog):
# Create DiceRoll objects from loaded values # Create DiceRoll objects from loaded values
# For 2d6, we split the total into two dice (arbitrary split that sums correctly) # For 2d6, we split the total into two dice (arbitrary split that sums correctly)
d6_2a = min(loaded.d6_2_total - 1, 6) # First die (max 6) d6_2a = min(loaded.d6_2_total - 1, 6) # First die (max 6)
d6_2b = loaded.d6_2_total - d6_2a # Second die gets remainder d6_2b = loaded.d6_2_total - d6_2a # Second die gets remainder
roll_results = [ roll_results = [
DiceRoll("1d6", 1, 6, [loaded.d6_1], loaded.d6_1), DiceRoll("1d6", 1, 6, [loaded.d6_1], loaded.d6_1),
DiceRoll("2d6", 2, 6, [d6_2a, d6_2b], loaded.d6_2_total), DiceRoll("2d6", 2, 6, [d6_2a, d6_2b], loaded.d6_2_total),
@ -149,17 +159,19 @@ class DiceRollCommands(commands.Cog):
dice_notation = "1d6;2d6;1d20" dice_notation = "1d6;2d6;1d20"
roll_results = parse_and_roll_multiple_dice(dice_notation) roll_results = parse_and_roll_multiple_dice(dice_notation)
injury_risk = (roll_results[0].total == 6) and (roll_results[1].total in [7, 8, 9, 10, 11, 12]) injury_risk = (roll_results[0].total == 6) and (
roll_results[1].total in [7, 8, 9, 10, 11, 12]
)
d6_total = roll_results[1].total d6_total = roll_results[1].total
embed_title = 'At bat roll' embed_title = "At bat roll"
if roll_results[2].total == 1: if roll_results[2].total == 1:
embed_title = 'Wild pitch roll' embed_title = "Wild pitch roll"
dice_notation = '1d20' dice_notation = "1d20"
roll_results = [parse_and_roll_single_dice(dice_notation)] roll_results = [parse_and_roll_single_dice(dice_notation)]
elif roll_results[2].total == 2: elif roll_results[2].total == 2:
embed_title = 'PB roll' embed_title = "PB roll"
dice_notation = '1d20' dice_notation = "1d20"
roll_results = [parse_and_roll_single_dice(dice_notation)] roll_results = [parse_and_roll_single_dice(dice_notation)]
# Create embed for the roll results # Create embed for the roll results
@ -168,15 +180,15 @@ class DiceRollCommands(commands.Cog):
roll_results, roll_results,
interaction.user, interaction.user,
set_author=False, set_author=False,
embed_color=embed_color embed_color=embed_color,
) )
embed.title = f'{embed_title} for {interaction.user.display_name}' embed.title = f"{embed_title} for {interaction.user.display_name}"
if injury_risk and embed_title == 'At bat roll': if injury_risk and embed_title == "At bat roll":
embed.add_field( embed.add_field(
name=f'Check injury for pitcher injury rating {13 - d6_total}', name=f"Check injury for pitcher injury rating {13 - d6_total}",
value='Oops! All injuries!', value="Oops! All injuries!",
inline=False inline=False,
) )
await interaction.followup.send(embed=embed) await interaction.followup.send(embed=embed)
@ -188,35 +200,43 @@ class DiceRollCommands(commands.Cog):
team = await get_user_major_league_team(user_id=ctx.author.id) team = await get_user_major_league_team(user_id=ctx.author.id)
embed_color = EmbedColors.PRIMARY embed_color = EmbedColors.PRIMARY
if team is not None and team.color is not None: if team is not None and team.color is not None:
embed_color = int(team.color,16) embed_color = int(team.color, 16)
# Use the standard baseball dice combination # Use the standard baseball dice combination
dice_notation = "1d6;2d6;1d20" dice_notation = "1d6;2d6;1d20"
roll_results = parse_and_roll_multiple_dice(dice_notation) roll_results = parse_and_roll_multiple_dice(dice_notation)
self.logger.info("At Bat dice rolled successfully", roll_count=len(roll_results)) self.logger.info(
"At Bat dice rolled successfully", roll_count=len(roll_results)
)
# Create embed for the roll results # Create embed for the roll results
embed = self._create_multi_roll_embed(dice_notation, roll_results, ctx.author, set_author=False, embed_color=embed_color) embed = self._create_multi_roll_embed(
embed.title = f'At bat roll for {ctx.author.display_name}' dice_notation,
roll_results,
ctx.author,
set_author=False,
embed_color=embed_color,
)
embed.title = f"At bat roll for {ctx.author.display_name}"
await ctx.send(embed=embed) await ctx.send(embed=embed)
@discord.app_commands.command( @discord.app_commands.command(
name="scout", name="scout",
description="Roll weighted scouting dice (1d6;2d6;1d20) based on card type" description="Roll weighted scouting dice (1d6;2d6;1d20) based on card type",
) )
@discord.app_commands.describe( @discord.app_commands.describe(card_type="Type of card being scouted")
card_type="Type of card being scouted" @discord.app_commands.choices(
card_type=[
discord.app_commands.Choice(name="Batter", value="batter"),
discord.app_commands.Choice(name="Pitcher", value="pitcher"),
]
) )
@discord.app_commands.choices(card_type=[
discord.app_commands.Choice(name="Batter", value="batter"),
discord.app_commands.Choice(name="Pitcher", value="pitcher")
])
@logged_command("/scout") @logged_command("/scout")
async def scout_dice( async def scout_dice(
self, self,
interaction: discord.Interaction, interaction: discord.Interaction,
card_type: discord.app_commands.Choice[str] card_type: discord.app_commands.Choice[str],
): ):
"""Roll weighted scouting dice based on card type (batter or pitcher).""" """Roll weighted scouting dice based on card type (batter or pitcher)."""
await interaction.response.defer() await interaction.response.defer()
@ -228,33 +248,35 @@ class DiceRollCommands(commands.Cog):
roll_results = self._roll_weighted_scout_dice(card_type_value) roll_results = self._roll_weighted_scout_dice(card_type_value)
# Create embed for the roll results # Create embed for the roll results
embed = self._create_multi_roll_embed("1d6;2d6;1d20", roll_results, interaction.user, set_author=False) embed = self._create_multi_roll_embed(
embed.title = f'Scouting roll for {interaction.user.display_name}' "1d6;2d6;1d20", roll_results, interaction.user, set_author=False
)
embed.title = f"Scouting roll for {interaction.user.display_name}"
await interaction.followup.send(embed=embed) await interaction.followup.send(embed=embed)
@discord.app_commands.command( @discord.app_commands.command(
name="fielding", name="fielding",
description="Roll Super Advanced fielding dice for a defensive position" description="Roll Super Advanced fielding dice for a defensive position",
) )
@discord.app_commands.describe( @discord.app_commands.describe(position="Defensive position")
position="Defensive position" @discord.app_commands.choices(
position=[
discord.app_commands.Choice(name="Pitcher (P)", value="P"),
discord.app_commands.Choice(name="Catcher (C)", value="C"),
discord.app_commands.Choice(name="First Base (1B)", value="1B"),
discord.app_commands.Choice(name="Second Base (2B)", value="2B"),
discord.app_commands.Choice(name="Third Base (3B)", value="3B"),
discord.app_commands.Choice(name="Shortstop (SS)", value="SS"),
discord.app_commands.Choice(name="Left Field (LF)", value="LF"),
discord.app_commands.Choice(name="Center Field (CF)", value="CF"),
discord.app_commands.Choice(name="Right Field (RF)", value="RF"),
]
) )
@discord.app_commands.choices(position=[
discord.app_commands.Choice(name="Pitcher (P)", value="P"),
discord.app_commands.Choice(name="Catcher (C)", value="C"),
discord.app_commands.Choice(name="First Base (1B)", value="1B"),
discord.app_commands.Choice(name="Second Base (2B)", value="2B"),
discord.app_commands.Choice(name="Third Base (3B)", value="3B"),
discord.app_commands.Choice(name="Shortstop (SS)", value="SS"),
discord.app_commands.Choice(name="Left Field (LF)", value="LF"),
discord.app_commands.Choice(name="Center Field (CF)", value="CF"),
discord.app_commands.Choice(name="Right Field (RF)", value="RF")
])
@logged_command("/fielding") @logged_command("/fielding")
async def fielding_roll( async def fielding_roll(
self, self,
interaction: discord.Interaction, interaction: discord.Interaction,
position: discord.app_commands.Choice[str] position: discord.app_commands.Choice[str],
): ):
"""Roll Super Advanced fielding dice for a defensive position.""" """Roll Super Advanced fielding dice for a defensive position."""
await interaction.response.defer() await interaction.response.defer()
@ -268,16 +290,25 @@ class DiceRollCommands(commands.Cog):
roll_results = parse_and_roll_multiple_dice(dice_notation) roll_results = parse_and_roll_multiple_dice(dice_notation)
# Create fielding embed # Create fielding embed
embed = self._create_fielding_embed(pos_value, roll_results, interaction.user, embed_color) embed = self._create_fielding_embed(
pos_value, roll_results, interaction.user, embed_color
)
await interaction.followup.send(embed=embed) await interaction.followup.send(embed=embed)
@commands.command(name="f", aliases=["fielding", "saf"]) @commands.command(name="f", aliases=["fielding", "saf"])
async def fielding_roll_prefix(self, ctx: commands.Context, position: str | None = None): async def fielding_roll_prefix(
self, ctx: commands.Context, position: str | None = None
):
"""Roll Super Advanced fielding dice using prefix commands (!f, !fielding, !saf).""" """Roll Super Advanced fielding dice using prefix commands (!f, !fielding, !saf)."""
self.logger.info(f"SA Fielding command started by {ctx.author.display_name}", position=position) self.logger.info(
f"SA Fielding command started by {ctx.author.display_name}",
position=position,
)
if position is None: if position is None:
await ctx.send("❌ Please specify a position. Usage: `!f 3B` or `!fielding SS`") await ctx.send(
"❌ Please specify a position. Usage: `!f 3B` or `!fielding SS`"
)
return return
# Parse and validate position # Parse and validate position
@ -290,15 +321,19 @@ class DiceRollCommands(commands.Cog):
dice_notation = "1d20;3d6;1d100" dice_notation = "1d20;3d6;1d100"
roll_results = parse_and_roll_multiple_dice(dice_notation) roll_results = parse_and_roll_multiple_dice(dice_notation)
self.logger.info("SA Fielding dice rolled successfully", position=parsed_position, d20=roll_results[0].total, d6_total=roll_results[1].total) self.logger.info(
"SA Fielding dice rolled successfully",
position=parsed_position,
d20=roll_results[0].total,
d6_total=roll_results[1].total,
)
# Create fielding embed # Create fielding embed
embed = self._create_fielding_embed(parsed_position, roll_results, ctx.author) embed = self._create_fielding_embed(parsed_position, roll_results, ctx.author)
await ctx.send(embed=embed) await ctx.send(embed=embed)
@discord.app_commands.command( @discord.app_commands.command(
name="jump", name="jump", description="Roll for baserunner's jump before stealing"
description="Roll for baserunner's jump before stealing"
) )
@logged_command("/jump") @logged_command("/jump")
async def jump_dice(self, interaction: discord.Interaction): async def jump_dice(self, interaction: discord.Interaction):
@ -322,7 +357,7 @@ class DiceRollCommands(commands.Cog):
resolution_roll, resolution_roll,
interaction.user, interaction.user,
embed_color, embed_color,
show_author=False show_author=False,
) )
await interaction.followup.send(embed=embed) await interaction.followup.send(embed=embed)
@ -344,31 +379,47 @@ class DiceRollCommands(commands.Cog):
# Roll another 1d20 for pickoff/balk resolution # Roll another 1d20 for pickoff/balk resolution
resolution_roll = random.randint(1, 20) resolution_roll = random.randint(1, 20)
self.logger.info("Jump dice rolled successfully", check=check_roll, jump=jump_result.total if jump_result else None, resolution=resolution_roll) self.logger.info(
"Jump dice rolled successfully",
check=check_roll,
jump=jump_result.total if jump_result else None,
resolution=resolution_roll,
)
# Create embed based on check roll # Create embed based on check roll
embed = self._create_jump_embed( embed = self._create_jump_embed(
check_roll, check_roll, jump_result, resolution_roll, ctx.author, embed_color
jump_result,
resolution_roll,
ctx.author,
embed_color
) )
await ctx.send(embed=embed) await ctx.send(embed=embed)
async def _get_channel_embed_color(self, interaction: discord.Interaction) -> int: async def _get_channel_embed_color(self, interaction: discord.Interaction) -> int:
channel_id = interaction.channel_id
if channel_id is not None and channel_id in _channel_color_cache:
cached_color, cached_at = _channel_color_cache[channel_id]
if time.time() - cached_at < _CHANNEL_COLOR_CACHE_TTL:
return cached_color
# Check if channel is a type that has a name attribute (DMChannel doesn't have one) # Check if channel is a type that has a name attribute (DMChannel doesn't have one)
if isinstance(interaction.channel, (discord.TextChannel, discord.VoiceChannel, discord.Thread)): if isinstance(
interaction.channel,
(discord.TextChannel, discord.VoiceChannel, discord.Thread),
):
channel_starter = interaction.channel.name[:6] channel_starter = interaction.channel.name[:6]
if '-' in channel_starter: if "-" in channel_starter:
abbrev = channel_starter.split('-')[0] abbrev = channel_starter.split("-")[0]
channel_team = await team_service.get_team_by_abbrev(abbrev) channel_team = await team_service.get_team_by_abbrev(abbrev)
if channel_team is not None and channel_team.color is not None: if channel_team is not None and channel_team.color is not None:
return int(channel_team.color,16) color = int(channel_team.color, 16)
if channel_id is not None:
_channel_color_cache[channel_id] = (color, time.time())
return color
team = await get_user_major_league_team(user_id=interaction.user.id) team = await get_user_major_league_team(user_id=interaction.user.id)
if team is not None and team.color is not None: if team is not None and team.color is not None:
return int(team.color,16) color = int(team.color, 16)
if channel_id is not None:
_channel_color_cache[channel_id] = (color, time.time())
return color
return EmbedColors.PRIMARY return EmbedColors.PRIMARY
@ -381,25 +432,44 @@ class DiceRollCommands(commands.Cog):
# Map common inputs to standard position names # Map common inputs to standard position names
position_map = { position_map = {
'P': 'P', 'PITCHER': 'P', "P": "P",
'C': 'C', 'CATCHER': 'C', "PITCHER": "P",
'1': '1B', '1B': '1B', 'FIRST': '1B', 'FIRSTBASE': '1B', "C": "C",
'2': '2B', '2B': '2B', 'SECOND': '2B', 'SECONDBASE': '2B', "CATCHER": "C",
'3': '3B', '3B': '3B', 'THIRD': '3B', 'THIRDBASE': '3B', "1": "1B",
'SS': 'SS', 'SHORT': 'SS', 'SHORTSTOP': 'SS', "1B": "1B",
'LF': 'LF', 'LEFT': 'LF', 'LEFTFIELD': 'LF', "FIRST": "1B",
'CF': 'CF', 'CENTER': 'CF', 'CENTERFIELD': 'CF', "FIRSTBASE": "1B",
'RF': 'RF', 'RIGHT': 'RF', 'RIGHTFIELD': 'RF' "2": "2B",
"2B": "2B",
"SECOND": "2B",
"SECONDBASE": "2B",
"3": "3B",
"3B": "3B",
"THIRD": "3B",
"THIRDBASE": "3B",
"SS": "SS",
"SHORT": "SS",
"SHORTSTOP": "SS",
"LF": "LF",
"LEFT": "LF",
"LEFTFIELD": "LF",
"CF": "CF",
"CENTER": "CF",
"CENTERFIELD": "CF",
"RF": "RF",
"RIGHT": "RF",
"RIGHTFIELD": "RF",
} }
return position_map.get(pos) return position_map.get(pos)
def _create_fielding_embed( def _create_fielding_embed(
self, self,
position: str, position: str,
roll_results: list[DiceRoll], roll_results: list[DiceRoll],
user: discord.User | discord.Member, user: discord.User | discord.Member,
embed_color: int = EmbedColors.PRIMARY embed_color: int = EmbedColors.PRIMARY,
) -> discord.Embed: ) -> discord.Embed:
"""Create an embed for fielding roll results.""" """Create an embed for fielding roll results."""
d20_result = roll_results[0].total d20_result = roll_results[0].total
@ -409,15 +479,11 @@ class DiceRollCommands(commands.Cog):
# Create base embed # Create base embed
embed = EmbedTemplate.create_base_embed( embed = EmbedTemplate.create_base_embed(
title=f"SA Fielding roll for {user.display_name}", title=f"SA Fielding roll for {user.display_name}", color=embed_color
color=embed_color
) )
# Set user info # Set user info
embed.set_author( embed.set_author(name=user.display_name, icon_url=user.display_avatar.url)
name=user.display_name,
icon_url=user.display_avatar.url
)
# Add dice results in standard format # Add dice results in standard format
dice_notation = "1d20;3d6" dice_notation = "1d20;3d6"
@ -425,18 +491,14 @@ class DiceRollCommands(commands.Cog):
# Extract just the dice result part from the field # Extract just the dice result part from the field
dice_field_value = embed_dice.fields[0].value dice_field_value = embed_dice.fields[0].value
embed.add_field( embed.add_field(name="Dice Results", value=dice_field_value, inline=False)
name="Dice Results",
value=dice_field_value,
inline=False
)
# Add fielding check summary # Add fielding check summary
range_result = self._get_range_result(position, d20_result) range_result = self._get_range_result(position, d20_result)
embed.add_field( embed.add_field(
name=f"{position} Range Result", name=f"{position} Range Result",
value=f"```md\n 1 | 2 | 3 | 4 | 5\n{range_result}```", value=f"```md\n 1 | 2 | 3 | 4 | 5\n{range_result}```",
inline=False inline=False,
) )
# Add rare play or error result # Add rare play or error result
@ -457,19 +519,15 @@ class DiceRollCommands(commands.Cog):
field_name = base_field_name field_name = base_field_name
# Add part indicator if multiple chunks # Add part indicator if multiple chunks
if len(result_chunks) > 1: if len(result_chunks) > 1:
field_name += f" (Part {i+1}/{len(result_chunks)})" field_name += f" (Part {i + 1}/{len(result_chunks)})"
embed.add_field( embed.add_field(name=field_name, value=chunk, inline=False)
name=field_name,
value=chunk,
inline=False
)
# Add help commands # Add help commands
embed.add_field( embed.add_field(
name="Help Commands", name="Help Commands",
value="Run /charts for full chart readout", value="Run /charts for full chart readout",
inline=False inline=False,
) )
# # Add references # # Add references
@ -488,125 +546,115 @@ class DiceRollCommands(commands.Cog):
resolution_roll: int, resolution_roll: int,
user: discord.User | discord.Member, user: discord.User | discord.Member,
embed_color: int = EmbedColors.PRIMARY, embed_color: int = EmbedColors.PRIMARY,
show_author: bool = True show_author: bool = True,
) -> discord.Embed: ) -> discord.Embed:
"""Create an embed for jump roll results.""" """Create an embed for jump roll results."""
# Create base embed # Create base embed
embed = EmbedTemplate.create_base_embed( embed = EmbedTemplate.create_base_embed(
title=f"Jump roll for {user.name}", title=f"Jump roll for {user.name}", color=embed_color
color=embed_color
) )
if show_author: if show_author:
# Set user info # Set user info
embed.set_author( embed.set_author(name=user.name, icon_url=user.display_avatar.url)
name=user.name,
icon_url=user.display_avatar.url
)
# Check for pickoff or balk # Check for pickoff or balk
if check_roll == 1: if check_roll == 1:
# Pickoff attempt # Pickoff attempt
embed.add_field( embed.add_field(
name="Special", name="Special", value="```md\nCheck pickoff```", inline=False
value="```md\nCheck pickoff```",
inline=False
) )
embed.add_field( embed.add_field(
name="Pickoff roll", name="Pickoff roll",
value=f"```md\n# {resolution_roll}\nDetails:[1d20 ({resolution_roll})]```", value=f"```md\n# {resolution_roll}\nDetails:[1d20 ({resolution_roll})]```",
inline=False inline=False,
) )
elif check_roll == 2: elif check_roll == 2:
# Balk # Balk
embed.add_field( embed.add_field(name="Special", value="```md\nCheck balk```", inline=False)
name="Special",
value="```md\nCheck balk```",
inline=False
)
embed.add_field( embed.add_field(
name="Balk roll", name="Balk roll",
value=f"```md\n# {resolution_roll}\nDetails:[1d20 ({resolution_roll})]```", value=f"```md\n# {resolution_roll}\nDetails:[1d20 ({resolution_roll})]```",
inline=False inline=False,
) )
else: else:
# Normal jump - show 2d6 result # Normal jump - show 2d6 result
if jump_result: if jump_result:
rolls_str = ' '.join(str(r) for r in jump_result.rolls) rolls_str = " ".join(str(r) for r in jump_result.rolls)
embed.add_field( embed.add_field(
name="Result", name="Result",
value=f"```md\n# {jump_result.total}\nDetails:[2d6 ({rolls_str})]```", value=f"```md\n# {jump_result.total}\nDetails:[2d6 ({rolls_str})]```",
inline=False inline=False,
) )
return embed return embed
def _get_range_result(self, position: str, d20_roll: int) -> str: def _get_range_result(self, position: str, d20_roll: int) -> str:
"""Get the range result display for a position and d20 roll.""" """Get the range result display for a position and d20 roll."""
if position == 'P': if position == "P":
return self._get_pitcher_range(d20_roll) return self._get_pitcher_range(d20_roll)
elif position in ['1B', '2B', '3B', 'SS']: elif position in ["1B", "2B", "3B", "SS"]:
return self._get_infield_range(d20_roll) return self._get_infield_range(d20_roll)
elif position in ['LF', 'CF', 'RF']: elif position in ["LF", "CF", "RF"]:
return self._get_outfield_range(d20_roll) return self._get_outfield_range(d20_roll)
elif position == 'C': elif position == "C":
return self._get_catcher_range(d20_roll) return self._get_catcher_range(d20_roll)
return "Unknown position" return "Unknown position"
def _get_infield_range(self, d20_roll: int) -> str: def _get_infield_range(self, d20_roll: int) -> str:
"""Get infield range result based on d20 roll.""" """Get infield range result based on d20 roll."""
return INFIELD_RANGES.get(d20_roll, 'Unknown') return INFIELD_RANGES.get(d20_roll, "Unknown")
def _get_outfield_range(self, d20_roll: int) -> str: def _get_outfield_range(self, d20_roll: int) -> str:
"""Get outfield range result based on d20 roll.""" """Get outfield range result based on d20 roll."""
return OUTFIELD_RANGES.get(d20_roll, 'Unknown') return OUTFIELD_RANGES.get(d20_roll, "Unknown")
def _get_catcher_range(self, d20_roll: int) -> str: def _get_catcher_range(self, d20_roll: int) -> str:
"""Get catcher range result based on d20 roll.""" """Get catcher range result based on d20 roll."""
return CATCHER_RANGES.get(d20_roll, 'Unknown') return CATCHER_RANGES.get(d20_roll, "Unknown")
def _get_pitcher_range(self, d20_roll: int) -> str: def _get_pitcher_range(self, d20_roll: int) -> str:
"""Get pitcher range result based on d20 roll.""" """Get pitcher range result based on d20 roll."""
return PITCHER_RANGES.get(d20_roll, 'Unknown') return PITCHER_RANGES.get(d20_roll, "Unknown")
def _get_rare_play(self, position: str, d20_total: int) -> str: def _get_rare_play(self, position: str, d20_total: int) -> str:
"""Get the rare play result for a position and d20 total""" """Get the rare play result for a position and d20 total"""
starter = 'Rare play! Take the range result from above and consult the chart below.\n\n' starter = "Rare play! Take the range result from above and consult the chart below.\n\n"
if position == 'P': if position == "P":
return starter + self._get_pitcher_rare_play(d20_total) return starter + self._get_pitcher_rare_play(d20_total)
elif position == '1B': elif position == "1B":
return starter + self._get_infield_rare_play(d20_total) return starter + self._get_infield_rare_play(d20_total)
elif position == '2B': elif position == "2B":
return starter + self._get_infield_rare_play(d20_total) return starter + self._get_infield_rare_play(d20_total)
elif position == '3B': elif position == "3B":
return starter + self._get_infield_rare_play(d20_total) return starter + self._get_infield_rare_play(d20_total)
elif position == 'SS': elif position == "SS":
return starter + self._get_infield_rare_play(d20_total) return starter + self._get_infield_rare_play(d20_total)
elif position in ['LF', 'RF']: elif position in ["LF", "RF"]:
return starter + self._get_outfield_rare_play(d20_total) return starter + self._get_outfield_rare_play(d20_total)
elif position == 'CF': elif position == "CF":
return starter + self._get_outfield_rare_play(d20_total) return starter + self._get_outfield_rare_play(d20_total)
raise ValueError(f'Unknown position: {position}') raise ValueError(f"Unknown position: {position}")
def _get_error_result(self, position: str, d6_total: int) -> str: def _get_error_result(self, position: str, d6_total: int) -> str:
"""Get the error result for a position and 3d6 total.""" """Get the error result for a position and 3d6 total."""
# Get the appropriate error chart # Get the appropriate error chart
if position == 'P': if position == "P":
return self._get_pitcher_error(d6_total) return self._get_pitcher_error(d6_total)
elif position == '1B': elif position == "1B":
return self._get_1b_error(d6_total) return self._get_1b_error(d6_total)
elif position == '2B': elif position == "2B":
return self._get_2b_error(d6_total) return self._get_2b_error(d6_total)
elif position == '3B': elif position == "3B":
return self._get_3b_error(d6_total) return self._get_3b_error(d6_total)
elif position == 'SS': elif position == "SS":
return self._get_ss_error(d6_total) return self._get_ss_error(d6_total)
elif position in ['LF', 'RF']: elif position in ["LF", "RF"]:
return self._get_corner_of_error(d6_total) return self._get_corner_of_error(d6_total)
elif position == 'CF': elif position == "CF":
return self._get_cf_error(d6_total) return self._get_cf_error(d6_total)
elif position == 'C': elif position == "C":
return self._get_catcher_error(d6_total) return self._get_catcher_error(d6_total)
# Should never reach here due to position validation, but follow "Raise or Return" pattern # Should never reach here due to position validation, but follow "Raise or Return" pattern
@ -614,65 +662,64 @@ class DiceRollCommands(commands.Cog):
def _get_3b_error(self, d6_total: int) -> str: def _get_3b_error(self, d6_total: int) -> str:
"""Get 3B error result based on 3d6 total.""" """Get 3B error result based on 3d6 total."""
return THIRD_BASE_ERRORS.get(d6_total, 'No error') return THIRD_BASE_ERRORS.get(d6_total, "No error")
def _get_1b_error(self, d6_total: int) -> str: def _get_1b_error(self, d6_total: int) -> str:
"""Get 1B error result based on 3d6 total.""" """Get 1B error result based on 3d6 total."""
return FIRST_BASE_ERRORS.get(d6_total, 'No error') return FIRST_BASE_ERRORS.get(d6_total, "No error")
def _get_2b_error(self, d6_total: int) -> str: def _get_2b_error(self, d6_total: int) -> str:
"""Get 2B error result based on 3d6 total.""" """Get 2B error result based on 3d6 total."""
return SECOND_BASE_ERRORS.get(d6_total, 'No error') return SECOND_BASE_ERRORS.get(d6_total, "No error")
def _get_ss_error(self, d6_total: int) -> str: def _get_ss_error(self, d6_total: int) -> str:
"""Get SS error result based on 3d6 total.""" """Get SS error result based on 3d6 total."""
return SHORTSTOP_ERRORS.get(d6_total, 'No error') return SHORTSTOP_ERRORS.get(d6_total, "No error")
def _get_corner_of_error(self, d6_total: int) -> str: def _get_corner_of_error(self, d6_total: int) -> str:
"""Get LF/RF error result based on 3d6 total.""" """Get LF/RF error result based on 3d6 total."""
return CORNER_OUTFIELD_ERRORS.get(d6_total, 'No error') return CORNER_OUTFIELD_ERRORS.get(d6_total, "No error")
def _get_cf_error(self, d6_total: int) -> str: def _get_cf_error(self, d6_total: int) -> str:
"""Get CF error result based on 3d6 total.""" """Get CF error result based on 3d6 total."""
return CENTER_FIELD_ERRORS.get(d6_total, 'No error') return CENTER_FIELD_ERRORS.get(d6_total, "No error")
def _get_catcher_error(self, d6_total: int) -> str: def _get_catcher_error(self, d6_total: int) -> str:
"""Get Catcher error result based on 3d6 total.""" """Get Catcher error result based on 3d6 total."""
return CATCHER_ERRORS.get(d6_total, 'No error') return CATCHER_ERRORS.get(d6_total, "No error")
def _get_pitcher_error(self, d6_total: int) -> str: def _get_pitcher_error(self, d6_total: int) -> str:
"""Get Pitcher error result based on 3d6 total.""" """Get Pitcher error result based on 3d6 total."""
return PITCHER_ERRORS.get(d6_total, 'No error') return PITCHER_ERRORS.get(d6_total, "No error")
def _get_pitcher_rare_play(self, d20_total: int) -> str: def _get_pitcher_rare_play(self, d20_total: int) -> str:
return ( return (
f'**G3**: {INFIELD_X_CHART["g3"]["rp"]}\n' f"**G3**: {INFIELD_X_CHART['g3']['rp']}\n"
f'**G2**: {INFIELD_X_CHART["g2"]["rp"]}\n' f"**G2**: {INFIELD_X_CHART['g2']['rp']}\n"
f'**G1**: {INFIELD_X_CHART["g1"]["rp"]}\n' f"**G1**: {INFIELD_X_CHART['g1']['rp']}\n"
f'**SI1**: {INFIELD_X_CHART["si1"]["rp"]}\n' f"**SI1**: {INFIELD_X_CHART['si1']['rp']}\n"
) )
def _get_infield_rare_play(self, d20_total: int) -> str: def _get_infield_rare_play(self, d20_total: int) -> str:
return ( return (
f'**G3**: {INFIELD_X_CHART["g3"]["rp"]}\n' f"**G3**: {INFIELD_X_CHART['g3']['rp']}\n"
f'**G2**: {INFIELD_X_CHART["g2"]["rp"]}\n' f"**G2**: {INFIELD_X_CHART['g2']['rp']}\n"
f'**G1**: {INFIELD_X_CHART["g1"]["rp"]}\n' f"**G1**: {INFIELD_X_CHART['g1']['rp']}\n"
f'**SI1**: {INFIELD_X_CHART["si1"]["rp"]}\n' f"**SI1**: {INFIELD_X_CHART['si1']['rp']}\n"
f'**SI2**: {OUTFIELD_X_CHART["si2"]["rp"]}\n' f"**SI2**: {OUTFIELD_X_CHART['si2']['rp']}\n"
) )
def _get_outfield_rare_play(self, d20_total: int) -> str: def _get_outfield_rare_play(self, d20_total: int) -> str:
return ( return (
f'**F1**: {OUTFIELD_X_CHART["f1"]["rp"]}\n' f"**F1**: {OUTFIELD_X_CHART['f1']['rp']}\n"
f'**F2**: {OUTFIELD_X_CHART["f2"]["rp"]}\n' f"**F2**: {OUTFIELD_X_CHART['f2']['rp']}\n"
f'**F3**: {OUTFIELD_X_CHART["f3"]["rp"]}\n' f"**F3**: {OUTFIELD_X_CHART['f3']['rp']}\n"
f'**SI2**: {OUTFIELD_X_CHART["si2"]["rp"]}\n' f"**SI2**: {OUTFIELD_X_CHART['si2']['rp']}\n"
f'**DO2**: {OUTFIELD_X_CHART["do2"]["rp"]}\n' f"**DO2**: {OUTFIELD_X_CHART['do2']['rp']}\n"
f'**DO3**: {OUTFIELD_X_CHART["do3"]["rp"]}\n' f"**DO3**: {OUTFIELD_X_CHART['do3']['rp']}\n"
f'**TR3**: {OUTFIELD_X_CHART["tr3"]["rp"]}\n' f"**TR3**: {OUTFIELD_X_CHART['tr3']['rp']}\n"
) )
def _roll_weighted_scout_dice(self, card_type: str) -> list[DiceRoll]: def _roll_weighted_scout_dice(self, card_type: str) -> list[DiceRoll]:
""" """
Roll scouting dice with weighted first d6 based on card type. Roll scouting dice with weighted first d6 based on card type.
@ -690,11 +737,11 @@ class DiceRollCommands(commands.Cog):
first_roll = random.randint(4, 6) first_roll = random.randint(4, 6)
first_d6_result = DiceRoll( first_d6_result = DiceRoll(
dice_notation='1d6', dice_notation="1d6",
num_dice=1, num_dice=1,
die_sides=6, die_sides=6,
rolls=[first_roll], rolls=[first_roll],
total=first_roll total=first_roll,
) )
# Second roll (2d6) - normal # Second roll (2d6) - normal
@ -705,19 +752,20 @@ class DiceRollCommands(commands.Cog):
return [first_d6_result, second_result, third_result] return [first_d6_result, second_result, third_result]
def _create_multi_roll_embed(self, dice_notation: str, roll_results: list[DiceRoll], user: discord.User | discord.Member, set_author: bool = True, embed_color: int = EmbedColors.PRIMARY) -> discord.Embed: def _create_multi_roll_embed(
self,
dice_notation: str,
roll_results: list[DiceRoll],
user: discord.User | discord.Member,
set_author: bool = True,
embed_color: int = EmbedColors.PRIMARY,
) -> discord.Embed:
"""Create an embed for multiple dice roll results.""" """Create an embed for multiple dice roll results."""
embed = EmbedTemplate.create_base_embed( embed = EmbedTemplate.create_base_embed(title="🎲 Dice Roll", color=embed_color)
title="🎲 Dice Roll",
color=embed_color
)
if set_author: if set_author:
# Set user info # Set user info
embed.set_author( embed.set_author(name=user.name, icon_url=user.display_avatar.url)
name=user.name,
icon_url=user.display_avatar.url
)
# Create summary line with totals # Create summary line with totals
totals = [str(result.total) for result in roll_results] totals = [str(result.total) for result in roll_results]
@ -735,19 +783,16 @@ class DiceRollCommands(commands.Cog):
roll_groups.append(str(rolls[0])) roll_groups.append(str(rolls[0]))
else: else:
# Multiple dice: space-separated within the group # Multiple dice: space-separated within the group
roll_groups.append(' '.join(str(r) for r in rolls)) roll_groups.append(" ".join(str(r) for r in rolls))
details = f"Details:[{';'.join(dice_notations)} ({' - '.join(roll_groups)})]" details = f"Details:[{';'.join(dice_notations)} ({' - '.join(roll_groups)})]"
# Set as description # Set as description
embed.add_field( embed.add_field(name="Result", value=f"```md\n{summary}\n{details}```")
name='Result',
value=f"```md\n{summary}\n{details}```"
)
return embed return embed
async def setup(bot: commands.Bot): async def setup(bot: commands.Bot):
"""Load the dice roll commands cog.""" """Load the dice roll commands cog."""
await bot.add_cog(DiceRollCommands(bot)) await bot.add_cog(DiceRollCommands(bot))

View File

@ -3,6 +3,7 @@ League service for Discord Bot v2.0
Handles league-wide operations including current state, standings, and season information. Handles league-wide operations including current state, standings, and season information.
""" """
import logging import logging
from typing import Optional, List, Dict, Any from typing import Optional, List, Dict, Any
@ -10,25 +11,27 @@ from config import get_config
from services.base_service import BaseService from services.base_service import BaseService
from models.current import Current from models.current import Current
from exceptions import APIException from exceptions import APIException
from utils.decorators import cached_single_item
logger = logging.getLogger(f'{__name__}.LeagueService') logger = logging.getLogger(f"{__name__}.LeagueService")
class LeagueService(BaseService[Current]): class LeagueService(BaseService[Current]):
""" """
Service for league-wide operations. Service for league-wide operations.
Features: Features:
- Current league state retrieval - Current league state retrieval
- Season standings - Season standings
- League-wide statistics - League-wide statistics
""" """
def __init__(self): def __init__(self):
"""Initialize league service.""" """Initialize league service."""
super().__init__(Current, 'current') super().__init__(Current, "current")
logger.debug("LeagueService initialized") logger.debug("LeagueService initialized")
@cached_single_item(ttl=60)
async def get_current_state(self) -> Optional[Current]: async def get_current_state(self) -> Optional[Current]:
""" """
Get the current league state including week, season, and settings. Get the current league state including week, season, and settings.
@ -38,11 +41,13 @@ class LeagueService(BaseService[Current]):
""" """
try: try:
client = await self.get_client() client = await self.get_client()
data = await client.get('current') data = await client.get("current")
if data: if data:
current = Current.from_api_data(data) current = Current.from_api_data(data)
logger.debug(f"Retrieved current state: Week {current.week}, Season {current.season}") logger.debug(
f"Retrieved current state: Week {current.week}, Season {current.season}"
)
return current return current
logger.debug("No current state data found") logger.debug("No current state data found")
@ -53,9 +58,7 @@ class LeagueService(BaseService[Current]):
return None return None
async def update_current_state( async def update_current_state(
self, self, week: Optional[int] = None, freeze: Optional[bool] = None
week: Optional[int] = None,
freeze: Optional[bool] = None
) -> Optional[Current]: ) -> Optional[Current]:
""" """
Update current league state (week and/or freeze status). Update current league state (week and/or freeze status).
@ -77,9 +80,9 @@ class LeagueService(BaseService[Current]):
# Build update data # Build update data
update_data = {} update_data = {}
if week is not None: if week is not None:
update_data['week'] = week update_data["week"] = week
if freeze is not None: if freeze is not None:
update_data['freeze'] = freeze update_data["freeze"] = freeze
if not update_data: if not update_data:
logger.warning("update_current_state called with no updates") logger.warning("update_current_state called with no updates")
@ -89,127 +92,152 @@ class LeagueService(BaseService[Current]):
# (Current table has one row per season, NOT a single row with id=1) # (Current table has one row per season, NOT a single row with id=1)
current = await self.get_current_state() current = await self.get_current_state()
if not current: if not current:
logger.error("Cannot update current state - unable to fetch current state") logger.error(
"Cannot update current state - unable to fetch current state"
)
return None return None
current_id = current.id current_id = current.id
logger.debug(f"Updating current state id={current_id} (season {current.season})") logger.debug(
f"Updating current state id={current_id} (season {current.season})"
)
# Use BaseService patch method # Use BaseService patch method
updated_current = await self.patch(current_id, update_data, use_query_params=True) updated_current = await self.patch(
current_id, update_data, use_query_params=True
)
if updated_current: if updated_current:
logger.info(f"Updated current state id={current_id}: {update_data}") logger.info(f"Updated current state id={current_id}: {update_data}")
return updated_current return updated_current
else: else:
logger.error(f"Failed to update current state id={current_id} - patch returned None") logger.error(
f"Failed to update current state id={current_id} - patch returned None"
)
return None return None
except Exception as e: except Exception as e:
logger.error(f"Error updating current state: {e}") logger.error(f"Error updating current state: {e}")
raise APIException(f"Failed to update current state: {e}") raise APIException(f"Failed to update current state: {e}")
async def get_standings(self, season: Optional[int] = None) -> Optional[List[Dict[str, Any]]]: async def get_standings(
self, season: Optional[int] = None
) -> Optional[List[Dict[str, Any]]]:
""" """
Get league standings for a season. Get league standings for a season.
Args: Args:
season: Season number (defaults to current season) season: Season number (defaults to current season)
Returns: Returns:
List of standings data or None if not available List of standings data or None if not available
""" """
try: try:
season = season or get_config().sba_season season = season or get_config().sba_season
client = await self.get_client() client = await self.get_client()
data = await client.get('standings', params=[('season', str(season))]) data = await client.get("standings", params=[("season", str(season))])
if data and isinstance(data, list): if data and isinstance(data, list):
logger.debug(f"Retrieved standings for season {season}: {len(data)} teams") logger.debug(
f"Retrieved standings for season {season}: {len(data)} teams"
)
return data return data
elif data and isinstance(data, dict): elif data and isinstance(data, dict):
# Handle case where API returns a dict with standings array # Handle case where API returns a dict with standings array
standings_data = data.get('standings', data.get('items', [])) standings_data = data.get("standings", data.get("items", []))
if standings_data: if standings_data:
logger.debug(f"Retrieved standings for season {season}: {len(standings_data)} teams") logger.debug(
f"Retrieved standings for season {season}: {len(standings_data)} teams"
)
return standings_data return standings_data
logger.debug(f"No standings data found for season {season}") logger.debug(f"No standings data found for season {season}")
return None return None
except Exception as e: except Exception as e:
logger.error(f"Failed to get standings for season {season}: {e}") logger.error(f"Failed to get standings for season {season}: {e}")
return None return None
async def get_division_standings(self, division_id: int, season: Optional[int] = None) -> Optional[List[Dict[str, Any]]]: async def get_division_standings(
self, division_id: int, season: Optional[int] = None
) -> Optional[List[Dict[str, Any]]]:
""" """
Get standings for a specific division. Get standings for a specific division.
Args: Args:
division_id: Division identifier division_id: Division identifier
season: Season number (defaults to current season) season: Season number (defaults to current season)
Returns: Returns:
List of division standings or None if not available List of division standings or None if not available
""" """
try: try:
season = season or get_config().sba_season season = season or get_config().sba_season
client = await self.get_client() client = await self.get_client()
data = await client.get(f'standings/division/{division_id}', params=[('season', str(season))]) data = await client.get(
f"standings/division/{division_id}", params=[("season", str(season))]
)
if data and isinstance(data, list): if data and isinstance(data, list):
logger.debug(f"Retrieved division {division_id} standings for season {season}: {len(data)} teams") logger.debug(
f"Retrieved division {division_id} standings for season {season}: {len(data)} teams"
)
return data return data
logger.debug(f"No division standings found for division {division_id}, season {season}") logger.debug(
f"No division standings found for division {division_id}, season {season}"
)
return None return None
except Exception as e: except Exception as e:
logger.error(f"Failed to get division {division_id} standings: {e}") logger.error(f"Failed to get division {division_id} standings: {e}")
return None return None
async def get_league_leaders(self, stat_type: str = 'batting', season: Optional[int] = None, limit: int = 10) -> Optional[List[Dict[str, Any]]]: async def get_league_leaders(
self, stat_type: str = "batting", season: Optional[int] = None, limit: int = 10
) -> Optional[List[Dict[str, Any]]]:
""" """
Get league leaders for a specific statistic category. Get league leaders for a specific statistic category.
Args: Args:
stat_type: Type of stats ('batting', 'pitching', 'fielding') stat_type: Type of stats ('batting', 'pitching', 'fielding')
season: Season number (defaults to current season) season: Season number (defaults to current season)
limit: Number of leaders to return limit: Number of leaders to return
Returns: Returns:
List of league leaders or None if not available List of league leaders or None if not available
""" """
try: try:
season = season or get_config().sba_season season = season or get_config().sba_season
client = await self.get_client() client = await self.get_client()
params = [ params = [("season", str(season)), ("limit", str(limit))]
('season', str(season)),
('limit', str(limit)) data = await client.get(f"leaders/{stat_type}", params=params)
]
data = await client.get(f'leaders/{stat_type}', params=params)
if data: if data:
# Handle different response formats # Handle different response formats
if isinstance(data, list): if isinstance(data, list):
leaders = data leaders = data
elif isinstance(data, dict): elif isinstance(data, dict):
leaders = data.get('leaders', data.get('items', data.get('results', []))) leaders = data.get(
"leaders", data.get("items", data.get("results", []))
)
else: else:
leaders = [] leaders = []
logger.debug(f"Retrieved {stat_type} leaders for season {season}: {len(leaders)} players") logger.debug(
f"Retrieved {stat_type} leaders for season {season}: {len(leaders)} players"
)
return leaders[:limit] # Ensure we don't exceed limit return leaders[:limit] # Ensure we don't exceed limit
logger.debug(f"No {stat_type} leaders found for season {season}") logger.debug(f"No {stat_type} leaders found for season {season}")
return None return None
except Exception as e: except Exception as e:
logger.error(f"Failed to get {stat_type} leaders for season {season}: {e}") logger.error(f"Failed to get {stat_type} leaders for season {season}: {e}")
return None return None
# Global service instance # Global service instance
league_service = LeagueService() league_service = LeagueService()

View File

@ -11,6 +11,7 @@ from config import get_config
from services.base_service import BaseService from services.base_service import BaseService
from models.player import Player from models.player import Player
from exceptions import APIException from exceptions import APIException
from utils.decorators import cached_api_call
if TYPE_CHECKING: if TYPE_CHECKING:
from services.team_service import TeamService from services.team_service import TeamService
@ -270,6 +271,7 @@ class PlayerService(BaseService[Player]):
logger.error(f"Error in fuzzy search for '{query}': {e}") logger.error(f"Error in fuzzy search for '{query}': {e}")
return [] return []
@cached_api_call(ttl=300)
async def get_free_agents(self, season: int) -> List[Player]: async def get_free_agents(self, season: int) -> List[Player]:
""" """
Get all free agent players. Get all free agent players.
@ -372,10 +374,7 @@ class PlayerService(BaseService[Player]):
return None return None
async def update_player_team( async def update_player_team(
self, self, player_id: int, new_team_id: int, dem_week: Optional[int] = None
player_id: int,
new_team_id: int,
dem_week: Optional[int] = None
) -> Optional[Player]: ) -> Optional[Player]:
""" """
Update a player's team assignment. Update a player's team assignment.

View File

@ -5,7 +5,8 @@ Handles team standings retrieval and processing.
""" """
import logging import logging
from typing import Optional, List, Dict import time
from typing import Optional, List, Dict, Tuple
from api.client import get_global_client from api.client import get_global_client
from models.standings import TeamStandings from models.standings import TeamStandings
@ -13,6 +14,10 @@ from exceptions import APIException
logger = logging.getLogger(f"{__name__}.StandingsService") logger = logging.getLogger(f"{__name__}.StandingsService")
# In-memory cache for standings: season -> (standings_list, cached_at)
_standings_cache: Dict[int, Tuple[List[TeamStandings], float]] = {}
_STANDINGS_CACHE_TTL = 600 # 10 minutes
class StandingsService: class StandingsService:
""" """
@ -45,6 +50,13 @@ class StandingsService:
List of TeamStandings ordered by record List of TeamStandings ordered by record
""" """
try: try:
# Check in-memory cache first
if season in _standings_cache:
cached_standings, cached_at = _standings_cache[season]
if time.time() - cached_at < _STANDINGS_CACHE_TTL:
logger.debug(f"Cache hit for standings season {season}")
return cached_standings
client = await self.get_client() client = await self.get_client()
params = [("season", str(season))] params = [("season", str(season))]
@ -72,6 +84,10 @@ class StandingsService:
logger.info( logger.info(
f"Retrieved standings for {len(standings)} teams in season {season}" f"Retrieved standings for {len(standings)} teams in season {season}"
) )
# Cache the result
_standings_cache[season] = (standings, time.time())
return standings return standings
except Exception as e: except Exception as e: