run.rs
1 use std::{io::ErrorKind, process}; 2 3 use apibara_sink_common::{load_script, FullOptionsFromScript, ScriptOptions}; 4 use clap::Args; 5 use colored::*; 6 use error_stack::{Result, ResultExt}; 7 use serde::Deserialize; 8 9 use crate::{error::CliError, paths::plugins_dir}; 10 11 #[derive(Args, Debug)] 12 #[clap(trailing_var_arg = true, allow_hyphen_values = true)] 13 pub struct RunArgs { 14 /// The path to the indexer script. 15 script: String, 16 #[clap(flatten)] 17 transform: ScriptOptions, 18 /// Arguments forwarded to the indexer. 19 args: Vec<String>, 20 } 21 22 #[derive(Deserialize, Debug)] 23 #[serde(rename_all = "camelCase")] 24 struct DummyOptions { 25 pub sink_type: String, 26 } 27 28 pub async fn run(args: RunArgs) -> Result<(), CliError> { 29 // While not recommended, the script may return a different sink based on some env variable. We 30 // need to load the environment variables before loading the script. 31 let script_options = args 32 .transform 33 .load_environment_variables() 34 .change_context(CliError) 35 .attach_printable("failed to parse script options")? 36 .into_indexer_options(); 37 38 let mut script = load_script(&args.script, script_options).change_context(CliError)?; 39 40 // Load the configuration from the script, but we don't need the full options yet. 41 let configuration = script 42 .configuration::<FullOptionsFromScript<DummyOptions>>() 43 .await 44 .change_context(CliError)?; 45 46 // Delegate running the indexer to the sink command. 47 let sink_type = configuration.sink.sink_type; 48 let sink_command = get_sink_command(&sink_type); 49 50 // Add back the script/transform arguments if specified. 51 // TODO: There must be a better way to do this. 52 let mut extra_args = args.args; 53 if let Some(allow_env) = args.transform.allow_env { 54 extra_args.push("--allow-env".to_string()); 55 extra_args.push(allow_env.to_string_lossy().to_string()); 56 }; 57 if let Some(allow_env_from_env) = args.transform.allow_env_from_env { 58 extra_args.push("--allow-env-from-env".to_string()); 59 extra_args.push(allow_env_from_env.join(",").to_string()); 60 } 61 if let Some(allow_net) = args.transform.allow_net { 62 extra_args.push("--allow-net".to_string()); 63 extra_args.push(allow_net.join(",").to_string()); 64 }; 65 if let Some(allow_read) = args.transform.allow_read { 66 extra_args.push("--allow-read".to_string()); 67 extra_args.push(allow_read.join(",").to_string()); 68 }; 69 if let Some(allow_write) = args.transform.allow_write { 70 extra_args.push("--allow-write".to_string()); 71 extra_args.push(allow_write.join(",").to_string()); 72 }; 73 if let Some(transform_timeout) = args.transform.script_transform_timeout_seconds { 74 extra_args.push("--script-transform-timeout-seconds".to_string()); 75 extra_args.push(transform_timeout.to_string()); 76 } 77 if let Some(load_timeout) = args.transform.script_load_timeout_seconds { 78 extra_args.push("--script-load-timeout-seconds".to_string()); 79 extra_args.push(load_timeout.to_string()); 80 } 81 82 let command_res = process::Command::new(sink_command) 83 .arg("run") 84 .arg(args.script) 85 .args(extra_args) 86 .spawn(); 87 88 match command_res { 89 Ok(mut child) => { 90 child.wait().change_context(CliError)?; 91 Ok(()) 92 } 93 Err(err) => { 94 if let ErrorKind::NotFound = err.kind() { 95 eprintln!( 96 "{} {} {}", 97 "Sink".red(), 98 sink_type, 99 "is not installed".red() 100 ); 101 eprintln!( 102 "Install it with {} or by adding it to your $PATH", 103 format!("`apibara plugins install sink-{}`", sink_type).green() 104 ); 105 std::process::exit(1); 106 } 107 Err(err) 108 .change_context(CliError) 109 .attach_printable("error while running sink") 110 } 111 } 112 } 113 114 fn get_sink_command(sink_type: &str) -> String { 115 let dir = plugins_dir(); 116 let binary = format!("apibara-sink-{}", sink_type); 117 118 // If the user hasn't installed the plugin, try to invoke from path. 119 let installed = dir.join(&binary); 120 if installed.exists() { 121 return installed.to_string_lossy().to_string(); 122 } else { 123 binary 124 } 125 }