/ bridge / bridgeMessaging.ts
bridgeMessaging.ts
  1  /**
  2   * Shared transport-layer helpers for bridge message handling.
  3   *
  4   * Extracted from replBridge.ts so both the env-based core (initBridgeCore)
  5   * and the env-less core (initEnvLessBridgeCore) can use the same ingress
  6   * parsing, control-request handling, and echo-dedup machinery.
  7   *
  8   * Everything here is pure — no closure over bridge-specific state. All
  9   * collaborators (transport, sessionId, UUID sets, callbacks) are passed
 10   * as params.
 11   */
 12  
 13  import { randomUUID } from 'crypto'
 14  import type { SDKMessage } from '../entrypoints/agentSdkTypes.js'
 15  import type {
 16    SDKControlRequest,
 17    SDKControlResponse,
 18  } from '../entrypoints/sdk/controlTypes.js'
 19  import type { SDKResultSuccess } from '../entrypoints/sdk/coreTypes.js'
 20  import { logEvent } from '../services/analytics/index.js'
 21  import { EMPTY_USAGE } from '../services/api/emptyUsage.js'
 22  import type { Message } from '../types/message.js'
 23  import { normalizeControlMessageKeys } from '../utils/controlMessageCompat.js'
 24  import { logForDebugging } from '../utils/debug.js'
 25  import { stripDisplayTagsAllowEmpty } from '../utils/displayTags.js'
 26  import { errorMessage } from '../utils/errors.js'
 27  import type { PermissionMode } from '../utils/permissions/PermissionMode.js'
 28  import { jsonParse } from '../utils/slowOperations.js'
 29  import type { ReplBridgeTransport } from './replBridgeTransport.js'
 30  
 31  // ─── Type guards ─────────────────────────────────────────────────────────────
 32  
 33  /** Type predicate for parsed WebSocket messages. SDKMessage is a
 34   *  discriminated union on `type` — validating the discriminant is
 35   *  sufficient for the predicate; callers narrow further via the union. */
 36  export function isSDKMessage(value: unknown): value is SDKMessage {
 37    return (
 38      value !== null &&
 39      typeof value === 'object' &&
 40      'type' in value &&
 41      typeof value.type === 'string'
 42    )
 43  }
 44  
 45  /** Type predicate for control_response messages from the server. */
 46  export function isSDKControlResponse(
 47    value: unknown,
 48  ): value is SDKControlResponse {
 49    return (
 50      value !== null &&
 51      typeof value === 'object' &&
 52      'type' in value &&
 53      value.type === 'control_response' &&
 54      'response' in value
 55    )
 56  }
 57  
 58  /** Type predicate for control_request messages from the server. */
 59  export function isSDKControlRequest(
 60    value: unknown,
 61  ): value is SDKControlRequest {
 62    return (
 63      value !== null &&
 64      typeof value === 'object' &&
 65      'type' in value &&
 66      value.type === 'control_request' &&
 67      'request_id' in value &&
 68      'request' in value
 69    )
 70  }
 71  
 72  /**
 73   * True for message types that should be forwarded to the bridge transport.
 74   * The server only wants user/assistant turns and slash-command system events;
 75   * everything else (tool_result, progress, etc.) is internal REPL chatter.
 76   */
 77  export function isEligibleBridgeMessage(m: Message): boolean {
 78    // Virtual messages (REPL inner calls) are display-only — bridge/SDK
 79    // consumers see the REPL tool_use/result which summarizes the work.
 80    if ((m.type === 'user' || m.type === 'assistant') && m.isVirtual) {
 81      return false
 82    }
 83    return (
 84      m.type === 'user' ||
 85      m.type === 'assistant' ||
 86      (m.type === 'system' && m.subtype === 'local_command')
 87    )
 88  }
 89  
 90  /**
 91   * Extract title-worthy text from a Message for onUserMessage. Returns
 92   * undefined for messages that shouldn't title the session: non-user, meta
 93   * (nudges), tool results, compact summaries, non-human origins (task
 94   * notifications, channel messages), or pure display-tag content
 95   * (<ide_opened_file>, <session-start-hook>, etc.).
 96   *
 97   * Synthetic interrupts ([Request interrupted by user]) are NOT filtered here —
 98   * isSyntheticMessage lives in messages.ts (heavy import, pulls command
 99   * registry). The initialMessages path in initReplBridge checks it; the
100   * writeMessages path reaching an interrupt as the *first* message is
101   * implausible (an interrupt implies a prior prompt already flowed through).
102   */
103  export function extractTitleText(m: Message): string | undefined {
104    if (m.type !== 'user' || m.isMeta || m.toolUseResult || m.isCompactSummary)
105      return undefined
106    if (m.origin && m.origin.kind !== 'human') return undefined
107    const content = m.message.content
108    let raw: string | undefined
109    if (typeof content === 'string') {
110      raw = content
111    } else {
112      for (const block of content) {
113        if (block.type === 'text') {
114          raw = block.text
115          break
116        }
117      }
118    }
119    if (!raw) return undefined
120    const clean = stripDisplayTagsAllowEmpty(raw)
121    return clean || undefined
122  }
123  
124  // ─── Ingress routing ─────────────────────────────────────────────────────────
125  
/**
 * Parse an ingress WebSocket message and route it to the appropriate handler.
 *
 * Routing order:
 *  1. control_response → onPermissionResponse
 *  2. control_request  → onControlRequest
 *  3. SDK message of type `user` → onInboundMessage (fire-and-forget)
 *  4. anything else is logged and dropped
 *
 * Messages whose UUID is in recentPostedUUIDs (echoes of what we sent) or in
 * recentInboundUUIDs (re-deliveries we've already forwarded — e.g. server
 * replayed history after a transport swap lost the seq-num cursor) are
 * ignored. Forwarded user messages have their UUID recorded in
 * recentInboundUUIDs.
 *
 * This function never throws and never leaves a dangling rejection: parse
 * failures, synchronous callback failures, and asynchronous onInboundMessage
 * failures are each logged via logForDebugging with a distinct message.
 *
 * @param data - Raw JSON text of the ingress frame.
 * @param recentPostedUUIDs - UUIDs of messages this side recently sent.
 * @param recentInboundUUIDs - UUIDs of inbound user messages already forwarded.
 * @param onInboundMessage - Receives inbound user messages; may be async.
 * @param onPermissionResponse - Receives control_response frames.
 * @param onControlRequest - Receives control_request frames.
 */
export function handleIngressMessage(
  data: string,
  recentPostedUUIDs: BoundedUUIDSet,
  recentInboundUUIDs: BoundedUUIDSet,
  onInboundMessage: ((msg: SDKMessage) => void | Promise<void>) | undefined,
  onPermissionResponse?: ((response: SDKControlResponse) => void) | undefined,
  onControlRequest?: ((request: SDKControlRequest) => void) | undefined,
): void {
  // Parsing gets its own try/catch so that only genuine parse errors are
  // reported as such; callback failures are reported separately below.
  let parsed: unknown
  try {
    parsed = normalizeControlMessageKeys(jsonParse(data))
  } catch (err) {
    logForDebugging(
      `[bridge:repl] Failed to parse ingress message: ${errorMessage(err)}`,
    )
    return
  }

  try {
    // control_response is not an SDKMessage — check before the type guard
    if (isSDKControlResponse(parsed)) {
      logForDebugging('[bridge:repl] Ingress message type=control_response')
      onPermissionResponse?.(parsed)
      return
    }

    // control_request from the server (initialize, set_model, can_use_tool).
    // Must respond promptly or the server kills the WS (~10-14s timeout).
    if (isSDKControlRequest(parsed)) {
      logForDebugging(
        `[bridge:repl] Inbound control_request subtype=${parsed.request.subtype}`,
      )
      onControlRequest?.(parsed)
      return
    }

    if (!isSDKMessage(parsed)) return

    // Check for UUID to detect echoes of our own messages
    const uuid =
      'uuid' in parsed && typeof parsed.uuid === 'string'
        ? parsed.uuid
        : undefined

    if (uuid && recentPostedUUIDs.has(uuid)) {
      logForDebugging(
        `[bridge:repl] Ignoring echo: type=${parsed.type} uuid=${uuid}`,
      )
      return
    }

    // Defensive dedup: drop inbound prompts we've already forwarded. The
    // SSE seq-num carryover (lastTransportSequenceNum) is the primary fix
    // for history-replay; this catches edge cases where that negotiation
    // fails (server ignores from_sequence_num, transport died before
    // receiving any frames, etc).
    if (uuid && recentInboundUUIDs.has(uuid)) {
      logForDebugging(
        `[bridge:repl] Ignoring re-delivered inbound: type=${parsed.type} uuid=${uuid}`,
      )
      return
    }

    logForDebugging(
      `[bridge:repl] Ingress message type=${parsed.type}${uuid ? ` uuid=${uuid}` : ''}`,
    )

    if (parsed.type !== 'user') {
      logForDebugging(
        `[bridge:repl] Ignoring non-user inbound message: type=${parsed.type}`,
      )
      return
    }

    if (uuid) recentInboundUUIDs.add(uuid)
    logEvent('tengu_bridge_message_received', {
      is_repl: true,
    })
    // Fire-and-forget — handler may be async (attachment resolution). The
    // handler is still invoked synchronously; Promise.resolve only gives us
    // a place to attach a rejection handler so an async failure is logged
    // instead of surfacing as an unhandled rejection.
    void Promise.resolve(onInboundMessage?.(parsed)).catch((err: unknown) => {
      logForDebugging(
        `[bridge:repl] Inbound message handler rejected: ${errorMessage(err)}`,
      )
    })
  } catch (err) {
    // A callback (or logging/analytics) threw synchronously. This is not a
    // parse failure, so say so.
    logForDebugging(
      `[bridge:repl] Ingress message handler threw: ${errorMessage(err)}`,
    )
  }
}
209  
210  // ─── Server-initiated control requests ───────────────────────────────────────
211  
/**
 * Outcome reported by the permission-mode callback: either the mode was
 * accepted, or it was refused with a human-readable reason.
 */
type SetPermissionModeVerdict = { ok: true } | { ok: false; error: string }

/**
 * Collaborators handed to handleServerControlRequest. All callbacks are
 * optional.
 */
export type ServerControlRequestHandlers = {
  /**
   * Transport the control_response is written to. When null, no reply is
   * sent — handleServerControlRequest only logs and returns.
   */
  transport: ReplBridgeTransport | null
  /** Session id attached as `session_id` to every outgoing control_response. */
  sessionId: string
  /**
   * When true, every mutating request (interrupt, set_model,
   * set_permission_mode, set_max_thinking_tokens) is answered with an error
   * instead of a false success. initialize still replies success — the
   * server kills the connection otherwise. Used by the outbound-only bridge
   * mode and the SDK's /bridge subpath so claude.ai sees a proper error
   * instead of "action succeeded but nothing happened locally".
   */
  outboundOnly?: boolean
  /** Invoked for `interrupt` requests. */
  onInterrupt?: () => void
  /** Invoked for `set_model` requests with the requested model. */
  onSetModel?: (model: string | undefined) => void
  /** Invoked for `set_max_thinking_tokens` requests. */
  onSetMaxThinkingTokens?: (maxTokens: number | null) => void
  /**
   * Invoked for `set_permission_mode` requests. The returned verdict decides
   * whether a success or an error control_response is sent.
   */
  onSetPermissionMode?: (mode: PermissionMode) => SetPermissionModeVerdict
}
230  
/** Error text returned for mutating requests when the bridge is outbound-only. */
const OUTBOUND_ONLY_ERROR =
  'This session is outbound-only. Enable Remote Control locally to allow inbound control.'

/** Build a payload-less success control_response for the given request id. */
function controlSuccess(
  requestId: SDKControlRequest['request_id'],
): SDKControlResponse {
  return {
    type: 'control_response',
    response: {
      subtype: 'success',
      request_id: requestId,
    },
  }
}

/** Build an error control_response for the given request id. */
function controlError(
  requestId: SDKControlRequest['request_id'],
  error: string,
): SDKControlResponse {
  return {
    type: 'control_response',
    response: {
      subtype: 'error',
      request_id: requestId,
      error,
    },
  }
}

/**
 * Invoke the callback matching the request's subtype and build the reply.
 * Exceptions thrown by a callback propagate to the caller.
 */
function buildControlResponse(
  request: SDKControlRequest,
  handlers: ServerControlRequestHandlers,
): SDKControlResponse {
  const {
    onInterrupt,
    onSetModel,
    onSetMaxThinkingTokens,
    onSetPermissionMode,
  } = handlers
  const requestId = request.request_id

  switch (request.request.subtype) {
    case 'initialize':
      // Respond with minimal capabilities — the REPL handles
      // commands, models, and account info itself.
      return {
        type: 'control_response',
        response: {
          subtype: 'success',
          request_id: requestId,
          response: {
            commands: [],
            output_style: 'normal',
            available_output_styles: ['normal'],
            models: [],
            account: {},
            pid: process.pid,
          },
        },
      }

    case 'set_model':
      onSetModel?.(request.request.model)
      return controlSuccess(requestId)

    case 'set_max_thinking_tokens':
      onSetMaxThinkingTokens?.(request.request.max_thinking_tokens)
      return controlSuccess(requestId)

    case 'set_permission_mode': {
      // The callback returns a policy verdict so we can send an error
      // control_response without importing isAutoModeGateEnabled /
      // isBypassPermissionsModeDisabled here (bootstrap-isolation). If no
      // callback is registered (daemon context, which doesn't wire this —
      // see daemonBridge.ts), return an error verdict rather than a silent
      // false-success: the mode is never actually applied in that context,
      // so success would lie to the client.
      const verdict = onSetPermissionMode?.(request.request.mode) ?? {
        ok: false,
        error:
          'set_permission_mode is not supported in this context (onSetPermissionMode callback not registered)',
      }
      return verdict.ok
        ? controlSuccess(requestId)
        : controlError(requestId, verdict.error)
    }

    case 'interrupt':
      onInterrupt?.()
      return controlSuccess(requestId)

    default:
      // Unknown subtype — respond with error so the server doesn't
      // hang waiting for a reply that never comes.
      return controlError(
        requestId,
        `REPL bridge does not handle control_request subtype: ${request.request.subtype}`,
      )
  }
}

/**
 * Respond to inbound control_request messages from the server. The server
 * sends these for session lifecycle events (initialize, set_model) and
 * for turn-level coordination (interrupt, set_max_thinking_tokens). If we
 * don't respond, the server hangs and kills the WS after ~10-14s.
 *
 * Exactly one control_response (stamped with `session_id`) is written to the
 * transport for every request, provided a transport is configured:
 *  - outbound-only mode: every subtype except `initialize` gets an error;
 *  - known subtypes: the matching callback runs, then success (or the
 *    callback's error verdict for set_permission_mode) is sent;
 *  - unknown subtypes: an error is sent;
 *  - a callback that throws: the exception is logged and converted into an
 *    error response, so the server still gets a reply.
 *
 * Previously a closure inside initBridgeCore's onWorkReceived; now takes
 * collaborators as params so both cores can use it.
 *
 * @param request - The control_request received from the server.
 * @param handlers - Transport, session id, mode flag and optional callbacks.
 */
export function handleServerControlRequest(
  request: SDKControlRequest,
  handlers: ServerControlRequestHandlers,
): void {
  const { transport, sessionId, outboundOnly } = handlers
  if (!transport) {
    logForDebugging(
      '[bridge:repl] Cannot respond to control_request: transport not configured',
    )
    return
  }

  const requestId = request.request_id
  const subtype = request.request.subtype

  // Outbound-only: reply error for mutable requests so claude.ai doesn't show
  // false success. initialize must still succeed (server kills the connection
  // if it doesn't).
  if (outboundOnly && subtype !== 'initialize') {
    const rejection = {
      ...controlError(requestId, OUTBOUND_ONLY_ERROR),
      session_id: sessionId,
    }
    void transport.write(rejection)
    logForDebugging(
      `[bridge:repl] Rejected ${subtype} (outbound-only) request_id=${requestId}`,
    )
    return
  }

  let response: SDKControlResponse
  try {
    response = buildControlResponse(request, handlers)
  } catch (err) {
    // A callback threw. Without a reply the server would wait and then drop
    // the connection, so report the failure as an error response instead.
    const reason = errorMessage(err)
    logForDebugging(
      `[bridge:repl] Handler for control_request ${subtype} threw: ${reason}`,
    )
    response = controlError(
      requestId,
      `REPL bridge failed to handle control_request subtype ${subtype}: ${reason}`,
    )
  }

  const event = { ...response, session_id: sessionId }
  void transport.write(event)
  logForDebugging(
    `[bridge:repl] Sent control_response for ${subtype} request_id=${requestId} result=${response.response.subtype}`,
  )
}
392  
393  // ─── Result message (for session archival on teardown) ───────────────────────
394  
/**
 * Build a minimal `SDKResultSuccess` message for session archival.
 * The server needs this event before a WS close to trigger archival.
 *
 * All counters and durations are zero, `result` is empty, and a fresh random
 * `uuid` is generated on every call.
 *
 * @param sessionId - Session the result message belongs to.
 * @returns A zeroed success result tagged with `sessionId`.
 */
export function makeResultMessage(sessionId: string): SDKResultSuccess {
  // Shallow copy so the returned message does not share the EMPTY_USAGE object.
  const usage = { ...EMPTY_USAGE }
  const archivalResult: SDKResultSuccess = {
    type: 'result',
    subtype: 'success',
    duration_ms: 0,
    duration_api_ms: 0,
    is_error: false,
    num_turns: 0,
    result: '',
    stop_reason: null,
    total_cost_usd: 0,
    usage,
    modelUsage: {},
    permission_denials: [],
    session_id: sessionId,
    uuid: randomUUID(),
  }
  return archivalResult
}
417  
418  // ─── BoundedUUIDSet (echo-dedup ring buffer) ─────────────────────────────────
419  
/**
 * FIFO-bounded set backed by a circular buffer. Evicts the oldest entry
 * when capacity is reached, keeping memory usage constant at O(capacity).
 *
 * Messages are added in chronological order, so evicted entries are always
 * the oldest. The caller relies on external ordering (the hook's
 * lastWrittenIndexRef) as the primary dedup — this set is a secondary
 * safety net for echo filtering and race-condition dedup.
 *
 * A capacity of 0 is allowed and yields a set that never retains anything.
 */
export class BoundedUUIDSet {
  private readonly capacity: number
  /** Insertion-ordered slots; the slot at `writeIdx` holds the oldest entry once full. */
  private readonly ring: (string | undefined)[]
  /** Membership index mirroring the occupied slots of `ring`. */
  private readonly set = new Set<string>()
  /** Next slot to overwrite. */
  private writeIdx = 0

  /**
   * @param capacity - Maximum number of UUIDs retained; a non-negative integer.
   * @throws RangeError when `capacity` is negative, fractional, or NaN.
   */
  constructor(capacity: number) {
    if (!Number.isInteger(capacity) || capacity < 0) {
      throw new RangeError(
        `BoundedUUIDSet capacity must be a non-negative integer, got ${capacity}`,
      )
    }
    this.capacity = capacity
    this.ring = new Array<string | undefined>(capacity)
  }

  /**
   * Record `uuid`, evicting the oldest entry if the set is full. Adding a
   * UUID that is already present is a no-op (it does not refresh its age).
   */
  add(uuid: string): void {
    // With no slots there is nothing to store. Without this guard the index
    // arithmetic below (`% 0`) yields NaN and entries would never be evicted.
    if (this.capacity === 0) return
    if (this.set.has(uuid)) return
    // Evict the entry at the current write position (if occupied)
    const evicted = this.ring[this.writeIdx]
    if (evicted !== undefined) {
      this.set.delete(evicted)
    }
    this.ring[this.writeIdx] = uuid
    this.set.add(uuid)
    this.writeIdx = (this.writeIdx + 1) % this.capacity
  }

  /** @returns true when `uuid` is currently retained. */
  has(uuid: string): boolean {
    return this.set.has(uuid)
  }

  /** Forget every entry and reset the write position. */
  clear(): void {
    this.set.clear()
    this.ring.fill(undefined)
    this.writeIdx = 0
  }
}