/ crates / tor-rtmock / src / simple_time.rs
simple_time.rs
  1  //! Simple provider of simulated time
  2  //!
  3  //! See [`SimpleMockTimeProvider`]
  4  
  5  use std::cmp::Reverse;
  6  use std::future::Future;
  7  use std::pin::Pin;
  8  use std::sync::{Arc, Mutex, MutexGuard};
  9  use std::task::{Context, Poll, Waker};
 10  use std::time::{Duration, Instant, SystemTime};
 11  
 12  use derive_more::AsMut;
 13  use priority_queue::priority_queue::PriorityQueue;
 14  use slotmap_careful::DenseSlotMap;
 15  
 16  use tor_rtcompat::CoarseInstant;
 17  use tor_rtcompat::CoarseTimeProvider;
 18  use tor_rtcompat::SleepProvider;
 19  
 20  use crate::time_core::MockTimeCore;
 21  
 22  /// Simple provider of simulated time
 23  ///
 24  /// Maintains a mocked view of the current [`Instant`] and [`SystemTime`].
 25  ///
 26  /// The simulated time advances only when explicitly instructed,
 27  /// by calling [`.advance()`](Provider::advance).
 28  ///
 29  /// The wallclock time can be warped with
 30  /// [`.jump_wallclock()`](Provider::jump_wallclock),
 31  /// allowing simulation of wall clock non-monotonicity.
 32  ///
 33  /// # Panics and aborts
 34  ///
 35  /// Panics on time under/overflow.
 36  ///
 37  /// May cause an abort if the [`SimpleMockTimeProvider`] implementation contains bugs.
 38  #[derive(Clone, Debug)]
 39  pub struct SimpleMockTimeProvider {
 40      /// The actual state
 41      state: Arc<Mutex<State>>,
 42  }
 43  
 44  /// Convenience abbreviation
 45  pub(crate) use SimpleMockTimeProvider as Provider;
 46  
 47  /// Identifier of a [`SleepFuture`]
 48  type Id = slotmap_careful::DefaultKey;
 49  
 50  /// Future for `sleep`
 51  ///
 52  /// Iff this struct exists, there is an entry for `id` in `prov.futures`.
 53  /// (It might contain `None`.)
 54  pub struct SleepFuture {
 55      /// Reference to our state
 56      prov: Provider,
 57  
 58      /// Which `SleepFuture` are we
 59      id: Id,
 60  }
 61  
 62  /// Mutable state for a [`Provider`]
 63  ///
 64  /// Each sleep ([`Id`], [`SleepFuture`]) is in one of the following states:
 65  ///
 66  /// | state       | [`SleepFuture`]  | `futures`         | `unready`          |
 67  /// |-------------|------------------|------------------|--------------------|
 68  /// | UNPOLLLED   | exists           | present, `None`  | present, `> now`   |
 69  /// | WAITING     | exists           | present, `Some`  | present, `> now`   |
 70  /// | READY       | exists           | present, `None`  | absent             |
 71  /// | DROPPED     | dropped          | absent           | absent             |
 72  #[derive(Debug, AsMut)]
 73  struct State {
 74      /// Current time (coarse)
 75      core: MockTimeCore,
 76  
 77      /// Futures; record of every existing [`SleepFuture`], including any `Waker`
 78      ///
 79      /// Entry exists iff `SleepFuture` exists.
 80      ///
 81      /// Contains `None` if we haven't polled the future;
 82      /// `Some` if we have.
 83      ///
 84      /// We could use a `Vec` or `TiVec`
 85      /// but using a slotmap is more robust against bugs here.
 86      futures: DenseSlotMap<Id, Option<Waker>>,
 87  
 88      /// Priority queue
 89      ///
 90      /// Subset of `futures`.
 91      ///
 92      /// An entry is present iff the `Instant` is *strictly* after `State.now`,
 93      /// in which case that's when the future should be woken.
 94      ///
 95      /// `PriorityQueue` is a max-heap but we want earliest times, hence `Reverse`
 96      unready: PriorityQueue<Id, Reverse<Instant>>,
 97  }
 98  
 99  /// `Default` makes a `Provider` which starts at whatever the current real time is
100  impl Default for Provider {
101      fn default() -> Self {
102          Self::from_real()
103      }
104  }
105  
106  impl Provider {
107      /// Return a new mock time provider starting at a specified point in time
108      pub fn new(now: Instant, wallclock: SystemTime) -> Self {
109          let state = State {
110              core: MockTimeCore::new(now, wallclock),
111              futures: Default::default(),
112              unready: Default::default(),
113          };
114          Provider {
115              state: Arc::new(Mutex::new(state)),
116          }
117      }
118  
119      /// Return a new mock time provider starting at the current actual (non-mock) time
120      ///
121      /// Like any [`SimpleMockTimeProvider`], the time is frozen and only changes
122      /// due to calls to `advance`.
123      pub fn from_real() -> Self {
124          Provider::from_wallclock(SystemTime::now())
125      }
126      /// Return a new mock time provider starting at a specified wallclock time
127      ///
128      /// The monotonic time ([`Instant`]) starts at the current actual (non-mock) time.
129      /// (Absolute values of the real monotonic time are not readily
130      /// observable or distinguishable from Rust,
131      /// nor can a fixed `Instant` be constructed,
132      /// so this is usually sufficient for a reproducible test.)
133      pub fn from_wallclock(wallclock: SystemTime) -> Self {
134          Provider::new(Instant::now(), wallclock)
135      }
136  
137      /// Advance the simulated time by `d`
138      ///
139      /// This advances both the `Instant` (monotonic time)
140      /// and `SystemTime` (wallclock time)
141      /// by the same amount.
142      ///
143      /// Will wake sleeping [`SleepFuture`]s, as appropriate.
144      ///
145      /// Note that the tasks which were waiting on those now-expired `SleepFuture`s
146      /// will only actually execute when they are next polled.
147      /// `advance` does not yield to the executor or poll any futures.
148      /// The executor will (presumably) poll those woken tasks, when it regains control.
149      /// But the order in which the tasks run will depend on its scheduling policy,
150      /// and might be different to the order implied by the futures' timeout values.
151      ///
152      /// To simulate normal time advancement, wakeups, and task activations,
153      /// use [`MockExecutor::advance_*()`](crate::MockRuntime).
154      pub fn advance(&self, d: Duration) {
155          let mut state = self.lock();
156          state.core.advance(d);
157          state.wake_any();
158      }
159  
160      /// Warp the wallclock time
161      ///
162      /// This has no effect on any sleeping futures.
163      /// It only affects the return value from [`.wallclock()`](Provider::wallclock).
164      pub fn jump_wallclock(&self, new_wallclock: SystemTime) {
165          self.lock().core.jump_wallclock(new_wallclock);
166          // Really we ought to wake people up, here.
167          // But absolutely every Rust API is wrong: none offer a way to sleep until a SystemTime.
168          // (There might be some less-portable non-Rust APIs for that.)
169      }
170  
171      /// When will the next timeout occur?
172      ///
173      /// Returns the duration until the next [`SleepFuture`] should wake up.
174      ///
175      /// Advancing time by at least this amount will wake up that future,
176      /// and any others with the same wakeup time.
177      ///
178      /// Will never return `Some(ZERO)`:
179      /// any future that is supposed to wake up now (or earlier) has indeed already been woken,
180      /// so it is no longer sleeping and isn't included in the calculation.
181      pub fn time_until_next_timeout(&self) -> Option<Duration> {
182          let state = self.lock();
183          let Reverse(until) = state.unready.peek()?.1;
184          // The invariant (see `State`) guarantees that entries in `unready` are always `> now`,
185          // so we don't whether duration_since would panic or saturate.
186          let d = until.duration_since(state.core.instant());
187          Some(d)
188      }
189  
190      /// Convenience function to lock the state
191      fn lock(&self) -> MutexGuard<'_, State> {
192          self.state.lock().expect("simple time state poisoned")
193      }
194  }
195  
196  impl SleepProvider for Provider {
197      type SleepFuture = SleepFuture;
198  
199      fn sleep(&self, d: Duration) -> SleepFuture {
200          let mut state = self.lock();
201          let until = state.core.instant() + d;
202  
203          let id = state.futures.insert(None);
204          state.unready.push(id, Reverse(until));
205  
206          let fut = SleepFuture {
207              id,
208              prov: self.clone(),
209          };
210  
211          // This sleep is now UNPOLLLED, except that its time might be `<= now`:
212  
213          // Possibly, `until` isn't *strictly* after than `state.now`, since d might be 0.
214          // If so, .wake_any() will restore the invariant by immediately waking.
215          state.wake_any();
216  
217          // This sleep is now UNPOLLED or READY, according to whether duration was 0.
218  
219          fut
220      }
221  
222      fn now(&self) -> Instant {
223          self.lock().core.instant()
224      }
225      fn wallclock(&self) -> SystemTime {
226          self.lock().core.wallclock()
227      }
228  }
229  
230  impl CoarseTimeProvider for Provider {
231      fn now_coarse(&self) -> CoarseInstant {
232          self.lock().core.coarse().now_coarse()
233      }
234  }
235  
236  impl Future for SleepFuture {
237      type Output = ();
238  
239      fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
240          let mut state = self.prov.lock();
241          if let Some((_, Reverse(scheduled))) = state.unready.get(&self.id) {
242              // Presence of this entry implies scheduled > now: we are UNPOLLED or WAITING
243              assert!(*scheduled > state.core.instant());
244              let waker = Some(cx.waker().clone());
245              // Make this be WAITING.  (If we're re-polled, we simply drop any previous waker.)
246              *state
247                  .futures
248                  .get_mut(self.id)
249                  .expect("polling futures entry") = waker;
250              Poll::Pending
251          } else {
252              // Absence implies scheduled (no longer stored) <= now: we are READY
253              Poll::Ready(())
254          }
255      }
256  }
257  
258  impl State {
259      /// Restore the invariant for `unready` after `now` has been increased
260      ///
261      /// Ie, ensures that any sleeps which are
262      /// WAITING/UNPOLLED except that they are `<= now`,
263      /// are moved to state READY.
264      fn wake_any(&mut self) {
265          loop {
266              match self.unready.peek() {
267                  // Keep picking off entries with scheduled <= now
268                  Some((_, Reverse(scheduled))) if *scheduled <= self.core.instant() => {
269                      let (id, _) = self.unready.pop().expect("vanished");
270                      // We can .take() the waker since this can only ever run once
271                      // per sleep future (since it happens when we pop it from unready).
272                      let futures_entry = self.futures.get_mut(id).expect("stale unready entry");
273                      if let Some(waker) = futures_entry.take() {
274                          waker.wake();
275                      }
276                  }
277                  _ => break,
278              }
279          }
280      }
281  }
282  
283  impl Drop for SleepFuture {
284      fn drop(&mut self) {
285          let mut state = self.prov.lock();
286          let _: Option<Waker> = state.futures.remove(self.id).expect("entry vanished");
287          let _: Option<(Id, Reverse<Instant>)> = state.unready.remove(&self.id);
288          // Now it is DROPPED.
289      }
290  }
291  
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use crate::task::MockExecutor;
    use futures::poll;
    use humantime::parse_rfc3339;
    use tor_rtcompat::BlockOn as _;
    use Poll::*;

    /// Shorthand for a `Duration` of `ms` milliseconds
    fn ms(ms: u64) -> Duration {
        Duration::from_millis(ms)
    }

    /// Run the test body `f` to completion on a fresh provider and executor
    ///
    /// The wallclock starts at 2000-01-01T00:00:00Z.
    fn run_test<FUT>(f: impl FnOnce(Provider, MockExecutor) -> FUT)
    where
        FUT: Future<Output = ()>,
    {
        // It would have been nice to make the starting `Instant` fixed for the test.
        let start_instant = Instant::now();
        let start_wallclock = parse_rfc3339("2000-01-01T00:00:00Z").unwrap();
        let sp = Provider::new(start_instant, start_wallclock);
        let exec = MockExecutor::new();
        exec.block_on(f(sp, exec.clone()));
    }

    #[test]
    fn simple() {
        run_test(|sp, _exec| async move {
            let start_instant = sp.now();
            let start_wallclock = sp.wallclock();
            let mut short_sleep = sp.sleep(ms(500));
            let mut long_sleep = sp.sleep(ms(1500));
            assert_eq!(poll!(&mut short_sleep), Pending);

            // After 200ms neither sleep has expired, and both clocks moved together.
            sp.advance(ms(200));
            assert_eq!(start_instant + ms(200), sp.now());
            assert_eq!(start_wallclock + ms(200), sp.wallclock());
            assert_eq!(poll!(&mut short_sleep), Pending);
            assert_eq!(poll!(&mut long_sleep), Pending);
            drop(long_sleep);

            // Jumping the wallclock affects only the wallclock.
            sp.jump_wallclock(start_wallclock + ms(10_000));
            sp.advance(ms(300));
            assert_eq!(start_instant + ms(500), sp.now());
            assert_eq!(start_wallclock + ms(10_300), sp.wallclock());
            assert_eq!(poll!(&mut short_sleep), Ready(()));

            // A zero-length sleep is ready straight away.
            let mut zero_sleep = sp.sleep(ms(0));
            assert_eq!(poll!(&mut zero_sleep), Ready(()));
        });
    }

    #[test]
    fn task() {
        run_test(|sp, exec| async move {
            // Progress marker: written by the spawned task, read by the test body.
            let stage = Arc::new(Mutex::new(0_i8));

            let worker = {
                let stage = stage.clone();
                let sp = sp.clone();
                async move {
                    *stage.lock().unwrap() = 1;
                    sp.sleep(ms(500)).await;
                    *stage.lock().unwrap() = 2;
                    sp.sleep(ms(300)).await;
                    *stage.lock().unwrap() = 3;
                }
            };
            exec.spawn_identified("test task", worker);

            let current_stage = move || *stage.lock().unwrap();

            // The task does not run until the executor gets a chance to poll it.
            assert_eq!(current_stage(), 0);
            exec.progress_until_stalled().await;
            assert_eq!(current_stage(), 1);
            assert_eq!(sp.time_until_next_timeout(), Some(ms(500)));

            sp.advance(ms(500));

            // The first sleep has expired, but the task has not been polled again yet.
            assert_eq!(current_stage(), 1);
            assert_eq!(sp.time_until_next_timeout(), None);
            exec.progress_until_stalled().await;
            assert_eq!(current_stage(), 2);
            assert_eq!(sp.time_until_next_timeout(), Some(ms(300)));

            // Advancing by more than the remaining timeout also expires the sleep.
            sp.advance(ms(500));
            assert_eq!(current_stage(), 2);
            assert_eq!(sp.time_until_next_timeout(), None);
            exec.progress_until_stalled().await;
            assert_eq!(sp.time_until_next_timeout(), None);
            assert_eq!(current_stage(), 3);
        });
    }
}