/ fedimint-server / src / net / peers_reliable.rs
peers_reliable.rs
//! Implements a connection manager for communication with other federation
//! members.
//!
//! The main interface is [`fedimint_core::net::peers::IPeerConnections`] and
//! its main implementation is [`ReconnectPeerConnectionsReliable`]; see these
//! for details.
  7  
  8  use std::collections::HashMap;
  9  use std::fmt::Debug;
 10  use std::time::Duration;
 11  
 12  use anyhow::Context;
 13  use async_trait::async_trait;
 14  use fedimint_api_client::api::PeerConnectionStatus;
 15  use fedimint_core::net::peers::IPeerConnections;
 16  use fedimint_core::task::{sleep_until, Cancellable, Cancelled, TaskGroup, TaskHandle};
 17  use fedimint_core::util::SafeUrl;
 18  use fedimint_core::PeerId;
 19  use fedimint_logging::LOG_NET_PEER;
 20  use futures::future::select_all;
 21  use futures::{SinkExt, StreamExt};
 22  use serde::de::DeserializeOwned;
 23  use serde::{Deserialize, Serialize};
 24  use tokio::sync::mpsc::{Receiver, Sender};
 25  use tokio::sync::oneshot;
 26  use tokio::time::Instant;
 27  use tracing::{debug, info, instrument, trace, warn};
 28  
 29  use crate::net::connect::{AnyConnector, SharedAnyConnector};
 30  use crate::net::framed::AnyFramedTransport;
 31  use crate::net::peers::{DelayCalculator, NetworkConfig};
 32  use crate::net::queue::{MessageId, MessageQueue, UniqueMessage};
 33  
/// Every how many seconds to send an empty message to our peer if we sent no
/// messages during that time. This helps with reducing the amount of messages
/// that need to be re-sent in case of very one-sided communication.
///
/// The empty message (a "ping") still carries our latest ACK. The peer hands
/// that ACK to its resend queue, even when we have nothing else to send.
/// The timer is pushed back after every successful send.
const PING_INTERVAL: Duration = Duration::from_secs(10);

/// Owned [`Connector`](crate::net::connect::Connector) trait object used by
/// [`ReconnectPeerConnectionsReliable`]; it carries [`PeerMessage`] frames.
pub type PeerConnector<M> = AnyConnector<PeerMessage<M>>;
 42  
/// Connection manager that automatically reconnects to peers
///
/// `ReconnectPeerConnectionsReliable` is based on a
/// [`Connector`](crate::net::connect::Connector) object which is used to open
/// [`FramedTransport`](crate::net::framed::FramedTransport) connections. For
/// production deployments the `Connector` has to ensure that connections are
/// authenticated and encrypted.
///
/// Outgoing messages are numbered and kept in a per-peer resend queue until
/// the peer acknowledges them. Whenever a new connection is established, all
/// queued messages are sent again. The receiving side only delivers messages
/// whose id directly follows the last one it delivered.
pub struct ReconnectPeerConnectionsReliable<T> {
    /// One entry per remote peer (ourselves excluded); banning a peer removes
    /// its entry.
    connections: HashMap<PeerId, PeerConnection<T>>,
}

/// Endpoints of the channels between the public API and one peer's IO task.
struct PeerConnection<T> {
    /// Messages to send to the peer; consumed by the IO task.
    outgoing: Sender<T>,
    /// In-order messages received from the peer, produced by the IO task.
    incoming: Receiver<T>,
}

/// Internal message type for [`ReconnectPeerConnectionsReliable`], just public
/// because it appears in the public interface.
///
/// Every frame exchanged over a peer connection has this shape.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerMessage<M> {
    /// Payload together with its sequence id; `None` for pings.
    msg: Option<UniqueMessage<M>>,
    /// Id of the last message the sender received in order from the
    /// recipient, if any. The recipient passes it to its resend queue's `ack`.
    ack: Option<MessageId>,
}
 66  
/// Per-peer connection state machine, driven by the peer's IO task.
///
/// `common` holds the data that is kept across state changes. `state` holds
/// the data specific to the connected or disconnected state.
struct PeerConnectionStateMachine<M> {
    common: CommonPeerConnectionState<M>,
    state: PeerConnectionState<M>,
}

/// A request for the current [`PeerConnectionStatus`] of one peer; the answer
/// is sent back through `response_sender`.
struct PeerStatusQuery {
    response_sender: oneshot::Sender<PeerConnectionStatus>,
}

/// Sending half of a peer's status query channel.
type PeerStatusChannelSender = Sender<PeerStatusQuery>;
/// Receiving half of a peer's status query channel, owned by the peer's state
/// machine.
type PeerStatusChannelReceiver = Receiver<PeerStatusQuery>;
 78  
 79  /// Keeps the references to a `PeerStatusChannelSender` for each `PeerId`, which
 80  /// can be used to ask the corresponding `PeerConnectionStateMachine` for the
 81  /// current `PeerConnectionStatus`
 82  #[derive(Clone)]
 83  pub struct PeerStatusChannels(HashMap<PeerId, PeerStatusChannelSender>);
 84  
 85  impl PeerStatusChannels {
 86      pub async fn get_all_status(&self) -> HashMap<PeerId, anyhow::Result<PeerConnectionStatus>> {
 87          let results = self.0.iter().map(|(peer_id, sender)| async {
 88              let (response_sender, response_receiver) = oneshot::channel();
 89              let query = PeerStatusQuery { response_sender };
 90              let sender_response = sender
 91                  .send(query)
 92                  .await
 93                  .map_err(|_| anyhow::anyhow!("channel closed while querying peer status"));
 94              match sender_response {
 95                  Ok(()) => {
 96                      let status = response_receiver
 97                          .await
 98                          .map_err(|_| anyhow::anyhow!("channel closed while receiving peer status"));
 99                      (*peer_id, status)
100                  }
101                  Err(e) => (*peer_id, Err(e)),
102              }
103          });
104          futures::future::join_all(results)
105              .await
106              .into_iter()
107              .collect()
108      }
109  }
110  
/// State of a peer's IO task that is kept across connected/disconnected
/// transitions.
struct CommonPeerConnectionState<M> {
    /// Messages pushed for sending and not yet acknowledged by the peer; all
    /// of them are re-sent whenever a new connection is established.
    resend_queue: MessageQueue<M>,
    /// Delivers in-order payloads received from the peer to the `PeerConnection`.
    incoming: Sender<M>,
    /// Messages from the `PeerConnection` that should be sent to the peer.
    outgoing: Receiver<M>,
    our_id: PeerId,
    peer_id: PeerId,
    /// Address dialed when we are the side responsible for reconnecting.
    peer_address: SafeUrl,
    /// Computes the back-off before the next reconnection attempt.
    delay_calculator: DelayCalculator,
    /// Connector used to open outgoing connections to the peer.
    connect: SharedAnyConnector<PeerMessage<M>>,
    /// Connections accepted by the listen task that authenticated as this peer.
    incoming_connections: Receiver<AnyFramedTransport<PeerMessage<M>>>,
    /// Id of the last message delivered in order; sent back as our ACK.
    /// `None` before the first message and after a receive error.
    last_received: Option<MessageId>,
    /// Incoming status queries for this peer.
    status_query_receiver: PeerStatusChannelReceiver,
}

/// Data specific to the disconnected state.
struct DisconnectedPeerConnectionState {
    /// When to dial the peer next; only acted upon if `our_id < peer_id`.
    reconnect_at: Instant,
    /// Consecutive disconnects since the last established connection; input
    /// to the reconnection back-off.
    failed_reconnect_counter: u64,
}

/// Data specific to the connected state.
struct ConnectedPeerConnectionState<M> {
    /// The currently active connection to the peer.
    connection: AnyFramedTransport<PeerMessage<M>>,
    /// When to send a ping if nothing else has been sent before then.
    next_ping: Instant,
}

/// The two states of a peer connection.
enum PeerConnectionState<M> {
    Disconnected(DisconnectedPeerConnectionState),
    Connected(ConnectedPeerConnectionState<M>),
}
139  
140  impl<T: 'static> ReconnectPeerConnectionsReliable<T>
141  where
142      T: std::fmt::Debug + Clone + Serialize + DeserializeOwned + Unpin + Send + Sync,
143  {
144      /// Creates a new `ReconnectPeerConnections` connection manager from a
145      /// network config and a [`Connector`](crate::net::connect::Connector).
146      /// See [`ReconnectPeerConnectionsReliable`] for requirements on the
147      /// `Connector`.
148      #[instrument(skip_all)]
149      pub(crate) async fn new(
150          cfg: NetworkConfig,
151          delay_calculator: DelayCalculator,
152          connect: PeerConnector<T>,
153          task_group: &mut TaskGroup,
154      ) -> (Self, PeerStatusChannels) {
155          let shared_connector: SharedAnyConnector<PeerMessage<T>> = connect.into();
156          let mut connection_senders = HashMap::new();
157          let mut status_query_senders = HashMap::new();
158          let mut connections = HashMap::new();
159  
160          for (peer, peer_address) in cfg.peers.iter().filter(|(&peer, _)| peer != cfg.identity) {
161              let (connection_sender, connection_receiver) =
162                  tokio::sync::mpsc::channel::<AnyFramedTransport<PeerMessage<T>>>(4);
163              let (status_query_sender, status_query_receiver) =
164                  tokio::sync::mpsc::channel::<PeerStatusQuery>(1); // better block the sender than flood the receiver
165  
166              let connection = PeerConnection::new(
167                  cfg.identity,
168                  *peer,
169                  peer_address.clone(),
170                  delay_calculator,
171                  shared_connector.clone(),
172                  connection_receiver,
173                  status_query_receiver,
174                  task_group,
175              )
176              .await;
177  
178              connection_senders.insert(*peer, connection_sender);
179              status_query_senders.insert(*peer, status_query_sender);
180              connections.insert(*peer, connection);
181          }
182          task_group.spawn("listen task", move |handle| {
183              Self::run_listen_task(cfg, shared_connector, connection_senders, handle)
184          });
185          (
186              ReconnectPeerConnectionsReliable { connections },
187              PeerStatusChannels(status_query_senders),
188          )
189      }
190  
191      async fn run_listen_task(
192          cfg: NetworkConfig,
193          connect: SharedAnyConnector<PeerMessage<T>>,
194          mut connection_senders: HashMap<PeerId, Sender<AnyFramedTransport<PeerMessage<T>>>>,
195          task_handle: TaskHandle,
196      ) {
197          let mut listener = connect
198              .listen(cfg.bind_addr)
199              .await
200              .with_context(|| anyhow::anyhow!("Failed to listen on {}", cfg.bind_addr))
201              .expect("Could not bind port");
202  
203          let mut shutdown_rx = task_handle.make_shutdown_rx().await;
204  
205          while !task_handle.is_shutting_down() {
206              let new_connection = tokio::select! {
207                  maybe_msg = listener.next() => { maybe_msg },
208                  _ = &mut shutdown_rx => { break; },
209              };
210  
211              let (peer, connection) = match new_connection.expect("Listener closed") {
212                  Ok(connection) => connection,
213                  Err(e) => {
214                      warn!(target: LOG_NET_PEER, mint = ?cfg.identity, err = %e, "Error while opening incoming connection");
215                      continue;
216                  }
217              };
218  
219              let err = connection_senders
220                  .get_mut(&peer)
221                  .expect("Authenticating connectors should not return unknown peers")
222                  .send(connection)
223                  .await
224                  .is_err();
225  
226              if err {
227                  warn!(
228                      target: LOG_NET_PEER,
229                      ?peer,
230                      "Could not send incoming connection to peer io task (possibly banned)"
231                  );
232              }
233          }
234      }
235  }
236  
237  #[async_trait]
238  impl<T> IPeerConnections<T> for ReconnectPeerConnectionsReliable<T>
239  where
240      T: std::fmt::Debug + Serialize + DeserializeOwned + Clone + Unpin + Send + Sync + 'static,
241  {
242      #[must_use]
243      async fn send(&mut self, peers: &[PeerId], msg: T) -> Cancellable<()> {
244          for peer_id in peers {
245              trace!(target: LOG_NET_PEER, ?peer_id, "Sending message to");
246              if let Some(peer) = self.connections.get_mut(peer_id) {
247                  peer.send(msg.clone()).await?;
248              } else {
249                  trace!(target: LOG_NET_PEER,peer = ?peer_id, "Not sending message to unknown peer (maybe banned)");
250              }
251          }
252          Ok(())
253      }
254  
255      async fn receive(&mut self) -> Cancellable<(PeerId, T)> {
256          // if all peers banned (or just solo-federation), just hang here as there's
257          // never going to be any message. This avoids panic on `select_all` with
258          // no futures.
259          if self.connections.is_empty() {
260              std::future::pending().await
261          }
262  
263          // TODO: optimize, don't throw away remaining futures
264  
265          let futures_non_banned = self.connections.iter_mut().map(|(&peer, connection)| {
266              let receive_future = async move {
267                  let msg = connection.receive().await;
268                  (peer, msg)
269              };
270              Box::pin(receive_future)
271          });
272  
273          let first_response = select_all(futures_non_banned).await;
274  
275          first_response.0 .1.map(|v| (first_response.0 .0, v))
276      }
277  
278      async fn ban_peer(&mut self, peer: PeerId) {
279          self.connections.remove(&peer);
280          warn!(target: LOG_NET_PEER, "Peer {} banned.", peer);
281      }
282  }
283  
284  impl<M> PeerConnectionStateMachine<M>
285  where
286      M: Debug + Clone,
287  {
288      async fn run(mut self, task_handle: &TaskHandle) {
289          let peer = self.common.peer_id;
290  
291          // Note: `state_transition` internally uses channel operations (`send` and
292          // `recv`) which will disconnect when other tasks are shutting down
293          // returning here, so we probably don't need any `timeout` here.
294          while !task_handle.is_shutting_down() {
295              if let Some(new_self) = self.state_transition(task_handle).await {
296                  self = new_self;
297              } else {
298                  break;
299              }
300          }
301          info!(
302              target: LOG_NET_PEER,
303              ?peer,
304              "Shutting down peer connection state machine"
305          );
306      }
307  
308      async fn state_transition(self, task_handle: &TaskHandle) -> Option<Self> {
309          let PeerConnectionStateMachine { mut common, state } = self;
310  
311          match state {
312              PeerConnectionState::Disconnected(disconnected) => {
313                  common
314                      .state_transition_disconnected(disconnected, task_handle)
315                      .await
316              }
317              PeerConnectionState::Connected(connected) => {
318                  common
319                      .state_transition_connected(connected, task_handle)
320                      .await
321              }
322          }
323          .map(|new_state| PeerConnectionStateMachine {
324              common,
325              state: new_state,
326          })
327      }
328  }
329  
330  impl<M> CommonPeerConnectionState<M>
331  where
332      M: Debug + Clone,
333  {
334      async fn state_transition_connected(
335          &mut self,
336          mut connected: ConnectedPeerConnectionState<M>,
337          task_handle: &TaskHandle,
338      ) -> Option<PeerConnectionState<M>> {
339          Some(tokio::select! {
340              maybe_msg = self.outgoing.recv() => {
341                  match maybe_msg {
342                      Some(msg) => {
343                          self.send_message_connected(connected, msg).await
344                      },
345                      None => {
346                          debug!(target: LOG_NET_PEER, "Exiting peer connection IO task - parent disconnected");
347                          return None;
348                      },
349                  }
350              },
351              new_connection_res = self.incoming_connections.recv() => {
352                  match new_connection_res {
353                      Some(new_connection) => {
354                          debug!(target: LOG_NET_PEER, "Replacing existing connection");
355                          self.connect(new_connection, 0).await
356                      },
357                      None => {
358                          debug!(
359                          target: LOG_NET_PEER,
360                              "Exiting peer connection IO task - parent disconnected");
361                          return None;
362                      },
363                  }
364              },
365              Some(status_query) = self.status_query_receiver.recv() => {
366                  if status_query.response_sender.send(PeerConnectionStatus::Connected).is_err() {
367                      let peer_id = self.peer_id;
368                      debug!(target: LOG_NET_PEER, %peer_id, "Could not send peer status response: receiver dropped");
369                  }
370                  PeerConnectionState::Connected(connected)
371              },
372              Some(msg_res) = connected.connection.next() => {
373                  self.receive_message(connected, msg_res).await
374              },
375              _ = sleep_until(connected.next_ping) => {
376                  self.send_ping(connected).await
377              },
378              _ = task_handle.make_shutdown_rx().await => {
379                  return None;
380              },
381          })
382      }
383  
384      async fn connect(
385          &mut self,
386          mut new_connection: AnyFramedTransport<PeerMessage<M>>,
387          disconnect_count: u64,
388      ) -> PeerConnectionState<M> {
389          debug!(target: LOG_NET_PEER,
390              our_id = ?self.our_id,
391              peer = ?self.peer_id, %disconnect_count,
392              resend_queue_len = self.resend_queue.queue.len(),
393              "Initializing new connection");
394          match self.resend_buffer_contents(&mut new_connection).await {
395              Ok(()) => PeerConnectionState::Connected(ConnectedPeerConnectionState {
396                  connection: new_connection,
397                  next_ping: Instant::now(),
398              }),
399              Err(e) => self.disconnect_err(e, disconnect_count),
400          }
401      }
402  
403      async fn resend_buffer_contents(
404          &self,
405          connection: &mut AnyFramedTransport<PeerMessage<M>>,
406      ) -> Result<(), anyhow::Error> {
407          for msg in self.resend_queue.iter().cloned() {
408              connection
409                  .send(PeerMessage {
410                      msg: Some(msg),
411                      ack: self.last_received,
412                  })
413                  .await?
414          }
415  
416          Ok(())
417      }
418  
419      fn disconnect(&self, mut disconnect_count: u64) -> PeerConnectionState<M> {
420          disconnect_count += 1;
421  
422          let reconnect_at = {
423              let delay = self.delay_calculator.reconnection_delay(disconnect_count);
424              let delay_secs = delay.as_secs_f64();
425              debug!(
426                  target: LOG_NET_PEER,
427                  %disconnect_count,
428                  our_id = ?self.our_id,
429                  peer = ?self.peer_id,
430                  delay_secs,
431                  "Scheduling reopening of connection"
432              );
433              Instant::now() + delay
434          };
435  
436          PeerConnectionState::Disconnected(DisconnectedPeerConnectionState {
437              reconnect_at,
438              failed_reconnect_counter: disconnect_count,
439          })
440      }
441  
442      fn disconnect_err(&self, err: anyhow::Error, disconnect_count: u64) -> PeerConnectionState<M> {
443          debug!(target: LOG_NET_PEER,
444              our_id = ?self.our_id,
445              peer = ?self.peer_id, %err, %disconnect_count, "Peer disconnected");
446          self.disconnect(disconnect_count)
447      }
448  
449      async fn send_message_connected(
450          &mut self,
451          connected: ConnectedPeerConnectionState<M>,
452          msg: M,
453      ) -> PeerConnectionState<M> {
454          let umsg = self.resend_queue.push(msg);
455          trace!(target: LOG_NET_PEER, peer = ?self.peer_id, id = ?umsg.id, "Sending outgoing message");
456  
457          self.send_message_connected_inner(connected, Some(umsg))
458              .await
459      }
460  
461      async fn send_ping(
462          &mut self,
463          connected: ConnectedPeerConnectionState<M>,
464      ) -> PeerConnectionState<M> {
465          trace!(target: LOG_NET_PEER, our_id = ?self.our_id, peer = ?self.peer_id, "Sending ping");
466          self.send_message_connected_inner(connected, None).await
467      }
468  
469      async fn send_message_connected_inner(
470          &mut self,
471          mut connected: ConnectedPeerConnectionState<M>,
472          maybe_msg: Option<UniqueMessage<M>>,
473      ) -> PeerConnectionState<M> {
474          if let Err(e) = connected
475              .connection
476              .send(PeerMessage {
477                  msg: maybe_msg,
478                  ack: self.last_received,
479              })
480              .await
481          {
482              return self.disconnect_err(e, 0);
483          }
484  
485          connected.next_ping = Instant::now() + PING_INTERVAL;
486  
487          match connected.connection.flush().await {
488              Ok(()) => PeerConnectionState::Connected(connected),
489              Err(e) => self.disconnect_err(e, 0),
490          }
491      }
492  
493      async fn receive_message(
494          &mut self,
495          connected: ConnectedPeerConnectionState<M>,
496          msg_res: Result<PeerMessage<M>, anyhow::Error>,
497      ) -> PeerConnectionState<M> {
498          match self.receive_message_inner(msg_res).await {
499              Ok(()) => PeerConnectionState::Connected(connected),
500              Err(e) => {
501                  self.last_received = None;
502                  self.disconnect_err(e, 0)
503              }
504          }
505      }
506  
507      async fn receive_message_inner(
508          &mut self,
509          msg_res: Result<PeerMessage<M>, anyhow::Error>,
510      ) -> Result<(), anyhow::Error> {
511          let PeerMessage { msg, ack } = msg_res?;
512  
513          // Process ACK no matter if we received a message or not
514          if let Some(ack) = ack {
515              trace!(target: LOG_NET_PEER, our_id = ?self.our_id, peer = ?self.peer_id, ?ack, "Received ACK for sent message");
516              self.resend_queue.ack(ack);
517          }
518  
519          if let Some(msg) = msg {
520              trace!(target: LOG_NET_PEER, peer = ?self.peer_id, id = ?msg.id, "Received incoming message");
521  
522              let expected = self
523                  .last_received
524                  .map(|last_id| last_id.increment())
525                  .unwrap_or(msg.id);
526  
527              if msg.id < expected {
528                  info!(target: LOG_NET_PEER,
529                      ?expected, received = ?msg.id, "Received old message");
530                  return Ok(());
531              }
532  
533              if msg.id > expected {
534                  warn!(target: LOG_NET_PEER, ?expected, received = ?msg.id, "Received message from the future");
535                  return Err(anyhow::anyhow!("Received message from the future"));
536              }
537  
538              self.last_received = Some(expected);
539  
540              debug_assert_eq!(expected, msg.id, "someone removed the check above");
541              if self.incoming.send(msg.msg).await.is_err() {
542                  // ignore error - if the other side is not there,
543                  // it means we're are probably shutting down
544                  debug!(
545                      target: LOG_NET_PEER,
546                      "Could not deliver message to recipient - probably shutting down"
547                  );
548              }
549          }
550  
551          Ok(())
552      }
553  
554      async fn state_transition_disconnected(
555          &mut self,
556          disconnected: DisconnectedPeerConnectionState,
557          task_handle: &TaskHandle,
558      ) -> Option<PeerConnectionState<M>> {
559          Some(tokio::select! {
560              maybe_msg = self.outgoing.recv() => {
561                  match maybe_msg {
562                      Some(msg) => {
563                          self.send_message(disconnected, msg).await}
564                      None => {
565                          debug!(target: LOG_NET_PEER, "Exiting peer connection IO task - parent disconnected");
566                          return None;
567                      }
568                  }
569              },
570              new_connection_res = self.incoming_connections.recv() => {
571                  match new_connection_res {
572                      Some(new_connection) => {
573                          self.receive_connection(disconnected, new_connection).await
574                      },
575                      None => {
576                          debug!(target: LOG_NET_PEER, "Exiting peer connection IO task - parent disconnected");
577                          return None;
578                      },
579                  }
580              },
581              Some(status_query) = self.status_query_receiver.recv() => {
582                  if status_query.response_sender.send(PeerConnectionStatus::Disconnected).is_err() {
583                      let peer_id = self.peer_id;
584                      debug!(target: LOG_NET_PEER, %peer_id, "Could not send peer status response: receiver dropped");
585                  }
586                  PeerConnectionState::Disconnected(disconnected)
587              },
588              () = tokio::time::sleep_until(disconnected.reconnect_at), if self.our_id < self.peer_id => {
589                  // to prevent "reconnection ping-pongs", only the side with lower PeerId is responsible for reconnecting
590                  self.reconnect(disconnected).await
591              },
592              _ = task_handle.make_shutdown_rx().await => {
593                  return None;
594              },
595          })
596      }
597  
598      async fn send_message(
599          &mut self,
600          disconnected: DisconnectedPeerConnectionState,
601          msg: M,
602      ) -> PeerConnectionState<M> {
603          let umsg = self.resend_queue.push(msg);
604          trace!(target: LOG_NET_PEER, id = ?umsg.id, "Queueing outgoing message");
605          PeerConnectionState::Disconnected(disconnected)
606      }
607  
608      async fn receive_connection(
609          &mut self,
610          disconnect: DisconnectedPeerConnectionState,
611          new_connection: AnyFramedTransport<PeerMessage<M>>,
612      ) -> PeerConnectionState<M> {
613          self.connect(new_connection, disconnect.failed_reconnect_counter)
614              .await
615      }
616  
617      async fn reconnect(
618          &mut self,
619          disconnected: DisconnectedPeerConnectionState,
620      ) -> PeerConnectionState<M> {
621          match self.try_reconnect().await {
622              Ok(conn) => {
623                  self.connect(conn, disconnected.failed_reconnect_counter)
624                      .await
625              }
626              Err(e) => self.disconnect_err(e, disconnected.failed_reconnect_counter),
627          }
628      }
629  
630      async fn try_reconnect(&self) -> Result<AnyFramedTransport<PeerMessage<M>>, anyhow::Error> {
631          debug!(target: LOG_NET_PEER, our_id = ?self.our_id, peer = ?self.peer_id, "Trying to reconnect");
632          let addr = self.peer_address.clone();
633          let (connected_peer, conn) = self.connect.connect_framed(addr, self.peer_id).await?;
634  
635          if connected_peer == self.peer_id {
636              Ok(conn)
637          } else {
638              Err(anyhow::anyhow!(
639                  "Peer identified itself incorrectly: {:?}",
640                  connected_peer
641              ))
642          }
643      }
644  }
645  
646  impl<M> PeerConnection<M>
647  where
648      M: Debug + Clone + Send + Sync + 'static,
649  {
650      #[allow(clippy::too_many_arguments)]
651      async fn new(
652          our_id: PeerId,
653          peer_id: PeerId,
654          peer_address: SafeUrl,
655          delay_calculator: DelayCalculator,
656          connect: SharedAnyConnector<PeerMessage<M>>,
657          incoming_connections: Receiver<AnyFramedTransport<PeerMessage<M>>>,
658          status_query_receiver: PeerStatusChannelReceiver,
659          task_group: &mut TaskGroup,
660      ) -> PeerConnection<M> {
661          let (outgoing_sender, outgoing_receiver) = tokio::sync::mpsc::channel::<M>(1024);
662          let (incoming_sender, incoming_receiver) = tokio::sync::mpsc::channel::<M>(1024);
663  
664          task_group.spawn(
665              format!("io-thread-peer-{peer_id}"),
666              move |handle| async move {
667                  Self::run_io_thread(
668                      incoming_sender,
669                      outgoing_receiver,
670                      our_id,
671                      peer_id,
672                      peer_address,
673                      delay_calculator,
674                      connect,
675                      incoming_connections,
676                      status_query_receiver,
677                      &handle,
678                  )
679                  .await
680              },
681          );
682  
683          PeerConnection {
684              outgoing: outgoing_sender,
685              incoming: incoming_receiver,
686          }
687      }
688  
689      async fn send(&mut self, msg: M) -> Cancellable<()> {
690          self.outgoing.send(msg).await.map_err(|_e| Cancelled)
691      }
692  
693      async fn receive(&mut self) -> Cancellable<M> {
694          self.incoming.recv().await.ok_or(Cancelled)
695      }
696  
697      #[allow(clippy::too_many_arguments)] // TODO: consider refactoring
698      #[instrument(skip_all, fields(peer))]
699      async fn run_io_thread(
700          incoming: Sender<M>,
701          outgoing: Receiver<M>,
702          our_id: PeerId,
703          peer_id: PeerId,
704          peer_address: SafeUrl,
705          delay_calculator: DelayCalculator,
706          connect: SharedAnyConnector<PeerMessage<M>>,
707          incoming_connections: Receiver<AnyFramedTransport<PeerMessage<M>>>,
708          status_query_receiver: PeerStatusChannelReceiver,
709          task_handle: &TaskHandle,
710      ) {
711          let common = CommonPeerConnectionState {
712              resend_queue: Default::default(),
713              incoming,
714              outgoing,
715              our_id,
716              peer_id,
717              peer_address,
718              delay_calculator,
719              connect,
720              incoming_connections,
721              status_query_receiver,
722              last_received: None,
723          };
724          let initial_state = PeerConnectionState::Disconnected(DisconnectedPeerConnectionState {
725              reconnect_at: Instant::now(),
726              failed_reconnect_counter: 0,
727          });
728  
729          let state_machine = PeerConnectionStateMachine {
730              common,
731              state: initial_state,
732          };
733  
734          state_machine.run(task_handle).await;
735      }
736  }
737  
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::time::Duration;

    use fedimint_core::task::TaskGroup;
    use fedimint_core::PeerId;
    use futures::Future;

    use super::DelayCalculator;
    use crate::fedimint_core::net::peers::IPeerConnections;
    use crate::net::connect::mock::{MockNetwork, StreamReliability};
    use crate::net::connect::Connector;
    use crate::net::peers::NetworkConfig;
    use crate::net::peers_reliable::ReconnectPeerConnectionsReliable;

    /// Awaits `f` for at most 100 seconds; `None` means it timed out.
    async fn timeout<F, T>(f: F) -> Option<T>
    where
        F: Future<Output = T>,
    {
        tokio::time::timeout(Duration::from_secs(100), f).await.ok()
    }

    /// End-to-end test over a mock network with mildly unreliable streams and
    /// three configured peers.
    ///
    /// Checks direct delivery between two running peers. Also checks that a
    /// message sent to a peer that is not running yet (peer 3) is queued and
    /// delivered once that peer starts. Each node's status query must return
    /// one successful answer per other peer.
    #[test_log::test(tokio::test)]
    async fn test_connect() {
        let task_group = TaskGroup::new();

        {
            let net = MockNetwork::new();

            // Peer ids 1..=3 mapped to their mock addresses.
            let peers = [
                "http://127.0.0.1:1000",
                "http://127.0.0.1:2000",
                "http://127.0.0.1:3000",
            ]
            .iter()
            .enumerate()
            .map(|(idx, &peer)| {
                let cfg = peer.parse().unwrap();
                (PeerId::from(idx as u16 + 1), cfg)
            })
            .collect::<HashMap<_, _>>();

            let peers_ref = &peers;
            let net_ref = &net;
            // Starts the connection manager of peer `id`, listening on `bind`.
            let build_peers = move |bind: &'static str, id: u16, mut task_group: TaskGroup| async move {
                let cfg = NetworkConfig {
                    identity: PeerId::from(id),
                    bind_addr: bind.parse().unwrap(),
                    peers: peers_ref.clone(),
                };
                let connect = net_ref
                    .connector(cfg.identity, StreamReliability::MILDLY_UNRELIABLE)
                    .into_dyn();
                ReconnectPeerConnectionsReliable::<u64>::new(
                    cfg,
                    DelayCalculator::TEST_DEFAULT,
                    connect,
                    &mut task_group,
                )
                .await
            };

            let (mut peers_a, peer_status_client_a) =
                build_peers("127.0.0.1:1000", 1, task_group.clone()).await;
            let (mut peers_b, peer_status_client_b) =
                build_peers("127.0.0.1:2000", 2, task_group.clone()).await;

            peers_a.send(&[PeerId::from(2)], 42).await.unwrap();
            let recv = timeout(peers_b.receive()).await.unwrap().unwrap();
            assert_eq!(recv.0, PeerId::from(1));
            assert_eq!(recv.1, 42);
            let status = peer_status_client_a.get_all_status().await;
            assert_eq!(status.len(), 2);
            assert!(status.values().all(|s| s.is_ok()));

            // Peer 3 is not running yet: the message must be queued by peer 1.
            peers_a.send(&[PeerId::from(3)], 21).await.unwrap();
            let status = peer_status_client_b.get_all_status().await;
            assert_eq!(status.len(), 2);
            assert!(status.values().all(|s| s.is_ok()));

            // Once peer 3 starts, the queued message is delivered.
            let (mut peers_c, peer_status_client_c) =
                build_peers("127.0.0.1:3000", 3, task_group.clone()).await;
            let recv = timeout(peers_c.receive())
                .await
                .expect("time out")
                .expect("stream closed");
            assert_eq!(recv.0, PeerId::from(1));
            assert_eq!(recv.1, 21);
            let status = peer_status_client_c.get_all_status().await;
            assert_eq!(status.len(), 2);
            assert!(status.values().all(|s| s.is_ok()));
        }

        task_group.shutdown();
        task_group.join_all(None).await.unwrap();
    }

    /// Pins the reconnection back-off of the test and production calculators
    /// at attempts 1 and 10. The delays for attempts 1..=20 are printed for
    /// manual inspection.
    #[test]
    fn test_delay_calculator() {
        let c = DelayCalculator::TEST_DEFAULT;
        for i in 1..=20 {
            println!("{}: {:?}", i, c.reconnection_delay(i));
        }
        assert!((2000..3000).contains(&c.reconnection_delay(1).as_millis()));
        assert!((10000..11000).contains(&c.reconnection_delay(10).as_millis()));
        let c = DelayCalculator::PROD_DEFAULT;
        for i in 1..=20 {
            println!("{}: {:?}", i, c.reconnection_delay(i));
        }
        assert!((10..20).contains(&c.reconnection_delay(1).as_millis()));
        assert!((10000..11000).contains(&c.reconnection_delay(10).as_millis()));
    }
}