threadsafe_queue.h
1 // Copyright 2010 Dolphin Emulator Project 2 // Licensed under GPLv2+ 3 // Refer to the license.txt file included. 4 5 #pragma once 6 7 // a simple lockless thread-safe, 8 // single reader, single writer queue 9 10 #include <atomic> 11 #include <condition_variable> 12 #include <cstddef> 13 #include <mutex> 14 #include <utility> 15 16 #include "common/polyfill_thread.h" 17 18 namespace Common { 19 template <typename T, bool with_stop_token = false> 20 class SPSCQueue { 21 public: 22 SPSCQueue() { 23 write_ptr = read_ptr = new ElementPtr(); 24 } 25 ~SPSCQueue() { 26 // this will empty out the whole queue 27 delete read_ptr; 28 } 29 30 [[nodiscard]] std::size_t Size() const { 31 return size.load(); 32 } 33 34 [[nodiscard]] bool Empty() const { 35 return Size() == 0; 36 } 37 38 [[nodiscard]] T& Front() const { 39 return read_ptr->current; 40 } 41 42 template <typename Arg> 43 void Push(Arg&& t) { 44 // create the element, add it to the queue 45 write_ptr->current = std::forward<Arg>(t); 46 // set the next pointer to a new element ptr 47 // then advance the write pointer 48 ElementPtr* new_ptr = new ElementPtr(); 49 write_ptr->next.store(new_ptr, std::memory_order_release); 50 write_ptr = new_ptr; 51 ++size; 52 53 // cv_mutex must be held or else there will be a missed wakeup if the other thread is in the 54 // line before cv.wait 55 // TODO(bunnei): This can be replaced with C++20 waitable atomics when properly supported. 56 // See discussion on https://github.com/yuzu-emu/yuzu/pull/3173 for details. 57 std::scoped_lock lock{cv_mutex}; 58 cv.notify_one(); 59 } 60 61 void Pop() { 62 --size; 63 64 ElementPtr* tmpptr = read_ptr; 65 // advance the read pointer 66 read_ptr = tmpptr->next.load(); 67 // set the next element to nullptr to stop the recursive deletion 68 tmpptr->next.store(nullptr); 69 delete tmpptr; // this also deletes the element 70 } 71 72 bool Pop(T& t) { 73 if (Empty()) 74 return false; 75 76 --size; 77 78 ElementPtr* tmpptr = read_ptr; 79 read_ptr = tmpptr->next.load(std::memory_order_acquire); 80 t = std::move(tmpptr->current); 81 tmpptr->next.store(nullptr); 82 delete tmpptr; 83 return true; 84 } 85 86 T PopWait() { 87 if (Empty()) { 88 std::unique_lock lock{cv_mutex}; 89 cv.wait(lock, [this]() { return !Empty(); }); 90 } 91 T t; 92 Pop(t); 93 return t; 94 } 95 96 T PopWait(std::stop_token stop_token) { 97 if (Empty()) { 98 std::unique_lock lock{cv_mutex}; 99 CondvarWait(cv, lock, stop_token, [this] { return !Empty(); }); 100 } 101 if (stop_token.stop_requested()) { 102 return T{}; 103 } 104 T t; 105 Pop(t); 106 return t; 107 } 108 109 // not thread-safe 110 void Clear() { 111 size.store(0); 112 delete read_ptr; 113 write_ptr = read_ptr = new ElementPtr(); 114 } 115 116 private: 117 // stores a pointer to element 118 // and a pointer to the next ElementPtr 119 class ElementPtr { 120 public: 121 ElementPtr() = default; 122 ~ElementPtr() { 123 ElementPtr* next_ptr = next.load(); 124 125 if (next_ptr) 126 delete next_ptr; 127 } 128 129 T current; 130 std::atomic<ElementPtr*> next{nullptr}; 131 }; 132 133 ElementPtr* write_ptr; 134 ElementPtr* read_ptr; 135 std::atomic_size_t size{0}; 136 std::mutex cv_mutex; 137 std::conditional_t<with_stop_token, std::condition_variable_any, std::condition_variable> cv; 138 }; 139 140 // a simple thread-safe, 141 // single reader, multiple writer queue 142 143 template <typename T, bool with_stop_token = false> 144 class MPSCQueue { 145 public: 146 [[nodiscard]] std::size_t Size() const { 147 return spsc_queue.Size(); 148 } 149 150 [[nodiscard]] bool Empty() const { 151 return spsc_queue.Empty(); 152 } 153 154 [[nodiscard]] T& Front() const { 155 return spsc_queue.Front(); 156 } 157 158 template <typename Arg> 159 void Push(Arg&& t) { 160 std::scoped_lock lock{write_lock}; 161 spsc_queue.Push(t); 162 } 163 164 void Pop() { 165 return spsc_queue.Pop(); 166 } 167 168 bool Pop(T& t) { 169 return spsc_queue.Pop(t); 170 } 171 172 T PopWait() { 173 return spsc_queue.PopWait(); 174 } 175 176 T PopWait(std::stop_token stop_token) { 177 return spsc_queue.PopWait(stop_token); 178 } 179 180 // not thread-safe 181 void Clear() { 182 spsc_queue.Clear(); 183 } 184 185 private: 186 SPSCQueue<T, with_stop_token> spsc_queue; 187 std::mutex write_lock; 188 }; 189 } // namespace Common