time.rs
1 //! Functionality for simulating the passage of time in unit tests. 2 //! 3 //! We do this by providing [`MockSleepProvider`], a "SleepProvider" 4 //! instance that can simulate timeouts and retries without requiring 5 //! the actual system clock to advance. 6 //! 7 //! ### Deprecated 8 //! 9 //! This mock time facility has some limitations. 10 //! See [`MockSleepProvider`] for more information. 11 //! Use [`MockRuntime`](crate::MockRuntime) for new tests. 12 13 #![forbid(unsafe_code)] // if you remove this, enable (or write) miri tests (git grep miri) 14 #![allow(clippy::missing_docs_in_private_items)] 15 16 use std::{ 17 cmp::{Eq, Ordering, PartialEq, PartialOrd}, 18 collections::BinaryHeap, 19 fmt, 20 pin::Pin, 21 sync::{Arc, Mutex, Weak}, 22 task::{Context, Poll, Waker}, 23 time::{Duration, Instant, SystemTime}, 24 }; 25 26 use futures::Future; 27 use tracing::trace; 28 29 use std::collections::HashSet; 30 use std::fmt::Formatter; 31 use tor_rtcompat::{CoarseInstant, CoarseTimeProvider, SleepProvider}; 32 33 use crate::time_core::MockTimeCore; 34 35 /// A dummy [`SleepProvider`] instance for testing. 36 /// 37 /// The MockSleepProvider ignores the current time, and instead keeps 38 /// its own view of the current `Instant` and `SystemTime`. You 39 /// can advance them in-step by calling `advance()`, and you can simulate 40 /// jumps in the system clock by calling `jump()`. 41 /// 42 /// This is *not* for production use. 43 /// 44 /// ### Deprecated 45 /// 46 /// This mock time facility has some limitations, notably lack of support for tasks, 47 /// and a confusing API for controlling the mock time. 48 /// 49 /// New test cases should probably use `MockRuntime` 50 /// which incorporates `MockSimpletimeProvider`. 51 /// 52 /// Comparison of `MockSleepProvider` with `SimpleMockTimeProvider`: 53 /// 54 /// * `SimpleMockTimeProvider` does not support, or expect the use of, 55 /// `block_advance` et al. 56 /// Instead, the advancement of simulated time is typically done automatically 57 /// in cooperation with the executor, 58 /// using `MockRuntime`'s `advance_*` methods. 59 /// 60 /// * Consequently, `SimpleMockTimeProvider` can be used in test cases that 61 /// spawn tasks and perform sleeps in them. 62 /// 63 /// * And, consequently, `SimpleMockTimeProvider` does not need non-test code to 64 /// contain calls which are solely related to getting the time mocking to work right. 65 /// 66 /// * `SimpleMockTimeProvider` gives correct sleeping locations 67 /// with `MockExecutor`'s dump of sleeping tasks' stack traces. 68 /// 69 /// * Conversely, to use `SimpleMockTimeProvider` in all but the most simple test cases, 70 /// coordination with the executor is required. 71 /// This coordination is provided by the integrated `MockRuntime`; 72 /// `SimpleMockTimeProvider` is of limited usefulness by itself. 73 // 74 // TODO: at some point we should add #[deprecated] to this type 75 // and to the block_advance etc. methods in SleepProvider. 76 // But right now that would involve rewriting a whole bunch of tests, 77 // or generous sprinklings of #[allow]. 78 /// 79 /// ### Examples 80 /// 81 /// Suppose you've written a function that relies on making a 82 /// connection to the network and possibly timing out: 83 /// 84 /// ```rust 85 /// use tor_rtcompat::{Runtime,SleepProviderExt}; 86 /// use std::{net::SocketAddr, io::Result, time::Duration, io::Error}; 87 /// use futures::io::AsyncWriteExt; 88 /// 89 /// async fn say_hi(runtime: impl Runtime, addr: &SocketAddr) -> Result<()> { 90 /// let delay = Duration::new(5,0); 91 /// runtime.timeout(delay, async { 92 /// let mut conn = runtime.connect(addr).await?; 93 /// conn.write_all(b"Hello world!\r\n").await?; 94 /// conn.close().await?; 95 /// Ok::<_,Error>(()) 96 /// }).await??; 97 /// Ok(()) 98 /// } 99 /// ``` 100 /// 101 /// But how should you test this function? 102 /// 103 /// You might try connecting to a well-known website to test the 104 /// connection case, and to a well-known black hole to test the 105 /// timeout case... but that's a bit undesirable. Your tests might be 106 /// running in a container with no internet access; and even if they 107 /// aren't, it isn't so great for your tests to rely on the actual 108 /// state of the internet. Similarly, if you make your timeout too long, 109 /// your tests might block for a long time; but if your timeout is too short, 110 /// the tests might fail on a slow machine or on a slow network. 111 /// 112 /// Or, you could solve both of these problems by using `tor-rtmock` 113 /// to replace the internet _and_ the passage of time. (Here we're only 114 /// replacing the internet.) 115 /// 116 /// ```rust,no_run 117 /// # async fn say_hi<R,A>(runtime: R, addr: A) -> Result<(), ()> { Ok(()) } 118 /// # // TODO this test hangs for some reason? Fix it and remove no_run above 119 /// use tor_rtmock::{MockSleepRuntime,MockNetRuntime,net::MockNetwork}; 120 /// use tor_rtcompat::{NetStreamProvider,NetStreamListener}; 121 /// use futures::io::AsyncReadExt; 122 /// use std::net::SocketAddr; 123 /// use futures::StreamExt as _; 124 /// 125 /// tor_rtcompat::test_with_all_runtimes!(|rt| async move { 126 /// 127 /// let addr1 = "198.51.100.7".parse().unwrap(); 128 /// let addr2 = "198.51.100.99".parse().unwrap(); 129 /// let sockaddr: SocketAddr = "198.51.100.99:101".parse().unwrap(); 130 /// 131 /// // Make a runtime that pretends that we are at the first address... 132 /// let fake_internet = MockNetwork::new(); 133 /// let rt1 = fake_internet.builder().add_address(addr1).runtime(rt.clone()); 134 /// // ...and one that pretends we're listening at the second address. 135 /// let rt2 = fake_internet.builder().add_address(addr2).runtime(rt); 136 /// let listener = rt2.listen(&sockaddr).await.unwrap(); 137 /// let mut incoming_stream = listener.incoming(); 138 /// 139 /// // Now we can test our function! 140 /// let (result1,output) = futures::join!( 141 /// say_hi(rt1, &sockaddr), 142 /// async { 143 /// let (mut conn,addr) = incoming_stream.next().await.unwrap().unwrap(); 144 /// assert_eq!(addr.ip(), addr1); 145 /// let mut output = Vec::new(); 146 /// conn.read_to_end(&mut output).await.unwrap(); 147 /// output 148 /// }); 149 /// 150 /// assert!(result1.is_ok()); 151 /// assert_eq!(&output[..], b"Hello world!\r\n"); 152 /// }); 153 /// ``` 154 #[derive(Clone)] 155 pub struct MockSleepProvider { 156 /// The shared backend for this MockSleepProvider and its futures. 157 state: Arc<Mutex<SleepSchedule>>, 158 } 159 160 impl fmt::Debug for MockSleepProvider { 161 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { 162 f.debug_struct("MockSleepProvider").finish_non_exhaustive() 163 } 164 } 165 166 /// Shared backend for sleep provider and Sleeping futures. 167 struct SleepSchedule { 168 /// What time do we pretend it is? 169 core: MockTimeCore, 170 /// Priority queue of events, in the order that we should wake them. 171 sleepers: BinaryHeap<SleepEntry>, 172 /// If the mock time system is being driven by a `WaitFor`, holds a `Waker` to wake up that 173 /// `WaitFor` in order for it to make more progress. 174 waitfor_waker: Option<Waker>, 175 /// Number of sleepers instantiated. 176 sleepers_made: usize, 177 /// Number of sleepers polled. 178 sleepers_polled: usize, 179 /// Whether an advance is needed. 180 should_advance: bool, 181 /// A set of reasons why advances shouldn't be allowed right now. 182 blocked_advance: HashSet<String>, 183 /// A time up to which advances are allowed, irrespective of them being blocked. 184 allowed_advance: Duration, 185 } 186 187 /// An entry telling us when to wake which future up. 188 struct SleepEntry { 189 /// The time at which this entry should wake 190 when: Instant, 191 /// The Waker to call when the instant has passed. 192 waker: Waker, 193 } 194 195 /// A future returned by [`MockSleepProvider::sleep()`]. 196 pub struct Sleeping { 197 /// The instant when we should become ready. 198 when: Instant, 199 /// True if we have pushed this into the queue. 200 inserted: bool, 201 /// The schedule to queue ourselves in if we're polled before we're ready. 202 provider: Weak<Mutex<SleepSchedule>>, 203 } 204 205 impl Default for MockSleepProvider { 206 fn default() -> Self { 207 let wallclock = humantime::parse_rfc3339("2023-07-05T11:25:56Z").expect("parse"); 208 MockSleepProvider::new(wallclock) 209 } 210 } 211 212 impl MockSleepProvider { 213 /// Create a new MockSleepProvider, starting at a given wall-clock time. 214 pub fn new(wallclock: SystemTime) -> Self { 215 let instant = Instant::now(); 216 let sleepers = BinaryHeap::new(); 217 let core = MockTimeCore::new(instant, wallclock); 218 let state = SleepSchedule { 219 core, 220 sleepers, 221 waitfor_waker: None, 222 sleepers_made: 0, 223 sleepers_polled: 0, 224 should_advance: false, 225 blocked_advance: HashSet::new(), 226 allowed_advance: Duration::from_nanos(0), 227 }; 228 MockSleepProvider { 229 state: Arc::new(Mutex::new(state)), 230 } 231 } 232 233 /// Advance the simulated timeline forward by `dur`. 234 /// 235 /// Calling this function will wake any pending futures as 236 /// appropriate, and yield to the scheduler so they get a chance 237 /// to run. 238 /// 239 /// # Limitations 240 /// 241 /// This function advances time in one big step. We might instead 242 /// want to advance in small steps and make sure that each step's 243 /// futures can get run before the ones scheduled to run after it. 244 pub async fn advance(&self, dur: Duration) { 245 self.advance_noyield(dur); 246 tor_rtcompat::task::yield_now().await; 247 } 248 249 /// Advance the simulated timeline forward by `dur`. 250 /// 251 /// Calling this function will wake any pending futures as 252 /// appropriate, but not yield to the scheduler. Mostly you 253 /// should call [`advance`](Self::advance) instead. 254 pub(crate) fn advance_noyield(&self, dur: Duration) { 255 // It's not so great to unwrap here in general, but since this is 256 // only testing code we don't really care. 257 let mut state = self.state.lock().expect("Poisoned lock for state"); 258 state.core.advance(dur); 259 state.fire(); 260 } 261 262 /// Simulate a discontinuity in the system clock, by jumping to 263 /// `new_wallclock`. 264 /// 265 /// # Panics 266 /// 267 /// Panics if we have already panicked while holding the lock on 268 /// the internal timer state, and the lock is poisoned. 269 pub fn jump_to(&self, new_wallclock: SystemTime) { 270 let mut state = self.state.lock().expect("Poisoned lock for state"); 271 state.core.jump_wallclock(new_wallclock); 272 } 273 274 /// Return the amount of virtual time until the next timeout 275 /// should elapse. 276 /// 277 /// If there are no more timeouts, return None. If the next 278 /// timeout should elapse right now, return Some(0). 279 pub(crate) fn time_until_next_timeout(&self) -> Option<Duration> { 280 let state = self.state.lock().expect("Poisoned lock for state"); 281 let now = state.core.instant(); 282 state 283 .sleepers 284 .peek() 285 .map(|sleepent| sleepent.when.saturating_duration_since(now)) 286 } 287 288 /// Return true if a `WaitFor` driving this sleep provider should advance time in order for 289 /// futures blocked on sleeping to make progress. 290 /// 291 /// NOTE: This function has side-effects; if it returns true, the caller is expected to do an 292 /// advance before calling it again. 293 #[allow(clippy::cognitive_complexity)] 294 pub(crate) fn should_advance(&mut self) -> bool { 295 let mut state = self.state.lock().expect("Poisoned lock for state"); 296 if !state.blocked_advance.is_empty() && state.allowed_advance == Duration::from_nanos(0) { 297 // We've had advances blocked, and don't have any quota for doing allowances while 298 // blocked left. 299 trace!( 300 "should_advance = false: blocked by {:?}", 301 state.blocked_advance 302 ); 303 return false; 304 } 305 if !state.should_advance { 306 // The advance flag wasn't set. 307 trace!("should_advance = false; bit not previously set"); 308 return false; 309 } 310 // Clear the advance flag; we'll either return true and cause an advance to happen, 311 // or the reasons to return false below also imply that the advance flag will be set again 312 // later on. 313 state.should_advance = false; 314 if state.sleepers_polled < state.sleepers_made { 315 // Something did set the advance flag before, but it's not valid any more now because 316 // more unpolled sleepers were created. 317 trace!("should_advance = false; advancing no longer valid"); 318 return false; 319 } 320 if !state.blocked_advance.is_empty() && state.allowed_advance > Duration::from_nanos(0) { 321 // If we're here, we would've returned earlier due to having advances blocked, but 322 // we have quota to advance up to a certain time while advances are blocked. 323 // Let's see when the next timeout is, and whether it falls within that quota. 324 let next_timeout = { 325 let now = state.core.instant(); 326 state 327 .sleepers 328 .peek() 329 .map(|sleepent| sleepent.when.saturating_duration_since(now)) 330 }; 331 let next_timeout = match next_timeout { 332 Some(x) => x, 333 None => { 334 // There's no timeout set, so we really shouldn't be here anyway. 335 trace!("should_advance = false; allow_one set but no timeout yet"); 336 return false; 337 } 338 }; 339 if next_timeout <= state.allowed_advance { 340 // We can advance up to the next timeout, since it's in our quota. 341 // Subtract the amount we're going to advance by from said quota. 342 state.allowed_advance -= next_timeout; 343 trace!( 344 "WARNING: allowing advance due to allow_one; new allowed is {:?}", 345 state.allowed_advance 346 ); 347 } else { 348 // The next timeout is too far in the future. 349 trace!( 350 "should_advance = false; allow_one set but only up to {:?}, next is {:?}", 351 state.allowed_advance, 352 next_timeout 353 ); 354 return false; 355 } 356 } 357 true 358 } 359 360 /// Register a `Waker` to be woken up when an advance in time is required to make progress. 361 /// 362 /// This is used by `WaitFor`. 363 pub(crate) fn register_waitfor_waker(&mut self, waker: Waker) { 364 let mut state = self.state.lock().expect("Poisoned lock for state"); 365 state.waitfor_waker = Some(waker); 366 } 367 368 /// Remove a previously registered `Waker` registered with `register_waitfor_waker()`. 369 pub(crate) fn clear_waitfor_waker(&mut self) { 370 let mut state = self.state.lock().expect("Poisoned lock for state"); 371 state.waitfor_waker = None; 372 } 373 374 /// Returns true if a `Waker` has been registered with `register_waitfor_waker()`. 375 /// 376 /// This is used to ensure that you don't have two concurrent `WaitFor`s running. 377 pub(crate) fn has_waitfor_waker(&self) -> bool { 378 let state = self.state.lock().expect("Poisoned lock for state"); 379 state.waitfor_waker.is_some() 380 } 381 } 382 383 impl SleepSchedule { 384 /// Wake any pending events that are ready according to the 385 /// current simulated time. 386 fn fire(&mut self) { 387 use std::collections::binary_heap::PeekMut; 388 389 let now = self.core.instant(); 390 while let Some(top) = self.sleepers.peek_mut() { 391 if now < top.when { 392 return; 393 } 394 395 PeekMut::pop(top).waker.wake(); 396 } 397 } 398 399 /// Add a new SleepEntry to this schedule. 400 fn push(&mut self, ent: SleepEntry) { 401 self.sleepers.push(ent); 402 } 403 404 /// If all sleepers made have been polled, set the advance flag and wake up any `WaitFor` that 405 /// might be waiting. 406 fn maybe_advance(&mut self) { 407 if self.sleepers_polled >= self.sleepers_made { 408 if let Some(ref waker) = self.waitfor_waker { 409 trace!("setting advance flag"); 410 self.should_advance = true; 411 waker.wake_by_ref(); 412 } else { 413 trace!("would advance, but no waker"); 414 } 415 } 416 } 417 418 /// Register a sleeper as having been polled, and advance if necessary. 419 fn increment_poll_count(&mut self) { 420 self.sleepers_polled += 1; 421 trace!( 422 "sleeper polled, {}/{}", 423 self.sleepers_polled, 424 self.sleepers_made 425 ); 426 self.maybe_advance(); 427 } 428 } 429 430 impl SleepProvider for MockSleepProvider { 431 type SleepFuture = Sleeping; 432 fn sleep(&self, duration: Duration) -> Self::SleepFuture { 433 let mut provider = self.state.lock().expect("Poisoned lock for state"); 434 let when = provider.core.instant() + duration; 435 // We're making a new sleeper, so register this in the state. 436 provider.sleepers_made += 1; 437 trace!( 438 "sleeper made for {:?}, {}/{}", 439 duration, 440 provider.sleepers_polled, 441 provider.sleepers_made 442 ); 443 444 Sleeping { 445 when, 446 inserted: false, 447 provider: Arc::downgrade(&self.state), 448 } 449 } 450 451 fn block_advance<T: Into<String>>(&self, reason: T) { 452 let mut provider = self.state.lock().expect("Poisoned lock for state"); 453 let reason = reason.into(); 454 trace!("advancing blocked: {}", reason); 455 provider.blocked_advance.insert(reason); 456 } 457 458 fn release_advance<T: Into<String>>(&self, reason: T) { 459 let mut provider = self.state.lock().expect("Poisoned lock for state"); 460 let reason = reason.into(); 461 trace!("advancing released: {}", reason); 462 provider.blocked_advance.remove(&reason); 463 if provider.blocked_advance.is_empty() { 464 provider.maybe_advance(); 465 } 466 } 467 468 fn allow_one_advance(&self, dur: Duration) { 469 let mut provider = self.state.lock().expect("Poisoned lock for state"); 470 provider.allowed_advance = Duration::max(provider.allowed_advance, dur); 471 trace!( 472 "** allow_one_advance fired; may advance up to {:?} **", 473 provider.allowed_advance 474 ); 475 provider.maybe_advance(); 476 } 477 478 fn now(&self) -> Instant { 479 self.state 480 .lock() 481 .expect("Poisoned lock for state") 482 .core 483 .instant() 484 } 485 486 fn wallclock(&self) -> SystemTime { 487 self.state 488 .lock() 489 .expect("Poisoned lock for state") 490 .core 491 .wallclock() 492 } 493 } 494 495 impl CoarseTimeProvider for MockSleepProvider { 496 fn now_coarse(&self) -> CoarseInstant { 497 self.state 498 .lock() 499 .expect("poisoned") 500 .core 501 .coarse() 502 .now_coarse() 503 } 504 } 505 506 impl PartialEq for SleepEntry { 507 fn eq(&self, other: &Self) -> bool { 508 self.when == other.when 509 } 510 } 511 impl Eq for SleepEntry {} 512 impl PartialOrd for SleepEntry { 513 fn partial_cmp(&self, other: &Self) -> Option<Ordering> { 514 Some(self.cmp(other)) 515 } 516 } 517 impl Ord for SleepEntry { 518 fn cmp(&self, other: &Self) -> Ordering { 519 self.when.cmp(&other.when).reverse() 520 } 521 } 522 523 impl Drop for Sleeping { 524 fn drop(&mut self) { 525 if let Some(provider) = Weak::upgrade(&self.provider) { 526 let mut provider = provider.lock().expect("Poisoned lock for provider"); 527 if !self.inserted { 528 // A sleeper being dropped will never be polled, so there's no point waiting; 529 // act as if it's been polled in order to avoid waiting forever. 530 trace!("sleeper dropped, incrementing count"); 531 provider.increment_poll_count(); 532 self.inserted = true; 533 } 534 } 535 } 536 } 537 538 impl Future for Sleeping { 539 type Output = (); 540 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { 541 if let Some(provider) = Weak::upgrade(&self.provider) { 542 let mut provider = provider.lock().expect("Poisoned lock for provider"); 543 let now = provider.core.instant(); 544 545 if now >= self.when { 546 // The sleep time's elapsed. 547 if !self.inserted { 548 // If we never registered this sleeper as being polled, do so now. 549 provider.increment_poll_count(); 550 self.inserted = true; 551 } 552 if !provider.should_advance { 553 // The first advance during a `WaitFor` gets triggered by all sleepers that 554 // have been created being polled. 555 // However, this only happens once. 556 // What we do to get around this is have sleepers that return Ready kick off 557 // another advance, in order to wake the next waiting sleeper. 558 provider.maybe_advance(); 559 } 560 return Poll::Ready(()); 561 } 562 // dbg!("sleep check with", self.when-now); 563 564 if !self.inserted { 565 let entry = SleepEntry { 566 when: self.when, 567 waker: cx.waker().clone(), 568 }; 569 570 provider.push(entry); 571 self.inserted = true; 572 // Register this sleeper as having been polled. 573 provider.increment_poll_count(); 574 } 575 // dbg!(provider.sleepers.len()); 576 } 577 Poll::Pending 578 } 579 } 580 581 #[cfg(all(test, not(miri)))] // miri cannot do CLOCK_REALTIME 582 mod test { 583 // @@ begin test lint list maintained by maint/add_warning @@ 584 #![allow(clippy::bool_assert_comparison)] 585 #![allow(clippy::clone_on_copy)] 586 #![allow(clippy::dbg_macro)] 587 #![allow(clippy::mixed_attributes_style)] 588 #![allow(clippy::print_stderr)] 589 #![allow(clippy::print_stdout)] 590 #![allow(clippy::single_char_pattern)] 591 #![allow(clippy::unwrap_used)] 592 #![allow(clippy::unchecked_duration_subtraction)] 593 #![allow(clippy::useless_vec)] 594 #![allow(clippy::needless_pass_by_value)] 595 //! <!-- @@ end test lint list maintained by maint/add_warning @@ --> 596 use super::*; 597 use tor_rtcompat::test_with_all_runtimes; 598 599 #[test] 600 fn basics_of_time_travel() { 601 let w1 = SystemTime::now(); 602 let sp = MockSleepProvider::new(w1); 603 let i1 = sp.now(); 604 assert_eq!(sp.wallclock(), w1); 605 606 let interval = Duration::new(4 * 3600 + 13 * 60, 0); 607 sp.advance_noyield(interval); 608 assert_eq!(sp.now(), i1 + interval); 609 assert_eq!(sp.wallclock(), w1 + interval); 610 611 sp.jump_to(w1 + interval * 3); 612 assert_eq!(sp.now(), i1 + interval); 613 assert_eq!(sp.wallclock(), w1 + interval * 3); 614 } 615 616 #[test] 617 fn time_moves_on() { 618 test_with_all_runtimes!(|_| async { 619 use oneshot_fused_workaround as oneshot; 620 use std::sync::atomic::AtomicBool; 621 use std::sync::atomic::Ordering; 622 623 let sp = MockSleepProvider::new(SystemTime::now()); 624 let one_hour = Duration::new(3600, 0); 625 626 let (s1, r1) = oneshot::channel(); 627 let (s2, r2) = oneshot::channel(); 628 let (s3, r3) = oneshot::channel(); 629 630 let b1 = AtomicBool::new(false); 631 let b2 = AtomicBool::new(false); 632 let b3 = AtomicBool::new(false); 633 634 let real_start = Instant::now(); 635 636 futures::join!( 637 async { 638 sp.sleep(one_hour).await; 639 b1.store(true, Ordering::SeqCst); 640 s1.send(()).unwrap(); 641 }, 642 async { 643 sp.sleep(one_hour * 3).await; 644 b2.store(true, Ordering::SeqCst); 645 s2.send(()).unwrap(); 646 }, 647 async { 648 sp.sleep(one_hour * 5).await; 649 b3.store(true, Ordering::SeqCst); 650 s3.send(()).unwrap(); 651 }, 652 async { 653 sp.advance(one_hour * 2).await; 654 r1.await.unwrap(); 655 assert!(b1.load(Ordering::SeqCst)); 656 assert!(!b2.load(Ordering::SeqCst)); 657 assert!(!b3.load(Ordering::SeqCst)); 658 659 sp.advance(one_hour * 2).await; 660 r2.await.unwrap(); 661 assert!(b1.load(Ordering::SeqCst)); 662 assert!(b2.load(Ordering::SeqCst)); 663 assert!(!b3.load(Ordering::SeqCst)); 664 665 sp.advance(one_hour * 2).await; 666 r3.await.unwrap(); 667 assert!(b1.load(Ordering::SeqCst)); 668 assert!(b2.load(Ordering::SeqCst)); 669 assert!(b3.load(Ordering::SeqCst)); 670 let real_end = Instant::now(); 671 672 assert!(real_end - real_start < one_hour); 673 } 674 ); 675 std::io::Result::Ok(()) 676 }); 677 } 678 }