// src/lib/chat/queued-message-queue.test.ts
  1  import assert from 'node:assert/strict'
  2  import { describe, it } from 'node:test'
  3  import {
  4    buildQueuedTranscriptMessages,
  5    createOptimisticQueuedMessage,
  6    clearQueuedMessagesForSession,
  7    listQueuedMessagesForSession,
  8    mergeQueuedTranscriptMessages,
  9    removeQueuedMessageById,
 10    replaceQueuedMessagesForSession,
 11    snapshotToQueuedMessages,
 12    type QueuedSessionMessage,
 13  } from '@/lib/chat/queued-message-queue'
 14  
 15  describe('queued-message-queue', () => {
 16    const queue: QueuedSessionMessage[] = [
 17      { runId: 'q1', sessionId: 'session-a', text: 'first a', queuedAt: 1, position: 1 },
 18      { runId: 'q2', sessionId: 'session-b', text: 'first b', queuedAt: 2, position: 1 },
 19      { runId: 'q3', sessionId: 'session-a', text: 'second a', queuedAt: 3, position: 2 },
 20    ]
 21  
 22    it('lists queued messages for a single session', () => {
 23      assert.deepEqual(
 24        listQueuedMessagesForSession(queue, 'session-a').map((item) => item.runId),
 25        ['q1', 'q3'],
 26      )
 27    })
 28  
 29    it('replaces queued items only for the requested session', () => {
 30      const replaced = replaceQueuedMessagesForSession(queue, 'session-a', [
 31        { runId: 'q4', sessionId: 'session-a', text: 'replacement', queuedAt: 4, position: 1 },
 32      ], { activeRunId: null })
 33      assert.deepEqual(
 34        listQueuedMessagesForSession(replaced, 'session-a').map((item) => item.runId),
 35        ['q4'],
 36      )
 37      assert.deepEqual(
 38        listQueuedMessagesForSession(replaced, 'session-b').map((item) => item.runId),
 39        ['q2'],
 40      )
 41    })
 42  
 43    it('keeps only the newly active run as a sending placeholder when it disappears from the queue snapshot', () => {
 44      const replaced = replaceQueuedMessagesForSession(queue, 'session-a', [
 45        { runId: 'q3', sessionId: 'session-a', text: 'second a', queuedAt: 3, position: 1 },
 46      ], { activeRunId: 'q1' })
 47  
 48      assert.deepEqual(
 49        listQueuedMessagesForSession(replaced, 'session-a').map((item) => [item.runId, item.sending === true]),
 50        [['q1', true], ['q3', false]],
 51      )
 52    })
 53  
 54    it('drops missing stale queue rows that are not the active run', () => {
 55      const replaced = replaceQueuedMessagesForSession(queue, 'session-a', [
 56        { runId: 'q3', sessionId: 'session-a', text: 'second a', queuedAt: 3, position: 1 },
 57      ], { activeRunId: 'run-other' })
 58  
 59      assert.deepEqual(
 60        listQueuedMessagesForSession(replaced, 'session-a').map((item) => item.runId),
 61        ['q3'],
 62      )
 63    })
 64  
 65    it('removes queued items by stable id', () => {
 66      assert.deepEqual(removeQueuedMessageById(queue, 'q2').map((item) => item.runId), ['q1', 'q3'])
 67    })
 68  
 69    it('clears queued items only for the given session', () => {
 70      assert.deepEqual(
 71        clearQueuedMessagesForSession(queue, 'session-a').map((item) => item.runId),
 72        ['q2'],
 73      )
 74    })
 75  
 76    it('creates optimistic queued items with the expected shape', () => {
 77      const optimistic = createOptimisticQueuedMessage('session-a', { text: 'queued later' }, 3)
 78      assert.equal(optimistic.sessionId, 'session-a')
 79      assert.equal(optimistic.position, 3)
 80      assert.equal(optimistic.optimistic, true)
 81    })
 82  
 83    it('converts queue snapshots into local queued messages', () => {
 84      const queued = snapshotToQueuedMessages({
 85        sessionId: 'session-a',
 86        activeRunId: 'run-active',
 87        activeTurn: {
 88          runId: 'run-active',
 89          sessionId: 'session-a',
 90          text: 'sending now',
 91          queuedAt: 4,
 92          position: 0,
 93        },
 94        queueLength: 1,
 95        items: [
 96          { runId: 'run-queued', sessionId: 'session-a', text: 'queued', queuedAt: 5, position: 1 },
 97        ],
 98      })
 99      assert.deepEqual(
100        queued.map((item) => [item.runId, item.sending === true]),
101        [['run-active', true], ['run-queued', false]],
102      )
103    })
104  
105    it('preserves attachment and reply metadata from queue snapshots', () => {
106      const queued = snapshotToQueuedMessages({
107        sessionId: 'session-a',
108        activeRunId: null,
109        queueLength: 1,
110        items: [
111          {
112            runId: 'run-queued-meta',
113            sessionId: 'session-a',
114            text: 'queued with files',
115            queuedAt: 7,
116            position: 1,
117            imagePath: '/tmp/image.png',
118            imageUrl: '/api/uploads/image.png',
119            attachedFiles: ['/tmp/notes.txt', '/tmp/spec.md'],
120            replyToId: 'msg-4',
121          },
122        ],
123      })
124  
125      assert.deepEqual(queued[0], {
126        runId: 'run-queued-meta',
127        sessionId: 'session-a',
128        text: 'queued with files',
129        queuedAt: 7,
130        position: 1,
131        imagePath: '/tmp/image.png',
132        imageUrl: '/api/uploads/image.png',
133        attachedFiles: ['/tmp/notes.txt', '/tmp/spec.md'],
134        replyToId: 'msg-4',
135      })
136    })
137  
138    it('deduplicates an active turn when the snapshot also contains it in the queued items', () => {
139      const queued = snapshotToQueuedMessages({
140        sessionId: 'session-a',
141        activeRunId: 'run-active',
142        activeTurn: {
143          runId: 'run-active',
144          sessionId: 'session-a',
145          text: 'already running',
146          queuedAt: 6,
147          position: 0,
148        },
149        queueLength: 1,
150        items: [
151          { runId: 'run-active', sessionId: 'session-a', text: 'already running', queuedAt: 6, position: 1 },
152        ],
153      })
154  
155      assert.deepEqual(queued.map((item) => item.runId), ['run-active'])
156      assert.equal(queued[0]?.sending, true)
157    })
158  
159    it('sorts queued messages by position and queued time within a session', () => {
160      const unsorted: QueuedSessionMessage[] = [
161        { runId: 'q4', sessionId: 'session-a', text: 'later', queuedAt: 9, position: 2 },
162        { runId: 'q5', sessionId: 'session-a', text: 'earlier same pos', queuedAt: 4, position: 1 },
163        { runId: 'q6', sessionId: 'session-a', text: 'later same pos', queuedAt: 8, position: 1 },
164      ]
165  
166      assert.deepEqual(
167        listQueuedMessagesForSession(unsorted, 'session-a').map((item) => item.runId),
168        ['q5', 'q6', 'q4'],
169      )
170    })
171  
172    it('builds transcript-ready user messages from sending queued turns', () => {
173      const transcript = buildQueuedTranscriptMessages([
174        { runId: 'q1', sessionId: 'session-a', text: 'sending row', queuedAt: 20, position: 0, sending: true },
175        { runId: 'q2', sessionId: 'session-a', text: 'pending row', queuedAt: 21, position: 1 },
176        { runId: 'q3', sessionId: 'session-b', text: 'other session', queuedAt: 22, position: 0, sending: true },
177      ], 'session-a')
178  
179      assert.deepEqual(transcript, [
180        {
181          role: 'user',
182          text: 'sending row',
183          time: 20,
184          kind: 'chat',
185          clientRenderId: 'queued:q1',
186          imagePath: undefined,
187          imageUrl: undefined,
188          attachedFiles: undefined,
189          replyToId: undefined,
190          runId: 'q1',
191        },
192      ])
193    })
194  
195    it('merges sending queued turns into the transcript ahead of later assistant output', () => {
196      const merged = mergeQueuedTranscriptMessages([
197        { role: 'assistant', text: 'Thinking...', time: 25, streaming: true, runId: 'run-active' },
198      ], [
199        { runId: 'run-active', sessionId: 'session-a', text: 'queued first', queuedAt: 20, position: 0, sending: true },
200      ], 'session-a')
201  
202      assert.deepEqual(merged.map((message) => [message.role, message.text, message.runId]), [
203        ['user', 'queued first', 'run-active'],
204        ['assistant', 'Thinking...', 'run-active'],
205      ])
206    })
207  
208    it('preserves existing sending items when replacing queue for a session', () => {
209      const queueWithSending: QueuedSessionMessage[] = [
210        { runId: 'sending-1', sessionId: 'session-a', text: 'already sending', queuedAt: 1, position: 0, sending: true },
211        { runId: 'q3', sessionId: 'session-a', text: 'queued', queuedAt: 2, position: 1 },
212        { runId: 'q2', sessionId: 'session-b', text: 'other', queuedAt: 3, position: 1 },
213      ]
214      const replaced = replaceQueuedMessagesForSession(queueWithSending, 'session-a', [
215        { runId: 'q4', sessionId: 'session-a', text: 'new queued', queuedAt: 4, position: 1 },
216      ], { activeRunId: null })
217  
218      const forSession = listQueuedMessagesForSession(replaced, 'session-a')
219      assert.deepEqual(
220        forSession.map((item) => [item.runId, item.sending === true]),
221        [['sending-1', true], ['q4', false]],
222      )
223    })
224  
225    it('deduplicates sending items that appear in nextItems', () => {
226      const queueWithSending: QueuedSessionMessage[] = [
227        { runId: 'run-active', sessionId: 'session-a', text: 'sending', queuedAt: 1, position: 0, sending: true },
228      ]
229      const replaced = replaceQueuedMessagesForSession(queueWithSending, 'session-a', [
230        { runId: 'run-active', sessionId: 'session-a', text: 'sending', queuedAt: 1, position: 0, sending: true },
231        { runId: 'q5', sessionId: 'session-a', text: 'next', queuedAt: 2, position: 1 },
232      ], { activeRunId: 'run-active' })
233  
234      const forSession = listQueuedMessagesForSession(replaced, 'session-a')
235      assert.deepEqual(
236        forSession.map((item) => item.runId),
237        ['run-active', 'q5'],
238      )
239    })
240  
241    it('inserts sending messages after last persisted message, not by timestamp', () => {
242      const merged = mergeQueuedTranscriptMessages([
243        { role: 'user', text: 'First', time: 100 },
244        { role: 'assistant', text: 'Reply', time: 200 },
245        { role: 'user', text: 'Second', time: 300 },
246        { role: 'assistant', text: 'Reply 2', time: 400 },
247      ], [
248        // queuedAt is earlier than the last persisted message
249        { runId: 'run-late', sessionId: 'session-a', text: 'queued early', queuedAt: 150, position: 0, sending: true },
250      ], 'session-a')
251  
252      // Should appear at the END, not spliced into the middle at time=150
253      assert.deepEqual(merged.map((msg) => msg.text), [
254        'First', 'Reply', 'Second', 'Reply 2', 'queued early',
255      ])
256    })
257  
258    it('skips a sending queued turn once the persisted user message is already present', () => {
259      const merged = mergeQueuedTranscriptMessages([
260        { role: 'user', text: 'queued first', time: 20, runId: 'run-active' },
261        { role: 'assistant', text: 'Thinking...', time: 25, streaming: true, runId: 'run-active' },
262      ], [
263        { runId: 'run-active', sessionId: 'session-a', text: 'queued first', queuedAt: 20, position: 0, sending: true },
264      ], 'session-a')
265  
266      assert.deepEqual(merged.map((message) => [message.role, message.text, message.runId]), [
267        ['user', 'queued first', 'run-active'],
268        ['assistant', 'Thinking...', 'run-active'],
269      ])
270    })
271  })