scheduler.rs
//! Utilities for dealing with periodic recurring tasks.

use crate::SleepProvider;
use futures::channel::mpsc;
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
use futures::{Stream, StreamExt};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant, SystemTime};

use pin_project::pin_project;

/// An error returned while telling a [`TaskSchedule`] to sleep.
///
/// Unlike regular "sleep" functions, the sleep operations on a [`TaskSchedule`]
/// can fail because there are no [`TaskHandle`]s left.
///
/// Note that it is *not* an error if the `sleep` function is interrupted,
/// cancelled, or rescheduled for a later time: See [`TaskSchedule::sleep`]
/// for more information.
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub enum SleepError {
    /// The final [`TaskHandle`] for this [`TaskSchedule`] has been dropped: the
    /// task should exit.
    #[error("All task handles dropped: task exiting.")]
    ScheduleDropped,
}

/// A command sent from task handles to schedule objects.
#[derive(Copy, Clone)]
enum SchedulerCommand {
    /// Run the task now.
    Fire,
    /// Run the task at the provided `Instant`.
    FireAt(Instant),
    /// Cancel a pending execution, if there is one.
    Cancel,
    /// Pause execution without cancelling any running timers. (Those timers
    /// will fire after we resume execution.)
    Suspend,
    /// Resume execution. If there is a pending timer, start waiting for it again;
    /// otherwise, fire immediately.
    Resume,
}

/// A remotely-controllable trigger for recurring tasks.
///
/// This implements [`Stream`], and is intended to be used in a `while` loop; you should
/// wrap your recurring task in a `while schedule.next().await.is_some()` or similar.
#[pin_project(project = TaskScheduleP)]
pub struct TaskSchedule<R: SleepProvider> {
    /// The currently-pending timer future, if we are waiting on a deadline.
    sleep: Option<Pin<Box<R::SleepFuture>>>,
    /// Channel over which [`TaskHandle`]s send us [`SchedulerCommand`]s.
    rx: UnboundedReceiver<SchedulerCommand>,
    /// The runtime, used to create sleep futures.
    rt: R,
    /// One-shot flag: when set, the next poll yields a result immediately.
    ///
    /// This lets us avoid creating a `SleepFuture` with zero duration, which
    /// would be potentially a bit wasteful.
    instant_fire: bool,
    /// True while we are "suspended": a suspended schedule won't start
    /// executing again until it is explicitly resumed.
    suspended: bool,
}

/// A handle used to control a [`TaskSchedule`].
///
/// When the final handle is dropped, the computation governed by the
/// `TaskSchedule` should terminate.
#[derive(Clone)]
pub struct TaskHandle {
    /// Channel over which scheduler commands are sent to the corresponding schedule.
    tx: UnboundedSender<SchedulerCommand>,
}

impl<R: SleepProvider> TaskSchedule<R> {
    /// Create a new schedule, and corresponding handle.
    pub fn new(rt: R) -> (Self, TaskHandle) {
        let (tx, rx) = mpsc::unbounded();
        let handle = TaskHandle { tx };
        let schedule = Self {
            sleep: None,
            rx,
            rt,
            // A freshly created schedule starts off ready to fire.
            instant_fire: true,
            suspended: false,
        };
        (schedule, handle)
    }

    /// Trigger the schedule after `dur`.
    pub fn fire_in(&mut self, dur: Duration) {
        self.sleep = Some(Box::pin(self.rt.sleep(dur)));
        self.instant_fire = false;
    }

    /// Trigger the schedule instantly.
    pub fn fire(&mut self) {
        self.sleep = None;
        self.instant_fire = true;
    }

    /// Wait until `dur` has elapsed.
    ///
    /// This call is equivalent to [`SleepProvider::sleep`], except that the
    /// resulting future will respect calls to the functions on this schedule's
    /// associated [`TaskHandle`].
    ///
    /// Alternatively, you can view this function as equivalent to
    /// `self.fire_in(dur); self.next().await;`, only with the intent made more
    /// explicit.
    ///
    /// If the associated [`TaskHandle`] for this schedule is suspended, then
    /// this method will not return until the schedule is unsuspended _and_ the
    /// timer elapses. If the associated [`TaskHandle`] is cancelled, then this
    /// method will not return at all, until the schedule is re-activated by
    /// [`TaskHandle::fire`] or [`TaskHandle::fire_at`].
    ///
    /// Finally, if every associated [`TaskHandle`] has been dropped, then this
    /// method will return an error.
    pub async fn sleep(&mut self, dur: Duration) -> Result<(), SleepError> {
        self.fire_in(dur);
        match self.next().await {
            Some(()) => Ok(()),
            None => Err(SleepError::ScheduleDropped),
        }
    }

    /// As
    /// [`sleep_until_wallclock`](crate::SleepProviderExt::sleep_until_wallclock),
    /// but respect messages from this schedule's associated [`TaskHandle`].
    pub async fn sleep_until_wallclock(&mut self, when: SystemTime) -> Result<(), SleepError> {
        loop {
            // `calc_next_delay` may cap the delay; `reached_target` tells us
            // whether this hop actually gets us all the way to `when`.
            let (reached_target, delay) = crate::timer::calc_next_delay(self.rt.wallclock(), when);
            self.sleep(delay).await?;
            if reached_target {
                break;
            }
        }
        Ok(())
    }
}

impl TaskHandle {
    /// Trigger this handle's corresponding schedule now.
    ///
    /// Returns `true` if the schedule still exists, and `false` otherwise.
    pub fn fire(&self) -> bool {
        self.tx.unbounded_send(SchedulerCommand::Fire).is_ok()
    }
    /// Trigger this handle's corresponding schedule at `instant`.
    ///
    /// Returns `true` if the schedule still exists, and `false` otherwise.
    pub fn fire_at(&self, instant: Instant) -> bool {
        self.tx
            .unbounded_send(SchedulerCommand::FireAt(instant))
            .is_ok()
    }
    /// Cancel a pending firing of the handle's corresponding schedule.
    ///
    /// Returns `true` if the schedule still exists, and `false` otherwise.
    pub fn cancel(&self) -> bool {
        self.tx.unbounded_send(SchedulerCommand::Cancel).is_ok()
    }

    /// Suspend execution of the corresponding schedule.
    ///
    /// If the schedule is ready now, it will become pending; it won't become
    /// ready again until `resume()` is called. If the schedule is waiting for a
    /// timer, the timer will keep counting, but the schedule won't become ready
    /// until the timer has elapsed _and_ `resume()` has been called.
    ///
    /// Returns `true` if the schedule still exists, and `false` otherwise.
    pub fn suspend(&self) -> bool {
        self.tx.unbounded_send(SchedulerCommand::Suspend).is_ok()
    }

    /// Resume execution of the corresponding schedule.
    ///
    /// This method undoes the effect of a call to `suspend()`: the schedule
    /// will fire again if it is ready (or when it becomes ready).
    ///
    /// This method won't cause the schedule to fire if it was already
    /// cancelled. For that, use the `fire()` or `fire_at()` methods.
    ///
    /// Returns `true` if the schedule still exists, and `false` otherwise.
    pub fn resume(&self) -> bool {
        self.tx.unbounded_send(SchedulerCommand::Resume).is_ok()
    }
}

// NOTE(eta): implemented on the *pin projection*, not the original type, because we don't want
// to require `R: Unpin`. Accordingly, all the fields are mutable references.
impl<R: SleepProvider> TaskScheduleP<'_, R> {
    /// Handle an internal command.
    fn handle_command(&mut self, cmd: SchedulerCommand) {
        match cmd {
            SchedulerCommand::Fire => {
                *self.instant_fire = true;
                *self.sleep = None;
            }
            SchedulerCommand::FireAt(instant) => {
                let now = self.rt.now();
                // Convert the absolute deadline into a relative duration;
                // this saturates to zero if `instant` is already in the past.
                let dur = instant.saturating_duration_since(now);
                *self.instant_fire = false;
                *self.sleep = Some(Box::pin(self.rt.sleep(dur)));
            }
            SchedulerCommand::Cancel => {
                *self.instant_fire = false;
                *self.sleep = None;
            }
            SchedulerCommand::Suspend => {
                *self.suspended = true;
            }
            SchedulerCommand::Resume => {
                *self.suspended = false;
            }
        }
    }
}

impl<R: SleepProvider> Stream for TaskSchedule<R> {
    type Item = ();

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        // Drain every pending command first, so the readiness checks below see
        // the most up-to-date state.  Polling `rx` here also registers our
        // waker for any commands that arrive later.
        while let Poll::Ready(maybe_cmd) = this.rx.poll_next_unpin(cx) {
            match maybe_cmd {
                Some(c) => this.handle_command(c),
                None => {
                    // All task handles dropped; return end of stream.
                    return Poll::Ready(None);
                }
            }
        }
        if *this.suspended {
            // While suspended we never yield; a later `Resume` command will
            // wake us via the channel poll above.
            return Poll::Pending;
        }
        if *this.instant_fire {
            // One-shot flag: yield now, and don't fire again until rescheduled.
            *this.instant_fire = false;
            return Poll::Ready(Some(()));
        }
        // NOTE(review): polling the timer (if any) also registers the waker,
        // so we will be re-polled when the deadline elapses.
        if this
            .sleep
            .as_mut()
            .map(|x| x.as_mut().poll(cx).is_ready())
            .unwrap_or(false)
        {
            // Timer elapsed: clear it, and yield once.
            *this.sleep = None;
            return Poll::Ready(Some(()));
        }
        Poll::Pending
    }
}

// test_with_all_runtimes! only exists if these features are satisfied.
#[cfg(all(
    test,
    any(feature = "native-tls", feature = "rustls"),
    any(feature = "tokio", feature = "async-std"),
    not(miri), // Several of these use real SystemTime
))]
mod test {
    use crate::scheduler::TaskSchedule;
    use crate::{test_with_all_runtimes, SleepProvider};
    use futures::FutureExt;
    use futures::StreamExt;
    use std::time::{Duration, Instant};

    // A fresh schedule starts out ready, so its first poll yields immediately.
    #[test]
    fn it_fires_immediately() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, _hdl) = TaskSchedule::new(rt);
            assert!(sch.next().now_or_never().is_some());
        });
    }

    // Dropping the last handle ends the stream (yields `None`).
    #[test]
    #[allow(clippy::unwrap_used)]
    fn it_dies_if_dropped() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt);
            drop(hdl);
            assert!(sch.next().now_or_never().unwrap().is_none());
        });
    }

    // `TaskHandle::fire` makes the schedule ready exactly once.
    #[test]
    fn it_fires_on_demand() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt);
            assert!(sch.next().now_or_never().is_some());

            assert!(sch.next().now_or_never().is_none());
            assert!(hdl.fire());
            assert!(sch.next().now_or_never().is_some());
            assert!(sch.next().now_or_never().is_none());
        });
    }

    #[test]
    fn it_cancels_instant_firings() {
        // NOTE(eta): this test very much assumes that unbounded channels will
        // transmit things instantly. If it breaks, that's probably why.
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt);
            assert!(sch.next().now_or_never().is_some());

            assert!(sch.next().now_or_never().is_none());
            assert!(hdl.fire());
            assert!(hdl.cancel());
            assert!(sch.next().now_or_never().is_none());
        });
    }

    // `TaskSchedule::fire_in` delays readiness until the duration elapses.
    #[test]
    fn it_fires_after_self_reschedule() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, _hdl) = TaskSchedule::new(rt);
            assert!(sch.next().now_or_never().is_some());

            sch.fire_in(Duration::from_millis(100));

            assert!(sch.next().now_or_never().is_none());
            assert!(sch.next().await.is_some());
            assert!(sch.next().now_or_never().is_none());
        });
    }

    // `TaskHandle::fire_at` delays readiness until the deadline is reached.
    #[test]
    fn it_fires_after_external_reschedule() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt);
            assert!(sch.next().now_or_never().is_some());

            hdl.fire_at(Instant::now() + Duration::from_millis(100));

            assert!(sch.next().now_or_never().is_none());
            assert!(sch.next().await.is_some());
            assert!(sch.next().now_or_never().is_none());
        });
    }

    // This test is disabled because it was flaky when the CI servers were
    // heavily loaded. (See #545.)
    //
    // TODO: Let's fix this test and make it more reliable, then re-enable it.
    #[test]
    #[ignore]
    fn it_cancels_delayed_firings() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt.clone());
            assert!(sch.next().now_or_never().is_some());

            hdl.fire_at(Instant::now() + Duration::from_millis(100));

            assert!(sch.next().now_or_never().is_none());

            rt.sleep(Duration::from_millis(50)).await;

            assert!(sch.next().now_or_never().is_none());

            hdl.cancel();

            assert!(sch.next().now_or_never().is_none());

            rt.sleep(Duration::from_millis(100)).await;

            assert!(sch.next().now_or_never().is_none());
        });
    }

    // A later `fire()` replaces an earlier `fire_at()`: the schedule fires
    // once immediately, and the superseded timer never fires.
    #[test]
    fn last_fire_wins() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt.clone());
            assert!(sch.next().now_or_never().is_some());

            hdl.fire_at(Instant::now() + Duration::from_millis(100));
            hdl.fire();

            assert!(sch.next().now_or_never().is_some());
            assert!(sch.next().now_or_never().is_none());

            rt.sleep(Duration::from_millis(150)).await;

            assert!(sch.next().now_or_never().is_none());
        });
    }

    // Suspension masks a pending `fire()` until `resume()` is called.
    #[test]
    fn suspend_and_resume_with_fire() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt.clone());
            hdl.fire();
            hdl.suspend();

            assert!(sch.next().now_or_never().is_none());
            hdl.resume();
            assert!(sch.next().now_or_never().is_some());
        });
    }

    // Resuming doesn't fire early: the timer must still elapse afterwards.
    #[test]
    fn suspend_and_resume_with_sleep() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt.clone());
            sch.fire_in(Duration::from_millis(100));
            hdl.suspend();

            assert!(sch.next().now_or_never().is_none());
            hdl.resume();
            assert!(sch.next().now_or_never().is_none());
            assert!(sch.next().await.is_some());
        });
    }

    // Resuming with nothing scheduled doesn't spuriously fire.
    #[test]
    fn suspend_and_resume_with_nothing() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt.clone());
            assert!(sch.next().now_or_never().is_some());
            hdl.suspend();

            assert!(sch.next().now_or_never().is_none());
            hdl.resume();
        });
    }
}