openai-compatible-stream-timeout.test.ts
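// Verifies that the OpenAI-compatible provider's responses-stream inactivity
// timeout does not abort a stream whose SSE chunks keep arriving within
// LLM_TIMEOUT_MS.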
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'

import { recordApiCall } from '@/server/costs/ledger'
import { openAiCompatibleProvider } from '@/server/providers/openai-compatible'
import type {
  ProviderChatInput,
  ProviderChatStreamChunk,
} from '@/server/providers/types'

vi.mock('@/server/costs/ledger', () => ({
  recordApiCall: vi.fn(),
}))

vi.mock('@/server/storage/chat-store', () => ({
  getUploadById: vi.fn(() => null),
}))

vi.mock('@/server/uploads/files', () => ({
  readUploadFile: vi.fn(),
}))

const originalEnv = { ...process.env }
const originalFetch = global.fetch

// Builds a mock SSE response whose events fire on a cumulative timeline:
// each event is emitted `delayMs` after the previous one.
function buildSseResponse(input: {
  events: Array<{ delayMs: number; payload: Record<string, unknown> | '[DONE]' }>
  status?: number
}): Response {
  const encoder = new TextEncoder()
  const stream = new ReadableStream<Uint8Array>({
    start(controller) {
      let elapsedMs = 0
      for (const event of input.events) {
        elapsedMs += event.delayMs
        setTimeout(() => {
          try {
            const line =
              event.payload === '[DONE]'
                ? 'data: [DONE]\n\n'
                : `data: ${JSON.stringify(event.payload)}\n\n`
            controller.enqueue(encoder.encode(line))
          } catch {
            // Stream may already be canceled/closed.
          }
        }, elapsedMs)
      }

      // Close the stream shortly after the last scheduled event.
      setTimeout(() => {
        try {
          controller.close()
        } catch {
          // Already closed by cancellation/abort.
        }
      }, elapsedMs + 10)
    },
  })

  return new Response(stream, {
    status: input.status ?? 200,
    headers: {
      'Content-Type': 'text/event-stream',
    },
  })
}

// Drains a provider stream and concatenates its text deltas.
async function collectStream(
  stream: AsyncIterable<ProviderChatStreamChunk>,
): Promise<string> {
  let text = ''
  for await (const chunk of stream) {
    if (chunk.type === 'delta') {
      text += chunk.delta
    }
  }
  return text
}

describe('openAiCompatibleProvider responses stream inactivity timeout', () => {
  beforeEach(() => {
    vi.clearAllMocks()
    process.env = {
      ...originalEnv,
      LLM_BASE_URL: 'https://example.test/v1',
      LLM_CHAT_MODEL: 'minimax-m2',
      LLM_TIMEOUT_MS: '30000',
    }
    global.fetch = vi.fn() as typeof fetch
  })

  afterEach(() => {
    process.env = originalEnv
    global.fetch = originalFetch
  })

  it('does not abort active streams when chunk activity keeps arriving before timeout', async () => {
    // Every inter-chunk gap below stays under this 1s inactivity window.
    process.env.LLM_TIMEOUT_MS = '1000'

    const fetchMock = vi.mocked(global.fetch)
    fetchMock.mockResolvedValueOnce(
      buildSseResponse({
        events: [
          {
            delayMs: 150,
            payload: { type: 'response.output_text.delta', delta: 'Hello ' },
          },
          {
            delayMs: 700,
            payload: { type: 'response.output_text.delta', delta: 'world' },
          },
          {
            delayMs: 700,
            payload: { type: 'response.output_text.delta', delta: '!' },
          },
          {
            delayMs: 50,
            payload: { type: 'response.completed', response: { status: 'completed' } },
          },
          {
            delayMs: 25,
            payload: '[DONE]',
          },
        ],
      }),
    )

    const input: ProviderChatInput = {
      systemPrompt: 'You are helpful.',
      compactedSummary: '',
      memories: [],
      messages: [
        {
          role: 'user',
          text: 'hello there',
          attachments: [],
        },
      ],
      providerOverride: {
        baseUrl: 'https://router.example.com/v1',
        apiKey: null,
        chatEndpointMode: 'responses',
      },
      modelOverride: 'minimax-m2',
      allowDangerousBashTool: false,
    }

    const result = await openAiCompatibleProvider.streamReply(input)
    const streamedText = await collectStream(result.stream)

    expect(streamedText).toBe('Hello world!')
    expect(fetchMock).toHaveBeenCalledTimes(1)
    expect(vi.mocked(recordApiCall)).toHaveBeenCalledWith(
      expect.objectContaining({
        endpoint: 'responses.stream',
        status: 'success',
      }),
    )
  })
})