/ fedimint-core / src / db / notifications.rs
notifications.rs
  1  use std::collections::hash_map::DefaultHasher;
  2  use std::hash::{Hash, Hasher};
  3  
  4  use bitvec::vec::BitVec;
  5  use tokio::sync::futures::Notified;
  6  use tokio::sync::Notify;
  7  
  8  /// Number of buckets used for `Notifications`.
  9  const NOTIFY_BUCKETS: usize = 32;
 10  
 11  /// The state of Notification.
 12  ///
 13  /// This stores `NOTIFY_BUCKETS` number of `Notifies`.
 14  /// Each key is assigned a bucket based on its hash value.
 15  /// This will cause some false positives.
 16  #[derive(Debug)]
 17  pub struct Notifications {
 18      buckets: Vec<Notify>,
 19  }
 20  
 21  impl Default for Notifications {
 22      fn default() -> Self {
 23          Self {
 24              buckets: (0..NOTIFY_BUCKETS).map(|_| Notify::new()).collect(),
 25          }
 26      }
 27  }
 28  
 29  fn slot_index_for_hash(hash_value: u64) -> usize {
 30      (hash_value % (NOTIFY_BUCKETS as u64)) as usize
 31  }
 32  
 33  fn slot_index_for_key<K: Hash>(key: K) -> usize {
 34      let mut hasher = DefaultHasher::new();
 35      key.hash(&mut hasher);
 36      let hash_value = hasher.finish();
 37      slot_index_for_hash(hash_value)
 38  }
 39  
 40  impl Notifications {
 41      pub fn new() -> Self {
 42          Self::default()
 43      }
 44  
 45      /// This registers for notification when called.
 46      ///
 47      /// Then waits for the notification when .awaited.
 48      ///
 49      /// NOTE: This may some false positives.
 50      pub fn register<K>(&self, key: K) -> Notified
 51      where
 52          K: Hash,
 53      {
 54          self.buckets[slot_index_for_key(key)].notified()
 55      }
 56  
 57      /// Notify a key.
 58      ///
 59      /// All the waiters for this keys will be notified.
 60      pub async fn notify<K>(&self, key: K)
 61      where
 62          K: Hash,
 63      {
 64          self.buckets[slot_index_for_key(key)].notify_waiters();
 65      }
 66  
 67      /// Notifies the waiters about the notifications recorded in NotifyQueue.
 68      pub fn submit_queue(&self, queue: NotifyQueue) {
 69          for bucket in queue.buckets.iter_ones() {
 70              self.buckets[bucket].notify_waiters();
 71          }
 72      }
 73  }
 74  
 75  /// Save notifications to be sent after transaction is complete.
 76  #[derive(Debug)]
 77  pub struct NotifyQueue {
 78      buckets: BitVec,
 79  }
 80  
 81  impl Default for NotifyQueue {
 82      fn default() -> Self {
 83          Self {
 84              buckets: BitVec::repeat(false, NOTIFY_BUCKETS),
 85          }
 86      }
 87  }
 88  
 89  impl NotifyQueue {
 90      pub fn new() -> Self {
 91          Self::default()
 92      }
 93  
 94      pub fn add<K>(&mut self, key: &K)
 95      where
 96          K: Hash,
 97      {
 98          self.buckets.set(slot_index_for_key(key), true);
 99      }
100  }
101  
#[cfg(test)]
mod tests {
    use fedimint_core::db::test_utils::future_returns_shortly;

    use super::*;

    /// A waiter registered before `notify` is woken by it.
    #[tokio::test]
    async fn test_notification_after_notify() {
        let key = 1;
        let notifications = Notifications::new();
        let waiter = notifications.register(key);
        notifications.notify(&key).await;
        let outcome = future_returns_shortly(waiter).await;
        assert!(outcome.is_some(), "should notify");
    }

    /// A waiter stays pending when nothing notifies its key.
    #[tokio::test]
    async fn test_no_notification_without_notify() {
        let key = 1;
        let notifications = Notifications::new();
        let waiter = notifications.register(key);
        let outcome = future_returns_shortly(waiter).await;
        assert!(outcome.is_none(), "should not notify");
    }

    /// Two keys notified one after the other both wake their waiters.
    #[tokio::test]
    async fn test_multi() {
        let (first_key, second_key) = (1, 2);
        let notifications = Notifications::new();
        let first_waiter = notifications.register(first_key);
        let second_waiter = notifications.register(second_key);
        notifications.notify(&first_key).await;
        notifications.notify(&second_key).await;
        let first_outcome = future_returns_shortly(first_waiter).await;
        assert!(first_outcome.is_some(), "should notify");
        let second_outcome = future_returns_shortly(second_waiter).await;
        assert!(second_outcome.is_some(), "should notify");
    }

    /// Keys collected in a `NotifyQueue` are delivered by `submit_queue`.
    #[tokio::test]
    async fn test_notify_queue() {
        let (first_key, second_key) = (1, 2);
        let notifications = Notifications::new();
        let first_waiter = notifications.register(first_key);
        let second_waiter = notifications.register(second_key);
        let mut pending = NotifyQueue::new();
        pending.add(&first_key);
        pending.add(&second_key);
        notifications.submit_queue(pending);
        let first_outcome = future_returns_shortly(first_waiter).await;
        assert!(first_outcome.is_some(), "should notify");
        let second_outcome = future_returns_shortly(second_waiter).await;
        assert!(second_outcome.is_some(), "should notify");
    }
}