# agent.py
"""Bob System Sentinel Agent — deep infrastructure monitoring and investigation.

Goes beyond Home Keeper's health checks to perform:
- Prometheus metric queries for trending analysis
- Docker log analysis for error patterns
- Network device inventory (SSH-accessible devices)
- Disk usage by container/volume
- SSL certificate expiry checks

Triggered by scheduler (every 15 min when enabled) or by alerts.
"""

import asyncio
import json
import os
import subprocess
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime, timezone

import nats

NATS_URL = os.getenv("NATS_URL", "nats://127.0.0.1:4222")
PROMETHEUS_URL = os.getenv("PROMETHEUS_URL", "http://127.0.0.1:9090")
ROLE = "system_sentinel"

# SSH-accessible devices for inventory
INVENTORY_HOSTS = {
    "kairos-macbook-1.lan": {"user": "kairos", "description": "Greatroom MacBook"},
    "nuclide-amd.lan": {"user": "nuclide", "description": "Reverse proxy / auth"},
}


def run_cmd(cmd: str, timeout: int = 15) -> str:
    """Run a shell command and return its stripped stdout.

    On timeout or any other failure, returns a sentinel string starting with
    "TIMEOUT" or "ERROR:" — callers check these prefixes instead of catching
    exceptions.
    """
    try:
        result = subprocess.run(
            cmd, shell=True, capture_output=True, text=True, timeout=timeout
        )
        return result.stdout.strip()
    except subprocess.TimeoutExpired:
        return f"TIMEOUT after {timeout}s"
    except Exception as e:
        return f"ERROR: {e}"


def prometheus_query(query: str) -> dict:
    """Execute a PromQL instant query and return the decoded JSON response.

    Returns ``{"status": "error", "error": ...}`` on any failure so callers
    can uniformly check ``result.get("status")``.
    """
    try:
        # quote() is defined in urllib.parse; the previous urllib.request.quote
        # relied on an undocumented re-export.
        url = f"{PROMETHEUS_URL}/api/v1/query?query={urllib.parse.quote(query)}"
        with urllib.request.urlopen(url, timeout=10) as resp:
            return json.loads(resp.read().decode())
    except Exception as e:
        return {"status": "error", "error": str(e)}


def check_docker_disk_usage() -> dict:
    """Get disk usage by Docker via ``docker system df``.

    Returns ``{"docker_disk": [...]}`` with one dict per JSON line of output,
    or ``{"error": ...}`` if the command failed.
    """
    output = run_cmd("docker system df --format json")
    if output.startswith("ERROR") or output.startswith("TIMEOUT"):
        return {"error": output}

    # `docker system df --format json` emits one JSON object per line.
    results = []
    for line in output.strip().split("\n"):
        if line.strip():
            try:
                results.append(json.loads(line))
            except json.JSONDecodeError:
                continue
    return {"docker_disk": results}


def check_large_containers() -> list:
    """Find the (up to) 10 containers using the most disk space.

    Returns a list of ``{"name": ..., "size": ...}`` dicts; the size string is
    Docker's human-readable form (e.g. "10MB (virtual 1.2GB)"), so the
    `sort -h` ordering is approximate.
    """
    output = run_cmd(
        "docker ps -a --format '{{.Names}}\t{{.Size}}' | sort -t'\t' -k2 -h -r | head -10"
    )
    if output.startswith("ERROR"):
        return []
    containers = []
    for line in output.strip().split("\n"):
        if "\t" in line:
            name, size = line.split("\t", 1)
            containers.append({"name": name, "size": size})
    return containers


def check_docker_logs_errors() -> dict:
    """Scan the last hour of Docker logs in key containers for error patterns.

    Returns a mapping of container name -> ``{"error_count": int, "samples": [...]}``,
    including only containers with at least one match.

    NOTE(review): ``2>&1`` merges docker's own stderr into the grep input, so a
    missing container ("Error: No such container ...") can register as a log
    error — confirm whether that is intended.
    """
    containers_to_check = [
        "vllm", "pipecat-agent", "fish-speech", "faster-whisper",
        "bob-home-keeper", "bob-agent-scheduler",
    ]
    error_summary = {}
    for name in containers_to_check:
        output = run_cmd(
            f"docker logs --since 1h {name} 2>&1 | grep -ci 'error\\|exception\\|traceback'"
        )
        try:
            count = int(output)
        except ValueError:
            # run_cmd sentinel ("ERROR: ..."/"TIMEOUT ...") or garbage output.
            count = -1
        if count > 0:
            # Get a few sample error lines for context.
            samples = run_cmd(
                f"docker logs --since 1h {name} 2>&1 | grep -i 'error\\|exception' | tail -3"
            )
            error_summary[name] = {
                "error_count": count,
                "samples": samples.split("\n")[:3],
            }
    return error_summary


def _first_sample_value(result: dict) -> float | None:
    """Extract the float value of the first sample in a Prometheus instant-query
    result, or None if the response is malformed/empty."""
    if result.get("status") != "success":
        return None
    try:
        return float(result["data"]["result"][0]["value"][1])
    except (KeyError, IndexError, ValueError):
        return None


def check_prometheus_metrics() -> dict:
    """Query key Prometheus metrics for system health trends.

    Returns a dict with whichever of cpu_usage_pct / memory_usage_pct /
    disk_usage_pct / network_rx_mbps could be fetched (missing on error).
    """
    metrics = {}

    # Node CPU usage, % busy over the last 5 minutes.
    value = _first_sample_value(
        prometheus_query('100 - (avg(rate(node_cpu_seconds_total{mode="idle"}[5m])) * 100)')
    )
    if value is not None:
        metrics["cpu_usage_pct"] = round(value, 1)

    # Node memory usage, % of total not available.
    value = _first_sample_value(
        prometheus_query(
            '100 * (1 - node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes)'
        )
    )
    if value is not None:
        metrics["memory_usage_pct"] = round(value, 1)

    # Root filesystem usage %.
    value = _first_sample_value(
        prometheus_query(
            '100 - (node_filesystem_avail_bytes{mountpoint="/"} / '
            'node_filesystem_size_bytes{mountpoint="/"} * 100)'
        )
    )
    if value is not None:
        metrics["disk_usage_pct"] = round(value, 1)

    # Network receive rate, summed across non-loopback interfaces.
    # NOTE(review): bytes/sec divided by 1024^2 is MiB/s, not Mbps; the key
    # name is kept for downstream compatibility — confirm consumers' intent.
    result = prometheus_query('rate(node_network_receive_bytes_total{device!="lo"}[5m])')
    if result.get("status") == "success":
        try:
            total = sum(float(r["value"][1]) for r in result["data"]["result"])
            metrics["network_rx_mbps"] = round(total / 1024 / 1024, 2)
        except (KeyError, ValueError):
            pass

    return metrics


def inventory_remote_device(host: str, user: str) -> dict:
    """SSH to a remote device and gather inventory info.

    Returns a dict that always has "host" and "reachable"; when reachable it
    also carries "uname" and best-effort "disk", "uptime", and "memory" keys.
    BatchMode=yes makes the probe fail fast instead of prompting for a password.
    """
    info = {"host": host, "reachable": False}

    # Quick reachability probe; empty output also counts as unreachable.
    output = run_cmd(
        f"ssh -o ConnectTimeout=3 -o BatchMode=yes {user}@{host} 'uname -a' 2>/dev/null",
        timeout=10,
    )
    if output.startswith("ERROR") or output.startswith("TIMEOUT") or not output:
        return info

    info["reachable"] = True
    info["uname"] = output

    # Root filesystem usage.
    disk = run_cmd(
        f"ssh -o ConnectTimeout=3 {user}@{host} 'df -h / | tail -1' 2>/dev/null",
        timeout=10,
    )
    if disk and not disk.startswith("ERROR"):
        parts = disk.split()
        if len(parts) >= 5:
            info["disk"] = {
                "total": parts[1],
                "used": parts[2],
                "available": parts[3],
                "percent": parts[4],
            }

    # Uptime: `uptime -p` on Linux, plain `uptime` as a macOS fallback.
    uptime = run_cmd(
        f"ssh -o ConnectTimeout=3 {user}@{host} 'uptime -p 2>/dev/null || uptime' 2>/dev/null",
        timeout=10,
    )
    if uptime and not uptime.startswith("ERROR"):
        info["uptime"] = uptime

    # Memory: `free` on Linux, `vm_stat` as a macOS fallback.
    mem = run_cmd(
        f"ssh -o ConnectTimeout=3 {user}@{host} "
        f"'free -h 2>/dev/null | grep Mem || vm_stat 2>/dev/null | head -3' 2>/dev/null",
        timeout=10,
    )
    if mem and not mem.startswith("ERROR"):
        info["memory"] = mem

    return info


async def run_sentinel(trigger_data: dict) -> dict:
    """Run deep system investigation and return the compiled result dict.

    Severity is "critical" when any device is unreachable, "warning" when any
    other alert fired, else "ok".
    """
    run_id = trigger_data.get("run_id", f"sentinel-{int(time.time())}")
    started_at = datetime.now(timezone.utc).isoformat()
    print(f"\nSystem Sentinel run: {run_id}")

    # Docker disk usage
    print("Checking Docker disk usage...")
    docker_disk = check_docker_disk_usage()
    large_containers = check_large_containers()

    # Docker log errors
    print("Scanning Docker logs for errors...")
    log_errors = check_docker_logs_errors()
    total_errors = sum(v["error_count"] for v in log_errors.values())
    print(f"  {total_errors} errors found across {len(log_errors)} containers")

    # Prometheus metrics
    print("Querying Prometheus metrics...")
    prom_metrics = check_prometheus_metrics()
    if prom_metrics:
        print(
            f"  CPU: {prom_metrics.get('cpu_usage_pct', '?')}%, "
            f"RAM: {prom_metrics.get('memory_usage_pct', '?')}%"
        )

    # Remote device inventory
    print("Inventorying remote devices...")
    inventory = {}
    for host, cfg in INVENTORY_HOSTS.items():
        info = inventory_remote_device(host, cfg["user"])
        inventory[host] = info
        status = "reachable" if info["reachable"] else "UNREACHABLE"
        print(f"  {host}: {status}")

    # Compile alerts
    alerts = []
    if total_errors > 50:
        alerts.append(f"High error rate: {total_errors} errors in last hour across Docker logs")
    for name, errs in log_errors.items():
        if errs["error_count"] > 20:
            alerts.append(f"Container {name}: {errs['error_count']} errors in last hour")
    for host, info in inventory.items():
        if not info["reachable"]:
            alerts.append(f"Device unreachable via SSH: {host}")

    # BUGFIX: alert text says "unreachable" (lowercase) but the old check
    # tested for "UNREACHABLE", so severity could never reach "critical".
    # Compare case-insensitively instead.
    if any("unreachable" in a.lower() for a in alerts):
        severity = "critical"
    elif alerts:
        severity = "warning"
    else:
        severity = "ok"

    result = {
        "run_id": run_id,
        "role": ROLE,
        "started_at": started_at,
        "completed_at": datetime.now(timezone.utc).isoformat(),
        "severity": severity,
        "alerts": alerts,
        "prometheus_metrics": prom_metrics,
        "docker_disk": docker_disk,
        "large_containers": large_containers[:5],
        "log_errors": {k: {"count": v["error_count"]} for k, v in log_errors.items()},
        "device_inventory": inventory,
    }

    print(f"\nResult: severity={severity}, {len(alerts)} alerts")
    return result


async def main():
    """Main agent loop: subscribe to trigger subject and run investigations."""
    nc = await nats.connect(NATS_URL)
    js = nc.jetstream()
    print(f"System Sentinel connected to NATS at {NATS_URL}")

    trigger_subject = f"bob.agent.{ROLE}.trigger"

    async def handle_trigger(msg):
        try:
            trigger_data = json.loads(msg.data.decode())
            print(f"\nReceived trigger: {trigger_data.get('run_id', 'unknown')}")

            started = {
                "role": ROLE,
                "run_id": trigger_data.get("run_id"),
                "started_at": datetime.now(timezone.utc).isoformat(),
            }
            await js.publish(f"bob.agent.{ROLE}.started", json.dumps(started).encode())

            result = await run_sentinel(trigger_data)
            await js.publish(f"bob.agent.{ROLE}.result", json.dumps(result).encode())

            if result["alerts"]:
                alert = {
                    "role": ROLE,
                    "run_id": result["run_id"],
                    "severity": result["severity"],
                    "alerts": result["alerts"],
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                }
                await js.publish(f"bob.agent.{ROLE}.alert", json.dumps(alert).encode())

            print("Sentinel result published")
            await msg.ack()
        except Exception as e:
            print(f"Error: {e}", file=sys.stderr)
            import traceback
            traceback.print_exc()
            # With manual_ack=True an un-acked message just sits until
            # ack-wait expiry; nak explicitly so redelivery is prompt.
            try:
                await msg.nak()
            except Exception:
                pass  # best-effort: connection may already be gone

    # NOTE(review): some nats-py versions expect a DeliverPolicy enum (or a
    # ConsumerConfig) rather than the string "new" — confirm against the
    # installed client version.
    await js.subscribe(
        trigger_subject,
        cb=handle_trigger,
        durable="system_sentinel_agent",
        deliver_policy="new",
        manual_ack=True,
    )
    print(f"Subscribed to {trigger_subject}")
    print("Waiting for triggers...")

    try:
        while True:
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        pass
    finally:
        await nc.close()


if __name__ == "__main__":
    asyncio.run(main())