bin.rs
1 use std::collections::HashMap; 2 3 use apibara_observability::init_opentelemetry; 4 use apibara_operator::{ 5 configuration::{Configuration, SinkConfiguration}, 6 controller, 7 crd::Indexer, 8 error::OperatorError, 9 }; 10 use clap::{Args, Parser, Subcommand}; 11 use error_stack::{Result, ResultExt}; 12 use kube::{Client, CustomResourceExt}; 13 use tokio_util::sync::CancellationToken; 14 15 #[derive(Parser, Debug)] 16 #[command(author, version, about, long_about = None)] 17 struct Cli { 18 #[command(subcommand)] 19 command: Command, 20 } 21 22 #[derive(Subcommand, Debug)] 23 enum Command { 24 /// Generate the operator CRDs and exit. 25 GenerateCrd(GenerateCrdArgs), 26 /// Start the operator. 27 Start(StartArgs), 28 } 29 30 #[derive(Args, Debug)] 31 struct GenerateCrdArgs {} 32 33 #[derive(Args, Debug)] 34 struct StartArgs { 35 #[clap(flatten)] 36 pub sink: SinkArgs, 37 /// Limit the namespace the operator watches. 38 #[arg(long, env)] 39 pub namespace: Option<String>, 40 } 41 42 #[derive(Args, Debug)] 43 struct SinkArgs { 44 /// Sink type to image mapping. 45 /// 46 /// Values are separated by commas, 47 /// e.g. `console=quay.io/apibara/sink-console:latest,mongo=quay.io/apibara/sink-mongo:latest`. 48 #[arg(long, env, value_delimiter = ',')] 49 pub sink_images: Option<Vec<String>>, 50 } 51 52 fn generate_crds(_args: GenerateCrdArgs) -> Result<(), OperatorError> { 53 let crds = [Indexer::crd()] 54 .iter() 55 .map(|crd| serde_yaml::to_string(&crd)) 56 .collect::<std::result::Result<Vec<_>, _>>() 57 .change_context(OperatorError) 58 .attach_printable("failed to serialize CRD to yaml")? 59 .join("---\n"); 60 println!("{}", crds); 61 Ok(()) 62 } 63 64 async fn start(args: StartArgs) -> Result<(), OperatorError> { 65 let client = Client::try_default() 66 .await 67 .change_context(OperatorError) 68 .attach_printable("failed to build Kubernetes client")?; 69 let configuration = args 70 .into_configuration() 71 .attach_printable("invalid cli arguments")?; 72 let ct = CancellationToken::new(); 73 74 ctrlc::set_handler({ 75 let ct = ct.clone(); 76 move || { 77 ct.cancel(); 78 } 79 }) 80 .change_context(OperatorError) 81 .attach_printable("failed to setup ctrl-c handler")?; 82 83 controller::start(client, configuration, ct) 84 .await 85 .change_context(OperatorError) 86 .attach_printable("error while running operator")?; 87 Ok(()) 88 } 89 90 #[tokio::main] 91 async fn main() -> Result<(), OperatorError> { 92 init_opentelemetry() 93 .change_context(OperatorError) 94 .attach_printable("failed to initialize opentelemetry")?; 95 let args = Cli::parse(); 96 97 match args.command { 98 Command::GenerateCrd(args) => generate_crds(args)?, 99 Command::Start(args) => start(args).await?, 100 } 101 102 Ok(()) 103 } 104 105 impl StartArgs { 106 pub fn into_configuration(self) -> Result<Configuration, OperatorError> { 107 let mut configuration = Configuration { 108 namespace: self.namespace, 109 ..Configuration::default() 110 }; 111 112 if let Some(sink_images) = self.sink.sink_images { 113 let mut sinks = HashMap::new(); 114 for image_kv in &sink_images { 115 match image_kv.split_once('=') { 116 Some((name, image)) if !image.contains('=') => { 117 sinks.insert( 118 name.to_string(), 119 SinkConfiguration { 120 image: image.to_string(), 121 }, 122 ); 123 } 124 _ => { 125 return Err(OperatorError) 126 .attach_printable_lazy(|| { 127 format!("invalid sink image mapping: {}", image_kv) 128 }) 129 .attach_printable("hint: expected format is `type=image`") 130 } 131 } 132 } 133 134 configuration.with_sinks(sinks); 135 } 136 137 Ok(configuration) 138 } 139 }