UNSET sentinel pattern: - Add UNSET sentinel in protocols.py for nullable field updates - Fix inability to clear deck description (UNSET=keep, None=clear) - Fix repository inability to clear validation_errors Starter deck improvements: - Remove unused has_starter_deck from CollectionService - Add deprecation notes to old starter deck methods Validation improvements: - Add energy type validation in deck_validator.py - Add energy type validation in deck schemas - Add VALID_ENERGY_TYPES constant Game loading fix: - Fix get_deck_for_game silently skipping invalid cards - Now raises ValueError with clear error message Tests: - Add TestEnergyTypeValidation test class - Add TestGetDeckForGame test class - Add tests for validate_energy_types utility function Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
556 lines
19 KiB
Python
556 lines
19 KiB
Python
"""Deck service for Mantimon TCG.
|
|
|
|
This module provides business logic for deck management. It uses
|
|
the DeckRepository protocol for data access and pure validation functions.
|
|
|
|
The service layer handles:
|
|
- Deck slot limits (free vs premium users)
|
|
- Deck validation with optional ownership checking
|
|
- Starter deck creation
|
|
|
|
The backend is stateless - deck rules come from the request via DeckConfig.
|
|
|
|
Example:
|
|
from app.core.config import DeckConfig
|
|
from app.services.card_service import CardService
|
|
from app.services.deck_service import DeckService
|
|
from app.repositories.postgres import PostgresDeckRepository, PostgresCollectionRepository
|
|
|
|
# Create dependencies
|
|
card_service = CardService()
|
|
card_service.load_all()
|
|
|
|
# Create repositories
|
|
deck_repo = PostgresDeckRepository(db_session)
|
|
collection_repo = PostgresCollectionRepository(db_session)
|
|
|
|
# Create service
|
|
service = DeckService(deck_repo, card_service, collection_repo)
|
|
|
|
# Create a deck - rules provided by caller
|
|
deck = await service.create_deck(
|
|
user_id=user_id,
|
|
name="My Deck",
|
|
cards={"a1-001-bulbasaur": 4, ...},
|
|
energy_cards={"grass": 14, "colorless": 6},
|
|
deck_config=DeckConfig(), # Rules from frontend
|
|
max_decks=5, # From user.max_decks
|
|
)
|
|
"""
|
|
|
|
import logging
|
|
from uuid import UUID
|
|
|
|
from sqlalchemy.exc import IntegrityError
|
|
|
|
from app.core.config import DeckConfig
|
|
from app.core.models.card import CardDefinition
|
|
from app.repositories.protocols import (
|
|
UNSET,
|
|
CollectionRepository,
|
|
DeckEntry,
|
|
DeckRepository,
|
|
)
|
|
from app.services.card_service import CardService
|
|
from app.services.deck_validator import ValidationResult, validate_deck
|
|
|
|
|
|
class DeckLimitExceededError(Exception):
|
|
"""Raised when user tries to create more decks than allowed."""
|
|
|
|
pass
|
|
|
|
|
|
class StarterAlreadySelectedError(Exception):
|
|
"""Raised when user tries to select a starter deck they already have."""
|
|
|
|
pass
|
|
|
|
|
|
class DeckNotFoundError(Exception):
|
|
"""Raised when deck is not found or not owned by user."""
|
|
|
|
pass
|
|
|
|
|
|
class DeckService:
|
|
"""Service for deck business logic.
|
|
|
|
Uses repository pattern for data access, enabling:
|
|
- Easy unit testing with mock repositories
|
|
- Multiple storage backends
|
|
- Offline fork support
|
|
|
|
The backend is stateless - deck rules come from the request via DeckConfig.
|
|
|
|
Attributes:
|
|
_deck_repo: The deck repository implementation.
|
|
_collection_repo: The collection repository (for ownership checks).
|
|
_card_service: The card service for card lookups.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
deck_repository: DeckRepository,
|
|
card_service: CardService,
|
|
collection_repository: CollectionRepository | None = None,
|
|
) -> None:
|
|
"""Initialize with dependencies.
|
|
|
|
Args:
|
|
deck_repository: Implementation of DeckRepository protocol.
|
|
card_service: Card service for looking up card definitions.
|
|
collection_repository: Implementation of CollectionRepository protocol.
|
|
Required for ownership validation in campaign mode.
|
|
"""
|
|
self._deck_repo = deck_repository
|
|
self._card_service = card_service
|
|
self._collection_repo = collection_repository
|
|
|
|
async def create_deck(
|
|
self,
|
|
user_id: UUID,
|
|
name: str,
|
|
cards: dict[str, int],
|
|
energy_cards: dict[str, int],
|
|
deck_config: DeckConfig,
|
|
max_decks: int,
|
|
validate_ownership: bool = True,
|
|
is_starter: bool = False,
|
|
starter_type: str | None = None,
|
|
description: str | None = None,
|
|
) -> DeckEntry:
|
|
"""Create a new deck.
|
|
|
|
Validates the deck and stores validation results. Invalid decks
|
|
CAN be saved (with errors) to support work-in-progress decks.
|
|
|
|
Args:
|
|
user_id: The user's UUID.
|
|
name: Display name for the deck.
|
|
cards: Card ID to quantity mapping.
|
|
energy_cards: Energy type to quantity mapping.
|
|
deck_config: Deck rules from the caller (frontend provides this).
|
|
max_decks: Maximum decks allowed (from user.max_decks).
|
|
validate_ownership: If True, checks card ownership (campaign mode).
|
|
is_starter: Whether this is a starter deck.
|
|
starter_type: Type of starter deck if applicable.
|
|
description: Optional deck description.
|
|
|
|
Returns:
|
|
The created DeckEntry.
|
|
|
|
Raises:
|
|
DeckLimitExceededError: If user has reached deck limit.
|
|
|
|
Example:
|
|
deck = await service.create_deck(
|
|
user_id=user_id,
|
|
name="Grass Power",
|
|
cards={"a1-001-bulbasaur": 4, ...},
|
|
energy_cards={"grass": 14, "colorless": 6},
|
|
deck_config=DeckConfig(),
|
|
max_decks=5,
|
|
)
|
|
"""
|
|
# Check deck limit
|
|
current_count = await self._deck_repo.count_by_user(user_id)
|
|
if current_count >= max_decks:
|
|
raise DeckLimitExceededError(
|
|
f"Deck limit reached ({current_count}/{max_decks}). "
|
|
"Upgrade to premium for unlimited decks."
|
|
)
|
|
|
|
# Validate deck
|
|
validation = await self._validate_deck_internal(
|
|
cards, energy_cards, deck_config, user_id if validate_ownership else None
|
|
)
|
|
|
|
return await self._deck_repo.create(
|
|
user_id=user_id,
|
|
name=name,
|
|
cards=cards,
|
|
energy_cards=energy_cards,
|
|
is_valid=validation.is_valid,
|
|
validation_errors=validation.errors if validation.errors else None,
|
|
is_starter=is_starter,
|
|
starter_type=starter_type,
|
|
description=description,
|
|
)
|
|
|
|
async def update_deck(
|
|
self,
|
|
user_id: UUID,
|
|
deck_id: UUID,
|
|
deck_config: DeckConfig,
|
|
name: str | None = None,
|
|
cards: dict[str, int] | None = None,
|
|
energy_cards: dict[str, int] | None = None,
|
|
validate_ownership: bool = True,
|
|
description: str | None = UNSET, # type: ignore[assignment]
|
|
) -> DeckEntry:
|
|
"""Update an existing deck.
|
|
|
|
Re-validates if cards or energy_cards change.
|
|
|
|
Args:
|
|
user_id: The user's UUID (for ownership verification).
|
|
deck_id: The deck's UUID.
|
|
deck_config: Deck rules from the caller (frontend provides this).
|
|
name: New name (optional, None keeps existing).
|
|
cards: New card composition (optional, None keeps existing).
|
|
energy_cards: New energy composition (optional, None keeps existing).
|
|
validate_ownership: If True, checks card ownership (campaign mode).
|
|
description: New description (UNSET=keep, None=clear, str=set).
|
|
|
|
Returns:
|
|
The updated DeckEntry.
|
|
|
|
Raises:
|
|
DeckNotFoundError: If deck not found or not owned by user.
|
|
"""
|
|
# Verify ownership
|
|
deck = await self._deck_repo.get_user_deck(user_id, deck_id)
|
|
if deck is None:
|
|
raise DeckNotFoundError(f"Deck {deck_id} not found or not owned by user")
|
|
|
|
# Determine if we need to re-validate
|
|
needs_revalidation = cards is not None or energy_cards is not None
|
|
|
|
# Use existing values if not provided
|
|
final_cards = cards if cards is not None else deck.cards
|
|
final_energy = energy_cards if energy_cards is not None else deck.energy_cards
|
|
|
|
# Re-validate if needed
|
|
is_valid = deck.is_valid
|
|
validation_errors = deck.validation_errors
|
|
|
|
if needs_revalidation:
|
|
validation = await self._validate_deck_internal(
|
|
final_cards,
|
|
final_energy,
|
|
deck_config,
|
|
user_id if validate_ownership else None,
|
|
)
|
|
is_valid = validation.is_valid
|
|
validation_errors = validation.errors if validation.errors else None
|
|
|
|
result = await self._deck_repo.update(
|
|
deck_id=deck_id,
|
|
name=name,
|
|
cards=cards,
|
|
energy_cards=energy_cards,
|
|
is_valid=is_valid,
|
|
validation_errors=validation_errors,
|
|
description=description,
|
|
)
|
|
|
|
if result is None:
|
|
raise DeckNotFoundError(f"Deck {deck_id} not found")
|
|
|
|
return result
|
|
|
|
async def delete_deck(self, user_id: UUID, deck_id: UUID) -> bool:
|
|
"""Delete a deck.
|
|
|
|
Args:
|
|
user_id: The user's UUID (for ownership verification).
|
|
deck_id: The deck's UUID.
|
|
|
|
Returns:
|
|
True if deleted.
|
|
|
|
Raises:
|
|
DeckNotFoundError: If deck not found or not owned by user.
|
|
"""
|
|
# Verify ownership
|
|
deck = await self._deck_repo.get_user_deck(user_id, deck_id)
|
|
if deck is None:
|
|
raise DeckNotFoundError(f"Deck {deck_id} not found or not owned by user")
|
|
|
|
return await self._deck_repo.delete(deck_id)
|
|
|
|
async def get_deck(self, user_id: UUID, deck_id: UUID) -> DeckEntry:
|
|
"""Get a deck owned by user.
|
|
|
|
Args:
|
|
user_id: The user's UUID.
|
|
deck_id: The deck's UUID.
|
|
|
|
Returns:
|
|
The DeckEntry.
|
|
|
|
Raises:
|
|
DeckNotFoundError: If deck not found or not owned by user.
|
|
"""
|
|
deck = await self._deck_repo.get_user_deck(user_id, deck_id)
|
|
if deck is None:
|
|
raise DeckNotFoundError(f"Deck {deck_id} not found or not owned by user")
|
|
return deck
|
|
|
|
async def get_user_decks(self, user_id: UUID) -> list[DeckEntry]:
|
|
"""Get all decks for a user.
|
|
|
|
Args:
|
|
user_id: The user's UUID.
|
|
|
|
Returns:
|
|
List of all user's decks.
|
|
"""
|
|
return await self._deck_repo.get_by_user(user_id)
|
|
|
|
async def can_create_deck(self, user_id: UUID, max_decks: int) -> bool:
|
|
"""Check if user can create another deck.
|
|
|
|
Args:
|
|
user_id: The user's UUID.
|
|
max_decks: Maximum decks allowed (from user.max_decks).
|
|
|
|
Returns:
|
|
True if user can create another deck.
|
|
"""
|
|
current_count = await self._deck_repo.count_by_user(user_id)
|
|
return current_count < max_decks
|
|
|
|
async def get_deck_count(self, user_id: UUID) -> int:
|
|
"""Get number of decks user has.
|
|
|
|
Args:
|
|
user_id: The user's UUID.
|
|
|
|
Returns:
|
|
Number of decks.
|
|
"""
|
|
return await self._deck_repo.count_by_user(user_id)
|
|
|
|
async def _validate_deck_internal(
|
|
self,
|
|
cards: dict[str, int],
|
|
energy_cards: dict[str, int],
|
|
deck_config: DeckConfig,
|
|
user_id: UUID | None = None,
|
|
) -> ValidationResult:
|
|
"""Internal method to validate a deck composition.
|
|
|
|
Args:
|
|
cards: Card ID to quantity mapping.
|
|
energy_cards: Energy type to quantity mapping.
|
|
deck_config: Deck rules from the caller.
|
|
user_id: If provided, validates card ownership (campaign mode).
|
|
Pass None for freeplay mode.
|
|
|
|
Returns:
|
|
ValidationResult with is_valid and errors.
|
|
"""
|
|
owned_cards: dict[str, int] | None = None
|
|
if user_id is not None and self._collection_repo is not None:
|
|
# Get user's collection for ownership validation
|
|
collection = await self._collection_repo.get_all(user_id)
|
|
owned_cards = {entry.card_definition_id: entry.quantity for entry in collection}
|
|
|
|
return validate_deck(
|
|
cards=cards,
|
|
energy_cards=energy_cards,
|
|
deck_config=deck_config,
|
|
card_lookup=self._card_service.get_card,
|
|
owned_cards=owned_cards,
|
|
)
|
|
|
|
async def get_deck_for_game(self, user_id: UUID, deck_id: UUID) -> list[CardDefinition]:
|
|
"""Expand a deck to a list of CardDefinitions for game use.
|
|
|
|
Used by GameEngine to create a game from a deck.
|
|
|
|
Args:
|
|
user_id: The user's UUID.
|
|
deck_id: The deck's UUID.
|
|
|
|
Returns:
|
|
List of CardDefinition objects for each card in the deck
|
|
(duplicates included based on quantity).
|
|
|
|
Raises:
|
|
DeckNotFoundError: If deck not found or not owned by user.
|
|
ValueError: If deck contains invalid card IDs that cannot be resolved.
|
|
"""
|
|
deck = await self.get_deck(user_id, deck_id)
|
|
|
|
cards: list[CardDefinition] = []
|
|
invalid_card_ids: list[str] = []
|
|
|
|
for card_id, quantity in deck.cards.items():
|
|
card_def = self._card_service.get_card(card_id)
|
|
if card_def is not None:
|
|
cards.extend([card_def] * quantity)
|
|
else:
|
|
invalid_card_ids.append(card_id)
|
|
|
|
if invalid_card_ids:
|
|
logger = logging.getLogger(__name__)
|
|
logger.warning(
|
|
f"Deck {deck_id} contains invalid card IDs: {invalid_card_ids[:5]}"
|
|
+ (f" (and {len(invalid_card_ids) - 5} more)" if len(invalid_card_ids) > 5 else "")
|
|
)
|
|
raise ValueError(
|
|
f"Deck contains {len(invalid_card_ids)} invalid card(s) that cannot be found: "
|
|
f"{', '.join(invalid_card_ids[:5])}"
|
|
+ (f" (and {len(invalid_card_ids) - 5} more)" if len(invalid_card_ids) > 5 else "")
|
|
)
|
|
|
|
return cards
|
|
|
|
async def has_starter_deck(self, user_id: UUID) -> tuple[bool, str | None]:
|
|
"""Check if user has a starter deck.
|
|
|
|
Args:
|
|
user_id: The user's UUID.
|
|
|
|
Returns:
|
|
Tuple of (has_starter, starter_type).
|
|
"""
|
|
return await self._deck_repo.has_starter(user_id)
|
|
|
|
async def create_starter_deck(
|
|
self,
|
|
user_id: UUID,
|
|
starter_type: str,
|
|
deck_config: DeckConfig,
|
|
max_decks: int,
|
|
) -> DeckEntry:
|
|
"""Create a starter deck for a user.
|
|
|
|
.. deprecated::
|
|
Use select_and_grant_starter_deck() instead, which atomically
|
|
creates the deck AND grants cards with race condition protection.
|
|
|
|
This creates the deck but does NOT grant the cards to collection.
|
|
|
|
Args:
|
|
user_id: The user's UUID.
|
|
starter_type: Type of starter deck (grass, fire, water, etc.).
|
|
deck_config: Deck rules from the caller (frontend provides this).
|
|
max_decks: Maximum decks allowed (from user.max_decks).
|
|
|
|
Returns:
|
|
The created starter DeckEntry.
|
|
|
|
Raises:
|
|
ValueError: If starter_type is invalid.
|
|
DeckLimitExceededError: If user has reached deck limit.
|
|
"""
|
|
from app.data.starter_decks import STARTER_TYPES, get_starter_deck
|
|
|
|
if starter_type not in STARTER_TYPES:
|
|
raise ValueError(
|
|
f"Invalid starter type: {starter_type}. "
|
|
f"Must be one of: {', '.join(STARTER_TYPES)}"
|
|
)
|
|
|
|
starter = get_starter_deck(starter_type)
|
|
|
|
return await self.create_deck(
|
|
user_id=user_id,
|
|
name=starter["name"],
|
|
cards=starter["cards"],
|
|
energy_cards=starter["energy_cards"],
|
|
deck_config=deck_config,
|
|
max_decks=max_decks,
|
|
validate_ownership=False, # Starter decks skip ownership check
|
|
is_starter=True,
|
|
starter_type=starter_type,
|
|
description=starter["description"],
|
|
)
|
|
|
|
async def select_and_grant_starter_deck(
|
|
self,
|
|
user_id: UUID,
|
|
starter_type: str,
|
|
deck_config: DeckConfig,
|
|
max_decks: int,
|
|
) -> DeckEntry:
|
|
"""Select a starter deck, granting cards and creating the deck atomically.
|
|
|
|
This is the preferred method for starter deck selection. It handles:
|
|
- Validation of starter type
|
|
- Race condition protection via database unique constraint
|
|
- Creating the deck and granting cards together
|
|
|
|
The database has a partial unique index on (user_id) WHERE is_starter=true,
|
|
which prevents duplicate starter decks even under concurrent requests.
|
|
|
|
Args:
|
|
user_id: The user's UUID.
|
|
starter_type: Type of starter deck (grass, fire, water, etc.).
|
|
deck_config: Deck rules from the caller (frontend provides this).
|
|
max_decks: Maximum decks allowed (from user.max_decks).
|
|
|
|
Returns:
|
|
The created starter DeckEntry.
|
|
|
|
Raises:
|
|
ValueError: If starter_type is invalid.
|
|
StarterAlreadySelectedError: If user already has a starter deck.
|
|
DeckLimitExceededError: If user has reached deck limit.
|
|
"""
|
|
from app.data.starter_decks import STARTER_TYPES, get_starter_deck
|
|
from app.db.models.collection import CardSource
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Validate starter type early
|
|
if starter_type not in STARTER_TYPES:
|
|
raise ValueError(
|
|
f"Invalid starter type: {starter_type}. "
|
|
f"Must be one of: {', '.join(STARTER_TYPES)}"
|
|
)
|
|
|
|
# Fast path: check if user already has a starter
|
|
has_starter, existing_type = await self.has_starter_deck(user_id)
|
|
if has_starter:
|
|
raise StarterAlreadySelectedError(f"Starter deck already selected: {existing_type}")
|
|
|
|
starter = get_starter_deck(starter_type)
|
|
|
|
# Try to create the deck first - protected by unique constraint
|
|
# If two concurrent requests try this, one will fail with IntegrityError
|
|
try:
|
|
deck = await self.create_deck(
|
|
user_id=user_id,
|
|
name=starter["name"],
|
|
cards=starter["cards"],
|
|
energy_cards=starter["energy_cards"],
|
|
deck_config=deck_config,
|
|
max_decks=max_decks,
|
|
validate_ownership=False,
|
|
is_starter=True,
|
|
starter_type=starter_type,
|
|
description=starter["description"],
|
|
)
|
|
except IntegrityError as e:
|
|
# Unique constraint violation - another request beat us
|
|
logger.info(f"Starter deck creation race condition for user {user_id}: {e}")
|
|
raise StarterAlreadySelectedError(
|
|
"Starter deck already selected (concurrent request)"
|
|
) from None
|
|
|
|
# Deck created successfully - now grant the cards
|
|
# This should never fail for valid starter types
|
|
if self._collection_repo is not None:
|
|
for card_id, quantity in starter["cards"].items():
|
|
await self._collection_repo.upsert(
|
|
user_id=user_id,
|
|
card_definition_id=card_id,
|
|
quantity_delta=quantity,
|
|
source=CardSource.STARTER,
|
|
)
|
|
for energy_type, quantity in starter["energy_cards"].items():
|
|
# Energy cards are stored as "energy-{type}" in collection
|
|
await self._collection_repo.upsert(
|
|
user_id=user_id,
|
|
card_definition_id=f"energy-{energy_type}",
|
|
quantity_delta=quantity,
|
|
source=CardSource.STARTER,
|
|
)
|
|
|
|
return deck
|