#!/usr/bin/env python3
"""
Forgejo CI Dashboard Backend
Aggregates data from Forgejo API and database for the dashboard.
"""

import asyncio
import json
import os
import re
import subprocess
import tempfile
import time
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

import httpx
import psutil
import psycopg2
from psycopg2.extras import RealDictCursor
from dotenv import load_dotenv
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import Response

load_dotenv()

# Configuration
FORGEJO_URL = os.getenv("FORGEJO_URL", "http://localhost:3000")
FORGEJO_TOKEN = os.getenv("FORGEJO_TOKEN", "")
FORGEJO_REPOS_PATH = os.getenv("FORGEJO_REPOS_PATH", "/var/lib/forgejo/repositories")
RADICLE_BIN = os.getenv("RADICLE_BIN", "/home/devops/.radicle/bin/rad")
CACHE_TTL = int(os.getenv("CACHE_TTL", "10"))  # seconds

# Woodpecker CI Configuration
WOODPECKER_URL = os.getenv("WOODPECKER_URL", "https://ci.ac-dc.network")
WOODPECKER_TOKEN = os.getenv("WOODPECKER_TOKEN", "")  # supply via environment; never hardcode a real token

# Postgres database configuration
DB_HOST = os.getenv("DB_HOST", "localhost")
DB_PORT = int(os.getenv("DB_PORT", "5432"))
DB_NAME = os.getenv("DB_NAME", "forgejo")
DB_USER = os.getenv("DB_USER", "forgejo")
DB_PASSWORD = os.getenv("DB_PASSWORD", "")

# CI stages to track (customize as needed)
CI_STAGES = ["build", "check", "clippy", "fmt", "test", "node"]

# Remote CI server configuration
CI_SERVER_HOST = os.getenv("CI_SERVER_HOST", "10.106.0.3")
CI_SERVER_USER = os.getenv("CI_SERVER_USER", "devops")
CI_SERVER_SSH_PORT = int(os.getenv("CI_SERVER_SSH_PORT", "2584"))  # SSH port for CI server
CI_SERVER_SSH_TIMEOUT = int(os.getenv("CI_SERVER_SSH_TIMEOUT", "10"))

# Background refresh interval (seconds)
BG_REFRESH_INTERVAL = int(os.getenv("BG_REFRESH_INTERVAL", "60"))

# Path for persistent JSON cache
CACHE_FILE = Path(__file__).parent / "data" / "dashboard_cache.json"

# Testnet node configuration
TESTNET_SSH_PORT = int(os.getenv("TESTNET_SSH_PORT", "2584"))
TESTNET_SSH_USER = os.getenv("TESTNET_SSH_USER", "devops")
TESTNET_SERVERS = [
    {"name": "testnet001", "host": "testnet001.ac-dc.network", "role": "validator"},
    {"name": "testnet002", "host": "testnet002.ac-dc.network", "role": "validator"},
    {"name": "testnet003", "host": "testnet003.ac-dc.network", "role": "validator"},
    {"name": "testnet004", "host": "testnet004.ac-dc.network", "role": "validator"},
    {"name": "testnet005", "host": "testnet005.ac-dc.network", "role": "validator"},
    {"name": "testnet006", "host": "testnet006.ac-dc.network", "role": "prover"},
]


def get_db_connection():
    """Create a connection to the Forgejo Postgres database."""
    return psycopg2.connect(
        host=DB_HOST,
        port=DB_PORT,
        dbname=DB_NAME,
        user=DB_USER,
        password=DB_PASSWORD
    )


async def fetch_remote_system_metrics() -> dict | None:
    """Fetch system metrics from remote CI server via SSH using Python."""
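    # The base64 blob below decodes to a small Python script that samples CPU
    # usage from /proc/stat over a 1-second interval, memory and swap from
    # `free -b`, and root-disk usage from `df -B1 /`, then prints one JSON object.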
    METRICS_SCRIPT_B64 = (
        "aW1wb3J0IGpzb24KaW1wb3J0IG9zCmltcG9ydCBzdWJwcm9jZXNzCmltcG9ydCB0aW1lCgpkZWYgZ2V0X21l"
        "dHJpY3MoKToKICAgIGNvcmVzID0gb3MuY3B1X2NvdW50KCkKICAgIGZyZWUgPSBzdWJwcm9jZXNzLnJ1bihb"
        "J2ZyZWUnLCAnLWInXSwgY2FwdHVyZV9vdXRwdXQ9VHJ1ZSwgdGV4dD1UcnVlKQogICAgbWVtX2xpbmUgPSBb"
        "bCBmb3IgbCBpbiBmcmVlLnN0ZG91dC5zcGxpdCgnXG4nKSBpZiBsLnN0YXJ0c3dpdGgoJ01lbTonKV1bMF0u"
        "c3BsaXQoKQogICAgc3dhcF9saW5lID0gW2wgZm9yIGwgaW4gZnJlZS5zdGRvdXQuc3BsaXQoJ1xuJykgaWYg"
        "bC5zdGFydHN3aXRoKCdTd2FwOicpXVswXS5zcGxpdCgpCiAgICBtZW1fdG90YWwgPSBpbnQobWVtX2xpbmVb"
        "MV0pCiAgICBtZW1fdXNlZCA9IGludChtZW1fbGluZVsyXSkKICAgIG1lbV9hdmFpbCA9IGludChtZW1fbGlu"
        "ZVs2XSkgaWYgbGVuKG1lbV9saW5lKSA+IDYgZWxzZSBtZW1fdG90YWwgLSBtZW1fdXNlZAogICAgc3dhcF90"
        "b3RhbCA9IGludChzd2FwX2xpbmVbMV0pCiAgICBzd2FwX3VzZWQgPSBpbnQoc3dhcF9saW5lWzJdKQogICAg"
        "c3dhcF9mcmVlID0gaW50KHN3YXBfbGluZVszXSkgaWYgbGVuKHN3YXBfbGluZSkgPiAzIGVsc2UgMAogICAg"
        "ZGYgPSBzdWJwcm9jZXNzLnJ1bihbJ2RmJywgJy1CMScsICcvJ10sIGNhcHR1cmVfb3V0cHV0PVRydWUsIHRl"
        "eHQ9VHJ1ZSkKICAgIGRpc2tfbGluZSA9IGRmLnN0ZG91dC5zdHJpcCgpLnNwbGl0KCdcbicpWzFdLnNwbGl0"
        "KCkKICAgIGRpc2tfdG90YWwgPSBpbnQoZGlza19saW5lWzFdKQogICAgZGlza191c2VkID0gaW50KGRpc2tf"
        "bGluZVsyXSkKICAgIGRpc2tfYXZhaWwgPSBpbnQoZGlza19saW5lWzNdKQoKICAgICMgVXNlIC9wcm9jL3N0"
        "YXQgZm9yIGFjY3VyYXRlIENQVSBjYWxjdWxhdGlvbiAoc2FtZSBhcyBQcm9tZXRoZXVzKQogICAgd2l0aCBv"
        "cGVuKCcvcHJvYy9zdGF0JykgYXMgZjoKICAgICAgICBjcHUxID0gW2ludCh4KSBmb3IgeCBpbiBmLnJlYWRs"
        "aW5lKCkuc3BsaXQoKVsxOl1dCiAgICB0aW1lLnNsZWVwKDEpCiAgICB3aXRoIG9wZW4oJy9wcm9jL3N0YXQn"
        "KSBhcyBmOgogICAgICAgIGNwdTIgPSBbaW50KHgpIGZvciB4IGluIGYucmVhZGxpbmUoKS5zcGxpdCgpWzE6"
        "XV0KCiAgICBkZWx0YXMgPSBbYiAtIGEgZm9yIGEsIGIgaW4gemlwKGNwdTEsIGNwdTIpXQogICAgdG90YWxf"
        "ZGVsdGEgPSBzdW0oZGVsdGFzKQogICAgaWRsZV9kZWx0YSA9IGRlbHRhc1szXSAgIyBJbmRleCAzIGlzIGlk"
        "bGUgdGltZQoKICAgIGNwdV9wY3QgPSByb3VuZCgxMDAgKiAoMSAtIGlkbGVfZGVsdGEgLyB0b3RhbF9kZWx0"
        "YSksIDEpIGlmIHRvdGFsX2RlbHRhID4gMCBlbHNlIDAKCiAgICByZXR1cm4gewogICAgICAgICJjcHUiOiB7"
        "InBlcmNlbnQiOiBjcHVfcGN0LCAiY29yZXMiOiBjb3Jlc30sCiAgICAgICAgIm1lbW9yeSI6IHsidG90YWwi"
        "OiBtZW1fdG90YWwsICJ1c2VkIjogbWVtX3VzZWQsICJhdmFpbGFibGUiOiBtZW1fYXZhaWwsICJwZXJjZW50"
        "IjogaW50KDEwMCptZW1fdXNlZC9tZW1fdG90YWwpfSwKICAgICAgICAiZGlzayI6IHsidG90YWwiOiBkaXNr"
        "X3RvdGFsLCAidXNlZCI6IGRpc2tfdXNlZCwgImZyZWUiOiBkaXNrX2F2YWlsLCAicGVyY2VudCI6IGludCgx"
        "MDAqZGlza191c2VkL2Rpc2tfdG90YWwpfSwKICAgICAgICAic3dhcCI6IHsidG90YWwiOiBzd2FwX3RvdGFs"
        "LCAidXNlZCI6IHN3YXBfdXNlZCwgImZyZWUiOiBzd2FwX2ZyZWUsICJwZXJjZW50IjogaW50KDEwMCpzd2Fw"
        "X3VzZWQvc3dhcF90b3RhbCkgaWYgc3dhcF90b3RhbCA+IDAgZWxzZSAwfSwKICAgICAgICAibWVtb3J5X3By"
        "ZXNzdXJlIjogMCwKICAgICAgICAib29tX2V2ZW50cyI6IDAKICAgIH0KcHJpbnQoanNvbi5kdW1wcyhnZXRf"
        "bWV0cmljcygpKSkK"
    )

    try:
        ssh_cmd = (
            f"ssh -p {CI_SERVER_SSH_PORT} -o ConnectTimeout=5 -o BatchMode=yes -o StrictHostKeyChecking=accept-new "
            f"{CI_SERVER_USER}@{CI_SERVER_HOST} "
            f"'echo {METRICS_SCRIPT_B64} | base64 -d | python3'"
        )

        proc = await asyncio.create_subprocess_shell(
            ssh_cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )

        try:
            stdout, stderr = await asyncio.wait_for(
                proc.communicate(),
                timeout=CI_SERVER_SSH_TIMEOUT
            )
        except asyncio.TimeoutError:
            proc.kill()
            await proc.wait()
            print(f"SSH timeout fetching metrics from {CI_SERVER_HOST}")
            return None

        if proc.returncode != 0:
            print(f"SSH error fetching metrics from {CI_SERVER_HOST}: {stderr.decode()}")
            return None

        return json.loads(stdout.decode().strip())

    except Exception as e:
        print(f"Error fetching remote metrics: {e}")
        return None


async def fetch_testnet_node_status(server: dict) -> dict:
    """Fetch block heights and service status from a testnet node via SSH.

    Pipes the Python script via stdin to avoid shell quoting issues with
    braces and quotes in the remote command.

    REST API notes (discovered 2026-02-20):
    - Routes are under /v1/testnet/ for both alphaos and deltaos
    - JWT token is stored in the ledger dir: /.ledger-13-{N}/jwt_secret_*.txt (alpha)
      and /.ledger-delta-21-{N}/jwt_secret_*.txt (delta)
    - Block height: GET /v1/testnet/block/height/latest -> integer
    - Sync status: GET /v1/testnet/sync/status -> is_synced, ledger_height, sync_mode
    - Processes may run outside systemd (started manually), detect via pgrep
    - testnet006 alphaos prover uses REST port 3035 (process uses --rest 0.0.0.0:3035)
    """
    # The script is sent via stdin to `python3`, so no shell quoting is needed.
    # It is encoded to bytes because proc.communicate(input=...) expects bytes.
    python_script = (
        "import json, subprocess, urllib.request, glob, socket\n"
        "result = {'alpha': {}, 'delta': {}}\n"
        "for svcs, proc, key in [(['alphaos-validator', 'alphaos'], 'alphaos', 'alpha'), (['deltaos-validator', 'deltaos'], 'deltaos', 'delta')]:\n"
        "    svc_status = 'unknown'\n"
        "    for svc in svcs:\n"
        "        p = subprocess.run(['systemctl', 'is-active', svc], capture_output=True, text=True)\n"
        "        s = p.stdout.strip()\n"
        "        if s == 'active':\n"
        "            svc_status = 'active'\n"
        "            break\n"
        "    if svc_status != 'active':\n"
        "        pcheck = subprocess.run(['pgrep', '-c', proc], capture_output=True, text=True)\n"
        "        if pcheck.returncode == 0 and int(pcheck.stdout.strip() or '0') > 0:\n"
        "            svc_status = 'running-nosystemd'\n"
        "    result[key]['service'] = svc_status\n"
        "hostname = socket.gethostname()\n"
        "alpha_port = 3035 if 'testnet006' in hostname else 3030\n"
        "delta_port = 3031\n"
        "def find_jwt(pattern):\n"
        "    matches = glob.glob(pattern)\n"
        "    if matches:\n"
        "        try:\n"
        "            with open(matches[0]) as f: return f.read().strip()\n"
        "        except Exception: pass\n"
        "    return None\n"
        "alpha_jwt = find_jwt('/.ledger-13-*/jwt_secret_*.txt')\n"
        "delta_jwt = find_jwt('/.ledger-delta-21-*/jwt_secret_*.txt')\n"
        "for chain, port, jwt in [('alpha', alpha_port, alpha_jwt), ('delta', delta_port, delta_jwt)]:\n"
        "    try:\n"
        "        headers = {}\n"
        "        if jwt: headers['Authorization'] = 'Bearer ' + jwt\n"
        "        req = urllib.request.Request(\n"
        "            'http://localhost:' + str(port) + '/v1/testnet/sync/status',\n"
        "            headers=headers)\n"
        "        with urllib.request.urlopen(req, timeout=3) as resp:\n"
        "            d = json.loads(resp.read())\n"
        "            result[chain]['height'] = d.get('ledger_height')\n"
        "            result[chain]['synced'] = d.get('is_synced', False)\n"
        "            result[chain]['sync_mode'] = d.get('sync_mode', 'unknown')\n"
        "    except Exception as e:\n"
        "        result[chain]['height'] = None\n"
        "        result[chain]['synced'] = False\n"
        "        result[chain]['sync_error'] = str(e)\n"
        "print(json.dumps(result))\n"
    ).encode("utf-8")
    try:
        proc = await asyncio.create_subprocess_exec(
            "ssh",
            "-p", str(TESTNET_SSH_PORT),
            "-o", "ConnectTimeout=5",
            "-o", "BatchMode=yes",
            "-o", "StrictHostKeyChecking=accept-new",
            f"{TESTNET_SSH_USER}@{server['host']}",
            "python3",
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        try:
            stdout, _ = await asyncio.wait_for(proc.communicate(input=python_script), timeout=12)
        except asyncio.TimeoutError:
            # Don't leak the ssh process on timeout (same pattern as
            # fetch_remote_system_metrics); the outer handler returns the
            # offline placeholder below.
            proc.kill()
            await proc.wait()
            raise
        data = json.loads(stdout.decode().strip())
        return {
            "name": server["name"],
            "role": server.get("role", "validator"),
            "reachable": True,
            "alpha_height": data.get("alpha", {}).get("height"),
            "alpha_service": data.get("alpha", {}).get("service", "unknown"),
            "alpha_synced": data.get("alpha", {}).get("synced", False),
            "delta_height": data.get("delta", {}).get("height"),
            "delta_service": data.get("delta", {}).get("service", "unknown"),
            "delta_synced": data.get("delta", {}).get("synced", False),
        }
    except Exception as e:
        print(f"Testnet {server['name']} fetch failed: {e}")
        return {
            "name": server["name"],
            "role": server.get("role", "validator"),
            "reachable": False,
            "alpha_height": None,
            "alpha_service": "offline",
            "alpha_synced": False,
            "delta_height": None,
            "delta_service": "offline",
            "delta_synced": False,
        }


async def fetch_all_testnet_nodes() -> list[dict]:
    """Fetch status from all testnet nodes in parallel."""
    tasks = [fetch_testnet_node_status(s) for s in TESTNET_SERVERS]
    return list(await asyncio.gather(*tasks))


class DashboardStore:
    """Persistent store for dashboard data.

    Stores data in memory AND persists to a JSON file atomically.
    On startup, loads from disk if a cached file exists.
    API endpoints read from this store (instant, never blocks).
    """

    def __init__(self, cache_file: Path):
        self._cache_file = cache_file
        self._data: dict[str, Any] = {}
        self._last_updated: float | None = None
        cache_file.parent.mkdir(parents=True, exist_ok=True)

    def load_from_disk(self, max_age_seconds: int = 300) -> bool:
        """Load data from the persistent JSON file on startup."""
        try:
            if self._cache_file.exists():
                with open(self._cache_file) as f:
                    saved = json.load(f)
                last_updated = saved.get("last_updated")
                if last_updated and time.time() - last_updated > max_age_seconds:
                    print(f"DashboardStore: on-disk cache is stale (>{max_age_seconds}s), discarding")
                    return False
                self._data = saved.get("data", {})
                self._last_updated = last_updated
                print(f"DashboardStore: loaded from disk ({len(self._data)} keys)")
                return True
        except Exception as e:
            print(f"DashboardStore: failed to load from disk: {e}")
        return False

    def get(self, key: str) -> Any | None:
        """Get a value from the store."""
        return self._data.get(key)

    def set(self, key: str, value: Any) -> None:
        """Set a value in memory."""
        self._data[key] = value

    def persist_all(self) -> None:
        """Write all current data to disk atomically."""
        self._persist()

    def set_last_updated(self) -> None:
        """Record the current time as the last-updated timestamp."""
        self._last_updated = time.time()

    def get_last_updated(self) -> float | None:
        return self._last_updated

    def get_data_age_seconds(self) -> float | None:
        if self._last_updated is None:
            return None
        return time.time() - self._last_updated

    def _persist(self) -> None:
        """Atomically write data to the cache file."""
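        # Write to a temp file in the same directory, then rename over the
        # target; a same-filesystem rename is atomic on POSIX, so readers
        # never observe a partially written cache file.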
        try:
            payload = {
                "last_updated": self._last_updated,
                "data": self._data,
            }
            tmp = tempfile.NamedTemporaryFile(
                mode="w",
                dir=self._cache_file.parent,
                delete=False,
                suffix=".tmp"
            )
            json.dump(payload, tmp)
            tmp.close()
            Path(tmp.name).rename(self._cache_file)
        except Exception as e:
            print(f"DashboardStore: failed to persist to disk: {e}")


# Global persistent store
store = DashboardStore(CACHE_FILE)


class Cache:
    """Simple in-memory cache with TTL for fast-changing pipeline endpoints."""

    def __init__(self, ttl: int = 10):
        self.ttl = ttl
        self._cache: dict[str, tuple[float, Any]] = {}

    def get(self, key: str) -> Any | None:
        if key in self._cache:
            timestamp, value = self._cache[key]
            if time.time() - timestamp < self.ttl:
                return value
            del self._cache[key]
        return None

    def set(self, key: str, value: Any) -> None:
        self._cache[key] = (time.time(), value)

    def clear(self) -> None:
        self._cache.clear()


cache = Cache(ttl=CACHE_TTL)


def load_radicle_config() -> dict[str, str | None]:
    """Load repo-to-RID mappings from config file."""
    config_path = Path(__file__).parent / "radicle-repos.json"
    try:
        if config_path.exists():
            with open(config_path) as f:
                data = json.load(f)
                return {k.lower(): v for k, v in data.get("repos", {}).items()}
    except Exception as e:
        print(f"Error loading radicle config: {e}")
    return {}


def get_unmerged_branches_count(owner: str, repo: str) -> int:
    """Get count of unmerged branches for a repository."""
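    # Approximation: every branch other than main/master is counted as
    # unmerged; the branches API response is not checked for merge status.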
    try:
        headers = {}
        if FORGEJO_TOKEN:
            headers["Authorization"] = f"token {FORGEJO_TOKEN}"

        url = f"{FORGEJO_URL}/api/v1/repos/{owner}/{repo}/branches?limit=100"
        with httpx.Client(timeout=5.0) as client:
            response = client.get(url, headers=headers)
            if response.status_code == 200:
                branches = response.json()
                unmerged = [b for b in branches if b.get("name", "") not in ["main", "master"]]
                return len(unmerged)
            return 0
    except Exception as e:
        print(f"Error getting unmerged branches for {owner}/{repo}: {e}")
        return 0

def get_radicle_repos() -> dict[str, str | None]:
    """Get radicle repos with their HEAD commits from each repo's git remote ref.

    Reads .git/refs/remotes/rad/main in each working repo directory.
    This is accurate and fast (local file read, no network), and is updated
    whenever `git push rad main` succeeds.
    """
    try:
        radicle_config = load_radicle_config()
        if not radicle_config:
            return {}

        repos_root = Path(os.getenv("REPOS_ROOT", "/home/devops/working-repos"))
        repos = {}

        for name in radicle_config:
            repo_dir = repos_root / name
            ref_path = repo_dir / ".git" / "refs" / "remotes" / "rad" / "main"
            if ref_path.exists():
                try:
                    head = ref_path.read_text().strip()[:7]
                    if head and len(head) >= 7:
                        repos[name.lower()] = head
                except Exception as e:
                    print(f"Error reading rad ref for {name}: {e}")
            else:
                # Fallback: try packed-refs
                packed = repo_dir / ".git" / "packed-refs"
                if packed.exists():
                    for line in packed.read_text().splitlines():
                        if line.endswith(" refs/remotes/rad/main"):
                            repos[name.lower()] = line.split()[0][:7]
                            break

        return repos
    except Exception as e:
        print(f"Error getting radicle repos: {e}")
        return {}

def get_forgejo_repo_head(owner: str, repo: str) -> str | None:
    """Get the HEAD commit hash for a Forgejo repository."""
    try:
        headers = {}
        if FORGEJO_TOKEN:
            headers["Authorization"] = f"token {FORGEJO_TOKEN}"

        for branch in ["main", "master"]:
            try:
                url = f"{FORGEJO_URL}/api/v1/repos/{owner}/{repo}/branches/{branch}"
                with httpx.Client(timeout=5.0) as client:
                    response = client.get(url, headers=headers)
                    if response.status_code == 200:
                        data = response.json()
                        commit_sha = data.get("commit", {}).get("id", "")
                        if commit_sha:
                            return commit_sha[:7]
            except Exception:
                continue
    except Exception as e:
        print(f"Forgejo API query failed for {owner}/{repo}, trying local files: {e}")

    try:
        repo_path = Path(FORGEJO_REPOS_PATH) / owner / f"{repo}.git"
        if not repo_path.exists():
            return None

        def read_packed_ref(ref_name: str) -> str | None:
            packed_refs_path = repo_path / "packed-refs"
            if not packed_refs_path.exists():
                return None
            try:
                for line in packed_refs_path.read_text().splitlines():
                    line = line.strip()
                    if not line or line.startswith("#") or line.startswith("^"):
                        continue
                    parts = line.split()
                    if len(parts) >= 2 and parts[1] == ref_name:
                        return parts[0][:7]
            except Exception:
                pass
            return None

        for branch in ["main", "master"]:
            ref_path = repo_path / "refs" / "heads" / branch
            if ref_path.exists():
                return ref_path.read_text().strip()[:7]
            packed_head = read_packed_ref(f"refs/heads/{branch}")
            if packed_head:
                return packed_head

        head_path = repo_path / "HEAD"
        if head_path.exists():
            head_content = head_path.read_text().strip()
            if head_content.startswith("ref:"):
                ref = head_content.split("ref:")[1].strip()
                ref_path = repo_path / ref
                if ref_path.exists():
                    return ref_path.read_text().strip()[:7]
                packed_head = read_packed_ref(ref)
                if packed_head:
                    return packed_head
            else:
                return head_content[:7]
        return None
    except Exception as e:
        print(f"Error getting Forgejo repo HEAD for {owner}/{repo}: {e}")
        return None

async def fetch_forgejo_api(path: str, client: httpx.AsyncClient) -> Any:
    """Fetch data from Forgejo API."""
    headers = {}
    if FORGEJO_TOKEN:
        headers["Authorization"] = f"token {FORGEJO_TOKEN}"

    url = f"{FORGEJO_URL}{path}"
    response = await client.get(url, headers=headers, timeout=30.0)
    response.raise_for_status()
    return response.json()


async def fetch_woodpecker_api(path: str, client: httpx.AsyncClient) -> Any:
    """Fetch data from Woodpecker CI API."""
    headers = {"Authorization": f"Bearer {WOODPECKER_TOKEN}"}
    url = f"{WOODPECKER_URL}{path}"
    try:
        response = await client.get(url, headers=headers, timeout=10.0)
        response.raise_for_status()
        return response.json()
    except Exception as e:
        print(f"Woodpecker API error for {path}: {e}")
        return None

def query_runners_from_db() -> list[dict]:
    """Query runner information directly from Forgejo Postgres database."""
    try:
        conn = get_db_connection()
        cursor = conn.cursor(cursor_factory=RealDictCursor)

        cursor.execute("""
            SELECT
                id, name, uuid, version, agent_labels,
                last_online, last_active, owner_id, repo_id
            FROM action_runner
            WHERE deleted IS NULL OR deleted = 0
            ORDER BY last_online DESC
        """)

        runners = []
        now = time.time()
        for row in cursor.fetchall():
            last_online = row["last_online"] or 0
            last_active = row["last_active"] or 0
            is_online = (now - last_online) < 60

            labels = []
            if row["agent_labels"]:
                try:
                    labels = json.loads(row["agent_labels"])
                except json.JSONDecodeError:
                    labels = []

            runners.append({
                "id": row["id"],
                "name": row["name"],
                "uuid": row["uuid"],
                "version": row["version"],
                "labels": labels,
                "status": "online" if is_online else "offline",
                "last_online": datetime.fromtimestamp(last_online, tz=timezone.utc).isoformat() if last_online else None,
                "last_active": datetime.fromtimestamp(last_active, tz=timezone.utc).isoformat() if last_active else None,
                "current_job": None,
            })

        conn.close()
        return runners
    except Exception as e:
        print(f"Error querying runners from DB: {e}")
        return []

def query_running_jobs_from_db() -> dict[int, dict]:
    """Query currently running jobs to associate with runners."""
    try:
        conn = get_db_connection()
        cursor = conn.cursor(cursor_factory=RealDictCursor)

        now = int(time.time())
        max_age = now - (2 * 3600)

        cursor.execute("""
            SELECT
                t.id, t.job_id, t.runner_id, t.status, t.started,
                j.name as job_name, j.run_id,
                r.title as run_title, r.repo_id,
                repo.name as repo_name
            FROM action_task t
            LEFT JOIN action_run_job j ON t.job_id = j.id
            LEFT JOIN action_run r ON j.run_id = r.id
            LEFT JOIN repository repo ON r.repo_id = repo.id
            WHERE t.status = 2 AND t.started > %s
            ORDER BY t.started DESC
        """, (max_age,))

        jobs_by_runner = {}
        for row in cursor.fetchall():
            runner_id = row["runner_id"]
            if runner_id and runner_id not in jobs_by_runner:
                jobs_by_runner[runner_id] = {
                    "id": row["id"],
                    "name": row["job_name"] or "Unknown",
                    "run_title": row["run_title"],
                    "repo": row["repo_name"],
                    "started": datetime.fromtimestamp(row["started"], tz=timezone.utc).isoformat() if row["started"] else None,
                }

        conn.close()
        return jobs_by_runner
    except Exception as e:
        print(f"Error querying running jobs from DB: {e}")
        return {}

def query_queue_from_db() -> list[dict]:
    """Query active workflow runs and return repo-level progress summary."""
    try:
        conn = get_db_connection()
        cursor = conn.cursor(cursor_factory=RealDictCursor)

        now = int(time.time())
        max_age = now - (30 * 60)

        cursor.execute("""
            SELECT
                r.id as run_id, r.status as run_status, r.updated,
                repo.name as repo_name
            FROM action_run r
            JOIN repository repo ON r.repo_id = repo.id
            WHERE r.status IN (1, 2) AND r.updated > %s
            ORDER BY r.updated DESC
        """, (max_age,))

        runs = cursor.fetchall()
        if not runs:
            conn.close()
            return []

        queue_summary = []
        seen_repos = set()

        for run in runs:
            repo_name = run["repo_name"]
            if repo_name in seen_repos:
                continue
            seen_repos.add(repo_name)

            cursor.execute("""
                SELECT status, COUNT(*) as count
                FROM action_run_job
                WHERE run_id = %s
                GROUP BY status
            """, (run["run_id"],))

            status_counts = {row["status"]: row["count"] for row in cursor.fetchall()}

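            # Forgejo job status codes, matching status_map elsewhere in this
            # file: 0=unknown, 1=queued, 2=running, 3=success, 4=failure,
            # 5=cancelled, 6=skipped. Codes 3-6 are terminal, hence "done".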
            done = sum(status_counts.get(s, 0) for s in [3, 4, 5, 6])
            total = sum(status_counts.values())
            running = status_counts.get(2, 0)
            pending = status_counts.get(1, 0) + status_counts.get(0, 0)

            if running > 0:
                status = "running"
            elif pending > 0:
                status = "pending"
            else:
                status = "done"

            queue_summary.append({
                "repo": repo_name,
                "done": done,
                "total": total,
                "running": running,
                "status": status,
            })

        conn.close()
        return queue_summary
    except Exception as e:
        print(f"Error querying queue from DB: {e}")
        return []

def query_history_from_db(limit: int = 30) -> list[dict]:
    """Query recent job history from database."""
    try:
        conn = get_db_connection()
        cursor = conn.cursor(cursor_factory=RealDictCursor)

        status_map = {
            0: "unknown", 1: "queued", 2: "running",
            3: "success", 4: "failure", 5: "cancelled", 6: "skipped"
        }

        history = []
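        # Two passes: currently-running jobs first, then completed jobs to
        # fill whatever remains of the limit.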

        cursor.execute("""
            SELECT
                j.id, j.name, j.status, j.started, j.stopped, j.updated,
                r.title, r.ref as branch, r.event,
                repo.name as repo_name, repo.owner_name
            FROM action_run_job j
            JOIN action_run r ON j.run_id = r.id
            JOIN repository repo ON r.repo_id = repo.id
            WHERE j.status = 2 AND (j.stopped IS NULL OR j.stopped = 0)
            ORDER BY CASE WHEN j.started > 0 THEN j.started ELSE j.updated END DESC
            LIMIT %s
        """, (limit,))

        for row in cursor.fetchall():
            started = row["started"] or 0
            branch = row["branch"] or ""
            branch = branch.replace("refs/heads/", "").replace("refs/tags/", "")

            history.append({
                "id": row["id"],
                "job": row["name"],
                "repo": f"{row['owner_name']}/{row['repo_name']}",
                "branch": branch,
                "result": status_map.get(row["status"], "unknown"),
                "duration": 0,
                "started_at": datetime.fromtimestamp(started, tz=timezone.utc).isoformat() if started else None,
                "finished_at": None,
            })

        remaining_limit = limit - len(history)
        if remaining_limit > 0:
            cursor.execute("""
                SELECT
                    j.id, j.name, j.status, j.started, j.stopped, j.updated,
                    r.title, r.ref as branch, r.event,
                    repo.name as repo_name, repo.owner_name
                FROM action_run_job j
                JOIN action_run r ON j.run_id = r.id
                JOIN repository repo ON r.repo_id = repo.id
                WHERE j.status IN (3, 4, 5, 6) OR (j.stopped > 0 AND j.started > 0)
                ORDER BY CASE WHEN j.stopped > 0 THEN j.stopped ELSE j.updated END DESC
                LIMIT %s
            """, (remaining_limit,))

            for row in cursor.fetchall():
                started = row["started"] or 0
                stopped = row["stopped"] if row["stopped"] else row["updated"]
                duration = stopped - started if stopped and started else 0

                branch = row["branch"] or ""
                branch = branch.replace("refs/heads/", "").replace("refs/tags/", "")

                status = row["status"]
                # Jobs still marked queued/running but carrying a stop time
                # were never finalized; treat them as having succeeded.
                if status in (1, 2) and stopped > 0:
                    result = "success"
                else:
                    result = status_map.get(status, "unknown")

                history.append({
                    "id": row["id"],
                    "job": row["name"],
                    "repo": f"{row['owner_name']}/{row['repo_name']}",
                    "branch": branch,
                    "result": result,
                    "duration": duration,
                    "started_at": datetime.fromtimestamp(started, tz=timezone.utc).isoformat() if started else None,
                    "finished_at": datetime.fromtimestamp(stopped, tz=timezone.utc).isoformat() if stopped else None,
                })

        conn.close()
        return history
    except Exception as e:
        print(f"Error querying history from DB: {e}")
        return []

def query_repo_status_from_api() -> list[dict]:
    """Query repos via Forgejo API when database is not accessible."""
    headers = {"Authorization": f"token {FORGEJO_TOKEN}"} if FORGEJO_TOKEN else {}

    radicle_repos = get_radicle_repos()
    radicle_config = load_radicle_config()

    try:
        with httpx.Client(timeout=30.0) as client:
            response = client.get(
                f"{FORGEJO_URL}/api/v1/orgs/alpha-delta-network/repos?limit=100",
                headers=headers
            )
            response.raise_for_status()
            all_repos = response.json()

            repos = []
            for repo_data in all_repos:
                repo_name = repo_data["name"]
                repo_name_lower = repo_name.lower()
                default_branch = repo_data.get("default_branch", "main")

                ci_status = None
                try:
                    ci_response = client.get(
                        f"{FORGEJO_URL}/api/v1/repos/alpha-delta-network/{repo_name}/actions/runs?limit=1",
                        headers=headers,
                        timeout=5.0
                    )
                    if ci_response.status_code == 200:
                        ci_data = ci_response.json()
                        runs = ci_data.get("workflow_runs", [])
                        if runs:
                            run = runs[0]
                            ci_status = run.get("conclusion") or run.get("status")
                            if isinstance(ci_status, int):
                                status_map = {1: "pending", 2: "running", 3: "success", 4: "failure", 5: "cancelled", 6: "skipped"}
                                ci_status = status_map.get(ci_status, "unknown")
                except Exception as e:
                    print(f"CI status error for {repo_name}: {e}")

                forgejo_head = None
                try:
                    branch_response = client.get(
                        f"{FORGEJO_URL}/api/v1/repos/alpha-delta-network/{repo_name}/branches/{default_branch}",
                        headers=headers,
                        timeout=5.0
                    )
                    if branch_response.status_code == 200:
                        branch_data = branch_response.json()
                        forgejo_head = branch_data.get("commit", {}).get("id", "")[:7]
                except Exception as e:
                    print(f"Branch HEAD error for {repo_name}: {e}")

                radicle_head = radicle_repos.get(repo_name_lower)
                rid = radicle_config.get(repo_name_lower)

                if radicle_head is None and rid:
                    sync_status = "unsynced"
                    radicle_synced = False
                    radicle_head = "?"
                elif radicle_head is None and rid is None:
                    sync_status = "no-radicle"
                    radicle_synced = None
                elif forgejo_head and radicle_head and forgejo_head == radicle_head:
                    sync_status = "synced"
                    radicle_synced = True
                else:
                    sync_status = "unsynced"
                    radicle_synced = False

                repos.append({
                    "name": repo_name,
                    "owner_name": "alpha-delta-network",
                    "forgejo_branch": default_branch,
                    "forgejo_ci_status": ci_status,
                    "radicle_synced": radicle_synced,
                    "forgejo_head": forgejo_head,
                    "radicle_head": radicle_head,
                    "sync_status": sync_status,
                    "rid": rid,
                })

            return repos
    except Exception as e:
        print(f"Error querying repos via API: {e}")
        return []

def query_repo_status_from_db() -> list[dict]:
    """Query all repositories with their radicle sync status and CI status."""
    try:
        conn = get_db_connection()
        cursor = conn.cursor(cursor_factory=RealDictCursor)

        radicle_repos = get_radicle_repos()
        radicle_config = load_radicle_config()

        cursor.execute("""
            SELECT DISTINCT
                repo.id, repo.name, repo.owner_name, repo.default_branch
            FROM repository repo
            ORDER BY repo.owner_name, repo.name
        """)

        repos = []

        for repo_row in cursor.fetchall():
            repo_name = repo_row["name"]
            owner_name = repo_row["owner_name"]
            repo_name_lower = repo_name.lower()
            default_branch = repo_row["default_branch"] or "main"

            forgejo_head = get_forgejo_repo_head(owner_name, repo_name)

            radicle_head = radicle_repos.get(repo_name_lower)
            rid = radicle_config.get(repo_name_lower)

            if radicle_head is None and rid:
                sync_status = "unsynced"
                radicle_synced = False
                radicle_head = "?"
            elif radicle_head is None and rid is None:
                sync_status = "no-radicle"
                radicle_synced = None
            elif forgejo_head and radicle_head and forgejo_head == radicle_head:
                sync_status = "synced"
                radicle_synced = True
            else:
                sync_status = "unsynced"
                radicle_synced = False

            ci_status = None
            try:
                cursor.execute("""
                    SELECT conclusion, status, created_unix
                    FROM action_run
                    WHERE repo_id = %s AND created_unix IS NOT NULL
                    ORDER BY created_unix DESC
                    LIMIT 1
                """, (repo_row["id"],))
                ci_row = cursor.fetchone()
                if ci_row:
                    status_map = {1: "pending", 2: "running", 3: "success", 4: "failure", 5: "cancelled", 6: "skipped"}
                    ci_status = status_map.get(ci_row["status"], "unknown")
                    if ci_row["conclusion"]:
                        conclusion_map = {1: "success", 2: "failure", 3: "cancelled", 4: "skipped"}
                        ci_status = conclusion_map.get(ci_row["conclusion"], ci_status)
            except Exception:
                pass

            repos.append({
                "name": repo_name,
                "owner_name": owner_name,
                "sync_status": sync_status,
                "forgejo_head": forgejo_head,
                "radicle_head": radicle_head,
                "rid": rid,
                "forgejo_branch": default_branch,
                "radicle_synced": radicle_synced,
                "forgejo_ci_status": ci_status,
            })

        conn.close()
        return repos
    except Exception as e:
        print(f"Error querying repo status from DB: {e}, falling back to API")
        return query_repo_status_from_api()

async def get_woodpecker_repo_status() -> list[dict]:
    """Get CI status for all repos from Woodpecker CI using direct API (no CLI needed)."""
    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            repos_data = await fetch_woodpecker_api("/api/repos", client)
            if not repos_data:
                return []

            repos = []
            for repo in repos_data:
                full_name = repo.get("full_name", "")
                repo_id = repo.get("id")
                parts = full_name.split("/")
                if len(parts) != 2 or not repo_id:
                    continue
                owner, name = parts

                ci_status = None
                try:
                    pipelines = await fetch_woodpecker_api(f"/api/repos/{repo_id}/pipelines?page=1&perPage=1", client)
                    if pipelines and len(pipelines) > 0:
                        ci_status = pipelines[0].get("status", "unknown")
                except Exception as e:
                    print(f"Error getting pipeline status for {full_name}: {e}")

                repos.append({
                    "name": name,
                    "full_name": full_name,
                    "ci_status": ci_status,
                    "active": repo.get("is_active", False),
                })

            return repos
    except Exception as e:
        print(f"Error getting Woodpecker repo status: {e}")
        return []

async def fetch_all_slow_data() -> dict:
    """Run all the slow fetches and return a dict of results.

    Called from background_refresh(), never from API endpoints.
    """
    results = {}

    # Fetch runners (SSH + DB)
    try:
        watcher_runners = {}
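        # /var/www/health/runners.json (written by a watcher on the CI host)
        # is expected to be a list of {"name": "runner-N", "status": ...,
        # "task": ..., "repo": ...} objects; "task"/"repo" describe the job
        # the runner is currently executing, if any.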
        try:
            proc = await asyncio.create_subprocess_exec(
                "ssh", "-p", str(CI_SERVER_SSH_PORT),
                "-o", "ConnectTimeout=3", "-o", "BatchMode=yes",
                f"{CI_SERVER_USER}@{CI_SERVER_HOST}",
                "cat", "/var/www/health/runners.json",
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=5)
            for r in json.loads(stdout.decode()):
                num = r["name"].replace("runner-", "")
                watcher_runners[f"native-runner-{num}"] = r
        except Exception as e:
            print(f"Background: error fetching watcher runners: {e}")

        runners = query_runners_from_db()
        running_jobs = query_running_jobs_from_db()

        for runner in runners:
            name = runner["name"]
            if name in watcher_runners:
                wr = watcher_runners[name]
                runner["status"] = wr["status"]
                if wr["task"]:
                    runner_id = runner["id"]
                    if runner_id in running_jobs:
                        runner["current_job"] = running_jobs[runner_id]
                    else:
                        runner["current_job"] = {
                            "name": f"Task {wr['task']}",
                            "repo": wr["repo"],
                        }
                else:
                    runner["current_job"] = None
            elif runner["status"] == "online":
                runner["status"] = "idle"

        results["runners"] = {"runners": runners}
    except Exception as e:
        print(f"Background: runners fetch failed: {e}")

    # Fetch queue (DB)
    try:
        queue = query_queue_from_db()
        results["queue"] = {"queue": queue}
    except Exception as e:
        print(f"Background: queue fetch failed: {e}")

    # Fetch history (DB)
    try:
        history = query_history_from_db(limit=30)
        results["history_30"] = {"jobs": history}
    except Exception as e:
        print(f"Background: history fetch failed: {e}")

    # Fetch repos (DB + Woodpecker + Forgejo API) - slowest part
    try:
        repos_forgejo = query_repo_status_from_db()
        repos_woodpecker = await get_woodpecker_repo_status()
        woodpecker_by_name = {r["name"]: r for r in repos_woodpecker}

        for repo in repos_forgejo:
            repo_name = repo["name"]
            owner_name = repo.get("owner_name", "alpha-delta-network")
            repo["unmerged_branches"] = get_unmerged_branches_count(owner_name, repo_name)

            if repo_name in woodpecker_by_name:
                wp_repo = woodpecker_by_name[repo_name]
                repo["ci_status"] = wp_repo["ci_status"]
                repo["ci_source"] = "woodpecker"
            else:
                repo["ci_status"] = None
                repo["ci_source"] = "none"

        results["repos"] = {"repos": repos_forgejo}
    except Exception as e:
        print(f"Background: repos fetch failed: {e}")

    # Compute stats from already-fetched data
    try:
        runners_list = results.get("runners", {}).get("runners", [])
        queue_list = results.get("queue", {}).get("queue", [])
        history_list = results.get("history_30", {}).get("jobs", [])

        online_runners = sum(1 for r in runners_list if r["status"] in ("online", "running"))
        busy_runners = sum(1 for r in runners_list if r["status"] == "running")
        success_count = sum(1 for j in history_list if j["result"] == "success")
        failure_count = sum(1 for j in history_list if j["result"] == "failure")
        total_completed = len(history_list)

        results["stats"] = {
            "runners": {
                "total": len(runners_list),
                "online": online_runners,
                "busy": busy_runners,
            },
            "queue": {"length": len(queue_list)},
            "recent": {
                "total": total_completed,
                "success": success_count,
                "failure": failure_count,
                "success_rate": round(success_count / total_completed * 100, 1) if total_completed > 0 else 0,
            }
        }
    except Exception as e:
        print(f"Background: stats computation failed: {e}")

    # Fetch system metrics
    try:
        cores = psutil.cpu_count() or 1  # cpu_count() can return None
        load_1m = psutil.getloadavg()[0]
        # Approximate CPU% from the 1-minute load average; psutil.cpu_percent()
        # would require a blocking sampling interval.
        cpu_percent = min(100, int(load_1m / cores * 100))
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage("/")
        swap = psutil.swap_memory()

        local_metrics = {
            "name": "source",
            "cpu": {"percent": cpu_percent, "cores": cores},
            "memory": {
                "total": memory.total, "used": memory.used,
                "available": memory.available, "percent": memory.percent,
            },
            "disk": {
                "total": disk.total, "used": disk.used,
                "free": disk.free, "percent": disk.percent,
            },
            "swap": {
                "total": swap.total, "used": swap.used,
                "free": swap.free, "percent": swap.percent,
            },
            "memory_pressure": 0,
            "oom_events": 0,
        }

        ci_metrics = await fetch_remote_system_metrics()
        if ci_metrics:
            ci_metrics["name"] = "ci"

        results["system"] = {"local": local_metrics, "ci": ci_metrics}
    except Exception as e:
        print(f"Background: system fetch failed: {e}")

    # Fetch Woodpecker queue
    try:
        async with httpx.AsyncClient() as client:
            queue_data = await fetch_woodpecker_api("/api/queue/info", client)
            if queue_data:
                running = []
                for item in queue_data.get("running", []):
                    full_name = item.get("full_name", "")
                    repo_name = full_name.split("/")[-1] if "/" in full_name else (full_name or "unknown")
                    running.append({
                        "repo": repo_name,
                        "pipeline_id": item.get("id"),
                        "agent": item.get("agent", ""),
                    })

                pending = []
                for item in queue_data.get("pending", []):
                    full_name = item.get("full_name", "")
                    repo_name = full_name.split("/")[-1] if "/" in full_name else (full_name or "unknown")
                    pending.append({
                        "repo": repo_name,
                        "pipeline_id": item.get("id"),
                        "waiting_for": "agent",
                    })

                results["woodpecker_queue"] = {
                    "running": running,
                    "pending": pending,
                    "stats": {
                        "running_count": len(running),
                        "pending_count": len(pending),
                    },
                    "paused": queue_data.get("paused", False),
                }
    except Exception as e:
        print(f"Background: woodpecker queue fetch failed: {e}")

    # Fetch Woodpecker agents
    try:
        async with httpx.AsyncClient() as client:
            agents_data = await fetch_woodpecker_api("/api/agents", client)
            queue_data = await fetch_woodpecker_api("/api/queue/info", client)
            if agents_data:
                running_pipelines = {}
                if queue_data:
                    for item in queue_data.get("running", []):
                        agent_id = item.get("agent_id")
                        if agent_id:
                            full_name = item.get("full_name", "")
                            repo_name = full_name.split("/")[-1] if "/" in full_name else (full_name or "unknown")
                            running_pipelines[agent_id] = {
                                "repo": repo_name,
                                "pipeline_id": item.get("id"),
                                "step": item.get("step", item.get("name", "")),
                            }

                now = time.time()
                active_threshold = 5 * 60
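                # Agents that reported work within the last five minutes are
                # shown as "active" even when no pipeline is assigned to them.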
1208  
1209                  agents = []
1210                  for agent in agents_data:
1211                      if agent.get("name"):
1212                          agent_id = agent.get("id")
1213                          last_work = agent.get("last_work", 0)
1214                          time_since_work = now - last_work if last_work else float("inf")
1215  
1216                          if agent_id in running_pipelines:
1217                              status = "running"
1218                              current_pipeline = running_pipelines[agent_id]["repo"]
1219                              current_step = running_pipelines[agent_id].get("step", "")
1220                          elif time_since_work < active_threshold:
1221                              status = "active"
1222                              current_pipeline = None
1223                              current_step = None
1224                          else:
1225                              status = "idle"
1226                              current_pipeline = None
1227                              current_step = None
1228  
1229                          agents.append({
1230                              "id": agent_id,
1231                              "name": agent.get("name", "Unknown"),
1232                              "platform": agent.get("platform", ""),
1233                              "status": status,
1234                              "current_pipeline": current_pipeline,
1235                              "current_step": current_step,
1236                              "backend": agent.get("backend", ""),
1237                              "capacity": agent.get("capacity", -1),
1238                              "last_contact": agent.get("last_contact", 0),
1239                              "last_work": last_work,
1240                              "version": agent.get("version", ""),
1241                          })
1242  
1243                  results["woodpecker_agents"] = {"agents": agents}
1244      except Exception as e:
1245          print(f"Background: woodpecker agents fetch failed: {e}")
1246  
    # Fetch testnet node status
    try:
        testnet_nodes = await fetch_all_testnet_nodes()
        results["testnet_nodes"] = {
            "nodes": testnet_nodes,
            "updated_at": datetime.now(timezone.utc).isoformat(),
        }
    except Exception as e:
        print(f"Background: testnet nodes fetch failed: {e}")

    return results


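# A condensed restatement of the agent-status rule in fetch_all_slow_data(),
# kept as a standalone sketch for reference; it is illustrative and not
# called from the fetch path. Name and signature are ours, not Woodpecker's.
def example_agent_status(agent_id, running_ids, last_work, now, threshold=300):
    """Sketch: classify an agent as "running", "active", or "idle"."""
    if agent_id in running_ids:
        return "running"  # currently assigned a pipeline
    if last_work and (now - last_work) < threshold:
        return "active"  # worked within the threshold window
    return "idle"

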
# Track if a refresh is already running to prevent overlapping refreshes
_refresh_running = False


async def background_refresh():
    """Continuously refresh dashboard data in the background.

    Runs as an asyncio task. Loops forever: fetch all slow data, update the
    store, persist to disk, sleep BG_REFRESH_INTERVAL seconds. Never crashes.
    """
    global _refresh_running

    while True:
        if _refresh_running:
            print("Background refresh already running, skipping this cycle")
            await asyncio.sleep(BG_REFRESH_INTERVAL)
            continue

        _refresh_running = True
        start = time.time()
        print(f"Background refresh starting at {datetime.now(timezone.utc).isoformat()}")

        try:
            results = await fetch_all_slow_data()

            for key, value in results.items():
                store.set(key, value)

            store.set_last_updated()
            store.persist_all()

            elapsed = time.time() - start
            print(f"Background refresh complete in {elapsed:.1f}s, updated {len(results)} keys")
        except Exception as e:
            print(f"Background refresh failed: {e}")
        finally:
            _refresh_running = False

        await asyncio.sleep(BG_REFRESH_INTERVAL)


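# Sketch: the same single-flight guard expressed with asyncio.Lock instead of
# a module-level bool. Illustrative only; the flag above is what the code
# actually uses, and both approaches assume everything shares one event loop.
_example_refresh_lock = asyncio.Lock()


async def example_guarded_refresh(refresh_coro):
    """Run refresh_coro unless another refresh already holds the lock."""
    if _example_refresh_lock.locked():
        return False  # mirror the "skip this cycle" behaviour above
    async with _example_refresh_lock:
        await refresh_coro()
    return True

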
async def trigger_refresh_once():
    """Trigger a single immediate refresh. Used by the /dash/api/refresh endpoint."""
    global _refresh_running

    if _refresh_running:
        print("Refresh already in progress, skipping immediate trigger")
        return

    _refresh_running = True
    start = time.time()
    print(f"Manual refresh triggered at {datetime.now(timezone.utc).isoformat()}")

    try:
        results = await fetch_all_slow_data()
        for key, value in results.items():
            store.set(key, value)
        store.set_last_updated()
        store.persist_all()
        elapsed = time.time() - start
        print(f"Manual refresh complete in {elapsed:.1f}s")
    except Exception as e:
        print(f"Manual refresh failed: {e}")
    finally:
        _refresh_running = False


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan handler."""
    print("Starting CI Dashboard Backend")
    print(f"Forgejo URL: {FORGEJO_URL}")
    print(f"Woodpecker URL: {WOODPECKER_URL}")
    print(f"Database: postgres://{DB_HOST}:{DB_PORT}/{DB_NAME}")
    print(f"Cache file: {CACHE_FILE}")

    # Load existing cache from disk so the dashboard shows data immediately
    store.load_from_disk()

    # Start background refresh task
    bg_task = asyncio.create_task(background_refresh())
    print(f"Background refresh task started (interval: {BG_REFRESH_INTERVAL}s)")

    yield

    bg_task.cancel()
    try:
        await bg_task
    except asyncio.CancelledError:
        pass
    print("Shutting down CI Dashboard Backend")


app = FastAPI(
    title="CI Dashboard API",
    description="Backend aggregator for Forgejo CI Dashboard",
    version="1.0.0",
    lifespan=lifespan,
)

# Wide-open CORS: acceptable for an internal dashboard. Note that browsers
# reject Access-Control-Allow-Origin "*" on credentialed requests, so list
# explicit origins here if credentialed cross-origin calls are ever needed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.middleware("http")
async def no_cache_middleware(request: Request, call_next):
    """Mark API responses as uncacheable so the dashboard always polls fresh data."""
    response = await call_next(request)
    if request.url.path.startswith(("/api/", "/dash/api/")):
        response.headers["Cache-Control"] = "no-store, no-cache, must-revalidate"
        response.headers["Pragma"] = "no-cache"
    return response


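# A sketch of how the header behaviour could be verified with FastAPI's
# TestClient; defined but never called, so it costs nothing at import time.
def example_check_no_cache():
    from fastapi.testclient import TestClient

    client = TestClient(app)
    resp = client.get("/api/stats")
    assert resp.headers["Cache-Control"].startswith("no-store")

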
@app.get("/api/health/status")
async def get_health():
    """Get Forgejo health status (live fetch, fast)."""
    cached = cache.get("health")
    if cached:
        return cached

    try:
        async with httpx.AsyncClient() as client:
            data = await fetch_forgejo_api("/api/healthz", client)
            cache.set("health", data)
            return data
    except Exception as e:
        return {"status": "error", "message": str(e)}


@app.get("/api/runners")
async def get_runners():
    """Get list of runners with their status (served from store, instant)."""
    data = store.get("runners")
    if data is not None:
        return data
    return {"runners": []}


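# The store-backed endpoints in this section all repeat the same two-line
# pattern. A sketch of a shared helper (illustrative only; the explicit
# handlers are kept so each route keeps its own docstring):
def example_store_or(key, default):
    """Return the stored value for key, or default when nothing is stored."""
    data = store.get(key)
    return data if data is not None else default

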
@app.get("/api/queue")
async def get_queue():
    """Get jobs waiting in queue (served from store, instant)."""
    data = store.get("queue")
    if data is not None:
        return data
    return {"queue": []}


@app.get("/api/jobs/history")
async def get_history(limit: int = 30):
    """Get recent job history (served from store, instant)."""
    data = store.get("history_30")
    if data is not None:
        jobs = data.get("jobs", [])
        # the store keeps the 30 most recent jobs, so limit > 30 returns all 30
        return {"jobs": jobs[:limit]}
    return {"jobs": []}


@app.get("/api/repos")
async def get_repos():
    """Get repository CI status with Woodpecker CI + Radicle sync (served from store, instant)."""
    data = store.get("repos")
    if data is not None:
        return data
    return {"repos": []}


@app.get("/api/stats")
async def get_stats():
    """Get overall CI statistics (served from store, instant)."""
    data = store.get("stats")
    if data is not None:
        return data
    return {
        "runners": {"total": 0, "online": 0, "busy": 0},
        "queue": {"length": 0},
        "recent": {"total": 0, "success": 0, "failure": 0, "success_rate": 0},
    }


@app.get("/api/woodpecker/queue")
async def get_woodpecker_queue():
    """Get Woodpecker CI queue status (served from store, instant)."""
    data = store.get("woodpecker_queue")
    if data is not None:
        return data
    return {"running": [], "pending": [], "stats": {"running_count": 0, "pending_count": 0}, "paused": False}


@app.get("/api/woodpecker/agents")
async def get_woodpecker_agents():
    """Get Woodpecker CI agent status (served from store, instant)."""
    data = store.get("woodpecker_agents")
    if data is not None:
        return data
    return {"agents": []}


@app.get("/api/testnet/nodes")
async def get_testnet_nodes():
    """Get testnet node block heights and service status (served from store, instant)."""
    data = store.get("testnet_nodes")
    if data is not None:
        return data
    return {"nodes": [], "updated_at": None}


@app.post("/api/cache/clear")
async def clear_cache():
    """Clear the short-TTL cache (store data is unaffected)."""
    cache.clear()
    return {"status": "ok", "message": "Cache cleared"}


@app.get("/api/system")
async def get_system():
    """Get system resource usage (served from store, instant)."""
    data = store.get("system")
    if data is not None:
        return data
    return {"local": None, "ci": None}


@app.get("/dash/api/refresh")
async def trigger_refresh():
    """Trigger an immediate background data refresh.

    Returns immediately; the refresh runs asynchronously in the background.
    """
    if _refresh_running:
        return {"status": "already_running", "message": "A refresh is already in progress"}
    asyncio.create_task(trigger_refresh_once())
    return {"status": "triggered", "message": "Refresh started in background"}


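# Sketch of the intended client flow: trigger a refresh, then poll
# /dash/api/data_age until the age resets. The base URL is an assumption
# for illustration; nothing in this module calls this helper.
async def example_refresh_and_wait(base_url="http://127.0.0.1:8081"):
    """Sketch: kick off a refresh and wait until fresh data is visible."""
    async with httpx.AsyncClient(base_url=base_url) as client:
        await client.get("/dash/api/refresh")
        for _ in range(30):  # poll for up to ~60s
            age = (await client.get("/dash/api/data_age")).json()
            if age["age_seconds"] is not None and age["age_seconds"] < 5:
                return age
            await asyncio.sleep(2)
    return None

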
@app.get("/dash/api/data_age")
async def get_data_age():
    """Return when dashboard data was last refreshed."""
    last_updated = store.get_last_updated()
    age = store.get_data_age_seconds()

    if last_updated is None:
        return {
            "last_updated": None,
            "age_seconds": None,
            "message": "No data available yet",
        }

    return {
        "last_updated": datetime.fromtimestamp(last_updated, tz=timezone.utc).isoformat(),
        "age_seconds": round(age, 1) if age is not None else None,
        "refresh_interval": BG_REFRESH_INTERVAL,
    }


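# Example /dash/api/data_age response (values hypothetical):
#   {"last_updated": "2024-05-01T12:00:00+00:00",
#    "age_seconds": 12.3,
#    "refresh_interval": 60}

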
# Pipeline Monitor endpoints - real-time SSH fetches (not cached in store)
@app.get("/api/pipeline/metrics")
async def get_pipeline_metrics():
    """Get current pipeline metrics from CI server watcher."""
    try:
        proc = await asyncio.create_subprocess_exec(
            "ssh", "-o", "ConnectTimeout=3", "-o", "BatchMode=yes",
            f"{CI_SERVER_USER}@{CI_SERVER_HOST}",
            # one argument: the remote shell interprets the redirect and pipe
            "cat /tmp/ci-monitor-data/metrics.log 2>/dev/null | tail -1",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        try:
            stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=5)
        except asyncio.TimeoutError:
            # don't leave the ssh process running after a timeout
            proc.kill()
            await proc.wait()
            return {"error": "Timeout fetching pipeline metrics"}

        if not stdout.strip():
            return {"load": 0, "mem_mb": 0, "active_tasks": 0, "completed": 0}

        data = {}
        for match in re.finditer(r"(\w+)=([0-9.]+)", stdout.decode()):
            key, val = match.groups()
            data[key] = float(val) if "." in val else int(val)
        return data
    except Exception as e:
        return {"error": str(e)}


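# The parser above expects metrics.log lines of space-separated key=value
# pairs, e.g. (hypothetical sample):
#   load=1.42 mem_mb=2048 active_tasks=3 completed=117
# which re.finditer turns into {"load": 1.42, "mem_mb": 2048, ...}.

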
@app.get("/api/pipeline/events")
async def get_pipeline_events():
    """Get recent pipeline events from CI server watcher."""
    try:
        proc = await asyncio.create_subprocess_exec(
            "ssh", "-o", "ConnectTimeout=3", "-o", "BatchMode=yes",
            f"{CI_SERVER_USER}@{CI_SERVER_HOST}",
            "tail", "-100", "/tmp/ci-monitor-data/events.log",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        try:
            stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=5)
        except asyncio.TimeoutError:
            # don't leave the ssh process running after a timeout
            proc.kill()
            await proc.wait()
            return []

        events = []
        for line in stdout.decode().strip().split("\n"):
            if not line:
                continue
            match = re.match(r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})[^\[]*\[([^\]]+)\]\s*(.*)", line)
            if match:
                timestamp, event_type, rest = match.groups()
                events.append({
                    "time": timestamp[11:19],  # HH:MM:SS portion of the timestamp
                    "type": event_type,
                    "message": rest[:150],
                })
        return events[-50:]
    except Exception as e:
        print(f"pipeline events fetch failed: {e}")
        return []


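# The regex above expects watcher lines shaped like (hypothetical sample):
#   2024-05-01T12:34:56Z [job_started] repo=foo pipeline=42
# yielding {"time": "12:34:56", "type": "job_started", "message": "repo=..."}.

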
@app.get("/api/pipeline/runners")
async def get_pipeline_runners():
    """Get current runner status from CI server watcher."""
    try:
        proc = await asyncio.create_subprocess_exec(
            "ssh", "-o", "ConnectTimeout=3", "-o", "BatchMode=yes",
            f"{CI_SERVER_USER}@{CI_SERVER_HOST}",
            "cat", "/var/www/health/runners.json",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        try:
            stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=5)
        except asyncio.TimeoutError:
            # don't leave the ssh process running after a timeout
            proc.kill()
            await proc.wait()
            return []

        return json.loads(stdout.decode())
    except Exception as e:
        print(f"pipeline runners fetch failed: {e}")
        return []


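# Sketch: the three endpoints above share the same ssh-and-capture dance.
# A common helper could look like this (illustrative only; not wired in):
async def example_ssh_capture(*remote_cmd, timeout=5):
    """Run a command on the CI server over ssh and return decoded stdout."""
    proc = await asyncio.create_subprocess_exec(
        "ssh", "-o", "ConnectTimeout=3", "-o", "BatchMode=yes",
        f"{CI_SERVER_USER}@{CI_SERVER_HOST}", *remote_cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        proc.kill()
        await proc.wait()
        raise
    return stdout.decode()

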
@app.get("/api/runners/metrics")
async def get_runner_metrics():
    """Get detailed runner metrics: load average, CPU quota, throttling stats."""
    cached = cache.get("runner_metrics")
    if cached:
        return cached

    script = '''
    uptime | awk -F"load average: " '{print $2}' | awk '{print $1,$2,$3}' | tr -d ','
    echo "---"
    systemctl show forgejo-runners.slice | grep -E "CPUQuota|CPUUsageNSec"
    echo "---"
    cat /sys/fs/cgroup/forgejo.slice/forgejo-runners.slice/cpu.stat 2>/dev/null || echo "nr_throttled 0"
    echo "---"
    systemctl list-units 'forgejo-runner-*' --state=active | grep -c "forgejo-runner-"
    '''

    try:
        proc = await asyncio.create_subprocess_exec(
            "ssh", "-o", "ConnectTimeout=3", "-o", "BatchMode=yes",
            f"{CI_SERVER_USER}@{CI_SERVER_HOST}",
            # Pass the script as a single argument: ssh hands the whole string
            # to the remote shell, which runs it line by line. Splitting it
            # across a local "/bin/bash -c" argv would lose the quoting.
            script,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        try:
            stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=5)
        except asyncio.TimeoutError:
            # don't leave the ssh process running after a timeout
            proc.kill()
            await proc.wait()
            return {"error": "Timeout fetching runner metrics"}

        output = stdout.decode().strip().split("---")

        load_parts = output[0].strip().split()
        load_1m = float(load_parts[0]) if len(load_parts) > 0 else 0
        load_5m = float(load_parts[1]) if len(load_parts) > 1 else 0
        load_15m = float(load_parts[2]) if len(load_parts) > 2 else 0

        cpu_quota_line = [line for line in output[1].strip().split("\n") if "CPUQuota" in line]
        cpu_quota_pct = 100
        if cpu_quota_line:
            # matches whole-second quotas like "CPUQuotaPerSecUSec=4s" (== 400%);
            # fractional or unlimited quotas keep the 100% default
            quota_match = re.search(r'CPUQuotaPerSecUSec=(\d+)s', cpu_quota_line[0])
            if quota_match:
                cpu_quota_pct = int(quota_match.group(1)) * 100

        throttle_lines = output[2].strip().split("\n")
        nr_throttled = 0
        throttled_usec = 0
        for line in throttle_lines:
            if line.startswith("nr_throttled"):
                nr_throttled = int(line.split()[1])
            elif line.startswith("throttled_usec"):
                throttled_usec = int(line.split()[1])

        active_runners = int(output[3].strip())

        result = {
            "load_average": {
                "1m": load_1m,
                "5m": load_5m,
                "15m": load_15m,
            },
            "cpu_quota_percent": cpu_quota_pct,
            "throttling": {
                "nr_throttled": nr_throttled,
                "throttled_seconds": throttled_usec / 1_000_000,
            },
            "runners": {
                "active": active_runners,
                "total": 10,  # provisioned runner count on the CI server (hardcoded)
            },
        }

        cache.set("runner_metrics", result)
        return result

    except Exception as e:
        return {"error": str(e)}


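# Parsing assumptions for the script output above (hypothetical samples):
#   load section     -> "0.52 0.48 0.45"
#   systemctl show   -> "CPUQuotaPerSecUSec=4s"  (4s == 400%)
#   cgroup cpu.stat  -> "nr_throttled 7" / "throttled_usec 90000"
#   runner count     -> "10"

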
@app.get("/api/sccache/metrics")
async def get_sccache_metrics():
    """Get sccache compilation cache metrics from CI server."""
    cached = cache.get("sccache_metrics")
    if cached:
        return cached

    try:
        proc = await asyncio.create_subprocess_exec(
            "ssh",
            "-o", "ConnectTimeout=5",
            "-o", "BatchMode=yes",
            f"{CI_SERVER_USER}@{CI_SERVER_HOST}",
            "cat", "/var/www/health/sccache.json",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )

        try:
            stdout, stderr = await asyncio.wait_for(
                proc.communicate(),
                timeout=10,
            )
        except asyncio.TimeoutError:
            proc.kill()
            await proc.wait()
            return {"error": "Timeout fetching sccache metrics"}

        if proc.returncode != 0:
            return {"error": f"Failed to fetch sccache metrics: {stderr.decode()}"}

        result = json.loads(stdout.decode().strip())

        cache.set("sccache_metrics", result)
        return result

    except Exception as e:
        return {"error": str(e)}


# Dev-server entry point. Kept at the very bottom of the module: uvicorn.run()
# blocks, so any routes defined after it would never be registered when this
# file is executed directly.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="127.0.0.1", port=8081)