tasks.ts
1 import { randomUUID } from 'node:crypto' 2 import type { 3 TaskEventEntry, 4 TaskSource, 5 TaskStatus, 6 TaskSummary, 7 } from '@/lib/shared/chat' 8 import { getDb } from '@/server/storage/db' 9 10 interface AgentTaskRow { 11 id: string 12 session_id: string 13 run_id: string | null 14 backend_id: string | null 15 title: string 16 kind: string 17 status: TaskStatus 18 progress_percent: number | null 19 progress_label: string | null 20 attached_message_id: string | null 21 source: TaskSource 22 error: string | null 23 started_at: string 24 updated_at: string 25 completed_at: string | null 26 } 27 28 interface AgentTaskEventRow { 29 id: string 30 task_id: string 31 at: string 32 level: TaskEventEntry['level'] 33 event_type: string 34 text: string 35 dedupe_key: string | null 36 payload_json: string | null 37 } 38 39 export interface UpsertAgentTaskInput { 40 id: string 41 sessionId: string 42 runId?: string | null 43 backendId?: string | null 44 title: string 45 kind: string 46 status: TaskStatus 47 progressPercent?: number | null 48 progressLabel?: string | null 49 attachedMessageId?: string | null 50 source: TaskSource 51 error?: string | null 52 startedAt?: string 53 updatedAt?: string 54 completedAt?: string | null 55 metadata?: Record<string, unknown> | null 56 } 57 58 const TASK_EVENT_RETENTION_DAYS = 30 59 const TASK_QUERY_MAX_LIMIT = 200 60 const TASK_EVENT_QUERY_MAX_LIMIT = 500 61 const FINAL_TASK_STATUSES = new Set<TaskStatus>(['succeeded', 'failed', 'cancelled']) 62 63 let lastTaskEventPruneAtMs = 0 64 65 export function upsertAgentTask(input: UpsertAgentTaskInput): TaskSummary { 66 const db = getDb() 67 const now = new Date().toISOString() 68 const startedAt = input.startedAt ?? now 69 const updatedAt = input.updatedAt ?? now 70 const completedAt = 71 input.completedAt ?? 72 (FINAL_TASK_STATUSES.has(input.status) ? now : null) 73 74 db.prepare( 75 `INSERT INTO agent_tasks ( 76 id, session_id, run_id, backend_id, title, kind, status, progress_percent, 77 progress_label, attached_message_id, source, error, started_at, updated_at, 78 completed_at, metadata_json 79 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 80 ON CONFLICT(id) DO UPDATE SET 81 session_id = excluded.session_id, 82 run_id = excluded.run_id, 83 backend_id = excluded.backend_id, 84 title = excluded.title, 85 kind = excluded.kind, 86 status = excluded.status, 87 progress_percent = excluded.progress_percent, 88 progress_label = excluded.progress_label, 89 attached_message_id = excluded.attached_message_id, 90 source = excluded.source, 91 error = excluded.error, 92 started_at = COALESCE(agent_tasks.started_at, excluded.started_at), 93 updated_at = excluded.updated_at, 94 completed_at = excluded.completed_at, 95 metadata_json = excluded.metadata_json`, 96 ).run( 97 input.id, 98 input.sessionId, 99 normalizeNullableString(input.runId), 100 normalizeNullableString(input.backendId), 101 input.title.trim() || '(untitled task)', 102 input.kind.trim() || 'task', 103 input.status, 104 clampPercent(input.progressPercent), 105 normalizeNullableString(input.progressLabel), 106 normalizeNullableString(input.attachedMessageId), 107 input.source, 108 normalizeNullableString(input.error), 109 startedAt, 110 updatedAt, 111 normalizeNullableString(completedAt), 112 input.metadata ? JSON.stringify(input.metadata) : null, 113 ) 114 115 return getAgentTaskOrThrow(input.id) 116 } 117 118 export function getAgentTask(taskId: string): TaskSummary | null { 119 const db = getDb() 120 const row = db 121 .prepare( 122 `SELECT 123 id, session_id, run_id, backend_id, title, kind, status, progress_percent, 124 progress_label, attached_message_id, source, error, started_at, updated_at, completed_at 125 FROM agent_tasks 126 WHERE id = ?`, 127 ) 128 .get(taskId) as AgentTaskRow | undefined 129 130 return row ? rowToTaskSummary(row) : null 131 } 132 133 export function listAgentTasks(input?: { 134 sessionId?: string | null 135 scope?: 'active' | 'all' 136 limit?: number 137 }): TaskSummary[] { 138 const limit = clampTaskLimit(input?.limit, 60) 139 const scope = input?.scope ?? 'active' 140 const sessionId = normalizeNullableString(input?.sessionId) 141 142 const statuses = scope === 'active' ? ['queued', 'running', 'waiting_input'] : null 143 const rows = buildTaskListQuery({ sessionId, statuses, limit }).all() as AgentTaskRow[] 144 return rows.map(rowToTaskSummary) 145 } 146 147 export function cancelAgentTask(taskId: string): TaskSummary | null { 148 const existing = getAgentTask(taskId) 149 if (!existing) return null 150 151 const now = new Date().toISOString() 152 return upsertAgentTask({ 153 id: existing.id, 154 sessionId: existing.sessionId, 155 runId: existing.runId, 156 backendId: existing.backendId, 157 title: existing.title, 158 kind: existing.kind, 159 status: 'cancelled', 160 progressPercent: existing.progressPercent, 161 progressLabel: existing.progressLabel, 162 attachedMessageId: existing.attachedMessageId, 163 source: existing.source, 164 error: existing.error, 165 startedAt: existing.startedAt, 166 updatedAt: now, 167 completedAt: now, 168 }) 169 } 170 171 export function appendAgentTaskEvent(input: { 172 id?: string 173 taskId: string 174 at?: string 175 level?: TaskEventEntry['level'] 176 type: string 177 text: string 178 dedupeKey?: string | null 179 payload?: Record<string, unknown> | null 180 }): TaskEventEntry { 181 maybePruneExpiredTaskEvents() 182 const db = getDb() 183 const at = input.at ?? new Date().toISOString() 184 const id = input.id ?? randomUUID() 185 186 const dedupeKey = normalizeNullableString(input.dedupeKey) 187 if (dedupeKey) { 188 const existing = db 189 .prepare( 190 `SELECT id 191 FROM agent_task_events 192 WHERE task_id = ? 193 AND dedupe_key = ? 194 ORDER BY at DESC 195 LIMIT 1`, 196 ) 197 .get(input.taskId, dedupeKey) as { id?: string } | undefined 198 if (existing?.id) { 199 return { 200 id: existing.id, 201 taskId: input.taskId, 202 at, 203 level: input.level ?? 'info', 204 type: input.type.trim() || 'event', 205 text: input.text.trim() || '(empty event)', 206 payload: input.payload ?? null, 207 } 208 } 209 } 210 211 db.prepare( 212 `INSERT INTO agent_task_events ( 213 id, task_id, at, level, event_type, text, dedupe_key, payload_json 214 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`, 215 ).run( 216 id, 217 input.taskId, 218 at, 219 input.level ?? 'info', 220 input.type.trim() || 'event', 221 input.text.trim() || '(empty event)', 222 dedupeKey, 223 input.payload ? JSON.stringify(input.payload) : null, 224 ) 225 226 return { 227 id, 228 taskId: input.taskId, 229 at, 230 level: input.level ?? 'info', 231 type: input.type.trim() || 'event', 232 text: input.text.trim() || '(empty event)', 233 payload: input.payload ?? null, 234 } 235 } 236 237 export function listAgentTaskEvents(input: { 238 taskId: string 239 limit?: number 240 }): TaskEventEntry[] { 241 const db = getDb() 242 const limit = clampTaskLimit(input.limit, 120, TASK_EVENT_QUERY_MAX_LIMIT) 243 const rows = db 244 .prepare( 245 `SELECT 246 id, task_id, at, level, event_type, text, dedupe_key, payload_json 247 FROM agent_task_events 248 WHERE task_id = ? 249 ORDER BY at ASC 250 LIMIT ?`, 251 ) 252 .all(input.taskId, limit) as AgentTaskEventRow[] 253 return rows.map((row) => ({ 254 id: row.id, 255 taskId: row.task_id, 256 at: row.at, 257 level: row.level, 258 type: row.event_type, 259 text: row.text, 260 payload: parseRecordJson(row.payload_json), 261 })) 262 } 263 264 function getAgentTaskOrThrow(taskId: string): TaskSummary { 265 const task = getAgentTask(taskId) 266 if (!task) { 267 throw new Error(`Task not found after upsert: ${taskId}`) 268 } 269 return task 270 } 271 272 function rowToTaskSummary(row: AgentTaskRow): TaskSummary { 273 return { 274 id: row.id, 275 sessionId: row.session_id, 276 runId: row.run_id, 277 backendId: row.backend_id, 278 title: row.title, 279 kind: row.kind, 280 status: row.status, 281 progressPercent: row.progress_percent, 282 progressLabel: row.progress_label, 283 attachedMessageId: row.attached_message_id, 284 startedAt: row.started_at, 285 updatedAt: row.updated_at, 286 completedAt: row.completed_at, 287 error: row.error, 288 source: row.source, 289 } 290 } 291 292 function buildTaskListQuery(input: { 293 sessionId: string | null 294 statuses: string[] | null 295 limit: number 296 }) { 297 const db = getDb() 298 if (input.sessionId && input.statuses) { 299 return db.prepare( 300 `SELECT 301 id, session_id, run_id, backend_id, title, kind, status, progress_percent, 302 progress_label, attached_message_id, source, error, started_at, updated_at, completed_at 303 FROM agent_tasks 304 WHERE session_id = ? 305 AND status IN (${input.statuses.map(() => '?').join(', ')}) 306 ORDER BY updated_at DESC 307 LIMIT ${input.limit}`, 308 ).bind(input.sessionId, ...input.statuses) 309 } 310 if (input.sessionId) { 311 return db.prepare( 312 `SELECT 313 id, session_id, run_id, backend_id, title, kind, status, progress_percent, 314 progress_label, attached_message_id, source, error, started_at, updated_at, completed_at 315 FROM agent_tasks 316 WHERE session_id = ? 317 ORDER BY updated_at DESC 318 LIMIT ${input.limit}`, 319 ).bind(input.sessionId) 320 } 321 if (input.statuses) { 322 return db.prepare( 323 `SELECT 324 id, session_id, run_id, backend_id, title, kind, status, progress_percent, 325 progress_label, attached_message_id, source, error, started_at, updated_at, completed_at 326 FROM agent_tasks 327 WHERE status IN (${input.statuses.map(() => '?').join(', ')}) 328 ORDER BY updated_at DESC 329 LIMIT ${input.limit}`, 330 ).bind(...input.statuses) 331 } 332 return db.prepare( 333 `SELECT 334 id, session_id, run_id, backend_id, title, kind, status, progress_percent, 335 progress_label, attached_message_id, source, error, started_at, updated_at, completed_at 336 FROM agent_tasks 337 ORDER BY updated_at DESC 338 LIMIT ${input.limit}`, 339 ) 340 } 341 342 function maybePruneExpiredTaskEvents(): void { 343 const now = Date.now() 344 if (now - lastTaskEventPruneAtMs < 60_000) return 345 lastTaskEventPruneAtMs = now 346 347 const cutoff = new Date(now - TASK_EVENT_RETENTION_DAYS * 24 * 60 * 60 * 1000) 348 const db = getDb() 349 db.prepare('DELETE FROM agent_task_events WHERE at < ?').run(cutoff.toISOString()) 350 } 351 352 function parseRecordJson(value: string | null): Record<string, unknown> | null { 353 if (!value) return null 354 try { 355 const parsed = JSON.parse(value) as unknown 356 if (typeof parsed === 'object' && parsed !== null && !Array.isArray(parsed)) { 357 return parsed as Record<string, unknown> 358 } 359 } catch { 360 // Ignore malformed payloads and keep row readable. 361 } 362 return null 363 } 364 365 function clampTaskLimit( 366 value: number | undefined, 367 fallback: number, 368 max = TASK_QUERY_MAX_LIMIT, 369 ): number { 370 if (value == null || !Number.isFinite(value)) return fallback 371 return Math.min(max, Math.max(1, Math.round(value))) 372 } 373 374 function clampPercent(value: number | null | undefined): number | null { 375 if (value == null || !Number.isFinite(value)) return null 376 return Math.min(100, Math.max(0, value)) 377 } 378 379 function normalizeNullableString( 380 value: string | null | undefined, 381 ): string | null { 382 const trimmed = value?.trim() 383 return trimmed ? trimmed : null 384 }