// checkqueue.h
1 // Copyright (c) 2012-2022 The Bitcoin Core developers 2 // Distributed under the MIT software license, see the accompanying 3 // file COPYING or http://www.opensource.org/licenses/mit-license.php. 4 5 #ifndef BITCOIN_CHECKQUEUE_H 6 #define BITCOIN_CHECKQUEUE_H 7 8 #include <sync.h> 9 #include <tinyformat.h> 10 #include <util/threadnames.h> 11 12 #include <algorithm> 13 #include <iterator> 14 #include <vector> 15 16 /** 17 * Queue for verifications that have to be performed. 18 * The verifications are represented by a type T, which must provide an 19 * operator(), returning a bool. 20 * 21 * One thread (the master) is assumed to push batches of verifications 22 * onto the queue, where they are processed by N-1 worker threads. When 23 * the master is done adding work, it temporarily joins the worker pool 24 * as an N'th worker, until all jobs are done. 25 */ 26 template <typename T> 27 class CCheckQueue 28 { 29 private: 30 //! Mutex to protect the inner state 31 Mutex m_mutex; 32 33 //! Worker threads block on this when out of work 34 std::condition_variable m_worker_cv; 35 36 //! Master thread blocks on this when out of work 37 std::condition_variable m_master_cv; 38 39 //! The queue of elements to be processed. 40 //! As the order of booleans doesn't matter, it is used as a LIFO (stack) 41 std::vector<T> queue GUARDED_BY(m_mutex); 42 43 //! The number of workers (including the master) that are idle. 44 int nIdle GUARDED_BY(m_mutex){0}; 45 46 //! The total number of workers (including the master). 47 int nTotal GUARDED_BY(m_mutex){0}; 48 49 //! The temporary evaluation result. 50 bool fAllOk GUARDED_BY(m_mutex){true}; 51 52 /** 53 * Number of verifications that haven't completed yet. 54 * This includes elements that are no longer queued, but still in the 55 * worker's own batches. 56 */ 57 unsigned int nTodo GUARDED_BY(m_mutex){0}; 58 59 //! 
The maximum number of elements to be processed in one batch 60 const unsigned int nBatchSize; 61 62 std::vector<std::thread> m_worker_threads; 63 bool m_request_stop GUARDED_BY(m_mutex){false}; 64 65 /** Internal function that does bulk of the verification work. */ 66 bool Loop(bool fMaster) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) 67 { 68 std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv; 69 std::vector<T> vChecks; 70 vChecks.reserve(nBatchSize); 71 unsigned int nNow = 0; 72 bool fOk = true; 73 do { 74 { 75 WAIT_LOCK(m_mutex, lock); 76 // first do the clean-up of the previous loop run (allowing us to do it in the same critsect) 77 if (nNow) { 78 fAllOk &= fOk; 79 nTodo -= nNow; 80 if (nTodo == 0 && !fMaster) 81 // We processed the last element; inform the master it can exit and return the result 82 m_master_cv.notify_one(); 83 } else { 84 // first iteration 85 nTotal++; 86 } 87 // logically, the do loop starts here 88 while (queue.empty() && !m_request_stop) { 89 if (fMaster && nTodo == 0) { 90 nTotal--; 91 bool fRet = fAllOk; 92 // reset the status for new work later 93 fAllOk = true; 94 // return the current status 95 return fRet; 96 } 97 nIdle++; 98 cond.wait(lock); // wait 99 nIdle--; 100 } 101 if (m_request_stop) { 102 return false; 103 } 104 105 // Decide how many work units to process now. 106 // * Do not try to do everything at once, but aim for increasingly smaller batches so 107 // all workers finish approximately simultaneously. 108 // * Try to account for idle jobs which will instantly start helping. 109 // * Don't do batches smaller than 1 (duh), or larger than nBatchSize. 
110 nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1))); 111 auto start_it = queue.end() - nNow; 112 vChecks.assign(std::make_move_iterator(start_it), std::make_move_iterator(queue.end())); 113 queue.erase(start_it, queue.end()); 114 // Check whether we need to do work at all 115 fOk = fAllOk; 116 } 117 // execute work 118 for (T& check : vChecks) 119 if (fOk) 120 fOk = check(); 121 vChecks.clear(); 122 } while (true); 123 } 124 125 public: 126 //! Mutex to ensure only one concurrent CCheckQueueControl 127 Mutex m_control_mutex; 128 129 //! Create a new check queue 130 explicit CCheckQueue(unsigned int batch_size, int worker_threads_num) 131 : nBatchSize(batch_size) 132 { 133 m_worker_threads.reserve(worker_threads_num); 134 for (int n = 0; n < worker_threads_num; ++n) { 135 m_worker_threads.emplace_back([this, n]() { 136 util::ThreadRename(strprintf("scriptch.%i", n)); 137 Loop(false /* worker thread */); 138 }); 139 } 140 } 141 142 // Since this class manages its own resources, which is a thread 143 // pool `m_worker_threads`, copy and move operations are not appropriate. 144 CCheckQueue(const CCheckQueue&) = delete; 145 CCheckQueue& operator=(const CCheckQueue&) = delete; 146 CCheckQueue(CCheckQueue&&) = delete; 147 CCheckQueue& operator=(CCheckQueue&&) = delete; 148 149 //! Wait until execution finishes, and return whether all evaluations were successful. 150 bool Wait() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) 151 { 152 return Loop(true /* master thread */); 153 } 154 155 //! 
Add a batch of checks to the queue 156 void Add(std::vector<T>&& vChecks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) 157 { 158 if (vChecks.empty()) { 159 return; 160 } 161 162 { 163 LOCK(m_mutex); 164 queue.insert(queue.end(), std::make_move_iterator(vChecks.begin()), std::make_move_iterator(vChecks.end())); 165 nTodo += vChecks.size(); 166 } 167 168 if (vChecks.size() == 1) { 169 m_worker_cv.notify_one(); 170 } else { 171 m_worker_cv.notify_all(); 172 } 173 } 174 175 ~CCheckQueue() 176 { 177 WITH_LOCK(m_mutex, m_request_stop = true); 178 m_worker_cv.notify_all(); 179 for (std::thread& t : m_worker_threads) { 180 t.join(); 181 } 182 } 183 184 bool HasThreads() const { return !m_worker_threads.empty(); } 185 }; 186 187 /** 188 * RAII-style controller object for a CCheckQueue that guarantees the passed 189 * queue is finished before continuing. 190 */ 191 template <typename T> 192 class CCheckQueueControl 193 { 194 private: 195 CCheckQueue<T> * const pqueue; 196 bool fDone; 197 198 public: 199 CCheckQueueControl() = delete; 200 CCheckQueueControl(const CCheckQueueControl&) = delete; 201 CCheckQueueControl& operator=(const CCheckQueueControl&) = delete; 202 explicit CCheckQueueControl(CCheckQueue<T> * const pqueueIn) : pqueue(pqueueIn), fDone(false) 203 { 204 // passed queue is supposed to be unused, or nullptr 205 if (pqueue != nullptr) { 206 ENTER_CRITICAL_SECTION(pqueue->m_control_mutex); 207 } 208 } 209 210 bool Wait() 211 { 212 if (pqueue == nullptr) 213 return true; 214 bool fRet = pqueue->Wait(); 215 fDone = true; 216 return fRet; 217 } 218 219 void Add(std::vector<T>&& vChecks) 220 { 221 if (pqueue != nullptr) { 222 pqueue->Add(std::move(vChecks)); 223 } 224 } 225 226 ~CCheckQueueControl() 227 { 228 if (!fDone) 229 Wait(); 230 if (pqueue != nullptr) { 231 LEAVE_CRITICAL_SECTION(pqueue->m_control_mutex); 232 } 233 } 234 }; 235 236 #endif // BITCOIN_CHECKQUEUE_H