/ go / internal / streaming / agent.go
agent.go
 1  package streaming
 2  
import (
	"context"
	"sync"
	"time"
)
 7  
 8  type EventType string
 9  
10  const (
11  	EventThinking EventType = "thinking"
12  	EventTool     EventType = "tool"
13  	EventResult   EventType = "result"
14  	EventFinal    EventType = "final"
15  	EventError    EventType = "error"
16  )
17  
18  type AgentEvent struct {
19  	Type      EventType `json:"type"`
20  	Content   string    `json:"content"`
21  	Tool      string    `json:"tool,omitempty"`
22  	Timestamp time.Time `json:"timestamp"`
23  	Duration  float64   `json:"duration,omitempty"`
24  }
25  
26  type AgentStream struct {
27  	Events chan AgentEvent
28  	Done   chan struct{}
29  	ctx    context.Context
30  	cancel context.CancelFunc
31  }
32  
33  func NewAgentStream(ctx context.Context) *AgentStream {
34  	streamCtx, cancel := context.WithCancel(ctx)
35  	return &AgentStream{
36  		Events: make(chan AgentEvent, 100),
37  		Done:   make(chan struct{}),
38  		ctx:    streamCtx,
39  		cancel: cancel,
40  	}
41  }
42  
43  func (s *AgentStream) Send(event AgentEvent) {
44  	event.Timestamp = time.Now()
45  	select {
46  	case s.Events <- event:
47  	case <-s.ctx.Done():
48  	}
49  }
50  
51  func (s *AgentStream) Close() {
52  	s.cancel()
53  	close(s.Events)
54  	close(s.Done)
55  }
56  
57  func (s *AgentStream) Thinking(content string) {
58  	s.Send(AgentEvent{Type: EventThinking, Content: content})
59  }
60  
61  func (s *AgentStream) ToolUse(tool, content string) {
62  	s.Send(AgentEvent{Type: EventTool, Tool: tool, Content: content})
63  }
64  
65  func (s *AgentStream) Result(content string) {
66  	s.Send(AgentEvent{Type: EventResult, Content: content})
67  }
68  
69  func (s *AgentStream) Final(content string) {
70  	s.Send(AgentEvent{Type: EventFinal, Content: content})
71  }
72  
73  func (s *AgentStream) Error(err error) {
74  	s.Send(AgentEvent{Type: EventError, Content: err.Error()})
75  }