// stream.rs
1 use std::{fmt::Debug, marker::PhantomData, ops::Range}; 2 3 /// Unique id for an input stream. 4 #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] 5 pub struct StreamId(u64); 6 7 /// Stream message sequence number. 8 #[derive(Debug, Copy, Clone, PartialEq, PartialOrd, Hash, Eq)] 9 pub struct Sequence(u64); 10 11 /// A range of sequence numbers. The range is non-inclusive. 12 #[derive(Debug, Clone, PartialEq, Hash, Eq)] 13 pub struct SequenceRange(Range<u64>); 14 15 impl StreamId { 16 /// Create a `StreamId` from a `u64`. 17 pub fn from_u64(id: u64) -> StreamId { 18 StreamId(id) 19 } 20 21 /// Returns the stream id as `u64`. 22 pub fn as_u64(&self) -> u64 { 23 self.0 24 } 25 26 /// Returns the stream id as bytes. 27 pub fn to_bytes(&self) -> [u8; 8] { 28 self.0.to_be_bytes() 29 } 30 } 31 32 impl Sequence { 33 /// Create a `Sequence` from a `u64`. 34 pub fn from_u64(n: u64) -> Sequence { 35 Sequence(n) 36 } 37 38 /// Returns the sequence number as `u64`. 39 pub fn as_u64(&self) -> u64 { 40 self.0 41 } 42 43 /// Returns true if the sequence number is 0. 44 pub fn is_zero(&self) -> bool { 45 self.0 == 0 46 } 47 48 /// Returns the sequence number immediately after. 49 pub fn successor(&self) -> Sequence { 50 Sequence(self.0 + 1) 51 } 52 53 /// Returns the sequence number immediately before. 54 /// 55 /// Notice that this will panic if called on `Sequence(0)`. 56 pub fn predecessor(&self) -> Sequence { 57 Sequence(self.0 - 1) 58 } 59 } 60 61 impl SequenceRange { 62 /// Creates a new sequence range. 63 pub fn new_from_u64(start_index: u64, end_index: u64) -> SequenceRange { 64 SequenceRange(start_index..end_index) 65 } 66 67 /// Creates a new sequence range. 68 pub fn new(start_index: &Sequence, end_index: &Sequence) -> SequenceRange { 69 Self::new_from_u64(start_index.as_u64(), end_index.as_u64()) 70 } 71 72 /// Returns the lower bound of the range, inclusive. 
73 pub fn start(&self) -> Sequence { 74 Sequence::from_u64(self.0.start) 75 } 76 77 /// Returns the upper bound of the range, exclusive. 78 pub fn end(&self) -> Sequence { 79 Sequence::from_u64(self.0.end) 80 } 81 82 /// Returns true if the range contains no items. 83 pub fn is_empty(&self) -> bool { 84 self.0.is_empty() 85 } 86 } 87 88 impl Iterator for SequenceRange { 89 type Item = Sequence; 90 91 fn size_hint(&self) -> (usize, Option<usize>) { 92 self.0.size_hint() 93 } 94 95 fn next(&mut self) -> Option<Self::Item> { 96 self.0.next().map(Sequence::from_u64) 97 } 98 } 99 100 pub trait MessageData: prost::Message + Default + Clone {} 101 102 impl<T> MessageData for T where T: prost::Message + Default + Clone {} 103 104 /// A [MessageData] that is never decoded. 105 /// 106 /// Use this in place of a [Vec] of bytes to not lose type safety. 107 #[derive(Debug, Clone)] 108 pub struct RawMessageData<M: MessageData> { 109 data: Vec<u8>, 110 _phantom: PhantomData<M>, 111 } 112 113 /// Message sent over the stream. 114 #[derive(Debug, Clone)] 115 pub enum StreamMessage<D: MessageData> { 116 Invalidate { 117 sequence: Sequence, 118 }, 119 Data { 120 sequence: Sequence, 121 data: RawMessageData<D>, 122 }, 123 Pending { 124 sequence: Sequence, 125 data: RawMessageData<D>, 126 }, 127 } 128 129 impl<D> StreamMessage<D> 130 where 131 D: MessageData, 132 { 133 /// Creates a new `Invalidate` message. 134 pub fn new_invalidate(sequence: Sequence) -> Self { 135 Self::Invalidate { sequence } 136 } 137 138 /// Creates a new `Data` message. 139 pub fn new_data(sequence: Sequence, data: RawMessageData<D>) -> Self { 140 Self::Data { sequence, data } 141 } 142 143 /// Creates a new `Pending` message. 144 pub fn new_pending(sequence: Sequence, data: RawMessageData<D>) -> Self { 145 Self::Pending { sequence, data } 146 } 147 148 /// Returns the sequence number associated with the message. 
149 pub fn sequence(&self) -> &Sequence { 150 match self { 151 Self::Invalidate { sequence } => sequence, 152 Self::Data { sequence, .. } => sequence, 153 Self::Pending { sequence, .. } => sequence, 154 } 155 } 156 157 /// Returns true if it's a data message. 158 pub fn is_data(&self) -> bool { 159 matches!(self, Self::Data { .. }) 160 } 161 162 /// Returns true if it's an invalidate message. 163 pub fn is_invalidate(&self) -> bool { 164 matches!(self, Self::Invalidate { .. }) 165 } 166 167 /// Returns true if it's a pending message. 168 pub fn is_pending(&self) -> bool { 169 matches!(self, Self::Pending { .. }) 170 } 171 } 172 173 impl<D> RawMessageData<D> 174 where 175 D: MessageData, 176 { 177 /// Creates a new [RawMessageData] from [Vec]. 178 pub fn from_vec(data: Vec<u8>) -> Self { 179 RawMessageData { 180 data, 181 _phantom: PhantomData, 182 } 183 } 184 185 /// Returns the bytes content of the message. 186 pub fn as_bytes(&self) -> &[u8] { 187 self.data.as_ref() 188 } 189 190 /// Decodes the raw message to a [prost::Message]. 191 pub fn to_proto(&self) -> Result<D, prost::DecodeError> { 192 D::decode(self.data.as_ref()) 193 } 194 }