/ src / lib / server / daemon / daemon-runtime.ts
daemon-runtime.ts
  1  import http from 'node:http'
  2  import { URL } from 'node:url'
  3  
  4  import { loadConnectors } from '@/lib/server/connectors/connector-repository'
  5  import {
  6    clearReconnectState,
  7    getConnectorPresence,
  8    getConnectorQR,
  9    getConnectorStatus,
 10    getReconnectState,
 11    hasConnectorCredentials,
 12    isConnectorAuthenticated,
 13    listRunningConnectors,
 14    repairConnector,
 15    startConnector,
 16    stopConnector,
 17  } from '@/lib/server/connectors/manager'
 18  import { log } from '@/lib/server/logger'
 19  import {
 20    getDaemonHealthSummary,
 21    getDaemonStatus,
 22    runDaemonHealthCheckNow,
 23    startDaemon,
 24    stopDaemon,
 25  } from '@/lib/server/runtime/daemon-state'
 26  import {
 27    clearDaemonAdminMetadata,
 28    readDaemonAdminMetadata,
 29    writeDaemonAdminMetadata,
 30  } from '@/lib/server/daemon/admin-metadata'
 31  import {
 32    loadDaemonStatusRecord,
 33    patchDaemonStatusRecord,
 34  } from '@/lib/server/daemon/daemon-status-repository'
 35  import type {
 36    DaemonAdminMetadata,
 37    DaemonConnectorRuntimeState,
 38    DaemonHealthSummaryPayload,
 39    DaemonRunningConnectorInfo,
 40    DaemonStatusPayload,
 41  } from '@/lib/server/daemon/types'
 42  
 43  const TAG = 'daemon-runtime'
 44  const HEARTBEAT_FLUSH_INTERVAL_MS = 5_000
 45  
 46  type AdminSnapshotResponse = {
 47    status: DaemonStatusPayload
 48    healthSummary: DaemonHealthSummaryPayload
 49  }
 50  
 51  function parseArgs(argv: string[]): { port: number; token: string } {
 52    let port = Number.parseInt(process.env.SWARMCLAW_DAEMON_ADMIN_PORT || '', 10)
 53    let token = (process.env.SWARMCLAW_DAEMON_ADMIN_TOKEN || '').trim()
 54    for (let index = 0; index < argv.length; index += 1) {
 55      const arg = argv[index]
 56      if (arg === '--port' && index + 1 < argv.length) {
 57        port = Number.parseInt(argv[index + 1] || '', 10)
 58        index += 1
 59      } else if (arg === '--token' && index + 1 < argv.length) {
 60        token = (argv[index + 1] || '').trim()
 61        index += 1
 62      }
 63    }
 64    if (!Number.isFinite(port) || port <= 0) {
 65      throw new Error('Missing daemon admin port.')
 66    }
 67    if (!token) {
 68      throw new Error('Missing daemon admin token.')
 69    }
 70    return { port, token }
 71  }
 72  
 73  function sendJson(res: http.ServerResponse, statusCode: number, payload: unknown): void {
 74    const body = JSON.stringify(payload)
 75    res.writeHead(statusCode, {
 76      'content-type': 'application/json; charset=utf-8',
 77      'content-length': Buffer.byteLength(body),
 78    })
 79    res.end(body)
 80  }
 81  
 82  async function readJsonBody(req: http.IncomingMessage): Promise<Record<string, unknown>> {
 83    const chunks: Buffer[] = []
 84    for await (const chunk of req) {
 85      chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk))
 86    }
 87    if (chunks.length === 0) return {}
 88    return JSON.parse(Buffer.concat(chunks).toString('utf8')) as Record<string, unknown>
 89  }
 90  
 91  function buildSnapshot(): AdminSnapshotResponse {
 92    return {
 93      status: getDaemonStatus() as DaemonStatusPayload,
 94      healthSummary: getDaemonHealthSummary() as DaemonHealthSummaryPayload,
 95    }
 96  }
 97  
 98  function buildConnectorRuntimeSnapshot(): Record<string, DaemonConnectorRuntimeState> {
 99    const connectors = loadConnectors()
100    const snapshot: Record<string, DaemonConnectorRuntimeState> = {}
101    for (const connector of Object.values(connectors)) {
102      const runtimeStatus = getConnectorStatus(connector.id)
103      const reconnectState = getReconnectState(connector.id)
104      snapshot[connector.id] = {
105        status: runtimeStatus === 'running'
106          ? 'running'
107          : connector.lastError
108            ? 'error'
109            : 'stopped',
110        authenticated: connector.platform === 'whatsapp' ? isConnectorAuthenticated(connector.id) : undefined,
111        hasCredentials: connector.platform === 'whatsapp' ? hasConnectorCredentials(connector.id) : undefined,
112        qrDataUrl: connector.platform === 'whatsapp' ? getConnectorQR(connector.id) : undefined,
113        reconnectAttempts: reconnectState?.attempts,
114        nextRetryAt: reconnectState?.nextRetryAt,
115        reconnectError: reconnectState?.error ?? null,
116        reconnectExhausted: reconnectState?.exhausted,
117        presence: runtimeStatus === 'running' ? getConnectorPresence(connector.id) : null,
118      }
119    }
120    return snapshot
121  }
122  
/**
 * Runs a lifecycle action on one connector and returns its resulting state.
 *
 * `start` and `repair` clear the connector's reconnect state before delegating
 * to the connector manager; `stop` only delegates.
 *
 * @param connectorId Id of the connector to act on (taken from the request path).
 * @param action Which lifecycle operation to run.
 * @returns The connector's runtime state after the action, or `null` when no
 *   connector with that id is stored (the caller maps this to a 404).
 */
async function buildConnectorActionSnapshot(
  connectorId: string,
  action: 'start' | 'stop' | 'repair',
): Promise<DaemonConnectorRuntimeState | null> {
  // The id comes straight from the URL. A plain `record[id]` lookup would also
  // match inherited members such as `constructor` or `toString`, letting those
  // ids pass the existence check, so only own properties are accepted.
  const hasOwn = (record: object, key: string): boolean =>
    Object.prototype.hasOwnProperty.call(record, key)

  const connectors = loadConnectors()
  if (!hasOwn(connectors, connectorId) || !connectors[connectorId]) return null

  switch (action) {
    case 'start':
      clearReconnectState(connectorId)
      await startConnector(connectorId)
      break
    case 'stop':
      await stopConnector(connectorId)
      break
    default:
      // 'repair'
      clearReconnectState(connectorId)
      await repairConnector(connectorId)
      break
  }

  const snapshot = buildConnectorRuntimeSnapshot()
  return hasOwn(snapshot, connectorId) ? snapshot[connectorId] || null : null
}
139  
/** Admin HTTP server; `null` until `main` creates it. */
let server: http.Server | null = null

/** Handle of the recurring heartbeat timer; `null` while no timer is scheduled. */
let heartbeatInterval: ReturnType<typeof setInterval> | null = null

/** Set once `shutdown` has begun, so that later calls return immediately. */
let shuttingDown: boolean = false
143  
/**
 * Writes a heartbeat into the persisted daemon status record.
 *
 * Takes a fresh snapshot and patches the record with this process's pid, the
 * current time as `lastHeartbeatAt`/`updatedAt`, the snapshot itself, and a
 * cleared `lastError`. While the daemon reports itself as running,
 * `desiredState` is forced to `'running'` and `stoppedAt` is cleared;
 * otherwise both keep their stored values. `startedAt` is only filled in when
 * the record does not have one yet, and `adminPort` prefers the port from the
 * admin metadata over the stored value.
 */
function persistHeartbeat(): void {
  const { status, healthSummary } = buildSnapshot()
  const daemonRunning = status.running

  patchDaemonStatusRecord((previous) => {
    return {
      ...previous,
      pid: process.pid,
      desiredState: daemonRunning ? 'running' : previous.desiredState,
      manualStopRequested: previous.manualStopRequested,
      startedAt: previous.startedAt || Date.now(),
      stoppedAt: daemonRunning ? null : previous.stoppedAt,
      adminPort: readDaemonAdminMetadata()?.port ?? previous.adminPort,
      lastHeartbeatAt: Date.now(),
      updatedAt: Date.now(),
      lastError: null,
      lastStatus: status,
      lastHealthSummary: healthSummary,
    }
  })
}
161  
/**
 * Stops the daemon and tears down this runtime process's bookkeeping.
 *
 * Steps, in order:
 * 1. cancel the heartbeat timer,
 * 2. ask the daemon to stop (a failure is logged and does not abort shutdown),
 * 3. record the stopped state in the daemon status record (a failure is
 *    logged and does not abort shutdown),
 * 4. clear the admin metadata,
 * 5. close the admin HTTP server, if one was created.
 *
 * Only the first call does any work; calls made after shutdown has begun
 * return immediately.
 *
 * @param source Label describing what triggered the shutdown; passed to
 *   `stopDaemon` and stored as `lastStopSource`.
 */
async function shutdown(source: string): Promise<void> {
  if (shuttingDown) return
  shuttingDown = true

  if (heartbeatInterval) {
    clearInterval(heartbeatInterval)
    heartbeatInterval = null
  }

  try {
    await stopDaemon({ source, manualStop: false })
  } catch (err: unknown) {
    log.warn(TAG, `Daemon runtime stop failed (${source})`, err)
  }

  // Recording the stop is best-effort, like the other status patches in this
  // file's error handlers: a failure here must not prevent the admin metadata
  // from being cleared or the server from being closed below.
  try {
    const snapshot = buildSnapshot()
    patchDaemonStatusRecord((current) => ({
      ...current,
      pid: null,
      adminPort: null,
      desiredState: 'stopped',
      startedAt: current.startedAt,
      stoppedAt: Date.now(),
      lastHeartbeatAt: current.lastHeartbeatAt,
      updatedAt: Date.now(),
      lastStopSource: source,
      // Force the persisted snapshot to read as stopped regardless of what the
      // in-memory state reported at the moment it was taken.
      lastStatus: {
        ...snapshot.status,
        running: false,
        schedulerActive: false,
      },
      lastHealthSummary: {
        ...snapshot.healthSummary,
        ok: false,
        components: {
          ...snapshot.healthSummary.components,
          daemon: { status: 'stopped' },
        },
      },
    }))
  } catch (patchErr: unknown) {
    log.error(TAG, `Failed to record daemon shutdown in status (${source})`, patchErr)
  }

  clearDaemonAdminMetadata()

  await new Promise<void>((resolve) => {
    if (!server) {
      resolve()
      return
    }
    server.close(() => resolve())
  })
}
208  
/**
 * Boots the daemon runtime process.
 *
 * 1. Resolves the admin port/token and writes the admin metadata.
 * 2. Starts the daemon. If it neither started nor reports itself running, the
 *    failure is recorded in the status record, the admin metadata is cleared,
 *    `process.exitCode` is set to 1 and the function returns.
 * 3. Writes an initial heartbeat and schedules the recurring one.
 * 4. Starts the admin HTTP server on 127.0.0.1. Every request must carry
 *    `Authorization: Bearer <token>`. Routes:
 *    - `GET  /status`                   current snapshot
 *    - `POST /health-check`             run a health check, then snapshot
 *    - `POST /stop`                     acknowledge, shut down, exit 0
 *    - `GET  /connectors`               runtime state of all connectors
 *    - `GET  /connectors/running`       running connectors (optional `platform` query)
 *    - `GET  /connectors/:id`           runtime state of one connector, or null
 *    - `POST /connectors/:id/actions`   body `{ action: 'start' | 'stop' | 'repair' }`
 * 5. Installs SIGINT/SIGTERM, `uncaughtException` and `unhandledRejection`
 *    handlers that shut down and exit.
 *
 * @throws Whatever `parseArgs` throws, and the listen error (for example an
 *   address-in-use error) when the admin server cannot bind its port.
 */
async function main(): Promise<void> {
  const { port, token } = parseArgs(process.argv.slice(2))
  const metadata: DaemonAdminMetadata = {
    pid: process.pid,
    port,
    token,
    launchedAt: Date.now(),
    source: loadDaemonStatusRecord().lastLaunchSource,
  }
  writeDaemonAdminMetadata(metadata)

  const started = startDaemon({ source: 'daemon-runtime:boot', manualStart: true })
  const snapshot = buildSnapshot()
  if (!started && !snapshot.status.running) {
    patchDaemonStatusRecord((current) => ({
      ...current,
      pid: null,
      adminPort: null,
      desiredState: 'stopped',
      stoppedAt: Date.now(),
      updatedAt: Date.now(),
      lastError: 'Daemon runtime could not acquire execution lease.',
      lastStatus: {
        ...snapshot.status,
        running: false,
        schedulerActive: false,
      },
      lastHealthSummary: {
        ...snapshot.healthSummary,
        ok: false,
        components: {
          ...snapshot.healthSummary.components,
          daemon: { status: 'stopped' },
        },
      },
    }))
    clearDaemonAdminMetadata()
    process.exitCode = 1
    return
  }

  persistHeartbeat()
  heartbeatInterval = setInterval(() => {
    persistHeartbeat()
  }, HEARTBEAT_FLUSH_INTERVAL_MS)

  const adminServer = http.createServer(async (req, res) => {
    try {
      const auth = req.headers.authorization || ''
      if (auth !== `Bearer ${token}`) {
        sendJson(res, 401, { error: 'Unauthorized' })
        return
      }
      // `req.url` is only a path + query; the base just makes it parseable.
      const url = new URL(req.url || '/', `http://127.0.0.1:${port}`)
      if (req.method === 'GET' && url.pathname === '/status') {
        sendJson(res, 200, buildSnapshot())
        return
      }
      if (req.method === 'POST' && url.pathname === '/health-check') {
        await runDaemonHealthCheckNow()
        persistHeartbeat()
        sendJson(res, 200, buildSnapshot())
        return
      }
      if (req.method === 'POST' && url.pathname === '/stop') {
        // Acknowledge first: the shutdown below closes the server and exits.
        sendJson(res, 200, { ok: true })
        void shutdown('daemon-admin:stop').finally(() => process.exit(0))
        return
      }
      if (req.method === 'GET' && url.pathname === '/connectors') {
        sendJson(res, 200, { connectors: buildConnectorRuntimeSnapshot() })
        return
      }
      // Must be tested before the `/connectors/:id` pattern, which would
      // otherwise treat "running" as a connector id.
      if (req.method === 'GET' && url.pathname === '/connectors/running') {
        const platform = url.searchParams.get('platform') || undefined
        sendJson(res, 200, { connectors: listRunningConnectors(platform) as DaemonRunningConnectorInfo[] })
        return
      }
      const connectorMatch = url.pathname.match(/^\/connectors\/([^/]+)$/)
      if (req.method === 'GET' && connectorMatch) {
        const connectorId = decodeURIComponent(connectorMatch[1] || '')
        const connectors = buildConnectorRuntimeSnapshot()
        // Own-property check: ids such as "constructor" must not resolve to
        // members inherited from Object.prototype.
        const connector = Object.prototype.hasOwnProperty.call(connectors, connectorId)
          ? connectors[connectorId] || null
          : null
        sendJson(res, 200, { connector })
        return
      }
      const actionMatch = url.pathname.match(/^\/connectors\/([^/]+)\/actions$/)
      if (req.method === 'POST' && actionMatch) {
        const connectorId = decodeURIComponent(actionMatch[1] || '')
        const body = await readJsonBody(req)
        const action = typeof body.action === 'string' ? body.action.trim().toLowerCase() : ''
        if (action !== 'start' && action !== 'stop' && action !== 'repair') {
          sendJson(res, 400, { error: 'Invalid connector action.' })
          return
        }
        const connector = await buildConnectorActionSnapshot(connectorId, action)
        sendJson(res, connector ? 200 : 404, { connector })
        return
      }
      sendJson(res, 404, { error: 'Not found' })
    } catch (err: unknown) {
      sendJson(res, 500, { error: err instanceof Error ? err.message : 'Daemon admin failure' })
    }
  })
  server = adminServer

  await new Promise<void>((resolve, reject) => {
    // A bind failure is reported through the 'error' event, not the listen
    // callback. Without a listener it would crash the process before the
    // failure could be recorded; rejecting routes it to the caller of main().
    adminServer.once('error', reject)
    adminServer.listen(port, '127.0.0.1', () => {
      adminServer.off('error', reject)
      resolve()
    })
  })

  for (const signal of ['SIGINT', 'SIGTERM'] as const) {
    process.on(signal, () => {
      void shutdown(`signal:${signal}`).finally(() => process.exit(0))
    })
  }
  process.on('uncaughtException', (err: Error) => {
    // Recording the error is best-effort; shutdown must run either way.
    try {
      patchDaemonStatusRecord((current) => ({
        ...current,
        lastError: err.message,
        updatedAt: Date.now(),
      }))
    } catch (patchErr) {
      log.error(TAG, 'Failed to record uncaughtException in daemon status', patchErr)
    }
    void shutdown('uncaughtException').finally(() => process.exit(1))
  })
  process.on('unhandledRejection', (reason: unknown) => {
    // Recording the error is best-effort; shutdown must run either way.
    try {
      patchDaemonStatusRecord((current) => ({
        ...current,
        lastError: reason instanceof Error ? reason.message : String(reason),
        updatedAt: Date.now(),
      }))
    } catch (patchErr) {
      log.error(TAG, 'Failed to record unhandledRejection in daemon status', patchErr)
    }
    void shutdown('unhandledRejection').finally(() => process.exit(1))
  })
}
346  
// Process entry point. If `main` rejects, mark the daemon as stopped in the
// status record (best-effort), clear the admin metadata, log the failure and
// exit with status 1.
void main().catch((err: unknown) => {
  const failureMessage = err instanceof Error ? err.message : 'Daemon runtime failed to start'

  try {
    patchDaemonStatusRecord((previous) => {
      return {
        ...previous,
        pid: null,
        adminPort: null,
        desiredState: 'stopped',
        stoppedAt: Date.now(),
        updatedAt: Date.now(),
        lastError: failureMessage,
      }
    })
  } catch (recordingError) {
    log.error(TAG, 'Failed to record fatal daemon error in status', recordingError)
  }

  clearDaemonAdminMetadata()
  log.error(TAG, 'Fatal daemon runtime error', err)
  process.exit(1)
})