/ src / lib / chat / chat.ts
chat.ts
 1  import type { SSEEvent } from '@/types'
 2  import { getStoredAccessKey } from '@/lib/app/api-client'
 3  
 4  interface StreamChatOptions {
 5    internal?: boolean
 6    queueMode?: 'followup' | 'steer' | 'collect'
 7    replyToId?: string
 8  }
 9  
/**
 * Posts a chat message to `/api/chats/{sessionId}/chat` and forwards every
 * server-sent event in the streamed response to `onEvent`.
 *
 * The 6th argument may be either the options object or a list of attached
 * files; in the latter case the options are taken from the 7th argument.
 *
 * Failure reporting:
 * - A non-OK response (or one without a body) emits an `err` event followed
 *   by a `done` event and resolves.
 * - If no data arrives for 5 minutes, an `err` event is emitted, the reader is
 *   cancelled, and the function resolves.
 * - Rejections from `fetch` or from reading the stream propagate to the caller.
 *
 * @param sessionId      Chat session id, interpolated into the request path.
 * @param message        Message text sent as `message`.
 * @param imagePath      Optional value sent as `imagePath`.
 * @param imageUrl       Optional value sent as `imageUrl`.
 * @param onEvent        Callback invoked for each parsed event.
 * @param optionsOrFiles Either request options or an array of attached files.
 * @param options        Request options, used only when `optionsOrFiles` is an array.
 * @returns A promise that resolves once the stream has ended or timed out.
 */
export async function streamChat(
  sessionId: string,
  message: string,
  imagePath?: string,
  imageUrl?: string,
  onEvent?: (event: SSEEvent) => void,
  optionsOrFiles?: StreamChatOptions | string[],
  options?: StreamChatOptions,
): Promise<void> {
  // Support both (options) and (attachedFiles, options) as 6th arg.
  const attachedFiles = Array.isArray(optionsOrFiles) ? optionsOrFiles : undefined
  const opts = Array.isArray(optionsOrFiles) ? options : optionsOrFiles

  const key = getStoredAccessKey()
  const res = await fetch(`/api/chats/${sessionId}/chat`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      ...(key ? { 'X-Access-Key': key } : {}),
    },
    body: JSON.stringify({
      message,
      imagePath,
      imageUrl,
      attachedFiles,
      internal: !!opts?.internal,
      queueMode: opts?.queueMode,
      ...(opts?.replyToId ? { replyToId: opts.replyToId } : {}),
    }),
  })

  if (!res.ok || !res.body) {
    onEvent?.({ t: 'err', text: `Request failed (${res.status})` })
    onEvent?.({ t: 'done' })
    return
  }

  const STREAM_IDLE_TIMEOUT_MS = 300_000
  const reader = res.body.getReader()
  const decoder = new TextDecoder()
  let buf = ''

  while (true) {
    const chunk = await readChunkWithIdleTimeout(reader, STREAM_IDLE_TIMEOUT_MS)
    if (chunk === null) {
      onEvent?.({ t: 'err', text: 'Stream timed out (no data for 5 minutes)' })
      // Best-effort cancellation; a failure to cancel is not actionable here.
      reader.cancel().catch(() => {})
      return
    }
    if (chunk.done) break

    buf += decoder.decode(chunk.value, { stream: true })
    const lines = buf.split('\n')
    // The last element is an incomplete line (or '') — keep it for the next chunk.
    buf = lines.pop() ?? ''
    for (const line of lines) dispatchSseLine(line, onEvent)
  }

  // Flush bytes still held by the decoder and deliver a final line that was
  // not newline-terminated, so the last event of the stream is not lost.
  buf += decoder.decode()
  if (buf) dispatchSseLine(buf, onEvent)
}

/**
 * Reads the next chunk from `reader`, resolving to `null` instead if nothing
 * arrives within `idleMs`. The timer is always cleared, even when the read rejects.
 */
async function readChunkWithIdleTimeout(
  reader: ReadableStreamDefaultReader<Uint8Array>,
  idleMs: number,
): Promise<ReadableStreamReadResult<Uint8Array> | null> {
  let timeoutId: ReturnType<typeof setTimeout> | undefined
  const idle = new Promise<null>((resolve) => {
    timeoutId = setTimeout(() => resolve(null), idleMs)
  })
  try {
    return await Promise.race([reader.read(), idle])
  } finally {
    clearTimeout(timeoutId)
  }
}

/**
 * Parses a single event-stream line and forwards the decoded event to `onEvent`.
 * Lines that are not `data:` fields, or whose payload is not valid JSON, are ignored.
 */
function dispatchSseLine(line: string, onEvent?: (event: SSEEvent) => void): void {
  // The space after "data:" is optional; JSON.parse tolerates the leading
  // whitespace, so the payload is simply everything after the colon.
  if (!line.startsWith('data:')) return
  try {
    // Forward all event types including tool_call and tool_result.
    onEvent?.(JSON.parse(line.slice(5)) as SSEEvent)
  } catch {
    // Skip malformed payloads. Exceptions thrown by onEvent are caught here as
    // well, so a failing handler does not abort the stream.
  }
}