running.rs
1 use std::time::{Duration, Instant}; 2 3 use time::OffsetDateTime; 4 use tokio::time::MissedTickBehavior; 5 6 use tracing::error; 7 8 use crate::states::finishing::Finishing; 9 use crate::states::recovering::Recovering; 10 use crate::states::rescaling::Rescaling; 11 use crate::states::restarting::Restarting; 12 use crate::states::{fatal, stop_if_desired_running}; 13 use crate::JobMessage; 14 use crate::{job_controller::ControllerProgress, states::StateError}; 15 use arroyo_rpc::config::config; 16 use arroyo_server_common::log_event; 17 use serde_json::json; 18 19 use super::{JobContext, State, Transition}; 20 21 #[derive(Debug)] 22 pub struct Running {} 23 24 #[async_trait::async_trait] 25 impl State for Running { 26 fn name(&self) -> &'static str { 27 "Running" 28 } 29 30 async fn next(mut self: Box<Self>, ctx: &mut JobContext) -> Result<Transition, StateError> { 31 stop_if_desired_running!(self, ctx.config); 32 33 let pipeline_config = &config().clone().pipeline; 34 35 let running_start = Instant::now(); 36 37 let mut log_interval = tokio::time::interval(Duration::from_secs(60)); 38 log_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); 39 40 loop { 41 let ttl_end: Option<Duration> = ctx.config.ttl.map(|t| { 42 let elapsed = Duration::from_micros( 43 (OffsetDateTime::now_utc() - ctx.status.start_time.unwrap()) 44 .whole_microseconds() as u64, 45 ); 46 47 t.checked_sub(elapsed).unwrap_or(Duration::ZERO) 48 }); 49 50 tokio::select! { 51 msg = ctx.rx.recv() => { 52 match msg { 53 Some(JobMessage::ConfigUpdate(c)) => { 54 stop_if_desired_running!(self, &c); 55 56 if c.restart_nonce != ctx.status.restart_nonce { 57 return Ok(Transition::next(*self, Restarting { 58 mode: c.restart_mode 59 })); 60 } 61 62 let job_controller = ctx.job_controller.as_mut().unwrap(); 63 64 for (op, p) in &c.parallelism_overrides { 65 if let Some(actual) = job_controller.operator_parallelism(op){ 66 if actual != *p { 67 return Ok(Transition::next( 68 *self, 69 Rescaling {} 70 )); 71 } 72 } 73 } 74 75 job_controller.update_config(c); 76 } 77 Some(JobMessage::RunningMessage(msg)) => { 78 if let Err(e) = ctx.job_controller.as_mut().unwrap().handle_message(msg).await { 79 return Err(ctx.retryable(self, "job encountered an error", e, 10)); 80 } 81 } 82 Some(msg) => { 83 ctx.handle(msg)?; 84 } 85 None => { 86 panic!("job queue shut down"); 87 } 88 } 89 } 90 _ = tokio::time::sleep(Duration::from_millis(200)) => { 91 if ctx.status.restarts > 0 && running_start.elapsed() > *pipeline_config.healthy_duration { 92 let restarts = ctx.status.restarts; 93 ctx.status.restarts = 0; 94 if let Err(e) = ctx.status.update_db(&ctx.db).await { 95 error!(message = "Failed to update status", error = format!("{:?}", e), 96 job_id = *ctx.config.id); 97 ctx.status.restarts = restarts; 98 // we'll try again on the next round 99 } 100 } 101 102 match ctx.job_controller.as_mut().unwrap().progress().await { 103 Ok(ControllerProgress::Continue) => { 104 // do nothing 105 }, 106 Ok(ControllerProgress::Finishing) => { 107 return Ok(Transition::next( 108 *self, 109 Finishing {} 110 )) 111 }, 112 Err(err) => { 113 error!(message = "error while running", error = format!("{:?}", err), job_id = *ctx.config.id); 114 log_event("running_error", json!({ 115 "service": "controller", 116 "job_id": ctx.config.id, 117 "error": format!("{:?}", err), 118 "is_preview": ctx.config.ttl.is_some(), 119 })); 120 121 // only allow one restart for preview pipelines 122 if ctx.config.ttl.is_some() { 123 return Err(fatal("Job encountered a fatal error; see worker logs for details", err)); 124 } 125 126 if pipeline_config.allowed_restarts != -1 && ctx.status.restarts >= pipeline_config.allowed_restarts { 127 return Err(fatal( 128 "Job has restarted too many times", 129 err 130 )); 131 } 132 return Ok(Transition::next( 133 *self, 134 Recovering {} 135 )) 136 } 137 } 138 } 139 _ = log_interval.tick() => { 140 log_event( 141 "job_running", 142 json!({ 143 "service": "controller", 144 "job_id": ctx.config.id, 145 "scheduler": &config().controller.scheduler, 146 "duration_ms": ctx.last_transitioned_at.elapsed().as_millis() as u64, 147 }), 148 ); 149 } 150 _ = tokio::time::sleep(ttl_end.unwrap_or(Duration::MAX)) => { 151 // TTL has expired, stop the job 152 return Ok(Transition::next( 153 *self, 154 Stopping { 155 stop_mode: StopBehavior::StopJob(rpc::StopMode::Immediate), 156 }, 157 )); 158 } 159 } 160 } 161 } 162 }