/ GUNRPG.Infrastructure / Distributed / LibP2pPeerService.cs
LibP2pPeerService.cs
  1  using System.Security.Cryptography;
  2  using GUNRPG.Application.Distributed;
  3  using Microsoft.Extensions.Hosting;
  4  using Microsoft.Extensions.Logging;
  5  using Multiformats.Address;
  6  using Nethermind.Libp2p.Core;
  7  using Nethermind.Libp2p.Core.Discovery;
  8  using Nethermind.Libp2p.Core.Dto;
  9  using Nethermind.Libp2p.Protocols;
 10  
 11  namespace GUNRPG.Infrastructure.Distributed;
 12  
 13  /// <summary>
 14  /// Hosted service that starts the libp2p peer, listens for inbound connections,
 15  /// and uses mDNS to discover and connect to other GUNRPG servers on the local network.
 16  /// When a peer connects, <see cref="Libp2pLockstepTransport"/> handles the session and
 17  /// fires <c>OnPeerConnected</c>, which causes the <c>OperatorEventReplicator</c> to sync
 18  /// operator events — making operators created on any server visible from all others.
 19  /// </summary>
 20  public sealed class LibP2pPeerService : IHostedService
 21  {
 22      private readonly Guid _nodeId;
 23      private readonly IPeerFactory _peerFactory;
 24      private readonly PeerStore _peerStore;
 25      private readonly MDnsDiscoveryProtocol _mdns;
 26      private readonly ILogger<LibP2pPeerService> _logger;
 27      // Held to guarantee OperatorEventReplicator is constructed (and subscribed to
 28      // OnPeerConnected) before this service's StartAsync runs and discovers peers.
 29      private readonly OperatorEventReplicator _replicator;
 30  
 31      private ILocalPeer? _localPeer;
 32      private CancellationTokenSource? _cts;
 33      // Keep reference to the delegate so it can be unsubscribed in StopAsync.
 34      private Action<Multiaddress[]>? _onNewPeerHandler;
 35  
 36      // Tracks libp2p peer IDs we've already started dialing to prevent duplicate outbound dials.
 37      private readonly HashSet<string> _dialedPeers = new(StringComparer.Ordinal);
 38      private readonly object _dialedLock = new();
 39  
 40      public LibP2pPeerService(
 41          Guid nodeId,
 42          IPeerFactory peerFactory,
 43          PeerStore peerStore,
 44          MDnsDiscoveryProtocol mdns,
 45          ILogger<LibP2pPeerService> logger,
 46          OperatorEventReplicator replicator)
 47      {
 48          _nodeId = nodeId;
 49          _peerFactory = peerFactory;
 50          _peerStore = peerStore;
 51          _mdns = mdns;
 52          _logger = logger;
 53          _replicator = replicator;
 54      }
 55  
 56      public async Task StartAsync(CancellationToken cancellationToken)
 57      {
 58          _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
 59          var ct = _cts.Token;
 60  
 61          // Derive a stable Ed25519 seed from the GUNRPG node ID via HKDF so the libp2p
 62          // peer ID is consistent across restarts and tied to the persisted server_node_id.
 63          var keyBytes = HKDF.DeriveKey(
 64              HashAlgorithmName.SHA256,
 65              ikm: _nodeId.ToByteArray(),
 66              outputLength: 32,
 67              salt: Array.Empty<byte>(),
 68              info: "gunrpg-p2p-identity"u8.ToArray());
 69          var identity = new Nethermind.Libp2p.Core.Identity(keyBytes, KeyType.Ed25519);
 70  
 71          _localPeer = _peerFactory.Create(identity);
 72          await _localPeer.StartListenAsync([Multiaddress.Decode("/ip4/0.0.0.0/tcp/0")], ct);
 73  
 74          _logger.LogInformation("[P2P] Listening on {Addresses}",
 75              string.Join(", ", _localPeer.ListenAddresses));
 76  
 77          _onNewPeerHandler = addrs => OnPeerDiscovered(addrs, ct);
 78          _peerStore.OnNewPeer += _onNewPeerHandler;
 79  
 80          _ = _mdns.StartDiscoveryAsync(_localPeer.ListenAddresses.ToArray(), ct);
 81  
 82          _logger.LogInformation("[P2P] mDNS peer discovery started");
 83          _logger.LogInformation("[P2P] Note: 'Upgrade task failed' and 'ChannelClosedException' errors from Nethermind.Libp2p during simultaneous peer connections are expected and can be safely ignored. These occur when multiple servers discover each other simultaneously and are resolved automatically via connection tie-breaking.");
 84      }
 85  
 86      public async Task StopAsync(CancellationToken cancellationToken)
 87      {
 88          _cts?.Cancel();
 89  
 90          if (_onNewPeerHandler != null)
 91              _peerStore.OnNewPeer -= _onNewPeerHandler;
 92  
 93          if (_mdns is IAsyncDisposable asyncDisposableMdns)
 94              await asyncDisposableMdns.DisposeAsync().ConfigureAwait(false);
 95          else if (_mdns is IDisposable disposableMdns)
 96              disposableMdns.Dispose();
 97  
 98          if (_localPeer is IAsyncDisposable asyncDisposablePeer)
 99              await asyncDisposablePeer.DisposeAsync().ConfigureAwait(false);
100          else if (_localPeer is IDisposable disposablePeer)
101              disposablePeer.Dispose();
102      }
103  
104      private void OnPeerDiscovered(Multiaddress[] addrs, CancellationToken ct)
105      {
106          // Extract the libp2p peer ID from the multiaddress (e.g. /ip4/.../p2p/<id>)
107          // to deduplicate discovery events for the same remote peer.
108          var addr = addrs.FirstOrDefault(a => a.ToString().Contains("/p2p/"));
109          if (addr == null) return;
110  
111          var addrStr = addr.ToString();
112          var p2pIndex = addrStr.LastIndexOf("/p2p/", StringComparison.Ordinal);
113          if (p2pIndex < 0) return;
114  
115          var peerId = addrStr[(p2pIndex + 5)..];
116          if (string.IsNullOrEmpty(peerId)) return;
117  
118          bool isNew;
119          lock (_dialedLock)
120          {
121              isNew = _dialedPeers.Add(peerId);
122          }
123  
124          if (!isNew) return;
125  
126          _ = DialPeerAsync(peerId, addrs, ct);
127      }
128  
129      private async Task DialPeerAsync(string peerId, Multiaddress[] addrs, CancellationToken ct)
130      {
131          try
132          {
133              var session = await _localPeer!.DialAsync(addrs, ct);
134              await session.DialAsync<Libp2pLockstepTransport>(ct);
135          }
136          catch (Exception ex) when (!ct.IsCancellationRequested)
137          {
138              // Remove from the dialed set so the next mDNS announcement can retry.
139              lock (_dialedLock)
140              {
141                  _dialedPeers.Remove(peerId);
142              }
143  
144              // ChannelClosedException is expected during simultaneous peer discovery when both
145              // servers dial each other at the same time. The tie-breaking logic in
146              // Libp2pLockstepTransport.HandleChannelAsync ensures one connection wins; the losing
147              // connection is rejected and triggers this exception. This is normal P2P behavior.
148              // Nethermind.Libp2p's internal logger may still emit "fail:" messages, but these
149              // can be safely ignored as long as the peer eventually connects successfully.
150              var isChannelClosed = ex.GetType().Name == "ChannelClosedException"
151                                 || ex.InnerException?.GetType().Name == "ChannelClosedException"
152                                 || (ex is AggregateException agg &&
153                                     agg.InnerExceptions.Any(e => e.GetType().Name == "ChannelClosedException"));
154  
155              if (isChannelClosed)
156              {
157                  _logger.LogDebug(ex, "[P2P] Connection attempt to {PeerId} was closed (likely simultaneous dial tie-break); will retry on next mDNS announcement", peerId);
158              }
159              else
160              {
161                  _logger.LogWarning(ex, "[P2P] Failed to connect to discovered peer {PeerId}; will retry on next discovery", peerId);
162              }
163          }
164      }
165  }