watcher.go
package watcher

import (
	"context"
	"fmt"
	"log"
	"os"
	"path/filepath"
	"sort"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/fsnotify/fsnotify"
)

// maxWatchDirs caps the total number of directories watched across all roots
// to avoid exhausting file descriptors on broad watch paths. Once the cap is
// reached further directories are not added and the watcher records that its
// coverage is partial.
const maxWatchDirs = 4096

// defaultSkipDirs is the built-in set of directory names that should never be
// recursively watched. These are typically large vendored/generated trees.
// Matching is by base name only. New copies these entries into each Watcher's
// own skip set, so per-watcher ignore options do not modify this map.
var defaultSkipDirs = map[string]bool{
	"node_modules": true,
	".git":         true,
	"vendor":       true,
	".build":       true,
	"DerivedData":  true,
	"Pods":         true,
	".svn":         true,
	".hg":          true,
	"__pycache__":  true,
	".venv":        true,
	"venv":         true,
	".tox":         true,
	"target":       true, // Rust, Java
	"dist":         true,
	"build":        true,
	".next":        true,
	".nuxt":        true,
	".cache":       true,
	".gradle":      true,
}

// Stats tracks watcher health metrics. It is a point-in-time snapshot of the
// watcher's counters: WatchedDirs counts directories registered with
// fsnotify, SkippedDirs counts directories passed over during the startup
// walk, AddFailures counts failed attempts to register a directory,
// RateLimitedEvents counts events dropped by the per-agent rate limit, and
// CapHit reports whether the maxWatchDirs cap has been reached.
type Stats struct {
	WatchedDirs       int
	SkippedDirs       int
	AddFailures       int
	RateLimitedEvents int
	CapHit            bool
}

// fileEvent represents a debounced file system event. Path is the absolute
// path of the changed entry and Type is the human-readable event name
// produced by MapEventType ("created", "deleted", "renamed" or "modified").
type fileEvent struct {
	Path string
	Type string
}

// agentWatch maps a watch entry to the agent that owns it. Path is the
// expanded watch root that event paths are compared against, and Glob is
// matched against the base name of the changed entry (empty matches all).
type agentWatch struct {
	Agent string
	Path  string
	Glob  string
}

// WatchEntry is the config shape for a single watch path+glob. Path may
// contain a leading tilde and environment variables; it is passed through
// ExpandPath before use. Glob is optional.
type WatchEntry struct {
	Path string `json:"path" yaml:"path"`
	Glob string `json:"glob,omitempty" yaml:"glob,omitempty"`
}

// RunFunc is the callback invoked when debounced events are ready for an agent.
// It receives the watcher's context, the agent name, and a prompt built by
// FormatPrompt that lists the batched file changes.
type RunFunc func(ctx context.Context, agent, prompt string)

// Watcher monitors file system paths and dispatches debounced events to agents.
79 type Watcher struct { 80 fsw *fsnotify.Watcher 81 watches []agentWatch 82 runFn RunFunc 83 Debounce time.Duration 84 85 mu sync.Mutex 86 batches map[string]map[string]string // agent → path → eventType 87 timers map[string]*time.Timer // agent → debounce timer 88 lastRun map[string]time.Time 89 90 // skip policy: built-in defaults merged with user-supplied ignores 91 skipDirs map[string]bool 92 93 // stats tracked atomically for observability 94 watchedDirs atomic.Int32 95 skippedDirs atomic.Int32 96 addFailures atomic.Int32 97 rateLimits atomic.Int32 98 capHit atomic.Bool 99 100 // watch state is protected by watchMu to avoid races between startup and runtime adds. 101 watchMu sync.Mutex 102 watchedMap map[string]bool 103 rateLimit time.Duration 104 105 ctx context.Context 106 cancel context.CancelFunc 107 done chan struct{} 108 } 109 110 // Option configures a Watcher. 111 type Option func(*Watcher) 112 113 // WithIgnoreDirs adds extra directory names to the skip list. 114 func WithIgnoreDirs(names []string) Option { 115 return func(w *Watcher) { 116 for _, n := range names { 117 w.skipDirs[n] = true 118 } 119 } 120 } 121 122 // WithRateLimit sets a minimum interval between RunFunc calls per agent. 123 // Zero or negative disables rate limiting. 124 func WithRateLimit(d time.Duration) Option { 125 return func(w *Watcher) { 126 w.rateLimit = d 127 } 128 } 129 130 // New creates a Watcher from agent watch configurations. 131 // agentWatches maps agent name → list of watch entries. 
132 func New(agentWatches map[string][]WatchEntry, runFn RunFunc, opts ...Option) (*Watcher, error) { 133 fsw, err := fsnotify.NewWatcher() 134 if err != nil { 135 return nil, fmt.Errorf("create fsnotify watcher: %w", err) 136 } 137 138 w := &Watcher{ 139 fsw: fsw, 140 runFn: runFn, 141 Debounce: 2 * time.Second, 142 batches: make(map[string]map[string]string), 143 timers: make(map[string]*time.Timer), 144 lastRun: make(map[string]time.Time), 145 skipDirs: make(map[string]bool, len(defaultSkipDirs)), 146 watchedMap: make(map[string]bool), 147 done: make(chan struct{}), 148 } 149 for k, v := range defaultSkipDirs { 150 w.skipDirs[k] = v 151 } 152 for _, opt := range opts { 153 opt(w) 154 } 155 156 var totalSkipped, totalFailures int 157 var watches []agentWatch 158 159 for agent, entries := range agentWatches { 160 for _, entry := range entries { 161 expanded := ExpandPath(entry.Path) 162 watches = append(watches, agentWatch{ 163 Agent: agent, 164 Path: expanded, 165 Glob: entry.Glob, 166 }) 167 168 // Verify the root watch path exists before walking. 169 rootInfo, statErr := os.Lstat(expanded) 170 if statErr != nil { 171 log.Printf("watcher: watch path %s for agent %s is not accessible: %v", expanded, agent, statErr) 172 continue 173 } 174 if !rootInfo.IsDir() { 175 log.Printf("watcher: watch path %s for agent %s is not a directory", expanded, agent) 176 continue 177 } 178 if rootInfo.Mode()&os.ModeSymlink != 0 { 179 if resolved, err := filepath.EvalSymlinks(expanded); err == nil { 180 expanded = resolved 181 } 182 } 183 184 rootBefore := int(w.watchedDirs.Load()) 185 var skipped, failures int 186 _ = filepath.Walk(expanded, func(path string, info os.FileInfo, walkErr error) error { 187 if walkErr != nil { 188 // Only skip the directory itself on access errors; don't 189 // abort the entire subtree for a single file-level error. 
190 if info != nil && info.IsDir() { 191 log.Printf("watcher: skipping inaccessible directory %s: %v", path, walkErr) 192 skipped++ 193 w.skippedDirs.Add(1) 194 return filepath.SkipDir 195 } 196 // File-level error: log and continue walking siblings. 197 return nil 198 } 199 200 if !info.IsDir() { 201 return nil 202 } 203 204 // Skip symlinked directories to avoid loops. 205 if info.Mode()&os.ModeSymlink != 0 { 206 skipped++ 207 w.skippedDirs.Add(1) 208 return filepath.SkipDir 209 } 210 211 name := filepath.Base(path) 212 if w.skipDirs[name] && path != expanded { 213 skipped++ 214 w.skippedDirs.Add(1) 215 return filepath.SkipDir 216 } 217 218 outcome := w.tryAddWatchDir(path, &failures) 219 if outcome == watchCapReached || outcome == addWatchFailed { 220 // Don't SkipDir — siblings may still be addable. 221 // But do skip children of this dir. 222 return filepath.SkipDir 223 } 224 225 return nil 226 }) 227 228 watchedForRoot := int(w.watchedDirs.Load()) - rootBefore 229 totalSkipped += skipped 230 totalFailures += failures 231 log.Printf("watcher: agent %s root %s — %d dirs watched, %d skipped, %d failures", 232 agent, expanded, watchedForRoot, skipped, failures) 233 } 234 } 235 236 w.watches = watches 237 238 if totalFailures > 0 { 239 log.Printf("watcher: total add failures: %d (check file descriptor limits)", totalFailures) 240 } 241 if totalSkipped > 0 { 242 log.Printf("watcher: total skipped directories during startup: %d", totalSkipped) 243 } 244 if w.capHit.Load() { 245 log.Printf("watcher: running in degraded mode — directory cap hit, coverage is partial") 246 } 247 248 return w, nil 249 } 250 251 // GetStats returns current watcher health metrics. 
252 func (w *Watcher) GetStats() Stats { 253 return Stats{ 254 WatchedDirs: int(w.watchedDirs.Load()), 255 SkippedDirs: int(w.skippedDirs.Load()), 256 AddFailures: int(w.addFailures.Load()), 257 RateLimitedEvents: int(w.rateLimits.Load()), 258 CapHit: w.capHit.Load(), 259 } 260 } 261 262 // Start begins the event loop. Blocks until ctx is cancelled or Close is called. 263 func (w *Watcher) Start(ctx context.Context) { 264 w.ctx, w.cancel = context.WithCancel(ctx) 265 go w.loop() 266 } 267 268 func (w *Watcher) loop() { 269 defer close(w.done) 270 for { 271 select { 272 case <-w.ctx.Done(): 273 return 274 case event, ok := <-w.fsw.Events: 275 if !ok { 276 return 277 } 278 w.handleEvent(event) 279 case err, ok := <-w.fsw.Errors: 280 if !ok { 281 return 282 } 283 log.Printf("watcher: fsnotify error: %v", err) 284 } 285 } 286 } 287 288 func (w *Watcher) handleEvent(event fsnotify.Event) { 289 path, err := filepath.Abs(filepath.Clean(event.Name)) 290 if err != nil { 291 return 292 } 293 294 // Auto-add new directories for recursive watching, with same guards as startup. 295 if event.Has(fsnotify.Create) { 296 if info, statErr := os.Lstat(path); statErr == nil && info.IsDir() && info.Mode()&os.ModeSymlink == 0 { 297 name := filepath.Base(path) 298 // Enforce skip policy and cap on dynamic adds too. 299 if !w.skipDirs[name] && info.Mode()&os.ModeSymlink == 0 { 300 if outcome := w.tryAddWatchDir(path, nil); outcome == addWatchFailed { 301 log.Printf("watcher: failed to add runtime directory %s", path) 302 } else if outcome == watchCapReached { 303 log.Printf("watcher: WARNING: directory cap (%d) reached at runtime — new directories will not be watched", maxWatchDirs) 304 } 305 } 306 } 307 } 308 309 eventType := MapEventType(event.Op) 310 if eventType == "" { 311 return 312 } 313 314 filename := filepath.Base(path) 315 316 // Fan out to all matching agent watches. 
317 for _, aw := range w.watches { 318 if !isUnder(path, aw.Path) { 319 continue 320 } 321 if !MatchGlob(aw.Glob, filename) { 322 continue 323 } 324 w.appendEvent(aw.Agent, path, eventType) 325 } 326 } 327 328 // isUnder returns true if path is inside (or equal to) the watched directory. 329 func isUnder(path, watchDir string) bool { 330 rel, err := filepath.Rel(watchDir, path) 331 if err != nil { 332 return false 333 } 334 return !strings.HasPrefix(rel, "..") 335 } 336 337 func (w *Watcher) appendEvent(agent, path, eventType string) { 338 w.mu.Lock() 339 defer w.mu.Unlock() 340 341 now := time.Now() 342 if w.rateLimit > 0 { 343 if nextAllowed, ok := w.lastRun[agent]; ok && now.Before(nextAllowed) { 344 w.rateLimits.Add(1) 345 return 346 } 347 } 348 349 if w.batches[agent] == nil { 350 w.batches[agent] = make(map[string]string) 351 } 352 w.batches[agent][path] = eventType 353 354 // Reset debounce timer for this agent. 355 if t, ok := w.timers[agent]; ok { 356 t.Stop() 357 } 358 agentCopy := agent 359 w.timers[agent] = time.AfterFunc(w.Debounce, func() { 360 w.flush(agentCopy) 361 }) 362 } 363 364 func (w *Watcher) flush(agent string) { 365 if w.ctx.Err() != nil { 366 return 367 } 368 369 w.mu.Lock() 370 batch := w.batches[agent] 371 delete(w.batches, agent) 372 delete(w.timers, agent) 373 w.mu.Unlock() 374 375 if len(batch) == 0 { 376 return 377 } 378 379 var events []fileEvent 380 for p, t := range batch { 381 events = append(events, fileEvent{Path: p, Type: t}) 382 } 383 384 if w.rateLimit > 0 { 385 w.mu.Lock() 386 w.lastRun[agent] = time.Now().Add(w.rateLimit) 387 w.mu.Unlock() 388 } 389 390 prompt := FormatPrompt(events) 391 w.runFn(w.ctx, agent, prompt) 392 } 393 394 type watchAddOutcome int 395 396 const ( 397 watchAdded watchAddOutcome = iota 398 watchAlreadyWatched 399 watchCapReached 400 addWatchFailed 401 ) 402 403 func (w *Watcher) tryAddWatchDir(path string, failures *int) watchAddOutcome { 404 w.watchMu.Lock() 405 defer w.watchMu.Unlock() 406 407 if 
w.watchedMap[path] { 408 return watchAlreadyWatched 409 } 410 if int(w.watchedDirs.Load()) >= maxWatchDirs { 411 if !w.capHit.Load() { 412 w.capHit.Store(true) 413 log.Printf("watcher: WARNING: directory cap (%d) reached — watch coverage is partial, some changes may be missed", maxWatchDirs) 414 } 415 return watchCapReached 416 } 417 418 if err := w.fsw.Add(path); err != nil { 419 log.Printf("watcher: failed to add %s: %v", path, err) 420 w.addFailures.Add(1) 421 if failures != nil { 422 (*failures)++ 423 } 424 return addWatchFailed 425 } 426 427 w.watchedMap[path] = true 428 w.watchedDirs.Add(1) 429 return watchAdded 430 } 431 432 // Close stops the watcher, cancels the context, and waits for the event loop to exit. 433 func (w *Watcher) Close() { 434 if w.cancel != nil { 435 w.cancel() 436 _ = w.fsw.Close() 437 <-w.done 438 } else { 439 // Start() was never called — just close fsnotify, don't wait on done. 440 _ = w.fsw.Close() 441 } 442 443 w.mu.Lock() 444 defer w.mu.Unlock() 445 for _, t := range w.timers { 446 t.Stop() 447 } 448 } 449 450 // MatchGlob returns true if filename matches the glob pattern. 451 // An empty glob matches everything. 452 func MatchGlob(glob, filename string) bool { 453 if glob == "" { 454 return true 455 } 456 matched, err := filepath.Match(glob, filename) 457 if err != nil { 458 return false 459 } 460 return matched 461 } 462 463 // MapEventType converts an fsnotify Op to a human-readable event type string. 464 func MapEventType(op fsnotify.Op) string { 465 switch { 466 case op.Has(fsnotify.Create): 467 return "created" 468 case op.Has(fsnotify.Remove): 469 return "deleted" 470 case op.Has(fsnotify.Rename): 471 return "renamed" 472 case op.Has(fsnotify.Write): 473 return "modified" 474 default: 475 return "" 476 } 477 } 478 479 // ExpandPath expands tilde and environment variables, then cleans and resolves to absolute. 
480 func ExpandPath(path string) string { 481 if strings.HasPrefix(path, "~/") || path == "~" { 482 home, err := os.UserHomeDir() 483 if err == nil { 484 path = filepath.Join(home, path[1:]) 485 } 486 } 487 path = os.ExpandEnv(path) 488 path = filepath.Clean(path) 489 if abs, err := filepath.Abs(path); err == nil { 490 path = abs 491 } 492 return path 493 } 494 495 // FormatPrompt formats a slice of file events into a prompt string. 496 func FormatPrompt(events []fileEvent) string { 497 sort.Slice(events, func(i, j int) bool { 498 return events[i].Path < events[j].Path 499 }) 500 501 var b strings.Builder 502 b.WriteString("File changes detected:\n") 503 for _, e := range events { 504 fmt.Fprintf(&b, "- %s: %s\n", e.Type, e.Path) 505 } 506 return strings.TrimRight(b.String(), "\n") 507 } 508 509 // InActiveHours checks whether now falls within the given "HH:MM-HH:MM" window. 510 // Supports overnight windows (e.g. "22:00-02:00"). 511 // Empty window = always active. Invalid format = always active (with log warning). 512 func InActiveHours(window string, now time.Time) bool { 513 if window == "" { 514 return true 515 } 516 517 parts := strings.SplitN(window, "-", 2) 518 if len(parts) != 2 { 519 log.Printf("watcher: invalid active_hours format %q, treating as always active", window) 520 return true 521 } 522 523 startMin, err1 := parseHHMM(parts[0]) 524 endMin, err2 := parseHHMM(parts[1]) 525 if err1 != nil || err2 != nil { 526 log.Printf("watcher: invalid active_hours format %q, treating as always active", window) 527 return true 528 } 529 530 nowMin := now.Hour()*60 + now.Minute() 531 532 if startMin <= endMin { 533 // Normal window, e.g. "09:00-17:00" 534 return nowMin >= startMin && nowMin < endMin 535 } 536 // Overnight window, e.g. "22:00-02:00" 537 return nowMin >= startMin || nowMin < endMin 538 } 539 540 // parseHHMM parses "HH:MM" into minutes since midnight. 
// Surrounding whitespace is ignored. The hour must be in 0..23 and the
// minute in 0..59; both are parsed as plain integers, so single-digit
// components such as "9:5" are accepted.
func parseHHMM(s string) (int, error) {
	s = strings.TrimSpace(s)

	hourText, minuteText, hasColon := strings.Cut(s, ":")
	if !hasColon {
		return 0, fmt.Errorf("expected HH:MM, got %q", s)
	}

	hour, err := strconv.Atoi(hourText)
	if err != nil || hour < 0 || hour > 23 {
		return 0, fmt.Errorf("invalid hour in %q", s)
	}

	minute, err := strconv.Atoi(minuteText)
	if err != nil || minute < 0 || minute > 59 {
		return 0, fmt.Errorf("invalid minute in %q", s)
	}

	return hour*60 + minute, nil
}