checkpoint_stopping.rs
1 use arroyo_rpc::grpc; 2 use tracing::debug; 3 4 use crate::{states::StateError, JobMessage}; 5 6 use super::{ 7 stopping::{StopBehavior, Stopping}, 8 JobContext, State, Stopped, Transition, 9 }; 10 11 #[derive(Debug)] 12 pub struct CheckpointStopping {} 13 14 #[async_trait::async_trait] 15 impl State for CheckpointStopping { 16 fn name(&self) -> &'static str { 17 "CheckpointStopping" 18 } 19 20 async fn next(mut self: Box<Self>, ctx: &mut JobContext) -> Result<Transition, StateError> { 21 let job_controller = ctx.job_controller.as_mut().unwrap(); 22 23 let mut final_checkpoint_started = false; 24 25 loop { 26 match job_controller.checkpoint_finished().await { 27 Ok(done) => { 28 debug!("checked checkpoint, got {}, job_controller.finished(): {}, final_checkpoint_started: {}", done, job_controller.finished(), final_checkpoint_started); 29 30 if done && job_controller.finished() && final_checkpoint_started { 31 return Ok(Transition::next(*self, Stopped {})); 32 } 33 } 34 Err(e) => { 35 return Err(ctx.retryable( 36 self, 37 "failed while monitoring final checkpoint", 38 e, 39 10, 40 )); 41 } 42 } 43 44 if !final_checkpoint_started { 45 match job_controller.checkpoint(true).await { 46 Ok(started) => final_checkpoint_started = started, 47 Err(e) => { 48 return Err(ctx.retryable( 49 self, 50 "failed to initiate final checkpoint", 51 e, 52 10, 53 )); 54 } 55 } 56 } 57 58 match ctx.rx.recv().await.expect("channel closed while receiving") { 59 JobMessage::RunningMessage(msg) => { 60 if let Err(e) = job_controller.handle_message(msg).await { 61 return Err(ctx.retryable( 62 self, 63 "failed while waiting for job finish", 64 e, 65 10, 66 )); 67 } 68 } 69 JobMessage::ConfigUpdate(c) => { 70 match c.stop_mode { 71 crate::types::public::StopMode::immediate => { 72 return Ok(Transition::next( 73 *self, 74 Stopping { 75 stop_mode: StopBehavior::StopJob( 76 grpc::rpc::StopMode::Immediate, 77 ), 78 }, 79 )); 80 } 81 crate::types::public::StopMode::force => { 82 todo!("implement force stop mode"); 83 } 84 _ => { 85 // do nothing 86 } 87 } 88 } 89 _ => { 90 // ignore other messages 91 } 92 } 93 } 94 } 95 }