/ tests / openai-compatible-stream-timeout.test.ts
openai-compatible-stream-timeout.test.ts
  1  import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
  2  
  3  import { recordApiCall } from '@/server/costs/ledger'
  4  import { openAiCompatibleProvider } from '@/server/providers/openai-compatible'
  5  import type {
  6    ProviderChatInput,
  7    ProviderChatStreamChunk,
  8  } from '@/server/providers/types'
  9  
 10  vi.mock('@/server/costs/ledger', () => ({
 11    recordApiCall: vi.fn(),
 12  }))
 13  
 14  vi.mock('@/server/storage/chat-store', () => ({
 15    getUploadById: vi.fn(() => null),
 16  }))
 17  
 18  vi.mock('@/server/uploads/files', () => ({
 19    readUploadFile: vi.fn(),
 20  }))
 21  
 22  const originalEnv = { ...process.env }
 23  const originalFetch = global.fetch
 24  
 25  function buildSseResponse(input: {
 26    events: Array<{ delayMs: number; payload: Record<string, unknown> | '[DONE]' }>
 27    status?: number
 28  }): Response {
 29    const encoder = new TextEncoder()
 30    const stream = new ReadableStream<Uint8Array>({
 31      start(controller) {
 32        let elapsedMs = 0
 33        for (const event of input.events) {
 34          elapsedMs += event.delayMs
 35          setTimeout(() => {
 36            try {
 37              const line =
 38                event.payload === '[DONE]'
 39                  ? 'data: [DONE]\n\n'
 40                  : `data: ${JSON.stringify(event.payload)}\n\n`
 41              controller.enqueue(encoder.encode(line))
 42            } catch {
 43              // Stream may already be canceled/closed.
 44            }
 45          }, elapsedMs)
 46        }
 47  
 48        setTimeout(() => {
 49          try {
 50            controller.close()
 51          } catch {
 52            // Already closed by cancellation/abort.
 53          }
 54        }, elapsedMs + 10)
 55      },
 56    })
 57  
 58    return new Response(stream, {
 59      status: input.status ?? 200,
 60      headers: {
 61        'Content-Type': 'text/event-stream',
 62      },
 63    })
 64  }
 65  
 66  async function collectStream(
 67    stream: AsyncIterable<ProviderChatStreamChunk>,
 68  ): Promise<string> {
 69    let text = ''
 70    for await (const chunk of stream) {
 71      if (chunk.type === 'delta') {
 72        text += chunk.delta
 73      }
 74    }
 75    return text
 76  }
 77  
 78  describe('openAiCompatibleProvider responses stream inactivity timeout', () => {
 79    beforeEach(() => {
 80      vi.clearAllMocks()
 81      process.env = {
 82        ...originalEnv,
 83        LLM_BASE_URL: 'https://example.test/v1',
 84        LLM_CHAT_MODEL: 'minimax-m2',
 85        LLM_TIMEOUT_MS: '30000',
 86      }
 87      global.fetch = vi.fn() as typeof fetch
 88    })
 89  
 90    afterEach(() => {
 91      process.env = originalEnv
 92      global.fetch = originalFetch
 93    })
 94  
 95    it('does not abort active streams when chunk activity keeps arriving before timeout', async () => {
 96      process.env.LLM_TIMEOUT_MS = '1000'
 97  
 98      const fetchMock = vi.mocked(global.fetch)
 99      fetchMock.mockResolvedValueOnce(
100        buildSseResponse({
101          events: [
102            {
103              delayMs: 150,
104              payload: { type: 'response.output_text.delta', delta: 'Hello ' },
105            },
106            {
107              delayMs: 700,
108              payload: { type: 'response.output_text.delta', delta: 'world' },
109            },
110            {
111              delayMs: 700,
112              payload: { type: 'response.output_text.delta', delta: '!' },
113            },
114            {
115              delayMs: 50,
116              payload: { type: 'response.completed', response: { status: 'completed' } },
117            },
118            {
119              delayMs: 25,
120              payload: '[DONE]',
121            },
122          ],
123        }),
124      )
125  
126      const input: ProviderChatInput = {
127        systemPrompt: 'You are helpful.',
128        compactedSummary: '',
129        memories: [],
130        messages: [
131          {
132            role: 'user',
133            text: 'hello there',
134            attachments: [],
135          },
136        ],
137        providerOverride: {
138          baseUrl: 'https://router.example.com/v1',
139          apiKey: null,
140          chatEndpointMode: 'responses',
141        },
142        modelOverride: 'minimax-m2',
143        allowDangerousBashTool: false,
144      }
145  
146      const result = await openAiCompatibleProvider.streamReply(input)
147      const streamedText = await collectStream(result.stream)
148  
149      expect(streamedText).toBe('Hello world!')
150      expect(fetchMock).toHaveBeenCalledTimes(1)
151      expect(vi.mocked(recordApiCall)).toHaveBeenCalledWith(
152        expect.objectContaining({
153          endpoint: 'responses.stream',
154          status: 'success',
155        }),
156      )
157    })
158  })