pubsub.rs
1 /* This file is part of DarkFi (https://dark.fi) 2 * 3 * Copyright (C) 2020-2025 Dyne.org foundation 4 * 5 * This program is free software: you can redistribute it and/or modify 6 * it under the terms of the GNU Affero General Public License as 7 * published by the Free Software Foundation, either version 3 of the 8 * License, or (at your option) any later version. 9 * 10 * This program is distributed in the hope that it will be useful, 11 * but WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 * GNU Affero General Public License for more details. 14 * 15 * You should have received a copy of the GNU Affero General Public License 16 * along with this program. If not, see <https://www.gnu.org/licenses/>. 17 */ 18 19 use rand::{rngs::OsRng, Rng}; 20 use std::{ 21 collections::HashMap, 22 sync::{Arc, Mutex}, 23 }; 24 25 use crate::error::{Error, Result}; 26 27 pub type SubscriptionId = usize; 28 29 // Waiting for trait aliases 30 pub trait Piped: Clone + Send + 'static {} 31 impl<T> Piped for T where T: Clone + Send + 'static {} 32 33 #[derive(Debug)] 34 /// Subscription to the Publisher. Created using `publisher.subscribe().await`. 35 pub struct Subscription<T: Piped> { 36 id: SubscriptionId, 37 recv_queue: smol::channel::Receiver<T>, 38 parent: Arc<Publisher<T>>, 39 } 40 41 impl<T: Piped> Subscription<T> { 42 pub fn get_id(&self) -> SubscriptionId { 43 self.id 44 } 45 46 /// Receive message. 47 pub async fn receive(&self) -> Result<T> { 48 let msg_result = self.recv_queue.recv().await; 49 msg_result.or(Err(Error::PublisherDestroyed)) 50 } 51 } 52 53 impl<T: Piped> Drop for Subscription<T> { 54 fn drop(&mut self) { 55 self.parent.unsubscribe(self.id) 56 } 57 } 58 59 pub type PublisherPtr<T> = Arc<Publisher<T>>; 60 61 #[derive(Debug)] 62 pub struct Publisher<T> { 63 subs: Mutex<HashMap<SubscriptionId, smol::channel::Sender<T>>>, 64 } 65 66 impl<T: Piped> Publisher<T> { 67 pub fn new() -> Arc<Self> { 68 Arc::new(Self { subs: Mutex::new(HashMap::new()) }) 69 } 70 71 pub fn subscribe(self: Arc<Self>) -> Subscription<T> { 72 let (sendr, recvr) = smol::channel::unbounded(); 73 let sub_id = OsRng.gen(); 74 // Optional to check whether this ID already exists. 75 // It is nearly impossible to ever happen. 76 self.subs.lock().unwrap().insert(sub_id, sendr); 77 78 Subscription { id: sub_id, recv_queue: recvr, parent: self.clone() } 79 } 80 81 fn unsubscribe(&self, sub_id: SubscriptionId) { 82 self.subs.lock().unwrap().remove(&sub_id); 83 } 84 85 /// Publish a message to subscriptions in the include list 86 pub fn notify_with_include(&self, message_result: T, include_list: &[SubscriptionId]) { 87 // Maybe we should just provide a method to get all IDs 88 // Then people can call notify_with_exclude() instead. 89 // TODO: just collect and clone directly into a Vec 90 let subs = self.subs.lock().unwrap().clone(); 91 for (id, sub) in subs.into_iter() { 92 if !include_list.contains(&id) { 93 continue 94 } 95 96 if let Err(e) = sub.try_send(message_result.clone()) { 97 panic!("[system::publisher] Error returned sending message in notify_with_include() call! {}", e); 98 } 99 } 100 } 101 102 /// Publish a message to all listening subscriptions. 103 pub fn notify(&self, msg: T) { 104 let subs = self.subs.lock().unwrap().clone(); 105 for (id, sub) in subs { 106 if let Err(e) = sub.try_send(msg.clone()) { 107 // This should never happen since Drop calls unsubscribe() 108 panic!("Error in notify() call for sub={}! {}", id, e); 109 } 110 } 111 } 112 }