/ crates / arroyo-operator / src / inq_reader.rs
inq_reader.rs
 1  use futures::stream::{FusedStream, FuturesUnordered, StreamFuture};
 2  use futures::{ready, Stream, StreamExt};
 3  use std::fmt;
 4  use std::fmt::Debug;
 5  use std::pin::Pin;
 6  use std::task::{Context, Poll};
 7  
 8  pub struct InQReader<St> {
 9      inner: FuturesUnordered<StreamFuture<St>>,
10  }
11  
12  impl<St: Debug> Debug for InQReader<St> {
13      fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
14          write!(f, "InQReader {{ ... }}")
15      }
16  }
17  
18  impl<St: Stream + Unpin> InQReader<St> {
19      /// Constructs a new, empty `SelectAll`
20      ///
21      /// The returned `SelectAll` does not contain any streams and, in this
22      /// state, `SelectAll::poll` will return `Poll::Ready(None)`.
23      pub fn new() -> Self {
24          Self {
25              inner: FuturesUnordered::new(),
26          }
27      }
28  
29      /// Push a stream into the set.
30      ///
31      /// This function submits the given stream to the set for managing. This
32      /// function will not call `poll` on the submitted stream. The caller must
33      /// ensure that `SelectAll::poll` is called in order to receive task
34      /// notifications.
35      pub fn push(&mut self, stream: St) {
36          self.inner.push(stream.into_future());
37      }
38  }
39  
40  impl<St: Stream + Unpin> Default for InQReader<St> {
41      fn default() -> Self {
42          Self::new()
43      }
44  }
45  
46  impl<St: Stream + Unpin> Stream for InQReader<St> {
47      type Item = (St::Item, St);
48  
49      fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
50          loop {
51              match ready!(self.inner.poll_next_unpin(cx)) {
52                  Some((Some(item), remaining)) => {
53                      return Poll::Ready(Some((item, remaining)));
54                  }
55                  Some((None, _)) => {
56                      // `FuturesUnordered` thinks it isn't terminated
57                      // because it yielded a Some.
58                      // We do not return, but poll `FuturesUnordered`
59                      // in the next loop iteration.
60                  }
61                  None => return Poll::Ready(None),
62              }
63          }
64      }
65  }
66  
67  impl<St: Stream + Unpin> FusedStream for InQReader<St> {
68      fn is_terminated(&self) -> bool {
69          self.inner.is_terminated()
70      }
71  }