/ GUNRPG.Infrastructure / Distributed / InMemoryLockstepTransport.cs
InMemoryLockstepTransport.cs
  1  using System.Text.Json;
  2  using GUNRPG.Application.Distributed;
  3  
  4  namespace GUNRPG.Infrastructure.Distributed;
  5  
  6  /// <summary>
  7  /// In-memory lockstep transport for testing and single-process multi-node scenarios.
  8  /// Connects multiple DistributedAuthority instances in the same process.
  9  /// </summary>
 10  public sealed class InMemoryLockstepTransport : ILockstepTransport
 11  {
 12      private static readonly JsonSerializerOptions JsonOptions = new(JsonSerializerDefaults.Web);
 13  
 14      private readonly Guid _nodeId;
 15      private readonly HashSet<Guid> _connectedPeers = new();
 16      private readonly Dictionary<Guid, InMemoryLockstepTransport> _peerTransports = new();
 17  
 18      public InMemoryLockstepTransport(Guid nodeId)
 19      {
 20          _nodeId = nodeId;
 21      }
 22  
 23      public IReadOnlySet<Guid> ConnectedPeers => new HashSet<Guid>(_connectedPeers);
 24  
 25      public event Action<ActionBroadcastMessage>? OnActionReceived;
 26      public event Action<ActionAckMessage>? OnAckReceived;
 27      public event Action<HashBroadcastMessage>? OnHashReceived;
 28      public event Action<LogSyncRequestMessage>? OnSyncRequestReceived;
 29      public event Action<LogSyncResponseMessage>? OnSyncResponseReceived;
 30      public event Action<Guid>? OnPeerConnected;
 31      public event Action<Guid>? OnPeerDisconnected;
 32      public event Action<OperatorEventBroadcastMessage>? OnOperatorEventReceived;
 33      public event Action<OperatorEventSyncRequestMessage>? OnOperatorEventSyncRequestReceived;
 34      public event Action<OperatorEventSyncResponseMessage>? OnOperatorEventSyncResponseReceived;
 35  
 36      /// <summary>
 37      /// Connect this transport to another transport instance, simulating a P2P link.
 38      /// </summary>
 39      public void ConnectTo(InMemoryLockstepTransport other)
 40      {
 41          if (_connectedPeers.Contains(other._nodeId)) return;
 42  
 43          _connectedPeers.Add(other._nodeId);
 44          _peerTransports[other._nodeId] = other;
 45  
 46          other._connectedPeers.Add(_nodeId);
 47          other._peerTransports[_nodeId] = this;
 48  
 49          OnPeerConnected?.Invoke(other._nodeId);
 50          other.OnPeerConnected?.Invoke(_nodeId);
 51      }
 52  
 53      /// <summary>
 54      /// Disconnect this transport from another transport instance.
 55      /// </summary>
 56      public void DisconnectFrom(InMemoryLockstepTransport other)
 57      {
 58          if (!_connectedPeers.Contains(other._nodeId)) return;
 59  
 60          _connectedPeers.Remove(other._nodeId);
 61          _peerTransports.Remove(other._nodeId);
 62  
 63          other._connectedPeers.Remove(_nodeId);
 64          other._peerTransports.Remove(_nodeId);
 65  
 66          OnPeerDisconnected?.Invoke(other._nodeId);
 67          other.OnPeerDisconnected?.Invoke(_nodeId);
 68      }
 69  
 70      public Task BroadcastActionAsync(ActionBroadcastMessage message, CancellationToken ct = default)
 71      {
 72          foreach (var peer in _peerTransports.Values.ToList())
 73          {
 74              var clone = Clone<ActionBroadcastMessage>(message);
 75              peer.OnActionReceived?.Invoke(clone);
 76          }
 77          return Task.CompletedTask;
 78      }
 79  
 80      public Task SendAckAsync(Guid peerId, ActionAckMessage message, CancellationToken ct = default)
 81      {
 82          if (_peerTransports.TryGetValue(peerId, out var peer))
 83          {
 84              var clone = Clone<ActionAckMessage>(message);
 85              peer.OnAckReceived?.Invoke(clone);
 86          }
 87          return Task.CompletedTask;
 88      }
 89  
 90      public Task BroadcastHashAsync(HashBroadcastMessage message, CancellationToken ct = default)
 91      {
 92          foreach (var peer in _peerTransports.Values.ToList())
 93          {
 94              var clone = Clone<HashBroadcastMessage>(message);
 95              peer.OnHashReceived?.Invoke(clone);
 96          }
 97          return Task.CompletedTask;
 98      }
 99  
100      public Task SendSyncRequestAsync(Guid peerId, LogSyncRequestMessage message, CancellationToken ct = default)
101      {
102          if (_peerTransports.TryGetValue(peerId, out var peer))
103          {
104              var clone = Clone<LogSyncRequestMessage>(message);
105              peer.OnSyncRequestReceived?.Invoke(clone);
106          }
107          return Task.CompletedTask;
108      }
109  
110      public Task SendSyncResponseAsync(Guid peerId, LogSyncResponseMessage message, CancellationToken ct = default)
111      {
112          if (_peerTransports.TryGetValue(peerId, out var peer))
113          {
114              var clone = Clone<LogSyncResponseMessage>(message);
115              peer.OnSyncResponseReceived?.Invoke(clone);
116          }
117          return Task.CompletedTask;
118      }
119  
120      /// <summary>Deep-clone a message via JSON round-trip to simulate network serialization.</summary>
121      private static T Clone<T>(T obj) where T : class
122      {
123          var json = JsonSerializer.Serialize(obj, JsonOptions);
124          return JsonSerializer.Deserialize<T>(json, JsonOptions)!;
125      }
126  
127      public Task BroadcastOperatorEventAsync(OperatorEventBroadcastMessage message, CancellationToken ct = default)
128      {
129          foreach (var peer in _peerTransports.Values.ToList())
130          {
131              var clone = Clone<OperatorEventBroadcastMessage>(message);
132              peer.OnOperatorEventReceived?.Invoke(clone);
133          }
134          return Task.CompletedTask;
135      }
136  
137      public Task SendOperatorEventSyncRequestAsync(Guid peerId, OperatorEventSyncRequestMessage message, CancellationToken ct = default)
138      {
139          if (_peerTransports.TryGetValue(peerId, out var peer))
140          {
141              var clone = Clone<OperatorEventSyncRequestMessage>(message);
142              peer.OnOperatorEventSyncRequestReceived?.Invoke(clone);
143          }
144          return Task.CompletedTask;
145      }
146  
147      public Task SendOperatorEventSyncResponseAsync(Guid peerId, OperatorEventSyncResponseMessage message, CancellationToken ct = default)
148      {
149          if (_peerTransports.TryGetValue(peerId, out var peer))
150          {
151              var clone = Clone<OperatorEventSyncResponseMessage>(message);
152              peer.OnOperatorEventSyncResponseReceived?.Invoke(clone);
153          }
154          return Task.CompletedTask;
155      }
156  
157      /// <summary>
158      /// Simulate receiving a hash broadcast message (for testing desync scenarios).
159      /// </summary>
160      public void SimulateIncomingHash(HashBroadcastMessage message)
161      {
162          OnHashReceived?.Invoke(message);
163      }
164  }