sink.rs
1 use std::fmt::Display; 2 3 use apibara_core::node::v1alpha2::{Cursor, DataFinality}; 4 use async_trait::async_trait; 5 use error_stack::Result; 6 use serde::de::DeserializeOwned; 7 use serde_json::Value; 8 9 use crate::cursor::DisplayCursor; 10 11 pub trait SinkOptions: DeserializeOwned { 12 fn merge(self, other: Self) -> Self; 13 } 14 15 #[derive(Debug, PartialEq)] 16 pub enum CursorAction { 17 Persist, 18 Skip, 19 } 20 21 #[derive(Debug, Clone)] 22 pub struct Context { 23 pub cursor: Option<Cursor>, 24 pub end_cursor: Cursor, 25 pub finality: DataFinality, 26 } 27 28 #[async_trait] 29 pub trait Sink { 30 type Options: SinkOptions; 31 type Error: error_stack::Context + Send + Sync + 'static; 32 33 async fn from_options(options: Self::Options) -> Result<Self, Self::Error> 34 where 35 Self: Sized; 36 37 async fn handle_data( 38 &mut self, 39 ctx: &Context, 40 batch: &Value, 41 ) -> Result<CursorAction, Self::Error>; 42 43 async fn handle_invalidate(&mut self, cursor: &Option<Cursor>) -> Result<(), Self::Error>; 44 45 async fn cleanup(&mut self) -> Result<(), Self::Error> { 46 Ok(()) 47 } 48 49 async fn handle_heartbeat(&mut self) -> Result<(), Self::Error> { 50 Ok(()) 51 } 52 } 53 54 impl Display for Context { 55 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 56 let start = DisplayCursor(&self.cursor); 57 write!( 58 f, 59 "Context(start={}, end={}, finality={:?})", 60 start, self.end_cursor, self.finality 61 ) 62 } 63 }