/ src / common / threadsafe_queue.h
threadsafe_queue.h
  1  // Copyright 2010 Dolphin Emulator Project
  2  // Licensed under GPLv2+
  3  // Refer to the license.txt file included.
  4  
  5  #pragma once
  6  
  7  // a simple lockless thread-safe,
  8  // single reader, single writer queue
  9  
 10  #include <atomic>
 11  #include <condition_variable>
 12  #include <cstddef>
 13  #include <mutex>
 14  #include <utility>
 15  
 16  #include "common/polyfill_thread.h"
 17  
 18  namespace Common {
 19  template <typename T, bool with_stop_token = false>
 20  class SPSCQueue {
 21  public:
 22      SPSCQueue() {
 23          write_ptr = read_ptr = new ElementPtr();
 24      }
 25      ~SPSCQueue() {
 26          // this will empty out the whole queue
 27          delete read_ptr;
 28      }
 29  
 30      [[nodiscard]] std::size_t Size() const {
 31          return size.load();
 32      }
 33  
 34      [[nodiscard]] bool Empty() const {
 35          return Size() == 0;
 36      }
 37  
 38      [[nodiscard]] T& Front() const {
 39          return read_ptr->current;
 40      }
 41  
 42      template <typename Arg>
 43      void Push(Arg&& t) {
 44          // create the element, add it to the queue
 45          write_ptr->current = std::forward<Arg>(t);
 46          // set the next pointer to a new element ptr
 47          // then advance the write pointer
 48          ElementPtr* new_ptr = new ElementPtr();
 49          write_ptr->next.store(new_ptr, std::memory_order_release);
 50          write_ptr = new_ptr;
 51          ++size;
 52  
 53          // cv_mutex must be held or else there will be a missed wakeup if the other thread is in the
 54          // line before cv.wait
 55          // TODO(bunnei): This can be replaced with C++20 waitable atomics when properly supported.
 56          // See discussion on https://github.com/yuzu-emu/yuzu/pull/3173 for details.
 57          std::scoped_lock lock{cv_mutex};
 58          cv.notify_one();
 59      }
 60  
 61      void Pop() {
 62          --size;
 63  
 64          ElementPtr* tmpptr = read_ptr;
 65          // advance the read pointer
 66          read_ptr = tmpptr->next.load();
 67          // set the next element to nullptr to stop the recursive deletion
 68          tmpptr->next.store(nullptr);
 69          delete tmpptr; // this also deletes the element
 70      }
 71  
 72      bool Pop(T& t) {
 73          if (Empty())
 74              return false;
 75  
 76          --size;
 77  
 78          ElementPtr* tmpptr = read_ptr;
 79          read_ptr = tmpptr->next.load(std::memory_order_acquire);
 80          t = std::move(tmpptr->current);
 81          tmpptr->next.store(nullptr);
 82          delete tmpptr;
 83          return true;
 84      }
 85  
 86      T PopWait() {
 87          if (Empty()) {
 88              std::unique_lock lock{cv_mutex};
 89              cv.wait(lock, [this]() { return !Empty(); });
 90          }
 91          T t;
 92          Pop(t);
 93          return t;
 94      }
 95  
 96      T PopWait(std::stop_token stop_token) {
 97          if (Empty()) {
 98              std::unique_lock lock{cv_mutex};
 99              CondvarWait(cv, lock, stop_token, [this] { return !Empty(); });
100          }
101          if (stop_token.stop_requested()) {
102              return T{};
103          }
104          T t;
105          Pop(t);
106          return t;
107      }
108  
109      // not thread-safe
110      void Clear() {
111          size.store(0);
112          delete read_ptr;
113          write_ptr = read_ptr = new ElementPtr();
114      }
115  
116  private:
117      // stores a pointer to element
118      // and a pointer to the next ElementPtr
119      class ElementPtr {
120      public:
121          ElementPtr() = default;
122          ~ElementPtr() {
123              ElementPtr* next_ptr = next.load();
124  
125              if (next_ptr)
126                  delete next_ptr;
127          }
128  
129          T current;
130          std::atomic<ElementPtr*> next{nullptr};
131      };
132  
133      ElementPtr* write_ptr;
134      ElementPtr* read_ptr;
135      std::atomic_size_t size{0};
136      std::mutex cv_mutex;
137      std::conditional_t<with_stop_token, std::condition_variable_any, std::condition_variable> cv;
138  };
139  
140  // a simple thread-safe,
141  // single reader, multiple writer queue
142  
143  template <typename T, bool with_stop_token = false>
144  class MPSCQueue {
145  public:
146      [[nodiscard]] std::size_t Size() const {
147          return spsc_queue.Size();
148      }
149  
150      [[nodiscard]] bool Empty() const {
151          return spsc_queue.Empty();
152      }
153  
154      [[nodiscard]] T& Front() const {
155          return spsc_queue.Front();
156      }
157  
158      template <typename Arg>
159      void Push(Arg&& t) {
160          std::scoped_lock lock{write_lock};
161          spsc_queue.Push(t);
162      }
163  
164      void Pop() {
165          return spsc_queue.Pop();
166      }
167  
168      bool Pop(T& t) {
169          return spsc_queue.Pop(t);
170      }
171  
172      T PopWait() {
173          return spsc_queue.PopWait();
174      }
175  
176      T PopWait(std::stop_token stop_token) {
177          return spsc_queue.PopWait(stop_token);
178      }
179  
180      // not thread-safe
181      void Clear() {
182          spsc_queue.Clear();
183      }
184  
185  private:
186      SPSCQueue<T, with_stop_token> spsc_queue;
187      std::mutex write_lock;
188  };
189  } // namespace Common