// src/test/threadpool_tests.cpp
  1  // Copyright (c) The Bitcoin Core developers
  2  // Distributed under the MIT software license, see the accompanying
  3  // file COPYING or http://www.opensource.org/licenses/mit-license.php.
  4  
  5  #include <common/system.h>
  6  #include <logging.h>
  7  #include <random.h>
  8  #include <test/util/common.h>
  9  #include <util/string.h>
 10  #include <util/threadpool.h>
 11  #include <util/time.h>
 12  
 13  #include <boost/test/unit_test.hpp>
 14  
 15  #include <array>
 16  #include <functional>
 17  #include <latch>
 18  #include <ranges>
 19  #include <semaphore>
 20  
 21  // General test values
 22  int NUM_WORKERS_DEFAULT = 0;
 23  constexpr char POOL_NAME[] = "test";
 24  constexpr auto WAIT_TIMEOUT = 120s;
 25  
 26  struct ThreadPoolFixture {
 27      ThreadPoolFixture() {
 28          NUM_WORKERS_DEFAULT = FastRandomContext().randrange(GetNumCores()) + 1;
 29          LogInfo("thread pool workers count: %d", NUM_WORKERS_DEFAULT);
 30      }
 31  };
 32  
 33  // Test Cases Overview
 34  // 0) Submit task to a non-started pool.
 35  // 1) Submit tasks and verify completion.
 36  // 2) Maintain all threads busy except one.
 37  // 3) Wait for work to finish.
 38  // 4) Wait for result object.
 39  // 5) The task throws an exception, catch must be done in the consumer side.
 40  // 6) Busy workers, help them by processing tasks externally.
 41  // 7) Recursive submission of tasks.
 42  // 8) Submit task when all threads are busy, stop pool and verify task gets executed.
 43  // 9) Congestion test; create more workers than available cores.
 44  // 10) Ensure Interrupt() prevents further submissions.
 45  // 11) Start() must not cause a deadlock when called during Stop().
 46  // 12) Ensure queued tasks complete after Interrupt().
 47  // 13) Ensure the Stop() calling thread helps drain the queue.
 48  // 14) Submit range of tasks in one lock acquisition.
 49  BOOST_FIXTURE_TEST_SUITE(threadpool_tests, ThreadPoolFixture)
 50  
// Wait for every future in `futures` to become ready, failing the test (via
// BOOST_REQUIRE) if any of them is still pending after WAIT_TIMEOUT. Using a
// timeout instead of a plain wait() keeps a hung pool from stalling the suite.
#define WAIT_FOR(futures)                                                         \
    do {                                                                          \
        for (const auto& f : futures) {                                           \
            BOOST_REQUIRE(f.wait_for(WAIT_TIMEOUT) == std::future_status::ready); \
        }                                                                         \
    } while (0)
 57  
 58  // Helper to unwrap a valid pool submission
 59  template <typename F>
 60  [[nodiscard]] auto Submit(ThreadPool& pool, F&& fn)
 61  {
 62      return std::move(*Assert(pool.Submit(std::forward<F>(fn))));
 63  }
 64  
 65  // Block a number of worker threads by submitting tasks that wait on `release_sem`.
 66  // Returns the futures of the blocking tasks, ensuring all have started and are waiting.
 67  std::vector<std::future<void>> BlockWorkers(ThreadPool& threadPool, std::counting_semaphore<>& release_sem, size_t num_of_threads_to_block)
 68  {
 69      assert(threadPool.WorkersCount() >= num_of_threads_to_block);
 70      std::latch ready{static_cast<std::ptrdiff_t>(num_of_threads_to_block)};
 71      std::vector<std::future<void>> blocking_tasks(num_of_threads_to_block);
 72      for (auto& f : blocking_tasks) f = Submit(threadPool, [&] {
 73          ready.count_down();
 74          release_sem.acquire();
 75      });
 76      ready.wait();
 77      return blocking_tasks;
 78  }
 79  
// Test 0, submit task to a non-started, interrupted, or stopped pool.
// Walks the pool through its lifecycle states and checks that Submit()
// reports the matching error for each one.
BOOST_AUTO_TEST_CASE(submit_fails_with_correct_error)
{
    ThreadPool threadPool(POOL_NAME);
    const auto fn_empty = [&] {};

    // Never started: Inactive
    auto res = threadPool.Submit(fn_empty);
    BOOST_CHECK(!res);
    BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "No active workers");

    // Interrupted (workers still alive): Interrupted, and Start() must be rejected too
    std::counting_semaphore<> blocker(0);
    threadPool.Start(NUM_WORKERS_DEFAULT);
    const auto blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT);
    threadPool.Interrupt();
    res = threadPool.Submit(fn_empty);
    BOOST_CHECK(!res);
    BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "Interrupted");
    BOOST_CHECK_EXCEPTION(threadPool.Start(NUM_WORKERS_DEFAULT), std::runtime_error, HasReason("Thread pool has been interrupted or is stopping"));
    // Release the parked workers so the pool can be stopped cleanly below.
    blocker.release(NUM_WORKERS_DEFAULT);
    WAIT_FOR(blocking_tasks);

    // Interrupted then stopped: Inactive
    threadPool.Stop();
    res = threadPool.Submit(fn_empty);
    BOOST_CHECK(!res);
    BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "No active workers");

    // Started then stopped: Inactive
    threadPool.Start(NUM_WORKERS_DEFAULT);
    threadPool.Stop();
    res = threadPool.Submit(fn_empty);
    BOOST_CHECK(!res);
    BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "No active workers");

    // The range-submission overload must be rejected the same way.
    std::vector<std::function<void()>> tasks;
    const auto range_res{threadPool.Submit(std::move(tasks))};
    BOOST_CHECK(!range_res);
    BOOST_CHECK_EQUAL(SubmitErrorString(range_res.error()), "No active workers");
}
121  
122  // Test 1, submit tasks and verify completion
123  BOOST_AUTO_TEST_CASE(submit_tasks_complete_successfully)
124  {
125      int num_tasks = 50;
126  
127      ThreadPool threadPool(POOL_NAME);
128      threadPool.Start(NUM_WORKERS_DEFAULT);
129      std::atomic<int> counter = 0;
130  
131      // Store futures to ensure completion before checking counter.
132      std::vector<std::future<void>> futures;
133      futures.reserve(num_tasks);
134      for (int i = 1; i <= num_tasks; i++) {
135          futures.emplace_back(Submit(threadPool, [&counter, i]() {
136              counter.fetch_add(i, std::memory_order_relaxed);
137          }));
138      }
139  
140      // Wait for all tasks to finish
141      WAIT_FOR(futures);
142      int expected_value = (num_tasks * (num_tasks + 1)) / 2; // Gauss sum.
143      BOOST_CHECK_EQUAL(counter.load(), expected_value);
144      BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
145  }
146  
147  // Test 2, maintain all threads busy except one
148  BOOST_AUTO_TEST_CASE(single_available_worker_executes_all_tasks)
149  {
150      ThreadPool threadPool(POOL_NAME);
151      threadPool.Start(NUM_WORKERS_DEFAULT);
152      std::counting_semaphore<> blocker(0);
153      const auto blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT - 1);
154  
155      // Now execute tasks on the single available worker
156      // and check that all the tasks are executed.
157      int num_tasks = 15;
158      int counter = 0;
159  
160      // Store futures to wait on
161      std::vector<std::future<void>> futures(num_tasks);
162      for (auto& f : futures) f = Submit(threadPool, [&counter]{ counter++; });
163  
164      WAIT_FOR(futures);
165      BOOST_CHECK_EQUAL(counter, num_tasks);
166  
167      blocker.release(NUM_WORKERS_DEFAULT - 1);
168      WAIT_FOR(blocking_tasks);
169      threadPool.Stop();
170      BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
171  }
172  
173  // Test 3, wait for work to finish
174  BOOST_AUTO_TEST_CASE(wait_for_task_to_finish)
175  {
176      ThreadPool threadPool(POOL_NAME);
177      threadPool.Start(NUM_WORKERS_DEFAULT);
178      std::atomic<bool> flag = false;
179      std::future<void> future = Submit(threadPool, [&flag]() {
180          UninterruptibleSleep(200ms);
181          flag.store(true, std::memory_order_release);
182      });
183      BOOST_CHECK(future.wait_for(WAIT_TIMEOUT) == std::future_status::ready);
184      BOOST_CHECK(flag.load(std::memory_order_acquire));
185  }
186  
187  // Test 4, obtain result object
188  BOOST_AUTO_TEST_CASE(get_result_from_completed_task)
189  {
190      ThreadPool threadPool(POOL_NAME);
191      threadPool.Start(NUM_WORKERS_DEFAULT);
192      std::future<bool> future_bool = Submit(threadPool, []() { return true; });
193      BOOST_CHECK(future_bool.get());
194  
195      std::future<std::string> future_str = Submit(threadPool, []() { return std::string("true"); });
196      std::string result = future_str.get();
197      BOOST_CHECK_EQUAL(result, "true");
198  }
199  
200  // Test 5, throw exception and catch it on the consumer side
201  BOOST_AUTO_TEST_CASE(task_exception_propagates_to_future)
202  {
203      ThreadPool threadPool(POOL_NAME);
204      threadPool.Start(NUM_WORKERS_DEFAULT);
205  
206      const auto make_err{[&](size_t n) { return strprintf("error on thread #%s", n); }};
207  
208      const int num_tasks = 5;
209      std::vector<std::future<void>> futures;
210      futures.reserve(num_tasks);
211      for (int i = 0; i < num_tasks; i++) {
212          futures.emplace_back(Submit(threadPool, [&make_err, i] { throw std::runtime_error(make_err(i)); }));
213      }
214  
215      for (int i = 0; i < num_tasks; i++) {
216          BOOST_CHECK_EXCEPTION(futures[i].get(), std::runtime_error, HasReason{make_err(i)});
217      }
218  }
219  
220  // Test 6, all workers are busy, help them by processing tasks from outside
221  BOOST_AUTO_TEST_CASE(process_tasks_manually_when_workers_busy)
222  {
223      ThreadPool threadPool(POOL_NAME);
224      threadPool.Start(NUM_WORKERS_DEFAULT);
225  
226      std::counting_semaphore<> blocker(0);
227      const auto& blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT);
228  
229      // Now submit tasks and check that none of them are executed.
230      int num_tasks = 20;
231      std::atomic<int> counter = 0;
232      for (int i = 0; i < num_tasks; i++) {
233          (void)Submit(threadPool, [&counter]() {
234              counter.fetch_add(1, std::memory_order_relaxed);
235          });
236      }
237      UninterruptibleSleep(100ms);
238      BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), num_tasks);
239  
240      // Now process manually
241      for (int i = 0; i < num_tasks; i++) {
242          threadPool.ProcessTask();
243      }
244      BOOST_CHECK_EQUAL(counter.load(), num_tasks);
245      BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
246      blocker.release(NUM_WORKERS_DEFAULT);
247      threadPool.Stop();
248      WAIT_FOR(blocking_tasks);
249  }
250  
251  // Test 7, submit tasks from other tasks
252  BOOST_AUTO_TEST_CASE(recursive_task_submission)
253  {
254      ThreadPool threadPool(POOL_NAME);
255      threadPool.Start(NUM_WORKERS_DEFAULT);
256  
257      std::promise<void> signal;
258      (void)Submit(threadPool, [&]() {
259          (void)Submit(threadPool, [&]() {
260              signal.set_value();
261          });
262      });
263  
264      signal.get_future().wait();
265      threadPool.Stop();
266  }
267  
// Test 8, submit task when all threads are busy and then stop the pool.
// Verifies that a task queued while every worker is blocked still runs to
// completion even when Stop() races with the workers being released.
BOOST_AUTO_TEST_CASE(task_submitted_while_busy_completes)
{
    ThreadPool threadPool(POOL_NAME);
    threadPool.Start(NUM_WORKERS_DEFAULT);

    // Park every worker so nothing can pick up new work yet.
    std::counting_semaphore<> blocker(0);
    const auto& blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT);

    // Submit an extra task that should execute once a worker is free
    std::future<bool> future = Submit(threadPool, []() { return true; });

    // At this point, all workers are blocked, and the extra task is queued
    BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 1);

    // Wait a short moment before unblocking the threads to mimic a concurrent shutdown
    std::thread thread_unblocker([&blocker]() {
        UninterruptibleSleep(300ms);
        blocker.release(NUM_WORKERS_DEFAULT);
    });

    // Stop the pool while the workers are still blocked.
    // Stop() must not return before the queued task has been executed.
    threadPool.Stop();

    // Expect the submitted task to complete
    BOOST_CHECK(future.get());
    thread_unblocker.join();

    // Obviously all the previously blocking tasks should be completed at this point too
    WAIT_FOR(blocking_tasks);

    // Pool should be stopped and no workers remaining
    BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
}
302  
303  // Test 9, more workers than available cores (congestion test)
304  BOOST_AUTO_TEST_CASE(congestion_more_workers_than_cores)
305  {
306      ThreadPool threadPool(POOL_NAME);
307      threadPool.Start(std::max(1, GetNumCores() * 2)); // Oversubscribe by 2×
308  
309      int num_tasks = 200;
310      std::atomic<int> counter{0};
311  
312      std::vector<std::future<void>> futures;
313      futures.reserve(num_tasks);
314      for (int i = 0; i < num_tasks; i++) {
315          futures.emplace_back(Submit(threadPool, [&counter] {
316              counter.fetch_add(1, std::memory_order_relaxed);
317          }));
318      }
319  
320      WAIT_FOR(futures);
321      BOOST_CHECK_EQUAL(counter.load(), num_tasks);
322  }
323  
// Test 10, Interrupt() prevents further submissions.
// Checks both entry points: interrupting from the main thread and
// interrupting from inside a running task.
BOOST_AUTO_TEST_CASE(interrupt_blocks_new_submissions)
{
    // 1) Interrupt from main thread
    ThreadPool threadPool(POOL_NAME);
    threadPool.Start(NUM_WORKERS_DEFAULT);
    threadPool.Interrupt();

    // Single-task overload must be rejected with "Interrupted".
    auto res = threadPool.Submit([]{});
    BOOST_CHECK(!res);
    BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "Interrupted");

    // The range-submission overload must be rejected the same way.
    std::vector<std::function<void()>> tasks;
    const auto range_res{threadPool.Submit(std::move(tasks))};
    BOOST_CHECK(!range_res);
    BOOST_CHECK_EQUAL(SubmitErrorString(range_res.error()), "Interrupted");

    // Reset pool
    threadPool.Stop();

    // 2) Interrupt() from a worker thread
    // One worker is blocked, another calls Interrupt(), and the remaining one waits for tasks.
    threadPool.Start(/*num_workers=*/3);
    std::atomic<int> counter{0};
    std::counting_semaphore<> blocker(0);
    const auto blocking_tasks = BlockWorkers(threadPool, blocker, 1);
    // The interrupting task itself must still run to completion; get() blocks
    // until it has.
    Submit(threadPool, [&threadPool, &counter]{
        threadPool.Interrupt();
        counter.fetch_add(1, std::memory_order_relaxed);
    }).get();
    blocker.release(1); // unblock worker

    BOOST_CHECK_EQUAL(counter.load(), 1);
    threadPool.Stop();
    WAIT_FOR(blocking_tasks);
    BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
}
361  
// Test 11, Start() must not cause a deadlock when called during Stop().
BOOST_AUTO_TEST_CASE(start_mid_stop_does_not_deadlock)
{
    ThreadPool threadPool(POOL_NAME);
    threadPool.Start(NUM_WORKERS_DEFAULT);

    // Keep all workers busy so Stop() gets stuck waiting for them to finish during join()
    std::counting_semaphore<> workers_blocker(0);
    const auto blocking_tasks = BlockWorkers(threadPool, workers_blocker, NUM_WORKERS_DEFAULT);

    std::thread stopper_thread([&threadPool] { threadPool.Stop(); });

    // Stop() takes ownership of the workers before joining them, so WorkersCount()
    // hits 0 the moment Stop() is waiting for them to join. That is our signal
    // to call Start() right into the middle of the joining phase.
    while (threadPool.WorkersCount() != 0) {
        std::this_thread::yield(); // let the OS breathe so it can switch context
    }
    // Now we know for sure the stopper thread is hanging while workers are still alive.
    // Restart the pool and resume workers so the stopper thread can proceed.
    // This will throw an exception only if the pool handles Start-Stop race properly,
    // otherwise it will proceed and hang the stopper_thread.
    try {
        threadPool.Start(NUM_WORKERS_DEFAULT);
    } catch (std::exception& e) {
        // Note: BOOST_CHECK_EQUAL on char* compares the C-strings, not pointers.
        BOOST_CHECK_EQUAL(e.what(), "Thread pool has been interrupted or is stopping");
    }
    workers_blocker.release(NUM_WORKERS_DEFAULT);
    WAIT_FOR(blocking_tasks);

    // If Stop() is stuck, joining the stopper thread will deadlock
    stopper_thread.join();
}
395  
396  // Test 12, queued tasks complete after Interrupt()
397  BOOST_AUTO_TEST_CASE(queued_tasks_complete_after_interrupt)
398  {
399      ThreadPool threadPool(POOL_NAME);
400      threadPool.Start(NUM_WORKERS_DEFAULT);
401  
402      std::counting_semaphore<> blocker(0);
403      const auto blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT);
404  
405      // Queue tasks while all workers are busy, then interrupt
406      std::atomic<int> counter{0};
407      const int num_tasks = 10;
408      std::vector<std::future<void>> futures;
409      futures.reserve(num_tasks);
410      for (int i = 0; i < num_tasks; i++) {
411          futures.emplace_back(Submit(threadPool, [&counter]{ counter.fetch_add(1, std::memory_order_relaxed); }));
412      }
413      threadPool.Interrupt();
414  
415      // Queued tasks must still complete despite the interrupt
416      blocker.release(NUM_WORKERS_DEFAULT);
417      WAIT_FOR(futures);
418      BOOST_CHECK_EQUAL(counter.load(), num_tasks);
419  
420      threadPool.Stop();
421      WAIT_FOR(blocking_tasks);
422  }
423  
// Test 13, ensure the Stop() calling thread helps drain the queue.
// All workers stay parked until the queue is empty, so every queued task can
// only have been executed by the thread that called Stop().
BOOST_AUTO_TEST_CASE(stop_active_wait_drains_queue)
{
    ThreadPool threadPool(POOL_NAME);
    threadPool.Start(NUM_WORKERS_DEFAULT);

    // Park every worker so none of them can steal work from the queue.
    std::counting_semaphore<> blocker(0);
    const auto blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT);

    // Each task records whether it ran on this (the main) thread.
    auto main_thread_id = std::this_thread::get_id();
    std::atomic<int> main_thread_tasks{0};
    const size_t num_tasks = 20;
    for (size_t i = 0; i < num_tasks; i++) {
        (void)Submit(threadPool, [&main_thread_tasks, main_thread_id]() {
            if (std::this_thread::get_id() == main_thread_id)
                main_thread_tasks.fetch_add(1, std::memory_order_relaxed);
        });
    }
    BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), num_tasks);

    // Delay release so Stop() drains all tasks from the calling thread
    std::thread unblocker([&blocker, &threadPool]() {
        while (threadPool.WorkQueueSize() > 0) {
            std::this_thread::yield();
        }
        blocker.release(NUM_WORKERS_DEFAULT);
    });

    threadPool.Stop();
    unblocker.join();

    // Check the main thread processed all tasks
    BOOST_CHECK_EQUAL(main_thread_tasks.load(), num_tasks);
    WAIT_FOR(blocking_tasks);
}
459  
460  // Test 14, submit range of tasks in one lock acquisition
461  BOOST_AUTO_TEST_CASE(submit_range_of_tasks_complete_successfully)
462  {
463      constexpr int32_t num_tasks{50};
464  
465      ThreadPool threadPool{POOL_NAME};
466      threadPool.Start(NUM_WORKERS_DEFAULT);
467      std::atomic_int32_t sum{0};
468      const auto square{[&sum](int32_t i) {
469          sum.fetch_add(i, std::memory_order_relaxed);
470          return i * i;
471      }};
472  
473      std::array<std::function<int32_t()>, static_cast<size_t>(num_tasks)> array_tasks;
474      std::vector<std::function<int32_t()>> vector_tasks;
475      vector_tasks.reserve(static_cast<size_t>(num_tasks));
476      for (const auto i : std::views::iota(int32_t{1}, num_tasks + 1)) {
477          array_tasks.at(static_cast<size_t>(i - 1)) = [i, square] { return square(i); };
478          vector_tasks.emplace_back([i, square] { return square(i); });
479      }
480  
481      auto futures{std::move(*Assert(threadPool.Submit(std::move(array_tasks))))};
482      BOOST_CHECK_EQUAL(futures.size(), static_cast<size_t>(num_tasks));
483      std::ranges::move(*Assert(threadPool.Submit(std::move(vector_tasks))), std::back_inserter(futures));
484      BOOST_CHECK_EQUAL(futures.size(), static_cast<size_t>(num_tasks * 2));
485  
486      auto squares_sum{0};
487      for (auto& future : futures) {
488          squares_sum += future.get();
489      }
490  
491      // 2x Gauss sum.
492      const auto expected_sum{2 * ((num_tasks * (num_tasks + 1)) / 2)};
493      const auto expected_squares_sum{2 * ((num_tasks * (num_tasks + 1) * ((num_tasks * 2) + 1)) / 6)};
494      BOOST_CHECK_EQUAL(sum, expected_sum);
495      BOOST_CHECK_EQUAL(squares_sum, expected_squares_sum);
496      BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
497  }
498  
499  BOOST_AUTO_TEST_SUITE_END()