peekable_stream.rs
1 //! Provides utilities for peeking at items in [`futures::Stream`]. 2 //! 3 //! # Stability of peeked values 4 //! 5 //! Implementors of this trait guarantee that a peeked `Poll::Ready` result is 6 //! required to remain at the head of the stream until 7 //! [`futures::Stream::poll_next`] or another method requiring a `&mut` 8 //! reference (and documented to potentially change the head of the stream) is 9 //! called. e.g. a caller holding a `Pin<&mut Self>` that observes a Ready value 10 //! via [`PeekableStream::poll_peek`] is guaranteed to observe that same value 11 //! again on a subsequent call to [`PeekableStream::poll_peek`], 12 //! [`futures::Stream::poll_next`], etc. 13 //! 14 //! This property must not be relied up on to prove *soundness*, but can be 15 //! relied upon to prove correctness. 16 17 use std::pin::Pin; 18 use std::task::{Context, Poll}; 19 20 /// A stream that provides the ability to peek at the next available item. 21 /// 22 /// This provides an alternative to interfaces and data structure that would 23 /// otherwise want a [`futures::stream::Peekable<S>`], which can potentially 24 /// avoid multiple layers of buffering where one would do. 25 /// 26 /// # Tasks, waking, and calling context 27 /// 28 /// These methods should be called by the task that is reading from the stream. 29 /// If they are called by another task, the reading task would miss 30 /// notifications. 31 pub trait PeekableStream: futures::Stream { 32 /// Poll for an item to be ready, and then inspect it. 33 /// 34 /// Equivalent to [`futures::stream::Peekable::poll_peek`]. 35 /// 36 /// Guarantees that a returned `Ready` result is stable (See "Stability ..." in 37 /// [`crate::peekable_stream`]). 38 /// 39 /// Should be called only by the task that is reading the stream (see 40 /// "Tasks ..." in [`PeekableStream`]). 41 fn poll_peek( 42 self: Pin<&mut Self>, 43 cx: &mut Context<'_>, 44 ) -> Poll<Option<&<Self as futures::Stream>::Item>> { 45 self.poll_peek_mut(cx).map(|x| x.map(|x| &*x)) 46 } 47 48 /// Poll for an item to be ready, and then inspect it. 49 /// 50 /// Equivalent to [`futures::stream::Peekable::poll_peek_mut`]. 51 /// 52 /// Guarantees that a returned `Ready` result is stable (See "Stability" in 53 /// [`crate::peekable_stream`]). 54 /// 55 /// Should be called only by the task that is reading the stream (see 56 /// "Tasks ..." in [`PeekableStream`]). 57 fn poll_peek_mut( 58 self: Pin<&mut Self>, 59 cx: &mut Context<'_>, 60 ) -> Poll<Option<&mut <Self as futures::Stream>::Item>>; 61 } 62 63 impl<S> PeekableStream for futures::stream::Peekable<S> 64 where 65 S: futures::Stream, 66 { 67 fn poll_peek( 68 self: Pin<&mut Self>, 69 cx: &mut Context<'_>, 70 ) -> Poll<Option<&<Self as futures::Stream>::Item>> { 71 self.poll_peek(cx) 72 } 73 74 fn poll_peek_mut( 75 self: Pin<&mut Self>, 76 cx: &mut Context<'_>, 77 ) -> Poll<Option<&mut <Self as futures::Stream>::Item>> { 78 self.poll_peek_mut(cx) 79 } 80 } 81 82 /// A stream that supports peeking without perturbing any registered waker. 83 /// 84 /// # Tasks, waking, and calling context 85 /// 86 /// These functions do not register the current task to be woken when an item 87 /// becomes available on the stream, and ensure that the most recent task that 88 /// was already registered remains so (or is woken if there was an item ready). 89 /// 90 /// Therefore, avoiding calling (only) these functions from the task that is 91 /// reading from the stream, since they will not cause the current task to be 92 /// woken when an item arrives. 93 /// 94 /// Conversely, you *may* call these function in *other* tasks, without 95 /// disturbing the task which is waiting for input. 96 pub trait UnobtrusivePeekableStream: futures::Stream { 97 /// Peek at the next available value, while not losing a previously 98 /// registered waker. 99 /// 100 /// Guarantees that a returned `Some` result is stable (See "Stability" in 101 /// [`crate::peekable_stream`]). 102 /// 103 /// Does not register the current task to be notified when an item becomes 104 /// available (see "Tasks ..." in [`UnobtrusivePeekableStream`]). 105 /// 106 /// The caller of `unobtrusive_peek` can't distinguish between a pending and terminated stream. 107 // To address this we could return value in a `Poll` but normally returning `Poll::Pending` 108 // implies a promise of future wakeup, which is precisely contrary to this function's purpose. 109 // We could address that with imprecations in the docs but people don't always read docs. 110 // We could invent a new type, but that seems quite heavyweight. 111 // We'll cross this bridge when we have a requirement for this feature. 112 fn unobtrusive_peek(self: Pin<&mut Self>) -> Option<&<Self as futures::Stream>::Item> { 113 self.unobtrusive_peek_mut().map(|x| &*x) 114 } 115 116 /// Peek at the next available value, while not losing a previously 117 /// registered waker. 118 /// 119 /// Guarantees that a returned `Some` result is stable (See "Stability" in 120 /// [`crate::peekable_stream`]). 121 /// 122 /// Does not register the current task to be notified when an item becomes 123 /// available (see "Tasks ..." in [`UnobtrusivePeekableStream`]). 124 /// 125 /// The caller of `unobtrusive_peek_mut` can't distinguish between a pending and terminated stream. 126 // (See comment on `unobtrusive_peek` about options if we need a caller to be able to do that.) 127 fn unobtrusive_peek_mut(self: Pin<&mut Self>) -> Option<&mut <Self as futures::Stream>::Item>; 128 }