# main.py
#!/usr/bin/env python3
"""
Forgejo CI Dashboard Backend
Aggregates data from Forgejo API and database for the dashboard.
"""

import asyncio
import json
import os
import re
import sqlite3
import subprocess
import time
from contextlib import asynccontextmanager, closing
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

import httpx
import psutil
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse

load_dotenv()

# Configuration (overridable via environment / .env)
FORGEJO_URL = os.getenv("FORGEJO_URL", "http://localhost:3000")
FORGEJO_TOKEN = os.getenv("FORGEJO_TOKEN", "")
FORGEJO_DB_PATH = os.getenv("FORGEJO_DB_PATH", "/var/lib/forgejo/forgejo.db")
FORGEJO_REPOS_PATH = os.getenv("FORGEJO_REPOS_PATH", "/var/lib/forgejo/repositories")
RADICLE_BIN = os.getenv("RADICLE_BIN", "/home/devops/.radicle/bin/rad")
CACHE_TTL = int(os.getenv("CACHE_TTL", "10"))  # seconds

# CI stages to track (customize as needed)
CI_STAGES = ["build", "check", "clippy", "fmt", "test", "node"]

# Remote CI server configuration
CI_SERVER_HOST = os.getenv("CI_SERVER_HOST", "10.106.0.3")
CI_SERVER_USER = os.getenv("CI_SERVER_USER", "devops")
CI_SERVER_SSH_TIMEOUT = int(os.getenv("CI_SERVER_SSH_TIMEOUT", "5"))


async def fetch_remote_system_metrics() -> dict | None:
    """Fetch system metrics from remote CI server via SSH using Linux commands.

    Uses /proc/stat aggregate line for CPU (average across ALL cores),
    free for memory, and df for disk usage.

    Returns the parsed metrics dict, or None on any SSH/parse failure
    (errors are logged, never raised).
    """
    # Shell script to gather metrics and output JSON.
    # CPU: Sample /proc/stat twice with 0.1s interval to calculate usage percentage.
    # The first line of /proc/stat (starting with "cpu ") is aggregate across all CPUs.
    script = '''
read _ u1 n1 s1 i1 w1 x1 y1 _ < /proc/stat
sleep 0.1
read _ u2 n2 s2 i2 w2 x2 y2 _ < /proc/stat
cores=$(nproc)
total1=$((u1+n1+s1+i1+w1+x1+y1))
total2=$((u2+n2+s2+i2+w2+x2+y2))
idle_delta=$((i2-i1))
total_delta=$((total2-total1))
if [ $total_delta -gt 0 ]; then
    cpu_pct=$((100 * (total_delta - idle_delta) / total_delta))
else
    cpu_pct=0
fi
read _ mem_total mem_used mem_free _ _ mem_avail _ < <(free -b | grep Mem)
read _ disk_total disk_used disk_avail _ _ < <(df -B1 / | tail -1)
disk_pct=$((100 * disk_used / disk_total))
mem_pct=$((100 * mem_used / mem_total))
echo "{\\"cpu\\":{\\"percent\\":$cpu_pct,\\"cores\\":$cores},\\"memory\\":{\\"total\\":$mem_total,\\"used\\":$mem_used,\\"available\\":$mem_avail,\\"percent\\":$mem_pct},\\"disk\\":{\\"total\\":$disk_total,\\"used\\":$disk_used,\\"free\\":$disk_avail,\\"percent\\":$disk_pct}}"
'''

    try:
        proc = await asyncio.create_subprocess_exec(
            "ssh",
            # Use the configured timeout (was hardcoded to 5, diverging from
            # CI_SERVER_SSH_TIMEOUT used for communicate() below).
            "-o", f"ConnectTimeout={CI_SERVER_SSH_TIMEOUT}",
            "-o", "BatchMode=yes",
            "-o", "StrictHostKeyChecking=accept-new",
            f"{CI_SERVER_USER}@{CI_SERVER_HOST}",
            "bash", "-c", script,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )

        try:
            stdout, stderr = await asyncio.wait_for(
                proc.communicate(),
                timeout=CI_SERVER_SSH_TIMEOUT
            )
        except asyncio.TimeoutError:
            # Make sure the ssh process does not linger after a timeout.
            proc.kill()
            await proc.wait()
            print(f"SSH timeout fetching metrics from {CI_SERVER_HOST}")
            return None

        if proc.returncode != 0:
            print(f"SSH error fetching metrics from {CI_SERVER_HOST}: {stderr.decode()}")
            return None

        return json.loads(stdout.decode().strip())

    except Exception as e:
        print(f"Error fetching remote metrics: {e}")
        return None


class Cache:
    """Simple in-memory cache with TTL."""

    def __init__(self, ttl: int = 10):
        # Time-to-live in seconds for every entry.
        self.ttl = ttl
        # key -> (insertion timestamp, value)
        self._cache: dict[str, tuple[float, Any]] = {}

    def get(self, key: str) -> Any | None:
        """Return the cached value, or None if absent or expired (expired
        entries are evicted on access)."""
        if key in self._cache:
            timestamp, value = self._cache[key]
            if time.time() - timestamp < self.ttl:
                return value
            del self._cache[key]
        return None

    def set(self, key: str, value: Any) -> None:
        """Store *value* under *key*, resetting its TTL."""
        self._cache[key] = (time.time(), value)

    def clear(self) -> None:
        """Drop all cached entries."""
        self._cache.clear()


cache = Cache(ttl=CACHE_TTL)


def load_radicle_config() -> dict[str, str | None]:
    """Load repo-to-RID mappings from config file.

    Returns dict mapping repo name (lowercase) to RID or None.
    Missing or unreadable config yields an empty dict.
    """
    config_path = Path(__file__).parent / "radicle-repos.json"
    try:
        if config_path.exists():
            with open(config_path) as f:
                data = json.load(f)
            return {k.lower(): v for k, v in data.get("repos", {}).items()}
    except Exception as e:
        print(f"Error loading radicle config: {e}")
    return {}


def get_radicle_head_for_rid(rid: str) -> str | None:
    """Get the HEAD commit (short, 7 hex chars) for a radicle repo by RID.

    Uses `rad inspect --refs` and scans for a main/master head ref.
    Returns None on any failure or when no ref is found.
    """
    if not rid:
        return None
    try:
        result = subprocess.run(
            [RADICLE_BIN, "inspect", "--refs", rid],
            capture_output=True,
            text=True,
            timeout=10,
            env={**os.environ, "RAD_HOME": "/home/devops/.radicle"}
        )
        if result.returncode != 0:
            return None

        # Parse output to find HEAD or main/master ref.
        # Look for lines like: refs/heads/main -> abc1234
        for line in result.stdout.split("\n"):
            if "refs/heads/main" in line or "refs/heads/master" in line:
                parts = line.split()
                for part in parts:
                    # Find the commit hash (7+ hex chars)
                    if len(part) >= 7 and all(c in "0123456789abcdef" for c in part[:7]):
                        return part[:7]
        return None
    except Exception as e:
        print(f"Error getting radicle HEAD for {rid}: {e}")
        return None


def get_radicle_repos() -> dict[str, str | None]:
    """Get radicle repos with their HEAD commits from CI server.

    Returns dict mapping repo name (lowercase) to HEAD commit hash or None.
    Queries CI server's Radicle storage directly using the CI runner's namespace
    to get the actual synced HEAD commits (not the canonical branch).
    """
    try:
        # Get CI runner's node ID and query git refs from Radicle storage.
        # The CI runner's namespace contains the actually synced commits.
        radicle_config = load_radicle_config()
        if not radicle_config:
            return {}

        # Build a script that:
        # 1. Gets the CI runner's node ID (DID)
        # 2. For each RID, reads the HEAD from the runner's namespace
        rids = {name: rid for name, rid in radicle_config.items() if rid}
        if not rids:
            return {}

        # Create script to extract heads from CI runner's namespace
        rid_list = " ".join(f"{name}:{rid.replace('rad:', '')}" for name, rid in rids.items())
        script = f'''
export PATH=/home/devops/.radicle/bin:$PATH
NODE_ID=$(rad self 2>/dev/null | grep "^DID" | awk '{{print $2}}' | sed 's/did:key://')
if [ -z "$NODE_ID" ]; then
    echo "ERROR: Could not get node ID" >&2
    exit 1
fi
for entry in {rid_list}; do
    name="${{entry%%:*}}"
    rid="${{entry#*:}}"
    ref_path="$HOME/.radicle/storage/$rid/refs/namespaces/$NODE_ID/refs/heads/main"
    if [ -f "$ref_path" ]; then
        head=$(cat "$ref_path" | head -c 7)
        echo "$name:$head"
    fi
done
'''

        result = subprocess.run(
            [
                "ssh", "-o", "ConnectTimeout=5", "-o", "BatchMode=yes",
                f"{CI_SERVER_USER}@{CI_SERVER_HOST}",
                "bash", "-c", script
            ],
            capture_output=True,
            text=True,
            timeout=15,
        )

        repos = {}
        if result.returncode == 0:
            for line in result.stdout.strip().split("\n"):
                if ":" in line:
                    parts = line.split(":", 1)
                    if len(parts) == 2:
                        name, head = parts
                        if head and len(head) >= 7:
                            repos[name.lower()] = head

        return repos
    except Exception as e:
        print(f"Error getting radicle repos: {e}")
        return {}


def get_forgejo_repo_head(owner: str, repo: str) -> str | None:
    """Get the HEAD commit hash (short, 7 chars) for a Forgejo repository.

    Reads refs straight from the bare repository on disk: loose refs first,
    then packed-refs, then the HEAD file (symbolic or detached).
    """
    try:
        repo_path = Path(FORGEJO_REPOS_PATH) / owner / f"{repo}.git"
        if not repo_path.exists():
            return None

        def read_packed_ref(ref_name: str) -> str | None:
            """Read a ref from packed-refs file if it exists."""
            packed_refs_path = repo_path / "packed-refs"
            if not packed_refs_path.exists():
                return None
            try:
                for line in packed_refs_path.read_text().splitlines():
                    line = line.strip()
                    # Skip comments, empty lines, and peeled-tag lines ("^...")
                    if not line or line.startswith("#") or line.startswith("^"):
                        continue
                    # Format: <sha> <ref>
                    parts = line.split()
                    if len(parts) >= 2 and parts[1] == ref_name:
                        return parts[0][:7]
            except Exception:
                pass
            return None

        # Try refs/heads/main first, then master
        for branch in ["main", "master"]:
            ref_path = repo_path / "refs" / "heads" / branch
            if ref_path.exists():
                return ref_path.read_text().strip()[:7]
            # Check packed-refs if loose ref doesn't exist
            packed_head = read_packed_ref(f"refs/heads/{branch}")
            if packed_head:
                return packed_head

        # Fallback to HEAD
        head_path = repo_path / "HEAD"
        if head_path.exists():
            head_content = head_path.read_text().strip()
            if head_content.startswith("ref:"):
                # Symbolic ref: resolve the target (loose, then packed)
                ref = head_content.split("ref:")[1].strip()
                ref_path = repo_path / ref
                if ref_path.exists():
                    return ref_path.read_text().strip()[:7]
                # Check packed-refs for the symbolic ref target
                packed_head = read_packed_ref(ref)
                if packed_head:
                    return packed_head
            else:
                # Detached HEAD: the file contains the commit hash itself
                return head_content[:7]
        return None
    except Exception as e:
        print(f"Error getting Forgejo repo HEAD for {owner}/{repo}: {e}")
        return None


async def fetch_forgejo_api(path: str, client: httpx.AsyncClient) -> Any:
    """Fetch JSON from the Forgejo API at *path*; raises on HTTP errors."""
    headers = {}
    if FORGEJO_TOKEN:
        headers["Authorization"] = f"token {FORGEJO_TOKEN}"

    url = f"{FORGEJO_URL}{path}"
    response = await client.get(url, headers=headers, timeout=30.0)
    response.raise_for_status()
    return response.json()


def query_runners_from_db() -> list[dict]:
    """Query runner information directly from Forgejo SQLite database.

    Opens the DB read-only; returns [] if the DB is missing or unreadable.
    """
    if not Path(FORGEJO_DB_PATH).exists():
        return []

    try:
        # closing() guarantees the connection is released even when a
        # query below raises (previously it leaked on error paths).
        with closing(sqlite3.connect(f"file:{FORGEJO_DB_PATH}?mode=ro", uri=True)) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()

            cursor.execute("""
                SELECT
                    id, name, uuid, version, agent_labels,
                    last_online, last_active, owner_id, repo_id
                FROM action_runner
                WHERE deleted IS NULL OR deleted = 0
                ORDER BY last_online DESC
            """)

            runners = []
            now = time.time()
            for row in cursor.fetchall():
                last_online = row["last_online"] or 0
                last_active = row["last_active"] or 0

                # Consider runner "online" if seen in last 60 seconds
                is_online = (now - last_online) < 60

                # Parse labels JSON
                labels = []
                if row["agent_labels"]:
                    try:
                        labels = json.loads(row["agent_labels"])
                    except json.JSONDecodeError:
                        labels = []

                runners.append({
                    "id": row["id"],
                    "name": row["name"],
                    "uuid": row["uuid"],
                    "version": row["version"],
                    "labels": labels,
                    "status": "online" if is_online else "offline",
                    "last_online": datetime.fromtimestamp(last_online, tz=timezone.utc).isoformat() if last_online else None,
                    "last_active": datetime.fromtimestamp(last_active, tz=timezone.utc).isoformat() if last_active else None,
                    "current_job": None,  # Will be populated from jobs query
                })

            return runners
    except Exception as e:
        print(f"Error querying runners from DB: {e}")
        return []


def query_running_jobs_from_db() -> dict[int, dict]:
    """Query currently running jobs to associate with runners.

    Returns a mapping runner_id -> most recent running task info.
    """
    if not Path(FORGEJO_DB_PATH).exists():
        return {}

    try:
        with closing(sqlite3.connect(f"file:{FORGEJO_DB_PATH}?mode=ro", uri=True)) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()

            now = int(time.time())
            # Only consider tasks started in the last 2 hours as "running".
            # Older tasks with status=2 are likely stuck/orphaned.
            max_age = now - (2 * 3600)

            # Query running tasks (status 2 = running in Forgejo).
            # Get only the most recent task per runner.
            cursor.execute("""
                SELECT
                    t.id, t.job_id, t.runner_id, t.status, t.started,
                    j.name as job_name, j.run_id,
                    r.title as run_title, r.repo_id
                FROM action_task t
                LEFT JOIN action_run_job j ON t.job_id = j.id
                LEFT JOIN action_run r ON j.run_id = r.id
                WHERE t.status = 2 AND t.started > ?
                ORDER BY t.started DESC
            """, (max_age,))

            jobs_by_runner = {}
            for row in cursor.fetchall():
                runner_id = row["runner_id"]
                # Only keep the most recent job per runner (first due to ORDER BY DESC)
                if runner_id and runner_id not in jobs_by_runner:
                    jobs_by_runner[runner_id] = {
                        "id": row["id"],
                        "name": row["job_name"] or "Unknown",
                        "run_title": row["run_title"],
                        "started": datetime.fromtimestamp(row["started"], tz=timezone.utc).isoformat() if row["started"] else None,
                    }

            return jobs_by_runner
    except Exception as e:
        print(f"Error querying running jobs from DB: {e}")
        return {}


def query_queue_from_db() -> list[dict]:
    """Query active workflow runs and return repo-level progress summary.

    Returns a list of repos with active runs, showing completed/total jobs.
    Format: {"repo": "alphavm", "done": 3, "total": 8, "status": "running"}
    """
    if not Path(FORGEJO_DB_PATH).exists():
        return []

    try:
        with closing(sqlite3.connect(f"file:{FORGEJO_DB_PATH}?mode=ro", uri=True)) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()

            now = int(time.time())
            # Only consider runs updated in the last 30 minutes
            max_age = now - (30 * 60)

            # Get active workflow runs (status 1=pending, 2=running)
            cursor.execute("""
                SELECT
                    r.id as run_id, r.status as run_status, r.updated,
                    repo.name as repo_name
                FROM action_run r
                JOIN repository repo ON r.repo_id = repo.id
                WHERE r.status IN (1, 2) AND r.updated > ?
                ORDER BY r.updated DESC
            """, (max_age,))

            runs = cursor.fetchall()
            if not runs:
                return []

            # For each active run, count jobs by status.
            # Job status: 1=pending, 2=running, 3=success, 4=failure, 5=cancelled, 6=skipped
            queue_summary = []
            seen_repos = set()

            for run in runs:
                repo_name = run["repo_name"]
                # Only show each repo once (most recent run)
                if repo_name in seen_repos:
                    continue
                seen_repos.add(repo_name)

                cursor.execute("""
                    SELECT status, COUNT(*) as count
                    FROM action_run_job
                    WHERE run_id = ?
                    GROUP BY status
                """, (run["run_id"],))

                status_counts = {row["status"]: row["count"] for row in cursor.fetchall()}

                # Calculate done (success, failure, cancelled, skipped) vs total
                done = sum(status_counts.get(s, 0) for s in [3, 4, 5, 6])
                total = sum(status_counts.values())
                running = status_counts.get(2, 0)
                pending = status_counts.get(1, 0) + status_counts.get(0, 0)

                # Determine overall status
                if running > 0:
                    status = "running"
                elif pending > 0:
                    status = "pending"
                else:
                    status = "done"

                queue_summary.append({
                    "repo": repo_name,
                    "done": done,
                    "total": total,
                    "running": running,
                    "status": status,
                })

            return queue_summary
    except Exception as e:
        print(f"Error querying queue from DB: {e}")
        return []


def query_history_from_db(limit: int = 30) -> list[dict]:
    """Query recent job history from database.

    Returns jobs sorted chronologically:
    - Unfinished jobs (running/pending) first, by start time newest first
    - Completed jobs second, by completion time newest first
    """
    if not Path(FORGEJO_DB_PATH).exists():
        return []

    try:
        with closing(sqlite3.connect(f"file:{FORGEJO_DB_PATH}?mode=ro", uri=True)) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()

            status_map = {
                0: "unknown", 1: "queued", 2: "running",
                3: "success", 4: "failure", 5: "cancelled", 6: "skipped"
            }

            history = []

            # First: Query actually running jobs, sorted by start time newest first.
            # Status: 2=running, but only if stopped is 0/NULL (not yet completed).
            # Some jobs have status=2 but already have stopped times (stale status).
            cursor.execute("""
                SELECT
                    j.id, j.name, j.status, j.started, j.stopped, j.updated,
                    r.title, r.ref as branch, r.event,
                    repo.name as repo_name, repo.owner_name
                FROM action_run_job j
                JOIN action_run r ON j.run_id = r.id
                JOIN repository repo ON r.repo_id = repo.id
                WHERE j.status = 2 AND (j.stopped IS NULL OR j.stopped = 0)
                ORDER BY CASE WHEN j.started > 0 THEN j.started ELSE j.updated END DESC
                LIMIT ?
            """, (limit,))

            for row in cursor.fetchall():
                started = row["started"] or 0
                branch = row["branch"] or ""
                branch = branch.replace("refs/heads/", "").replace("refs/tags/", "")

                history.append({
                    "id": row["id"],
                    "job": row["name"],
                    "repo": f"{row['owner_name']}/{row['repo_name']}",
                    "branch": branch,
                    "result": status_map.get(row["status"], "unknown"),
                    "duration": 0,  # Still running, no duration yet
                    "started_at": datetime.fromtimestamp(started, tz=timezone.utc).isoformat() if started else None,
                    "finished_at": None,
                })

            # Second: Query completed jobs, sorted by completion time newest first.
            # Include jobs with stopped time > 0 (actually finished) OR status indicates completion.
            # This catches jobs with stale status=2 that actually finished.
            remaining_limit = limit - len(history)
            if remaining_limit > 0:
                cursor.execute("""
                    SELECT
                        j.id, j.name, j.status, j.started, j.stopped, j.updated,
                        r.title, r.ref as branch, r.event,
                        repo.name as repo_name, repo.owner_name
                    FROM action_run_job j
                    JOIN action_run r ON j.run_id = r.id
                    JOIN repository repo ON r.repo_id = repo.id
                    WHERE j.status IN (3, 4, 5, 6) OR (j.stopped > 0 AND j.started > 0)
                    ORDER BY CASE WHEN j.stopped > 0 THEN j.stopped ELSE j.updated END DESC
                    LIMIT ?
                """, (remaining_limit,))

                for row in cursor.fetchall():
                    started = row["started"] or 0
                    stopped = row["stopped"] if row["stopped"] else row["updated"]
                    duration = stopped - started if stopped and started else 0

                    branch = row["branch"] or ""
                    branch = branch.replace("refs/heads/", "").replace("refs/tags/", "")

                    # Determine result: if job has stopped time but status is still running/queued,
                    # treat as success (stale status that wasn't updated)
                    status = row["status"]
                    if status in (1, 2) and stopped > 0:
                        result = "success"
                    else:
                        result = status_map.get(status, "unknown")

                    history.append({
                        "id": row["id"],
                        "job": row["name"],
                        "repo": f"{row['owner_name']}/{row['repo_name']}",
                        "branch": branch,
                        "result": result,
                        "duration": duration,
                        "started_at": datetime.fromtimestamp(started, tz=timezone.utc).isoformat() if started else None,
                        "finished_at": datetime.fromtimestamp(stopped, tz=timezone.utc).isoformat() if stopped else None,
                    })

            return history
    except Exception as e:
        print(f"Error querying history from DB: {e}")
        return []


def query_repo_status_from_db() -> list[dict]:
    """Query all repositories with their radicle sync status.

    Returns repos with sync_status:
    - "synced" (green): Forgejo HEAD == Radicle HEAD
    - "unsynced" (yellow): Forgejo HEAD != Radicle HEAD
    - "no-radicle" (red): No radicle origin for this repo
    """
    if not Path(FORGEJO_DB_PATH).exists():
        return []

    try:
        with closing(sqlite3.connect(f"file:{FORGEJO_DB_PATH}?mode=ro", uri=True)) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()

            # Get radicle HEAD commits from rad ls (for seeded repos)
            radicle_repos = get_radicle_repos()

            # Load RID config for repos not in rad ls
            radicle_config = load_radicle_config()

            # Get all repos (not just those with action runs)
            cursor.execute("""
                SELECT DISTINCT
                    repo.id, repo.name, repo.owner_name, repo.default_branch
                FROM repository repo
                ORDER BY repo.owner_name, repo.name
            """)

            repos = []

            for repo_row in cursor.fetchall():
                repo_name = repo_row["name"]
                owner_name = repo_row["owner_name"]
                repo_name_lower = repo_name.lower()

                # Get Forgejo HEAD commit
                forgejo_head = get_forgejo_repo_head(owner_name, repo_name)

                # Check radicle - first from rad ls, then from config
                radicle_head = radicle_repos.get(repo_name_lower)
                rid = radicle_config.get(repo_name_lower)

                # If we have a RID in config but no HEAD from rad ls, the repo has radicle
                # but we might not have the HEAD synced locally
                if radicle_head is None and rid:
                    # Repo has radicle origin but HEAD unknown (not seeded locally)
                    sync_status = "unsynced"
                    radicle_head = "?"
                elif radicle_head is None and rid is None:
                    sync_status = "no-radicle"
                elif forgejo_head and radicle_head and forgejo_head == radicle_head:
                    sync_status = "synced"
                else:
                    sync_status = "unsynced"

                repos.append({
                    "name": repo_name,
                    "sync_status": sync_status,
                    "forgejo_head": forgejo_head,
                    "radicle_head": radicle_head,
                    "rid": rid,
                })

            return repos
    except Exception as e:
        print(f"Error querying repo status from DB: {e}")
        return []


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan handler."""
    print("Starting CI Dashboard Backend")
    print(f"Forgejo URL: {FORGEJO_URL}")
    print(f"Database: {FORGEJO_DB_PATH}")
    yield
    print("Shutting down CI Dashboard Backend")


app = FastAPI(
    title="CI Dashboard API",
    description="Backend aggregator for Forgejo CI Dashboard",
    version="1.0.0",
    lifespan=lifespan,
)

# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.get("/api/health/status")
async def get_health():
    """Get Forgejo health status."""
    cached = cache.get("health")
    if cached:
        return cached

    try:
        async with httpx.AsyncClient() as client:
            data = await fetch_forgejo_api("/api/healthz", client)
            cache.set("health", data)
            return data
    except Exception as e:
        return {"status": "error", "message": str(e)}


@app.get("/api/runners")
async def get_runners():
    """Get list of runners with their status from CI watcher (real-time)."""
    cached = cache.get("runners")
    if cached:
        return cached

    # Get real-time status from CI watcher
    watcher_runners = {}
    try:
        proc = await asyncio.create_subprocess_exec(
            "ssh", "-o", "ConnectTimeout=3", "-o", "BatchMode=yes",
            f"{CI_SERVER_USER}@{CI_SERVER_HOST}",
            "cat", "/var/www/health/runners.json",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=5)
        for r in json.loads(stdout.decode()):
            # Map runner-N to native-runner-N
            num = r["name"].replace("runner-", "")
            watcher_runners[f"native-runner-{num}"] = r
    except Exception as e:
        print(f"Error fetching watcher runners: {e}")

    # Get runner metadata from Forgejo DB
    runners = query_runners_from_db()

    # Merge: use watcher status as source of truth
    for runner in runners:
        name = runner["name"]
        if name in watcher_runners:
            wr = watcher_runners[name]
            runner["status"] = wr["status"]
            # .get() guards against malformed watcher JSON (external input);
            # a plain wr["task"] raised KeyError and aborted the endpoint.
            if wr.get("task"):
                runner["current_job"] = {
                    "name": f"Task {wr['task']}",
                    "run_title": wr.get("repo"),
                }
            else:
                runner["current_job"] = None
        elif runner["status"] == "online":
            runner["status"] = "idle"

    result = {"runners": runners}
    cache.set("runners", result)
    return result


@app.get("/api/queue")
async def get_queue():
    """Get jobs waiting in queue."""
    cached = cache.get("queue")
    if cached:
        return cached

    queue = query_queue_from_db()
    result = {"queue": queue}
    cache.set("queue", result)
    return result


@app.get("/api/jobs/history")
async def get_history(limit: int = 30):
    """Get recent job history."""
    cache_key = f"history_{limit}"
    cached = cache.get(cache_key)
    if cached:
        return cached

    history = query_history_from_db(limit=limit)
    result = {"jobs": history}
    cache.set(cache_key, result)
    return result


@app.get("/api/repos")
async def get_repos():
    """Get repository CI status."""
    cached = cache.get("repos")
    if cached:
        return cached

    repos = query_repo_status_from_db()
    result = {"repos": repos}
    cache.set("repos", result)
    return result


@app.get("/api/stats")
async def get_stats():
    """Get overall CI statistics."""
    cached = cache.get("stats")
    if cached:
        return cached

    runners = query_runners_from_db()
    queue = query_queue_from_db()
    history = query_history_from_db(limit=100)

    online_runners = sum(1 for r in runners if r["status"] in ("online", "running"))
    busy_runners = sum(1 for r in runners if r["status"] == "running")

    success_count = sum(1 for j in history if j["result"] == "success")
    failure_count = sum(1 for j in history if j["result"] == "failure")
    total_completed = len(history)

    result = {
        "runners": {
            "total": len(runners),
            "online": online_runners,
            "busy": busy_runners,
        },
        "queue": {
            "length": len(queue),
        },
        "recent": {
            "total": total_completed,
            "success": success_count,
            "failure": failure_count,
            "success_rate": round(success_count / total_completed * 100, 1) if total_completed > 0 else 0,
        }
    }
    cache.set("stats", result)
    return result


@app.post("/api/cache/clear")
async def clear_cache():
    """Clear the data cache."""
    cache.clear()
    return {"status": "ok", "message": "Cache cleared"}


@app.get("/api/system")
async def get_system():
    """Get system resource usage (CPU, memory, disk) for both local and CI servers."""
    cached = cache.get("system")
    if cached:
        return cached

    # Local server metrics (this Forgejo/Radicle server).
    # CPU usage (percent across all cores)
    cpu_percent = psutil.cpu_percent(interval=0.1)
    memory = psutil.virtual_memory()
    disk = psutil.disk_usage("/")

    local_metrics = {
        "name": "source",
        "cpu": {
            "percent": cpu_percent,
            "cores": psutil.cpu_count(),
        },
        "memory": {
            "total": memory.total,
            "used": memory.used,
            "available": memory.available,
            "percent": memory.percent,
        },
        "disk": {
            "total": disk.total,
            "used": disk.used,
            "free": disk.free,
            "percent": disk.percent,
        },
    }

    # Remote CI server metrics (fetched via SSH)
    ci_metrics = await fetch_remote_system_metrics()
    if ci_metrics:
        ci_metrics["name"] = "ci"

    result = {
        "local": local_metrics,
        "ci": ci_metrics,
    }
    cache.set("system", result)
    return result


# Pipeline Monitor endpoints - fetches data from CI watcher
@app.get("/api/pipeline/metrics")
async def get_pipeline_metrics():
    """Get current pipeline metrics from CI server watcher."""
    try:
        # Single remote command string: ssh would otherwise rely on implicit
        # space-joining of argv items to form the remote pipeline.
        proc = await asyncio.create_subprocess_exec(
            "ssh", "-o", "ConnectTimeout=3", "-o", "BatchMode=yes",
            f"{CI_SERVER_USER}@{CI_SERVER_HOST}",
            "cat /tmp/ci-monitor-data/metrics.log 2>/dev/null | tail -1",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=5)

        if not stdout.strip():
            return {"load": 0, "mem_mb": 0, "active_tasks": 0, "completed": 0}

        # Parse: 2026-01-09T16:35:58 load=43.3 mem_mb=13943 containers=0 active_tasks=50 completed=0
        data = {}
        for match in re.finditer(r"(\w+)=([0-9.]+)", stdout.decode()):
            key, val = match.groups()
            data[key] = float(val) if "." in val else int(val)
        return data
    except Exception as e:
        return {"error": str(e)}


@app.get("/api/pipeline/events")
async def get_pipeline_events():
    """Get recent pipeline events from CI server watcher."""
    try:
        proc = await asyncio.create_subprocess_exec(
            "ssh", "-o", "ConnectTimeout=3", "-o", "BatchMode=yes",
            f"{CI_SERVER_USER}@{CI_SERVER_HOST}",
            "tail", "-100", "/tmp/ci-monitor-data/events.log",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=5)

        events = []
        for line in stdout.decode().strip().split("\n"):
            if not line:
                continue
            match = re.match(r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})[^\[]*\[([^\]]+)\]\s*(.*)", line)
            if match:
                timestamp, event_type, rest = match.groups()
                events.append({
                    "time": timestamp[11:19],
                    "type": event_type,
                    "message": rest[:150]
                })
        return events[-50:]  # Last 50 events
    except Exception:
        return []


@app.get("/api/pipeline/runners")
async def get_pipeline_runners():
    """Get current runner status from CI server watcher."""
    try:
        proc = await asyncio.create_subprocess_exec(
            "ssh", "-o", "ConnectTimeout=3", "-o", "BatchMode=yes",
            f"{CI_SERVER_USER}@{CI_SERVER_HOST}",
            "cat", "/var/www/health/runners.json",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=5)

        return json.loads(stdout.decode())
    except Exception:
        return []


# Entry point MUST come after all route definitions: uvicorn.run() blocks,
# so in the original placement the /api/pipeline/* endpoints defined below
# it were never registered when running `python main.py`.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8081)