simple_time.rs
1 //! Simple provider of simulated time 2 //! 3 //! See [`SimpleMockTimeProvider`] 4 5 use std::cmp::Reverse; 6 use std::future::Future; 7 use std::pin::Pin; 8 use std::sync::{Arc, Mutex, MutexGuard}; 9 use std::task::{Context, Poll, Waker}; 10 use std::time::{Duration, Instant, SystemTime}; 11 12 use derive_more::AsMut; 13 use priority_queue::priority_queue::PriorityQueue; 14 use slotmap_careful::DenseSlotMap; 15 16 use tor_rtcompat::CoarseInstant; 17 use tor_rtcompat::CoarseTimeProvider; 18 use tor_rtcompat::SleepProvider; 19 20 use crate::time_core::MockTimeCore; 21 22 /// Simple provider of simulated time 23 /// 24 /// Maintains a mocked view of the current [`Instant`] and [`SystemTime`]. 25 /// 26 /// The simulated time advances only when explicitly instructed, 27 /// by calling [`.advance()`](Provider::advance). 28 /// 29 /// The wallclock time can be warped with 30 /// [`.jump_wallclock()`](Provider::jump_wallclock), 31 /// allowing simulation of wall clock non-monotonicity. 32 /// 33 /// # Panics and aborts 34 /// 35 /// Panics on time under/overflow. 36 /// 37 /// May cause an abort if the [`SimpleMockTimeProvider`] implementation contains bugs. 38 #[derive(Clone, Debug)] 39 pub struct SimpleMockTimeProvider { 40 /// The actual state 41 state: Arc<Mutex<State>>, 42 } 43 44 /// Convenience abbreviation 45 pub(crate) use SimpleMockTimeProvider as Provider; 46 47 /// Identifier of a [`SleepFuture`] 48 type Id = slotmap_careful::DefaultKey; 49 50 /// Future for `sleep` 51 /// 52 /// Iff this struct exists, there is an entry for `id` in `prov.futures`. 53 /// (It might contain `None`.) 54 pub struct SleepFuture { 55 /// Reference to our state 56 prov: Provider, 57 58 /// Which `SleepFuture` are we 59 id: Id, 60 } 61 62 /// Mutable state for a [`Provider`] 63 /// 64 /// Each sleep ([`Id`], [`SleepFuture`]) is in one of the following states: 65 /// 66 /// | state | [`SleepFuture`] | `futures` | `unready` | 67 /// |-------------|------------------|------------------|--------------------| 68 /// | UNPOLLLED | exists | present, `None` | present, `> now` | 69 /// | WAITING | exists | present, `Some` | present, `> now` | 70 /// | READY | exists | present, `None` | absent | 71 /// | DROPPED | dropped | absent | absent | 72 #[derive(Debug, AsMut)] 73 struct State { 74 /// Current time (coarse) 75 core: MockTimeCore, 76 77 /// Futures; record of every existing [`SleepFuture`], including any `Waker` 78 /// 79 /// Entry exists iff `SleepFuture` exists. 80 /// 81 /// Contains `None` if we haven't polled the future; 82 /// `Some` if we have. 83 /// 84 /// We could use a `Vec` or `TiVec` 85 /// but using a slotmap is more robust against bugs here. 86 futures: DenseSlotMap<Id, Option<Waker>>, 87 88 /// Priority queue 89 /// 90 /// Subset of `futures`. 91 /// 92 /// An entry is present iff the `Instant` is *strictly* after `State.now`, 93 /// in which case that's when the future should be woken. 94 /// 95 /// `PriorityQueue` is a max-heap but we want earliest times, hence `Reverse` 96 unready: PriorityQueue<Id, Reverse<Instant>>, 97 } 98 99 /// `Default` makes a `Provider` which starts at whatever the current real time is 100 impl Default for Provider { 101 fn default() -> Self { 102 Self::from_real() 103 } 104 } 105 106 impl Provider { 107 /// Return a new mock time provider starting at a specified point in time 108 pub fn new(now: Instant, wallclock: SystemTime) -> Self { 109 let state = State { 110 core: MockTimeCore::new(now, wallclock), 111 futures: Default::default(), 112 unready: Default::default(), 113 }; 114 Provider { 115 state: Arc::new(Mutex::new(state)), 116 } 117 } 118 119 /// Return a new mock time provider starting at the current actual (non-mock) time 120 /// 121 /// Like any [`SimpleMockTimeProvider`], the time is frozen and only changes 122 /// due to calls to `advance`. 123 pub fn from_real() -> Self { 124 Provider::from_wallclock(SystemTime::now()) 125 } 126 /// Return a new mock time provider starting at a specified wallclock time 127 /// 128 /// The monotonic time ([`Instant`]) starts at the current actual (non-mock) time. 129 /// (Absolute values of the real monotonic time are not readily 130 /// observable or distinguishable from Rust, 131 /// nor can a fixed `Instant` be constructed, 132 /// so this is usually sufficient for a reproducible test.) 133 pub fn from_wallclock(wallclock: SystemTime) -> Self { 134 Provider::new(Instant::now(), wallclock) 135 } 136 137 /// Advance the simulated time by `d` 138 /// 139 /// This advances both the `Instant` (monotonic time) 140 /// and `SystemTime` (wallclock time) 141 /// by the same amount. 142 /// 143 /// Will wake sleeping [`SleepFuture`]s, as appropriate. 144 /// 145 /// Note that the tasks which were waiting on those now-expired `SleepFuture`s 146 /// will only actually execute when they are next polled. 147 /// `advance` does not yield to the executor or poll any futures. 148 /// The executor will (presumably) poll those woken tasks, when it regains control. 149 /// But the order in which the tasks run will depend on its scheduling policy, 150 /// and might be different to the order implied by the futures' timeout values. 151 /// 152 /// To simulate normal time advancement, wakeups, and task activations, 153 /// use [`MockExecutor::advance_*()`](crate::MockRuntime). 154 pub fn advance(&self, d: Duration) { 155 let mut state = self.lock(); 156 state.core.advance(d); 157 state.wake_any(); 158 } 159 160 /// Warp the wallclock time 161 /// 162 /// This has no effect on any sleeping futures. 163 /// It only affects the return value from [`.wallclock()`](Provider::wallclock). 164 pub fn jump_wallclock(&self, new_wallclock: SystemTime) { 165 self.lock().core.jump_wallclock(new_wallclock); 166 // Really we ought to wake people up, here. 167 // But absolutely every Rust API is wrong: none offer a way to sleep until a SystemTime. 168 // (There might be some less-portable non-Rust APIs for that.) 169 } 170 171 /// When will the next timeout occur? 172 /// 173 /// Returns the duration until the next [`SleepFuture`] should wake up. 174 /// 175 /// Advancing time by at least this amount will wake up that future, 176 /// and any others with the same wakeup time. 177 /// 178 /// Will never return `Some(ZERO)`: 179 /// any future that is supposed to wake up now (or earlier) has indeed already been woken, 180 /// so it is no longer sleeping and isn't included in the calculation. 181 pub fn time_until_next_timeout(&self) -> Option<Duration> { 182 let state = self.lock(); 183 let Reverse(until) = state.unready.peek()?.1; 184 // The invariant (see `State`) guarantees that entries in `unready` are always `> now`, 185 // so we don't whether duration_since would panic or saturate. 186 let d = until.duration_since(state.core.instant()); 187 Some(d) 188 } 189 190 /// Convenience function to lock the state 191 fn lock(&self) -> MutexGuard<'_, State> { 192 self.state.lock().expect("simple time state poisoned") 193 } 194 } 195 196 impl SleepProvider for Provider { 197 type SleepFuture = SleepFuture; 198 199 fn sleep(&self, d: Duration) -> SleepFuture { 200 let mut state = self.lock(); 201 let until = state.core.instant() + d; 202 203 let id = state.futures.insert(None); 204 state.unready.push(id, Reverse(until)); 205 206 let fut = SleepFuture { 207 id, 208 prov: self.clone(), 209 }; 210 211 // This sleep is now UNPOLLLED, except that its time might be `<= now`: 212 213 // Possibly, `until` isn't *strictly* after than `state.now`, since d might be 0. 214 // If so, .wake_any() will restore the invariant by immediately waking. 215 state.wake_any(); 216 217 // This sleep is now UNPOLLED or READY, according to whether duration was 0. 218 219 fut 220 } 221 222 fn now(&self) -> Instant { 223 self.lock().core.instant() 224 } 225 fn wallclock(&self) -> SystemTime { 226 self.lock().core.wallclock() 227 } 228 } 229 230 impl CoarseTimeProvider for Provider { 231 fn now_coarse(&self) -> CoarseInstant { 232 self.lock().core.coarse().now_coarse() 233 } 234 } 235 236 impl Future for SleepFuture { 237 type Output = (); 238 239 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { 240 let mut state = self.prov.lock(); 241 if let Some((_, Reverse(scheduled))) = state.unready.get(&self.id) { 242 // Presence of this entry implies scheduled > now: we are UNPOLLED or WAITING 243 assert!(*scheduled > state.core.instant()); 244 let waker = Some(cx.waker().clone()); 245 // Make this be WAITING. (If we're re-polled, we simply drop any previous waker.) 246 *state 247 .futures 248 .get_mut(self.id) 249 .expect("polling futures entry") = waker; 250 Poll::Pending 251 } else { 252 // Absence implies scheduled (no longer stored) <= now: we are READY 253 Poll::Ready(()) 254 } 255 } 256 } 257 258 impl State { 259 /// Restore the invariant for `unready` after `now` has been increased 260 /// 261 /// Ie, ensures that any sleeps which are 262 /// WAITING/UNPOLLED except that they are `<= now`, 263 /// are moved to state READY. 264 fn wake_any(&mut self) { 265 loop { 266 match self.unready.peek() { 267 // Keep picking off entries with scheduled <= now 268 Some((_, Reverse(scheduled))) if *scheduled <= self.core.instant() => { 269 let (id, _) = self.unready.pop().expect("vanished"); 270 // We can .take() the waker since this can only ever run once 271 // per sleep future (since it happens when we pop it from unready). 272 let futures_entry = self.futures.get_mut(id).expect("stale unready entry"); 273 if let Some(waker) = futures_entry.take() { 274 waker.wake(); 275 } 276 } 277 _ => break, 278 } 279 } 280 } 281 } 282 283 impl Drop for SleepFuture { 284 fn drop(&mut self) { 285 let mut state = self.prov.lock(); 286 let _: Option<Waker> = state.futures.remove(self.id).expect("entry vanished"); 287 let _: Option<(Id, Reverse<Instant>)> = state.unready.remove(&self.id); 288 // Now it is DROPPED. 289 } 290 } 291 292 #[cfg(test)] 293 mod test { 294 // @@ begin test lint list maintained by maint/add_warning @@ 295 #![allow(clippy::bool_assert_comparison)] 296 #![allow(clippy::clone_on_copy)] 297 #![allow(clippy::dbg_macro)] 298 #![allow(clippy::mixed_attributes_style)] 299 #![allow(clippy::print_stderr)] 300 #![allow(clippy::print_stdout)] 301 #![allow(clippy::single_char_pattern)] 302 #![allow(clippy::unwrap_used)] 303 #![allow(clippy::unchecked_duration_subtraction)] 304 #![allow(clippy::useless_vec)] 305 #![allow(clippy::needless_pass_by_value)] 306 //! <!-- @@ end test lint list maintained by maint/add_warning @@ --> 307 use super::*; 308 use crate::task::MockExecutor; 309 use futures::poll; 310 use humantime::parse_rfc3339; 311 use tor_rtcompat::BlockOn as _; 312 use Poll::*; 313 314 fn ms(ms: u64) -> Duration { 315 Duration::from_millis(ms) 316 } 317 318 fn run_test<FUT>(f: impl FnOnce(Provider, MockExecutor) -> FUT) 319 where 320 FUT: Future<Output = ()>, 321 { 322 let sp = Provider::new( 323 Instant::now(), // it would have been nice to make this fixed for the test 324 parse_rfc3339("2000-01-01T00:00:00Z").unwrap(), 325 ); 326 let exec = MockExecutor::new(); 327 exec.block_on(f(sp, exec.clone())); 328 } 329 330 #[test] 331 fn simple() { 332 run_test(|sp, _exec| async move { 333 let n1 = sp.now(); 334 let w1 = sp.wallclock(); 335 let mut f1 = sp.sleep(ms(500)); 336 let mut f2 = sp.sleep(ms(1500)); 337 assert_eq!(poll!(&mut f1), Pending); 338 sp.advance(ms(200)); 339 assert_eq!(n1 + ms(200), sp.now()); 340 assert_eq!(w1 + ms(200), sp.wallclock()); 341 assert_eq!(poll!(&mut f1), Pending); 342 assert_eq!(poll!(&mut f2), Pending); 343 drop(f2); 344 sp.jump_wallclock(w1 + ms(10_000)); 345 sp.advance(ms(300)); 346 assert_eq!(n1 + ms(500), sp.now()); 347 assert_eq!(w1 + ms(10_300), sp.wallclock()); 348 assert_eq!(poll!(&mut f1), Ready(())); 349 let mut f0 = sp.sleep(ms(0)); 350 assert_eq!(poll!(&mut f0), Ready(())); 351 }); 352 } 353 354 #[test] 355 fn task() { 356 run_test(|sp, exec| async move { 357 let st = Arc::new(Mutex::new(0_i8)); 358 359 exec.spawn_identified("test task", { 360 let st = st.clone(); 361 let sp = sp.clone(); 362 async move { 363 *st.lock().unwrap() = 1; 364 sp.sleep(ms(500)).await; 365 *st.lock().unwrap() = 2; 366 sp.sleep(ms(300)).await; 367 *st.lock().unwrap() = 3; 368 } 369 }); 370 371 let st = move || *st.lock().unwrap(); 372 373 assert_eq!(st(), 0); 374 exec.progress_until_stalled().await; 375 assert_eq!(st(), 1); 376 assert_eq!(sp.time_until_next_timeout(), Some(ms(500))); 377 378 sp.advance(ms(500)); 379 380 assert_eq!(st(), 1); 381 assert_eq!(sp.time_until_next_timeout(), None); 382 exec.progress_until_stalled().await; 383 assert_eq!(st(), 2); 384 assert_eq!(sp.time_until_next_timeout(), Some(ms(300))); 385 386 sp.advance(ms(500)); 387 assert_eq!(st(), 2); 388 assert_eq!(sp.time_until_next_timeout(), None); 389 exec.progress_until_stalled().await; 390 assert_eq!(sp.time_until_next_timeout(), None); 391 assert_eq!(st(), 3); 392 }); 393 } 394 }