/ sinks / sink-common / src / persistence / redis.rs
redis.rs
 1  use apibara_core::filter::Filter;
 2  use async_trait::async_trait;
 3  use error_stack::Result;
 4  use redis::Commands;
 5  use tracing::warn;
 6  
 7  use crate::{common::PersistenceClient, PersistedState, SinkError, SinkErrorResultExt};
 8  
 9  pub struct RedisPersistence {
10      client: redis::Client,
11      key: String,
12  }
13  
14  impl RedisPersistence {
15      pub async fn connect(
16          url: &str,
17          sink_id: impl Into<String>,
18      ) -> Result<RedisPersistence, SinkError> {
19          let client = redis::Client::open(url)
20              .persistence(&format!("failed to connect to redis server at {url}"))?;
21  
22          let key = format!("apibara:sink:{}", sink_id.into());
23  
24          Ok(RedisPersistence { client, key })
25      }
26  }
27  
28  #[async_trait]
29  impl PersistenceClient for RedisPersistence {
30      async fn lock(&mut self) -> Result<(), SinkError> {
31          warn!("Locking is not yet supported for Redis persistence.");
32          Ok(())
33      }
34  
35      async fn unlock(&mut self) -> Result<(), SinkError> {
36          warn!("Locking is not yet supported for Redis persistence.");
37          Ok(())
38      }
39  
40      async fn get_state<F: Filter>(&mut self) -> Result<PersistedState<F>, SinkError> {
41          let mut conn = self
42              .client
43              .get_connection()
44              .persistence("failed to connect to redis")?;
45  
46          let content = conn
47              .get::<_, Option<String>>(&self.key)
48              .persistence("failed to get state from redis")?;
49  
50          match content {
51              Some(content) => {
52                  Ok(serde_json::from_str(&content).persistence("failed to deserialize state")?)
53              }
54              None => Ok(PersistedState::<F>::default()),
55          }
56      }
57  
58      async fn put_state<F: Filter>(&mut self, state: PersistedState<F>) -> Result<(), SinkError> {
59          let mut conn = self
60              .client
61              .get_connection()
62              .persistence("failed to connect to redis")?;
63  
64          let serialized = serde_json::to_string(&state).persistence("failed to serialize state")?;
65  
66          conn.set(&self.key, serialized)
67              .persistence("failed to put state in redis")?;
68  
69          Ok(())
70      }
71  
72      async fn delete_state(&mut self) -> Result<(), SinkError> {
73          let mut conn = self
74              .client
75              .get_connection()
76              .persistence("failed to connect to redis")?;
77  
78          conn.del(&self.key)
79              .persistence("failed to delete state from redis")?;
80  
81          Ok(())
82      }
83  }