notifications.rs
1 use std::collections::hash_map::DefaultHasher; 2 use std::hash::{Hash, Hasher}; 3 4 use bitvec::vec::BitVec; 5 use tokio::sync::futures::Notified; 6 use tokio::sync::Notify; 7 8 /// Number of buckets used for `Notifications`. 9 const NOTIFY_BUCKETS: usize = 32; 10 11 /// The state of Notification. 12 /// 13 /// This stores `NOTIFY_BUCKETS` number of `Notifies`. 14 /// Each key is assigned a bucket based on its hash value. 15 /// This will cause some false positives. 16 #[derive(Debug)] 17 pub struct Notifications { 18 buckets: Vec<Notify>, 19 } 20 21 impl Default for Notifications { 22 fn default() -> Self { 23 Self { 24 buckets: (0..NOTIFY_BUCKETS).map(|_| Notify::new()).collect(), 25 } 26 } 27 } 28 29 fn slot_index_for_hash(hash_value: u64) -> usize { 30 (hash_value % (NOTIFY_BUCKETS as u64)) as usize 31 } 32 33 fn slot_index_for_key<K: Hash>(key: K) -> usize { 34 let mut hasher = DefaultHasher::new(); 35 key.hash(&mut hasher); 36 let hash_value = hasher.finish(); 37 slot_index_for_hash(hash_value) 38 } 39 40 impl Notifications { 41 pub fn new() -> Self { 42 Self::default() 43 } 44 45 /// This registers for notification when called. 46 /// 47 /// Then waits for the notification when .awaited. 48 /// 49 /// NOTE: This may some false positives. 50 pub fn register<K>(&self, key: K) -> Notified 51 where 52 K: Hash, 53 { 54 self.buckets[slot_index_for_key(key)].notified() 55 } 56 57 /// Notify a key. 58 /// 59 /// All the waiters for this keys will be notified. 60 pub async fn notify<K>(&self, key: K) 61 where 62 K: Hash, 63 { 64 self.buckets[slot_index_for_key(key)].notify_waiters(); 65 } 66 67 /// Notifies the waiters about the notifications recorded in NotifyQueue. 68 pub fn submit_queue(&self, queue: NotifyQueue) { 69 for bucket in queue.buckets.iter_ones() { 70 self.buckets[bucket].notify_waiters(); 71 } 72 } 73 } 74 75 /// Save notifications to be sent after transaction is complete. 76 #[derive(Debug)] 77 pub struct NotifyQueue { 78 buckets: BitVec, 79 } 80 81 impl Default for NotifyQueue { 82 fn default() -> Self { 83 Self { 84 buckets: BitVec::repeat(false, NOTIFY_BUCKETS), 85 } 86 } 87 } 88 89 impl NotifyQueue { 90 pub fn new() -> Self { 91 Self::default() 92 } 93 94 pub fn add<K>(&mut self, key: &K) 95 where 96 K: Hash, 97 { 98 self.buckets.set(slot_index_for_key(key), true); 99 } 100 } 101 102 #[cfg(test)] 103 mod tests { 104 use fedimint_core::db::test_utils::future_returns_shortly; 105 106 use super::*; 107 108 #[tokio::test] 109 async fn test_notification_after_notify() { 110 let notifs = Notifications::new(); 111 let key = 1; 112 let sub = notifs.register(key); 113 notifs.notify(&key).await; 114 assert!(future_returns_shortly(sub).await.is_some(), "should notify"); 115 } 116 117 #[tokio::test] 118 async fn test_no_notification_without_notify() { 119 let notifs = Notifications::new(); 120 let key = 1; 121 let sub = notifs.register(key); 122 assert!( 123 future_returns_shortly(sub).await.is_none(), 124 "should not notify" 125 ); 126 } 127 128 #[tokio::test] 129 async fn test_multi() { 130 let notifs = Notifications::new(); 131 let key1 = 1; 132 let key2 = 2; 133 let sub1 = notifs.register(key1); 134 let sub2 = notifs.register(key2); 135 notifs.notify(&key1).await; 136 notifs.notify(&key2).await; 137 assert!( 138 future_returns_shortly(sub1).await.is_some(), 139 "should notify" 140 ); 141 assert!( 142 future_returns_shortly(sub2).await.is_some(), 143 "should notify" 144 ); 145 } 146 147 #[tokio::test] 148 async fn test_notify_queue() { 149 let notifs = Notifications::new(); 150 let key1 = 1; 151 let key2 = 2; 152 let sub1 = notifs.register(key1); 153 let sub2 = notifs.register(key2); 154 let mut queue = NotifyQueue::new(); 155 queue.add(&key1); 156 queue.add(&key2); 157 notifs.submit_queue(queue); 158 assert!( 159 future_returns_shortly(sub1).await.is_some(), 160 "should notify" 161 ); 162 assert!( 163 future_returns_shortly(sub2).await.is_some(), 164 "should notify" 165 ); 166 } 167 }