/ operator / src / controller.rs
controller.rs
 1  use std::fmt::Debug;
 2  
 3  use error_stack::{Result, ResultExt};
 4  use futures::{Future, Stream, StreamExt};
 5  use k8s_openapi::api;
 6  use kube::{
 7      api::ListParams,
 8      core::Resource,
 9      runtime::{
10          controller::{self, Action},
11          reflector::ObjectRef,
12          watcher, Controller,
13      },
14      Api, Client,
15  };
16  use tokio_util::sync::CancellationToken;
17  use tracing::{error, info, warn};
18  
19  use crate::{
20      configuration::Configuration,
21      context::{Context, OperatorError},
22      crd::Indexer,
23      reconcile::{self, ReconcileError},
24  };
25  
26  pub type ReconcileItem<K> =
27      std::result::Result<(ObjectRef<K>, Action), controller::Error<ReconcileError, watcher::Error>>;
28  
29  pub async fn create(
30      client: Client,
31      configuration: Configuration,
32      ct: CancellationToken,
33  ) -> Result<impl Stream<Item = ReconcileItem<Indexer>>, OperatorError> {
34      info!("Creating controller");
35  
36      let namespace = configuration.namespace.clone();
37      let ctx = Context {
38          client,
39          configuration,
40      };
41  
42      let indexers = if let Some(namespace) = &namespace {
43          Api::<Indexer>::namespaced(ctx.client.clone(), namespace)
44      } else {
45          Api::<Indexer>::all(ctx.client.clone())
46      };
47  
48      if indexers.list(&ListParams::default()).await.is_err() {
49          error!("Indexer CRD not installed");
50          return Err(OperatorError).attach_printable("indexer CRD not installed");
51      }
52  
53      info!("CRD installed. Starting controllor loop");
54  
55      let pods = Api::<api::core::v1::Pod>::all(ctx.client.clone());
56  
57      let controller = Controller::new(indexers, watcher::Config::default())
58          .owns(pods, watcher::Config::default())
59          .graceful_shutdown_on(async move {
60              ct.cancelled().await;
61          })
62          .run(
63              reconcile::reconcile_indexer,
64              reconcile::error_policy,
65              ctx.into(),
66          );
67  
68      Ok(controller)
69  }
70  
71  pub async fn start(
72      client: Client,
73      configuration: Configuration,
74      ct: CancellationToken,
75  ) -> Result<(), OperatorError> {
76      let controller = create(client, configuration, ct).await?;
77  
78      run_controller_to_end(controller).await;
79  
80      Ok(())
81  }
82  
83  fn run_controller_to_end<K>(
84      controller_stream: impl Stream<Item = ReconcileItem<K>>,
85  ) -> impl Future<Output = ()>
86  where
87      K: Resource + Debug,
88      <K as Resource>::DynamicType: Debug,
89  {
90      controller_stream.for_each(|res| async move {
91          match res {
92              Ok((obj, action)) => info!(obj = ?obj, action = ?action, "reconcile success"),
93              Err(err) => warn!(err = ?err, "reconcile error"),
94          }
95      })
96  }