runner.go
package daemon

import (
	"context"
	"crypto/rand"
	"encoding/base64"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"net/url"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/Kocoro-lab/ShanClaw/internal/agent"
	"github.com/Kocoro-lab/ShanClaw/internal/agents"
	"github.com/Kocoro-lab/ShanClaw/internal/audit"
	"github.com/Kocoro-lab/ShanClaw/internal/client"
	"github.com/Kocoro-lab/ShanClaw/internal/config"
	"github.com/Kocoro-lab/ShanClaw/internal/cwdctx"
	"github.com/Kocoro-lab/ShanClaw/internal/hooks"
	"github.com/Kocoro-lab/ShanClaw/internal/mcp"
	"github.com/Kocoro-lab/ShanClaw/internal/memory"
	"github.com/Kocoro-lab/ShanClaw/internal/runstatus"
	"github.com/Kocoro-lab/ShanClaw/internal/schedule"
	"github.com/Kocoro-lab/ShanClaw/internal/session"
	"github.com/Kocoro-lab/ShanClaw/internal/skills"
	"github.com/Kocoro-lab/ShanClaw/internal/tools"
)

// Function-valued indirections for the Playwright teardown calls made by
// cleanupPlaywrightAfterTurn. Each one forwards to the MCP client manager (or
// to mcp.StopCDPChrome) for the "playwright" server.
// NOTE(review): presumably package-level variables so tests can stub them —
// confirm against the test files.
var (
	disconnectPlaywrightAfterIdleFn = func(mgr *mcp.ClientManager, d time.Duration) {
		mgr.DisconnectAfterIdle("playwright", d)
	}
	disconnectPlaywrightNowFn = func(mgr *mcp.ClientManager) {
		mgr.Disconnect("playwright")
	}
	stopPlaywrightChromeFn = mcp.StopCDPChrome
)

// RequestContentBlock represents a content block in the POST /message request.
// Supported types: "text", "image" and "document" (passed through to the LLM),
// and "file_ref" (resolved by the daemon). Blocks of any other type are
// dropped by resolveContentBlocks.
//
// Text is used by "text" blocks, Source by "image" and "document" blocks, and
// FilePath/Filename/ByteSize by "file_ref" blocks. For a file_ref, Filename
// supplies the extension that selects image/PDF/other handling and FilePath is
// the on-disk location that is read (images) or handed to the agent as a hint.
type RequestContentBlock struct {
	Type     string              `json:"type"`
	Text     string              `json:"text,omitempty"`
	Source   *client.ImageSource `json:"source,omitempty"`
	FilePath string              `json:"file_path,omitempty"`
	Filename string              `json:"filename,omitempty"`
	ByteSize int64               `json:"byte_size,omitempty"`
}

// RunAgentRequest is the input for RunAgent.
57 type RunAgentRequest struct { 58 Text string `json:"text"` 59 Content []RequestContentBlock `json:"content,omitempty"` // multimodal content blocks (optional) 60 Agent string `json:"agent,omitempty"` 61 SessionID string `json:"session_id,omitempty"` 62 NewSession bool `json:"new_session,omitempty"` 63 Source string `json:"source,omitempty"` // "slack", "line", "shanclaw", "webhook" 64 Sender string `json:"sender,omitempty"` // user identifier from channel 65 Channel string `json:"channel,omitempty"` // channel/thread source context 66 ThreadID string `json:"thread_id,omitempty"` // thread context for messaging platforms 67 CWD string `json:"cwd,omitempty"` // absolute project path override 68 RouteKey string `json:"-"` // internal routing key 69 Ephemeral bool `json:"-"` // caller owns persistence + events 70 ModelOverride string `json:"-"` // overrides agent model tier 71 BypassRouting bool `json:"-"` // skip route lock (heartbeat runs) 72 SessionHistory []client.Message `json:"-"` // pre-loaded history for LLM context (BypassRouting runs) 73 StickyContext string `json:"-"` // 额外的 sticky context,注入系统提示(对用户不可见) 74 Files []RemoteFile `json:"-"` // remote file attachments from Cloud (WS only) 75 } 76 77 // Validate checks that the request has the minimum required fields. 78 func (r *RunAgentRequest) Validate() error { 79 if strings.TrimSpace(r.Text) == "" && len(r.Content) == 0 { 80 return fmt.Errorf("text or content is required") 81 } 82 if r.Agent != "" { 83 if err := agents.ValidateAgentName(r.Agent); err != nil { 84 return err 85 } 86 } 87 if r.CWD != "" { 88 if err := cwdctx.ValidateCWD(r.CWD); err != nil { 89 return fmt.Errorf("invalid cwd: %w", err) 90 } 91 } 92 return nil 93 } 94 95 // ComputeRouteKey builds the route key for session cache/locking decisions. 
96 func ComputeRouteKey(req RunAgentRequest) string { 97 if req.BypassRouting { 98 return "" 99 } 100 if req.Agent != "" { 101 return "agent:" + req.Agent 102 } 103 if req.SessionID != "" { 104 return "session:" + sanitizeRouteValue(req.SessionID) 105 } 106 if req.NewSession || shouldBypassRouteCache(req.Source) { 107 return "" 108 } 109 if req.Source != "" && req.Channel != "" { 110 return "default:" + sanitizeRouteValue(req.Source) + ":" + sanitizeRouteValue(req.Channel) 111 } 112 return "" 113 } 114 115 func shouldBypassRouteCache(source string) bool { 116 switch strings.ToLower(strings.TrimSpace(source)) { 117 case "", ChannelWeb, "webhook", "cron", ChannelSchedule, ChannelSystem: 118 return true 119 default: 120 return false 121 } 122 } 123 124 func sanitizeRouteValue(value string) string { 125 trimmed := strings.TrimSpace(value) 126 if trimmed == "" { 127 return "" 128 } 129 return url.PathEscape(trimmed) 130 } 131 132 // resolveContentBlocks converts request content blocks into client.ContentBlock 133 // values suitable for the LLM. "text" and "image" blocks are passed through; 134 // "file_ref" blocks are resolved by reading the referenced file from disk. 135 func resolveContentBlocks(blocks []RequestContentBlock) []client.ContentBlock { 136 out := make([]client.ContentBlock, 0, len(blocks)) 137 for _, b := range blocks { 138 switch b.Type { 139 case "text": 140 out = append(out, client.ContentBlock{Type: "text", Text: b.Text}) 141 case "image": 142 out = append(out, client.ContentBlock{Type: "image", Source: b.Source}) 143 case "document": 144 out = append(out, client.ContentBlock{Type: "document", Source: b.Source}) 145 case "file_ref": 146 out = append(out, resolveFileRef(b)...) 147 } 148 } 149 return out 150 } 151 152 // imageExtensions are sent as base64 image content blocks to the LLM. 
153 var imageExtensions = map[string]string{ 154 ".jpg": "image/jpeg", ".jpeg": "image/jpeg", 155 ".png": "image/png", ".gif": "image/gif", ".webp": "image/webp", 156 } 157 158 // resolveFileRef returns the appropriate content blocks for a file_ref. 159 // Images → model-visible path hint plus base64 image block so the agent has 160 // both a reusable file handle and inline vision access. 161 // All other files → text hint with path so the agent reads via file_read tool. 162 func resolveFileRef(b RequestContentBlock) []client.ContentBlock { 163 ext := strings.ToLower(filepath.Ext(b.Filename)) 164 165 // Images must be inline base64 — Claude vision requires image data in the request body. 166 if mimeType, ok := imageExtensions[ext]; ok { 167 info, err := os.Stat(b.FilePath) 168 if err != nil { 169 log.Printf("WARNING: failed to read attached image %s: %v", b.FilePath, err) 170 return []client.ContentBlock{{ 171 Type: "text", 172 Text: fmt.Sprintf("[Error: unable to read image %s]", b.Filename), 173 }} 174 } 175 const maxInlineImage = 20 * 1024 * 1024 // 20 MB 176 if info.Size() > maxInlineImage { 177 return []client.ContentBlock{{ 178 Type: "text", 179 Text: fmt.Sprintf("[User attached image: %s (%d bytes) at path: %s — too large for inline vision (max %d bytes). Use file_read or another file-based tool with this path.]", 180 b.Filename, info.Size(), b.FilePath, maxInlineImage), 181 }} 182 } 183 data, err := os.ReadFile(b.FilePath) 184 if err != nil { 185 log.Printf("WARNING: failed to read attached image %s: %v", b.FilePath, err) 186 return []client.ContentBlock{{ 187 Type: "text", 188 Text: fmt.Sprintf("[Error: unable to read image %s]", b.Filename), 189 }} 190 } 191 encoded := base64.StdEncoding.EncodeToString(data) 192 return []client.ContentBlock{ 193 { 194 Type: "text", 195 Text: fmt.Sprintf("[User attached image: %s (%d bytes) at path: %s — the image is included inline below for vision. 
Use the path if a tool needs the original file.]", 196 b.Filename, info.Size(), b.FilePath), 197 }, 198 { 199 Type: "image", 200 Source: &client.ImageSource{Type: "base64", MediaType: mimeType, Data: encoded}, 201 }, 202 } 203 } 204 205 // PDF files: file_read natively renders PDF pages as images for vision. 206 if ext == ".pdf" { 207 return []client.ContentBlock{{ 208 Type: "text", 209 Text: fmt.Sprintf("[User attached PDF: %s (%d bytes) at path: %s — use file_read to analyze (it renders PDF pages as images for vision). Use offset for start page, limit for max pages.]", 210 b.Filename, b.ByteSize, b.FilePath), 211 }} 212 } 213 214 // All other files: let the agent use file_read to access content on demand. 215 return []client.ContentBlock{{ 216 Type: "text", 217 Text: fmt.Sprintf("[User attached file: %s (%d bytes) at path: %s — use the file_read tool to read its contents]", 218 b.Filename, b.ByteSize, b.FilePath), 219 }} 220 } 221 222 // extractUserFilePaths collects file paths from file_ref content blocks. 223 // These paths represent files the user explicitly attached, so tool access 224 // to them should be auto-approved without prompting. 225 func extractUserFilePaths(blocks []RequestContentBlock) []string { 226 var paths []string 227 for _, b := range blocks { 228 if b.Type == "file_ref" && b.FilePath != "" { 229 paths = append(paths, b.FilePath) 230 } 231 } 232 return paths 233 } 234 235 // buildUserMsgContent creates the MessageContent for the user message. 236 // If resolved content contains non-text blocks (images), uses block array format. 237 // Otherwise, merges all text into a single string for maximum gateway compatibility. 238 func buildUserMsgContent(prompt string, resolvedContent []client.ContentBlock) client.MessageContent { 239 if len(resolvedContent) == 0 { 240 return client.NewTextContent(prompt) 241 } 242 243 // Check if any block requires array format (images, documents). 
244 needsBlocks := false 245 for _, b := range resolvedContent { 246 if b.Type != "text" { 247 needsBlocks = true 248 break 249 } 250 } 251 252 if needsBlocks { 253 blocks := resolvedContent 254 if prompt != "" { 255 blocks = append([]client.ContentBlock{{Type: "text", Text: prompt}}, blocks...) 256 } 257 return client.NewBlockContent(blocks) 258 } 259 260 // Text-only: merge into single string. 261 merged := prompt 262 for _, b := range resolvedContent { 263 if b.Text != "" { 264 merged += "\n\n" + b.Text 265 } 266 } 267 return client.NewTextContent(merged) 268 } 269 270 // hasPDFAttachment returns true if any file_ref block has a .pdf extension. 271 func hasPDFAttachment(blocks []RequestContentBlock) bool { 272 for _, b := range blocks { 273 if b.Type == "file_ref" && strings.ToLower(filepath.Ext(b.Filename)) == ".pdf" { 274 return true 275 } 276 } 277 return false 278 } 279 280 // injectBundledSkill appends a bundled skill to the list if not already present. 281 func injectBundledSkill(existing []*skills.Skill, shannonDir, name string) []*skills.Skill { 282 for _, s := range existing { 283 if s.Name == name { 284 return existing // already loaded 285 } 286 } 287 src, err := skills.BundledSkillSource(shannonDir) 288 if err != nil { 289 log.Printf("daemon: failed to load bundled skill source for %q: %v", name, err) 290 return existing 291 } 292 loaded, err := skills.LoadSkills(src) 293 if err != nil { 294 log.Printf("daemon: failed to load bundled skill %q: %v", name, err) 295 return existing 296 } 297 for _, s := range loaded { 298 if s.Name == name { 299 return append(existing, s) 300 } 301 } 302 return existing 303 } 304 305 // EnsureRouteKey computes and sets the route key if not already set. 306 func (req *RunAgentRequest) EnsureRouteKey() { 307 if req == nil { 308 return 309 } 310 if req.RouteKey == "" { 311 req.RouteKey = ComputeRouteKey(*req) 312 } 313 } 314 315 // outputFormatForSource maps a request source to an output format profile. 
316 // Only explicit cloud-distributed channel sources use "plain" — Shannon Cloud 317 // handles final channel rendering for these (Slack mrkdwn, LINE Flex, etc.). 318 // Everything else (local, cron, schedule, web, unknown) defaults to "markdown". 319 // 320 // Shares its cloud-source definition with ensureCloudSessionTmpDir via 321 // isCloudSource; the two paths must agree on what "cloud-routed" means or the 322 // allocator and the formatter would drift apart silently. 323 func outputFormatForSource(source string) string { 324 if isCloudSource(source) { 325 return "plain" 326 } 327 return "markdown" 328 } 329 330 // cacheSourceFromDaemonSource maps the daemon-level source (slack/webhook/ 331 // cron/mcp/tui/...) to the cache_source string Shannon uses for prompt-cache 332 // TTL routing. Channel messages + interactive use → long bucket (1h). Fire-and- 333 // forget paths → short bucket (5m). See docs/cache-strategy.md. 334 // 335 // Unknown / unclassified sources deliberately fall through to "unknown" → 336 // Shannon routes unknown to 5m (fail cheap, not fail expensive). 337 func cacheSourceFromDaemonSource(source string) string { 338 s := strings.ToLower(strings.TrimSpace(source)) 339 switch s { 340 case "slack", "line", "feishu", "lark", "telegram": 341 // Human-conversation channels: idle gaps > 5m are common, 1h pays off. 342 return s 343 case "tui", "shanclaw": 344 // Interactive sessions: TUI and ShanClaw Desktop both have idle gaps >> 5m. 345 return s 346 case "cache_bench": 347 // Synthetic benchmark traffic — treat as long-bucket so bench measures 348 // reflect the production channel-message configuration. 349 return "cache_bench" 350 case "webhook", "cron", "schedule", "mcp": 351 // One-shot paths — each invocation starts fresh, no resume. 
352 return s 353 default: 354 return "unknown" 355 } 356 } 357 358 func routeTitle(source, channel, sender string) string { 359 if source == "" { 360 return "" 361 } 362 s := strings.ToLower(strings.TrimSpace(source)) 363 if s == "" { 364 return "" 365 } 366 label := strings.ToUpper(s[:1]) + s[1:] 367 368 // Use sender name when available (e.g. "Slack · Wayland") 369 if sender != "" { 370 return label + " · " + sender 371 } 372 // Fall back to channel if it differs from source (avoid "Slack slack") 373 if channel != "" && strings.ToLower(channel) != s { 374 return label + " · " + channel 375 } 376 return label 377 } 378 379 // RunAgentResult is the output from RunAgent. 380 type RunAgentResult struct { 381 Reply string `json:"reply"` 382 SessionID string `json:"session_id"` 383 Agent string `json:"agent"` 384 Usage RunAgentUsage `json:"usage"` 385 // Partial=true + FailureCode indicate the run completed "softly" — the 386 // reply is valid and should be shown, but the loop layer flagged it as 387 // abnormal (e.g. loop-detector force-stop). Treat as a soft warning, not 388 // an error. 389 Partial bool `json:"partial,omitempty"` 390 FailureCode runstatus.Code `json:"failure_code,omitempty"` 391 } 392 393 // RunAgentUsage tracks token and cost information for a single agent run. 394 type RunAgentUsage struct { 395 InputTokens int `json:"input_tokens"` 396 OutputTokens int `json:"output_tokens"` 397 TotalTokens int `json:"total_tokens"` 398 CostUSD float64 `json:"cost_usd"` 399 } 400 401 // ServerDeps holds shared dependencies required by both the WS callback 402 // and the HTTP server for running agent loops. 
403 type ServerDeps struct { 404 mu sync.RWMutex // guards Config, Registry, Cleanup during reload 405 Config *config.Config 406 GW *client.GatewayClient 407 Registry *agent.ToolRegistry 408 MCPManager *mcp.ClientManager // live MCP connections; swapped on reload 409 Supervisor *mcp.Supervisor // MCP health supervisor; swapped on reload 410 Cleanup func() // closes MCP connections; swapped on reload 411 BaselineReg *agent.ToolRegistry // local-only tools; refreshed on reload 412 GatewayOverlay []agent.Tool // cached gateway tools; refreshed on reload 413 PostOverlays []agent.Tool // cloud_delegate etc.; refreshed on reload 414 ShannonDir string 415 AgentsDir string 416 Auditor *audit.AuditLogger 417 HookRunner *hooks.HookRunner 418 SessionCache *SessionCache 419 EventBus *EventBus 420 ScheduleManager *schedule.Manager 421 WSClient *Client // WebSocket client for proactive messages 422 SecretsStore *skills.SecretsStore // skill secrets for env injection 423 MemSvc *memory.Service // structured memory orchestrator (Phase 2.3) 424 } 425 426 // Snapshot returns current Config, Registry, and Supervisor under read lock. 427 // Callers use the returned values without holding the lock. 428 func (d *ServerDeps) Snapshot() (*config.Config, *agent.ToolRegistry, *mcp.Supervisor) { 429 d.mu.RLock() 430 cfg, reg, sup := d.Config, d.Registry, d.Supervisor 431 d.mu.RUnlock() 432 return cfg, reg, sup 433 } 434 435 // ShutdownCleanup captures and calls the current Cleanup function under lock, 436 // preventing races with concurrent reload swaps. 437 func (d *ServerDeps) ShutdownCleanup() { 438 d.mu.Lock() 439 cleanup := d.Cleanup 440 d.Cleanup = nil 441 d.mu.Unlock() 442 if cleanup != nil { 443 cleanup() 444 } 445 } 446 447 // WriteLock acquires the write lock on ServerDeps. Used by daemon event 448 // handler to update in-memory config (e.g., always-allow persistence). 
449 func (d *ServerDeps) WriteLock() { d.mu.Lock() } 450 func (d *ServerDeps) WriteUnlock() { d.mu.Unlock() } 451 452 // RebuildLayers returns the cached rebuild layers under read lock. 453 func (d *ServerDeps) RebuildLayers() (*agent.ToolRegistry, []agent.Tool, []agent.Tool, *mcp.ClientManager) { 454 d.mu.RLock() 455 bl, gw, po, mgr := d.BaselineReg, d.GatewayOverlay, d.PostOverlays, d.MCPManager 456 d.mu.RUnlock() 457 return bl, gw, po, mgr 458 } 459 460 func cleanupPlaywrightAfterTurn(mgr *mcp.ClientManager) { 461 if mgr == nil { 462 return 463 } 464 cfg, ok := mgr.ConfigFor("playwright") 465 if !ok || cfg.KeepAlive { 466 return 467 } 468 if mcp.IsPlaywrightCDPMode(cfg) { 469 disconnectPlaywrightNowFn(mgr) 470 stopPlaywrightChromeFn() 471 log.Printf("daemon: Playwright on-demand teardown completed") 472 return 473 } 474 disconnectPlaywrightAfterIdleFn(mgr, 5*time.Minute) 475 log.Printf("daemon: Playwright idle disconnect scheduled (5m)") 476 } 477 478 // resumeNamedAgentColdStart resumes the latest persisted named-agent session. 479 // Returns true only when a session was actually loaded from disk; a fresh 480 // in-memory session pre-created by the route manager does not count as resumed. 481 func resumeNamedAgentColdStart(sessMgr *session.Manager) (bool, error) { 482 latest, err := sessMgr.ResumeLatest() 483 if err != nil { 484 return false, err 485 } 486 if latest != nil { 487 return true, nil 488 } 489 if sessMgr.Current() == nil { 490 sessMgr.NewSession() 491 } 492 return false, nil 493 } 494 495 // RunAgent executes a single agent turn using the shared dependencies. 496 // The caller provides an EventHandler to control streaming, approval, and 497 // event reporting (WS uses daemonEventHandler, HTTP uses httpEventHandler). 
498 func RunAgent(ctx context.Context, deps *ServerDeps, req RunAgentRequest, handler agent.EventHandler) (*RunAgentResult, error) { 499 // Phase 1: read supervisor atomically, probe if needed 500 cfg, _, sup := deps.Snapshot() 501 if cfg == nil || deps.GW == nil || deps.SessionCache == nil { 502 return nil, fmt.Errorf("daemon not fully configured") 503 } 504 if sup != nil { 505 // Cancel any pending idle disconnect — a new turn is starting. 506 if _, _, _, mgr := deps.RebuildLayers(); mgr != nil { 507 mgr.CancelIdleDisconnect("playwright") 508 } 509 // Only probe+reconnect Playwright when it's not already disconnected. 510 // When the user closes Chrome, the periodic probe marks it Disconnected. 511 // Calling ProbeNow on a Disconnected server triggers attemptReconnect, 512 // which relaunches Chrome — disruptive if the task doesn't need browser tools. 513 if h := sup.HealthFor("playwright"); h.State != mcp.StateDisconnected { 514 sup.ProbeNow("playwright") 515 } 516 } 517 // Phase 2: re-snapshot to get post-swap registry 518 cfg, baseReg, _ := deps.Snapshot() 519 if baseReg == nil { 520 return nil, fmt.Errorf("daemon not fully configured") 521 } 522 agentName := req.Agent 523 prompt := req.Text 524 525 // Download remote file attachments and convert to file_ref blocks. 526 // Attachment files must survive across turns (non-image files become 527 // file_read hints in session history). Cleanup uses sessMgr.OnClose 528 // (append-style, fires on manager close) — not OnSessionClose (which 529 // replaces per-session and would clobber previous turns' cleanup). 530 // The defer is a safety net for early-return errors before sessMgr 531 // is available; it's cancelled once OnClose takes ownership. 
532 var attachmentCleanup func() 533 var attachmentRegistered bool 534 defer func() { 535 if !attachmentRegistered && attachmentCleanup != nil { 536 attachmentCleanup() 537 } 538 }() 539 if len(req.Content) > 0 { 540 var inlineCleanup func() 541 req.Content, inlineCleanup = materializeInlineImageBlocks(deps.ShannonDir, req.Content) 542 attachmentCleanup = combineCleanup(attachmentCleanup, inlineCleanup) 543 } 544 if len(req.Files) > 0 { 545 var fileBlocks []RequestContentBlock 546 var remoteCleanup func() 547 fileBlocks, remoteCleanup = downloadRemoteFiles(deps.ShannonDir, req.Files) 548 attachmentCleanup = combineCleanup(attachmentCleanup, remoteCleanup) 549 req.Content = append(req.Content, fileBlocks...) 550 // Zero auth headers to prevent lingering tokens in memory. 551 for i := range req.Files { 552 req.Files[i].AuthHeader = "" 553 } 554 } 555 556 // Resolve multimodal content blocks (if present). 557 var resolvedContent []client.ContentBlock 558 if len(req.Content) > 0 { 559 resolvedContent = resolveContentBlocks(req.Content) 560 } 561 562 // "default" is not a real agent — it means "use base agent, no --agent flag". 563 if agentName == "default" { 564 agentName = "" 565 } 566 req.Agent = agentName 567 explicitAgent := agentName != "" // explicitly requested, not parsed from @mention 568 569 // Parse @mention if no explicit agent was provided. 
570 if agentName == "" { 571 agentName, prompt = agents.ParseAgentMention(req.Text) 572 } 573 if prompt == "" { 574 prompt = req.Text 575 } 576 577 var agentOverride *agents.Agent 578 if agentName != "" { 579 a, loadErr := agents.LoadAgent(deps.AgentsDir, agentName) 580 if loadErr != nil { 581 if explicitAgent { 582 return nil, fmt.Errorf("agent not found: %s", agentName) 583 } 584 // @mention fallback: use default agent 585 log.Printf("daemon: agent %q not found: %v, using default", agentName, loadErr) 586 agentName = "" 587 prompt = req.Text 588 } else { 589 agentOverride = a 590 } 591 } 592 // Resolve agent-scoped slash command: "/cmd-name args" → command content. 593 if agentOverride != nil && strings.HasPrefix(prompt, "/") { 594 parts := strings.Fields(prompt) 595 cmdName := strings.TrimPrefix(parts[0], "/") 596 if content, ok := agentOverride.Commands[cmdName]; ok { 597 args := "" 598 if len(parts) > 1 { 599 args = strings.Join(parts[1:], " ") 600 } 601 prompt = strings.ReplaceAll(content, "$ARGUMENTS", args) 602 } 603 } 604 req.Text = prompt 605 // Recompute route key after final agent resolution. 606 // Callers may precompute a default/source-channel key before @mention parsing. 607 // Recomputing here avoids cross-route contamination. 608 req.RouteKey = ComputeRouteKey(req) 609 610 sessionsDir := deps.SessionCache.SessionsDir(agentName) 611 var sessMgr *session.Manager 612 613 var route *routeEntry 614 var routeDone chan struct{} 615 var routeInjectCh chan agent.InjectedMessage 616 // Empty route key = no cache entry for routing, always start a fresh local session. 
617 if req.RouteKey != "" { 618 route = deps.SessionCache.LockRouteWithManager(req.RouteKey, sessionsDir) 619 sessMgr = route.manager 620 reqCtx, cancel := context.WithCancel(ctx) 621 routeDone = make(chan struct{}) 622 routeInjectCh = make(chan agent.InjectedMessage, 10) 623 deps.SessionCache.SetRouteRunState(req.RouteKey, routeDone, nil, "") 624 ctx = reqCtx 625 // Register cancel under sc.mu so CancelRoute sees it immediately. 626 // Also fires cancel right away if CancelRoute already set cancelPending. 627 deps.SessionCache.SetRouteCancel(req.RouteKey, cancel) 628 defer func() { 629 deps.SessionCache.ClearRouteRunState(req.RouteKey) 630 closeRouteDone(routeDone) 631 route.cancel = nil 632 // Set sessionID directly — do NOT call SetRouteSessionID which 633 // would try to acquire route.mu again (same deadlock). 634 if current := sessMgr.Current(); current != nil { 635 route.sessionID = current.ID 636 } 637 deps.SessionCache.UnlockRoute(req.RouteKey) 638 }() 639 } else { 640 managerDir := sessionsDir 641 if req.BypassRouting { 642 tmpDir, tmpErr := os.MkdirTemp("", "heartbeat-*") 643 if tmpErr != nil { 644 return nil, fmt.Errorf("create temp session dir: %w", tmpErr) 645 } 646 defer os.RemoveAll(tmpDir) 647 managerDir = tmpDir 648 } 649 sessMgr = session.NewManager(managerDir) 650 defer func() { 651 if err := sessMgr.Close(); err != nil { 652 log.Printf("daemon: failed to close ephemeral session manager for %q: %v", managerDir, err) 653 } 654 }() 655 } 656 657 resumed := false 658 switch { 659 case req.SessionID != "": 660 // Resume a specific session by ID (reuses cached manager to avoid DB handle leak). 
661 if _, err := sessMgr.Resume(req.SessionID); err != nil { 662 return nil, fmt.Errorf("session not found: %s", req.SessionID) 663 } 664 resumed = true 665 case req.NewSession || req.RouteKey == "": 666 sessMgr.NewSession() 667 case route != nil && route.sessionID != "": 668 if _, err := sessMgr.Resume(route.sessionID); err != nil { 669 log.Printf("daemon: failed to resume routed session %q for %q: %v", route.sessionID, req.RouteKey, err) 670 sessMgr.NewSession() 671 } else { 672 resumed = true 673 } 674 case strings.HasPrefix(req.RouteKey, "agent:"): 675 // Named-agent cold start (first run or after daemon restart). 676 // route.sessionID is empty — resume latest from disk, or start fresh if none. 677 if resumedLatest, err := resumeNamedAgentColdStart(sessMgr); err != nil { 678 log.Printf("daemon: failed to resume latest named-agent session for %q: %v", req.RouteKey, err) 679 if sessMgr.Current() == nil { 680 sessMgr.NewSession() 681 } 682 } else { 683 resumed = resumedLatest 684 } 685 default: 686 sessMgr.NewSession() 687 } 688 sess := sessMgr.Current() 689 690 // Seed pre-loaded history for bypass-routed runs (e.g., heartbeat). 691 // The throwaway manager has an empty session; this gives the LLM context. 692 if len(req.SessionHistory) > 0 { 693 sess.Messages = req.SessionHistory 694 } 695 696 // Resolve effective CWD: request > resumed session > agent config. When all 697 // three are empty we deliberately do NOT invent a working directory for 698 // most sources — the request runs with no filesystem scope, and filesystem 699 // tools (glob, grep, file_read, directory_list) will refuse any relative 700 // paths at the tool level. Web-only and pure-reasoning tasks are unaffected. 701 // 702 // Cloud-routed sources (slack/line/feishu/lark/telegram/webhook) are the 703 // one exception: they arrive with no user shell and no persisted CWD, so a 704 // tool like browser_snapshot(filename="x.md") has nowhere to land and 705 // file_read("x.md") can't resolve it. 
For those we allocate a per-session 706 // scratch dir under ~/.shannon/tmp/sessions/<id>/ as the lowest-priority 707 // fallback. Any real CWD (request/resumed/agent) still wins. 708 var sessionCWD string 709 if resumed { 710 sessionCWD = sess.CWD 711 } 712 var agentCWD string 713 if agentOverride != nil && agentOverride.Config != nil { 714 agentCWD = agentOverride.Config.CWD 715 } 716 effectiveCWD := cwdctx.ResolveEffectiveCWD(req.CWD, sessionCWD, agentCWD) 717 var cloudSessionCWD string 718 if effectiveCWD == "" { 719 if dir, err := ensureCloudSessionTmpDir(deps.ShannonDir, sess.ID, req.Source); err != nil { 720 log.Printf("daemon: failed to allocate cloud session cwd for %s: %v", sess.ID, err) 721 } else if dir != "" { 722 cloudSessionCWD = dir 723 effectiveCWD = dir 724 } 725 } 726 if effectiveCWD != "" { 727 if err := cwdctx.ValidateCWD(effectiveCWD); err != nil { 728 return nil, fmt.Errorf("invalid cwd: %w", err) 729 } 730 } 731 if req.RouteKey != "" { 732 deps.SessionCache.SetRouteRunState(req.RouteKey, routeDone, routeInjectCh, effectiveCWD) 733 } 734 runCfg, err := config.RuntimeConfigForCWD(cfg, effectiveCWD) 735 if err != nil { 736 return nil, fmt.Errorf("runtime config: %w", err) 737 } 738 // Only write back when we have a real CWD — avoid poisoning the session 739 // with an empty value and avoid overwriting an existing non-empty session 740 // CWD with an empty fallback. Cloud scratch dirs are deliberately NOT 741 // persisted: they live under ~/.shannon/tmp/sessions/<id>/, get removed 742 // on session close, and must be re-allocated on every resume. Persisting 743 // them would leave sess.CWD pointing at a now-deleted path, and the next 744 // run would fail ValidateCWD before it could recreate the scratch. 
745 if effectiveCWD != "" && cloudSessionCWD == "" { 746 sess.CWD = effectiveCWD 747 } 748 ctx = cwdctx.WithSessionCWD(ctx, effectiveCWD) 749 750 // Wrap the transport handler with a bus-emitting handler so every run 751 // publishes progress events regardless of transport. See 752 // docs/superpowers/specs/2026-04-23-event-bus-progress-coverage-design.md. 753 bus := &busEventHandler{deps: deps, agent: agentName} 754 handler = &multiHandler{handlers: []agent.EventHandler{handler, bus}} 755 756 // Notify handler of resolved session ID so it can include it in EventBus payloads. 757 if setter, ok := handler.(interface{ SetSessionID(string) }); ok { 758 setter.SetSessionID(sess.ID) 759 } 760 761 // Route notify tool calls through the EventBus so attached SSE clients 762 // (typically the Desktop app) render the banner via UNUserNotificationCenter 763 // with correct app attribution and click-through routing. Falls back to 764 // the direct osascript path only when EmitTo reports zero deliveries — 765 // either because no client is subscribed, or because every subscriber's 766 // buffer was full. Using EmitTo's delivery count (rather than a liveness 767 // check) means a single stalled subscriber cannot swallow notifications 768 // into a silent void. 769 if deps.EventBus != nil { 770 sessID := sess.ID 771 notifyAgent := agentName 772 notifySource := req.Source 773 ctx = tools.WithNotifyHandler(ctx, func(title, body string, sound bool) bool { 774 payload, _ := json.Marshal(map[string]any{ 775 "session_id": sessID, 776 "agent": notifyAgent, 777 "source": notifySource, 778 "title": title, 779 "body": body, 780 "sound": sound, 781 }) 782 return deps.EventBus.EmitTo(Event{Type: EventNotification, Payload: payload}) > 0 783 }) 784 } 785 786 // Persist session to disk before loop.Run() so there's a record even if 787 // the daemon crashes mid-execution. The final save after completion is 788 // still needed to capture the assistant's reply. 
789 // Ephemeral requests skip persistence — the caller owns session lifecycle. 790 if !req.Ephemeral { 791 if req.Source != "" && req.Channel != "" { 792 sess.Source = req.Source 793 sess.Channel = req.Channel 794 } 795 // Only set source-derived title for non-named-agent routes. 796 // Named agents always get session.AgentTitle in the post-loop block. 797 if sess.Title == "New session" && req.RouteKey != "" && !strings.HasPrefix(req.RouteKey, "agent:") { 798 title := routeTitle(req.Source, req.Channel, req.Sender) 799 if title != "" { 800 sess.Title = title 801 } 802 } 803 if err := sessMgr.Save(); err != nil { 804 log.Printf("daemon: failed to pre-save session: %v", err) 805 } 806 } 807 808 // Snapshot history BEFORE appending the user message so loop.Run(prompt, history) 809 // does not receive the user message twice (once as prompt, once in history). 810 // HistoryForLoop strips prior loop-injected guardrail nudges (MessageMeta 811 // .SystemInjected) so they cannot leak into the current run's conversation 812 // snapshot — see session.Session.HistoryForLoop for the full rationale. 813 history := sess.HistoryForLoop() 814 815 // For externally-sourced messages (Slack, LINE, etc.), persist the user message 816 // before the agent loop so the UI can display it immediately on notification. 817 // preLoopUserAppended tracks the in-memory append (not save success) to prevent 818 // double-appending in the post-loop persist block. 
819 userMsgTime := time.Now() 820 var preLoopUserAppended bool 821 if !req.Ephemeral && req.Source != "" { 822 source := req.Source 823 if source == "" { 824 source = "unknown" 825 } 826 msgID := generateMessageID() 827 userMsgContent := buildUserMsgContent(prompt, resolvedContent) 828 sess.Messages = append(sess.Messages, 829 client.Message{Role: "user", Content: userMsgContent}, 830 ) 831 sess.MessageMeta = append(sess.MessageMeta, 832 session.MessageMeta{Source: source, MessageID: msgID, Timestamp: session.TimePtr(userMsgTime)}, 833 ) 834 preLoopUserAppended = true 835 if err := sessMgr.Save(); err != nil { 836 log.Printf("daemon: failed to pre-save user message: %v", err) 837 } else if deps.EventBus != nil { 838 payload, _ := json.Marshal(map[string]any{ 839 "agent": agentName, 840 "source": req.Source, 841 "sender": req.Sender, 842 "session_id": sess.ID, 843 "message_id": msgID, 844 "text": prompt, 845 }) 846 deps.EventBus.Emit(Event{Type: EventMessageReceived, Payload: payload}) 847 } 848 } 849 850 // Clone and apply per-agent tool filter 851 reg := tools.CloneWithRuntimeConfig(baseReg, runCfg) 852 if agentOverride != nil { 853 reg = tools.ApplyToolFilter(reg, agentOverride) 854 } 855 856 // Attach SecretsStore to the session-scoped bash tool so use_skill 857 // activations can expose skill secrets as child-process env vars. 858 // Baseline bash is created at daemon start before NewServer, so the 859 // store has to be wired here, after CloneWithRuntimeConfig has 860 // deep-copied bash for this run. 
861 if deps.SecretsStore != nil { 862 if bashTool, ok := reg.Get("bash"); ok { 863 if bt, ok := bashTool.(*tools.BashTool); ok { 864 bt.SecretsStore = deps.SecretsStore 865 } 866 } 867 } 868 869 // Load skills (agent-scoped or global) and wire to registry 870 var loadedSkills []*skills.Skill 871 if agentOverride != nil { 872 loadedSkills = agentOverride.Skills 873 } else { 874 var err error 875 loadedSkills, err = agents.LoadGlobalSkills(deps.ShannonDir) 876 if err != nil { 877 log.Printf("WARNING: failed to load global skills: %v", err) 878 } 879 } 880 881 // Auto-inject bundled skills based on attached file types. 882 if hasPDFAttachment(req.Content) { 883 loadedSkills = injectBundledSkill(loadedSkills, deps.ShannonDir, "pdf-reader") 884 } 885 886 tools.SetRegistrySkills(reg, loadedSkills) 887 888 // Always expose local session search for daemon-served agents. 889 // Use the per-agent manager so searches are scoped to that agent's sessions. 890 tools.RegisterSessionSearch(reg, sessMgr) 891 892 // memory_recall — talks to the structured memory sidecar when ready and 893 // falls back to session keyword search + MEMORY.md grep otherwise. Always 894 // register; the tool itself decides whether to use the service or fallback 895 // based on the service's Status(). 
896 var memSvc tools.MemoryQuerier 897 if deps.MemSvc != nil { 898 memSvc = deps.MemSvc 899 } 900 tools.RegisterMemoryTool(reg, memSvc, &daemonFallback{sessionMgr: sessMgr}) 901 902 loop := agent.NewAgentLoop(deps.GW, reg, runCfg.ModelTier, deps.ShannonDir, 903 runCfg.Agent.MaxIterations, runCfg.Tools.ResultTruncation, runCfg.Tools.ArgsTruncation, 904 &runCfg.Permissions, deps.Auditor, deps.HookRunner) 905 loop.SetMaxTokens(runCfg.Agent.MaxTokens) 906 loop.SetTemperature(runCfg.Agent.Temperature) 907 loop.SetContextWindow(runCfg.Agent.ContextWindow) 908 loop.SetEnableStreaming(false) 909 loop.SetDeltaProvider(agent.NewTemporalDelta()) 910 loop.SetCacheSource(cacheSourceFromDaemonSource(req.Source)) 911 loop.SetSkillDiscovery(runCfg.Agent.SkillDiscoveryEnabled()) 912 if agentOverride != nil { 913 scopedMCPCtx := tools.ResolveMCPContext(runCfg, agentOverride) 914 agentDir := filepath.Join(deps.ShannonDir, "agents", agentName) 915 loop.SwitchAgent(agentOverride.Prompt, agentDir, nil, scopedMCPCtx, loadedSkills) 916 } else { 917 loop.SetMemoryDir(filepath.Join(deps.ShannonDir, "memory")) 918 if loadedSkills != nil { 919 loop.SetSkills(loadedSkills) 920 } 921 scopedMCPCtx := tools.ResolveMCPContext(runCfg) 922 if scopedMCPCtx != "" { 923 loop.SetMCPContext(scopedMCPCtx) 924 } 925 } 926 if runCfg.Agent.Model != "" { 927 loop.SetSpecificModel(runCfg.Agent.Model) 928 } 929 if runCfg.Agent.Thinking { 930 if runCfg.Agent.ThinkingMode == "enabled" { 931 loop.SetThinking(&client.ThinkingConfig{Type: "enabled", BudgetTokens: runCfg.Agent.ThinkingBudget}) 932 } else { 933 loop.SetThinking(&client.ThinkingConfig{Type: "adaptive"}) 934 } 935 } 936 if runCfg.Agent.ReasoningEffort != "" { 937 loop.SetReasoningEffort(runCfg.Agent.ReasoningEffort) 938 } 939 // Per-agent model config overrides 940 if agentOverride != nil && agentOverride.Config != nil && agentOverride.Config.Agent != nil { 941 ac := agentOverride.Config.Agent 942 if ac.Model != nil { 943 
loop.SetSpecificModel(*ac.Model) 944 } 945 if ac.MaxIterations != nil { 946 loop.SetMaxIterations(*ac.MaxIterations) 947 } 948 if ac.Temperature != nil { 949 loop.SetTemperature(*ac.Temperature) 950 } 951 if ac.MaxTokens != nil { 952 loop.SetMaxTokens(*ac.MaxTokens) 953 } 954 if ac.ContextWindow != nil { 955 loop.SetContextWindow(*ac.ContextWindow) 956 } 957 if ac.IdleSoftTimeoutSecs != nil { 958 runCfg.Agent.IdleSoftTimeoutSecs = *ac.IdleSoftTimeoutSecs 959 } 960 if ac.IdleHardTimeoutSecs != nil { 961 runCfg.Agent.IdleHardTimeoutSecs = *ac.IdleHardTimeoutSecs 962 } 963 } 964 // Apply idle-timeout config AFTER per-agent overrides have been folded 965 // into runCfg, otherwise agent-level opt-in/override silently does nothing. 966 loop.SetIdleTimeouts(runCfg.Agent.IdleSoftTimeoutSecs, runCfg.Agent.IdleHardTimeoutSecs) 967 if req.ModelOverride != "" { 968 loop.SetModelTier(req.ModelOverride) 969 } 970 // Inject session metadata as sticky context so it survives compaction. 971 { 972 var parts []string 973 if req.Source != "" { 974 parts = append(parts, "Source: "+req.Source) 975 } 976 if req.Channel != "" { 977 parts = append(parts, "Channel: "+req.Channel) 978 } 979 if req.Sender != "" { 980 parts = append(parts, "Sender: "+req.Sender) 981 } 982 if agentName != "" { 983 parts = append(parts, "Agent: "+agentName) 984 } 985 if req.StickyContext != "" { 986 parts = append(parts, req.StickyContext) 987 } 988 if len(parts) > 0 { 989 loop.SetStickyContext(strings.Join(parts, "\n")) 990 } 991 } 992 993 // Output format: cloud-distributed channels use "plain" (Shannon Cloud 994 // handles final channel rendering). Local sources keep "markdown" (default). 995 loop.SetOutputFormat(outputFormatForSource(req.Source)) 996 997 loop.SetHandler(handler) 998 999 // Wire handler and agent context to the per-run cloud_delegate copy. 1000 // Must use reg (cloned), not baseReg (shared), to avoid race across routes. 
1001 if ct, ok := reg.Get("cloud_delegate"); ok { 1002 if cdt, ok := ct.(*tools.CloudDelegateTool); ok { 1003 cdt.SetHandler(handler) 1004 if agentOverride != nil { 1005 cdt.SetAgentContext(agentName, agentOverride.Prompt) 1006 } else { 1007 cdt.SetAgentContext("", "") 1008 } 1009 } 1010 } 1011 1012 if routeInjectCh != nil { 1013 loop.SetInjectCh(routeInjectCh) 1014 } 1015 loop.SetSessionID(sess.ID) 1016 loop.SetSessionCWD(effectiveCWD) 1017 loop.SetWorkingSet(sessMgr.WorkingSet(sess.ID)) 1018 // Always set (even nil) to clear paths from a previous run on a reused loop. 1019 loop.SetUserFilePaths(extractUserFilePaths(req.Content)) 1020 sessMgr.OnSessionClose(sess.ID, loop.SpillCleanupFunc()) 1021 1022 // file:// preview bridge: lazily-started loopback HTTP server that 1023 // rewrites browser_navigate(file://...) into http://127.0.0.1/<token>/… 1024 // so Playwright's Chromium deny-list doesn't strand the agent. 1025 // 1026 // Allowlist: the bridge only serves files already reachable by the 1027 // agent's other tools — the effective session CWD subtree plus any 1028 // explicit user-attached files. This prevents browser_navigate from 1029 // becoming an escape hatch that reads arbitrary local files outside 1030 // the normal file-access boundary. 1031 filePreview := tools.NewFilePreviewBridge() 1032 if effectiveCWD != "" { 1033 filePreview.AllowRoot(effectiveCWD) 1034 } 1035 for _, p := range extractUserFilePaths(req.Content) { 1036 filePreview.AllowFile(p) 1037 } 1038 sessMgr.OnSessionClose(sess.ID, func() { _ = filePreview.Close() }) 1039 if cloudSessionCWD != "" { 1040 // Reclaim the per-session scratch dir when the session is closed 1041 // (SessionCache eviction, daemon shutdown). Artifacts live across turns 1042 // of the same session but don't accumulate across sessions. 
1043 sessMgr.OnSessionClose(sess.ID, cloudSessionTmpCleanup(cloudSessionCWD)) 1044 } 1045 ctx = tools.WithFilePreview(ctx, filePreview) 1046 if attachmentCleanup != nil { 1047 attachmentRegistered = true // cancel the defer safety net 1048 sessMgr.OnClose(attachmentCleanup) 1049 } 1050 1051 // Turn persistence: capture the session state at turn start so both the 1052 // mid-turn checkpoint hook and the post-turn final save can rebuild 1053 // messages + usage idempotently from (baseline + current loop state). 1054 // This is the single source of truth — no append-on-top anywhere in 1055 // the turn's persistence path, which would otherwise double-write any 1056 // transcript that crossed a checkpoint boundary. 1057 checkpointSource := req.Source 1058 if checkpointSource == "" { 1059 checkpointSource = "unknown" 1060 } 1061 turnBase := captureTurnBaseline(sess, checkpointSource, preLoopUserAppended) 1062 // The daemon handler implements agent.UsageProvider; extract once so 1063 // callsites pass a strongly-typed provider (or nil) to applyTurnState. 1064 var turnUsage usageProvider 1065 if up, ok := handler.(agent.UsageProvider); ok { 1066 turnUsage = up 1067 } 1068 loop.SetCheckpointMinInterval(2 * time.Second) // debounce in the loop, not here 1069 loop.SetCheckpointFunc(func(ctx context.Context) error { 1070 applyTurnState(sess, loop, turnUsage, turnBase) 1071 sess.InProgress = true 1072 if err := sessMgr.Save(); err != nil { 1073 log.Printf("daemon: mid-turn checkpoint save failed: %v", err) 1074 // Return the error so AgentLoop.maybeCheckpoint keeps the 1075 // dirty flag set and the next fire point retries. 1076 return err 1077 } 1078 return nil 1079 }) 1080 1081 result, usage, runErr := loop.Run(ctx, prompt, resolvedContent, history) 1082 status := loop.LastRunStatus() 1083 if runErr != nil && !isSoftRunError(runErr) { 1084 // Hard error — save a user-friendly error message so the session isn't 1085 // left with a dangling user message and no assistant reply. 
1086 // Full error detail goes to the log; session/UI gets a clean summary. 1087 log.Printf("daemon: agent %s run error: %v", agentName, runErr) 1088 if status.FailureCode == runstatus.CodeNone { 1089 status.FailureCode = runstatus.CodeFromError(runErr) 1090 } 1091 userErr := FriendlyAgentError(runErr) 1092 savedSessionID := "" 1093 if !req.Ephemeral && result == "" { 1094 // Use the same idempotent rebuild as the mid-turn checkpoint 1095 // and the normal final save: reset messages+usage to 1096 // (baseline + current snapshot), then append the friendly 1097 // error stub on top. This handles three previously-broken cases: 1098 // (a) a prior checkpoint already persisted partial transcript 1099 // — we must not duplicate it by appending the error on 1100 // top of what's already there. 1101 // (b) a dirty checkpoint was debounced just before the error 1102 // — rebuilding from RunMessages picks up the trailing 1103 // batches that never got their own save. 1104 // (c) usage was already folded by a checkpoint — AddUsage 1105 // would double-count, so use baseline+current instead. 
1106 applyTurnMessages(sess, loop, turnBase) 1107 sess.Messages = append(sess.Messages, 1108 client.Message{Role: "assistant", Content: client.NewTextContent(userErr)}, 1109 ) 1110 sess.MessageMeta = append(sess.MessageMeta, 1111 session.MessageMeta{Source: req.Source, Timestamp: session.TimePtr(time.Now())}, 1112 ) 1113 applyTurnUsage(sess, turnUsage, turnBase) 1114 sess.InProgress = false // hard-error path: turn is over, clear marker 1115 if err := sessMgr.Save(); err != nil { 1116 log.Printf("daemon: failed to save error session: %v", err) 1117 } else { 1118 savedSessionID = sess.ID 1119 } 1120 } 1121 if deps.EventBus != nil { 1122 payload, _ := json.Marshal(map[string]any{ 1123 "agent": agentName, 1124 "source": req.Source, 1125 "session_id": savedSessionID, 1126 "error": fmt.Sprintf("agent run failed: %v", runErr), 1127 "friendly_error": userErr, 1128 "failure_code": status.FailureCode, 1129 }) 1130 deps.EventBus.Emit(Event{Type: EventAgentError, Payload: payload}) 1131 } 1132 return nil, fmt.Errorf("agent error for %s: %w", agentName, runErr) 1133 } 1134 if errors.Is(runErr, agent.ErrMaxIterReached) { 1135 log.Printf("daemon: agent %s hit iteration limit, saving partial result", agentName) 1136 } 1137 1138 // Tracks persistence outcome so the return value can blank SessionID on 1139 // failure (in addition to the agent_reply gate inside the block below). 1140 // Stays nil for ephemeral requests, which is the desired "no failure" state. 1141 var saveErr error 1142 1143 // Ephemeral requests skip post-run persistence — the caller owns session lifecycle. 1144 if !req.Ephemeral { 1145 // Set title from first user message (named agents get a fixed title). 
1146 if sess.Title == "New session" { 1147 if agentName != "" { 1148 sess.Title = session.AgentTitle(agentName) 1149 } else { 1150 sess.Title = session.Title(prompt) 1151 } 1152 } 1153 1154 // Final save uses the same (baseline + current snapshot) rebuild as 1155 // mid-turn checkpoints, so a turn that produced checkpoints never 1156 // gets its transcript double-written here. 1157 if len(loop.RunMessages()) > 0 { 1158 applyTurnMessages(sess, loop, turnBase) 1159 } else { 1160 // Fallback: flat text (early LLM error with nothing accumulated). 1161 // Truncate to baseline first so this path is also idempotent 1162 // under the (unusual) case where a prior checkpoint ran. 1163 if len(sess.Messages) > turnBase.msgCount { 1164 sess.Messages = sess.Messages[:turnBase.msgCount] 1165 } 1166 if len(sess.MessageMeta) > turnBase.metaCount { 1167 sess.MessageMeta = sess.MessageMeta[:turnBase.metaCount] 1168 } 1169 if !preLoopUserAppended { 1170 fallbackContent := buildUserMsgContent(prompt, resolvedContent) 1171 sess.Messages = append(sess.Messages, 1172 client.Message{Role: "user", Content: fallbackContent}, 1173 ) 1174 sess.MessageMeta = append(sess.MessageMeta, 1175 session.MessageMeta{Source: checkpointSource, Timestamp: session.TimePtr(userMsgTime)}, 1176 ) 1177 } 1178 replyTime := time.Now() 1179 sess.Messages = append(sess.Messages, 1180 client.Message{Role: "assistant", Content: client.NewTextContent(result)}, 1181 ) 1182 sess.MessageMeta = append(sess.MessageMeta, 1183 session.MessageMeta{Source: checkpointSource, Timestamp: session.TimePtr(replyTime)}, 1184 ) 1185 } 1186 applyTurnUsage(sess, turnUsage, turnBase) // idempotent: baseline + current 1187 sess.InProgress = false // turn completed — clear mid-turn crash marker 1188 saveErr = sessMgr.Save() 1189 if saveErr != nil { 1190 log.Printf("daemon: failed to save session: %v", saveErr) 1191 if deps.EventBus != nil { 1192 payload, _ := json.Marshal(map[string]any{ 1193 "agent": agentName, 1194 "source": req.Source, 
1195 "session_id": sess.ID, 1196 "error": fmt.Sprintf("session save failed: %v", saveErr), 1197 "failure_code": runstatus.CodeUnexpected, 1198 }) 1199 deps.EventBus.Emit(Event{Type: EventAgentError, Payload: payload}) 1200 } 1201 } 1202 1203 // Only emit agent_reply when the session actually persisted. If the 1204 // save failed, the conversation is not on disk and downstream 1205 // consumers (e.g. desktop schedule notifications that click through 1206 // to the session) would point at a session that cannot be loaded. 1207 if saveErr == nil && deps.EventBus != nil { 1208 payload := map[string]any{ 1209 "agent": agentName, 1210 "source": req.Source, 1211 "session_id": sess.ID, 1212 "text": result, 1213 } 1214 // Soft-warning semantics: force-stop exits still emit a normal 1215 // agent_reply, but carry partial/failure_code so consumers can 1216 // show a non-error "stopped early" hint next to the text. 1217 if status.Partial { 1218 payload["partial"] = true 1219 payload["failure_code"] = status.FailureCode 1220 } 1221 payloadBytes, _ := json.Marshal(payload) 1222 deps.EventBus.Emit(Event{Type: EventAgentReply, Payload: payloadBytes}) 1223 } 1224 } 1225 1226 // Prefer handler-accumulated LLM totals (includes cloud_delegate nested 1227 // spend) for the model token fields. Tool billing rolls into CostUSD 1228 // on top of LLM cost but never into the token fields, so 1229 // input_tokens+output_tokens==total_tokens stays true for API consumers. 
1230 reportedUsage := RunAgentUsage{ 1231 InputTokens: usage.InputTokens, 1232 OutputTokens: usage.OutputTokens, 1233 TotalTokens: usage.TotalTokens, 1234 CostUSD: usage.CostUSD, 1235 } 1236 if up, ok := handler.(agent.UsageProvider); ok { 1237 acc := up.Usage() 1238 llm := acc.LLM 1239 if llm.LLMCalls > 0 || llm.TotalTokens > 0 || llm.CostUSD > 0 || acc.ToolCostUSD > 0 { 1240 reportedUsage = RunAgentUsage{ 1241 InputTokens: llm.InputTokens, 1242 OutputTokens: llm.OutputTokens, 1243 TotalTokens: llm.TotalTokens, 1244 CostUSD: llm.CostUSD + acc.ToolCostUSD, 1245 } 1246 } 1247 } 1248 log.Printf("daemon: reply to %s (%d tokens, $%.4f)", agentName, reportedUsage.TotalTokens, reportedUsage.CostUSD) 1249 1250 // Respect the keep_alive toggle after each completed turn. 1251 if _, _, _, mgr := deps.RebuildLayers(); mgr != nil { 1252 cleanupPlaywrightAfterTurn(mgr) 1253 } 1254 1255 // On save failure, blank SessionID so HTTP/SSE clients can't click through 1256 // to a session that isn't on disk (matches the agent_reply gate above). 1257 returnedSessionID := sess.ID 1258 if saveErr != nil { 1259 returnedSessionID = "" 1260 } 1261 return &RunAgentResult{ 1262 Reply: result, 1263 SessionID: returnedSessionID, 1264 Agent: agentName, 1265 Usage: reportedUsage, 1266 Partial: status.Partial, 1267 FailureCode: status.FailureCode, 1268 }, nil 1269 } 1270 1271 func generateMessageID() string { 1272 b := make([]byte, 8) 1273 _, _ = rand.Read(b) 1274 return "msg-" + hex.EncodeToString(b) 1275 } 1276 1277 func closeRouteDone(done chan struct{}) { 1278 if done == nil { 1279 return 1280 } 1281 defer func() { 1282 if recover() != nil { 1283 // Best effort cleanup; callers may close defensively in multiple paths. 1284 // Avoid panic if the channel was already closed externally. 1285 } 1286 }() 1287 close(done) 1288 } 1289 1290 // isSoftRunError reports whether err is a normal termination (cancel, timeout, 1291 // max iterations) rather than a hard failure. 
Soft errors should persist the 1292 // full conversation from RunMessages(), not just a friendly error stub. 1293 func isSoftRunError(err error) bool { 1294 return errors.Is(err, agent.ErrMaxIterReached) || 1295 errors.Is(err, agent.ErrHardIdleTimeout) || 1296 errors.Is(err, context.Canceled) || 1297 errors.Is(err, context.DeadlineExceeded) 1298 } 1299 1300 // turnBaseline captures pre-turn session state so both mid-turn checkpoints 1301 // and the post-turn final save can idempotently rebuild the session from 1302 // (baseline + current loop snapshot) — never append-on-top. This is the 1303 // single persistence invariant for a turn: after applyTurnState runs, the 1304 // session reflects exactly one canonical transcript and one usage total 1305 // for the accumulated turn, no matter how many times the function is 1306 // called. 1307 type turnBaseline struct { 1308 msgCount int 1309 metaCount int 1310 usage session.UsageSummary // pre-turn cumulative usage; zero if sess.Usage was nil 1311 hadUsage bool // true if sess.Usage was non-nil at baseline 1312 source string 1313 preLoopUser bool 1314 } 1315 1316 // captureTurnBaseline snapshots sess state at turn start so subsequent 1317 // applyTurnState calls can rebuild idempotently. 1318 func captureTurnBaseline(sess *session.Session, source string, preLoopUserAppended bool) turnBaseline { 1319 b := turnBaseline{ 1320 msgCount: len(sess.Messages), 1321 metaCount: len(sess.MessageMeta), 1322 source: source, 1323 preLoopUser: preLoopUserAppended, 1324 } 1325 if sess.Usage != nil { 1326 b.usage = *sess.Usage 1327 b.hadUsage = true 1328 } 1329 return b 1330 } 1331 1332 // applyTurnMessages rebuilds sess.Messages/MessageMeta from baseline + 1333 // loop.RunMessages(). Idempotent — safe to call any number of times with 1334 // changing loop state (compaction shrinks etc.). 
1335 func applyTurnMessages(sess *session.Session, loop *agent.AgentLoop, b turnBaseline) { 1336 if len(sess.Messages) > b.msgCount { 1337 sess.Messages = sess.Messages[:b.msgCount] 1338 } 1339 if len(sess.MessageMeta) > b.metaCount { 1340 sess.MessageMeta = sess.MessageMeta[:b.metaCount] 1341 } 1342 runMsgs := loop.RunMessages() 1343 if len(runMsgs) == 0 { 1344 return 1345 } 1346 runInjected := loop.RunMessageInjected() 1347 runTimestamps := loop.RunMessageTimestamps() 1348 startIdx := 0 1349 if b.preLoopUser && runMsgs[0].Role == "user" { 1350 startIdx = 1 1351 } 1352 fallbackTime := time.Now() 1353 for i := startIdx; i < len(runMsgs); i++ { 1354 ts := fallbackTime 1355 if i < len(runTimestamps) && !runTimestamps[i].IsZero() { 1356 ts = runTimestamps[i] 1357 } 1358 sess.Messages = append(sess.Messages, runMsgs[i]) 1359 meta := session.MessageMeta{Source: b.source, Timestamp: session.TimePtr(ts)} 1360 if i < len(runInjected) && runInjected[i] { 1361 meta.SystemInjected = true 1362 } 1363 sess.MessageMeta = append(sess.MessageMeta, meta) 1364 } 1365 } 1366 1367 // usageProvider is the local interface applyTurnUsage needs. Defined here 1368 // (rather than accepting agent.UsageProvider directly) so the caller type 1369 // is restricted at compile time — a future refactor that dropped the 1370 // interface on the daemon handler would fail to compile instead of 1371 // silently no-op'ing the usage folding at runtime. 1372 type usageProvider interface { 1373 Usage() agent.AccumulatedUsage 1374 } 1375 1376 // applyTurnUsage sets sess.Usage to (baseline + current accumulator). 1377 // Idempotent — no double-counting across checkpoint + final-save calls. 1378 // A nil provider is a no-op (used by unit tests that exercise only the 1379 // message path). 
1380 func applyTurnUsage(sess *session.Session, up usageProvider, b turnBaseline) { 1381 if up == nil { 1382 return 1383 } 1384 acc := up.Usage() 1385 llm := acc.LLM 1386 hasTurnUsage := llm.LLMCalls > 0 || acc.ToolCalls > 0 || llm.InputTokens > 0 || 1387 llm.CostUSD > 0 || acc.ToolCostUSD > 0 1388 if !b.hadUsage && !hasTurnUsage { 1389 return 1390 } 1391 total := b.usage 1392 if hasTurnUsage { 1393 total.Add(session.UsageFromAccumulated( 1394 llm.LLMCalls, llm.InputTokens, llm.OutputTokens, llm.TotalTokens, 1395 llm.CostUSD, llm.CacheReadTokens, llm.CacheCreationTokens, llm.CacheCreation5mTokens, llm.CacheCreation1hTokens, llm.Model, 1396 acc.ToolCalls, acc.ToolCostUSD, 1397 )) 1398 } 1399 sess.Usage = &total 1400 if sess.SchemaVersion < 2 { 1401 sess.SchemaVersion = 2 1402 } 1403 } 1404 1405 // applyTurnState is the combined rebuild — messages + usage — used by 1406 // both mid-turn checkpoints and the post-turn final save so a turn is 1407 // never persisted twice via different paths. up may be nil (usage skipped). 1408 func applyTurnState(sess *session.Session, loop *agent.AgentLoop, 1409 up usageProvider, b turnBaseline) { 1410 applyTurnMessages(sess, loop, b) 1411 applyTurnUsage(sess, up, b) 1412 } 1413 1414 // FriendlyAgentError maps raw agent errors to user-facing messages. 1415 // Full error detail is logged separately; this keeps session/UI clean. 1416 func FriendlyAgentError(err error) string { 1417 return runstatus.FriendlyMessage(runstatus.CodeFromError(err)) 1418 }