/ backend / ci-watcher.py
ci-watcher.py
  1  #!/usr/bin/env python3
  2  """
  3  CI Run Watcher - Tracks Forgejo CI runs
  4  Detects completions via new task starts AND idle runner detection
  5  """
  6  
  7  import subprocess
  8  import json
  9  import time
 10  import re
 11  import threading
 12  from datetime import datetime
 13  from collections import defaultdict
 14  from pathlib import Path
 15  
 16  LOG_DIR = Path("/tmp/ci-monitor-data")
 17  LOG_DIR.mkdir(exist_ok=True)
 18  
 19  class CIWatcher:
 20      def __init__(self):
 21          self.active_tasks = {}  # runner -> {task_id, start_time, repo}
 22          self.completed_count = 0
 23          self.events_file = open(LOG_DIR / "events.log", "a", buffering=1)
 24          self.lock = threading.Lock()
 25          
 26      def log_event(self, event_type, **kwargs):
 27          timestamp = datetime.now().isoformat()
 28          msg = f"{timestamp} [{event_type}] " + " ".join(f"{k}={v}" for k, v in kwargs.items())
 29          self.events_file.write(msg + "\n")
 30          
 31      def parse_runner_log(self, runner_name, line):
 32          with self.lock:
 33              # Task start pattern
 34              task_match = re.search(r'task (\d+) repo is ([^\s]+)', line)
 35              if task_match:
 36                  task_id = task_match.group(1)
 37                  repo = task_match.group(2)
 38                  
 39                  # If this runner had a previous task, mark it complete
 40                  if runner_name in self.active_tasks:
 41                      prev = self.active_tasks[runner_name]
 42                      duration = time.time() - prev["start_time"]
 43                      self.completed_count += 1
 44                      self.log_event("TASK_END", 
 45                                   runner=runner_name, 
 46                                   task=prev["task_id"],
 47                                   repo=prev["repo"].split("/")[-1],
 48                                   duration_s=f"{duration:.1f}",
 49                                   completed_total=self.completed_count)
 50                  
 51                  # Record new task
 52                  self.active_tasks[runner_name] = {
 53                      "task_id": task_id,
 54                      "start_time": time.time(),
 55                      "repo": repo
 56                  }
 57                  
 58                  load = self.get_load()
 59                  mem = self.get_memory_mb()
 60                  self.log_event("TASK_START", 
 61                               runner=runner_name, 
 62                               task=task_id, 
 63                               repo=repo,
 64                               load=f"{load:.2f}",
 65                               mem_mb=mem)
 66                               
 67      def get_load(self):
 68          try:
 69              with open("/proc/loadavg") as f:
 70                  return float(f.read().split()[0])
 71          except:
 72              return 0.0
 73              
 74      def get_memory_mb(self):
 75          try:
 76              with open("/proc/meminfo") as f:
 77                  lines = f.readlines()
 78                  total = int(lines[0].split()[1])
 79                  avail = int(lines[2].split()[1])
 80                  return (total - avail) // 1024
 81          except:
 82              return 0
 83  
 84      def get_runner_pid(self, runner_num):
 85          """Get the main PID of a forgejo-runner service."""
 86          try:
 87              result = subprocess.run(
 88                  ["systemctl", "show", f"forgejo-runner-{runner_num}", 
 89                   "--property=MainPID", "--value"],
 90                  capture_output=True, text=True, timeout=5
 91              )
 92              pid = int(result.stdout.strip())
 93              return pid if pid > 0 else None
 94          except:
 95              return None
 96  
 97      def runner_has_children(self, pid):
 98          """Check if a runner process has any child processes (active job)."""
 99          if not pid:
100              return False
101          try:
102              result = subprocess.run(
103                  ["ps", "--ppid", str(pid), "--no-headers"],
104                  capture_output=True, text=True, timeout=5
105              )
106              return len(result.stdout.strip()) > 0
107          except:
108              return False
109  
110      def check_idle_runners(self):
111          """Periodically check for runners that finished without starting new tasks."""
112          while True:
113              time.sleep(15)
114              
115              with self.lock:
116                  to_remove = []
117                  for i in range(1, 6):
118                      runner_name = f"runner-{i}"
119                      
120                      # Only check runners we think are active
121                      if runner_name not in self.active_tasks:
122                          continue
123                      
124                      pid = self.get_runner_pid(i)
125                      if pid and not self.runner_has_children(pid):
126                          prev = self.active_tasks[runner_name]
127                          duration = time.time() - prev["start_time"]
128                          
129                          if duration > 5:
130                              self.completed_count += 1
131                              self.log_event("TASK_END", 
132                                           runner=runner_name, 
133                                           task=prev["task_id"],
134                                           repo=prev["repo"].split("/")[-1],
135                                           duration_s=f"{duration:.1f}",
136                                           completed_total=self.completed_count,
137                                           detected_by="idle_check")
138                              to_remove.append(runner_name)
139                  
140                  for runner_name in to_remove:
141                      del self.active_tasks[runner_name]
142              
143      def watch_runner(self, runner_num):
144          # Use --since "now" to only get NEW log entries, not historical
145          cmd = ["journalctl", "-u", f"forgejo-runner-{runner_num}", "-f", 
146                 "--no-pager", "-o", "cat", "--since", "now"]
147          proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, 
148                                stderr=subprocess.DEVNULL, text=True)
149          runner_name = f"runner-{runner_num}"
150          
151          for line in proc.stdout:
152              self.parse_runner_log(runner_name, line.strip())
153              
154      def watch_workspaces(self):
155          workspace_dir = Path("/opt/ci/workspaces")
156          seen_dirs = set()
157          
158          while True:
159              try:
160                  current_dirs = set(d.name for d in workspace_dir.iterdir() if d.is_dir())
161                  new_dirs = current_dirs - seen_dirs
162                  removed_dirs = seen_dirs - current_dirs
163                  
164                  for d in new_dirs:
165                      self.log_event("WORKSPACE_CREATE", dir=d)
166                  for d in removed_dirs:
167                      self.log_event("WORKSPACE_REMOVE", dir=d)
168                      
169                  seen_dirs = current_dirs
170              except:
171                  pass
172              time.sleep(2)
173              
174      def collect_metrics(self):
175          while True:
176              load = self.get_load()
177              mem_mb = self.get_memory_mb()
178              
179              with self.lock:
180                  active = len(self.active_tasks)
181                  completed = self.completed_count
182              
183              # Update runners.json for the dashboard
184              runners = []
185              with self.lock:
186                  for i in range(1, 6):
187                      name = f"runner-{i}"
188                      if name in self.active_tasks:
189                          t = self.active_tasks[name]
190                          runners.append({
191                              "name": name,
192                              "task": t["task_id"],
193                              "repo": t["repo"].split("/")[-1],
194                              "status": "running"
195                          })
196                      else:
197                          runners.append({
198                              "name": name,
199                              "task": None,
200                              "repo": None,
201                              "status": "idle"
202                          })
203              
204              with open("/var/www/health/runners.json", "w") as f:
205                  json.dump(runners, f)
206  
207              # Update events.json for dashboard (last 100 events)
208              try:
209                  events = []
210                  events_log = LOG_DIR / "events.log"
211                  if events_log.exists():
212                      with open(events_log) as f:
213                          lines = f.readlines()[-100:]
214                      for line in lines:
215                          match = re.match(r'[\d\-T:.]+T(\d{2}:\d{2}:\d{2})\.\d+ \[(\w+)\] (.+)', line.strip())
216                          if match:
217                              events.append({
218                                  "time": match.group(1),
219                                  "type": match.group(2),
220                                  "message": match.group(3)
221                              })
222                  with open("/var/www/health/events.json", "w") as f:
223                      json.dump(events, f)
224              except:
225                  pass
226  
227              # Update metrics.json for dashboard
228              try:
229                  with open("/var/www/health/metrics.json", "w") as f:
230                      json.dump({
231                          "load": load,
232                          "mem_mb": mem_mb,
233                          "containers": 0,
234                          "active_tasks": active,
235                          "completed": completed
236                      }, f)
237              except:
238                  pass
239                  
240              time.sleep(10)
241              
242      def start(self):
243          self.log_event("MONITOR_START", runners=5)
244          
245          threads = []
246          for i in range(1, 6):
247              t = threading.Thread(target=self.watch_runner, args=(i,), daemon=True)
248              t.start()
249              threads.append(t)
250              
251          t = threading.Thread(target=self.watch_workspaces, daemon=True)
252          t.start()
253          
254          t = threading.Thread(target=self.collect_metrics, daemon=True)
255          t.start()
256  
257          t = threading.Thread(target=self.check_idle_runners, daemon=True)
258          t.start()
259          
260          self.log_event("WATCHERS_STARTED", count=len(threads)+3)
261          
262          try:
263              while True:
264                  time.sleep(60)
265          except KeyboardInterrupt:
266              self.log_event("MONITOR_STOP", completed=self.completed_count)
267  
if __name__ == "__main__":
    # Run the monitor in the foreground until interrupted.
    CIWatcher().start()