replay_buffer.go
1 // Copyright 2025 Alibaba Group Holding Ltd. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 package runtime 16 17 import "sync" 18 19 const replayBufferSize = 1 << 20 // 1 MiB 20 21 // replayBuffer is a fixed-capacity circular byte buffer with a monotonic write counter. 22 // 23 // Invariant: head == total % size (where size is the buffer capacity). 24 // This means the byte at absolute offset o (o >= oldest) is stored at buf[o % size]. 25 type replayBuffer struct { 26 mu sync.Mutex 27 buf []byte 28 size int // == replayBufferSize (or smaller in tests) 29 head int // next write position; always == total % size 30 total int64 // monotonic byte counter (total bytes ever written) 31 } 32 33 func newReplayBuffer() *replayBuffer { 34 return &replayBuffer{ 35 buf: make([]byte, replayBufferSize), 36 size: replayBufferSize, 37 } 38 } 39 40 // write appends p to the buffer, evicting the oldest bytes when the buffer is full. 41 // The invariant head == total % size is preserved on every call. 42 func (r *replayBuffer) write(p []byte) { 43 if len(p) == 0 { 44 return 45 } 46 r.mu.Lock() 47 defer r.mu.Unlock() 48 49 // When p is larger than the whole buffer we only keep the last size bytes, 50 // but we still advance total by the full len(p) to maintain the invariant. 51 if len(p) >= r.size { 52 skip := len(p) - r.size 53 r.total += int64(skip) 54 r.head = (r.head + skip) % r.size 55 p = p[skip:] 56 // len(p) == r.size now 57 } 58 59 // len(p) <= r.size — split into at most two contiguous copies. 60 n := copy(r.buf[r.head:], p) 61 r.head = (r.head + n) % r.size 62 r.total += int64(n) 63 64 if n < len(p) { 65 rest := p[n:] 66 copy(r.buf[r.head:], rest) 67 r.head = (r.head + len(rest)) % r.size 68 r.total += int64(len(rest)) 69 } 70 } 71 72 // Total returns the total number of bytes ever written to the buffer. 73 func (r *replayBuffer) Total() int64 { 74 r.mu.Lock() 75 defer r.mu.Unlock() 76 return r.total 77 } 78 79 // ReadFrom returns a snapshot of all bytes starting from the given absolute byte offset. 80 // 81 // Returns (data, actualOffset) where actualOffset is the offset of the first returned byte: 82 // - If offset >= total, returns (nil, total) — caller is already caught up. 83 // - If offset < oldest retained byte, clamps to oldest (bytes were evicted). 84 // - Otherwise returns bytes [offset, total). 85 func (r *replayBuffer) ReadFrom(offset int64) ([]byte, int64) { 86 r.mu.Lock() 87 defer r.mu.Unlock() 88 89 if offset >= r.total { 90 return nil, r.total 91 } 92 93 // Oldest retained absolute offset. 94 oldest := r.total - int64(r.size) 95 if oldest < 0 { 96 oldest = 0 97 } 98 if offset < oldest { 99 offset = oldest 100 } 101 102 count := int(r.total - offset) 103 if count <= 0 { 104 return nil, r.total 105 } 106 107 // Thanks to the invariant head == total % size, the byte at absolute offset o 108 // is stored at buf[o % size]. This holds whether or not the buffer has wrapped. 109 start := int(offset % int64(r.size)) 110 end := start + count 111 112 result := make([]byte, count) 113 if end <= r.size { 114 copy(result, r.buf[start:end]) 115 } else { 116 n := copy(result, r.buf[start:]) 117 copy(result[n:], r.buf[:count-n]) 118 } 119 return result, offset 120 }