# agent.py
  1  """Bob System Sentinel Agent — deep infrastructure monitoring and investigation.
  2  
  3  Goes beyond Home Keeper's health checks to perform:
  4  - Prometheus metric queries for trending analysis
  5  - Docker log analysis for error patterns
  6  - Network device inventory (SSH-accessible devices)
  7  - Disk usage by container/volume
  8  - SSL certificate expiry checks
  9  
 10  Triggered by scheduler (every 15 min when enabled) or by alerts.
 11  """
 12  
import asyncio
import json
import os
import subprocess
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime, timezone

import nats
 24  
 25  NATS_URL = os.getenv("NATS_URL", "nats://127.0.0.1:4222")
 26  PROMETHEUS_URL = os.getenv("PROMETHEUS_URL", "http://127.0.0.1:9090")
 27  ROLE = "system_sentinel"
 28  
 29  # SSH-accessible devices for inventory
 30  INVENTORY_HOSTS = {
 31      "kairos-macbook-1.lan": {"user": "kairos", "description": "Greatroom MacBook"},
 32      "nuclide-amd.lan": {"user": "nuclide", "description": "Reverse proxy / auth"},
 33  }
 34  
 35  
 36  def run_cmd(cmd: str, timeout: int = 15) -> str:
 37      """Run a shell command and return stdout."""
 38      try:
 39          result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=timeout)
 40          return result.stdout.strip()
 41      except subprocess.TimeoutExpired:
 42          return f"TIMEOUT after {timeout}s"
 43      except Exception as e:
 44          return f"ERROR: {e}"
 45  
 46  
 47  def prometheus_query(query: str) -> dict:
 48      """Execute a PromQL instant query."""
 49      try:
 50          url = f"{PROMETHEUS_URL}/api/v1/query?query={urllib.request.quote(query)}"
 51          with urllib.request.urlopen(url, timeout=10) as resp:
 52              return json.loads(resp.read().decode())
 53      except Exception as e:
 54          return {"status": "error", "error": str(e)}
 55  
 56  
 57  def check_docker_disk_usage() -> dict:
 58      """Get disk usage by Docker."""
 59      output = run_cmd("docker system df --format json")
 60      if output.startswith("ERROR") or output.startswith("TIMEOUT"):
 61          return {"error": output}
 62  
 63      results = []
 64      for line in output.strip().split("\n"):
 65          if line.strip():
 66              try:
 67                  results.append(json.loads(line))
 68              except json.JSONDecodeError:
 69                  continue
 70      return {"docker_disk": results}
 71  
 72  
 73  def check_large_containers() -> list:
 74      """Find containers using the most disk space."""
 75      output = run_cmd(
 76          "docker ps -a --format '{{.Names}}\t{{.Size}}' | sort -t'\t' -k2 -h -r | head -10"
 77      )
 78      if output.startswith("ERROR"):
 79          return []
 80      containers = []
 81      for line in output.strip().split("\n"):
 82          if "\t" in line:
 83              name, size = line.split("\t", 1)
 84              containers.append({"name": name, "size": size})
 85      return containers
 86  
 87  
 88  def check_docker_logs_errors() -> dict:
 89      """Scan recent Docker logs for error patterns."""
 90      # Check key containers for recent errors
 91      containers_to_check = [
 92          "vllm", "pipecat-agent", "fish-speech", "faster-whisper",
 93          "bob-home-keeper", "bob-agent-scheduler",
 94      ]
 95      error_summary = {}
 96      for name in containers_to_check:
 97          output = run_cmd(f"docker logs --since 1h {name} 2>&1 | grep -ci 'error\\|exception\\|traceback'")
 98          try:
 99              count = int(output)
100          except ValueError:
101              count = -1
102          if count > 0:
103              # Get sample errors
104              samples = run_cmd(f"docker logs --since 1h {name} 2>&1 | grep -i 'error\\|exception' | tail -3")
105              error_summary[name] = {"error_count": count, "samples": samples.split("\n")[:3]}
106      return error_summary
107  
108  
109  def check_prometheus_metrics() -> dict:
110      """Query key Prometheus metrics for system health trends."""
111      metrics = {}
112  
113      # Node CPU usage (1 min avg)
114      result = prometheus_query('100 - (avg(rate(node_cpu_seconds_total{mode="idle"}[5m])) * 100)')
115      if result.get("status") == "success":
116          try:
117              value = float(result["data"]["result"][0]["value"][1])
118              metrics["cpu_usage_pct"] = round(value, 1)
119          except (KeyError, IndexError, ValueError):
120              pass
121  
122      # Node memory usage
123      result = prometheus_query('100 * (1 - node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes)')
124      if result.get("status") == "success":
125          try:
126              value = float(result["data"]["result"][0]["value"][1])
127              metrics["memory_usage_pct"] = round(value, 1)
128          except (KeyError, IndexError, ValueError):
129              pass
130  
131      # Disk usage
132      result = prometheus_query('100 - (node_filesystem_avail_bytes{mountpoint="/"} / node_filesystem_size_bytes{mountpoint="/"} * 100)')
133      if result.get("status") == "success":
134          try:
135              value = float(result["data"]["result"][0]["value"][1])
136              metrics["disk_usage_pct"] = round(value, 1)
137          except (KeyError, IndexError, ValueError):
138              pass
139  
140      # Network traffic (bytes/sec)
141      result = prometheus_query('rate(node_network_receive_bytes_total{device!="lo"}[5m])')
142      if result.get("status") == "success":
143          try:
144              total = sum(float(r["value"][1]) for r in result["data"]["result"])
145              metrics["network_rx_mbps"] = round(total / 1024 / 1024, 2)
146          except (KeyError, ValueError):
147              pass
148  
149      return metrics
150  
151  
152  def inventory_remote_device(host: str, user: str) -> dict:
153      """SSH to a remote device and gather inventory info."""
154      info = {"host": host, "reachable": False}
155  
156      # Quick SSH check
157      output = run_cmd(f"ssh -o ConnectTimeout=3 -o BatchMode=yes {user}@{host} 'uname -a' 2>/dev/null", timeout=10)
158      if output.startswith("ERROR") or output.startswith("TIMEOUT") or not output:
159          return info
160  
161      info["reachable"] = True
162      info["uname"] = output
163  
164      # Disk
165      disk = run_cmd(f"ssh -o ConnectTimeout=3 {user}@{host} 'df -h / | tail -1' 2>/dev/null", timeout=10)
166      if disk and not disk.startswith("ERROR"):
167          parts = disk.split()
168          if len(parts) >= 5:
169              info["disk"] = {"total": parts[1], "used": parts[2], "available": parts[3], "percent": parts[4]}
170  
171      # Uptime
172      uptime = run_cmd(f"ssh -o ConnectTimeout=3 {user}@{host} 'uptime -p 2>/dev/null || uptime' 2>/dev/null", timeout=10)
173      if uptime and not uptime.startswith("ERROR"):
174          info["uptime"] = uptime
175  
176      # Memory
177      mem = run_cmd(f"ssh -o ConnectTimeout=3 {user}@{host} 'free -h 2>/dev/null | grep Mem || vm_stat 2>/dev/null | head -3' 2>/dev/null", timeout=10)
178      if mem and not mem.startswith("ERROR"):
179          info["memory"] = mem
180  
181      return info
182  
183  
184  async def run_sentinel(trigger_data: dict) -> dict:
185      """Run deep system investigation."""
186      run_id = trigger_data.get("run_id", f"sentinel-{int(time.time())}")
187      started_at = datetime.now(timezone.utc).isoformat()
188      print(f"\nSystem Sentinel run: {run_id}")
189  
190      # Docker disk usage
191      print("Checking Docker disk usage...")
192      docker_disk = check_docker_disk_usage()
193      large_containers = check_large_containers()
194  
195      # Docker log errors
196      print("Scanning Docker logs for errors...")
197      log_errors = check_docker_logs_errors()
198      total_errors = sum(v["error_count"] for v in log_errors.values())
199      print(f"  {total_errors} errors found across {len(log_errors)} containers")
200  
201      # Prometheus metrics
202      print("Querying Prometheus metrics...")
203      prom_metrics = check_prometheus_metrics()
204      if prom_metrics:
205          print(f"  CPU: {prom_metrics.get('cpu_usage_pct', '?')}%, RAM: {prom_metrics.get('memory_usage_pct', '?')}%")
206  
207      # Remote device inventory
208      print("Inventorying remote devices...")
209      inventory = {}
210      for host, cfg in INVENTORY_HOSTS.items():
211          info = inventory_remote_device(host, cfg["user"])
212          inventory[host] = info
213          status = "reachable" if info["reachable"] else "UNREACHABLE"
214          print(f"  {host}: {status}")
215  
216      # Compile alerts
217      alerts = []
218      if total_errors > 50:
219          alerts.append(f"High error rate: {total_errors} errors in last hour across Docker logs")
220      for name, errs in log_errors.items():
221          if errs["error_count"] > 20:
222              alerts.append(f"Container {name}: {errs['error_count']} errors in last hour")
223      for host, info in inventory.items():
224          if not info["reachable"]:
225              alerts.append(f"Device unreachable via SSH: {host}")
226  
227      severity = "critical" if any("UNREACHABLE" in a for a in alerts) \
228          else "warning" if alerts \
229          else "ok"
230  
231      result = {
232          "run_id": run_id,
233          "role": ROLE,
234          "started_at": started_at,
235          "completed_at": datetime.now(timezone.utc).isoformat(),
236          "severity": severity,
237          "alerts": alerts,
238          "prometheus_metrics": prom_metrics,
239          "docker_disk": docker_disk,
240          "large_containers": large_containers[:5],
241          "log_errors": {k: {"count": v["error_count"]} for k, v in log_errors.items()},
242          "device_inventory": inventory,
243      }
244  
245      print(f"\nResult: severity={severity}, {len(alerts)} alerts")
246      return result
247  
248  
249  async def main():
250      """Main agent loop."""
251      nc = await nats.connect(NATS_URL)
252      js = nc.jetstream()
253      print(f"System Sentinel connected to NATS at {NATS_URL}")
254  
255      trigger_subject = f"bob.agent.{ROLE}.trigger"
256  
257      async def handle_trigger(msg):
258          try:
259              trigger_data = json.loads(msg.data.decode())
260              print(f"\nReceived trigger: {trigger_data.get('run_id', 'unknown')}")
261  
262              started = {
263                  "role": ROLE,
264                  "run_id": trigger_data.get("run_id"),
265                  "started_at": datetime.now(timezone.utc).isoformat(),
266              }
267              await js.publish(f"bob.agent.{ROLE}.started", json.dumps(started).encode())
268  
269              result = await run_sentinel(trigger_data)
270              await js.publish(f"bob.agent.{ROLE}.result", json.dumps(result).encode())
271  
272              if result["alerts"]:
273                  alert = {
274                      "role": ROLE,
275                      "run_id": result["run_id"],
276                      "severity": result["severity"],
277                      "alerts": result["alerts"],
278                      "timestamp": datetime.now(timezone.utc).isoformat(),
279                  }
280                  await js.publish(f"bob.agent.{ROLE}.alert", json.dumps(alert).encode())
281  
282              print(f"Sentinel result published")
283              await msg.ack()
284          except Exception as e:
285              print(f"Error: {e}", file=sys.stderr)
286              import traceback
287              traceback.print_exc()
288  
289      await js.subscribe(
290          trigger_subject,
291          cb=handle_trigger,
292          durable="system_sentinel_agent",
293          deliver_policy="new",
294          manual_ack=True,
295      )
296      print(f"Subscribed to {trigger_subject}")
297      print("Waiting for triggers...")
298  
299      try:
300          while True:
301              await asyncio.sleep(1)
302      except asyncio.CancelledError:
303          pass
304      finally:
305          await nc.close()
306  
307  
308  if __name__ == "__main__":
309      asyncio.run(main())