/ pool / read.go
read.go
 1  package pool
 2  
 3  import (
 4  	"time"
 5  
 6  	"github.com/lightningnetwork/lnd/buffer"
 7  )
 8  
// Read is a worker pool specifically designed for sharing access to buffer.Read
// objects amongst a set of worker goroutines. This enables an application to
// limit the total number of buffer.Read objects allocated at any given time.
type Read struct {
	// workerPool is the underlying Worker pool to which Start, Stop and
	// Submit delegate.
	workerPool *Worker

	// bufferPool is the ReadBuffer pool from which each new worker state
	// takes its buffer.Read.
	bufferPool *ReadBuffer
}
16  
17  // NewRead creates a new Read pool, using an underlying ReadBuffer pool to
18  // recycle buffer.Read objects across the lifetime of the Read pool's workers.
19  func NewRead(readBufferPool *ReadBuffer, numWorkers int,
20  	workerTimeout time.Duration) *Read {
21  
22  	r := &Read{
23  		bufferPool: readBufferPool,
24  	}
25  	r.workerPool = NewWorker(&WorkerConfig{
26  		NewWorkerState: r.newWorkerState,
27  		NumWorkers:     numWorkers,
28  		WorkerTimeout:  workerTimeout,
29  	})
30  
31  	return r
32  }
33  
34  // Start safely spins up the Read pool.
35  func (r *Read) Start() error {
36  	return r.workerPool.Start()
37  }
38  
39  // Stop safely shuts down the Read pool.
40  func (r *Read) Stop() error {
41  	return r.workerPool.Stop()
42  }
43  
44  // Submit accepts a function closure that provides access to the fresh
45  // buffer.Read object. The function's execution will be allocated to one of the
46  // underlying Worker pool's goroutines.
47  func (r *Read) Submit(inner func(*buffer.Read) error) error {
48  	return r.workerPool.Submit(func(s WorkerState) error {
49  		state := s.(*readWorkerState)
50  		return inner(state.readBuf)
51  	})
52  }
53  
// readWorkerState is the per-goroutine state maintained by a Read pool's
// goroutines. It pairs a single buffer.Read with the pool it was taken from.
type readWorkerState struct {
	// bufferPool is the pool to which the readBuf is handed back by
	// Cleanup (presumably invoked when the goroutine exits -- the call
	// site lives in the Worker pool).
	bufferPool *ReadBuffer

	// readBuf is a buffer taken from the bufferPool on initialization. It
	// is the buffer passed to tasks submitted through Read.Submit, is
	// recycled by Reset, and is set to nil by Cleanup.
	readBuf *buffer.Read
}
66  
67  // newWorkerState initializes a new readWorkerState, which will be called
68  // whenever a new goroutine is allocated to begin processing read tasks.
69  func (r *Read) newWorkerState() WorkerState {
70  	return &readWorkerState{
71  		bufferPool: r.bufferPool,
72  		readBuf:    r.bufferPool.Take(),
73  	}
74  }
75  
76  // Cleanup returns the readBuf to the underlying buffer pool, and removes the
77  // goroutine's reference to the readBuf.
78  func (r *readWorkerState) Cleanup() {
79  	r.bufferPool.Return(r.readBuf)
80  	r.readBuf = nil
81  }
82  
83  // Reset recycles the readBuf to make it ready for any subsequent tasks the
84  // goroutine may process.
85  func (r *readWorkerState) Reset() {
86  	r.readBuf.Recycle()
87  }