/ components / egress / pkg / events / events_test.go
events_test.go
  1  // Copyright 2026 Alibaba Group Holding Ltd.
  2  //
  3  // Licensed under the Apache License, Version 2.0 (the "License");
  4  // you may not use this file except in compliance with the License.
  5  // You may obtain a copy of the License at
  6  //
  7  //     http://www.apache.org/licenses/LICENSE-2.0
  8  //
  9  // Unless required by applicable law or agreed to in writing, software
 10  // distributed under the License is distributed on an "AS IS" BASIS,
 11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 12  // See the License for the specific language governing permissions and
 13  // limitations under the License.
 14  
 15  package events
 16  
 17  import (
 18  	"context"
 19  	"encoding/json"
 20  	"io"
 21  	"net/http"
 22  	"net/http/httptest"
 23  	"testing"
 24  	"time"
 25  
 26  	"github.com/alibaba/opensandbox/egress/pkg/constants"
 27  	"github.com/stretchr/testify/require"
 28  )
 29  
 30  type captureSubscriber struct {
 31  	recv chan BlockedEvent
 32  }
 33  
 34  func (c *captureSubscriber) HandleBlocked(_ context.Context, ev BlockedEvent) {
 35  	c.recv <- ev
 36  }
 37  
 38  type blockingSubscriber struct {
 39  	block chan struct{}
 40  }
 41  
 42  func (b *blockingSubscriber) HandleBlocked(_ context.Context, ev BlockedEvent) {
 43  	// Block until the channel is closed to simulate a slow consumer and trigger backpressure.
 44  	<-b.block
 45  	_ = ev
 46  }
 47  
 48  func TestBroadcasterFanout(t *testing.T) {
 49  	ctx, cancel := context.WithCancel(context.Background())
 50  	defer cancel()
 51  
 52  	b := NewBroadcaster(ctx, BroadcasterConfig{QueueSize: 2})
 53  
 54  	sub1 := &captureSubscriber{recv: make(chan BlockedEvent, 1)}
 55  	sub2 := &captureSubscriber{recv: make(chan BlockedEvent, 1)}
 56  	b.AddSubscriber(sub1)
 57  	b.AddSubscriber(sub2)
 58  
 59  	ev := BlockedEvent{Hostname: "example.com.", Timestamp: time.Now()}
 60  	b.Publish(ev)
 61  
 62  	select {
 63  	case got := <-sub1.recv:
 64  		require.Equal(t, ev.Hostname, got.Hostname, "sub1 expected hostname")
 65  	case <-time.After(2 * time.Second):
 66  		require.FailNow(t, "sub1 did not receive event")
 67  	}
 68  
 69  	select {
 70  	case got := <-sub2.recv:
 71  		require.Equal(t, ev.Hostname, got.Hostname, "sub2 expected hostname")
 72  	case <-time.After(2 * time.Second):
 73  		require.FailNow(t, "sub2 did not receive event")
 74  	}
 75  
 76  	b.Close()
 77  }
 78  
 79  func TestBroadcasterDropsWhenSubscriberBackedUp(t *testing.T) {
 80  	ctx, cancel := context.WithCancel(context.Background())
 81  	defer cancel()
 82  
 83  	// Small queue; blocking subscriber will hold the first event.
 84  	b := NewBroadcaster(ctx, BroadcasterConfig{QueueSize: 1})
 85  	block := make(chan struct{})
 86  	sub := &blockingSubscriber{block: block}
 87  	b.AddSubscriber(sub)
 88  
 89  	ev1 := BlockedEvent{Hostname: "first.example", Timestamp: time.Now()}
 90  	ev2 := BlockedEvent{Hostname: "second.example", Timestamp: time.Now()}
 91  
 92  	b.Publish(ev1)
 93  	// This publish should drop because subscriber is blocked and queue size is 1.
 94  	b.Publish(ev2)
 95  
 96  	// Allow subscriber to drain and exit.
 97  	close(block)
 98  
 99  	b.Close()
100  }
101  
102  func TestWebhookSubscriberSendsPayload(t *testing.T) {
103  	var (
104  		gotMethod  string
105  		gotPayload webhookPayload
106  	)
107  	const (
108  		sandboxIDInitial = "sandbox-test"
109  		sandboxIDLater   = "sandbox-updated"
110  	)
111  	t.Setenv(constants.EnvSandboxID, sandboxIDInitial)
112  
113  	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
114  		gotMethod = r.Method
115  		body, _ := io.ReadAll(r.Body)
116  		_ = r.Body.Close()
117  		_ = json.Unmarshal(body, &gotPayload)
118  		w.WriteHeader(http.StatusOK)
119  	}))
120  	defer server.Close()
121  
122  	sub := NewWebhookSubscriber(server.URL)
123  	require.NotNil(t, sub, "webhook subscriber should not be nil")
124  	t.Setenv(constants.EnvSandboxID, sandboxIDLater)
125  
126  	ts := time.Date(2026, 1, 2, 3, 4, 5, 0, time.UTC)
127  	ev := BlockedEvent{Hostname: "Example.com.", Timestamp: ts}
128  	sub.HandleBlocked(context.Background(), ev)
129  
130  	require.Equal(t, http.MethodPost, gotMethod, "expected POST")
131  	require.Equal(t, ev.Hostname, gotPayload.Hostname, "expected hostname")
132  	require.Equal(t, webhookSource, gotPayload.Source, "expected source")
133  	require.Equal(t, sandboxIDInitial, gotPayload.SandboxID, "expected sandboxId captured at init")
134  	require.NotEmpty(t, gotPayload.Timestamp, "expected timestamp to be set")
135  }