// watcher.ts
/**
 * Team Memory File Watcher
 *
 * Keeps the local team memory directory in step with the server:
 *   1. pull once from the server at startup;
 *   2. watch the directory itself (directory-level fs.watch), so even the
 *      very first write into a fresh repo is noticed;
 *   3. push once writes have been quiet for DEBOUNCE_MS.
 */

import { feature } from 'bun:bundle'
import { type FSWatcher, watch } from 'fs'
import { mkdir, stat } from 'fs/promises'
import { join } from 'path'
import {
  getTeamMemPath,
  isTeamMemoryEnabled,
} from '../../memdir/teamMemPaths.js'
import { registerCleanup } from '../../utils/cleanupRegistry.js'
import { logForDebugging } from '../../utils/debug.js'
import { errorMessage } from '../../utils/errors.js'
import { getGithubRepo } from '../../utils/git.js'
import {
  type AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
  logEvent,
} from '../analytics/index.js'
import {
  createSyncState,
  isTeamMemorySyncAvailable,
  pullTeamMemory,
  pushTeamMemory,
  type SyncState,
} from './index.js'
import type { TeamMemorySyncPushResult } from './types.js'

/** Quiet period, in milliseconds, after the latest change before pushing. */
const DEBOUNCE_MS = 2000

// ─── Watcher state ──────────────────────────────────────────

// Watcher lifecycle.
let watcherStarted = false
let watcher: FSWatcher | null = null

// Push scheduling and the push currently in flight (if any).
let debounceTimer: ReturnType<typeof setTimeout> | null = null
let hasPendingChanges = false
let pushInProgress = false
let currentPushPromise: Promise<void> | null = null

// Non-null once a push has failed for a reason that retrying cannot fix.
// Without this, watch events caused by other sessions writing into the
// shared team dir would drive an endless retry loop (BQ Mar 14-16: a single
// no_oauth device emitted 167K push events over 2.5 days). An unlink clears
// it, because deleting files is how a user recovers from too-many-entries.
// For no_oauth, keeping the suppression until the session restarts is the
// intended behavior.
let pushSuppressedReason: string | null = null

/**
 * Decide whether a failed push is permanent, i.e. retrying without user
 * action will fail the same way.
 *
 * - `no_oauth` / `no_repo`: pre-request client checks, no status code.
 * - 4xx except 409/429: client error (404 missing repo, 413 too many
 *   entries, 403 permission). 409 is a transient conflict: server state
 *   changed under us, and a fresh push after the next pull can succeed.
 *   429 is a rate limit, where watcher-driven backoff is fine.
 *
 * @param r - Result returned by `pushTeamMemory()`.
 * @returns `true` when the failure cannot self-heal on retry.
 */
export function isPermanentFailure(r: TeamMemorySyncPushResult): boolean {
  if (r.errorType === 'no_oauth' || r.errorType === 'no_repo') return true
  if (
    r.httpStatus !== undefined &&
    r.httpStatus >= 400 &&
    r.httpStatus < 500 &&
    r.httpStatus !== 409 &&
    r.httpStatus !== 429
  ) {
    return true
  }
  return false
}

// Sync state owned by the watcher — shared across all sync operations.
let syncState: SyncState | null = null

/**
 * Execute one push and track its lifecycle.
 *
 * Push is read-only on disk (delta+probe, no merge writes), so no event
 * suppression is needed. Edits that arrive mid-push go through
 * schedulePush(), which re-arms the debounce timer. If that timer fires while
 * this push is still running, it re-arms again instead of overlapping.
 *
 * `hasPendingChanges` is cleared *before* the push starts, not after it
 * succeeds. A schedulePush() landing while the request is in flight sets it
 * back to `true`, so stopTeamMemoryWatcher() still flushes that edit even if
 * shutdown arrives before the re-armed debounce fires. A failed or throwing
 * push restores the flag so the shutdown flush can retry it.
 *
 * A permanent failure (see isPermanentFailure) sets `pushSuppressedReason`,
 * which stops further watcher-driven pushes. Errors thrown by the push are
 * logged, not rethrown.
 */
async function executePush(): Promise<void> {
  if (!syncState) {
    return
  }
  pushInProgress = true
  hasPendingChanges = false
  try {
    const result = await pushTeamMemory(syncState)
    if (!result.success) {
      // Nothing was confirmed uploaded — keep the work pending.
      hasPendingChanges = true
      logForDebugging(`team-memory-watcher: push failed: ${result.error}`, {
        level: 'warn',
      })
      if (isPermanentFailure(result) && pushSuppressedReason === null) {
        pushSuppressedReason =
          result.httpStatus !== undefined
            ? `http_${result.httpStatus}`
            : (result.errorType ?? 'unknown')
        logForDebugging(
          `team-memory-watcher: suppressing retry until next unlink or session restart (${pushSuppressedReason})`,
          { level: 'warn' },
        )
        logEvent('tengu_team_mem_push_suppressed', {
          reason:
            pushSuppressedReason as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
          ...(result.httpStatus && { status: result.httpStatus }),
        })
      }
    } else if (result.filesUploaded > 0) {
      logForDebugging(
        `team-memory-watcher: pushed ${result.filesUploaded} files`,
        { level: 'info' },
      )
    }
  } catch (e) {
    hasPendingChanges = true
    logForDebugging(`team-memory-watcher: push error: ${errorMessage(e)}`, {
      level: 'warn',
    })
  } finally {
    pushInProgress = false
    currentPushPromise = null
  }
}

/** Cancel the pending debounced push, if any, and forget its handle. */
function cancelDebounce(): void {
  if (debounceTimer) {
    clearTimeout(debounceTimer)
    debounceTimer = null
  }
}

/**
 * Debounced push: wait for writes to settle, then push once.
 *
 * Does nothing while pushes are suppressed after a permanent failure.
 * Otherwise it marks changes as pending and (re)starts the DEBOUNCE_MS timer.
 */
function schedulePush(): void {
  if (pushSuppressedReason !== null) return
  hasPendingChanges = true
  cancelDebounce()
  debounceTimer = setTimeout(() => {
    if (pushInProgress) {
      // Never overlap pushes; try again after another quiet period.
      schedulePush()
      return
    }
    currentPushPromise = executePush()
  }, DEBOUNCE_MS)
}

/**
 * Start watching the team memory directory for changes. Only the first call
 * does anything; later calls return immediately.
 *
 * Uses `fs.watch({recursive: true})` on the directory (not chokidar).
 * chokidar 4+ dropped fsevents, and Bun's `fs.watch` fallback uses kqueue,
 * which needs one open fd per watched file. With 500+ team memory files that
 * means 500+ permanently-held fds (confirmed via lsof + repro).
 *
 * `recursive: true` is required because team memory supports subdirs
 * (validateTeamMemKey, pushTeamMemory's walkDir). On macOS Bun uses FSEvents
 * for recursive watching, which takes O(1) fds regardless of tree size
 * (verified: 2 fds for 60 files across 5 subdirs). On Linux, inotify needs
 * one watch per directory. That is O(subdirs), which is still fine because
 * team memory rarely nests.
 *
 * `fs.watch` on a directory doesn't distinguish add/change/unlink; all three
 * emit `rename`. To clear suppression on the too-many-entries recovery path
 * (the user deletes files), each event's filename is stat'ed, and ENOENT is
 * treated as an unlink. For `no_oauth` suppression this is correct: no_oauth
 * users don't delete team memory files to recover, they restart with auth.
 *
 * Failures to create or watch the directory are logged, not thrown. The
 * shutdown cleanup is registered either way.
 *
 * @param teamDir - Directory to create (if missing) and watch.
 */
async function startFileWatcher(teamDir: string): Promise<void> {
  if (watcherStarted) {
    return
  }
  watcherStarted = true

  try {
    // pullTeamMemory returns early without creating the dir for fresh repos
    // with no server content (index.ts isEmpty path). mkdir with
    // recursive:true is idempotent — no existence check needed.
    await mkdir(teamDir, { recursive: true })

    watcher = watch(
      teamDir,
      { persistent: true, recursive: true },
      (_eventType, filename) => {
        if (filename === null) {
          schedulePush()
          return
        }
        if (pushSuppressedReason !== null) {
          // Suppression is only cleared by unlink (recovery action for
          // too-many-entries). fs.watch doesn't distinguish unlink from
          // add/write — stat to disambiguate. ENOENT → file gone → clear.
          void stat(join(teamDir, filename)).catch(
            (err: NodeJS.ErrnoException) => {
              if (err.code !== 'ENOENT') return
              if (pushSuppressedReason !== null) {
                logForDebugging(
                  `team-memory-watcher: unlink cleared suppression (was: ${pushSuppressedReason})`,
                  { level: 'info' },
                )
                pushSuppressedReason = null
              }
              schedulePush()
            },
          )
          return
        }
        schedulePush()
      },
    )
    watcher.on('error', err => {
      logForDebugging(
        `team-memory-watcher: fs.watch error: ${errorMessage(err)}`,
        { level: 'warn' },
      )
    })
    logForDebugging(`team-memory-watcher: watching ${teamDir}`, {
      level: 'debug',
    })
  } catch (err) {
    // fs.watch throws synchronously on ENOENT (race: dir deleted between
    // mkdir and watch) or EACCES. watcherStarted is already true above,
    // so notifyTeamMemoryWrite's explicit schedulePush path still works.
    logForDebugging(
      `team-memory-watcher: failed to watch ${teamDir}: ${errorMessage(err)}`,
      { level: 'warn' },
    )
  }

  registerCleanup(async () => stopTeamMemoryWatcher())
}

/**
 * Start the team memory sync system.
 *
 * Returns early (before creating any state) if:
 * - the TEAMMEM build flag is off
 * - team memory is disabled (isTeamMemoryEnabled)
 * - OAuth is not available (isTeamMemorySyncAvailable)
 * - the current repo has no github.com remote
 *
 * The early github.com check prevents a noisy failure mode. Without it, the
 * watcher starts, fires on local edits, and every push/pull logs
 * `errorType: no_repo` forever. Team memory is GitHub-scoped on the server
 * side, so non-github.com remotes can never sync anyway.
 *
 * Pulls from the server, then starts the file watcher unconditionally. The
 * watcher must start even when the server has no content yet (fresh EAP
 * repo). Otherwise Claude's first team-memory write depends entirely on
 * PostToolUse hooks firing notifyTeamMemoryWrite, which is a chicken-and-egg
 * problem: Claude's write rate is low enough that a fresh partner can sit in
 * the bootstrap dead zone for days.
 */
export async function startTeamMemoryWatcher(): Promise<void> {
  if (!feature('TEAMMEM')) {
    return
  }
  if (!isTeamMemoryEnabled() || !isTeamMemorySyncAvailable()) {
    return
  }
  const repoSlug = await getGithubRepo()
  if (!repoSlug) {
    logForDebugging(
      'team-memory-watcher: no github.com remote, skipping sync',
      { level: 'debug' },
    )
    return
  }

  syncState = createSyncState()

  // Initial pull from server (runs before the watcher starts, so its disk
  // writes won't trigger schedulePush)
  let initialPullSuccess = false
  let initialFilesPulled = 0
  let serverHasContent = false
  try {
    const pullResult = await pullTeamMemory(syncState)
    initialPullSuccess = pullResult.success
    serverHasContent = pullResult.entryCount > 0
    if (pullResult.success && pullResult.filesWritten > 0) {
      initialFilesPulled = pullResult.filesWritten
      logForDebugging(
        `team-memory-watcher: initial pull got ${pullResult.filesWritten} files`,
        { level: 'info' },
      )
    }
  } catch (e) {
    logForDebugging(
      `team-memory-watcher: initial pull failed: ${errorMessage(e)}`,
      { level: 'warn' },
    )
  }

  // Always start the watcher. Watching an empty dir is cheap,
  // and the alternative (lazy start on notifyTeamMemoryWrite) creates
  // a bootstrap dead zone for fresh repos.
  await startFileWatcher(getTeamMemPath())

  logEvent('tengu_team_mem_sync_started', {
    initial_pull_success: initialPullSuccess,
    initial_files_pulled: initialFilesPulled,
    // Kept for dashboard continuity; now always true when this event fires.
    watcher_started: true,
    server_has_content: serverHasContent,
  })
}

/**
 * Call this when a team memory file is written (e.g. from PostToolUse hooks).
 *
 * Schedules a push explicitly in case fs.watch misses the write. A file
 * written in the same tick the watcher starts may not fire an event, and
 * some platforms coalesce rapid successive writes. If the watcher does fire,
 * the debounce timer just resets.
 *
 * No-op until startTeamMemoryWatcher() has created the sync state, and while
 * pushes are suppressed.
 */
export async function notifyTeamMemoryWrite(): Promise<void> {
  if (!syncState) {
    return
  }
  schedulePush()
}

/**
 * Stop the file watcher and flush pending changes.
 *
 * Note: this runs within the 2s graceful shutdown budget, so the flush is
 * best-effort. If the HTTP PUT doesn't complete in time, process.exit() will
 * kill it. No flush is attempted while pushes are suppressed.
 */
export async function stopTeamMemoryWatcher(): Promise<void> {
  cancelDebounce()
  if (watcher) {
    watcher.close()
    watcher = null
  }
  // Await any in-flight push
  if (currentPushPromise) {
    try {
      await currentPushPromise
    } catch {
      // Ignore errors during shutdown
    }
  }
  // A notifyTeamMemoryWrite() during the await above may have re-armed the
  // debounce. The flush below already covers that work, so drop the timer
  // rather than let it start another push mid-shutdown.
  cancelDebounce()
  // Flush pending changes that were debounced but not yet pushed
  if (hasPendingChanges && syncState && pushSuppressedReason === null) {
    try {
      await pushTeamMemory(syncState)
    } catch {
      // Best-effort — shutdown may kill this
    }
  }
}

/**
 * Test-only: reset module state and optionally seed syncState.
 *
 * The feature('TEAMMEM') gate at the top of startTeamMemoryWatcher() is
 * always false in bun test, so tests can't set syncState through the normal
 * path. This helper lets tests drive notifyTeamMemoryWrite() /
 * stopTeamMemoryWatcher() directly.
 *
 * Any pending debounce timer is cancelled and any open watcher is closed
 * first. Otherwise a timer left over from a previous test could fire
 * executePush() against the newly seeded state, and a real fs.watch handle
 * would leak.
 *
 * `skipWatcher: true` marks the watcher as already-started without actually
 * starting it. Tests that only exercise the schedulePush/flush path don't
 * need a real watcher.
 */
export function _resetWatcherStateForTesting(opts?: {
  syncState?: SyncState
  skipWatcher?: boolean
  pushSuppressedReason?: string | null
}): void {
  cancelDebounce()
  watcher?.close()
  watcher = null
  pushInProgress = false
  hasPendingChanges = false
  currentPushPromise = null
  watcherStarted = opts?.skipWatcher ?? false
  pushSuppressedReason = opts?.pushSuppressedReason ?? null
  syncState = opts?.syncState ?? null
}

/**
 * Test-only: start the real fs.watch on a specified directory.
 * Used by the fd-count regression test, because startTeamMemoryWatcher() is
 * gated by feature('TEAMMEM'), which is false under bun test.
 */
export function _startFileWatcherForTesting(dir: string): Promise<void> {
  return startFileWatcher(dir)
}