#!/usr/bin/env python3
"""
Server Diagnostics Client Library

Provides SSH-based diagnostics for homelab troubleshooting
"""

import json
import re
import shlex
import subprocess
from pathlib import Path
from typing import Any, Optional, List, Dict

try:
    import yaml
except ImportError:  # PyYAML is only required once a config file is read
    yaml = None


# Characters that are inert in every shell quoting context (the same set that
# shlex.quote leaves untouched). Template parameters must consist of these
# only, because the quoting context of a placeholder inside a configured
# command template is unknown.
_SAFE_PARAM_VALUE = re.compile(r"[\w@%+=:,./-]+", re.ASCII)


class SecurityError(Exception):
    """Raised when a command violates security constraints."""


def _to_text(data: Any) -> str:
    """Return captured process output as ``str`` (``None`` becomes ``""``)."""
    if data is None:
        return ""
    if isinstance(data, bytes):
        return data.decode(errors="replace")
    return data


def _with_grep_filter(cmd: str, log_filter: Optional[str]) -> str:
    """Append a case-insensitive grep stage to *cmd* when a filter is given.

    The pattern is shell-quoted and passed via ``-e`` so it can neither break
    out of the pipeline nor be parsed as a grep option.
    """
    if log_filter:
        cmd += f" | grep -i -e {shlex.quote(str(log_filter))}"
    return cmd


class ServerDiagnostics:
    """
    Main diagnostic client for server troubleshooting.

    Connects to servers via SSH and executes whitelisted diagnostic commands.
    Enforces security constraints from config.yaml.
    """

    # Seconds allowed for the SSH connection and for the whole remote command.
    SSH_CONNECT_TIMEOUT = 10
    SSH_COMMAND_TIMEOUT = 60

    # Metric name -> key of the command in the ``diagnostic_commands`` config.
    _METRIC_COMMANDS = {
        "cpu": "cpu_usage",
        "memory": "memory_usage",
        "disk": "disk_usage",
        "network": "network_status",
    }

    def __init__(self, config_path: Optional[str] = None):
        """
        Initialize with configuration.

        Args:
            config_path: Path to config.yaml. Defaults to same directory.
        """
        if config_path is None:
            config_path = Path(__file__).parent / "config.yaml"

        self.config = self._load_config(config_path)
        # ``or`` also covers sections that are present but empty (YAML null).
        self.servers = self.config.get("servers") or {}
        self.containers = self.config.get("docker_containers") or []
        self.allowed_commands = self.config.get("diagnostic_commands") or {}
        self.remediation_commands = self.config.get("remediation_commands") or {}
        self.denied_patterns = self.config.get("denied_patterns") or []

    def _load_config(self, path) -> dict:
        """
        Load YAML configuration.

        Args:
            path: Location of the YAML file.

        Returns:
            The parsed mapping; an empty file yields an empty dict.

        Raises:
            ImportError: PyYAML is not installed.
            ValueError: The top-level YAML value is not a mapping.
        """
        if yaml is None:
            raise ImportError("PyYAML is required to load the configuration")

        with open(path, encoding="utf-8") as f:
            config = yaml.safe_load(f)

        if config is None:  # safe_load returns None for an empty document
            return {}
        if not isinstance(config, dict):
            raise ValueError(f"Configuration in {path} must be a mapping")
        return config

    def _validate_command(self, command: str) -> bool:
        """
        Check command against deny list.

        Returns:
            True when no denied pattern occurs in the command.

        Raises:
            SecurityError: The command contains a denied pattern.
        """
        for pattern in self.denied_patterns:
            if pattern in command:
                raise SecurityError(f"Command contains denied pattern: {pattern}")
        return True

    def _ssh_exec(self, server: str, command: str) -> dict:
        """
        Execute command on remote server via SSH.

        Args:
            server: Server identifier from config
            command: Shell command line to run on the remote host

        Returns:
            dict with stdout, stderr, returncode, success. When the command
            exceeds SSH_COMMAND_TIMEOUT, success is False, returncode is None
            and an "error" entry describes the timeout.

        Raises:
            SecurityError: The command contains a denied pattern.
            ValueError: The server is not present in the configuration.
        """
        self._validate_command(command)

        server_config = self.servers.get(server)
        if not server_config:
            raise ValueError(f"Unknown server: {server}")

        ssh_key = Path(server_config["ssh_key"]).expanduser()
        ssh_user = server_config["ssh_user"]
        hostname = server_config["hostname"]

        # NOTE(review): StrictHostKeyChecking=no disables host key
        # verification; kept to preserve existing behaviour. Consider
        # "accept-new" once known_hosts is populated.
        ssh_cmd = [
            "ssh",
            "-i",
            str(ssh_key),
            "-o",
            "StrictHostKeyChecking=no",
            "-o",
            f"ConnectTimeout={self.SSH_CONNECT_TIMEOUT}",
            f"{ssh_user}@{hostname}",
            command,
        ]

        try:
            result = subprocess.run(
                ssh_cmd,
                capture_output=True,
                text=True,
                timeout=self.SSH_COMMAND_TIMEOUT,
            )
        except subprocess.TimeoutExpired as exc:
            # Report the timeout as a failed result so that multi-step callers
            # (health check, metrics) can still return the remaining data.
            return {
                "stdout": _to_text(exc.stdout),
                "stderr": _to_text(exc.stderr),
                "returncode": None,
                "success": False,
                "error": f"Command timed out after {self.SSH_COMMAND_TIMEOUT}s",
            }

        return {
            "stdout": result.stdout,
            "stderr": result.stderr,
            "returncode": result.returncode,
            "success": result.returncode == 0,
        }

    # === Docker Operations ===

    def get_docker_status(self, server: str, container: Optional[str] = None) -> dict:
        """
        Get Docker container status.

        Args:
            server: Server identifier from config
            container: Specific container name (optional, all if not specified)

        Returns:
            dict with the SSH result; on success a "data" entry holds the
            parsed state (single container; None if the output is not valid
            JSON) or a list of dicts with Names, Status, State and Ports.
        """
        if container:
            # The container name is shell-quoted: it is caller-supplied.
            cmd = "docker inspect --format '{{json .State}}' " + shlex.quote(
                str(container)
            )
            result = self._ssh_exec(server, cmd)
            if result["success"]:
                try:
                    result["data"] = json.loads(result["stdout"])
                except json.JSONDecodeError:
                    result["data"] = None
        else:
            # Use Go template format for Docker 20.10 compatibility
            # Format: Name|Status|State|Ports
            cmd = "docker ps -a --format '{{.Names}}|{{.Status}}|{{.State}}|{{.Ports}}'"
            result = self._ssh_exec(server, cmd)
            if result["success"]:
                containers = []
                for line in result["stdout"].strip().split("\n"):
                    if not line:
                        continue
                    parts = line.split("|")
                    if len(parts) >= 3:
                        containers.append(
                            {
                                "Names": parts[0],
                                "Status": parts[1],
                                "State": parts[2],
                                "Ports": parts[3] if len(parts) > 3 else "",
                            }
                        )
                result["data"] = containers

        return result

    def docker_logs(
        self,
        server: str,
        container: str,
        lines: int = 100,
        log_filter: Optional[str] = None,
    ) -> dict:
        """
        Get Docker container logs.

        Args:
            server: Server identifier
            container: Container name
            lines: Number of lines to retrieve
            log_filter: Optional grep filter pattern

        Returns:
            dict with log output

        Raises:
            ValueError: lines cannot be converted to an integer.
        """
        # int() and shlex.quote keep caller-supplied values from being
        # interpreted by the remote shell.
        cmd = f"docker logs --tail {int(lines)} {shlex.quote(str(container))} 2>&1"
        return self._ssh_exec(server, _with_grep_filter(cmd, log_filter))

    def docker_restart(self, server: str, container: str) -> dict:
        """
        Restart a Docker container (low-risk remediation).

        Args:
            server: Server identifier
            container: Container name

        Returns:
            dict with operation result; an error dict (success False) when the
            container is not in the monitored list or restart is not allowed.
        """
        # Check if container is allowed to be restarted
        container_config = next(
            (c for c in self.containers if c.get("name") == container), None
        )

        if not container_config:
            return {
                "success": False,
                "error": f"Container {container} not in monitored list",
            }

        if not container_config.get("restart_allowed", False):
            return {
                "success": False,
                "error": f"Container {container} restart not permitted",
            }

        cmd = f"docker restart {shlex.quote(str(container))}"
        result = self._ssh_exec(server, cmd)
        result["action"] = "docker_restart"
        result["container"] = container

        return result

    # === System Diagnostics ===

    def get_metrics(self, server: str, metric_type: str = "all") -> dict:
        """
        Get system metrics from server.

        Args:
            server: Server identifier
            metric_type: Type of metrics (cpu, memory, disk, network, all)

        Returns:
            dict with the server name and a "metrics" mapping of metric name
            to SSH result. A metric whose command is missing from the
            whitelist gets an error entry instead of raising KeyError.
        """
        metrics = {}

        for name, command_key in self._METRIC_COMMANDS.items():
            if metric_type not in (name, "all"):
                continue
            command = self.allowed_commands.get(command_key)
            if command is None:
                metrics[name] = {
                    "success": False,
                    "error": f"Command '{command_key}' not in whitelist",
                }
            else:
                metrics[name] = self._ssh_exec(server, command)

        return {"server": server, "metrics": metrics}

    def read_logs(
        self,
        server: str,
        log_type: str,
        lines: int = 100,
        log_filter: Optional[str] = None,
        custom_path: Optional[str] = None,
    ) -> dict:
        """
        Read logs from server.

        Args:
            server: Server identifier
            log_type: Type of log (system, docker, application, custom)
            lines: Number of lines
            log_filter: Optional grep pattern
            custom_path: Path for custom log type

        Returns:
            dict with log content, or an error dict (success False) for an
            unknown log type or a custom type without a path.

        Raises:
            ValueError: lines cannot be converted to an integer.
        """
        log_paths = {
            "system": "/var/log/syslog",
            "docker": "/var/log/docker.log",
            "application": "/var/log/application.log",
        }

        if log_type == "custom":
            if not custom_path:
                return {
                    "success": False,
                    "error": "custom_path is required for log type 'custom'",
                }
            path = custom_path
        else:
            path = log_paths.get(log_type)
            if not path:
                return {"success": False, "error": f"Unknown log type: {log_type}"}

        # The path is shell-quoted: custom paths are caller-supplied.
        cmd = f"tail -n {int(lines)} {shlex.quote(str(path))}"
        return self._ssh_exec(server, _with_grep_filter(cmd, log_filter))

    def run_diagnostic(
        self, server: str, command: str, params: Optional[dict] = None
    ) -> dict:
        """
        Run a whitelisted diagnostic command.

        Args:
            server: Server identifier
            command: Command key from config whitelist
            params: Optional parameters to substitute for ``{name}``
                placeholders in the command template

        Returns:
            dict with command output, or an error dict (success False) when
            the command is not whitelisted or a parameter value contains
            characters outside letters, digits and ``_@%+=:,./-``.
        """
        if command not in self.allowed_commands:
            return {"success": False, "error": f"Command '{command}' not in whitelist"}

        cmd = self.allowed_commands[command]

        # Substitute parameters if provided
        if params:
            for key, value in params.items():
                text = str(value)
                # Reject rather than quote: the placeholder may already sit
                # inside quotes in the template, where added quoting would
                # open the string instead of protecting it.
                if not _SAFE_PARAM_VALUE.fullmatch(text):
                    return {
                        "success": False,
                        "error": f"Parameter '{key}' contains unsafe characters",
                    }
                cmd = cmd.replace(f"{{{key}}}", text)

        return self._ssh_exec(server, cmd)

    # === Convenience Methods ===

    def quick_health_check(self, server: str) -> dict:
        """
        Perform quick health check on server.

        Returns:
            dict with the Docker status, all metrics, a "healthy" flag and a
            list of "issues". A check that could not be executed counts as an
            issue, as does every container that is not running.
        """
        docker = self.get_docker_status(server)
        metrics = self.get_metrics(server, "all")
        issues = []

        # A failed check must not be reported as a healthy server.
        if not docker.get("success"):
            issues.append("Docker status check failed")

        # Check for stopped containers
        for container in docker.get("data") or []:
            # Fall back to Status when State is empty, not only when absent.
            status = str(container.get("State") or container.get("Status", ""))
            if "Up" not in status and "running" not in status.lower():
                issues.append(
                    f"Container {container.get('Names', 'unknown')} is not running"
                )

        for name, metric in metrics["metrics"].items():
            if not metric.get("success"):
                issues.append(f"Metric check '{name}' failed")

        return {
            "server": server,
            "docker": docker,
            "metrics": metrics,
            "healthy": not issues,
            "issues": issues,
        }

    def to_json(self, data: Any) -> str:
        """Convert result to JSON string."""
        return json.dumps(data, indent=2, default=str)


def main():
    """
    CLI interface for server diagnostics.

    Returns:
        0 after printing a result; 1 when the request was rejected with a
        SecurityError or ValueError (reported as JSON instead of a traceback).
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="Server Diagnostics CLI",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s docker-status paper-dynasty
  %(prog)s docker-status paper-dynasty --container paper-dynasty_discord-app_1
  %(prog)s docker-logs paper-dynasty paper-dynasty_discord-app_1 --lines 200
  %(prog)s docker-restart paper-dynasty paper-dynasty_discord-app_1
  %(prog)s metrics paper-dynasty --type all
  %(prog)s health paper-dynasty
  %(prog)s diagnostic paper-dynasty disk_usage
        """,
    )

    subparsers = parser.add_subparsers(dest="command", required=True)

    # docker-status
    p_docker = subparsers.add_parser(
        "docker-status", help="Get Docker container status"
    )
    p_docker.add_argument("server", help="Server identifier")
    p_docker.add_argument("--container", "-c", help="Specific container name")

    # docker-logs
    p_logs = subparsers.add_parser("docker-logs", help="Get Docker container logs")
    p_logs.add_argument("server", help="Server identifier")
    p_logs.add_argument("container", help="Container name")
    p_logs.add_argument("--lines", "-n", type=int, default=100, help="Number of lines")
    p_logs.add_argument("--filter", "-f", dest="log_filter", help="Grep filter pattern")

    # docker-restart
    p_restart = subparsers.add_parser("docker-restart", help="Restart Docker container")
    p_restart.add_argument("server", help="Server identifier")
    p_restart.add_argument("container", help="Container name")

    # metrics
    p_metrics = subparsers.add_parser("metrics", help="Get system metrics")
    p_metrics.add_argument("server", help="Server identifier")
    p_metrics.add_argument(
        "--type",
        "-t",
        default="all",
        choices=["cpu", "memory", "disk", "network", "all"],
        help="Metric type",
    )

    # logs
    p_syslogs = subparsers.add_parser("logs", help="Read system logs")
    p_syslogs.add_argument("server", help="Server identifier")
    p_syslogs.add_argument(
        "--type",
        "-t",
        default="system",
        choices=["system", "docker", "application", "custom"],
        help="Log type",
    )
    p_syslogs.add_argument(
        "--lines", "-n", type=int, default=100, help="Number of lines"
    )
    p_syslogs.add_argument(
        "--filter", "-f", dest="log_filter", help="Grep filter pattern"
    )
    p_syslogs.add_argument("--path", help="Custom log path (for type=custom)")

    # health
    p_health = subparsers.add_parser("health", help="Quick health check")
    p_health.add_argument("server", help="Server identifier")

    # diagnostic
    p_diag = subparsers.add_parser("diagnostic", help="Run whitelisted diagnostic")
    p_diag.add_argument("server", help="Server identifier")
    p_diag.add_argument("diagnostic_cmd", help="Command from whitelist")
    p_diag.add_argument(
        "--params", "-p", help="JSON parameters for command substitution"
    )

    args = parser.parse_args()

    # Validate --params up front so malformed JSON is a usage error rather
    # than a traceback.
    params = None
    if args.command == "diagnostic" and args.params:
        try:
            params = json.loads(args.params)
        except json.JSONDecodeError as exc:
            parser.error(f"--params is not valid JSON: {exc}")
        if not isinstance(params, dict):
            parser.error("--params must be a JSON object")

    client = ServerDiagnostics()

    try:
        if args.command == "docker-status":
            result = client.get_docker_status(args.server, args.container)
        elif args.command == "docker-logs":
            result = client.docker_logs(
                args.server, args.container, args.lines, args.log_filter
            )
        elif args.command == "docker-restart":
            result = client.docker_restart(args.server, args.container)
        elif args.command == "metrics":
            result = client.get_metrics(args.server, args.type)
        elif args.command == "logs":
            result = client.read_logs(
                args.server, args.type, args.lines, args.log_filter, args.path
            )
        elif args.command == "health":
            result = client.quick_health_check(args.server)
        elif args.command == "diagnostic":
            result = client.run_diagnostic(args.server, args.diagnostic_cmd, params)
    except (SecurityError, ValueError) as exc:
        # Unknown server or denied command: same JSON shape as other errors.
        print(client.to_json({"success": False, "error": str(exc)}))
        return 1

    print(client.to_json(result))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())