openai-compatible-chat-completions-stream.test.ts
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'

import { recordApiCall } from '@/server/costs/ledger'
import { openAiCompatibleProvider } from '@/server/providers/openai-compatible'
import type { ProviderChatInput } from '@/server/providers/types'

vi.mock('@/server/costs/ledger', () => ({
  recordApiCall: vi.fn(),
}))

vi.mock('@/server/storage/chat-store', () => ({
  getUploadById: vi.fn(() => null),
}))

vi.mock('@/server/uploads/files', () => ({
  readUploadFile: vi.fn(),
}))

const originalEnv = { ...process.env }
const originalFetch = global.fetch

// Simulates a streaming chat-completions endpoint: each SSE event is enqueued
// at its cumulative delay, and the stream closes shortly after the final event.
function buildSseResponse(input: {
  events: Array<{ delayMs: number; payload: Record<string, unknown> | '[DONE]' }>
}): Response {
  const encoder = new TextEncoder()
  const stream = new ReadableStream<Uint8Array>({
    start(controller) {
      let elapsedMs = 0
      for (const event of input.events) {
        elapsedMs += event.delayMs
        setTimeout(() => {
          try {
            const line =
              event.payload === '[DONE]'
                ? 'data: [DONE]\n\n'
                : `data: ${JSON.stringify(event.payload)}\n\n`
            controller.enqueue(encoder.encode(line))
          } catch {
            // Stream may already be canceled/closed.
          }
        }, elapsedMs)
      }

      setTimeout(() => {
        try {
          controller.close()
        } catch {
          // Already closed by cancellation/abort.
        }
      }, elapsedMs + 10)
    },
  })

  return new Response(stream, {
    status: 200,
    headers: {
      'Content-Type': 'text/event-stream',
    },
  })
}

describe('openAiCompatibleProvider chat-completions streaming', () => {
  beforeEach(() => {
    vi.clearAllMocks()
    process.env = {
      ...originalEnv,
      LLM_BASE_URL: 'https://example.test/v1',
      LLM_CHAT_MODEL: 'minimax-m2',
      LLM_TIMEOUT_MS: '30000',
    }
    global.fetch = vi.fn() as typeof fetch
  })

  afterEach(() => {
    // Restore the real environment and fetch so other suites are unaffected.
    process.env = originalEnv
    global.fetch = originalFetch
  })

  it('streams chat-completions deltas directly and forwards thinking chunks', async () => {
    const fetchMock = vi.mocked(global.fetch)
    fetchMock.mockResolvedValueOnce(
      buildSseResponse({
        events: [
          {
            delayMs: 20,
            payload: {
              choices: [{ delta: { reasoning_content: 'Planning briefly.\n' } }],
            },
          },
          {
            delayMs: 20,
            payload: {
              choices: [{ delta: { content: 'Hello ' } }],
            },
          },
          {
            delayMs: 20,
            payload: {
              choices: [{ delta: { content: 'world' } }],
            },
          },
          {
            delayMs: 20,
            payload: {
              choices: [{ finish_reason: 'stop', delta: {} }],
            },
          },
          {
            delayMs: 10,
            payload: '[DONE]',
          },
        ],
      }),
    )

    const input: ProviderChatInput = {
      systemPrompt: 'You are helpful.',
      compactedSummary: '',
      memories: [],
      messages: [
        {
          role: 'user',
          text: 'hello there',
          attachments: [],
        },
      ],
      providerOverride: {
        baseUrl: 'https://router.example.com/v1',
        apiKey: null,
        chatEndpointMode: 'chat_completions',
      },
      modelOverride: 'minimax-m2',
      allowDangerousBashTool: false,
    }

    const streamReply = await openAiCompatibleProvider.streamReply(input)
    let text = ''
    const thinkingChunks: string[] = []
    for await (const chunk of streamReply.stream) {
      if (chunk.type === 'delta') {
        text += chunk.delta
        continue
      }
      if (chunk.type === 'thinking') {
        thinkingChunks.push(chunk.thinking)
      }
    }

    expect(text).toBe('Hello world')
    expect(thinkingChunks.join('')).toContain('Planning briefly.')

    expect(fetchMock).toHaveBeenCalledTimes(1)
    expect(String(fetchMock.mock.calls[0]?.[0])).toContain('/chat/completions')
    const requestInit = fetchMock.mock.calls[0]?.[1] as RequestInit
    const payload = JSON.parse(String(requestInit.body)) as { stream?: boolean }
    expect(payload.stream).toBe(true)

    expect(vi.mocked(recordApiCall)).toHaveBeenCalledWith(
      expect.objectContaining({
        endpoint: 'chat.completions.stream',
        status: 'success',
      }),
    )
  })

  it('does not emit duplicate thinking chunks when reasoning mirrors content', async () => {
    const fetchMock = vi.mocked(global.fetch)
    fetchMock.mockResolvedValueOnce(
      buildSseResponse({
        events: [
          {
            delayMs: 20,
            payload: {
              choices: [
                {
                  delta: {
                    reasoning_content: 'Analyzing latest headlines.',
                    content: 'Analyzing latest headlines.',
                  },
                },
              ],
            },
          },
          {
            delayMs: 20,
            payload: {
              choices: [{ delta: { content: ' Final answer here.' } }],
            },
          },
          {
            delayMs: 20,
            payload: {
              choices: [{ finish_reason: 'stop', delta: {} }],
            },
          },
          {
            delayMs: 10,
            payload: '[DONE]',
          },
        ],
      }),
    )

    const input: ProviderChatInput = {
      systemPrompt: 'You are helpful.',
      compactedSummary: '',
      memories: [],
      messages: [
        {
          role: 'user',
          text: 'hello there',
          attachments: [],
        },
      ],
      providerOverride: {
        baseUrl: 'https://router.example.com/v1',
        apiKey: null,
        chatEndpointMode: 'chat_completions',
      },
      modelOverride: 'minimax-m2',
      allowDangerousBashTool: false,
    }

    const streamReply = await openAiCompatibleProvider.streamReply(input)
    let text = ''
    const thinkingChunks: string[] = []
    for await (const chunk of streamReply.stream) {
      if (chunk.type === 'delta') {
        text += chunk.delta
        continue
      }
      if (chunk.type === 'thinking') {
        thinkingChunks.push(chunk.thinking)
      }
    }

    expect(text).toContain('Analyzing latest headlines.')
    expect(text).toContain('Final answer here.')
    // Reasoning that mirrors the visible content must not be re-emitted as thinking.
    expect(thinkingChunks).toEqual([])
  })
})