voice-server/app/tts_engine.py
Cal Corum a34aec06f1 Initial commit: Voice server with Piper TTS
A local HTTP service that accepts text via POST and speaks it through
system speakers using Piper TTS neural voice synthesis.

Features:
- POST /notify - Queue text for TTS playback
- GET /health - Health check with TTS/audio/queue status
- GET /voices - List installed voice models
- Async queue processing (no overlapping audio)
- Non-blocking audio via sounddevice
- 73 tests covering API contract

Tech stack:
- FastAPI + Uvicorn
- Piper TTS (neural voices, offline)
- sounddevice (PortAudio)
- Pydantic for validation

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-19 00:18:12 -06:00

288 lines
9.3 KiB
Python

"""
TTS Engine module for voice-server.
Provides text-to-speech synthesis using Piper TTS.
Supports multiple voice models with lazy loading and caching.
"""
import logging
from pathlib import Path
from typing import Protocol
import numpy as np
logger = logging.getLogger(__name__)
class TTSEngine(Protocol):
"""Protocol defining the TTS engine interface."""
def synthesize(self, text: str, voice: str | None = None) -> np.ndarray:
"""Convert text to audio samples."""
...
def get_sample_rate(self) -> int:
"""Get the audio sample rate."""
...
def list_voices(self) -> list[dict]:
"""List available voice models."""
...
class PiperTTSEngine:
"""
Piper TTS engine implementation.
Provides high-quality neural text-to-speech using Piper's ONNX models.
Voice models are loaded lazily and cached for performance.
"""
def __init__(self, model_dir: str = "./models", default_voice: str = "en_US-lessac-medium"):
"""
Initialize the Piper TTS engine.
Args:
model_dir: Directory containing voice model files (.onnx + .onnx.json)
default_voice: Default voice model name to use
"""
self.model_dir = Path(model_dir)
self.default_voice = default_voice
self._voices: dict = {} # Cache of loaded PiperVoice instances
self._voice_metadata: dict = {} # Cache of voice metadata
self._sample_rate: int = 22050 # Piper default sample rate
# Ensure model directory exists
self.model_dir.mkdir(parents=True, exist_ok=True)
logger.info(f"PiperTTSEngine initialized with model_dir={model_dir}")
def _get_voice_path(self, voice_name: str) -> tuple[Path, Path]:
"""
Get paths to voice model files.
Args:
voice_name: Name of the voice model
Returns:
Tuple of (onnx_path, json_path)
"""
onnx_path = self.model_dir / f"{voice_name}.onnx"
json_path = self.model_dir / f"{voice_name}.onnx.json"
return onnx_path, json_path
def _load_voice(self, voice_name: str):
"""
Load a voice model (lazy loading with caching).
Args:
voice_name: Name of the voice model to load
Returns:
Loaded PiperVoice instance
Raises:
FileNotFoundError: If voice model files don't exist
RuntimeError: If voice model fails to load
"""
if voice_name in self._voices:
return self._voices[voice_name]
onnx_path, json_path = self._get_voice_path(voice_name)
if not onnx_path.exists():
raise FileNotFoundError(
f"Voice model not found: {voice_name}. "
f"Expected file: {onnx_path}"
)
try:
from piper import PiperVoice
logger.info(f"Loading voice model: {voice_name}")
voice = PiperVoice.load(str(onnx_path), config_path=str(json_path) if json_path.exists() else None)
self._voices[voice_name] = voice
# Update sample rate from loaded voice
if hasattr(voice, 'config') and voice.config:
self._sample_rate = voice.config.sample_rate
logger.info(f"Voice model loaded: {voice_name} (sample_rate={self._sample_rate})")
return voice
except Exception as e:
logger.error(f"Failed to load voice model {voice_name}: {e}")
raise RuntimeError(f"Failed to load voice model: {e}") from e
def synthesize(self, text: str, voice: str | None = None) -> np.ndarray:
"""
Convert text to audio samples.
Args:
text: Text to convert to speech
voice: Voice model name (uses default if None)
Returns:
NumPy array of audio samples (int16)
Raises:
FileNotFoundError: If voice model not found
RuntimeError: If synthesis fails
"""
voice_name = voice or self.default_voice
if not text or not text.strip():
# Return empty audio for empty text
return np.array([], dtype=np.int16)
try:
piper_voice = self._load_voice(voice_name)
# Synthesize audio - piper returns an iterator of AudioChunk objects
audio_chunks = []
for chunk in piper_voice.synthesize(text):
# Each chunk has audio_int16_array property
audio_chunks.append(chunk.audio_int16_array)
if not audio_chunks:
return np.array([], dtype=np.int16)
# Concatenate all chunks
audio_array = np.concatenate(audio_chunks)
logger.debug(f"Synthesized {len(text)} chars -> {len(audio_array)} samples")
return audio_array
except FileNotFoundError:
raise
except Exception as e:
logger.error(f"TTS synthesis failed: {e}")
raise RuntimeError(f"TTS synthesis failed: {e}") from e
def synthesize_to_float32(self, text: str, voice: str | None = None) -> np.ndarray:
"""
Convert text to float32 audio samples (normalized -1.0 to 1.0).
This format is preferred by sounddevice for playback.
Args:
text: Text to convert to speech
voice: Voice model name (uses default if None)
Returns:
NumPy array of float32 audio samples
"""
int16_audio = self.synthesize(text, voice)
if len(int16_audio) == 0:
return np.array([], dtype=np.float32)
# Convert int16 to float32 normalized
float32_audio = int16_audio.astype(np.float32) / 32768.0
return float32_audio
def get_sample_rate(self) -> int:
"""Get the audio sample rate for the current voice."""
return self._sample_rate
def list_voices(self) -> list[dict]:
"""
List available voice models in the model directory.
Returns:
List of voice info dictionaries with name, language, quality, etc.
"""
voices = []
if not self.model_dir.exists():
return voices
# Find all .onnx files
for onnx_file in self.model_dir.glob("*.onnx"):
voice_name = onnx_file.stem
json_file = onnx_file.with_suffix(".onnx.json")
voice_info = {
"name": voice_name,
"language": self._extract_language(voice_name),
"quality": self._extract_quality(voice_name),
"size_mb": round(onnx_file.stat().st_size / (1024 * 1024), 1),
"installed": True,
}
# Try to load additional metadata from JSON config
if json_file.exists():
try:
import json
with open(json_file) as f:
config = json.load(f)
if "language" in config:
voice_info["language"] = config["language"].get("code", voice_info["language"])
except Exception:
pass # Use extracted values if JSON parsing fails
voices.append(voice_info)
return sorted(voices, key=lambda v: v["name"])
def _extract_language(self, voice_name: str) -> str:
"""Extract language code from voice name (e.g., 'en_US' from 'en_US-lessac-medium')."""
parts = voice_name.split("-")
if parts:
return parts[0]
return "unknown"
def _extract_quality(self, voice_name: str) -> str:
"""Extract quality level from voice name (e.g., 'medium' from 'en_US-lessac-medium')."""
parts = voice_name.split("-")
if len(parts) >= 3:
quality = parts[-1].lower()
if quality in ("low", "medium", "high", "x_low", "x_high"):
return quality
return "medium"
def is_voice_available(self, voice_name: str) -> bool:
"""Check if a voice model is installed."""
onnx_path, _ = self._get_voice_path(voice_name)
return onnx_path.exists()
def health_check(self) -> dict:
"""
Perform a health check on the TTS engine.
Returns:
Dict with status and any error messages
"""
try:
# Check if piper is importable
from piper import PiperVoice # noqa: F401
# Check if model directory exists
if not self.model_dir.exists():
return {
"status": "degraded",
"error": f"Model directory does not exist: {self.model_dir}",
}
# Check if default voice is available
if not self.is_voice_available(self.default_voice):
available = [v["name"] for v in self.list_voices()]
return {
"status": "degraded",
"error": f"Default voice not found: {self.default_voice}",
"available_voices": available,
}
return {"status": "healthy"}
except ImportError as e:
return {
"status": "unhealthy",
"error": f"Piper TTS not installed: {e}",
}
except Exception as e:
return {
"status": "unhealthy",
"error": str(e),
}