claude-home/monitoring/scripts/tdarr_monitor.py

#!/usr/bin/env python3
"""
Tdarr API Monitoring Script with Stuck Job Detection and Discord Alerts

Monitors a Tdarr server via its web API endpoints:
- Server status and health
- Queue status and statistics
- Node status and performance
- Library scan progress
- Worker activity
- Stuck job detection with configurable timeouts
- Discord notifications for alerts and status updates

Usage:
    python3 tdarr_monitor.py --server http://10.10.0.43:8265 --check all
    python3 tdarr_monitor.py --server http://10.10.0.43:8265 --check queue
    python3 tdarr_monitor.py --server http://10.10.0.43:8265 --check nodes

    # Enable stuck job detection (30 minute threshold)
    python3 tdarr_monitor.py --server http://10.10.0.43:8265 --check nodes --detect-stuck

    # Custom stuck threshold (15 minutes)
    python3 tdarr_monitor.py --server http://10.10.0.43:8265 --check all --detect-stuck --stuck-threshold 15

    # Enable Discord alerts for stuck jobs (uses default webhook)
    python3 tdarr_monitor.py --server http://10.10.0.43:8265 --check nodes --detect-stuck --discord-alerts

    # Automatically clear hung workers when detected
    python3 tdarr_monitor.py --server http://10.10.0.43:8265 --check nodes --detect-stuck --clear-hung-workers

    # Full monitoring with automatic clearing and Discord alerts
    python3 tdarr_monitor.py --server http://10.10.0.43:8265 --check nodes --detect-stuck --clear-hung-workers --discord-alerts

    # Test Discord integration (uses default webhook)
    python3 tdarr_monitor.py --server http://10.10.0.43:8265 --discord-test

    # Enable file logging with custom path and debug level
    python3 tdarr_monitor.py --server http://10.10.0.43:8265 --check nodes --log-file /tmp/tdarr_debug.log --log-level DEBUG

    # Disable file logging (console only)
    python3 tdarr_monitor.py --server http://10.10.0.43:8265 --check nodes --no-log-file

    # Clear memory state
    python3 tdarr_monitor.py --server http://10.10.0.43:8265 --clear-memory
"""

import argparse
import json
import logging
import logging.handlers
import os
import pickle
import sys
from dataclasses import asdict, dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional
from urllib.parse import urljoin

import requests


@dataclass
class WorkerSnapshot:
    worker_id: str
    node_id: str
    worker_type: str
    file: str
    percentage: float
    status: str
    fps: int
    eta: str
    timestamp: datetime


@dataclass
class StuckJob:
    worker_snapshot: WorkerSnapshot
    first_seen: datetime
    stuck_duration_minutes: float
    is_stuck: bool = True


@dataclass
class MemoryState:
    worker_snapshots: Dict[str, WorkerSnapshot]
    stuck_jobs: Dict[str, StuckJob]
    last_updated: datetime


@dataclass
class ServerStatus:
    timestamp: str
    server_url: str
    status: str
    error: Optional[str] = None
    version: Optional[str] = None
    server_id: Optional[str] = None
    uptime: Optional[str] = None
    system_info: Optional[Dict[str, Any]] = None


@dataclass
class QueueStats:
    total_files: int
    queued: int
    processing: int
    completed: int
    queue_items: List[Dict[str, Any]]


@dataclass
class QueueStatus:
    timestamp: str
    queue_stats: Optional[QueueStats] = None
    error: Optional[str] = None


@dataclass
class NodeInfo:
    id: Optional[str]
    nodeName: Optional[str]
    status: str
    lastSeen: Optional[int]
    version: Optional[str]
    platform: Optional[str]
    workers: Dict[str, int]
    processing: List[Dict[str, Any]]


@dataclass
class NodeSummary:
    total_nodes: int
    online_nodes: int
    offline_nodes: int
    online_details: List[NodeInfo]
    offline_details: List[NodeInfo]


@dataclass
class NodeStatus:
    timestamp: str
    nodes: List[Dict[str, Any]]
    node_summary: Optional[NodeSummary] = None
    stuck_jobs: Optional[List[StuckJob]] = None
    error: Optional[str] = None


@dataclass
class LibraryInfo:
    name: Optional[str]
    path: Optional[str]
    file_count: int
    scan_progress: int
    last_scan: Optional[str]
    is_scanning: bool


@dataclass
class ScanStatus:
    total_libraries: int
    total_files: int
    scanning_libraries: int


@dataclass
class LibraryStatus:
    timestamp: str
    libraries: List[LibraryInfo]
    scan_status: Optional[ScanStatus] = None
    error: Optional[str] = None


@dataclass
class Statistics:
    total_transcodes: int
    space_saved: int
    total_files_processed: int
    failed_transcodes: int
    processing_speed: int
    eta: Optional[str]


@dataclass
class StatisticsStatus:
    timestamp: str
    statistics: Optional[Statistics] = None
    error: Optional[str] = None


@dataclass
class HealthCheck:
    status: str
    healthy: bool
    online_count: Optional[int] = None
    total_count: Optional[int] = None
    accessible: Optional[bool] = None
    total_items: Optional[int] = None


@dataclass
class HealthStatus:
    timestamp: str
    overall_status: str
    checks: Dict[str, HealthCheck]


@dataclass
class DiscordEmbedField:
    name: str
    value: str
    inline: bool = False


@dataclass
class DiscordEmbed:
    title: str
    description: str
    color: int
    fields: Optional[List[DiscordEmbedField]] = None
    timestamp: Optional[str] = None

    def __post_init__(self):
        if self.fields is None:
            self.fields = []
        if self.timestamp is None:
            self.timestamp = datetime.utcnow().isoformat()
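

# For reference: asdict(DiscordEmbed(...)) yields the JSON shape Discord's
# webhook API expects for an embed. A hedged sketch of one full payload
# (values are illustrative):
#   {
#       "username": "Tdarr Monitor",
#       "embeds": [{
#           "title": "...",
#           "description": "...",
#           "color": 16739179,  # 0xff6b6b as a decimal integer
#           "fields": [{"name": "...", "value": "...", "inline": true}],
#           "timestamp": "2025-01-01T00:00:00"
#       }]
#   }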


class DiscordNotifier:
    def __init__(self, webhook_url: str, timeout: int = 10):
        """Initialize Discord notifier with webhook URL."""
        self.webhook_url = webhook_url
        self.timeout = timeout
        self.session = requests.Session()
        self.logger = logging.getLogger(f"{__name__}.DiscordNotifier")

    def send_content_message(self, content: str, username: str = "Tdarr Monitor") -> bool:
        """Send a simple content message to Discord.

        Args:
            content: The message content to send
            username: Bot username to display

        Returns:
            True if successful, False otherwise
        """
        payload = {
            "content": content,
            "username": username
        }
        return self._send_webhook(payload)

    def send_embed_message(self,
                           title: str,
                           description: str,
                           color: int = 0xff6b6b,  # Red by default
                           fields: Optional[List[DiscordEmbedField]] = None,
                           username: str = "Tdarr Monitor") -> bool:
        """Send an embed message to Discord.

        Args:
            title: Embed title
            description: Embed description
            color: Embed color (hex integer, default red)
            fields: List of embed fields
            username: Bot username to display

        Returns:
            True if successful, False otherwise
        """
        embed = DiscordEmbed(
            title=title,
            description=description,
            color=color,
            fields=fields or []
        )
        payload = {
            "username": username,
            "embeds": [asdict(embed)]
        }
        return self._send_webhook(payload)

    def send_stuck_job_alert(self, stuck_jobs: List[StuckJob]) -> bool:
        """Send alert for stuck jobs using embed format.

        Args:
            stuck_jobs: List of stuck jobs to report

        Returns:
            True if successful, False otherwise
        """
        if not stuck_jobs:
            return True

        # Create embed fields for each stuck job
        fields = []
        for i, stuck_job in enumerate(stuck_jobs[:10]):  # Limit to 10 jobs (Discord embed field limit is 25)
            ws = stuck_job.worker_snapshot
            field_value = (
                f"**File:** {os.path.basename(ws.file)}\n"
                f"**Progress:** {ws.percentage}%\n"
                f"**Status:** {ws.status}\n"
                f"**Duration:** {stuck_job.stuck_duration_minutes:.1f} minutes\n"
                f"**Node:** {ws.node_id}"
            )
            fields.append(DiscordEmbedField(
                name=f"🚨 Stuck Job {i+1}: {ws.worker_id}",
                value=field_value,
                inline=True
            ))

        # Add summary field if there are more jobs
        if len(stuck_jobs) > 10:
            fields.append(DiscordEmbedField(
                name="Additional Jobs",
                value=f"... and {len(stuck_jobs) - 10} more stuck jobs",
                inline=False
            ))

        title = f"🚨 Tdarr Stuck Jobs Detected ({len(stuck_jobs)})"
        description = (
            f"Detected {len(stuck_jobs)} stuck job{'s' if len(stuck_jobs) != 1 else ''} "
            f"in your Tdarr system. These jobs may need manual intervention."
        )
        return self.send_embed_message(
            title=title,
            description=description,
            color=0xff6b6b,  # Red color for alerts
            fields=fields
        )

    def send_system_status(self,
                           server_status: ServerStatus,
                           node_status: NodeStatus,
                           stuck_jobs: Optional[List[StuckJob]] = None) -> bool:
        """Send system status summary using embed format.

        Args:
            server_status: Server status information
            node_status: Node status information
            stuck_jobs: Optional stuck jobs list

        Returns:
            True if successful, False otherwise
        """
        # Determine overall health color
        is_healthy = (
            server_status.status == "good" and
            not server_status.error and
            not node_status.error and
            (not stuck_jobs or len(stuck_jobs) == 0)
        )
        color = 0x28a745 if is_healthy else 0xff6b6b  # Green if healthy, red if not

        # Build description (dataclass attribute always exists, so fall back
        # on None rather than a getattr default that would never fire)
        description_parts = [
            f"**Server Status:** {server_status.status.title()}",
            f"**Version:** {server_status.version or 'Unknown'}"
        ]
        if node_status.node_summary:
            description_parts.extend([
                f"**Total Nodes:** {node_status.node_summary.total_nodes}",
                f"**Online Nodes:** {node_status.node_summary.online_nodes}",
                f"**Offline Nodes:** {node_status.node_summary.offline_nodes}"
            ])
        if stuck_jobs:
            description_parts.append(f"**Stuck Jobs:** {len(stuck_jobs)}")

        # Add node details as fields
        fields = []
        if node_status.node_summary and node_status.node_summary.online_details:
            for node in node_status.node_summary.online_details:
                active_workers = len(node.processing) if node.processing else 0
                field_value = (
                    f"**Status:** Online\n"
                    f"**Platform:** {node.platform or 'Unknown'}\n"
                    f"**Active Workers:** {active_workers}\n"
                    f"**CPU Workers:** {node.workers.get('cpu', 0)}\n"
                    f"**GPU Workers:** {node.workers.get('gpu', 0)}"
                )
                fields.append(DiscordEmbedField(
                    name=f"📡 {node.nodeName or node.id}",
                    value=field_value,
                    inline=True
                ))

        title = "📊 Tdarr System Status"
        if not is_healthy:
            title = "⚠️ Tdarr System Alert"

        return self.send_embed_message(
            title=title,
            description="\n".join(description_parts),
            color=color,
            fields=fields
        )

    def _send_webhook(self, payload: Dict[str, Any]) -> bool:
        """Send payload to Discord webhook.

        Args:
            payload: JSON payload to send

        Returns:
            True if successful, False otherwise
        """
        try:
            response = self.session.post(
                self.webhook_url,
                json=payload,
                timeout=self.timeout
            )
            response.raise_for_status()
            self.logger.info("Discord notification sent successfully")
            return True
        except requests.exceptions.RequestException as e:
            self.logger.error(f"Failed to send Discord notification: {e}")
            return False
        except Exception as e:
            self.logger.error(f"Unexpected error sending Discord notification: {e}")
            return False
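

# Usage sketch for the notifier above (the webhook URL is a placeholder,
# not a real endpoint):
#   notifier = DiscordNotifier("https://discord.com/api/webhooks/<id>/<token>")
#   notifier.send_content_message("Tdarr monitor online")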


class StuckJobDetector:
    def __init__(self, memory_file: str = ".claude/tmp/tdarr_memory.pkl", stuck_threshold_minutes: int = 30):
        """Initialize stuck job detector with memory persistence."""
        self.memory_file = os.path.abspath(memory_file)  # Use absolute path
        self.stuck_threshold_minutes = stuck_threshold_minutes
        self.logger = logging.getLogger(f"{__name__}.StuckJobDetector")
        self.logger.debug(f"Using memory file: {self.memory_file}")
        # Ensure memory directory exists before any load/save
        memory_dir = os.path.dirname(self.memory_file)
        if memory_dir:
            os.makedirs(memory_dir, exist_ok=True)
        self.memory_state = self._load_memory_state()

    def _load_memory_state(self) -> MemoryState:
        """Load memory state from disk or create new one."""
        if os.path.exists(self.memory_file):
            try:
                with open(self.memory_file, 'rb') as f:
                    memory_state = pickle.load(f)
                self.logger.debug(f"Loaded memory state: {len(memory_state.worker_snapshots)} workers, {len(memory_state.stuck_jobs)} stuck jobs")
                return memory_state
            except Exception as e:
                self.logger.warning(f"Failed to load memory state: {e}, creating new state")
        else:
            self.logger.debug(f"Memory file {self.memory_file} does not exist, creating new state")
        return MemoryState(
            worker_snapshots={},
            stuck_jobs={},
            last_updated=datetime.now()
        )

    def _save_memory_state(self):
        """Save memory state to disk."""
        try:
            with open(self.memory_file, 'wb') as f:
                pickle.dump(self.memory_state, f)
        except Exception as e:
            self.logger.error(f"Failed to save memory state: {e}")

    def _create_worker_key(self, node_id: str, worker_id: str) -> str:
        """Create unique key for worker identification."""
        return f"{node_id}:{worker_id}"

    def _is_worker_stuck(self, current: WorkerSnapshot, previous: WorkerSnapshot) -> bool:
        """Check if worker is stuck based on comparison with previous snapshot."""
        worker_key = f"{current.node_id}:{current.worker_id}"

        # Check each condition individually for detailed logging
        file_same = current.file == previous.file
        percentage_same = current.percentage == previous.percentage
        status_same = current.status == previous.status
        fps_same = current.fps == previous.fps
        eta_same = current.eta == previous.eta
        is_stuck = file_same and percentage_same and status_same and fps_same and eta_same

        # Log detailed comparison info
        self.logger.debug(f"Worker {worker_key} stuck check:")
        self.logger.debug(f"  File: '{current.file}' == '{previous.file}' = {file_same}")
        self.logger.debug(f"  Percentage: {current.percentage}% == {previous.percentage}% = {percentage_same}")
        self.logger.debug(f"  Status: '{current.status}' == '{previous.status}' = {status_same}")
        self.logger.debug(f"  FPS: {current.fps} == {previous.fps} = {fps_same}")
        self.logger.debug(f"  ETA: '{current.eta}' == '{previous.eta}' = {eta_same}")
        self.logger.debug(f"  → Result: {'STUCK' if is_stuck else 'NOT STUCK'}")

        # Log INFO level when we detect changes (worker making progress)
        if not is_stuck:
            if not percentage_same:
                self.logger.info(f"Worker {worker_key} making progress: {previous.percentage}% → {current.percentage}%")
            elif not status_same:
                self.logger.info(f"Worker {worker_key} status changed: '{previous.status}' → '{current.status}'")
            elif not file_same:
                self.logger.info(f"Worker {worker_key} file changed: '{previous.file}' → '{current.file}'")
        return is_stuck

    def update_workers(self, nodes_data: Dict[str, Any]) -> List[StuckJob]:
        """Update worker snapshots and detect stuck jobs."""
        current_time = datetime.now()
        current_workers = {}
        detected_stuck_jobs = []

        # Extract current worker states from nodes data
        for node_id, node_data in nodes_data.items():
            workers = node_data.get('workers', {})
            for worker_id, worker_data in workers.items():
                worker_key = self._create_worker_key(node_id, worker_id)

                # Create current snapshot
                current_snapshot = WorkerSnapshot(
                    worker_id=worker_id,
                    node_id=node_id,
                    worker_type=worker_data.get('workerType', 'unknown'),
                    file=worker_data.get('file', ''),
                    percentage=worker_data.get('percentage', -1),
                    status=worker_data.get('status', ''),
                    fps=worker_data.get('fps', 0),
                    eta=worker_data.get('ETA', ''),
                    timestamp=current_time
                )
                current_workers[worker_key] = current_snapshot

                # Log all workers being tracked
                self.logger.debug(f"Tracking worker {worker_key}: {current_snapshot.status} at {current_snapshot.percentage}% on '{current_snapshot.file}'")

                # Check if worker was previously tracked
                if worker_key in self.memory_state.worker_snapshots:
                    previous_snapshot = self.memory_state.worker_snapshots[worker_key]

                    # Check if worker is stuck
                    if self._is_worker_stuck(current_snapshot, previous_snapshot):
                        # Calculate how long it's been stuck
                        time_since_previous = (current_time - previous_snapshot.timestamp).total_seconds() / 60
                        self.logger.debug(f"Worker {worker_key} has been stuck for {time_since_previous:.1f} minutes since last check")
                        self.logger.debug(f"Worker {worker_key} checking stuck_jobs dict: {list(self.memory_state.stuck_jobs.keys())}")

                        if worker_key in self.memory_state.stuck_jobs:
                            # Already known stuck job, update duration
                            stuck_job = self.memory_state.stuck_jobs[worker_key]
                            stuck_duration = current_time - stuck_job.first_seen
                            stuck_job.stuck_duration_minutes = stuck_duration.total_seconds() / 60
                            stuck_job.worker_snapshot = current_snapshot
                            self.logger.debug(f"Worker {worker_key} known stuck job - duration: {stuck_job.stuck_duration_minutes:.1f} min, threshold: {self.stuck_threshold_minutes} min")
                            if stuck_job.stuck_duration_minutes >= self.stuck_threshold_minutes:
                                self.logger.debug(f"Worker {worker_key} EXCEEDS threshold - adding to detected stuck jobs")
                                detected_stuck_jobs.append(stuck_job)
                            else:
                                self.logger.debug(f"Worker {worker_key} below threshold - not flagging yet")
                        else:
                            # New stuck job detected - add to memory immediately to start tracking
                            first_seen = previous_snapshot.timestamp
                            stuck_duration = current_time - first_seen
                            stuck_duration_minutes = stuck_duration.total_seconds() / 60
                            self.logger.debug(f"Worker {worker_key} NEW stuck job - first_seen: {first_seen}, current: {current_time}")
                            self.logger.debug(f"Worker {worker_key} NEW stuck job - duration: {stuck_duration_minutes:.1f} min, threshold: {self.stuck_threshold_minutes} min")

                            # Create stuck job entry immediately to track duration across runs
                            stuck_job = StuckJob(
                                worker_snapshot=current_snapshot,
                                first_seen=first_seen,
                                stuck_duration_minutes=stuck_duration_minutes,
                                is_stuck=True
                            )
                            self.memory_state.stuck_jobs[worker_key] = stuck_job
                            if stuck_duration_minutes >= self.stuck_threshold_minutes:
                                self.logger.debug(f"Worker {worker_key} NEW stuck job EXCEEDS threshold - flagging for clearing")
                                detected_stuck_jobs.append(stuck_job)
                            else:
                                self.logger.debug(f"Worker {worker_key} NEW stuck job below threshold - tracking in memory")
                    else:
                        # Worker is not stuck, remove from stuck jobs if present
                        if worker_key in self.memory_state.stuck_jobs:
                            del self.memory_state.stuck_jobs[worker_key]
                            self.logger.info(f"Worker {worker_key} is no longer stuck")
                else:
                    # New worker, start tracking it
                    self.logger.info(f"New worker detected: {worker_key} - {current_snapshot.status} at {current_snapshot.percentage}% on '{current_snapshot.file}'")

        # Clean up stuck jobs for workers that no longer exist
        stuck_jobs_to_remove = []
        for worker_key in self.memory_state.stuck_jobs:
            if worker_key not in current_workers:
                stuck_jobs_to_remove.append(worker_key)
        for worker_key in stuck_jobs_to_remove:
            del self.memory_state.stuck_jobs[worker_key]
            self.logger.info(f"Removed stuck job tracking for missing worker: {worker_key}")

        # Update memory state
        self.memory_state.worker_snapshots = current_workers
        self.memory_state.last_updated = current_time

        # Save to disk
        self._save_memory_state()
        return detected_stuck_jobs

    def get_stuck_jobs(self) -> List[StuckJob]:
        """Get current list of stuck jobs."""
        return list(self.memory_state.stuck_jobs.values())

    def clear_memory(self):
        """Clear all memory state."""
        self.memory_state = MemoryState(
            worker_snapshots={},
            stuck_jobs={},
            last_updated=datetime.now()
        )
        self._save_memory_state()
        self.logger.info("Memory state cleared")


class TdarrMonitor:
    def __init__(self, server_url: str, timeout: int = 30, enable_stuck_detection: bool = False,
                 stuck_threshold_minutes: int = 30, memory_file: str = ".claude/tmp/tdarr_memory.pkl",
                 discord_webhook_url: Optional[str] = None, enable_discord_alerts: bool = False,
                 log_file: Optional[str] = None, log_level: str = "INFO", clear_hung_workers: bool = False):
        """Initialize Tdarr monitor with server URL."""
        self.server_url = server_url.rstrip('/')
        self.timeout = timeout
        self.session = requests.Session()
        self.enable_stuck_detection = enable_stuck_detection
        self.enable_discord_alerts = enable_discord_alerts
        self.clear_hung_workers_enabled = clear_hung_workers

        # Configure logging first
        self._setup_logging(log_file, log_level)
        self.logger = logging.getLogger(__name__)

        # Initialize stuck job detector if enabled
        self.stuck_detector = None
        if enable_stuck_detection:
            self.stuck_detector = StuckJobDetector(memory_file, stuck_threshold_minutes)

        # Initialize Discord notifier if enabled
        self.discord_notifier = None
        if enable_discord_alerts:
            if discord_webhook_url:
                self.discord_notifier = DiscordNotifier(discord_webhook_url)
            else:
                self.logger.warning("Discord alerts enabled but no webhook URL provided")

    def _setup_logging(self, log_file: Optional[str] = None, log_level: str = "INFO"):
        """Configure logging with optional file rotation."""
        # Clear any existing handlers
        root_logger = logging.getLogger()
        root_logger.handlers.clear()

        # Set log level
        level = getattr(logging, log_level.upper(), logging.INFO)
        root_logger.setLevel(level)

        # Create formatter
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )

        # Console handler (for interactive use)
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(formatter)
        root_logger.addHandler(console_handler)

        # File handler with rotation (if log_file specified)
        if log_file:
            # Ensure log directory exists
            log_dir = os.path.dirname(log_file)
            if log_dir:
                os.makedirs(log_dir, exist_ok=True)

            # Rotating file handler: 10MB max, keep 5 backup files
            file_handler = logging.handlers.RotatingFileHandler(
                log_file,
                maxBytes=10 * 1024 * 1024,  # 10MB
                backupCount=5,
                encoding='utf-8'
            )
            file_handler.setFormatter(formatter)
            root_logger.addHandler(file_handler)

    def _make_request(self, endpoint: str) -> Optional[Dict[str, Any]]:
        """Make HTTP request to Tdarr API endpoint."""
        url = urljoin(self.server_url, endpoint)
        try:
            response = self.session.get(url, timeout=self.timeout)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            self.logger.error(f"Request failed for {url}: {e}")
            return None
        except json.JSONDecodeError as e:
            self.logger.error(f"JSON decode failed for {url}: {e}")
            return None

    def clear_hung_workers(self, stuck_jobs: Optional[List[StuckJob]] = None) -> bool:
        """Clear hung workers via Tdarr API using kill-worker endpoint.

        Args:
            stuck_jobs: List of StuckJob objects to clear. Each contains worker and node information.

        Returns:
            True if all workers cleared successfully, False otherwise
        """
        if not stuck_jobs:
            self.logger.info("No stuck jobs provided for clearing hung workers")
            return True

        success_count = 0
        total_count = len(stuck_jobs)
        for stuck_job in stuck_jobs:
            worker_snapshot = stuck_job.worker_snapshot
            try:
                # Use the kill-worker endpoint with correct payload format
                endpoint = '/api/v2/kill-worker'
                payload = {
                    "data": {
                        "nodeID": worker_snapshot.node_id,
                        "workerID": worker_snapshot.worker_id
                    }
                }
                url = urljoin(self.server_url, endpoint)
                response = self.session.post(url, json=payload, timeout=self.timeout)
                response.raise_for_status()
                self.logger.info(f"Successfully killed hung worker: {worker_snapshot.node_id}:{worker_snapshot.worker_id}")
                success_count += 1
            except requests.exceptions.RequestException as e:
                self.logger.error(f"Failed to kill worker {worker_snapshot.node_id}:{worker_snapshot.worker_id}: {e}")
            except Exception as e:
                self.logger.error(f"Unexpected error killing worker {worker_snapshot.node_id}:{worker_snapshot.worker_id}: {e}")

        self.logger.info(f"Cleared {success_count}/{total_count} hung workers")
        return success_count == total_count
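
    # Equivalent manual call for clearing a single hung worker (node and
    # worker IDs are placeholders):
    #   curl -X POST http://10.10.0.43:8265/api/v2/kill-worker \
    #        -H 'Content-Type: application/json' \
    #        -d '{"data": {"nodeID": "node-abc", "workerID": "worker-1"}}'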

    def get_server_status(self) -> ServerStatus:
        """Get overall server status and configuration."""
        timestamp = datetime.now().isoformat()

        # Try to get server info from API
        data = self._make_request('/api/v2/status')
        if data:
            server_status = data.get('status', 'unknown')
            self.logger.info(f"Server check completed: status={server_status}, version={data.get('version', 'unknown')}")
            return ServerStatus(
                timestamp=timestamp,
                server_url=self.server_url,
                status=server_status,
                version=data.get('version'),
                uptime=data.get('uptime')
            )
        else:
            self.logger.error("Server check failed: Unable to connect to Tdarr server")
            return ServerStatus(
                timestamp=timestamp,
                server_url=self.server_url,
                status='offline',
                error='Unable to connect to Tdarr server'
            )

    def get_queue_status(self) -> QueueStatus:
        """Get transcoding queue status and statistics."""
        timestamp = datetime.now().isoformat()

        # Get queue information
        data = self._make_request('/api/v2/get-queue')
        if data:
            queue_data = data.get('queue', [])

            # Calculate queue statistics
            total_files = len(queue_data)
            queued_files = len([f for f in queue_data if f.get('status') == 'Queued'])
            processing_files = len([f for f in queue_data if f.get('status') == 'Processing'])
            completed_files = len([f for f in queue_data if f.get('status') == 'Completed'])

            queue_stats = QueueStats(
                total_files=total_files,
                queued=queued_files,
                processing=processing_files,
                completed=completed_files,
                queue_items=queue_data[:10]  # First 10 items for details
            )
            return QueueStatus(
                timestamp=timestamp,
                queue_stats=queue_stats
            )
        else:
            return QueueStatus(
                timestamp=timestamp,
                error='Unable to fetch queue data'
            )

    def get_node_status(self) -> NodeStatus:
        """Get status of all connected nodes."""
        timestamp = datetime.now().isoformat()

        # Get nodes information (using correct endpoint)
        data = self._make_request('/api/v2/get-nodes')
        if data:
            # Handle the actual data structure returned by Tdarr API
            nodes_dict = data if isinstance(data, dict) else {}
            nodes = []

            # Process node information
            online_nodes = []
            offline_nodes = []
            for node_id, node_data in nodes_dict.items():
                node_info = NodeInfo(
                    id=node_id,
                    nodeName=node_data.get('nodeName'),
                    status='online',  # Assume online if in response
                    lastSeen=None,
                    version=node_data.get('config', {}).get('version'),
                    platform=node_data.get('config', {}).get('platform_arch_isdocker'),
                    workers={
                        'cpu': len([w for w in node_data.get('workers', {}).values() if 'cpu' in w.get('workerType', '').lower()]),
                        'gpu': len([w for w in node_data.get('workers', {}).values() if 'gpu' in w.get('workerType', '').lower()])
                    },
                    processing=list(node_data.get('workers', {}).values())
                )
                online_nodes.append(node_info)
                nodes.append(node_data)

            # Check for stuck jobs if detection is enabled
            stuck_jobs = []
            if self.stuck_detector:
                try:
                    stuck_jobs = self.stuck_detector.update_workers(nodes_dict)
                    if stuck_jobs:
                        self.logger.warning(f"Detected {len(stuck_jobs)} stuck jobs")
                        for stuck_job in stuck_jobs:
                            self.logger.warning(
                                f"Stuck job: {stuck_job.worker_snapshot.node_id}:{stuck_job.worker_snapshot.worker_id} "
                                f"on file '{stuck_job.worker_snapshot.file}' "
                                f"at {stuck_job.worker_snapshot.percentage}% for {stuck_job.stuck_duration_minutes:.1f} minutes"
                            )

                        # Clear hung workers if enabled
                        if self.clear_hung_workers_enabled:
                            try:
                                clear_success = self.clear_hung_workers(stuck_jobs)
                                if clear_success:
                                    self.logger.info(f"Successfully cleared {len(stuck_jobs)} hung workers")
                                else:
                                    self.logger.warning("Some hung workers could not be cleared")
                            except Exception as e:
                                self.logger.error(f"Error clearing hung workers: {e}")

                        # Send Discord notification for stuck jobs
                        if self.discord_notifier:
                            try:
                                self.discord_notifier.send_stuck_job_alert(stuck_jobs)
                            except Exception as e:
                                self.logger.error(f"Failed to send Discord stuck job alert: {e}")
                except Exception as e:
                    self.logger.error(f"Error in stuck job detection: {e}")

            node_summary = NodeSummary(
                total_nodes=len(nodes),
                online_nodes=len(online_nodes),
                offline_nodes=len(offline_nodes),
                online_details=online_nodes,
                offline_details=offline_nodes
            )

            # Log successful node check with summary
            if stuck_jobs:
                self.logger.info(f"Node check completed: {len(nodes)} nodes online, {len(stuck_jobs)} stuck jobs detected")
            else:
                self.logger.info(f"Node check completed: {len(nodes)} nodes online, no stuck jobs detected")

            return NodeStatus(
                timestamp=timestamp,
                nodes=nodes,
                node_summary=node_summary,
                stuck_jobs=stuck_jobs
            )
        else:
            self.logger.error("Node check failed: Unable to fetch node data")
            return NodeStatus(
                timestamp=timestamp,
                nodes=[],
                error='Unable to fetch node data'
            )

    def get_library_status(self) -> LibraryStatus:
        """Get library scan status and file statistics."""
        timestamp = datetime.now().isoformat()

        # Get library information
        data = self._make_request('/api/v2/get-libraries')
        if data:
            libraries = data.get('libraries', [])
            library_stats = []
            total_files = 0
            for lib in libraries:
                lib_info = LibraryInfo(
                    name=lib.get('name'),
                    path=lib.get('path'),
                    file_count=lib.get('totalFiles', 0),
                    scan_progress=lib.get('scanProgress', 0),
                    last_scan=lib.get('lastScan'),
                    is_scanning=lib.get('isScanning', False)
                )
                library_stats.append(lib_info)
                total_files += lib_info.file_count

            scan_status = ScanStatus(
                total_libraries=len(libraries),
                total_files=total_files,
                scanning_libraries=len([l for l in library_stats if l.is_scanning])
            )
            return LibraryStatus(
                timestamp=timestamp,
                libraries=library_stats,
                scan_status=scan_status
            )
        else:
            return LibraryStatus(
                timestamp=timestamp,
                libraries=[],
                error='Unable to fetch library data'
            )

    def get_statistics(self) -> StatisticsStatus:
        """Get overall Tdarr statistics and health metrics."""
        timestamp = datetime.now().isoformat()

        # Get statistics
        data = self._make_request('/api/v2/get-stats')
        if data:
            stats = data.get('stats', {})
            statistics = Statistics(
                total_transcodes=stats.get('totalTranscodes', 0),
                space_saved=stats.get('spaceSaved', 0),
                total_files_processed=stats.get('totalFilesProcessed', 0),
                failed_transcodes=stats.get('failedTranscodes', 0),
                processing_speed=stats.get('processingSpeed', 0),
                eta=stats.get('eta')
            )
            return StatisticsStatus(
                timestamp=timestamp,
                statistics=statistics
            )
        else:
            return StatisticsStatus(
                timestamp=timestamp,
                error='Unable to fetch statistics'
            )

    def health_check(self) -> HealthStatus:
        """Perform comprehensive health check."""
        timestamp = datetime.now().isoformat()

        # Server connectivity
        server_status = self.get_server_status()
        server_check = HealthCheck(
            status=server_status.status,
            healthy=server_status.status == 'good'
        )

        # Node connectivity
        node_status = self.get_node_status()
        nodes_healthy = (
            node_status.node_summary.online_nodes > 0 if node_status.node_summary else False
        ) and not node_status.error
        nodes_check = HealthCheck(
            status='online' if nodes_healthy else 'offline',
            healthy=nodes_healthy,
            online_count=node_status.node_summary.online_nodes if node_status.node_summary else 0,
            total_count=node_status.node_summary.total_nodes if node_status.node_summary else 0
        )

        # Queue status
        queue_status = self.get_queue_status()
        queue_healthy = not queue_status.error
        queue_check = HealthCheck(
            status='accessible' if queue_healthy else 'error',
            healthy=queue_healthy,
            accessible=queue_healthy,
            total_items=queue_status.queue_stats.total_files if queue_status.queue_stats else 0
        )

        checks = {
            'server': server_check,
            'nodes': nodes_check,
            'queue': queue_check
        }

        # Determine overall health
        all_checks_healthy = all(check.healthy for check in checks.values())
        overall_status = 'healthy' if all_checks_healthy else 'unhealthy'
        return HealthStatus(
            timestamp=timestamp,
            overall_status=overall_status,
            checks=checks
        )
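

# Programmatic usage sketch for the class above (server URL and threshold
# are examples):
#   monitor = TdarrMonitor("http://10.10.0.43:8265",
#                          enable_stuck_detection=True,
#                          stuck_threshold_minutes=15)
#   health = monitor.health_check()
#   if health.overall_status != 'healthy':
#       ...  # escalate via DiscordNotifier, exit non-zero, etc.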


def main():
    parser = argparse.ArgumentParser(description='Monitor Tdarr server via API')
    parser.add_argument('--server', required=True, help='Tdarr server URL (e.g., http://10.10.0.43:8265)')
    parser.add_argument('--check', choices=['all', 'status', 'queue', 'nodes', 'libraries', 'stats', 'health'],
                        default='health', help='Type of check to perform')
    parser.add_argument('--timeout', type=int, default=30, help='Request timeout in seconds')
    parser.add_argument('--output', choices=['json', 'pretty'], default='pretty', help='Output format')
    parser.add_argument('--verbose', action='store_true', help='Enable verbose logging')
    parser.add_argument('--detect-stuck', action='store_true', help='Enable stuck job detection')
    parser.add_argument('--stuck-threshold', type=int, default=30, help='Minutes before job is considered stuck (default: 30)')
    parser.add_argument('--memory-file', default='.claude/tmp/tdarr_memory.pkl', help='Path to memory state file')
    parser.add_argument('--clear-memory', action='store_true', help='Clear memory state and exit')
    parser.add_argument('--discord-webhook',
                        default='https://discord.com/api/webhooks/1404105821549498398/y2Ud1RK9rzFjv58xbypUfQNe3jrL7ZUq1FkQHa4_dfOHm2ylp93z0f4tY0O8Z-vQgKhD',
                        help='Discord webhook URL for notifications (default: configured webhook)')
    parser.add_argument('--discord-alerts', action='store_true', help='Enable Discord alerts for stuck jobs and system status')
    parser.add_argument('--discord-test', action='store_true', help='Send test Discord message and exit')
    parser.add_argument('--log-file', default='./scripts/logs/tdarr_monitor.log',
                        help='Path to log file with rotation (default: ./scripts/logs/tdarr_monitor.log)')
    parser.add_argument('--log-level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], default='INFO',
                        help='Logging level (default: INFO)')
    parser.add_argument('--no-log-file', action='store_true', help='Disable file logging, console only')
    parser.add_argument('--clear-hung-workers', action='store_true', help='Clear hung workers via API call when stuck jobs are detected')
    args = parser.parse_args()

    # --verbose implies DEBUG; set it via log_level so the monitor's own
    # logging setup does not override it later
    if args.verbose:
        args.log_level = 'DEBUG'

    # Handle clear memory command
    if args.clear_memory:
        if os.path.exists(args.memory_file):
            os.remove(args.memory_file)
            print(f"Memory state cleared: {args.memory_file}")
        else:
            print(f"Memory file does not exist: {args.memory_file}")
        sys.exit(0)

    # Handle Discord test command
    if args.discord_test:
        print("Sending Discord test messages...")
        notifier = DiscordNotifier(args.discord_webhook)

        # Test content message
        content_success = notifier.send_content_message(
            "🧪 **Tdarr Monitor Test** - Content message working correctly!"
        )

        # Test embed message
        test_fields = [
            DiscordEmbedField("Test Field 1", "This is a test value", True),
            DiscordEmbedField("Test Field 2", "Another test value", True),
        ]
        embed_success = notifier.send_embed_message(
            title="🧪 Tdarr Monitor Test",
            description="This is a test embed message to verify Discord integration is working correctly.",
            color=0x00ff00,  # Green
            fields=test_fields
        )

        if content_success and embed_success:
            print("✅ Discord test successful! Both content and embed messages sent.")
            sys.exit(0)
        else:
            print("❌ Discord test failed. Check webhook URL and permissions.")
            sys.exit(1)

    # Initialize monitor
    log_file = None if args.no_log_file else args.log_file
    monitor = TdarrMonitor(
        args.server,
        args.timeout,
        enable_stuck_detection=args.detect_stuck,
        stuck_threshold_minutes=args.stuck_threshold,
        memory_file=args.memory_file,
        discord_webhook_url=args.discord_webhook,
        enable_discord_alerts=args.discord_alerts,
        log_file=log_file,
        log_level=args.log_level,
        clear_hung_workers=args.clear_hung_workers
    )

    # Perform requested check
    monitor.logger.info(f"Starting Tdarr monitoring check: {args.check}, stuck_detection={'enabled' if args.detect_stuck else 'disabled'}, clear_workers={'enabled' if args.clear_hung_workers else 'disabled'}")
    result = None
    if args.check == 'all':
        result = {
            'server_status': monitor.get_server_status(),
            'queue_status': monitor.get_queue_status(),
            'node_status': monitor.get_node_status(),
            'library_status': monitor.get_library_status(),
            'statistics': monitor.get_statistics()
        }
    elif args.check == 'status':
        result = monitor.get_server_status()
    elif args.check == 'queue':
        result = monitor.get_queue_status()
    elif args.check == 'nodes':
        result = monitor.get_node_status()
    elif args.check == 'libraries':
        result = monitor.get_library_status()
    elif args.check == 'stats':
        result = monitor.get_statistics()
    elif args.check == 'health':
        result = monitor.health_check()

    # Output results
    if args.output == 'json':
        # Convert dataclasses to dictionaries for JSON serialization;
        # default=str keeps datetime fields (e.g. stuck job snapshots) serializable
        if args.check == 'all':
            json_result = {}
            for key, value in result.items():
                json_result[key] = asdict(value)
            print(json.dumps(json_result, indent=2, default=str))
        else:
            print(json.dumps(asdict(result), indent=2, default=str))
    else:
        # Pretty print format
        print(f"=== Tdarr Monitor Results - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ===")
        if args.check == 'health' or (hasattr(result, 'overall_status') and result.overall_status):
            health = result if hasattr(result, 'overall_status') else None
            if health:
                status = health.overall_status
                print(f"Overall Status: {status.upper()}")
                if health.checks:
                    print("\nHealth Checks:")
                    for check_name, check_data in health.checks.items():
                        status_icon = "✅" if check_data.healthy else "❌"
                        print(f"  {status_icon} {check_name.title()}: {asdict(check_data)}")

        # Display stuck jobs if present
        if args.detect_stuck:
            if hasattr(result, 'stuck_jobs') and result.stuck_jobs:
                print(f"\n=== STUCK JOBS DETECTED ({len(result.stuck_jobs)}) ===")
                for stuck_job in result.stuck_jobs:
                    print(f"🚨 {stuck_job.worker_snapshot.node_id}:{stuck_job.worker_snapshot.worker_id}")
                    print(f"   File: {stuck_job.worker_snapshot.file}")
                    print(f"   Progress: {stuck_job.worker_snapshot.percentage}%")
                    print(f"   Status: {stuck_job.worker_snapshot.status}")
                    print(f"   Stuck for: {stuck_job.stuck_duration_minutes:.1f} minutes")
                    print()
            elif args.check in ['nodes', 'all']:
                # Check all results for stuck jobs if 'all' is selected
                stuck_found = False
                if args.check == 'all' and isinstance(result, dict):
                    for section, data in result.items():
                        if hasattr(data, 'stuck_jobs') and data.stuck_jobs:
                            if not stuck_found:
                                print("\n=== STUCK JOBS DETECTED ===")
                                stuck_found = True
                            for stuck_job in data.stuck_jobs:
                                print(f"🚨 {stuck_job.worker_snapshot.node_id}:{stuck_job.worker_snapshot.worker_id}")
                                print(f"   File: {stuck_job.worker_snapshot.file}")
                                print(f"   Progress: {stuck_job.worker_snapshot.percentage}%")
                                print(f"   Status: {stuck_job.worker_snapshot.status}")
                                print(f"   Stuck for: {stuck_job.stuck_duration_minutes:.1f} minutes")
                                print()
                if not stuck_found:
                    print(f"\n✅ No stuck jobs detected (threshold: {args.stuck_threshold} minutes)")

        if args.check == 'all':
            for section, data in result.items():
                print(f"\n=== {section.replace('_', ' ').title()} ===")
                # Don't print stuck_jobs in JSON format as we already displayed them above
                if hasattr(data, 'stuck_jobs'):
                    data_dict = asdict(data)
                    data_dict.pop('stuck_jobs', None)
                    print(json.dumps(data_dict, indent=2, default=str))
                else:
                    print(json.dumps(asdict(data), indent=2, default=str))
        elif args.check != 'health':
            # Don't print stuck_jobs in JSON format as we already displayed them above
            if hasattr(result, 'stuck_jobs'):
                result_dict = asdict(result)
                result_dict.pop('stuck_jobs', None)
                print(json.dumps(result_dict, indent=2, default=str))
            else:
                print(json.dumps(asdict(result), indent=2, default=str))

    # Exit with appropriate code
    if result:
        # Check for unhealthy status in health check
        if isinstance(result, HealthStatus) and result.overall_status == 'unhealthy':
            sys.exit(1)
        # Check for errors in individual status objects (all status classes except HealthStatus have an error attribute)
        elif (isinstance(result, (ServerStatus, QueueStatus, NodeStatus, LibraryStatus, StatisticsStatus))
              and result.error):
            sys.exit(1)
        # Check for errors in 'all' results
        elif isinstance(result, dict):
            for status_obj in result.values():
                if (isinstance(status_obj, (ServerStatus, QueueStatus, NodeStatus, LibraryStatus, StatisticsStatus))
                        and status_obj.error):
                    sys.exit(1)
    sys.exit(0)


if __name__ == '__main__':
    main()