/ src / system / publisher.rs
publisher.rs
  1  /* This file is part of DarkFi (https://dark.fi)
  2   *
  3   * Copyright (C) 2020-2025 Dyne.org foundation
  4   *
  5   * This program is free software: you can redistribute it and/or modify
  6   * it under the terms of the GNU Affero General Public License as
  7   * published by the Free Software Foundation, either version 3 of the
  8   * License, or (at your option) any later version.
  9   *
 10   * This program is distributed in the hope that it will be useful,
 11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
 12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 13   * GNU Affero General Public License for more details.
 14   *
 15   * You should have received a copy of the GNU Affero General Public License
 16   * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 17   */
 18  
 19  use std::{collections::HashMap, sync::Arc};
 20  
 21  use rand::{rngs::OsRng, Rng};
 22  use smol::lock::Mutex;
 23  use tracing::warn;
 24  
/// Shared, reference-counted handle to a [`Publisher`].
pub type PublisherPtr<T> = Arc<Publisher<T>>;
/// Identifier of a single subscription; used as the key in the publisher's
/// subscription table.
pub type SubscriptionId = usize;
 27  
#[derive(Debug)]
/// Subscription to the Publisher. Created using `publisher.subscribe().await`.
///
/// Holds the receiving half of the channel whose sending half is stored in
/// the parent publisher under `id`.
pub struct Subscription<T> {
    /// ID under which the parent publisher stores this subscription's sender.
    id: SubscriptionId,
    /// Receiving half of the channel that carries published messages.
    recv_queue: smol::channel::Receiver<T>,
    /// The publisher that created this subscription; needed by `unsubscribe()`.
    parent: Arc<Publisher<T>>,
}
 35  
 36  impl<T: Clone> Subscription<T> {
 37      pub fn get_id(&self) -> SubscriptionId {
 38          self.id
 39      }
 40  
 41      /// Receive message.
 42      pub async fn receive(&self) -> T {
 43          let message_result = self.recv_queue.recv().await;
 44  
 45          match message_result {
 46              Ok(message_result) => message_result,
 47              Err(err) => {
 48                  panic!("Subscription::receive() recv_queue failed! {err}");
 49              }
 50          }
 51      }
 52  
 53      /// Must be called manually since async Drop is not possible in Rust
 54      pub async fn unsubscribe(&self) {
 55          self.parent.clone().unsubscribe(self.id).await
 56      }
 57  }
 58  
/// Simple broadcast (publish-subscribe) class.
///
/// Every subscription owns the receiving half of its own channel; the
/// publisher keeps the matching sending halves and clones each published
/// message into all of them.
#[derive(Debug)]
pub struct Publisher<T> {
    /// Sending half of every registered subscription's channel, keyed by
    /// subscription ID and guarded by an async mutex.
    subs: Mutex<HashMap<SubscriptionId, smol::channel::Sender<T>>>,
}
 64  
 65  impl<T: Clone> Publisher<T> {
 66      /// Construct a new publisher.
 67      pub fn new() -> Arc<Self> {
 68          Arc::new(Self { subs: Mutex::new(HashMap::new()) })
 69      }
 70  
 71      fn random_id() -> SubscriptionId {
 72          OsRng.gen()
 73      }
 74  
 75      /// Make sure you call this method early in your setup. That way the subscription
 76      /// will begin accumulating messages from notify.
 77      /// Then when your main loop begins calling `sub.receive().await`, the messages will
 78      /// already be queued.
 79      pub async fn subscribe(self: Arc<Self>) -> Subscription<T> {
 80          let (sender, recvr) = smol::channel::unbounded();
 81  
 82          // Poor-man's do/while
 83          let mut subs = self.subs.lock().await;
 84          let mut sub_id = Self::random_id();
 85          while subs.contains_key(&sub_id) {
 86              sub_id = Self::random_id();
 87          }
 88  
 89          subs.insert(sub_id, sender);
 90  
 91          Subscription { id: sub_id, recv_queue: recvr, parent: self.clone() }
 92      }
 93  
 94      async fn unsubscribe(self: Arc<Self>, sub_id: SubscriptionId) {
 95          self.subs.lock().await.remove(&sub_id);
 96      }
 97  
 98      /// Publish a message to all listening subscriptions.
 99      pub async fn notify(&self, message_result: T) {
100          self.notify_with_exclude(message_result, &[]).await
101      }
102  
103      /// Publish a message to all listening subscriptions but exclude some subset.
104      pub async fn notify_with_exclude(&self, message_result: T, exclude_list: &[SubscriptionId]) {
105          for (id, sub) in (*self.subs.lock().await).iter() {
106              if exclude_list.contains(id) {
107                  continue
108              }
109  
110              if let Err(e) = sub.send(message_result.clone()).await {
111                  warn!(
112                      target: "system::publisher",
113                      "[system::publisher] Error returned sending message in notify_with_exclude() call! {e}"
114                  );
115              }
116          }
117      }
118  }