/ internal / tools / cloud_delegate.go
cloud_delegate.go
  1  package tools
  2  
  3  import (
  4  	"context"
  5  	"encoding/json"
  6  	"fmt"
  7  	"time"
  8  
  9  	"github.com/Kocoro-lab/ShanClaw/internal/agent"
 10  	"github.com/Kocoro-lab/ShanClaw/internal/client"
 11  )
 12  
 13  // OnWorkflowStartedFunc is called when cloud_delegate gets a workflow_id from Cloud.
 14  // Used by daemon mode to send progress with workflow_id for streaming card replies.
 15  type OnWorkflowStartedFunc func(workflowID string)
 16  
 17  type contextKeyWorkflowStarted struct{}
 18  
 19  // WithOnWorkflowStarted returns a context carrying a per-request workflow started callback.
 20  // This is safe for concurrent use (each message gets its own context).
 21  func WithOnWorkflowStarted(ctx context.Context, fn OnWorkflowStartedFunc) context.Context {
 22  	return context.WithValue(ctx, contextKeyWorkflowStarted{}, fn)
 23  }
 24  
// CloudDelegateTool is the cloud_delegate tool: it submits a task to Shannon
// Cloud through the gateway client and relays the streamed workflow events to
// an event handler.
type CloudDelegateTool struct {
	// gw submits tasks, resolves stream URLs, approves review plans and
	// fetches task results. Run reports an error result when it is nil.
	gw          *client.GatewayClient
	// apiKey is passed to client.StreamSSE when opening the event stream.
	apiKey      string
	// timeout bounds task submission plus event streaming in Run.
	timeout     time.Duration
	// handler receives stream deltas, plan/agent/progress events and usage.
	// It may be nil, in which case those notifications are skipped.
	handler     agent.EventHandler
	// agentName, when non-empty, is forwarded as "agent_name" in the task context.
	agentName   string
	// agentPrompt is forwarded as "agent_instructions", but only when
	// agentName is also set.
	agentPrompt string
}
 33  
// cloudDelegateArgs is the JSON argument payload decoded by Run.
type cloudDelegateArgs struct {
	// Task is the query to delegate; Run rejects an empty value.
	Task         string `json:"task"`
	// Context is optional background text; Run caps it at 8000 bytes and
	// forwards it as "user_context".
	Context      string `json:"context,omitempty"`
	// WorkflowType selects the execution mode: "research" sets force_research,
	// "swarm" sets force_swarm, and any other value adds no flag.
	WorkflowType string `json:"workflow_type,omitempty"`
	// Terminal is a pointer so an omitted value can be told apart from an
	// explicit false; when nil, Run defaults to true only for "research".
	Terminal     *bool  `json:"terminal,omitempty"`
}
 40  
 41  func NewCloudDelegateTool(gw *client.GatewayClient, apiKey string, timeout time.Duration, handler agent.EventHandler, agentName, agentPrompt string) *CloudDelegateTool {
 42  	return &CloudDelegateTool{
 43  		gw:          gw,
 44  		apiKey:      apiKey,
 45  		timeout:     timeout,
 46  		handler:     handler,
 47  		agentName:   agentName,
 48  		agentPrompt: agentPrompt,
 49  	}
 50  }
 51  
// SetHandler replaces the event handler that Run notifies. It exists for
// callers that cannot supply the handler at registration time (e.g., the TUI
// creates a handler per run).
//
// NOTE(review): the field is written without synchronization — presumably
// callers set it before invoking Run rather than concurrently; confirm.
func (t *CloudDelegateTool) SetHandler(h agent.EventHandler) {
	t.handler = h
}
 57  
 58  // SetAgentContext updates the agent identity forwarded to Shannon Cloud.
 59  // Used in daemon mode where the agent isn't known at registration time.
 60  func (t *CloudDelegateTool) SetAgentContext(name, prompt string) {
 61  	t.agentName = name
 62  	t.agentPrompt = prompt
 63  }
 64  
 65  func (t *CloudDelegateTool) Info() agent.ToolInfo {
 66  	return agent.ToolInfo{
 67  		Name: "cloud_delegate",
 68  		Description: "Delegate to Shannon Cloud. Remote, 5-15 min, expensive.\n\n" +
 69  			"Use cloud_delegate ONLY when the task contains 3+ sub-investigations that\n" +
 70  			"each require a DIFFERENT source and a DIFFERENT query strategy, and only\n" +
 71  			"need to converge at the end (intermediate state sharing between agents is fine).\n\n" +
 72  			"Key distinction — do not confuse these:\n" +
 73  			"  - OUTPUT cardinality (return N items in a list)        → NOT parallelism\n" +
 74  			"  - INVESTIGATION cardinality (run N different queries\n" +
 75  			"    on N different sources with N different strategies)  → may warrant cloud\n\n" +
 76  			"A single platform returning a long list is ONE investigation, regardless\n" +
 77  			"of list length. Use local tools.\n\n" +
 78  			"Do NOT use cloud_delegate when:\n" +
 79  			"  - One query on one platform can return the list (even if list is long)\n" +
 80  			"  - Task iterates on a single topic/entity with follow-up queries\n" +
 81  			"  - The task names one domain, one source, or one entity\n" +
 82  			"  - The user asks to \"find N X's\" on a specific platform\n\n" +
 83  			"Local routing by task shape:\n" +
 84  			"  - List/enumerate/find on any single platform   → x_search\n" +
 85  			"  - Iterative research on one topic              → x_search + web_fetch\n" +
 86  			"  - Fetch a specific URL                         → web_fetch or http\n" +
 87  			"  - Save output                                  → file_write",
 88  		Parameters: map[string]any{
 89  			"type": "object",
 90  			"properties": map[string]any{
 91  				"task": map[string]any{
 92  					"type":        "string",
 93  					"description": "The task to delegate. Be specific and detailed about what you need.",
 94  				},
 95  				"context": map[string]any{
 96  					"type":        "string",
 97  					"description": "Optional context to include with the task (max 8000 chars). Can include relevant code snippets, data, or background information.",
 98  				},
 99  				"workflow_type": map[string]any{
100  					"type": "string",
101  					"enum": []string{"research", "swarm", "auto"},
102  					"description": "Execution mode. Assumes the gate in the top-level description has already passed — this parameter does NOT expand eligibility for calling cloud_delegate. " +
103  						"'auto' (default): system picks a DAG based on task shape. " +
104  						"'research': fixed research DAG, ~5 min. " +
105  						"'swarm': dynamic sub-agent spawning with shared workspace, 10-15 min; use only when sub-agents need to exchange intermediate files.",
106  				},
107  				"terminal": map[string]any{
108  					"type":        "boolean",
109  					"description": "If true, return the cloud result directly to the user (bypasses further processing). If false, feed the result back into your context so you can continue working with it (e.g., write files, run tests, apply changes). Defaults to true for 'research' workflow, false otherwise.",
110  				},
111  			},
112  		},
113  		Required: []string{"task"},
114  	}
115  }
116  
117  func (t *CloudDelegateTool) Run(ctx context.Context, argsJSON string) (agent.ToolResult, error) {
118  	var args cloudDelegateArgs
119  	if err := json.Unmarshal([]byte(argsJSON), &args); err != nil {
120  		return agent.ToolResult{Content: fmt.Sprintf("invalid arguments: %v", err), IsError: true}, nil
121  	}
122  
123  	if args.Task == "" {
124  		return agent.ToolResult{Content: "task is required", IsError: true}, nil
125  	}
126  
127  	// Cap context length
128  	if len(args.Context) > 8000 {
129  		args.Context = args.Context[:8000]
130  	}
131  
132  	// Build context map based on workflow_type
133  	taskContext := make(map[string]any)
134  	if args.Context != "" {
135  		taskContext["user_context"] = args.Context
136  	}
137  	switch args.WorkflowType {
138  	case "research":
139  		taskContext["force_research"] = true
140  	case "swarm":
141  		taskContext["force_swarm"] = true
142  	case "auto", "":
143  		// no flag — let the system decide
144  	}
145  
146  	if t.agentName != "" {
147  		taskContext["agent_name"] = t.agentName
148  		if t.agentPrompt != "" {
149  			taskContext["agent_instructions"] = t.agentPrompt
150  		}
151  	}
152  
153  	taskReq := client.TaskRequest{
154  		Query:   args.Task,
155  		Context: taskContext,
156  	}
157  
158  	if t.gw == nil {
159  		return agent.ToolResult{Content: "cloud delegation not available: gateway not configured", IsError: true}, nil
160  	}
161  
162  	// Apply timeout
163  	timeoutCtx, cancel := context.WithTimeout(ctx, t.timeout)
164  	defer cancel()
165  
166  	resp, err := t.gw.SubmitTaskStream(timeoutCtx, taskReq)
167  	if err != nil {
168  		return agent.ToolResult{Content: fmt.Sprintf("failed to submit task: %v", err), IsError: true}, nil
169  	}
170  
171  	// Notify daemon of workflow_id so Cloud can start streaming card replies
172  	if resp.WorkflowID != "" {
173  		if fn, ok := ctx.Value(contextKeyWorkflowStarted{}).(OnWorkflowStartedFunc); ok && fn != nil {
174  			fn(resp.WorkflowID)
175  		}
176  	}
177  
178  	// Resolve stream URL
179  	streamURL := resp.StreamURL
180  	if streamURL == "" {
181  		streamURL = t.gw.StreamURL(resp.WorkflowID)
182  	}
183  	streamURL = t.gw.ResolveURL(streamURL)
184  
185  	var finalResult string
186  	var workflowErr error
187  	var cloudUsage agent.TurnUsage
188  
189  	// Enable cloud streaming on handlers that support it (e.g., TUI)
190  	type cloudStreamToggle interface {
191  		SetCloudStreaming(bool)
192  	}
193  	if cs, ok := t.handler.(cloudStreamToggle); ok {
194  		cs.SetCloudStreaming(true)
195  		defer cs.SetCloudStreaming(false)
196  	}
197  
198  	err = client.StreamSSE(timeoutCtx, streamURL, t.apiKey, func(ev client.SSEEvent) {
199  		var event struct {
200  			Message  string                 `json:"message"`
201  			AgentID  string                 `json:"agent_id"`
202  			Delta    string                 `json:"delta"`
203  			Response string                 `json:"response"`
204  			Type     string                 `json:"type"`
205  			Payload  map[string]interface{} `json:"payload"`
206  		}
207  		json.Unmarshal([]byte(ev.Data), &event)
208  
209  		switch ev.Event {
210  		// --- Streaming deltas ---
211  		case "thread.message.delta", "LLM_PARTIAL":
212  			// Only stream deltas from synthesis / final_output / swarm-lead / single-agent (empty) to user
213  			if t.handler != nil && (event.AgentID == "final_output" || event.AgentID == "swarm-lead" || event.AgentID == "synthesis" || event.AgentID == "") {
214  				delta := event.Delta
215  				if delta == "" {
216  					delta = event.Message
217  				}
218  				if delta != "" {
219  					t.handler.OnStreamDelta(delta)
220  				}
221  			}
222  
223  		// --- Final result ---
224  		case "thread.message.completed", "LLM_OUTPUT":
225  			if event.AgentID == "title_generator" {
226  				break // skip title generation output
227  			}
228  			if event.Response != "" {
229  				finalResult = event.Response
230  			}
231  			// Accumulate usage from LLM_OUTPUT metadata
232  			t.accumulateUsage(ev.Data, &cloudUsage)
233  
234  		// --- HITL: research plan review ---
235  		case "RESEARCH_PLAN_READY":
236  			// Surface the plan to the user, then auto-approve
237  			if t.handler != nil && event.Message != "" {
238  				t.handler.OnCloudPlan("research_plan", event.Message, true)
239  			}
240  			// Auto-approve so the workflow continues (matches Desktop's autoApprove: "on" default)
241  			go t.gw.ApproveReviewPlan(timeoutCtx, resp.WorkflowID)
242  
243  		case "RESEARCH_PLAN_UPDATED":
244  			// Updated plan from feedback — surface to user
245  			if t.handler != nil && event.Message != "" {
246  				t.handler.OnCloudPlan("research_plan_updated", event.Message, false)
247  			}
248  
249  		case "RESEARCH_PLAN_APPROVED":
250  			// Plan approved, execution starting
251  			if t.handler != nil {
252  				t.handler.OnCloudPlan("approved", "", false)
253  			}
254  
255  		case "APPROVAL_REQUESTED":
256  			// General approval request — auto-approve
257  			if t.handler != nil && event.Message != "" {
258  				t.handler.OnStreamDelta("\n[Approval requested: " + event.Message + " — auto-approving]\n")
259  			}
260  			go t.gw.ApproveReviewPlan(timeoutCtx, resp.WorkflowID)
261  
262  		// --- Status events — only surface user-facing milestones ---
263  		case "AGENT_STARTED":
264  			if t.handler != nil {
265  				t.handler.OnCloudAgent(event.AgentID, "started", statusMsg(event.AgentID, event.Message, "Agent working..."))
266  			}
267  		case "AGENT_COMPLETED":
268  			if t.handler != nil {
269  				t.handler.OnCloudAgent(event.AgentID, "completed", statusMsg(event.AgentID, event.Message, "Agent completed"))
270  			}
271  		case "AGENT_THINKING":
272  			if len(event.Message) <= 100 && t.handler != nil {
273  				t.handler.OnCloudAgent("", "thinking", statusMsg("", event.Message, "Thinking..."))
274  			}
275  		case "TOOL_INVOKED", "TOOL_STARTED":
276  			if t.handler != nil {
277  				t.handler.OnCloudAgent("", "tool", statusMsg("", event.Message, "Calling tool..."))
278  			}
279  
280  		case "DATA_PROCESSING":
281  			// Use a semantic label for pre-planning / data prep. Was "synthesis",
282  			// which confusingly implies the final summarization step and also
283  			// collides with Shannon Cloud's real `synthesis` agent ID (line ~213
284  			// filter). `preparing` reflects what DATA_PROCESSING actually is.
285  			if msg := event.Message; msg != "" && len(msg) <= 150 && t.handler != nil {
286  				t.handler.OnCloudAgent("preparing", "processing", msg)
287  			}
288  
289  		// --- Internal plumbing — silently ignore ---
290  		case "WORKFLOW_STARTED", "TOOL_OBSERVATION", "TOOL_COMPLETED",
291  			"DELEGATION", "PROGRESS", "STATUS_UPDATE", "WAITING":
292  			// Drop — these are too verbose for the desktop UI
293  		case "APPROVAL_DECISION":
294  			// no-op
295  
296  		// --- Swarm-specific events ---
297  		case "LEAD_DECISION":
298  			if msg := event.Message; msg != "" && len(msg) <= 150 && t.handler != nil {
299  				t.handler.OnCloudAgent("", "thinking", msg)
300  			}
301  		case "TASKLIST_UPDATED":
302  			if payload := event.Payload; payload != nil {
303  				if tasks, ok := payload["tasks"].([]interface{}); ok && len(tasks) > 0 {
304  					completed := 0
305  					for _, task := range tasks {
306  						if tm, ok := task.(map[string]interface{}); ok {
307  							if tm["status"] == "completed" {
308  								completed++
309  							}
310  						}
311  					}
312  					if t.handler != nil {
313  						t.handler.OnCloudProgress(completed, len(tasks))
314  					}
315  				}
316  			}
317  		case "HITL_RESPONSE":
318  			if event.Message != "" && t.handler != nil {
319  				t.handler.OnCloudAgent("", "thinking", "Lead responding to your input")
320  			}
321  
322  		case "WORKFLOW_COMPLETED":
323  			if finalResult == "" {
324  				finalResult = event.Message
325  			}
326  
327  		case "WORKFLOW_FAILED", "error", "ERROR_OCCURRED":
328  			workflowErr = fmt.Errorf("workflow failed: %s", event.Message)
329  
330  		case "workflow.cancelled":
331  			workflowErr = fmt.Errorf("workflow cancelled")
332  		}
333  	})
334  
335  	// Report accumulated cloud usage
336  	if t.handler != nil && cloudUsage.LLMCalls > 0 {
337  		t.handler.OnUsage(cloudUsage)
338  	}
339  
340  	// Handle timeout
341  	if err != nil && timeoutCtx.Err() == context.DeadlineExceeded {
342  		if finalResult != "" {
343  			return agent.ToolResult{Content: fmt.Sprintf("[cloud_delegate timed out after %s, returning partial result]\n\n%s", t.timeout, finalResult)}, nil
344  		}
345  		return agent.ToolResult{Content: fmt.Sprintf("cloud task timed out after %s with no result", t.timeout), IsError: true}, nil
346  	}
347  
348  	if err != nil {
349  		return agent.ToolResult{Content: fmt.Sprintf("stream error: %v", err), IsError: true}, nil
350  	}
351  
352  	if workflowErr != nil {
353  		return agent.ToolResult{Content: workflowErr.Error(), IsError: true}, nil
354  	}
355  
356  	if finalResult == "" {
357  		return agent.ToolResult{Content: "workflow completed but returned no response", IsError: true}, nil
358  	}
359  
360  	// API fallback: SSE events may be truncated (cloud caps at 10K runes).
361  	// Always attempt to fetch the full result from the REST API.
362  	// Only mark as CloudResult (bypass LLM summarization) when we have
363  	// a confirmed full result — either from SSE or from the API fallback.
364  	fullResultConfirmed := true
365  	taskID := resp.TaskID
366  	if taskID == "" {
367  		taskID = resp.WorkflowID
368  	}
369  	if taskID != "" && t.gw != nil {
370  		apiCtx, apiCancel := context.WithTimeout(ctx, 10*time.Second)
371  		defer apiCancel()
372  		if task, apiErr := t.gw.GetTask(apiCtx, taskID); apiErr == nil && task.Result != "" {
373  			if len(task.Result) > len(finalResult) {
374  				// API returned a longer result — SSE was truncated
375  				finalResult = task.Result
376  			}
377  			// API succeeded: we have the canonical full result
378  		} else {
379  			// API fallback failed — SSE result may be truncated
380  			fullResultConfirmed = false
381  		}
382  	}
383  
384  	// Determine terminal mode: explicit arg takes precedence,
385  	// otherwise research defaults to terminal, swarm/auto default to non-terminal.
386  	terminal := args.WorkflowType == "research"
387  	if args.Terminal != nil {
388  		terminal = *args.Terminal
389  	}
390  
391  	return agent.ToolResult{Content: finalResult, CloudResult: fullResultConfirmed && terminal}, nil
392  }
393  
394  func (t *CloudDelegateTool) RequiresApproval() bool { return true }
395  
396  func (t *CloudDelegateTool) IsReadOnlyCall(string) bool { return false }
397  
398  // accumulateUsage extracts usage metadata from LLM_OUTPUT events and adds it to the running total.
399  func (t *CloudDelegateTool) accumulateUsage(data string, usage *agent.TurnUsage) {
400  	// Shannon Cloud sends usage info in "metadata" field of LLM_OUTPUT events
401  	var meta struct {
402  		Metadata *struct {
403  			InputTokens           int     `json:"input_tokens"`
404  			OutputTokens          int     `json:"output_tokens"`
405  			TokensUsed            int     `json:"tokens_used"`
406  			CostUSD               float64 `json:"cost_usd"`
407  			CacheReadTokens       int     `json:"cache_read_tokens"`
408  			CacheCreationTokens   int     `json:"cache_creation_tokens"`
409  			CacheCreation5mTokens int     `json:"cache_creation_5m_tokens"`
410  			CacheCreation1hTokens int     `json:"cache_creation_1h_tokens"`
411  			ModelUsed             string  `json:"model_used"`
412  		} `json:"metadata"`
413  	}
414  	if err := json.Unmarshal([]byte(data), &meta); err != nil || meta.Metadata == nil {
415  		return
416  	}
417  	usage.Add(client.Usage{
418  		InputTokens:           meta.Metadata.InputTokens,
419  		OutputTokens:          meta.Metadata.OutputTokens,
420  		TotalTokens:           meta.Metadata.TokensUsed,
421  		CostUSD:               meta.Metadata.CostUSD,
422  		CacheReadTokens:       meta.Metadata.CacheReadTokens,
423  		CacheCreationTokens:   meta.Metadata.CacheCreationTokens,
424  		CacheCreation5mTokens: meta.Metadata.CacheCreation5mTokens,
425  		CacheCreation1hTokens: meta.Metadata.CacheCreation1hTokens,
426  	})
427  	if meta.Metadata.ModelUsed != "" {
428  		usage.Model = meta.Metadata.ModelUsed
429  	}
430  }
431  
// statusMsg picks the text for a status line: message when it is non-empty,
// fallback otherwise. The text is prefixed with "[agentID] " unless agentID is
// empty, "orchestrator" or "streaming".
func statusMsg(agentID, message, fallback string) string {
	text := message
	if text == "" {
		text = fallback
	}

	switch agentID {
	case "", "orchestrator", "streaming":
		// Unlabelled sources: show the bare text.
		return text
	default:
		return "[" + agentID + "] " + text
	}
}
444  
445  // Ensure CloudDelegateTool implements SafeChecker to always require approval.
446  var _ agent.SafeChecker = (*CloudDelegateTool)(nil)
447  
448  func (t *CloudDelegateTool) IsSafeArgs(_ string) bool { return false }