/ crates / tor-async-utils / src / watch.rs
watch.rs
  1  //! Extension trait for more efficient use of [`postage::watch`].
  2  use std::ops::{Deref, DerefMut};
  3  use void::{ResultVoidExt as _, Void};
  4  
  5  /// Extension trait for some `postage::watch::Sender` to provide `maybe_send`
  6  ///
  7  /// Ideally these, or something like them, would be upstream:
  8  /// See <https://github.com/austinjones/postage-rs/issues/56>.
  9  ///
 10  /// We provide this as an extension trait became the implementation is a bit fiddly.
 11  /// This lets us concentrate on the actual logic, when we use it.
 12  pub trait PostageWatchSenderExt<T> {
 13      /// Update, by calling a fallible function, sending only if necessary
 14      ///
 15      /// Calls `update` on the current value in the watch, to obtain a new value.
 16      /// If the new value doesn't compare equal, updates the watch, notifying receivers.
 17      fn try_maybe_send<F, E>(&mut self, update: F) -> Result<(), E>
 18      where
 19          T: PartialEq,
 20          F: FnOnce(&T) -> Result<T, E>;
 21  
 22      /// Update, by calling a function, sending only if necessary
 23      ///
 24      /// Calls `update` on the current value in the watch, to obtain a new value.
 25      /// If the new value doesn't compare equal, updates the watch, notifying receivers.
 26      fn maybe_send<F>(&mut self, update: F)
 27      where
 28          T: PartialEq,
 29          F: FnOnce(&T) -> T,
 30      {
 31          self.try_maybe_send(|t| Ok::<_, Void>(update(t)))
 32              .void_unwrap();
 33      }
 34  }
 35  
 36  impl<T> PostageWatchSenderExt<T> for postage::watch::Sender<T> {
 37      fn try_maybe_send<F, E>(&mut self, update: F) -> Result<(), E>
 38      where
 39          T: PartialEq,
 40          F: FnOnce(&T) -> Result<T, E>,
 41      {
 42          let lock = self.borrow();
 43          let new = update(&*lock)?;
 44          if new != *lock {
 45              // We must drop the lock guard, because otherwise borrow_mut will deadlock.
 46              // There is no race, because we hold &mut self, so no-one else can get a look in.
 47              // (postage::watch::Sender is not one of those facilities which is mereely a
 48              // handle, and Clone.)
 49              drop(lock);
 50              *self.borrow_mut() = new;
 51          }
 52          Ok(())
 53      }
 54  }
 55  
 56  #[derive(Debug)]
 57  /// Wrapper for `postage::watch::Sender` that sends `DropNotifyEof::eof()` when dropped
 58  ///
 59  /// Derefs to the inner `Sender`.
 60  ///
 61  /// Ideally this would be behaviour promised by upstream, or something
 62  /// See <https://github.com/austinjones/postage-rs/issues/57>.
 63  pub struct DropNotifyWatchSender<T: DropNotifyEofSignallable>(Option<postage::watch::Sender<T>>);
 64  
/// Values that can signal EOF
///
/// [`DropNotifyWatchSender`] stores the value returned by [`eof`](Self::eof)
/// in its watch when the wrapper is dropped.
///
/// Implemented for `Option`, which is usually what you want to use.
pub trait DropNotifyEofSignallable {
    /// Generate the EOF value
    ///
    /// Takes no receiver: the EOF value is a property of the type.
    fn eof() -> Self;

    /// Does this value indicate EOF?
    ///
    /// ### Deprecated
    ///
    /// This method is deprecated.
    /// It should not be called, or defined, in new programs.
    /// It is not required by [`DropNotifyWatchSender`].
    /// The provided implementation always returns `false`.
    #[deprecated]
    fn is_eof(&self) -> bool {
        false
    }
}
 85  
 86  impl<T> DropNotifyEofSignallable for Option<T> {
 87      fn eof() -> Self {
 88          None
 89      }
 90  
 91      fn is_eof(&self) -> bool {
 92          self.is_none()
 93      }
 94  }
 95  
 96  impl<T: DropNotifyEofSignallable> DropNotifyWatchSender<T> {
 97      /// Arrange to send `T::Default` when `inner` is dropped
 98      pub fn new(inner: postage::watch::Sender<T>) -> Self {
 99          DropNotifyWatchSender(Some(inner))
100      }
101  
102      /// Unwrap the inner sender, defusing the drop notification
103      pub fn into_inner(mut self) -> postage::watch::Sender<T> {
104          self.0.take().expect("inner was None")
105      }
106  }
107  
108  impl<T: DropNotifyEofSignallable> Deref for DropNotifyWatchSender<T> {
109      type Target = postage::watch::Sender<T>;
110      fn deref(&self) -> &Self::Target {
111          self.0.as_ref().expect("inner was None")
112      }
113  }
114  
115  impl<T: DropNotifyEofSignallable> DerefMut for DropNotifyWatchSender<T> {
116      fn deref_mut(&mut self) -> &mut Self::Target {
117          self.0.as_mut().expect("inner was None")
118      }
119  }
120  
121  impl<T: DropNotifyEofSignallable> Drop for DropNotifyWatchSender<T> {
122      fn drop(&mut self) {
123          if let Some(mut inner) = self.0.take() {
124              // None means into_inner() was called
125              *inner.borrow_mut() = DropNotifyEofSignallable::eof();
126          }
127      }
128  }
129  
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use super::*;
    use futures::select_biased;
    use futures_await_test::async_test;

    // Checks that `maybe_send` and `try_maybe_send` notify the receiver
    // only when the value actually changes.
    //
    // Each `select_biased!` below offers `r.next()` first and an always-ready
    // future second, so the second branch means "the receiver had nothing pending".
    #[async_test]
    async fn postage_sender_ext() {
        use futures::stream::StreamExt;
        use futures::FutureExt;

        let (mut s, mut r) = postage::watch::channel_with(20);
        // Receiver of a fresh watch wakes once, but let's not rely on this
        select_biased! {
            i = r.next().fuse() => assert_eq!(i, Some(20)),
            _ = futures::future::ready(()) => { }, // tolerate nothing
        };
        // Now, not ready
        select_biased! {
            _ = r.next().fuse() => panic!(),
            _ = futures::future::ready(()) => { },
        };

        // Identity update: the new value compares equal, so nothing should be sent.
        s.maybe_send(|i| *i);
        // Still not ready
        select_biased! {
            _ = r.next().fuse() => panic!(),
            _ = futures::future::ready(()) => { },
        };

        // Changed value: the receiver must now see the update.
        s.maybe_send(|i| *i + 1);
        // Ready, with 21
        select_biased! {
            i = r.next().fuse() => assert_eq!(i, Some(21)),
            _ = futures::future::ready(()) => panic!(),
        };

        // Failing update: the error is returned and nothing is sent.
        let () = s.try_maybe_send(|_i| Err(())).unwrap_err();
        // Not ready
        select_biased! {
            _ = r.next().fuse() => panic!(),
            _ = futures::future::ready(()) => { },
        };
    }

    // Checks that dropping a `DropNotifyWatchSender` stores the EOF value,
    // and that `into_inner` defuses that behaviour.
    #[async_test]
    async fn postage_drop() {
        // Test payload whose EOF value is `I(0)`.
        #[derive(Clone, Copy, Debug, Eq, PartialEq)]
        struct I(i32);

        impl DropNotifyEofSignallable for I {
            fn eof() -> I {
                I(0)
            }
            fn is_eof(&self) -> bool {
                self.0 == 0
            }
        }

        // Dropping the wrapper replaces the value with `I::eof()`.
        let (s, r) = postage::watch::channel_with(I(20));
        let s = DropNotifyWatchSender::new(s);

        assert_eq!(*r.borrow(), I(20));
        drop(s);
        assert_eq!(*r.borrow(), I(0));

        // After `into_inner`, dropping the bare sender leaves the value alone.
        let (s, r) = postage::watch::channel_with(I(44));
        let s = DropNotifyWatchSender::new(s);

        assert_eq!(*r.borrow(), I(44));
        drop(s.into_inner());
        assert_eq!(*r.borrow(), I(44));
    }
}