sequencer.rs
1 //! Sequencer-related tables. 2 3 use apibara_core::stream::{Sequence, StreamId}; 4 use prost::Message; 5 6 use super::Table; 7 8 /// Table with the state of each input sequence, together with the respective 9 /// output range. 10 #[derive(Debug, Clone, Copy, Default)] 11 pub struct SequencerStateTable; 12 13 /// Store the output's start and end sequence for a given stream id and input 14 /// sequence. 15 /// 16 /// Since the output sequence is strictly increasing, we can use the input 17 /// sequence to order the state and that will also keep the output's sequence 18 /// ordered. 19 /// 20 /// Mark fields as optional to enforce serializing the `0` value. 21 #[derive(Clone, PartialEq, Message)] 22 pub struct SequencerState { 23 #[prost(fixed64, optional, tag = "1")] 24 pub output_sequence_start: Option<u64>, 25 #[prost(fixed64, optional, tag = "2")] 26 pub output_sequence_end: Option<u64>, 27 } 28 29 impl Table for SequencerStateTable { 30 type Key = (StreamId, Sequence); 31 type Value = SequencerState; 32 33 fn db_name() -> &'static str { 34 "SequencerState" 35 } 36 } 37 38 /// Table with the state of each input stream. 39 #[derive(Debug, Clone, Copy, Default)] 40 pub struct StreamStateTable; 41 42 /// Contains the most recent sequence number of each input stream. 43 #[derive(Clone, PartialEq, Message)] 44 pub struct StreamState { 45 #[prost(fixed64, optional, tag = "1")] 46 pub sequence: Option<u64>, 47 } 48 49 impl Table for StreamStateTable { 50 type Key = StreamId; 51 type Value = StreamState; 52 53 fn db_name() -> &'static str { 54 "StreamState" 55 } 56 } 57 58 #[cfg(test)] 59 mod tests { 60 use apibara_core::stream::{Sequence, StreamId}; 61 use libmdbx::{Environment, NoWriteMap}; 62 use tempfile::tempdir; 63 64 use crate::db::{ 65 sequencer::StreamStateTable, MdbxEnvironmentExt, MdbxRWTransactionExt, MdbxTransactionExt, 66 }; 67 68 use super::{SequencerState, SequencerStateTable}; 69 70 #[test] 71 fn test_state_order() { 72 let path = tempdir().unwrap(); 73 let db = Environment::<NoWriteMap>::open(path.path()).unwrap(); 74 let stream_id = StreamId::from_u64(1); 75 let value_low = SequencerState { 76 output_sequence_start: Some(0), 77 output_sequence_end: Some(1), 78 }; 79 let value_mid = SequencerState { 80 output_sequence_start: Some(2), 81 output_sequence_end: Some(2), 82 }; 83 let value_high = SequencerState { 84 output_sequence_start: Some(3), 85 output_sequence_end: Some(4), 86 }; 87 88 let txn = db.begin_rw_txn().unwrap(); 89 txn.ensure_table::<SequencerStateTable>(None).unwrap(); 90 txn.ensure_table::<StreamStateTable>(None).unwrap(); 91 let table = txn.open_table::<SequencerStateTable>().unwrap(); 92 let mut cursor = table.cursor().unwrap(); 93 cursor.first().unwrap(); 94 // insert three values in the wrong order, then check if they're stored in order. 95 cursor 96 .put(&(stream_id, Sequence::from_u64(1)), &value_mid) 97 .unwrap(); 98 cursor 99 .put(&(stream_id, Sequence::from_u64(2)), &value_high) 100 .unwrap(); 101 cursor 102 .put(&(stream_id, Sequence::from_u64(0)), &value_low) 103 .unwrap(); 104 txn.commit().unwrap(); 105 106 let txn = db.begin_ro_txn().unwrap(); 107 let table = txn.open_table::<SequencerStateTable>().unwrap(); 108 let mut cursor = table.cursor().unwrap(); 109 let ((stream_id, input_seq), _value) = cursor 110 .seek_range(&(stream_id, Sequence::from_u64(0))) 111 .unwrap() 112 .unwrap(); 113 assert_eq!(stream_id.as_u64(), 1); 114 assert_eq!(input_seq.as_u64(), 0); 115 116 let ((stream_id, input_seq), _value) = cursor.next().unwrap().unwrap(); 117 assert_eq!(stream_id.as_u64(), 1); 118 assert_eq!(input_seq.as_u64(), 1); 119 120 let ((stream_id, input_seq), _value) = cursor.next().unwrap().unwrap(); 121 assert_eq!(stream_id.as_u64(), 1); 122 assert_eq!(input_seq.as_u64(), 2); 123 124 let value = cursor.next().unwrap(); 125 assert!(value.is_none()); 126 txn.commit().unwrap(); 127 } 128 }