/ src / external / boost / lockfree / queue.hpp
queue.hpp
  1  //  lock-free queue from
  2  //  Michael, M. M. and Scott, M. L.,
  3  //  "simple, fast and practical non-blocking and blocking concurrent queue algorithms"
  4  //
  5  //  Copyright (C) 2008-2013 Tim Blechmann
  6  //
  7  //  Distributed under the Boost Software License, Version 1.0. (See
  8  //  accompanying file LICENSE_1_0.txt or copy at
  9  //  http://www.boost.org/LICENSE_1_0.txt)
 10  
 11  #ifndef BOOST_LOCKFREE_FIFO_HPP_INCLUDED
 12  #define BOOST_LOCKFREE_FIFO_HPP_INCLUDED
 13  
 14  #include <boost/assert.hpp>
 15  #ifdef BOOST_NO_CXX11_DELETED_FUNCTIONS
 16  #include <boost/noncopyable.hpp>
 17  #endif
 18  #include <boost/static_assert.hpp>
 19  #include <boost/type_traits/has_trivial_assign.hpp>
 20  #include <boost/type_traits/has_trivial_destructor.hpp>
 21  
 22  #include <boost/lockfree/detail/atomic.hpp>
 23  #include <boost/lockfree/detail/copy_payload.hpp>
 24  #include <boost/lockfree/detail/freelist.hpp>
 25  #include <boost/lockfree/detail/parameter.hpp>
 26  #include <boost/lockfree/detail/tagged_ptr.hpp>
 27  
 28  #if defined(_MSC_VER)
 29  #pragma warning(push)
 30  #pragma warning(disable: 4324) // structure was padded due to __declspec(align())
 31  #endif
 32  
 33  
 34  namespace boost    {
 35  namespace lockfree {
 36  namespace detail   {
 37  
/* Option signature of the queue: two optional parameter slots, tagged
 * tag::allocator and tag::capacity. The queue class binds its policy
 * template arguments (A0, A1, A2) against this signature. */
typedef parameter::parameters<boost::parameter::optional<tag::allocator>,
                              boost::parameter::optional<tag::capacity>
                             > queue_signature;
 41  
 42  } /* namespace detail */
 43  
 44  
 45  /** The queue class provides a multi-writer/multi-reader queue, pushing and popping is lock-free,
 46   *  construction/destruction has to be synchronized. It uses a freelist for memory management,
 47   *  freed nodes are pushed to the freelist and not returned to the OS before the queue is destroyed.
 48   *
 49   *  \b Policies:
 50   *  - \ref boost::lockfree::fixed_sized, defaults to \c boost::lockfree::fixed_sized<false> \n
 51   *    Can be used to completely disable dynamic memory allocations during push in order to ensure lockfree behavior. \n
 52   *    If the data structure is configured as fixed-sized, the internal nodes are stored inside an array and they are addressed
 53   *    by array indexing. This limits the possible size of the queue to the number of elements that can be addressed by the index
 54   *    type (usually 2**16-2), but on platforms that lack double-width compare-and-exchange instructions, this is the best way
 55   *    to achieve lock-freedom.
 56   *
 57   *  - \ref boost::lockfree::capacity, optional \n
 58   *    If this template argument is passed to the options, the size of the queue is set at compile-time.\n
 *    This option implies \c fixed_sized<true>
 60   *
 61   *  - \ref boost::lockfree::allocator, defaults to \c boost::lockfree::allocator<std::allocator<void>> \n
 62   *    Specifies the allocator that is used for the internal freelist
 63   *
 64   *  \b Requirements:
 65   *   - T must have a copy constructor
 66   *   - T must have a trivial assignment operator
 67   *   - T must have a trivial destructor
 68   *
 69   * */
 70  #ifndef BOOST_DOXYGEN_INVOKED
 71  template <typename T,
 72            class A0 = boost::parameter::void_,
 73            class A1 = boost::parameter::void_,
 74            class A2 = boost::parameter::void_>
 75  #else
 76  template <typename T, ...Options>
 77  #endif
 78  class queue
 79  #ifdef BOOST_NO_CXX11_DELETED_FUNCTIONS
 80      : boost::noncopyable
 81  #endif
 82  {
 83  private:
 84  #ifndef BOOST_DOXYGEN_INVOKED
 85  
#ifdef BOOST_HAS_TRIVIAL_DESTRUCTOR
    // compile-time enforcement of the documented requirement "T must have a trivial destructor"
    BOOST_STATIC_ASSERT((boost::has_trivial_destructor<T>::value));
#endif

#ifdef BOOST_HAS_TRIVIAL_ASSIGN
    // compile-time enforcement of the documented requirement "T must have a trivial assignment operator"
    BOOST_STATIC_ASSERT((boost::has_trivial_assign<T>::value));
#endif

    // the policy arguments A0..A2 bound against the queue's option signature
    typedef typename detail::queue_signature::bind<A0, A1, A2>::type bound_args;

    // configuration extracted from the bound policy arguments
    static const bool has_capacity = detail::extract_capacity<bound_args>::has_capacity;
    static const size_t capacity = detail::extract_capacity<bound_args>::capacity + 1; // the queue uses one dummy node
    static const bool fixed_sized = detail::extract_fixed_sized<bound_args>::value;
    // true only when neither a compile-time capacity nor fixed_sized was requested;
    // selects which tagged handle type the nodes use (see select_tagged_handle below)
    static const bool node_based = !(has_capacity || fixed_sized);
    // forwarded to select_freelist together with fixed_sized and capacity
    static const bool compile_time_sized = has_capacity;
101  
102      struct BOOST_LOCKFREE_CACHELINE_ALIGNMENT node
103      {
104          typedef typename detail::select_tagged_handle<node, node_based>::tagged_handle_type tagged_node_handle;
105          typedef typename detail::select_tagged_handle<node, node_based>::handle_type handle_type;
106  
107          node(T const & v, handle_type null_handle):
108              data(v)//, next(tagged_node_handle(0, 0))
109          {
110              /* increment tag to avoid ABA problem */
111              tagged_node_handle old_next = next.load(memory_order_relaxed);
112              tagged_node_handle new_next (null_handle, old_next.get_next_tag());
113              next.store(new_next, memory_order_release);
114          }
115  
116          node (handle_type null_handle):
117              next(tagged_node_handle(null_handle, 0))
118          {}
119  
120          node(void)
121          {}
122  
123          atomic<tagged_node_handle> next;
124          T data;
125      };
126  
    // allocator for nodes, extracted from the bound policy arguments
    typedef typename detail::extract_allocator<bound_args, node>::type node_allocator;
    // freelist type used as node pool, selected from the sizing configuration
    typedef typename detail::select_freelist<node, node_allocator, compile_time_sized, fixed_sized, capacity>::type pool_t;
    // tagged handle type as defined by the pool; used for head_, tail_ and the node links
    typedef typename pool_t::tagged_node_handle tagged_node_handle;
    // untagged handle type, selected the same way as node::handle_type
    typedef typename detail::select_tagged_handle<node, node_based>::handle_type handle_type;
131  
132      void initialize(void)
133      {
134          node * n = pool.template construct<true, false>(pool.null_handle());
135          tagged_node_handle dummy_node(pool.get_handle(n), 0);
136          head_.store(dummy_node, memory_order_relaxed);
137          tail_.store(dummy_node, memory_order_release);
138      }
139  
    /* Holder for the types that are re-exported as the public
     * `allocator` and `size_type` typedefs of the queue. */
    struct implementation_defined
    {
        typedef node_allocator allocator;
        typedef std::size_t size_type;
    };
145  
146  #endif
147  
#ifndef BOOST_NO_CXX11_DELETED_FUNCTIONS
    // The queue is neither copyable nor movable. When deleted functions are
    // unavailable the class derives from boost::noncopyable instead.
    queue(queue const &) = delete;
    queue(queue &&)      = delete;
    const queue& operator=( const queue& ) = delete;
#endif
153  
154  public:
    typedef T value_type;                                           ///< type of the stored elements
    typedef typename implementation_defined::allocator allocator;   ///< node allocator type
    typedef typename implementation_defined::size_type size_type;   ///< type used for node counts (std::size_t)
158  
159      /**
160       * \return true, if implementation is lock-free.
161       *
162       * \warning It only checks, if the queue head and tail nodes and the freelist can be modified in a lock-free manner.
163       *       On most platforms, the whole implementation is lock-free, if this is true. Using c++0x-style atomics, there is
164       *       no possibility to provide a completely accurate implementation, because one would need to test every internal
165       *       node, which is impossible if further nodes will be allocated from the operating system.
166       * */
167      bool is_lock_free (void) const
168      {
169          return head_.is_lock_free() && tail_.is_lock_free() && pool.is_lock_free();
170      }
171  
    //! Construct queue
    //! These overloads size the pool with the compile-time `capacity` constant and are
    //! therefore only valid when a capacity was configured (`has_capacity`).
    // @{
    /* Default constructor: default-constructed node allocator, pool of `capacity` nodes.
     * has_capacity is checked at run time via BOOST_ASSERT. */
    queue(void):
        head_(tagged_node_handle(0, 0)),
        tail_(tagged_node_handle(0, 0)),
        pool(node_allocator(), capacity)
    {
        BOOST_ASSERT(has_capacity);
        initialize();
    }

    /* Construct from a rebound allocator; has_capacity is checked at compile time.
     * NOTE(review): U appears only inside a nested-name-specifier (a non-deduced context)
     * and constructor templates cannot take explicit template arguments, so this overload
     * looks uncallable -- confirm. */
    template <typename U>
    explicit queue(typename node_allocator::template rebind<U>::other const & alloc):
        head_(tagged_node_handle(0, 0)),
        tail_(tagged_node_handle(0, 0)),
        pool(alloc, capacity)
    {
        BOOST_STATIC_ASSERT(has_capacity);
        initialize();
    }

    /* Construct from a node allocator instance; has_capacity is checked at run time
     * via BOOST_ASSERT. */
    explicit queue(allocator const & alloc):
        head_(tagged_node_handle(0, 0)),
        tail_(tagged_node_handle(0, 0)),
        pool(alloc, capacity)
    {
        BOOST_ASSERT(has_capacity);
        initialize();
    }
    // @}
202  
    //! Construct queue, allocate n nodes for the freelist.
    //! The pool is created with n + 1 nodes; initialize() takes one of them for the dummy node.
    //! These overloads are only valid when no compile-time capacity was configured.
    // @{
    /* Default-constructed node allocator; !has_capacity is checked at run time via BOOST_ASSERT. */
    explicit queue(size_type n):
        head_(tagged_node_handle(0, 0)),
        tail_(tagged_node_handle(0, 0)),
        pool(node_allocator(), n + 1)
    {
        BOOST_ASSERT(!has_capacity);
        initialize();
    }

    /* Rebound allocator variant; !has_capacity is checked at compile time.
     * NOTE(review): U appears only in a non-deduced context, so it cannot be deduced
     * from the `alloc` argument -- confirm whether this overload is reachable. */
    template <typename U>
    queue(size_type n, typename node_allocator::template rebind<U>::other const & alloc):
        head_(tagged_node_handle(0, 0)),
        tail_(tagged_node_handle(0, 0)),
        pool(alloc, n + 1)
    {
        BOOST_STATIC_ASSERT(!has_capacity);
        initialize();
    }
    // @}
224  
    /** \copydoc boost::lockfree::stack::reserve
     *
     *  Forwards to the node pool's `reserve<true>(n)`.
     * */
    void reserve(size_type n)
    {
        pool.template reserve<true>(n);
    }
231  
    /** \copydoc boost::lockfree::stack::reserve_unsafe
     *
     *  Forwards to the node pool's `reserve<false>(n)`.
     * */
    void reserve_unsafe(size_type n)
    {
        pool.template reserve<false>(n);
    }
238  
239      /** Destroys queue, free all nodes from freelist.
240       * */
241      ~queue(void)
242      {
243          T dummy;
244          while(unsynchronized_pop(dummy))
245          {}
246  
247          pool.template destruct<false>(head_.load(memory_order_relaxed));
248      }
249  
250      /** Check if the queue is empty
251       *
252       * \return true, if the queue is empty, false otherwise
253       * \note The result is only accurate, if no other thread modifies the queue. Therefore it is rarely practical to use this
254       *       value in program logic.
255       * */
256      bool empty(void)
257      {
258          return pool.get_handle(head_.load()) == pool.get_handle(tail_.load());
259      }
260  
    /** Pushes object t to the queue.
     *
     * Forwards to do_push<false>, i.e. the unbounded variant.
     *
     * \post object will be pushed to the queue, if internal node can be allocated
     * \returns true, if the push operation is successful.
     *
     * \note Thread-safe. If internal memory pool is exhausted and the memory pool is not fixed-sized, a new node will be allocated
     *                    from the OS. This may not be lock-free.
     * */
    bool push(T const & t)
    {
        return do_push<false>(t);
    }
273  
    /** Pushes object t to the queue.
     *
     * Forwards to do_push<true>, i.e. the bounded variant.
     *
     * \post object will be pushed to the queue, if internal node can be allocated
     * \returns true, if the push operation is successful.
     *
     * \note Thread-safe and non-blocking. If internal memory pool is exhausted, operation will fail
     * \throws if memory allocator throws
     * */
    bool bounded_push(T const & t)
    {
        return do_push<true>(t);
    }
286  
287  
288  private:
289  #ifndef BOOST_DOXYGEN_INVOKED
    /* Shared implementation of push() (Bounded == false) and bounded_push() (Bounded == true).
     *
     * Constructs a node holding a copy of t from the pool and appends it behind the
     * current tail using compare-and-exchange.
     *
     * Returns false if the pool did not deliver a node, true once the node is linked.
     */
    template <bool Bounded>
    bool do_push(T const & t)
    {
        using detail::likely;

        // Bounded is forwarded to the pool; presumably it forbids growing the pool -- confirm in freelist.hpp
        node * n = pool.template construct<true, Bounded>(t, pool.null_handle());
        handle_type node_handle = pool.get_handle(n);

        if (n == NULL)
            return false;

        for (;;) {
            // snapshot the tail and the link of the node it refers to
            tagged_node_handle tail = tail_.load(memory_order_acquire);
            node * tail_node = pool.get_pointer(tail);
            tagged_node_handle next = tail_node->next.load(memory_order_acquire);
            node * next_ptr = pool.get_pointer(next);

            // only act on the snapshot if tail_ did not change in the meantime
            tagged_node_handle tail2 = tail_.load(memory_order_acquire);
            if (likely(tail == tail2)) {
                if (next_ptr == 0) {
                    // tail node has no successor: try to link the new node behind it
                    tagged_node_handle new_tail_next(node_handle, next.get_next_tag());
                    if ( tail_node->next.compare_exchange_weak(next, new_tail_next) ) {
                        // linked; try to swing tail_ to the new node. The result is ignored:
                        // the else-branch below and pop() also advance a lagging tail_.
                        tagged_node_handle new_tail(node_handle, tail.get_next_tag());
                        tail_.compare_exchange_strong(tail, new_tail);
                        return true;
                    }
                }
                else {
                    // tail_ lags behind the last node: try to advance it, then retry
                    tagged_node_handle new_tail(pool.get_handle(next_ptr), tail.get_next_tag());
                    tail_.compare_exchange_strong(tail, new_tail);
                }
            }
        }
    }
324  #endif
325  
326  public:
327  
328      /** Pushes object t to the queue.
329       *
330       * \post object will be pushed to the queue, if internal node can be allocated
331       * \returns true, if the push operation is successful.
332       *
333       * \note Not Thread-safe. If internal memory pool is exhausted and the memory pool is not fixed-sized, a new node will be allocated
334       *       from the OS. This may not be lock-free.
335       * \throws if memory allocator throws
336       * */
337      bool unsynchronized_push(T const & t)
338      {
339          node * n = pool.template construct<false, false>(t, pool.null_handle());
340  
341          if (n == NULL)
342              return false;
343  
344          for (;;) {
345              tagged_node_handle tail = tail_.load(memory_order_relaxed);
346              tagged_node_handle next = tail->next.load(memory_order_relaxed);
347              node * next_ptr = next.get_ptr();
348  
349              if (next_ptr == 0) {
350                  tail->next.store(tagged_node_handle(n, next.get_next_tag()), memory_order_relaxed);
351                  tail_.store(tagged_node_handle(n, tail.get_next_tag()), memory_order_relaxed);
352                  return true;
353              }
354              else
355                  tail_.store(tagged_node_handle(next_ptr, tail.get_next_tag()), memory_order_relaxed);
356          }
357      }
358  
    /** Pops object from queue.
     *
     * Forwards to the generic pop<U>() overload with U = T.
     *
     * \post if pop operation is successful, object will be copied to ret.
     * \returns true, if the pop operation is successful, false if queue was empty.
     *
     * \note Thread-safe and non-blocking
     * */
    bool pop (T & ret)
    {
        return pop<T>(ret);
    }
370  
    /** Pops object from queue.
     *
     * The payload is read from the successor of the node referenced by head_; on success
     * head_ is moved to that successor and the previous head node is returned to the pool.
     *
     * \pre type U must be constructible by T and copyable, or T must be convertible to U
     * \post if pop operation is successful, object will be copied to ret.
     * \returns true, if the pop operation is successful, false if queue was empty.
     *
     * \note Thread-safe and non-blocking
     * */
    template <typename U>
    bool pop (U & ret)
    {
        using detail::likely;
        for (;;) {
            // snapshot head, tail and the link of the head node
            tagged_node_handle head = head_.load(memory_order_acquire);
            node * head_ptr = pool.get_pointer(head);

            tagged_node_handle tail = tail_.load(memory_order_acquire);
            tagged_node_handle next = head_ptr->next.load(memory_order_acquire);
            node * next_ptr = pool.get_pointer(next);

            // only act on the snapshot if head_ did not change in the meantime
            tagged_node_handle head2 = head_.load(memory_order_acquire);
            if (likely(head == head2)) {
                if (pool.get_handle(head) == pool.get_handle(tail)) {
                    // head and tail refer to the same node: no successor means the queue is empty
                    if (next_ptr == 0)
                        return false;

                    // otherwise tail_ lags behind: try to advance it, then retry
                    tagged_node_handle new_tail(pool.get_handle(next), tail.get_next_tag());
                    tail_.compare_exchange_strong(tail, new_tail);

                } else {
                    if (next_ptr == 0)
                        /* this check is not part of the original algorithm as published by michael and scott
                         *
                         * however we reuse the tagged_ptr part for the freelist and clear the next part during node
                         * allocation. we can observe a null-pointer here.
                         * */
                        continue;
                    // copy the payload before the exchange; the copy is simply redone if the exchange fails
                    detail::copy_payload(next_ptr->data, ret);

                    tagged_node_handle new_head(pool.get_handle(next), head.get_next_tag());
                    if (head_.compare_exchange_weak(head, new_head)) {
                        // the old head node is no longer referenced by head_: give it back to the pool
                        pool.template destruct<true>(head);
                        return true;
                    }
                }
            }
        }
    }
419  
    /** Pops object from queue.
     *
     * Forwards to the generic unsynchronized_pop<U>() overload with U = T.
     *
     * \post if pop operation is successful, object will be copied to ret.
     * \returns true, if the pop operation is successful, false if queue was empty.
     *
     * \note Not thread-safe, but non-blocking
     *
     * */
    bool unsynchronized_pop (T & ret)
    {
        return unsynchronized_pop<T>(ret);
    }
432  
433      /** Pops object from queue.
434       *
435       * \pre type U must be constructible by T and copyable, or T must be convertible to U
436       * \post if pop operation is successful, object will be copied to ret.
437       * \returns true, if the pop operation is successful, false if queue was empty.
438       *
439       * \note Not thread-safe, but non-blocking
440       *
441       * */
442      template <typename U>
443      bool unsynchronized_pop (U & ret)
444      {
445          for (;;) {
446              tagged_node_handle head = head_.load(memory_order_relaxed);
447              node * head_ptr = pool.get_pointer(head);
448              tagged_node_handle tail = tail_.load(memory_order_relaxed);
449              tagged_node_handle next = head_ptr->next.load(memory_order_relaxed);
450              node * next_ptr = pool.get_pointer(next);
451  
452              if (pool.get_handle(head) == pool.get_handle(tail)) {
453                  if (next_ptr == 0)
454                      return false;
455  
456                  tagged_node_handle new_tail(pool.get_handle(next), tail.get_next_tag());
457                  tail_.store(new_tail);
458              } else {
459                  if (next_ptr == 0)
460                      /* this check is not part of the original algorithm as published by michael and scott
461                       *
462                       * however we reuse the tagged_ptr part for the freelist and clear the next part during node
463                       * allocation. we can observe a null-pointer here.
464                       * */
465                      continue;
466                  detail::copy_payload(next_ptr->data, ret);
467                  tagged_node_handle new_head(pool.get_handle(next), head.get_next_tag());
468                  head_.store(new_head);
469                  pool.template destruct<false>(head);
470                  return true;
471              }
472          }
473      }
474  
475      /** consumes one element via a functor
476       *
477       *  pops one element from the queue and applies the functor on this object
478       *
479       * \returns true, if one element was consumed
480       *
481       * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking
482       * */
483      template <typename Functor>
484      bool consume_one(Functor & f)
485      {
486          T element;
487          bool success = pop(element);
488          if (success)
489              f(element);
490  
491          return success;
492      }
493  
494      /// \copydoc boost::lockfree::queue::consume_one(Functor & rhs)
495      template <typename Functor>
496      bool consume_one(Functor const & f)
497      {
498          T element;
499          bool success = pop(element);
500          if (success)
501              f(element);
502  
503          return success;
504      }
505  
506      /** consumes all elements via a functor
507       *
508       * sequentially pops all elements from the queue and applies the functor on each object
509       *
510       * \returns number of elements that are consumed
511       *
512       * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking
513       * */
514      template <typename Functor>
515      size_t consume_all(Functor & f)
516      {
517          size_t element_count = 0;
518          while (consume_one(f))
519              element_count += 1;
520  
521          return element_count;
522      }
523  
524      /// \copydoc boost::lockfree::queue::consume_all(Functor & rhs)
525      template <typename Functor>
526      size_t consume_all(Functor const & f)
527      {
528          size_t element_count = 0;
529          while (consume_one(f))
530              element_count += 1;
531  
532          return element_count;
533      }
534  
535  private:
536  #ifndef BOOST_DOXYGEN_INVOKED
    // tagged handle of the current dummy node; elements are popped from its successor
    atomic<tagged_node_handle> head_;
    // filler so that a tagged handle plus its padding spans BOOST_LOCKFREE_CACHELINE_BYTES
    // (presumably to keep head_ and tail_ on separate cache lines; assumes the atomic
    // is no larger than tagged_node_handle -- confirm)
    static const int padding_size = BOOST_LOCKFREE_CACHELINE_BYTES - sizeof(tagged_node_handle);
    char padding1[padding_size];
    // tagged handle of the last node, or of a node shortly before it while tail_ lags
    atomic<tagged_node_handle> tail_;
    char padding2[padding_size];

    // node pool (freelist) from which all nodes are constructed and to which they are returned
    pool_t pool;
544  #endif
545  };
546  
547  } /* namespace lockfree */
548  } /* namespace boost */
549  
550  #if defined(_MSC_VER)
551  #pragma warning(pop)
552  #endif
553  
554  #endif /* BOOST_LOCKFREE_FIFO_HPP_INCLUDED */