#!/usr/bin/env python3
"""
ci-watcher.py

CI Run Watcher - Tracks Forgejo CI runs
Detects completions via new task starts AND idle runner detection
"""

import subprocess
import json
import time
import re
import threading
from datetime import datetime
from collections import defaultdict, deque
from pathlib import Path

LOG_DIR = Path("/tmp/ci-monitor-data")
LOG_DIR.mkdir(exist_ok=True)

# Directory the dashboard JSON files are written to.
DASHBOARD_DIR = Path("/var/www/health")
# Directory whose sub-directories are reported as workspace create/remove events.
WORKSPACE_DIR = Path("/opt/ci/workspaces")

DEFAULT_RUNNER_COUNT = 5
IDLE_CHECK_INTERVAL_S = 15
# A task younger than this is never ended by the idle check, so a runner that
# has not spawned a child process yet is not mistaken for a finished one.
IDLE_GRACE_S = 5
WORKSPACE_POLL_INTERVAL_S = 2
METRICS_INTERVAL_S = 10
DASHBOARD_EVENT_LIMIT = 100

# Runner log line announcing a task: "task <id> repo is <repo>".
TASK_START_RE = re.compile(r'task (\d+) repo is ([^\s]+)')
# One line of events.log as written by CIWatcher.log_event.  The fractional
# seconds are optional because datetime.isoformat() omits them when the
# microsecond field is exactly 0.
EVENT_LINE_RE = re.compile(
    r'\d{4}-\d{2}-\d{2}T(\d{2}:\d{2}:\d{2})(?:\.\d+)? \[(\w+)\] (.+)'
)


class CIWatcher:
    """Follow forgejo-runner journals and publish task state for a dashboard.

    One thread per runner tails ``journalctl`` for task-start lines; a task is
    considered finished when the same runner starts another task or when the
    idle check finds the runner process without child processes.  Events are
    appended to ``LOG_DIR / "events.log"`` and periodic JSON snapshots are
    written to ``DASHBOARD_DIR``.
    """

    def __init__(self, runner_count=DEFAULT_RUNNER_COUNT):
        """Open the events log and initialise empty task state.

        Args:
            runner_count: number of runners to watch; runner N maps to the
                systemd unit ``forgejo-runner-N`` and the name ``runner-N``.
        """
        self.runner_count = runner_count
        self.active_tasks = {}  # runner -> {task_id, start_time, repo}
        self.completed_count = 0
        self.events_file = open(LOG_DIR / "events.log", "a", buffering=1,
                                encoding="utf-8")
        # Guards active_tasks and completed_count.
        self.lock = threading.Lock()
        # Separate lock for the events file: log_event is called both with and
        # without self.lock held, so it cannot reuse that (non-reentrant) lock.
        self._log_lock = threading.Lock()
        # Dashboard files whose last write failed (used to log a failure once).
        self._publish_failures = set()

    def close(self):
        """Close the events log; later log_event calls become no-ops."""
        with self._log_lock:
            self.events_file.close()

    def log_event(self, event_type, **kwargs):
        """Append ``<iso timestamp> [<event_type>] k=v ...`` to the events log."""
        timestamp = datetime.now().isoformat()
        fields = " ".join(f"{k}={v}" for k, v in kwargs.items())
        msg = f"{timestamp} [{event_type}] {fields}"
        with self._log_lock:
            if not self.events_file.closed:
                self.events_file.write(msg + "\n")

    def _finish_task(self, runner_name, **extra):
        """Drop *runner_name*'s task, count it and log TASK_END.

        Must be called with ``self.lock`` held and the runner present in
        ``active_tasks``.  *extra* is appended to the logged fields.
        """
        prev = self.active_tasks.pop(runner_name)
        duration = time.time() - prev["start_time"]
        self.completed_count += 1
        self.log_event("TASK_END",
                       runner=runner_name,
                       task=prev["task_id"],
                       repo=prev["repo"].split("/")[-1],
                       duration_s=f"{duration:.1f}",
                       completed_total=self.completed_count,
                       **extra)

    def parse_runner_log(self, runner_name, line):
        """Handle one journal line from *runner_name*.

        Lines that do not announce a task are ignored.  A task-start line
        ends the runner's previous task (if any) and records the new one.
        """
        task_match = TASK_START_RE.search(line)
        if not task_match:
            return
        task_id, repo = task_match.groups()

        # Sample the system before taking the lock so file I/O never delays
        # the other threads.
        load = self.get_load()
        mem = self.get_memory_mb()

        with self.lock:
            # If this runner had a previous task, mark it complete
            if runner_name in self.active_tasks:
                self._finish_task(runner_name)

            # Record new task
            self.active_tasks[runner_name] = {
                "task_id": task_id,
                "start_time": time.time(),
                "repo": repo,
            }
            self.log_event("TASK_START",
                           runner=runner_name,
                           task=task_id,
                           repo=repo,
                           load=f"{load:.2f}",
                           mem_mb=mem)

    def get_load(self):
        """Return the first field of /proc/loadavg as a float, or 0.0 on failure."""
        try:
            with open("/proc/loadavg") as f:
                return float(f.read().split()[0])
        except (OSError, ValueError, IndexError):
            return 0.0

    @staticmethod
    def _used_memory_mb(meminfo_text):
        """Return ``(MemTotal - MemAvailable) // 1024`` from /proc/meminfo text.

        Fields are looked up by name rather than by line position.

        Raises:
            KeyError: a required field is missing.
            ValueError, IndexError: a required field has no integer value.
        """
        wanted = ("MemTotal", "MemAvailable")
        values = {}
        for line in meminfo_text.splitlines():
            key, sep, rest = line.partition(":")
            if sep and key in wanted:
                values[key] = int(rest.split()[0])
        return (values["MemTotal"] - values["MemAvailable"]) // 1024

    def get_memory_mb(self):
        """Return MemTotal minus MemAvailable from /proc/meminfo, or 0 on failure."""
        try:
            with open("/proc/meminfo") as f:
                return self._used_memory_mb(f.read())
        except (OSError, KeyError, ValueError, IndexError):
            return 0

    def get_runner_pid(self, runner_num):
        """Get the main PID of a forgejo-runner service.

        Returns None when systemctl fails, times out, or reports a PID <= 0.
        """
        try:
            result = subprocess.run(
                ["systemctl", "show", f"forgejo-runner-{runner_num}",
                 "--property=MainPID", "--value"],
                capture_output=True, text=True, timeout=5
            )
            pid = int(result.stdout.strip())
        except (OSError, subprocess.SubprocessError, ValueError):
            return None
        return pid if pid > 0 else None

    def runner_has_children(self, pid):
        """Check if a runner process has any child processes (active job).

        Returns True/False from the ``ps`` output, False for a falsy *pid*,
        and None (still falsy) when ``ps`` could not be run, so callers can
        tell "no children" apart from "unknown".
        """
        if not pid:
            return False
        try:
            result = subprocess.run(
                ["ps", "--ppid", str(pid), "--no-headers"],
                capture_output=True, text=True, timeout=5
            )
        except (OSError, subprocess.SubprocessError):
            return None
        return bool(result.stdout.strip())

    def _check_idle_once(self):
        """End tasks whose runner process currently has no child processes."""
        with self.lock:
            snapshot = dict(self.active_tasks)

        for i in range(1, self.runner_count + 1):
            runner_name = f"runner-{i}"

            # Only check runners we think are active
            if runner_name not in snapshot:
                continue

            # The probes spawn subprocesses, so they run without the lock.
            pid = self.get_runner_pid(i)
            if not pid or self.runner_has_children(pid) is not False:
                continue

            with self.lock:
                current = self.active_tasks.get(runner_name)
                # Skip if the task was replaced or ended while we were probing.
                if current is None or current is not snapshot[runner_name]:
                    continue
                if time.time() - current["start_time"] > IDLE_GRACE_S:
                    self._finish_task(runner_name, detected_by="idle_check")

    def check_idle_runners(self):
        """Periodically check for runners that finished without starting new tasks."""
        while True:
            time.sleep(IDLE_CHECK_INTERVAL_S)
            self._check_idle_once()

    def watch_runner(self, runner_num):
        """Tail the journal of forgejo-runner-<runner_num> and parse each line.

        Returns when journalctl exits or cannot be started; either case is
        recorded in the events log.
        """
        # Use --since "now" to only get NEW log entries, not historical
        cmd = ["journalctl", "-u", f"forgejo-runner-{runner_num}", "-f",
               "--no-pager", "-o", "cat", "--since", "now"]
        runner_name = f"runner-{runner_num}"

        try:
            with subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                  stderr=subprocess.DEVNULL, text=True) as proc:
                for line in proc.stdout:
                    self.parse_runner_log(runner_name, line.strip())
        except OSError as exc:
            self.log_event("WATCHER_ERROR", runner=runner_name, error=exc)
            return
        self.log_event("WATCHER_EXIT", runner=runner_name,
                       returncode=proc.returncode)

    def watch_workspaces(self):
        """Poll WORKSPACE_DIR and log created/removed sub-directories."""
        seen_dirs = set()

        while True:
            try:
                current_dirs = {d.name for d in WORKSPACE_DIR.iterdir()
                                if d.is_dir()}
            except OSError:
                # Directory missing or unreadable: keep the previous snapshot
                # and try again on the next poll.
                current_dirs = None

            if current_dirs is not None:
                for d in sorted(current_dirs - seen_dirs):
                    self.log_event("WORKSPACE_CREATE", dir=d)
                for d in sorted(seen_dirs - current_dirs):
                    self.log_event("WORKSPACE_REMOVE", dir=d)
                seen_dirs = current_dirs

            time.sleep(WORKSPACE_POLL_INTERVAL_S)

    def _snapshot(self):
        """Return ``(runners, active, completed)`` read under a single lock.

        ``runners`` has one dict per configured runner with the keys
        ``name``, ``task``, ``repo`` and ``status``.
        """
        runners = []
        with self.lock:
            for i in range(1, self.runner_count + 1):
                name = f"runner-{i}"
                task = self.active_tasks.get(name)
                if task is not None:
                    runners.append({
                        "name": name,
                        "task": task["task_id"],
                        "repo": task["repo"].split("/")[-1],
                        "status": "running",
                    })
                else:
                    runners.append({
                        "name": name,
                        "task": None,
                        "repo": None,
                        "status": "idle",
                    })
            return runners, len(self.active_tasks), self.completed_count

    @staticmethod
    def _parse_event_line(line):
        """Turn one events.log line into ``{time, type, message}``, or None."""
        match = EVENT_LINE_RE.match(line.strip())
        if not match:
            return None
        return {
            "time": match.group(1),
            "type": match.group(2),
            "message": match.group(3),
        }

    def _recent_events(self, limit=DASHBOARD_EVENT_LIMIT):
        """Return the parsed events among the last *limit* lines of events.log."""
        try:
            with open(LOG_DIR / "events.log", encoding="utf-8",
                      errors="replace") as f:
                # deque keeps only the tail instead of the whole file.
                tail = deque(f, maxlen=limit)
        except OSError:
            return []
        parsed = (self._parse_event_line(line) for line in tail)
        return [event for event in parsed if event is not None]

    def _publish(self, filename, payload):
        """Write *payload* as JSON to DASHBOARD_DIR/<filename>.

        The data goes to a temporary file that is then renamed over the
        target, so a reader never sees a half-written file.  A failure is
        logged once per file until a later write succeeds.
        """
        target = DASHBOARD_DIR / filename
        tmp = target.with_name(target.name + ".tmp")
        try:
            with open(tmp, "w", encoding="utf-8") as f:
                json.dump(payload, f)
            tmp.replace(target)
        except OSError as exc:
            if filename not in self._publish_failures:
                self._publish_failures.add(filename)
                self.log_event("DASHBOARD_WRITE_FAILED",
                               file=filename, error=exc)
            return
        self._publish_failures.discard(filename)

    def collect_metrics(self):
        """Periodically write runners.json, events.json and metrics.json."""
        while True:
            load = self.get_load()
            mem_mb = self.get_memory_mb()
            runners, active, completed = self._snapshot()

            # Update runners.json for the dashboard
            self._publish("runners.json", runners)

            # Update events.json for dashboard (last 100 events)
            self._publish("events.json", self._recent_events())

            # Update metrics.json for dashboard
            self._publish("metrics.json", {
                "load": load,
                "mem_mb": mem_mb,
                "containers": 0,
                "active_tasks": active,
                "completed": completed,
            })

            time.sleep(METRICS_INTERVAL_S)

    def start(self):
        """Start all watcher threads and block until interrupted (Ctrl-C)."""
        self.log_event("MONITOR_START", runners=self.runner_count)

        targets = [(self.watch_runner, (i,))
                   for i in range(1, self.runner_count + 1)]
        targets += [
            (self.watch_workspaces, ()),
            (self.collect_metrics, ()),
            (self.check_idle_runners, ()),
        ]
        for target, args in targets:
            threading.Thread(target=target, args=args, daemon=True).start()

        self.log_event("WATCHERS_STARTED", count=len(targets))

        try:
            while True:
                time.sleep(60)
        except KeyboardInterrupt:
            self.log_event("MONITOR_STOP", completed=self.completed_count)
        finally:
            self.close()


if __name__ == "__main__":
    watcher = CIWatcher()
    watcher.start()