redis.rs
1 use apibara_core::filter::Filter; 2 use async_trait::async_trait; 3 use error_stack::Result; 4 use redis::Commands; 5 use tracing::warn; 6 7 use crate::{common::PersistenceClient, PersistedState, SinkError, SinkErrorResultExt}; 8 9 pub struct RedisPersistence { 10 client: redis::Client, 11 key: String, 12 } 13 14 impl RedisPersistence { 15 pub async fn connect( 16 url: &str, 17 sink_id: impl Into<String>, 18 ) -> Result<RedisPersistence, SinkError> { 19 let client = redis::Client::open(url) 20 .persistence(&format!("failed to connect to redis server at {url}"))?; 21 22 let key = format!("apibara:sink:{}", sink_id.into()); 23 24 Ok(RedisPersistence { client, key }) 25 } 26 } 27 28 #[async_trait] 29 impl PersistenceClient for RedisPersistence { 30 async fn lock(&mut self) -> Result<(), SinkError> { 31 warn!("Locking is not yet supported for Redis persistence."); 32 Ok(()) 33 } 34 35 async fn unlock(&mut self) -> Result<(), SinkError> { 36 warn!("Locking is not yet supported for Redis persistence."); 37 Ok(()) 38 } 39 40 async fn get_state<F: Filter>(&mut self) -> Result<PersistedState<F>, SinkError> { 41 let mut conn = self 42 .client 43 .get_connection() 44 .persistence("failed to connect to redis")?; 45 46 let content = conn 47 .get::<_, Option<String>>(&self.key) 48 .persistence("failed to get state from redis")?; 49 50 match content { 51 Some(content) => { 52 Ok(serde_json::from_str(&content).persistence("failed to deserialize state")?) 53 } 54 None => Ok(PersistedState::<F>::default()), 55 } 56 } 57 58 async fn put_state<F: Filter>(&mut self, state: PersistedState<F>) -> Result<(), SinkError> { 59 let mut conn = self 60 .client 61 .get_connection() 62 .persistence("failed to connect to redis")?; 63 64 let serialized = serde_json::to_string(&state).persistence("failed to serialize state")?; 65 66 conn.set(&self.key, serialized) 67 .persistence("failed to put state in redis")?; 68 69 Ok(()) 70 } 71 72 async fn delete_state(&mut self) -> Result<(), SinkError> { 73 let mut conn = self 74 .client 75 .get_connection() 76 .persistence("failed to connect to redis")?; 77 78 conn.del(&self.key) 79 .persistence("failed to delete state from redis")?; 80 81 Ok(()) 82 } 83 }