"""Redis connection utilities for Mantimon TCG. This module provides async Redis connection management with: - Connection pooling - JSON serialization helpers - Proper cleanup on shutdown Usage: from app.db.redis import get_redis, init_redis, close_redis # Initialize on startup await init_redis() # Use Redis async with get_redis() as redis: await redis.set("key", "value") value = await redis.get("key") # Close on shutdown await close_redis() Key Patterns: - game:{game_id} - Active game state (JSON) - session:{session_id} - User session data - matchmaking:queue - Matchmaking queue """ import json from collections.abc import AsyncGenerator from contextlib import asynccontextmanager from typing import Any from redis.asyncio import ConnectionPool, Redis from app.config import settings # Global Redis pool (initialized by init_redis) _pool: ConnectionPool | None = None async def init_redis() -> ConnectionPool: """Initialize the Redis connection pool. Creates a connection pool configured based on application settings. Should be called once during application startup. Returns: The initialized ConnectionPool instance. Example: @app.on_event("startup") async def startup(): await init_redis() """ global _pool if _pool is not None: return _pool _pool = ConnectionPool.from_url( str(settings.redis_url), max_connections=settings.redis_max_connections, decode_responses=True, # Return strings instead of bytes ) return _pool async def close_redis() -> None: """Close Redis connections and dispose of the pool. Should be called during application shutdown. Example: @app.on_event("shutdown") async def shutdown(): await close_redis() """ global _pool if _pool is not None: await _pool.disconnect() _pool = None def get_pool() -> ConnectionPool: """Get the Redis connection pool. Returns: The initialized ConnectionPool instance. Raises: RuntimeError: If init_redis() has not been called. """ if _pool is None: raise RuntimeError("Redis pool not initialized. Call init_redis() first.") return _pool @asynccontextmanager async def get_redis() -> AsyncGenerator[Redis, None]: """Get a Redis client from the connection pool. Provides a Redis client as an async context manager that automatically returns the connection to the pool. Yields: A Redis client instance. Raises: RuntimeError: If init_redis() has not been called. Example: async with get_redis() as redis: await redis.set("key", "value") value = await redis.get("key") """ pool = get_pool() client = Redis(connection_pool=pool) try: yield client finally: await client.aclose() class RedisHelper: """Helper class for common Redis operations with JSON support. Provides convenience methods for storing and retrieving JSON data, with proper serialization and error handling. Example: helper = RedisHelper() # Store JSON await helper.set_json("game:123", {"state": "active"}) # Retrieve JSON data = await helper.get_json("game:123") # Delete await helper.delete("game:123") """ async def get_json(self, key: str) -> dict[str, Any] | None: """Get and deserialize a JSON value from Redis. Args: key: Redis key to retrieve. Returns: Deserialized JSON data, or None if key doesn't exist. """ async with get_redis() as client: value = await client.get(key) if value is None: return None return json.loads(value) async def set_json( self, key: str, value: dict[str, Any], expire_seconds: int | None = None, ) -> None: """Serialize and store a JSON value in Redis. Args: key: Redis key to store. value: Dictionary to serialize and store. expire_seconds: Optional TTL in seconds. """ async with get_redis() as client: json_str = json.dumps(value, default=str) if expire_seconds: await client.setex(key, expire_seconds, json_str) else: await client.set(key, json_str) async def delete(self, key: str) -> bool: """Delete a key from Redis. Args: key: Redis key to delete. Returns: True if the key was deleted, False if it didn't exist. """ async with get_redis() as client: result = await client.delete(key) return result > 0 async def exists(self, key: str) -> bool: """Check if a key exists in Redis. Args: key: Redis key to check. Returns: True if the key exists, False otherwise. """ async with get_redis() as client: result = await client.exists(key) return result > 0 async def get_keys(self, pattern: str) -> list[str]: """Get all keys matching a pattern. Args: pattern: Redis key pattern (e.g., "game:*"). Returns: List of matching keys. Note: Use sparingly in production as KEYS can be slow on large datasets. Consider SCAN for production use. """ async with get_redis() as client: keys = await client.keys(pattern) return list(keys) async def set_with_ttl( self, key: str, value: str, ttl_seconds: int, ) -> None: """Set a string value with TTL. Args: key: Redis key to store. value: String value to store. ttl_seconds: Time-to-live in seconds. """ async with get_redis() as client: await client.setex(key, ttl_seconds, value) async def increment(self, key: str, amount: int = 1) -> int: """Increment a counter. Args: key: Redis key for the counter. amount: Amount to increment by (default 1). Returns: The new counter value. """ async with get_redis() as client: return await client.incrby(key, amount) # Global helper instance redis_helper = RedisHelper()