/ GUNRPG.Tests / OperatorEventReplicatorTests.cs
OperatorEventReplicatorTests.cs
1 using GUNRPG.Application.Distributed; 2 using GUNRPG.Application.Operators; 3 using GUNRPG.Core.Operators; 4 using GUNRPG.Infrastructure.Distributed; 5 6 namespace GUNRPG.Tests; 7 8 /// <summary> 9 /// Tests for OperatorEventReplicator: verifying that operator events are distributed 10 /// to all connected peers so player state is accessible from any server. 11 /// </summary> 12 public class OperatorEventReplicatorTests 13 { 14 // --- Broadcast on event append --- 15 16 [Fact] 17 public async Task BroadcastAsync_NoPeers_DoesNotThrow() 18 { 19 var nodeId = Guid.NewGuid(); 20 var transport = new InMemoryLockstepTransport(nodeId); 21 var store = new InMemoryOperatorEventStore(); 22 var replicator = new OperatorEventReplicator(nodeId, transport, store); 23 24 var operatorId = OperatorId.NewId(); 25 var evt = new OperatorCreatedEvent(operatorId, "TestOp"); 26 27 // Should not throw even with no peers 28 await replicator.BroadcastAsync(evt); 29 } 30 31 [Fact] 32 public async Task BroadcastAsync_WithPeer_PeerReceivesEvent() 33 { 34 var nodeIdA = Guid.NewGuid(); 35 var nodeIdB = Guid.NewGuid(); 36 var transportA = new InMemoryLockstepTransport(nodeIdA); 37 var transportB = new InMemoryLockstepTransport(nodeIdB); 38 var storeA = new InMemoryOperatorEventStore(); 39 var storeB = new InMemoryOperatorEventStore(); 40 41 var replicatorA = new OperatorEventReplicator(nodeIdA, transportA, storeA); 42 _ = new OperatorEventReplicator(nodeIdB, transportB, storeB); 43 44 // Connect after setting up replicators so OnPeerConnected fires 45 // (no pre-existing events, so sync response is empty) 46 transportA.ConnectTo(transportB); 47 48 var operatorId = OperatorId.NewId(); 49 var createdEvent = new OperatorCreatedEvent(operatorId, "Alpha"); 50 51 // Append to storeA and broadcast via the replicator 52 await storeA.AppendEventAsync(createdEvent); 53 await replicatorA.BroadcastAsync(createdEvent); 54 55 // storeB should now have the operator 56 var events = await WaitForEventsAsync(storeB, operatorId, expectedCount: 1); 57 Assert.Single(events); 58 Assert.Equal("OperatorCreated", events[0].EventType); 59 Assert.Equal(operatorId, events[0].OperatorId); 60 } 61 62 [Fact] 63 public async Task BroadcastAsync_PeerStoresEventWithCorrectData() 64 { 65 var nodeIdA = Guid.NewGuid(); 66 var nodeIdB = Guid.NewGuid(); 67 var transportA = new InMemoryLockstepTransport(nodeIdA); 68 var transportB = new InMemoryLockstepTransport(nodeIdB); 69 var storeA = new InMemoryOperatorEventStore(); 70 var storeB = new InMemoryOperatorEventStore(); 71 72 var replicatorA = new OperatorEventReplicator(nodeIdA, transportA, storeA); 73 _ = new OperatorEventReplicator(nodeIdB, transportB, storeB); 74 transportA.ConnectTo(transportB); 75 76 var operatorId = OperatorId.NewId(); 77 var createdEvent = new OperatorCreatedEvent(operatorId, "Bravo"); 78 await storeA.AppendEventAsync(createdEvent); 79 await replicatorA.BroadcastAsync(createdEvent); 80 81 var events = await WaitForEventsAsync(storeB, operatorId, expectedCount: 1); 82 Assert.Single(events); 83 Assert.Equal(createdEvent.OperatorId, events[0].OperatorId); 84 Assert.Equal(createdEvent.SequenceNumber, events[0].SequenceNumber); 85 Assert.Equal(createdEvent.EventType, events[0].EventType); 86 Assert.Equal(createdEvent.Hash, events[0].Hash); 87 Assert.Equal(createdEvent.PreviousHash, events[0].PreviousHash); 88 } 89 90 [Fact] 91 public async Task BroadcastAsync_MultipleEvents_PeerStoresAll() 92 { 93 var nodeIdA = Guid.NewGuid(); 94 var nodeIdB = Guid.NewGuid(); 95 var transportA = new InMemoryLockstepTransport(nodeIdA); 96 var transportB = new InMemoryLockstepTransport(nodeIdB); 97 var storeA = new InMemoryOperatorEventStore(); 98 var storeB = new InMemoryOperatorEventStore(); 99 100 var replicatorA = new OperatorEventReplicator(nodeIdA, transportA, storeA); 101 _ = new OperatorEventReplicator(nodeIdB, transportB, storeB); 102 transportA.ConnectTo(transportB); 103 104 var operatorId = OperatorId.NewId(); 105 var createdEvent = new OperatorCreatedEvent(operatorId, "Charlie"); 106 await storeA.AppendEventAsync(createdEvent); 107 await replicatorA.BroadcastAsync(createdEvent); 108 109 var xpEvent = new XpGainedEvent(operatorId, 1, 100, "Mission complete", createdEvent.Hash); 110 await storeA.AppendEventAsync(xpEvent); 111 await replicatorA.BroadcastAsync(xpEvent); 112 113 var events = await WaitForEventsAsync(storeB, operatorId, expectedCount: 2); 114 Assert.Equal(2, events.Count); 115 Assert.Equal("OperatorCreated", events[0].EventType); 116 Assert.Equal("XpGained", events[1].EventType); 117 } 118 119 // --- Sync on peer connect --- 120 121 [Fact] 122 public async Task OnPeerConnected_ExistingEvents_SyncedToPeer() 123 { 124 var nodeIdA = Guid.NewGuid(); 125 var nodeIdB = Guid.NewGuid(); 126 var transportA = new InMemoryLockstepTransport(nodeIdA); 127 var transportB = new InMemoryLockstepTransport(nodeIdB); 128 var storeA = new InMemoryOperatorEventStore(); 129 var storeB = new InMemoryOperatorEventStore(); 130 131 // A has an operator created before B connects 132 var operatorId = OperatorId.NewId(); 133 var createdEvent = new OperatorCreatedEvent(operatorId, "Delta"); 134 await storeA.AppendEventAsync(createdEvent); 135 136 // Set up replicators then connect — should trigger sync 137 _ = new OperatorEventReplicator(nodeIdA, transportA, storeA); 138 _ = new OperatorEventReplicator(nodeIdB, transportB, storeB); 139 transportA.ConnectTo(transportB); 140 141 // B should now have the operator 142 var events = await WaitForEventsAsync(storeB, operatorId, expectedCount: 1); 143 Assert.Single(events); 144 Assert.Equal("OperatorCreated", events[0].EventType); 145 } 146 147 [Fact] 148 public async Task OnPeerConnected_MultipleOperators_AllSyncedToPeer() 149 { 150 var nodeIdA = Guid.NewGuid(); 151 var nodeIdB = Guid.NewGuid(); 152 var transportA = new InMemoryLockstepTransport(nodeIdA); 153 var transportB = new InMemoryLockstepTransport(nodeIdB); 154 var storeA = new InMemoryOperatorEventStore(); 155 var storeB = new InMemoryOperatorEventStore(); 156 157 // A has two operators 158 var opId1 = OperatorId.NewId(); 159 var opId2 = OperatorId.NewId(); 160 var evt1 = new OperatorCreatedEvent(opId1, "Echo"); 161 var evt2 = new OperatorCreatedEvent(opId2, "Foxtrot"); 162 await storeA.AppendEventAsync(evt1); 163 await storeA.AppendEventAsync(evt2); 164 165 _ = new OperatorEventReplicator(nodeIdA, transportA, storeA); 166 _ = new OperatorEventReplicator(nodeIdB, transportB, storeB); 167 transportA.ConnectTo(transportB); 168 169 Assert.Single(await WaitForEventsAsync(storeB, opId1, expectedCount: 1)); 170 Assert.Single(await WaitForEventsAsync(storeB, opId2, expectedCount: 1)); 171 } 172 173 [Fact] 174 public async Task OnPeerConnected_BothHaveEvents_BothSync() 175 { 176 var nodeIdA = Guid.NewGuid(); 177 var nodeIdB = Guid.NewGuid(); 178 var transportA = new InMemoryLockstepTransport(nodeIdA); 179 var transportB = new InMemoryLockstepTransport(nodeIdB); 180 var storeA = new InMemoryOperatorEventStore(); 181 var storeB = new InMemoryOperatorEventStore(); 182 183 var opIdA = OperatorId.NewId(); 184 var opIdB = OperatorId.NewId(); 185 186 await storeA.AppendEventAsync(new OperatorCreatedEvent(opIdA, "Golf")); 187 await storeB.AppendEventAsync(new OperatorCreatedEvent(opIdB, "Hotel")); 188 189 _ = new OperatorEventReplicator(nodeIdA, transportA, storeA); 190 _ = new OperatorEventReplicator(nodeIdB, transportB, storeB); 191 transportA.ConnectTo(transportB); 192 193 // A should now have B's operator and vice versa 194 Assert.Single(await WaitForEventsAsync(storeA, opIdB, expectedCount: 1)); 195 Assert.Single(await WaitForEventsAsync(storeB, opIdA, expectedCount: 1)); 196 } 197 198 // --- Duplicate handling --- 199 200 [Fact] 201 public async Task BroadcastAsync_AlreadyKnownEvent_NotAppliedAgain() 202 { 203 var nodeIdA = Guid.NewGuid(); 204 var nodeIdB = Guid.NewGuid(); 205 var transportA = new InMemoryLockstepTransport(nodeIdA); 206 var transportB = new InMemoryLockstepTransport(nodeIdB); 207 var storeA = new InMemoryOperatorEventStore(); 208 var storeB = new InMemoryOperatorEventStore(); 209 210 var replicatorA = new OperatorEventReplicator(nodeIdA, transportA, storeA); 211 _ = new OperatorEventReplicator(nodeIdB, transportB, storeB); 212 transportA.ConnectTo(transportB); 213 214 var operatorId = OperatorId.NewId(); 215 var createdEvent = new OperatorCreatedEvent(operatorId, "India"); 216 await storeA.AppendEventAsync(createdEvent); 217 218 // Broadcast same event twice; second should be silently skipped 219 await replicatorA.BroadcastAsync(createdEvent); 220 await WaitForEventsAsync(storeB, operatorId, expectedCount: 1); 221 await replicatorA.BroadcastAsync(createdEvent); 222 // Give the second broadcast a chance to be processed 223 await Task.Delay(50); 224 225 // B should still have exactly one event (no duplicates) 226 var events = await storeB.LoadEventsAsync(operatorId); 227 Assert.Single(events); 228 } 229 230 // --- RehydrateEvent --- 231 232 [Fact] 233 public void RehydrateEvent_OperatorCreated_ReturnsCorrectType() 234 { 235 var operatorId = OperatorId.NewId(); 236 var original = new OperatorCreatedEvent(operatorId, "Juliet"); 237 var msg = CreateMessage(original); 238 239 var result = OperatorEventReplicator.RehydrateEvent(msg); 240 241 Assert.IsType<OperatorCreatedEvent>(result); 242 Assert.Equal(original.OperatorId, result.OperatorId); 243 Assert.Equal(original.SequenceNumber, result.SequenceNumber); 244 Assert.Equal(original.Hash, result.Hash); 245 } 246 247 [Fact] 248 public void RehydrateEvent_XpGained_ReturnsCorrectType() 249 { 250 var operatorId = OperatorId.NewId(); 251 var createdEvent = new OperatorCreatedEvent(operatorId, "Kilo"); 252 var xpEvent = new XpGainedEvent(operatorId, 1, 50, "Mission", createdEvent.Hash); 253 var msg = CreateMessage(xpEvent); 254 255 var result = OperatorEventReplicator.RehydrateEvent(msg); 256 257 Assert.IsType<XpGainedEvent>(result); 258 Assert.Equal(xpEvent.Hash, result.Hash); 259 } 260 261 [Fact] 262 public void RehydrateEvent_UnknownType_ThrowsInvalidOperationException() 263 { 264 var msg = new OperatorEventBroadcastMessage 265 { 266 SenderId = Guid.NewGuid(), 267 OperatorId = Guid.NewGuid(), 268 SequenceNumber = 0, 269 EventType = "UnknownEventType", 270 Payload = "{}", 271 PreviousHash = "", 272 Hash = "abc", 273 Timestamp = DateTimeOffset.UtcNow 274 }; 275 276 Assert.Throws<InvalidOperationException>(() => OperatorEventReplicator.RehydrateEvent(msg)); 277 } 278 279 // --- Helpers --- 280 281 /// <summary> 282 /// Polls <paramref name="store"/> until it contains at least <paramref name="expectedCount"/> 283 /// events for <paramref name="operatorId"/>, or throws a <see cref="TimeoutException"/> after 1 s. 284 /// </summary> 285 private static async Task<IReadOnlyList<OperatorEvent>> WaitForEventsAsync( 286 IOperatorEventStore store, 287 OperatorId operatorId, 288 int expectedCount, 289 int timeoutMs = 1000) 290 { 291 var deadline = DateTime.UtcNow.AddMilliseconds(timeoutMs); 292 while (true) 293 { 294 var events = await store.LoadEventsAsync(operatorId); 295 if (events.Count >= expectedCount) 296 return events; 297 298 if (DateTime.UtcNow >= deadline) 299 throw new TimeoutException( 300 $"Timed out waiting for {expectedCount} replicated operator event(s) " + 301 $"(got {events.Count})."); 302 303 await Task.Delay(10); 304 } 305 } 306 307 private static OperatorEventBroadcastMessage CreateMessage(OperatorEvent evt) 308 { 309 return new OperatorEventBroadcastMessage 310 { 311 SenderId = Guid.NewGuid(), 312 OperatorId = evt.OperatorId.Value, 313 SequenceNumber = evt.SequenceNumber, 314 EventType = evt.EventType, 315 Payload = evt.Payload, 316 PreviousHash = evt.PreviousHash, 317 Hash = evt.Hash, 318 Timestamp = evt.Timestamp 319 }; 320 } 321 }