/ src / services / analytics / firstPartyEventLogger.ts
firstPartyEventLogger.ts
  1  import type { AnyValueMap, Logger, logs } from '@opentelemetry/api-logs'
  2  import { resourceFromAttributes } from '@opentelemetry/resources'
  3  import {
  4    BatchLogRecordProcessor,
  5    LoggerProvider,
  6  } from '@opentelemetry/sdk-logs'
  7  import {
  8    ATTR_SERVICE_NAME,
  9    ATTR_SERVICE_VERSION,
 10  } from '@opentelemetry/semantic-conventions'
 11  import { randomUUID } from 'crypto'
 12  import { isEqual } from 'lodash-es'
 13  import { getOrCreateUserID } from '../../utils/config.js'
 14  import { logForDebugging } from '../../utils/debug.js'
 15  import { logError } from '../../utils/log.js'
 16  import { getPlatform, getWslVersion } from '../../utils/platform.js'
 17  import { jsonStringify } from '../../utils/slowOperations.js'
 18  import { profileCheckpoint } from '../../utils/startupProfiler.js'
 19  import { getCoreUserData } from '../../utils/user.js'
 20  import { isAnalyticsDisabled } from './config.js'
 21  import { FirstPartyEventLoggingExporter } from './firstPartyEventLoggingExporter.js'
 22  import type { GrowthBookUserAttributes } from './growthbook.js'
 23  import { getDynamicConfig_CACHED_MAY_BE_STALE } from './growthbook.js'
 24  import { getEventMetadata } from './metadata.js'
 25  import { isSinkKilled } from './sinkKillswitch.js'
 26  
 27  /**
 28   * Configuration for sampling individual event types.
 29   * Each event name maps to an object containing sample_rate (0-1).
 30   * Events not in the config are logged at 100% rate.
 31   */
 32  export type EventSamplingConfig = {
 33    [eventName: string]: {
 34      sample_rate: number
 35    }
 36  }
 37  
 38  const EVENT_SAMPLING_CONFIG_NAME = 'tengu_event_sampling_config'
 39  /**
 40   * Get the event sampling configuration from GrowthBook.
 41   * Uses cached value if available, updates cache in background.
 42   */
 43  export function getEventSamplingConfig(): EventSamplingConfig {
 44    return getDynamicConfig_CACHED_MAY_BE_STALE<EventSamplingConfig>(
 45      EVENT_SAMPLING_CONFIG_NAME,
 46      {},
 47    )
 48  }
 49  
 50  /**
 51   * Determine if an event should be sampled based on its sample rate.
 52   * Returns the sample rate if sampled, null if not sampled.
 53   *
 54   * @param eventName - Name of the event to check
 55   * @returns The sample_rate if event should be logged, null if it should be dropped
 56   */
 57  export function shouldSampleEvent(eventName: string): number | null {
 58    const config = getEventSamplingConfig()
 59    const eventConfig = config[eventName]
 60  
 61    // If no config for this event, log at 100% rate (no sampling)
 62    if (!eventConfig) {
 63      return null
 64    }
 65  
 66    const sampleRate = eventConfig.sample_rate
 67  
 68    // Validate sample rate is in valid range
 69    if (typeof sampleRate !== 'number' || sampleRate < 0 || sampleRate > 1) {
 70      return null
 71    }
 72  
 73    // Sample rate of 1 means log everything (no need to add metadata)
 74    if (sampleRate >= 1) {
 75      return null
 76    }
 77  
 78    // Sample rate of 0 means drop everything
 79    if (sampleRate <= 0) {
 80      return 0
 81    }
 82  
 83    // Randomly decide whether to sample this event
 84    return Math.random() < sampleRate ? sampleRate : 0
 85  }
 86  
 87  const BATCH_CONFIG_NAME = 'tengu_1p_event_batch_config'
 88  type BatchConfig = {
 89    scheduledDelayMillis?: number
 90    maxExportBatchSize?: number
 91    maxQueueSize?: number
 92    skipAuth?: boolean
 93    maxAttempts?: number
 94    path?: string
 95    baseUrl?: string
 96  }
 97  function getBatchConfig(): BatchConfig {
 98    return getDynamicConfig_CACHED_MAY_BE_STALE<BatchConfig>(
 99      BATCH_CONFIG_NAME,
100      {},
101    )
102  }
103  
// Module-local state for 1P event logging (deliberately not exposed globally).

// Provider that owns the batch processor and exporter; null until
// initialize1PEventLogging has built it.
let firstPartyEventLoggerProvider: LoggerProvider | null = null
// Logger obtained from the provider above; null while logging is not
// initialized or while the pipeline is being rebuilt.
let firstPartyEventLogger: ReturnType<typeof logs.getLogger> | null = null
// Batch config the current provider was constructed with. Compared against
// the latest GrowthBook value by reinitialize1PEventLoggingIfConfigChanged to
// decide whether a rebuild is needed.
let lastBatchConfig: BatchConfig | null = null
/**
 * Shut down the 1P logger provider, if one exists.
 *
 * Intended as the last step before process exit so that buffered events
 * (including late ones produced by API responses) still get exported.
 * Shutdown failures are deliberately ignored: this is best-effort cleanup.
 */
export async function shutdown1PEventLogging(): Promise<void> {
  const provider = firstPartyEventLoggerProvider
  if (!provider) {
    return
  }

  try {
    await provider.shutdown()
    if (process.env.USER_TYPE !== 'ant') {
      return
    }
    logForDebugging('1P event logging: final shutdown complete')
  } catch {
    // Best-effort: nothing useful can be done with a shutdown error here.
  }
}
129  
/**
 * Report whether 1P event logging is enabled.
 *
 * The decision is delegated entirely to isAnalyticsDisabled(), so this sink
 * honors the same opt-outs as the other analytics sinks. Per the notes kept
 * with this function (not verifiable from this file) those are:
 * - Test environment
 * - Third-party cloud providers (Bedrock/Vertex)
 * - Global telemetry opt-outs
 * - Non-essential traffic disabled
 *
 * Note: Unlike BigQuery metrics, event logging does NOT check the
 * organization-level metrics opt-out via API. It follows the same pattern as
 * Statsig event logging.
 */
export function is1PEventLoggingEnabled(): boolean {
  const analyticsDisabled = isAnalyticsDisabled()
  return !analyticsDisabled
}
145  
/**
 * Asynchronous worker behind logEventTo1P: enriches one event and emits it.
 *
 * Core metadata is resolved at log time via getEventMetadata (awaited), then
 * combined with user metadata and the caller's metadata into a single
 * attribute map on the emitted log record.
 *
 * Errors are rethrown only when NODE_ENV is 'development'; otherwise they are
 * swallowed, and reported through logError only for 'ant' users.
 *
 * @param firstPartyEventLogger - Logger to emit the record on
 * @param eventName - Name of the event (e.g., 'tengu_api_query')
 * @param metadata - Additional metadata for the event (intentionally no strings, to avoid accidentally logging code/filepaths)
 */
async function logEventTo1PAsync(
  firstPartyEventLogger: Logger,
  eventName: string,
  metadata: Record<string, number | boolean | undefined> = {},
): Promise<void> {
  try {
    const { model, betas } = metadata
    const coreMetadata = await getEventMetadata({ model, betas })
    const eventId = randomUUID()
    const userMetadata = getCoreUserData(true)

    // Nested objects are passed through as-is (no JSON serialization). The
    // double cast is needed because these objects lack the index signature
    // that AnyValueMap asks for, even though they are structurally fine.
    const attributes = {
      event_name: eventName,
      event_id: eventId,
      core_metadata: coreMetadata,
      user_metadata: userMetadata,
      event_metadata: metadata,
    } as unknown as AnyValueMap

    // user_id is attached only when an ID is available.
    const userId = getOrCreateUserID()
    if (userId) {
      attributes.user_id = userId
    }

    // 'ant' users get a debug trace of every event.
    if (process.env.USER_TYPE === 'ant') {
      const serializedMetadata = jsonStringify(metadata, null, 0)
      logForDebugging(
        `[ANT-ONLY] 1P event: ${eventName} ${serializedMetadata}`,
      )
    }

    firstPartyEventLogger.emit({ body: eventName, attributes })
  } catch (err) {
    if (process.env.NODE_ENV === 'development') {
      throw err
    }
    if (process.env.USER_TYPE === 'ant') {
      logError(err as Error)
    }
    // Otherwise swallow: analytics must never break the caller.
  }
}
208  
/**
 * Queue a 1st-party analytics event.
 *
 * Does nothing when 1P logging is disabled, when no logger has been
 * initialized, or when the 'firstParty' sink is killed. Otherwise the event
 * is handed to logEventTo1PAsync without awaiting it, so callers never block
 * on metadata enrichment.
 *
 * @param eventName - Name of the event (e.g., 'tengu_api_query')
 * @param metadata - Additional metadata for the event (intentionally no strings, to avoid accidentally logging code/filepaths)
 */
export function logEventTo1P(
  eventName: string,
  metadata: Record<string, number | boolean | undefined> = {},
): void {
  if (
    is1PEventLoggingEnabled() &&
    firstPartyEventLogger &&
    !isSinkKilled('firstParty')
  ) {
    // Fire and forget.
    void logEventTo1PAsync(firstPartyEventLogger, eventName, metadata)
  }
}
231  
/**
 * Input for logGrowthBookExperimentTo1P: one GrowthBook experiment
 * assignment to be recorded.
 */
export type GrowthBookExperimentData = {
  /** Emitted as the `experiment_id` attribute. */
  experimentId: string
  /** Emitted as the `variation_id` attribute. */
  variationId: number
  /**
   * When present, its `sessionId` is emitted as `session_id` and the whole
   * object is JSON-serialized into `user_attributes`.
   */
  userAttributes?: GrowthBookUserAttributes
  /** When present, JSON-serialized into `experiment_metadata`. */
  experimentMetadata?: Record<string, unknown>
}
241  
// Always reports the "production" GrowthBook environment: api.anthropic.com
// only serves that one (see starling/starling/cli/cli.py
// DEFAULT_ENVIRONMENTS). Staging and development environments are not
// exported to the prod API.
function getEnvironmentForGrowthBook(): string {
  const environment = 'production'
  return environment
}
248  
/**
 * Record a GrowthBook experiment assignment as a 1P event.
 *
 * Does nothing when 1P logging is disabled, no logger is initialized, or the
 * 'firstParty' sink is killed. Otherwise a 'growthbook_experiment' log record
 * is emitted synchronously on the module's logger. Identity fields (device,
 * account, organization) and the optional user-attribute / experiment-metadata
 * fields are included only when they have a value.
 *
 * @param data - GrowthBook experiment assignment data
 */
export function logGrowthBookExperimentTo1P(
  data: GrowthBookExperimentData,
): void {
  if (
    !is1PEventLoggingEnabled() ||
    !firstPartyEventLogger ||
    isSinkKilled('firstParty')
  ) {
    return
  }

  const { experimentId, variationId, userAttributes, experimentMetadata } =
    data
  const deviceId = getOrCreateUserID()
  const { accountUuid, organizationUuid } = getCoreUserData(true)

  // Attribute set of a GrowthbookExperimentEvent. Key order matches the
  // order in which the fields are listed here.
  const attributes = {
    event_type: 'GrowthbookExperimentEvent',
    event_id: randomUUID(),
    experiment_id: experimentId,
    variation_id: variationId,
    ...(deviceId ? { device_id: deviceId } : {}),
    ...(accountUuid ? { account_uuid: accountUuid } : {}),
    ...(organizationUuid ? { organization_uuid: organizationUuid } : {}),
    ...(userAttributes
      ? {
          session_id: userAttributes.sessionId,
          user_attributes: jsonStringify(userAttributes),
        }
      : {}),
    ...(experimentMetadata
      ? { experiment_metadata: jsonStringify(experimentMetadata) }
      : {}),
    environment: getEnvironmentForGrowthBook(),
  }

  if (process.env.USER_TYPE === 'ant') {
    logForDebugging(
      `[ANT-ONLY] 1P GrowthBook experiment: ${experimentId} variation=${variationId}`,
    )
  }

  firstPartyEventLogger.emit({ body: 'growthbook_experiment', attributes })
}
299  
const DEFAULT_LOGS_EXPORT_INTERVAL_MS = 10000
const DEFAULT_MAX_EXPORT_BATCH_SIZE = 200
const DEFAULT_MAX_QUEUE_SIZE = 8192

/**
 * Read the export interval (in ms) from OTEL_LOGS_EXPORT_INTERVAL.
 *
 * Falls back to DEFAULT_LOGS_EXPORT_INTERVAL_MS when the variable is unset,
 * empty, not parseable as a base-10 integer, or negative, so that NaN or a
 * negative delay never reaches the batch processor.
 */
function getEnvLogsExportIntervalMs(): number {
  const raw = process.env.OTEL_LOGS_EXPORT_INTERVAL
  if (!raw) {
    return DEFAULT_LOGS_EXPORT_INTERVAL_MS
  }
  const parsed = Number.parseInt(raw, 10)
  return Number.isNaN(parsed) || parsed < 0
    ? DEFAULT_LOGS_EXPORT_INTERVAL_MS
    : parsed
}

/**
 * Initialize 1P event logging infrastructure.
 *
 * Builds a dedicated LoggerProvider (batch processor + first-party exporter)
 * for internal event logging, independent of customer OTLP telemetry, and
 * stores the provider, its logger, and the batch config used in module state.
 * Returns without doing anything when 1P logging is disabled.
 *
 * The provider uses its own minimal resource: service name, service version,
 * and the WSL version when running on WSL.
 *
 * Batch settings come from the GrowthBook batch config; a missing or zero
 * value falls back to OTEL_LOGS_EXPORT_INTERVAL (delay only) and then to the
 * DEFAULT_* constants.
 */
export function initialize1PEventLogging(): void {
  profileCheckpoint('1p_event_logging_start')

  if (!is1PEventLoggingEnabled()) {
    if (process.env.USER_TYPE === 'ant') {
      logForDebugging('1P event logging not enabled')
    }
    return
  }

  // Batch processor settings come from the GrowthBook dynamic config (cached
  // value, possibly stale). Remember them so a later refresh can detect
  // changes.
  const batchConfig = getBatchConfig()
  lastBatchConfig = batchConfig
  profileCheckpoint('1p_event_after_growthbook_config')

  const scheduledDelayMillis =
    batchConfig.scheduledDelayMillis || getEnvLogsExportIntervalMs()
  const maxExportBatchSize =
    batchConfig.maxExportBatchSize || DEFAULT_MAX_EXPORT_BATCH_SIZE
  const maxQueueSize = batchConfig.maxQueueSize || DEFAULT_MAX_QUEUE_SIZE

  // Build our own resource for 1P event logging with minimal attributes.
  const attributes: Record<string, string> = {
    [ATTR_SERVICE_NAME]: 'claude-code',
    [ATTR_SERVICE_VERSION]: MACRO.VERSION,
  }

  // Add the WSL version when running on WSL and the version is known.
  if (getPlatform() === 'wsl') {
    const wslVersion = getWslVersion()
    if (wslVersion) {
      attributes['wsl.version'] = wslVersion
    }
  }

  const resource = resourceFromAttributes(attributes)

  // NOTE: This provider is kept separate from customer telemetry logs so
  // internal events don't leak to customer endpoints and vice versa. It is
  // never registered globally - it's only used for internal event logging.
  const eventLoggingExporter = new FirstPartyEventLoggingExporter({
    maxBatchSize: maxExportBatchSize,
    skipAuth: batchConfig.skipAuth,
    maxAttempts: batchConfig.maxAttempts,
    path: batchConfig.path,
    baseUrl: batchConfig.baseUrl,
    // Evaluated lazily so a killswitch flipped later is still honored.
    isKilled: () => isSinkKilled('firstParty'),
  })
  const provider = new LoggerProvider({
    resource,
    processors: [
      new BatchLogRecordProcessor(eventLoggingExporter, {
        scheduledDelayMillis,
        maxExportBatchSize,
        maxQueueSize,
      }),
    ],
  })
  firstPartyEventLoggerProvider = provider

  // IMPORTANT: The logger must come from our local provider, not from
  // logs.getLogger(): that returns a logger from the global provider, which
  // is separate and used for customer telemetry.
  firstPartyEventLogger = provider.getLogger(
    'com.anthropic.claude_code.events',
    MACRO.VERSION,
  )
}
390  
/**
 * Rebuild the 1P event logging pipeline if the batch config changed.
 * Register this with onGrowthBookRefresh so long-running sessions pick up
 * changes to batch size, delay, endpoint, etc.
 *
 * Does nothing when 1P logging is disabled, when no provider exists yet, or
 * when the freshly read batch config deep-equals the one the current
 * provider was built with.
 *
 * Event-loss safety:
 * 1. Null the logger first — concurrent logEventTo1P() calls hit the
 *    !firstPartyEventLogger guard and bail during the swap window. This drops
 *    a handful of events but prevents emitting to a draining provider.
 * 2. forceFlush() drains the old provider's buffered records to the exporter.
 *    Per the exporter's design (not visible in this file), export failures
 *    are persisted to disk under a path that does not change across reinit,
 *    so the new exporter's disk-backed retry picks them up.
 * 3. Swap to the new provider/logger; the old provider's shutdown runs in the
 *    background (its buffer is already drained, so this is just cleanup).
 *
 * Failure handling: if rebuilding throws, the previous provider, logger AND
 * batch config are restored. Restoring the config matters because
 * initialize1PEventLogging records the new config before it constructs the
 * provider; without the rollback the next refresh would see "no change" and
 * never retry.
 */
export async function reinitialize1PEventLoggingIfConfigChanged(): Promise<void> {
  if (!is1PEventLoggingEnabled() || !firstPartyEventLoggerProvider) {
    return
  }

  const newConfig = getBatchConfig()

  if (isEqual(newConfig, lastBatchConfig)) {
    return
  }

  if (process.env.USER_TYPE === 'ant') {
    logForDebugging(
      `1P event logging: ${BATCH_CONFIG_NAME} changed, reinitializing`,
    )
  }

  // Snapshot everything initialize1PEventLogging may overwrite so a failed
  // rebuild can be rolled back completely.
  const oldProvider = firstPartyEventLoggerProvider
  const oldLogger = firstPartyEventLogger
  const oldBatchConfig = lastBatchConfig
  firstPartyEventLogger = null

  try {
    await oldProvider.forceFlush()
  } catch {
    // Export failures are already on disk; new exporter will retry them.
  }

  firstPartyEventLoggerProvider = null
  try {
    initialize1PEventLogging()
  } catch (e) {
    // Restore so the next GrowthBook refresh can retry. oldProvider was
    // only forceFlush()'d, not shut down — it's still functional. Without
    // restoring the provider, the !firstPartyEventLoggerProvider gate at the
    // top makes recovery impossible; without restoring lastBatchConfig, the
    // isEqual() gate would treat the failed config as already applied.
    firstPartyEventLoggerProvider = oldProvider
    firstPartyEventLogger = oldLogger
    lastBatchConfig = oldBatchConfig
    logError(e)
    return
  }

  void oldProvider.shutdown().catch(() => {})
}