// services/analytics/firstPartyEventLoggingExporter.ts
  1  import type { HrTime } from '@opentelemetry/api'
  2  import { type ExportResult, ExportResultCode } from '@opentelemetry/core'
  3  import type {
  4    LogRecordExporter,
  5    ReadableLogRecord,
  6  } from '@opentelemetry/sdk-logs'
  7  import axios from 'axios'
  8  import { randomUUID } from 'crypto'
  9  import { appendFile, mkdir, readdir, unlink, writeFile } from 'fs/promises'
 10  import * as path from 'path'
 11  import type { CoreUserData } from 'src/utils/user.js'
 12  import {
 13    getIsNonInteractiveSession,
 14    getSessionId,
 15  } from '../../bootstrap/state.js'
 16  import { ClaudeCodeInternalEvent } from '../../types/generated/events_mono/claude_code/v1/claude_code_internal_event.js'
 17  import { GrowthbookExperimentEvent } from '../../types/generated/events_mono/growthbook/v1/growthbook_experiment_event.js'
 18  import {
 19    getClaudeAIOAuthTokens,
 20    hasProfileScope,
 21    isClaudeAISubscriber,
 22  } from '../../utils/auth.js'
 23  import { checkHasTrustDialogAccepted } from '../../utils/config.js'
 24  import { logForDebugging } from '../../utils/debug.js'
 25  import { getClaudeConfigHomeDir } from '../../utils/envUtils.js'
 26  import { errorMessage, isFsInaccessible, toError } from '../../utils/errors.js'
 27  import { getAuthHeaders } from '../../utils/http.js'
 28  import { readJSONLFile } from '../../utils/json.js'
 29  import { logError } from '../../utils/log.js'
 30  import { sleep } from '../../utils/sleep.js'
 31  import { jsonStringify } from '../../utils/slowOperations.js'
 32  import { getClaudeCodeUserAgent } from '../../utils/userAgent.js'
 33  import { isOAuthTokenExpired } from '../oauth/client.js'
 34  import { stripProtoFields } from './index.js'
 35  import { type EventMetadata, to1PEventFormat } from './metadata.js'
 36  
// Unique ID for this process run - used to isolate failed event files between runs.
// It is embedded in the current batch file name, and the startup retry skips
// any file whose name contains it.
const BATCH_UUID = randomUUID()

// File name prefix for failed event storage. Full names have the form
// `${FILE_PREFIX}<sessionId>.<BATCH_UUID>.json`.
const FILE_PREFIX = '1p_failed_events.'
 42  
 43  // Storage directory for failed events - evaluated at runtime to respect CLAUDE_CONFIG_DIR in tests
 44  function getStorageDir(): string {
 45    return path.join(getClaudeConfigHomeDir(), 'telemetry')
 46  }
 47  
// API envelope - event_data is the JSON output from proto toJSON()
// event_type names which of the two generated proto messages produced event_data.
type FirstPartyEventLoggingEvent = {
  event_type: 'ClaudeCodeInternalEvent' | 'GrowthbookExperimentEvent'
  event_data: unknown
}

// Request body POSTed to the event logging endpoint: one chunk of events.
type FirstPartyEventLoggingPayload = {
  events: FirstPartyEventLoggingEvent[]
}
 57  
 58  /**
 59   * Exporter for 1st-party event logging to /api/event_logging/batch.
 60   *
 61   * Export cycles are controlled by OpenTelemetry's BatchLogRecordProcessor, which
 62   * triggers export() when either:
 63   * - Time interval elapses (default: 5 seconds via scheduledDelayMillis)
 64   * - Batch size is reached (default: 200 events via maxExportBatchSize)
 65   *
 66   * This exporter adds resilience on top:
 67   * - Append-only log for failed events (concurrency-safe)
 68   * - Quadratic backoff retry for failed events, dropped after maxAttempts
 69   * - Immediate retry of queued events when any export succeeds (endpoint is healthy)
 70   * - Chunking large event sets into smaller batches
 71   * - Auth fallback: retries without auth on 401 errors
 72   */
 73  export class FirstPartyEventLoggingExporter implements LogRecordExporter {
 74    private readonly endpoint: string
 75    private readonly timeout: number
 76    private readonly maxBatchSize: number
 77    private readonly skipAuth: boolean
 78    private readonly batchDelayMs: number
 79    private readonly baseBackoffDelayMs: number
 80    private readonly maxBackoffDelayMs: number
 81    private readonly maxAttempts: number
 82    private readonly isKilled: () => boolean
 83    private pendingExports: Promise<void>[] = []
 84    private isShutdown = false
 85    private readonly schedule: (
 86      fn: () => Promise<void>,
 87      delayMs: number,
 88    ) => () => void
 89    private cancelBackoff: (() => void) | null = null
 90    private attempts = 0
 91    private isRetrying = false
 92    private lastExportErrorContext: string | undefined
 93  
 94    constructor(
 95      options: {
 96        timeout?: number
 97        maxBatchSize?: number
 98        skipAuth?: boolean
 99        batchDelayMs?: number
100        baseBackoffDelayMs?: number
101        maxBackoffDelayMs?: number
102        maxAttempts?: number
103        path?: string
104        baseUrl?: string
105        // Injected killswitch probe. Checked per-POST so that disabling the
106        // firstParty sink also stops backoff retries (not just new emits).
107        // Passed in rather than imported to avoid a cycle with firstPartyEventLogger.ts.
108        isKilled?: () => boolean
109        schedule?: (fn: () => Promise<void>, delayMs: number) => () => void
110      } = {},
111    ) {
112      // Default: prod, except when ANTHROPIC_BASE_URL is explicitly staging.
113      // Overridable via tengu_1p_event_batch_config.baseUrl.
114      const baseUrl =
115        options.baseUrl ||
116        (process.env.ANTHROPIC_BASE_URL === 'https://api-staging.anthropic.com'
117          ? 'https://api-staging.anthropic.com'
118          : 'https://api.anthropic.com')
119  
120      this.endpoint = `${baseUrl}${options.path || '/api/event_logging/batch'}`
121  
122      this.timeout = options.timeout || 10000
123      this.maxBatchSize = options.maxBatchSize || 200
124      this.skipAuth = options.skipAuth ?? false
125      this.batchDelayMs = options.batchDelayMs || 100
126      this.baseBackoffDelayMs = options.baseBackoffDelayMs || 500
127      this.maxBackoffDelayMs = options.maxBackoffDelayMs || 30000
128      this.maxAttempts = options.maxAttempts ?? 8
129      this.isKilled = options.isKilled ?? (() => false)
130      this.schedule =
131        options.schedule ??
132        ((fn, ms) => {
133          const t = setTimeout(fn, ms)
134          return () => clearTimeout(t)
135        })
136  
137      // Retry any failed events from previous runs of this session (in background)
138      void this.retryPreviousBatches()
139    }
140  
141    // Expose for testing
142    async getQueuedEventCount(): Promise<number> {
143      return (await this.loadEventsFromCurrentBatch()).length
144    }
145  
146    // --- Storage helpers ---
147  
148    private getCurrentBatchFilePath(): string {
149      return path.join(
150        getStorageDir(),
151        `${FILE_PREFIX}${getSessionId()}.${BATCH_UUID}.json`,
152      )
153    }
154  
155    private async loadEventsFromFile(
156      filePath: string,
157    ): Promise<FirstPartyEventLoggingEvent[]> {
158      try {
159        return await readJSONLFile<FirstPartyEventLoggingEvent>(filePath)
160      } catch {
161        return []
162      }
163    }
164  
165    private async loadEventsFromCurrentBatch(): Promise<
166      FirstPartyEventLoggingEvent[]
167    > {
168      return this.loadEventsFromFile(this.getCurrentBatchFilePath())
169    }
170  
171    private async saveEventsToFile(
172      filePath: string,
173      events: FirstPartyEventLoggingEvent[],
174    ): Promise<void> {
175      try {
176        if (events.length === 0) {
177          try {
178            await unlink(filePath)
179          } catch {
180            // File doesn't exist, nothing to delete
181          }
182        } else {
183          // Ensure storage directory exists
184          await mkdir(getStorageDir(), { recursive: true })
185          // Write as JSON lines (one event per line)
186          const content = events.map(e => jsonStringify(e)).join('\n') + '\n'
187          await writeFile(filePath, content, 'utf8')
188        }
189      } catch (error) {
190        logError(error)
191      }
192    }
193  
194    private async appendEventsToFile(
195      filePath: string,
196      events: FirstPartyEventLoggingEvent[],
197    ): Promise<void> {
198      if (events.length === 0) return
199      try {
200        // Ensure storage directory exists
201        await mkdir(getStorageDir(), { recursive: true })
202        // Append as JSON lines (one event per line) - atomic on most filesystems
203        const content = events.map(e => jsonStringify(e)).join('\n') + '\n'
204        await appendFile(filePath, content, 'utf8')
205      } catch (error) {
206        logError(error)
207      }
208    }
209  
210    private async deleteFile(filePath: string): Promise<void> {
211      try {
212        await unlink(filePath)
213      } catch {
214        // File doesn't exist or can't be deleted, ignore
215      }
216    }
217  
218    // --- Previous batch retry (startup) ---
219  
220    private async retryPreviousBatches(): Promise<void> {
221      try {
222        const prefix = `${FILE_PREFIX}${getSessionId()}.`
223        let files: string[]
224        try {
225          files = (await readdir(getStorageDir()))
226            .filter((f: string) => f.startsWith(prefix) && f.endsWith('.json'))
227            .filter((f: string) => !f.includes(BATCH_UUID)) // Exclude current batch
228        } catch (e) {
229          if (isFsInaccessible(e)) return
230          throw e
231        }
232  
233        for (const file of files) {
234          const filePath = path.join(getStorageDir(), file)
235          void this.retryFileInBackground(filePath)
236        }
237      } catch (error) {
238        logError(error)
239      }
240    }
241  
242    private async retryFileInBackground(filePath: string): Promise<void> {
243      if (this.attempts >= this.maxAttempts) {
244        await this.deleteFile(filePath)
245        return
246      }
247  
248      const events = await this.loadEventsFromFile(filePath)
249      if (events.length === 0) {
250        await this.deleteFile(filePath)
251        return
252      }
253  
254      if (process.env.USER_TYPE === 'ant') {
255        logForDebugging(
256          `1P event logging: retrying ${events.length} events from previous batch`,
257        )
258      }
259  
260      const failedEvents = await this.sendEventsInBatches(events)
261      if (failedEvents.length === 0) {
262        await this.deleteFile(filePath)
263        if (process.env.USER_TYPE === 'ant') {
264          logForDebugging('1P event logging: previous batch retry succeeded')
265        }
266      } else {
267        // Save only the failed events back (not all original events)
268        await this.saveEventsToFile(filePath, failedEvents)
269        if (process.env.USER_TYPE === 'ant') {
270          logForDebugging(
271            `1P event logging: previous batch retry failed, ${failedEvents.length} events remain`,
272          )
273        }
274      }
275    }
276  
277    async export(
278      logs: ReadableLogRecord[],
279      resultCallback: (result: ExportResult) => void,
280    ): Promise<void> {
281      if (this.isShutdown) {
282        if (process.env.USER_TYPE === 'ant') {
283          logForDebugging(
284            '1P event logging export failed: Exporter has been shutdown',
285          )
286        }
287        resultCallback({
288          code: ExportResultCode.FAILED,
289          error: new Error('Exporter has been shutdown'),
290        })
291        return
292      }
293  
294      const exportPromise = this.doExport(logs, resultCallback)
295      this.pendingExports.push(exportPromise)
296  
297      // Clean up completed exports
298      void exportPromise.finally(() => {
299        const index = this.pendingExports.indexOf(exportPromise)
300        if (index > -1) {
301          void this.pendingExports.splice(index, 1)
302        }
303      })
304    }
305  
306    private async doExport(
307      logs: ReadableLogRecord[],
308      resultCallback: (result: ExportResult) => void,
309    ): Promise<void> {
310      try {
311        // Filter for event logs only (by scope name)
312        const eventLogs = logs.filter(
313          log =>
314            log.instrumentationScope?.name === 'com.anthropic.claude_code.events',
315        )
316  
317        if (eventLogs.length === 0) {
318          resultCallback({ code: ExportResultCode.SUCCESS })
319          return
320        }
321  
322        // Transform new logs (failed events are retried independently via backoff)
323        const events = this.transformLogsToEvents(eventLogs).events
324  
325        if (events.length === 0) {
326          resultCallback({ code: ExportResultCode.SUCCESS })
327          return
328        }
329  
330        if (this.attempts >= this.maxAttempts) {
331          resultCallback({
332            code: ExportResultCode.FAILED,
333            error: new Error(
334              `Dropped ${events.length} events: max attempts (${this.maxAttempts}) reached`,
335            ),
336          })
337          return
338        }
339  
340        // Send events
341        const failedEvents = await this.sendEventsInBatches(events)
342        this.attempts++
343  
344        if (failedEvents.length > 0) {
345          await this.queueFailedEvents(failedEvents)
346          this.scheduleBackoffRetry()
347          const context = this.lastExportErrorContext
348            ? ` (${this.lastExportErrorContext})`
349            : ''
350          resultCallback({
351            code: ExportResultCode.FAILED,
352            error: new Error(
353              `Failed to export ${failedEvents.length} events${context}`,
354            ),
355          })
356          return
357        }
358  
359        // Success - reset backoff and immediately retry any queued events
360        this.resetBackoff()
361        if ((await this.getQueuedEventCount()) > 0 && !this.isRetrying) {
362          void this.retryFailedEvents()
363        }
364        resultCallback({ code: ExportResultCode.SUCCESS })
365      } catch (error) {
366        if (process.env.USER_TYPE === 'ant') {
367          logForDebugging(
368            `1P event logging export failed: ${errorMessage(error)}`,
369          )
370        }
371        logError(error)
372        resultCallback({
373          code: ExportResultCode.FAILED,
374          error: toError(error),
375        })
376      }
377    }
378  
379    private async sendEventsInBatches(
380      events: FirstPartyEventLoggingEvent[],
381    ): Promise<FirstPartyEventLoggingEvent[]> {
382      // Chunk events into batches
383      const batches: FirstPartyEventLoggingEvent[][] = []
384      for (let i = 0; i < events.length; i += this.maxBatchSize) {
385        batches.push(events.slice(i, i + this.maxBatchSize))
386      }
387  
388      if (process.env.USER_TYPE === 'ant') {
389        logForDebugging(
390          `1P event logging: exporting ${events.length} events in ${batches.length} batch(es)`,
391        )
392      }
393  
394      // Send each batch with delay between them. On first failure, assume the
395      // endpoint is down and short-circuit: queue the failed batch plus all
396      // remaining unsent batches without POSTing them. The backoff retry will
397      // probe again with a single batch next tick.
398      const failedBatchEvents: FirstPartyEventLoggingEvent[] = []
399      let lastErrorContext: string | undefined
400      for (let i = 0; i < batches.length; i++) {
401        const batch = batches[i]!
402        try {
403          await this.sendBatchWithRetry({ events: batch })
404        } catch (error) {
405          lastErrorContext = getAxiosErrorContext(error)
406          for (let j = i; j < batches.length; j++) {
407            failedBatchEvents.push(...batches[j]!)
408          }
409          if (process.env.USER_TYPE === 'ant') {
410            const skipped = batches.length - 1 - i
411            logForDebugging(
412              `1P event logging: batch ${i + 1}/${batches.length} failed (${lastErrorContext}); short-circuiting ${skipped} remaining batch(es)`,
413            )
414          }
415          break
416        }
417  
418        if (i < batches.length - 1 && this.batchDelayMs > 0) {
419          await sleep(this.batchDelayMs)
420        }
421      }
422  
423      if (failedBatchEvents.length > 0 && lastErrorContext) {
424        this.lastExportErrorContext = lastErrorContext
425      }
426  
427      return failedBatchEvents
428    }
429  
430    private async queueFailedEvents(
431      events: FirstPartyEventLoggingEvent[],
432    ): Promise<void> {
433      const filePath = this.getCurrentBatchFilePath()
434  
435      // Append-only: just add new events to file (atomic on most filesystems)
436      await this.appendEventsToFile(filePath, events)
437  
438      const context = this.lastExportErrorContext
439        ? ` (${this.lastExportErrorContext})`
440        : ''
441      const message = `1P event logging: ${events.length} events failed to export${context}`
442      logError(new Error(message))
443    }
444  
445    private scheduleBackoffRetry(): void {
446      // Don't schedule if already retrying or shutdown
447      if (this.cancelBackoff || this.isRetrying || this.isShutdown) {
448        return
449      }
450  
451      // Quadratic backoff (matching Statsig SDK): base * attempts²
452      const delay = Math.min(
453        this.baseBackoffDelayMs * this.attempts * this.attempts,
454        this.maxBackoffDelayMs,
455      )
456  
457      if (process.env.USER_TYPE === 'ant') {
458        logForDebugging(
459          `1P event logging: scheduling backoff retry in ${delay}ms (attempt ${this.attempts})`,
460        )
461      }
462  
463      this.cancelBackoff = this.schedule(async () => {
464        this.cancelBackoff = null
465        await this.retryFailedEvents()
466      }, delay)
467    }
468  
469    private async retryFailedEvents(): Promise<void> {
470      const filePath = this.getCurrentBatchFilePath()
471  
472      // Keep retrying while there are events and endpoint is healthy
473      while (!this.isShutdown) {
474        const events = await this.loadEventsFromFile(filePath)
475        if (events.length === 0) break
476  
477        if (this.attempts >= this.maxAttempts) {
478          if (process.env.USER_TYPE === 'ant') {
479            logForDebugging(
480              `1P event logging: max attempts (${this.maxAttempts}) reached, dropping ${events.length} events`,
481            )
482          }
483          await this.deleteFile(filePath)
484          this.resetBackoff()
485          return
486        }
487  
488        this.isRetrying = true
489  
490        // Clear file before retry (we have events in memory now)
491        await this.deleteFile(filePath)
492  
493        if (process.env.USER_TYPE === 'ant') {
494          logForDebugging(
495            `1P event logging: retrying ${events.length} failed events (attempt ${this.attempts + 1})`,
496          )
497        }
498  
499        const failedEvents = await this.sendEventsInBatches(events)
500        this.attempts++
501  
502        this.isRetrying = false
503  
504        if (failedEvents.length > 0) {
505          // Write failures back to disk
506          await this.saveEventsToFile(filePath, failedEvents)
507          this.scheduleBackoffRetry()
508          return // Failed - wait for backoff
509        }
510  
511        // Success - reset backoff and continue loop to drain any newly queued events
512        this.resetBackoff()
513        if (process.env.USER_TYPE === 'ant') {
514          logForDebugging('1P event logging: backoff retry succeeded')
515        }
516      }
517    }
518  
519    private resetBackoff(): void {
520      this.attempts = 0
521      if (this.cancelBackoff) {
522        this.cancelBackoff()
523        this.cancelBackoff = null
524      }
525    }
526  
527    private async sendBatchWithRetry(
528      payload: FirstPartyEventLoggingPayload,
529    ): Promise<void> {
530      if (this.isKilled()) {
531        // Throw so the caller short-circuits remaining batches and queues
532        // everything to disk. Zero network traffic while killed; the backoff
533        // timer keeps ticking and will resume POSTs as soon as the GrowthBook
534        // cache picks up the cleared flag.
535        throw new Error('firstParty sink killswitch active')
536      }
537  
538      const baseHeaders: Record<string, string> = {
539        'Content-Type': 'application/json',
540        'User-Agent': getClaudeCodeUserAgent(),
541        'x-service-name': 'claude-code',
542      }
543  
544      // Skip auth if trust hasn't been established yet
545      // This prevents executing apiKeyHelper commands before the trust dialog
546      // Non-interactive sessions implicitly have workspace trust
547      const hasTrust =
548        checkHasTrustDialogAccepted() || getIsNonInteractiveSession()
549      if (process.env.USER_TYPE === 'ant' && !hasTrust) {
550        logForDebugging('1P event logging: Trust not accepted')
551      }
552  
553      // Skip auth when the OAuth token is expired or lacks user:profile
554      // scope (service key sessions). Falls through to unauthenticated send.
555      let shouldSkipAuth = this.skipAuth || !hasTrust
556      if (!shouldSkipAuth && isClaudeAISubscriber()) {
557        const tokens = getClaudeAIOAuthTokens()
558        if (!hasProfileScope()) {
559          shouldSkipAuth = true
560        } else if (tokens && isOAuthTokenExpired(tokens.expiresAt)) {
561          shouldSkipAuth = true
562          if (process.env.USER_TYPE === 'ant') {
563            logForDebugging(
564              '1P event logging: OAuth token expired, skipping auth to avoid 401',
565            )
566          }
567        }
568      }
569  
570      // Try with auth headers first (unless trust not established or token is known to be expired)
571      const authResult = shouldSkipAuth
572        ? { headers: {}, error: 'trust not established or Oauth token expired' }
573        : getAuthHeaders()
574      const useAuth = !authResult.error
575  
576      if (!useAuth && process.env.USER_TYPE === 'ant') {
577        logForDebugging(
578          `1P event logging: auth not available, sending without auth`,
579        )
580      }
581  
582      const headers = useAuth
583        ? { ...baseHeaders, ...authResult.headers }
584        : baseHeaders
585  
586      try {
587        const response = await axios.post(this.endpoint, payload, {
588          timeout: this.timeout,
589          headers,
590        })
591        this.logSuccess(payload.events.length, useAuth, response.data)
592        return
593      } catch (error) {
594        // Handle 401 by retrying without auth
595        if (
596          useAuth &&
597          axios.isAxiosError(error) &&
598          error.response?.status === 401
599        ) {
600          if (process.env.USER_TYPE === 'ant') {
601            logForDebugging(
602              '1P event logging: 401 auth error, retrying without auth',
603            )
604          }
605          const response = await axios.post(this.endpoint, payload, {
606            timeout: this.timeout,
607            headers: baseHeaders,
608          })
609          this.logSuccess(payload.events.length, false, response.data)
610          return
611        }
612  
613        throw error
614      }
615    }
616  
617    private logSuccess(
618      eventCount: number,
619      withAuth: boolean,
620      responseData: unknown,
621    ): void {
622      if (process.env.USER_TYPE === 'ant') {
623        logForDebugging(
624          `1P event logging: ${eventCount} events exported successfully${withAuth ? ' (with auth)' : ' (without auth)'}`,
625        )
626        logForDebugging(`API Response: ${jsonStringify(responseData, null, 2)}`)
627      }
628    }
629  
630    private hrTimeToDate(hrTime: HrTime): Date {
631      const [seconds, nanoseconds] = hrTime
632      return new Date(seconds * 1000 + nanoseconds / 1000000)
633    }
634  
635    private transformLogsToEvents(
636      logs: ReadableLogRecord[],
637    ): FirstPartyEventLoggingPayload {
638      const events: FirstPartyEventLoggingEvent[] = []
639  
640      for (const log of logs) {
641        const attributes = log.attributes || {}
642  
643        // Check if this is a GrowthBook experiment event
644        if (attributes.event_type === 'GrowthbookExperimentEvent') {
645          const timestamp = this.hrTimeToDate(log.hrTime)
646          const account_uuid = attributes.account_uuid as string | undefined
647          const organization_uuid = attributes.organization_uuid as
648            | string
649            | undefined
650          events.push({
651            event_type: 'GrowthbookExperimentEvent',
652            event_data: GrowthbookExperimentEvent.toJSON({
653              event_id: attributes.event_id as string,
654              timestamp,
655              experiment_id: attributes.experiment_id as string,
656              variation_id: attributes.variation_id as number,
657              environment: attributes.environment as string,
658              user_attributes: attributes.user_attributes as string,
659              experiment_metadata: attributes.experiment_metadata as string,
660              device_id: attributes.device_id as string,
661              session_id: attributes.session_id as string,
662              auth:
663                account_uuid || organization_uuid
664                  ? { account_uuid, organization_uuid }
665                  : undefined,
666            }),
667          })
668          continue
669        }
670  
671        // Extract event name
672        const eventName =
673          (attributes.event_name as string) || (log.body as string) || 'unknown'
674  
675        // Extract metadata objects directly (no JSON parsing needed)
676        const coreMetadata = attributes.core_metadata as EventMetadata | undefined
677        const userMetadata = attributes.user_metadata as CoreUserData
678        const eventMetadata = (attributes.event_metadata || {}) as Record<
679          string,
680          unknown
681        >
682  
683        if (!coreMetadata) {
684          // Emit partial event if core metadata is missing
685          if (process.env.USER_TYPE === 'ant') {
686            logForDebugging(
687              `1P event logging: core_metadata missing for event ${eventName}`,
688            )
689          }
690          events.push({
691            event_type: 'ClaudeCodeInternalEvent',
692            event_data: ClaudeCodeInternalEvent.toJSON({
693              event_id: attributes.event_id as string | undefined,
694              event_name: eventName,
695              client_timestamp: this.hrTimeToDate(log.hrTime),
696              session_id: getSessionId(),
697              additional_metadata: Buffer.from(
698                jsonStringify({
699                  transform_error: 'core_metadata attribute is missing',
700                }),
701              ).toString('base64'),
702            }),
703          })
704          continue
705        }
706  
707        // Transform to 1P format
708        const formatted = to1PEventFormat(
709          coreMetadata,
710          userMetadata,
711          eventMetadata,
712        )
713  
714        // _PROTO_* keys are PII-tagged values meant only for privileged BQ
715        // columns. Hoist known keys to proto fields, then defensively strip any
716        // remaining _PROTO_* so an unrecognized future key can't silently land
717        // in the general-access additional_metadata blob. sink.ts applies the
718        // same strip before Datadog; this closes the 1P side.
719        const {
720          _PROTO_skill_name,
721          _PROTO_plugin_name,
722          _PROTO_marketplace_name,
723          ...rest
724        } = formatted.additional
725        const additionalMetadata = stripProtoFields(rest)
726  
727        events.push({
728          event_type: 'ClaudeCodeInternalEvent',
729          event_data: ClaudeCodeInternalEvent.toJSON({
730            event_id: attributes.event_id as string | undefined,
731            event_name: eventName,
732            client_timestamp: this.hrTimeToDate(log.hrTime),
733            device_id: attributes.user_id as string | undefined,
734            email: userMetadata?.email,
735            auth: formatted.auth,
736            ...formatted.core,
737            env: formatted.env,
738            process: formatted.process,
739            skill_name:
740              typeof _PROTO_skill_name === 'string'
741                ? _PROTO_skill_name
742                : undefined,
743            plugin_name:
744              typeof _PROTO_plugin_name === 'string'
745                ? _PROTO_plugin_name
746                : undefined,
747            marketplace_name:
748              typeof _PROTO_marketplace_name === 'string'
749                ? _PROTO_marketplace_name
750                : undefined,
751            additional_metadata:
752              Object.keys(additionalMetadata).length > 0
753                ? Buffer.from(jsonStringify(additionalMetadata)).toString(
754                    'base64',
755                  )
756                : undefined,
757          }),
758        })
759      }
760  
761      return { events }
762    }
763  
764    async shutdown(): Promise<void> {
765      this.isShutdown = true
766      this.resetBackoff()
767      await this.forceFlush()
768      if (process.env.USER_TYPE === 'ant') {
769        logForDebugging('1P event logging exporter shutdown complete')
770      }
771    }
772  
773    async forceFlush(): Promise<void> {
774      await Promise.all(this.pendingExports)
775      if (process.env.USER_TYPE === 'ant') {
776        logForDebugging('1P event logging exporter flush complete')
777      }
778    }
779  }
780  
/**
 * Builds a short, comma-separated description of a failed request.
 * For axios errors it includes, when present: the response's request-id
 * header, the HTTP status, the axios error code, and the error message.
 * Any other value is described by errorMessage().
 */
function getAxiosErrorContext(error: unknown): string {
  if (!axios.isAxiosError(error)) {
    return errorMessage(error)
  }

  const { response, code, message } = error
  const requestId = response?.headers?.['request-id']

  const segments: (string | undefined)[] = [
    requestId ? `request-id=${requestId}` : undefined,
    response?.status ? `status=${response.status}` : undefined,
    code ? `code=${code}` : undefined,
    message || undefined,
  ]

  return segments
    .filter((segment): segment is string => segment !== undefined)
    .join(', ')
}