/ bin / app / src / util / rt.rs
rt.rs
 1  /* This file is part of DarkFi (https://dark.fi)
 2   *
 3   * Copyright (C) 2020-2025 Dyne.org foundation
 4   *
 5   * This program is free software: you can redistribute it and/or modify
 6   * it under the terms of the GNU Affero General Public License as
 7   * published by the Free Software Foundation, either version 3 of the
 8   * License, or (at your option) any later version.
 9   *
10   * This program is distributed in the hope that it will be useful,
11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13   * GNU Affero General Public License for more details.
14   *
15   * You should have received a copy of the GNU Affero General Public License
16   * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17   */
18  
19  use async_channel::{Receiver, Sender};
20  use parking_lot::Mutex as SyncMutex;
21  use smol::Task;
22  use std::{sync::Arc, thread};
23  
24  macro_rules! d { ($($arg:tt)*) => { debug!(target: "rt", $($arg)*); } }
25  macro_rules! t { ($($arg:tt)*) => { trace!(target: "rt", $($arg)*); } }
26  
27  pub type ExecutorPtr = Arc<smol::Executor<'static>>;
28  
29  pub struct AsyncRuntime {
30      name: &'static str,
31      signal: Sender<()>,
32      shutdown: Receiver<()>,
33      exec_threadpool: SyncMutex<Vec<thread::JoinHandle<()>>>,
34      ex: ExecutorPtr,
35      tasks: SyncMutex<Vec<Task<()>>>,
36  }
37  
38  impl AsyncRuntime {
39      pub fn new(ex: ExecutorPtr, name: &'static str) -> Self {
40          let (signal, shutdown) = async_channel::unbounded::<()>();
41  
42          Self {
43              name,
44              signal,
45              shutdown,
46              exec_threadpool: SyncMutex::new(vec![]),
47              ex,
48              tasks: SyncMutex::new(vec![]),
49          }
50      }
51  
52      pub fn start(&self) {
53          let n_threads = thread::available_parallelism().unwrap().get();
54          self.start_with_count(n_threads);
55      }
56  
57      pub fn start_with_count(&self, n_threads: usize) {
58          let mut exec_threadpool = Vec::with_capacity(n_threads);
59          // N executor threads
60          for _ in 0..n_threads {
61              let shutdown = self.shutdown.clone();
62              let ex = self.ex.clone();
63  
64              let handle = thread::spawn(move || {
65                  let _ = smol::future::block_on(ex.run(shutdown.recv()));
66              });
67              exec_threadpool.push(handle);
68          }
69          *self.exec_threadpool.lock() = exec_threadpool;
70          info!(target: "rt", "[{}] Started runtime [{n_threads} threads]", self.name);
71      }
72  
73      pub fn push_task(&self, task: Task<()>) {
74          self.tasks.lock().push(task);
75      }
76  
77      pub fn stop(&self) {
78          let exec_threadpool = std::mem::take(&mut *self.exec_threadpool.lock());
79  
80          d!("[{}] Stopping async runtime...", self.name);
81          // Just drop all the tasks without waiting for them to finish.
82          self.tasks.lock().clear();
83  
84          for _ in &exec_threadpool {
85              self.signal.try_send(()).unwrap();
86          }
87  
88          for handle in exec_threadpool {
89              handle.join().unwrap();
90          }
91  
92          t!("[{}] Stopped runtime", self.name);
93      }
94  }