/ core / src / stream.rs
stream.rs
  1  use std::{fmt::Debug, marker::PhantomData, ops::Range};
  2  
  3  /// Unique id for an input stream.
  4  #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
  5  pub struct StreamId(u64);
  6  
  7  /// Stream message sequence number.
  8  #[derive(Debug, Copy, Clone, PartialEq, PartialOrd, Hash, Eq)]
  9  pub struct Sequence(u64);
 10  
 11  /// A range of sequence numbers. The range is non-inclusive.
 12  #[derive(Debug, Clone, PartialEq, Hash, Eq)]
 13  pub struct SequenceRange(Range<u64>);
 14  
 15  impl StreamId {
 16      /// Create a `StreamId` from a `u64`.
 17      pub fn from_u64(id: u64) -> StreamId {
 18          StreamId(id)
 19      }
 20  
 21      /// Returns the stream id as `u64`.
 22      pub fn as_u64(&self) -> u64 {
 23          self.0
 24      }
 25  
 26      /// Returns the stream id as bytes.
 27      pub fn to_bytes(&self) -> [u8; 8] {
 28          self.0.to_be_bytes()
 29      }
 30  }
 31  
 32  impl Sequence {
 33      /// Create a `Sequence` from a `u64`.
 34      pub fn from_u64(n: u64) -> Sequence {
 35          Sequence(n)
 36      }
 37  
 38      /// Returns the sequence number as `u64`.
 39      pub fn as_u64(&self) -> u64 {
 40          self.0
 41      }
 42  
 43      /// Returns true if the sequence number is 0.
 44      pub fn is_zero(&self) -> bool {
 45          self.0 == 0
 46      }
 47  
 48      /// Returns the sequence number immediately after.
 49      pub fn successor(&self) -> Sequence {
 50          Sequence(self.0 + 1)
 51      }
 52  
 53      /// Returns the sequence number immediately before.
 54      ///
 55      /// Notice that this will panic if called on `Sequence(0)`.
 56      pub fn predecessor(&self) -> Sequence {
 57          Sequence(self.0 - 1)
 58      }
 59  }
 60  
 61  impl SequenceRange {
 62      /// Creates a new sequence range.
 63      pub fn new_from_u64(start_index: u64, end_index: u64) -> SequenceRange {
 64          SequenceRange(start_index..end_index)
 65      }
 66  
 67      /// Creates a new sequence range.
 68      pub fn new(start_index: &Sequence, end_index: &Sequence) -> SequenceRange {
 69          Self::new_from_u64(start_index.as_u64(), end_index.as_u64())
 70      }
 71  
 72      /// Returns the lower bound of the range, inclusive.
 73      pub fn start(&self) -> Sequence {
 74          Sequence::from_u64(self.0.start)
 75      }
 76  
 77      /// Returns the upper bound of the range, exclusive.
 78      pub fn end(&self) -> Sequence {
 79          Sequence::from_u64(self.0.end)
 80      }
 81  
 82      /// Returns true if the range contains no items.
 83      pub fn is_empty(&self) -> bool {
 84          self.0.is_empty()
 85      }
 86  }
 87  
 88  impl Iterator for SequenceRange {
 89      type Item = Sequence;
 90  
 91      fn size_hint(&self) -> (usize, Option<usize>) {
 92          self.0.size_hint()
 93      }
 94  
 95      fn next(&mut self) -> Option<Self::Item> {
 96          self.0.next().map(Sequence::from_u64)
 97      }
 98  }
 99  
100  pub trait MessageData: prost::Message + Default + Clone {}
101  
102  impl<T> MessageData for T where T: prost::Message + Default + Clone {}
103  
104  /// A [MessageData] that is never decoded.
105  ///
106  /// Use this in place of a [Vec] of bytes to not lose type safety.
107  #[derive(Debug, Clone)]
108  pub struct RawMessageData<M: MessageData> {
109      data: Vec<u8>,
110      _phantom: PhantomData<M>,
111  }
112  
113  /// Message sent over the stream.
114  #[derive(Debug, Clone)]
115  pub enum StreamMessage<D: MessageData> {
116      Invalidate {
117          sequence: Sequence,
118      },
119      Data {
120          sequence: Sequence,
121          data: RawMessageData<D>,
122      },
123      Pending {
124          sequence: Sequence,
125          data: RawMessageData<D>,
126      },
127  }
128  
129  impl<D> StreamMessage<D>
130  where
131      D: MessageData,
132  {
133      /// Creates a new `Invalidate` message.
134      pub fn new_invalidate(sequence: Sequence) -> Self {
135          Self::Invalidate { sequence }
136      }
137  
138      /// Creates a new `Data` message.
139      pub fn new_data(sequence: Sequence, data: RawMessageData<D>) -> Self {
140          Self::Data { sequence, data }
141      }
142  
143      /// Creates a new `Pending` message.
144      pub fn new_pending(sequence: Sequence, data: RawMessageData<D>) -> Self {
145          Self::Pending { sequence, data }
146      }
147  
148      /// Returns the sequence number associated with the message.
149      pub fn sequence(&self) -> &Sequence {
150          match self {
151              Self::Invalidate { sequence } => sequence,
152              Self::Data { sequence, .. } => sequence,
153              Self::Pending { sequence, .. } => sequence,
154          }
155      }
156  
157      /// Returns true if it's a data message.
158      pub fn is_data(&self) -> bool {
159          matches!(self, Self::Data { .. })
160      }
161  
162      /// Returns true if it's an invalidate message.
163      pub fn is_invalidate(&self) -> bool {
164          matches!(self, Self::Invalidate { .. })
165      }
166  
167      /// Returns true if it's a pending message.
168      pub fn is_pending(&self) -> bool {
169          matches!(self, Self::Pending { .. })
170      }
171  }
172  
173  impl<D> RawMessageData<D>
174  where
175      D: MessageData,
176  {
177      /// Creates a new [RawMessageData] from [Vec].
178      pub fn from_vec(data: Vec<u8>) -> Self {
179          RawMessageData {
180              data,
181              _phantom: PhantomData,
182          }
183      }
184  
185      /// Returns the bytes content of the message.
186      pub fn as_bytes(&self) -> &[u8] {
187          self.data.as_ref()
188      }
189  
190      /// Decodes the raw message to a [prost::Message].
191      pub fn to_proto(&self) -> Result<D, prost::DecodeError> {
192          D::decode(self.data.as_ref())
193      }
194  }