// heartbeat-wake.ts
/**
 * On-demand heartbeat wake — triggers an immediate heartbeat for an agent/session.
 * Requests are debounced with a short coalesce window, retain distinct trigger
 * events per target, and retry when the session lane is already busy.
 */

import { ensureAgentThreadSession } from '@/lib/server/agents/agent-thread-session'
import {
  buildAgentHeartbeatPrompt,
  heartbeatConfigForSession,
  isHeartbeatContentEffectivelyEmpty,
  readHeartbeatFile,
} from '@/lib/server/runtime/heartbeat-service'
import { buildMainLoopHeartbeatPrompt, isMainSession } from '@/lib/server/agents/main-agent-loop'
import { listAgents } from '@/lib/server/agents/agent-repository'
import { getSession, listSessions } from '@/lib/server/sessions/session-repository'
import { loadSettings } from '@/lib/server/settings/settings-repository'
import {
  enqueueSessionRun,
  getSessionExecutionState,
  hasActiveNonHeartbeatSessionLease,
  repairSessionRunQueue,
} from '@/lib/server/runtime/session-run-manager'
import { log } from '@/lib/server/logger'
import { isAgentDisabled } from '@/lib/server/agents/agent-availability'
import { errorMessage, hmrSingleton } from '@/lib/shared-utils'

/**
 * Caller-supplied wake request. Every field is optional; values are trimmed,
 * clamped and defaulted by the normalizers below before being queued.
 */
export interface WakeRequestInput {
  eventId?: string
  agentId?: string
  sessionId?: string
  reason?: string
  source?: string
  resumeMessage?: string
  detail?: string
  requestedAt?: number
  occurredAt?: number
  priority?: number
  retryCount?: number
}

/** A single normalized trigger event retained on a pending wake. */
export interface WakeEvent {
  eventId?: string
  reason: string
  source?: string
  resumeMessage?: string
  detail?: string
  /** Epoch milliseconds. */
  occurredAt: number
  /** Clamped to the range 0..100 by `normalizeWakeEvent`. */
  priority: number
}

/** A pending wake for one target (agent and/or session) with its merged events. */
export interface WakeRequest {
  agentId?: string
  sessionId?: string
  requestedAt: number
  retryCount: number
  events: WakeEvent[]
}

/** Debounce window applied by `requestHeartbeatNow` before flushing. */
const COALESCE_MS = 250
/** Delay before re-attempting wakes that were delayed or failed during a flush. */
const RETRY_MS = 1_000
/** Maximum number of events kept per pending wake (and rendered into the prompt). */
const MAX_WAKE_EVENTS = 6
const MAX_RESUME_CHARS = 280
const MAX_DETAIL_CHARS = 800
/** Largest absolute time value (ms) that a JavaScript `Date` can represent. */
const MAX_DATE_MS = 8.64e15
type WakeTimerKind = 'normal' | 'retry'

// Module state is created through hmrSingleton under a fixed key.
// NOTE(review): presumably this keeps one shared instance across hot reloads — confirm in shared-utils.
const state = hmrSingleton('__swarmclaw_heartbeat_wake__', () => ({
  pending: new Map<string, WakeRequest>(),
  timer: null as ReturnType<typeof setTimeout> | null,
  timerDueAt: null as number | null,
  timerKind: null as WakeTimerKind | null,
}))

/**
 * Collapse whitespace runs to single spaces, trim, and cut to `maxChars`.
 * @returns the cleaned text, or `undefined` for non-strings and blank strings.
 */
function trimText(value: unknown, maxChars: number): string | undefined {
  if (typeof value !== 'string') return undefined
  const normalized = value.replace(/\s+/g, ' ').trim()
  return normalized ? normalized.slice(0, maxChars) : undefined
}

/** Normalize a wake reason to at most 80 characters, defaulting to `'on-demand'`. */
function normalizeWakeReason(reason?: string): string {
  return trimText(reason, 80) || 'on-demand'
}

/** Normalize an agent/session identifier to at most 160 characters. */
function normalizeWakeTarget(value?: string): string | undefined {
  return trimText(value, 160)
}

/**
 * Coerce a timestamp to integer epoch milliseconds.
 * Falls back to `Date.now()` when the value is missing, non-finite, or outside
 * the range a `Date` can represent — an out-of-range value would otherwise make
 * `toISOString()` throw later when the wake prompt is rendered.
 */
function normalizeOccurredAt(value?: number): number {
  if (typeof value === 'number' && Number.isFinite(value) && Math.abs(value) <= MAX_DATE_MS) {
    return Math.trunc(value)
  }
  return Date.now()
}

/** Render epoch milliseconds as ISO-8601, or `'unknown'` when the value is not a valid date. */
function formatWakeTimestamp(epochMs: number): string {
  const date = new Date(epochMs)
  return Number.isNaN(date.getTime()) ? 'unknown' : date.toISOString()
}

/** Default priority derived from keywords in the reason; the first matching tier wins. */
function reasonPriority(reason: string): number {
  const normalized = reason.toLowerCase()
  if (/(approval|connector-message|webhook|watch_job|scheduled_wake|task-completed)/.test(normalized)) return 90
  if (/(schedule)/.test(normalized)) return 70
  if (/(comparison|manual|on-demand)/.test(normalized)) return 50
  return 40
}

/**
 * Build a normalized event from raw input.
 * Optional text fields are omitted entirely when blank; priority falls back to
 * `reasonPriority` when not given as a finite number and is clamped to 0..100.
 */
function normalizeWakeEvent(input: WakeRequestInput): WakeEvent {
  const reason = normalizeWakeReason(input.reason)
  const eventId = trimText(input.eventId, 160)
  const source = trimText(input.source, 120)
  const resumeMessage = trimText(input.resumeMessage, MAX_RESUME_CHARS)
  const detail = trimText(input.detail, MAX_DETAIL_CHARS)
  const requestedPriority = typeof input.priority === 'number' && Number.isFinite(input.priority)
    ? Math.trunc(input.priority)
    : reasonPriority(reason)
  return {
    ...(eventId ? { eventId } : {}),
    reason,
    ...(source ? { source } : {}),
    ...(resumeMessage ? { resumeMessage } : {}),
    ...(detail ? { detail } : {}),
    occurredAt: normalizeOccurredAt(input.occurredAt ?? input.requestedAt),
    priority: Math.max(0, Math.min(100, requestedPriority)),
  }
}

/**
 * Merge `incoming` into `existing` without duplicating an event.
 * Two events match when both carry an eventId and the ids are equal; otherwise
 * when reason, source, resumeMessage and detail are all equal. A match keeps the
 * higher priority and later timestamp. The result is sorted by priority, then
 * recency (both descending), and capped at `MAX_WAKE_EVENTS`.
 */
function uniqueWakeEvents(existing: WakeEvent[], incoming: WakeEvent): WakeEvent[] {
  const merged = [...existing]
  const matchIndex = merged.findIndex((candidate) => {
    if (candidate.eventId && incoming.eventId) return candidate.eventId === incoming.eventId
    return candidate.reason === incoming.reason
      && candidate.source === incoming.source
      && candidate.resumeMessage === incoming.resumeMessage
      && candidate.detail === incoming.detail
  })

  if (matchIndex >= 0) {
    const previous = merged[matchIndex]
    merged[matchIndex] = {
      ...previous,
      ...incoming,
      priority: Math.max(previous.priority, incoming.priority),
      occurredAt: Math.max(previous.occurredAt, incoming.occurredAt),
      resumeMessage: incoming.resumeMessage || previous.resumeMessage,
      detail: incoming.detail || previous.detail,
    }
  } else {
    merged.push(incoming)
  }

  merged.sort((left, right) => {
    if (right.priority !== left.priority) return right.priority - left.priority
    return right.occurredAt - left.occurredAt
  })
  return merged.slice(0, MAX_WAKE_EVENTS)
}

/** Key used in `state.pending`: `<agentId>::<sessionId>` with empty strings for missing parts. */
function wakeTargetKey(input: { agentId?: string; sessionId?: string }): string {
  return `${normalizeWakeTarget(input.agentId) || ''}::${normalizeWakeTarget(input.sessionId) || ''}`
}

/**
 * Fold a new request into an existing pending wake (or create one).
 * Target ids from `next` win over the existing ones; `requestedAt` and
 * `retryCount` keep the larger of the two values.
 *
 * @param existing - the wake already pending for this target, if any
 * @param next - the incoming raw request
 * @returns a new `WakeRequest`; neither argument is mutated
 */
export function mergeHeartbeatWakeRequest(
  existing: WakeRequest | undefined,
  next: WakeRequestInput,
): WakeRequest {
  const agentId = normalizeWakeTarget(next.agentId) || existing?.agentId
  const sessionId = normalizeWakeTarget(next.sessionId) || existing?.sessionId
  const requestedAt = Math.max(existing?.requestedAt || 0, normalizeOccurredAt(next.requestedAt))
  // Reject NaN/Infinity: `typeof NaN === 'number'` and Math.max(0, NaN) is NaN,
  // which would otherwise poison the stored retry counter.
  const incomingRetryCount = typeof next.retryCount === 'number' && Number.isFinite(next.retryCount)
    ? Math.max(0, Math.trunc(next.retryCount))
    : 0
  const retryCount = Math.max(existing?.retryCount || 0, incomingRetryCount)
  const events = uniqueWakeEvents(existing?.events || [], normalizeWakeEvent(next))
  return {
    ...(agentId ? { agentId } : {}),
    ...(sessionId ? { sessionId } : {}),
    requestedAt,
    retryCount,
    events,
  }
}

/** Merge a raw request into the pending map under its target key. */
function queuePendingWake(next: WakeRequestInput): void {
  const key = wakeTargetKey(next)
  const existing = state.pending.get(key)
  state.pending.set(key, mergeHeartbeatWakeRequest(existing, next))
}

/**
 * Re-queue an already-normalized wake by merging its events one at a time.
 * A wake with no events leaves the pending map untouched.
 */
function queuePendingWakeRequest(next: WakeRequest): void {
  const key = wakeTargetKey(next)
  let merged = state.pending.get(key)
  for (const event of next.events) {
    merged = mergeHeartbeatWakeRequest(merged, {
      agentId: next.agentId,
      sessionId: next.sessionId,
      requestedAt: next.requestedAt,
      retryCount: next.retryCount,
      eventId: event.eventId,
      reason: event.reason,
      source: event.source,
      resumeMessage: event.resumeMessage,
      detail: event.detail,
      occurredAt: event.occurredAt,
      priority: event.priority,
    })
  }
  if (merged) state.pending.set(key, merged)
}

/**
 * Arm the single flush timer.
 * An existing timer is kept when it is a retry timer and another retry is
 * requested, or when it is already due no later than the new request;
 * otherwise it is replaced by the new, earlier one.
 */
function scheduleFlush(delayMs: number, kind: WakeTimerKind = 'normal'): void {
  const delay = Math.max(0, Number.isFinite(delayMs) ? Math.trunc(delayMs) : COALESCE_MS)
  const dueAt = Date.now() + delay
  if (state.timer) {
    if (state.timerKind === 'retry' && kind !== 'normal') return
    if (typeof state.timerDueAt === 'number' && state.timerDueAt <= dueAt) return
    clearTimeout(state.timer)
    state.timer = null
    state.timerDueAt = null
    state.timerKind = null
  }

  state.timerDueAt = dueAt
  state.timerKind = kind
  state.timer = setTimeout(() => {
    flushWakes()
  }, delay)
}

/** True when any event reason contains `'schedule'` (case-insensitive). */
function isScheduleLikeWake(wake: WakeRequest): boolean {
  return wake.events.some((event) => {
    const reason = event.reason.toLowerCase()
    return reason.includes('schedule')
  })
}

/**
 * Render the "Wake Trigger Context" prompt section for up to `MAX_WAKE_EVENTS` events.
 *
 * @param events - trigger events to describe, in the order given
 * @param nowIso - timestamp to print as the trigger time; defaults to the current time
 * @returns newline-joined prompt text
 */
export function buildWakeTriggerContext(events: WakeEvent[], nowIso?: string): string {
  const lines = [
    '## Wake Trigger Context',
    `Triggered at: ${nowIso || new Date().toISOString()}`,
    'These new events caused this immediate wake. Prioritize them over generic background polling and avoid repeating already-completed work.',
    'If the base heartbeat instructions require an exact file change or exact acknowledgment phrase, follow that exactly and do not add extra commentary.',
  ]
  for (const event of events.slice(0, MAX_WAKE_EVENTS)) {
    const tags = [
      `reason=${event.reason}`,
      event.source ? `source=${event.source}` : '',
      `priority=${event.priority}`,
      // Formatted defensively: this function is exported and may receive events
      // that never went through normalizeOccurredAt.
      `at=${formatWakeTimestamp(event.occurredAt)}`,
    ].filter(Boolean).join(' | ')
    lines.push(`- ${tags}`)
    if (event.resumeMessage) lines.push(`  Resume: ${event.resumeMessage}`)
    if (event.detail) lines.push(`  Detail: ${event.detail}`)
  }
  lines.push('Reply HEARTBEAT_OK only if every trigger above is already handled or truly needs no action.')
  return lines.join('\n')
}

/**
 * Pick the delivery mode for a wake from its events.
 * - `'tool_only'` when any reason equals `connector-message` (case-insensitive)
 * - `'silent'` when there is at least one event and every reason contains `schedule`
 * - `'default'` otherwise
 */
export function deriveHeartbeatWakeDeliveryMode(
  events: WakeEvent[],
): 'default' | 'tool_only' | 'silent' {
  if (events.some((event) => event.reason.toLowerCase() === 'connector-message')) {
    return 'tool_only'
  }
  if (events.length > 0 && events.every((event) => event.reason.toLowerCase().includes('schedule'))) {
    return 'silent'
  }
  return 'default'
}

/**
 * Build the full prompt for a wake.
 * With a non-blank `basePrompt` the trigger context is appended after a blank
 * line; otherwise a standalone `AGENT_HEARTBEAT_WAKE` prompt is produced.
 */
export function buildHeartbeatWakePrompt(input: {
  wake: WakeRequest
  basePrompt?: string
  nowIso?: string
}): string {
  const triggerContext = buildWakeTriggerContext(input.wake.events, input.nowIso)
  if (input.basePrompt?.trim()) {
    return [
      input.basePrompt.trim(),
      '',
      triggerContext,
    ].join('\n')
  }
  return [
    'AGENT_HEARTBEAT_WAKE',
    `Time: ${input.nowIso || new Date().toISOString()}`,
    triggerContext,
    'Take the highest-value next step now, or reply HEARTBEAT_OK if nothing needs attention.',
  ].join('\n')
}

/**
 * Choose the session a wake should run in.
 * - An explicit `wake.sessionId` is used as-is.
 * - Without an agent id there is nothing to resolve.
 * - Schedule-like wakes use the agent's first main session found.
 * - Other wakes use the agent's session with the highest numeric `lastActiveAt`.
 * - When no session matches, falls back to `ensureAgentThreadSession(agentId)`.
 */
function resolveWakeSessionId(
  wake: WakeRequest,
  sessions: Record<string, Record<string, unknown>>,
): string | undefined {
  if (wake.sessionId) return wake.sessionId
  if (!wake.agentId) return undefined
  if (isScheduleLikeWake(wake)) {
    for (const session of Object.values(sessions)) {
      if (session.agentId !== wake.agentId) continue
      if (isMainSession(session)) return String(session.id)
    }
    return ensureAgentThreadSession(wake.agentId)?.id
  }

  let bestSession: { id: string; lastActiveAt: number } | null = null
  for (const session of Object.values(sessions)) {
    if (session.agentId !== wake.agentId) continue
    // Sessions without a numeric lastActiveAt rank lowest.
    const lastActive = typeof session.lastActiveAt === 'number' ? session.lastActiveAt : 0
    if (!bestSession || lastActive > bestSession.lastActiveAt) {
      bestSession = { id: String(session.id), lastActiveAt: lastActive }
    }
  }
  if (bestSession?.id) return bestSession.id
  return ensureAgentThreadSession(wake.agentId)?.id
}

/** Test-only access to `resolveWakeSessionId`. */
export function resolveWakeSessionIdForTests(
  wake: WakeRequest,
  sessions: Record<string, Record<string, unknown>>,
): string | undefined {
  return resolveWakeSessionId(wake, sessions)
}

/**
 * Load the agents, settings and sessions used by one flush.
 * @returns the snapshot, or `null` (after logging a warning) when any loader throws.
 */
function loadFlushSnapshot() {
  try {
    const agents = listAgents()
    const settings = loadSettings()
    const sessions = listSessions() as unknown as Record<string, Record<string, unknown>>
    return { agents, settings, sessions }
  } catch (err: unknown) {
    log.warn('heartbeat-wake', `Wake flush failed to load state: ${errorMessage(err)}`)
    return null
  }
}

/**
 * Drain the pending map and enqueue a heartbeat run for each wake.
 * Wakes whose session is busy, or whose processing throws, are re-queued with
 * an incremented retry count and a retry flush is scheduled after `RETRY_MS`.
 *
 * NOTE(review): retries are unbounded — a wake that stays busy or keeps failing
 * is re-attempted every RETRY_MS indefinitely. Consider capping on retryCount.
 */
function flushWakes(): void {
  state.timer = null
  state.timerDueAt = null
  state.timerKind = null
  const wakes = [...state.pending.values()]
  state.pending.clear()

  if (!wakes.length) return

  // This runs inside a timer callback with the pending map already cleared, so
  // a loader failure must neither drop the wakes nor escape as an uncaught error.
  const snapshot = loadFlushSnapshot()
  if (!snapshot) {
    for (const wake of wakes) {
      queuePendingWakeRequest({
        ...wake,
        retryCount: wake.retryCount + 1,
      })
    }
    if (state.pending.size > 0) scheduleFlush(RETRY_MS, 'retry')
    return
  }
  const { agents, settings, sessions } = snapshot
  let delayedForRetry = false

  for (const wake of wakes) {
    try {
      const sessionId = resolveWakeSessionId(wake, sessions)
      if (!sessionId) continue

      const session = (sessions[sessionId] || getSession(sessionId)) as unknown as Record<string, unknown> | undefined
      if (!session) continue

      let execution = getSessionExecutionState(sessionId)
      const sharedNonHeartbeatBusy = hasActiveNonHeartbeatSessionLease(sessionId)
      // Queued-but-not-running with no non-heartbeat lease: attempt a queue
      // repair, then re-read the state if the repair reported any change.
      if (execution.hasQueued && !execution.hasRunning && !sharedNonHeartbeatBusy) {
        const repair = repairSessionRunQueue(sessionId, {
          reason: 'Recovered stale queued run before heartbeat wake',
        })
        if (repair.recoveredQueuedRuns > 0 || repair.kickedExecutionKeys > 0) {
          execution = getSessionExecutionState(sessionId)
        }
      }
      if (execution.hasRunning || execution.hasQueued || sharedNonHeartbeatBusy) {
        // Pin the resolved session id so the retry targets the same session.
        queuePendingWakeRequest({
          ...wake,
          sessionId,
          retryCount: wake.retryCount + 1,
        })
        delayedForRetry = true
        log.info('heartbeat-wake', `Wake delayed for busy session ${sessionId}`, {
          running: execution.hasRunning,
          queued: execution.queueLength,
          sharedNonHeartbeatBusy,
        })
        continue
      }

      const agentId = (session.agentId || wake.agentId) as string | undefined
      const agent = agentId ? agents[agentId] as unknown as Record<string, unknown> | null : null
      // Skip sessions whose agent was deleted or trashed (not in listAgents())
      if (agentId && !agent) continue
      if (isAgentDisabled(agent)) continue

      const cfg = heartbeatConfigForSession(session, settings, agents)
      if (!cfg.enabled) {
        log.info('heartbeat-wake', `Wake skipped for session ${sessionId}: heartbeat disabled`, {
          agentId,
        })
        continue
      }
      const rawHeartbeatFileContent = readHeartbeatFile(session)
      const heartbeatFileContent = isHeartbeatContentEffectivelyEmpty(rawHeartbeatFileContent) ? '' : rawHeartbeatFileContent
      const baseHeartbeatPrompt = buildAgentHeartbeatPrompt(session, agent, cfg.prompt, heartbeatFileContent)
      const promptCore = isMainSession(session)
        ? buildMainLoopHeartbeatPrompt(session, baseHeartbeatPrompt)
        : baseHeartbeatPrompt
      const prompt = buildHeartbeatWakePrompt({
        wake,
        basePrompt: promptCore,
      })

      enqueueSessionRun({
        sessionId,
        message: prompt,
        internal: true,
        source: 'heartbeat-wake',
        mode: 'collect',
        dedupeKey: `heartbeat-wake:${sessionId}`,
        modelOverride: cfg.model || undefined,
        heartbeatConfig: {
          ackMaxChars: cfg.ackMaxChars,
          showOk: cfg.showOk,
          showAlerts: cfg.showAlerts,
          target: cfg.target,
          deliveryMode: deriveHeartbeatWakeDeliveryMode(wake.events),
        },
      })

      log.info('heartbeat-wake', `Wake fired for session ${sessionId}`, {
        reasons: wake.events.map((event: WakeEvent) => event.reason),
        retryCount: wake.retryCount,
      })
    } catch (err: unknown) {
      queuePendingWakeRequest({
        ...wake,
        retryCount: wake.retryCount + 1,
      })
      delayedForRetry = true
      log.warn('heartbeat-wake', `Wake failed: ${errorMessage(err)}`)
    }
  }

  if (delayedForRetry && state.pending.size > 0) {
    scheduleFlush(RETRY_MS, 'retry')
  }
}

/** Queue a heartbeat wake. Multiple rapid calls are coalesced into a single flush. */
export function requestHeartbeatNow(opts: WakeRequestInput): void {
  queuePendingWake(opts)
  scheduleFlush(COALESCE_MS, 'normal')
}

/** Test-only: cancel the flush timer and discard all pending wakes. */
export function resetHeartbeatWakeStateForTests(): void {
  if (state.timer) clearTimeout(state.timer)
  state.timer = null
  state.timerDueAt = null
  state.timerKind = null
  state.pending.clear()
}

/** True while any wake is pending or a flush timer is armed. */
export function hasPendingHeartbeatWake(): boolean {
  return state.pending.size > 0 || Boolean(state.timer)
}

/** Test-only: copy of the pending wakes with each event object cloned. */
export function snapshotPendingHeartbeatWakesForTests(): WakeRequest[] {
  return [...state.pending.values()].map((wake) => ({
    ...wake,
    events: wake.events.map((event: WakeEvent) => ({ ...event })),
  }))
}