DistributedAuthority.cs
1 using System.Security.Cryptography; 2 using System.Text; 3 using System.Text.Json; 4 5 namespace GUNRPG.Application.Distributed; 6 7 /// <summary> 8 /// Deterministic lockstep distributed authority. 9 /// Every node runs this independently, applies actions in the same order, 10 /// and verifies state hashes match across all peers. 11 /// Game logic is delegated to the shared <see cref="IDeterministicGameEngine"/>; 12 /// this class is responsible only for replication ordering, hashing, and peer consensus. 13 /// </summary> 14 public sealed class DistributedAuthority : IGameAuthority 15 { 16 private readonly ILockstepTransport _transport; 17 private readonly IDeterministicGameEngine _engine; 18 private readonly List<DistributedActionEntry> _actionLog = new(); 19 private readonly HashSet<Guid> _appliedActionIds = new(); 20 private readonly object _lock = new(); 21 22 // Pending outbound actions awaiting acknowledgment from all peers 23 private readonly Dictionary<Guid, PendingAction> _pendingActions = new(); 24 25 // Buffered inbound actions waiting for their sequence turn 26 private readonly SortedDictionary<long, ActionBroadcastMessage> _inboundBuffer = new(); 27 28 private GameStateDto _currentState; 29 private long _nextSequenceNumber; 30 private string _currentStateHash; 31 private bool _isDesynced; 32 33 private static readonly JsonSerializerOptions SerializerOptions = new(JsonSerializerDefaults.Web) 34 { 35 PropertyNamingPolicy = JsonNamingPolicy.CamelCase, 36 WriteIndented = false 37 }; 38 39 public DistributedAuthority(Guid nodeId, ILockstepTransport transport, IDeterministicGameEngine engine) 40 { 41 NodeId = nodeId; 42 _transport = transport; 43 _engine = engine; 44 _currentState = new GameStateDto { ActionCount = 0, Operators = new List<GameStateDto.OperatorSnapshot>() }; 45 _currentStateHash = ComputeHash(_currentState); 46 47 _transport.OnActionReceived += HandleActionReceived; 48 _transport.OnAckReceived += HandleAckReceived; 49 _transport.OnHashReceived += HandleHashReceived; 50 _transport.OnSyncRequestReceived += HandleSyncRequestReceived; 51 _transport.OnSyncResponseReceived += HandleSyncResponseReceived; 52 _transport.OnPeerConnected += HandlePeerConnected; 53 _transport.OnPeerDisconnected += HandlePeerDisconnected; 54 } 55 56 public Guid NodeId { get; } 57 public bool IsDesynced => _isDesynced; 58 59 public async Task SubmitActionAsync(PlayerActionDto action, CancellationToken ct = default) 60 { 61 if (_isDesynced) 62 throw new InvalidOperationException("Node is in desync state. Cannot submit actions."); 63 64 long proposedSeq; 65 lock (_lock) 66 { 67 proposedSeq = _nextSequenceNumber; 68 } 69 70 var broadcast = new ActionBroadcastMessage 71 { 72 SenderId = NodeId, 73 ProposedSequenceNumber = proposedSeq, 74 Action = action 75 }; 76 77 var connectedPeers = _transport.ConnectedPeers; 78 if (connectedPeers.Count == 0) 79 { 80 // Solo mode: apply immediately 81 lock (_lock) 82 { 83 ApplyActionInternal(action, NodeId); 84 } 85 return; 86 } 87 88 // Register pending action 89 var pending = new PendingAction(action, proposedSeq, connectedPeers); 90 lock (_lock) 91 { 92 _pendingActions[action.ActionId] = pending; 93 } 94 95 await _transport.BroadcastActionAsync(broadcast, ct); 96 97 // Wait for consensus (all peers acknowledged) 98 await pending.WaitForConsensusAsync(ct); 99 100 lock (_lock) 101 { 102 _pendingActions.Remove(action.ActionId); 103 ApplyActionInternal(action, NodeId); 104 } 105 106 // Broadcast resulting hash 107 string hash; 108 long seq; 109 lock (_lock) 110 { 111 hash = _currentStateHash; 112 seq = _nextSequenceNumber - 1; 113 } 114 115 await _transport.BroadcastHashAsync(new HashBroadcastMessage 116 { 117 SenderId = NodeId, 118 SequenceNumber = seq, 119 StateHash = hash 120 }, ct); 121 } 122 123 public GameStateDto GetCurrentState() 124 { 125 lock (_lock) 126 { 127 return _currentState; 128 } 129 } 130 131 public string GetCurrentStateHash() 132 { 133 lock (_lock) 134 { 135 return _currentStateHash; 136 } 137 } 138 139 public IReadOnlyList<DistributedActionEntry> GetActionLog() 140 { 141 lock (_lock) 142 { 143 return _actionLog.ToList().AsReadOnly(); 144 } 145 } 146 147 // --- Internal action application --- 148 149 private void ApplyActionInternal(PlayerActionDto action, Guid originNodeId) 150 { 151 _currentState = _engine.Step(_currentState, action); 152 var hash = ComputeHash(_currentState); 153 _currentStateHash = hash; 154 155 var seq = _nextSequenceNumber++; 156 _appliedActionIds.Add(action.ActionId); 157 _actionLog.Add(new DistributedActionEntry 158 { 159 SequenceNumber = seq, 160 NodeId = originNodeId, 161 Action = action, 162 StateHashAfterApply = hash 163 }); 164 } 165 166 private static string ComputeHash(GameStateDto state) 167 { 168 var json = JsonSerializer.Serialize(state, SerializerOptions); 169 var bytes = SHA256.HashData(Encoding.UTF8.GetBytes(json)); 170 return Convert.ToHexString(bytes); 171 } 172 173 /// <summary> 174 /// Drains the inbound buffer, applying any actions whose sequence number 175 /// matches _nextSequenceNumber in order. 176 /// </summary> 177 private void DrainInboundBuffer() 178 { 179 while (_inboundBuffer.Count > 0) 180 { 181 var first = _inboundBuffer.First(); 182 if (first.Key != _nextSequenceNumber) break; 183 184 _inboundBuffer.Remove(first.Key); 185 var msg = first.Value; 186 187 if (!_appliedActionIds.Contains(msg.Action.ActionId)) 188 { 189 ApplyActionInternal(msg.Action, msg.SenderId); 190 } 191 } 192 } 193 194 // --- Peer message handlers --- 195 196 private void HandleActionReceived(ActionBroadcastMessage msg) 197 { 198 if (_isDesynced) return; 199 200 lock (_lock) 201 { 202 // Buffer the action at its proposed sequence position 203 if (!_inboundBuffer.ContainsKey(msg.ProposedSequenceNumber) && 204 !_appliedActionIds.Contains(msg.Action.ActionId)) 205 { 206 _inboundBuffer[msg.ProposedSequenceNumber] = msg; 207 } 208 209 // Apply any buffered actions that are next in sequence 210 DrainInboundBuffer(); 211 } 212 213 // Send acknowledgment back to the sender 214 _ = _transport.SendAckAsync(msg.SenderId, new ActionAckMessage 215 { 216 SenderId = NodeId, 217 AckedActionId = msg.Action.ActionId, 218 SequenceNumber = msg.ProposedSequenceNumber 219 }); 220 } 221 222 private void HandleAckReceived(ActionAckMessage msg) 223 { 224 PendingAction? pending; 225 lock (_lock) 226 { 227 _pendingActions.TryGetValue(msg.AckedActionId, out pending); 228 } 229 230 pending?.Acknowledge(msg.SenderId); 231 } 232 233 private void HandleHashReceived(HashBroadcastMessage msg) 234 { 235 lock (_lock) 236 { 237 if (msg.SequenceNumber < _actionLog.Count) 238 { 239 var localEntry = _actionLog[(int)msg.SequenceNumber]; 240 if (localEntry.StateHashAfterApply != msg.StateHash) 241 { 242 _isDesynced = true; 243 } 244 } 245 } 246 } 247 248 private void HandleSyncRequestReceived(LogSyncRequestMessage msg) 249 { 250 List<DistributedActionEntry> entries; 251 bool fullReplay; 252 253 lock (_lock) 254 { 255 if (msg.FromSequenceNumber == 0 || 256 (msg.FromSequenceNumber <= _actionLog.Count && 257 msg.FromSequenceNumber > 0 && 258 _actionLog[(int)msg.FromSequenceNumber - 1].StateHashAfterApply != msg.LatestHash)) 259 { 260 // Hash mismatch or genesis request: send full log 261 entries = _actionLog.ToList(); 262 fullReplay = true; 263 } 264 else 265 { 266 // Send only missing entries 267 entries = _actionLog.Skip((int)msg.FromSequenceNumber).ToList(); 268 fullReplay = false; 269 } 270 } 271 272 _ = _transport.SendSyncResponseAsync(msg.SenderId, new LogSyncResponseMessage 273 { 274 SenderId = NodeId, 275 Entries = entries, 276 FullReplay = fullReplay 277 }); 278 } 279 280 private void HandleSyncResponseReceived(LogSyncResponseMessage msg) 281 { 282 lock (_lock) 283 { 284 if (msg.FullReplay) 285 { 286 // Full replay from genesis 287 _actionLog.Clear(); 288 _appliedActionIds.Clear(); 289 _currentState = new GameStateDto { ActionCount = 0, Operators = new List<GameStateDto.OperatorSnapshot>() }; 290 _nextSequenceNumber = 0; 291 _currentStateHash = ComputeHash(_currentState); 292 } 293 294 foreach (var entry in msg.Entries) 295 { 296 if (entry.SequenceNumber >= _nextSequenceNumber) 297 { 298 ApplyActionInternal(entry.Action, entry.NodeId); 299 300 // Verify hash after apply 301 if (_currentStateHash != entry.StateHashAfterApply) 302 { 303 _isDesynced = true; 304 return; 305 } 306 } 307 } 308 309 _isDesynced = false; 310 } 311 } 312 313 private void HandlePeerConnected(Guid peerId) 314 { 315 // When a new peer connects, initiate sync 316 string hash; 317 long logLength; 318 319 lock (_lock) 320 { 321 hash = _currentStateHash; 322 logLength = _actionLog.Count; 323 } 324 325 _ = _transport.SendSyncRequestAsync(peerId, new LogSyncRequestMessage 326 { 327 SenderId = NodeId, 328 FromSequenceNumber = logLength, 329 LatestHash = hash 330 }); 331 } 332 333 private void HandlePeerDisconnected(Guid peerId) 334 { 335 // When a peer disconnects, mark it as acknowledged on all pending 336 // outbound actions so SubmitActionAsync does not hang indefinitely. 337 lock (_lock) 338 { 339 foreach (var pending in _pendingActions.Values) 340 { 341 pending.Acknowledge(peerId); 342 } 343 } 344 } 345 346 // --- Pending action tracking --- 347 348 private sealed class PendingAction 349 { 350 private readonly HashSet<Guid> _requiredPeers; 351 private readonly HashSet<Guid> _acknowledgedPeers = new(); 352 private readonly TaskCompletionSource _consensusTcs = new(TaskCreationOptions.RunContinuationsAsynchronously); 353 354 public PlayerActionDto Action { get; } 355 public long ProposedSequenceNumber { get; } 356 357 public PendingAction(PlayerActionDto action, long proposedSeq, IReadOnlySet<Guid> connectedPeers) 358 { 359 Action = action; 360 ProposedSequenceNumber = proposedSeq; 361 _requiredPeers = new HashSet<Guid>(connectedPeers); 362 } 363 364 public bool IsConsensusReached 365 { 366 get 367 { 368 lock (_acknowledgedPeers) 369 { 370 return _requiredPeers.IsSubsetOf(_acknowledgedPeers); 371 } 372 } 373 } 374 375 public void Acknowledge(Guid peerId) 376 { 377 lock (_acknowledgedPeers) 378 { 379 _acknowledgedPeers.Add(peerId); 380 if (_requiredPeers.IsSubsetOf(_acknowledgedPeers)) 381 { 382 _consensusTcs.TrySetResult(); 383 } 384 } 385 } 386 387 public async Task WaitForConsensusAsync(CancellationToken ct) 388 { 389 using (ct.Register(() => _consensusTcs.TrySetCanceled(ct))) 390 { 391 await _consensusTcs.Task.ConfigureAwait(false); 392 } 393 } 394 } 395 }