mod.rs
  1  pub mod common;
  2  mod default;
  3  mod etcd;
  4  mod fs;
  5  mod redis;
  6  
  7  pub use self::common::{PersistedState, PersistenceClient as PersistenceClientTrait};
  8  pub use self::default::NoPersistence;
  9  pub use self::etcd::EtcdPersistence;
 10  pub use self::fs::DirPersistence;
 11  pub use self::redis::RedisPersistence;
 12  
 13  use apibara_core::filter::Filter;
 14  use async_trait::async_trait;
 15  use error_stack::Result;
 16  
 17  use crate::configuration::PersistenceOptions;
 18  use crate::SinkError;
 19  
 20  /// Persistence client factory.
 21  pub struct Persistence {
 22      options: PersistenceOptions,
 23  }
 24  
 25  impl Persistence {
 26      pub fn new_from_options(options: PersistenceOptions) -> Self {
 27          Self { options }
 28      }
 29  
 30      pub async fn connect(&mut self) -> Result<PersistenceClient, SinkError> {
 31          let sink_id = self
 32              .options
 33              .sink_id
 34              .clone()
 35              .unwrap_or_else(|| "default".to_string());
 36  
 37          if let Some(etcd_url) = &self.options.persistence_type.persist_to_etcd {
 38              let client = etcd::EtcdPersistence::connect(etcd_url, sink_id).await?;
 39              Ok(PersistenceClient::new_etcd(client))
 40          } else if let Some(dir_path) = &self.options.persistence_type.persist_to_fs {
 41              let persistence = DirPersistence::initialize(dir_path, sink_id)?;
 42              Ok(PersistenceClient::new_dir(persistence))
 43          } else if let Some(redis_url) = &self.options.persistence_type.persist_to_redis {
 44              let client = redis::RedisPersistence::connect(redis_url, sink_id).await?;
 45              Ok(PersistenceClient::new_redis(client))
 46          } else {
 47              Ok(PersistenceClient::new_none())
 48          }
 49      }
 50  }
 51  
 52  pub enum PersistenceClient {
 53      Etcd(EtcdPersistence),
 54      Dir(DirPersistence),
 55      Redis(RedisPersistence),
 56      None(NoPersistence),
 57  }
 58  
 59  impl PersistenceClient {
 60      pub fn new_etcd(inner: EtcdPersistence) -> Self {
 61          Self::Etcd(inner)
 62      }
 63  
 64      pub fn new_dir(inner: DirPersistence) -> Self {
 65          Self::Dir(inner)
 66      }
 67  
 68      fn new_redis(inner: RedisPersistence) -> PersistenceClient {
 69          Self::Redis(inner)
 70      }
 71  
 72      pub fn new_none() -> Self {
 73          Self::None(NoPersistence)
 74      }
 75  
 76      pub async fn lock(&mut self) -> Result<(), SinkError> {
 77          match self {
 78              Self::Etcd(inner) => inner.lock().await,
 79              Self::Dir(inner) => inner.lock().await,
 80              Self::Redis(inner) => inner.lock().await,
 81              Self::None(inner) => inner.lock().await,
 82          }
 83      }
 84  
 85      pub async fn unlock(&mut self) -> Result<(), SinkError> {
 86          match self {
 87              Self::Etcd(inner) => inner.unlock().await,
 88              Self::Dir(inner) => inner.unlock().await,
 89              Self::Redis(inner) => inner.unlock().await,
 90              Self::None(inner) => inner.unlock().await,
 91          }
 92      }
 93  
 94      pub async fn get_state<F: Filter>(&mut self) -> Result<PersistedState<F>, SinkError> {
 95          match self {
 96              Self::Etcd(inner) => inner.get_state().await,
 97              Self::Dir(inner) => inner.get_state().await,
 98              Self::Redis(inner) => inner.get_state().await,
 99              Self::None(inner) => inner.get_state().await,
100          }
101      }
102  
103      pub async fn put_state<F: Filter>(
104          &mut self,
105          state: PersistedState<F>,
106      ) -> Result<(), SinkError> {
107          match self {
108              Self::Etcd(inner) => inner.put_state(state).await,
109              Self::Dir(inner) => inner.put_state(state).await,
110              Self::Redis(inner) => inner.put_state(state).await,
111              Self::None(inner) => inner.put_state(state).await,
112          }
113      }
114  
115      pub async fn delete_state(&mut self) -> Result<(), SinkError> {
116          match self {
117              Self::Etcd(inner) => inner.delete_state().await,
118              Self::Dir(inner) => inner.delete_state().await,
119              Self::Redis(inner) => inner.delete_state().await,
120              Self::None(inner) => inner.delete_state().await,
121          }
122      }
123  }
124  
125  #[async_trait]
126  impl PersistenceClientTrait for PersistenceClient {
127      async fn lock(&mut self) -> Result<(), SinkError> {
128          self.lock().await
129      }
130  
131      async fn unlock(&mut self) -> Result<(), SinkError> {
132          self.unlock().await
133      }
134  
135      async fn get_state<F: Filter>(&mut self) -> Result<PersistedState<F>, SinkError> {
136          self.get_state().await
137      }
138  
139      async fn put_state<F: Filter>(&mut self, state: PersistedState<F>) -> Result<(), SinkError> {
140          self.put_state(state).await
141      }
142  
143      async fn delete_state(&mut self) -> Result<(), SinkError> {
144          self.delete_state().await
145      }
146  }