/ components / execd / pkg / runtime / replay_buffer_test.go
replay_buffer_test.go
  1  // Copyright 2025 Alibaba Group Holding Ltd.
  2  //
  3  // Licensed under the Apache License, Version 2.0 (the "License");
  4  // you may not use this file except in compliance with the License.
  5  // You may obtain a copy of the License at
  6  //
  7  //     http://www.apache.org/licenses/LICENSE-2.0
  8  //
  9  // Unless required by applicable law or agreed to in writing, software
 10  // distributed under the License is distributed on an "AS IS" BASIS,
 11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 12  // See the License for the specific language governing permissions and
 13  // limitations under the License.
 14  
 15  package runtime
 16  
 17  import (
 18  	"bytes"
 19  	"sync"
 20  	"testing"
 21  
 22  	"github.com/stretchr/testify/require"
 23  )
 24  
 25  func TestReplayBuffer_BasicWriteRead(t *testing.T) {
 26  	rb := newReplayBuffer()
 27  	rb.write([]byte("hello"))
 28  	rb.write([]byte(" world"))
 29  
 30  	data, off := rb.ReadFrom(0)
 31  	require.Equal(t, int64(0), off)
 32  	require.Equal(t, []byte("hello world"), data)
 33  	require.Equal(t, int64(11), rb.Total())
 34  }
 35  
 36  func TestReplayBuffer_ReadFromMiddle(t *testing.T) {
 37  	rb := newReplayBuffer()
 38  	rb.write([]byte("abcde"))
 39  
 40  	data, off := rb.ReadFrom(2)
 41  	require.Equal(t, int64(2), off)
 42  	require.Equal(t, []byte("cde"), data)
 43  }
 44  
 45  func TestReplayBuffer_ReadFromCurrent(t *testing.T) {
 46  	rb := newReplayBuffer()
 47  	rb.write([]byte("abc"))
 48  
 49  	data, off := rb.ReadFrom(3)
 50  	require.Nil(t, data, "should return nil when caught up")
 51  	require.Equal(t, int64(3), off)
 52  }
 53  
 54  func TestReplayBuffer_CircularEviction(t *testing.T) {
 55  	rb := &replayBuffer{
 56  		buf:  make([]byte, 8),
 57  		size: 8,
 58  	}
 59  
 60  	// Write 6 bytes: "abcdef"
 61  	rb.write([]byte("abcdef"))
 62  	require.Equal(t, int64(6), rb.Total())
 63  
 64  	// Write 4 more bytes: now total=10, oldest=2 (evicted "ab")
 65  	rb.write([]byte("ghij"))
 66  	require.Equal(t, int64(10), rb.Total())
 67  
 68  	// offset 0 should be clamped to oldest=2
 69  	data, off := rb.ReadFrom(0)
 70  	require.Equal(t, int64(2), off)
 71  	require.Equal(t, []byte("cdefghij"), data)
 72  
 73  	// Read from offset 5 (within retained range)
 74  	data, off = rb.ReadFrom(5)
 75  	require.Equal(t, int64(5), off)
 76  	require.Equal(t, []byte("fghij"), data)
 77  }
 78  
 79  func TestReplayBuffer_LargeGap(t *testing.T) {
 80  	rb := &replayBuffer{
 81  		buf:  make([]byte, 4),
 82  		size: 4,
 83  	}
 84  	// Write "ABCDEF" — total=6, oldest=2, retained="CDEF"
 85  	rb.write([]byte("ABCDEF"))
 86  
 87  	// Requesting from 0 should clamp to oldest=2
 88  	data, off := rb.ReadFrom(0)
 89  	require.Equal(t, int64(2), off)
 90  	require.Equal(t, []byte("CDEF"), data)
 91  
 92  	// Requesting from 1 should also clamp to oldest=2
 93  	data, off = rb.ReadFrom(1)
 94  	require.Equal(t, int64(2), off)
 95  	require.Equal(t, []byte("CDEF"), data)
 96  }
 97  
 98  func TestReplayBuffer_Concurrent(t *testing.T) {
 99  	rb := newReplayBuffer()
100  	chunk := bytes.Repeat([]byte("x"), 1024)
101  
102  	var wg sync.WaitGroup
103  	for range 16 {
104  		wg.Add(1)
105  		go func() {
106  			defer wg.Done()
107  			for range 64 {
108  				rb.write(chunk)
109  			}
110  		}()
111  	}
112  	for range 4 {
113  		wg.Add(1)
114  		go func() {
115  			defer wg.Done()
116  			for range 32 {
117  				rb.ReadFrom(0)
118  				rb.Total()
119  			}
120  		}()
121  	}
122  	wg.Wait()
123  
124  	total := rb.Total()
125  	require.Equal(t, int64(16*64*1024), total)
126  }
127  
128  func TestReplayBuffer_ExactlyFull(t *testing.T) {
129  	rb := &replayBuffer{
130  		buf:  make([]byte, 4),
131  		size: 4,
132  	}
133  	rb.write([]byte("1234"))
134  	require.Equal(t, int64(4), rb.Total())
135  
136  	data, off := rb.ReadFrom(0)
137  	require.Equal(t, int64(0), off)
138  	require.Equal(t, []byte("1234"), data)
139  }
140  
141  func TestReplayBuffer_WriteWrapsCorrectly(t *testing.T) {
142  	rb := &replayBuffer{
143  		buf:  make([]byte, 4),
144  		size: 4,
145  	}
146  	// Write "ABCD" — buffer full
147  	rb.write([]byte("ABCD"))
148  	// Write "EF" — evicts "AB", retained "CDEF"
149  	rb.write([]byte("EF"))
150  
151  	data, off := rb.ReadFrom(0)
152  	require.Equal(t, int64(2), off, "offset should be clamped to oldest=2")
153  	require.Equal(t, []byte("CDEF"), data)
154  }