inq_reader.rs
1 use futures::stream::{FusedStream, FuturesUnordered, StreamFuture}; 2 use futures::{ready, Stream, StreamExt}; 3 use std::fmt; 4 use std::fmt::Debug; 5 use std::pin::Pin; 6 use std::task::{Context, Poll}; 7 8 pub struct InQReader<St> { 9 inner: FuturesUnordered<StreamFuture<St>>, 10 } 11 12 impl<St: Debug> Debug for InQReader<St> { 13 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 14 write!(f, "InQReader {{ ... }}") 15 } 16 } 17 18 impl<St: Stream + Unpin> InQReader<St> { 19 /// Constructs a new, empty `SelectAll` 20 /// 21 /// The returned `SelectAll` does not contain any streams and, in this 22 /// state, `SelectAll::poll` will return `Poll::Ready(None)`. 23 pub fn new() -> Self { 24 Self { 25 inner: FuturesUnordered::new(), 26 } 27 } 28 29 /// Push a stream into the set. 30 /// 31 /// This function submits the given stream to the set for managing. This 32 /// function will not call `poll` on the submitted stream. The caller must 33 /// ensure that `SelectAll::poll` is called in order to receive task 34 /// notifications. 35 pub fn push(&mut self, stream: St) { 36 self.inner.push(stream.into_future()); 37 } 38 } 39 40 impl<St: Stream + Unpin> Default for InQReader<St> { 41 fn default() -> Self { 42 Self::new() 43 } 44 } 45 46 impl<St: Stream + Unpin> Stream for InQReader<St> { 47 type Item = (St::Item, St); 48 49 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 50 loop { 51 match ready!(self.inner.poll_next_unpin(cx)) { 52 Some((Some(item), remaining)) => { 53 return Poll::Ready(Some((item, remaining))); 54 } 55 Some((None, _)) => { 56 // `FuturesUnordered` thinks it isn't terminated 57 // because it yielded a Some. 58 // We do not return, but poll `FuturesUnordered` 59 // in the next loop iteration. 60 } 61 None => return Poll::Ready(None), 62 } 63 } 64 } 65 } 66 67 impl<St: Stream + Unpin> FusedStream for InQReader<St> { 68 fn is_terminated(&self) -> bool { 69 self.inner.is_terminated() 70 } 71 }