Libp2pLockstepTransport.cs
using System.Text.Json;
using GUNRPG.Application.Distributed;
using Nethermind.Libp2p.Core;

namespace GUNRPG.Infrastructure.Distributed;

/// <summary>
/// Libp2p-based lockstep transport using the /gunrpg/lockstep/1.0.0 protocol.
/// Wraps Nethermind.Libp2p for peer-to-peer communication.
/// </summary>
/// <remarks>
/// Each session begins with a hello line in each direction carrying the sender's node ID.
/// After that, messages are exchanged as line-delimited JSON objects of the form
/// <c>{"type": "...", "payload": {...}}</c>.
/// </remarks>
public sealed class Libp2pLockstepTransport : ILockstepTransport, ISessionProtocol, ISessionListenerProtocol
{
    private static readonly JsonSerializerOptions JsonOptions = new(JsonSerializerDefaults.Web)
    {
        PropertyNamingPolicy = JsonNamingPolicy.CamelCase
    };

    private readonly Guid _nodeId;

    // Active channel per connected peer. Its key set is the connected-peer set; keeping a single
    // collection means the two can never drift apart.
    private readonly Dictionary<Guid, IChannel> _peerChannels = new();
    private readonly object _lock = new();

    /// <summary>Creates a transport that identifies itself to peers as <paramref name="nodeId"/>.</summary>
    /// <param name="nodeId">This node's ID; sent in the hello line and used for the session tie-break.</param>
    public Libp2pLockstepTransport(Guid nodeId)
    {
        _nodeId = nodeId;
    }

    /// <summary>Protocol ID (IProtocol implementation).</summary>
    public string Id => LockstepProtocol.Id;

    /// <summary>A snapshot copy of the peers that currently have an active session.</summary>
    public IReadOnlySet<Guid> ConnectedPeers
    {
        get
        {
            lock (_lock)
            {
                return new HashSet<Guid>(_peerChannels.Keys);
            }
        }
    }

    /// <summary>Raised from a session read loop when an <c>action_broadcast</c> message arrives.</summary>
    public event Action<ActionBroadcastMessage>? OnActionReceived;

    /// <summary>Raised from a session read loop when an <c>action_ack</c> message arrives.</summary>
    public event Action<ActionAckMessage>? OnAckReceived;

    /// <summary>Raised from a session read loop when a <c>hash_broadcast</c> message arrives.</summary>
    public event Action<HashBroadcastMessage>? OnHashReceived;

    /// <summary>Raised from a session read loop when a <c>sync_request</c> message arrives.</summary>
    public event Action<LogSyncRequestMessage>? OnSyncRequestReceived;

    /// <summary>Raised from a session read loop when a <c>sync_response</c> message arrives.</summary>
    public event Action<LogSyncResponseMessage>? OnSyncResponseReceived;

    /// <summary>Raised when a session with a peer is registered.</summary>
    public event Action<Guid>? OnPeerConnected;

    /// <summary>Raised when the registered session for a peer ends.</summary>
    public event Action<Guid>? OnPeerDisconnected;

    /// <summary>Raised from a session read loop when an <c>operator_event</c> message arrives.</summary>
    public event Action<OperatorEventBroadcastMessage>? OnOperatorEventReceived;

    /// <summary>Raised from a session read loop when an <c>operator_event_sync_request</c> message arrives.</summary>
    public event Action<OperatorEventSyncRequestMessage>? OnOperatorEventSyncRequestReceived;

    /// <summary>Raised from a session read loop when an <c>operator_event_sync_response</c> message arrives.</summary>
    public event Action<OperatorEventSyncResponseMessage>? OnOperatorEventSyncResponseReceived;

    /// <summary>ISessionProtocol: runs the session on a channel this node dialed.</summary>
    public async Task DialAsync(IChannel channel, ISessionContext context)
    {
        await HandleChannelAsync(channel, context, isListener: false);
    }

    /// <summary>ISessionListenerProtocol: runs the session on a channel a remote peer opened to us.</summary>
    public async Task ListenAsync(IChannel channel, ISessionContext context)
    {
        await HandleChannelAsync(channel, context, isListener: true);
    }

    /// <summary>
    /// Performs the hello exchange and applies the tie-break. For the winning session, it
    /// registers the peer and then runs the read loop until the channel ends. The peer is
    /// unregistered when the loop exits, including when it exits via an exception.
    /// </summary>
    private async Task HandleChannelAsync(IChannel channel, ISessionContext context, bool isListener)
    {
        // Hello: exchange node IDs as the first line in each direction.
        await channel.WriteLineAsync(_nodeId.ToString());
        var remotePeerIdStr = await channel.ReadLineAsync();
        if (!Guid.TryParse(remotePeerIdStr, out var remotePeerId))
            return;

        // Deterministic tie-break so both peers converge on the same underlying connection
        // without ever replacing a live channel:
        //   - Lower-ID peer keeps its outbound (dialer) session, rejects inbound.
        //   - Higher-ID peer keeps its inbound (listener) session, rejects outbound.
        // The losing half returns before starting a read loop, so the registered channel is never
        // replaced and no spurious OnPeerDisconnected fires.
        // NOTE(review): as a consequence, a session is only established when the lower-ID node is
        // the dialer. If only the higher-ID node dials, both halves return here. Confirm that the
        // dialing/discovery logic guarantees the lower-ID node dials.
        bool isWinningSession = _nodeId.CompareTo(remotePeerId) < 0
            ? !isListener
            : isListener;

        if (!isWinningSession)
            return;

        lock (_lock)
        {
            // Safety guard: a second winning session for an already-registered peer is rejected
            // and the existing channel is kept.
            if (!_peerChannels.TryAdd(remotePeerId, channel))
                return;
        }

        try
        {
            // Raised inside the try so that a throwing subscriber still runs the cleanup below,
            // instead of leaving the peer registered with no read loop.
            OnPeerConnected?.Invoke(remotePeerId);

            // Read loop for incoming line-delimited JSON messages.
            while (!channel.CancellationToken.IsCancellationRequested)
            {
                var json = await channel.ReadLineAsync();
                if (json == null)
                    break;

                DispatchMessage(json);
            }
        }
        finally
        {
            // Only clean up if this specific channel is still the registered one for the peer.
            bool wasActive;
            lock (_lock)
            {
                wasActive = _peerChannels.TryGetValue(remotePeerId, out var activeChannel)
                    && ReferenceEquals(activeChannel, channel);
                if (wasActive)
                    _peerChannels.Remove(remotePeerId);
            }

            if (wasActive)
                OnPeerDisconnected?.Invoke(remotePeerId);
        }
    }

    /// <summary>
    /// Parses one received line and raises the event matching its <c>type</c>.
    /// Malformed or unrecognized lines are ignored so the session loop keeps running.
    /// </summary>
    private void DispatchMessage(string json)
    {
        if (string.IsNullOrWhiteSpace(json))
            return;

        try
        {
            using var document = JsonDocument.Parse(json);
            var root = document.RootElement;

            // These lines are valid JSON but the wrong shape. TryGetProperty throws
            // InvalidOperationException (not JsonException) on a non-object root, and so does
            // GetString on a non-string "type". Check the kinds first so such lines are skipped.
            if (root.ValueKind != JsonValueKind.Object
                || !root.TryGetProperty("type", out var typeProperty)
                || typeProperty.ValueKind != JsonValueKind.String)
            {
                return;
            }

            switch (typeProperty.GetString())
            {
                case "action_broadcast":
                    Raise(root, OnActionReceived);
                    break;
                case "action_ack":
                    Raise(root, OnAckReceived);
                    break;
                case "hash_broadcast":
                    Raise(root, OnHashReceived);
                    break;
                case "sync_request":
                    Raise(root, OnSyncRequestReceived);
                    break;
                case "sync_response":
                    Raise(root, OnSyncResponseReceived);
                    break;
                case "operator_event":
                    Raise(root, OnOperatorEventReceived);
                    break;
                case "operator_event_sync_request":
                    Raise(root, OnOperatorEventSyncRequestReceived);
                    break;
                case "operator_event_sync_response":
                    Raise(root, OnOperatorEventSyncResponseReceived);
                    break;
            }
        }
        catch (JsonException)
        {
            // Malformed JSON or payload; ignore to avoid tearing down the session loop.
        }
    }

    /// <summary>
    /// Deserializes the already-parsed message wrapper, avoiding a second text parse. If the
    /// payload is non-null, it is passed to <paramref name="handler"/>.
    /// </summary>
    private static void Raise<T>(JsonElement root, Action<T>? handler) where T : class
    {
        var payload = root.Deserialize<MessageWrapper<T>>(JsonOptions)?.Payload;
        if (payload != null)
            handler?.Invoke(payload);
    }

    public Task BroadcastActionAsync(ActionBroadcastMessage message, CancellationToken ct = default)
        => BroadcastAsync("action_broadcast", message, ct);

    public Task SendAckAsync(Guid peerId, ActionAckMessage message, CancellationToken ct = default)
        => SendToAsync(peerId, "action_ack", message, ct);

    public Task BroadcastHashAsync(HashBroadcastMessage message, CancellationToken ct = default)
        => BroadcastAsync("hash_broadcast", message, ct);

    public Task SendSyncRequestAsync(Guid peerId, LogSyncRequestMessage message, CancellationToken ct = default)
        => SendToAsync(peerId, "sync_request", message, ct);

    public Task SendSyncResponseAsync(Guid peerId, LogSyncResponseMessage message, CancellationToken ct = default)
        => SendToAsync(peerId, "sync_response", message, ct);

    public Task BroadcastOperatorEventAsync(OperatorEventBroadcastMessage message, CancellationToken ct = default)
        => BroadcastAsync("operator_event", message, ct);

    public Task SendOperatorEventSyncRequestAsync(Guid peerId, OperatorEventSyncRequestMessage message, CancellationToken ct = default)
        => SendToAsync(peerId, "operator_event_sync_request", message, ct);

    public Task SendOperatorEventSyncResponseAsync(Guid peerId, OperatorEventSyncResponseMessage message, CancellationToken ct = default)
        => SendToAsync(peerId, "operator_event_sync_response", message, ct);

    /// <summary>
    /// Writes the message to every currently registered channel in parallel.
    /// <paramref name="ct"/> is checked only before sending.
    /// </summary>
    private async Task BroadcastAsync<T>(string type, T message, CancellationToken ct) where T : class
    {
        ct.ThrowIfCancellationRequested();

        List<IChannel> channels;
        lock (_lock)
        {
            channels = _peerChannels.Values.ToList();
        }

        var json = Serialize(type, message);
        await Task.WhenAll(channels.Select(channel => channel.WriteLineAsync(json).AsTask()));
    }

    /// <summary>
    /// Writes the message to one peer's registered channel.
    /// Does nothing if the peer is not connected.
    /// </summary>
    private async Task SendToAsync<T>(Guid peerId, string type, T message, CancellationToken ct) where T : class
    {
        ct.ThrowIfCancellationRequested();

        IChannel? channel;
        lock (_lock)
        {
            _peerChannels.TryGetValue(peerId, out channel);
        }

        if (channel == null)
            return;

        await channel.WriteLineAsync(Serialize(type, message));
    }

    /// <summary>Builds the wire line: <c>{"type": type, "payload": message}</c>.</summary>
    private static string Serialize<T>(string type, T message) where T : class
        => JsonSerializer.Serialize(new MessageWrapper<T> { Type = type, Payload = message }, JsonOptions);

    private sealed class MessageWrapper<T>
    {
        public string Type { get; set; } = string.Empty;
        public T? Payload { get; set; }
    }
}