/ sinks / sink-common / src / lib.rs
lib.rs
  1  mod cli;
  2  mod configuration;
  3  mod connector;
  4  mod cursor;
  5  mod error;
  6  mod json;
  7  pub mod persistence;
  8  mod sink;
  9  mod status;
 10  
 11  use apibara_core::starknet::v1alpha2;
 12  use error_stack::Result;
 13  use error_stack::ResultExt;
 14  use serde::Deserialize;
 15  use tokio_util::sync::CancellationToken;
 16  
 17  pub use self::cli::*;
 18  pub use self::configuration::*;
 19  pub use self::connector::*;
 20  pub use self::cursor::DisplayCursor;
 21  pub use self::error::*;
 22  pub use self::json::ValueExt;
 23  pub use self::persistence::*;
 24  pub use self::sink::*;
 25  pub use self::status::*;
 26  pub use apibara_sink_options_derive::SinkOptions;
 27  
 28  pub use apibara_script::ScriptOptions as IndexerOptions;
 29  
 30  #[derive(Debug, Deserialize)]
 31  pub struct FullOptionsFromScript<SinkOptions> {
 32      #[serde(flatten)]
 33      pub connector: OptionsFromScript,
 34      #[serde(flatten)]
 35      pub sink: SinkOptions,
 36  }
 37  
 38  pub async fn run_sink_connector<S>(
 39      script: &str,
 40      connector_cli_options: OptionsFromCli,
 41      sink_cli_options: S::Options,
 42      ct: CancellationToken,
 43  ) -> Result<(), SinkError>
 44  where
 45      S: Sink + Send + Sync,
 46  {
 47      let script_options = connector_cli_options
 48          .connector
 49          .script
 50          .load_environment_variables()
 51          .map_err(|err| err.configuration("failed to parse cli options"))?
 52          .into_indexer_options();
 53  
 54      let mut script = load_script(script, script_options)
 55          .map_err(|err| err.configuration("failed to load script"))?;
 56  
 57      let options_from_script = script
 58          .configuration::<FullOptionsFromScript<S::Options>>()
 59          .await
 60          .map_err(|err| err.configuration("failed to load configuration from script"))?;
 61  
 62      script
 63          .check_transform_is_exported()
 64          .await
 65          .map_err(|err| err.configuration("missing or invalid transform function"))?;
 66  
 67      // Setup sink.
 68      let sink_options = sink_cli_options.merge(options_from_script.sink);
 69      let sink = S::from_options(sink_options)
 70          .await
 71          .map_err(|err| err.configuration("invalid sink options"))?;
 72  
 73      // Setup connector.
 74      let connector_options_from_script = options_from_script.connector;
 75      let stream_configuration = connector_options_from_script.stream_configuration;
 76      let stream_options = connector_cli_options
 77          .stream
 78          .merge(connector_options_from_script.stream);
 79  
 80      let stream = stream_options
 81          .to_stream_configuration()
 82          .map_err(|err| err.configuration("invalid stream options"))?;
 83  
 84      let persistence = Persistence::new_from_options(connector_cli_options.connector.persistence);
 85      let status_server = connector_cli_options
 86          .connector
 87          .status_server
 88          .to_status_server()
 89          .map_err(|err| err.configuration("invalid status server options"))?;
 90  
 91      let sink_connector_options = SinkConnectorOptions {
 92          stream,
 93          persistence,
 94          status_server,
 95      };
 96  
 97      let connector = SinkConnector::new(script, sink, sink_connector_options);
 98  
 99      if let Some(starknet_config) = stream_configuration.as_starknet() {
100          connector
101              .consume_stream::<v1alpha2::Filter, v1alpha2::Block>(starknet_config, ct)
102              .await
103              .attach_printable("error while streaming data")?;
104      } else {
105          todo!()
106      };
107  
108      Ok(())
109  }