workflows.ts
import { randomUUID } from 'node:crypto'
import type {
  WorkflowInputRequest,
  WorkflowRespondRequestPayload,
  WorkflowStateSummary,
  WorkflowStatus,
} from '@/lib/shared/chat'
import { getDb } from '@/server/storage/db'

/** Columns read from `workflow_runs` when building a `WorkflowStateSummary`. */
interface WorkflowRunRow {
  id: string
  task_id: string
  command_id: string
  status: WorkflowStatus
  /** Written as 1 or 0 by `upsertWorkflowRun`. */
  awaiting_input: number
  resume_token: string | null
  updated_at: string
}

/** Columns read from `workflow_inputs` when loading a pending input request. */
interface WorkflowInputRow {
  id: string
  workflow_id: string
  resume_token: string
  prompt: string
  /** JSON text; decoded by `parseWorkflowFields`. */
  fields_json: string
  /** JSON text or NULL; decoded by `parseWorkflowChoices`. */
  choices_json: string | null
}

/** Arguments accepted by `upsertWorkflowRun`. */
export interface UpsertWorkflowRunInput {
  id: string
  taskId: string
  sessionId: string
  commandId: string
  status: WorkflowStatus
  awaitingInput: boolean
  /** Trimmed before storage; blank, null or omitted is stored as NULL. */
  resumeToken?: string | null
  /** Serialized to `state_json`; defaults to `{}` when omitted. */
  state?: Record<string, unknown>
  /** Trimmed before storage; blank, null or omitted is stored as NULL. */
  error?: string | null
  /** Defaults to the current time. Only used when the row is first inserted. */
  createdAt?: string
  /** Defaults to the current time. */
  updatedAt?: string
  /** Trimmed before storage; blank, null or omitted is stored as NULL. */
  completedAt?: string | null
}

/**
 * Inserts a workflow run, or — when a row with the same `id` already exists —
 * overwrites every column except `id` and `created_at` with the supplied values.
 *
 * NOTE(review): omitted `state`, `error` and `completedAt` are written as
 * `'{}'` / NULL, and the conflict clause overwrites those columns, so callers
 * that omit them (as `recordWorkflowInput` and `resolveWorkflowInput` do) reset
 * whatever was stored before. Confirm this is intended.
 *
 * @param input - Values for the run row.
 * @returns The summary re-read from the database after the write.
 * @throws Error if the row cannot be read back after the upsert.
 */
export function upsertWorkflowRun(
  input: UpsertWorkflowRunInput,
): WorkflowStateSummary {
  const db = getDb()
  const now = new Date().toISOString()
  db.prepare(
    `INSERT INTO workflow_runs (
       id, task_id, session_id, command_id, status, awaiting_input, resume_token,
       state_json, error, created_at, updated_at, completed_at
     ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
     ON CONFLICT(id) DO UPDATE SET
       task_id = excluded.task_id,
       session_id = excluded.session_id,
       command_id = excluded.command_id,
       status = excluded.status,
       awaiting_input = excluded.awaiting_input,
       resume_token = excluded.resume_token,
       state_json = excluded.state_json,
       error = excluded.error,
       updated_at = excluded.updated_at,
       completed_at = excluded.completed_at`,
  ).run(
    input.id,
    input.taskId,
    input.sessionId,
    input.commandId,
    input.status,
    input.awaitingInput ? 1 : 0,
    normalizeNullableString(input.resumeToken),
    JSON.stringify(input.state ?? {}),
    normalizeNullableString(input.error),
    input.createdAt ?? now,
    input.updatedAt ?? now,
    normalizeNullableString(input.completedAt),
  )
  return getWorkflowRunOrThrow(input.id)
}

/**
 * Loads the summary of a workflow run by id.
 *
 * @param workflowId - Primary key of the `workflow_runs` row.
 * @returns The summary, or `null` when no such row exists.
 */
export function getWorkflowRun(workflowId: string): WorkflowStateSummary | null {
  const db = getDb()
  const row = db
    .prepare(
      `SELECT id, task_id, command_id, status, awaiting_input, resume_token, updated_at
       FROM workflow_runs
       WHERE id = ?`,
    )
    .get(workflowId) as WorkflowRunRow | undefined
  return row ? rowToSummary(row) : null
}

/**
 * Stores a new input request for a workflow and marks the run as
 * `waiting_input` with the request's resume token.
 *
 * @param input - Request details. A blank or omitted `resumeToken` is replaced
 *   by a fresh UUID; a blank `prompt` falls back to `'Input required'`.
 * @returns The request as stored (trimmed prompt and effective resume token).
 * @throws Error if the task has no session (`getSessionIdByTaskId`).
 */
export function recordWorkflowInput(input: {
  workflowId: string
  taskId: string
  commandId: string
  prompt: string
  fields: WorkflowInputRequest['fields']
  choices?: WorkflowInputRequest['choices']
  resumeToken?: string
}): WorkflowInputRequest {
  // Resolve the session before writing anything: the lookup throws when the
  // task has no session, and failing here avoids leaving a pending
  // workflow_inputs row whose run was never switched to waiting_input.
  const sessionId = getSessionIdByTaskId(input.taskId)

  // `||` (not `??`) is deliberate: an empty string after trimming must also
  // trigger the fallback.
  const resumeToken = input.resumeToken?.trim() || randomUUID()
  const prompt = input.prompt.trim() || 'Input required'
  const now = new Date().toISOString()
  const db = getDb()
  const id = randomUUID()

  db.prepare(
    `INSERT INTO workflow_inputs (
       id, workflow_id, resume_token, prompt, fields_json, choices_json, created_at
     ) VALUES (?, ?, ?, ?, ?, ?, ?)`,
  ).run(
    id,
    input.workflowId,
    resumeToken,
    prompt,
    JSON.stringify(input.fields),
    input.choices ? JSON.stringify(input.choices) : null,
    now,
  )

  upsertWorkflowRun({
    id: input.workflowId,
    taskId: input.taskId,
    sessionId,
    commandId: input.commandId,
    status: 'waiting_input',
    awaitingInput: true,
    resumeToken,
    updatedAt: now,
  })

  return {
    workflowId: input.workflowId,
    taskId: input.taskId,
    prompt,
    fields: input.fields,
    choices: input.choices,
    resumeToken,
  }
}

/**
 * Finds the most recently created unresolved input request matching a
 * workflow id and resume token.
 *
 * @param input - Workflow id and resume token to match exactly.
 * @returns The pending request, or `null` when no unresolved row matches or
 *   the workflow run itself no longer exists.
 */
export function getPendingWorkflowInput(input: {
  workflowId: string
  resumeToken: string
}): WorkflowInputRequest | null {
  const db = getDb()
  const row = db
    .prepare(
      `SELECT id, workflow_id, resume_token, prompt, fields_json, choices_json
       FROM workflow_inputs
       WHERE workflow_id = ?
         AND resume_token = ?
         AND resolved_at IS NULL
       ORDER BY created_at DESC
       LIMIT 1`,
    )
    .get(input.workflowId, input.resumeToken) as WorkflowInputRow | undefined
  if (!row) return null

  // The task id lives on the run row, not on the input row.
  const workflow = getWorkflowRun(row.workflow_id)
  if (!workflow) return null

  return {
    workflowId: row.workflow_id,
    taskId: workflow.taskId,
    prompt: row.prompt,
    fields: parseWorkflowFields(row.fields_json),
    choices: parseWorkflowChoices(row.choices_json),
    resumeToken: row.resume_token,
  }
}

/**
 * Records the submitted values on every unresolved input row matching the
 * workflow id and resume token, then moves the run back to `running` and
 * clears its resume token.
 *
 * @param input - Workflow id, resume token and the submitted values.
 * @returns The updated run summary, or `null` when the run does not exist or
 *   no unresolved input matched.
 * @throws Error if the run's task has no session (`getSessionIdByTaskId`).
 */
export function resolveWorkflowInput(
  input: WorkflowRespondRequestPayload,
): WorkflowStateSummary | null {
  // Check the run first so a missing run returns null without having marked
  // any input row as resolved.
  const workflow = getWorkflowRun(input.workflowId)
  if (!workflow) return null

  const db = getDb()
  const now = new Date().toISOString()
  const result = db
    .prepare(
      `UPDATE workflow_inputs
       SET resolved_at = ?, resolved_values_json = ?
       WHERE workflow_id = ?
         AND resume_token = ?
         AND resolved_at IS NULL`,
    )
    .run(
      now,
      JSON.stringify(input.values),
      input.workflowId,
      input.resumeToken,
    )

  // Nothing pending for this token (unknown, stale, or already resolved).
  if (result.changes === 0) return null

  return upsertWorkflowRun({
    id: input.workflowId,
    taskId: workflow.taskId,
    sessionId: getSessionIdByTaskId(workflow.taskId),
    commandId: workflow.commandId,
    status: 'running',
    awaitingInput: false,
    resumeToken: null,
    updatedAt: now,
  })
}

/**
 * Like `getWorkflowRun`, but treats a missing row as an error.
 *
 * @throws Error when the run cannot be found.
 */
function getWorkflowRunOrThrow(workflowId: string): WorkflowStateSummary {
  const row = getWorkflowRun(workflowId)
  if (!row) throw new Error(`Workflow run not found after upsert: ${workflowId}`)
  return row
}

/** Maps a `workflow_runs` row to the camelCase summary shape. */
function rowToSummary(row: WorkflowRunRow): WorkflowStateSummary {
  return {
    workflowId: row.id,
    taskId: row.task_id,
    commandId: row.command_id,
    status: row.status,
    awaitingInput: row.awaiting_input === 1,
    resumeToken: row.resume_token,
    updatedAt: row.updated_at,
  }
}

/**
 * Decodes the stored `fields_json` column.
 *
 * Only the top-level shape is checked (must be an array); individual elements
 * are not validated.
 *
 * @returns The decoded array, or `[]` for malformed JSON or a non-array value.
 */
function parseWorkflowFields(raw: string): WorkflowInputRequest['fields'] {
  try {
    const parsed = JSON.parse(raw) as unknown
    if (Array.isArray(parsed)) {
      return parsed as WorkflowInputRequest['fields']
    }
  } catch {
    // Ignore malformed rows and return an empty schema.
  }
  return []
}

/**
 * Decodes the stored `choices_json` column.
 *
 * Only the top-level shape is checked (must be an array); individual elements
 * are not validated.
 *
 * @returns The decoded array, or `undefined` for NULL/empty input, malformed
 *   JSON, or a non-array value.
 */
function parseWorkflowChoices(
  raw: string | null,
): WorkflowInputRequest['choices'] | undefined {
  if (!raw) return undefined
  try {
    const parsed = JSON.parse(raw) as unknown
    if (Array.isArray(parsed)) {
      return parsed as WorkflowInputRequest['choices']
    }
  } catch {
    // Ignore malformed rows and return undefined.
  }
  return undefined
}

/**
 * Looks up the session id recorded for a task in `agent_tasks`.
 *
 * @throws Error when the task row is missing or its session id is blank.
 */
function getSessionIdByTaskId(taskId: string): string {
  const db = getDb()
  const row = db
    .prepare('SELECT session_id FROM agent_tasks WHERE id = ?')
    .get(taskId) as { session_id?: string } | undefined
  const sessionId = row?.session_id?.trim()
  if (!sessionId) {
    throw new Error(`Missing session for task ${taskId}`)
  }
  return sessionId
}

/** Trims a string and collapses blank, null or undefined values to `null`. */
function normalizeNullableString(
  value: string | null | undefined,
): string | null {
  const trimmed = value?.trim()
  return trimmed ? trimmed : null
}