publisher.rs
1 /* This file is part of DarkFi (https://dark.fi) 2 * 3 * Copyright (C) 2020-2025 Dyne.org foundation 4 * 5 * This program is free software: you can redistribute it and/or modify 6 * it under the terms of the GNU Affero General Public License as 7 * published by the Free Software Foundation, either version 3 of the 8 * License, or (at your option) any later version. 9 * 10 * This program is distributed in the hope that it will be useful, 11 * but WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 * GNU Affero General Public License for more details. 14 * 15 * You should have received a copy of the GNU Affero General Public License 16 * along with this program. If not, see <https://www.gnu.org/licenses/>. 17 */ 18 19 use std::{collections::HashMap, sync::Arc}; 20 21 use rand::{rngs::OsRng, Rng}; 22 use smol::lock::Mutex; 23 use tracing::warn; 24 25 pub type PublisherPtr<T> = Arc<Publisher<T>>; 26 pub type SubscriptionId = usize; 27 28 #[derive(Debug)] 29 /// Subscription to the Publisher. Created using `publisher.subscribe().await`. 30 pub struct Subscription<T> { 31 id: SubscriptionId, 32 recv_queue: smol::channel::Receiver<T>, 33 parent: Arc<Publisher<T>>, 34 } 35 36 impl<T: Clone> Subscription<T> { 37 pub fn get_id(&self) -> SubscriptionId { 38 self.id 39 } 40 41 /// Receive message. 42 pub async fn receive(&self) -> T { 43 let message_result = self.recv_queue.recv().await; 44 45 match message_result { 46 Ok(message_result) => message_result, 47 Err(err) => { 48 panic!("Subscription::receive() recv_queue failed! {err}"); 49 } 50 } 51 } 52 53 /// Must be called manually since async Drop is not possible in Rust 54 pub async fn unsubscribe(&self) { 55 self.parent.clone().unsubscribe(self.id).await 56 } 57 } 58 59 /// Simple broadcast (publish-subscribe) class. 60 #[derive(Debug)] 61 pub struct Publisher<T> { 62 subs: Mutex<HashMap<SubscriptionId, smol::channel::Sender<T>>>, 63 } 64 65 impl<T: Clone> Publisher<T> { 66 /// Construct a new publisher. 67 pub fn new() -> Arc<Self> { 68 Arc::new(Self { subs: Mutex::new(HashMap::new()) }) 69 } 70 71 fn random_id() -> SubscriptionId { 72 OsRng.gen() 73 } 74 75 /// Make sure you call this method early in your setup. That way the subscription 76 /// will begin accumulating messages from notify. 77 /// Then when your main loop begins calling `sub.receive().await`, the messages will 78 /// already be queued. 79 pub async fn subscribe(self: Arc<Self>) -> Subscription<T> { 80 let (sender, recvr) = smol::channel::unbounded(); 81 82 // Poor-man's do/while 83 let mut subs = self.subs.lock().await; 84 let mut sub_id = Self::random_id(); 85 while subs.contains_key(&sub_id) { 86 sub_id = Self::random_id(); 87 } 88 89 subs.insert(sub_id, sender); 90 91 Subscription { id: sub_id, recv_queue: recvr, parent: self.clone() } 92 } 93 94 async fn unsubscribe(self: Arc<Self>, sub_id: SubscriptionId) { 95 self.subs.lock().await.remove(&sub_id); 96 } 97 98 /// Publish a message to all listening subscriptions. 99 pub async fn notify(&self, message_result: T) { 100 self.notify_with_exclude(message_result, &[]).await 101 } 102 103 /// Publish a message to all listening subscriptions but exclude some subset. 104 pub async fn notify_with_exclude(&self, message_result: T, exclude_list: &[SubscriptionId]) { 105 for (id, sub) in (*self.subs.lock().await).iter() { 106 if exclude_list.contains(id) { 107 continue 108 } 109 110 if let Err(e) = sub.send(message_result.clone()).await { 111 warn!( 112 target: "system::publisher", 113 "[system::publisher] Error returned sending message in notify_with_exclude() call! {e}" 114 ); 115 } 116 } 117 } 118 }