queued-message-queue.ts
import type { Message, SessionQueueSnapshot, SessionQueuedTurn } from '@/types'

/** Prefix of the `clientRenderId` given to transcript messages synthesized from queue entries. */
const QUEUED_RENDER_ID_PREFIX = 'queued:'

/** A queued turn as held in client state, extended with local-only display flags. */
export interface QueuedSessionMessage extends SessionQueuedTurn {
  /** Set to `true` on entries built by `createOptimisticQueuedMessage`. */
  optimistic?: boolean
  /** Set when the server has consumed the message but the chat hasn't shown it yet */
  sending?: boolean
}

/** User-authored content from which an optimistic queue entry is built. */
export interface QueueMessageDraft {
  text: string
  imagePath?: string
  imageUrl?: string
  attachedFiles?: string[]
  replyToId?: string
}

/**
 * Returns `value` unchanged when it is a string that is non-empty after
 * trimming, otherwise `null`. Note that the returned string is NOT trimmed.
 */
function normalizeRunId(value: unknown): string | null {
  return typeof value === 'string' && value.trim() ? value : null
}

/** True when `message` carries a `clientRenderId` that starts with the queued prefix. */
function isQueuedPlaceholder(message: Message | undefined): boolean {
  return message?.clientRenderId?.startsWith(QUEUED_RENDER_ID_PREFIX) === true
}

/**
 * Builds an id of the form `queued-<now>-<suffix>`, where the suffix is up to
 * six base-36 digits taken from the fractional part of `random()`.
 *
 * @param now - Timestamp embedded in the id; defaults to `Date.now()`.
 * @param random - Source of a number in `[0, 1)`; defaults to `Math.random`.
 */
export function nextQueuedMessageId(now = Date.now(), random = Math.random): string {
  return `queued-${now}-${random().toString(36).slice(2, 8)}`
}

/**
 * Creates a local queue entry for `draft`, flagged `optimistic: true`, with a
 * freshly generated `runId` and `queuedAt` set to the current time.
 *
 * @param sessionId - Session the entry belongs to.
 * @param draft - Content copied onto the entry.
 * @param position - Ordering position stored on the entry.
 */
export function createOptimisticQueuedMessage(
  sessionId: string,
  draft: QueueMessageDraft,
  position: number,
): QueuedSessionMessage {
  return {
    runId: nextQueuedMessageId(),
    sessionId,
    text: draft.text,
    queuedAt: Date.now(),
    position,
    imagePath: draft.imagePath,
    imageUrl: draft.imageUrl,
    attachedFiles: draft.attachedFiles,
    replyToId: draft.replyToId,
    optimistic: true,
  }
}

/**
 * Flattens a queue snapshot into a list of queue entries.
 *
 * When the snapshot has an `activeTurn` whose `runId` equals the snapshot's
 * (non-blank) `activeRunId`, that turn comes first and is flagged
 * `sending: true`. The snapshot's `items` follow as shallow copies, skipping
 * any `runId` that has already been emitted.
 */
export function snapshotToQueuedMessages(snapshot: SessionQueueSnapshot): QueuedSessionMessage[] {
  const activeRunId = normalizeRunId(snapshot.activeRunId)
  const nextItems: QueuedSessionMessage[] = []
  if (snapshot.activeTurn && activeRunId && snapshot.activeTurn.runId === activeRunId) {
    nextItems.push({
      ...snapshot.activeTurn,
      sending: true,
    })
  }
  const seenRunIds = new Set(nextItems.map((item) => item.runId))
  for (const item of snapshot.items) {
    if (seenRunIds.has(item.runId)) continue
    nextItems.push({ ...item })
    seenRunIds.add(item.runId)
  }
  return nextItems
}

interface ReplaceQueuedMessagesOptions {
  /** Run id reported as active; blank or non-string values are treated as absent. */
  activeRunId?: string | null
}

/**
 * Replaces one session's entries in `queue` with `nextItems` and returns a new
 * array; `queue` itself is not mutated.
 *
 * The result is, in order:
 * 1. every entry belonging to other sessions, untouched;
 * 2. this session's existing `sending` entries whose `runId` is absent from `nextItems`;
 * 3. this session's non-sending, non-optimistic entry that is absent from
 *    `nextItems` and whose `runId` equals `options.activeRunId`, re-flagged
 *    as `sending: true`;
 * 4. `nextItems`.
 *
 * All other previous entries of the session are dropped. This includes
 * optimistic entries and any missing entry that is not the active run.
 */
export function replaceQueuedMessagesForSession(
  queue: QueuedSessionMessage[],
  sessionId: string,
  nextItems: QueuedSessionMessage[],
  options: ReplaceQueuedMessagesOptions = {},
): QueuedSessionMessage[] {
  const otherSessions = queue.filter((item) => item.sessionId !== sessionId)
  const previousForSession = queue.filter((item) => item.sessionId === sessionId && !item.sending)
  // Detect consumed messages: items in local state but not in server snapshot.
  // Keep only the run that actually became active visible as "sending" so it
  // doesn't vanish from the UI before the transcript refresh catches up.
  const nextRunIds = new Set(nextItems.map((item) => item.runId))
  const activeRunId = normalizeRunId(options.activeRunId)
  // Preserve existing "sending" items not covered by the new snapshot.
  // Nothing in this module removes them; cleanup is expected to happen
  // elsewhere (the original note mentions setMessages or a timeout).
  const existingSending = queue.filter(
    (item) => item.sessionId === sessionId && item.sending && !nextRunIds.has(item.runId),
  )
  const consumed = previousForSession
    .filter((item) => !item.optimistic && !nextRunIds.has(item.runId) && activeRunId === item.runId)
    .map((item) => ({ ...item, sending: true }))
  return [...otherSessions, ...existingSending, ...consumed, ...nextItems]
}

/**
 * Returns the entries of `queue` that belong to `sessionId`, ordered by
 * `position` and then by `queuedAt`. Returns an empty array when `sessionId`
 * is empty, `null` or `undefined`. The input array is not mutated.
 */
export function listQueuedMessagesForSession(
  queue: QueuedSessionMessage[],
  sessionId: string | null | undefined,
): QueuedSessionMessage[] {
  if (!sessionId) return []
  return queue
    .filter((item) => item.sessionId === sessionId) // filter() copies, so sort() below is safe
    .sort((left, right) => left.position - right.position || left.queuedAt - right.queuedAt)
}

/**
 * Converts the session's `sending` queue entries into user transcript
 * messages, in queue order. Each message gets the `clientRenderId`
 * `queued:<runId>`; entries that are not `sending` are ignored.
 */
export function buildQueuedTranscriptMessages(
  queue: QueuedSessionMessage[],
  sessionId: string | null | undefined,
): Message[] {
  return listQueuedMessagesForSession(queue, sessionId)
    .filter((item) => item.sending === true)
    .map(
      (item): Message => ({
        role: 'user',
        text: item.text,
        time: item.queuedAt,
        kind: 'chat',
        clientRenderId: `${QUEUED_RENDER_ID_PREFIX}${item.runId}`,
        imagePath: item.imagePath,
        imageUrl: item.imageUrl,
        attachedFiles: item.attachedFiles,
        replyToId: item.replyToId,
        runId: item.runId,
      }),
    )
}

/**
 * Merges the session's `sending` queue entries into a transcript.
 *
 * For each queued message, in queue order:
 * - it is skipped when the transcript already holds a user message with the same `runId`;
 * - otherwise it is inserted directly before the first assistant message with the same `runId`;
 * - otherwise it is inserted after the last message that is not a `queued:`
 *   placeholder, or at the end when there is no such message.
 *
 * Messages placed by the last rule keep their queue order relative to each other.
 *
 * @returns `messages` itself when nothing is `sending` for the session,
 *   otherwise a new array. `messages` is never mutated.
 */
export function mergeQueuedTranscriptMessages(
  messages: Message[],
  queue: QueuedSessionMessage[],
  sessionId: string | null | undefined,
): Message[] {
  const queuedTranscript = buildQueuedTranscriptMessages(queue, sessionId)
  if (queuedTranscript.length === 0) return messages

  const merged = [...messages]
  // Messages positioned by the fallback rule during this call, tracked by
  // identity so later fallback inserts can be placed after them.
  const placedByFallback = new Set<Message>()

  for (const queuedMessage of queuedTranscript) {
    const queuedRunId = normalizeRunId(queuedMessage.runId)
    if (
      queuedRunId &&
      merged.some((message) => message.role === 'user' && message.runId === queuedRunId)
    ) {
      continue
    }

    // Place queued user message before its corresponding assistant response
    // (same runId), otherwise append after the last persisted message.
    const sameRunAssistantIndex = queuedRunId
      ? merged.findIndex((msg) => msg.role === 'assistant' && msg.runId === queuedRunId)
      : -1
    if (sameRunAssistantIndex >= 0) {
      merged.splice(sameRunAssistantIndex, 0, queuedMessage)
      continue
    }

    // Default to the end, which is where the message goes when every entry
    // is a queued placeholder (or the transcript is empty).
    let insertAt = merged.length
    for (let index = merged.length - 1; index >= 0; index -= 1) {
      if (!isQueuedPlaceholder(merged[index])) {
        insertAt = index + 1
        break
      }
    }
    // Step over messages this loop already placed at the same spot. Without
    // this, every fallback insert would land at the same index and a later
    // queued message would end up in front of an earlier one.
    while (insertAt < merged.length) {
      const occupant = merged[insertAt]
      if (occupant === undefined || !placedByFallback.has(occupant)) break
      insertAt += 1
    }
    merged.splice(insertAt, 0, queuedMessage)
    placedByFallback.add(queuedMessage)
  }
  return merged
}

/** Returns a copy of `queue` without the entries whose `runId` equals `id`. */
export function removeQueuedMessageById(
  queue: QueuedSessionMessage[],
  id: string,
): QueuedSessionMessage[] {
  return queue.filter((item) => item.runId !== id)
}

/**
 * Returns a copy of `queue` without the entries belonging to `sessionId`.
 * When `sessionId` is empty, `null` or `undefined`, the same `queue` array is
 * returned unchanged.
 */
export function clearQueuedMessagesForSession(
  queue: QueuedSessionMessage[],
  sessionId: string | null | undefined,
): QueuedSessionMessage[] {
  if (!sessionId) return queue
  return queue.filter((item) => item.sessionId !== sessionId)
}