#!/usr/bin/env python3
"""
Tdarr API Monitoring Script with Stuck Job Detection and Discord Alerts

Monitors Tdarr server via its web API endpoints:
- Server status and health
- Queue status and statistics
- Node status and performance
- Library scan progress
- Worker activity
- Stuck job detection with configurable timeouts
- Discord notifications for alerts and status updates

Usage:
    python3 tdarr_monitor.py --server http://10.10.0.43:8265 --check all
    python3 tdarr_monitor.py --server http://10.10.0.43:8265 --check queue
    python3 tdarr_monitor.py --server http://10.10.0.43:8265 --check nodes

    # Enable stuck job detection (30 minute threshold)
    python3 tdarr_monitor.py --server http://10.10.0.43:8265 --check nodes --detect-stuck

    # Custom stuck threshold (15 minutes)
    python3 tdarr_monitor.py --server http://10.10.0.43:8265 --check all --detect-stuck --stuck-threshold 15

    # Enable Discord alerts for stuck jobs (uses default webhook)
    python3 tdarr_monitor.py --server http://10.10.0.43:8265 --check nodes --detect-stuck --discord-alerts

    # Automatically clear hung workers when detected
    python3 tdarr_monitor.py --server http://10.10.0.43:8265 --check nodes --detect-stuck --clear-hung-workers

    # Full monitoring with automatic clearing and Discord alerts
    python3 tdarr_monitor.py --server http://10.10.0.43:8265 --check nodes --detect-stuck --clear-hung-workers --discord-alerts

    # Test Discord integration (uses default webhook)
    python3 tdarr_monitor.py --server http://10.10.0.43:8265 --discord-test

    # Enable file logging with custom path and debug level
    python3 tdarr_monitor.py --server http://10.10.0.43:8265 --check nodes --log-file /tmp/tdarr_debug.log --log-level DEBUG

    # Disable file logging (console only)
    python3 tdarr_monitor.py --server http://10.10.0.43:8265 --check nodes --no-log-file

    # Clear memory state
    python3 tdarr_monitor.py --server http://10.10.0.43:8265 --clear-memory
"""

import argparse
import json
import logging
import logging.handlers
import os
import pickle
import sys
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Union
from urllib.parse import urljoin

import requests


@dataclass
class WorkerSnapshot:
    """Point-in-time state of a single Tdarr worker.

    Snapshots from consecutive monitoring runs are compared field-by-field
    to decide whether a worker has made any progress.
    """
    worker_id: str
    node_id: str
    worker_type: str
    file: str            # path of the file currently being processed
    percentage: float    # transcode progress; -1 when not reported
    status: str
    fps: int
    eta: str
    timestamp: datetime  # wall-clock time the snapshot was taken


@dataclass
class StuckJob:
    """A worker whose snapshot has not changed between monitoring runs."""
    worker_snapshot: WorkerSnapshot
    first_seen: datetime            # timestamp of the first unchanged snapshot
    stuck_duration_minutes: float   # minutes since first_seen, updated each run
    is_stuck: bool = True


@dataclass
class MemoryState:
    """Detector state pickled to disk so stuck durations persist across runs."""
    worker_snapshots: Dict[str, WorkerSnapshot]  # keyed by "node_id:worker_id"
    stuck_jobs: Dict[str, StuckJob]              # keyed by "node_id:worker_id"
    last_updated: datetime


@dataclass
class ServerStatus:
    """Result of a server status check against /api/v2/status."""
    timestamp: str
    server_url: str
    status: str
    error: Optional[str] = None
    version: Optional[str] = None
    server_id: Optional[str] = None
    uptime: Optional[str] = None
    system_info: Optional[Dict[str, Any]] = None


@dataclass
class QueueStats:
    """Aggregated counts derived from the transcode queue."""
    total_files: int
    queued: int
    processing: int
    completed: int
    queue_items: List[Dict[str, Any]]  # raw queue entries (first few, for detail display)


@dataclass
class QueueStatus:
    """Result wrapper for a queue check."""
    timestamp: str
    queue_stats: Optional[QueueStats] = None
    error: Optional[str] = None


@dataclass
class NodeInfo:
    """Normalized view of one Tdarr node as reported by /api/v2/get-nodes."""
    id: Optional[str]
    nodeName: Optional[str]
    status: str
    lastSeen: Optional[int]
    version: Optional[str]
    platform: Optional[str]
    workers: Dict[str, int]           # counts keyed by worker class, e.g. 'cpu'/'gpu'
    processing: List[Dict[str, Any]]  # raw worker entries currently active on the node


@dataclass
class NodeSummary:
    """Counts plus per-node details for online/offline nodes."""
    total_nodes: int
    online_nodes: int
    offline_nodes: int
    online_details: List[NodeInfo]
    offline_details: List[NodeInfo]


@dataclass
class NodeStatus:
    """Result wrapper for a node check, optionally including stuck jobs."""
    timestamp: str
    nodes: List[Dict[str, Any]]
    node_summary: Optional[NodeSummary] = None
    # Optional: populated only when stuck-job detection is enabled.
    stuck_jobs: Optional[List[StuckJob]] = None
    error: Optional[str] = None


@dataclass
class LibraryInfo:
    """Normalized view of one Tdarr library."""
    name: Optional[str]
    path: Optional[str]
    file_count: int
    scan_progress: int
    last_scan: Optional[str]
    is_scanning: bool


@dataclass
class ScanStatus:
    """Aggregate library-scan figures."""
    total_libraries: int
    total_files: int
    scanning_libraries: int


@dataclass
class LibraryStatus:
    """Result wrapper for a library check."""
    timestamp: str
    libraries: List[LibraryInfo]
    scan_status: Optional[ScanStatus] = None
    error: Optional[str] = None
@dataclass class Statistics: total_transcodes: int space_saved: int total_files_processed: int failed_transcodes: int processing_speed: int eta: Optional[str] @dataclass class StatisticsStatus: timestamp: str statistics: Optional[Statistics] = None error: Optional[str] = None @dataclass class HealthCheck: status: str healthy: bool online_count: Optional[int] = None total_count: Optional[int] = None accessible: Optional[bool] = None total_items: Optional[int] = None @dataclass class HealthStatus: timestamp: str overall_status: str checks: Dict[str, HealthCheck] @dataclass class DiscordEmbedField: name: str value: str inline: bool = False @dataclass class DiscordEmbed: title: str description: str color: int fields: List[DiscordEmbedField] = None timestamp: str = None def __post_init__(self): if self.fields is None: self.fields = [] if self.timestamp is None: self.timestamp = datetime.now().isoformat() class DiscordNotifier: def __init__(self, webhook_url: str, timeout: int = 10): """Initialize Discord notifier with webhook URL.""" self.webhook_url = webhook_url self.timeout = timeout self.session = requests.Session() self.logger = logging.getLogger(f"{__name__}.DiscordNotifier") def send_content_message(self, content: str, username: str = "Tdarr Monitor") -> bool: """Send a simple content message to Discord. Args: content: The message content to send username: Bot username to display Returns: True if successful, False otherwise """ payload = { "content": content, "username": username } return self._send_webhook(payload) def send_embed_message(self, title: str, description: str, color: int = 0xff6b6b, # Red by default fields: List[DiscordEmbedField] = None, username: str = "Tdarr Monitor") -> bool: """Send an embed message to Discord. 
Args: title: Embed title description: Embed description color: Embed color (hex integer, default red) fields: List of embed fields username: Bot username to display Returns: True if successful, False otherwise """ embed = DiscordEmbed( title=title, description=description, color=color, fields=fields or [] ) payload = { "username": username, "embeds": [asdict(embed)] } return self._send_webhook(payload) def send_stuck_job_alert(self, stuck_jobs: List[StuckJob]) -> bool: """Send alert for stuck jobs using embed format. Args: stuck_jobs: List of stuck jobs to report Returns: True if successful, False otherwise """ if not stuck_jobs: return True # Create embed fields for each stuck job fields = [] for i, stuck_job in enumerate(stuck_jobs[:10]): # Limit to 10 jobs (Discord embed field limit is 25) ws = stuck_job.worker_snapshot field_value = ( f"**File:** {os.path.basename(ws.file)}\n" f"**Status:** {ws.status}\n" f"**Progress:** {ws.percentage}%\n" f"**Duration:** {stuck_job.stuck_duration_minutes:.1f} minutes\n" f"**Node:** {ws.node_id}" ) fields.append(DiscordEmbedField( name=f"Stuck Job {i+1}: {ws.worker_id}", value=field_value, inline=True )) # Add summary field if there are more jobs if len(stuck_jobs) > 10: fields.append(DiscordEmbedField( name="Additional Jobs", value=f"... and {len(stuck_jobs) - 10} more stuck jobs", inline=False )) title = f"๐Ÿšจ Tdarr Stuck Jobs Detected ({len(stuck_jobs)})" description = ( f"Detected {len(stuck_jobs)} stuck job{'s' if len(stuck_jobs) != 1 else ''} " f"in your Tdarr system. These jobs will follow the requested clear behavior." ) return self.send_embed_message( title=title, description=description, color=0xff6b6b, # Red color for alerts fields=fields ) def send_system_status(self, server_status: ServerStatus, node_status: NodeStatus, stuck_jobs: List[StuckJob] = None) -> bool: """Send system status summary using embed format. 
Args: server_status: Server status information node_status: Node status information stuck_jobs: Optional stuck jobs list Returns: True if successful, False otherwise """ # Determine overall health color is_healthy = ( server_status.status == "good" and not server_status.error and not node_status.error and (not stuck_jobs or len(stuck_jobs) == 0) ) color = 0x28a745 if is_healthy else 0xff6b6b # Green if healthy, red if not # Build description description_parts = [ f"**Server Status:** {server_status.status.title()}", f"**Version:** {getattr(server_status, 'version', 'Unknown')}" ] if node_status.node_summary: description_parts.extend([ f"**Total Nodes:** {node_status.node_summary.total_nodes}", f"**Online Nodes:** {node_status.node_summary.online_nodes}", f"**Offline Nodes:** {node_status.node_summary.offline_nodes}" ]) if stuck_jobs: description_parts.append(f"**Stuck Jobs:** {len(stuck_jobs)}") # Add node details as fields fields = [] if node_status.node_summary and node_status.node_summary.online_details: for node in node_status.node_summary.online_details: active_workers = len(node.processing) if node.processing else 0 field_value = ( f"**Status:** Online\n" f"**Platform:** {node.platform or 'Unknown'}\n" f"**Active Workers:** {active_workers}\n" f"**CPU Workers:** {node.workers.get('cpu', 0)}\n" f"**GPU Workers:** {node.workers.get('gpu', 0)}" ) fields.append(DiscordEmbedField( name=f"๐Ÿ“ก {node.nodeName or node.id}", value=field_value, inline=True )) title = "๐Ÿ“Š Tdarr System Status" if not is_healthy: title = "โš ๏ธ Tdarr System Alert" return self.send_embed_message( title=title, description="\n".join(description_parts), color=color, fields=fields ) def _send_webhook(self, payload: Dict[str, Any]) -> bool: """Send payload to Discord webhook. 
Args: payload: JSON payload to send Returns: True if successful, False otherwise """ try: response = self.session.post( self.webhook_url, json=payload, timeout=self.timeout ) response.raise_for_status() self.logger.info("Discord notification sent successfully") return True except requests.exceptions.RequestException as e: self.logger.error(f"Failed to send Discord notification: {e}") return False except Exception as e: self.logger.error(f"Unexpected error sending Discord notification: {e}") return False class StuckJobDetector: def __init__(self, memory_file: str = "/mnt/NV2/Development/claude-home/logs/tdarr_memory.pkl", stuck_threshold_minutes: int = 30): """Initialize stuck job detector with memory persistence.""" self.memory_file = os.path.abspath(memory_file) # Use absolute path self.stuck_threshold_minutes = stuck_threshold_minutes self.logger = logging.getLogger(f"{__name__}.StuckJobDetector") self.logger.debug(f"Using memory file: {self.memory_file}") self.memory_state = self._load_memory_state() # Ensure memory directory exists os.makedirs(os.path.dirname(memory_file), exist_ok=True) def _load_memory_state(self) -> MemoryState: """Load memory state from disk or create new one.""" if os.path.exists(self.memory_file): try: with open(self.memory_file, 'rb') as f: memory_state = pickle.load(f) self.logger.debug(f"Loaded memory state: {len(memory_state.worker_snapshots)} workers, {len(memory_state.stuck_jobs)} stuck jobs") return memory_state except Exception as e: self.logger.warning(f"Failed to load memory state: {e}, creating new state") else: self.logger.debug(f"Memory file {self.memory_file} does not exist, creating new state") return MemoryState( worker_snapshots={}, stuck_jobs={}, last_updated=datetime.now() ) def _save_memory_state(self): """Save memory state to disk.""" try: with open(self.memory_file, 'wb') as f: pickle.dump(self.memory_state, f) except Exception as e: self.logger.error(f"Failed to save memory state: {e}") def _create_worker_key(self, 
node_id: str, worker_id: str) -> str: """Create unique key for worker identification.""" return f"{node_id}:{worker_id}" def _is_worker_stuck(self, current: WorkerSnapshot, previous: WorkerSnapshot) -> bool: """Check if worker is stuck based on comparison with previous snapshot.""" worker_key = f"{current.node_id}:{current.worker_id}" # Check each condition individually for detailed logging file_same = current.file == previous.file percentage_same = current.percentage == previous.percentage status_same = current.status == previous.status fps_same = current.fps == previous.fps eta_same = current.eta == previous.eta is_stuck = file_same and percentage_same and status_same and fps_same and eta_same # Log detailed comparison info self.logger.debug(f"Worker {worker_key} stuck check:") self.logger.debug(f" File: '{current.file}' == '{previous.file}' = {file_same}") self.logger.debug(f" Percentage: {current.percentage}% == {previous.percentage}% = {percentage_same}") self.logger.debug(f" Status: '{current.status}' == '{previous.status}' = {status_same}") self.logger.debug(f" FPS: {current.fps} == {previous.fps} = {fps_same}") self.logger.debug(f" ETA: '{current.eta}' == '{previous.eta}' = {eta_same}") self.logger.debug(f" โ†’ Result: {'STUCK' if is_stuck else 'NOT STUCK'}") # Log INFO level when we detect changes (worker making progress) if not is_stuck: if not percentage_same: self.logger.info(f"Worker {worker_key} making progress: {previous.percentage}% โ†’ {current.percentage}%") elif not status_same: self.logger.info(f"Worker {worker_key} status changed: '{previous.status}' โ†’ '{current.status}'") elif not file_same: self.logger.info(f"Worker {worker_key} file changed: '{previous.file}' โ†’ '{current.file}'") return is_stuck def update_workers(self, nodes_data: Dict[str, Any]) -> List[StuckJob]: """Update worker snapshots and detect stuck jobs.""" current_time = datetime.now() current_workers = {} detected_stuck_jobs = [] # Extract current worker states from nodes 
data for node_id, node_data in nodes_data.items(): workers = node_data.get('workers', {}) for worker_id, worker_data in workers.items(): worker_key = self._create_worker_key(node_id, worker_id) # Create current snapshot current_snapshot = WorkerSnapshot( worker_id=worker_id, node_id=node_id, worker_type=worker_data.get('workerType', 'unknown'), file=worker_data.get('file', ''), percentage=worker_data.get('percentage', -1), status=worker_data.get('status', ''), fps=worker_data.get('fps', 0), eta=worker_data.get('ETA', ''), timestamp=current_time ) current_workers[worker_key] = current_snapshot # Log all workers being tracked self.logger.debug(f"Tracking worker {worker_key}: {current_snapshot.status} at {current_snapshot.percentage}% on '{current_snapshot.file}'") # Check if worker was previously tracked if worker_key in self.memory_state.worker_snapshots: previous_snapshot = self.memory_state.worker_snapshots[worker_key] # Check if worker is stuck if self._is_worker_stuck(current_snapshot, previous_snapshot): # Calculate how long it's been stuck time_since_previous = (current_time - previous_snapshot.timestamp).total_seconds() / 60 self.logger.debug(f"Worker {worker_key} has been stuck for {time_since_previous:.1f} minutes since last check") self.logger.debug(f"Worker {worker_key} checking stuck_jobs dict: {list(self.memory_state.stuck_jobs.keys())}") if worker_key in self.memory_state.stuck_jobs: # Already known stuck job, update duration stuck_job = self.memory_state.stuck_jobs[worker_key] stuck_duration = current_time - stuck_job.first_seen stuck_job.stuck_duration_minutes = stuck_duration.total_seconds() / 60 stuck_job.worker_snapshot = current_snapshot self.logger.debug(f"Worker {worker_key} known stuck job - duration: {stuck_job.stuck_duration_minutes:.1f} min, threshold: {self.stuck_threshold_minutes} min") if stuck_job.stuck_duration_minutes >= self.stuck_threshold_minutes: self.logger.debug(f"Worker {worker_key} EXCEEDS threshold - adding to detected stuck 
jobs") detected_stuck_jobs.append(stuck_job) else: self.logger.debug(f"Worker {worker_key} below threshold - not flagging yet") else: # New stuck job detected - add to memory immediately to start tracking first_seen = previous_snapshot.timestamp stuck_duration = current_time - first_seen stuck_duration_minutes = stuck_duration.total_seconds() / 60 self.logger.debug(f"Worker {worker_key} NEW stuck job - first_seen: {first_seen}, current: {current_time}") self.logger.debug(f"Worker {worker_key} NEW stuck job - duration: {stuck_duration_minutes:.1f} min, threshold: {self.stuck_threshold_minutes} min") # Create stuck job entry immediately to track duration across runs stuck_job = StuckJob( worker_snapshot=current_snapshot, first_seen=first_seen, stuck_duration_minutes=stuck_duration_minutes, is_stuck=True ) self.memory_state.stuck_jobs[worker_key] = stuck_job if stuck_duration_minutes >= self.stuck_threshold_minutes: self.logger.debug(f"Worker {worker_key} NEW stuck job EXCEEDS threshold - flagging for clearing") detected_stuck_jobs.append(stuck_job) else: self.logger.debug(f"Worker {worker_key} NEW stuck job below threshold - tracking in memory") else: # Worker is not stuck, remove from stuck jobs if present if worker_key in self.memory_state.stuck_jobs: del self.memory_state.stuck_jobs[worker_key] self.logger.info(f"Worker {worker_key} is no longer stuck") else: # New worker, start tracking it self.logger.info(f"New worker detected: {worker_key} - {current_snapshot.status} at {current_snapshot.percentage}% on '{current_snapshot.file}'") # Clean up stuck jobs for workers that no longer exist stuck_jobs_to_remove = [] for worker_key in self.memory_state.stuck_jobs: if worker_key not in current_workers: stuck_jobs_to_remove.append(worker_key) for worker_key in stuck_jobs_to_remove: del self.memory_state.stuck_jobs[worker_key] self.logger.info(f"Removed stuck job tracking for missing worker: {worker_key}") # Update memory state self.memory_state.worker_snapshots = 
current_workers self.memory_state.last_updated = current_time # Save to disk self._save_memory_state() return detected_stuck_jobs def get_stuck_jobs(self) -> List[StuckJob]: """Get current list of stuck jobs.""" return list(self.memory_state.stuck_jobs.values()) def clear_memory(self): """Clear all memory state.""" self.memory_state = MemoryState( worker_snapshots={}, stuck_jobs={}, last_updated=datetime.now() ) self._save_memory_state() self.logger.info("Memory state cleared") class TdarrMonitor: def __init__(self, server_url: str, timeout: int = 30, enable_stuck_detection: bool = False, stuck_threshold_minutes: int = 30, memory_file: str = ".claude/tmp/tdarr_memory.pkl", discord_webhook_url: str = None, enable_discord_alerts: bool = False, log_file: Optional[str] = None, log_level: str = "INFO", clear_hung_workers: bool = False): """Initialize Tdarr monitor with server URL.""" self.server_url = server_url.rstrip('/') self.timeout = timeout self.session = requests.Session() self.enable_stuck_detection = enable_stuck_detection self.enable_discord_alerts = enable_discord_alerts self.clear_hung_workers_enabled = clear_hung_workers # Configure logging first self._setup_logging(log_file, log_level) self.logger = logging.getLogger(__name__) # Initialize stuck job detector if enabled self.stuck_detector = None if enable_stuck_detection: self.stuck_detector = StuckJobDetector(memory_file, stuck_threshold_minutes) # Initialize Discord notifier if enabled self.discord_notifier = None if enable_discord_alerts: if discord_webhook_url: self.discord_notifier = DiscordNotifier(discord_webhook_url) else: self.logger.warning("Discord alerts enabled but no webhook URL provided") def _setup_logging(self, log_file: Optional[str] = None, log_level: str = "INFO"): """Configure logging with optional file rotation.""" # Clear any existing handlers root_logger = logging.getLogger() root_logger.handlers.clear() # Set log level level = getattr(logging, log_level.upper(), logging.INFO) 
root_logger.setLevel(level) # Create formatters console_formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) # Enhanced file formatter for better readability file_formatter = logging.Formatter( '%(asctime)s [%(levelname)-7s] %(module)-12s | %(message)s', datefmt='%H:%M:%S' ) # Console handler (for interactive use) console_handler = logging.StreamHandler(sys.stdout) console_handler.setFormatter(console_formatter) root_logger.addHandler(console_handler) # File handler with rotation (if log_file specified) if log_file: # Ensure log directory exists log_dir = os.path.dirname(log_file) if log_dir: os.makedirs(log_dir, exist_ok=True) # Rotating file handler: 10MB max, keep 5 backup files file_handler = logging.handlers.RotatingFileHandler( log_file, maxBytes=10 * 1024 * 1024, # 10MB backupCount=5, encoding='utf-8' ) file_handler.setFormatter(file_formatter) root_logger.addHandler(file_handler) def _make_request(self, endpoint: str) -> Optional[Dict[str, Any]]: """Make HTTP request to Tdarr API endpoint.""" url = urljoin(self.server_url, endpoint) try: response = self.session.get(url, timeout=self.timeout) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: self.logger.error(f"Request failed for {url}: {e}") return None except json.JSONDecodeError as e: self.logger.error(f"JSON decode failed for {url}: {e}") return None def clear_hung_workers(self, stuck_jobs: Optional[List[StuckJob]] = None) -> bool: """Clear hung workers via Tdarr API using kill-worker endpoint. Args: stuck_jobs: List of StuckJob objects to clear. Each contains worker and node information. 
Returns: True if all workers cleared successfully, False otherwise """ if not stuck_jobs: self.logger.info("No stuck jobs provided for clearing hung workers") return True success_count = 0 total_count = len(stuck_jobs) for stuck_job in stuck_jobs: worker_snapshot = stuck_job.worker_snapshot try: # Use the kill-worker endpoint with correct payload format endpoint = '/api/v2/kill-worker' payload = { "data": { "nodeID": worker_snapshot.node_id, "workerID": worker_snapshot.worker_id } } url = urljoin(self.server_url, endpoint) response = self.session.post(url, json=payload, timeout=self.timeout) response.raise_for_status() self.logger.info(f"Successfully killed hung worker: {worker_snapshot.node_id}:{worker_snapshot.worker_id}") success_count += 1 except requests.exceptions.RequestException as e: self.logger.error(f"Failed to kill worker {worker_snapshot.node_id}:{worker_snapshot.worker_id}: {e}") except Exception as e: self.logger.error(f"Unexpected error killing worker {worker_snapshot.node_id}:{worker_snapshot.worker_id}: {e}") self.logger.info(f"Cleared {success_count}/{total_count} hung workers") return success_count == total_count def get_server_status(self) -> ServerStatus: """Get overall server status and configuration.""" timestamp = datetime.now().isoformat() # Try to get server info from API data = self._make_request('/api/v2/status') if data: server_status = data.get('status', 'unknown') self.logger.info(f"Server check completed: status={server_status}, version={data.get('version', 'unknown')}") return ServerStatus( timestamp=timestamp, server_url=self.server_url, status=server_status, version=data.get('version'), uptime=data.get('uptime') ) else: self.logger.error("Server check failed: Unable to connect to Tdarr server") return ServerStatus( timestamp=timestamp, server_url=self.server_url, status='offline', error='Unable to connect to Tdarr server' ) def get_queue_status(self) -> QueueStatus: """Get transcoding queue status and statistics.""" timestamp = 
datetime.now().isoformat() # Get queue information data = self._make_request('/api/v2/get-queue') if data: queue_data = data.get('queue', []) # Calculate queue statistics total_files = len(queue_data) queued_files = len([f for f in queue_data if f.get('status') == 'Queued']) processing_files = len([f for f in queue_data if f.get('status') == 'Processing']) completed_files = len([f for f in queue_data if f.get('status') == 'Completed']) queue_stats = QueueStats( total_files=total_files, queued=queued_files, processing=processing_files, completed=completed_files, queue_items=queue_data[:10] # First 10 items for details ) return QueueStatus( timestamp=timestamp, queue_stats=queue_stats ) else: return QueueStatus( timestamp=timestamp, error='Unable to fetch queue data' ) def get_node_status(self) -> NodeStatus: """Get status of all connected nodes.""" timestamp = datetime.now().isoformat() # Get nodes information (using correct endpoint) data = self._make_request('/api/v2/get-nodes') if data: # Handle the actual data structure returned by Tdarr API nodes_dict = data if isinstance(data, dict) else {} nodes = [] # Process node information online_nodes = [] offline_nodes = [] for node_id, node_data in nodes_dict.items(): node_info = NodeInfo( id=node_id, nodeName=node_data.get('nodeName'), status='online', # Assume online if in response lastSeen=None, version=node_data.get('config', {}).get('version'), platform=node_data.get('config', {}).get('platform_arch_isdocker'), workers={ 'cpu': len([w for w in node_data.get('workers', {}).values() if 'cpu' in w.get('workerType', '').lower()]), 'gpu': len([w for w in node_data.get('workers', {}).values() if 'gpu' in w.get('workerType', '').lower()]) }, processing=list(node_data.get('workers', {}).values()) ) online_nodes.append(node_info) nodes.append(node_data) # Check for stuck jobs if detection is enabled stuck_jobs = [] if self.stuck_detector: try: stuck_jobs = self.stuck_detector.update_workers(nodes_dict) if stuck_jobs: 
self.logger.warning(f"Detected {len(stuck_jobs)} stuck jobs") for stuck_job in stuck_jobs: self.logger.warning( f"Stuck job: {stuck_job.worker_snapshot.node_id}:{stuck_job.worker_snapshot.worker_id} " f"on file '{stuck_job.worker_snapshot.file}' " f"at {stuck_job.worker_snapshot.percentage}% for {stuck_job.stuck_duration_minutes:.1f} minutes" ) # Clear hung workers if enabled if self.clear_hung_workers_enabled: try: clear_success = self.clear_hung_workers(stuck_jobs) if clear_success: self.logger.info(f"Successfully cleared {len(stuck_jobs)} hung workers") else: self.logger.warning("Some hung workers could not be cleared") except Exception as e: self.logger.error(f"Error clearing hung workers: {e}") # Send Discord notification for stuck jobs if self.discord_notifier: try: self.discord_notifier.send_stuck_job_alert(stuck_jobs) except Exception as e: self.logger.error(f"Failed to send Discord stuck job alert: {e}") except Exception as e: self.logger.error(f"Error in stuck job detection: {e}") node_summary = NodeSummary( total_nodes=len(nodes), online_nodes=len(online_nodes), offline_nodes=len(offline_nodes), online_details=online_nodes, offline_details=offline_nodes ) # Log successful node check with summary if stuck_jobs: self.logger.info(f"Node check completed: {len(nodes)} nodes online, {len(stuck_jobs)} stuck jobs detected") else: self.logger.info(f"Node check completed: {len(nodes)} nodes online, no stuck jobs detected") return NodeStatus( timestamp=timestamp, nodes=nodes, node_summary=node_summary, stuck_jobs=stuck_jobs ) else: self.logger.error("Node check failed: Unable to fetch node data") return NodeStatus( timestamp=timestamp, nodes=[], error='Unable to fetch node data' ) def get_library_status(self) -> LibraryStatus: """Get library scan status and file statistics.""" timestamp = datetime.now().isoformat() # Get library information data = self._make_request('/api/v2/get-libraries') if data: libraries = data.get('libraries', []) library_stats = [] 
total_files = 0 for lib in libraries: lib_info = LibraryInfo( name=lib.get('name'), path=lib.get('path'), file_count=lib.get('totalFiles', 0), scan_progress=lib.get('scanProgress', 0), last_scan=lib.get('lastScan'), is_scanning=lib.get('isScanning', False) ) library_stats.append(lib_info) total_files += lib_info.file_count scan_status = ScanStatus( total_libraries=len(libraries), total_files=total_files, scanning_libraries=len([l for l in library_stats if l.is_scanning]) ) return LibraryStatus( timestamp=timestamp, libraries=library_stats, scan_status=scan_status ) else: return LibraryStatus( timestamp=timestamp, libraries=[], error='Unable to fetch library data' ) def get_statistics(self) -> StatisticsStatus: """Get overall Tdarr statistics and health metrics.""" timestamp = datetime.now().isoformat() # Get statistics data = self._make_request('/api/v2/get-stats') if data: stats = data.get('stats', {}) statistics = Statistics( total_transcodes=stats.get('totalTranscodes', 0), space_saved=stats.get('spaceSaved', 0), total_files_processed=stats.get('totalFilesProcessed', 0), failed_transcodes=stats.get('failedTranscodes', 0), processing_speed=stats.get('processingSpeed', 0), eta=stats.get('eta') ) return StatisticsStatus( timestamp=timestamp, statistics=statistics ) else: return StatisticsStatus( timestamp=timestamp, error='Unable to fetch statistics' ) def health_check(self) -> HealthStatus: """Perform comprehensive health check.""" timestamp = datetime.now().isoformat() # Server connectivity server_status = self.get_server_status() server_check = HealthCheck( status=server_status.status, healthy=server_status.status == 'good' ) # Node connectivity node_status = self.get_node_status() nodes_healthy = ( node_status.node_summary.online_nodes > 0 if node_status.node_summary else False ) and not node_status.error nodes_check = HealthCheck( status='online' if nodes_healthy else 'offline', healthy=nodes_healthy, online_count=node_status.node_summary.online_nodes if 
node_status.node_summary else 0, total_count=node_status.node_summary.total_nodes if node_status.node_summary else 0 ) checks = { 'server': server_check, 'nodes': nodes_check } # Determine overall health all_checks_healthy = all(check.healthy for check in checks.values()) overall_status = 'healthy' if all_checks_healthy else 'unhealthy' return HealthStatus( timestamp=timestamp, overall_status=overall_status, checks=checks ) def main(): parser = argparse.ArgumentParser(description='Monitor Tdarr server via API') parser.add_argument('--server', required=True, help='Tdarr server URL (e.g., http://10.10.0.43:8265)') parser.add_argument('--check', choices=['all', 'status', 'queue', 'nodes', 'libraries', 'stats', 'health'], default='health', help='Type of check to perform') parser.add_argument('--timeout', type=int, default=30, help='Request timeout in seconds') parser.add_argument('--output', choices=['json', 'pretty'], default='pretty', help='Output format') parser.add_argument('--verbose', action='store_true', help='Enable verbose logging') parser.add_argument('--detect-stuck', action='store_true', help='Enable stuck job detection') parser.add_argument('--stuck-threshold', type=int, default=30, help='Minutes before job is considered stuck (default: 30)') parser.add_argument('--memory-file', default='/mnt/NV2/Development/claude-home/logs/tdarr_memory.pkl', help='Path to memory state file') parser.add_argument('--clear-memory', action='store_true', help='Clear memory state and exit') parser.add_argument('--discord-webhook', default='https://discord.com/api/webhooks/1404105821549498398/y2Ud1RK9rzFjv58xbypUfQNe3jrL7ZUq1FkQHa4_dfOHm2ylp93z0f4tY0O8Z-vQgKhD', help='Discord webhook URL for notifications (default: configured webhook)') parser.add_argument('--discord-alerts', action='store_true', help='Enable Discord alerts for stuck jobs and system status') parser.add_argument('--discord-test', action='store_true', help='Send test Discord message and exit') 
parser.add_argument('--log-file', default='/mnt/NV2/Development/claude-home/logs/tdarr_monitor.log', help='Path to log file with rotation (default: /mnt/NV2/Development/claude-home/logs/tdarr_monitor.log)') parser.add_argument('--log-level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], default='INFO', help='Logging level (default: INFO)') parser.add_argument('--no-log-file', action='store_true', help='Disable file logging, console only') parser.add_argument('--clear-hung-workers', action='store_true', help='Clear hung workers via API call when stuck jobs are detected') args = parser.parse_args() if args.verbose: logging.getLogger().setLevel(logging.DEBUG) # Handle clear memory command if args.clear_memory: if os.path.exists(args.memory_file): os.remove(args.memory_file) print(f"Memory state cleared: {args.memory_file}") else: print(f"Memory file does not exist: {args.memory_file}") sys.exit(0) # Handle Discord test command if args.discord_test: print("Sending Discord test messages...") notifier = DiscordNotifier(args.discord_webhook) # Test content message content_success = notifier.send_content_message( "๐Ÿงช **Tdarr Monitor Test** - Content message working correctly!" ) # Test embed message test_fields = [ DiscordEmbedField("Test Field 1", "This is a test value", True), DiscordEmbedField("Test Field 2", "Another test value", True), ] embed_success = notifier.send_embed_message( title="๐Ÿงช Tdarr Monitor Test", description="This is a test embed message to verify Discord integration is working correctly.", color=0x00ff00, # Green fields=test_fields ) if content_success and embed_success: print("โœ… Discord test successful! Both content and embed messages sent.") sys.exit(0) else: print("โŒ Discord test failed. 
Check webhook URL and permissions.") sys.exit(1) # Initialize monitor log_file = None if args.no_log_file else args.log_file monitor = TdarrMonitor( args.server, args.timeout, enable_stuck_detection=args.detect_stuck, stuck_threshold_minutes=args.stuck_threshold, memory_file=args.memory_file, discord_webhook_url=args.discord_webhook, enable_discord_alerts=args.discord_alerts, log_file=log_file, log_level=args.log_level, clear_hung_workers=args.clear_hung_workers ) # Perform requested check monitor.logger.info(f"Starting Tdarr monitoring check: {args.check}, stuck_detection={'enabled' if args.detect_stuck else 'disabled'}, clear_workers={'enabled' if args.clear_hung_workers else 'disabled'}") result = None if args.check == 'all': result = { 'server_status': monitor.get_server_status(), 'node_status': monitor.get_node_status() } elif args.check == 'status': result = monitor.get_server_status() elif args.check == 'queue': result = monitor.get_queue_status() elif args.check == 'nodes': result = monitor.get_node_status() elif args.check == 'libraries': result = monitor.get_library_status() elif args.check == 'stats': result = monitor.get_statistics() elif args.check == 'health': result = monitor.health_check() # Output results if args.output == 'json': # Convert dataclasses to dictionaries for JSON serialization if args.check == 'all': json_result = {} for key, value in result.items(): json_result[key] = asdict(value) print(json.dumps(json_result, indent=2)) else: print(json.dumps(asdict(result), indent=2)) else: # Pretty print format print(f"=== Tdarr Monitor Results - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ===") if args.check == 'health' or (hasattr(result, 'overall_status') and result.overall_status): health = result if hasattr(result, 'overall_status') else None if health: status = health.overall_status print(f"Overall Status: {status.upper()}") if health.checks: print("\nHealth Checks:") for check_name, check_data in health.checks.items(): status_icon = 
"โœ“" if check_data.healthy else "โœ—" print(f" {status_icon} {check_name.title()}: {asdict(check_data)}") # Display stuck jobs if present if args.detect_stuck: if hasattr(result, 'stuck_jobs') and result.stuck_jobs: print(f"\n=== STUCK JOBS DETECTED ({len(result.stuck_jobs)}) ===") for stuck_job in result.stuck_jobs: print(f"๐Ÿšจ {stuck_job.worker_snapshot.node_id}:{stuck_job.worker_snapshot.worker_id}") print(f" File: {stuck_job.worker_snapshot.file}") print(f" Progress: {stuck_job.worker_snapshot.percentage}%") print(f" Status: {stuck_job.worker_snapshot.status}") print(f" Stuck for: {stuck_job.stuck_duration_minutes:.1f} minutes") print() elif args.check in ['nodes', 'all']: # Check all results for stuck jobs if 'all' is selected stuck_found = False if args.check == 'all' and isinstance(result, dict): for section, data in result.items(): if hasattr(data, 'stuck_jobs') and data.stuck_jobs: if not stuck_found: print(f"\n=== STUCK JOBS DETECTED ===") stuck_found = True for stuck_job in data.stuck_jobs: print(f"๐Ÿšจ {stuck_job.worker_snapshot.node_id}:{stuck_job.worker_snapshot.worker_id}") print(f" File: {stuck_job.worker_snapshot.file}") print(f" Progress: {stuck_job.worker_snapshot.percentage}%") print(f" Status: {stuck_job.worker_snapshot.status}") print(f" Stuck for: {stuck_job.stuck_duration_minutes:.1f} minutes") print() if not stuck_found: print(f"\nโœ… No stuck jobs detected (threshold: {args.stuck_threshold} minutes)") if args.check == 'all': for section, data in result.items(): print(f"\n=== {section.replace('_', ' ').title()} ===") # Don't print stuck_jobs in JSON format as we already displayed them above if hasattr(data, 'stuck_jobs'): data_dict = asdict(data) data_dict.pop('stuck_jobs', None) print(json.dumps(data_dict, indent=2)) else: print(json.dumps(asdict(data), indent=2)) elif args.check != 'health': # Don't print stuck_jobs in JSON format as we already displayed them above if hasattr(result, 'stuck_jobs'): result_dict = asdict(result) 
result_dict.pop('stuck_jobs', None) print(json.dumps(result_dict, indent=2)) else: print(json.dumps(asdict(result), indent=2)) # Exit with appropriate code if result: # Check for unhealthy status in health check if isinstance(result, HealthStatus) and result.overall_status == 'unhealthy': sys.exit(1) # Check for errors in individual status objects (all status classes except HealthStatus have error attribute) elif (isinstance(result, (ServerStatus, QueueStatus, NodeStatus, LibraryStatus, StatisticsStatus)) and result.error): sys.exit(1) # Check for errors in 'all' results elif isinstance(result, dict): for status_obj in result.values(): if (isinstance(status_obj, (ServerStatus, QueueStatus, NodeStatus, LibraryStatus, StatisticsStatus)) and status_obj.error): sys.exit(1) sys.exit(0) if __name__ == '__main__': main()