src/services/teamMemorySync/index.ts
   1  /**
   2   * Team Memory Sync Service
   3   *
   4   * Syncs team memory files between the local filesystem and the server API.
   5   * Team memory is scoped per-repo (identified by git remote hash) and shared
   6   * across all authenticated org members.
   7   *
   8   * API contract (anthropic/anthropic#250711 + #283027):
   9   *   GET  /api/claude_code/team_memory?repo={owner/repo}            → TeamMemoryData (includes entryChecksums)
  10   *   GET  /api/claude_code/team_memory?repo={owner/repo}&view=hashes → metadata + entryChecksums only (no entry bodies)
  11   *   PUT  /api/claude_code/team_memory?repo={owner/repo}            → upload entries (upsert semantics)
  12   *   404 = no data exists yet
  13   *
  14   * Sync semantics:
  15   *   - Pull overwrites local files with server content (server wins per-key).
  16   *   - Push uploads only keys whose content hash differs from serverChecksums
  17   *     (delta upload). Server uses upsert: keys not in the PUT are preserved.
  18   *   - File deletions do NOT propagate: deleting a local file won't remove it
  19   *     from the server, and the next pull will restore it locally.
  20   *
  21   * State management:
  22   *   All mutable state (ETag tracking, watcher suppression) lives in a
  23   *   SyncState object created by the caller and threaded through every call.
  24   *   This avoids module-level mutable state and gives tests natural isolation.
  25   */
  26  
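       // Illustrative usage sketch (comment only — the real watcher lives elsewhere,
       // and the orchestration shown here is an assumption, not this module's code):
       //
       //   const state = createSyncState()
       //   if (isTeamMemorySyncAvailable()) {
       //     // Session start: full bidirectional sync — pull first (server wins
       //     // per-key), then a delta push of local-only content.
       //     await syncTeamMemory(state)
       //
       //     // On a local edit: delta push with 412 conflict resolution.
       //     await pushTeamMemory(state)
       //
       //     // Periodic refresh: conditional GET; a 304 comes back as notModified.
       //     const pull = await pullTeamMemory(state)
       //   }
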
  27  import axios from 'axios'
  28  import { createHash } from 'crypto'
  29  import { mkdir, readdir, readFile, stat, writeFile } from 'fs/promises'
  30  import { join, relative, sep } from 'path'
  31  import {
  32    CLAUDE_AI_INFERENCE_SCOPE,
  33    CLAUDE_AI_PROFILE_SCOPE,
  34    getOauthConfig,
  35    OAUTH_BETA_HEADER,
  36  } from '../../constants/oauth.js'
  37  import {
  38    getTeamMemPath,
  39    PathTraversalError,
  40    validateTeamMemKey,
  41  } from '../../memdir/teamMemPaths.js'
  42  import { count } from '../../utils/array.js'
  43  import {
  44    checkAndRefreshOAuthTokenIfNeeded,
  45    getClaudeAIOAuthTokens,
  46  } from '../../utils/auth.js'
  47  import { logForDebugging } from '../../utils/debug.js'
  48  import { classifyAxiosError } from '../../utils/errors.js'
  49  import { getGithubRepo } from '../../utils/git.js'
  50  import {
  51    getAPIProvider,
  52    isFirstPartyAnthropicBaseUrl,
  53  } from '../../utils/model/providers.js'
  54  import { sleep } from '../../utils/sleep.js'
  55  import { jsonStringify } from '../../utils/slowOperations.js'
  56  import { getClaudeCodeUserAgent } from '../../utils/userAgent.js'
  57  import { logEvent } from '../analytics/index.js'
  58  import type { AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS } from '../analytics/metadata.js'
  59  import { getRetryDelay } from '../api/withRetry.js'
  60  import { scanForSecrets } from './secretScanner.js'
  61  import {
  62    type SkippedSecretFile,
  63    TeamMemoryDataSchema,
  64    type TeamMemoryHashesResult,
  65    type TeamMemorySyncFetchResult,
  66    type TeamMemorySyncPushResult,
  67    type TeamMemorySyncUploadResult,
  68    TeamMemoryTooManyEntriesSchema,
  69  } from './types.js'
  70  
  71  const TEAM_MEMORY_SYNC_TIMEOUT_MS = 30_000
  72  // Per-entry size cap — server default from anthropic/anthropic#293258.
  73  // Pre-filtering oversized entries saves bandwidth: the structured 413 for
  74  // this case doesn't give us anything to learn (one file is just too big).
  75  const MAX_FILE_SIZE_BYTES = 250_000
  76  // No client-side DEFAULT_MAX_ENTRIES: the server's entry-count cap is
  77  // GB-tunable per-org (claude_code_team_memory_limits), so any compile-time
  78  // constant here will drift.  We only truncate after learning the effective
  79  // limit from a structured 413's extra_details.max_entries.
  80  // Gateway body-size cap.  The API gateway rejects PUT bodies over ~256-512KB
  81  // with an unstructured (HTML) 413 before the request reaches the app server —
  82  // distinguishable from the app's structured entry-count 413 only by latency
  83  // (~750ms gateway vs ~2.3s app on comparable payloads).  #21969 removed the
  84  // client entry-count cap; cold pushes from heavy users then sent 300KB-1.4MB
  85  // bodies and hit this.  200KB leaves headroom under the observed threshold
  86  // and keeps a single-entry-at-MAX_FILE_SIZE_BYTES solo batch (~250KB) just
  87  // under the real gateway limit.  Batches larger than this are split into
  88  // sequential PUTs — server upsert-merge semantics make that safe.
  89  const MAX_PUT_BODY_BYTES = 200_000
  90  const MAX_RETRIES = 3
  91  const MAX_CONFLICT_RETRIES = 2
  92  
  93  // ─── Sync state ─────────────────────────────────────────────
  94  
  95  /**
  96   * Mutable state for the team memory sync service.
  97   * Created once per session by the watcher and passed to all sync functions.
  98   * Tests create a fresh instance per test for isolation.
  99   */
 100  export type SyncState = {
 101    /** Last known server checksum (ETag) for conditional requests. */
 102    lastKnownChecksum: string | null
 103    /**
 104     * Per-key content hash (`sha256:<hex>`) of what we believe the server
 105     * currently holds. Populated from server-provided entryChecksums on pull
 106     * and from local hashes on successful push. Used to compute the delta on
 107     * push — only keys whose local hash differs are uploaded.
 108     */
 109    serverChecksums: Map<string, string>
 110    /**
 111     * Server-enforced max_entries cap, learned from a structured 413 response
 112     * (anthropic/anthropic#293258 adds error_code + extra_details.max_entries).
 113     * Stays null until a 413 is observed — the server's cap is GB-tunable
 114     * per-org so there is no correct client-side default.  While null,
 115     * readLocalTeamMemory sends everything and lets the server be
 116     * authoritative (it rejects atomically).
 117     */
 118    serverMaxEntries: number | null
 119  }
 120  
 121  export function createSyncState(): SyncState {
 122    return {
 123      lastKnownChecksum: null,
 124      serverChecksums: new Map(),
 125      serverMaxEntries: null,
 126    }
 127  }
 128  
 129  /**
 130   * Compute `sha256:<hex>` over the UTF-8 bytes of the given content.
 131   * Format matches the server's entryChecksums values (anthropic/anthropic#283027)
 132   * so local-vs-server comparison works by direct string equality.
 133   */
 134  export function hashContent(content: string): string {
 135    return 'sha256:' + createHash('sha256').update(content, 'utf8').digest('hex')
 136  }
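
       // Illustrative only: pushTeamMemory decides per key whether to upload by
       // comparing this hash against the server-provided checksum with plain
       // string equality — both sides use the same "sha256:<hex>" format.
       //
       //   const localHash = hashContent(content)
       //   if (state.serverChecksums.get(relPath) !== localHash) {
       //     delta[relPath] = content // differs from server → include in the PUT
       //   }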
 137  
 138  /**
 139   * Type guard narrowing an unknown error to a Node.js errno-style exception.
 140   * Uses `in` narrowing so no `as` cast is needed at call sites.
 141   */
 142  function isErrnoException(e: unknown): e is NodeJS.ErrnoException {
 143    return e instanceof Error && 'code' in e && typeof e.code === 'string'
 144  }
 145  
 146  // ─── Auth & endpoint ─────────────────────────────────────────
 147  
 148  /**
 149   * Check if user is authenticated with first-party OAuth (required for team memory sync).
 150   */
 151  function isUsingOAuth(): boolean {
 152    if (getAPIProvider() !== 'firstParty' || !isFirstPartyAnthropicBaseUrl()) {
 153      return false
 154    }
 155    const tokens = getClaudeAIOAuthTokens()
 156    return Boolean(
 157      tokens?.accessToken &&
 158        tokens.scopes?.includes(CLAUDE_AI_INFERENCE_SCOPE) &&
 159        tokens.scopes.includes(CLAUDE_AI_PROFILE_SCOPE),
 160    )
 161  }
 162  
 163  function getTeamMemorySyncEndpoint(repoSlug: string): string {
 164    const baseUrl =
 165      process.env.TEAM_MEMORY_SYNC_URL || getOauthConfig().BASE_API_URL
 166    return `${baseUrl}/api/claude_code/team_memory?repo=${encodeURIComponent(repoSlug)}`
 167  }
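
       // For illustration (hypothetical slug): the repo slug is URI-encoded, so
       //   getTeamMemorySyncEndpoint('acme/widgets')
       // yields "<BASE_API_URL>/api/claude_code/team_memory?repo=acme%2Fwidgets".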
 168  
 169  function getAuthHeaders(): {
 170    headers?: Record<string, string>
 171    error?: string
 172  } {
 173    const oauthTokens = getClaudeAIOAuthTokens()
 174    if (oauthTokens?.accessToken) {
 175      return {
 176        headers: {
 177          Authorization: `Bearer ${oauthTokens.accessToken}`,
 178          'anthropic-beta': OAUTH_BETA_HEADER,
 179          'User-Agent': getClaudeCodeUserAgent(),
 180        },
 181      }
 182    }
 183    return { error: 'No OAuth token available for team memory sync' }
 184  }
 185  
 186  // ─── Fetch (pull) ────────────────────────────────────────────
 187  
 188  async function fetchTeamMemoryOnce(
 189    state: SyncState,
 190    repoSlug: string,
 191    etag?: string | null,
 192  ): Promise<TeamMemorySyncFetchResult> {
 193    try {
 194      await checkAndRefreshOAuthTokenIfNeeded()
 195  
 196      const auth = getAuthHeaders()
 197      if (auth.error) {
 198        return {
 199          success: false,
 200          error: auth.error,
 201          skipRetry: true,
 202          errorType: 'auth',
 203        }
 204      }
 205  
 206      const headers: Record<string, string> = { ...auth.headers }
 207      if (etag) {
 208        headers['If-None-Match'] = `"${etag.replace(/"/g, '')}"`
 209      }
 210  
 211      const endpoint = getTeamMemorySyncEndpoint(repoSlug)
 212      const response = await axios.get(endpoint, {
 213        headers,
 214        timeout: TEAM_MEMORY_SYNC_TIMEOUT_MS,
 215        validateStatus: status =>
 216          status === 200 || status === 304 || status === 404,
 217      })
 218  
 219      if (response.status === 304) {
 220        logForDebugging('team-memory-sync: not modified (304)', {
 221          level: 'debug',
 222        })
 223        return { success: true, notModified: true, checksum: etag ?? undefined }
 224      }
 225  
 226      if (response.status === 404) {
 227        logForDebugging('team-memory-sync: no remote data (404)', {
 228          level: 'debug',
 229        })
 230        state.lastKnownChecksum = null
 231        return { success: true, isEmpty: true }
 232      }
 233  
 234      const parsed = TeamMemoryDataSchema().safeParse(response.data)
 235      if (!parsed.success) {
 236        logForDebugging('team-memory-sync: invalid response format', {
 237          level: 'warn',
 238        })
 239        return {
 240          success: false,
 241          error: 'Invalid team memory response format',
 242          skipRetry: true,
 243          errorType: 'parse',
 244        }
 245      }
 246  
 247      // Extract checksum from response data or ETag header
 248      const responseChecksum =
 249        parsed.data.checksum ||
 250        response.headers['etag']?.replace(/^"|"$/g, '') ||
 251        undefined
 252      if (responseChecksum) {
 253        state.lastKnownChecksum = responseChecksum
 254      }
 255  
 256      logForDebugging(
 257        `team-memory-sync: fetched successfully (checksum: ${responseChecksum ?? 'none'})`,
 258        { level: 'debug' },
 259      )
 260      return {
 261        success: true,
 262        data: parsed.data,
 263        isEmpty: false,
 264        checksum: responseChecksum,
 265      }
 266    } catch (error) {
 267      const { kind, status, message } = classifyAxiosError(error)
 268      const body = axios.isAxiosError(error)
 269        ? JSON.stringify(error.response?.data ?? '')
 270        : ''
 271      if (kind !== 'other') {
 272        logForDebugging(`team-memory-sync: fetch error ${status}: ${body}`, {
 273          level: 'warn',
 274        })
 275      }
 276      switch (kind) {
 277        case 'auth':
 278          return {
 279            success: false,
 280            error: `Not authorized for team memory sync: ${body}`,
 281            skipRetry: true,
 282            errorType: 'auth',
 283            httpStatus: status,
 284          }
 285        case 'timeout':
 286          return {
 287            success: false,
 288            error: 'Team memory sync request timeout',
 289            errorType: 'timeout',
 290          }
 291        case 'network':
 292          return {
 293            success: false,
 294            error: 'Cannot connect to server',
 295            errorType: 'network',
 296          }
 297        default:
 298          return {
 299            success: false,
 300            error: message,
 301            errorType: 'unknown',
 302            httpStatus: status,
 303          }
 304      }
 305    }
 306  }
 307  
 308  /**
 309   * Fetch only per-key checksums + metadata (no entry bodies).
 310   * Used for cheap serverChecksums refresh during 412 conflict resolution — avoids
 311   * downloading ~300KB of content just to learn which keys changed.
  312   * Requires anthropic/anthropic#283027 to be deployed; if the probe fails, the
  313   * caller fails the push and the watcher retries on the next edit.
 314   */
 315  async function fetchTeamMemoryHashes(
 316    state: SyncState,
 317    repoSlug: string,
 318  ): Promise<TeamMemoryHashesResult> {
 319    try {
 320      await checkAndRefreshOAuthTokenIfNeeded()
 321      const auth = getAuthHeaders()
 322      if (auth.error) {
 323        return { success: false, error: auth.error, errorType: 'auth' }
 324      }
 325  
 326      const endpoint = getTeamMemorySyncEndpoint(repoSlug) + '&view=hashes'
 327      const response = await axios.get(endpoint, {
 328        headers: auth.headers,
 329        timeout: TEAM_MEMORY_SYNC_TIMEOUT_MS,
 330        validateStatus: status => status === 200 || status === 404,
 331      })
 332  
 333      if (response.status === 404) {
 334        state.lastKnownChecksum = null
 335        return { success: true, entryChecksums: {} }
 336      }
 337  
 338      const checksum =
 339        response.data?.checksum || response.headers['etag']?.replace(/^"|"$/g, '')
 340      const entryChecksums = response.data?.entryChecksums
 341  
 342      // Requires anthropic/anthropic#283027. If entryChecksums is missing,
 343      // treat as a probe failure — caller fails the push; watcher retries.
 344      if (!entryChecksums || typeof entryChecksums !== 'object') {
 345        return {
 346          success: false,
 347          error:
 348            'Server did not return entryChecksums (?view=hashes unsupported)',
 349          errorType: 'parse',
 350        }
 351      }
 352  
 353      if (checksum) {
 354        state.lastKnownChecksum = checksum
 355      }
 356      return {
 357        success: true,
 358        version: response.data?.version,
 359        checksum,
 360        entryChecksums,
 361      }
 362    } catch (error) {
 363      const { kind, status, message } = classifyAxiosError(error)
 364      switch (kind) {
 365        case 'auth':
 366          return {
 367            success: false,
 368            error: 'Not authorized',
 369            errorType: 'auth',
 370            httpStatus: status,
 371          }
 372        case 'timeout':
 373          return { success: false, error: 'Timeout', errorType: 'timeout' }
 374        case 'network':
 375          return { success: false, error: 'Network error', errorType: 'network' }
 376        default:
 377          return {
 378            success: false,
 379            error: message,
 380            errorType: 'unknown',
 381            httpStatus: status,
 382          }
 383      }
 384    }
 385  }
 386  
 387  async function fetchTeamMemory(
 388    state: SyncState,
 389    repoSlug: string,
 390    etag?: string | null,
 391  ): Promise<TeamMemorySyncFetchResult> {
 392    let lastResult: TeamMemorySyncFetchResult | null = null
 393  
 394    for (let attempt = 1; attempt <= MAX_RETRIES + 1; attempt++) {
 395      lastResult = await fetchTeamMemoryOnce(state, repoSlug, etag)
 396      if (lastResult.success || lastResult.skipRetry) {
 397        return lastResult
 398      }
 399      if (attempt > MAX_RETRIES) {
 400        return lastResult
 401      }
 402      const delayMs = getRetryDelay(attempt)
 403      logForDebugging(`team-memory-sync: retry ${attempt}/${MAX_RETRIES}`, {
 404        level: 'debug',
 405      })
 406      await sleep(delayMs)
 407    }
 408  
 409    return lastResult!
 410  }
 411  
 412  // ─── Upload (push) ───────────────────────────────────────────
 413  
 414  /**
 415   * Split a delta into PUT-sized batches under MAX_PUT_BODY_BYTES each.
 416   *
 417   * Greedy bin-packing over sorted keys — sorting gives deterministic batches
 418   * across calls, which matters for ETag stability if the conflict loop retries
 419   * after a partial commit.  The byte count is the full serialized body
 420   * including JSON overhead, so what we measure is what axios sends.
 421   *
 422   * A single entry exceeding MAX_PUT_BODY_BYTES goes into its own solo batch
 423   * (MAX_FILE_SIZE_BYTES=250K already caps individual files; a ~250K solo body
 424   * is above our soft cap but below the gateway's observed real threshold).
 425   */
 426  export function batchDeltaByBytes(
 427    delta: Record<string, string>,
 428  ): Array<Record<string, string>> {
 429    const keys = Object.keys(delta).sort()
 430    if (keys.length === 0) return []
 431  
 432    // Fixed overhead for `{"entries":{}}` — each entry then adds its marginal
 433    // bytes.  jsonStringify (≡ JSON.stringify under the hood) on the raw
 434    // strings handles escaping so the count matches what axios serializes.
 435    const EMPTY_BODY_BYTES = Buffer.byteLength('{"entries":{}}', 'utf8')
 436    const entryBytes = (k: string, v: string): number =>
 437      Buffer.byteLength(jsonStringify(k), 'utf8') +
 438      Buffer.byteLength(jsonStringify(v), 'utf8') +
 439      2 // colon + comma (comma over-counts by 1 on the last entry; harmless slack)
 440  
 441    const batches: Array<Record<string, string>> = []
 442    let current: Record<string, string> = {}
 443    let currentBytes = EMPTY_BODY_BYTES
 444  
 445    for (const key of keys) {
 446      const added = entryBytes(key, delta[key]!)
 447      if (
 448        currentBytes + added > MAX_PUT_BODY_BYTES &&
 449        Object.keys(current).length > 0
 450      ) {
 451        batches.push(current)
 452        current = {}
 453        currentBytes = EMPTY_BODY_BYTES
 454      }
 455      current[key] = delta[key]!
 456      currentBytes += added
 457    }
 458    batches.push(current)
 459    return batches
 460  }
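
       // Illustrative sketch of how the batches are consumed (this mirrors the
       // batch loop in pushTeamMemory below; it is not a second implementation):
       //
       //   const batches = batchDeltaByBytes(delta)
       //   for (const batch of batches) {
       //     const result = await uploadTeamMemory(state, repoSlug, batch, state.lastKnownChecksum)
       //     if (!result.success) break // earlier batches are already committed server-side
       //     for (const key of Object.keys(batch)) {
       //       state.serverChecksums.set(key, hashContent(batch[key]!))
       //     }
       //   }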
 461  
 462  async function uploadTeamMemory(
 463    state: SyncState,
 464    repoSlug: string,
 465    entries: Record<string, string>,
 466    ifMatchChecksum?: string | null,
 467  ): Promise<TeamMemorySyncUploadResult> {
 468    try {
 469      await checkAndRefreshOAuthTokenIfNeeded()
 470  
 471      const auth = getAuthHeaders()
 472      if (auth.error) {
 473        return { success: false, error: auth.error, errorType: 'auth' }
 474      }
 475  
 476      const headers: Record<string, string> = {
 477        ...auth.headers,
 478        'Content-Type': 'application/json',
 479      }
 480      if (ifMatchChecksum) {
 481        headers['If-Match'] = `"${ifMatchChecksum.replace(/"/g, '')}"`
 482      }
 483  
 484      const endpoint = getTeamMemorySyncEndpoint(repoSlug)
 485      const response = await axios.put(
 486        endpoint,
 487        { entries },
 488        {
 489          headers,
 490          timeout: TEAM_MEMORY_SYNC_TIMEOUT_MS,
 491          validateStatus: status => status === 200 || status === 412,
 492        },
 493      )
 494  
 495      if (response.status === 412) {
 496        logForDebugging('team-memory-sync: conflict (412 Precondition Failed)', {
 497          level: 'info',
 498        })
 499        return { success: false, conflict: true, error: 'ETag mismatch' }
 500      }
 501  
 502      const responseChecksum = response.data?.checksum
 503      if (responseChecksum) {
 504        state.lastKnownChecksum = responseChecksum
 505      }
 506  
 507      logForDebugging(
 508        `team-memory-sync: uploaded ${Object.keys(entries).length} entries (checksum: ${responseChecksum ?? 'none'})`,
 509        { level: 'debug' },
 510      )
 511      return {
 512        success: true,
 513        checksum: responseChecksum,
 514        lastModified: response.data?.lastModified,
 515      }
 516    } catch (error) {
 517      const body = axios.isAxiosError(error)
 518        ? JSON.stringify(error.response?.data ?? '')
 519        : ''
 520      logForDebugging(
 521        `team-memory-sync: upload failed: ${error instanceof Error ? error.message : ''} ${body}`,
 522        { level: 'warn' },
 523      )
 524      const { kind, status: httpStatus, message } = classifyAxiosError(error)
 525      const errorType = kind === 'http' || kind === 'other' ? 'unknown' : kind
 526      let serverErrorCode: 'team_memory_too_many_entries' | undefined
 527      let serverMaxEntries: number | undefined
 528      let serverReceivedEntries: number | undefined
 529      // Parse structured 413 (anthropic/anthropic#293258). The server's
 530      // RequestTooLargeException includes error_code + extra_details with
 531      // the effective max_entries (may be GB-tuned per-org). Cache it so
 532      // the next push trims to the right value.
 533      if (httpStatus === 413 && axios.isAxiosError(error)) {
 534        const parsed = TeamMemoryTooManyEntriesSchema().safeParse(
 535          error.response?.data,
 536        )
 537        if (parsed.success) {
 538          serverErrorCode = parsed.data.error.details.error_code
 539          serverMaxEntries = parsed.data.error.details.max_entries
 540          serverReceivedEntries = parsed.data.error.details.received_entries
 541        }
 542      }
 543      return {
 544        success: false,
 545        error: message,
 546        errorType,
 547        httpStatus,
 548        ...(serverErrorCode !== undefined && { serverErrorCode }),
 549        ...(serverMaxEntries !== undefined && { serverMaxEntries }),
 550        ...(serverReceivedEntries !== undefined && { serverReceivedEntries }),
 551      }
 552    }
 553  }
 554  
 555  // ─── Local file operations ───────────────────────────────────
 556  
 557  /**
 558   * Read all team memory files from the local directory into a flat key-value map.
 559   * Keys are relative paths from the team memory directory.
  560   * Empty files are included (their content is the empty string).
 561   *
 562   * PSR M22174: Each file is scanned for credentials before inclusion
 563   * using patterns from gitleaks. Files containing secrets are SKIPPED
 564   * (not uploaded) and collected in skippedSecrets so the caller can
 565   * warn the user.
 566   */
 567  async function readLocalTeamMemory(maxEntries: number | null): Promise<{
 568    entries: Record<string, string>
 569    skippedSecrets: SkippedSecretFile[]
 570  }> {
 571    const teamDir = getTeamMemPath()
 572    const entries: Record<string, string> = {}
 573    const skippedSecrets: SkippedSecretFile[] = []
 574  
 575    async function walkDir(dir: string): Promise<void> {
 576      try {
 577        const dirEntries = await readdir(dir, { withFileTypes: true })
 578        await Promise.all(
 579          dirEntries.map(async entry => {
 580            const fullPath = join(dir, entry.name)
 581            if (entry.isDirectory()) {
 582              await walkDir(fullPath)
 583            } else if (entry.isFile()) {
 584              try {
 585                const stats = await stat(fullPath)
 586                if (stats.size > MAX_FILE_SIZE_BYTES) {
 587                  logForDebugging(
 588                    `team-memory-sync: skipping oversized file ${entry.name} (${stats.size} > ${MAX_FILE_SIZE_BYTES} bytes)`,
 589                    { level: 'info' },
 590                  )
 591                  return
 592                }
 593                const content = await readFile(fullPath, 'utf8')
 594                const relPath = relative(teamDir, fullPath).replaceAll('\\', '/')
 595  
 596                // PSR M22174: scan for secrets BEFORE adding to the upload
 597                // payload. If a secret is detected, skip this file entirely
 598                // so it never leaves the machine.
 599                const secretMatches = scanForSecrets(content)
 600                if (secretMatches.length > 0) {
 601                  // Report only the first match per file — one secret is
 602                  // enough to skip the file and we don't want to log more
 603                  // than necessary about credential locations.
 604                  const firstMatch = secretMatches[0]!
 605                  skippedSecrets.push({
 606                    path: relPath,
 607                    ruleId: firstMatch.ruleId,
 608                    label: firstMatch.label,
 609                  })
 610                  logForDebugging(
 611                    `team-memory-sync: skipping "${relPath}" — detected ${firstMatch.label}`,
 612                    { level: 'warn' },
 613                  )
 614                  return
 615                }
 616  
 617                entries[relPath] = content
 618              } catch {
 619                // Skip unreadable files
 620              }
 621            }
 622          }),
 623        )
 624      } catch (e) {
 625        if (isErrnoException(e)) {
 626          if (e.code !== 'ENOENT' && e.code !== 'EACCES' && e.code !== 'EPERM') {
 627            throw e
 628          }
 629        } else {
 630          throw e
 631        }
 632      }
 633    }
 634  
 635    await walkDir(teamDir)
 636  
 637    // Truncate only if we've LEARNED a cap from the server (via a structured
 638    // 413's extra_details.max_entries — anthropic/anthropic#293258).  The
 639    // server's entry-count cap is GB-tunable per-org via
 640    // claude_code_team_memory_limits; we have no way to know it in advance.
 641    // Before the first 413 we send everything and let the server be
 642    // authoritative.  The server validates total stored entries after merge
 643    // (not PUT body count) and rejects atomically — nothing is written on 413.
 644    //
 645    // Sorting before truncation is what makes delta computation work: without
 646    // it, the parallel walk above picks a different N-of-M subset each push
  648    // (entries is populated in callback completion order), serverChecksums misses keys,
  649    // and the "delta" balloons to a near-full snapshot.  With deterministic
 649    // truncation, the same N keys are compared against the same server state.
 650    //
 651    // When disk has more files than the learned cap, alphabetically-last ones
 652    // consistently never sync.  When the merged (server + delta) count exceeds
 653    // the cap we still fail — recovering requires soft_delete_keys.
 654    const keys = Object.keys(entries).sort()
 655    if (maxEntries !== null && keys.length > maxEntries) {
 656      const dropped = keys.slice(maxEntries)
 657      logForDebugging(
 658        `team-memory-sync: ${keys.length} local entries exceeds server cap of ${maxEntries}; ${dropped.length} file(s) will NOT sync: ${dropped.join(', ')}. Consider consolidating or removing some team memory files.`,
 659        { level: 'warn' },
 660      )
 661      logEvent('tengu_team_mem_entries_capped', {
 662        total_entries: keys.length,
 663        dropped_count: dropped.length,
 664        max_entries: maxEntries,
 665      })
 666      const truncated: Record<string, string> = {}
 667      for (const key of keys.slice(0, maxEntries)) {
 668        truncated[key] = entries[key]!
 669      }
 670      return { entries: truncated, skippedSecrets }
 671    }
 672    return { entries, skippedSecrets }
 673  }
 674  
 675  /**
 676   * Write remote team memory entries to the local directory.
 677   * Validates every path against the team memory directory boundary.
 678   * Skips entries whose on-disk content already matches, so unchanged
 679   * files keep their mtime and don't spuriously invalidate the
 680   * getMemoryFiles cache or trigger watcher events.
 681   *
 682   * Parallel: each entry is processed independently (validate + read-compare
 683   * + mkdir + write). Concurrent mkdir on a shared parent is safe with
 684   * recursive: true (EEXIST is swallowed). The initial pull is the long
 685   * pole in startTeamMemoryWatcher — p99 was ~22s serial at 50 entries.
 686   *
 687   * Returns the number of files actually written.
 688   */
 689  async function writeRemoteEntriesToLocal(
 690    entries: Record<string, string>,
 691  ): Promise<number> {
 692    const results = await Promise.all(
 693      Object.entries(entries).map(async ([relPath, content]) => {
 694        let validatedPath: string
 695        try {
 696          validatedPath = await validateTeamMemKey(relPath)
 697        } catch (e) {
 698          if (e instanceof PathTraversalError) {
 699            logForDebugging(`team-memory-sync: ${e.message}`, { level: 'warn' })
 700            return false
 701          }
 702          throw e
 703        }
 704  
 705        const sizeBytes = Buffer.byteLength(content, 'utf8')
 706        if (sizeBytes > MAX_FILE_SIZE_BYTES) {
 707          logForDebugging(
 708            `team-memory-sync: skipping oversized remote entry "${relPath}"`,
 709            { level: 'info' },
 710          )
 711          return false
 712        }
 713  
 714        // Skip if on-disk content already matches. Handles the common case
 715        // where pull returns unchanged entries (skipEtagCache path, first
  716        // pull of a session with warm disk state from a prior session).
 717        try {
 718          const existing = await readFile(validatedPath, 'utf8')
 719          if (existing === content) {
 720            return false
 721          }
 722        } catch (e) {
 723          if (
 724            isErrnoException(e) &&
 725            e.code !== 'ENOENT' &&
 726            e.code !== 'ENOTDIR'
 727          ) {
 728            logForDebugging(
 729              `team-memory-sync: unexpected read error for "${relPath}": ${e.code}`,
 730              { level: 'debug' },
 731            )
 732          }
 733          // Fall through to write for ENOENT/ENOTDIR (file doesn't exist yet)
 734        }
 735  
 736        try {
 737          const parentDir = validatedPath.substring(
 738            0,
 739            validatedPath.lastIndexOf(sep),
 740          )
 741          await mkdir(parentDir, { recursive: true })
 742          await writeFile(validatedPath, content, 'utf8')
 743          return true
 744        } catch (e) {
 745          logForDebugging(
 746            `team-memory-sync: failed to write "${relPath}": ${e}`,
 747            { level: 'warn' },
 748          )
 749          return false
 750        }
 751      }),
 752    )
 753  
 754    return count(results, Boolean)
 755  }
 756  
 757  // ─── Public API ──────────────────────────────────────────────
 758  
 759  /**
 760   * Check if team memory sync is available (requires first-party OAuth).
 761   */
 762  export function isTeamMemorySyncAvailable(): boolean {
 763    return isUsingOAuth()
 764  }
 765  
 766  /**
 767   * Pull team memory from the server and write to local directory.
 768   * Returns true if any files were updated.
 769   */
 770  export async function pullTeamMemory(
 771    state: SyncState,
 772    options?: { skipEtagCache?: boolean },
 773  ): Promise<{
 774    success: boolean
 775    filesWritten: number
 776    /** Number of entries the server returned, regardless of whether they were written to disk. */
 777    entryCount: number
 778    notModified?: boolean
 779    error?: string
 780  }> {
 781    const skipEtagCache = options?.skipEtagCache ?? false
 782    const startTime = Date.now()
 783  
 784    if (!isUsingOAuth()) {
 785      logPull(startTime, { success: false, errorType: 'no_oauth' })
 786      return {
 787        success: false,
 788        filesWritten: 0,
 789        entryCount: 0,
 790        error: 'OAuth not available',
 791      }
 792    }
 793  
 794    const repoSlug = await getGithubRepo()
 795    if (!repoSlug) {
 796      logPull(startTime, { success: false, errorType: 'no_repo' })
 797      return {
 798        success: false,
 799        filesWritten: 0,
 800        entryCount: 0,
 801        error: 'No git remote found',
 802      }
 803    }
 804  
 805    const etag = skipEtagCache ? null : state.lastKnownChecksum
 806    const result = await fetchTeamMemory(state, repoSlug, etag)
 807    if (!result.success) {
 808      logPull(startTime, {
 809        success: false,
 810        errorType: result.errorType,
 811        status: result.httpStatus,
 812      })
 813      return {
 814        success: false,
 815        filesWritten: 0,
 816        entryCount: 0,
 817        error: result.error,
 818      }
 819    }
 820    if (result.notModified) {
 821      logPull(startTime, { success: true, notModified: true })
 822      return { success: true, filesWritten: 0, entryCount: 0, notModified: true }
 823    }
 824    if (result.isEmpty || !result.data) {
 825      // Server has no data — clear stale serverChecksums so the next push
 826      // doesn't skip entries it thinks the server already has.
 827      state.serverChecksums.clear()
 828      logPull(startTime, { success: true })
 829      return { success: true, filesWritten: 0, entryCount: 0 }
 830    }
 831  
 832    const entries = result.data.content.entries
 833    const responseChecksums = result.data.content.entryChecksums
 834  
 835    // Refresh serverChecksums from server-provided per-key hashes.
 836    // Requires anthropic/anthropic#283027 — if the response lacks entryChecksums
 837    // (pre-deploy server), serverChecksums stays empty and the next push uploads
 838    // everything; it self-corrects on push success.
 839    state.serverChecksums.clear()
 840    if (responseChecksums) {
 841      for (const [key, hash] of Object.entries(responseChecksums)) {
 842        state.serverChecksums.set(key, hash)
 843      }
 844    } else {
 845      logForDebugging(
 846        'team-memory-sync: server response missing entryChecksums (pre-#283027 deploy) — next push will be full, not delta',
 847        { level: 'debug' },
 848      )
 849    }
 850  
 851    const filesWritten = await writeRemoteEntriesToLocal(entries)
 852    if (filesWritten > 0) {
 853      const { clearMemoryFileCaches } = await import('../../utils/claudemd.js')
 854      clearMemoryFileCaches()
 855    }
 856    logForDebugging(`team-memory-sync: pulled ${filesWritten} files`, {
 857      level: 'info',
 858    })
 859  
 860    logPull(startTime, { success: true, filesWritten })
 861  
 862    return {
 863      success: true,
 864      filesWritten,
 865      entryCount: Object.keys(entries).length,
 866    }
 867  }
 868  
 869  /**
 870   * Push local team memory files to the server with optimistic locking.
 871   *
 872   * Uses delta upload: only keys whose local content hash differs from
 873   * serverChecksums are included in the PUT. On 412 conflict, probes
 874   * GET ?view=hashes to refresh serverChecksums, recomputes the delta
 875   * (naturally excluding keys where a teammate's push matches ours),
 876   * and retries. No merge, no disk writes — server-only new keys from
 877   * a teammate's concurrent push propagate on the next pull.
 878   *
 879   * Local-wins-on-conflict is the opposite of syncTeamMemory's pull-first
 880   * semantics. This is intentional: pushTeamMemory is triggered by a local edit,
 881   * and that edit must not be silently discarded just because a teammate pushed
 882   * in the meantime. Content-level merge (same key, both changed) is not
 883   * attempted — the local version simply overwrites the server version for that
 884   * key, and the server's edit to that key is lost. This is the lesser evil:
 885   * the local user is actively editing and can re-incorporate the teammate's
 886   * changes, whereas silently discarding the local edit loses work the user
 887   * just did with no recourse.
 888   */
 889  export async function pushTeamMemory(
 890    state: SyncState,
 891  ): Promise<TeamMemorySyncPushResult> {
 892    const startTime = Date.now()
 893    let conflictRetries = 0
 894  
 895    if (!isUsingOAuth()) {
 896      logPush(startTime, { success: false, errorType: 'no_oauth' })
 897      return {
 898        success: false,
 899        filesUploaded: 0,
 900        error: 'OAuth not available',
 901        errorType: 'no_oauth',
 902      }
 903    }
 904  
 905    const repoSlug = await getGithubRepo()
 906    if (!repoSlug) {
 907      logPush(startTime, { success: false, errorType: 'no_repo' })
 908      return {
 909        success: false,
 910        filesUploaded: 0,
 911        error: 'No git remote found',
 912        errorType: 'no_repo',
 913      }
 914    }
 915  
 916    // Read local entries once at the start. Conflict resolution does NOT re-read
 917    // from disk — the delta computation against a refreshed serverChecksums naturally
 918    // excludes server-origin content, so the user's local edit cannot be clobbered.
 919    // Secret scanning (PSR M22174) happens here once — files with detected
 920    // secrets are excluded from the upload set.
 921    const localRead = await readLocalTeamMemory(state.serverMaxEntries)
 922    const entries = localRead.entries
 923    const skippedSecrets = localRead.skippedSecrets
 924    if (skippedSecrets.length > 0) {
 925      // Log a user-visible warning listing which files were skipped and why.
 926      // Don't block the push — just exclude those files. The secret VALUE is
 927      // never logged, only the type label.
 928      const summary = skippedSecrets
 929        .map(s => `"${s.path}" (${s.label})`)
 930        .join(', ')
 931      logForDebugging(
 932        `team-memory-sync: ${skippedSecrets.length} file(s) skipped due to detected secrets: ${summary}. Remove the secret(s) to enable sync for these files.`,
 933        { level: 'warn' },
 934      )
 935      logEvent('tengu_team_mem_secret_skipped', {
 936        file_count: skippedSecrets.length,
 937        // Only log gitleaks rule IDs (not values, not paths — paths could
 938        // leak repo structure). Comma-joined for compact single-field analytics.
 939        rule_ids: skippedSecrets
 940          .map(s => s.ruleId)
 941          .join(
 942            ',',
 943          ) as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
 944      })
 945    }
 946  
 947    // Hash each local entry once. The loop recomputes the delta each iteration
 948    // (serverChecksums may change after a 412 probe) but local hashes are stable.
 949    const localHashes = new Map<string, string>()
 950    for (const [key, content] of Object.entries(entries)) {
 951      localHashes.set(key, hashContent(content))
 952    }
 953  
 954    let sawConflict = false
 955  
 956    for (
 957      let conflictAttempt = 0;
 958      conflictAttempt <= MAX_CONFLICT_RETRIES;
 959      conflictAttempt++
 960    ) {
 961      // Delta: only upload keys whose content hash differs from what we believe
 962      // the server holds. On first push after a fresh pull, this is exactly the
 963      // user's local edits. After a 412 probe, matching hashes are excluded —
 964      // server-origin content from a teammate's concurrent push is naturally
 965      // dropped from the delta, so we never re-upload it.
 966      const delta: Record<string, string> = {}
 967      for (const [key, localHash] of localHashes) {
 968        if (state.serverChecksums.get(key) !== localHash) {
 969          delta[key] = entries[key]!
 970        }
 971      }
 972      const deltaCount = Object.keys(delta).length
 973  
 974      if (deltaCount === 0) {
 975        // Nothing to upload. This is the expected fast path after a fresh pull
 976        // with no local edits, and also the convergence point after a 412 where
 977        // the teammate's push was a strict superset of ours.
 978        logPush(startTime, {
 979          success: true,
 980          conflict: sawConflict,
 981          conflictRetries,
 982        })
 983        return {
 984          success: true,
 985          filesUploaded: 0,
 986          ...(skippedSecrets.length > 0 && { skippedSecrets }),
 987        }
 988      }
 989  
 990      // Split the delta into PUT-sized batches to stay under the gateway's
 991      // body-size limit.  Typical deltas (1-3 edited files) land in one batch;
 992      // cold pushes with many files are where this earns its keep.  Each batch
 993      // is a complete PUT that upserts its keys independently — if batch N
 994      // fails, batches 1..N-1 are already committed server-side.  Updating
 995      // serverChecksums after each success means the outer conflict-loop retry
 996      // naturally resumes from the uncommitted tail (those keys still differ).
 997      // state.lastKnownChecksum is updated inside uploadTeamMemory on each
 998      // 200, so the ETag chain threads through the batches automatically.
 999      const batches = batchDeltaByBytes(delta)
1000      let filesUploaded = 0
1001      let result: TeamMemorySyncUploadResult | undefined
1002  
1003      for (const batch of batches) {
1004        result = await uploadTeamMemory(
1005          state,
1006          repoSlug,
1007          batch,
1008          state.lastKnownChecksum,
1009        )
1010        if (!result.success) break
1011  
1012        for (const key of Object.keys(batch)) {
1013          state.serverChecksums.set(key, localHashes.get(key)!)
1014        }
1015        filesUploaded += Object.keys(batch).length
1016      }
1017      // batches is non-empty (deltaCount > 0 guaranteed by the check above),
1018      // so the loop executed at least once.
1019      result = result!
1020  
1021      if (result.success) {
1022        // Server-side delta propagation to disk (server-only new keys from a
1023        // teammate's concurrent push) happens on the next pull — we only
1024        // fetched hashes during conflict resolution, not bodies.
1025        logForDebugging(
1026          batches.length > 1
1027            ? `team-memory-sync: pushed ${filesUploaded} of ${localHashes.size} files in ${batches.length} batches`
1028            : `team-memory-sync: pushed ${filesUploaded} of ${localHashes.size} files (delta)`,
1029          { level: 'info' },
1030        )
1031        logPush(startTime, {
1032          success: true,
1033          filesUploaded,
1034          conflict: sawConflict,
1035          conflictRetries,
1036          putBatches: batches.length > 1 ? batches.length : undefined,
1037        })
1038        return {
1039          success: true,
1040          filesUploaded,
1041          checksum: result.checksum,
1042          ...(skippedSecrets.length > 0 && { skippedSecrets }),
1043        }
1044      }
1045  
1046      if (!result.conflict) {
1047        // If the server returned a structured 413 with its effective
1048        // max_entries (anthropic/anthropic#293258), cache it so the next push
1049        // trims to the right cap. The server may GB-tune this per-org.
1050        // This push still fails — re-trimming mid-push would require re-reading
1051        // local entries and re-computing the delta, and we'd need
1052        // soft_delete_keys to shrink below current server count anyway.
1053        if (result.serverMaxEntries !== undefined) {
1054          state.serverMaxEntries = result.serverMaxEntries
1055          logForDebugging(
1056            `team-memory-sync: learned server max_entries=${result.serverMaxEntries} from 413; next push will truncate to this`,
1057            { level: 'warn' },
1058          )
1059        }
1060        // filesUploaded may be nonzero if earlier batches committed before this
1061        // one failed. Those keys ARE on the server; the push is a failure
1062        // because it's incomplete, but we don't re-upload them on retry
1063        // (serverChecksums was updated).
1064        logPush(startTime, {
1065          success: false,
1066          filesUploaded,
1067          conflictRetries,
1068          putBatches: batches.length > 1 ? batches.length : undefined,
1069          errorType: result.errorType,
1070          status: result.httpStatus,
1071          // Datadog: filter @error_code:team_memory_too_many_entries to track
1072          // too-many-files rejections distinct from gateway/unstructured 413s
1073          errorCode: result.serverErrorCode,
1074          serverMaxEntries: result.serverMaxEntries,
1075          serverReceivedEntries: result.serverReceivedEntries,
1076        })
1077        return {
1078          success: false,
1079          filesUploaded,
1080          error: result.error,
1081          errorType: result.errorType,
1082          httpStatus: result.httpStatus,
1083        }
1084      }
1085  
1086      // 412 conflict — refresh serverChecksums and retry with a tighter delta.
1087      sawConflict = true
1088      if (conflictAttempt >= MAX_CONFLICT_RETRIES) {
1089        logForDebugging(
1090          `team-memory-sync: giving up after ${MAX_CONFLICT_RETRIES} conflict retries`,
1091          { level: 'warn' },
1092        )
1093        logPush(startTime, {
1094          success: false,
1095          conflict: true,
1096          conflictRetries,
1097          errorType: 'conflict',
1098        })
1099        return {
1100          success: false,
1101          filesUploaded: 0,
1102          conflict: true,
1103          error: 'Conflict resolution failed after retries',
1104        }
1105      }
1106  
1107      conflictRetries++
1108  
1109      logForDebugging(
1110        `team-memory-sync: conflict (412), probing server hashes (attempt ${conflictAttempt + 1}/${MAX_CONFLICT_RETRIES})`,
1111        { level: 'info' },
1112      )
1113  
1114      // Cheap probe: fetch only per-key checksums, no entry bodies. Refreshes
1115      // serverChecksums so the next iteration's delta drops any keys a teammate just
1116      // pushed with identical content.
1117      const probe = await fetchTeamMemoryHashes(state, repoSlug)
1118      if (!probe.success || !probe.entryChecksums) {
1119        // Requires anthropic/anthropic#283027. A transient probe failure here is
1120        // fine: the push is failed and the watcher will retry on the next edit.
1121        logPush(startTime, {
1122          success: false,
1123          conflict: true,
1124          conflictRetries,
1125          errorType: 'conflict',
1126        })
1127        return {
1128          success: false,
1129          filesUploaded: 0,
1130          conflict: true,
1131          error: `Conflict resolution hashes probe failed: ${probe.error}`,
1132        }
1133      }
1134      state.serverChecksums.clear()
1135      for (const [key, hash] of Object.entries(probe.entryChecksums)) {
1136        state.serverChecksums.set(key, hash)
1137      }
1138    }
1139  
1140    logPush(startTime, { success: false, conflictRetries })
1141    return {
1142      success: false,
1143      filesUploaded: 0,
1144      error: 'Unexpected end of conflict resolution loop',
1145    }
1146  }
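
       // Worked example of the 412 path (hypothetical keys): we PUT {a, b} with a
       // stale If-Match because a teammate pushed first. Their push wrote an
       // identical "a" plus a new "c". The ?view=hashes probe refreshes
       // serverChecksums, "a" now matches and drops out of the delta, the retry
       // PUTs only {b}, and "c" reaches local disk on the next pull — it is never
       // merged or written here.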
1147  
1148  /**
1149   * Bidirectional sync: pull from server, merge with local, push back.
 1150   * Server entries take precedence per-key (the pull overwrites local files first).
1151   * Push uses conflict resolution (retries on 412) via pushTeamMemory.
1152   */
1153  export async function syncTeamMemory(state: SyncState): Promise<{
1154    success: boolean
1155    filesPulled: number
1156    filesPushed: number
1157    error?: string
1158  }> {
1159    // 1. Pull remote → local (skip ETag cache for full sync)
1160    const pullResult = await pullTeamMemory(state, { skipEtagCache: true })
1161    if (!pullResult.success) {
1162      return {
1163        success: false,
1164        filesPulled: 0,
1165        filesPushed: 0,
1166        error: pullResult.error,
1167      }
1168    }
1169  
1170    // 2. Push local → remote (with conflict resolution)
1171    const pushResult = await pushTeamMemory(state)
1172    if (!pushResult.success) {
1173      return {
1174        success: false,
1175        filesPulled: pullResult.filesWritten,
1176        filesPushed: 0,
1177        error: pushResult.error,
1178      }
1179    }
1180  
1181    logForDebugging(
1182      `team-memory-sync: synced (pulled ${pullResult.filesWritten}, pushed ${pushResult.filesUploaded})`,
1183      { level: 'info' },
1184    )
1185  
1186    return {
1187      success: true,
1188      filesPulled: pullResult.filesWritten,
1189      filesPushed: pushResult.filesUploaded,
1190    }
1191  }
1192  
1193  // ─── Telemetry helpers ───────────────────────────────────────
1194  
1195  function logPull(
1196    startTime: number,
1197    outcome: {
1198      success: boolean
1199      filesWritten?: number
1200      notModified?: boolean
1201      errorType?: string
1202      status?: number
1203    },
1204  ): void {
1205    logEvent('tengu_team_mem_sync_pull', {
1206      success: outcome.success,
1207      files_written: outcome.filesWritten ?? 0,
1208      not_modified: outcome.notModified ?? false,
1209      duration_ms: Date.now() - startTime,
1210      ...(outcome.errorType && {
1211        errorType:
1212          outcome.errorType as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
1213      }),
1214      ...(outcome.status && { status: outcome.status }),
1215    })
1216  }
1217  
1218  function logPush(
1219    startTime: number,
1220    outcome: {
1221      success: boolean
1222      filesUploaded?: number
1223      conflict?: boolean
1224      conflictRetries?: number
1225      errorType?: string
1226      status?: number
1227      putBatches?: number
1228      errorCode?: string
1229      serverMaxEntries?: number
1230      serverReceivedEntries?: number
1231    },
1232  ): void {
1233    logEvent('tengu_team_mem_sync_push', {
1234      success: outcome.success,
1235      files_uploaded: outcome.filesUploaded ?? 0,
1236      conflict: outcome.conflict ?? false,
1237      conflict_retries: outcome.conflictRetries ?? 0,
1238      duration_ms: Date.now() - startTime,
1239      ...(outcome.errorType && {
1240        errorType:
1241          outcome.errorType as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
1242      }),
1243      ...(outcome.status && { status: outcome.status }),
1244      ...(outcome.putBatches && { put_batches: outcome.putBatches }),
1245      ...(outcome.errorCode && {
1246        error_code:
1247          outcome.errorCode as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
1248      }),
1249      ...(outcome.serverMaxEntries !== undefined && {
1250        server_max_entries: outcome.serverMaxEntries,
1251      }),
1252      ...(outcome.serverReceivedEntries !== undefined && {
1253        server_received_entries: outcome.serverReceivedEntries,
1254      }),
1255    })
1256  }