// peers_reliable.rs
1 //! Implements a connection manager for communication with other federation 2 //! members 3 //! 4 //! The main interface is [`fedimint_core::net::peers::IPeerConnections`] and 5 //! its main implementation is [`ReconnectPeerConnectionsReliable`], see these 6 //! for details. 7 8 use std::collections::HashMap; 9 use std::fmt::Debug; 10 use std::time::Duration; 11 12 use anyhow::Context; 13 use async_trait::async_trait; 14 use fedimint_api_client::api::PeerConnectionStatus; 15 use fedimint_core::net::peers::IPeerConnections; 16 use fedimint_core::task::{sleep_until, Cancellable, Cancelled, TaskGroup, TaskHandle}; 17 use fedimint_core::util::SafeUrl; 18 use fedimint_core::PeerId; 19 use fedimint_logging::LOG_NET_PEER; 20 use futures::future::select_all; 21 use futures::{SinkExt, StreamExt}; 22 use serde::de::DeserializeOwned; 23 use serde::{Deserialize, Serialize}; 24 use tokio::sync::mpsc::{Receiver, Sender}; 25 use tokio::sync::oneshot; 26 use tokio::time::Instant; 27 use tracing::{debug, info, instrument, trace, warn}; 28 29 use crate::net::connect::{AnyConnector, SharedAnyConnector}; 30 use crate::net::framed::AnyFramedTransport; 31 use crate::net::peers::{DelayCalculator, NetworkConfig}; 32 use crate::net::queue::{MessageId, MessageQueue, UniqueMessage}; 33 34 /// Every how many seconds to send an empty message to our peer if we sent no 35 /// messages during that time. This helps with reducing the amount of messages 36 /// that need to be re-sent in case of very one-sided communication. 
37 const PING_INTERVAL: Duration = Duration::from_secs(10); 38 39 /// Owned [`Connector`](crate::net::connect::Connector) trait object used by 40 /// [`ReconnectPeerConnectionsReliable`] 41 pub type PeerConnector<M> = AnyConnector<PeerMessage<M>>; 42 43 /// Connection manager that automatically reconnects to peers 44 /// 45 /// `ReconnectPeerConnections` is based on a 46 /// [`Connector`](crate::net::connect::Connector) object which is used to open 47 /// [`FramedTransport`](crate::net::framed::FramedTransport) connections. For 48 /// production deployments the `Connector` has to ensure that connections are 49 /// authenticated and encrypted. 50 pub struct ReconnectPeerConnectionsReliable<T> { 51 connections: HashMap<PeerId, PeerConnection<T>>, 52 } 53 54 struct PeerConnection<T> { 55 outgoing: Sender<T>, 56 incoming: Receiver<T>, 57 } 58 59 /// Internal message type for [`ReconnectPeerConnectionsReliable`], just public 60 /// because it appears in the public interface. 61 #[derive(Debug, Clone, Serialize, Deserialize)] 62 pub struct PeerMessage<M> { 63 msg: Option<UniqueMessage<M>>, 64 ack: Option<MessageId>, 65 } 66 67 struct PeerConnectionStateMachine<M> { 68 common: CommonPeerConnectionState<M>, 69 state: PeerConnectionState<M>, 70 } 71 72 struct PeerStatusQuery { 73 response_sender: oneshot::Sender<PeerConnectionStatus>, 74 } 75 76 type PeerStatusChannelSender = Sender<PeerStatusQuery>; 77 type PeerStatusChannelReceiver = Receiver<PeerStatusQuery>; 78 79 /// Keeps the references to a `PeerStatusChannelSender` for each `PeerId`, which 80 /// can be used to ask the corresponding `PeerConnectionStateMachine` for the 81 /// current `PeerConnectionStatus` 82 #[derive(Clone)] 83 pub struct PeerStatusChannels(HashMap<PeerId, PeerStatusChannelSender>); 84 85 impl PeerStatusChannels { 86 pub async fn get_all_status(&self) -> HashMap<PeerId, anyhow::Result<PeerConnectionStatus>> { 87 let results = self.0.iter().map(|(peer_id, sender)| async { 88 let (response_sender, 
response_receiver) = oneshot::channel(); 89 let query = PeerStatusQuery { response_sender }; 90 let sender_response = sender 91 .send(query) 92 .await 93 .map_err(|_| anyhow::anyhow!("channel closed while querying peer status")); 94 match sender_response { 95 Ok(()) => { 96 let status = response_receiver 97 .await 98 .map_err(|_| anyhow::anyhow!("channel closed while receiving peer status")); 99 (*peer_id, status) 100 } 101 Err(e) => (*peer_id, Err(e)), 102 } 103 }); 104 futures::future::join_all(results) 105 .await 106 .into_iter() 107 .collect() 108 } 109 } 110 111 struct CommonPeerConnectionState<M> { 112 resend_queue: MessageQueue<M>, 113 incoming: Sender<M>, 114 outgoing: Receiver<M>, 115 our_id: PeerId, 116 peer_id: PeerId, 117 peer_address: SafeUrl, 118 delay_calculator: DelayCalculator, 119 connect: SharedAnyConnector<PeerMessage<M>>, 120 incoming_connections: Receiver<AnyFramedTransport<PeerMessage<M>>>, 121 last_received: Option<MessageId>, 122 status_query_receiver: PeerStatusChannelReceiver, 123 } 124 125 struct DisconnectedPeerConnectionState { 126 reconnect_at: Instant, 127 failed_reconnect_counter: u64, 128 } 129 130 struct ConnectedPeerConnectionState<M> { 131 connection: AnyFramedTransport<PeerMessage<M>>, 132 next_ping: Instant, 133 } 134 135 enum PeerConnectionState<M> { 136 Disconnected(DisconnectedPeerConnectionState), 137 Connected(ConnectedPeerConnectionState<M>), 138 } 139 140 impl<T: 'static> ReconnectPeerConnectionsReliable<T> 141 where 142 T: std::fmt::Debug + Clone + Serialize + DeserializeOwned + Unpin + Send + Sync, 143 { 144 /// Creates a new `ReconnectPeerConnections` connection manager from a 145 /// network config and a [`Connector`](crate::net::connect::Connector). 146 /// See [`ReconnectPeerConnectionsReliable`] for requirements on the 147 /// `Connector`. 
148 #[instrument(skip_all)] 149 pub(crate) async fn new( 150 cfg: NetworkConfig, 151 delay_calculator: DelayCalculator, 152 connect: PeerConnector<T>, 153 task_group: &mut TaskGroup, 154 ) -> (Self, PeerStatusChannels) { 155 let shared_connector: SharedAnyConnector<PeerMessage<T>> = connect.into(); 156 let mut connection_senders = HashMap::new(); 157 let mut status_query_senders = HashMap::new(); 158 let mut connections = HashMap::new(); 159 160 for (peer, peer_address) in cfg.peers.iter().filter(|(&peer, _)| peer != cfg.identity) { 161 let (connection_sender, connection_receiver) = 162 tokio::sync::mpsc::channel::<AnyFramedTransport<PeerMessage<T>>>(4); 163 let (status_query_sender, status_query_receiver) = 164 tokio::sync::mpsc::channel::<PeerStatusQuery>(1); // better block the sender than flood the receiver 165 166 let connection = PeerConnection::new( 167 cfg.identity, 168 *peer, 169 peer_address.clone(), 170 delay_calculator, 171 shared_connector.clone(), 172 connection_receiver, 173 status_query_receiver, 174 task_group, 175 ) 176 .await; 177 178 connection_senders.insert(*peer, connection_sender); 179 status_query_senders.insert(*peer, status_query_sender); 180 connections.insert(*peer, connection); 181 } 182 task_group.spawn("listen task", move |handle| { 183 Self::run_listen_task(cfg, shared_connector, connection_senders, handle) 184 }); 185 ( 186 ReconnectPeerConnectionsReliable { connections }, 187 PeerStatusChannels(status_query_senders), 188 ) 189 } 190 191 async fn run_listen_task( 192 cfg: NetworkConfig, 193 connect: SharedAnyConnector<PeerMessage<T>>, 194 mut connection_senders: HashMap<PeerId, Sender<AnyFramedTransport<PeerMessage<T>>>>, 195 task_handle: TaskHandle, 196 ) { 197 let mut listener = connect 198 .listen(cfg.bind_addr) 199 .await 200 .with_context(|| anyhow::anyhow!("Failed to listen on {}", cfg.bind_addr)) 201 .expect("Could not bind port"); 202 203 let mut shutdown_rx = task_handle.make_shutdown_rx().await; 204 205 while 
!task_handle.is_shutting_down() { 206 let new_connection = tokio::select! { 207 maybe_msg = listener.next() => { maybe_msg }, 208 _ = &mut shutdown_rx => { break; }, 209 }; 210 211 let (peer, connection) = match new_connection.expect("Listener closed") { 212 Ok(connection) => connection, 213 Err(e) => { 214 warn!(target: LOG_NET_PEER, mint = ?cfg.identity, err = %e, "Error while opening incoming connection"); 215 continue; 216 } 217 }; 218 219 let err = connection_senders 220 .get_mut(&peer) 221 .expect("Authenticating connectors should not return unknown peers") 222 .send(connection) 223 .await 224 .is_err(); 225 226 if err { 227 warn!( 228 target: LOG_NET_PEER, 229 ?peer, 230 "Could not send incoming connection to peer io task (possibly banned)" 231 ); 232 } 233 } 234 } 235 } 236 237 #[async_trait] 238 impl<T> IPeerConnections<T> for ReconnectPeerConnectionsReliable<T> 239 where 240 T: std::fmt::Debug + Serialize + DeserializeOwned + Clone + Unpin + Send + Sync + 'static, 241 { 242 #[must_use] 243 async fn send(&mut self, peers: &[PeerId], msg: T) -> Cancellable<()> { 244 for peer_id in peers { 245 trace!(target: LOG_NET_PEER, ?peer_id, "Sending message to"); 246 if let Some(peer) = self.connections.get_mut(peer_id) { 247 peer.send(msg.clone()).await?; 248 } else { 249 trace!(target: LOG_NET_PEER,peer = ?peer_id, "Not sending message to unknown peer (maybe banned)"); 250 } 251 } 252 Ok(()) 253 } 254 255 async fn receive(&mut self) -> Cancellable<(PeerId, T)> { 256 // if all peers banned (or just solo-federation), just hang here as there's 257 // never going to be any message. This avoids panic on `select_all` with 258 // no futures. 
259 if self.connections.is_empty() { 260 std::future::pending().await 261 } 262 263 // TODO: optimize, don't throw away remaining futures 264 265 let futures_non_banned = self.connections.iter_mut().map(|(&peer, connection)| { 266 let receive_future = async move { 267 let msg = connection.receive().await; 268 (peer, msg) 269 }; 270 Box::pin(receive_future) 271 }); 272 273 let first_response = select_all(futures_non_banned).await; 274 275 first_response.0 .1.map(|v| (first_response.0 .0, v)) 276 } 277 278 async fn ban_peer(&mut self, peer: PeerId) { 279 self.connections.remove(&peer); 280 warn!(target: LOG_NET_PEER, "Peer {} banned.", peer); 281 } 282 } 283 284 impl<M> PeerConnectionStateMachine<M> 285 where 286 M: Debug + Clone, 287 { 288 async fn run(mut self, task_handle: &TaskHandle) { 289 let peer = self.common.peer_id; 290 291 // Note: `state_transition` internally uses channel operations (`send` and 292 // `recv`) which will disconnect when other tasks are shutting down 293 // returning here, so we probably don't need any `timeout` here. 
294 while !task_handle.is_shutting_down() { 295 if let Some(new_self) = self.state_transition(task_handle).await { 296 self = new_self; 297 } else { 298 break; 299 } 300 } 301 info!( 302 target: LOG_NET_PEER, 303 ?peer, 304 "Shutting down peer connection state machine" 305 ); 306 } 307 308 async fn state_transition(self, task_handle: &TaskHandle) -> Option<Self> { 309 let PeerConnectionStateMachine { mut common, state } = self; 310 311 match state { 312 PeerConnectionState::Disconnected(disconnected) => { 313 common 314 .state_transition_disconnected(disconnected, task_handle) 315 .await 316 } 317 PeerConnectionState::Connected(connected) => { 318 common 319 .state_transition_connected(connected, task_handle) 320 .await 321 } 322 } 323 .map(|new_state| PeerConnectionStateMachine { 324 common, 325 state: new_state, 326 }) 327 } 328 } 329 330 impl<M> CommonPeerConnectionState<M> 331 where 332 M: Debug + Clone, 333 { 334 async fn state_transition_connected( 335 &mut self, 336 mut connected: ConnectedPeerConnectionState<M>, 337 task_handle: &TaskHandle, 338 ) -> Option<PeerConnectionState<M>> { 339 Some(tokio::select! 
{ 340 maybe_msg = self.outgoing.recv() => { 341 match maybe_msg { 342 Some(msg) => { 343 self.send_message_connected(connected, msg).await 344 }, 345 None => { 346 debug!(target: LOG_NET_PEER, "Exiting peer connection IO task - parent disconnected"); 347 return None; 348 }, 349 } 350 }, 351 new_connection_res = self.incoming_connections.recv() => { 352 match new_connection_res { 353 Some(new_connection) => { 354 debug!(target: LOG_NET_PEER, "Replacing existing connection"); 355 self.connect(new_connection, 0).await 356 }, 357 None => { 358 debug!( 359 target: LOG_NET_PEER, 360 "Exiting peer connection IO task - parent disconnected"); 361 return None; 362 }, 363 } 364 }, 365 Some(status_query) = self.status_query_receiver.recv() => { 366 if status_query.response_sender.send(PeerConnectionStatus::Connected).is_err() { 367 let peer_id = self.peer_id; 368 debug!(target: LOG_NET_PEER, %peer_id, "Could not send peer status response: receiver dropped"); 369 } 370 PeerConnectionState::Connected(connected) 371 }, 372 Some(msg_res) = connected.connection.next() => { 373 self.receive_message(connected, msg_res).await 374 }, 375 _ = sleep_until(connected.next_ping) => { 376 self.send_ping(connected).await 377 }, 378 _ = task_handle.make_shutdown_rx().await => { 379 return None; 380 }, 381 }) 382 } 383 384 async fn connect( 385 &mut self, 386 mut new_connection: AnyFramedTransport<PeerMessage<M>>, 387 disconnect_count: u64, 388 ) -> PeerConnectionState<M> { 389 debug!(target: LOG_NET_PEER, 390 our_id = ?self.our_id, 391 peer = ?self.peer_id, %disconnect_count, 392 resend_queue_len = self.resend_queue.queue.len(), 393 "Initializing new connection"); 394 match self.resend_buffer_contents(&mut new_connection).await { 395 Ok(()) => PeerConnectionState::Connected(ConnectedPeerConnectionState { 396 connection: new_connection, 397 next_ping: Instant::now(), 398 }), 399 Err(e) => self.disconnect_err(e, disconnect_count), 400 } 401 } 402 403 async fn resend_buffer_contents( 404 &self, 
405 connection: &mut AnyFramedTransport<PeerMessage<M>>, 406 ) -> Result<(), anyhow::Error> { 407 for msg in self.resend_queue.iter().cloned() { 408 connection 409 .send(PeerMessage { 410 msg: Some(msg), 411 ack: self.last_received, 412 }) 413 .await? 414 } 415 416 Ok(()) 417 } 418 419 fn disconnect(&self, mut disconnect_count: u64) -> PeerConnectionState<M> { 420 disconnect_count += 1; 421 422 let reconnect_at = { 423 let delay = self.delay_calculator.reconnection_delay(disconnect_count); 424 let delay_secs = delay.as_secs_f64(); 425 debug!( 426 target: LOG_NET_PEER, 427 %disconnect_count, 428 our_id = ?self.our_id, 429 peer = ?self.peer_id, 430 delay_secs, 431 "Scheduling reopening of connection" 432 ); 433 Instant::now() + delay 434 }; 435 436 PeerConnectionState::Disconnected(DisconnectedPeerConnectionState { 437 reconnect_at, 438 failed_reconnect_counter: disconnect_count, 439 }) 440 } 441 442 fn disconnect_err(&self, err: anyhow::Error, disconnect_count: u64) -> PeerConnectionState<M> { 443 debug!(target: LOG_NET_PEER, 444 our_id = ?self.our_id, 445 peer = ?self.peer_id, %err, %disconnect_count, "Peer disconnected"); 446 self.disconnect(disconnect_count) 447 } 448 449 async fn send_message_connected( 450 &mut self, 451 connected: ConnectedPeerConnectionState<M>, 452 msg: M, 453 ) -> PeerConnectionState<M> { 454 let umsg = self.resend_queue.push(msg); 455 trace!(target: LOG_NET_PEER, peer = ?self.peer_id, id = ?umsg.id, "Sending outgoing message"); 456 457 self.send_message_connected_inner(connected, Some(umsg)) 458 .await 459 } 460 461 async fn send_ping( 462 &mut self, 463 connected: ConnectedPeerConnectionState<M>, 464 ) -> PeerConnectionState<M> { 465 trace!(target: LOG_NET_PEER, our_id = ?self.our_id, peer = ?self.peer_id, "Sending ping"); 466 self.send_message_connected_inner(connected, None).await 467 } 468 469 async fn send_message_connected_inner( 470 &mut self, 471 mut connected: ConnectedPeerConnectionState<M>, 472 maybe_msg: 
Option<UniqueMessage<M>>, 473 ) -> PeerConnectionState<M> { 474 if let Err(e) = connected 475 .connection 476 .send(PeerMessage { 477 msg: maybe_msg, 478 ack: self.last_received, 479 }) 480 .await 481 { 482 return self.disconnect_err(e, 0); 483 } 484 485 connected.next_ping = Instant::now() + PING_INTERVAL; 486 487 match connected.connection.flush().await { 488 Ok(()) => PeerConnectionState::Connected(connected), 489 Err(e) => self.disconnect_err(e, 0), 490 } 491 } 492 493 async fn receive_message( 494 &mut self, 495 connected: ConnectedPeerConnectionState<M>, 496 msg_res: Result<PeerMessage<M>, anyhow::Error>, 497 ) -> PeerConnectionState<M> { 498 match self.receive_message_inner(msg_res).await { 499 Ok(()) => PeerConnectionState::Connected(connected), 500 Err(e) => { 501 self.last_received = None; 502 self.disconnect_err(e, 0) 503 } 504 } 505 } 506 507 async fn receive_message_inner( 508 &mut self, 509 msg_res: Result<PeerMessage<M>, anyhow::Error>, 510 ) -> Result<(), anyhow::Error> { 511 let PeerMessage { msg, ack } = msg_res?; 512 513 // Process ACK no matter if we received a message or not 514 if let Some(ack) = ack { 515 trace!(target: LOG_NET_PEER, our_id = ?self.our_id, peer = ?self.peer_id, ?ack, "Received ACK for sent message"); 516 self.resend_queue.ack(ack); 517 } 518 519 if let Some(msg) = msg { 520 trace!(target: LOG_NET_PEER, peer = ?self.peer_id, id = ?msg.id, "Received incoming message"); 521 522 let expected = self 523 .last_received 524 .map(|last_id| last_id.increment()) 525 .unwrap_or(msg.id); 526 527 if msg.id < expected { 528 info!(target: LOG_NET_PEER, 529 ?expected, received = ?msg.id, "Received old message"); 530 return Ok(()); 531 } 532 533 if msg.id > expected { 534 warn!(target: LOG_NET_PEER, ?expected, received = ?msg.id, "Received message from the future"); 535 return Err(anyhow::anyhow!("Received message from the future")); 536 } 537 538 self.last_received = Some(expected); 539 540 debug_assert_eq!(expected, msg.id, "someone removed 
the check above"); 541 if self.incoming.send(msg.msg).await.is_err() { 542 // ignore error - if the other side is not there, 543 // it means we're are probably shutting down 544 debug!( 545 target: LOG_NET_PEER, 546 "Could not deliver message to recipient - probably shutting down" 547 ); 548 } 549 } 550 551 Ok(()) 552 } 553 554 async fn state_transition_disconnected( 555 &mut self, 556 disconnected: DisconnectedPeerConnectionState, 557 task_handle: &TaskHandle, 558 ) -> Option<PeerConnectionState<M>> { 559 Some(tokio::select! { 560 maybe_msg = self.outgoing.recv() => { 561 match maybe_msg { 562 Some(msg) => { 563 self.send_message(disconnected, msg).await} 564 None => { 565 debug!(target: LOG_NET_PEER, "Exiting peer connection IO task - parent disconnected"); 566 return None; 567 } 568 } 569 }, 570 new_connection_res = self.incoming_connections.recv() => { 571 match new_connection_res { 572 Some(new_connection) => { 573 self.receive_connection(disconnected, new_connection).await 574 }, 575 None => { 576 debug!(target: LOG_NET_PEER, "Exiting peer connection IO task - parent disconnected"); 577 return None; 578 }, 579 } 580 }, 581 Some(status_query) = self.status_query_receiver.recv() => { 582 if status_query.response_sender.send(PeerConnectionStatus::Disconnected).is_err() { 583 let peer_id = self.peer_id; 584 debug!(target: LOG_NET_PEER, %peer_id, "Could not send peer status response: receiver dropped"); 585 } 586 PeerConnectionState::Disconnected(disconnected) 587 }, 588 () = tokio::time::sleep_until(disconnected.reconnect_at), if self.our_id < self.peer_id => { 589 // to prevent "reconnection ping-pongs", only the side with lower PeerId is responsible for reconnecting 590 self.reconnect(disconnected).await 591 }, 592 _ = task_handle.make_shutdown_rx().await => { 593 return None; 594 }, 595 }) 596 } 597 598 async fn send_message( 599 &mut self, 600 disconnected: DisconnectedPeerConnectionState, 601 msg: M, 602 ) -> PeerConnectionState<M> { 603 let umsg = 
self.resend_queue.push(msg); 604 trace!(target: LOG_NET_PEER, id = ?umsg.id, "Queueing outgoing message"); 605 PeerConnectionState::Disconnected(disconnected) 606 } 607 608 async fn receive_connection( 609 &mut self, 610 disconnect: DisconnectedPeerConnectionState, 611 new_connection: AnyFramedTransport<PeerMessage<M>>, 612 ) -> PeerConnectionState<M> { 613 self.connect(new_connection, disconnect.failed_reconnect_counter) 614 .await 615 } 616 617 async fn reconnect( 618 &mut self, 619 disconnected: DisconnectedPeerConnectionState, 620 ) -> PeerConnectionState<M> { 621 match self.try_reconnect().await { 622 Ok(conn) => { 623 self.connect(conn, disconnected.failed_reconnect_counter) 624 .await 625 } 626 Err(e) => self.disconnect_err(e, disconnected.failed_reconnect_counter), 627 } 628 } 629 630 async fn try_reconnect(&self) -> Result<AnyFramedTransport<PeerMessage<M>>, anyhow::Error> { 631 debug!(target: LOG_NET_PEER, our_id = ?self.our_id, peer = ?self.peer_id, "Trying to reconnect"); 632 let addr = self.peer_address.clone(); 633 let (connected_peer, conn) = self.connect.connect_framed(addr, self.peer_id).await?; 634 635 if connected_peer == self.peer_id { 636 Ok(conn) 637 } else { 638 Err(anyhow::anyhow!( 639 "Peer identified itself incorrectly: {:?}", 640 connected_peer 641 )) 642 } 643 } 644 } 645 646 impl<M> PeerConnection<M> 647 where 648 M: Debug + Clone + Send + Sync + 'static, 649 { 650 #[allow(clippy::too_many_arguments)] 651 async fn new( 652 our_id: PeerId, 653 peer_id: PeerId, 654 peer_address: SafeUrl, 655 delay_calculator: DelayCalculator, 656 connect: SharedAnyConnector<PeerMessage<M>>, 657 incoming_connections: Receiver<AnyFramedTransport<PeerMessage<M>>>, 658 status_query_receiver: PeerStatusChannelReceiver, 659 task_group: &mut TaskGroup, 660 ) -> PeerConnection<M> { 661 let (outgoing_sender, outgoing_receiver) = tokio::sync::mpsc::channel::<M>(1024); 662 let (incoming_sender, incoming_receiver) = tokio::sync::mpsc::channel::<M>(1024); 663 664 
task_group.spawn( 665 format!("io-thread-peer-{peer_id}"), 666 move |handle| async move { 667 Self::run_io_thread( 668 incoming_sender, 669 outgoing_receiver, 670 our_id, 671 peer_id, 672 peer_address, 673 delay_calculator, 674 connect, 675 incoming_connections, 676 status_query_receiver, 677 &handle, 678 ) 679 .await 680 }, 681 ); 682 683 PeerConnection { 684 outgoing: outgoing_sender, 685 incoming: incoming_receiver, 686 } 687 } 688 689 async fn send(&mut self, msg: M) -> Cancellable<()> { 690 self.outgoing.send(msg).await.map_err(|_e| Cancelled) 691 } 692 693 async fn receive(&mut self) -> Cancellable<M> { 694 self.incoming.recv().await.ok_or(Cancelled) 695 } 696 697 #[allow(clippy::too_many_arguments)] // TODO: consider refactoring 698 #[instrument(skip_all, fields(peer))] 699 async fn run_io_thread( 700 incoming: Sender<M>, 701 outgoing: Receiver<M>, 702 our_id: PeerId, 703 peer_id: PeerId, 704 peer_address: SafeUrl, 705 delay_calculator: DelayCalculator, 706 connect: SharedAnyConnector<PeerMessage<M>>, 707 incoming_connections: Receiver<AnyFramedTransport<PeerMessage<M>>>, 708 status_query_receiver: PeerStatusChannelReceiver, 709 task_handle: &TaskHandle, 710 ) { 711 let common = CommonPeerConnectionState { 712 resend_queue: Default::default(), 713 incoming, 714 outgoing, 715 our_id, 716 peer_id, 717 peer_address, 718 delay_calculator, 719 connect, 720 incoming_connections, 721 status_query_receiver, 722 last_received: None, 723 }; 724 let initial_state = PeerConnectionState::Disconnected(DisconnectedPeerConnectionState { 725 reconnect_at: Instant::now(), 726 failed_reconnect_counter: 0, 727 }); 728 729 let state_machine = PeerConnectionStateMachine { 730 common, 731 state: initial_state, 732 }; 733 734 state_machine.run(task_handle).await; 735 } 736 } 737 738 #[cfg(test)] 739 mod tests { 740 use std::collections::HashMap; 741 use std::time::Duration; 742 743 use fedimint_core::task::TaskGroup; 744 use fedimint_core::PeerId; 745 use futures::Future; 746 747 
use super::DelayCalculator; 748 use crate::fedimint_core::net::peers::IPeerConnections; 749 use crate::net::connect::mock::{MockNetwork, StreamReliability}; 750 use crate::net::connect::Connector; 751 use crate::net::peers::NetworkConfig; 752 use crate::net::peers_reliable::ReconnectPeerConnectionsReliable; 753 754 async fn timeout<F, T>(f: F) -> Option<T> 755 where 756 F: Future<Output = T>, 757 { 758 tokio::time::timeout(Duration::from_secs(100), f).await.ok() 759 } 760 761 #[test_log::test(tokio::test)] 762 async fn test_connect() { 763 let task_group = TaskGroup::new(); 764 765 { 766 let net = MockNetwork::new(); 767 768 let peers = [ 769 "http://127.0.0.1:1000", 770 "http://127.0.0.1:2000", 771 "http://127.0.0.1:3000", 772 ] 773 .iter() 774 .enumerate() 775 .map(|(idx, &peer)| { 776 let cfg = peer.parse().unwrap(); 777 (PeerId::from(idx as u16 + 1), cfg) 778 }) 779 .collect::<HashMap<_, _>>(); 780 781 let peers_ref = &peers; 782 let net_ref = &net; 783 let build_peers = move |bind: &'static str, id: u16, mut task_group: TaskGroup| async move { 784 let cfg = NetworkConfig { 785 identity: PeerId::from(id), 786 bind_addr: bind.parse().unwrap(), 787 peers: peers_ref.clone(), 788 }; 789 let connect = net_ref 790 .connector(cfg.identity, StreamReliability::MILDLY_UNRELIABLE) 791 .into_dyn(); 792 ReconnectPeerConnectionsReliable::<u64>::new( 793 cfg, 794 DelayCalculator::TEST_DEFAULT, 795 connect, 796 &mut task_group, 797 ) 798 .await 799 }; 800 801 let (mut peers_a, peer_status_client_a) = 802 build_peers("127.0.0.1:1000", 1, task_group.clone()).await; 803 let (mut peers_b, peer_status_client_b) = 804 build_peers("127.0.0.1:2000", 2, task_group.clone()).await; 805 806 peers_a.send(&[PeerId::from(2)], 42).await.unwrap(); 807 let recv = timeout(peers_b.receive()).await.unwrap().unwrap(); 808 assert_eq!(recv.0, PeerId::from(1)); 809 assert_eq!(recv.1, 42); 810 let status = peer_status_client_a.get_all_status().await; 811 assert_eq!(status.len(), 2); 812 
assert!(status.values().all(|s| s.is_ok())); 813 814 peers_a.send(&[PeerId::from(3)], 21).await.unwrap(); 815 let status = peer_status_client_b.get_all_status().await; 816 assert_eq!(status.len(), 2); 817 assert!(status.values().all(|s| s.is_ok())); 818 819 let (mut peers_c, peer_status_client_c) = 820 build_peers("127.0.0.1:3000", 3, task_group.clone()).await; 821 let recv = timeout(peers_c.receive()) 822 .await 823 .expect("time out") 824 .expect("stream closed"); 825 assert_eq!(recv.0, PeerId::from(1)); 826 assert_eq!(recv.1, 21); 827 let status = peer_status_client_c.get_all_status().await; 828 assert_eq!(status.len(), 2); 829 assert!(status.values().all(|s| s.is_ok())); 830 } 831 832 task_group.shutdown(); 833 task_group.join_all(None).await.unwrap(); 834 } 835 836 #[test] 837 fn test_delay_calculator() { 838 let c = DelayCalculator::TEST_DEFAULT; 839 for i in 1..=20 { 840 println!("{}: {:?}", i, c.reconnection_delay(i)); 841 } 842 assert!((2000..3000).contains(&c.reconnection_delay(1).as_millis())); 843 assert!((10000..11000).contains(&c.reconnection_delay(10).as_millis())); 844 let c = DelayCalculator::PROD_DEFAULT; 845 for i in 1..=20 { 846 println!("{}: {:?}", i, c.reconnection_delay(i)); 847 } 848 assert!((10..20).contains(&c.reconnection_delay(1).as_millis())); 849 assert!((10000..11000).contains(&c.reconnection_delay(10).as_millis())); 850 } 851 }