diff --git a/.gitignore b/.gitignore index dbeed45..4724d79 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,6 @@ __pycache__ # Large binary files *.zip + +# Art assets (managed separately) +rpg-art/ diff --git a/monitoring/recovered-lxc300/server-diagnostics/SKILL.md b/monitoring/recovered-lxc300/server-diagnostics/SKILL.md new file mode 100644 index 0000000..29c53a5 --- /dev/null +++ b/monitoring/recovered-lxc300/server-diagnostics/SKILL.md @@ -0,0 +1,177 @@ +--- +name: server-diagnostics +description: | + Automated server troubleshooting for Docker containers and system health. + Provides SSH-based diagnostics, log reading, metrics collection, and low-risk + remediation. USE WHEN N8N triggers troubleshooting, container issues detected, + or system health checks needed. +--- + +# Server Diagnostics - Automated Troubleshooting + +## When to Activate This Skill +- N8N triggers with error context +- "diagnose container X", "check docker status" +- "read logs from server", "check disk usage" +- "troubleshoot server issue" +- Any automated health check response + +## Quick Start + +### Check All Containers +```bash +python ~/.claude/skills/server-diagnostics/client.py docker-status paper-dynasty +``` + +### Quick Health Check (Docker + System Metrics) +```bash +python ~/.claude/skills/server-diagnostics/client.py health paper-dynasty +``` + +### Get Container Logs +```bash +python ~/.claude/skills/server-diagnostics/client.py docker-logs paper-dynasty paper-dynasty_discord-app_1 --lines 200 +``` + +### Restart a Container +```bash +python ~/.claude/skills/server-diagnostics/client.py docker-restart paper-dynasty paper-dynasty_discord-app_1 +``` + +### System Metrics +```bash +python ~/.claude/skills/server-diagnostics/client.py metrics paper-dynasty --type all +python ~/.claude/skills/server-diagnostics/client.py metrics paper-dynasty --type disk +``` + +### Run Diagnostic Command +```bash +python ~/.claude/skills/server-diagnostics/client.py diagnostic 
paper-dynasty disk_usage
+python ~/.claude/skills/server-diagnostics/client.py diagnostic paper-dynasty memory_usage
+```
+
+## Troubleshooting Workflow
+
+When an issue is reported:
+
+1. **Quick Health Check** - Get overview of containers and system state
+   ```bash
+   python ~/.claude/skills/server-diagnostics/client.py health paper-dynasty
+   ```
+
+2. **Check MemoryGraph** - Recall similar issues
+   ```bash
+   python ~/.claude/skills/memorygraph/client.py recall "docker container error"
+   ```
+
+3. **Get Container Logs** - Look for errors (the container name is a required argument)
+   ```bash
+   python ~/.claude/skills/server-diagnostics/client.py docker-logs paper-dynasty paper-dynasty_discord-app_1 --lines 500 --filter error
+   ```
+
+4. **Remediate if Safe** - Restart if allowed (the container name is a required argument)
+   ```bash
+   python ~/.claude/skills/server-diagnostics/client.py docker-restart paper-dynasty paper-dynasty_discord-app_1
+   ```
+
+5. **Store Solution** - Save to MemoryGraph if resolved
+   ```bash
+   python ~/.claude/skills/memorygraph/client.py store \
+     --type solution \
+     --title "Fixed issue" \
+     --content "Description of problem and solution" \
+     --tags "docker,paper-dynasty,troubleshooting" \
+     --importance 0.7
+   ```
+
+## Server Inventory
+
+| Server | IP | SSH User | Description |
+|--------|-----|----------|-------------|
+| paper-dynasty | 10.10.0.88 | cal | Paper Dynasty Discord bots and services |
+
+## Monitored Containers
+
+| Container | Critical | Restart Allowed | Description |
+|-----------|----------|-----------------|-------------|
+| paper-dynasty_discord-app_1 | Yes | Yes | Paper Dynasty Discord bot |
+| paper-dynasty_db_1 | Yes | Yes | PostgreSQL database |
+| paper-dynasty_adminer_1 | No | Yes | Database admin UI |
+| sba-website_sba-web_1 | Yes | Yes | SBA website |
+| sba-ghost_sba-ghost_1 | No | Yes | Ghost CMS |
+
+## Available Diagnostic Commands
+
+- `disk_usage` - df -h
+- `memory_usage` - free -h
+- `cpu_usage` - top -bn1 | head -20
+- `cpu_load` - uptime
+- `process_list` - ps aux --sort=-%mem | head -20
+- `network_status` - ss -tuln
+- `docker_ps` - 
docker ps -a (formatted) +- `docker_stats` - docker stats --no-stream +- `journal_errors` - journalctl -p err -n 50 + +## Security Constraints + +### DENIED Patterns (Will Be Rejected) +- rm -rf, rm -r / +- dd if=, mkfs +- shutdown, reboot +- systemctl stop +- chmod 777 +- wget|sh, curl|sh + +### Container Restart Rules +- Only containers in config.yaml with restart_allowed: true +- N8N container restart is NEVER allowed (it triggers us) + +## MemoryGraph Integration + +Before troubleshooting, check for known solutions: +```bash +python ~/.claude/skills/memorygraph/client.py recall "docker paper-dynasty" +``` + +After resolving, store the pattern: +```bash +python ~/.claude/skills/memorygraph/client.py store \ + --type solution \ + --title "Brief description" \ + --content "Full explanation..." \ + --tags "docker,paper-dynasty,fix" \ + --importance 0.7 +``` + +## Common Issues and Solutions + +### Container Not Running +1. Check logs for crash reason +2. Check disk space and memory +3. Attempt restart if allowed +4. Escalate if restart fails + +### High Memory Usage +1. Check which container is consuming +2. Review docker stats +3. Check for memory leaks in logs +4. Consider container restart + +### Disk Space Low +1. Run disk_usage diagnostic +2. Check docker system df +3. Consider log rotation +4. 
Alert user for cleanup + +## Output Format + +All commands return JSON: +```json +{ + "success": true, + "stdout": "...", + "stderr": "...", + "returncode": 0, + "data": {...} // Parsed data if applicable +} +``` diff --git a/monitoring/recovered-lxc300/server-diagnostics/client.py b/monitoring/recovered-lxc300/server-diagnostics/client.py new file mode 100644 index 0000000..ff42489 --- /dev/null +++ b/monitoring/recovered-lxc300/server-diagnostics/client.py @@ -0,0 +1,443 @@ +#!/usr/bin/env python3 +""" +Server Diagnostics Client Library +Provides SSH-based diagnostics for homelab troubleshooting +""" + +import json +import subprocess +from pathlib import Path +from typing import Any, Optional, List, Dict +import yaml + + +class ServerDiagnostics: + """ + Main diagnostic client for server troubleshooting. + + Connects to servers via SSH and executes whitelisted diagnostic + commands. Enforces security constraints from config.yaml. + """ + + def __init__(self, config_path: Optional[str] = None): + """ + Initialize with configuration. + + Args: + config_path: Path to config.yaml. Defaults to same directory. 
+ """ + if config_path is None: + config_path = Path(__file__).parent / "config.yaml" + self.config = self._load_config(config_path) + self.servers = self.config.get("servers", {}) + self.containers = self.config.get("docker_containers", []) + self.allowed_commands = self.config.get("diagnostic_commands", {}) + self.remediation_commands = self.config.get("remediation_commands", {}) + self.denied_patterns = self.config.get("denied_patterns", []) + + def _load_config(self, path) -> dict: + """Load YAML configuration.""" + with open(path) as f: + return yaml.safe_load(f) + + def _validate_command(self, command: str) -> bool: + """Check command against deny list.""" + for pattern in self.denied_patterns: + if pattern in command: + raise SecurityError(f"Command contains denied pattern: {pattern}") + return True + + def _ssh_exec(self, server: str, command: str) -> dict: + """ + Execute command on remote server via SSH. + + Returns: + dict with stdout, stderr, returncode + """ + self._validate_command(command) + + server_config = self.servers.get(server) + if not server_config: + raise ValueError(f"Unknown server: {server}") + + ssh_key = Path(server_config["ssh_key"]).expanduser() + ssh_user = server_config["ssh_user"] + hostname = server_config["hostname"] + + ssh_cmd = [ + "ssh", + "-i", + str(ssh_key), + "-o", + "StrictHostKeyChecking=no", + "-o", + "ConnectTimeout=10", + f"{ssh_user}@{hostname}", + command, + ] + + result = subprocess.run(ssh_cmd, capture_output=True, text=True, timeout=60) + + return { + "stdout": result.stdout, + "stderr": result.stderr, + "returncode": result.returncode, + "success": result.returncode == 0, + } + + # === Docker Operations === + + def get_docker_status(self, server: str, container: Optional[str] = None) -> dict: + """ + Get Docker container status. 
+ + Args: + server: Server identifier from config + container: Specific container name (optional, all if not specified) + + Returns: + dict with container statuses + """ + if container: + cmd = "docker inspect --format '{{json .State}}' " + container + result = self._ssh_exec(server, cmd) + if result["success"]: + try: + result["data"] = json.loads(result["stdout"]) + except json.JSONDecodeError: + result["data"] = None + else: + # Use Go template format for Docker 20.10 compatibility + # Format: Name|Status|State|Ports + cmd = "docker ps -a --format '{{.Names}}|{{.Status}}|{{.State}}|{{.Ports}}'" + result = self._ssh_exec(server, cmd) + if result["success"]: + containers = [] + for line in result["stdout"].strip().split("\n"): + if line: + parts = line.split("|") + if len(parts) >= 3: + containers.append( + { + "Names": parts[0], + "Status": parts[1], + "State": parts[2], + "Ports": parts[3] if len(parts) > 3 else "", + } + ) + result["data"] = containers + + return result + + def docker_logs( + self, + server: str, + container: str, + lines: int = 100, + log_filter: Optional[str] = None, + ) -> dict: + """ + Get Docker container logs. + + Args: + server: Server identifier + container: Container name + lines: Number of lines to retrieve + log_filter: Optional grep filter pattern + + Returns: + dict with log output + """ + cmd = f"docker logs --tail {lines} {container} 2>&1" + if log_filter: + cmd += f" | grep -i '{log_filter}'" + + return self._ssh_exec(server, cmd) + + def docker_restart(self, server: str, container: str) -> dict: + """ + Restart a Docker container (low-risk remediation). 
+ + Args: + server: Server identifier + container: Container name + + Returns: + dict with operation result + """ + # Check if container is allowed to be restarted + container_config = next( + (c for c in self.containers if c["name"] == container), None + ) + + if not container_config: + return { + "success": False, + "error": f"Container {container} not in monitored list", + } + + if not container_config.get("restart_allowed", False): + return { + "success": False, + "error": f"Container {container} restart not permitted", + } + + cmd = f"docker restart {container}" + result = self._ssh_exec(server, cmd) + result["action"] = "docker_restart" + result["container"] = container + + return result + + # === System Diagnostics === + + def get_metrics(self, server: str, metric_type: str = "all") -> dict: + """ + Get system metrics from server. + + Args: + server: Server identifier + metric_type: Type of metrics (cpu, memory, disk, network, all) + + Returns: + dict with metric data + """ + metrics = {} + + if metric_type in ("cpu", "all"): + result = self._ssh_exec(server, self.allowed_commands["cpu_usage"]) + metrics["cpu"] = result + + if metric_type in ("memory", "all"): + result = self._ssh_exec(server, self.allowed_commands["memory_usage"]) + metrics["memory"] = result + + if metric_type in ("disk", "all"): + result = self._ssh_exec(server, self.allowed_commands["disk_usage"]) + metrics["disk"] = result + + if metric_type in ("network", "all"): + result = self._ssh_exec(server, self.allowed_commands["network_status"]) + metrics["network"] = result + + return {"server": server, "metrics": metrics} + + def read_logs( + self, + server: str, + log_type: str, + lines: int = 100, + log_filter: Optional[str] = None, + custom_path: Optional[str] = None, + ) -> dict: + """ + Read logs from server. 
+ + Args: + server: Server identifier + log_type: Type of log (system, docker, application, custom) + lines: Number of lines + log_filter: Optional grep pattern + custom_path: Path for custom log type + + Returns: + dict with log content + """ + log_paths = { + "system": "/var/log/syslog", + "docker": "/var/log/docker.log", + "application": "/var/log/application.log", + } + + path = custom_path if log_type == "custom" else log_paths.get(log_type) + + if not path: + return {"success": False, "error": f"Unknown log type: {log_type}"} + + cmd = f"tail -n {lines} {path}" + if log_filter: + cmd += f" | grep -i '{log_filter}'" + + return self._ssh_exec(server, cmd) + + def run_diagnostic( + self, server: str, command: str, params: Optional[dict] = None + ) -> dict: + """ + Run a whitelisted diagnostic command. + + Args: + server: Server identifier + command: Command key from config whitelist + params: Optional parameters to substitute + + Returns: + dict with command output + """ + if command not in self.allowed_commands: + return {"success": False, "error": f"Command '{command}' not in whitelist"} + + cmd = self.allowed_commands[command] + + # Substitute parameters if provided + if params: + for key, value in params.items(): + cmd = cmd.replace(f"{{{key}}}", str(value)) + + return self._ssh_exec(server, cmd) + + # === Convenience Methods === + + def quick_health_check(self, server: str) -> dict: + """ + Perform quick health check on server. + + Returns summary of Docker containers, disk, and memory. 
+ """ + health = { + "server": server, + "docker": self.get_docker_status(server), + "metrics": self.get_metrics(server, "all"), + "healthy": True, + "issues": [], + } + + # Check for stopped containers + if health["docker"].get("data"): + for container in health["docker"]["data"]: + status = container.get("State", container.get("Status", "")) + if "Up" not in str(status) and "running" not in str(status).lower(): + health["healthy"] = False + health["issues"].append( + f"Container {container.get('Names', 'unknown')} is not running" + ) + + return health + + def to_json(self, data: Any) -> str: + """Convert result to JSON string.""" + return json.dumps(data, indent=2, default=str) + + +class SecurityError(Exception): + """Raised when a command violates security constraints.""" + + pass + + +def main(): + """CLI interface for server diagnostics.""" + import argparse + + parser = argparse.ArgumentParser( + description="Server Diagnostics CLI", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + %(prog)s docker-status paper-dynasty + %(prog)s docker-status paper-dynasty --container paper-dynasty_discord-app_1 + %(prog)s docker-logs paper-dynasty paper-dynasty_discord-app_1 --lines 200 + %(prog)s docker-restart paper-dynasty paper-dynasty_discord-app_1 + %(prog)s metrics paper-dynasty --type all + %(prog)s health paper-dynasty + %(prog)s diagnostic paper-dynasty disk_usage + """, + ) + + subparsers = parser.add_subparsers(dest="command", required=True) + + # docker-status + p_docker = subparsers.add_parser( + "docker-status", help="Get Docker container status" + ) + p_docker.add_argument("server", help="Server identifier") + p_docker.add_argument("--container", "-c", help="Specific container name") + + # docker-logs + p_logs = subparsers.add_parser("docker-logs", help="Get Docker container logs") + p_logs.add_argument("server", help="Server identifier") + p_logs.add_argument("container", help="Container name") + 
p_logs.add_argument("--lines", "-n", type=int, default=100, help="Number of lines") + p_logs.add_argument("--filter", "-f", dest="log_filter", help="Grep filter pattern") + + # docker-restart + p_restart = subparsers.add_parser("docker-restart", help="Restart Docker container") + p_restart.add_argument("server", help="Server identifier") + p_restart.add_argument("container", help="Container name") + + # metrics + p_metrics = subparsers.add_parser("metrics", help="Get system metrics") + p_metrics.add_argument("server", help="Server identifier") + p_metrics.add_argument( + "--type", + "-t", + default="all", + choices=["cpu", "memory", "disk", "network", "all"], + help="Metric type", + ) + + # logs + p_syslogs = subparsers.add_parser("logs", help="Read system logs") + p_syslogs.add_argument("server", help="Server identifier") + p_syslogs.add_argument( + "--type", + "-t", + default="system", + choices=["system", "docker", "application", "custom"], + help="Log type", + ) + p_syslogs.add_argument( + "--lines", "-n", type=int, default=100, help="Number of lines" + ) + p_syslogs.add_argument( + "--filter", "-f", dest="log_filter", help="Grep filter pattern" + ) + p_syslogs.add_argument("--path", help="Custom log path (for type=custom)") + + # health + p_health = subparsers.add_parser("health", help="Quick health check") + p_health.add_argument("server", help="Server identifier") + + # diagnostic + p_diag = subparsers.add_parser("diagnostic", help="Run whitelisted diagnostic") + p_diag.add_argument("server", help="Server identifier") + p_diag.add_argument("diagnostic_cmd", help="Command from whitelist") + p_diag.add_argument( + "--params", "-p", help="JSON parameters for command substitution" + ) + + args = parser.parse_args() + + client = ServerDiagnostics() + + if args.command == "docker-status": + result = client.get_docker_status(args.server, args.container) + + elif args.command == "docker-logs": + result = client.docker_logs( + args.server, args.container, args.lines, 
args.log_filter + ) + + elif args.command == "docker-restart": + result = client.docker_restart(args.server, args.container) + + elif args.command == "metrics": + result = client.get_metrics(args.server, args.type) + + elif args.command == "logs": + result = client.read_logs( + args.server, args.type, args.lines, args.log_filter, args.path + ) + + elif args.command == "health": + result = client.quick_health_check(args.server) + + elif args.command == "diagnostic": + params = json.loads(args.params) if args.params else None + result = client.run_diagnostic(args.server, args.diagnostic_cmd, params) + + print(client.to_json(result)) + + +if __name__ == "__main__": + main() diff --git a/monitoring/recovered-lxc300/server-diagnostics/config.yaml b/monitoring/recovered-lxc300/server-diagnostics/config.yaml new file mode 100644 index 0000000..56ba667 --- /dev/null +++ b/monitoring/recovered-lxc300/server-diagnostics/config.yaml @@ -0,0 +1,72 @@ +# Server Diagnostics Configuration +# Used by client.py for server inventory and security constraints + +# Server inventory - SSH connection details +servers: + paper-dynasty: + hostname: 10.10.0.88 + ssh_user: cal + ssh_key: ~/.ssh/claude_diagnostics_key + description: "Paper Dynasty Discord bots and services" + +# Docker containers to monitor +# restart_allowed: false prevents automatic remediation +docker_containers: + - name: paper-dynasty_discord-app_1 + critical: true + restart_allowed: true + description: "Paper Dynasty Discord bot" + + - name: paper-dynasty_db_1 + critical: true + restart_allowed: true + description: "Paper Dynasty PostgreSQL database" + + - name: paper-dynasty_adminer_1 + critical: false + restart_allowed: true + description: "Database admin UI" + + - name: sba-website_sba-web_1 + critical: true + restart_allowed: true + description: "SBA website" + + - name: sba-ghost_sba-ghost_1 + critical: false + restart_allowed: true + description: "SBA Ghost CMS" + +# Whitelisted diagnostic commands 
+diagnostic_commands: + disk_usage: "df -h" + memory_usage: "free -h" + cpu_usage: "top -bn1 | head -20" + cpu_load: "uptime" + process_list: "ps aux --sort=-%mem | head -20" + network_status: "ss -tuln" + docker_ps: "docker ps -a --format 'table {{.Names}}\\t{{.Status}}\\t{{.Ports}}'" + docker_stats: "docker stats --no-stream --format 'table {{.Name}}\\t{{.CPUPerc}}\\t{{.MemUsage}}'" + journal_errors: "journalctl -p err -n 50 --no-pager" + +# Remediation commands (low-risk only) +remediation_commands: + docker_restart: "docker restart {container}" + docker_logs: "docker logs --tail 500 {container}" + +# DENIED patterns - commands containing these will be rejected +denied_patterns: + - "rm -rf" + - "rm -r /" + - "dd if=" + - "mkfs" + - ":(){:|:&};:" + - "shutdown" + - "reboot" + - "init 0" + - "init 6" + - "systemctl stop" + - "> /dev/sd" + - "chmod 777" + - "wget|sh" + - "curl|sh" diff --git a/monitoring/recovered-lxc300/server-diagnostics/requirements.txt b/monitoring/recovered-lxc300/server-diagnostics/requirements.txt new file mode 100644 index 0000000..3aecde9 --- /dev/null +++ b/monitoring/recovered-lxc300/server-diagnostics/requirements.txt @@ -0,0 +1 @@ +pyyaml>=6.0 diff --git a/monitoring/recovered-lxc300/settings.json b/monitoring/recovered-lxc300/settings.json new file mode 100644 index 0000000..a2f1fec --- /dev/null +++ b/monitoring/recovered-lxc300/settings.json @@ -0,0 +1,26 @@ +{ + "permissions": { + "allow": [ + "Bash(python3 ~/.claude/skills/server-diagnostics/client.py:*)", + "Bash(ssh -i ~/.ssh/claude_diagnostics_key:*)", + "Read(~/.claude/skills/**)", + "Read(~/.claude/logs/**)", + "Glob(*)", + "Grep(*)" + ], + "deny": [ + "Bash(rm -rf:*)", + "Bash(rm -r /:*)", + "Bash(dd:*)", + "Bash(mkfs:*)", + "Bash(shutdown:*)", + "Bash(reboot:*)", + "Bash(*> /dev/sd*)", + "Bash(chmod 777:*)", + "Bash(*|sh)", + "Bash(*curl*|*bash*)", + "Bash(*wget*|*bash*)" + ] + }, + "model": "sonnet" +} diff --git a/tdarr/archive/README.md b/tdarr/archive/README.md new 
file mode 100644 index 0000000..2ebb7ca --- /dev/null +++ b/tdarr/archive/README.md @@ -0,0 +1,19 @@ +# Legacy Tdarr Scripts + +## tdarr_monitor_local_node.py + +Full-featured Tdarr monitoring script (~1200 lines) built for when the local workstation (nobara-pc) ran as an unmapped remote Tdarr node with GPU transcoding. + +**Features:** Stuck job detection via cross-run state comparison (pickle file), automatic worker killing, Discord alerts, configurable thresholds, rotating log files. + +**Why it existed:** The unmapped remote node architecture was prone to stuck jobs caused by network issues during file transfers between the remote node and server. The monitor ran every minute via cron to detect and kill stuck workers. + +**Why it's archived:** Transcoding moved to ubuntu-manticore (10.10.0.226) as a local mapped node with shared NFS storage. No remote transfers means no stuck jobs. Tdarr manages its own workers natively. Archived February 2026. + +## tdarr_file_monitor_local_node.py + tdarr-file-monitor-cron_local_node.sh + +File completion monitor that watched the local Tdarr cache directory for finished `.mkv` transcodes and copied the smallest version to a backup location. The cron wrapper ran it every minute. + +**Why it existed:** When the local workstation ran as an unmapped Tdarr node, completed transcodes landed in the local NVMe cache. This monitor detected completion (by tracking size stability) and kept the best copy. + +**Why it's archived:** Same reason as above - mapped node on manticore writes directly to the shared NFS media mount. No local cache to monitor. Archived February 2026. 
diff --git a/tdarr/archive/tdarr-file-monitor-cron_local_node.sh b/tdarr/archive/tdarr-file-monitor-cron_local_node.sh new file mode 100755 index 0000000..2faacbe --- /dev/null +++ b/tdarr/archive/tdarr-file-monitor-cron_local_node.sh @@ -0,0 +1,6 @@ +#!/bin/bash +# Cron job wrapper for Tdarr file monitor +# Add this to crontab with: * * * * * /mnt/NV2/Development/claude-home/monitoring/scripts/tdarr-file-monitor-cron.sh + +cd /mnt/NV2/Development/claude-home/monitoring/scripts +/usr/bin/python3 /mnt/NV2/Development/claude-home/monitoring/scripts/tdarr_file_monitor.py \ No newline at end of file diff --git a/tdarr/archive/tdarr_file_monitor_local_node.py b/tdarr/archive/tdarr_file_monitor_local_node.py new file mode 100755 index 0000000..e789408 --- /dev/null +++ b/tdarr/archive/tdarr_file_monitor_local_node.py @@ -0,0 +1,286 @@ +#!/usr/bin/env python3 +""" +Tdarr File Monitor - Monitors Tdarr cache directory for completed .mkv files and copies them to backup location. +Detects file completion by monitoring size changes and always keeps the smallest version of duplicate files. 
+""" + +import os +import shutil +import json +import time +import logging +from pathlib import Path +from dataclasses import dataclass, asdict +from typing import Dict, Optional +from datetime import datetime, timedelta + + +@dataclass +class FileState: + """Tracks the state of a monitored file.""" + path: str + size: int + last_modified: float + first_seen: float + last_size_change: float + check_count: int = 0 + + +class TdarrFileMonitor: + """Monitors Tdarr cache directory for completed .mkv files.""" + + def __init__( + self, + source_dir: str = "/mnt/NV2/tdarr-cache/nobara-pc-gpu-unmapped/temp", + media_dir: str = "/mnt/NV2/tdarr-cache/nobara-pc-gpu-unmapped/media", + dest_dir: str = "/mnt/NV2/tdarr-cache/manual-backup", + state_file: str = "/mnt/NV2/Development/claude-home/logs/tdarr_file_monitor_state.json", + completion_wait_seconds: int = 60, + log_file: str = "/mnt/NV2/Development/claude-home/logs/tdarr_file_monitor.log" + ): + self.source_dir = Path(source_dir) + self.media_dir = Path(media_dir) + self.dest_dir = Path(dest_dir) + self.state_file = Path(state_file) + self.completion_wait_seconds = completion_wait_seconds + self.monitored_files: Dict[str, FileState] = {} + + # Setup logging + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler(log_file), + logging.StreamHandler() + ] + ) + self.logger = logging.getLogger(f'{__name__}.TdarrFileMonitor') + + # Ensure destination directory exists + self.dest_dir.mkdir(parents=True, exist_ok=True) + + # Load previous state + self._load_state() + + def _load_state(self) -> None: + """Load monitored files state from disk.""" + if self.state_file.exists(): + try: + with open(self.state_file, 'r') as f: + data = json.load(f) + self.monitored_files = { + path: FileState(**file_data) + for path, file_data in data.items() + } + self.logger.info(f"Loaded state for {len(self.monitored_files)} monitored files") + except Exception 
as e: + self.logger.error(f"Failed to load state file: {e}") + self.monitored_files = {} + + def _save_state(self) -> None: + """Save monitored files state to disk.""" + try: + with open(self.state_file, 'w') as f: + data = {path: asdict(state) for path, state in self.monitored_files.items()} + json.dump(data, f, indent=2) + except Exception as e: + self.logger.error(f"Failed to save state file: {e}") + + def _scan_for_mkv_files(self) -> Dict[str, Path]: + """Scan source directory for .mkv files in all subdirectories.""" + mkv_files = {} + try: + for mkv_file in self.source_dir.rglob("*.mkv"): + if mkv_file.is_file(): + mkv_files[str(mkv_file)] = mkv_file + except Exception as e: + self.logger.error(f"Error scanning source directory: {e}") + + return mkv_files + + def _get_file_info(self, file_path: Path) -> Optional[tuple]: + """Get file size and modification time, return None if file doesn't exist or can't be accessed.""" + try: + stat = file_path.stat() + return stat.st_size, stat.st_mtime + except (OSError, FileNotFoundError) as e: + self.logger.warning(f"Cannot access file {file_path}: {e}") + return None + + def _validate_file_pair(self, temp_file_path: Path, temp_file_size: int) -> bool: + """Validate that a matching file exists in media directory with exact same name and size.""" + try: + # Search for matching file in media directory tree + for media_file in self.media_dir.rglob(temp_file_path.name): + if media_file.is_file(): + media_file_info = self._get_file_info(media_file) + if media_file_info: + media_size, _ = media_file_info + if media_size == temp_file_size: + self.logger.debug(f"Found matching file: {temp_file_path.name} ({temp_file_size:,} bytes) in temp and media directories") + return True + else: + self.logger.debug(f"Size mismatch for {temp_file_path.name}: temp={temp_file_size:,}, media={media_size:,}") + + # No matching file found + self.logger.info(f"No matching file found in media directory for {temp_file_path.name} ({temp_file_size:,} 
bytes)") + return False + + except Exception as e: + self.logger.error(f"Error validating file pair for {temp_file_path.name}: {e}") + return False + + def _is_file_complete(self, file_state: FileState, current_time: float) -> bool: + """Check if file is complete based on size stability.""" + stale_time = current_time - file_state.last_size_change + return stale_time >= self.completion_wait_seconds + + def _should_copy_file(self, source_path: Path, dest_path: Path) -> bool: + """Determine if we should copy the file (always keep smaller version).""" + if not dest_path.exists(): + return True + + source_size = source_path.stat().st_size + dest_size = dest_path.stat().st_size + + if source_size < dest_size: + self.logger.info(f"Source file {source_path.name} ({source_size:,} bytes) is smaller than existing destination ({dest_size:,} bytes), will replace") + return True + else: + self.logger.info(f"Source file {source_path.name} ({source_size:,} bytes) is not smaller than existing destination ({dest_size:,} bytes), skipping") + return False + + def _copy_file_with_retry(self, source_path: Path, dest_path: Path) -> bool: + """Copy file with retry logic and cleanup on failure.""" + temp_dest = dest_path.with_suffix(dest_path.suffix + '.tmp') + + for attempt in range(2): # Try twice + try: + start_time = time.time() + self.logger.info(f"Attempt {attempt + 1}: Copying {source_path.name} ({source_path.stat().st_size:,} bytes)") + + # Copy to temporary file first + shutil.copy2(source_path, temp_dest) + + # Verify copy completed successfully + if temp_dest.stat().st_size != source_path.stat().st_size: + raise Exception(f"Copy verification failed: size mismatch") + + # Move temp file to final destination + if dest_path.exists(): + dest_path.unlink() # Remove existing file + temp_dest.rename(dest_path) + + copy_time = time.time() - start_time + final_size = dest_path.stat().st_size + + self.logger.info(f"Successfully copied {source_path.name} ({final_size:,} bytes) in 
{copy_time:.2f}s") + return True + + except Exception as e: + self.logger.error(f"Copy attempt {attempt + 1} failed for {source_path.name}: {e}") + + # Cleanup temporary file if it exists + if temp_dest.exists(): + try: + temp_dest.unlink() + except Exception as cleanup_error: + self.logger.error(f"Failed to cleanup temp file {temp_dest}: {cleanup_error}") + + if attempt == 1: # Last attempt failed + self.logger.error(f"All copy attempts failed for {source_path.name}, giving up") + return False + else: + time.sleep(5) # Wait before retry + + return False + + def run_check(self) -> None: + """Run a single monitoring check cycle.""" + current_time = time.time() + self.logger.info("Starting monitoring check cycle") + + # Scan for current .mkv files + current_files = self._scan_for_mkv_files() + self.logger.info(f"Found {len(current_files)} .mkv files in source directory") + + # Remove files from monitoring that no longer exist + missing_files = set(self.monitored_files.keys()) - set(current_files.keys()) + for missing_file in missing_files: + self.logger.info(f"File no longer exists, removing from monitoring: {Path(missing_file).name}") + del self.monitored_files[missing_file] + + # Process each current file + files_to_copy = [] + for file_path_str, file_path in current_files.items(): + file_info = self._get_file_info(file_path) + if not file_info: + continue + + current_size, current_mtime = file_info + + # Update or create file state + if file_path_str in self.monitored_files: + file_state = self.monitored_files[file_path_str] + file_state.check_count += 1 + + # Check if size changed + if current_size != file_state.size: + file_state.size = current_size + file_state.last_size_change = current_time + self.logger.debug(f"Size changed for {file_path.name}: {current_size:,} bytes") + + file_state.last_modified = current_mtime + + else: + # New file discovered - validate before tracking + if not self._validate_file_pair(file_path, current_size): + # File doesn't have a 
matching pair in media directory, skip tracking + continue + + file_state = FileState( + path=file_path_str, + size=current_size, + last_modified=current_mtime, + first_seen=current_time, + last_size_change=current_time, + check_count=1 + ) + self.monitored_files[file_path_str] = file_state + self.logger.info(f"Started monitoring validated file: {file_path.name} ({current_size:,} bytes)") + + # Log current state + stale_time = current_time - file_state.last_size_change + self.logger.info(f"Checking {file_path.name}: {current_size:,} bytes, stale for {stale_time:.1f}s (checks: {file_state.check_count})") + + # Check if file is complete + if self._is_file_complete(file_state, current_time): + dest_path = self.dest_dir / file_path.name + if self._should_copy_file(file_path, dest_path): + files_to_copy.append((file_path, dest_path, file_state)) + + # Copy completed files + for source_path, dest_path, file_state in files_to_copy: + self.logger.info(f"File appears complete: {source_path.name} (stable for {current_time - file_state.last_size_change:.1f}s)") + + if self._copy_file_with_retry(source_path, dest_path): + # Remove from monitoring after successful copy + del self.monitored_files[str(source_path)] + self.logger.info(f"Successfully processed and removed from monitoring: {source_path.name}") + else: + self.logger.error(f"Failed to copy {source_path.name}, will continue monitoring") + + # Save state + self._save_state() + + self.logger.info(f"Check cycle completed, monitoring {len(self.monitored_files)} files") + + +def main(): + """Main entry point for the script.""" + monitor = TdarrFileMonitor() + monitor.run_check() + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/tdarr/archive/tdarr_monitor_local_node.py b/tdarr/archive/tdarr_monitor_local_node.py new file mode 100755 index 0000000..ad546d1 --- /dev/null +++ b/tdarr/archive/tdarr_monitor_local_node.py @@ -0,0 +1,1227 @@ +#!/usr/bin/env python3 +""" +Tdarr API Monitoring Script 
with Stuck Job Detection and Discord Alerts + +Monitors Tdarr server via its web API endpoints: +- Server status and health +- Queue status and statistics +- Node status and performance +- Library scan progress +- Worker activity +- Stuck job detection with configurable timeouts +- Discord notifications for alerts and status updates + +Usage: + python3 tdarr_monitor.py --server http://10.10.0.43:8265 --check all + python3 tdarr_monitor.py --server http://10.10.0.43:8265 --check queue + python3 tdarr_monitor.py --server http://10.10.0.43:8265 --check nodes + + # Enable stuck job detection (30 minute threshold) + python3 tdarr_monitor.py --server http://10.10.0.43:8265 --check nodes --detect-stuck + + # Custom stuck threshold (15 minutes) + python3 tdarr_monitor.py --server http://10.10.0.43:8265 --check all --detect-stuck --stuck-threshold 15 + + # Enable Discord alerts for stuck jobs (uses default webhook) + python3 tdarr_monitor.py --server http://10.10.0.43:8265 --check nodes --detect-stuck --discord-alerts + + # Automatically clear hung workers when detected + python3 tdarr_monitor.py --server http://10.10.0.43:8265 --check nodes --detect-stuck --clear-hung-workers + + # Full monitoring with automatic clearing and Discord alerts + python3 tdarr_monitor.py --server http://10.10.0.43:8265 --check nodes --detect-stuck --clear-hung-workers --discord-alerts + + # Test Discord integration (uses default webhook) + python3 tdarr_monitor.py --server http://10.10.0.43:8265 --discord-test + + # Enable file logging with custom path and debug level + python3 tdarr_monitor.py --server http://10.10.0.43:8265 --check nodes --log-file /tmp/tdarr_debug.log --log-level DEBUG + + # Disable file logging (console only) + python3 tdarr_monitor.py --server http://10.10.0.43:8265 --check nodes --no-log-file + + # Clear memory state + python3 tdarr_monitor.py --server http://10.10.0.43:8265 --clear-memory +""" + +import argparse +import json +import logging +import logging.handlers 
import sys
import os
import pickle
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Union
import requests
from urllib.parse import urljoin


@dataclass
class WorkerSnapshot:
    """Point-in-time observation of one Tdarr worker, used for stuck-job diffing."""
    worker_id: str
    node_id: str
    worker_type: str
    file: str          # path of the file the worker is transcoding
    percentage: float  # progress; -1 when the API omitted it
    status: str
    fps: int
    eta: str
    timestamp: datetime  # when this snapshot was taken


@dataclass
class StuckJob:
    """A worker whose observable state has not changed between polls."""
    worker_snapshot: WorkerSnapshot
    first_seen: datetime
    stuck_duration_minutes: float
    is_stuck: bool = True


@dataclass
class MemoryState:
    """Detector state persisted (pickled) across monitor runs."""
    worker_snapshots: Dict[str, WorkerSnapshot]  # keyed by "node_id:worker_id"
    stuck_jobs: Dict[str, StuckJob]              # same key scheme
    last_updated: datetime


@dataclass
class ServerStatus:
    """Result of the /api/v2/status check."""
    timestamp: str
    server_url: str
    status: str
    error: Optional[str] = None
    version: Optional[str] = None
    server_id: Optional[str] = None
    uptime: Optional[str] = None
    system_info: Optional[Dict[str, Any]] = None


@dataclass
class QueueStats:
    """Aggregated counters over the transcode queue."""
    total_files: int
    queued: int
    processing: int
    completed: int
    queue_items: List[Dict[str, Any]]  # first few raw queue entries for detail output


@dataclass
class QueueStatus:
    """Result of the queue check."""
    timestamp: str
    queue_stats: Optional[QueueStats] = None
    error: Optional[str] = None


@dataclass
class NodeInfo:
    """Normalized view of one entry from /api/v2/get-nodes."""
    id: Optional[str]
    nodeName: Optional[str]
    status: str
    lastSeen: Optional[int]
    version: Optional[str]
    platform: Optional[str]
    workers: Dict[str, int]            # counts keyed by 'cpu' / 'gpu'
    processing: List[Dict[str, Any]]   # raw worker payloads


@dataclass
class NodeSummary:
    """Roll-up of online/offline nodes for reporting."""
    total_nodes: int
    online_nodes: int
    offline_nodes: int
    online_details: List[NodeInfo]
    offline_details: List[NodeInfo]


@dataclass
class NodeStatus:
    """Result of the node check, including optional stuck-job findings."""
    timestamp: str
    nodes: List[Dict[str, Any]]
    node_summary: Optional[NodeSummary] = None
    # Fixed annotation: the default is None, so the type must be Optional.
    stuck_jobs: Optional[List[StuckJob]] = None
    error: Optional[str] = None


@dataclass
class LibraryInfo:
    """Normalized view of one library from /api/v2/get-libraries."""
    name: Optional[str]
    path: Optional[str]
    file_count: int
    scan_progress: int
    last_scan: Optional[str]
    is_scanning: bool
@dataclass
class ScanStatus:
    """Aggregate scan counters across all libraries."""
    total_libraries: int
    total_files: int
    scanning_libraries: int


@dataclass
class LibraryStatus:
    """Result of the library check."""
    timestamp: str
    libraries: List[LibraryInfo]
    scan_status: Optional[ScanStatus] = None
    error: Optional[str] = None


@dataclass
class Statistics:
    """Server-wide transcode statistics from /api/v2/get-stats."""
    total_transcodes: int
    space_saved: int
    total_files_processed: int
    failed_transcodes: int
    processing_speed: int
    eta: Optional[str]


@dataclass
class StatisticsStatus:
    """Result of the statistics check."""
    timestamp: str
    statistics: Optional[Statistics] = None
    error: Optional[str] = None


@dataclass
class HealthCheck:
    """One component's contribution to the overall health report."""
    status: str
    healthy: bool
    online_count: Optional[int] = None
    total_count: Optional[int] = None
    accessible: Optional[bool] = None
    total_items: Optional[int] = None


@dataclass
class HealthStatus:
    """Overall health report composed of per-component checks."""
    timestamp: str
    overall_status: str
    checks: Dict[str, HealthCheck]


@dataclass
class DiscordEmbedField:
    """One name/value field inside a Discord embed."""
    name: str
    value: str
    inline: bool = False


@dataclass
class DiscordEmbed:
    """Discord embed payload; serialized with dataclasses.asdict for the webhook."""
    title: str
    description: str
    color: int
    # Fixed annotations: both default to None and are filled in __post_init__,
    # so they must be Optional.
    fields: Optional[List[DiscordEmbedField]] = None
    timestamp: Optional[str] = None

    def __post_init__(self):
        # Normalize the None defaults so consumers always see a list / a string.
        if self.fields is None:
            self.fields = []
        if self.timestamp is None:
            self.timestamp = datetime.now().isoformat()
class DiscordNotifier:
    """Sends Tdarr monitoring notifications to a Discord webhook.

    All public send_* methods return True on success and False on any
    delivery failure; they never raise.
    """

    def __init__(self, webhook_url: str, timeout: int = 10):
        """Initialize Discord notifier with webhook URL."""
        self.webhook_url = webhook_url
        self.timeout = timeout
        self.session = requests.Session()
        self.logger = logging.getLogger(f"{__name__}.DiscordNotifier")

    def send_content_message(self, content: str, username: str = "Tdarr Monitor") -> bool:
        """Send a simple content message to Discord.

        Args:
            content: The message content to send
            username: Bot username to display

        Returns:
            True if successful, False otherwise
        """
        payload = {
            "content": content,
            "username": username
        }
        return self._send_webhook(payload)

    def send_embed_message(self,
                           title: str,
                           description: str,
                           color: int = 0xff6b6b,  # Red by default
                           fields: List[DiscordEmbedField] = None,
                           username: str = "Tdarr Monitor") -> bool:
        """Send an embed message to Discord.

        Args:
            title: Embed title
            description: Embed description
            color: Embed color (hex integer, default red)
            fields: List of embed fields
            username: Bot username to display

        Returns:
            True if successful, False otherwise
        """
        embed = DiscordEmbed(
            title=title,
            description=description,
            color=color,
            fields=fields or []
        )
        payload = {
            "username": username,
            "embeds": [asdict(embed)]
        }
        return self._send_webhook(payload)

    def send_stuck_job_alert(self, stuck_jobs: List[StuckJob]) -> bool:
        """Send alert for stuck jobs using embed format.

        Args:
            stuck_jobs: List of stuck jobs to report

        Returns:
            True if successful, False otherwise (an empty list is success)
        """
        if not stuck_jobs:
            return True

        # Create embed fields for each stuck job.
        # Limit to 10 jobs (Discord embed field limit is 25).
        fields = []
        for i, stuck_job in enumerate(stuck_jobs[:10]):
            ws = stuck_job.worker_snapshot
            field_value = (
                f"**File:** {os.path.basename(ws.file)}\n"
                f"**Status:** {ws.status}\n"
                f"**Progress:** {ws.percentage}%\n"
                f"**Duration:** {stuck_job.stuck_duration_minutes:.1f} minutes\n"
                f"**Node:** {ws.node_id}"
            )
            fields.append(DiscordEmbedField(
                name=f"Stuck Job {i+1}: {ws.worker_id}",
                value=field_value,
                inline=True
            ))

        # Add summary field if there are more jobs than we can list.
        if len(stuck_jobs) > 10:
            fields.append(DiscordEmbedField(
                name="Additional Jobs",
                value=f"... and {len(stuck_jobs) - 10} more stuck jobs",
                inline=False
            ))

        title = f"🚨 Tdarr Stuck Jobs Detected ({len(stuck_jobs)})"
        description = (
            f"Detected {len(stuck_jobs)} stuck job{'s' if len(stuck_jobs) != 1 else ''} "
            f"in your Tdarr system. These jobs will follow the requested clear behavior."
        )

        return self.send_embed_message(
            title=title,
            description=description,
            color=0xff6b6b,  # Red color for alerts
            fields=fields
        )

    def send_system_status(self,
                           server_status: ServerStatus,
                           node_status: NodeStatus,
                           stuck_jobs: List[StuckJob] = None) -> bool:
        """Send system status summary using embed format.

        Args:
            server_status: Server status information
            node_status: Node status information
            stuck_jobs: Optional stuck jobs list

        Returns:
            True if successful, False otherwise
        """
        # Healthy = server good, no errors anywhere, and nothing stuck.
        is_healthy = (
            server_status.status == "good" and
            not server_status.error and
            not node_status.error and
            (not stuck_jobs or len(stuck_jobs) == 0)
        )
        color = 0x28a745 if is_healthy else 0xff6b6b  # Green if healthy, red if not

        # BUGFIX: ServerStatus is a dataclass, so getattr(..., 'version', 'Unknown')
        # always found the attribute and rendered "None" when the version was
        # missing. Fall back on falsy values instead.
        description_parts = [
            f"**Server Status:** {server_status.status.title()}",
            f"**Version:** {server_status.version or 'Unknown'}"
        ]

        if node_status.node_summary:
            description_parts.extend([
                f"**Total Nodes:** {node_status.node_summary.total_nodes}",
                f"**Online Nodes:** {node_status.node_summary.online_nodes}",
                f"**Offline Nodes:** {node_status.node_summary.offline_nodes}"
            ])

        if stuck_jobs:
            description_parts.append(f"**Stuck Jobs:** {len(stuck_jobs)}")

        # One inline field per online node.
        fields = []
        if node_status.node_summary and node_status.node_summary.online_details:
            for node in node_status.node_summary.online_details:
                active_workers = len(node.processing) if node.processing else 0
                field_value = (
                    f"**Status:** Online\n"
                    f"**Platform:** {node.platform or 'Unknown'}\n"
                    f"**Active Workers:** {active_workers}\n"
                    f"**CPU Workers:** {node.workers.get('cpu', 0)}\n"
                    f"**GPU Workers:** {node.workers.get('gpu', 0)}"
                )
                fields.append(DiscordEmbedField(
                    name=f"📡 {node.nodeName or node.id}",
                    value=field_value,
                    inline=True
                ))

        title = "📊 Tdarr System Status"
        if not is_healthy:
            title = "⚠️ Tdarr System Alert"

        return self.send_embed_message(
            title=title,
            description="\n".join(description_parts),
            color=color,
            fields=fields
        )

    def _send_webhook(self, payload: Dict[str, Any]) -> bool:
        """Send payload to Discord webhook.

        Args:
            payload: JSON payload to send

        Returns:
            True if successful, False otherwise
        """
        try:
            response = self.session.post(
                self.webhook_url,
                json=payload,
                timeout=self.timeout
            )
            response.raise_for_status()
            self.logger.info("Discord notification sent successfully")
            return True
        except requests.exceptions.RequestException as e:
            self.logger.error(f"Failed to send Discord notification: {e}")
            return False
        except Exception as e:
            self.logger.error(f"Unexpected error sending Discord notification: {e}")
            return False
class StuckJobDetector:
    """Detects Tdarr workers whose state has not changed between monitor runs.

    Worker snapshots and stuck-job records are persisted with pickle so stuck
    durations accumulate across separate invocations of the monitor script.
    A job is only reported once its unchanged state has persisted for at least
    stuck_threshold_minutes.
    """

    def __init__(self, memory_file: str = "/mnt/NV2/Development/claude-home/logs/tdarr_memory.pkl", stuck_threshold_minutes: int = 30):
        """Initialize stuck job detector with memory persistence."""
        self.memory_file = os.path.abspath(memory_file)  # Use absolute path
        self.stuck_threshold_minutes = stuck_threshold_minutes
        self.logger = logging.getLogger(f"{__name__}.StuckJobDetector")
        self.logger.debug(f"Using memory file: {self.memory_file}")

        # BUGFIX: derive the directory from the *absolute* path. The original
        # used os.path.dirname(memory_file), which is '' for a bare filename
        # and makes os.makedirs('') raise FileNotFoundError. Ensuring the
        # directory before loading also guarantees later saves can succeed.
        os.makedirs(os.path.dirname(self.memory_file), exist_ok=True)

        self.memory_state = self._load_memory_state()

    def _load_memory_state(self) -> MemoryState:
        """Load memory state from disk or create a fresh empty one."""
        if os.path.exists(self.memory_file):
            try:
                # NOTE(security): pickle.load executes arbitrary code if the
                # state file is attacker-controlled; this file is assumed to be
                # private to the monitoring host.
                with open(self.memory_file, 'rb') as f:
                    memory_state = pickle.load(f)
                self.logger.debug(f"Loaded memory state: {len(memory_state.worker_snapshots)} workers, {len(memory_state.stuck_jobs)} stuck jobs")
                return memory_state
            except Exception as e:
                self.logger.warning(f"Failed to load memory state: {e}, creating new state")
        else:
            self.logger.debug(f"Memory file {self.memory_file} does not exist, creating new state")

        return MemoryState(
            worker_snapshots={},
            stuck_jobs={},
            last_updated=datetime.now()
        )

    def _save_memory_state(self):
        """Save memory state to disk (best effort; failures are logged)."""
        try:
            with open(self.memory_file, 'wb') as f:
                pickle.dump(self.memory_state, f)
        except Exception as e:
            self.logger.error(f"Failed to save memory state: {e}")

    def _create_worker_key(self, node_id: str, worker_id: str) -> str:
        """Create unique key for worker identification."""
        return f"{node_id}:{worker_id}"

    def _is_worker_stuck(self, current: WorkerSnapshot, previous: WorkerSnapshot) -> bool:
        """Return True when every observable field is unchanged since the previous poll."""
        worker_key = f"{current.node_id}:{current.worker_id}"

        # Compare each field separately so the debug log explains the verdict.
        file_same = current.file == previous.file
        percentage_same = current.percentage == previous.percentage
        status_same = current.status == previous.status
        fps_same = current.fps == previous.fps
        eta_same = current.eta == previous.eta

        is_stuck = file_same and percentage_same and status_same and fps_same and eta_same

        self.logger.debug(f"Worker {worker_key} stuck check:")
        self.logger.debug(f"  File: '{current.file}' == '{previous.file}' = {file_same}")
        self.logger.debug(f"  Percentage: {current.percentage}% == {previous.percentage}% = {percentage_same}")
        self.logger.debug(f"  Status: '{current.status}' == '{previous.status}' = {status_same}")
        self.logger.debug(f"  FPS: {current.fps} == {previous.fps} = {fps_same}")
        self.logger.debug(f"  ETA: '{current.eta}' == '{previous.eta}' = {eta_same}")
        self.logger.debug(f"  → Result: {'STUCK' if is_stuck else 'NOT STUCK'}")

        # Surface progress at INFO level so normal runs show movement.
        if not is_stuck:
            if not percentage_same:
                self.logger.info(f"Worker {worker_key} making progress: {previous.percentage}% → {current.percentage}%")
            elif not status_same:
                self.logger.info(f"Worker {worker_key} status changed: '{previous.status}' → '{current.status}'")
            elif not file_same:
                self.logger.info(f"Worker {worker_key} file changed: '{previous.file}' → '{current.file}'")

        return is_stuck

    def update_workers(self, nodes_data: Dict[str, Any]) -> List[StuckJob]:
        """Update worker snapshots from a get-nodes payload and detect stuck jobs.

        Returns only the stuck jobs whose duration meets the configured
        threshold; shorter-lived candidates are tracked in memory for
        subsequent runs.
        """
        current_time = datetime.now()
        current_workers = {}
        detected_stuck_jobs = []

        # Extract current worker states from the raw nodes payload.
        for node_id, node_data in nodes_data.items():
            workers = node_data.get('workers', {})
            for worker_id, worker_data in workers.items():
                worker_key = self._create_worker_key(node_id, worker_id)

                current_snapshot = WorkerSnapshot(
                    worker_id=worker_id,
                    node_id=node_id,
                    worker_type=worker_data.get('workerType', 'unknown'),
                    file=worker_data.get('file', ''),
                    percentage=worker_data.get('percentage', -1),
                    status=worker_data.get('status', ''),
                    fps=worker_data.get('fps', 0),
                    eta=worker_data.get('ETA', ''),
                    timestamp=current_time
                )
                current_workers[worker_key] = current_snapshot

                self.logger.debug(f"Tracking worker {worker_key}: {current_snapshot.status} at {current_snapshot.percentage}% on '{current_snapshot.file}'")

                if worker_key in self.memory_state.worker_snapshots:
                    previous_snapshot = self.memory_state.worker_snapshots[worker_key]

                    if self._is_worker_stuck(current_snapshot, previous_snapshot):
                        time_since_previous = (current_time - previous_snapshot.timestamp).total_seconds() / 60
                        self.logger.debug(f"Worker {worker_key} has been stuck for {time_since_previous:.1f} minutes since last check")
                        self.logger.debug(f"Worker {worker_key} checking stuck_jobs dict: {list(self.memory_state.stuck_jobs.keys())}")

                        if worker_key in self.memory_state.stuck_jobs:
                            # Already-known stuck job: refresh duration and snapshot.
                            stuck_job = self.memory_state.stuck_jobs[worker_key]
                            stuck_duration = current_time - stuck_job.first_seen
                            stuck_job.stuck_duration_minutes = stuck_duration.total_seconds() / 60
                            stuck_job.worker_snapshot = current_snapshot

                            self.logger.debug(f"Worker {worker_key} known stuck job - duration: {stuck_job.stuck_duration_minutes:.1f} min, threshold: {self.stuck_threshold_minutes} min")
                            if stuck_job.stuck_duration_minutes >= self.stuck_threshold_minutes:
                                self.logger.debug(f"Worker {worker_key} EXCEEDS threshold - adding to detected stuck jobs")
                                detected_stuck_jobs.append(stuck_job)
                            else:
                                self.logger.debug(f"Worker {worker_key} below threshold - not flagging yet")
                        else:
                            # Newly stuck: record immediately so duration
                            # accumulates across runs; first_seen is the time of
                            # the previous (identical) snapshot.
                            first_seen = previous_snapshot.timestamp
                            stuck_duration = current_time - first_seen
                            stuck_duration_minutes = stuck_duration.total_seconds() / 60

                            self.logger.debug(f"Worker {worker_key} NEW stuck job - first_seen: {first_seen}, current: {current_time}")
                            self.logger.debug(f"Worker {worker_key} NEW stuck job - duration: {stuck_duration_minutes:.1f} min, threshold: {self.stuck_threshold_minutes} min")

                            stuck_job = StuckJob(
                                worker_snapshot=current_snapshot,
                                first_seen=first_seen,
                                stuck_duration_minutes=stuck_duration_minutes,
                                is_stuck=True
                            )
                            self.memory_state.stuck_jobs[worker_key] = stuck_job

                            if stuck_duration_minutes >= self.stuck_threshold_minutes:
                                self.logger.debug(f"Worker {worker_key} NEW stuck job EXCEEDS threshold - flagging for clearing")
                                detected_stuck_jobs.append(stuck_job)
                            else:
                                self.logger.debug(f"Worker {worker_key} NEW stuck job below threshold - tracking in memory")
                    else:
                        # Worker made progress: drop any stuck-job record.
                        if worker_key in self.memory_state.stuck_jobs:
                            del self.memory_state.stuck_jobs[worker_key]
                            self.logger.info(f"Worker {worker_key} is no longer stuck")
                else:
                    self.logger.info(f"New worker detected: {worker_key} - {current_snapshot.status} at {current_snapshot.percentage}% on '{current_snapshot.file}'")

        # Drop stuck-job records for workers that disappeared entirely.
        stuck_jobs_to_remove = [
            worker_key for worker_key in self.memory_state.stuck_jobs
            if worker_key not in current_workers
        ]
        for worker_key in stuck_jobs_to_remove:
            del self.memory_state.stuck_jobs[worker_key]
            self.logger.info(f"Removed stuck job tracking for missing worker: {worker_key}")

        # Persist the refreshed state for the next run.
        self.memory_state.worker_snapshots = current_workers
        self.memory_state.last_updated = current_time
        self._save_memory_state()

        return detected_stuck_jobs

    def get_stuck_jobs(self) -> List[StuckJob]:
        """Get current list of stuck jobs."""
        return list(self.memory_state.stuck_jobs.values())

    def clear_memory(self):
        """Clear all memory state."""
        self.memory_state = MemoryState(
            worker_snapshots={},
            stuck_jobs={},
            last_updated=datetime.now()
        )
        self._save_memory_state()
        self.logger.info("Memory state cleared")
class TdarrMonitor:
    """Client for the Tdarr server HTTP API with optional stuck-job handling.

    Wraps the /api/v2 endpoints and, when enabled, wires together the
    StuckJobDetector (poll-to-poll worker comparison), automatic hung-worker
    clearing, and Discord alerting.
    """

    def __init__(self, server_url: str, timeout: int = 30, enable_stuck_detection: bool = False,
                 stuck_threshold_minutes: int = 30, memory_file: str = ".claude/tmp/tdarr_memory.pkl",
                 discord_webhook_url: str = None, enable_discord_alerts: bool = False,
                 log_file: Optional[str] = None, log_level: str = "INFO", clear_hung_workers: bool = False):
        """Initialize Tdarr monitor with server URL."""
        self.server_url = server_url.rstrip('/')
        self.timeout = timeout
        self.session = requests.Session()
        self.enable_stuck_detection = enable_stuck_detection
        self.enable_discord_alerts = enable_discord_alerts
        self.clear_hung_workers_enabled = clear_hung_workers

        # Configure logging before any component logs.
        self._setup_logging(log_file, log_level)
        self.logger = logging.getLogger(__name__)

        # Initialize stuck job detector if enabled.
        self.stuck_detector = None
        if enable_stuck_detection:
            self.stuck_detector = StuckJobDetector(memory_file, stuck_threshold_minutes)

        # Initialize Discord notifier if enabled.
        self.discord_notifier = None
        if enable_discord_alerts:
            if discord_webhook_url:
                self.discord_notifier = DiscordNotifier(discord_webhook_url)
            else:
                self.logger.warning("Discord alerts enabled but no webhook URL provided")

    def _setup_logging(self, log_file: Optional[str] = None, log_level: str = "INFO"):
        """Configure root logging with optional rotating file output.

        NOTE: this replaces all existing root handlers, so it intentionally
        takes over logging for the whole process.
        """
        root_logger = logging.getLogger()
        root_logger.handlers.clear()

        level = getattr(logging, log_level.upper(), logging.INFO)
        root_logger.setLevel(level)

        console_formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        # More compact formatter for the log file.
        file_formatter = logging.Formatter(
            '%(asctime)s [%(levelname)-7s] %(module)-12s | %(message)s',
            datefmt='%H:%M:%S'
        )

        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(console_formatter)
        root_logger.addHandler(console_handler)

        if log_file:
            log_dir = os.path.dirname(log_file)
            if log_dir:
                os.makedirs(log_dir, exist_ok=True)
            # Rotating file handler: 10MB max, keep 5 backup files.
            file_handler = logging.handlers.RotatingFileHandler(
                log_file,
                maxBytes=10 * 1024 * 1024,  # 10MB
                backupCount=5,
                encoding='utf-8'
            )
            file_handler.setFormatter(file_formatter)
            root_logger.addHandler(file_handler)

    def _make_request(self, endpoint: str) -> Optional[Dict[str, Any]]:
        """GET a Tdarr API endpoint; returns parsed JSON or None on failure."""
        url = urljoin(self.server_url, endpoint)
        try:
            response = self.session.get(url, timeout=self.timeout)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            self.logger.error(f"Request failed for {url}: {e}")
            return None
        except json.JSONDecodeError as e:
            self.logger.error(f"JSON decode failed for {url}: {e}")
            return None

    def clear_hung_workers(self, stuck_jobs: Optional[List[StuckJob]] = None) -> bool:
        """Clear hung workers via the Tdarr kill-worker endpoint.

        Args:
            stuck_jobs: StuckJob records identifying the workers to kill.

        Returns:
            True if all workers cleared successfully (or none given), False otherwise.
        """
        if not stuck_jobs:
            self.logger.info("No stuck jobs provided for clearing hung workers")
            return True

        success_count = 0
        total_count = len(stuck_jobs)

        for stuck_job in stuck_jobs:
            worker_snapshot = stuck_job.worker_snapshot
            try:
                # kill-worker expects the node/worker IDs wrapped in "data".
                payload = {
                    "data": {
                        "nodeID": worker_snapshot.node_id,
                        "workerID": worker_snapshot.worker_id
                    }
                }
                url = urljoin(self.server_url, '/api/v2/kill-worker')
                response = self.session.post(url, json=payload, timeout=self.timeout)
                response.raise_for_status()

                self.logger.info(f"Successfully killed hung worker: {worker_snapshot.node_id}:{worker_snapshot.worker_id}")
                success_count += 1
            except requests.exceptions.RequestException as e:
                self.logger.error(f"Failed to kill worker {worker_snapshot.node_id}:{worker_snapshot.worker_id}: {e}")
            except Exception as e:
                self.logger.error(f"Unexpected error killing worker {worker_snapshot.node_id}:{worker_snapshot.worker_id}: {e}")

        self.logger.info(f"Cleared {success_count}/{total_count} hung workers")
        return success_count == total_count

    def get_server_status(self) -> ServerStatus:
        """Get overall server status and configuration."""
        timestamp = datetime.now().isoformat()

        data = self._make_request('/api/v2/status')
        # BUGFIX: check "is not None" — an empty-but-valid JSON response is not
        # a connection failure.
        if data is not None:
            server_status = data.get('status', 'unknown')
            self.logger.info(f"Server check completed: status={server_status}, version={data.get('version', 'unknown')}")
            return ServerStatus(
                timestamp=timestamp,
                server_url=self.server_url,
                status=server_status,
                version=data.get('version'),
                uptime=data.get('uptime')
            )
        self.logger.error("Server check failed: Unable to connect to Tdarr server")
        return ServerStatus(
            timestamp=timestamp,
            server_url=self.server_url,
            status='offline',
            error='Unable to connect to Tdarr server'
        )

    def get_queue_status(self) -> QueueStatus:
        """Get transcoding queue status and statistics."""
        timestamp = datetime.now().isoformat()

        data = self._make_request('/api/v2/get-queue')
        if data is not None:
            queue_data = data.get('queue', [])

            queue_stats = QueueStats(
                total_files=len(queue_data),
                queued=len([f for f in queue_data if f.get('status') == 'Queued']),
                processing=len([f for f in queue_data if f.get('status') == 'Processing']),
                completed=len([f for f in queue_data if f.get('status') == 'Completed']),
                queue_items=queue_data[:10]  # First 10 items for details
            )
            return QueueStatus(timestamp=timestamp, queue_stats=queue_stats)
        return QueueStatus(timestamp=timestamp, error='Unable to fetch queue data')

    def _build_node_info(self, node_id: str, node_data: Dict[str, Any]) -> NodeInfo:
        """Map one raw get-nodes entry onto a NodeInfo record."""
        workers = node_data.get('workers', {})
        config = node_data.get('config', {})
        return NodeInfo(
            id=node_id,
            nodeName=node_data.get('nodeName'),
            status='online',  # get-nodes only reports currently-connected nodes
            lastSeen=None,
            version=config.get('version'),
            platform=config.get('platform_arch_isdocker'),
            workers={
                'cpu': sum(1 for w in workers.values() if 'cpu' in w.get('workerType', '').lower()),
                'gpu': sum(1 for w in workers.values() if 'gpu' in w.get('workerType', '').lower()),
            },
            processing=list(workers.values())
        )

    def _handle_stuck_jobs(self, nodes_dict: Dict[str, Any]) -> List[StuckJob]:
        """Run stuck-job detection and the configured reactions (clear, alert)."""
        if not self.stuck_detector:
            return []

        stuck_jobs: List[StuckJob] = []
        try:
            stuck_jobs = self.stuck_detector.update_workers(nodes_dict)
            if stuck_jobs:
                self.logger.warning(f"Detected {len(stuck_jobs)} stuck jobs")
                for stuck_job in stuck_jobs:
                    ws = stuck_job.worker_snapshot
                    self.logger.warning(
                        f"Stuck job: {ws.node_id}:{ws.worker_id} "
                        f"on file '{ws.file}' "
                        f"at {ws.percentage}% for {stuck_job.stuck_duration_minutes:.1f} minutes"
                    )

                if self.clear_hung_workers_enabled:
                    try:
                        if self.clear_hung_workers(stuck_jobs):
                            self.logger.info(f"Successfully cleared {len(stuck_jobs)} hung workers")
                        else:
                            self.logger.warning("Some hung workers could not be cleared")
                    except Exception as e:
                        self.logger.error(f"Error clearing hung workers: {e}")

                if self.discord_notifier:
                    try:
                        self.discord_notifier.send_stuck_job_alert(stuck_jobs)
                    except Exception as e:
                        self.logger.error(f"Failed to send Discord stuck job alert: {e}")
        except Exception as e:
            self.logger.error(f"Error in stuck job detection: {e}")
        return stuck_jobs

    def get_node_status(self) -> NodeStatus:
        """Get status of all connected nodes (plus stuck-job handling if enabled)."""
        timestamp = datetime.now().isoformat()

        data = self._make_request('/api/v2/get-nodes')
        # BUGFIX: "if data:" treated an empty dict (zero connected nodes, a
        # legitimate response) as a request failure.
        if data is not None:
            nodes_dict = data if isinstance(data, dict) else {}

            nodes = []
            online_nodes = []
            offline_nodes = []  # get-nodes never lists offline nodes; kept for the summary shape
            for node_id, node_data in nodes_dict.items():
                online_nodes.append(self._build_node_info(node_id, node_data))
                nodes.append(node_data)

            stuck_jobs = self._handle_stuck_jobs(nodes_dict)

            node_summary = NodeSummary(
                total_nodes=len(nodes),
                online_nodes=len(online_nodes),
                offline_nodes=len(offline_nodes),
                online_details=online_nodes,
                offline_details=offline_nodes
            )

            if stuck_jobs:
                self.logger.info(f"Node check completed: {len(nodes)} nodes online, {len(stuck_jobs)} stuck jobs detected")
            else:
                self.logger.info(f"Node check completed: {len(nodes)} nodes online, no stuck jobs detected")

            return NodeStatus(
                timestamp=timestamp,
                nodes=nodes,
                node_summary=node_summary,
                stuck_jobs=stuck_jobs
            )
        self.logger.error("Node check failed: Unable to fetch node data")
        return NodeStatus(
            timestamp=timestamp,
            nodes=[],
            error='Unable to fetch node data'
        )

    def get_library_status(self) -> LibraryStatus:
        """Get library scan status and file statistics."""
        timestamp = datetime.now().isoformat()

        data = self._make_request('/api/v2/get-libraries')
        if data is not None:
            libraries = data.get('libraries', [])

            library_stats = []
            total_files = 0
            for lib in libraries:
                lib_info = LibraryInfo(
                    name=lib.get('name'),
                    path=lib.get('path'),
                    file_count=lib.get('totalFiles', 0),
                    scan_progress=lib.get('scanProgress', 0),
                    last_scan=lib.get('lastScan'),
                    is_scanning=lib.get('isScanning', False)
                )
                library_stats.append(lib_info)
                total_files += lib_info.file_count

            scan_status = ScanStatus(
                total_libraries=len(libraries),
                total_files=total_files,
                scanning_libraries=len([l for l in library_stats if l.is_scanning])
            )
            return LibraryStatus(
                timestamp=timestamp,
                libraries=library_stats,
                scan_status=scan_status
            )
        return LibraryStatus(
            timestamp=timestamp,
            libraries=[],
            error='Unable to fetch library data'
        )

    def get_statistics(self) -> StatisticsStatus:
        """Get overall Tdarr statistics and health metrics."""
        timestamp = datetime.now().isoformat()

        data = self._make_request('/api/v2/get-stats')
        if data is not None:
            stats = data.get('stats', {})
            statistics = Statistics(
                total_transcodes=stats.get('totalTranscodes', 0),
                space_saved=stats.get('spaceSaved', 0),
                total_files_processed=stats.get('totalFilesProcessed', 0),
                failed_transcodes=stats.get('failedTranscodes', 0),
                processing_speed=stats.get('processingSpeed', 0),
                eta=stats.get('eta')
            )
            return StatisticsStatus(timestamp=timestamp, statistics=statistics)
        return StatisticsStatus(timestamp=timestamp, error='Unable to fetch statistics')

    def health_check(self) -> HealthStatus:
        """Perform comprehensive health check (server + nodes)."""
        timestamp = datetime.now().isoformat()

        # Server connectivity.
        server_status = self.get_server_status()
        server_check = HealthCheck(
            status=server_status.status,
            healthy=server_status.status == 'good'
        )

        # Node connectivity: healthy when at least one node is online and no error.
        node_status = self.get_node_status()
        nodes_healthy = (
            node_status.node_summary.online_nodes > 0 if node_status.node_summary else False
        ) and not node_status.error
        nodes_check = HealthCheck(
            status='online' if nodes_healthy else 'offline',
            healthy=nodes_healthy,
            online_count=node_status.node_summary.online_nodes if node_status.node_summary else 0,
            total_count=node_status.node_summary.total_nodes if node_status.node_summary else 0
        )

        checks = {
            'server': server_check,
            'nodes': nodes_check
        }

        all_checks_healthy = all(check.healthy for check in checks.values())
        return HealthStatus(
            timestamp=timestamp,
            overall_status='healthy' if all_checks_healthy else 'unhealthy',
            checks=checks
        )
server via API') + parser.add_argument('--server', required=True, help='Tdarr server URL (e.g., http://10.10.0.43:8265)') + parser.add_argument('--check', choices=['all', 'status', 'queue', 'nodes', 'libraries', 'stats', 'health'], + default='health', help='Type of check to perform') + parser.add_argument('--timeout', type=int, default=30, help='Request timeout in seconds') + parser.add_argument('--output', choices=['json', 'pretty'], default='pretty', help='Output format') + parser.add_argument('--verbose', action='store_true', help='Enable verbose logging') + parser.add_argument('--detect-stuck', action='store_true', help='Enable stuck job detection') + parser.add_argument('--stuck-threshold', type=int, default=30, help='Minutes before job is considered stuck (default: 30)') + parser.add_argument('--memory-file', default='/mnt/NV2/Development/claude-home/logs/tdarr_memory.pkl', help='Path to memory state file') + parser.add_argument('--clear-memory', action='store_true', help='Clear memory state and exit') + parser.add_argument('--discord-webhook', + default='https://discord.com/api/webhooks/1404105821549498398/y2Ud1RK9rzFjv58xbypUfQNe3jrL7ZUq1FkQHa4_dfOHm2ylp93z0f4tY0O8Z-vQgKhD', + help='Discord webhook URL for notifications (default: configured webhook)') + parser.add_argument('--discord-alerts', action='store_true', help='Enable Discord alerts for stuck jobs and system status') + parser.add_argument('--discord-test', action='store_true', help='Send test Discord message and exit') + parser.add_argument('--log-file', default='/mnt/NV2/Development/claude-home/logs/tdarr_monitor.log', + help='Path to log file with rotation (default: /mnt/NV2/Development/claude-home/logs/tdarr_monitor.log)') + parser.add_argument('--log-level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], default='INFO', + help='Logging level (default: INFO)') + parser.add_argument('--no-log-file', action='store_true', help='Disable file logging, console only') + 
parser.add_argument('--clear-hung-workers', action='store_true', help='Clear hung workers via API call when stuck jobs are detected') + + args = parser.parse_args() + + if args.verbose: + logging.getLogger().setLevel(logging.DEBUG) + + # Handle clear memory command + if args.clear_memory: + if os.path.exists(args.memory_file): + os.remove(args.memory_file) + print(f"Memory state cleared: {args.memory_file}") + else: + print(f"Memory file does not exist: {args.memory_file}") + sys.exit(0) + + # Handle Discord test command + if args.discord_test: + print("Sending Discord test messages...") + notifier = DiscordNotifier(args.discord_webhook) + + # Test content message + content_success = notifier.send_content_message( + "๐Ÿงช **Tdarr Monitor Test** - Content message working correctly!" + ) + + # Test embed message + test_fields = [ + DiscordEmbedField("Test Field 1", "This is a test value", True), + DiscordEmbedField("Test Field 2", "Another test value", True), + ] + + embed_success = notifier.send_embed_message( + title="๐Ÿงช Tdarr Monitor Test", + description="This is a test embed message to verify Discord integration is working correctly.", + color=0x00ff00, # Green + fields=test_fields + ) + + if content_success and embed_success: + print("โœ… Discord test successful! Both content and embed messages sent.") + sys.exit(0) + else: + print("โŒ Discord test failed. 
Check webhook URL and permissions.") + sys.exit(1) + + # Initialize monitor + log_file = None if args.no_log_file else args.log_file + monitor = TdarrMonitor( + args.server, + args.timeout, + enable_stuck_detection=args.detect_stuck, + stuck_threshold_minutes=args.stuck_threshold, + memory_file=args.memory_file, + discord_webhook_url=args.discord_webhook, + enable_discord_alerts=args.discord_alerts, + log_file=log_file, + log_level=args.log_level, + clear_hung_workers=args.clear_hung_workers + ) + + # Perform requested check + monitor.logger.info(f"Starting Tdarr monitoring check: {args.check}, stuck_detection={'enabled' if args.detect_stuck else 'disabled'}, clear_workers={'enabled' if args.clear_hung_workers else 'disabled'}") + + result = None + if args.check == 'all': + result = { + 'server_status': monitor.get_server_status(), + 'node_status': monitor.get_node_status() + } + elif args.check == 'status': + result = monitor.get_server_status() + elif args.check == 'queue': + result = monitor.get_queue_status() + elif args.check == 'nodes': + result = monitor.get_node_status() + elif args.check == 'libraries': + result = monitor.get_library_status() + elif args.check == 'stats': + result = monitor.get_statistics() + elif args.check == 'health': + result = monitor.health_check() + + # Output results + if args.output == 'json': + # Convert dataclasses to dictionaries for JSON serialization + if args.check == 'all': + json_result = {} + for key, value in result.items(): + json_result[key] = asdict(value) + print(json.dumps(json_result, indent=2)) + else: + print(json.dumps(asdict(result), indent=2)) + else: + # Pretty print format + print(f"=== Tdarr Monitor Results - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ===") + + if args.check == 'health' or (hasattr(result, 'overall_status') and result.overall_status): + health = result if hasattr(result, 'overall_status') else None + if health: + status = health.overall_status + print(f"Overall Status: 
{status.upper()}") + + if health.checks: + print("\nHealth Checks:") + for check_name, check_data in health.checks.items(): + status_icon = "โœ“" if check_data.healthy else "โœ—" + print(f" {status_icon} {check_name.title()}: {asdict(check_data)}") + + # Display stuck jobs if present + if args.detect_stuck: + if hasattr(result, 'stuck_jobs') and result.stuck_jobs: + print(f"\n=== STUCK JOBS DETECTED ({len(result.stuck_jobs)}) ===") + for stuck_job in result.stuck_jobs: + print(f"๐Ÿšจ {stuck_job.worker_snapshot.node_id}:{stuck_job.worker_snapshot.worker_id}") + print(f" File: {stuck_job.worker_snapshot.file}") + print(f" Progress: {stuck_job.worker_snapshot.percentage}%") + print(f" Status: {stuck_job.worker_snapshot.status}") + print(f" Stuck for: {stuck_job.stuck_duration_minutes:.1f} minutes") + print() + elif args.check in ['nodes', 'all']: + # Check all results for stuck jobs if 'all' is selected + stuck_found = False + if args.check == 'all' and isinstance(result, dict): + for section, data in result.items(): + if hasattr(data, 'stuck_jobs') and data.stuck_jobs: + if not stuck_found: + print(f"\n=== STUCK JOBS DETECTED ===") + stuck_found = True + for stuck_job in data.stuck_jobs: + print(f"๐Ÿšจ {stuck_job.worker_snapshot.node_id}:{stuck_job.worker_snapshot.worker_id}") + print(f" File: {stuck_job.worker_snapshot.file}") + print(f" Progress: {stuck_job.worker_snapshot.percentage}%") + print(f" Status: {stuck_job.worker_snapshot.status}") + print(f" Stuck for: {stuck_job.stuck_duration_minutes:.1f} minutes") + print() + + if not stuck_found: + print(f"\nโœ… No stuck jobs detected (threshold: {args.stuck_threshold} minutes)") + + if args.check == 'all': + for section, data in result.items(): + print(f"\n=== {section.replace('_', ' ').title()} ===") + # Don't print stuck_jobs in JSON format as we already displayed them above + if hasattr(data, 'stuck_jobs'): + data_dict = asdict(data) + data_dict.pop('stuck_jobs', None) + print(json.dumps(data_dict, indent=2)) + 
else: + print(json.dumps(asdict(data), indent=2)) + elif args.check != 'health': + # Don't print stuck_jobs in JSON format as we already displayed them above + if hasattr(result, 'stuck_jobs'): + result_dict = asdict(result) + result_dict.pop('stuck_jobs', None) + print(json.dumps(result_dict, indent=2)) + else: + print(json.dumps(asdict(result), indent=2)) + + # Exit with appropriate code + if result: + # Check for unhealthy status in health check + if isinstance(result, HealthStatus) and result.overall_status == 'unhealthy': + sys.exit(1) + # Check for errors in individual status objects (all status classes except HealthStatus have error attribute) + elif (isinstance(result, (ServerStatus, QueueStatus, NodeStatus, LibraryStatus, StatisticsStatus)) + and result.error): + sys.exit(1) + # Check for errors in 'all' results + elif isinstance(result, dict): + for status_obj in result.values(): + if (isinstance(status_obj, (ServerStatus, QueueStatus, NodeStatus, LibraryStatus, StatisticsStatus)) + and status_obj.error): + sys.exit(1) + + sys.exit(0) + + +if __name__ == '__main__': + main() \ No newline at end of file