/ utils / telemetry / bigqueryExporter.ts
bigqueryExporter.ts
  1  import type { Attributes, HrTime } from '@opentelemetry/api'
  2  import { type ExportResult, ExportResultCode } from '@opentelemetry/core'
  3  import {
  4    AggregationTemporality,
  5    type MetricData,
  6    type DataPoint as OTelDataPoint,
  7    type PushMetricExporter,
  8    type ResourceMetrics,
  9  } from '@opentelemetry/sdk-metrics'
 10  import axios from 'axios'
 11  import { checkMetricsEnabled } from 'src/services/api/metricsOptOut.js'
 12  import { getIsNonInteractiveSession } from '../../bootstrap/state.js'
 13  import { getSubscriptionType, isClaudeAISubscriber } from '../auth.js'
 14  import { checkHasTrustDialogAccepted } from '../config.js'
 15  import { logForDebugging } from '../debug.js'
 16  import { errorMessage, toError } from '../errors.js'
 17  import { getAuthHeaders } from '../http.js'
 18  import { logError } from '../log.js'
 19  import { jsonStringify } from '../slowOperations.js'
 20  import { getClaudeCodeUserAgent } from '../userAgent.js'
 21  
 22  type DataPoint = {
 23    attributes: Record<string, string>
 24    value: number
 25    timestamp: string
 26  }
 27  
 28  type Metric = {
 29    name: string
 30    description?: string
 31    unit?: string
 32    data_points: DataPoint[]
 33  }
 34  
 35  type InternalMetricsPayload = {
 36    resource_attributes: Record<string, string>
 37    metrics: Metric[]
 38  }
 39  
 40  export class BigQueryMetricsExporter implements PushMetricExporter {
 41    private readonly endpoint: string
 42    private readonly timeout: number
 43    private pendingExports: Promise<void>[] = []
 44    private isShutdown = false
 45  
 46    constructor(options: { timeout?: number } = {}) {
 47      const defaultEndpoint = 'https://api.anthropic.com/api/claude_code/metrics'
 48  
 49      if (
 50        process.env.USER_TYPE === 'ant' &&
 51        process.env.ANT_CLAUDE_CODE_METRICS_ENDPOINT
 52      ) {
 53        this.endpoint =
 54          process.env.ANT_CLAUDE_CODE_METRICS_ENDPOINT +
 55          '/api/claude_code/metrics'
 56      } else {
 57        this.endpoint = defaultEndpoint
 58      }
 59  
 60      this.timeout = options.timeout || 5000
 61    }
 62  
 63    async export(
 64      metrics: ResourceMetrics,
 65      resultCallback: (result: ExportResult) => void,
 66    ): Promise<void> {
 67      if (this.isShutdown) {
 68        resultCallback({
 69          code: ExportResultCode.FAILED,
 70          error: new Error('Exporter has been shutdown'),
 71        })
 72        return
 73      }
 74  
 75      const exportPromise = this.doExport(metrics, resultCallback)
 76      this.pendingExports.push(exportPromise)
 77  
 78      // Clean up completed exports
 79      void exportPromise.finally(() => {
 80        const index = this.pendingExports.indexOf(exportPromise)
 81        if (index > -1) {
 82          void this.pendingExports.splice(index, 1)
 83        }
 84      })
 85    }
 86  
 87    private async doExport(
 88      metrics: ResourceMetrics,
 89      resultCallback: (result: ExportResult) => void,
 90    ): Promise<void> {
 91      try {
 92        // Skip if trust not established in interactive mode
 93        // This prevents triggering apiKeyHelper before trust dialog
 94        const hasTrust =
 95          checkHasTrustDialogAccepted() || getIsNonInteractiveSession()
 96        if (!hasTrust) {
 97          logForDebugging(
 98            'BigQuery metrics export: trust not established, skipping',
 99          )
100          resultCallback({ code: ExportResultCode.SUCCESS })
101          return
102        }
103  
104        // Check organization-level metrics opt-out
105        const metricsStatus = await checkMetricsEnabled()
106        if (!metricsStatus.enabled) {
107          logForDebugging('Metrics export disabled by organization setting')
108          resultCallback({ code: ExportResultCode.SUCCESS })
109          return
110        }
111  
112        const payload = this.transformMetricsForInternal(metrics)
113  
114        const authResult = getAuthHeaders()
115        if (authResult.error) {
116          logForDebugging(`Metrics export failed: ${authResult.error}`)
117          resultCallback({
118            code: ExportResultCode.FAILED,
119            error: new Error(authResult.error),
120          })
121          return
122        }
123  
124        const headers: Record<string, string> = {
125          'Content-Type': 'application/json',
126          'User-Agent': getClaudeCodeUserAgent(),
127          ...authResult.headers,
128        }
129  
130        const response = await axios.post(this.endpoint, payload, {
131          timeout: this.timeout,
132          headers,
133        })
134  
135        logForDebugging('BigQuery metrics exported successfully')
136        logForDebugging(
137          `BigQuery API Response: ${jsonStringify(response.data, null, 2)}`,
138        )
139        resultCallback({ code: ExportResultCode.SUCCESS })
140      } catch (error) {
141        logForDebugging(`BigQuery metrics export failed: ${errorMessage(error)}`)
142        logError(error)
143        resultCallback({
144          code: ExportResultCode.FAILED,
145          error: toError(error),
146        })
147      }
148    }
149  
150    private transformMetricsForInternal(
151      metrics: ResourceMetrics,
152    ): InternalMetricsPayload {
153      const attrs = metrics.resource.attributes
154  
155      const resourceAttributes: Record<string, string> = {
156        'service.name': (attrs['service.name'] as string) || 'claude-code',
157        'service.version': (attrs['service.version'] as string) || 'unknown',
158        'os.type': (attrs['os.type'] as string) || 'unknown',
159        'os.version': (attrs['os.version'] as string) || 'unknown',
160        'host.arch': (attrs['host.arch'] as string) || 'unknown',
161        'aggregation.temporality':
162          this.selectAggregationTemporality() === AggregationTemporality.DELTA
163            ? 'delta'
164            : 'cumulative',
165      }
166  
167      // Only add wsl.version if it exists (omit instead of default)
168      if (attrs['wsl.version']) {
169        resourceAttributes['wsl.version'] = attrs['wsl.version'] as string
170      }
171  
172      // Add customer type and subscription type
173      if (isClaudeAISubscriber()) {
174        resourceAttributes['user.customer_type'] = 'claude_ai'
175        const subscriptionType = getSubscriptionType()
176        if (subscriptionType) {
177          resourceAttributes['user.subscription_type'] = subscriptionType
178        }
179      } else {
180        resourceAttributes['user.customer_type'] = 'api'
181      }
182  
183      const transformed = {
184        resource_attributes: resourceAttributes,
185        metrics: metrics.scopeMetrics.flatMap(scopeMetric =>
186          scopeMetric.metrics.map(metric => ({
187            name: metric.descriptor.name,
188            description: metric.descriptor.description,
189            unit: metric.descriptor.unit,
190            data_points: this.extractDataPoints(metric),
191          })),
192        ),
193      }
194  
195      return transformed
196    }
197  
198    private extractDataPoints(metric: MetricData): DataPoint[] {
199      const dataPoints = metric.dataPoints || []
200  
201      return dataPoints
202        .filter(
203          (point): point is OTelDataPoint<number> =>
204            typeof point.value === 'number',
205        )
206        .map(point => ({
207          attributes: this.convertAttributes(point.attributes),
208          value: point.value,
209          timestamp: this.hrTimeToISOString(
210            point.endTime || point.startTime || [Date.now() / 1000, 0],
211          ),
212        }))
213    }
214  
215    async shutdown(): Promise<void> {
216      this.isShutdown = true
217      await this.forceFlush()
218      logForDebugging('BigQuery metrics exporter shutdown complete')
219    }
220  
221    async forceFlush(): Promise<void> {
222      await Promise.all(this.pendingExports)
223      logForDebugging('BigQuery metrics exporter flush complete')
224    }
225  
226    private convertAttributes(
227      attributes: Attributes | undefined,
228    ): Record<string, string> {
229      const result: Record<string, string> = {}
230      if (attributes) {
231        for (const [key, value] of Object.entries(attributes)) {
232          if (value !== undefined && value !== null) {
233            result[key] = String(value)
234          }
235        }
236      }
237      return result
238    }
239  
240    private hrTimeToISOString(hrTime: HrTime): string {
241      const [seconds, nanoseconds] = hrTime
242      const date = new Date(seconds * 1000 + nanoseconds / 1000000)
243      return date.toISOString()
244    }
245  
246    selectAggregationTemporality(): AggregationTemporality {
247      // DO NOT CHANGE THIS TO CUMULATIVE
248      // It would mess up the aggregation of metrics
249      // for CC Productivity metrics dashboard
250      return AggregationTemporality.DELTA
251    }
252  }