crates/arroyo-controller/src/states/checkpoint_stopping.rs
checkpoint_stopping.rs
use arroyo_rpc::grpc;
use tracing::{debug, warn};

use crate::{states::StateError, JobMessage};

use super::{
    stopping::{StopBehavior, Stopping},
    JobContext, State, Stopped, Transition,
};
10  
11  #[derive(Debug)]
12  pub struct CheckpointStopping {}
13  
14  #[async_trait::async_trait]
15  impl State for CheckpointStopping {
16      fn name(&self) -> &'static str {
17          "CheckpointStopping"
18      }
19  
20      async fn next(mut self: Box<Self>, ctx: &mut JobContext) -> Result<Transition, StateError> {
21          let job_controller = ctx.job_controller.as_mut().unwrap();
22  
23          let mut final_checkpoint_started = false;
24  
25          loop {
26              match job_controller.checkpoint_finished().await {
27                  Ok(done) => {
28                      debug!("checked checkpoint, got {}, job_controller.finished(): {}, final_checkpoint_started: {}", done, job_controller.finished(), final_checkpoint_started);
29  
30                      if done && job_controller.finished() && final_checkpoint_started {
31                          return Ok(Transition::next(*self, Stopped {}));
32                      }
33                  }
34                  Err(e) => {
35                      return Err(ctx.retryable(
36                          self,
37                          "failed while monitoring final checkpoint",
38                          e,
39                          10,
40                      ));
41                  }
42              }
43  
44              if !final_checkpoint_started {
45                  match job_controller.checkpoint(true).await {
46                      Ok(started) => final_checkpoint_started = started,
47                      Err(e) => {
48                          return Err(ctx.retryable(
49                              self,
50                              "failed to initiate final checkpoint",
51                              e,
52                              10,
53                          ));
54                      }
55                  }
56              }
57  
58              match ctx.rx.recv().await.expect("channel closed while receiving") {
59                  JobMessage::RunningMessage(msg) => {
60                      if let Err(e) = job_controller.handle_message(msg).await {
61                          return Err(ctx.retryable(
62                              self,
63                              "failed while waiting for job finish",
64                              e,
65                              10,
66                          ));
67                      }
68                  }
69                  JobMessage::ConfigUpdate(c) => {
70                      match c.stop_mode {
71                          crate::types::public::StopMode::immediate => {
72                              return Ok(Transition::next(
73                                  *self,
74                                  Stopping {
75                                      stop_mode: StopBehavior::StopJob(
76                                          grpc::rpc::StopMode::Immediate,
77                                      ),
78                                  },
79                              ));
80                          }
81                          crate::types::public::StopMode::force => {
82                              todo!("implement force stop mode");
83                          }
84                          _ => {
85                              // do nothing
86                          }
87                      }
88                  }
89                  _ => {
90                      // ignore other messages
91                  }
92              }
93          }
94      }
95  }