// crates/arroyo-controller/src/states/running.rs
use std::time::{Duration, Instant};

use time::OffsetDateTime;
use tokio::time::MissedTickBehavior;

use tracing::error;

use crate::states::finishing::Finishing;
use crate::states::recovering::Recovering;
use crate::states::rescaling::Rescaling;
use crate::states::restarting::Restarting;
use crate::states::stopping::{StopBehavior, Stopping};
use crate::states::{fatal, stop_if_desired_running};
use crate::JobMessage;
use crate::{job_controller::ControllerProgress, states::StateError};
use arroyo_rpc::config::config;
use arroyo_rpc::grpc::rpc;
use arroyo_server_common::log_event;
use serde_json::json;

use super::{JobContext, State, Transition};

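/// The steady state for a healthy job: all tasks are scheduled and processing
/// data. The controller stays here until the job finishes, hits an error, is
/// reconfigured (stop / restart / rescale), or exceeds its TTL.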
#[derive(Debug)]
pub struct Running {}

#[async_trait::async_trait]
impl State for Running {
    fn name(&self) -> &'static str {
        "Running"
    }

    async fn next(mut self: Box<Self>, ctx: &mut JobContext) -> Result<Transition, StateError> {
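        // if the user has already requested that this job stop, transition
        // out of Running immediately rather than entering the event loop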
        stop_if_desired_running!(self, ctx.config);

        let pipeline_config = &config().clone().pipeline;

        let running_start = Instant::now();

        let mut log_interval = tokio::time::interval(Duration::from_secs(60));
        log_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

        loop {
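            // compute how much of the job's TTL remains, if one is set; a TTL
            // is what marks a pipeline as a preview pipeline (this assumes
            // status.start_time was recorded when the job started)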
            let ttl_end: Option<Duration> = ctx.config.ttl.map(|t| {
                let elapsed = Duration::from_micros(
                    (OffsetDateTime::now_utc() - ctx.status.start_time.unwrap())
                        .whole_microseconds() as u64,
                );

                t.checked_sub(elapsed).unwrap_or(Duration::ZERO)
            });

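            // wait for whichever happens first: a control-plane message, the
            // periodic progress check, the logging tick, or TTL expiry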
            tokio::select! {
                msg = ctx.rx.recv() => {
                    match msg {
                        Some(JobMessage::ConfigUpdate(c)) => {
                            stop_if_desired_running!(self, &c);

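                            // a changed restart_nonce means the user has requested a restart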
                            if c.restart_nonce != ctx.status.restart_nonce {
                                return Ok(Transition::next(*self, Restarting {
                                    mode: c.restart_mode
                                }));
                            }

                            let job_controller = ctx.job_controller.as_mut().unwrap();

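                            // if any operator's actual parallelism differs from its
                            // requested override, the job needs to be rescaled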
                            for (op, p) in &c.parallelism_overrides {
                                if let Some(actual) = job_controller.operator_parallelism(op) {
                                    if actual != *p {
                                        return Ok(Transition::next(
                                            *self,
                                            Rescaling {}
                                        ));
                                    }
                                }
                            }

                            job_controller.update_config(c);
                        }
                        Some(JobMessage::RunningMessage(msg)) => {
                            if let Err(e) = ctx.job_controller.as_mut().unwrap().handle_message(msg).await {
                                return Err(ctx.retryable(self, "job encountered an error", e, 10));
                            }
                        }
                        Some(msg) => {
                            ctx.handle(msg)?;
                        }
                        None => {
                            panic!("job queue shut down");
                        }
                    }
                }
                _ = tokio::time::sleep(Duration::from_millis(200)) => {
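                    // once the job has stayed healthy past the configured window,
                    // reset the restart counter so old failures no longer count
                    // against the allowed-restarts budget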
                    if ctx.status.restarts > 0 && running_start.elapsed() > *pipeline_config.healthy_duration {
                        let restarts = ctx.status.restarts;
                        ctx.status.restarts = 0;
                        if let Err(e) = ctx.status.update_db(&ctx.db).await {
                            error!(message = "Failed to update status", error = format!("{:?}", e),
                                job_id = *ctx.config.id);
                            ctx.status.restarts = restarts;
                            // we'll try again on the next round
                        }
                    }

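                    // drive the job controller forward and react to its progress report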
                    match ctx.job_controller.as_mut().unwrap().progress().await {
                        Ok(ControllerProgress::Continue) => {
                            // job is healthy and making progress; nothing to do
                        },
                        Ok(ControllerProgress::Finishing) => {
                            return Ok(Transition::next(
                                *self,
                                Finishing {}
                            ))
                        },
                        Err(err) => {
                            error!(message = "error while running", error = format!("{:?}", err), job_id = *ctx.config.id);
                            log_event("running_error", json!({
                                "service": "controller",
                                "job_id": ctx.config.id,
                                "error": format!("{:?}", err),
                                "is_preview": ctx.config.ttl.is_some(),
                            }));

                            // preview pipelines (those with a TTL) are not restarted; fail immediately
                            if ctx.config.ttl.is_some() {
                                return Err(fatal("Job encountered a fatal error; see worker logs for details", err));
                            }

                            if pipeline_config.allowed_restarts != -1 && ctx.status.restarts >= pipeline_config.allowed_restarts {
                                return Err(fatal(
                                    "Job has restarted too many times",
                                    err
                                ));
                            }
                            return Ok(Transition::next(
                                *self,
                                Recovering {}
                            ))
                        }
                    }
                }
                _ = log_interval.tick() => {
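                    // once a minute, emit a heartbeat event recording how long
                    // the job has been in this state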
                    log_event(
                        "job_running",
                        json!({
                            "service": "controller",
                            "job_id": ctx.config.id,
                            "scheduler": &config().controller.scheduler,
                            "duration_ms": ctx.last_transitioned_at.elapsed().as_millis() as u64,
                        }),
                    );
                }
                _ = tokio::time::sleep(ttl_end.unwrap_or(Duration::MAX)) => {
                    // TTL has expired, stop the job
                    return Ok(Transition::next(
                        *self,
                        Stopping {
                            stop_mode: StopBehavior::StopJob(rpc::StopMode::Immediate),
                        },
                    ));
                }
            }
        }
    }
}