tcp.rs
1 // Copyright (c) 2025-2026 ACDC Network 2 // This file is part of the alphaos library. 3 // 4 // Alpha Chain | Delta Chain Protocol 5 // International Monetary Graphite. 6 // 7 // Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com). 8 // They built world-class ZK infrastructure. We installed the EASY button. 9 // Their cryptography: elegant. Our modifications: bureaucracy-compatible. 10 // Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours. 11 // 12 // Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0 13 // All modifications and new work: CC0 1.0 Universal Public Domain Dedication. 14 // No rights reserved. No permission required. No warranty. No refunds. 15 // 16 // https://creativecommons.org/publicdomain/zero/1.0/ 17 // SPDX-License-Identifier: CC0-1.0 18 19 use std::{ 20 collections::HashSet, 21 fmt, 22 io, 23 net::{IpAddr, SocketAddr}, 24 ops::Deref, 25 sync::{ 26 atomic::{AtomicUsize, Ordering::*}, 27 Arc, 28 }, 29 time::{Duration, Instant}, 30 }; 31 32 #[cfg(feature = "locktick")] 33 use locktick::parking_lot::Mutex; 34 use once_cell::sync::OnceCell; 35 #[cfg(not(feature = "locktick"))] 36 use parking_lot::Mutex; 37 use tokio::{ 38 io::split, 39 net::{TcpListener, TcpStream}, 40 sync::oneshot, 41 task::JoinHandle, 42 time::timeout, 43 }; 44 use tracing::*; 45 46 use crate::{ 47 connections::{Connection, ConnectionSide, Connections}, 48 protocols::{Protocol, Protocols}, 49 BannedPeers, 50 Config, 51 KnownPeers, 52 Stats, 53 }; 54 55 // A sequential numeric identifier assigned to `Tcp`s that were not provided with a name. 56 static SEQUENTIAL_NODE_ID: AtomicUsize = AtomicUsize::new(0); 57 58 /// The central object responsible for handling connections. 59 #[derive(Clone)] 60 pub struct Tcp(Arc<InnerTcp>); 61 62 impl Deref for Tcp { 63 type Target = Arc<InnerTcp>; 64 65 fn deref(&self) -> &Self::Target { 66 &self.0 67 } 68 } 69 70 /// Error types for the `Tcp::connect` function. 71 #[allow(missing_docs)] 72 #[derive(thiserror::Error, Debug)] 73 pub enum ConnectError { 74 #[error("already reached the maximum number of {limit} connections")] 75 MaximumConnectionsReached { limit: u16 }, 76 #[error("already connecting to node at {address:?}")] 77 AlreadyConnecting { address: SocketAddr }, 78 #[error("already connected to node at {address:?}")] 79 AlreadyConnected { address: SocketAddr }, 80 #[error("attempt to self-connect (at address {address:?}")] 81 SelfConnect { address: SocketAddr }, 82 #[error("I/O error: {0}")] 83 IoError(std::io::Error), 84 } 85 86 impl From<std::io::Error> for ConnectError { 87 fn from(inner: std::io::Error) -> Self { 88 Self::IoError(inner) 89 } 90 } 91 92 #[doc(hidden)] 93 pub struct InnerTcp { 94 /// The tracing span. 95 span: Span, 96 /// The node's configuration. 97 config: Config, 98 /// The node's listening address. 99 listening_addr: OnceCell<SocketAddr>, 100 /// Contains objects used by the protocols implemented by the node. 101 pub(crate) protocols: Protocols, 102 /// A set of connections that have not been finalized yet. 103 connecting: Mutex<HashSet<SocketAddr>>, 104 /// Contains objects related to the node's active connections. 105 connections: Connections, 106 /// Collects statistics related to the node's peers. 107 known_peers: KnownPeers, 108 /// Contains the set of currently banned peers. 109 banned_peers: BannedPeers, 110 /// Collects statistics related to the node itself. 111 stats: Stats, 112 /// The node's tasks. 113 pub(crate) tasks: Mutex<Vec<JoinHandle<()>>>, 114 } 115 116 impl Tcp { 117 /// Creates a new [`Tcp`] using the given [`Config`]. 118 pub fn new(mut config: Config) -> Self { 119 // If there is no pre-configured name, assign a sequential numeric identifier. 120 if config.name.is_none() { 121 config.name = Some(SEQUENTIAL_NODE_ID.fetch_add(1, Relaxed).to_string()); 122 } 123 124 // Create a tracing span containing the node's name. 125 let span = crate::helpers::create_span(config.name.as_deref().unwrap()); 126 127 // Initialize the Tcp stack. 128 let tcp = Tcp(Arc::new(InnerTcp { 129 span, 130 config, 131 listening_addr: Default::default(), 132 protocols: Default::default(), 133 connecting: Default::default(), 134 connections: Default::default(), 135 known_peers: Default::default(), 136 banned_peers: Default::default(), 137 stats: Stats::new(Instant::now()), 138 tasks: Default::default(), 139 })); 140 141 debug!(parent: tcp.span(), "The node is ready"); 142 143 tcp 144 } 145 146 /// Returns the name assigned. 147 #[inline] 148 pub fn name(&self) -> &str { 149 // safe; can be set as None in Config, but receives a default value on Tcp creation 150 self.config.name.as_deref().unwrap() 151 } 152 153 /// Returns a reference to the configuration. 154 #[inline] 155 pub fn config(&self) -> &Config { 156 &self.config 157 } 158 159 /// Returns the listening address; returns an error if Tcp was not configured 160 /// to listen for inbound connections. 161 pub fn listening_addr(&self) -> io::Result<SocketAddr> { 162 self.listening_addr.get().copied().ok_or_else(|| io::ErrorKind::AddrNotAvailable.into()) 163 } 164 165 /// Checks whether the provided address is connected. 166 pub fn is_connected(&self, addr: SocketAddr) -> bool { 167 self.connections.is_connected(addr) 168 } 169 170 /// Checks if Tcp is currently setting up a connection with the provided address. 171 pub fn is_connecting(&self, addr: SocketAddr) -> bool { 172 self.connecting.lock().contains(&addr) 173 } 174 175 /// Returns the number of active connections. 176 pub fn num_connected(&self) -> usize { 177 self.connections.num_connected() 178 } 179 180 /// Returns the number of connections that are currently being set up. 181 pub fn num_connecting(&self) -> usize { 182 self.connecting.lock().len() 183 } 184 185 /// Returns a list containing addresses of active connections. 186 pub fn connected_addrs(&self) -> Vec<SocketAddr> { 187 self.connections.addrs() 188 } 189 190 /// Returns a list containing addresses of pending connections. 191 pub fn connecting_addrs(&self) -> Vec<SocketAddr> { 192 self.connecting.lock().iter().copied().collect() 193 } 194 195 /// Returns a reference to the collection of statistics of known peers. 196 #[inline] 197 pub fn known_peers(&self) -> &KnownPeers { 198 &self.known_peers 199 } 200 201 /// Returns a reference to the set of currently banned peers. 202 #[inline] 203 pub fn banned_peers(&self) -> &BannedPeers { 204 &self.banned_peers 205 } 206 207 /// Returns a reference to the statistics. 208 #[inline] 209 pub fn stats(&self) -> &Stats { 210 &self.stats 211 } 212 213 /// Returns the tracing [`Span`] associated with Tcp. 214 #[inline] 215 pub fn span(&self) -> &Span { 216 &self.span 217 } 218 219 /// Gracefully shuts down the stack. 220 pub async fn shut_down(&self) { 221 debug!(parent: self.span(), "Shutting down the TCP stack"); 222 223 // Retrieve all tasks. 224 let mut tasks = std::mem::take(&mut *self.tasks.lock()).into_iter(); 225 226 // Abort the listening task first. 227 if let Some(listening_task) = tasks.next() { 228 listening_task.abort(); // abort the listening task first 229 } 230 // Disconnect from all connected peers. 231 for addr in self.connected_addrs() { 232 self.disconnect(addr).await; 233 } 234 // Abort all remaining tasks. 235 for handle in tasks { 236 handle.abort(); 237 } 238 } 239 } 240 241 impl Tcp { 242 /// Connects to the provided `SocketAddr`. 243 pub async fn connect(&self, addr: SocketAddr) -> Result<(), ConnectError> { 244 if let Ok(listening_addr) = self.listening_addr() { 245 // TODO(nkls): maybe this first check can be dropped; though it might be best to keep just in case. 246 if addr == listening_addr || self.is_self_connect(addr) { 247 error!(parent: self.span(), "Attempted to self-connect ({addr})"); 248 return Err(ConnectError::SelfConnect { address: addr }); 249 } 250 } 251 252 if !self.can_add_connection() { 253 error!(parent: self.span(), "Too many connections; refusing to connect to {addr}"); 254 return Err(ConnectError::MaximumConnectionsReached { limit: self.config.max_connections }); 255 } 256 257 if self.is_connected(addr) { 258 warn!(parent: self.span(), "Already connected to {addr}"); 259 return Err(ConnectError::AlreadyConnected { address: addr }); 260 } 261 262 if !self.connecting.lock().insert(addr) { 263 warn!(parent: self.span(), "Already connecting to {addr}"); 264 return Err(ConnectError::AlreadyConnecting { address: addr }); 265 } 266 267 let timeout_duration = Duration::from_millis(self.config().connection_timeout_ms.into()); 268 269 // Bind the tcp socket to the configured listener ip if it's set. 270 // Otherwise default to the system's default interface. 271 let res = if let Some(listen_ip) = self.config().listener_ip { 272 let sock = 273 if listen_ip.is_ipv4() { tokio::net::TcpSocket::new_v4()? } else { tokio::net::TcpSocket::new_v6()? }; 274 sock.bind(SocketAddr::new(listen_ip, 0))?; 275 timeout(timeout_duration, sock.connect(addr)).await 276 } else { 277 timeout(timeout_duration, TcpStream::connect(addr)).await 278 }; 279 280 let stream = match res { 281 Ok(Ok(stream)) => Ok(stream), 282 Ok(err) => { 283 self.connecting.lock().remove(&addr); 284 err 285 } 286 Err(err) => { 287 self.connecting.lock().remove(&addr); 288 error!("connection timeout error: {}", err); 289 Err(io::ErrorKind::TimedOut.into()) 290 } 291 }?; 292 293 let ret = self.adapt_stream(stream, addr, ConnectionSide::Initiator).await; 294 295 if let Err(ref e) = ret { 296 self.connecting.lock().remove(&addr); 297 self.known_peers().register_failure(addr.ip()); 298 error!(parent: self.span(), "Unable to initiate a connection with {addr}: {e}"); 299 } 300 301 ret.map_err(|err| err.into()) 302 } 303 304 /// Disconnects from the provided `SocketAddr`. 305 /// 306 /// Returns true if the we were connected to the given address. 307 pub async fn disconnect(&self, addr: SocketAddr) -> bool { 308 // claim the disconnect to avoid duplicate executions, or return early if already claimed 309 if let Some(conn) = self.connections.0.read().get(&addr) { 310 if conn.disconnecting.swap(true, Relaxed) { 311 // valid connection, but someone else is already disconnecting it 312 return false; 313 } 314 } else { 315 // not connected 316 return false; 317 }; 318 319 if let Some(handler) = self.protocols.disconnect.get() { 320 let (sender, receiver) = oneshot::channel(); 321 handler.trigger((addr, sender)); 322 let _ = receiver.await; // can't really fail 323 } 324 325 let conn = self.connections.remove(addr); 326 327 if let Some(ref conn) = conn { 328 debug!(parent: self.span(), "Disconnecting from {}", conn.addr()); 329 330 // Shut down the associated tasks of the peer. 331 for task in conn.tasks.iter().rev() { 332 task.abort(); 333 } 334 335 debug!(parent: self.span(), "Disconnected from {}", conn.addr()); 336 } else { 337 warn!(parent: self.span(), "Failed to disconnect, was not connected to {addr}"); 338 } 339 340 conn.is_some() 341 } 342 } 343 344 impl Tcp { 345 /// Spawns a task that listens for incoming connections. 346 pub async fn enable_listener(&self) -> io::Result<SocketAddr> { 347 // Retrieve the listening IP address, which must be set. 348 let listener_ip = 349 self.config().listener_ip.expect("Tcp::enable_listener was called, but Config::listener_ip is not set"); 350 351 // Initialize the TCP listener. 352 let listener = self.create_listener(listener_ip).await?; 353 354 // Discover the port, if it was unspecified. 355 let port = listener.local_addr()?.port(); 356 357 // Set the listening IP address. 358 let listening_addr = (listener_ip, port).into(); 359 self.listening_addr.set(listening_addr).expect("The node's listener was started more than once"); 360 361 // Use a channel to know when the listening task is ready. 362 let (tx, rx) = oneshot::channel(); 363 364 let tcp = self.clone(); 365 let listening_task = tokio::spawn(async move { 366 trace!(parent: tcp.span(), "Spawned the listening task"); 367 tx.send(()).unwrap(); // safe; the channel was just opened 368 369 loop { 370 // Await for a new connection. 371 match listener.accept().await { 372 Ok((stream, addr)) => tcp.handle_connection(stream, addr), 373 Err(e) => error!(parent: tcp.span(), "Failed to accept a connection: {e}"), 374 } 375 } 376 }); 377 self.tasks.lock().push(listening_task); 378 let _ = rx.await; 379 debug!(parent: self.span(), "Listening on {listening_addr}"); 380 381 Ok(listening_addr) 382 } 383 384 /// Creates an instance of `TcpListener` based on the node's configuration. 385 async fn create_listener(&self, listener_ip: IpAddr) -> io::Result<TcpListener> { 386 debug!("Creating a TCP listener on {listener_ip}..."); 387 let listener = if let Some(port) = self.config().desired_listening_port { 388 // Construct the desired listening IP address. 389 let desired_listening_addr = SocketAddr::new(listener_ip, port); 390 // If a desired listening port is set, try to bind to it. 391 match TcpListener::bind(desired_listening_addr).await { 392 Ok(listener) => listener, 393 Err(e) => { 394 if self.config().allow_random_port { 395 warn!( 396 parent: self.span(), 397 "Trying any listening port, as the desired port is unavailable: {e}" 398 ); 399 let random_available_addr = SocketAddr::new(listener_ip, 0); 400 TcpListener::bind(random_available_addr).await? 401 } else { 402 error!(parent: self.span(), "The desired listening port is unavailable: {e}"); 403 return Err(e); 404 } 405 } 406 } 407 } else if self.config().allow_random_port { 408 let random_available_addr = SocketAddr::new(listener_ip, 0); 409 TcpListener::bind(random_available_addr).await? 410 } else { 411 panic!("As 'listener_ip' is set, either 'desired_listening_port' or 'allow_random_port' must be set"); 412 }; 413 414 Ok(listener) 415 } 416 417 /// Handles a new inbound connection. 418 fn handle_connection(&self, stream: TcpStream, addr: SocketAddr) { 419 debug!(parent: self.span(), "Received a connection from {addr}"); 420 421 if !self.can_add_connection() || self.is_self_connect(addr) { 422 debug!(parent: self.span(), "Rejecting the connection from {addr}"); 423 return; 424 } 425 426 self.connecting.lock().insert(addr); 427 428 let tcp = self.clone(); 429 tokio::spawn(async move { 430 if let Err(e) = tcp.adapt_stream(stream, addr, ConnectionSide::Responder).await { 431 tcp.connecting.lock().remove(&addr); 432 tcp.known_peers().register_failure(addr.ip()); 433 error!(parent: tcp.span(), "Failed to connect with {addr}: {e}"); 434 } 435 }); 436 } 437 438 /// Checks if the given IP address is the same as the listening address of this `Tcp`. 439 fn is_self_connect(&self, addr: SocketAddr) -> bool { 440 // SAFETY: if we're opening connections, this should never fail. 441 let listening_addr = self.listening_addr().unwrap(); 442 443 match listening_addr.ip().is_loopback() { 444 // If localhost, check the ports, this only works on outbound connections, since we 445 // don't know the ephemeral port a peer might be using if they initiate the connection. 446 true => listening_addr.port() == addr.port(), 447 // If it's not localhost, matching IPs indicate a self-connect in both directions. 448 false => listening_addr.ip() == addr.ip(), 449 } 450 } 451 452 /// Checks whether the `Tcp` can handle an additional connection. 453 fn can_add_connection(&self) -> bool { 454 // Retrieve the number of connected peers. 455 let num_connected = self.num_connected(); 456 // Retrieve the maximum number of connected peers. 457 let limit = self.config.max_connections as usize; 458 459 if num_connected >= limit { 460 warn!(parent: self.span(), "Maximum number of active connections ({limit}) reached"); 461 false 462 } else if num_connected + self.num_connecting() >= limit { 463 warn!(parent: self.span(), "Maximum number of active & pending connections ({limit}) reached"); 464 false 465 } else { 466 true 467 } 468 } 469 470 /// Prepares the freshly acquired connection to handle the protocols the Tcp implements. 471 async fn adapt_stream(&self, stream: TcpStream, peer_addr: SocketAddr, own_side: ConnectionSide) -> io::Result<()> { 472 self.known_peers.add(peer_addr.ip()); 473 474 // Register the port seen by the peer. 475 if own_side == ConnectionSide::Initiator { 476 if let Ok(addr) = stream.local_addr() { 477 debug!( 478 parent: self.span(), "establishing connection with {}; the peer is connected on port {}", 479 peer_addr, addr.port() 480 ); 481 } else { 482 warn!(parent: self.span(), "couldn't determine the peer's port"); 483 } 484 } 485 486 let connection = Connection::new(peer_addr, stream, !own_side); 487 488 // Enact the enabled protocols. 489 let mut connection = self.enable_protocols(connection).await?; 490 491 // if Reading is enabled, we'll notify the related task when the connection is fully ready. 492 let conn_ready_tx = connection.readiness_notifier.take(); 493 494 self.connections.add(connection); 495 self.connecting.lock().remove(&peer_addr); 496 497 // Send the aforementioned notification so that reading from the socket can commence. 498 if let Some(tx) = conn_ready_tx { 499 let _ = tx.send(()); 500 } 501 502 // If enabled, enact OnConnect. 503 if let Some(handler) = self.protocols.on_connect.get() { 504 let (sender, receiver) = oneshot::channel(); 505 handler.trigger((peer_addr, sender)); 506 let _ = receiver.await; // can't really fail 507 } 508 509 Ok(()) 510 } 511 512 /// Enacts the enabled protocols on the provided connection. 513 async fn enable_protocols(&self, conn: Connection) -> io::Result<Connection> { 514 /// A helper macro to enable a protocol on a connection. 515 macro_rules! enable_protocol { 516 ($handler_type: ident, $node:expr, $conn: expr) => { 517 if let Some(handler) = $node.protocols.$handler_type.get() { 518 let (conn_returner, conn_retriever) = oneshot::channel(); 519 520 handler.trigger(($conn, conn_returner)); 521 522 match conn_retriever.await { 523 Ok(Ok(conn)) => conn, 524 Err(_) => return Err(io::ErrorKind::BrokenPipe.into()), 525 Ok(e) => return e, 526 } 527 } else { 528 $conn 529 } 530 }; 531 } 532 533 let mut conn = enable_protocol!(handshake, self, conn); 534 535 // Split the stream after the handshake (if not done before). 536 if let Some(stream) = conn.stream.take() { 537 let (reader, writer) = split(stream); 538 conn.reader = Some(Box::new(reader)); 539 conn.writer = Some(Box::new(writer)); 540 } 541 542 let conn = enable_protocol!(reading, self, conn); 543 let conn = enable_protocol!(writing, self, conn); 544 545 Ok(conn) 546 } 547 } 548 549 impl fmt::Debug for Tcp { 550 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 551 write!(f, "The TCP stack config: {:?}", self.config) 552 } 553 } 554 555 #[cfg(test)] 556 mod tests { 557 use super::*; 558 559 use std::{ 560 net::{IpAddr, Ipv4Addr}, 561 str::FromStr, 562 }; 563 564 #[tokio::test] 565 async fn test_new() { 566 let tcp = Tcp::new(Config { 567 listener_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)), 568 max_connections: 200, 569 ..Default::default() 570 }); 571 572 assert_eq!(tcp.config.max_connections, 200); 573 assert_eq!(tcp.config.listener_ip, Some(IpAddr::V4(Ipv4Addr::LOCALHOST))); 574 assert_eq!(tcp.enable_listener().await.unwrap().ip(), IpAddr::V4(Ipv4Addr::LOCALHOST)); 575 576 assert_eq!(tcp.num_connected(), 0); 577 assert_eq!(tcp.num_connecting(), 0); 578 } 579 580 #[tokio::test] 581 async fn test_connect() { 582 let tcp = Tcp::new(Config::default()); 583 let node_ip = tcp.enable_listener().await.unwrap(); 584 585 // Ensure self-connecting is not possible. 586 let result = tcp.connect(node_ip).await; 587 assert!(matches!(result, Err(ConnectError::SelfConnect { .. }))); 588 589 assert_eq!(tcp.num_connected(), 0); 590 assert_eq!(tcp.num_connecting(), 0); 591 assert!(!tcp.is_connected(node_ip)); 592 assert!(!tcp.is_connecting(node_ip)); 593 594 // Initialize the peer. 595 let peer = Tcp::new(Config { 596 listener_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)), 597 desired_listening_port: Some(0), 598 max_connections: 1, 599 ..Default::default() 600 }); 601 let peer_ip = peer.enable_listener().await.unwrap(); 602 603 // Connect to the peer. 604 tcp.connect(peer_ip).await.unwrap(); 605 assert_eq!(tcp.num_connected(), 1); 606 assert_eq!(tcp.num_connecting(), 0); 607 assert!(tcp.is_connected(peer_ip)); 608 assert!(!tcp.is_connecting(peer_ip)); 609 } 610 611 #[tokio::test] 612 async fn test_disconnect() { 613 let tcp = Tcp::new(Config::default()); 614 let _node_ip = tcp.enable_listener().await.unwrap(); 615 616 // Initialize the peer. 617 let peer = Tcp::new(Config { 618 listener_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)), 619 desired_listening_port: Some(0), 620 max_connections: 1, 621 ..Default::default() 622 }); 623 let peer_ip = peer.enable_listener().await.unwrap(); 624 625 // Connect to the peer. 626 tcp.connect(peer_ip).await.unwrap(); 627 assert_eq!(tcp.num_connected(), 1); 628 assert_eq!(tcp.num_connecting(), 0); 629 assert!(tcp.is_connected(peer_ip)); 630 assert!(!tcp.is_connecting(peer_ip)); 631 632 // Disconnect from the peer. 633 let has_disconnected = tcp.disconnect(peer_ip).await; 634 assert!(has_disconnected); 635 assert_eq!(tcp.num_connected(), 0); 636 assert_eq!(tcp.num_connecting(), 0); 637 assert!(!tcp.is_connected(peer_ip)); 638 assert!(!tcp.is_connecting(peer_ip)); 639 640 // Ensure disconnecting from the peer a second time is okay. 641 let has_disconnected = tcp.disconnect(peer_ip).await; 642 assert!(!has_disconnected); 643 assert_eq!(tcp.num_connected(), 0); 644 assert_eq!(tcp.num_connecting(), 0); 645 assert!(!tcp.is_connected(peer_ip)); 646 assert!(!tcp.is_connecting(peer_ip)); 647 } 648 649 #[tokio::test] 650 async fn test_can_add_connection() { 651 let tcp = Tcp::new(Config { max_connections: 1, ..Default::default() }); 652 653 // Initialize the peer. 654 let peer = Tcp::new(Config { 655 listener_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)), 656 desired_listening_port: Some(0), 657 max_connections: 1, 658 ..Default::default() 659 }); 660 let peer_ip = peer.enable_listener().await.unwrap(); 661 662 assert!(tcp.can_add_connection()); 663 664 // Simulate an active connection. 665 let stream = TcpStream::connect(peer_ip).await.unwrap(); 666 tcp.connections.add(Connection::new(peer_ip, stream, ConnectionSide::Initiator)); 667 assert!(!tcp.can_add_connection()); 668 669 // Ensure that we cannot invoke connect() successfully in this case. 670 // Use a non-local IP, to ensure it is never qual to peer IP. 671 let another_ip = SocketAddr::from_str("1.2.3.4:4242").unwrap(); 672 let result = tcp.connect(another_ip).await; 673 assert!(matches!(result, Err(ConnectError::MaximumConnectionsReached { .. }))); 674 675 // Remove the active connection. 676 tcp.connections.remove(peer_ip); 677 assert!(tcp.can_add_connection()); 678 679 // Simulate a pending connection. 680 tcp.connecting.lock().insert(peer_ip); 681 assert!(!tcp.can_add_connection()); 682 683 // Ensure that we cannot invoke connect() successfully in this case either. 684 let another_ip = SocketAddr::from_str("1.2.3.4:4242").unwrap(); 685 let result = tcp.connect(another_ip).await; 686 assert!(matches!(result, Err(ConnectError::MaximumConnectionsReached { .. }))); 687 688 // Remove the pending connection. 689 tcp.connecting.lock().remove(&peer_ip); 690 assert!(tcp.can_add_connection()); 691 692 // Simulate an active and a pending connection (this case should never occur). 693 let stream = TcpStream::connect(peer_ip).await.unwrap(); 694 tcp.connections.add(Connection::new(peer_ip, stream, ConnectionSide::Responder)); 695 tcp.connecting.lock().insert(peer_ip); 696 assert!(!tcp.can_add_connection()); 697 698 // Remove the active and pending connection. 699 tcp.connections.remove(peer_ip); 700 tcp.connecting.lock().remove(&peer_ip); 701 assert!(tcp.can_add_connection()); 702 } 703 704 #[tokio::test] 705 async fn test_handle_connection() { 706 let tcp = Tcp::new(Config { 707 listener_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)), 708 max_connections: 1, 709 ..Default::default() 710 }); 711 712 // Initialize peer 1. 713 let peer1 = Tcp::new(Config { 714 listener_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)), 715 desired_listening_port: Some(0), 716 max_connections: 1, 717 ..Default::default() 718 }); 719 let peer1_ip = peer1.enable_listener().await.unwrap(); 720 721 // Simulate an active connection. 722 let stream = TcpStream::connect(peer1_ip).await.unwrap(); 723 tcp.connections.add(Connection::new(peer1_ip, stream, ConnectionSide::Responder)); 724 assert!(!tcp.can_add_connection()); 725 assert_eq!(tcp.num_connected(), 1); 726 assert_eq!(tcp.num_connecting(), 0); 727 assert!(tcp.is_connected(peer1_ip)); 728 assert!(!tcp.is_connecting(peer1_ip)); 729 730 // Initialize peer 2. 731 let peer2 = Tcp::new(Config { 732 listener_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)), 733 desired_listening_port: Some(0), 734 max_connections: 1, 735 ..Default::default() 736 }); 737 let peer2_ip = peer2.enable_listener().await.unwrap(); 738 739 // Handle the connection. 740 let stream = TcpStream::connect(peer2_ip).await.unwrap(); 741 tcp.handle_connection(stream, peer2_ip); 742 assert!(!tcp.can_add_connection()); 743 assert_eq!(tcp.num_connected(), 1); 744 assert_eq!(tcp.num_connecting(), 0); 745 assert!(tcp.is_connected(peer1_ip)); 746 assert!(!tcp.is_connected(peer2_ip)); 747 assert!(!tcp.is_connecting(peer1_ip)); 748 assert!(!tcp.is_connecting(peer2_ip)); 749 } 750 751 #[tokio::test] 752 async fn test_adapt_stream() { 753 let tcp = Tcp::new(Config { max_connections: 1, ..Default::default() }); 754 755 // Initialize the peer. 756 let peer = Tcp::new(Config { 757 listener_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)), 758 desired_listening_port: Some(0), 759 max_connections: 1, 760 ..Default::default() 761 }); 762 let peer_ip = peer.enable_listener().await.unwrap(); 763 764 // Simulate a pending connection. 765 tcp.connecting.lock().insert(peer_ip); 766 assert_eq!(tcp.num_connected(), 0); 767 assert_eq!(tcp.num_connecting(), 1); 768 assert!(!tcp.is_connected(peer_ip)); 769 assert!(tcp.is_connecting(peer_ip)); 770 771 // Simulate a new connection. 772 let stream = TcpStream::connect(peer_ip).await.unwrap(); 773 tcp.adapt_stream(stream, peer_ip, ConnectionSide::Responder).await.unwrap(); 774 assert_eq!(tcp.num_connected(), 1); 775 assert_eq!(tcp.num_connecting(), 0); 776 assert!(tcp.is_connected(peer_ip)); 777 assert!(!tcp.is_connecting(peer_ip)); 778 } 779 }