/ sinks / sink-common / src / connector / stream.rs
stream.rs
 1  use apibara_sdk::{ClientBuilder, StreamClient};
 2  use error_stack::Result;
 3  
 4  use crate::{error::SinkError, SinkErrorReportExt, StreamConfiguration};
 5  
 6  /// Action to take after handling a batch of data.
 7  #[derive(Debug, PartialEq)]
 8  pub enum StreamAction {
 9      /// Continue streaming.
10      Continue,
11      /// Stop streaming.
12      Stop,
13      /// Reconnect to the stream using the new filter.
14      Reconnect,
15  }
16  
/// Factory that builds [`StreamClient`]s from a stored [`StreamConfiguration`].
///
/// The configuration is captured once in [`StreamClientFactory::new`] and read
/// again every time [`StreamClientFactory::new_stream_client`] is called.
pub struct StreamClientFactory {
    /// Connection settings (URL, message size limit, metadata, timeout and
    /// optional bearer token) used to configure each new client.
    stream_configuration: StreamConfiguration,
}
20  
21  impl StreamClientFactory {
22      pub fn new(stream_configuration: StreamConfiguration) -> Self {
23          Self {
24              stream_configuration,
25          }
26      }
27  
28      pub async fn new_stream_client(&self) -> Result<StreamClient, SinkError> {
29          let mut stream_builder = ClientBuilder::default()
30              .with_max_message_size(
31                  self.stream_configuration.max_message_size_bytes.as_u64() as usize
32              )
33              .with_metadata(self.stream_configuration.metadata.clone())
34              .with_timeout(self.stream_configuration.timeout_duration);
35  
36          stream_builder = if let Some(bearer_token) = self.stream_configuration.bearer_token.clone()
37          {
38              stream_builder.with_bearer_token(Some(bearer_token))
39          } else {
40              stream_builder
41          };
42  
43          let client = stream_builder
44              .connect(self.stream_configuration.stream_url.clone())
45              .await
46              .map_err(|err| err.temporary("failed to connect to stream"))?;
47  
48          Ok(client)
49      }
50  }