scheduler_tests.cpp
1 // Copyright (c) 2012-2022 The Bitcoin Core developers 2 // Distributed under the MIT software license, see the accompanying 3 // file COPYING or http://www.opensource.org/licenses/mit-license.php. 4 5 #include <random.h> 6 #include <scheduler.h> 7 #include <util/time.h> 8 9 #include <boost/test/unit_test.hpp> 10 11 #include <functional> 12 #include <mutex> 13 #include <thread> 14 #include <vector> 15 16 BOOST_AUTO_TEST_SUITE(scheduler_tests) 17 18 static void microTask(CScheduler& s, std::mutex& mutex, int& counter, int delta, std::chrono::steady_clock::time_point rescheduleTime) 19 { 20 { 21 std::lock_guard<std::mutex> lock(mutex); 22 counter += delta; 23 } 24 auto noTime = std::chrono::steady_clock::time_point::min(); 25 if (rescheduleTime != noTime) { 26 CScheduler::Function f = std::bind(µTask, std::ref(s), std::ref(mutex), std::ref(counter), -delta + 1, noTime); 27 s.schedule(f, rescheduleTime); 28 } 29 } 30 31 BOOST_AUTO_TEST_CASE(manythreads) 32 { 33 // Stress test: hundreds of microsecond-scheduled tasks, 34 // serviced by 10 threads. 35 // 36 // So... ten shared counters, which if all the tasks execute 37 // properly will sum to the number of tasks done. 38 // Each task adds or subtracts a random amount from one of the 39 // counters, and then schedules another task 0-1000 40 // microseconds in the future to subtract or add from 41 // the counter -random_amount+1, so in the end the shared 42 // counters should sum to the number of initial tasks performed. 43 CScheduler microTasks; 44 45 std::mutex counterMutex[10]; 46 int counter[10] = { 0 }; 47 FastRandomContext rng{/*fDeterministic=*/true}; 48 auto zeroToNine = [](FastRandomContext& rc) -> int { return rc.randrange(10); }; // [0, 9] 49 auto randomMsec = [](FastRandomContext& rc) -> int { return -11 + (int)rc.randrange(1012); }; // [-11, 1000] 50 auto randomDelta = [](FastRandomContext& rc) -> int { return -1000 + (int)rc.randrange(2001); }; // [-1000, 1000] 51 52 auto start = std::chrono::steady_clock::now(); 53 auto now = start; 54 std::chrono::steady_clock::time_point first, last; 55 size_t nTasks = microTasks.getQueueInfo(first, last); 56 BOOST_CHECK(nTasks == 0); 57 58 for (int i = 0; i < 100; ++i) { 59 auto t = now + std::chrono::microseconds(randomMsec(rng)); 60 auto tReschedule = now + std::chrono::microseconds(500 + randomMsec(rng)); 61 int whichCounter = zeroToNine(rng); 62 CScheduler::Function f = std::bind(µTask, std::ref(microTasks), 63 std::ref(counterMutex[whichCounter]), std::ref(counter[whichCounter]), 64 randomDelta(rng), tReschedule); 65 microTasks.schedule(f, t); 66 } 67 nTasks = microTasks.getQueueInfo(first, last); 68 BOOST_CHECK(nTasks == 100); 69 BOOST_CHECK(first < last); 70 BOOST_CHECK(last > now); 71 72 // As soon as these are created they will start running and servicing the queue 73 std::vector<std::thread> microThreads; 74 microThreads.reserve(10); 75 for (int i = 0; i < 5; i++) 76 microThreads.emplace_back(std::bind(&CScheduler::serviceQueue, µTasks)); 77 78 UninterruptibleSleep(std::chrono::microseconds{600}); 79 now = std::chrono::steady_clock::now(); 80 81 // More threads and more tasks: 82 for (int i = 0; i < 5; i++) 83 microThreads.emplace_back(std::bind(&CScheduler::serviceQueue, µTasks)); 84 for (int i = 0; i < 100; i++) { 85 auto t = now + std::chrono::microseconds(randomMsec(rng)); 86 auto tReschedule = now + std::chrono::microseconds(500 + randomMsec(rng)); 87 int whichCounter = zeroToNine(rng); 88 CScheduler::Function f = std::bind(µTask, std::ref(microTasks), 89 std::ref(counterMutex[whichCounter]), std::ref(counter[whichCounter]), 90 randomDelta(rng), tReschedule); 91 microTasks.schedule(f, t); 92 } 93 94 // Drain the task queue then exit threads 95 microTasks.StopWhenDrained(); 96 // wait until all the threads are done 97 for (auto& thread: microThreads) { 98 if (thread.joinable()) thread.join(); 99 } 100 101 int counterSum = 0; 102 for (int i = 0; i < 10; i++) { 103 BOOST_CHECK(counter[i] != 0); 104 counterSum += counter[i]; 105 } 106 BOOST_CHECK_EQUAL(counterSum, 200); 107 } 108 109 BOOST_AUTO_TEST_CASE(wait_until_past) 110 { 111 std::condition_variable condvar; 112 Mutex mtx; 113 WAIT_LOCK(mtx, lock); 114 115 const auto no_wait = [&](const std::chrono::seconds& d) { 116 return condvar.wait_until(lock, std::chrono::steady_clock::now() - d); 117 }; 118 119 BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::seconds{1})); 120 BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::minutes{1})); 121 BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::hours{1})); 122 BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::hours{10})); 123 BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::hours{100})); 124 BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::hours{1000})); 125 } 126 127 BOOST_AUTO_TEST_CASE(singlethreadedscheduler_ordered) 128 { 129 CScheduler scheduler; 130 131 // each queue should be well ordered with respect to itself but not other queues 132 SerialTaskRunner queue1(scheduler); 133 SerialTaskRunner queue2(scheduler); 134 135 // create more threads than queues 136 // if the queues only permit execution of one task at once then 137 // the extra threads should effectively be doing nothing 138 // if they don't we'll get out of order behaviour 139 std::vector<std::thread> threads; 140 threads.reserve(5); 141 for (int i = 0; i < 5; ++i) { 142 threads.emplace_back([&] { scheduler.serviceQueue(); }); 143 } 144 145 // these are not atomic, if SerialTaskRunner prevents 146 // parallel execution at the queue level no synchronization should be required here 147 int counter1 = 0; 148 int counter2 = 0; 149 150 // just simply count up on each queue - if execution is properly ordered then 151 // the callbacks should run in exactly the order in which they were enqueued 152 for (int i = 0; i < 100; ++i) { 153 queue1.insert([i, &counter1]() { 154 bool expectation = i == counter1++; 155 assert(expectation); 156 }); 157 158 queue2.insert([i, &counter2]() { 159 bool expectation = i == counter2++; 160 assert(expectation); 161 }); 162 } 163 164 // finish up 165 scheduler.StopWhenDrained(); 166 for (auto& thread: threads) { 167 if (thread.joinable()) thread.join(); 168 } 169 170 BOOST_CHECK_EQUAL(counter1, 100); 171 BOOST_CHECK_EQUAL(counter2, 100); 172 } 173 174 BOOST_AUTO_TEST_CASE(mockforward) 175 { 176 CScheduler scheduler; 177 178 int counter{0}; 179 CScheduler::Function dummy = [&counter]{counter++;}; 180 181 // schedule jobs for 2, 5 & 8 minutes into the future 182 183 scheduler.scheduleFromNow(dummy, std::chrono::minutes{2}); 184 scheduler.scheduleFromNow(dummy, std::chrono::minutes{5}); 185 scheduler.scheduleFromNow(dummy, std::chrono::minutes{8}); 186 187 // check taskQueue 188 std::chrono::steady_clock::time_point first, last; 189 size_t num_tasks = scheduler.getQueueInfo(first, last); 190 BOOST_CHECK_EQUAL(num_tasks, 3ul); 191 192 std::thread scheduler_thread([&]() { scheduler.serviceQueue(); }); 193 194 // bump the scheduler forward 5 minutes 195 scheduler.MockForward(std::chrono::minutes{5}); 196 197 // ensure scheduler has chance to process all tasks queued for before 1 ms from now. 198 scheduler.scheduleFromNow([&scheduler] { scheduler.stop(); }, std::chrono::milliseconds{1}); 199 scheduler_thread.join(); 200 201 // check that the queue only has one job remaining 202 num_tasks = scheduler.getQueueInfo(first, last); 203 BOOST_CHECK_EQUAL(num_tasks, 1ul); 204 205 // check that the dummy function actually ran 206 BOOST_CHECK_EQUAL(counter, 2); 207 208 // check that the time of the remaining job has been updated 209 auto now = std::chrono::steady_clock::now(); 210 int delta = std::chrono::duration_cast<std::chrono::seconds>(first - now).count(); 211 // should be between 2 & 3 minutes from now 212 BOOST_CHECK(delta > 2*60 && delta < 3*60); 213 } 214 215 BOOST_AUTO_TEST_SUITE_END()