backend/main.py
  1  #!/usr/bin/env python3
  2  """
  3  Forgejo CI Dashboard Backend
  4  Aggregates data from Forgejo API and database for the dashboard.
  5  """
  6  
  7  import asyncio
  8  import os
  9  import re
 10  import sqlite3
 11  import subprocess
 12  import time
 13  from contextlib import asynccontextmanager
 14  from datetime import datetime, timezone
 15  from pathlib import Path
 16  from typing import Any
 17  
 18  import httpx
 19  import psutil
 20  from dotenv import load_dotenv
 21  from fastapi import FastAPI, HTTPException
 22  from fastapi.middleware.cors import CORSMiddleware
 23  from fastapi.responses import JSONResponse
 24  
 25  load_dotenv()
 26  
 27  # Configuration
 28  FORGEJO_URL = os.getenv("FORGEJO_URL", "http://localhost:3000")
 29  FORGEJO_TOKEN = os.getenv("FORGEJO_TOKEN", "")
 30  FORGEJO_DB_PATH = os.getenv("FORGEJO_DB_PATH", "/var/lib/forgejo/forgejo.db")
 31  FORGEJO_REPOS_PATH = os.getenv("FORGEJO_REPOS_PATH", "/var/lib/forgejo/repositories")
 32  RADICLE_BIN = os.getenv("RADICLE_BIN", "/home/devops/.radicle/bin/rad")
 33  CACHE_TTL = int(os.getenv("CACHE_TTL", "10"))  # seconds
 34  
 35  # CI stages to track (customize as needed)
 36  CI_STAGES = ["build", "check", "clippy", "fmt", "test", "node"]
 37  
 38  # Remote CI server configuration
 39  CI_SERVER_HOST = os.getenv("CI_SERVER_HOST", "10.106.0.3")
 40  CI_SERVER_USER = os.getenv("CI_SERVER_USER", "devops")
 41  CI_SERVER_SSH_TIMEOUT = int(os.getenv("CI_SERVER_SSH_TIMEOUT", "5"))
 42  
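
# Illustrative .env for this backend (values mirror the defaults above; adjust for your
# deployment). Only FORGEJO_TOKEN has no usable default.
#   FORGEJO_URL=http://localhost:3000
#   FORGEJO_TOKEN=<api token with read access>
#   FORGEJO_DB_PATH=/var/lib/forgejo/forgejo.db
#   FORGEJO_REPOS_PATH=/var/lib/forgejo/repositories
#   RADICLE_BIN=/home/devops/.radicle/bin/rad
#   CACHE_TTL=10
#   CI_SERVER_HOST=10.106.0.3
#   CI_SERVER_USER=devops
#   CI_SERVER_SSH_TIMEOUT=5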
 43  
 44  async def fetch_remote_system_metrics() -> dict | None:
 45      """Fetch system metrics from remote CI server via SSH using Linux commands.
 46  
 47      Uses /proc/stat aggregate line for CPU (average across ALL cores),
 48      free for memory, and df for disk usage.
 49      """
 50      # Shell script to gather metrics and output JSON
 51      # CPU: Sample /proc/stat twice with 0.1s interval to calculate usage percentage
 52      # The first line of /proc/stat (starting with "cpu ") is aggregate across all CPUs
 53      script = '''
 54  read _ u1 n1 s1 i1 w1 x1 y1 _ < /proc/stat
 55  sleep 0.1
 56  read _ u2 n2 s2 i2 w2 x2 y2 _ < /proc/stat
 57  cores=$(nproc)
 58  total1=$((u1+n1+s1+i1+w1+x1+y1))
 59  total2=$((u2+n2+s2+i2+w2+x2+y2))
 60  idle_delta=$((i2-i1))
 61  total_delta=$((total2-total1))
 62  if [ $total_delta -gt 0 ]; then
 63      cpu_pct=$((100 * (total_delta - idle_delta) / total_delta))
 64  else
 65      cpu_pct=0
 66  fi
 67  read _ mem_total mem_used mem_free _ _ mem_avail _ < <(free -b | grep Mem)
 68  read _ disk_total disk_used disk_avail _ _ < <(df -B1 / | tail -1)
 69  disk_pct=$((100 * disk_used / disk_total))
 70  mem_pct=$((100 * mem_used / mem_total))
 71  echo "{\\"cpu\\":{\\"percent\\":$cpu_pct,\\"cores\\":$cores},\\"memory\\":{\\"total\\":$mem_total,\\"used\\":$mem_used,\\"available\\":$mem_avail,\\"percent\\":$mem_pct},\\"disk\\":{\\"total\\":$disk_total,\\"used\\":$disk_used,\\"free\\":$disk_avail,\\"percent\\":$disk_pct}}"
 72  '''
 73  
 74      try:
 75          proc = await asyncio.create_subprocess_exec(
 76              "ssh",
 77              "-o", "ConnectTimeout=5",
 78              "-o", "BatchMode=yes",
 79              "-o", "StrictHostKeyChecking=accept-new",
 80              f"{CI_SERVER_USER}@{CI_SERVER_HOST}",
 81              "bash", "-c", script,
 82              stdout=asyncio.subprocess.PIPE,
 83              stderr=asyncio.subprocess.PIPE,
 84          )
 85  
 86          try:
 87              stdout, stderr = await asyncio.wait_for(
 88                  proc.communicate(),
 89                  timeout=CI_SERVER_SSH_TIMEOUT
 90              )
 91          except asyncio.TimeoutError:
 92              proc.kill()
 93              await proc.wait()
 94              print(f"SSH timeout fetching metrics from {CI_SERVER_HOST}")
 95              return None
 96  
 97          if proc.returncode != 0:
 98              print(f"SSH error fetching metrics from {CI_SERVER_HOST}: {stderr.decode()}")
 99              return None
100  
101          import json
102          return json.loads(stdout.decode().strip())
103  
104      except Exception as e:
105          print(f"Error fetching remote metrics: {e}")
106          return None
107  
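# Illustrative output of the script above (a single JSON line; the numbers are made up,
# only the key layout matters to callers):
#   {"cpu": {"percent": 12, "cores": 8},
#    "memory": {"total": 16722044928, "used": 5130680320, "available": 10887741440, "percent": 30},
#    "disk": {"total": 105089261568, "used": 23622320128, "free": 76057407488, "percent": 22}}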
108  
109  class Cache:
110      """Simple in-memory cache with TTL."""
111  
112      def __init__(self, ttl: int = 10):
113          self.ttl = ttl
114          self._cache: dict[str, tuple[float, Any]] = {}
115  
116      def get(self, key: str) -> Any | None:
117          if key in self._cache:
118              timestamp, value = self._cache[key]
119              if time.time() - timestamp < self.ttl:
120                  return value
121              del self._cache[key]
122          return None
123  
124      def set(self, key: str, value: Any) -> None:
125          self._cache[key] = (time.time(), value)
126  
127      def clear(self) -> None:
128          self._cache.clear()
129  
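# Illustrative usage of Cache (sketch only, never called): entries expire after `ttl`
# seconds and are evicted lazily on the next get().
def _cache_usage_example() -> None:
    c = Cache(ttl=2)
    assert c.get("stats") is None                       # nothing cached yet
    c.set("stats", {"queue": {"length": 0}})
    assert c.get("stats") == {"queue": {"length": 0}}   # hit within TTL
    time.sleep(2.1)
    assert c.get("stats") is None                       # expired entry evicted on read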
130  
131  cache = Cache(ttl=CACHE_TTL)
132  
133  
134  def load_radicle_config() -> dict[str, str | None]:
135      """Load repo-to-RID mappings from config file.
136  
137      Returns dict mapping repo name (lowercase) to RID or None.
138      """
139      config_path = Path(__file__).parent / "radicle-repos.json"
140      try:
141          if config_path.exists():
142              import json
143              with open(config_path) as f:
144                  data = json.load(f)
145                  return {k.lower(): v for k, v in data.get("repos", {}).items()}
146      except Exception as e:
147          print(f"Error loading radicle config: {e}")
148      return {}
149  
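# Illustrative radicle-repos.json next to this file (repo names and the RID are
# placeholders). A null value marks a repo that has no Radicle origin.
#   {
#     "repos": {
#       "alphavm": "rad:zExampleRepositoryIdPlaceholder",
#       "legacy-tool": null
#     }
#   }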
150  
151  def get_radicle_head_for_rid(rid: str) -> str | None:
152      """Get the HEAD commit for a radicle repo by RID.
153  
154      Uses rad inspect to get repo details.
155      """
156      if not rid:
157          return None
158      try:
159          result = subprocess.run(
160              [RADICLE_BIN, "inspect", "--refs", rid],
161              capture_output=True,
162              text=True,
163              timeout=10,
164              env={**os.environ, "RAD_HOME": "/home/devops/.radicle"}
165          )
166          if result.returncode != 0:
167              return None
168  
169          # Parse output to find HEAD or main/master ref
170          # Look for lines like: refs/heads/main -> abc1234
171          for line in result.stdout.split("\n"):
172              if "refs/heads/main" in line or "refs/heads/master" in line:
173                  parts = line.split()
174                  for part in parts:
175                      # Find the commit hash (7+ hex chars)
176                      if len(part) >= 7 and all(c in "0123456789abcdef" for c in part[:7]):
177                          return part[:7]
178          return None
179      except Exception as e:
180          print(f"Error getting radicle HEAD for {rid}: {e}")
181          return None
182  
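# Illustrative parse, using the ref line format mentioned in the comment above:
#   line  = "refs/heads/main -> abc1234"
#   parts = line.split()   # ["refs/heads/main", "->", "abc1234"]
# "refs/heads/main" fails the hex check, "->" is too short, so "abc1234" is returned.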
183  
184  def get_radicle_repos() -> dict[str, str | None]:
185      """Get radicle repos with their HEAD commits from CI server.
186  
187      Returns dict mapping repo name (lowercase) to HEAD commit hash or None.
188      Queries CI server's Radicle storage directly using the CI runner's namespace
189      to get the actual synced HEAD commits (not the canonical branch).
190      """
191      try:
192          # Get CI runner's node ID and query git refs from Radicle storage
193          # The CI runner's namespace contains the actually synced commits
194          radicle_config = load_radicle_config()
195          if not radicle_config:
196              return {}
197  
198          # Build a script that:
199          # 1. Gets the CI runner's node ID (DID)
200          # 2. For each RID, reads the HEAD from the runner's namespace
201          rids = {name: rid for name, rid in radicle_config.items() if rid}
202          if not rids:
203              return {}
204  
205          # Create script to extract heads from CI runner's namespace
206          rid_list = " ".join(f"{name}:{rid.replace('rad:', '')}" for name, rid in rids.items())
207          script = f'''
208  export PATH=/home/devops/.radicle/bin:$PATH
209  NODE_ID=$(rad self 2>/dev/null | grep "^DID" | awk '{{print $2}}' | sed 's/did:key://')
210  if [ -z "$NODE_ID" ]; then
211      echo "ERROR: Could not get node ID" >&2
212      exit 1
213  fi
214  for entry in {rid_list}; do
215      name="${{entry%%:*}}"
216      rid="${{entry#*:}}"
217      ref_path="$HOME/.radicle/storage/$rid/refs/namespaces/$NODE_ID/refs/heads/main"
218      if [ -f "$ref_path" ]; then
219          head=$(cat "$ref_path" | head -c 7)
220          echo "$name:$head"
221      fi
222  done
223  '''
224  
225          result = subprocess.run(
226              [
227                  "ssh", "-o", "ConnectTimeout=5", "-o", "BatchMode=yes",
228                  f"{CI_SERVER_USER}@{CI_SERVER_HOST}",
229                  "bash", "-c", script
230              ],
231              capture_output=True,
232              text=True,
233              timeout=15,
234          )
235  
236          repos = {}
237          if result.returncode == 0:
238              for line in result.stdout.strip().split("\n"):
239                  if ":" in line:
240                      parts = line.split(":", 1)
241                      if len(parts) == 2:
242                          name, head = parts
243                          if head and len(head) >= 7:
244                              repos[name.lower()] = head
245  
246          return repos
247      except Exception as e:
248          print(f"Error getting radicle repos: {e}")
249          return {}
250  
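# Illustrative round trip (repo names and hashes are made up): the remote script prints
# one "name:head" line per seeded repo, which the loop above folds into a dict.
#   stdout:  "alphavm:1a2b3c4\ndashboard:9f8e7d6\n"
#   result:  {"alphavm": "1a2b3c4", "dashboard": "9f8e7d6"}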
251  
252  def get_forgejo_repo_head(owner: str, repo: str) -> str | None:
253      """Get the HEAD commit hash for a Forgejo repository."""
254      try:
255          repo_path = Path(FORGEJO_REPOS_PATH) / owner / f"{repo}.git"
256          if not repo_path.exists():
257              return None
258  
259          def read_packed_ref(ref_name: str) -> str | None:
260              """Read a ref from packed-refs file if it exists."""
261              packed_refs_path = repo_path / "packed-refs"
262              if not packed_refs_path.exists():
263                  return None
264              try:
265                  for line in packed_refs_path.read_text().splitlines():
266                      line = line.strip()
267                      # Skip comments and empty lines
268                      if not line or line.startswith("#") or line.startswith("^"):
269                          continue
270                      # Format: <sha> <ref>
271                      parts = line.split()
272                      if len(parts) >= 2 and parts[1] == ref_name:
273                          return parts[0][:7]
274              except Exception:
275                  pass
276              return None
277  
278          # Try refs/heads/main first, then master
279          for branch in ["main", "master"]:
280              ref_path = repo_path / "refs" / "heads" / branch
281              if ref_path.exists():
282                  return ref_path.read_text().strip()[:7]
283              # Check packed-refs if loose ref doesn't exist
284              packed_head = read_packed_ref(f"refs/heads/{branch}")
285              if packed_head:
286                  return packed_head
287  
288          # Fallback to HEAD
289          head_path = repo_path / "HEAD"
290          if head_path.exists():
291              head_content = head_path.read_text().strip()
292              if head_content.startswith("ref:"):
293                  ref = head_content.split("ref:")[1].strip()
294                  ref_path = repo_path / ref
295                  if ref_path.exists():
296                      return ref_path.read_text().strip()[:7]
297                  # Check packed-refs for the symbolic ref target
298                  packed_head = read_packed_ref(ref)
299                  if packed_head:
300                      return packed_head
301              else:
302                  return head_content[:7]
303          return None
304      except Exception as e:
305          print(f"Error getting Forgejo repo HEAD for {owner}/{repo}: {e}")
306          return None
307  
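# Illustrative packed-refs content (hashes are made up). Lines starting with "#" are
# comments and "^" lines are peeled tags; both are skipped above. For this sample,
# read_packed_ref("refs/heads/main") would return "0c1d2e3".
#   # pack-refs with: peeled fully-peeled sorted
#   0c1d2e3f4a5b6c7d8e9f0a1b2c3d4e5f6a7b8c9d refs/heads/main
#   1234567890abcdef1234567890abcdef12345678 refs/tags/v1.0.0
#   ^abcdef1234567890abcdef1234567890abcdef12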
308  
309  async def fetch_forgejo_api(path: str, client: httpx.AsyncClient) -> Any:
310      """Fetch data from Forgejo API."""
311      headers = {}
312      if FORGEJO_TOKEN:
313          headers["Authorization"] = f"token {FORGEJO_TOKEN}"
314  
315      url = f"{FORGEJO_URL}{path}"
316      response = await client.get(url, headers=headers, timeout=30.0)
317      response.raise_for_status()
318      return response.json()
319  
320  
321  def query_runners_from_db() -> list[dict]:
322      """Query runner information directly from Forgejo SQLite database."""
323      if not Path(FORGEJO_DB_PATH).exists():
324          return []
325  
326      try:
327          conn = sqlite3.connect(f"file:{FORGEJO_DB_PATH}?mode=ro", uri=True)
328          conn.row_factory = sqlite3.Row
329          cursor = conn.cursor()
330  
331          cursor.execute("""
332              SELECT
333                  id, name, uuid, version, agent_labels,
334                  last_online, last_active, owner_id, repo_id
335              FROM action_runner
336              WHERE deleted IS NULL OR deleted = 0
337              ORDER BY last_online DESC
338          """)
339  
340          runners = []
341          now = time.time()
342          for row in cursor.fetchall():
343              last_online = row["last_online"] or 0
344              last_active = row["last_active"] or 0
345  
346              # Consider runner "online" if seen in last 60 seconds
347              is_online = (now - last_online) < 60
348  
349              # Parse labels JSON
350              labels = []
351              if row["agent_labels"]:
352                  import json
353                  try:
354                      labels = json.loads(row["agent_labels"])
355                  except json.JSONDecodeError:
356                      labels = []
357  
358              runners.append({
359                  "id": row["id"],
360                  "name": row["name"],
361                  "uuid": row["uuid"],
362                  "version": row["version"],
363                  "labels": labels,
364                  "status": "online" if is_online else "offline",
365                  "last_online": datetime.fromtimestamp(last_online, tz=timezone.utc).isoformat() if last_online else None,
366                  "last_active": datetime.fromtimestamp(last_active, tz=timezone.utc).isoformat() if last_active else None,
367                  "current_job": None,  # Will be populated from jobs query
368              })
369  
370          conn.close()
371          return runners
372      except Exception as e:
373          print(f"Error querying runners from DB: {e}")
374          return []
375  
376  
377  def query_running_jobs_from_db() -> dict[int, dict]:
378      """Query currently running jobs to associate with runners."""
379      if not Path(FORGEJO_DB_PATH).exists():
380          return {}
381  
382      try:
383          conn = sqlite3.connect(f"file:{FORGEJO_DB_PATH}?mode=ro", uri=True)
384          conn.row_factory = sqlite3.Row
385          cursor = conn.cursor()
386  
387          now = int(time.time())
388          # Only consider tasks started in the last 2 hours as "running"
389          # Older tasks with status=2 are likely stuck/orphaned
390          max_age = now - (2 * 3600)
391  
392          # Query running tasks (status 2 = running in Forgejo)
393          # Get only the most recent task per runner
394          cursor.execute("""
395              SELECT
396                  t.id, t.job_id, t.runner_id, t.status, t.started,
397                  j.name as job_name, j.run_id,
398                  r.title as run_title, r.repo_id
399              FROM action_task t
400              LEFT JOIN action_run_job j ON t.job_id = j.id
401              LEFT JOIN action_run r ON j.run_id = r.id
402              WHERE t.status = 2 AND t.started > ?
403              ORDER BY t.started DESC
404          """, (max_age,))
405  
406          jobs_by_runner = {}
407          for row in cursor.fetchall():
408              runner_id = row["runner_id"]
409              # Only keep the most recent job per runner (first due to ORDER BY DESC)
410              if runner_id and runner_id not in jobs_by_runner:
411                  jobs_by_runner[runner_id] = {
412                      "id": row["id"],
413                      "name": row["job_name"] or "Unknown",
414                      "run_title": row["run_title"],
415                      "started": datetime.fromtimestamp(row["started"], tz=timezone.utc).isoformat() if row["started"] else None,
416                  }
417  
418          conn.close()
419          return jobs_by_runner
420      except Exception as e:
421          print(f"Error querying running jobs from DB: {e}")
422          return {}
423  
424  
425  def query_queue_from_db() -> list[dict]:
426      """Query active workflow runs and return repo-level progress summary.
427  
428      Returns a list of repos with active runs, showing completed/total jobs.
429      Format: {"repo": "alphavm", "done": 3, "total": 8, "status": "running"}
430      """
431      if not Path(FORGEJO_DB_PATH).exists():
432          return []
433  
434      try:
435          conn = sqlite3.connect(f"file:{FORGEJO_DB_PATH}?mode=ro", uri=True)
436          conn.row_factory = sqlite3.Row
437          cursor = conn.cursor()
438  
439          now = int(time.time())
440          # Only consider runs updated in the last 30 minutes
441          max_age = now - (30 * 60)
442  
443          # Get active workflow runs (status 1=pending, 2=running)
444          cursor.execute("""
445              SELECT
446                  r.id as run_id, r.status as run_status, r.updated,
447                  repo.name as repo_name
448              FROM action_run r
449              JOIN repository repo ON r.repo_id = repo.id
450              WHERE r.status IN (1, 2) AND r.updated > ?
451              ORDER BY r.updated DESC
452          """, (max_age,))
453  
454          runs = cursor.fetchall()
455          if not runs:
456              conn.close()
457              return []
458  
459          # For each active run, count jobs by status
460          # Job status: 1=pending, 2=running, 3=success, 4=failure, 5=cancelled, 6=skipped
461          queue_summary = []
462          seen_repos = set()
463  
464          for run in runs:
465              repo_name = run["repo_name"]
466              # Only show each repo once (most recent run)
467              if repo_name in seen_repos:
468                  continue
469              seen_repos.add(repo_name)
470  
471              cursor.execute("""
472                  SELECT status, COUNT(*) as count
473                  FROM action_run_job
474                  WHERE run_id = ?
475                  GROUP BY status
476              """, (run["run_id"],))
477  
478              status_counts = {row["status"]: row["count"] for row in cursor.fetchall()}
479  
480              # Calculate done (success, failure, cancelled, skipped) vs total
481              done = sum(status_counts.get(s, 0) for s in [3, 4, 5, 6])
482              total = sum(status_counts.values())
483              running = status_counts.get(2, 0)
484              pending = status_counts.get(1, 0) + status_counts.get(0, 0)
485  
486              # Determine overall status
487              if running > 0:
488                  status = "running"
489              elif pending > 0:
490                  status = "pending"
491              else:
492                  status = "done"
493  
494              queue_summary.append({
495                  "repo": repo_name,
496                  "done": done,
497                  "total": total,
498                  "running": running,
499                  "status": status,
500              })
501  
502          conn.close()
503          return queue_summary
504      except Exception as e:
505          print(f"Error querying queue from DB: {e}")
506          return []
507  
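# Sketch of the per-run arithmetic above (never called; the status counts are made up).
# Job status codes follow the comment in query_queue_from_db: 3-6 count as done.
def _queue_summary_example() -> dict:
    status_counts = {1: 4, 2: 1, 3: 3}                          # 4 pending, 1 running, 3 finished
    done = sum(status_counts.get(s, 0) for s in [3, 4, 5, 6])   # -> 3
    total = sum(status_counts.values())                         # -> 8
    running = status_counts.get(2, 0)                           # -> 1
    return {"repo": "alphavm", "done": done, "total": total, "running": running, "status": "running"}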
508  
509  def query_history_from_db(limit: int = 30) -> list[dict]:
 510      """Query recent job history from the database.
 511  
 512      Returns jobs in two groups:
 513      - Jobs still running first, by start time (newest first)
 514      - Completed jobs second, by completion time (newest first)
515      """
516      if not Path(FORGEJO_DB_PATH).exists():
517          return []
518  
519      try:
520          conn = sqlite3.connect(f"file:{FORGEJO_DB_PATH}?mode=ro", uri=True)
521          conn.row_factory = sqlite3.Row
522          cursor = conn.cursor()
523  
524          status_map = {
525              0: "unknown", 1: "queued", 2: "running",
526              3: "success", 4: "failure", 5: "cancelled", 6: "skipped"
527          }
528  
529          history = []
530  
531          # First: Query actually running jobs, sorted by start time newest first
532          # Status: 2=running, but only if stopped is 0/NULL (not yet completed)
533          # Some jobs have status=2 but already have stopped times (stale status)
534          cursor.execute("""
535              SELECT
536                  j.id, j.name, j.status, j.started, j.stopped, j.updated,
537                  r.title, r.ref as branch, r.event,
538                  repo.name as repo_name, repo.owner_name
539              FROM action_run_job j
540              JOIN action_run r ON j.run_id = r.id
541              JOIN repository repo ON r.repo_id = repo.id
542              WHERE j.status = 2 AND (j.stopped IS NULL OR j.stopped = 0)
543              ORDER BY CASE WHEN j.started > 0 THEN j.started ELSE j.updated END DESC
544              LIMIT ?
545          """, (limit,))
546  
547          for row in cursor.fetchall():
548              started = row["started"] or 0
549              branch = row["branch"] or ""
550              branch = branch.replace("refs/heads/", "").replace("refs/tags/", "")
551  
552              history.append({
553                  "id": row["id"],
554                  "job": row["name"],
555                  "repo": f"{row['owner_name']}/{row['repo_name']}",
556                  "branch": branch,
557                  "result": status_map.get(row["status"], "unknown"),
558                  "duration": 0,  # Still running, no duration yet
559                  "started_at": datetime.fromtimestamp(started, tz=timezone.utc).isoformat() if started else None,
560                  "finished_at": None,
561              })
562  
563          # Second: Query completed jobs, sorted by completion time newest first
564          # Include jobs with stopped time > 0 (actually finished) OR status indicates completion
565          # This catches jobs with stale status=2 that actually finished
566          remaining_limit = limit - len(history)
567          if remaining_limit > 0:
568              cursor.execute("""
569                  SELECT
570                      j.id, j.name, j.status, j.started, j.stopped, j.updated,
571                      r.title, r.ref as branch, r.event,
572                      repo.name as repo_name, repo.owner_name
573                  FROM action_run_job j
574                  JOIN action_run r ON j.run_id = r.id
575                  JOIN repository repo ON r.repo_id = repo.id
576                  WHERE j.status IN (3, 4, 5, 6) OR (j.stopped > 0 AND j.started > 0)
577                  ORDER BY CASE WHEN j.stopped > 0 THEN j.stopped ELSE j.updated END DESC
578                  LIMIT ?
579              """, (remaining_limit,))
580  
581              for row in cursor.fetchall():
582                  started = row["started"] or 0
583                  stopped = row["stopped"] if row["stopped"] else row["updated"]
584                  duration = stopped - started if stopped and started else 0
585  
586                  branch = row["branch"] or ""
587                  branch = branch.replace("refs/heads/", "").replace("refs/tags/", "")
588  
589                  # Determine result: if job has stopped time but status is still running/queued,
590                  # treat as success (stale status that wasn't updated)
591                  status = row["status"]
592                  if status in (1, 2) and stopped > 0:
593                      result = "success"
594                  else:
595                      result = status_map.get(status, "unknown")
596  
597                  history.append({
598                      "id": row["id"],
599                      "job": row["name"],
600                      "repo": f"{row['owner_name']}/{row['repo_name']}",
601                      "branch": branch,
602                      "result": result,
603                      "duration": duration,
604                      "started_at": datetime.fromtimestamp(started, tz=timezone.utc).isoformat() if started else None,
605                      "finished_at": datetime.fromtimestamp(stopped, tz=timezone.utc).isoformat() if stopped else None,
606                  })
607  
608          conn.close()
609          return history
610      except Exception as e:
611          print(f"Error querying history from DB: {e}")
612          return []
613  
614  
615  def query_repo_status_from_db() -> list[dict]:
616      """Query all repositories with their radicle sync status.
617  
618      Returns repos with sync_status:
619      - "synced" (green): Forgejo HEAD == Radicle HEAD
620      - "unsynced" (yellow): Forgejo HEAD != Radicle HEAD
621      - "no-radicle" (red): No radicle origin for this repo
622      """
623      if not Path(FORGEJO_DB_PATH).exists():
624          return []
625  
626      try:
627          conn = sqlite3.connect(f"file:{FORGEJO_DB_PATH}?mode=ro", uri=True)
628          conn.row_factory = sqlite3.Row
629          cursor = conn.cursor()
630  
 631          # Get radicle HEAD commits from the CI runner's Radicle storage (seeded repos)
 632          radicle_repos = get_radicle_repos()
 633  
 634          # Load RID config for repos not present in that storage
 635          radicle_config = load_radicle_config()
636  
637          # Get all repos (not just those with action runs)
638          cursor.execute("""
639              SELECT DISTINCT
640                  repo.id, repo.name, repo.owner_name, repo.default_branch
641              FROM repository repo
642              ORDER BY repo.owner_name, repo.name
643          """)
644  
645          repos = []
646  
647          for repo_row in cursor.fetchall():
648              repo_name = repo_row["name"]
649              owner_name = repo_row["owner_name"]
650              repo_name_lower = repo_name.lower()
651  
652              # Get Forgejo HEAD commit
653              forgejo_head = get_forgejo_repo_head(owner_name, repo_name)
654  
 655              # Check radicle - first from the runner's storage, then from config
656              radicle_head = radicle_repos.get(repo_name_lower)
657              rid = radicle_config.get(repo_name_lower)
658  
 659              # If we have a RID in config but no HEAD from the runner's storage, the repo
 660              # has a radicle origin but its HEAD has not been synced there
 661              if radicle_head is None and rid:
 662                  # Radicle origin exists but HEAD unknown (not in the runner's storage)
663                  sync_status = "unsynced"
664                  radicle_head = "?"
665              elif radicle_head is None and rid is None:
666                  sync_status = "no-radicle"
667              elif forgejo_head and radicle_head and forgejo_head == radicle_head:
668                  sync_status = "synced"
669              else:
670                  sync_status = "unsynced"
671  
672              repos.append({
673                  "name": repo_name,
674                  "sync_status": sync_status,
675                  "forgejo_head": forgejo_head,
676                  "radicle_head": radicle_head,
677                  "rid": rid,
678              })
679  
680          conn.close()
681          return repos
682      except Exception as e:
683          print(f"Error querying repo status from DB: {e}")
684          return []
685  
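# Summary of how sync_status is derived above, per repo:
#   RID in config?   HEAD in runner storage?   Forgejo HEAD == Radicle HEAD?   sync_status
#   no               no                        -                               "no-radicle"
#   yes              no                        -                               "unsynced" (radicle_head shown as "?")
#   yes              yes                       yes                             "synced"
#   yes              yes                       no (or Forgejo HEAD missing)    "unsynced"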
686  
687  @asynccontextmanager
688  async def lifespan(app: FastAPI):
689      """Application lifespan handler."""
 690      print("Starting CI Dashboard Backend")
691      print(f"Forgejo URL: {FORGEJO_URL}")
692      print(f"Database: {FORGEJO_DB_PATH}")
693      yield
694      print("Shutting down CI Dashboard Backend")
695  
696  
697  app = FastAPI(
698      title="CI Dashboard API",
699      description="Backend aggregator for Forgejo CI Dashboard",
700      version="1.0.0",
701      lifespan=lifespan,
702  )
703  
704  # CORS middleware
705  app.add_middleware(
706      CORSMiddleware,
707      allow_origins=["*"],
708      allow_credentials=True,
709      allow_methods=["*"],
710      allow_headers=["*"],
711  )
712  
713  
714  @app.get("/api/health/status")
715  async def get_health():
716      """Get Forgejo health status."""
717      cached = cache.get("health")
718      if cached:
719          return cached
720  
721      try:
722          async with httpx.AsyncClient() as client:
723              data = await fetch_forgejo_api("/api/healthz", client)
724              cache.set("health", data)
725              return data
726      except Exception as e:
727          return {"status": "error", "message": str(e)}
728  
729  
730  @app.get("/api/runners")
731  async def get_runners():
732      """Get list of runners with their status from CI watcher (real-time)."""
733      cached = cache.get("runners")
734      if cached:
735          return cached
736  
737      # Get real-time status from CI watcher
738      watcher_runners = {}
739      try:
740          proc = await asyncio.create_subprocess_exec(
741              "ssh", "-o", "ConnectTimeout=3", "-o", "BatchMode=yes",
742              f"{CI_SERVER_USER}@{CI_SERVER_HOST}",
743              "cat", "/var/www/health/runners.json",
744              stdout=asyncio.subprocess.PIPE,
745              stderr=asyncio.subprocess.PIPE,
746          )
747          stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=5)
748          import json
749          for r in json.loads(stdout.decode()):
750              # Map runner-N to native-runner-N
751              num = r["name"].replace("runner-", "")
752              watcher_runners[f"native-runner-{num}"] = r
753      except Exception as e:
754          print(f"Error fetching watcher runners: {e}")
755  
756      # Get runner metadata from Forgejo DB
757      runners = query_runners_from_db()
758  
759      # Merge: use watcher status as source of truth
760      for runner in runners:
761          name = runner["name"]
762          if name in watcher_runners:
763              wr = watcher_runners[name]
764              runner["status"] = wr["status"]
765              if wr["task"]:
766                  runner["current_job"] = {
767                      "name": f"Task {wr['task']}",
768                      "run_title": wr["repo"],
769                  }
770              else:
771                  runner["current_job"] = None
772          elif runner["status"] == "online":
773              runner["status"] = "idle"
774  
775      result = {"runners": runners}
776      cache.set("runners", result)
777      return result
778  
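# Illustrative entry from /var/www/health/runners.json on the CI server (the values are
# made up; the keys are the ones read below). A watcher entry named "runner-2" is exposed
# to the dashboard as "native-runner-2".
#   [{"name": "runner-2", "status": "running", "task": "1234", "repo": "devops/alphavm"}]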
779  
780  @app.get("/api/queue")
781  async def get_queue():
782      """Get jobs waiting in queue."""
783      cached = cache.get("queue")
784      if cached:
785          return cached
786  
787      queue = query_queue_from_db()
788      result = {"queue": queue}
789      cache.set("queue", result)
790      return result
791  
792  
793  @app.get("/api/jobs/history")
794  async def get_history(limit: int = 30):
795      """Get recent job history."""
796      cache_key = f"history_{limit}"
797      cached = cache.get(cache_key)
798      if cached:
799          return cached
800  
801      history = query_history_from_db(limit=limit)
802      result = {"jobs": history}
803      cache.set(cache_key, result)
804      return result
805  
806  
807  @app.get("/api/repos")
808  async def get_repos():
809      """Get repository CI status."""
810      cached = cache.get("repos")
811      if cached:
812          return cached
813  
814      repos = query_repo_status_from_db()
815      result = {"repos": repos}
816      cache.set("repos", result)
817      return result
818  
819  
820  @app.get("/api/stats")
821  async def get_stats():
822      """Get overall CI statistics."""
823      cached = cache.get("stats")
824      if cached:
825          return cached
826  
827      runners = query_runners_from_db()
828      queue = query_queue_from_db()
829      history = query_history_from_db(limit=100)
830  
831      online_runners = sum(1 for r in runners if r["status"] in ("online", "running"))
832      busy_runners = sum(1 for r in runners if r["status"] == "running")
833  
834      success_count = sum(1 for j in history if j["result"] == "success")
835      failure_count = sum(1 for j in history if j["result"] == "failure")
836      total_completed = len(history)
837  
838      result = {
839          "runners": {
840              "total": len(runners),
841              "online": online_runners,
842              "busy": busy_runners,
843          },
844          "queue": {
845              "length": len(queue),
846          },
847          "recent": {
848              "total": total_completed,
849              "success": success_count,
850              "failure": failure_count,
851              "success_rate": round(success_count / total_completed * 100, 1) if total_completed > 0 else 0,
852          }
853      }
854      cache.set("stats", result)
855      return result
856  
857  
858  @app.post("/api/cache/clear")
859  async def clear_cache():
860      """Clear the data cache."""
861      cache.clear()
862      return {"status": "ok", "message": "Cache cleared"}
863  
864  
865  @app.get("/api/system")
866  async def get_system():
867      """Get system resource usage (CPU, memory, disk) for both local and CI servers."""
868      cached = cache.get("system")
869      if cached:
870          return cached
871  
872      # Local server metrics (this Forgejo/Radicle server)
873      # CPU usage (percent across all cores)
874      cpu_percent = psutil.cpu_percent(interval=0.1)
875      memory = psutil.virtual_memory()
876      disk = psutil.disk_usage("/")
877  
878      local_metrics = {
879          "name": "source",
880          "cpu": {
881              "percent": cpu_percent,
882              "cores": psutil.cpu_count(),
883          },
884          "memory": {
885              "total": memory.total,
886              "used": memory.used,
887              "available": memory.available,
888              "percent": memory.percent,
889          },
890          "disk": {
891              "total": disk.total,
892              "used": disk.used,
893              "free": disk.free,
894              "percent": disk.percent,
895          },
896      }
897  
898      # Remote CI server metrics (fetched via SSH)
899      ci_metrics = await fetch_remote_system_metrics()
900      if ci_metrics:
901          ci_metrics["name"] = "ci"
902  
903      result = {
904          "local": local_metrics,
905          "ci": ci_metrics,
906      }
907      cache.set("system", result)
908      return result
909  
910  
 916  # Pipeline Monitor endpoints - fetch data from the CI watcher
917  @app.get("/api/pipeline/metrics")
918  async def get_pipeline_metrics():
919      """Get current pipeline metrics from CI server watcher."""
920      try:
921          proc = await asyncio.create_subprocess_exec(
922              "ssh", "-o", "ConnectTimeout=3", "-o", "BatchMode=yes",
923              f"{CI_SERVER_USER}@{CI_SERVER_HOST}",
 924              "cat /tmp/ci-monitor-data/metrics.log 2>/dev/null | tail -1",
925              stdout=asyncio.subprocess.PIPE,
926              stderr=asyncio.subprocess.PIPE,
927          )
928          stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=5)
929          
930          if not stdout.strip():
931              return {"load": 0, "mem_mb": 0, "active_tasks": 0, "completed": 0}
932          
933          # Parse: 2026-01-09T16:35:58 load=43.3 mem_mb=13943 containers=0 active_tasks=50 completed=0
934          import re
935          data = {}
936          for match in re.finditer(r"(\w+)=([0-9.]+)", stdout.decode()):
937              key, val = match.groups()
938              data[key] = float(val) if "." in val else int(val)
939          return data
940      except Exception as e:
941          return {"error": str(e)}
942  
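# Illustrative parse, using the sample log line from the comment above: the regex yields
# numeric key/value pairs, so the endpoint returns
#   {"load": 43.3, "mem_mb": 13943, "containers": 0, "active_tasks": 50, "completed": 0}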
943  
944  @app.get("/api/pipeline/events")
945  async def get_pipeline_events():
946      """Get recent pipeline events from CI server watcher."""
947      try:
948          proc = await asyncio.create_subprocess_exec(
949              "ssh", "-o", "ConnectTimeout=3", "-o", "BatchMode=yes",
950              f"{CI_SERVER_USER}@{CI_SERVER_HOST}",
951              "tail", "-100", "/tmp/ci-monitor-data/events.log",
952              stdout=asyncio.subprocess.PIPE,
953              stderr=asyncio.subprocess.PIPE,
954          )
955          stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=5)
956          
957          events = []
958          import re
959          for line in stdout.decode().strip().split("\n"):
960              if not line:
961                  continue
962              match = re.match(r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})[^\[]*\[([^\]]+)\]\s*(.*)", line)
963              if match:
964                  timestamp, event_type, rest = match.groups()
965                  events.append({
966                      "time": timestamp[11:19],
967                      "type": event_type,
968                      "message": rest[:150]
969                  })
970          return events[-50:]  # Last 50 events
 971      except Exception:
 972          return []
973  
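# Illustrative event line and the dict it becomes (the line format is assumed from the
# regex above; the content is made up):
#   "2026-01-09T16:36:02 [task_start] runner-2 picked up task 1234 for devops/alphavm"
#   -> {"time": "16:36:02", "type": "task_start",
#       "message": "runner-2 picked up task 1234 for devops/alphavm"}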
974  
975  @app.get("/api/pipeline/runners")
976  async def get_pipeline_runners():
977      """Get current runner status from CI server watcher."""
978      try:
979          proc = await asyncio.create_subprocess_exec(
980              "ssh", "-o", "ConnectTimeout=3", "-o", "BatchMode=yes",
981              f"{CI_SERVER_USER}@{CI_SERVER_HOST}",
982              "cat", "/var/www/health/runners.json",
983              stdout=asyncio.subprocess.PIPE,
984              stderr=asyncio.subprocess.PIPE,
985          )
986          stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=5)
987          
988          import json
989          return json.loads(stdout.decode())
 990      except Exception:
 991          return []


# Run the app directly. Kept after all route definitions so every endpoint above is
# registered before the server starts when invoked as "python main.py".
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8081)