/ src / test / scheduler_tests.cpp
scheduler_tests.cpp
  1  // Copyright (c) 2012-2022 The Bitcoin Core developers
  2  // Distributed under the MIT software license, see the accompanying
  3  // file COPYING or http://www.opensource.org/licenses/mit-license.php.
  4  
  5  #include <random.h>
  6  #include <scheduler.h>
  7  #include <util/time.h>
  8  
  9  #include <boost/test/unit_test.hpp>
 10  
 11  #include <functional>
 12  #include <mutex>
 13  #include <thread>
 14  #include <vector>
 15  
 16  BOOST_AUTO_TEST_SUITE(scheduler_tests)
 17  
 18  static void microTask(CScheduler& s, std::mutex& mutex, int& counter, int delta, std::chrono::steady_clock::time_point rescheduleTime)
 19  {
 20      {
 21          std::lock_guard<std::mutex> lock(mutex);
 22          counter += delta;
 23      }
 24      auto noTime = std::chrono::steady_clock::time_point::min();
 25      if (rescheduleTime != noTime) {
 26          CScheduler::Function f = std::bind(&microTask, std::ref(s), std::ref(mutex), std::ref(counter), -delta + 1, noTime);
 27          s.schedule(f, rescheduleTime);
 28      }
 29  }
 30  
 31  BOOST_AUTO_TEST_CASE(manythreads)
 32  {
 33      // Stress test: hundreds of microsecond-scheduled tasks,
 34      // serviced by 10 threads.
 35      //
 36      // So... ten shared counters, which if all the tasks execute
 37      // properly will sum to the number of tasks done.
 38      // Each task adds or subtracts a random amount from one of the
 39      // counters, and then schedules another task 0-1000
 40      // microseconds in the future to subtract or add from
 41      // the counter -random_amount+1, so in the end the shared
 42      // counters should sum to the number of initial tasks performed.
 43      CScheduler microTasks;
 44  
 45      std::mutex counterMutex[10];
 46      int counter[10] = { 0 };
 47      FastRandomContext rng{/*fDeterministic=*/true};
 48      auto zeroToNine = [](FastRandomContext& rc) -> int { return rc.randrange(10); }; // [0, 9]
 49      auto randomMsec = [](FastRandomContext& rc) -> int { return -11 + (int)rc.randrange(1012); }; // [-11, 1000]
 50      auto randomDelta = [](FastRandomContext& rc) -> int { return -1000 + (int)rc.randrange(2001); }; // [-1000, 1000]
 51  
 52      auto start = std::chrono::steady_clock::now();
 53      auto now = start;
 54      std::chrono::steady_clock::time_point first, last;
 55      size_t nTasks = microTasks.getQueueInfo(first, last);
 56      BOOST_CHECK(nTasks == 0);
 57  
 58      for (int i = 0; i < 100; ++i) {
 59          auto t = now + std::chrono::microseconds(randomMsec(rng));
 60          auto tReschedule = now + std::chrono::microseconds(500 + randomMsec(rng));
 61          int whichCounter = zeroToNine(rng);
 62          CScheduler::Function f = std::bind(&microTask, std::ref(microTasks),
 63                                               std::ref(counterMutex[whichCounter]), std::ref(counter[whichCounter]),
 64                                               randomDelta(rng), tReschedule);
 65          microTasks.schedule(f, t);
 66      }
 67      nTasks = microTasks.getQueueInfo(first, last);
 68      BOOST_CHECK(nTasks == 100);
 69      BOOST_CHECK(first < last);
 70      BOOST_CHECK(last > now);
 71  
 72      // As soon as these are created they will start running and servicing the queue
 73      std::vector<std::thread> microThreads;
 74      microThreads.reserve(10);
 75      for (int i = 0; i < 5; i++)
 76          microThreads.emplace_back(std::bind(&CScheduler::serviceQueue, &microTasks));
 77  
 78      UninterruptibleSleep(std::chrono::microseconds{600});
 79      now = std::chrono::steady_clock::now();
 80  
 81      // More threads and more tasks:
 82      for (int i = 0; i < 5; i++)
 83          microThreads.emplace_back(std::bind(&CScheduler::serviceQueue, &microTasks));
 84      for (int i = 0; i < 100; i++) {
 85          auto t = now + std::chrono::microseconds(randomMsec(rng));
 86          auto tReschedule = now + std::chrono::microseconds(500 + randomMsec(rng));
 87          int whichCounter = zeroToNine(rng);
 88          CScheduler::Function f = std::bind(&microTask, std::ref(microTasks),
 89                                               std::ref(counterMutex[whichCounter]), std::ref(counter[whichCounter]),
 90                                               randomDelta(rng), tReschedule);
 91          microTasks.schedule(f, t);
 92      }
 93  
 94      // Drain the task queue then exit threads
 95      microTasks.StopWhenDrained();
 96      // wait until all the threads are done
 97      for (auto& thread: microThreads) {
 98          if (thread.joinable()) thread.join();
 99      }
100  
101      int counterSum = 0;
102      for (int i = 0; i < 10; i++) {
103          BOOST_CHECK(counter[i] != 0);
104          counterSum += counter[i];
105      }
106      BOOST_CHECK_EQUAL(counterSum, 200);
107  }
108  
109  BOOST_AUTO_TEST_CASE(wait_until_past)
110  {
111      std::condition_variable condvar;
112      Mutex mtx;
113      WAIT_LOCK(mtx, lock);
114  
115      const auto no_wait = [&](const std::chrono::seconds& d) {
116          return condvar.wait_until(lock, std::chrono::steady_clock::now() - d);
117      };
118  
119      BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::seconds{1}));
120      BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::minutes{1}));
121      BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::hours{1}));
122      BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::hours{10}));
123      BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::hours{100}));
124      BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::hours{1000}));
125  }
126  
127  BOOST_AUTO_TEST_CASE(singlethreadedscheduler_ordered)
128  {
129      CScheduler scheduler;
130  
131      // each queue should be well ordered with respect to itself but not other queues
132      SerialTaskRunner queue1(scheduler);
133      SerialTaskRunner queue2(scheduler);
134  
135      // create more threads than queues
136      // if the queues only permit execution of one task at once then
137      // the extra threads should effectively be doing nothing
138      // if they don't we'll get out of order behaviour
139      std::vector<std::thread> threads;
140      threads.reserve(5);
141      for (int i = 0; i < 5; ++i) {
142          threads.emplace_back([&] { scheduler.serviceQueue(); });
143      }
144  
145      // these are not atomic, if SerialTaskRunner prevents
146      // parallel execution at the queue level no synchronization should be required here
147      int counter1 = 0;
148      int counter2 = 0;
149  
150      // just simply count up on each queue - if execution is properly ordered then
151      // the callbacks should run in exactly the order in which they were enqueued
152      for (int i = 0; i < 100; ++i) {
153          queue1.insert([i, &counter1]() {
154              bool expectation = i == counter1++;
155              assert(expectation);
156          });
157  
158          queue2.insert([i, &counter2]() {
159              bool expectation = i == counter2++;
160              assert(expectation);
161          });
162      }
163  
164      // finish up
165      scheduler.StopWhenDrained();
166      for (auto& thread: threads) {
167          if (thread.joinable()) thread.join();
168      }
169  
170      BOOST_CHECK_EQUAL(counter1, 100);
171      BOOST_CHECK_EQUAL(counter2, 100);
172  }
173  
174  BOOST_AUTO_TEST_CASE(mockforward)
175  {
176      CScheduler scheduler;
177  
178      int counter{0};
179      CScheduler::Function dummy = [&counter]{counter++;};
180  
181      // schedule jobs for 2, 5 & 8 minutes into the future
182  
183      scheduler.scheduleFromNow(dummy, std::chrono::minutes{2});
184      scheduler.scheduleFromNow(dummy, std::chrono::minutes{5});
185      scheduler.scheduleFromNow(dummy, std::chrono::minutes{8});
186  
187      // check taskQueue
188      std::chrono::steady_clock::time_point first, last;
189      size_t num_tasks = scheduler.getQueueInfo(first, last);
190      BOOST_CHECK_EQUAL(num_tasks, 3ul);
191  
192      std::thread scheduler_thread([&]() { scheduler.serviceQueue(); });
193  
194      // bump the scheduler forward 5 minutes
195      scheduler.MockForward(std::chrono::minutes{5});
196  
197      // ensure scheduler has chance to process all tasks queued for before 1 ms from now.
198      scheduler.scheduleFromNow([&scheduler] { scheduler.stop(); }, std::chrono::milliseconds{1});
199      scheduler_thread.join();
200  
201      // check that the queue only has one job remaining
202      num_tasks = scheduler.getQueueInfo(first, last);
203      BOOST_CHECK_EQUAL(num_tasks, 1ul);
204  
205      // check that the dummy function actually ran
206      BOOST_CHECK_EQUAL(counter, 2);
207  
208      // check that the time of the remaining job has been updated
209      auto now = std::chrono::steady_clock::now();
210      int delta = std::chrono::duration_cast<std::chrono::seconds>(first - now).count();
211      // should be between 2 & 3 minutes from now
212      BOOST_CHECK(delta > 2*60 && delta < 3*60);
213  }
214  
215  BOOST_AUTO_TEST_SUITE_END()