// queued-message-queue.test.ts
import assert from 'node:assert/strict'
import { describe, it } from 'node:test'
import {
  buildQueuedTranscriptMessages,
  createOptimisticQueuedMessage,
  clearQueuedMessagesForSession,
  listQueuedMessagesForSession,
  mergeQueuedTranscriptMessages,
  removeQueuedMessageById,
  replaceQueuedMessagesForSession,
  snapshotToQueuedMessages,
  type QueuedSessionMessage,
} from '@/lib/chat/queued-message-queue'

/**
 * Unit tests for the queued-message-queue helpers.
 *
 * The suite exercises listing, replacing, removing and clearing queued
 * messages per session, converting server queue snapshots into local queue
 * rows, and projecting "sending" rows into the chat transcript.
 */
describe('queued-message-queue', () => {
  // Shared fixture: two rows for session-a (q1, q3) interleaved with one row
  // for session-b (q2). Tests only read from it and assert on returned values.
  const queue: QueuedSessionMessage[] = [
    { runId: 'q1', sessionId: 'session-a', text: 'first a', queuedAt: 1, position: 1 },
    { runId: 'q2', sessionId: 'session-b', text: 'first b', queuedAt: 2, position: 1 },
    { runId: 'q3', sessionId: 'session-a', text: 'second a', queuedAt: 3, position: 2 },
  ]

  it('lists queued messages for a single session', () => {
    assert.deepEqual(
      listQueuedMessagesForSession(queue, 'session-a').map((item) => item.runId),
      ['q1', 'q3'],
    )
  })

  it('replaces queued items only for the requested session', () => {
    const replaced = replaceQueuedMessagesForSession(queue, 'session-a', [
      { runId: 'q4', sessionId: 'session-a', text: 'replacement', queuedAt: 4, position: 1 },
    ], { activeRunId: null })
    assert.deepEqual(
      listQueuedMessagesForSession(replaced, 'session-a').map((item) => item.runId),
      ['q4'],
    )
    // Rows belonging to the other session must be left untouched.
    assert.deepEqual(
      listQueuedMessagesForSession(replaced, 'session-b').map((item) => item.runId),
      ['q2'],
    )
  })

  it('keeps only the newly active run as a sending placeholder when it disappears from the queue snapshot', () => {
    // q1 is absent from the new items but is named as the active run.
    const replaced = replaceQueuedMessagesForSession(queue, 'session-a', [
      { runId: 'q3', sessionId: 'session-a', text: 'second a', queuedAt: 3, position: 1 },
    ], { activeRunId: 'q1' })

    assert.deepEqual(
      listQueuedMessagesForSession(replaced, 'session-a').map((item) => [item.runId, item.sending === true]),
      [['q1', true], ['q3', false]],
    )
  })

  it('drops missing stale queue rows that are not the active run', () => {
    // q1 is absent from the new items and the active run is an unrelated id.
    const replaced = replaceQueuedMessagesForSession(queue, 'session-a', [
      { runId: 'q3', sessionId: 'session-a', text: 'second a', queuedAt: 3, position: 1 },
    ], { activeRunId: 'run-other' })

    assert.deepEqual(
      listQueuedMessagesForSession(replaced, 'session-a').map((item) => item.runId),
      ['q3'],
    )
  })

  it('removes queued items by stable id', () => {
    assert.deepEqual(removeQueuedMessageById(queue, 'q2').map((item) => item.runId), ['q1', 'q3'])
  })

  it('clears queued items only for the given session', () => {
    assert.deepEqual(
      clearQueuedMessagesForSession(queue, 'session-a').map((item) => item.runId),
      ['q2'],
    )
  })

  it('creates optimistic queued items with the expected shape', () => {
    const optimistic = createOptimisticQueuedMessage('session-a', { text: 'queued later' }, 3)
    assert.equal(optimistic.sessionId, 'session-a')
    assert.equal(optimistic.position, 3)
    assert.equal(optimistic.optimistic, true)
  })

  it('converts queue snapshots into local queued messages', () => {
    const queued = snapshotToQueuedMessages({
      sessionId: 'session-a',
      activeRunId: 'run-active',
      activeTurn: {
        runId: 'run-active',
        sessionId: 'session-a',
        text: 'sending now',
        queuedAt: 4,
        position: 0,
      },
      queueLength: 1,
      items: [
        { runId: 'run-queued', sessionId: 'session-a', text: 'queued', queuedAt: 5, position: 1 },
      ],
    })
    // The active turn comes first and is flagged as sending; queued items are not.
    assert.deepEqual(
      queued.map((item) => [item.runId, item.sending === true]),
      [['run-active', true], ['run-queued', false]],
    )
  })

  it('preserves attachment and reply metadata from queue snapshots', () => {
    const queued = snapshotToQueuedMessages({
      sessionId: 'session-a',
      activeRunId: null,
      queueLength: 1,
      items: [
        {
          runId: 'run-queued-meta',
          sessionId: 'session-a',
          text: 'queued with files',
          queuedAt: 7,
          position: 1,
          imagePath: '/tmp/image.png',
          imageUrl: '/api/uploads/image.png',
          attachedFiles: ['/tmp/notes.txt', '/tmp/spec.md'],
          replyToId: 'msg-4',
        },
      ],
    })

    assert.deepEqual(queued[0], {
      runId: 'run-queued-meta',
      sessionId: 'session-a',
      text: 'queued with files',
      queuedAt: 7,
      position: 1,
      imagePath: '/tmp/image.png',
      imageUrl: '/api/uploads/image.png',
      attachedFiles: ['/tmp/notes.txt', '/tmp/spec.md'],
      replyToId: 'msg-4',
    })
  })

  it('deduplicates an active turn when the snapshot also contains it in the queued items', () => {
    // The same runId appears both as activeTurn and inside items.
    const queued = snapshotToQueuedMessages({
      sessionId: 'session-a',
      activeRunId: 'run-active',
      activeTurn: {
        runId: 'run-active',
        sessionId: 'session-a',
        text: 'already running',
        queuedAt: 6,
        position: 0,
      },
      queueLength: 1,
      items: [
        { runId: 'run-active', sessionId: 'session-a', text: 'already running', queuedAt: 6, position: 1 },
      ],
    })

    assert.deepEqual(queued.map((item) => item.runId), ['run-active'])
    assert.equal(queued[0]?.sending, true)
  })

  it('sorts queued messages by position and queued time within a session', () => {
    const unsorted: QueuedSessionMessage[] = [
      { runId: 'q4', sessionId: 'session-a', text: 'later', queuedAt: 9, position: 2 },
      { runId: 'q5', sessionId: 'session-a', text: 'earlier same pos', queuedAt: 4, position: 1 },
      { runId: 'q6', sessionId: 'session-a', text: 'later same pos', queuedAt: 8, position: 1 },
    ]

    // q5 and q6 share position 1, so queuedAt breaks the tie; q4 (position 2) is last.
    assert.deepEqual(
      listQueuedMessagesForSession(unsorted, 'session-a').map((item) => item.runId),
      ['q5', 'q6', 'q4'],
    )
  })

  it('builds transcript-ready user messages from sending queued turns', () => {
    // Only the sending row for session-a is expected in the output: q2 is not
    // sending and q3 belongs to another session.
    const transcript = buildQueuedTranscriptMessages([
      { runId: 'q1', sessionId: 'session-a', text: 'sending row', queuedAt: 20, position: 0, sending: true },
      { runId: 'q2', sessionId: 'session-a', text: 'pending row', queuedAt: 21, position: 1 },
      { runId: 'q3', sessionId: 'session-b', text: 'other session', queuedAt: 22, position: 0, sending: true },
    ], 'session-a')

    assert.deepEqual(transcript, [
      {
        role: 'user',
        text: 'sending row',
        time: 20,
        kind: 'chat',
        clientRenderId: 'queued:q1',
        imagePath: undefined,
        imageUrl: undefined,
        attachedFiles: undefined,
        replyToId: undefined,
        runId: 'q1',
      },
    ])
  })

  it('merges sending queued turns into the transcript ahead of later assistant output', () => {
    const merged = mergeQueuedTranscriptMessages([
      { role: 'assistant', text: 'Thinking...', time: 25, streaming: true, runId: 'run-active' },
    ], [
      { runId: 'run-active', sessionId: 'session-a', text: 'queued first', queuedAt: 20, position: 0, sending: true },
    ], 'session-a')

    assert.deepEqual(merged.map((message) => [message.role, message.text, message.runId]), [
      ['user', 'queued first', 'run-active'],
      ['assistant', 'Thinking...', 'run-active'],
    ])
  })

  it('preserves existing sending items when replacing queue for a session', () => {
    const queueWithSending: QueuedSessionMessage[] = [
      { runId: 'sending-1', sessionId: 'session-a', text: 'already sending', queuedAt: 1, position: 0, sending: true },
      { runId: 'q3', sessionId: 'session-a', text: 'queued', queuedAt: 2, position: 1 },
      { runId: 'q2', sessionId: 'session-b', text: 'other', queuedAt: 3, position: 1 },
    ]
    const replaced = replaceQueuedMessagesForSession(queueWithSending, 'session-a', [
      { runId: 'q4', sessionId: 'session-a', text: 'new queued', queuedAt: 4, position: 1 },
    ], { activeRunId: null })

    const forSession = listQueuedMessagesForSession(replaced, 'session-a')
    assert.deepEqual(
      forSession.map((item) => [item.runId, item.sending === true]),
      [['sending-1', true], ['q4', false]],
    )
  })

  it('deduplicates sending items that appear in nextItems', () => {
    const queueWithSending: QueuedSessionMessage[] = [
      { runId: 'run-active', sessionId: 'session-a', text: 'sending', queuedAt: 1, position: 0, sending: true },
    ]
    const replaced = replaceQueuedMessagesForSession(queueWithSending, 'session-a', [
      { runId: 'run-active', sessionId: 'session-a', text: 'sending', queuedAt: 1, position: 0, sending: true },
      { runId: 'q5', sessionId: 'session-a', text: 'next', queuedAt: 2, position: 1 },
    ], { activeRunId: 'run-active' })

    const forSession = listQueuedMessagesForSession(replaced, 'session-a')
    assert.deepEqual(
      forSession.map((item) => item.runId),
      ['run-active', 'q5'],
    )
  })

  it('inserts sending messages after last persisted message, not by timestamp', () => {
    const merged = mergeQueuedTranscriptMessages([
      { role: 'user', text: 'First', time: 100 },
      { role: 'assistant', text: 'Reply', time: 200 },
      { role: 'user', text: 'Second', time: 300 },
      { role: 'assistant', text: 'Reply 2', time: 400 },
    ], [
      // queuedAt is earlier than the last persisted message
      { runId: 'run-late', sessionId: 'session-a', text: 'queued early', queuedAt: 150, position: 0, sending: true },
    ], 'session-a')

    // Should appear at the END, not spliced into the middle at time=150
    assert.deepEqual(merged.map((msg) => msg.text), [
      'First', 'Reply', 'Second', 'Reply 2', 'queued early',
    ])
  })

  it('skips a sending queued turn once the persisted user message is already present', () => {
    // The transcript already holds a user message with the same runId as the sending row.
    const merged = mergeQueuedTranscriptMessages([
      { role: 'user', text: 'queued first', time: 20, runId: 'run-active' },
      { role: 'assistant', text: 'Thinking...', time: 25, streaming: true, runId: 'run-active' },
    ], [
      { runId: 'run-active', sessionId: 'session-a', text: 'queued first', queuedAt: 20, position: 0, sending: true },
    ], 'session-a')

    assert.deepEqual(merged.map((message) => [message.role, message.text, message.runId]), [
      ['user', 'queued first', 'run-active'],
      ['assistant', 'Thinking...', 'run-active'],
    ])
  })
})