cloud_delegate.go
1 package tools 2 3 import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "time" 8 9 "github.com/Kocoro-lab/ShanClaw/internal/agent" 10 "github.com/Kocoro-lab/ShanClaw/internal/client" 11 ) 12 13 // OnWorkflowStartedFunc is called when cloud_delegate gets a workflow_id from Cloud. 14 // Used by daemon mode to send progress with workflow_id for streaming card replies. 15 type OnWorkflowStartedFunc func(workflowID string) 16 17 type contextKeyWorkflowStarted struct{} 18 19 // WithOnWorkflowStarted returns a context carrying a per-request workflow started callback. 20 // This is safe for concurrent use (each message gets its own context). 21 func WithOnWorkflowStarted(ctx context.Context, fn OnWorkflowStartedFunc) context.Context { 22 return context.WithValue(ctx, contextKeyWorkflowStarted{}, fn) 23 } 24 25 type CloudDelegateTool struct { 26 gw *client.GatewayClient 27 apiKey string 28 timeout time.Duration 29 handler agent.EventHandler 30 agentName string 31 agentPrompt string 32 } 33 34 type cloudDelegateArgs struct { 35 Task string `json:"task"` 36 Context string `json:"context,omitempty"` 37 WorkflowType string `json:"workflow_type,omitempty"` 38 Terminal *bool `json:"terminal,omitempty"` 39 } 40 41 func NewCloudDelegateTool(gw *client.GatewayClient, apiKey string, timeout time.Duration, handler agent.EventHandler, agentName, agentPrompt string) *CloudDelegateTool { 42 return &CloudDelegateTool{ 43 gw: gw, 44 apiKey: apiKey, 45 timeout: timeout, 46 handler: handler, 47 agentName: agentName, 48 agentPrompt: agentPrompt, 49 } 50 } 51 52 // SetHandler updates the event handler. Used when the handler isn't available 53 // at registration time (e.g., TUI creates handler per-run). 54 func (t *CloudDelegateTool) SetHandler(h agent.EventHandler) { 55 t.handler = h 56 } 57 58 // SetAgentContext updates the agent identity forwarded to Shannon Cloud. 59 // Used in daemon mode where the agent isn't known at registration time. 60 func (t *CloudDelegateTool) SetAgentContext(name, prompt string) { 61 t.agentName = name 62 t.agentPrompt = prompt 63 } 64 65 func (t *CloudDelegateTool) Info() agent.ToolInfo { 66 return agent.ToolInfo{ 67 Name: "cloud_delegate", 68 Description: "Delegate to Shannon Cloud. Remote, 5-15 min, expensive.\n\n" + 69 "Use cloud_delegate ONLY when the task contains 3+ sub-investigations that\n" + 70 "each require a DIFFERENT source and a DIFFERENT query strategy, and only\n" + 71 "need to converge at the end (intermediate state sharing between agents is fine).\n\n" + 72 "Key distinction — do not confuse these:\n" + 73 " - OUTPUT cardinality (return N items in a list) → NOT parallelism\n" + 74 " - INVESTIGATION cardinality (run N different queries\n" + 75 " on N different sources with N different strategies) → may warrant cloud\n\n" + 76 "A single platform returning a long list is ONE investigation, regardless\n" + 77 "of list length. Use local tools.\n\n" + 78 "Do NOT use cloud_delegate when:\n" + 79 " - One query on one platform can return the list (even if list is long)\n" + 80 " - Task iterates on a single topic/entity with follow-up queries\n" + 81 " - The task names one domain, one source, or one entity\n" + 82 " - The user asks to \"find N X's\" on a specific platform\n\n" + 83 "Local routing by task shape:\n" + 84 " - List/enumerate/find on any single platform → x_search\n" + 85 " - Iterative research on one topic → x_search + web_fetch\n" + 86 " - Fetch a specific URL → web_fetch or http\n" + 87 " - Save output → file_write", 88 Parameters: map[string]any{ 89 "type": "object", 90 "properties": map[string]any{ 91 "task": map[string]any{ 92 "type": "string", 93 "description": "The task to delegate. Be specific and detailed about what you need.", 94 }, 95 "context": map[string]any{ 96 "type": "string", 97 "description": "Optional context to include with the task (max 8000 chars). Can include relevant code snippets, data, or background information.", 98 }, 99 "workflow_type": map[string]any{ 100 "type": "string", 101 "enum": []string{"research", "swarm", "auto"}, 102 "description": "Execution mode. Assumes the gate in the top-level description has already passed — this parameter does NOT expand eligibility for calling cloud_delegate. " + 103 "'auto' (default): system picks a DAG based on task shape. " + 104 "'research': fixed research DAG, ~5 min. " + 105 "'swarm': dynamic sub-agent spawning with shared workspace, 10-15 min; use only when sub-agents need to exchange intermediate files.", 106 }, 107 "terminal": map[string]any{ 108 "type": "boolean", 109 "description": "If true, return the cloud result directly to the user (bypasses further processing). If false, feed the result back into your context so you can continue working with it (e.g., write files, run tests, apply changes). Defaults to true for 'research' workflow, false otherwise.", 110 }, 111 }, 112 }, 113 Required: []string{"task"}, 114 } 115 } 116 117 func (t *CloudDelegateTool) Run(ctx context.Context, argsJSON string) (agent.ToolResult, error) { 118 var args cloudDelegateArgs 119 if err := json.Unmarshal([]byte(argsJSON), &args); err != nil { 120 return agent.ToolResult{Content: fmt.Sprintf("invalid arguments: %v", err), IsError: true}, nil 121 } 122 123 if args.Task == "" { 124 return agent.ToolResult{Content: "task is required", IsError: true}, nil 125 } 126 127 // Cap context length 128 if len(args.Context) > 8000 { 129 args.Context = args.Context[:8000] 130 } 131 132 // Build context map based on workflow_type 133 taskContext := make(map[string]any) 134 if args.Context != "" { 135 taskContext["user_context"] = args.Context 136 } 137 switch args.WorkflowType { 138 case "research": 139 taskContext["force_research"] = true 140 case "swarm": 141 taskContext["force_swarm"] = true 142 case "auto", "": 143 // no flag — let the system decide 144 } 145 146 if t.agentName != "" { 147 taskContext["agent_name"] = t.agentName 148 if t.agentPrompt != "" { 149 taskContext["agent_instructions"] = t.agentPrompt 150 } 151 } 152 153 taskReq := client.TaskRequest{ 154 Query: args.Task, 155 Context: taskContext, 156 } 157 158 if t.gw == nil { 159 return agent.ToolResult{Content: "cloud delegation not available: gateway not configured", IsError: true}, nil 160 } 161 162 // Apply timeout 163 timeoutCtx, cancel := context.WithTimeout(ctx, t.timeout) 164 defer cancel() 165 166 resp, err := t.gw.SubmitTaskStream(timeoutCtx, taskReq) 167 if err != nil { 168 return agent.ToolResult{Content: fmt.Sprintf("failed to submit task: %v", err), IsError: true}, nil 169 } 170 171 // Notify daemon of workflow_id so Cloud can start streaming card replies 172 if resp.WorkflowID != "" { 173 if fn, ok := ctx.Value(contextKeyWorkflowStarted{}).(OnWorkflowStartedFunc); ok && fn != nil { 174 fn(resp.WorkflowID) 175 } 176 } 177 178 // Resolve stream URL 179 streamURL := resp.StreamURL 180 if streamURL == "" { 181 streamURL = t.gw.StreamURL(resp.WorkflowID) 182 } 183 streamURL = t.gw.ResolveURL(streamURL) 184 185 var finalResult string 186 var workflowErr error 187 var cloudUsage agent.TurnUsage 188 189 // Enable cloud streaming on handlers that support it (e.g., TUI) 190 type cloudStreamToggle interface { 191 SetCloudStreaming(bool) 192 } 193 if cs, ok := t.handler.(cloudStreamToggle); ok { 194 cs.SetCloudStreaming(true) 195 defer cs.SetCloudStreaming(false) 196 } 197 198 err = client.StreamSSE(timeoutCtx, streamURL, t.apiKey, func(ev client.SSEEvent) { 199 var event struct { 200 Message string `json:"message"` 201 AgentID string `json:"agent_id"` 202 Delta string `json:"delta"` 203 Response string `json:"response"` 204 Type string `json:"type"` 205 Payload map[string]interface{} `json:"payload"` 206 } 207 json.Unmarshal([]byte(ev.Data), &event) 208 209 switch ev.Event { 210 // --- Streaming deltas --- 211 case "thread.message.delta", "LLM_PARTIAL": 212 // Only stream deltas from synthesis / final_output / swarm-lead / single-agent (empty) to user 213 if t.handler != nil && (event.AgentID == "final_output" || event.AgentID == "swarm-lead" || event.AgentID == "synthesis" || event.AgentID == "") { 214 delta := event.Delta 215 if delta == "" { 216 delta = event.Message 217 } 218 if delta != "" { 219 t.handler.OnStreamDelta(delta) 220 } 221 } 222 223 // --- Final result --- 224 case "thread.message.completed", "LLM_OUTPUT": 225 if event.AgentID == "title_generator" { 226 break // skip title generation output 227 } 228 if event.Response != "" { 229 finalResult = event.Response 230 } 231 // Accumulate usage from LLM_OUTPUT metadata 232 t.accumulateUsage(ev.Data, &cloudUsage) 233 234 // --- HITL: research plan review --- 235 case "RESEARCH_PLAN_READY": 236 // Surface the plan to the user, then auto-approve 237 if t.handler != nil && event.Message != "" { 238 t.handler.OnCloudPlan("research_plan", event.Message, true) 239 } 240 // Auto-approve so the workflow continues (matches Desktop's autoApprove: "on" default) 241 go t.gw.ApproveReviewPlan(timeoutCtx, resp.WorkflowID) 242 243 case "RESEARCH_PLAN_UPDATED": 244 // Updated plan from feedback — surface to user 245 if t.handler != nil && event.Message != "" { 246 t.handler.OnCloudPlan("research_plan_updated", event.Message, false) 247 } 248 249 case "RESEARCH_PLAN_APPROVED": 250 // Plan approved, execution starting 251 if t.handler != nil { 252 t.handler.OnCloudPlan("approved", "", false) 253 } 254 255 case "APPROVAL_REQUESTED": 256 // General approval request — auto-approve 257 if t.handler != nil && event.Message != "" { 258 t.handler.OnStreamDelta("\n[Approval requested: " + event.Message + " — auto-approving]\n") 259 } 260 go t.gw.ApproveReviewPlan(timeoutCtx, resp.WorkflowID) 261 262 // --- Status events — only surface user-facing milestones --- 263 case "AGENT_STARTED": 264 if t.handler != nil { 265 t.handler.OnCloudAgent(event.AgentID, "started", statusMsg(event.AgentID, event.Message, "Agent working...")) 266 } 267 case "AGENT_COMPLETED": 268 if t.handler != nil { 269 t.handler.OnCloudAgent(event.AgentID, "completed", statusMsg(event.AgentID, event.Message, "Agent completed")) 270 } 271 case "AGENT_THINKING": 272 if len(event.Message) <= 100 && t.handler != nil { 273 t.handler.OnCloudAgent("", "thinking", statusMsg("", event.Message, "Thinking...")) 274 } 275 case "TOOL_INVOKED", "TOOL_STARTED": 276 if t.handler != nil { 277 t.handler.OnCloudAgent("", "tool", statusMsg("", event.Message, "Calling tool...")) 278 } 279 280 case "DATA_PROCESSING": 281 // Use a semantic label for pre-planning / data prep. Was "synthesis", 282 // which confusingly implies the final summarization step and also 283 // collides with Shannon Cloud's real `synthesis` agent ID (line ~213 284 // filter). `preparing` reflects what DATA_PROCESSING actually is. 285 if msg := event.Message; msg != "" && len(msg) <= 150 && t.handler != nil { 286 t.handler.OnCloudAgent("preparing", "processing", msg) 287 } 288 289 // --- Internal plumbing — silently ignore --- 290 case "WORKFLOW_STARTED", "TOOL_OBSERVATION", "TOOL_COMPLETED", 291 "DELEGATION", "PROGRESS", "STATUS_UPDATE", "WAITING": 292 // Drop — these are too verbose for the desktop UI 293 case "APPROVAL_DECISION": 294 // no-op 295 296 // --- Swarm-specific events --- 297 case "LEAD_DECISION": 298 if msg := event.Message; msg != "" && len(msg) <= 150 && t.handler != nil { 299 t.handler.OnCloudAgent("", "thinking", msg) 300 } 301 case "TASKLIST_UPDATED": 302 if payload := event.Payload; payload != nil { 303 if tasks, ok := payload["tasks"].([]interface{}); ok && len(tasks) > 0 { 304 completed := 0 305 for _, task := range tasks { 306 if tm, ok := task.(map[string]interface{}); ok { 307 if tm["status"] == "completed" { 308 completed++ 309 } 310 } 311 } 312 if t.handler != nil { 313 t.handler.OnCloudProgress(completed, len(tasks)) 314 } 315 } 316 } 317 case "HITL_RESPONSE": 318 if event.Message != "" && t.handler != nil { 319 t.handler.OnCloudAgent("", "thinking", "Lead responding to your input") 320 } 321 322 case "WORKFLOW_COMPLETED": 323 if finalResult == "" { 324 finalResult = event.Message 325 } 326 327 case "WORKFLOW_FAILED", "error", "ERROR_OCCURRED": 328 workflowErr = fmt.Errorf("workflow failed: %s", event.Message) 329 330 case "workflow.cancelled": 331 workflowErr = fmt.Errorf("workflow cancelled") 332 } 333 }) 334 335 // Report accumulated cloud usage 336 if t.handler != nil && cloudUsage.LLMCalls > 0 { 337 t.handler.OnUsage(cloudUsage) 338 } 339 340 // Handle timeout 341 if err != nil && timeoutCtx.Err() == context.DeadlineExceeded { 342 if finalResult != "" { 343 return agent.ToolResult{Content: fmt.Sprintf("[cloud_delegate timed out after %s, returning partial result]\n\n%s", t.timeout, finalResult)}, nil 344 } 345 return agent.ToolResult{Content: fmt.Sprintf("cloud task timed out after %s with no result", t.timeout), IsError: true}, nil 346 } 347 348 if err != nil { 349 return agent.ToolResult{Content: fmt.Sprintf("stream error: %v", err), IsError: true}, nil 350 } 351 352 if workflowErr != nil { 353 return agent.ToolResult{Content: workflowErr.Error(), IsError: true}, nil 354 } 355 356 if finalResult == "" { 357 return agent.ToolResult{Content: "workflow completed but returned no response", IsError: true}, nil 358 } 359 360 // API fallback: SSE events may be truncated (cloud caps at 10K runes). 361 // Always attempt to fetch the full result from the REST API. 362 // Only mark as CloudResult (bypass LLM summarization) when we have 363 // a confirmed full result — either from SSE or from the API fallback. 364 fullResultConfirmed := true 365 taskID := resp.TaskID 366 if taskID == "" { 367 taskID = resp.WorkflowID 368 } 369 if taskID != "" && t.gw != nil { 370 apiCtx, apiCancel := context.WithTimeout(ctx, 10*time.Second) 371 defer apiCancel() 372 if task, apiErr := t.gw.GetTask(apiCtx, taskID); apiErr == nil && task.Result != "" { 373 if len(task.Result) > len(finalResult) { 374 // API returned a longer result — SSE was truncated 375 finalResult = task.Result 376 } 377 // API succeeded: we have the canonical full result 378 } else { 379 // API fallback failed — SSE result may be truncated 380 fullResultConfirmed = false 381 } 382 } 383 384 // Determine terminal mode: explicit arg takes precedence, 385 // otherwise research defaults to terminal, swarm/auto default to non-terminal. 386 terminal := args.WorkflowType == "research" 387 if args.Terminal != nil { 388 terminal = *args.Terminal 389 } 390 391 return agent.ToolResult{Content: finalResult, CloudResult: fullResultConfirmed && terminal}, nil 392 } 393 394 func (t *CloudDelegateTool) RequiresApproval() bool { return true } 395 396 func (t *CloudDelegateTool) IsReadOnlyCall(string) bool { return false } 397 398 // accumulateUsage extracts usage metadata from LLM_OUTPUT events and adds it to the running total. 399 func (t *CloudDelegateTool) accumulateUsage(data string, usage *agent.TurnUsage) { 400 // Shannon Cloud sends usage info in "metadata" field of LLM_OUTPUT events 401 var meta struct { 402 Metadata *struct { 403 InputTokens int `json:"input_tokens"` 404 OutputTokens int `json:"output_tokens"` 405 TokensUsed int `json:"tokens_used"` 406 CostUSD float64 `json:"cost_usd"` 407 CacheReadTokens int `json:"cache_read_tokens"` 408 CacheCreationTokens int `json:"cache_creation_tokens"` 409 CacheCreation5mTokens int `json:"cache_creation_5m_tokens"` 410 CacheCreation1hTokens int `json:"cache_creation_1h_tokens"` 411 ModelUsed string `json:"model_used"` 412 } `json:"metadata"` 413 } 414 if err := json.Unmarshal([]byte(data), &meta); err != nil || meta.Metadata == nil { 415 return 416 } 417 usage.Add(client.Usage{ 418 InputTokens: meta.Metadata.InputTokens, 419 OutputTokens: meta.Metadata.OutputTokens, 420 TotalTokens: meta.Metadata.TokensUsed, 421 CostUSD: meta.Metadata.CostUSD, 422 CacheReadTokens: meta.Metadata.CacheReadTokens, 423 CacheCreationTokens: meta.Metadata.CacheCreationTokens, 424 CacheCreation5mTokens: meta.Metadata.CacheCreation5mTokens, 425 CacheCreation1hTokens: meta.Metadata.CacheCreation1hTokens, 426 }) 427 if meta.Metadata.ModelUsed != "" { 428 usage.Model = meta.Metadata.ModelUsed 429 } 430 } 431 432 // statusMsg returns message if non-empty, otherwise fallback. 433 // Prepends agentID label if present. 434 func statusMsg(agentID, message, fallback string) string { 435 msg := message 436 if msg == "" { 437 msg = fallback 438 } 439 if agentID != "" && agentID != "orchestrator" && agentID != "streaming" { 440 return "[" + agentID + "] " + msg 441 } 442 return msg 443 } 444 445 // Ensure CloudDelegateTool implements SafeChecker to always require approval. 446 var _ agent.SafeChecker = (*CloudDelegateTool)(nil) 447 448 func (t *CloudDelegateTool) IsSafeArgs(_ string) bool { return false }