InMemoryLockstepTransport.cs
using System.Text.Json;
using GUNRPG.Application.Distributed;

namespace GUNRPG.Infrastructure.Distributed;

/// <summary>
/// In-memory lockstep transport for testing and single-process multi-node scenarios.
/// Connects multiple DistributedAuthority instances in the same process.
/// </summary>
/// <remarks>
/// Messages passed to the <c>Broadcast*Async</c> and <c>Send*Async</c> methods are delivered
/// synchronously on the caller's thread by raising the receiving transport's event directly.
/// Each receiver gets its own JSON round-tripped copy, so no object instance is shared with
/// the sender. This type performs no synchronization of its own.
/// </remarks>
public sealed class InMemoryLockstepTransport : ILockstepTransport
{
    private static readonly JsonSerializerOptions JsonOptions = new(JsonSerializerDefaults.Web);

    private readonly Guid _nodeId;

    // Single source of truth for links: the keys are the connected peer ids.
    private readonly Dictionary<Guid, InMemoryLockstepTransport> _peerTransports = new();

    /// <summary>Creates a transport that identifies itself to peers as <paramref name="nodeId"/>.</summary>
    /// <param name="nodeId">Identifier reported to peers when a link is established or removed.</param>
    public InMemoryLockstepTransport(Guid nodeId)
    {
        _nodeId = nodeId;
    }

    /// <summary>
    /// Snapshot of the currently connected peer ids. A new set is built on every access,
    /// so later connects or disconnects are not reflected in a previously returned set.
    /// </summary>
    public IReadOnlySet<Guid> ConnectedPeers => new HashSet<Guid>(_peerTransports.Keys);

    public event Action<ActionBroadcastMessage>? OnActionReceived;
    public event Action<ActionAckMessage>? OnAckReceived;
    public event Action<HashBroadcastMessage>? OnHashReceived;
    public event Action<LogSyncRequestMessage>? OnSyncRequestReceived;
    public event Action<LogSyncResponseMessage>? OnSyncResponseReceived;
    public event Action<Guid>? OnPeerConnected;
    public event Action<Guid>? OnPeerDisconnected;
    public event Action<OperatorEventBroadcastMessage>? OnOperatorEventReceived;
    public event Action<OperatorEventSyncRequestMessage>? OnOperatorEventSyncRequestReceived;
    public event Action<OperatorEventSyncResponseMessage>? OnOperatorEventSyncResponseReceived;

    /// <summary>
    /// Connect this transport to another transport instance, simulating a P2P link.
    /// The link is registered on both sides and <see cref="OnPeerConnected"/> is raised on
    /// both transports. Calling this for an already connected peer, or with this same
    /// instance, does nothing.
    /// </summary>
    /// <param name="other">The transport to link with.</param>
    /// <exception cref="ArgumentNullException"><paramref name="other"/> is null.</exception>
    public void ConnectTo(InMemoryLockstepTransport other)
    {
        ArgumentNullException.ThrowIfNull(other);

        // A transport must never be its own peer: broadcasts would loop back to the sender
        // and the connected event would fire twice for a single call.
        if (ReferenceEquals(other, this))
        {
            return;
        }

        if (_peerTransports.ContainsKey(other._nodeId))
        {
            return;
        }

        // Register both directions before raising events so handlers observe a complete link.
        _peerTransports[other._nodeId] = other;
        other._peerTransports[_nodeId] = this;

        OnPeerConnected?.Invoke(other._nodeId);
        other.OnPeerConnected?.Invoke(_nodeId);
    }

    /// <summary>
    /// Disconnect this transport from another transport instance.
    /// The link is removed on both sides and <see cref="OnPeerDisconnected"/> is raised on
    /// both transports. Does nothing when the peer is not currently connected.
    /// </summary>
    /// <param name="other">The transport to unlink from.</param>
    /// <exception cref="ArgumentNullException"><paramref name="other"/> is null.</exception>
    public void DisconnectFrom(InMemoryLockstepTransport other)
    {
        ArgumentNullException.ThrowIfNull(other);

        // Remove doubles as the "was it connected?" check, avoiding a second lookup.
        if (!_peerTransports.Remove(other._nodeId))
        {
            return;
        }

        other._peerTransports.Remove(_nodeId);

        OnPeerDisconnected?.Invoke(other._nodeId);
        other.OnPeerDisconnected?.Invoke(_nodeId);
    }

    /// <summary>Raises <see cref="OnActionReceived"/> on every connected peer with a copy of the message.</summary>
    /// <param name="message">The message to deliver.</param>
    /// <param name="ct">Not observed; delivery completes synchronously.</param>
    /// <returns>An already completed task.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    public Task BroadcastActionAsync(ActionBroadcastMessage message, CancellationToken ct = default)
        => Broadcast(message, peer => peer.OnActionReceived);

    /// <summary>Raises <see cref="OnAckReceived"/> on the given peer, if connected, with a copy of the message.</summary>
    /// <param name="peerId">Id of the receiving peer; unknown ids are ignored.</param>
    /// <param name="message">The message to deliver.</param>
    /// <param name="ct">Not observed; delivery completes synchronously.</param>
    /// <returns>An already completed task.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    public Task SendAckAsync(Guid peerId, ActionAckMessage message, CancellationToken ct = default)
        => SendTo(peerId, message, peer => peer.OnAckReceived);

    /// <summary>Raises <see cref="OnHashReceived"/> on every connected peer with a copy of the message.</summary>
    /// <param name="message">The message to deliver.</param>
    /// <param name="ct">Not observed; delivery completes synchronously.</param>
    /// <returns>An already completed task.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    public Task BroadcastHashAsync(HashBroadcastMessage message, CancellationToken ct = default)
        => Broadcast(message, peer => peer.OnHashReceived);

    /// <summary>Raises <see cref="OnSyncRequestReceived"/> on the given peer, if connected, with a copy of the message.</summary>
    /// <param name="peerId">Id of the receiving peer; unknown ids are ignored.</param>
    /// <param name="message">The message to deliver.</param>
    /// <param name="ct">Not observed; delivery completes synchronously.</param>
    /// <returns>An already completed task.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    public Task SendSyncRequestAsync(Guid peerId, LogSyncRequestMessage message, CancellationToken ct = default)
        => SendTo(peerId, message, peer => peer.OnSyncRequestReceived);

    /// <summary>Raises <see cref="OnSyncResponseReceived"/> on the given peer, if connected, with a copy of the message.</summary>
    /// <param name="peerId">Id of the receiving peer; unknown ids are ignored.</param>
    /// <param name="message">The message to deliver.</param>
    /// <param name="ct">Not observed; delivery completes synchronously.</param>
    /// <returns>An already completed task.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    public Task SendSyncResponseAsync(Guid peerId, LogSyncResponseMessage message, CancellationToken ct = default)
        => SendTo(peerId, message, peer => peer.OnSyncResponseReceived);

    /// <summary>Raises <see cref="OnOperatorEventReceived"/> on every connected peer with a copy of the message.</summary>
    /// <param name="message">The message to deliver.</param>
    /// <param name="ct">Not observed; delivery completes synchronously.</param>
    /// <returns>An already completed task.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    public Task BroadcastOperatorEventAsync(OperatorEventBroadcastMessage message, CancellationToken ct = default)
        => Broadcast(message, peer => peer.OnOperatorEventReceived);

    /// <summary>Raises <see cref="OnOperatorEventSyncRequestReceived"/> on the given peer, if connected, with a copy of the message.</summary>
    /// <param name="peerId">Id of the receiving peer; unknown ids are ignored.</param>
    /// <param name="message">The message to deliver.</param>
    /// <param name="ct">Not observed; delivery completes synchronously.</param>
    /// <returns>An already completed task.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    public Task SendOperatorEventSyncRequestAsync(Guid peerId, OperatorEventSyncRequestMessage message, CancellationToken ct = default)
        => SendTo(peerId, message, peer => peer.OnOperatorEventSyncRequestReceived);

    /// <summary>Raises <see cref="OnOperatorEventSyncResponseReceived"/> on the given peer, if connected, with a copy of the message.</summary>
    /// <param name="peerId">Id of the receiving peer; unknown ids are ignored.</param>
    /// <param name="message">The message to deliver.</param>
    /// <param name="ct">Not observed; delivery completes synchronously.</param>
    /// <returns>An already completed task.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    public Task SendOperatorEventSyncResponseAsync(Guid peerId, OperatorEventSyncResponseMessage message, CancellationToken ct = default)
        => SendTo(peerId, message, peer => peer.OnOperatorEventSyncResponseReceived);

    /// <summary>
    /// Simulate receiving a hash broadcast message (for testing desync scenarios).
    /// Raises <see cref="OnHashReceived"/> on this transport with the given instance; unlike
    /// the send/broadcast methods, the message is not copied.
    /// </summary>
    /// <param name="message">The message handed to local subscribers.</param>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    public void SimulateIncomingHash(HashBroadcastMessage message)
    {
        ArgumentNullException.ThrowIfNull(message);
        OnHashReceived?.Invoke(message);
    }

    /// <summary>Delivers a separate copy of <paramref name="message"/> to every connected peer.</summary>
    private Task Broadcast<T>(T message, Func<InMemoryLockstepTransport, Action<T>?> selectHandler)
        where T : class
    {
        ArgumentNullException.ThrowIfNull(message);

        // Snapshot the peers so handlers may connect or disconnect while delivery is running.
        var peers = _peerTransports.Values.ToArray();
        if (peers.Length == 0)
        {
            return Task.CompletedTask;
        }

        // Serialize once; every peer still receives its own deserialized instance.
        var json = JsonSerializer.Serialize(message, JsonOptions);
        foreach (var peer in peers)
        {
            // Deserialize before looking up the handler so the round-trip is exercised
            // even when nobody is subscribed on the receiving side.
            var copy = Deserialize<T>(json);
            selectHandler(peer)?.Invoke(copy);
        }

        return Task.CompletedTask;
    }

    /// <summary>Delivers a copy of <paramref name="message"/> to one peer; unknown peer ids are ignored.</summary>
    private Task SendTo<T>(Guid peerId, T message, Func<InMemoryLockstepTransport, Action<T>?> selectHandler)
        where T : class
    {
        ArgumentNullException.ThrowIfNull(message);

        if (_peerTransports.TryGetValue(peerId, out var peer))
        {
            var copy = Deserialize<T>(JsonSerializer.Serialize(message, JsonOptions));
            selectHandler(peer)?.Invoke(copy);
        }

        return Task.CompletedTask;
    }

    /// <summary>Second half of the JSON round-trip that simulates network serialization.</summary>
    private static T Deserialize<T>(string json) where T : class
        => JsonSerializer.Deserialize<T>(json, JsonOptions)
           ?? throw new InvalidOperationException($"JSON round-trip of {typeof(T).Name} produced null.");
}