replay_buffer_test.go
1 // Copyright 2025 Alibaba Group Holding Ltd. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 package runtime 16 17 import ( 18 "bytes" 19 "sync" 20 "testing" 21 22 "github.com/stretchr/testify/require" 23 ) 24 25 func TestReplayBuffer_BasicWriteRead(t *testing.T) { 26 rb := newReplayBuffer() 27 rb.write([]byte("hello")) 28 rb.write([]byte(" world")) 29 30 data, off := rb.ReadFrom(0) 31 require.Equal(t, int64(0), off) 32 require.Equal(t, []byte("hello world"), data) 33 require.Equal(t, int64(11), rb.Total()) 34 } 35 36 func TestReplayBuffer_ReadFromMiddle(t *testing.T) { 37 rb := newReplayBuffer() 38 rb.write([]byte("abcde")) 39 40 data, off := rb.ReadFrom(2) 41 require.Equal(t, int64(2), off) 42 require.Equal(t, []byte("cde"), data) 43 } 44 45 func TestReplayBuffer_ReadFromCurrent(t *testing.T) { 46 rb := newReplayBuffer() 47 rb.write([]byte("abc")) 48 49 data, off := rb.ReadFrom(3) 50 require.Nil(t, data, "should return nil when caught up") 51 require.Equal(t, int64(3), off) 52 } 53 54 func TestReplayBuffer_CircularEviction(t *testing.T) { 55 rb := &replayBuffer{ 56 buf: make([]byte, 8), 57 size: 8, 58 } 59 60 // Write 6 bytes: "abcdef" 61 rb.write([]byte("abcdef")) 62 require.Equal(t, int64(6), rb.Total()) 63 64 // Write 4 more bytes: now total=10, oldest=2 (evicted "ab") 65 rb.write([]byte("ghij")) 66 require.Equal(t, int64(10), rb.Total()) 67 68 // offset 0 should be clamped to oldest=2 69 data, off := rb.ReadFrom(0) 70 require.Equal(t, int64(2), off) 71 require.Equal(t, []byte("cdefghij"), data) 72 73 // Read from offset 5 (within retained range) 74 data, off = rb.ReadFrom(5) 75 require.Equal(t, int64(5), off) 76 require.Equal(t, []byte("fghij"), data) 77 } 78 79 func TestReplayBuffer_LargeGap(t *testing.T) { 80 rb := &replayBuffer{ 81 buf: make([]byte, 4), 82 size: 4, 83 } 84 // Write "ABCDEF" — total=6, oldest=2, retained="CDEF" 85 rb.write([]byte("ABCDEF")) 86 87 // Requesting from 0 should clamp to oldest=2 88 data, off := rb.ReadFrom(0) 89 require.Equal(t, int64(2), off) 90 require.Equal(t, []byte("CDEF"), data) 91 92 // Requesting from 1 should also clamp to oldest=2 93 data, off = rb.ReadFrom(1) 94 require.Equal(t, int64(2), off) 95 require.Equal(t, []byte("CDEF"), data) 96 } 97 98 func TestReplayBuffer_Concurrent(t *testing.T) { 99 rb := newReplayBuffer() 100 chunk := bytes.Repeat([]byte("x"), 1024) 101 102 var wg sync.WaitGroup 103 for range 16 { 104 wg.Add(1) 105 go func() { 106 defer wg.Done() 107 for range 64 { 108 rb.write(chunk) 109 } 110 }() 111 } 112 for range 4 { 113 wg.Add(1) 114 go func() { 115 defer wg.Done() 116 for range 32 { 117 rb.ReadFrom(0) 118 rb.Total() 119 } 120 }() 121 } 122 wg.Wait() 123 124 total := rb.Total() 125 require.Equal(t, int64(16*64*1024), total) 126 } 127 128 func TestReplayBuffer_ExactlyFull(t *testing.T) { 129 rb := &replayBuffer{ 130 buf: make([]byte, 4), 131 size: 4, 132 } 133 rb.write([]byte("1234")) 134 require.Equal(t, int64(4), rb.Total()) 135 136 data, off := rb.ReadFrom(0) 137 require.Equal(t, int64(0), off) 138 require.Equal(t, []byte("1234"), data) 139 } 140 141 func TestReplayBuffer_WriteWrapsCorrectly(t *testing.T) { 142 rb := &replayBuffer{ 143 buf: make([]byte, 4), 144 size: 4, 145 } 146 // Write "ABCD" — buffer full 147 rb.write([]byte("ABCD")) 148 // Write "EF" — evicts "AB", retained "CDEF" 149 rb.write([]byte("EF")) 150 151 data, off := rb.ReadFrom(0) 152 require.Equal(t, int64(2), off, "offset should be clamped to oldest=2") 153 require.Equal(t, []byte("CDEF"), data) 154 }