/ operator / src / bin.rs
bin.rs
  1  use std::collections::HashMap;
  2  
  3  use apibara_observability::init_opentelemetry;
  4  use apibara_operator::{
  5      configuration::{Configuration, SinkConfiguration},
  6      controller,
  7      crd::Indexer,
  8      error::OperatorError,
  9  };
 10  use clap::{Args, Parser, Subcommand};
 11  use error_stack::{Result, ResultExt};
 12  use kube::{Client, CustomResourceExt};
 13  use tokio_util::sync::CancellationToken;
 14  
 15  #[derive(Parser, Debug)]
 16  #[command(author, version, about, long_about = None)]
 17  struct Cli {
 18      #[command(subcommand)]
 19      command: Command,
 20  }
 21  
 22  #[derive(Subcommand, Debug)]
 23  enum Command {
 24      /// Generate the operator CRDs and exit.
 25      GenerateCrd(GenerateCrdArgs),
 26      /// Start the operator.
 27      Start(StartArgs),
 28  }
 29  
 30  #[derive(Args, Debug)]
 31  struct GenerateCrdArgs {}
 32  
 33  #[derive(Args, Debug)]
 34  struct StartArgs {
 35      #[clap(flatten)]
 36      pub sink: SinkArgs,
 37      /// Limit the namespace the operator watches.
 38      #[arg(long, env)]
 39      pub namespace: Option<String>,
 40  }
 41  
 42  #[derive(Args, Debug)]
 43  struct SinkArgs {
 44      /// Sink type to image mapping.
 45      ///
 46      /// Values are separated by commas,
 47      /// e.g. `console=quay.io/apibara/sink-console:latest,mongo=quay.io/apibara/sink-mongo:latest`.
 48      #[arg(long, env, value_delimiter = ',')]
 49      pub sink_images: Option<Vec<String>>,
 50  }
 51  
 52  fn generate_crds(_args: GenerateCrdArgs) -> Result<(), OperatorError> {
 53      let crds = [Indexer::crd()]
 54          .iter()
 55          .map(|crd| serde_yaml::to_string(&crd))
 56          .collect::<std::result::Result<Vec<_>, _>>()
 57          .change_context(OperatorError)
 58          .attach_printable("failed to serialize CRD to yaml")?
 59          .join("---\n");
 60      println!("{}", crds);
 61      Ok(())
 62  }
 63  
 64  async fn start(args: StartArgs) -> Result<(), OperatorError> {
 65      let client = Client::try_default()
 66          .await
 67          .change_context(OperatorError)
 68          .attach_printable("failed to build Kubernetes client")?;
 69      let configuration = args
 70          .into_configuration()
 71          .attach_printable("invalid cli arguments")?;
 72      let ct = CancellationToken::new();
 73  
 74      ctrlc::set_handler({
 75          let ct = ct.clone();
 76          move || {
 77              ct.cancel();
 78          }
 79      })
 80      .change_context(OperatorError)
 81      .attach_printable("failed to setup ctrl-c handler")?;
 82  
 83      controller::start(client, configuration, ct)
 84          .await
 85          .change_context(OperatorError)
 86          .attach_printable("error while running operator")?;
 87      Ok(())
 88  }
 89  
 90  #[tokio::main]
 91  async fn main() -> Result<(), OperatorError> {
 92      init_opentelemetry()
 93          .change_context(OperatorError)
 94          .attach_printable("failed to initialize opentelemetry")?;
 95      let args = Cli::parse();
 96  
 97      match args.command {
 98          Command::GenerateCrd(args) => generate_crds(args)?,
 99          Command::Start(args) => start(args).await?,
100      }
101  
102      Ok(())
103  }
104  
105  impl StartArgs {
106      pub fn into_configuration(self) -> Result<Configuration, OperatorError> {
107          let mut configuration = Configuration {
108              namespace: self.namespace,
109              ..Configuration::default()
110          };
111  
112          if let Some(sink_images) = self.sink.sink_images {
113              let mut sinks = HashMap::new();
114              for image_kv in &sink_images {
115                  match image_kv.split_once('=') {
116                      Some((name, image)) if !image.contains('=') => {
117                          sinks.insert(
118                              name.to_string(),
119                              SinkConfiguration {
120                                  image: image.to_string(),
121                              },
122                          );
123                      }
124                      _ => {
125                          return Err(OperatorError)
126                              .attach_printable_lazy(|| {
127                                  format!("invalid sink image mapping: {}", image_kv)
128                              })
129                              .attach_printable("hint: expected format is `type=image`")
130                      }
131                  }
132              }
133  
134              configuration.with_sinks(sinks);
135          }
136  
137          Ok(configuration)
138      }
139  }