clock.rs
//! Clock abstraction.
//!
//! The pipeline's dedupe uses processing time, not event time.
//! For tests, we want a deterministic, manually-advanceable clock.

use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
use std::time::{SystemTime, UNIX_EPOCH};

/// A simple millisecond clock for processing-time logic.
pub trait Clock: Clone + Send + Sync + 'static {
    /// Returns the clock's current reading in milliseconds.
    fn now_ms(&self) -> u64;
}

/// Uses system time (ms since UNIX epoch).
#[derive(Clone, Debug, Default)]
pub struct SystemClock;

impl Clock for SystemClock {
    /// Returns the milliseconds elapsed since the UNIX epoch.
    ///
    /// Yields `0` when the system time is earlier than the epoch, and
    /// saturates at `u64::MAX` rather than wrapping if the millisecond count
    /// does not fit in a `u64`.
    fn now_ms(&self) -> u64 {
        let since_epoch = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default();
        // `as_millis` returns a `u128`; a plain `as u64` cast would silently
        // truncate, so convert with an explicit saturating fallback instead.
        u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
    }
}

/// Deterministic clock for tests (and for simulations).
///
/// Clones share the same underlying reading: advancing or setting one handle
/// is observed through every other handle cloned from it.
#[derive(Clone, Debug)]
pub struct ManualClock {
    inner_ms: Arc<Mutex<u64>>,
}

impl ManualClock {
    /// Creates a clock whose initial reading is `start_ms`.
    pub fn new(start_ms: u64) -> Self {
        Self {
            inner_ms: Arc::new(Mutex::new(start_ms)),
        }
    }

    /// Moves the clock forward by `delta_ms`, saturating at `u64::MAX`.
    pub fn advance_ms(&self, delta_ms: u64) {
        let mut now = self.lock();
        *now = now.saturating_add(delta_ms);
    }

    /// Overwrites the clock's reading with `ms`.
    ///
    /// No ordering is enforced: the new value may be lower than the current
    /// one.
    pub fn set_ms(&self, ms: u64) {
        *self.lock() = ms;
    }

    /// Locks the shared reading, recovering the guard if the mutex is poisoned.
    ///
    /// The protected value is a plain `u64` that is only ever replaced
    /// wholesale, so it cannot be observed half-updated; treating poison as
    /// fatal would only add a panic path to every clock operation.
    fn lock(&self) -> MutexGuard<'_, u64> {
        self.inner_ms
            .lock()
            .unwrap_or_else(PoisonError::into_inner)
    }
}

impl Clock for ManualClock {
    /// Returns the reading most recently established by `new`, `advance_ms`
    /// or `set_ms`.
    fn now_ms(&self) -> u64 {
        *self.lock()
    }
}