/ src / lib / server / runtime / heartbeat-wake.ts
heartbeat-wake.ts
  1  /**
  2   * On-demand heartbeat wake — triggers an immediate heartbeat for an agent/session.
  3   * Requests are debounced with a short coalesce window, retain distinct trigger
  4   * events per target, and retry when the session lane is already busy.
  5   */
  6  
  7  import { ensureAgentThreadSession } from '@/lib/server/agents/agent-thread-session'
  8  import {
  9    buildAgentHeartbeatPrompt,
 10    heartbeatConfigForSession,
 11    isHeartbeatContentEffectivelyEmpty,
 12    readHeartbeatFile,
 13  } from '@/lib/server/runtime/heartbeat-service'
 14  import { buildMainLoopHeartbeatPrompt, isMainSession } from '@/lib/server/agents/main-agent-loop'
 15  import { listAgents } from '@/lib/server/agents/agent-repository'
 16  import { getSession, listSessions } from '@/lib/server/sessions/session-repository'
 17  import { loadSettings } from '@/lib/server/settings/settings-repository'
 18  import {
 19    enqueueSessionRun,
 20    getSessionExecutionState,
 21    hasActiveNonHeartbeatSessionLease,
 22    repairSessionRunQueue,
 23  } from '@/lib/server/runtime/session-run-manager'
 24  import { log } from '@/lib/server/logger'
 25  import { isAgentDisabled } from '@/lib/server/agents/agent-availability'
 26  import { errorMessage, hmrSingleton } from '@/lib/shared-utils'
 27  
 28  export interface WakeRequestInput {
 29    eventId?: string
 30    agentId?: string
 31    sessionId?: string
 32    reason?: string
 33    source?: string
 34    resumeMessage?: string
 35    detail?: string
 36    requestedAt?: number
 37    occurredAt?: number
 38    priority?: number
 39    retryCount?: number
 40  }
 41  
 42  export interface WakeEvent {
 43    eventId?: string
 44    reason: string
 45    source?: string
 46    resumeMessage?: string
 47    detail?: string
 48    occurredAt: number
 49    priority: number
 50  }
 51  
 52  export interface WakeRequest {
 53    agentId?: string
 54    sessionId?: string
 55    requestedAt: number
 56    retryCount: number
 57    events: WakeEvent[]
 58  }
 59  
 60  const COALESCE_MS = 250
 61  const RETRY_MS = 1_000
 62  const MAX_WAKE_EVENTS = 6
 63  const MAX_RESUME_CHARS = 280
 64  const MAX_DETAIL_CHARS = 800
 65  type WakeTimerKind = 'normal' | 'retry'
 66  
 67  const state = hmrSingleton('__swarmclaw_heartbeat_wake__', () => ({
 68    pending: new Map<string, WakeRequest>(),
 69    timer: null as ReturnType<typeof setTimeout> | null,
 70    timerDueAt: null as number | null,
 71    timerKind: null as WakeTimerKind | null,
 72  }))
 73  
 74  function trimText(value: unknown, maxChars: number): string | undefined {
 75    if (typeof value !== 'string') return undefined
 76    const normalized = value.replace(/\s+/g, ' ').trim()
 77    return normalized ? normalized.slice(0, maxChars) : undefined
 78  }
 79  
 80  function normalizeWakeReason(reason?: string): string {
 81    return trimText(reason, 80) || 'on-demand'
 82  }
 83  
 84  function normalizeWakeTarget(value?: string): string | undefined {
 85    return trimText(value, 160)
 86  }
 87  
 88  function normalizeOccurredAt(value?: number): number {
 89    return typeof value === 'number' && Number.isFinite(value) ? Math.trunc(value) : Date.now()
 90  }
 91  
 92  function reasonPriority(reason: string): number {
 93    const normalized = reason.toLowerCase()
 94    if (/(approval|connector-message|webhook|watch_job|scheduled_wake|task-completed)/.test(normalized)) return 90
 95    if (/(schedule)/.test(normalized)) return 70
 96    if (/(comparison|manual|on-demand)/.test(normalized)) return 50
 97    return 40
 98  }
 99  
/**
 * Build a normalized WakeEvent from raw request input.
 *
 * Text fields are trimmed and capped, and left off the result entirely when
 * blank. The priority is the explicit finite value (truncated) or one derived
 * from the reason, clamped to 0..100. The event time prefers `occurredAt`,
 * then `requestedAt`.
 */
function normalizeWakeEvent(input: WakeRequestInput): WakeEvent {
  const reason = normalizeWakeReason(input.reason)

  const hasExplicitPriority = typeof input.priority === 'number' && Number.isFinite(input.priority)
  const rawPriority = hasExplicitPriority ? Math.trunc(input.priority as number) : reasonPriority(reason)

  const eventId = trimText(input.eventId, 160)
  const source = trimText(input.source, 120)
  const resumeMessage = trimText(input.resumeMessage, MAX_RESUME_CHARS)
  const detail = trimText(input.detail, MAX_DETAIL_CHARS)

  // Conditional spreads keep absent optional fields out of the object.
  return {
    ...(eventId ? { eventId } : {}),
    reason,
    ...(source ? { source } : {}),
    ...(resumeMessage ? { resumeMessage } : {}),
    ...(detail ? { detail } : {}),
    occurredAt: normalizeOccurredAt(input.occurredAt ?? input.requestedAt),
    priority: Math.max(0, Math.min(100, rawPriority)),
  }
}
115  
/**
 * Add `incoming` to a copy of `existing`, folding it into a matching event
 * when one is already present.
 *
 * Two events match when both carry an eventId and the ids are equal;
 * otherwise when reason, source, resumeMessage and detail are all equal.
 * The result is ordered by priority, then by most recent occurrence, and is
 * limited to MAX_WAKE_EVENTS entries. `existing` is not mutated.
 */
function uniqueWakeEvents(existing: WakeEvent[], incoming: WakeEvent): WakeEvent[] {
  const matchesIncoming = (candidate: WakeEvent): boolean => {
    if (candidate.eventId && incoming.eventId) return candidate.eventId === incoming.eventId
    return candidate.reason === incoming.reason
      && candidate.source === incoming.source
      && candidate.resumeMessage === incoming.resumeMessage
      && candidate.detail === incoming.detail
  }
  const byPriorityThenRecency = (left: WakeEvent, right: WakeEvent): number => (
    right.priority !== left.priority
      ? right.priority - left.priority
      : right.occurredAt - left.occurredAt
  )

  const combined = [...existing]
  const duplicateAt = combined.findIndex((candidate) => matchesIncoming(candidate))

  if (duplicateAt === -1) {
    combined.push(incoming)
  } else {
    const earlier = combined[duplicateAt]
    // Incoming fields win, but the merge keeps the higher priority, the later
    // timestamp, and any text the incoming event does not carry itself.
    combined[duplicateAt] = {
      ...earlier,
      ...incoming,
      priority: Math.max(earlier.priority, incoming.priority),
      occurredAt: Math.max(earlier.occurredAt, incoming.occurredAt),
      resumeMessage: incoming.resumeMessage || earlier.resumeMessage,
      detail: incoming.detail || earlier.detail,
    }
  }

  combined.sort(byPriorityThenRecency)
  return combined.slice(0, MAX_WAKE_EVENTS)
}
146  
/** Build the pending-map key `agentId::sessionId`; a missing part is an empty string. */
function wakeTargetKey(input: { agentId?: string; sessionId?: string }): string {
  const agentPart = normalizeWakeTarget(input.agentId) || ''
  const sessionPart = normalizeWakeTarget(input.sessionId) || ''
  return [agentPart, sessionPart].join('::')
}
150  
/**
 * Fold one wake input into the pending request for its target.
 *
 * - `agentId` / `sessionId`: the normalized incoming value wins, otherwise the
 *   existing one is kept; a missing target is left off the result.
 * - `requestedAt` / `retryCount`: the larger of the existing and incoming value.
 * - `events`: the incoming event is de-duplicated into the existing list.
 *
 * @param existing request already pending for the target, if any
 * @param next raw wake input to merge in
 * @returns a new WakeRequest; `existing` is not mutated
 */
export function mergeHeartbeatWakeRequest(
  existing: WakeRequest | undefined,
  next: WakeRequestInput,
): WakeRequest {
  const agentId = normalizeWakeTarget(next.agentId) || existing?.agentId
  const sessionId = normalizeWakeTarget(next.sessionId) || existing?.sessionId
  const requestedAt = Math.max(existing?.requestedAt || 0, normalizeOccurredAt(next.requestedAt))
  // Only accept a finite retry count: Math.max returns NaN if any argument is
  // NaN, and Infinity could never be exceeded, so either would stick forever.
  const incomingRetryCount = typeof next.retryCount === 'number' && Number.isFinite(next.retryCount)
    ? Math.trunc(next.retryCount)
    : 0
  const retryCount = Math.max(existing?.retryCount || 0, incomingRetryCount)
  const events = uniqueWakeEvents(existing?.events || [], normalizeWakeEvent(next))
  return {
    ...(agentId ? { agentId } : {}),
    ...(sessionId ? { sessionId } : {}),
    requestedAt,
    retryCount,
    events,
  }
}
168  
/** Merge a raw wake input into the pending entry for its target. */
function queuePendingWake(next: WakeRequestInput): void {
  const targetKey = wakeTargetKey(next)
  const merged = mergeHeartbeatWakeRequest(state.pending.get(targetKey), next)
  state.pending.set(targetKey, merged)
}
174  
/**
 * Put an already-built wake request back on the pending map, merging each of
 * its events into whatever is currently queued for the same target.
 * Nothing is stored when the request has no events and nothing is pending.
 */
function queuePendingWakeRequest(next: WakeRequest): void {
  const targetKey = wakeTargetKey(next)
  const { agentId, sessionId, requestedAt, retryCount } = next

  const merged = next.events.reduce<WakeRequest | undefined>(
    (accumulated, event) => mergeHeartbeatWakeRequest(accumulated, {
      agentId,
      sessionId,
      requestedAt,
      retryCount,
      eventId: event.eventId,
      reason: event.reason,
      source: event.source,
      resumeMessage: event.resumeMessage,
      detail: event.detail,
      occurredAt: event.occurredAt,
      priority: event.priority,
    }),
    state.pending.get(targetKey),
  )

  if (merged) state.pending.set(targetKey, merged)
}
195  
/**
 * Arm the flush timer so it fires after `delayMs` (COALESCE_MS when the delay
 * is not finite; negative delays are treated as 0).
 *
 * When a timer is already armed it is kept if it is a retry timer and this
 * call is not a 'normal' request, or if it is due no later than the new one.
 * Otherwise it is replaced by the earlier deadline.
 */
function scheduleFlush(delayMs: number, kind: WakeTimerKind = 'normal'): void {
  const requestedDelay = Number.isFinite(delayMs) ? Math.trunc(delayMs) : COALESCE_MS
  const delay = Math.max(0, requestedDelay)
  const dueAt = Date.now() + delay

  const armedTimer = state.timer
  if (armedTimer) {
    const retryAlreadyArmed = state.timerKind === 'retry' && kind !== 'normal'
    if (retryAlreadyArmed) return

    const armedFiresFirst = typeof state.timerDueAt === 'number' && state.timerDueAt <= dueAt
    if (armedFiresFirst) return

    // The new deadline is sooner: drop the armed timer before re-arming below.
    clearTimeout(armedTimer)
  }

  state.timerKind = kind
  state.timerDueAt = dueAt
  state.timer = setTimeout(() => {
    flushWakes()
  }, delay)
}
214  
/** True when at least one event reason contains "schedule" (case-insensitive). */
function isScheduleLikeWake(wake: WakeRequest): boolean {
  for (const { reason } of wake.events) {
    if (reason.toLowerCase().includes('schedule')) return true
  }
  return false
}
221  
/**
 * Render the "Wake Trigger Context" prompt section for a list of events.
 *
 * Emits a fixed header, one bullet per event (at most MAX_WAKE_EVENTS) with
 * its reason, optional source, priority and ISO timestamp, optional
 * Resume/Detail lines, and a closing HEARTBEAT_OK instruction.
 *
 * @param events trigger events to describe
 * @param nowIso timestamp for the "Triggered at" line; defaults to the current time
 * @returns the section as newline-joined text
 */
export function buildWakeTriggerContext(events: WakeEvent[], nowIso?: string): string {
  const triggeredAt = nowIso || new Date().toISOString()
  const header = [
    '## Wake Trigger Context',
    `Triggered at: ${triggeredAt}`,
    'These new events caused this immediate wake. Prioritize them over generic background polling and avoid repeating already-completed work.',
    'If the base heartbeat instructions require an exact file change or exact acknowledgment phrase, follow that exactly and do not add extra commentary.',
  ]

  const eventLines = events.slice(0, MAX_WAKE_EVENTS).flatMap((event) => {
    const tags = [`reason=${event.reason}`]
    if (event.source) tags.push(`source=${event.source}`)
    tags.push(`priority=${event.priority}`)
    tags.push(`at=${new Date(event.occurredAt).toISOString()}`)

    const rendered = [`- ${tags.join(' | ')}`]
    if (event.resumeMessage) rendered.push(`  Resume: ${event.resumeMessage}`)
    if (event.detail) rendered.push(`  Detail: ${event.detail}`)
    return rendered
  })

  const footer = 'Reply HEARTBEAT_OK only if every trigger above is already handled or truly needs no action.'
  return [...header, ...eventLines, footer].join('\n')
}
243  
/**
 * Choose the delivery mode for a wake from its event reasons (case-insensitive).
 *
 * - `tool_only` when any reason is exactly `connector-message`;
 * - `silent` when there is at least one event and every reason contains `schedule`;
 * - `default` otherwise.
 */
export function deriveHeartbeatWakeDeliveryMode(
  events: WakeEvent[],
): 'default' | 'tool_only' | 'silent' {
  let sawEvent = false
  let allScheduleLike = true

  for (const { reason } of events) {
    const lowered = reason.toLowerCase()
    // A connector message anywhere in the list decides the mode immediately.
    if (lowered === 'connector-message') return 'tool_only'
    sawEvent = true
    if (!lowered.includes('schedule')) allScheduleLike = false
  }

  return sawEvent && allScheduleLike ? 'silent' : 'default'
}
255  
/**
 * Compose the full prompt for a wake run.
 *
 * With a non-blank `basePrompt`, the trigger context is appended to it after
 * a blank line. Without one, a standalone AGENT_HEARTBEAT_WAKE prompt is
 * produced around the trigger context.
 *
 * @param input.wake wake whose events are described
 * @param input.basePrompt regular heartbeat prompt to extend, if any
 * @param input.nowIso timestamp to print; defaults to the current time
 */
export function buildHeartbeatWakePrompt(input: {
  wake: WakeRequest
  basePrompt?: string
  nowIso?: string
}): string {
  const triggerContext = buildWakeTriggerContext(input.wake.events, input.nowIso)

  const base = input.basePrompt?.trim()
  if (base) {
    return `${base}\n\n${triggerContext}`
  }

  const timeLine = `Time: ${input.nowIso || new Date().toISOString()}`
  const standalone = [
    'AGENT_HEARTBEAT_WAKE',
    timeLine,
    triggerContext,
    'Take the highest-value next step now, or reply HEARTBEAT_OK if nothing needs attention.',
  ]
  return standalone.join('\n')
}
276  
/**
 * Pick the session a wake should run in.
 *
 * An explicit `wake.sessionId` always wins; without an agent id nothing can
 * be resolved. Schedule-like wakes use the first of the agent's sessions that
 * `isMainSession` accepts. Other wakes use the agent's session with the
 * highest numeric `lastActiveAt` (non-numeric counts as 0, first wins ties).
 * When no candidate is found, the id from `ensureAgentThreadSession` is used.
 */
function resolveWakeSessionId(
  wake: WakeRequest,
  sessions: Record<string, Record<string, unknown>>,
): string | undefined {
  const { sessionId, agentId } = wake
  if (sessionId) return sessionId
  if (!agentId) return undefined

  const scheduleLike = isScheduleLikeWake(wake)
  const agentSessions = Object.values(sessions).filter((session) => session.agentId === agentId)

  if (scheduleLike) {
    const mainSession = agentSessions.find((session) => isMainSession(session))
    if (mainSession) return String(mainSession.id)
    return ensureAgentThreadSession(agentId)?.id
  }

  const mostRecent = agentSessions.reduce<{ id: string; lastActiveAt: number } | null>(
    (best, session) => {
      const lastActiveAt = typeof session.lastActiveAt === 'number' ? session.lastActiveAt : 0
      return !best || lastActiveAt > best.lastActiveAt
        ? { id: String(session.id), lastActiveAt }
        : best
    },
    null,
  )
  if (mostRecent?.id) return mostRecent.id
  return ensureAgentThreadSession(agentId)?.id
}
302  
/** Test-only export that forwards to the private `resolveWakeSessionId`. */
export function resolveWakeSessionIdForTests(
  wake: WakeRequest,
  sessions: Record<string, Record<string, unknown>>,
): string | undefined {
  const resolved = resolveWakeSessionId(wake, sessions)
  return resolved
}
309  
/**
 * Load the repository snapshots a flush needs (agents, settings, sessions).
 * Returns null, after logging a warning, when any of the loads throws.
 */
function loadWakeFlushInputs() {
  try {
    return {
      agents: listAgents(),
      settings: loadSettings(),
      sessions: listSessions() as unknown as Record<string, Record<string, unknown>>,
    }
  } catch (err: unknown) {
    log.warn('heartbeat-wake', `Wake flush could not load state: ${errorMessage(err)}`)
    return null
  }
}

/**
 * Timer callback: drain the pending map and try to start a heartbeat run for
 * each queued wake.
 *
 * A wake is requeued (retryCount + 1) and a retry flush is scheduled when its
 * session has running or queued work, when a non-heartbeat lease is active,
 * or when processing it throws. A wake is dropped when no session can be
 * resolved, the agent is missing or disabled, or heartbeats are disabled.
 *
 * If the agents/settings/sessions snapshots cannot be loaded, every drained
 * wake is requeued for a retry instead of being lost, and nothing is thrown
 * out of the timer callback.
 */
function flushWakes(): void {
  // The timer that invoked this flush has fired; clear its bookkeeping.
  state.timer = null
  state.timerDueAt = null
  state.timerKind = null
  const wakes = [...state.pending.values()]
  state.pending.clear()

  if (!wakes.length) return

  const inputs = loadWakeFlushInputs()
  if (!inputs) {
    // The pending map was already drained above, so put every wake back.
    for (const wake of wakes) {
      queuePendingWakeRequest({
        ...wake,
        retryCount: wake.retryCount + 1,
      })
    }
    if (state.pending.size > 0) {
      scheduleFlush(RETRY_MS, 'retry')
    }
    return
  }

  const { agents, settings, sessions } = inputs
  let delayedForRetry = false

  for (const wake of wakes) {
    try {
      const sessionId = resolveWakeSessionId(wake, sessions)
      if (!sessionId) continue

      // Fall back to a direct lookup when the id is not in the snapshot.
      const session = (sessions[sessionId] || getSession(sessionId)) as unknown as Record<string, unknown> | undefined
      if (!session) continue

      let execution = getSessionExecutionState(sessionId)
      const sharedNonHeartbeatBusy = hasActiveNonHeartbeatSessionLease(sessionId)
      // Runs are queued but nothing is running and no lease is held: ask the
      // run manager to repair the queue, then re-read the state if it acted.
      if (execution.hasQueued && !execution.hasRunning && !sharedNonHeartbeatBusy) {
        const repair = repairSessionRunQueue(sessionId, {
          reason: 'Recovered stale queued run before heartbeat wake',
        })
        if (repair.recoveredQueuedRuns > 0 || repair.kickedExecutionKeys > 0) {
          execution = getSessionExecutionState(sessionId)
        }
      }
      if (execution.hasRunning || execution.hasQueued || sharedNonHeartbeatBusy) {
        // Session is busy: requeue with the resolved session id and retry later.
        queuePendingWakeRequest({
          ...wake,
          sessionId,
          retryCount: wake.retryCount + 1,
        })
        delayedForRetry = true
        log.info('heartbeat-wake', `Wake delayed for busy session ${sessionId}`, {
          running: execution.hasRunning,
          queued: execution.queueLength,
          sharedNonHeartbeatBusy,
        })
        continue
      }

      const agentId = (session.agentId || wake.agentId) as string | undefined
      const agent = agentId ? agents[agentId] as unknown as Record<string, unknown> | null : null
      // Skip sessions whose agent is absent from the listAgents() snapshot
      // (for example deleted or trashed agents).
      if (agentId && !agent) continue
      if (isAgentDisabled(agent)) continue

      const cfg = heartbeatConfigForSession(session, settings, agents)
      if (!cfg.enabled) {
        log.info('heartbeat-wake', `Wake skipped for session ${sessionId}: heartbeat disabled`, {
          agentId,
        })
        continue
      }
      const rawHeartbeatFileContent = readHeartbeatFile(session)
      const heartbeatFileContent = isHeartbeatContentEffectivelyEmpty(rawHeartbeatFileContent) ? '' : rawHeartbeatFileContent
      const baseHeartbeatPrompt = buildAgentHeartbeatPrompt(session, agent, cfg.prompt, heartbeatFileContent)
      // Main sessions get the base prompt wrapped by the main-loop builder.
      const promptCore = isMainSession(session)
        ? buildMainLoopHeartbeatPrompt(session, baseHeartbeatPrompt)
        : baseHeartbeatPrompt
      const prompt = buildHeartbeatWakePrompt({
        wake,
        basePrompt: promptCore,
      })

      enqueueSessionRun({
        sessionId,
        message: prompt,
        internal: true,
        source: 'heartbeat-wake',
        mode: 'collect',
        dedupeKey: `heartbeat-wake:${sessionId}`,
        modelOverride: cfg.model || undefined,
        heartbeatConfig: {
          ackMaxChars: cfg.ackMaxChars,
          showOk: cfg.showOk,
          showAlerts: cfg.showAlerts,
          target: cfg.target,
          deliveryMode: deriveHeartbeatWakeDeliveryMode(wake.events),
        },
      })

      log.info('heartbeat-wake', `Wake fired for session ${sessionId}`, {
        reasons: wake.events.map((event: WakeEvent) => event.reason),
        retryCount: wake.retryCount,
      })
    } catch (err: unknown) {
      // Any failure while handling this wake: requeue it and retry later.
      queuePendingWakeRequest({
        ...wake,
        retryCount: wake.retryCount + 1,
      })
      delayedForRetry = true
      log.warn('heartbeat-wake', `Wake failed: ${errorMessage(err)}`)
    }
  }

  if (delayedForRetry && state.pending.size > 0) {
    scheduleFlush(RETRY_MS, 'retry')
  }
}
416  
/**
 * Request an immediate heartbeat for an agent or session.
 * The request is merged into the pending entry for its target and a flush is
 * scheduled after the coalesce window, so rapid calls share a single flush.
 */
export function requestHeartbeatNow(opts: WakeRequestInput): void {
  queuePendingWake(opts)
  // 'normal' is scheduleFlush's default timer kind.
  scheduleFlush(COALESCE_MS)
}
422  
/** Test helper: cancel any armed flush timer and discard all pending wakes. */
export function resetHeartbeatWakeStateForTests(): void {
  const { timer } = state
  if (timer) clearTimeout(timer)
  Object.assign(state, { timer: null, timerDueAt: null, timerKind: null })
  state.pending.clear()
}
430  
/** True while any wake is queued or a flush timer is armed. */
export function hasPendingHeartbeatWake(): boolean {
  if (state.pending.size > 0) return true
  return Boolean(state.timer)
}
434  
/**
 * Test helper: return copies of all pending wakes. Each request and each of
 * its events is shallow-copied, so callers cannot mutate the queued objects.
 */
export function snapshotPendingHeartbeatWakesForTests(): WakeRequest[] {
  const copies: WakeRequest[] = []
  for (const wake of state.pending.values()) {
    const events = wake.events.map((event: WakeEvent) => ({ ...event }))
    copies.push({ ...wake, events })
  }
  return copies
}