CombatSessionUpdateHub.cs
using System.Runtime.CompilerServices;
using System.Threading.Channels;

namespace GUNRPG.Application.Sessions;

/// <summary>
/// In-process publish/subscribe hub that signals when a combat session's state has changed.
/// <para>
/// The server calls <see cref="Publish"/> after a combat session is mutated
/// (intent submission, advance, pet action). Subscribers receive the session ID
/// as a "something changed" signal and are expected to re-fetch the latest
/// state themselves, which removes the need for polling.
/// </para>
/// <para>
/// HTTP clients consume these signals through the SSE endpoint, giving
/// cross-client real-time synchronisation without requiring libp2p on the client.
/// </para>
/// </summary>
public sealed class CombatSessionUpdateHub
{
    /// <summary>Maximum number of undelivered notifications buffered per subscriber.</summary>
    private const int SubscriberBufferCapacity = 50;

    // Active subscriber channels keyed by session ID. Every access must hold _gate.
    private readonly Dictionary<Guid, List<Channel<Guid>>> _channelsBySession = new();
    private readonly object _gate = new();

    /// <summary>
    /// Notifies every subscriber currently registered for <paramref name="sessionId"/>.
    /// The call never blocks: each subscriber channel is bounded and configured with
    /// <see cref="BoundedChannelFullMode.DropOldest"/>, so a full buffer discards its
    /// oldest pending notification to make room for the new one.
    /// </summary>
    /// <param name="sessionId">ID of the combat session whose state changed.</param>
    public void Publish(Guid sessionId)
    {
        Channel<Guid>[] recipients;

        // Copy the subscriber list while holding the lock so the writes below
        // happen outside it and cannot race with Subscribe/Unsubscribe mutations.
        lock (_gate)
        {
            recipients = _channelsBySession.TryGetValue(sessionId, out var subscribers)
                ? subscribers.ToArray()
                : Array.Empty<Channel<Guid>>();
        }

        foreach (var recipient in recipients)
        {
            // The result is deliberately ignored: a false return means the channel
            // was completed by a concurrent unsubscribe.
            _ = recipient.Writer.TryWrite(sessionId);
        }
    }

    /// <summary>
    /// Streams change notifications for a single combat session.
    /// Each element is the session ID, produced once per buffered notification,
    /// until <paramref name="ct"/> is cancelled or enumeration is abandoned.
    /// </summary>
    /// <remarks>
    /// This is an async iterator, so the subscription is only registered once
    /// enumeration starts; the registration is removed again when enumeration ends.
    /// </remarks>
    /// <param name="sessionId">ID of the combat session to observe.</param>
    /// <param name="ct">Token that terminates the subscription.</param>
    /// <returns>An asynchronous sequence of session IDs, one per notification.</returns>
    public async IAsyncEnumerable<Guid> SubscribeAsync(
        Guid sessionId,
        [EnumeratorCancellation] CancellationToken ct)
    {
        var subscription = CreateSubscriberChannel();
        Register(sessionId, subscription);

        try
        {
            await foreach (var changedSessionId in subscription.Reader.ReadAllAsync(ct))
            {
                yield return changedSessionId;
            }
        }
        finally
        {
            // Runs on cancellation, on failure, and when the consumer disposes the enumerator.
            Unsubscribe(sessionId, subscription);
        }
    }

    /// <summary>Builds the bounded, drop-oldest channel that backs one subscriber.</summary>
    private static Channel<Guid> CreateSubscriberChannel()
    {
        var options = new BoundedChannelOptions(SubscriberBufferCapacity)
        {
            FullMode = BoundedChannelFullMode.DropOldest,
            SingleReader = true,
            SingleWriter = false
        };

        return Channel.CreateBounded<Guid>(options);
    }

    /// <summary>Adds <paramref name="channel"/> to the subscriber list of <paramref name="sessionId"/>.</summary>
    private void Register(Guid sessionId, Channel<Guid> channel)
    {
        lock (_gate)
        {
            if (_channelsBySession.TryGetValue(sessionId, out var subscribers))
            {
                subscribers.Add(channel);
            }
            else
            {
                _channelsBySession.Add(sessionId, [channel]);
            }
        }
    }

    /// <summary>
    /// Removes <paramref name="channel"/> from the subscriber list of
    /// <paramref name="sessionId"/>, drops the session entry once its list is empty,
    /// and completes the channel's writer.
    /// </summary>
    private void Unsubscribe(Guid sessionId, Channel<Guid> channel)
    {
        lock (_gate)
        {
            if (_channelsBySession.TryGetValue(sessionId, out var subscribers))
            {
                subscribers.Remove(channel);

                // Do not keep empty lists around for sessions nobody is watching.
                if (subscribers.Count == 0)
                {
                    _channelsBySession.Remove(sessionId);
                }
            }
        }

        channel.Writer.TryComplete();
    }
}