/ queue / gc_queue.go
gc_queue.go
  1  package queue
  2  
  3  import (
  4  	"container/list"
  5  	"time"
  6  
  7  	"github.com/lightningnetwork/lnd/ticker"
  8  )
  9  
 10  // GCQueue is garbage collecting queue, which dynamically grows and contracts
 11  // based on load. If the queue has items which have been returned, the queue
 12  // will check every gcInterval amount of time to see if any elements are
 13  // eligible to be released back to the runtime. Elements that have been in the
 14  // queue for a duration of least expiryInterval will be released upon the next
 15  // iteration of the garbage collection, thus the maximum amount of time an
 16  // element remain in the queue is expiryInterval+gcInterval. The gc ticker will
 17  // be disabled after all items in the queue have been taken or released to
 18  // ensure that the GCQueue becomes quiescent, and imposes minimal overhead in
 19  // the steady state.
 20  type GCQueue struct {
 21  	// takeBuffer coordinates the delivery of items taken from the queue
 22  	// such that they are delivered to requesters.
 23  	takeBuffer chan interface{}
 24  
 25  	// returnBuffer coordinates the return of items back into the queue,
 26  	// where they will be kept until retaken or released.
 27  	returnBuffer chan interface{}
 28  
 29  	// newItem is a constructor, used to generate new elements if none are
 30  	// otherwise available for reuse.
 31  	newItem func() interface{}
 32  
 33  	// expiryInterval is the minimum amount of time an element will remain
 34  	// in the queue before being released.
 35  	expiryInterval time.Duration
 36  
 37  	// recycleTicker is a resumable ticker used to trigger a sweep to
 38  	// release elements that have been in the queue longer than
 39  	// expiryInterval.
 40  	recycleTicker ticker.Ticker
 41  
 42  	// freeList maintains a list of gcQueueEntries, sorted in order of
 43  	// increasing time of arrival.
 44  	freeList *list.List
 45  
 46  	quit chan struct{}
 47  }
 48  
 49  // NewGCQueue creates a new garbage collecting queue, which dynamically grows
 50  // and contracts based on load. If the queue has items which have been returned,
 51  // the queue will check every gcInterval amount of time to see if any elements
 52  // are eligible to be released back to the runtime. Elements that have been in
 53  // the queue for a duration of least expiryInterval will be released upon the
 54  // next iteration of the garbage collection, thus the maximum amount of time an
 55  // element remain in the queue is expiryInterval+gcInterval. The gc ticker will
 56  // be disabled after all items in the queue have been taken or released to
 57  // ensure that the GCQueue becomes quiescent, and imposes minimal overhead in
 58  // the steady state. The returnQueueSize parameter is used to size the maximal
 59  // number of items that can be returned without being dropped during large
 60  // bursts in attempts to return items to the GCQUeue.
 61  func NewGCQueue(newItem func() interface{}, returnQueueSize int,
 62  	gcInterval, expiryInterval time.Duration) *GCQueue {
 63  
 64  	q := &GCQueue{
 65  		takeBuffer:     make(chan interface{}),
 66  		returnBuffer:   make(chan interface{}, returnQueueSize),
 67  		expiryInterval: expiryInterval,
 68  		freeList:       list.New(),
 69  		recycleTicker:  ticker.New(gcInterval),
 70  		newItem:        newItem,
 71  		quit:           make(chan struct{}),
 72  	}
 73  
 74  	go q.queueManager()
 75  
 76  	return q
 77  }
 78  
 79  // Take returns either a recycled element from the queue, or creates a new item
 80  // if none are available.
 81  func (q *GCQueue) Take() interface{} {
 82  	select {
 83  	case item := <-q.takeBuffer:
 84  		return item
 85  	case <-time.After(time.Millisecond):
 86  		return q.newItem()
 87  	}
 88  }
 89  
 90  // Return adds the returned item to freelist if the queue's returnBuffer has
 91  // available capacity. Under load, items may be dropped to ensure this method
 92  // does not block.
 93  func (q *GCQueue) Return(item interface{}) {
 94  	select {
 95  	case q.returnBuffer <- item:
 96  	default:
 97  	}
 98  }
 99  
// gcQueueEntry pairs a queued interface{} value with the time at which it was
// added to the queue. The timestamp is what decides when the entry has become
// stale and may be released, provided it has not been taken in the meantime.
type gcQueueEntry struct {
	// item is the queued value.
	item interface{}

	// time is the moment the item was added to the queue.
	time time.Time
}
107  
108  // queueManager maintains the free list of elements by popping the head of the
109  // queue when items are needed, and appending them to the end of the queue when
110  // items are returned. The queueManager will periodically attempt to release any
111  // items that have been in the queue longer than the expiry interval.
112  //
113  // NOTE: This method SHOULD be run as a goroutine.
114  func (q *GCQueue) queueManager() {
115  	for {
116  		// If the pool is empty, initialize a buffer pool to serve a
117  		// client that takes a buffer immediately. If this happens, this
118  		// is either:
119  		//   1) the first iteration of the loop,
120  		//   2) after all entries were garbage collected, or
121  		//   3) the freelist was emptied after the last entry was taken.
122  		//
123  		// In all of these cases, it is safe to pause the recycle ticker
124  		// since it will be resumed as soon an entry is returned to the
125  		// freelist.
126  		if q.freeList.Len() == 0 {
127  			q.freeList.PushBack(gcQueueEntry{
128  				item: q.newItem(),
129  				time: time.Now(),
130  			})
131  
132  			q.recycleTicker.Pause()
133  		}
134  
135  		next := q.freeList.Front()
136  
137  		select {
138  
139  		// If a client requests a new write buffer, deliver the buffer
140  		// at the head of the freelist to them.
141  		case q.takeBuffer <- next.Value.(gcQueueEntry).item:
142  			q.freeList.Remove(next)
143  
144  		// If a client is returning a write buffer, add it to the free
145  		// list and resume the recycle ticker so that it can be cleared
146  		// if the entries are not quickly reused.
147  		case item := <-q.returnBuffer:
148  			// Add the returned buffer to the freelist, recording
149  			// the current time so we can determine when the entry
150  			// expires.
151  			q.freeList.PushBack(gcQueueEntry{
152  				item: item,
153  				time: time.Now(),
154  			})
155  
156  			// Adding the buffer implies that we now have a non-zero
157  			// number of elements in the free list. Resume the
158  			// recycle ticker to cleanup any entries that go unused.
159  			q.recycleTicker.Resume()
160  
161  		// If the recycle ticker fires, we will aggressively release any
162  		// write buffers in the freelist for which the expiryInterval
163  		// has elapsed since their insertion. If after doing so, no
164  		// elements remain, we will pause the recycle ticker.
165  		case <-q.recycleTicker.Ticks():
166  			// Since the insert time of all entries will be
167  			// monotonically increasing, iterate over elements and
168  			// remove all entries that have expired.
169  			var next *list.Element
170  			for e := q.freeList.Front(); e != nil; e = next {
171  				// Cache the next element, since it will become
172  				// unreachable from the current element if it is
173  				// removed.
174  				next = e.Next()
175  				entry := e.Value.(gcQueueEntry)
176  
177  				// Use now - insertTime <= expiryInterval to
178  				// determine if this entry has not expired.
179  				if time.Since(entry.time) <= q.expiryInterval {
180  					// If this entry hasn't expired, then
181  					// all entries that follow will still be
182  					// valid.
183  					break
184  				}
185  
186  				// Otherwise, remove the expired entry from the
187  				// linked-list.
188  				q.freeList.Remove(e)
189  				entry.item = nil
190  				e.Value = nil
191  			}
192  		}
193  	}
194  }