// services/voiceStreamSTT.ts
//
// Anthropic voice_stream speech-to-text client for push-to-talk.
//
// Only reachable in ant builds (gated by feature('VOICE_MODE') in useVoice.ts import).
//
// Connects to Anthropic's voice_stream WebSocket endpoint using the same
// OAuth credentials as Claude Code.  The endpoint uses conversation_engine
// backed models for speech-to-text.  Designed for hold-to-talk: hold the
// keybinding to record, release to stop and submit.
//
// The wire protocol uses JSON control messages (KeepAlive, CloseStream) and
// binary audio frames.  The server responds with TranscriptText,
// TranscriptEndpoint and TranscriptError JSON messages.
 13  
 14  import type { ClientRequest, IncomingMessage } from 'http'
 15  import WebSocket from 'ws'
 16  import { getOauthConfig } from '../constants/oauth.js'
 17  import {
 18    checkAndRefreshOAuthTokenIfNeeded,
 19    getClaudeAIOAuthTokens,
 20    isAnthropicAuthEnabled,
 21  } from '../utils/auth.js'
 22  import { logForDebugging } from '../utils/debug.js'
 23  import { getUserAgent } from '../utils/http.js'
 24  import { logError } from '../utils/log.js'
 25  import { getWebSocketTLSOptions } from '../utils/mtls.js'
 26  import { getWebSocketProxyAgent, getWebSocketProxyUrl } from '../utils/proxy.js'
 27  import { jsonParse, jsonStringify } from '../utils/slowOperations.js'
 28  
 29  const KEEPALIVE_MSG = '{"type":"KeepAlive"}'
 30  const CLOSE_STREAM_MSG = '{"type":"CloseStream"}'
 31  
 32  import { getFeatureValue_CACHED_MAY_BE_STALE } from './analytics/growthbook.js'
 33  
 34  // ─── Constants ───────────────────────────────────────────────────────
 35  
 36  const VOICE_STREAM_PATH = '/api/ws/speech_to_text/voice_stream'
 37  
 38  const KEEPALIVE_INTERVAL_MS = 8_000
 39  
 40  // finalize() resolution timers. `noData` fires when no TranscriptText
 41  // arrives post-CloseStream — the server has nothing; don't wait out the
 42  // full ~3-5s WS teardown to confirm emptiness. `safety` is the last-
 43  // resort cap if the WS hangs. Exported so tests can shorten them.
 44  export const FINALIZE_TIMEOUTS_MS = {
 45    safety: 5_000,
 46    noData: 1_500,
 47  }
 48  
 49  // ─── Types ──────────────────────────────────────────────────────────
 50  
 51  export type VoiceStreamCallbacks = {
 52    onTranscript: (text: string, isFinal: boolean) => void
 53    onError: (error: string, opts?: { fatal?: boolean }) => void
 54    onClose: () => void
 55    onReady: (connection: VoiceStreamConnection) => void
 56  }
 57  
 58  // How finalize() resolved. `no_data_timeout` means zero server messages
 59  // after CloseStream — the silent-drop signature (anthropics/anthropic#287008).
 60  export type FinalizeSource =
 61    | 'post_closestream_endpoint'
 62    | 'no_data_timeout'
 63    | 'safety_timeout'
 64    | 'ws_close'
 65    | 'ws_already_closed'
 66  
 67  export type VoiceStreamConnection = {
 68    send: (audioChunk: Buffer) => void
 69    finalize: () => Promise<FinalizeSource>
 70    close: () => void
 71    isConnected: () => boolean
 72  }
 73  
 74  // The voice_stream endpoint returns transcript chunks and endpoint markers.
 75  type VoiceStreamTranscriptText = {
 76    type: 'TranscriptText'
 77    data: string
 78  }
 79  
 80  type VoiceStreamTranscriptEndpoint = {
 81    type: 'TranscriptEndpoint'
 82  }
 83  
 84  type VoiceStreamTranscriptError = {
 85    type: 'TranscriptError'
 86    error_code?: string
 87    description?: string
 88  }
 89  
 90  type VoiceStreamMessage =
 91    | VoiceStreamTranscriptText
 92    | VoiceStreamTranscriptEndpoint
 93    | VoiceStreamTranscriptError
 94    | { type: 'error'; message?: string }
 95  
 96  // ─── Availability ──────────────────────────────────────────────────────
 97  
 98  export function isVoiceStreamAvailable(): boolean {
 99    // voice_stream uses the same OAuth as Claude Code — available when the
100    // user is authenticated with Anthropic (Claude.ai subscriber or has
101    // valid OAuth tokens).
102    if (!isAnthropicAuthEnabled()) {
103      return false
104    }
105    const tokens = getClaudeAIOAuthTokens()
106    return tokens !== null && tokens.accessToken !== null
107  }
108  
// ─── Connection ────────────────────────────────────────────────────────

/**
 * Opens a voice_stream WebSocket for one push-to-talk session.
 *
 * Refreshes the OAuth token if needed, builds the endpoint URL (audio
 * format, language, optional Nova 3 routing and keyterms), creates the
 * socket and wires its events to `callbacks`. The connection handle is
 * returned as soon as the socket object exists — possibly before the
 * WebSocket has opened; `callbacks.onReady` fires once it is actually open.
 *
 * @param callbacks Receivers for transcript, error, close and ready events.
 * @param options Optional `language` (defaults to 'en') and `keyterms`,
 *   each forwarded as a repeated `keyterms` query parameter.
 * @returns The connection handle, or null when no OAuth access token is
 *   available.
 */
export async function connectVoiceStream(
  callbacks: VoiceStreamCallbacks,
  options?: { language?: string; keyterms?: string[] },
): Promise<VoiceStreamConnection | null> {
  // Ensure OAuth token is fresh before connecting
  await checkAndRefreshOAuthTokenIfNeeded()

  const tokens = getClaudeAIOAuthTokens()
  if (!tokens?.accessToken) {
    logForDebugging('[voice_stream] No OAuth token available')
    return null
  }

  // voice_stream is a private_api route, but /api/ws/ is also exposed on
  // the api.anthropic.com listener (service_definitions.yaml private-api:
  // visibility.external: true). We target that host instead of claude.ai
  // because the claude.ai CF zone uses TLS fingerprinting and challenges
  // non-browser clients (anthropics/claude-code#34094). Same private-api
  // pod, same OAuth Bearer auth — just a CF zone that doesn't block us.
  // Desktop dictation still uses claude.ai (Swift URLSession has a
  // browser-class JA3 fingerprint, so CF lets it through).
  //
  // `||` (not `??`) is deliberate: an empty override falls back to the
  // configured API host.
  const wsBaseUrl =
    process.env.VOICE_STREAM_BASE_URL ||
    getOauthConfig()
      .BASE_API_URL.replace('https://', 'wss://')
      .replace('http://', 'ws://')

  if (process.env.VOICE_STREAM_BASE_URL) {
    logForDebugging(
      `[voice_stream] Using VOICE_STREAM_BASE_URL override: ${process.env.VOICE_STREAM_BASE_URL}`,
    )
  }

  const params = new URLSearchParams({
    encoding: 'linear16',
    sample_rate: '16000',
    channels: '1',
    endpointing_ms: '300',
    utterance_end_ms: '1000',
    language: options?.language ?? 'en',
  })

  // Route through conversation-engine with Deepgram Nova 3 (bypassing
  // the server's project_bell_v2_config GrowthBook gate). The server
  // side is anthropics/anthropic#278327 + #281372; this lets us ramp
  // clients independently.
  const isNova3 = getFeatureValue_CACHED_MAY_BE_STALE(
    'tengu_cobalt_frost',
    false,
  )
  if (isNova3) {
    params.set('use_conversation_engine', 'true')
    params.set('stt_provider', 'deepgram-nova3')
    logForDebugging('[voice_stream] Nova 3 gate enabled (tengu_cobalt_frost)')
  }

  // Append keyterms as query params — the voice_stream proxy forwards
  // these to the STT service which applies appropriate boosting.
  if (options?.keyterms?.length) {
    for (const term of options.keyterms) {
      params.append('keyterms', term)
    }
  }

  const url = `${wsBaseUrl}${VOICE_STREAM_PATH}?${params.toString()}`

  logForDebugging(`[voice_stream] Connecting to ${url}`)

  const headers: Record<string, string> = {
    Authorization: `Bearer ${tokens.accessToken}`,
    'User-Agent': getUserAgent(),
    'x-app': 'cli',
  }

  // Bun's WebSocket takes `proxy`/`tls` options; the Node `ws` package
  // takes an `agent` plus TLS options spread at the top level.
  const tlsOptions = getWebSocketTLSOptions()
  const wsOptions =
    typeof Bun !== 'undefined'
      ? {
          headers,
          proxy: getWebSocketProxyUrl(url),
          tls: tlsOptions || undefined,
        }
      : { headers, agent: getWebSocketProxyAgent(url), ...tlsOptions }

  const ws = new WebSocket(url, wsOptions)

  // ── Session state (shared by the connection methods and ws handlers) ──

  let keepaliveTimer: ReturnType<typeof setInterval> | null = null
  let connected = false
  // Set to true once CloseStream has been sent (or close() was called).
  // After this, further audio sends are dropped.
  let finalized = false
  // Set to true when finalize() is first called, to prevent double-fire.
  // Never cleared, so it also gates error suppression after resolution.
  let finalizing = false
  // Set by close(). close() can only close a socket that is already OPEN,
  // and the connection is returned to the caller while the socket may
  // still be CONNECTING; the 'open' handler checks this flag so a socket
  // that opens after close() is shut down instead of being revived.
  let closeRequested = false
  // Set when the HTTP upgrade was rejected (unexpected-response). The
  // close event that follows (1006 from our req.destroy()) is just
  // mechanical teardown; the upgrade handler already reported the error.
  let upgradeRejected = false
  // Resolves finalize(). Four triggers: TranscriptEndpoint post-CloseStream
  // (~300ms); no-data timer (1.5s); WS close (~3-5s); safety timer (5s).
  let resolveFinalize: ((source: FinalizeSource) => void) | null = null
  let cancelNoDataTimer: (() => void) | null = null
  // Track the last TranscriptText so that when TranscriptEndpoint arrives
  // we can emit it as the final transcript.  The server sometimes sends
  // multiple non-cumulative TranscriptText messages without endpoints
  // between them; the TranscriptText handler auto-finalizes previous
  // segments when it detects the text has changed non-cumulatively.
  let lastTranscriptText = ''

  // Define the connection object before event handlers so it can be passed
  // to onReady when the WebSocket opens.
  const connection: VoiceStreamConnection = {
    send(audioChunk: Buffer): void {
      if (ws.readyState !== WebSocket.OPEN) {
        return
      }
      if (finalized) {
        // After CloseStream has been sent, the server rejects further audio.
        // Drop the chunk to avoid a protocol error.
        logForDebugging(
          `[voice_stream] Dropping audio chunk after CloseStream: ${String(audioChunk.length)} bytes`,
        )
        return
      }
      logForDebugging(
        `[voice_stream] Sending audio chunk: ${String(audioChunk.length)} bytes`,
      )
      // Copy the buffer before sending: NAPI Buffer objects from native
      // modules may share a pooled ArrayBuffer.  Creating a view with
      // `new Uint8Array(buf.buffer, offset, len)` can reference stale or
      // overlapping memory by the time the ws library reads it.
      // `Buffer.from()` makes an owned copy that the ws library can safely
      // consume as a binary WebSocket frame.
      ws.send(Buffer.from(audioChunk))
    },
    finalize(): Promise<FinalizeSource> {
      if (finalizing || finalized) {
        // Already finalized or WebSocket already closed — resolve immediately.
        return Promise.resolve('ws_already_closed')
      }
      finalizing = true

      return new Promise<FinalizeSource>(resolve => {
        const safetyTimer = setTimeout(
          () => resolveFinalize?.('safety_timeout'),
          FINALIZE_TIMEOUTS_MS.safety,
        )
        const noDataTimer = setTimeout(
          () => resolveFinalize?.('no_data_timeout'),
          FINALIZE_TIMEOUTS_MS.noData,
        )
        cancelNoDataTimer = () => {
          clearTimeout(noDataTimer)
          cancelNoDataTimer = null
        }

        // Single resolution point: whichever trigger fires first clears
        // both timers and nulls itself so later triggers become no-ops.
        resolveFinalize = (source: FinalizeSource) => {
          clearTimeout(safetyTimer)
          clearTimeout(noDataTimer)
          resolveFinalize = null
          cancelNoDataTimer = null
          // Legacy Deepgram can leave an interim in lastTranscriptText
          // with no TranscriptEndpoint (websocket_manager.py sends
          // TranscriptChunk and TranscriptEndpoint as independent
          // channel items). All resolve triggers must promote it;
          // centralize here. No-op when the close handler already did.
          if (lastTranscriptText) {
            logForDebugging(
              `[voice_stream] Promoting unreported interim before ${source} resolve`,
            )
            const t = lastTranscriptText
            lastTranscriptText = ''
            callbacks.onTranscript(t, true)
          }
          logForDebugging(`[voice_stream] Finalize resolved via ${source}`)
          resolve(source)
        }

        // If the WebSocket is already closed, resolve immediately.
        if (
          ws.readyState === WebSocket.CLOSED ||
          ws.readyState === WebSocket.CLOSING
        ) {
          resolveFinalize('ws_already_closed')
          return
        }

        // Defer CloseStream to the next event-loop iteration so any audio
        // callbacks already queued by the native recording module are flushed
        // to the WebSocket before the server is told to stop accepting audio.
        // Without this, stopRecording() can return synchronously while the
        // native module still has a pending onData callback in the event queue,
        // causing audio to arrive after CloseStream.
        setTimeout(() => {
          finalized = true
          if (ws.readyState === WebSocket.OPEN) {
            logForDebugging('[voice_stream] Sending CloseStream (finalize)')
            ws.send(CLOSE_STREAM_MSG)
          }
        }, 0)
      })
    },
    close(): void {
      closeRequested = true
      finalized = true
      if (keepaliveTimer) {
        clearInterval(keepaliveTimer)
        keepaliveTimer = null
      }
      connected = false
      // Only an OPEN socket is closed here. If the handshake is still in
      // flight, the 'open' handler sees closeRequested and closes it then.
      if (ws.readyState === WebSocket.OPEN) {
        ws.close()
      }
    },
    isConnected(): boolean {
      return connected && ws.readyState === WebSocket.OPEN
    },
  }

  ws.on('open', () => {
    if (closeRequested) {
      // close() ran while the handshake was in flight. Without this guard
      // the socket would come alive after the caller gave up on it:
      // `connected` flips back to true, onReady fires for a dead session
      // and the keepalive interval below is never cleared.
      logForDebugging(
        '[voice_stream] WebSocket opened after close() was requested; closing',
      )
      ws.close()
      return
    }

    logForDebugging('[voice_stream] WebSocket connected')
    connected = true

    // Send an immediate KeepAlive so the server knows the client is active.
    // Audio hardware initialisation can take >1s, so this prevents the
    // server from closing the connection before audio capture starts.
    logForDebugging('[voice_stream] Sending initial KeepAlive')
    ws.send(KEEPALIVE_MSG)

    // Send periodic keepalive to prevent idle timeout
    keepaliveTimer = setInterval(
      ws => {
        if (ws.readyState === WebSocket.OPEN) {
          logForDebugging('[voice_stream] Sending periodic KeepAlive')
          ws.send(KEEPALIVE_MSG)
        }
      },
      KEEPALIVE_INTERVAL_MS,
      ws,
    )

    // Pass the connection to the caller so it can start sending audio.
    // This fires only after the WebSocket is truly open, guaranteeing
    // that send() calls will not be silently dropped.
    callbacks.onReady(connection)
  })

  ws.on('message', (raw: Buffer | string) => {
    const text = raw.toString()
    logForDebugging(
      `[voice_stream] Message received (${String(text.length)} chars): ${text.slice(0, 200)}`,
    )
    let parsed: unknown
    try {
      parsed = jsonParse(text)
    } catch {
      // Not JSON (e.g. a binary frame) — nothing to dispatch.
      return
    }
    // Valid JSON that is not an object (`null`, a number, a string) has no
    // `type` to dispatch on; reading `.type` off `null` would throw inside
    // this listener. Treat it like a malformed frame.
    if (typeof parsed !== 'object' || parsed === null) {
      return
    }
    const msg = parsed as VoiceStreamMessage

    switch (msg.type) {
      case 'TranscriptText': {
        const transcript = msg.data
        logForDebugging(`[voice_stream] TranscriptText: "${transcript ?? ''}"`)
        // Data arrived after CloseStream — disarm the no-data timer so
        // a slow-but-real flush isn't cut off. Only disarm once finalized
        // (CloseStream sent); pre-CloseStream data racing the deferred
        // send would cancel the timer prematurely, falling back to the
        // slower 5s safety timeout instead of the 1.5s no-data timer.
        if (finalized) {
          cancelNoDataTimer?.()
        }
        if (transcript) {
          // Detect when the server has moved to a new speech segment.
          // Progressive refinements extend or shorten the previous text
          // (e.g., "hello" → "hello world", or "hello wor" → "hello wo").
          // A new segment starts with completely different text (neither
          // is a prefix of the other). When detected, emit the previous
          // text as final so the caller can accumulate it, preventing
          // the new segment from overwriting and losing the old one.
          //
          // Nova 3's interims are cumulative across segments AND can
          // revise earlier text ("Hello?" → "Hello."). Revision breaks
          // the prefix check, causing false auto-finalize → the same
          // text committed once AND re-appearing in the cumulative
          // interim = duplication. Nova 3 only endpoints on the final
          // flush, so auto-finalize is never correct for it.
          if (!isNova3 && lastTranscriptText) {
            const prev = lastTranscriptText.trimStart()
            const next = transcript.trimStart()
            if (
              prev &&
              next &&
              !next.startsWith(prev) &&
              !prev.startsWith(next)
            ) {
              logForDebugging(
                `[voice_stream] Auto-finalizing previous segment (new segment detected): "${lastTranscriptText}"`,
              )
              callbacks.onTranscript(lastTranscriptText, true)
            }
          }
          lastTranscriptText = transcript
          // Emit as interim so the caller can show a live preview.
          callbacks.onTranscript(transcript, false)
        }
        break
      }
      case 'TranscriptEndpoint': {
        logForDebugging(
          `[voice_stream] TranscriptEndpoint received, lastTranscriptText="${lastTranscriptText}"`,
        )
        // The server signals the end of an utterance.  Emit the last
        // TranscriptText as a final transcript so the caller can commit it.
        const finalText = lastTranscriptText
        lastTranscriptText = ''
        if (finalText) {
          callbacks.onTranscript(finalText, true)
        }
        // When TranscriptEndpoint arrives after CloseStream was sent,
        // the server has flushed its final transcript — nothing more is
        // coming.  Resolve finalize now so the caller reads the
        // accumulated buffer immediately (~300ms) instead of waiting
        // for the WebSocket close event (~3-5s of server teardown).
        // `finalized` (not `finalizing`) is the right gate: it flips
        // inside the setTimeout(0) that actually sends CloseStream, so
        // a TranscriptEndpoint that races the deferred send still waits.
        if (finalized) {
          resolveFinalize?.('post_closestream_endpoint')
        }
        break
      }
      case 'TranscriptError': {
        const desc =
          msg.description ?? msg.error_code ?? 'unknown transcription error'
        logForDebugging(`[voice_stream] TranscriptError: ${desc}`)
        // Errors are only logged once finalize() has started.
        if (!finalizing) {
          callbacks.onError(desc)
        }
        break
      }
      case 'error': {
        const errorDetail = msg.message ?? jsonStringify(msg)
        logForDebugging(`[voice_stream] Server error: ${errorDetail}`)
        if (!finalizing) {
          callbacks.onError(errorDetail)
        }
        break
      }
      default:
        // Unknown message types are ignored.
        break
    }
  })

  ws.on('close', (code, reason) => {
    const reasonStr = reason?.toString() ?? ''
    logForDebugging(
      `[voice_stream] WebSocket closed: code=${String(code)} reason="${reasonStr}"`,
    )
    connected = false
    if (keepaliveTimer) {
      clearInterval(keepaliveTimer)
      keepaliveTimer = null
    }
    // If the server closed the connection before sending TranscriptEndpoint,
    // promote the last interim transcript to final so no text is lost.
    if (lastTranscriptText) {
      logForDebugging(
        '[voice_stream] Promoting unreported interim transcript to final on close',
      )
      const finalText = lastTranscriptText
      lastTranscriptText = ''
      callbacks.onTranscript(finalText, true)
    }
    // A pending finalize() resolves here if nothing faster already did.
    resolveFinalize?.('ws_close')
    // During finalize, suppress onError — the session already delivered
    // whatever it had. useVoice's onError path wipes accumulatedRef,
    // which would destroy the transcript before the finalize .then()
    // reads it. `finalizing` (not resolveFinalize) is the gate: set once
    // at finalize() entry, never cleared, so it stays accurate after the
    // fast path or a timer already resolved. Codes 1000 and 1005 are
    // excluded from error reporting as well.
    if (!finalizing && !upgradeRejected && code !== 1000 && code !== 1005) {
      callbacks.onError(
        `Connection closed: code ${String(code)}${reasonStr ? ` — ${reasonStr}` : ''}`,
      )
    }
    callbacks.onClose()
  })

  // The ws library fires 'unexpected-response' when the HTTP upgrade
  // returns a non-101 status. Listening lets us surface the actual status
  // and flag 4xx as fatal (same token/TLS fingerprint won't change on
  // retry). With a listener registered, ws does NOT abort on our behalf —
  // we destroy the request; 'error' does not fire, 'close' does (suppressed
  // via upgradeRejected above).
  //
  // Bun's ws shim historically didn't implement this event (a warning
  // is logged once at registration). Under Bun a non-101 upgrade falls
  // through to the generic 'error' + 'close' 1002 path with no recoverable
  // status; the attemptGenRef guard in useVoice.ts still surfaces the
  // retry-attempt failure, the user just sees "Expected 101 status code"
  // instead of "HTTP 503". No harm — the gen fix is the load-bearing part.
  ws.on('unexpected-response', (req: ClientRequest, res: IncomingMessage) => {
    const status = res.statusCode ?? 0
    // Bun's ws implementation on Windows can fire this event for a
    // successful 101 Switching Protocols response (anthropics/claude-code#40510).
    // 101 is never a rejection — bail before we destroy a working upgrade.
    if (status === 101) {
      logForDebugging(
        '[voice_stream] unexpected-response fired with 101; ignoring',
      )
      return
    }
    logForDebugging(
      `[voice_stream] Upgrade rejected: status=${String(status)} cf-mitigated=${String(res.headers['cf-mitigated'])} cf-ray=${String(res.headers['cf-ray'])}`,
    )
    upgradeRejected = true
    // Drain the response body and tear down the request ourselves.
    res.resume()
    req.destroy()
    if (finalizing) return
    callbacks.onError(
      `WebSocket upgrade rejected with HTTP ${String(status)}`,
      { fatal: status >= 400 && status < 500 },
    )
  })

  ws.on('error', (err: Error) => {
    logError(err)
    logForDebugging(`[voice_stream] WebSocket error: ${err.message}`)
    if (!finalizing) {
      callbacks.onError(`Voice stream connection error: ${err.message}`)
    }
  })

  return connection
}