/ src / lib / server / runtime / session-run-manager / queries.ts
queries.ts
  1  import type {
  2    RunEventRecord,
  3    SessionQueueSnapshot,
  4    SessionQueuedTurn,
  5    SessionRunRecord,
  6    SessionRunStatus,
  7  } from '@/types'
  8  import {
  9    listPersistedRunEvents,
 10    listPersistedRuns,
 11    loadPersistedRun,
 12  } from '@/lib/server/runtime/run-ledger'
 13  import { isInternalHeartbeatRun } from '@/lib/server/runtime/heartbeat-source'
 14  
 15  import { state } from './state'
 16  import type { SessionRunQueueEntry } from './types'
 17  
 18  export function getSessionRunState(sessionId: string): {
 19    runningRunId?: string
 20    queueLength: number
 21  } {
 22    const summary = getSessionExecutionState(sessionId)
 23    return {
 24      runningRunId: summary.runningRunId,
 25      queueLength: summary.queueLength,
 26    }
 27  }
 28  
 29  function visibleQueuedEntriesForSession(sessionId: string): SessionRunQueueEntry[] {
 30    return Array.from(state.queueByExecution.values())
 31      .flatMap((queue) => queue)
 32      .filter((entry) => entry.run.sessionId === sessionId && entry.run.internal !== true)
 33      .sort((left, right) => left.run.queuedAt - right.run.queuedAt)
 34  }
 35  
 36  function toQueuedTurn(entry: SessionRunQueueEntry, index: number): SessionQueuedTurn {
 37    return {
 38      runId: entry.run.id,
 39      sessionId: entry.run.sessionId,
 40      text: entry.message,
 41      queuedAt: entry.run.queuedAt,
 42      position: index + 1,
 43      imagePath: entry.imagePath,
 44      imageUrl: entry.imageUrl,
 45      attachedFiles: entry.attachedFiles,
 46      replyToId: entry.replyToId,
 47      source: entry.run.source,
 48    }
 49  }
 50  
 51  function toActiveTurn(entry: SessionRunQueueEntry): SessionQueuedTurn {
 52    return {
 53      ...toQueuedTurn(entry, 0),
 54      position: 0,
 55    }
 56  }
 57  
 58  function visibleActiveTurnForSession(sessionId: string): SessionQueuedTurn | null {
 59    const running = Array.from(state.runningByExecution.values())
 60      .find((entry) => entry.run.sessionId === sessionId && entry.run.status === 'running')
 61    if (!running || running.run.internal === true) return null
 62    return toActiveTurn(running)
 63  }
 64  
 65  export function getSessionQueueSnapshot(sessionId: string): SessionQueueSnapshot {
 66    const execution = getSessionExecutionState(sessionId)
 67    const visibleQueued = visibleQueuedEntriesForSession(sessionId)
 68    return {
 69      sessionId,
 70      activeRunId: execution.runningRunId || null,
 71      activeTurn: visibleActiveTurnForSession(sessionId),
 72      queueLength: visibleQueued.length,
 73      items: visibleQueued.map((entry, index) => toQueuedTurn(entry, index)),
 74    }
 75  }
 76  
 77  export function getSessionExecutionState(sessionId: string): {
 78    runningRunId?: string
 79    queueLength: number
 80    hasRunning: boolean
 81    hasQueued: boolean
 82    hasRunningHeartbeat: boolean
 83    hasQueuedHeartbeat: boolean
 84    hasRunningNonHeartbeat: boolean
 85    hasQueuedNonHeartbeat: boolean
 86  } {
 87    const running = Array.from(state.runningByExecution.values())
 88      .find((entry) => entry.run.sessionId === sessionId)
 89    const runningMatchesSession = Boolean(running)
 90    const runningHeartbeat = Boolean(
 91      runningMatchesSession
 92      && running
 93      && isInternalHeartbeatRun(running.run.internal, running.run.source),
 94    )
 95    const runningNonHeartbeat = Boolean(runningMatchesSession && !runningHeartbeat)
 96    const queuedEntries = Array.from(state.queueByExecution.values())
 97      .flatMap((queue) => queue)
 98      .filter((entry) => entry.run.sessionId === sessionId)
 99    const queuedHeartbeat = queuedEntries.filter((entry) =>
100      isInternalHeartbeatRun(entry.run.internal, entry.run.source),
101    ).length
102    const queuedNonHeartbeat = queuedEntries.length - queuedHeartbeat
103    return {
104      runningRunId: (runningMatchesSession && running?.run.status === 'running')
105        ? running.run.id
106        : undefined,
107      queueLength: queuedEntries.length,
108      hasRunning: Boolean(runningMatchesSession),
109      hasQueued: queuedEntries.length > 0,
110      hasRunningHeartbeat: runningHeartbeat,
111      hasQueuedHeartbeat: queuedHeartbeat > 0,
112      hasRunningNonHeartbeat: runningNonHeartbeat,
113      hasQueuedNonHeartbeat: queuedNonHeartbeat > 0,
114    }
115  }
116  
117  export function getRunById(runId: string): SessionRunRecord | null {
118    return state.runs.get(runId) || loadPersistedRun(runId)
119  }
120  
121  export function listRuns(params?: {
122    sessionId?: string
123    status?: SessionRunStatus
124    limit?: number
125  }): SessionRunRecord[] {
126    return listPersistedRuns(params)
127  }
128  
129  export function listRunEvents(runId: string, limit?: number): RunEventRecord[] {
130    return listPersistedRunEvents(runId, limit)
131  }