/ components / execd / pkg / runtime / replay_buffer.go
replay_buffer.go
  1  // Copyright 2025 Alibaba Group Holding Ltd.
  2  //
  3  // Licensed under the Apache License, Version 2.0 (the "License");
  4  // you may not use this file except in compliance with the License.
  5  // You may obtain a copy of the License at
  6  //
  7  //     http://www.apache.org/licenses/LICENSE-2.0
  8  //
  9  // Unless required by applicable law or agreed to in writing, software
 10  // distributed under the License is distributed on an "AS IS" BASIS,
 11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 12  // See the License for the specific language governing permissions and
 13  // limitations under the License.
 14  
 15  package runtime
 16  
 17  import "sync"
 18  
 19  const replayBufferSize = 1 << 20 // 1 MiB
 20  
 21  // replayBuffer is a fixed-capacity circular byte buffer with a monotonic write counter.
 22  //
 23  // Invariant: head == total % size (where size is the buffer capacity).
 24  // This means the byte at absolute offset o (o >= oldest) is stored at buf[o % size].
 25  type replayBuffer struct {
 26  	mu    sync.Mutex
 27  	buf   []byte
 28  	size  int   // == replayBufferSize (or smaller in tests)
 29  	head  int   // next write position; always == total % size
 30  	total int64 // monotonic byte counter (total bytes ever written)
 31  }
 32  
 33  func newReplayBuffer() *replayBuffer {
 34  	return &replayBuffer{
 35  		buf:  make([]byte, replayBufferSize),
 36  		size: replayBufferSize,
 37  	}
 38  }
 39  
 40  // write appends p to the buffer, evicting the oldest bytes when the buffer is full.
 41  // The invariant head == total % size is preserved on every call.
 42  func (r *replayBuffer) write(p []byte) {
 43  	if len(p) == 0 {
 44  		return
 45  	}
 46  	r.mu.Lock()
 47  	defer r.mu.Unlock()
 48  
 49  	// When p is larger than the whole buffer we only keep the last size bytes,
 50  	// but we still advance total by the full len(p) to maintain the invariant.
 51  	if len(p) >= r.size {
 52  		skip := len(p) - r.size
 53  		r.total += int64(skip)
 54  		r.head = (r.head + skip) % r.size
 55  		p = p[skip:]
 56  		// len(p) == r.size now
 57  	}
 58  
 59  	// len(p) <= r.size — split into at most two contiguous copies.
 60  	n := copy(r.buf[r.head:], p)
 61  	r.head = (r.head + n) % r.size
 62  	r.total += int64(n)
 63  
 64  	if n < len(p) {
 65  		rest := p[n:]
 66  		copy(r.buf[r.head:], rest)
 67  		r.head = (r.head + len(rest)) % r.size
 68  		r.total += int64(len(rest))
 69  	}
 70  }
 71  
 72  // Total returns the total number of bytes ever written to the buffer.
 73  func (r *replayBuffer) Total() int64 {
 74  	r.mu.Lock()
 75  	defer r.mu.Unlock()
 76  	return r.total
 77  }
 78  
 79  // ReadFrom returns a snapshot of all bytes starting from the given absolute byte offset.
 80  //
 81  // Returns (data, actualOffset) where actualOffset is the offset of the first returned byte:
 82  //   - If offset >= total, returns (nil, total) — caller is already caught up.
 83  //   - If offset < oldest retained byte, clamps to oldest (bytes were evicted).
 84  //   - Otherwise returns bytes [offset, total).
 85  func (r *replayBuffer) ReadFrom(offset int64) ([]byte, int64) {
 86  	r.mu.Lock()
 87  	defer r.mu.Unlock()
 88  
 89  	if offset >= r.total {
 90  		return nil, r.total
 91  	}
 92  
 93  	// Oldest retained absolute offset.
 94  	oldest := r.total - int64(r.size)
 95  	if oldest < 0 {
 96  		oldest = 0
 97  	}
 98  	if offset < oldest {
 99  		offset = oldest
100  	}
101  
102  	count := int(r.total - offset)
103  	if count <= 0 {
104  		return nil, r.total
105  	}
106  
107  	// Thanks to the invariant head == total % size, the byte at absolute offset o
108  	// is stored at buf[o % size].  This holds whether or not the buffer has wrapped.
109  	start := int(offset % int64(r.size))
110  	end := start + count
111  
112  	result := make([]byte, count)
113  	if end <= r.size {
114  		copy(result, r.buf[start:end])
115  	} else {
116  		n := copy(result, r.buf[start:])
117  		copy(result[n:], r.buf[:count-n])
118  	}
119  	return result, offset
120  }