/ src / test / fuzz / threadpool.cpp
threadpool.cpp
  1  // Copyright (c) The Bitcoin Core developers
  2  // Distributed under the MIT software license, see the accompanying
  3  // file COPYING or http://www.opensource.org/licenses/mit-license.php.
  4  
  5  #include <logging.h>
  6  #include <util/threadpool.h>
  7  
  8  #include <test/fuzz/FuzzedDataProvider.h>
  9  #include <test/fuzz/fuzz.h>
 10  
 11  #include <atomic>
 12  #include <future>
 13  #include <queue>
 14  
 15  struct ExpectedException : std::runtime_error {
 16      explicit ExpectedException(const std::string& msg) : std::runtime_error(msg) {}
 17  };
 18  
 19  struct ThrowTask {
 20      void operator()() const { throw ExpectedException("fail"); }
 21  };
 22  
 23  struct CounterTask {
 24      std::atomic_uint32_t& m_counter;
 25      explicit CounterTask(std::atomic_uint32_t& counter) : m_counter{counter} {}
 26      void operator()() const { m_counter.fetch_add(1, std::memory_order_relaxed); }
 27  };
 28  
 29  // Waits for a future to complete. Increments 'fail_counter' if the expected exception is thrown.
 30  static void GetFuture(std::future<void>& future, uint32_t& fail_counter)
 31  {
 32      try {
 33          future.get();
 34      } catch (const ExpectedException&) {
 35          fail_counter++;
 36      } catch (...) {
 37          assert(false && "Unexpected exception type");
 38      }
 39  }
 40  
 41  // Global thread pool for fuzzing. Persisting it across iterations prevents
 42  // the excessive thread creation/destruction overhead that can lead to
 43  // instability in the fuzzing environment.
 44  // This is also how we use it in the app's lifecycle.
 45  ThreadPool g_pool{"fuzz"};
 46  Mutex g_pool_mutex;
 47  // Global to verify we always have the same number of threads.
 48  size_t g_num_workers = 3;
 49  
 50  static void StartPoolIfNeeded() EXCLUSIVE_LOCKS_REQUIRED(!g_pool_mutex)
 51  {
 52      LOCK(g_pool_mutex);
 53      if (g_pool.WorkersCount() == g_num_workers) return;
 54      g_pool.Start(g_num_workers);
 55  }
 56  
 57  static void setup_threadpool_test()
 58  {
 59      // Disable logging entirely. It seems to cause memory leaks.
 60      LogInstance().DisableLogging();
 61  }
 62  
 63  FUZZ_TARGET(threadpool, .init = setup_threadpool_test) EXCLUSIVE_LOCKS_REQUIRED(!g_pool_mutex)
 64  {
 65      // Because LibAFL calls fork() after calling the init setup function,
 66      // the child processes end up having one thread active and no workers.
 67      // To work around this limitation, start thread pool inside the first runner.
 68      StartPoolIfNeeded();
 69  
 70      FuzzedDataProvider fuzzed_data_provider(buffer.data(), buffer.size());
 71  
 72      const uint32_t num_tasks = fuzzed_data_provider.ConsumeIntegralInRange<uint32_t>(0, 1024);
 73      assert(g_pool.WorkersCount() == g_num_workers);
 74      assert(g_pool.WorkQueueSize() == 0);
 75  
 76      // Counters
 77      std::atomic_uint32_t task_counter{0};
 78      uint32_t fail_counter{0};
 79      uint32_t expected_task_counter{0};
 80      uint32_t expected_fail_tasks{0};
 81  
 82      std::queue<std::future<void>> futures;
 83      for (uint32_t i = 0; i < num_tasks; ++i) {
 84          const bool will_throw = fuzzed_data_provider.ConsumeBool();
 85          const bool wait_immediately = fuzzed_data_provider.ConsumeBool();
 86  
 87          std::future<void> fut;
 88          if (will_throw) {
 89              expected_fail_tasks++;
 90              fut = *Assert(g_pool.Submit(ThrowTask{}));
 91          } else {
 92              expected_task_counter++;
 93              fut = *Assert(g_pool.Submit(CounterTask{task_counter}));
 94          }
 95  
 96          // If caller wants to wait immediately, consume the future here (safe).
 97          if (wait_immediately) {
 98              // Waits for this task to complete immediately; prior queued tasks may also complete
 99              // as they were queued earlier.
100              GetFuture(fut, fail_counter);
101          } else {
102              // Store task for a posterior check
103              futures.emplace(std::move(fut));
104          }
105      }
106  
107      // Drain remaining futures
108      while (!futures.empty()) {
109          auto fut = std::move(futures.front());
110          futures.pop();
111          GetFuture(fut, fail_counter);
112      }
113  
114      assert(g_pool.WorkQueueSize() == 0);
115      assert(task_counter.load() == expected_task_counter);
116      assert(fail_counter == expected_fail_tasks);
117  }