mantimon-tcg/backend/app/core/engine.py
Cal Corum e7431e2d1f Move enums to app/core/enums.py and set up clean module exports
Architectural refactor to eliminate circular imports and establish clean
module boundaries:

- Move enums from app/core/models/enums.py to app/core/enums.py
  (foundational module with zero dependencies)
- Update all imports across 30 files to use new enum location
- Set up clean export structure:
  - app.core.enums: canonical source for all enums
  - app.core: convenience exports for full public API
  - app.core.models: exports models only (not enums)
- Add module exports to app/core/__init__.py and app/core/effects/__init__.py
- Remove circular import workarounds from game_state.py

This enables app.core.models to export GameState without circular import
issues, since enums no longer depend on the models package.

All 826 tests passing.
2026-01-26 14:45:26 -06:00

1091 lines
37 KiB
Python

"""Main GameEngine orchestrator for the Mantimon TCG game engine.
This module provides the GameEngine class, which is the primary public API
for all game operations. It orchestrates:
- Game creation and initialization
- Action validation and execution
- Turn and phase management
- Win condition checking
- Visibility filtering for clients
The GameEngine is designed to be the single entry point for game operations,
ensuring all actions are properly validated and state is consistently updated.
Usage:
from app.core.engine import GameEngine
from app.core.config import RulesConfig
from app.core.rng import create_rng
# Create engine with rules and RNG
engine = GameEngine(rules=RulesConfig(), rng=create_rng())
# Create a new game (create_game returns a GameCreationResult wrapper)
creation = engine.create_game(
player_ids=["player1", "player2"],
decks={"player1": deck1, "player2": deck2},
card_registry=registry,
)
game = creation.game  # None when creation.success is False
# Execute an action
result = await engine.execute_action(game, "player1", action)
# Get client-safe view
visible = engine.get_visible_state(game, "player1")
"""
from __future__ import annotations
import uuid
from typing import Any
from pydantic import BaseModel, Field
from app.core.config import RulesConfig
from app.core.enums import GameEndReason, StatusCondition, TurnPhase
from app.core.models.actions import (
Action,
AttachEnergyAction,
AttackAction,
EvolvePokemonAction,
PassAction,
PlayPokemonAction,
PlayTrainerAction,
ResignAction,
RetreatAction,
SelectActiveAction,
SelectPrizeAction,
UseAbilityAction,
)
from app.core.models.card import CardDefinition, CardInstance
from app.core.models.game_state import GameState, PlayerState
from app.core.rng import RandomProvider, create_rng
from app.core.rules_validator import ValidationResult, validate_action
from app.core.turn_manager import TurnManager
from app.core.visibility import VisibleGameState, get_spectator_state, get_visible_state
from app.core.win_conditions import (
WinResult,
apply_win_result,
check_resignation,
check_timeout,
check_win_conditions,
)
class ActionResult(BaseModel):
    """Outcome reported by the engine after it attempts a game action.

    Attributes:
        success: True when the action was carried out.
        message: Human-readable summary of what happened (empty by default).
        win_result: The WinResult when the action produced a win, else None.
        state_changes: Dicts describing individual state changes, meant for
            logging/animation. Each instance gets its own empty list.
    """

    success: bool
    message: str = Field(default="")
    win_result: WinResult | None = Field(default=None)
    state_changes: list[dict[str, Any]] = Field(default_factory=list)
class GameCreationResult(BaseModel):
    """Outcome reported by the engine after it tries to set up a new game.

    Attributes:
        success: True when a game was created.
        game: The new GameState; stays None when creation failed.
        message: Status text or the reason creation was rejected.
    """

    success: bool
    game: GameState | None = Field(default=None)
    message: str = Field(default="")
class GameEngine:
"""Main orchestrator for all game operations.
The GameEngine is the primary public API for the Mantimon TCG game engine.
It coordinates all game components and ensures consistent state management.
All game operations should go through the GameEngine rather than directly
manipulating GameState or calling component modules.
Attributes:
rules: The RulesConfig for games created by this engine.
rng: RandomProvider for all random operations.
turn_manager: TurnManager for phase/turn transitions.
Example:
engine = GameEngine()
creation = engine.create_game(["p1", "p2"], decks, registry)
game = creation.game  # None when creation.success is False
result = await engine.execute_action(game, "p1", action)
"""
def __init__(
    self,
    rules: RulesConfig | None = None,
    rng: RandomProvider | None = None,
):
    """Wire up the engine's rules, randomness source, and turn manager.

    Args:
        rules: Rules configuration to use. A default ``RulesConfig()`` is
            built when this is omitted (or falsy).
        rng: Randomness provider to use. ``create_rng()`` supplies one when
            this is omitted (or falsy).
    """
    self.rules = rules if rules else RulesConfig()
    self.rng = rng if rng else create_rng()
    # A fresh TurnManager per engine drives phase/turn transitions.
    self.turn_manager = TurnManager()
def create_game(
    self,
    player_ids: list[str],
    decks: dict[str, list[CardInstance]],
    card_registry: dict[str, CardDefinition],
    energy_decks: dict[str, list[CardInstance]] | None = None,
    game_id: str | None = None,
) -> GameCreationResult:
    """Create and initialize a new game.

    Steps performed:
        1. Validate the player list (exactly two distinct IDs) and that each
           player has a deck within the configured size limits containing
           at least one Basic Pokemon.
        2. Build a PlayerState per player, load and shuffle the main deck
           (and the energy deck, when one is supplied).
        3. Create the GameState in the SETUP phase.
        4. Deal starting hands and mulligan until each hand has a Basic.
        5. Deal prize cards when the rules use prize cards.
        6. Pick the first player at random and set the turn number to 1.

    Args:
        player_ids: List of player IDs (must be exactly 2, and distinct).
        decks: Mapping of player_id -> list of CardInstances for main deck.
            Entries for IDs that are not in ``player_ids`` are ignored.
        card_registry: Mapping of definition_id -> CardDefinition.
        energy_decks: Optional mapping of player_id -> energy deck cards.
        game_id: Optional game ID. A UUID4 string is generated if omitted.

    Returns:
        GameCreationResult with ``success=True`` and the created game, or
        ``success=False`` and an explanatory message when validation fails.
    """
    # Validate player count
    if len(player_ids) != 2:
        return GameCreationResult(
            success=False,
            message=f"Game requires exactly 2 players, got {len(player_ids)}",
        )

    # Two identical IDs would collapse into a single PlayerState below while
    # turn_order still listed the ID twice, so reject them up front.
    if len(set(player_ids)) != len(player_ids):
        return GameCreationResult(
            success=False,
            message=f"Player IDs must be unique, got {player_ids}",
        )

    # Validate all players have decks
    for pid in player_ids:
        if pid not in decks:
            return GameCreationResult(
                success=False,
                message=f"No deck provided for player {pid}",
            )

    # Only decks that belong to a participating player are validated; an
    # unrelated extra entry in ``decks`` must not block game creation.
    # Iterating ``decks`` keeps the original reporting order.
    player_decks = {pid: deck for pid, deck in decks.items() if pid in player_ids}

    # Validate deck sizes
    min_size = self.rules.deck.min_size
    max_size = self.rules.deck.max_size
    for pid, deck in player_decks.items():
        if len(deck) < min_size:
            return GameCreationResult(
                success=False,
                message=f"Player {pid} deck too small: {len(deck)} < {min_size}",
            )
        if len(deck) > max_size:
            return GameCreationResult(
                success=False,
                message=f"Player {pid} deck too large: {len(deck)} > {max_size}",
            )

    # Validate at least one basic Pokemon in each deck
    for pid, deck in player_decks.items():
        definitions = (card_registry.get(card.definition_id) for card in deck)
        if not any(card_def and card_def.is_basic_pokemon() for card_def in definitions):
            return GameCreationResult(
                success=False,
                message=f"Player {pid} deck has no Basic Pokemon",
            )

    # Generate game ID if not provided
    if game_id is None:
        game_id = str(uuid.uuid4())

    # Create player states
    players: dict[str, PlayerState] = {}
    for pid in player_ids:
        player = PlayerState(player_id=pid)

        # Load and shuffle the main deck
        for card in decks[pid]:
            player.deck.add(card)
        player.deck.shuffle(self.rng)

        # Load and shuffle the energy deck if provided
        if energy_decks and pid in energy_decks:
            for card in energy_decks[pid]:
                player.energy_deck.add(card)
            player.energy_deck.shuffle(self.rng)

        players[pid] = player

    # Create game state
    game = GameState(
        game_id=game_id,
        rules=self.rules,
        card_registry=card_registry,
        players=players,
        turn_order=list(player_ids),
        phase=TurnPhase.SETUP,
    )

    # Deal starting hands (draw() returning a falsy value means no card)
    hand_size = self.rules.deck.starting_hand_size
    for player in game.players.values():
        for _ in range(hand_size):
            card = player.deck.draw()
            if card:
                player.hand.add(card)

    # Handle mulligan - ensure each player has a Basic Pokemon in hand
    for player in game.players.values():
        self._ensure_basic_in_hand(player, card_registry)

    # Set up prizes (if using prize card mode)
    if self.rules.prizes.use_prize_cards:
        prize_count = self.rules.prizes.count
        for player in game.players.values():
            for _ in range(prize_count):
                card = player.deck.draw()
                if card:
                    player.prizes.add(card)

    # Determine first player randomly
    first_player = self.rng.choice(player_ids)
    game.current_player_id = first_player
    game.turn_number = 1

    return GameCreationResult(
        success=True,
        game=game,
        message=f"Game created. {first_player} goes first.",
    )
def _ensure_basic_in_hand(
    self,
    player: PlayerState,
    card_registry: dict[str, CardDefinition],
) -> None:
    """Mulligan until the player holds at least one Basic Pokemon.

    While the hand contains no Basic Pokemon, the whole hand is returned to
    the deck, the deck is reshuffled, and a new hand is drawn. The redraw
    uses the configured ``rules.deck.starting_hand_size`` so it matches the
    opening deal made by ``create_game`` under custom rules.

    The loop is capped at 10 mulligans. If the cap is reached the method
    returns without guaranteeing a Basic in hand.

    Args:
        player: The player to check. Their hand and deck are mutated.
        card_registry: Card definitions for lookup by definition_id.
    """
    max_mulligans = 10  # Prevent infinite loop
    hand_size = self.rules.deck.starting_hand_size

    for _attempt in range(max_mulligans):
        # Cards whose definition is missing from the registry never count
        # as a Basic Pokemon.
        definitions = (
            card_registry.get(card.definition_id) for card in player.hand.cards
        )
        if any(card_def and card_def.is_basic_pokemon() for card_def in definitions):
            return

        # Mulligan: shuffle hand into deck and redraw
        while player.hand.cards:
            card = player.hand.draw()
            if card:
                player.deck.add(card)
        player.deck.shuffle(self.rng)

        for _ in range(hand_size):
            card = player.deck.draw()
            if card:
                player.hand.add(card)
def validate_action(
    self,
    game: GameState,
    player_id: str,
    action: Action,
) -> ValidationResult:
    """Check whether ``action`` is legal for ``player_id`` right now.

    The engine-level entry point simply forwards to the ``validate_action``
    function imported from ``app.core.rules_validator``.

    Args:
        game: Current game state.
        player_id: Player attempting the action.
        action: The action to validate.

    Returns:
        The ValidationResult produced by the rules validator.
    """
    verdict = validate_action(game, player_id, action)
    return verdict
async def execute_action(
    self,
    game: GameState,
    player_id: str,
    action: Action,
) -> ActionResult:
    """Validate a player's action and, if legal, carry it out.

    Flow:
        1. Validate; an invalid action returns a failed result immediately
           (and is not written to the action log).
        2. Dispatch to the matching handler.
        3. If the handler succeeded without reporting a win, run the general
           win-condition check and apply any win it finds.
        4. Append an entry to the game's action log.

    Args:
        game: Game state to modify.
        player_id: Player performing the action.
        action: The action to execute.

    Returns:
        ActionResult with success status and any win result.
    """
    verdict = self.validate_action(game, player_id, action)
    if not verdict.valid:
        return ActionResult(
            success=False,
            message=verdict.reason or "Invalid action",
        )

    outcome = await self._execute_action_internal(game, player_id, action)

    # Handlers that already determined a winner are not re-checked here.
    needs_win_check = outcome.success and outcome.win_result is None
    if needs_win_check:
        detected = check_win_conditions(game)
        if detected:
            apply_win_result(game, detected)
            outcome.win_result = detected

    log_entry = {
        "player_id": player_id,
        "action": action.model_dump(),
        "success": outcome.success,
    }
    game.log_action(log_entry)

    return outcome
async def _execute_action_internal(
    self,
    game: GameState,
    player_id: str,
    action: Action,
) -> ActionResult:
    """Route an already-validated action to its handler.

    Handlers are tried in a fixed order by ``isinstance`` check. Trainer,
    ability and attack handlers are coroutines and are awaited; the rest
    are plain calls.

    Args:
        game: Game state to modify.
        player_id: Player performing the action (must be a key of
            ``game.players``).
        action: The validated action.

    Returns:
        The handler's ActionResult, or a failed result for an action type
        that no handler recognises.
    """
    player = game.players[player_id]

    if isinstance(action, PlayPokemonAction):
        return self._execute_play_pokemon(game, player, action)
    if isinstance(action, EvolvePokemonAction):
        return self._execute_evolve(game, player, action)
    if isinstance(action, AttachEnergyAction):
        return self._execute_attach_energy(game, player, action)
    if isinstance(action, PlayTrainerAction):
        return await self._execute_play_trainer(game, player, action)
    if isinstance(action, UseAbilityAction):
        return await self._execute_use_ability(game, player, action)
    if isinstance(action, AttackAction):
        return await self._execute_attack(game, player, action)
    if isinstance(action, RetreatAction):
        return self._execute_retreat(game, player, action)
    if isinstance(action, PassAction):
        return self._execute_pass(game, player, action)
    if isinstance(action, SelectActiveAction):
        return self._execute_select_active(game, player, action)
    if isinstance(action, SelectPrizeAction):
        return self._execute_select_prize(game, player, action)
    if isinstance(action, ResignAction):
        # Resignation only needs the ID, not the PlayerState.
        return self._execute_resign(game, player_id)

    return ActionResult(
        success=False,
        message=f"Unknown action type: {action.type}",
    )
def _execute_play_pokemon(
    self,
    game: GameState,
    player: PlayerState,
    action: PlayPokemonAction,
) -> ActionResult:
    """Move a Pokemon card from hand into play.

    The card goes to the active spot when the player has no active Pokemon,
    otherwise to the bench. The current turn number is stamped on the card.

    Args:
        game: Game state (read for the current turn number).
        player: Player whose hand and zones are modified.
        action: Identifies the card to play.

    Returns:
        A successful ActionResult naming the destination zone, or a failed
        one when the card is not in hand.
    """
    card = player.hand.remove(action.card_instance_id)
    if not card:
        return ActionResult(success=False, message="Card not in hand")

    if player.has_active_pokemon():
        destination = player.bench
        zone_name = "bench"
        summary = "Played Pokemon to bench"
    else:
        destination = player.active
        zone_name = "active"
        summary = "Played Pokemon to active"

    destination.add(card)
    card.turn_played = game.turn_number

    return ActionResult(
        success=True,
        message=summary,
        state_changes=[
            {"type": "play_pokemon", "zone": zone_name, "card_id": action.card_instance_id}
        ],
    )
def _execute_evolve(
    self,
    game: GameState,
    player: PlayerState,
    action: EvolvePokemonAction,
) -> ActionResult:
    """Replace a Pokemon in play with its evolution card from hand.

    The evolution inherits attached energy, tools, damage and the three
    modifiers from the target. Status conditions are dropped. The target
    is not discarded; it is appended to the evolution's ``cards_underneath``
    stack on top of any earlier stages.

    Args:
        game: Game state (read for the current turn number).
        player: Player whose hand and zones are modified.
        action: Identifies the evolution card and the Pokemon to evolve.

    Returns:
        A successful ActionResult, or a failed one when the evolution card
        is not in hand or the target is in neither active nor bench (the
        evolution card is returned to hand in that case).
    """
    evo_card = player.hand.remove(action.evolution_card_id)
    if not evo_card:
        return ActionResult(success=False, message="Evolution card not in hand")

    # Locate the target: active spot first, then bench.
    target_id = action.target_pokemon_id
    target = None
    zone = None
    for candidate_zone in (player.active, player.bench):
        if target_id in candidate_zone:
            target = candidate_zone.get(target_id)
            zone = candidate_zone
            break

    if not (target and zone):
        # Undo the hand removal before reporting failure.
        player.hand.add(evo_card)
        return ActionResult(success=False, message="Target Pokemon not found")

    # Attachments and damage carry over to the evolved form.
    for field_name in ("attached_energy", "attached_tools", "damage"):
        setattr(evo_card, field_name, getattr(target, field_name))
    # Status conditions are NOT transferred - evolution removes status.
    evo_card.status_conditions = []
    for field_name in ("hp_modifier", "damage_modifier", "retreat_cost_modifier"):
        setattr(evo_card, field_name, getattr(target, field_name))

    # Evolution stack: earlier stages first (copied so the target's own
    # list can be cleared independently)...
    evo_card.cards_underneath = target.cards_underneath.copy()

    # ...the target gives up everything that now lives on the evolution...
    target.attached_energy = []
    target.attached_tools = []
    target.cards_underneath = []

    # ...and the target itself goes directly underneath the new card.
    evo_card.cards_underneath.append(target)

    # Track evolution timing
    current_turn = game.turn_number
    evo_card.turn_played = current_turn
    evo_card.turn_evolved = current_turn

    # Swap the cards in the zone where the target was found.
    zone.remove(target_id)
    zone.add(evo_card)

    return ActionResult(
        success=True,
        message="Pokemon evolved",
        state_changes=[
            {
                "type": "evolve",
                "target": action.target_pokemon_id,
                "evolution": action.evolution_card_id,
            }
        ],
    )
def _execute_attach_energy(
    self,
    game: GameState,
    player: PlayerState,
    action: AttachEnergyAction,
) -> ActionResult:
    """Attach an energy card to one of the player's Pokemon.

    The card is taken from the energy zone when the action asks for that
    and the card is there; otherwise it is taken from hand if present. On
    success the player's per-turn attachment counter is incremented.

    Args:
        game: Game state (unused by this handler).
        player: Player whose zones are modified.
        action: Identifies the energy card, its source and the target.

    Returns:
        A successful ActionResult, or a failed one when the energy card or
        the target Pokemon cannot be found (a removed energy card is put
        back where it came from).
    """
    card_id = action.energy_card_id

    took_from_energy_zone = bool(action.from_energy_zone) and card_id in player.energy_zone
    if took_from_energy_zone:
        energy_card = player.energy_zone.remove(card_id)
    elif card_id in player.hand:
        energy_card = player.hand.remove(card_id)
    else:
        energy_card = None

    if not energy_card:
        return ActionResult(success=False, message="Energy card not found")

    # Locate the target: active spot first, then bench.
    target = None
    for candidate_zone in (player.active, player.bench):
        if action.target_pokemon_id in candidate_zone:
            target = candidate_zone.get(action.target_pokemon_id)
            break

    if not target:
        # Return the card to the zone it was pulled from.
        origin = player.energy_zone if took_from_energy_zone else player.hand
        origin.add(energy_card)
        return ActionResult(success=False, message="Target Pokemon not found")

    # The energy CardInstance is stored on the Pokemon itself.
    target.attach_energy(energy_card)
    player.energy_attachments_this_turn += 1

    return ActionResult(
        success=True,
        message="Energy attached",
        state_changes=[
            {
                "type": "attach_energy",
                "target": action.target_pokemon_id,
                "energy": action.energy_card_id,
            }
        ],
    )
async def _execute_play_trainer(
    self,
    game: GameState,
    player: PlayerState,
    action: PlayTrainerAction,
) -> ActionResult:
    """Execute playing a Trainer card.

    Supporters and Items bump the matching per-turn counter and are then
    discarded (their effects are not executed here yet). A Stadium replaces
    any stadium already in play: the old one is discarded to its recorded
    owner, or to the acting player when no owner can be resolved, so the
    replaced card is never silently dropped.

    Args:
        game: Game state (card lookup and stadium tracking).
        player: Player playing the card.
        action: Identifies the trainer card in hand.

    Returns:
        A successful ActionResult, or a failed one when the card is not in
        hand or has no definition (the card is returned to hand then).
    """
    # Function-scope import: TrainerType is not among the module imports.
    from app.core.enums import TrainerType

    card = player.hand.remove(action.card_instance_id)
    if not card:
        return ActionResult(success=False, message="Card not in hand")

    card_def = game.get_card_definition(card.definition_id)
    if not card_def:
        player.hand.add(card)
        return ActionResult(success=False, message="Card definition not found")

    # Update per-turn counters
    if card_def.trainer_type:
        if card_def.trainer_type == TrainerType.SUPPORTER:
            player.supporters_played_this_turn += 1
        elif card_def.trainer_type == TrainerType.STADIUM:
            player.stadiums_played_this_turn += 1

            # Handle stadium replacement - discard to original owner.
            # Checked on the stadium alone so that a stadium with a
            # missing owner ID is still discarded instead of overwritten.
            old_stadium = game.stadium_in_play
            if old_stadium:
                owner = None
                if game.stadium_owner_id:
                    owner = game.players.get(game.stadium_owner_id)
                if owner:
                    owner.discard.add(old_stadium)
                else:
                    # Fallback if owner is unknown (shouldn't happen)
                    player.discard.add(old_stadium)

            # Set new stadium and track owner
            game.stadium_in_play = card
            game.stadium_owner_id = player.player_id
            return ActionResult(
                success=True,
                message="Stadium played",
                state_changes=[{"type": "play_stadium", "card_id": action.card_instance_id}],
            )
        elif card_def.trainer_type == TrainerType.ITEM:
            player.items_played_this_turn += 1

    # Execute trainer effect (placeholder - effects would be handled by
    # the effect system). For now, just discard the card.
    player.discard.add(card)

    return ActionResult(
        success=True,
        message="Trainer card played",
        state_changes=[{"type": "play_trainer", "card_id": action.card_instance_id}],
    )
async def _execute_use_ability(
    self,
    game: GameState,
    player: PlayerState,
    action: UseAbilityAction,
) -> ActionResult:
    """Execute using a Pokemon's ability.

    Only records the use (via ``increment_ability_uses``) and reports it;
    the ability's effect itself is a placeholder and is not executed here.

    Args:
        game: Game state (card definition lookup).
        player: Player whose Pokemon uses the ability.
        action: Identifies the Pokemon and the ability index.

    Returns:
        A successful ActionResult naming the ability, or a failed one when
        the Pokemon is not in the player's active/bench zones, has no
        abilities, or the index is out of range (negative indices
        included).
    """
    # Find Pokemon with ability: active spot first, then bench.
    pokemon = None
    if action.pokemon_id in player.active:
        pokemon = player.active.get(action.pokemon_id)
    elif action.pokemon_id in player.bench:
        pokemon = player.bench.get(action.pokemon_id)

    if not pokemon:
        return ActionResult(success=False, message="Pokemon not found")

    card_def = game.get_card_definition(pokemon.definition_id)
    if not card_def or not card_def.abilities:
        return ActionResult(success=False, message="Pokemon has no abilities")

    # Reject negative indices too: Python would otherwise silently index
    # from the end of the list and record a use under a negative key.
    if not 0 <= action.ability_index < len(card_def.abilities):
        return ActionResult(success=False, message="Invalid ability index")

    ability = card_def.abilities[action.ability_index]

    # Mark ability as used (increment counter for this specific ability)
    pokemon.increment_ability_uses(action.ability_index)

    # Execute ability effect (placeholder - would use effect system)
    return ActionResult(
        success=True,
        message=f"Used ability: {ability.name}",
        state_changes=[
            {"type": "use_ability", "pokemon": action.pokemon_id, "ability": ability.name}
        ],
    )
async def _execute_attack(
    self,
    game: GameState,
    player: PlayerState,
    action: AttackAction,
) -> ActionResult:
    """Execute an attack with the player's active Pokemon.

    Handles confusion status: a confused Pokemon must flip a coin before
    attacking. On tails the attack fails, the Pokemon damages itself by
    ``rules.status.confusion_self_damage``, and the turn still advances to
    the END phase. On heads (or when not confused) the attack's base damage
    is added to the opponent's active Pokemon.

    When damage knocks a Pokemon out, the turn manager processes the
    knockout. Any win result it returns is applied to the game here,
    matching how the other win paths in this engine call
    ``apply_win_result``, and is forwarded in the ActionResult.

    Args:
        game: Game state to modify.
        player: The attacking player.
        action: Identifies which attack (by index) to use.

    Returns:
        A successful ActionResult describing the attack (or the failed
        confusion flip), or a failed one when there is no attacker, no
        attacks, an out-of-range index, or no defending Pokemon.
    """
    active = player.get_active_pokemon()
    if not active:
        return ActionResult(success=False, message="No active Pokemon")

    card_def = game.get_card_definition(active.definition_id)
    if not card_def or not card_def.attacks:
        return ActionResult(success=False, message="Pokemon has no attacks")

    # Reject negative indices too: Python would otherwise silently pick an
    # attack counted from the end of the list.
    if not 0 <= action.attack_index < len(card_def.attacks):
        return ActionResult(success=False, message="Invalid attack index")

    attack = card_def.attacks[action.attack_index]

    # Capture confusion once, before anything below can alter the status
    # list, so the "flipped heads" report reflects the flip that happened.
    was_confused = StatusCondition.CONFUSED in active.status_conditions

    # Handle confusion: flip coin, tails = attack fails + self-damage
    if was_confused and not self.rng.coin_flip():
        self_damage = game.rules.status.confusion_self_damage
        active.damage += self_damage

        # Check if attacker knocked itself out from confusion
        win_result = None
        if card_def.hp and active.is_knocked_out(card_def.hp):
            opponent_id = game.get_opponent_id(player.player_id)
            ko_result = self.turn_manager.process_knockout(
                game, active.instance_id, opponent_id, self.rng
            )
            if ko_result:
                win_result = ko_result
                # NOTE(review): assumes process_knockout only reports the
                # win (as start_turn/end_turn do) - confirm.
                apply_win_result(game, win_result)

        # Advance to END phase (attack was attempted even though it failed)
        self.turn_manager.advance_to_end(game)

        return ActionResult(
            success=True,  # Action succeeded (coin was flipped), but attack failed
            message=f"Confused! Flipped tails - attack failed, {self_damage} self-damage",
            win_result=win_result,
            state_changes=[
                {
                    "type": "confusion_flip",
                    "result": "tails",
                    "self_damage": self_damage,
                }
            ],
        )
    # Heads (or not confused) - attack proceeds normally

    # Get opponent's active Pokemon
    opponent_id = game.get_opponent_id(player.player_id)
    opponent = game.players[opponent_id]
    defender = opponent.get_active_pokemon()
    if not defender:
        return ActionResult(success=False, message="Opponent has no active Pokemon")

    # Calculate and apply damage (simplified)
    # TODO: EFFECT EXECUTION - When attack effects are implemented, they should be
    # executed here BEFORE knockout detection. Effects like coin_flip_damage and
    # bench_damage will deal additional damage. After ALL effects resolve, iterate
    # through all damaged Pokemon (defender, benched Pokemon, even attacker from
    # recoil) and call process_knockout() for each KO'd Pokemon.
    #
    # The damage handlers (deal_damage, attack_damage) set details["knockout"]=True
    # when damage KOs a target - use this to identify which Pokemon need knockout
    # processing without re-checking every Pokemon.
    #
    # See: app/core/effects/handlers.py for knockout flag pattern
    # See: SYSTEM_REVIEW.md Issue #13 for context
    base_damage = attack.damage or 0
    defender.damage += base_damage

    # Check for knockout
    win_result = None
    defender_def = game.get_card_definition(defender.definition_id)
    if defender_def and defender_def.hp and defender.is_knocked_out(defender_def.hp):
        ko_result = self.turn_manager.process_knockout(
            game, defender.instance_id, player.player_id, self.rng
        )
        if ko_result:
            win_result = ko_result
            # execute_action skips its own win check when a win_result is
            # already set, so the result has to be applied here.
            apply_win_result(game, win_result)

    # Advance to END phase after attack
    self.turn_manager.advance_to_end(game)

    # Build message - include confusion heads if applicable
    message = f"Attack: {attack.name} dealt {base_damage} damage"
    state_changes: list[dict[str, Any]] = [
        {"type": "attack", "name": attack.name, "damage": base_damage}
    ]
    if was_confused:
        message = f"Confused - flipped heads! {message}"
        state_changes.insert(0, {"type": "confusion_flip", "result": "heads"})

    return ActionResult(
        success=True,
        message=message,
        win_result=win_result,
        state_changes=state_changes,
    )
def _execute_retreat(
    self,
    game: GameState,
    player: PlayerState,
    action: RetreatAction,
) -> ActionResult:
    """Execute retreating the active Pokemon.

    The chosen bench Pokemon becomes active, the listed energy cards are
    detached from the retreating Pokemon and discarded, and the retreating
    Pokemon moves to the bench with its CONFUSED status removed. The
    player's per-turn retreat counter is incremented.

    Args:
        game: Game state (unused by this handler).
        player: Player performing the retreat.
        action: Names the new active Pokemon and the energy to discard.

    Returns:
        A successful ActionResult, or a failed one when there is no active
        Pokemon or the replacement is not on the bench.
    """
    active = player.get_active_pokemon()
    if not active:
        return ActionResult(success=False, message="No active Pokemon")

    # Find new active on bench
    new_active = player.bench.remove(action.new_active_id)
    if not new_active:
        return ActionResult(success=False, message="New active not found on bench")

    # Discard energy for retreat cost (simplified - assume cost already validated)
    for energy_id in action.energy_to_discard:
        energy = active.detach_energy(energy_id)
        if energy:
            player.discard.add(energy)

    # Swap positions
    player.active.remove(active.instance_id)
    player.bench.add(active)
    player.active.add(new_active)

    # Retreating clears confusion. StatusCondition comes from the module
    # imports, so no function-scope re-import is needed.
    # NOTE(review): only CONFUSED is removed here; other conditions stay on
    # the benched Pokemon - confirm that is intended.
    active.remove_status(StatusCondition.CONFUSED)

    player.retreats_this_turn += 1

    return ActionResult(
        success=True,
        message="Retreated",
        state_changes=[
            {
                "type": "retreat",
                "old_active": active.instance_id,
                "new_active": action.new_active_id,
            }
        ],
    )
def _execute_pass(
    self,
    game: GameState,
    player: PlayerState,
    action: PassAction,
) -> ActionResult:
    """End the player's turn without attacking.

    When the game is in the MAIN phase the turn manager is asked to skip
    the attack; in any other phase the game state is left untouched.

    Args:
        game: Game state to modify.
        player: Player passing (unused by this handler).
        action: The pass action (unused by this handler).

    Returns:
        An ActionResult that always reports success.
    """
    in_main_phase = game.phase == TurnPhase.MAIN
    if in_main_phase:
        self.turn_manager.skip_attack(game)

    changes = [{"type": "pass"}]
    return ActionResult(success=True, message="Turn passed", state_changes=changes)
def _execute_select_active(
    self,
    game: GameState,
    player: PlayerState,
    action: SelectActiveAction,
) -> ActionResult:
    """Promote a benched Pokemon to the active spot (forced action).

    After the move, the forced action at the front of the game's queue is
    popped.

    Args:
        game: Game state holding the forced-action queue.
        player: Player choosing the new active Pokemon.
        action: Identifies the benched Pokemon to promote.

    Returns:
        A successful ActionResult, or a failed one (with the queue left
        untouched) when the Pokemon is not on the bench.
    """
    promoted = player.bench.remove(action.pokemon_id)
    if not promoted:
        return ActionResult(success=False, message="Pokemon not found on bench")

    player.active.add(promoted)

    # The selection satisfies the pending forced action.
    game.pop_forced_action()

    changes = [{"type": "select_active", "card_id": action.pokemon_id}]
    return ActionResult(
        success=True,
        message="New active Pokemon selected",
        state_changes=changes,
    )
def _execute_select_prize(
    self,
    game: GameState,
    player: PlayerState,
    action: SelectPrizeAction,
) -> ActionResult:
    """Take one prize card into hand and update the pending prize request.

    The prize at ``action.prize_index`` moves to the player's hand. If the
    current forced action is a "select_prize" request, its remaining count
    is decremented; the request is popped once nothing is left to take.
    Taking the last prize card produces and applies a PRIZES_TAKEN win.

    Args:
        game: Game state to modify.
        player: Player taking the prize.
        action: The SelectPrizeAction with the prize index.

    Returns:
        ActionResult indicating success and any win condition; a failed
        result when the index is outside the player's prize cards.
    """
    if not (0 <= action.prize_index < len(player.prizes)):
        return ActionResult(
            success=False,
            message=f"Invalid prize index: {action.prize_index}",
        )

    prize_card = player.prizes.cards.pop(action.prize_index)
    player.hand.add(prize_card)

    # Multi-prize knockouts keep the same forced action alive and count
    # it down one selection at a time.
    pending = game.get_current_forced_action()
    if pending and pending.action_type == "select_prize":
        remaining = pending.params.get("count", 1) - 1
        more_to_take = remaining > 0 and len(player.prizes) > 0
        if more_to_take:
            pending.params["count"] = remaining
            pending.reason = f"Select {remaining} more prize card(s)"
        else:
            game.pop_forced_action()

    # Emptying the prize pile wins the game.
    win_result = None
    if len(player.prizes) == 0:
        opponent_id = game.get_opponent_id(player.player_id)
        win_result = WinResult(
            winner_id=player.player_id,
            loser_id=opponent_id,
            end_reason=GameEndReason.PRIZES_TAKEN,
            reason=f"Player {player.player_id} took all prize cards",
        )
        apply_win_result(game, win_result)

    change = {
        "type": "select_prize",
        "prize_index": action.prize_index,
        "card_id": prize_card.instance_id,
    }
    return ActionResult(
        success=True,
        message="Took prize card",
        win_result=win_result,
        state_changes=[change],
    )
def _execute_resign(
    self,
    game: GameState,
    player_id: str,
) -> ActionResult:
    """Resolve a resignation by ``player_id``.

    The win result comes from ``check_resignation`` and is applied to the
    game before being returned.

    Args:
        game: Game state to modify.
        player_id: The resigning player.

    Returns:
        A successful ActionResult carrying the resulting win result.
    """
    outcome = check_resignation(game, player_id)
    apply_win_result(game, outcome)

    changes = [{"type": "resign", "player_id": player_id}]
    return ActionResult(
        success=True,
        message=f"Player {player_id} resigned",
        win_result=outcome,
        state_changes=changes,
    )
def start_turn(self, game: GameState) -> ActionResult:
    """Begin the current player's turn.

    The turn limit is checked first; if it has been reached, the resulting
    win is applied and the turn does not start. Otherwise the work is
    delegated to ``TurnManager.start_turn``. Per this module's notes that
    covers counter resets, the card draw, the energy flip and the move to
    MAIN phase, none of which is visible here. A win reported by the turn
    manager is applied and returned as a failed turn start.

    Args:
        game: Game state to modify.

    Returns:
        ActionResult with the turn manager's status and message, or a
        failed result carrying a win result when the game ended.
    """
    # Check turn limit BEFORE starting turn
    limit_hit = self.turn_manager.check_turn_limit(game)
    if limit_hit:
        apply_win_result(game, limit_hit)
        return ActionResult(
            success=False,
            message=f"Turn limit reached - {limit_hit.reason}",
            win_result=limit_hit,
        )

    started = self.turn_manager.start_turn(game, self.rng)

    if not started.win_result:
        return ActionResult(success=started.success, message=started.message)

    # Starting the turn ended the game.
    apply_win_result(game, started.win_result)
    return ActionResult(
        success=False,
        message=started.message,
        win_result=started.win_result,
    )
def end_turn(self, game: GameState) -> ActionResult:
    """Finish the current player's turn.

    Delegates to ``TurnManager.end_turn``. Per this module's notes that
    covers between-turn status effects, knockout checks and the hand-off
    to the next player, none of which is visible here. Any win result it
    reports is applied to the game.

    Args:
        game: Game state to modify.

    Returns:
        ActionResult mirroring the turn manager's success flag, message
        and win result.
    """
    ended = self.turn_manager.end_turn(game, self.rng)

    winner_info = ended.win_result
    if winner_info:
        apply_win_result(game, winner_info)

    return ActionResult(
        success=ended.success,
        message=ended.message,
        win_result=ended.win_result,
    )
def get_visible_state(
    self,
    game: GameState,
    player_id: str,
) -> VisibleGameState:
    """Produce the view of ``game`` intended for ``player_id``.

    Forwards to the ``get_visible_state`` function imported from
    ``app.core.visibility``.

    Args:
        game: Full game state.
        player_id: Player requesting the view.

    Returns:
        The VisibleGameState built by the visibility module.
    """
    player_view = get_visible_state(game, player_id)
    return player_view
def get_spectator_state(self, game: GameState) -> VisibleGameState:
    """Produce the view of ``game`` intended for spectators.

    Forwards to the ``get_spectator_state`` function imported from
    ``app.core.visibility``.

    Args:
        game: Full game state.

    Returns:
        The VisibleGameState built by the visibility module.
    """
    spectator_view = get_spectator_state(game)
    return spectator_view
def handle_timeout(self, game: GameState, player_id: str) -> ActionResult:
    """Resolve a timeout by ``player_id``.

    The win result comes from ``check_timeout`` and is applied to the game
    before being returned.

    Args:
        game: Game state to modify.
        player_id: Player who timed out.

    Returns:
        A successful ActionResult carrying the resulting win result.
    """
    outcome = check_timeout(game, player_id)
    apply_win_result(game, outcome)

    return ActionResult(
        success=True,
        message=f"Player {player_id} timed out",
        win_result=outcome,
    )