/ src / lib / chat / queued-message-queue.ts
queued-message-queue.ts
  1  import type { Message, SessionQueueSnapshot, SessionQueuedTurn } from '@/types'
  2  
  3  export interface QueuedSessionMessage extends SessionQueuedTurn {
  4    optimistic?: boolean
  5    /** Set when the server has consumed the message but the chat hasn't shown it yet */
  6    sending?: boolean
  7  }
  8  
  9  export interface QueueMessageDraft {
 10    text: string
 11    imagePath?: string
 12    imageUrl?: string
 13    attachedFiles?: string[]
 14    replyToId?: string
 15  }
 16  
 17  export function nextQueuedMessageId(now = Date.now(), random = Math.random): string {
 18    return `queued-${now}-${random().toString(36).slice(2, 8)}`
 19  }
 20  
 21  export function createOptimisticQueuedMessage(
 22    sessionId: string,
 23    draft: QueueMessageDraft,
 24    position: number,
 25  ): QueuedSessionMessage {
 26    return {
 27      runId: nextQueuedMessageId(),
 28      sessionId,
 29      text: draft.text,
 30      queuedAt: Date.now(),
 31      position,
 32      imagePath: draft.imagePath,
 33      imageUrl: draft.imageUrl,
 34      attachedFiles: draft.attachedFiles,
 35      replyToId: draft.replyToId,
 36      optimistic: true,
 37    }
 38  }
 39  
 40  export function snapshotToQueuedMessages(snapshot: SessionQueueSnapshot): QueuedSessionMessage[] {
 41    const activeRunId = typeof snapshot.activeRunId === 'string' && snapshot.activeRunId.trim()
 42      ? snapshot.activeRunId
 43      : null
 44    const nextItems: QueuedSessionMessage[] = []
 45    if (snapshot.activeTurn && activeRunId && snapshot.activeTurn.runId === activeRunId) {
 46      nextItems.push({
 47        ...snapshot.activeTurn,
 48        sending: true,
 49      })
 50    }
 51    const seenRunIds = new Set(nextItems.map((item) => item.runId))
 52    for (const item of snapshot.items) {
 53      if (seenRunIds.has(item.runId)) continue
 54      nextItems.push({ ...item })
 55      seenRunIds.add(item.runId)
 56    }
 57    return nextItems
 58  }
 59  
 60  interface ReplaceQueuedMessagesOptions {
 61    activeRunId?: string | null
 62  }
 63  
 64  export function replaceQueuedMessagesForSession(
 65    queue: QueuedSessionMessage[],
 66    sessionId: string,
 67    nextItems: QueuedSessionMessage[],
 68    options: ReplaceQueuedMessagesOptions = {},
 69  ): QueuedSessionMessage[] {
 70    const otherSessions = queue.filter((item) => item.sessionId !== sessionId)
 71    const previousForSession = queue.filter((item) => item.sessionId === sessionId && !item.sending)
 72    // Detect consumed messages: items in local state but not in server snapshot.
 73    // Keep only the run that actually became active visible as "sending" so it
 74    // doesn't vanish from the UI before the transcript refresh catches up.
 75    const nextRunIds = new Set(nextItems.map((item) => item.runId))
 76    const activeRunId = typeof options.activeRunId === 'string' && options.activeRunId.trim()
 77      ? options.activeRunId
 78      : null
 79    // Preserve existing "sending" items not covered by the new snapshot —
 80    // they'll be cleaned up later by setMessages or the timeout.
 81    const existingSending = queue.filter((item) =>
 82      item.sessionId === sessionId && item.sending && !nextRunIds.has(item.runId),
 83    )
 84    const consumed = previousForSession
 85      .filter((item) => !item.optimistic && !nextRunIds.has(item.runId) && activeRunId === item.runId)
 86      .map((item) => ({ ...item, sending: true }))
 87    return [
 88      ...otherSessions,
 89      ...existingSending,
 90      ...consumed,
 91      ...nextItems,
 92    ]
 93  }
 94  
 95  export function listQueuedMessagesForSession(
 96    queue: QueuedSessionMessage[],
 97    sessionId: string | null | undefined,
 98  ): QueuedSessionMessage[] {
 99    if (!sessionId) return []
100    return queue
101      .filter((item) => item.sessionId === sessionId)
102      .sort((left, right) => left.position - right.position || left.queuedAt - right.queuedAt)
103  }
104  
105  export function buildQueuedTranscriptMessages(
106    queue: QueuedSessionMessage[],
107    sessionId: string | null | undefined,
108  ): Message[] {
109    return listQueuedMessagesForSession(queue, sessionId)
110      .filter((item) => item.sending === true)
111      .map((item) => ({
112        role: 'user',
113        text: item.text,
114        time: item.queuedAt,
115        kind: 'chat',
116        clientRenderId: `queued:${item.runId}`,
117        imagePath: item.imagePath,
118        imageUrl: item.imageUrl,
119        attachedFiles: item.attachedFiles,
120        replyToId: item.replyToId,
121        runId: item.runId,
122      }))
123  }
124  
125  export function mergeQueuedTranscriptMessages(
126    messages: Message[],
127    queue: QueuedSessionMessage[],
128    sessionId: string | null | undefined,
129  ): Message[] {
130    const queuedTranscript = buildQueuedTranscriptMessages(queue, sessionId)
131    if (queuedTranscript.length === 0) return messages
132    const merged = [...messages]
133    for (const queuedMessage of queuedTranscript) {
134      const queuedRunId = typeof queuedMessage.runId === 'string' && queuedMessage.runId.trim()
135        ? queuedMessage.runId
136        : null
137      if (queuedRunId && merged.some((message) => message.role === 'user' && message.runId === queuedRunId)) {
138        continue
139      }
140      // Place queued user message before its corresponding assistant response
141      // (same runId), otherwise append after the last persisted message.
142      const sameRunAssistantIndex = queuedRunId
143        ? merged.findIndex((msg) => msg.role === 'assistant' && msg.runId === queuedRunId)
144        : -1
145      if (sameRunAssistantIndex >= 0) {
146        merged.splice(sameRunAssistantIndex, 0, queuedMessage)
147      } else {
148        const lastPersistedIndex = merged.findLastIndex(
149          (msg) => !msg.clientRenderId?.startsWith('queued:'),
150        )
151        const insertAt = lastPersistedIndex >= 0 ? lastPersistedIndex + 1 : merged.length
152        merged.splice(insertAt, 0, queuedMessage)
153      }
154    }
155    return merged
156  }
157  
158  export function removeQueuedMessageById(
159    queue: QueuedSessionMessage[],
160    id: string,
161  ): QueuedSessionMessage[] {
162    return queue.filter((item) => item.runId !== id)
163  }
164  
165  export function clearQueuedMessagesForSession(
166    queue: QueuedSessionMessage[],
167    sessionId: string | null | undefined,
168  ): QueuedSessionMessage[] {
169    if (!sessionId) return queue
170    return queue.filter((item) => item.sessionId !== sessionId)
171  }