// restarting.rs
use crate::states::recovering::Recovering;
use crate::states::scheduling::Scheduling;
use crate::states::stop_if_desired_non_running;
use crate::types::public::RestartMode;
use crate::JobMessage;

use super::{JobContext, State, StateError, Transition};

/// State that restarts a job: either gracefully (`RestartMode::safe`), by
/// taking a final checkpoint and waiting for the job to finish before
/// rescheduling, or immediately (`RestartMode::force`), by tearing down the
/// existing cluster and rescheduling right away.
#[derive(Debug)]
pub struct Restarting {
    /// How the restart should be performed; `force` skips the final
    /// checkpoint and destroys the running cluster directly.
    pub mode: RestartMode,
}

#[async_trait::async_trait]
impl State for Restarting {
    fn name(&self) -> &'static str {
        "Restarting"
    }

    /// Drives one step of the restart.
    ///
    /// For `safe` mode: initiates a final checkpoint, then loops — polling
    /// checkpoint progress and pumping job messages — until the checkpoint is
    /// done and the job reports finished, at which point it transitions to
    /// `Scheduling`. A `ConfigUpdate` received mid-flight can escalate the
    /// restart to `force` mode or stop the job entirely.
    ///
    /// For `force` mode: tears down the existing cluster and transitions to
    /// `Scheduling` immediately.
    ///
    /// All failures are surfaced as retryable `StateError`s (retry budget 10).
    async fn next(mut self: Box<Self>, ctx: &mut JobContext) -> Result<Transition, StateError> {
        // NOTE(review): assumes a job controller is always present by the
        // time we enter Restarting — confirm this invariant holds for every
        // path into this state.
        let job_controller = ctx.job_controller.as_mut().unwrap();

        match self.mode {
            RestartMode::safe => {
                // Kick off the final checkpoint. The `true` argument
                // presumably marks it as final/stopping — TODO confirm
                // against the JobController::checkpoint signature.
                if let Err(e) = job_controller.checkpoint(true).await {
                    return Err(ctx.retryable(self, "failed to initiate final checkpoint", e, 10));
                }

                loop {
                    // Poll checkpoint progress first; we may only leave this
                    // state once the checkpoint is complete AND the job
                    // itself has finished.
                    match job_controller.checkpoint_finished().await {
                        Ok(done) => {
                            if done && job_controller.finished() {
                                return Ok(Transition::next(*self, Scheduling {}));
                            }
                        }
                        Err(e) => {
                            return Err(ctx.retryable(
                                self,
                                "failed while monitoring final checkpoint",
                                e,
                                10,
                            ));
                        }
                    }

                    // Block until the next job message arrives; progress
                    // toward the finished condition above is driven by these
                    // messages. A closed channel is treated as an internal
                    // bug, hence the panic.
                    match ctx.rx.recv().await.expect("channel closed while receiving") {
                        JobMessage::RunningMessage(msg) => {
                            if let Err(e) = job_controller.handle_message(msg).await {
                                return Err(ctx.retryable(
                                    self,
                                    "failed while waiting for job finish",
                                    e,
                                    10,
                                ));
                            }
                        }
                        JobMessage::ConfigUpdate(c) => {
                            // An operator may escalate an in-progress safe
                            // restart to a forced one; re-enter this state in
                            // force mode.
                            if c.restart_mode == RestartMode::force {
                                return Ok(Transition::next(
                                    *self,
                                    Restarting {
                                        mode: RestartMode::force,
                                    },
                                ));
                            }
                            // Abort the restart if the desired job state is
                            // no longer "running" (macro may return/transition
                            // out of this function).
                            stop_if_desired_non_running!(self, &c);
                        }
                        _ => {
                            // ignore other messages
                        }
                    }
                }
            }
            RestartMode::force => {
                // Forced restart: skip checkpointing entirely and destroy
                // the current cluster resources before rescheduling.
                if let Err(e) = Recovering::cleanup(ctx).await {
                    return Err(ctx.retryable(self, "failed to tear down existing cluster", e, 10));
                }

                Ok(Transition::next(*self, Scheduling {}))
            }
        }
    }
}