claude-home/monitoring/recovered-lxc300/server-diagnostics/client.py
Cal Corum 28abde7c9f chore: add recovered CT 302 configs, archive tdarr scripts, clean up repo
- Add recovered LXC 300/302 server-diagnostics configs as reference
  (headless Claude permission patterns, health check client)
- Archive decommissioned tdarr monitoring scripts
- Gitignore rpg-art/ directory
- Delete stray temp files and swarm-test/

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-01 00:41:41 -06:00

444 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Server Diagnostics Client Library
Provides SSH-based diagnostics for homelab troubleshooting
"""
import json
import shlex
import subprocess
from pathlib import Path
from typing import Any, Optional, List, Dict

import yaml
class ServerDiagnostics:
    """
    Main diagnostic client for server troubleshooting.

    Connects to servers via SSH and runs diagnostic commands. Server
    definitions, the command whitelist, the monitored container list and the
    deny list are all read from config.yaml. Every command is checked against
    the deny list before it is sent to the remote host.
    """

    # Seconds before the local ssh process is abandoned.
    SSH_COMMAND_TIMEOUT = 60
    # Seconds ssh waits for the connection (passed as -o ConnectTimeout).
    SSH_CONNECT_TIMEOUT = 10

    # Fixed remote paths for the named log types accepted by read_logs().
    LOG_PATHS = {
        "system": "/var/log/syslog",
        "docker": "/var/log/docker.log",
        "application": "/var/log/application.log",
    }

    # Metric name -> key looked up in the "diagnostic_commands" config section.
    METRIC_COMMANDS = {
        "cpu": "cpu_usage",
        "memory": "memory_usage",
        "disk": "disk_usage",
        "network": "network_status",
    }

    def __init__(self, config_path: Optional[str] = None):
        """
        Initialize with configuration.

        Args:
            config_path: Path to config.yaml. Defaults to the config.yaml
                next to this file.
        """
        if config_path is None:
            config_path = Path(__file__).parent / "config.yaml"
        self.config = self._load_config(config_path)
        # "or" guards against sections that are present but empty (null).
        self.servers = self.config.get("servers") or {}
        self.containers = self.config.get("docker_containers") or []
        self.allowed_commands = self.config.get("diagnostic_commands") or {}
        self.remediation_commands = self.config.get("remediation_commands") or {}
        self.denied_patterns = self.config.get("denied_patterns") or []

    def _load_config(self, path) -> dict:
        """
        Load YAML configuration.

        Returns:
            The parsed mapping, or an empty dict when the file is empty.

        Raises:
            ValueError: If the document root is not a mapping.
        """
        with open(path, encoding="utf-8") as f:
            data = yaml.safe_load(f)
        if data is None:
            # safe_load returns None for an empty document.
            return {}
        if not isinstance(data, dict):
            raise ValueError(f"Config root must be a mapping: {path}")
        return data

    def _validate_command(self, command: str) -> bool:
        """
        Check command against deny list.

        Returns:
            True when no denied pattern occurs in the command.

        Raises:
            SecurityError: If any denied pattern is a substring of command.
        """
        for pattern in self.denied_patterns:
            if pattern in command:
                raise SecurityError(f"Command contains denied pattern: {pattern}")
        return True

    @staticmethod
    def _as_text(value) -> str:
        """Normalize captured process output (None, bytes or str) to str."""
        if value is None:
            return ""
        if isinstance(value, bytes):
            return value.decode(errors="replace")
        return value

    @staticmethod
    def _with_grep(cmd: str, log_filter: Optional[str]) -> str:
        """
        Append a case-insensitive grep stage to cmd when a filter is given.

        The pattern is shell-quoted so quotes or metacharacters in it cannot
        break out of the argument; "--" stops grep from reading a pattern
        that starts with "-" as an option.
        """
        if not log_filter:
            return cmd
        return f"{cmd} | grep -i -- {shlex.quote(log_filter)}"

    @staticmethod
    def _parse_docker_ps(output: str) -> List[Dict[str, str]]:
        """
        Parse "Name|Status|State|Ports" lines into a list of dicts.

        Lines with fewer than three fields are skipped; a missing Ports field
        becomes an empty string.
        """
        containers = []
        for line in output.splitlines():
            # maxsplit keeps any "|" inside the trailing Ports field intact.
            parts = line.split("|", 3)
            if len(parts) < 3:
                continue
            containers.append(
                {
                    "Names": parts[0],
                    "Status": parts[1],
                    "State": parts[2],
                    "Ports": parts[3] if len(parts) > 3 else "",
                }
            )
        return containers

    def _ssh_exec(self, server: str, command: str) -> dict:
        """
        Execute command on remote server via SSH.

        Args:
            server: Server identifier from config
            command: Shell command line to run on the remote host

        Returns:
            dict with stdout, stderr, returncode and success. When ssh times
            out or cannot be started, success is False, returncode is -1 and
            an "error" key describes the failure.

        Raises:
            SecurityError: If the command matches a denied pattern.
            ValueError: If the server is not defined in the config.
        """
        self._validate_command(command)
        server_config = self.servers.get(server)
        if not server_config:
            raise ValueError(f"Unknown server: {server}")

        ssh_key = Path(server_config["ssh_key"]).expanduser()
        target = f"{server_config['ssh_user']}@{server_config['hostname']}"
        ssh_cmd = [
            "ssh",
            "-i",
            str(ssh_key),
            "-o",
            # NOTE(review): host key verification is disabled, so a spoofed
            # host would be accepted silently. Kept for compatibility;
            # consider "accept-new" plus a managed known_hosts file.
            "StrictHostKeyChecking=no",
            "-o",
            f"ConnectTimeout={self.SSH_CONNECT_TIMEOUT}",
            target,
            command,
        ]

        try:
            completed = subprocess.run(
                ssh_cmd,
                capture_output=True,
                text=True,
                timeout=self.SSH_COMMAND_TIMEOUT,
            )
        except subprocess.TimeoutExpired as exc:
            # Report a hung host as a failed result instead of crashing.
            return {
                "stdout": self._as_text(exc.stdout),
                "stderr": self._as_text(exc.stderr),
                "returncode": -1,
                "success": False,
                "error": f"SSH command timed out after {self.SSH_COMMAND_TIMEOUT}s",
            }
        except OSError as exc:
            # e.g. the ssh binary is missing or not executable.
            return {
                "stdout": "",
                "stderr": "",
                "returncode": -1,
                "success": False,
                "error": f"Failed to run ssh: {exc}",
            }

        return {
            "stdout": completed.stdout,
            "stderr": completed.stderr,
            "returncode": completed.returncode,
            "success": completed.returncode == 0,
        }

    # === Docker Operations ===

    def get_docker_status(self, server: str, container: Optional[str] = None) -> dict:
        """
        Get Docker container status.

        Args:
            server: Server identifier from config
            container: Specific container name (optional, all if not specified)

        Returns:
            The _ssh_exec result dict. On success a "data" key is added: the
            decoded JSON state for a single container (None if the output is
            not valid JSON), or a list of Names/Status/State/Ports dicts
            when listing all containers.
        """
        if container:
            # Quote the name so it cannot inject extra shell syntax.
            cmd = "docker inspect --format '{{json .State}}' " + shlex.quote(container)
            result = self._ssh_exec(server, cmd)
            if result["success"]:
                try:
                    result["data"] = json.loads(result["stdout"])
                except json.JSONDecodeError:
                    result["data"] = None
            return result

        # Use Go template format for Docker 20.10 compatibility
        # Format: Name|Status|State|Ports
        cmd = "docker ps -a --format '{{.Names}}|{{.Status}}|{{.State}}|{{.Ports}}'"
        result = self._ssh_exec(server, cmd)
        if result["success"]:
            result["data"] = self._parse_docker_ps(result["stdout"])
        return result

    def docker_logs(
        self,
        server: str,
        container: str,
        lines: int = 100,
        log_filter: Optional[str] = None,
    ) -> dict:
        """
        Get Docker container logs.

        Args:
            server: Server identifier
            container: Container name
            lines: Number of lines to retrieve
            log_filter: Optional grep filter pattern

        Returns:
            dict with log output

        Raises:
            ValueError: If lines cannot be converted to an integer.
        """
        # int() rejects anything that is not a plain number before it can
        # reach the remote shell; the container name is shell-quoted.
        cmd = f"docker logs --tail {int(lines)} {shlex.quote(container)} 2>&1"
        return self._ssh_exec(server, self._with_grep(cmd, log_filter))

    def docker_restart(self, server: str, container: str) -> dict:
        """
        Restart a Docker container (low-risk remediation).

        The container must appear in the configured "docker_containers" list
        with restart_allowed set to a true value.

        Args:
            server: Server identifier
            container: Container name

        Returns:
            dict with operation result; "action" and "container" keys are
            added when the restart command was actually issued.
        """
        # Check if container is allowed to be restarted
        container_config = next(
            (
                entry
                for entry in self.containers
                if isinstance(entry, dict) and entry.get("name") == container
            ),
            None,
        )
        if not container_config:
            return {
                "success": False,
                "error": f"Container {container} not in monitored list",
            }
        if not container_config.get("restart_allowed", False):
            return {
                "success": False,
                "error": f"Container {container} restart not permitted",
            }

        result = self._ssh_exec(server, f"docker restart {shlex.quote(container)}")
        result["action"] = "docker_restart"
        result["container"] = container
        return result

    # === System Diagnostics ===

    def get_metrics(self, server: str, metric_type: str = "all") -> dict:
        """
        Get system metrics from server.

        Args:
            server: Server identifier
            metric_type: Type of metrics (cpu, memory, disk, network, all)

        Returns:
            dict with "server" and "metrics"; metrics maps each requested
            metric name to its command result. A metric whose command is not
            present in the config gets a failure entry instead of raising.
            An unrecognized metric_type yields an empty metrics dict.
        """
        metrics = {}
        for name, command_key in self.METRIC_COMMANDS.items():
            if metric_type not in (name, "all"):
                continue
            command = self.allowed_commands.get(command_key)
            if not command:
                metrics[name] = {
                    "success": False,
                    "error": f"Diagnostic command '{command_key}' not configured",
                }
                continue
            metrics[name] = self._ssh_exec(server, command)
        return {"server": server, "metrics": metrics}

    def read_logs(
        self,
        server: str,
        log_type: str,
        lines: int = 100,
        log_filter: Optional[str] = None,
        custom_path: Optional[str] = None,
    ) -> dict:
        """
        Read logs from server.

        Args:
            server: Server identifier
            log_type: Type of log (system, docker, application, custom)
            lines: Number of lines
            log_filter: Optional grep pattern
            custom_path: Path for custom log type

        Returns:
            dict with log content, or a failure dict when the log type is
            unknown or "custom" is requested without a path.

        Raises:
            ValueError: If lines cannot be converted to an integer.
        """
        if log_type == "custom":
            if not custom_path:
                return {
                    "success": False,
                    "error": "custom_path is required for log type 'custom'",
                }
            path = custom_path
        else:
            path = self.LOG_PATHS.get(log_type)
            if not path:
                return {"success": False, "error": f"Unknown log type: {log_type}"}

        # The path is quoted so it is always passed to tail as one literal
        # argument (no word splitting, globbing or command injection).
        cmd = f"tail -n {int(lines)} {shlex.quote(path)}"
        return self._ssh_exec(server, self._with_grep(cmd, log_filter))

    def run_diagnostic(
        self, server: str, command: str, params: Optional[dict] = None
    ) -> dict:
        """
        Run a whitelisted diagnostic command.

        Args:
            server: Server identifier
            command: Command key from config whitelist
            params: Optional parameters to substitute for "{key}"
                placeholders in the configured command

        Returns:
            dict with command output, or a failure dict when the key is not
            in the whitelist.
        """
        if command not in self.allowed_commands:
            return {"success": False, "error": f"Command '{command}' not in whitelist"}

        cmd = self.allowed_commands[command]
        # Substitute parameters if provided.
        # NOTE(review): values are inserted verbatim because the templates in
        # config.yaml may already quote their placeholders; only the deny
        # list in _ssh_exec guards against hostile values. Confirm the
        # templates and consider shlex.quote here.
        if params:
            for key, value in params.items():
                cmd = cmd.replace(f"{{{key}}}", str(value))
        return self._ssh_exec(server, cmd)

    # === Convenience Methods ===

    def quick_health_check(self, server: str) -> dict:
        """
        Perform quick health check on server.

        Collects the status of all Docker containers and all metrics, then
        marks the server unhealthy when the Docker status query failed or
        when any container is not running.

        Returns:
            dict with server, docker, metrics, healthy and issues.
        """
        docker_status = self.get_docker_status(server)
        health = {
            "server": server,
            "docker": docker_status,
            "metrics": self.get_metrics(server, "all"),
            "healthy": True,
            "issues": [],
        }

        # Without a container listing the server cannot be called healthy.
        if not docker_status.get("success"):
            health["healthy"] = False
            health["issues"].append("Unable to query Docker container status")

        # Check for stopped containers
        for container in docker_status.get("data") or []:
            status = str(container.get("State", container.get("Status", "")))
            if "Up" not in status and "running" not in status.lower():
                health["healthy"] = False
                health["issues"].append(
                    f"Container {container.get('Names', 'unknown')} is not running"
                )
        return health

    def to_json(self, data: Any) -> str:
        """Convert result to a JSON string (2-space indent, str() fallback)."""
        return json.dumps(data, indent=2, default=str)
class SecurityError(Exception):
    """Signals that a command was rejected for violating security constraints."""
def main():
    """
    CLI interface for server diagnostics.

    Parses the sub-command, runs the matching ServerDiagnostics method and
    prints the result as JSON. Invalid --params input, a rejected command
    (SecurityError), an unknown server (ValueError) or an SSH timeout are
    reported as a JSON error object and the process exits with status 1
    instead of printing a traceback.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="Server Diagnostics CLI",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
%(prog)s docker-status paper-dynasty
%(prog)s docker-status paper-dynasty --container paper-dynasty_discord-app_1
%(prog)s docker-logs paper-dynasty paper-dynasty_discord-app_1 --lines 200
%(prog)s docker-restart paper-dynasty paper-dynasty_discord-app_1
%(prog)s metrics paper-dynasty --type all
%(prog)s health paper-dynasty
%(prog)s diagnostic paper-dynasty disk_usage
""",
    )
    subparsers = parser.add_subparsers(dest="command", required=True)

    # docker-status
    p_docker = subparsers.add_parser(
        "docker-status", help="Get Docker container status"
    )
    p_docker.add_argument("server", help="Server identifier")
    p_docker.add_argument("--container", "-c", help="Specific container name")

    # docker-logs
    p_logs = subparsers.add_parser("docker-logs", help="Get Docker container logs")
    p_logs.add_argument("server", help="Server identifier")
    p_logs.add_argument("container", help="Container name")
    p_logs.add_argument("--lines", "-n", type=int, default=100, help="Number of lines")
    p_logs.add_argument("--filter", "-f", dest="log_filter", help="Grep filter pattern")

    # docker-restart
    p_restart = subparsers.add_parser("docker-restart", help="Restart Docker container")
    p_restart.add_argument("server", help="Server identifier")
    p_restart.add_argument("container", help="Container name")

    # metrics
    p_metrics = subparsers.add_parser("metrics", help="Get system metrics")
    p_metrics.add_argument("server", help="Server identifier")
    p_metrics.add_argument(
        "--type",
        "-t",
        default="all",
        choices=["cpu", "memory", "disk", "network", "all"],
        help="Metric type",
    )

    # logs
    p_syslogs = subparsers.add_parser("logs", help="Read system logs")
    p_syslogs.add_argument("server", help="Server identifier")
    p_syslogs.add_argument(
        "--type",
        "-t",
        default="system",
        choices=["system", "docker", "application", "custom"],
        help="Log type",
    )
    p_syslogs.add_argument(
        "--lines", "-n", type=int, default=100, help="Number of lines"
    )
    p_syslogs.add_argument(
        "--filter", "-f", dest="log_filter", help="Grep filter pattern"
    )
    p_syslogs.add_argument("--path", help="Custom log path (for type=custom)")

    # health
    p_health = subparsers.add_parser("health", help="Quick health check")
    p_health.add_argument("server", help="Server identifier")

    # diagnostic
    p_diag = subparsers.add_parser("diagnostic", help="Run whitelisted diagnostic")
    p_diag.add_argument("server", help="Server identifier")
    p_diag.add_argument("diagnostic_cmd", help="Command from whitelist")
    p_diag.add_argument(
        "--params", "-p", help="JSON parameters for command substitution"
    )

    args = parser.parse_args()

    def fail(message: str) -> None:
        """Print a JSON error object in the CLI's output format and exit 1."""
        print(json.dumps({"success": False, "error": message}, indent=2))
        raise SystemExit(1)

    # Decode --params up front so bad input never reaches the client.
    params = None
    if args.command == "diagnostic" and args.params:
        try:
            params = json.loads(args.params)
        except json.JSONDecodeError as exc:
            fail(f"--params is not valid JSON: {exc}")
        if not isinstance(params, dict):
            fail("--params must be a JSON object")

    client = ServerDiagnostics()

    # Sub-command name -> deferred call; only the selected entry is invoked.
    handlers = {
        "docker-status": lambda: client.get_docker_status(args.server, args.container),
        "docker-logs": lambda: client.docker_logs(
            args.server, args.container, args.lines, args.log_filter
        ),
        "docker-restart": lambda: client.docker_restart(args.server, args.container),
        "metrics": lambda: client.get_metrics(args.server, args.type),
        "logs": lambda: client.read_logs(
            args.server, args.type, args.lines, args.log_filter, args.path
        ),
        "health": lambda: client.quick_health_check(args.server),
        "diagnostic": lambda: client.run_diagnostic(
            args.server, args.diagnostic_cmd, params
        ),
    }

    try:
        result = handlers[args.command]()
    except (SecurityError, ValueError, subprocess.TimeoutExpired) as exc:
        fail(str(exc))

    print(client.to_json(result))
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()