ratelim.rs
1 //! Implement half of log rate-limiting: the ability to cause the state of a 2 //! Loggable to get flushed at appropriate intervals. 3 4 use super::{Activity, Loggable}; 5 use futures::task::SpawnExt as _; 6 use std::{ 7 sync::{Arc, Mutex}, 8 time::Duration, 9 }; 10 use tor_error::ErrorReport; 11 12 /// Declare a dyn-safe trait for the parts of an asynchronous runtime so that we 13 /// can install it globally. 14 pub(crate) mod rt { 15 use futures::{future::BoxFuture, task::Spawn}; 16 use once_cell::sync::OnceCell; 17 use std::time::{Duration, Instant}; 18 19 /// A dyn-safe view of the parts of an async runtime that we need for rate-limiting. 20 pub trait RuntimeSupport: Spawn + 'static + Sync + Send { 21 /// Return a future that will yield () after `duration` has passed. 22 fn sleep(&self, duration: Duration) -> BoxFuture<'_, ()>; 23 24 /// Return the current time as an Instant. 25 fn now(&self) -> Instant; 26 } 27 28 impl<R: tor_rtcompat::Runtime> RuntimeSupport for R { 29 fn sleep(&self, duration: Duration) -> BoxFuture<'_, ()> { 30 Box::pin(tor_rtcompat::SleepProvider::sleep(self, duration)) 31 } 32 fn now(&self) -> Instant { 33 tor_rtcompat::SleepProvider::now(self) 34 } 35 } 36 37 /// A global view of our runtime, used for rate-limited logging. 38 // TODO MSRV 1.70: We could use OnceSync instead. 39 static RUNTIME_SUPPORT: OnceCell<Box<dyn RuntimeSupport>> = OnceCell::new(); 40 41 /// Try to install `runtime` as a global runtime to be used for rate-limited logging. 42 /// 43 /// Return an error (and make no changes) if there there was already a runtime installed. 44 pub fn install_runtime<R: tor_rtcompat::Runtime>( 45 runtime: R, 46 ) -> Result<(), InstallRuntimeError> { 47 let rt = Box::new(runtime); 48 RUNTIME_SUPPORT 49 .set(rt) 50 .map_err(|_| InstallRuntimeError::DuplicateCall) 51 } 52 53 /// An error that occurs while installing a runtime. 54 #[derive(Clone, Debug, thiserror::Error)] 55 #[non_exhaustive] 56 pub enum InstallRuntimeError { 57 /// Tried to install a runtime when there was already one installed. 58 #[error("Called tor_log_ratelim::install_runtime() more than once")] 59 DuplicateCall, 60 } 61 62 /// Return the installed runtime, if there is one. 63 pub fn rt_support() -> Option<&'static dyn RuntimeSupport> { 64 RUNTIME_SUPPORT.get().map(Box::as_ref) 65 } 66 } 67 68 /// A rate-limited wrapper around a [`Loggable`]` that ensures its events are 69 /// flushed from time to time. 70 pub struct RateLim<T> { 71 /// The Loggable itself. 72 inner: Mutex<Inner<T>>, 73 } 74 75 /// The mutable state of a [`RateLim`]. 76 struct Inner<T> { 77 /// The loggable state whose reports are rate-limited 78 loggable: T, 79 /// True if we have a running task that is collating reports for `loggable`. 80 task_running: bool, 81 } 82 83 impl<T: Loggable> RateLim<T> { 84 /// Create a new `RateLim` to flush events for `loggable`. 85 pub fn new(loggable: T) -> Arc<Self> { 86 Arc::new(RateLim { 87 inner: Mutex::new(Inner { 88 loggable, 89 task_running: false, 90 }), 91 }) 92 } 93 94 /// Adjust the status of this reporter's `Loggable` by calling `f` on it, 95 /// but only if it is already scheduled to report itself. Otherwise, do nothing. 96 /// 97 /// This is the appropriate function to use for tracking successes.f 98 pub fn nonevent<F>(&self, f: F) 99 where 100 F: FnOnce(&mut T), 101 { 102 let mut inner = self.inner.lock().expect("lock poisoned"); 103 if inner.task_running { 104 f(&mut inner.loggable); 105 } 106 } 107 108 /// Add an event to this rate-limited reporter by calling `f` on it, and 109 /// schedule it to be reported after an appropriate time. 110 /// 111 /// NOTE: This API is a big ugly. If we ever decide to make it non-hidden, 112 /// we may want to make it call rt_support() directly again, as it did in 113 /// earlier visions. 114 pub fn event<F>(self: &Arc<Self>, rt: &'static dyn rt::RuntimeSupport, f: F) 115 where 116 F: FnOnce(&mut T), 117 { 118 let mut inner = self.inner.lock().expect("poisoned lock"); 119 f(&mut inner.loggable); 120 121 if !inner.task_running { 122 // Launch a task to make periodic reports on the state of our Loggable. 123 inner.task_running = true; 124 if let Err(e) = rt.spawn(Box::pin(run(rt, Arc::clone(self)))) { 125 // We couldn't spawn a task; we have to flush the state 126 // immediately. 127 // 128 // TODO: This behavior is undesirable if it causes us to spam 129 // the log while we are shutting down. On the other hand, it's 130 // also undesirable if we suppress our logs while we're 131 // shutting down. 132 inner.loggable.flush(Duration::default()); 133 tracing::warn!("Also, unable to spawn a logging task: {}", e.report()); 134 } 135 } 136 } 137 } 138 139 /// After approximately this many seconds of not having anything to report, we 140 /// should reset our timeout schedule. 141 const RESET_AFTER_DORMANT_FOR: Duration = Duration::new(4 * 60 * 60, 0); 142 143 /// Return an iterator of reasonable amounts of time to summarize. 144 /// 145 /// We summarize short intervals at first, and back off as the event keeps 146 /// happening. 147 fn timeout_sequence() -> impl Iterator<Item = Duration> { 148 /// seconds per second. 149 const SEC: u64 = 1; 150 /// seconds per minute. 151 const MIN: u64 = 60; 152 /// seconds per hour 153 const HOUR: u64 = 3600; 154 [ 155 5 * SEC, 156 MIN, 157 5 * MIN, 158 30 * MIN, 159 30 * MIN, 160 HOUR, 161 HOUR, 162 4 * HOUR, 163 4 * HOUR, 164 ] 165 .into_iter() 166 .chain(std::iter::repeat(24 * HOUR)) 167 .map(|secs| Duration::new(secs, 0)) 168 } 169 170 /// Helper: runs in a background task, and periodically flushes the `Loggable` 171 /// in `ratelim`. Exits after [`Loggable::flush`] returns [`Activity::Dormant`] 172 /// for "long enough". 173 async fn run<T>(rt_support: &dyn rt::RuntimeSupport, ratelim: Arc<RateLim<T>>) 174 // TODO : Perhaps instead of taking an Arc<RateLim<T>> we want sometimes to take 175 // a `&'static RateLim<T>``, so we don't need to mess about with `Arc`s needlessly. 176 where 177 T: Loggable, 178 { 179 let mut dormant_since = None; 180 for duration in timeout_sequence() { 181 rt_support.sleep(duration).await; 182 { 183 let mut inner = ratelim.inner.lock().expect("Lock poisoned"); 184 debug_assert!(inner.task_running); 185 // NOTE: We say that we are summarizing "duration" on the theory 186 // that we actually slept for "duration". But maybe `sleep` slept 187 // for a little more or less? Nonetheless, we report this as if we 188 // had slept for the exact amount, since the alternative appears to 189 // be saying stuff like "this problem occurred 8/12 times in the 190 // last 10min 0.0014ssec" instead of "10m". 191 if inner.loggable.flush(duration) == Activity::Dormant { 192 // TODO: This can tell the user several times that the problem 193 // did not occur! Perhaps we only want to flush once on dormant, 194 // and then not report the dormant condition again until we are 195 // no longer tracking it. Or perhaps we should lower the 196 // responsibility for deciding when to log and when to uninstall 197 // to the Loggable? 198 match dormant_since { 199 Some(when) => { 200 if let Some(dormant_for) = rt_support.now().checked_duration_since(when) { 201 if dormant_for >= RESET_AFTER_DORMANT_FOR { 202 inner.task_running = false; 203 return; 204 } 205 } 206 } 207 None => { 208 dormant_since = Some(rt_support.now()); 209 } 210 } 211 } else { 212 dormant_since = None; 213 } 214 } 215 } 216 217 unreachable!("timeout_sequence returned a finite sequence"); 218 } 219 220 // TODO : Write some tests.