daemon.go
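// daemon.go wires the "shan daemon" subcommands (start, stop, status) into the
// root command. "start" runs the channel-messaging daemon in the foreground, or
// re-launches it under launchd when --detach is set.
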
package cmd

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
	"os/signal"
	"path/filepath"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/Kocoro-lab/ShanClaw/internal/agent"
	"github.com/Kocoro-lab/ShanClaw/internal/agents"
	"github.com/Kocoro-lab/ShanClaw/internal/audit"
	"github.com/Kocoro-lab/ShanClaw/internal/client"
	"github.com/Kocoro-lab/ShanClaw/internal/config"
	"github.com/Kocoro-lab/ShanClaw/internal/daemon"
	"github.com/Kocoro-lab/ShanClaw/internal/heartbeat"
	"github.com/Kocoro-lab/ShanClaw/internal/hooks"
	"github.com/Kocoro-lab/ShanClaw/internal/mcp"
	"github.com/Kocoro-lab/ShanClaw/internal/permissions"
	"github.com/Kocoro-lab/ShanClaw/internal/schedule"
	"github.com/Kocoro-lab/ShanClaw/internal/skills"
	"github.com/Kocoro-lab/ShanClaw/internal/tools"
	"github.com/Kocoro-lab/ShanClaw/internal/watcher"
	"github.com/spf13/cobra"
)

var daemonCmd = &cobra.Command{
	Use:   "daemon",
	Short: "Background daemon for channel messaging",
}

var daemonStartCmd = &cobra.Command{
	Use:   "start",
	Short: "Start the daemon (connects to Shannon Cloud for channel messages)",
	RunE: func(cmd *cobra.Command, args []string) error {
		detach, _ := cmd.Flags().GetBool("detach")
		if detach {
			return daemonStartDetached()
		}

		cfg, err := config.Load()
		if err != nil {
			return fmt.Errorf("config: %w", err)
		}

		shanDir := config.ShannonDir()
		agentsDir := filepath.Join(shanDir, "agents")
		pidPath := filepath.Join(shanDir, "daemon.pid")

		if err := agents.EnsureBuiltins(agentsDir, Version); err != nil {
			log.Printf("WARNING: failed to sync builtin agents: %v", err)
		}
		if err := skills.EnsureBuiltinSkills(shanDir); err != nil {
			log.Printf("WARNING: failed to sync builtin skills: %v", err)
		}

		force, _ := cmd.Flags().GetBool("force")
		if force {
			stopExistingDaemon(pidPath)
		}

		if daemon.IsDaemonServiceLoaded() {
			log.Println("Warning: daemon is managed by launchd. Use 'shan daemon stop' to remove launchd management.")
		}

		pidFile, err := daemon.AcquirePIDFile(pidPath)
		if err != nil {
			return err
		}
		defer pidFile.Close()

		// Clean up orphaned Chrome CDP from a previous hard kill. Must run AFTER
		// AcquirePIDFile — holding the lock guarantees no other daemon is alive,
		// so any Chrome CDP we find is truly orphaned (not owned by a peer).
		mcp.CleanupOrphanedCDPChrome()

		// Apply configured Chrome profile override before any CDP launch.
		mcp.SetCDPChromeProfile(cfg.Daemon.ChromeProfile)

		gw := client.NewGatewayClient(cfg.Endpoint, cfg.APIKey)
		baselineReg, reg, skillsPtr, mcpMgr, cleanup, serverErr := tools.RegisterAllWithBaseline(gw, cfg)
		if serverErr != nil {
			log.Printf("Warning: %v", serverErr)
		}
		_ = skillsPtr // skills are set per-request in RunAgent

		tools.RegisterCloudDelegate(reg, gw, cfg, nil, "", "") // daemon: agent forwarding per-message not yet supported

		gatewayOverlay := tools.ExtractGatewayTools(reg)
		postOverlays := tools.ExtractPostOverlays(reg, baselineReg)

		var auditor *audit.AuditLogger
		if shanDir != "" {
			auditor, _ = audit.NewAuditLogger(filepath.Join(shanDir, "logs"))
		}
		if auditor != nil {
			defer auditor.Close()
		}
		hookRunner := hooks.NewHookRunner(cfg.Hooks)

		sessionCache := daemon.NewSessionCache(shanDir)

		wsEndpoint := strings.Replace(cfg.Endpoint, "https://", "wss://", 1)
		wsEndpoint = strings.Replace(wsEndpoint, "http://", "ws://", 1)
		wsEndpoint += "/v1/ws/messages"
		scheduleManager := schedule.NewManager(filepath.Join(shanDir, "schedules.json"))

		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		sigCh := make(chan os.Signal, 1)
		signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
		go func() {
			<-sigCh
			log.Println("daemon: shutting down...")
			mcp.StopCDPChrome()
			cancel()
		}()

		deps := &daemon.ServerDeps{
			Config:          cfg,
			GW:              gw,
			Registry:        reg,
			MCPManager:      mcpMgr,
			Cleanup:         cleanup,
			ShannonDir:      shanDir,
			AgentsDir:       agentsDir,
			Auditor:         auditor,
			HookRunner:      hookRunner,
			SessionCache:    sessionCache,
			ScheduleManager: scheduleManager,
			BaselineReg:     baselineReg,
			GatewayOverlay:  gatewayOverlay,
			PostOverlays:    postOverlays,
		}
		defer func() {
			if deps.Supervisor != nil {
				deps.Supervisor.Stop()
			}
			deps.ShutdownCleanup()
		}()

		supervisor := mcp.NewSupervisor(mcpMgr)
		supervisor.RegisterCapabilityProbe("playwright", &mcp.PlaywrightProbe{})
		supervisor.SetOnReconnect(func(ctx context.Context, serverName string) {
			if serverName == "playwright" {
				tools.CleanupPlaywrightReconnect(ctx, mcpMgr)
			}
		})
		supervisor.SetOnChange(func(server string, oldState, newState mcp.HealthState) {
			_, _, depsSup := deps.Snapshot()
			if depsSup != supervisor {
				return
			}
			// Read cached layers from deps (refreshed on any config reload)
			bl, gwOv, po, mgr := deps.RebuildLayers()
			newReg := tools.RebuildRegistryForHealth(bl, gwOv, po, supervisor.HealthStates(), mgr, supervisor)
			deps.WriteLock()
			deps.Registry = newReg
			deps.WriteUnlock()
			log.Printf("MCP registry rebuilt: %d tools", len(newReg.All()))
		})

		deps.WriteLock()
		deps.Supervisor = supervisor
		deps.WriteUnlock()

		supervisor.Start(ctx)

		// Force initial registry rebuild to attach the supervisor to MCPTools.
		// CompleteRegistration creates tools before the supervisor exists, so
		// they lack on-demand reconnect. This rebuild replaces them with
		// supervisor-aware instances from the cached tool list.
		{
			bl, gwOv, po, mgr := deps.RebuildLayers()
			initReg := tools.RebuildRegistryForHealth(bl, gwOv, po, supervisor.HealthStates(), mgr, supervisor)
			deps.WriteLock()
			deps.Registry = initReg
			deps.WriteUnlock()
			log.Printf("MCP registry initialized with supervisor: %d tools", len(initReg.All()))
		}

		if !cfg.Daemon.AutoApprove {
			log.Println("daemon: interactive approval mode — tools requiring approval will be sent to the client for user confirmation. Set daemon.auto_approve: true in config to auto-approve all tools.")
		}

		// Create WS client first, then broker (broker needs client's send method).
		var wsClient *daemon.Client
		var broker *daemon.ApprovalBroker

		wsClient = daemon.NewClient(wsEndpoint, cfg.APIKey, func(msg daemon.MessagePayload) string {
			msgCtx := ctx

			// Wire per-message workflow_id callback via context for streaming card replies.
			// Uses context (not mutable tool field) for concurrency safety.
			if msg.MessageID != "" {
				msgID := msg.MessageID
				msgCtx = tools.WithOnWorkflowStarted(msgCtx, func(workflowID string) {
					_ = wsClient.SendProgressWithWorkflow(msgID, workflowID)
				})
			}

			// Use msg.Source if Cloud populates it; fall back to msg.Channel during rolling deploy
			source := msg.Source
			if source == "" {
				source = msg.Channel
			}
			req := daemon.RunAgentRequest{
				Text:     msg.Text,
				Content:  msg.Content,
				Agent:    msg.AgentName,
				Source:   source,
				Channel:  msg.Channel,
				ThreadID: msg.ThreadID,
				Sender:   msg.Sender,
				CWD:      msg.CWD,
				Files:    msg.Files,
			}
			// Fall back to @mention parsing if cloud didn't set agent name.
			if req.Agent == "" {
				agentName, prompt := agents.ParseAgentMention(msg.Text)
				req.Agent = agentName
				req.Text = prompt
			}
			if req.Text == "" {
				req.Text = msg.Text
			}
			// Allow file-only messages (no text) from messaging platforms.
			if req.Text == "" && len(req.Files) > 0 {
				req.Text = "[Attached files]"
			}
			if err := req.Validate(); err != nil {
				return daemon.FriendlyAgentError(err)
			}
			req.EnsureRouteKey()

			// Try injecting into an active run on the same route.
			if req.RouteKey != "" {
				switch deps.SessionCache.InjectMessage(req.RouteKey, agent.InjectedMessage{Text: req.Text, CWD: req.CWD}) {
				case daemon.InjectOK:
					// Message injected — running loop will incorporate it.
					return "[message received, processing...]"
				case daemon.InjectQueueFull:
					// Active run exists but queue saturated — don't start a new run.
					log.Printf("daemon: inject queue full for route %q, message dropped", req.RouteKey)
					return ""
				case daemon.InjectBusy:
					return "[message rejected: the active run is still initializing; retry when it reaches the next turn]"
				case daemon.InjectCWDConflict:
					return "[message rejected: the active run is using a different project; wait for it to finish or cancel it before switching cwd]"
				case daemon.InjectNoActiveRun:
					// Fall through to start a new RunAgent
				}
			}

			// Resolve auto_approve: per-agent overrides global
			autoApprove := cfg.Daemon.AutoApprove
			if req.Agent != "" {
				if a, err := agents.LoadAgent(agentsDir, req.Agent); err == nil && a.Config != nil && a.Config.AutoApprove != nil {
					autoApprove = *a.Config.AutoApprove
				}
			}

			handler := &daemonEventHandler{
				broker:      broker,
				ctx:         msgCtx,
				channel:     msg.Channel,
				threadID:    msg.ThreadID,
				agent:       req.Agent,
				autoApprove: autoApprove,
				shannonDir:  shanDir,
				deps:        deps,
				wsClient:    wsClient,
				messageID:   msg.MessageID,
			}

			result, err := daemon.RunAgent(msgCtx, deps, req, handler)
			if err != nil {
				// Full error already logged inside RunAgent; return clean message.
				return daemon.FriendlyAgentError(err)
			}

			log.Printf("daemon: reply to %s (%d tokens, $%.4f)", result.Agent, result.Usage.TotalTokens, result.Usage.CostUSD)
			return result.Reply
		}, func(text string) {
			log.Printf("daemon: [system] %s", text)
		})

		broker = daemon.NewApprovalBroker(wsClient.SendApprovalRequest)
		wsClient.SetApprovalBroker(broker)

		localServer := daemon.NewServer(7533, wsClient, deps, Version)
		localServer.SetCancelFunc(cancel)
		localServer.SetApprovalResolvedNotifier(wsClient.SendApprovalResolved)
		wsClient.SetEventBus(localServer.EventBus())
		deps.EventBus = localServer.EventBus()
		deps.WSClient = wsClient

		// Start file watcher and heartbeat manager.
		var triggerMu sync.Mutex
		var fileWatcher *watcher.Watcher
		var hbManager *heartbeat.Manager

		watchRunFn := func(watchCtx context.Context, agentName, prompt string) {
			req := daemon.RunAgentRequest{
				Agent:  agentName,
				Source: "watcher",
				Text:   prompt,
			}
			handler := &autoApproveHandler{}
			result, err := daemon.RunAgent(watchCtx, deps, req, handler)
			if err != nil {
				log.Printf("daemon: watcher agent %q error: %v", agentName, err)
				return
			}
			log.Printf("daemon: watcher agent %q reply (%d tokens): %s", agentName, result.Usage.TotalTokens, truncateReply(result.Reply, 200))
		}
		agentWatches := collectAgentWatches(agentsDir)
		if len(agentWatches) > 0 {
			fw, err := watcher.New(agentWatches, watchRunFn)
			if err != nil {
				log.Printf("daemon: watcher init failed: %v", err)
			} else {
				fw.Start(ctx)
				fileWatcher = fw
				log.Printf("daemon: file watcher started (%d agents)", len(agentWatches))
			}
		}

		hbMgr, err := heartbeat.New(agentsDir, deps)
		if err != nil {
			log.Printf("daemon: heartbeat init failed: %v", err)
		} else {
			hbMgr.Start(ctx)
			hbManager = hbMgr
			log.Printf("daemon: heartbeat manager started")
		}

		// Start internal cron scheduler (evaluates schedules each minute).
		cronScheduler := daemon.NewScheduler(scheduleManager, deps)
		go cronScheduler.Start(ctx)
		log.Println("daemon: cron scheduler started")

		localServer.SetOnReload(func() {
			triggerMu.Lock()
			defer triggerMu.Unlock()

			// Close old watcher/heartbeat.
			if fileWatcher != nil {
				fileWatcher.Close()
				fileWatcher = nil
			}
			if hbManager != nil {
				hbManager.Close()
				hbManager = nil
			}

			// Rebuild from fresh agent configs.
			newWatches := collectAgentWatches(agentsDir)
			if len(newWatches) > 0 {
				fw, err := watcher.New(newWatches, watchRunFn)
				if err != nil {
					log.Printf("daemon: reload watcher init failed: %v", err)
				} else {
					fw.Start(ctx)
					fileWatcher = fw
					log.Printf("daemon: file watcher restarted (%d agents)", len(newWatches))
				}
			}

			newHb, err := heartbeat.New(agentsDir, deps)
			if err != nil {
				log.Printf("daemon: reload heartbeat init failed: %v", err)
			} else {
				newHb.Start(ctx)
				hbManager = newHb
				log.Printf("daemon: heartbeat manager restarted")
			}
		})

		broker.SetOnRequest(func(requestID, tool, args string) {
			if localServer.EventBus() != nil {
				payload, _ := json.Marshal(map[string]string{
					"request_id": requestID,
					"tool":       tool,
					"args":       args,
				})
				localServer.EventBus().Emit(daemon.Event{Type: daemon.EventApprovalRequest, Payload: payload})
			}
		})
		serverErrCh := make(chan error, 1)
		go func() {
			serverErrCh <- localServer.Start(ctx)
		}()
		// Give the listener a moment to bind, then check for immediate failure.
		time.Sleep(50 * time.Millisecond)
		select {
		case err := <-serverErrCh:
			return fmt.Errorf("daemon: local server failed to start: %w", err)
		default:
			log.Printf("daemon: local server listening on http://127.0.0.1:7533")
		}

		log.Printf("daemon: connecting to %s", wsEndpoint)
		wsClient.RunWithReconnect(ctx)

		triggerMu.Lock()
		if fileWatcher != nil {
			fileWatcher.Close()
			fileWatcher = nil
		}
		if hbManager != nil {
			hbManager.Close()
			hbManager = nil
		}
		triggerMu.Unlock()

		sessionCache.CloseAll()
		return nil
	},
}

var daemonStopCmd = &cobra.Command{
	Use:   "stop",
	Short: "Stop the background daemon",
	RunE: func(cmd *cobra.Command, args []string) error {
		launchdManaged := daemon.IsDaemonServiceLoaded()
		if launchdManaged {
			if err := daemon.LaunchctlBootout(); err != nil {
				log.Printf("Warning: launchctl bootout failed: %v", err)
			}
			daemon.RemoveDaemonPlist()
		}

		pidPath := filepath.Join(config.ShannonDir(), "daemon.pid")

		// If launchd bootout already killed the process, we're done.
		if launchdManaged {
			// Brief wait for process to exit after bootout.
			time.Sleep(500 * time.Millisecond)
			if _, locked := daemon.IsLocked(pidPath); !locked {
				fmt.Println("Daemon stopped (launchd service removed).")
				return nil
			}
			// Process still alive — fall through to HTTP/SIGTERM.
		}

		// Try graceful HTTP shutdown first.
		resp, err := http.Post("http://127.0.0.1:7533/shutdown", "application/json", nil)
		if err == nil {
			resp.Body.Close()
			if resp.StatusCode != http.StatusOK {
				return fmt.Errorf("unexpected response: %s", resp.Status)
			}
			// Wait for process to fully exit (PID file lock released).
			deadline := time.After(5 * time.Second)
			ticker := time.NewTicker(200 * time.Millisecond)
			defer ticker.Stop()
			for {
				select {
				case <-deadline:
					fmt.Println("Daemon shutdown requested (still exiting).")
					return nil
				case <-ticker.C:
					if _, locked := daemon.IsLocked(pidPath); !locked {
						fmt.Println("Daemon stopped.")
						return nil
					}
				}
			}
		}

		// HTTP failed — fall back to SIGTERM via PID file.
		pid, locked := daemon.IsLocked(pidPath)
		if !locked {
			return fmt.Errorf("daemon not running")
		}
		if pid <= 0 {
			return fmt.Errorf("daemon PID file is locked but contains invalid PID")
		}

		proc, err := os.FindProcess(pid)
		if err != nil {
			return fmt.Errorf("cannot find daemon process %d: %w", pid, err)
		}
		if err := proc.Signal(syscall.SIGTERM); err != nil {
			return fmt.Errorf("failed to send SIGTERM to PID %d: %w", pid, err)
		}
		fmt.Printf("Sent SIGTERM to daemon (PID %d).\n", pid)

		// Wait for process to exit (up to 5s).
		deadline := time.After(5 * time.Second)
		ticker := time.NewTicker(200 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-deadline:
				fmt.Printf("Warning: daemon (PID %d) did not exit within 5s.\n", pid)
				return nil
			case <-ticker.C:
				if _, locked := daemon.IsLocked(pidPath); !locked {
					fmt.Println("Daemon stopped.")
					return nil
				}
			}
		}
	},
}

var daemonStatusCmd = &cobra.Command{
	Use:   "status",
	Short: "Show daemon status",
	RunE: func(cmd *cobra.Command, args []string) error {
		pidPath := filepath.Join(config.ShannonDir(), "daemon.pid")

		resp, err := http.Get("http://127.0.0.1:7533/status")
		if err != nil {
			// HTTP failed — check PID file to distinguish "not running" from "running but no HTTP server".
			if pid, locked := daemon.IsLocked(pidPath); locked {
				fmt.Printf("Status: running (HTTP server unavailable)\n")
				fmt.Printf("PID: %d\n", pid)
				return nil
			}
			fmt.Println("Daemon is not running.")
			return nil
		}
		defer resp.Body.Close()

		var status struct {
			IsConnected bool   `json:"is_connected"`
			ActiveAgent string `json:"active_agent"`
			Uptime      int    `json:"uptime"`
			Version     string `json:"version"`
		}
		if err := json.NewDecoder(resp.Body).Decode(&status); err != nil {
			return fmt.Errorf("failed to parse status: %w", err)
		}

		pid, _ := daemon.ReadPID(pidPath)
		fmt.Printf("Status: running\n")
		if pid > 0 {
			fmt.Printf("PID: %d\n", pid)
		}
		if status.Version != "" {
			fmt.Printf("Version: %s\n", status.Version)
		}
		fmt.Printf("Connected: %v\n", status.IsConnected)
		if status.ActiveAgent != "" {
			fmt.Printf("Agent: %s\n", status.ActiveAgent)
		}
		uptime := time.Duration(status.Uptime) * time.Second
		fmt.Printf("Uptime: %s\n", uptime)
		if daemon.IsDaemonServiceLoaded() {
			fmt.Printf("Launchd: managed\n")
		} else {
			fmt.Printf("Launchd: not installed\n")
		}
		return nil
	},
}

type daemonEventHandler struct {
	broker      *daemon.ApprovalBroker
	ctx         context.Context
	channel     string
	threadID    string
	agent       string
	autoApprove bool
	shannonDir  string
	deps        *daemon.ServerDeps
	sessionID   string         // set by RunAgent after session resolution (EventBus spans sessions)
	wsClient    *daemon.Client // for event forwarding to Cloud
	messageID   string         // scoped to current message
	usage       agent.UsageAccumulator
}

func (h *daemonEventHandler) SetSessionID(id string) { h.sessionID = id }

// Usage returns the cumulative usage collected during this handler's lifetime,
// split into LLM and gateway-tool billing so tool synthetic tokens don't
// corrupt the LLM token accounting.
func (h *daemonEventHandler) Usage() agent.AccumulatedUsage { return h.usage.Snapshot() }

// ResetUsage clears accumulated totals. Use between independent messages on
// the same long-lived handler so per-message cost reporting is accurate.
func (h *daemonEventHandler) ResetUsage() { h.usage.Reset() }

func (h *daemonEventHandler) OnToolCall(name string, args string) {
	// Skip cloud_delegate — it has its own streaming path via SendProgressWithWorkflow.
	// Forwarding it as a daemon event would conflict (creates a daemon: stream that
	// never receives WORKFLOW_COMPLETED from the Temporal workflow).
	if h.wsClient != nil && h.messageID != "" && name != "cloud_delegate" {
		// Send empty message so StreamConsumer uses toolDisplayName mapping
		// (e.g., "web_search" → "Searching the web")
		if err := h.wsClient.SendEvent(h.messageID, "TOOL_INVOKED", "", map[string]interface{}{"tool": name}); err != nil {
			log.Printf("daemon: event forward failed: %v", err)
		}
	}
}
func (h *daemonEventHandler) OnToolResult(name string, args string, result agent.ToolResult, elapsed time.Duration) {
	log.Printf("daemon: tool %s completed (%.1fs)", name, elapsed.Seconds())
	if h.wsClient != nil && h.messageID != "" && name != "cloud_delegate" {
		if err := h.wsClient.SendEvent(h.messageID, "TOOL_COMPLETED", "", map[string]interface{}{"tool": name, "elapsed": elapsed.Seconds()}); err != nil {
			log.Printf("daemon: event forward failed: %v", err)
		}
	}
}
func (h *daemonEventHandler) OnText(text string) {
	if h.wsClient != nil && h.messageID != "" {
		if err := h.wsClient.SendEvent(h.messageID, "LLM_OUTPUT", text, nil); err != nil {
			log.Printf("daemon: event forward failed: %v", err)
		}
	}
}
func (h *daemonEventHandler) OnStreamDelta(delta string) {
	if h.wsClient != nil && h.messageID != "" {
		if err := h.wsClient.SendEvent(h.messageID, "LLM_PARTIAL", delta, nil); err != nil {
			log.Printf("daemon: event forward failed: %v", err)
		}
	}
}
func (h *daemonEventHandler) OnUsage(usage agent.TurnUsage) {
	h.usage.Add(usage)
}
func (h *daemonEventHandler) OnCloudAgent(agentID, status, message string)            {}
func (h *daemonEventHandler) OnCloudProgress(completed, total int)                    {}
func (h *daemonEventHandler) OnCloudPlan(planType, content string, needsReview bool)  {}

// OnRunStatus satisfies agent.RunStatusHandler on daemonEventHandler. Actual
// bus emission now happens in busEventHandler (see multiHandler wiring in
// runner.go). This implementation is a no-op but must remain so the type
// assertion for RunStatusHandler continues to match the daemonEventHandler
// alongside any future WS-specific forwarding we add.
func (h *daemonEventHandler) OnRunStatus(code, detail string) {}

func (h *daemonEventHandler) OnApprovalNeeded(tool string, args string) bool {
	if h.autoApprove {
		log.Printf("daemon: auto-approving %s (auto_approve=true)", tool)
		return true
	}
	decision := h.broker.Request(h.ctx, h.channel, h.threadID, h.agent, tool, args)
	if decision == daemon.DecisionAlwaysAllow {
		if tool == "bash" {
			cmd := permissions.ExtractField(args, "command")
			if cmd != "" {
				if err := config.AppendAllowedCommand(h.shannonDir, cmd); err != nil {
					log.Printf("daemon: failed to persist always-allow: %v", err)
				} else {
					// Update in-memory config under write lock.
					// Access deps.Config directly (not a captured pointer) so
					// we always mutate the current config, even after reloads.
					h.deps.WriteLock()
					perms := &h.deps.Config.Permissions
					if !containsString(perms.AllowedCommands, cmd) {
						perms.AllowedCommands = append(perms.AllowedCommands, cmd)
					}
					h.deps.WriteUnlock()
					log.Printf("daemon: always-allow persisted: %s", cmd)
				}
			}
		} else {
			h.broker.SetToolAutoApprove(tool)
			log.Printf("daemon: always-allow (session): %s", tool)
		}
	}
	return decision == daemon.DecisionAllow || decision == daemon.DecisionAlwaysAllow
}

// autoApproveHandler is a minimal EventHandler for internal triggers (watcher, heartbeat).
type autoApproveHandler struct {
	usage agent.UsageAccumulator
}

// Usage returns the cumulative usage collected during this handler's lifetime.
func (h *autoApproveHandler) Usage() agent.AccumulatedUsage { return h.usage.Snapshot() }

func (h *autoApproveHandler) OnToolCall(name string, args string) {}
func (h *autoApproveHandler) OnToolResult(name string, args string, result agent.ToolResult, elapsed time.Duration) {
	log.Printf("daemon: tool %s completed (%.1fs)", name, elapsed.Seconds())
}
func (h *autoApproveHandler) OnText(text string)                                     {}
func (h *autoApproveHandler) OnStreamDelta(delta string)                             {}
func (h *autoApproveHandler) OnUsage(usage agent.TurnUsage)                          { h.usage.Add(usage) }
func (h *autoApproveHandler) OnCloudAgent(agentID, status, message string)           {}
func (h *autoApproveHandler) OnCloudProgress(completed, total int)                   {}
func (h *autoApproveHandler) OnCloudPlan(planType, content string, needsReview bool) {}
func (h *autoApproveHandler) OnApprovalNeeded(tool string, args string) bool         { return true }

func containsString(slice []string, s string) bool {
	for _, v := range slice {
		if v == s {
			return true
		}
	}
	return false
}

func stopExistingDaemon(pidPath string) {
	// Try graceful HTTP shutdown.
	client := &http.Client{Timeout: 2 * time.Second}
	resp, err := client.Post("http://127.0.0.1:7533/shutdown", "application/json", nil)
	if err == nil {
		resp.Body.Close()
	}

	// If HTTP failed, try SIGTERM via PID file.
	if err != nil {
		if pid, locked := daemon.IsLocked(pidPath); locked && pid > 0 {
			if proc, err := os.FindProcess(pid); err == nil {
				proc.Signal(syscall.SIGTERM)
			}
		}
	}

	// Wait for lock to be released (up to 3s).
	deadline := time.After(3 * time.Second)
	ticker := time.NewTicker(200 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-deadline:
			log.Printf("daemon: existing daemon did not stop within 3s, proceeding anyway")
			return
		case <-ticker.C:
			if _, locked := daemon.IsLocked(pidPath); !locked {
				return
			}
		}
	}
}

func daemonStartDetached() error {
	shanDir := config.ShannonDir()
	pidPath := filepath.Join(shanDir, "daemon.pid")

	if _, locked := daemon.IsLocked(pidPath); locked {
		return fmt.Errorf("daemon is already running (PID file locked)")
	}

	logDir := filepath.Join(shanDir, "logs")
	if err := os.MkdirAll(logDir, 0755); err != nil {
		return fmt.Errorf("create log dir: %w", err)
	}
	logPath := filepath.Join(logDir, "daemon.log")

	plistContent := daemon.GenerateDaemonPlist(daemon.ShanBinary(), logPath)
	plistPath := daemon.DaemonPlistPath()
	if err := daemon.WriteDaemonPlist(plistPath, plistContent); err != nil {
		return fmt.Errorf("write plist: %w", err)
	}

	if err := daemon.LaunchctlBootstrap(plistPath); err != nil {
		return fmt.Errorf("launchctl bootstrap: %w", err)
	}

	fmt.Printf("Daemon started via launchd.\n")
	fmt.Printf(" Plist: %s\n", plistPath)
	fmt.Printf(" Logs: %s\n", logPath)
	fmt.Printf("Use 'shan daemon stop' to stop.\n")
	return nil
}

func truncateReply(s string, n int) string {
	if len(s) <= n {
		return s
	}
	return s[:n] + "..."
}

func collectAgentWatches(agentsDir string) map[string][]watcher.WatchEntry {
	result := make(map[string][]watcher.WatchEntry)
	entries, err := agents.ListAgents(agentsDir)
	if err != nil {
		return result
	}
	for _, entry := range entries {
		a, err := agents.LoadAgent(agentsDir, entry.Name)
		if err != nil || a.Config == nil || len(a.Config.Watch) == 0 {
			continue
		}
		for _, w := range a.Config.Watch {
			result[entry.Name] = append(result[entry.Name], watcher.WatchEntry{
				Path: w.Path,
				Glob: w.Glob,
			})
		}
	}
	return result
}

func init() {
	daemonStartCmd.Flags().Bool("force", false, "Stop any existing daemon before starting")
	daemonStartCmd.Flags().BoolP("detach", "d", false, "Run as background service via launchd (macOS only)")
	daemonCmd.AddCommand(daemonStartCmd)
	daemonCmd.AddCommand(daemonStopCmd)
	daemonCmd.AddCommand(daemonStatusCmd)
	rootCmd.AddCommand(daemonCmd)
}
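
// Illustrative CLI usage of the commands wired above (for reference only;
// flags and subcommands are registered in init):
//
//	shan daemon start            # run in the foreground
//	shan daemon start --detach   # run as a launchd background service (macOS only)
//	shan daemon start --force    # stop any existing daemon before starting
//	shan daemon status           # query http://127.0.0.1:7533/status, fall back to the PID file
//	shan daemon stop             # graceful HTTP shutdown, then SIGTERM via the PID file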