/ src / lib / server / daemon / controller.ts
controller.ts
  1  import crypto from 'node:crypto'
  2  import { spawn } from 'node:child_process'
  3  import fs from 'node:fs'
  4  import net from 'node:net'
  5  import path from 'node:path'
  6  
  7  import { log } from '@/lib/server/logger'
  8  import {
  9    DAEMON_LOG_PATH,
 10    clearDaemonAdminMetadata,
 11    isProcessRunning,
 12    readDaemonAdminMetadata,
 13    writeDaemonAdminMetadata,
 14  } from '@/lib/server/daemon/admin-metadata'
 15  import {
 16    loadDaemonStatusRecord,
 17    patchDaemonStatusRecord,
 18  } from '@/lib/server/daemon/daemon-status-repository'
 19  import type {
 20    DaemonAdminMetadata,
 21    DaemonConnectorRuntimeState,
 22    DaemonHealthSummaryPayload,
 23    DaemonRunningConnectorInfo,
 24    DaemonStatusPayload,
 25  } from '@/lib/server/daemon/types'
 26  import { DATA_DIR } from '@/lib/server/data-dir'
 27  import { loadEstopState } from '@/lib/server/runtime/estop'
 28  import {
 29    getDaemonHealthSummary,
 30    getDaemonStatus,
 31    startDaemon,
 32    stopDaemon,
 33  } from '@/lib/server/runtime/daemon-state/core'
 34  import { daemonAutostartEnvEnabled } from '@/lib/server/runtime/daemon-policy'
 35  import {
 36    releaseRuntimeLock,
 37    tryAcquireRuntimeLock,
 38  } from '@/lib/server/runtime/runtime-lock-repository'
 39  import { errorMessage, hmrSingleton } from '@/lib/shared-utils'
 40  
 41  // HMR-safe single-shot guard so the "subprocess fallback unavailable"
 42  // warning logs once per process lifetime, not per API call.
 43  const subprocessFallbackUnavailableLogged = hmrSingleton<{ value: boolean }>(
 44    '__swarmclaw_daemon_subprocess_fallback_warned__',
 45    () => ({ value: false }),
 46  )
 47  
 48  const TAG = 'daemon-controller'
 49  const LAUNCH_LOCK_NAME = 'daemon-launcher'
 50  const LAUNCH_LOCK_TTL_MS = 20_000
 51  const DAEMON_READY_TIMEOUT_MS = 20_000
 52  const DAEMON_POLL_INTERVAL_MS = 250
 53  const DAEMON_STALE_AFTER_MS = 20_000
 54  
 55  function now(): number {
 56    return Date.now()
 57  }
 58  
 59  function createLockOwner(): string {
 60    return `launcher:${process.pid}:${crypto.randomBytes(6).toString('hex')}`
 61  }
 62  
 63  function buildDefaultStatus(): DaemonStatusPayload {
 64    return {
 65      running: false,
 66      schedulerActive: false,
 67      autostartEnabled: daemonAutostartEnvEnabled(),
 68      backgroundServicesEnabled: true,
 69      reducedMode: false,
 70      manualStopRequested: false,
 71      estop: loadEstopState(),
 72      queueLength: 0,
 73      lastProcessed: null,
 74      nextScheduled: null,
 75      heartbeat: null,
 76      health: {
 77        monitorActive: false,
 78        connectorMonitorActive: false,
 79        staleSessions: 0,
 80        connectorsInBackoff: 0,
 81        connectorsExhausted: 0,
 82        checkIntervalSec: 120,
 83        connectorCheckIntervalSec: 15,
 84        integrity: {
 85          enabled: true,
 86          lastCheckedAt: null,
 87          lastDriftCount: 0,
 88        },
 89      },
 90      webhookRetry: {
 91        pendingRetries: 0,
 92        deadLettered: 0,
 93      },
 94      guards: {
 95        healthCheckRunning: false,
 96        connectorHealthCheckRunning: false,
 97        shuttingDown: false,
 98        providerCircuitBreakers: 0,
 99      },
100    }
101  }
102  
103  function buildDefaultHealthSummary(): DaemonHealthSummaryPayload {
104    const estop = loadEstopState().level !== 'none'
105    return {
106      ok: false,
107      uptime: 0,
108      components: {
109        daemon: { status: estop ? 'degraded' : 'stopped' },
110        connectors: { healthy: 0, errored: 0, total: 0 },
111        providers: { healthy: 0, cooldown: 0, total: 0 },
112        gateways: { healthy: 0, degraded: 0, total: 0 },
113      },
114      estop,
115      nextScheduledTask: null,
116    }
117  }
118  
119  function getDaemonHomeDir(): string {
120    const configured = process.env.SWARMCLAW_HOME?.trim()
121    if (configured) return path.resolve(configured)
122    return path.dirname(DATA_DIR)
123  }
124  
125  function resolveDaemonRoot(): string | null {
126    const candidates = [
127      process.env.SWARMCLAW_BUILD_ROOT,
128      process.env.SWARMCLAW_PACKAGE_ROOT,
129      process.cwd(),
130    ]
131      .filter((value): value is string => typeof value === 'string' && value.trim().length > 0)
132      .map((value) => path.resolve(value))
133  
134    for (const root of candidates) {
135      if (fs.existsSync(path.join(root, 'src', 'lib', 'server', 'daemon', 'daemon-runtime.ts'))) {
136        return root
137      }
138    }
139  
140    return null
141  }
142  
143  function resolveDaemonRuntimeEntry(): { root: string; entry: string } {
144    const root = resolveDaemonRoot()
145    if (!root) {
146      throw new Error('Unable to locate daemon runtime entry. Set SWARMCLAW_BUILD_ROOT or SWARMCLAW_PACKAGE_ROOT.')
147    }
148    return {
149      root,
150      entry: path.join(root, 'src', 'lib', 'server', 'daemon', 'daemon-runtime.ts'),
151    }
152  }
153  
154  function buildDaemonUrl(port: number, routePath: string): string {
155    const normalized = routePath.startsWith('/') ? routePath : `/${routePath}`
156    return `http://127.0.0.1:${port}${normalized}`
157  }
158  
159  function withTimeout(timeoutMs = 2_000): AbortSignal {
160    return AbortSignal.timeout(timeoutMs)
161  }
162  
163  async function readJsonResponse<T>(response: Response): Promise<T> {
164    const text = await response.text()
165    if (!text) return {} as T
166    return JSON.parse(text) as T
167  }
168  
169  type DaemonSnapshotResponse = {
170    status: DaemonStatusPayload
171    healthSummary: DaemonHealthSummaryPayload
172  }
173  
174  function getInProcessDaemonSnapshot(): DaemonSnapshotResponse | null {
175    const status = getDaemonStatus()
176    if (!status.running) return null
177    return {
178      status,
179      healthSummary: getDaemonHealthSummary() as DaemonHealthSummaryPayload,
180    }
181  }
182  
183  async function requestDaemon<T>(
184    metadata: DaemonAdminMetadata,
185    routePath: string,
186    init?: RequestInit,
187  ): Promise<T> {
188    const headers = new Headers(init?.headers || {})
189    headers.set('authorization', `Bearer ${metadata.token}`)
190    if (init?.body && !headers.has('content-type')) {
191      headers.set('content-type', 'application/json')
192    }
193  
194    const response = await fetch(buildDaemonUrl(metadata.port, routePath), {
195      ...init,
196      headers,
197      signal: init?.signal || withTimeout(),
198    })
199    if (!response.ok) {
200      const detail = await response.text().catch(() => '')
201      throw new Error(`Daemon admin request failed (${response.status}): ${detail || response.statusText}`)
202    }
203    return readJsonResponse<T>(response)
204  }
205  
206  function daemonRecordLooksLive(): boolean {
207    const record = loadDaemonStatusRecord()
208    return Boolean(
209      record.pid
210      && isProcessRunning(record.pid)
211      && record.lastHeartbeatAt
212      && now() - record.lastHeartbeatAt <= DAEMON_STALE_AFTER_MS,
213    )
214  }
215  
216  function buildFallbackStatus(): DaemonStatusPayload {
217    const record = loadDaemonStatusRecord()
218    const base = record.lastStatus ? { ...record.lastStatus } : buildDefaultStatus()
219    const running = daemonRecordLooksLive()
220    return {
221      ...base,
222      running,
223      schedulerActive: running,
224      autostartEnabled: daemonAutostartEnvEnabled(),
225      manualStopRequested: record.manualStopRequested,
226      estop: loadEstopState(),
227    }
228  }
229  
230  function buildFallbackHealthSummary(): DaemonHealthSummaryPayload {
231    const record = loadDaemonStatusRecord()
232    const running = daemonRecordLooksLive()
233    const base = record.lastHealthSummary
234      ? {
235          ...record.lastHealthSummary,
236          components: {
237            ...record.lastHealthSummary.components,
238            daemon: {
239              ...record.lastHealthSummary.components.daemon,
240            },
241          },
242        }
243      : buildDefaultHealthSummary()
244  
245    base.ok = running && base.components.daemon.status !== 'degraded'
246    base.components.daemon.status = running
247      ? (loadEstopState().level === 'none' ? 'healthy' : 'degraded')
248      : 'stopped'
249    base.estop = loadEstopState().level !== 'none'
250    return base
251  }
252  
253  function markDaemonUnavailable(source: string, err?: unknown): void {
254    clearDaemonAdminMetadata()
255    patchDaemonStatusRecord((current) => {
256      const status = current.lastStatus ? { ...current.lastStatus } : buildDefaultStatus()
257      status.running = false
258      status.schedulerActive = false
259      status.estop = loadEstopState()
260      return {
261        ...current,
262        pid: null,
263        adminPort: null,
264        desiredState: current.manualStopRequested ? 'stopped' : current.desiredState,
265        stoppedAt: now(),
266        updatedAt: now(),
267        lastStopSource: source,
268        lastError: err ? errorMessage(err) : current.lastError,
269        lastStatus: status,
270      }
271    })
272  }
273  
274  async function getLiveDaemonSnapshot(): Promise<DaemonSnapshotResponse | null> {
275    const metadata = readDaemonAdminMetadata()
276    if (!metadata) return null
277    if (!isProcessRunning(metadata.pid)) {
278      markDaemonUnavailable('pid-missing')
279      return null
280    }
281    try {
282      return await requestDaemon<DaemonSnapshotResponse>(metadata, '/status')
283    } catch (err: unknown) {
284      if (!isProcessRunning(metadata.pid)) {
285        markDaemonUnavailable('request-failed', err)
286        return null
287      }
288      return null
289    }
290  }
291  
292  async function waitForDaemonReady(metadata: DaemonAdminMetadata): Promise<void> {
293    const deadline = now() + DAEMON_READY_TIMEOUT_MS
294    while (now() < deadline) {
295      if (!isProcessRunning(metadata.pid)) {
296        throw new Error(`Daemon process ${metadata.pid} exited before becoming ready.`)
297      }
298      try {
299        const snapshot = await requestDaemon<DaemonSnapshotResponse>(metadata, '/status')
300        patchDaemonStatusRecord((current) => ({
301          ...current,
302          pid: metadata.pid,
303          adminPort: metadata.port,
304          desiredState: 'running',
305          manualStopRequested: false,
306          startedAt: current.startedAt || metadata.launchedAt,
307          stoppedAt: null,
308          lastHeartbeatAt: now(),
309          updatedAt: now(),
310          lastError: null,
311          lastStatus: snapshot.status,
312          lastHealthSummary: snapshot.healthSummary,
313        }))
314        return
315      } catch {
316        await new Promise((resolve) => setTimeout(resolve, DAEMON_POLL_INTERVAL_MS))
317      }
318    }
319    throw new Error(`Timed out waiting for daemon admin server on port ${metadata.port}.`)
320  }
321  
322  async function waitForProcessExit(pid: number, timeoutMs = 5_000): Promise<void> {
323    const deadline = now() + timeoutMs
324    while (now() < deadline) {
325      if (!isProcessRunning(pid)) return
326      await new Promise((resolve) => setTimeout(resolve, 100))
327    }
328  }
329  
330  async function reservePort(): Promise<number> {
331    return new Promise<number>((resolve, reject) => {
332      const server = net.createServer()
333      server.once('error', reject)
334      server.listen(0, '127.0.0.1', () => {
335        const address = server.address()
336        if (!address || typeof address === 'string') {
337          server.close(() => reject(new Error('Failed to reserve daemon admin port.')))
338          return
339        }
340        const { port } = address
341        server.close((err) => {
342          if (err) reject(err)
343          else resolve(port)
344        })
345      })
346    })
347  }
348  
349  function buildDaemonSpawnEnv(root: string, adminPort: number, adminToken: string): NodeJS.ProcessEnv {
350    return {
351      ...process.env,
352      SWARMCLAW_HOME: getDaemonHomeDir(),
353      DATA_DIR,
354      WORKSPACE_DIR: process.env.WORKSPACE_DIR,
355      BROWSER_PROFILES_DIR: process.env.BROWSER_PROFILES_DIR,
356      SWARMCLAW_BUILD_ROOT: process.env.SWARMCLAW_BUILD_ROOT || root,
357      SWARMCLAW_PACKAGE_ROOT: process.env.SWARMCLAW_PACKAGE_ROOT || root,
358      SWARMCLAW_RUNTIME_ROLE: 'daemon',
359      SWARMCLAW_DAEMON_BACKGROUND_SERVICES: '1',
360      SWARMCLAW_DAEMON_ADMIN_PORT: String(adminPort),
361      SWARMCLAW_DAEMON_ADMIN_TOKEN: adminToken,
362    }
363  }
364  
365  export async function ensureDaemonProcessRunning(
366    source: string,
367    opts?: { manualStart?: boolean },
368  ): Promise<boolean> {
369    const manualStart = opts?.manualStart === true
370    const record = loadDaemonStatusRecord()
371    if (loadEstopState().level !== 'none') return false
372    if (!manualStart && !daemonAutostartEnvEnabled()) return false
373    if (!manualStart && record.manualStopRequested) return false
374  
375    const inProcessSnapshot = getInProcessDaemonSnapshot()
376    if (inProcessSnapshot) return false
377  
378    const startedInProcess = startDaemon({ source, manualStart })
379    if (startedInProcess) return true
380    if (getInProcessDaemonSnapshot()) return false
381  
382    const live = await getLiveDaemonSnapshot()
383    if (live?.status.running) return false
384  
385    const lockOwner = createLockOwner()
386    if (!tryAcquireRuntimeLock(LAUNCH_LOCK_NAME, lockOwner, LAUNCH_LOCK_TTL_MS)) return false
387  
388    try {
389      const secondCheck = await getLiveDaemonSnapshot()
390      if (secondCheck?.status.running) return false
391  
392      let resolved: { root: string; entry: string }
393      try {
394        resolved = resolveDaemonRuntimeEntry()
395      } catch (err: unknown) {
396        // The standalone Docker image does not ship `src/` (Next.js standalone
397        // output excludes raw source files), so the subprocess fallback can
398        // never spawn there. Fail soft: log once and let callers fall back to
399        // whatever in-process daemon path is available rather than surfacing
400        // a 500 to API consumers. Reported as issue #41 (Bug 3).
401        if (!subprocessFallbackUnavailableLogged.value) {
402          subprocessFallbackUnavailableLogged.value = true
403          log.warn(TAG, `[daemon] Subprocess fallback unavailable in this build (${errorMessage(err)}). The in-process daemon will continue to be the primary path.`)
404        }
405        return false
406      }
407      const { root, entry } = resolved
408      const adminPort = await reservePort()
409      const adminToken = crypto.randomBytes(24).toString('hex')
410      fs.mkdirSync(path.dirname(DAEMON_LOG_PATH), { recursive: true })
411      const logStream = fs.openSync(DAEMON_LOG_PATH, 'a')
412      const child = spawn(
413        process.execPath,
414        ['--no-warnings', '--import', 'tsx', entry, '--port', String(adminPort), '--token', adminToken],
415        {
416          cwd: root,
417          detached: true,
418          env: buildDaemonSpawnEnv(root, adminPort, adminToken),
419          stdio: ['ignore', logStream, logStream],
420        },
421      )
422  
423      const metadata: DaemonAdminMetadata = {
424        pid: child.pid ?? 0,
425        port: adminPort,
426        token: adminToken,
427        launchedAt: now(),
428        source,
429      }
430      if (!metadata.pid) {
431        throw new Error('Daemon process failed to spawn.')
432      }
433  
434      writeDaemonAdminMetadata(metadata)
435      patchDaemonStatusRecord((current) => ({
436        ...current,
437        pid: metadata.pid,
438        adminPort: metadata.port,
439        desiredState: 'running',
440        manualStopRequested: false,
441        startedAt: current.startedAt,
442        stoppedAt: null,
443        updatedAt: now(),
444        lastLaunchSource: source,
445        lastError: null,
446      }))
447  
448      await waitForDaemonReady(metadata)
449      child.unref()
450      return true
451    } catch (err: unknown) {
452      markDaemonUnavailable(`launch-failed:${source}`, err)
453      throw err
454    } finally {
455      releaseRuntimeLock(LAUNCH_LOCK_NAME, lockOwner)
456    }
457  }
458  
459  export async function stopDaemonProcess(opts?: {
460    source?: string
461    manualStop?: boolean
462  }): Promise<boolean> {
463    const source = opts?.source || 'unknown'
464    const manualStop = opts?.manualStop === true
465    const metadata = readDaemonAdminMetadata()
466    const inProcessSnapshot = getInProcessDaemonSnapshot()
467  
468    if (inProcessSnapshot && (!metadata || metadata.pid === process.pid || !isProcessRunning(metadata.pid))) {
469      await stopDaemon({ source, manualStop })
470      clearDaemonAdminMetadata()
471      patchDaemonStatusRecord((current) => ({
472        ...current,
473        pid: null,
474        adminPort: null,
475        desiredState: 'stopped',
476        manualStopRequested: manualStop ? true : current.manualStopRequested,
477        stoppedAt: now(),
478        updatedAt: now(),
479        lastStopSource: source,
480        lastStatus: {
481          ...getDaemonStatus(),
482          manualStopRequested: manualStop ? true : current.manualStopRequested,
483        },
484        lastHealthSummary: getDaemonHealthSummary() as DaemonHealthSummaryPayload,
485      }))
486      return true
487    }
488  
489    if (!metadata || !isProcessRunning(metadata.pid)) {
490      clearDaemonAdminMetadata()
491      patchDaemonStatusRecord((current) => ({
492        ...current,
493        pid: null,
494        adminPort: null,
495        desiredState: 'stopped',
496        manualStopRequested: manualStop ? true : current.manualStopRequested,
497        stoppedAt: now(),
498        updatedAt: now(),
499        lastStopSource: source,
500        lastStatus: {
501          ...(current.lastStatus || buildDefaultStatus()),
502          running: false,
503          schedulerActive: false,
504          manualStopRequested: manualStop ? true : current.manualStopRequested,
505          estop: loadEstopState(),
506        },
507      }))
508      return false
509    }
510  
511    try {
512      await requestDaemon<{ ok: boolean }>(metadata, '/stop', {
513        method: 'POST',
514        body: JSON.stringify({ source }),
515      })
516    } catch (err: unknown) {
517      if (isProcessRunning(metadata.pid)) {
518        try {
519          process.kill(metadata.pid, 'SIGTERM')
520        } catch {
521          // Fall through to stale cleanup below.
522        }
523      }
524      log.warn(TAG, `Daemon stop request fell back to SIGTERM (${source})`, errorMessage(err))
525    }
526  
527    await waitForProcessExit(metadata.pid)
528    clearDaemonAdminMetadata()
529    patchDaemonStatusRecord((current) => ({
530      ...current,
531      pid: null,
532      adminPort: null,
533      desiredState: 'stopped',
534      manualStopRequested: manualStop ? true : current.manualStopRequested,
535      stoppedAt: now(),
536      updatedAt: now(),
537      lastStopSource: source,
538      lastStatus: {
539        ...(current.lastStatus || buildDefaultStatus()),
540        running: false,
541        schedulerActive: false,
542        manualStopRequested: manualStop ? true : current.manualStopRequested,
543        estop: loadEstopState(),
544      },
545    }))
546    return true
547  }
548  
549  export async function getDaemonStatusSnapshot(): Promise<DaemonStatusPayload> {
550    const inProcessSnapshot = getInProcessDaemonSnapshot()
551    if (inProcessSnapshot) return inProcessSnapshot.status
552    const live = await getLiveDaemonSnapshot()
553    if (live) return live.status
554    return buildFallbackStatus()
555  }
556  
557  export async function getDaemonHealthSummarySnapshot(): Promise<DaemonHealthSummaryPayload> {
558    const inProcessSnapshot = getInProcessDaemonSnapshot()
559    if (inProcessSnapshot) return inProcessSnapshot.healthSummary
560    const live = await getLiveDaemonSnapshot()
561    if (live) return live.healthSummary
562    return buildFallbackHealthSummary()
563  }
564  
565  export async function runDaemonHealthCheckViaAdmin(source: string): Promise<DaemonSnapshotResponse> {
566    await ensureDaemonProcessRunning(source, { manualStart: true })
567    const inProcessSnapshot = getInProcessDaemonSnapshot()
568    if (inProcessSnapshot) return inProcessSnapshot
569    const metadata = readDaemonAdminMetadata()
570    if (!metadata) {
571      return {
572        status: buildFallbackStatus(),
573        healthSummary: buildFallbackHealthSummary(),
574      }
575    }
576    try {
577      return await requestDaemon<DaemonSnapshotResponse>(metadata, '/health-check', {
578        method: 'POST',
579        body: JSON.stringify({ source }),
580      })
581    } catch (err: unknown) {
582      markDaemonUnavailable(`health-check:${source}`, err)
583      return {
584        status: buildFallbackStatus(),
585        healthSummary: buildFallbackHealthSummary(),
586      }
587    }
588  }
589  
590  export async function listDaemonConnectorRuntime(): Promise<Record<string, DaemonConnectorRuntimeState>> {
591    // When the daemon is running in-process, read runtime state directly.
592    const inProcessStatus = getDaemonStatus()
593    if (inProcessStatus.running) {
594      const { listRunningConnectors, getConnectorStatus, isConnectorAuthenticated, hasConnectorCredentials, getConnectorQR, getConnectorPresence } =
595        await import('@/lib/server/connectors/connector-lifecycle')
596      const result: Record<string, DaemonConnectorRuntimeState> = {}
597      for (const { id } of listRunningConnectors()) {
598        result[id] = {
599          status: getConnectorStatus(id),
600          authenticated: isConnectorAuthenticated(id),
601          hasCredentials: hasConnectorCredentials(id),
602          qrDataUrl: getConnectorQR(id),
603          presence: getConnectorPresence(id),
604        }
605      }
606      return result
607    }
608  
609    const metadata = readDaemonAdminMetadata()
610    if (!metadata || !isProcessRunning(metadata.pid)) return {}
611    try {
612      const result = await requestDaemon<{ connectors: Record<string, DaemonConnectorRuntimeState> }>(metadata, '/connectors')
613      return result.connectors || {}
614    } catch {
615      return {}
616    }
617  }
618  
619  export async function getDaemonConnectorRuntime(connectorId: string): Promise<DaemonConnectorRuntimeState | null> {
620    // When the daemon is running in-process, read runtime state directly from
621    // the connector lifecycle module instead of an unreachable subprocess HTTP API.
622    const inProcessStatus = getDaemonStatus()
623    if (inProcessStatus.running) {
624      const { getConnectorStatus, getConnectorQR, isConnectorAuthenticated, hasConnectorCredentials, getConnectorPresence } =
625        await import('@/lib/server/connectors/connector-lifecycle')
626      const status = getConnectorStatus(connectorId)
627      return {
628        status,
629        authenticated: isConnectorAuthenticated(connectorId),
630        hasCredentials: hasConnectorCredentials(connectorId),
631        qrDataUrl: getConnectorQR(connectorId),
632        presence: getConnectorPresence(connectorId),
633      }
634    }
635  
636    const metadata = readDaemonAdminMetadata()
637    if (!metadata || !isProcessRunning(metadata.pid)) return null
638    try {
639      const result = await requestDaemon<{ connector: DaemonConnectorRuntimeState | null }>(
640        metadata,
641        `/connectors/${encodeURIComponent(connectorId)}`,
642      )
643      return result.connector || null
644    } catch {
645      return null
646    }
647  }
648  
649  export async function runDaemonConnectorAction(
650    connectorId: string,
651    action: 'start' | 'stop' | 'repair',
652    source: string,
653  ): Promise<DaemonConnectorRuntimeState | null> {
654    if (action !== 'stop') {
655      await ensureDaemonProcessRunning(source, { manualStart: true })
656    }
657  
658    // When the daemon is running in-process (e.g. standalone production build),
659    // there is no subprocess admin server or daemon-admin.json. Execute the
660    // connector lifecycle action directly in the current process.
661    const inProcessStatus = getDaemonStatus()
662    if (inProcessStatus.running) {
663      try {
664        const { startConnector, stopConnector, repairConnector } = await import('@/lib/server/connectors/connector-lifecycle')
665        if (action === 'start') {
666          await startConnector(connectorId)
667        } else if (action === 'stop') {
668          await stopConnector(connectorId)
669        } else if (action === 'repair') {
670          await repairConnector(connectorId)
671        }
672      } catch (err: unknown) {
673        log.error(TAG, `In-process connector action "${action}" failed for ${connectorId}:`, errorMessage(err))
674      }
675      return null
676    }
677  
678    const metadata = readDaemonAdminMetadata()
679    if (!metadata || !isProcessRunning(metadata.pid)) return null
680    const result = await requestDaemon<{ connector: DaemonConnectorRuntimeState | null }>(
681      metadata,
682      `/connectors/${encodeURIComponent(connectorId)}/actions`,
683      {
684        method: 'POST',
685        body: JSON.stringify({ action, source }),
686      },
687    )
688    return result.connector || null
689  }
690  
691  export async function listDaemonRunningConnectors(platform?: string): Promise<DaemonRunningConnectorInfo[]> {
692    const metadata = readDaemonAdminMetadata()
693    if (!metadata || !isProcessRunning(metadata.pid)) return []
694    const query = platform ? `?platform=${encodeURIComponent(platform)}` : ''
695    try {
696      const result = await requestDaemon<{ connectors: DaemonRunningConnectorInfo[] }>(
697        metadata,
698        `/connectors/running${query}`,
699      )
700      return Array.isArray(result.connectors) ? result.connectors : []
701    } catch {
702      return []
703    }
704  }