events_test.go
1 // Copyright 2026 Alibaba Group Holding Ltd. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 package events 16 17 import ( 18 "context" 19 "encoding/json" 20 "io" 21 "net/http" 22 "net/http/httptest" 23 "testing" 24 "time" 25 26 "github.com/alibaba/opensandbox/egress/pkg/constants" 27 "github.com/stretchr/testify/require" 28 ) 29 30 type captureSubscriber struct { 31 recv chan BlockedEvent 32 } 33 34 func (c *captureSubscriber) HandleBlocked(_ context.Context, ev BlockedEvent) { 35 c.recv <- ev 36 } 37 38 type blockingSubscriber struct { 39 block chan struct{} 40 } 41 42 func (b *blockingSubscriber) HandleBlocked(_ context.Context, ev BlockedEvent) { 43 // Block until the channel is closed to simulate a slow consumer and trigger backpressure. 44 <-b.block 45 _ = ev 46 } 47 48 func TestBroadcasterFanout(t *testing.T) { 49 ctx, cancel := context.WithCancel(context.Background()) 50 defer cancel() 51 52 b := NewBroadcaster(ctx, BroadcasterConfig{QueueSize: 2}) 53 54 sub1 := &captureSubscriber{recv: make(chan BlockedEvent, 1)} 55 sub2 := &captureSubscriber{recv: make(chan BlockedEvent, 1)} 56 b.AddSubscriber(sub1) 57 b.AddSubscriber(sub2) 58 59 ev := BlockedEvent{Hostname: "example.com.", Timestamp: time.Now()} 60 b.Publish(ev) 61 62 select { 63 case got := <-sub1.recv: 64 require.Equal(t, ev.Hostname, got.Hostname, "sub1 expected hostname") 65 case <-time.After(2 * time.Second): 66 require.FailNow(t, "sub1 did not receive event") 67 } 68 69 select { 70 case got := <-sub2.recv: 71 require.Equal(t, ev.Hostname, got.Hostname, "sub2 expected hostname") 72 case <-time.After(2 * time.Second): 73 require.FailNow(t, "sub2 did not receive event") 74 } 75 76 b.Close() 77 } 78 79 func TestBroadcasterDropsWhenSubscriberBackedUp(t *testing.T) { 80 ctx, cancel := context.WithCancel(context.Background()) 81 defer cancel() 82 83 // Small queue; blocking subscriber will hold the first event. 84 b := NewBroadcaster(ctx, BroadcasterConfig{QueueSize: 1}) 85 block := make(chan struct{}) 86 sub := &blockingSubscriber{block: block} 87 b.AddSubscriber(sub) 88 89 ev1 := BlockedEvent{Hostname: "first.example", Timestamp: time.Now()} 90 ev2 := BlockedEvent{Hostname: "second.example", Timestamp: time.Now()} 91 92 b.Publish(ev1) 93 // This publish should drop because subscriber is blocked and queue size is 1. 94 b.Publish(ev2) 95 96 // Allow subscriber to drain and exit. 97 close(block) 98 99 b.Close() 100 } 101 102 func TestWebhookSubscriberSendsPayload(t *testing.T) { 103 var ( 104 gotMethod string 105 gotPayload webhookPayload 106 ) 107 const ( 108 sandboxIDInitial = "sandbox-test" 109 sandboxIDLater = "sandbox-updated" 110 ) 111 t.Setenv(constants.EnvSandboxID, sandboxIDInitial) 112 113 server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 114 gotMethod = r.Method 115 body, _ := io.ReadAll(r.Body) 116 _ = r.Body.Close() 117 _ = json.Unmarshal(body, &gotPayload) 118 w.WriteHeader(http.StatusOK) 119 })) 120 defer server.Close() 121 122 sub := NewWebhookSubscriber(server.URL) 123 require.NotNil(t, sub, "webhook subscriber should not be nil") 124 t.Setenv(constants.EnvSandboxID, sandboxIDLater) 125 126 ts := time.Date(2026, 1, 2, 3, 4, 5, 0, time.UTC) 127 ev := BlockedEvent{Hostname: "Example.com.", Timestamp: ts} 128 sub.HandleBlocked(context.Background(), ev) 129 130 require.Equal(t, http.MethodPost, gotMethod, "expected POST") 131 require.Equal(t, ev.Hostname, gotPayload.Hostname, "expected hostname") 132 require.Equal(t, webhookSource, gotPayload.Source, "expected source") 133 require.Equal(t, sandboxIDInitial, gotPayload.SandboxID, "expected sandboxId captured at init") 134 require.NotEmpty(t, gotPayload.Timestamp, "expected timestamp to be set") 135 }