/ abzu-router / src / reputation.rs
reputation.rs
  1  //! Peer Reputation System
  2  //!
  3  //! Provides Byzantine Fault Tolerance by tracking peer behavior and 
  4  //! penalizing misbehaving nodes. This addresses the "honest-but-curious"
  5  //! assumption by detecting and isolating actively malicious peers.
  6  //!
  7  //! **Behaviors Tracked**:
  8  //! - Routing success/failure (did they forward our packets?)
  9  //! - Message verification (did they send valid, signed messages?)
 10  //! - Spam detection (are they flooding the network?)
 11  //! - Availability (do they respond to keepalives?)
 12  //!
 13  //! **Penalties**:
 14  //! - Low reputation peers are deprioritized in routing
 15  //! - Severely misbehaving peers are temporarily banned
 16  //! - Reputation recovers slowly over time (forgiveness)
 17  
 18  use serde::{Deserialize, Serialize};
 19  use std::collections::HashMap;
 20  
/// Peer identifier
///
/// A fixed 32-byte key; the tracker uses it as the lookup key for per-peer
/// reputation entries.
pub type PeerKey = [u8; 32];

/// Reputation score bounds
///
/// Lowest score a peer can hold; adjusted scores are clamped to this floor.
pub const REPUTATION_MIN: i32 = -1000;
/// Highest score a peer can hold; adjusted scores are clamped to this ceiling.
pub const REPUTATION_MAX: i32 = 1000;
/// Score given to a peer when its reputation entry is first created.
pub const REPUTATION_INITIAL: i32 = 100;

/// Threshold below which peer is temporarily banned
///
/// The comparison is strict: a score exactly equal to the threshold does not
/// trigger a ban.
pub const REPUTATION_BAN_THRESHOLD: i32 = -500;

/// Reputation recovery rate (points per minute)
///
/// Only whole elapsed minutes are counted when recovery is applied.
pub const REPUTATION_RECOVERY_RATE: i32 = 1;
 34  
 35  /// Score adjustments for various behaviors
 36  #[derive(Debug, Clone, Copy)]
 37  pub struct ReputationConfig {
 38      /// Reward for successfully forwarding a packet
 39      pub route_success: i32,
 40      /// Penalty for failing to forward when expected
 41      pub route_failure: i32,
 42      /// Penalty for sending invalid/malformed message
 43      pub invalid_message: i32,
 44      /// Penalty for excessive message rate (spam)
 45      pub spam_penalty: i32,
 46      /// Reward for responding to keepalive
 47      pub keepalive_response: i32,
 48      /// Penalty for missing keepalive
 49      pub keepalive_timeout: i32,
 50      /// Penalty for replaying old messages
 51      pub replay_attempt: i32,
 52  }
 53  
 54  impl Default for ReputationConfig {
 55      fn default() -> Self {
 56          Self {
 57              route_success: 1,
 58              route_failure: -10,
 59              invalid_message: -50,
 60              spam_penalty: -20,
 61              keepalive_response: 1,
 62              keepalive_timeout: -5,
 63              replay_attempt: -100,
 64          }
 65      }
 66  }
 67  
/// Reputation entry for a single peer
///
/// Holds the peer's current score, a few lifetime counters, and the
/// timestamps needed for time-based recovery and ban expiry. All timestamps
/// are unix milliseconds supplied by the caller. The type is serializable, so
/// field names and order are part of the serialized format.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerReputation {
    /// Current reputation score
    pub score: i32,
    /// Total successful routes through this peer
    pub routes_success: u64,
    /// Total failed routes through this peer
    pub routes_failed: u64,
    /// Number of invalid messages received
    pub invalid_messages: u64,
    /// When we first saw this peer (unix millis)
    pub first_seen: u64,
    /// When we last adjusted their score (unix millis); recovery measures
    /// elapsed time from this point
    pub last_update: u64,
    /// If banned, when the ban expires (unix millis); the ban is active only
    /// while the current time is strictly before this value
    pub banned_until: Option<u64>,
}
 86  
 87  impl PeerReputation {
 88      /// Create a new reputation entry
 89      pub fn new(now_ms: u64) -> Self {
 90          Self {
 91              score: REPUTATION_INITIAL,
 92              routes_success: 0,
 93              routes_failed: 0,
 94              invalid_messages: 0,
 95              first_seen: now_ms,
 96              last_update: now_ms,
 97              banned_until: None,
 98          }
 99      }
100  
101      /// Check if peer is currently banned
102      pub fn is_banned(&self, now_ms: u64) -> bool {
103          self.banned_until.is_some_and(|until| now_ms < until)
104      }
105  
106      /// Apply reputation recovery over time
107      pub fn apply_recovery(&mut self, now_ms: u64) {
108          let elapsed_mins = (now_ms.saturating_sub(self.last_update)) / 60_000;
109          if elapsed_mins > 0 {
110              let recovery = (elapsed_mins as i32).saturating_mul(REPUTATION_RECOVERY_RATE);
111              self.score = (self.score + recovery).clamp(REPUTATION_MIN, REPUTATION_MAX);
112              self.last_update = now_ms;
113          }
114      }
115  }
116  
/// Reputation tracker for all peers
///
/// Owns one [`PeerReputation`] per known peer plus the scoring table used to
/// adjust them. Entries are created lazily the first time an event is recorded
/// for a peer. The derived `Default` yields an empty tracker with the default
/// [`ReputationConfig`].
#[derive(Debug, Clone, Default)]
pub struct ReputationTracker {
    /// Reputation entries by peer key
    peers: HashMap<PeerKey, PeerReputation>,
    /// Configuration: the signed score delta applied for each kind of event
    config: ReputationConfig,
}
125  
126  impl ReputationTracker {
127      /// Create a new tracker
128      pub fn new() -> Self {
129          Self::default()
130      }
131  
132      /// Create with custom config
133      pub fn with_config(config: ReputationConfig) -> Self {
134          Self {
135              peers: HashMap::new(),
136              config,
137          }
138      }
139  
140      /// Get or create reputation entry for a peer
141      fn get_or_create(&mut self, peer: &PeerKey, now_ms: u64) -> &mut PeerReputation {
142          self.peers
143              .entry(*peer)
144              .or_insert_with(|| PeerReputation::new(now_ms))
145      }
146  
147      /// Record a successful route through a peer
148      pub fn route_success(&mut self, peer: &PeerKey, now_ms: u64) {
149          let adjustment = self.config.route_success;
150          let rep = self.get_or_create(peer, now_ms);
151          rep.routes_success += 1;
152          rep.score = (rep.score + adjustment).clamp(REPUTATION_MIN, REPUTATION_MAX);
153          rep.last_update = now_ms;
154      }
155  
156      /// Record a failed route through a peer
157      pub fn route_failure(&mut self, peer: &PeerKey, now_ms: u64) {
158          let adjustment = self.config.route_failure;
159          let rep = self.get_or_create(peer, now_ms);
160          rep.routes_failed += 1;
161          rep.score = (rep.score + adjustment).clamp(REPUTATION_MIN, REPUTATION_MAX);
162          rep.last_update = now_ms;
163          self.check_ban(peer, now_ms);
164      }
165  
166      /// Record an invalid message from a peer
167      pub fn invalid_message(&mut self, peer: &PeerKey, now_ms: u64) {
168          let adjustment = self.config.invalid_message;
169          let rep = self.get_or_create(peer, now_ms);
170          rep.invalid_messages += 1;
171          rep.score = (rep.score + adjustment).clamp(REPUTATION_MIN, REPUTATION_MAX);
172          rep.last_update = now_ms;
173          self.check_ban(peer, now_ms);
174      }
175  
176      /// Record spam behavior from a peer
177      pub fn spam_detected(&mut self, peer: &PeerKey, now_ms: u64) {
178          let adjustment = self.config.spam_penalty;
179          let rep = self.get_or_create(peer, now_ms);
180          rep.score = (rep.score + adjustment).clamp(REPUTATION_MIN, REPUTATION_MAX);
181          rep.last_update = now_ms;
182          self.check_ban(peer, now_ms);
183      }
184  
185      /// Record a keepalive response
186      pub fn keepalive_received(&mut self, peer: &PeerKey, now_ms: u64) {
187          let adjustment = self.config.keepalive_response;
188          let rep = self.get_or_create(peer, now_ms);
189          rep.score = (rep.score + adjustment).clamp(REPUTATION_MIN, REPUTATION_MAX);
190          rep.last_update = now_ms;
191      }
192  
193      /// Record a keepalive timeout
194      pub fn keepalive_timeout(&mut self, peer: &PeerKey, now_ms: u64) {
195          let adjustment = self.config.keepalive_timeout;
196          let rep = self.get_or_create(peer, now_ms);
197          rep.score = (rep.score + adjustment).clamp(REPUTATION_MIN, REPUTATION_MAX);
198          rep.last_update = now_ms;
199      }
200  
201      /// Record a replay attempt
202      pub fn replay_attempt(&mut self, peer: &PeerKey, now_ms: u64) {
203          let adjustment = self.config.replay_attempt;
204          let rep = self.get_or_create(peer, now_ms);
205          rep.score = (rep.score + adjustment).clamp(REPUTATION_MIN, REPUTATION_MAX);
206          rep.last_update = now_ms;
207          self.check_ban(peer, now_ms);
208      }
209  
210      /// Check if peer should be banned
211      fn check_ban(&mut self, peer: &PeerKey, now_ms: u64) {
212          if let Some(rep) = self.peers.get_mut(peer)
213              && rep.score < REPUTATION_BAN_THRESHOLD && rep.banned_until.is_none() {
214                  // Ban for 1 hour
215                  rep.banned_until = Some(now_ms + 3_600_000);
216              }
217      }
218  
219      /// Check if a peer is currently banned
220      pub fn is_banned(&self, peer: &PeerKey, now_ms: u64) -> bool {
221          self.peers
222              .get(peer)
223              .is_some_and(|rep| rep.is_banned(now_ms))
224      }
225  
226      /// Get reputation score for a peer
227      pub fn get_score(&self, peer: &PeerKey) -> Option<i32> {
228          self.peers.get(peer).map(|rep| rep.score)
229      }
230  
231      /// Get full reputation info for a peer
232      pub fn get_reputation(&self, peer: &PeerKey) -> Option<&PeerReputation> {
233          self.peers.get(peer)
234      }
235  
236      /// Apply reputation recovery to all peers
237      pub fn apply_all_recovery(&mut self, now_ms: u64) {
238          for rep in self.peers.values_mut() {
239              rep.apply_recovery(now_ms);
240          }
241      }
242  
243      /// Remove expired bans
244      pub fn clear_expired_bans(&mut self, now_ms: u64) {
245          for rep in self.peers.values_mut() {
246              if rep.banned_until.is_some_and(|until| now_ms >= until) {
247                  rep.banned_until = None;
248              }
249          }
250      }
251  
252      /// Get peers sorted by reputation (best first)
253      pub fn peers_by_reputation(&self) -> Vec<(PeerKey, i32)> {
254          let mut peers: Vec<_> = self.peers
255              .iter()
256              .map(|(k, v)| (*k, v.score))
257              .collect();
258          peers.sort_by(|a, b| b.1.cmp(&a.1));
259          peers
260      }
261  
262      /// Get statistics
263      pub fn stats(&self) -> ReputationStats {
264          let banned_count = self.peers.values().filter(|r| r.banned_until.is_some()).count();
265          let avg_score = if self.peers.is_empty() {
266              0
267          } else {
268              self.peers.values().map(|r| r.score as i64).sum::<i64>() / self.peers.len() as i64
269          };
270          
271          ReputationStats {
272              tracked_peers: self.peers.len(),
273              banned_peers: banned_count,
274              avg_score: avg_score as i32,
275          }
276      }
277  }
278  
/// Reputation system statistics
///
/// A point-in-time summary produced by `ReputationTracker::stats`. Derives
/// `PartialEq`/`Eq` so snapshots can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReputationStats {
    /// Number of peers that have a reputation entry
    pub tracked_peers: usize,
    /// Number of entries with a ban recorded; this includes bans that have
    /// expired but have not been cleared yet
    pub banned_peers: usize,
    /// Integer mean of all tracked scores, or 0 when no peers are tracked
    pub avg_score: i32,
}
286  
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a distinct peer key whose first byte is `seed`.
    fn peer(seed: u8) -> PeerKey {
        let mut key = [0u8; 32];
        key[0] = seed;
        key
    }

    /// Feeds `count` invalid messages from `p` into the tracker at time `now_ms`.
    fn send_invalid(tracker: &mut ReputationTracker, p: &PeerKey, count: usize, now_ms: u64) {
        for _ in 0..count {
            tracker.invalid_message(p, now_ms);
        }
    }

    #[test]
    fn test_initial_reputation() {
        let mut tracker = ReputationTracker::new();
        let p = peer(1);

        // A peer has no entry until its first recorded event.
        assert_eq!(tracker.get_score(&p), None);

        tracker.keepalive_received(&p, 1000);

        // Initial score plus the default keepalive reward of 1.
        assert_eq!(tracker.get_score(&p), Some(REPUTATION_INITIAL + 1));
    }

    #[test]
    fn test_ban_threshold() {
        let mut tracker = ReputationTracker::new();
        let p = peer(1);
        assert!(!tracker.is_banned(&p, 1000));

        // Send many invalid messages to trigger ban
        send_invalid(&mut tracker, &p, 15, 1000);

        assert!(tracker.is_banned(&p, 1000));
        // 100 + 15 * (-50) with the default config.
        assert_eq!(tracker.get_score(&p), Some(-650));
        assert_eq!(tracker.stats().banned_peers, 1);
    }

    #[test]
    fn test_threshold_is_strict() {
        let mut tracker = ReputationTracker::new();
        let p = peer(1);

        // 100 + 12 * (-50) lands exactly on the threshold, which must not ban.
        send_invalid(&mut tracker, &p, 12, 1000);
        assert_eq!(tracker.get_score(&p), Some(REPUTATION_BAN_THRESHOLD));
        assert!(!tracker.is_banned(&p, 1000));

        // One more message drops the score below it.
        send_invalid(&mut tracker, &p, 1, 1000);
        assert!(tracker.is_banned(&p, 1000));
    }

    #[test]
    fn test_ban_expires_after_one_hour() {
        let mut tracker = ReputationTracker::new();
        let p = peer(1);
        send_invalid(&mut tracker, &p, 15, 1000);

        let expiry = 1000 + 3_600_000;
        assert!(tracker.is_banned(&p, expiry - 1));
        assert!(!tracker.is_banned(&p, expiry));

        tracker.clear_expired_bans(expiry);
        assert_eq!(tracker.get_reputation(&p).unwrap().banned_until, None);
        assert_eq!(tracker.stats().banned_peers, 0);
    }

    #[test]
    fn test_recovery() {
        let mut tracker = ReputationTracker::new();
        let p = peer(1);

        // Damage reputation
        tracker.invalid_message(&p, 0);
        let damaged = tracker.get_score(&p).unwrap();

        // Wait 5 minutes
        tracker.apply_all_recovery(300_000);

        let recovered = tracker.get_score(&p).unwrap();
        assert_eq!(recovered, damaged + 5 * REPUTATION_RECOVERY_RATE);
    }

    #[test]
    fn test_sorting() {
        let mut tracker = ReputationTracker::new();

        tracker.keepalive_received(&peer(1), 0);
        tracker.keepalive_received(&peer(1), 0);
        tracker.invalid_message(&peer(2), 0);

        let sorted = tracker.peers_by_reputation();

        // Best first: peer 1 gained two points, peer 2 lost fifty.
        assert_eq!(sorted, vec![(peer(1), 102), (peer(2), 50)]);
    }
}