OperatorEventReplicator.cs
1 using GUNRPG.Application.Operators; 2 using GUNRPG.Core.Operators; 3 4 namespace GUNRPG.Application.Distributed; 5 6 /// <summary> 7 /// Replicates operator events across all connected peers via the lockstep transport. 8 /// <para> 9 /// On peer connect: sends a sync request so the peer can share any operator events it knows about. 10 /// On sync request received: responds with all locally known operator events. 11 /// On operator event broadcast received: stores the event locally if not already present. 12 /// After local event append: broadcasts the event to all connected peers. 13 /// </para> 14 /// </summary> 15 public sealed class OperatorEventReplicator 16 { 17 private readonly Guid _nodeId; 18 private readonly ILockstepTransport _transport; 19 private readonly IOperatorEventStore _eventStore; 20 private readonly OperatorUpdateHub? _updateHub; 21 // Serializes all incoming-event applications to prevent sequence-check races across concurrent handler calls. 22 private readonly SemaphoreSlim _applyGate = new(1, 1); 23 24 public OperatorEventReplicator(Guid nodeId, ILockstepTransport transport, IOperatorEventStore eventStore, OperatorUpdateHub? updateHub = null) 25 { 26 _nodeId = nodeId; 27 _transport = transport; 28 _eventStore = eventStore; 29 _updateHub = updateHub; 30 31 _transport.OnPeerConnected += OnPeerConnected; 32 _transport.OnOperatorEventReceived += OnOperatorEventReceived; 33 _transport.OnOperatorEventSyncRequestReceived += OnOperatorEventSyncRequestReceived; 34 _transport.OnOperatorEventSyncResponseReceived += OnOperatorEventSyncResponseReceived; 35 } 36 37 /// <summary> 38 /// Broadcasts a newly appended operator event to all connected peers. 39 /// Call this after successfully appending an event to the local store. 40 /// </summary> 41 public async Task BroadcastAsync(OperatorEvent evt, CancellationToken ct = default) 42 { 43 if (_transport.ConnectedPeers.Count == 0) return; 44 45 await _transport.BroadcastOperatorEventAsync(ToMessage(evt), ct); 46 } 47 48 // --- Transport event handlers --- 49 50 private void OnPeerConnected(Guid peerId) 51 { 52 // Send a sync request so the new peer shares its known operator events 53 _ = _transport.SendOperatorEventSyncRequestAsync(peerId, new OperatorEventSyncRequestMessage 54 { 55 SenderId = _nodeId 56 }); 57 } 58 59 private void OnOperatorEventReceived(OperatorEventBroadcastMessage msg) 60 { 61 _ = ApplyEventIfNewAsync(msg); 62 } 63 64 private void OnOperatorEventSyncRequestReceived(OperatorEventSyncRequestMessage msg) 65 { 66 _ = HandleSyncRequestAsync(msg); 67 } 68 69 private void OnOperatorEventSyncResponseReceived(OperatorEventSyncResponseMessage msg) 70 { 71 _ = HandleSyncResponseAsync(msg); 72 } 73 74 private async Task HandleSyncRequestAsync(OperatorEventSyncRequestMessage msg) 75 { 76 try 77 { 78 var allOperatorIds = await _eventStore.ListOperatorIdsAsync(); 79 var allEvents = new List<OperatorEventBroadcastMessage>(); 80 81 foreach (var opId in allOperatorIds) 82 { 83 var events = await _eventStore.LoadEventsAsync(opId); 84 allEvents.AddRange(events.Select(ToMessage)); 85 } 86 87 await _transport.SendOperatorEventSyncResponseAsync(msg.SenderId, new OperatorEventSyncResponseMessage 88 { 89 SenderId = _nodeId, 90 Events = allEvents 91 }); 92 } 93 catch (Exception) 94 { 95 // Best-effort: ignore sync errors 96 } 97 } 98 99 private async Task HandleSyncResponseAsync(OperatorEventSyncResponseMessage msg) 100 { 101 if (msg.Events is not { Count: > 0 }) return; 102 103 // Group by operator and sort by sequence to apply in order 104 var grouped = msg.Events 105 .GroupBy(e => e.OperatorId) 106 .Select(g => g.OrderBy(e => e.SequenceNumber).ToList()); 107 108 foreach (var operatorEvents in grouped) 109 { 110 foreach (var evt in operatorEvents) 111 { 112 await ApplyEventIfNewAsync(evt); 113 } 114 } 115 } 116 117 private async Task ApplyEventIfNewAsync(OperatorEventBroadcastMessage msg) 118 { 119 await _applyGate.WaitAsync(); 120 try 121 { 122 var opId = OperatorId.FromGuid(msg.OperatorId); 123 var currentSeq = await _eventStore.GetCurrentSequenceAsync(opId); 124 125 // Skip events we already have 126 if (msg.SequenceNumber <= currentSeq) return; 127 128 // We can only apply the next event in sequence; skip if there's a gap 129 if (msg.SequenceNumber != currentSeq + 1) return; 130 131 var domainEvent = RehydrateEvent(msg); 132 await _eventStore.AppendEventAsync(domainEvent); 133 _updateHub?.Publish(domainEvent); 134 } 135 catch (Exception) 136 { 137 // Best-effort: ignore replication errors (e.g. race conditions, hash mismatches) 138 } 139 finally 140 { 141 _applyGate.Release(); 142 } 143 } 144 145 // --- Conversion helpers --- 146 147 private OperatorEventBroadcastMessage ToMessage(OperatorEvent evt) 148 { 149 return new OperatorEventBroadcastMessage 150 { 151 SenderId = _nodeId, 152 OperatorId = evt.OperatorId.Value, 153 SequenceNumber = evt.SequenceNumber, 154 EventType = evt.EventType, 155 Payload = evt.Payload, 156 PreviousHash = evt.PreviousHash, 157 Hash = evt.Hash, 158 Timestamp = evt.Timestamp 159 }; 160 } 161 162 /// <summary> 163 /// Reconstructs the concrete <see cref="OperatorEvent"/> subtype from a wire message. 164 /// Mirrors the mapping in <c>LiteDbOperatorEventStore.MapToDomainEvent</c>. 165 /// </summary> 166 public static OperatorEvent RehydrateEvent(OperatorEventBroadcastMessage msg) 167 { 168 var operatorId = OperatorId.FromGuid(msg.OperatorId); 169 170 return msg.EventType switch 171 { 172 "OperatorCreated" => OperatorCreatedEvent.Rehydrate(operatorId, msg.Payload, msg.Timestamp), 173 "XpGained" => XpGainedEvent.Rehydrate(operatorId, msg.SequenceNumber, msg.Payload, msg.PreviousHash, msg.Timestamp), 174 "WoundsTreated" => WoundsTreatedEvent.Rehydrate(operatorId, msg.SequenceNumber, msg.Payload, msg.PreviousHash, msg.Timestamp), 175 "LoadoutChanged" => LoadoutChangedEvent.Rehydrate(operatorId, msg.SequenceNumber, msg.Payload, msg.PreviousHash, msg.Timestamp), 176 "PerkUnlocked" => PerkUnlockedEvent.Rehydrate(operatorId, msg.SequenceNumber, msg.Payload, msg.PreviousHash, msg.Timestamp), 177 "CombatVictory" => CombatVictoryEvent.Rehydrate(operatorId, msg.SequenceNumber, msg.Payload, msg.PreviousHash, msg.Timestamp), 178 "ExfilFailed" => ExfilFailedEvent.Rehydrate(operatorId, msg.SequenceNumber, msg.Payload, msg.PreviousHash, msg.Timestamp), 179 "OperatorDied" => OperatorDiedEvent.Rehydrate(operatorId, msg.SequenceNumber, msg.Payload, msg.PreviousHash, msg.Timestamp), 180 "InfilStarted" => InfilStartedEvent.Rehydrate(operatorId, msg.SequenceNumber, msg.Payload, msg.PreviousHash, msg.Timestamp), 181 "InfilEnded" => InfilEndedEvent.Rehydrate(operatorId, msg.SequenceNumber, msg.Payload, msg.PreviousHash, msg.Timestamp), 182 "CombatSessionStarted" => CombatSessionStartedEvent.Rehydrate(operatorId, msg.SequenceNumber, msg.Payload, msg.PreviousHash, msg.Timestamp), 183 "PetActionApplied" => PetActionAppliedEvent.Rehydrate(operatorId, msg.SequenceNumber, msg.Payload, msg.PreviousHash, msg.Timestamp), 184 _ => throw new InvalidOperationException($"Unknown event type for replication: {msg.EventType}") 185 }; 186 } 187 }