#!/usr/bin/env python3
"""
CI Run Watcher (ci-watcher.py) - Tracks Forgejo CI runs
Detects completions via new task starts AND idle runner detection

Threads started by CIWatcher.start():
  * one `journalctl -f` follower per runner (watch_runner)
  * a workspace directory poller (watch_workspaces)
  * a dashboard JSON publisher (collect_metrics)
  * an idle-runner sweeper (check_idle_runners)
"""

import subprocess
import json
import time
import re
import threading
from datetime import datetime
from collections import defaultdict, deque
from pathlib import Path

LOG_DIR = Path("/tmp/ci-monitor-data")
LOG_DIR.mkdir(exist_ok=True)

# Default locations; both can be overridden per CIWatcher instance.
HEALTH_DIR = Path("/var/www/health")
WORKSPACE_DIR = Path("/opt/ci/workspaces")

# Compiled once: these run for every journal line / event line.
_RUNNER_UNIT_RE = re.compile(r'forgejo-runner-(\d+)\.service')
_TASK_START_RE = re.compile(r'task (\d+) repo is ([^\s]+)')
# Fractional seconds and the trailing message are optional so that no
# well-formed event line is silently dropped from the dashboard.
_EVENT_LINE_RE = re.compile(
    r'[\d\-T:.]+T(\d{2}:\d{2}:\d{2})(?:\.\d+)? \[(\w+)\](?: (.*))?'
)


def parse_runner_units(listing):
    """Extract runner numbers from `systemctl list-units` output.

    Args:
        listing: Raw text containing unit names such as
            "forgejo-runner-3.service".

    Returns:
        Sorted list of unique runner numbers found in the text.
    """
    return sorted({int(m.group(1)) for m in _RUNNER_UNIT_RE.finditer(listing)})


def discover_runners():
    """Dynamically discover all forgejo-runner-* systemd units.

    Returns a list of runner numbers (e.g., [1, 2, 3, ..., 14]).
    Falls back to runners 1-10 if systemctl cannot be run, fails, or
    does not answer within 10 seconds.
    """
    try:
        result = subprocess.run(
            ["systemctl", "list-units", "forgejo-runner-*", "--no-legend"],
            capture_output=True,
            text=True,
            check=True,
            timeout=10,
        )
    except (OSError, subprocess.SubprocessError) as e:
        print(f"Error discovering runners: {e}")
        # Fallback to 1-10 if discovery fails
        return list(range(1, 11))

    runners = parse_runner_units(result.stdout)
    print(f"Discovered {len(runners)} runners: {runners}")
    return runners


class CIWatcher:
    """Follow runner journals and publish task/metric state for a dashboard.

    Shared state (`active_tasks`, `completed_count`) is guarded by
    `self.lock`. Writes to the events log are serialized by the separate
    `self.log_lock`, because log_event() is called both with and without
    `self.lock` held.
    """

    def __init__(self, runner_numbers=None, health_dir=HEALTH_DIR,
                 workspace_dir=WORKSPACE_DIR):
        """Create a watcher.

        Args:
            runner_numbers: Runner numbers to watch; discovered via
                systemd when omitted or empty.
            health_dir: Directory that receives runners.json,
                events.json and metrics.json.
            workspace_dir: Directory polled for workspace
                creation/removal.
        """
        self.runner_numbers = runner_numbers or discover_runners()
        self.active_tasks = {}  # runner -> {task_id, start_time, repo}
        self.completed_count = 0
        self.events_file = open(LOG_DIR / "events.log", "a", buffering=1)
        self.lock = threading.Lock()
        self.log_lock = threading.Lock()
        self.health_dir = Path(health_dir)
        self.workspace_dir = Path(workspace_dir)

    def log_event(self, event_type, **kwargs):
        """Append one `<iso-timestamp> [TYPE] k=v ...` line to the events log."""
        # timespec pins the ".ffffff" suffix; plain isoformat() omits it
        # when microsecond == 0, producing an inconsistent line format.
        timestamp = datetime.now().isoformat(timespec="microseconds")
        fields = " ".join(f"{k}={v}" for k, v in kwargs.items())
        msg = f"{timestamp} [{event_type}] {fields}"
        with self.log_lock:
            self.events_file.write(msg + "\n")

    def _record_task_end(self, runner_name, task, now, **extra):
        """Count `task` as completed and log TASK_END. Caller holds self.lock."""
        duration = now - task["start_time"]
        self.completed_count += 1
        self.log_event("TASK_END",
                       runner=runner_name,
                       task=task["task_id"],
                       repo=task["repo"].split("/")[-1],
                       duration_s=f"{duration:.1f}",
                       completed_total=self.completed_count,
                       **extra)

    def parse_runner_log(self, runner_name, line):
        """Handle one journal line from a runner.

        A line matching "task <id> repo is <repo>" starts a new task on
        that runner; any task previously recorded for the runner is
        counted as completed first. Other lines are ignored.
        """
        task_match = _TASK_START_RE.search(line)
        if not task_match:
            # Most journal lines are irrelevant; skip them without
            # touching the shared lock.
            return
        task_id, repo = task_match.groups()

        # Sampled before taking the lock so file reads never block others.
        load = self.get_load()
        mem = self.get_memory_mb()

        with self.lock:
            now = time.time()

            # If this runner had a previous task, mark it complete
            prev = self.active_tasks.get(runner_name)
            if prev is not None:
                self._record_task_end(runner_name, prev, now)

            # Record new task
            self.active_tasks[runner_name] = {
                "task_id": task_id,
                "start_time": now,
                "repo": repo,
            }

            self.log_event("TASK_START",
                           runner=runner_name,
                           task=task_id,
                           repo=repo,
                           load=f"{load:.2f}",
                           mem_mb=mem)

    def get_load(self):
        """Return the 1-minute load average, or 0.0 if it cannot be read."""
        try:
            with open("/proc/loadavg") as f:
                return float(f.read().split()[0])
        except (OSError, ValueError, IndexError):
            return 0.0

    @staticmethod
    def _parse_meminfo(text):
        """Return used memory in MiB from /proc/meminfo-formatted text.

        Fields are looked up by name rather than by line position.

        Raises:
            KeyError: if MemTotal or MemAvailable is missing.
        """
        fields = {}
        for line in text.splitlines():
            key, sep, rest = line.partition(":")
            parts = rest.split()
            if sep and parts and parts[0].isdigit():
                fields[key.strip()] = int(parts[0])
        return (fields["MemTotal"] - fields["MemAvailable"]) // 1024

    def get_memory_mb(self):
        """Return used memory (MemTotal - MemAvailable) in MiB, or 0 on failure."""
        try:
            with open("/proc/meminfo") as f:
                return self._parse_meminfo(f.read())
        except (OSError, KeyError):
            return 0

    def get_runner_pid(self, runner_num):
        """Get the main PID of a forgejo-runner service.

        Returns None when the PID is 0/unparseable or systemctl fails.
        """
        try:
            result = subprocess.run(
                ["systemctl", "show", f"forgejo-runner-{runner_num}",
                 "--property=MainPID", "--value"],
                capture_output=True, text=True, timeout=5
            )
            pid = int(result.stdout.strip())
        except (OSError, subprocess.SubprocessError, ValueError):
            return None
        return pid if pid > 0 else None

    def runner_has_children(self, pid):
        """Check if a runner process has any child processes (active job)."""
        if not pid:
            return False
        try:
            result = subprocess.run(
                ["ps", "--ppid", str(pid), "--no-headers"],
                capture_output=True, text=True, timeout=5
            )
        except (OSError, subprocess.SubprocessError):
            return False
        return bool(result.stdout.strip())

    def _sweep_idle_runners(self):
        """Run one idle check: retire tasks whose runner has no child processes."""
        with self.lock:
            snapshot = dict(self.active_tasks)

        # Probe OUTSIDE the lock: each probe spawns subprocesses that may
        # take seconds, and holding the lock would stall log parsing.
        idle = []
        for i in self.runner_numbers:
            runner_name = f"runner-{i}"
            # Only check runners we think are active
            if runner_name not in snapshot:
                continue
            pid = self.get_runner_pid(i)
            if pid and not self.runner_has_children(pid):
                idle.append(runner_name)

        if not idle:
            return

        with self.lock:
            now = time.time()
            for runner_name in idle:
                task = self.active_tasks.get(runner_name)
                # Identity check: if the runner picked up a new task while
                # we were probing, the probe result is stale - skip it.
                if task is None or task is not snapshot[runner_name]:
                    continue
                # Grace period so a task that has just started (children
                # not spawned yet) is not retired immediately.
                if now - task["start_time"] > 5:
                    self._record_task_end(runner_name, task, now,
                                          detected_by="idle_check")
                    del self.active_tasks[runner_name]

    def check_idle_runners(self):
        """Periodically check for runners that finished without starting new tasks."""
        while True:
            time.sleep(15)
            self._sweep_idle_runners()

    def watch_runner(self, runner_num):
        """Follow one runner's journal and feed each line to parse_runner_log.

        Returns when journalctl exits or cannot be started; either case
        is recorded in the events log instead of ending silently.
        """
        # Use --since "now" to only get NEW log entries, not historical
        cmd = ["journalctl", "-u", f"forgejo-runner-{runner_num}", "-f",
               "--no-pager", "-o", "cat", "--since", "now"]
        runner_name = f"runner-{runner_num}"

        try:
            with subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                  stderr=subprocess.DEVNULL,
                                  text=True) as proc:
                for line in proc.stdout:
                    self.parse_runner_log(runner_name, line.strip())
        except OSError as e:
            self.log_event("WATCHER_ERROR", runner=runner_name, error=e)
            return

        self.log_event("WATCHER_EXIT", runner=runner_name,
                       returncode=proc.returncode)

    def watch_workspaces(self):
        """Poll the workspace directory every 2 s and log created/removed dirs."""
        seen_dirs = set()

        while True:
            try:
                current_dirs = {d.name for d in self.workspace_dir.iterdir()
                                if d.is_dir()}
            except OSError:
                # Best effort: the directory may not exist (yet); retry.
                current_dirs = None

            if current_dirs is not None:
                for d in sorted(current_dirs - seen_dirs):
                    self.log_event("WORKSPACE_CREATE", dir=d)
                for d in sorted(seen_dirs - current_dirs):
                    self.log_event("WORKSPACE_REMOVE", dir=d)
                seen_dirs = current_dirs

            time.sleep(2)

    @staticmethod
    def _parse_event_line(line):
        """Turn one events.log line into a dashboard dict, or None if malformed."""
        match = _EVENT_LINE_RE.match(line.strip())
        if not match:
            return None
        return {
            "time": match.group(1),
            "type": match.group(2),
            "message": match.group(3) or "",
        }

    def _recent_events(self, limit=100):
        """Return the last `limit` parseable events from events.log.

        Raises:
            OSError: if the log exists but cannot be read.
        """
        try:
            with open(LOG_DIR / "events.log", errors="replace") as f:
                # deque keeps only the tail instead of materializing the
                # whole (ever-growing) log in memory.
                tail = deque(f, maxlen=limit)
        except FileNotFoundError:
            return []
        parsed = (self._parse_event_line(line) for line in tail)
        return [event for event in parsed if event is not None]

    def _runner_status(self):
        """Build the per-runner status list. Caller holds self.lock."""
        runners = []
        for i in self.runner_numbers:
            name = f"runner-{i}"
            t = self.active_tasks.get(name)
            if t is not None:
                runners.append({
                    "name": name,
                    "task": t["task_id"],
                    "repo": t["repo"].split("/")[-1],
                    "status": "running",
                })
            else:
                runners.append({
                    "name": name,
                    "task": None,
                    "repo": None,
                    "status": "idle",
                })
        return runners

    def _write_json(self, filename, payload):
        """Atomically write `payload` as JSON to health_dir/filename.

        Writes to a sibling temp file and renames it over the target so a
        reader never observes a half-written file. Returns True on
        success; failures are reported and return False (best effort).
        """
        target = self.health_dir / filename
        tmp = target.with_name(target.name + ".tmp")
        try:
            with open(tmp, "w") as f:
                json.dump(payload, f)
            tmp.replace(target)
        except (OSError, TypeError, ValueError) as e:
            print(f"Error writing {target}: {e}")
            return False
        return True

    def _publish_snapshot(self):
        """Write runners.json, events.json and metrics.json once."""
        load = self.get_load()
        mem_mb = self.get_memory_mb()

        # One lock acquisition so all three values describe the same moment.
        with self.lock:
            runners = self._runner_status()
            active = len(self.active_tasks)
            completed = self.completed_count

        # Update runners.json for the dashboard
        self._write_json("runners.json", runners)

        # Update events.json for dashboard (last 100 events)
        try:
            events = self._recent_events()
        except OSError as e:
            print(f"Error reading events log: {e}")
        else:
            self._write_json("events.json", events)

        # Update metrics.json for dashboard
        self._write_json("metrics.json", {
            "load": load,
            "mem_mb": mem_mb,
            "containers": 0,
            "active_tasks": active,
            "completed": completed,
        })

    def collect_metrics(self):
        """Publish dashboard JSON files every 10 seconds, forever."""
        while True:
            self._publish_snapshot()
            time.sleep(10)

    def start(self):
        """Start all watcher threads and block until KeyboardInterrupt."""
        self.log_event("MONITOR_START", runners=len(self.runner_numbers))

        jobs = [(self.watch_runner, (i,)) for i in self.runner_numbers]
        jobs += [
            (self.watch_workspaces, ()),
            (self.collect_metrics, ()),
            (self.check_idle_runners, ()),
        ]
        for target, args in jobs:
            threading.Thread(target=target, args=args, daemon=True).start()

        self.log_event("WATCHERS_STARTED", count=len(jobs))

        try:
            # Workers are daemon threads; the main thread only idles.
            while True:
                time.sleep(60)
        except KeyboardInterrupt:
            self.log_event("MONITOR_STOP", completed=self.completed_count)


if __name__ == "__main__":
    watcher = CIWatcher()
    watcher.start()