// reconcile.rs
1 use std::{collections::BTreeMap, fmt, sync::Arc, time::Duration}; 2 3 use error_stack::{Report, Result, ResultExt}; 4 use k8s_openapi::{ 5 api::{self, core::v1::ServiceSpec}, 6 apimachinery::pkg::apis::meta::{self, v1::Condition}, 7 chrono::{DateTime, Utc}, 8 Metadata, 9 }; 10 use kube::{ 11 api::{DeleteParams, Patch, PatchParams}, 12 core::Resource, 13 runtime::{controller::Action, finalizer}, 14 Api, ResourceExt, 15 }; 16 use serde_json::json; 17 use tracing::{info, instrument, warn}; 18 19 use crate::{ 20 context::{Context, OperatorError}, 21 crd::{Indexer, IndexerSource, IndexerStatus, SinkType}, 22 }; 23 24 static INDEXER_FINALIZER: &str = "indexer.apibara.com"; 25 static GIT_CLONE_IMAGE: &str = "docker.io/alpine/git:latest"; 26 27 pub struct ReconcileError(Report<OperatorError>); 28 29 impl Indexer { 30 #[instrument(skip_all)] 31 async fn reconcile(&self, ctx: Arc<Context>) -> Result<Action, OperatorError> { 32 use api::core::v1::{Pod, Service}; 33 34 let name = self.name_any(); 35 36 let ns = self 37 .namespace() 38 .ok_or(OperatorError) 39 .attach_printable("failed to get namespace") 40 .attach_printable_lazy(|| format!("indexer: {name}"))?; 41 42 let indexers: Api<Indexer> = Api::namespaced(ctx.client.clone(), &ns); 43 let pods: Api<Pod> = Api::namespaced(ctx.client.clone(), &ns); 44 let services: Api<Service> = Api::namespaced(ctx.client.clone(), &ns); 45 46 // Check if the indexer needs to be restarted. 47 let (restart_increment, error_condition) = if let Some(pod_name) = self.instance_name() { 48 self.maybe_delete_pod(pod_name, &pods).await? 
49 } else { 50 (0, None) 51 }; 52 53 let metadata = self.object_metadata(&ctx); 54 55 let mut phase = if error_condition.is_some() { 56 "Error".to_string() 57 } else { 58 "Running".to_string() 59 }; 60 61 let mut conditions = Vec::default(); 62 let mut pod_created = None; 63 let mut instance_name = None; 64 let mut status_service_name = None; 65 66 match self.pod_and_status_svc_spec(&ctx) { 67 None => { 68 let pod_scheduled_condition = Condition { 69 last_transition_time: self 70 .meta() 71 .creation_timestamp 72 .clone() 73 .unwrap_or(meta::v1::Time(DateTime::<Utc>::MIN_UTC)), 74 type_: "PodNotScheduled".to_string(), 75 message: "The specified indexer type doesn't exist.".to_string(), 76 observed_generation: self.meta().generation, 77 reason: "ConfigurationError".to_string(), 78 status: "False".to_string(), 79 }; 80 81 conditions.push(pod_scheduled_condition); 82 phase = "Error".to_string(); 83 } 84 Some((spec, svc_spec)) => { 85 let svc_name = self.status_service_name(); 86 let svc_metadata = kube::core::ObjectMeta { 87 name: svc_name.clone().into(), 88 ..metadata.clone() 89 }; 90 91 let pod_manifest = Pod { 92 metadata, 93 spec: Some(spec), 94 ..Pod::default() 95 }; 96 97 let pod = pods 98 .patch( 99 &name, 100 &PatchParams::apply("indexer"), 101 &Patch::Apply(pod_manifest), 102 ) 103 .await 104 .change_context(OperatorError) 105 .attach_printable("failed to update pod") 106 .attach_printable_lazy(|| format!("indexer: {name}"))?; 107 108 let svc_manifest = Service { 109 metadata: svc_metadata, 110 spec: Some(svc_spec), 111 ..Service::default() 112 }; 113 114 let service = services 115 .patch( 116 &svc_name, 117 &PatchParams::apply("indexer"), 118 &Patch::Apply(svc_manifest), 119 ) 120 .await 121 .change_context(OperatorError) 122 .attach_printable("failed to update service") 123 .attach_printable_lazy(|| format!("indexer: {name}"))?; 124 125 let pod_scheduled_condition = Condition { 126 last_transition_time: pod 127 .meta() 128 .creation_timestamp 129 .clone() 
130 .unwrap_or(meta::v1::Time(DateTime::<Utc>::MIN_UTC)), 131 type_: "PodScheduled".to_string(), 132 message: "Pod has been scheduled".to_string(), 133 observed_generation: self.meta().generation, 134 reason: "PodScheduled".to_string(), 135 status: "True".to_string(), 136 }; 137 138 let status_service_created = Condition { 139 last_transition_time: service 140 .meta() 141 .creation_timestamp 142 .clone() 143 .unwrap_or(meta::v1::Time(DateTime::<Utc>::MIN_UTC)), 144 type_: "StatusServiceCreated".to_string(), 145 message: "Status service created".to_string(), 146 observed_generation: self.meta().generation, 147 reason: "StatusServiceCreated".to_string(), 148 status: "True".to_string(), 149 }; 150 151 conditions.push(pod_scheduled_condition); 152 conditions.push(status_service_created); 153 pod_created = pod.meta().creation_timestamp.clone(); 154 instance_name = pod.metadata().name.clone(); 155 status_service_name = service.metadata().name.clone(); 156 } 157 } 158 159 if let Some(error_condition) = error_condition { 160 conditions.push(error_condition); 161 } 162 163 let restart_count = self 164 .status 165 .as_ref() 166 .map(|status| status.restart_count.unwrap_or_default() + restart_increment); 167 168 let status = json!({ 169 "status": IndexerStatus { 170 pod_created, 171 instance_name, 172 status_service_name, 173 phase: Some(phase), 174 conditions: Some(conditions), 175 restart_count, 176 } 177 }); 178 179 indexers 180 .patch_status(&name, &PatchParams::default(), &Patch::Merge(&status)) 181 .await 182 .change_context(OperatorError) 183 .attach_printable("failed to update status") 184 .attach_printable_lazy(|| format!("indexer: {name}"))?; 185 186 Ok(Action::requeue(Duration::from_secs(10))) 187 } 188 189 #[instrument(skip_all)] 190 async fn cleanup(&self, ctx: Arc<Context>) -> Result<Action, OperatorError> { 191 use api::core::v1::{Pod, Service}; 192 193 let name = self.name_any(); 194 let svc_name = self.status_service_name(); 195 196 let ns = self 197 
.namespace() 198 .ok_or(OperatorError) 199 .attach_printable("failed to get namespace") 200 .attach_printable_lazy(|| format!("indexer: {name}"))?; 201 202 let pods: Api<Pod> = Api::namespaced(ctx.client.clone(), &ns); 203 let services: Api<Service> = Api::namespaced(ctx.client.clone(), &ns); 204 205 if let Some(_existing) = pods 206 .get_opt(&name) 207 .await 208 .change_context(OperatorError) 209 .attach_printable("failed to get pod") 210 .attach_printable_lazy(|| format!("indexer: {name}"))? 211 { 212 pods.delete(&name, &DeleteParams::default()) 213 .await 214 .change_context(OperatorError) 215 .attach_printable("failed to delete pod") 216 .attach_printable_lazy(|| format!("indexer: {name}"))?; 217 } 218 219 if let Some(_existing) = services 220 .get_opt(&svc_name) 221 .await 222 .change_context(OperatorError) 223 .attach_printable("failed to get service") 224 .attach_printable_lazy(|| format!("indexer: {name}"))? 225 { 226 services 227 .delete(&svc_name, &DeleteParams::default()) 228 .await 229 .change_context(OperatorError) 230 .attach_printable("failed to delete service") 231 .attach_printable_lazy(|| format!("indexer: {name}"))?; 232 } 233 234 Ok(Action::requeue(Duration::from_secs(10))) 235 } 236 237 fn instance_name(&self) -> Option<&str> { 238 self.status 239 .as_ref() 240 .and_then(|status| status.instance_name.as_ref()) 241 .map(|name| name.as_str()) 242 } 243 244 /// Check the pod status and delete it if it has failed. 245 /// 246 /// The pod is deleted so that it can be recreated once the 247 /// status of the indexer has been updated. 248 async fn maybe_delete_pod( 249 &self, 250 pod_name: &str, 251 pods: &Api<api::core::v1::Pod>, 252 ) -> Result<(i32, Option<Condition>), OperatorError> { 253 let Some(pod) = pods 254 .get_opt(pod_name) 255 .await 256 .change_context(OperatorError) 257 .attach_printable("failed to get pod") 258 .attach_printable_lazy(|| format!("indexer: {pod_name}"))? 
259 else { 260 return Ok((0, None)); 261 }; 262 263 let container_status = pod.status.as_ref().and_then(|status| { 264 status 265 .container_statuses 266 .as_ref() 267 .and_then(|statuses| statuses.iter().find(|c| c.name == "sink")) 268 }); 269 270 let Some(terminated) = container_status 271 .and_then(|cs| cs.state.as_ref()) 272 .and_then(|st| st.terminated.clone()) 273 else { 274 return Ok((0, None)); 275 }; 276 277 let exit_code = terminated.exit_code; 278 279 // Exit code from sysexits.h 280 // 75 = temporary failure 281 // 282 // Don't restart the container if it's a non temporary error. 283 let is_temporary_error = exit_code == 0 || exit_code == 75; 284 if is_temporary_error { 285 let should_delete = match &terminated.finished_at { 286 None => true, 287 Some(finished_at) => { 288 let elapsed = (Utc::now().time() - finished_at.0.time()) 289 .to_std() 290 .unwrap_or_default(); 291 elapsed > Duration::from_secs(60) 292 } 293 }; 294 295 if should_delete { 296 info!(pod = %pod.name_any(), "deleting pod for restart"); 297 298 pods.delete(pod_name, &Default::default()) 299 .await 300 .change_context(OperatorError) 301 .attach_printable("failed to delete pod") 302 .attach_printable_lazy(|| format!("indexer: {pod_name}"))?; 303 304 return Ok((1, None)); 305 } 306 } 307 308 let last_transition_time = terminated 309 .finished_at 310 .unwrap_or(meta::v1::Time(DateTime::<Utc>::MIN_UTC)); 311 312 let reason = if is_temporary_error { 313 "temporary".to_string() 314 } else { 315 "fatal".to_string() 316 }; 317 let error_condition = Condition { 318 last_transition_time, 319 type_: "PodTerminated".to_string(), 320 message: format!("Pod has been terminated ({reason})"), 321 observed_generation: self.meta().generation, 322 reason: "PodTerminated".to_string(), 323 status: "False".to_string(), 324 }; 325 326 Ok((0, Some(error_condition))) 327 } 328 329 fn status_service_name(&self) -> String { 330 self.name_any() + "-status" 331 } 332 333 fn object_metadata(&self, ctx: 
&Arc<Context>) -> meta::v1::ObjectMeta { 334 use meta::v1::ObjectMeta; 335 336 ObjectMeta { 337 name: self.metadata.name.clone(), 338 labels: self.pod_labels(ctx).into(), 339 ..ObjectMeta::default() 340 } 341 } 342 343 fn pod_labels(&self, _ctx: &Arc<Context>) -> BTreeMap<String, String> { 344 let name = self.name_any(); 345 BTreeMap::from([ 346 ("app.kubernetes.io/name".to_string(), name.clone()), 347 ("app.kubernetes.io/instance".to_string(), name), 348 ]) 349 } 350 351 fn pod_and_status_svc_spec( 352 &self, 353 ctx: &Arc<Context>, 354 ) -> Option<(api::core::v1::PodSpec, api::core::v1::ServiceSpec)> { 355 use api::core::v1::{PodSpec, ServicePort}; 356 357 // Initialize volume, volume mounts, and env vars. 358 let mut volumes = Vec::new(); 359 let mut volume_mounts = Vec::new(); 360 let env = self.spec.env.clone().unwrap_or_default(); 361 362 if let Some(volumes_from_config) = self.spec.volumes.as_ref() { 363 for volume in volumes_from_config { 364 volumes.push(volume.volume.clone()); 365 volume_mounts.push(volume.volume_mount.clone()); 366 } 367 } 368 369 let mut init_containers = Vec::new(); 370 let mut containers = Vec::new(); 371 372 let (init_container, workdir) = 373 self.source_container(&mut volumes, &mut volume_mounts, &env, ctx); 374 375 if let Some(init_container) = init_container { 376 init_containers.push(init_container); 377 } 378 379 let container = self.sink_container(&workdir, &volume_mounts, &env, ctx)?; 380 containers.push(container); 381 382 let pod_spec = PodSpec { 383 init_containers: Some(init_containers), 384 containers, 385 volumes: Some(volumes), 386 restart_policy: Some("Never".to_string()), 387 ..PodSpec::default() 388 }; 389 390 let svc_spec = ServiceSpec { 391 selector: self.pod_labels(ctx).into(), 392 ports: Some(vec![ServicePort { 393 port: ctx.configuration.status_port, 394 ..ServicePort::default() 395 }]), 396 ..ServiceSpec::default() 397 }; 398 399 Some((pod_spec, svc_spec)) 400 } 401 402 fn source_container( 403 &self, 404 
volumes: &mut Vec<api::core::v1::Volume>, 405 volume_mounts: &mut Vec<api::core::v1::VolumeMount>, 406 env: &[api::core::v1::EnvVar], 407 _ctx: &Arc<Context>, 408 ) -> (Option<api::core::v1::Container>, String) { 409 use api::core::v1::{Container, Volume, VolumeMount}; 410 411 match &self.spec.source { 412 IndexerSource::GitHub(github) => { 413 // This init container clones the indexer source code from GitHub. 414 // It's a 5 step process: 415 // 416 // 1. Clone the repository, maybe authenticating with GitHub. 417 // 2. Clean files. 418 // 3. Fetch the specified revision. 419 // 4. Checkout the specified revision. 420 // 5. Clean files again. 421 let mut args = "cd /code".to_string(); 422 423 let clone_args = github 424 .git_clone_flags 425 .clone() 426 .unwrap_or_else(|| "-v".to_string()); 427 428 let url = if let Some(token) = &github.access_token_env_var { 429 format!( 430 "https://x-access-token:$({})@github.com/{}/{}.git", 431 token, github.owner, github.repo 432 ) 433 } else { 434 format!("https://github.com/{}/{}.git", github.owner, github.repo) 435 }; 436 437 args.push_str(&format!("&& git clone {clone_args} -- {url} .")); 438 439 let clean_flags = github 440 .git_clean_flags 441 .clone() 442 .unwrap_or_else(|| "-ffxdq".to_string()); 443 444 args.push_str(&format!("&& git clean {clean_flags}")); 445 args.push_str(&format!( 446 "&& git fetch -v --prune --tags -- origin {}", 447 github.revision 448 )); 449 args.push_str(&format!("&& git checkout -f {}", github.revision)); 450 args.push_str(&format!("&& git clean {clean_flags}")); 451 452 // Create an emptyDir volume where the source code will be cloned. 
453 volumes.push(Volume { 454 name: "code".to_string(), 455 empty_dir: Some(Default::default()), 456 ..Volume::default() 457 }); 458 459 volume_mounts.push(VolumeMount { 460 name: "code".to_string(), 461 mount_path: "/code".to_string(), 462 ..VolumeMount::default() 463 }); 464 465 let workdir = match github.subpath { 466 None => "/code".to_string(), 467 Some(ref subpath) => format!("/code/{}", subpath), 468 }; 469 470 let container = Container { 471 name: "clone-github-repo".to_string(), 472 image: Some(GIT_CLONE_IMAGE.to_string()), 473 command: Some(vec!["/bin/sh".to_string()]), 474 args: Some(vec!["-c".to_string(), args]), 475 volume_mounts: Some(volume_mounts.clone()), 476 env: Some(env.to_owned()), 477 ..Container::default() 478 }; 479 480 (Some(container), workdir) 481 } 482 IndexerSource::Volume(fs) => (None, fs.path.clone()), 483 } 484 } 485 486 fn sink_container( 487 &self, 488 workdir: &str, 489 volume_mounts: &[api::core::v1::VolumeMount], 490 env: &[api::core::v1::EnvVar], 491 ctx: &Arc<Context>, 492 ) -> Option<api::core::v1::Container> { 493 use api::core::v1::Container; 494 495 let image = match &self.spec.sink.sink { 496 SinkType::Type { r#type } => { 497 let Some(config) = ctx.configuration.sinks.get(r#type) else { 498 return None; 499 }; 500 config.image.clone() 501 } 502 SinkType::Image { image } => image.clone(), 503 }; 504 505 let script = self.spec.sink.script.clone(); 506 507 let port = ctx.configuration.status_port; 508 let mut args = vec![ 509 "run".to_string(), 510 script, 511 "--status-server-address".to_string(), 512 format!("0.0.0.0:{port}"), 513 ]; 514 use api::core::v1::ContainerPort; 515 516 if let Some(extra_args) = &self.spec.sink.args { 517 args.extend_from_slice(extra_args); 518 } 519 520 let container = Container { 521 name: "sink".to_string(), 522 image: Some(image), 523 args: Some(args), 524 volume_mounts: Some(volume_mounts.to_owned()), 525 env: Some(env.to_owned()), 526 working_dir: Some(workdir.to_string()), 527 ports: 
Some(vec![ContainerPort { 528 container_port: port, 529 name: Some("status".to_string()), 530 ..ContainerPort::default() 531 }]), 532 ..Container::default() 533 }; 534 535 Some(container) 536 } 537 } 538 539 pub async fn reconcile_indexer( 540 indexer: Arc<Indexer>, 541 ctx: Arc<Context>, 542 ) -> std::result::Result<Action, ReconcileError> { 543 reconcile_indexer_impl(indexer, ctx) 544 .await 545 .map_err(ReconcileError) 546 } 547 548 async fn reconcile_indexer_impl( 549 indexer: Arc<Indexer>, 550 ctx: Arc<Context>, 551 ) -> Result<Action, OperatorError> { 552 let ns = indexer 553 .namespace() 554 .ok_or(OperatorError) 555 .attach_printable("failed to get namespace")?; 556 557 let indexers: Api<Indexer> = Api::namespaced(ctx.client.clone(), &ns); 558 559 info!( 560 indexer = %indexer.name_any(), 561 namespace = %ns, 562 "reconcile indexer" 563 ); 564 565 finalizer::finalizer(&indexers, INDEXER_FINALIZER, indexer, |event| async { 566 use finalizer::Event::*; 567 match event { 568 Apply(indexer) => indexer 569 .reconcile(ctx.clone()) 570 .await 571 .map_err(|err| err.into_error()), 572 Cleanup(indexer) => indexer 573 .cleanup(ctx.clone()) 574 .await 575 .map_err(|err| err.into_error()), 576 } 577 }) 578 .await 579 .change_context(OperatorError) 580 .attach_printable("finalizer operation failed") 581 } 582 583 pub fn error_policy(_indexer: Arc<Indexer>, err: &ReconcileError, _ctx: Arc<Context>) -> Action { 584 warn!(err = ?err, "Error reconciling indexer"); 585 586 Action::requeue(Duration::from_secs(30)) 587 } 588 589 impl fmt::Debug for ReconcileError { 590 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 591 fmt::Debug::fmt(&self.0, f) 592 } 593 } 594 595 impl fmt::Display for ReconcileError { 596 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 597 fmt::Display::fmt(&self.0, f) 598 } 599 } 600 601 impl std::error::Error for ReconcileError {}