/ crates / tor-async-utils / src / prepare_send.rs
prepare_send.rs
  1  //! Extension trait for using [`Sink`] more safely.
  2  
  3  use std::future::Future;
  4  use std::marker::PhantomData;
  5  use std::pin::Pin;
  6  use std::task::{Context, Poll};
  7  
  8  use futures::future::FusedFuture;
  9  use futures::ready;
 10  use futures::Sink;
 11  use pin_project::pin_project;
 12  
 13  /// Switch to the nontrivial version of this, to get debugging output on stderr
 14  macro_rules! dprintln { { $f:literal $($a:tt)* } => { () } }
 15  //macro_rules! dprintln { { $f:literal $($a:tt)* } => { eprintln!(concat!("    ",$f) $($a)*) } }
 16  
 17  /// Extension trait for [`Sink`] to add a method for cancel-safe usage.
 18  pub trait SinkPrepareExt<'w, OS, OM>
 19  where
 20      OS: Sink<OM>,
 21  {
 22      /// For processing an item obtained from a future, avoiding async cancel lossage
 23      ///
 24      /// ```
 25      /// # use futures::channel::mpsc;
 26      /// # use tor_async_utils::SinkPrepareExt as _;
 27      /// #
 28      /// # #[tokio::main]
 29      /// # async fn main() -> Result<(),mpsc::SendError> {
 30      /// #   let (mut sink, sink_r) = mpsc::unbounded::<usize>();
 31      /// #   let message_generator_future = futures::future::ready(42);
 32      /// #   let process_message = |m| Ok::<_,mpsc::SendError>(m);
 33      ///     let (message, sendable) = sink.prepare_send_from(
 34      ///         message_generator_future
 35      ///     ).await?;
 36      ///     let message = process_message(message)?;
 37      ///     sendable.send(message);
 38      /// #   Ok(())
 39      /// # }
 40      /// ```
 41      ///
 42      /// Prepares to send a output message[^terminology] `OM` to an output sink `OS` (`self`),
 43      /// where the `OM` is made from an input message `IM`,
 44      /// and the `IM` is obtained from a future, `generator: IF`.
 45      ///
 46      /// [^terminology]: We sometimes use slightly inconsistent terminology,
 47      /// "item" vs "message".
 48      /// This avoids having to have the generic parameters by named `OI` and `II`
 49      /// where `I` is sometimes "item" and sometimes "input".
 50      ///
 51      /// When successfully run, `prepare_send_from` gives `(IM, SinkSendable)`.
 52      ///
 53      /// After processing `IM` into `OM`,
 54      /// use the [`SinkSendable`] to [`send`](SinkSendable::send) the `OM` to `OS`.
 55      ///
 56      /// # Why use this
 57      ///
 58      /// This avoids the an async cancellation hazard
 59      /// which exists with naive use of `select!`
 60      /// followed by `OS.send().await`.  You might write this:
 61      ///
 62      /// ```rust,ignore
 63      /// select!{
 64      ///     message = input_stream.next() => {
 65      ///         if let Some(message) = message {
 66      ///             let message = do_our_processing(message);
 67      ///             output_sink(message).await; // <---**BUG**
 68      ///         }
 69      ///     }
 70      ///     control = something_else() => { .. }
 71      /// }
 72      /// ```
 73      ///
 74      /// If, when we reach `BUG`, the output sink is not ready to receive the message,
 75      /// the future for that particular `select!` branch will be suspended.
 76      /// But when `select!` finds that *any one* of the branches is ready,
 77      /// it *drops* the futures for the other branches.
 78      /// That drops all the local variables, including possibly `message`, losing it.
 79      ///
 80      /// For more about cancellation safety, see
 81      /// [Rust for the Polyglot Programmer](https://www.chiark.greenend.org.uk/~ianmdlvl/rust-polyglot/async.html#cancellation-safety)
 82      /// which has a general summary, and
 83      /// Matthias Einwag's
 84      /// [extensive discussion in his gist](https://gist.github.com/Matthias247/ffc0f189742abf6aa41a226fe07398a8#cancellation-in-async-rust)
 85      /// with comparisons to other languages.
 86      ///
 87      /// ## Alternatives
 88      ///
 89      /// Unbounded mpsc channels, and certain other primitives,
 90      /// do not suffer from this problem because they do not block.
 91      /// `UnboundedSender` offers
 92      /// [`unbounded_send`](futures::channel::mpsc::UnboundedSender::unbounded_send)
 93      /// but only as an inherent method, so this does not compose with `Sink` combinators.
 94      /// And of course unbounded channels do not implement any backpressure.
 95      ///
 96      /// The problem can otherwise be avoided by completely eschewing use of `select!`
 97      /// and writing manual implementations of `Future`, `Sink`, and so on,
 98      /// However, such code is typically considerably more complex and involves
 99      /// entangling the primary logic with future machinery.
100      /// It is normally better to write primary functionality in `async { }`
101      /// using utilities (often "futures combinators") such as this one.
102      ///
103      // Personal note from @Diziet:
104      // IMO it is generally accepted in the Rust community that
105      // it is not good practice to write principal code at the manual futues level.
106      // However, I have not been able to find very clear support for this proposition.
107      // There are endless articles explaining how futures work internally,
108      // often by describing how to reimplement standard combinators such as `map`.
109      // ISTM that these exist to help understanding,
110      // but it seems to be only rarely stated that doing this is not generally a good idea.
111      //
112      // I did find the following:
113      //
114      //  https://dev.to/mindflavor/rust-futures-an-uneducated-short-and-hopefully-not-boring-tutorial---part-4---a-real-future-from-scratch-734#conclusion
115      //
116      //    Of course you generally do not write a future manually. You use the ones provided by
117      //    libraries and compose them as needed. It's important to understand how they work
118      //    nevertheless.
119      //
120      // And of curse the existence of the `futures` crate is indicative:
121      // it consists almost entirely of combinators and utilities
122      // whose purpose is to allow you to write many structures in async code
123      // without needing to resort to manual future impls.
124      //
125      /// # Example
126      ///
127      /// This comprehensive example demonstrates how to read from possibly multiple sources
128      /// and also be able to process other events:
129      ///
130      /// ```
131      /// # #[tokio::main]
132      /// # async fn main() {
133      /// use futures::select;
134      /// use futures::{SinkExt as _, StreamExt as _};
135      /// use tor_async_utils::SinkPrepareExt as _;
136      ///
137      /// let (mut input_w, mut input_r) = futures::channel::mpsc::unbounded::<usize>();
138      /// let (mut output_w, mut output_r) = futures::channel::mpsc::unbounded::<String>();
139      /// input_w.send(42).await;
140      /// select!{
141      ///     ret = output_w.prepare_send_from(async {
142      ///         select!{
143      ///             got_input = input_r.next() => got_input.expect("input stream ended!"),
144      ///             () = futures::future::pending() => panic!(), // other branches are OK here
145      ///         }
146      ///     }) => {
147      ///         let (input_msg, sendable) = ret.unwrap();
148      ///         let output_msg = input_msg.to_string();
149      ///         let () = sendable.send(output_msg).unwrap();
150      ///     },
151      ///     () = futures::future::pending() => panic!(), // other branches are OK here
152      /// }
153      ///
154      /// assert_eq!(output_r.next().await.unwrap(), "42");
155      /// # }
156      /// ```
157      ///
158      /// # Formally
159      ///
160      /// [`prepare_send_from`](SinkPrepareExt::prepare_send_from)
161      /// returns a [`SinkPrepareSendFuture`] which, when awaited:
162      ///
163      ///  * Waits for `OS` to be ready to receive an item.
164      ///  * Runs `message_generator` to obtain a `IM`.
165      ///  * Returns the `IM` (for processing), and a [`SinkSendable`].
166      ///
167      /// The caller should then:
168      ///
169      ///  * Check the error from `prepare_send_from`
170      ///    (which came from the *output* sink).
171      ///  * Process the `IM`, making an `OM` out of it.
172      ///  * Call [`sendable.send()`](SinkSendable::send) (and check its error).
173      ///
174      /// # Flushing
175      ///
176      /// `prepare_send_from` will (when awaited)
177      /// [`flush`](futures::SinkExt::flush) the output sink
178      /// when it finds the input is not ready yet.
179      /// Until then items may be buffered
180      /// (as if they had been written with [`feed`](futures::SinkExt::feed)).
181      ///
182      /// # Errors
183      ///
184      /// ## Output sink errors
185      ///
186      /// The call site can experience output sink errors in two places,
187      /// [`prepare_send_from()`](SinkPrepareExt::prepare_send_from) and [`SinkSendable::send()`].
188      /// The caller should typically handle them the same way regardless of when they occurred.
189      ///
190      /// If the error happens at [`SinkSendable::send()`],
191      /// the call site will usually be forced to discard the item being processed.
192      /// This will only occur if the sink is actually broken.
193      ///
194      /// ## Errors specific to the call site: faillible input, and fallible processing
195      ///
196      /// At some call sites, the input future may yield errors
197      /// (perhaps it is reading from a `Stream` of [`Result`]s).
198      /// in that case the value from the input future will be a [`Result`].
199      /// Then `IM` is a `Result`, and is provided in the `.0` element
200      /// of the "successful" return from `prepare_send_from`.
201      ///
202      /// And, at some call sites, the processing of an `IM` into an `OM` is fallible.
203      ///
204      /// Handling these latter two error caess is up to the caller,
205      /// in the code which processes `IM`.
206      /// The call site will often want to deal with such an error
207      /// without sending anything into the output sink,
208      /// and can then just drop the [`SinkSendable`].
209      ///
210      /// # Implementations
211      ///
212      /// This is an extension trait and you are not expected to need to implement it.
213      ///
214      /// There are provided implementations for `Pin<&mut impl Sink>`
215      /// and `&mut impl Sink + Unpin`, for your convenience.
216      fn prepare_send_from<IF, IM>(
217          self,
218          message_generator: IF,
219      ) -> SinkPrepareSendFuture<'w, IF, OS, OM>
220      where
221          IF: Future<Output = IM>;
222  }
223  
224  impl<'w, OS, OM> SinkPrepareExt<'w, OS, OM> for Pin<&'w mut OS>
225  where
226      OS: Sink<OM>,
227  {
228      fn prepare_send_from<'r, IF, IM>(
229          self,
230          message_generator: IF,
231      ) -> SinkPrepareSendFuture<'w, IF, OS, OM>
232      where
233          IF: Future<Output = IM>,
234      {
235          SinkPrepareSendFuture {
236              output: Some(self),
237              generator: message_generator,
238              tw: PhantomData,
239          }
240      }
241  }
242  
243  impl<'w, OS, OM> SinkPrepareExt<'w, OS, OM> for &'w mut OS
244  where
245      OS: Sink<OM> + Unpin,
246  {
247      fn prepare_send_from<'r, IF, IM>(
248          self,
249          message_generator: IF,
250      ) -> SinkPrepareSendFuture<'w, IF, OS, OM>
251      where
252          IF: Future<Output = IM>,
253      {
254          Pin::new(self).prepare_send_from(message_generator)
255      }
256  }
257  
258  /// Future for `SinkPrepareExt::prepare_send_from`
259  #[pin_project]
260  #[must_use]
261  pub struct SinkPrepareSendFuture<'w, IF, OS, OM> {
262      /// Underlying future that will yield a message.
263      #[pin]
264      generator: IF,
265  
266      /// This Option exists because otherwise SinkPrepareSendFuture::poll()
267      /// can't move `output` out of this struct to put it into the `SinkSendable`.
268      /// (The poll() impl cannot borrow from SinkPrepareSendFuture.)
269      output: Option<Pin<&'w mut OS>>,
270  
271      /// `fn(OM)` gives contravariance in OM.
272      ///
273      /// Variance is confusing.
274      /// Loosely, a SinkPrepareSendFuture<..OM> consumes an OM.
275      /// Actually, we don't really need to add any variance restricions wrt OM,
276      /// because the &mut OS already implies the correct variance,
277      /// so we could have used the PhantomData<fn(*const OM)> trick.
278      /// Happily there is no unsafe anywhere nearby, so it is not possible for us to write
279      /// a bug due to getting the variance wrong - only to erroneously prevent some use
280      /// case.
281      tw: PhantomData<fn(OM)>,
282  }
283  
/// A [`Sink`] which is ready to receive an item
///
/// Produced by [`SinkPrepareExt::prepare_send_from`].  See there for the overview docs.
///
/// This references an output sink `OS`.
/// It offers the ability to write into the sink without blocking
/// (and constitutes a proof token that the sink has declared itself ready for that).
///
/// The only useful method is [`send`](SinkSendable::send).
///
/// `SinkSendable` has no drop glue and can be freely dropped,
/// for example if you prepare to send a message and then
/// encounter an error when producing the output message.
#[must_use]
pub struct SinkSendable<'w, OS, OM> {
    /// Reference to underlying output sink.
    ///
    /// `poll_ready` has returned `Ready(Ok(()))` on this sink, and nothing has been
    /// sent through it since (this is only constructed by `SinkPrepareSendFuture::poll`).
    output: Pin<&'w mut OS>,
    /// Marker to ensure that `OM` is used.
    tw: PhantomData<fn(OM)>,
}
304  
305  impl<'w, IF, OS, IM, OM> Future for SinkPrepareSendFuture<'w, IF, OS, OM>
306  where
307      IF: Future<Output = IM>,
308      OS: Sink<OM>,
309  {
310      type Output = Result<(IM, SinkSendable<'w, OS, OM>), OS::Error>;
311  
312      fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
313          let mut self_ = self.project();
314  
315          /// returns `&mut Pin<&'w mut OS>` from self_.output
316          //
317          // macro because the closure's type parameters would be unnameable.
318          macro_rules! get_output {
319              ($self_:expr) => {
320                  $self_.output.as_mut().expect(BAD_POLL_MSG).as_mut()
321              };
322          }
323          /// Message to give when panicking because of improper extra poll.
324          const BAD_POLL_MSG: &str =
325              "future from SinkPrepareExt::prepare_send_from (SinkPrepareSendFuture) \
326                   polled after returning Ready(Ok)";
327  
328          let () = match ready!(get_output!(self_).poll_ready(cx)) {
329              Err(e) => {
330                  dprintln!("poll: output poll = IF.Err    SO  IF.Err");
331                  // Deliberately don't fuse by `take`ing output.  If we did that, we would expose
332                  // our caller to an additional panic risk.  There is no harm in polling the output
333                  // sink again: although `Sink` documents that a sink that returns errors will
334                  // probably continue to do so, it is not forbidden to try it and see.  This is in
335                  // any case better than definitely crashing if the `SinkPrepareSendFuture` is
336                  // polled after it gave Ready.
337                  return Poll::Ready(Err(e));
338              }
339              Ok(()) => {
340                  dprintln!("poll: output poll = IF.Ok     calling generator");
341              }
342          };
343  
344          let value = match self_.generator.as_mut().poll(cx) {
345              Poll::Pending => {
346                  // We defer flushing the output until the input stops yielding.
347                  // This allows our caller (which is typically a loop) to transfer multiple
348                  // items from their input to their output between flushes.
349                  //
350                  // But we must not return `Pending` without flushing, or the caller could block
351                  // without flushing output, leading to untimely delivery of buffered data.
352                  dprintln!("poll: generator = Pending     calling output flush");
353                  let flushed = get_output!(self_).poll_flush(cx);
354                  return match flushed {
355                      Poll::Ready(Err(e)) => {
356                          dprintln!("poll: output flush = IF.Err   SO  IF.Err");
357                          Poll::Ready(Err(e))
358                      }
359                      Poll::Ready(Ok(())) => {
360                          dprintln!("poll: output flush = IF.Ok    SO  Pending");
361                          Poll::Pending
362                      }
363                      Poll::Pending => {
364                          dprintln!("poll: output flush = Pending  SO  Pending");
365                          Poll::Pending
366                      }
367                  };
368              }
369              Poll::Ready(v) => {
370                  dprintln!("poll: generator = Ready       SO  IF.Ok");
371                  v
372              }
373          };
374  
375          let sendable = SinkSendable {
376              output: self_.output.take().expect(BAD_POLL_MSG),
377              tw: PhantomData,
378          };
379  
380          Poll::Ready(Ok((value, sendable)))
381      }
382  }
383  
384  impl<'w, IF, OS, IM, OM> FusedFuture for SinkPrepareSendFuture<'w, IF, OS, OM>
385  where
386      IF: Future<Output = IM>,
387      OS: Sink<OM>,
388  {
389      fn is_terminated(&self) -> bool {
390          let r = self.output.is_none();
391          dprintln!("is_terminated = {}", r);
392          r
393      }
394  }
395  
396  impl<'w, OS, OM> SinkSendable<'w, OS, OM>
397  where
398      OS: Sink<OM>,
399  {
400      /// Synchronously send an item into `OS`, which is a [`Sink`]
401      ///
402      /// Can fail if the sink `OS` reports an error.
403      ///
404      /// (However, the existence of the `SinkSendable` demonstrates that
405      /// the sink reported itself ready for sending,
406      /// so this call is synchronous, avoiding cancellation hazards.)
407      pub fn send(self, item: OM) -> Result<(), OS::Error> {
408          dprintln!("send ...");
409          let r = self.output.start_send(item);
410          dprintln!("send: {:?}", r.as_ref().map_err(|_| (())));
411          r
412      }
413  }
414  
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use super::*;
    use futures::channel::mpsc;
    use futures::future::poll_fn;
    use futures::select_biased;
    use futures::SinkExt as _;
    use futures_await_test::async_test;
    use std::convert::Infallible;
    use std::sync::Arc;
    use std::sync::Mutex;

    #[async_test]
    async fn prepare_send() {
        // Early versions of this used unfold quite a lot more, but it is not really
        // convenient for testing.  It buffers one item internally, and is also buggy:
        //   https://github.com/rust-lang/futures-rs/issues/2600
        // So we use mpsc channels, which (perhaps with buffering) are quite controllable.

        // The eprintln!("FOR ...") calls correspond to the dprintln!() calls in the impl,
        // and can check that each code path in the implementation is used,
        // by turning on the debug output (see `dprintln` at the top of this file)
        // and using `--nocapture`.
        {
            eprintln!("-- disconnected ---");
            eprintln!("FOR poll: output poll = IF.Err    SO  IF.Err");
            let (mut w, r) = mpsc::unbounded::<usize>();
            drop(r);
            let ret = w.prepare_send_from(async { Ok::<_, Infallible>(12) }).await;
            assert!(ret.map(|_| ()).unwrap_err().is_disconnected());
        }

        {
            eprintln!("-- buffered late disconnect --");
            eprintln!("FOR poll: output poll = IF.Ok     calling generator");
            eprintln!("FOR poll: output flush = IF.Err   SO  IF.Err");
            let (w, r) = mpsc::unbounded::<usize>();
            let mut w = w.buffer(10);
            let mut r = Some(r);
            w.feed(66).await.unwrap();
            let ret = w
                .prepare_send_from(poll_fn(move |_cx| {
                    drop(r.take());
                    Poll::Pending::<usize>
                }))
                .await;
            assert!(ret.map(|_| ()).unwrap_err().is_disconnected());
        }

        {
            eprintln!("-- flushing before wait --");
            eprintln!("FOR poll: output flush = IF.Ok    SO  Pending");
            let (mut w, _r) = mpsc::unbounded::<usize>();
            let () = select_biased! {
                _ = w.prepare_send_from(poll_fn(
                    move |_cx| {
                        Poll::Pending::<usize>
                    }
                )) => panic!(),
                _ = futures::future::ready(()) => { },
            };
        }

        {
            eprintln!("-- flush before wait is pending --");
            eprintln!("FOR poll: output flush = Pending  SO  Pending");
            let (mut w, _r) = mpsc::channel::<usize>(0);
            let () = w.feed(77).await.unwrap();
            let mut w = w.buffer(10);
            let () = select_biased! {
                _ = w.prepare_send_from(poll_fn(
                    move |_cx| {
                        Poll::Pending::<usize>
                    }
                )) => panic!(),
                _ = futures::future::ready(()) => { },
            };
        }

        {
            eprintln!("-- generator ready, send, and flush --");
            eprintln!("FOR poll: generator = Ready       SO  IF.Ok");
            eprintln!("FOR send ...");
            eprintln!("ALSO check that buffering works as expected");

            let sunk = Arc::new(Mutex::new(vec![]));
            let unfold = futures::sink::unfold((), |(), v| {
                let sunk = sunk.clone();
                async move {
                    dbg!();
                    sunk.lock().unwrap().push(v);
                    Ok::<_, Infallible>(())
                }
            });
            let mut unfold = Box::pin(unfold.buffer(10));
            for v in [42, 43] {
                // We can only do two here because that's how many we can actually buffer in Buffer
                // and Unfold.  Because our closure is always ready, the buffering isn't actually
                // as copious as all that.  This is fine, because the point of this test is to test
                // *flushing*.
                dbg!(v);
                let ret = unfold
                    .prepare_send_from(async move { Ok::<_, Infallible>(v) })
                    .await;
                let (msg, sendable) = ret.unwrap();
                let msg = msg.unwrap();
                assert_eq!(msg, v);
                let () = sendable.send(msg).unwrap();
                assert_eq!(*sunk.lock().unwrap(), &[]); // It's still buffered
            }
            select_biased! {
                _ = unfold.prepare_send_from(futures::future::pending::<()>()) => panic!(),
                _ = futures::future::ready(()) => { },
            };
            assert_eq!(*sunk.lock().unwrap(), &[42, 43]);
        }
    }
}