/ src / ds / topic_filter.rs
topic_filter.rs
 1  //! Transport-agnostic topic filter used by the app as a fast allowlist.
 2  use std::collections::HashSet;
 3  
 4  use tokio::sync::RwLock;
 5  
 6  use crate::ds::SUBTOPICS;
 7  
 8  #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 9  pub struct TopicKey {
10      pub group_id: String,
11      pub subtopic: String,
12  }
13  
14  impl TopicKey {
15      pub fn new(group_id: &str, subtopic: &str) -> Self {
16          Self {
17              group_id: group_id.to_string(),
18              subtopic: subtopic.to_string(),
19          }
20      }
21  }
22  
23  /// Fast allowlist for inbound routing.
24  #[derive(Default, Debug)]
25  pub struct TopicFilter {
26      set: RwLock<HashSet<TopicKey>>,
27  }
28  
29  impl TopicFilter {
30      pub fn new() -> Self {
31          Self::default()
32      }
33  
34      /// Add all subtopics for a group.
35      pub async fn add_many(&self, group_name: &str) {
36          let mut w = self.set.write().await;
37          for sub in SUBTOPICS {
38              w.insert(TopicKey::new(group_name, sub));
39          }
40      }
41  
42      /// Remove all subtopics for a group.
43      pub async fn remove_many(&self, group_name: &str) {
44          self.set.write().await.retain(|x| x.group_id != group_name);
45      }
46  
47      /// Membership test (first-stage filter).
48      #[inline]
49      pub async fn contains(&self, group_id: &str, subtopic: &str) -> bool {
50          let key = TopicKey::new(group_id, subtopic);
51          self.set.read().await.contains(&key)
52      }
53  
54      pub async fn snapshot(&self) -> Vec<TopicKey> {
55          self.set.read().await.iter().cloned().collect()
56      }
57  }