table.rs
1 //! Type-safe database access. 2 3 use std::io::Cursor; 4 5 use apibara_core::stream::{Sequence, StreamId}; 6 use arrayvec::ArrayVec; 7 use byteorder::{BigEndian, ReadBytesExt}; 8 use prost::Message; 9 10 /// Error related to decoding keys. 11 #[derive(Debug, thiserror::Error)] 12 pub enum KeyDecodeError { 13 #[error("invalid key bytes size")] 14 InvalidByteSize { expected: usize, actual: usize }, 15 #[error("error reading key from bytes")] 16 ReadError(#[from] std::io::Error), 17 #[error("Other type of error")] 18 Other(Box<dyn std::error::Error + Send + Sync + 'static>), 19 } 20 21 /// A fixed-capacity vector of bytes. 22 #[derive(Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord)] 23 pub struct ByteVec<const CAP: usize>(ArrayVec<u8, CAP>); 24 25 pub trait TableKey: Send + Sync + Sized { 26 type Encoded: AsRef<[u8]> + Send + Sync; 27 28 fn encode(&self) -> Self::Encoded; 29 fn decode(b: &[u8]) -> Result<Self, KeyDecodeError>; 30 } 31 32 pub trait Table: Send + Sync { 33 type Key: TableKey; 34 type Value: Message + Default + Clone; 35 36 fn db_name() -> &'static str; 37 } 38 39 pub trait DupSortTable: Table {} 40 41 impl<const CAP: usize> AsRef<[u8]> for ByteVec<CAP> { 42 fn as_ref(&self) -> &[u8] { 43 self.0.as_ref() 44 } 45 } 46 47 impl TableKey for StreamId { 48 type Encoded = [u8; 8]; 49 50 fn encode(&self) -> Self::Encoded { 51 self.as_u64().to_be_bytes() 52 } 53 54 fn decode(b: &[u8]) -> Result<Self, KeyDecodeError> { 55 if b.len() != 8 { 56 return Err(KeyDecodeError::InvalidByteSize { 57 expected: 8, 58 actual: b.len(), 59 }); 60 } 61 let mut cursor = Cursor::new(b); 62 let stream_id = cursor 63 .read_u64::<BigEndian>() 64 .map_err(KeyDecodeError::ReadError)?; 65 Ok(StreamId::from_u64(stream_id)) 66 } 67 } 68 69 impl TableKey for Sequence { 70 type Encoded = [u8; 8]; 71 72 fn encode(&self) -> Self::Encoded { 73 self.as_u64().to_be_bytes() 74 } 75 76 fn decode(b: &[u8]) -> Result<Self, KeyDecodeError> { 77 if b.len() != 8 { 78 return Err(KeyDecodeError::InvalidByteSize { 79 expected: 8, 80 actual: b.len(), 81 }); 82 } 83 let mut cursor = Cursor::new(b); 84 let sequence = cursor 85 .read_u64::<BigEndian>() 86 .map_err(KeyDecodeError::ReadError)?; 87 Ok(Sequence::from_u64(sequence)) 88 } 89 } 90 91 impl TableKey for (StreamId, Sequence) { 92 type Encoded = [u8; 16]; 93 94 fn encode(&self) -> Self::Encoded { 95 let mut out = [0; 16]; 96 out[..8].copy_from_slice(&self.0.encode()); 97 out[8..].copy_from_slice(&self.1.encode()); 98 out 99 } 100 101 fn decode(b: &[u8]) -> Result<Self, KeyDecodeError> { 102 if b.len() != 16 { 103 return Err(KeyDecodeError::InvalidByteSize { 104 expected: 16, 105 actual: b.len(), 106 }); 107 } 108 let mut cursor = Cursor::new(b); 109 let stream_id = cursor 110 .read_u64::<BigEndian>() 111 .map_err(KeyDecodeError::ReadError)?; 112 let sequence = cursor 113 .read_u64::<BigEndian>() 114 .map_err(KeyDecodeError::ReadError)?; 115 Ok((StreamId::from_u64(stream_id), Sequence::from_u64(sequence))) 116 } 117 } 118 119 impl TableKey for () { 120 type Encoded = [u8; 0]; 121 122 fn encode(&self) -> Self::Encoded { 123 [] 124 } 125 126 fn decode(b: &[u8]) -> Result<Self, KeyDecodeError> { 127 if !b.is_empty() { 128 return Err(KeyDecodeError::InvalidByteSize { 129 expected: 0, 130 actual: b.len(), 131 }); 132 } 133 Ok(()) 134 } 135 } 136 137 impl TableKey for u64 { 138 type Encoded = [u8; 8]; 139 140 fn encode(&self) -> Self::Encoded { 141 self.to_be_bytes() 142 } 143 144 fn decode(b: &[u8]) -> Result<Self, KeyDecodeError> { 145 if b.len() != 8 { 146 return Err(KeyDecodeError::InvalidByteSize { 147 expected: 8, 148 actual: b.len(), 149 }); 150 } 151 let mut cursor = Cursor::new(b); 152 cursor 153 .read_u64::<BigEndian>() 154 .map_err(KeyDecodeError::ReadError) 155 } 156 }