mod.rs
1 pub mod common; 2 mod default; 3 mod etcd; 4 mod fs; 5 mod redis; 6 7 pub use self::common::{PersistedState, PersistenceClient as PersistenceClientTrait}; 8 pub use self::default::NoPersistence; 9 pub use self::etcd::EtcdPersistence; 10 pub use self::fs::DirPersistence; 11 pub use self::redis::RedisPersistence; 12 13 use apibara_core::filter::Filter; 14 use async_trait::async_trait; 15 use error_stack::Result; 16 17 use crate::configuration::PersistenceOptions; 18 use crate::SinkError; 19 20 /// Persistence client factory. 21 pub struct Persistence { 22 options: PersistenceOptions, 23 } 24 25 impl Persistence { 26 pub fn new_from_options(options: PersistenceOptions) -> Self { 27 Self { options } 28 } 29 30 pub async fn connect(&mut self) -> Result<PersistenceClient, SinkError> { 31 let sink_id = self 32 .options 33 .sink_id 34 .clone() 35 .unwrap_or_else(|| "default".to_string()); 36 37 if let Some(etcd_url) = &self.options.persistence_type.persist_to_etcd { 38 let client = etcd::EtcdPersistence::connect(etcd_url, sink_id).await?; 39 Ok(PersistenceClient::new_etcd(client)) 40 } else if let Some(dir_path) = &self.options.persistence_type.persist_to_fs { 41 let persistence = DirPersistence::initialize(dir_path, sink_id)?; 42 Ok(PersistenceClient::new_dir(persistence)) 43 } else if let Some(redis_url) = &self.options.persistence_type.persist_to_redis { 44 let client = redis::RedisPersistence::connect(redis_url, sink_id).await?; 45 Ok(PersistenceClient::new_redis(client)) 46 } else { 47 Ok(PersistenceClient::new_none()) 48 } 49 } 50 } 51 52 pub enum PersistenceClient { 53 Etcd(EtcdPersistence), 54 Dir(DirPersistence), 55 Redis(RedisPersistence), 56 None(NoPersistence), 57 } 58 59 impl PersistenceClient { 60 pub fn new_etcd(inner: EtcdPersistence) -> Self { 61 Self::Etcd(inner) 62 } 63 64 pub fn new_dir(inner: DirPersistence) -> Self { 65 Self::Dir(inner) 66 } 67 68 fn new_redis(inner: RedisPersistence) -> PersistenceClient { 69 Self::Redis(inner) 70 } 71 72 pub fn new_none() -> Self { 73 Self::None(NoPersistence) 74 } 75 76 pub async fn lock(&mut self) -> Result<(), SinkError> { 77 match self { 78 Self::Etcd(inner) => inner.lock().await, 79 Self::Dir(inner) => inner.lock().await, 80 Self::Redis(inner) => inner.lock().await, 81 Self::None(inner) => inner.lock().await, 82 } 83 } 84 85 pub async fn unlock(&mut self) -> Result<(), SinkError> { 86 match self { 87 Self::Etcd(inner) => inner.unlock().await, 88 Self::Dir(inner) => inner.unlock().await, 89 Self::Redis(inner) => inner.unlock().await, 90 Self::None(inner) => inner.unlock().await, 91 } 92 } 93 94 pub async fn get_state<F: Filter>(&mut self) -> Result<PersistedState<F>, SinkError> { 95 match self { 96 Self::Etcd(inner) => inner.get_state().await, 97 Self::Dir(inner) => inner.get_state().await, 98 Self::Redis(inner) => inner.get_state().await, 99 Self::None(inner) => inner.get_state().await, 100 } 101 } 102 103 pub async fn put_state<F: Filter>( 104 &mut self, 105 state: PersistedState<F>, 106 ) -> Result<(), SinkError> { 107 match self { 108 Self::Etcd(inner) => inner.put_state(state).await, 109 Self::Dir(inner) => inner.put_state(state).await, 110 Self::Redis(inner) => inner.put_state(state).await, 111 Self::None(inner) => inner.put_state(state).await, 112 } 113 } 114 115 pub async fn delete_state(&mut self) -> Result<(), SinkError> { 116 match self { 117 Self::Etcd(inner) => inner.delete_state().await, 118 Self::Dir(inner) => inner.delete_state().await, 119 Self::Redis(inner) => inner.delete_state().await, 120 Self::None(inner) => inner.delete_state().await, 121 } 122 } 123 } 124 125 #[async_trait] 126 impl PersistenceClientTrait for PersistenceClient { 127 async fn lock(&mut self) -> Result<(), SinkError> { 128 self.lock().await 129 } 130 131 async fn unlock(&mut self) -> Result<(), SinkError> { 132 self.unlock().await 133 } 134 135 async fn get_state<F: Filter>(&mut self) -> Result<PersistedState<F>, SinkError> { 136 self.get_state().await 137 } 138 139 async fn put_state<F: Filter>(&mut self, state: PersistedState<F>) -> Result<(), SinkError> { 140 self.put_state(state).await 141 } 142 143 async fn delete_state(&mut self) -> Result<(), SinkError> { 144 self.delete_state().await 145 } 146 }