/ src / clock.rs
clock.rs
//! Clock abstraction.
//!
//! The pipeline’s dedupe uses processing time, not event time.
//! For tests, we want a deterministic clock that can be advanced manually.
 5  
 6  use std::sync::{Arc, Mutex};
 7  use std::time::{SystemTime, UNIX_EPOCH};
 8  
 9  /// A simple millisecond clock for processing-time logic.
10  pub trait Clock: Clone + Send + Sync + 'static {
11      fn now_ms(&self) -> u64;
12  }
13  
14  /// Uses system time (ms since UNIX epoch).
15  #[derive(Clone, Debug, Default)]
16  pub struct SystemClock;
17  
18  impl Clock for SystemClock {
19      fn now_ms(&self) -> u64 {
20          SystemTime::now()
21              .duration_since(UNIX_EPOCH)
22              .unwrap_or_default()
23              .as_millis() as u64
24      }
25  }
26  
27  /// Deterministic clock for tests (and for simulations).
28  #[derive(Clone, Debug)]
29  pub struct ManualClock {
30      inner_ms: Arc<Mutex<u64>>,
31  }
32  
33  impl ManualClock {
34      pub fn new(start_ms: u64) -> Self {
35          Self {
36              inner_ms: Arc::new(Mutex::new(start_ms)),
37          }
38      }
39  
40      pub fn advance_ms(&self, delta_ms: u64) {
41          let mut guard = self.inner_ms.lock().expect("manual clock poisoned");
42          *guard = guard.saturating_add(delta_ms);
43      }
44  
45      pub fn set_ms(&self, ms: u64) {
46          let mut guard = self.inner_ms.lock().expect("manual clock poisoned");
47          *guard = ms;
48      }
49  }
50  
51  impl Clock for ManualClock {
52      fn now_ms(&self) -> u64 {
53          *self.inner_ms.lock().expect("manual clock poisoned")
54      }
55  }