topic_filter.rs
1 //! Transport-agnostic topic filter used by the app as a fast allowlist. 2 use std::collections::HashSet; 3 4 use tokio::sync::RwLock; 5 6 use crate::ds::SUBTOPICS; 7 8 #[derive(Debug, Clone, PartialEq, Eq, Hash)] 9 pub struct TopicKey { 10 pub group_id: String, 11 pub subtopic: String, 12 } 13 14 impl TopicKey { 15 pub fn new(group_id: &str, subtopic: &str) -> Self { 16 Self { 17 group_id: group_id.to_string(), 18 subtopic: subtopic.to_string(), 19 } 20 } 21 } 22 23 /// Fast allowlist for inbound routing. 24 #[derive(Default, Debug)] 25 pub struct TopicFilter { 26 set: RwLock<HashSet<TopicKey>>, 27 } 28 29 impl TopicFilter { 30 pub fn new() -> Self { 31 Self::default() 32 } 33 34 /// Add all subtopics for a group. 35 pub async fn add_many(&self, group_name: &str) { 36 let mut w = self.set.write().await; 37 for sub in SUBTOPICS { 38 w.insert(TopicKey::new(group_name, sub)); 39 } 40 } 41 42 /// Remove all subtopics for a group. 43 pub async fn remove_many(&self, group_name: &str) { 44 self.set.write().await.retain(|x| x.group_id != group_name); 45 } 46 47 /// Membership test (first-stage filter). 48 #[inline] 49 pub async fn contains(&self, group_id: &str, subtopic: &str) -> bool { 50 let key = TopicKey::new(group_id, subtopic); 51 self.set.read().await.contains(&key) 52 } 53 54 pub async fn snapshot(&self) -> Vec<TopicKey> { 55 self.set.read().await.iter().cloned().collect() 56 } 57 }