/ node / src / db / sequencer.rs
sequencer.rs
  1  //! Sequencer-related tables.
  2  
  3  use apibara_core::stream::{Sequence, StreamId};
  4  use prost::Message;
  5  
  6  use super::Table;
  7  
/// Table with the state of each input sequence, together with the respective
/// output range.
///
/// This is a zero-sized marker type: it carries no data and only selects the
/// table through its [`Table`] implementation, where rows are keyed by
/// `(StreamId, Sequence)` and hold a [`SequencerState`].
#[derive(Debug, Clone, Copy, Default)]
pub struct SequencerStateTable;
 12  
/// Store the output's start and end sequence for a given stream id and input
/// sequence.
///
/// Since the output sequence is strictly increasing, we can use the input
/// sequence to order the state and that will also keep the output's sequence
/// ordered.
///
/// Fields are marked as optional to enforce serializing the `0` value: a
/// non-optional field holding `0` would be treated as the default and left
/// out of the encoded message.
#[derive(Clone, PartialEq, Message)]
pub struct SequencerState {
    /// First output sequence number associated with the input sequence.
    #[prost(fixed64, optional, tag = "1")]
    pub output_sequence_start: Option<u64>,
    /// Last output sequence number associated with the input sequence.
    ///
    /// NOTE(review): the fixtures in this file's test use the contiguous
    /// ranges 0-1, 2-2 and 3-4, which suggests the end is inclusive; confirm
    /// against the sequencer implementation.
    #[prost(fixed64, optional, tag = "2")]
    pub output_sequence_end: Option<u64>,
}
 28  
 29  impl Table for SequencerStateTable {
 30      type Key = (StreamId, Sequence);
 31      type Value = SequencerState;
 32  
 33      fn db_name() -> &'static str {
 34          "SequencerState"
 35      }
 36  }
 37  
/// Table with the state of each input stream.
///
/// This is a zero-sized marker type: it carries no data and only selects the
/// table through its [`Table`] implementation, where rows are keyed by
/// `StreamId` and hold a [`StreamState`].
#[derive(Debug, Clone, Copy, Default)]
pub struct StreamStateTable;
 41  
/// Contains the most recent sequence number of each input stream.
///
/// One value is stored per stream id (the key of [`StreamStateTable`]).
#[derive(Clone, PartialEq, Message)]
pub struct StreamState {
    /// Most recent sequence number recorded for the stream. Declared
    /// `optional` so that a sequence of `0` is still written explicitly.
    #[prost(fixed64, optional, tag = "1")]
    pub sequence: Option<u64>,
}
 48  
 49  impl Table for StreamStateTable {
 50      type Key = StreamId;
 51      type Value = StreamState;
 52  
 53      fn db_name() -> &'static str {
 54          "StreamState"
 55      }
 56  }
 57  
 58  #[cfg(test)]
 59  mod tests {
 60      use apibara_core::stream::{Sequence, StreamId};
 61      use libmdbx::{Environment, NoWriteMap};
 62      use tempfile::tempdir;
 63  
 64      use crate::db::{
 65          sequencer::StreamStateTable, MdbxEnvironmentExt, MdbxRWTransactionExt, MdbxTransactionExt,
 66      };
 67  
 68      use super::{SequencerState, SequencerStateTable};
 69  
 70      #[test]
 71      fn test_state_order() {
 72          let path = tempdir().unwrap();
 73          let db = Environment::<NoWriteMap>::open(path.path()).unwrap();
 74          let stream_id = StreamId::from_u64(1);
 75          let value_low = SequencerState {
 76              output_sequence_start: Some(0),
 77              output_sequence_end: Some(1),
 78          };
 79          let value_mid = SequencerState {
 80              output_sequence_start: Some(2),
 81              output_sequence_end: Some(2),
 82          };
 83          let value_high = SequencerState {
 84              output_sequence_start: Some(3),
 85              output_sequence_end: Some(4),
 86          };
 87  
 88          let txn = db.begin_rw_txn().unwrap();
 89          txn.ensure_table::<SequencerStateTable>(None).unwrap();
 90          txn.ensure_table::<StreamStateTable>(None).unwrap();
 91          let table = txn.open_table::<SequencerStateTable>().unwrap();
 92          let mut cursor = table.cursor().unwrap();
 93          cursor.first().unwrap();
 94          // insert three values in the wrong order, then check if they're stored in order.
 95          cursor
 96              .put(&(stream_id, Sequence::from_u64(1)), &value_mid)
 97              .unwrap();
 98          cursor
 99              .put(&(stream_id, Sequence::from_u64(2)), &value_high)
100              .unwrap();
101          cursor
102              .put(&(stream_id, Sequence::from_u64(0)), &value_low)
103              .unwrap();
104          txn.commit().unwrap();
105  
106          let txn = db.begin_ro_txn().unwrap();
107          let table = txn.open_table::<SequencerStateTable>().unwrap();
108          let mut cursor = table.cursor().unwrap();
109          let ((stream_id, input_seq), _value) = cursor
110              .seek_range(&(stream_id, Sequence::from_u64(0)))
111              .unwrap()
112              .unwrap();
113          assert_eq!(stream_id.as_u64(), 1);
114          assert_eq!(input_seq.as_u64(), 0);
115  
116          let ((stream_id, input_seq), _value) = cursor.next().unwrap().unwrap();
117          assert_eq!(stream_id.as_u64(), 1);
118          assert_eq!(input_seq.as_u64(), 1);
119  
120          let ((stream_id, input_seq), _value) = cursor.next().unwrap().unwrap();
121          assert_eq!(stream_id.as_u64(), 1);
122          assert_eq!(input_seq.as_u64(), 2);
123  
124          let value = cursor.next().unwrap();
125          assert!(value.is_none());
126          txn.commit().unwrap();
127      }
128  }