""" TTS Queue Manager for voice-server. Manages an async queue of TTS requests and processes them sequentially. """ import asyncio import logging import time from dataclasses import dataclass, field from typing import Any import numpy as np logger = logging.getLogger(__name__) class QueueFullError(Exception): """Raised when the TTS queue is full.""" pass @dataclass class TTSRequest: """A TTS request in the queue.""" message: str voice: str rate: int voice_enabled: bool urgent: bool = False timestamp: float = field(default_factory=time.time) request_id: str | None = None @dataclass class QueueStats: """Statistics about queue processing.""" processed: int = 0 errors: int = 0 total_audio_seconds: float = 0.0 class TTSQueueManager: """ Manages the TTS request queue and processes requests sequentially. Ensures audio doesn't overlap by processing one request at a time. """ def __init__( self, tts_engine: Any, audio_player: Any, max_size: int = 50, request_timeout: float = 60.0, ): """ Initialize the queue manager. Args: tts_engine: TTS engine instance for synthesis audio_player: Audio player instance for playback max_size: Maximum queue size request_timeout: Timeout for processing each request (seconds) """ self.tts_engine = tts_engine self.audio_player = audio_player self.max_size = max_size self.request_timeout = request_timeout self._queue: asyncio.Queue[TTSRequest] = asyncio.Queue(maxsize=max_size) self._stats = QueueStats() self._running = False self._processor_task: asyncio.Task | None = None async def start(self) -> None: """Start the queue processor background task.""" if self._running: return self._running = True self._processor_task = asyncio.create_task(self._process_queue()) logger.info("TTS queue processor started") async def stop(self) -> None: """Stop the queue processor and wait for current item to complete.""" self._running = False if self._processor_task: # Cancel the task self._processor_task.cancel() try: await self._processor_task except asyncio.CancelledError: pass self._processor_task = None # Stop any playing audio self.audio_player.stop() logger.info("TTS queue processor stopped") async def enqueue(self, request: TTSRequest) -> int: """ Add a TTS request to the queue. Args: request: The TTS request to queue Returns: Queue position (1-indexed) Raises: QueueFullError: If the queue is full """ try: # Use a short timeout to avoid blocking await asyncio.wait_for( self._queue.put(request), timeout=1.0, ) position = self._queue.qsize() logger.debug(f"Enqueued request: {request.message[:50]}... (position={position})") return position except asyncio.TimeoutError: raise QueueFullError(f"TTS queue is full (max_size={self.max_size})") async def _process_queue(self) -> None: """Background task that processes queued requests.""" while self._running: try: # Wait for a request (with timeout to allow checking _running) try: request = await asyncio.wait_for( self._queue.get(), timeout=1.0, ) except asyncio.TimeoutError: continue await self._process_request(request) self._queue.task_done() except asyncio.CancelledError: raise except Exception as e: logger.error(f"Error in queue processor: {e}") self._stats.errors += 1 async def _process_request(self, request: TTSRequest) -> None: """ Process a single TTS request. Args: request: The TTS request to process """ start_time = time.time() try: logger.debug(f"Processing TTS request: {request.message[:50]}...") if not request.voice_enabled: logger.debug("Voice disabled, skipping TTS") self._stats.processed += 1 return # Synthesize audio (run in thread pool to avoid blocking) loop = asyncio.get_event_loop() audio_data = await asyncio.wait_for( loop.run_in_executor( None, self.tts_engine.synthesize_to_float32, request.message, request.voice, ), timeout=self.request_timeout, ) if len(audio_data) == 0: logger.warning("TTS generated empty audio") self._stats.processed += 1 return # Apply volume boost for urgent messages if request.urgent: audio_data = np.clip(audio_data * 1.5, -1.0, 1.0).astype(np.float32) logger.debug("Applied urgent volume boost (1.5x)") # Play audio self.audio_player.play(audio_data, self.tts_engine.get_sample_rate()) # Wait for playback to complete await self.audio_player.wait_async() # Update stats duration = len(audio_data) / self.tts_engine.get_sample_rate() self._stats.processed += 1 self._stats.total_audio_seconds += duration elapsed = time.time() - start_time logger.debug(f"Request processed: {duration:.2f}s audio in {elapsed:.2f}s") except asyncio.TimeoutError: logger.error(f"Request timed out after {self.request_timeout}s") self._stats.errors += 1 except Exception as e: logger.error(f"Error processing request: {e}") self._stats.errors += 1 @property def size(self) -> int: """Get current queue size.""" return self._queue.qsize() @property def capacity(self) -> int: """Get queue capacity.""" return self.max_size @property def utilization(self) -> float: """Get queue utilization percentage.""" if self.max_size == 0: return 0.0 return (self.size / self.max_size) * 100.0 @property def stats(self) -> QueueStats: """Get queue statistics.""" return self._stats def get_status(self) -> dict: """Get queue status for health checks.""" return { "size": self.size, "capacity": self.capacity, "utilization": round(self.utilization, 1), "processed": self._stats.processed, "errors": self._stats.errors, "total_audio_seconds": round(self._stats.total_audio_seconds, 1), "running": self._running, }