/ src / checkqueue.h
checkqueue.h
  1  // Copyright (c) 2012-present The Bitcoin Core developers
  2  // Distributed under the MIT software license, see the accompanying
  3  // file COPYING or http://www.opensource.org/licenses/mit-license.php.
  4  
  5  #ifndef BITCOIN_CHECKQUEUE_H
  6  #define BITCOIN_CHECKQUEUE_H
  7  
  8  #include <sync.h>
  9  #include <tinyformat.h>
 10  #include <util/log.h>
 11  #include <util/threadnames.h>
 12  
 13  #include <algorithm>
 14  #include <iterator>
 15  #include <optional>
 16  #include <vector>
 17  
 18  /**
 19   * Queue for verifications that have to be performed.
 20    * The verifications are represented by a type T, which must provide an
 21    * operator(), returning an std::optional<R>.
 22    *
 23    * The overall result of the computation is std::nullopt if all invocations
 24    * return std::nullopt, or one of the other results otherwise.
 25    *
 26    * One thread (the master) is assumed to push batches of verifications
 27    * onto the queue, where they are processed by N-1 worker threads. When
 28    * the master is done adding work, it temporarily joins the worker pool
 29    * as an N'th worker, until all jobs are done.
 30    *
 31    */
 32  template <typename T, typename R = std::remove_cvref_t<decltype(std::declval<T>()().value())>>
 33  class CCheckQueue
 34  {
 35  private:
 36      //! Mutex to protect the inner state
 37      Mutex m_mutex;
 38  
 39      //! Worker threads block on this when out of work
 40      std::condition_variable m_worker_cv;
 41  
 42      //! Master thread blocks on this when out of work
 43      std::condition_variable m_master_cv;
 44  
 45      //! The queue of elements to be processed.
 46      //! As the order of booleans doesn't matter, it is used as a LIFO (stack)
 47      std::vector<T> queue GUARDED_BY(m_mutex);
 48  
 49      //! The number of workers (including the master) that are idle.
 50      int nIdle GUARDED_BY(m_mutex){0};
 51  
 52      //! The total number of workers (including the master).
 53      int nTotal GUARDED_BY(m_mutex){0};
 54  
 55      //! The temporary evaluation result.
 56      std::optional<R> m_result GUARDED_BY(m_mutex);
 57  
 58      /**
 59       * Number of verifications that haven't completed yet.
 60       * This includes elements that are no longer queued, but still in the
 61       * worker's own batches.
 62       */
 63      unsigned int nTodo GUARDED_BY(m_mutex){0};
 64  
 65      //! The maximum number of elements to be processed in one batch
 66      const unsigned int nBatchSize;
 67  
 68      std::vector<std::thread> m_worker_threads;
 69      bool m_request_stop GUARDED_BY(m_mutex){false};
 70  
 71      /** Internal function that does bulk of the verification work. If fMaster, return the final result. */
 72      std::optional<R> Loop(bool fMaster) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
 73      {
 74          std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv;
 75          std::vector<T> vChecks;
 76          vChecks.reserve(nBatchSize);
 77          unsigned int nNow = 0;
 78          std::optional<R> local_result;
 79          bool do_work;
 80          do {
 81              {
 82                  WAIT_LOCK(m_mutex, lock);
 83                  // first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
 84                  if (nNow) {
 85                      if (local_result.has_value() && !m_result.has_value()) {
 86                          std::swap(local_result, m_result);
 87                      }
 88                      nTodo -= nNow;
 89                      if (nTodo == 0 && !fMaster) {
 90                          // We processed the last element; inform the master it can exit and return the result
 91                          m_master_cv.notify_one();
 92                      }
 93                  } else {
 94                      // first iteration
 95                      nTotal++;
 96                  }
 97                  // logically, the do loop starts here
 98                  while (queue.empty() && !m_request_stop) {
 99                      if (fMaster && nTodo == 0) {
100                          nTotal--;
101                          std::optional<R> to_return = std::move(m_result);
102                          // reset the status for new work later
103                          m_result = std::nullopt;
104                          // return the current status
105                          return to_return;
106                      }
107                      nIdle++;
108                      cond.wait(lock); // wait
109                      nIdle--;
110                  }
111                  if (m_request_stop) {
112                      // return value does not matter, because m_request_stop is only set in the destructor.
113                      return std::nullopt;
114                  }
115  
116                  // Decide how many work units to process now.
117                  // * Do not try to do everything at once, but aim for increasingly smaller batches so
118                  //   all workers finish approximately simultaneously.
119                  // * Try to account for idle jobs which will instantly start helping.
120                  // * Don't do batches smaller than 1 (duh), or larger than nBatchSize.
121                  nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1)));
122                  auto start_it = queue.end() - nNow;
123                  vChecks.assign(std::make_move_iterator(start_it), std::make_move_iterator(queue.end()));
124                  queue.erase(start_it, queue.end());
125                  // Check whether we need to do work at all
126                  do_work = !m_result.has_value();
127              }
128              // execute work
129              if (do_work) {
130                  for (T& check : vChecks) {
131                      local_result = check();
132                      if (local_result.has_value()) break;
133                  }
134              }
135              vChecks.clear();
136          } while (true);
137      }
138  
139  public:
140      //! Mutex to ensure only one concurrent CCheckQueueControl
141      Mutex m_control_mutex;
142  
143      //! Create a new check queue
144      explicit CCheckQueue(unsigned int batch_size, int worker_threads_num)
145          : nBatchSize(batch_size)
146      {
147          LogInfo("Script verification uses %d additional threads", worker_threads_num);
148          m_worker_threads.reserve(worker_threads_num);
149          for (int n = 0; n < worker_threads_num; ++n) {
150              m_worker_threads.emplace_back([this, n]() {
151                  util::ThreadRename(strprintf("scriptch.%i", n));
152                  Loop(false /* worker thread */);
153              });
154          }
155      }
156  
157      // Since this class manages its own resources, which is a thread
158      // pool `m_worker_threads`, copy and move operations are not appropriate.
159      CCheckQueue(const CCheckQueue&) = delete;
160      CCheckQueue& operator=(const CCheckQueue&) = delete;
161      CCheckQueue(CCheckQueue&&) = delete;
162      CCheckQueue& operator=(CCheckQueue&&) = delete;
163  
164      //! Join the execution until completion. If at least one evaluation wasn't successful, return
165      //! its error.
166      std::optional<R> Complete() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
167      {
168          return Loop(true /* master thread */);
169      }
170  
171      //! Add a batch of checks to the queue
172      void Add(std::vector<T>&& vChecks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
173      {
174          if (vChecks.empty()) {
175              return;
176          }
177  
178          {
179              LOCK(m_mutex);
180              queue.insert(queue.end(), std::make_move_iterator(vChecks.begin()), std::make_move_iterator(vChecks.end()));
181              nTodo += vChecks.size();
182          }
183  
184          if (vChecks.size() == 1) {
185              m_worker_cv.notify_one();
186          } else {
187              m_worker_cv.notify_all();
188          }
189      }
190  
191      ~CCheckQueue()
192      {
193          WITH_LOCK(m_mutex, m_request_stop = true);
194          m_worker_cv.notify_all();
195          for (std::thread& t : m_worker_threads) {
196              t.join();
197          }
198      }
199  
200      bool HasThreads() const { return !m_worker_threads.empty(); }
201  };
202  
203  /**
204   * RAII-style controller object for a CCheckQueue that guarantees the passed
205   * queue is finished before continuing.
206   */
207  template <typename T, typename R = std::remove_cvref_t<decltype(std::declval<T>()().value())>>
208  class SCOPED_LOCKABLE CCheckQueueControl
209  {
210  private:
211      CCheckQueue<T, R>& m_queue;
212      UniqueLock<Mutex> m_lock;
213      bool fDone;
214  
215  public:
216      CCheckQueueControl() = delete;
217      CCheckQueueControl(const CCheckQueueControl&) = delete;
218      CCheckQueueControl& operator=(const CCheckQueueControl&) = delete;
219      explicit CCheckQueueControl(CCheckQueue<T>& queueIn) EXCLUSIVE_LOCK_FUNCTION(queueIn.m_control_mutex) : m_queue(queueIn), m_lock(LOCK_ARGS(queueIn.m_control_mutex)), fDone(false) {}
220  
221      std::optional<R> Complete()
222      {
223          auto ret = m_queue.Complete();
224          fDone = true;
225          return ret;
226      }
227  
228      void Add(std::vector<T>&& vChecks)
229      {
230          m_queue.Add(std::move(vChecks));
231      }
232  
233      ~CCheckQueueControl() UNLOCK_FUNCTION()
234      {
235          if (!fDone)
236              Complete();
237      }
238  };
239  
240  #endif // BITCOIN_CHECKQUEUE_H