stream.rs
1 use apibara_sdk::{ClientBuilder, StreamClient}; 2 use error_stack::Result; 3 4 use crate::{error::SinkError, SinkErrorReportExt, StreamConfiguration}; 5 6 /// Action to take after handling a batch of data. 7 #[derive(Debug, PartialEq)] 8 pub enum StreamAction { 9 /// Continue streaming. 10 Continue, 11 /// Stop streaming. 12 Stop, 13 /// Reconnect to the stream using the new filter. 14 Reconnect, 15 } 16 17 pub struct StreamClientFactory { 18 stream_configuration: StreamConfiguration, 19 } 20 21 impl StreamClientFactory { 22 pub fn new(stream_configuration: StreamConfiguration) -> Self { 23 Self { 24 stream_configuration, 25 } 26 } 27 28 pub async fn new_stream_client(&self) -> Result<StreamClient, SinkError> { 29 let mut stream_builder = ClientBuilder::default() 30 .with_max_message_size( 31 self.stream_configuration.max_message_size_bytes.as_u64() as usize 32 ) 33 .with_metadata(self.stream_configuration.metadata.clone()) 34 .with_timeout(self.stream_configuration.timeout_duration); 35 36 stream_builder = if let Some(bearer_token) = self.stream_configuration.bearer_token.clone() 37 { 38 stream_builder.with_bearer_token(Some(bearer_token)) 39 } else { 40 stream_builder 41 }; 42 43 let client = stream_builder 44 .connect(self.stream_configuration.stream_url.clone()) 45 .await 46 .map_err(|err| err.temporary("failed to connect to stream"))?; 47 48 Ok(client) 49 } 50 }