/ crates / tor-async-utils / src / peekable_stream.rs
peekable_stream.rs
  1  //! Provides utilities for peeking at items in [`futures::Stream`].
  2  //!
  3  //! # Stability of peeked values
  4  //!
//! Implementors of the traits in this module guarantee that a peeked
//! `Poll::Ready` result remains at the head of the stream until
//! [`futures::Stream::poll_next`] or another method requiring a `&mut`
//! reference (and documented to potentially change the head of the stream) is
//! called. For example, a caller holding a `Pin<&mut Self>` that observes a
//! `Ready` value via [`PeekableStream::poll_peek`] is guaranteed to observe
//! that same value again on a subsequent call to
//! [`PeekableStream::poll_peek`], [`futures::Stream::poll_next`], etc.
 13  //!
//! This property must not be relied upon to prove *soundness*, but can be
//! relied upon to prove correctness.
 16  
 17  use std::pin::Pin;
 18  use std::task::{Context, Poll};
 19  
 20  /// A stream that provides the ability to peek at the next available item.
 21  ///
 22  /// This provides an alternative to interfaces and data structure that would
 23  /// otherwise want a [`futures::stream::Peekable<S>`], which can potentially
 24  /// avoid multiple layers of buffering where one would do.
 25  ///
 26  /// # Tasks, waking, and calling context
 27  ///
 28  /// These methods should be called by the task that is reading from the stream.
 29  /// If they are called by another task, the reading task would miss
 30  /// notifications.
 31  pub trait PeekableStream: futures::Stream {
 32      /// Poll for an item to be ready, and then inspect it.
 33      ///
 34      /// Equivalent to [`futures::stream::Peekable::poll_peek`].
 35      ///
 36      /// Guarantees that a returned `Ready` result is stable (See "Stability ..." in
 37      /// [`crate::peekable_stream`]).
 38      ///
 39      /// Should be called only by the task that is reading the stream (see
 40      /// "Tasks ..." in [`PeekableStream`]).
 41      fn poll_peek(
 42          self: Pin<&mut Self>,
 43          cx: &mut Context<'_>,
 44      ) -> Poll<Option<&<Self as futures::Stream>::Item>> {
 45          self.poll_peek_mut(cx).map(|x| x.map(|x| &*x))
 46      }
 47  
 48      /// Poll for an item to be ready, and then inspect it.
 49      ///
 50      /// Equivalent to [`futures::stream::Peekable::poll_peek_mut`].
 51      ///
 52      /// Guarantees that a returned `Ready` result is stable (See "Stability" in
 53      /// [`crate::peekable_stream`]).
 54      ///
 55      /// Should be called only by the task that is reading the stream (see
 56      /// "Tasks ..." in [`PeekableStream`]).
 57      fn poll_peek_mut(
 58          self: Pin<&mut Self>,
 59          cx: &mut Context<'_>,
 60      ) -> Poll<Option<&mut <Self as futures::Stream>::Item>>;
 61  }
 62  
 63  impl<S> PeekableStream for futures::stream::Peekable<S>
 64  where
 65      S: futures::Stream,
 66  {
 67      fn poll_peek(
 68          self: Pin<&mut Self>,
 69          cx: &mut Context<'_>,
 70      ) -> Poll<Option<&<Self as futures::Stream>::Item>> {
 71          self.poll_peek(cx)
 72      }
 73  
 74      fn poll_peek_mut(
 75          self: Pin<&mut Self>,
 76          cx: &mut Context<'_>,
 77      ) -> Poll<Option<&mut <Self as futures::Stream>::Item>> {
 78          self.poll_peek_mut(cx)
 79      }
 80  }
 81  
 82  /// A stream that supports peeking without perturbing any registered waker.
 83  ///
 84  /// # Tasks, waking, and calling context
 85  ///
 86  /// These functions do not register the current task to be woken when an item
 87  /// becomes available on the stream, and ensure that the most recent task that
 88  /// was already registered remains so (or is woken if there was an item ready).
 89  ///
 90  /// Therefore, avoiding calling (only) these functions from the task that is
 91  /// reading from the stream, since they will not cause the current task to be
 92  /// woken when an item arrives.
 93  ///
 94  /// Conversely, you *may* call these function in *other* tasks, without
 95  /// disturbing the task which is waiting for input.
 96  pub trait UnobtrusivePeekableStream: futures::Stream {
 97      /// Peek at the next available value, while not losing a previously
 98      /// registered waker.
 99      ///
100      /// Guarantees that a returned `Some` result is stable (See "Stability" in
101      /// [`crate::peekable_stream`]).
102      ///
103      /// Does not register the current task to be notified when an item becomes
104      /// available (see "Tasks ..." in [`UnobtrusivePeekableStream`]).
105      ///
106      /// The caller of `unobtrusive_peek` can't distinguish between a pending and terminated stream.
107      // To address this we could return value in a `Poll` but normally returning `Poll::Pending`
108      // implies a promise of future wakeup, which is precisely contrary to this function's purpose.
109      // We could address that with imprecations in the docs but people don't always read docs.
110      // We could invent a new type, but that seems quite heavyweight.
111      // We'll cross this bridge when we have a requirement for this feature.
112      fn unobtrusive_peek(self: Pin<&mut Self>) -> Option<&<Self as futures::Stream>::Item> {
113          self.unobtrusive_peek_mut().map(|x| &*x)
114      }
115  
116      /// Peek at the next available value, while not losing a previously
117      /// registered waker.
118      ///
119      /// Guarantees that a returned `Some` result is stable (See "Stability" in
120      /// [`crate::peekable_stream`]).
121      ///
122      /// Does not register the current task to be notified when an item becomes
123      /// available (see "Tasks ..." in [`UnobtrusivePeekableStream`]).
124      ///
125      /// The caller of `unobtrusive_peek_mut` can't distinguish between a pending and terminated stream.
126      // (See comment on `unobtrusive_peek` about options if we need a caller to be able to do that.)
127      fn unobtrusive_peek_mut(self: Pin<&mut Self>) -> Option<&mut <Self as futures::Stream>::Item>;
128  }