etcd.rs
  1  //! Persist state to etcd.
  2  use apibara_core::filter::Filter;
  3  use async_trait::async_trait;
  4  use error_stack::{Result, ResultExt};
  5  use etcd_client::{Client, LeaseKeeper, LockOptions, LockResponse};
  6  use prost::Message;
  7  use std::time::{Duration, Instant};
  8  use tracing::{debug, instrument};
  9  
 10  use crate::{PersistedState, SinkError, SinkErrorResultExt};
 11  
 12  use super::common::PersistenceClient;
 13  
/// Persistence backend that stores sink state in etcd.
///
/// The same `sink_id` is used both as the etcd key under which the state is
/// stored and as the name of the lock guarding it.
pub struct EtcdPersistence {
    /// Connected etcd client used for every request.
    client: Client,
    /// Identifier of the sink; doubles as state key and lock name.
    sink_id: String,
    /// Lock currently held by this instance: set by `lock`, taken by `unlock`.
    lock: Option<Lock>,
}
 19  
/// Handle to an acquired etcd lock together with the lease backing it.
pub struct Lock {
    /// Response returned by the lock request; its key is what `unlock` releases.
    inner: LockResponse,
    /// Id of the lease the lock was acquired with (used for logging).
    lease_id: i64,
    /// Sender side of the lease keep-alive channel.
    keeper: LeaseKeeper,
    /// When the last keep-alive was sent (initially: when the lock was acquired).
    last_lock_renewal: Instant,
    /// Minimum time that must pass between two keep-alive requests.
    min_lock_refresh_interval: Duration,
}
 27  
 28  impl EtcdPersistence {
 29      pub async fn connect(
 30          url: &str,
 31          sink_id: impl Into<String>,
 32      ) -> Result<EtcdPersistence, SinkError> {
 33          let client = Client::connect([url], None)
 34              .await
 35              .persistence(&format!("failed to connect to etcd server at {url}"))?;
 36  
 37          Ok(EtcdPersistence {
 38              client,
 39              sink_id: sink_id.into(),
 40              lock: None,
 41          })
 42      }
 43  }
 44  
 45  #[async_trait]
 46  impl PersistenceClient for EtcdPersistence {
 47      #[instrument(skip(self), level = "debug")]
 48      async fn lock(&mut self) -> Result<(), SinkError> {
 49          let lease = self
 50              .client
 51              .lease_grant(60, None)
 52              .await
 53              .persistence("failed lease grant")?;
 54          debug!(lease_id = %lease.id(), "acquired lease for lock");
 55          let (keeper, _) = self
 56              .client
 57              .lease_keep_alive(lease.id())
 58              .await
 59              .persistence("failed lease keep alive")?;
 60  
 61          let lock_options = LockOptions::new().with_lease(lease.id());
 62          let inner = self
 63              .client
 64              .lock(self.sink_id.as_str(), Some(lock_options))
 65              .await
 66              .persistence(&format!("failed lock {}", self.sink_id.as_str()))?;
 67  
 68          let last_lock_renewal = Instant::now();
 69          let min_lock_refresh_interval = Duration::from_secs(30);
 70  
 71          let lock = Lock {
 72              inner,
 73              lease_id: lease.id(),
 74              keeper,
 75              last_lock_renewal,
 76              min_lock_refresh_interval,
 77          };
 78  
 79          self.lock = Some(lock);
 80          Ok(())
 81      }
 82  
 83      #[instrument(skip(self), level = "debug")]
 84      async fn unlock(&mut self) -> Result<(), SinkError> {
 85          if let Some(lock) = self.lock.take() {
 86              self.client
 87                  .unlock(lock.inner.key())
 88                  .await
 89                  .persistence("failed unlock")?;
 90          }
 91  
 92          Ok(())
 93      }
 94  
 95      #[instrument(skip(self), level = "debug")]
 96      async fn get_state<F: Filter>(&mut self) -> Result<PersistedState<F>, SinkError> {
 97          let response = self
 98              .client
 99              .get(self.sink_id.as_str(), None)
100              .await
101              .persistence(&format!("failed get state {}", self.sink_id.as_str()))?;
102  
103          match response.kvs().iter().next() {
104              None => Ok(PersistedState::default()),
105              Some(kv) => {
106                  let state =
107                      PersistedState::decode(kv.value()).persistence("failed to decode state")?;
108                  Ok(state)
109              }
110          }
111      }
112  
113      #[instrument(skip(self), level = "trace")]
114      async fn put_state<F: Filter>(&mut self, state: PersistedState<F>) -> Result<(), SinkError> {
115          self.client
116              .put(self.sink_id.as_str(), state.encode_to_vec(), None)
117              .await
118              .persistence(&format!("failed put state {}", self.sink_id.as_str()))?;
119  
120          if let Some(lock) = self.lock.as_mut() {
121              lock.keep_alive()
122                  .await
123                  .attach_printable("failed to keep lock alive")?;
124          }
125  
126          Ok(())
127      }
128  
129      #[instrument(skip(self), level = "trace")]
130      async fn delete_state(&mut self) -> Result<(), SinkError> {
131          self.client
132              .delete(self.sink_id.as_str(), None)
133              .await
134              .persistence(&format!("failed delete state {}", self.sink_id.as_str()))?;
135          Ok(())
136      }
137  }
138  
139  impl Lock {
140      /// Sends a keep alive request.
141      #[instrument(skip(self), level = "debug")]
142      pub async fn keep_alive(&mut self) -> Result<(), SinkError> {
143          // Renew the lock every 30 seconds to avoid hammering etcd.
144          if self.last_lock_renewal.elapsed() <= self.min_lock_refresh_interval {
145              return Ok(());
146          }
147  
148          debug!(lease_id = %self.lease_id, "send keep alive message");
149          self.keeper
150              .keep_alive()
151              .await
152              .persistence("failed to renew lock")?;
153          self.last_lock_renewal = Instant::now();
154  
155          Ok(())
156      }
157  }