LibP2pPeerService.cs
using System.Security.Cryptography;
using GUNRPG.Application.Distributed;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Multiformats.Address;
using Nethermind.Libp2p.Core;
using Nethermind.Libp2p.Core.Discovery;
using Nethermind.Libp2p.Core.Dto;
using Nethermind.Libp2p.Protocols;

namespace GUNRPG.Infrastructure.Distributed;

/// <summary>
/// Hosted service that starts the libp2p peer, listens for inbound connections,
/// and uses mDNS to discover and connect to other GUNRPG servers on the local network.
/// When a peer connects, <see cref="Libp2pLockstepTransport"/> handles the session and
/// fires <c>OnPeerConnected</c>, which causes the <c>OperatorEventReplicator</c> to sync
/// operator events — making operators created on any server visible from all others.
/// </summary>
public sealed class LibP2pPeerService : IHostedService
{
    // Multiaddress component that introduces the remote libp2p peer ID (e.g. /ip4/.../p2p/<id>).
    private const string P2pComponent = "/p2p/";

    // Exceptions are matched by simple type name rather than by type.
    // NOTE(review): presumably because the concrete type is not conveniently referenceable
    // from here — confirm, and switch to a type check if it is.
    private const string ChannelClosedExceptionName = "ChannelClosedException";

    private readonly Guid _nodeId;
    private readonly IPeerFactory _peerFactory;
    private readonly PeerStore _peerStore;
    private readonly MDnsDiscoveryProtocol _mdns;
    private readonly ILogger<LibP2pPeerService> _logger;

    // Held to guarantee OperatorEventReplicator is constructed (and subscribed to
    // OnPeerConnected) before this service's StartAsync runs and discovers peers.
    private readonly OperatorEventReplicator _replicator;

    private ILocalPeer? _localPeer;
    private CancellationTokenSource? _cts;

    // Keep reference to the delegate so it can be unsubscribed in StopAsync.
    private Action<Multiaddress[]>? _onNewPeerHandler;

    // Tracks libp2p peer IDs we've already started dialing to prevent duplicate outbound dials.
    // IDs are removed only when a dial fails (so the next mDNS announcement can retry) and when
    // the service stops.
    private readonly HashSet<string> _dialedPeers = new(StringComparer.Ordinal);
    private readonly object _dialedLock = new();

    /// <summary>
    /// Creates the service.
    /// </summary>
    /// <param name="nodeId">Persisted GUNRPG server node ID; the libp2p identity is derived from it.</param>
    /// <param name="peerFactory">Factory used to create the local libp2p peer.</param>
    /// <param name="peerStore">Store whose <c>OnNewPeer</c> event reports discovered peers.</param>
    /// <param name="mdns">mDNS discovery protocol used to find peers on the local network.</param>
    /// <param name="logger">Logger for connection and discovery diagnostics.</param>
    /// <param name="replicator">Held only to force construction ordering (see field comment).</param>
    public LibP2pPeerService(
        Guid nodeId,
        IPeerFactory peerFactory,
        PeerStore peerStore,
        MDnsDiscoveryProtocol mdns,
        ILogger<LibP2pPeerService> logger,
        OperatorEventReplicator replicator)
    {
        _nodeId = nodeId;
        _peerFactory = peerFactory;
        _peerStore = peerStore;
        _mdns = mdns;
        _logger = logger;
        _replicator = replicator;
    }

    /// <summary>
    /// Creates the local libp2p peer with an identity derived from the node ID, starts
    /// listening on an ephemeral TCP port on all IPv4 interfaces, subscribes to peer
    /// discovery notifications and starts mDNS discovery in the background.
    /// </summary>
    /// <param name="cancellationToken">Host start token; linked into the service's own token source.</param>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        var ct = _cts.Token;

        // Derive a stable Ed25519 seed from the GUNRPG node ID via HKDF so the libp2p
        // peer ID is consistent across restarts and tied to the persisted server_node_id.
        var keyBytes = HKDF.DeriveKey(
            HashAlgorithmName.SHA256,
            ikm: _nodeId.ToByteArray(),
            outputLength: 32,
            salt: Array.Empty<byte>(),
            info: "gunrpg-p2p-identity"u8.ToArray());
        var identity = new Nethermind.Libp2p.Core.Identity(keyBytes, KeyType.Ed25519);

        _localPeer = _peerFactory.Create(identity);
        await _localPeer.StartListenAsync([Multiaddress.Decode("/ip4/0.0.0.0/tcp/0")], ct);

        _logger.LogInformation("[P2P] Listening on {Addresses}",
            string.Join(", ", _localPeer.ListenAddresses));

        _onNewPeerHandler = addrs => OnPeerDiscovered(addrs, ct);
        _peerStore.OnNewPeer += _onNewPeerHandler;

        // RunDiscoveryAsync observes and logs its own failures, so discarding the task
        // cannot produce an unobserved task exception.
        _ = RunDiscoveryAsync(_localPeer.ListenAddresses.ToArray(), ct);

        _logger.LogInformation("[P2P] mDNS peer discovery started");
        _logger.LogInformation(
            "[P2P] Note: 'Upgrade task failed' and 'ChannelClosedException' errors from Nethermind.Libp2p " +
            "during simultaneous peer connections are expected and can be safely ignored. " +
            "These occur when multiple servers discover each other simultaneously and are resolved " +
            "automatically via connection tie-breaking.");
    }

    /// <summary>
    /// Cancels in-flight discovery and dials, unsubscribes from discovery notifications,
    /// disposes the mDNS protocol and the local peer, and releases the linked token source.
    /// </summary>
    /// <param name="cancellationToken">Host stop token (not used).</param>
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        _cts?.Cancel();

        if (_onNewPeerHandler != null)
        {
            _peerStore.OnNewPeer -= _onNewPeerHandler;
            _onNewPeerHandler = null;
        }

        // Forget dialed peers so that a subsequent StartAsync can dial them again.
        lock (_dialedLock)
        {
            _dialedPeers.Clear();
        }

        try
        {
            // NOTE(review): _mdns is constructor-injected; disposing it here assumes this
            // service owns its lifetime — confirm against the DI registration.
            if (_mdns is IAsyncDisposable asyncDisposableMdns)
                await asyncDisposableMdns.DisposeAsync().ConfigureAwait(false);
            else if (_mdns is IDisposable disposableMdns)
                disposableMdns.Dispose();

            if (_localPeer is IAsyncDisposable asyncDisposablePeer)
                await asyncDisposablePeer.DisposeAsync().ConfigureAwait(false);
            else if (_localPeer is IDisposable disposablePeer)
                disposablePeer.Dispose();
        }
        finally
        {
            // The linked source holds a registration on the host token; dispose it so that
            // registration is released. It was cancelled above, so tokens already handed out
            // keep reporting IsCancellationRequested == true.
            _cts?.Dispose();
            _cts = null;
        }
    }

    /// <summary>
    /// Runs mDNS discovery and observes its outcome, so a failure is logged rather than
    /// lost in a discarded task. Exceptions raised after cancellation (service stopping)
    /// are logged at debug level only.
    /// </summary>
    private async Task RunDiscoveryAsync(Multiaddress[] listenAddresses, CancellationToken ct)
    {
        try
        {
            await _mdns.StartDiscoveryAsync(listenAddresses, ct);
        }
        catch (Exception ex) when (!ct.IsCancellationRequested)
        {
            _logger.LogError(ex, "[P2P] mDNS peer discovery failed");
        }
        catch (Exception ex)
        {
            // Reached only once cancellation has been requested, i.e. during shutdown.
            _logger.LogDebug(ex, "[P2P] mDNS peer discovery stopped");
        }
    }

    /// <summary>
    /// Handles a <c>PeerStore.OnNewPeer</c> notification. Extracts the remote libp2p peer ID
    /// from the first address carrying a <c>/p2p/</c> component. If that ID is not already in
    /// the dialed set, it records the ID and starts a background dial.
    /// </summary>
    /// <param name="addrs">Addresses reported for the discovered peer; ignored when null or empty.</param>
    /// <param name="ct">Service-lifetime token passed on to the dial.</param>
    private void OnPeerDiscovered(Multiaddress[] addrs, CancellationToken ct)
    {
        if (addrs is null || addrs.Length == 0) return;

        // Extract the libp2p peer ID from the multiaddress (e.g. /ip4/.../p2p/<id>)
        // to deduplicate discovery events for the same remote peer.
        var addr = addrs.FirstOrDefault(a => a.ToString().Contains(P2pComponent, StringComparison.Ordinal));
        if (addr == null) return;

        var addrStr = addr.ToString();
        var p2pIndex = addrStr.LastIndexOf(P2pComponent, StringComparison.Ordinal);
        if (p2pIndex < 0) return;

        // Keep only the peer-ID component: anything after the next '/' belongs to another
        // multiaddress protocol (e.g. /p2p-circuit) and must not become part of the dedup key.
        var peerId = addrStr[(p2pIndex + P2pComponent.Length)..];
        var nextSlash = peerId.IndexOf('/');
        if (nextSlash >= 0) peerId = peerId[..nextSlash];
        if (string.IsNullOrEmpty(peerId)) return;

        bool isNew;
        lock (_dialedLock)
        {
            isNew = _dialedPeers.Add(peerId);
        }

        if (!isNew) return;

        // DialPeerAsync catches every exception, so the discarded task never faults.
        _ = DialPeerAsync(peerId, addrs, ct);
    }

    /// <summary>
    /// Dials <paramref name="addrs"/> and opens a <see cref="Libp2pLockstepTransport"/> protocol
    /// on the resulting session. On failure while the service is running, the peer is removed
    /// from the dialed set so the next discovery event can retry. Failures after cancellation
    /// (service stopping) are logged at debug level only.
    /// </summary>
    /// <param name="peerId">libp2p peer ID used as the dedup key.</param>
    /// <param name="addrs">Addresses of the remote peer.</param>
    /// <param name="ct">Service-lifetime cancellation token.</param>
    private async Task DialPeerAsync(string peerId, Multiaddress[] addrs, CancellationToken ct)
    {
        try
        {
            var session = await _localPeer!.DialAsync(addrs, ct);
            await session.DialAsync<Libp2pLockstepTransport>(ct);
        }
        catch (Exception ex) when (!ct.IsCancellationRequested)
        {
            // Remove from the dialed set so the next mDNS announcement can retry.
            lock (_dialedLock)
            {
                _dialedPeers.Remove(peerId);
            }

            // ChannelClosedException is expected during simultaneous peer discovery when both
            // servers dial each other at the same time. The tie-breaking logic in
            // Libp2pLockstepTransport.HandleChannelAsync ensures one connection wins; the losing
            // connection is rejected and triggers this exception. This is normal P2P behavior.
            // Nethermind.Libp2p's internal logger may still emit "fail:" messages, but these
            // can be safely ignored as long as the peer eventually connects successfully.
            if (IsChannelClosed(ex))
            {
                _logger.LogDebug(ex, "[P2P] Connection attempt to {PeerId} was closed (likely simultaneous dial tie-break); will retry on next mDNS announcement", peerId);
            }
            else
            {
                _logger.LogWarning(ex, "[P2P] Failed to connect to discovered peer {PeerId}; will retry on next discovery", peerId);
            }
        }
        catch (Exception ex)
        {
            // Reached only once cancellation has been requested (service stopping). The caller
            // discards this task, so the exception must be handled here to avoid it going unobserved.
            _logger.LogDebug(ex, "[P2P] Dial to {PeerId} abandoned because the service is stopping", peerId);
        }
    }

    /// <summary>
    /// Returns <c>true</c> if <paramref name="ex"/>, anything in its
    /// <see cref="Exception.InnerException"/> chain, or any member of a nested
    /// <see cref="AggregateException"/> has the type name <c>ChannelClosedException</c>.
    /// </summary>
    private static bool IsChannelClosed(Exception? ex)
    {
        for (var current = ex; current is not null; current = current.InnerException)
        {
            if (current.GetType().Name == ChannelClosedExceptionName)
                return true;

            if (current is AggregateException aggregate && aggregate.InnerExceptions.Any(IsChannelClosed))
                return true;
        }

        return false;
    }
}