rt.rs
1 /* This file is part of DarkFi (https://dark.fi) 2 * 3 * Copyright (C) 2020-2025 Dyne.org foundation 4 * 5 * This program is free software: you can redistribute it and/or modify 6 * it under the terms of the GNU Affero General Public License as 7 * published by the Free Software Foundation, either version 3 of the 8 * License, or (at your option) any later version. 9 * 10 * This program is distributed in the hope that it will be useful, 11 * but WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 * GNU Affero General Public License for more details. 14 * 15 * You should have received a copy of the GNU Affero General Public License 16 * along with this program. If not, see <https://www.gnu.org/licenses/>. 17 */ 18 19 use async_channel::{Receiver, Sender}; 20 use parking_lot::Mutex as SyncMutex; 21 use smol::Task; 22 use std::{sync::Arc, thread}; 23 24 macro_rules! d { ($($arg:tt)*) => { debug!(target: "rt", $($arg)*); } } 25 macro_rules! t { ($($arg:tt)*) => { trace!(target: "rt", $($arg)*); } } 26 27 pub type ExecutorPtr = Arc<smol::Executor<'static>>; 28 29 pub struct AsyncRuntime { 30 name: &'static str, 31 signal: Sender<()>, 32 shutdown: Receiver<()>, 33 exec_threadpool: SyncMutex<Vec<thread::JoinHandle<()>>>, 34 ex: ExecutorPtr, 35 tasks: SyncMutex<Vec<Task<()>>>, 36 } 37 38 impl AsyncRuntime { 39 pub fn new(ex: ExecutorPtr, name: &'static str) -> Self { 40 let (signal, shutdown) = async_channel::unbounded::<()>(); 41 42 Self { 43 name, 44 signal, 45 shutdown, 46 exec_threadpool: SyncMutex::new(vec![]), 47 ex, 48 tasks: SyncMutex::new(vec![]), 49 } 50 } 51 52 pub fn start(&self) { 53 let n_threads = thread::available_parallelism().unwrap().get(); 54 self.start_with_count(n_threads); 55 } 56 57 pub fn start_with_count(&self, n_threads: usize) { 58 let mut exec_threadpool = Vec::with_capacity(n_threads); 59 // N executor threads 60 for _ in 0..n_threads { 61 let shutdown = self.shutdown.clone(); 62 let ex = self.ex.clone(); 63 64 let handle = thread::spawn(move || { 65 let _ = smol::future::block_on(ex.run(shutdown.recv())); 66 }); 67 exec_threadpool.push(handle); 68 } 69 *self.exec_threadpool.lock() = exec_threadpool; 70 info!(target: "rt", "[{}] Started runtime [{n_threads} threads]", self.name); 71 } 72 73 pub fn push_task(&self, task: Task<()>) { 74 self.tasks.lock().push(task); 75 } 76 77 pub fn stop(&self) { 78 let exec_threadpool = std::mem::take(&mut *self.exec_threadpool.lock()); 79 80 d!("[{}] Stopping async runtime...", self.name); 81 // Just drop all the tasks without waiting for them to finish. 82 self.tasks.lock().clear(); 83 84 for _ in &exec_threadpool { 85 self.signal.try_send(()).unwrap(); 86 } 87 88 for handle in exec_threadpool { 89 handle.join().unwrap(); 90 } 91 92 t!("[{}] Stopped runtime", self.name); 93 } 94 }