/ src / server / storage / agent-runtime / tasks.ts
tasks.ts
  1  import { randomUUID } from 'node:crypto'
  2  import type {
  3    TaskEventEntry,
  4    TaskSource,
  5    TaskStatus,
  6    TaskSummary,
  7  } from '@/lib/shared/chat'
  8  import { getDb } from '@/server/storage/db'
  9  
 10  interface AgentTaskRow {
 11    id: string
 12    session_id: string
 13    run_id: string | null
 14    backend_id: string | null
 15    title: string
 16    kind: string
 17    status: TaskStatus
 18    progress_percent: number | null
 19    progress_label: string | null
 20    attached_message_id: string | null
 21    source: TaskSource
 22    error: string | null
 23    started_at: string
 24    updated_at: string
 25    completed_at: string | null
 26  }
 27  
 28  interface AgentTaskEventRow {
 29    id: string
 30    task_id: string
 31    at: string
 32    level: TaskEventEntry['level']
 33    event_type: string
 34    text: string
 35    dedupe_key: string | null
 36    payload_json: string | null
 37  }
 38  
 39  export interface UpsertAgentTaskInput {
 40    id: string
 41    sessionId: string
 42    runId?: string | null
 43    backendId?: string | null
 44    title: string
 45    kind: string
 46    status: TaskStatus
 47    progressPercent?: number | null
 48    progressLabel?: string | null
 49    attachedMessageId?: string | null
 50    source: TaskSource
 51    error?: string | null
 52    startedAt?: string
 53    updatedAt?: string
 54    completedAt?: string | null
 55    metadata?: Record<string, unknown> | null
 56  }
 57  
 58  const TASK_EVENT_RETENTION_DAYS = 30
 59  const TASK_QUERY_MAX_LIMIT = 200
 60  const TASK_EVENT_QUERY_MAX_LIMIT = 500
 61  const FINAL_TASK_STATUSES = new Set<TaskStatus>(['succeeded', 'failed', 'cancelled'])
 62  
 63  let lastTaskEventPruneAtMs = 0
 64  
 65  export function upsertAgentTask(input: UpsertAgentTaskInput): TaskSummary {
 66    const db = getDb()
 67    const now = new Date().toISOString()
 68    const startedAt = input.startedAt ?? now
 69    const updatedAt = input.updatedAt ?? now
 70    const completedAt =
 71      input.completedAt ??
 72      (FINAL_TASK_STATUSES.has(input.status) ? now : null)
 73  
 74    db.prepare(
 75      `INSERT INTO agent_tasks (
 76        id, session_id, run_id, backend_id, title, kind, status, progress_percent,
 77        progress_label, attached_message_id, source, error, started_at, updated_at,
 78        completed_at, metadata_json
 79      ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
 80      ON CONFLICT(id) DO UPDATE SET
 81        session_id = excluded.session_id,
 82        run_id = excluded.run_id,
 83        backend_id = excluded.backend_id,
 84        title = excluded.title,
 85        kind = excluded.kind,
 86        status = excluded.status,
 87        progress_percent = excluded.progress_percent,
 88        progress_label = excluded.progress_label,
 89        attached_message_id = excluded.attached_message_id,
 90        source = excluded.source,
 91        error = excluded.error,
 92        started_at = COALESCE(agent_tasks.started_at, excluded.started_at),
 93        updated_at = excluded.updated_at,
 94        completed_at = excluded.completed_at,
 95        metadata_json = excluded.metadata_json`,
 96    ).run(
 97      input.id,
 98      input.sessionId,
 99      normalizeNullableString(input.runId),
100      normalizeNullableString(input.backendId),
101      input.title.trim() || '(untitled task)',
102      input.kind.trim() || 'task',
103      input.status,
104      clampPercent(input.progressPercent),
105      normalizeNullableString(input.progressLabel),
106      normalizeNullableString(input.attachedMessageId),
107      input.source,
108      normalizeNullableString(input.error),
109      startedAt,
110      updatedAt,
111      normalizeNullableString(completedAt),
112      input.metadata ? JSON.stringify(input.metadata) : null,
113    )
114  
115    return getAgentTaskOrThrow(input.id)
116  }
117  
118  export function getAgentTask(taskId: string): TaskSummary | null {
119    const db = getDb()
120    const row = db
121      .prepare(
122        `SELECT
123          id, session_id, run_id, backend_id, title, kind, status, progress_percent,
124          progress_label, attached_message_id, source, error, started_at, updated_at, completed_at
125         FROM agent_tasks
126         WHERE id = ?`,
127      )
128      .get(taskId) as AgentTaskRow | undefined
129  
130    return row ? rowToTaskSummary(row) : null
131  }
132  
133  export function listAgentTasks(input?: {
134    sessionId?: string | null
135    scope?: 'active' | 'all'
136    limit?: number
137  }): TaskSummary[] {
138    const limit = clampTaskLimit(input?.limit, 60)
139    const scope = input?.scope ?? 'active'
140    const sessionId = normalizeNullableString(input?.sessionId)
141  
142    const statuses = scope === 'active' ? ['queued', 'running', 'waiting_input'] : null
143    const rows = buildTaskListQuery({ sessionId, statuses, limit }).all() as AgentTaskRow[]
144    return rows.map(rowToTaskSummary)
145  }
146  
147  export function cancelAgentTask(taskId: string): TaskSummary | null {
148    const existing = getAgentTask(taskId)
149    if (!existing) return null
150  
151    const now = new Date().toISOString()
152    return upsertAgentTask({
153      id: existing.id,
154      sessionId: existing.sessionId,
155      runId: existing.runId,
156      backendId: existing.backendId,
157      title: existing.title,
158      kind: existing.kind,
159      status: 'cancelled',
160      progressPercent: existing.progressPercent,
161      progressLabel: existing.progressLabel,
162      attachedMessageId: existing.attachedMessageId,
163      source: existing.source,
164      error: existing.error,
165      startedAt: existing.startedAt,
166      updatedAt: now,
167      completedAt: now,
168    })
169  }
170  
171  export function appendAgentTaskEvent(input: {
172    id?: string
173    taskId: string
174    at?: string
175    level?: TaskEventEntry['level']
176    type: string
177    text: string
178    dedupeKey?: string | null
179    payload?: Record<string, unknown> | null
180  }): TaskEventEntry {
181    maybePruneExpiredTaskEvents()
182    const db = getDb()
183    const at = input.at ?? new Date().toISOString()
184    const id = input.id ?? randomUUID()
185  
186    const dedupeKey = normalizeNullableString(input.dedupeKey)
187    if (dedupeKey) {
188      const existing = db
189        .prepare(
190          `SELECT id
191           FROM agent_task_events
192           WHERE task_id = ?
193           AND dedupe_key = ?
194           ORDER BY at DESC
195           LIMIT 1`,
196        )
197        .get(input.taskId, dedupeKey) as { id?: string } | undefined
198      if (existing?.id) {
199        return {
200          id: existing.id,
201          taskId: input.taskId,
202          at,
203          level: input.level ?? 'info',
204          type: input.type.trim() || 'event',
205          text: input.text.trim() || '(empty event)',
206          payload: input.payload ?? null,
207        }
208      }
209    }
210  
211    db.prepare(
212      `INSERT INTO agent_task_events (
213        id, task_id, at, level, event_type, text, dedupe_key, payload_json
214      ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
215    ).run(
216      id,
217      input.taskId,
218      at,
219      input.level ?? 'info',
220      input.type.trim() || 'event',
221      input.text.trim() || '(empty event)',
222      dedupeKey,
223      input.payload ? JSON.stringify(input.payload) : null,
224    )
225  
226    return {
227      id,
228      taskId: input.taskId,
229      at,
230      level: input.level ?? 'info',
231      type: input.type.trim() || 'event',
232      text: input.text.trim() || '(empty event)',
233      payload: input.payload ?? null,
234    }
235  }
236  
237  export function listAgentTaskEvents(input: {
238    taskId: string
239    limit?: number
240  }): TaskEventEntry[] {
241    const db = getDb()
242    const limit = clampTaskLimit(input.limit, 120, TASK_EVENT_QUERY_MAX_LIMIT)
243    const rows = db
244      .prepare(
245        `SELECT
246          id, task_id, at, level, event_type, text, dedupe_key, payload_json
247         FROM agent_task_events
248         WHERE task_id = ?
249         ORDER BY at ASC
250         LIMIT ?`,
251      )
252      .all(input.taskId, limit) as AgentTaskEventRow[]
253    return rows.map((row) => ({
254      id: row.id,
255      taskId: row.task_id,
256      at: row.at,
257      level: row.level,
258      type: row.event_type,
259      text: row.text,
260      payload: parseRecordJson(row.payload_json),
261    }))
262  }
263  
264  function getAgentTaskOrThrow(taskId: string): TaskSummary {
265    const task = getAgentTask(taskId)
266    if (!task) {
267      throw new Error(`Task not found after upsert: ${taskId}`)
268    }
269    return task
270  }
271  
272  function rowToTaskSummary(row: AgentTaskRow): TaskSummary {
273    return {
274      id: row.id,
275      sessionId: row.session_id,
276      runId: row.run_id,
277      backendId: row.backend_id,
278      title: row.title,
279      kind: row.kind,
280      status: row.status,
281      progressPercent: row.progress_percent,
282      progressLabel: row.progress_label,
283      attachedMessageId: row.attached_message_id,
284      startedAt: row.started_at,
285      updatedAt: row.updated_at,
286      completedAt: row.completed_at,
287      error: row.error,
288      source: row.source,
289    }
290  }
291  
292  function buildTaskListQuery(input: {
293    sessionId: string | null
294    statuses: string[] | null
295    limit: number
296  }) {
297    const db = getDb()
298    if (input.sessionId && input.statuses) {
299      return db.prepare(
300        `SELECT
301          id, session_id, run_id, backend_id, title, kind, status, progress_percent,
302          progress_label, attached_message_id, source, error, started_at, updated_at, completed_at
303         FROM agent_tasks
304         WHERE session_id = ?
305         AND status IN (${input.statuses.map(() => '?').join(', ')})
306         ORDER BY updated_at DESC
307         LIMIT ${input.limit}`,
308      ).bind(input.sessionId, ...input.statuses)
309    }
310    if (input.sessionId) {
311      return db.prepare(
312        `SELECT
313          id, session_id, run_id, backend_id, title, kind, status, progress_percent,
314          progress_label, attached_message_id, source, error, started_at, updated_at, completed_at
315         FROM agent_tasks
316         WHERE session_id = ?
317         ORDER BY updated_at DESC
318         LIMIT ${input.limit}`,
319      ).bind(input.sessionId)
320    }
321    if (input.statuses) {
322      return db.prepare(
323        `SELECT
324          id, session_id, run_id, backend_id, title, kind, status, progress_percent,
325          progress_label, attached_message_id, source, error, started_at, updated_at, completed_at
326         FROM agent_tasks
327         WHERE status IN (${input.statuses.map(() => '?').join(', ')})
328         ORDER BY updated_at DESC
329         LIMIT ${input.limit}`,
330      ).bind(...input.statuses)
331    }
332    return db.prepare(
333      `SELECT
334        id, session_id, run_id, backend_id, title, kind, status, progress_percent,
335        progress_label, attached_message_id, source, error, started_at, updated_at, completed_at
336       FROM agent_tasks
337       ORDER BY updated_at DESC
338       LIMIT ${input.limit}`,
339    )
340  }
341  
342  function maybePruneExpiredTaskEvents(): void {
343    const now = Date.now()
344    if (now - lastTaskEventPruneAtMs < 60_000) return
345    lastTaskEventPruneAtMs = now
346  
347    const cutoff = new Date(now - TASK_EVENT_RETENTION_DAYS * 24 * 60 * 60 * 1000)
348    const db = getDb()
349    db.prepare('DELETE FROM agent_task_events WHERE at < ?').run(cutoff.toISOString())
350  }
351  
352  function parseRecordJson(value: string | null): Record<string, unknown> | null {
353    if (!value) return null
354    try {
355      const parsed = JSON.parse(value) as unknown
356      if (typeof parsed === 'object' && parsed !== null && !Array.isArray(parsed)) {
357        return parsed as Record<string, unknown>
358      }
359    } catch {
360      // Ignore malformed payloads and keep row readable.
361    }
362    return null
363  }
364  
365  function clampTaskLimit(
366    value: number | undefined,
367    fallback: number,
368    max = TASK_QUERY_MAX_LIMIT,
369  ): number {
370    if (value == null || !Number.isFinite(value)) return fallback
371    return Math.min(max, Math.max(1, Math.round(value)))
372  }
373  
374  function clampPercent(value: number | null | undefined): number | null {
375    if (value == null || !Number.isFinite(value)) return null
376    return Math.min(100, Math.max(0, value))
377  }
378  
379  function normalizeNullableString(
380    value: string | null | undefined,
381  ): string | null {
382    const trimmed = value?.trim()
383    return trimmed ? trimmed : null
384  }