/ GUNRPG.Infrastructure / Distributed / Libp2pLockstepTransport.cs
Libp2pLockstepTransport.cs
  1  using System.Text.Json;
  2  using GUNRPG.Application.Distributed;
  3  using Nethermind.Libp2p.Core;
  4  
  5  namespace GUNRPG.Infrastructure.Distributed;
  6  
  7  /// <summary>
  8  /// Libp2p-based lockstep transport using the /gunrpg/lockstep/1.0.0 protocol.
  9  /// Wraps Nethermind.Libp2p for peer-to-peer communication.
 10  /// </summary>
 11  public sealed class Libp2pLockstepTransport : ILockstepTransport, ISessionProtocol, ISessionListenerProtocol
 12  {
 13      private static readonly JsonSerializerOptions JsonOptions = new(JsonSerializerDefaults.Web)
 14      {
 15          PropertyNamingPolicy = JsonNamingPolicy.CamelCase
 16      };
 17  
 18      private readonly Guid _nodeId;
 19      private readonly HashSet<Guid> _connectedPeers = new();
 20      private readonly Dictionary<Guid, IChannel> _peerChannels = new();
 21      private readonly object _lock = new();
 22  
 23      public Libp2pLockstepTransport(Guid nodeId)
 24      {
 25          _nodeId = nodeId;
 26      }
 27  
 28      // IProtocol implementation
 29      public string Id => LockstepProtocol.Id;
 30  
 31      public IReadOnlySet<Guid> ConnectedPeers
 32      {
 33          get { lock (_lock) return new HashSet<Guid>(_connectedPeers); }
 34      }
 35  
 36      public event Action<ActionBroadcastMessage>? OnActionReceived;
 37      public event Action<ActionAckMessage>? OnAckReceived;
 38      public event Action<HashBroadcastMessage>? OnHashReceived;
 39      public event Action<LogSyncRequestMessage>? OnSyncRequestReceived;
 40      public event Action<LogSyncResponseMessage>? OnSyncResponseReceived;
 41      public event Action<Guid>? OnPeerConnected;
 42      public event Action<Guid>? OnPeerDisconnected;
 43      public event Action<OperatorEventBroadcastMessage>? OnOperatorEventReceived;
 44      public event Action<OperatorEventSyncRequestMessage>? OnOperatorEventSyncRequestReceived;
 45      public event Action<OperatorEventSyncResponseMessage>? OnOperatorEventSyncResponseReceived;
 46  
 47      // ISessionProtocol - called when dialing a remote peer
 48      public async Task DialAsync(IChannel channel, ISessionContext context)
 49      {
 50          await HandleChannelAsync(channel, context, isListener: false);
 51      }
 52  
 53      // ISessionListenerProtocol - called when a remote peer connects
 54      public async Task ListenAsync(IChannel channel, ISessionContext context)
 55      {
 56          await HandleChannelAsync(channel, context, isListener: true);
 57      }
 58  
 59      private async Task HandleChannelAsync(IChannel channel, ISessionContext context, bool isListener)
 60      {
 61          // Exchange node IDs as hello using line-based protocol
 62          await channel.WriteLineAsync(_nodeId.ToString());
 63          var remotePeerIdStr = await channel.ReadLineAsync();
 64          if (!Guid.TryParse(remotePeerIdStr, out var remotePeerId)) return;
 65  
 66          // Deterministic tie-break for simultaneous dials so both peers always converge on the
 67          // same underlying TCP connection without needing to replace a live channel:
 68          //   - Lower-ID peer keeps its outbound (dialer) session.
 69          //   - Higher-ID peer keeps its inbound (listener) session.
 70          // The losing half returns early before starting a read loop, so the previously
 71          // registered channel is never replaced, and no spurious OnPeerDisconnected fires.
 72          bool isWinningSession = _nodeId.CompareTo(remotePeerId) < 0
 73              ? !isListener   // lower-ID peer: keep outbound, reject inbound
 74              : isListener;   // higher-ID peer: keep inbound, reject outbound
 75  
 76          if (!isWinningSession) return;
 77  
 78          lock (_lock)
 79          {
 80              if (_connectedPeers.Contains(remotePeerId))
 81                  return; // Safety guard: both winning sessions arrived concurrently (shouldn't occur in practice)
 82  
 83              _connectedPeers.Add(remotePeerId);
 84              _peerChannels[remotePeerId] = channel;
 85          }
 86  
 87          OnPeerConnected?.Invoke(remotePeerId);
 88  
 89          try
 90          {
 91              // Read loop for incoming messages (line-delimited JSON)
 92              while (!channel.CancellationToken.IsCancellationRequested)
 93              {
 94                  var json = await channel.ReadLineAsync();
 95                  if (json == null) break;
 96                  DispatchMessage(json);
 97              }
 98          }
 99          finally
100          {
101              // Only clean up peer tracking if this specific channel is still the active one.
102              // Guard against races where a reconnect may have already updated the channel.
103              bool wasActive;
104              lock (_lock)
105              {
106                  wasActive = _peerChannels.TryGetValue(remotePeerId, out var activeChannel)
107                              && ReferenceEquals(activeChannel, channel);
108                  if (wasActive)
109                  {
110                      _connectedPeers.Remove(remotePeerId);
111                      _peerChannels.Remove(remotePeerId);
112                  }
113              }
114              if (wasActive) OnPeerDisconnected?.Invoke(remotePeerId);
115          }
116      }
117  
118      private void DispatchMessage(string json)
119      {
120          try
121          {
122              using var document = JsonDocument.Parse(json);
123              var root = document.RootElement;
124  
125              if (!root.TryGetProperty("type", out var typeProperty))
126                  return;
127  
128              var type = typeProperty.GetString();
129              if (string.IsNullOrEmpty(type))
130                  return;
131  
132              switch (type)
133              {
134                  case "action_broadcast":
135                  {
136                      var wrapper = JsonSerializer.Deserialize<MessageWrapper<ActionBroadcastMessage>>(json, JsonOptions);
137                      if (wrapper?.Payload != null) OnActionReceived?.Invoke(wrapper.Payload);
138                      break;
139                  }
140                  case "action_ack":
141                  {
142                      var wrapper = JsonSerializer.Deserialize<MessageWrapper<ActionAckMessage>>(json, JsonOptions);
143                      if (wrapper?.Payload != null) OnAckReceived?.Invoke(wrapper.Payload);
144                      break;
145                  }
146                  case "hash_broadcast":
147                  {
148                      var wrapper = JsonSerializer.Deserialize<MessageWrapper<HashBroadcastMessage>>(json, JsonOptions);
149                      if (wrapper?.Payload != null) OnHashReceived?.Invoke(wrapper.Payload);
150                      break;
151                  }
152                  case "sync_request":
153                  {
154                      var wrapper = JsonSerializer.Deserialize<MessageWrapper<LogSyncRequestMessage>>(json, JsonOptions);
155                      if (wrapper?.Payload != null) OnSyncRequestReceived?.Invoke(wrapper.Payload);
156                      break;
157                  }
158                  case "sync_response":
159                  {
160                      var wrapper = JsonSerializer.Deserialize<MessageWrapper<LogSyncResponseMessage>>(json, JsonOptions);
161                      if (wrapper?.Payload != null) OnSyncResponseReceived?.Invoke(wrapper.Payload);
162                      break;
163                  }
164                  case "operator_event":
165                  {
166                      var wrapper = JsonSerializer.Deserialize<MessageWrapper<OperatorEventBroadcastMessage>>(json, JsonOptions);
167                      if (wrapper?.Payload != null) OnOperatorEventReceived?.Invoke(wrapper.Payload);
168                      break;
169                  }
170                  case "operator_event_sync_request":
171                  {
172                      var wrapper = JsonSerializer.Deserialize<MessageWrapper<OperatorEventSyncRequestMessage>>(json, JsonOptions);
173                      if (wrapper?.Payload != null) OnOperatorEventSyncRequestReceived?.Invoke(wrapper.Payload);
174                      break;
175                  }
176                  case "operator_event_sync_response":
177                  {
178                      var wrapper = JsonSerializer.Deserialize<MessageWrapper<OperatorEventSyncResponseMessage>>(json, JsonOptions);
179                      if (wrapper?.Payload != null) OnOperatorEventSyncResponseReceived?.Invoke(wrapper.Payload);
180                      break;
181                  }
182              }
183          }
184          catch (JsonException)
185          {
186              // Malformed JSON; ignore to avoid tearing down the session loop
187          }
188      }
189  
190      public Task BroadcastActionAsync(ActionBroadcastMessage message, CancellationToken ct = default)
191          => BroadcastAsync("action_broadcast", message, ct);
192  
193      public Task SendAckAsync(Guid peerId, ActionAckMessage message, CancellationToken ct = default)
194          => SendToAsync(peerId, "action_ack", message, ct);
195  
196      public Task BroadcastHashAsync(HashBroadcastMessage message, CancellationToken ct = default)
197          => BroadcastAsync("hash_broadcast", message, ct);
198  
199      public Task SendSyncRequestAsync(Guid peerId, LogSyncRequestMessage message, CancellationToken ct = default)
200          => SendToAsync(peerId, "sync_request", message, ct);
201  
202      public Task SendSyncResponseAsync(Guid peerId, LogSyncResponseMessage message, CancellationToken ct = default)
203          => SendToAsync(peerId, "sync_response", message, ct);
204  
205      public Task BroadcastOperatorEventAsync(OperatorEventBroadcastMessage message, CancellationToken ct = default)
206          => BroadcastAsync("operator_event", message, ct);
207  
208      public Task SendOperatorEventSyncRequestAsync(Guid peerId, OperatorEventSyncRequestMessage message, CancellationToken ct = default)
209          => SendToAsync(peerId, "operator_event_sync_request", message, ct);
210  
211      public Task SendOperatorEventSyncResponseAsync(Guid peerId, OperatorEventSyncResponseMessage message, CancellationToken ct = default)
212          => SendToAsync(peerId, "operator_event_sync_response", message, ct);
213  
214      private async Task BroadcastAsync<T>(string type, T message, CancellationToken ct) where T : class
215      {
216          ct.ThrowIfCancellationRequested();
217  
218          List<IChannel> channels;
219          lock (_lock)
220          {
221              channels = _peerChannels.Values.ToList();
222          }
223  
224          var json = JsonSerializer.Serialize(new MessageWrapper<T> { Type = type, Payload = message }, JsonOptions);
225  
226          var tasks = channels.Select(channel => channel.WriteLineAsync(json).AsTask()).ToArray();
227          await Task.WhenAll(tasks);
228      }
229  
230      private async Task SendToAsync<T>(Guid peerId, string type, T message, CancellationToken ct) where T : class
231      {
232          ct.ThrowIfCancellationRequested();
233  
234          IChannel? channel;
235          lock (_lock)
236          {
237              _peerChannels.TryGetValue(peerId, out channel);
238          }
239  
240          if (channel == null) return;
241  
242          var json = JsonSerializer.Serialize(new MessageWrapper<T> { Type = type, Payload = message }, JsonOptions);
243          await channel.WriteLineAsync(json);
244      }
245  
246      private sealed class MessageWrapper<T>
247      {
248          public string Type { get; set; } = string.Empty;
249          public T? Payload { get; set; }
250      }
251  }