/ src / server / storage / agent-runtime / workflows.ts
workflows.ts
  1  import { randomUUID } from 'node:crypto'
  2  import type {
  3    WorkflowInputRequest,
  4    WorkflowRespondRequestPayload,
  5    WorkflowStateSummary,
  6    WorkflowStatus,
  7  } from '@/lib/shared/chat'
  8  import { getDb } from '@/server/storage/db'
  9  
 10  interface WorkflowRunRow {
 11    id: string
 12    task_id: string
 13    command_id: string
 14    status: WorkflowStatus
 15    awaiting_input: number
 16    resume_token: string | null
 17    updated_at: string
 18  }
 19  
 20  interface WorkflowInputRow {
 21    id: string
 22    workflow_id: string
 23    resume_token: string
 24    prompt: string
 25    fields_json: string
 26    choices_json: string | null
 27  }
 28  
 29  export interface UpsertWorkflowRunInput {
 30    id: string
 31    taskId: string
 32    sessionId: string
 33    commandId: string
 34    status: WorkflowStatus
 35    awaitingInput: boolean
 36    resumeToken?: string | null
 37    state?: Record<string, unknown>
 38    error?: string | null
 39    createdAt?: string
 40    updatedAt?: string
 41    completedAt?: string | null
 42  }
 43  
 44  export function upsertWorkflowRun(
 45    input: UpsertWorkflowRunInput,
 46  ): WorkflowStateSummary {
 47    const db = getDb()
 48    const now = new Date().toISOString()
 49    db.prepare(
 50      `INSERT INTO workflow_runs (
 51        id, task_id, session_id, command_id, status, awaiting_input, resume_token,
 52        state_json, error, created_at, updated_at, completed_at
 53      ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
 54      ON CONFLICT(id) DO UPDATE SET
 55        task_id = excluded.task_id,
 56        session_id = excluded.session_id,
 57        command_id = excluded.command_id,
 58        status = excluded.status,
 59        awaiting_input = excluded.awaiting_input,
 60        resume_token = excluded.resume_token,
 61        state_json = excluded.state_json,
 62        error = excluded.error,
 63        updated_at = excluded.updated_at,
 64        completed_at = excluded.completed_at`,
 65    ).run(
 66      input.id,
 67      input.taskId,
 68      input.sessionId,
 69      input.commandId,
 70      input.status,
 71      input.awaitingInput ? 1 : 0,
 72      normalizeNullableString(input.resumeToken),
 73      JSON.stringify(input.state ?? {}),
 74      normalizeNullableString(input.error),
 75      input.createdAt ?? now,
 76      input.updatedAt ?? now,
 77      normalizeNullableString(input.completedAt),
 78    )
 79    return getWorkflowRunOrThrow(input.id)
 80  }
 81  
 82  export function getWorkflowRun(workflowId: string): WorkflowStateSummary | null {
 83    const db = getDb()
 84    const row = db
 85      .prepare(
 86        `SELECT id, task_id, command_id, status, awaiting_input, resume_token, updated_at
 87         FROM workflow_runs
 88         WHERE id = ?`,
 89      )
 90      .get(workflowId) as WorkflowRunRow | undefined
 91    return row ? rowToSummary(row) : null
 92  }
 93  
 94  export function recordWorkflowInput(input: {
 95    workflowId: string
 96    taskId: string
 97    commandId: string
 98    prompt: string
 99    fields: WorkflowInputRequest['fields']
100    choices?: WorkflowInputRequest['choices']
101    resumeToken?: string
102  }): WorkflowInputRequest {
103    const resumeToken = input.resumeToken?.trim() || randomUUID()
104    const now = new Date().toISOString()
105    const db = getDb()
106    const id = randomUUID()
107  
108    db.prepare(
109      `INSERT INTO workflow_inputs (
110        id, workflow_id, resume_token, prompt, fields_json, choices_json, created_at
111      ) VALUES (?, ?, ?, ?, ?, ?, ?)`,
112    ).run(
113      id,
114      input.workflowId,
115      resumeToken,
116      input.prompt.trim() || 'Input required',
117      JSON.stringify(input.fields),
118      input.choices ? JSON.stringify(input.choices) : null,
119      now,
120    )
121  
122    upsertWorkflowRun({
123      id: input.workflowId,
124      taskId: input.taskId,
125      sessionId: getSessionIdByTaskId(input.taskId),
126      commandId: input.commandId,
127      status: 'waiting_input',
128      awaitingInput: true,
129      resumeToken,
130      updatedAt: now,
131    })
132  
133    return {
134      workflowId: input.workflowId,
135      taskId: input.taskId,
136      prompt: input.prompt.trim() || 'Input required',
137      fields: input.fields,
138      choices: input.choices,
139      resumeToken,
140    }
141  }
142  
143  export function getPendingWorkflowInput(input: {
144    workflowId: string
145    resumeToken: string
146  }): WorkflowInputRequest | null {
147    const db = getDb()
148    const row = db
149      .prepare(
150        `SELECT id, workflow_id, resume_token, prompt, fields_json, choices_json
151         FROM workflow_inputs
152         WHERE workflow_id = ?
153         AND resume_token = ?
154         AND resolved_at IS NULL
155         ORDER BY created_at DESC
156         LIMIT 1`,
157      )
158      .get(input.workflowId, input.resumeToken) as WorkflowInputRow | undefined
159    if (!row) return null
160  
161    const workflow = getWorkflowRun(row.workflow_id)
162    if (!workflow) return null
163  
164    return {
165      workflowId: row.workflow_id,
166      taskId: workflow.taskId,
167      prompt: row.prompt,
168      fields: parseWorkflowFields(row.fields_json),
169      choices: parseWorkflowChoices(row.choices_json),
170      resumeToken: row.resume_token,
171    }
172  }
173  
174  export function resolveWorkflowInput(
175    input: WorkflowRespondRequestPayload,
176  ): WorkflowStateSummary | null {
177    const db = getDb()
178    const now = new Date().toISOString()
179    const result = db
180      .prepare(
181        `UPDATE workflow_inputs
182         SET resolved_at = ?, resolved_values_json = ?
183         WHERE workflow_id = ?
184         AND resume_token = ?
185         AND resolved_at IS NULL`,
186      )
187      .run(
188        now,
189        JSON.stringify(input.values),
190        input.workflowId,
191        input.resumeToken,
192      )
193  
194    if (result.changes === 0) return null
195  
196    const workflow = getWorkflowRun(input.workflowId)
197    if (!workflow) return null
198  
199    return upsertWorkflowRun({
200      id: input.workflowId,
201      taskId: workflow.taskId,
202      sessionId: getSessionIdByTaskId(workflow.taskId),
203      commandId: workflow.commandId,
204      status: 'running',
205      awaitingInput: false,
206      resumeToken: null,
207      updatedAt: now,
208    })
209  }
210  
211  function getWorkflowRunOrThrow(workflowId: string): WorkflowStateSummary {
212    const row = getWorkflowRun(workflowId)
213    if (!row) throw new Error(`Workflow run not found after upsert: ${workflowId}`)
214    return row
215  }
216  
217  function rowToSummary(row: WorkflowRunRow): WorkflowStateSummary {
218    return {
219      workflowId: row.id,
220      taskId: row.task_id,
221      commandId: row.command_id,
222      status: row.status,
223      awaitingInput: row.awaiting_input === 1,
224      resumeToken: row.resume_token,
225      updatedAt: row.updated_at,
226    }
227  }
228  
229  function parseWorkflowFields(raw: string): WorkflowInputRequest['fields'] {
230    try {
231      const parsed = JSON.parse(raw) as unknown
232      if (Array.isArray(parsed)) {
233        return parsed as WorkflowInputRequest['fields']
234      }
235    } catch {
236      // Ignore malformed rows and return an empty schema.
237    }
238    return []
239  }
240  
241  function parseWorkflowChoices(
242    raw: string | null,
243  ): WorkflowInputRequest['choices'] | undefined {
244    if (!raw) return undefined
245    try {
246      const parsed = JSON.parse(raw) as unknown
247      if (Array.isArray(parsed)) {
248        return parsed as WorkflowInputRequest['choices']
249      }
250    } catch {
251      // Ignore malformed rows and return undefined.
252    }
253    return undefined
254  }
255  
256  function getSessionIdByTaskId(taskId: string): string {
257    const db = getDb()
258    const row = db
259      .prepare('SELECT session_id FROM agent_tasks WHERE id = ?')
260      .get(taskId) as { session_id?: string } | undefined
261    const sessionId = row?.session_id?.trim()
262    if (!sessionId) {
263      throw new Error(`Missing session for task ${taskId}`)
264    }
265    return sessionId
266  }
267  
268  function normalizeNullableString(
269    value: string | null | undefined,
270  ): string | null {
271    const trimmed = value?.trim()
272    return trimmed ? trimmed : null
273  }