/ node / tcp / src / tcp.rs
tcp.rs
  1  // Copyright (c) 2025-2026 ACDC Network
  2  // This file is part of the alphaos library.
  3  //
  4  // Alpha Chain | Delta Chain Protocol
  5  // International Monetary Graphite.
  6  //
  7  // Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com).
  8  // They built world-class ZK infrastructure. We installed the EASY button.
  9  // Their cryptography: elegant. Our modifications: bureaucracy-compatible.
 10  // Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours.
 11  //
 12  // Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0
 13  // All modifications and new work: CC0 1.0 Universal Public Domain Dedication.
 14  // No rights reserved. No permission required. No warranty. No refunds.
 15  //
 16  // https://creativecommons.org/publicdomain/zero/1.0/
 17  // SPDX-License-Identifier: CC0-1.0
 18  
 19  use std::{
 20      collections::HashSet,
 21      fmt,
 22      io,
 23      net::{IpAddr, SocketAddr},
 24      ops::Deref,
 25      sync::{
 26          atomic::{AtomicUsize, Ordering::*},
 27          Arc,
 28      },
 29      time::{Duration, Instant},
 30  };
 31  
 32  #[cfg(feature = "locktick")]
 33  use locktick::parking_lot::Mutex;
 34  use once_cell::sync::OnceCell;
 35  #[cfg(not(feature = "locktick"))]
 36  use parking_lot::Mutex;
 37  use tokio::{
 38      io::split,
 39      net::{TcpListener, TcpStream},
 40      sync::oneshot,
 41      task::JoinHandle,
 42      time::timeout,
 43  };
 44  use tracing::*;
 45  
 46  use crate::{
 47      connections::{Connection, ConnectionSide, Connections},
 48      protocols::{Protocol, Protocols},
 49      BannedPeers,
 50      Config,
 51      KnownPeers,
 52      Stats,
 53  };
 54  
 55  // A sequential numeric identifier assigned to `Tcp`s that were not provided with a name.
 56  static SEQUENTIAL_NODE_ID: AtomicUsize = AtomicUsize::new(0);
 57  
 58  /// The central object responsible for handling connections.
 59  #[derive(Clone)]
 60  pub struct Tcp(Arc<InnerTcp>);
 61  
 62  impl Deref for Tcp {
 63      type Target = Arc<InnerTcp>;
 64  
 65      fn deref(&self) -> &Self::Target {
 66          &self.0
 67      }
 68  }
 69  
 70  /// Error types for the `Tcp::connect` function.
 71  #[allow(missing_docs)]
 72  #[derive(thiserror::Error, Debug)]
 73  pub enum ConnectError {
 74      #[error("already reached the maximum number of {limit} connections")]
 75      MaximumConnectionsReached { limit: u16 },
 76      #[error("already connecting to node at {address:?}")]
 77      AlreadyConnecting { address: SocketAddr },
 78      #[error("already connected to node at {address:?}")]
 79      AlreadyConnected { address: SocketAddr },
 80      #[error("attempt to self-connect (at address {address:?}")]
 81      SelfConnect { address: SocketAddr },
 82      #[error("I/O error: {0}")]
 83      IoError(std::io::Error),
 84  }
 85  
 86  impl From<std::io::Error> for ConnectError {
 87      fn from(inner: std::io::Error) -> Self {
 88          Self::IoError(inner)
 89      }
 90  }
 91  
 92  #[doc(hidden)]
 93  pub struct InnerTcp {
 94      /// The tracing span.
 95      span: Span,
 96      /// The node's configuration.
 97      config: Config,
 98      /// The node's listening address.
 99      listening_addr: OnceCell<SocketAddr>,
100      /// Contains objects used by the protocols implemented by the node.
101      pub(crate) protocols: Protocols,
102      /// A set of connections that have not been finalized yet.
103      connecting: Mutex<HashSet<SocketAddr>>,
104      /// Contains objects related to the node's active connections.
105      connections: Connections,
106      /// Collects statistics related to the node's peers.
107      known_peers: KnownPeers,
108      /// Contains the set of currently banned peers.
109      banned_peers: BannedPeers,
110      /// Collects statistics related to the node itself.
111      stats: Stats,
112      /// The node's tasks.
113      pub(crate) tasks: Mutex<Vec<JoinHandle<()>>>,
114  }
115  
116  impl Tcp {
117      /// Creates a new [`Tcp`] using the given [`Config`].
118      pub fn new(mut config: Config) -> Self {
119          // If there is no pre-configured name, assign a sequential numeric identifier.
120          if config.name.is_none() {
121              config.name = Some(SEQUENTIAL_NODE_ID.fetch_add(1, Relaxed).to_string());
122          }
123  
124          // Create a tracing span containing the node's name.
125          let span = crate::helpers::create_span(config.name.as_deref().unwrap());
126  
127          // Initialize the Tcp stack.
128          let tcp = Tcp(Arc::new(InnerTcp {
129              span,
130              config,
131              listening_addr: Default::default(),
132              protocols: Default::default(),
133              connecting: Default::default(),
134              connections: Default::default(),
135              known_peers: Default::default(),
136              banned_peers: Default::default(),
137              stats: Stats::new(Instant::now()),
138              tasks: Default::default(),
139          }));
140  
141          debug!(parent: tcp.span(), "The node is ready");
142  
143          tcp
144      }
145  
146      /// Returns the name assigned.
147      #[inline]
148      pub fn name(&self) -> &str {
149          // safe; can be set as None in Config, but receives a default value on Tcp creation
150          self.config.name.as_deref().unwrap()
151      }
152  
153      /// Returns a reference to the configuration.
154      #[inline]
155      pub fn config(&self) -> &Config {
156          &self.config
157      }
158  
159      /// Returns the listening address; returns an error if Tcp was not configured
160      /// to listen for inbound connections.
161      pub fn listening_addr(&self) -> io::Result<SocketAddr> {
162          self.listening_addr.get().copied().ok_or_else(|| io::ErrorKind::AddrNotAvailable.into())
163      }
164  
165      /// Checks whether the provided address is connected.
166      pub fn is_connected(&self, addr: SocketAddr) -> bool {
167          self.connections.is_connected(addr)
168      }
169  
170      /// Checks if Tcp is currently setting up a connection with the provided address.
171      pub fn is_connecting(&self, addr: SocketAddr) -> bool {
172          self.connecting.lock().contains(&addr)
173      }
174  
175      /// Returns the number of active connections.
176      pub fn num_connected(&self) -> usize {
177          self.connections.num_connected()
178      }
179  
180      /// Returns the number of connections that are currently being set up.
181      pub fn num_connecting(&self) -> usize {
182          self.connecting.lock().len()
183      }
184  
185      /// Returns a list containing addresses of active connections.
186      pub fn connected_addrs(&self) -> Vec<SocketAddr> {
187          self.connections.addrs()
188      }
189  
190      /// Returns a list containing addresses of pending connections.
191      pub fn connecting_addrs(&self) -> Vec<SocketAddr> {
192          self.connecting.lock().iter().copied().collect()
193      }
194  
195      /// Returns a reference to the collection of statistics of known peers.
196      #[inline]
197      pub fn known_peers(&self) -> &KnownPeers {
198          &self.known_peers
199      }
200  
201      /// Returns a reference to the set of currently banned peers.
202      #[inline]
203      pub fn banned_peers(&self) -> &BannedPeers {
204          &self.banned_peers
205      }
206  
207      /// Returns a reference to the statistics.
208      #[inline]
209      pub fn stats(&self) -> &Stats {
210          &self.stats
211      }
212  
213      /// Returns the tracing [`Span`] associated with Tcp.
214      #[inline]
215      pub fn span(&self) -> &Span {
216          &self.span
217      }
218  
219      /// Gracefully shuts down the stack.
220      pub async fn shut_down(&self) {
221          debug!(parent: self.span(), "Shutting down the TCP stack");
222  
223          // Retrieve all tasks.
224          let mut tasks = std::mem::take(&mut *self.tasks.lock()).into_iter();
225  
226          // Abort the listening task first.
227          if let Some(listening_task) = tasks.next() {
228              listening_task.abort(); // abort the listening task first
229          }
230          // Disconnect from all connected peers.
231          for addr in self.connected_addrs() {
232              self.disconnect(addr).await;
233          }
234          // Abort all remaining tasks.
235          for handle in tasks {
236              handle.abort();
237          }
238      }
239  }
240  
241  impl Tcp {
242      /// Connects to the provided `SocketAddr`.
243      pub async fn connect(&self, addr: SocketAddr) -> Result<(), ConnectError> {
244          if let Ok(listening_addr) = self.listening_addr() {
245              // TODO(nkls): maybe this first check can be dropped; though it might be best to keep just in case.
246              if addr == listening_addr || self.is_self_connect(addr) {
247                  error!(parent: self.span(), "Attempted to self-connect ({addr})");
248                  return Err(ConnectError::SelfConnect { address: addr });
249              }
250          }
251  
252          if !self.can_add_connection() {
253              error!(parent: self.span(), "Too many connections; refusing to connect to {addr}");
254              return Err(ConnectError::MaximumConnectionsReached { limit: self.config.max_connections });
255          }
256  
257          if self.is_connected(addr) {
258              warn!(parent: self.span(), "Already connected to {addr}");
259              return Err(ConnectError::AlreadyConnected { address: addr });
260          }
261  
262          if !self.connecting.lock().insert(addr) {
263              warn!(parent: self.span(), "Already connecting to {addr}");
264              return Err(ConnectError::AlreadyConnecting { address: addr });
265          }
266  
267          let timeout_duration = Duration::from_millis(self.config().connection_timeout_ms.into());
268  
269          // Bind the tcp socket to the configured listener ip if it's set.
270          // Otherwise default to the system's default interface.
271          let res = if let Some(listen_ip) = self.config().listener_ip {
272              let sock =
273                  if listen_ip.is_ipv4() { tokio::net::TcpSocket::new_v4()? } else { tokio::net::TcpSocket::new_v6()? };
274              sock.bind(SocketAddr::new(listen_ip, 0))?;
275              timeout(timeout_duration, sock.connect(addr)).await
276          } else {
277              timeout(timeout_duration, TcpStream::connect(addr)).await
278          };
279  
280          let stream = match res {
281              Ok(Ok(stream)) => Ok(stream),
282              Ok(err) => {
283                  self.connecting.lock().remove(&addr);
284                  err
285              }
286              Err(err) => {
287                  self.connecting.lock().remove(&addr);
288                  error!("connection timeout error: {}", err);
289                  Err(io::ErrorKind::TimedOut.into())
290              }
291          }?;
292  
293          let ret = self.adapt_stream(stream, addr, ConnectionSide::Initiator).await;
294  
295          if let Err(ref e) = ret {
296              self.connecting.lock().remove(&addr);
297              self.known_peers().register_failure(addr.ip());
298              error!(parent: self.span(), "Unable to initiate a connection with {addr}: {e}");
299          }
300  
301          ret.map_err(|err| err.into())
302      }
303  
304      /// Disconnects from the provided `SocketAddr`.
305      ///
306      /// Returns true if the we were connected to the given address.
307      pub async fn disconnect(&self, addr: SocketAddr) -> bool {
308          // claim the disconnect to avoid duplicate executions, or return early if already claimed
309          if let Some(conn) = self.connections.0.read().get(&addr) {
310              if conn.disconnecting.swap(true, Relaxed) {
311                  // valid connection, but someone else is already disconnecting it
312                  return false;
313              }
314          } else {
315              // not connected
316              return false;
317          };
318  
319          if let Some(handler) = self.protocols.disconnect.get() {
320              let (sender, receiver) = oneshot::channel();
321              handler.trigger((addr, sender));
322              let _ = receiver.await; // can't really fail
323          }
324  
325          let conn = self.connections.remove(addr);
326  
327          if let Some(ref conn) = conn {
328              debug!(parent: self.span(), "Disconnecting from {}", conn.addr());
329  
330              // Shut down the associated tasks of the peer.
331              for task in conn.tasks.iter().rev() {
332                  task.abort();
333              }
334  
335              debug!(parent: self.span(), "Disconnected from {}", conn.addr());
336          } else {
337              warn!(parent: self.span(), "Failed to disconnect, was not connected to {addr}");
338          }
339  
340          conn.is_some()
341      }
342  }
343  
344  impl Tcp {
345      /// Spawns a task that listens for incoming connections.
346      pub async fn enable_listener(&self) -> io::Result<SocketAddr> {
347          // Retrieve the listening IP address, which must be set.
348          let listener_ip =
349              self.config().listener_ip.expect("Tcp::enable_listener was called, but Config::listener_ip is not set");
350  
351          // Initialize the TCP listener.
352          let listener = self.create_listener(listener_ip).await?;
353  
354          // Discover the port, if it was unspecified.
355          let port = listener.local_addr()?.port();
356  
357          // Set the listening IP address.
358          let listening_addr = (listener_ip, port).into();
359          self.listening_addr.set(listening_addr).expect("The node's listener was started more than once");
360  
361          // Use a channel to know when the listening task is ready.
362          let (tx, rx) = oneshot::channel();
363  
364          let tcp = self.clone();
365          let listening_task = tokio::spawn(async move {
366              trace!(parent: tcp.span(), "Spawned the listening task");
367              tx.send(()).unwrap(); // safe; the channel was just opened
368  
369              loop {
370                  // Await for a new connection.
371                  match listener.accept().await {
372                      Ok((stream, addr)) => tcp.handle_connection(stream, addr),
373                      Err(e) => error!(parent: tcp.span(), "Failed to accept a connection: {e}"),
374                  }
375              }
376          });
377          self.tasks.lock().push(listening_task);
378          let _ = rx.await;
379          debug!(parent: self.span(), "Listening on {listening_addr}");
380  
381          Ok(listening_addr)
382      }
383  
384      /// Creates an instance of `TcpListener` based on the node's configuration.
385      async fn create_listener(&self, listener_ip: IpAddr) -> io::Result<TcpListener> {
386          debug!("Creating a TCP listener on {listener_ip}...");
387          let listener = if let Some(port) = self.config().desired_listening_port {
388              // Construct the desired listening IP address.
389              let desired_listening_addr = SocketAddr::new(listener_ip, port);
390              // If a desired listening port is set, try to bind to it.
391              match TcpListener::bind(desired_listening_addr).await {
392                  Ok(listener) => listener,
393                  Err(e) => {
394                      if self.config().allow_random_port {
395                          warn!(
396                              parent: self.span(),
397                              "Trying any listening port, as the desired port is unavailable: {e}"
398                          );
399                          let random_available_addr = SocketAddr::new(listener_ip, 0);
400                          TcpListener::bind(random_available_addr).await?
401                      } else {
402                          error!(parent: self.span(), "The desired listening port is unavailable: {e}");
403                          return Err(e);
404                      }
405                  }
406              }
407          } else if self.config().allow_random_port {
408              let random_available_addr = SocketAddr::new(listener_ip, 0);
409              TcpListener::bind(random_available_addr).await?
410          } else {
411              panic!("As 'listener_ip' is set, either 'desired_listening_port' or 'allow_random_port' must be set");
412          };
413  
414          Ok(listener)
415      }
416  
417      /// Handles a new inbound connection.
418      fn handle_connection(&self, stream: TcpStream, addr: SocketAddr) {
419          debug!(parent: self.span(), "Received a connection from {addr}");
420  
421          if !self.can_add_connection() || self.is_self_connect(addr) {
422              debug!(parent: self.span(), "Rejecting the connection from {addr}");
423              return;
424          }
425  
426          self.connecting.lock().insert(addr);
427  
428          let tcp = self.clone();
429          tokio::spawn(async move {
430              if let Err(e) = tcp.adapt_stream(stream, addr, ConnectionSide::Responder).await {
431                  tcp.connecting.lock().remove(&addr);
432                  tcp.known_peers().register_failure(addr.ip());
433                  error!(parent: tcp.span(), "Failed to connect with {addr}: {e}");
434              }
435          });
436      }
437  
438      /// Checks if the given IP address is the same as the listening address of this `Tcp`.
439      fn is_self_connect(&self, addr: SocketAddr) -> bool {
440          // SAFETY: if we're opening connections, this should never fail.
441          let listening_addr = self.listening_addr().unwrap();
442  
443          match listening_addr.ip().is_loopback() {
444              // If localhost, check the ports, this only works on outbound connections, since we
445              // don't know the ephemeral port a peer might be using if they initiate the connection.
446              true => listening_addr.port() == addr.port(),
447              // If it's not localhost, matching IPs indicate a self-connect in both directions.
448              false => listening_addr.ip() == addr.ip(),
449          }
450      }
451  
452      /// Checks whether the `Tcp` can handle an additional connection.
453      fn can_add_connection(&self) -> bool {
454          // Retrieve the number of connected peers.
455          let num_connected = self.num_connected();
456          // Retrieve the maximum number of connected peers.
457          let limit = self.config.max_connections as usize;
458  
459          if num_connected >= limit {
460              warn!(parent: self.span(), "Maximum number of active connections ({limit}) reached");
461              false
462          } else if num_connected + self.num_connecting() >= limit {
463              warn!(parent: self.span(), "Maximum number of active & pending connections ({limit}) reached");
464              false
465          } else {
466              true
467          }
468      }
469  
470      /// Prepares the freshly acquired connection to handle the protocols the Tcp implements.
471      async fn adapt_stream(&self, stream: TcpStream, peer_addr: SocketAddr, own_side: ConnectionSide) -> io::Result<()> {
472          self.known_peers.add(peer_addr.ip());
473  
474          // Register the port seen by the peer.
475          if own_side == ConnectionSide::Initiator {
476              if let Ok(addr) = stream.local_addr() {
477                  debug!(
478                      parent: self.span(), "establishing connection with {}; the peer is connected on port {}",
479                      peer_addr, addr.port()
480                  );
481              } else {
482                  warn!(parent: self.span(), "couldn't determine the peer's port");
483              }
484          }
485  
486          let connection = Connection::new(peer_addr, stream, !own_side);
487  
488          // Enact the enabled protocols.
489          let mut connection = self.enable_protocols(connection).await?;
490  
491          // if Reading is enabled, we'll notify the related task when the connection is fully ready.
492          let conn_ready_tx = connection.readiness_notifier.take();
493  
494          self.connections.add(connection);
495          self.connecting.lock().remove(&peer_addr);
496  
497          // Send the aforementioned notification so that reading from the socket can commence.
498          if let Some(tx) = conn_ready_tx {
499              let _ = tx.send(());
500          }
501  
502          // If enabled, enact OnConnect.
503          if let Some(handler) = self.protocols.on_connect.get() {
504              let (sender, receiver) = oneshot::channel();
505              handler.trigger((peer_addr, sender));
506              let _ = receiver.await; // can't really fail
507          }
508  
509          Ok(())
510      }
511  
512      /// Enacts the enabled protocols on the provided connection.
513      async fn enable_protocols(&self, conn: Connection) -> io::Result<Connection> {
514          /// A helper macro to enable a protocol on a connection.
515          macro_rules! enable_protocol {
516              ($handler_type: ident, $node:expr, $conn: expr) => {
517                  if let Some(handler) = $node.protocols.$handler_type.get() {
518                      let (conn_returner, conn_retriever) = oneshot::channel();
519  
520                      handler.trigger(($conn, conn_returner));
521  
522                      match conn_retriever.await {
523                          Ok(Ok(conn)) => conn,
524                          Err(_) => return Err(io::ErrorKind::BrokenPipe.into()),
525                          Ok(e) => return e,
526                      }
527                  } else {
528                      $conn
529                  }
530              };
531          }
532  
533          let mut conn = enable_protocol!(handshake, self, conn);
534  
535          // Split the stream after the handshake (if not done before).
536          if let Some(stream) = conn.stream.take() {
537              let (reader, writer) = split(stream);
538              conn.reader = Some(Box::new(reader));
539              conn.writer = Some(Box::new(writer));
540          }
541  
542          let conn = enable_protocol!(reading, self, conn);
543          let conn = enable_protocol!(writing, self, conn);
544  
545          Ok(conn)
546      }
547  }
548  
549  impl fmt::Debug for Tcp {
550      fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
551          write!(f, "The TCP stack config: {:?}", self.config)
552      }
553  }
554  
#[cfg(test)]
mod tests {
    use super::*;

    use std::{
        net::{IpAddr, Ipv4Addr},
        str::FromStr,
    };

    /// Creates a localhost peer that accepts a single connection and listens on an
    /// OS-assigned port; returns the peer along with its listening address.
    async fn spawn_peer() -> (Tcp, SocketAddr) {
        let peer = Tcp::new(Config {
            listener_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
            desired_listening_port: Some(0),
            max_connections: 1,
            ..Default::default()
        });
        let peer_addr = peer.enable_listener().await.unwrap();

        (peer, peer_addr)
    }

    #[tokio::test]
    async fn test_new() {
        let localhost = IpAddr::V4(Ipv4Addr::LOCALHOST);
        let tcp = Tcp::new(Config { listener_ip: Some(localhost), max_connections: 200, ..Default::default() });

        // The configuration must be carried over as provided.
        assert_eq!(tcp.config.max_connections, 200);
        assert_eq!(tcp.config.listener_ip, Some(localhost));

        // The listener must come up on the configured IP.
        let listening_addr = tcp.enable_listener().await.unwrap();
        assert_eq!(listening_addr.ip(), localhost);

        // A fresh node has no connections of any kind.
        assert_eq!(tcp.num_connected(), 0);
        assert_eq!(tcp.num_connecting(), 0);
    }

    #[tokio::test]
    async fn test_connect() {
        let tcp = Tcp::new(Config::default());
        let node_ip = tcp.enable_listener().await.unwrap();

        // A node must refuse to connect to itself...
        let result = tcp.connect(node_ip).await;
        assert!(matches!(result, Err(ConnectError::SelfConnect { .. })));

        // ...and such an attempt must leave no traces behind.
        assert_eq!(tcp.num_connected(), 0);
        assert_eq!(tcp.num_connecting(), 0);
        assert!(!tcp.is_connected(node_ip));
        assert!(!tcp.is_connecting(node_ip));

        // Start the peer.
        let (_peer, peer_ip) = spawn_peer().await;

        // Connecting to the peer must result in a single active connection.
        tcp.connect(peer_ip).await.unwrap();
        assert_eq!(tcp.num_connected(), 1);
        assert_eq!(tcp.num_connecting(), 0);
        assert!(tcp.is_connected(peer_ip));
        assert!(!tcp.is_connecting(peer_ip));
    }

    #[tokio::test]
    async fn test_disconnect() {
        let tcp = Tcp::new(Config::default());
        let _node_ip = tcp.enable_listener().await.unwrap();

        // Start the peer.
        let (_peer, peer_ip) = spawn_peer().await;

        // Establish a connection with the peer.
        tcp.connect(peer_ip).await.unwrap();
        assert_eq!(tcp.num_connected(), 1);
        assert_eq!(tcp.num_connecting(), 0);
        assert!(tcp.is_connected(peer_ip));
        assert!(!tcp.is_connecting(peer_ip));

        // The first disconnect must succeed and remove the connection.
        let has_disconnected = tcp.disconnect(peer_ip).await;
        assert!(has_disconnected);
        assert_eq!(tcp.num_connected(), 0);
        assert_eq!(tcp.num_connecting(), 0);
        assert!(!tcp.is_connected(peer_ip));
        assert!(!tcp.is_connecting(peer_ip));

        // A repeated disconnect must be a harmless no-op.
        let has_disconnected = tcp.disconnect(peer_ip).await;
        assert!(!has_disconnected);
        assert_eq!(tcp.num_connected(), 0);
        assert_eq!(tcp.num_connecting(), 0);
        assert!(!tcp.is_connected(peer_ip));
        assert!(!tcp.is_connecting(peer_ip));
    }

    #[tokio::test]
    async fn test_can_add_connection() {
        let tcp = Tcp::new(Config { max_connections: 1, ..Default::default() });

        // Start the peer.
        let (_peer, peer_ip) = spawn_peer().await;

        // A non-local address, so that it can never be equal to the peer's address.
        let another_ip = SocketAddr::from_str("1.2.3.4:4242").unwrap();

        assert!(tcp.can_add_connection());

        // Case 1: a single active connection exhausts the limit.
        let stream = TcpStream::connect(peer_ip).await.unwrap();
        tcp.connections.add(Connection::new(peer_ip, stream, ConnectionSide::Initiator));
        assert!(!tcp.can_add_connection());

        // connect() must be refused while the limit is exhausted.
        let result = tcp.connect(another_ip).await;
        assert!(matches!(result, Err(ConnectError::MaximumConnectionsReached { .. })));

        // Dropping the active connection frees the slot again.
        tcp.connections.remove(peer_ip);
        assert!(tcp.can_add_connection());

        // Case 2: a single pending connection exhausts the limit as well.
        tcp.connecting.lock().insert(peer_ip);
        assert!(!tcp.can_add_connection());

        // connect() must be refused in this case too.
        let result = tcp.connect(another_ip).await;
        assert!(matches!(result, Err(ConnectError::MaximumConnectionsReached { .. })));

        // Dropping the pending connection frees the slot again.
        tcp.connecting.lock().remove(&peer_ip);
        assert!(tcp.can_add_connection());

        // Case 3: an active and a pending connection at once (this should never occur).
        let stream = TcpStream::connect(peer_ip).await.unwrap();
        tcp.connections.add(Connection::new(peer_ip, stream, ConnectionSide::Responder));
        tcp.connecting.lock().insert(peer_ip);
        assert!(!tcp.can_add_connection());

        // Dropping both frees the slot again.
        tcp.connections.remove(peer_ip);
        tcp.connecting.lock().remove(&peer_ip);
        assert!(tcp.can_add_connection());
    }

    #[tokio::test]
    async fn test_handle_connection() {
        let tcp = Tcp::new(Config {
            listener_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
            max_connections: 1,
            ..Default::default()
        });

        // Start the first peer.
        let (_peer1, peer1_ip) = spawn_peer().await;

        // Register an active connection with the first peer by hand.
        let stream = TcpStream::connect(peer1_ip).await.unwrap();
        tcp.connections.add(Connection::new(peer1_ip, stream, ConnectionSide::Responder));
        assert!(!tcp.can_add_connection());
        assert_eq!(tcp.num_connected(), 1);
        assert_eq!(tcp.num_connecting(), 0);
        assert!(tcp.is_connected(peer1_ip));
        assert!(!tcp.is_connecting(peer1_ip));

        // Start the second peer.
        let (_peer2, peer2_ip) = spawn_peer().await;

        // The connection limit is exhausted, so the second connection must be rejected
        // without affecting the first one.
        let stream = TcpStream::connect(peer2_ip).await.unwrap();
        tcp.handle_connection(stream, peer2_ip);
        assert!(!tcp.can_add_connection());
        assert_eq!(tcp.num_connected(), 1);
        assert_eq!(tcp.num_connecting(), 0);
        assert!(tcp.is_connected(peer1_ip));
        assert!(!tcp.is_connected(peer2_ip));
        assert!(!tcp.is_connecting(peer1_ip));
        assert!(!tcp.is_connecting(peer2_ip));
    }

    #[tokio::test]
    async fn test_adapt_stream() {
        let tcp = Tcp::new(Config { max_connections: 1, ..Default::default() });

        // Start the peer.
        let (_peer, peer_ip) = spawn_peer().await;

        // Register a pending connection with the peer by hand.
        tcp.connecting.lock().insert(peer_ip);
        assert_eq!(tcp.num_connected(), 0);
        assert_eq!(tcp.num_connecting(), 1);
        assert!(!tcp.is_connected(peer_ip));
        assert!(tcp.is_connecting(peer_ip));

        // Adapting the stream must promote the pending connection to an active one.
        let stream = TcpStream::connect(peer_ip).await.unwrap();
        tcp.adapt_stream(stream, peer_ip, ConnectionSide::Responder).await.unwrap();
        assert_eq!(tcp.num_connected(), 1);
        assert_eq!(tcp.num_connecting(), 0);
        assert!(tcp.is_connected(peer_ip));
        assert!(!tcp.is_connecting(peer_ip));
    }
}