/ crates / tor-log-ratelim / src / ratelim.rs
ratelim.rs
  1  //! Implement half of log rate-limiting: the ability to cause the state of a
  2  //! Loggable to get flushed at appropriate intervals.
  3  
  4  use super::{Activity, Loggable};
  5  use futures::task::SpawnExt as _;
  6  use std::{
  7      sync::{Arc, Mutex},
  8      time::Duration,
  9  };
 10  use tor_error::ErrorReport;
 11  
 12  /// Declare a dyn-safe trait for the parts of an asynchronous runtime so that we
 13  /// can install it globally.
 14  pub(crate) mod rt {
 15      use futures::{future::BoxFuture, task::Spawn};
 16      use once_cell::sync::OnceCell;
 17      use std::time::{Duration, Instant};
 18  
 19      /// A dyn-safe view of the parts of an async runtime that we need for rate-limiting.
 20      pub trait RuntimeSupport: Spawn + 'static + Sync + Send {
 21          /// Return a future that will yield () after `duration` has passed.
 22          fn sleep(&self, duration: Duration) -> BoxFuture<'_, ()>;
 23  
 24          /// Return the current time as an Instant.
 25          fn now(&self) -> Instant;
 26      }
 27  
 28      impl<R: tor_rtcompat::Runtime> RuntimeSupport for R {
 29          fn sleep(&self, duration: Duration) -> BoxFuture<'_, ()> {
 30              Box::pin(tor_rtcompat::SleepProvider::sleep(self, duration))
 31          }
 32          fn now(&self) -> Instant {
 33              tor_rtcompat::SleepProvider::now(self)
 34          }
 35      }
 36  
 37      /// A global view of our runtime, used for rate-limited logging.
 38      // TODO MSRV 1.70: We could use OnceSync instead.
 39      static RUNTIME_SUPPORT: OnceCell<Box<dyn RuntimeSupport>> = OnceCell::new();
 40  
 41      /// Try to install `runtime` as a global runtime to be used for rate-limited logging.
 42      ///
 43      /// Return an error (and make no changes) if there there was already a runtime installed.
 44      pub fn install_runtime<R: tor_rtcompat::Runtime>(
 45          runtime: R,
 46      ) -> Result<(), InstallRuntimeError> {
 47          let rt = Box::new(runtime);
 48          RUNTIME_SUPPORT
 49              .set(rt)
 50              .map_err(|_| InstallRuntimeError::DuplicateCall)
 51      }
 52  
 53      /// An error that occurs while installing a runtime.
 54      #[derive(Clone, Debug, thiserror::Error)]
 55      #[non_exhaustive]
 56      pub enum InstallRuntimeError {
 57          /// Tried to install a runtime when there was already one installed.
 58          #[error("Called tor_log_ratelim::install_runtime() more than once")]
 59          DuplicateCall,
 60      }
 61  
 62      /// Return the installed runtime, if there is one.
 63      pub fn rt_support() -> Option<&'static dyn RuntimeSupport> {
 64          RUNTIME_SUPPORT.get().map(Box::as_ref)
 65      }
 66  }
 67  
/// A rate-limited wrapper around a [`Loggable`] that ensures its events are
/// flushed from time to time.
///
/// All state lives behind a single mutex; see [`Inner`] for the fields.
pub struct RateLim<T> {
    /// The Loggable itself, together with the flag recording whether a
    /// background flushing task is currently running for it.
    inner: Mutex<Inner<T>>,
}
 74  
/// The mutable state of a [`RateLim`], protected by the mutex in `RateLim`.
struct Inner<T> {
    /// The loggable state whose reports are rate-limited.
    loggable: T,
    /// True if we have a running task that is collating reports for `loggable`.
    ///
    /// Set when the background task is launched, and cleared by that task
    /// just before it exits.
    task_running: bool,
}
 82  
 83  impl<T: Loggable> RateLim<T> {
 84      /// Create a new `RateLim` to flush events for `loggable`.
 85      pub fn new(loggable: T) -> Arc<Self> {
 86          Arc::new(RateLim {
 87              inner: Mutex::new(Inner {
 88                  loggable,
 89                  task_running: false,
 90              }),
 91          })
 92      }
 93  
 94      /// Adjust the status of this reporter's `Loggable` by calling `f` on it,
 95      /// but only if it is already scheduled to report itself.  Otherwise, do nothing.
 96      ///
 97      /// This is the appropriate function to use for tracking successes.f
 98      pub fn nonevent<F>(&self, f: F)
 99      where
100          F: FnOnce(&mut T),
101      {
102          let mut inner = self.inner.lock().expect("lock poisoned");
103          if inner.task_running {
104              f(&mut inner.loggable);
105          }
106      }
107  
108      /// Add an event to this rate-limited reporter by calling `f` on it, and
109      /// schedule it to be reported after an appropriate time.
110      ///
111      /// NOTE: This API is a big ugly. If we ever decide to make it non-hidden,
112      /// we may want to make it call rt_support() directly again, as it did in
113      /// earlier visions.
114      pub fn event<F>(self: &Arc<Self>, rt: &'static dyn rt::RuntimeSupport, f: F)
115      where
116          F: FnOnce(&mut T),
117      {
118          let mut inner = self.inner.lock().expect("poisoned lock");
119          f(&mut inner.loggable);
120  
121          if !inner.task_running {
122              // Launch a task to make periodic reports on the state of our Loggable.
123              inner.task_running = true;
124              if let Err(e) = rt.spawn(Box::pin(run(rt, Arc::clone(self)))) {
125                  // We couldn't spawn a task; we have to flush the state
126                  // immediately.
127                  //
128                  // TODO: This behavior is undesirable if it causes us to spam
129                  // the log while we are shutting down.  On the other hand, it's
130                  // also undesirable if we suppress our logs while we're
131                  // shutting down.
132                  inner.loggable.flush(Duration::default());
133                  tracing::warn!("Also, unable to spawn a logging task: {}", e.report());
134              }
135          }
136      }
137  }
138  
/// How long (approximately) a `Loggable` must have had nothing to report
/// before we stop tracking it and start the timeout schedule over: four hours.
const RESET_AFTER_DORMANT_FOR: Duration = Duration::from_secs(4 * 60 * 60);
142  
/// Return an endless iterator over the successive intervals to summarize.
///
/// The schedule starts with short intervals and backs off as the event keeps
/// happening: 5s, 1m, 5m, 30m (twice), 1h (twice), 4h (twice), and then 24h
/// forever after.
fn timeout_sequence() -> impl Iterator<Item = Duration> {
    /// Seconds per minute.
    const MINUTE: u64 = 60;
    /// Seconds per hour.
    const HOUR: u64 = 60 * MINUTE;
    /// Seconds per day.
    const DAY: u64 = 24 * HOUR;
    /// The escalating first part of the schedule, in seconds.
    static RAMP_UP_SECS: [u64; 9] = [
        5,
        MINUTE,
        5 * MINUTE,
        30 * MINUTE,
        30 * MINUTE,
        HOUR,
        HOUR,
        4 * HOUR,
        4 * HOUR,
    ];

    let ramp_up = RAMP_UP_SECS.iter().copied().map(Duration::from_secs);
    // Once the ramp-up is exhausted, keep summarizing once per day.
    let steady_state = std::iter::repeat(Duration::from_secs(DAY));
    ramp_up.chain(steady_state)
}
169  
170  /// Helper: runs in a background task, and periodically flushes the `Loggable`
171  /// in `ratelim`.  Exits after [`Loggable::flush`] returns [`Activity::Dormant`]
172  /// for "long enough".
173  async fn run<T>(rt_support: &dyn rt::RuntimeSupport, ratelim: Arc<RateLim<T>>)
174  // TODO : Perhaps instead of taking an Arc<RateLim<T>> we want sometimes to take
175  // a `&'static RateLim<T>``, so we don't need to mess about with `Arc`s needlessly.
176  where
177      T: Loggable,
178  {
179      let mut dormant_since = None;
180      for duration in timeout_sequence() {
181          rt_support.sleep(duration).await;
182          {
183              let mut inner = ratelim.inner.lock().expect("Lock poisoned");
184              debug_assert!(inner.task_running);
185              // NOTE: We say that we are summarizing "duration" on the theory
186              // that we actually slept for "duration".  But maybe `sleep` slept
187              // for a little more or less?  Nonetheless, we report this as if we
188              // had slept for the exact amount, since the alternative appears to
189              // be saying stuff like "this problem occurred 8/12 times in the
190              // last 10min 0.0014ssec" instead of "10m".
191              if inner.loggable.flush(duration) == Activity::Dormant {
192                  // TODO: This can tell the user several times that the problem
193                  // did not occur! Perhaps we only want to flush once on dormant,
194                  // and then not report the dormant condition again until we are
195                  // no longer tracking it.  Or perhaps we should lower the
196                  // responsibility for deciding when to log and when to uninstall
197                  // to the Loggable?
198                  match dormant_since {
199                      Some(when) => {
200                          if let Some(dormant_for) = rt_support.now().checked_duration_since(when) {
201                              if dormant_for >= RESET_AFTER_DORMANT_FOR {
202                                  inner.task_running = false;
203                                  return;
204                              }
205                          }
206                      }
207                      None => {
208                          dormant_since = Some(rt_support.now());
209                      }
210                  }
211              } else {
212                  dormant_since = None;
213              }
214          }
215      }
216  
217      unreachable!("timeout_sequence returned a finite sequence");
218  }
219  
220  // TODO : Write some tests.