// src/external/boost/lockfree/spsc_queue.hpp
  1  //  lock-free single-producer/single-consumer ringbuffer
  2  //  this algorithm is implemented in various projects (linux kernel)
  3  //
  4  //  Copyright (C) 2009-2013 Tim Blechmann
  5  //
  6  //  Distributed under the Boost Software License, Version 1.0. (See
  7  //  accompanying file LICENSE_1_0.txt or copy at
  8  //  http://www.boost.org/LICENSE_1_0.txt)
  9  
 10  #ifndef BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED
 11  #define BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED
 12  
 13  #include <algorithm>
 14  #include <memory>
 15  
 16  #include <boost/aligned_storage.hpp>
 17  #include <boost/assert.hpp>
 18  #ifdef BOOST_NO_CXX11_DELETED_FUNCTIONS
 19  #include <boost/noncopyable.hpp>
 20  #endif
 21  #include <boost/static_assert.hpp>
 22  #include <boost/utility.hpp>
 23  
 24  #include <boost/type_traits/has_trivial_destructor.hpp>
 25  
 26  #include <boost/lockfree/detail/atomic.hpp>
 27  #include <boost/lockfree/detail/branch_hints.hpp>
 28  #include <boost/lockfree/detail/parameter.hpp>
 29  #include <boost/lockfree/detail/prefix.hpp>
 30  
 31  
 32  namespace boost    {
 33  namespace lockfree {
 34  namespace detail   {
 35  
 36  typedef parameter::parameters<boost::parameter::optional<tag::capacity>,
 37                                boost::parameter::optional<tag::allocator>
 38                               > ringbuffer_signature;
 39  
 40  template <typename T>
 41  class ringbuffer_base
 42  #ifdef BOOST_NO_CXX11_DELETED_FUNCTIONS
 43          : boost::noncopyable
 44  #endif
 45  {
 46  #ifndef BOOST_DOXYGEN_INVOKED
 47      typedef std::size_t size_t;
 48      static const int padding_size = BOOST_LOCKFREE_CACHELINE_BYTES - sizeof(size_t);
 49      atomic<size_t> write_index_;
 50      char padding1[padding_size]; /* force read_index and write_index to different cache lines */
 51      atomic<size_t> read_index_;
 52  
 53  #ifndef BOOST_NO_CXX11_DELETED_FUNCTIONS
 54      ringbuffer_base(ringbuffer_base const &) = delete;
 55      ringbuffer_base(ringbuffer_base &&)      = delete;
 56      const ringbuffer_base& operator=( const ringbuffer_base& ) = delete;
 57  #endif
 58  
 59  protected:
 60      ringbuffer_base(void):
 61          write_index_(0), read_index_(0)
 62      {}
 63  
 64      static size_t next_index(size_t arg, size_t max_size)
 65      {
 66          size_t ret = arg + 1;
 67          while (unlikely(ret >= max_size))
 68              ret -= max_size;
 69          return ret;
 70      }
 71  
 72      static size_t read_available(size_t write_index, size_t read_index, size_t max_size)
 73      {
 74          if (write_index >= read_index)
 75              return write_index - read_index;
 76  
 77          size_t ret = write_index + max_size - read_index;
 78          return ret;
 79      }
 80  
 81      static size_t write_available(size_t write_index, size_t read_index, size_t max_size)
 82      {
 83          size_t ret = read_index - write_index - 1;
 84          if (write_index >= read_index)
 85              ret += max_size;
 86          return ret;
 87      }
 88  
 89      bool push(T const & t, T * buffer, size_t max_size)
 90      {
 91          const size_t write_index = write_index_.load(memory_order_relaxed);  // only written from push thread
 92          const size_t next = next_index(write_index, max_size);
 93  
 94          if (next == read_index_.load(memory_order_acquire))
 95              return false; /* ringbuffer is full */
 96  
 97          new (buffer + write_index) T(t); // copy-construct
 98  
 99          write_index_.store(next, memory_order_release);
100  
101          return true;
102      }
103  
104      size_t push(const T * input_buffer, size_t input_count, T * internal_buffer, size_t max_size)
105      {
106          return push(input_buffer, input_buffer + input_count, internal_buffer, max_size) - input_buffer;
107      }
108  
109      template <typename ConstIterator>
110      ConstIterator push(ConstIterator begin, ConstIterator end, T * internal_buffer, size_t max_size)
111      {
112          // FIXME: avoid std::distance
113  
114          const size_t write_index = write_index_.load(memory_order_relaxed);  // only written from push thread
115          const size_t read_index  = read_index_.load(memory_order_acquire);
116          const size_t avail = write_available(write_index, read_index, max_size);
117  
118          if (avail == 0)
119              return begin;
120  
121          size_t input_count = std::distance(begin, end);
122          input_count = (std::min)(input_count, avail);
123  
124          size_t new_write_index = write_index + input_count;
125  
126          const ConstIterator last = boost::next(begin, input_count);
127  
128          if (write_index + input_count > max_size) {
129              /* copy data in two sections */
130              const size_t count0 = max_size - write_index;
131              const ConstIterator midpoint = boost::next(begin, count0);
132  
133              std::uninitialized_copy(begin, midpoint, internal_buffer + write_index);
134              std::uninitialized_copy(midpoint, last, internal_buffer);
135              new_write_index -= max_size;
136          } else {
137              std::uninitialized_copy(begin, last, internal_buffer + write_index);
138  
139              if (new_write_index == max_size)
140                  new_write_index = 0;
141          }
142  
143          write_index_.store(new_write_index, memory_order_release);
144          return last;
145      }
146  
147      bool pop (T & ret, T * buffer, size_t max_size)
148      {
149          const size_t write_index = write_index_.load(memory_order_acquire);
150          const size_t read_index  = read_index_.load(memory_order_relaxed); // only written from pop thread
151          if (empty(write_index, read_index))
152              return false;
153  
154          ret = buffer[read_index];
155          buffer[read_index].~T();
156  
157          size_t next = next_index(read_index, max_size);
158          read_index_.store(next, memory_order_release);
159          return true;
160      }
161  
162      size_t pop (T * output_buffer, size_t output_count, T * internal_buffer, size_t max_size)
163      {
164          const size_t write_index = write_index_.load(memory_order_acquire);
165          const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
166  
167          const size_t avail = read_available(write_index, read_index, max_size);
168  
169          if (avail == 0)
170              return 0;
171  
172          output_count = (std::min)(output_count, avail);
173  
174          size_t new_read_index = read_index + output_count;
175  
176          if (read_index + output_count > max_size) {
177              /* copy data in two sections */
178              const size_t count0 = max_size - read_index;
179              const size_t count1 = output_count - count0;
180  
181              copy_and_delete(internal_buffer + read_index, internal_buffer + max_size, output_buffer);
182              copy_and_delete(internal_buffer, internal_buffer + count1, output_buffer + count0);
183  
184              new_read_index -= max_size;
185          } else {
186              copy_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, output_buffer);
187              if (new_read_index == max_size)
188                  new_read_index = 0;
189          }
190  
191          read_index_.store(new_read_index, memory_order_release);
192          return output_count;
193      }
194  
195      template <typename OutputIterator>
196      size_t pop (OutputIterator it, T * internal_buffer, size_t max_size)
197      {
198          const size_t write_index = write_index_.load(memory_order_acquire);
199          const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
200  
201          const size_t avail = read_available(write_index, read_index, max_size);
202          if (avail == 0)
203              return 0;
204  
205          size_t new_read_index = read_index + avail;
206  
207          if (read_index + avail > max_size) {
208              /* copy data in two sections */
209              const size_t count0 = max_size - read_index;
210              const size_t count1 = avail - count0;
211  
212              it = copy_and_delete(internal_buffer + read_index, internal_buffer + max_size, it);
213              copy_and_delete(internal_buffer, internal_buffer + count1, it);
214  
215              new_read_index -= max_size;
216          } else {
217              copy_and_delete(internal_buffer + read_index, internal_buffer + read_index + avail, it);
218              if (new_read_index == max_size)
219                  new_read_index = 0;
220          }
221  
222          read_index_.store(new_read_index, memory_order_release);
223          return avail;
224      }
225  #endif
226  
227  
228  public:
229      /** reset the ringbuffer
230       *
231       * \note Not thread-safe
232       * */
233      void reset(void)
234      {
235          write_index_.store(0, memory_order_relaxed);
236          read_index_.store(0, memory_order_release);
237      }
238  
239      /** Check if the ringbuffer is empty
240       *
241       * \return true, if the ringbuffer is empty, false otherwise
242       * \note Due to the concurrent nature of the ringbuffer the result may be inaccurate.
243       * */
244      bool empty(void)
245      {
246          return empty(write_index_.load(memory_order_relaxed), read_index_.load(memory_order_relaxed));
247      }
248  
249      /**
250       * \return true, if implementation is lock-free.
251       *
252       * */
253      bool is_lock_free(void) const
254      {
255          return write_index_.is_lock_free() && read_index_.is_lock_free();
256      }
257  
258  private:
259      bool empty(size_t write_index, size_t read_index)
260      {
261          return write_index == read_index;
262      }
263  
264      template< class OutputIterator >
265      OutputIterator copy_and_delete( T * first, T * last, OutputIterator out )
266      {
267          if (boost::has_trivial_destructor<T>::value) {
268              return std::copy(first, last, out); // will use memcpy if possible
269          } else {
270              for (; first != last; ++first, ++out) {
271                  *out = *first;
272                  first->~T();
273              }
274              return out;
275          }
276      }
277  };
278  
279  template <typename T, std::size_t MaxSize>
280  class compile_time_sized_ringbuffer:
281      public ringbuffer_base<T>
282  {
283      typedef std::size_t size_type;
284      static const std::size_t max_size = MaxSize + 1;
285  
286      typedef typename boost::aligned_storage<max_size * sizeof(T),
287                                              boost::alignment_of<T>::value
288                                             >::type storage_type;
289  
290      storage_type storage_;
291  
292      T * data()
293      {
294          return static_cast<T*>(storage_.address());
295      }
296  
297  public:
298      bool push(T const & t)
299      {
300          return ringbuffer_base<T>::push(t, data(), max_size);
301      }
302  
303      bool pop(T & ret)
304      {
305          return ringbuffer_base<T>::pop(ret, data(), max_size);
306      }
307  
308      size_type push(T const * t, size_type size)
309      {
310          return ringbuffer_base<T>::push(t, size, data(), max_size);
311      }
312  
313      template <size_type size>
314      size_type push(T const (&t)[size])
315      {
316          return push(t, size);
317      }
318  
319      template <typename ConstIterator>
320      ConstIterator push(ConstIterator begin, ConstIterator end)
321      {
322          return ringbuffer_base<T>::push(begin, end, data(), max_size);
323      }
324  
325      size_type pop(T * ret, size_type size)
326      {
327          return ringbuffer_base<T>::pop(ret, size, data(), max_size);
328      }
329  
330      template <size_type size>
331      size_type pop(T (&ret)[size])
332      {
333          return pop(ret, size);
334      }
335  
336      template <typename OutputIterator>
337      size_type pop(OutputIterator it)
338      {
339          return ringbuffer_base<T>::pop(it, data(), max_size);
340      }
341  };
342  
343  template <typename T, typename Alloc>
344  class runtime_sized_ringbuffer:
345      public ringbuffer_base<T>,
346      private Alloc
347  {
348      typedef std::size_t size_type;
349      size_type max_elements_;
350      typedef typename Alloc::pointer pointer;
351      pointer array_;
352  
353  public:
354      explicit runtime_sized_ringbuffer(size_type max_elements):
355          max_elements_(max_elements + 1)
356      {
357          array_ = Alloc::allocate(max_elements_);
358      }
359  
360      template <typename U>
361      runtime_sized_ringbuffer(typename Alloc::template rebind<U>::other const & alloc, size_type max_elements):
362          Alloc(alloc), max_elements_(max_elements + 1)
363      {
364          array_ = Alloc::allocate(max_elements_);
365      }
366  
367      runtime_sized_ringbuffer(Alloc const & alloc, size_type max_elements):
368          Alloc(alloc), max_elements_(max_elements + 1)
369      {
370          array_ = Alloc::allocate(max_elements_);
371      }
372  
373      ~runtime_sized_ringbuffer(void)
374      {
375          // destroy all remaining items
376          T out;
377          while (pop(out)) {};
378  
379          Alloc::deallocate(array_, max_elements_);
380      }
381  
382      bool push(T const & t)
383      {
384          return ringbuffer_base<T>::push(t, &*array_, max_elements_);
385      }
386  
387      bool pop(T & ret)
388      {
389          return ringbuffer_base<T>::pop(ret, &*array_, max_elements_);
390      }
391  
392      size_type push(T const * t, size_type size)
393      {
394          return ringbuffer_base<T>::push(t, size, &*array_, max_elements_);
395      }
396  
397      template <size_type size>
398      size_type push(T const (&t)[size])
399      {
400          return push(t, size);
401      }
402  
403      template <typename ConstIterator>
404      ConstIterator push(ConstIterator begin, ConstIterator end)
405      {
406          return ringbuffer_base<T>::push(begin, end, array_, max_elements_);
407      }
408  
409      size_type pop(T * ret, size_type size)
410      {
411          return ringbuffer_base<T>::pop(ret, size, array_, max_elements_);
412      }
413  
414      template <size_type size>
415      size_type pop(T (&ret)[size])
416      {
417          return pop(ret, size);
418      }
419  
420      template <typename OutputIterator>
421      size_type pop(OutputIterator it)
422      {
423          return ringbuffer_base<T>::pop(it, array_, max_elements_);
424      }
425  };
426  
427  template <typename T, typename A0, typename A1>
428  struct make_ringbuffer
429  {
430      typedef typename ringbuffer_signature::bind<A0, A1>::type bound_args;
431  
432      typedef extract_capacity<bound_args> extract_capacity_t;
433  
434      static const bool runtime_sized = !extract_capacity_t::has_capacity;
435      static const size_t capacity    =  extract_capacity_t::capacity;
436  
437      typedef extract_allocator<bound_args, T> extract_allocator_t;
438      typedef typename extract_allocator_t::type allocator;
439  
440      // allocator argument is only sane, for run-time sized ringbuffers
441      BOOST_STATIC_ASSERT((mpl::if_<mpl::bool_<!runtime_sized>,
442                                    mpl::bool_<!extract_allocator_t::has_allocator>,
443                                    mpl::true_
444                                   >::type::value));
445  
446      typedef typename mpl::if_c<runtime_sized,
447                                 runtime_sized_ringbuffer<T, allocator>,
448                                 compile_time_sized_ringbuffer<T, capacity>
449                                >::type ringbuffer_type;
450  };
451  
452  
453  } /* namespace detail */
454  
455  
456  /** The spsc_queue class provides a single-writer/single-reader fifo queue, pushing and popping is wait-free.
457   *
458   *  \b Policies:
459   *  - \c boost::lockfree::capacity<>, optional <br>
460   *    If this template argument is passed to the options, the size of the ringbuffer is set at compile-time.
461   *
462   *  - \c boost::lockfree::allocator<>, defaults to \c boost::lockfree::allocator<std::allocator<T>> <br>
463   *    Specifies the allocator that is used to allocate the ringbuffer. This option is only valid, if the ringbuffer is configured
464   *    to be sized at run-time
465   *
466   *  \b Requirements:
467   *  - T must have a default constructor
468   *  - T must be copyable
469   * */
470  #ifndef BOOST_DOXYGEN_INVOKED
471  template <typename T,
472            class A0 = boost::parameter::void_,
473            class A1 = boost::parameter::void_>
474  #else
475  template <typename T, ...Options>
476  #endif
477  class spsc_queue:
478      public detail::make_ringbuffer<T, A0, A1>::ringbuffer_type
479  {
480  private:
481  
482  #ifndef BOOST_DOXYGEN_INVOKED
483      typedef typename detail::make_ringbuffer<T, A0, A1>::ringbuffer_type base_type;
484      static const bool runtime_sized = detail::make_ringbuffer<T, A0, A1>::runtime_sized;
485      typedef typename detail::make_ringbuffer<T, A0, A1>::allocator allocator_arg;
486  
487      struct implementation_defined
488      {
489          typedef allocator_arg allocator;
490          typedef std::size_t size_type;
491      };
492  #endif
493  
494  public:
495      typedef T value_type;
496      typedef typename implementation_defined::allocator allocator;
497      typedef typename implementation_defined::size_type size_type;
498  
499      /** Constructs a spsc_queue
500       *
501       *  \pre spsc_queue must be configured to be sized at compile-time
502       */
503      // @{
504      spsc_queue(void)
505      {
506          BOOST_ASSERT(!runtime_sized);
507      }
508  
509      template <typename U>
510      explicit spsc_queue(typename allocator::template rebind<U>::other const & alloc)
511      {
512          // just for API compatibility: we don't actually need an allocator
513          BOOST_STATIC_ASSERT(!runtime_sized);
514      }
515  
516      explicit spsc_queue(allocator const & alloc)
517      {
518          // just for API compatibility: we don't actually need an allocator
519          BOOST_ASSERT(!runtime_sized);
520      }
521      // @}
522  
523  
524      /** Constructs a spsc_queue for element_count elements
525       *
526       *  \pre spsc_queue must be configured to be sized at run-time
527       */
528      // @{
529      explicit spsc_queue(size_type element_count):
530          base_type(element_count)
531      {
532          BOOST_ASSERT(runtime_sized);
533      }
534  
535      template <typename U>
536      spsc_queue(size_type element_count, typename allocator::template rebind<U>::other const & alloc):
537          base_type(alloc, element_count)
538      {
539          BOOST_STATIC_ASSERT(runtime_sized);
540      }
541  
542      spsc_queue(size_type element_count, allocator_arg const & alloc):
543          base_type(alloc, element_count)
544      {
545          BOOST_ASSERT(runtime_sized);
546      }
547      // @}
548  
549      /** Pushes object t to the ringbuffer.
550       *
551       * \pre only one thread is allowed to push data to the spsc_queue
552       * \post object will be pushed to the spsc_queue, unless it is full.
553       * \return true, if the push operation is successful.
554       *
555       * \note Thread-safe and wait-free
556       * */
557      bool push(T const & t)
558      {
559          return base_type::push(t);
560      }
561  
562      /** Pops one object from ringbuffer.
563       *
564       * \pre only one thread is allowed to pop data to the spsc_queue
565       * \post if ringbuffer is not empty, object will be copied to ret.
566       * \return true, if the pop operation is successful, false if ringbuffer was empty.
567       *
568       * \note Thread-safe and wait-free
569       */
570      bool pop(T & ret)
571      {
572          return base_type::pop(ret);
573      }
574  
575      /** Pushes as many objects from the array t as there is space.
576       *
577       * \pre only one thread is allowed to push data to the spsc_queue
578       * \return number of pushed items
579       *
580       * \note Thread-safe and wait-free
581       */
582      size_type push(T const * t, size_type size)
583      {
584          return base_type::push(t, size);
585      }
586  
587      /** Pushes as many objects from the array t as there is space available.
588       *
589       * \pre only one thread is allowed to push data to the spsc_queue
590       * \return number of pushed items
591       *
592       * \note Thread-safe and wait-free
593       */
594      template <size_type size>
595      size_type push(T const (&t)[size])
596      {
597          return push(t, size);
598      }
599  
600      /** Pushes as many objects from the range [begin, end) as there is space .
601       *
602       * \pre only one thread is allowed to push data to the spsc_queue
603       * \return iterator to the first element, which has not been pushed
604       *
605       * \note Thread-safe and wait-free
606       */
607      template <typename ConstIterator>
608      ConstIterator push(ConstIterator begin, ConstIterator end)
609      {
610          return base_type::push(begin, end);
611      }
612  
613      /** Pops a maximum of size objects from ringbuffer.
614       *
615       * \pre only one thread is allowed to pop data to the spsc_queue
616       * \return number of popped items
617       *
618       * \note Thread-safe and wait-free
619       * */
620      size_type pop(T * ret, size_type size)
621      {
622          return base_type::pop(ret, size);
623      }
624  
625      /** Pops a maximum of size objects from spsc_queue.
626       *
627       * \pre only one thread is allowed to pop data to the spsc_queue
628       * \return number of popped items
629       *
630       * \note Thread-safe and wait-free
631       * */
632      template <size_type size>
633      size_type pop(T (&ret)[size])
634      {
635          return pop(ret, size);
636      }
637  
638      /** Pops objects to the output iterator it
639       *
640       * \pre only one thread is allowed to pop data to the spsc_queue
641       * \return number of popped items
642       *
643       * \note Thread-safe and wait-free
644       * */
645      template <typename OutputIterator>
646      size_type pop(OutputIterator it)
647      {
648          return base_type::pop(it);
649      }
650  
651      /** consumes one element via a functor
652       *
653       *  pops one element from the queue and applies the functor on this object
654       *
655       * \returns true, if one element was consumed
656       *
657       * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking
658       * */
659      template <typename Functor>
660      bool consume_one(Functor & f)
661      {
662          T element;
663          bool success = pop(element);
664          if (success)
665              f(element);
666  
667          return success;
668      }
669  
670      /// \copydoc boost::lockfree::spsc_queue::consume_one(Functor & rhs)
671      template <typename Functor>
672      bool consume_one(Functor const & f)
673      {
674          T element;
675          bool success = pop(element);
676          if (success)
677              f(element);
678  
679          return success;
680      }
681  
682      /** consumes all elements via a functor
683       *
684       * sequentially pops all elements from the queue and applies the functor on each object
685       *
686       * \returns number of elements that are consumed
687       *
688       * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking
689       * */
690      template <typename Functor>
691      size_type consume_all(Functor & f)
692      {
693          size_type element_count = 0;
694          while (consume_one(f))
695              element_count += 1;
696  
697          return element_count;
698      }
699  
700      /// \copydoc boost::lockfree::spsc_queue::consume_all(Functor & rhs)
701      template <typename Functor>
702      size_type consume_all(Functor const & f)
703      {
704          size_type element_count = 0;
705          while (consume_one(f))
706              element_count += 1;
707  
708          return element_count;
709      }
710  };
711  
712  } /* namespace lockfree */
713  } /* namespace boost */
714  
715  
716  #endif /* BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED */