queries.ts
1 import type { 2 RunEventRecord, 3 SessionQueueSnapshot, 4 SessionQueuedTurn, 5 SessionRunRecord, 6 SessionRunStatus, 7 } from '@/types' 8 import { 9 listPersistedRunEvents, 10 listPersistedRuns, 11 loadPersistedRun, 12 } from '@/lib/server/runtime/run-ledger' 13 import { isInternalHeartbeatRun } from '@/lib/server/runtime/heartbeat-source' 14 15 import { state } from './state' 16 import type { SessionRunQueueEntry } from './types' 17 18 export function getSessionRunState(sessionId: string): { 19 runningRunId?: string 20 queueLength: number 21 } { 22 const summary = getSessionExecutionState(sessionId) 23 return { 24 runningRunId: summary.runningRunId, 25 queueLength: summary.queueLength, 26 } 27 } 28 29 function visibleQueuedEntriesForSession(sessionId: string): SessionRunQueueEntry[] { 30 return Array.from(state.queueByExecution.values()) 31 .flatMap((queue) => queue) 32 .filter((entry) => entry.run.sessionId === sessionId && entry.run.internal !== true) 33 .sort((left, right) => left.run.queuedAt - right.run.queuedAt) 34 } 35 36 function toQueuedTurn(entry: SessionRunQueueEntry, index: number): SessionQueuedTurn { 37 return { 38 runId: entry.run.id, 39 sessionId: entry.run.sessionId, 40 text: entry.message, 41 queuedAt: entry.run.queuedAt, 42 position: index + 1, 43 imagePath: entry.imagePath, 44 imageUrl: entry.imageUrl, 45 attachedFiles: entry.attachedFiles, 46 replyToId: entry.replyToId, 47 source: entry.run.source, 48 } 49 } 50 51 function toActiveTurn(entry: SessionRunQueueEntry): SessionQueuedTurn { 52 return { 53 ...toQueuedTurn(entry, 0), 54 position: 0, 55 } 56 } 57 58 function visibleActiveTurnForSession(sessionId: string): SessionQueuedTurn | null { 59 const running = Array.from(state.runningByExecution.values()) 60 .find((entry) => entry.run.sessionId === sessionId && entry.run.status === 'running') 61 if (!running || running.run.internal === true) return null 62 return toActiveTurn(running) 63 } 64 65 export function getSessionQueueSnapshot(sessionId: string): SessionQueueSnapshot { 66 const execution = getSessionExecutionState(sessionId) 67 const visibleQueued = visibleQueuedEntriesForSession(sessionId) 68 return { 69 sessionId, 70 activeRunId: execution.runningRunId || null, 71 activeTurn: visibleActiveTurnForSession(sessionId), 72 queueLength: visibleQueued.length, 73 items: visibleQueued.map((entry, index) => toQueuedTurn(entry, index)), 74 } 75 } 76 77 export function getSessionExecutionState(sessionId: string): { 78 runningRunId?: string 79 queueLength: number 80 hasRunning: boolean 81 hasQueued: boolean 82 hasRunningHeartbeat: boolean 83 hasQueuedHeartbeat: boolean 84 hasRunningNonHeartbeat: boolean 85 hasQueuedNonHeartbeat: boolean 86 } { 87 const running = Array.from(state.runningByExecution.values()) 88 .find((entry) => entry.run.sessionId === sessionId) 89 const runningMatchesSession = Boolean(running) 90 const runningHeartbeat = Boolean( 91 runningMatchesSession 92 && running 93 && isInternalHeartbeatRun(running.run.internal, running.run.source), 94 ) 95 const runningNonHeartbeat = Boolean(runningMatchesSession && !runningHeartbeat) 96 const queuedEntries = Array.from(state.queueByExecution.values()) 97 .flatMap((queue) => queue) 98 .filter((entry) => entry.run.sessionId === sessionId) 99 const queuedHeartbeat = queuedEntries.filter((entry) => 100 isInternalHeartbeatRun(entry.run.internal, entry.run.source), 101 ).length 102 const queuedNonHeartbeat = queuedEntries.length - queuedHeartbeat 103 return { 104 runningRunId: (runningMatchesSession && running?.run.status === 'running') 105 ? running.run.id 106 : undefined, 107 queueLength: queuedEntries.length, 108 hasRunning: Boolean(runningMatchesSession), 109 hasQueued: queuedEntries.length > 0, 110 hasRunningHeartbeat: runningHeartbeat, 111 hasQueuedHeartbeat: queuedHeartbeat > 0, 112 hasRunningNonHeartbeat: runningNonHeartbeat, 113 hasQueuedNonHeartbeat: queuedNonHeartbeat > 0, 114 } 115 } 116 117 export function getRunById(runId: string): SessionRunRecord | null { 118 return state.runs.get(runId) || loadPersistedRun(runId) 119 } 120 121 export function listRuns(params?: { 122 sessionId?: string 123 status?: SessionRunStatus 124 limit?: number 125 }): SessionRunRecord[] { 126 return listPersistedRuns(params) 127 } 128 129 export function listRunEvents(runId: string, limit?: number): RunEventRecord[] { 130 return listPersistedRunEvents(runId, limit) 131 }