/ GUNRPG.Application / Distributed / DistributedAuthority.cs
DistributedAuthority.cs
  1  using System.Security.Cryptography;
  2  using System.Text;
  3  using System.Text.Json;
  4  
  5  namespace GUNRPG.Application.Distributed;
  6  
  7  /// <summary>
  8  /// Deterministic lockstep distributed authority.
  9  /// Every node runs this independently, applies actions in the same order,
 10  /// and verifies state hashes match across all peers.
 11  /// Game logic is delegated to the shared <see cref="IDeterministicGameEngine"/>;
 12  /// this class is responsible only for replication ordering, hashing, and peer consensus.
 13  /// </summary>
 14  public sealed class DistributedAuthority : IGameAuthority
 15  {
 16      private readonly ILockstepTransport _transport;
 17      private readonly IDeterministicGameEngine _engine;
 18      private readonly List<DistributedActionEntry> _actionLog = new();
 19      private readonly HashSet<Guid> _appliedActionIds = new();
 20      private readonly object _lock = new();
 21  
 22      // Pending outbound actions awaiting acknowledgment from all peers
 23      private readonly Dictionary<Guid, PendingAction> _pendingActions = new();
 24  
 25      // Buffered inbound actions waiting for their sequence turn
 26      private readonly SortedDictionary<long, ActionBroadcastMessage> _inboundBuffer = new();
 27  
 28      private GameStateDto _currentState;
 29      private long _nextSequenceNumber;
 30      private string _currentStateHash;
 31      private bool _isDesynced;
 32  
 33      private static readonly JsonSerializerOptions SerializerOptions = new(JsonSerializerDefaults.Web)
 34      {
 35          PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
 36          WriteIndented = false
 37      };
 38  
 39      public DistributedAuthority(Guid nodeId, ILockstepTransport transport, IDeterministicGameEngine engine)
 40      {
 41          NodeId = nodeId;
 42          _transport = transport;
 43          _engine = engine;
 44          _currentState = new GameStateDto { ActionCount = 0, Operators = new List<GameStateDto.OperatorSnapshot>() };
 45          _currentStateHash = ComputeHash(_currentState);
 46  
 47          _transport.OnActionReceived += HandleActionReceived;
 48          _transport.OnAckReceived += HandleAckReceived;
 49          _transport.OnHashReceived += HandleHashReceived;
 50          _transport.OnSyncRequestReceived += HandleSyncRequestReceived;
 51          _transport.OnSyncResponseReceived += HandleSyncResponseReceived;
 52          _transport.OnPeerConnected += HandlePeerConnected;
 53          _transport.OnPeerDisconnected += HandlePeerDisconnected;
 54      }
 55  
 56      public Guid NodeId { get; }
 57      public bool IsDesynced => _isDesynced;
 58  
 59      public async Task SubmitActionAsync(PlayerActionDto action, CancellationToken ct = default)
 60      {
 61          if (_isDesynced)
 62              throw new InvalidOperationException("Node is in desync state. Cannot submit actions.");
 63  
 64          long proposedSeq;
 65          lock (_lock)
 66          {
 67              proposedSeq = _nextSequenceNumber;
 68          }
 69  
 70          var broadcast = new ActionBroadcastMessage
 71          {
 72              SenderId = NodeId,
 73              ProposedSequenceNumber = proposedSeq,
 74              Action = action
 75          };
 76  
 77          var connectedPeers = _transport.ConnectedPeers;
 78          if (connectedPeers.Count == 0)
 79          {
 80              // Solo mode: apply immediately
 81              lock (_lock)
 82              {
 83                  ApplyActionInternal(action, NodeId);
 84              }
 85              return;
 86          }
 87  
 88          // Register pending action
 89          var pending = new PendingAction(action, proposedSeq, connectedPeers);
 90          lock (_lock)
 91          {
 92              _pendingActions[action.ActionId] = pending;
 93          }
 94  
 95          await _transport.BroadcastActionAsync(broadcast, ct);
 96  
 97          // Wait for consensus (all peers acknowledged)
 98          await pending.WaitForConsensusAsync(ct);
 99  
100          lock (_lock)
101          {
102              _pendingActions.Remove(action.ActionId);
103              ApplyActionInternal(action, NodeId);
104          }
105  
106          // Broadcast resulting hash
107          string hash;
108          long seq;
109          lock (_lock)
110          {
111              hash = _currentStateHash;
112              seq = _nextSequenceNumber - 1;
113          }
114  
115          await _transport.BroadcastHashAsync(new HashBroadcastMessage
116          {
117              SenderId = NodeId,
118              SequenceNumber = seq,
119              StateHash = hash
120          }, ct);
121      }
122  
123      public GameStateDto GetCurrentState()
124      {
125          lock (_lock)
126          {
127              return _currentState;
128          }
129      }
130  
131      public string GetCurrentStateHash()
132      {
133          lock (_lock)
134          {
135              return _currentStateHash;
136          }
137      }
138  
139      public IReadOnlyList<DistributedActionEntry> GetActionLog()
140      {
141          lock (_lock)
142          {
143              return _actionLog.ToList().AsReadOnly();
144          }
145      }
146  
147      // --- Internal action application ---
148  
149      private void ApplyActionInternal(PlayerActionDto action, Guid originNodeId)
150      {
151          _currentState = _engine.Step(_currentState, action);
152          var hash = ComputeHash(_currentState);
153          _currentStateHash = hash;
154  
155          var seq = _nextSequenceNumber++;
156          _appliedActionIds.Add(action.ActionId);
157          _actionLog.Add(new DistributedActionEntry
158          {
159              SequenceNumber = seq,
160              NodeId = originNodeId,
161              Action = action,
162              StateHashAfterApply = hash
163          });
164      }
165  
166      private static string ComputeHash(GameStateDto state)
167      {
168          var json = JsonSerializer.Serialize(state, SerializerOptions);
169          var bytes = SHA256.HashData(Encoding.UTF8.GetBytes(json));
170          return Convert.ToHexString(bytes);
171      }
172  
173      /// <summary>
174      /// Drains the inbound buffer, applying any actions whose sequence number
175      /// matches _nextSequenceNumber in order.
176      /// </summary>
177      private void DrainInboundBuffer()
178      {
179          while (_inboundBuffer.Count > 0)
180          {
181              var first = _inboundBuffer.First();
182              if (first.Key != _nextSequenceNumber) break;
183  
184              _inboundBuffer.Remove(first.Key);
185              var msg = first.Value;
186  
187              if (!_appliedActionIds.Contains(msg.Action.ActionId))
188              {
189                  ApplyActionInternal(msg.Action, msg.SenderId);
190              }
191          }
192      }
193  
194      // --- Peer message handlers ---
195  
196      private void HandleActionReceived(ActionBroadcastMessage msg)
197      {
198          if (_isDesynced) return;
199  
200          lock (_lock)
201          {
202              // Buffer the action at its proposed sequence position
203              if (!_inboundBuffer.ContainsKey(msg.ProposedSequenceNumber) &&
204                  !_appliedActionIds.Contains(msg.Action.ActionId))
205              {
206                  _inboundBuffer[msg.ProposedSequenceNumber] = msg;
207              }
208  
209              // Apply any buffered actions that are next in sequence
210              DrainInboundBuffer();
211          }
212  
213          // Send acknowledgment back to the sender
214          _ = _transport.SendAckAsync(msg.SenderId, new ActionAckMessage
215          {
216              SenderId = NodeId,
217              AckedActionId = msg.Action.ActionId,
218              SequenceNumber = msg.ProposedSequenceNumber
219          });
220      }
221  
222      private void HandleAckReceived(ActionAckMessage msg)
223      {
224          PendingAction? pending;
225          lock (_lock)
226          {
227              _pendingActions.TryGetValue(msg.AckedActionId, out pending);
228          }
229  
230          pending?.Acknowledge(msg.SenderId);
231      }
232  
233      private void HandleHashReceived(HashBroadcastMessage msg)
234      {
235          lock (_lock)
236          {
237              if (msg.SequenceNumber < _actionLog.Count)
238              {
239                  var localEntry = _actionLog[(int)msg.SequenceNumber];
240                  if (localEntry.StateHashAfterApply != msg.StateHash)
241                  {
242                      _isDesynced = true;
243                  }
244              }
245          }
246      }
247  
248      private void HandleSyncRequestReceived(LogSyncRequestMessage msg)
249      {
250          List<DistributedActionEntry> entries;
251          bool fullReplay;
252  
253          lock (_lock)
254          {
255              if (msg.FromSequenceNumber == 0 ||
256                  (msg.FromSequenceNumber <= _actionLog.Count &&
257                   msg.FromSequenceNumber > 0 &&
258                   _actionLog[(int)msg.FromSequenceNumber - 1].StateHashAfterApply != msg.LatestHash))
259              {
260                  // Hash mismatch or genesis request: send full log
261                  entries = _actionLog.ToList();
262                  fullReplay = true;
263              }
264              else
265              {
266                  // Send only missing entries
267                  entries = _actionLog.Skip((int)msg.FromSequenceNumber).ToList();
268                  fullReplay = false;
269              }
270          }
271  
272          _ = _transport.SendSyncResponseAsync(msg.SenderId, new LogSyncResponseMessage
273          {
274              SenderId = NodeId,
275              Entries = entries,
276              FullReplay = fullReplay
277          });
278      }
279  
280      private void HandleSyncResponseReceived(LogSyncResponseMessage msg)
281      {
282          lock (_lock)
283          {
284              if (msg.FullReplay)
285              {
286                  // Full replay from genesis
287                  _actionLog.Clear();
288                  _appliedActionIds.Clear();
289                  _currentState = new GameStateDto { ActionCount = 0, Operators = new List<GameStateDto.OperatorSnapshot>() };
290                  _nextSequenceNumber = 0;
291                  _currentStateHash = ComputeHash(_currentState);
292              }
293  
294              foreach (var entry in msg.Entries)
295              {
296                  if (entry.SequenceNumber >= _nextSequenceNumber)
297                  {
298                      ApplyActionInternal(entry.Action, entry.NodeId);
299  
300                      // Verify hash after apply
301                      if (_currentStateHash != entry.StateHashAfterApply)
302                      {
303                          _isDesynced = true;
304                          return;
305                      }
306                  }
307              }
308  
309              _isDesynced = false;
310          }
311      }
312  
313      private void HandlePeerConnected(Guid peerId)
314      {
315          // When a new peer connects, initiate sync
316          string hash;
317          long logLength;
318  
319          lock (_lock)
320          {
321              hash = _currentStateHash;
322              logLength = _actionLog.Count;
323          }
324  
325          _ = _transport.SendSyncRequestAsync(peerId, new LogSyncRequestMessage
326          {
327              SenderId = NodeId,
328              FromSequenceNumber = logLength,
329              LatestHash = hash
330          });
331      }
332  
333      private void HandlePeerDisconnected(Guid peerId)
334      {
335          // When a peer disconnects, mark it as acknowledged on all pending
336          // outbound actions so SubmitActionAsync does not hang indefinitely.
337          lock (_lock)
338          {
339              foreach (var pending in _pendingActions.Values)
340              {
341                  pending.Acknowledge(peerId);
342              }
343          }
344      }
345  
346      // --- Pending action tracking ---
347  
348      private sealed class PendingAction
349      {
350          private readonly HashSet<Guid> _requiredPeers;
351          private readonly HashSet<Guid> _acknowledgedPeers = new();
352          private readonly TaskCompletionSource _consensusTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
353  
354          public PlayerActionDto Action { get; }
355          public long ProposedSequenceNumber { get; }
356  
357          public PendingAction(PlayerActionDto action, long proposedSeq, IReadOnlySet<Guid> connectedPeers)
358          {
359              Action = action;
360              ProposedSequenceNumber = proposedSeq;
361              _requiredPeers = new HashSet<Guid>(connectedPeers);
362          }
363  
364          public bool IsConsensusReached
365          {
366              get
367              {
368                  lock (_acknowledgedPeers)
369                  {
370                      return _requiredPeers.IsSubsetOf(_acknowledgedPeers);
371                  }
372              }
373          }
374  
375          public void Acknowledge(Guid peerId)
376          {
377              lock (_acknowledgedPeers)
378              {
379                  _acknowledgedPeers.Add(peerId);
380                  if (_requiredPeers.IsSubsetOf(_acknowledgedPeers))
381                  {
382                      _consensusTcs.TrySetResult();
383                  }
384              }
385          }
386  
387          public async Task WaitForConsensusAsync(CancellationToken ct)
388          {
389              using (ct.Register(() => _consensusTcs.TrySetCanceled(ct)))
390              {
391                  await _consensusTcs.Task.ConfigureAwait(false);
392              }
393          }
394      }
395  }