// tests/openai-compatible-chat-completions-stream.test.ts

import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'

import { recordApiCall } from '@/server/costs/ledger'
import { openAiCompatibleProvider } from '@/server/providers/openai-compatible'
import type { ProviderChatInput } from '@/server/providers/types'

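// Mock the cost ledger and upload helpers so the provider runs in isolation:
// the ledger mock lets the tests assert usage recording, and the upload mocks
// keep attachment handling inert.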
vi.mock('@/server/costs/ledger', () => ({
  recordApiCall: vi.fn(),
}))

vi.mock('@/server/storage/chat-store', () => ({
  getUploadById: vi.fn(() => null),
}))

vi.mock('@/server/uploads/files', () => ({
  readUploadFile: vi.fn(),
}))

const originalEnv = { ...process.env }
const originalFetch = global.fetch

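// Builds a mock Response whose body is a server-sent-events stream. Each
// event is enqueued at its cumulative delay, emulating an OpenAI-compatible
// /chat/completions stream; '[DONE]' is the SSE terminator.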
function buildSseResponse(input: {
  events: Array<{ delayMs: number; payload: Record<string, unknown> | '[DONE]' }>
}): Response {
  const encoder = new TextEncoder()
  const stream = new ReadableStream<Uint8Array>({
    start(controller) {
      let elapsedMs = 0
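      // Schedule each event at its cumulative offset so chunks arrive in
      // order, spaced out like a real network stream.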
      for (const event of input.events) {
        elapsedMs += event.delayMs
        setTimeout(() => {
          try {
            const line =
              event.payload === '[DONE]'
                ? 'data: [DONE]\n\n'
                : `data: ${JSON.stringify(event.payload)}\n\n`
            controller.enqueue(encoder.encode(line))
          } catch {
            // Stream may already be canceled/closed.
          }
        }, elapsedMs)
      }

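      // Close the stream shortly after the last scheduled event has fired.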
      setTimeout(() => {
        try {
          controller.close()
        } catch {
          // Already closed by cancellation/abort.
        }
      }, elapsedMs + 10)
    },
  })

  return new Response(stream, {
    status: 200,
    headers: {
      'Content-Type': 'text/event-stream',
    },
  })
}

describe('openAiCompatibleProvider chat-completions streaming', () => {
  beforeEach(() => {
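    // Reset mocks and point the provider at a throwaway endpoint; each test
    // installs its own fetch response.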
    vi.clearAllMocks()
    process.env = {
      ...originalEnv,
      LLM_BASE_URL: 'https://example.test/v1',
      LLM_CHAT_MODEL: 'minimax-m2',
      LLM_TIMEOUT_MS: '30000',
    }
    global.fetch = vi.fn() as typeof fetch
  })

  afterEach(() => {
    process.env = originalEnv
    global.fetch = originalFetch
  })

  it('streams chat-completions deltas directly and forwards thinking chunks', async () => {
    const fetchMock = vi.mocked(global.fetch)
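    // SSE sequence: one reasoning delta, two content deltas, a stop chunk,
    // then the [DONE] terminator.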
    fetchMock.mockResolvedValueOnce(
      buildSseResponse({
        events: [
          {
            delayMs: 20,
            payload: {
              choices: [{ delta: { reasoning_content: 'Planning briefly. ' } }],
            },
          },
          {
            delayMs: 20,
            payload: {
              choices: [{ delta: { content: 'Hello ' } }],
            },
          },
          {
            delayMs: 20,
            payload: {
              choices: [{ delta: { content: 'world' } }],
            },
          },
          {
            delayMs: 20,
            payload: {
              choices: [{ finish_reason: 'stop', delta: {} }],
            },
          },
          {
            delayMs: 10,
            payload: '[DONE]',
          },
        ],
      }),
    )

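    // providerOverride with chatEndpointMode selects the /chat/completions
    // endpoint for this request.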
    const input: ProviderChatInput = {
      systemPrompt: 'You are helpful.',
      compactedSummary: '',
      memories: [],
      messages: [
        {
          role: 'user',
          text: 'hello there',
          attachments: [],
        },
      ],
      providerOverride: {
        baseUrl: 'https://router.example.com/v1',
        apiKey: null,
        chatEndpointMode: 'chat_completions',
      },
      modelOverride: 'minimax-m2',
      allowDangerousBashTool: false,
    }

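    // Drain the stream, collecting content and thinking chunks separately.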
    const streamReply = await openAiCompatibleProvider.streamReply(input)
    let text = ''
    const thinkingChunks: string[] = []
    for await (const chunk of streamReply.stream) {
      if (chunk.type === 'delta') {
        text += chunk.delta
        continue
      }
      if (chunk.type === 'thinking') {
        thinkingChunks.push(chunk.thinking)
      }
    }

    expect(text).toBe('Hello world')
    expect(thinkingChunks.join('')).toContain('Planning briefly.')

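    // Exactly one streaming POST should have gone to /chat/completions.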
    expect(fetchMock).toHaveBeenCalledTimes(1)
    expect(String(fetchMock.mock.calls[0]?.[0])).toContain('/chat/completions')
    const requestInit = fetchMock.mock.calls[0]?.[1] as RequestInit
    const payload = JSON.parse(String(requestInit.body)) as { stream?: boolean }
    expect(payload.stream).toBe(true)

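    // Usage should land in the cost ledger as a successful streaming call.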
    expect(vi.mocked(recordApiCall)).toHaveBeenCalledWith(
      expect.objectContaining({
        endpoint: 'chat.completions.stream',
        status: 'success',
      }),
    )
  })

  it('does not emit duplicate thinking chunks when reasoning mirrors content', async () => {
    const fetchMock = vi.mocked(global.fetch)
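    // The first chunk's reasoning_content mirrors its content exactly; the
    // provider should surface it once as content, never again as thinking.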
    fetchMock.mockResolvedValueOnce(
      buildSseResponse({
        events: [
          {
            delayMs: 20,
            payload: {
              choices: [
                {
                  delta: {
                    reasoning_content: 'Analyzing latest headlines.',
                    content: 'Analyzing latest headlines.',
                  },
                },
              ],
            },
          },
          {
            delayMs: 20,
            payload: {
              choices: [{ delta: { content: ' Final answer here.' } }],
            },
          },
          {
            delayMs: 20,
            payload: {
              choices: [{ finish_reason: 'stop', delta: {} }],
            },
          },
          {
            delayMs: 10,
            payload: '[DONE]',
          },
        ],
      }),
    )

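    // Same input as the first test; only the mocked SSE payload differs.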
    const input: ProviderChatInput = {
      systemPrompt: 'You are helpful.',
      compactedSummary: '',
      memories: [],
      messages: [
        {
          role: 'user',
          text: 'hello there',
          attachments: [],
        },
      ],
      providerOverride: {
        baseUrl: 'https://router.example.com/v1',
        apiKey: null,
        chatEndpointMode: 'chat_completions',
      },
      modelOverride: 'minimax-m2',
      allowDangerousBashTool: false,
    }

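    // The mirrored reasoning text must appear only in content, with no
    // thinking chunks emitted.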
    const streamReply = await openAiCompatibleProvider.streamReply(input)
    let text = ''
    const thinkingChunks: string[] = []
    for await (const chunk of streamReply.stream) {
      if (chunk.type === 'delta') {
        text += chunk.delta
        continue
      }
      if (chunk.type === 'thinking') {
        thinkingChunks.push(chunk.thinking)
      }
    }

    expect(text).toContain('Analyzing latest headlines.')
    expect(text).toContain('Final answer here.')
    expect(thinkingChunks).toEqual([])
  })
})