/ queue / queue.go
queue.go
  1  package queue
  2  
  3  import (
  4  	"container/list"
  5  	"sync"
  6  )
  7  
  8  // ConcurrentQueue is a concurrent-safe FIFO queue with unbounded capacity.
  9  // Clients interact with the queue by pushing items into the in channel and
 10  // popping items from the out channel. There is a goroutine that manages moving
 11  // items from the in channel to the out channel in the correct order that must
 12  // be started by calling Start().
 13  type ConcurrentQueue struct {
 14  	started sync.Once
 15  	stopped sync.Once
 16  
 17  	chanIn   chan interface{}
 18  	chanOut  chan interface{}
 19  	overflow *list.List
 20  
 21  	wg   sync.WaitGroup
 22  	quit chan struct{}
 23  }
 24  
 25  // NewConcurrentQueue constructs a ConcurrentQueue. The bufferSize parameter is
 26  // the capacity of the output channel. When the size of the queue is below this
 27  // threshold, pushes do not incur the overhead of the less efficient overflow
 28  // structure.
 29  func NewConcurrentQueue(bufferSize int) *ConcurrentQueue {
 30  	return &ConcurrentQueue{
 31  		chanIn:   make(chan interface{}),
 32  		chanOut:  make(chan interface{}, bufferSize),
 33  		overflow: list.New(),
 34  		quit:     make(chan struct{}),
 35  	}
 36  }
 37  
 38  // ChanIn returns a channel that can be used to push new items into the queue.
 39  func (cq *ConcurrentQueue) ChanIn() chan<- interface{} {
 40  	return cq.chanIn
 41  }
 42  
 43  // ChanOut returns a channel that can be used to pop items from the queue.
 44  func (cq *ConcurrentQueue) ChanOut() <-chan interface{} {
 45  	return cq.chanOut
 46  }
 47  
 48  // Start begins a goroutine that manages moving items from the in channel to the
 49  // out channel. The queue tries to move items directly to the out channel
 50  // minimize overhead, but if the out channel is full it pushes items to an
 51  // overflow queue. This must be called before using the queue.
 52  func (cq *ConcurrentQueue) Start() {
 53  	cq.started.Do(cq.start)
 54  }
 55  
 56  func (cq *ConcurrentQueue) start() {
 57  	cq.wg.Add(1)
 58  	go func() {
 59  		defer cq.wg.Done()
 60  
 61  	readLoop:
 62  		for {
 63  			nextElement := cq.overflow.Front()
 64  			if nextElement == nil {
 65  				// Overflow queue is empty so incoming items can be pushed
 66  				// directly to the output channel. If output channel is full
 67  				// though, push to overflow.
 68  				select {
 69  				case item, ok := <-cq.chanIn:
 70  					if !ok {
 71  						break readLoop
 72  					}
 73  					select {
 74  					case cq.chanOut <- item:
 75  						// Optimistically push directly to chanOut
 76  					default:
 77  						cq.overflow.PushBack(item)
 78  					}
 79  				case <-cq.quit:
 80  					return
 81  				}
 82  			} else {
 83  				// Overflow queue is not empty, so any new items get pushed to
 84  				// the back to preserve order.
 85  				select {
 86  				case item, ok := <-cq.chanIn:
 87  					if !ok {
 88  						break readLoop
 89  					}
 90  					cq.overflow.PushBack(item)
 91  				case cq.chanOut <- nextElement.Value:
 92  					cq.overflow.Remove(nextElement)
 93  				case <-cq.quit:
 94  					return
 95  				}
 96  			}
 97  		}
 98  
 99  		// Incoming channel has been closed. Empty overflow queue into
100  		// the outgoing channel.
101  		nextElement := cq.overflow.Front()
102  		for nextElement != nil {
103  			select {
104  			case cq.chanOut <- nextElement.Value:
105  				cq.overflow.Remove(nextElement)
106  			case <-cq.quit:
107  				return
108  			}
109  			nextElement = cq.overflow.Front()
110  		}
111  
112  		// Close outgoing channel.
113  		close(cq.chanOut)
114  	}()
115  }
116  
117  // Stop ends the goroutine that moves items from the in channel to the out
118  // channel. This does not clear the queue state, so the queue can be restarted
119  // without dropping items.
120  func (cq *ConcurrentQueue) Stop() {
121  	cq.stopped.Do(func() {
122  		close(cq.quit)
123  		cq.wg.Wait()
124  	})
125  }