streaming_test.go
1 package test 2 3 import ( 4 "context" 5 "errors" 6 "fmt" 7 "testing" 8 "time" 9 10 "github.com/TransformerOS/kamaji-go/internal/streaming" 11 ) 12 13 func TestAgentStreamBasicEvents(t *testing.T) { 14 ctx := context.Background() 15 stream := streaming.NewAgentStream(ctx) 16 defer stream.Close() 17 18 // Send test events 19 go func() { 20 stream.Thinking("Processing request...") 21 stream.ToolUse("file_read", "Reading config.yaml") 22 stream.Result("Config loaded successfully") 23 stream.Final("Task completed") 24 }() 25 26 // Collect events 27 var events []streaming.AgentEvent 28 timeout := time.After(1 * time.Second) 29 30 for len(events) < 4 { 31 select { 32 case event := <-stream.Events: 33 events = append(events, event) 34 case <-timeout: 35 t.Fatal("Timeout waiting for events") 36 case <-stream.Done: 37 break 38 } 39 } 40 41 // Verify events 42 if len(events) != 4 { 43 t.Errorf("Expected 4 events, got %d", len(events)) 44 } 45 46 expectedTypes := []streaming.EventType{ 47 streaming.EventThinking, 48 streaming.EventTool, 49 streaming.EventResult, 50 streaming.EventFinal, 51 } 52 53 for i, event := range events { 54 if event.Type != expectedTypes[i] { 55 t.Errorf("Event %d: expected type %s, got %s", i, expectedTypes[i], event.Type) 56 } 57 58 if event.Timestamp.IsZero() { 59 t.Errorf("Event %d: timestamp not set", i) 60 } 61 } 62 63 // Check specific event content 64 if events[1].Tool != "file_read" { 65 t.Errorf("Expected tool 'file_read', got '%s'", events[1].Tool) 66 } 67 } 68 69 func TestAgentStreamErrorHandling(t *testing.T) { 70 ctx := context.Background() 71 stream := streaming.NewAgentStream(ctx) 72 defer stream.Close() 73 74 testError := "test error message" 75 76 go func() { 77 stream.Error(errors.New(testError)) 78 }() 79 80 select { 81 case event := <-stream.Events: 82 if event.Type != streaming.EventError { 83 t.Errorf("Expected error event, got %s", event.Type) 84 } 85 if event.Content != testError { 86 t.Errorf("Expected error content '%s', got '%s'", testError, event.Content) 87 } 88 case <-time.After(1 * time.Second): 89 t.Fatal("Timeout waiting for error event") 90 } 91 } 92 93 func TestAgentStreamCancellation(t *testing.T) { 94 ctx, cancel := context.WithCancel(context.Background()) 95 stream := streaming.NewAgentStream(ctx) 96 97 // Cancel context 98 cancel() 99 100 // Try to send event after cancellation 101 stream.Thinking("This should not be received") 102 103 // Should not receive any events 104 select { 105 case <-stream.Events: 106 t.Error("Should not receive events after cancellation") 107 case <-time.After(100 * time.Millisecond): 108 // Expected - no events received 109 } 110 111 stream.Close() 112 } 113 114 func TestAgentStreamBuffering(t *testing.T) { 115 ctx := context.Background() 116 stream := streaming.NewAgentStream(ctx) 117 defer stream.Close() 118 119 // Send many events quickly 120 eventCount := 50 121 go func() { 122 for i := 0; i < eventCount; i++ { 123 stream.Thinking(fmt.Sprintf("Event %d", i)) 124 } 125 }() 126 127 // Collect all events 128 var received []streaming.AgentEvent 129 timeout := time.After(2 * time.Second) 130 131 for len(received) < eventCount { 132 select { 133 case event := <-stream.Events: 134 received = append(received, event) 135 case <-timeout: 136 break 137 } 138 } 139 140 if len(received) != eventCount { 141 t.Errorf("Expected %d events, got %d", eventCount, len(received)) 142 } 143 }