// spsc_queue.hpp
1 // lock-free single-producer/single-consumer ringbuffer 2 // this algorithm is implemented in various projects (linux kernel) 3 // 4 // Copyright (C) 2009-2013 Tim Blechmann 5 // 6 // Distributed under the Boost Software License, Version 1.0. (See 7 // accompanying file LICENSE_1_0.txt or copy at 8 // http://www.boost.org/LICENSE_1_0.txt) 9 10 #ifndef BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED 11 #define BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED 12 13 #include <algorithm> 14 #include <memory> 15 16 #include <boost/aligned_storage.hpp> 17 #include <boost/assert.hpp> 18 #ifdef BOOST_NO_CXX11_DELETED_FUNCTIONS 19 #include <boost/noncopyable.hpp> 20 #endif 21 #include <boost/static_assert.hpp> 22 #include <boost/utility.hpp> 23 24 #include <boost/type_traits/has_trivial_destructor.hpp> 25 26 #include <boost/lockfree/detail/atomic.hpp> 27 #include <boost/lockfree/detail/branch_hints.hpp> 28 #include <boost/lockfree/detail/parameter.hpp> 29 #include <boost/lockfree/detail/prefix.hpp> 30 31 32 namespace boost { 33 namespace lockfree { 34 namespace detail { 35 36 typedef parameter::parameters<boost::parameter::optional<tag::capacity>, 37 boost::parameter::optional<tag::allocator> 38 > ringbuffer_signature; 39 40 template <typename T> 41 class ringbuffer_base 42 #ifdef BOOST_NO_CXX11_DELETED_FUNCTIONS 43 : boost::noncopyable 44 #endif 45 { 46 #ifndef BOOST_DOXYGEN_INVOKED 47 typedef std::size_t size_t; 48 static const int padding_size = BOOST_LOCKFREE_CACHELINE_BYTES - sizeof(size_t); 49 atomic<size_t> write_index_; 50 char padding1[padding_size]; /* force read_index and write_index to different cache lines */ 51 atomic<size_t> read_index_; 52 53 #ifndef BOOST_NO_CXX11_DELETED_FUNCTIONS 54 ringbuffer_base(ringbuffer_base const &) = delete; 55 ringbuffer_base(ringbuffer_base &&) = delete; 56 const ringbuffer_base& operator=( const ringbuffer_base& ) = delete; 57 #endif 58 59 protected: 60 ringbuffer_base(void): 61 write_index_(0), read_index_(0) 62 {} 63 64 static size_t 
next_index(size_t arg, size_t max_size) 65 { 66 size_t ret = arg + 1; 67 while (unlikely(ret >= max_size)) 68 ret -= max_size; 69 return ret; 70 } 71 72 static size_t read_available(size_t write_index, size_t read_index, size_t max_size) 73 { 74 if (write_index >= read_index) 75 return write_index - read_index; 76 77 size_t ret = write_index + max_size - read_index; 78 return ret; 79 } 80 81 static size_t write_available(size_t write_index, size_t read_index, size_t max_size) 82 { 83 size_t ret = read_index - write_index - 1; 84 if (write_index >= read_index) 85 ret += max_size; 86 return ret; 87 } 88 89 bool push(T const & t, T * buffer, size_t max_size) 90 { 91 const size_t write_index = write_index_.load(memory_order_relaxed); // only written from push thread 92 const size_t next = next_index(write_index, max_size); 93 94 if (next == read_index_.load(memory_order_acquire)) 95 return false; /* ringbuffer is full */ 96 97 new (buffer + write_index) T(t); // copy-construct 98 99 write_index_.store(next, memory_order_release); 100 101 return true; 102 } 103 104 size_t push(const T * input_buffer, size_t input_count, T * internal_buffer, size_t max_size) 105 { 106 return push(input_buffer, input_buffer + input_count, internal_buffer, max_size) - input_buffer; 107 } 108 109 template <typename ConstIterator> 110 ConstIterator push(ConstIterator begin, ConstIterator end, T * internal_buffer, size_t max_size) 111 { 112 // FIXME: avoid std::distance 113 114 const size_t write_index = write_index_.load(memory_order_relaxed); // only written from push thread 115 const size_t read_index = read_index_.load(memory_order_acquire); 116 const size_t avail = write_available(write_index, read_index, max_size); 117 118 if (avail == 0) 119 return begin; 120 121 size_t input_count = std::distance(begin, end); 122 input_count = (std::min)(input_count, avail); 123 124 size_t new_write_index = write_index + input_count; 125 126 const ConstIterator last = boost::next(begin, input_count); 
127 128 if (write_index + input_count > max_size) { 129 /* copy data in two sections */ 130 const size_t count0 = max_size - write_index; 131 const ConstIterator midpoint = boost::next(begin, count0); 132 133 std::uninitialized_copy(begin, midpoint, internal_buffer + write_index); 134 std::uninitialized_copy(midpoint, last, internal_buffer); 135 new_write_index -= max_size; 136 } else { 137 std::uninitialized_copy(begin, last, internal_buffer + write_index); 138 139 if (new_write_index == max_size) 140 new_write_index = 0; 141 } 142 143 write_index_.store(new_write_index, memory_order_release); 144 return last; 145 } 146 147 bool pop (T & ret, T * buffer, size_t max_size) 148 { 149 const size_t write_index = write_index_.load(memory_order_acquire); 150 const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread 151 if (empty(write_index, read_index)) 152 return false; 153 154 ret = buffer[read_index]; 155 buffer[read_index].~T(); 156 157 size_t next = next_index(read_index, max_size); 158 read_index_.store(next, memory_order_release); 159 return true; 160 } 161 162 size_t pop (T * output_buffer, size_t output_count, T * internal_buffer, size_t max_size) 163 { 164 const size_t write_index = write_index_.load(memory_order_acquire); 165 const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread 166 167 const size_t avail = read_available(write_index, read_index, max_size); 168 169 if (avail == 0) 170 return 0; 171 172 output_count = (std::min)(output_count, avail); 173 174 size_t new_read_index = read_index + output_count; 175 176 if (read_index + output_count > max_size) { 177 /* copy data in two sections */ 178 const size_t count0 = max_size - read_index; 179 const size_t count1 = output_count - count0; 180 181 copy_and_delete(internal_buffer + read_index, internal_buffer + max_size, output_buffer); 182 copy_and_delete(internal_buffer, internal_buffer + count1, output_buffer + count0); 183 
184 new_read_index -= max_size; 185 } else { 186 copy_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, output_buffer); 187 if (new_read_index == max_size) 188 new_read_index = 0; 189 } 190 191 read_index_.store(new_read_index, memory_order_release); 192 return output_count; 193 } 194 195 template <typename OutputIterator> 196 size_t pop (OutputIterator it, T * internal_buffer, size_t max_size) 197 { 198 const size_t write_index = write_index_.load(memory_order_acquire); 199 const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread 200 201 const size_t avail = read_available(write_index, read_index, max_size); 202 if (avail == 0) 203 return 0; 204 205 size_t new_read_index = read_index + avail; 206 207 if (read_index + avail > max_size) { 208 /* copy data in two sections */ 209 const size_t count0 = max_size - read_index; 210 const size_t count1 = avail - count0; 211 212 it = copy_and_delete(internal_buffer + read_index, internal_buffer + max_size, it); 213 copy_and_delete(internal_buffer, internal_buffer + count1, it); 214 215 new_read_index -= max_size; 216 } else { 217 copy_and_delete(internal_buffer + read_index, internal_buffer + read_index + avail, it); 218 if (new_read_index == max_size) 219 new_read_index = 0; 220 } 221 222 read_index_.store(new_read_index, memory_order_release); 223 return avail; 224 } 225 #endif 226 227 228 public: 229 /** reset the ringbuffer 230 * 231 * \note Not thread-safe 232 * */ 233 void reset(void) 234 { 235 write_index_.store(0, memory_order_relaxed); 236 read_index_.store(0, memory_order_release); 237 } 238 239 /** Check if the ringbuffer is empty 240 * 241 * \return true, if the ringbuffer is empty, false otherwise 242 * \note Due to the concurrent nature of the ringbuffer the result may be inaccurate. 
243 * */ 244 bool empty(void) 245 { 246 return empty(write_index_.load(memory_order_relaxed), read_index_.load(memory_order_relaxed)); 247 } 248 249 /** 250 * \return true, if implementation is lock-free. 251 * 252 * */ 253 bool is_lock_free(void) const 254 { 255 return write_index_.is_lock_free() && read_index_.is_lock_free(); 256 } 257 258 private: 259 bool empty(size_t write_index, size_t read_index) 260 { 261 return write_index == read_index; 262 } 263 264 template< class OutputIterator > 265 OutputIterator copy_and_delete( T * first, T * last, OutputIterator out ) 266 { 267 if (boost::has_trivial_destructor<T>::value) { 268 return std::copy(first, last, out); // will use memcpy if possible 269 } else { 270 for (; first != last; ++first, ++out) { 271 *out = *first; 272 first->~T(); 273 } 274 return out; 275 } 276 } 277 }; 278 279 template <typename T, std::size_t MaxSize> 280 class compile_time_sized_ringbuffer: 281 public ringbuffer_base<T> 282 { 283 typedef std::size_t size_type; 284 static const std::size_t max_size = MaxSize + 1; 285 286 typedef typename boost::aligned_storage<max_size * sizeof(T), 287 boost::alignment_of<T>::value 288 >::type storage_type; 289 290 storage_type storage_; 291 292 T * data() 293 { 294 return static_cast<T*>(storage_.address()); 295 } 296 297 public: 298 bool push(T const & t) 299 { 300 return ringbuffer_base<T>::push(t, data(), max_size); 301 } 302 303 bool pop(T & ret) 304 { 305 return ringbuffer_base<T>::pop(ret, data(), max_size); 306 } 307 308 size_type push(T const * t, size_type size) 309 { 310 return ringbuffer_base<T>::push(t, size, data(), max_size); 311 } 312 313 template <size_type size> 314 size_type push(T const (&t)[size]) 315 { 316 return push(t, size); 317 } 318 319 template <typename ConstIterator> 320 ConstIterator push(ConstIterator begin, ConstIterator end) 321 { 322 return ringbuffer_base<T>::push(begin, end, data(), max_size); 323 } 324 325 size_type pop(T * ret, size_type size) 326 { 327 return 
ringbuffer_base<T>::pop(ret, size, data(), max_size); 328 } 329 330 template <size_type size> 331 size_type pop(T (&ret)[size]) 332 { 333 return pop(ret, size); 334 } 335 336 template <typename OutputIterator> 337 size_type pop(OutputIterator it) 338 { 339 return ringbuffer_base<T>::pop(it, data(), max_size); 340 } 341 }; 342 343 template <typename T, typename Alloc> 344 class runtime_sized_ringbuffer: 345 public ringbuffer_base<T>, 346 private Alloc 347 { 348 typedef std::size_t size_type; 349 size_type max_elements_; 350 typedef typename Alloc::pointer pointer; 351 pointer array_; 352 353 public: 354 explicit runtime_sized_ringbuffer(size_type max_elements): 355 max_elements_(max_elements + 1) 356 { 357 array_ = Alloc::allocate(max_elements_); 358 } 359 360 template <typename U> 361 runtime_sized_ringbuffer(typename Alloc::template rebind<U>::other const & alloc, size_type max_elements): 362 Alloc(alloc), max_elements_(max_elements + 1) 363 { 364 array_ = Alloc::allocate(max_elements_); 365 } 366 367 runtime_sized_ringbuffer(Alloc const & alloc, size_type max_elements): 368 Alloc(alloc), max_elements_(max_elements + 1) 369 { 370 array_ = Alloc::allocate(max_elements_); 371 } 372 373 ~runtime_sized_ringbuffer(void) 374 { 375 // destroy all remaining items 376 T out; 377 while (pop(out)) {}; 378 379 Alloc::deallocate(array_, max_elements_); 380 } 381 382 bool push(T const & t) 383 { 384 return ringbuffer_base<T>::push(t, &*array_, max_elements_); 385 } 386 387 bool pop(T & ret) 388 { 389 return ringbuffer_base<T>::pop(ret, &*array_, max_elements_); 390 } 391 392 size_type push(T const * t, size_type size) 393 { 394 return ringbuffer_base<T>::push(t, size, &*array_, max_elements_); 395 } 396 397 template <size_type size> 398 size_type push(T const (&t)[size]) 399 { 400 return push(t, size); 401 } 402 403 template <typename ConstIterator> 404 ConstIterator push(ConstIterator begin, ConstIterator end) 405 { 406 return ringbuffer_base<T>::push(begin, end, array_, 
max_elements_); 407 } 408 409 size_type pop(T * ret, size_type size) 410 { 411 return ringbuffer_base<T>::pop(ret, size, array_, max_elements_); 412 } 413 414 template <size_type size> 415 size_type pop(T (&ret)[size]) 416 { 417 return pop(ret, size); 418 } 419 420 template <typename OutputIterator> 421 size_type pop(OutputIterator it) 422 { 423 return ringbuffer_base<T>::pop(it, array_, max_elements_); 424 } 425 }; 426 427 template <typename T, typename A0, typename A1> 428 struct make_ringbuffer 429 { 430 typedef typename ringbuffer_signature::bind<A0, A1>::type bound_args; 431 432 typedef extract_capacity<bound_args> extract_capacity_t; 433 434 static const bool runtime_sized = !extract_capacity_t::has_capacity; 435 static const size_t capacity = extract_capacity_t::capacity; 436 437 typedef extract_allocator<bound_args, T> extract_allocator_t; 438 typedef typename extract_allocator_t::type allocator; 439 440 // allocator argument is only sane, for run-time sized ringbuffers 441 BOOST_STATIC_ASSERT((mpl::if_<mpl::bool_<!runtime_sized>, 442 mpl::bool_<!extract_allocator_t::has_allocator>, 443 mpl::true_ 444 >::type::value)); 445 446 typedef typename mpl::if_c<runtime_sized, 447 runtime_sized_ringbuffer<T, allocator>, 448 compile_time_sized_ringbuffer<T, capacity> 449 >::type ringbuffer_type; 450 }; 451 452 453 } /* namespace detail */ 454 455 456 /** The spsc_queue class provides a single-writer/single-reader fifo queue, pushing and popping is wait-free. 457 * 458 * \b Policies: 459 * - \c boost::lockfree::capacity<>, optional <br> 460 * If this template argument is passed to the options, the size of the ringbuffer is set at compile-time. 461 * 462 * - \c boost::lockfree::allocator<>, defaults to \c boost::lockfree::allocator<std::allocator<T>> <br> 463 * Specifies the allocator that is used to allocate the ringbuffer. 
This option is only valid, if the ringbuffer is configured 464 * to be sized at run-time 465 * 466 * \b Requirements: 467 * - T must have a default constructor 468 * - T must be copyable 469 * */ 470 #ifndef BOOST_DOXYGEN_INVOKED 471 template <typename T, 472 class A0 = boost::parameter::void_, 473 class A1 = boost::parameter::void_> 474 #else 475 template <typename T, ...Options> 476 #endif 477 class spsc_queue: 478 public detail::make_ringbuffer<T, A0, A1>::ringbuffer_type 479 { 480 private: 481 482 #ifndef BOOST_DOXYGEN_INVOKED 483 typedef typename detail::make_ringbuffer<T, A0, A1>::ringbuffer_type base_type; 484 static const bool runtime_sized = detail::make_ringbuffer<T, A0, A1>::runtime_sized; 485 typedef typename detail::make_ringbuffer<T, A0, A1>::allocator allocator_arg; 486 487 struct implementation_defined 488 { 489 typedef allocator_arg allocator; 490 typedef std::size_t size_type; 491 }; 492 #endif 493 494 public: 495 typedef T value_type; 496 typedef typename implementation_defined::allocator allocator; 497 typedef typename implementation_defined::size_type size_type; 498 499 /** Constructs a spsc_queue 500 * 501 * \pre spsc_queue must be configured to be sized at compile-time 502 */ 503 // @{ 504 spsc_queue(void) 505 { 506 BOOST_ASSERT(!runtime_sized); 507 } 508 509 template <typename U> 510 explicit spsc_queue(typename allocator::template rebind<U>::other const & alloc) 511 { 512 // just for API compatibility: we don't actually need an allocator 513 BOOST_STATIC_ASSERT(!runtime_sized); 514 } 515 516 explicit spsc_queue(allocator const & alloc) 517 { 518 // just for API compatibility: we don't actually need an allocator 519 BOOST_ASSERT(!runtime_sized); 520 } 521 // @} 522 523 524 /** Constructs a spsc_queue for element_count elements 525 * 526 * \pre spsc_queue must be configured to be sized at run-time 527 */ 528 // @{ 529 explicit spsc_queue(size_type element_count): 530 base_type(element_count) 531 { 532 BOOST_ASSERT(runtime_sized); 533 } 534 
535 template <typename U> 536 spsc_queue(size_type element_count, typename allocator::template rebind<U>::other const & alloc): 537 base_type(alloc, element_count) 538 { 539 BOOST_STATIC_ASSERT(runtime_sized); 540 } 541 542 spsc_queue(size_type element_count, allocator_arg const & alloc): 543 base_type(alloc, element_count) 544 { 545 BOOST_ASSERT(runtime_sized); 546 } 547 // @} 548 549 /** Pushes object t to the ringbuffer. 550 * 551 * \pre only one thread is allowed to push data to the spsc_queue 552 * \post object will be pushed to the spsc_queue, unless it is full. 553 * \return true, if the push operation is successful. 554 * 555 * \note Thread-safe and wait-free 556 * */ 557 bool push(T const & t) 558 { 559 return base_type::push(t); 560 } 561 562 /** Pops one object from ringbuffer. 563 * 564 * \pre only one thread is allowed to pop data to the spsc_queue 565 * \post if ringbuffer is not empty, object will be copied to ret. 566 * \return true, if the pop operation is successful, false if ringbuffer was empty. 567 * 568 * \note Thread-safe and wait-free 569 */ 570 bool pop(T & ret) 571 { 572 return base_type::pop(ret); 573 } 574 575 /** Pushes as many objects from the array t as there is space. 576 * 577 * \pre only one thread is allowed to push data to the spsc_queue 578 * \return number of pushed items 579 * 580 * \note Thread-safe and wait-free 581 */ 582 size_type push(T const * t, size_type size) 583 { 584 return base_type::push(t, size); 585 } 586 587 /** Pushes as many objects from the array t as there is space available. 588 * 589 * \pre only one thread is allowed to push data to the spsc_queue 590 * \return number of pushed items 591 * 592 * \note Thread-safe and wait-free 593 */ 594 template <size_type size> 595 size_type push(T const (&t)[size]) 596 { 597 return push(t, size); 598 } 599 600 /** Pushes as many objects from the range [begin, end) as there is space . 
601 * 602 * \pre only one thread is allowed to push data to the spsc_queue 603 * \return iterator to the first element, which has not been pushed 604 * 605 * \note Thread-safe and wait-free 606 */ 607 template <typename ConstIterator> 608 ConstIterator push(ConstIterator begin, ConstIterator end) 609 { 610 return base_type::push(begin, end); 611 } 612 613 /** Pops a maximum of size objects from ringbuffer. 614 * 615 * \pre only one thread is allowed to pop data to the spsc_queue 616 * \return number of popped items 617 * 618 * \note Thread-safe and wait-free 619 * */ 620 size_type pop(T * ret, size_type size) 621 { 622 return base_type::pop(ret, size); 623 } 624 625 /** Pops a maximum of size objects from spsc_queue. 626 * 627 * \pre only one thread is allowed to pop data to the spsc_queue 628 * \return number of popped items 629 * 630 * \note Thread-safe and wait-free 631 * */ 632 template <size_type size> 633 size_type pop(T (&ret)[size]) 634 { 635 return pop(ret, size); 636 } 637 638 /** Pops objects to the output iterator it 639 * 640 * \pre only one thread is allowed to pop data to the spsc_queue 641 * \return number of popped items 642 * 643 * \note Thread-safe and wait-free 644 * */ 645 template <typename OutputIterator> 646 size_type pop(OutputIterator it) 647 { 648 return base_type::pop(it); 649 } 650 651 /** consumes one element via a functor 652 * 653 * pops one element from the queue and applies the functor on this object 654 * 655 * \returns true, if one element was consumed 656 * 657 * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking 658 * */ 659 template <typename Functor> 660 bool consume_one(Functor & f) 661 { 662 T element; 663 bool success = pop(element); 664 if (success) 665 f(element); 666 667 return success; 668 } 669 670 /// \copydoc boost::lockfree::spsc_queue::consume_one(Functor & rhs) 671 template <typename Functor> 672 bool consume_one(Functor const & f) 673 { 674 T element; 675 bool success = pop(element); 
676 if (success) 677 f(element); 678 679 return success; 680 } 681 682 /** consumes all elements via a functor 683 * 684 * sequentially pops all elements from the queue and applies the functor on each object 685 * 686 * \returns number of elements that are consumed 687 * 688 * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking 689 * */ 690 template <typename Functor> 691 size_type consume_all(Functor & f) 692 { 693 size_type element_count = 0; 694 while (consume_one(f)) 695 element_count += 1; 696 697 return element_count; 698 } 699 700 /// \copydoc boost::lockfree::spsc_queue::consume_all(Functor & rhs) 701 template <typename Functor> 702 size_type consume_all(Functor const & f) 703 { 704 size_type element_count = 0; 705 while (consume_one(f)) 706 element_count += 1; 707 708 return element_count; 709 } 710 }; 711 712 } /* namespace lockfree */ 713 } /* namespace boost */ 714 715 716 #endif /* BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED */