/ sinks / sink-parquet / src / bin.rs
bin.rs
 1  use std::process::ExitCode;
 2  
 3  use apibara_sink_common::{
 4      apibara_cli_style, initialize_sink, run_sink_connector, OptionsFromCli, ReportExt, SinkError,
 5  };
 6  use apibara_sink_parquet::{ParquetSink, SinkParquetOptions};
 7  use clap::{Args, Parser, Subcommand};
 8  use error_stack::Result;
 9  use tokio_util::sync::CancellationToken;
10  
11  #[cfg(not(windows))]
12  #[global_allocator]
13  static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
14  
15  #[derive(Parser, Debug)]
16  #[command(author, version, about, long_about = None, styles = apibara_cli_style())]
17  struct Cli {
18      #[command(subcommand)]
19      subcommand: Command,
20  }
21  
22  #[derive(Subcommand, Debug)]
23  enum Command {
24      Run(RunArgs),
25  }
26  
27  #[derive(Args, Debug)]
28  struct RunArgs {
29      /// The path to the indexer script.
30      script: String,
31      #[command(flatten)]
32      parquet: SinkParquetOptions,
33      #[command(flatten)]
34      common: OptionsFromCli,
35  }
36  
37  #[tokio::main(flavor = "current_thread")]
38  async fn main() -> ExitCode {
39      let args = Cli::parse();
40      run_with_args(args).await.to_exit_code()
41  }
42  
43  async fn run_with_args(args: Cli) -> Result<(), SinkError> {
44      let ct = CancellationToken::new();
45      initialize_sink(ct.clone())?;
46  
47      match args.subcommand {
48          Command::Run(args) => {
49              run_sink_connector::<ParquetSink>(&args.script, args.common, args.parquet, ct).await
50          }
51      }
52  }