daemon-runtime.ts
import { timingSafeEqual } from 'node:crypto'
import http from 'node:http'
import { URL } from 'node:url'

import { loadConnectors } from '@/lib/server/connectors/connector-repository'
import {
  clearReconnectState,
  getConnectorPresence,
  getConnectorQR,
  getConnectorStatus,
  getReconnectState,
  hasConnectorCredentials,
  isConnectorAuthenticated,
  listRunningConnectors,
  repairConnector,
  startConnector,
  stopConnector,
} from '@/lib/server/connectors/manager'
import { log } from '@/lib/server/logger'
import {
  getDaemonHealthSummary,
  getDaemonStatus,
  runDaemonHealthCheckNow,
  startDaemon,
  stopDaemon,
} from '@/lib/server/runtime/daemon-state'
import {
  clearDaemonAdminMetadata,
  readDaemonAdminMetadata,
  writeDaemonAdminMetadata,
} from '@/lib/server/daemon/admin-metadata'
import {
  loadDaemonStatusRecord,
  patchDaemonStatusRecord,
} from '@/lib/server/daemon/daemon-status-repository'
import type {
  DaemonAdminMetadata,
  DaemonConnectorRuntimeState,
  DaemonHealthSummaryPayload,
  DaemonRunningConnectorInfo,
  DaemonStatusPayload,
} from '@/lib/server/daemon/types'

const TAG = 'daemon-runtime'
const HEARTBEAT_FLUSH_INTERVAL_MS = 5_000

/** Payload returned by `GET /status` and `POST /health-check`. */
type AdminSnapshotResponse = {
  status: DaemonStatusPayload
  healthSummary: DaemonHealthSummaryPayload
}

/**
 * Raised for client mistakes (malformed body, malformed path segment).
 * The request handler maps it to HTTP 400 instead of the generic 500.
 */
class BadRequestError extends Error {
  constructor(message: string) {
    super(message)
    this.name = 'BadRequestError'
  }
}

/**
 * Resolves the admin port and token.
 *
 * Defaults come from `SWARMCLAW_DAEMON_ADMIN_PORT` / `SWARMCLAW_DAEMON_ADMIN_TOKEN`;
 * `--port <n>` and `--token <t>` in `argv` override them.
 *
 * @param argv Command-line arguments (without the node binary and script path).
 * @returns The resolved port and trimmed token.
 * @throws Error when the port is not a positive finite integer or the token is empty.
 */
function parseArgs(argv: string[]): { port: number; token: string } {
  let port = Number.parseInt(process.env.SWARMCLAW_DAEMON_ADMIN_PORT || '', 10)
  let token = (process.env.SWARMCLAW_DAEMON_ADMIN_TOKEN || '').trim()
  for (let index = 0; index < argv.length; index += 1) {
    const arg = argv[index]
    if (arg === '--port' && index + 1 < argv.length) {
      port = Number.parseInt(argv[index + 1] || '', 10)
      index += 1 // skip the value we just consumed
    } else if (arg === '--token' && index + 1 < argv.length) {
      token = (argv[index + 1] || '').trim()
      index += 1 // skip the value we just consumed
    }
  }
  if (!Number.isFinite(port) || port <= 0) {
    throw new Error('Missing daemon admin port.')
  }
  if (!token) {
    throw new Error('Missing daemon admin token.')
  }
  return { port, token }
}

/**
 * Serializes `payload` as JSON and writes it as the complete response.
 *
 * @param res Response to write to; headers must not have been sent yet.
 * @param statusCode HTTP status code.
 * @param payload Any JSON-serializable value.
 */
function sendJson(res: http.ServerResponse, statusCode: number, payload: unknown): void {
  const body = JSON.stringify(payload)
  res.writeHead(statusCode, {
    'content-type': 'application/json; charset=utf-8',
    'content-length': Buffer.byteLength(body),
  })
  res.end(body)
}

/**
 * Reads the whole request body and parses it as a JSON object.
 *
 * @param req Incoming request whose body is consumed.
 * @returns The parsed object, or `{}` when the request carried no body.
 * @throws BadRequestError when the body is not valid JSON or is not a plain
 *   JSON object (e.g. `null`, an array, or a primitive).
 */
async function readJsonBody(req: http.IncomingMessage): Promise<Record<string, unknown>> {
  const chunks: Buffer[] = []
  for await (const chunk of req) {
    chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk))
  }
  if (chunks.length === 0) return {}

  let parsed: unknown
  try {
    parsed = JSON.parse(Buffer.concat(chunks).toString('utf8'))
  } catch {
    throw new BadRequestError('Request body is not valid JSON.')
  }
  // JSON.parse happily returns null / arrays / primitives; callers index into
  // the result, so reject anything that is not a plain object up front.
  if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) {
    throw new BadRequestError('Request body must be a JSON object.')
  }
  return parsed as Record<string, unknown>
}

/**
 * Looks up `key` among the record's OWN properties only, so request-supplied
 * ids such as `constructor` or `__proto__` never resolve to inherited members.
 */
function getOwn<T>(record: Record<string, T>, key: string): T | undefined {
  return Object.prototype.hasOwnProperty.call(record, key) ? record[key] : undefined
}

/**
 * Decodes a percent-encoded path segment captured from the request URL.
 *
 * @throws BadRequestError when the segment contains an invalid escape sequence.
 */
function decodePathSegment(segment: string | undefined): string {
  try {
    return decodeURIComponent(segment || '')
  } catch {
    throw new BadRequestError('Malformed connector id.')
  }
}

/**
 * Checks the `Authorization` header against `Bearer <token>`.
 * The byte comparison is constant-time; only the length check can short-circuit.
 */
function isAuthorized(header: string | undefined, token: string): boolean {
  const expected = Buffer.from(`Bearer ${token}`, 'utf8')
  const provided = Buffer.from(header ?? '', 'utf8')
  // timingSafeEqual throws on length mismatch, so compare lengths first.
  return provided.length === expected.length && timingSafeEqual(provided, expected)
}

/** Captures the current daemon status and health summary. */
function buildSnapshot(): AdminSnapshotResponse {
  return {
    status: getDaemonStatus() as DaemonStatusPayload,
    healthSummary: getDaemonHealthSummary() as DaemonHealthSummaryPayload,
  }
}

/**
 * Builds the runtime state of every connector returned by `loadConnectors()`,
 * keyed by connector id.
 *
 * A connector is reported as `running` when the manager says so, `error` when
 * it is not running but has a `lastError`, and `stopped` otherwise. The
 * authentication / credential / QR fields are only populated for connectors
 * whose platform is `whatsapp`.
 */
function buildConnectorRuntimeSnapshot(): Record<string, DaemonConnectorRuntimeState> {
  const connectors = loadConnectors()
  const snapshot: Record<string, DaemonConnectorRuntimeState> = {}
  for (const connector of Object.values(connectors)) {
    const runtimeStatus = getConnectorStatus(connector.id)
    const reconnectState = getReconnectState(connector.id)
    snapshot[connector.id] = {
      status: runtimeStatus === 'running'
        ? 'running'
        : connector.lastError
          ? 'error'
          : 'stopped',
      authenticated: connector.platform === 'whatsapp' ? isConnectorAuthenticated(connector.id) : undefined,
      hasCredentials: connector.platform === 'whatsapp' ? hasConnectorCredentials(connector.id) : undefined,
      qrDataUrl: connector.platform === 'whatsapp' ? getConnectorQR(connector.id) : undefined,
      reconnectAttempts: reconnectState?.attempts,
      nextRetryAt: reconnectState?.nextRetryAt,
      reconnectError: reconnectState?.error ?? null,
      reconnectExhausted: reconnectState?.exhausted,
      presence: runtimeStatus === 'running' ? getConnectorPresence(connector.id) : null,
    }
  }
  return snapshot
}

/**
 * Runs `action` on a connector and returns its runtime state afterwards.
 *
 * `start` and `repair` clear the connector's reconnect state before acting;
 * `stop` does not.
 *
 * @param connectorId Id of the connector to act on.
 * @param action One of `start`, `stop`, `repair`.
 * @returns The connector's runtime state, or `null` when no connector with
 *   that id exists.
 */
async function buildConnectorActionSnapshot(
  connectorId: string,
  action: 'start' | 'stop' | 'repair',
): Promise<DaemonConnectorRuntimeState | null> {
  // Own-property lookup: ids come straight from the request path.
  if (!getOwn(loadConnectors(), connectorId)) return null
  if (action === 'start') {
    clearReconnectState(connectorId)
    await startConnector(connectorId)
  } else if (action === 'stop') {
    await stopConnector(connectorId)
  } else {
    clearReconnectState(connectorId)
    await repairConnector(connectorId)
  }
  return getOwn(buildConnectorRuntimeSnapshot(), connectorId) ?? null
}

let shuttingDown = false
let heartbeatInterval: ReturnType<typeof setInterval> | null = null
let server: http.Server | null = null

/**
 * Writes the current status / health snapshot and a fresh heartbeat timestamp
 * into the daemon status record. Also resets `lastError` to `null`.
 */
function persistHeartbeat(): void {
  const snapshot = buildSnapshot()
  patchDaemonStatusRecord((current) => ({
    ...current,
    pid: process.pid,
    desiredState: snapshot.status.running ? 'running' : current.desiredState,
    manualStopRequested: current.manualStopRequested,
    startedAt: current.startedAt || Date.now(),
    stoppedAt: snapshot.status.running ? null : current.stoppedAt,
    adminPort: readDaemonAdminMetadata()?.port ?? current.adminPort,
    lastHeartbeatAt: Date.now(),
    updatedAt: Date.now(),
    lastError: null,
    lastStatus: snapshot.status,
    lastHealthSummary: snapshot.healthSummary,
  }))
}

/**
 * Stops the daemon, records the stopped state, removes the admin metadata and
 * closes the admin HTTP server. Only the first call does any work; later calls
 * return immediately.
 *
 * @param source Label stored as `lastStopSource` and passed to `stopDaemon`.
 */
async function shutdown(source: string): Promise<void> {
  if (shuttingDown) return
  shuttingDown = true
  if (heartbeatInterval) {
    clearInterval(heartbeatInterval)
    heartbeatInterval = null
  }
  try {
    await stopDaemon({ source, manualStop: false })
  } catch (err: unknown) {
    log.warn(TAG, `Daemon runtime stop failed (${source})`, err)
  }
  try {
    const snapshot = buildSnapshot()
    patchDaemonStatusRecord((current) => ({
      ...current,
      pid: null,
      adminPort: null,
      desiredState: 'stopped',
      startedAt: current.startedAt,
      stoppedAt: Date.now(),
      lastHeartbeatAt: current.lastHeartbeatAt,
      updatedAt: Date.now(),
      lastStopSource: source,
      lastStatus: {
        ...snapshot.status,
        running: false,
        schedulerActive: false,
      },
      lastHealthSummary: {
        ...snapshot.healthSummary,
        ok: false,
        components: {
          ...snapshot.healthSummary.components,
          daemon: { status: 'stopped' },
        },
      },
    }))
  } catch (err: unknown) {
    // A failed status write must not skip metadata cleanup and server close.
    log.error(TAG, `Failed to record stopped state (${source})`, err)
  }
  clearDaemonAdminMetadata()
  await new Promise<void>((resolve) => {
    if (!server) {
      resolve()
      return
    }
    server.close(() => resolve())
  })
}

/**
 * Handles one admin HTTP request. Never rejects: every failure is turned into
 * a JSON error response (400 for client mistakes, 500 otherwise).
 *
 * Routes (all require `Authorization: Bearer <token>`):
 * - `GET  /status`
 * - `POST /health-check`
 * - `POST /stop`
 * - `GET  /connectors`
 * - `GET  /connectors/running[?platform=...]`
 * - `GET  /connectors/:id`
 * - `POST /connectors/:id/actions` with body `{ "action": "start" | "stop" | "repair" }`
 */
async function handleAdminRequest(
  req: http.IncomingMessage,
  res: http.ServerResponse,
  port: number,
  token: string,
): Promise<void> {
  try {
    if (!isAuthorized(req.headers.authorization, token)) {
      sendJson(res, 401, { error: 'Unauthorized' })
      return
    }
    const url = new URL(req.url || '/', `http://127.0.0.1:${port}`)
    if (req.method === 'GET' && url.pathname === '/status') {
      sendJson(res, 200, buildSnapshot())
      return
    }
    if (req.method === 'POST' && url.pathname === '/health-check') {
      await runDaemonHealthCheckNow()
      persistHeartbeat()
      sendJson(res, 200, buildSnapshot())
      return
    }
    if (req.method === 'POST' && url.pathname === '/stop') {
      // Acknowledge first; shutdown closes the server this response travels on.
      sendJson(res, 200, { ok: true })
      void shutdown('daemon-admin:stop').finally(() => process.exit(0))
      return
    }
    if (req.method === 'GET' && url.pathname === '/connectors') {
      sendJson(res, 200, { connectors: buildConnectorRuntimeSnapshot() })
      return
    }
    if (req.method === 'GET' && url.pathname === '/connectors/running') {
      const platform = url.searchParams.get('platform') || undefined
      sendJson(res, 200, { connectors: listRunningConnectors(platform) as DaemonRunningConnectorInfo[] })
      return
    }
    const connectorMatch = url.pathname.match(/^\/connectors\/([^/]+)$/)
    if (req.method === 'GET' && connectorMatch) {
      const connectorId = decodePathSegment(connectorMatch[1])
      sendJson(res, 200, { connector: getOwn(buildConnectorRuntimeSnapshot(), connectorId) ?? null })
      return
    }
    const actionMatch = url.pathname.match(/^\/connectors\/([^/]+)\/actions$/)
    if (req.method === 'POST' && actionMatch) {
      const connectorId = decodePathSegment(actionMatch[1])
      const body = await readJsonBody(req)
      const action = typeof body.action === 'string' ? body.action.trim().toLowerCase() : ''
      if (action !== 'start' && action !== 'stop' && action !== 'repair') {
        sendJson(res, 400, { error: 'Invalid connector action.' })
        return
      }
      const connector = await buildConnectorActionSnapshot(connectorId, action)
      sendJson(res, connector ? 200 : 404, { connector })
      return
    }
    sendJson(res, 404, { error: 'Not found' })
  } catch (err: unknown) {
    if (res.headersSent) {
      // writeHead would throw a second time; just make sure the socket is released.
      if (!res.writableEnded) res.destroy()
      return
    }
    const statusCode = err instanceof BadRequestError ? 400 : 500
    sendJson(res, statusCode, { error: err instanceof Error ? err.message : 'Daemon admin failure' })
  }
}

/**
 * Boots the daemon runtime: publishes admin metadata, starts the daemon,
 * begins the periodic heartbeat, starts the loopback-only admin HTTP server
 * and installs signal / fatal-error handlers that shut down and exit.
 *
 * If the daemon could not be started and is not already running, the stopped
 * state is recorded, `process.exitCode` is set to 1 and the function returns
 * without starting the server.
 *
 * @throws When arguments are invalid or the admin server cannot bind its port.
 */
async function main(): Promise<void> {
  const { port, token } = parseArgs(process.argv.slice(2))
  const metadata: DaemonAdminMetadata = {
    pid: process.pid,
    port,
    token,
    launchedAt: Date.now(),
    source: loadDaemonStatusRecord().lastLaunchSource,
  }
  writeDaemonAdminMetadata(metadata)

  const started = startDaemon({ source: 'daemon-runtime:boot', manualStart: true })
  const snapshot = buildSnapshot()
  if (!started && !snapshot.status.running) {
    patchDaemonStatusRecord((current) => ({
      ...current,
      pid: null,
      adminPort: null,
      desiredState: 'stopped',
      stoppedAt: Date.now(),
      updatedAt: Date.now(),
      lastError: 'Daemon runtime could not acquire execution lease.',
      lastStatus: {
        ...snapshot.status,
        running: false,
        schedulerActive: false,
      },
      lastHealthSummary: {
        ...snapshot.healthSummary,
        ok: false,
        components: {
          ...snapshot.healthSummary.components,
          daemon: { status: 'stopped' },
        },
      },
    }))
    clearDaemonAdminMetadata()
    process.exitCode = 1
    return
  }

  persistHeartbeat()
  heartbeatInterval = setInterval(() => {
    persistHeartbeat()
  }, HEARTBEAT_FLUSH_INTERVAL_MS)

  const adminServer = http.createServer((req, res) => {
    void handleAdminRequest(req, res, port, token)
  })
  server = adminServer

  await new Promise<void>((resolve, reject) => {
    // Without this listener a bind failure (e.g. EADDRINUSE) is emitted as an
    // unhandled 'error' event and kills the process before any cleanup runs.
    adminServer.once('error', reject)
    adminServer.listen(port, '127.0.0.1', () => {
      adminServer.off('error', reject)
      resolve()
    })
  })

  for (const signal of ['SIGINT', 'SIGTERM'] as const) {
    process.on(signal, () => {
      void shutdown(`signal:${signal}`).finally(() => process.exit(0))
    })
  }
  process.on('uncaughtException', (err: Error) => {
    try {
      patchDaemonStatusRecord((current) => ({
        ...current,
        lastError: err.message,
        updatedAt: Date.now(),
      }))
    } catch (patchErr) {
      log.error(TAG, 'Failed to record uncaughtException in daemon status', patchErr)
    }
    void shutdown('uncaughtException').finally(() => process.exit(1))
  })
  process.on('unhandledRejection', (reason: unknown) => {
    try {
      patchDaemonStatusRecord((current) => ({
        ...current,
        lastError: reason instanceof Error ? reason.message : String(reason),
        updatedAt: Date.now(),
      }))
    } catch (patchErr) {
      log.error(TAG, 'Failed to record unhandledRejection in daemon status', patchErr)
    }
    void shutdown('unhandledRejection').finally(() => process.exit(1))
  })
}

// Entry point: any startup failure is recorded in the status record, the admin
// metadata is removed, and the process exits with code 1.
void main().catch((err: unknown) => {
  try {
    patchDaemonStatusRecord((current) => ({
      ...current,
      pid: null,
      adminPort: null,
      desiredState: 'stopped',
      stoppedAt: Date.now(),
      updatedAt: Date.now(),
      lastError: err instanceof Error ? err.message : 'Daemon runtime failed to start',
    }))
  } catch (patchErr) {
    log.error(TAG, 'Failed to record fatal daemon error in status', patchErr)
  }
  clearDaemonAdminMetadata()
  log.error(TAG, 'Fatal daemon runtime error', err)
  process.exit(1)
})