threadpool.cpp
1 // Copyright (c) The Bitcoin Core developers 2 // Distributed under the MIT software license, see the accompanying 3 // file COPYING or http://www.opensource.org/licenses/mit-license.php. 4 5 #include <logging.h> 6 #include <util/threadpool.h> 7 8 #include <test/fuzz/FuzzedDataProvider.h> 9 #include <test/fuzz/fuzz.h> 10 11 #include <atomic> 12 #include <future> 13 #include <queue> 14 15 struct ExpectedException : std::runtime_error { 16 explicit ExpectedException(const std::string& msg) : std::runtime_error(msg) {} 17 }; 18 19 struct ThrowTask { 20 void operator()() const { throw ExpectedException("fail"); } 21 }; 22 23 struct CounterTask { 24 std::atomic_uint32_t& m_counter; 25 explicit CounterTask(std::atomic_uint32_t& counter) : m_counter{counter} {} 26 void operator()() const { m_counter.fetch_add(1, std::memory_order_relaxed); } 27 }; 28 29 // Waits for a future to complete. Increments 'fail_counter' if the expected exception is thrown. 30 static void GetFuture(std::future<void>& future, uint32_t& fail_counter) 31 { 32 try { 33 future.get(); 34 } catch (const ExpectedException&) { 35 fail_counter++; 36 } catch (...) { 37 assert(false && "Unexpected exception type"); 38 } 39 } 40 41 // Global thread pool for fuzzing. Persisting it across iterations prevents 42 // the excessive thread creation/destruction overhead that can lead to 43 // instability in the fuzzing environment. 44 // This is also how we use it in the app's lifecycle. 45 ThreadPool g_pool{"fuzz"}; 46 Mutex g_pool_mutex; 47 // Global to verify we always have the same number of threads. 48 size_t g_num_workers = 3; 49 50 static void StartPoolIfNeeded() EXCLUSIVE_LOCKS_REQUIRED(!g_pool_mutex) 51 { 52 LOCK(g_pool_mutex); 53 if (g_pool.WorkersCount() == g_num_workers) return; 54 g_pool.Start(g_num_workers); 55 } 56 57 static void setup_threadpool_test() 58 { 59 // Disable logging entirely. It seems to cause memory leaks. 60 LogInstance().DisableLogging(); 61 } 62 63 FUZZ_TARGET(threadpool, .init = setup_threadpool_test) EXCLUSIVE_LOCKS_REQUIRED(!g_pool_mutex) 64 { 65 // Because LibAFL calls fork() after calling the init setup function, 66 // the child processes end up having one thread active and no workers. 67 // To work around this limitation, start thread pool inside the first runner. 68 StartPoolIfNeeded(); 69 70 FuzzedDataProvider fuzzed_data_provider(buffer.data(), buffer.size()); 71 72 const uint32_t num_tasks = fuzzed_data_provider.ConsumeIntegralInRange<uint32_t>(0, 1024); 73 assert(g_pool.WorkersCount() == g_num_workers); 74 assert(g_pool.WorkQueueSize() == 0); 75 76 // Counters 77 std::atomic_uint32_t task_counter{0}; 78 uint32_t fail_counter{0}; 79 uint32_t expected_task_counter{0}; 80 uint32_t expected_fail_tasks{0}; 81 82 std::queue<std::future<void>> futures; 83 for (uint32_t i = 0; i < num_tasks; ++i) { 84 const bool will_throw = fuzzed_data_provider.ConsumeBool(); 85 const bool wait_immediately = fuzzed_data_provider.ConsumeBool(); 86 87 std::future<void> fut; 88 if (will_throw) { 89 expected_fail_tasks++; 90 fut = *Assert(g_pool.Submit(ThrowTask{})); 91 } else { 92 expected_task_counter++; 93 fut = *Assert(g_pool.Submit(CounterTask{task_counter})); 94 } 95 96 // If caller wants to wait immediately, consume the future here (safe). 97 if (wait_immediately) { 98 // Waits for this task to complete immediately; prior queued tasks may also complete 99 // as they were queued earlier. 100 GetFuture(fut, fail_counter); 101 } else { 102 // Store task for a posterior check 103 futures.emplace(std::move(fut)); 104 } 105 } 106 107 // Drain remaining futures 108 while (!futures.empty()) { 109 auto fut = std::move(futures.front()); 110 futures.pop(); 111 GetFuture(fut, fail_counter); 112 } 113 114 assert(g_pool.WorkQueueSize() == 0); 115 assert(task_counter.load() == expected_task_counter); 116 assert(fail_counter == expected_fail_tasks); 117 }