controller.ts
1 import crypto from 'node:crypto' 2 import { spawn } from 'node:child_process' 3 import fs from 'node:fs' 4 import net from 'node:net' 5 import path from 'node:path' 6 7 import { log } from '@/lib/server/logger' 8 import { 9 DAEMON_LOG_PATH, 10 clearDaemonAdminMetadata, 11 isProcessRunning, 12 readDaemonAdminMetadata, 13 writeDaemonAdminMetadata, 14 } from '@/lib/server/daemon/admin-metadata' 15 import { 16 loadDaemonStatusRecord, 17 patchDaemonStatusRecord, 18 } from '@/lib/server/daemon/daemon-status-repository' 19 import type { 20 DaemonAdminMetadata, 21 DaemonConnectorRuntimeState, 22 DaemonHealthSummaryPayload, 23 DaemonRunningConnectorInfo, 24 DaemonStatusPayload, 25 } from '@/lib/server/daemon/types' 26 import { DATA_DIR } from '@/lib/server/data-dir' 27 import { loadEstopState } from '@/lib/server/runtime/estop' 28 import { 29 getDaemonHealthSummary, 30 getDaemonStatus, 31 startDaemon, 32 stopDaemon, 33 } from '@/lib/server/runtime/daemon-state/core' 34 import { daemonAutostartEnvEnabled } from '@/lib/server/runtime/daemon-policy' 35 import { 36 releaseRuntimeLock, 37 tryAcquireRuntimeLock, 38 } from '@/lib/server/runtime/runtime-lock-repository' 39 import { errorMessage, hmrSingleton } from '@/lib/shared-utils' 40 41 // HMR-safe single-shot guard so the "subprocess fallback unavailable" 42 // warning logs once per process lifetime, not per API call. 
const subprocessFallbackUnavailableLogged = hmrSingleton<{ value: boolean }>(
  '__swarmclaw_daemon_subprocess_fallback_warned__',
  () => ({ value: false }),
)

/** Tag passed as the first argument to every `log.*` call in this module. */
const TAG = 'daemon-controller'
/** Name of the runtime lock that serializes subprocess launches. */
const LAUNCH_LOCK_NAME = 'daemon-launcher'
/** TTL handed to `tryAcquireRuntimeLock` for the launch lock. */
const LAUNCH_LOCK_TTL_MS = 20_000
/** How long `waitForDaemonReady` polls the admin server before giving up. */
const DAEMON_READY_TIMEOUT_MS = 20_000
/** Delay between readiness polls. */
const DAEMON_POLL_INTERVAL_MS = 250
/** Maximum heartbeat age for the persisted record to still count as live. */
const DAEMON_STALE_AFTER_MS = 20_000
/** Path of the daemon runtime entry file, relative to the build/package root. */
const DAEMON_RUNTIME_ENTRY_SEGMENTS = ['src', 'lib', 'server', 'daemon', 'daemon-runtime.ts'] as const

/** Current wall-clock time in epoch milliseconds. */
function now(): number {
  return Date.now()
}

/** Builds a lock-owner id from this process's pid plus 6 random bytes (hex). */
function createLockOwner(): string {
  return `launcher:${process.pid}:${crypto.randomBytes(6).toString('hex')}`
}

/**
 * Builds a status payload describing a daemon that is not running.
 * Autostart and estop are read live; every other field is a fixed default.
 */
function buildDefaultStatus(): DaemonStatusPayload {
  return {
    running: false,
    schedulerActive: false,
    autostartEnabled: daemonAutostartEnvEnabled(),
    backgroundServicesEnabled: true,
    reducedMode: false,
    manualStopRequested: false,
    estop: loadEstopState(),
    queueLength: 0,
    lastProcessed: null,
    nextScheduled: null,
    heartbeat: null,
    health: {
      monitorActive: false,
      connectorMonitorActive: false,
      staleSessions: 0,
      connectorsInBackoff: 0,
      connectorsExhausted: 0,
      checkIntervalSec: 120,
      connectorCheckIntervalSec: 15,
      integrity: {
        enabled: true,
        lastCheckedAt: null,
        lastDriftCount: 0,
      },
    },
    webhookRetry: {
      pendingRetries: 0,
      deadLettered: 0,
    },
    guards: {
      healthCheckRunning: false,
      connectorHealthCheckRunning: false,
      shuttingDown: false,
      providerCircuitBreakers: 0,
    },
  }
}

/**
 * Builds a health summary for a daemon that is not running.
 * The daemon component reads `'degraded'` while an estop is engaged and
 * `'stopped'` otherwise; all counters are zero.
 */
function buildDefaultHealthSummary(): DaemonHealthSummaryPayload {
  const estop = loadEstopState().level !== 'none'
  return {
    ok: false,
    uptime: 0,
    components: {
      daemon: { status: estop ? 'degraded' : 'stopped' },
      connectors: { healthy: 0, errored: 0, total: 0 },
      providers: { healthy: 0, cooldown: 0, total: 0 },
      gateways: { healthy: 0, degraded: 0, total: 0 },
    },
    estop,
    nextScheduledTask: null,
  }
}

/**
 * Resolves the home directory exported to the daemon subprocess:
 * `SWARMCLAW_HOME` when set to a non-blank value, otherwise the parent of `DATA_DIR`.
 */
function getDaemonHomeDir(): string {
  const configured = process.env.SWARMCLAW_HOME?.trim()
  if (configured) return path.resolve(configured)
  return path.dirname(DATA_DIR)
}

/**
 * Returns the first candidate root that contains the daemon runtime entry file.
 * Candidates, in order: `SWARMCLAW_BUILD_ROOT`, `SWARMCLAW_PACKAGE_ROOT`, `process.cwd()`.
 *
 * @returns The absolute root path, or `null` when no candidate contains the entry file.
 */
function resolveDaemonRoot(): string | null {
  const candidates = [
    process.env.SWARMCLAW_BUILD_ROOT,
    process.env.SWARMCLAW_PACKAGE_ROOT,
    process.cwd(),
  ]
    .filter((value): value is string => typeof value === 'string' && value.trim().length > 0)
    .map((value) => path.resolve(value))

  for (const root of candidates) {
    if (fs.existsSync(path.join(root, ...DAEMON_RUNTIME_ENTRY_SEGMENTS))) {
      return root
    }
  }

  return null
}

/**
 * Resolves the root directory and absolute entry path of the daemon runtime.
 *
 * @throws Error when no candidate root contains the entry file.
 */
function resolveDaemonRuntimeEntry(): { root: string; entry: string } {
  const root = resolveDaemonRoot()
  if (!root) {
    throw new Error('Unable to locate daemon runtime entry. Set SWARMCLAW_BUILD_ROOT or SWARMCLAW_PACKAGE_ROOT.')
  }
  return {
    root,
    entry: path.join(root, ...DAEMON_RUNTIME_ENTRY_SEGMENTS),
  }
}

/** Builds a loopback admin URL for `routePath`, adding a leading slash when missing. */
function buildDaemonUrl(port: number, routePath: string): string {
  const normalized = routePath.startsWith('/') ? routePath : `/${routePath}`
  return `http://127.0.0.1:${port}${normalized}`
}

/** Returns an `AbortSignal` that aborts after `timeoutMs` milliseconds (default 2000). */
function withTimeout(timeoutMs = 2_000): AbortSignal {
  return AbortSignal.timeout(timeoutMs)
}

/**
 * Reads a response body as JSON.
 * An empty body yields an empty object cast to `T`, so callers must not assume
 * every field of `T` is present in that case.
 */
async function readJsonResponse<T>(response: Response): Promise<T> {
  const text = await response.text()
  if (!text) return {} as T
  return JSON.parse(text) as T
}

/** Combined status and health payload returned by the daemon's snapshot endpoints. */
type DaemonSnapshotResponse = {
  status: DaemonStatusPayload
  healthSummary: DaemonHealthSummaryPayload
}

/** Returns the in-process daemon's snapshot, or `null` when it does not report `running`. */
function getInProcessDaemonSnapshot(): DaemonSnapshotResponse | null {
  const status = getDaemonStatus()
  if (!status.running) return null
  return {
    status,
    healthSummary: getDaemonHealthSummary() as DaemonHealthSummaryPayload,
  }
}

/**
 * Sends an authenticated request to the daemon admin server on 127.0.0.1.
 *
 * @param metadata Admin metadata supplying the port and bearer token.
 * @param routePath Route on the admin server; a leading slash is optional.
 * @param init Optional fetch init. A caller-supplied `signal` replaces the default 2 s timeout.
 * @returns The parsed JSON body (an empty object when the body is empty).
 * @throws Error when the response status is not ok, or when fetch/parsing fails.
 */
async function requestDaemon<T>(
  metadata: DaemonAdminMetadata,
  routePath: string,
  init?: RequestInit,
): Promise<T> {
  const headers = new Headers(init?.headers || {})
  headers.set('authorization', `Bearer ${metadata.token}`)
  // Bodies sent by this module are JSON strings; default the content type accordingly.
  if (init?.body && !headers.has('content-type')) {
    headers.set('content-type', 'application/json')
  }

  const response = await fetch(buildDaemonUrl(metadata.port, routePath), {
    ...init,
    headers,
    signal: init?.signal || withTimeout(),
  })
  if (!response.ok) {
    const detail = await response.text().catch(() => '')
    throw new Error(`Daemon admin request failed (${response.status}): ${detail || response.statusText}`)
  }
  return readJsonResponse<T>(response)
}

/**
 * Reports whether the persisted status record points at a live daemon:
 * its pid is running and its last heartbeat is within `DAEMON_STALE_AFTER_MS`.
 */
function daemonRecordLooksLive(): boolean {
  const record = loadDaemonStatusRecord()
  return Boolean(
    record.pid
      && isProcessRunning(record.pid)
      && record.lastHeartbeatAt
      && now() - record.lastHeartbeatAt <= DAEMON_STALE_AFTER_MS,
  )
}

/**
 * Builds a status payload from the persisted record for use when no daemon can be queried.
 * Liveness, autostart, manual-stop and estop fields are refreshed; the rest comes from
 * the last recorded status (or defaults).
 */
function buildFallbackStatus(): DaemonStatusPayload {
  const record = loadDaemonStatusRecord()
  const base = record.lastStatus ? { ...record.lastStatus } : buildDefaultStatus()
  const running = daemonRecordLooksLive()
  return {
    ...base,
    running,
    schedulerActive: running,
    autostartEnabled: daemonAutostartEnvEnabled(),
    manualStopRequested: record.manualStopRequested,
    estop: loadEstopState(),
  }
}

/**
 * Builds a health summary from the persisted record for use when no daemon can be queried.
 * The daemon component status, `ok` and `estop` are all derived from the current
 * liveness and estop state so that they agree with each other.
 */
function buildFallbackHealthSummary(): DaemonHealthSummaryPayload {
  const record = loadDaemonStatusRecord()
  const running = daemonRecordLooksLive()
  // Copy the nested objects that get mutated below so the loaded record is left untouched.
  const base = record.lastHealthSummary
    ? {
        ...record.lastHealthSummary,
        components: {
          ...record.lastHealthSummary.components,
          daemon: {
            ...record.lastHealthSummary.components.daemon,
          },
        },
      }
    : buildDefaultHealthSummary()

  const estopEngaged = loadEstopState().level !== 'none'
  base.components.daemon.status = running
    ? (estopEngaged ? 'degraded' : 'healthy')
    : 'stopped'
  // `ok` must reflect the refreshed status, not the stale persisted one: a live
  // daemon is only degraded here when an estop is engaged.
  base.ok = running && !estopEngaged
  base.estop = estopEngaged
  return base
}

/**
 * Clears the admin metadata and records in the status record that the daemon is gone.
 *
 * @param source Stored as `lastStopSource`.
 * @param err When provided, its message replaces `lastError`; otherwise `lastError` is kept.
 */
function markDaemonUnavailable(source: string, err?: unknown): void {
  clearDaemonAdminMetadata()
  patchDaemonStatusRecord((current) => {
    const status = current.lastStatus ? { ...current.lastStatus } : buildDefaultStatus()
    status.running = false
    status.schedulerActive = false
    status.estop = loadEstopState()
    return {
      ...current,
      pid: null,
      adminPort: null,
      // Only force the desired state to 'stopped' when the operator asked for a stop.
      desiredState: current.manualStopRequested ? 'stopped' : current.desiredState,
      stoppedAt: now(),
      updatedAt: now(),
      lastStopSource: source,
      lastError: err ? errorMessage(err) : current.lastError,
      lastStatus: status,
    }
  })
}

/**
 * Queries the subprocess daemon's `/status` endpoint.
 *
 * @returns The snapshot, or `null` when there is no admin metadata, the pid is gone,
 *   or the request fails.
 */
async function getLiveDaemonSnapshot(): Promise<DaemonSnapshotResponse | null> {
  const metadata = readDaemonAdminMetadata()
  if (!metadata) return null
  if (!isProcessRunning(metadata.pid)) {
    markDaemonUnavailable('pid-missing')
    return null
  }
  try {
    return await requestDaemon<DaemonSnapshotResponse>(metadata, '/status')
  } catch (err: unknown) {
    // A failed request alone does not discard the metadata; it is only cleared
    // once the process itself has exited.
    if (!isProcessRunning(metadata.pid)) {
      markDaemonUnavailable('request-failed', err)
    }
    return null
  }
}

/**
 * Polls the daemon's `/status` endpoint until it answers, then records the
 * running state and the returned snapshot in the status record.
 *
 * @throws Error when the process exits first, or when `DAEMON_READY_TIMEOUT_MS` elapses.
 */
async function waitForDaemonReady(metadata: DaemonAdminMetadata): Promise<void> {
  const deadline = now() + DAEMON_READY_TIMEOUT_MS
  while (now() < deadline) {
    if (!isProcessRunning(metadata.pid)) {
      throw new Error(`Daemon process ${metadata.pid} exited before becoming ready.`)
    }
    try {
      const snapshot = await requestDaemon<DaemonSnapshotResponse>(metadata, '/status')
      patchDaemonStatusRecord((current) => ({
        ...current,
        pid: metadata.pid,
        adminPort: metadata.port,
        desiredState: 'running',
        manualStopRequested: false,
        startedAt: current.startedAt || metadata.launchedAt,
        stoppedAt: null,
        lastHeartbeatAt: now(),
        updatedAt: now(),
        lastError: null,
        lastStatus: snapshot.status,
        lastHealthSummary: snapshot.healthSummary,
      }))
      return
    } catch {
      // Admin server not answering yet (or the patch failed): retry after a short pause.
      await new Promise((resolve) => setTimeout(resolve, DAEMON_POLL_INTERVAL_MS))
    }
  }
  throw new Error(`Timed out waiting for daemon admin server on port ${metadata.port}.`)
}

/**
 * Waits up to `timeoutMs` for `pid` to exit, polling every 100 ms.
 * Resolves either way; callers must re-check liveness if they need certainty.
 */
async function waitForProcessExit(pid: number, timeoutMs = 5_000): Promise<void> {
  const deadline = now() + timeoutMs
  while (now() < deadline) {
    if (!isProcessRunning(pid)) return
    await new Promise((resolve) => setTimeout(resolve, 100))
  }
}

/**
 * Asks the OS for a free loopback port by binding to port 0 and closing the listener.
 * The port is released before it is returned, so another process could claim it first.
 */
async function reservePort(): Promise<number> {
  return new Promise<number>((resolve, reject) => {
    const server = net.createServer()
    server.once('error', reject)
    server.listen(0, '127.0.0.1', () => {
      const address = server.address()
      if (!address || typeof address === 'string') {
        server.close(() => reject(new Error('Failed to reserve daemon admin port.')))
        return
      }
      const { port } = address
      server.close((err) => {
        if (err) reject(err)
        else resolve(port)
      })
    })
  })
}

/** Builds the environment for the daemon subprocess on top of the current `process.env`. */
function buildDaemonSpawnEnv(root: string, adminPort: number, adminToken: string): NodeJS.ProcessEnv {
  return {
    ...process.env,
    SWARMCLAW_HOME: getDaemonHomeDir(),
    DATA_DIR,
    WORKSPACE_DIR: process.env.WORKSPACE_DIR,
    BROWSER_PROFILES_DIR: process.env.BROWSER_PROFILES_DIR,
    SWARMCLAW_BUILD_ROOT: process.env.SWARMCLAW_BUILD_ROOT || root,
    SWARMCLAW_PACKAGE_ROOT: process.env.SWARMCLAW_PACKAGE_ROOT || root,
    SWARMCLAW_RUNTIME_ROLE: 'daemon',
    SWARMCLAW_DAEMON_BACKGROUND_SERVICES: '1',
    SWARMCLAW_DAEMON_ADMIN_PORT: String(adminPort),
    SWARMCLAW_DAEMON_ADMIN_TOKEN: adminToken,
  }
}

/**
 * Spawns the detached daemon subprocess with stdout/stderr appended to `DAEMON_LOG_PATH`.
 * The parent's copy of the log file descriptor is closed before returning.
 */
function spawnDaemonChild(root: string, entry: string, adminPort: number, adminToken: string) {
  fs.mkdirSync(path.dirname(DAEMON_LOG_PATH), { recursive: true })
  const logFd = fs.openSync(DAEMON_LOG_PATH, 'a')
  try {
    const child = spawn(
      process.execPath,
      ['--no-warnings', '--import', 'tsx', entry, '--port', String(adminPort), '--token', adminToken],
      {
        cwd: root,
        detached: true,
        env: buildDaemonSpawnEnv(root, adminPort, adminToken),
        stdio: ['ignore', logFd, logFd],
      },
    )
    // Spawn failures are reported asynchronously via 'error'; without a listener
    // the event would be raised as an uncaught exception in this process.
    child.once('error', (err) => {
      log.warn(TAG, 'Daemon subprocess emitted an error', errorMessage(err))
    })
    // Detach from this process's event loop right away so a failed readiness
    // wait cannot leave the child referenced.
    child.unref()
    return child
  } finally {
    // The child has its own copy of the descriptor once spawn() returns.
    fs.closeSync(logFd)
  }
}

/**
 * Ensures a daemon is running, preferring the in-process daemon and falling back
 * to a detached subprocess.
 *
 * @param source Label recorded as the launch source.
 * @param opts `manualStart: true` bypasses the autostart policy and a previous manual stop.
 * @returns `true` when this call started a daemon. `false` when starting is not allowed
 *   (estop engaged, autostart disabled, manual stop requested), a daemon is already
 *   running, the launch lock is held elsewhere, or the subprocess entry cannot be found.
 * @throws Error when the subprocess launch fails (port reservation, spawn, or readiness wait).
 */
export async function ensureDaemonProcessRunning(
  source: string,
  opts?: { manualStart?: boolean },
): Promise<boolean> {
  const manualStart = opts?.manualStart === true
  const record = loadDaemonStatusRecord()
  if (loadEstopState().level !== 'none') return false
  if (!manualStart && !daemonAutostartEnvEnabled()) return false
  if (!manualStart && record.manualStopRequested) return false

  const inProcessSnapshot = getInProcessDaemonSnapshot()
  if (inProcessSnapshot) return false

  const startedInProcess = startDaemon({ source, manualStart })
  if (startedInProcess) return true
  if (getInProcessDaemonSnapshot()) return false

  const live = await getLiveDaemonSnapshot()
  if (live?.status.running) return false

  const lockOwner = createLockOwner()
  if (!tryAcquireRuntimeLock(LAUNCH_LOCK_NAME, lockOwner, LAUNCH_LOCK_TTL_MS)) return false

  // Set once a child has been spawned so the failure path can terminate it.
  let spawnedPid: number | undefined
  try {
    // Re-check under the lock: another launcher may have finished in the meantime.
    const secondCheck = await getLiveDaemonSnapshot()
    if (secondCheck?.status.running) return false

    let resolved: { root: string; entry: string }
    try {
      resolved = resolveDaemonRuntimeEntry()
    } catch (err: unknown) {
      // The standalone Docker image does not ship `src/` (Next.js standalone
      // output excludes raw source files), so the subprocess fallback can
      // never spawn there. Fail soft: log once and let callers fall back to
      // whatever in-process daemon path is available rather than surfacing
      // a 500 to API consumers. Reported as issue #41 (Bug 3).
      if (!subprocessFallbackUnavailableLogged.value) {
        subprocessFallbackUnavailableLogged.value = true
        log.warn(TAG, `[daemon] Subprocess fallback unavailable in this build (${errorMessage(err)}). The in-process daemon will continue to be the primary path.`)
      }
      return false
    }
    const { root, entry } = resolved
    const adminPort = await reservePort()
    const adminToken = crypto.randomBytes(24).toString('hex')
    const child = spawnDaemonChild(root, entry, adminPort, adminToken)

    const metadata: DaemonAdminMetadata = {
      pid: child.pid ?? 0,
      port: adminPort,
      token: adminToken,
      launchedAt: now(),
      source,
    }
    if (!metadata.pid) {
      throw new Error('Daemon process failed to spawn.')
    }
    spawnedPid = metadata.pid

    writeDaemonAdminMetadata(metadata)
    patchDaemonStatusRecord((current) => ({
      ...current,
      pid: metadata.pid,
      adminPort: metadata.port,
      desiredState: 'running',
      manualStopRequested: false,
      startedAt: current.startedAt,
      stoppedAt: null,
      updatedAt: now(),
      lastLaunchSource: source,
      lastError: null,
    }))

    await waitForDaemonReady(metadata)
    return true
  } catch (err: unknown) {
    // markDaemonUnavailable() wipes the admin metadata, so a child that is still
    // alive would become unmanageable; terminate it first (best effort).
    if (spawnedPid && isProcessRunning(spawnedPid)) {
      try {
        process.kill(spawnedPid, 'SIGTERM')
      } catch {
        // The process exited between the liveness check and the signal.
      }
    }
    markDaemonUnavailable(`launch-failed:${source}`, err)
    throw err
  } finally {
    releaseRuntimeLock(LAUNCH_LOCK_NAME, lockOwner)
  }
}

/**
 * Clears the admin metadata and records a stopped subprocess daemon in the status record.
 * The last known status is kept but marked as not running.
 */
function recordDaemonStopped(source: string, manualStop: boolean): void {
  clearDaemonAdminMetadata()
  patchDaemonStatusRecord((current) => ({
    ...current,
    pid: null,
    adminPort: null,
    desiredState: 'stopped',
    manualStopRequested: manualStop ? true : current.manualStopRequested,
    stoppedAt: now(),
    updatedAt: now(),
    lastStopSource: source,
    lastStatus: {
      ...(current.lastStatus || buildDefaultStatus()),
      running: false,
      schedulerActive: false,
      manualStopRequested: manualStop ? true : current.manualStopRequested,
      estop: loadEstopState(),
    },
  }))
}

/**
 * Stops the daemon, whether it runs in-process or as a subprocess.
 *
 * @param opts `source` is recorded as the stop source (default `'unknown'`);
 *   `manualStop: true` additionally records that the operator requested the stop.
 * @returns `true` when a daemon was asked to stop; `false` when none was running
 *   (the status record is still updated to the stopped state).
 */
export async function stopDaemonProcess(opts?: {
  source?: string
  manualStop?: boolean
}): Promise<boolean> {
  const source = opts?.source || 'unknown'
  const manualStop = opts?.manualStop === true
  const metadata = readDaemonAdminMetadata()
  const inProcessSnapshot = getInProcessDaemonSnapshot()

  // In-process daemon: any metadata present is absent, ours, or stale.
  if (inProcessSnapshot && (!metadata || metadata.pid === process.pid || !isProcessRunning(metadata.pid))) {
    await stopDaemon({ source, manualStop })
    clearDaemonAdminMetadata()
    patchDaemonStatusRecord((current) => ({
      ...current,
      pid: null,
      adminPort: null,
      desiredState: 'stopped',
      manualStopRequested: manualStop ? true : current.manualStopRequested,
      stoppedAt: now(),
      updatedAt: now(),
      lastStopSource: source,
      lastStatus: {
        ...getDaemonStatus(),
        manualStopRequested: manualStop ? true : current.manualStopRequested,
      },
      lastHealthSummary: getDaemonHealthSummary() as DaemonHealthSummaryPayload,
    }))
    return true
  }

  // No subprocess to stop: just bring the persisted state in line.
  if (!metadata || !isProcessRunning(metadata.pid)) {
    recordDaemonStopped(source, manualStop)
    return false
  }

  try {
    await requestDaemon<{ ok: boolean }>(metadata, '/stop', {
      method: 'POST',
      body: JSON.stringify({ source }),
    })
  } catch (err: unknown) {
    if (isProcessRunning(metadata.pid)) {
      try {
        process.kill(metadata.pid, 'SIGTERM')
      } catch {
        // Fall through to stale cleanup below.
      }
    }
    log.warn(TAG, `Daemon stop request fell back to SIGTERM (${source})`, errorMessage(err))
  }

  await waitForProcessExit(metadata.pid)
  if (isProcessRunning(metadata.pid)) {
    // The wait above is bounded; surface the case where the process outlived it
    // instead of silently recording it as stopped.
    log.warn(TAG, `Daemon process ${metadata.pid} still running after stop wait (${source})`)
  }
  recordDaemonStopped(source, manualStop)
  return true
}

/**
 * Returns the daemon status from the first available source:
 * in-process daemon, subprocess admin API, then the persisted record.
 */
export async function getDaemonStatusSnapshot(): Promise<DaemonStatusPayload> {
  const inProcessSnapshot = getInProcessDaemonSnapshot()
  if (inProcessSnapshot) return inProcessSnapshot.status
  const live = await getLiveDaemonSnapshot()
  if (live) return live.status
  return buildFallbackStatus()
}

/**
 * Returns the daemon health summary from the first available source:
 * in-process daemon, subprocess admin API, then the persisted record.
 */
export async function getDaemonHealthSummarySnapshot(): Promise<DaemonHealthSummaryPayload> {
  const inProcessSnapshot = getInProcessDaemonSnapshot()
  if (inProcessSnapshot) return inProcessSnapshot.healthSummary
  const live = await getLiveDaemonSnapshot()
  if (live) return live.healthSummary
  return buildFallbackHealthSummary()
}

/**
 * Starts the daemon if needed (as a manual start) and triggers a health check.
 *
 * @param source Label forwarded to the daemon and recorded on failure.
 * @returns The in-process snapshot, the subprocess's `/health-check` response, or a
 *   fallback snapshot built from the persisted record when neither is available.
 * @throws Error when `ensureDaemonProcessRunning` fails to launch a subprocess.
 */
export async function runDaemonHealthCheckViaAdmin(source: string): Promise<DaemonSnapshotResponse> {
  await ensureDaemonProcessRunning(source, { manualStart: true })
  const inProcessSnapshot = getInProcessDaemonSnapshot()
  if (inProcessSnapshot) return inProcessSnapshot
  const metadata = readDaemonAdminMetadata()
  if (!metadata) {
    return {
      status: buildFallbackStatus(),
      healthSummary: buildFallbackHealthSummary(),
    }
  }
  try {
    return await requestDaemon<DaemonSnapshotResponse>(metadata, '/health-check', {
      method: 'POST',
      body: JSON.stringify({ source }),
    })
  } catch (err: unknown) {
    // NOTE(review): unlike getLiveDaemonSnapshot(), this clears the admin metadata
    // on any request failure, even when the daemon pid is still alive - confirm
    // that is intended.
    markDaemonUnavailable(`health-check:${source}`, err)
    return {
      status: buildFallbackStatus(),
      healthSummary: buildFallbackHealthSummary(),
    }
  }
}

/**
 * Lists runtime state for connectors, keyed by connector id.
 *
 * @returns State read from the connector lifecycle module when the daemon runs
 *   in-process, otherwise from the subprocess admin API; `{}` when no daemon is
 *   reachable or the request fails.
 */
export async function listDaemonConnectorRuntime(): Promise<Record<string, DaemonConnectorRuntimeState>> {
  // When the daemon is running in-process, read runtime state directly.
  const inProcessStatus = getDaemonStatus()
  if (inProcessStatus.running) {
    const { listRunningConnectors, getConnectorStatus, isConnectorAuthenticated, hasConnectorCredentials, getConnectorQR, getConnectorPresence } =
      await import('@/lib/server/connectors/connector-lifecycle')
    const result: Record<string, DaemonConnectorRuntimeState> = {}
    for (const { id } of listRunningConnectors()) {
      result[id] = {
        status: getConnectorStatus(id),
        authenticated: isConnectorAuthenticated(id),
        hasCredentials: hasConnectorCredentials(id),
        qrDataUrl: getConnectorQR(id),
        presence: getConnectorPresence(id),
      }
    }
    return result
  }

  const metadata = readDaemonAdminMetadata()
  if (!metadata || !isProcessRunning(metadata.pid)) return {}
  try {
    const result = await requestDaemon<{ connectors: Record<string, DaemonConnectorRuntimeState> }>(metadata, '/connectors')
    return result.connectors || {}
  } catch {
    return {}
  }
}

/**
 * Returns runtime state for a single connector.
 *
 * @returns The connector's state, or `null` when no daemon is reachable, the
 *   request fails, or the subprocess reports no such connector.
 */
export async function getDaemonConnectorRuntime(connectorId: string): Promise<DaemonConnectorRuntimeState | null> {
  // When the daemon is running in-process, read runtime state directly from
  // the connector lifecycle module instead of an unreachable subprocess HTTP API.
  const inProcessStatus = getDaemonStatus()
  if (inProcessStatus.running) {
    const { getConnectorStatus, getConnectorQR, isConnectorAuthenticated, hasConnectorCredentials, getConnectorPresence } =
      await import('@/lib/server/connectors/connector-lifecycle')
    const status = getConnectorStatus(connectorId)
    return {
      status,
      authenticated: isConnectorAuthenticated(connectorId),
      hasCredentials: hasConnectorCredentials(connectorId),
      qrDataUrl: getConnectorQR(connectorId),
      presence: getConnectorPresence(connectorId),
    }
  }

  const metadata = readDaemonAdminMetadata()
  if (!metadata || !isProcessRunning(metadata.pid)) return null
  try {
    const result = await requestDaemon<{ connector: DaemonConnectorRuntimeState | null }>(
      metadata,
      `/connectors/${encodeURIComponent(connectorId)}`,
    )
    return result.connector || null
  } catch {
    return null
  }
}

/**
 * Runs a lifecycle action on a connector. For `start` and `repair` the daemon is
 * started first if necessary.
 *
 * @param connectorId Connector to act on.
 * @param action Lifecycle action to perform.
 * @param source Label forwarded to the daemon.
 * @returns The connector state reported by the subprocess admin API; `null` for the
 *   in-process path or when no subprocess daemon is reachable.
 * @throws Error when the daemon launch or the admin API request fails.
 */
export async function runDaemonConnectorAction(
  connectorId: string,
  action: 'start' | 'stop' | 'repair',
  source: string,
): Promise<DaemonConnectorRuntimeState | null> {
  if (action !== 'stop') {
    await ensureDaemonProcessRunning(source, { manualStart: true })
  }

  // When the daemon is running in-process (e.g. standalone production build),
  // there is no subprocess admin server or daemon-admin.json. Execute the
  // connector lifecycle action directly in the current process.
  const inProcessStatus = getDaemonStatus()
  if (inProcessStatus.running) {
    try {
      const { startConnector, stopConnector, repairConnector } = await import('@/lib/server/connectors/connector-lifecycle')
      if (action === 'start') {
        await startConnector(connectorId)
      } else if (action === 'stop') {
        await stopConnector(connectorId)
      } else if (action === 'repair') {
        await repairConnector(connectorId)
      }
    } catch (err: unknown) {
      // NOTE(review): failures here are logged and reported as `null`, whereas the
      // admin-API path below throws - confirm callers expect this difference.
      log.error(TAG, `In-process connector action "${action}" failed for ${connectorId}:`, errorMessage(err))
    }
    return null
  }

  const metadata = readDaemonAdminMetadata()
  if (!metadata || !isProcessRunning(metadata.pid)) return null
  const result = await requestDaemon<{ connector: DaemonConnectorRuntimeState | null }>(
    metadata,
    `/connectors/${encodeURIComponent(connectorId)}/actions`,
    {
      method: 'POST',
      body: JSON.stringify({ action, source }),
    },
  )
  return result.connector || null
}

/**
 * Lists connectors the subprocess daemon reports as running.
 *
 * NOTE(review): unlike the other connector helpers in this module, this has no
 * in-process path, so it presumably returns `[]` when the daemon runs in-process
 * without admin metadata - confirm whether that is intended.
 *
 * @param platform When provided, sent as the `platform` query parameter.
 * @returns The reported connectors, or `[]` when no subprocess daemon is reachable,
 *   the request fails, or the response is not an array.
 */
export async function listDaemonRunningConnectors(platform?: string): Promise<DaemonRunningConnectorInfo[]> {
  const metadata = readDaemonAdminMetadata()
  if (!metadata || !isProcessRunning(metadata.pid)) return []
  const query = platform ? `?platform=${encodeURIComponent(platform)}` : ''
  try {
    const result = await requestDaemon<{ connectors: DaemonRunningConnectorInfo[] }>(
      metadata,
      `/connectors/running${query}`,
    )
    return Array.isArray(result.connectors) ? result.connectors : []
  } catch {
    return []
  }
}