/ src / checkqueue.h
checkqueue.h
  1  // Copyright (c) 2012-2022 The Bitcoin Core developers
  2  // Distributed under the MIT software license, see the accompanying
  3  // file COPYING or http://www.opensource.org/licenses/mit-license.php.
  4  
  5  #ifndef BITCOIN_CHECKQUEUE_H
  6  #define BITCOIN_CHECKQUEUE_H
  7  
  8  #include <sync.h>
  9  #include <tinyformat.h>
 10  #include <util/threadnames.h>
 11  
 12  #include <algorithm>
 13  #include <iterator>
 14  #include <vector>
 15  
 16  /**
 17   * Queue for verifications that have to be performed.
 18    * The verifications are represented by a type T, which must provide an
 19    * operator(), returning a bool.
 20    *
 21    * One thread (the master) is assumed to push batches of verifications
 22    * onto the queue, where they are processed by N-1 worker threads. When
 23    * the master is done adding work, it temporarily joins the worker pool
 24    * as an N'th worker, until all jobs are done.
 25    */
 26  template <typename T>
 27  class CCheckQueue
 28  {
 29  private:
 30      //! Mutex to protect the inner state
 31      Mutex m_mutex;
 32  
 33      //! Worker threads block on this when out of work
 34      std::condition_variable m_worker_cv;
 35  
 36      //! Master thread blocks on this when out of work
 37      std::condition_variable m_master_cv;
 38  
 39      //! The queue of elements to be processed.
 40      //! As the order of booleans doesn't matter, it is used as a LIFO (stack)
 41      std::vector<T> queue GUARDED_BY(m_mutex);
 42  
 43      //! The number of workers (including the master) that are idle.
 44      int nIdle GUARDED_BY(m_mutex){0};
 45  
 46      //! The total number of workers (including the master).
 47      int nTotal GUARDED_BY(m_mutex){0};
 48  
 49      //! The temporary evaluation result.
 50      bool fAllOk GUARDED_BY(m_mutex){true};
 51  
 52      /**
 53       * Number of verifications that haven't completed yet.
 54       * This includes elements that are no longer queued, but still in the
 55       * worker's own batches.
 56       */
 57      unsigned int nTodo GUARDED_BY(m_mutex){0};
 58  
 59      //! The maximum number of elements to be processed in one batch
 60      const unsigned int nBatchSize;
 61  
 62      std::vector<std::thread> m_worker_threads;
 63      bool m_request_stop GUARDED_BY(m_mutex){false};
 64  
 65      /** Internal function that does bulk of the verification work. */
 66      bool Loop(bool fMaster) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
 67      {
 68          std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv;
 69          std::vector<T> vChecks;
 70          vChecks.reserve(nBatchSize);
 71          unsigned int nNow = 0;
 72          bool fOk = true;
 73          do {
 74              {
 75                  WAIT_LOCK(m_mutex, lock);
 76                  // first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
 77                  if (nNow) {
 78                      fAllOk &= fOk;
 79                      nTodo -= nNow;
 80                      if (nTodo == 0 && !fMaster)
 81                          // We processed the last element; inform the master it can exit and return the result
 82                          m_master_cv.notify_one();
 83                  } else {
 84                      // first iteration
 85                      nTotal++;
 86                  }
 87                  // logically, the do loop starts here
 88                  while (queue.empty() && !m_request_stop) {
 89                      if (fMaster && nTodo == 0) {
 90                          nTotal--;
 91                          bool fRet = fAllOk;
 92                          // reset the status for new work later
 93                          fAllOk = true;
 94                          // return the current status
 95                          return fRet;
 96                      }
 97                      nIdle++;
 98                      cond.wait(lock); // wait
 99                      nIdle--;
100                  }
101                  if (m_request_stop) {
102                      return false;
103                  }
104  
105                  // Decide how many work units to process now.
106                  // * Do not try to do everything at once, but aim for increasingly smaller batches so
107                  //   all workers finish approximately simultaneously.
108                  // * Try to account for idle jobs which will instantly start helping.
109                  // * Don't do batches smaller than 1 (duh), or larger than nBatchSize.
110                  nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1)));
111                  auto start_it = queue.end() - nNow;
112                  vChecks.assign(std::make_move_iterator(start_it), std::make_move_iterator(queue.end()));
113                  queue.erase(start_it, queue.end());
114                  // Check whether we need to do work at all
115                  fOk = fAllOk;
116              }
117              // execute work
118              for (T& check : vChecks)
119                  if (fOk)
120                      fOk = check();
121              vChecks.clear();
122          } while (true);
123      }
124  
125  public:
126      //! Mutex to ensure only one concurrent CCheckQueueControl
127      Mutex m_control_mutex;
128  
129      //! Create a new check queue
130      explicit CCheckQueue(unsigned int batch_size, int worker_threads_num)
131          : nBatchSize(batch_size)
132      {
133          m_worker_threads.reserve(worker_threads_num);
134          for (int n = 0; n < worker_threads_num; ++n) {
135              m_worker_threads.emplace_back([this, n]() {
136                  util::ThreadRename(strprintf("scriptch.%i", n));
137                  Loop(false /* worker thread */);
138              });
139          }
140      }
141  
142      // Since this class manages its own resources, which is a thread
143      // pool `m_worker_threads`, copy and move operations are not appropriate.
144      CCheckQueue(const CCheckQueue&) = delete;
145      CCheckQueue& operator=(const CCheckQueue&) = delete;
146      CCheckQueue(CCheckQueue&&) = delete;
147      CCheckQueue& operator=(CCheckQueue&&) = delete;
148  
149      //! Wait until execution finishes, and return whether all evaluations were successful.
150      bool Wait() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
151      {
152          return Loop(true /* master thread */);
153      }
154  
155      //! Add a batch of checks to the queue
156      void Add(std::vector<T>&& vChecks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
157      {
158          if (vChecks.empty()) {
159              return;
160          }
161  
162          {
163              LOCK(m_mutex);
164              queue.insert(queue.end(), std::make_move_iterator(vChecks.begin()), std::make_move_iterator(vChecks.end()));
165              nTodo += vChecks.size();
166          }
167  
168          if (vChecks.size() == 1) {
169              m_worker_cv.notify_one();
170          } else {
171              m_worker_cv.notify_all();
172          }
173      }
174  
175      ~CCheckQueue()
176      {
177          WITH_LOCK(m_mutex, m_request_stop = true);
178          m_worker_cv.notify_all();
179          for (std::thread& t : m_worker_threads) {
180              t.join();
181          }
182      }
183  
184      bool HasThreads() const { return !m_worker_threads.empty(); }
185  };
186  
187  /**
188   * RAII-style controller object for a CCheckQueue that guarantees the passed
189   * queue is finished before continuing.
190   */
191  template <typename T>
192  class CCheckQueueControl
193  {
194  private:
195      CCheckQueue<T> * const pqueue;
196      bool fDone;
197  
198  public:
199      CCheckQueueControl() = delete;
200      CCheckQueueControl(const CCheckQueueControl&) = delete;
201      CCheckQueueControl& operator=(const CCheckQueueControl&) = delete;
202      explicit CCheckQueueControl(CCheckQueue<T> * const pqueueIn) : pqueue(pqueueIn), fDone(false)
203      {
204          // passed queue is supposed to be unused, or nullptr
205          if (pqueue != nullptr) {
206              ENTER_CRITICAL_SECTION(pqueue->m_control_mutex);
207          }
208      }
209  
210      bool Wait()
211      {
212          if (pqueue == nullptr)
213              return true;
214          bool fRet = pqueue->Wait();
215          fDone = true;
216          return fRet;
217      }
218  
219      void Add(std::vector<T>&& vChecks)
220      {
221          if (pqueue != nullptr) {
222              pqueue->Add(std::move(vChecks));
223          }
224      }
225  
226      ~CCheckQueueControl()
227      {
228          if (!fDone)
229              Wait();
230          if (pqueue != nullptr) {
231              LEAVE_CRITICAL_SECTION(pqueue->m_control_mutex);
232          }
233      }
234  };
235  
236  #endif // BITCOIN_CHECKQUEUE_H