/ fedimint-core / src / task / waiter.rs
waiter.rs
  1  //! Wait for a task to finish.
  2  
  3  use tokio::sync::Semaphore;
  4  
  5  /// Helper to wait for actions to be [`Self::done`]
  6  #[derive(Debug)]
  7  pub struct Waiter {
  8      done_semaphore: Semaphore,
  9  }
 10  
 11  impl Default for Waiter {
 12      fn default() -> Self {
 13          Self::new()
 14      }
 15  }
 16  
 17  impl Waiter {
 18      pub fn new() -> Self {
 19          Self {
 20              // semaphore never has permits.
 21              done_semaphore: Semaphore::new(0),
 22          }
 23      }
 24  
 25      /// Mark this waiter as done.
 26      ///
 27      /// NOTE: Calling this twice is ignored.
 28      pub fn done(&self) {
 29          // close the semaphore and notify all waiters.
 30          self.done_semaphore.close();
 31      }
 32  
 33      /// Wait for [`Self::done`] call.
 34      pub async fn wait(&self) {
 35          // wait for semaphore to be closed.
 36          self.done_semaphore
 37              .acquire()
 38              .await
 39              .expect_err("done semaphore is only closed, never has permits");
 40      }
 41  
 42      /// Check if Waiter was marked as done.
 43      pub fn is_done(&self) -> bool {
 44          self.done_semaphore.is_closed()
 45      }
 46  }
 47  
 48  #[cfg(test)]
 49  mod tests {
 50      use std::time::Duration;
 51  
 52      use super::*;
 53  
 54      #[tokio::test]
 55      async fn test_simple() {
 56          let waiter = Waiter::new();
 57          assert!(!waiter.is_done());
 58          waiter.done();
 59          assert!(waiter.is_done());
 60      }
 61  
 62      #[tokio::test]
 63      async fn test_async() {
 64          let waiter = Waiter::new();
 65          assert!(!waiter.is_done());
 66          tokio::join!(
 67              async {
 68                  waiter.done();
 69              },
 70              async {
 71                  waiter.wait().await;
 72              }
 73          );
 74          assert!(waiter.is_done());
 75          waiter.wait().await;
 76          assert!(waiter.is_done());
 77      }
 78      #[tokio::test]
 79      async fn test_async_multi() {
 80          let waiter = Waiter::new();
 81          assert!(!waiter.is_done());
 82          tokio::join!(
 83              async {
 84                  waiter.done();
 85              },
 86              async {
 87                  waiter.done();
 88              },
 89              async {
 90                  waiter.done();
 91              },
 92          );
 93          assert!(waiter.is_done());
 94          waiter.wait().await;
 95          assert!(waiter.is_done());
 96      }
 97      #[tokio::test]
 98      async fn test_async_sleep() {
 99          let waiter = Waiter::new();
100          assert!(!waiter.is_done());
101          tokio::join!(
102              async {
103                  fedimint_core::runtime::sleep(Duration::from_millis(10)).await;
104                  waiter.done();
105              },
106              waiter.wait(),
107          );
108          assert!(waiter.is_done());
109          waiter.wait().await;
110          assert!(waiter.is_done());
111      }
112  }