checkqueue.h
1 // Copyright (c) 2012-present The Bitcoin Core developers 2 // Distributed under the MIT software license, see the accompanying 3 // file COPYING or http://www.opensource.org/licenses/mit-license.php. 4 5 #ifndef BITCOIN_CHECKQUEUE_H 6 #define BITCOIN_CHECKQUEUE_H 7 8 #include <sync.h> 9 #include <tinyformat.h> 10 #include <util/log.h> 11 #include <util/threadnames.h> 12 13 #include <algorithm> 14 #include <iterator> 15 #include <optional> 16 #include <vector> 17 18 /** 19 * Queue for verifications that have to be performed. 20 * The verifications are represented by a type T, which must provide an 21 * operator(), returning an std::optional<R>. 22 * 23 * The overall result of the computation is std::nullopt if all invocations 24 * return std::nullopt, or one of the other results otherwise. 25 * 26 * One thread (the master) is assumed to push batches of verifications 27 * onto the queue, where they are processed by N-1 worker threads. When 28 * the master is done adding work, it temporarily joins the worker pool 29 * as an N'th worker, until all jobs are done. 30 * 31 */ 32 template <typename T, typename R = std::remove_cvref_t<decltype(std::declval<T>()().value())>> 33 class CCheckQueue 34 { 35 private: 36 //! Mutex to protect the inner state 37 Mutex m_mutex; 38 39 //! Worker threads block on this when out of work 40 std::condition_variable m_worker_cv; 41 42 //! Master thread blocks on this when out of work 43 std::condition_variable m_master_cv; 44 45 //! The queue of elements to be processed. 46 //! As the order of booleans doesn't matter, it is used as a LIFO (stack) 47 std::vector<T> queue GUARDED_BY(m_mutex); 48 49 //! The number of workers (including the master) that are idle. 50 int nIdle GUARDED_BY(m_mutex){0}; 51 52 //! The total number of workers (including the master). 53 int nTotal GUARDED_BY(m_mutex){0}; 54 55 //! The temporary evaluation result. 56 std::optional<R> m_result GUARDED_BY(m_mutex); 57 58 /** 59 * Number of verifications that haven't completed yet. 60 * This includes elements that are no longer queued, but still in the 61 * worker's own batches. 62 */ 63 unsigned int nTodo GUARDED_BY(m_mutex){0}; 64 65 //! The maximum number of elements to be processed in one batch 66 const unsigned int nBatchSize; 67 68 std::vector<std::thread> m_worker_threads; 69 bool m_request_stop GUARDED_BY(m_mutex){false}; 70 71 /** Internal function that does bulk of the verification work. If fMaster, return the final result. */ 72 std::optional<R> Loop(bool fMaster) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) 73 { 74 std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv; 75 std::vector<T> vChecks; 76 vChecks.reserve(nBatchSize); 77 unsigned int nNow = 0; 78 std::optional<R> local_result; 79 bool do_work; 80 do { 81 { 82 WAIT_LOCK(m_mutex, lock); 83 // first do the clean-up of the previous loop run (allowing us to do it in the same critsect) 84 if (nNow) { 85 if (local_result.has_value() && !m_result.has_value()) { 86 std::swap(local_result, m_result); 87 } 88 nTodo -= nNow; 89 if (nTodo == 0 && !fMaster) { 90 // We processed the last element; inform the master it can exit and return the result 91 m_master_cv.notify_one(); 92 } 93 } else { 94 // first iteration 95 nTotal++; 96 } 97 // logically, the do loop starts here 98 while (queue.empty() && !m_request_stop) { 99 if (fMaster && nTodo == 0) { 100 nTotal--; 101 std::optional<R> to_return = std::move(m_result); 102 // reset the status for new work later 103 m_result = std::nullopt; 104 // return the current status 105 return to_return; 106 } 107 nIdle++; 108 cond.wait(lock); // wait 109 nIdle--; 110 } 111 if (m_request_stop) { 112 // return value does not matter, because m_request_stop is only set in the destructor. 113 return std::nullopt; 114 } 115 116 // Decide how many work units to process now. 117 // * Do not try to do everything at once, but aim for increasingly smaller batches so 118 // all workers finish approximately simultaneously. 119 // * Try to account for idle jobs which will instantly start helping. 120 // * Don't do batches smaller than 1 (duh), or larger than nBatchSize. 121 nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1))); 122 auto start_it = queue.end() - nNow; 123 vChecks.assign(std::make_move_iterator(start_it), std::make_move_iterator(queue.end())); 124 queue.erase(start_it, queue.end()); 125 // Check whether we need to do work at all 126 do_work = !m_result.has_value(); 127 } 128 // execute work 129 if (do_work) { 130 for (T& check : vChecks) { 131 local_result = check(); 132 if (local_result.has_value()) break; 133 } 134 } 135 vChecks.clear(); 136 } while (true); 137 } 138 139 public: 140 //! Mutex to ensure only one concurrent CCheckQueueControl 141 Mutex m_control_mutex; 142 143 //! Create a new check queue 144 explicit CCheckQueue(unsigned int batch_size, int worker_threads_num) 145 : nBatchSize(batch_size) 146 { 147 LogInfo("Script verification uses %d additional threads", worker_threads_num); 148 m_worker_threads.reserve(worker_threads_num); 149 for (int n = 0; n < worker_threads_num; ++n) { 150 m_worker_threads.emplace_back([this, n]() { 151 util::ThreadRename(strprintf("scriptch.%i", n)); 152 Loop(false /* worker thread */); 153 }); 154 } 155 } 156 157 // Since this class manages its own resources, which is a thread 158 // pool `m_worker_threads`, copy and move operations are not appropriate. 159 CCheckQueue(const CCheckQueue&) = delete; 160 CCheckQueue& operator=(const CCheckQueue&) = delete; 161 CCheckQueue(CCheckQueue&&) = delete; 162 CCheckQueue& operator=(CCheckQueue&&) = delete; 163 164 //! Join the execution until completion. If at least one evaluation wasn't successful, return 165 //! its error. 166 std::optional<R> Complete() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) 167 { 168 return Loop(true /* master thread */); 169 } 170 171 //! Add a batch of checks to the queue 172 void Add(std::vector<T>&& vChecks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) 173 { 174 if (vChecks.empty()) { 175 return; 176 } 177 178 { 179 LOCK(m_mutex); 180 queue.insert(queue.end(), std::make_move_iterator(vChecks.begin()), std::make_move_iterator(vChecks.end())); 181 nTodo += vChecks.size(); 182 } 183 184 if (vChecks.size() == 1) { 185 m_worker_cv.notify_one(); 186 } else { 187 m_worker_cv.notify_all(); 188 } 189 } 190 191 ~CCheckQueue() 192 { 193 WITH_LOCK(m_mutex, m_request_stop = true); 194 m_worker_cv.notify_all(); 195 for (std::thread& t : m_worker_threads) { 196 t.join(); 197 } 198 } 199 200 bool HasThreads() const { return !m_worker_threads.empty(); } 201 }; 202 203 /** 204 * RAII-style controller object for a CCheckQueue that guarantees the passed 205 * queue is finished before continuing. 206 */ 207 template <typename T, typename R = std::remove_cvref_t<decltype(std::declval<T>()().value())>> 208 class SCOPED_LOCKABLE CCheckQueueControl 209 { 210 private: 211 CCheckQueue<T, R>& m_queue; 212 UniqueLock<Mutex> m_lock; 213 bool fDone; 214 215 public: 216 CCheckQueueControl() = delete; 217 CCheckQueueControl(const CCheckQueueControl&) = delete; 218 CCheckQueueControl& operator=(const CCheckQueueControl&) = delete; 219 explicit CCheckQueueControl(CCheckQueue<T>& queueIn) EXCLUSIVE_LOCK_FUNCTION(queueIn.m_control_mutex) : m_queue(queueIn), m_lock(LOCK_ARGS(queueIn.m_control_mutex)), fDone(false) {} 220 221 std::optional<R> Complete() 222 { 223 auto ret = m_queue.Complete(); 224 fDone = true; 225 return ret; 226 } 227 228 void Add(std::vector<T>&& vChecks) 229 { 230 m_queue.Add(std::move(vChecks)); 231 } 232 233 ~CCheckQueueControl() UNLOCK_FUNCTION() 234 { 235 if (!fDone) 236 Complete(); 237 } 238 }; 239 240 #endif // BITCOIN_CHECKQUEUE_H