metering.rs
1 /* This file is part of DarkFi (https://dark.fi) 2 * 3 * Copyright (C) 2020-2025 Dyne.org foundation 4 * 5 * This program is free software: you can redistribute it and/or modify 6 * it under the terms of the GNU Affero General Public License as 7 * published by the Free Software Foundation, either version 3 of the 8 * License, or (at your option) any later version. 9 * 10 * This program is distributed in the hope that it will be useful, 11 * but WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 * GNU Affero General Public License for more details. 14 * 15 * You should have received a copy of the GNU Affero General Public License 16 * along with this program. If not, see <https://www.gnu.org/licenses/>. 17 */ 18 19 use std::collections::VecDeque; 20 21 use tracing::debug; 22 23 use crate::util::time::NanoTimestamp; 24 25 /// Struct representing metering configuration parameters. 26 #[derive(Clone, Debug)] 27 pub struct MeteringConfiguration { 28 /// Defines the threshold after which rate limit kicks in. 29 /// Set to 0 for no threshold. 30 /// 31 /// If we don't use raw count as our metric, it should be calculated 32 /// by multiplying the median increase of the measured item with the 33 /// "max" number of items we want before rate limit starts. 34 /// For example, if we measure some item that increases our total 35 /// measurement by ~5 and want to rate limit after about 10, this 36 /// should be set as 50. 37 pub threshold: u64, 38 /// Sleep time for each unit over the threshold, in milliseconds. 39 /// 40 /// This is used to calculate sleep time when ratelimit is active. 41 /// The computed sleep time when we are over the threshold will be: 42 /// sleep_time = (total - threshold) * sleep_step 43 pub sleep_step: u64, 44 /// Parameter defining the expiration of each item, for time based 45 /// decay, in nano seconds. Set to 0 for no expiration. 46 pub expiry_time: NanoTimestamp, 47 } 48 49 impl MeteringConfiguration { 50 /// Generate a new `MeteringConfiguration` for provided threshold, 51 /// sleep step and expiration time (seconds). 52 pub fn new(threshold: u64, sleep_step: u64, expiry_time: u128) -> Self { 53 Self { threshold, sleep_step, expiry_time: NanoTimestamp::from_secs(expiry_time) } 54 } 55 } 56 57 impl Default for MeteringConfiguration { 58 fn default() -> Self { 59 Self { threshold: 0, sleep_step: 0, expiry_time: NanoTimestamp(0) } 60 } 61 } 62 63 /// Default `MeteringConfiguration` as a constant, 64 /// so it can be used in trait macros. 65 pub const DEFAULT_METERING_CONFIGURATION: MeteringConfiguration = 66 MeteringConfiguration { threshold: 0, sleep_step: 0, expiry_time: NanoTimestamp(0) }; 67 68 /// Struct to keep track of some sequential metered actions and compute 69 /// rate limits. 70 /// 71 /// The queue uses a time based decay and prunes metering information 72 /// after corresponding expiration time has passed. 73 #[derive(Debug)] 74 pub struct MeteringQueue { 75 /// Metering configuration of the queue. 76 config: MeteringConfiguration, 77 /// Ring buffer keeping track of action execution timestamp and 78 /// its metered value. 79 queue: VecDeque<(NanoTimestamp, u64)>, 80 } 81 82 impl MeteringQueue { 83 /// Generate a new `MeteringQueue` for provided `MeteringConfiguration`. 84 pub fn new(config: MeteringConfiguration) -> Self { 85 Self { config, queue: VecDeque::new() } 86 } 87 88 /// Prune expired metering information from the queue. 89 pub fn clean(&mut self) { 90 // Check if expiration has been set 91 if self.config.expiry_time.0 == 0 { 92 return 93 } 94 95 // Iterate the queue to cleanup expired elements 96 while let Some((ts, _)) = self.queue.front() { 97 // This is an edge case where system reports a future timestamp 98 // therefore elapsed computation fails. 99 let Ok(elapsed) = ts.elapsed() else { 100 debug!(target: "net::metering::MeteringQueue::clean()", "Timestamp [{ts}] is in future. Removing..."); 101 let _ = self.queue.pop_front(); 102 continue 103 }; 104 105 // Check if elapsed time is over the expiration limit 106 if elapsed < self.config.expiry_time { 107 break 108 } 109 110 // Remove element 111 let _ = self.queue.pop_front(); 112 } 113 } 114 115 /// Add new metering value to the queue, after 116 /// prunning expired metering information. 117 /// If no threshold has been set, the insert is 118 /// ignored. 119 pub fn push(&mut self, value: &u64) { 120 // Check if threshold has been set 121 if self.config.threshold == 0 { 122 return 123 } 124 125 // Prune expired elements 126 self.clean(); 127 128 // Push the new value 129 self.queue.push_back((NanoTimestamp::current_time(), *value)); 130 } 131 132 /// Compute the current metered values total. 133 pub fn total(&self) -> u64 { 134 let mut total = 0; 135 for (_, value) in &self.queue { 136 total += value; 137 } 138 total 139 } 140 141 /// Compute sleep time for current metered values total, based on 142 /// the metering configuration. 143 /// 144 /// The sleep time increases linearly, based on configuration sleep 145 /// step. For example, in a raw count metering model, if we set the 146 /// configuration with threshold = 6 and sleep_step = 250, when 147 /// total = 10, returned sleep time will be 1000 ms. 148 /// 149 /// Sleep times table for the above example: 150 /// 151 /// | Total | Sleep Time (ms) | 152 /// |-------|-----------------| 153 /// | 0 | 0 | 154 /// | 4 | 0 | 155 /// | 6 | 0 | 156 /// | 7 | 250 | 157 /// | 8 | 500 | 158 /// | 9 | 750 | 159 /// | 10 | 1000 | 160 /// | 14 | 2000 | 161 /// | 18 | 3000 | 162 pub fn sleep_time(&self) -> Option<u64> { 163 // Check if threshold has been set 164 if self.config.threshold == 0 { 165 return None 166 } 167 168 // Check if we are over the threshold 169 let total = self.total(); 170 if total < self.config.threshold { 171 return None 172 } 173 174 // Compute the actual sleep time 175 Some((total - self.config.threshold) * self.config.sleep_step) 176 } 177 } 178 179 #[test] 180 fn test_net_metering_queue_default() { 181 let mut queue = MeteringQueue::new(MeteringConfiguration::default()); 182 for _ in 0..100 { 183 queue.push(&1); 184 assert!(queue.queue.is_empty()); 185 assert_eq!(queue.total(), 0); 186 assert!(queue.sleep_time().is_none()); 187 } 188 } 189 190 #[test] 191 fn test_net_metering_queue_raw_count() { 192 let threshold = 6; 193 let sleep_step = 250; 194 let metering_configuration = MeteringConfiguration::new(threshold, sleep_step, 0); 195 let mut queue = MeteringQueue::new(metering_configuration); 196 for i in 1..threshold { 197 queue.push(&1); 198 assert_eq!(queue.total(), i); 199 assert!(queue.sleep_time().is_none()); 200 } 201 for i in threshold..100 { 202 queue.push(&1); 203 assert_eq!(queue.total(), i); 204 assert_eq!(queue.sleep_time(), Some((i - threshold) * sleep_step)); 205 } 206 } 207 208 #[test] 209 fn test_net_metering_queue_sleep_time() { 210 let metered_value_median = 5; 211 let threshold_items = 10; 212 let threshold = metered_value_median * threshold_items; 213 let sleep_step = 50; 214 let metering_configuration = MeteringConfiguration::new(threshold, sleep_step, 0); 215 let mut queue = MeteringQueue::new(metering_configuration); 216 for i in 1..threshold_items { 217 queue.push(&metered_value_median); 218 assert_eq!(queue.total(), (i * metered_value_median)); 219 assert!(queue.sleep_time().is_none()); 220 } 221 for i in threshold_items..100 { 222 queue.push(&metered_value_median); 223 let expected_total = i * metered_value_median; 224 assert_eq!(queue.total(), expected_total); 225 assert_eq!(queue.sleep_time(), Some((expected_total - threshold) * sleep_step)); 226 } 227 }