acceptor.rs
1 /* This file is part of DarkFi (https://dark.fi) 2 * 3 * Copyright (C) 2020-2025 Dyne.org foundation 4 * 5 * This program is free software: you can redistribute it and/or modify 6 * it under the terms of the GNU Affero General Public License as 7 * published by the Free Software Foundation, either version 3 of the 8 * License, or (at your option) any later version. 9 * 10 * This program is distributed in the hope that it will be useful, 11 * but WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 * GNU Affero General Public License for more details. 14 * 15 * You should have received a copy of the GNU Affero General Public License 16 * along with this program. If not, see <https://www.gnu.org/licenses/>. 17 */ 18 19 use std::{ 20 io::ErrorKind, 21 sync::{ 22 atomic::{AtomicUsize, Ordering::SeqCst}, 23 Arc, 24 }, 25 }; 26 27 use smol::Executor; 28 use tracing::{error, warn}; 29 use url::Url; 30 31 use super::{ 32 channel::{Channel, ChannelPtr}, 33 hosts::HostColor, 34 session::SessionWeakPtr, 35 transport::{Listener, PtListener}, 36 }; 37 use crate::{ 38 system::{CondVar, Publisher, PublisherPtr, StoppableTask, StoppableTaskPtr, Subscription}, 39 util::logger::verbose, 40 Error, Result, 41 }; 42 43 /// Atomic pointer to Acceptor 44 pub type AcceptorPtr = Arc<Acceptor>; 45 46 /// Create inbound socket connections 47 pub struct Acceptor { 48 channel_publisher: PublisherPtr<Result<ChannelPtr>>, 49 task: StoppableTaskPtr, 50 session: SessionWeakPtr, 51 conn_count: AtomicUsize, 52 } 53 54 impl Acceptor { 55 /// Create new Acceptor object. 56 pub fn new(session: SessionWeakPtr) -> AcceptorPtr { 57 Arc::new(Self { 58 channel_publisher: Publisher::new(), 59 task: StoppableTask::new(), 60 session, 61 conn_count: AtomicUsize::new(0), 62 }) 63 } 64 65 /// Start accepting inbound socket connections 66 pub async fn start(self: Arc<Self>, endpoint: Url, ex: Arc<Executor<'_>>) -> Result<()> { 67 let datastore = 68 self.session.upgrade().unwrap().p2p().settings().read().await.p2p_datastore.clone(); 69 70 // Initialize listener 71 let listener = Listener::new(endpoint.clone(), datastore).await?; 72 73 // Open socket 74 let ptlistener = listener.listen().await?; 75 76 #[cfg(feature = "p2p-tor")] 77 if endpoint.scheme() == "tor" { 78 let onion_addr = listener.endpoint().await; 79 verbose!("[P2P] Adding {onion_addr} to external_addrs"); 80 self.session 81 .upgrade() 82 .unwrap() 83 .p2p() 84 .settings() 85 .write() 86 .await 87 .external_addrs 88 .push(onion_addr); 89 } 90 91 self.accept(ptlistener, ex); 92 Ok(()) 93 } 94 95 /// Stop accepting inbound socket connections 96 pub async fn stop(&self) { 97 // Send stop signal 98 self.task.stop().await; 99 } 100 101 /// Start receiving network messages. 102 pub async fn subscribe(self: Arc<Self>) -> Subscription<Result<ChannelPtr>> { 103 self.channel_publisher.clone().subscribe().await 104 } 105 106 /// Run the accept loop in a new thread and error if a connection problem occurs 107 fn accept(self: Arc<Self>, listener: Box<dyn PtListener>, ex: Arc<Executor<'_>>) { 108 let self_ = self.clone(); 109 self.task.clone().start( 110 self.run_accept_loop(listener, ex.clone()), 111 |result| self_.handle_stop(result), 112 Error::NetworkServiceStopped, 113 ex, 114 ); 115 } 116 117 /// Run the accept loop. 118 async fn run_accept_loop( 119 self: Arc<Self>, 120 listener: Box<dyn PtListener>, 121 ex: Arc<Executor<'_>>, 122 ) -> Result<()> { 123 // CondVar used to notify the loop to recheck if new connections can 124 // be accepted by the listener. 125 let cv = Arc::new(CondVar::new()); 126 let hosts = self.session.upgrade().unwrap().p2p().hosts(); 127 128 loop { 129 // Refuse new connections if we're up to the connection limit 130 let limit = 131 self.session.upgrade().unwrap().p2p().settings().read().await.inbound_connections; 132 133 if self.clone().conn_count.load(SeqCst) >= limit { 134 // This will get notified every time an inbound channel is stopped. 135 // These channels are the channels spawned below on listener.next().is_ok(). 136 // After the notification, we reset the condvar and retry this loop to see 137 // if we can accept more connections, and if not - we'll be back here. 138 warn!(target: "net::acceptor::run_accept_loop()", "Reached incoming conn limit, waiting..."); 139 cv.wait().await; 140 cv.reset(); 141 continue 142 } 143 144 // Now we wait for a new connection. 145 match listener.next().await { 146 Ok((stream, url)) => { 147 // Check if we reject this peer 148 if hosts.container.contains(HostColor::Black as usize, &url) || 149 hosts.block_all_ports(&url) 150 { 151 warn!(target: "net::acceptor::run_accept_loop()", "Peer {url} is blacklisted"); 152 continue 153 } 154 155 // Create the new Channel. 156 let session = self.session.clone(); 157 let channel = Channel::new(stream, None, url, session, false).await; 158 159 // Increment the connection counter 160 self.conn_count.fetch_add(1, SeqCst); 161 162 // This task will subscribe on the new channel and decrement 163 // the connection counter. Along with that, it will notify 164 // the CondVar that might be waiting to allow new connections. 165 let self_ = self.clone(); 166 let channel_ = channel.clone(); 167 let cv_ = cv.clone(); 168 ex.spawn(async move { 169 let stop_sub = channel_.subscribe_stop().await?; 170 stop_sub.receive().await; 171 self_.conn_count.fetch_sub(1, SeqCst); 172 cv_.notify(); 173 Ok::<(), crate::Error>(()) 174 }) 175 .detach(); 176 177 // Finally, notify any publishers about the new channel. 178 self.channel_publisher.notify(Ok(channel)).await; 179 } 180 181 // As per accept(2) recommendation: 182 Err(e) if e.raw_os_error().is_some() => match e.raw_os_error().unwrap() { 183 libc::EAGAIN | libc::ECONNABORTED | libc::EPROTO | libc::EINTR => continue, 184 libc::ECONNRESET => { 185 warn!( 186 target: "net::acceptor::run_accept_loop()", 187 "[P2P] Connection reset by peer in accept_loop" 188 ); 189 continue 190 } 191 libc::ETIMEDOUT => { 192 warn!( 193 target: "net::acceptor::run_accept_loop()", 194 "[P2P] Connection timed out in accept_loop" 195 ); 196 continue 197 } 198 libc::EPIPE => { 199 warn!( 200 target: "net::acceptor::run_accept_loop()", 201 "[P2P] Broken pipe in accept_loop" 202 ); 203 continue 204 } 205 x => { 206 warn!( 207 target: "net::acceptor::run_accept_loop()", 208 "[P2P] Unhandled OS Error: {e} {x}" 209 ); 210 continue 211 212 /* 213 error!( 214 target: "net::acceptor::run_accept_loop()", 215 "[P2P] Acceptor failed listening: {e} ({x})" 216 ); 217 error!( 218 target: "net::acceptor::run_accept_loop()", 219 "[P2P] Closing listener loop" 220 ); 221 return Err(e.into()) 222 */ 223 } 224 }, 225 226 // In case a TLS handshake fails, we'll get this: 227 Err(e) if e.kind() == ErrorKind::UnexpectedEof => continue, 228 229 // Handle ErrorKind::Other 230 Err(e) if e.kind() == ErrorKind::Other => { 231 if let Some(inner) = std::error::Error::source(&e) { 232 if let Some(inner) = inner.downcast_ref::<futures_rustls::rustls::Error>() { 233 error!( 234 target: "net::acceptor::run_accept_loop()", 235 "[P2P] rustls listener error: {inner:?}" 236 ); 237 continue 238 } 239 } 240 241 error!( 242 target: "net::acceptor::run_accept_loop()", 243 "[P2P] Unhandled ErrorKind::Other error: {e:?}" 244 ); 245 return Err(e.into()) 246 } 247 248 // Errors we didn't handle above: 249 Err(e) => { 250 error!( 251 target: "net::acceptor::run_accept_loop()", 252 "[P2P] Unhandled listener.next() error: {e}" 253 ); 254 /* 255 error!( 256 target: "net::acceptor::run_accept_loop()", 257 "[P2P] Closing listener loop" 258 ); 259 return Err(e.into()) 260 */ 261 continue 262 } 263 } 264 } 265 } 266 267 /// Handles network errors. Panics if errors pass silently, otherwise broadcasts it 268 /// to all channel publishers. 269 async fn handle_stop(self: Arc<Self>, result: Result<()>) { 270 match result { 271 Ok(()) => panic!("Acceptor task should never complete without error status"), 272 Err(err) => self.channel_publisher.notify(Err(err)).await, 273 } 274 } 275 }