- Created complete gaming detection and priority system
- Added gaming schedule configuration and enforcement
- Implemented Steam library monitoring with auto-detection
- Built comprehensive game process detection for multiple platforms
- Added gaming-aware Tdarr worker management with priority controls
- Created emergency gaming mode for immediate worker shutdown
- Integrated Discord notifications for gaming state changes
- Replaced old bash monitoring with enhanced Python monitoring system
- Added persistent state management and memory tracking
- Implemented configurable gaming time windows and schedules
- Updated .gitignore to exclude logs directories

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
#!/usr/bin/env python3
"""
Tdarr API Monitoring Script with Stuck Job Detection and Discord Alerts

Monitors Tdarr server via its web API endpoints:
- Server status and health
- Queue status and statistics
- Node status and performance
- Library scan progress
- Worker activity
- Stuck job detection with configurable timeouts
- Discord notifications for alerts and status updates

Usage:
    python3 tdarr_monitor.py --server http://10.10.0.43:8265 --check all
    python3 tdarr_monitor.py --server http://10.10.0.43:8265 --check queue
    python3 tdarr_monitor.py --server http://10.10.0.43:8265 --check nodes

    # Enable stuck job detection (30 minute threshold)
    python3 tdarr_monitor.py --server http://10.10.0.43:8265 --check nodes --detect-stuck

    # Custom stuck threshold (15 minutes)
    python3 tdarr_monitor.py --server http://10.10.0.43:8265 --check all --detect-stuck --stuck-threshold 15

    # Enable Discord alerts for stuck jobs (uses default webhook)
    python3 tdarr_monitor.py --server http://10.10.0.43:8265 --check nodes --detect-stuck --discord-alerts

    # Automatically clear hung workers when detected
    python3 tdarr_monitor.py --server http://10.10.0.43:8265 --check nodes --detect-stuck --clear-hung-workers

    # Full monitoring with automatic clearing and Discord alerts
    python3 tdarr_monitor.py --server http://10.10.0.43:8265 --check nodes --detect-stuck --clear-hung-workers --discord-alerts

    # Test Discord integration (uses default webhook)
    python3 tdarr_monitor.py --server http://10.10.0.43:8265 --discord-test

    # Enable file logging with custom path and debug level
    python3 tdarr_monitor.py --server http://10.10.0.43:8265 --check nodes --log-file /tmp/tdarr_debug.log --log-level DEBUG

    # Disable file logging (console only)
    python3 tdarr_monitor.py --server http://10.10.0.43:8265 --check nodes --no-log-file

    # Clear memory state
    python3 tdarr_monitor.py --server http://10.10.0.43:8265 --clear-memory
"""

import argparse
import json
import logging
import logging.handlers
import sys
import os
import pickle
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Union
import requests
from urllib.parse import urljoin

@dataclass
class WorkerSnapshot:
    worker_id: str
    node_id: str
    worker_type: str
    file: str
    percentage: float
    status: str
    fps: int
    eta: str
    timestamp: datetime


@dataclass
class StuckJob:
    worker_snapshot: WorkerSnapshot
    first_seen: datetime
    stuck_duration_minutes: float
    is_stuck: bool = True


@dataclass
class MemoryState:
    worker_snapshots: Dict[str, WorkerSnapshot]
    stuck_jobs: Dict[str, StuckJob]
    last_updated: datetime

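# Illustrative sketch of the state persisted by StuckJobDetector further below;
# the concrete values are made up and shown only to document the shapes involved.
#
#     snap = WorkerSnapshot(worker_id="worker-1", node_id="node-abc",
#                           worker_type="transcodegpu", file="/media/show.mkv",
#                           percentage=42.0, status="Transcoding", fps=30,
#                           eta="00:12:00", timestamp=datetime.now())
#     state = MemoryState(worker_snapshots={"node-abc:worker-1": snap},
#                         stuck_jobs={}, last_updated=datetime.now())
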
@dataclass
|
|
class ServerStatus:
|
|
timestamp: str
|
|
server_url: str
|
|
status: str
|
|
error: Optional[str] = None
|
|
version: Optional[str] = None
|
|
server_id: Optional[str] = None
|
|
uptime: Optional[str] = None
|
|
system_info: Optional[Dict[str, Any]] = None
|
|
|
|
|
|
@dataclass
|
|
class QueueStats:
|
|
total_files: int
|
|
queued: int
|
|
processing: int
|
|
completed: int
|
|
queue_items: List[Dict[str, Any]]
|
|
|
|
|
|
@dataclass
|
|
class QueueStatus:
|
|
timestamp: str
|
|
queue_stats: Optional[QueueStats] = None
|
|
error: Optional[str] = None
|
|
|
|
|
|
@dataclass
|
|
class NodeInfo:
|
|
id: Optional[str]
|
|
nodeName: Optional[str]
|
|
status: str
|
|
lastSeen: Optional[int]
|
|
version: Optional[str]
|
|
platform: Optional[str]
|
|
workers: Dict[str, int]
|
|
processing: List[Dict[str, Any]]
|
|
|
|
|
|
@dataclass
|
|
class NodeSummary:
|
|
total_nodes: int
|
|
online_nodes: int
|
|
offline_nodes: int
|
|
online_details: List[NodeInfo]
|
|
offline_details: List[NodeInfo]
|
|
|
|
|
|
@dataclass
|
|
class NodeStatus:
|
|
timestamp: str
|
|
nodes: List[Dict[str, Any]]
|
|
node_summary: Optional[NodeSummary] = None
|
|
stuck_jobs: List[StuckJob] = None
|
|
error: Optional[str] = None
|
|
|
|
|
|
@dataclass
|
|
class LibraryInfo:
|
|
name: Optional[str]
|
|
path: Optional[str]
|
|
file_count: int
|
|
scan_progress: int
|
|
last_scan: Optional[str]
|
|
is_scanning: bool
|
|
|
|
|
|
@dataclass
|
|
class ScanStatus:
|
|
total_libraries: int
|
|
total_files: int
|
|
scanning_libraries: int
|
|
|
|
|
|
@dataclass
|
|
class LibraryStatus:
|
|
timestamp: str
|
|
libraries: List[LibraryInfo]
|
|
scan_status: Optional[ScanStatus] = None
|
|
error: Optional[str] = None
|
|
|
|
|
|
@dataclass
|
|
class Statistics:
|
|
total_transcodes: int
|
|
space_saved: int
|
|
total_files_processed: int
|
|
failed_transcodes: int
|
|
processing_speed: int
|
|
eta: Optional[str]
|
|
|
|
|
|
@dataclass
|
|
class StatisticsStatus:
|
|
timestamp: str
|
|
statistics: Optional[Statistics] = None
|
|
error: Optional[str] = None
|
|
|
|
|
|
@dataclass
|
|
class HealthCheck:
|
|
status: str
|
|
healthy: bool
|
|
online_count: Optional[int] = None
|
|
total_count: Optional[int] = None
|
|
accessible: Optional[bool] = None
|
|
total_items: Optional[int] = None
|
|
|
|
|
|
@dataclass
|
|
class HealthStatus:
|
|
timestamp: str
|
|
overall_status: str
|
|
checks: Dict[str, HealthCheck]
|
|
|
|
|
|
@dataclass
|
|
class DiscordEmbedField:
|
|
name: str
|
|
value: str
|
|
inline: bool = False
|
|
|
|
|
|
@dataclass
|
|
class DiscordEmbed:
|
|
title: str
|
|
description: str
|
|
color: int
|
|
fields: List[DiscordEmbedField] = None
|
|
timestamp: str = None
|
|
|
|
def __post_init__(self):
|
|
if self.fields is None:
|
|
self.fields = []
|
|
if self.timestamp is None:
|
|
self.timestamp = datetime.now().isoformat()
|
|
|
|
|
|
class DiscordNotifier:
|
|
def __init__(self, webhook_url: str, timeout: int = 10):
|
|
"""Initialize Discord notifier with webhook URL."""
|
|
self.webhook_url = webhook_url
|
|
self.timeout = timeout
|
|
self.session = requests.Session()
|
|
self.logger = logging.getLogger(f"{__name__}.DiscordNotifier")
|
|
|
|
def send_content_message(self, content: str, username: str = "Tdarr Monitor") -> bool:
|
|
"""Send a simple content message to Discord.
|
|
|
|
Args:
|
|
content: The message content to send
|
|
username: Bot username to display
|
|
|
|
Returns:
|
|
True if successful, False otherwise
|
|
"""
|
|
payload = {
|
|
"content": content,
|
|
"username": username
|
|
}
|
|
|
|
return self._send_webhook(payload)
|
|
|
|
def send_embed_message(self,
|
|
title: str,
|
|
description: str,
|
|
color: int = 0xff6b6b, # Red by default
|
|
fields: List[DiscordEmbedField] = None,
|
|
username: str = "Tdarr Monitor") -> bool:
|
|
"""Send an embed message to Discord.
|
|
|
|
Args:
|
|
title: Embed title
|
|
description: Embed description
|
|
color: Embed color (hex integer, default red)
|
|
fields: List of embed fields
|
|
username: Bot username to display
|
|
|
|
Returns:
|
|
True if successful, False otherwise
|
|
"""
|
|
embed = DiscordEmbed(
|
|
title=title,
|
|
description=description,
|
|
color=color,
|
|
fields=fields or []
|
|
)
|
|
|
|
payload = {
|
|
"username": username,
|
|
"embeds": [asdict(embed)]
|
|
}
|
|
|
|
return self._send_webhook(payload)
|
|
|
|
def send_stuck_job_alert(self, stuck_jobs: List[StuckJob]) -> bool:
|
|
"""Send alert for stuck jobs using embed format.
|
|
|
|
Args:
|
|
stuck_jobs: List of stuck jobs to report
|
|
|
|
Returns:
|
|
True if successful, False otherwise
|
|
"""
|
|
if not stuck_jobs:
|
|
return True
|
|
|
|
# Create embed fields for each stuck job
|
|
fields = []
|
|
for i, stuck_job in enumerate(stuck_jobs[:10]): # Limit to 10 jobs (Discord embed field limit is 25)
|
|
ws = stuck_job.worker_snapshot
|
|
field_value = (
|
|
f"**File:** {os.path.basename(ws.file)}\n"
|
|
f"**Status:** {ws.status}\n"
|
|
f"**Progress:** {ws.percentage}%\n"
|
|
f"**Duration:** {stuck_job.stuck_duration_minutes:.1f} minutes\n"
|
|
f"**Node:** {ws.node_id}"
|
|
)
|
|
|
|
fields.append(DiscordEmbedField(
|
|
name=f"Stuck Job {i+1}: {ws.worker_id}",
|
|
value=field_value,
|
|
inline=True
|
|
))
|
|
|
|
# Add summary field if there are more jobs
|
|
if len(stuck_jobs) > 10:
|
|
fields.append(DiscordEmbedField(
|
|
name="Additional Jobs",
|
|
value=f"... and {len(stuck_jobs) - 10} more stuck jobs",
|
|
inline=False
|
|
))
|
|
|
|
title = f"🚨 Tdarr Stuck Jobs Detected ({len(stuck_jobs)})"
|
|
        description = (
            f"Detected {len(stuck_jobs)} stuck job{'s' if len(stuck_jobs) != 1 else ''} "
            f"in your Tdarr system. Each has shown no progress for at least the configured "
            f"threshold and will be handled according to the requested clear behavior."
        )
|
|
|
|
return self.send_embed_message(
|
|
title=title,
|
|
description=description,
|
|
color=0xff6b6b, # Red color for alerts
|
|
fields=fields
|
|
)
|
|
|
|
def send_system_status(self,
|
|
server_status: ServerStatus,
|
|
node_status: NodeStatus,
|
|
stuck_jobs: List[StuckJob] = None) -> bool:
|
|
"""Send system status summary using embed format.
|
|
|
|
Args:
|
|
server_status: Server status information
|
|
node_status: Node status information
|
|
stuck_jobs: Optional stuck jobs list
|
|
|
|
Returns:
|
|
True if successful, False otherwise
|
|
"""
|
|
# Determine overall health color
|
|
is_healthy = (
|
|
server_status.status == "good" and
|
|
not server_status.error and
|
|
not node_status.error and
|
|
(not stuck_jobs or len(stuck_jobs) == 0)
|
|
)
|
|
|
|
color = 0x28a745 if is_healthy else 0xff6b6b # Green if healthy, red if not
|
|
|
|
# Build description
|
|
description_parts = [
|
|
f"**Server Status:** {server_status.status.title()}",
|
|
f"**Version:** {getattr(server_status, 'version', 'Unknown')}"
|
|
]
|
|
|
|
if node_status.node_summary:
|
|
description_parts.extend([
|
|
f"**Total Nodes:** {node_status.node_summary.total_nodes}",
|
|
f"**Online Nodes:** {node_status.node_summary.online_nodes}",
|
|
f"**Offline Nodes:** {node_status.node_summary.offline_nodes}"
|
|
])
|
|
|
|
if stuck_jobs:
|
|
description_parts.append(f"**Stuck Jobs:** {len(stuck_jobs)}")
|
|
|
|
# Add node details as fields
|
|
fields = []
|
|
if node_status.node_summary and node_status.node_summary.online_details:
|
|
for node in node_status.node_summary.online_details:
|
|
active_workers = len(node.processing) if node.processing else 0
|
|
field_value = (
|
|
f"**Status:** Online\n"
|
|
f"**Platform:** {node.platform or 'Unknown'}\n"
|
|
f"**Active Workers:** {active_workers}\n"
|
|
f"**CPU Workers:** {node.workers.get('cpu', 0)}\n"
|
|
f"**GPU Workers:** {node.workers.get('gpu', 0)}"
|
|
)
|
|
|
|
fields.append(DiscordEmbedField(
|
|
name=f"📡 {node.nodeName or node.id}",
|
|
value=field_value,
|
|
inline=True
|
|
))
|
|
|
|
title = "📊 Tdarr System Status"
|
|
if not is_healthy:
|
|
title = "⚠️ Tdarr System Alert"
|
|
|
|
return self.send_embed_message(
|
|
title=title,
|
|
description="\n".join(description_parts),
|
|
color=color,
|
|
fields=fields
|
|
)
|
|
|
|
def _send_webhook(self, payload: Dict[str, Any]) -> bool:
|
|
"""Send payload to Discord webhook.
|
|
|
|
Args:
|
|
payload: JSON payload to send
|
|
|
|
Returns:
|
|
True if successful, False otherwise
|
|
"""
|
|
try:
|
|
response = self.session.post(
|
|
self.webhook_url,
|
|
json=payload,
|
|
timeout=self.timeout
|
|
)
|
|
response.raise_for_status()
|
|
self.logger.info("Discord notification sent successfully")
|
|
return True
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
self.logger.error(f"Failed to send Discord notification: {e}")
|
|
return False
|
|
except Exception as e:
|
|
self.logger.error(f"Unexpected error sending Discord notification: {e}")
|
|
return False
|
|
|
|
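# --- Usage sketch (illustrative only) -----------------------------------------
# A minimal example of driving DiscordNotifier directly; the webhook URL here is
# a placeholder, not a real webhook.
#
#     notifier = DiscordNotifier("https://discord.com/api/webhooks/<id>/<token>")
#     notifier.send_content_message("Tdarr monitor started")
#     notifier.send_embed_message(
#         title="Nightly summary",
#         description="All nodes online",
#         color=0x28a745,
#         fields=[DiscordEmbedField(name="Nodes", value="2 online", inline=True)],
#     )
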
|
|
class StuckJobDetector:
|
|
    def __init__(self, memory_file: str = "/mnt/NV2/Development/claude-home/logs/tdarr_memory.pkl", stuck_threshold_minutes: int = 30):
        """Initialize stuck job detector with memory persistence."""
        self.memory_file = os.path.abspath(memory_file)  # Use absolute path
        self.stuck_threshold_minutes = stuck_threshold_minutes
        self.logger = logging.getLogger(f"{__name__}.StuckJobDetector")
        self.logger.debug(f"Using memory file: {self.memory_file}")

        # Ensure the memory directory exists before any load/save attempts
        memory_dir = os.path.dirname(self.memory_file)
        if memory_dir:
            os.makedirs(memory_dir, exist_ok=True)

        self.memory_state = self._load_memory_state()

def _load_memory_state(self) -> MemoryState:
|
|
"""Load memory state from disk or create new one."""
|
|
if os.path.exists(self.memory_file):
|
|
try:
|
|
with open(self.memory_file, 'rb') as f:
|
|
memory_state = pickle.load(f)
|
|
self.logger.debug(f"Loaded memory state: {len(memory_state.worker_snapshots)} workers, {len(memory_state.stuck_jobs)} stuck jobs")
|
|
return memory_state
|
|
except Exception as e:
|
|
self.logger.warning(f"Failed to load memory state: {e}, creating new state")
|
|
else:
|
|
self.logger.debug(f"Memory file {self.memory_file} does not exist, creating new state")
|
|
|
|
return MemoryState(
|
|
worker_snapshots={},
|
|
stuck_jobs={},
|
|
last_updated=datetime.now()
|
|
)
|
|
|
|
def _save_memory_state(self):
|
|
"""Save memory state to disk."""
|
|
try:
|
|
with open(self.memory_file, 'wb') as f:
|
|
pickle.dump(self.memory_state, f)
|
|
except Exception as e:
|
|
self.logger.error(f"Failed to save memory state: {e}")
|
|
|
|
def _create_worker_key(self, node_id: str, worker_id: str) -> str:
|
|
"""Create unique key for worker identification."""
|
|
return f"{node_id}:{worker_id}"
|
|
|
|
def _is_worker_stuck(self, current: WorkerSnapshot, previous: WorkerSnapshot) -> bool:
|
|
"""Check if worker is stuck based on comparison with previous snapshot."""
|
|
worker_key = f"{current.node_id}:{current.worker_id}"
|
|
|
|
# Check each condition individually for detailed logging
|
|
file_same = current.file == previous.file
|
|
percentage_same = current.percentage == previous.percentage
|
|
status_same = current.status == previous.status
|
|
fps_same = current.fps == previous.fps
|
|
eta_same = current.eta == previous.eta
|
|
|
|
is_stuck = file_same and percentage_same and status_same and fps_same and eta_same
|
|
|
|
# Log detailed comparison info
|
|
self.logger.debug(f"Worker {worker_key} stuck check:")
|
|
self.logger.debug(f" File: '{current.file}' == '{previous.file}' = {file_same}")
|
|
self.logger.debug(f" Percentage: {current.percentage}% == {previous.percentage}% = {percentage_same}")
|
|
self.logger.debug(f" Status: '{current.status}' == '{previous.status}' = {status_same}")
|
|
self.logger.debug(f" FPS: {current.fps} == {previous.fps} = {fps_same}")
|
|
self.logger.debug(f" ETA: '{current.eta}' == '{previous.eta}' = {eta_same}")
|
|
self.logger.debug(f" → Result: {'STUCK' if is_stuck else 'NOT STUCK'}")
|
|
|
|
# Log INFO level when we detect changes (worker making progress)
|
|
if not is_stuck:
|
|
if not percentage_same:
|
|
self.logger.info(f"Worker {worker_key} making progress: {previous.percentage}% → {current.percentage}%")
|
|
elif not status_same:
|
|
self.logger.info(f"Worker {worker_key} status changed: '{previous.status}' → '{current.status}'")
|
|
elif not file_same:
|
|
self.logger.info(f"Worker {worker_key} file changed: '{previous.file}' → '{current.file}'")
|
|
|
|
return is_stuck
|
|
|
|
def update_workers(self, nodes_data: Dict[str, Any]) -> List[StuckJob]:
|
|
"""Update worker snapshots and detect stuck jobs."""
|
|
current_time = datetime.now()
|
|
current_workers = {}
|
|
detected_stuck_jobs = []
|
|
|
|
# Extract current worker states from nodes data
|
|
for node_id, node_data in nodes_data.items():
|
|
workers = node_data.get('workers', {})
|
|
for worker_id, worker_data in workers.items():
|
|
worker_key = self._create_worker_key(node_id, worker_id)
|
|
|
|
# Create current snapshot
|
|
current_snapshot = WorkerSnapshot(
|
|
worker_id=worker_id,
|
|
node_id=node_id,
|
|
worker_type=worker_data.get('workerType', 'unknown'),
|
|
file=worker_data.get('file', ''),
|
|
percentage=worker_data.get('percentage', -1),
|
|
status=worker_data.get('status', ''),
|
|
fps=worker_data.get('fps', 0),
|
|
eta=worker_data.get('ETA', ''),
|
|
timestamp=current_time
|
|
)
|
|
|
|
current_workers[worker_key] = current_snapshot
|
|
|
|
# Log all workers being tracked
|
|
self.logger.debug(f"Tracking worker {worker_key}: {current_snapshot.status} at {current_snapshot.percentage}% on '{current_snapshot.file}'")
|
|
|
|
# Check if worker was previously tracked
|
|
if worker_key in self.memory_state.worker_snapshots:
|
|
previous_snapshot = self.memory_state.worker_snapshots[worker_key]
|
|
|
|
# Check if worker is stuck
|
|
if self._is_worker_stuck(current_snapshot, previous_snapshot):
|
|
# Calculate how long it's been stuck
|
|
time_since_previous = (current_time - previous_snapshot.timestamp).total_seconds() / 60
|
|
self.logger.debug(f"Worker {worker_key} has been stuck for {time_since_previous:.1f} minutes since last check")
|
|
self.logger.debug(f"Worker {worker_key} checking stuck_jobs dict: {list(self.memory_state.stuck_jobs.keys())}")
|
|
|
|
if worker_key in self.memory_state.stuck_jobs:
|
|
# Already known stuck job, update duration
|
|
stuck_job = self.memory_state.stuck_jobs[worker_key]
|
|
stuck_duration = current_time - stuck_job.first_seen
|
|
stuck_job.stuck_duration_minutes = stuck_duration.total_seconds() / 60
|
|
stuck_job.worker_snapshot = current_snapshot
|
|
|
|
self.logger.debug(f"Worker {worker_key} known stuck job - duration: {stuck_job.stuck_duration_minutes:.1f} min, threshold: {self.stuck_threshold_minutes} min")
|
|
if stuck_job.stuck_duration_minutes >= self.stuck_threshold_minutes:
|
|
self.logger.debug(f"Worker {worker_key} EXCEEDS threshold - adding to detected stuck jobs")
|
|
detected_stuck_jobs.append(stuck_job)
|
|
else:
|
|
self.logger.debug(f"Worker {worker_key} below threshold - not flagging yet")
|
|
else:
|
|
# New stuck job detected - add to memory immediately to start tracking
|
|
first_seen = previous_snapshot.timestamp
|
|
stuck_duration = current_time - first_seen
|
|
stuck_duration_minutes = stuck_duration.total_seconds() / 60
|
|
|
|
self.logger.debug(f"Worker {worker_key} NEW stuck job - first_seen: {first_seen}, current: {current_time}")
|
|
self.logger.debug(f"Worker {worker_key} NEW stuck job - duration: {stuck_duration_minutes:.1f} min, threshold: {self.stuck_threshold_minutes} min")
|
|
|
|
# Create stuck job entry immediately to track duration across runs
|
|
stuck_job = StuckJob(
|
|
worker_snapshot=current_snapshot,
|
|
first_seen=first_seen,
|
|
stuck_duration_minutes=stuck_duration_minutes,
|
|
is_stuck=True
|
|
)
|
|
self.memory_state.stuck_jobs[worker_key] = stuck_job
|
|
|
|
if stuck_duration_minutes >= self.stuck_threshold_minutes:
|
|
self.logger.debug(f"Worker {worker_key} NEW stuck job EXCEEDS threshold - flagging for clearing")
|
|
detected_stuck_jobs.append(stuck_job)
|
|
else:
|
|
self.logger.debug(f"Worker {worker_key} NEW stuck job below threshold - tracking in memory")
|
|
else:
|
|
# Worker is not stuck, remove from stuck jobs if present
|
|
if worker_key in self.memory_state.stuck_jobs:
|
|
del self.memory_state.stuck_jobs[worker_key]
|
|
self.logger.info(f"Worker {worker_key} is no longer stuck")
|
|
else:
|
|
# New worker, start tracking it
|
|
self.logger.info(f"New worker detected: {worker_key} - {current_snapshot.status} at {current_snapshot.percentage}% on '{current_snapshot.file}'")
|
|
|
|
# Clean up stuck jobs for workers that no longer exist
|
|
stuck_jobs_to_remove = []
|
|
for worker_key in self.memory_state.stuck_jobs:
|
|
if worker_key not in current_workers:
|
|
stuck_jobs_to_remove.append(worker_key)
|
|
|
|
for worker_key in stuck_jobs_to_remove:
|
|
del self.memory_state.stuck_jobs[worker_key]
|
|
self.logger.info(f"Removed stuck job tracking for missing worker: {worker_key}")
|
|
|
|
# Update memory state
|
|
self.memory_state.worker_snapshots = current_workers
|
|
self.memory_state.last_updated = current_time
|
|
|
|
# Save to disk
|
|
self._save_memory_state()
|
|
|
|
return detected_stuck_jobs
|
|
|
|
def get_stuck_jobs(self) -> List[StuckJob]:
|
|
"""Get current list of stuck jobs."""
|
|
return list(self.memory_state.stuck_jobs.values())
|
|
|
|
def clear_memory(self):
|
|
"""Clear all memory state."""
|
|
self.memory_state = MemoryState(
|
|
worker_snapshots={},
|
|
stuck_jobs={},
|
|
last_updated=datetime.now()
|
|
)
|
|
self._save_memory_state()
|
|
self.logger.info("Memory state cleared")
|
|
|
|
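# --- Detection flow sketch (illustrative only) ---------------------------------
# Stuck detection compares consecutive polls, so it needs at least two runs of
# update_workers(). A rough standalone loop, assuming a hypothetical fetch_nodes()
# helper that returns the same dict shape as /api/v2/get-nodes:
#
#     detector = StuckJobDetector(memory_file="/tmp/tdarr_memory.pkl",
#                                 stuck_threshold_minutes=15)
#     while True:
#         for job in detector.update_workers(fetch_nodes()):
#             print(f"{job.worker_snapshot.worker_id} stuck for "
#                   f"{job.stuck_duration_minutes:.1f} min")
#         time.sleep(300)  # poll every 5 minutes (would require `import time`)
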
|
|
class TdarrMonitor:
|
|
def __init__(self, server_url: str, timeout: int = 30, enable_stuck_detection: bool = False,
|
|
stuck_threshold_minutes: int = 30, memory_file: str = ".claude/tmp/tdarr_memory.pkl",
|
|
discord_webhook_url: str = None, enable_discord_alerts: bool = False,
|
|
log_file: Optional[str] = None, log_level: str = "INFO", clear_hung_workers: bool = False):
|
|
"""Initialize Tdarr monitor with server URL."""
|
|
self.server_url = server_url.rstrip('/')
|
|
self.timeout = timeout
|
|
self.session = requests.Session()
|
|
self.enable_stuck_detection = enable_stuck_detection
|
|
self.enable_discord_alerts = enable_discord_alerts
|
|
self.clear_hung_workers_enabled = clear_hung_workers
|
|
|
|
# Configure logging first
|
|
self._setup_logging(log_file, log_level)
|
|
self.logger = logging.getLogger(__name__)
|
|
|
|
# Initialize stuck job detector if enabled
|
|
self.stuck_detector = None
|
|
if enable_stuck_detection:
|
|
self.stuck_detector = StuckJobDetector(memory_file, stuck_threshold_minutes)
|
|
|
|
# Initialize Discord notifier if enabled
|
|
self.discord_notifier = None
|
|
if enable_discord_alerts:
|
|
if discord_webhook_url:
|
|
self.discord_notifier = DiscordNotifier(discord_webhook_url)
|
|
else:
|
|
self.logger.warning("Discord alerts enabled but no webhook URL provided")
|
|
|
|
def _setup_logging(self, log_file: Optional[str] = None, log_level: str = "INFO"):
|
|
"""Configure logging with optional file rotation."""
|
|
# Clear any existing handlers
|
|
root_logger = logging.getLogger()
|
|
root_logger.handlers.clear()
|
|
|
|
# Set log level
|
|
level = getattr(logging, log_level.upper(), logging.INFO)
|
|
root_logger.setLevel(level)
|
|
|
|
# Create formatters
|
|
console_formatter = logging.Formatter(
|
|
'%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
|
datefmt='%Y-%m-%d %H:%M:%S'
|
|
)
|
|
|
|
# Enhanced file formatter for better readability
|
|
file_formatter = logging.Formatter(
|
|
'%(asctime)s [%(levelname)-7s] %(module)-12s | %(message)s',
|
|
datefmt='%H:%M:%S'
|
|
)
|
|
|
|
# Console handler (for interactive use)
|
|
console_handler = logging.StreamHandler(sys.stdout)
|
|
console_handler.setFormatter(console_formatter)
|
|
root_logger.addHandler(console_handler)
|
|
|
|
# File handler with rotation (if log_file specified)
|
|
if log_file:
|
|
# Ensure log directory exists
|
|
log_dir = os.path.dirname(log_file)
|
|
if log_dir:
|
|
os.makedirs(log_dir, exist_ok=True)
|
|
|
|
# Rotating file handler: 10MB max, keep 5 backup files
|
|
file_handler = logging.handlers.RotatingFileHandler(
|
|
log_file,
|
|
maxBytes=10 * 1024 * 1024, # 10MB
|
|
backupCount=5,
|
|
encoding='utf-8'
|
|
)
|
|
file_handler.setFormatter(file_formatter)
|
|
root_logger.addHandler(file_handler)
|
|
|
|
def _make_request(self, endpoint: str) -> Optional[Dict[str, Any]]:
|
|
"""Make HTTP request to Tdarr API endpoint."""
|
|
url = urljoin(self.server_url, endpoint)
|
|
|
|
try:
|
|
response = self.session.get(url, timeout=self.timeout)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
self.logger.error(f"Request failed for {url}: {e}")
|
|
return None
|
|
except json.JSONDecodeError as e:
|
|
self.logger.error(f"JSON decode failed for {url}: {e}")
|
|
return None
|
|
|
|
def clear_hung_workers(self, stuck_jobs: Optional[List[StuckJob]] = None) -> bool:
|
|
"""Clear hung workers via Tdarr API using kill-worker endpoint.
|
|
|
|
Args:
|
|
stuck_jobs: List of StuckJob objects to clear. Each contains worker and node information.
|
|
|
|
Returns:
|
|
True if all workers cleared successfully, False otherwise
|
|
"""
|
|
if not stuck_jobs:
|
|
self.logger.info("No stuck jobs provided for clearing hung workers")
|
|
return True
|
|
|
|
success_count = 0
|
|
total_count = len(stuck_jobs)
|
|
|
|
for stuck_job in stuck_jobs:
|
|
worker_snapshot = stuck_job.worker_snapshot
|
|
try:
|
|
# Use the kill-worker endpoint with correct payload format
|
|
endpoint = '/api/v2/kill-worker'
|
|
payload = {
|
|
"data": {
|
|
"nodeID": worker_snapshot.node_id,
|
|
"workerID": worker_snapshot.worker_id
|
|
}
|
|
}
|
|
|
|
url = urljoin(self.server_url, endpoint)
|
|
response = self.session.post(url, json=payload, timeout=self.timeout)
|
|
response.raise_for_status()
|
|
|
|
self.logger.info(f"Successfully killed hung worker: {worker_snapshot.node_id}:{worker_snapshot.worker_id}")
|
|
success_count += 1
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
self.logger.error(f"Failed to kill worker {worker_snapshot.node_id}:{worker_snapshot.worker_id}: {e}")
|
|
except Exception as e:
|
|
self.logger.error(f"Unexpected error killing worker {worker_snapshot.node_id}:{worker_snapshot.worker_id}: {e}")
|
|
|
|
self.logger.info(f"Cleared {success_count}/{total_count} hung workers")
|
|
return success_count == total_count
|
|
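    # For reference, clear_hung_workers() above posts a JSON body of this shape
    # to /api/v2/kill-worker (values below are placeholders):
    #
    #     {"data": {"nodeID": "node-abc", "workerID": "worker-1"}}
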
|
|
def get_server_status(self) -> ServerStatus:
|
|
"""Get overall server status and configuration."""
|
|
timestamp = datetime.now().isoformat()
|
|
|
|
# Try to get server info from API
|
|
data = self._make_request('/api/v2/status')
|
|
if data:
|
|
server_status = data.get('status', 'unknown')
|
|
self.logger.info(f"Server check completed: status={server_status}, version={data.get('version', 'unknown')}")
|
|
return ServerStatus(
|
|
timestamp=timestamp,
|
|
server_url=self.server_url,
|
|
status=server_status,
|
|
version=data.get('version'),
|
|
uptime=data.get('uptime')
|
|
)
|
|
else:
|
|
self.logger.error("Server check failed: Unable to connect to Tdarr server")
|
|
return ServerStatus(
|
|
timestamp=timestamp,
|
|
server_url=self.server_url,
|
|
status='offline',
|
|
error='Unable to connect to Tdarr server'
|
|
)
|
|
|
|
def get_queue_status(self) -> QueueStatus:
|
|
"""Get transcoding queue status and statistics."""
|
|
timestamp = datetime.now().isoformat()
|
|
|
|
# Get queue information
|
|
data = self._make_request('/api/v2/get-queue')
|
|
if data:
|
|
queue_data = data.get('queue', [])
|
|
|
|
# Calculate queue statistics
|
|
total_files = len(queue_data)
|
|
queued_files = len([f for f in queue_data if f.get('status') == 'Queued'])
|
|
processing_files = len([f for f in queue_data if f.get('status') == 'Processing'])
|
|
completed_files = len([f for f in queue_data if f.get('status') == 'Completed'])
|
|
|
|
queue_stats = QueueStats(
|
|
total_files=total_files,
|
|
queued=queued_files,
|
|
processing=processing_files,
|
|
completed=completed_files,
|
|
queue_items=queue_data[:10] # First 10 items for details
|
|
)
|
|
|
|
return QueueStatus(
|
|
timestamp=timestamp,
|
|
queue_stats=queue_stats
|
|
)
|
|
else:
|
|
return QueueStatus(
|
|
timestamp=timestamp,
|
|
error='Unable to fetch queue data'
|
|
)
|
|
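    # --- Expected /api/v2/get-queue shape (illustrative, inferred from the
    #     status values checked above; not taken from Tdarr documentation) -----
    #
    #     {"queue": [{"file": "/media/a.mkv", "status": "Queued"},
    #                {"file": "/media/b.mkv", "status": "Processing"}]}
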
|
|
def get_node_status(self) -> NodeStatus:
|
|
"""Get status of all connected nodes."""
|
|
timestamp = datetime.now().isoformat()
|
|
|
|
# Get nodes information (using correct endpoint)
|
|
data = self._make_request('/api/v2/get-nodes')
|
|
if data:
|
|
# Handle the actual data structure returned by Tdarr API
|
|
nodes_dict = data if isinstance(data, dict) else {}
|
|
nodes = []
|
|
|
|
# Process node information
|
|
online_nodes = []
|
|
offline_nodes = []
|
|
|
|
for node_id, node_data in nodes_dict.items():
|
|
node_info = NodeInfo(
|
|
id=node_id,
|
|
nodeName=node_data.get('nodeName'),
|
|
status='online', # Assume online if in response
|
|
lastSeen=None,
|
|
version=node_data.get('config', {}).get('version'),
|
|
platform=node_data.get('config', {}).get('platform_arch_isdocker'),
|
|
workers={
|
|
'cpu': len([w for w in node_data.get('workers', {}).values() if 'cpu' in w.get('workerType', '').lower()]),
|
|
'gpu': len([w for w in node_data.get('workers', {}).values() if 'gpu' in w.get('workerType', '').lower()])
|
|
},
|
|
processing=list(node_data.get('workers', {}).values())
|
|
)
|
|
|
|
online_nodes.append(node_info)
|
|
nodes.append(node_data)
|
|
|
|
# Check for stuck jobs if detection is enabled
|
|
stuck_jobs = []
|
|
if self.stuck_detector:
|
|
try:
|
|
stuck_jobs = self.stuck_detector.update_workers(nodes_dict)
|
|
if stuck_jobs:
|
|
self.logger.warning(f"Detected {len(stuck_jobs)} stuck jobs")
|
|
for stuck_job in stuck_jobs:
|
|
self.logger.warning(
|
|
f"Stuck job: {stuck_job.worker_snapshot.node_id}:{stuck_job.worker_snapshot.worker_id} "
|
|
f"on file '{stuck_job.worker_snapshot.file}' "
|
|
f"at {stuck_job.worker_snapshot.percentage}% for {stuck_job.stuck_duration_minutes:.1f} minutes"
|
|
)
|
|
|
|
# Clear hung workers if enabled
|
|
if self.clear_hung_workers_enabled:
|
|
try:
|
|
clear_success = self.clear_hung_workers(stuck_jobs)
|
|
if clear_success:
|
|
self.logger.info(f"Successfully cleared {len(stuck_jobs)} hung workers")
|
|
else:
|
|
self.logger.warning("Some hung workers could not be cleared")
|
|
except Exception as e:
|
|
self.logger.error(f"Error clearing hung workers: {e}")
|
|
|
|
# Send Discord notification for stuck jobs
|
|
if self.discord_notifier:
|
|
try:
|
|
self.discord_notifier.send_stuck_job_alert(stuck_jobs)
|
|
except Exception as e:
|
|
self.logger.error(f"Failed to send Discord stuck job alert: {e}")
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error in stuck job detection: {e}")
|
|
|
|
node_summary = NodeSummary(
|
|
total_nodes=len(nodes),
|
|
online_nodes=len(online_nodes),
|
|
offline_nodes=len(offline_nodes),
|
|
online_details=online_nodes,
|
|
offline_details=offline_nodes
|
|
)
|
|
|
|
# Log successful node check with summary
|
|
if stuck_jobs:
|
|
self.logger.info(f"Node check completed: {len(nodes)} nodes online, {len(stuck_jobs)} stuck jobs detected")
|
|
else:
|
|
self.logger.info(f"Node check completed: {len(nodes)} nodes online, no stuck jobs detected")
|
|
|
|
return NodeStatus(
|
|
timestamp=timestamp,
|
|
nodes=nodes,
|
|
node_summary=node_summary,
|
|
stuck_jobs=stuck_jobs
|
|
)
|
|
else:
|
|
self.logger.error("Node check failed: Unable to fetch node data")
|
|
return NodeStatus(
|
|
timestamp=timestamp,
|
|
nodes=[],
|
|
error='Unable to fetch node data'
|
|
)
|
|
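    # --- Expected /api/v2/get-nodes shape (illustrative, inferred from the
    #     parsing in get_node_status above; not taken from Tdarr docs) ---------
    #
    #     {"node-abc": {
    #         "nodeName": "gpu-node",
    #         "config": {"version": "2.x", "platform_arch_isdocker": "linux_x64_docker"},
    #         "workers": {
    #             "worker-1": {"workerType": "transcodegpu", "file": "/media/a.mkv",
    #                          "percentage": 42.0, "status": "Transcoding",
    #                          "fps": 30, "ETA": "00:12:00"}}}}
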
|
|
def get_library_status(self) -> LibraryStatus:
|
|
"""Get library scan status and file statistics."""
|
|
timestamp = datetime.now().isoformat()
|
|
|
|
# Get library information
|
|
data = self._make_request('/api/v2/get-libraries')
|
|
if data:
|
|
libraries = data.get('libraries', [])
|
|
|
|
library_stats = []
|
|
total_files = 0
|
|
|
|
for lib in libraries:
|
|
lib_info = LibraryInfo(
|
|
name=lib.get('name'),
|
|
path=lib.get('path'),
|
|
file_count=lib.get('totalFiles', 0),
|
|
scan_progress=lib.get('scanProgress', 0),
|
|
last_scan=lib.get('lastScan'),
|
|
is_scanning=lib.get('isScanning', False)
|
|
)
|
|
library_stats.append(lib_info)
|
|
total_files += lib_info.file_count
|
|
|
|
scan_status = ScanStatus(
|
|
total_libraries=len(libraries),
|
|
total_files=total_files,
|
|
scanning_libraries=len([l for l in library_stats if l.is_scanning])
|
|
)
|
|
|
|
return LibraryStatus(
|
|
timestamp=timestamp,
|
|
libraries=library_stats,
|
|
scan_status=scan_status
|
|
)
|
|
else:
|
|
return LibraryStatus(
|
|
timestamp=timestamp,
|
|
libraries=[],
|
|
error='Unable to fetch library data'
|
|
)
|
|
|
|
def get_statistics(self) -> StatisticsStatus:
|
|
"""Get overall Tdarr statistics and health metrics."""
|
|
timestamp = datetime.now().isoformat()
|
|
|
|
# Get statistics
|
|
data = self._make_request('/api/v2/get-stats')
|
|
if data:
|
|
stats = data.get('stats', {})
|
|
statistics = Statistics(
|
|
total_transcodes=stats.get('totalTranscodes', 0),
|
|
space_saved=stats.get('spaceSaved', 0),
|
|
total_files_processed=stats.get('totalFilesProcessed', 0),
|
|
failed_transcodes=stats.get('failedTranscodes', 0),
|
|
processing_speed=stats.get('processingSpeed', 0),
|
|
eta=stats.get('eta')
|
|
)
|
|
|
|
return StatisticsStatus(
|
|
timestamp=timestamp,
|
|
statistics=statistics
|
|
)
|
|
else:
|
|
return StatisticsStatus(
|
|
timestamp=timestamp,
|
|
error='Unable to fetch statistics'
|
|
)
|
|
|
|
def health_check(self) -> HealthStatus:
|
|
"""Perform comprehensive health check."""
|
|
timestamp = datetime.now().isoformat()
|
|
|
|
# Server connectivity
|
|
server_status = self.get_server_status()
|
|
server_check = HealthCheck(
|
|
status=server_status.status,
|
|
healthy=server_status.status == 'good'
|
|
)
|
|
|
|
# Node connectivity
|
|
node_status = self.get_node_status()
|
|
nodes_healthy = (
|
|
node_status.node_summary.online_nodes > 0 if node_status.node_summary else False
|
|
) and not node_status.error
|
|
|
|
nodes_check = HealthCheck(
|
|
status='online' if nodes_healthy else 'offline',
|
|
healthy=nodes_healthy,
|
|
online_count=node_status.node_summary.online_nodes if node_status.node_summary else 0,
|
|
total_count=node_status.node_summary.total_nodes if node_status.node_summary else 0
|
|
)
|
|
|
|
checks = {
|
|
'server': server_check,
|
|
'nodes': nodes_check
|
|
}
|
|
|
|
# Determine overall health
|
|
all_checks_healthy = all(check.healthy for check in checks.values())
|
|
overall_status = 'healthy' if all_checks_healthy else 'unhealthy'
|
|
|
|
return HealthStatus(
|
|
timestamp=timestamp,
|
|
overall_status=overall_status,
|
|
checks=checks
|
|
)
|
|
|
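# --- Example: using TdarrMonitor as a library (illustrative only) ---------------
# A minimal sketch, reusing the server URL from the usage examples above:
#
#     monitor = TdarrMonitor("http://10.10.0.43:8265")
#     health = monitor.health_check()
#     if health.overall_status != "healthy":
#         for name, check in health.checks.items():
#             if not check.healthy:
#                 print(f"{name} check unhealthy: {asdict(check)}")
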
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description='Monitor Tdarr server via API')
|
|
parser.add_argument('--server', required=True, help='Tdarr server URL (e.g., http://10.10.0.43:8265)')
|
|
parser.add_argument('--check', choices=['all', 'status', 'queue', 'nodes', 'libraries', 'stats', 'health'],
|
|
default='health', help='Type of check to perform')
|
|
parser.add_argument('--timeout', type=int, default=30, help='Request timeout in seconds')
|
|
parser.add_argument('--output', choices=['json', 'pretty'], default='pretty', help='Output format')
|
|
parser.add_argument('--verbose', action='store_true', help='Enable verbose logging')
|
|
parser.add_argument('--detect-stuck', action='store_true', help='Enable stuck job detection')
|
|
parser.add_argument('--stuck-threshold', type=int, default=30, help='Minutes before job is considered stuck (default: 30)')
|
|
parser.add_argument('--memory-file', default='/mnt/NV2/Development/claude-home/logs/tdarr_memory.pkl', help='Path to memory state file')
|
|
parser.add_argument('--clear-memory', action='store_true', help='Clear memory state and exit')
|
|
parser.add_argument('--discord-webhook',
|
|
default='https://discord.com/api/webhooks/1404105821549498398/y2Ud1RK9rzFjv58xbypUfQNe3jrL7ZUq1FkQHa4_dfOHm2ylp93z0f4tY0O8Z-vQgKhD',
|
|
help='Discord webhook URL for notifications (default: configured webhook)')
|
|
parser.add_argument('--discord-alerts', action='store_true', help='Enable Discord alerts for stuck jobs and system status')
|
|
parser.add_argument('--discord-test', action='store_true', help='Send test Discord message and exit')
|
|
parser.add_argument('--log-file', default='/mnt/NV2/Development/claude-home/logs/tdarr_monitor.log',
|
|
help='Path to log file with rotation (default: /mnt/NV2/Development/claude-home/logs/tdarr_monitor.log)')
|
|
parser.add_argument('--log-level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], default='INFO',
|
|
help='Logging level (default: INFO)')
|
|
parser.add_argument('--no-log-file', action='store_true', help='Disable file logging, console only')
|
|
parser.add_argument('--clear-hung-workers', action='store_true', help='Clear hung workers via API call when stuck jobs are detected')
|
|
|
|
args = parser.parse_args()
|
|
|
|
if args.verbose:
|
|
logging.getLogger().setLevel(logging.DEBUG)
|
|
|
|
# Handle clear memory command
|
|
if args.clear_memory:
|
|
if os.path.exists(args.memory_file):
|
|
os.remove(args.memory_file)
|
|
print(f"Memory state cleared: {args.memory_file}")
|
|
else:
|
|
print(f"Memory file does not exist: {args.memory_file}")
|
|
sys.exit(0)
|
|
|
|
# Handle Discord test command
|
|
if args.discord_test:
|
|
print("Sending Discord test messages...")
|
|
notifier = DiscordNotifier(args.discord_webhook)
|
|
|
|
# Test content message
|
|
content_success = notifier.send_content_message(
|
|
"🧪 **Tdarr Monitor Test** - Content message working correctly!"
|
|
)
|
|
|
|
# Test embed message
|
|
test_fields = [
|
|
DiscordEmbedField("Test Field 1", "This is a test value", True),
|
|
DiscordEmbedField("Test Field 2", "Another test value", True),
|
|
]
|
|
|
|
embed_success = notifier.send_embed_message(
|
|
title="🧪 Tdarr Monitor Test",
|
|
description="This is a test embed message to verify Discord integration is working correctly.",
|
|
color=0x00ff00, # Green
|
|
fields=test_fields
|
|
)
|
|
|
|
if content_success and embed_success:
|
|
print("✅ Discord test successful! Both content and embed messages sent.")
|
|
sys.exit(0)
|
|
else:
|
|
print("❌ Discord test failed. Check webhook URL and permissions.")
|
|
sys.exit(1)
|
|
|
|
# Initialize monitor
|
|
log_file = None if args.no_log_file else args.log_file
|
|
monitor = TdarrMonitor(
|
|
args.server,
|
|
args.timeout,
|
|
enable_stuck_detection=args.detect_stuck,
|
|
stuck_threshold_minutes=args.stuck_threshold,
|
|
memory_file=args.memory_file,
|
|
discord_webhook_url=args.discord_webhook,
|
|
enable_discord_alerts=args.discord_alerts,
|
|
log_file=log_file,
|
|
log_level=args.log_level,
|
|
clear_hung_workers=args.clear_hung_workers
|
|
)
|
|
|
|
# Perform requested check
|
|
monitor.logger.info(f"Starting Tdarr monitoring check: {args.check}, stuck_detection={'enabled' if args.detect_stuck else 'disabled'}, clear_workers={'enabled' if args.clear_hung_workers else 'disabled'}")
|
|
|
|
result = None
|
|
if args.check == 'all':
|
|
result = {
|
|
'server_status': monitor.get_server_status(),
|
|
'node_status': monitor.get_node_status()
|
|
}
|
|
elif args.check == 'status':
|
|
result = monitor.get_server_status()
|
|
elif args.check == 'queue':
|
|
result = monitor.get_queue_status()
|
|
elif args.check == 'nodes':
|
|
result = monitor.get_node_status()
|
|
elif args.check == 'libraries':
|
|
result = monitor.get_library_status()
|
|
elif args.check == 'stats':
|
|
result = monitor.get_statistics()
|
|
elif args.check == 'health':
|
|
result = monitor.health_check()
|
|
|
|
# Output results
|
|
if args.output == 'json':
|
|
# Convert dataclasses to dictionaries for JSON serialization
|
|
if args.check == 'all':
|
|
json_result = {}
|
|
for key, value in result.items():
|
|
json_result[key] = asdict(value)
|
|
print(json.dumps(json_result, indent=2))
|
|
else:
|
|
print(json.dumps(asdict(result), indent=2))
|
|
else:
|
|
# Pretty print format
|
|
print(f"=== Tdarr Monitor Results - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ===")
|
|
|
|
if args.check == 'health' or (hasattr(result, 'overall_status') and result.overall_status):
|
|
health = result if hasattr(result, 'overall_status') else None
|
|
if health:
|
|
status = health.overall_status
|
|
print(f"Overall Status: {status.upper()}")
|
|
|
|
if health.checks:
|
|
print("\nHealth Checks:")
|
|
for check_name, check_data in health.checks.items():
|
|
status_icon = "✓" if check_data.healthy else "✗"
|
|
print(f" {status_icon} {check_name.title()}: {asdict(check_data)}")
|
|
|
|
# Display stuck jobs if present
|
|
if args.detect_stuck:
|
|
if hasattr(result, 'stuck_jobs') and result.stuck_jobs:
|
|
print(f"\n=== STUCK JOBS DETECTED ({len(result.stuck_jobs)}) ===")
|
|
for stuck_job in result.stuck_jobs:
|
|
print(f"🚨 {stuck_job.worker_snapshot.node_id}:{stuck_job.worker_snapshot.worker_id}")
|
|
print(f" File: {stuck_job.worker_snapshot.file}")
|
|
print(f" Progress: {stuck_job.worker_snapshot.percentage}%")
|
|
print(f" Status: {stuck_job.worker_snapshot.status}")
|
|
print(f" Stuck for: {stuck_job.stuck_duration_minutes:.1f} minutes")
|
|
print()
|
|
elif args.check in ['nodes', 'all']:
|
|
# Check all results for stuck jobs if 'all' is selected
|
|
stuck_found = False
|
|
if args.check == 'all' and isinstance(result, dict):
|
|
for section, data in result.items():
|
|
if hasattr(data, 'stuck_jobs') and data.stuck_jobs:
|
|
if not stuck_found:
|
|
print(f"\n=== STUCK JOBS DETECTED ===")
|
|
stuck_found = True
|
|
for stuck_job in data.stuck_jobs:
|
|
print(f"🚨 {stuck_job.worker_snapshot.node_id}:{stuck_job.worker_snapshot.worker_id}")
|
|
print(f" File: {stuck_job.worker_snapshot.file}")
|
|
print(f" Progress: {stuck_job.worker_snapshot.percentage}%")
|
|
print(f" Status: {stuck_job.worker_snapshot.status}")
|
|
print(f" Stuck for: {stuck_job.stuck_duration_minutes:.1f} minutes")
|
|
print()
|
|
|
|
if not stuck_found:
|
|
print(f"\n✅ No stuck jobs detected (threshold: {args.stuck_threshold} minutes)")
|
|
|
|
if args.check == 'all':
|
|
for section, data in result.items():
|
|
print(f"\n=== {section.replace('_', ' ').title()} ===")
|
|
# Don't print stuck_jobs in JSON format as we already displayed them above
|
|
if hasattr(data, 'stuck_jobs'):
|
|
data_dict = asdict(data)
|
|
data_dict.pop('stuck_jobs', None)
|
|
print(json.dumps(data_dict, indent=2))
|
|
else:
|
|
print(json.dumps(asdict(data), indent=2))
|
|
elif args.check != 'health':
|
|
# Don't print stuck_jobs in JSON format as we already displayed them above
|
|
if hasattr(result, 'stuck_jobs'):
|
|
result_dict = asdict(result)
|
|
result_dict.pop('stuck_jobs', None)
|
|
print(json.dumps(result_dict, indent=2))
|
|
else:
|
|
print(json.dumps(asdict(result), indent=2))
|
|
|
|
# Exit with appropriate code
|
|
if result:
|
|
# Check for unhealthy status in health check
|
|
if isinstance(result, HealthStatus) and result.overall_status == 'unhealthy':
|
|
sys.exit(1)
|
|
# Check for errors in individual status objects (all status classes except HealthStatus have error attribute)
|
|
elif (isinstance(result, (ServerStatus, QueueStatus, NodeStatus, LibraryStatus, StatisticsStatus))
|
|
and result.error):
|
|
sys.exit(1)
|
|
# Check for errors in 'all' results
|
|
elif isinstance(result, dict):
|
|
for status_obj in result.values():
|
|
if (isinstance(status_obj, (ServerStatus, QueueStatus, NodeStatus, LibraryStatus, StatisticsStatus))
|
|
and status_obj.error):
|
|
sys.exit(1)
|
|
|
|
sys.exit(0)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main() |