runtime.rs
1 //! Completely mock runtime 2 3 #![forbid(unsafe_code)] // if you remove this, enable (or write) miri tests (git grep miri) 4 5 use std::fmt::{Debug, Display}; 6 use std::ops::ControlFlow; 7 8 use amplify::Getters; 9 use futures::FutureExt as _; 10 use itertools::chain; 11 use strum::IntoEnumIterator as _; 12 use void::{ResultVoidExt as _, Void}; 13 14 use crate::util::impl_runtime_prelude::*; 15 16 use crate::net::MockNetProvider; 17 use crate::simple_time::SimpleMockTimeProvider; 18 use crate::task::{MockExecutor, SchedulingPolicy}; 19 20 /// Completely mock runtime, with simulated time 21 /// 22 /// Suitable for test cases that wish to completely control 23 /// the environment experienced by the code under test. 24 /// 25 /// ### Useful properties 26 /// 27 /// The execution order is deterministic. 28 /// Time will advance only in a controlled fashion. 29 /// Typically, the main task in a test will call 30 /// [`advance_until_stalled`](MockRuntime::advance_until_stalled). 31 /// 32 /// Reliable sequencing techniques which can be used in tests include: 33 /// sleeping for carefully chosen durations; 34 /// interlocking via intertask channels; or, 35 /// sequenced control of requests to the code under test. 36 /// 37 /// ### Restrictions 38 /// 39 /// The test case must advance the mock time explicitly as desired, 40 /// typically by calling one of the `MockRuntime::advance_*` methods. 41 /// 42 /// Tests that use this runtime *must not* interact with the outside world; 43 /// everything must go through this runtime (and its pieces). 44 /// 45 /// There is no mocking of filesystem access; 46 /// the `MockRuntime`'s time will disagree with `SystemTime`'s 47 /// obtained from (for example) `std::fs::Metadata`. 48 /// 49 /// #### Allowed 50 /// 51 /// * Inter-future communication facilities from `futures` 52 /// or other runtime-agnostic crates. 53 /// 54 /// * Fast synchronous operations that will complete "immediately" or "quickly". 55 /// E.g.: filesystem calls. 56 /// 57 /// * `std::sync::Mutex` (assuming the use is deadlock-free in a single-threaded 58 /// executor, as it should be in all of Arti). 59 /// 60 /// * Slower operations that are run synchronously (without futures `await`) 61 /// provided their completion doesn't depend on any of the futures we're running. 62 /// (These kind of operations are often discouraged in async contexts, 63 /// because they block the async runtime or its worker threads. 64 /// But they are often OK in tests.) 65 /// 66 /// * All facilities provided by this `MockExecutor` and its trait impls. 67 /// 68 /// #### Not allowed 69 /// 70 /// * Direct access to the real-world clock (`SystemTime::now`, `Instant::now`), 71 /// including direct use of `coarsetime`. 72 /// Instead, use [`SleepProvider`] and [`CoarseTimeProvider`] methods on the runtime. 73 /// Exception: CPU use measurements. 74 /// 75 /// * Anything that spawns threads and then communicates with those threads 76 /// using async Rust facilities (futures). 77 /// 78 /// * Async sockets, or async use of other kernel-based IPC or network mechanisms. 79 /// 80 /// * Anything provided by a Rust runtime/executor project (eg anything from Tokio), 81 /// unless it is definitively established that it's runtime-agnostic. 82 #[derive(Debug, Default, Clone, Getters, Deftly)] 83 #[derive_deftly(SomeMockRuntime)] 84 #[getter(prefix = "mock_")] 85 pub struct MockRuntime { 86 /// Tasks 87 #[deftly(mock(task))] 88 task: MockExecutor, 89 /// Time provider 90 #[deftly(mock(sleep))] 91 sleep: SimpleMockTimeProvider, 92 /// Net provider 93 #[deftly(mock(net))] 94 net: MockNetProvider, 95 } 96 97 /// Builder for a manually-configured `MockRuntime` 98 #[derive(Debug, Default, Clone)] 99 pub struct MockRuntimeBuilder { 100 /// scheduling policy 101 scheduling: SchedulingPolicy, 102 /// sleep provider 103 sleep: Option<SimpleMockTimeProvider>, 104 /// starting wall clock time 105 starting_wallclock: Option<SystemTime>, 106 } 107 108 impl MockRuntime { 109 /// Create a new `MockRuntime` with default parameters 110 pub fn new() -> Self { 111 Self::default() 112 } 113 114 /// Return a builder, for creating a `MockRuntime` with some parameters manually configured 115 pub fn builder() -> MockRuntimeBuilder { 116 Default::default() 117 } 118 119 /// Run a test case with a variety of runtime parameters, to try to find bugs 120 /// 121 /// `test_case` is an async closure which receives a `MockRuntime`. 122 /// It will be run with a number of differently configured executors. 123 /// 124 /// Each run will be preceded by an [`eprintln!`] showing the runtime configuration. 125 /// 126 /// ### Variations 127 /// 128 /// The only variation currently implemented is this: 129 /// 130 /// Both FIFO and LIFO scheduling policies are tested, 131 /// in the hope that this will help discover ordering-dependent bugs. 132 pub fn test_with_various<TC, FUT>(mut test_case: TC) 133 where 134 TC: FnMut(MockRuntime) -> FUT, 135 FUT: Future<Output = ()>, 136 { 137 Self::try_test_with_various(|runtime| test_case(runtime).map(|()| Ok::<_, Void>(()))) 138 .void_unwrap(); 139 } 140 141 /// Run a faillible test case with a variety of runtime parameters, to try to find bugs 142 /// 143 /// `test_case` is an async closure which receives a `MockRuntime`. 144 /// It will be run with a number of differently configured executors. 145 /// 146 /// This function accepts a fallible closure, 147 /// and returns the first `Err` to the caller. 148 /// 149 /// See [`test_with_various()`](MockRuntime::test_with_various) for more details. 150 #[allow(clippy::print_stderr)] 151 pub fn try_test_with_various<TC, FUT, E>(mut test_case: TC) -> Result<(), E> 152 where 153 TC: FnMut(MockRuntime) -> FUT, 154 FUT: Future<Output = Result<(), E>>, 155 { 156 for scheduling in SchedulingPolicy::iter() { 157 let config = MockRuntime::builder().scheduling(scheduling); 158 eprintln!("running test with MockRuntime configuration {config:?}"); 159 let runtime = config.build(); 160 runtime.block_on(test_case(runtime.clone()))?; 161 } 162 Ok(()) 163 } 164 165 /// Spawn a task and return something to identify it 166 /// 167 /// See [`MockExecutor::spawn_identified()`] 168 pub fn spawn_identified( 169 &self, 170 desc: impl Display, 171 fut: impl Future<Output = ()> + Send + 'static, 172 ) -> impl Debug + Clone + Send + 'static { 173 self.task.spawn_identified(desc, fut) 174 } 175 176 /// Spawn a task and return its output for further usage 177 /// 178 /// See [`MockExecutor::spawn_join()`] 179 pub fn spawn_join<T: Debug + Send + 'static>( 180 &self, 181 desc: impl Display, 182 fut: impl Future<Output = T> + Send + 'static, 183 ) -> impl Future<Output = T> { 184 self.task.spawn_join(desc, fut) 185 } 186 187 /// Run tasks and advance time, until every task except this one is waiting 188 /// 189 /// On return the other tasks won't be waiting on timeouts, 190 /// since time will be advanced as needed. 191 /// 192 /// Therefore the other tasks (if any) will be waiting for something 193 /// that won't happen by itself, 194 /// such as a provocation via their APIs from this task. 195 /// 196 /// # Panics 197 /// 198 /// See [`progress_until_stalled`](MockRuntime::progress_until_stalled) 199 pub async fn advance_until_stalled(&self) { 200 self.advance_inner(|| { 201 let Some(timeout) = self.time_until_next_timeout() else { 202 // Nothing is waiting on timeouts 203 return ControlFlow::Break(()); 204 }; 205 assert_ne!(timeout, Duration::ZERO); 206 ControlFlow::Continue(timeout) 207 }) 208 .await; 209 } 210 211 /// Run tasks in the current executor until every task except this one is waiting 212 /// 213 /// Calls [`MockExecutor::progress_until_stalled()`]. 214 /// 215 /// # Restriction - no automatic time advance 216 /// 217 /// The mocked time will *not* be automatically advanced. 218 /// 219 /// Usually 220 /// (and especially if the tasks under test are waiting for timeouts or periodic events) 221 /// you must use 222 /// [`advance_by()`](MockRuntime::advance_by) 223 /// or 224 /// [`advance_until()`](MockRuntime::advance_until) 225 /// to ensure the simulated time progresses as required. 226 /// 227 /// # Panics 228 /// 229 /// Might malfunction or panic if more than one such call is running at once. 230 /// 231 /// (Ie, you must `.await` or drop the returned `Future` 232 /// before calling this method again.) 233 /// 234 /// Must be called and awaited within a future being run by `self`. 235 pub async fn progress_until_stalled(&self) { 236 self.task.progress_until_stalled().await; 237 } 238 239 /// Run tasks and advance time up to at most `limit` 240 /// 241 /// Will return when all other tasks are either: 242 /// * Waiting on a timeout that will fire strictly after `limit`, 243 /// (return value is the time until the earliest such) 244 /// * Waiting for something else that won't happen by itself. 245 /// (return value is `None`) 246 /// 247 /// Like [`advance_until_stalled`](MockRuntime::advance_until_stalled) 248 /// but stops when the mock time reaches `limit`. 249 /// 250 /// # Panics 251 /// 252 /// Panics if the time somehow advances beyond `limit`. 253 /// (This function won't do that, but maybe it was beyond `limit` on entry, 254 /// or another task advanced the clock.) 255 /// 256 /// And, see [`progress_until_stalled`](MockRuntime::progress_until_stalled) 257 pub async fn advance_until(&self, limit: Instant) -> Option<Duration> { 258 self.advance_inner(|| { 259 let timeout = self.time_until_next_timeout(); 260 261 let limit = limit 262 .checked_duration_since(self.now()) 263 .expect("MockRuntime::advance_until: time advanced beyond `limit`!"); 264 265 if limit == Duration::ZERO { 266 // Time has reached `limit` 267 return ControlFlow::Break(timeout); 268 } 269 270 let advance = chain!(timeout, [limit]).min().expect("empty!"); 271 assert_ne!(advance, Duration::ZERO); 272 273 ControlFlow::Continue(advance) 274 }) 275 .await 276 } 277 278 /// Advance time, firing events and other tasks - internal implementation 279 /// 280 /// Common code for `advance_*`. 281 /// 282 /// `body` will called after `progress_until_stalled`. 283 /// It should examine the simulated time, and the next timeout, 284 /// and decide what to do - returning 285 /// `Break` to break the loop, or 286 /// `Continue` giving the `Duration` by which to advance time and go round again. 287 #[allow(clippy::print_stderr)] 288 async fn advance_inner<B>(&self, mut body: impl FnMut() -> ControlFlow<B, Duration>) -> B { 289 /// Warn when we loop more than this many times per call 290 const WARN_AT: u32 = 1000; 291 let mut counter = Some(WARN_AT); 292 293 loop { 294 self.task.progress_until_stalled().await; 295 296 match body() { 297 ControlFlow::Break(v) => break v, 298 ControlFlow::Continue(advance) => { 299 counter = match counter.map(|v| v.checked_sub(1)) { 300 None => None, 301 Some(Some(v)) => Some(v), 302 Some(None) => { 303 eprintln!( 304 "warning: MockRuntime advance_* looped >{WARN_AT} (next sleep: {}ms)\n{:?}", 305 advance.as_millis(), 306 self.mock_task().as_debug_dump(), 307 ); 308 None 309 } 310 }; 311 312 self.sleep.advance(advance); 313 } 314 } 315 } 316 } 317 318 /// Advances time by `dur`, firing time events and other tasks in order 319 /// 320 /// Prefer this to [`SimpleMockTimeProvider::advance()`]; 321 /// it works more faithfully. 322 /// 323 /// Specifically, it advances time in successive stages, 324 /// so that timeouts occur sequentially, in the right order. 325 /// 326 /// # Panics 327 /// 328 /// Can panic if the mock time is advanced by other tasks. 329 /// 330 /// And, see [`progress_until_stalled`](MockRuntime::progress_until_stalled) 331 pub async fn advance_by(&self, dur: Duration) -> Option<Duration> { 332 let limit = self 333 .now() 334 .checked_add(dur) 335 .expect("MockRuntime::advance: time overflow"); 336 337 self.advance_until(limit).await 338 } 339 340 /// See [`SimpleMockTimeProvider::jump_wallclock()`] 341 pub fn jump_wallclock(&self, new_wallclock: SystemTime) { 342 self.sleep.jump_wallclock(new_wallclock); 343 } 344 345 /// Return the amount of virtual time until the next timeout 346 /// should elapse. 347 /// 348 /// If there are no more timeouts, return None. 349 /// 350 /// If the next 351 /// timeout should elapse right now, return Some(0). 352 /// However, if other tasks are proceeding, 353 /// typically in that situation those other tasks will wake, 354 /// so a `Some(0)` return won't be visible. 355 /// In test cases, detect immediate timeouts by detecting 356 /// what your task does after the timeout occurs. 357 /// 358 /// Likewise whether this function returns `None` or `Some(...)` 359 /// can depend on whether tasks have actually yet polled various futures. 360 /// The answer should be correct after 361 /// [`progress_until_stalled`](Self::progress_until_stalled). 362 pub fn time_until_next_timeout(&self) -> Option<Duration> { 363 self.sleep.time_until_next_timeout() 364 } 365 } 366 367 impl MockRuntimeBuilder { 368 /// Set the scheduling policy 369 pub fn scheduling(mut self, scheduling: SchedulingPolicy) -> Self { 370 self.scheduling = scheduling; 371 self 372 } 373 374 /// Provide a non-`Default` [`SimpleMockTimeProvider`] 375 pub fn sleep_provider(mut self, sleep: SimpleMockTimeProvider) -> Self { 376 self.sleep = Some(sleep); 377 self 378 } 379 380 /// Set the starting wall clock time 381 pub fn starting_wallclock(mut self, starting_wallclock: SystemTime) -> Self { 382 self.starting_wallclock = Some(starting_wallclock); 383 self 384 } 385 386 /// Build the runtime 387 pub fn build(self) -> MockRuntime { 388 let MockRuntimeBuilder { 389 scheduling, 390 sleep, 391 starting_wallclock, 392 } = self; 393 394 let sleep = sleep.unwrap_or_default(); 395 if let Some(starting_wallclock) = starting_wallclock { 396 sleep.jump_wallclock(starting_wallclock); 397 }; 398 399 let task = MockExecutor::with_scheduling(scheduling); 400 401 MockRuntime { 402 sleep, 403 task, 404 ..Default::default() 405 } 406 } 407 } 408 409 #[cfg(all(test, not(miri)))] // miri cannot do CLOCK_REALTIME 410 mod test { 411 // @@ begin test lint list maintained by maint/add_warning @@ 412 #![allow(clippy::bool_assert_comparison)] 413 #![allow(clippy::clone_on_copy)] 414 #![allow(clippy::dbg_macro)] 415 #![allow(clippy::mixed_attributes_style)] 416 #![allow(clippy::print_stderr)] 417 #![allow(clippy::print_stdout)] 418 #![allow(clippy::single_char_pattern)] 419 #![allow(clippy::unwrap_used)] 420 #![allow(clippy::unchecked_duration_subtraction)] 421 #![allow(clippy::useless_vec)] 422 #![allow(clippy::needless_pass_by_value)] 423 //! <!-- @@ end test lint list maintained by maint/add_warning @@ --> 424 use super::*; 425 use futures::channel::mpsc; 426 use futures::{SinkExt as _, StreamExt as _}; 427 use std::sync::atomic::AtomicBool; 428 use std::sync::atomic::Ordering::SeqCst; 429 use std::sync::Arc; 430 use tracing::trace; 431 use tracing_test::traced_test; 432 433 //---------- helper alias ---------- 434 435 fn ms(i: u64) -> Duration { 436 Duration::from_millis(i) 437 } 438 439 //---------- set up some test tasks ---------- 440 441 struct TestTasks { 442 runtime: MockRuntime, 443 start: Instant, 444 tx: mpsc::Sender<()>, 445 signals: Vec<Arc<AtomicBool>>, 446 } 447 impl TestTasks { 448 fn spawn(runtime: &MockRuntime) -> TestTasks { 449 let start = runtime.now(); 450 let mut signals = vec![]; 451 452 let mut new_signal = || { 453 let signal = Arc::new(AtomicBool::new(false)); 454 signals.push(signal.clone()); 455 signal 456 }; 457 458 let (tx, mut rx) = mpsc::channel(0); 459 runtime.spawn_identified("rx", { 460 let signal = new_signal(); 461 async move { 462 trace!("task rx starting..."); 463 let _: Option<()> = rx.next().await; 464 signal.store(true, SeqCst); 465 trace!("task rx finished."); 466 } 467 }); 468 469 for i in 1..=3 { 470 let signal = new_signal(); 471 runtime.spawn_identified(i, { 472 let runtime = runtime.clone(); 473 async move { 474 trace!("task {i} starting..."); 475 runtime.sleep(ms(i * 1000)).await; 476 signal.store(true, SeqCst); 477 trace!("task {i} finished."); 478 } 479 }); 480 } 481 let runtime = runtime.clone(); 482 483 TestTasks { 484 runtime, 485 start, 486 tx, 487 signals, 488 } 489 } 490 491 fn signals_list(&self) -> String { 492 self.signals 493 .iter() 494 .map(|s| if s.load(SeqCst) { 't' } else { 'f' }) 495 .collect() 496 } 497 } 498 499 //---------- test advance_until_stalled ---------- 500 501 impl TestTasks { 502 async fn advance_until_stalled(&self, exp_offset_from_start: Duration, exp_signals: &str) { 503 self.runtime.advance_until_stalled().await; 504 assert_eq!(self.runtime.now() - self.start, exp_offset_from_start); 505 assert_eq!(self.signals_list(), exp_signals); 506 } 507 } 508 509 #[traced_test] 510 #[test] 511 fn advance_until_stalled() { 512 MockRuntime::test_with_various(|runtime| async move { 513 let mut tt = TestTasks::spawn(&runtime); 514 515 tt.advance_until_stalled(ms(3000), "fttt").await; 516 tt.tx.send(()).await.unwrap(); 517 tt.advance_until_stalled(ms(3000), "tttt").await; 518 }); 519 } 520 521 //---------- test advance_until ---------- 522 523 impl TestTasks { 524 async fn advance_until( 525 &self, 526 offset_from_start: Duration, 527 exp_signals: &str, 528 exp_got: Option<Duration>, 529 ) { 530 let limit = self.start + offset_from_start; 531 eprintln!("===> advance_until {}ms", offset_from_start.as_millis()); 532 let got = self.runtime.advance_until(limit).await; 533 assert_eq!(self.runtime.now() - self.start, offset_from_start); 534 assert_eq!(self.signals_list(), exp_signals); 535 assert_eq!(got, exp_got); 536 } 537 } 538 539 #[traced_test] 540 #[test] 541 fn advance_until() { 542 MockRuntime::test_with_various(|runtime| async move { 543 let mut tt = TestTasks::spawn(&runtime); 544 545 tt.advance_until(ms(1100), "ftff", Some(ms(900))).await; 546 tt.advance_until(ms(2000), "fttf", Some(ms(1000))).await; 547 tt.tx.send(()).await.unwrap(); 548 tt.advance_until(ms(2000), "tttf", Some(ms(1000))).await; 549 tt.advance_until(ms(3300), "tttt", None).await; 550 }); 551 } 552 553 //---------- test advance_by ---------- 554 555 impl TestTasks { 556 async fn advance_by( 557 &self, 558 advance: Duration, 559 exp_offset_from_start: Duration, 560 exp_signals: &str, 561 exp_got: Option<Duration>, 562 ) { 563 eprintln!("===> advance {}ms", advance.as_millis()); 564 let got = self.runtime.advance_by(advance).await; 565 assert_eq!(self.runtime.now() - self.start, exp_offset_from_start); 566 assert_eq!(self.signals_list(), exp_signals); 567 assert_eq!(got, exp_got); 568 } 569 } 570 571 #[traced_test] 572 #[test] 573 fn advance_by() { 574 MockRuntime::test_with_various(|runtime| async move { 575 let mut tt = TestTasks::spawn(&runtime); 576 577 tt.advance_by(ms(1100), ms(1100), "ftff", Some(ms(900))) 578 .await; 579 tt.advance_by(ms(900), ms(2000), "fttf", Some(ms(1000))) 580 .await; 581 tt.tx.send(()).await.unwrap(); 582 tt.advance_by(ms(1300), ms(3300), "tttt", None).await; 583 }); 584 } 585 }