/ crates / tor-rtcompat / src / scheduler.rs
scheduler.rs
  1  //! Utilities for dealing with periodic recurring tasks.
  2  
  3  use crate::SleepProvider;
  4  use futures::channel::mpsc;
  5  use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
  6  use futures::{Stream, StreamExt};
  7  use std::future::Future;
  8  use std::pin::Pin;
  9  use std::task::{Context, Poll};
 10  use std::time::{Duration, Instant, SystemTime};
 11  
 12  use pin_project::pin_project;
 13  
 14  /// An error returned while telling a [`TaskSchedule`] to sleep.
 15  ///
 16  /// Unlike regular "sleep" functions, the sleep operations on a [`TaskSchedule`]
 17  /// can fail because there are no [`TaskHandle`]s left.
 18  ///
 19  /// Note that it is *not* an error if the `sleep` function is interrupted,
 20  /// cancelled, or  or rescheduled for a later time: See [`TaskSchedule::sleep`]
 21  /// for more information.
 22  #[derive(Clone, Debug, thiserror::Error)]
 23  #[non_exhaustive]
 24  pub enum SleepError {
 25      /// The final [`TaskHandle`] for this [`TaskSchedule`] has been dropped: the
 26      /// task should exit.
 27      #[error("All task handles dropped: task exiting.")]
 28      ScheduleDropped,
 29  }
 30  
 31  /// A command sent from task handles to schedule objects.
 32  #[derive(Copy, Clone)]
 33  enum SchedulerCommand {
 34      /// Run the task now.
 35      Fire,
 36      /// Run the task at the provided `Instant`.
 37      FireAt(Instant),
 38      /// Cancel a pending execution, if there is one.
 39      Cancel,
 40      /// Pause execution without cancelling any running timers.  (Those timers
 41      /// will fire after we resume execution.)
 42      Suspend,
 43      /// Resume execution.  If there is a pending timer, start waiting for it again;
 44      /// otherwise, fire immediately.
 45      Resume,
 46  }
 47  
 48  /// A remotely-controllable trigger for recurring tasks.
 49  ///
 50  /// This implements [`Stream`], and is intended to be used in a `while` loop; you should
 51  /// wrap your recurring task in a `while schedule.next().await.is_some()` or similar.
 52  #[pin_project(project = TaskScheduleP)]
 53  pub struct TaskSchedule<R: SleepProvider> {
 54      /// If we're waiting for a deadline to expire, the future for that.
 55      sleep: Option<Pin<Box<R::SleepFuture>>>,
 56      /// Receiver of scheduler commands from handles.
 57      rx: UnboundedReceiver<SchedulerCommand>,
 58      /// Runtime.
 59      rt: R,
 60      /// Whether or not to yield a result immediately when polled, once.
 61      ///
 62      /// This is used to avoid having to create a `SleepFuture` with zero duration,
 63      /// which is potentially a bit wasteful.
 64      instant_fire: bool,
 65      /// Whether we are currently "suspended".  If we are suspended, we won't
 66      /// start executing again till we're explicitly "resumed".
 67      suspended: bool,
 68  }
 69  
 70  /// A handle used to control a [`TaskSchedule`].
 71  ///
 72  /// When the final handle is dropped, the computation governed by the
 73  /// `TaskSchedule` should terminate.
 74  #[derive(Clone)]
 75  pub struct TaskHandle {
 76      /// Sender of scheduler commands to the corresponding schedule.
 77      tx: UnboundedSender<SchedulerCommand>,
 78  }
 79  
 80  impl<R: SleepProvider> TaskSchedule<R> {
 81      /// Create a new schedule, and corresponding handle.
 82      pub fn new(rt: R) -> (Self, TaskHandle) {
 83          let (tx, rx) = mpsc::unbounded();
 84          (
 85              Self {
 86                  sleep: None,
 87                  rx,
 88                  rt,
 89                  // Start off ready.
 90                  instant_fire: true,
 91                  suspended: false,
 92              },
 93              TaskHandle { tx },
 94          )
 95      }
 96  
 97      /// Trigger the schedule after `dur`.
 98      pub fn fire_in(&mut self, dur: Duration) {
 99          self.instant_fire = false;
100          self.sleep = Some(Box::pin(self.rt.sleep(dur)));
101      }
102  
103      /// Trigger the schedule instantly.
104      pub fn fire(&mut self) {
105          self.instant_fire = true;
106          self.sleep = None;
107      }
108  
109      /// Wait until `Dur` has elapsed.
110      ///
111      /// This call is equivalent to [`SleepProvider::sleep`], except that the
112      /// resulting future will respect calls to the functions on this schedule's
113      /// associated [`TaskHandle`].
114      ///
115      /// Alternatively, you can view this function as equivalent to
116      /// `self.fire_in(dur); self.next().await;`, only  with the intent made more
117      /// explicit.
118      ///
119      /// If the associated [`TaskHandle`] for this schedule is suspended, then
120      /// this method will not return until the schedule is unsuspended _and_ the
121      /// timer elapses.  If the associated [`TaskHandle`] is cancelled, then this
122      /// method will not return at all, until the schedule is re-activated by
123      /// [`TaskHandle::fire`] or [`TaskHandle::fire_at`].
124      ///
125      /// Finally, if every associated [`TaskHandle`] has been dropped, then this
126      /// method will return an error.
127      pub async fn sleep(&mut self, dur: Duration) -> Result<(), SleepError> {
128          self.fire_in(dur);
129          self.next().await.ok_or(SleepError::ScheduleDropped)
130      }
131  
132      /// As
133      /// [`sleep_until_wallclock`](crate::SleepProviderExt::sleep_until_wallclock),
134      /// but respect messages from this schedule's associated [`TaskHandle`].
135      pub async fn sleep_until_wallclock(&mut self, when: SystemTime) -> Result<(), SleepError> {
136          loop {
137              let (finished, delay) = crate::timer::calc_next_delay(self.rt.wallclock(), when);
138              self.sleep(delay).await?;
139              if finished {
140                  return Ok(());
141              }
142          }
143      }
144  }
145  
146  impl TaskHandle {
147      /// Trigger this handle's corresponding schedule now.
148      ///
149      /// Returns `true` if the schedule still exists, and `false` otherwise.
150      pub fn fire(&self) -> bool {
151          self.tx.unbounded_send(SchedulerCommand::Fire).is_ok()
152      }
153      /// Trigger this handle's corresponding schedule at `instant`.
154      ///
155      /// Returns `true` if the schedule still exists, and `false` otherwise.
156      pub fn fire_at(&self, instant: Instant) -> bool {
157          self.tx
158              .unbounded_send(SchedulerCommand::FireAt(instant))
159              .is_ok()
160      }
161      /// Cancel a pending firing of the handle's corresponding schedule.
162      ///
163      /// Returns `true` if the schedule still exists, and `false` otherwise.
164      pub fn cancel(&self) -> bool {
165          self.tx.unbounded_send(SchedulerCommand::Cancel).is_ok()
166      }
167  
168      /// Suspend execution of the corresponding schedule.
169      ///
170      /// If the schedule is ready now, it will become pending; it won't become
171      /// ready again until `resume()` is called. If the schedule is waiting for a
172      /// timer, the timer will keep counting, but the schedule won't become ready
173      /// until the timer has elapsed _and_ `resume()` has been called.
174      ///
175      /// Returns `true` if the schedule still exists, and `false` otherwise.
176      pub fn suspend(&self) -> bool {
177          self.tx.unbounded_send(SchedulerCommand::Suspend).is_ok()
178      }
179  
180      /// Resume execution of the corresponding schedule.
181      ///
182      /// This method undoes the effect of a call to `suspend()`: the schedule
183      /// will fire again if it is ready (or when it becomes ready).
184      ///
185      /// This method won't cause the schedule to fire if it was already
186      /// cancelled. For that, use the `fire()` or fire_at()` methods.
187      ///
188      /// Returns `true` if the schedule still exists, and `false` otherwise.
189      pub fn resume(&self) -> bool {
190          self.tx.unbounded_send(SchedulerCommand::Resume).is_ok()
191      }
192  }
193  
194  // NOTE(eta): implemented on the *pin projection*, not the original type, because we don't want
195  //            to require `R: Unpin`. Accordingly, all the fields are mutable references.
196  impl<R: SleepProvider> TaskScheduleP<'_, R> {
197      /// Handle an internal command.
198      fn handle_command(&mut self, cmd: SchedulerCommand) {
199          match cmd {
200              SchedulerCommand::Fire => {
201                  *self.instant_fire = true;
202                  *self.sleep = None;
203              }
204              SchedulerCommand::FireAt(instant) => {
205                  let now = self.rt.now();
206                  let dur = instant.saturating_duration_since(now);
207                  *self.instant_fire = false;
208                  *self.sleep = Some(Box::pin(self.rt.sleep(dur)));
209              }
210              SchedulerCommand::Cancel => {
211                  *self.instant_fire = false;
212                  *self.sleep = None;
213              }
214              SchedulerCommand::Suspend => {
215                  *self.suspended = true;
216              }
217              SchedulerCommand::Resume => {
218                  *self.suspended = false;
219              }
220          }
221      }
222  }
223  
224  impl<R: SleepProvider> Stream for TaskSchedule<R> {
225      type Item = ();
226  
227      fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
228          let mut this = self.project();
229          while let Poll::Ready(maybe_cmd) = this.rx.poll_next_unpin(cx) {
230              match maybe_cmd {
231                  Some(c) => this.handle_command(c),
232                  None => {
233                      // All task handles dropped; return end of stream.
234                      return Poll::Ready(None);
235                  }
236              }
237          }
238          if *this.suspended {
239              return Poll::Pending;
240          }
241          if *this.instant_fire {
242              *this.instant_fire = false;
243              return Poll::Ready(Some(()));
244          }
245          if this
246              .sleep
247              .as_mut()
248              .map(|x| x.as_mut().poll(cx).is_ready())
249              .unwrap_or(false)
250          {
251              *this.sleep = None;
252              return Poll::Ready(Some(()));
253          }
254          Poll::Pending
255      }
256  }
257  
// test_with_all_runtimes! only exists if these features are satisfied.
#[cfg(all(
    test,
    any(feature = "native-tls", feature = "rustls"),
    any(feature = "tokio", feature = "async-std"),
    not(miri), // Several of these use real SystemTime
))]
mod test {
    use crate::scheduler::TaskSchedule;
    use crate::{test_with_all_runtimes, SleepProvider};
    use futures::FutureExt;
    use futures::StreamExt;
    use std::time::{Duration, Instant};

    /// A freshly created schedule is ready on its first poll.
    #[test]
    fn it_fires_immediately() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, _hdl) = TaskSchedule::new(rt);
            assert!(sch.next().now_or_never().is_some());
        });
    }

    /// Dropping the only handle ends the stream.
    #[test]
    #[allow(clippy::unwrap_used)]
    fn it_dies_if_dropped() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt);
            drop(hdl);
            assert!(sch.next().now_or_never().unwrap().is_none());
        });
    }

    /// `fire()` on the handle makes the schedule ready exactly once.
    #[test]
    fn it_fires_on_demand() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt);
            assert!(sch.next().now_or_never().is_some());

            assert!(sch.next().now_or_never().is_none());
            assert!(hdl.fire());
            assert!(sch.next().now_or_never().is_some());
            assert!(sch.next().now_or_never().is_none());
        });
    }

    /// A `cancel()` queued after a `fire()` wins.
    #[test]
    fn it_cancels_instant_firings() {
        // NOTE(eta): this test very much assumes that unbounded channels will
        //            transmit things instantly. If it breaks, that's probably why.
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt);
            assert!(sch.next().now_or_never().is_some());

            assert!(sch.next().now_or_never().is_none());
            assert!(hdl.fire());
            assert!(hdl.cancel());
            assert!(sch.next().now_or_never().is_none());
        });
    }

    /// `fire_in()` on the schedule itself fires once, after the delay.
    #[test]
    fn it_fires_after_self_reschedule() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, _hdl) = TaskSchedule::new(rt);
            assert!(sch.next().now_or_never().is_some());

            sch.fire_in(Duration::from_millis(100));

            assert!(sch.next().now_or_never().is_none());
            assert!(sch.next().await.is_some());
            assert!(sch.next().now_or_never().is_none());
        });
    }

    /// `fire_at()` on the handle fires once, after the deadline.
    #[test]
    fn it_fires_after_external_reschedule() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt);
            assert!(sch.next().now_or_never().is_some());

            assert!(hdl.fire_at(Instant::now() + Duration::from_millis(100)));

            assert!(sch.next().now_or_never().is_none());
            assert!(sch.next().await.is_some());
            assert!(sch.next().now_or_never().is_none());
        });
    }

    // This test is disabled because it was flaky when the CI servers were
    // heavily loaded. (See #545.)
    //
    // TODO: Let's fix this test and make it more reliable, then re-enable it.
    #[test]
    #[ignore]
    fn it_cancels_delayed_firings() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt.clone());
            assert!(sch.next().now_or_never().is_some());

            assert!(hdl.fire_at(Instant::now() + Duration::from_millis(100)));

            assert!(sch.next().now_or_never().is_none());

            rt.sleep(Duration::from_millis(50)).await;

            assert!(sch.next().now_or_never().is_none());

            assert!(hdl.cancel());

            assert!(sch.next().now_or_never().is_none());

            rt.sleep(Duration::from_millis(100)).await;

            assert!(sch.next().now_or_never().is_none());
        });
    }

    /// A later `fire()` replaces an earlier `fire_at()`; the timer never fires.
    #[test]
    fn last_fire_wins() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt.clone());
            assert!(sch.next().now_or_never().is_some());

            assert!(hdl.fire_at(Instant::now() + Duration::from_millis(100)));
            assert!(hdl.fire());

            assert!(sch.next().now_or_never().is_some());
            assert!(sch.next().now_or_never().is_none());

            rt.sleep(Duration::from_millis(150)).await;

            assert!(sch.next().now_or_never().is_none());
        });
    }

    /// An immediate firing is held back while suspended and released on resume.
    #[test]
    fn suspend_and_resume_with_fire() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt);
            assert!(hdl.fire());
            assert!(hdl.suspend());

            assert!(sch.next().now_or_never().is_none());
            assert!(hdl.resume());
            assert!(sch.next().now_or_never().is_some());
        });
    }

    /// A timer keeps running while suspended and fires once resumed and elapsed.
    #[test]
    fn suspend_and_resume_with_sleep() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt);
            sch.fire_in(Duration::from_millis(100));
            assert!(hdl.suspend());

            assert!(sch.next().now_or_never().is_none());
            assert!(hdl.resume());
            assert!(sch.next().now_or_never().is_none());
            assert!(sch.next().await.is_some());
        });
    }

    /// Resuming with nothing pending must not make the schedule fire.
    #[test]
    fn suspend_and_resume_with_nothing() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt);
            assert!(sch.next().now_or_never().is_some());
            assert!(hdl.suspend());

            assert!(sch.next().now_or_never().is_none());
            assert!(hdl.resume());
            // Nothing was scheduled, so the resumed schedule stays idle.
            assert!(sch.next().now_or_never().is_none());
        });
    }
}