mod.rs
  1  mod default;
  2  mod factory;
  3  mod sink;
  4  mod state;
  5  mod stream;
  6  
  7  use std::time::Duration;
  8  
  9  use apibara_core::filter::Filter;
 10  use apibara_script::Script;
 11  use apibara_sdk::{Configuration, MetadataMap, Uri};
 12  use bytesize::ByteSize;
 13  use error_stack::Result;
 14  use exponential_backoff::Backoff;
 15  use prost::Message;
 16  use serde::ser::Serialize;
 17  use tokio_util::sync::CancellationToken;
 18  use tracing::{info, warn};
 19  
 20  use crate::{
 21      connector::{state::StateManager, stream::StreamClientFactory},
 22      error::{SinkError, SinkErrorReportExt},
 23      persistence::Persistence,
 24      sink::Sink,
 25      status::StatusServer,
 26  };
 27  
 28  use self::{default::DefaultConnector, factory::FactoryConnector, sink::SinkWithBackoff};
 29  
 30  #[derive(Debug)]
 31  pub struct StreamConfiguration {
 32      pub stream_url: Uri,
 33      pub max_message_size_bytes: ByteSize,
 34      pub metadata: MetadataMap,
 35      pub bearer_token: Option<String>,
 36      pub timeout_duration: Duration,
 37      pub ending_block: Option<u64>,
 38  }
 39  
 40  pub struct SinkConnectorOptions {
 41      pub stream: StreamConfiguration,
 42      pub persistence: Persistence,
 43      pub status_server: StatusServer,
 44  }
 45  
 46  pub struct SinkConnector<S>
 47  where
 48      S: Sink + Send + Sync,
 49  {
 50      script: Script,
 51      sink: S,
 52      stream_configuration: StreamConfiguration,
 53      backoff: Backoff,
 54      persistence: Persistence,
 55      status_server: StatusServer,
 56  }
 57  
 58  impl<S> SinkConnector<S>
 59  where
 60      S: Sink + Send + Sync,
 61  {
 62      /// Creates a new connector with the given stream URL.
 63      pub fn new(script: Script, sink: S, options: SinkConnectorOptions) -> Self {
 64          Self {
 65              script,
 66              sink,
 67              backoff: default_backoff(),
 68              stream_configuration: options.stream,
 69              persistence: options.persistence,
 70              status_server: options.status_server,
 71          }
 72      }
 73  
 74      /// Start consuming the stream, calling the configured callback for each message.
 75      pub async fn consume_stream<F, B>(
 76          mut self,
 77          configuration: Configuration<F>,
 78          ct: CancellationToken,
 79      ) -> Result<(), SinkError>
 80      where
 81          F: Filter,
 82          B: Message + Default + Serialize,
 83      {
 84          let stream_ending_block = self.stream_configuration.ending_block;
 85  
 86          let stream_client_factory = StreamClientFactory::new(self.stream_configuration);
 87          let stream_client = stream_client_factory.new_stream_client().await?;
 88  
 89          let (state_manager, mut state_manager_fut) = StateManager::start(
 90              self.persistence,
 91              self.status_server,
 92              stream_client,
 93              ct.clone(),
 94          )
 95          .await?;
 96  
 97          let use_factory_mode = self
 98              .script
 99              .has_factory()
100              .await
101              .map_err(|err| err.configuration("failed to detect mode"))?;
102  
103          let sink = SinkWithBackoff::new(self.sink, self.backoff);
104  
105          let mut inner = if use_factory_mode {
106              InnerConnector::<S, F, B>::new_factory(
107                  self.script,
108                  sink,
109                  stream_ending_block,
110                  configuration,
111                  stream_client_factory,
112                  state_manager,
113              )
114          } else {
115              InnerConnector::<S, F, B>::new_default(
116                  self.script,
117                  sink,
118                  stream_ending_block,
119                  configuration,
120                  stream_client_factory,
121                  state_manager,
122              )
123          };
124  
125          loop {
126              let inner_fut = inner.start(ct.clone());
127              tokio::select! {
128                  _ = &mut state_manager_fut => {
129                      info!("status server stopped");
130                      break;
131                  }
132                  _ = ct.cancelled() => {
133                      break;
134                  }
135                  ret = inner_fut => {
136                      match ret {
137                          Ok(_) => {
138                              info!("connector stopped.");
139                              break;
140                          }
141                          Err(err) => {
142                              match err.downcast_ref::<SinkError>() {
143                                  Some(SinkError::Temporary) => {
144                                      warn!(err = ?err, "connector failed. restarting.");
145                                  }
146                                  _ => {
147                                      return Err(err);
148                                  }
149                              };
150                          }
151                      }
152                  }
153              };
154  
155              // Wait before restarting.
156              tokio::select! {
157                  _ = ct.cancelled() => {
158                      break;
159                  }
160                  _ = tokio::time::sleep(Duration::from_secs(10)) => {
161                      // continue
162                  }
163              }
164          }
165  
166          Ok(())
167      }
168  }
169  
170  enum InnerConnector<S, F, B>
171  where
172      S: Sink + Send + Sync,
173      F: Filter,
174      B: Message + Default + Serialize,
175  {
176      Default(DefaultConnector<S, F, B>),
177      Factory(FactoryConnector<S, F, B>),
178  }
179  
180  impl<S, F, B> InnerConnector<S, F, B>
181  where
182      S: Sink + Send + Sync,
183      F: Filter,
184      B: Message + Default + Serialize,
185  {
186      pub fn new_default(
187          script: Script,
188          sink: SinkWithBackoff<S>,
189          ending_block: Option<u64>,
190          starting_configuration: Configuration<F>,
191          stream_client_factory: StreamClientFactory,
192          state_manager: StateManager,
193      ) -> Self {
194          let inner = DefaultConnector::new(
195              script,
196              sink,
197              ending_block,
198              starting_configuration,
199              stream_client_factory,
200              state_manager,
201          );
202          Self::Default(inner)
203      }
204  
205      pub fn new_factory(
206          script: Script,
207          sink: SinkWithBackoff<S>,
208          ending_block: Option<u64>,
209          starting_configuration: Configuration<F>,
210          stream_client_factory: StreamClientFactory,
211          state_manager: StateManager,
212      ) -> Self {
213          let inner = FactoryConnector::new(
214              script,
215              sink,
216              ending_block,
217              starting_configuration,
218              stream_client_factory,
219              state_manager,
220          );
221          Self::Factory(inner)
222      }
223  
224      pub async fn start(&mut self, ct: CancellationToken) -> Result<(), SinkError> {
225          match self {
226              Self::Default(inner) => inner.start(ct).await,
227              Self::Factory(inner) => inner.start(ct).await,
228          }
229      }
230  }
231  
232  fn default_backoff() -> Backoff {
233      let retries = 10;
234      let min_delay = Duration::from_secs(3);
235      let max_delay = Duration::from_secs(60);
236      let mut backoff = Backoff::new(retries, min_delay, Some(max_delay));
237      backoff.set_factor(3);
238      backoff
239  }