/ crates / tor-rtmock / src / time.rs
time.rs
  1  //! Functionality for simulating the passage of time in unit tests.
  2  //!
  3  //! We do this by providing [`MockSleepProvider`], a "SleepProvider"
  4  //! instance that can simulate timeouts and retries without requiring
  5  //! the actual system clock to advance.
  6  //!
  7  //! ### Deprecated
  8  //!
  9  //! This mock time facility has some limitations.
 10  //! See [`MockSleepProvider`] for more information.
 11  //! Use [`MockRuntime`](crate::MockRuntime) for new tests.
 12  
 13  #![forbid(unsafe_code)] // if you remove this, enable (or write) miri tests (git grep miri)
 14  #![allow(clippy::missing_docs_in_private_items)]
 15  
 16  use std::{
 17      cmp::{Eq, Ordering, PartialEq, PartialOrd},
 18      collections::BinaryHeap,
 19      fmt,
 20      pin::Pin,
 21      sync::{Arc, Mutex, Weak},
 22      task::{Context, Poll, Waker},
 23      time::{Duration, Instant, SystemTime},
 24  };
 25  
 26  use futures::Future;
 27  use tracing::trace;
 28  
 29  use std::collections::HashSet;
 30  use std::fmt::Formatter;
 31  use tor_rtcompat::{CoarseInstant, CoarseTimeProvider, SleepProvider};
 32  
 33  use crate::time_core::MockTimeCore;
 34  
 35  /// A dummy [`SleepProvider`] instance for testing.
 36  ///
 37  /// The MockSleepProvider ignores the current time, and instead keeps
 38  /// its own view of the current `Instant` and `SystemTime`.  You
 39  /// can advance them in-step by calling `advance()`, and you can simulate
 40  /// jumps in the system clock by calling `jump()`.
 41  ///
 42  /// This is *not* for production use.
 43  ///
 44  /// ### Deprecated
 45  ///
 46  /// This mock time facility has some limitations, notably lack of support for tasks,
 47  /// and a confusing API for controlling the mock time.
 48  ///
 49  /// New test cases should probably use `MockRuntime`
 50  /// which incorporates `MockSimpletimeProvider`.
 51  ///
 52  /// Comparison of `MockSleepProvider` with `SimpleMockTimeProvider`:
 53  ///
 54  ///  * `SimpleMockTimeProvider` does not support, or expect the use of,
 55  ///    `block_advance` et al.
 56  ///    Instead, the advancement of simulated time is typically done automatically
 57  ///    in cooperation with the executor,
 58  ///    using `MockRuntime`'s `advance_*` methods.
 59  ///
 60  ///  * Consequently, `SimpleMockTimeProvider` can be used in test cases that
 61  ///    spawn tasks and perform sleeps in them.
 62  ///
 63  ///  * And, consequently, `SimpleMockTimeProvider` does not need non-test code to
 64  ///    contain calls which are solely related to getting the time mocking to work right.
 65  ///
 66  ///  * `SimpleMockTimeProvider` gives correct sleeping locations
 67  ///    with `MockExecutor`'s dump of sleeping tasks' stack traces.
 68  ///
 69  ///  * Conversely, to use `SimpleMockTimeProvider` in all but the most simple test cases,
 70  ///    coordination with the executor is required.
 71  ///    This coordination is provided by the integrated `MockRuntime`;
 72  ///    `SimpleMockTimeProvider` is of limited usefulness by itself.
 73  //
 74  // TODO: at some point we should add #[deprecated] to this type
 75  // and to the block_advance etc. methods in SleepProvider.
 76  // But right now that would involve rewriting a whole bunch of tests,
 77  // or generous sprinklings of #[allow].
 78  ///
 79  /// ### Examples
 80  ///
 81  /// Suppose you've written a function that relies on making a
 82  /// connection to the network and possibly timing out:
 83  ///
 84  /// ```rust
 85  /// use tor_rtcompat::{Runtime,SleepProviderExt};
 86  /// use std::{net::SocketAddr, io::Result, time::Duration, io::Error};
 87  /// use futures::io::AsyncWriteExt;
 88  ///
 89  /// async fn say_hi(runtime: impl Runtime, addr: &SocketAddr) -> Result<()> {
 90  ///    let delay = Duration::new(5,0);
 91  ///    runtime.timeout(delay, async {
 92  ///       let mut conn = runtime.connect(addr).await?;
 93  ///       conn.write_all(b"Hello world!\r\n").await?;
 94  ///       conn.close().await?;
 95  ///       Ok::<_,Error>(())
 96  ///    }).await??;
 97  ///    Ok(())
 98  /// }
 99  /// ```
100  ///
101  /// But how should you test this function?
102  ///
103  /// You might try connecting to a well-known website to test the
104  /// connection case, and to a well-known black hole to test the
105  /// timeout case... but that's a bit undesirable.  Your tests might be
106  /// running in a container with no internet access; and even if they
107  /// aren't, it isn't so great for your tests to rely on the actual
108  /// state of the internet.  Similarly, if you make your timeout too long,
109  /// your tests might block for a long time; but if your timeout is too short,
110  /// the tests might fail on a slow machine or on a slow network.
111  ///
112  /// Or, you could solve both of these problems by using `tor-rtmock`
113  /// to replace the internet _and_ the passage of time.  (Here we're only
114  /// replacing the internet.)
115  ///
116  /// ```rust,no_run
117  /// # async fn say_hi<R,A>(runtime: R, addr: A) -> Result<(), ()> { Ok(()) }
118  /// # // TODO this test hangs for some reason?  Fix it and remove no_run above
119  /// use tor_rtmock::{MockSleepRuntime,MockNetRuntime,net::MockNetwork};
120  /// use tor_rtcompat::{NetStreamProvider,NetStreamListener};
121  /// use futures::io::AsyncReadExt;
122  /// use std::net::SocketAddr;
123  /// use futures::StreamExt as _;
124  ///
125  /// tor_rtcompat::test_with_all_runtimes!(|rt| async move {
126  ///
127  ///    let addr1 = "198.51.100.7".parse().unwrap();
128  ///    let addr2 = "198.51.100.99".parse().unwrap();
129  ///    let sockaddr: SocketAddr = "198.51.100.99:101".parse().unwrap();
130  ///
131  ///    // Make a runtime that pretends that we are at the first address...
132  ///    let fake_internet = MockNetwork::new();
133  ///    let rt1 = fake_internet.builder().add_address(addr1).runtime(rt.clone());
134  ///    // ...and one that pretends we're listening at the second address.
135  ///    let rt2 = fake_internet.builder().add_address(addr2).runtime(rt);
136  ///    let listener = rt2.listen(&sockaddr).await.unwrap();
137  ///    let mut incoming_stream = listener.incoming();
138  ///
139  ///    // Now we can test our function!
140  ///    let (result1,output) = futures::join!(
141  ///           say_hi(rt1, &sockaddr),
142  ///           async {
143  ///               let (mut conn,addr) = incoming_stream.next().await.unwrap().unwrap();
144  ///               assert_eq!(addr.ip(), addr1);
145  ///               let mut output = Vec::new();
146  ///               conn.read_to_end(&mut output).await.unwrap();
147  ///               output
148  ///           });
149  ///
150  ///    assert!(result1.is_ok());
151  ///    assert_eq!(&output[..], b"Hello world!\r\n");
152  /// });
153  /// ```
154  #[derive(Clone)]
155  pub struct MockSleepProvider {
156      /// The shared backend for this MockSleepProvider and its futures.
157      state: Arc<Mutex<SleepSchedule>>,
158  }
159  
160  impl fmt::Debug for MockSleepProvider {
161      fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
162          f.debug_struct("MockSleepProvider").finish_non_exhaustive()
163      }
164  }
165  
166  /// Shared backend for sleep provider and Sleeping futures.
167  struct SleepSchedule {
168      /// What time do we pretend it is?
169      core: MockTimeCore,
170      /// Priority queue of events, in the order that we should wake them.
171      sleepers: BinaryHeap<SleepEntry>,
172      /// If the mock time system is being driven by a `WaitFor`, holds a `Waker` to wake up that
173      /// `WaitFor` in order for it to make more progress.
174      waitfor_waker: Option<Waker>,
175      /// Number of sleepers instantiated.
176      sleepers_made: usize,
177      /// Number of sleepers polled.
178      sleepers_polled: usize,
179      /// Whether an advance is needed.
180      should_advance: bool,
181      /// A set of reasons why advances shouldn't be allowed right now.
182      blocked_advance: HashSet<String>,
183      /// A time up to which advances are allowed, irrespective of them being blocked.
184      allowed_advance: Duration,
185  }
186  
187  /// An entry telling us when to wake which future up.
188  struct SleepEntry {
189      /// The time at which this entry should wake
190      when: Instant,
191      /// The Waker to call when the instant has passed.
192      waker: Waker,
193  }
194  
195  /// A future returned by [`MockSleepProvider::sleep()`].
196  pub struct Sleeping {
197      /// The instant when we should become ready.
198      when: Instant,
199      /// True if we have pushed this into the queue.
200      inserted: bool,
201      /// The schedule to queue ourselves in if we're polled before we're ready.
202      provider: Weak<Mutex<SleepSchedule>>,
203  }
204  
205  impl Default for MockSleepProvider {
206      fn default() -> Self {
207          let wallclock = humantime::parse_rfc3339("2023-07-05T11:25:56Z").expect("parse");
208          MockSleepProvider::new(wallclock)
209      }
210  }
211  
212  impl MockSleepProvider {
213      /// Create a new MockSleepProvider, starting at a given wall-clock time.
214      pub fn new(wallclock: SystemTime) -> Self {
215          let instant = Instant::now();
216          let sleepers = BinaryHeap::new();
217          let core = MockTimeCore::new(instant, wallclock);
218          let state = SleepSchedule {
219              core,
220              sleepers,
221              waitfor_waker: None,
222              sleepers_made: 0,
223              sleepers_polled: 0,
224              should_advance: false,
225              blocked_advance: HashSet::new(),
226              allowed_advance: Duration::from_nanos(0),
227          };
228          MockSleepProvider {
229              state: Arc::new(Mutex::new(state)),
230          }
231      }
232  
233      /// Advance the simulated timeline forward by `dur`.
234      ///
235      /// Calling this function will wake any pending futures as
236      /// appropriate, and yield to the scheduler so they get a chance
237      /// to run.
238      ///
239      /// # Limitations
240      ///
241      /// This function advances time in one big step.  We might instead
242      /// want to advance in small steps and make sure that each step's
243      /// futures can get run before the ones scheduled to run after it.
244      pub async fn advance(&self, dur: Duration) {
245          self.advance_noyield(dur);
246          tor_rtcompat::task::yield_now().await;
247      }
248  
249      /// Advance the simulated timeline forward by `dur`.
250      ///
251      /// Calling this function will wake any pending futures as
252      /// appropriate, but not yield to the scheduler.  Mostly you
253      /// should call [`advance`](Self::advance) instead.
254      pub(crate) fn advance_noyield(&self, dur: Duration) {
255          // It's not so great to unwrap here in general, but since this is
256          // only testing code we don't really care.
257          let mut state = self.state.lock().expect("Poisoned lock for state");
258          state.core.advance(dur);
259          state.fire();
260      }
261  
262      /// Simulate a discontinuity in the system clock, by jumping to
263      /// `new_wallclock`.
264      ///
265      /// # Panics
266      ///
267      /// Panics if we have already panicked while holding the lock on
268      /// the internal timer state, and the lock is poisoned.
269      pub fn jump_to(&self, new_wallclock: SystemTime) {
270          let mut state = self.state.lock().expect("Poisoned lock for state");
271          state.core.jump_wallclock(new_wallclock);
272      }
273  
274      /// Return the amount of virtual time until the next timeout
275      /// should elapse.
276      ///
277      /// If there are no more timeouts, return None.  If the next
278      /// timeout should elapse right now, return Some(0).
279      pub(crate) fn time_until_next_timeout(&self) -> Option<Duration> {
280          let state = self.state.lock().expect("Poisoned lock for state");
281          let now = state.core.instant();
282          state
283              .sleepers
284              .peek()
285              .map(|sleepent| sleepent.when.saturating_duration_since(now))
286      }
287  
288      /// Return true if a `WaitFor` driving this sleep provider should advance time in order for
289      /// futures blocked on sleeping to make progress.
290      ///
291      /// NOTE: This function has side-effects; if it returns true, the caller is expected to do an
292      /// advance before calling it again.
293      #[allow(clippy::cognitive_complexity)]
294      pub(crate) fn should_advance(&mut self) -> bool {
295          let mut state = self.state.lock().expect("Poisoned lock for state");
296          if !state.blocked_advance.is_empty() && state.allowed_advance == Duration::from_nanos(0) {
297              // We've had advances blocked, and don't have any quota for doing allowances while
298              // blocked left.
299              trace!(
300                  "should_advance = false: blocked by {:?}",
301                  state.blocked_advance
302              );
303              return false;
304          }
305          if !state.should_advance {
306              // The advance flag wasn't set.
307              trace!("should_advance = false; bit not previously set");
308              return false;
309          }
310          // Clear the advance flag; we'll either return true and cause an advance to happen,
311          // or the reasons to return false below also imply that the advance flag will be set again
312          // later on.
313          state.should_advance = false;
314          if state.sleepers_polled < state.sleepers_made {
315              // Something did set the advance flag before, but it's not valid any more now because
316              // more unpolled sleepers were created.
317              trace!("should_advance = false; advancing no longer valid");
318              return false;
319          }
320          if !state.blocked_advance.is_empty() && state.allowed_advance > Duration::from_nanos(0) {
321              // If we're here, we would've returned earlier due to having advances blocked, but
322              // we have quota to advance up to a certain time while advances are blocked.
323              // Let's see when the next timeout is, and whether it falls within that quota.
324              let next_timeout = {
325                  let now = state.core.instant();
326                  state
327                      .sleepers
328                      .peek()
329                      .map(|sleepent| sleepent.when.saturating_duration_since(now))
330              };
331              let next_timeout = match next_timeout {
332                  Some(x) => x,
333                  None => {
334                      // There's no timeout set, so we really shouldn't be here anyway.
335                      trace!("should_advance = false; allow_one set but no timeout yet");
336                      return false;
337                  }
338              };
339              if next_timeout <= state.allowed_advance {
340                  // We can advance up to the next timeout, since it's in our quota.
341                  // Subtract the amount we're going to advance by from said quota.
342                  state.allowed_advance -= next_timeout;
343                  trace!(
344                      "WARNING: allowing advance due to allow_one; new allowed is {:?}",
345                      state.allowed_advance
346                  );
347              } else {
348                  // The next timeout is too far in the future.
349                  trace!(
350                      "should_advance = false; allow_one set but only up to {:?}, next is {:?}",
351                      state.allowed_advance,
352                      next_timeout
353                  );
354                  return false;
355              }
356          }
357          true
358      }
359  
360      /// Register a `Waker` to be woken up when an advance in time is required to make progress.
361      ///
362      /// This is used by `WaitFor`.
363      pub(crate) fn register_waitfor_waker(&mut self, waker: Waker) {
364          let mut state = self.state.lock().expect("Poisoned lock for state");
365          state.waitfor_waker = Some(waker);
366      }
367  
368      /// Remove a previously registered `Waker` registered with `register_waitfor_waker()`.
369      pub(crate) fn clear_waitfor_waker(&mut self) {
370          let mut state = self.state.lock().expect("Poisoned lock for state");
371          state.waitfor_waker = None;
372      }
373  
374      /// Returns true if a `Waker` has been registered with `register_waitfor_waker()`.
375      ///
376      /// This is used to ensure that you don't have two concurrent `WaitFor`s running.
377      pub(crate) fn has_waitfor_waker(&self) -> bool {
378          let state = self.state.lock().expect("Poisoned lock for state");
379          state.waitfor_waker.is_some()
380      }
381  }
382  
383  impl SleepSchedule {
384      /// Wake any pending events that are ready according to the
385      /// current simulated time.
386      fn fire(&mut self) {
387          use std::collections::binary_heap::PeekMut;
388  
389          let now = self.core.instant();
390          while let Some(top) = self.sleepers.peek_mut() {
391              if now < top.when {
392                  return;
393              }
394  
395              PeekMut::pop(top).waker.wake();
396          }
397      }
398  
399      /// Add a new SleepEntry to this schedule.
400      fn push(&mut self, ent: SleepEntry) {
401          self.sleepers.push(ent);
402      }
403  
404      /// If all sleepers made have been polled, set the advance flag and wake up any `WaitFor` that
405      /// might be waiting.
406      fn maybe_advance(&mut self) {
407          if self.sleepers_polled >= self.sleepers_made {
408              if let Some(ref waker) = self.waitfor_waker {
409                  trace!("setting advance flag");
410                  self.should_advance = true;
411                  waker.wake_by_ref();
412              } else {
413                  trace!("would advance, but no waker");
414              }
415          }
416      }
417  
418      /// Register a sleeper as having been polled, and advance if necessary.
419      fn increment_poll_count(&mut self) {
420          self.sleepers_polled += 1;
421          trace!(
422              "sleeper polled, {}/{}",
423              self.sleepers_polled,
424              self.sleepers_made
425          );
426          self.maybe_advance();
427      }
428  }
429  
430  impl SleepProvider for MockSleepProvider {
431      type SleepFuture = Sleeping;
432      fn sleep(&self, duration: Duration) -> Self::SleepFuture {
433          let mut provider = self.state.lock().expect("Poisoned lock for state");
434          let when = provider.core.instant() + duration;
435          // We're making a new sleeper, so register this in the state.
436          provider.sleepers_made += 1;
437          trace!(
438              "sleeper made for {:?}, {}/{}",
439              duration,
440              provider.sleepers_polled,
441              provider.sleepers_made
442          );
443  
444          Sleeping {
445              when,
446              inserted: false,
447              provider: Arc::downgrade(&self.state),
448          }
449      }
450  
451      fn block_advance<T: Into<String>>(&self, reason: T) {
452          let mut provider = self.state.lock().expect("Poisoned lock for state");
453          let reason = reason.into();
454          trace!("advancing blocked: {}", reason);
455          provider.blocked_advance.insert(reason);
456      }
457  
458      fn release_advance<T: Into<String>>(&self, reason: T) {
459          let mut provider = self.state.lock().expect("Poisoned lock for state");
460          let reason = reason.into();
461          trace!("advancing released: {}", reason);
462          provider.blocked_advance.remove(&reason);
463          if provider.blocked_advance.is_empty() {
464              provider.maybe_advance();
465          }
466      }
467  
468      fn allow_one_advance(&self, dur: Duration) {
469          let mut provider = self.state.lock().expect("Poisoned lock for state");
470          provider.allowed_advance = Duration::max(provider.allowed_advance, dur);
471          trace!(
472              "** allow_one_advance fired; may advance up to {:?} **",
473              provider.allowed_advance
474          );
475          provider.maybe_advance();
476      }
477  
478      fn now(&self) -> Instant {
479          self.state
480              .lock()
481              .expect("Poisoned lock for state")
482              .core
483              .instant()
484      }
485  
486      fn wallclock(&self) -> SystemTime {
487          self.state
488              .lock()
489              .expect("Poisoned lock for state")
490              .core
491              .wallclock()
492      }
493  }
494  
495  impl CoarseTimeProvider for MockSleepProvider {
496      fn now_coarse(&self) -> CoarseInstant {
497          self.state
498              .lock()
499              .expect("poisoned")
500              .core
501              .coarse()
502              .now_coarse()
503      }
504  }
505  
506  impl PartialEq for SleepEntry {
507      fn eq(&self, other: &Self) -> bool {
508          self.when == other.when
509      }
510  }
511  impl Eq for SleepEntry {}
512  impl PartialOrd for SleepEntry {
513      fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
514          Some(self.cmp(other))
515      }
516  }
517  impl Ord for SleepEntry {
518      fn cmp(&self, other: &Self) -> Ordering {
519          self.when.cmp(&other.when).reverse()
520      }
521  }
522  
523  impl Drop for Sleeping {
524      fn drop(&mut self) {
525          if let Some(provider) = Weak::upgrade(&self.provider) {
526              let mut provider = provider.lock().expect("Poisoned lock for provider");
527              if !self.inserted {
528                  // A sleeper being dropped will never be polled, so there's no point waiting;
529                  // act as if it's been polled in order to avoid waiting forever.
530                  trace!("sleeper dropped, incrementing count");
531                  provider.increment_poll_count();
532                  self.inserted = true;
533              }
534          }
535      }
536  }
537  
538  impl Future for Sleeping {
539      type Output = ();
540      fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
541          if let Some(provider) = Weak::upgrade(&self.provider) {
542              let mut provider = provider.lock().expect("Poisoned lock for provider");
543              let now = provider.core.instant();
544  
545              if now >= self.when {
546                  // The sleep time's elapsed.
547                  if !self.inserted {
548                      // If we never registered this sleeper as being polled, do so now.
549                      provider.increment_poll_count();
550                      self.inserted = true;
551                  }
552                  if !provider.should_advance {
553                      // The first advance during a `WaitFor` gets triggered by all sleepers that
554                      // have been created being polled.
555                      // However, this only happens once.
556                      // What we do to get around this is have sleepers that return Ready kick off
557                      // another advance, in order to wake the next waiting sleeper.
558                      provider.maybe_advance();
559                  }
560                  return Poll::Ready(());
561              }
562              // dbg!("sleep check with", self.when-now);
563  
564              if !self.inserted {
565                  let entry = SleepEntry {
566                      when: self.when,
567                      waker: cx.waker().clone(),
568                  };
569  
570                  provider.push(entry);
571                  self.inserted = true;
572                  // Register this sleeper as having been polled.
573                  provider.increment_poll_count();
574              }
575              // dbg!(provider.sleepers.len());
576          }
577          Poll::Pending
578      }
579  }
580  
581  #[cfg(all(test, not(miri)))] // miri cannot do CLOCK_REALTIME
582  mod test {
583      // @@ begin test lint list maintained by maint/add_warning @@
584      #![allow(clippy::bool_assert_comparison)]
585      #![allow(clippy::clone_on_copy)]
586      #![allow(clippy::dbg_macro)]
587      #![allow(clippy::mixed_attributes_style)]
588      #![allow(clippy::print_stderr)]
589      #![allow(clippy::print_stdout)]
590      #![allow(clippy::single_char_pattern)]
591      #![allow(clippy::unwrap_used)]
592      #![allow(clippy::unchecked_duration_subtraction)]
593      #![allow(clippy::useless_vec)]
594      #![allow(clippy::needless_pass_by_value)]
595      //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
596      use super::*;
597      use tor_rtcompat::test_with_all_runtimes;
598  
599      #[test]
600      fn basics_of_time_travel() {
601          let w1 = SystemTime::now();
602          let sp = MockSleepProvider::new(w1);
603          let i1 = sp.now();
604          assert_eq!(sp.wallclock(), w1);
605  
606          let interval = Duration::new(4 * 3600 + 13 * 60, 0);
607          sp.advance_noyield(interval);
608          assert_eq!(sp.now(), i1 + interval);
609          assert_eq!(sp.wallclock(), w1 + interval);
610  
611          sp.jump_to(w1 + interval * 3);
612          assert_eq!(sp.now(), i1 + interval);
613          assert_eq!(sp.wallclock(), w1 + interval * 3);
614      }
615  
616      #[test]
617      fn time_moves_on() {
618          test_with_all_runtimes!(|_| async {
619              use oneshot_fused_workaround as oneshot;
620              use std::sync::atomic::AtomicBool;
621              use std::sync::atomic::Ordering;
622  
623              let sp = MockSleepProvider::new(SystemTime::now());
624              let one_hour = Duration::new(3600, 0);
625  
626              let (s1, r1) = oneshot::channel();
627              let (s2, r2) = oneshot::channel();
628              let (s3, r3) = oneshot::channel();
629  
630              let b1 = AtomicBool::new(false);
631              let b2 = AtomicBool::new(false);
632              let b3 = AtomicBool::new(false);
633  
634              let real_start = Instant::now();
635  
636              futures::join!(
637                  async {
638                      sp.sleep(one_hour).await;
639                      b1.store(true, Ordering::SeqCst);
640                      s1.send(()).unwrap();
641                  },
642                  async {
643                      sp.sleep(one_hour * 3).await;
644                      b2.store(true, Ordering::SeqCst);
645                      s2.send(()).unwrap();
646                  },
647                  async {
648                      sp.sleep(one_hour * 5).await;
649                      b3.store(true, Ordering::SeqCst);
650                      s3.send(()).unwrap();
651                  },
652                  async {
653                      sp.advance(one_hour * 2).await;
654                      r1.await.unwrap();
655                      assert!(b1.load(Ordering::SeqCst));
656                      assert!(!b2.load(Ordering::SeqCst));
657                      assert!(!b3.load(Ordering::SeqCst));
658  
659                      sp.advance(one_hour * 2).await;
660                      r2.await.unwrap();
661                      assert!(b1.load(Ordering::SeqCst));
662                      assert!(b2.load(Ordering::SeqCst));
663                      assert!(!b3.load(Ordering::SeqCst));
664  
665                      sp.advance(one_hour * 2).await;
666                      r3.await.unwrap();
667                      assert!(b1.load(Ordering::SeqCst));
668                      assert!(b2.load(Ordering::SeqCst));
669                      assert!(b3.load(Ordering::SeqCst));
670                      let real_end = Instant::now();
671  
672                      assert!(real_end - real_start < one_hour);
673                  }
674              );
675              std::io::Result::Ok(())
676          });
677      }
678  }