etcd.rs
1 //! Persist state to etcd. 2 use apibara_core::filter::Filter; 3 use async_trait::async_trait; 4 use error_stack::{Result, ResultExt}; 5 use etcd_client::{Client, LeaseKeeper, LockOptions, LockResponse}; 6 use prost::Message; 7 use std::time::{Duration, Instant}; 8 use tracing::{debug, instrument}; 9 10 use crate::{PersistedState, SinkError, SinkErrorResultExt}; 11 12 use super::common::PersistenceClient; 13 14 pub struct EtcdPersistence { 15 client: Client, 16 sink_id: String, 17 lock: Option<Lock>, 18 } 19 20 pub struct Lock { 21 inner: LockResponse, 22 lease_id: i64, 23 keeper: LeaseKeeper, 24 last_lock_renewal: Instant, 25 min_lock_refresh_interval: Duration, 26 } 27 28 impl EtcdPersistence { 29 pub async fn connect( 30 url: &str, 31 sink_id: impl Into<String>, 32 ) -> Result<EtcdPersistence, SinkError> { 33 let client = Client::connect([url], None) 34 .await 35 .persistence(&format!("failed to connect to etcd server at {url}"))?; 36 37 Ok(EtcdPersistence { 38 client, 39 sink_id: sink_id.into(), 40 lock: None, 41 }) 42 } 43 } 44 45 #[async_trait] 46 impl PersistenceClient for EtcdPersistence { 47 #[instrument(skip(self), level = "debug")] 48 async fn lock(&mut self) -> Result<(), SinkError> { 49 let lease = self 50 .client 51 .lease_grant(60, None) 52 .await 53 .persistence("failed lease grant")?; 54 debug!(lease_id = %lease.id(), "acquired lease for lock"); 55 let (keeper, _) = self 56 .client 57 .lease_keep_alive(lease.id()) 58 .await 59 .persistence("failed lease keep alive")?; 60 61 let lock_options = LockOptions::new().with_lease(lease.id()); 62 let inner = self 63 .client 64 .lock(self.sink_id.as_str(), Some(lock_options)) 65 .await 66 .persistence(&format!("failed lock {}", self.sink_id.as_str()))?; 67 68 let last_lock_renewal = Instant::now(); 69 let min_lock_refresh_interval = Duration::from_secs(30); 70 71 let lock = Lock { 72 inner, 73 lease_id: lease.id(), 74 keeper, 75 last_lock_renewal, 76 min_lock_refresh_interval, 77 }; 78 79 self.lock = Some(lock); 80 Ok(()) 81 } 82 83 #[instrument(skip(self), level = "debug")] 84 async fn unlock(&mut self) -> Result<(), SinkError> { 85 if let Some(lock) = self.lock.take() { 86 self.client 87 .unlock(lock.inner.key()) 88 .await 89 .persistence("failed unlock")?; 90 } 91 92 Ok(()) 93 } 94 95 #[instrument(skip(self), level = "debug")] 96 async fn get_state<F: Filter>(&mut self) -> Result<PersistedState<F>, SinkError> { 97 let response = self 98 .client 99 .get(self.sink_id.as_str(), None) 100 .await 101 .persistence(&format!("failed get state {}", self.sink_id.as_str()))?; 102 103 match response.kvs().iter().next() { 104 None => Ok(PersistedState::default()), 105 Some(kv) => { 106 let state = 107 PersistedState::decode(kv.value()).persistence("failed to decode state")?; 108 Ok(state) 109 } 110 } 111 } 112 113 #[instrument(skip(self), level = "trace")] 114 async fn put_state<F: Filter>(&mut self, state: PersistedState<F>) -> Result<(), SinkError> { 115 self.client 116 .put(self.sink_id.as_str(), state.encode_to_vec(), None) 117 .await 118 .persistence(&format!("failed put state {}", self.sink_id.as_str()))?; 119 120 if let Some(lock) = self.lock.as_mut() { 121 lock.keep_alive() 122 .await 123 .attach_printable("failed to keep lock alive")?; 124 } 125 126 Ok(()) 127 } 128 129 #[instrument(skip(self), level = "trace")] 130 async fn delete_state(&mut self) -> Result<(), SinkError> { 131 self.client 132 .delete(self.sink_id.as_str(), None) 133 .await 134 .persistence(&format!("failed delete state {}", self.sink_id.as_str()))?; 135 Ok(()) 136 } 137 } 138 139 impl Lock { 140 /// Sends a keep alive request. 141 #[instrument(skip(self), level = "debug")] 142 pub async fn keep_alive(&mut self) -> Result<(), SinkError> { 143 // Renew the lock every 30 seconds to avoid hammering etcd. 144 if self.last_lock_renewal.elapsed() <= self.min_lock_refresh_interval { 145 return Ok(()); 146 } 147 148 debug!(lease_id = %self.lease_id, "send keep alive message"); 149 self.keeper 150 .keep_alive() 151 .await 152 .persistence("failed to renew lock")?; 153 self.last_lock_renewal = Instant::now(); 154 155 Ok(()) 156 } 157 }