common.rs
1 use apibara_core::{filter::Filter, node::v1alpha2::Cursor}; 2 use async_trait::async_trait; 3 use error_stack::Result; 4 use prost::Message; 5 use serde::{Deserialize, Serialize}; 6 7 use crate::SinkError; 8 9 #[derive(Clone, PartialEq, Message, Deserialize, Serialize)] 10 pub struct PersistedState<F: Message + Default> { 11 #[prost(message, tag = "1")] 12 pub cursor: Option<Cursor>, 13 #[prost(message, tag = "2")] 14 pub filter: Option<F>, 15 } 16 17 /// Client used to interact with the persistence backend. 18 #[async_trait] 19 pub trait PersistenceClient { 20 /// Attempts to acquire a lock on the sink. 21 async fn lock(&mut self) -> Result<(), SinkError>; 22 23 /// Unlock the previously acquired lock. 24 async fn unlock(&mut self) -> Result<(), SinkError>; 25 26 /// Reads the currently stored state. 27 async fn get_state<F: Filter>(&mut self) -> Result<PersistedState<F>, SinkError>; 28 29 /// Updates the sink state. 30 async fn put_state<F: Filter>(&mut self, state: PersistedState<F>) -> Result<(), SinkError>; 31 32 /// Deletes any stored sink state. 33 async fn delete_state(&mut self) -> Result<(), SinkError>; 34 } 35 36 impl<F: Message + Default> PersistedState<F> { 37 pub fn with_cursor(cursor: Cursor) -> Self { 38 Self { 39 cursor: Some(cursor), 40 filter: None, 41 } 42 } 43 44 pub fn new(cursor: Option<Cursor>, filter: Option<F>) -> Self { 45 Self { cursor, filter } 46 } 47 } 48 49 #[async_trait] 50 impl<P> PersistenceClient for Box<P> 51 where 52 P: PersistenceClient + ?Sized + Send, 53 { 54 async fn lock(&mut self) -> Result<(), SinkError> { 55 (**self).lock().await 56 } 57 58 async fn unlock(&mut self) -> Result<(), SinkError> { 59 (**self).unlock().await 60 } 61 62 async fn get_state<F: Filter>(&mut self) -> Result<PersistedState<F>, SinkError> { 63 (**self).get_state().await 64 } 65 66 async fn put_state<F: Filter>(&mut self, state: PersistedState<F>) -> Result<(), SinkError> { 67 (**self).put_state(state).await 68 } 69 70 async fn delete_state(&mut self) -> Result<(), SinkError> { 71 (**self).delete_state().await 72 } 73 }