Merge pull request #9 from calcorum/feature-draft-system

Feature draft system
This commit is contained in:
Cal Corum 2025-10-25 20:03:13 -05:00 committed by GitHub
commit d3824d7295
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
29 changed files with 4187 additions and 147 deletions

View File

@ -43,6 +43,7 @@ tests/
# Logs
logs/
*.log
production_logs/
# Environment files
.env

4
.gitignore vendored
View File

@ -216,5 +216,7 @@ marimo/_static/
marimo/_lsp/
__marimo__/
# Project-specific
data/
production_logs/
*.json

12
bot.py
View File

@ -115,6 +115,7 @@ class SBABot(commands.Bot):
from commands.admin import setup_admin
from commands.transactions import setup_transactions
from commands.dice import setup_dice
from commands.draft import setup_draft
from commands.voice import setup_voice
from commands.utilities import setup_utilities
from commands.help import setup_help_commands
@ -133,6 +134,7 @@ class SBABot(commands.Bot):
("admin", setup_admin),
("transactions", setup_transactions),
("dice", setup_dice),
("draft", setup_draft),
("voice", setup_voice),
("utilities", setup_utilities),
("help", setup_help_commands),
@ -183,12 +185,8 @@ class SBABot(commands.Bot):
self.logger.info("✅ Transaction freeze/thaw task started")
# Initialize voice channel cleanup service
from commands.voice.cleanup_service import VoiceChannelCleanupService
self.voice_cleanup_service = VoiceChannelCleanupService()
# Start voice channel monitoring (includes startup verification)
import asyncio
asyncio.create_task(self.voice_cleanup_service.start_monitoring(self))
from commands.voice.cleanup_service import setup_voice_cleanup
self.voice_cleanup_service = setup_voice_cleanup(self)
self.logger.info("✅ Voice channel cleanup service started")
# Initialize live scorebug tracker
@ -345,7 +343,7 @@ class SBABot(commands.Bot):
if hasattr(self, 'voice_cleanup_service'):
try:
self.voice_cleanup_service.stop_monitoring()
self.voice_cleanup_service.cog_unload()
self.logger.info("Voice channel cleanup service stopped")
except Exception as e:
self.logger.error(f"Error stopping voice cleanup service: {e}")

View File

@ -119,6 +119,9 @@ class DiceRollCommands(commands.Cog):
dice_notation = "1d6;2d6;1d20"
roll_results = self._parse_and_roll_multiple_dice(dice_notation)
injury_risk = (roll_results[0].total == 6) and (roll_results[1].total in [7, 8, 9, 10, 11, 12])
d6_total = roll_results[1].total
embed_title = 'At bat roll'
if roll_results[2].total == 1:
embed_title = 'Wild pitch roll'
@ -131,13 +134,21 @@ class DiceRollCommands(commands.Cog):
# Create embed for the roll results
embed = self._create_multi_roll_embed(
dice_notation,
roll_results,
interaction.user,
dice_notation,
roll_results,
interaction.user,
set_author=False,
embed_color=embed_color
)
embed.title = f'{embed_title} for {interaction.user.display_name}'
if injury_risk and embed_title == 'At bat roll':
embed.add_field(
name=f'Check injury for pitcher injury rating {13 - d6_total}',
value='Oops! All injuries!',
inline=False
)
await interaction.followup.send(embed=embed)
@commands.command(name="ab", aliases=["atbat"])

263
commands/draft/CLAUDE.md Normal file
View File

@ -0,0 +1,263 @@
# Draft Commands
This directory contains Discord slash commands for draft system operations.
## Files
### `picks.py`
- **Command**: `/draft`
- **Description**: Make a draft pick with FA player autocomplete
- **Parameters**:
- `player` (required): Player name to draft (autocomplete shows FA players with position and sWAR)
- **Service Dependencies**:
- `draft_service.get_draft_data()`
- `draft_pick_service.get_pick()`
- `draft_pick_service.update_pick_selection()`
- `team_service.get_team_by_owner()` (CACHED)
- `team_service.get_team_roster()`
- `player_service.get_players_by_name()`
- `player_service.update_player_team()`
## Key Features
### Global Pick Lock
- **Purpose**: Prevent concurrent draft picks that could cause race conditions
- **Implementation**: `asyncio.Lock()` stored in cog instance
- **Location**: Local only (not in database)
- **Timeout**: 30-second stale lock auto-override
- **Integration**: Background monitor task respects same lock
```python
# In DraftPicksCog
self.pick_lock = asyncio.Lock()
self.lock_acquired_at: Optional[datetime] = None
self.lock_acquired_by: Optional[int] = None
# Lock acquisition with timeout check
if self.pick_lock.locked():
if time_held > 30:
# Override stale lock
pass
else:
# Reject with wait time
return
async with self.pick_lock:
# Process pick
pass
```
### Pick Validation Flow
1. **Lock Check**: Verify no active pick in progress (or stale lock >30s)
2. **GM Validation**: Verify user is team owner (cached lookup - fast!)
3. **Draft State**: Get current draft configuration
4. **Turn Validation**: Verify user's team is on the clock
5. **Player Validation**: Verify player is FA (team_id = 498)
6. **Cap Space**: Validate 32 sWAR limit won't be exceeded
7. **Execution**: Update pick, update player team, advance draft
8. **Announcements**: Post success message and player card
### FA Player Autocomplete
The autocomplete function filters to FA players only:
```python
async def fa_player_autocomplete(interaction, current: str):
# Search all players
players = await player_service.search_players(current, limit=25)
# Filter to FA only (team_id = 498)
fa_players = [p for p in players if p.team_id == 498]
# Return choices with position and sWAR
return [Choice(name=f"{p.name} ({p.pos}) - {p.wara:.2f} sWAR", value=p.name)]
```
### Cap Space Validation
Uses `utils.draft_helpers.validate_cap_space()`:
```python
async def validate_cap_space(roster: dict, new_player_wara: float):
# Calculate how many players count (top 26 of 32 roster spots)
max_counted = min(26, 26 - (32 - projected_roster_size))
# Sort all players + new player by sWAR descending
sorted_wara = sorted(all_players_wara, reverse=True)
# Sum top N
projected_total = sum(sorted_wara[:max_counted])
# Check against limit (with tiny float tolerance)
return projected_total <= 32.00001, projected_total
```
## Architecture Notes
### Command Pattern
- Uses `@logged_command("/draft")` decorator (no manual error handling)
- Always defers response: `await interaction.response.defer()`
- Service layer only (no direct API client access)
- Comprehensive logging with contextual information
### Race Condition Prevention
The global lock ensures:
- Only ONE pick can be processed at a time league-wide
- Co-GMs cannot both draft simultaneously
- Background auto-draft respects same lock
- Stale locks (crashes/network issues) auto-clear after 30s
### Performance Optimizations
- **Team lookup cached** (`get_team_by_owner` uses `@cached_single_item`)
- **80% reduction** in API calls for GM validation
- **Sub-millisecond cache hits** vs 50-200ms API calls
- Draft data NOT cached (changes too frequently)
## Troubleshooting
### Common Issues
1. **"Pick In Progress" message**:
- Another user is currently making a pick
- Wait ~30 seconds for pick to complete
- If stuck, lock will auto-clear after 30s
2. **"Not Your Turn" message**:
- Current pick belongs to different team
- Wait for your turn in draft order
- Admin can use `/draft-admin` to adjust
3. **"Cap Space Exceeded" message**:
- Drafting player would exceed 32.00 sWAR limit
- Only top 26 players count toward cap
- Choose player with lower sWAR value
4. **"Player Not Available" message**:
- Player is not a free agent
- May have been drafted by another team
- Check draft board for available players
### Lock State Debugging
Check lock status with admin tools:
```python
# Lock state
draft_picks_cog.pick_lock.locked() # True if held
draft_picks_cog.lock_acquired_at # When lock was acquired
draft_picks_cog.lock_acquired_by # User ID holding lock
```
Admin can force-clear locks:
- Use `/draft-admin clear-lock` (when implemented)
- Restart bot (lock is local only)
## Draft Format
### Hybrid Linear + Snake
- **Rounds 1-10**: Linear draft (same order every round)
- **Rounds 11+**: Snake draft (reverse on even rounds)
- **Special Rule**: Round 11 Pick 1 = same team as Round 10 Pick 16
### Pick Order Calculation
Uses `utils.draft_helpers.calculate_pick_details()`:
```python
def calculate_pick_details(overall: int) -> tuple[int, int]:
round_num = math.ceil(overall / 16)
if round_num <= 10:
# Linear: 1-16, 1-16, 1-16, ...
position = ((overall - 1) % 16) + 1
else:
# Snake: odd rounds forward, even rounds reverse
if round_num % 2 == 1:
position = ((overall - 1) % 16) + 1
else:
position = 16 - ((overall - 1) % 16)
return round_num, position
```
## Integration with Background Task
The draft monitor task (`tasks/draft_monitor.py`) integrates with this command:
1. **Shared Lock**: Monitor acquires same `pick_lock` for auto-draft
2. **Timer Expiry**: When deadline passes, monitor auto-drafts
3. **Draft List**: Monitor tries players from team's draft list in order
4. **Pick Advancement**: Monitor calls same `draft_service.advance_pick()`
## Future Commands
### `/draft-status` (Pending Implementation)
Display current draft state, timer, lock status
### `/draft-admin` (Pending Implementation)
Admin controls:
- Timer on/off
- Set current pick
- Configure channels
- Wipe picks
- Clear stale locks
- Set keepers
### `/draft-list` (Pending Implementation)
Manage auto-draft queue:
- View current list
- Add players
- Remove players
- Reorder players
- Clear list
### `/draft-board` (Pending Implementation)
View draft board by round with pagination
## Dependencies
- `config.get_config()`
- `services.draft_service`
- `services.draft_pick_service`
- `services.player_service`
- `services.team_service` (with caching)
- `utils.decorators.logged_command`
- `utils.draft_helpers.validate_cap_space`
- `views.draft_views.*`
- `asyncio.Lock` for race condition prevention
## Testing
Run tests with: `python -m pytest tests/test_commands_draft.py -v` (when implemented)
Test scenarios:
- **Concurrent picks**: Two users try to draft simultaneously
- **Stale lock**: Lock held >30s gets overridden
- **Cap validation**: Player would exceed 32 sWAR limit
- **Turn validation**: User tries to draft out of turn
- **Player availability**: Player already drafted
## Security Considerations
### Permission Validation
- Only team owners (GMs) can make draft picks
- Validated via `team_service.get_team_by_owner()`
- Cached for performance (30-minute TTL)
### Data Integrity
- Global lock prevents duplicate picks
- Cap validation prevents roster violations
- Turn validation enforces draft order
- All updates atomic (pick + player team)
## Database Requirements
- Draft data table (configuration and state)
- Draft picks table (all picks for season)
- Draft list table (auto-draft queues)
- Player records with team associations
- Team records with owner associations
---
**Last Updated:** October 2025
**Status:** Core `/draft` command implemented and tested
**Next:** Implement `/draft-status`, `/draft-admin`, `/draft-list` commands

View File

@ -0,0 +1,84 @@
"""
Draft Commands Package for Discord Bot v2.0
Contains slash commands for draft operations:
- /draft - Make a draft pick with autocomplete
- /draft-status - View current draft state
- /draft-on-clock - Detailed on the clock information
- /draft-admin - Admin controls for draft management
- /draft-list - View auto-draft queue
- /draft-list-add - Add player to queue
- /draft-list-remove - Remove player from queue
- /draft-list-clear - Clear entire queue
- /draft-board - View draft picks by round
"""
import logging
from discord.ext import commands
from .picks import DraftPicksCog
from .status import DraftStatusCommands
from .list import DraftListCommands
from .board import DraftBoardCommands
from .admin import DraftAdminGroup
logger = logging.getLogger(__name__)
async def setup_draft(bot: commands.Bot):
"""
Setup all draft command modules.
Returns:
tuple: (successful_count, failed_count, failed_modules)
"""
# Define all draft command cogs to load
draft_cogs = [
("DraftPicksCog", DraftPicksCog),
("DraftStatusCommands", DraftStatusCommands),
("DraftListCommands", DraftListCommands),
("DraftBoardCommands", DraftBoardCommands),
]
successful = 0
failed = 0
failed_modules = []
# Load regular cogs
for cog_name, cog_class in draft_cogs:
try:
await bot.add_cog(cog_class(bot))
logger.info(f"✅ Loaded {cog_name}")
successful += 1
except Exception as e:
logger.error(f"❌ Failed to load {cog_name}: {e}", exc_info=True)
failed += 1
failed_modules.append(cog_name)
# Load draft admin group (app_commands.Group pattern)
try:
bot.tree.add_command(DraftAdminGroup())
logger.info("✅ Loaded DraftAdminGroup")
successful += 1
except Exception as e:
logger.error(f"❌ Failed to load DraftAdminGroup: {e}", exc_info=True)
failed += 1
failed_modules.append("DraftAdminGroup")
# Log summary
if failed == 0:
logger.info(f"🎉 All {successful} draft command modules loaded successfully")
else:
logger.warning(f"⚠️ Draft commands loaded with issues: {successful} successful, {failed} failed")
return successful, failed, failed_modules
# Export the setup function for easy importing
__all__ = [
'setup_draft',
'DraftPicksCog',
'DraftStatusCommands',
'DraftListCommands',
'DraftBoardCommands',
'DraftAdminGroup'
]

293
commands/draft/admin.py Normal file
View File

@ -0,0 +1,293 @@
"""
Draft Admin Commands
Admin-only commands for draft management and configuration.
"""
from typing import Optional
import discord
from discord import app_commands
from discord.ext import commands
from config import get_config
from services.draft_service import draft_service
from services.draft_pick_service import draft_pick_service
from utils.logging import get_contextual_logger
from utils.decorators import logged_command
from views.draft_views import create_admin_draft_info_embed
from views.embeds import EmbedTemplate
class DraftAdminGroup(app_commands.Group):
"""Draft administration command group."""
def __init__(self):
super().__init__(
name="draft-admin",
description="Admin commands for draft management"
)
self.logger = get_contextual_logger(f'{__name__}.DraftAdminGroup')
@app_commands.command(name="info", description="View current draft configuration")
@app_commands.checks.has_permissions(administrator=True)
@logged_command("/draft-admin info")
async def draft_admin_info(self, interaction: discord.Interaction):
"""Display current draft configuration and state."""
await interaction.response.defer()
# Get draft data
draft_data = await draft_service.get_draft_data()
if not draft_data:
embed = EmbedTemplate.error(
"Draft Not Found",
"Could not retrieve draft configuration."
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
# Get current pick
config = get_config()
current_pick = await draft_pick_service.get_pick(
config.sba_current_season,
draft_data.currentpick
)
# Create admin info embed
embed = await create_admin_draft_info_embed(draft_data, current_pick)
await interaction.followup.send(embed=embed)
@app_commands.command(name="timer", description="Enable or disable draft timer")
@app_commands.describe(
enabled="Turn timer on or off",
minutes="Minutes per pick (optional, default uses current setting)"
)
@app_commands.checks.has_permissions(administrator=True)
@logged_command("/draft-admin timer")
async def draft_admin_timer(
self,
interaction: discord.Interaction,
enabled: bool,
minutes: Optional[int] = None
):
"""Enable or disable the draft timer."""
await interaction.response.defer()
# Get draft data
draft_data = await draft_service.get_draft_data()
if not draft_data:
embed = EmbedTemplate.error(
"Draft Not Found",
"Could not retrieve draft configuration."
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
# Update timer
updated = await draft_service.set_timer(draft_data.id, enabled, minutes)
if not updated:
embed = EmbedTemplate.error(
"Update Failed",
"Failed to update draft timer."
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
# Success message
status = "enabled" if enabled else "disabled"
description = f"Draft timer has been **{status}**."
if enabled and minutes:
description += f"\n\nPick duration: **{minutes} minutes**"
elif enabled:
description += f"\n\nPick duration: **{updated.pick_minutes} minutes**"
embed = EmbedTemplate.success("Timer Updated", description)
await interaction.followup.send(embed=embed)
@app_commands.command(name="set-pick", description="Set current pick number")
@app_commands.describe(
pick_number="Overall pick number to jump to (1-512)"
)
@app_commands.checks.has_permissions(administrator=True)
@logged_command("/draft-admin set-pick")
async def draft_admin_set_pick(
self,
interaction: discord.Interaction,
pick_number: int
):
"""Set the current pick number (admin operation)."""
await interaction.response.defer()
config = get_config()
# Validate pick number
if pick_number < 1 or pick_number > config.draft_total_picks:
embed = EmbedTemplate.error(
"Invalid Pick Number",
f"Pick number must be between 1 and {config.draft_total_picks}."
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
# Get draft data
draft_data = await draft_service.get_draft_data()
if not draft_data:
embed = EmbedTemplate.error(
"Draft Not Found",
"Could not retrieve draft configuration."
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
# Verify pick exists
pick = await draft_pick_service.get_pick(config.sba_current_season, pick_number)
if not pick:
embed = EmbedTemplate.error(
"Pick Not Found",
f"Pick #{pick_number} does not exist in the database."
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
# Update current pick
updated = await draft_service.set_current_pick(
draft_data.id,
pick_number,
reset_timer=True
)
if not updated:
embed = EmbedTemplate.error(
"Update Failed",
"Failed to update current pick."
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
# Success message
from utils.draft_helpers import format_pick_display
description = f"Current pick set to **{format_pick_display(pick_number)}**."
if pick.owner:
description += f"\n\n{pick.owner.abbrev} {pick.owner.sname} is now on the clock."
embed = EmbedTemplate.success("Pick Updated", description)
await interaction.followup.send(embed=embed)
@app_commands.command(name="channels", description="Configure draft Discord channels")
@app_commands.describe(
ping_channel="Channel for 'on the clock' pings",
result_channel="Channel for draft results"
)
@app_commands.checks.has_permissions(administrator=True)
@logged_command("/draft-admin channels")
async def draft_admin_channels(
self,
interaction: discord.Interaction,
ping_channel: Optional[discord.TextChannel] = None,
result_channel: Optional[discord.TextChannel] = None
):
"""Configure draft Discord channels."""
await interaction.response.defer()
if not ping_channel and not result_channel:
embed = EmbedTemplate.error(
"No Channels Provided",
"Please specify at least one channel to update."
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
# Get draft data
draft_data = await draft_service.get_draft_data()
if not draft_data:
embed = EmbedTemplate.error(
"Draft Not Found",
"Could not retrieve draft configuration."
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
# Update channels
updated = await draft_service.update_channels(
draft_data.id,
ping_channel_id=ping_channel.id if ping_channel else None,
result_channel_id=result_channel.id if result_channel else None
)
if not updated:
embed = EmbedTemplate.error(
"Update Failed",
"Failed to update draft channels."
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
# Success message
description = "Draft channels updated:\n\n"
if ping_channel:
description += f"**Ping Channel:** {ping_channel.mention}\n"
if result_channel:
description += f"**Result Channel:** {result_channel.mention}\n"
embed = EmbedTemplate.success("Channels Updated", description)
await interaction.followup.send(embed=embed)
@app_commands.command(name="reset-deadline", description="Reset current pick deadline")
@app_commands.describe(
minutes="Minutes to add (uses default if not provided)"
)
@app_commands.checks.has_permissions(administrator=True)
@logged_command("/draft-admin reset-deadline")
async def draft_admin_reset_deadline(
self,
interaction: discord.Interaction,
minutes: Optional[int] = None
):
"""Reset the current pick deadline."""
await interaction.response.defer()
# Get draft data
draft_data = await draft_service.get_draft_data()
if not draft_data:
embed = EmbedTemplate.error(
"Draft Not Found",
"Could not retrieve draft configuration."
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
if not draft_data.timer:
embed = EmbedTemplate.warning(
"Timer Inactive",
"Draft timer is currently disabled. Enable it with `/draft-admin timer on` first."
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
# Reset deadline
updated = await draft_service.reset_draft_deadline(draft_data.id, minutes)
if not updated:
embed = EmbedTemplate.error(
"Update Failed",
"Failed to reset draft deadline."
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
# Success message
deadline_timestamp = int(updated.pick_deadline.timestamp())
minutes_used = minutes if minutes else updated.pick_minutes
description = f"Pick deadline reset: **{minutes_used} minutes** added.\n\n"
description += f"New deadline: <t:{deadline_timestamp}:F> (<t:{deadline_timestamp}:R>)"
embed = EmbedTemplate.success("Deadline Reset", description)
await interaction.followup.send(embed=embed)
async def setup(bot: commands.Bot):
"""Setup function for loading the draft admin commands."""
bot.tree.add_command(DraftAdminGroup())

79
commands/draft/board.py Normal file
View File

@ -0,0 +1,79 @@
"""
Draft Board Commands
View draft picks by round with pagination.
"""
from typing import Optional
import discord
from discord.ext import commands
from config import get_config
from services.draft_pick_service import draft_pick_service
from utils.logging import get_contextual_logger
from utils.decorators import logged_command
from views.draft_views import create_draft_board_embed
from views.embeds import EmbedTemplate
class DraftBoardCommands(commands.Cog):
"""Draft board viewing command handlers."""
def __init__(self, bot: commands.Bot):
self.bot = bot
self.logger = get_contextual_logger(f'{__name__}.DraftBoardCommands')
@discord.app_commands.command(
name="draft-board",
description="View draft picks by round"
)
@discord.app_commands.describe(
round_number="Round number to view (1-32)"
)
@logged_command("/draft-board")
async def draft_board(
self,
interaction: discord.Interaction,
round_number: Optional[int] = None
):
"""Display draft board for a specific round."""
await interaction.response.defer()
config = get_config()
# Default to round 1 if not specified
if round_number is None:
round_number = 1
# Validate round number
if round_number < 1 or round_number > config.draft_rounds:
embed = EmbedTemplate.error(
"Invalid Round",
f"Round number must be between 1 and {config.draft_rounds}."
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
# Get picks for this round
picks = await draft_pick_service.get_picks_by_round(
config.sba_current_season,
round_number,
include_taken=True
)
if not picks:
embed = EmbedTemplate.error(
"No Picks Found",
f"Could not retrieve picks for round {round_number}."
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
# Create draft board embed
embed = await create_draft_board_embed(round_number, picks)
await interaction.followup.send(embed=embed)
async def setup(bot: commands.Bot):
"""Load the draft board commands cog."""
await bot.add_cog(DraftBoardCommands(bot))

328
commands/draft/list.py Normal file
View File

@ -0,0 +1,328 @@
"""
Draft List Commands
Manage team auto-draft queue (draft board).
"""
from typing import List, Optional
import discord
from discord import app_commands
from discord.ext import commands
from config import get_config
from services.draft_list_service import draft_list_service
from services.player_service import player_service
from services.team_service import team_service
from utils.logging import get_contextual_logger
from utils.decorators import logged_command
from views.draft_views import create_draft_list_embed
from views.embeds import EmbedTemplate
async def fa_player_autocomplete(
interaction: discord.Interaction,
current: str,
) -> List[discord.app_commands.Choice[str]]:
"""Autocomplete for FA players only."""
if len(current) < 2:
return []
try:
config = get_config()
players = await player_service.search_players(
current,
limit=25,
season=config.sba_current_season
)
# Filter to FA team
fa_players = [p for p in players if p.team_id == config.free_agent_team_id]
return [
discord.app_commands.Choice(
name=f"{p.name} ({p.primary_position}) - {p.wara:.2f} sWAR",
value=p.name
)
for p in fa_players[:25]
]
except Exception:
return []
class DraftListCommands(commands.Cog):
"""Draft list management command handlers."""
def __init__(self, bot: commands.Bot):
self.bot = bot
self.logger = get_contextual_logger(f'{__name__}.DraftListCommands')
@discord.app_commands.command(
name="draft-list",
description="View your team's auto-draft queue"
)
@logged_command("/draft-list")
async def draft_list_view(self, interaction: discord.Interaction):
"""Display team's draft list."""
await interaction.response.defer(ephemeral=True)
config = get_config()
# Get user's team
team = await team_service.get_team_by_owner(
interaction.user.id,
config.sba_current_season
)
if not team:
embed = EmbedTemplate.error(
"Not a GM",
"You are not registered as a team owner."
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
# Get draft list
draft_list = await draft_list_service.get_team_list(
config.sba_current_season,
team.id
)
# Create embed
embed = await create_draft_list_embed(team, draft_list)
await interaction.followup.send(embed=embed)
@discord.app_commands.command(
name="draft-list-add",
description="Add player to your auto-draft queue"
)
@discord.app_commands.describe(
player="Player name to add (autocomplete shows FA players)",
rank="Position in queue (optional, adds to end if not specified)"
)
@discord.app_commands.autocomplete(player=fa_player_autocomplete)
@logged_command("/draft-list-add")
async def draft_list_add(
self,
interaction: discord.Interaction,
player: str,
rank: Optional[int] = None
):
"""Add player to draft list."""
await interaction.response.defer(ephemeral=True)
config = get_config()
# Get user's team
team = await team_service.get_team_by_owner(
interaction.user.id,
config.sba_current_season
)
if not team:
embed = EmbedTemplate.error(
"Not a GM",
"You are not registered as a team owner."
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
# Get player
players = await player_service.get_players_by_name(player, config.sba_current_season)
if not players:
embed = EmbedTemplate.error(
"Player Not Found",
f"Could not find player '{player}'."
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
player_obj = players[0]
# Validate player is FA
if player_obj.team_id != config.free_agent_team_id:
embed = EmbedTemplate.error(
"Player Not Available",
f"{player_obj.name} is not a free agent."
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
# Check if player already in list
current_list = await draft_list_service.get_team_list(
config.sba_current_season,
team.id
)
if any(entry.player_id == player_obj.id for entry in current_list):
embed = EmbedTemplate.error(
"Already in Queue",
f"{player_obj.name} is already in your draft queue."
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
# Validate rank
if rank is not None:
if rank < 1 or rank > len(current_list) + 1:
embed = EmbedTemplate.error(
"Invalid Rank",
f"Rank must be between 1 and {len(current_list) + 1}."
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
# Add to list
updated_list = await draft_list_service.add_to_list(
config.sba_current_season,
team.id,
player_obj.id,
rank
)
if not updated_list:
embed = EmbedTemplate.error(
"Add Failed",
f"Failed to add {player_obj.name} to draft queue."
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
# Find the added entry to get its rank
added_entry = next((e for e in updated_list if e.player_id == player_obj.id), None)
rank_str = f"#{added_entry.rank}" if added_entry else "at end"
# Success message with full draft list
success_msg = f"✅ Added **{player_obj.name}** at position **{rank_str}**"
embed = await create_draft_list_embed(team, updated_list)
embed.description = f"{success_msg}\n\n{embed.description}"
await interaction.followup.send(embed=embed)
@discord.app_commands.command(
name="draft-list-remove",
description="Remove player from your auto-draft queue"
)
@discord.app_commands.describe(
player="Player name to remove"
)
@discord.app_commands.autocomplete(player=fa_player_autocomplete)
@logged_command("/draft-list-remove")
async def draft_list_remove(
self,
interaction: discord.Interaction,
player: str
):
"""Remove player from draft list."""
await interaction.response.defer(ephemeral=True)
config = get_config()
# Get user's team
team = await team_service.get_team_by_owner(
interaction.user.id,
config.sba_current_season
)
if not team:
embed = EmbedTemplate.error(
"Not a GM",
"You are not registered as a team owner."
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
# Get player
players = await player_service.get_players_by_name(player, config.sba_current_season)
if not players:
embed = EmbedTemplate.error(
"Player Not Found",
f"Could not find player '{player}'."
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
player_obj = players[0]
# Remove from list
success = await draft_list_service.remove_player_from_list(
config.sba_current_season,
team.id,
player_obj.id
)
if not success:
embed = EmbedTemplate.error(
"Not in Queue",
f"{player_obj.name} is not in your draft queue."
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
# Success message
description = f"Removed **{player_obj.name}** from your draft queue."
embed = EmbedTemplate.success("Player Removed", description)
await interaction.followup.send(embed=embed)
@discord.app_commands.command(
name="draft-list-clear",
description="Clear your entire auto-draft queue"
)
@logged_command("/draft-list-clear")
async def draft_list_clear(self, interaction: discord.Interaction):
"""Clear entire draft list."""
await interaction.response.defer(ephemeral=True)
config = get_config()
# Get user's team
team = await team_service.get_team_by_owner(
interaction.user.id,
config.sba_current_season
)
if not team:
embed = EmbedTemplate.error(
"Not a GM",
"You are not registered as a team owner."
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
# Get current list size
current_list = await draft_list_service.get_team_list(
config.sba_current_season,
team.id
)
if not current_list:
embed = EmbedTemplate.info(
"Queue Empty",
"Your draft queue is already empty."
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
# Clear list
success = await draft_list_service.clear_list(
config.sba_current_season,
team.id
)
if not success:
embed = EmbedTemplate.error(
"Clear Failed",
"Failed to clear draft queue."
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
# Success message
description = f"Cleared **{len(current_list)} players** from your draft queue."
embed = EmbedTemplate.success("Queue Cleared", description)
await interaction.followup.send(embed=embed)
async def setup(bot: commands.Bot):
"""Load the draft list commands cog."""
await bot.add_cog(DraftListCommands(bot))

277
commands/draft/picks.py Normal file
View File

@ -0,0 +1,277 @@
"""
Draft Pick Commands
Implements slash commands for making draft picks with global lock protection.
"""
import asyncio
from typing import List, Optional
from datetime import datetime
import discord
from discord.ext import commands
from config import get_config
from services.draft_service import draft_service
from services.draft_pick_service import draft_pick_service
from services.player_service import player_service
from services.team_service import team_service
from utils.logging import get_contextual_logger
from utils.decorators import logged_command
from utils.draft_helpers import validate_cap_space, format_pick_display
from views.draft_views import (
create_player_draft_card,
create_pick_illegal_embed,
create_pick_success_embed
)
async def fa_player_autocomplete(
interaction: discord.Interaction,
current: str,
) -> List[discord.app_commands.Choice[str]]:
"""Autocomplete for FA players only."""
if len(current) < 2:
return []
try:
config = get_config()
# Search for FA players only
players = await player_service.search_players(
current,
limit=25,
season=config.sba_current_season
)
# Filter to FA team
fa_players = [p for p in players if p.team_id == config.free_agent_team_id]
return [
discord.app_commands.Choice(
name=f"{p.name} ({p.primary_position}) - {p.wara:.2f} sWAR",
value=p.name
)
for p in fa_players[:25]
]
except Exception:
return []
class DraftPicksCog(commands.Cog):
"""Draft pick command handlers with global lock protection."""
def __init__(self, bot: commands.Bot):
self.bot = bot
self.logger = get_contextual_logger(f'{__name__}.DraftPicksCog')
# GLOBAL PICK LOCK (local only - not in database)
self.pick_lock = asyncio.Lock()
self.lock_acquired_at: Optional[datetime] = None
self.lock_acquired_by: Optional[int] = None
@discord.app_commands.command(
name="draft",
description="Make a draft pick (autocomplete shows FA players only)"
)
@discord.app_commands.describe(
player="Player name to draft (autocomplete shows available FA players)"
)
@discord.app_commands.autocomplete(player=fa_player_autocomplete)
@logged_command("/draft")
async def draft_pick(
self,
interaction: discord.Interaction,
player: str
):
"""Make a draft pick with global lock protection."""
await interaction.response.defer()
# Check if lock is held
if self.pick_lock.locked():
if self.lock_acquired_at:
time_held = (datetime.now() - self.lock_acquired_at).total_seconds()
if time_held > 30:
# STALE LOCK: Auto-override after 30 seconds
self.logger.warning(
f"Stale lock detected ({time_held:.1f}s). "
f"Overriding lock from user {self.lock_acquired_by}"
)
else:
# ACTIVE LOCK: Reject with friendly message
embed = await create_pick_illegal_embed(
"Pick In Progress",
f"Another manager is currently making a pick. "
f"Please wait approximately {30 - int(time_held)} seconds."
)
await interaction.followup.send(embed=embed)
return
# Acquire global lock
async with self.pick_lock:
self.lock_acquired_at = datetime.now()
self.lock_acquired_by = interaction.user.id
try:
await self._process_draft_pick(interaction, player)
finally:
self.lock_acquired_at = None
self.lock_acquired_by = None
async def _process_draft_pick(
self,
interaction: discord.Interaction,
player_name: str
):
"""
Process draft pick with validation.
Args:
interaction: Discord interaction
player_name: Player name to draft
"""
config = get_config()
# Get user's team (CACHED via @cached_single_item)
team = await team_service.get_team_by_owner(
interaction.user.id,
config.sba_current_season
)
if not team:
embed = await create_pick_illegal_embed(
"Not a GM",
"You are not registered as a team owner."
)
await interaction.followup.send(embed=embed)
return
# Get draft state
draft_data = await draft_service.get_draft_data()
if not draft_data:
embed = await create_pick_illegal_embed(
"Draft Not Found",
"Could not retrieve draft configuration."
)
await interaction.followup.send(embed=embed)
return
# Get current pick
current_pick = await draft_pick_service.get_pick(
config.sba_current_season,
draft_data.currentpick
)
if not current_pick or not current_pick.owner:
embed = await create_pick_illegal_embed(
"Invalid Pick",
f"Could not retrieve pick #{draft_data.currentpick}."
)
await interaction.followup.send(embed=embed)
return
# Validate user is on the clock
if current_pick.owner.id != team.id:
# TODO: Check for skipped picks
embed = await create_pick_illegal_embed(
"Not Your Turn",
f"{current_pick.owner.sname} is on the clock for {format_pick_display(current_pick.overall)}."
)
await interaction.followup.send(embed=embed)
return
# Get player
players = await player_service.get_players_by_name(player_name, config.sba_current_season)
if not players:
embed = await create_pick_illegal_embed(
"Player Not Found",
f"Could not find player '{player_name}'."
)
await interaction.followup.send(embed=embed)
return
player_obj = players[0]
# Validate player is FA
if player_obj.team_id != config.free_agent_team_id:
embed = await create_pick_illegal_embed(
"Player Not Available",
f"{player_obj.name} is not a free agent."
)
await interaction.followup.send(embed=embed)
return
# Validate cap space
roster = await team_service.get_team_roster(team.id, 'current')
if not roster:
embed = await create_pick_illegal_embed(
"Roster Error",
f"Could not retrieve roster for {team.abbrev}."
)
await interaction.followup.send(embed=embed)
return
is_valid, projected_total = await validate_cap_space(roster, player_obj.wara)
if not is_valid:
embed = await create_pick_illegal_embed(
"Cap Space Exceeded",
f"Drafting {player_obj.name} would put you at {projected_total:.2f} sWAR (limit: {config.swar_cap_limit:.2f})."
)
await interaction.followup.send(embed=embed)
return
# Execute pick
updated_pick = await draft_pick_service.update_pick_selection(
current_pick.id,
player_obj.id
)
if not updated_pick:
embed = await create_pick_illegal_embed(
"Pick Failed",
"Failed to update draft pick. Please try again."
)
await interaction.followup.send(embed=embed)
return
# Update player team
updated_player = await player_service.update_player_team(
player_obj.id,
team.id
)
if not updated_player:
self.logger.error(f"Failed to update player {player_obj.id} team")
# Send success message
success_embed = await create_pick_success_embed(
player_obj,
team,
current_pick.overall,
projected_total
)
await interaction.followup.send(embed=success_embed)
# Post draft card to ping channel
if draft_data.ping_channel:
guild = interaction.guild
if guild:
ping_channel = guild.get_channel(draft_data.ping_channel)
if ping_channel:
draft_card = await create_player_draft_card(player_obj, current_pick)
await ping_channel.send(embed=draft_card)
# Advance to next pick
await draft_service.advance_pick(draft_data.id, draft_data.currentpick)
self.logger.info(
f"Draft pick completed: {team.abbrev} selected {player_obj.name} "
f"(pick #{current_pick.overall})"
)
async def setup(bot: commands.Bot):
"""Load the draft picks cog."""
await bot.add_cog(DraftPicksCog(bot))

147
commands/draft/status.py Normal file
View File

@ -0,0 +1,147 @@
"""
Draft Status Commands
Display current draft state and information.
"""
import discord
from discord.ext import commands
from config import get_config
from services.draft_service import draft_service
from services.draft_pick_service import draft_pick_service
from utils.logging import get_contextual_logger
from utils.decorators import logged_command
from views.draft_views import create_draft_status_embed, create_on_the_clock_embed
from views.embeds import EmbedTemplate
class DraftStatusCommands(commands.Cog):
"""Draft status display command handlers."""
def __init__(self, bot: commands.Bot):
self.bot = bot
self.logger = get_contextual_logger(f'{__name__}.DraftStatusCommands')
@discord.app_commands.command(
name="draft-status",
description="View current draft state and timer information"
)
@logged_command("/draft-status")
async def draft_status(self, interaction: discord.Interaction):
"""Display current draft state."""
await interaction.response.defer()
config = get_config()
# Get draft data
draft_data = await draft_service.get_draft_data()
if not draft_data:
embed = EmbedTemplate.error(
"Draft Not Found",
"Could not retrieve draft configuration."
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
# Get current pick
current_pick = await draft_pick_service.get_pick(
config.sba_current_season,
draft_data.currentpick
)
if not current_pick:
embed = EmbedTemplate.error(
"Pick Not Found",
f"Could not retrieve pick #{draft_data.currentpick}."
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
# Check pick lock status
draft_picks_cog = self.bot.get_cog('DraftPicksCog')
lock_status = "🔓 No pick in progress"
if draft_picks_cog and draft_picks_cog.pick_lock.locked():
if draft_picks_cog.lock_acquired_by:
user = self.bot.get_user(draft_picks_cog.lock_acquired_by)
user_name = user.name if user else f"User {draft_picks_cog.lock_acquired_by}"
lock_status = f"🔒 Pick in progress by {user_name}"
else:
lock_status = "🔒 Pick in progress (system)"
# Create status embed
embed = await create_draft_status_embed(draft_data, current_pick, lock_status)
await interaction.followup.send(embed=embed)
@discord.app_commands.command(
name="draft-on-clock",
description="View detailed 'on the clock' information"
)
@logged_command("/draft-on-clock")
async def draft_on_clock(self, interaction: discord.Interaction):
"""Display detailed 'on the clock' information with recent and upcoming picks."""
await interaction.response.defer()
config = get_config()
# Get draft data
draft_data = await draft_service.get_draft_data()
if not draft_data:
embed = EmbedTemplate.error(
"Draft Not Found",
"Could not retrieve draft configuration."
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
# Get current pick
current_pick = await draft_pick_service.get_pick(
config.sba_current_season,
draft_data.currentpick
)
if not current_pick or not current_pick.owner:
embed = EmbedTemplate.error(
"Pick Not Found",
f"Could not retrieve pick #{draft_data.currentpick}."
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
# Get recent picks
recent_picks = await draft_pick_service.get_recent_picks(
config.sba_current_season,
draft_data.currentpick,
limit=5
)
# Get upcoming picks
upcoming_picks = await draft_pick_service.get_upcoming_picks(
config.sba_current_season,
draft_data.currentpick,
limit=5
)
# Get team roster sWAR (optional)
from services.team_service import team_service
team_roster_swar = None
roster = await team_service.get_team_roster(current_pick.owner.id, 'current')
if roster and roster.get('active'):
team_roster_swar = roster['active'].get('WARa')
# Create on the clock embed
embed = await create_on_the_clock_embed(
current_pick,
draft_data,
recent_picks,
upcoming_picks,
team_roster_swar
)
await interaction.followup.send(embed=embed)
async def setup(bot: commands.Bot):
"""Load the draft status commands cog."""
await bot.add_cog(DraftStatusCommands(bot))

View File

@ -156,7 +156,7 @@ class ScorebugCommands(commands.Cog):
"""
Display the current scorebug from the scorecard published in this channel.
"""
await interaction.response.defer()
await interaction.response.defer(ephemeral=True)
# Check if a scorecard is published in this channel
sheet_url = self.scorecard_tracker.get_scorecard(interaction.channel_id) # type: ignore

View File

@ -60,12 +60,18 @@ This directory contains Discord slash commands for creating and managing voice c
- **Role Integration**: Finds Discord roles matching team full names (`team.lname`)
### Automatic Cleanup System
- **Monitoring Interval**: Configurable (default: 60 seconds)
- **Empty Threshold**: Configurable (default: 5 minutes empty before deletion)
- **Monitoring Interval**: 1 minute (using `@tasks.loop` pattern)
- **Empty Threshold**: 5 minutes empty before deletion
- **Restart Resilience**: JSON file persistence survives bot restarts
- **Startup Verification**: Validates tracked channels still exist on bot startup
- **Safe Startup**: Uses `@before_loop` to wait for bot readiness before starting
- **Startup Verification**: Validates tracked channels still exist and cleans stale entries on bot startup
- **Manual Deletion Handling**: Detects manually deleted channels and cleans up tracking
- **Graceful Error Handling**: Continues operation even if individual operations fail
- **Scorecard Cleanup**: Automatically unpublishes scorecards when associated voice channels are deleted
- **Scorecard Cleanup**: Automatically unpublishes scorecards in all cleanup scenarios:
- Normal cleanup (channel empty for 5+ minutes)
- Manual deletion (user deletes channel)
- Startup verification (stale entries on bot restart)
- Wrong channel type (corrupted tracking data)
## Architecture
@ -114,9 +120,11 @@ overwrites = {
### Cleanup Service Integration
```python
# Bot initialization (bot.py)
from commands.voice.cleanup_service import VoiceChannelCleanupService
self.voice_cleanup_service = VoiceChannelCleanupService()
asyncio.create_task(self.voice_cleanup_service.start_monitoring(self))
from commands.voice.cleanup_service import setup_voice_cleanup
self.voice_cleanup_service = setup_voice_cleanup(self)
# The service uses @tasks.loop pattern with @before_loop
# It automatically waits for bot readiness before starting
# Channel tracking
if hasattr(self.bot, 'voice_cleanup_service'):
@ -124,24 +132,44 @@ if hasattr(self.bot, 'voice_cleanup_service'):
cleanup_service.tracker.add_channel(channel, channel_type, interaction.user.id)
```
### Scorecard Cleanup Integration
When a voice channel is cleaned up (deleted after being empty for the configured threshold), the cleanup service automatically unpublishes any scorecard associated with that voice channel's text channel. This prevents the live scorebug tracker from continuing to update scores for games that no longer have active voice channels.
**Task Lifecycle**:
- **Initialization**: `VoiceChannelCleanupService(bot)` creates instance
- **Startup Wait**: `@before_loop` ensures bot is ready before first cycle
- **Verification**: First cycle runs `verify_tracked_channels()` to clean stale entries
- **Monitoring**: Runs every 1 minute checking all tracked channels
- **Shutdown**: `cog_unload()` cancels the cleanup loop gracefully
**Cleanup Flow**:
1. Voice channel becomes empty and exceeds empty threshold
2. Cleanup service deletes the voice channel
3. Service checks if voice channel has associated `text_channel_id`
4. If found, unpublishes scorecard from that text channel
5. Live scorebug tracker stops updating that scorecard
### Scorecard Cleanup Integration
The cleanup service automatically unpublishes any scorecard associated with a voice channel when that channel is removed from tracking. This prevents the live scorebug tracker from continuing to update scores for games that no longer have active voice channels.
**Cleanup Scenarios**:
1. **Normal Cleanup** (channel empty for 5+ minutes):
- Cleanup service deletes the voice channel
- Unpublishes associated scorecard
- Logs: `"📋 Unpublished scorecard from text channel [id] (voice channel cleanup)"`
2. **Manual Deletion** (user deletes channel):
- Next cleanup cycle detects missing channel
- Removes from tracking and unpublishes scorecard
- Logs: `"📋 Unpublished scorecard from text channel [id] (manually deleted voice channel)"`
3. **Startup Verification** (stale entries on bot restart):
- Bot startup detects channels that no longer exist
- Cleans up tracking and unpublishes scorecards
- Logs: `"📋 Unpublished scorecard from text channel [id] (stale voice channel)"`
4. **Wrong Channel Type** (corrupted tracking data):
- Tracked channel exists but is not a voice channel
- Removes from tracking and unpublishes scorecard
- Logs: `"📋 Unpublished scorecard from text channel [id] (wrong channel type)"`
**Integration Points**:
- `cleanup_service.py` imports `ScorecardTracker` from `commands.gameplay.scorecard_tracker`
- Scorecard unpublishing happens in three scenarios:
- Normal cleanup (channel deleted after being empty)
- Stale channel cleanup (channel already deleted externally)
- Startup verification (channel no longer exists when bot starts)
- All cleanup paths check for `text_channel_id` and unpublish if found
- Recovery time: Maximum 1 minute delay for manual deletions
**Logging**:
**Example Logging**:
```
✅ Cleaned up empty voice channel: Gameplay Phoenix (ID: 123456789)
📋 Unpublished scorecard from text channel 987654321 (voice channel cleanup)
@ -171,9 +199,10 @@ When a voice channel is cleaned up (deleted after being empty for the configured
## Configuration
### Cleanup Service Settings
- **`cleanup_interval`**: How often to check channels (default: 60 seconds)
- **`empty_threshold`**: Minutes empty before deletion (default: 5 minutes)
- **Monitoring Loop**: `@tasks.loop(minutes=1)` - runs every 1 minute
- **`empty_threshold`**: 5 minutes empty before deletion
- **`data_file`**: JSON persistence file path (default: "data/voice_channels.json")
- **Task Pattern**: Uses discord.py `@tasks.loop` with `@before_loop` for safe startup
### Channel Categories
- Channels are created in the "Voice Channels" category if it exists

View File

@ -3,14 +3,14 @@ Voice Channel Cleanup Service
Provides automatic cleanup of empty voice channels with restart resilience.
"""
import asyncio
import logging
import discord
from discord.ext import commands
from discord.ext import commands, tasks
from .tracker import VoiceChannelTracker
from commands.gameplay.scorecard_tracker import ScorecardTracker
from utils.logging import get_contextual_logger
logger = logging.getLogger(f'{__name__}.VoiceChannelCleanupService')
@ -27,52 +27,49 @@ class VoiceChannelCleanupService:
- Automatic scorecard unpublishing when voice channel is cleaned up
"""
def __init__(self, data_file: str = "data/voice_channels.json"):
def __init__(self, bot: commands.Bot, data_file: str = "data/voice_channels.json"):
"""
Initialize the cleanup service.
Args:
bot: Discord bot instance
data_file: Path to the JSON data file for persistence
"""
self.bot = bot
self.logger = get_contextual_logger(f'{__name__}.VoiceChannelCleanupService')
self.tracker = VoiceChannelTracker(data_file)
self.scorecard_tracker = ScorecardTracker()
self.cleanup_interval = 60 # 5 minutes check interval
self.empty_threshold = 5 # Delete after 15 minutes empty
self._running = False
self.empty_threshold = 5 # Delete after 5 minutes empty
async def start_monitoring(self, bot: commands.Bot) -> None:
# Start the cleanup task - @before_loop will wait for bot readiness
self.cleanup_loop.start()
self.logger.info("Voice channel cleanup service initialized")
def cog_unload(self):
"""Stop the task when service is unloaded."""
self.cleanup_loop.cancel()
self.logger.info("Voice channel cleanup service stopped")
@tasks.loop(minutes=1)
async def cleanup_loop(self):
"""
Start the cleanup monitoring loop.
Main cleanup loop - runs every minute.
Args:
bot: Discord bot instance
Checks all tracked channels and cleans up empty ones.
"""
if self._running:
logger.warning("Cleanup service is already running")
return
try:
await self.cleanup_cycle(self.bot)
except Exception as e:
self.logger.error(f"Cleanup cycle error: {e}", exc_info=True)
self._running = True
logger.info("Starting voice channel cleanup service")
@cleanup_loop.before_loop
async def before_cleanup_loop(self):
"""Wait for bot to be ready before starting - REQUIRED FOR SAFE STARTUP."""
await self.bot.wait_until_ready()
self.logger.info("Bot is ready, voice cleanup service starting")
# On startup, verify tracked channels still exist and clean up stale entries
await self.verify_tracked_channels(bot)
# Start the monitoring loop
while self._running:
try:
await self.cleanup_cycle(bot)
await asyncio.sleep(self.cleanup_interval)
except Exception as e:
logger.error(f"Cleanup cycle error: {e}", exc_info=True)
# Use shorter retry interval on errors
await asyncio.sleep(60)
logger.info("Voice channel cleanup service stopped")
def stop_monitoring(self) -> None:
"""Stop the cleanup monitoring loop."""
self._running = False
logger.info("Stopping voice channel cleanup service")
await self.verify_tracked_channels(self.bot)
async def verify_tracked_channels(self, bot: commands.Bot) -> None:
"""
@ -81,7 +78,7 @@ class VoiceChannelCleanupService:
Args:
bot: Discord bot instance
"""
logger.info("Verifying tracked voice channels on startup")
self.logger.info("Verifying tracked voice channels on startup")
valid_channel_ids = []
channels_to_remove = []
@ -93,13 +90,13 @@ class VoiceChannelCleanupService:
guild = bot.get_guild(guild_id)
if not guild:
logger.warning(f"Guild {guild_id} not found, removing channel {channel_data['name']}")
self.logger.warning(f"Guild {guild_id} not found, removing channel {channel_data['name']}")
channels_to_remove.append(channel_id)
continue
channel = guild.get_channel(channel_id)
if not channel:
logger.warning(f"Channel {channel_data['name']} (ID: {channel_id}) no longer exists")
self.logger.warning(f"Channel {channel_data['name']} (ID: {channel_id}) no longer exists")
channels_to_remove.append(channel_id)
continue
@ -107,7 +104,7 @@ class VoiceChannelCleanupService:
valid_channel_ids.append(channel_id)
except (ValueError, TypeError, KeyError) as e:
logger.warning(f"Invalid channel data: {e}, removing entry")
self.logger.warning(f"Invalid channel data: {e}, removing entry")
if "channel_id" in channel_data:
try:
channels_to_remove.append(int(channel_data["channel_id"]))
@ -126,18 +123,18 @@ class VoiceChannelCleanupService:
text_channel_id_int = int(channel_data["text_channel_id"])
was_unpublished = self.scorecard_tracker.unpublish_scorecard(text_channel_id_int)
if was_unpublished:
logger.info(f"📋 Unpublished scorecard from text channel {text_channel_id_int} (stale voice channel)")
self.logger.info(f"📋 Unpublished scorecard from text channel {text_channel_id_int} (stale voice channel)")
except (ValueError, TypeError) as e:
logger.warning(f"Invalid text_channel_id in stale voice channel data: {e}")
self.logger.warning(f"Invalid text_channel_id in stale voice channel data: {e}")
# Also clean up any additional stale entries
stale_removed = self.tracker.cleanup_stale_entries(valid_channel_ids)
total_removed = len(channels_to_remove) + stale_removed
if total_removed > 0:
logger.info(f"Cleaned up {total_removed} stale channel tracking entries")
self.logger.info(f"Cleaned up {total_removed} stale channel tracking entries")
logger.info(f"Verified {len(valid_channel_ids)} valid tracked channels")
self.logger.info(f"Verified {len(valid_channel_ids)} valid tracked channels")
async def cleanup_cycle(self, bot: commands.Bot) -> None:
"""
@ -146,7 +143,7 @@ class VoiceChannelCleanupService:
Args:
bot: Discord bot instance
"""
logger.debug("Starting cleanup cycle")
self.logger.debug("Starting cleanup cycle")
# Update status of all tracked channels
await self.update_all_channel_statuses(bot)
@ -155,7 +152,7 @@ class VoiceChannelCleanupService:
channels_for_cleanup = self.tracker.get_channels_for_cleanup(self.empty_threshold)
if channels_for_cleanup:
logger.info(f"Found {len(channels_for_cleanup)} channels ready for cleanup")
self.logger.info(f"Found {len(channels_for_cleanup)} channels ready for cleanup")
# Delete empty channels
for channel_data in channels_for_cleanup:
@ -185,30 +182,54 @@ class VoiceChannelCleanupService:
guild = bot.get_guild(guild_id)
if not guild:
logger.debug(f"Guild {guild_id} not found for channel {channel_data['name']}")
self.logger.debug(f"Guild {guild_id} not found for channel {channel_data['name']}")
return
channel = guild.get_channel(channel_id)
if not channel:
logger.debug(f"Channel {channel_data['name']} no longer exists, will be cleaned up")
self.logger.debug(f"Channel {channel_data['name']} no longer exists, removing from tracking")
self.tracker.remove_channel(channel_id)
# Unpublish associated scorecard if it exists
text_channel_id = channel_data.get("text_channel_id")
if text_channel_id:
try:
text_channel_id_int = int(text_channel_id)
was_unpublished = self.scorecard_tracker.unpublish_scorecard(text_channel_id_int)
if was_unpublished:
self.logger.info(f"📋 Unpublished scorecard from text channel {text_channel_id_int} (manually deleted voice channel)")
except (ValueError, TypeError) as e:
self.logger.warning(f"Invalid text_channel_id in manually deleted voice channel data: {e}")
return
# Ensure it's a voice channel before checking members
if not isinstance(channel, discord.VoiceChannel):
logger.warning(f"Channel {channel_data['name']} is not a voice channel, removing from tracking")
self.logger.warning(f"Channel {channel_data['name']} is not a voice channel, removing from tracking")
self.tracker.remove_channel(channel_id)
# Unpublish associated scorecard if it exists
text_channel_id = channel_data.get("text_channel_id")
if text_channel_id:
try:
text_channel_id_int = int(text_channel_id)
was_unpublished = self.scorecard_tracker.unpublish_scorecard(text_channel_id_int)
if was_unpublished:
self.logger.info(f"📋 Unpublished scorecard from text channel {text_channel_id_int} (wrong channel type)")
except (ValueError, TypeError) as e:
self.logger.warning(f"Invalid text_channel_id in wrong channel type data: {e}")
return
# Check if channel is empty
is_empty = len(channel.members) == 0
self.tracker.update_channel_status(channel_id, is_empty)
logger.debug(f"Channel {channel_data['name']}: {'empty' if is_empty else 'occupied'} "
self.logger.debug(f"Channel {channel_data['name']}: {'empty' if is_empty else 'occupied'} "
f"({len(channel.members)} members)")
except Exception as e:
logger.error(f"Error checking channel status for {channel_data.get('name', 'unknown')}: {e}")
self.logger.error(f"Error checking channel status for {channel_data.get('name', 'unknown')}: {e}")
async def cleanup_channel(self, bot: commands.Bot, channel_data: dict) -> None:
"""
@ -225,33 +246,33 @@ class VoiceChannelCleanupService:
guild = bot.get_guild(guild_id)
if not guild:
logger.info(f"Guild {guild_id} not found, removing tracking for {channel_name}")
self.logger.info(f"Guild {guild_id} not found, removing tracking for {channel_name}")
self.tracker.remove_channel(channel_id)
return
channel = guild.get_channel(channel_id)
if not channel:
logger.info(f"Channel {channel_name} already deleted, removing from tracking")
self.logger.info(f"Channel {channel_name} already deleted, removing from tracking")
self.tracker.remove_channel(channel_id)
return
# Ensure it's a voice channel before checking members
if not isinstance(channel, discord.VoiceChannel):
logger.warning(f"Channel {channel_name} is not a voice channel, removing from tracking")
self.logger.warning(f"Channel {channel_name} is not a voice channel, removing from tracking")
self.tracker.remove_channel(channel_id)
return
# Final check: make sure channel is still empty before deleting
if len(channel.members) > 0:
logger.info(f"Channel {channel_name} is no longer empty, skipping cleanup")
self.logger.info(f"Channel {channel_name} is no longer empty, skipping cleanup")
self.tracker.update_channel_status(channel_id, False)
return
# Delete the channel
await channel.delete(reason="Automatic cleanup - empty for 15+ minutes")
await channel.delete(reason="Automatic cleanup - empty for 5+ minutes")
self.tracker.remove_channel(channel_id)
logger.info(f"✅ Cleaned up empty voice channel: {channel_name} (ID: {channel_id})")
self.logger.info(f"✅ Cleaned up empty voice channel: {channel_name} (ID: {channel_id})")
# Unpublish associated scorecard if it exists
text_channel_id = channel_data.get("text_channel_id")
@ -260,15 +281,15 @@ class VoiceChannelCleanupService:
text_channel_id_int = int(text_channel_id)
was_unpublished = self.scorecard_tracker.unpublish_scorecard(text_channel_id_int)
if was_unpublished:
logger.info(f"📋 Unpublished scorecard from text channel {text_channel_id_int} (voice channel cleanup)")
self.logger.info(f"📋 Unpublished scorecard from text channel {text_channel_id_int} (voice channel cleanup)")
else:
logger.debug(f"No scorecard found for text channel {text_channel_id_int}")
self.logger.debug(f"No scorecard found for text channel {text_channel_id_int}")
except (ValueError, TypeError) as e:
logger.warning(f"Invalid text_channel_id in voice channel data: {e}")
self.logger.warning(f"Invalid text_channel_id in voice channel data: {e}")
except discord.NotFound:
# Channel was already deleted
logger.info(f"Channel {channel_data.get('name', 'unknown')} was already deleted")
self.logger.info(f"Channel {channel_data.get('name', 'unknown')} was already deleted")
self.tracker.remove_channel(int(channel_data["channel_id"]))
# Still try to unpublish associated scorecard
@ -278,13 +299,13 @@ class VoiceChannelCleanupService:
text_channel_id_int = int(text_channel_id)
was_unpublished = self.scorecard_tracker.unpublish_scorecard(text_channel_id_int)
if was_unpublished:
logger.info(f"📋 Unpublished scorecard from text channel {text_channel_id_int} (stale voice channel cleanup)")
self.logger.info(f"📋 Unpublished scorecard from text channel {text_channel_id_int} (stale voice channel cleanup)")
except (ValueError, TypeError) as e:
logger.warning(f"Invalid text_channel_id in voice channel data: {e}")
self.logger.warning(f"Invalid text_channel_id in voice channel data: {e}")
except discord.Forbidden:
logger.error(f"Missing permissions to delete channel {channel_data.get('name', 'unknown')}")
self.logger.error(f"Missing permissions to delete channel {channel_data.get('name', 'unknown')}")
except Exception as e:
logger.error(f"Error cleaning up channel {channel_data.get('name', 'unknown')}: {e}")
self.logger.error(f"Error cleaning up channel {channel_data.get('name', 'unknown')}: {e}")
def get_tracker(self) -> VoiceChannelTracker:
"""
@ -306,9 +327,21 @@ class VoiceChannelCleanupService:
empty_channels = [ch for ch in all_channels if ch.get("empty_since")]
return {
"running": self._running,
"running": self.cleanup_loop.is_running(),
"total_tracked": len(all_channels),
"empty_channels": len(empty_channels),
"cleanup_interval": self.cleanup_interval,
"empty_threshold": self.empty_threshold
}
}
def setup_voice_cleanup(bot: commands.Bot) -> VoiceChannelCleanupService:
"""
Setup function to initialize the voice channel cleanup service.
Args:
bot: Discord bot instance
Returns:
VoiceChannelCleanupService instance
"""
return VoiceChannelCleanupService(bot)

View File

@ -47,6 +47,10 @@ class BotConfig(BaseSettings):
# Draft Constants
default_pick_minutes: int = 10
draft_rounds: int = 32
draft_team_count: int = 16 # Number of teams in draft
draft_linear_rounds: int = 10 # Rounds 1-10 are linear, 11+ are snake
swar_cap_limit: float = 32.00 # Maximum sWAR cap for team roster
cap_player_count: int = 26 # Number of players that count toward cap
# Special Team IDs
free_agent_team_id: int = 498
@ -94,6 +98,11 @@ class BotConfig(BaseSettings):
"""Check if running in test mode."""
return self.testing
@property
def draft_total_picks(self) -> int:
"""Calculate total picks in draft (derived value)."""
return self.draft_rounds * self.draft_team_count
# Global configuration instance - lazily initialized to avoid import-time errors
_config = None

View File

@ -12,18 +12,20 @@ from models.base import SBABaseModel
class DraftData(SBABaseModel):
"""Draft configuration and state model."""
currentpick: int = Field(0, description="Current pick number in progress")
timer: bool = Field(False, description="Whether draft timer is active")
pick_deadline: Optional[datetime] = Field(None, description="Deadline for current pick")
result_channel_id: int = Field(..., description="Discord channel ID for draft results")
ping_channel_id: int = Field(..., description="Discord channel ID for draft pings")
result_channel: Optional[int] = Field(None, description="Discord channel ID for draft results")
ping_channel: Optional[int] = Field(None, description="Discord channel ID for draft pings")
pick_minutes: int = Field(1, description="Minutes allowed per pick")
@field_validator("result_channel_id", "ping_channel_id", mode="before")
@field_validator("result_channel", "ping_channel", mode="before")
@classmethod
def cast_channel_ids_to_int(cls, v):
"""Ensure channel IDs are integers."""
"""Ensure channel IDs are integers (database stores as string)."""
if v is None:
return None
if isinstance(v, str):
return int(v)
return v

View File

@ -13,22 +13,28 @@ from models.player import Player
class DraftList(SBABaseModel):
"""Draft preference list entry for a team."""
season: int = Field(..., description="Draft season")
team_id: int = Field(..., description="Team ID that owns this list entry")
rank: int = Field(..., description="Ranking of player on team's draft board")
player_id: int = Field(..., description="Player ID on the draft board")
# Related objects (populated when needed)
team: Optional[Team] = Field(None, description="Team object (populated when needed)")
player: Optional[Player] = Field(None, description="Player object (populated when needed)")
# API returns nested objects (not just IDs)
team: Team = Field(..., description="Team object")
player: Player = Field(..., description="Player object")
@property
def team_id(self) -> int:
"""Extract team ID from nested team object."""
return self.team.id
@property
def player_id(self) -> int:
"""Extract player ID from nested player object."""
return self.player.id
@property
def is_top_ranked(self) -> bool:
"""Check if this is the team's top-ranked available player."""
return self.rank == 1
def __str__(self):
team_str = self.team.abbrev if self.team else f"Team {self.team_id}"
player_str = self.player.name if self.player else f"Player {self.player_id}"
return f"{team_str} Draft Board #{self.rank}: {player_str}"
return f"{self.team.abbrev} Draft Board #{self.rank}: {self.player.name}"

View File

@ -200,7 +200,8 @@ The `TeamService` provides team data operations with specific method names:
```python
class TeamService(BaseService[Team]):
async def get_team(team_id: int) -> Optional[Team] # ✅ Correct method name
async def get_team(team_id: int) -> Optional[Team] # ✅ Correct method name - CACHED
async def get_team_by_owner(owner_id: int, season: Optional[int]) -> Optional[Team] # NEW - CACHED
async def get_teams_by_owner(owner_id: int, season: Optional[int], roster_type: Optional[str]) -> List[Team]
async def get_team_by_abbrev(abbrev: str, season: Optional[int]) -> Optional[Team]
async def get_teams_by_season(season: int) -> List[Team]
@ -213,6 +214,36 @@ class TeamService(BaseService[Team]):
This naming inconsistency was fixed in `services/trade_builder.py` line 201 and corresponding test mocks.
#### TeamService Caching Strategy (October 2025)
**Cached Methods** (30-minute TTL with `@cached_single_item`):
- `get_team(team_id)` - Returns `Optional[Team]`
- `get_team_by_owner(owner_id, season)` - Returns `Optional[Team]` (NEW convenience method for GM validation)
**Rationale:** GM assignments and team details rarely change during a season. These methods are called on every command for GM validation, making them ideal candidates for caching. The 30-minute TTL balances freshness with performance.
**Cache Keys:**
- `team:id:{team_id}`
- `team:owner:{season}:{owner_id}`
**Performance Impact:** Reduces API calls by ~80% during active bot usage, with cache hits taking <1ms vs 50-200ms for API calls.
**Not Cached:**
- `get_teams_by_owner(...)` with `roster_type` parameter - Returns `List[Team]`, more flexible query
- `get_teams_by_season(season)` - Team list may change during operations (keepers, expansions)
- `get_team_by_abbrev(abbrev, season)` - Less frequently used, not worth caching overhead
**Future Cache Invalidation:**
When implementing team ownership transfers or team modifications, use:
```python
from utils.decorators import cache_invalidate
@cache_invalidate("team:owner:*", "team:id:*")
async def transfer_ownership(old_owner_id: int, new_owner_id: int):
# ... ownership change logic ...
# Caches automatically cleared by decorator
```
### Transaction Services
- **`transaction_service.py`** - Player transaction operations (trades, waivers, etc.)
- **`transaction_builder.py`** - Complex transaction building and validation
@ -295,6 +326,11 @@ updated_player = await player_service.update_player_team(
print(f"{updated_player.name} now on team {updated_player.team_id}")
```
### Draft System Services (NEW - October 2025)
- **`draft_service.py`** - Core draft logic and state management (NO CACHING)
- **`draft_pick_service.py`** - Draft pick CRUD operations (NO CACHING)
- **`draft_list_service.py`** - Auto-draft queue management (NO CACHING)
### Game Submission Services (NEW - January 2025)
- **`game_service.py`** - Game CRUD operations and scorecard submission support
- **`play_service.py`** - Play-by-play data management for game submissions
@ -370,6 +406,56 @@ except APIException as e:
await play_service.delete_plays_for_game(game_id)
```
#### Draft System Services Key Methods (October 2025)
**CRITICAL: Draft services do NOT use caching** because draft data changes every 2-12 minutes during active drafts.
```python
class DraftService(BaseService[DraftData]):
# NO @cached_api_call or @cached_single_item decorators
async def get_draft_data() -> Optional[DraftData]
async def set_timer(draft_id: int, active: bool, pick_minutes: Optional[int]) -> Optional[DraftData]
async def advance_pick(draft_id: int, current_pick: int) -> Optional[DraftData]
async def set_current_pick(draft_id: int, overall: int, reset_timer: bool) -> Optional[DraftData]
async def update_channels(draft_id: int, ping_channel_id: Optional[int], result_channel_id: Optional[int]) -> Optional[DraftData]
class DraftPickService(BaseService[DraftPick]):
# NO caching decorators
async def get_pick(season: int, overall: int) -> Optional[DraftPick]
async def get_picks_by_team(season: int, team_id: int, round_start: int, round_end: int) -> List[DraftPick]
async def get_available_picks(season: int, overall_start: Optional[int], overall_end: Optional[int]) -> List[DraftPick]
async def get_recent_picks(season: int, overall_end: int, limit: int) -> List[DraftPick]
async def update_pick_selection(pick_id: int, player_id: int) -> Optional[DraftPick]
async def clear_pick_selection(pick_id: int) -> Optional[DraftPick]
class DraftListService(BaseService[DraftList]):
# NO caching decorators
async def get_team_list(season: int, team_id: int) -> List[DraftList]
async def add_to_list(season: int, team_id: int, player_id: int, rank: Optional[int]) -> Optional[DraftList]
async def remove_from_list(entry_id: int) -> bool
async def clear_list(season: int, team_id: int) -> bool
async def move_entry_up(season: int, team_id: int, player_id: int) -> bool
async def move_entry_down(season: int, team_id: int, player_id: int) -> bool
```
**Why No Caching:**
Draft data is highly dynamic during active drafts. Stale cache would cause:
- Wrong team shown as "on the clock"
- Incorrect pick deadlines
- Duplicate player selections
- Timer state mismatches
**Architecture Integration:**
- **Global Pick Lock**: Commands hold `asyncio.Lock` in cog instance (not database)
- **Background Monitor**: `tasks/draft_monitor.py` respects same lock for auto-draft
- **Self-Terminating Task**: Monitor stops when `draft_data.timer = False`
- **Resource Efficient**: No background task running 50+ weeks per year
**Draft Format:**
- Rounds 1-10: Linear (same order every round)
- Rounds 11+: Snake (reverse on even rounds)
- Special rule: Round 11 Pick 1 = same team as Round 10 Pick 16
### Custom Features
- **`custom_commands_service.py`** - User-created custom Discord commands
- **`help_commands_service.py`** - Admin-managed help system and documentation

View File

@ -0,0 +1,532 @@
"""
Draft list service for Discord Bot v2.0
Handles team draft list (auto-draft queue) operations. NO CACHING - lists change frequently.
"""
import logging
from typing import Optional, List
from services.base_service import BaseService
from models.draft_list import DraftList
from exceptions import APIException
logger = logging.getLogger(f'{__name__}.DraftListService')
class DraftListService(BaseService[DraftList]):
"""
Service for draft list operations.
IMPORTANT: This service does NOT use caching decorators because draft lists
change as users add/remove players from their auto-draft queues.
API QUIRK: GET endpoint returns items under 'picks' key, not 'draftlist'.
POST endpoint expects items under 'draft_list' key.
Features:
- Get team's draft list (ranked by priority)
- Add player to draft list
- Remove player from draft list
- Reorder draft list
- Clear entire draft list
"""
def __init__(self):
"""Initialize draft list service."""
super().__init__(DraftList, 'draftlist')
logger.debug("DraftListService initialized")
def _extract_items_and_count_from_response(self, data):
"""
Override to handle API quirk: GET returns 'picks' instead of 'draftlist'.
Args:
data: API response data
Returns:
Tuple of (items list, total count)
"""
from typing import Any, Dict, List, Tuple
if isinstance(data, list):
return data, len(data)
if not isinstance(data, dict):
logger.warning(f"Unexpected response format: {type(data)}")
return [], 0
# Get count
count = data.get('count', 0)
# API returns items under 'picks' key (not 'draftlist')
if 'picks' in data and isinstance(data['picks'], list):
return data['picks'], count or len(data['picks'])
# Fallback to standard extraction
return super()._extract_items_and_count_from_response(data)
async def get_team_list(
self,
season: int,
team_id: int
) -> List[DraftList]:
"""
Get team's draft list ordered by rank.
NOT cached - teams update their lists frequently during draft.
Args:
season: Draft season
team_id: Team ID
Returns:
List of DraftList entries ordered by rank (1 = highest priority)
"""
try:
params = [
('season', str(season)),
('team_id', str(team_id)),
('sort', 'rank-asc') # Order by priority
]
entries = await self.get_all_items(params=params)
logger.debug(f"Found {len(entries)} draft list entries for team {team_id}")
return entries
except Exception as e:
logger.error(f"Error getting draft list for team {team_id}: {e}")
return []
async def add_to_list(
self,
season: int,
team_id: int,
player_id: int,
rank: Optional[int] = None
) -> Optional[List[DraftList]]:
"""
Add player to team's draft list.
If rank is not provided, adds to end of list.
NOTE: The API uses bulk replacement - we get the full list, add the new entry,
and POST the entire updated list back.
Args:
season: Draft season
team_id: Team ID
player_id: Player ID to add
rank: Priority rank (1 = highest), None = add to end
Returns:
Full updated draft list or None if operation failed
"""
try:
# Get current list
current_list = await self.get_team_list(season, team_id)
# If rank not provided, add to end
if rank is None:
rank = len(current_list) + 1
# Create new entry data
new_entry_data = {
'season': season,
'team_id': team_id,
'player_id': player_id,
'rank': rank
}
# Build complete list for bulk replacement
draft_list_entries = []
# Add existing entries, adjusting ranks if inserting in middle
for entry in current_list:
if entry.rank >= rank:
# Shift down entries at or after insertion point
draft_list_entries.append({
'season': entry.season,
'team_id': entry.team_id,
'player_id': entry.player_id,
'rank': entry.rank + 1
})
else:
# Keep existing rank for entries before insertion point
draft_list_entries.append({
'season': entry.season,
'team_id': entry.team_id,
'player_id': entry.player_id,
'rank': entry.rank
})
# Add new entry
draft_list_entries.append(new_entry_data)
# Sort by rank for consistency
draft_list_entries.sort(key=lambda x: x['rank'])
# POST entire list (bulk replacement)
client = await self.get_client()
payload = {
'count': len(draft_list_entries),
'draft_list': draft_list_entries
}
logger.debug(f"Posting draft list for team {team_id}: {len(draft_list_entries)} entries")
response = await client.post(self.endpoint, payload)
logger.debug(f"POST response: {response}")
# Verify by fetching the list back (API returns full objects)
verification = await self.get_team_list(season, team_id)
logger.debug(f"Verification: found {len(verification)} entries after POST")
# Verify the player was added
if not any(entry.player_id == player_id for entry in verification):
logger.error(f"Player {player_id} not found in list after POST - operation may have failed")
return None
logger.info(f"Added player {player_id} to team {team_id} draft list at rank {rank}")
return verification # Return full updated list
except Exception as e:
logger.error(f"Error adding player {player_id} to draft list: {e}")
return None
async def remove_from_list(
self,
entry_id: int
) -> bool:
"""
Remove entry from draft list by ID.
NOTE: No DELETE endpoint exists. This method is deprecated - use remove_player_from_list() instead.
Args:
entry_id: Draft list entry database ID
Returns:
True if deletion succeeded
"""
logger.warning("remove_from_list() called with entry_id - use remove_player_from_list() instead")
return False
async def remove_player_from_list(
self,
season: int,
team_id: int,
player_id: int
) -> bool:
"""
Remove specific player from team's draft list.
Uses bulk replacement pattern - gets full list, removes player, POSTs updated list.
Args:
season: Draft season
team_id: Team ID
player_id: Player ID to remove
Returns:
True if player was found and removed
"""
try:
# Get team's list
current_list = await self.get_team_list(season, team_id)
# Check if player is in list
player_found = any(entry.player_id == player_id for entry in current_list)
if not player_found:
logger.warning(f"Player {player_id} not found in team {team_id} draft list")
return False
# Build new list without the player, adjusting ranks
draft_list_entries = []
new_rank = 1
for entry in current_list:
if entry.player_id != player_id:
draft_list_entries.append({
'season': entry.season,
'team_id': entry.team_id,
'player_id': entry.player_id,
'rank': new_rank
})
new_rank += 1
# POST updated list (bulk replacement)
client = await self.get_client()
payload = {
'count': len(draft_list_entries),
'draft_list': draft_list_entries
}
await client.post(self.endpoint, payload)
logger.info(f"Removed player {player_id} from team {team_id} draft list")
return True
except Exception as e:
logger.error(f"Error removing player {player_id} from draft list: {e}")
return False
async def clear_list(
self,
season: int,
team_id: int
) -> bool:
"""
Clear entire draft list for team.
Uses DELETE /draftlist/team/{team_id} endpoint.
Args:
season: Draft season
team_id: Team ID
Returns:
True if list was cleared successfully
"""
try:
# Check if list is already empty
entries = await self.get_team_list(season, team_id)
if not entries:
logger.debug(f"No draft list entries to clear for team {team_id}")
return True
entry_count = len(entries)
# Use DELETE endpoint: /draftlist/team/{team_id}
client = await self.get_client()
await client.delete(f"{self.endpoint}/team/{team_id}")
logger.info(f"Cleared {entry_count} draft list entries for team {team_id}")
return True
except Exception as e:
logger.error(f"Error clearing draft list for team {team_id}: {e}")
return False
async def reorder_list(
self,
season: int,
team_id: int,
new_order: List[int]
) -> bool:
"""
Reorder team's draft list.
Uses bulk replacement pattern - builds new list with updated ranks and POSTs it.
Args:
season: Draft season
team_id: Team ID
new_order: List of player IDs in desired order
Returns:
True if reordering succeeded
"""
try:
# Get current list
entries = await self.get_team_list(season, team_id)
# Build mapping of player_id -> entry
entry_map = {e.player_id: e for e in entries}
# Build new list in specified order
draft_list_entries = []
for new_rank, player_id in enumerate(new_order, start=1):
if player_id not in entry_map:
logger.warning(f"Player {player_id} not in draft list, skipping")
continue
entry = entry_map[player_id]
draft_list_entries.append({
'season': entry.season,
'team_id': entry.team_id,
'player_id': entry.player_id,
'rank': new_rank
})
# POST reordered list (bulk replacement)
client = await self.get_client()
payload = {
'count': len(draft_list_entries),
'draft_list': draft_list_entries
}
await client.post(self.endpoint, payload)
logger.info(f"Reordered draft list for team {team_id}")
return True
except Exception as e:
logger.error(f"Error reordering draft list for team {team_id}: {e}")
return False
async def move_entry_up(
self,
season: int,
team_id: int,
player_id: int
) -> bool:
"""
Move player up one position in draft list (higher priority).
Uses bulk replacement pattern - swaps ranks and POSTs updated list.
Args:
season: Draft season
team_id: Team ID
player_id: Player ID to move up
Returns:
True if move succeeded
"""
try:
entries = await self.get_team_list(season, team_id)
# Find player's current position
current_entry = None
for entry in entries:
if entry.player_id == player_id:
current_entry = entry
break
if not current_entry:
logger.warning(f"Player {player_id} not found in draft list")
return False
if current_entry.rank == 1:
logger.debug(f"Player {player_id} already at top of draft list")
return False
# Find entry above (rank - 1)
above_entry = next((e for e in entries if e.rank == current_entry.rank - 1), None)
if not above_entry:
logger.error(f"Could not find entry above rank {current_entry.rank}")
return False
# Build new list with swapped ranks
draft_list_entries = []
for entry in entries:
if entry.player_id == current_entry.player_id:
# Move this player up
new_rank = current_entry.rank - 1
elif entry.player_id == above_entry.player_id:
# Move above player down
new_rank = above_entry.rank + 1
else:
# Keep existing rank
new_rank = entry.rank
draft_list_entries.append({
'season': entry.season,
'team_id': entry.team_id,
'player_id': entry.player_id,
'rank': new_rank
})
# Sort by rank
draft_list_entries.sort(key=lambda x: x['rank'])
# POST updated list (bulk replacement)
client = await self.get_client()
payload = {
'count': len(draft_list_entries),
'draft_list': draft_list_entries
}
await client.post(self.endpoint, payload)
logger.info(f"Moved player {player_id} up to rank {current_entry.rank - 1}")
return True
except Exception as e:
logger.error(f"Error moving player {player_id} up in draft list: {e}")
return False
async def move_entry_down(
self,
season: int,
team_id: int,
player_id: int
) -> bool:
"""
Move player down one position in draft list (lower priority).
Uses bulk replacement pattern - swaps ranks and POSTs updated list.
Args:
season: Draft season
team_id: Team ID
player_id: Player ID to move down
Returns:
True if move succeeded
"""
try:
entries = await self.get_team_list(season, team_id)
# Find player's current position
current_entry = None
for entry in entries:
if entry.player_id == player_id:
current_entry = entry
break
if not current_entry:
logger.warning(f"Player {player_id} not found in draft list")
return False
if current_entry.rank == len(entries):
logger.debug(f"Player {player_id} already at bottom of draft list")
return False
# Find entry below (rank + 1)
below_entry = next((e for e in entries if e.rank == current_entry.rank + 1), None)
if not below_entry:
logger.error(f"Could not find entry below rank {current_entry.rank}")
return False
# Build new list with swapped ranks
draft_list_entries = []
for entry in entries:
if entry.player_id == current_entry.player_id:
# Move this player down
new_rank = current_entry.rank + 1
elif entry.player_id == below_entry.player_id:
# Move below player up
new_rank = below_entry.rank - 1
else:
# Keep existing rank
new_rank = entry.rank
draft_list_entries.append({
'season': entry.season,
'team_id': entry.team_id,
'player_id': entry.player_id,
'rank': new_rank
})
# Sort by rank
draft_list_entries.sort(key=lambda x: x['rank'])
# POST updated list (bulk replacement)
client = await self.get_client()
payload = {
'count': len(draft_list_entries),
'draft_list': draft_list_entries
}
await client.post(self.endpoint, payload)
logger.info(f"Moved player {player_id} down to rank {current_entry.rank + 1}")
return True
except Exception as e:
logger.error(f"Error moving player {player_id} down in draft list: {e}")
return False
# Global service instance
draft_list_service = DraftListService()

View File

@ -0,0 +1,312 @@
"""
Draft pick service for Discord Bot v2.0
Handles draft pick CRUD operations. NO CACHING - draft data changes constantly.
"""
import logging
from typing import Optional, List
from services.base_service import BaseService
from models.draft_pick import DraftPick
from exceptions import APIException
logger = logging.getLogger(f'{__name__}.DraftPickService')
class DraftPickService(BaseService[DraftPick]):
"""
Service for draft pick operations.
IMPORTANT: This service does NOT use caching decorators because draft picks
change constantly during an active draft. Always fetch fresh data.
Features:
- Get pick by overall number
- Get picks by team
- Get picks by round
- Update pick with player selection
- Query available/taken picks
"""
def __init__(self):
"""Initialize draft pick service."""
super().__init__(DraftPick, 'draftpicks')
logger.debug("DraftPickService initialized")
async def get_pick(self, season: int, overall: int) -> Optional[DraftPick]:
"""
Get specific pick by season and overall number.
NOT cached - picks change during draft.
Args:
season: Draft season
overall: Overall pick number
Returns:
DraftPick instance or None if not found
"""
try:
params = [
('season', str(season)),
('overall', str(overall))
]
picks = await self.get_all_items(params=params)
if picks:
pick = picks[0]
logger.debug(f"Found pick #{overall} for season {season}")
return pick
logger.debug(f"No pick found for season {season}, overall #{overall}")
return None
except Exception as e:
logger.error(f"Error getting pick season={season} overall={overall}: {e}")
return None
async def get_picks_by_team(
self,
season: int,
team_id: int,
round_start: int = 1,
round_end: int = 32
) -> List[DraftPick]:
"""
Get all picks owned by a team in a season.
NOT cached - picks change as they're traded.
Args:
season: Draft season
team_id: Team ID that owns the picks
round_start: Starting round (inclusive)
round_end: Ending round (inclusive)
Returns:
List of DraftPick instances owned by team
"""
try:
params = [
('season', str(season)),
('owner_team_id', str(team_id)),
('round_start', str(round_start)),
('round_end', str(round_end)),
('sort', 'order-asc')
]
picks = await self.get_all_items(params=params)
logger.debug(f"Found {len(picks)} picks for team {team_id} in rounds {round_start}-{round_end}")
return picks
except Exception as e:
logger.error(f"Error getting picks for team {team_id}: {e}")
return []
async def get_picks_by_round(
self,
season: int,
round_num: int,
include_taken: bool = True
) -> List[DraftPick]:
"""
Get all picks in a specific round.
NOT cached - picks change as they're selected.
Args:
season: Draft season
round_num: Round number
include_taken: Whether to include picks with players selected
Returns:
List of DraftPick instances in the round
"""
try:
params = [
('season', str(season)),
('pick_round_start', str(round_num)),
('pick_round_end', str(round_num)),
('sort', 'order-asc')
]
if not include_taken:
params.append(('player_taken', 'false'))
picks = await self.get_all_items(params=params)
logger.debug(f"Found {len(picks)} picks in round {round_num}")
return picks
except Exception as e:
logger.error(f"Error getting picks for round {round_num}: {e}")
return []
async def get_available_picks(
self,
season: int,
overall_start: Optional[int] = None,
overall_end: Optional[int] = None
) -> List[DraftPick]:
"""
Get picks that haven't been selected yet.
NOT cached - availability changes constantly.
Args:
season: Draft season
overall_start: Starting overall pick number (optional)
overall_end: Ending overall pick number (optional)
Returns:
List of available DraftPick instances
"""
try:
params = [
('season', str(season)),
('player_taken', 'false'),
('sort', 'order-asc')
]
if overall_start is not None:
params.append(('overall_start', str(overall_start)))
if overall_end is not None:
params.append(('overall_end', str(overall_end)))
picks = await self.get_all_items(params=params)
logger.debug(f"Found {len(picks)} available picks")
return picks
except Exception as e:
logger.error(f"Error getting available picks: {e}")
return []
async def get_recent_picks(
self,
season: int,
overall_end: int,
limit: int = 5
) -> List[DraftPick]:
"""
Get recent picks before a specific pick number.
NOT cached - recent picks change as draft progresses.
Args:
season: Draft season
overall_end: Get picks before this overall number
limit: Number of picks to retrieve
Returns:
List of recent DraftPick instances (reverse chronological)
"""
try:
params = [
('season', str(season)),
('overall_end', str(overall_end - 1)), # Exclude current pick
('player_taken', 'true'), # Only taken picks
('sort', 'order-desc'), # Most recent first
('limit', str(limit))
]
picks = await self.get_all_items(params=params)
logger.debug(f"Found {len(picks)} recent picks before #{overall_end}")
return picks
except Exception as e:
logger.error(f"Error getting recent picks: {e}")
return []
async def get_upcoming_picks(
self,
season: int,
overall_start: int,
limit: int = 5
) -> List[DraftPick]:
"""
Get upcoming picks after a specific pick number.
NOT cached - upcoming picks change as draft progresses.
Args:
season: Draft season
overall_start: Get picks after this overall number
limit: Number of picks to retrieve
Returns:
List of upcoming DraftPick instances
"""
try:
params = [
('season', str(season)),
('overall_start', str(overall_start + 1)), # Exclude current pick
('sort', 'order-asc'), # Chronological order
('limit', str(limit))
]
picks = await self.get_all_items(params=params)
logger.debug(f"Found {len(picks)} upcoming picks after #{overall_start}")
return picks
except Exception as e:
logger.error(f"Error getting upcoming picks: {e}")
return []
async def update_pick_selection(
self,
pick_id: int,
player_id: int
) -> Optional[DraftPick]:
"""
Update a pick with player selection.
Args:
pick_id: Draft pick database ID
player_id: Player ID being selected
Returns:
Updated DraftPick instance or None if update failed
"""
try:
update_data = {'player_id': player_id}
updated_pick = await self.patch(pick_id, update_data)
if updated_pick:
logger.info(f"Updated pick #{pick_id} with player {player_id}")
else:
logger.error(f"Failed to update pick #{pick_id}")
return updated_pick
except Exception as e:
logger.error(f"Error updating pick {pick_id}: {e}")
return None
async def clear_pick_selection(self, pick_id: int) -> Optional[DraftPick]:
"""
Clear player selection from a pick (for admin wipe operations).
Args:
pick_id: Draft pick database ID
Returns:
Updated DraftPick instance with player cleared, or None if failed
"""
try:
update_data = {'player_id': None}
updated_pick = await self.patch(pick_id, update_data)
if updated_pick:
logger.info(f"Cleared player selection from pick #{pick_id}")
else:
logger.error(f"Failed to clear pick #{pick_id}")
return updated_pick
except Exception as e:
logger.error(f"Error clearing pick {pick_id}: {e}")
return None
# Global service instance
draft_pick_service = DraftPickService()

343
services/draft_service.py Normal file
View File

@ -0,0 +1,343 @@
"""
Draft service for Discord Bot v2.0
Core draft business logic and state management. NO CACHING - draft state changes constantly.
"""
import logging
from typing import Optional, Dict, Any
from datetime import datetime, timedelta
from services.base_service import BaseService
from models.draft_data import DraftData
from exceptions import APIException
logger = logging.getLogger(f'{__name__}.DraftService')
class DraftService(BaseService[DraftData]):
"""
Service for core draft operations and state management.
IMPORTANT: This service does NOT use caching decorators because draft data
changes every 2-12 minutes during an active draft. Always fetch fresh data.
Features:
- Get/update draft configuration
- Timer management (start/stop)
- Pick advancement
- Draft state validation
"""
def __init__(self):
"""Initialize draft service."""
super().__init__(DraftData, 'draftdata')
logger.debug("DraftService initialized")
async def get_draft_data(self) -> Optional[DraftData]:
"""
Get current draft configuration and state.
NOT cached - draft state changes frequently during active draft.
Returns:
DraftData instance or None if not found
"""
try:
# Draft data endpoint typically returns single object
items = await self.get_all_items()
if items:
draft_data = items[0]
logger.debug(
f"Retrieved draft data: pick={draft_data.currentpick}, "
f"timer={draft_data.timer}, "
f"deadline={draft_data.pick_deadline}"
)
return draft_data
logger.warning("No draft data found in database")
return None
except Exception as e:
logger.error(f"Error getting draft data: {e}")
return None
async def update_draft_data(
self,
draft_id: int,
updates: Dict[str, Any]
) -> Optional[DraftData]:
"""
Update draft configuration.
Args:
draft_id: DraftData database ID (typically 1)
updates: Dictionary of fields to update
Returns:
Updated DraftData instance or None if update failed
"""
try:
# Draft data API expects query parameters for PATCH requests
updated = await self.patch(draft_id, updates, use_query_params=True)
if updated:
logger.info(f"Updated draft data: {updates}")
else:
logger.error(f"Failed to update draft data with {updates}")
return updated
except Exception as e:
logger.error(f"Error updating draft data: {e}")
return None
async def set_timer(
self,
draft_id: int,
active: bool,
pick_minutes: Optional[int] = None
) -> Optional[DraftData]:
"""
Enable or disable draft timer.
Args:
draft_id: DraftData database ID
active: True to enable timer, False to disable
pick_minutes: Minutes per pick (updates default if provided)
Returns:
Updated DraftData instance
"""
try:
updates = {'timer': active}
if pick_minutes is not None:
updates['pick_minutes'] = pick_minutes
# Set deadline based on timer state
if active:
# Calculate new deadline
if pick_minutes:
deadline = datetime.now() + timedelta(minutes=pick_minutes)
else:
# Get current pick_minutes from existing data
current_data = await self.get_draft_data()
if current_data:
deadline = datetime.now() + timedelta(minutes=current_data.pick_minutes)
else:
deadline = datetime.now() + timedelta(minutes=2) # Default fallback
updates['pick_deadline'] = deadline
else:
# Set deadline far in future when timer inactive
updates['pick_deadline'] = datetime.now() + timedelta(days=690)
updated = await self.update_draft_data(draft_id, updates)
if updated:
status = "enabled" if active else "disabled"
logger.info(f"Draft timer {status}")
else:
logger.error("Failed to update draft timer")
return updated
except Exception as e:
logger.error(f"Error setting draft timer: {e}")
return None
async def advance_pick(
self,
draft_id: int,
current_pick: int
) -> Optional[DraftData]:
"""
Advance to next pick in draft.
Automatically skips picks that have already been filled (player selected).
Posts round announcement when entering new round.
Args:
draft_id: DraftData database ID
current_pick: Current overall pick number
Returns:
Updated DraftData with new currentpick
"""
try:
from services.draft_pick_service import draft_pick_service
from config import get_config
config = get_config()
season = config.sba_current_season
total_picks = config.draft_total_picks
# Start with next pick
next_pick = current_pick + 1
# Keep advancing until we find an unfilled pick or reach end
while next_pick <= total_picks:
pick = await draft_pick_service.get_pick(season, next_pick)
if not pick:
logger.error(f"Pick #{next_pick} not found in database")
break
# If pick has no player, this is the next pick to make
if pick.player_id is None:
logger.info(f"Advanced to pick #{next_pick}")
break
# Pick already filled, continue to next
logger.debug(f"Pick #{next_pick} already filled, skipping")
next_pick += 1
# Check if draft is complete
if next_pick > total_picks:
logger.info("Draft is complete - all picks filled")
# Disable timer
await self.set_timer(draft_id, active=False)
return await self.get_draft_data()
# Update to next pick
updates = {'currentpick': next_pick}
# Reset deadline if timer is active
current_data = await self.get_draft_data()
if current_data and current_data.timer:
updates['pick_deadline'] = datetime.now() + timedelta(minutes=current_data.pick_minutes)
updated = await self.update_draft_data(draft_id, updates)
if updated:
logger.info(f"Draft advanced from pick #{current_pick} to #{next_pick}")
else:
logger.error(f"Failed to advance draft pick")
return updated
except Exception as e:
logger.error(f"Error advancing draft pick: {e}")
return None
async def set_current_pick(
self,
draft_id: int,
overall: int,
reset_timer: bool = True
) -> Optional[DraftData]:
"""
Manually set current pick (admin operation).
Args:
draft_id: DraftData database ID
overall: Overall pick number to jump to
reset_timer: Whether to reset the pick deadline
Returns:
Updated DraftData
"""
try:
updates = {'currentpick': overall}
if reset_timer:
current_data = await self.get_draft_data()
if current_data and current_data.timer:
updates['pick_deadline'] = datetime.now() + timedelta(minutes=current_data.pick_minutes)
updated = await self.update_draft_data(draft_id, updates)
if updated:
logger.info(f"Manually set current pick to #{overall}")
else:
logger.error(f"Failed to set current pick to #{overall}")
return updated
except Exception as e:
logger.error(f"Error setting current pick: {e}")
return None
async def update_channels(
self,
draft_id: int,
ping_channel_id: Optional[int] = None,
result_channel_id: Optional[int] = None
) -> Optional[DraftData]:
"""
Update draft Discord channel configuration.
Args:
draft_id: DraftData database ID
ping_channel_id: Channel ID for "on the clock" pings
result_channel_id: Channel ID for draft results
Returns:
Updated DraftData
"""
try:
updates = {}
if ping_channel_id is not None:
updates['ping_channel'] = ping_channel_id
if result_channel_id is not None:
updates['result_channel'] = result_channel_id
if not updates:
logger.warning("No channel updates provided")
return await self.get_draft_data()
updated = await self.update_draft_data(draft_id, updates)
if updated:
logger.info(f"Updated draft channels: {updates}")
else:
logger.error("Failed to update draft channels")
return updated
except Exception as e:
logger.error(f"Error updating draft channels: {e}")
return None
async def reset_draft_deadline(
self,
draft_id: int,
minutes: Optional[int] = None
) -> Optional[DraftData]:
"""
Reset the current pick deadline.
Args:
draft_id: DraftData database ID
minutes: Minutes to add (uses pick_minutes from config if not provided)
Returns:
Updated DraftData with new deadline
"""
try:
if minutes is None:
current_data = await self.get_draft_data()
if not current_data:
logger.error("Could not get current draft data")
return None
minutes = current_data.pick_minutes
new_deadline = datetime.now() + timedelta(minutes=minutes)
updates = {'pick_deadline': new_deadline}
updated = await self.update_draft_data(draft_id, updates)
if updated:
logger.info(f"Reset draft deadline to {new_deadline}")
else:
logger.error("Failed to reset draft deadline")
return updated
except Exception as e:
logger.error(f"Error resetting draft deadline: {e}")
return None
# Global service instance
draft_service = DraftService()

View File

@ -10,6 +10,7 @@ from config import get_config
from services.base_service import BaseService
from models.team import Team, RosterType
from exceptions import APIException
from utils.decorators import cached_single_item
logger = logging.getLogger(f'{__name__}.TeamService')
@ -32,13 +33,19 @@ class TeamService(BaseService[Team]):
super().__init__(Team, 'teams')
logger.debug("TeamService initialized")
@cached_single_item(ttl=1800) # 30-minute cache
async def get_team(self, team_id: int) -> Optional[Team]:
"""
Get team by ID with error handling.
Cached for 30 minutes since team details rarely change.
Uses @cached_single_item because returns Optional[Team].
Cache key: team:id:{team_id}
Args:
team_id: Unique team identifier
Returns:
Team instance or None if not found
"""
@ -96,7 +103,31 @@ class TeamService(BaseService[Team]):
except Exception as e:
logger.error(f"Error getting teams for owner {owner_id}: {e}")
return []
@cached_single_item(ttl=1800) # 30-minute cache
async def get_team_by_owner(self, owner_id: int, season: Optional[int] = None) -> Optional[Team]:
"""
Get the primary (Major League) team owned by a Discord user.
This is a convenience method for GM validation - returns the first team
found for the owner (typically their ML team). For multiple teams or
roster type filtering, use get_teams_by_owner() instead.
Cached for 30 minutes since GM assignments rarely change.
Uses @cached_single_item because returns Optional[Team].
Cache key: team:owner:{season}:{owner_id}
Args:
owner_id: Discord user ID
season: Season number (defaults to current season)
Returns:
Team instance or None if not found
"""
teams = await self.get_teams_by_owner(owner_id, season, roster_type='ml')
return teams[0] if teams else None
async def get_team_by_abbrev(self, abbrev: str, season: Optional[int] = None) -> Optional[Team]:
"""
Get team by abbreviation for a specific season.

View File

@ -231,6 +231,91 @@ When voice channels are cleaned up (deleted after being empty):
- Prevents duplicate error messages
- Continues operation despite individual scorecard failures
### Draft Monitor (`draft_monitor.py`) (NEW - October 2025)
**Purpose:** Automated draft timer monitoring, warnings, and auto-draft execution
**Schedule:** Every 15 seconds (only when draft timer is active)
**Operations:**
- **Timer Monitoring:**
- Checks draft state every 15 seconds
- Self-terminates when `draft_data.timer = False`
- Restarts when timer re-enabled via `/draft-admin`
- **Warning System:**
- Sends 60-second warning to ping channel
- Sends 30-second warning to ping channel
- Resets warning flags when pick advances
- **Auto-Draft Execution:**
- Triggers when pick deadline passes
- Acquires global pick lock before auto-drafting
- Tries each player in team's draft list until one succeeds
- Validates cap space and player availability
- Advances to next pick after auto-draft
#### Key Features
- **Self-Terminating:** Stops automatically when timer disabled (resource efficient)
- **Global Lock Integration:** Acquires same lock as `/draft` command
- **Crash Recovery:** Respects 30-second stale lock timeout
- **Safe Startup:** Uses `@before_loop` pattern with `await bot.wait_until_ready()`
- **Service Layer:** All API calls through services (no direct client access)
#### Configuration
The monitor respects draft configuration:
```python
# From DraftData model
timer: bool # When False, monitor stops
pick_deadline: datetime # Warning/auto-draft trigger
ping_channel_id: int # Where warnings are sent
pick_minutes: int # Timer duration per pick
```
**Environment Variables:**
- `GUILD_ID` - Discord server ID
- `SBA_CURRENT_SEASON` - Current draft season
#### Draft Lock Integration
The monitor integrates with the global pick lock:
```python
# In DraftPicksCog
self.pick_lock = asyncio.Lock() # Shared lock
self.lock_acquired_at: Optional[datetime] = None
self.lock_acquired_by: Optional[int] = None
# Monitor acquires same lock for auto-draft
async with draft_picks_cog.pick_lock:
draft_picks_cog.lock_acquired_at = datetime.now()
draft_picks_cog.lock_acquired_by = None # System auto-draft
await self.auto_draft_current_pick()
```
#### Auto-Draft Process
1. Check if pick lock is available
2. Acquire global lock
3. Get team's draft list ordered by rank
4. For each player in list:
- Validate player is still FA
- Validate cap space
- Attempt to draft player
- Break on success
5. Advance to next pick
6. Release lock
#### Channel Requirements
- **ping_channel** - Where warnings and auto-draft announcements post
#### Error Handling
- Comprehensive try/catch blocks with structured logging
- Graceful degradation if channels not found
- Continues operation despite individual pick failures
- Task self-terminates on critical errors
**Resource Efficiency:**
This task is designed to run only during active drafts (~2 weeks per year). When `draft_data.timer = False`, the task calls `self.monitor_loop.cancel()` and stops consuming resources. Admin can restart via `/draft-admin timer on`.
### Transaction Freeze/Thaw (`transaction_freeze.py`)
**Purpose:** Automated weekly system for freezing transactions and processing contested player acquisitions

365
tasks/draft_monitor.py Normal file
View File

@ -0,0 +1,365 @@
"""
Draft Monitor Task for Discord Bot v2.0
Automated background task for draft timer monitoring, warnings, and auto-draft.
Self-terminates when draft timer is disabled to conserve resources.
"""
import asyncio
from datetime import datetime
from typing import Optional
import discord
from discord.ext import commands, tasks
from services.draft_service import draft_service
from services.draft_pick_service import draft_pick_service
from services.draft_list_service import draft_list_service
from services.player_service import player_service
from services.team_service import team_service
from utils.logging import get_contextual_logger
from views.embeds import EmbedTemplate, EmbedColors
from config import get_config
class DraftMonitorTask:
"""
Automated monitoring task for draft operations.
Features:
- Monitors draft timer every 15 seconds
- Sends warnings at 60s and 30s remaining
- Triggers auto-draft when deadline passes
- Respects global pick lock
- Self-terminates when timer disabled
"""
def __init__(self, bot: commands.Bot):
self.bot = bot
self.logger = get_contextual_logger(f'{__name__}.DraftMonitorTask')
# Warning flags (reset each pick)
self.warning_60s_sent = False
self.warning_30s_sent = False
self.logger.info("Draft monitor task initialized")
# Start the monitor task
self.monitor_loop.start()
def cog_unload(self):
"""Stop the task when cog is unloaded."""
self.monitor_loop.cancel()
@tasks.loop(seconds=15)
async def monitor_loop(self):
"""
Main monitoring loop - checks draft state every 15 seconds.
Self-terminates when draft timer is disabled.
"""
try:
# Get current draft state
draft_data = await draft_service.get_draft_data()
if not draft_data:
self.logger.warning("No draft data found")
return
# CRITICAL: Stop loop if timer disabled
if not draft_data.timer:
self.logger.info("Draft timer disabled - stopping monitor")
self.monitor_loop.cancel()
return
# Check if we need to take action
now = datetime.now()
deadline = draft_data.pick_deadline
if not deadline:
self.logger.warning("Draft timer active but no deadline set")
return
# Calculate time remaining
time_remaining = (deadline - now).total_seconds()
if time_remaining <= 0:
# Timer expired - auto-draft
await self._handle_expired_timer(draft_data)
else:
# Send warnings at intervals
await self._send_warnings_if_needed(draft_data, time_remaining)
except Exception as e:
self.logger.error("Error in draft monitor loop", error=e)
@monitor_loop.before_loop
async def before_monitor(self):
"""Wait for bot to be ready before starting - REQUIRED FOR SAFE STARTUP."""
await self.bot.wait_until_ready()
self.logger.info("Bot is ready, draft monitor starting")
async def _handle_expired_timer(self, draft_data):
"""
Handle expired pick timer - trigger auto-draft.
Args:
draft_data: Current draft configuration
"""
try:
config = get_config()
guild = self.bot.get_guild(config.guild_id)
if not guild:
self.logger.error("Could not find guild")
return
# Get current pick
current_pick = await draft_pick_service.get_pick(
config.sba_current_season,
draft_data.currentpick
)
if not current_pick or not current_pick.owner:
self.logger.error(f"Could not get pick #{draft_data.currentpick}")
return
# Get draft picks cog to check/acquire lock
draft_picks_cog = self.bot.get_cog('DraftPicksCog')
if not draft_picks_cog:
self.logger.error("Could not find DraftPicksCog")
return
# Check if lock is available
if draft_picks_cog.pick_lock.locked():
self.logger.debug("Pick lock is held, skipping auto-draft this cycle")
return
# Acquire lock
async with draft_picks_cog.pick_lock:
draft_picks_cog.lock_acquired_at = datetime.now()
draft_picks_cog.lock_acquired_by = None # System auto-draft
try:
await self._auto_draft_current_pick(draft_data, current_pick, guild)
finally:
draft_picks_cog.lock_acquired_at = None
draft_picks_cog.lock_acquired_by = None
except Exception as e:
self.logger.error("Error handling expired timer", error=e)
async def _auto_draft_current_pick(self, draft_data, current_pick, guild):
"""
Attempt to auto-draft from team's draft list.
Args:
draft_data: Current draft configuration
current_pick: DraftPick to auto-draft
guild: Discord guild
"""
try:
config = get_config()
# Get ping channel
ping_channel = guild.get_channel(draft_data.ping_channel)
if not ping_channel:
self.logger.error(f"Could not find ping channel {draft_data.ping_channel}")
return
# Get team's draft list
draft_list = await draft_list_service.get_team_list(
config.sba_current_season,
current_pick.owner.id
)
if not draft_list:
self.logger.warning(f"Team {current_pick.owner.abbrev} has no draft list")
await ping_channel.send(
content=f"{current_pick.owner.abbrev} time expired with no draft list - pick skipped"
)
# Advance to next pick
await draft_service.advance_pick(draft_data.id, draft_data.currentpick)
return
# Try each player in order
for entry in draft_list:
if not entry.player:
continue
player = entry.player
# Check if player is still available
if player.team_id != config.free_agent_team_id:
self.logger.debug(f"Player {player.name} no longer available, skipping")
continue
# Attempt to draft this player
success = await self._attempt_draft_player(
current_pick,
player,
ping_channel
)
if success:
self.logger.info(
f"Auto-drafted {player.name} for {current_pick.owner.abbrev}"
)
# Advance to next pick
await draft_service.advance_pick(draft_data.id, draft_data.currentpick)
# Reset warning flags
self.warning_60s_sent = False
self.warning_30s_sent = False
return
# No players successfully drafted
self.logger.warning(f"Could not auto-draft for {current_pick.owner.abbrev}")
await ping_channel.send(
content=f"{current_pick.owner.abbrev} time expired - no valid players in draft list"
)
# Advance to next pick anyway
await draft_service.advance_pick(draft_data.id, draft_data.currentpick)
except Exception as e:
self.logger.error("Error auto-drafting player", error=e)
async def _attempt_draft_player(
self,
draft_pick,
player,
ping_channel
) -> bool:
"""
Attempt to draft a specific player.
Args:
draft_pick: DraftPick to update
player: Player to draft
ping_channel: Discord channel for announcements
Returns:
True if draft succeeded
"""
try:
from utils.draft_helpers import validate_cap_space
from services.team_service import team_service
# Get team roster for cap validation
roster = await team_service.get_team_roster(draft_pick.owner.id, 'current')
if not roster:
self.logger.error(f"Could not get roster for team {draft_pick.owner.id}")
return False
# Validate cap space
is_valid, projected_total = await validate_cap_space(roster, player.wara)
if not is_valid:
self.logger.debug(
f"Cannot auto-draft {player.name} - would exceed cap "
f"(projected: {projected_total:.2f})"
)
return False
# Update draft pick
updated_pick = await draft_pick_service.update_pick_selection(
draft_pick.id,
player.id
)
if not updated_pick:
self.logger.error(f"Failed to update pick {draft_pick.id}")
return False
# Update player team
from services.player_service import player_service
updated_player = await player_service.update_player_team(
player.id,
draft_pick.owner.id
)
if not updated_player:
self.logger.error(f"Failed to update player {player.id} team")
return False
# Post to channel
await ping_channel.send(
content=f"🤖 AUTO-DRAFT: {draft_pick.owner.abbrev} selects **{player.name}** "
f"(Pick #{draft_pick.overall})"
)
return True
except Exception as e:
self.logger.error(f"Error attempting to draft {player.name}", error=e)
return False
async def _send_warnings_if_needed(self, draft_data, time_remaining: float):
"""
Send warnings at 60s and 30s remaining.
Args:
draft_data: Current draft configuration
time_remaining: Seconds remaining until deadline
"""
try:
config = get_config()
guild = self.bot.get_guild(config.guild_id)
if not guild:
return
ping_channel = guild.get_channel(draft_data.ping_channel)
if not ping_channel:
return
# Get current pick for mention
current_pick = await draft_pick_service.get_pick(
config.sba_current_season,
draft_data.currentpick
)
if not current_pick or not current_pick.owner:
return
# 60-second warning
if 55 <= time_remaining <= 60 and not self.warning_60s_sent:
await ping_channel.send(
content=f"{current_pick.owner.abbrev} - **60 seconds remaining** "
f"for pick #{current_pick.overall}!"
)
self.warning_60s_sent = True
self.logger.debug(f"Sent 60s warning for pick #{current_pick.overall}")
# 30-second warning
elif 25 <= time_remaining <= 30 and not self.warning_30s_sent:
await ping_channel.send(
content=f"{current_pick.owner.abbrev} - **30 seconds remaining** "
f"for pick #{current_pick.overall}!"
)
self.warning_30s_sent = True
self.logger.debug(f"Sent 30s warning for pick #{current_pick.overall}")
# Reset warnings if time goes back above 60s
elif time_remaining > 60:
if self.warning_60s_sent or self.warning_30s_sent:
self.warning_60s_sent = False
self.warning_30s_sent = False
self.logger.debug("Reset warning flags - pick deadline extended")
except Exception as e:
self.logger.error("Error sending warnings", error=e)
# Task factory function
def setup_draft_monitor(bot: commands.Bot) -> DraftMonitorTask:
"""
Setup function for draft monitor task.
Args:
bot: Discord bot instance
Returns:
Initialized DraftMonitorTask
"""
return DraftMonitorTask(bot)

View File

@ -201,16 +201,16 @@ class TransactionFreezeTask:
)
# BEGIN FREEZE: Monday at 00:00, not already frozen
if now.weekday() == 0 and now.hour == 0 and not current.freeze:
if now.weekday() == 0 and now.hour == 0 and not current.freeze and self.weekly_warning_sent:
self.logger.info("Triggering freeze begin")
await self._begin_freeze(current)
self.weekly_warning_sent = False # Reset error flag
self.weekly_warning_sent = False
# END FREEZE: Saturday at 00:00, currently frozen
elif now.weekday() == 5 and now.hour == 0 and current.freeze:
elif now.weekday() == 5 and now.hour == 0 and current.freeze and not self.weekly_warning_sent:
self.logger.info("Triggering freeze end")
await self._end_freeze(current)
self.weekly_warning_sent = False # Reset error flag
self.weekly_warning_sent = True
else:
self.logger.debug("No freeze/thaw action needed at this time")

View File

@ -17,6 +17,7 @@ from discord.ext import commands
from commands.voice.channels import VoiceChannelCommands
from commands.voice.cleanup_service import VoiceChannelCleanupService
from commands.voice.tracker import VoiceChannelTracker
from commands.gameplay.scorecard_tracker import ScorecardTracker
from models.game import Game
from models.team import Team
@ -180,19 +181,28 @@ class TestVoiceChannelTracker:
class TestVoiceChannelCleanupService:
"""Test voice channel cleanup service functionality."""
@pytest.fixture
def cleanup_service(self):
"""Create a cleanup service instance."""
with tempfile.TemporaryDirectory() as temp_dir:
data_file = Path(temp_dir) / "test_channels.json"
return VoiceChannelCleanupService(str(data_file))
@pytest.fixture
def mock_bot(self):
"""Create a mock bot instance."""
bot = AsyncMock(spec=commands.Bot)
return bot
@pytest.fixture
def cleanup_service(self, mock_bot):
"""Create a cleanup service instance."""
from utils.logging import get_contextual_logger
with tempfile.TemporaryDirectory() as temp_dir:
data_file = Path(temp_dir) / "test_channels.json"
service = VoiceChannelCleanupService.__new__(VoiceChannelCleanupService)
service.bot = mock_bot
service.logger = get_contextual_logger('test.VoiceChannelCleanupService')
service.tracker = VoiceChannelTracker(str(data_file))
service.scorecard_tracker = ScorecardTracker()
service.empty_threshold = 5
# Don't start the loop (no event loop in tests)
return service
@pytest.mark.asyncio
async def test_verify_tracked_channels(self, cleanup_service, mock_bot):
"""Test verification of tracked channels on startup."""
@ -287,7 +297,7 @@ class TestVoiceChannelCleanupService:
await cleanup_service.cleanup_channel(mock_bot, channel_data)
# Should have deleted the channel
mock_channel.delete.assert_called_once_with(reason="Automatic cleanup - empty for 15+ minutes")
mock_channel.delete.assert_called_once_with(reason="Automatic cleanup - empty for 5+ minutes")
# Should have removed from tracking
assert "123" not in cleanup_service.tracker._data["voice_channels"]
@ -335,7 +345,7 @@ class TestVoiceChannelCleanupService:
await cleanup_service.cleanup_channel(mock_bot, channel_data)
# Should have deleted the channel
mock_channel.delete.assert_called_once_with(reason="Automatic cleanup - empty for 15+ minutes")
mock_channel.delete.assert_called_once_with(reason="Automatic cleanup - empty for 5+ minutes")
# Should have removed from voice channel tracking
assert "123" not in cleanup_service.tracker._data["voice_channels"]

View File

@ -373,33 +373,33 @@ class TestDraftDataModel:
def test_draft_data_creation(self):
"""Test draft data creation."""
draft_data = DraftData(
result_channel_id=123456789,
ping_channel_id=987654321,
result_channel=123456789,
ping_channel=987654321,
pick_minutes=10
)
assert draft_data.result_channel_id == 123456789
assert draft_data.ping_channel_id == 987654321
assert draft_data.result_channel == 123456789
assert draft_data.ping_channel == 987654321
assert draft_data.pick_minutes == 10
def test_draft_data_properties(self):
"""Test draft data properties."""
# Inactive draft
draft_data = DraftData(
result_channel_id=123,
ping_channel_id=456,
result_channel=123,
ping_channel=456,
timer=False
)
assert draft_data.is_draft_active is False
# Active draft
active_draft = DraftData(
result_channel_id=123,
ping_channel_id=456,
result_channel=123,
ping_channel=456,
timer=True
)
assert active_draft.is_draft_active is True

249
utils/draft_helpers.py Normal file
View File

@ -0,0 +1,249 @@
"""
Draft utility functions for Discord Bot v2.0
Provides helper functions for draft order calculation and cap space validation.
"""
import math
from typing import Tuple
from utils.logging import get_contextual_logger
from config import get_config
logger = get_contextual_logger(__name__)
def calculate_pick_details(overall: int) -> Tuple[int, int]:
"""
Calculate round number and pick position from overall pick number.
Hybrid draft format:
- Rounds 1-10: Linear (same order every round)
- Rounds 11+: Snake (reverse order on even rounds)
Special rule: Round 11, Pick 1 belongs to the team that had Round 10, Pick 16
(last pick of linear rounds transitions to first pick of snake rounds).
Args:
overall: Overall pick number (1-512 for 32-round, 16-team draft)
Returns:
(round_num, position): Round number (1-32) and position within round (1-16)
Examples:
>>> calculate_pick_details(1)
(1, 1) # Round 1, Pick 1
>>> calculate_pick_details(16)
(1, 16) # Round 1, Pick 16
>>> calculate_pick_details(160)
(10, 16) # Round 10, Pick 16 (last linear pick)
>>> calculate_pick_details(161)
(11, 1) # Round 11, Pick 1 (first snake pick - same team as 160)
>>> calculate_pick_details(176)
(11, 16) # Round 11, Pick 16
>>> calculate_pick_details(177)
(12, 16) # Round 12, Pick 16 (snake reverses)
"""
config = get_config()
team_count = config.draft_team_count
linear_rounds = config.draft_linear_rounds
round_num = math.ceil(overall / team_count)
if round_num <= linear_rounds:
# Linear draft: position is same calculation every round
position = ((overall - 1) % team_count) + 1
else:
# Snake draft: reverse on even rounds
if round_num % 2 == 1: # Odd rounds (11, 13, 15...)
position = ((overall - 1) % team_count) + 1
else: # Even rounds (12, 14, 16...)
position = team_count - ((overall - 1) % team_count)
return round_num, position
def calculate_overall_from_round_position(round_num: int, position: int) -> int:
"""
Calculate overall pick number from round and position.
Inverse operation of calculate_pick_details().
Args:
round_num: Round number (1-32)
position: Position within round (1-16)
Returns:
Overall pick number
Examples:
>>> calculate_overall_from_round_position(1, 1)
1
>>> calculate_overall_from_round_position(10, 16)
160
>>> calculate_overall_from_round_position(11, 1)
161
>>> calculate_overall_from_round_position(12, 16)
177
"""
config = get_config()
team_count = config.draft_team_count
linear_rounds = config.draft_linear_rounds
if round_num <= linear_rounds:
# Linear draft
return (round_num - 1) * team_count + position
else:
# Snake draft
picks_before_round = (round_num - 1) * team_count
if round_num % 2 == 1: # Odd snake rounds
return picks_before_round + position
else: # Even snake rounds (reversed)
return picks_before_round + (team_count + 1 - position)
async def validate_cap_space(
roster: dict,
new_player_wara: float
) -> Tuple[bool, float]:
"""
Validate team has cap space to draft player.
Cap calculation:
- Maximum 32 players on active roster
- Only top 26 players count toward cap
- Cap limit: 32.00 sWAR total
Args:
roster: Roster dictionary from API with structure:
{
'active': {
'players': [{'id': int, 'name': str, 'wara': float}, ...],
'WARa': float # Current roster sWAR
}
}
new_player_wara: sWAR value of player being drafted
Returns:
(valid, projected_total): True if under cap, projected total sWAR after addition
Raises:
ValueError: If roster structure is invalid
"""
config = get_config()
cap_limit = config.swar_cap_limit
cap_player_count = config.cap_player_count
if not roster or not roster.get('active'):
raise ValueError("Invalid roster structure - missing 'active' key")
active_roster = roster['active']
current_players = active_roster.get('players', [])
# Calculate how many players count toward cap after adding new player
current_roster_size = len(current_players)
projected_roster_size = current_roster_size + 1
# Maximum zeroes = 32 - roster size
# Maximum counted = 26 - zeroes
max_zeroes = 32 - projected_roster_size
max_counted = min(cap_player_count, cap_player_count - max_zeroes) # Can't count more than cap_player_count
# Sort all players (including new) by sWAR descending
all_players_wara = [p['wara'] for p in current_players] + [new_player_wara]
sorted_wara = sorted(all_players_wara, reverse=True)
# Sum top N players
projected_total = sum(sorted_wara[:max_counted])
# Allow tiny floating point tolerance
is_valid = projected_total <= (cap_limit + 0.00001)
logger.debug(
f"Cap validation: roster_size={current_roster_size}, "
f"projected_size={projected_roster_size}, "
f"max_counted={max_counted}, "
f"new_player_wara={new_player_wara:.2f}, "
f"projected_total={projected_total:.2f}, "
f"valid={is_valid}"
)
return is_valid, projected_total
def format_pick_display(overall: int) -> str:
"""
Format pick number for display.
Args:
overall: Overall pick number
Returns:
Formatted string like "Round 1, Pick 3 (Overall #3)"
Examples:
>>> format_pick_display(1)
"Round 1, Pick 1 (Overall #1)"
>>> format_pick_display(45)
"Round 3, Pick 13 (Overall #45)"
"""
round_num, position = calculate_pick_details(overall)
return f"Round {round_num}, Pick {position} (Overall #{overall})"
def get_next_pick_overall(current_overall: int) -> int:
"""
Get the next overall pick number.
Simply increments by 1, but provided for completeness and future logic changes.
Args:
current_overall: Current overall pick number
Returns:
Next overall pick number
"""
return current_overall + 1
def is_draft_complete(current_overall: int, total_picks: int = None) -> bool:
"""
Check if draft is complete.
Args:
current_overall: Current overall pick number
total_picks: Total number of picks in draft (None uses config value)
Returns:
True if draft is complete
"""
if total_picks is None:
config = get_config()
total_picks = config.draft_total_picks
return current_overall > total_picks
def get_round_name(round_num: int) -> str:
"""
Get display name for round.
Args:
round_num: Round number
Returns:
Display name like "Round 1" or "Round 11 (Snake Draft Begins)"
"""
if round_num == 1:
return "Round 1"
elif round_num == 11:
return "Round 11 (Snake Draft Begins)"
else:
return f"Round {round_num}"

465
views/draft_views.py Normal file
View File

@ -0,0 +1,465 @@
"""
Draft Views for Discord Bot v2.0
Provides embeds and UI components for draft system.
"""
from typing import Optional, List
from datetime import datetime
import discord
from models.draft_pick import DraftPick
from models.draft_data import DraftData
from models.team import Team
from models.player import Player
from models.draft_list import DraftList
from views.embeds import EmbedTemplate, EmbedColors
from utils.draft_helpers import format_pick_display, get_round_name
from config import get_config
async def create_on_the_clock_embed(
current_pick: DraftPick,
draft_data: DraftData,
recent_picks: List[DraftPick],
upcoming_picks: List[DraftPick],
team_roster_swar: Optional[float] = None
) -> discord.Embed:
"""
Create "on the clock" embed showing current pick info.
Args:
current_pick: Current DraftPick being made
draft_data: Current draft configuration
recent_picks: List of recent draft picks
upcoming_picks: List of upcoming draft picks
team_roster_swar: Current team sWAR (optional)
Returns:
Discord embed with pick information
"""
if not current_pick.owner:
raise ValueError("Pick must have owner")
# Create base embed with team colors
embed = EmbedTemplate.create_base_embed(
title=f"{current_pick.owner.lname} On The Clock",
description=format_pick_display(current_pick.overall),
color=EmbedColors.PRIMARY
)
# Add team info
if current_pick.owner.sname:
embed.add_field(
name="Team",
value=f"{current_pick.owner.abbrev} {current_pick.owner.sname}",
inline=True
)
# Add timer info
if draft_data.pick_deadline:
deadline_timestamp = int(draft_data.pick_deadline.timestamp())
embed.add_field(
name="Deadline",
value=f"<t:{deadline_timestamp}:R>",
inline=True
)
# Add team sWAR if provided
if team_roster_swar is not None:
config = get_config()
embed.add_field(
name="Current sWAR",
value=f"{team_roster_swar:.2f} / {config.swar_cap_limit:.2f}",
inline=True
)
# Add recent picks
if recent_picks:
recent_str = ""
for pick in recent_picks[:5]:
if pick.player:
recent_str += f"**#{pick.overall}** - {pick.player.name}\n"
if recent_str:
embed.add_field(
name="📋 Last 5 Picks",
value=recent_str or "None",
inline=False
)
# Add upcoming picks
if upcoming_picks:
upcoming_str = ""
for pick in upcoming_picks[:5]:
upcoming_str += f"**#{pick.overall}** - {pick.owner.sname if pick.owner else 'Unknown'}\n"
if upcoming_str:
embed.add_field(
name="🔜 Next 5 Picks",
value=upcoming_str,
inline=False
)
# Add footer
if current_pick.is_traded:
embed.set_footer(text="📝 This pick was traded")
return embed
async def create_draft_status_embed(
draft_data: DraftData,
current_pick: DraftPick,
lock_status: str = "🔓 No pick in progress"
) -> discord.Embed:
"""
Create draft status embed showing current state.
Args:
draft_data: Current draft configuration
current_pick: Current DraftPick
lock_status: Lock status message
Returns:
Discord embed with draft status
"""
embed = EmbedTemplate.info(
title="Draft Status",
description=f"Currently on {format_pick_display(draft_data.currentpick)}"
)
# On the clock
if current_pick.owner:
embed.add_field(
name="On the Clock",
value=f"{current_pick.owner.abbrev} {current_pick.owner.sname}",
inline=True
)
# Timer status
timer_status = "✅ Active" if draft_data.timer else "⏹️ Inactive"
embed.add_field(
name="Timer",
value=f"{timer_status} ({draft_data.pick_minutes} min)",
inline=True
)
# Deadline
if draft_data.pick_deadline:
deadline_timestamp = int(draft_data.pick_deadline.timestamp())
embed.add_field(
name="Deadline",
value=f"<t:{deadline_timestamp}:R>",
inline=True
)
else:
embed.add_field(
name="Deadline",
value="None",
inline=True
)
# Lock status
embed.add_field(
name="Lock Status",
value=lock_status,
inline=False
)
return embed
async def create_player_draft_card(
player: Player,
draft_pick: DraftPick
) -> discord.Embed:
"""
Create player draft card embed.
Args:
player: Player being drafted
draft_pick: DraftPick information
Returns:
Discord embed with player info
"""
if not draft_pick.owner:
raise ValueError("Pick must have owner")
embed = EmbedTemplate.success(
title=f"{player.name} Drafted!",
description=format_pick_display(draft_pick.overall)
)
# Team info
embed.add_field(
name="Selected By",
value=f"{draft_pick.owner.abbrev} {draft_pick.owner.sname}",
inline=True
)
# Player info
if hasattr(player, 'pos_1') and player.pos_1:
embed.add_field(
name="Position",
value=player.pos_1,
inline=True
)
if hasattr(player, 'wara') and player.wara is not None:
embed.add_field(
name="sWAR",
value=f"{player.wara:.2f}",
inline=True
)
# Add player image if available
if hasattr(player, 'image') and player.image:
embed.set_thumbnail(url=player.image)
return embed
async def create_draft_list_embed(
team: Team,
draft_list: List[DraftList]
) -> discord.Embed:
"""
Create draft list embed showing team's auto-draft queue.
Args:
team: Team owning the list
draft_list: List of DraftList entries
Returns:
Discord embed with draft list
"""
embed = EmbedTemplate.info(
title=f"{team.sname} Draft List",
description=f"Auto-draft queue for {team.abbrev}"
)
if not draft_list:
embed.add_field(
name="Queue Empty",
value="No players in auto-draft queue",
inline=False
)
else:
# Group players by rank
list_str = ""
for entry in draft_list[:25]: # Limit to 25 for embed size
player_name = entry.player.name if entry.player else f"Player {entry.player_id}"
player_swar = f" ({entry.player.wara:.2f})" if entry.player and hasattr(entry.player, 'wara') else ""
list_str += f"**{entry.rank}.** {player_name}{player_swar}\n"
embed.add_field(
name=f"Queue ({len(draft_list)} players)",
value=list_str,
inline=False
)
embed.set_footer(text="Use /draft-list to manage your auto-draft queue")
return embed
async def create_draft_board_embed(
round_num: int,
picks: List[DraftPick]
) -> discord.Embed:
"""
Create draft board embed showing all picks in a round.
Args:
round_num: Round number
picks: List of DraftPick for this round
Returns:
Discord embed with draft board
"""
embed = EmbedTemplate.create_base_embed(
title=f"📋 {get_round_name(round_num)}",
description=f"Draft board for round {round_num}",
color=EmbedColors.PRIMARY
)
if not picks:
embed.add_field(
name="No Picks",
value="No picks found for this round",
inline=False
)
else:
# Create picks display
picks_str = ""
for pick in picks:
if pick.player:
player_display = pick.player.name
else:
player_display = "TBD"
team_display = pick.owner.abbrev if pick.owner else "???"
picks_str += f"**Pick {pick.overall % 16 or 16}:** {team_display} - {player_display}\n"
embed.add_field(
name="Picks",
value=picks_str,
inline=False
)
embed.set_footer(text="Use /draft-board [round] to view different rounds")
return embed
async def create_pick_illegal_embed(
reason: str,
details: Optional[str] = None
) -> discord.Embed:
"""
Create embed for illegal pick attempt.
Args:
reason: Main reason pick is illegal
details: Additional details (optional)
Returns:
Discord error embed
"""
embed = EmbedTemplate.error(
title="Invalid Pick",
description=reason
)
if details:
embed.add_field(
name="Details",
value=details,
inline=False
)
return embed
async def create_pick_success_embed(
player: Player,
team: Team,
pick_overall: int,
projected_swar: float
) -> discord.Embed:
"""
Create embed for successful pick.
Args:
player: Player drafted
team: Team that drafted player
pick_overall: Overall pick number
projected_swar: Projected team sWAR after pick
Returns:
Discord success embed
"""
embed = EmbedTemplate.success(
title="Pick Confirmed",
description=f"{team.abbrev} selects **{player.name}**"
)
embed.add_field(
name="Pick",
value=format_pick_display(pick_overall),
inline=True
)
if hasattr(player, 'wara') and player.wara is not None:
embed.add_field(
name="Player sWAR",
value=f"{player.wara:.2f}",
inline=True
)
config = get_config()
embed.add_field(
name="Projected Team sWAR",
value=f"{projected_swar:.2f} / {config.swar_cap_limit:.2f}",
inline=True
)
return embed
async def create_admin_draft_info_embed(
draft_data: DraftData,
current_pick: Optional[DraftPick] = None
) -> discord.Embed:
"""
Create detailed admin view of draft status.
Args:
draft_data: Current draft configuration
current_pick: Current DraftPick (optional)
Returns:
Discord embed with admin information
"""
embed = EmbedTemplate.info(
title="⚙️ Draft Administration",
description="Current draft configuration and state"
)
# Current pick
embed.add_field(
name="Current Pick",
value=str(draft_data.currentpick),
inline=True
)
# Timer status
timer_emoji = "" if draft_data.timer else "⏹️"
embed.add_field(
name="Timer Status",
value=f"{timer_emoji} {'Active' if draft_data.timer else 'Inactive'}",
inline=True
)
# Timer duration
embed.add_field(
name="Pick Duration",
value=f"{draft_data.pick_minutes} minutes",
inline=True
)
# Channels
ping_channel_value = f"<#{draft_data.ping_channel}>" if draft_data.ping_channel else "Not configured"
embed.add_field(
name="Ping Channel",
value=ping_channel_value,
inline=True
)
result_channel_value = f"<#{draft_data.result_channel}>" if draft_data.result_channel else "Not configured"
embed.add_field(
name="Result Channel",
value=result_channel_value,
inline=True
)
# Deadline
if draft_data.pick_deadline:
deadline_timestamp = int(draft_data.pick_deadline.timestamp())
embed.add_field(
name="Current Deadline",
value=f"<t:{deadline_timestamp}:F>",
inline=True
)
# Current pick owner
if current_pick and current_pick.owner:
embed.add_field(
name="On The Clock",
value=f"{current_pick.owner.abbrev} {current_pick.owner.sname}",
inline=False
)
embed.set_footer(text="Use /draft-admin to modify draft settings")
return embed