/ cli / src / run.rs
run.rs
  1  use std::{io::ErrorKind, process};
  2  
  3  use apibara_sink_common::{load_script, FullOptionsFromScript, ScriptOptions};
  4  use clap::Args;
  5  use colored::*;
  6  use error_stack::{Result, ResultExt};
  7  use serde::Deserialize;
  8  
  9  use crate::{error::CliError, paths::plugins_dir};
 10  
 11  #[derive(Args, Debug)]
 12  #[clap(trailing_var_arg = true, allow_hyphen_values = true)]
 13  pub struct RunArgs {
 14      /// The path to the indexer script.
 15      script: String,
 16      #[clap(flatten)]
 17      transform: ScriptOptions,
 18      /// Arguments forwarded to the indexer.
 19      args: Vec<String>,
 20  }
 21  
 22  #[derive(Deserialize, Debug)]
 23  #[serde(rename_all = "camelCase")]
 24  struct DummyOptions {
 25      pub sink_type: String,
 26  }
 27  
 28  pub async fn run(args: RunArgs) -> Result<(), CliError> {
 29      // While not recommended, the script may return a different sink based on some env variable. We
 30      // need to load the environment variables before loading the script.
 31      let script_options = args
 32          .transform
 33          .load_environment_variables()
 34          .change_context(CliError)
 35          .attach_printable("failed to parse script options")?
 36          .into_indexer_options();
 37  
 38      let mut script = load_script(&args.script, script_options).change_context(CliError)?;
 39  
 40      // Load the configuration from the script, but we don't need the full options yet.
 41      let configuration = script
 42          .configuration::<FullOptionsFromScript<DummyOptions>>()
 43          .await
 44          .change_context(CliError)?;
 45  
 46      // Delegate running the indexer to the sink command.
 47      let sink_type = configuration.sink.sink_type;
 48      let sink_command = get_sink_command(&sink_type);
 49  
 50      // Add back the script/transform arguments if specified.
 51      // TODO: There must be a better way to do this.
 52      let mut extra_args = args.args;
 53      if let Some(allow_env) = args.transform.allow_env {
 54          extra_args.push("--allow-env".to_string());
 55          extra_args.push(allow_env.to_string_lossy().to_string());
 56      };
 57      if let Some(allow_env_from_env) = args.transform.allow_env_from_env {
 58          extra_args.push("--allow-env-from-env".to_string());
 59          extra_args.push(allow_env_from_env.join(",").to_string());
 60      }
 61      if let Some(allow_net) = args.transform.allow_net {
 62          extra_args.push("--allow-net".to_string());
 63          extra_args.push(allow_net.join(",").to_string());
 64      };
 65      if let Some(allow_read) = args.transform.allow_read {
 66          extra_args.push("--allow-read".to_string());
 67          extra_args.push(allow_read.join(",").to_string());
 68      };
 69      if let Some(allow_write) = args.transform.allow_write {
 70          extra_args.push("--allow-write".to_string());
 71          extra_args.push(allow_write.join(",").to_string());
 72      };
 73      if let Some(transform_timeout) = args.transform.script_transform_timeout_seconds {
 74          extra_args.push("--script-transform-timeout-seconds".to_string());
 75          extra_args.push(transform_timeout.to_string());
 76      }
 77      if let Some(load_timeout) = args.transform.script_load_timeout_seconds {
 78          extra_args.push("--script-load-timeout-seconds".to_string());
 79          extra_args.push(load_timeout.to_string());
 80      }
 81  
 82      let command_res = process::Command::new(sink_command)
 83          .arg("run")
 84          .arg(args.script)
 85          .args(extra_args)
 86          .spawn();
 87  
 88      match command_res {
 89          Ok(mut child) => {
 90              child.wait().change_context(CliError)?;
 91              Ok(())
 92          }
 93          Err(err) => {
 94              if let ErrorKind::NotFound = err.kind() {
 95                  eprintln!(
 96                      "{} {} {}",
 97                      "Sink".red(),
 98                      sink_type,
 99                      "is not installed".red()
100                  );
101                  eprintln!(
102                      "Install it with {} or by adding it to your $PATH",
103                      format!("`apibara plugins install sink-{}`", sink_type).green()
104                  );
105                  std::process::exit(1);
106              }
107              Err(err)
108                  .change_context(CliError)
109                  .attach_printable("error while running sink")
110          }
111      }
112  }
113  
114  fn get_sink_command(sink_type: &str) -> String {
115      let dir = plugins_dir();
116      let binary = format!("apibara-sink-{}", sink_type);
117  
118      // If the user hasn't installed the plugin, try to invoke from path.
119      let installed = dir.join(&binary);
120      if installed.exists() {
121          return installed.to_string_lossy().to_string();
122      } else {
123          binary
124      }
125  }