/ GUNRPG.Tests / OperatorEventReplicatorTests.cs
OperatorEventReplicatorTests.cs
  1  using GUNRPG.Application.Distributed;
  2  using GUNRPG.Application.Operators;
  3  using GUNRPG.Core.Operators;
  4  using GUNRPG.Infrastructure.Distributed;
  5  
  6  namespace GUNRPG.Tests;
  7  
  8  /// <summary>
  9  /// Tests for OperatorEventReplicator: verifying that operator events are distributed
 10  /// to all connected peers so player state is accessible from any server.
 11  /// </summary>
 12  public class OperatorEventReplicatorTests
 13  {
 14      // --- Broadcast on event append ---
 15  
 16      [Fact]
 17      public async Task BroadcastAsync_NoPeers_DoesNotThrow()
 18      {
 19          var nodeId = Guid.NewGuid();
 20          var transport = new InMemoryLockstepTransport(nodeId);
 21          var store = new InMemoryOperatorEventStore();
 22          var replicator = new OperatorEventReplicator(nodeId, transport, store);
 23  
 24          var operatorId = OperatorId.NewId();
 25          var evt = new OperatorCreatedEvent(operatorId, "TestOp");
 26  
 27          // Should not throw even with no peers
 28          await replicator.BroadcastAsync(evt);
 29      }
 30  
 31      [Fact]
 32      public async Task BroadcastAsync_WithPeer_PeerReceivesEvent()
 33      {
 34          var nodeIdA = Guid.NewGuid();
 35          var nodeIdB = Guid.NewGuid();
 36          var transportA = new InMemoryLockstepTransport(nodeIdA);
 37          var transportB = new InMemoryLockstepTransport(nodeIdB);
 38          var storeA = new InMemoryOperatorEventStore();
 39          var storeB = new InMemoryOperatorEventStore();
 40  
 41          var replicatorA = new OperatorEventReplicator(nodeIdA, transportA, storeA);
 42          _ = new OperatorEventReplicator(nodeIdB, transportB, storeB);
 43  
 44          // Connect after setting up replicators so OnPeerConnected fires
 45          // (no pre-existing events, so sync response is empty)
 46          transportA.ConnectTo(transportB);
 47  
 48          var operatorId = OperatorId.NewId();
 49          var createdEvent = new OperatorCreatedEvent(operatorId, "Alpha");
 50  
 51          // Append to storeA and broadcast via the replicator
 52          await storeA.AppendEventAsync(createdEvent);
 53          await replicatorA.BroadcastAsync(createdEvent);
 54  
 55          // storeB should now have the operator
 56          var events = await WaitForEventsAsync(storeB, operatorId, expectedCount: 1);
 57          Assert.Single(events);
 58          Assert.Equal("OperatorCreated", events[0].EventType);
 59          Assert.Equal(operatorId, events[0].OperatorId);
 60      }
 61  
 62      [Fact]
 63      public async Task BroadcastAsync_PeerStoresEventWithCorrectData()
 64      {
 65          var nodeIdA = Guid.NewGuid();
 66          var nodeIdB = Guid.NewGuid();
 67          var transportA = new InMemoryLockstepTransport(nodeIdA);
 68          var transportB = new InMemoryLockstepTransport(nodeIdB);
 69          var storeA = new InMemoryOperatorEventStore();
 70          var storeB = new InMemoryOperatorEventStore();
 71  
 72          var replicatorA = new OperatorEventReplicator(nodeIdA, transportA, storeA);
 73          _ = new OperatorEventReplicator(nodeIdB, transportB, storeB);
 74          transportA.ConnectTo(transportB);
 75  
 76          var operatorId = OperatorId.NewId();
 77          var createdEvent = new OperatorCreatedEvent(operatorId, "Bravo");
 78          await storeA.AppendEventAsync(createdEvent);
 79          await replicatorA.BroadcastAsync(createdEvent);
 80  
 81          var events = await WaitForEventsAsync(storeB, operatorId, expectedCount: 1);
 82          Assert.Single(events);
 83          Assert.Equal(createdEvent.OperatorId, events[0].OperatorId);
 84          Assert.Equal(createdEvent.SequenceNumber, events[0].SequenceNumber);
 85          Assert.Equal(createdEvent.EventType, events[0].EventType);
 86          Assert.Equal(createdEvent.Hash, events[0].Hash);
 87          Assert.Equal(createdEvent.PreviousHash, events[0].PreviousHash);
 88      }
 89  
 90      [Fact]
 91      public async Task BroadcastAsync_MultipleEvents_PeerStoresAll()
 92      {
 93          var nodeIdA = Guid.NewGuid();
 94          var nodeIdB = Guid.NewGuid();
 95          var transportA = new InMemoryLockstepTransport(nodeIdA);
 96          var transportB = new InMemoryLockstepTransport(nodeIdB);
 97          var storeA = new InMemoryOperatorEventStore();
 98          var storeB = new InMemoryOperatorEventStore();
 99  
100          var replicatorA = new OperatorEventReplicator(nodeIdA, transportA, storeA);
101          _ = new OperatorEventReplicator(nodeIdB, transportB, storeB);
102          transportA.ConnectTo(transportB);
103  
104          var operatorId = OperatorId.NewId();
105          var createdEvent = new OperatorCreatedEvent(operatorId, "Charlie");
106          await storeA.AppendEventAsync(createdEvent);
107          await replicatorA.BroadcastAsync(createdEvent);
108  
109          var xpEvent = new XpGainedEvent(operatorId, 1, 100, "Mission complete", createdEvent.Hash);
110          await storeA.AppendEventAsync(xpEvent);
111          await replicatorA.BroadcastAsync(xpEvent);
112  
113          var events = await WaitForEventsAsync(storeB, operatorId, expectedCount: 2);
114          Assert.Equal(2, events.Count);
115          Assert.Equal("OperatorCreated", events[0].EventType);
116          Assert.Equal("XpGained", events[1].EventType);
117      }
118  
119      // --- Sync on peer connect ---
120  
121      [Fact]
122      public async Task OnPeerConnected_ExistingEvents_SyncedToPeer()
123      {
124          var nodeIdA = Guid.NewGuid();
125          var nodeIdB = Guid.NewGuid();
126          var transportA = new InMemoryLockstepTransport(nodeIdA);
127          var transportB = new InMemoryLockstepTransport(nodeIdB);
128          var storeA = new InMemoryOperatorEventStore();
129          var storeB = new InMemoryOperatorEventStore();
130  
131          // A has an operator created before B connects
132          var operatorId = OperatorId.NewId();
133          var createdEvent = new OperatorCreatedEvent(operatorId, "Delta");
134          await storeA.AppendEventAsync(createdEvent);
135  
136          // Set up replicators then connect — should trigger sync
137          _ = new OperatorEventReplicator(nodeIdA, transportA, storeA);
138          _ = new OperatorEventReplicator(nodeIdB, transportB, storeB);
139          transportA.ConnectTo(transportB);
140  
141          // B should now have the operator
142          var events = await WaitForEventsAsync(storeB, operatorId, expectedCount: 1);
143          Assert.Single(events);
144          Assert.Equal("OperatorCreated", events[0].EventType);
145      }
146  
147      [Fact]
148      public async Task OnPeerConnected_MultipleOperators_AllSyncedToPeer()
149      {
150          var nodeIdA = Guid.NewGuid();
151          var nodeIdB = Guid.NewGuid();
152          var transportA = new InMemoryLockstepTransport(nodeIdA);
153          var transportB = new InMemoryLockstepTransport(nodeIdB);
154          var storeA = new InMemoryOperatorEventStore();
155          var storeB = new InMemoryOperatorEventStore();
156  
157          // A has two operators
158          var opId1 = OperatorId.NewId();
159          var opId2 = OperatorId.NewId();
160          var evt1 = new OperatorCreatedEvent(opId1, "Echo");
161          var evt2 = new OperatorCreatedEvent(opId2, "Foxtrot");
162          await storeA.AppendEventAsync(evt1);
163          await storeA.AppendEventAsync(evt2);
164  
165          _ = new OperatorEventReplicator(nodeIdA, transportA, storeA);
166          _ = new OperatorEventReplicator(nodeIdB, transportB, storeB);
167          transportA.ConnectTo(transportB);
168  
169          Assert.Single(await WaitForEventsAsync(storeB, opId1, expectedCount: 1));
170          Assert.Single(await WaitForEventsAsync(storeB, opId2, expectedCount: 1));
171      }
172  
173      [Fact]
174      public async Task OnPeerConnected_BothHaveEvents_BothSync()
175      {
176          var nodeIdA = Guid.NewGuid();
177          var nodeIdB = Guid.NewGuid();
178          var transportA = new InMemoryLockstepTransport(nodeIdA);
179          var transportB = new InMemoryLockstepTransport(nodeIdB);
180          var storeA = new InMemoryOperatorEventStore();
181          var storeB = new InMemoryOperatorEventStore();
182  
183          var opIdA = OperatorId.NewId();
184          var opIdB = OperatorId.NewId();
185  
186          await storeA.AppendEventAsync(new OperatorCreatedEvent(opIdA, "Golf"));
187          await storeB.AppendEventAsync(new OperatorCreatedEvent(opIdB, "Hotel"));
188  
189          _ = new OperatorEventReplicator(nodeIdA, transportA, storeA);
190          _ = new OperatorEventReplicator(nodeIdB, transportB, storeB);
191          transportA.ConnectTo(transportB);
192  
193          // A should now have B's operator and vice versa
194          Assert.Single(await WaitForEventsAsync(storeA, opIdB, expectedCount: 1));
195          Assert.Single(await WaitForEventsAsync(storeB, opIdA, expectedCount: 1));
196      }
197  
198      // --- Duplicate handling ---
199  
200      [Fact]
201      public async Task BroadcastAsync_AlreadyKnownEvent_NotAppliedAgain()
202      {
203          var nodeIdA = Guid.NewGuid();
204          var nodeIdB = Guid.NewGuid();
205          var transportA = new InMemoryLockstepTransport(nodeIdA);
206          var transportB = new InMemoryLockstepTransport(nodeIdB);
207          var storeA = new InMemoryOperatorEventStore();
208          var storeB = new InMemoryOperatorEventStore();
209  
210          var replicatorA = new OperatorEventReplicator(nodeIdA, transportA, storeA);
211          _ = new OperatorEventReplicator(nodeIdB, transportB, storeB);
212          transportA.ConnectTo(transportB);
213  
214          var operatorId = OperatorId.NewId();
215          var createdEvent = new OperatorCreatedEvent(operatorId, "India");
216          await storeA.AppendEventAsync(createdEvent);
217  
218          // Broadcast same event twice; second should be silently skipped
219          await replicatorA.BroadcastAsync(createdEvent);
220          await WaitForEventsAsync(storeB, operatorId, expectedCount: 1);
221          await replicatorA.BroadcastAsync(createdEvent);
222          // Give the second broadcast a chance to be processed
223          await Task.Delay(50);
224  
225          // B should still have exactly one event (no duplicates)
226          var events = await storeB.LoadEventsAsync(operatorId);
227          Assert.Single(events);
228      }
229  
230      // --- RehydrateEvent ---
231  
232      [Fact]
233      public void RehydrateEvent_OperatorCreated_ReturnsCorrectType()
234      {
235          var operatorId = OperatorId.NewId();
236          var original = new OperatorCreatedEvent(operatorId, "Juliet");
237          var msg = CreateMessage(original);
238  
239          var result = OperatorEventReplicator.RehydrateEvent(msg);
240  
241          Assert.IsType<OperatorCreatedEvent>(result);
242          Assert.Equal(original.OperatorId, result.OperatorId);
243          Assert.Equal(original.SequenceNumber, result.SequenceNumber);
244          Assert.Equal(original.Hash, result.Hash);
245      }
246  
247      [Fact]
248      public void RehydrateEvent_XpGained_ReturnsCorrectType()
249      {
250          var operatorId = OperatorId.NewId();
251          var createdEvent = new OperatorCreatedEvent(operatorId, "Kilo");
252          var xpEvent = new XpGainedEvent(operatorId, 1, 50, "Mission", createdEvent.Hash);
253          var msg = CreateMessage(xpEvent);
254  
255          var result = OperatorEventReplicator.RehydrateEvent(msg);
256  
257          Assert.IsType<XpGainedEvent>(result);
258          Assert.Equal(xpEvent.Hash, result.Hash);
259      }
260  
261      [Fact]
262      public void RehydrateEvent_UnknownType_ThrowsInvalidOperationException()
263      {
264          var msg = new OperatorEventBroadcastMessage
265          {
266              SenderId = Guid.NewGuid(),
267              OperatorId = Guid.NewGuid(),
268              SequenceNumber = 0,
269              EventType = "UnknownEventType",
270              Payload = "{}",
271              PreviousHash = "",
272              Hash = "abc",
273              Timestamp = DateTimeOffset.UtcNow
274          };
275  
276          Assert.Throws<InvalidOperationException>(() => OperatorEventReplicator.RehydrateEvent(msg));
277      }
278  
279      // --- Helpers ---
280  
281      /// <summary>
282      /// Polls <paramref name="store"/> until it contains at least <paramref name="expectedCount"/>
283      /// events for <paramref name="operatorId"/>, or throws a <see cref="TimeoutException"/> after 1 s.
284      /// </summary>
285      private static async Task<IReadOnlyList<OperatorEvent>> WaitForEventsAsync(
286          IOperatorEventStore store,
287          OperatorId operatorId,
288          int expectedCount,
289          int timeoutMs = 1000)
290      {
291          var deadline = DateTime.UtcNow.AddMilliseconds(timeoutMs);
292          while (true)
293          {
294              var events = await store.LoadEventsAsync(operatorId);
295              if (events.Count >= expectedCount)
296                  return events;
297  
298              if (DateTime.UtcNow >= deadline)
299                  throw new TimeoutException(
300                      $"Timed out waiting for {expectedCount} replicated operator event(s) " +
301                      $"(got {events.Count}).");
302  
303              await Task.Delay(10);
304          }
305      }
306  
307      private static OperatorEventBroadcastMessage CreateMessage(OperatorEvent evt)
308      {
309          return new OperatorEventBroadcastMessage
310          {
311              SenderId = Guid.NewGuid(),
312              OperatorId = evt.OperatorId.Value,
313              SequenceNumber = evt.SequenceNumber,
314              EventType = evt.EventType,
315              Payload = evt.Payload,
316              PreviousHash = evt.PreviousHash,
317              Hash = evt.Hash,
318              Timestamp = evt.Timestamp
319          };
320      }
321  }