// threadpool_tests.cpp
1 // Copyright (c) The Bitcoin Core developers 2 // Distributed under the MIT software license, see the accompanying 3 // file COPYING or http://www.opensource.org/licenses/mit-license.php. 4 5 #include <common/system.h> 6 #include <logging.h> 7 #include <random.h> 8 #include <test/util/common.h> 9 #include <util/string.h> 10 #include <util/threadpool.h> 11 #include <util/time.h> 12 13 #include <boost/test/unit_test.hpp> 14 15 #include <array> 16 #include <functional> 17 #include <latch> 18 #include <ranges> 19 #include <semaphore> 20 21 // General test values 22 int NUM_WORKERS_DEFAULT = 0; 23 constexpr char POOL_NAME[] = "test"; 24 constexpr auto WAIT_TIMEOUT = 120s; 25 26 struct ThreadPoolFixture { 27 ThreadPoolFixture() { 28 NUM_WORKERS_DEFAULT = FastRandomContext().randrange(GetNumCores()) + 1; 29 LogInfo("thread pool workers count: %d", NUM_WORKERS_DEFAULT); 30 } 31 }; 32 33 // Test Cases Overview 34 // 0) Submit task to a non-started pool. 35 // 1) Submit tasks and verify completion. 36 // 2) Maintain all threads busy except one. 37 // 3) Wait for work to finish. 38 // 4) Wait for result object. 39 // 5) The task throws an exception, catch must be done in the consumer side. 40 // 6) Busy workers, help them by processing tasks externally. 41 // 7) Recursive submission of tasks. 42 // 8) Submit task when all threads are busy, stop pool and verify task gets executed. 43 // 9) Congestion test; create more workers than available cores. 44 // 10) Ensure Interrupt() prevents further submissions. 45 // 11) Start() must not cause a deadlock when called during Stop(). 46 // 12) Ensure queued tasks complete after Interrupt(). 47 // 13) Ensure the Stop() calling thread helps drain the queue. 48 // 14) Submit range of tasks in one lock acquisition. 
49 BOOST_FIXTURE_TEST_SUITE(threadpool_tests, ThreadPoolFixture) 50 51 #define WAIT_FOR(futures) \ 52 do { \ 53 for (const auto& f : futures) { \ 54 BOOST_REQUIRE(f.wait_for(WAIT_TIMEOUT) == std::future_status::ready); \ 55 } \ 56 } while (0) 57 58 // Helper to unwrap a valid pool submission 59 template <typename F> 60 [[nodiscard]] auto Submit(ThreadPool& pool, F&& fn) 61 { 62 return std::move(*Assert(pool.Submit(std::forward<F>(fn)))); 63 } 64 65 // Block a number of worker threads by submitting tasks that wait on `release_sem`. 66 // Returns the futures of the blocking tasks, ensuring all have started and are waiting. 67 std::vector<std::future<void>> BlockWorkers(ThreadPool& threadPool, std::counting_semaphore<>& release_sem, size_t num_of_threads_to_block) 68 { 69 assert(threadPool.WorkersCount() >= num_of_threads_to_block); 70 std::latch ready{static_cast<std::ptrdiff_t>(num_of_threads_to_block)}; 71 std::vector<std::future<void>> blocking_tasks(num_of_threads_to_block); 72 for (auto& f : blocking_tasks) f = Submit(threadPool, [&] { 73 ready.count_down(); 74 release_sem.acquire(); 75 }); 76 ready.wait(); 77 return blocking_tasks; 78 } 79 80 // Test 0, submit task to a non-started, interrupted, or stopped pool 81 BOOST_AUTO_TEST_CASE(submit_fails_with_correct_error) 82 { 83 ThreadPool threadPool(POOL_NAME); 84 const auto fn_empty = [&] {}; 85 86 // Never started: Inactive 87 auto res = threadPool.Submit(fn_empty); 88 BOOST_CHECK(!res); 89 BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "No active workers"); 90 91 // Interrupted (workers still alive): Interrupted, and Start() must be rejected too 92 std::counting_semaphore<> blocker(0); 93 threadPool.Start(NUM_WORKERS_DEFAULT); 94 const auto blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT); 95 threadPool.Interrupt(); 96 res = threadPool.Submit(fn_empty); 97 BOOST_CHECK(!res); 98 BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "Interrupted"); 99 
BOOST_CHECK_EXCEPTION(threadPool.Start(NUM_WORKERS_DEFAULT), std::runtime_error, HasReason("Thread pool has been interrupted or is stopping")); 100 blocker.release(NUM_WORKERS_DEFAULT); 101 WAIT_FOR(blocking_tasks); 102 103 // Interrupted then stopped: Inactive 104 threadPool.Stop(); 105 res = threadPool.Submit(fn_empty); 106 BOOST_CHECK(!res); 107 BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "No active workers"); 108 109 // Started then stopped: Inactive 110 threadPool.Start(NUM_WORKERS_DEFAULT); 111 threadPool.Stop(); 112 res = threadPool.Submit(fn_empty); 113 BOOST_CHECK(!res); 114 BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "No active workers"); 115 116 std::vector<std::function<void()>> tasks; 117 const auto range_res{threadPool.Submit(std::move(tasks))}; 118 BOOST_CHECK(!range_res); 119 BOOST_CHECK_EQUAL(SubmitErrorString(range_res.error()), "No active workers"); 120 } 121 122 // Test 1, submit tasks and verify completion 123 BOOST_AUTO_TEST_CASE(submit_tasks_complete_successfully) 124 { 125 int num_tasks = 50; 126 127 ThreadPool threadPool(POOL_NAME); 128 threadPool.Start(NUM_WORKERS_DEFAULT); 129 std::atomic<int> counter = 0; 130 131 // Store futures to ensure completion before checking counter. 132 std::vector<std::future<void>> futures; 133 futures.reserve(num_tasks); 134 for (int i = 1; i <= num_tasks; i++) { 135 futures.emplace_back(Submit(threadPool, [&counter, i]() { 136 counter.fetch_add(i, std::memory_order_relaxed); 137 })); 138 } 139 140 // Wait for all tasks to finish 141 WAIT_FOR(futures); 142 int expected_value = (num_tasks * (num_tasks + 1)) / 2; // Gauss sum. 
143 BOOST_CHECK_EQUAL(counter.load(), expected_value); 144 BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0); 145 } 146 147 // Test 2, maintain all threads busy except one 148 BOOST_AUTO_TEST_CASE(single_available_worker_executes_all_tasks) 149 { 150 ThreadPool threadPool(POOL_NAME); 151 threadPool.Start(NUM_WORKERS_DEFAULT); 152 std::counting_semaphore<> blocker(0); 153 const auto blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT - 1); 154 155 // Now execute tasks on the single available worker 156 // and check that all the tasks are executed. 157 int num_tasks = 15; 158 int counter = 0; 159 160 // Store futures to wait on 161 std::vector<std::future<void>> futures(num_tasks); 162 for (auto& f : futures) f = Submit(threadPool, [&counter]{ counter++; }); 163 164 WAIT_FOR(futures); 165 BOOST_CHECK_EQUAL(counter, num_tasks); 166 167 blocker.release(NUM_WORKERS_DEFAULT - 1); 168 WAIT_FOR(blocking_tasks); 169 threadPool.Stop(); 170 BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0); 171 } 172 173 // Test 3, wait for work to finish 174 BOOST_AUTO_TEST_CASE(wait_for_task_to_finish) 175 { 176 ThreadPool threadPool(POOL_NAME); 177 threadPool.Start(NUM_WORKERS_DEFAULT); 178 std::atomic<bool> flag = false; 179 std::future<void> future = Submit(threadPool, [&flag]() { 180 UninterruptibleSleep(200ms); 181 flag.store(true, std::memory_order_release); 182 }); 183 BOOST_CHECK(future.wait_for(WAIT_TIMEOUT) == std::future_status::ready); 184 BOOST_CHECK(flag.load(std::memory_order_acquire)); 185 } 186 187 // Test 4, obtain result object 188 BOOST_AUTO_TEST_CASE(get_result_from_completed_task) 189 { 190 ThreadPool threadPool(POOL_NAME); 191 threadPool.Start(NUM_WORKERS_DEFAULT); 192 std::future<bool> future_bool = Submit(threadPool, []() { return true; }); 193 BOOST_CHECK(future_bool.get()); 194 195 std::future<std::string> future_str = Submit(threadPool, []() { return std::string("true"); }); 196 std::string result = future_str.get(); 197 BOOST_CHECK_EQUAL(result, 
"true"); 198 } 199 200 // Test 5, throw exception and catch it on the consumer side 201 BOOST_AUTO_TEST_CASE(task_exception_propagates_to_future) 202 { 203 ThreadPool threadPool(POOL_NAME); 204 threadPool.Start(NUM_WORKERS_DEFAULT); 205 206 const auto make_err{[&](size_t n) { return strprintf("error on thread #%s", n); }}; 207 208 const int num_tasks = 5; 209 std::vector<std::future<void>> futures; 210 futures.reserve(num_tasks); 211 for (int i = 0; i < num_tasks; i++) { 212 futures.emplace_back(Submit(threadPool, [&make_err, i] { throw std::runtime_error(make_err(i)); })); 213 } 214 215 for (int i = 0; i < num_tasks; i++) { 216 BOOST_CHECK_EXCEPTION(futures[i].get(), std::runtime_error, HasReason{make_err(i)}); 217 } 218 } 219 220 // Test 6, all workers are busy, help them by processing tasks from outside 221 BOOST_AUTO_TEST_CASE(process_tasks_manually_when_workers_busy) 222 { 223 ThreadPool threadPool(POOL_NAME); 224 threadPool.Start(NUM_WORKERS_DEFAULT); 225 226 std::counting_semaphore<> blocker(0); 227 const auto& blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT); 228 229 // Now submit tasks and check that none of them are executed. 
230 int num_tasks = 20; 231 std::atomic<int> counter = 0; 232 for (int i = 0; i < num_tasks; i++) { 233 (void)Submit(threadPool, [&counter]() { 234 counter.fetch_add(1, std::memory_order_relaxed); 235 }); 236 } 237 UninterruptibleSleep(100ms); 238 BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), num_tasks); 239 240 // Now process manually 241 for (int i = 0; i < num_tasks; i++) { 242 threadPool.ProcessTask(); 243 } 244 BOOST_CHECK_EQUAL(counter.load(), num_tasks); 245 BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0); 246 blocker.release(NUM_WORKERS_DEFAULT); 247 threadPool.Stop(); 248 WAIT_FOR(blocking_tasks); 249 } 250 251 // Test 7, submit tasks from other tasks 252 BOOST_AUTO_TEST_CASE(recursive_task_submission) 253 { 254 ThreadPool threadPool(POOL_NAME); 255 threadPool.Start(NUM_WORKERS_DEFAULT); 256 257 std::promise<void> signal; 258 (void)Submit(threadPool, [&]() { 259 (void)Submit(threadPool, [&]() { 260 signal.set_value(); 261 }); 262 }); 263 264 signal.get_future().wait(); 265 threadPool.Stop(); 266 } 267 268 // Test 8, submit task when all threads are busy and then stop the pool 269 BOOST_AUTO_TEST_CASE(task_submitted_while_busy_completes) 270 { 271 ThreadPool threadPool(POOL_NAME); 272 threadPool.Start(NUM_WORKERS_DEFAULT); 273 274 std::counting_semaphore<> blocker(0); 275 const auto& blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT); 276 277 // Submit an extra task that should execute once a worker is free 278 std::future<bool> future = Submit(threadPool, []() { return true; }); 279 280 // At this point, all workers are blocked, and the extra task is queued 281 BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 1); 282 283 // Wait a short moment before unblocking the threads to mimic a concurrent shutdown 284 std::thread thread_unblocker([&blocker]() { 285 UninterruptibleSleep(300ms); 286 blocker.release(NUM_WORKERS_DEFAULT); 287 }); 288 289 // Stop the pool while the workers are still blocked 290 threadPool.Stop(); 291 292 // Expect the 
submitted task to complete 293 BOOST_CHECK(future.get()); 294 thread_unblocker.join(); 295 296 // Obviously all the previously blocking tasks should be completed at this point too 297 WAIT_FOR(blocking_tasks); 298 299 // Pool should be stopped and no workers remaining 300 BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0); 301 } 302 303 // Test 9, more workers than available cores (congestion test) 304 BOOST_AUTO_TEST_CASE(congestion_more_workers_than_cores) 305 { 306 ThreadPool threadPool(POOL_NAME); 307 threadPool.Start(std::max(1, GetNumCores() * 2)); // Oversubscribe by 2× 308 309 int num_tasks = 200; 310 std::atomic<int> counter{0}; 311 312 std::vector<std::future<void>> futures; 313 futures.reserve(num_tasks); 314 for (int i = 0; i < num_tasks; i++) { 315 futures.emplace_back(Submit(threadPool, [&counter] { 316 counter.fetch_add(1, std::memory_order_relaxed); 317 })); 318 } 319 320 WAIT_FOR(futures); 321 BOOST_CHECK_EQUAL(counter.load(), num_tasks); 322 } 323 324 // Test 10, Interrupt() prevents further submissions 325 BOOST_AUTO_TEST_CASE(interrupt_blocks_new_submissions) 326 { 327 // 1) Interrupt from main thread 328 ThreadPool threadPool(POOL_NAME); 329 threadPool.Start(NUM_WORKERS_DEFAULT); 330 threadPool.Interrupt(); 331 332 auto res = threadPool.Submit([]{}); 333 BOOST_CHECK(!res); 334 BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "Interrupted"); 335 336 std::vector<std::function<void()>> tasks; 337 const auto range_res{threadPool.Submit(std::move(tasks))}; 338 BOOST_CHECK(!range_res); 339 BOOST_CHECK_EQUAL(SubmitErrorString(range_res.error()), "Interrupted"); 340 341 // Reset pool 342 threadPool.Stop(); 343 344 // 2) Interrupt() from a worker thread 345 // One worker is blocked, another calls Interrupt(), and the remaining one waits for tasks. 
346 threadPool.Start(/*num_workers=*/3); 347 std::atomic<int> counter{0}; 348 std::counting_semaphore<> blocker(0); 349 const auto blocking_tasks = BlockWorkers(threadPool, blocker, 1); 350 Submit(threadPool, [&threadPool, &counter]{ 351 threadPool.Interrupt(); 352 counter.fetch_add(1, std::memory_order_relaxed); 353 }).get(); 354 blocker.release(1); // unblock worker 355 356 BOOST_CHECK_EQUAL(counter.load(), 1); 357 threadPool.Stop(); 358 WAIT_FOR(blocking_tasks); 359 BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0); 360 } 361 362 // Test 11, Start() must not cause a deadlock when called during Stop() 363 BOOST_AUTO_TEST_CASE(start_mid_stop_does_not_deadlock) 364 { 365 ThreadPool threadPool(POOL_NAME); 366 threadPool.Start(NUM_WORKERS_DEFAULT); 367 368 // Keep all workers busy so Stop() gets stuck waiting for them to finish during join() 369 std::counting_semaphore<> workers_blocker(0); 370 const auto blocking_tasks = BlockWorkers(threadPool, workers_blocker, NUM_WORKERS_DEFAULT); 371 372 std::thread stopper_thread([&threadPool] { threadPool.Stop(); }); 373 374 // Stop() takes ownership of the workers before joining them, so WorkersCount() 375 // hits 0 the moment Stop() is waiting for them to join. That is our signal 376 // to call Start() right into the middle of the joining phase. 377 while (threadPool.WorkersCount() != 0) { 378 std::this_thread::yield(); // let the OS breathe so it can switch context 379 } 380 // Now we know for sure the stopper thread is hanging while workers are still alive. 381 // Restart the pool and resume workers so the stopper thread can proceed. 382 // This will throw an exception only if the pool handles Start-Stop race properly, 383 // otherwise it will proceed and hang the stopper_thread. 
384 try { 385 threadPool.Start(NUM_WORKERS_DEFAULT); 386 } catch (std::exception& e) { 387 BOOST_CHECK_EQUAL(e.what(), "Thread pool has been interrupted or is stopping"); 388 } 389 workers_blocker.release(NUM_WORKERS_DEFAULT); 390 WAIT_FOR(blocking_tasks); 391 392 // If Stop() is stuck, joining the stopper thread will deadlock 393 stopper_thread.join(); 394 } 395 396 // Test 12, queued tasks complete after Interrupt() 397 BOOST_AUTO_TEST_CASE(queued_tasks_complete_after_interrupt) 398 { 399 ThreadPool threadPool(POOL_NAME); 400 threadPool.Start(NUM_WORKERS_DEFAULT); 401 402 std::counting_semaphore<> blocker(0); 403 const auto blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT); 404 405 // Queue tasks while all workers are busy, then interrupt 406 std::atomic<int> counter{0}; 407 const int num_tasks = 10; 408 std::vector<std::future<void>> futures; 409 futures.reserve(num_tasks); 410 for (int i = 0; i < num_tasks; i++) { 411 futures.emplace_back(Submit(threadPool, [&counter]{ counter.fetch_add(1, std::memory_order_relaxed); })); 412 } 413 threadPool.Interrupt(); 414 415 // Queued tasks must still complete despite the interrupt 416 blocker.release(NUM_WORKERS_DEFAULT); 417 WAIT_FOR(futures); 418 BOOST_CHECK_EQUAL(counter.load(), num_tasks); 419 420 threadPool.Stop(); 421 WAIT_FOR(blocking_tasks); 422 } 423 424 // Test 13, ensure the Stop() calling thread helps drain the queue 425 BOOST_AUTO_TEST_CASE(stop_active_wait_drains_queue) 426 { 427 ThreadPool threadPool(POOL_NAME); 428 threadPool.Start(NUM_WORKERS_DEFAULT); 429 430 std::counting_semaphore<> blocker(0); 431 const auto blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT); 432 433 auto main_thread_id = std::this_thread::get_id(); 434 std::atomic<int> main_thread_tasks{0}; 435 const size_t num_tasks = 20; 436 for (size_t i = 0; i < num_tasks; i++) { 437 (void)Submit(threadPool, [&main_thread_tasks, main_thread_id]() { 438 if (std::this_thread::get_id() == main_thread_id) 
439 main_thread_tasks.fetch_add(1, std::memory_order_relaxed); 440 }); 441 } 442 BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), num_tasks); 443 444 // Delay release so Stop() drains all tasks from the calling thread 445 std::thread unblocker([&blocker, &threadPool]() { 446 while (threadPool.WorkQueueSize() > 0) { 447 std::this_thread::yield(); 448 } 449 blocker.release(NUM_WORKERS_DEFAULT); 450 }); 451 452 threadPool.Stop(); 453 unblocker.join(); 454 455 // Check the main thread processed all tasks 456 BOOST_CHECK_EQUAL(main_thread_tasks.load(), num_tasks); 457 WAIT_FOR(blocking_tasks); 458 } 459 460 // Test 14, submit range of tasks in one lock acquisition 461 BOOST_AUTO_TEST_CASE(submit_range_of_tasks_complete_successfully) 462 { 463 constexpr int32_t num_tasks{50}; 464 465 ThreadPool threadPool{POOL_NAME}; 466 threadPool.Start(NUM_WORKERS_DEFAULT); 467 std::atomic_int32_t sum{0}; 468 const auto square{[&sum](int32_t i) { 469 sum.fetch_add(i, std::memory_order_relaxed); 470 return i * i; 471 }}; 472 473 std::array<std::function<int32_t()>, static_cast<size_t>(num_tasks)> array_tasks; 474 std::vector<std::function<int32_t()>> vector_tasks; 475 vector_tasks.reserve(static_cast<size_t>(num_tasks)); 476 for (const auto i : std::views::iota(int32_t{1}, num_tasks + 1)) { 477 array_tasks.at(static_cast<size_t>(i - 1)) = [i, square] { return square(i); }; 478 vector_tasks.emplace_back([i, square] { return square(i); }); 479 } 480 481 auto futures{std::move(*Assert(threadPool.Submit(std::move(array_tasks))))}; 482 BOOST_CHECK_EQUAL(futures.size(), static_cast<size_t>(num_tasks)); 483 std::ranges::move(*Assert(threadPool.Submit(std::move(vector_tasks))), std::back_inserter(futures)); 484 BOOST_CHECK_EQUAL(futures.size(), static_cast<size_t>(num_tasks * 2)); 485 486 auto squares_sum{0}; 487 for (auto& future : futures) { 488 squares_sum += future.get(); 489 } 490 491 // 2x Gauss sum. 
492 const auto expected_sum{2 * ((num_tasks * (num_tasks + 1)) / 2)}; 493 const auto expected_squares_sum{2 * ((num_tasks * (num_tasks + 1) * ((num_tasks * 2) + 1)) / 6)}; 494 BOOST_CHECK_EQUAL(sum, expected_sum); 495 BOOST_CHECK_EQUAL(squares_sum, expected_squares_sum); 496 BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0); 497 } 498 499 BOOST_AUTO_TEST_SUITE_END()