crates/arroyo-controller/src/states/restarting.rs
restarting.rs
 1  use crate::states::recovering::Recovering;
 2  use crate::states::scheduling::Scheduling;
 3  use crate::states::stop_if_desired_non_running;
 4  use crate::types::public::RestartMode;
 5  use crate::JobMessage;
 6  
 7  use super::{JobContext, State, StateError, Transition};
 8  
 9  #[derive(Debug)]
10  pub struct Restarting {
11      pub mode: RestartMode,
12  }
13  
14  #[async_trait::async_trait]
15  impl State for Restarting {
16      fn name(&self) -> &'static str {
17          "Restarting"
18      }
19  
20      async fn next(mut self: Box<Self>, ctx: &mut JobContext) -> Result<Transition, StateError> {
21          let job_controller = ctx.job_controller.as_mut().unwrap();
22  
23          match self.mode {
24              RestartMode::safe => {
25                  if let Err(e) = job_controller.checkpoint(true).await {
26                      return Err(ctx.retryable(self, "failed to initiate final checkpoint", e, 10));
27                  }
28  
29                  loop {
30                      match job_controller.checkpoint_finished().await {
31                          Ok(done) => {
32                              if done && job_controller.finished() {
33                                  return Ok(Transition::next(*self, Scheduling {}));
34                              }
35                          }
36                          Err(e) => {
37                              return Err(ctx.retryable(
38                                  self,
39                                  "failed while monitoring final checkpoint",
40                                  e,
41                                  10,
42                              ));
43                          }
44                      }
45  
46                      match ctx.rx.recv().await.expect("channel closed while receiving") {
47                          JobMessage::RunningMessage(msg) => {
48                              if let Err(e) = job_controller.handle_message(msg).await {
49                                  return Err(ctx.retryable(
50                                      self,
51                                      "failed while waiting for job finish",
52                                      e,
53                                      10,
54                                  ));
55                              }
56                          }
57                          JobMessage::ConfigUpdate(c) => {
58                              if c.restart_mode == RestartMode::force {
59                                  return Ok(Transition::next(
60                                      *self,
61                                      Restarting {
62                                          mode: RestartMode::force,
63                                      },
64                                  ));
65                              }
66                              stop_if_desired_non_running!(self, &c);
67                          }
68                          _ => {
69                              // ignore other messages
70                          }
71                      }
72                  }
73              }
74              RestartMode::force => {
75                  if let Err(e) = Recovering::cleanup(ctx).await {
76                      return Err(ctx.retryable(self, "failed to tear down existing cluster", e, 10));
77                  }
78  
79                  Ok(Transition::next(*self, Scheduling {}))
80              }
81          }
82      }
83  }