/ src / utils / messageQueueManager.ts
messageQueueManager.ts
  1  import { feature } from 'bun:bundle'
  2  import type { ContentBlockParam } from '@anthropic-ai/sdk/resources/messages.mjs'
  3  import type { Permutations } from 'src/types/utils.js'
  4  import { getSessionId } from '../bootstrap/state.js'
  5  import type { AppState } from '../state/AppState.js'
  6  import type {
  7    QueueOperation,
  8    QueueOperationMessage,
  9  } from '../types/messageQueueTypes.js'
 10  import type {
 11    EditablePromptInputMode,
 12    PromptInputMode,
 13    QueuedCommand,
 14    QueuePriority,
 15  } from '../types/textInputTypes.js'
 16  import type { PastedContent } from './config.js'
 17  import { extractTextContent } from './messages.js'
 18  import { objectGroupBy } from './objectGroupBy.js'
 19  import { recordQueueOperation } from './sessionStorage.js'
 20  import { createSignal } from './signal.js'
 21  
 22  export type SetAppState = (f: (prev: AppState) => AppState) => void
 23  
 24  // ============================================================================
 25  // Logging helper
 26  // ============================================================================
 27  
 28  function logOperation(operation: QueueOperation, content?: string): void {
 29    const sessionId = getSessionId()
 30    const queueOp: QueueOperationMessage = {
 31      type: 'queue-operation',
 32      operation,
 33      timestamp: new Date().toISOString(),
 34      sessionId,
 35      ...(content !== undefined && { content }),
 36    }
 37    void recordQueueOperation(queueOp)
 38  }
 39  
 40  // ============================================================================
 41  // Unified command queue (module-level, independent of React state)
 42  //
 43  // All commands — user input, task notifications, orphaned permissions — go
 44  // through this single queue. React components subscribe via
 45  // useSyncExternalStore (subscribeToCommandQueue / getCommandQueueSnapshot).
 46  // Non-React code (print.ts streaming loop) reads directly via
 47  // getCommandQueue() / getCommandQueueLength().
 48  //
 49  // Priority determines dequeue order: 'now' > 'next' > 'later'.
 50  // Within the same priority, commands are processed FIFO.
 51  // ============================================================================
 52  
 53  const commandQueue: QueuedCommand[] = []
 54  /** Frozen snapshot — recreated on every mutation for useSyncExternalStore. */
 55  let snapshot: readonly QueuedCommand[] = Object.freeze([])
 56  const queueChanged = createSignal()
 57  
 58  function notifySubscribers(): void {
 59    snapshot = Object.freeze([...commandQueue])
 60    queueChanged.emit()
 61  }
 62  
 63  // ============================================================================
 64  // useSyncExternalStore interface
 65  // ============================================================================
 66  
 67  /**
 68   * Subscribe to command queue changes.
 69   * Compatible with React's useSyncExternalStore.
 70   */
 71  export const subscribeToCommandQueue = queueChanged.subscribe
 72  
 73  /**
 74   * Get current snapshot of the command queue.
 75   * Compatible with React's useSyncExternalStore.
 76   * Returns a frozen array that only changes reference on mutation.
 77   */
 78  export function getCommandQueueSnapshot(): readonly QueuedCommand[] {
 79    return snapshot
 80  }
 81  
 82  // ============================================================================
 83  // Read operations (for non-React code)
 84  // ============================================================================
 85  
 86  /**
 87   * Get a mutable copy of the current queue.
 88   * Use for one-off reads where you need the actual commands.
 89   */
 90  export function getCommandQueue(): QueuedCommand[] {
 91    return [...commandQueue]
 92  }
 93  
 94  /**
 95   * Get the current queue length without copying.
 96   */
 97  export function getCommandQueueLength(): number {
 98    return commandQueue.length
 99  }
100  
/**
 * Whether the queue currently holds at least one command.
 */
export function hasCommandsInQueue(): boolean {
  return commandQueue.length !== 0
}
107  
/**
 * Re-publish the queue to subscribers when it is non-empty.
 *
 * Call this after async processing finishes so useSyncExternalStore
 * consumers notice commands that are still waiting. Does nothing when the
 * queue is empty.
 */
export function recheckCommandQueue(): void {
  if (commandQueue.length === 0) {
    return
  }
  notifySubscribers()
}
118  
119  // ============================================================================
120  // Write operations
121  // ============================================================================
122  
/**
 * Append a command to the queue.
 *
 * Meant for user-initiated commands (prompt, bash, orphaned-permission).
 * A shallow copy of `command` is stored; when `command.priority` is unset
 * the copy gets priority 'next'. Subscribers are notified and an 'enqueue'
 * operation is logged, carrying the value only when it is a plain string.
 */
export function enqueue(command: QueuedCommand): void {
  const priority = command.priority ?? 'next'
  commandQueue.push({ ...command, priority })
  notifySubscribers()

  const loggedContent =
    typeof command.value === 'string' ? command.value : undefined
  logOperation('enqueue', loggedContent)
}
136  
/**
 * Append a task notification to the queue.
 *
 * Same as a regular enqueue except that an unset priority defaults to
 * 'later', so system messages rank behind user input. Subscribers are
 * notified and an 'enqueue' operation is logged, carrying the value only
 * when it is a plain string.
 */
export function enqueuePendingNotification(command: QueuedCommand): void {
  const priority = command.priority ?? 'later'
  commandQueue.push({ ...command, priority })
  notifySubscribers()

  const loggedContent =
    typeof command.value === 'string' ? command.value : undefined
  logOperation('enqueue', loggedContent)
}
150  
/**
 * Numeric rank for each queue priority. A lower rank is more urgent:
 * 'now' (0) comes before 'next' (1), which comes before 'later' (2).
 */
const PRIORITY_ORDER: Record<QueuePriority, number> = {
  now: 0,
  next: 1,
  later: 2,
}
156  
/**
 * Remove and return the most urgent command, or `undefined` when nothing
 * qualifies.
 *
 * Among commands of equal priority the earliest-queued one wins (FIFO).
 * Commands without a priority are ranked as 'next'.
 *
 * @param filter - Optional predicate restricting the candidates. Commands
 *   for which it returns `false` are skipped and stay in the queue. This
 *   lets between-turn drains (SDK, REPL) restrict themselves to main-thread
 *   commands (`cmd.agentId === undefined`).
 * @returns The removed command, or `undefined` if the queue is empty or no
 *   command passes the filter.
 */
export function dequeue(
  filter?: (cmd: QueuedCommand) => boolean,
): QueuedCommand | undefined {
  let winnerIndex = -1
  let winnerRank = Number.POSITIVE_INFINITY

  for (const [index, candidate] of commandQueue.entries()) {
    const eligible = !filter || filter(candidate)
    if (!eligible) continue

    const rank = PRIORITY_ORDER[candidate.priority ?? 'next']
    // Strict comparison keeps the first command seen at a given rank (FIFO).
    if (rank < winnerRank) {
      winnerIndex = index
      winnerRank = rank
    }
  }

  // Covers both an empty queue and a filter that rejected everything.
  if (winnerIndex < 0) {
    return undefined
  }

  const dequeued = commandQueue.splice(winnerIndex, 1)[0]
  notifySubscribers()
  logOperation('dequeue')
  return dequeued
}
194  
/**
 * Empty the queue and return everything it held, in queue order.
 *
 * Subscribers are notified once, and one 'dequeue' operation is logged per
 * returned command. An already-empty queue yields `[]` without notifying
 * or logging.
 */
export function dequeueAll(): QueuedCommand[] {
  // splice removes every element in place and hands them back as a new array.
  const drained = commandQueue.splice(0, commandQueue.length)
  if (drained.length === 0) {
    return []
  }

  notifySubscribers()
  drained.forEach(() => {
    logOperation('dequeue')
  })
  return drained
}
214  
/**
 * Look at the most urgent command without removing it.
 *
 * Among commands of equal priority the earliest-queued one is returned.
 * Commands without a priority are ranked as 'next'. The queue is not
 * modified and nothing is logged.
 *
 * @param filter - Optional predicate; only commands it accepts are
 *   considered.
 * @returns The selected command, or `undefined` if the queue is empty or no
 *   command passes the filter.
 */
export function peek(
  filter?: (cmd: QueuedCommand) => boolean,
): QueuedCommand | undefined {
  let winner: QueuedCommand | undefined
  let winnerRank = Number.POSITIVE_INFINITY

  for (const candidate of commandQueue) {
    const eligible = !filter || filter(candidate)
    if (!eligible) continue

    const rank = PRIORITY_ORDER[candidate.priority ?? 'next']
    // Strict comparison keeps the first command seen at a given rank (FIFO).
    if (rank < winnerRank) {
      winner = candidate
      winnerRank = rank
    }
  }

  return winner
}
239  
/**
 * Remove and return every command accepted by `predicate`.
 *
 * Both the returned commands and the commands left behind keep their
 * relative queue order. When nothing matches, the queue is left untouched
 * and no notification or log entry is produced. Otherwise subscribers are
 * notified once and one 'dequeue' operation is logged per removed command.
 *
 * @param predicate - Called once per queued command, front to back.
 * @returns The matching commands in queue order (possibly empty).
 */
export function dequeueAllMatching(
  predicate: (cmd: QueuedCommand) => boolean,
): QueuedCommand[] {
  const taken: QueuedCommand[] = []
  const kept: QueuedCommand[] = []
  for (const cmd of commandQueue) {
    const bucket = predicate(cmd) ? taken : kept
    bucket.push(cmd)
  }

  if (taken.length === 0) {
    return []
  }

  // Rewrite the queue in place so it holds only the non-matching commands.
  commandQueue.length = 0
  commandQueue.push(...kept)
  notifySubscribers()

  taken.forEach(() => {
    logOperation('dequeue')
  })
  return taken
}
267  
/**
 * Remove specific commands from the queue by reference identity.
 *
 * Callers must pass the same object references that are in the queue
 * (e.g. from getCommandsByMaxPriority); structurally equal copies are not
 * matched. Subscribers are notified only if at least one command was
 * actually removed. A 'remove' operation is logged for every entry of
 * `commandsToRemove`, whether or not it was found in the queue.
 *
 * @param commandsToRemove - Command references to drop. An empty array is
 *   a no-op (no notification, no logging).
 */
export function remove(commandsToRemove: QueuedCommand[]): void {
  if (commandsToRemove.length === 0) {
    return
  }

  // Set membership uses the same SameValueZero comparison as Array#includes
  // but avoids rescanning the removal list for every queued command.
  const doomed = new Set(commandsToRemove)

  // Compact the survivors towards the front in a single pass instead of
  // splicing once per removed element.
  let writeIdx = 0
  for (let readIdx = 0; readIdx < commandQueue.length; readIdx++) {
    const cmd = commandQueue[readIdx]!
    if (!doomed.has(cmd)) {
      commandQueue[writeIdx] = cmd
      writeIdx++
    }
  }

  if (writeIdx !== commandQueue.length) {
    // Truncate the leftover tail, then publish the new queue contents.
    commandQueue.length = writeIdx
    notifySubscribers()
  }

  for (const _cmd of commandsToRemove) {
    logOperation('remove')
  }
}
293  
/**
 * Remove every command accepted by `predicate` and return the removed ones.
 *
 * The queue is scanned from back to front, so the predicate sees the most
 * recently queued command first. The returned array is in queue order.
 * When something was removed, subscribers are notified once and one
 * 'remove' operation is logged per removed command.
 *
 * @param predicate - Returns `true` for commands that should be removed.
 * @returns The removed commands in queue order (possibly empty).
 */
export function removeByFilter(
  predicate: (cmd: QueuedCommand) => boolean,
): QueuedCommand[] {
  // Collected back-to-front during the scan, then flipped into queue order.
  const collected: QueuedCommand[] = []
  let idx = commandQueue.length
  while (idx-- > 0) {
    if (!predicate(commandQueue[idx]!)) continue
    collected.push(commandQueue.splice(idx, 1)[0]!)
  }
  const removed = collected.reverse()

  if (removed.length === 0) {
    return removed
  }

  notifySubscribers()
  removed.forEach(() => {
    logOperation('remove')
  })
  return removed
}
317  
/**
 * Discard every queued command.
 *
 * Used by ESC cancellation to drop queued notifications. Subscribers are
 * notified only when the queue actually held something; no queue operation
 * is logged.
 */
export function clearCommandQueue(): void {
  if (commandQueue.length > 0) {
    commandQueue.length = 0
    notifySubscribers()
  }
}
329  
/**
 * Empty the queue and install a fresh empty snapshot.
 *
 * Intended for test cleanup: unlike the other mutators it does not emit the
 * change signal and does not log anything.
 */
export function resetCommandQueue(): void {
  commandQueue.splice(0, commandQueue.length)
  snapshot = Object.freeze([])
}
338  
339  // ============================================================================
340  // Editable mode helpers
341  // ============================================================================
342  
/**
 * Prompt input modes that can never be edited.
 *
 * The `satisfies` clause checks the literal list against the set of modes
 * that are in `PromptInputMode` but not in `EditablePromptInputMode`.
 */
const NON_EDITABLE_MODES = new Set<PromptInputMode>([
  'task-notification',
] satisfies Permutations<Exclude<PromptInputMode, EditablePromptInputMode>>)
346  
/**
 * Type guard: `true` when `mode` is not one of the non-editable modes, in
 * which case it is narrowed to `EditablePromptInputMode`.
 */
export function isPromptInputModeEditable(
  mode: PromptInputMode,
): mode is EditablePromptInputMode {
  const isBlocked = NON_EDITABLE_MODES.has(mode)
  return !isBlocked
}
352  
/**
 * Whether this queued command may be pulled back into the input buffer via
 * UP/ESC.
 *
 * A command is editable only when its mode is editable and it is not
 * flagged `isMeta`. System-generated commands (proactive ticks, scheduled
 * tasks, plan verification, channel messages) contain raw XML and must not
 * leak into the user's input.
 */
export function isQueuedCommandEditable(cmd: QueuedCommand): boolean {
  if (!isPromptInputModeEditable(cmd.mode)) {
    return false
  }
  return !cmd.isMeta
}
362  
/**
 * Whether this queued command should be shown in the queue preview under
 * the prompt.
 *
 * Every editable command is visible. In addition, when either the 'KAIROS'
 * or the 'KAIROS_CHANNELS' feature is enabled, commands whose origin kind
 * is 'channel' are shown too, so the keyboard user sees what arrived even
 * though those commands stay non-editable (raw XML).
 */
export function isQueuedCommandVisible(cmd: QueuedCommand): boolean {
  // NOTE(review): the feature() calls are kept inline in the condition;
  // presumably the bundler evaluates them statically. Confirm before
  // hoisting them into a variable.
  if (
    (feature('KAIROS') || feature('KAIROS_CHANNELS')) &&
    cmd.origin?.kind === 'channel'
  ) {
    return true
  }
  return isQueuedCommandEditable(cmd)
}
376  
/**
 * Turn a queued command value into plain text.
 *
 * A string value is returned unchanged. For a content-block array the text
 * is obtained from `extractTextContent`, using a newline as separator.
 */
function extractTextFromValue(value: string | ContentBlockParam[]): string {
  if (typeof value === 'string') {
    return value
  }
  return extractTextContent(value, '\n')
}
385  
/**
 * Convert the base64 image blocks of a command value into `PastedContent`
 * entries.
 *
 * Only blocks of type 'image' whose source type is 'base64' are converted;
 * every other block is skipped. The n-th converted image (0-based) gets id
 * `startId + n` and filename `image<n + 1>`.
 *
 * @param value - Command value; a plain string never contains images.
 * @param startId - Id assigned to the first converted image.
 * @returns The converted images in block order, or `[]` if there are none.
 */
function extractImagesFromValue(
  value: string | ContentBlockParam[],
  startId: number,
): PastedContent[] {
  if (typeof value === 'string') {
    return []
  }

  const collected: PastedContent[] = []
  for (const block of value) {
    if (block.type === 'image' && block.source.type === 'base64') {
      // The number of images collected so far doubles as the running index.
      const ordinal = collected.length
      collected.push({
        id: startId + ordinal,
        type: 'image',
        content: block.source.data,
        mediaType: block.source.media_type,
        filename: `image${ordinal + 1}`,
      })
    }
  }
  return collected
}
414  
/** Result of pulling all editable queued commands back into the input. */
export type PopAllEditableResult = {
  /** Queued command texts followed by the current input, newline-joined. */
  text: string
  /** Cursor position within `text`. */
  cursorOffset: number
  /** Images carried by the popped commands. */
  images: PastedContent[]
}
420  
/**
 * Pop all editable commands and combine them with the current input for
 * editing.
 *
 * Non-editable commands (e.g. task-notification mode) are left in the queue
 * to be auto-processed later. One 'popAll' operation is logged per popped
 * command and subscribers are notified once.
 *
 * The combined text is the non-empty queued texts followed by the current
 * input (if non-empty), joined with newlines. The cursor offset is derived
 * from exactly those pieces: the length of the queued prefix, plus one for
 * the separator only when both the prefix and the current input are
 * non-empty, plus `currentCursorOffset`. The result is clamped so it never
 * points past the end of the combined text.
 *
 * @param currentInput - Text currently in the input buffer.
 * @param currentCursorOffset - Cursor position within `currentInput`.
 * @returns Combined text, cursor offset and images to restore, or
 *   `undefined` if the queue holds no editable commands.
 */
export function popAllEditable(
  currentInput: string,
  currentCursorOffset: number,
): PopAllEditableResult | undefined {
  if (commandQueue.length === 0) {
    return undefined
  }

  const { editable = [], nonEditable = [] } = objectGroupBy(
    [...commandQueue],
    cmd => (isQueuedCommandEditable(cmd) ? 'editable' : 'nonEditable'),
  )

  if (editable.length === 0) {
    return undefined
  }

  // Extract text from queued commands (handles both strings and
  // ContentBlockParam[]). Empty texts (e.g. image-only commands) are dropped
  // here so the text and the cursor offset are built from the same pieces.
  const queuedTexts = editable
    .map(cmd => extractTextFromValue(cmd.value))
    .filter(Boolean)
  const queuedPrefix = queuedTexts.join('\n')
  const newInput = [...queuedTexts, currentInput].filter(Boolean).join('\n')

  // A separating newline only exists when there is text on both sides of it.
  const separatorLength = queuedPrefix !== '' && currentInput !== '' ? 1 : 0
  const cursorOffset = Math.min(
    queuedPrefix.length + separatorLength + currentCursorOffset,
    newInput.length,
  )

  // Extract images from queued commands
  const images: PastedContent[] = []
  let nextImageId = Date.now() // Use timestamp as base for unique IDs
  for (const cmd of editable) {
    // handlePromptSubmit queues images in pastedContents (value is a string).
    // Preserve the original PastedContent id so imageStore lookups still work.
    if (cmd.pastedContents) {
      for (const content of Object.values(cmd.pastedContents)) {
        if (content.type === 'image') {
          images.push(content)
        }
      }
    }
    // Bridge/remote commands may embed images directly in ContentBlockParam[].
    const cmdImages = extractImagesFromValue(cmd.value, nextImageId)
    images.push(...cmdImages)
    nextImageId += cmdImages.length
  }

  for (const command of editable) {
    logOperation(
      'popAll',
      typeof command.value === 'string' ? command.value : undefined,
    )
  }

  // Replace queue contents with only the non-editable commands
  commandQueue.length = 0
  commandQueue.push(...nonEditable)
  notifySubscribers()

  return { text: newInput, cursorOffset, images }
}
485  
486  // ============================================================================
487  // Backward-compatible aliases (deprecated — prefer new names)
488  // ============================================================================
489  
/** @deprecated Use subscribeToCommandQueue */
export const subscribeToPendingNotifications = subscribeToCommandQueue

/**
 * Returns the current frozen queue snapshot.
 * @deprecated Use getCommandQueueSnapshot
 */
export function getPendingNotificationsSnapshot(): readonly QueuedCommand[] {
  return getCommandQueueSnapshot()
}

/** @deprecated Use hasCommandsInQueue */
export const hasPendingNotifications = hasCommandsInQueue

/** @deprecated Use getCommandQueueLength */
export const getPendingNotificationsCount = getCommandQueueLength

/** @deprecated Use recheckCommandQueue */
export const recheckPendingNotifications = recheckCommandQueue

/**
 * Dequeues the most urgent command without applying any filter.
 * @deprecated Use dequeue
 */
export function dequeuePendingNotification(): QueuedCommand | undefined {
  const next = dequeue()
  return next
}

/** @deprecated Use resetCommandQueue */
export const resetPendingNotifications = resetCommandQueue

/** @deprecated Use clearCommandQueue */
export const clearPendingNotifications = clearCommandQueue
517  
/**
 * List, without removing them, the commands whose priority is at least as
 * urgent as `maxPriority`.
 *
 * Urgency order: 'now' (0) > 'next' (1) > 'later' (2). Passing 'now'
 * therefore yields only now-priority commands, while 'later' yields the
 * whole queue. Commands without a priority are ranked as 'next'. Useful for
 * mid-chain draining where only urgent items should be processed.
 *
 * @returns A new array holding the same command references as the queue,
 *   in queue order.
 */
export function getCommandsByMaxPriority(
  maxPriority: QueuePriority,
): QueuedCommand[] {
  const limit = PRIORITY_ORDER[maxPriority]
  const selected: QueuedCommand[] = []
  for (const cmd of commandQueue) {
    const rank = PRIORITY_ORDER[cmd.priority ?? 'next']
    if (rank <= limit) {
      selected.push(cmd)
    }
  }
  return selected
}
533  
/**
 * Whether the command is a slash command that should be routed through
 * processSlashCommand instead of being sent to the model as text.
 *
 * That is the case when the value is a string which, after trimming
 * surrounding whitespace, starts with '/', and the command is not flagged
 * `skipSlashCommands`. Commands with that flag (e.g. bridge/CCR messages)
 * are never treated as slash commands: their text is meant for the model.
 */
export function isSlashCommand(cmd: QueuedCommand): boolean {
  const { value } = cmd
  if (typeof value !== 'string') {
    return false
  }
  if (!value.trim().startsWith('/')) {
    return false
  }
  return !cmd.skipSlashCommands
}