fix: batch quick-wins — 4 issues resolved (closes #37, #27, #25, #38)

- #37: Fix stale comment in transaction_freeze.py referencing wrong moveid format
- #27: Change config.testing default from True to False (was masking prod behavior)
- #25: Replace deprecated asyncio.get_event_loop() with get_running_loop()
- #38: Replace naive datetime.now() with timezone-aware datetime.now(UTC) across
  7 source files and 4 test files to prevent subtle timezone bugs

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Cal Corum 2026-02-20 11:48:16 -06:00
parent f64fee8d2e
commit 9cd577cba1
16 changed files with 1785 additions and 1511 deletions

View File

@ -3,10 +3,11 @@ Draft Pick Commands
Implements slash commands for making draft picks with global lock protection.
"""
import asyncio
import re
from typing import List, Optional
from datetime import datetime
from datetime import UTC, datetime
import discord
from discord.ext import commands
@ -27,7 +28,7 @@ from views.draft_views import (
create_player_draft_card,
create_pick_illegal_embed,
create_pick_success_embed,
create_on_clock_announcement_embed
create_on_clock_announcement_embed,
)
@ -53,7 +54,7 @@ def _parse_player_name(raw_input: str) -> str:
# Pattern: "Player Name (POS) - X.XX sWAR"
# Position can be letters or numbers (e.g., SS, RP, 1B, 2B, 3B, OF)
# Extract just the player name before the opening parenthesis
match = re.match(r'^(.+?)\s*\([A-Z0-9]+\)\s*-\s*[\d.]+\s*sWAR$', raw_input)
match = re.match(r"^(.+?)\s*\([A-Z0-9]+\)\s*-\s*[\d.]+\s*sWAR$", raw_input)
if match:
return match.group(1).strip()
@ -73,9 +74,7 @@ async def fa_player_autocomplete(
config = get_config()
# Search for FA players only
players = await player_service.search_players(
current,
limit=25,
season=config.sba_season
current, limit=25, season=config.sba_season
)
# Filter to FA team
@ -84,7 +83,7 @@ async def fa_player_autocomplete(
return [
discord.app_commands.Choice(
name=f"{p.name} ({p.primary_position}) - {p.wara:.2f} sWAR",
value=p.name
value=p.name,
)
for p in fa_players[:25]
]
@ -98,7 +97,7 @@ class DraftPicksCog(commands.Cog):
def __init__(self, bot: commands.Bot):
self.bot = bot
self.logger = get_contextual_logger(f'{__name__}.DraftPicksCog')
self.logger = get_contextual_logger(f"{__name__}.DraftPicksCog")
# GLOBAL PICK LOCK (local only - not in database)
self.pick_lock = asyncio.Lock()
@ -107,7 +106,7 @@ class DraftPicksCog(commands.Cog):
@discord.app_commands.command(
name="draft",
description="Make a draft pick (autocomplete shows FA players only)"
description="Make a draft pick (autocomplete shows FA players only)",
)
@discord.app_commands.describe(
player="Player name to draft (autocomplete shows available FA players)"
@ -116,18 +115,14 @@ class DraftPicksCog(commands.Cog):
@requires_draft_period
@requires_team()
@logged_command("/draft")
async def draft_pick(
self,
interaction: discord.Interaction,
player: str
):
async def draft_pick(self, interaction: discord.Interaction, player: str):
"""Make a draft pick with global lock protection."""
await interaction.response.defer()
# Check if lock is held
if self.pick_lock.locked():
if self.lock_acquired_at:
time_held = (datetime.now() - self.lock_acquired_at).total_seconds()
time_held = (datetime.now(UTC) - self.lock_acquired_at).total_seconds()
if time_held > 30:
# STALE LOCK: Auto-override after 30 seconds
@ -140,14 +135,14 @@ class DraftPicksCog(commands.Cog):
embed = await create_pick_illegal_embed(
"Pick In Progress",
f"Another manager is currently making a pick. "
f"Please wait approximately {30 - int(time_held)} seconds."
f"Please wait approximately {30 - int(time_held)} seconds.",
)
await interaction.followup.send(embed=embed)
return
# Acquire global lock
async with self.pick_lock:
self.lock_acquired_at = datetime.now()
self.lock_acquired_at = datetime.now(UTC)
self.lock_acquired_by = interaction.user.id
try:
@ -157,9 +152,7 @@ class DraftPicksCog(commands.Cog):
self.lock_acquired_by = None
async def _process_draft_pick(
self,
interaction: discord.Interaction,
player_name: str
self, interaction: discord.Interaction, player_name: str
):
"""
Process draft pick with validation.
@ -176,14 +169,12 @@ class DraftPicksCog(commands.Cog):
# Get user's team (CACHED via @cached_single_item)
team = await team_service.get_team_by_owner(
interaction.user.id,
config.sba_season
interaction.user.id, config.sba_season
)
if not team:
embed = await create_pick_illegal_embed(
"Not a GM",
"You are not registered as a team owner."
"Not a GM", "You are not registered as a team owner."
)
await interaction.followup.send(embed=embed)
return
@ -192,8 +183,7 @@ class DraftPicksCog(commands.Cog):
draft_data = await draft_service.get_draft_data()
if not draft_data:
embed = await create_pick_illegal_embed(
"Draft Not Found",
"Could not retrieve draft configuration."
"Draft Not Found", "Could not retrieve draft configuration."
)
await interaction.followup.send(embed=embed)
return
@ -202,21 +192,19 @@ class DraftPicksCog(commands.Cog):
if draft_data.paused:
embed = await create_pick_illegal_embed(
"Draft Paused",
"The draft is currently paused. Please wait for an administrator to resume."
"The draft is currently paused. Please wait for an administrator to resume.",
)
await interaction.followup.send(embed=embed)
return
# Get current pick
current_pick = await draft_pick_service.get_pick(
config.sba_season,
draft_data.currentpick
config.sba_season, draft_data.currentpick
)
if not current_pick or not current_pick.owner:
embed = await create_pick_illegal_embed(
"Invalid Pick",
f"Could not retrieve pick #{draft_data.currentpick}."
"Invalid Pick", f"Could not retrieve pick #{draft_data.currentpick}."
)
await interaction.followup.send(embed=embed)
return
@ -227,16 +215,14 @@ class DraftPicksCog(commands.Cog):
if current_pick.owner.id != team.id:
# Not on the clock - check for skipped picks
skipped_picks = await draft_pick_service.get_skipped_picks_for_team(
config.sba_season,
team.id,
draft_data.currentpick
config.sba_season, team.id, draft_data.currentpick
)
if not skipped_picks:
# No skipped picks - can't draft
embed = await create_pick_illegal_embed(
"Not Your Turn",
f"{current_pick.owner.sname} is on the clock for {format_pick_display(current_pick.overall)}."
f"{current_pick.owner.sname} is on the clock for {format_pick_display(current_pick.overall)}.",
)
await interaction.followup.send(embed=embed)
return
@ -249,12 +235,13 @@ class DraftPicksCog(commands.Cog):
)
# Get player
players = await player_service.get_players_by_name(player_name, config.sba_season)
players = await player_service.get_players_by_name(
player_name, config.sba_season
)
if not players:
embed = await create_pick_illegal_embed(
"Player Not Found",
f"Could not find player '{player_name}'."
"Player Not Found", f"Could not find player '{player_name}'."
)
await interaction.followup.send(embed=embed)
return
@ -264,55 +251,52 @@ class DraftPicksCog(commands.Cog):
# Validate player is FA
if player_obj.team_id != config.free_agent_team_id:
embed = await create_pick_illegal_embed(
"Player Not Available",
f"{player_obj.name} is not a free agent."
"Player Not Available", f"{player_obj.name} is not a free agent."
)
await interaction.followup.send(embed=embed)
return
# Validate cap space
roster = await team_service.get_team_roster(team.id, 'current')
roster = await team_service.get_team_roster(team.id, "current")
if not roster:
embed = await create_pick_illegal_embed(
"Roster Error",
f"Could not retrieve roster for {team.abbrev}."
"Roster Error", f"Could not retrieve roster for {team.abbrev}."
)
await interaction.followup.send(embed=embed)
return
is_valid, projected_total, cap_limit = await validate_cap_space(roster, player_obj.wara, team)
is_valid, projected_total, cap_limit = await validate_cap_space(
roster, player_obj.wara, team
)
if not is_valid:
embed = await create_pick_illegal_embed(
"Cap Space Exceeded",
f"Drafting {player_obj.name} would put you at {projected_total:.2f} sWAR (limit: {cap_limit:.2f})."
f"Drafting {player_obj.name} would put you at {projected_total:.2f} sWAR (limit: {cap_limit:.2f}).",
)
await interaction.followup.send(embed=embed)
return
# Execute pick (using pick_to_use which may be current or skipped pick)
updated_pick = await draft_pick_service.update_pick_selection(
pick_to_use.id,
player_obj.id
pick_to_use.id, player_obj.id
)
if not updated_pick:
embed = await create_pick_illegal_embed(
"Pick Failed",
"Failed to update draft pick. Please try again."
"Pick Failed", "Failed to update draft pick. Please try again."
)
await interaction.followup.send(embed=embed)
return
# Get current league state for dem_week calculation
from services.league_service import league_service
current = await league_service.get_current_state()
# Update player team with dem_week set to current.week + 2 for draft picks
updated_player = await player_service.update_player_team(
player_obj.id,
team.id,
dem_week=current.week + 2 if current else None
player_obj.id, team.id, dem_week=current.week + 2 if current else None
)
if not updated_player:
@ -324,7 +308,7 @@ class DraftPicksCog(commands.Cog):
pick=pick_to_use,
player=player_obj,
team=team,
guild=interaction.guild
guild=interaction.guild,
)
# Determine if this was a skipped pick
@ -332,11 +316,7 @@ class DraftPicksCog(commands.Cog):
# Send success message
success_embed = await create_pick_success_embed(
player_obj,
team,
pick_to_use.overall,
projected_total,
cap_limit
player_obj, team, pick_to_use.overall, projected_total, cap_limit
)
# Add note if this was a skipped pick
@ -348,7 +328,10 @@ class DraftPicksCog(commands.Cog):
await interaction.followup.send(embed=success_embed)
# Post draft card to ping channel (only if different from command channel)
if draft_data.ping_channel and draft_data.ping_channel != interaction.channel_id:
if (
draft_data.ping_channel
and draft_data.ping_channel != interaction.channel_id
):
guild = interaction.guild
if guild:
ping_channel = guild.get_channel(draft_data.ping_channel)
@ -369,7 +352,9 @@ class DraftPicksCog(commands.Cog):
if guild:
result_channel = guild.get_channel(draft_data.result_channel)
if result_channel:
result_card = await create_player_draft_card(player_obj, pick_to_use)
result_card = await create_player_draft_card(
player_obj, pick_to_use
)
# Add skipped pick context to result card
if is_skipped_pick:
@ -379,7 +364,9 @@ class DraftPicksCog(commands.Cog):
await result_channel.send(embed=result_card)
else:
self.logger.warning(f"Could not find result channel {draft_data.result_channel}")
self.logger.warning(
f"Could not find result channel {draft_data.result_channel}"
)
# Only advance the draft if this was the current pick (not a skipped pick)
if not is_skipped_pick:
@ -391,8 +378,7 @@ class DraftPicksCog(commands.Cog):
ping_channel = guild.get_channel(draft_data.ping_channel)
if ping_channel:
await self._post_on_clock_announcement(
ping_channel=ping_channel,
guild=guild
ping_channel=ping_channel, guild=guild
)
self.logger.info(
@ -402,12 +388,7 @@ class DraftPicksCog(commands.Cog):
)
async def _write_pick_to_sheets(
self,
draft_data,
pick,
player,
team,
guild: Optional[discord.Guild]
self, draft_data, pick, player, team, guild: Optional[discord.Guild]
):
"""
Write pick to Google Sheets (fire-and-forget with ping channel notification on failure).
@ -426,10 +407,12 @@ class DraftPicksCog(commands.Cog):
success = await draft_sheet_service.write_pick(
season=config.sba_season,
overall=pick.overall,
orig_owner_abbrev=pick.origowner.abbrev if pick.origowner else team.abbrev,
orig_owner_abbrev=(
pick.origowner.abbrev if pick.origowner else team.abbrev
),
owner_abbrev=team.abbrev,
player_name=player.name,
swar=player.wara
swar=player.wara,
)
if not success:
@ -439,7 +422,7 @@ class DraftPicksCog(commands.Cog):
channel_id=draft_data.ping_channel,
pick_overall=pick.overall,
player_name=player.name,
reason="Sheet write returned failure"
reason="Sheet write returned failure",
)
except Exception as e:
@ -450,7 +433,7 @@ class DraftPicksCog(commands.Cog):
channel_id=draft_data.ping_channel,
pick_overall=pick.overall,
player_name=player.name,
reason=str(e)
reason=str(e),
)
async def _notify_sheet_failure(
@ -459,7 +442,7 @@ class DraftPicksCog(commands.Cog):
channel_id: Optional[int],
pick_overall: int,
player_name: str,
reason: str
reason: str,
):
"""
Post notification to ping channel when sheet write fails.
@ -476,7 +459,7 @@ class DraftPicksCog(commands.Cog):
try:
channel = guild.get_channel(channel_id)
if channel and hasattr(channel, 'send'):
if channel and hasattr(channel, "send"):
await channel.send(
f"⚠️ **Sheet Sync Failed** - Pick #{pick_overall} ({player_name}) "
f"was not written to the draft sheet. "
@ -486,9 +469,7 @@ class DraftPicksCog(commands.Cog):
self.logger.error(f"Failed to send sheet failure notification: {e}")
async def _post_on_clock_announcement(
self,
ping_channel,
guild: discord.Guild
self, ping_channel, guild: discord.Guild
) -> None:
"""
Post the on-clock announcement embed for the next team with role ping.
@ -510,23 +491,26 @@ class DraftPicksCog(commands.Cog):
# Get the new current pick
next_pick = await draft_pick_service.get_pick(
config.sba_season,
updated_draft_data.currentpick
config.sba_season, updated_draft_data.currentpick
)
if not next_pick or not next_pick.owner:
self.logger.error(f"Could not get pick #{updated_draft_data.currentpick} for announcement")
self.logger.error(
f"Could not get pick #{updated_draft_data.currentpick} for announcement"
)
return
# Get recent picks (last 5 completed)
recent_picks = await draft_pick_service.get_recent_picks(
config.sba_season,
updated_draft_data.currentpick - 1, # Start from previous pick
limit=5
limit=5,
)
# Get team roster for sWAR calculation
team_roster = await roster_service.get_team_roster(next_pick.owner.id, "current")
team_roster = await roster_service.get_team_roster(
next_pick.owner.id, "current"
)
roster_swar = team_roster.total_wara if team_roster else 0.0
cap_limit = get_team_salary_cap(next_pick.owner)
@ -534,7 +518,9 @@ class DraftPicksCog(commands.Cog):
top_roster_players = []
if team_roster:
all_players = team_roster.all_players
sorted_players = sorted(all_players, key=lambda p: p.wara if p.wara else 0.0, reverse=True)
sorted_players = sorted(
all_players, key=lambda p: p.wara if p.wara else 0.0, reverse=True
)
top_roster_players = sorted_players[:5]
# Get sheet URL
@ -548,7 +534,7 @@ class DraftPicksCog(commands.Cog):
roster_swar=roster_swar,
cap_limit=cap_limit,
top_roster_players=top_roster_players,
sheet_url=sheet_url
sheet_url=sheet_url,
)
# Mention the team's role (using team.lname)
@ -557,10 +543,14 @@ class DraftPicksCog(commands.Cog):
if team_role:
team_mention = f"{team_role.mention} "
else:
self.logger.warning(f"Could not find role for team {next_pick.owner.lname}")
self.logger.warning(
f"Could not find role for team {next_pick.owner.lname}"
)
await ping_channel.send(content=team_mention, embed=embed)
self.logger.info(f"Posted on-clock announcement for pick #{updated_draft_data.currentpick}")
self.logger.info(
f"Posted on-clock announcement for pick #{updated_draft_data.currentpick}"
)
except Exception as e:
self.logger.error("Error posting on-clock announcement", error=e)

View File

@ -87,7 +87,7 @@ class BotConfig(BaseSettings):
# Application settings
log_level: str = "INFO"
environment: str = "development"
testing: bool = True
testing: bool = False
# Google Sheets settings
sheets_credentials_path: str = "/app/data/major-domo-service-creds.json"

View File

@ -3,7 +3,8 @@ Custom Command models for Discord Bot v2.0
Modern Pydantic models for the custom command system with full type safety.
"""
from datetime import datetime
from datetime import UTC, datetime
from typing import Optional
import re
@ -13,6 +14,7 @@ from models.base import SBABaseModel
class CustomCommandCreator(SBABaseModel):
"""Creator of custom commands."""
id: int = Field(..., description="Database ID") # type: ignore
discord_id: int = Field(..., description="Discord user ID")
username: str = Field(..., description="Discord username")
@ -24,16 +26,21 @@ class CustomCommandCreator(SBABaseModel):
class CustomCommand(SBABaseModel):
"""A custom command created by a user."""
id: int = Field(..., description="Database ID") # type: ignore
name: str = Field(..., description="Command name (unique)")
content: str = Field(..., description="Command response content")
creator_id: Optional[int] = Field(None, description="ID of the creator (may be missing from execute endpoint)")
creator_id: Optional[int] = Field(
None, description="ID of the creator (may be missing from execute endpoint)"
)
creator: Optional[CustomCommandCreator] = Field(None, description="Creator details")
# Timestamps
created_at: datetime = Field(..., description="When command was created") # type: ignore
updated_at: Optional[datetime] = Field(None, description="When command was last updated") # type: ignore
last_used: Optional[datetime] = Field(None, description="When command was last executed")
last_used: Optional[datetime] = Field(
None, description="When command was last executed"
)
# Usage tracking
use_count: int = Field(0, description="Total times command has been used")
@ -41,9 +48,11 @@ class CustomCommand(SBABaseModel):
# Metadata
is_active: bool = Field(True, description="Whether command is currently active")
tags: Optional[list[str]] = Field(None, description="Optional tags for categorization")
tags: Optional[list[str]] = Field(
None, description="Optional tags for categorization"
)
@field_validator('name')
@field_validator("name")
@classmethod
def validate_name(cls, v):
"""Validate command name."""
@ -59,20 +68,33 @@ class CustomCommand(SBABaseModel):
raise ValueError("Command name cannot exceed 32 characters")
# Character validation - only allow alphanumeric, dashes, underscores
if not re.match(r'^[a-z0-9_-]+$', name):
raise ValueError("Command name can only contain letters, numbers, dashes, and underscores")
if not re.match(r"^[a-z0-9_-]+$", name):
raise ValueError(
"Command name can only contain letters, numbers, dashes, and underscores"
)
# Reserved names
reserved = {
'help', 'ping', 'info', 'list', 'create', 'delete', 'edit',
'admin', 'mod', 'owner', 'bot', 'system', 'config'
"help",
"ping",
"info",
"list",
"create",
"delete",
"edit",
"admin",
"mod",
"owner",
"bot",
"system",
"config",
}
if name in reserved:
raise ValueError(f"'{name}' is a reserved command name")
return name.lower()
@field_validator('content')
@field_validator("content")
@classmethod
def validate_content(cls, v):
"""Validate command content."""
@ -86,7 +108,7 @@ class CustomCommand(SBABaseModel):
raise ValueError("Command content cannot exceed 2000 characters")
# Basic content filtering
prohibited = ['@everyone', '@here']
prohibited = ["@everyone", "@here"]
content_lower = content.lower()
for term in prohibited:
if term in content_lower:
@ -99,7 +121,7 @@ class CustomCommand(SBABaseModel):
"""Calculate days since last use."""
if not self.last_used:
return None
return (datetime.now() - self.last_used).days
return (datetime.now(UTC) - self.last_used).days
@property
def is_eligible_for_warning(self) -> bool:
@ -143,6 +165,7 @@ class CustomCommand(SBABaseModel):
class CustomCommandSearchFilters(BaseModel):
"""Filters for searching custom commands."""
name_contains: Optional[str] = None
creator_id: Optional[int] = None
creator_name: Optional[str] = None
@ -152,23 +175,33 @@ class CustomCommandSearchFilters(BaseModel):
is_active: bool = True
# Sorting options
sort_by: str = Field('name', description="Sort field: name, created_at, last_used, use_count, popularity")
sort_by: str = Field(
"name",
description="Sort field: name, created_at, last_used, use_count, popularity",
)
sort_desc: bool = Field(False, description="Sort in descending order")
# Pagination
page: int = Field(1, description="Page number (1-based)")
page_size: int = Field(25, description="Items per page")
@field_validator('sort_by')
@field_validator("sort_by")
@classmethod
def validate_sort_by(cls, v):
"""Validate sort field."""
valid_sorts = {'name', 'created_at', 'last_used', 'use_count', 'popularity', 'creator'}
valid_sorts = {
"name",
"created_at",
"last_used",
"use_count",
"popularity",
"creator",
}
if v not in valid_sorts:
raise ValueError(f"sort_by must be one of: {', '.join(valid_sorts)}")
return v
@field_validator('page')
@field_validator("page")
@classmethod
def validate_page(cls, v):
"""Validate page number."""
@ -176,7 +209,7 @@ class CustomCommandSearchFilters(BaseModel):
raise ValueError("Page number must be >= 1")
return v
@field_validator('page_size')
@field_validator("page_size")
@classmethod
def validate_page_size(cls, v):
"""Validate page size."""
@ -187,6 +220,7 @@ class CustomCommandSearchFilters(BaseModel):
class CustomCommandSearchResult(BaseModel):
"""Result of a custom command search."""
commands: list[CustomCommand]
total_count: int
page: int
@ -207,6 +241,7 @@ class CustomCommandSearchResult(BaseModel):
class CustomCommandStats(BaseModel):
"""Statistics about custom commands."""
total_commands: int
active_commands: int
total_creators: int

View File

@ -3,8 +3,9 @@ Draft configuration and state model
Represents the current draft settings and timer state.
"""
from typing import Optional
from datetime import datetime
from datetime import UTC, datetime
from pydantic import Field, field_validator
from models.base import SBABaseModel
@ -15,10 +16,18 @@ class DraftData(SBABaseModel):
currentpick: int = Field(0, description="Current pick number in progress")
timer: bool = Field(False, description="Whether draft timer is active")
paused: bool = Field(False, description="Whether draft is paused (blocks all picks)")
pick_deadline: Optional[datetime] = Field(None, description="Deadline for current pick")
result_channel: Optional[int] = Field(None, description="Discord channel ID for draft results")
ping_channel: Optional[int] = Field(None, description="Discord channel ID for draft pings")
paused: bool = Field(
False, description="Whether draft is paused (blocks all picks)"
)
pick_deadline: Optional[datetime] = Field(
None, description="Deadline for current pick"
)
result_channel: Optional[int] = Field(
None, description="Discord channel ID for draft results"
)
ping_channel: Optional[int] = Field(
None, description="Discord channel ID for draft pings"
)
pick_minutes: int = Field(1, description="Minutes allowed per pick")
@field_validator("result_channel", "ping_channel", mode="before")
@ -41,7 +50,7 @@ class DraftData(SBABaseModel):
"""Check if the current pick deadline has passed."""
if not self.pick_deadline:
return False
return datetime.now() > self.pick_deadline
return datetime.now(UTC) > self.pick_deadline
@property
def can_make_picks(self) -> bool:

View File

@ -5,7 +5,8 @@ Modern Pydantic models for the custom help system with full type safety.
Allows admins and help editors to create custom help topics for league documentation,
resources, FAQs, links, and guides.
"""
from datetime import datetime
from datetime import UTC, datetime
from typing import Optional
import re
@ -15,6 +16,7 @@ from models.base import SBABaseModel
class HelpCommand(SBABaseModel):
"""A help topic created by an admin or help editor."""
id: int = Field(..., description="Database ID") # type: ignore
name: str = Field(..., description="Help topic name (unique)")
title: str = Field(..., description="Display title")
@ -22,17 +24,23 @@ class HelpCommand(SBABaseModel):
category: Optional[str] = Field(None, description="Category for organization")
# Audit fields
created_by_discord_id: str = Field(..., description="Creator Discord ID (stored as text)")
created_by_discord_id: str = Field(
..., description="Creator Discord ID (stored as text)"
)
created_at: datetime = Field(..., description="When help topic was created") # type: ignore
updated_at: Optional[datetime] = Field(None, description="When help topic was last updated") # type: ignore
last_modified_by: Optional[str] = Field(None, description="Discord ID of last editor (stored as text)")
last_modified_by: Optional[str] = Field(
None, description="Discord ID of last editor (stored as text)"
)
# Status and metrics
is_active: bool = Field(True, description="Whether help topic is active (soft delete)")
is_active: bool = Field(
True, description="Whether help topic is active (soft delete)"
)
view_count: int = Field(0, description="Number of times viewed")
display_order: int = Field(0, description="Sort order for display")
@field_validator('name')
@field_validator("name")
@classmethod
def validate_name(cls, v):
"""Validate help topic name."""
@ -48,12 +56,14 @@ class HelpCommand(SBABaseModel):
raise ValueError("Help topic name cannot exceed 32 characters")
# Character validation - only allow alphanumeric, dashes, underscores
if not re.match(r'^[a-z0-9_-]+$', name):
raise ValueError("Help topic name can only contain letters, numbers, dashes, and underscores")
if not re.match(r"^[a-z0-9_-]+$", name):
raise ValueError(
"Help topic name can only contain letters, numbers, dashes, and underscores"
)
return name.lower()
@field_validator('title')
@field_validator("title")
@classmethod
def validate_title(cls, v):
"""Validate help topic title."""
@ -68,7 +78,7 @@ class HelpCommand(SBABaseModel):
return title
@field_validator('content')
@field_validator("content")
@classmethod
def validate_content(cls, v):
"""Validate help topic content."""
@ -86,7 +96,7 @@ class HelpCommand(SBABaseModel):
return content
@field_validator('category')
@field_validator("category")
@classmethod
def validate_category(cls, v):
"""Validate category if provided."""
@ -103,8 +113,10 @@ class HelpCommand(SBABaseModel):
raise ValueError("Category cannot exceed 50 characters")
# Character validation
if not re.match(r'^[a-z0-9_-]+$', category):
raise ValueError("Category can only contain letters, numbers, dashes, and underscores")
if not re.match(r"^[a-z0-9_-]+$", category):
raise ValueError(
"Category can only contain letters, numbers, dashes, and underscores"
)
return category
@ -118,12 +130,12 @@ class HelpCommand(SBABaseModel):
"""Calculate days since last update."""
if not self.updated_at:
return None
return (datetime.now() - self.updated_at).days
return (datetime.now(UTC) - self.updated_at).days
@property
def days_since_creation(self) -> int:
"""Calculate days since creation."""
return (datetime.now() - self.created_at).days
return (datetime.now(UTC) - self.created_at).days
@property
def popularity_score(self) -> float:
@ -153,28 +165,40 @@ class HelpCommand(SBABaseModel):
class HelpCommandSearchFilters(BaseModel):
"""Filters for searching help commands."""
name_contains: Optional[str] = None
category: Optional[str] = None
is_active: bool = True
# Sorting
sort_by: str = Field('name', description="Sort field: name, category, created_at, view_count, display_order")
sort_by: str = Field(
"name",
description="Sort field: name, category, created_at, view_count, display_order",
)
sort_desc: bool = Field(False, description="Sort in descending order")
# Pagination
page: int = Field(1, description="Page number (1-based)")
page_size: int = Field(25, description="Items per page")
@field_validator('sort_by')
@field_validator("sort_by")
@classmethod
def validate_sort_by(cls, v):
"""Validate sort field."""
valid_sorts = {'name', 'title', 'category', 'created_at', 'updated_at', 'view_count', 'display_order'}
valid_sorts = {
"name",
"title",
"category",
"created_at",
"updated_at",
"view_count",
"display_order",
}
if v not in valid_sorts:
raise ValueError(f"sort_by must be one of: {', '.join(valid_sorts)}")
return v
@field_validator('page')
@field_validator("page")
@classmethod
def validate_page(cls, v):
"""Validate page number."""
@ -182,7 +206,7 @@ class HelpCommandSearchFilters(BaseModel):
raise ValueError("Page number must be >= 1")
return v
@field_validator('page_size')
@field_validator("page_size")
@classmethod
def validate_page_size(cls, v):
"""Validate page size."""
@ -193,6 +217,7 @@ class HelpCommandSearchFilters(BaseModel):
class HelpCommandSearchResult(BaseModel):
"""Result of a help command search."""
help_commands: list[HelpCommand]
total_count: int
page: int
@ -213,6 +238,7 @@ class HelpCommandSearchResult(BaseModel):
class HelpCommandStats(BaseModel):
"""Statistics about help commands."""
total_commands: int
active_commands: int
total_views: int

View File

@ -3,8 +3,9 @@ Custom Commands Service for Discord Bot v2.0
Modern async service layer for managing custom commands with full type safety.
"""
import math
from datetime import datetime, timedelta
from datetime import UTC, datetime, timedelta
from typing import Optional, List, Any, Tuple
from utils.logging import get_contextual_logger
@ -13,7 +14,7 @@ from models.custom_command import (
CustomCommandCreator,
CustomCommandSearchFilters,
CustomCommandSearchResult,
CustomCommandStats
CustomCommandStats,
)
from services.base_service import BaseService
from exceptions import BotException
@ -21,16 +22,19 @@ from exceptions import BotException
class CustomCommandNotFoundError(BotException):
"""Raised when a custom command is not found."""
pass
class CustomCommandExistsError(BotException):
"""Raised when trying to create a command that already exists."""
pass
class CustomCommandPermissionError(BotException):
"""Raised when user lacks permission for command operation."""
pass
@ -38,8 +42,8 @@ class CustomCommandsService(BaseService[CustomCommand]):
"""Service for managing custom commands."""
def __init__(self):
super().__init__(CustomCommand, 'custom_commands')
self.logger = get_contextual_logger(f'{__name__}.CustomCommandsService')
super().__init__(CustomCommand, "custom_commands")
self.logger = get_contextual_logger(f"{__name__}.CustomCommandsService")
self.logger.info("CustomCommandsService initialized")
# === Command CRUD Operations ===
@ -51,7 +55,7 @@ class CustomCommandsService(BaseService[CustomCommand]):
creator_discord_id: int,
creator_username: str,
creator_display_name: Optional[str] = None,
tags: Optional[List[str]] = None
tags: Optional[List[str]] = None,
) -> CustomCommand:
"""
Create a new custom command.
@ -83,21 +87,21 @@ class CustomCommandsService(BaseService[CustomCommand]):
creator = await self.get_or_create_creator(
discord_id=creator_discord_id,
username=creator_username,
display_name=creator_display_name
display_name=creator_display_name,
)
# Create command data
now = datetime.now()
now = datetime.now(UTC)
command_data = {
'name': name.lower().strip(),
'content': content.strip(),
'creator_id': creator.id,
'created_at': now.isoformat(),
'last_used': now.isoformat(), # Set initial last_used to creation time
'use_count': 0,
'warning_sent': False,
'is_active': True,
'tags': tags or []
"name": name.lower().strip(),
"content": content.strip(),
"creator_id": creator.id,
"created_at": now.isoformat(),
"last_used": now.isoformat(), # Set initial last_used to creation time
"use_count": 0,
"warning_sent": False,
"is_active": True,
"tags": tags or [],
}
# Create via API
@ -108,18 +112,17 @@ class CustomCommandsService(BaseService[CustomCommand]):
# Update creator stats
await self._update_creator_stats(creator.id)
self.logger.info("Custom command created",
self.logger.info(
"Custom command created",
command_name=name,
creator_id=creator_discord_id,
content_length=len(content))
content_length=len(content),
)
# Return full command with creator info
return await self.get_command_by_name(name)
async def get_command_by_name(
self,
name: str
) -> CustomCommand:
async def get_command_by_name(self, name: str) -> CustomCommand:
"""
Get a custom command by name.
@ -137,7 +140,7 @@ class CustomCommandsService(BaseService[CustomCommand]):
try:
# Use the dedicated by_name endpoint for exact lookup
client = await self.get_client()
data = await client.get(f'custom_commands/by_name/{normalized_name}')
data = await client.get(f"custom_commands/by_name/{normalized_name}")
if not data:
raise CustomCommandNotFoundError(f"Custom command '{name}' not found")
@ -149,9 +152,9 @@ class CustomCommandsService(BaseService[CustomCommand]):
if "404" in str(e) or "not found" in str(e).lower():
raise CustomCommandNotFoundError(f"Custom command '{name}' not found")
else:
self.logger.error("Failed to get command by name",
command_name=name,
error=e)
self.logger.error(
"Failed to get command by name", command_name=name, error=e
)
raise BotException(f"Failed to retrieve command '{name}': {e}")
async def update_command(
@ -159,7 +162,7 @@ class CustomCommandsService(BaseService[CustomCommand]):
name: str,
new_content: str,
updater_discord_id: int,
new_tags: Optional[List[str]] = None
new_tags: Optional[List[str]] = None,
) -> CustomCommand:
"""
Update an existing custom command.
@ -185,40 +188,39 @@ class CustomCommandsService(BaseService[CustomCommand]):
# Prepare update data - include all required fields to avoid NULL constraints
update_data = {
'name': command.name,
'content': new_content.strip(),
'creator_id': command.creator_id,
'created_at': command.created_at.isoformat(), # Preserve original creation time
'updated_at': datetime.now().isoformat(),
'last_used': command.last_used.isoformat() if command.last_used else None,
'warning_sent': False, # Reset warning if command is updated
'is_active': command.is_active, # Preserve active status
'use_count': command.use_count # Preserve usage count
"name": command.name,
"content": new_content.strip(),
"creator_id": command.creator_id,
"created_at": command.created_at.isoformat(), # Preserve original creation time
"updated_at": datetime.now(UTC).isoformat(),
"last_used": command.last_used.isoformat() if command.last_used else None,
"warning_sent": False, # Reset warning if command is updated
"is_active": command.is_active, # Preserve active status
"use_count": command.use_count, # Preserve usage count
}
if new_tags is not None:
update_data['tags'] = new_tags
update_data["tags"] = new_tags
else:
# Preserve existing tags if not being updated
update_data['tags'] = command.tags
update_data["tags"] = command.tags
# Update via API
result = await self.update_item_by_field('name', name, update_data)
result = await self.update_item_by_field("name", name, update_data)
if not result:
raise BotException("Failed to update custom command")
self.logger.info("Custom command updated",
self.logger.info(
"Custom command updated",
command_name=name,
updater_id=updater_discord_id,
new_content_length=len(new_content))
new_content_length=len(new_content),
)
return await self.get_command_by_name(name)
async def delete_command(
self,
name: str,
deleter_discord_id: int,
force: bool = False
self, name: str, deleter_discord_id: int, force: bool = False
) -> bool:
"""
Delete a custom command.
@ -239,20 +241,24 @@ class CustomCommandsService(BaseService[CustomCommand]):
# Check permissions (unless force delete)
if not force and command.creator.discord_id != deleter_discord_id:
raise CustomCommandPermissionError("You can only delete commands you created")
raise CustomCommandPermissionError(
"You can only delete commands you created"
)
# Delete via API
result = await self.delete_item_by_field('name', name)
result = await self.delete_item_by_field("name", name)
if not result:
raise BotException("Failed to delete custom command")
# Update creator stats
await self._update_creator_stats(command.creator_id)
self.logger.info("Custom command deleted",
self.logger.info(
"Custom command deleted",
command_name=name,
deleter_id=deleter_discord_id,
was_forced=force)
was_forced=force,
)
return True
@ -274,7 +280,9 @@ class CustomCommandsService(BaseService[CustomCommand]):
try:
# Use the dedicated execute endpoint which updates stats and returns the command
client = await self.get_client()
data = await client.patch(f'custom_commands/by_name/{normalized_name}/execute')
data = await client.patch(
f"custom_commands/by_name/{normalized_name}/execute"
)
if not data:
raise CustomCommandNotFoundError(f"Custom command '{name}' not found")
@ -282,9 +290,11 @@ class CustomCommandsService(BaseService[CustomCommand]):
# Convert API data to CustomCommand
updated_command = self.model_class.from_api_data(data)
self.logger.debug("Custom command executed",
self.logger.debug(
"Custom command executed",
command_name=name,
new_use_count=updated_command.use_count)
new_use_count=updated_command.use_count,
)
return updated_command, updated_command.content
@ -292,16 +302,15 @@ class CustomCommandsService(BaseService[CustomCommand]):
if "404" in str(e) or "not found" in str(e).lower():
raise CustomCommandNotFoundError(f"Custom command '{name}' not found")
else:
self.logger.error("Failed to execute command",
command_name=name,
error=e)
self.logger.error(
"Failed to execute command", command_name=name, error=e
)
raise BotException(f"Failed to execute command '{name}': {e}")
# === Search and Listing ===
async def search_commands(
self,
filters: CustomCommandSearchFilters
self, filters: CustomCommandSearchFilters
) -> CustomCommandSearchResult:
"""
Search for custom commands with filtering and pagination.
@ -317,25 +326,25 @@ class CustomCommandsService(BaseService[CustomCommand]):
# Apply filters
if filters.name_contains:
params.append(('name__icontains', filters.name_contains))
params.append(("name__icontains", filters.name_contains))
if filters.creator_id:
params.append(('creator_id', filters.creator_id))
params.append(("creator_id", filters.creator_id))
if filters.min_uses:
params.append(('use_count__gte', filters.min_uses))
params.append(("use_count__gte", filters.min_uses))
if filters.max_days_unused:
cutoff_date = datetime.now() - timedelta(days=filters.max_days_unused)
params.append(('last_used__gte', cutoff_date.isoformat()))
cutoff_date = datetime.now(UTC) - timedelta(days=filters.max_days_unused)
params.append(("last_used__gte", cutoff_date.isoformat()))
params.append(('is_active', filters.is_active))
params.append(("is_active", filters.is_active))
# Add sorting
sort_field = filters.sort_by
if filters.sort_desc:
sort_field = f'-{sort_field}'
params.append(('sort', sort_field))
sort_field = f"-{sort_field}"
params.append(("sort", sort_field))
# Get total count for pagination
total_count = await self._get_search_count(params)
@ -343,10 +352,7 @@ class CustomCommandsService(BaseService[CustomCommand]):
# Add pagination
offset = (filters.page - 1) * filters.page_size
params.extend([
('limit', filters.page_size),
('offset', offset)
])
params.extend([("limit", filters.page_size), ("offset", offset)])
# Execute search
commands_data = await self.get_items_with_params(params)
@ -357,10 +363,14 @@ class CustomCommandsService(BaseService[CustomCommand]):
# The API now returns complete creator data, so we can use it directly
commands.append(cmd_data)
self.logger.debug("Custom commands search completed",
self.logger.debug(
"Custom commands search completed",
total_results=total_count,
page=filters.page,
filters_applied=len([p for p in params if not p[0] in ['sort', 'limit', 'offset']]))
filters_applied=len(
[p for p in params if not p[0] in ["sort", "limit", "offset"]]
),
)
return CustomCommandSearchResult(
commands=commands,
@ -368,14 +378,11 @@ class CustomCommandsService(BaseService[CustomCommand]):
page=filters.page,
page_size=filters.page_size,
total_pages=total_pages,
has_more=filters.page < total_pages
has_more=filters.page < total_pages,
)
async def get_commands_by_creator(
self,
creator_discord_id: int,
page: int = 1,
page_size: int = 25
self, creator_discord_id: int, page: int = 1, page_size: int = 25
) -> CustomCommandSearchResult:
"""Get all commands created by a specific user."""
try:
@ -383,14 +390,14 @@ class CustomCommandsService(BaseService[CustomCommand]):
client = await self.get_client()
params = [
('creator_discord_id', creator_discord_id),
('is_active', True),
('sort', 'name'),
('page', page),
('page_size', page_size)
("creator_discord_id", creator_discord_id),
("is_active", True),
("sort", "name"),
("page", page),
("page_size", page_size),
]
data = await client.get('custom_commands', params=params)
data = await client.get("custom_commands", params=params)
if not data:
return CustomCommandSearchResult(
@ -399,14 +406,14 @@ class CustomCommandsService(BaseService[CustomCommand]):
page=page,
page_size=page_size,
total_pages=0,
has_more=False
has_more=False,
)
# Extract response data
custom_commands = data.get('custom_commands', [])
total_count = data.get('total_count', 0)
total_pages = data.get('total_pages', 0)
has_more = data.get('has_more', False)
custom_commands = data.get("custom_commands", [])
total_count = data.get("total_count", 0)
total_pages = data.get("total_pages", 0)
has_more = data.get("has_more", False)
# Convert to CustomCommand objects (creator data is included in API response)
commands = []
@ -414,15 +421,19 @@ class CustomCommandsService(BaseService[CustomCommand]):
try:
commands.append(self.model_class.from_api_data(cmd_data))
except Exception as e:
self.logger.warning("Failed to create CustomCommand from API data",
command_id=cmd_data.get('id'),
error=e)
self.logger.warning(
"Failed to create CustomCommand from API data",
command_id=cmd_data.get("id"),
error=e,
)
continue
self.logger.debug("Got commands by creator",
self.logger.debug(
"Got commands by creator",
creator_discord_id=creator_discord_id,
returned_commands=len(commands),
total_count=total_count)
total_count=total_count,
)
return CustomCommandSearchResult(
commands=commands,
@ -430,13 +441,15 @@ class CustomCommandsService(BaseService[CustomCommand]):
page=page,
page_size=page_size,
total_pages=total_pages,
has_more=has_more
has_more=has_more,
)
except Exception as e:
self.logger.error("Failed to get commands by creator",
self.logger.error(
"Failed to get commands by creator",
creator_discord_id=creator_discord_id,
error=e)
error=e,
)
# Return empty result on error
return CustomCommandSearchResult(
commands=[],
@ -444,16 +457,12 @@ class CustomCommandsService(BaseService[CustomCommand]):
page=page,
page_size=page_size,
total_pages=0,
has_more=False
has_more=False,
)
async def get_popular_commands(self, limit: int = 10) -> List[CustomCommand]:
"""Get the most popular commands by usage."""
params = [
('is_active', True),
('sort', '-use_count'),
('limit', limit)
]
params = [("is_active", True), ("sort", "-use_count"), ("limit", limit)]
commands_data = await self.get_items_with_params(params)
@ -464,19 +473,19 @@ class CustomCommandsService(BaseService[CustomCommand]):
commands.append(CustomCommand(**cmd_data.model_dump(), creator=creator))
except BotException as e:
# Handle missing creator gracefully
self.logger.warning("Skipping popular command with missing creator",
self.logger.warning(
"Skipping popular command with missing creator",
command_id=cmd_data.id,
command_name=cmd_data.name,
creator_id=cmd_data.creator_id,
error=str(e))
error=str(e),
)
continue
return commands
async def get_command_names_for_autocomplete(
self,
partial_name: str = "",
limit: int = 25
self, partial_name: str = "", limit: int = 25
) -> List[str]:
"""
Get command names for Discord autocomplete.
@ -491,35 +500,35 @@ class CustomCommandsService(BaseService[CustomCommand]):
try:
# Use the dedicated autocomplete endpoint for better performance
client = await self.get_client()
params = [('limit', limit)]
params = [("limit", limit)]
if partial_name:
params.append(('partial_name', partial_name.lower()))
params.append(("partial_name", partial_name.lower()))
result = await client.get('custom_commands/autocomplete', params=params)
result = await client.get("custom_commands/autocomplete", params=params)
# The autocomplete endpoint returns a list of strings directly
if isinstance(result, list):
return result
else:
self.logger.warning("Unexpected autocomplete response format",
response=result)
self.logger.warning(
"Unexpected autocomplete response format", response=result
)
return []
except Exception as e:
self.logger.error("Failed to get command names for autocomplete",
self.logger.error(
"Failed to get command names for autocomplete",
partial_name=partial_name,
error=e)
error=e,
)
# Return empty list on error to not break Discord autocomplete
return []
# === Creator Management ===
async def get_or_create_creator(
self,
discord_id: int,
username: str,
display_name: Optional[str] = None
self, discord_id: int, username: str, display_name: Optional[str] = None
) -> CustomCommandCreator:
"""Get existing creator or create a new one."""
try:
@ -535,15 +544,17 @@ class CustomCommandsService(BaseService[CustomCommand]):
# Create new creator
creator_data = {
'discord_id': discord_id,
'username': username,
'display_name': display_name,
'created_at': datetime.now().isoformat(),
'total_commands': 0,
'active_commands': 0
"discord_id": discord_id,
"username": username,
"display_name": display_name,
"created_at": datetime.now(UTC).isoformat(),
"total_commands": 0,
"active_commands": 0,
}
result = await self.create_item_in_table('custom_commands/creators', creator_data)
result = await self.create_item_in_table(
"custom_commands/creators", creator_data
)
if not result:
raise BotException("Failed to create command creator")
@ -557,12 +568,14 @@ class CustomCommandsService(BaseService[CustomCommand]):
"""
try:
client = await self.get_client()
data = await client.get('custom_commands/creators', params=[('discord_id', discord_id)])
data = await client.get(
"custom_commands/creators", params=[("discord_id", discord_id)]
)
if not data or not data.get('creators'):
if not data or not data.get("creators"):
raise BotException(f"Creator with Discord ID {discord_id} not found")
creators = data['creators']
creators = data["creators"]
if not creators:
raise BotException(f"Creator with Discord ID {discord_id} not found")
@ -572,9 +585,11 @@ class CustomCommandsService(BaseService[CustomCommand]):
if "not found" in str(e).lower():
raise BotException(f"Creator with Discord ID {discord_id} not found")
else:
self.logger.error("Failed to get creator by Discord ID",
self.logger.error(
"Failed to get creator by Discord ID",
discord_id=discord_id,
error=e)
error=e,
)
raise BotException(f"Failed to retrieve creator: {e}")
async def get_creator_by_id(self, creator_id: int) -> CustomCommandCreator:
@ -584,8 +599,7 @@ class CustomCommandsService(BaseService[CustomCommand]):
BotException: If creator not found
"""
creators = await self.get_items_from_table_with_params(
'custom_commands/creators',
[('id', creator_id)]
"custom_commands/creators", [("id", creator_id)]
)
if not creators:
@ -599,11 +613,11 @@ class CustomCommandsService(BaseService[CustomCommand]):
"""Get comprehensive statistics about custom commands."""
# Get basic counts
total_commands = await self._get_search_count([])
active_commands = await self._get_search_count([('is_active', True)])
active_commands = await self._get_search_count([("is_active", True)])
total_creators = await self._get_creator_count()
# Get total uses
all_commands = await self.get_items_with_params([('is_active', True)])
all_commands = await self.get_items_with_params([("is_active", True)])
total_uses = sum(cmd.use_count for cmd in all_commands)
# Get most popular command
@ -614,11 +628,10 @@ class CustomCommandsService(BaseService[CustomCommand]):
most_active_creator = await self._get_most_active_creator()
# Get recent commands count
week_ago = datetime.now() - timedelta(days=7)
recent_count = await self._get_search_count([
('created_at__gte', week_ago.isoformat()),
('is_active', True)
])
week_ago = datetime.now(UTC) - timedelta(days=7)
recent_count = await self._get_search_count(
[("created_at__gte", week_ago.isoformat()), ("is_active", True)]
)
# Get cleanup statistics
warning_count = await self._get_commands_needing_warning_count()
@ -633,19 +646,19 @@ class CustomCommandsService(BaseService[CustomCommand]):
most_active_creator=most_active_creator,
recent_commands_count=recent_count,
commands_needing_warning=warning_count,
commands_eligible_for_deletion=deletion_count
commands_eligible_for_deletion=deletion_count,
)
# === Cleanup Operations ===
async def get_commands_needing_warning(self) -> List[CustomCommand]:
"""Get commands that need deletion warning (60+ days unused)."""
cutoff_date = datetime.now() - timedelta(days=60)
cutoff_date = datetime.now(UTC) - timedelta(days=60)
params = [
('last_used__lt', cutoff_date.isoformat()),
('warning_sent', False),
('is_active', True)
("last_used__lt", cutoff_date.isoformat()),
("warning_sent", False),
("is_active", True),
]
commands_data = await self.get_items_with_params(params)
@ -657,23 +670,22 @@ class CustomCommandsService(BaseService[CustomCommand]):
commands.append(CustomCommand(**cmd_data.model_dump(), creator=creator))
except BotException as e:
# Handle missing creator gracefully
self.logger.warning("Skipping command with missing creator",
self.logger.warning(
"Skipping command with missing creator",
command_id=cmd_data.id,
command_name=cmd_data.name,
creator_id=cmd_data.creator_id,
error=str(e))
error=str(e),
)
continue
return commands
async def get_commands_eligible_for_deletion(self) -> List[CustomCommand]:
"""Get commands eligible for deletion (90+ days unused)."""
cutoff_date = datetime.now() - timedelta(days=90)
cutoff_date = datetime.now(UTC) - timedelta(days=90)
params = [
('last_used__lt', cutoff_date.isoformat()),
('is_active', True)
]
params = [("last_used__lt", cutoff_date.isoformat()), ("is_active", True)]
commands_data = await self.get_items_with_params(params)
@ -684,11 +696,13 @@ class CustomCommandsService(BaseService[CustomCommand]):
commands.append(CustomCommand(**cmd_data.model_dump(), creator=creator))
except BotException as e:
# Handle missing creator gracefully
self.logger.warning("Skipping command with missing creator",
self.logger.warning(
"Skipping command with missing creator",
command_id=cmd_data.id,
command_name=cmd_data.name,
creator_id=cmd_data.creator_id,
error=str(e))
error=str(e),
)
continue
return commands
@ -696,9 +710,7 @@ class CustomCommandsService(BaseService[CustomCommand]):
async def mark_warning_sent(self, command_name: str) -> bool:
"""Mark that a deletion warning has been sent for a command."""
result = await self.update_item_by_field(
'name',
command_name,
{'warning_sent': True}
"name", command_name, {"warning_sent": True}
)
return bool(result)
@ -708,12 +720,14 @@ class CustomCommandsService(BaseService[CustomCommand]):
for name in command_names:
try:
await self.delete_item_by_field('name', name)
await self.delete_item_by_field("name", name)
deleted_count += 1
except Exception as e:
self.logger.error("Failed to delete command during bulk delete",
self.logger.error(
"Failed to delete command during bulk delete",
command_name=name,
error=e)
error=e,
)
return deleted_count
@ -722,32 +736,33 @@ class CustomCommandsService(BaseService[CustomCommand]):
async def _update_creator_stats(self, creator_id: int) -> None:
"""Update creator statistics."""
# Count total and active commands
total = await self._get_search_count([('creator_id', creator_id)])
active = await self._get_search_count([('creator_id', creator_id), ('is_active', True)])
total = await self._get_search_count([("creator_id", creator_id)])
active = await self._get_search_count(
[("creator_id", creator_id), ("is_active", True)]
)
# Update creator via API
try:
client = await self.get_client()
await client.put('custom_commands/creators', {
'total_commands': total,
'active_commands': active
}, object_id=creator_id)
await client.put(
"custom_commands/creators",
{"total_commands": total, "active_commands": active},
object_id=creator_id,
)
except Exception as e:
self.logger.error(f"Failed to update creator {creator_id} stats: {e}")
async def _update_creator_info(
self,
creator_id: int,
username: str,
display_name: Optional[str]
self, creator_id: int, username: str, display_name: Optional[str]
) -> None:
"""Update creator username and display name."""
try:
client = await self.get_client()
await client.put('custom_commands/creators', {
'username': username,
'display_name': display_name
}, object_id=creator_id)
await client.put(
"custom_commands/creators",
{"username": username, "display_name": display_name},
object_id=creator_id,
)
except Exception as e:
self.logger.error(f"Failed to update creator {creator_id} info: {e}")
@ -758,14 +773,15 @@ class CustomCommandsService(BaseService[CustomCommand]):
async def _get_creator_count(self) -> int:
"""Get total number of creators."""
creators = await self.get_items_from_table_with_params('custom_commands/creators', [])
creators = await self.get_items_from_table_with_params(
"custom_commands/creators", []
)
return len(creators)
async def _get_most_active_creator(self) -> Optional[CustomCommandCreator]:
"""Get creator with most active commands."""
creators = await self.get_items_from_table_with_params(
'custom_commands/creators',
[('sort', '-active_commands'), ('limit', 1)]
"custom_commands/creators", [("sort", "-active_commands"), ("limit", 1)]
)
if not creators:
@ -775,20 +791,21 @@ class CustomCommandsService(BaseService[CustomCommand]):
async def _get_commands_needing_warning_count(self) -> int:
"""Get count of commands needing warning."""
cutoff_date = datetime.now() - timedelta(days=60)
return await self._get_search_count([
('last_used__lt', cutoff_date.isoformat()),
('warning_sent', False),
('is_active', True)
])
cutoff_date = datetime.now(UTC) - timedelta(days=60)
return await self._get_search_count(
[
("last_used__lt", cutoff_date.isoformat()),
("warning_sent", False),
("is_active", True),
]
)
async def _get_commands_eligible_for_deletion_count(self) -> int:
"""Get count of commands eligible for deletion."""
cutoff_date = datetime.now() - timedelta(days=90)
return await self._get_search_count([
('last_used__lt', cutoff_date.isoformat()),
('is_active', True)
])
cutoff_date = datetime.now(UTC) - timedelta(days=90)
return await self._get_search_count(
[("last_used__lt", cutoff_date.isoformat()), ("is_active", True)]
)
# Global service instance

View File

@ -3,14 +3,15 @@ Draft service for Discord Bot v2.0
Core draft business logic and state management. NO CACHING - draft state changes constantly.
"""
import logging
from typing import Optional, Dict, Any
from datetime import datetime, timedelta
from datetime import UTC, datetime, timedelta
from services.base_service import BaseService
from models.draft_data import DraftData
logger = logging.getLogger(f'{__name__}.DraftService')
logger = logging.getLogger(f"{__name__}.DraftService")
class DraftService(BaseService[DraftData]):
@ -29,7 +30,7 @@ class DraftService(BaseService[DraftData]):
def __init__(self):
"""Initialize draft service."""
super().__init__(DraftData, 'draftdata')
super().__init__(DraftData, "draftdata")
logger.debug("DraftService initialized")
async def get_draft_data(self) -> Optional[DraftData]:
@ -62,9 +63,7 @@ class DraftService(BaseService[DraftData]):
return None
async def update_draft_data(
self,
draft_id: int,
updates: Dict[str, Any]
self, draft_id: int, updates: Dict[str, Any]
) -> Optional[DraftData]:
"""
Update draft configuration.
@ -92,10 +91,7 @@ class DraftService(BaseService[DraftData]):
return None
async def set_timer(
self,
draft_id: int,
active: bool,
pick_minutes: Optional[int] = None
self, draft_id: int, active: bool, pick_minutes: Optional[int] = None
) -> Optional[DraftData]:
"""
Enable or disable draft timer.
@ -109,27 +105,31 @@ class DraftService(BaseService[DraftData]):
Updated DraftData instance
"""
try:
updates = {'timer': active}
updates = {"timer": active}
if pick_minutes is not None:
updates['pick_minutes'] = pick_minutes
updates["pick_minutes"] = pick_minutes
# Set deadline based on timer state
if active:
# Calculate new deadline
if pick_minutes:
deadline = datetime.now() + timedelta(minutes=pick_minutes)
deadline = datetime.now(UTC) + timedelta(minutes=pick_minutes)
else:
# Get current pick_minutes from existing data
current_data = await self.get_draft_data()
if current_data:
deadline = datetime.now() + timedelta(minutes=current_data.pick_minutes)
deadline = datetime.now(UTC) + timedelta(
minutes=current_data.pick_minutes
)
else:
deadline = datetime.now() + timedelta(minutes=2) # Default fallback
updates['pick_deadline'] = deadline
deadline = datetime.now(UTC) + timedelta(
minutes=2
) # Default fallback
updates["pick_deadline"] = deadline
else:
# Set deadline far in future when timer inactive
updates['pick_deadline'] = datetime.now() + timedelta(days=690)
updates["pick_deadline"] = datetime.now(UTC) + timedelta(days=690)
updated = await self.update_draft_data(draft_id, updates)
@ -146,9 +146,7 @@ class DraftService(BaseService[DraftData]):
return None
async def advance_pick(
self,
draft_id: int,
current_pick: int
self, draft_id: int, current_pick: int
) -> Optional[DraftData]:
"""
Advance to next pick in draft.
@ -199,12 +197,14 @@ class DraftService(BaseService[DraftData]):
return await self.get_draft_data()
# Update to next pick
updates = {'currentpick': next_pick}
updates = {"currentpick": next_pick}
# Reset deadline if timer is active
current_data = await self.get_draft_data()
if current_data and current_data.timer:
updates['pick_deadline'] = datetime.now() + timedelta(minutes=current_data.pick_minutes)
updates["pick_deadline"] = datetime.now(UTC) + timedelta(
minutes=current_data.pick_minutes
)
updated = await self.update_draft_data(draft_id, updates)
@ -220,10 +220,7 @@ class DraftService(BaseService[DraftData]):
return None
async def set_current_pick(
self,
draft_id: int,
overall: int,
reset_timer: bool = True
self, draft_id: int, overall: int, reset_timer: bool = True
) -> Optional[DraftData]:
"""
Manually set current pick (admin operation).
@ -237,12 +234,14 @@ class DraftService(BaseService[DraftData]):
Updated DraftData
"""
try:
updates = {'currentpick': overall}
updates = {"currentpick": overall}
if reset_timer:
current_data = await self.get_draft_data()
if current_data and current_data.timer:
updates['pick_deadline'] = datetime.now() + timedelta(minutes=current_data.pick_minutes)
updates["pick_deadline"] = datetime.now(UTC) + timedelta(
minutes=current_data.pick_minutes
)
updated = await self.update_draft_data(draft_id, updates)
@ -261,7 +260,7 @@ class DraftService(BaseService[DraftData]):
self,
draft_id: int,
ping_channel_id: Optional[int] = None,
result_channel_id: Optional[int] = None
result_channel_id: Optional[int] = None,
) -> Optional[DraftData]:
"""
Update draft Discord channel configuration.
@ -277,9 +276,9 @@ class DraftService(BaseService[DraftData]):
try:
updates = {}
if ping_channel_id is not None:
updates['ping_channel'] = ping_channel_id
updates["ping_channel"] = ping_channel_id
if result_channel_id is not None:
updates['result_channel'] = result_channel_id
updates["result_channel"] = result_channel_id
if not updates:
logger.warning("No channel updates provided")
@ -299,9 +298,7 @@ class DraftService(BaseService[DraftData]):
return None
async def reset_draft_deadline(
self,
draft_id: int,
minutes: Optional[int] = None
self, draft_id: int, minutes: Optional[int] = None
) -> Optional[DraftData]:
"""
Reset the current pick deadline.
@ -321,8 +318,8 @@ class DraftService(BaseService[DraftData]):
return None
minutes = current_data.pick_minutes
new_deadline = datetime.now() + timedelta(minutes=minutes)
updates = {'pick_deadline': new_deadline}
new_deadline = datetime.now(UTC) + timedelta(minutes=minutes)
updates = {"pick_deadline": new_deadline}
updated = await self.update_draft_data(draft_id, updates)
@ -357,9 +354,9 @@ class DraftService(BaseService[DraftData]):
# Pause the draft AND stop the timer
# Set deadline far in future so it doesn't expire while paused
updates = {
'paused': True,
'timer': False,
'pick_deadline': datetime.now() + timedelta(days=690)
"paused": True,
"timer": False,
"pick_deadline": datetime.now(UTC) + timedelta(days=690),
}
updated = await self.update_draft_data(draft_id, updates)
@ -394,16 +391,14 @@ class DraftService(BaseService[DraftData]):
pick_minutes = current_data.pick_minutes if current_data else 2
# Resume the draft AND restart the timer with fresh deadline
new_deadline = datetime.now() + timedelta(minutes=pick_minutes)
updates = {
'paused': False,
'timer': True,
'pick_deadline': new_deadline
}
new_deadline = datetime.now(UTC) + timedelta(minutes=pick_minutes)
updates = {"paused": False, "timer": True, "pick_deadline": new_deadline}
updated = await self.update_draft_data(draft_id, updates)
if updated:
logger.info(f"Draft resumed - timer restarted with {pick_minutes}min deadline")
logger.info(
f"Draft resumed - timer restarted with {pick_minutes}min deadline"
)
else:
logger.error("Failed to resume draft")

View File

@ -4,6 +4,7 @@ Draft Sheet Service
Handles writing draft picks to Google Sheets for public tracking.
Extends SheetsService to reuse authentication and async patterns.
"""
import asyncio
from typing import List, Optional, Tuple
@ -24,7 +25,7 @@ class DraftSheetService(SheetsService):
If None, will use path from config
"""
super().__init__(credentials_path)
self.logger = get_contextual_logger(f'{__name__}.DraftSheetService')
self.logger = get_contextual_logger(f"{__name__}.DraftSheetService")
self._config = get_config()
async def write_pick(
@ -34,7 +35,7 @@ class DraftSheetService(SheetsService):
orig_owner_abbrev: str,
owner_abbrev: str,
player_name: str,
swar: float
swar: float,
) -> bool:
"""
Write a single draft pick to the season's draft sheet.
@ -68,23 +69,19 @@ class DraftSheetService(SheetsService):
return False
try:
loop = asyncio.get_event_loop()
loop = asyncio.get_running_loop()
# Get pygsheets client
sheets = await loop.run_in_executor(None, self._get_client)
# Open the draft sheet by key
spreadsheet = await loop.run_in_executor(
None,
sheets.open_by_key,
sheet_key
None, sheets.open_by_key, sheet_key
)
# Get the worksheet
worksheet = await loop.run_in_executor(
None,
spreadsheet.worksheet_by_title,
self._config.draft_sheet_worksheet
None, spreadsheet.worksheet_by_title, self._config.draft_sheet_worksheet
)
# Prepare pick data (4 columns: orig_owner, owner, player, swar)
@ -93,12 +90,12 @@ class DraftSheetService(SheetsService):
# Calculate row (overall + 1 to leave row 1 for headers)
row = overall + 1
start_column = self._config.draft_sheet_start_column
cell_range = f'{start_column}{row}'
cell_range = f"{start_column}{row}"
# Write the pick data
await loop.run_in_executor(
None,
lambda: worksheet.update_values(crange=cell_range, values=pick_data)
lambda: worksheet.update_values(crange=cell_range, values=pick_data),
)
self.logger.info(
@ -106,7 +103,7 @@ class DraftSheetService(SheetsService):
season=season,
overall=overall,
player=player_name,
owner=owner_abbrev
owner=owner_abbrev,
)
return True
@ -115,14 +112,12 @@ class DraftSheetService(SheetsService):
f"Failed to write pick to draft sheet: {e}",
season=season,
overall=overall,
player=player_name
player=player_name,
)
return False
async def write_picks_batch(
self,
season: int,
picks: List[Tuple[int, str, str, str, float]]
self, season: int, picks: List[Tuple[int, str, str, str, float]]
) -> Tuple[int, int]:
"""
Write multiple draft picks to the sheet in a single batch operation.
@ -151,23 +146,19 @@ class DraftSheetService(SheetsService):
return (0, 0)
try:
loop = asyncio.get_event_loop()
loop = asyncio.get_running_loop()
# Get pygsheets client
sheets = await loop.run_in_executor(None, self._get_client)
# Open the draft sheet by key
spreadsheet = await loop.run_in_executor(
None,
sheets.open_by_key,
sheet_key
None, sheets.open_by_key, sheet_key
)
# Get the worksheet
worksheet = await loop.run_in_executor(
None,
spreadsheet.worksheet_by_title,
self._config.draft_sheet_worksheet
None, spreadsheet.worksheet_by_title, self._config.draft_sheet_worksheet
)
# Sort picks by overall to find range bounds
@ -180,7 +171,7 @@ class DraftSheetService(SheetsService):
# Build a 2D array for the entire range (sparse - empty rows for missing picks)
# Row index 0 = min_overall, row index N = max_overall
num_rows = max_overall - min_overall + 1
batch_data: List[List[str]] = [['', '', '', ''] for _ in range(num_rows)]
batch_data: List[List[str]] = [["", "", "", ""] for _ in range(num_rows)]
# Populate the batch data array
for overall, orig_owner, owner, player_name, swar in sorted_picks:
@ -193,23 +184,23 @@ class DraftSheetService(SheetsService):
end_column = chr(ord(start_column) + 3) # 4 columns: D -> G
end_row = max_overall + 1
cell_range = f'{start_column}{start_row}:{end_column}{end_row}'
cell_range = f"{start_column}{start_row}:{end_column}{end_row}"
self.logger.info(
f"Writing {len(picks)} picks in single batch to range {cell_range}",
season=season
season=season,
)
# Write all picks in a single API call
await loop.run_in_executor(
None,
lambda: worksheet.update_values(crange=cell_range, values=batch_data)
lambda: worksheet.update_values(crange=cell_range, values=batch_data),
)
self.logger.info(
f"Batch write complete: {len(picks)} picks written successfully",
season=season,
total_picks=len(picks)
total_picks=len(picks),
)
return (len(picks), 0)
@ -218,10 +209,7 @@ class DraftSheetService(SheetsService):
return (0, len(picks))
async def clear_picks_range(
self,
season: int,
start_overall: int = 1,
end_overall: int = 512
self, season: int, start_overall: int = 1, end_overall: int = 512
) -> bool:
"""
Clear a range of picks from the draft sheet.
@ -246,23 +234,19 @@ class DraftSheetService(SheetsService):
return False
try:
loop = asyncio.get_event_loop()
loop = asyncio.get_running_loop()
# Get pygsheets client
sheets = await loop.run_in_executor(None, self._get_client)
# Open the draft sheet by key
spreadsheet = await loop.run_in_executor(
None,
sheets.open_by_key,
sheet_key
None, sheets.open_by_key, sheet_key
)
# Get the worksheet
worksheet = await loop.run_in_executor(
None,
spreadsheet.worksheet_by_title,
self._config.draft_sheet_worksheet
None, spreadsheet.worksheet_by_title, self._config.draft_sheet_worksheet
)
# Calculate range (4 columns: D through G)
@ -273,24 +257,23 @@ class DraftSheetService(SheetsService):
# Convert start column letter to end column (D -> G for 4 columns)
end_column = chr(ord(start_column) + 3)
cell_range = f'{start_column}{start_row}:{end_column}{end_row}'
cell_range = f"{start_column}{start_row}:{end_column}{end_row}"
# Clear the range by setting empty values
# We create a 2D array of empty strings
num_rows = end_row - start_row + 1
empty_data = [['', '', '', ''] for _ in range(num_rows)]
empty_data = [["", "", "", ""] for _ in range(num_rows)]
await loop.run_in_executor(
None,
lambda: worksheet.update_values(
crange=f'{start_column}{start_row}',
values=empty_data
)
crange=f"{start_column}{start_row}", values=empty_data
),
)
self.logger.info(
f"Cleared picks {start_overall}-{end_overall} from draft sheet",
season=season
season=season,
)
return True

View File

@ -3,6 +3,7 @@ Scorebug Service
Handles reading live game data from Google Sheets scorecards for real-time score displays.
"""
import asyncio
from typing import Dict, Any, Optional
import pygsheets
@ -16,30 +17,32 @@ class ScorebugData:
"""Data class for scorebug information."""
def __init__(self, data: Dict[str, Any]):
self.away_team_id = data.get('away_team_id', 1)
self.home_team_id = data.get('home_team_id', 1)
self.header = data.get('header', '')
self.away_score = data.get('away_score', 0)
self.home_score = data.get('home_score', 0)
self.which_half = data.get('which_half', '')
self.inning = data.get('inning', 1)
self.is_final = data.get('is_final', False)
self.outs = data.get('outs', 0)
self.win_percentage = data.get('win_percentage', 50.0)
self.away_team_id = data.get("away_team_id", 1)
self.home_team_id = data.get("home_team_id", 1)
self.header = data.get("header", "")
self.away_score = data.get("away_score", 0)
self.home_score = data.get("home_score", 0)
self.which_half = data.get("which_half", "")
self.inning = data.get("inning", 1)
self.is_final = data.get("is_final", False)
self.outs = data.get("outs", 0)
self.win_percentage = data.get("win_percentage", 50.0)
# Current matchup information
self.pitcher_name = data.get('pitcher_name', '')
self.pitcher_url = data.get('pitcher_url', '')
self.pitcher_stats = data.get('pitcher_stats', '')
self.batter_name = data.get('batter_name', '')
self.batter_url = data.get('batter_url', '')
self.batter_stats = data.get('batter_stats', '')
self.on_deck_name = data.get('on_deck_name', '')
self.in_hole_name = data.get('in_hole_name', '')
self.pitcher_name = data.get("pitcher_name", "")
self.pitcher_url = data.get("pitcher_url", "")
self.pitcher_stats = data.get("pitcher_stats", "")
self.batter_name = data.get("batter_name", "")
self.batter_url = data.get("batter_url", "")
self.batter_stats = data.get("batter_stats", "")
self.on_deck_name = data.get("on_deck_name", "")
self.in_hole_name = data.get("in_hole_name", "")
# Additional data
self.runners = data.get('runners', []) # [Catcher, On First, On Second, On Third]
self.summary = data.get('summary', []) # Play-by-play summary lines
self.runners = data.get(
"runners", []
) # [Catcher, On First, On Second, On Third]
self.summary = data.get("summary", []) # Play-by-play summary lines
@property
def score_line(self) -> str:
@ -79,12 +82,10 @@ class ScorebugService(SheetsService):
credentials_path: Path to service account credentials JSON
"""
super().__init__(credentials_path)
self.logger = get_contextual_logger(f'{__name__}.ScorebugService')
self.logger = get_contextual_logger(f"{__name__}.ScorebugService")
async def read_scorebug_data(
self,
sheet_url_or_key: str,
full_length: bool = True
self, sheet_url_or_key: str, full_length: bool = True
) -> ScorebugData:
"""
Read live scorebug data from Google Sheets scorecard.
@ -107,24 +108,28 @@ class ScorebugService(SheetsService):
scorecard = await self.open_scorecard(sheet_url_or_key)
self.logger.debug(f" ✅ Scorecard opened successfully")
loop = asyncio.get_event_loop()
loop = asyncio.get_running_loop()
# Get Scorebug tab
scorebug_tab = await loop.run_in_executor(
None,
scorecard.worksheet_by_title,
'Scorebug'
None, scorecard.worksheet_by_title, "Scorebug"
)
# Read all data from B2:S20 for efficiency
all_data = await loop.run_in_executor(
None,
lambda: scorebug_tab.get_values('B2', 'S20', include_tailing_empty_rows=True)
lambda: scorebug_tab.get_values(
"B2", "S20", include_tailing_empty_rows=True
),
)
self.logger.debug(f"📊 Raw scorebug data dimensions: {len(all_data)} rows")
self.logger.debug(f"📊 First row length: {len(all_data[0]) if all_data else 0} columns")
self.logger.debug(f"📊 Reading from range B2:S20 (columns B-S = indices 0-17 in data)")
self.logger.debug(
f"📊 First row length: {len(all_data[0]) if all_data else 0} columns"
)
self.logger.debug(
f"📊 Reading from range B2:S20 (columns B-S = indices 0-17 in data)"
)
self.logger.debug(f"📊 Raw data structure (all rows):")
for idx, row in enumerate(all_data):
self.logger.debug(f" Row {idx} (Sheet row {idx + 2}): {row}")
@ -133,8 +138,13 @@ class ScorebugService(SheetsService):
# This corresponds to columns B-G (indices 0-5 in all_data)
# Rows 2-8 in sheet (indices 0-6 in all_data)
game_state = [
all_data[0][:6], all_data[1][:6], all_data[2][:6], all_data[3][:6],
all_data[4][:6], all_data[5][:6], all_data[6][:6]
all_data[0][:6],
all_data[1][:6],
all_data[2][:6],
all_data[3][:6],
all_data[4][:6],
all_data[5][:6],
all_data[6][:6],
]
self.logger.debug(f"🎮 Extracted game_state (B2:G8):")
@ -145,12 +155,24 @@ class ScorebugService(SheetsService):
# game_state[3] is away team row (Sheet row 5), game_state[4] is home team row (Sheet row 6)
# First column (index 0) contains the team ID - this is column B in the sheet
self.logger.debug(f"🏟️ Extracting team IDs from game_state:")
self.logger.debug(f" Away team row: game_state[3] = Sheet row 5, column B (index 0)")
self.logger.debug(f" Home team row: game_state[4] = Sheet row 6, column B (index 0)")
self.logger.debug(
f" Away team row: game_state[3] = Sheet row 5, column B (index 0)"
)
self.logger.debug(
f" Home team row: game_state[4] = Sheet row 6, column B (index 0)"
)
try:
away_team_id_raw = game_state[3][0] if len(game_state) > 3 and len(game_state[3]) > 0 else None
home_team_id_raw = game_state[4][0] if len(game_state) > 4 and len(game_state[4]) > 0 else None
away_team_id_raw = (
game_state[3][0]
if len(game_state) > 3 and len(game_state[3]) > 0
else None
)
home_team_id_raw = (
game_state[4][0]
if len(game_state) > 4 and len(game_state[4]) > 0
else None
)
self.logger.debug(f" Raw away team ID value: '{away_team_id_raw}'")
self.logger.debug(f" Raw home team ID value: '{home_team_id_raw}'")
@ -158,61 +180,97 @@ class ScorebugService(SheetsService):
away_team_id = int(away_team_id_raw) if away_team_id_raw else None
home_team_id = int(home_team_id_raw) if home_team_id_raw else None
self.logger.debug(f" ✅ Parsed team IDs - Away: {away_team_id}, Home: {home_team_id}")
self.logger.debug(
f" ✅ Parsed team IDs - Away: {away_team_id}, Home: {home_team_id}"
)
if away_team_id is None or home_team_id is None:
raise ValueError(f'Team IDs not found in scorebug (away: {away_team_id}, home: {home_team_id})')
raise ValueError(
f"Team IDs not found in scorebug (away: {away_team_id}, home: {home_team_id})"
)
except (ValueError, IndexError) as e:
self.logger.error(f"❌ Failed to parse team IDs from scorebug: {e}")
raise ValueError(f'Could not extract team IDs from scorecard')
raise ValueError(f"Could not extract team IDs from scorecard")
# Parse game state
self.logger.debug(f"📝 Parsing header from game_state[0][0] (Sheet B2):")
header = game_state[0][0] if game_state[0] else ''
is_final = header[-5:] == 'FINAL' if header else False
header = game_state[0][0] if game_state[0] else ""
is_final = header[-5:] == "FINAL" if header else False
self.logger.debug(f" Header value: '{header}'")
self.logger.debug(f" Is Final: {is_final}")
# Parse scores with validation
self.logger.debug(f"⚾ Parsing scores:")
self.logger.debug(f" Away score: game_state[3][2] (Sheet row 5, column D)")
self.logger.debug(f" Home score: game_state[4][2] (Sheet row 6, column D)")
self.logger.debug(
f" Away score: game_state[3][2] (Sheet row 5, column D)"
)
self.logger.debug(
f" Home score: game_state[4][2] (Sheet row 6, column D)"
)
try:
away_score_raw = game_state[3][2] if len(game_state) > 3 and len(game_state[3]) > 2 else '0'
self.logger.debug(f" Raw away score value: '{away_score_raw}' (type: {type(away_score_raw).__name__})")
away_score = int(away_score_raw) if away_score_raw != '' else 0
away_score_raw = (
game_state[3][2]
if len(game_state) > 3 and len(game_state[3]) > 2
else "0"
)
self.logger.debug(
f" Raw away score value: '{away_score_raw}' (type: {type(away_score_raw).__name__})"
)
away_score = int(away_score_raw) if away_score_raw != "" else 0
self.logger.debug(f" ✅ Parsed away score: {away_score}")
except (ValueError, IndexError) as e:
self.logger.warning(f" ⚠️ Failed to parse away score: {e}")
away_score = 0
try:
home_score_raw = game_state[4][2] if len(game_state) > 4 and len(game_state[4]) > 2 else '0'
self.logger.debug(f" Raw home score value: '{home_score_raw}' (type: {type(home_score_raw).__name__})")
home_score = int(home_score_raw) if home_score_raw != '' else 0
home_score_raw = (
game_state[4][2]
if len(game_state) > 4 and len(game_state[4]) > 2
else "0"
)
self.logger.debug(
f" Raw home score value: '{home_score_raw}' (type: {type(home_score_raw).__name__})"
)
home_score = int(home_score_raw) if home_score_raw != "" else 0
self.logger.debug(f" ✅ Parsed home score: {home_score}")
except (ValueError, IndexError) as e:
self.logger.warning(f" ⚠️ Failed to parse home score: {e}")
home_score = 0
try:
inning_raw = game_state[3][5] if len(game_state) > 3 and len(game_state[3]) > 5 else '0'
self.logger.debug(f" Raw inning value: '{inning_raw}' (type: {type(inning_raw).__name__})")
inning = int(inning_raw) if inning_raw != '' else 1
inning_raw = (
game_state[3][5]
if len(game_state) > 3 and len(game_state[3]) > 5
else "0"
)
self.logger.debug(
f" Raw inning value: '{inning_raw}' (type: {type(inning_raw).__name__})"
)
inning = int(inning_raw) if inning_raw != "" else 1
self.logger.debug(f" ✅ Parsed inning: {inning}")
except (ValueError, IndexError) as e:
self.logger.warning(f" ⚠️ Failed to parse home score: {e}")
inning = 1
self.logger.debug(f"⏱️ Parsing game state from game_state[3][4] (Sheet row 5, column F):")
which_half = game_state[3][4] if len(game_state) > 3 and len(game_state[3]) > 4 else ''
self.logger.debug(
f"⏱️ Parsing game state from game_state[3][4] (Sheet row 5, column F):"
)
which_half = (
game_state[3][4]
if len(game_state) > 3 and len(game_state[3]) > 4
else ""
)
self.logger.debug(f" Which half value: '{which_half}'")
# Parse outs from all_data[4][4] (Sheet F6 - columns start at B, so F=index 4)
self.logger.debug(f"🔢 Parsing outs from F6 (all_data[4][4]):")
try:
outs_raw = all_data[4][4] if len(all_data) > 4 and len(all_data[4]) > 4 else '0'
outs_raw = (
all_data[4][4]
if len(all_data) > 4 and len(all_data[4]) > 4
else "0"
)
self.logger.debug(f" Raw outs value: '{outs_raw}'")
# Handle "2" or any number
outs = int(outs_raw) if outs_raw and str(outs_raw).strip() else 0
@ -232,34 +290,42 @@ class ScorebugService(SheetsService):
]
# Pitcher: matchups[0][0]=name, [1]=URL, [2]=stats
pitcher_name = matchups[0][0] if len(matchups[0]) > 0 else ''
pitcher_url = matchups[0][1] if len(matchups[0]) > 1 else ''
pitcher_stats = matchups[0][2] if len(matchups[0]) > 2 else ''
self.logger.debug(f" Pitcher: {pitcher_name} | {pitcher_stats} | {pitcher_url}")
pitcher_name = matchups[0][0] if len(matchups[0]) > 0 else ""
pitcher_url = matchups[0][1] if len(matchups[0]) > 1 else ""
pitcher_stats = matchups[0][2] if len(matchups[0]) > 2 else ""
self.logger.debug(
f" Pitcher: {pitcher_name} | {pitcher_stats} | {pitcher_url}"
)
# Batter: matchups[1][0]=name, [1]=URL, [2]=stats, [3]=order, [4]=position
batter_name = matchups[1][0] if len(matchups[1]) > 0 else ''
batter_url = matchups[1][1] if len(matchups[1]) > 1 else ''
batter_stats = matchups[1][2] if len(matchups[1]) > 2 else ''
self.logger.debug(f" Batter: {batter_name} | {batter_stats} | {batter_url}")
batter_name = matchups[1][0] if len(matchups[1]) > 0 else ""
batter_url = matchups[1][1] if len(matchups[1]) > 1 else ""
batter_stats = matchups[1][2] if len(matchups[1]) > 2 else ""
self.logger.debug(
f" Batter: {batter_name} | {batter_stats} | {batter_url}"
)
# On Deck: matchups[2][0]=name
on_deck_name = matchups[2][0] if len(matchups[2]) > 0 else ''
on_deck_url = matchups[2][1] if len(matchups[2]) > 1 else ''
on_deck_name = matchups[2][0] if len(matchups[2]) > 0 else ""
on_deck_url = matchups[2][1] if len(matchups[2]) > 1 else ""
self.logger.debug(f" On Deck: {on_deck_name}")
# In Hole: matchups[3][0]=name
in_hole_name = matchups[3][0] if len(matchups[3]) > 0 else ''
in_hole_url = matchups[3][1] if len(matchups[3]) > 1 else ''
in_hole_name = matchups[3][0] if len(matchups[3]) > 0 else ""
in_hole_url = matchups[3][1] if len(matchups[3]) > 1 else ""
self.logger.debug(f" In Hole: {in_hole_name}")
# Parse win percentage from all_data[6][2] (Sheet D8 - row 8, column D)
self.logger.debug(f"📈 Parsing win percentage from D8 (all_data[6][2]):")
try:
win_pct_raw = all_data[6][2] if len(all_data) > 6 and len(all_data[6]) > 2 else '50%'
win_pct_raw = (
all_data[6][2]
if len(all_data) > 6 and len(all_data[6]) > 2
else "50%"
)
self.logger.debug(f" Raw win percentage value: '{win_pct_raw}'")
# Remove % sign if present and convert to float
win_pct_str = str(win_pct_raw).replace('%', '').strip()
win_pct_str = str(win_pct_raw).replace("%", "").strip()
win_percentage = float(win_pct_str) if win_pct_str else 50.0
self.logger.debug(f" ✅ Parsed win percentage: {win_percentage}%")
except (ValueError, IndexError, AttributeError) as e:
@ -284,7 +350,7 @@ class ScorebugService(SheetsService):
all_data[9][9:11] if len(all_data) > 9 else [], # Catcher (row 11)
all_data[10][9:11] if len(all_data) > 10 else [], # On First (row 12)
all_data[11][9:11] if len(all_data) > 11 else [], # On Second (row 13)
all_data[12][9:11] if len(all_data) > 12 else [] # On Third (row 14)
all_data[12][9:11] if len(all_data) > 12 else [], # On Third (row 14)
]
self.logger.debug(f" Catcher: {runners[0]}")
self.logger.debug(f" On First: {runners[1]}")
@ -308,28 +374,30 @@ class ScorebugService(SheetsService):
self.logger.debug(f"✅ Scorebug data extraction complete!")
scorebug_data = ScorebugData({
'away_team_id': away_team_id,
'home_team_id': home_team_id,
'header': header,
'away_score': away_score,
'home_score': home_score,
'which_half': which_half,
'inning': inning,
'is_final': is_final,
'outs': outs,
'win_percentage': win_percentage,
'pitcher_name': pitcher_name,
'pitcher_url': pitcher_url,
'pitcher_stats': pitcher_stats,
'batter_name': batter_name,
'batter_url': batter_url,
'batter_stats': batter_stats,
'on_deck_name': on_deck_name,
'in_hole_name': in_hole_name,
'runners': runners, # [Catcher, On First, On Second, On Third], each is [name, URL]
'summary': summary # Play-by-play lines from R3:S20
})
scorebug_data = ScorebugData(
{
"away_team_id": away_team_id,
"home_team_id": home_team_id,
"header": header,
"away_score": away_score,
"home_score": home_score,
"which_half": which_half,
"inning": inning,
"is_final": is_final,
"outs": outs,
"win_percentage": win_percentage,
"pitcher_name": pitcher_name,
"pitcher_url": pitcher_url,
"pitcher_stats": pitcher_stats,
"batter_name": batter_name,
"batter_url": batter_url,
"batter_stats": batter_stats,
"on_deck_name": on_deck_name,
"in_hole_name": in_hole_name,
"runners": runners, # [Catcher, On First, On Second, On Third], each is [name, URL]
"summary": summary, # Play-by-play lines from R3:S20
}
)
self.logger.debug(f"🎯 Created ScorebugData object:")
self.logger.debug(f" Away Team ID: {scorebug_data.away_team_id}")

View File

@ -3,6 +3,7 @@ Google Sheets Service
Handles reading data from Google Sheets scorecards for game submission.
"""
import asyncio
from typing import Dict, List, Any, Optional
import pygsheets
@ -24,10 +25,11 @@ class SheetsService:
"""
if credentials_path is None:
from config import get_config
credentials_path = get_config().sheets_credentials_path
self.credentials_path = credentials_path
self.logger = get_contextual_logger(f'{__name__}.SheetsService')
self.logger = get_contextual_logger(f"{__name__}.SheetsService")
self._sheets_client = None
def _get_client(self) -> pygsheets.client.Client:
@ -53,7 +55,16 @@ class SheetsService:
return False
# Common spreadsheet errors
error_values = ['#N/A', '#REF!', '#VALUE!', '#DIV/0!', '#NUM!', '#NAME?', '#NULL!', '#ERROR!']
error_values = [
"#N/A",
"#REF!",
"#VALUE!",
"#DIV/0!",
"#NUM!",
"#NAME?",
"#NULL!",
"#ERROR!",
]
return value.strip() in error_values
@staticmethod
@ -68,7 +79,7 @@ class SheetsService:
Returns:
Integer value or None if invalid
"""
if value is None or value == '':
if value is None or value == "":
return None
# Check for spreadsheet errors
@ -96,16 +107,9 @@ class SheetsService:
"""
try:
# Run in thread pool since pygsheets is synchronous
loop = asyncio.get_event_loop()
sheets = await loop.run_in_executor(
None,
self._get_client
)
scorecard = await loop.run_in_executor(
None,
sheets.open_by_url,
sheet_url
)
loop = asyncio.get_running_loop()
sheets = await loop.run_in_executor(None, self._get_client)
scorecard = await loop.run_in_executor(None, sheets.open_by_url, sheet_url)
self.logger.info(f"Opened scorecard: {scorecard.title}")
return scorecard
@ -116,10 +120,7 @@ class SheetsService:
"Unable to access scorecard. Is it publicly readable?"
) from e
async def read_setup_data(
self,
scorecard: pygsheets.Spreadsheet
) -> Dict[str, Any]:
async def read_setup_data(self, scorecard: pygsheets.Spreadsheet) -> Dict[str, Any]:
"""
Read game metadata from Setup tab.
@ -138,38 +139,27 @@ class SheetsService:
- home_manager_name: str
"""
try:
loop = asyncio.get_event_loop()
loop = asyncio.get_running_loop()
# Get Setup tab
setup_tab = await loop.run_in_executor(
None,
scorecard.worksheet_by_title,
'Setup'
None, scorecard.worksheet_by_title, "Setup"
)
# Read version
version = await loop.run_in_executor(
None,
setup_tab.get_value,
'V35'
)
version = await loop.run_in_executor(None, setup_tab.get_value, "V35")
# Read game data (C3:D7)
g_data = await loop.run_in_executor(
None,
setup_tab.get_values,
'C3',
'D7'
)
g_data = await loop.run_in_executor(None, setup_tab.get_values, "C3", "D7")
return {
'version': version,
'week': int(g_data[1][0]),
'game_num': int(g_data[2][0]),
'away_team_abbrev': g_data[3][0],
'home_team_abbrev': g_data[4][0],
'away_manager_name': g_data[3][1],
'home_manager_name': g_data[4][1]
"version": version,
"week": int(g_data[1][0]),
"game_num": int(g_data[2][0]),
"away_team_abbrev": g_data[3][0],
"home_team_abbrev": g_data[4][0],
"away_manager_name": g_data[3][1],
"home_manager_name": g_data[4][1],
}
except Exception as e:
@ -177,8 +167,7 @@ class SheetsService:
raise SheetsException("Unable to read game setup data") from e
async def read_playtable_data(
self,
scorecard: pygsheets.Spreadsheet
self, scorecard: pygsheets.Spreadsheet
) -> List[Dict[str, Any]]:
"""
Read all plays from Playtable tab.
@ -190,49 +179,101 @@ class SheetsService:
List of play dictionaries with field names mapped
"""
try:
loop = asyncio.get_event_loop()
loop = asyncio.get_running_loop()
# Get Playtable tab
playtable = await loop.run_in_executor(
None,
scorecard.worksheet_by_title,
'Playtable'
None, scorecard.worksheet_by_title, "Playtable"
)
# Read play data
all_plays = await loop.run_in_executor(
None,
playtable.get_values,
'B3',
'BW300'
None, playtable.get_values, "B3", "BW300"
)
# Field names in order (from old bot lines 1621-1632)
play_keys = [
'play_num', 'batter_id', 'batter_pos', 'pitcher_id',
'on_base_code', 'inning_half', 'inning_num', 'batting_order',
'starting_outs', 'away_score', 'home_score', 'on_first_id',
'on_first_final', 'on_second_id', 'on_second_final',
'on_third_id', 'on_third_final', 'batter_final', 'pa', 'ab',
'run', 'e_run', 'hit', 'rbi', 'double', 'triple', 'homerun',
'bb', 'so', 'hbp', 'sac', 'ibb', 'gidp', 'bphr', 'bpfo',
'bp1b', 'bplo', 'sb', 'cs', 'outs', 'pitcher_rest_outs',
'wpa', 'catcher_id', 'defender_id', 'runner_id', 'check_pos',
'error', 'wild_pitch', 'passed_ball', 'pick_off', 'balk',
'is_go_ahead', 'is_tied', 'is_new_inning', 'inherited_runners',
'inherited_scored', 'on_hook_for_loss', 'run_differential',
'unused-manager', 'unused-pitcherpow', 'unused-pitcherrestip',
'unused-runners', 'unused-fatigue', 'unused-roundedip',
'unused-elitestart', 'unused-scenario', 'unused-winxaway',
'unused-winxhome', 'unused-pinchrunner', 'unused-order',
'hand_batting', 'hand_pitching', 're24_primary', 're24_running'
"play_num",
"batter_id",
"batter_pos",
"pitcher_id",
"on_base_code",
"inning_half",
"inning_num",
"batting_order",
"starting_outs",
"away_score",
"home_score",
"on_first_id",
"on_first_final",
"on_second_id",
"on_second_final",
"on_third_id",
"on_third_final",
"batter_final",
"pa",
"ab",
"run",
"e_run",
"hit",
"rbi",
"double",
"triple",
"homerun",
"bb",
"so",
"hbp",
"sac",
"ibb",
"gidp",
"bphr",
"bpfo",
"bp1b",
"bplo",
"sb",
"cs",
"outs",
"pitcher_rest_outs",
"wpa",
"catcher_id",
"defender_id",
"runner_id",
"check_pos",
"error",
"wild_pitch",
"passed_ball",
"pick_off",
"balk",
"is_go_ahead",
"is_tied",
"is_new_inning",
"inherited_runners",
"inherited_scored",
"on_hook_for_loss",
"run_differential",
"unused-manager",
"unused-pitcherpow",
"unused-pitcherrestip",
"unused-runners",
"unused-fatigue",
"unused-roundedip",
"unused-elitestart",
"unused-scenario",
"unused-winxaway",
"unused-winxhome",
"unused-pinchrunner",
"unused-order",
"hand_batting",
"hand_pitching",
"re24_primary",
"re24_running",
]
p_data = []
for line in all_plays:
this_data = {}
for count, value in enumerate(line):
if value != '' and count < len(play_keys):
if value != "" and count < len(play_keys):
this_data[play_keys[count]] = value
# Only include rows with meaningful data (>5 fields)
@ -247,8 +288,7 @@ class SheetsService:
raise SheetsException("Unable to read play-by-play data") from e
async def read_pitching_decisions(
self,
scorecard: pygsheets.Spreadsheet
self, scorecard: pygsheets.Spreadsheet
) -> List[Dict[str, Any]]:
"""
Read pitching decisions from Pitcherstats tab.
@ -260,37 +300,51 @@ class SheetsService:
List of decision dictionaries with field names mapped
"""
try:
loop = asyncio.get_event_loop()
loop = asyncio.get_running_loop()
# Get Pitcherstats tab
pitching = await loop.run_in_executor(
None,
scorecard.worksheet_by_title,
'Pitcherstats'
None, scorecard.worksheet_by_title, "Pitcherstats"
)
# Read decision data
all_decisions = await loop.run_in_executor(
None,
pitching.get_values,
'B3',
'O30'
None, pitching.get_values, "B3", "O30"
)
# Field names in order (from old bot lines 1688-1691)
pit_keys = [
'pitcher_id', 'rest_ip', 'is_start', 'base_rest',
'extra_rest', 'rest_required', 'win', 'loss', 'is_save',
'hold', 'b_save', 'irunners', 'irunners_scored', 'team_id'
"pitcher_id",
"rest_ip",
"is_start",
"base_rest",
"extra_rest",
"rest_required",
"win",
"loss",
"is_save",
"hold",
"b_save",
"irunners",
"irunners_scored",
"team_id",
]
# Fields that must be integers
int_fields = {
'pitcher_id', 'rest_required', 'win', 'loss', 'is_save',
'hold', 'b_save', 'irunners', 'irunners_scored', 'team_id'
"pitcher_id",
"rest_required",
"win",
"loss",
"is_save",
"hold",
"b_save",
"irunners",
"irunners_scored",
"team_id",
}
# Fields that are required and cannot be None
required_fields = {'pitcher_id', 'team_id'}
required_fields = {"pitcher_id", "team_id"}
pit_data = []
row_num = 3 # Start at row 3 (B3 in spreadsheet)
@ -310,7 +364,7 @@ class SheetsService:
field_name = pit_keys[count]
# Skip empty values
if value == '':
if value == "":
continue
# Check for spreadsheet errors
@ -332,7 +386,7 @@ class SheetsService:
# Sanitize integer fields
if field_name in int_fields:
sanitized = self._sanitize_int_field(value, field_name)
if sanitized is None and value != '':
if sanitized is None and value != "":
self.logger.warning(
f"Row {row_num}: Invalid integer value '{value}' for field '{field_name}' - skipping row"
)
@ -367,8 +421,7 @@ class SheetsService:
raise SheetsException("Unable to read pitching decisions") from e
async def read_box_score(
self,
scorecard: pygsheets.Spreadsheet
self, scorecard: pygsheets.Spreadsheet
) -> Dict[str, List[int]]:
"""
Read box score from Scorecard or Box Score tab.
@ -381,38 +434,28 @@ class SheetsService:
[runs, hits, errors]
"""
try:
loop = asyncio.get_event_loop()
loop = asyncio.get_running_loop()
# Try Scorecard tab first
try:
sc_tab = await loop.run_in_executor(
None,
scorecard.worksheet_by_title,
'Scorecard'
None, scorecard.worksheet_by_title, "Scorecard"
)
score_table = await loop.run_in_executor(
None,
sc_tab.get_values,
'BW8',
'BY9'
None, sc_tab.get_values, "BW8", "BY9"
)
except pygsheets.WorksheetNotFound:
# Fallback to Box Score tab
sc_tab = await loop.run_in_executor(
None,
scorecard.worksheet_by_title,
'Box Score'
None, scorecard.worksheet_by_title, "Box Score"
)
score_table = await loop.run_in_executor(
None,
sc_tab.get_values,
'T6',
'V7'
None, sc_tab.get_values, "T6", "V7"
)
return {
'away': [int(x) for x in score_table[0]], # [R, H, E]
'home': [int(x) for x in score_table[1]] # [R, H, E]
"away": [int(x) for x in score_table[0]], # [R, H, E]
"home": [int(x) for x in score_table[1]], # [R, H, E]
}
except Exception as e:

View File

@ -4,7 +4,8 @@ Draft Monitor Task for Discord Bot v2.0
Automated background task for draft timer monitoring, warnings, and auto-draft.
Self-terminates when draft timer is disabled to conserve resources.
"""
from datetime import datetime
from datetime import UTC, datetime
import discord
from discord.ext import commands, tasks
@ -34,7 +35,7 @@ class DraftMonitorTask:
def __init__(self, bot: commands.Bot):
self.bot = bot
self.logger = get_contextual_logger(f'{__name__}.DraftMonitorTask')
self.logger = get_contextual_logger(f"{__name__}.DraftMonitorTask")
# Warning flags (reset each pick)
self.warning_60s_sent = False
@ -101,7 +102,7 @@ class DraftMonitorTask:
return
# Check if we need to take action
now = datetime.now()
now = datetime.now(UTC)
deadline = draft_data.pick_deadline
if not deadline:
@ -115,7 +116,9 @@ class DraftMonitorTask:
new_interval = self._get_poll_interval(time_remaining)
if self.monitor_loop.seconds != new_interval:
self.monitor_loop.change_interval(seconds=new_interval)
self.logger.debug(f"Adjusted poll interval to {new_interval}s (time remaining: {time_remaining:.0f}s)")
self.logger.debug(
f"Adjusted poll interval to {new_interval}s (time remaining: {time_remaining:.0f}s)"
)
if time_remaining <= 0:
# Timer expired - auto-draft
@ -150,8 +153,7 @@ class DraftMonitorTask:
# Get current pick
current_pick = await draft_pick_service.get_pick(
config.sba_season,
draft_data.currentpick
config.sba_season, draft_data.currentpick
)
if not current_pick or not current_pick.owner:
@ -159,7 +161,7 @@ class DraftMonitorTask:
return
# Get draft picks cog to check/acquire lock
draft_picks_cog = self.bot.get_cog('DraftPicksCog')
draft_picks_cog = self.bot.get_cog("DraftPicksCog")
if not draft_picks_cog:
self.logger.error("Could not find DraftPicksCog")
@ -172,7 +174,7 @@ class DraftMonitorTask:
# Acquire lock
async with draft_picks_cog.pick_lock:
draft_picks_cog.lock_acquired_at = datetime.now()
draft_picks_cog.lock_acquired_at = datetime.now(UTC)
draft_picks_cog.lock_acquired_by = None # System auto-draft
try:
@ -199,17 +201,20 @@ class DraftMonitorTask:
# Get ping channel
ping_channel = guild.get_channel(draft_data.ping_channel)
if not ping_channel:
self.logger.error(f"Could not find ping channel {draft_data.ping_channel}")
self.logger.error(
f"Could not find ping channel {draft_data.ping_channel}"
)
return
# Get team's draft list
draft_list = await draft_list_service.get_team_list(
config.sba_season,
current_pick.owner.id
config.sba_season, current_pick.owner.id
)
if not draft_list:
self.logger.warning(f"Team {current_pick.owner.abbrev} has no draft list")
self.logger.warning(
f"Team {current_pick.owner.abbrev} has no draft list"
)
await ping_channel.send(
content=f"{current_pick.owner.abbrev} time expired with no draft list - pick skipped"
)
@ -247,11 +252,7 @@ class DraftMonitorTask:
# Attempt to draft this player
success = await self._attempt_draft_player(
current_pick,
player,
ping_channel,
draft_data,
guild
current_pick, player, ping_channel, draft_data, guild
)
if success:
@ -259,7 +260,9 @@ class DraftMonitorTask:
f"Auto-drafted {player.name} for {current_pick.owner.abbrev}"
)
# Advance to next pick
await draft_service.advance_pick(draft_data.id, draft_data.currentpick)
await draft_service.advance_pick(
draft_data.id, draft_data.currentpick
)
# Post on-clock announcement for next team
await self._post_on_clock_announcement(ping_channel, draft_data)
# Reset warning flags
@ -284,12 +287,7 @@ class DraftMonitorTask:
self.logger.error("Error auto-drafting player", error=e)
async def _attempt_draft_player(
self,
draft_pick,
player,
ping_channel,
draft_data,
guild
self, draft_pick, player, ping_channel, draft_data, guild
) -> bool:
"""
Attempt to draft a specific player.
@ -309,14 +307,18 @@ class DraftMonitorTask:
from services.team_service import team_service
# Get team roster for cap validation
roster = await team_service.get_team_roster(draft_pick.owner.id, 'current')
roster = await team_service.get_team_roster(draft_pick.owner.id, "current")
if not roster:
self.logger.error(f"Could not get roster for team {draft_pick.owner.id}")
self.logger.error(
f"Could not get roster for team {draft_pick.owner.id}"
)
return False
# Validate cap space
is_valid, projected_total, cap_limit = await validate_cap_space(roster, player.wara)
is_valid, projected_total, cap_limit = await validate_cap_space(
roster, player.wara
)
if not is_valid:
self.logger.debug(
@ -327,8 +329,7 @@ class DraftMonitorTask:
# Update draft pick
updated_pick = await draft_pick_service.update_pick_selection(
draft_pick.id,
player.id
draft_pick.id, player.id
)
if not updated_pick:
@ -338,13 +339,14 @@ class DraftMonitorTask:
# Get current league state for dem_week calculation
from services.player_service import player_service
from services.league_service import league_service
current = await league_service.get_current_state()
# Update player team with dem_week set to current.week + 2 for draft picks
updated_player = await player_service.update_player_team(
player.id,
draft_pick.owner.id,
dem_week=current.week + 2 if current else None
dem_week=current.week + 2 if current else None,
)
if not updated_player:
@ -365,11 +367,14 @@ class DraftMonitorTask:
result_channel = guild.get_channel(draft_data.result_channel)
if result_channel:
from views.draft_views import create_player_draft_card
draft_card = await create_player_draft_card(player, draft_pick)
draft_card.set_footer(text="🤖 Auto-drafted from draft list")
await result_channel.send(embed=draft_card)
else:
self.logger.warning(f"Could not find result channel {draft_data.result_channel}")
self.logger.warning(
f"Could not find result channel {draft_data.result_channel}"
)
return True
@ -403,23 +408,26 @@ class DraftMonitorTask:
# Get the new current pick
next_pick = await draft_pick_service.get_pick(
config.sba_season,
updated_draft_data.currentpick
config.sba_season, updated_draft_data.currentpick
)
if not next_pick or not next_pick.owner:
self.logger.error(f"Could not get pick #{updated_draft_data.currentpick} for announcement")
self.logger.error(
f"Could not get pick #{updated_draft_data.currentpick} for announcement"
)
return
# Get recent picks (last 5 completed)
recent_picks = await draft_pick_service.get_recent_picks(
config.sba_season,
updated_draft_data.currentpick - 1, # Start from previous pick
limit=5
limit=5,
)
# Get team roster for sWAR calculation
team_roster = await roster_service.get_team_roster(next_pick.owner.id, "current")
team_roster = await roster_service.get_team_roster(
next_pick.owner.id, "current"
)
roster_swar = team_roster.total_wara if team_roster else 0.0
cap_limit = get_team_salary_cap(next_pick.owner)
@ -427,7 +435,9 @@ class DraftMonitorTask:
top_roster_players = []
if team_roster:
all_players = team_roster.all_players
sorted_players = sorted(all_players, key=lambda p: p.wara if p.wara else 0.0, reverse=True)
sorted_players = sorted(
all_players, key=lambda p: p.wara if p.wara else 0.0, reverse=True
)
top_roster_players = sorted_players[:5]
# Get sheet URL
@ -441,7 +451,7 @@ class DraftMonitorTask:
roster_swar=roster_swar,
cap_limit=cap_limit,
top_roster_players=top_roster_players,
sheet_url=sheet_url
sheet_url=sheet_url,
)
# Mention the team's role (using team.lname)
@ -450,10 +460,14 @@ class DraftMonitorTask:
if team_role:
team_mention = f"{team_role.mention} "
else:
self.logger.warning(f"Could not find role for team {next_pick.owner.lname}")
self.logger.warning(
f"Could not find role for team {next_pick.owner.lname}"
)
await ping_channel.send(content=team_mention, embed=embed)
self.logger.info(f"Posted on-clock announcement for pick #{updated_draft_data.currentpick}")
self.logger.info(
f"Posted on-clock announcement for pick #{updated_draft_data.currentpick}"
)
# Reset poll interval to 30s for new pick
if self.monitor_loop.seconds != 30:
@ -484,8 +498,7 @@ class DraftMonitorTask:
# Get current pick for mention
current_pick = await draft_pick_service.get_pick(
config.sba_season,
draft_data.currentpick
config.sba_season, draft_data.currentpick
)
if not current_pick or not current_pick.owner:
@ -535,10 +548,14 @@ class DraftMonitorTask:
success = await draft_sheet_service.write_pick(
season=config.sba_season,
overall=draft_pick.overall,
orig_owner_abbrev=draft_pick.origowner.abbrev if draft_pick.origowner else draft_pick.owner.abbrev,
orig_owner_abbrev=(
draft_pick.origowner.abbrev
if draft_pick.origowner
else draft_pick.owner.abbrev
),
owner_abbrev=draft_pick.owner.abbrev,
player_name=player.name,
swar=player.wara
swar=player.wara,
)
if not success:
@ -546,7 +563,7 @@ class DraftMonitorTask:
await self._notify_sheet_failure(
ping_channel=ping_channel,
pick_overall=draft_pick.overall,
player_name=player.name
player_name=player.name,
)
except Exception as e:
@ -554,10 +571,12 @@ class DraftMonitorTask:
await self._notify_sheet_failure(
ping_channel=ping_channel,
pick_overall=draft_pick.overall,
player_name=player.name
player_name=player.name,
)
async def _notify_sheet_failure(self, ping_channel, pick_overall: int, player_name: str) -> None:
async def _notify_sheet_failure(
self, ping_channel, pick_overall: int, player_name: str
) -> None:
"""
Post notification to ping channel when sheet write fails.

View File

@ -325,7 +325,7 @@ class TransactionFreezeTask:
self.logger.warning("Could not get current league state")
return
now = datetime.now()
now = datetime.now(UTC)
self.logger.info(
f"Weekly loop check",
datetime=now.isoformat(),
@ -701,10 +701,10 @@ class TransactionFreezeTask:
# Build report entry
if winning_moves:
first_move = winning_moves[0]
# Extract timestamp from moveid (format: Season-XXX-Week-XX-DD-HH:MM:SS)
# Extract timestamp from moveid (format: Season-{season:03d}-Week-{week:02d}-{unix_timestamp})
try:
parts = winning_move_id.split("-")
submitted_at = parts[-1] if len(parts) >= 6 else "Unknown"
submitted_at = parts[-1] if len(parts) >= 4 else "Unknown"
except Exception:
submitted_at = "Unknown"

View File

@ -3,6 +3,7 @@ Tests for configuration management
Ensures configuration loading, validation, and environment handling work correctly.
"""
import os
import pytest
from unittest.mock import patch
@ -15,26 +16,33 @@ class TestBotConfig:
def test_config_loads_required_fields(self):
"""Test that config loads all required fields from environment."""
with patch.dict(os.environ, {
'BOT_TOKEN': 'test_bot_token',
'GUILD_ID': '123456789',
'API_TOKEN': 'test_api_token',
'DB_URL': 'https://api.example.com'
}):
with patch.dict(
os.environ,
{
"BOT_TOKEN": "test_bot_token",
"GUILD_ID": "123456789",
"API_TOKEN": "test_api_token",
"DB_URL": "https://api.example.com",
},
):
config = BotConfig()
assert config.bot_token == 'test_bot_token'
assert config.bot_token == "test_bot_token"
assert config.guild_id == 123456789
assert config.api_token == 'test_api_token'
assert config.db_url == 'https://api.example.com'
assert config.api_token == "test_api_token"
assert config.db_url == "https://api.example.com"
def test_config_has_default_values(self):
"""Test that config provides sensible defaults."""
with patch.dict(os.environ, {
'BOT_TOKEN': 'test_bot_token',
'GUILD_ID': '123456789',
'API_TOKEN': 'test_api_token',
'DB_URL': 'https://api.example.com'
}, clear=True):
with patch.dict(
os.environ,
{
"BOT_TOKEN": "test_bot_token",
"GUILD_ID": "123456789",
"API_TOKEN": "test_api_token",
"DB_URL": "https://api.example.com",
},
clear=True,
):
# Create config with disabled env file to test true defaults
config = BotConfig(_env_file=None)
assert config.sba_season == 13
@ -43,20 +51,23 @@ class TestBotConfig:
assert config.sba_color == "a6ce39"
assert config.log_level == "INFO"
assert config.environment == "development"
assert config.testing is True
assert config.testing is False
def test_config_overrides_defaults_from_env(self):
"""Test that environment variables override default values."""
with patch.dict(os.environ, {
'BOT_TOKEN': 'test_bot_token',
'GUILD_ID': '123456789',
'API_TOKEN': 'test_api_token',
'DB_URL': 'https://api.example.com',
'SBA_SEASON': '15',
'LOG_LEVEL': 'DEBUG',
'ENVIRONMENT': 'production',
'TESTING': 'true'
}):
with patch.dict(
os.environ,
{
"BOT_TOKEN": "test_bot_token",
"GUILD_ID": "123456789",
"API_TOKEN": "test_api_token",
"DB_URL": "https://api.example.com",
"SBA_SEASON": "15",
"LOG_LEVEL": "DEBUG",
"ENVIRONMENT": "production",
"TESTING": "true",
},
):
config = BotConfig()
assert config.sba_season == 15
assert config.log_level == "DEBUG"
@ -65,111 +76,138 @@ class TestBotConfig:
def test_config_ignores_extra_env_vars(self):
"""Test that extra environment variables are ignored."""
with patch.dict(os.environ, {
'BOT_TOKEN': 'test_bot_token',
'GUILD_ID': '123456789',
'API_TOKEN': 'test_api_token',
'DB_URL': 'https://api.example.com',
'RANDOM_EXTRA_VAR': 'should_be_ignored',
'ANOTHER_RANDOM_VAR': 'also_ignored'
}):
with patch.dict(
os.environ,
{
"BOT_TOKEN": "test_bot_token",
"GUILD_ID": "123456789",
"API_TOKEN": "test_api_token",
"DB_URL": "https://api.example.com",
"RANDOM_EXTRA_VAR": "should_be_ignored",
"ANOTHER_RANDOM_VAR": "also_ignored",
},
):
# Should not raise validation error
config = BotConfig()
assert config.bot_token == 'test_bot_token'
assert config.bot_token == "test_bot_token"
# Extra vars should not be accessible
assert not hasattr(config, 'random_extra_var')
assert not hasattr(config, 'another_random_var')
assert not hasattr(config, "random_extra_var")
assert not hasattr(config, "another_random_var")
def test_config_converts_string_to_int(self):
"""Test that guild_id is properly converted from string to int."""
with patch.dict(os.environ, {
'BOT_TOKEN': 'test_bot_token',
'GUILD_ID': '987654321', # String input
'API_TOKEN': 'test_api_token',
'DB_URL': 'https://api.example.com'
}):
with patch.dict(
os.environ,
{
"BOT_TOKEN": "test_bot_token",
"GUILD_ID": "987654321", # String input
"API_TOKEN": "test_api_token",
"DB_URL": "https://api.example.com",
},
):
config = BotConfig()
assert config.guild_id == 987654321
assert isinstance(config.guild_id, int)
def test_config_converts_string_to_bool(self):
"""Test that boolean fields are properly converted."""
with patch.dict(os.environ, {
'BOT_TOKEN': 'test_bot_token',
'GUILD_ID': '123456789',
'API_TOKEN': 'test_api_token',
'DB_URL': 'https://api.example.com',
'TESTING': 'false'
}):
with patch.dict(
os.environ,
{
"BOT_TOKEN": "test_bot_token",
"GUILD_ID": "123456789",
"API_TOKEN": "test_api_token",
"DB_URL": "https://api.example.com",
"TESTING": "false",
},
):
config = BotConfig()
assert config.testing is False
assert isinstance(config.testing, bool)
with patch.dict(os.environ, {
'BOT_TOKEN': 'test_bot_token',
'GUILD_ID': '123456789',
'API_TOKEN': 'test_api_token',
'DB_URL': 'https://api.example.com',
'TESTING': '1'
}):
with patch.dict(
os.environ,
{
"BOT_TOKEN": "test_bot_token",
"GUILD_ID": "123456789",
"API_TOKEN": "test_api_token",
"DB_URL": "https://api.example.com",
"TESTING": "1",
},
):
config = BotConfig()
assert config.testing is True
def test_config_case_insensitive(self):
"""Test that environment variables are case insensitive."""
with patch.dict(os.environ, {
'bot_token': 'test_bot_token', # lowercase
'GUILD_ID': '123456789', # uppercase
'Api_Token': 'test_api_token', # mixed case
'db_url': 'https://api.example.com'
}):
with patch.dict(
os.environ,
{
"bot_token": "test_bot_token", # lowercase
"GUILD_ID": "123456789", # uppercase
"Api_Token": "test_api_token", # mixed case
"db_url": "https://api.example.com",
},
):
config = BotConfig()
assert config.bot_token == 'test_bot_token'
assert config.api_token == 'test_api_token'
assert config.db_url == 'https://api.example.com'
assert config.bot_token == "test_bot_token"
assert config.api_token == "test_api_token"
assert config.db_url == "https://api.example.com"
def test_is_development_property(self):
"""Test the is_development property."""
with patch.dict(os.environ, {
'BOT_TOKEN': 'test_bot_token',
'GUILD_ID': '123456789',
'API_TOKEN': 'test_api_token',
'DB_URL': 'https://api.example.com',
'ENVIRONMENT': 'development'
}):
with patch.dict(
os.environ,
{
"BOT_TOKEN": "test_bot_token",
"GUILD_ID": "123456789",
"API_TOKEN": "test_api_token",
"DB_URL": "https://api.example.com",
"ENVIRONMENT": "development",
},
):
config = BotConfig()
assert config.is_development is True
with patch.dict(os.environ, {
'BOT_TOKEN': 'test_bot_token',
'GUILD_ID': '123456789',
'API_TOKEN': 'test_api_token',
'DB_URL': 'https://api.example.com',
'ENVIRONMENT': 'production'
}):
with patch.dict(
os.environ,
{
"BOT_TOKEN": "test_bot_token",
"GUILD_ID": "123456789",
"API_TOKEN": "test_api_token",
"DB_URL": "https://api.example.com",
"ENVIRONMENT": "production",
},
):
config = BotConfig()
assert config.is_development is False
def test_is_testing_property(self):
"""Test the is_testing property."""
with patch.dict(os.environ, {
'BOT_TOKEN': 'test_bot_token',
'GUILD_ID': '123456789',
'API_TOKEN': 'test_api_token',
'DB_URL': 'https://api.example.com',
'TESTING': 'true'
}):
with patch.dict(
os.environ,
{
"BOT_TOKEN": "test_bot_token",
"GUILD_ID": "123456789",
"API_TOKEN": "test_api_token",
"DB_URL": "https://api.example.com",
"TESTING": "true",
},
):
config = BotConfig()
assert config.is_testing is True
with patch.dict(os.environ, {
'BOT_TOKEN': 'test_bot_token',
'GUILD_ID': '123456789',
'API_TOKEN': 'test_api_token',
'DB_URL': 'https://api.example.com',
'TESTING': 'false'
}):
with patch.dict(
os.environ,
{
"BOT_TOKEN": "test_bot_token",
"GUILD_ID": "123456789",
"API_TOKEN": "test_api_token",
"DB_URL": "https://api.example.com",
"TESTING": "false",
},
):
config = BotConfig()
assert config.is_testing is False
@ -180,62 +218,79 @@ class TestConfigValidation:
def test_missing_required_field_raises_error(self):
"""Test that missing required fields raise validation errors."""
# Missing BOT_TOKEN
with patch.dict(os.environ, {
'GUILD_ID': '123456789',
'API_TOKEN': 'test_api_token',
'DB_URL': 'https://api.example.com'
}, clear=True):
with patch.dict(
os.environ,
{
"GUILD_ID": "123456789",
"API_TOKEN": "test_api_token",
"DB_URL": "https://api.example.com",
},
clear=True,
):
with pytest.raises(Exception): # Pydantic ValidationError
BotConfig(_env_file=None)
# Missing GUILD_ID
with patch.dict(os.environ, {
'BOT_TOKEN': 'test_bot_token',
'API_TOKEN': 'test_api_token',
'DB_URL': 'https://api.example.com'
}, clear=True):
with patch.dict(
os.environ,
{
"BOT_TOKEN": "test_bot_token",
"API_TOKEN": "test_api_token",
"DB_URL": "https://api.example.com",
},
clear=True,
):
with pytest.raises(Exception): # Pydantic ValidationError
BotConfig(_env_file=None)
def test_invalid_guild_id_raises_error(self):
"""Test that invalid guild_id values raise validation errors."""
with patch.dict(os.environ, {
'BOT_TOKEN': 'test_bot_token',
'GUILD_ID': 'not_a_number',
'API_TOKEN': 'test_api_token',
'DB_URL': 'https://api.example.com'
}):
with patch.dict(
os.environ,
{
"BOT_TOKEN": "test_bot_token",
"GUILD_ID": "not_a_number",
"API_TOKEN": "test_api_token",
"DB_URL": "https://api.example.com",
},
):
with pytest.raises(Exception): # Pydantic ValidationError
BotConfig()
def test_empty_required_field_is_allowed(self):
"""Test that empty required fields are allowed (Pydantic default behavior)."""
with patch.dict(os.environ, {
'BOT_TOKEN': '', # Empty string
'GUILD_ID': '123456789',
'API_TOKEN': 'test_api_token',
'DB_URL': 'https://api.example.com'
}):
with patch.dict(
os.environ,
{
"BOT_TOKEN": "", # Empty string
"GUILD_ID": "123456789",
"API_TOKEN": "test_api_token",
"DB_URL": "https://api.example.com",
},
):
# Should not raise - Pydantic allows empty strings by default
config = BotConfig()
assert config.bot_token == ''
assert config.bot_token == ""
@pytest.fixture
def valid_config():
"""Provide a valid configuration for testing."""
with patch.dict(os.environ, {
'BOT_TOKEN': 'test_bot_token',
'GUILD_ID': '123456789',
'API_TOKEN': 'test_api_token',
'DB_URL': 'https://api.example.com'
}):
with patch.dict(
os.environ,
{
"BOT_TOKEN": "test_bot_token",
"GUILD_ID": "123456789",
"API_TOKEN": "test_api_token",
"DB_URL": "https://api.example.com",
},
):
return BotConfig()
def test_config_fixture(valid_config):
"""Test that the valid_config fixture works correctly."""
assert valid_config.bot_token == 'test_bot_token'
assert valid_config.bot_token == "test_bot_token"
assert valid_config.guild_id == 123456789
assert valid_config.api_token == 'test_api_token'
assert valid_config.db_url == 'https://api.example.com'
assert valid_config.api_token == "test_api_token"
assert valid_config.db_url == "https://api.example.com"

View File

@ -3,6 +3,7 @@ Simplified tests for Custom Command models in Discord Bot v2.0
Testing dataclass models without Pydantic validation.
"""
import pytest
from datetime import datetime, timedelta, timezone
@ -11,7 +12,7 @@ from models.custom_command import (
CustomCommandCreator,
CustomCommandSearchFilters,
CustomCommandSearchResult,
CustomCommandStats
CustomCommandStats,
)
@ -28,7 +29,7 @@ class TestCustomCommandCreator:
display_name="Test User",
created_at=now,
total_commands=10,
active_commands=5
active_commands=5,
)
assert creator.id == 1
@ -49,7 +50,7 @@ class TestCustomCommandCreator:
display_name=None,
created_at=now,
total_commands=0,
active_commands=0
active_commands=0,
)
assert creator.display_name is None
@ -70,7 +71,7 @@ class TestCustomCommand:
display_name="Test User",
created_at=datetime.now(timezone.utc),
total_commands=5,
active_commands=5
active_commands=5,
)
def test_command_basic_creation(self, sample_creator: CustomCommandCreator):
@ -88,7 +89,7 @@ class TestCustomCommand:
use_count=0,
warning_sent=False,
is_active=True,
tags=None
tags=None,
)
assert command.id == 1
@ -121,7 +122,7 @@ class TestCustomCommand:
use_count=25,
warning_sent=True,
is_active=True,
tags=["fun", "utility"]
tags=["fun", "utility"],
)
assert command.use_count == 25
@ -147,15 +148,19 @@ class TestCustomCommand:
use_count=1,
warning_sent=False,
is_active=True,
tags=None
tags=None,
)
# Mock datetime.utcnow for consistent testing
with pytest.MonkeyPatch().context() as m:
m.setattr('models.custom_command.datetime', type('MockDateTime', (), {
'utcnow': lambda: now,
'now': lambda: now
}))
m.setattr(
"models.custom_command.datetime",
type(
"MockDateTime",
(),
{"utcnow": lambda: now, "now": lambda tz=None: now},
),
)
assert command.days_since_last_use == 5
# Command never used
@ -171,7 +176,7 @@ class TestCustomCommand:
use_count=0,
warning_sent=False,
is_active=True,
tags=None
tags=None,
)
assert unused_command.days_since_last_use is None
@ -193,14 +198,18 @@ class TestCustomCommand:
use_count=50,
warning_sent=False,
is_active=True,
tags=None
tags=None,
)
with pytest.MonkeyPatch().context() as m:
m.setattr('models.custom_command.datetime', type('MockDateTime', (), {
'utcnow': lambda: now,
'now': lambda: now
}))
m.setattr(
"models.custom_command.datetime",
type(
"MockDateTime",
(),
{"utcnow": lambda: now, "now": lambda tz=None: now},
),
)
score = recent_command.popularity_score
assert 0 <= score <= 15 # Can be higher due to recency bonus
assert score > 0 # Should have some score due to usage
@ -218,7 +227,7 @@ class TestCustomCommand:
use_count=0,
warning_sent=False,
is_active=True,
tags=None
tags=None,
)
assert unused_command.popularity_score == 0
@ -250,7 +259,7 @@ class TestCustomCommandSearchFilters:
sort_by="popularity",
sort_desc=True,
page=2,
page_size=10
page_size=10,
)
assert filters.name_contains == "test"
@ -275,7 +284,7 @@ class TestCustomCommandSearchResult:
created_at=datetime.now(timezone.utc),
display_name=None,
total_commands=3,
active_commands=3
active_commands=3,
)
now = datetime.now(timezone.utc)
@ -292,7 +301,7 @@ class TestCustomCommandSearchResult:
use_count=0,
warning_sent=False,
is_active=True,
tags=None
tags=None,
)
for i in range(3)
]
@ -305,7 +314,7 @@ class TestCustomCommandSearchResult:
page=1,
page_size=20,
total_pages=1,
has_more=False
has_more=False,
)
assert result.commands == sample_commands
@ -323,7 +332,7 @@ class TestCustomCommandSearchResult:
page=2,
page_size=20,
total_pages=3,
has_more=True
has_more=True,
)
assert result.start_index == 21 # (2-1) * 20 + 1
@ -342,7 +351,7 @@ class TestCustomCommandStats:
created_at=datetime.now(timezone.utc),
display_name=None,
total_commands=50,
active_commands=45
active_commands=45,
)
command = CustomCommand(
@ -357,7 +366,7 @@ class TestCustomCommandStats:
use_count=100,
warning_sent=False,
is_active=True,
tags=None
tags=None,
)
stats = CustomCommandStats(
@ -369,7 +378,7 @@ class TestCustomCommandStats:
most_active_creator=creator,
recent_commands_count=15,
commands_needing_warning=5,
commands_eligible_for_deletion=2
commands_eligible_for_deletion=2,
)
assert stats.total_commands == 100
@ -394,7 +403,7 @@ class TestCustomCommandStats:
most_active_creator=None,
recent_commands_count=0,
commands_needing_warning=0,
commands_eligible_for_deletion=0
commands_eligible_for_deletion=0,
)
assert stats.average_uses_per_command == 20.0 # 1000 / 50
@ -410,7 +419,7 @@ class TestCustomCommandStats:
most_active_creator=None,
recent_commands_count=0,
commands_needing_warning=0,
commands_eligible_for_deletion=0
commands_eligible_for_deletion=0,
)
assert empty_stats.average_uses_per_command == 0.0
@ -430,7 +439,7 @@ class TestModelIntegration:
display_name="Test User",
created_at=now,
total_commands=3,
active_commands=3
active_commands=3,
)
command = CustomCommand(
@ -445,7 +454,7 @@ class TestModelIntegration:
use_count=0,
warning_sent=False,
is_active=True,
tags=None
tags=None,
)
# Verify relationship
@ -457,11 +466,7 @@ class TestModelIntegration:
def test_search_result_with_filters(self):
"""Test search result creation with filters."""
filters = CustomCommandSearchFilters(
name_contains="test",
min_uses=5,
sort_by="popularity",
page=2,
page_size=10
name_contains="test", min_uses=5, sort_by="popularity", page=2, page_size=10
)
creator = CustomCommandCreator(
@ -471,7 +476,7 @@ class TestModelIntegration:
created_at=datetime.now(timezone.utc),
display_name=None,
total_commands=1,
active_commands=1
active_commands=1,
)
commands = [
@ -487,7 +492,7 @@ class TestModelIntegration:
use_count=0,
warning_sent=False,
is_active=True,
tags=None
tags=None,
)
]
@ -497,7 +502,7 @@ class TestModelIntegration:
page=filters.page,
page_size=filters.page_size,
total_pages=3,
has_more=True
has_more=True,
)
assert result.page == 2

View File

@ -3,15 +3,16 @@ Tests for Help Command models
Validates model creation, validation, and business logic.
"""
import pytest
from datetime import datetime, timedelta
from datetime import UTC, datetime, timedelta
from pydantic import ValidationError
from models.help_command import (
HelpCommand,
HelpCommandSearchFilters,
HelpCommandSearchResult,
HelpCommandStats
HelpCommandStats,
)
@ -22,133 +23,133 @@ class TestHelpCommandModel:
"""Test help command creation with minimal required fields."""
help_cmd = HelpCommand(
id=1,
name='test-topic',
title='Test Topic',
content='This is test content',
created_by_discord_id='123456789',
created_at=datetime.now()
name="test-topic",
title="Test Topic",
content="This is test content",
created_by_discord_id="123456789",
created_at=datetime.now(UTC),
)
assert help_cmd.id == 1
assert help_cmd.name == 'test-topic'
assert help_cmd.title == 'Test Topic'
assert help_cmd.content == 'This is test content'
assert help_cmd.created_by_discord_id == '123456789'
assert help_cmd.name == "test-topic"
assert help_cmd.title == "Test Topic"
assert help_cmd.content == "This is test content"
assert help_cmd.created_by_discord_id == "123456789"
assert help_cmd.is_active is True
assert help_cmd.view_count == 0
def test_help_command_creation_with_optional_fields(self):
"""Test help command creation with all optional fields."""
now = datetime.now()
now = datetime.now(UTC)
help_cmd = HelpCommand(
id=2,
name='trading-rules',
title='Trading Rules & Guidelines',
content='Complete trading rules...',
category='rules',
created_by_discord_id='123456789',
name="trading-rules",
title="Trading Rules & Guidelines",
content="Complete trading rules...",
category="rules",
created_by_discord_id="123456789",
created_at=now,
updated_at=now,
last_modified_by='987654321',
last_modified_by="987654321",
is_active=True,
view_count=100,
display_order=10
display_order=10,
)
assert help_cmd.category == 'rules'
assert help_cmd.category == "rules"
assert help_cmd.updated_at == now
assert help_cmd.last_modified_by == '987654321'
assert help_cmd.last_modified_by == "987654321"
assert help_cmd.view_count == 100
assert help_cmd.display_order == 10
def test_help_command_name_validation(self):
"""Test help command name validation."""
base_data = {
'id': 3,
'title': 'Test',
'content': 'Content',
'created_by_discord_id': '123',
'created_at': datetime.now()
"id": 3,
"title": "Test",
"content": "Content",
"created_by_discord_id": "123",
"created_at": datetime.now(UTC),
}
# Valid names
valid_names = ['test', 'test-topic', 'test_topic', 'test123', 'abc']
valid_names = ["test", "test-topic", "test_topic", "test123", "abc"]
for name in valid_names:
help_cmd = HelpCommand(name=name, **base_data)
assert help_cmd.name == name.lower()
# Invalid names - too short
with pytest.raises(ValidationError):
HelpCommand(name='a', **base_data)
HelpCommand(name="a", **base_data)
# Invalid names - too long
with pytest.raises(ValidationError):
HelpCommand(name='a' * 33, **base_data)
HelpCommand(name="a" * 33, **base_data)
# Invalid names - special characters
with pytest.raises(ValidationError):
HelpCommand(name='test@topic', **base_data)
HelpCommand(name="test@topic", **base_data)
with pytest.raises(ValidationError):
HelpCommand(name='test topic', **base_data)
HelpCommand(name="test topic", **base_data)
def test_help_command_title_validation(self):
"""Test help command title validation."""
base_data = {
'id': 4,
'name': 'test',
'content': 'Content',
'created_by_discord_id': '123',
'created_at': datetime.now()
"id": 4,
"name": "test",
"content": "Content",
"created_by_discord_id": "123",
"created_at": datetime.now(UTC),
}
# Valid title
help_cmd = HelpCommand(title='Test Topic', **base_data)
assert help_cmd.title == 'Test Topic'
help_cmd = HelpCommand(title="Test Topic", **base_data)
assert help_cmd.title == "Test Topic"
# Empty title
with pytest.raises(ValidationError):
HelpCommand(title='', **base_data)
HelpCommand(title="", **base_data)
# Title too long
with pytest.raises(ValidationError):
HelpCommand(title='a' * 201, **base_data)
HelpCommand(title="a" * 201, **base_data)
def test_help_command_content_validation(self):
"""Test help command content validation."""
base_data = {
'id': 5,
'name': 'test',
'title': 'Test',
'created_by_discord_id': '123',
'created_at': datetime.now()
"id": 5,
"name": "test",
"title": "Test",
"created_by_discord_id": "123",
"created_at": datetime.now(UTC),
}
# Valid content
help_cmd = HelpCommand(content='Test content', **base_data)
assert help_cmd.content == 'Test content'
help_cmd = HelpCommand(content="Test content", **base_data)
assert help_cmd.content == "Test content"
# Empty content
with pytest.raises(ValidationError):
HelpCommand(content='', **base_data)
HelpCommand(content="", **base_data)
# Content too long
with pytest.raises(ValidationError):
HelpCommand(content='a' * 4001, **base_data)
HelpCommand(content="a" * 4001, **base_data)
def test_help_command_category_validation(self):
"""Test help command category validation."""
base_data = {
'id': 6,
'name': 'test',
'title': 'Test',
'content': 'Content',
'created_by_discord_id': '123',
'created_at': datetime.now()
"id": 6,
"name": "test",
"title": "Test",
"content": "Content",
"created_by_discord_id": "123",
"created_at": datetime.now(UTC),
}
# Valid categories
valid_categories = ['rules', 'guides', 'resources', 'info', 'faq']
valid_categories = ["rules", "guides", "resources", "info", "faq"]
for category in valid_categories:
help_cmd = HelpCommand(category=category, **base_data)
assert help_cmd.category == category.lower()
@ -159,28 +160,28 @@ class TestHelpCommandModel:
# Invalid category - special characters
with pytest.raises(ValidationError):
HelpCommand(category='test@category', **base_data)
HelpCommand(category="test@category", **base_data)
def test_help_command_is_deleted_property(self):
"""Test is_deleted property."""
active = HelpCommand(
id=7,
name='active',
title='Active Topic',
content='Content',
created_by_discord_id='123',
created_at=datetime.now(),
is_active=True
name="active",
title="Active Topic",
content="Content",
created_by_discord_id="123",
created_at=datetime.now(UTC),
is_active=True,
)
deleted = HelpCommand(
id=8,
name='deleted',
title='Deleted Topic',
content='Content',
created_by_discord_id='123',
created_at=datetime.now(),
is_active=False
name="deleted",
title="Deleted Topic",
content="Content",
created_by_discord_id="123",
created_at=datetime.now(UTC),
is_active=False,
)
assert active.is_deleted is False
@ -191,24 +192,24 @@ class TestHelpCommandModel:
# No updates
no_update = HelpCommand(
id=9,
name='test',
title='Test',
content='Content',
created_by_discord_id='123',
created_at=datetime.now(),
updated_at=None
name="test",
title="Test",
content="Content",
created_by_discord_id="123",
created_at=datetime.now(UTC),
updated_at=None,
)
assert no_update.days_since_update is None
# Recent update
recent = HelpCommand(
id=10,
name='test',
title='Test',
content='Content',
created_by_discord_id='123',
created_at=datetime.now(),
updated_at=datetime.now() - timedelta(days=5)
name="test",
title="Test",
content="Content",
created_by_discord_id="123",
created_at=datetime.now(UTC),
updated_at=datetime.now(UTC) - timedelta(days=5),
)
assert recent.days_since_update == 5
@ -216,11 +217,11 @@ class TestHelpCommandModel:
"""Test days_since_creation property."""
old = HelpCommand(
id=11,
name='test',
title='Test',
content='Content',
created_by_discord_id='123',
created_at=datetime.now() - timedelta(days=30)
name="test",
title="Test",
content="Content",
created_by_discord_id="123",
created_at=datetime.now(UTC) - timedelta(days=30),
)
assert old.days_since_creation == 30
@ -229,24 +230,24 @@ class TestHelpCommandModel:
# No views
no_views = HelpCommand(
id=12,
name='test',
title='Test',
content='Content',
created_by_discord_id='123',
created_at=datetime.now(),
view_count=0
name="test",
title="Test",
content="Content",
created_by_discord_id="123",
created_at=datetime.now(UTC),
view_count=0,
)
assert no_views.popularity_score == 0.0
# New topic with views
new_popular = HelpCommand(
id=13,
name='test',
title='Test',
content='Content',
created_by_discord_id='123',
created_at=datetime.now() - timedelta(days=5),
view_count=50
name="test",
title="Test",
content="Content",
created_by_discord_id="123",
created_at=datetime.now(UTC) - timedelta(days=5),
view_count=50,
)
score = new_popular.popularity_score
assert score > 5.0 # Base score (5.0) with new topic bonus (1.5x)
@ -254,12 +255,12 @@ class TestHelpCommandModel:
# Old topic with views
old_popular = HelpCommand(
id=14,
name='test',
title='Test',
content='Content',
created_by_discord_id='123',
created_at=datetime.now() - timedelta(days=100),
view_count=50
name="test",
title="Test",
content="Content",
created_by_discord_id="123",
created_at=datetime.now(UTC) - timedelta(days=100),
view_count=50,
)
old_score = old_popular.popularity_score
assert old_score < new_popular.popularity_score # Older topics get penalty
@ -275,7 +276,7 @@ class TestHelpCommandSearchFilters:
assert filters.name_contains is None
assert filters.category is None
assert filters.is_active is True
assert filters.sort_by == 'name'
assert filters.sort_by == "name"
assert filters.sort_desc is False
assert filters.page == 1
assert filters.page_size == 25
@ -283,19 +284,19 @@ class TestHelpCommandSearchFilters:
def test_search_filters_custom_values(self):
"""Test search filters with custom values."""
filters = HelpCommandSearchFilters(
name_contains='trading',
category='rules',
name_contains="trading",
category="rules",
is_active=False,
sort_by='view_count',
sort_by="view_count",
sort_desc=True,
page=2,
page_size=50
page_size=50,
)
assert filters.name_contains == 'trading'
assert filters.category == 'rules'
assert filters.name_contains == "trading"
assert filters.category == "rules"
assert filters.is_active is False
assert filters.sort_by == 'view_count'
assert filters.sort_by == "view_count"
assert filters.sort_desc is True
assert filters.page == 2
assert filters.page_size == 50
@ -303,14 +304,22 @@ class TestHelpCommandSearchFilters:
def test_search_filters_sort_by_validation(self):
"""Test sort_by field validation."""
# Valid sort fields
valid_sorts = ['name', 'title', 'category', 'created_at', 'updated_at', 'view_count', 'display_order']
valid_sorts = [
"name",
"title",
"category",
"created_at",
"updated_at",
"view_count",
"display_order",
]
for sort_field in valid_sorts:
filters = HelpCommandSearchFilters(sort_by=sort_field)
assert filters.sort_by == sort_field
# Invalid sort field
with pytest.raises(ValidationError):
HelpCommandSearchFilters(sort_by='invalid_field')
HelpCommandSearchFilters(sort_by="invalid_field")
def test_search_filters_page_validation(self):
"""Test page number validation."""
@ -353,11 +362,11 @@ class TestHelpCommandSearchResult:
help_commands = [
HelpCommand(
id=i,
name=f'topic-{i}',
title=f'Topic {i}',
content=f'Content {i}',
created_by_discord_id='123',
created_at=datetime.now()
name=f"topic-{i}",
title=f"Topic {i}",
content=f"Content {i}",
created_by_discord_id="123",
created_at=datetime.now(UTC),
)
for i in range(1, 11)
]
@ -368,7 +377,7 @@ class TestHelpCommandSearchResult:
page=1,
page_size=10,
total_pages=5,
has_more=True
has_more=True,
)
assert len(result.help_commands) == 10
@ -386,7 +395,7 @@ class TestHelpCommandSearchResult:
page=3,
page_size=25,
total_pages=4,
has_more=True
has_more=True,
)
assert result.start_index == 51 # (3-1) * 25 + 1
@ -400,7 +409,7 @@ class TestHelpCommandSearchResult:
page=3,
page_size=25,
total_pages=3,
has_more=False
has_more=False,
)
assert result.end_index == 55 # min(3 * 25, 55)
@ -412,7 +421,7 @@ class TestHelpCommandSearchResult:
page=2,
page_size=25,
total_pages=4,
has_more=True
has_more=True,
)
assert result.end_index == 50 # min(2 * 25, 100)
@ -428,7 +437,7 @@ class TestHelpCommandStats:
active_commands=45,
total_views=1000,
most_viewed_command=None,
recent_commands_count=5
recent_commands_count=5,
)
assert stats.total_commands == 50
@ -441,12 +450,12 @@ class TestHelpCommandStats:
"""Test stats with most viewed command."""
most_viewed = HelpCommand(
id=1,
name='popular-topic',
title='Popular Topic',
content='Content',
created_by_discord_id='123',
created_at=datetime.now(),
view_count=500
name="popular-topic",
title="Popular Topic",
content="Content",
created_by_discord_id="123",
created_at=datetime.now(UTC),
view_count=500,
)
stats = HelpCommandStats(
@ -454,11 +463,11 @@ class TestHelpCommandStats:
active_commands=45,
total_views=1000,
most_viewed_command=most_viewed,
recent_commands_count=5
recent_commands_count=5,
)
assert stats.most_viewed_command is not None
assert stats.most_viewed_command.name == 'popular-topic'
assert stats.most_viewed_command.name == "popular-topic"
assert stats.most_viewed_command.view_count == 500
def test_stats_average_views_per_command(self):
@ -469,7 +478,7 @@ class TestHelpCommandStats:
active_commands=40,
total_views=800,
most_viewed_command=None,
recent_commands_count=5
recent_commands_count=5,
)
assert stats.average_views_per_command == 20.0 # 800 / 40
@ -480,7 +489,7 @@ class TestHelpCommandStats:
active_commands=0,
total_views=0,
most_viewed_command=None,
recent_commands_count=0
recent_commands_count=0,
)
assert stats.average_views_per_command == 0.0
@ -492,44 +501,44 @@ class TestHelpCommandFromAPIData:
def test_from_api_data_complete(self):
"""Test from_api_data with complete data."""
api_data = {
'id': 1,
'name': 'trading-rules',
'title': 'Trading Rules & Guidelines',
'content': 'Complete trading rules...',
'category': 'rules',
'created_by_discord_id': '123456789',
'created_at': '2025-01-01T12:00:00',
'updated_at': '2025-01-10T15:30:00',
'last_modified_by': '987654321',
'is_active': True,
'view_count': 100,
'display_order': 10
"id": 1,
"name": "trading-rules",
"title": "Trading Rules & Guidelines",
"content": "Complete trading rules...",
"category": "rules",
"created_by_discord_id": "123456789",
"created_at": "2025-01-01T12:00:00",
"updated_at": "2025-01-10T15:30:00",
"last_modified_by": "987654321",
"is_active": True,
"view_count": 100,
"display_order": 10,
}
help_cmd = HelpCommand.from_api_data(api_data)
assert help_cmd.id == 1
assert help_cmd.name == 'trading-rules'
assert help_cmd.title == 'Trading Rules & Guidelines'
assert help_cmd.content == 'Complete trading rules...'
assert help_cmd.category == 'rules'
assert help_cmd.name == "trading-rules"
assert help_cmd.title == "Trading Rules & Guidelines"
assert help_cmd.content == "Complete trading rules..."
assert help_cmd.category == "rules"
assert help_cmd.view_count == 100
def test_from_api_data_minimal(self):
"""Test from_api_data with minimal required data."""
api_data = {
'id': 2,
'name': 'simple-topic',
'title': 'Simple Topic',
'content': 'Simple content',
'created_by_discord_id': '123456789',
'created_at': '2025-01-01T12:00:00'
"id": 2,
"name": "simple-topic",
"title": "Simple Topic",
"content": "Simple content",
"created_by_discord_id": "123456789",
"created_at": "2025-01-01T12:00:00",
}
help_cmd = HelpCommand.from_api_data(api_data)
assert help_cmd.id == 2
assert help_cmd.name == 'simple-topic'
assert help_cmd.name == "simple-topic"
assert help_cmd.category is None
assert help_cmd.updated_at is None
assert help_cmd.view_count == 0

File diff suppressed because it is too large Load Diff