main.py
1 #!/usr/bin/env python3 2 """ 3 Forgejo CI Dashboard Backend 4 Aggregates data from Forgejo API and database for the dashboard. 5 """ 6 7 import asyncio 8 import json 9 import os 10 import re 11 import subprocess 12 import tempfile 13 import time 14 from contextlib import asynccontextmanager 15 from datetime import datetime, timezone 16 from pathlib import Path 17 from typing import Any 18 19 import httpx 20 import psutil 21 import psycopg2 22 from psycopg2.extras import RealDictCursor 23 from dotenv import load_dotenv 24 from fastapi import FastAPI, Request 25 from fastapi.middleware.cors import CORSMiddleware 26 from fastapi.responses import Response 27 28 load_dotenv() 29 30 # Configuration 31 FORGEJO_URL = os.getenv("FORGEJO_URL", "http://localhost:3000") 32 FORGEJO_TOKEN = os.getenv("FORGEJO_TOKEN", "") 33 FORGEJO_REPOS_PATH = os.getenv("FORGEJO_REPOS_PATH", "/var/lib/forgejo/repositories") 34 RADICLE_BIN = os.getenv("RADICLE_BIN", "/home/devops/.radicle/bin/rad") 35 CACHE_TTL = int(os.getenv("CACHE_TTL", "10")) # seconds 36 37 # Woodpecker CI Configuration 38 WOODPECKER_URL = os.getenv("WOODPECKER_URL", "https://ci.ac-dc.network") 39 WOODPECKER_TOKEN = os.getenv("WOODPECKER_TOKEN", "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ0eXBlIjoidXNlciIsInVzZXItaWQiOiIxIn0.89dRDf3ZEm4uLn8URz_axjimoSfJi2LJzcTCr4LdwaI") 40 41 # Postgres database configuration 42 DB_HOST = os.getenv("DB_HOST", "localhost") 43 DB_PORT = int(os.getenv("DB_PORT", "5432")) 44 DB_NAME = os.getenv("DB_NAME", "forgejo") 45 DB_USER = os.getenv("DB_USER", "forgejo") 46 DB_PASSWORD = os.getenv("DB_PASSWORD", "") 47 48 CI_SERVER_SSH_PORT = int(os.getenv("CI_SERVER_SSH_PORT", "2584")) # SSH port for CI server 49 # CI stages to track (customize as needed) 50 CI_STAGES = ["build", "check", "clippy", "fmt", "test", "node"] 51 52 # Remote CI server configuration 53 CI_SERVER_HOST = os.getenv("CI_SERVER_HOST", "10.106.0.3") 54 CI_SERVER_USER = os.getenv("CI_SERVER_USER", "devops") 55 CI_SERVER_SSH_TIMEOUT = int(os.getenv("CI_SERVER_SSH_TIMEOUT", "10")) 56 57 # Background refresh interval (seconds) 58 BG_REFRESH_INTERVAL = int(os.getenv("BG_REFRESH_INTERVAL", "60")) 59 60 # Path for persistent JSON cache 61 CACHE_FILE = Path(__file__).parent / "data" / "dashboard_cache.json" 62 63 # Testnet node configuration 64 TESTNET_SSH_PORT = int(os.getenv("TESTNET_SSH_PORT", "2584")) 65 TESTNET_SSH_USER = os.getenv("TESTNET_SSH_USER", "devops") 66 TESTNET_SERVERS = [ 67 {"name": "testnet001", "host": "testnet001.ac-dc.network", "role": "validator"}, 68 {"name": "testnet002", "host": "testnet002.ac-dc.network", "role": "validator"}, 69 {"name": "testnet003", "host": "testnet003.ac-dc.network", "role": "validator"}, 70 {"name": "testnet004", "host": "testnet004.ac-dc.network", "role": "validator"}, 71 {"name": "testnet005", "host": "testnet005.ac-dc.network", "role": "validator"}, 72 {"name": "testnet006", "host": "testnet006.ac-dc.network", "role": "prover"}, 73 ] 74 75 76 def get_db_connection(): 77 """Create a connection to the Forgejo Postgres database.""" 78 return psycopg2.connect( 79 host=DB_HOST, 80 port=DB_PORT, 81 dbname=DB_NAME, 82 user=DB_USER, 83 password=DB_PASSWORD 84 ) 85 86 87 async def fetch_remote_system_metrics() -> dict | None: 88 """Fetch system metrics from remote CI server via SSH using Python.""" 89 METRICS_SCRIPT_B64 = ( 90 "aW1wb3J0IGpzb24KaW1wb3J0IG9zCmltcG9ydCBzdWJwcm9jZXNzCmltcG9ydCB0aW1lCgpkZWYgZ2V0X21l" 91 "dHJpY3MoKToKICAgIGNvcmVzID0gb3MuY3B1X2NvdW50KCkKICAgIGZyZWUgPSBzdWJwcm9jZXNzLnJ1bihb" 92 "J2ZyZWUnLCAnLWInXSwgY2FwdHVyZV9vdXRwdXQ9VHJ1ZSwgdGV4dD1UcnVlKQogICAgbWVtX2xpbmUgPSBb" 93 "bCBmb3IgbCBpbiBmcmVlLnN0ZG91dC5zcGxpdCgnXG4nKSBpZiBsLnN0YXJ0c3dpdGgoJ01lbTonKV1bMF0u" 94 "c3BsaXQoKQogICAgc3dhcF9saW5lID0gW2wgZm9yIGwgaW4gZnJlZS5zdGRvdXQuc3BsaXQoJ1xuJykgaWYg" 95 "bC5zdGFydHN3aXRoKCdTd2FwOicpXVswXS5zcGxpdCgpCiAgICBtZW1fdG90YWwgPSBpbnQobWVtX2xpbmVb" 96 "MV0pCiAgICBtZW1fdXNlZCA9IGludChtZW1fbGluZVsyXSkKICAgIG1lbV9hdmFpbCA9IGludChtZW1fbGlu" 97 "ZVs2XSkgaWYgbGVuKG1lbV9saW5lKSA+IDYgZWxzZSBtZW1fdG90YWwgLSBtZW1fdXNlZAogICAgc3dhcF90" 98 "b3RhbCA9IGludChzd2FwX2xpbmVbMV0pCiAgICBzd2FwX3VzZWQgPSBpbnQoc3dhcF9saW5lWzJdKQogICAg" 99 "c3dhcF9mcmVlID0gaW50KHN3YXBfbGluZVszXSkgaWYgbGVuKHN3YXBfbGluZSkgPiAzIGVsc2UgMAogICAg" 100 "ZGYgPSBzdWJwcm9jZXNzLnJ1bihbJ2RmJywgJy1CMScsICcvJ10sIGNhcHR1cmVfb3V0cHV0PVRydWUsIHRl" 101 "eHQ9VHJ1ZSkKICAgIGRpc2tfbGluZSA9IGRmLnN0ZG91dC5zdHJpcCgpLnNwbGl0KCdcbicpWzFdLnNwbGl0" 102 "KCkKICAgIGRpc2tfdG90YWwgPSBpbnQoZGlza19saW5lWzFdKQogICAgZGlza191c2VkID0gaW50KGRpc2tf" 103 "bGluZVsyXSkKICAgIGRpc2tfYXZhaWwgPSBpbnQoZGlza19saW5lWzNdKQoKICAgICMgVXNlIC9wcm9jL3N0" 104 "YXQgZm9yIGFjY3VyYXRlIENQVSBjYWxjdWxhdGlvbiAoc2FtZSBhcyBQcm9tZXRoZXVzKQogICAgd2l0aCBv" 105 "cGVuKCcvcHJvYy9zdGF0JykgYXMgZjoKICAgICAgICBjcHUxID0gW2ludCh4KSBmb3IgeCBpbiBmLnJlYWRs" 106 "aW5lKCkuc3BsaXQoKVsxOl1dCiAgICB0aW1lLnNsZWVwKDEpCiAgICB3aXRoIG9wZW4oJy9wcm9jL3N0YXQn" 107 "KSBhcyBmOgogICAgICAgIGNwdTIgPSBbaW50KHgpIGZvciB4IGluIGYucmVhZGxpbmUoKS5zcGxpdCgpWzE6" 108 "XV0KCiAgICBkZWx0YXMgPSBbYiAtIGEgZm9yIGEsIGIgaW4gemlwKGNwdTEsIGNwdTIpXQogICAgdG90YWxf" 109 "ZGVsdGEgPSBzdW0oZGVsdGFzKQogICAgaWRsZV9kZWx0YSA9IGRlbHRhc1szXSAgIyBJbmRleCAzIGlzIGlk" 110 "bGUgdGltZQoKICAgIGNwdV9wY3QgPSByb3VuZCgxMDAgKiAoMSAtIGlkbGVfZGVsdGEgLyB0b3RhbF9kZWx0" 111 "YSksIDEpIGlmIHRvdGFsX2RlbHRhID4gMCBlbHNlIDAKCiAgICByZXR1cm4gewogICAgICAgICJjcHUiOiB7" 112 "InBlcmNlbnQiOiBjcHVfcGN0LCAiY29yZXMiOiBjb3Jlc30sCiAgICAgICAgIm1lbW9yeSI6IHsidG90YWwi" 113 "OiBtZW1fdG90YWwsICJ1c2VkIjogbWVtX3VzZWQsICJhdmFpbGFibGUiOiBtZW1fYXZhaWwsICJwZXJjZW50" 114 "IjogaW50KDEwMCptZW1fdXNlZC9tZW1fdG90YWwpfSwKICAgICAgICAiZGlzayI6IHsidG90YWwiOiBkaXNr" 115 "X3RvdGFsLCAidXNlZCI6IGRpc2tfdXNlZCwgImZyZWUiOiBkaXNrX2F2YWlsLCAicGVyY2VudCI6IGludCgx" 116 "MDAqZGlza191c2VkL2Rpc2tfdG90YWwpfSwKICAgICAgICAic3dhcCI6IHsidG90YWwiOiBzd2FwX3RvdGFs" 117 "LCAidXNlZCI6IHN3YXBfdXNlZCwgImZyZWUiOiBzd2FwX2ZyZWUsICJwZXJjZW50IjogaW50KDEwMCpzd2Fw" 118 "X3VzZWQvc3dhcF90b3RhbCkgaWYgc3dhcF90b3RhbCA+IDAgZWxzZSAwfSwKICAgICAgICAibWVtb3J5X3By" 119 "ZXNzdXJlIjogMCwKICAgICAgICAib29tX2V2ZW50cyI6IDAKICAgIH0KcHJpbnQoanNvbi5kdW1wcyhnZXRf" 120 "bWV0cmljcygpKSkK" 121 ) 122 123 try: 124 ssh_cmd = ( 125 f"ssh -p {CI_SERVER_SSH_PORT} -o ConnectTimeout=5 -o BatchMode=yes -o StrictHostKeyChecking=accept-new " 126 f"{CI_SERVER_USER}@{CI_SERVER_HOST} " 127 f"'echo {METRICS_SCRIPT_B64} | base64 -d | python3'" 128 ) 129 130 proc = await asyncio.create_subprocess_shell( 131 ssh_cmd, 132 stdout=asyncio.subprocess.PIPE, 133 stderr=asyncio.subprocess.PIPE, 134 ) 135 136 try: 137 stdout, stderr = await asyncio.wait_for( 138 proc.communicate(), 139 timeout=CI_SERVER_SSH_TIMEOUT 140 ) 141 except asyncio.TimeoutError: 142 proc.kill() 143 await proc.wait() 144 print(f"SSH timeout fetching metrics from {CI_SERVER_HOST}") 145 return None 146 147 if proc.returncode != 0: 148 print(f"SSH error fetching metrics from {CI_SERVER_HOST}: {stderr.decode()}") 149 return None 150 151 return json.loads(stdout.decode().strip()) 152 153 except Exception as e: 154 print(f"Error fetching remote metrics: {e}") 155 return None 156 157 158 async def fetch_testnet_node_status(server: dict) -> dict: 159 """Fetch block heights and service status from a testnet node via SSH. 160 161 Pipes the Python script via stdin to avoid shell quoting issues with 162 braces and quotes in the remote command. 163 164 REST API notes (discovered 2026-02-20): 165 - Routes are under /v1/testnet/ for both alphaos and deltaos 166 - JWT token is stored in the ledger dir: /.ledger-13-{N}/jwt_secret_*.txt (alpha) 167 and /.ledger-delta-21-{N}/jwt_secret_*.txt (delta) 168 - Block height: GET /v1/testnet/block/height/latest -> integer 169 - Sync status: GET /v1/testnet/sync/status -> is_synced, ledger_height, sync_mode 170 - Processes may run outside systemd (started manually), detect via pgrep 171 - testnet006 alphaos prover uses REST port 3035 (process uses --rest 0.0.0.0:3035) 172 """ 173 # Script is sent via stdin to `python3` -- no shell quoting needed 174 # Using str.encode() to avoid byte-literal unicode issues 175 python_script = ( 176 "import json, subprocess, urllib.request, glob, socket\n" 177 "result = {'alpha': {}, 'delta': {}}\n" 178 "for svcs, proc, key in [(['alphaos-validator', 'alphaos'], 'alphaos', 'alpha'), (['deltaos-validator', 'deltaos'], 'deltaos', 'delta')]:\n" 179 " svc_status = 'unknown'\n" 180 " for svc in svcs:\n" 181 " p = subprocess.run(['systemctl', 'is-active', svc], capture_output=True, text=True)\n" 182 " s = p.stdout.strip()\n" 183 " if s == 'active':\n" 184 " svc_status = 'active'\n" 185 " break\n" 186 " if svc_status != 'active':\n" 187 " pcheck = subprocess.run(['pgrep', '-c', proc], capture_output=True, text=True)\n" 188 " if pcheck.returncode == 0 and int(pcheck.stdout.strip() or '0') > 0:\n" 189 " svc_status = 'running-nosystemd'\n" 190 " result[key]['service'] = svc_status\n" 191 "hostname = socket.gethostname()\n" 192 "alpha_port = 3035 if 'testnet006' in hostname else 3030\n" 193 "delta_port = 3031\n" 194 "def find_jwt(pattern):\n" 195 " matches = glob.glob(pattern)\n" 196 " if matches:\n" 197 " try:\n" 198 " with open(matches[0]) as f: return f.read().strip()\n" 199 " except Exception: pass\n" 200 " return None\n" 201 "alpha_jwt = find_jwt('/.ledger-13-*/jwt_secret_*.txt')\n" 202 "delta_jwt = find_jwt('/.ledger-delta-21-*/jwt_secret_*.txt')\n" 203 "for chain, port, jwt in [('alpha', alpha_port, alpha_jwt), ('delta', delta_port, delta_jwt)]:\n" 204 " try:\n" 205 " headers = {}\n" 206 " if jwt: headers['Authorization'] = 'Bearer ' + jwt\n" 207 " req = urllib.request.Request(\n" 208 " 'http://localhost:' + str(port) + '/v1/testnet/sync/status',\n" 209 " headers=headers)\n" 210 " with urllib.request.urlopen(req, timeout=3) as resp:\n" 211 " d = json.loads(resp.read())\n" 212 " result[chain]['height'] = d.get('ledger_height')\n" 213 " result[chain]['synced'] = d.get('is_synced', False)\n" 214 " result[chain]['sync_mode'] = d.get('sync_mode', 'unknown')\n" 215 " except Exception as e:\n" 216 " result[chain]['height'] = None\n" 217 " result[chain]['synced'] = False\n" 218 " result[chain]['sync_error'] = str(e)\n" 219 "print(json.dumps(result))\n" 220 ).encode("utf-8") 221 try: 222 proc = await asyncio.create_subprocess_exec( 223 "ssh", 224 "-p", str(TESTNET_SSH_PORT), 225 "-o", "ConnectTimeout=5", 226 "-o", "BatchMode=yes", 227 "-o", "StrictHostKeyChecking=accept-new", 228 f"{TESTNET_SSH_USER}@{server['host']}", 229 "python3", 230 stdin=asyncio.subprocess.PIPE, 231 stdout=asyncio.subprocess.PIPE, 232 stderr=asyncio.subprocess.PIPE, 233 ) 234 stdout, _ = await asyncio.wait_for(proc.communicate(input=python_script), timeout=12) 235 data = json.loads(stdout.decode().strip()) 236 return { 237 "name": server["name"], 238 "role": server.get("role", "validator"), 239 "reachable": True, 240 "alpha_height": data.get("alpha", {}).get("height"), 241 "alpha_service": data.get("alpha", {}).get("service", "unknown"), 242 "alpha_synced": data.get("alpha", {}).get("synced", False), 243 "delta_height": data.get("delta", {}).get("height"), 244 "delta_service": data.get("delta", {}).get("service", "unknown"), 245 "delta_synced": data.get("delta", {}).get("synced", False), 246 } 247 except Exception as e: 248 print(f"Testnet {server['name']} fetch failed: {e}") 249 return { 250 "name": server["name"], 251 "role": server.get("role", "validator"), 252 "reachable": False, 253 "alpha_height": None, 254 "alpha_service": "offline", 255 "alpha_synced": False, 256 "delta_height": None, 257 "delta_service": "offline", 258 "delta_synced": False, 259 } 260 261 262 async def fetch_all_testnet_nodes() -> list[dict]: 263 """Fetch status from all testnet nodes in parallel.""" 264 tasks = [fetch_testnet_node_status(s) for s in TESTNET_SERVERS] 265 return list(await asyncio.gather(*tasks)) 266 267 268 class DashboardStore: 269 """Persistent store for dashboard data. 270 271 Stores data in memory AND persists to a JSON file atomically. 272 On startup, loads from disk if a cached file exists. 273 API endpoints read from this store (instant, never blocks). 274 """ 275 276 def __init__(self, cache_file: Path): 277 self._cache_file = cache_file 278 self._data: dict[str, Any] = {} 279 self._last_updated: float | None = None 280 cache_file.parent.mkdir(parents=True, exist_ok=True) 281 282 def load_from_disk(self, max_age_seconds: int = 300) -> bool: 283 """Load data from the persistent JSON file on startup.""" 284 try: 285 if self._cache_file.exists(): 286 with open(self._cache_file) as f: 287 saved = json.load(f) 288 last_updated = saved.get("last_updated") 289 if last_updated and time.time() - last_updated > max_age_seconds: 290 print(f"DashboardStore: on-disk cache is stale (>{max_age_seconds}s), discarding") 291 return False 292 self._data = saved.get("data", {}) 293 self._last_updated = last_updated 294 print(f"DashboardStore: loaded from disk ({len(self._data)} keys)") 295 return True 296 except Exception as e: 297 print(f"DashboardStore: failed to load from disk: {e}") 298 return False 299 300 def get(self, key: str) -> Any | None: 301 """Get a value from the store.""" 302 return self._data.get(key) 303 304 def set(self, key: str, value: Any) -> None: 305 """Set a value in memory.""" 306 self._data[key] = value 307 308 def persist_all(self) -> None: 309 """Write all current data to disk atomically.""" 310 self._persist() 311 312 def set_last_updated(self) -> None: 313 """Record the current time as the last-updated timestamp.""" 314 self._last_updated = time.time() 315 316 def get_last_updated(self) -> float | None: 317 return self._last_updated 318 319 def get_data_age_seconds(self) -> float | None: 320 if self._last_updated is None: 321 return None 322 return time.time() - self._last_updated 323 324 def _persist(self) -> None: 325 """Atomically write data to the cache file.""" 326 try: 327 payload = { 328 "last_updated": self._last_updated, 329 "data": self._data, 330 } 331 tmp = tempfile.NamedTemporaryFile( 332 mode="w", 333 dir=self._cache_file.parent, 334 delete=False, 335 suffix=".tmp" 336 ) 337 json.dump(payload, tmp) 338 tmp.close() 339 Path(tmp.name).rename(self._cache_file) 340 except Exception as e: 341 print(f"DashboardStore: failed to persist to disk: {e}") 342 343 344 # Global persistent store 345 store = DashboardStore(CACHE_FILE) 346 347 348 class Cache: 349 """Simple in-memory cache with TTL for fast-changing pipeline endpoints.""" 350 351 def __init__(self, ttl: int = 10): 352 self.ttl = ttl 353 self._cache: dict[str, tuple[float, Any]] = {} 354 355 def get(self, key: str) -> Any | None: 356 if key in self._cache: 357 timestamp, value = self._cache[key] 358 if time.time() - timestamp < self.ttl: 359 return value 360 del self._cache[key] 361 return None 362 363 def set(self, key: str, value: Any) -> None: 364 self._cache[key] = (time.time(), value) 365 366 def clear(self) -> None: 367 self._cache.clear() 368 369 370 cache = Cache(ttl=CACHE_TTL) 371 372 373 def load_radicle_config() -> dict[str, str | None]: 374 """Load repo-to-RID mappings from config file.""" 375 config_path = Path(__file__).parent / "radicle-repos.json" 376 try: 377 if config_path.exists(): 378 with open(config_path) as f: 379 data = json.load(f) 380 return {k.lower(): v for k, v in data.get("repos", {}).items()} 381 except Exception as e: 382 print(f"Error loading radicle config: {e}") 383 return {} 384 385 386 def get_unmerged_branches_count(owner: str, repo: str) -> int: 387 """Get count of unmerged branches for a repository.""" 388 try: 389 headers = {} 390 if FORGEJO_TOKEN: 391 headers["Authorization"] = f"token {FORGEJO_TOKEN}" 392 393 url = f"{FORGEJO_URL}/api/v1/repos/{owner}/{repo}/branches?limit=100" 394 with httpx.Client(timeout=5.0) as client: 395 response = client.get(url, headers=headers) 396 if response.status_code == 200: 397 branches = response.json() 398 unmerged = [b for b in branches if b.get("name", "") not in ["main", "master"]] 399 return len(unmerged) 400 return 0 401 except Exception as e: 402 print(f"Error getting unmerged branches for {owner}/{repo}: {e}") 403 return 0 404 405 406 def get_radicle_repos() -> dict[str, str | None]: 407 """Get radicle repos with their HEAD commits from each repo's git remote ref. 408 409 Reads .git/refs/remotes/rad/main in each working repo directory. 410 This is accurate and fast (local file read, no network), and is updated 411 whenever git push rad main succeeds. 412 """ 413 try: 414 radicle_config = load_radicle_config() 415 if not radicle_config: 416 return {} 417 418 repos_root = Path(os.getenv("REPOS_ROOT", "/home/devops/working-repos")) 419 repos = {} 420 421 for name in radicle_config: 422 repo_dir = repos_root / name 423 ref_path = repo_dir / ".git" / "refs" / "remotes" / "rad" / "main" 424 if ref_path.exists(): 425 try: 426 head = ref_path.read_text().strip()[:7] 427 if head and len(head) >= 7: 428 repos[name.lower()] = head 429 except Exception as e: 430 print(f"Error reading rad ref for {name}: {e}") 431 else: 432 # Fallback: try packed-refs 433 packed = repo_dir / ".git" / "packed-refs" 434 if packed.exists(): 435 for line in packed.read_text().splitlines(): 436 if line.endswith(" refs/remotes/rad/main"): 437 repos[name.lower()] = line.split()[0][:7] 438 break 439 440 return repos 441 except Exception as e: 442 print(f"Error getting radicle repos: {e}") 443 return {} 444 445 446 def get_forgejo_repo_head(owner: str, repo: str) -> str | None: 447 """Get the HEAD commit hash for a Forgejo repository.""" 448 try: 449 headers = {} 450 if FORGEJO_TOKEN: 451 headers["Authorization"] = f"token {FORGEJO_TOKEN}" 452 453 for branch in ["main", "master"]: 454 try: 455 url = f"{FORGEJO_URL}/api/v1/repos/{owner}/{repo}/branches/{branch}" 456 with httpx.Client(timeout=5.0) as client: 457 response = client.get(url, headers=headers) 458 if response.status_code == 200: 459 data = response.json() 460 commit_sha = data.get("commit", {}).get("id", "") 461 if commit_sha: 462 return commit_sha[:7] 463 except Exception: 464 continue 465 except Exception as e: 466 print(f"Forgejo API query failed for {owner}/{repo}, trying local files: {e}") 467 468 try: 469 repo_path = Path(FORGEJO_REPOS_PATH) / owner / f"{repo}.git" 470 if not repo_path.exists(): 471 return None 472 473 def read_packed_ref(ref_name: str) -> str | None: 474 packed_refs_path = repo_path / "packed-refs" 475 if not packed_refs_path.exists(): 476 return None 477 try: 478 for line in packed_refs_path.read_text().splitlines(): 479 line = line.strip() 480 if not line or line.startswith("#") or line.startswith("^"): 481 continue 482 parts = line.split() 483 if len(parts) >= 2 and parts[1] == ref_name: 484 return parts[0][:7] 485 except Exception: 486 pass 487 return None 488 489 for branch in ["main", "master"]: 490 ref_path = repo_path / "refs" / "heads" / branch 491 if ref_path.exists(): 492 return ref_path.read_text().strip()[:7] 493 packed_head = read_packed_ref(f"refs/heads/{branch}") 494 if packed_head: 495 return packed_head 496 497 head_path = repo_path / "HEAD" 498 if head_path.exists(): 499 head_content = head_path.read_text().strip() 500 if head_content.startswith("ref:"): 501 ref = head_content.split("ref:")[1].strip() 502 ref_path = repo_path / ref 503 if ref_path.exists(): 504 return ref_path.read_text().strip()[:7] 505 packed_head = read_packed_ref(ref) 506 if packed_head: 507 return packed_head 508 else: 509 return head_content[:7] 510 return None 511 except Exception as e: 512 print(f"Error getting Forgejo repo HEAD for {owner}/{repo}: {e}") 513 return None 514 515 516 async def fetch_forgejo_api(path: str, client: httpx.AsyncClient) -> Any: 517 """Fetch data from Forgejo API.""" 518 headers = {} 519 if FORGEJO_TOKEN: 520 headers["Authorization"] = f"token {FORGEJO_TOKEN}" 521 522 url = f"{FORGEJO_URL}{path}" 523 response = await client.get(url, headers=headers, timeout=30.0) 524 response.raise_for_status() 525 return response.json() 526 527 528 async def fetch_woodpecker_api(path: str, client: httpx.AsyncClient) -> Any: 529 """Fetch data from Woodpecker CI API.""" 530 headers = {"Authorization": f"Bearer {WOODPECKER_TOKEN}"} 531 url = f"{WOODPECKER_URL}{path}" 532 try: 533 response = await client.get(url, headers=headers, timeout=10.0) 534 response.raise_for_status() 535 return response.json() 536 except Exception as e: 537 print(f"Woodpecker API error for {path}: {e}") 538 return None 539 540 541 def query_runners_from_db() -> list[dict]: 542 """Query runner information directly from Forgejo Postgres database.""" 543 try: 544 conn = get_db_connection() 545 cursor = conn.cursor(cursor_factory=RealDictCursor) 546 547 cursor.execute(""" 548 SELECT 549 id, name, uuid, version, agent_labels, 550 last_online, last_active, owner_id, repo_id 551 FROM action_runner 552 WHERE deleted IS NULL OR deleted = 0 553 ORDER BY last_online DESC 554 """) 555 556 runners = [] 557 now = time.time() 558 for row in cursor.fetchall(): 559 last_online = row["last_online"] or 0 560 last_active = row["last_active"] or 0 561 is_online = (now - last_online) < 60 562 563 labels = [] 564 if row["agent_labels"]: 565 try: 566 labels = json.loads(row["agent_labels"]) 567 except json.JSONDecodeError: 568 labels = [] 569 570 runners.append({ 571 "id": row["id"], 572 "name": row["name"], 573 "uuid": row["uuid"], 574 "version": row["version"], 575 "labels": labels, 576 "status": "online" if is_online else "offline", 577 "last_online": datetime.fromtimestamp(last_online, tz=timezone.utc).isoformat() if last_online else None, 578 "last_active": datetime.fromtimestamp(last_active, tz=timezone.utc).isoformat() if last_active else None, 579 "current_job": None, 580 }) 581 582 conn.close() 583 return runners 584 except Exception as e: 585 print(f"Error querying runners from DB: {e}") 586 return [] 587 588 589 def query_running_jobs_from_db() -> dict[int, dict]: 590 """Query currently running jobs to associate with runners.""" 591 try: 592 conn = get_db_connection() 593 cursor = conn.cursor(cursor_factory=RealDictCursor) 594 595 now = int(time.time()) 596 max_age = now - (2 * 3600) 597 598 cursor.execute(""" 599 SELECT 600 t.id, t.job_id, t.runner_id, t.status, t.started, 601 j.name as job_name, j.run_id, 602 r.title as run_title, r.repo_id, 603 repo.name as repo_name 604 FROM action_task t 605 LEFT JOIN action_run_job j ON t.job_id = j.id 606 LEFT JOIN action_run r ON j.run_id = r.id 607 LEFT JOIN repository repo ON r.repo_id = repo.id 608 WHERE t.status = 2 AND t.started > ? 609 ORDER BY t.started DESC 610 """, (max_age,)) 611 612 jobs_by_runner = {} 613 for row in cursor.fetchall(): 614 runner_id = row["runner_id"] 615 if runner_id and runner_id not in jobs_by_runner: 616 jobs_by_runner[runner_id] = { 617 "id": row["id"], 618 "name": row["job_name"] or "Unknown", 619 "run_title": row["run_title"], 620 "repo": row["repo_name"], 621 "started": datetime.fromtimestamp(row["started"], tz=timezone.utc).isoformat() if row["started"] else None, 622 } 623 624 conn.close() 625 return jobs_by_runner 626 except Exception as e: 627 print(f"Error querying running jobs from DB: {e}") 628 return {} 629 630 631 def query_queue_from_db() -> list[dict]: 632 """Query active workflow runs and return repo-level progress summary.""" 633 try: 634 conn = get_db_connection() 635 cursor = conn.cursor(cursor_factory=RealDictCursor) 636 637 now = int(time.time()) 638 max_age = now - (30 * 60) 639 640 cursor.execute(""" 641 SELECT 642 r.id as run_id, r.status as run_status, r.updated, 643 repo.name as repo_name 644 FROM action_run r 645 JOIN repository repo ON r.repo_id = repo.id 646 WHERE r.status IN (1, 2) AND r.updated > ? 647 ORDER BY r.updated DESC 648 """, (max_age,)) 649 650 runs = cursor.fetchall() 651 if not runs: 652 conn.close() 653 return [] 654 655 queue_summary = [] 656 seen_repos = set() 657 658 for run in runs: 659 repo_name = run["repo_name"] 660 if repo_name in seen_repos: 661 continue 662 seen_repos.add(repo_name) 663 664 cursor.execute(""" 665 SELECT status, COUNT(*) as count 666 FROM action_run_job 667 WHERE run_id = ? 668 GROUP BY status 669 """, (run["run_id"],)) 670 671 status_counts = {row["status"]: row["count"] for row in cursor.fetchall()} 672 673 done = sum(status_counts.get(s, 0) for s in [3, 4, 5, 6]) 674 total = sum(status_counts.values()) 675 running = status_counts.get(2, 0) 676 pending = status_counts.get(1, 0) + status_counts.get(0, 0) 677 678 if running > 0: 679 status = "running" 680 elif pending > 0: 681 status = "pending" 682 else: 683 status = "done" 684 685 queue_summary.append({ 686 "repo": repo_name, 687 "done": done, 688 "total": total, 689 "running": running, 690 "status": status, 691 }) 692 693 conn.close() 694 return queue_summary 695 except Exception as e: 696 print(f"Error querying queue from DB: {e}") 697 return [] 698 699 700 def query_history_from_db(limit: int = 30) -> list[dict]: 701 """Query recent job history from database.""" 702 try: 703 conn = get_db_connection() 704 cursor = conn.cursor(cursor_factory=RealDictCursor) 705 706 status_map = { 707 0: "unknown", 1: "queued", 2: "running", 708 3: "success", 4: "failure", 5: "cancelled", 6: "skipped" 709 } 710 711 history = [] 712 713 cursor.execute(""" 714 SELECT 715 j.id, j.name, j.status, j.started, j.stopped, j.updated, 716 r.title, r.ref as branch, r.event, 717 repo.name as repo_name, repo.owner_name 718 FROM action_run_job j 719 JOIN action_run r ON j.run_id = r.id 720 JOIN repository repo ON r.repo_id = repo.id 721 WHERE j.status = 2 AND (j.stopped IS NULL OR j.stopped = 0) 722 ORDER BY CASE WHEN j.started > 0 THEN j.started ELSE j.updated END DESC 723 LIMIT ? 724 """, (limit,)) 725 726 for row in cursor.fetchall(): 727 started = row["started"] or 0 728 branch = row["branch"] or "" 729 branch = branch.replace("refs/heads/", "").replace("refs/tags/", "") 730 731 history.append({ 732 "id": row["id"], 733 "job": row["name"], 734 "repo": f"{row['owner_name']}/{row['repo_name']}", 735 "branch": branch, 736 "result": status_map.get(row["status"], "unknown"), 737 "duration": 0, 738 "started_at": datetime.fromtimestamp(started, tz=timezone.utc).isoformat() if started else None, 739 "finished_at": None, 740 }) 741 742 remaining_limit = limit - len(history) 743 if remaining_limit > 0: 744 cursor.execute(""" 745 SELECT 746 j.id, j.name, j.status, j.started, j.stopped, j.updated, 747 r.title, r.ref as branch, r.event, 748 repo.name as repo_name, repo.owner_name 749 FROM action_run_job j 750 JOIN action_run r ON j.run_id = r.id 751 JOIN repository repo ON r.repo_id = repo.id 752 WHERE j.status IN (3, 4, 5, 6) OR (j.stopped > 0 AND j.started > 0) 753 ORDER BY CASE WHEN j.stopped > 0 THEN j.stopped ELSE j.updated END DESC 754 LIMIT ? 755 """, (remaining_limit,)) 756 757 for row in cursor.fetchall(): 758 started = row["started"] or 0 759 stopped = row["stopped"] if row["stopped"] else row["updated"] 760 duration = stopped - started if stopped and started else 0 761 762 branch = row["branch"] or "" 763 branch = branch.replace("refs/heads/", "").replace("refs/tags/", "") 764 765 status = row["status"] 766 if status in (1, 2) and stopped > 0: 767 result = "success" 768 else: 769 result = status_map.get(status, "unknown") 770 771 history.append({ 772 "id": row["id"], 773 "job": row["name"], 774 "repo": f"{row['owner_name']}/{row['repo_name']}", 775 "branch": branch, 776 "result": result, 777 "duration": duration, 778 "started_at": datetime.fromtimestamp(started, tz=timezone.utc).isoformat() if started else None, 779 "finished_at": datetime.fromtimestamp(stopped, tz=timezone.utc).isoformat() if stopped else None, 780 }) 781 782 conn.close() 783 return history 784 except Exception as e: 785 print(f"Error querying history from DB: {e}") 786 return [] 787 788 789 def query_repo_status_from_api() -> list[dict]: 790 """Query repos via Forgejo API when database is not accessible.""" 791 headers = {"Authorization": f"token {FORGEJO_TOKEN}"} if FORGEJO_TOKEN else {} 792 793 radicle_repos = get_radicle_repos() 794 radicle_config = load_radicle_config() 795 796 try: 797 with httpx.Client(timeout=30.0) as client: 798 response = client.get( 799 f"{FORGEJO_URL}/api/v1/orgs/alpha-delta-network/repos?limit=100", 800 headers=headers 801 ) 802 response.raise_for_status() 803 all_repos = response.json() 804 805 repos = [] 806 for repo_data in all_repos: 807 repo_name = repo_data["name"] 808 repo_name_lower = repo_name.lower() 809 default_branch = repo_data.get("default_branch", "main") 810 811 ci_status = None 812 try: 813 ci_response = client.get( 814 f"{FORGEJO_URL}/api/v1/repos/alpha-delta-network/{repo_name}/actions/runs?limit=1", 815 headers=headers, 816 timeout=5.0 817 ) 818 if ci_response.status_code == 200: 819 ci_data = ci_response.json() 820 runs = ci_data.get("workflow_runs", []) 821 if runs: 822 run = runs[0] 823 ci_status = run.get("conclusion") or run.get("status") 824 if isinstance(ci_status, int): 825 status_map = {1: "pending", 2: "running", 3: "success", 4: "failure", 5: "cancelled", 6: "skipped"} 826 ci_status = status_map.get(ci_status, "unknown") 827 except Exception as e: 828 print(f"CI status error for {repo_name}: {e}") 829 830 forgejo_head = None 831 try: 832 branch_response = client.get( 833 f"{FORGEJO_URL}/api/v1/repos/alpha-delta-network/{repo_name}/branches/{default_branch}", 834 headers=headers, 835 timeout=5.0 836 ) 837 if branch_response.status_code == 200: 838 branch_data = branch_response.json() 839 forgejo_head = branch_data.get("commit", {}).get("id", "")[:7] 840 except Exception as e: 841 print(f"Branch HEAD error for {repo_name}: {e}") 842 843 radicle_head = radicle_repos.get(repo_name_lower) 844 rid = radicle_config.get(repo_name_lower) 845 846 if radicle_head is None and rid: 847 sync_status = "unsynced" 848 radicle_synced = False 849 radicle_head = "?" 850 elif radicle_head is None and rid is None: 851 sync_status = "no-radicle" 852 radicle_synced = None 853 elif forgejo_head and radicle_head and forgejo_head == radicle_head: 854 sync_status = "synced" 855 radicle_synced = True 856 else: 857 sync_status = "unsynced" 858 radicle_synced = False 859 860 repos.append({ 861 "name": repo_name, 862 "owner_name": "alpha-delta-network", 863 "forgejo_branch": default_branch, 864 "forgejo_ci_status": ci_status, 865 "radicle_synced": radicle_synced, 866 "forgejo_head": forgejo_head, 867 "radicle_head": radicle_head, 868 "sync_status": sync_status, 869 "rid": rid, 870 }) 871 872 return repos 873 except Exception as e: 874 print(f"Error querying repos via API: {e}") 875 return [] 876 877 878 def query_repo_status_from_db() -> list[dict]: 879 """Query all repositories with their radicle sync status and CI status.""" 880 try: 881 conn = get_db_connection() 882 cursor = conn.cursor(cursor_factory=RealDictCursor) 883 884 radicle_repos = get_radicle_repos() 885 radicle_config = load_radicle_config() 886 887 cursor.execute(""" 888 SELECT DISTINCT 889 repo.id, repo.name, repo.owner_name, repo.default_branch 890 FROM repository repo 891 ORDER BY repo.owner_name, repo.name 892 """) 893 894 repos = [] 895 896 for repo_row in cursor.fetchall(): 897 repo_name = repo_row["name"] 898 owner_name = repo_row["owner_name"] 899 repo_name_lower = repo_name.lower() 900 default_branch = repo_row["default_branch"] or "main" 901 902 forgejo_head = get_forgejo_repo_head(owner_name, repo_name) 903 904 radicle_head = radicle_repos.get(repo_name_lower) 905 rid = radicle_config.get(repo_name_lower) 906 907 if radicle_head is None and rid: 908 sync_status = "unsynced" 909 radicle_synced = False 910 radicle_head = "?" 911 elif radicle_head is None and rid is None: 912 sync_status = "no-radicle" 913 radicle_synced = None 914 elif forgejo_head and radicle_head and forgejo_head == radicle_head: 915 sync_status = "synced" 916 radicle_synced = True 917 else: 918 sync_status = "unsynced" 919 radicle_synced = False 920 921 ci_status = None 922 try: 923 cursor.execute(""" 924 SELECT conclusion, status, created_unix 925 FROM action_run 926 WHERE repo_id = %s AND created_unix IS NOT NULL 927 ORDER BY created_unix DESC 928 LIMIT 1 929 """, (repo_row["id"],)) 930 ci_row = cursor.fetchone() 931 if ci_row: 932 status_map = {1: "pending", 2: "running", 3: "success", 4: "failure", 5: "cancelled", 6: "skipped"} 933 ci_status = status_map.get(ci_row["status"], "unknown") 934 if ci_row["conclusion"]: 935 conclusion_map = {1: "success", 2: "failure", 3: "cancelled", 4: "skipped"} 936 ci_status = conclusion_map.get(ci_row["conclusion"], ci_status) 937 except Exception: 938 pass 939 940 repos.append({ 941 "name": repo_name, 942 "owner_name": owner_name, 943 "sync_status": sync_status, 944 "forgejo_head": forgejo_head, 945 "radicle_head": radicle_head, 946 "rid": rid, 947 "forgejo_branch": default_branch, 948 "radicle_synced": radicle_synced, 949 "forgejo_ci_status": ci_status, 950 }) 951 952 conn.close() 953 return repos 954 except Exception as e: 955 print(f"Error querying repo status from DB: {e}, falling back to API") 956 return query_repo_status_from_api() 957 958 959 async def get_woodpecker_repo_status() -> list[dict]: 960 """Get CI status for all repos from Woodpecker CI using direct API (no CLI needed).""" 961 try: 962 async with httpx.AsyncClient(timeout=30.0) as client: 963 repos_data = await fetch_woodpecker_api("/api/repos", client) 964 if not repos_data: 965 return [] 966 967 repos = [] 968 for repo in repos_data: 969 full_name = repo.get("full_name", "") 970 repo_id = repo.get("id") 971 parts = full_name.split("/") 972 if len(parts) != 2 or not repo_id: 973 continue 974 owner, name = parts 975 976 ci_status = None 977 try: 978 pipelines = await fetch_woodpecker_api(f"/api/repos/{repo_id}/pipelines?page=1&perPage=1", client) 979 if pipelines and len(pipelines) > 0: 980 ci_status = pipelines[0].get("status", "unknown") 981 except Exception as e: 982 print(f"Error getting pipeline status for {full_name}: {e}") 983 984 repos.append({ 985 "name": name, 986 "full_name": full_name, 987 "ci_status": ci_status, 988 "active": repo.get("is_active", False), 989 }) 990 991 return repos 992 except Exception as e: 993 print(f"Error getting Woodpecker repo status: {e}") 994 return [] 995 996 997 async def fetch_all_slow_data() -> dict: 998 """Run all the slow fetches and return a dict of results. 999 1000 Called from background_refresh(), never from API endpoints. 1001 """ 1002 results = {} 1003 1004 # Fetch runners (SSH + DB) 1005 try: 1006 watcher_runners = {} 1007 try: 1008 proc = await asyncio.create_subprocess_exec( 1009 "ssh", "-o", "ConnectTimeout=3", "-o", "BatchMode=yes", 1010 f"{CI_SERVER_USER}@{CI_SERVER_HOST}", 1011 "cat", "/var/www/health/runners.json", 1012 stdout=asyncio.subprocess.PIPE, 1013 stderr=asyncio.subprocess.PIPE, 1014 ) 1015 stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=5) 1016 for r in json.loads(stdout.decode()): 1017 num = r["name"].replace("runner-", "") 1018 watcher_runners[f"native-runner-{num}"] = r 1019 except Exception as e: 1020 print(f"Background: error fetching watcher runners: {e}") 1021 1022 runners = query_runners_from_db() 1023 running_jobs = query_running_jobs_from_db() 1024 1025 for runner in runners: 1026 name = runner["name"] 1027 if name in watcher_runners: 1028 wr = watcher_runners[name] 1029 runner["status"] = wr["status"] 1030 if wr["task"]: 1031 runner_id = runner["id"] 1032 if runner_id in running_jobs: 1033 runner["current_job"] = running_jobs[runner_id] 1034 else: 1035 runner["current_job"] = { 1036 "name": f"Task {wr['task']}", 1037 "repo": wr["repo"], 1038 } 1039 else: 1040 runner["current_job"] = None 1041 elif runner["status"] == "online": 1042 runner["status"] = "idle" 1043 1044 results["runners"] = {"runners": runners} 1045 except Exception as e: 1046 print(f"Background: runners fetch failed: {e}") 1047 1048 # Fetch queue (DB) 1049 try: 1050 queue = query_queue_from_db() 1051 results["queue"] = {"queue": queue} 1052 except Exception as e: 1053 print(f"Background: queue fetch failed: {e}") 1054 1055 # Fetch history (DB) 1056 try: 1057 history = query_history_from_db(limit=30) 1058 results["history_30"] = {"jobs": history} 1059 except Exception as e: 1060 print(f"Background: history fetch failed: {e}") 1061 1062 # Fetch repos (DB + Woodpecker + Forgejo API) - slowest part 1063 try: 1064 repos_forgejo = query_repo_status_from_db() 1065 repos_woodpecker = await get_woodpecker_repo_status() 1066 woodpecker_by_name = {r["name"]: r for r in repos_woodpecker} 1067 1068 for repo in repos_forgejo: 1069 repo_name = repo["name"] 1070 owner_name = repo.get("owner_name", "alpha-delta-network") 1071 repo["unmerged_branches"] = get_unmerged_branches_count(owner_name, repo_name) 1072 1073 if repo_name in woodpecker_by_name: 1074 wp_repo = woodpecker_by_name[repo_name] 1075 repo["ci_status"] = wp_repo["ci_status"] 1076 repo["ci_source"] = "woodpecker" 1077 else: 1078 repo["ci_status"] = None 1079 repo["ci_source"] = "none" 1080 1081 results["repos"] = {"repos": repos_forgejo} 1082 except Exception as e: 1083 print(f"Background: repos fetch failed: {e}") 1084 1085 # Compute stats from already-fetched data 1086 try: 1087 runners_list = results.get("runners", {}).get("runners", []) 1088 queue_list = results.get("queue", {}).get("queue", []) 1089 history_list = results.get("history_30", {}).get("jobs", []) 1090 1091 online_runners = sum(1 for r in runners_list if r["status"] in ("online", "running")) 1092 busy_runners = sum(1 for r in runners_list if r["status"] == "running") 1093 success_count = sum(1 for j in history_list if j["result"] == "success") 1094 failure_count = sum(1 for j in history_list if j["result"] == "failure") 1095 total_completed = len(history_list) 1096 1097 results["stats"] = { 1098 "runners": { 1099 "total": len(runners_list), 1100 "online": online_runners, 1101 "busy": busy_runners, 1102 }, 1103 "queue": {"length": len(queue_list)}, 1104 "recent": { 1105 "total": total_completed, 1106 "success": success_count, 1107 "failure": failure_count, 1108 "success_rate": round(success_count / total_completed * 100, 1) if total_completed > 0 else 0, 1109 } 1110 } 1111 except Exception as e: 1112 print(f"Background: stats computation failed: {e}") 1113 1114 # Fetch system metrics 1115 try: 1116 cores = psutil.cpu_count() 1117 load_1m = psutil.getloadavg()[0] 1118 cpu_percent = min(100, int(load_1m / cores * 100)) 1119 memory = psutil.virtual_memory() 1120 disk = psutil.disk_usage("/") 1121 swap = psutil.swap_memory() 1122 1123 local_metrics = { 1124 "name": "source", 1125 "cpu": {"percent": cpu_percent, "cores": cores}, 1126 "memory": { 1127 "total": memory.total, "used": memory.used, 1128 "available": memory.available, "percent": memory.percent, 1129 }, 1130 "disk": { 1131 "total": disk.total, "used": disk.used, 1132 "free": disk.free, "percent": disk.percent, 1133 }, 1134 "swap": { 1135 "total": swap.total, "used": swap.used, 1136 "free": swap.free, "percent": swap.percent, 1137 }, 1138 "memory_pressure": 0, 1139 "oom_events": 0, 1140 } 1141 1142 ci_metrics = await fetch_remote_system_metrics() 1143 if ci_metrics: 1144 ci_metrics["name"] = "ci" 1145 1146 results["system"] = {"local": local_metrics, "ci": ci_metrics} 1147 except Exception as e: 1148 print(f"Background: system fetch failed: {e}") 1149 1150 # Fetch Woodpecker queue 1151 try: 1152 async with httpx.AsyncClient() as client: 1153 queue_data = await fetch_woodpecker_api("/api/queue/info", client) 1154 if queue_data: 1155 running = [] 1156 for item in queue_data.get("running", []): 1157 full_name = item.get("full_name", "") 1158 repo_name = full_name.split("/")[-1] if "/" in full_name else (full_name or "unknown") 1159 running.append({ 1160 "repo": repo_name, 1161 "pipeline_id": item.get("id"), 1162 "agent": item.get("agent", ""), 1163 }) 1164 1165 pending = [] 1166 for item in queue_data.get("pending", []): 1167 full_name = item.get("full_name", "") 1168 repo_name = full_name.split("/")[-1] if "/" in full_name else (full_name or "unknown") 1169 pending.append({ 1170 "repo": repo_name, 1171 "pipeline_id": item.get("id"), 1172 "waiting_for": "agent", 1173 }) 1174 1175 results["woodpecker_queue"] = { 1176 "running": running, 1177 "pending": pending, 1178 "stats": { 1179 "running_count": len(running), 1180 "pending_count": len(pending), 1181 }, 1182 "paused": queue_data.get("paused", False), 1183 } 1184 except Exception as e: 1185 print(f"Background: woodpecker queue fetch failed: {e}") 1186 1187 # Fetch Woodpecker agents 1188 try: 1189 async with httpx.AsyncClient() as client: 1190 agents_data = await fetch_woodpecker_api("/api/agents", client) 1191 queue_data = await fetch_woodpecker_api("/api/queue/info", client) 1192 if agents_data: 1193 running_pipelines = {} 1194 if queue_data: 1195 for item in queue_data.get("running", []): 1196 agent_id = item.get("agent_id") 1197 if agent_id: 1198 full_name = item.get("full_name", "") 1199 repo_name = full_name.split("/")[-1] if "/" in full_name else (full_name or "unknown") 1200 running_pipelines[agent_id] = { 1201 "repo": repo_name, 1202 "pipeline_id": item.get("id"), 1203 "step": item.get("step", item.get("name", "")), 1204 } 1205 1206 now = time.time() 1207 active_threshold = 5 * 60 1208 1209 agents = [] 1210 for agent in agents_data: 1211 if agent.get("name"): 1212 agent_id = agent.get("id") 1213 last_work = agent.get("last_work", 0) 1214 time_since_work = now - last_work if last_work else float("inf") 1215 1216 if agent_id in running_pipelines: 1217 status = "running" 1218 current_pipeline = running_pipelines[agent_id]["repo"] 1219 current_step = running_pipelines[agent_id].get("step", "") 1220 elif time_since_work < active_threshold: 1221 status = "active" 1222 current_pipeline = None 1223 current_step = None 1224 else: 1225 status = "idle" 1226 current_pipeline = None 1227 current_step = None 1228 1229 agents.append({ 1230 "id": agent_id, 1231 "name": agent.get("name", "Unknown"), 1232 "platform": agent.get("platform", ""), 1233 "status": status, 1234 "current_pipeline": current_pipeline, 1235 "current_step": current_step, 1236 "backend": agent.get("backend", ""), 1237 "capacity": agent.get("capacity", -1), 1238 "last_contact": agent.get("last_contact", 0), 1239 "last_work": last_work, 1240 "version": agent.get("version", ""), 1241 }) 1242 1243 results["woodpecker_agents"] = {"agents": agents} 1244 except Exception as e: 1245 print(f"Background: woodpecker agents fetch failed: {e}") 1246 1247 # Fetch testnet node status 1248 try: 1249 testnet_nodes = await fetch_all_testnet_nodes() 1250 results["testnet_nodes"] = { 1251 "nodes": testnet_nodes, 1252 "updated_at": datetime.now(timezone.utc).isoformat(), 1253 } 1254 except Exception as e: 1255 print(f"Background: testnet nodes fetch failed: {e}") 1256 1257 return results 1258 1259 1260 # Track if a refresh is already running to prevent overlapping refreshes 1261 _refresh_running = False 1262 1263 1264 async def background_refresh(): 1265 """Continuously refresh dashboard data in the background. 1266 1267 Runs as an asyncio task. Loops forever: fetch all slow data, update the 1268 store, persist to disk, sleep BG_REFRESH_INTERVAL seconds. Never crashes. 1269 """ 1270 global _refresh_running 1271 1272 while True: 1273 if _refresh_running: 1274 print("Background refresh already running, skipping this cycle") 1275 await asyncio.sleep(BG_REFRESH_INTERVAL) 1276 continue 1277 1278 _refresh_running = True 1279 start = time.time() 1280 print(f"Background refresh starting at {datetime.now().isoformat()}") 1281 1282 try: 1283 results = await fetch_all_slow_data() 1284 1285 for key, value in results.items(): 1286 store.set(key, value) 1287 1288 store.set_last_updated() 1289 store.persist_all() 1290 1291 elapsed = time.time() - start 1292 print(f"Background refresh complete in {elapsed:.1f}s, updated {len(results)} keys") 1293 except Exception as e: 1294 print(f"Background refresh failed: {e}") 1295 finally: 1296 _refresh_running = False 1297 1298 await asyncio.sleep(BG_REFRESH_INTERVAL) 1299 1300 1301 async def trigger_refresh_once(): 1302 """Trigger a single immediate refresh. Used by /dash/api/refresh endpoint.""" 1303 global _refresh_running 1304 1305 if _refresh_running: 1306 print("Refresh already in progress, skipping immediate trigger") 1307 return 1308 1309 _refresh_running = True 1310 start = time.time() 1311 print(f"Manual refresh triggered at {datetime.now().isoformat()}") 1312 1313 try: 1314 results = await fetch_all_slow_data() 1315 for key, value in results.items(): 1316 store.set(key, value) 1317 store.set_last_updated() 1318 store.persist_all() 1319 elapsed = time.time() - start 1320 print(f"Manual refresh complete in {elapsed:.1f}s") 1321 except Exception as e: 1322 print(f"Manual refresh failed: {e}") 1323 finally: 1324 _refresh_running = False 1325 1326 1327 @asynccontextmanager 1328 async def lifespan(app: FastAPI): 1329 """Application lifespan handler.""" 1330 print(f"Starting CI Dashboard Backend") 1331 print(f"Forgejo URL: {FORGEJO_URL}") 1332 print(f"Woodpecker URL: {WOODPECKER_URL}") 1333 print(f"Database: postgres://{DB_HOST}:{DB_PORT}/{DB_NAME}") 1334 print(f"Cache file: {CACHE_FILE}") 1335 1336 # Load existing cache from disk so dashboard shows data immediately 1337 store.load_from_disk() 1338 1339 # Start background refresh task 1340 bg_task = asyncio.create_task(background_refresh()) 1341 print(f"Background refresh task started (interval: {BG_REFRESH_INTERVAL}s)") 1342 1343 yield 1344 1345 bg_task.cancel() 1346 try: 1347 await bg_task 1348 except asyncio.CancelledError: 1349 pass 1350 print("Shutting down CI Dashboard Backend") 1351 1352 1353 app = FastAPI( 1354 title="CI Dashboard API", 1355 description="Backend aggregator for Forgejo CI Dashboard", 1356 version="1.0.0", 1357 lifespan=lifespan, 1358 ) 1359 1360 app.add_middleware( 1361 CORSMiddleware, 1362 allow_origins=["*"], 1363 allow_credentials=True, 1364 allow_methods=["*"], 1365 allow_headers=["*"], 1366 ) 1367 1368 1369 @app.middleware("http") 1370 async def no_cache_middleware(request: Request, call_next): 1371 response = await call_next(request) 1372 if request.url.path.startswith("/api/"): 1373 response.headers["Cache-Control"] = "no-store, no-cache, must-revalidate" 1374 response.headers["Pragma"] = "no-cache" 1375 return response 1376 1377 1378 @app.get("/api/health/status") 1379 async def get_health(): 1380 """Get Forgejo health status (live fetch, fast).""" 1381 cached = cache.get("health") 1382 if cached: 1383 return cached 1384 1385 try: 1386 async with httpx.AsyncClient() as client: 1387 data = await fetch_forgejo_api("/api/healthz", client) 1388 cache.set("health", data) 1389 return data 1390 except Exception as e: 1391 return {"status": "error", "message": str(e)} 1392 1393 1394 @app.get("/api/runners") 1395 async def get_runners(): 1396 """Get list of runners with their status (served from store, instant).""" 1397 data = store.get("runners") 1398 if data is not None: 1399 return data 1400 return {"runners": []} 1401 1402 1403 @app.get("/api/queue") 1404 async def get_queue(): 1405 """Get jobs waiting in queue (served from store, instant).""" 1406 data = store.get("queue") 1407 if data is not None: 1408 return data 1409 return {"queue": []} 1410 1411 1412 @app.get("/api/jobs/history") 1413 async def get_history(limit: int = 30): 1414 """Get recent job history (served from store, instant).""" 1415 data = store.get("history_30") 1416 if data is not None: 1417 jobs = data.get("jobs", []) 1418 return {"jobs": jobs[:limit]} 1419 return {"jobs": []} 1420 1421 1422 @app.get("/api/repos") 1423 async def get_repos(): 1424 """Get repository CI status with Woodpecker CI + Radicle sync (served from store, instant).""" 1425 data = store.get("repos") 1426 if data is not None: 1427 return data 1428 return {"repos": []} 1429 1430 1431 @app.get("/api/stats") 1432 async def get_stats(): 1433 """Get overall CI statistics (served from store, instant).""" 1434 data = store.get("stats") 1435 if data is not None: 1436 return data 1437 return { 1438 "runners": {"total": 0, "online": 0, "busy": 0}, 1439 "queue": {"length": 0}, 1440 "recent": {"total": 0, "success": 0, "failure": 0, "success_rate": 0}, 1441 } 1442 1443 1444 @app.get("/api/woodpecker/queue") 1445 async def get_woodpecker_queue(): 1446 """Get Woodpecker CI queue status (served from store, instant).""" 1447 data = store.get("woodpecker_queue") 1448 if data is not None: 1449 return data 1450 return {"running": [], "pending": [], "stats": {"running_count": 0, "pending_count": 0}, "paused": False} 1451 1452 1453 @app.get("/api/woodpecker/agents") 1454 async def get_woodpecker_agents(): 1455 """Get Woodpecker CI agent status (served from store, instant).""" 1456 data = store.get("woodpecker_agents") 1457 if data is not None: 1458 return data 1459 return {"agents": []} 1460 1461 1462 @app.get("/api/testnet/nodes") 1463 async def get_testnet_nodes(): 1464 """Get testnet node block heights and service status (served from store, instant).""" 1465 data = store.get("testnet_nodes") 1466 if data is not None: 1467 return data 1468 return {"nodes": [], "updated_at": None} 1469 1470 1471 @app.post("/api/cache/clear") 1472 async def clear_cache(): 1473 """Clear the short-TTL cache (store data is unaffected).""" 1474 cache.clear() 1475 return {"status": "ok", "message": "Cache cleared"} 1476 1477 1478 @app.get("/api/system") 1479 async def get_system(): 1480 """Get system resource usage (served from store, instant).""" 1481 data = store.get("system") 1482 if data is not None: 1483 return data 1484 return {"local": None, "ci": None} 1485 1486 1487 @app.get("/dash/api/refresh") 1488 async def trigger_refresh(): 1489 """Trigger an immediate background data refresh. 1490 1491 Returns immediately; refresh runs asynchronously in the background. 1492 """ 1493 if _refresh_running: 1494 return {"status": "already_running", "message": "A refresh is already in progress"} 1495 asyncio.create_task(trigger_refresh_once()) 1496 return {"status": "triggered", "message": "Refresh started in background"} 1497 1498 1499 @app.get("/dash/api/data_age") 1500 async def get_data_age(): 1501 """Return when dashboard data was last refreshed.""" 1502 last_updated = store.get_last_updated() 1503 age = store.get_data_age_seconds() 1504 1505 if last_updated is None: 1506 return { 1507 "last_updated": None, 1508 "age_seconds": None, 1509 "message": "No data available yet", 1510 } 1511 1512 return { 1513 "last_updated": datetime.fromtimestamp(last_updated, tz=timezone.utc).isoformat(), 1514 "age_seconds": round(age, 1) if age is not None else None, 1515 "refresh_interval": BG_REFRESH_INTERVAL, 1516 } 1517 1518 1519 if __name__ == "__main__": 1520 import uvicorn 1521 uvicorn.run(app, host="127.0.0.1", port=8081) 1522 1523 1524 # Pipeline Monitor endpoints - real-time SSH fetches (not cached in store) 1525 @app.get("/api/pipeline/metrics") 1526 async def get_pipeline_metrics(): 1527 """Get current pipeline metrics from CI server watcher.""" 1528 try: 1529 proc = await asyncio.create_subprocess_exec( 1530 "ssh", "-o", "ConnectTimeout=3", "-o", "BatchMode=yes", 1531 f"{CI_SERVER_USER}@{CI_SERVER_HOST}", 1532 "cat", "/tmp/ci-monitor-data/metrics.log", "2>/dev/null", "|", "tail", "-1", 1533 stdout=asyncio.subprocess.PIPE, 1534 stderr=asyncio.subprocess.PIPE, 1535 ) 1536 stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=5) 1537 1538 if not stdout.strip(): 1539 return {"load": 0, "mem_mb": 0, "active_tasks": 0, "completed": 0} 1540 1541 data = {} 1542 for match in re.finditer(r"(\w+)=([0-9.]+)", stdout.decode()): 1543 key, val = match.groups() 1544 data[key] = float(val) if "." in val else int(val) 1545 return data 1546 except Exception as e: 1547 return {"error": str(e)} 1548 1549 1550 @app.get("/api/pipeline/events") 1551 async def get_pipeline_events(): 1552 """Get recent pipeline events from CI server watcher.""" 1553 try: 1554 proc = await asyncio.create_subprocess_exec( 1555 "ssh", "-o", "ConnectTimeout=3", "-o", "BatchMode=yes", 1556 f"{CI_SERVER_USER}@{CI_SERVER_HOST}", 1557 "tail", "-100", "/tmp/ci-monitor-data/events.log", 1558 stdout=asyncio.subprocess.PIPE, 1559 stderr=asyncio.subprocess.PIPE, 1560 ) 1561 stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=5) 1562 1563 events = [] 1564 for line in stdout.decode().strip().split("\n"): 1565 if not line: 1566 continue 1567 match = re.match(r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})[^\[]*\[([^\]]+)\]\s*(.*)", line) 1568 if match: 1569 timestamp, event_type, rest = match.groups() 1570 events.append({ 1571 "time": timestamp[11:19], 1572 "type": event_type, 1573 "message": rest[:150] 1574 }) 1575 return events[-50:] 1576 except Exception as e: 1577 return [] 1578 1579 1580 @app.get("/api/pipeline/runners") 1581 async def get_pipeline_runners(): 1582 """Get current runner status from CI server watcher.""" 1583 try: 1584 proc = await asyncio.create_subprocess_exec( 1585 "ssh", "-o", "ConnectTimeout=3", "-o", "BatchMode=yes", 1586 f"{CI_SERVER_USER}@{CI_SERVER_HOST}", 1587 "cat", "/var/www/health/runners.json", 1588 stdout=asyncio.subprocess.PIPE, 1589 stderr=asyncio.subprocess.PIPE, 1590 ) 1591 stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=5) 1592 1593 return json.loads(stdout.decode()) 1594 except Exception as e: 1595 return [] 1596 1597 1598 @app.get("/api/runners/metrics") 1599 async def get_runner_metrics(): 1600 """Get detailed runner metrics: load average, CPU quota, throttling stats.""" 1601 cached = cache.get("runner_metrics") 1602 if cached: 1603 return cached 1604 1605 script = ''' 1606 uptime | awk -F"load average: " '{print $2}' | awk '{print $1,$2,$3}' | tr -d ',' 1607 echo "---" 1608 systemctl show forgejo-runners.slice | grep -E "CPUQuota|CPUUsageNSec" 1609 echo "---" 1610 cat /sys/fs/cgroup/forgejo.slice/forgejo-runners.slice/cpu.stat 2>/dev/null || echo "nr_throttled 0" 1611 echo "---" 1612 systemctl list-units 'forgejo-runner-*' --state=active | grep -c "forgejo-runner-" 1613 ''' 1614 1615 try: 1616 proc = await asyncio.create_subprocess_exec( 1617 "ssh", "-o", "ConnectTimeout=3", "-o", "BatchMode=yes", 1618 f"{CI_SERVER_USER}@{CI_SERVER_HOST}", 1619 "/bin/bash", "-c", script, 1620 stdout=asyncio.subprocess.PIPE, 1621 stderr=asyncio.subprocess.PIPE, 1622 ) 1623 stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=5) 1624 1625 output = stdout.decode().strip().split("---") 1626 1627 load_parts = output[0].strip().split() 1628 load_1m = float(load_parts[0]) if len(load_parts) > 0 else 0 1629 load_5m = float(load_parts[1]) if len(load_parts) > 1 else 0 1630 load_15m = float(load_parts[2]) if len(load_parts) > 2 else 0 1631 1632 cpu_quota_line = [line for line in output[1].strip().split("\n") if "CPUQuota" in line] 1633 cpu_quota_pct = 100 1634 if cpu_quota_line: 1635 quota_match = re.search(r'CPUQuotaPerSecUSec=(\d+)s', cpu_quota_line[0]) 1636 if quota_match: 1637 cpu_quota_pct = int(quota_match.group(1)) * 100 1638 1639 throttle_lines = output[2].strip().split("\n") 1640 nr_throttled = 0 1641 throttled_usec = 0 1642 for line in throttle_lines: 1643 if line.startswith("nr_throttled"): 1644 nr_throttled = int(line.split()[1]) 1645 elif line.startswith("throttled_usec"): 1646 throttled_usec = int(line.split()[1]) 1647 1648 active_runners = int(output[3].strip()) 1649 1650 result = { 1651 "load_average": { 1652 "1m": load_1m, 1653 "5m": load_5m, 1654 "15m": load_15m 1655 }, 1656 "cpu_quota_percent": cpu_quota_pct, 1657 "throttling": { 1658 "nr_throttled": nr_throttled, 1659 "throttled_seconds": throttled_usec / 1_000_000 1660 }, 1661 "runners": { 1662 "active": active_runners, 1663 "total": 10 1664 } 1665 } 1666 1667 cache.set("runner_metrics", result) 1668 return result 1669 1670 except Exception as e: 1671 return {"error": str(e)} 1672 1673 1674 @app.get("/api/sccache/metrics") 1675 async def get_sccache_metrics(): 1676 """Get sccache compilation cache metrics from CI server.""" 1677 cached = cache.get("sccache_metrics") 1678 if cached: 1679 return cached 1680 1681 try: 1682 proc = await asyncio.create_subprocess_exec( 1683 "ssh", 1684 "-o", "ConnectTimeout=5", 1685 "-o", "BatchMode=yes", 1686 f"{CI_SERVER_USER}@{CI_SERVER_HOST}", 1687 "cat", "/var/www/health/sccache.json", 1688 stdout=asyncio.subprocess.PIPE, 1689 stderr=asyncio.subprocess.PIPE, 1690 ) 1691 1692 try: 1693 stdout, stderr = await asyncio.wait_for( 1694 proc.communicate(), 1695 timeout=10 1696 ) 1697 except asyncio.TimeoutError: 1698 proc.kill() 1699 await proc.wait() 1700 return {"error": "Timeout fetching sccache metrics"} 1701 1702 if proc.returncode != 0: 1703 return {"error": f"Failed to fetch sccache metrics: {stderr.decode()}"} 1704 1705 result = json.loads(stdout.decode().strip()) 1706 1707 cache.set("sccache_metrics", result) 1708 return result 1709 1710 except Exception as e: 1711 return {"error": str(e)}