# scripts/profile-tui.py
  1  #!/usr/bin/env python3
  2  """Drive the Hermes TUI under HERMES_DEV_PERF and summarize the pipeline.
  3  
  4  Usage:
  5    scripts/profile-tui.py [--session SID] [--hold KEY] [--seconds N] [--rate HZ]
  6  
  7  Defaults: picks the session with the most messages, holds PageUp for 8s at
  8  ~30 Hz (matching xterm key-repeat), summarizes ~/.hermes/perf.log on exit.
  9  
 10  The --tui build must exist (run `npm run build` in ui-tui first). This script
 11  launches `node dist/entry.js` directly with HERMES_TUI_RESUME set so it
 12  bypasses the hermes_cli wrapper — we want repeatable timing, not the CLI's
 13  session-picker flow.
 14  
 15  Environment overrides:
 16    HERMES_PERF_LOG     (default ~/.hermes/perf.log)
 17    HERMES_PERF_NODE    (default node from $PATH)
 18    HERMES_TUI_DIR      (default /home/bb/hermes-agent/ui-tui)
 19  
 20  Exit code is 0 if the harness ran and parsed results, 2 if the TUI crashed
 21  or produced no perf data (suggests HERMES_DEV_PERF wiring is broken).
 22  """
 23  
 24  from __future__ import annotations
 25  
 26  import argparse
 27  import json
 28  import os
 29  import pty
 30  import select
 31  import signal
 32  import sqlite3
 33  import sys
 34  import time
 35  from pathlib import Path
 36  from typing import Any
 37  
 38  _PROJECT_ROOT = Path(__file__).resolve().parent.parent
 39  sys.path.insert(0, str(_PROJECT_ROOT))
 40  try:
 41      from hermes_constants import get_hermes_home
 42  except ImportError:
 43      def get_hermes_home() -> Path:  # type: ignore[misc]
 44          val = (os.environ.get("HERMES_HOME") or "").strip()
 45          return Path(val) if val else Path.home() / ".hermes"
 46  
 47  DEFAULT_TUI_DIR = Path(os.environ.get("HERMES_TUI_DIR", "/home/bb/hermes-agent/ui-tui"))
 48  DEFAULT_LOG = Path(os.environ.get("HERMES_PERF_LOG", str(get_hermes_home() / "perf.log")))
 49  DEFAULT_STATE_DB = get_hermes_home() / "state.db"
 50  
# Keystroke escape sequences.  Matches what xterm/VT220 send when the
# terminal has bracketed-paste disabled and the key-repeat handler fires.
# Values are raw bytes written straight to the PTY master in hold_key().
KEYS = {
    "page_up": b"\x1b[5~",         # CSI 5 ~
    "page_down": b"\x1b[6~",       # CSI 6 ~
    "wheel_up": b"\x1b[M`!!",      # mouse wheel up (SGR-less) — best-effort
    "shift_up": b"\x1b[1;2A",      # CSI 1;2 A — presumably modifier 2 = Shift; confirm against TUI key parser
    "shift_down": b"\x1b[1;2B",    # CSI 1;2 B
}
 60  
 61  
 62  def pick_longest_session(db: Path) -> str:
 63      conn = sqlite3.connect(db)
 64      row = conn.execute(
 65          "SELECT id FROM sessions s ORDER BY "
 66          "(SELECT COUNT(*) FROM messages m WHERE m.session_id = s.id) DESC LIMIT 1"
 67      ).fetchone()
 68      if not row:
 69          sys.exit(f"no sessions in {db}")
 70      return row[0]
 71  
 72  
 73  def drain(fd: int, timeout: float) -> bytes:
 74      """Read whatever's available from fd within `timeout`, then return."""
 75      chunks = []
 76      end = time.monotonic() + timeout
 77      while time.monotonic() < end:
 78          r, _, _ = select.select([fd], [], [], max(0.0, end - time.monotonic()))
 79          if not r:
 80              break
 81          try:
 82              data = os.read(fd, 4096)
 83          except OSError:
 84              break
 85          if not data:
 86              break
 87          chunks.append(data)
 88      return b"".join(chunks)
 89  
 90  
 91  def hold_key(fd: int, seq: bytes, seconds: float, rate_hz: int) -> int:
 92      """Write `seq` to fd at ~rate_hz for `seconds`. Returns keystrokes sent."""
 93      interval = 1.0 / max(1, rate_hz)
 94      end = time.monotonic() + seconds
 95      sent = 0
 96      while time.monotonic() < end:
 97          try:
 98              os.write(fd, seq)
 99              sent += 1
100          except OSError:
101              break
102          # Drain stdout to keep the PTY buffer flowing; ignore content.
103          drain(fd, 0)
104          time.sleep(interval)
105      return sent
106  
107  
def summarize(log: Path, since_ts_ms: int) -> dict[str, Any]:
    """Parse perf.log into react/frame event buckets.

    Events older than `since_ts_ms`, blank lines, and unparseable lines are
    all skipped.  A missing log file yields empty buckets plus an "error" key.
    """
    if not log.exists():
        return {"error": f"no log at {log}", "react": [], "frame": []}
    buckets: dict[str, list[dict[str, Any]]] = {"react": [], "frame": []}
    for raw in log.read_text().splitlines():
        raw = raw.strip()
        if not raw:
            continue
        try:
            event = json.loads(raw)
        except json.JSONDecodeError:
            continue
        if int(event.get("ts", 0)) < since_ts_ms:
            continue
        src = event.get("src")
        if src == "react":
            buckets["react"].append(event)
        elif src == "frame":
            buckets["frame"].append(event)
    return {"react": buckets["react"], "frame": buckets["frame"]}
134  
135  
def pct(values: list[float], p: float) -> float:
    """Return the p-th percentile of `values` (nearest-rank; 0.0 if empty)."""
    if not values:
        return 0.0
    ordered = sorted(values)
    rank = min(len(ordered) - 1, int(len(ordered) * p))
    return ordered[rank]
142  
143  
_PHASE_SKIP = object()  # sentinel: "omit frames missing this phase key"


def _phase_vals(frames: list[dict[str, Any]], key: str, default: Any = _PHASE_SKIP) -> list[Any]:
    """Collect phases[key] across frames that carry a phases dict.

    By default a frame whose phases dict lacks `key` is skipped (the log
    format demonstrably omits some counters — other call sites use
    `.get(key, 0)`); pass `default` to substitute a value instead.
    """
    out: list[Any] = []
    for f in frames:
        phases = f.get("phases")
        if not phases:
            continue
        if key in phases:
            out.append(phases[key])
        elif default is not _PHASE_SKIP:
            out.append(default)
    return out


def format_report(data: dict[str, Any]) -> str:
    """Render the parsed perf data (from summarize()) as a plain-text report.

    Sections: React profiler percentiles per pane, then the Ink pipeline
    (frame durations, throughput, per-phase timings, Yoga counters, patch and
    write telemetry, backpressure, flickers).  Missing data produces a hint
    line instead of crashing.
    """
    react = data.get("react") or []
    frames = data.get("frame") or []
    out = []

    out.append("═══ React Profiler ═══")
    if not react:
        out.append("  (no react events — HERMES_DEV_PERF wired? threshold too high?)")
    else:
        by_id: dict[str, list[float]] = {}
        for r in react:
            by_id.setdefault(r["id"], []).append(r["actualMs"])
        out.append(f"  {'pane':<14} {'count':>6} {'p50':>8} {'p95':>8} {'p99':>8} {'max':>8}")
        for pid, ms in sorted(by_id.items(), key=lambda kv: -pct(kv[1], 0.99)):
            out.append(
                f"  {pid:<14} {len(ms):>6} {pct(ms,0.50):>8.2f} {pct(ms,0.95):>8.2f} "
                f"{pct(ms,0.99):>8.2f} {max(ms):>8.2f}"
            )

    out.append("")
    out.append("═══ Ink pipeline ═══")
    if not frames:
        out.append("  (no frame events — onFrame wiring broken?)")
    else:
        dur = [f["durationMs"] for f in frames]
        phases_present = any(f.get("phases") for f in frames)
        out.append(f"  frames captured: {len(frames)}")
        out.append(
            f"  durationMs  p50={pct(dur,0.50):.2f}  p95={pct(dur,0.95):.2f}  "
            f"p99={pct(dur,0.99):.2f}  max={max(dur):.2f}"
        )
        # Effective FPS during the run: frames / elapsed seconds.
        ts = sorted(f["ts"] for f in frames)
        if len(ts) >= 2:
            elapsed_s = (ts[-1] - ts[0]) / 1000.0
            fps = len(frames) / elapsed_s if elapsed_s > 0 else float("inf")
            out.append(f"  throughput: {len(frames)} frames / {elapsed_s:.2f}s = {fps:.1f} fps")

        if phases_present:
            fields = ["yoga", "renderer", "diff", "optimize", "write", "commit"]
            out.append("")
            out.append(f"  {'phase':<10} {'p50':>8} {'p95':>8} {'p99':>8} {'max':>8}   (ms)")
            for field in fields:
                # FIX: original indexed f["phases"][field] unconditionally and
                # raised KeyError when a frame omitted a phase counter.
                vals = _phase_vals(frames, field)
                if vals:
                    out.append(
                        f"  {field:<10} {pct(vals,0.50):>8.2f} {pct(vals,0.95):>8.2f} "
                        f"{pct(vals,0.99):>8.2f} {max(vals):>8.2f}"
                    )
            # Derived: sum of phases vs durationMs (reveals hidden time).
            sum_ps = [
                sum(f["phases"].get(k, 0.0) for k in fields)
                for f in frames if f.get("phases")
            ]
            if sum_ps:
                dur_match = [f["durationMs"] for f in frames if f.get("phases")]
                deltas = [d - s for d, s in zip(dur_match, sum_ps)]
                out.append(
                    f"  {'dur-Σphases':<10} {pct(deltas,0.50):>8.2f} {pct(deltas,0.95):>8.2f} "
                    f"{pct(deltas,0.99):>8.2f} {max(deltas):>8.2f}   (unaccounted-for time)"
                )

            # Yoga counters
            visited = _phase_vals(frames, "yogaVisited")
            measured = _phase_vals(frames, "yogaMeasured")
            cache_hits = _phase_vals(frames, "yogaCacheHits")
            live = _phase_vals(frames, "yogaLive")
            out.append("")
            out.append("  Yoga counters (per frame):")
            for name, vals in (
                ("visited", visited),
                ("measured", measured),
                ("cacheHits", cache_hits),
                ("live", live),
            ):
                if vals:
                    out.append(f"    {name:<11} p50={pct(vals,0.5):.0f}  p99={pct(vals,0.99):.0f}  max={max(vals)}")

            # Patch counts — proxy for "how much changed each frame"
            patches = _phase_vals(frames, "patches")
            if patches:
                out.append(
                    f"  patches     p50={pct(patches,0.5):.0f}  p99={pct(patches,0.99):.0f}  "
                    f"max={max(patches)}  total={sum(patches)}"
                )
            optimized = _phase_vals(frames, "optimizedPatches", 0)
            if any(optimized):
                out.append(
                    f"  optimized   p50={pct(optimized,0.5):.0f}  p99={pct(optimized,0.99):.0f}  "
                    f"max={max(optimized)}  total={sum(optimized)}"
                    f"  (ratio: {sum(optimized)/max(1,sum(patches)):.2f})"
                )

            # Write bytes + drain telemetry — the outer-terminal bottleneck gauge.
            bytes_written = _phase_vals(frames, "writeBytes", 0)
            if any(bytes_written):
                total_b = sum(bytes_written)
                kb = total_b / 1024
                out.append(
                    f"  writeBytes  p50={pct(bytes_written,0.5):.0f}B  p99={pct(bytes_written,0.99):.0f}B  "
                    f"max={max(bytes_written)}B  total={kb:.1f}KB"
                )
            drains = _phase_vals(frames, "prevFrameDrainMs", 0)
            if any(d > 0 for d in drains):
                nonzero = [d for d in drains if d > 0]
                out.append(
                    f"  drainMs     p50={pct(nonzero,0.5):.2f}  p95={pct(nonzero,0.95):.2f}  "
                    f"p99={pct(nonzero,0.99):.2f}  max={max(nonzero):.2f}   (terminal flush latency)"
                )
            backpressure = sum(1 for f in frames if f.get("phases", {}).get("backpressure"))
            if backpressure:
                out.append(
                    f"  backpressure: {backpressure}/{len(frames)} frames "
                    f"({100*backpressure/len(frames):.0f}%)   (Node stdout buffer full — terminal slow)"
                )

        # Flickers
        flicker_frames = [f for f in frames if f.get("flickers")]
        if flicker_frames:
            out.append("")
            out.append(f"  ⚠ flickers detected in {len(flicker_frames)} frames")
            reasons: dict[str, int] = {}
            for f in flicker_frames:
                for fl in f["flickers"]:
                    reasons[fl["reason"]] = reasons.get(fl["reason"], 0) + 1
            for reason, n in sorted(reasons.items(), key=lambda kv: -kv[1]):
                out.append(f"    {reason}: {n}")

    return "\n".join(out)
282  
283  
def key_metrics(data: dict[str, Any]) -> dict[str, float]:
    """Flatten the report into a dict of scalar metrics for A/B diffing.

    Only metrics whose inputs are present end up in the dict, so before/after
    runs with different capture depth still diff cleanly in format_diff().
    """
    metrics: dict[str, float] = {}
    frames = data.get("frame") or []
    react = data.get("react") or []

    if frames:
        durs = [f["durationMs"] for f in frames]
        metrics["frames"] = len(frames)
        metrics["dur_p50"] = pct(durs, 0.50)
        metrics["dur_p95"] = pct(durs, 0.95)
        metrics["dur_p99"] = pct(durs, 0.99)
        metrics["dur_max"] = max(durs)

        ts = sorted(f["ts"] for f in frames)
        if len(ts) >= 2:
            elapsed = (ts[-1] - ts[0]) / 1000.0
            metrics["fps_throughput"] = len(frames) / elapsed if elapsed > 0 else 0.0
            # Interframe gaps distribution — complementary view to throughput:
            gaps = [ts[i] - ts[i - 1] for i in range(1, len(ts))]
            if gaps:
                metrics["gap_p50_ms"] = pct(gaps, 0.50)
                metrics["gap_p99_ms"] = pct(gaps, 0.99)
                metrics["gaps_under_16ms"] = sum(1 for g in gaps if g < 16)
                metrics["gaps_over_200ms"] = sum(1 for g in gaps if g >= 200)

        for phase in ("renderer", "yoga", "diff", "write"):
            # FIX: guard `phase in f["phases"]` — the log format omits some
            # counters (other call sites use .get(key, 0)); direct indexing
            # raised KeyError on such frames.
            vals = [
                f["phases"][phase]
                for f in frames if f.get("phases") and phase in f["phases"]
            ]
            if vals:
                metrics[f"{phase}_p99"] = pct(vals, 0.99)
                metrics[f"{phase}_max"] = max(vals)

        patches = [
            f["phases"]["patches"]
            for f in frames if f.get("phases") and "patches" in f["phases"]
        ]
        if patches:
            metrics["patches_total"] = sum(patches)
            metrics["patches_p99"] = pct(patches, 0.99)

        optimized = [
            f["phases"].get("optimizedPatches", 0) for f in frames if f.get("phases")
        ]
        if any(optimized):
            metrics["optimized_total"] = sum(optimized)

        bytes_list = [
            f["phases"].get("writeBytes", 0) for f in frames if f.get("phases")
        ]
        if any(bytes_list):
            metrics["writeBytes_total"] = sum(bytes_list)

        drains = [
            f["phases"].get("prevFrameDrainMs", 0)
            for f in frames if f.get("phases")
        ]
        drain_nonzero = [d for d in drains if d > 0]
        if drain_nonzero:
            metrics["drain_p99"] = pct(drain_nonzero, 0.99)
            metrics["drain_max"] = max(drain_nonzero)

        bp = sum(1 for f in frames if f.get("phases", {}).get("backpressure"))
        metrics["backpressure_frames"] = bp

    if react:
        for pid in set(e["id"] for e in react):
            ms = [e["actualMs"] for e in react if e["id"] == pid]
            metrics[f"react_{pid}_p99"] = pct(ms, 0.99)
            metrics[f"react_{pid}_max"] = max(ms)

    return metrics
352  
353  
def format_diff(before: dict[str, float], after: dict[str, float]) -> str:
    """Render a side-by-side A/B comparison table.

    Arrows flag direction relative to "better": for latency/count-style
    metrics lower is better, for fps / gaps_under higher is better.
    """
    rows = [f"{'metric':<28} {'before':>12} {'after':>12} {'delta':>12}  {'%':>6}"]
    rows.append("─" * 76)
    lower_tokens = (
        "p50",
        "p95",
        "p99",
        "_max",
        "_total",
        "gaps_over",
        "backpressure",
        "drain",
    )
    for key in sorted(set(before) | set(after)):
        old = before.get(key, 0.0)
        new = after.get(key, 0.0)
        delta = new - old
        if old:
            change = ((new / old) - 1) * 100
        else:
            change = float("inf") if new else 0

        lower_is_better = any(token in key for token in lower_tokens)
        higher_is_better = "fps_" in key or "gaps_under" in key
        mark = ""
        if lower_is_better:
            if delta < 0:
                mark = "↓"
            elif delta > 0:
                mark = "↑"  # regression
        elif higher_is_better:
            if delta > 0:
                mark = "↑"
            elif delta < 0:
                mark = "↓"  # regression

        pct_str = "—" if change == float("inf") else f"{change:+6.1f}%"
        rows.append(
            f"{key:<28} {old:>12.2f} {new:>12.2f} {delta:>+12.2f}  {pct_str} {mark}"
        )

    return "\n".join(rows)
400  
401  
def run_once(args: argparse.Namespace) -> dict[str, Any]:
    """Launch the TUI in a PTY, hold a key, tear down, and parse perf.log.

    Returns the summarize() dict restricted to events emitted during this run.
    Exits the process if the built entry.js is missing.
    """
    tui_dir = Path(args.tui_dir).resolve()
    entry = tui_dir / "dist" / "entry.js"
    if not entry.exists():
        sys.exit(f"{entry} missing — run `npm run build` in {tui_dir} first")

    sid = args.session or pick_longest_session(DEFAULT_STATE_DB)
    print(f"• session: {sid}")
    print(f"• hold: {args.hold} x {args.rate}Hz for {args.seconds}s after {args.warmup}s warmup")
    print(f"• terminal: {args.cols}x{args.rows}")

    log = Path(args.log)
    if not args.keep_log and log.exists():
        log.unlink()

    # Only events stamped at/after this wall-clock ms count toward the summary.
    since_ms = int(time.time() * 1000)

    env = os.environ.copy()
    env["HERMES_DEV_PERF"] = "1"
    env["HERMES_DEV_PERF_MS"] = str(args.threshold_ms)
    env["HERMES_DEV_PERF_LOG"] = str(log)
    env["HERMES_TUI_RESUME"] = sid
    env["COLUMNS"] = str(args.cols)
    env["LINES"] = str(args.rows)
    env["TERM"] = env.get("TERM", "xterm-256color")

    # Pass through extra flags the TUI wrapper recognizes (e.g. --no-fullscreen).
    # Stored on args as `extra_flags` list.
    node = os.environ.get("HERMES_PERF_NODE", "node")
    node_args = [node, str(entry), *getattr(args, "extra_flags", [])]

    pid, fd = pty.fork()
    if pid == 0:
        # Child: replace this process with the TUI.  BUG FIX: if exec fails
        # (node missing, bad path) we must _exit immediately — otherwise the
        # forked copy of this script falls through and runs the parent code
        # path, duplicating the whole harness.
        try:
            os.execvpe(node, node_args, env)
        except OSError:
            os._exit(127)

    try:
        import fcntl, struct, termios
        # COLUMNS/LINES alone aren't authoritative; set the PTY winsize so the
        # TUI sees the intended geometry via ioctl.
        winsize = struct.pack("HHHH", args.rows, args.cols, 0, 0)
        fcntl.ioctl(fd, termios.TIOCSWINSZ, winsize)

        print(f"• pid: {pid}  fd: {fd}")
        print(f"• warmup {args.warmup}s (drain startup output)…")
        drain(fd, args.warmup)

        print(f"• holding {args.hold}…")
        sent = hold_key(fd, KEYS[args.hold], args.seconds, args.rate)
        print(f"  sent {sent} keystrokes")

        drain(fd, 0.5)
    finally:
        # Graceful SIGTERM with ~1s grace period, then SIGKILL and reap.
        try:
            os.kill(pid, signal.SIGTERM)
            for _ in range(10):
                pid_done, _ = os.waitpid(pid, os.WNOHANG)
                if pid_done == pid:
                    break
                time.sleep(0.1)
            else:
                os.kill(pid, signal.SIGKILL)
                os.waitpid(pid, 0)
        except (ProcessLookupError, ChildProcessError):
            pass
        try:
            os.close(fd)
        except OSError:
            pass

    # Give the TUI's final buffered log writes a moment to reach disk.
    time.sleep(0.2)
    return summarize(log, since_ms)
471  
472  
def main() -> int:
    """CLI entry point: parse arguments, run once (or loop), print reports."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--session", help="session id to resume (default: longest in db)")
    parser.add_argument("--hold", default="page_up", choices=sorted(KEYS), help="key to hold")
    parser.add_argument("--seconds", type=float, default=8.0, help="how long to hold the key")
    parser.add_argument("--rate", type=int, default=30, help="keystrokes per second")
    parser.add_argument("--warmup", type=float, default=3.0, help="seconds to wait after launch before input")
    parser.add_argument("--threshold-ms", type=float, default=0.0, help="HERMES_DEV_PERF_MS (0 = capture all)")
    parser.add_argument("--cols", type=int, default=120)
    parser.add_argument("--rows", type=int, default=40)
    parser.add_argument("--keep-log", action="store_true", help="don't wipe perf.log before run")
    parser.add_argument("--tui-dir", default=str(DEFAULT_TUI_DIR))
    parser.add_argument("--log", default=str(DEFAULT_LOG))
    parser.add_argument("--save", metavar="LABEL",
                        help="save the final metrics as /tmp/perf-<LABEL>.json for later --compare")
    parser.add_argument("--compare", metavar="LABEL",
                        help="diff against /tmp/perf-<LABEL>.json after running")
    parser.add_argument("--loop", action="store_true",
                        help="watch for source changes, rebuild, rerun, and diff vs previous run")
    parser.add_argument("--extra-flag", dest="extra_flags", action="append", default=[],
                        help="pass through to node dist/entry.js (repeatable)")
    args = parser.parse_args()

    if args.loop:
        return loop_mode(args)

    # Single-shot path.
    data = run_once(args)
    print()
    print(format_report(data))

    metrics = key_metrics(data)

    if args.save:
        save_path = Path(f"/tmp/perf-{args.save}.json")
        save_path.write_text(json.dumps(metrics, indent=2))
        print(f"\n• saved: {save_path}")

    if args.compare:
        baseline_path = Path(f"/tmp/perf-{args.compare}.json")
        if baseline_path.exists():
            baseline = json.loads(baseline_path.read_text())
            print(f"\n═══ A/B diff vs /tmp/perf-{args.compare}.json ═══")
            print(format_diff(baseline, metrics))
        else:
            print(f"\n⚠ no baseline at {baseline_path} — run with --save {args.compare} first")

    # Exit 2 when the harness produced no perf data at all (wiring broken).
    return 2 if not data["react"] and not data["frame"] else 0
523  
524  
def loop_mode(args: argparse.Namespace) -> int:
    """Watch source files, rebuild, rerun, print A/B diff against previous run.

    Keeps a rolling 'previous run' baseline in memory so each iteration
    reports delta vs the last one — visibility into whether the last
    edit moved the needle.  Press Ctrl+C to stop.

    Returns 0 when the loop is interrupted with Ctrl+C (only exit path).
    """
    import subprocess

    tui_dir = Path(args.tui_dir).resolve()
    # Two watched roots: the app's own sources and the vendored ink fork.
    src_root = tui_dir / "src"
    pkg_root = tui_dir / "packages" / "hermes-ink" / "src"

    def collect_mtimes() -> dict[str, float]:
        # Snapshot {path: mtime} for all watched .ts/.tsx files, skipping
        # anything under a __tests__ directory.
        mtimes: dict[str, float] = {}
        for root in (src_root, pkg_root):
            if not root.exists():
                continue
            for path in root.rglob("*"):
                if path.suffix in {".ts", ".tsx"} and "__tests__" not in str(path):
                    try:
                        mtimes[str(path)] = path.stat().st_mtime
                    except OSError:
                        # File vanished between rglob and stat — ignore.
                        pass
        return mtimes

    previous_metrics: dict[str, float] | None = None  # rolling A/B baseline
    previous_mtimes = collect_mtimes()
    iteration = 0

    print(f"• loop mode — watching {src_root} + {pkg_root} for *.ts(x) changes")
    print("• edit any TS file, the harness rebuilds + reruns automatically")
    print("• Ctrl+C to stop\n")

    try:
        while True:
            iteration += 1
            print(f"\n{'═' * 76}")
            print(f"Iteration {iteration}  @ {time.strftime('%H:%M:%S')}")
            print("═" * 76)

            # Iteration 1 runs against the existing build; later iterations
            # rebuild first so the edits that woke us up are picked up.
            if iteration > 1:
                print("• rebuilding…")
                result = subprocess.run(
                    ["npm", "run", "build"],
                    cwd=tui_dir,
                    capture_output=True,
                    text=True,
                )
                if result.returncode != 0:
                    # Show the tail of the build output, then block until the
                    # sources change again instead of rerunning a stale build.
                    print("✗ build failed:")
                    print(result.stdout[-2000:])
                    print(result.stderr[-2000:])
                    print("\n• waiting for source changes to retry…")
                    previous_mtimes = wait_for_change(previous_mtimes, collect_mtimes)
                    continue
                print("✓ build ok")

            data = run_once(args)
            metrics = key_metrics(data)

            print()
            print(format_report(data))

            if previous_metrics is not None:
                print(f"\n═══ A/B diff vs iteration {iteration - 1} ═══")
                print(format_diff(previous_metrics, metrics))

            # Rotate the baseline: the next iteration diffs against this run.
            previous_metrics = metrics

            print("\n• waiting for source changes…")
            previous_mtimes = wait_for_change(previous_mtimes, collect_mtimes)
    except KeyboardInterrupt:
        print("\n• loop stopped")
        return 0
600  
601  
def wait_for_change(prev: dict[str, float], collect,
                    poll_s: float = 1.0, settle_s: float = 0.5) -> dict[str, float]:
    """Poll until a watched file's mtime changes, then return a fresh snapshot.

    Args:
        prev: {path: mtime} snapshot from the previous iteration.
        collect: zero-arg callable returning the current {path: mtime} map.
        poll_s: seconds between polls (default 1s, as before).
        settle_s: debounce delay after a change is seen — editor save bursts
            can take ~500ms to settle (default 0.5s, as before).

    Returns:
        The post-debounce snapshot to use as the next baseline.  New, deleted
        (via a later differing snapshot), and modified files all count as
        changes since `prev.get(path) != mtime` covers missing keys.
    """
    while True:
        time.sleep(poll_s)
        current = collect()

        changed = [
            path for path, mtime in current.items() if prev.get(path) != mtime
        ]

        if changed:
            print(f"  ↻ {len(changed)} file(s) changed:")
            for path in changed[:5]:
                print(f"    {path}")
            # Debounce, then re-snapshot so the settled state is the baseline.
            time.sleep(settle_s)
            return collect()
619  
620  
if __name__ == "__main__":
    raise SystemExit(main())