/ src / net / acceptor.rs
acceptor.rs
  1  /* This file is part of DarkFi (https://dark.fi)
  2   *
  3   * Copyright (C) 2020-2025 Dyne.org foundation
  4   *
  5   * This program is free software: you can redistribute it and/or modify
  6   * it under the terms of the GNU Affero General Public License as
  7   * published by the Free Software Foundation, either version 3 of the
  8   * License, or (at your option) any later version.
  9   *
 10   * This program is distributed in the hope that it will be useful,
 11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
 12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 13   * GNU Affero General Public License for more details.
 14   *
 15   * You should have received a copy of the GNU Affero General Public License
 16   * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 17   */
 18  
 19  use std::{
 20      io::ErrorKind,
 21      sync::{
 22          atomic::{AtomicUsize, Ordering::SeqCst},
 23          Arc,
 24      },
 25  };
 26  
 27  use smol::Executor;
 28  use tracing::{error, warn};
 29  use url::Url;
 30  
 31  use super::{
 32      channel::{Channel, ChannelPtr},
 33      hosts::HostColor,
 34      session::SessionWeakPtr,
 35      transport::{Listener, PtListener},
 36  };
 37  use crate::{
 38      system::{CondVar, Publisher, PublisherPtr, StoppableTask, StoppableTaskPtr, Subscription},
 39      util::logger::verbose,
 40      Error, Result,
 41  };
 42  
 43  /// Atomic pointer to Acceptor
 44  pub type AcceptorPtr = Arc<Acceptor>;
 45  
 46  /// Create inbound socket connections
 47  pub struct Acceptor {
 48      channel_publisher: PublisherPtr<Result<ChannelPtr>>,
 49      task: StoppableTaskPtr,
 50      session: SessionWeakPtr,
 51      conn_count: AtomicUsize,
 52  }
 53  
 54  impl Acceptor {
 55      /// Create new Acceptor object.
 56      pub fn new(session: SessionWeakPtr) -> AcceptorPtr {
 57          Arc::new(Self {
 58              channel_publisher: Publisher::new(),
 59              task: StoppableTask::new(),
 60              session,
 61              conn_count: AtomicUsize::new(0),
 62          })
 63      }
 64  
 65      /// Start accepting inbound socket connections
 66      pub async fn start(self: Arc<Self>, endpoint: Url, ex: Arc<Executor<'_>>) -> Result<()> {
 67          let datastore =
 68              self.session.upgrade().unwrap().p2p().settings().read().await.p2p_datastore.clone();
 69  
 70          // Initialize listener
 71          let listener = Listener::new(endpoint.clone(), datastore).await?;
 72  
 73          // Open socket
 74          let ptlistener = listener.listen().await?;
 75  
 76          #[cfg(feature = "p2p-tor")]
 77          if endpoint.scheme() == "tor" {
 78              let onion_addr = listener.endpoint().await;
 79              verbose!("[P2P] Adding {onion_addr} to external_addrs");
 80              self.session
 81                  .upgrade()
 82                  .unwrap()
 83                  .p2p()
 84                  .settings()
 85                  .write()
 86                  .await
 87                  .external_addrs
 88                  .push(onion_addr);
 89          }
 90  
 91          self.accept(ptlistener, ex);
 92          Ok(())
 93      }
 94  
 95      /// Stop accepting inbound socket connections
 96      pub async fn stop(&self) {
 97          // Send stop signal
 98          self.task.stop().await;
 99      }
100  
101      /// Start receiving network messages.
102      pub async fn subscribe(self: Arc<Self>) -> Subscription<Result<ChannelPtr>> {
103          self.channel_publisher.clone().subscribe().await
104      }
105  
106      /// Run the accept loop in a new thread and error if a connection problem occurs
107      fn accept(self: Arc<Self>, listener: Box<dyn PtListener>, ex: Arc<Executor<'_>>) {
108          let self_ = self.clone();
109          self.task.clone().start(
110              self.run_accept_loop(listener, ex.clone()),
111              |result| self_.handle_stop(result),
112              Error::NetworkServiceStopped,
113              ex,
114          );
115      }
116  
117      /// Run the accept loop.
118      async fn run_accept_loop(
119          self: Arc<Self>,
120          listener: Box<dyn PtListener>,
121          ex: Arc<Executor<'_>>,
122      ) -> Result<()> {
123          // CondVar used to notify the loop to recheck if new connections can
124          // be accepted by the listener.
125          let cv = Arc::new(CondVar::new());
126          let hosts = self.session.upgrade().unwrap().p2p().hosts();
127  
128          loop {
129              // Refuse new connections if we're up to the connection limit
130              let limit =
131                  self.session.upgrade().unwrap().p2p().settings().read().await.inbound_connections;
132  
133              if self.clone().conn_count.load(SeqCst) >= limit {
134                  // This will get notified every time an inbound channel is stopped.
135                  // These channels are the channels spawned below on listener.next().is_ok().
136                  // After the notification, we reset the condvar and retry this loop to see
137                  // if we can accept more connections, and if not - we'll be back here.
138                  warn!(target: "net::acceptor::run_accept_loop()", "Reached incoming conn limit, waiting...");
139                  cv.wait().await;
140                  cv.reset();
141                  continue
142              }
143  
144              // Now we wait for a new connection.
145              match listener.next().await {
146                  Ok((stream, url)) => {
147                      // Check if we reject this peer
148                      if hosts.container.contains(HostColor::Black as usize, &url) ||
149                          hosts.block_all_ports(&url)
150                      {
151                          warn!(target: "net::acceptor::run_accept_loop()", "Peer {url} is blacklisted");
152                          continue
153                      }
154  
155                      // Create the new Channel.
156                      let session = self.session.clone();
157                      let channel = Channel::new(stream, None, url, session, false).await;
158  
159                      // Increment the connection counter
160                      self.conn_count.fetch_add(1, SeqCst);
161  
162                      // This task will subscribe on the new channel and decrement
163                      // the connection counter. Along with that, it will notify
164                      // the CondVar that might be waiting to allow new connections.
165                      let self_ = self.clone();
166                      let channel_ = channel.clone();
167                      let cv_ = cv.clone();
168                      ex.spawn(async move {
169                          let stop_sub = channel_.subscribe_stop().await?;
170                          stop_sub.receive().await;
171                          self_.conn_count.fetch_sub(1, SeqCst);
172                          cv_.notify();
173                          Ok::<(), crate::Error>(())
174                      })
175                      .detach();
176  
177                      // Finally, notify any publishers about the new channel.
178                      self.channel_publisher.notify(Ok(channel)).await;
179                  }
180  
181                  // As per accept(2) recommendation:
182                  Err(e) if e.raw_os_error().is_some() => match e.raw_os_error().unwrap() {
183                      libc::EAGAIN | libc::ECONNABORTED | libc::EPROTO | libc::EINTR => continue,
184                      libc::ECONNRESET => {
185                          warn!(
186                              target: "net::acceptor::run_accept_loop()",
187                              "[P2P] Connection reset by peer in accept_loop"
188                          );
189                          continue
190                      }
191                      libc::ETIMEDOUT => {
192                          warn!(
193                              target: "net::acceptor::run_accept_loop()",
194                              "[P2P] Connection timed out in accept_loop"
195                          );
196                          continue
197                      }
198                      libc::EPIPE => {
199                          warn!(
200                              target: "net::acceptor::run_accept_loop()",
201                              "[P2P] Broken pipe in accept_loop"
202                          );
203                          continue
204                      }
205                      x => {
206                          warn!(
207                              target: "net::acceptor::run_accept_loop()",
208                              "[P2P] Unhandled OS Error: {e} {x}"
209                          );
210                          continue
211  
212                          /*
213                          error!(
214                              target: "net::acceptor::run_accept_loop()",
215                              "[P2P] Acceptor failed listening: {e} ({x})"
216                          );
217                          error!(
218                              target: "net::acceptor::run_accept_loop()",
219                              "[P2P] Closing listener loop"
220                          );
221                          return Err(e.into())
222                          */
223                      }
224                  },
225  
226                  // In case a TLS handshake fails, we'll get this:
227                  Err(e) if e.kind() == ErrorKind::UnexpectedEof => continue,
228  
229                  // Handle ErrorKind::Other
230                  Err(e) if e.kind() == ErrorKind::Other => {
231                      if let Some(inner) = std::error::Error::source(&e) {
232                          if let Some(inner) = inner.downcast_ref::<futures_rustls::rustls::Error>() {
233                              error!(
234                                  target: "net::acceptor::run_accept_loop()",
235                                  "[P2P] rustls listener error: {inner:?}"
236                              );
237                              continue
238                          }
239                      }
240  
241                      error!(
242                          target: "net::acceptor::run_accept_loop()",
243                          "[P2P] Unhandled ErrorKind::Other error: {e:?}"
244                      );
245                      return Err(e.into())
246                  }
247  
248                  // Errors we didn't handle above:
249                  Err(e) => {
250                      error!(
251                          target: "net::acceptor::run_accept_loop()",
252                          "[P2P] Unhandled listener.next() error: {e}"
253                      );
254                      /*
255                      error!(
256                          target: "net::acceptor::run_accept_loop()",
257                          "[P2P] Closing listener loop"
258                      );
259                      return Err(e.into())
260                      */
261                      continue
262                  }
263              }
264          }
265      }
266  
267      /// Handles network errors. Panics if errors pass silently, otherwise broadcasts it
268      /// to all channel publishers.
269      async fn handle_stop(self: Arc<Self>, result: Result<()>) {
270          match result {
271              Ok(()) => panic!("Acceptor task should never complete without error status"),
272              Err(err) => self.channel_publisher.notify(Err(err)).await,
273          }
274      }
275  }