/ bin / app / src / pubsub.rs
pubsub.rs
  1  /* This file is part of DarkFi (https://dark.fi)
  2   *
  3   * Copyright (C) 2020-2025 Dyne.org foundation
  4   *
  5   * This program is free software: you can redistribute it and/or modify
  6   * it under the terms of the GNU Affero General Public License as
  7   * published by the Free Software Foundation, either version 3 of the
  8   * License, or (at your option) any later version.
  9   *
 10   * This program is distributed in the hope that it will be useful,
 11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
 12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 13   * GNU Affero General Public License for more details.
 14   *
 15   * You should have received a copy of the GNU Affero General Public License
 16   * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 17   */
 18  
 19  use rand::{rngs::OsRng, Rng};
 20  use std::{
 21      collections::HashMap,
 22      sync::{Arc, Mutex},
 23  };
 24  
 25  use crate::error::{Error, Result};
 26  
 27  pub type SubscriptionId = usize;
 28  
 29  // Waiting for trait aliases
 30  pub trait Piped: Clone + Send + 'static {}
 31  impl<T> Piped for T where T: Clone + Send + 'static {}
 32  
 33  #[derive(Debug)]
 34  /// Subscription to the Publisher. Created using `publisher.subscribe().await`.
 35  pub struct Subscription<T: Piped> {
 36      id: SubscriptionId,
 37      recv_queue: smol::channel::Receiver<T>,
 38      parent: Arc<Publisher<T>>,
 39  }
 40  
 41  impl<T: Piped> Subscription<T> {
 42      pub fn get_id(&self) -> SubscriptionId {
 43          self.id
 44      }
 45  
 46      /// Receive message.
 47      pub async fn receive(&self) -> Result<T> {
 48          let msg_result = self.recv_queue.recv().await;
 49          msg_result.or(Err(Error::PublisherDestroyed))
 50      }
 51  }
 52  
 53  impl<T: Piped> Drop for Subscription<T> {
 54      fn drop(&mut self) {
 55          self.parent.unsubscribe(self.id)
 56      }
 57  }
 58  
 59  pub type PublisherPtr<T> = Arc<Publisher<T>>;
 60  
 61  #[derive(Debug)]
 62  pub struct Publisher<T> {
 63      subs: Mutex<HashMap<SubscriptionId, smol::channel::Sender<T>>>,
 64  }
 65  
 66  impl<T: Piped> Publisher<T> {
 67      pub fn new() -> Arc<Self> {
 68          Arc::new(Self { subs: Mutex::new(HashMap::new()) })
 69      }
 70  
 71      pub fn subscribe(self: Arc<Self>) -> Subscription<T> {
 72          let (sendr, recvr) = smol::channel::unbounded();
 73          let sub_id = OsRng.gen();
 74          // Optional to check whether this ID already exists.
 75          // It is nearly impossible to ever happen.
 76          self.subs.lock().unwrap().insert(sub_id, sendr);
 77  
 78          Subscription { id: sub_id, recv_queue: recvr, parent: self.clone() }
 79      }
 80  
 81      fn unsubscribe(&self, sub_id: SubscriptionId) {
 82          self.subs.lock().unwrap().remove(&sub_id);
 83      }
 84  
 85      /// Publish a message to subscriptions in the include list
 86      pub fn notify_with_include(&self, message_result: T, include_list: &[SubscriptionId]) {
 87          // Maybe we should just provide a method to get all IDs
 88          // Then people can call notify_with_exclude() instead.
 89          // TODO: just collect and clone directly into a Vec
 90          let subs = self.subs.lock().unwrap().clone();
 91          for (id, sub) in subs.into_iter() {
 92              if !include_list.contains(&id) {
 93                  continue
 94              }
 95  
 96              if let Err(e) = sub.try_send(message_result.clone()) {
 97                  panic!("[system::publisher] Error returned sending message in notify_with_include() call! {}", e);
 98              }
 99          }
100      }
101  
102      /// Publish a message to all listening subscriptions.
103      pub fn notify(&self, msg: T) {
104          let subs = self.subs.lock().unwrap().clone();
105          for (id, sub) in subs {
106              if let Err(e) = sub.try_send(msg.clone()) {
107                  // This should never happen since Drop calls unsubscribe()
108                  panic!("Error in notify() call for sub={}! {}", id, e);
109              }
110          }
111      }
112  }