// server/directConnectManager.ts
  1  /* eslint-disable eslint-plugin-n/no-unsupported-features/node-builtins */
  2  
  3  import type { SDKMessage } from '../entrypoints/agentSdkTypes.js'
  4  import type {
  5    SDKControlPermissionRequest,
  6    StdoutMessage,
  7  } from '../entrypoints/sdk/controlTypes.js'
  8  import type { RemotePermissionResponse } from '../remote/RemoteSessionManager.js'
  9  import { logForDebugging } from '../utils/debug.js'
 10  import { jsonParse, jsonStringify } from '../utils/slowOperations.js'
 11  import type { RemoteMessageContent } from '../utils/teleport/api.js'
 12  
 13  export type DirectConnectConfig = {
 14    serverUrl: string
 15    sessionId: string
 16    wsUrl: string
 17    authToken?: string
 18  }
 19  
 20  export type DirectConnectCallbacks = {
 21    onMessage: (message: SDKMessage) => void
 22    onPermissionRequest: (
 23      request: SDKControlPermissionRequest,
 24      requestId: string,
 25    ) => void
 26    onConnected?: () => void
 27    onDisconnected?: () => void
 28    onError?: (error: Error) => void
 29  }
 30  
 31  function isStdoutMessage(value: unknown): value is StdoutMessage {
 32    return (
 33      typeof value === 'object' &&
 34      value !== null &&
 35      'type' in value &&
 36      typeof value.type === 'string'
 37    )
 38  }
 39  
 40  export class DirectConnectSessionManager {
 41    private ws: WebSocket | null = null
 42    private config: DirectConnectConfig
 43    private callbacks: DirectConnectCallbacks
 44  
 45    constructor(config: DirectConnectConfig, callbacks: DirectConnectCallbacks) {
 46      this.config = config
 47      this.callbacks = callbacks
 48    }
 49  
 50    connect(): void {
 51      const headers: Record<string, string> = {}
 52      if (this.config.authToken) {
 53        headers['authorization'] = `Bearer ${this.config.authToken}`
 54      }
 55      // Bun's WebSocket supports headers option but the DOM typings don't
 56      this.ws = new WebSocket(this.config.wsUrl, {
 57        headers,
 58      } as unknown as string[])
 59  
 60      this.ws.addEventListener('open', () => {
 61        this.callbacks.onConnected?.()
 62      })
 63  
 64      this.ws.addEventListener('message', event => {
 65        const data = typeof event.data === 'string' ? event.data : ''
 66        const lines = data.split('\n').filter((l: string) => l.trim())
 67  
 68        for (const line of lines) {
 69          let raw: unknown
 70          try {
 71            raw = jsonParse(line)
 72          } catch {
 73            continue
 74          }
 75  
 76          if (!isStdoutMessage(raw)) {
 77            continue
 78          }
 79          const parsed = raw
 80  
 81          // Handle control requests (permission requests)
 82          if (parsed.type === 'control_request') {
 83            if (parsed.request.subtype === 'can_use_tool') {
 84              this.callbacks.onPermissionRequest(
 85                parsed.request,
 86                parsed.request_id,
 87              )
 88            } else {
 89              // Send an error response for unrecognized subtypes so the
 90              // server doesn't hang waiting for a reply that never comes.
 91              logForDebugging(
 92                `[DirectConnect] Unsupported control request subtype: ${parsed.request.subtype}`,
 93              )
 94              this.sendErrorResponse(
 95                parsed.request_id,
 96                `Unsupported control request subtype: ${parsed.request.subtype}`,
 97              )
 98            }
 99            continue
100          }
101  
102          // Forward SDK messages (assistant, result, system, etc.)
103          if (
104            parsed.type !== 'control_response' &&
105            parsed.type !== 'keep_alive' &&
106            parsed.type !== 'control_cancel_request' &&
107            parsed.type !== 'streamlined_text' &&
108            parsed.type !== 'streamlined_tool_use_summary' &&
109            !(parsed.type === 'system' && parsed.subtype === 'post_turn_summary')
110          ) {
111            this.callbacks.onMessage(parsed)
112          }
113        }
114      })
115  
116      this.ws.addEventListener('close', () => {
117        this.callbacks.onDisconnected?.()
118      })
119  
120      this.ws.addEventListener('error', () => {
121        this.callbacks.onError?.(new Error('WebSocket connection error'))
122      })
123    }
124  
125    sendMessage(content: RemoteMessageContent): boolean {
126      if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
127        return false
128      }
129  
130      // Must match SDKUserMessage format expected by `--input-format stream-json`
131      const message = jsonStringify({
132        type: 'user',
133        message: {
134          role: 'user',
135          content: content,
136        },
137        parent_tool_use_id: null,
138        session_id: '',
139      })
140      this.ws.send(message)
141      return true
142    }
143  
144    respondToPermissionRequest(
145      requestId: string,
146      result: RemotePermissionResponse,
147    ): void {
148      if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
149        return
150      }
151  
152      // Must match SDKControlResponse format expected by StructuredIO
153      const response = jsonStringify({
154        type: 'control_response',
155        response: {
156          subtype: 'success',
157          request_id: requestId,
158          response: {
159            behavior: result.behavior,
160            ...(result.behavior === 'allow'
161              ? { updatedInput: result.updatedInput }
162              : { message: result.message }),
163          },
164        },
165      })
166      this.ws.send(response)
167    }
168  
169    /**
170     * Send an interrupt signal to cancel the current request
171     */
172    sendInterrupt(): void {
173      if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
174        return
175      }
176  
177      // Must match SDKControlRequest format expected by StructuredIO
178      const request = jsonStringify({
179        type: 'control_request',
180        request_id: crypto.randomUUID(),
181        request: {
182          subtype: 'interrupt',
183        },
184      })
185      this.ws.send(request)
186    }
187  
188    private sendErrorResponse(requestId: string, error: string): void {
189      if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
190        return
191      }
192      const response = jsonStringify({
193        type: 'control_response',
194        response: {
195          subtype: 'error',
196          request_id: requestId,
197          error,
198        },
199      })
200      this.ws.send(response)
201    }
202  
203    disconnect(): void {
204      if (this.ws) {
205        this.ws.close()
206        this.ws = null
207      }
208    }
209  
210    isConnected(): boolean {
211      return this.ws?.readyState === WebSocket.OPEN
212    }
213  }