perf: add caching for frequently-accessed stable data (#91)
Closes #91 - league_service.get_current_state(): @cached_single_item(ttl=60) — 60s Redis cache - standings_service.get_league_standings(): in-memory dict cache with 10-minute TTL keyed by season - player_service.get_free_agents(): @cached_api_call(ttl=300) — 5-minute Redis cache - dice/rolls.py _get_channel_embed_color(): in-memory dict cache keyed by channel_id with 5-minute TTL, matching the autocomplete.py pattern from PR #100 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
8e02889fd4
commit
61f36353d8
@ -3,7 +3,10 @@ Dice Rolling Commands
|
||||
|
||||
Implements slash commands for dice rolling functionality required for gameplay.
|
||||
"""
|
||||
|
||||
import random
|
||||
import time
|
||||
from typing import Dict, Tuple
|
||||
|
||||
import discord
|
||||
from discord.ext import commands
|
||||
@ -13,7 +16,11 @@ from utils.logging import get_contextual_logger
|
||||
from utils.decorators import logged_command
|
||||
from utils.team_utils import get_user_major_league_team
|
||||
from utils.text_utils import split_text_for_fields
|
||||
from utils.dice_utils import DiceRoll, parse_and_roll_multiple_dice, parse_and_roll_single_dice
|
||||
from utils.dice_utils import (
|
||||
DiceRoll,
|
||||
parse_and_roll_multiple_dice,
|
||||
parse_and_roll_single_dice,
|
||||
)
|
||||
from views.embeds import EmbedColors, EmbedTemplate
|
||||
from commands.dev.loaded_dice import get_and_consume_loaded_roll
|
||||
from .chart_data import (
|
||||
@ -33,26 +40,27 @@ from .chart_data import (
|
||||
PITCHER_ERRORS,
|
||||
)
|
||||
|
||||
# In-memory cache for channel embed color lookups: channel_id -> (color, cached_at)
|
||||
_channel_color_cache: Dict[int, Tuple[int, float]] = {}
|
||||
_CHANNEL_COLOR_CACHE_TTL = 300 # 5 minutes
|
||||
|
||||
|
||||
class DiceRollCommands(commands.Cog):
|
||||
"""Dice rolling command handlers for gameplay."""
|
||||
|
||||
def __init__(self, bot: commands.Bot):
|
||||
self.bot = bot
|
||||
self.logger = get_contextual_logger(f'{__name__}.DiceRollCommands')
|
||||
self.logger = get_contextual_logger(f"{__name__}.DiceRollCommands")
|
||||
|
||||
@discord.app_commands.command(
|
||||
name="roll",
|
||||
description="Roll polyhedral dice using XdY notation (e.g., 2d6, 1d20, 3d8)"
|
||||
description="Roll polyhedral dice using XdY notation (e.g., 2d6, 1d20, 3d8)",
|
||||
)
|
||||
@discord.app_commands.describe(
|
||||
dice="Dice notation - single or multiple separated by semicolon (e.g., 2d6, 1d20;2d6;1d6)"
|
||||
)
|
||||
@logged_command("/roll")
|
||||
async def roll_dice(
|
||||
self,
|
||||
interaction: discord.Interaction,
|
||||
dice: str
|
||||
):
|
||||
async def roll_dice(self, interaction: discord.Interaction, dice: str):
|
||||
"""Roll dice using standard XdY dice notation. Supports multiple rolls separated by semicolon."""
|
||||
await interaction.response.defer()
|
||||
|
||||
@ -61,7 +69,7 @@ class DiceRollCommands(commands.Cog):
|
||||
if not roll_results:
|
||||
await interaction.followup.send(
|
||||
"❌ Invalid dice notation. Use format like: 2d6, 1d20, or 1d6;2d6;1d20",
|
||||
ephemeral=True
|
||||
ephemeral=True,
|
||||
)
|
||||
return
|
||||
|
||||
@ -72,18 +80,24 @@ class DiceRollCommands(commands.Cog):
|
||||
@commands.command(name="roll", aliases=["r", "dice"])
|
||||
async def roll_dice_prefix(self, ctx: commands.Context, *, dice: str | None = None):
|
||||
"""Roll dice using prefix commands (!roll, !r, !dice)."""
|
||||
self.logger.info(f"Prefix roll command started by {ctx.author.display_name}", dice_input=dice)
|
||||
self.logger.info(
|
||||
f"Prefix roll command started by {ctx.author.display_name}", dice_input=dice
|
||||
)
|
||||
|
||||
if dice is None:
|
||||
self.logger.debug("No dice input provided")
|
||||
await ctx.send("❌ Please provide dice notation. Usage: `!roll 2d6` or `!roll 1d6;2d6;1d20`")
|
||||
await ctx.send(
|
||||
"❌ Please provide dice notation. Usage: `!roll 2d6` or `!roll 1d6;2d6;1d20`"
|
||||
)
|
||||
return
|
||||
|
||||
# Parse and validate dice notation (supports multiple rolls)
|
||||
roll_results = parse_and_roll_multiple_dice(dice)
|
||||
if not roll_results:
|
||||
self.logger.warning("Invalid dice notation provided", dice_input=dice)
|
||||
await ctx.send("❌ Invalid dice notation. Use format like: 2d6, 1d20, or 1d6;2d6;1d20")
|
||||
await ctx.send(
|
||||
"❌ Invalid dice notation. Use format like: 2d6, 1d20, or 1d6;2d6;1d20"
|
||||
)
|
||||
return
|
||||
|
||||
self.logger.info(f"Dice rolled successfully", roll_count=len(roll_results))
|
||||
@ -92,10 +106,7 @@ class DiceRollCommands(commands.Cog):
|
||||
embed = self._create_multi_roll_embed(dice, roll_results, ctx.author)
|
||||
await ctx.send(embed=embed)
|
||||
|
||||
@discord.app_commands.command(
|
||||
name="d20",
|
||||
description="Roll a single d20"
|
||||
)
|
||||
@discord.app_commands.command(name="d20", description="Roll a single d20")
|
||||
@logged_command("/d20")
|
||||
async def d20_dice(self, interaction: discord.Interaction):
|
||||
"""Roll a single d20."""
|
||||
@ -112,15 +123,14 @@ class DiceRollCommands(commands.Cog):
|
||||
roll_results,
|
||||
interaction.user,
|
||||
set_author=False,
|
||||
embed_color=embed_color
|
||||
embed_color=embed_color,
|
||||
)
|
||||
embed.title = f'd20 roll for {interaction.user.display_name}'
|
||||
embed.title = f"d20 roll for {interaction.user.display_name}"
|
||||
|
||||
await interaction.followup.send(embed=embed)
|
||||
|
||||
@discord.app_commands.command(
|
||||
name="ab",
|
||||
description="Roll baseball at-bat dice (1d6;2d6;1d20)"
|
||||
name="ab", description="Roll baseball at-bat dice (1d6;2d6;1d20)"
|
||||
)
|
||||
@logged_command("/ab")
|
||||
async def ab_dice(self, interaction: discord.Interaction):
|
||||
@ -137,7 +147,7 @@ class DiceRollCommands(commands.Cog):
|
||||
# Create DiceRoll objects from loaded values
|
||||
# For 2d6, we split the total into two dice (arbitrary split that sums correctly)
|
||||
d6_2a = min(loaded.d6_2_total - 1, 6) # First die (max 6)
|
||||
d6_2b = loaded.d6_2_total - d6_2a # Second die gets remainder
|
||||
d6_2b = loaded.d6_2_total - d6_2a # Second die gets remainder
|
||||
roll_results = [
|
||||
DiceRoll("1d6", 1, 6, [loaded.d6_1], loaded.d6_1),
|
||||
DiceRoll("2d6", 2, 6, [d6_2a, d6_2b], loaded.d6_2_total),
|
||||
@ -149,17 +159,19 @@ class DiceRollCommands(commands.Cog):
|
||||
dice_notation = "1d6;2d6;1d20"
|
||||
roll_results = parse_and_roll_multiple_dice(dice_notation)
|
||||
|
||||
injury_risk = (roll_results[0].total == 6) and (roll_results[1].total in [7, 8, 9, 10, 11, 12])
|
||||
injury_risk = (roll_results[0].total == 6) and (
|
||||
roll_results[1].total in [7, 8, 9, 10, 11, 12]
|
||||
)
|
||||
d6_total = roll_results[1].total
|
||||
|
||||
embed_title = 'At bat roll'
|
||||
embed_title = "At bat roll"
|
||||
if roll_results[2].total == 1:
|
||||
embed_title = 'Wild pitch roll'
|
||||
dice_notation = '1d20'
|
||||
embed_title = "Wild pitch roll"
|
||||
dice_notation = "1d20"
|
||||
roll_results = [parse_and_roll_single_dice(dice_notation)]
|
||||
elif roll_results[2].total == 2:
|
||||
embed_title = 'PB roll'
|
||||
dice_notation = '1d20'
|
||||
embed_title = "PB roll"
|
||||
dice_notation = "1d20"
|
||||
roll_results = [parse_and_roll_single_dice(dice_notation)]
|
||||
|
||||
# Create embed for the roll results
|
||||
@ -168,15 +180,15 @@ class DiceRollCommands(commands.Cog):
|
||||
roll_results,
|
||||
interaction.user,
|
||||
set_author=False,
|
||||
embed_color=embed_color
|
||||
embed_color=embed_color,
|
||||
)
|
||||
embed.title = f'{embed_title} for {interaction.user.display_name}'
|
||||
embed.title = f"{embed_title} for {interaction.user.display_name}"
|
||||
|
||||
if injury_risk and embed_title == 'At bat roll':
|
||||
if injury_risk and embed_title == "At bat roll":
|
||||
embed.add_field(
|
||||
name=f'Check injury for pitcher injury rating {13 - d6_total}',
|
||||
value='Oops! All injuries!',
|
||||
inline=False
|
||||
name=f"Check injury for pitcher injury rating {13 - d6_total}",
|
||||
value="Oops! All injuries!",
|
||||
inline=False,
|
||||
)
|
||||
|
||||
await interaction.followup.send(embed=embed)
|
||||
@ -188,35 +200,43 @@ class DiceRollCommands(commands.Cog):
|
||||
team = await get_user_major_league_team(user_id=ctx.author.id)
|
||||
embed_color = EmbedColors.PRIMARY
|
||||
if team is not None and team.color is not None:
|
||||
embed_color = int(team.color,16)
|
||||
embed_color = int(team.color, 16)
|
||||
|
||||
# Use the standard baseball dice combination
|
||||
dice_notation = "1d6;2d6;1d20"
|
||||
roll_results = parse_and_roll_multiple_dice(dice_notation)
|
||||
|
||||
self.logger.info("At Bat dice rolled successfully", roll_count=len(roll_results))
|
||||
self.logger.info(
|
||||
"At Bat dice rolled successfully", roll_count=len(roll_results)
|
||||
)
|
||||
|
||||
# Create embed for the roll results
|
||||
embed = self._create_multi_roll_embed(dice_notation, roll_results, ctx.author, set_author=False, embed_color=embed_color)
|
||||
embed.title = f'At bat roll for {ctx.author.display_name}'
|
||||
embed = self._create_multi_roll_embed(
|
||||
dice_notation,
|
||||
roll_results,
|
||||
ctx.author,
|
||||
set_author=False,
|
||||
embed_color=embed_color,
|
||||
)
|
||||
embed.title = f"At bat roll for {ctx.author.display_name}"
|
||||
await ctx.send(embed=embed)
|
||||
|
||||
@discord.app_commands.command(
|
||||
name="scout",
|
||||
description="Roll weighted scouting dice (1d6;2d6;1d20) based on card type"
|
||||
description="Roll weighted scouting dice (1d6;2d6;1d20) based on card type",
|
||||
)
|
||||
@discord.app_commands.describe(
|
||||
card_type="Type of card being scouted"
|
||||
@discord.app_commands.describe(card_type="Type of card being scouted")
|
||||
@discord.app_commands.choices(
|
||||
card_type=[
|
||||
discord.app_commands.Choice(name="Batter", value="batter"),
|
||||
discord.app_commands.Choice(name="Pitcher", value="pitcher"),
|
||||
]
|
||||
)
|
||||
@discord.app_commands.choices(card_type=[
|
||||
discord.app_commands.Choice(name="Batter", value="batter"),
|
||||
discord.app_commands.Choice(name="Pitcher", value="pitcher")
|
||||
])
|
||||
@logged_command("/scout")
|
||||
async def scout_dice(
|
||||
self,
|
||||
interaction: discord.Interaction,
|
||||
card_type: discord.app_commands.Choice[str]
|
||||
card_type: discord.app_commands.Choice[str],
|
||||
):
|
||||
"""Roll weighted scouting dice based on card type (batter or pitcher)."""
|
||||
await interaction.response.defer()
|
||||
@ -228,33 +248,35 @@ class DiceRollCommands(commands.Cog):
|
||||
roll_results = self._roll_weighted_scout_dice(card_type_value)
|
||||
|
||||
# Create embed for the roll results
|
||||
embed = self._create_multi_roll_embed("1d6;2d6;1d20", roll_results, interaction.user, set_author=False)
|
||||
embed.title = f'Scouting roll for {interaction.user.display_name}'
|
||||
embed = self._create_multi_roll_embed(
|
||||
"1d6;2d6;1d20", roll_results, interaction.user, set_author=False
|
||||
)
|
||||
embed.title = f"Scouting roll for {interaction.user.display_name}"
|
||||
await interaction.followup.send(embed=embed)
|
||||
|
||||
@discord.app_commands.command(
|
||||
name="fielding",
|
||||
description="Roll Super Advanced fielding dice for a defensive position"
|
||||
description="Roll Super Advanced fielding dice for a defensive position",
|
||||
)
|
||||
@discord.app_commands.describe(
|
||||
position="Defensive position"
|
||||
@discord.app_commands.describe(position="Defensive position")
|
||||
@discord.app_commands.choices(
|
||||
position=[
|
||||
discord.app_commands.Choice(name="Pitcher (P)", value="P"),
|
||||
discord.app_commands.Choice(name="Catcher (C)", value="C"),
|
||||
discord.app_commands.Choice(name="First Base (1B)", value="1B"),
|
||||
discord.app_commands.Choice(name="Second Base (2B)", value="2B"),
|
||||
discord.app_commands.Choice(name="Third Base (3B)", value="3B"),
|
||||
discord.app_commands.Choice(name="Shortstop (SS)", value="SS"),
|
||||
discord.app_commands.Choice(name="Left Field (LF)", value="LF"),
|
||||
discord.app_commands.Choice(name="Center Field (CF)", value="CF"),
|
||||
discord.app_commands.Choice(name="Right Field (RF)", value="RF"),
|
||||
]
|
||||
)
|
||||
@discord.app_commands.choices(position=[
|
||||
discord.app_commands.Choice(name="Pitcher (P)", value="P"),
|
||||
discord.app_commands.Choice(name="Catcher (C)", value="C"),
|
||||
discord.app_commands.Choice(name="First Base (1B)", value="1B"),
|
||||
discord.app_commands.Choice(name="Second Base (2B)", value="2B"),
|
||||
discord.app_commands.Choice(name="Third Base (3B)", value="3B"),
|
||||
discord.app_commands.Choice(name="Shortstop (SS)", value="SS"),
|
||||
discord.app_commands.Choice(name="Left Field (LF)", value="LF"),
|
||||
discord.app_commands.Choice(name="Center Field (CF)", value="CF"),
|
||||
discord.app_commands.Choice(name="Right Field (RF)", value="RF")
|
||||
])
|
||||
@logged_command("/fielding")
|
||||
async def fielding_roll(
|
||||
self,
|
||||
interaction: discord.Interaction,
|
||||
position: discord.app_commands.Choice[str]
|
||||
position: discord.app_commands.Choice[str],
|
||||
):
|
||||
"""Roll Super Advanced fielding dice for a defensive position."""
|
||||
await interaction.response.defer()
|
||||
@ -268,16 +290,25 @@ class DiceRollCommands(commands.Cog):
|
||||
roll_results = parse_and_roll_multiple_dice(dice_notation)
|
||||
|
||||
# Create fielding embed
|
||||
embed = self._create_fielding_embed(pos_value, roll_results, interaction.user, embed_color)
|
||||
embed = self._create_fielding_embed(
|
||||
pos_value, roll_results, interaction.user, embed_color
|
||||
)
|
||||
await interaction.followup.send(embed=embed)
|
||||
|
||||
@commands.command(name="f", aliases=["fielding", "saf"])
|
||||
async def fielding_roll_prefix(self, ctx: commands.Context, position: str | None = None):
|
||||
async def fielding_roll_prefix(
|
||||
self, ctx: commands.Context, position: str | None = None
|
||||
):
|
||||
"""Roll Super Advanced fielding dice using prefix commands (!f, !fielding, !saf)."""
|
||||
self.logger.info(f"SA Fielding command started by {ctx.author.display_name}", position=position)
|
||||
self.logger.info(
|
||||
f"SA Fielding command started by {ctx.author.display_name}",
|
||||
position=position,
|
||||
)
|
||||
|
||||
if position is None:
|
||||
await ctx.send("❌ Please specify a position. Usage: `!f 3B` or `!fielding SS`")
|
||||
await ctx.send(
|
||||
"❌ Please specify a position. Usage: `!f 3B` or `!fielding SS`"
|
||||
)
|
||||
return
|
||||
|
||||
# Parse and validate position
|
||||
@ -290,15 +321,19 @@ class DiceRollCommands(commands.Cog):
|
||||
dice_notation = "1d20;3d6;1d100"
|
||||
roll_results = parse_and_roll_multiple_dice(dice_notation)
|
||||
|
||||
self.logger.info("SA Fielding dice rolled successfully", position=parsed_position, d20=roll_results[0].total, d6_total=roll_results[1].total)
|
||||
self.logger.info(
|
||||
"SA Fielding dice rolled successfully",
|
||||
position=parsed_position,
|
||||
d20=roll_results[0].total,
|
||||
d6_total=roll_results[1].total,
|
||||
)
|
||||
|
||||
# Create fielding embed
|
||||
embed = self._create_fielding_embed(parsed_position, roll_results, ctx.author)
|
||||
await ctx.send(embed=embed)
|
||||
|
||||
@discord.app_commands.command(
|
||||
name="jump",
|
||||
description="Roll for baserunner's jump before stealing"
|
||||
name="jump", description="Roll for baserunner's jump before stealing"
|
||||
)
|
||||
@logged_command("/jump")
|
||||
async def jump_dice(self, interaction: discord.Interaction):
|
||||
@ -322,7 +357,7 @@ class DiceRollCommands(commands.Cog):
|
||||
resolution_roll,
|
||||
interaction.user,
|
||||
embed_color,
|
||||
show_author=False
|
||||
show_author=False,
|
||||
)
|
||||
await interaction.followup.send(embed=embed)
|
||||
|
||||
@ -344,31 +379,47 @@ class DiceRollCommands(commands.Cog):
|
||||
# Roll another 1d20 for pickoff/balk resolution
|
||||
resolution_roll = random.randint(1, 20)
|
||||
|
||||
self.logger.info("Jump dice rolled successfully", check=check_roll, jump=jump_result.total if jump_result else None, resolution=resolution_roll)
|
||||
self.logger.info(
|
||||
"Jump dice rolled successfully",
|
||||
check=check_roll,
|
||||
jump=jump_result.total if jump_result else None,
|
||||
resolution=resolution_roll,
|
||||
)
|
||||
|
||||
# Create embed based on check roll
|
||||
embed = self._create_jump_embed(
|
||||
check_roll,
|
||||
jump_result,
|
||||
resolution_roll,
|
||||
ctx.author,
|
||||
embed_color
|
||||
check_roll, jump_result, resolution_roll, ctx.author, embed_color
|
||||
)
|
||||
await ctx.send(embed=embed)
|
||||
|
||||
async def _get_channel_embed_color(self, interaction: discord.Interaction) -> int:
|
||||
channel_id = interaction.channel_id
|
||||
if channel_id is not None and channel_id in _channel_color_cache:
|
||||
cached_color, cached_at = _channel_color_cache[channel_id]
|
||||
if time.time() - cached_at < _CHANNEL_COLOR_CACHE_TTL:
|
||||
return cached_color
|
||||
|
||||
# Check if channel is a type that has a name attribute (DMChannel doesn't have one)
|
||||
if isinstance(interaction.channel, (discord.TextChannel, discord.VoiceChannel, discord.Thread)):
|
||||
if isinstance(
|
||||
interaction.channel,
|
||||
(discord.TextChannel, discord.VoiceChannel, discord.Thread),
|
||||
):
|
||||
channel_starter = interaction.channel.name[:6]
|
||||
if '-' in channel_starter:
|
||||
abbrev = channel_starter.split('-')[0]
|
||||
if "-" in channel_starter:
|
||||
abbrev = channel_starter.split("-")[0]
|
||||
channel_team = await team_service.get_team_by_abbrev(abbrev)
|
||||
if channel_team is not None and channel_team.color is not None:
|
||||
return int(channel_team.color,16)
|
||||
color = int(channel_team.color, 16)
|
||||
if channel_id is not None:
|
||||
_channel_color_cache[channel_id] = (color, time.time())
|
||||
return color
|
||||
|
||||
team = await get_user_major_league_team(user_id=interaction.user.id)
|
||||
if team is not None and team.color is not None:
|
||||
return int(team.color,16)
|
||||
color = int(team.color, 16)
|
||||
if channel_id is not None:
|
||||
_channel_color_cache[channel_id] = (color, time.time())
|
||||
return color
|
||||
|
||||
return EmbedColors.PRIMARY
|
||||
|
||||
@ -381,25 +432,44 @@ class DiceRollCommands(commands.Cog):
|
||||
|
||||
# Map common inputs to standard position names
|
||||
position_map = {
|
||||
'P': 'P', 'PITCHER': 'P',
|
||||
'C': 'C', 'CATCHER': 'C',
|
||||
'1': '1B', '1B': '1B', 'FIRST': '1B', 'FIRSTBASE': '1B',
|
||||
'2': '2B', '2B': '2B', 'SECOND': '2B', 'SECONDBASE': '2B',
|
||||
'3': '3B', '3B': '3B', 'THIRD': '3B', 'THIRDBASE': '3B',
|
||||
'SS': 'SS', 'SHORT': 'SS', 'SHORTSTOP': 'SS',
|
||||
'LF': 'LF', 'LEFT': 'LF', 'LEFTFIELD': 'LF',
|
||||
'CF': 'CF', 'CENTER': 'CF', 'CENTERFIELD': 'CF',
|
||||
'RF': 'RF', 'RIGHT': 'RF', 'RIGHTFIELD': 'RF'
|
||||
"P": "P",
|
||||
"PITCHER": "P",
|
||||
"C": "C",
|
||||
"CATCHER": "C",
|
||||
"1": "1B",
|
||||
"1B": "1B",
|
||||
"FIRST": "1B",
|
||||
"FIRSTBASE": "1B",
|
||||
"2": "2B",
|
||||
"2B": "2B",
|
||||
"SECOND": "2B",
|
||||
"SECONDBASE": "2B",
|
||||
"3": "3B",
|
||||
"3B": "3B",
|
||||
"THIRD": "3B",
|
||||
"THIRDBASE": "3B",
|
||||
"SS": "SS",
|
||||
"SHORT": "SS",
|
||||
"SHORTSTOP": "SS",
|
||||
"LF": "LF",
|
||||
"LEFT": "LF",
|
||||
"LEFTFIELD": "LF",
|
||||
"CF": "CF",
|
||||
"CENTER": "CF",
|
||||
"CENTERFIELD": "CF",
|
||||
"RF": "RF",
|
||||
"RIGHT": "RF",
|
||||
"RIGHTFIELD": "RF",
|
||||
}
|
||||
|
||||
return position_map.get(pos)
|
||||
|
||||
def _create_fielding_embed(
|
||||
self,
|
||||
position: str,
|
||||
roll_results: list[DiceRoll],
|
||||
user: discord.User | discord.Member,
|
||||
embed_color: int = EmbedColors.PRIMARY
|
||||
self,
|
||||
position: str,
|
||||
roll_results: list[DiceRoll],
|
||||
user: discord.User | discord.Member,
|
||||
embed_color: int = EmbedColors.PRIMARY,
|
||||
) -> discord.Embed:
|
||||
"""Create an embed for fielding roll results."""
|
||||
d20_result = roll_results[0].total
|
||||
@ -409,15 +479,11 @@ class DiceRollCommands(commands.Cog):
|
||||
|
||||
# Create base embed
|
||||
embed = EmbedTemplate.create_base_embed(
|
||||
title=f"SA Fielding roll for {user.display_name}",
|
||||
color=embed_color
|
||||
title=f"SA Fielding roll for {user.display_name}", color=embed_color
|
||||
)
|
||||
|
||||
# Set user info
|
||||
embed.set_author(
|
||||
name=user.display_name,
|
||||
icon_url=user.display_avatar.url
|
||||
)
|
||||
embed.set_author(name=user.display_name, icon_url=user.display_avatar.url)
|
||||
|
||||
# Add dice results in standard format
|
||||
dice_notation = "1d20;3d6"
|
||||
@ -425,18 +491,14 @@ class DiceRollCommands(commands.Cog):
|
||||
|
||||
# Extract just the dice result part from the field
|
||||
dice_field_value = embed_dice.fields[0].value
|
||||
embed.add_field(
|
||||
name="Dice Results",
|
||||
value=dice_field_value,
|
||||
inline=False
|
||||
)
|
||||
embed.add_field(name="Dice Results", value=dice_field_value, inline=False)
|
||||
|
||||
# Add fielding check summary
|
||||
range_result = self._get_range_result(position, d20_result)
|
||||
embed.add_field(
|
||||
name=f"{position} Range Result",
|
||||
value=f"```md\n 1 | 2 | 3 | 4 | 5\n{range_result}```",
|
||||
inline=False
|
||||
inline=False,
|
||||
)
|
||||
|
||||
# Add rare play or error result
|
||||
@ -457,19 +519,15 @@ class DiceRollCommands(commands.Cog):
|
||||
field_name = base_field_name
|
||||
# Add part indicator if multiple chunks
|
||||
if len(result_chunks) > 1:
|
||||
field_name += f" (Part {i+1}/{len(result_chunks)})"
|
||||
field_name += f" (Part {i + 1}/{len(result_chunks)})"
|
||||
|
||||
embed.add_field(
|
||||
name=field_name,
|
||||
value=chunk,
|
||||
inline=False
|
||||
)
|
||||
embed.add_field(name=field_name, value=chunk, inline=False)
|
||||
|
||||
# Add help commands
|
||||
embed.add_field(
|
||||
name="Help Commands",
|
||||
value="Run /charts for full chart readout",
|
||||
inline=False
|
||||
inline=False,
|
||||
)
|
||||
|
||||
# # Add references
|
||||
@ -488,125 +546,115 @@ class DiceRollCommands(commands.Cog):
|
||||
resolution_roll: int,
|
||||
user: discord.User | discord.Member,
|
||||
embed_color: int = EmbedColors.PRIMARY,
|
||||
show_author: bool = True
|
||||
show_author: bool = True,
|
||||
) -> discord.Embed:
|
||||
"""Create an embed for jump roll results."""
|
||||
# Create base embed
|
||||
embed = EmbedTemplate.create_base_embed(
|
||||
title=f"Jump roll for {user.name}",
|
||||
color=embed_color
|
||||
title=f"Jump roll for {user.name}", color=embed_color
|
||||
)
|
||||
|
||||
if show_author:
|
||||
# Set user info
|
||||
embed.set_author(
|
||||
name=user.name,
|
||||
icon_url=user.display_avatar.url
|
||||
)
|
||||
embed.set_author(name=user.name, icon_url=user.display_avatar.url)
|
||||
|
||||
# Check for pickoff or balk
|
||||
if check_roll == 1:
|
||||
# Pickoff attempt
|
||||
embed.add_field(
|
||||
name="Special",
|
||||
value="```md\nCheck pickoff```",
|
||||
inline=False
|
||||
name="Special", value="```md\nCheck pickoff```", inline=False
|
||||
)
|
||||
embed.add_field(
|
||||
name="Pickoff roll",
|
||||
value=f"```md\n# {resolution_roll}\nDetails:[1d20 ({resolution_roll})]```",
|
||||
inline=False
|
||||
inline=False,
|
||||
)
|
||||
elif check_roll == 2:
|
||||
# Balk
|
||||
embed.add_field(
|
||||
name="Special",
|
||||
value="```md\nCheck balk```",
|
||||
inline=False
|
||||
)
|
||||
embed.add_field(name="Special", value="```md\nCheck balk```", inline=False)
|
||||
embed.add_field(
|
||||
name="Balk roll",
|
||||
value=f"```md\n# {resolution_roll}\nDetails:[1d20 ({resolution_roll})]```",
|
||||
inline=False
|
||||
inline=False,
|
||||
)
|
||||
else:
|
||||
# Normal jump - show 2d6 result
|
||||
if jump_result:
|
||||
rolls_str = ' '.join(str(r) for r in jump_result.rolls)
|
||||
rolls_str = " ".join(str(r) for r in jump_result.rolls)
|
||||
embed.add_field(
|
||||
name="Result",
|
||||
value=f"```md\n# {jump_result.total}\nDetails:[2d6 ({rolls_str})]```",
|
||||
inline=False
|
||||
inline=False,
|
||||
)
|
||||
|
||||
return embed
|
||||
|
||||
def _get_range_result(self, position: str, d20_roll: int) -> str:
|
||||
"""Get the range result display for a position and d20 roll."""
|
||||
if position == 'P':
|
||||
if position == "P":
|
||||
return self._get_pitcher_range(d20_roll)
|
||||
elif position in ['1B', '2B', '3B', 'SS']:
|
||||
elif position in ["1B", "2B", "3B", "SS"]:
|
||||
return self._get_infield_range(d20_roll)
|
||||
elif position in ['LF', 'CF', 'RF']:
|
||||
elif position in ["LF", "CF", "RF"]:
|
||||
return self._get_outfield_range(d20_roll)
|
||||
elif position == 'C':
|
||||
elif position == "C":
|
||||
return self._get_catcher_range(d20_roll)
|
||||
return "Unknown position"
|
||||
|
||||
def _get_infield_range(self, d20_roll: int) -> str:
|
||||
"""Get infield range result based on d20 roll."""
|
||||
return INFIELD_RANGES.get(d20_roll, 'Unknown')
|
||||
return INFIELD_RANGES.get(d20_roll, "Unknown")
|
||||
|
||||
def _get_outfield_range(self, d20_roll: int) -> str:
|
||||
"""Get outfield range result based on d20 roll."""
|
||||
return OUTFIELD_RANGES.get(d20_roll, 'Unknown')
|
||||
return OUTFIELD_RANGES.get(d20_roll, "Unknown")
|
||||
|
||||
def _get_catcher_range(self, d20_roll: int) -> str:
|
||||
"""Get catcher range result based on d20 roll."""
|
||||
return CATCHER_RANGES.get(d20_roll, 'Unknown')
|
||||
return CATCHER_RANGES.get(d20_roll, "Unknown")
|
||||
|
||||
def _get_pitcher_range(self, d20_roll: int) -> str:
|
||||
"""Get pitcher range result based on d20 roll."""
|
||||
return PITCHER_RANGES.get(d20_roll, 'Unknown')
|
||||
|
||||
return PITCHER_RANGES.get(d20_roll, "Unknown")
|
||||
|
||||
def _get_rare_play(self, position: str, d20_total: int) -> str:
|
||||
"""Get the rare play result for a position and d20 total"""
|
||||
starter = 'Rare play! Take the range result from above and consult the chart below.\n\n'
|
||||
if position == 'P':
|
||||
starter = "Rare play! Take the range result from above and consult the chart below.\n\n"
|
||||
if position == "P":
|
||||
return starter + self._get_pitcher_rare_play(d20_total)
|
||||
elif position == '1B':
|
||||
elif position == "1B":
|
||||
return starter + self._get_infield_rare_play(d20_total)
|
||||
elif position == '2B':
|
||||
elif position == "2B":
|
||||
return starter + self._get_infield_rare_play(d20_total)
|
||||
elif position == '3B':
|
||||
elif position == "3B":
|
||||
return starter + self._get_infield_rare_play(d20_total)
|
||||
elif position == 'SS':
|
||||
elif position == "SS":
|
||||
return starter + self._get_infield_rare_play(d20_total)
|
||||
elif position in ['LF', 'RF']:
|
||||
elif position in ["LF", "RF"]:
|
||||
return starter + self._get_outfield_rare_play(d20_total)
|
||||
elif position == 'CF':
|
||||
elif position == "CF":
|
||||
return starter + self._get_outfield_rare_play(d20_total)
|
||||
|
||||
raise ValueError(f'Unknown position: {position}')
|
||||
|
||||
raise ValueError(f"Unknown position: {position}")
|
||||
|
||||
def _get_error_result(self, position: str, d6_total: int) -> str:
|
||||
"""Get the error result for a position and 3d6 total."""
|
||||
# Get the appropriate error chart
|
||||
if position == 'P':
|
||||
if position == "P":
|
||||
return self._get_pitcher_error(d6_total)
|
||||
elif position == '1B':
|
||||
elif position == "1B":
|
||||
return self._get_1b_error(d6_total)
|
||||
elif position == '2B':
|
||||
elif position == "2B":
|
||||
return self._get_2b_error(d6_total)
|
||||
elif position == '3B':
|
||||
elif position == "3B":
|
||||
return self._get_3b_error(d6_total)
|
||||
elif position == 'SS':
|
||||
elif position == "SS":
|
||||
return self._get_ss_error(d6_total)
|
||||
elif position in ['LF', 'RF']:
|
||||
elif position in ["LF", "RF"]:
|
||||
return self._get_corner_of_error(d6_total)
|
||||
elif position == 'CF':
|
||||
elif position == "CF":
|
||||
return self._get_cf_error(d6_total)
|
||||
elif position == 'C':
|
||||
elif position == "C":
|
||||
return self._get_catcher_error(d6_total)
|
||||
|
||||
# Should never reach here due to position validation, but follow "Raise or Return" pattern
|
||||
@ -614,65 +662,64 @@ class DiceRollCommands(commands.Cog):
|
||||
|
||||
def _get_3b_error(self, d6_total: int) -> str:
|
||||
"""Get 3B error result based on 3d6 total."""
|
||||
return THIRD_BASE_ERRORS.get(d6_total, 'No error')
|
||||
return THIRD_BASE_ERRORS.get(d6_total, "No error")
|
||||
|
||||
def _get_1b_error(self, d6_total: int) -> str:
|
||||
"""Get 1B error result based on 3d6 total."""
|
||||
return FIRST_BASE_ERRORS.get(d6_total, 'No error')
|
||||
return FIRST_BASE_ERRORS.get(d6_total, "No error")
|
||||
|
||||
def _get_2b_error(self, d6_total: int) -> str:
|
||||
"""Get 2B error result based on 3d6 total."""
|
||||
return SECOND_BASE_ERRORS.get(d6_total, 'No error')
|
||||
return SECOND_BASE_ERRORS.get(d6_total, "No error")
|
||||
|
||||
def _get_ss_error(self, d6_total: int) -> str:
|
||||
"""Get SS error result based on 3d6 total."""
|
||||
return SHORTSTOP_ERRORS.get(d6_total, 'No error')
|
||||
return SHORTSTOP_ERRORS.get(d6_total, "No error")
|
||||
|
||||
def _get_corner_of_error(self, d6_total: int) -> str:
|
||||
"""Get LF/RF error result based on 3d6 total."""
|
||||
return CORNER_OUTFIELD_ERRORS.get(d6_total, 'No error')
|
||||
return CORNER_OUTFIELD_ERRORS.get(d6_total, "No error")
|
||||
|
||||
def _get_cf_error(self, d6_total: int) -> str:
|
||||
"""Get CF error result based on 3d6 total."""
|
||||
return CENTER_FIELD_ERRORS.get(d6_total, 'No error')
|
||||
return CENTER_FIELD_ERRORS.get(d6_total, "No error")
|
||||
|
||||
def _get_catcher_error(self, d6_total: int) -> str:
|
||||
"""Get Catcher error result based on 3d6 total."""
|
||||
return CATCHER_ERRORS.get(d6_total, 'No error')
|
||||
return CATCHER_ERRORS.get(d6_total, "No error")
|
||||
|
||||
def _get_pitcher_error(self, d6_total: int) -> str:
|
||||
"""Get Pitcher error result based on 3d6 total."""
|
||||
return PITCHER_ERRORS.get(d6_total, 'No error')
|
||||
return PITCHER_ERRORS.get(d6_total, "No error")
|
||||
|
||||
def _get_pitcher_rare_play(self, d20_total: int) -> str:
|
||||
return (
|
||||
f'**G3**: {INFIELD_X_CHART["g3"]["rp"]}\n'
|
||||
f'**G2**: {INFIELD_X_CHART["g2"]["rp"]}\n'
|
||||
f'**G1**: {INFIELD_X_CHART["g1"]["rp"]}\n'
|
||||
f'**SI1**: {INFIELD_X_CHART["si1"]["rp"]}\n'
|
||||
f"**G3**: {INFIELD_X_CHART['g3']['rp']}\n"
|
||||
f"**G2**: {INFIELD_X_CHART['g2']['rp']}\n"
|
||||
f"**G1**: {INFIELD_X_CHART['g1']['rp']}\n"
|
||||
f"**SI1**: {INFIELD_X_CHART['si1']['rp']}\n"
|
||||
)
|
||||
|
||||
def _get_infield_rare_play(self, d20_total: int) -> str:
|
||||
return (
|
||||
f'**G3**: {INFIELD_X_CHART["g3"]["rp"]}\n'
|
||||
f'**G2**: {INFIELD_X_CHART["g2"]["rp"]}\n'
|
||||
f'**G1**: {INFIELD_X_CHART["g1"]["rp"]}\n'
|
||||
f'**SI1**: {INFIELD_X_CHART["si1"]["rp"]}\n'
|
||||
f'**SI2**: {OUTFIELD_X_CHART["si2"]["rp"]}\n'
|
||||
f"**G3**: {INFIELD_X_CHART['g3']['rp']}\n"
|
||||
f"**G2**: {INFIELD_X_CHART['g2']['rp']}\n"
|
||||
f"**G1**: {INFIELD_X_CHART['g1']['rp']}\n"
|
||||
f"**SI1**: {INFIELD_X_CHART['si1']['rp']}\n"
|
||||
f"**SI2**: {OUTFIELD_X_CHART['si2']['rp']}\n"
|
||||
)
|
||||
|
||||
def _get_outfield_rare_play(self, d20_total: int) -> str:
|
||||
return (
|
||||
f'**F1**: {OUTFIELD_X_CHART["f1"]["rp"]}\n'
|
||||
f'**F2**: {OUTFIELD_X_CHART["f2"]["rp"]}\n'
|
||||
f'**F3**: {OUTFIELD_X_CHART["f3"]["rp"]}\n'
|
||||
f'**SI2**: {OUTFIELD_X_CHART["si2"]["rp"]}\n'
|
||||
f'**DO2**: {OUTFIELD_X_CHART["do2"]["rp"]}\n'
|
||||
f'**DO3**: {OUTFIELD_X_CHART["do3"]["rp"]}\n'
|
||||
f'**TR3**: {OUTFIELD_X_CHART["tr3"]["rp"]}\n'
|
||||
f"**F1**: {OUTFIELD_X_CHART['f1']['rp']}\n"
|
||||
f"**F2**: {OUTFIELD_X_CHART['f2']['rp']}\n"
|
||||
f"**F3**: {OUTFIELD_X_CHART['f3']['rp']}\n"
|
||||
f"**SI2**: {OUTFIELD_X_CHART['si2']['rp']}\n"
|
||||
f"**DO2**: {OUTFIELD_X_CHART['do2']['rp']}\n"
|
||||
f"**DO3**: {OUTFIELD_X_CHART['do3']['rp']}\n"
|
||||
f"**TR3**: {OUTFIELD_X_CHART['tr3']['rp']}\n"
|
||||
)
|
||||
|
||||
|
||||
def _roll_weighted_scout_dice(self, card_type: str) -> list[DiceRoll]:
|
||||
"""
|
||||
Roll scouting dice with weighted first d6 based on card type.
|
||||
@ -690,11 +737,11 @@ class DiceRollCommands(commands.Cog):
|
||||
first_roll = random.randint(4, 6)
|
||||
|
||||
first_d6_result = DiceRoll(
|
||||
dice_notation='1d6',
|
||||
dice_notation="1d6",
|
||||
num_dice=1,
|
||||
die_sides=6,
|
||||
rolls=[first_roll],
|
||||
total=first_roll
|
||||
total=first_roll,
|
||||
)
|
||||
|
||||
# Second roll (2d6) - normal
|
||||
@ -705,19 +752,20 @@ class DiceRollCommands(commands.Cog):
|
||||
|
||||
return [first_d6_result, second_result, third_result]
|
||||
|
||||
def _create_multi_roll_embed(self, dice_notation: str, roll_results: list[DiceRoll], user: discord.User | discord.Member, set_author: bool = True, embed_color: int = EmbedColors.PRIMARY) -> discord.Embed:
|
||||
def _create_multi_roll_embed(
|
||||
self,
|
||||
dice_notation: str,
|
||||
roll_results: list[DiceRoll],
|
||||
user: discord.User | discord.Member,
|
||||
set_author: bool = True,
|
||||
embed_color: int = EmbedColors.PRIMARY,
|
||||
) -> discord.Embed:
|
||||
"""Create an embed for multiple dice roll results."""
|
||||
embed = EmbedTemplate.create_base_embed(
|
||||
title="🎲 Dice Roll",
|
||||
color=embed_color
|
||||
)
|
||||
embed = EmbedTemplate.create_base_embed(title="🎲 Dice Roll", color=embed_color)
|
||||
|
||||
if set_author:
|
||||
# Set user info
|
||||
embed.set_author(
|
||||
name=user.name,
|
||||
icon_url=user.display_avatar.url
|
||||
)
|
||||
embed.set_author(name=user.name, icon_url=user.display_avatar.url)
|
||||
|
||||
# Create summary line with totals
|
||||
totals = [str(result.total) for result in roll_results]
|
||||
@ -735,19 +783,16 @@ class DiceRollCommands(commands.Cog):
|
||||
roll_groups.append(str(rolls[0]))
|
||||
else:
|
||||
# Multiple dice: space-separated within the group
|
||||
roll_groups.append(' '.join(str(r) for r in rolls))
|
||||
roll_groups.append(" ".join(str(r) for r in rolls))
|
||||
|
||||
details = f"Details:[{';'.join(dice_notations)} ({' - '.join(roll_groups)})]"
|
||||
|
||||
# Set as description
|
||||
embed.add_field(
|
||||
name='Result',
|
||||
value=f"```md\n{summary}\n{details}```"
|
||||
)
|
||||
embed.add_field(name="Result", value=f"```md\n{summary}\n{details}```")
|
||||
|
||||
return embed
|
||||
|
||||
|
||||
async def setup(bot: commands.Bot):
|
||||
"""Load the dice roll commands cog."""
|
||||
await bot.add_cog(DiceRollCommands(bot))
|
||||
await bot.add_cog(DiceRollCommands(bot))
|
||||
|
||||
@ -3,6 +3,7 @@ League service for Discord Bot v2.0
|
||||
|
||||
Handles league-wide operations including current state, standings, and season information.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Optional, List, Dict, Any
|
||||
|
||||
@ -10,25 +11,27 @@ from config import get_config
|
||||
from services.base_service import BaseService
|
||||
from models.current import Current
|
||||
from exceptions import APIException
|
||||
from utils.decorators import cached_single_item
|
||||
|
||||
logger = logging.getLogger(f'{__name__}.LeagueService')
|
||||
logger = logging.getLogger(f"{__name__}.LeagueService")
|
||||
|
||||
|
||||
class LeagueService(BaseService[Current]):
|
||||
"""
|
||||
Service for league-wide operations.
|
||||
|
||||
|
||||
Features:
|
||||
- Current league state retrieval
|
||||
- Season standings
|
||||
- League-wide statistics
|
||||
"""
|
||||
|
||||
|
||||
def __init__(self):
|
||||
"""Initialize league service."""
|
||||
super().__init__(Current, 'current')
|
||||
super().__init__(Current, "current")
|
||||
logger.debug("LeagueService initialized")
|
||||
|
||||
|
||||
@cached_single_item(ttl=60)
|
||||
async def get_current_state(self) -> Optional[Current]:
|
||||
"""
|
||||
Get the current league state including week, season, and settings.
|
||||
@ -38,11 +41,13 @@ class LeagueService(BaseService[Current]):
|
||||
"""
|
||||
try:
|
||||
client = await self.get_client()
|
||||
data = await client.get('current')
|
||||
data = await client.get("current")
|
||||
|
||||
if data:
|
||||
current = Current.from_api_data(data)
|
||||
logger.debug(f"Retrieved current state: Week {current.week}, Season {current.season}")
|
||||
logger.debug(
|
||||
f"Retrieved current state: Week {current.week}, Season {current.season}"
|
||||
)
|
||||
return current
|
||||
|
||||
logger.debug("No current state data found")
|
||||
@ -53,9 +58,7 @@ class LeagueService(BaseService[Current]):
|
||||
return None
|
||||
|
||||
async def update_current_state(
|
||||
self,
|
||||
week: Optional[int] = None,
|
||||
freeze: Optional[bool] = None
|
||||
self, week: Optional[int] = None, freeze: Optional[bool] = None
|
||||
) -> Optional[Current]:
|
||||
"""
|
||||
Update current league state (week and/or freeze status).
|
||||
@ -77,9 +80,9 @@ class LeagueService(BaseService[Current]):
|
||||
# Build update data
|
||||
update_data = {}
|
||||
if week is not None:
|
||||
update_data['week'] = week
|
||||
update_data["week"] = week
|
||||
if freeze is not None:
|
||||
update_data['freeze'] = freeze
|
||||
update_data["freeze"] = freeze
|
||||
|
||||
if not update_data:
|
||||
logger.warning("update_current_state called with no updates")
|
||||
@ -89,127 +92,152 @@ class LeagueService(BaseService[Current]):
|
||||
# (Current table has one row per season, NOT a single row with id=1)
|
||||
current = await self.get_current_state()
|
||||
if not current:
|
||||
logger.error("Cannot update current state - unable to fetch current state")
|
||||
logger.error(
|
||||
"Cannot update current state - unable to fetch current state"
|
||||
)
|
||||
return None
|
||||
|
||||
current_id = current.id
|
||||
logger.debug(f"Updating current state id={current_id} (season {current.season})")
|
||||
logger.debug(
|
||||
f"Updating current state id={current_id} (season {current.season})"
|
||||
)
|
||||
|
||||
# Use BaseService patch method
|
||||
updated_current = await self.patch(current_id, update_data, use_query_params=True)
|
||||
updated_current = await self.patch(
|
||||
current_id, update_data, use_query_params=True
|
||||
)
|
||||
|
||||
if updated_current:
|
||||
logger.info(f"Updated current state id={current_id}: {update_data}")
|
||||
return updated_current
|
||||
else:
|
||||
logger.error(f"Failed to update current state id={current_id} - patch returned None")
|
||||
logger.error(
|
||||
f"Failed to update current state id={current_id} - patch returned None"
|
||||
)
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error updating current state: {e}")
|
||||
raise APIException(f"Failed to update current state: {e}")
|
||||
|
||||
async def get_standings(self, season: Optional[int] = None) -> Optional[List[Dict[str, Any]]]:
|
||||
async def get_standings(
|
||||
self, season: Optional[int] = None
|
||||
) -> Optional[List[Dict[str, Any]]]:
|
||||
"""
|
||||
Get league standings for a season.
|
||||
|
||||
|
||||
Args:
|
||||
season: Season number (defaults to current season)
|
||||
|
||||
|
||||
Returns:
|
||||
List of standings data or None if not available
|
||||
"""
|
||||
try:
|
||||
season = season or get_config().sba_season
|
||||
client = await self.get_client()
|
||||
data = await client.get('standings', params=[('season', str(season))])
|
||||
|
||||
data = await client.get("standings", params=[("season", str(season))])
|
||||
|
||||
if data and isinstance(data, list):
|
||||
logger.debug(f"Retrieved standings for season {season}: {len(data)} teams")
|
||||
logger.debug(
|
||||
f"Retrieved standings for season {season}: {len(data)} teams"
|
||||
)
|
||||
return data
|
||||
elif data and isinstance(data, dict):
|
||||
# Handle case where API returns a dict with standings array
|
||||
standings_data = data.get('standings', data.get('items', []))
|
||||
standings_data = data.get("standings", data.get("items", []))
|
||||
if standings_data:
|
||||
logger.debug(f"Retrieved standings for season {season}: {len(standings_data)} teams")
|
||||
logger.debug(
|
||||
f"Retrieved standings for season {season}: {len(standings_data)} teams"
|
||||
)
|
||||
return standings_data
|
||||
|
||||
|
||||
logger.debug(f"No standings data found for season {season}")
|
||||
return None
|
||||
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get standings for season {season}: {e}")
|
||||
return None
|
||||
|
||||
async def get_division_standings(self, division_id: int, season: Optional[int] = None) -> Optional[List[Dict[str, Any]]]:
|
||||
|
||||
async def get_division_standings(
|
||||
self, division_id: int, season: Optional[int] = None
|
||||
) -> Optional[List[Dict[str, Any]]]:
|
||||
"""
|
||||
Get standings for a specific division.
|
||||
|
||||
|
||||
Args:
|
||||
division_id: Division identifier
|
||||
season: Season number (defaults to current season)
|
||||
|
||||
|
||||
Returns:
|
||||
List of division standings or None if not available
|
||||
"""
|
||||
try:
|
||||
season = season or get_config().sba_season
|
||||
client = await self.get_client()
|
||||
data = await client.get(f'standings/division/{division_id}', params=[('season', str(season))])
|
||||
|
||||
data = await client.get(
|
||||
f"standings/division/{division_id}", params=[("season", str(season))]
|
||||
)
|
||||
|
||||
if data and isinstance(data, list):
|
||||
logger.debug(f"Retrieved division {division_id} standings for season {season}: {len(data)} teams")
|
||||
logger.debug(
|
||||
f"Retrieved division {division_id} standings for season {season}: {len(data)} teams"
|
||||
)
|
||||
return data
|
||||
|
||||
logger.debug(f"No division standings found for division {division_id}, season {season}")
|
||||
|
||||
logger.debug(
|
||||
f"No division standings found for division {division_id}, season {season}"
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get division {division_id} standings: {e}")
|
||||
return None
|
||||
|
||||
async def get_league_leaders(self, stat_type: str = 'batting', season: Optional[int] = None, limit: int = 10) -> Optional[List[Dict[str, Any]]]:
|
||||
|
||||
async def get_league_leaders(
|
||||
self, stat_type: str = "batting", season: Optional[int] = None, limit: int = 10
|
||||
) -> Optional[List[Dict[str, Any]]]:
|
||||
"""
|
||||
Get league leaders for a specific statistic category.
|
||||
|
||||
|
||||
Args:
|
||||
stat_type: Type of stats ('batting', 'pitching', 'fielding')
|
||||
season: Season number (defaults to current season)
|
||||
limit: Number of leaders to return
|
||||
|
||||
|
||||
Returns:
|
||||
List of league leaders or None if not available
|
||||
"""
|
||||
try:
|
||||
season = season or get_config().sba_season
|
||||
client = await self.get_client()
|
||||
|
||||
params = [
|
||||
('season', str(season)),
|
||||
('limit', str(limit))
|
||||
]
|
||||
|
||||
data = await client.get(f'leaders/{stat_type}', params=params)
|
||||
|
||||
|
||||
params = [("season", str(season)), ("limit", str(limit))]
|
||||
|
||||
data = await client.get(f"leaders/{stat_type}", params=params)
|
||||
|
||||
if data:
|
||||
# Handle different response formats
|
||||
if isinstance(data, list):
|
||||
leaders = data
|
||||
elif isinstance(data, dict):
|
||||
leaders = data.get('leaders', data.get('items', data.get('results', [])))
|
||||
leaders = data.get(
|
||||
"leaders", data.get("items", data.get("results", []))
|
||||
)
|
||||
else:
|
||||
leaders = []
|
||||
|
||||
logger.debug(f"Retrieved {stat_type} leaders for season {season}: {len(leaders)} players")
|
||||
|
||||
logger.debug(
|
||||
f"Retrieved {stat_type} leaders for season {season}: {len(leaders)} players"
|
||||
)
|
||||
return leaders[:limit] # Ensure we don't exceed limit
|
||||
|
||||
|
||||
logger.debug(f"No {stat_type} leaders found for season {season}")
|
||||
return None
|
||||
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get {stat_type} leaders for season {season}: {e}")
|
||||
return None
|
||||
|
||||
|
||||
# Global service instance
|
||||
league_service = LeagueService()
|
||||
league_service = LeagueService()
|
||||
|
||||
@ -11,6 +11,7 @@ from config import get_config
|
||||
from services.base_service import BaseService
|
||||
from models.player import Player
|
||||
from exceptions import APIException
|
||||
from utils.decorators import cached_api_call
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from services.team_service import TeamService
|
||||
@ -270,6 +271,7 @@ class PlayerService(BaseService[Player]):
|
||||
logger.error(f"Error in fuzzy search for '{query}': {e}")
|
||||
return []
|
||||
|
||||
@cached_api_call(ttl=300)
|
||||
async def get_free_agents(self, season: int) -> List[Player]:
|
||||
"""
|
||||
Get all free agent players.
|
||||
@ -372,10 +374,7 @@ class PlayerService(BaseService[Player]):
|
||||
return None
|
||||
|
||||
async def update_player_team(
|
||||
self,
|
||||
player_id: int,
|
||||
new_team_id: int,
|
||||
dem_week: Optional[int] = None
|
||||
self, player_id: int, new_team_id: int, dem_week: Optional[int] = None
|
||||
) -> Optional[Player]:
|
||||
"""
|
||||
Update a player's team assignment.
|
||||
|
||||
@ -5,7 +5,8 @@ Handles team standings retrieval and processing.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Optional, List, Dict
|
||||
import time
|
||||
from typing import Optional, List, Dict, Tuple
|
||||
|
||||
from api.client import get_global_client
|
||||
from models.standings import TeamStandings
|
||||
@ -13,6 +14,10 @@ from exceptions import APIException
|
||||
|
||||
logger = logging.getLogger(f"{__name__}.StandingsService")
|
||||
|
||||
# In-memory cache for standings: season -> (standings_list, cached_at)
|
||||
_standings_cache: Dict[int, Tuple[List[TeamStandings], float]] = {}
|
||||
_STANDINGS_CACHE_TTL = 600 # 10 minutes
|
||||
|
||||
|
||||
class StandingsService:
|
||||
"""
|
||||
@ -45,6 +50,13 @@ class StandingsService:
|
||||
List of TeamStandings ordered by record
|
||||
"""
|
||||
try:
|
||||
# Check in-memory cache first
|
||||
if season in _standings_cache:
|
||||
cached_standings, cached_at = _standings_cache[season]
|
||||
if time.time() - cached_at < _STANDINGS_CACHE_TTL:
|
||||
logger.debug(f"Cache hit for standings season {season}")
|
||||
return cached_standings
|
||||
|
||||
client = await self.get_client()
|
||||
|
||||
params = [("season", str(season))]
|
||||
@ -72,6 +84,10 @@ class StandingsService:
|
||||
logger.info(
|
||||
f"Retrieved standings for {len(standings)} teams in season {season}"
|
||||
)
|
||||
|
||||
# Cache the result
|
||||
_standings_cache[season] = (standings, time.time())
|
||||
|
||||
return standings
|
||||
|
||||
except Exception as e:
|
||||
|
||||
Loading…
Reference in New Issue
Block a user