/ src / net / metering.rs
metering.rs
  1  /* This file is part of DarkFi (https://dark.fi)
  2   *
  3   * Copyright (C) 2020-2025 Dyne.org foundation
  4   *
  5   * This program is free software: you can redistribute it and/or modify
  6   * it under the terms of the GNU Affero General Public License as
  7   * published by the Free Software Foundation, either version 3 of the
  8   * License, or (at your option) any later version.
  9   *
 10   * This program is distributed in the hope that it will be useful,
 11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
 12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 13   * GNU Affero General Public License for more details.
 14   *
 15   * You should have received a copy of the GNU Affero General Public License
 16   * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 17   */
 18  
 19  use std::collections::VecDeque;
 20  
 21  use tracing::debug;
 22  
 23  use crate::util::time::NanoTimestamp;
 24  
 25  /// Struct representing metering configuration parameters.
 26  #[derive(Clone, Debug)]
 27  pub struct MeteringConfiguration {
 28      /// Defines the threshold after which rate limit kicks in.
 29      /// Set to 0 for no threshold.
 30      ///
 31      /// If we don't use raw count as our metric, it should be calculated
 32      /// by multiplying the median increase of the measured item with the
 33      /// "max" number of items we want before rate limit starts.
 34      /// For example, if we measure some item that increases our total
 35      /// measurement by ~5 and want to rate limit after about 10, this
 36      /// should be set as 50.
 37      pub threshold: u64,
 38      /// Sleep time for each unit over the threshold, in milliseconds.
 39      ///
 40      /// This is used to calculate sleep time when ratelimit is active.
 41      /// The computed sleep time when we are over the threshold will be:
 42      ///     sleep_time = (total - threshold) * sleep_step
 43      pub sleep_step: u64,
 44      /// Parameter defining the expiration of each item, for time based
 45      /// decay, in nano seconds. Set to 0 for no expiration.
 46      pub expiry_time: NanoTimestamp,
 47  }
 48  
 49  impl MeteringConfiguration {
 50      /// Generate a new `MeteringConfiguration` for provided threshold,
 51      /// sleep step and expiration time (seconds).
 52      pub fn new(threshold: u64, sleep_step: u64, expiry_time: u128) -> Self {
 53          Self { threshold, sleep_step, expiry_time: NanoTimestamp::from_secs(expiry_time) }
 54      }
 55  }
 56  
 57  impl Default for MeteringConfiguration {
 58      fn default() -> Self {
 59          Self { threshold: 0, sleep_step: 0, expiry_time: NanoTimestamp(0) }
 60      }
 61  }
 62  
 63  /// Default `MeteringConfiguration` as a constant,
 64  /// so it can be used in trait macros.
 65  pub const DEFAULT_METERING_CONFIGURATION: MeteringConfiguration =
 66      MeteringConfiguration { threshold: 0, sleep_step: 0, expiry_time: NanoTimestamp(0) };
 67  
 68  /// Struct to keep track of some sequential metered actions and compute
 69  /// rate limits.
 70  ///
 71  /// The queue uses a time based decay and prunes metering information
 72  /// after corresponding expiration time has passed.
 73  #[derive(Debug)]
 74  pub struct MeteringQueue {
 75      /// Metering configuration of the queue.
 76      config: MeteringConfiguration,
 77      /// Ring buffer keeping track of action execution timestamp and
 78      /// its metered value.
 79      queue: VecDeque<(NanoTimestamp, u64)>,
 80  }
 81  
 82  impl MeteringQueue {
 83      /// Generate a new `MeteringQueue` for provided `MeteringConfiguration`.
 84      pub fn new(config: MeteringConfiguration) -> Self {
 85          Self { config, queue: VecDeque::new() }
 86      }
 87  
 88      /// Prune expired metering information from the queue.
 89      pub fn clean(&mut self) {
 90          // Check if expiration has been set
 91          if self.config.expiry_time.0 == 0 {
 92              return
 93          }
 94  
 95          // Iterate the queue to cleanup expired elements
 96          while let Some((ts, _)) = self.queue.front() {
 97              // This is an edge case where system reports a future timestamp
 98              // therefore elapsed computation fails.
 99              let Ok(elapsed) = ts.elapsed() else {
100                  debug!(target: "net::metering::MeteringQueue::clean()", "Timestamp [{ts}] is in future. Removing...");
101                  let _ = self.queue.pop_front();
102                  continue
103              };
104  
105              // Check if elapsed time is over the expiration limit
106              if elapsed < self.config.expiry_time {
107                  break
108              }
109  
110              // Remove element
111              let _ = self.queue.pop_front();
112          }
113      }
114  
115      /// Add new metering value to the queue, after
116      /// prunning expired metering information.
117      /// If no threshold has been set, the insert is
118      /// ignored.
119      pub fn push(&mut self, value: &u64) {
120          // Check if threshold has been set
121          if self.config.threshold == 0 {
122              return
123          }
124  
125          // Prune expired elements
126          self.clean();
127  
128          // Push the new value
129          self.queue.push_back((NanoTimestamp::current_time(), *value));
130      }
131  
132      /// Compute the current metered values total.
133      pub fn total(&self) -> u64 {
134          let mut total = 0;
135          for (_, value) in &self.queue {
136              total += value;
137          }
138          total
139      }
140  
141      /// Compute sleep time for current metered values total, based on
142      /// the metering configuration.
143      ///
144      /// The sleep time increases linearly, based on configuration sleep
145      /// step. For example, in a raw count metering model, if we set the
146      /// configuration with threshold = 6 and sleep_step = 250, when
147      /// total = 10, returned sleep time will be 1000 ms.
148      ///
149      /// Sleep times table for the above example:
150      ///
151      /// | Total | Sleep Time (ms) |
152      /// |-------|-----------------|
153      /// | 0     | 0               |
154      /// | 4     | 0               |
155      /// | 6     | 0               |
156      /// | 7     | 250             |
157      /// | 8     | 500             |
158      /// | 9     | 750             |
159      /// | 10    | 1000            |
160      /// | 14    | 2000            |
161      /// | 18    | 3000            |
162      pub fn sleep_time(&self) -> Option<u64> {
163          // Check if threshold has been set
164          if self.config.threshold == 0 {
165              return None
166          }
167  
168          // Check if we are over the threshold
169          let total = self.total();
170          if total < self.config.threshold {
171              return None
172          }
173  
174          // Compute the actual sleep time
175          Some((total - self.config.threshold) * self.config.sleep_step)
176      }
177  }
178  
179  #[test]
180  fn test_net_metering_queue_default() {
181      let mut queue = MeteringQueue::new(MeteringConfiguration::default());
182      for _ in 0..100 {
183          queue.push(&1);
184          assert!(queue.queue.is_empty());
185          assert_eq!(queue.total(), 0);
186          assert!(queue.sleep_time().is_none());
187      }
188  }
189  
190  #[test]
191  fn test_net_metering_queue_raw_count() {
192      let threshold = 6;
193      let sleep_step = 250;
194      let metering_configuration = MeteringConfiguration::new(threshold, sleep_step, 0);
195      let mut queue = MeteringQueue::new(metering_configuration);
196      for i in 1..threshold {
197          queue.push(&1);
198          assert_eq!(queue.total(), i);
199          assert!(queue.sleep_time().is_none());
200      }
201      for i in threshold..100 {
202          queue.push(&1);
203          assert_eq!(queue.total(), i);
204          assert_eq!(queue.sleep_time(), Some((i - threshold) * sleep_step));
205      }
206  }
207  
208  #[test]
209  fn test_net_metering_queue_sleep_time() {
210      let metered_value_median = 5;
211      let threshold_items = 10;
212      let threshold = metered_value_median * threshold_items;
213      let sleep_step = 50;
214      let metering_configuration = MeteringConfiguration::new(threshold, sleep_step, 0);
215      let mut queue = MeteringQueue::new(metering_configuration);
216      for i in 1..threshold_items {
217          queue.push(&metered_value_median);
218          assert_eq!(queue.total(), (i * metered_value_median));
219          assert!(queue.sleep_time().is_none());
220      }
221      for i in threshold_items..100 {
222          queue.push(&metered_value_median);
223          let expected_total = i * metered_value_median;
224          assert_eq!(queue.total(), expected_total);
225          assert_eq!(queue.sleep_time(), Some((expected_total - threshold) * sleep_step));
226      }
227  }