// services/teamMemorySync/watcher.ts
  1  /**
  2   * Team Memory File Watcher
  3   *
  4   * Watches the team memory directory for changes and triggers
  5   * a debounced push to the server when files are modified.
  6   * Performs an initial pull on startup, then starts a directory-level
  7   * fs.watch so first-time writes to a fresh repo get picked up.
  8   */
  9  
 10  import { feature } from 'bun:bundle'
 11  import { type FSWatcher, watch } from 'fs'
 12  import { mkdir, stat } from 'fs/promises'
 13  import { join } from 'path'
 14  import {
 15    getTeamMemPath,
 16    isTeamMemoryEnabled,
 17  } from '../../memdir/teamMemPaths.js'
 18  import { registerCleanup } from '../../utils/cleanupRegistry.js'
 19  import { logForDebugging } from '../../utils/debug.js'
 20  import { errorMessage } from '../../utils/errors.js'
 21  import { getGithubRepo } from '../../utils/git.js'
 22  import {
 23    type AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
 24    logEvent,
 25  } from '../analytics/index.js'
 26  import {
 27    createSyncState,
 28    isTeamMemorySyncAvailable,
 29    pullTeamMemory,
 30    pushTeamMemory,
 31    type SyncState,
 32  } from './index.js'
 33  import type { TeamMemorySyncPushResult } from './types.js'
 34  
 35  const DEBOUNCE_MS = 2000 // Wait 2s after last change before pushing
 36  
 37  // ─── Watcher state ──────────────────────────────────────────
 38  let watcher: FSWatcher | null = null
 39  let debounceTimer: ReturnType<typeof setTimeout> | null = null
 40  let pushInProgress = false
 41  let hasPendingChanges = false
 42  let currentPushPromise: Promise<void> | null = null
 43  let watcherStarted = false
 44  
 45  // Set after a push fails for a reason that can't self-heal on retry.
 46  // Prevents watch events from other sessions' writes to the shared team
 47  // dir driving an infinite retry loop (BQ Mar 14-16: one no_oauth device
 48  // emitted 167K push events over 2.5 days). Cleared on unlink — file deletion
 49  // is a recovery action for the too-many-entries case, and for no_oauth the
 50  // suppression persisting until session restart is correct.
 51  let pushSuppressedReason: string | null = null
 52  
 53  /**
 54   * Permanent = retry without user action will fail the same way.
 55   * - no_oauth / no_repo: pre-request client checks, no status code
 56   * - 4xx except 409/429: client error (404 missing repo, 413 too many
 57   *   entries, 403 permission). 409 is a transient conflict — server state
 58   *   changed under us, a fresh push after next pull can succeed. 429 is a
 59   *   rate limit — watcher-driven backoff is fine.
 60   */
 61  export function isPermanentFailure(r: TeamMemorySyncPushResult): boolean {
 62    if (r.errorType === 'no_oauth' || r.errorType === 'no_repo') return true
 63    if (
 64      r.httpStatus !== undefined &&
 65      r.httpStatus >= 400 &&
 66      r.httpStatus < 500 &&
 67      r.httpStatus !== 409 &&
 68      r.httpStatus !== 429
 69    ) {
 70      return true
 71    }
 72    return false
 73  }
 74  
 75  // Sync state owned by the watcher — shared across all sync operations.
 76  let syncState: SyncState | null = null
 77  
 78  /**
 79   * Execute the push and track its lifecycle.
 80   * Push is read-only on disk (delta+probe, no merge writes), so no event
 81   * suppression is needed — edits arriving mid-push hit schedulePush() and
 82   * the debounce re-arms after this push completes.
 83   */
 84  async function executePush(): Promise<void> {
 85    if (!syncState) {
 86      return
 87    }
 88    pushInProgress = true
 89    try {
 90      const result = await pushTeamMemory(syncState)
 91      if (result.success) {
 92        hasPendingChanges = false
 93      }
 94      if (result.success && result.filesUploaded > 0) {
 95        logForDebugging(
 96          `team-memory-watcher: pushed ${result.filesUploaded} files`,
 97          { level: 'info' },
 98        )
 99      } else if (!result.success) {
100        logForDebugging(`team-memory-watcher: push failed: ${result.error}`, {
101          level: 'warn',
102        })
103        if (isPermanentFailure(result) && pushSuppressedReason === null) {
104          pushSuppressedReason =
105            result.httpStatus !== undefined
106              ? `http_${result.httpStatus}`
107              : (result.errorType ?? 'unknown')
108          logForDebugging(
109            `team-memory-watcher: suppressing retry until next unlink or session restart (${pushSuppressedReason})`,
110            { level: 'warn' },
111          )
112          logEvent('tengu_team_mem_push_suppressed', {
113            reason:
114              pushSuppressedReason as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
115            ...(result.httpStatus && { status: result.httpStatus }),
116          })
117        }
118      }
119    } catch (e) {
120      logForDebugging(`team-memory-watcher: push error: ${errorMessage(e)}`, {
121        level: 'warn',
122      })
123    } finally {
124      pushInProgress = false
125      currentPushPromise = null
126    }
127  }
128  
/**
 * Debounced push trigger: marks changes as pending and (re)arms a timer so
 * that a single push runs once writes have been quiet for DEBOUNCE_MS.
 *
 * Does nothing while pushes are suppressed. If the timer fires while a
 * push is still running, it re-schedules itself instead of starting a
 * second push.
 */
function schedulePush(): void {
  if (pushSuppressedReason !== null) {
    return
  }

  hasPendingChanges = true

  // Restart the quiet period on every call.
  if (debounceTimer) clearTimeout(debounceTimer)

  const onQuietPeriodElapsed = (): void => {
    if (!pushInProgress) {
      currentPushPromise = executePush()
      return
    }
    // A push is already in flight; try again after another quiet period.
    schedulePush()
  }
  debounceTimer = setTimeout(onQuietPeriodElapsed, DEBOUNCE_MS)
}
146  
/**
 * Begin watching the team memory directory and schedule a debounced push
 * whenever something inside it changes. Runs its setup at most once per
 * module state (guarded by `watcherStarted`).
 *
 * Implementation choice: `fs.watch({recursive: true})` on the directory,
 * not chokidar. chokidar 4+ dropped fsevents, and Bun's `fs.watch` fallback
 * uses kqueue, which needs one open fd per watched file — with 500+ team
 * memory files that is 500+ permanently-held fds (confirmed via lsof +
 * repro).
 *
 * `recursive: true` is needed because team memory supports subdirectories
 * (validateTeamMemKey, pushTeamMemory's walkDir). On macOS Bun uses
 * FSEvents for recursive watching — O(1) fds regardless of tree size
 * (verified: 2 fds for 60 files across 5 subdirs). On Linux inotify needs
 * one watch per directory — O(subdirs), still fine (team memory rarely
 * nests).
 *
 * `fs.watch` on a directory does not distinguish add/change/unlink — all
 * three emit `rename`. To clear suppression on the too-many-entries
 * recovery path (user deletes files), the handler stats the reported path
 * while suppressed: ENOENT → treat as unlink. For `no_oauth` suppression
 * this is acceptable: no_oauth users don't delete team memory files to
 * recover, they restart with auth.
 *
 * @param teamDir Directory to watch; created if it does not exist.
 */
async function startFileWatcher(teamDir: string): Promise<void> {
  if (watcherStarted) return
  watcherStarted = true

  // Handles every fs.watch event for the directory tree.
  const onFsEvent = (_eventType: string, filename: string | null): void => {
    // No filename to inspect, or nothing suppressed: just schedule a push
    // (schedulePush itself is a no-op while suppressed).
    if (filename === null || pushSuppressedReason === null) {
      schedulePush()
      return
    }

    // Suppressed: only an unlink lifts suppression (recovery action for
    // too-many-entries). stat succeeding means the path still exists, so
    // the event is ignored; ENOENT means the file is gone.
    void stat(join(teamDir, filename)).catch((err: NodeJS.ErrnoException) => {
      if (err.code !== 'ENOENT') return
      // Re-check: another event may have cleared it while stat was pending.
      if (pushSuppressedReason !== null) {
        logForDebugging(
          `team-memory-watcher: unlink cleared suppression (was: ${pushSuppressedReason})`,
          { level: 'info' },
        )
        pushSuppressedReason = null
      }
      schedulePush()
    })
  }

  try {
    // pullTeamMemory returns early without creating the dir for fresh repos
    // with no server content (index.ts isEmpty path). mkdir with
    // recursive:true is idempotent, so no existence check is needed.
    await mkdir(teamDir, { recursive: true })

    const watchOptions = { persistent: true, recursive: true }
    watcher = watch(teamDir, watchOptions, onFsEvent)
    watcher.on('error', err => {
      logForDebugging(
        `team-memory-watcher: fs.watch error: ${errorMessage(err)}`,
        { level: 'warn' },
      )
    })
    logForDebugging(`team-memory-watcher: watching ${teamDir}`, {
      level: 'debug',
    })
  } catch (err) {
    // fs.watch throws synchronously on ENOENT (race: dir deleted between
    // mkdir and watch) or EACCES. watcherStarted was already set above, and
    // notifyTeamMemoryWrite's explicit schedulePush path keeps working
    // without a live watcher.
    logForDebugging(
      `team-memory-watcher: failed to watch ${teamDir}: ${errorMessage(err)}`,
      { level: 'warn' },
    )
  }

  // Registered whether or not the watch succeeded, so pending changes are
  // still flushed at shutdown.
  registerCleanup(async () => stopTeamMemoryWatcher())
}
230  
/**
 * Entry point for the team memory sync system.
 *
 * Bails out before creating any state when:
 *   - the TEAMMEM build flag is off
 *   - team memory is disabled (isTeamMemoryEnabled)
 *   - OAuth is not available (isTeamMemorySyncAvailable)
 *   - the current repo has no github.com remote
 *
 * The github.com check up front avoids a noisy failure mode in which the
 * watcher starts, fires on local edits, and every push/pull logs
 * `errorType: no_repo` forever. Team memory is GitHub-scoped on the server
 * side, so non-github.com remotes can never sync anyway.
 *
 * Otherwise: creates the sync state, performs an initial pull, then starts
 * the file watcher unconditionally — even if the pull failed or the server
 * has no content yet (fresh EAP repo). Without that, Claude's first
 * team-memory write would depend entirely on PostToolUse hooks firing
 * notifyTeamMemoryWrite, a chicken-and-egg problem: Claude's write rate is
 * low enough that a fresh partner can sit in the bootstrap dead zone for
 * days. Finally logs a `tengu_team_mem_sync_started` event.
 */
export async function startTeamMemoryWatcher(): Promise<void> {
  if (!feature('TEAMMEM')) {
    return
  }

  const syncUsable = isTeamMemoryEnabled() && isTeamMemorySyncAvailable()
  if (!syncUsable) {
    return
  }

  const repoSlug = await getGithubRepo()
  if (!repoSlug) {
    logForDebugging(
      'team-memory-watcher: no github.com remote, skipping sync',
      { level: 'debug' },
    )
    return
  }

  const state = createSyncState()
  syncState = state

  // The initial pull runs before the watcher exists, so the files it writes
  // to disk cannot trigger schedulePush.
  const pullStats = { success: false, filesPulled: 0, serverHasContent: false }
  try {
    const pulled = await pullTeamMemory(state)
    pullStats.success = pulled.success
    pullStats.serverHasContent = pulled.entryCount > 0
    if (pulled.success && pulled.filesWritten > 0) {
      pullStats.filesPulled = pulled.filesWritten
      logForDebugging(
        `team-memory-watcher: initial pull got ${pulled.filesWritten} files`,
        { level: 'info' },
      )
    }
  } catch (e) {
    // A failed pull is logged but does not prevent the watcher from starting.
    logForDebugging(
      `team-memory-watcher: initial pull failed: ${errorMessage(e)}`,
      { level: 'warn' },
    )
  }

  // Always start the watcher. Watching an empty dir is cheap, and the
  // alternative (lazy start on notifyTeamMemoryWrite) creates a bootstrap
  // dead zone for fresh repos.
  await startFileWatcher(getTeamMemPath())

  logEvent('tengu_team_mem_sync_started', {
    initial_pull_success: pullStats.success,
    initial_files_pulled: pullStats.filesPulled,
    // Kept for dashboard continuity; now always true when this event fires.
    watcher_started: true,
    server_has_content: pullStats.serverHasContent,
  })
}
306  
/**
 * Explicit "a team memory file was just written" signal (e.g. from
 * PostToolUse hooks).
 *
 * Schedules a debounced push in case fs.watch misses the write — a file
 * written in the same tick the watcher starts may not fire an event, and
 * some platforms coalesce rapid successive writes. If the watcher does
 * fire as well, the only effect is that the debounce timer resets.
 *
 * No-op when sync state has not been set up.
 */
export async function notifyTeamMemoryWrite(): Promise<void> {
  if (syncState) {
    schedulePush()
  }
}
320  
/**
 * Stop the file watcher and flush pending changes.
 *
 * Steps, in order:
 *   1. cancel the armed debounce timer, if any;
 *   2. close the fs.watch handle, if any;
 *   3. wait for an in-flight push to settle (errors ignored);
 *   4. if changes are still pending, sync state exists and pushes are not
 *      suppressed, run one final push.
 *
 * The pending flag is claimed before the final push and restored only if
 * that push fails, so calling this more than once (e.g. an explicit call
 * plus the cleanup registered by startFileWatcher) does not repeat a flush
 * that already succeeded.
 *
 * Note: runs within the 2s graceful shutdown budget, so the flush is
 * best-effort — if the HTTP PUT doesn't complete in time, process.exit()
 * will kill it. Never rejects because of a failed push.
 */
export async function stopTeamMemoryWatcher(): Promise<void> {
  if (debounceTimer) {
    clearTimeout(debounceTimer)
    debounceTimer = null
  }
  if (watcher) {
    watcher.close()
    watcher = null
  }
  // Await any in-flight push
  if (currentPushPromise) {
    try {
      await currentPushPromise
    } catch {
      // Ignore errors during shutdown
    }
  }
  // Flush pending changes that were debounced but not yet pushed
  if (hasPendingChanges && syncState && pushSuppressedReason === null) {
    // Claim the work before awaiting so an overlapping or later stop call
    // does not issue the same flush again.
    hasPendingChanges = false
    try {
      const result = await pushTeamMemory(syncState)
      if (!result.success) {
        // Nothing was confirmed uploaded; leave the changes marked pending.
        hasPendingChanges = true
      }
    } catch {
      // Best-effort — shutdown may kill this. Keep the changes marked
      // pending in case the process survives and stop is called again.
      hasPendingChanges = true
    }
  }
}
353  
/**
 * Test-only: reset module state and optionally seed syncState.
 * The feature('TEAMMEM') gate at the top of startTeamMemoryWatcher() is
 * always false in bun test, so tests can't set syncState through the normal
 * path. This helper lets tests drive notifyTeamMemoryWrite() /
 * stopTeamMemoryWatcher() directly.
 *
 * Any debounce timer still armed and any watcher still open from earlier
 * use are cancelled/closed before the references are dropped. Otherwise a
 * leftover timer could fire later and run a push against the newly seeded
 * state, and a leftover watcher would keep its handle open.
 *
 * @param opts.syncState Sync state to install; defaults to null.
 * @param opts.skipWatcher When true, marks the watcher as already started
 *   without actually starting it. Tests that only exercise the
 *   schedulePush/flush path don't need a real watcher.
 * @param opts.pushSuppressedReason Suppression reason to seed; defaults to
 *   null (not suppressed).
 */
export function _resetWatcherStateForTesting(opts?: {
  syncState?: SyncState
  skipWatcher?: boolean
  pushSuppressedReason?: string | null
}): void {
  if (debounceTimer) {
    clearTimeout(debounceTimer)
  }
  if (watcher) {
    watcher.close()
  }
  watcher = null
  debounceTimer = null
  pushInProgress = false
  hasPendingChanges = false
  currentPushPromise = null
  watcherStarted = opts?.skipWatcher ?? false
  pushSuppressedReason = opts?.pushSuppressedReason ?? null
  syncState = opts?.syncState ?? null
}
379  
/**
 * Test-only: run the real fs.watch setup against an arbitrary directory.
 * Exists for the fd-count regression test, because startTeamMemoryWatcher()
 * is gated by feature('TEAMMEM'), which is false under bun test.
 *
 * @param dir Directory to watch.
 * @returns The promise returned by startFileWatcher.
 */
export function _startFileWatcherForTesting(dir: string): Promise<void> {
  const started = startFileWatcher(dir)
  return started
}