/ operator / src / reconcile.rs
reconcile.rs
  1  use std::{collections::BTreeMap, fmt, sync::Arc, time::Duration};
  2  
  3  use error_stack::{Report, Result, ResultExt};
  4  use k8s_openapi::{
  5      api::{self, core::v1::ServiceSpec},
  6      apimachinery::pkg::apis::meta::{self, v1::Condition},
  7      chrono::{DateTime, Utc},
  8      Metadata,
  9  };
 10  use kube::{
 11      api::{DeleteParams, Patch, PatchParams},
 12      core::Resource,
 13      runtime::{controller::Action, finalizer},
 14      Api, ResourceExt,
 15  };
 16  use serde_json::json;
 17  use tracing::{info, instrument, warn};
 18  
 19  use crate::{
 20      context::{Context, OperatorError},
 21      crd::{Indexer, IndexerSource, IndexerStatus, SinkType},
 22  };
 23  
 24  static INDEXER_FINALIZER: &str = "indexer.apibara.com";
 25  static GIT_CLONE_IMAGE: &str = "docker.io/alpine/git:latest";
 26  
/// Error type returned by [`reconcile_indexer`]: a newtype around the
/// `error_stack` report produced by the reconciliation logic.
pub struct ReconcileError(Report<OperatorError>);
 28  
 29  impl Indexer {
 30      #[instrument(skip_all)]
 31      async fn reconcile(&self, ctx: Arc<Context>) -> Result<Action, OperatorError> {
 32          use api::core::v1::{Pod, Service};
 33  
 34          let name = self.name_any();
 35  
 36          let ns = self
 37              .namespace()
 38              .ok_or(OperatorError)
 39              .attach_printable("failed to get namespace")
 40              .attach_printable_lazy(|| format!("indexer: {name}"))?;
 41  
 42          let indexers: Api<Indexer> = Api::namespaced(ctx.client.clone(), &ns);
 43          let pods: Api<Pod> = Api::namespaced(ctx.client.clone(), &ns);
 44          let services: Api<Service> = Api::namespaced(ctx.client.clone(), &ns);
 45  
 46          // Check if the indexer needs to be restarted.
 47          let (restart_increment, error_condition) = if let Some(pod_name) = self.instance_name() {
 48              self.maybe_delete_pod(pod_name, &pods).await?
 49          } else {
 50              (0, None)
 51          };
 52  
 53          let metadata = self.object_metadata(&ctx);
 54  
 55          let mut phase = if error_condition.is_some() {
 56              "Error".to_string()
 57          } else {
 58              "Running".to_string()
 59          };
 60  
 61          let mut conditions = Vec::default();
 62          let mut pod_created = None;
 63          let mut instance_name = None;
 64          let mut status_service_name = None;
 65  
 66          match self.pod_and_status_svc_spec(&ctx) {
 67              None => {
 68                  let pod_scheduled_condition = Condition {
 69                      last_transition_time: self
 70                          .meta()
 71                          .creation_timestamp
 72                          .clone()
 73                          .unwrap_or(meta::v1::Time(DateTime::<Utc>::MIN_UTC)),
 74                      type_: "PodNotScheduled".to_string(),
 75                      message: "The specified indexer type doesn't exist.".to_string(),
 76                      observed_generation: self.meta().generation,
 77                      reason: "ConfigurationError".to_string(),
 78                      status: "False".to_string(),
 79                  };
 80  
 81                  conditions.push(pod_scheduled_condition);
 82                  phase = "Error".to_string();
 83              }
 84              Some((spec, svc_spec)) => {
 85                  let svc_name = self.status_service_name();
 86                  let svc_metadata = kube::core::ObjectMeta {
 87                      name: svc_name.clone().into(),
 88                      ..metadata.clone()
 89                  };
 90  
 91                  let pod_manifest = Pod {
 92                      metadata,
 93                      spec: Some(spec),
 94                      ..Pod::default()
 95                  };
 96  
 97                  let pod = pods
 98                      .patch(
 99                          &name,
100                          &PatchParams::apply("indexer"),
101                          &Patch::Apply(pod_manifest),
102                      )
103                      .await
104                      .change_context(OperatorError)
105                      .attach_printable("failed to update pod")
106                      .attach_printable_lazy(|| format!("indexer: {name}"))?;
107  
108                  let svc_manifest = Service {
109                      metadata: svc_metadata,
110                      spec: Some(svc_spec),
111                      ..Service::default()
112                  };
113  
114                  let service = services
115                      .patch(
116                          &svc_name,
117                          &PatchParams::apply("indexer"),
118                          &Patch::Apply(svc_manifest),
119                      )
120                      .await
121                      .change_context(OperatorError)
122                      .attach_printable("failed to update service")
123                      .attach_printable_lazy(|| format!("indexer: {name}"))?;
124  
125                  let pod_scheduled_condition = Condition {
126                      last_transition_time: pod
127                          .meta()
128                          .creation_timestamp
129                          .clone()
130                          .unwrap_or(meta::v1::Time(DateTime::<Utc>::MIN_UTC)),
131                      type_: "PodScheduled".to_string(),
132                      message: "Pod has been scheduled".to_string(),
133                      observed_generation: self.meta().generation,
134                      reason: "PodScheduled".to_string(),
135                      status: "True".to_string(),
136                  };
137  
138                  let status_service_created = Condition {
139                      last_transition_time: service
140                          .meta()
141                          .creation_timestamp
142                          .clone()
143                          .unwrap_or(meta::v1::Time(DateTime::<Utc>::MIN_UTC)),
144                      type_: "StatusServiceCreated".to_string(),
145                      message: "Status service created".to_string(),
146                      observed_generation: self.meta().generation,
147                      reason: "StatusServiceCreated".to_string(),
148                      status: "True".to_string(),
149                  };
150  
151                  conditions.push(pod_scheduled_condition);
152                  conditions.push(status_service_created);
153                  pod_created = pod.meta().creation_timestamp.clone();
154                  instance_name = pod.metadata().name.clone();
155                  status_service_name = service.metadata().name.clone();
156              }
157          }
158  
159          if let Some(error_condition) = error_condition {
160              conditions.push(error_condition);
161          }
162  
163          let restart_count = self
164              .status
165              .as_ref()
166              .map(|status| status.restart_count.unwrap_or_default() + restart_increment);
167  
168          let status = json!({
169              "status": IndexerStatus {
170                  pod_created,
171                  instance_name,
172                  status_service_name,
173                  phase: Some(phase),
174                  conditions: Some(conditions),
175                  restart_count,
176              }
177          });
178  
179          indexers
180              .patch_status(&name, &PatchParams::default(), &Patch::Merge(&status))
181              .await
182              .change_context(OperatorError)
183              .attach_printable("failed to update status")
184              .attach_printable_lazy(|| format!("indexer: {name}"))?;
185  
186          Ok(Action::requeue(Duration::from_secs(10)))
187      }
188  
189      #[instrument(skip_all)]
190      async fn cleanup(&self, ctx: Arc<Context>) -> Result<Action, OperatorError> {
191          use api::core::v1::{Pod, Service};
192  
193          let name = self.name_any();
194          let svc_name = self.status_service_name();
195  
196          let ns = self
197              .namespace()
198              .ok_or(OperatorError)
199              .attach_printable("failed to get namespace")
200              .attach_printable_lazy(|| format!("indexer: {name}"))?;
201  
202          let pods: Api<Pod> = Api::namespaced(ctx.client.clone(), &ns);
203          let services: Api<Service> = Api::namespaced(ctx.client.clone(), &ns);
204  
205          if let Some(_existing) = pods
206              .get_opt(&name)
207              .await
208              .change_context(OperatorError)
209              .attach_printable("failed to get pod")
210              .attach_printable_lazy(|| format!("indexer: {name}"))?
211          {
212              pods.delete(&name, &DeleteParams::default())
213                  .await
214                  .change_context(OperatorError)
215                  .attach_printable("failed to delete pod")
216                  .attach_printable_lazy(|| format!("indexer: {name}"))?;
217          }
218  
219          if let Some(_existing) = services
220              .get_opt(&svc_name)
221              .await
222              .change_context(OperatorError)
223              .attach_printable("failed to get service")
224              .attach_printable_lazy(|| format!("indexer: {name}"))?
225          {
226              services
227                  .delete(&svc_name, &DeleteParams::default())
228                  .await
229                  .change_context(OperatorError)
230                  .attach_printable("failed to delete service")
231                  .attach_printable_lazy(|| format!("indexer: {name}"))?;
232          }
233  
234          Ok(Action::requeue(Duration::from_secs(10)))
235      }
236  
237      fn instance_name(&self) -> Option<&str> {
238          self.status
239              .as_ref()
240              .and_then(|status| status.instance_name.as_ref())
241              .map(|name| name.as_str())
242      }
243  
244      /// Check the pod status and delete it if it has failed.
245      ///
246      /// The pod is deleted so that it can be recreated once the
247      /// status of the indexer has been updated.
248      async fn maybe_delete_pod(
249          &self,
250          pod_name: &str,
251          pods: &Api<api::core::v1::Pod>,
252      ) -> Result<(i32, Option<Condition>), OperatorError> {
253          let Some(pod) = pods
254              .get_opt(pod_name)
255              .await
256              .change_context(OperatorError)
257              .attach_printable("failed to get pod")
258              .attach_printable_lazy(|| format!("indexer: {pod_name}"))?
259          else {
260              return Ok((0, None));
261          };
262  
263          let container_status = pod.status.as_ref().and_then(|status| {
264              status
265                  .container_statuses
266                  .as_ref()
267                  .and_then(|statuses| statuses.iter().find(|c| c.name == "sink"))
268          });
269  
270          let Some(terminated) = container_status
271              .and_then(|cs| cs.state.as_ref())
272              .and_then(|st| st.terminated.clone())
273          else {
274              return Ok((0, None));
275          };
276  
277          let exit_code = terminated.exit_code;
278  
279          // Exit code from sysexits.h
280          // 75 = temporary failure
281          //
282          // Don't restart the container if it's a non temporary error.
283          let is_temporary_error = exit_code == 0 || exit_code == 75;
284          if is_temporary_error {
285              let should_delete = match &terminated.finished_at {
286                  None => true,
287                  Some(finished_at) => {
288                      let elapsed = (Utc::now().time() - finished_at.0.time())
289                          .to_std()
290                          .unwrap_or_default();
291                      elapsed > Duration::from_secs(60)
292                  }
293              };
294  
295              if should_delete {
296                  info!(pod = %pod.name_any(), "deleting pod for restart");
297  
298                  pods.delete(pod_name, &Default::default())
299                      .await
300                      .change_context(OperatorError)
301                      .attach_printable("failed to delete pod")
302                      .attach_printable_lazy(|| format!("indexer: {pod_name}"))?;
303  
304                  return Ok((1, None));
305              }
306          }
307  
308          let last_transition_time = terminated
309              .finished_at
310              .unwrap_or(meta::v1::Time(DateTime::<Utc>::MIN_UTC));
311  
312          let reason = if is_temporary_error {
313              "temporary".to_string()
314          } else {
315              "fatal".to_string()
316          };
317          let error_condition = Condition {
318              last_transition_time,
319              type_: "PodTerminated".to_string(),
320              message: format!("Pod has been terminated ({reason})"),
321              observed_generation: self.meta().generation,
322              reason: "PodTerminated".to_string(),
323              status: "False".to_string(),
324          };
325  
326          Ok((0, Some(error_condition)))
327      }
328  
329      fn status_service_name(&self) -> String {
330          self.name_any() + "-status"
331      }
332  
333      fn object_metadata(&self, ctx: &Arc<Context>) -> meta::v1::ObjectMeta {
334          use meta::v1::ObjectMeta;
335  
336          ObjectMeta {
337              name: self.metadata.name.clone(),
338              labels: self.pod_labels(ctx).into(),
339              ..ObjectMeta::default()
340          }
341      }
342  
343      fn pod_labels(&self, _ctx: &Arc<Context>) -> BTreeMap<String, String> {
344          let name = self.name_any();
345          BTreeMap::from([
346              ("app.kubernetes.io/name".to_string(), name.clone()),
347              ("app.kubernetes.io/instance".to_string(), name),
348          ])
349      }
350  
351      fn pod_and_status_svc_spec(
352          &self,
353          ctx: &Arc<Context>,
354      ) -> Option<(api::core::v1::PodSpec, api::core::v1::ServiceSpec)> {
355          use api::core::v1::{PodSpec, ServicePort};
356  
357          // Initialize volume, volume mounts, and env vars.
358          let mut volumes = Vec::new();
359          let mut volume_mounts = Vec::new();
360          let env = self.spec.env.clone().unwrap_or_default();
361  
362          if let Some(volumes_from_config) = self.spec.volumes.as_ref() {
363              for volume in volumes_from_config {
364                  volumes.push(volume.volume.clone());
365                  volume_mounts.push(volume.volume_mount.clone());
366              }
367          }
368  
369          let mut init_containers = Vec::new();
370          let mut containers = Vec::new();
371  
372          let (init_container, workdir) =
373              self.source_container(&mut volumes, &mut volume_mounts, &env, ctx);
374  
375          if let Some(init_container) = init_container {
376              init_containers.push(init_container);
377          }
378  
379          let container = self.sink_container(&workdir, &volume_mounts, &env, ctx)?;
380          containers.push(container);
381  
382          let pod_spec = PodSpec {
383              init_containers: Some(init_containers),
384              containers,
385              volumes: Some(volumes),
386              restart_policy: Some("Never".to_string()),
387              ..PodSpec::default()
388          };
389  
390          let svc_spec = ServiceSpec {
391              selector: self.pod_labels(ctx).into(),
392              ports: Some(vec![ServicePort {
393                  port: ctx.configuration.status_port,
394                  ..ServicePort::default()
395              }]),
396              ..ServiceSpec::default()
397          };
398  
399          Some((pod_spec, svc_spec))
400      }
401  
402      fn source_container(
403          &self,
404          volumes: &mut Vec<api::core::v1::Volume>,
405          volume_mounts: &mut Vec<api::core::v1::VolumeMount>,
406          env: &[api::core::v1::EnvVar],
407          _ctx: &Arc<Context>,
408      ) -> (Option<api::core::v1::Container>, String) {
409          use api::core::v1::{Container, Volume, VolumeMount};
410  
411          match &self.spec.source {
412              IndexerSource::GitHub(github) => {
413                  // This init container clones the indexer source code from GitHub.
414                  // It's a 5 step process:
415                  //
416                  // 1. Clone the repository, maybe authenticating with GitHub.
417                  // 2. Clean files.
418                  // 3. Fetch the specified revision.
419                  // 4. Checkout the specified revision.
420                  // 5. Clean files again.
421                  let mut args = "cd /code".to_string();
422  
423                  let clone_args = github
424                      .git_clone_flags
425                      .clone()
426                      .unwrap_or_else(|| "-v".to_string());
427  
428                  let url = if let Some(token) = &github.access_token_env_var {
429                      format!(
430                          "https://x-access-token:$({})@github.com/{}/{}.git",
431                          token, github.owner, github.repo
432                      )
433                  } else {
434                      format!("https://github.com/{}/{}.git", github.owner, github.repo)
435                  };
436  
437                  args.push_str(&format!("&& git clone {clone_args} -- {url} ."));
438  
439                  let clean_flags = github
440                      .git_clean_flags
441                      .clone()
442                      .unwrap_or_else(|| "-ffxdq".to_string());
443  
444                  args.push_str(&format!("&& git clean {clean_flags}"));
445                  args.push_str(&format!(
446                      "&& git fetch -v --prune --tags -- origin {}",
447                      github.revision
448                  ));
449                  args.push_str(&format!("&& git checkout -f {}", github.revision));
450                  args.push_str(&format!("&& git clean {clean_flags}"));
451  
452                  // Create an emptyDir volume where the source code will be cloned.
453                  volumes.push(Volume {
454                      name: "code".to_string(),
455                      empty_dir: Some(Default::default()),
456                      ..Volume::default()
457                  });
458  
459                  volume_mounts.push(VolumeMount {
460                      name: "code".to_string(),
461                      mount_path: "/code".to_string(),
462                      ..VolumeMount::default()
463                  });
464  
465                  let workdir = match github.subpath {
466                      None => "/code".to_string(),
467                      Some(ref subpath) => format!("/code/{}", subpath),
468                  };
469  
470                  let container = Container {
471                      name: "clone-github-repo".to_string(),
472                      image: Some(GIT_CLONE_IMAGE.to_string()),
473                      command: Some(vec!["/bin/sh".to_string()]),
474                      args: Some(vec!["-c".to_string(), args]),
475                      volume_mounts: Some(volume_mounts.clone()),
476                      env: Some(env.to_owned()),
477                      ..Container::default()
478                  };
479  
480                  (Some(container), workdir)
481              }
482              IndexerSource::Volume(fs) => (None, fs.path.clone()),
483          }
484      }
485  
486      fn sink_container(
487          &self,
488          workdir: &str,
489          volume_mounts: &[api::core::v1::VolumeMount],
490          env: &[api::core::v1::EnvVar],
491          ctx: &Arc<Context>,
492      ) -> Option<api::core::v1::Container> {
493          use api::core::v1::Container;
494  
495          let image = match &self.spec.sink.sink {
496              SinkType::Type { r#type } => {
497                  let Some(config) = ctx.configuration.sinks.get(r#type) else {
498                      return None;
499                  };
500                  config.image.clone()
501              }
502              SinkType::Image { image } => image.clone(),
503          };
504  
505          let script = self.spec.sink.script.clone();
506  
507          let port = ctx.configuration.status_port;
508          let mut args = vec![
509              "run".to_string(),
510              script,
511              "--status-server-address".to_string(),
512              format!("0.0.0.0:{port}"),
513          ];
514          use api::core::v1::ContainerPort;
515  
516          if let Some(extra_args) = &self.spec.sink.args {
517              args.extend_from_slice(extra_args);
518          }
519  
520          let container = Container {
521              name: "sink".to_string(),
522              image: Some(image),
523              args: Some(args),
524              volume_mounts: Some(volume_mounts.to_owned()),
525              env: Some(env.to_owned()),
526              working_dir: Some(workdir.to_string()),
527              ports: Some(vec![ContainerPort {
528                  container_port: port,
529                  name: Some("status".to_string()),
530                  ..ContainerPort::default()
531              }]),
532              ..Container::default()
533          };
534  
535          Some(container)
536      }
537  }
538  
539  pub async fn reconcile_indexer(
540      indexer: Arc<Indexer>,
541      ctx: Arc<Context>,
542  ) -> std::result::Result<Action, ReconcileError> {
543      reconcile_indexer_impl(indexer, ctx)
544          .await
545          .map_err(ReconcileError)
546  }
547  
548  async fn reconcile_indexer_impl(
549      indexer: Arc<Indexer>,
550      ctx: Arc<Context>,
551  ) -> Result<Action, OperatorError> {
552      let ns = indexer
553          .namespace()
554          .ok_or(OperatorError)
555          .attach_printable("failed to get namespace")?;
556  
557      let indexers: Api<Indexer> = Api::namespaced(ctx.client.clone(), &ns);
558  
559      info!(
560          indexer = %indexer.name_any(),
561          namespace = %ns,
562          "reconcile indexer"
563      );
564  
565      finalizer::finalizer(&indexers, INDEXER_FINALIZER, indexer, |event| async {
566          use finalizer::Event::*;
567          match event {
568              Apply(indexer) => indexer
569                  .reconcile(ctx.clone())
570                  .await
571                  .map_err(|err| err.into_error()),
572              Cleanup(indexer) => indexer
573                  .cleanup(ctx.clone())
574                  .await
575                  .map_err(|err| err.into_error()),
576          }
577      })
578      .await
579      .change_context(OperatorError)
580      .attach_printable("finalizer operation failed")
581  }
582  
583  pub fn error_policy(_indexer: Arc<Indexer>, err: &ReconcileError, _ctx: Arc<Context>) -> Action {
584      warn!(err = ?err, "Error reconciling indexer");
585  
586      Action::requeue(Duration::from_secs(30))
587  }
588  
589  impl fmt::Debug for ReconcileError {
590      fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
591          fmt::Debug::fmt(&self.0, f)
592      }
593  }
594  
595  impl fmt::Display for ReconcileError {
596      fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
597          fmt::Display::fmt(&self.0, f)
598      }
599  }
600  
// Marker implementation: all methods keep their default behavior, so the
// wrapped report is only reachable through the `Debug`/`Display` output.
impl std::error::Error for ReconcileError {}