/ GUNRPG.Application / Distributed / OperatorEventReplicator.cs
OperatorEventReplicator.cs
  1  using GUNRPG.Application.Operators;
  2  using GUNRPG.Core.Operators;
  3  
  4  namespace GUNRPG.Application.Distributed;
  5  
  6  /// <summary>
  7  /// Replicates operator events across all connected peers via the lockstep transport.
  8  /// <para>
  9  /// On peer connect: sends a sync request so the peer can share any operator events it knows about.
 10  /// On sync request received: responds with all locally known operator events.
 11  /// On operator event broadcast received: stores the event locally if not already present.
 12  /// After local event append: broadcasts the event to all connected peers.
 13  /// </para>
 14  /// </summary>
 15  public sealed class OperatorEventReplicator
 16  {
 17      private readonly Guid _nodeId;
 18      private readonly ILockstepTransport _transport;
 19      private readonly IOperatorEventStore _eventStore;
 20      private readonly OperatorUpdateHub? _updateHub;
 21      // Serializes all incoming-event applications to prevent sequence-check races across concurrent handler calls.
 22      private readonly SemaphoreSlim _applyGate = new(1, 1);
 23  
 24      public OperatorEventReplicator(Guid nodeId, ILockstepTransport transport, IOperatorEventStore eventStore, OperatorUpdateHub? updateHub = null)
 25      {
 26          _nodeId = nodeId;
 27          _transport = transport;
 28          _eventStore = eventStore;
 29          _updateHub = updateHub;
 30  
 31          _transport.OnPeerConnected += OnPeerConnected;
 32          _transport.OnOperatorEventReceived += OnOperatorEventReceived;
 33          _transport.OnOperatorEventSyncRequestReceived += OnOperatorEventSyncRequestReceived;
 34          _transport.OnOperatorEventSyncResponseReceived += OnOperatorEventSyncResponseReceived;
 35      }
 36  
 37      /// <summary>
 38      /// Broadcasts a newly appended operator event to all connected peers.
 39      /// Call this after successfully appending an event to the local store.
 40      /// </summary>
 41      public async Task BroadcastAsync(OperatorEvent evt, CancellationToken ct = default)
 42      {
 43          if (_transport.ConnectedPeers.Count == 0) return;
 44  
 45          await _transport.BroadcastOperatorEventAsync(ToMessage(evt), ct);
 46      }
 47  
 48      // --- Transport event handlers ---
 49  
 50      private void OnPeerConnected(Guid peerId)
 51      {
 52          // Send a sync request so the new peer shares its known operator events
 53          _ = _transport.SendOperatorEventSyncRequestAsync(peerId, new OperatorEventSyncRequestMessage
 54          {
 55              SenderId = _nodeId
 56          });
 57      }
 58  
 59      private void OnOperatorEventReceived(OperatorEventBroadcastMessage msg)
 60      {
 61          _ = ApplyEventIfNewAsync(msg);
 62      }
 63  
 64      private void OnOperatorEventSyncRequestReceived(OperatorEventSyncRequestMessage msg)
 65      {
 66          _ = HandleSyncRequestAsync(msg);
 67      }
 68  
 69      private void OnOperatorEventSyncResponseReceived(OperatorEventSyncResponseMessage msg)
 70      {
 71          _ = HandleSyncResponseAsync(msg);
 72      }
 73  
 74      private async Task HandleSyncRequestAsync(OperatorEventSyncRequestMessage msg)
 75      {
 76          try
 77          {
 78              var allOperatorIds = await _eventStore.ListOperatorIdsAsync();
 79              var allEvents = new List<OperatorEventBroadcastMessage>();
 80  
 81              foreach (var opId in allOperatorIds)
 82              {
 83                  var events = await _eventStore.LoadEventsAsync(opId);
 84                  allEvents.AddRange(events.Select(ToMessage));
 85              }
 86  
 87              await _transport.SendOperatorEventSyncResponseAsync(msg.SenderId, new OperatorEventSyncResponseMessage
 88              {
 89                  SenderId = _nodeId,
 90                  Events = allEvents
 91              });
 92          }
 93          catch (Exception)
 94          {
 95              // Best-effort: ignore sync errors
 96          }
 97      }
 98  
 99      private async Task HandleSyncResponseAsync(OperatorEventSyncResponseMessage msg)
100      {
101          if (msg.Events is not { Count: > 0 }) return;
102  
103          // Group by operator and sort by sequence to apply in order
104          var grouped = msg.Events
105              .GroupBy(e => e.OperatorId)
106              .Select(g => g.OrderBy(e => e.SequenceNumber).ToList());
107  
108          foreach (var operatorEvents in grouped)
109          {
110              foreach (var evt in operatorEvents)
111              {
112                  await ApplyEventIfNewAsync(evt);
113              }
114          }
115      }
116  
117      private async Task ApplyEventIfNewAsync(OperatorEventBroadcastMessage msg)
118      {
119          await _applyGate.WaitAsync();
120          try
121          {
122              var opId = OperatorId.FromGuid(msg.OperatorId);
123              var currentSeq = await _eventStore.GetCurrentSequenceAsync(opId);
124  
125              // Skip events we already have
126              if (msg.SequenceNumber <= currentSeq) return;
127  
128              // We can only apply the next event in sequence; skip if there's a gap
129              if (msg.SequenceNumber != currentSeq + 1) return;
130  
131              var domainEvent = RehydrateEvent(msg);
132              await _eventStore.AppendEventAsync(domainEvent);
133              _updateHub?.Publish(domainEvent);
134          }
135          catch (Exception)
136          {
137              // Best-effort: ignore replication errors (e.g. race conditions, hash mismatches)
138          }
139          finally
140          {
141              _applyGate.Release();
142          }
143      }
144  
145      // --- Conversion helpers ---
146  
147      private OperatorEventBroadcastMessage ToMessage(OperatorEvent evt)
148      {
149          return new OperatorEventBroadcastMessage
150          {
151              SenderId = _nodeId,
152              OperatorId = evt.OperatorId.Value,
153              SequenceNumber = evt.SequenceNumber,
154              EventType = evt.EventType,
155              Payload = evt.Payload,
156              PreviousHash = evt.PreviousHash,
157              Hash = evt.Hash,
158              Timestamp = evt.Timestamp
159          };
160      }
161  
162      /// <summary>
163      /// Reconstructs the concrete <see cref="OperatorEvent"/> subtype from a wire message.
164      /// Mirrors the mapping in <c>LiteDbOperatorEventStore.MapToDomainEvent</c>.
165      /// </summary>
166      public static OperatorEvent RehydrateEvent(OperatorEventBroadcastMessage msg)
167      {
168          var operatorId = OperatorId.FromGuid(msg.OperatorId);
169  
170          return msg.EventType switch
171          {
172              "OperatorCreated" => OperatorCreatedEvent.Rehydrate(operatorId, msg.Payload, msg.Timestamp),
173              "XpGained" => XpGainedEvent.Rehydrate(operatorId, msg.SequenceNumber, msg.Payload, msg.PreviousHash, msg.Timestamp),
174              "WoundsTreated" => WoundsTreatedEvent.Rehydrate(operatorId, msg.SequenceNumber, msg.Payload, msg.PreviousHash, msg.Timestamp),
175              "LoadoutChanged" => LoadoutChangedEvent.Rehydrate(operatorId, msg.SequenceNumber, msg.Payload, msg.PreviousHash, msg.Timestamp),
176              "PerkUnlocked" => PerkUnlockedEvent.Rehydrate(operatorId, msg.SequenceNumber, msg.Payload, msg.PreviousHash, msg.Timestamp),
177              "CombatVictory" => CombatVictoryEvent.Rehydrate(operatorId, msg.SequenceNumber, msg.Payload, msg.PreviousHash, msg.Timestamp),
178              "ExfilFailed" => ExfilFailedEvent.Rehydrate(operatorId, msg.SequenceNumber, msg.Payload, msg.PreviousHash, msg.Timestamp),
179              "OperatorDied" => OperatorDiedEvent.Rehydrate(operatorId, msg.SequenceNumber, msg.Payload, msg.PreviousHash, msg.Timestamp),
180              "InfilStarted" => InfilStartedEvent.Rehydrate(operatorId, msg.SequenceNumber, msg.Payload, msg.PreviousHash, msg.Timestamp),
181              "InfilEnded" => InfilEndedEvent.Rehydrate(operatorId, msg.SequenceNumber, msg.Payload, msg.PreviousHash, msg.Timestamp),
182              "CombatSessionStarted" => CombatSessionStartedEvent.Rehydrate(operatorId, msg.SequenceNumber, msg.Payload, msg.PreviousHash, msg.Timestamp),
183              "PetActionApplied" => PetActionAppliedEvent.Rehydrate(operatorId, msg.SequenceNumber, msg.Payload, msg.PreviousHash, msg.Timestamp),
184              _ => throw new InvalidOperationException($"Unknown event type for replication: {msg.EventType}")
185          };
186      }
187  }