/ go / internal / agents / executor.go
executor.go
  1  package agents
  2  
  3  import (
  4  	"context"
  5  	"fmt"
  6  	"strings"
  7  	"sync"
  8  	"time"
  9  
 10  	"github.com/TransformerOS/kamaji-go/internal/tools"
 11  	"github.com/TransformerOS/kamaji-go/internal/types"
 12  )
 13  
 14  // BasicAgentExecutor implements the Agent interface
 15  type BasicAgentExecutor struct {
 16  	llm     types.LLMProvider
 17  	tools   []tools.Tool
 18  	verbose bool
 19  	memory  types.Memory
 20  	config  types.AgentConfig
 21  	status  types.AgentStatus
 22  	mutex   sync.RWMutex
 23  }
 24  
 25  // NewBasicAgentExecutor creates a new basic agent executor
 26  func NewBasicAgentExecutor(llm types.LLMProvider, agentTools []tools.Tool, verbose bool) *BasicAgentExecutor {
 27  	return &BasicAgentExecutor{
 28  		llm:     llm,
 29  		tools:   agentTools,
 30  		verbose: verbose,
 31  		config: types.AgentConfig{
 32  			MaxIterations: 10,
 33  			Timeout:       5 * time.Minute,
 34  			EnableMemory:  true,
 35  			Verbose:       verbose,
 36  		},
 37  		status: types.AgentStatus{
 38  			ID:         fmt.Sprintf("agent-%d", time.Now().Unix()),
 39  			Status:     "ready",
 40  			LastActive: time.Now(),
 41  			TasksRun:   0,
 42  			ErrorCount: 0,
 43  		},
 44  	}
 45  }
 46  
 47  // Execute runs a task using the agent
 48  func (a *BasicAgentExecutor) Execute(ctx context.Context, task string) (string, error) {
 49  	a.mutex.Lock()
 50  	defer a.mutex.Unlock()
 51  
 52  	a.status.LastActive = time.Now()
 53  	a.status.Status = "running"
 54  	a.status.TasksRun++
 55  
 56  	if a.verbose {
 57  		fmt.Printf("Agent executing task: %s\n", task)
 58  	}
 59  
 60  	// Create tool descriptions for the prompt
 61  	toolDescriptions := a.getToolDescriptions()
 62  	
 63  	// Build the agent prompt
 64  	prompt := a.buildAgentPrompt(task, toolDescriptions)
 65  	
 66  	// Execute with timeout
 67  	ctx, cancel := context.WithTimeout(ctx, a.config.Timeout)
 68  	defer cancel()
 69  
 70  	result, err := a.executeWithRetries(ctx, prompt)
 71  	
 72  	if err != nil {
 73  		a.status.ErrorCount++
 74  		a.status.Status = "error"
 75  		return "", fmt.Errorf("agent execution failed: %w", err)
 76  	}
 77  
 78  	a.status.Status = "ready"
 79  	return result, nil
 80  }
 81  
 82  // ExecuteStream runs a task with streaming output
 83  func (a *BasicAgentExecutor) ExecuteStream(ctx context.Context, task string) (<-chan types.StreamChunk, error) {
 84  	resultChan := make(chan types.StreamChunk, 10)
 85  	
 86  	go func() {
 87  		defer close(resultChan)
 88  		
 89  		result, err := a.Execute(ctx, task)
 90  		if err != nil {
 91  			resultChan <- types.StreamChunk{
 92  				Content: "",
 93  				Done:    true,
 94  				Error:   err,
 95  			}
 96  			return
 97  		}
 98  		
 99  		// Send result in chunks
100  		words := strings.Fields(result)
101  		for i, word := range words {
102  			select {
103  			case <-ctx.Done():
104  				return
105  			case resultChan <- types.StreamChunk{
106  				Content: word + " ",
107  				Done:    i == len(words)-1,
108  				Error:   nil,
109  			}:
110  			}
111  			time.Sleep(50 * time.Millisecond) // Simulate streaming
112  		}
113  	}()
114  	
115  	return resultChan, nil
116  }
117  
118  // GetTools returns the agent's tools
119  func (a *BasicAgentExecutor) GetTools() []tools.Tool {
120  	return a.tools
121  }
122  
123  // GetStatus returns the agent's current status
124  func (a *BasicAgentExecutor) GetStatus() types.AgentStatus {
125  	a.mutex.RLock()
126  	defer a.mutex.RUnlock()
127  	return a.status
128  }
129  
130  // SetMemory sets the agent's memory
131  func (a *BasicAgentExecutor) SetMemory(memory types.Memory) error {
132  	a.mutex.Lock()
133  	defer a.mutex.Unlock()
134  	a.memory = memory
135  	return nil
136  }
137  
138  // Close cleans up the agent
139  func (a *BasicAgentExecutor) Close() error {
140  	a.mutex.Lock()
141  	defer a.mutex.Unlock()
142  	a.status.Status = "closed"
143  	return nil
144  }
145  
146  // executeWithRetries executes the LLM call with retries
147  func (a *BasicAgentExecutor) executeWithRetries(ctx context.Context, prompt string) (string, error) {
148  	var lastErr error
149  	
150  	maxRetries := 3
151  	for i := 0; i < maxRetries; i++ {
152  		result, err := a.llm.Call(ctx, prompt)
153  		if err == nil {
154  			return result, nil
155  		}
156  		lastErr = err
157  		
158  		if a.verbose {
159  			fmt.Printf("Retry %d/%d failed: %v\n", i+1, maxRetries, err)
160  		}
161  		
162  		// Wait before retry
163  		select {
164  		case <-ctx.Done():
165  			return "", ctx.Err()
166  		case <-time.After(time.Duration(i+1) * time.Second):
167  		}
168  	}
169  	
170  	return "", fmt.Errorf("all retries failed, last error: %w", lastErr)
171  }
172  
173  // buildAgentPrompt creates the agent prompt with tools
174  func (a *BasicAgentExecutor) buildAgentPrompt(task string, toolDescriptions string) string {
175  	return fmt.Sprintf(`You are a helpful AI assistant with access to tools. Your task is to help the user accomplish their goal.
176  
177  Available tools:
178  %s
179  
180  Task: %s
181  
182  Please think step by step and use the appropriate tools if needed. If you need to use a tool, format your response as:
183  TOOL_CALL: tool_name(arguments)
184  
185  Otherwise, provide a direct response.`, toolDescriptions, task)
186  }
187  
188  // getToolDescriptions returns formatted tool descriptions
189  func (a *BasicAgentExecutor) getToolDescriptions() string {
190  	if len(a.tools) == 0 {
191  		return "No tools available."
192  	}
193  	
194  	var descriptions []string
195  	for _, tool := range a.tools {
196  		descriptions = append(descriptions, fmt.Sprintf("- %s: %s", tool.Name(), tool.Description()))
197  	}
198  	
199  	return strings.Join(descriptions, "\n")
200  }