/ backend / ci-watcher.py
ci-watcher.py
  1  #!/usr/bin/env python3
  2  """
  3  CI Run Watcher - Tracks Forgejo CI runs
  4  Detects completions via new task starts AND idle runner detection
  5  """
  6  
  7  import subprocess
  8  import json
  9  import time
 10  import re
 11  import threading
 12  from datetime import datetime
 13  from collections import defaultdict
 14  from pathlib import Path
 15  
 16  LOG_DIR = Path("/tmp/ci-monitor-data")
 17  LOG_DIR.mkdir(exist_ok=True)
 18  
 19  
 20  def discover_runners():
 21      """Dynamically discover all forgejo-runner-* systemd units.
 22  
 23      Returns a list of runner numbers (e.g., [1, 2, 3, ..., 14])
 24      """
 25      try:
 26          result = subprocess.run(
 27              ["systemctl", "list-units", "forgejo-runner-*", "--no-legend"],
 28              capture_output=True,
 29              text=True,
 30              check=True
 31          )
 32  
 33          runners = []
 34          for line in result.stdout.strip().split("\n"):
 35              if "forgejo-runner-" in line:
 36                  # Extract number from "forgejo-runner-N.service"
 37                  match = re.search(r'forgejo-runner-(\d+)\.service', line)
 38                  if match:
 39                      runners.append(int(match.group(1)))
 40  
 41          runners.sort()
 42          print(f"Discovered {len(runners)} runners: {runners}")
 43          return runners
 44      except Exception as e:
 45          print(f"Error discovering runners: {e}")
 46          # Fallback to 1-10 if discovery fails
 47          return list(range(1, 11))
 48  
 49  
 50  class CIWatcher:
 51      def __init__(self, runner_numbers=None):
 52          self.runner_numbers = runner_numbers or discover_runners()
 53          self.active_tasks = {}  # runner -> {task_id, start_time, repo}
 54          self.completed_count = 0
 55          self.events_file = open(LOG_DIR / "events.log", "a", buffering=1)
 56          self.lock = threading.Lock()
 57          
 58      def log_event(self, event_type, **kwargs):
 59          timestamp = datetime.now().isoformat()
 60          msg = f"{timestamp} [{event_type}] " + " ".join(f"{k}={v}" for k, v in kwargs.items())
 61          self.events_file.write(msg + "\n")
 62          
 63      def parse_runner_log(self, runner_name, line):
 64          with self.lock:
 65              # Task start pattern
 66              task_match = re.search(r'task (\d+) repo is ([^\s]+)', line)
 67              if task_match:
 68                  task_id = task_match.group(1)
 69                  repo = task_match.group(2)
 70                  
 71                  # If this runner had a previous task, mark it complete
 72                  if runner_name in self.active_tasks:
 73                      prev = self.active_tasks[runner_name]
 74                      duration = time.time() - prev["start_time"]
 75                      self.completed_count += 1
 76                      self.log_event("TASK_END", 
 77                                   runner=runner_name, 
 78                                   task=prev["task_id"],
 79                                   repo=prev["repo"].split("/")[-1],
 80                                   duration_s=f"{duration:.1f}",
 81                                   completed_total=self.completed_count)
 82                  
 83                  # Record new task
 84                  self.active_tasks[runner_name] = {
 85                      "task_id": task_id,
 86                      "start_time": time.time(),
 87                      "repo": repo
 88                  }
 89                  
 90                  load = self.get_load()
 91                  mem = self.get_memory_mb()
 92                  self.log_event("TASK_START", 
 93                               runner=runner_name, 
 94                               task=task_id, 
 95                               repo=repo,
 96                               load=f"{load:.2f}",
 97                               mem_mb=mem)
 98                               
 99      def get_load(self):
100          try:
101              with open("/proc/loadavg") as f:
102                  return float(f.read().split()[0])
103          except:
104              return 0.0
105              
106      def get_memory_mb(self):
107          try:
108              with open("/proc/meminfo") as f:
109                  lines = f.readlines()
110                  total = int(lines[0].split()[1])
111                  avail = int(lines[2].split()[1])
112                  return (total - avail) // 1024
113          except:
114              return 0
115  
116      def get_runner_pid(self, runner_num):
117          """Get the main PID of a forgejo-runner service."""
118          try:
119              result = subprocess.run(
120                  ["systemctl", "show", f"forgejo-runner-{runner_num}", 
121                   "--property=MainPID", "--value"],
122                  capture_output=True, text=True, timeout=5
123              )
124              pid = int(result.stdout.strip())
125              return pid if pid > 0 else None
126          except:
127              return None
128  
129      def runner_has_children(self, pid):
130          """Check if a runner process has any child processes (active job)."""
131          if not pid:
132              return False
133          try:
134              result = subprocess.run(
135                  ["ps", "--ppid", str(pid), "--no-headers"],
136                  capture_output=True, text=True, timeout=5
137              )
138              return len(result.stdout.strip()) > 0
139          except:
140              return False
141  
142      def check_idle_runners(self):
143          """Periodically check for runners that finished without starting new tasks."""
144          while True:
145              time.sleep(15)
146  
147              with self.lock:
148                  to_remove = []
149                  for i in self.runner_numbers:
150                      runner_name = f"runner-{i}"
151                      
152                      # Only check runners we think are active
153                      if runner_name not in self.active_tasks:
154                          continue
155                      
156                      pid = self.get_runner_pid(i)
157                      if pid and not self.runner_has_children(pid):
158                          prev = self.active_tasks[runner_name]
159                          duration = time.time() - prev["start_time"]
160                          
161                          if duration > 5:
162                              self.completed_count += 1
163                              self.log_event("TASK_END", 
164                                           runner=runner_name, 
165                                           task=prev["task_id"],
166                                           repo=prev["repo"].split("/")[-1],
167                                           duration_s=f"{duration:.1f}",
168                                           completed_total=self.completed_count,
169                                           detected_by="idle_check")
170                              to_remove.append(runner_name)
171                  
172                  for runner_name in to_remove:
173                      del self.active_tasks[runner_name]
174              
175      def watch_runner(self, runner_num):
176          # Use --since "now" to only get NEW log entries, not historical
177          cmd = ["journalctl", "-u", f"forgejo-runner-{runner_num}", "-f", 
178                 "--no-pager", "-o", "cat", "--since", "now"]
179          proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, 
180                                stderr=subprocess.DEVNULL, text=True)
181          runner_name = f"runner-{runner_num}"
182          
183          for line in proc.stdout:
184              self.parse_runner_log(runner_name, line.strip())
185              
186      def watch_workspaces(self):
187          workspace_dir = Path("/opt/ci/workspaces")
188          seen_dirs = set()
189          
190          while True:
191              try:
192                  current_dirs = set(d.name for d in workspace_dir.iterdir() if d.is_dir())
193                  new_dirs = current_dirs - seen_dirs
194                  removed_dirs = seen_dirs - current_dirs
195                  
196                  for d in new_dirs:
197                      self.log_event("WORKSPACE_CREATE", dir=d)
198                  for d in removed_dirs:
199                      self.log_event("WORKSPACE_REMOVE", dir=d)
200                      
201                  seen_dirs = current_dirs
202              except:
203                  pass
204              time.sleep(2)
205              
206      def collect_metrics(self):
207          while True:
208              load = self.get_load()
209              mem_mb = self.get_memory_mb()
210              
211              with self.lock:
212                  active = len(self.active_tasks)
213                  completed = self.completed_count
214              
215              # Update runners.json for the dashboard
216              runners = []
217              with self.lock:
218                  for i in self.runner_numbers:
219                      name = f"runner-{i}"
220                      if name in self.active_tasks:
221                          t = self.active_tasks[name]
222                          runners.append({
223                              "name": name,
224                              "task": t["task_id"],
225                              "repo": t["repo"].split("/")[-1],
226                              "status": "running"
227                          })
228                      else:
229                          runners.append({
230                              "name": name,
231                              "task": None,
232                              "repo": None,
233                              "status": "idle"
234                          })
235              
236              with open("/var/www/health/runners.json", "w") as f:
237                  json.dump(runners, f)
238  
239              # Update events.json for dashboard (last 100 events)
240              try:
241                  events = []
242                  events_log = LOG_DIR / "events.log"
243                  if events_log.exists():
244                      with open(events_log) as f:
245                          lines = f.readlines()[-100:]
246                      for line in lines:
247                          match = re.match(r'[\d\-T:.]+T(\d{2}:\d{2}:\d{2})\.\d+ \[(\w+)\] (.+)', line.strip())
248                          if match:
249                              events.append({
250                                  "time": match.group(1),
251                                  "type": match.group(2),
252                                  "message": match.group(3)
253                              })
254                  with open("/var/www/health/events.json", "w") as f:
255                      json.dump(events, f)
256              except:
257                  pass
258  
259              # Update metrics.json for dashboard
260              try:
261                  with open("/var/www/health/metrics.json", "w") as f:
262                      json.dump({
263                          "load": load,
264                          "mem_mb": mem_mb,
265                          "containers": 0,
266                          "active_tasks": active,
267                          "completed": completed
268                      }, f)
269              except:
270                  pass
271                  
272              time.sleep(10)
273              
274      def start(self):
275          self.log_event("MONITOR_START", runners=len(self.runner_numbers))
276  
277          threads = []
278          for i in self.runner_numbers:
279              t = threading.Thread(target=self.watch_runner, args=(i,), daemon=True)
280              t.start()
281              threads.append(t)
282              
283          t = threading.Thread(target=self.watch_workspaces, daemon=True)
284          t.start()
285          
286          t = threading.Thread(target=self.collect_metrics, daemon=True)
287          t.start()
288  
289          t = threading.Thread(target=self.check_idle_runners, daemon=True)
290          t.start()
291          
292          self.log_event("WATCHERS_STARTED", count=len(threads)+3)
293          
294          try:
295              while True:
296                  time.sleep(60)
297          except KeyboardInterrupt:
298              self.log_event("MONITOR_STOP", completed=self.completed_count)
299  
if __name__ == "__main__":
    # Script entry point: build a watcher (auto-discovering runners) and run it.
    CIWatcher().start()