core.ts
import { log } from '@/lib/server/logger'
import { loadAgents } from '@/lib/server/agents/agent-repository'
import { loadConnectors, saveConnectors } from '@/lib/server/connectors/connector-repository'
import { decryptKey, loadCredentials } from '@/lib/server/credentials/credential-repository'
import { loadQueue } from '@/lib/server/runtime/queue-repository'
import { pruneExpiredLocks, readRuntimeLock, releaseRuntimeLock, renewRuntimeLock, tryAcquireRuntimeLock } from '@/lib/server/runtime/runtime-lock-repository'
import { isOwnerProcessDead } from '@/lib/server/daemon/lease-owner'
import { loadSchedules } from '@/lib/server/schedules/schedule-repository'
import { loadSessions } from '@/lib/server/sessions/session-repository'
import { loadSettings } from '@/lib/server/settings/settings-repository'
import { pruneOldUsage } from '@/lib/server/usage/usage-repository'
import { appendWebhookLog, deleteWebhookRetry, loadWebhookRetryQueue, loadWebhooks, upsertWebhookRetry } from '@/lib/server/webhooks/webhook-repository'
import { notify } from '@/lib/server/ws-hub'
import { processNext, cleanupFinishedTaskSessions, validateCompletedTasksQueue, recoverStalledRunningTasks, resumeQueue, promoteDeferred } from '@/lib/server/runtime/queue'
import { startScheduler, stopScheduler } from '@/lib/server/runtime/scheduler'
import { sweepOrphanedBrowsers, getActiveBrowserCount } from '@/lib/server/session-tools'
import {
  autoStartConnectors,
  stopAllConnectors,
  startConnector,
  getConnectorStatus,
  checkConnectorHealth,
  createConnectorReconnectState,
  advanceConnectorReconnectState,
  clearReconnectState,
  getAllReconnectStates,
  getReconnectState,
  setReconnectState,
} from '@/lib/server/connectors/manager'
import { startConnectorOutboxWorker, stopConnectorOutboxWorker } from '@/lib/server/connectors/outbox'
import { pruneConnectorTrackingState } from '@/lib/server/connectors/runtime-state'
import { startHeartbeatService, stopHeartbeatService, getHeartbeatServiceStatus, pruneHeartbeatState, pruneOrchestratorState } from '@/lib/server/runtime/heartbeat-service'
import { hasOpenClawAgents, ensureGatewayConnected, disconnectAutoGateways, getGateway } from '@/lib/server/openclaw/gateway'
import { enqueueSessionRun, sweepStuckRuns } from '@/lib/server/runtime/session-run-manager'
import { pruneOldRuns } from '@/lib/server/runtime/run-ledger'
import { getEnabledCapabilitySelection } from '@/lib/capability-selection'
import { WORKSPACE_DIR } from '@/lib/server/data-dir'
import { DEFAULT_HEARTBEAT_INTERVAL_SEC } from '@/lib/runtime/heartbeat-defaults'
import { genId } from '@/lib/id'
import { isAgentDisabled } from '@/lib/server/agents/agent-availability'
import { errorMessage, hmrSingleton } from '@/lib/shared-utils'
import path from 'node:path'
import type { Connector, Session, WebhookRetryEntry } from '@/types'
import { createNotification } from '@/lib/server/create-notification'
import { pingProvider, OPENAI_COMPATIBLE_DEFAULTS, restoreProviderHealthState } from '@/lib/server/provider-health'
import { runIntegrityMonitor } from '@/lib/server/integrity-monitor'
import { notifyOrchestrators } from '@/lib/server/runtime/orchestrator-events'
import { recoverStaleDelegationJobs } from '@/lib/server/agents/delegation-jobs'
import { restoreSwarmRegistry } from '@/lib/server/agents/subagent-swarm'
import { cleanupFinishedSubagents } from '@/lib/server/agents/subagent-runtime'
import { pruneMainLoopState } from '@/lib/server/agents/main-agent-loop'
import { pruneSystemEventQueues, pruneOrchestratorEventQueues } from '@/lib/server/runtime/system-events'
import { checkSwarmTimeouts, ensureProtocolEngineRecovered } from '@/lib/server/protocols/protocol-service'
import { sweepManagedProcesses, reapOrphanedSandboxContainers } from '@/lib/server/runtime/process-manager'
import { drainIdleWindowCallbacks } from '@/lib/server/runtime/idle-window'
import {
  buildSessionHeartbeatHealthDedupKey,
  daemonAutostartEnvEnabled,
  isDaemonBackgroundServicesEnabled,
  parseCronToMs,
  parseHeartbeatIntervalSec,
  shouldNotifyProviderReachabilityIssue,
  shouldSuppressSessionHeartbeatHealthAlert,
  shouldSuppressSyntheticAgentHealthAlert,
} from '@/lib/server/runtime/daemon-policy'
import { loadEstopState } from '@/lib/server/runtime/estop'
import { classifyRuntimeFailure, recordSupervisorIncident } from '@/lib/server/autonomy/supervisor-reflection'
import { getMemoryDb } from '@/lib/server/memory/memory-db'
import { clearLogsByAge } from '@/lib/server/execution-log'

const TAG = 'daemon-state'

const QUEUE_CHECK_INTERVAL = 30_000 // 30 seconds
const BROWSER_SWEEP_INTERVAL = 60_000 // 60 seconds
const BROWSER_MAX_AGE = 10 * 60 * 1000 // 10 minutes idle = orphaned
const HEALTH_CHECK_INTERVAL = 120_000 // 2 minutes
const CONNECTOR_HEALTH_CHECK_INTERVAL = 15_000 // 15 seconds
const MEMORY_CONSOLIDATION_INTERVAL = 6 * 3600_000 // 6 hours
const MEMORY_CONSOLIDATION_INITIAL_DELAY = 60_000 // 1 minute after daemon start
const STALE_MULTIPLIER = 4 // session is stale after N × heartbeat interval
const STALE_MIN_MS = 4 * 60 * 1000 // minimum 4 minutes regardless of interval
const STALE_AUTO_DISABLE_MULTIPLIER = 16 // auto-disable after much longer sustained staleness
const STALE_AUTO_DISABLE_MIN_MS = 45 * 60 * 1000 // never auto-disable before 45 minutes
const CONNECTOR_RESTART_BASE_MS = 30_000
const CONNECTOR_RESTART_MAX_MS = 15 * 60 * 1000
const MAX_WAKE_ATTEMPTS = 3
const QUEUE_PROCESS_TIMEOUT = 10 * 60_000 // 10 minutes
const SHUTDOWN_TIMEOUT_MS = 15_000
const PROVIDER_PING_CB_THRESHOLD = 3 // trips after 3 consecutive failures
const PROVIDER_PING_CB_BASE_MS = 300_000 // 5 min initial cooldown
const PROVIDER_PING_CB_MAX_MS = 1_800_000 // 30 min max cooldown
const DAEMON_RUNTIME_LOCK_NAME = 'daemon-primary'
const DAEMON_RUNTIME_LOCK_TTL_MS = 120_000
const DAEMON_RUNTIME_LOCK_RENEW_MS = 30_000
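// Worked example of the provider-ping circuit breaker schedule implied by the
// constants above: the breaker trips on the 3rd consecutive failure with a 5 min
// cooldown, then doubles per further failure (10 min, 20 min) until it caps at
// 30 min. A single successful ping clears the breaker entirely.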
export {
  buildSessionHeartbeatHealthDedupKey,
  isDaemonBackgroundServicesEnabled,
  shouldNotifyProviderReachabilityIssue,
  shouldSuppressSessionHeartbeatHealthAlert,
  shouldSuppressSyntheticAgentHealthAlert,
}

// Store daemon state on globalThis to survive HMR reloads
interface DaemonState {
  queueIntervalId: ReturnType<typeof setInterval> | null
  browserSweepId: ReturnType<typeof setInterval> | null
  healthIntervalId: ReturnType<typeof setInterval> | null
  connectorHealthIntervalId: ReturnType<typeof setInterval> | null
  memoryConsolidationTimeoutId: ReturnType<typeof setTimeout> | null
  memoryConsolidationIntervalId: ReturnType<typeof setInterval> | null
  evalSchedulerIntervalId: ReturnType<typeof setInterval> | null
  swarmTimeoutIntervalId: ReturnType<typeof setInterval> | null
  /** Session IDs we've already alerted as stale (alert-once semantics). */
  staleSessionIds: Set<string>
  /** OpenClaw gateway agent IDs currently considered down. */
  openclawDownAgentIds: Set<string>
  /** Per-agent auto-repair state for OpenClaw gateways. */
  openclawRepairState: Map<string, { attempts: number; lastAttemptAt: number; cooldownUntil: number }>
  lastIntegrityCheckAt: number | null
  lastIntegrityDriftCount: number
  manualStopRequested: boolean
  running: boolean
  lastProcessedAt: number | null
  healthCheckRunning: boolean
  connectorHealthCheckRunning: boolean
  shuttingDown: boolean
  providerPingCircuitBreaker: Map<string, { consecutiveFailures: number; skipUntil: number }>
  lockRenewIntervalId: ReturnType<typeof setInterval> | null
  leaseRetryTimeoutId: ReturnType<typeof setTimeout> | null
  primaryLeaseHeld: boolean
}

const ds: DaemonState = hmrSingleton<DaemonState>('__swarmclaw_daemon__', () => ({
  queueIntervalId: null,
  browserSweepId: null,
  healthIntervalId: null,
  connectorHealthIntervalId: null,
  memoryConsolidationTimeoutId: null,
  memoryConsolidationIntervalId: null,
  evalSchedulerIntervalId: null,
  swarmTimeoutIntervalId: null,
  staleSessionIds: new Set<string>(),
  openclawDownAgentIds: new Set<string>(),
  openclawRepairState: new Map<string, { attempts: number; lastAttemptAt: number; cooldownUntil: number }>(),
  lastIntegrityCheckAt: null,
  lastIntegrityDriftCount: 0,
  manualStopRequested: false,
  running: false,
  lastProcessedAt: null,
  healthCheckRunning: false,
  connectorHealthCheckRunning: false,
  shuttingDown: false,
  providerPingCircuitBreaker: new Map<string, { consecutiveFailures: number; skipUntil: number }>(),
  lockRenewIntervalId: null,
  leaseRetryTimeoutId: null,
  primaryLeaseHeld: false,
}))

const daemonLockOwner = hmrSingleton<string>(
  '__swarmclaw_daemon_lock_owner__',
  () => `pid:${process.pid}:${genId(8)}`,
)
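// `hmrSingleton` is assumed to behave roughly like this sketch: stash the value
// on `globalThis` under the given key so dev-mode hot reloads reuse the same
// object instead of re-initializing it (illustration only, not the actual
// implementation in '@/lib/shared-utils'):
//
//   function hmrSingleton<T>(key: string, init: () => T): T {
//     const g = globalThis as Record<string, unknown>
//     if (!(key in g)) g[key] = init()
//     return g[key] as T
//   }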
// Backfill fields for hot-reloaded daemon state objects from older code versions.
if (!ds.staleSessionIds) ds.staleSessionIds = new Set<string>()
if (!ds.openclawDownAgentIds) ds.openclawDownAgentIds = new Set<string>()
if (!ds.openclawRepairState) ds.openclawRepairState = new Map<string, { attempts: number; lastAttemptAt: number; cooldownUntil: number }>()
if (ds.lastIntegrityCheckAt === undefined) ds.lastIntegrityCheckAt = null
if (ds.lastIntegrityDriftCount === undefined) ds.lastIntegrityDriftCount = 0
// Migrate from old issueLastAlertAt map if present (HMR across code versions)
// eslint-disable-next-line @typescript-eslint/no-explicit-any
if ((ds as any).issueLastAlertAt) delete (ds as any).issueLastAlertAt
if (ds.healthIntervalId === undefined) ds.healthIntervalId = null
if (ds.connectorHealthIntervalId === undefined) ds.connectorHealthIntervalId = null
if (ds.manualStopRequested === undefined) ds.manualStopRequested = false
if (ds.memoryConsolidationTimeoutId === undefined) ds.memoryConsolidationTimeoutId = null
if (ds.memoryConsolidationIntervalId === undefined) ds.memoryConsolidationIntervalId = null
if (ds.evalSchedulerIntervalId === undefined) ds.evalSchedulerIntervalId = null
if (ds.swarmTimeoutIntervalId === undefined) ds.swarmTimeoutIntervalId = null
if (ds.healthCheckRunning === undefined) ds.healthCheckRunning = false
if (ds.connectorHealthCheckRunning === undefined) ds.connectorHealthCheckRunning = false
if (ds.shuttingDown === undefined) ds.shuttingDown = false
if (!ds.providerPingCircuitBreaker) ds.providerPingCircuitBreaker = new Map<string, { consecutiveFailures: number; skipUntil: number }>()
if (ds.lockRenewIntervalId === undefined) ds.lockRenewIntervalId = null
if (ds.leaseRetryTimeoutId === undefined) ds.leaseRetryTimeoutId = null
if (ds.primaryLeaseHeld === undefined) ds.primaryLeaseHeld = false

function stopDaemonLeaseRenewal(opts?: { release?: boolean }) {
  if (ds.lockRenewIntervalId) {
    clearInterval(ds.lockRenewIntervalId)
    ds.lockRenewIntervalId = null
  }
  if (opts?.release !== false && ds.primaryLeaseHeld) {
    try {
      releaseRuntimeLock(DAEMON_RUNTIME_LOCK_NAME, daemonLockOwner)
    } catch {
      // Best effort during shutdown or HMR.
    }
  }
  if (opts?.release !== false) ds.primaryLeaseHeld = false
}
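// Lease timing: the lease TTL is 120s and renewal runs every 30s, so the TTL
// spans four renewal periods and the holder can survive a couple of failed
// renewals. A crashed owner blocks takeover for at most ~120s unless the
// dead-owner reclaim path in acquireDaemonLease() short-circuits the wait.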
function startDaemonLeaseRenewal() {
  if (!ds.primaryLeaseHeld || ds.lockRenewIntervalId) return
  ds.lockRenewIntervalId = setInterval(() => {
    if (!ds.running || !ds.primaryLeaseHeld) return
    let renewed = false
    try {
      renewed = renewRuntimeLock(DAEMON_RUNTIME_LOCK_NAME, daemonLockOwner, DAEMON_RUNTIME_LOCK_TTL_MS)
    } catch (err: unknown) {
      log.warn(TAG, `[daemon] Failed to renew daemon lease: ${errorMessage(err)}`)
    }
    if (renewed) return
    ds.primaryLeaseHeld = false
    stopDaemonLeaseRenewal({ release: false })
    log.warn(TAG, '[daemon] Lost cross-process daemon lease; stopping local daemon instance')
    void stopDaemon({ source: 'lease-lost' })
  }, DAEMON_RUNTIME_LOCK_RENEW_MS)
}

function acquireDaemonLease(source: string): boolean {
  if (ds.primaryLeaseHeld) {
    startDaemonLeaseRenewal()
    return true
  }
  let acquired = false
  try {
    acquired = tryAcquireRuntimeLock(DAEMON_RUNTIME_LOCK_NAME, daemonLockOwner, DAEMON_RUNTIME_LOCK_TTL_MS)
  } catch (err: unknown) {
    log.warn(TAG, `[daemon] Failed to acquire daemon lease (source=${source}): ${errorMessage(err)}`)
    return false
  }
  if (!acquired) {
    let owner = 'another process'
    let expiresAt: number | null = null
    try {
      const lease = readRuntimeLock(DAEMON_RUNTIME_LOCK_NAME)
      if (lease) {
        owner = lease.owner || owner
        expiresAt = lease.expiresAt
      }
    } catch {
      // Best-effort diagnostics only.
    }

    // Stale-lease recovery: when a previous container / process crashed
    // without releasing the lease, the new instance would otherwise wait
    // up to the full TTL (DAEMON_RUNTIME_LOCK_TTL_MS) before being able
    // to start the daemon. If the recorded owner pid is local to this
    // host AND is no longer alive, reclaim the lease immediately and
    // retry. Conservative: any uncertainty (different host, malformed
    // owner, kill probe failed for an unexpected reason) skips the
    // reclaim path. Reported as issue #41 (Bug 2).
    if (isOwnerProcessDead(owner)) {
      try {
        releaseRuntimeLock(DAEMON_RUNTIME_LOCK_NAME, owner)
        log.info(TAG, `[daemon] Reclaimed stale daemon-primary lease from dead owner ${owner}`)
        let retried = false
        try {
          retried = tryAcquireRuntimeLock(DAEMON_RUNTIME_LOCK_NAME, daemonLockOwner, DAEMON_RUNTIME_LOCK_TTL_MS)
        } catch (err: unknown) {
          log.warn(TAG, `[daemon] Reclaim retry failed (source=${source}): ${errorMessage(err)}`)
        }
        if (retried) {
          ds.primaryLeaseHeld = true
          startDaemonLeaseRenewal()
          return true
        }
      } catch (err: unknown) {
        log.warn(TAG, `[daemon] Failed to release stale lease (source=${source}): ${errorMessage(err)}`)
      }
    }

    log.info(TAG, `[daemon] Skipping start (source=${source}); lease held by ${owner}`)

    // Schedule one deferred retry slightly past the lease's expiry so
    // the daemon comes up automatically once the prior owner's TTL has
    // elapsed, instead of waiting for the next API call to nudge it.
    if (expiresAt !== null) {
      const delayMs = Math.max(1_000, expiresAt - Date.now() + 1_000)
      if (ds.leaseRetryTimeoutId) clearTimeout(ds.leaseRetryTimeoutId)
      ds.leaseRetryTimeoutId = setTimeout(() => {
        ds.leaseRetryTimeoutId = null
        if (ds.running || ds.primaryLeaseHeld) return
        ensureDaemonStarted(`${source}:lease-retry`)
      }, delayMs)
      ds.leaseRetryTimeoutId.unref?.()
    }
    return false
  }
  ds.primaryLeaseHeld = true
  startDaemonLeaseRenewal()
  return true
}

export function ensureDaemonStarted(source = 'unknown'): boolean {
  if (ds.running) return false
  if (!daemonAutostartEnvEnabled()) return false
  if (ds.manualStopRequested) return false
  if (loadEstopState().level !== 'none') return false
  return startDaemon({ source, manualStart: false })
}
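// Hypothetical call site: API handlers can cheaply nudge the daemon on first
// contact, e.g. `ensureDaemonStarted('api:sessions')`; the guards above make
// repeated calls no-ops once the daemon runs or autostart/estop forbids it.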
export function startDaemon(options?: { source?: string; manualStart?: boolean }): boolean {
  const source = options?.source || 'unknown'
  const manualStart = options?.manualStart === true
  if (manualStart) ds.manualStopRequested = false
  const estop = loadEstopState()
  if (estop.level !== 'none') {
    notify('daemon')
    log.warn(TAG, `[daemon] Start blocked by estop (level=${estop.level}, source=${source})`)
    return false
  }

  if (ds.running) {
    // In dev/HMR, daemon can already be flagged running while new interval types
    // (for example health monitor) were introduced in newer code.
    startDaemonLeaseRenewal()
    startQueueProcessor()
    startBrowserSweep()
    startHeartbeatService()
    startMemoryConsolidation()
    startSwarmTimeoutChecker()
    syncDaemonBackgroundServices({ runConnectorHealthCheckImmediately: false })
    return false
  }
  if (!acquireDaemonLease(source)) {
    notify('daemon')
    return false
  }
  ds.running = true
  notify('daemon')
  log.info(TAG, `[daemon] Starting daemon (source=${source}, scheduler + queue processor + heartbeat)`)

  try {
    validateCompletedTasksQueue()
    cleanupFinishedTaskSessions()
    recoverStaleDelegationJobs({ fullRestart: true })
    ensureProtocolEngineRecovered()
    restoreProviderHealthState()
    try {
      const lost = restoreSwarmRegistry()
      if (lost > 0) log.info(TAG, `[daemon] Marked ${lost} in-flight swarm(s) as lost after restart`)
    } catch { /* best-effort */ }
    resumeQueue()
    startScheduler()
    startQueueProcessor()
    startBrowserSweep()
    startHeartbeatService()
    startMemoryConsolidation()
    startSwarmTimeoutChecker()
    syncDaemonBackgroundServices({ runConnectorHealthCheckImmediately: false })
  } catch (err: unknown) {
    ds.running = false
    stopDaemonLeaseRenewal()
    notify('daemon')
    log.error(TAG, '[daemon] Failed to start:', errorMessage(err))
    throw err
  }

  if (isDaemonBackgroundServicesEnabled()) {
    // Auto-start enabled connectors only when the full background stack is enabled.
    autoStartConnectors().catch((err: unknown) => {
      log.error(TAG, '[daemon] Error auto-starting connectors:', errorMessage(err))
    })
  }
  return true
}

export async function stopDaemon(options?: { source?: string; manualStop?: boolean }) {
  const source = options?.source || 'unknown'
  if (options?.manualStop === true) ds.manualStopRequested = true
  if (!ds.running) {
    stopDaemonLeaseRenewal()
    return
  }
  ds.running = false
  ds.shuttingDown = true
  notify('daemon')
  log.info(TAG, `[daemon] Stopping daemon (source=${source})`)

  stopScheduler()
  stopQueueProcessor()
  stopBrowserSweep()
  stopHealthMonitor()
  stopConnectorHealthMonitor()
  stopConnectorOutboxWorker()
  stopHeartbeatService()
  stopMemoryConsolidation()
  stopSwarmTimeoutChecker()
  stopEvalScheduler()
  try {
    await Promise.race([
      stopAllConnectors({ disable: false }),
      new Promise<void>((_, reject) =>
        setTimeout(() => reject(new Error('Connector shutdown timed out')), SHUTDOWN_TIMEOUT_MS)
      ),
    ])
  } catch (err: unknown) {
    log.warn(TAG, `[daemon] Connector shutdown issue: ${errorMessage(err)}`)
  } finally {
    stopDaemonLeaseRenewal()
    ds.shuttingDown = false
  }
}

function startBrowserSweep() {
  if (ds.browserSweepId) return
  ds.browserSweepId = setInterval(() => {
    const count = getActiveBrowserCount()
    if (count > 0) {
      const cleaned = sweepOrphanedBrowsers(BROWSER_MAX_AGE)
      if (cleaned > 0) {
        log.info(TAG, `[daemon] Cleaned ${cleaned} orphaned browser(s), ${getActiveBrowserCount()} still active`)
      }
    }
  }, BROWSER_SWEEP_INTERVAL)
}

function stopBrowserSweep() {
  if (ds.browserSweepId) {
    clearInterval(ds.browserSweepId)
    ds.browserSweepId = null
  }
  // Kill all remaining browsers on shutdown
  sweepOrphanedBrowsers(0)
}

export async function syncOpenClawGatewayLifecycle() {
  if (!hasOpenClawAgents()) {
    disconnectAutoGateways()
    return
  }
  if (!getGateway()?.connected) {
    await ensureGatewayConnected()
  }
}

function startQueueProcessor() {
  if (ds.queueIntervalId) return
  ds.queueIntervalId = setInterval(async () => {
    if (!ds.running) return
    const queue = loadQueue()
    if (queue.length > 0) {
      log.info(TAG, `[daemon] Processing ${queue.length} queued task(s)`)
      try {
        await Promise.race([
          processNext(),
          new Promise<void>((_, reject) =>
            setTimeout(() => reject(new Error('Queue processing timed out')), QUEUE_PROCESS_TIMEOUT)
          ),
        ])
      } catch (err: unknown) {
        log.error(TAG, `[daemon] Queue processing error/timeout: ${errorMessage(err)}`)
      }
      ds.lastProcessedAt = Date.now()
    }
    if (!isDaemonBackgroundServicesEnabled()) return
    // OpenClaw gateway lifecycle: lazy connect for active OpenClaw agents, stop auto-managed reconnects when none remain.
    try {
      await syncOpenClawGatewayLifecycle()
    } catch { /* gateway errors are non-fatal */ }
  }, QUEUE_CHECK_INTERVAL)
}

function stopQueueProcessor() {
  if (ds.queueIntervalId) {
    clearInterval(ds.queueIntervalId)
    ds.queueIntervalId = null
  }
}
async function sendHealthAlert(input: string | {
  text: string
  dedupKey?: string
  entityType?: string
  entityId?: string
}) {
  const payload = typeof input === 'string' ? { text: input } : input
  const text = payload.text
  log.warn(TAG, `[health] ${text}`)
  createNotification({
    type: 'warning',
    title: 'SwarmClaw health alert',
    message: text,
    dedupKey: payload.dedupKey || `health-alert:${text}`,
    entityType: payload.entityType,
    entityId: payload.entityId,
    dispatchExternally: false,
  })
}

async function runConnectorHealthChecks(now: number) {
  // First, collapse dead runtime instances into persisted error state so the
  // daemon can own the restart cadence and backoff policy.
  try {
    await checkConnectorHealth()
  } catch (err: unknown) {
    log.error(TAG, '[health] Connector isAlive check failed:', errorMessage(err))
  }

  const connectors = loadConnectors()
  for (const connector of Object.values(connectors) as Connector[]) {
    if (!connector?.id || typeof connector.id !== 'string') continue
    if (connector.isEnabled !== true) {
      clearReconnectState(connector.id)
      continue
    }

    const runtimeStatus = getConnectorStatus(connector.id)
    if (runtimeStatus === 'running') {
      clearReconnectState(connector.id)
      continue
    }

    const current = getReconnectState(connector.id)
      ?? createConnectorReconnectState(
        { error: typeof connector.lastError === 'string' ? connector.lastError : '' },
        { initialBackoffMs: CONNECTOR_RESTART_BASE_MS },
      )

    if (current.exhausted) {
      continue
    }

    if (current.nextRetryAt > now) continue

    // Notify on first detection of a down connector
    if (current.attempts === 0) {
      createNotification({
        type: 'warning',
        title: `Connector "${connector.name}" is down`,
        message: 'Auto-restart in progress.',
        dedupKey: `connector-down:${connector.id}`,
        entityType: 'connector',
        entityId: connector.id,
      })
    }

    try {
      await startConnector(connector.id)
      clearReconnectState(connector.id)
      await sendHealthAlert(`Connector "${connector.name}" (${connector.platform}) was down and has been auto-restarted.`)
    } catch (err: unknown) {
      const message = errorMessage(err)
      const next = advanceConnectorReconnectState(current, message, now, {
        initialBackoffMs: CONNECTOR_RESTART_BASE_MS,
        maxBackoffMs: CONNECTOR_RESTART_MAX_MS,
        maxAttempts: MAX_WAKE_ATTEMPTS,
      })
      setReconnectState(connector.id, next)
      if (next.exhausted) {
        log.warn(TAG, `[health] Connector "${connector.name}" exceeded ${MAX_WAKE_ATTEMPTS} auto-restart attempts — giving up until the server restarts or the user retries manually`)
        connector.status = 'error'
        connector.lastError = `Auto-restart gave up after ${MAX_WAKE_ATTEMPTS} attempts: ${message}`
        connector.updatedAt = Date.now()
        connectors[connector.id] = connector
        saveConnectors(connectors)
        notify('connectors')
        notifyOrchestrators(`Connector ${connector.name || connector.id} status: error — auto-restart exhausted after ${MAX_WAKE_ATTEMPTS} attempts`, `connector-status:${connector.id}`)
        createNotification({
          type: 'error',
          title: `Connector "${connector.name}" failed`,
          message: `Auto-restart gave up after ${MAX_WAKE_ATTEMPTS} attempts.`,
          dedupKey: `connector-gave-up:${connector.id}`,
          entityType: 'connector',
          entityId: connector.id,
        })
      } else {
        log.warn(TAG, `[health] Connector auto-restart failed for ${connector.name} (attempt ${next.attempts}/${MAX_WAKE_ATTEMPTS}): ${message}`)
      }
    }
  }

  // Purge restart state for connectors that have been deleted from storage or disabled
  for (const id of Object.keys(getAllReconnectStates())) {
    if (!connectors[id] || connectors[id]?.isEnabled !== true) clearReconnectState(id)
  }
}
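// Restart cadence sketch, assuming advanceConnectorReconnectState doubles the
// delay from initialBackoffMs toward maxBackoffMs: a failing connector is
// retried after roughly 30s, then 60s, then 120s; once MAX_WAKE_ATTEMPTS (3)
// restarts have failed it is marked exhausted and left alone until a server
// restart or a manual retry.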
async function processWebhookRetries() {
  const retryQueue = loadWebhookRetryQueue()
  const now = Date.now()
  const dueEntries: WebhookRetryEntry[] = []

  for (const raw of Object.values(retryQueue)) {
    const entry = raw as WebhookRetryEntry
    if (entry.deadLettered) continue
    if (entry.nextRetryAt > now) continue
    dueEntries.push(entry)
  }

  if (dueEntries.length === 0) return

  const webhooks = loadWebhooks()
  const agents = loadAgents()
  const sessions = loadSessions()

  for (const entry of dueEntries) {
    const webhook = webhooks[entry.webhookId] as unknown as Record<string, unknown> | undefined
    if (!webhook) {
      // Webhook deleted — drop the retry
      deleteWebhookRetry(entry.id)
      continue
    }

    const agentId = typeof webhook.agentId === 'string' ? webhook.agentId : ''
    const agent = agentId ? (agents[agentId] as unknown as Record<string, unknown> | undefined) : null
    if (!agent) {
      entry.deadLettered = true
      upsertWebhookRetry(entry.id, entry)
      log.warn(TAG, `[webhook-retry] Dead-lettered ${entry.id}: agent not found for webhook ${entry.webhookId}`)
      continue
    }
    if (isAgentDisabled(agent)) {
      entry.deadLettered = true
      upsertWebhookRetry(entry.id, entry)
      log.warn(TAG, `[webhook-retry] Dead-lettered ${entry.id}: agent disabled for webhook ${entry.webhookId}`)
      continue
    }

    // Find or create a webhook session (same logic as the POST handler)
    const sessionName = `webhook:${entry.webhookId}`
    let session = Object.values(sessions).find(
      (s: unknown) => {
        const rec = s as Record<string, unknown>
        return rec.name === sessionName && rec.agentId === agent.id
      },
    ) as unknown as Record<string, unknown> | undefined

    if (!session) {
      const sessionId = genId()
      const ts = Date.now()
      session = {
        id: sessionId,
        name: sessionName,
        cwd: WORKSPACE_DIR,
        user: 'system',
        provider: agent.provider || 'claude-cli',
        model: agent.model || '',
        credentialId: agent.credentialId || null,
        apiEndpoint: agent.apiEndpoint || null,
        claudeSessionId: null,
        codexThreadId: null,
        opencodeSessionId: null,
        delegateResumeIds: { claudeCode: null, codex: null, opencode: null, gemini: null },
        messages: [],
        createdAt: ts,
        lastActiveAt: ts,
        sessionType: 'human',
        agentId: agent.id,
        parentSessionId: null,
        ...getEnabledCapabilitySelection(agent),
        heartbeatEnabled: (agent.heartbeatEnabled as boolean | undefined) ?? false,
        heartbeatIntervalSec: (agent.heartbeatIntervalSec as number | null | undefined) ?? null,
      }
      const { upsertSession: upsert } = await import('@/lib/server/storage')
      upsert(session.id as string, session)
    }

    const payloadPreview = (entry.payload || '').slice(0, 12_000)
    const prompt = [
      'Webhook event received (retry).',
      `Webhook ID: ${entry.webhookId}`,
      `Webhook Name: ${(webhook.name as string) || entry.webhookId}`,
      `Source: ${(webhook.source as string) || 'custom'}`,
      `Event: ${entry.event}`,
      `Retry attempt: ${entry.attempts}`,
      `Original received at: ${new Date(entry.createdAt).toISOString()}`,
      '',
      'Payload:',
      payloadPreview || '(empty payload)',
      '',
      'Handle this event now. If this requires notifying the user, use configured connector tools.',
    ].join('\n')

    try {
      const run = enqueueSessionRun({
        sessionId: session.id as string,
        message: prompt,
        source: 'webhook',
        internal: false,
        mode: 'followup',
      })

      appendWebhookLog(genId(8), {
        id: genId(8),
        webhookId: entry.webhookId,
        event: entry.event,
        payload: (entry.payload || '').slice(0, 2000),
        status: 'success',
        sessionId: session.id,
        runId: run.runId,
        timestamp: Date.now(),
      })

      deleteWebhookRetry(entry.id)
      log.info(TAG, `[webhook-retry] Successfully retried ${entry.id} for webhook ${entry.webhookId} (attempt ${entry.attempts})`)
    } catch (err: unknown) {
      const errorMsg = errorMessage(err)
      entry.attempts += 1

      if (entry.attempts >= entry.maxAttempts) {
        entry.deadLettered = true
        upsertWebhookRetry(entry.id, entry)
        log.warn(TAG, `[webhook-retry] Dead-lettered ${entry.id} after ${entry.attempts} attempts: ${errorMsg}`)
        const failure = classifyRuntimeFailure({ source: 'webhook', message: errorMsg })
        if (session?.id) {
          recordSupervisorIncident({
            runId: entry.id,
            sessionId: session.id as string,
            taskId: null,
            agentId: agentId || null,
            source: 'webhook',
            kind: 'runtime_failure',
            severity: failure.severity,
            summary: `Webhook delivery dead-lettered: ${errorMsg}`.slice(0, 320),
            details: errorMsg,
            failureFamily: failure.family,
            remediation: failure.remediation,
            repairPrompt: failure.repairPrompt,
            autoAction: null,
          })
        }

        appendWebhookLog(genId(8), {
          id: genId(8),
          webhookId: entry.webhookId,
          event: entry.event,
          payload: (entry.payload || '').slice(0, 2000),
          status: 'error',
          error: `Dead-lettered after ${entry.attempts} attempts: ${errorMsg}`,
          timestamp: Date.now(),
        })
      } else {
        // Exponential backoff: 30s * 2^attempt + random jitter (0-5000ms)
        const jitter = Math.floor(Math.random() * 5000)
        entry.nextRetryAt = Date.now() + (30_000 * Math.pow(2, entry.attempts)) + jitter
        upsertWebhookRetry(entry.id, entry)
        log.warn(TAG, `[webhook-retry] Retry ${entry.id} failed (attempt ${entry.attempts}/${entry.maxAttempts}), next at ${new Date(entry.nextRetryAt).toISOString()}: ${errorMsg}`)
      }
    }
  }
}
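// Backoff example for the retry schedule above (30s * 2^attempts plus 0-5s of
// jitter): a delivery failing on attempts 1, 2, and 3 is retried after roughly
// 60s, 120s, and 240s respectively, until attempts reaches maxAttempts and the
// entry is dead-lettered.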
async function runProviderHealthChecks() {
  const agents = loadAgents()
  const credentials = loadCredentials()

  // Build deduplicated set of { provider, credentialId, apiEndpoint } tuples
  const seen = new Set<string>()
  const tuples: { provider: string; credentialId: string; apiEndpoint: string; agentId: string; credentialName: string }[] = []

  for (const agent of Object.values(agents) as unknown as Record<string, unknown>[]) {
    if (!agent?.id || typeof agent.id !== 'string') continue
    if (shouldSuppressSyntheticAgentHealthAlert(agent.id)) continue
    const provider = typeof agent.provider === 'string' ? agent.provider : ''
    if (!provider || ['claude-cli', 'codex-cli', 'opencode-cli', 'gemini-cli', 'copilot-cli', 'droid-cli', 'cursor-cli', 'qwen-code-cli', 'goose'].includes(provider)) continue

    const credentialId = typeof agent.credentialId === 'string' ? agent.credentialId : ''
    const apiEndpoint = typeof agent.apiEndpoint === 'string' ? agent.apiEndpoint : ''

    // For OpenClaw, scope per agent (each may have a different gateway)
    const key = provider === 'openclaw'
      ? `openclaw:${agent.id}`
      : `${provider}:${credentialId || 'no-cred'}:${apiEndpoint}`
    if (seen.has(key)) continue
    seen.add(key)

    const cred = credentialId ? (credentials[credentialId] as unknown as Record<string, unknown> | undefined) : undefined
    const credName = typeof cred?.name === 'string' ? cred.name : provider

    tuples.push({
      provider,
      credentialId,
      apiEndpoint,
      agentId: agent.id,
      credentialName: credName,
    })
  }

  for (const tuple of tuples) {
    // Circuit breaker: skip providers that have failed repeatedly
    const cbKey = `${tuple.provider}:${tuple.credentialId || 'no-cred'}:${tuple.apiEndpoint}`
    const cb = ds.providerPingCircuitBreaker.get(cbKey)
    const now = Date.now()
    if (cb && cb.skipUntil > now) continue

    let apiKey: string | undefined
    if (tuple.credentialId) {
      const cred = credentials[tuple.credentialId] as unknown as Record<string, unknown> | undefined
      if (cred?.encryptedKey && typeof cred.encryptedKey === 'string') {
        try { apiKey = decryptKey(cred.encryptedKey) } catch { /* skip undecryptable */ continue }
      }
    }

    const endpoint = tuple.apiEndpoint || OPENAI_COMPATIBLE_DEFAULTS[tuple.provider]?.defaultEndpoint || undefined
    const result = await pingProvider(tuple.provider, apiKey, endpoint)

    if (!result.ok) {
      // Update circuit breaker state
      const existing = ds.providerPingCircuitBreaker.get(cbKey) || { consecutiveFailures: 0, skipUntil: 0 }
      existing.consecutiveFailures += 1
      if (existing.consecutiveFailures >= PROVIDER_PING_CB_THRESHOLD) {
        const cooldown = Math.min(
          PROVIDER_PING_CB_BASE_MS * Math.pow(2, existing.consecutiveFailures - PROVIDER_PING_CB_THRESHOLD),
          PROVIDER_PING_CB_MAX_MS,
        )
        existing.skipUntil = now + cooldown
        log.info(TAG, `[health] Circuit breaker tripped for ${tuple.credentialName} — skipping pings for ${Math.round(cooldown / 60_000)}m`)
      }
      ds.providerPingCircuitBreaker.set(cbKey, existing)

      if (!shouldNotifyProviderReachabilityIssue(tuple.provider)) {
        continue
      }

      const dedupKey = `provider-down:${tuple.credentialId || tuple.provider}`

      const entityType = tuple.credentialId ? 'credential' : undefined
      const entityId = tuple.credentialId || undefined

      createNotification({
        type: 'warning',
        title: `Provider unreachable: ${tuple.credentialName}`,
        message: result.message,
        dedupKey,
        entityType,
        entityId,
      })
    } else {
      // Success — clear circuit breaker
      ds.providerPingCircuitBreaker.delete(cbKey)
    }
  }
}
const OPENCLAW_REPAIR_MAX_ATTEMPTS = 3
const OPENCLAW_REPAIR_COOLDOWN_MS = 300_000 // 5 minutes

async function runOpenClawGatewayHealthChecks() {
  const agents = loadAgents()
  const credentials = loadCredentials()

  // Build deduplicated OpenClaw agent tuples
  const seen = new Set<string>()
  const tuples: { agentId: string; endpoint: string; credentialId: string; credentialName: string }[] = []

  for (const agent of Object.values(agents) as unknown as Record<string, unknown>[]) {
    if (!agent?.id || typeof agent.id !== 'string') continue
    if (shouldSuppressSyntheticAgentHealthAlert(agent.id)) continue
    if (agent.provider !== 'openclaw') continue

    const key = `openclaw:${agent.id}`
    if (seen.has(key)) continue
    seen.add(key)

    const credentialId = typeof agent.credentialId === 'string' ? agent.credentialId : ''
    const endpoint = typeof agent.apiEndpoint === 'string' ? agent.apiEndpoint : ''
    const cred = credentialId ? (credentials[credentialId] as unknown as Record<string, unknown> | undefined) : undefined
    const credName = typeof cred?.name === 'string' ? cred.name : 'openclaw'

    tuples.push({ agentId: agent.id, endpoint, credentialId, credentialName: credName })
  }

  if (!tuples.length) return

  const { probeOpenClawHealth } = await import('@/lib/server/openclaw/health')

  for (const tuple of tuples) {
    let token: string | undefined
    if (tuple.credentialId) {
      const cred = credentials[tuple.credentialId] as unknown as Record<string, unknown> | undefined
      if (cred?.encryptedKey && typeof cred.encryptedKey === 'string') {
        try { token = decryptKey(cred.encryptedKey) } catch { continue }
      }
    }

    const result = await probeOpenClawHealth({
      endpoint: tuple.endpoint || undefined,
      token,
      timeoutMs: 10_000,
    })

    const now = Date.now()

    if (result.ok) {
      // Recovered
      if (ds.openclawDownAgentIds.has(tuple.agentId)) {
        ds.openclawDownAgentIds.delete(tuple.agentId)
        ds.openclawRepairState.delete(tuple.agentId)
        createNotification({
          type: 'success',
          title: 'OpenClaw gateway recovered',
          message: `Gateway for ${tuple.credentialName} is reachable again.`,
          dedupKey: `openclaw-gw-down:${tuple.agentId}`,
        })
      }
      continue
    }

    // Unhealthy
    const repair = ds.openclawRepairState.get(tuple.agentId) || { attempts: 0, lastAttemptAt: 0, cooldownUntil: 0 }

    // In cooldown — skip
    if (repair.cooldownUntil > now) continue

    // Cooldown expired — reset
    if (repair.cooldownUntil > 0 && repair.cooldownUntil <= now) {
      repair.attempts = 0
      repair.cooldownUntil = 0
    }

    ds.openclawDownAgentIds.add(tuple.agentId)

    if (repair.attempts < OPENCLAW_REPAIR_MAX_ATTEMPTS) {
      try {
        const { runOpenClawDoctor } = await import('@/lib/server/openclaw/doctor')
        await runOpenClawDoctor({ fix: true })
      } catch (err: unknown) {
        log.warn(TAG, '[daemon] openclaw doctor --fix failed:', errorMessage(err))
      }
      repair.attempts += 1
      repair.lastAttemptAt = now
    } else {
      repair.cooldownUntil = now + OPENCLAW_REPAIR_COOLDOWN_MS
    }

    ds.openclawRepairState.set(tuple.agentId, repair)

    createNotification({
      type: 'error',
      title: `OpenClaw gateway unreachable: ${tuple.credentialName}`,
      message: result.error || 'Health check failed',
      dedupKey: `openclaw-gw-down:${tuple.agentId}`,
    })
  }
}
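// Auto-repair flow for an unreachable gateway, as implemented above: up to
// OPENCLAW_REPAIR_MAX_ATTEMPTS (3) `openclaw doctor --fix` runs on successive
// health ticks, then a 5-minute cooldown; once the cooldown lapses the attempt
// counter resets and the cycle repeats until a probe succeeds, which clears all
// repair state and emits a recovery notification.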
/**
 * Prune orphaned entries from module-level Maps/Sets that reference
 * sessions, connectors, or agents that no longer exist in storage.
 * Runs every health-check cycle (2 minutes).
 */
function pruneOrphanedState(sessions: Record<string, unknown>): void {
  const liveSessionIds = new Set(Object.keys(sessions))

  // Main-loop state map (per-session autonomous state)
  pruneMainLoopState(liveSessionIds)

  // Heartbeat service tracking maps
  pruneHeartbeatState(liveSessionIds)

  // System event queues for dead sessions
  pruneSystemEventQueues(liveSessionIds)

  // Subagent lineage/handle registry — remove finished subagent state older than 30 min
  cleanupFinishedSubagents()

  // Process manager — sweep completed processes older than TTL
  sweepManagedProcesses()

  // Reap orphaned sandbox containers from prior crashes
  reapOrphanedSandboxContainers().catch((err) => {
    log.warn(TAG, '[daemon] Orphaned sandbox reap failed:', typeof err === 'object' && err !== null && 'message' in err ? (err as Error).message : String(err))
  })

  // Daemon-local: prune openclawRepairState for agents that no longer exist
  const agents = loadAgents()
  for (const agentId of ds.openclawRepairState.keys()) {
    if (!agents[agentId]) ds.openclawRepairState.delete(agentId)
  }
  for (const agentId of ds.openclawDownAgentIds) {
    if (!agents[agentId]) ds.openclawDownAgentIds.delete(agentId)
  }

  // Orchestrator event queues for dead agents
  const liveAgentIds = new Set(Object.keys(agents))
  pruneOrchestratorEventQueues(liveAgentIds)

  // Orchestrator wake/failure/dailyCycles Maps for deleted agents
  pruneOrchestratorState(liveAgentIds)

  // Connector tracking Maps for deleted connectors
  const connectors = loadConnectors()
  pruneConnectorTrackingState(new Set(Object.keys(connectors)))

  // Prune circuit breaker entries for providers that no longer have any agent referencing them
  const liveProviderKeys = new Set<string>()
  for (const agent of Object.values(agents) as unknown as Record<string, unknown>[]) {
    if (!agent?.id) continue
    const p = typeof agent.provider === 'string' ? agent.provider : ''
    const c = typeof agent.credentialId === 'string' ? agent.credentialId : ''
    const e = typeof agent.apiEndpoint === 'string' ? agent.apiEndpoint : ''
    if (p) liveProviderKeys.add(`${p}:${c || 'no-cred'}:${e}`)
  }
  for (const key of ds.providerPingCircuitBreaker.keys()) {
    if (!liveProviderKeys.has(key)) ds.providerPingCircuitBreaker.delete(key)
  }
}
async function runMemoryMaintenanceTick(): Promise<void> {
  try {
    const memDb = getMemoryDb()
    const result = memDb.maintain({ dedupe: true, pruneWorking: true, ttlHours: 24 })
    if (result.deduped > 0 || result.pruned > 0) {
      log.info(TAG, `[daemon] Memory maintenance: deduped=${result.deduped}, pruned=${result.pruned}`)
    }
  } catch (err: unknown) {
    log.warn(TAG, '[daemon] Memory maintenance tick failed:', err instanceof Error ? err.message : String(err))
  }
}
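// Staleness thresholds used by runHealthChecks below, worked through for a
// hypothetical 60s heartbeat interval: a session is flagged stale after
// max(4 * 60s, 4 min) = 4 min of inactivity and auto-disabled after
// max(16 * 60s, 45 min) = 45 min. The minimum floors dominate for short
// intervals; longer intervals scale linearly with the multipliers.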
async function runHealthChecks() {
  // Continuously keep the completed queue honest.
  validateCompletedTasksQueue()
  recoverStalledRunningTasks()

  // Watchdog: abort runs stuck in running state beyond their timeout threshold.
  try {
    const stuck = sweepStuckRuns()
    if (stuck.aborted > 0) {
      log.info(TAG, `[daemon] Watchdog: aborted ${stuck.aborted} stuck run(s)`)
    }
  } catch (err: unknown) {
    log.error(TAG, '[daemon] Stuck-run watchdog failed:', err instanceof Error ? err.message : String(err))
  }

  // Keep heartbeat state in sync with task terminal states even without daemon restarts.
  cleanupFinishedTaskSessions()

  // Re-queue deferred tasks whose agents have become available again.
  try { promoteDeferred() } catch {}

  const sessions = loadSessions()
  const now = Date.now()
  const currentlyStale = new Set<string>()
  const dirtySessionIds: string[] = []

  for (const session of Object.values(sessions) as unknown as Record<string, unknown>[]) {
    if (!session?.id || typeof session.id !== 'string') continue
    if (session.heartbeatEnabled !== true) continue

    const sessionId = session.id
    if (shouldSuppressSessionHeartbeatHealthAlert(session as Pick<Session, 'id' | 'name' | 'user' | 'shortcutForAgentId'>)) {
      ds.staleSessionIds.delete(sessionId)
      continue
    }

    const sessionLabel = String(session.name || sessionId)
    const intervalSec = parseHeartbeatIntervalSec(session.heartbeatIntervalSec, DEFAULT_HEARTBEAT_INTERVAL_SEC)
    if (intervalSec <= 0) continue
    const staleAfter = Math.max(intervalSec * STALE_MULTIPLIER * 1000, STALE_MIN_MS)
    const lastActive = typeof session.lastActiveAt === 'number' ? session.lastActiveAt : 0
    if (lastActive <= 0) continue

    const staleForMs = now - lastActive
    if (staleForMs > staleAfter) {
      const autoDisableAfter = Math.max(intervalSec * STALE_AUTO_DISABLE_MULTIPLIER * 1000, STALE_AUTO_DISABLE_MIN_MS)
      if (staleForMs > autoDisableAfter) {
        session.heartbeatEnabled = false
        session.lastActiveAt = now
        dirtySessionIds.push(sessionId)
        ds.staleSessionIds.delete(sessionId)
        await sendHealthAlert({
          text: `Auto-disabled heartbeat for stale session "${sessionLabel}" after ${Math.round(staleForMs / 60_000)}m of inactivity.`,
          dedupKey: buildSessionHeartbeatHealthDedupKey(sessionId, 'auto-disabled'),
          entityType: 'session',
          entityId: sessionId,
        })
        continue
      }

      currentlyStale.add(sessionId)
      // Only alert on transition from healthy → stale (once per stale episode)
      if (!ds.staleSessionIds.has(sessionId)) {
        ds.staleSessionIds.add(sessionId)
        await sendHealthAlert({
          text: `Session "${sessionLabel}" heartbeat appears stale (last active ${Math.round(staleForMs / 1000)}s ago, interval ${intervalSec}s).`,
          dedupKey: buildSessionHeartbeatHealthDedupKey(sessionId, 'stale'),
          entityType: 'session',
          entityId: sessionId,
        })
      }
    }
  }

  // Clear recovered sessions so they can re-alert if they go stale again later
  for (const id of ds.staleSessionIds) {
    if (!currentlyStale.has(id)) {
      ds.staleSessionIds.delete(id)
    }
  }

  for (const sid of dirtySessionIds) {
    const s = sessions[sid]
    if (s) {
      const { upsertSession: upsert } = await import('@/lib/server/storage')
      upsert(sid, s)
    }
  }

  // Provider reachability checks
  try {
    await runProviderHealthChecks()
  } catch (err: unknown) {
    log.error(TAG, '[daemon] Provider health check failed:', errorMessage(err))
  }

  // OpenClaw gateway health checks + auto-repair
  try {
    await runOpenClawGatewayHealthChecks()
  } catch (err: unknown) {
    log.error(TAG, '[daemon] OpenClaw gateway health check failed:', errorMessage(err))
  }
  // Integrity drift monitoring for identity/config/extension files.
  try {
    const integrity = runIntegrityMonitor(loadSettings())
    ds.lastIntegrityCheckAt = integrity.checkedAt
    ds.lastIntegrityDriftCount = integrity.drifts.length
    if (integrity.drifts.length > 0) {
      for (const drift of integrity.drifts) {
        const rel = path.relative(process.cwd(), drift.filePath)
        const shortPath = rel && !rel.startsWith('..') ? rel : drift.filePath
        const action = drift.type === 'created'
          ? 'created'
          : drift.type === 'deleted'
            ? 'deleted'
            : 'modified'
        createNotification({
          type: drift.type === 'deleted' ? 'error' : 'warning',
          title: `Integrity drift detected (${drift.kind})`,
          message: `${shortPath} was ${action}.`,
          dedupKey: `integrity:${drift.id}:${drift.nextHash || 'missing'}`,
          entityType: 'session',
          entityId: drift.id,
        })
      }
      await sendHealthAlert(`Integrity monitor detected ${integrity.drifts.length} file drift event(s).`)
    }
  } catch (err: unknown) {
    log.error(TAG, '[daemon] Integrity monitor check failed:', errorMessage(err))
  }

  // Process webhook retry queue
  try {
    await processWebhookRetries()
  } catch (err: unknown) {
    log.error(TAG, '[daemon] Webhook retry processing failed:', errorMessage(err))
  }

  // Periodic memory hygiene: prune orphaned state for deleted sessions/connectors
  try {
    pruneOrphanedState(sessions)
  } catch (err: unknown) {
    log.error(TAG, '[daemon] Memory hygiene sweep failed:', errorMessage(err))
  }

  // Prune old terminal runs and their events to prevent unbounded growth
  try {
    const pruned = pruneOldRuns()
    if (pruned.prunedRuns > 0 || pruned.prunedEvents > 0) {
      log.info(TAG, `[daemon] Pruned ${pruned.prunedRuns} old run(s) and ${pruned.prunedEvents} run event(s)`)
    }
  } catch (err: unknown) {
    log.error(TAG, '[daemon] Run pruning failed:', err instanceof Error ? err.message : String(err))
  }

  // Prune expired runtime locks
  try {
    const locksRemoved = pruneExpiredLocks()
    if (locksRemoved > 0) {
      log.info(TAG, `[daemon] Pruned ${locksRemoved} expired lock(s)`)
    }
  } catch (err: unknown) {
    log.error(TAG, '[daemon] Lock pruning failed:', err instanceof Error ? err.message : String(err))
  }

  // Prune old execution logs (30-day retention)
  try {
    const logsRemoved = clearLogsByAge(30 * 24 * 3600_000)
    if (logsRemoved > 0) {
      log.info(TAG, `[daemon] Pruned ${logsRemoved} old execution log(s)`)
    }
  } catch (err: unknown) {
    log.error(TAG, '[daemon] Execution log pruning failed:', errorMessage(err))
  }

  // Prune old usage records (90-day retention)
  try {
    const usageRemoved = pruneOldUsage(90 * 24 * 3600_000)
    if (usageRemoved > 0) {
      log.info(TAG, `[daemon] Pruned ${usageRemoved} old usage record(s)`)
    }
  } catch (err: unknown) {
    log.error(TAG, '[daemon] Usage pruning failed:', errorMessage(err))
  }

  // Periodic memory database maintenance (dedup + TTL pruning)
  try {
    await runMemoryMaintenanceTick()
  } catch (err: unknown) {
    log.error(TAG, '[daemon] Memory maintenance failed:', err instanceof Error ? err.message : String(err))
  }

  // Drain idle-window callbacks when the system is quiet
  try {
    await drainIdleWindowCallbacks()
  } catch (err: unknown) {
    log.error(TAG, '[daemon] Idle-window drain failed:', err instanceof Error ? err.message : String(err))
  }
}
function startHealthMonitor() {
  if (ds.healthIntervalId) return
  ds.healthIntervalId = setInterval(() => {
    if (ds.healthCheckRunning || ds.shuttingDown) return
    ds.healthCheckRunning = true
    runHealthChecks()
      .catch((err) => {
        log.error(TAG, '[daemon] Health monitor tick failed:', err?.message || String(err))
      })
      .finally(() => { ds.healthCheckRunning = false })
  }, HEALTH_CHECK_INTERVAL)
}

function stopHealthMonitor() {
  if (ds.healthIntervalId) {
    clearInterval(ds.healthIntervalId)
    ds.healthIntervalId = null
  }
}

function syncDaemonBackgroundServices(options?: { runConnectorHealthCheckImmediately?: boolean }) {
  if (isDaemonBackgroundServicesEnabled()) {
    startHealthMonitor()
    startConnectorHealthMonitor({
      runImmediately: options?.runConnectorHealthCheckImmediately !== false,
    })
    startConnectorOutboxWorker()
    startEvalScheduler()
    return
  }
  stopHealthMonitor()
  stopConnectorHealthMonitor()
  stopConnectorOutboxWorker()
  stopEvalScheduler()
}
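// Reduced mode: when background services are disabled, the daemon keeps the
// scheduler, queue processor, browser sweep, heartbeat service, memory
// consolidation, and swarm timeout checker (all started unconditionally in
// startDaemon) but drops the health monitors, connector outbox worker, and
// eval scheduler toggled here.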
function startConnectorHealthMonitor(options?: { runImmediately?: boolean }) {
  if (ds.connectorHealthIntervalId) return

  const tick = () => {
    if (ds.connectorHealthCheckRunning || ds.shuttingDown) return
    ds.connectorHealthCheckRunning = true
    runConnectorHealthChecks(Date.now())
      .catch((err) => {
        log.error(TAG, '[daemon] Connector health tick failed:', errorMessage(err))
      })
      .finally(() => { ds.connectorHealthCheckRunning = false })
  }

  if (options?.runImmediately !== false) tick()
  ds.connectorHealthIntervalId = setInterval(tick, CONNECTOR_HEALTH_CHECK_INTERVAL)
}

function stopConnectorHealthMonitor() {
  if (ds.connectorHealthIntervalId) {
    clearInterval(ds.connectorHealthIntervalId)
    ds.connectorHealthIntervalId = null
  }
}

function runConsolidationTick() {
  import('@/lib/server/memory/memory-consolidation').then(({ runDailyConsolidation, registerConsolidationIdleCallback, registerCompactionIdleCallback }) => {
    // Wire idle-window callbacks so consolidation, compaction, and dreaming run during quiet periods
    registerConsolidationIdleCallback()
    registerCompactionIdleCallback()
    import('@/lib/server/memory/dream-idle-callback').then(({ registerDreamIdleCallback }) => {
      registerDreamIdleCallback()
    }).catch((err: unknown) => {
      log.error(TAG, '[daemon] Dream idle callback registration failed:', errorMessage(err))
    })

    return runDailyConsolidation().then((stats) => {
      if (stats.digests > 0 || stats.pruned > 0 || stats.deduped > 0) {
        log.info(TAG, `[daemon] Memory consolidation: ${stats.digests} digest(s), ${stats.pruned} pruned, ${stats.deduped} deduped`)
      }
      if (stats.errors.length > 0) {
        log.warn(TAG, `[daemon] Memory consolidation errors: ${stats.errors.join('; ')}`)
      }
    })
  }).catch((err: unknown) => {
    log.error(TAG, '[daemon] Memory consolidation failed:', errorMessage(err))
  })
}

function startMemoryConsolidation() {
  if (ds.memoryConsolidationTimeoutId || ds.memoryConsolidationIntervalId) return
  // Deferred first run, then repeat on interval
  ds.memoryConsolidationTimeoutId = setTimeout(() => {
    ds.memoryConsolidationTimeoutId = null
    runConsolidationTick()
    ds.memoryConsolidationIntervalId = setInterval(runConsolidationTick, MEMORY_CONSOLIDATION_INTERVAL)
  }, MEMORY_CONSOLIDATION_INITIAL_DELAY)
}

function stopMemoryConsolidation() {
  if (ds.memoryConsolidationTimeoutId) {
    clearTimeout(ds.memoryConsolidationTimeoutId)
    ds.memoryConsolidationTimeoutId = null
  }
  if (ds.memoryConsolidationIntervalId) {
    clearInterval(ds.memoryConsolidationIntervalId)
    ds.memoryConsolidationIntervalId = null
  }
}
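// Consolidation cadence: the first tick fires one minute after daemon start
// (MEMORY_CONSOLIDATION_INITIAL_DELAY), then repeats every six hours
// (MEMORY_CONSOLIDATION_INTERVAL) for as long as the daemon runs.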
// --- Eval scheduler ---

const EVAL_DEFAULT_INTERVAL_MS = 24 * 3600_000 // 24 hours

async function runEvalSchedulerTick() {
  try {
    const settings = loadSettings()
    if (!settings.autonomyEvalEnabled) return

    const { runEvalSuite } = await import('@/lib/server/eval/runner')
    const agents = loadAgents()
    const heartbeatAgentIds = Object.keys(agents).filter(
      (id) => agents[id].heartbeatEnabled === true,
    )

    for (const agentId of heartbeatAgentIds) {
      try {
        const result = await runEvalSuite(agentId)
        log.info(TAG,
          `[daemon:eval] Agent ${agents[agentId].name}: ${result.percentage}% (${result.totalScore}/${result.maxScore})`,
        )
        createNotification({
          title: `Eval: ${agents[agentId].name} scored ${result.percentage}%`,
          message: `${result.runs.length} scenarios, ${result.totalScore}/${result.maxScore} points`,
          type: result.percentage >= 60 ? 'info' : 'warning',
        })
      } catch (err: unknown) {
        log.error(TAG, `[daemon:eval] Failed for agent ${agentId}:`, errorMessage(err))
      }
    }
  } catch (err: unknown) {
    log.error(TAG, '[daemon:eval] Scheduler tick error:', errorMessage(err))
  }
}

function startEvalScheduler() {
  if (ds.evalSchedulerIntervalId) return
  try {
    const settings = loadSettings()
    if (!settings.autonomyEvalEnabled) return
    const intervalMs = parseCronToMs(settings.autonomyEvalCron, EVAL_DEFAULT_INTERVAL_MS) || EVAL_DEFAULT_INTERVAL_MS
    ds.evalSchedulerIntervalId = setInterval(runEvalSchedulerTick, intervalMs)
    log.info(TAG, `[daemon:eval] Eval scheduler started (interval=${Math.round(intervalMs / 3600_000)}h)`)
  } catch {
    // Eval scheduling is optional — don't block daemon start
  }
}

function stopEvalScheduler() {
  if (ds.evalSchedulerIntervalId) {
    clearInterval(ds.evalSchedulerIntervalId)
    ds.evalSchedulerIntervalId = null
  }
}

const SWARM_TIMEOUT_CHECK_INTERVAL = 30_000

function startSwarmTimeoutChecker() {
  if (ds.swarmTimeoutIntervalId) return
  ds.swarmTimeoutIntervalId = setInterval(() => {
    if (!ds.running || ds.shuttingDown) return
    try {
      checkSwarmTimeouts()
    } catch (err: unknown) {
      log.error(TAG, `[daemon] Swarm timeout check error: ${errorMessage(err)}`)
    }
  }, SWARM_TIMEOUT_CHECK_INTERVAL)
}

function stopSwarmTimeoutChecker() {
  if (ds.swarmTimeoutIntervalId) {
    clearInterval(ds.swarmTimeoutIntervalId)
    ds.swarmTimeoutIntervalId = null
  }
}

function refreshDaemonTimersForHotReload() {
  if (!ds.running) return

  if (ds.queueIntervalId) {
    clearInterval(ds.queueIntervalId)
    ds.queueIntervalId = null
    startQueueProcessor()
  }

  if (ds.browserSweepId) {
    clearInterval(ds.browserSweepId)
    ds.browserSweepId = null
    startBrowserSweep()
  }

  if (ds.healthIntervalId) {
    clearInterval(ds.healthIntervalId)
    ds.healthIntervalId = null
  }

  if (ds.connectorHealthIntervalId) {
    clearInterval(ds.connectorHealthIntervalId)
    ds.connectorHealthIntervalId = null
  }

  if (ds.memoryConsolidationTimeoutId || ds.memoryConsolidationIntervalId) {
    stopMemoryConsolidation()
    startMemoryConsolidation()
  }

  if (ds.evalSchedulerIntervalId) {
    stopEvalScheduler()
  }

  if (ds.swarmTimeoutIntervalId) {
    stopSwarmTimeoutChecker()
    startSwarmTimeoutChecker()
  }

  syncDaemonBackgroundServices()
}

// In dev/HMR, the daemon state survives on globalThis while interval callbacks keep
// the old module closure alive. Refresh long-lived timers so they always run the
// current module's logic instead of stale health-alert code paths.
refreshDaemonTimersForHotReload()

export async function runDaemonHealthCheckNow() {
  // Bypass circuit breaker for manual/forced checks
  ds.providerPingCircuitBreaker.clear()
  await Promise.all([
    runHealthChecks(),
    runConnectorHealthChecks(Date.now()),
  ])
}

export async function runConnectorHealthCheckNowForTest(now = Date.now()) {
  await runConnectorHealthChecks(now)
}
export function getDaemonStatus() {
  const estop = loadEstopState()
  const queue = loadQueue()
  const schedules = loadSchedules()
  const reconnectStates = Object.values(getAllReconnectStates())

  // Find next scheduled task
  let nextScheduled: number | null = null
  for (const s of Object.values(schedules) as unknown as Record<string, unknown>[]) {
    if (s.status === 'active' && s.nextRunAt) {
      if (!nextScheduled || (s.nextRunAt as number) < nextScheduled) {
        nextScheduled = s.nextRunAt as number
      }
    }
  }

  // Webhook retry queue stats
  const retryQueue = loadWebhookRetryQueue()
  const retryEntries = Object.values(retryQueue) as WebhookRetryEntry[]
  const pendingRetries = retryEntries.filter(e => !e.deadLettered).length
  const deadLettered = retryEntries.filter(e => e.deadLettered).length

  return {
    running: ds.running,
    schedulerActive: ds.running,
    autostartEnabled: daemonAutostartEnvEnabled(),
    backgroundServicesEnabled: isDaemonBackgroundServicesEnabled(),
    reducedMode: !isDaemonBackgroundServicesEnabled(),
    manualStopRequested: ds.manualStopRequested,
    estop,
    queueLength: queue.length,
    lastProcessed: ds.lastProcessedAt,
    nextScheduled,
    heartbeat: getHeartbeatServiceStatus(),
    health: {
      monitorActive: !!ds.healthIntervalId,
      connectorMonitorActive: !!ds.connectorHealthIntervalId,
      staleSessions: ds.staleSessionIds.size,
      connectorsInBackoff: reconnectStates.filter((state) => !state.exhausted).length,
      connectorsExhausted: reconnectStates.filter((state) => state.exhausted).length,
      checkIntervalSec: Math.trunc(HEALTH_CHECK_INTERVAL / 1000),
      connectorCheckIntervalSec: Math.trunc(CONNECTOR_HEALTH_CHECK_INTERVAL / 1000),
      integrity: {
        enabled: loadSettings().integrityMonitorEnabled !== false,
        lastCheckedAt: ds.lastIntegrityCheckAt,
        lastDriftCount: ds.lastIntegrityDriftCount,
      },
    },
    webhookRetry: {
      pendingRetries,
      deadLettered,
    },
    guards: {
      healthCheckRunning: ds.healthCheckRunning,
      connectorHealthCheckRunning: ds.connectorHealthCheckRunning,
      shuttingDown: ds.shuttingDown,
      providerCircuitBreakers: ds.providerPingCircuitBreaker.size,
    },
  }
}
/**
 * Lightweight health summary safe for external consumption.
 * Reads cached state only — no probes or side effects.
 */
export function getDaemonHealthSummary(): {
  ok: boolean
  uptime: number
  components: {
    daemon: { status: 'healthy' | 'stopped' | 'degraded' }
    connectors: { healthy: number; errored: number; total: number }
    providers: { healthy: number; cooldown: number; total: number }
    gateways: { healthy: number; degraded: number; total: number }
  }
  estop: boolean
  nextScheduledTask: number | null
} {
  const estopState = loadEstopState()
  const estopActive = estopState.level !== 'none'

  // Daemon status
  const daemonStatus: 'healthy' | 'stopped' | 'degraded' = !ds.running
    ? 'stopped'
    : estopActive ? 'degraded' : 'healthy'

  // Connector summary
  const connectors = loadConnectors()
  const connectorEntries = Object.values(connectors) as unknown as Record<string, unknown>[]
  const enabledConnectors = connectorEntries.filter(c => c?.isEnabled === true)
  let healthyConnectors = 0
  let erroredConnectors = 0
  for (const c of enabledConnectors) {
    if (typeof c.id === 'string' && getConnectorStatus(c.id) === 'running') {
      healthyConnectors++
    } else {
      erroredConnectors++
    }
  }

  // Provider summary (based on circuit breaker state)
  const agents = loadAgents()
  const agentEntries = Object.values(agents) as unknown as Record<string, unknown>[]
  const providerKeys = new Set<string>()
  for (const agent of agentEntries) {
    if (!agent?.id || typeof agent.id !== 'string') continue
    const provider = typeof agent.provider === 'string' ? agent.provider : ''
    if (!provider || ['claude-cli', 'codex-cli', 'opencode-cli', 'gemini-cli', 'copilot-cli', 'droid-cli', 'cursor-cli', 'qwen-code-cli', 'goose'].includes(provider)) continue
    const credentialId = typeof agent.credentialId === 'string' ? agent.credentialId : ''
    const apiEndpoint = typeof agent.apiEndpoint === 'string' ? agent.apiEndpoint : ''
    providerKeys.add(`${provider}:${credentialId || 'no-cred'}:${apiEndpoint}`)
  }
  const now = Date.now()
  let cooldownProviders = 0
  for (const key of providerKeys) {
    const cb = ds.providerPingCircuitBreaker.get(key)
    if (cb && cb.skipUntil > now) cooldownProviders++
  }

  // Gateway summary (OpenClaw gateways)
  const totalGateways = ds.openclawDownAgentIds.size
    + agentEntries.filter(a => a?.provider === 'openclaw' && !ds.openclawDownAgentIds.has(a.id as string)).length
  const degradedGateways = ds.openclawDownAgentIds.size

  // Next scheduled task
  const schedules = loadSchedules()
  let nextScheduled: number | null = null
  for (const s of Object.values(schedules) as unknown as Record<string, unknown>[]) {
    if (s.status === 'active' && s.nextRunAt) {
      if (!nextScheduled || (s.nextRunAt as number) < nextScheduled) {
        nextScheduled = s.nextRunAt as number
      }
    }
  }

  const allProvidersDown = providerKeys.size > 0 && cooldownProviders >= providerKeys.size
  const ok = ds.running && !estopActive && !allProvidersDown

  return {
    ok,
    uptime: Math.trunc(process.uptime()),
    components: {
      daemon: { status: daemonStatus },
      connectors: {
        healthy: healthyConnectors,
        errored: erroredConnectors,
        total: enabledConnectors.length,
      },
      providers: {
        healthy: providerKeys.size - cooldownProviders,
        cooldown: cooldownProviders,
        total: providerKeys.size,
      },
      gateways: {
        healthy: totalGateways - degradedGateways,
        degraded: degradedGateways,
        total: totalGateways,
      },
    },
    estop: estopActive,
    nextScheduledTask: nextScheduled,
  }
}
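// Example consumer (hypothetical route, not part of this module): a /api/health
// endpoint could surface the summary directly, e.g.
//
//   export async function GET() {
//     const summary = getDaemonHealthSummary()
//     return Response.json(summary, { status: summary.ok ? 200 : 503 })
//   }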