lib.rs
1 mod cli; 2 mod configuration; 3 mod connector; 4 mod cursor; 5 mod error; 6 mod json; 7 pub mod persistence; 8 mod sink; 9 mod status; 10 11 use apibara_core::starknet::v1alpha2; 12 use error_stack::Result; 13 use error_stack::ResultExt; 14 use serde::Deserialize; 15 use tokio_util::sync::CancellationToken; 16 17 pub use self::cli::*; 18 pub use self::configuration::*; 19 pub use self::connector::*; 20 pub use self::cursor::DisplayCursor; 21 pub use self::error::*; 22 pub use self::json::ValueExt; 23 pub use self::persistence::*; 24 pub use self::sink::*; 25 pub use self::status::*; 26 pub use apibara_sink_options_derive::SinkOptions; 27 28 pub use apibara_script::ScriptOptions as IndexerOptions; 29 30 #[derive(Debug, Deserialize)] 31 pub struct FullOptionsFromScript<SinkOptions> { 32 #[serde(flatten)] 33 pub connector: OptionsFromScript, 34 #[serde(flatten)] 35 pub sink: SinkOptions, 36 } 37 38 pub async fn run_sink_connector<S>( 39 script: &str, 40 connector_cli_options: OptionsFromCli, 41 sink_cli_options: S::Options, 42 ct: CancellationToken, 43 ) -> Result<(), SinkError> 44 where 45 S: Sink + Send + Sync, 46 { 47 let script_options = connector_cli_options 48 .connector 49 .script 50 .load_environment_variables() 51 .map_err(|err| err.configuration("failed to parse cli options"))? 52 .into_indexer_options(); 53 54 let mut script = load_script(script, script_options) 55 .map_err(|err| err.configuration("failed to load script"))?; 56 57 let options_from_script = script 58 .configuration::<FullOptionsFromScript<S::Options>>() 59 .await 60 .map_err(|err| err.configuration("failed to load configuration from script"))?; 61 62 script 63 .check_transform_is_exported() 64 .await 65 .map_err(|err| err.configuration("missing or invalid transform function"))?; 66 67 // Setup sink. 68 let sink_options = sink_cli_options.merge(options_from_script.sink); 69 let sink = S::from_options(sink_options) 70 .await 71 .map_err(|err| err.configuration("invalid sink options"))?; 72 73 // Setup connector. 74 let connector_options_from_script = options_from_script.connector; 75 let stream_configuration = connector_options_from_script.stream_configuration; 76 let stream_options = connector_cli_options 77 .stream 78 .merge(connector_options_from_script.stream); 79 80 let stream = stream_options 81 .to_stream_configuration() 82 .map_err(|err| err.configuration("invalid stream options"))?; 83 84 let persistence = Persistence::new_from_options(connector_cli_options.connector.persistence); 85 let status_server = connector_cli_options 86 .connector 87 .status_server 88 .to_status_server() 89 .map_err(|err| err.configuration("invalid status server options"))?; 90 91 let sink_connector_options = SinkConnectorOptions { 92 stream, 93 persistence, 94 status_server, 95 }; 96 97 let connector = SinkConnector::new(script, sink, sink_connector_options); 98 99 if let Some(starknet_config) = stream_configuration.as_starknet() { 100 connector 101 .consume_stream::<v1alpha2::Filter, v1alpha2::Block>(starknet_config, ct) 102 .await 103 .attach_printable("error while streaming data")?; 104 } else { 105 todo!() 106 }; 107 108 Ok(()) 109 }