/ internal / watcher / watcher.go
watcher.go
  1  package watcher
  2  
  3  import (
  4  	"context"
  5  	"fmt"
  6  	"log"
  7  	"os"
  8  	"path/filepath"
  9  	"sort"
 10  	"strconv"
 11  	"strings"
 12  	"sync"
 13  	"sync/atomic"
 14  	"time"
 15  
 16  	"github.com/fsnotify/fsnotify"
 17  )
 18  
 19  // maxWatchDirs caps the total number of directories watched across all roots
 20  // to avoid exhausting file descriptors on broad watch paths.
 21  const maxWatchDirs = 4096
 22  
 23  // defaultSkipDirs is the built-in set of directory names that should never be
 24  // recursively watched. These are typically large vendored/generated trees.
 25  var defaultSkipDirs = map[string]bool{
 26  	"node_modules": true,
 27  	".git":         true,
 28  	"vendor":       true,
 29  	".build":       true,
 30  	"DerivedData":  true,
 31  	"Pods":         true,
 32  	".svn":         true,
 33  	".hg":          true,
 34  	"__pycache__":  true,
 35  	".venv":        true,
 36  	"venv":         true,
 37  	".tox":         true,
 38  	"target":       true, // Rust, Java
 39  	"dist":         true,
 40  	"build":        true,
 41  	".next":        true,
 42  	".nuxt":        true,
 43  	".cache":       true,
 44  	".gradle":      true,
 45  }
 46  
 47  // Stats tracks watcher health metrics.
 48  type Stats struct {
 49  	WatchedDirs       int
 50  	SkippedDirs       int
 51  	AddFailures       int
 52  	RateLimitedEvents int
 53  	CapHit            bool
 54  }
 55  
 56  // fileEvent represents a debounced file system event.
 57  type fileEvent struct {
 58  	Path string
 59  	Type string
 60  }
 61  
 62  // agentWatch maps a watch entry to the agent that owns it.
 63  type agentWatch struct {
 64  	Agent string
 65  	Path  string
 66  	Glob  string
 67  }
 68  
 69  // WatchEntry is the config shape for a single watch path+glob.
 70  type WatchEntry struct {
 71  	Path string `json:"path" yaml:"path"`
 72  	Glob string `json:"glob,omitempty" yaml:"glob,omitempty"`
 73  }
 74  
 75  // RunFunc is the callback invoked when debounced events are ready for an agent.
 76  type RunFunc func(ctx context.Context, agent, prompt string)
 77  
 78  // Watcher monitors file system paths and dispatches debounced events to agents.
 79  type Watcher struct {
 80  	fsw      *fsnotify.Watcher
 81  	watches  []agentWatch
 82  	runFn    RunFunc
 83  	Debounce time.Duration
 84  
 85  	mu      sync.Mutex
 86  	batches map[string]map[string]string // agent → path → eventType
 87  	timers  map[string]*time.Timer       // agent → debounce timer
 88  	lastRun map[string]time.Time
 89  
 90  	// skip policy: built-in defaults merged with user-supplied ignores
 91  	skipDirs map[string]bool
 92  
 93  	// stats tracked atomically for observability
 94  	watchedDirs atomic.Int32
 95  	skippedDirs atomic.Int32
 96  	addFailures atomic.Int32
 97  	rateLimits  atomic.Int32
 98  	capHit      atomic.Bool
 99  
100  	// watch state is protected by watchMu to avoid races between startup and runtime adds.
101  	watchMu    sync.Mutex
102  	watchedMap map[string]bool
103  	rateLimit  time.Duration
104  
105  	ctx    context.Context
106  	cancel context.CancelFunc
107  	done   chan struct{}
108  }
109  
110  // Option configures a Watcher.
111  type Option func(*Watcher)
112  
113  // WithIgnoreDirs adds extra directory names to the skip list.
114  func WithIgnoreDirs(names []string) Option {
115  	return func(w *Watcher) {
116  		for _, n := range names {
117  			w.skipDirs[n] = true
118  		}
119  	}
120  }
121  
122  // WithRateLimit sets a minimum interval between RunFunc calls per agent.
123  // Zero or negative disables rate limiting.
124  func WithRateLimit(d time.Duration) Option {
125  	return func(w *Watcher) {
126  		w.rateLimit = d
127  	}
128  }
129  
130  // New creates a Watcher from agent watch configurations.
131  // agentWatches maps agent name → list of watch entries.
132  func New(agentWatches map[string][]WatchEntry, runFn RunFunc, opts ...Option) (*Watcher, error) {
133  	fsw, err := fsnotify.NewWatcher()
134  	if err != nil {
135  		return nil, fmt.Errorf("create fsnotify watcher: %w", err)
136  	}
137  
138  	w := &Watcher{
139  		fsw:        fsw,
140  		runFn:      runFn,
141  		Debounce:   2 * time.Second,
142  		batches:    make(map[string]map[string]string),
143  		timers:     make(map[string]*time.Timer),
144  		lastRun:    make(map[string]time.Time),
145  		skipDirs:   make(map[string]bool, len(defaultSkipDirs)),
146  		watchedMap: make(map[string]bool),
147  		done:       make(chan struct{}),
148  	}
149  	for k, v := range defaultSkipDirs {
150  		w.skipDirs[k] = v
151  	}
152  	for _, opt := range opts {
153  		opt(w)
154  	}
155  
156  	var totalSkipped, totalFailures int
157  	var watches []agentWatch
158  
159  	for agent, entries := range agentWatches {
160  		for _, entry := range entries {
161  			expanded := ExpandPath(entry.Path)
162  			watches = append(watches, agentWatch{
163  				Agent: agent,
164  				Path:  expanded,
165  				Glob:  entry.Glob,
166  			})
167  
168  			// Verify the root watch path exists before walking.
169  			rootInfo, statErr := os.Lstat(expanded)
170  			if statErr != nil {
171  				log.Printf("watcher: watch path %s for agent %s is not accessible: %v", expanded, agent, statErr)
172  				continue
173  			}
174  			if !rootInfo.IsDir() {
175  				log.Printf("watcher: watch path %s for agent %s is not a directory", expanded, agent)
176  				continue
177  			}
178  			if rootInfo.Mode()&os.ModeSymlink != 0 {
179  				if resolved, err := filepath.EvalSymlinks(expanded); err == nil {
180  					expanded = resolved
181  				}
182  			}
183  
184  			rootBefore := int(w.watchedDirs.Load())
185  			var skipped, failures int
186  			_ = filepath.Walk(expanded, func(path string, info os.FileInfo, walkErr error) error {
187  				if walkErr != nil {
188  					// Only skip the directory itself on access errors; don't
189  					// abort the entire subtree for a single file-level error.
190  					if info != nil && info.IsDir() {
191  						log.Printf("watcher: skipping inaccessible directory %s: %v", path, walkErr)
192  						skipped++
193  						w.skippedDirs.Add(1)
194  						return filepath.SkipDir
195  					}
196  					// File-level error: log and continue walking siblings.
197  					return nil
198  				}
199  
200  				if !info.IsDir() {
201  					return nil
202  				}
203  
204  				// Skip symlinked directories to avoid loops.
205  				if info.Mode()&os.ModeSymlink != 0 {
206  					skipped++
207  					w.skippedDirs.Add(1)
208  					return filepath.SkipDir
209  				}
210  
211  				name := filepath.Base(path)
212  				if w.skipDirs[name] && path != expanded {
213  					skipped++
214  					w.skippedDirs.Add(1)
215  					return filepath.SkipDir
216  				}
217  
218  				outcome := w.tryAddWatchDir(path, &failures)
219  				if outcome == watchCapReached || outcome == addWatchFailed {
220  					// Don't SkipDir — siblings may still be addable.
221  					// But do skip children of this dir.
222  					return filepath.SkipDir
223  				}
224  
225  				return nil
226  			})
227  
228  			watchedForRoot := int(w.watchedDirs.Load()) - rootBefore
229  			totalSkipped += skipped
230  			totalFailures += failures
231  			log.Printf("watcher: agent %s root %s — %d dirs watched, %d skipped, %d failures",
232  				agent, expanded, watchedForRoot, skipped, failures)
233  		}
234  	}
235  
236  	w.watches = watches
237  
238  	if totalFailures > 0 {
239  		log.Printf("watcher: total add failures: %d (check file descriptor limits)", totalFailures)
240  	}
241  	if totalSkipped > 0 {
242  		log.Printf("watcher: total skipped directories during startup: %d", totalSkipped)
243  	}
244  	if w.capHit.Load() {
245  		log.Printf("watcher: running in degraded mode — directory cap hit, coverage is partial")
246  	}
247  
248  	return w, nil
249  }
250  
251  // GetStats returns current watcher health metrics.
252  func (w *Watcher) GetStats() Stats {
253  	return Stats{
254  		WatchedDirs:       int(w.watchedDirs.Load()),
255  		SkippedDirs:       int(w.skippedDirs.Load()),
256  		AddFailures:       int(w.addFailures.Load()),
257  		RateLimitedEvents: int(w.rateLimits.Load()),
258  		CapHit:            w.capHit.Load(),
259  	}
260  }
261  
262  // Start begins the event loop. Blocks until ctx is cancelled or Close is called.
263  func (w *Watcher) Start(ctx context.Context) {
264  	w.ctx, w.cancel = context.WithCancel(ctx)
265  	go w.loop()
266  }
267  
268  func (w *Watcher) loop() {
269  	defer close(w.done)
270  	for {
271  		select {
272  		case <-w.ctx.Done():
273  			return
274  		case event, ok := <-w.fsw.Events:
275  			if !ok {
276  				return
277  			}
278  			w.handleEvent(event)
279  		case err, ok := <-w.fsw.Errors:
280  			if !ok {
281  				return
282  			}
283  			log.Printf("watcher: fsnotify error: %v", err)
284  		}
285  	}
286  }
287  
288  func (w *Watcher) handleEvent(event fsnotify.Event) {
289  	path, err := filepath.Abs(filepath.Clean(event.Name))
290  	if err != nil {
291  		return
292  	}
293  
294  	// Auto-add new directories for recursive watching, with same guards as startup.
295  	if event.Has(fsnotify.Create) {
296  		if info, statErr := os.Lstat(path); statErr == nil && info.IsDir() && info.Mode()&os.ModeSymlink == 0 {
297  			name := filepath.Base(path)
298  			// Enforce skip policy and cap on dynamic adds too.
299  			if !w.skipDirs[name] && info.Mode()&os.ModeSymlink == 0 {
300  				if outcome := w.tryAddWatchDir(path, nil); outcome == addWatchFailed {
301  					log.Printf("watcher: failed to add runtime directory %s", path)
302  				} else if outcome == watchCapReached {
303  					log.Printf("watcher: WARNING: directory cap (%d) reached at runtime — new directories will not be watched", maxWatchDirs)
304  				}
305  			}
306  		}
307  	}
308  
309  	eventType := MapEventType(event.Op)
310  	if eventType == "" {
311  		return
312  	}
313  
314  	filename := filepath.Base(path)
315  
316  	// Fan out to all matching agent watches.
317  	for _, aw := range w.watches {
318  		if !isUnder(path, aw.Path) {
319  			continue
320  		}
321  		if !MatchGlob(aw.Glob, filename) {
322  			continue
323  		}
324  		w.appendEvent(aw.Agent, path, eventType)
325  	}
326  }
327  
// isUnder reports whether path is watchDir itself or lies inside it. Both
// paths are expected to be absolute. A pair that filepath.Rel cannot relate
// (for example, one relative and one absolute) is reported as not contained.
func isUnder(path, watchDir string) bool {
	rel, err := filepath.Rel(watchDir, path)
	if err != nil {
		return false
	}
	// Only a leading ".." path element means "outside". A child whose name
	// merely starts with two dots (such as "..cache") is still inside.
	return rel != ".." && !strings.HasPrefix(rel, ".."+string(filepath.Separator))
}
336  
337  func (w *Watcher) appendEvent(agent, path, eventType string) {
338  	w.mu.Lock()
339  	defer w.mu.Unlock()
340  
341  	now := time.Now()
342  	if w.rateLimit > 0 {
343  		if nextAllowed, ok := w.lastRun[agent]; ok && now.Before(nextAllowed) {
344  			w.rateLimits.Add(1)
345  			return
346  		}
347  	}
348  
349  	if w.batches[agent] == nil {
350  		w.batches[agent] = make(map[string]string)
351  	}
352  	w.batches[agent][path] = eventType
353  
354  	// Reset debounce timer for this agent.
355  	if t, ok := w.timers[agent]; ok {
356  		t.Stop()
357  	}
358  	agentCopy := agent
359  	w.timers[agent] = time.AfterFunc(w.Debounce, func() {
360  		w.flush(agentCopy)
361  	})
362  }
363  
364  func (w *Watcher) flush(agent string) {
365  	if w.ctx.Err() != nil {
366  		return
367  	}
368  
369  	w.mu.Lock()
370  	batch := w.batches[agent]
371  	delete(w.batches, agent)
372  	delete(w.timers, agent)
373  	w.mu.Unlock()
374  
375  	if len(batch) == 0 {
376  		return
377  	}
378  
379  	var events []fileEvent
380  	for p, t := range batch {
381  		events = append(events, fileEvent{Path: p, Type: t})
382  	}
383  
384  	if w.rateLimit > 0 {
385  		w.mu.Lock()
386  		w.lastRun[agent] = time.Now().Add(w.rateLimit)
387  		w.mu.Unlock()
388  	}
389  
390  	prompt := FormatPrompt(events)
391  	w.runFn(w.ctx, agent, prompt)
392  }
393  
// watchAddOutcome is the result reported by tryAddWatchDir.
type watchAddOutcome int

// Possible results of tryAddWatchDir.
const (
	watchAdded          = watchAddOutcome(iota) // path newly registered with fsnotify
	watchAlreadyWatched                         // path was registered earlier; nothing done
	watchCapReached                             // maxWatchDirs reached; path not registered
	addWatchFailed                              // fsnotify Add returned an error
)
402  
403  func (w *Watcher) tryAddWatchDir(path string, failures *int) watchAddOutcome {
404  	w.watchMu.Lock()
405  	defer w.watchMu.Unlock()
406  
407  	if w.watchedMap[path] {
408  		return watchAlreadyWatched
409  	}
410  	if int(w.watchedDirs.Load()) >= maxWatchDirs {
411  		if !w.capHit.Load() {
412  			w.capHit.Store(true)
413  			log.Printf("watcher: WARNING: directory cap (%d) reached — watch coverage is partial, some changes may be missed", maxWatchDirs)
414  		}
415  		return watchCapReached
416  	}
417  
418  	if err := w.fsw.Add(path); err != nil {
419  		log.Printf("watcher: failed to add %s: %v", path, err)
420  		w.addFailures.Add(1)
421  		if failures != nil {
422  			(*failures)++
423  		}
424  		return addWatchFailed
425  	}
426  
427  	w.watchedMap[path] = true
428  	w.watchedDirs.Add(1)
429  	return watchAdded
430  }
431  
432  // Close stops the watcher, cancels the context, and waits for the event loop to exit.
433  func (w *Watcher) Close() {
434  	if w.cancel != nil {
435  		w.cancel()
436  		_ = w.fsw.Close()
437  		<-w.done
438  	} else {
439  		// Start() was never called — just close fsnotify, don't wait on done.
440  		_ = w.fsw.Close()
441  	}
442  
443  	w.mu.Lock()
444  	defer w.mu.Unlock()
445  	for _, t := range w.timers {
446  		t.Stop()
447  	}
448  }
449  
// MatchGlob reports whether filename matches the filepath.Match pattern glob.
// An empty glob matches every name; a malformed pattern matches nothing.
func MatchGlob(glob, filename string) bool {
	if glob == "" {
		return true
	}
	ok, err := filepath.Match(glob, filename)
	return err == nil && ok
}
462  
463  // MapEventType converts an fsnotify Op to a human-readable event type string.
464  func MapEventType(op fsnotify.Op) string {
465  	switch {
466  	case op.Has(fsnotify.Create):
467  		return "created"
468  	case op.Has(fsnotify.Remove):
469  		return "deleted"
470  	case op.Has(fsnotify.Rename):
471  		return "renamed"
472  	case op.Has(fsnotify.Write):
473  		return "modified"
474  	default:
475  		return ""
476  	}
477  }
478  
// ExpandPath turns a user-supplied path into a clean absolute path. A leading
// "~" (alone or followed by "/") is replaced with the user's home directory
// when it can be determined. $VAR and ${VAR} references are then expanded, and
// the result is cleaned and made absolute. The cleaned path is returned as-is
// if filepath.Abs fails.
func ExpandPath(path string) string {
	if path == "~" || strings.HasPrefix(path, "~/") {
		if home, err := os.UserHomeDir(); err == nil {
			path = filepath.Join(home, strings.TrimPrefix(path, "~"))
		}
	}

	expanded := filepath.Clean(os.ExpandEnv(path))
	abs, err := filepath.Abs(expanded)
	if err != nil {
		return expanded
	}
	return abs
}
494  
495  // FormatPrompt formats a slice of file events into a prompt string.
496  func FormatPrompt(events []fileEvent) string {
497  	sort.Slice(events, func(i, j int) bool {
498  		return events[i].Path < events[j].Path
499  	})
500  
501  	var b strings.Builder
502  	b.WriteString("File changes detected:\n")
503  	for _, e := range events {
504  		fmt.Fprintf(&b, "- %s: %s\n", e.Type, e.Path)
505  	}
506  	return strings.TrimRight(b.String(), "\n")
507  }
508  
509  // InActiveHours checks whether now falls within the given "HH:MM-HH:MM" window.
510  // Supports overnight windows (e.g. "22:00-02:00").
511  // Empty window = always active. Invalid format = always active (with log warning).
512  func InActiveHours(window string, now time.Time) bool {
513  	if window == "" {
514  		return true
515  	}
516  
517  	parts := strings.SplitN(window, "-", 2)
518  	if len(parts) != 2 {
519  		log.Printf("watcher: invalid active_hours format %q, treating as always active", window)
520  		return true
521  	}
522  
523  	startMin, err1 := parseHHMM(parts[0])
524  	endMin, err2 := parseHHMM(parts[1])
525  	if err1 != nil || err2 != nil {
526  		log.Printf("watcher: invalid active_hours format %q, treating as always active", window)
527  		return true
528  	}
529  
530  	nowMin := now.Hour()*60 + now.Minute()
531  
532  	if startMin <= endMin {
533  		// Normal window, e.g. "09:00-17:00"
534  		return nowMin >= startMin && nowMin < endMin
535  	}
536  	// Overnight window, e.g. "22:00-02:00"
537  	return nowMin >= startMin || nowMin < endMin
538  }
539  
// parseHHMM converts an "H:M" clock time (surrounding whitespace allowed) into
// minutes after midnight. The hour must be 0-23 and the minute 0-59. The
// returned error says which part was missing or out of range.
func parseHHMM(s string) (int, error) {
	s = strings.TrimSpace(s)
	hh, mm, ok := strings.Cut(s, ":")
	if !ok {
		return 0, fmt.Errorf("expected HH:MM, got %q", s)
	}

	// within parses txt as a decimal integer and checks it lies in [0, limit].
	within := func(txt string, limit int) (int, bool) {
		v, err := strconv.Atoi(txt)
		return v, err == nil && v >= 0 && v <= limit
	}

	hour, ok := within(hh, 23)
	if !ok {
		return 0, fmt.Errorf("invalid hour in %q", s)
	}
	minute, ok := within(mm, 59)
	if !ok {
		return 0, fmt.Errorf("invalid minute in %q", s)
	}
	return 60*hour + minute, nil
}