Issue #2 gap: Added 14 CardDefinition validation tests covering all required field checks (hp, stage, pokemon_type, evolves_from, trainer_type, energy_type) with both negative and positive test cases. Issue #7 gap: Added 4 confusion attack engine tests covering heads/tails outcomes, self-damage, self-KO with opponent scoring, and configurable damage from RulesConfig. Issue #13 documentation: Added TODO comments in engine.py and handlers.py documenting the expected pattern for knockout detection when effect execution is implemented. Effect handlers set knockout flags; engine should process knockouts after all effects resolve. 825 tests passing (+17 new tests)
1091 lines
37 KiB
Python
1091 lines
37 KiB
Python
"""Main GameEngine orchestrator for the Mantimon TCG game engine.
|
|
|
|
This module provides the GameEngine class, which is the primary public API
|
|
for all game operations. It orchestrates:
|
|
- Game creation and initialization
|
|
- Action validation and execution
|
|
- Turn and phase management
|
|
- Win condition checking
|
|
- Visibility filtering for clients
|
|
|
|
The GameEngine is designed to be the single entry point for game operations,
|
|
ensuring all actions are properly validated and state is consistently updated.
|
|
|
|
Usage:
|
|
from app.core.engine import GameEngine
|
|
from app.core.config import RulesConfig
|
|
from app.core.rng import create_rng
|
|
|
|
# Create engine with rules and RNG
|
|
engine = GameEngine(rules=RulesConfig(), rng=create_rng())
|
|
|
|
# Create a new game
|
|
game = engine.create_game(
|
|
player_ids=["player1", "player2"],
|
|
decks={"player1": deck1, "player2": deck2},
|
|
card_registry=registry,
|
|
)
|
|
|
|
# Execute an action
|
|
result = await engine.execute_action(game, "player1", action)
|
|
|
|
# Get client-safe view
|
|
visible = engine.get_visible_state(game, "player1")
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import uuid
|
|
from typing import Any
|
|
|
|
from pydantic import BaseModel, Field
|
|
|
|
from app.core.config import RulesConfig
|
|
from app.core.models.actions import (
|
|
Action,
|
|
AttachEnergyAction,
|
|
AttackAction,
|
|
EvolvePokemonAction,
|
|
PassAction,
|
|
PlayPokemonAction,
|
|
PlayTrainerAction,
|
|
ResignAction,
|
|
RetreatAction,
|
|
SelectActiveAction,
|
|
SelectPrizeAction,
|
|
UseAbilityAction,
|
|
)
|
|
from app.core.models.card import CardDefinition, CardInstance
|
|
from app.core.models.enums import GameEndReason, StatusCondition, TurnPhase
|
|
from app.core.models.game_state import GameState, PlayerState
|
|
from app.core.rng import RandomProvider, create_rng
|
|
from app.core.rules_validator import ValidationResult, validate_action
|
|
from app.core.turn_manager import TurnManager
|
|
from app.core.visibility import VisibleGameState, get_spectator_state, get_visible_state
|
|
from app.core.win_conditions import (
|
|
WinResult,
|
|
apply_win_result,
|
|
check_resignation,
|
|
check_timeout,
|
|
check_win_conditions,
|
|
)
|
|
|
|
|
|
class ActionResult(BaseModel):
|
|
"""Result of executing a game action.
|
|
|
|
Attributes:
|
|
success: Whether the action was executed successfully.
|
|
message: Description of what happened.
|
|
win_result: If the action resulted in a win, the WinResult.
|
|
state_changes: List of state changes for logging/animation.
|
|
"""
|
|
|
|
success: bool
|
|
message: str = ""
|
|
win_result: WinResult | None = None
|
|
state_changes: list[dict[str, Any]] = Field(default_factory=list)
|
|
|
|
|
|
class GameCreationResult(BaseModel):
|
|
"""Result of creating a new game.
|
|
|
|
Attributes:
|
|
success: Whether the game was created successfully.
|
|
game: The created GameState (None if failed).
|
|
message: Description or error message.
|
|
"""
|
|
|
|
success: bool
|
|
game: GameState | None = None
|
|
message: str = ""
|
|
|
|
|
|
class GameEngine:
|
|
"""Main orchestrator for all game operations.
|
|
|
|
The GameEngine is the primary public API for the Mantimon TCG game engine.
|
|
It coordinates all game components and ensures consistent state management.
|
|
|
|
All game operations should go through the GameEngine rather than directly
|
|
manipulating GameState or calling component modules.
|
|
|
|
Attributes:
|
|
rules: The RulesConfig for games created by this engine.
|
|
rng: RandomProvider for all random operations.
|
|
turn_manager: TurnManager for phase/turn transitions.
|
|
|
|
Example:
|
|
engine = GameEngine()
|
|
game = engine.create_game(["p1", "p2"], decks, registry)
|
|
result = await engine.execute_action(game, "p1", action)
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
rules: RulesConfig | None = None,
|
|
rng: RandomProvider | None = None,
|
|
):
|
|
"""Initialize the GameEngine.
|
|
|
|
Args:
|
|
rules: RulesConfig for games. Defaults to standard rules.
|
|
rng: RandomProvider for randomness. Defaults to secure RNG.
|
|
"""
|
|
self.rules = rules or RulesConfig()
|
|
self.rng = rng or create_rng()
|
|
self.turn_manager = TurnManager()
|
|
|
|
def create_game(
|
|
self,
|
|
player_ids: list[str],
|
|
decks: dict[str, list[CardInstance]],
|
|
card_registry: dict[str, CardDefinition],
|
|
energy_decks: dict[str, list[CardInstance]] | None = None,
|
|
game_id: str | None = None,
|
|
) -> GameCreationResult:
|
|
"""Create and initialize a new game.
|
|
|
|
This method:
|
|
1. Validates player count and deck requirements
|
|
2. Creates PlayerState for each player
|
|
3. Shuffles decks and deals starting hands
|
|
4. Sets up prize cards (if using prize card mode)
|
|
5. Determines first player randomly
|
|
6. Sets initial game phase
|
|
|
|
Args:
|
|
player_ids: List of player IDs (must be exactly 2).
|
|
decks: Mapping of player_id -> list of CardInstances for main deck.
|
|
card_registry: Mapping of definition_id -> CardDefinition.
|
|
energy_decks: Optional mapping of player_id -> energy deck cards.
|
|
game_id: Optional game ID. Auto-generated if not provided.
|
|
|
|
Returns:
|
|
GameCreationResult with the created game or error message.
|
|
"""
|
|
# Validate player count
|
|
if len(player_ids) != 2:
|
|
return GameCreationResult(
|
|
success=False,
|
|
message=f"Game requires exactly 2 players, got {len(player_ids)}",
|
|
)
|
|
|
|
# Validate all players have decks
|
|
for pid in player_ids:
|
|
if pid not in decks:
|
|
return GameCreationResult(
|
|
success=False,
|
|
message=f"No deck provided for player {pid}",
|
|
)
|
|
|
|
# Validate deck sizes
|
|
for pid, deck in decks.items():
|
|
if len(deck) < self.rules.deck.min_size:
|
|
return GameCreationResult(
|
|
success=False,
|
|
message=f"Player {pid} deck too small: {len(deck)} < {self.rules.deck.min_size}",
|
|
)
|
|
if len(deck) > self.rules.deck.max_size:
|
|
return GameCreationResult(
|
|
success=False,
|
|
message=f"Player {pid} deck too large: {len(deck)} > {self.rules.deck.max_size}",
|
|
)
|
|
|
|
# Validate at least one basic Pokemon in each deck
|
|
for pid, deck in decks.items():
|
|
has_basic = False
|
|
for card in deck:
|
|
card_def = card_registry.get(card.definition_id)
|
|
if card_def and card_def.is_basic_pokemon():
|
|
has_basic = True
|
|
break
|
|
if not has_basic:
|
|
return GameCreationResult(
|
|
success=False,
|
|
message=f"Player {pid} deck has no Basic Pokemon",
|
|
)
|
|
|
|
# Generate game ID if not provided
|
|
if game_id is None:
|
|
game_id = str(uuid.uuid4())
|
|
|
|
# Create player states
|
|
players: dict[str, PlayerState] = {}
|
|
for pid in player_ids:
|
|
player = PlayerState(player_id=pid)
|
|
|
|
# Add deck cards
|
|
for card in decks[pid]:
|
|
player.deck.add(card)
|
|
|
|
# Shuffle deck
|
|
player.deck.shuffle(self.rng)
|
|
|
|
# Add energy deck if provided
|
|
if energy_decks and pid in energy_decks:
|
|
for card in energy_decks[pid]:
|
|
player.energy_deck.add(card)
|
|
player.energy_deck.shuffle(self.rng)
|
|
|
|
players[pid] = player
|
|
|
|
# Create game state
|
|
game = GameState(
|
|
game_id=game_id,
|
|
rules=self.rules,
|
|
card_registry=card_registry,
|
|
players=players,
|
|
turn_order=list(player_ids),
|
|
phase=TurnPhase.SETUP,
|
|
)
|
|
|
|
# Deal starting hands
|
|
hand_size = self.rules.deck.starting_hand_size
|
|
for player in game.players.values():
|
|
for _ in range(hand_size):
|
|
card = player.deck.draw()
|
|
if card:
|
|
player.hand.add(card)
|
|
|
|
# Handle mulligan - ensure each player has a Basic Pokemon
|
|
# For now, we'll do a simple check and redraw if needed
|
|
for player in game.players.values():
|
|
self._ensure_basic_in_hand(player, card_registry)
|
|
|
|
# Set up prizes (if using prize card mode)
|
|
if self.rules.prizes.use_prize_cards:
|
|
prize_count = self.rules.prizes.count
|
|
for player in game.players.values():
|
|
for _ in range(prize_count):
|
|
card = player.deck.draw()
|
|
if card:
|
|
player.prizes.add(card)
|
|
|
|
# Determine first player randomly
|
|
first_player = self.rng.choice(player_ids)
|
|
game.current_player_id = first_player
|
|
game.turn_number = 1
|
|
|
|
return GameCreationResult(
|
|
success=True,
|
|
game=game,
|
|
message=f"Game created. {first_player} goes first.",
|
|
)
|
|
|
|
def _ensure_basic_in_hand(
|
|
self,
|
|
player: PlayerState,
|
|
card_registry: dict[str, CardDefinition],
|
|
) -> None:
|
|
"""Ensure the player has at least one Basic Pokemon in hand.
|
|
|
|
If no Basic Pokemon in hand, shuffle hand into deck and redraw.
|
|
Repeat until a Basic is found (mulligan rule).
|
|
|
|
Args:
|
|
player: The player to check.
|
|
card_registry: Card definitions for lookup.
|
|
"""
|
|
max_mulligans = 10 # Prevent infinite loop
|
|
mulligans = 0
|
|
|
|
while mulligans < max_mulligans:
|
|
# Check for basic Pokemon in hand
|
|
has_basic = False
|
|
for card in player.hand.cards:
|
|
card_def = card_registry.get(card.definition_id)
|
|
if card_def and card_def.is_basic_pokemon():
|
|
has_basic = True
|
|
break
|
|
|
|
if has_basic:
|
|
return
|
|
|
|
# Mulligan: shuffle hand into deck and redraw
|
|
mulligans += 1
|
|
while player.hand.cards:
|
|
card = player.hand.draw()
|
|
if card:
|
|
player.deck.add(card)
|
|
|
|
player.deck.shuffle(self.rng)
|
|
|
|
# Redraw
|
|
hand_size = 7 # Standard hand size for mulligan
|
|
for _ in range(hand_size):
|
|
card = player.deck.draw()
|
|
if card:
|
|
player.hand.add(card)
|
|
|
|
def validate_action(
|
|
self,
|
|
game: GameState,
|
|
player_id: str,
|
|
action: Action,
|
|
) -> ValidationResult:
|
|
"""Validate whether an action is legal.
|
|
|
|
This is a thin wrapper around rules_validator.validate_action that
|
|
provides the standard engine interface.
|
|
|
|
Args:
|
|
game: Current game state.
|
|
player_id: Player attempting the action.
|
|
action: The action to validate.
|
|
|
|
Returns:
|
|
ValidationResult indicating if action is valid.
|
|
"""
|
|
return validate_action(game, player_id, action)
|
|
|
|
async def execute_action(
|
|
self,
|
|
game: GameState,
|
|
player_id: str,
|
|
action: Action,
|
|
) -> ActionResult:
|
|
"""Validate and execute a player action.
|
|
|
|
This is the main entry point for action execution. It:
|
|
1. Validates the action
|
|
2. Executes the action if valid
|
|
3. Checks for win conditions
|
|
4. Returns the result
|
|
|
|
Args:
|
|
game: Game state to modify.
|
|
player_id: Player performing the action.
|
|
action: The action to execute.
|
|
|
|
Returns:
|
|
ActionResult with success status and any win result.
|
|
"""
|
|
# Validate action
|
|
validation = self.validate_action(game, player_id, action)
|
|
if not validation.valid:
|
|
return ActionResult(
|
|
success=False,
|
|
message=validation.reason or "Invalid action",
|
|
)
|
|
|
|
# Execute based on action type
|
|
result = await self._execute_action_internal(game, player_id, action)
|
|
|
|
# Check win conditions after action
|
|
if result.success and result.win_result is None:
|
|
win_result = check_win_conditions(game)
|
|
if win_result:
|
|
apply_win_result(game, win_result)
|
|
result.win_result = win_result
|
|
|
|
# Log action
|
|
game.log_action(
|
|
{
|
|
"player_id": player_id,
|
|
"action": action.model_dump(),
|
|
"success": result.success,
|
|
}
|
|
)
|
|
|
|
return result
|
|
|
|
async def _execute_action_internal(
|
|
self,
|
|
game: GameState,
|
|
player_id: str,
|
|
action: Action,
|
|
) -> ActionResult:
|
|
"""Execute an action that has already been validated.
|
|
|
|
Args:
|
|
game: Game state to modify.
|
|
player_id: Player performing the action.
|
|
action: The validated action.
|
|
|
|
Returns:
|
|
ActionResult with execution details.
|
|
"""
|
|
player = game.players[player_id]
|
|
|
|
if isinstance(action, PlayPokemonAction):
|
|
return self._execute_play_pokemon(game, player, action)
|
|
|
|
elif isinstance(action, EvolvePokemonAction):
|
|
return self._execute_evolve(game, player, action)
|
|
|
|
elif isinstance(action, AttachEnergyAction):
|
|
return self._execute_attach_energy(game, player, action)
|
|
|
|
elif isinstance(action, PlayTrainerAction):
|
|
return await self._execute_play_trainer(game, player, action)
|
|
|
|
elif isinstance(action, UseAbilityAction):
|
|
return await self._execute_use_ability(game, player, action)
|
|
|
|
elif isinstance(action, AttackAction):
|
|
return await self._execute_attack(game, player, action)
|
|
|
|
elif isinstance(action, RetreatAction):
|
|
return self._execute_retreat(game, player, action)
|
|
|
|
elif isinstance(action, PassAction):
|
|
return self._execute_pass(game, player, action)
|
|
|
|
elif isinstance(action, SelectActiveAction):
|
|
return self._execute_select_active(game, player, action)
|
|
|
|
elif isinstance(action, SelectPrizeAction):
|
|
return self._execute_select_prize(game, player, action)
|
|
|
|
elif isinstance(action, ResignAction):
|
|
return self._execute_resign(game, player_id)
|
|
|
|
else:
|
|
return ActionResult(
|
|
success=False,
|
|
message=f"Unknown action type: {action.type}",
|
|
)
|
|
|
|
def _execute_play_pokemon(
|
|
self,
|
|
game: GameState,
|
|
player: PlayerState,
|
|
action: PlayPokemonAction,
|
|
) -> ActionResult:
|
|
"""Execute playing a Basic Pokemon from hand to bench/active."""
|
|
card = player.hand.remove(action.card_instance_id)
|
|
if not card:
|
|
return ActionResult(success=False, message="Card not in hand")
|
|
|
|
# If no active, play to active
|
|
if not player.has_active_pokemon():
|
|
player.active.add(card)
|
|
card.turn_played = game.turn_number
|
|
return ActionResult(
|
|
success=True,
|
|
message="Played Pokemon to active",
|
|
state_changes=[
|
|
{"type": "play_pokemon", "zone": "active", "card_id": action.card_instance_id}
|
|
],
|
|
)
|
|
|
|
# Otherwise play to bench
|
|
player.bench.add(card)
|
|
card.turn_played = game.turn_number
|
|
return ActionResult(
|
|
success=True,
|
|
message="Played Pokemon to bench",
|
|
state_changes=[
|
|
{"type": "play_pokemon", "zone": "bench", "card_id": action.card_instance_id}
|
|
],
|
|
)
|
|
|
|
def _execute_evolve(
|
|
self,
|
|
game: GameState,
|
|
player: PlayerState,
|
|
action: EvolvePokemonAction,
|
|
) -> ActionResult:
|
|
"""Execute evolving a Pokemon."""
|
|
# Get evolution card from hand
|
|
evo_card = player.hand.remove(action.evolution_card_id)
|
|
if not evo_card:
|
|
return ActionResult(success=False, message="Evolution card not in hand")
|
|
|
|
# Find target Pokemon
|
|
target = None
|
|
zone = None
|
|
if action.target_pokemon_id in player.active:
|
|
target = player.active.get(action.target_pokemon_id)
|
|
zone = player.active
|
|
elif action.target_pokemon_id in player.bench:
|
|
target = player.bench.get(action.target_pokemon_id)
|
|
zone = player.bench
|
|
|
|
if not target or not zone:
|
|
# Put card back
|
|
player.hand.add(evo_card)
|
|
return ActionResult(success=False, message="Target Pokemon not found")
|
|
|
|
# Transfer all attached cards to the evolution (energy, tools stay attached)
|
|
evo_card.attached_energy = target.attached_energy
|
|
evo_card.attached_tools = target.attached_tools
|
|
evo_card.damage = target.damage
|
|
# Note: Status conditions are NOT transferred - evolution removes status in Pokemon TCG
|
|
evo_card.status_conditions = []
|
|
evo_card.hp_modifier = target.hp_modifier
|
|
evo_card.damage_modifier = target.damage_modifier
|
|
evo_card.retreat_cost_modifier = target.retreat_cost_modifier
|
|
|
|
# Build evolution stack - previous stages go underneath
|
|
# Copy existing stack and add the target (previous evolution) to it
|
|
evo_card.cards_underneath = target.cards_underneath.copy()
|
|
|
|
# Clear target's attached lists since they're now on evo_card
|
|
target.attached_energy = []
|
|
target.attached_tools = []
|
|
target.cards_underneath = []
|
|
|
|
# Add target to the evolution stack (it goes underneath the new evolution)
|
|
evo_card.cards_underneath.append(target)
|
|
|
|
# Track evolution timing
|
|
evo_card.turn_played = game.turn_number
|
|
evo_card.turn_evolved = game.turn_number
|
|
|
|
# Remove old Pokemon from zone and add evolution
|
|
zone.remove(action.target_pokemon_id)
|
|
zone.add(evo_card)
|
|
|
|
# Note: Target is NOT discarded - it's now in cards_underneath
|
|
|
|
return ActionResult(
|
|
success=True,
|
|
message="Pokemon evolved",
|
|
state_changes=[
|
|
{
|
|
"type": "evolve",
|
|
"target": action.target_pokemon_id,
|
|
"evolution": action.evolution_card_id,
|
|
}
|
|
],
|
|
)
|
|
|
|
def _execute_attach_energy(
|
|
self,
|
|
game: GameState,
|
|
player: PlayerState,
|
|
action: AttachEnergyAction,
|
|
) -> ActionResult:
|
|
"""Execute attaching energy to a Pokemon."""
|
|
# Get energy card from hand or energy zone
|
|
energy_card = None
|
|
source = "hand"
|
|
|
|
if action.from_energy_zone and action.energy_card_id in player.energy_zone:
|
|
energy_card = player.energy_zone.remove(action.energy_card_id)
|
|
source = "energy_zone"
|
|
elif action.energy_card_id in player.hand:
|
|
energy_card = player.hand.remove(action.energy_card_id)
|
|
source = "hand"
|
|
|
|
if not energy_card:
|
|
return ActionResult(success=False, message="Energy card not found")
|
|
|
|
# Find target Pokemon
|
|
target = None
|
|
if action.target_pokemon_id in player.active:
|
|
target = player.active.get(action.target_pokemon_id)
|
|
elif action.target_pokemon_id in player.bench:
|
|
target = player.bench.get(action.target_pokemon_id)
|
|
|
|
if not target:
|
|
# Put card back
|
|
if source == "energy_zone":
|
|
player.energy_zone.add(energy_card)
|
|
else:
|
|
player.hand.add(energy_card)
|
|
return ActionResult(success=False, message="Target Pokemon not found")
|
|
|
|
# Attach energy - the CardInstance is stored directly on the Pokemon
|
|
# Energy stays attached until the Pokemon is knocked out or an effect removes it
|
|
target.attach_energy(energy_card)
|
|
player.energy_attachments_this_turn += 1
|
|
|
|
return ActionResult(
|
|
success=True,
|
|
message="Energy attached",
|
|
state_changes=[
|
|
{
|
|
"type": "attach_energy",
|
|
"target": action.target_pokemon_id,
|
|
"energy": action.energy_card_id,
|
|
}
|
|
],
|
|
)
|
|
|
|
async def _execute_play_trainer(
|
|
self,
|
|
game: GameState,
|
|
player: PlayerState,
|
|
action: PlayTrainerAction,
|
|
) -> ActionResult:
|
|
"""Execute playing a Trainer card."""
|
|
card = player.hand.remove(action.card_instance_id)
|
|
if not card:
|
|
return ActionResult(success=False, message="Card not in hand")
|
|
|
|
card_def = game.get_card_definition(card.definition_id)
|
|
if not card_def:
|
|
player.hand.add(card)
|
|
return ActionResult(success=False, message="Card definition not found")
|
|
|
|
# Update per-turn counters
|
|
if card_def.trainer_type:
|
|
from app.core.models.enums import TrainerType
|
|
|
|
if card_def.trainer_type == TrainerType.SUPPORTER:
|
|
player.supporters_played_this_turn += 1
|
|
elif card_def.trainer_type == TrainerType.STADIUM:
|
|
player.stadiums_played_this_turn += 1
|
|
# Handle stadium replacement - discard to original owner
|
|
if game.stadium_in_play and game.stadium_owner_id:
|
|
old_stadium = game.stadium_in_play
|
|
owner = game.players.get(game.stadium_owner_id)
|
|
if owner:
|
|
owner.discard.add(old_stadium)
|
|
else:
|
|
# Fallback if owner not found (shouldn't happen)
|
|
player.discard.add(old_stadium)
|
|
# Set new stadium and track owner
|
|
game.stadium_in_play = card
|
|
game.stadium_owner_id = player.player_id
|
|
return ActionResult(
|
|
success=True,
|
|
message="Stadium played",
|
|
state_changes=[{"type": "play_stadium", "card_id": action.card_instance_id}],
|
|
)
|
|
elif card_def.trainer_type == TrainerType.ITEM:
|
|
player.items_played_this_turn += 1
|
|
|
|
# Execute trainer effect (placeholder - effects would be handled by effect system)
|
|
# For now, just discard the card
|
|
player.discard.add(card)
|
|
|
|
return ActionResult(
|
|
success=True,
|
|
message="Trainer card played",
|
|
state_changes=[{"type": "play_trainer", "card_id": action.card_instance_id}],
|
|
)
|
|
|
|
async def _execute_use_ability(
|
|
self,
|
|
game: GameState,
|
|
player: PlayerState,
|
|
action: UseAbilityAction,
|
|
) -> ActionResult:
|
|
"""Execute using a Pokemon's ability."""
|
|
# Find Pokemon with ability
|
|
pokemon = None
|
|
if action.pokemon_id in player.active:
|
|
pokemon = player.active.get(action.pokemon_id)
|
|
elif action.pokemon_id in player.bench:
|
|
pokemon = player.bench.get(action.pokemon_id)
|
|
|
|
if not pokemon:
|
|
return ActionResult(success=False, message="Pokemon not found")
|
|
|
|
card_def = game.get_card_definition(pokemon.definition_id)
|
|
if not card_def or not card_def.abilities:
|
|
return ActionResult(success=False, message="Pokemon has no abilities")
|
|
|
|
if action.ability_index >= len(card_def.abilities):
|
|
return ActionResult(success=False, message="Invalid ability index")
|
|
|
|
ability = card_def.abilities[action.ability_index]
|
|
|
|
# Mark ability as used (increment counter for this specific ability)
|
|
pokemon.increment_ability_uses(action.ability_index)
|
|
|
|
# Execute ability effect (placeholder - would use effect system)
|
|
return ActionResult(
|
|
success=True,
|
|
message=f"Used ability: {ability.name}",
|
|
state_changes=[
|
|
{"type": "use_ability", "pokemon": action.pokemon_id, "ability": ability.name}
|
|
],
|
|
)
|
|
|
|
async def _execute_attack(
|
|
self,
|
|
game: GameState,
|
|
player: PlayerState,
|
|
action: AttackAction,
|
|
) -> ActionResult:
|
|
"""Execute an attack.
|
|
|
|
Handles confusion status: confused Pokemon must flip a coin before attacking.
|
|
On tails, the attack fails and the Pokemon damages itself.
|
|
"""
|
|
active = player.get_active_pokemon()
|
|
if not active:
|
|
return ActionResult(success=False, message="No active Pokemon")
|
|
|
|
card_def = game.get_card_definition(active.definition_id)
|
|
if not card_def or not card_def.attacks:
|
|
return ActionResult(success=False, message="Pokemon has no attacks")
|
|
|
|
if action.attack_index >= len(card_def.attacks):
|
|
return ActionResult(success=False, message="Invalid attack index")
|
|
|
|
attack = card_def.attacks[action.attack_index]
|
|
|
|
# Handle confusion: flip coin, tails = attack fails + self-damage
|
|
if StatusCondition.CONFUSED in active.status_conditions:
|
|
confusion_flip = self.rng.coin_flip()
|
|
if not confusion_flip:
|
|
# Tails - attack fails, Pokemon damages itself
|
|
self_damage = game.rules.status.confusion_self_damage
|
|
active.damage += self_damage
|
|
|
|
# Check if attacker knocked itself out from confusion
|
|
win_result = None
|
|
if card_def.hp and active.is_knocked_out(card_def.hp):
|
|
opponent_id = game.get_opponent_id(player.player_id)
|
|
ko_result = self.turn_manager.process_knockout(
|
|
game, active.instance_id, opponent_id, self.rng
|
|
)
|
|
if ko_result:
|
|
win_result = ko_result
|
|
|
|
# Advance to END phase (attack was attempted even though it failed)
|
|
self.turn_manager.advance_to_end(game)
|
|
|
|
return ActionResult(
|
|
success=True, # Action succeeded (coin was flipped), but attack failed
|
|
message=f"Confused! Flipped tails - attack failed, {self_damage} self-damage",
|
|
win_result=win_result,
|
|
state_changes=[
|
|
{
|
|
"type": "confusion_flip",
|
|
"result": "tails",
|
|
"self_damage": self_damage,
|
|
}
|
|
],
|
|
)
|
|
# Heads - attack proceeds normally (fall through)
|
|
|
|
# Get opponent's active Pokemon
|
|
opponent_id = game.get_opponent_id(player.player_id)
|
|
opponent = game.players[opponent_id]
|
|
defender = opponent.get_active_pokemon()
|
|
|
|
if not defender:
|
|
return ActionResult(success=False, message="Opponent has no active Pokemon")
|
|
|
|
# Calculate and apply damage (simplified)
|
|
# TODO: EFFECT EXECUTION - When attack effects are implemented, they should be
|
|
# executed here BEFORE knockout detection. Effects like coin_flip_damage and
|
|
# bench_damage will deal additional damage. After ALL effects resolve, iterate
|
|
# through all damaged Pokemon (defender, benched Pokemon, even attacker from
|
|
# recoil) and call process_knockout() for each KO'd Pokemon.
|
|
#
|
|
# The damage handlers (deal_damage, attack_damage) set details["knockout"]=True
|
|
# when damage KOs a target - use this to identify which Pokemon need knockout
|
|
# processing without re-checking every Pokemon.
|
|
#
|
|
# See: app/core/effects/handlers.py for knockout flag pattern
|
|
# See: SYSTEM_REVIEW.md Issue #13 for context
|
|
base_damage = attack.damage or 0
|
|
defender.damage += base_damage
|
|
|
|
# Check for knockout
|
|
win_result = None
|
|
defender_def = game.get_card_definition(defender.definition_id)
|
|
if defender_def and defender_def.hp and defender.is_knocked_out(defender_def.hp):
|
|
ko_result = self.turn_manager.process_knockout(
|
|
game, defender.instance_id, player.player_id, self.rng
|
|
)
|
|
if ko_result:
|
|
win_result = ko_result
|
|
|
|
# Advance to END phase after attack
|
|
self.turn_manager.advance_to_end(game)
|
|
|
|
# Build message - include confusion heads if applicable
|
|
message = f"Attack: {attack.name} dealt {base_damage} damage"
|
|
state_changes: list[dict[str, Any]] = [
|
|
{"type": "attack", "name": attack.name, "damage": base_damage}
|
|
]
|
|
if StatusCondition.CONFUSED in active.status_conditions:
|
|
message = f"Confused - flipped heads! {message}"
|
|
state_changes.insert(0, {"type": "confusion_flip", "result": "heads"})
|
|
|
|
return ActionResult(
|
|
success=True,
|
|
message=message,
|
|
win_result=win_result,
|
|
state_changes=state_changes,
|
|
)
|
|
|
|
def _execute_retreat(
|
|
self,
|
|
game: GameState,
|
|
player: PlayerState,
|
|
action: RetreatAction,
|
|
) -> ActionResult:
|
|
"""Execute retreating the active Pokemon."""
|
|
active = player.get_active_pokemon()
|
|
if not active:
|
|
return ActionResult(success=False, message="No active Pokemon")
|
|
|
|
# Find new active on bench
|
|
new_active = player.bench.remove(action.new_active_id)
|
|
if not new_active:
|
|
return ActionResult(success=False, message="New active not found on bench")
|
|
|
|
# Discard energy for retreat cost (simplified - assume cost already validated)
|
|
for energy_id in action.energy_to_discard:
|
|
energy = active.detach_energy(energy_id)
|
|
if energy:
|
|
player.discard.add(energy)
|
|
|
|
# Swap positions
|
|
player.active.remove(active.instance_id)
|
|
player.bench.add(active)
|
|
player.active.add(new_active)
|
|
|
|
# Clear status conditions (retreat clears confusion)
|
|
from app.core.models.enums import StatusCondition
|
|
|
|
active.remove_status(StatusCondition.CONFUSED)
|
|
|
|
player.retreats_this_turn += 1
|
|
|
|
return ActionResult(
|
|
success=True,
|
|
message="Retreated",
|
|
state_changes=[
|
|
{
|
|
"type": "retreat",
|
|
"old_active": active.instance_id,
|
|
"new_active": action.new_active_id,
|
|
}
|
|
],
|
|
)
|
|
|
|
def _execute_pass(
|
|
self,
|
|
game: GameState,
|
|
player: PlayerState,
|
|
action: PassAction,
|
|
) -> ActionResult:
|
|
"""Execute passing (ending turn without attacking)."""
|
|
# Advance to END phase if in MAIN
|
|
if game.phase == TurnPhase.MAIN:
|
|
self.turn_manager.skip_attack(game)
|
|
|
|
return ActionResult(
|
|
success=True,
|
|
message="Turn passed",
|
|
state_changes=[{"type": "pass"}],
|
|
)
|
|
|
|
def _execute_select_active(
|
|
self,
|
|
game: GameState,
|
|
player: PlayerState,
|
|
action: SelectActiveAction,
|
|
) -> ActionResult:
|
|
"""Execute selecting a new active Pokemon (forced action)."""
|
|
# Move selected Pokemon from bench to active
|
|
new_active = player.bench.remove(action.pokemon_id)
|
|
if not new_active:
|
|
return ActionResult(success=False, message="Pokemon not found on bench")
|
|
|
|
player.active.add(new_active)
|
|
|
|
# Pop the completed forced action from the queue
|
|
game.pop_forced_action()
|
|
|
|
return ActionResult(
|
|
success=True,
|
|
message="New active Pokemon selected",
|
|
state_changes=[{"type": "select_active", "card_id": action.pokemon_id}],
|
|
)
|
|
|
|
def _execute_select_prize(
|
|
self,
|
|
game: GameState,
|
|
player: PlayerState,
|
|
action: SelectPrizeAction,
|
|
) -> ActionResult:
|
|
"""Execute selecting a prize card after a knockout.
|
|
|
|
In prize card mode, when a player knocks out an opponent's Pokemon,
|
|
they take prize cards. This method handles the selection.
|
|
|
|
Args:
|
|
game: Game state to modify.
|
|
player: Player taking the prize.
|
|
action: The SelectPrizeAction with the prize index.
|
|
|
|
Returns:
|
|
ActionResult indicating success and any win condition.
|
|
"""
|
|
# Remove prize card at the specified index and add to hand
|
|
if action.prize_index < 0 or action.prize_index >= len(player.prizes):
|
|
return ActionResult(
|
|
success=False,
|
|
message=f"Invalid prize index: {action.prize_index}",
|
|
)
|
|
|
|
prize_card = player.prizes.cards.pop(action.prize_index)
|
|
player.hand.add(prize_card)
|
|
|
|
# Handle multi-prize selection (e.g., for EX/VMAX knockouts)
|
|
win_result = None
|
|
current_forced = game.get_current_forced_action()
|
|
if current_forced and current_forced.action_type == "select_prize":
|
|
remaining = current_forced.params.get("count", 1) - 1
|
|
if remaining > 0 and len(player.prizes) > 0:
|
|
# More prizes to take - update the current action
|
|
current_forced.params["count"] = remaining
|
|
current_forced.reason = f"Select {remaining} more prize card(s)"
|
|
else:
|
|
# Done taking prizes - pop this action from queue
|
|
game.pop_forced_action()
|
|
|
|
# Check for win by taking all prizes
|
|
if len(player.prizes) == 0:
|
|
opponent_id = game.get_opponent_id(player.player_id)
|
|
win_result = WinResult(
|
|
winner_id=player.player_id,
|
|
loser_id=opponent_id,
|
|
end_reason=GameEndReason.PRIZES_TAKEN,
|
|
reason=f"Player {player.player_id} took all prize cards",
|
|
)
|
|
apply_win_result(game, win_result)
|
|
|
|
return ActionResult(
|
|
success=True,
|
|
message="Took prize card",
|
|
win_result=win_result,
|
|
state_changes=[
|
|
{
|
|
"type": "select_prize",
|
|
"prize_index": action.prize_index,
|
|
"card_id": prize_card.instance_id,
|
|
}
|
|
],
|
|
)
|
|
|
|
def _execute_resign(
|
|
self,
|
|
game: GameState,
|
|
player_id: str,
|
|
) -> ActionResult:
|
|
"""Execute player resignation."""
|
|
win_result = check_resignation(game, player_id)
|
|
apply_win_result(game, win_result)
|
|
|
|
return ActionResult(
|
|
success=True,
|
|
message=f"Player {player_id} resigned",
|
|
win_result=win_result,
|
|
state_changes=[{"type": "resign", "player_id": player_id}],
|
|
)
|
|
|
|
def start_turn(self, game: GameState) -> ActionResult:
|
|
"""Start the current player's turn.
|
|
|
|
This handles:
|
|
- Checking turn limit (game ends if exceeded)
|
|
- Resetting per-turn counters
|
|
- Drawing a card (if allowed)
|
|
- Flipping energy from energy deck (if enabled)
|
|
- Advancing to MAIN phase
|
|
|
|
Args:
|
|
game: Game state to modify.
|
|
|
|
Returns:
|
|
ActionResult with turn start details.
|
|
"""
|
|
# Check turn limit BEFORE starting turn
|
|
turn_limit_result = self.turn_manager.check_turn_limit(game)
|
|
if turn_limit_result:
|
|
apply_win_result(game, turn_limit_result)
|
|
return ActionResult(
|
|
success=False,
|
|
message=f"Turn limit reached - {turn_limit_result.reason}",
|
|
win_result=turn_limit_result,
|
|
)
|
|
|
|
result = self.turn_manager.start_turn(game, self.rng)
|
|
|
|
if result.win_result:
|
|
apply_win_result(game, result.win_result)
|
|
return ActionResult(
|
|
success=False,
|
|
message=result.message,
|
|
win_result=result.win_result,
|
|
)
|
|
|
|
return ActionResult(
|
|
success=result.success,
|
|
message=result.message,
|
|
)
|
|
|
|
def end_turn(self, game: GameState) -> ActionResult:
|
|
"""End the current player's turn.
|
|
|
|
This handles:
|
|
- Applying between-turn status effects
|
|
- Checking for knockouts
|
|
- Advancing to next player's turn
|
|
|
|
Args:
|
|
game: Game state to modify.
|
|
|
|
Returns:
|
|
ActionResult with turn end details.
|
|
"""
|
|
result = self.turn_manager.end_turn(game, self.rng)
|
|
|
|
if result.win_result:
|
|
apply_win_result(game, result.win_result)
|
|
|
|
return ActionResult(
|
|
success=result.success,
|
|
message=result.message,
|
|
win_result=result.win_result,
|
|
)
|
|
|
|
def get_visible_state(
|
|
self,
|
|
game: GameState,
|
|
player_id: str,
|
|
) -> VisibleGameState:
|
|
"""Get a visibility-filtered game state for a player.
|
|
|
|
Args:
|
|
game: Full game state.
|
|
player_id: Player requesting the view.
|
|
|
|
Returns:
|
|
VisibleGameState safe to send to the client.
|
|
"""
|
|
return get_visible_state(game, player_id)
|
|
|
|
def get_spectator_state(self, game: GameState) -> VisibleGameState:
|
|
"""Get a visibility-filtered game state for spectators.
|
|
|
|
Args:
|
|
game: Full game state.
|
|
|
|
Returns:
|
|
VisibleGameState safe for spectators (no hands visible).
|
|
"""
|
|
return get_spectator_state(game)
|
|
|
|
def handle_timeout(self, game: GameState, player_id: str) -> ActionResult:
|
|
"""Handle a player timeout.
|
|
|
|
Args:
|
|
game: Game state.
|
|
player_id: Player who timed out.
|
|
|
|
Returns:
|
|
ActionResult with timeout result.
|
|
"""
|
|
win_result = check_timeout(game, player_id)
|
|
apply_win_result(game, win_result)
|
|
|
|
return ActionResult(
|
|
success=True,
|
|
message=f"Player {player_id} timed out",
|
|
win_result=win_result,
|
|
)
|