watch.rs
1 //! Extension trait for more efficient use of [`postage::watch`]. 2 use std::ops::{Deref, DerefMut}; 3 use void::{ResultVoidExt as _, Void}; 4 5 /// Extension trait for some `postage::watch::Sender` to provide `maybe_send` 6 /// 7 /// Ideally these, or something like them, would be upstream: 8 /// See <https://github.com/austinjones/postage-rs/issues/56>. 9 /// 10 /// We provide this as an extension trait became the implementation is a bit fiddly. 11 /// This lets us concentrate on the actual logic, when we use it. 12 pub trait PostageWatchSenderExt<T> { 13 /// Update, by calling a fallible function, sending only if necessary 14 /// 15 /// Calls `update` on the current value in the watch, to obtain a new value. 16 /// If the new value doesn't compare equal, updates the watch, notifying receivers. 17 fn try_maybe_send<F, E>(&mut self, update: F) -> Result<(), E> 18 where 19 T: PartialEq, 20 F: FnOnce(&T) -> Result<T, E>; 21 22 /// Update, by calling a function, sending only if necessary 23 /// 24 /// Calls `update` on the current value in the watch, to obtain a new value. 25 /// If the new value doesn't compare equal, updates the watch, notifying receivers. 26 fn maybe_send<F>(&mut self, update: F) 27 where 28 T: PartialEq, 29 F: FnOnce(&T) -> T, 30 { 31 self.try_maybe_send(|t| Ok::<_, Void>(update(t))) 32 .void_unwrap(); 33 } 34 } 35 36 impl<T> PostageWatchSenderExt<T> for postage::watch::Sender<T> { 37 fn try_maybe_send<F, E>(&mut self, update: F) -> Result<(), E> 38 where 39 T: PartialEq, 40 F: FnOnce(&T) -> Result<T, E>, 41 { 42 let lock = self.borrow(); 43 let new = update(&*lock)?; 44 if new != *lock { 45 // We must drop the lock guard, because otherwise borrow_mut will deadlock. 46 // There is no race, because we hold &mut self, so no-one else can get a look in. 47 // (postage::watch::Sender is not one of those facilities which is mereely a 48 // handle, and Clone.) 49 drop(lock); 50 *self.borrow_mut() = new; 51 } 52 Ok(()) 53 } 54 } 55 56 #[derive(Debug)] 57 /// Wrapper for `postage::watch::Sender` that sends `DropNotifyEof::eof()` when dropped 58 /// 59 /// Derefs to the inner `Sender`. 60 /// 61 /// Ideally this would be behaviour promised by upstream, or something 62 /// See <https://github.com/austinjones/postage-rs/issues/57>. 63 pub struct DropNotifyWatchSender<T: DropNotifyEofSignallable>(Option<postage::watch::Sender<T>>); 64 65 /// Values that can signal EOF 66 /// 67 /// Implemented for `Option`, which is usually what you want to use. 68 pub trait DropNotifyEofSignallable { 69 /// Generate the EOF value 70 fn eof() -> Self; 71 72 /// Does this value indicate EOF? 73 /// 74 /// ### Deprecated 75 /// 76 /// This method is deprecated. 77 /// It should not be called, or defined, in new programs. 78 /// It is not required by [`DropNotifyWatchSender`]. 79 /// The provided implementation always returns `false`. 80 #[deprecated] 81 fn is_eof(&self) -> bool { 82 false 83 } 84 } 85 86 impl<T> DropNotifyEofSignallable for Option<T> { 87 fn eof() -> Self { 88 None 89 } 90 91 fn is_eof(&self) -> bool { 92 self.is_none() 93 } 94 } 95 96 impl<T: DropNotifyEofSignallable> DropNotifyWatchSender<T> { 97 /// Arrange to send `T::Default` when `inner` is dropped 98 pub fn new(inner: postage::watch::Sender<T>) -> Self { 99 DropNotifyWatchSender(Some(inner)) 100 } 101 102 /// Unwrap the inner sender, defusing the drop notification 103 pub fn into_inner(mut self) -> postage::watch::Sender<T> { 104 self.0.take().expect("inner was None") 105 } 106 } 107 108 impl<T: DropNotifyEofSignallable> Deref for DropNotifyWatchSender<T> { 109 type Target = postage::watch::Sender<T>; 110 fn deref(&self) -> &Self::Target { 111 self.0.as_ref().expect("inner was None") 112 } 113 } 114 115 impl<T: DropNotifyEofSignallable> DerefMut for DropNotifyWatchSender<T> { 116 fn deref_mut(&mut self) -> &mut Self::Target { 117 self.0.as_mut().expect("inner was None") 118 } 119 } 120 121 impl<T: DropNotifyEofSignallable> Drop for DropNotifyWatchSender<T> { 122 fn drop(&mut self) { 123 if let Some(mut inner) = self.0.take() { 124 // None means into_inner() was called 125 *inner.borrow_mut() = DropNotifyEofSignallable::eof(); 126 } 127 } 128 } 129 130 #[cfg(test)] 131 mod test { 132 // @@ begin test lint list maintained by maint/add_warning @@ 133 #![allow(clippy::bool_assert_comparison)] 134 #![allow(clippy::clone_on_copy)] 135 #![allow(clippy::dbg_macro)] 136 #![allow(clippy::mixed_attributes_style)] 137 #![allow(clippy::print_stderr)] 138 #![allow(clippy::print_stdout)] 139 #![allow(clippy::single_char_pattern)] 140 #![allow(clippy::unwrap_used)] 141 #![allow(clippy::unchecked_duration_subtraction)] 142 #![allow(clippy::useless_vec)] 143 #![allow(clippy::needless_pass_by_value)] 144 //! <!-- @@ end test lint list maintained by maint/add_warning @@ --> 145 146 use super::*; 147 use futures::select_biased; 148 use futures_await_test::async_test; 149 150 #[async_test] 151 async fn postage_sender_ext() { 152 use futures::stream::StreamExt; 153 use futures::FutureExt; 154 155 let (mut s, mut r) = postage::watch::channel_with(20); 156 // Receiver of a fresh watch wakes once, but let's not rely on this 157 select_biased! { 158 i = r.next().fuse() => assert_eq!(i, Some(20)), 159 _ = futures::future::ready(()) => { }, // tolerate nothing 160 }; 161 // Now, not ready 162 select_biased! { 163 _ = r.next().fuse() => panic!(), 164 _ = futures::future::ready(()) => { }, 165 }; 166 167 s.maybe_send(|i| *i); 168 // Still not ready 169 select_biased! { 170 _ = r.next().fuse() => panic!(), 171 _ = futures::future::ready(()) => { }, 172 }; 173 174 s.maybe_send(|i| *i + 1); 175 // Ready, with 21 176 select_biased! { 177 i = r.next().fuse() => assert_eq!(i, Some(21)), 178 _ = futures::future::ready(()) => panic!(), 179 }; 180 181 let () = s.try_maybe_send(|_i| Err(())).unwrap_err(); 182 // Not ready 183 select_biased! { 184 _ = r.next().fuse() => panic!(), 185 _ = futures::future::ready(()) => { }, 186 }; 187 } 188 189 #[async_test] 190 async fn postage_drop() { 191 #[derive(Clone, Copy, Debug, Eq, PartialEq)] 192 struct I(i32); 193 194 impl DropNotifyEofSignallable for I { 195 fn eof() -> I { 196 I(0) 197 } 198 fn is_eof(&self) -> bool { 199 self.0 == 0 200 } 201 } 202 203 let (s, r) = postage::watch::channel_with(I(20)); 204 let s = DropNotifyWatchSender::new(s); 205 206 assert_eq!(*r.borrow(), I(20)); 207 drop(s); 208 assert_eq!(*r.borrow(), I(0)); 209 210 let (s, r) = postage::watch::channel_with(I(44)); 211 let s = DropNotifyWatchSender::new(s); 212 213 assert_eq!(*r.borrow(), I(44)); 214 drop(s.into_inner()); 215 assert_eq!(*r.borrow(), I(44)); 216 } 217 }