/ go / test / streaming_test.go
streaming_test.go
  1  package test
  2  
  3  import (
  4  	"context"
  5  	"errors"
  6  	"fmt"
  7  	"testing"
  8  	"time"
  9  
 10  	"github.com/TransformerOS/kamaji-go/internal/streaming"
 11  )
 12  
 13  func TestAgentStreamBasicEvents(t *testing.T) {
 14  	ctx := context.Background()
 15  	stream := streaming.NewAgentStream(ctx)
 16  	defer stream.Close()
 17  	
 18  	// Send test events
 19  	go func() {
 20  		stream.Thinking("Processing request...")
 21  		stream.ToolUse("file_read", "Reading config.yaml")
 22  		stream.Result("Config loaded successfully")
 23  		stream.Final("Task completed")
 24  	}()
 25  	
 26  	// Collect events
 27  	var events []streaming.AgentEvent
 28  	timeout := time.After(1 * time.Second)
 29  	
 30  	for len(events) < 4 {
 31  		select {
 32  		case event := <-stream.Events:
 33  			events = append(events, event)
 34  		case <-timeout:
 35  			t.Fatal("Timeout waiting for events")
 36  		case <-stream.Done:
 37  			break
 38  		}
 39  	}
 40  	
 41  	// Verify events
 42  	if len(events) != 4 {
 43  		t.Errorf("Expected 4 events, got %d", len(events))
 44  	}
 45  	
 46  	expectedTypes := []streaming.EventType{
 47  		streaming.EventThinking,
 48  		streaming.EventTool,
 49  		streaming.EventResult,
 50  		streaming.EventFinal,
 51  	}
 52  	
 53  	for i, event := range events {
 54  		if event.Type != expectedTypes[i] {
 55  			t.Errorf("Event %d: expected type %s, got %s", i, expectedTypes[i], event.Type)
 56  		}
 57  		
 58  		if event.Timestamp.IsZero() {
 59  			t.Errorf("Event %d: timestamp not set", i)
 60  		}
 61  	}
 62  	
 63  	// Check specific event content
 64  	if events[1].Tool != "file_read" {
 65  		t.Errorf("Expected tool 'file_read', got '%s'", events[1].Tool)
 66  	}
 67  }
 68  
 69  func TestAgentStreamErrorHandling(t *testing.T) {
 70  	ctx := context.Background()
 71  	stream := streaming.NewAgentStream(ctx)
 72  	defer stream.Close()
 73  	
 74  	testError := "test error message"
 75  	
 76  	go func() {
 77  		stream.Error(errors.New(testError))
 78  	}()
 79  	
 80  	select {
 81  	case event := <-stream.Events:
 82  		if event.Type != streaming.EventError {
 83  			t.Errorf("Expected error event, got %s", event.Type)
 84  		}
 85  		if event.Content != testError {
 86  			t.Errorf("Expected error content '%s', got '%s'", testError, event.Content)
 87  		}
 88  	case <-time.After(1 * time.Second):
 89  		t.Fatal("Timeout waiting for error event")
 90  	}
 91  }
 92  
 93  func TestAgentStreamCancellation(t *testing.T) {
 94  	ctx, cancel := context.WithCancel(context.Background())
 95  	stream := streaming.NewAgentStream(ctx)
 96  	
 97  	// Cancel context
 98  	cancel()
 99  	
100  	// Try to send event after cancellation
101  	stream.Thinking("This should not be received")
102  	
103  	// Should not receive any events
104  	select {
105  	case <-stream.Events:
106  		t.Error("Should not receive events after cancellation")
107  	case <-time.After(100 * time.Millisecond):
108  		// Expected - no events received
109  	}
110  	
111  	stream.Close()
112  }
113  
114  func TestAgentStreamBuffering(t *testing.T) {
115  	ctx := context.Background()
116  	stream := streaming.NewAgentStream(ctx)
117  	defer stream.Close()
118  	
119  	// Send many events quickly
120  	eventCount := 50
121  	go func() {
122  		for i := 0; i < eventCount; i++ {
123  			stream.Thinking(fmt.Sprintf("Event %d", i))
124  		}
125  	}()
126  	
127  	// Collect all events
128  	var received []streaming.AgentEvent
129  	timeout := time.After(2 * time.Second)
130  	
131  	for len(received) < eventCount {
132  		select {
133  		case event := <-stream.Events:
134  			received = append(received, event)
135  		case <-timeout:
136  			break
137  		}
138  	}
139  	
140  	if len(received) != eventCount {
141  		t.Errorf("Expected %d events, got %d", eventCount, len(received))
142  	}
143  }