/ sinks / sink-common / src / persistence / common.rs
common.rs
 1  use apibara_core::{filter::Filter, node::v1alpha2::Cursor};
 2  use async_trait::async_trait;
 3  use error_stack::Result;
 4  use prost::Message;
 5  use serde::{Deserialize, Serialize};
 6  
 7  use crate::SinkError;
 8  
 9  #[derive(Clone, PartialEq, Message, Deserialize, Serialize)]
10  pub struct PersistedState<F: Message + Default> {
11      #[prost(message, tag = "1")]
12      pub cursor: Option<Cursor>,
13      #[prost(message, tag = "2")]
14      pub filter: Option<F>,
15  }
16  
17  /// Client used to interact with the persistence backend.
18  #[async_trait]
19  pub trait PersistenceClient {
20      /// Attempts to acquire a lock on the sink.
21      async fn lock(&mut self) -> Result<(), SinkError>;
22  
23      /// Unlock the previously acquired lock.
24      async fn unlock(&mut self) -> Result<(), SinkError>;
25  
26      /// Reads the currently stored state.
27      async fn get_state<F: Filter>(&mut self) -> Result<PersistedState<F>, SinkError>;
28  
29      /// Updates the sink state.
30      async fn put_state<F: Filter>(&mut self, state: PersistedState<F>) -> Result<(), SinkError>;
31  
32      /// Deletes any stored sink state.
33      async fn delete_state(&mut self) -> Result<(), SinkError>;
34  }
35  
36  impl<F: Message + Default> PersistedState<F> {
37      pub fn with_cursor(cursor: Cursor) -> Self {
38          Self {
39              cursor: Some(cursor),
40              filter: None,
41          }
42      }
43  
44      pub fn new(cursor: Option<Cursor>, filter: Option<F>) -> Self {
45          Self { cursor, filter }
46      }
47  }
48  
49  #[async_trait]
50  impl<P> PersistenceClient for Box<P>
51  where
52      P: PersistenceClient + ?Sized + Send,
53  {
54      async fn lock(&mut self) -> Result<(), SinkError> {
55          (**self).lock().await
56      }
57  
58      async fn unlock(&mut self) -> Result<(), SinkError> {
59          (**self).unlock().await
60      }
61  
62      async fn get_state<F: Filter>(&mut self) -> Result<PersistedState<F>, SinkError> {
63          (**self).get_state().await
64      }
65  
66      async fn put_state<F: Filter>(&mut self, state: PersistedState<F>) -> Result<(), SinkError> {
67          (**self).put_state(state).await
68      }
69  
70      async fn delete_state(&mut self) -> Result<(), SinkError> {
71          (**self).delete_state().await
72      }
73  }