/ sinks / sink-common / src / sink.rs
sink.rs
 1  use std::fmt::Display;
 2  
 3  use apibara_core::node::v1alpha2::{Cursor, DataFinality};
 4  use async_trait::async_trait;
 5  use error_stack::Result;
 6  use serde::de::DeserializeOwned;
 7  use serde_json::Value;
 8  
 9  use crate::cursor::DisplayCursor;
10  
11  pub trait SinkOptions: DeserializeOwned {
12      fn merge(self, other: Self) -> Self;
13  }
14  
15  #[derive(Debug, PartialEq)]
16  pub enum CursorAction {
17      Persist,
18      Skip,
19  }
20  
21  #[derive(Debug, Clone)]
22  pub struct Context {
23      pub cursor: Option<Cursor>,
24      pub end_cursor: Cursor,
25      pub finality: DataFinality,
26  }
27  
28  #[async_trait]
29  pub trait Sink {
30      type Options: SinkOptions;
31      type Error: error_stack::Context + Send + Sync + 'static;
32  
33      async fn from_options(options: Self::Options) -> Result<Self, Self::Error>
34      where
35          Self: Sized;
36  
37      async fn handle_data(
38          &mut self,
39          ctx: &Context,
40          batch: &Value,
41      ) -> Result<CursorAction, Self::Error>;
42  
43      async fn handle_invalidate(&mut self, cursor: &Option<Cursor>) -> Result<(), Self::Error>;
44  
45      async fn cleanup(&mut self) -> Result<(), Self::Error> {
46          Ok(())
47      }
48  
49      async fn handle_heartbeat(&mut self) -> Result<(), Self::Error> {
50          Ok(())
51      }
52  }
53  
54  impl Display for Context {
55      fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56          let start = DisplayCursor(&self.cursor);
57          write!(
58              f,
59              "Context(start={}, end={}, finality={:?})",
60              start, self.end_cursor, self.finality
61          )
62      }
63  }