voice-server/app/audio_player.py
Cal Corum a34aec06f1 Initial commit: Voice server with Piper TTS
A local HTTP service that accepts text via POST and speaks it through
system speakers using Piper TTS neural voice synthesis.

Features:
- POST /notify - Queue text for TTS playback
- GET /health - Health check with TTS/audio/queue status
- GET /voices - List installed voice models
- Async queue processing (no overlapping audio)
- Non-blocking audio via sounddevice
- 73 tests covering API contract

Tech stack:
- FastAPI + Uvicorn
- Piper TTS (neural voices, offline)
- sounddevice (PortAudio)
- Pydantic for validation

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-19 00:18:12 -06:00

193 lines
5.9 KiB
Python

"""
Audio playback module for voice-server.
Provides non-blocking audio playback using sounddevice.
"""
import asyncio
import logging
import time
from typing import Protocol
import numpy as np
logger = logging.getLogger(__name__)
class AudioPlayer(Protocol):
    """Structural interface that an audio playback backend must satisfy.

    Any object exposing these four methods with compatible signatures can be
    used wherever an ``AudioPlayer`` is expected; no inheritance is required.
    """

    def play(self, audio_data: np.ndarray, sample_rate: int) -> None:
        """Begin playing ``audio_data`` at ``sample_rate`` Hz without blocking."""
        ...

    def is_playing(self) -> bool:
        """Report whether playback is in progress right now."""
        ...

    def stop(self) -> None:
        """Halt whatever is currently being played."""
        ...

    async def wait_async(self) -> None:
        """Suspend the calling coroutine until playback has finished."""
        ...
class SounddevicePlayer:
"""
Audio player implementation using sounddevice.
Provides non-blocking playback with async wait support.
"""
def __init__(self, default_sample_rate: int = 22050, retry_attempts: int = 3):
"""
Initialize the audio player.
Args:
default_sample_rate: Default sample rate if not specified in play()
retry_attempts: Number of retry attempts on playback failure
"""
self.default_sample_rate = default_sample_rate
self.retry_attempts = retry_attempts
self._initialized = False
# Lazy import sounddevice to defer PortAudio initialization
self._sd = None
def _ensure_initialized(self):
"""Ensure sounddevice is imported and initialized."""
if self._sd is None:
try:
import sounddevice as sd
self._sd = sd
self._initialized = True
logger.info("SounddevicePlayer initialized successfully")
except OSError as e:
logger.error(f"Failed to initialize sounddevice: {e}")
raise RuntimeError(f"Audio system unavailable: {e}") from e
def play(self, audio_data: np.ndarray, sample_rate: int | None = None) -> None:
"""
Play audio data (non-blocking).
The audio plays in a background thread. Use is_playing() to check status
or wait_async() to wait for completion.
Args:
audio_data: NumPy array of audio samples (float32 or int16)
sample_rate: Sample rate in Hz (uses default if not specified)
"""
self._ensure_initialized()
if len(audio_data) == 0:
logger.debug("Skipping playback of empty audio")
return
rate = sample_rate or self.default_sample_rate
# Stop any currently playing audio
self.stop()
for attempt in range(self.retry_attempts):
try:
# Play audio - returns immediately, audio plays in background
self._sd.play(audio_data, rate)
logger.debug(f"Started playback: {len(audio_data)} samples at {rate}Hz")
return
except self._sd.PortAudioError as e:
logger.warning(f"Playback attempt {attempt + 1} failed: {e}")
if attempt < self.retry_attempts - 1:
time.sleep(0.5)
else:
raise RuntimeError(f"Audio playback failed after {self.retry_attempts} attempts: {e}")
def is_playing(self) -> bool:
"""Check if audio is currently playing."""
if self._sd is None:
return False
try:
stream = self._sd.get_stream()
return stream is not None and stream.active
except Exception:
return False
def stop(self) -> None:
"""Stop current playback."""
if self._sd is not None:
try:
self._sd.stop()
except Exception as e:
logger.warning(f"Error stopping playback: {e}")
def wait(self) -> None:
"""Block until current playback completes."""
if self._sd is not None:
try:
self._sd.wait()
except Exception as e:
logger.warning(f"Error waiting for playback: {e}")
async def wait_async(self, poll_interval: float = 0.05) -> None:
"""
Wait asynchronously for playback to complete.
Uses polling to avoid blocking the event loop.
Args:
poll_interval: How often to check playback status (seconds)
"""
while self.is_playing():
await asyncio.sleep(poll_interval)
def get_diagnostics(self) -> dict:
"""
Get audio system diagnostics for health checks.
Returns:
Dictionary with audio device information and status
"""
try:
self._ensure_initialized()
devices = self._sd.query_devices()
output_devices = [d for d in devices if d["max_output_channels"] > 0]
if not output_devices:
return {
"status": "unavailable",
"error": "No audio output devices found",
}
default_output = self._sd.query_devices(kind="output")
return {
"status": "available",
"device_count": len(output_devices),
"default_output": default_output["name"],
"default_sample_rate": int(default_output["default_samplerate"]),
}
except Exception as e:
return {
"status": "unavailable",
"error": str(e),
}
def health_check(self) -> dict:
"""
Perform a health check on the audio system.
Returns:
Dictionary with status and any error messages
"""
diagnostics = self.get_diagnostics()
if diagnostics["status"] == "available":
return {"status": "healthy", "details": diagnostics}
else:
return {"status": "unhealthy", "error": diagnostics.get("error", "Unknown error")}