Database Infrastructure: - Added Alembic migration system (alembic.ini, env.py) - Migration 001: Initial schema - Migration 004: Stat materialized views (enhanced) - Migration 005: Composite indexes for performance - operations.py: Session injection support for test isolation - session.py: Enhanced session management Application Updates: - main.py: Integration with new database infrastructure - health.py: Enhanced health checks with pool monitoring Integration Tests: - conftest.py: Session injection pattern for reliable tests - test_operations.py: Database operations tests - test_migrations.py: Migration verification tests Session injection pattern enables: - Production: Auto-commit per operation - Testing: Shared session with automatic rollback - Transactions: Multiple ops, single commit 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
387 lines
13 KiB
Python
387 lines
13 KiB
Python
import asyncio
|
|
import logging
|
|
from contextlib import asynccontextmanager
|
|
|
|
import socketio
|
|
from fastapi import FastAPI, Request
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.responses import JSONResponse
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
|
|
from app.api.routes import auth, games, health, teams
|
|
from app.config import get_settings
|
|
from app.core.exceptions import (
|
|
DatabaseError,
|
|
GameEngineError,
|
|
GameNotFoundError,
|
|
InvalidGameStateError,
|
|
)
|
|
from app.core.state_manager import state_manager
|
|
from app.database.session import engine, init_db
|
|
from app.middleware.rate_limit import rate_limiter
|
|
from app.monitoring.pool_monitor import init_pool_monitor
|
|
from app.services import redis_client
|
|
from app.utils.logging import setup_logging
|
|
from app.websocket.connection_manager import ConnectionManager
|
|
from app.websocket.handlers import register_handlers
|
|
|
|
logger = logging.getLogger(f"{__name__}.main")
|
|
|
|
# Background task handles
|
|
_eviction_task: asyncio.Task | None = None
|
|
_session_expiration_task: asyncio.Task | None = None
|
|
_rate_limit_cleanup_task: asyncio.Task | None = None
|
|
_pool_monitoring_task: asyncio.Task | None = None
|
|
|
|
|
|
async def periodic_eviction():
    """
    Periodically evict idle games from the in-memory state manager.

    The interval is ``game_eviction_interval_minutes`` from settings, read
    once when the task starts. After each interval the task:

    1. evicts games idle beyond the configured timeout
       (``state_manager.evict_idle_games``),
    2. enforces the in-memory game cap (``state_manager.enforce_memory_limit``),
    3. logs a summary built from ``state_manager.get_memory_stats()``.

    The first cycle runs one full interval after start-up. An error in a
    cycle is logged with its traceback and the loop keeps going, so a single
    failure cannot stop eviction. Cancellation ends the task.
    """
    cfg = get_settings()
    sleep_seconds = cfg.game_eviction_interval_minutes * 60

    logger.info(
        f"Starting periodic eviction task (interval: {cfg.game_eviction_interval_minutes}m, "
        f"idle timeout: {cfg.game_idle_timeout_hours}h, max games: {cfg.game_max_in_memory})"
    )

    while True:
        try:
            await asyncio.sleep(sleep_seconds)

            idle_evicted = await state_manager.evict_idle_games()
            forced_evicted = await state_manager.enforce_memory_limit()

            mem = state_manager.get_memory_stats()
            summary = (
                f"Eviction cycle complete: {len(idle_evicted)} idle + {len(forced_evicted)} forced. "
                f"Active: {mem['active_games']}/{mem['max_games']}, "
                f"Oldest: {mem['oldest_game_hours']:.1f}h"
            )
            logger.info(summary)
        except asyncio.CancelledError:
            logger.info("Eviction task cancelled - shutting down")
            return
        except Exception as e:
            # Same as logger.error(..., exc_info=True); keep looping afterwards.
            logger.exception(f"Eviction task error: {e}")
|
|
|
|
|
|
async def periodic_session_expiration(
    connection_manager, *, interval: float = 60, timeout_multiplier: float = 5
):
    """
    Periodically expire inactive WebSocket sessions.

    After every ``interval`` seconds this calls
    ``connection_manager.expire_inactive_sessions(timeout)``. That cleans up
    zombie connections that were never closed properly (network failure,
    browser crash, etc.). It then logs session stats at DEBUG level whenever
    any sessions exist.

    This works alongside Socket.io's ping/pong mechanism:
    - Socket.io ping/pong: detects transport-level disconnections.
    - This task: detects application-level inactivity (no events for an
      extended period).

    Args:
        connection_manager: Object providing ``expire_inactive_sessions`` and
            ``get_stats``. lifespan() passes the module-level ConnectionManager.
        interval: Seconds to sleep between checks (default 60).
        timeout_multiplier: Inactivity timeout expressed as a multiple of
            ``settings.ws_connection_timeout`` (default 5). connections_health()
            reports ``ws_connection_timeout * 5`` as the expiration timeout,
            so keep the two in sync if a caller overrides this.

    An error in one cycle is logged and the loop continues. Cancellation ends
    the task.
    """
    settings = get_settings()
    # 5x timeout = 5 minutes with the default 60s ws_connection_timeout
    timeout = settings.ws_connection_timeout * timeout_multiplier

    logger.info(
        f"Starting session expiration task (interval: {interval}s, timeout: {timeout}s)"
    )

    while True:
        try:
            await asyncio.sleep(interval)

            # Expire inactive sessions (the return value is not used here)
            await connection_manager.expire_inactive_sessions(timeout)

            # Log stats periodically
            stats = connection_manager.get_stats()
            if stats["total_sessions"] > 0:
                logger.debug(
                    f"Session stats: {stats['total_sessions']} sessions, "
                    f"{stats['unique_users']} users, "
                    f"{stats['active_game_rooms']} active games"
                )

        except asyncio.CancelledError:
            logger.info("Session expiration task cancelled - shutting down")
            break
        except Exception as e:
            # Continue running despite errors
            logger.exception(f"Session expiration task error: {e}")
|
|
|
|
|
|
async def _stop_background_task(task: asyncio.Task | None, name: str) -> None:
    """
    Cancel ``task`` (if any) and wait for it to finish, without raising.

    Logs ``"Stopping <name>"`` first. The expected CancelledError is
    swallowed. Any other exception the task ended with is logged instead of
    re-raised: for example, the task may have crashed earlier and its
    exception was never retrieved. This way one failed task cannot abort the
    rest of the shutdown sequence.
    """
    if task is None:
        return
    logger.info(f"Stopping {name}")
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    except Exception:
        logger.exception(f"{name.capitalize()} ended with an error")


def _redact_url(url) -> str:
    """
    Return ``str(url)`` with any password in the URL's userinfo replaced by ``***``.

    Connection URLs such as ``redis://:secret@host:6379/0`` can carry
    credentials, so they should not be logged verbatim. URLs without a
    password are returned unchanged. Never raises on malformed input.
    """
    from urllib.parse import urlsplit, urlunsplit

    text = str(url)
    try:
        parts = urlsplit(text)
    except ValueError:
        return "<unparseable URL>"
    if parts.password is None:
        return text
    host = parts.netloc.rpartition("@")[2]
    return urlunsplit(parts._replace(netloc=f"{parts.username}:***@{host}"))


@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Run application startup and shutdown.

    Startup runs in this order:
    - configure logging and initialize the database;
    - start pool monitoring;
    - connect Redis (best-effort: failure only logs a warning);
    - start the idle-game eviction, session expiration and rate-limiter
      cleanup background tasks.

    Shutdown runs in a ``finally`` block, so it also happens when a startup
    step after database initialization fails or serving ends with an
    exception. It stops the four background tasks, then disconnects Redis if
    it is connected.
    """
    global _eviction_task, _session_expiration_task, _rate_limit_cleanup_task, _pool_monitoring_task
    settings = get_settings()

    # Configure logging before emitting anything; otherwise the startup
    # banner below is handled by the default (unconfigured) logging setup.
    setup_logging()
    logger.info("Starting Paper Dynasty Game Backend")

    # Initialize database
    await init_db()
    logger.info("Database initialized")

    try:
        # Initialize pool monitor for connection pool observability
        pool_monitor = init_pool_monitor(engine)
        _pool_monitoring_task = asyncio.create_task(
            pool_monitor.start_monitoring(interval_seconds=60)
        )
        logger.info("Pool monitoring initialized")

        # Initialize Redis (optional dependency)
        try:
            redis_url = settings.redis_url
            await redis_client.connect(redis_url)
            # Redact credentials before logging the connection URL.
            logger.info(f"Redis initialized: {_redact_url(redis_url)}")
        except Exception as e:
            logger.warning(
                f"Redis connection failed: {e}. Position rating caching will be unavailable."
            )

        # Start background eviction task
        logger.info("Starting background eviction task")
        _eviction_task = asyncio.create_task(periodic_eviction())

        # Start session expiration task. connection_manager is created at
        # module scope after this function is defined, but before lifespan runs.
        # Note: This task cleans up zombie WebSocket connections
        logger.info("Starting session expiration task")
        _session_expiration_task = asyncio.create_task(
            periodic_session_expiration(connection_manager)
        )

        # Start rate limiter cleanup task
        # Note: This task cleans up stale rate limit buckets to prevent memory leaks
        logger.info("Starting rate limiter cleanup task")
        _rate_limit_cleanup_task = asyncio.create_task(
            rate_limiter.cleanup_stale_buckets()
        )

        yield
    finally:
        # Shutdown
        logger.info("Shutting down Paper Dynasty Game Backend")

        await _stop_background_task(_eviction_task, "background eviction task")
        await _stop_background_task(_session_expiration_task, "session expiration task")
        await _stop_background_task(_rate_limit_cleanup_task, "rate limiter cleanup task")
        await _stop_background_task(_pool_monitoring_task, "pool monitoring task")

        # Disconnect Redis
        if redis_client.is_connected:
            await redis_client.disconnect()
            logger.info("Redis disconnected")
|
|
|
|
|
|
# Initialize FastAPI app; `lifespan` (defined above) drives startup/shutdown.
app = FastAPI(
    lifespan=lifespan,
    title="Paper Dynasty Game Backend",
    description="Real-time baseball game engine for Paper Dynasty leagues",
    version="1.0.0",
)
|
|
|
|
|
|
# Global exception handlers for REST API
@app.exception_handler(GameNotFoundError)
async def game_not_found_handler(request: Request, exc: GameNotFoundError):
    """Translate a GameNotFoundError into a 404 JSON response (logged as a warning)."""
    logger.warning(f"Game not found: {exc.game_id}")
    body = {"detail": str(exc), "error_code": "GAME_NOT_FOUND"}
    return JSONResponse(content=body, status_code=404)
|
|
|
|
|
|
@app.exception_handler(InvalidGameStateError)
async def invalid_game_state_handler(request: Request, exc: InvalidGameStateError):
    """
    Map an InvalidGameStateError to a 400 JSON response.

    The body echoes the error message together with the ``current_state``
    and ``expected_state`` attributes carried on the exception.
    """
    logger.warning(f"Invalid game state: {exc}")
    payload = dict(
        detail=str(exc),
        error_code="INVALID_GAME_STATE",
        current_state=exc.current_state,
        expected_state=exc.expected_state,
    )
    return JSONResponse(content=payload, status_code=400)
|
|
|
|
|
|
@app.exception_handler(DatabaseError)
async def database_error_handler(request: Request, exc: DatabaseError):
    """
    Handle database errors with a generic 500 JSON response.

    The client receives only a fixed message, the error code and the name
    of the failed operation (``exc.operation``). ``exc.original_error`` is
    written to the server log only. The log record includes the traceback
    (``exc_info=exc``) so that 500s can be diagnosed after the fact.
    """
    logger.error(
        f"Database error during {exc.operation}: {exc.original_error}",
        exc_info=exc,
    )
    return JSONResponse(
        status_code=500,
        content={
            "detail": "Database error occurred",
            "error_code": "DATABASE_ERROR",
            "operation": exc.operation,
        },
    )
|
|
|
|
|
|
@app.exception_handler(GameEngineError)
async def game_engine_error_handler(request: Request, exc: GameEngineError):
    """
    Translate a GameEngineError into a 400 JSON response.

    NOTE(review): presumably GameNotFoundError / InvalidGameStateError
    subclass GameEngineError and are routed to their own, more specific
    handlers instead of this one; verify in app.core.exceptions.
    """
    message = str(exc)
    logger.warning(f"Game engine error: {message}")
    return JSONResponse(
        content={"detail": message, "error_code": "GAME_ENGINE_ERROR"},
        status_code=400,
    )
|
|
|
|
|
|
@app.exception_handler(SQLAlchemyError)
async def sqlalchemy_error_handler(request: Request, exc: SQLAlchemyError):
    """
    Handle raw SQLAlchemy errors with a generic 500 JSON response.

    The exception text is logged server-side only. The client gets a fixed
    retry message, so no SQL details leak into the response. The log record
    includes the traceback (``exc_info=exc``); without it, 500s from the
    database layer cannot be traced to their origin.
    """
    logger.error(f"SQLAlchemy error: {exc}", exc_info=exc)
    return JSONResponse(
        status_code=500,
        content={
            "detail": "Database error occurred - please retry",
            "error_code": "DATABASE_ERROR",
        },
    )
|
|
|
|
|
|
# CORS middleware
# Module-level settings instance; connections_health() below also reads it
# at request time.
settings = get_settings()
# NOTE(review): allow_credentials=True is combined with the configured origin
# list. Make sure cors_origins is an explicit allow-list rather than "*".
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.cors_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Initialize Socket.io with ping/pong for connection health.
# Socket.io CORS uses the same origin list as the REST API.
sio = socketio.AsyncServer(
    async_mode="asgi",
    cors_allowed_origins=settings.cors_origins,
    # Ping/pong for detecting dead connections:
    # - Server sends ping every ping_interval seconds
    # - Client must respond with pong within ping_timeout seconds
    # - Total: connection dies after (ping_interval + ping_timeout) of no response
    ping_interval=settings.ws_heartbeat_interval,  # Default: 30s
    ping_timeout=settings.ws_connection_timeout,  # Default: 60s
    logger=True,
    engineio_logger=False,
)

# Create Socket.io ASGI app wrapping the FastAPI `app`. This combined app is
# the ASGI entry point the __main__ block passes to uvicorn
# ("app.main:socket_app").
socket_app = socketio.ASGIApp(sio, app)

# Initialize connection manager and register handlers.
# lifespan() reads this module-level connection_manager at startup to launch
# the session expiration task.
connection_manager = ConnectionManager(sio)
register_handlers(sio, connection_manager)

# Configure game engine with connection manager for real-time event emission.
# NOTE(review): imported here rather than at the top of the file, presumably
# to avoid a circular import or so that connection_manager exists first.
# Confirm before moving it.
from app.core.game_engine import game_engine
game_engine.set_connection_manager(connection_manager)

# Include API routes
app.include_router(health.router, prefix="/api", tags=["health"])
app.include_router(auth.router, prefix="/api/auth", tags=["auth"])
app.include_router(games.router, prefix="/api/games", tags=["games"])
app.include_router(teams.router, prefix="/api/teams", tags=["teams"])
|
|
|
|
|
|
@app.get("/")
async def root():
    """Identify the service by name and version."""
    service_info = {"message": "Paper Dynasty Game Backend", "version": "1.0.0"}
    return service_info
|
|
|
|
|
|
@app.get("/api/health/connections", tags=["health"])
async def connections_health():
    """
    Report WebSocket connection statistics for health monitoring.

    The response merges the connection manager's stats (total_sessions,
    unique_users, active_game_rooms, sessions_per_game,
    inactivity_distribution) with:

    - status: "healthy" for at most 50 sessions, "warning" for 51-100,
      "critical" above 100
    - rate_limiter: the rate limiter's own stats
    - ping_interval_seconds / ping_timeout_seconds: Socket.io heartbeat settings
    - session_expiration_timeout_seconds: 5x the WebSocket connection timeout
    - timestamp: current UTC time as an ISO-8601 string
    """
    import pendulum

    conn_stats = connection_manager.get_stats()
    session_count = conn_stats["total_sessions"]

    # Thresholds can be adjusted based on expected load
    status = (
        "critical" if session_count > 100
        else "warning" if session_count > 50
        else "healthy"
    )

    limiter_stats = rate_limiter.get_stats()

    # NOTE(review): "status" comes before **conn_stats, so a "status" key in
    # get_stats() output would silently override the computed value.
    return {
        "status": status,
        **conn_stats,
        "rate_limiter": limiter_stats,
        "ping_interval_seconds": settings.ws_heartbeat_interval,
        "ping_timeout_seconds": settings.ws_connection_timeout,
        "session_expiration_timeout_seconds": settings.ws_connection_timeout * 5,
        "timestamp": pendulum.now("UTC").to_iso8601_string(),
    }
|
|
|
|
|
|
if __name__ == "__main__":
    import uvicorn

    # Local development server: serve the Socket.io-wrapped ASGI app
    # (socket_app) on all interfaces with auto-reload enabled.
    dev_server_options = {
        "host": "0.0.0.0",
        "port": 8000,
        "reload": True,
        "log_level": "info",
    }
    uvicorn.run("app.main:socket_app", **dev_server_options)
|