controller.rs
1 use std::fmt::Debug; 2 3 use error_stack::{Result, ResultExt}; 4 use futures::{Future, Stream, StreamExt}; 5 use k8s_openapi::api; 6 use kube::{ 7 api::ListParams, 8 core::Resource, 9 runtime::{ 10 controller::{self, Action}, 11 reflector::ObjectRef, 12 watcher, Controller, 13 }, 14 Api, Client, 15 }; 16 use tokio_util::sync::CancellationToken; 17 use tracing::{error, info, warn}; 18 19 use crate::{ 20 configuration::Configuration, 21 context::{Context, OperatorError}, 22 crd::Indexer, 23 reconcile::{self, ReconcileError}, 24 }; 25 26 pub type ReconcileItem<K> = 27 std::result::Result<(ObjectRef<K>, Action), controller::Error<ReconcileError, watcher::Error>>; 28 29 pub async fn create( 30 client: Client, 31 configuration: Configuration, 32 ct: CancellationToken, 33 ) -> Result<impl Stream<Item = ReconcileItem<Indexer>>, OperatorError> { 34 info!("Creating controller"); 35 36 let namespace = configuration.namespace.clone(); 37 let ctx = Context { 38 client, 39 configuration, 40 }; 41 42 let indexers = if let Some(namespace) = &namespace { 43 Api::<Indexer>::namespaced(ctx.client.clone(), namespace) 44 } else { 45 Api::<Indexer>::all(ctx.client.clone()) 46 }; 47 48 if indexers.list(&ListParams::default()).await.is_err() { 49 error!("Indexer CRD not installed"); 50 return Err(OperatorError).attach_printable("indexer CRD not installed"); 51 } 52 53 info!("CRD installed. Starting controllor loop"); 54 55 let pods = Api::<api::core::v1::Pod>::all(ctx.client.clone()); 56 57 let controller = Controller::new(indexers, watcher::Config::default()) 58 .owns(pods, watcher::Config::default()) 59 .graceful_shutdown_on(async move { 60 ct.cancelled().await; 61 }) 62 .run( 63 reconcile::reconcile_indexer, 64 reconcile::error_policy, 65 ctx.into(), 66 ); 67 68 Ok(controller) 69 } 70 71 pub async fn start( 72 client: Client, 73 configuration: Configuration, 74 ct: CancellationToken, 75 ) -> Result<(), OperatorError> { 76 let controller = create(client, configuration, ct).await?; 77 78 run_controller_to_end(controller).await; 79 80 Ok(()) 81 } 82 83 fn run_controller_to_end<K>( 84 controller_stream: impl Stream<Item = ReconcileItem<K>>, 85 ) -> impl Future<Output = ()> 86 where 87 K: Resource + Debug, 88 <K as Resource>::DynamicType: Debug, 89 { 90 controller_stream.for_each(|res| async move { 91 match res { 92 Ok((obj, action)) => info!(obj = ?obj, action = ?action, "reconcile success"), 93 Err(err) => warn!(err = ?err, "reconcile error"), 94 } 95 }) 96 }