/ node / src / db / table.rs
table.rs
  1  //! Type-safe database access.
  2  
  3  use std::io::Cursor;
  4  
  5  use apibara_core::stream::{Sequence, StreamId};
  6  use arrayvec::ArrayVec;
  7  use byteorder::{BigEndian, ReadBytesExt};
  8  use prost::Message;
  9  
 10  /// Error related to decoding keys.
 11  #[derive(Debug, thiserror::Error)]
 12  pub enum KeyDecodeError {
 13      #[error("invalid key bytes size")]
 14      InvalidByteSize { expected: usize, actual: usize },
 15      #[error("error reading key from bytes")]
 16      ReadError(#[from] std::io::Error),
 17      #[error("Other type of error")]
 18      Other(Box<dyn std::error::Error + Send + Sync + 'static>),
 19  }
 20  
 21  /// A fixed-capacity vector of bytes.
 22  #[derive(Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord)]
 23  pub struct ByteVec<const CAP: usize>(ArrayVec<u8, CAP>);
 24  
 25  pub trait TableKey: Send + Sync + Sized {
 26      type Encoded: AsRef<[u8]> + Send + Sync;
 27  
 28      fn encode(&self) -> Self::Encoded;
 29      fn decode(b: &[u8]) -> Result<Self, KeyDecodeError>;
 30  }
 31  
 32  pub trait Table: Send + Sync {
 33      type Key: TableKey;
 34      type Value: Message + Default + Clone;
 35  
 36      fn db_name() -> &'static str;
 37  }
 38  
 39  pub trait DupSortTable: Table {}
 40  
 41  impl<const CAP: usize> AsRef<[u8]> for ByteVec<CAP> {
 42      fn as_ref(&self) -> &[u8] {
 43          self.0.as_ref()
 44      }
 45  }
 46  
 47  impl TableKey for StreamId {
 48      type Encoded = [u8; 8];
 49  
 50      fn encode(&self) -> Self::Encoded {
 51          self.as_u64().to_be_bytes()
 52      }
 53  
 54      fn decode(b: &[u8]) -> Result<Self, KeyDecodeError> {
 55          if b.len() != 8 {
 56              return Err(KeyDecodeError::InvalidByteSize {
 57                  expected: 8,
 58                  actual: b.len(),
 59              });
 60          }
 61          let mut cursor = Cursor::new(b);
 62          let stream_id = cursor
 63              .read_u64::<BigEndian>()
 64              .map_err(KeyDecodeError::ReadError)?;
 65          Ok(StreamId::from_u64(stream_id))
 66      }
 67  }
 68  
 69  impl TableKey for Sequence {
 70      type Encoded = [u8; 8];
 71  
 72      fn encode(&self) -> Self::Encoded {
 73          self.as_u64().to_be_bytes()
 74      }
 75  
 76      fn decode(b: &[u8]) -> Result<Self, KeyDecodeError> {
 77          if b.len() != 8 {
 78              return Err(KeyDecodeError::InvalidByteSize {
 79                  expected: 8,
 80                  actual: b.len(),
 81              });
 82          }
 83          let mut cursor = Cursor::new(b);
 84          let sequence = cursor
 85              .read_u64::<BigEndian>()
 86              .map_err(KeyDecodeError::ReadError)?;
 87          Ok(Sequence::from_u64(sequence))
 88      }
 89  }
 90  
 91  impl TableKey for (StreamId, Sequence) {
 92      type Encoded = [u8; 16];
 93  
 94      fn encode(&self) -> Self::Encoded {
 95          let mut out = [0; 16];
 96          out[..8].copy_from_slice(&self.0.encode());
 97          out[8..].copy_from_slice(&self.1.encode());
 98          out
 99      }
100  
101      fn decode(b: &[u8]) -> Result<Self, KeyDecodeError> {
102          if b.len() != 16 {
103              return Err(KeyDecodeError::InvalidByteSize {
104                  expected: 16,
105                  actual: b.len(),
106              });
107          }
108          let mut cursor = Cursor::new(b);
109          let stream_id = cursor
110              .read_u64::<BigEndian>()
111              .map_err(KeyDecodeError::ReadError)?;
112          let sequence = cursor
113              .read_u64::<BigEndian>()
114              .map_err(KeyDecodeError::ReadError)?;
115          Ok((StreamId::from_u64(stream_id), Sequence::from_u64(sequence)))
116      }
117  }
118  
119  impl TableKey for () {
120      type Encoded = [u8; 0];
121  
122      fn encode(&self) -> Self::Encoded {
123          []
124      }
125  
126      fn decode(b: &[u8]) -> Result<Self, KeyDecodeError> {
127          if !b.is_empty() {
128              return Err(KeyDecodeError::InvalidByteSize {
129                  expected: 0,
130                  actual: b.len(),
131              });
132          }
133          Ok(())
134      }
135  }
136  
137  impl TableKey for u64 {
138      type Encoded = [u8; 8];
139  
140      fn encode(&self) -> Self::Encoded {
141          self.to_be_bytes()
142      }
143  
144      fn decode(b: &[u8]) -> Result<Self, KeyDecodeError> {
145          if b.len() != 8 {
146              return Err(KeyDecodeError::InvalidByteSize {
147                  expected: 8,
148                  actual: b.len(),
149              });
150          }
151          let mut cursor = Cursor::new(b);
152          cursor
153              .read_u64::<BigEndian>()
154              .map_err(KeyDecodeError::ReadError)
155      }
156  }