/ crates / tor-rtmock / src / runtime.rs
runtime.rs
  1  //! Completely mock runtime
  2  
  3  #![forbid(unsafe_code)] // if you remove this, enable (or write) miri tests (git grep miri)
  4  
  5  use std::fmt::{Debug, Display};
  6  use std::ops::ControlFlow;
  7  
  8  use amplify::Getters;
  9  use futures::FutureExt as _;
 10  use itertools::chain;
 11  use strum::IntoEnumIterator as _;
 12  use void::{ResultVoidExt as _, Void};
 13  
 14  use crate::util::impl_runtime_prelude::*;
 15  
 16  use crate::net::MockNetProvider;
 17  use crate::simple_time::SimpleMockTimeProvider;
 18  use crate::task::{MockExecutor, SchedulingPolicy};
 19  
/// Completely mock runtime, with simulated time
///
/// Suitable for test cases that wish to completely control
/// the environment experienced by the code under test.
///
/// ### Useful properties
///
/// The execution order is deterministic.
/// Time will advance only in a controlled fashion.
/// Typically, the main task in a test will call
/// [`advance_until_stalled`](MockRuntime::advance_until_stalled).
///
/// Reliable sequencing techniques which can be used in tests include:
/// sleeping for carefully chosen durations;
/// interlocking via intertask channels; or,
/// sequenced control of requests to the code under test.
///
/// ### Restrictions
///
/// The test case must advance the mock time explicitly as desired,
/// typically by calling one of the `MockRuntime::advance_*` methods.
///
/// Tests that use this runtime *must not* interact with the outside world;
/// everything must go through this runtime (and its pieces).
///
/// There is no mocking of filesystem access;
/// the `MockRuntime`'s time will disagree with `SystemTime`'s
/// obtained from (for example) `std::fs::Metadata`.
///
/// #### Allowed
///
///  * Inter-future communication facilities from `futures`
///    or other runtime-agnostic crates.
///
///  * Fast synchronous operations that will complete "immediately" or "quickly".
///    E.g.: filesystem calls.
///
///  * `std::sync::Mutex` (assuming the use is deadlock-free in a single-threaded
///    executor, as it should be in all of Arti).
///
///  * Slower operations that are run synchronously (without futures `await`)
///    provided their completion doesn't depend on any of the futures we're running.
///    (These kinds of operations are often discouraged in async contexts,
///    because they block the async runtime or its worker threads.
///    But they are often OK in tests.)
///
///  * All facilities provided by this `MockRuntime` and its trait impls.
///
/// #### Not allowed
///
///  * Direct access to the real-world clock (`SystemTime::now`, `Instant::now`),
///    including direct use of `coarsetime`.
///    Instead, use [`SleepProvider`] and [`CoarseTimeProvider`] methods on the runtime.
///    Exception: CPU use measurements.
///
///  * Anything that spawns threads and then communicates with those threads
///    using async Rust facilities (futures).
///
///  * Async sockets, or async use of other kernel-based IPC or network mechanisms.
///
///  * Anything provided by a Rust runtime/executor project (e.g. anything from Tokio),
///    unless it is definitively established that it's runtime-agnostic.
#[derive(Debug, Default, Clone, Getters, Deftly)]
#[derive_deftly(SomeMockRuntime)]
#[getter(prefix = "mock_")]
pub struct MockRuntime {
    /// Executor on which tasks are spawned and run
    ///
    /// Exposed to callers through the generated `mock_task()` getter.
    #[deftly(mock(task))]
    task: MockExecutor,
    /// Simulated time provider
    ///
    /// This is what the `advance_*` methods move forward,
    /// and what is consulted for the next pending timeout.
    #[deftly(mock(sleep))]
    sleep: SimpleMockTimeProvider,
    /// Mock network provider
    #[deftly(mock(net))]
    net: MockNetProvider,
}
 96  
/// Builder for a manually-configured `MockRuntime`
///
/// Obtain one from [`MockRuntime::builder()`], adjust it with the setter methods,
/// and finish with [`build()`](MockRuntimeBuilder::build).
#[derive(Debug, Default, Clone)]
pub struct MockRuntimeBuilder {
    /// Scheduling policy, handed to `MockExecutor::with_scheduling` by `build`
    scheduling: SchedulingPolicy,
    /// Sleep provider to use; `None` means `build` creates a default one
    sleep: Option<SimpleMockTimeProvider>,
    /// Starting wall clock time
    ///
    /// If `Some`, `build` applies it to the sleep provider via `jump_wallclock`.
    starting_wallclock: Option<SystemTime>,
}
107  
108  impl MockRuntime {
109      /// Create a new `MockRuntime` with default parameters
110      pub fn new() -> Self {
111          Self::default()
112      }
113  
114      /// Return a builder, for creating a `MockRuntime` with some parameters manually configured
115      pub fn builder() -> MockRuntimeBuilder {
116          Default::default()
117      }
118  
119      /// Run a test case with a variety of runtime parameters, to try to find bugs
120      ///
121      /// `test_case` is an async closure which receives a `MockRuntime`.
122      /// It will be run with a number of differently configured executors.
123      ///
124      /// Each run will be preceded by an [`eprintln!`] showing the runtime configuration.
125      ///
126      /// ### Variations
127      ///
128      /// The only variation currently implemented is this:
129      ///
130      /// Both FIFO and LIFO scheduling policies are tested,
131      /// in the hope that this will help discover ordering-dependent bugs.
132      pub fn test_with_various<TC, FUT>(mut test_case: TC)
133      where
134          TC: FnMut(MockRuntime) -> FUT,
135          FUT: Future<Output = ()>,
136      {
137          Self::try_test_with_various(|runtime| test_case(runtime).map(|()| Ok::<_, Void>(())))
138              .void_unwrap();
139      }
140  
141      /// Run a faillible test case with a variety of runtime parameters, to try to find bugs
142      ///
143      /// `test_case` is an async closure which receives a `MockRuntime`.
144      /// It will be run with a number of differently configured executors.
145      ///
146      /// This function accepts a fallible closure,
147      /// and returns the first `Err` to the caller.
148      ///
149      /// See [`test_with_various()`](MockRuntime::test_with_various) for more details.
150      #[allow(clippy::print_stderr)]
151      pub fn try_test_with_various<TC, FUT, E>(mut test_case: TC) -> Result<(), E>
152      where
153          TC: FnMut(MockRuntime) -> FUT,
154          FUT: Future<Output = Result<(), E>>,
155      {
156          for scheduling in SchedulingPolicy::iter() {
157              let config = MockRuntime::builder().scheduling(scheduling);
158              eprintln!("running test with MockRuntime configuration {config:?}");
159              let runtime = config.build();
160              runtime.block_on(test_case(runtime.clone()))?;
161          }
162          Ok(())
163      }
164  
165      /// Spawn a task and return something to identify it
166      ///
167      /// See [`MockExecutor::spawn_identified()`]
168      pub fn spawn_identified(
169          &self,
170          desc: impl Display,
171          fut: impl Future<Output = ()> + Send + 'static,
172      ) -> impl Debug + Clone + Send + 'static {
173          self.task.spawn_identified(desc, fut)
174      }
175  
176      /// Spawn a task and return its output for further usage
177      ///
178      /// See [`MockExecutor::spawn_join()`]
179      pub fn spawn_join<T: Debug + Send + 'static>(
180          &self,
181          desc: impl Display,
182          fut: impl Future<Output = T> + Send + 'static,
183      ) -> impl Future<Output = T> {
184          self.task.spawn_join(desc, fut)
185      }
186  
187      /// Run tasks and advance time, until every task except this one is waiting
188      ///
189      /// On return the other tasks won't be waiting on timeouts,
190      /// since time will be advanced as needed.
191      ///
192      /// Therefore the other tasks (if any) will be waiting for something
193      /// that won't happen by itself,
194      /// such as a provocation via their APIs from this task.
195      ///
196      /// # Panics
197      ///
198      /// See [`progress_until_stalled`](MockRuntime::progress_until_stalled)
199      pub async fn advance_until_stalled(&self) {
200          self.advance_inner(|| {
201              let Some(timeout) = self.time_until_next_timeout() else {
202                  // Nothing is waiting on timeouts
203                  return ControlFlow::Break(());
204              };
205              assert_ne!(timeout, Duration::ZERO);
206              ControlFlow::Continue(timeout)
207          })
208          .await;
209      }
210  
211      /// Run tasks in the current executor until every task except this one is waiting
212      ///
213      /// Calls [`MockExecutor::progress_until_stalled()`].
214      ///
215      /// # Restriction - no automatic time advance
216      ///
217      /// The mocked time will *not* be automatically advanced.
218      ///
219      /// Usually
220      /// (and especially if the tasks under test are waiting for timeouts or periodic events)
221      /// you must use
222      /// [`advance_by()`](MockRuntime::advance_by)
223      /// or
224      /// [`advance_until()`](MockRuntime::advance_until)
225      /// to ensure the simulated time progresses as required.
226      ///
227      /// # Panics
228      ///
229      /// Might malfunction or panic if more than one such call is running at once.
230      ///
231      /// (Ie, you must `.await` or drop the returned `Future`
232      /// before calling this method again.)
233      ///
234      /// Must be called and awaited within a future being run by `self`.
235      pub async fn progress_until_stalled(&self) {
236          self.task.progress_until_stalled().await;
237      }
238  
239      /// Run tasks and advance time up to at most `limit`
240      ///
241      /// Will return when all other tasks are either:
242      ///  * Waiting on a timeout that will fire strictly after `limit`,
243      ///    (return value is the time until the earliest such)
244      ///  * Waiting for something else that won't happen by itself.
245      ///    (return value is `None`)
246      ///
247      /// Like [`advance_until_stalled`](MockRuntime::advance_until_stalled)
248      /// but stops when the mock time reaches `limit`.
249      ///
250      /// # Panics
251      ///
252      /// Panics if the time somehow advances beyond `limit`.
253      /// (This function won't do that, but maybe it was beyond `limit` on entry,
254      /// or another task advanced the clock.)
255      ///
256      /// And, see [`progress_until_stalled`](MockRuntime::progress_until_stalled)
257      pub async fn advance_until(&self, limit: Instant) -> Option<Duration> {
258          self.advance_inner(|| {
259              let timeout = self.time_until_next_timeout();
260  
261              let limit = limit
262                  .checked_duration_since(self.now())
263                  .expect("MockRuntime::advance_until: time advanced beyond `limit`!");
264  
265              if limit == Duration::ZERO {
266                  // Time has reached `limit`
267                  return ControlFlow::Break(timeout);
268              }
269  
270              let advance = chain!(timeout, [limit]).min().expect("empty!");
271              assert_ne!(advance, Duration::ZERO);
272  
273              ControlFlow::Continue(advance)
274          })
275          .await
276      }
277  
278      /// Advance time, firing events and other tasks - internal implementation
279      ///
280      /// Common code for `advance_*`.
281      ///
282      /// `body` will called after `progress_until_stalled`.
283      /// It should examine the simulated time, and the next timeout,
284      /// and decide what to do - returning
285      /// `Break` to break the loop, or
286      /// `Continue` giving the `Duration` by which to advance time and go round again.
287      #[allow(clippy::print_stderr)]
288      async fn advance_inner<B>(&self, mut body: impl FnMut() -> ControlFlow<B, Duration>) -> B {
289          /// Warn when we loop more than this many times per call
290          const WARN_AT: u32 = 1000;
291          let mut counter = Some(WARN_AT);
292  
293          loop {
294              self.task.progress_until_stalled().await;
295  
296              match body() {
297                  ControlFlow::Break(v) => break v,
298                  ControlFlow::Continue(advance) => {
299                      counter = match counter.map(|v| v.checked_sub(1)) {
300                          None => None,
301                          Some(Some(v)) => Some(v),
302                          Some(None) => {
303                              eprintln!(
304   "warning: MockRuntime advance_* looped >{WARN_AT} (next sleep: {}ms)\n{:?}",
305                                  advance.as_millis(),
306                                  self.mock_task().as_debug_dump(),
307                              );
308                              None
309                          }
310                      };
311  
312                      self.sleep.advance(advance);
313                  }
314              }
315          }
316      }
317  
318      /// Advances time by `dur`, firing time events and other tasks in order
319      ///
320      /// Prefer this to [`SimpleMockTimeProvider::advance()`];
321      /// it works more faithfully.
322      ///
323      /// Specifically, it advances time in successive stages,
324      /// so that timeouts occur sequentially, in the right order.
325      ///
326      /// # Panics
327      ///
328      /// Can panic if the mock time is advanced by other tasks.
329      ///
330      /// And, see [`progress_until_stalled`](MockRuntime::progress_until_stalled)
331      pub async fn advance_by(&self, dur: Duration) -> Option<Duration> {
332          let limit = self
333              .now()
334              .checked_add(dur)
335              .expect("MockRuntime::advance: time overflow");
336  
337          self.advance_until(limit).await
338      }
339  
340      /// See [`SimpleMockTimeProvider::jump_wallclock()`]
341      pub fn jump_wallclock(&self, new_wallclock: SystemTime) {
342          self.sleep.jump_wallclock(new_wallclock);
343      }
344  
345      /// Return the amount of virtual time until the next timeout
346      /// should elapse.
347      ///
348      /// If there are no more timeouts, return None.
349      ///
350      /// If the next
351      /// timeout should elapse right now, return Some(0).
352      /// However, if other tasks are proceeding,
353      /// typically in that situation those other tasks will wake,
354      /// so a `Some(0)` return won't be visible.
355      /// In test cases, detect immediate timeouts by detecting
356      /// what your task does after the timeout occurs.
357      ///
358      /// Likewise whether this function returns `None` or `Some(...)`
359      /// can depend on whether tasks have actually yet polled various futures.
360      /// The answer should be correct after
361      /// [`progress_until_stalled`](Self::progress_until_stalled).
362      pub fn time_until_next_timeout(&self) -> Option<Duration> {
363          self.sleep.time_until_next_timeout()
364      }
365  }
366  
367  impl MockRuntimeBuilder {
368      /// Set the scheduling policy
369      pub fn scheduling(mut self, scheduling: SchedulingPolicy) -> Self {
370          self.scheduling = scheduling;
371          self
372      }
373  
374      /// Provide a non-`Default` [`SimpleMockTimeProvider`]
375      pub fn sleep_provider(mut self, sleep: SimpleMockTimeProvider) -> Self {
376          self.sleep = Some(sleep);
377          self
378      }
379  
380      /// Set the starting wall clock time
381      pub fn starting_wallclock(mut self, starting_wallclock: SystemTime) -> Self {
382          self.starting_wallclock = Some(starting_wallclock);
383          self
384      }
385  
386      /// Build the runtime
387      pub fn build(self) -> MockRuntime {
388          let MockRuntimeBuilder {
389              scheduling,
390              sleep,
391              starting_wallclock,
392          } = self;
393  
394          let sleep = sleep.unwrap_or_default();
395          if let Some(starting_wallclock) = starting_wallclock {
396              sleep.jump_wallclock(starting_wallclock);
397          };
398  
399          let task = MockExecutor::with_scheduling(scheduling);
400  
401          MockRuntime {
402              sleep,
403              task,
404              ..Default::default()
405          }
406      }
407  }
408  
409  #[cfg(all(test, not(miri)))] // miri cannot do CLOCK_REALTIME
410  mod test {
411      // @@ begin test lint list maintained by maint/add_warning @@
412      #![allow(clippy::bool_assert_comparison)]
413      #![allow(clippy::clone_on_copy)]
414      #![allow(clippy::dbg_macro)]
415      #![allow(clippy::mixed_attributes_style)]
416      #![allow(clippy::print_stderr)]
417      #![allow(clippy::print_stdout)]
418      #![allow(clippy::single_char_pattern)]
419      #![allow(clippy::unwrap_used)]
420      #![allow(clippy::unchecked_duration_subtraction)]
421      #![allow(clippy::useless_vec)]
422      #![allow(clippy::needless_pass_by_value)]
423      //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
424      use super::*;
425      use futures::channel::mpsc;
426      use futures::{SinkExt as _, StreamExt as _};
427      use std::sync::atomic::AtomicBool;
428      use std::sync::atomic::Ordering::SeqCst;
429      use std::sync::Arc;
430      use tracing::trace;
431      use tracing_test::traced_test;
432  
433      //---------- helper alias ----------
434  
435      fn ms(i: u64) -> Duration {
436          Duration::from_millis(i)
437      }
438  
439      //---------- set up some test tasks ----------
440  
    /// A fixed set of tasks spawned on a `MockRuntime`, plus what is needed to observe them
    ///
    /// Created by `TestTasks::spawn`.
    struct TestTasks {
        /// Clone of the runtime the tasks were spawned on
        runtime: MockRuntime,
        /// Mock time at which `spawn` was called; expected offsets are measured from here
        start: Instant,
        /// Sending end of the channel whose receiving end is held by the "rx" task
        tx: mpsc::Sender<()>,
        /// One completion flag per task, set to `true` when that task finishes
        ///
        /// Index 0 is the "rx" task; indices 1 to 3 are the sleeping tasks.
        signals: Vec<Arc<AtomicBool>>,
    }
447      impl TestTasks {
448          fn spawn(runtime: &MockRuntime) -> TestTasks {
449              let start = runtime.now();
450              let mut signals = vec![];
451  
452              let mut new_signal = || {
453                  let signal = Arc::new(AtomicBool::new(false));
454                  signals.push(signal.clone());
455                  signal
456              };
457  
458              let (tx, mut rx) = mpsc::channel(0);
459              runtime.spawn_identified("rx", {
460                  let signal = new_signal();
461                  async move {
462                      trace!("task rx starting...");
463                      let _: Option<()> = rx.next().await;
464                      signal.store(true, SeqCst);
465                      trace!("task rx finished.");
466                  }
467              });
468  
469              for i in 1..=3 {
470                  let signal = new_signal();
471                  runtime.spawn_identified(i, {
472                      let runtime = runtime.clone();
473                      async move {
474                          trace!("task {i} starting...");
475                          runtime.sleep(ms(i * 1000)).await;
476                          signal.store(true, SeqCst);
477                          trace!("task {i} finished.");
478                      }
479                  });
480              }
481              let runtime = runtime.clone();
482  
483              TestTasks {
484                  runtime,
485                  start,
486                  tx,
487                  signals,
488              }
489          }
490  
491          fn signals_list(&self) -> String {
492              self.signals
493                  .iter()
494                  .map(|s| if s.load(SeqCst) { 't' } else { 'f' })
495                  .collect()
496          }
497      }
498  
499      //---------- test advance_until_stalled ----------
500  
501      impl TestTasks {
502          async fn advance_until_stalled(&self, exp_offset_from_start: Duration, exp_signals: &str) {
503              self.runtime.advance_until_stalled().await;
504              assert_eq!(self.runtime.now() - self.start, exp_offset_from_start);
505              assert_eq!(self.signals_list(), exp_signals);
506          }
507      }
508  
509      #[traced_test]
510      #[test]
511      fn advance_until_stalled() {
512          MockRuntime::test_with_various(|runtime| async move {
513              let mut tt = TestTasks::spawn(&runtime);
514  
515              tt.advance_until_stalled(ms(3000), "fttt").await;
516              tt.tx.send(()).await.unwrap();
517              tt.advance_until_stalled(ms(3000), "tttt").await;
518          });
519      }
520  
521      //---------- test advance_until ----------
522  
523      impl TestTasks {
524          async fn advance_until(
525              &self,
526              offset_from_start: Duration,
527              exp_signals: &str,
528              exp_got: Option<Duration>,
529          ) {
530              let limit = self.start + offset_from_start;
531              eprintln!("===> advance_until {}ms", offset_from_start.as_millis());
532              let got = self.runtime.advance_until(limit).await;
533              assert_eq!(self.runtime.now() - self.start, offset_from_start);
534              assert_eq!(self.signals_list(), exp_signals);
535              assert_eq!(got, exp_got);
536          }
537      }
538  
539      #[traced_test]
540      #[test]
541      fn advance_until() {
542          MockRuntime::test_with_various(|runtime| async move {
543              let mut tt = TestTasks::spawn(&runtime);
544  
545              tt.advance_until(ms(1100), "ftff", Some(ms(900))).await;
546              tt.advance_until(ms(2000), "fttf", Some(ms(1000))).await;
547              tt.tx.send(()).await.unwrap();
548              tt.advance_until(ms(2000), "tttf", Some(ms(1000))).await;
549              tt.advance_until(ms(3300), "tttt", None).await;
550          });
551      }
552  
553      //---------- test advance_by ----------
554  
555      impl TestTasks {
556          async fn advance_by(
557              &self,
558              advance: Duration,
559              exp_offset_from_start: Duration,
560              exp_signals: &str,
561              exp_got: Option<Duration>,
562          ) {
563              eprintln!("===> advance {}ms", advance.as_millis());
564              let got = self.runtime.advance_by(advance).await;
565              assert_eq!(self.runtime.now() - self.start, exp_offset_from_start);
566              assert_eq!(self.signals_list(), exp_signals);
567              assert_eq!(got, exp_got);
568          }
569      }
570  
571      #[traced_test]
572      #[test]
573      fn advance_by() {
574          MockRuntime::test_with_various(|runtime| async move {
575              let mut tt = TestTasks::spawn(&runtime);
576  
577              tt.advance_by(ms(1100), ms(1100), "ftff", Some(ms(900)))
578                  .await;
579              tt.advance_by(ms(900), ms(2000), "fttf", Some(ms(1000)))
580                  .await;
581              tt.tx.send(()).await.unwrap();
582              tt.advance_by(ms(1300), ms(3300), "tttt", None).await;
583          });
584      }
585  }