OperatorUpdateHub.cs
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using GUNRPG.Core.Operators;

namespace GUNRPG.Application.Distributed;

/// <summary>
/// In-process publish/subscribe hub that fans out operator events to live subscribers.
/// <para>
/// The server publishes a notification each time an operator event is appended
/// (either locally via <see cref="Operators.OperatorExfilService"/> or via peer
/// replication in <see cref="OperatorEventReplicator"/>).
/// </para>
/// <para>
/// HTTP clients subscribe via the SSE endpoint and receive a notification
/// whenever the operator's state changes, allowing them to re-fetch and display
/// the updated state without polling or using libp2p.
/// </para>
/// </summary>
public sealed class OperatorUpdateHub
{
    /// <summary>Maximum number of undelivered events buffered per subscriber.</summary>
    private const int SubscriberBufferCapacity = 50;

    // Every read and write of _subscriptions (and of the lists it holds) happens under _subLock.
    private readonly object _subLock = new();
    private readonly Dictionary<Guid, List<Channel<OperatorEvent>>> _subscriptions = new();

    /// <summary>
    /// Delivers <paramref name="evt"/> to every channel currently registered for the
    /// event's operator. If no subscriber is registered for that operator, the event
    /// is discarded.
    /// </summary>
    /// <remarks>
    /// The call never waits on a subscriber: each channel is created with
    /// <see cref="BoundedChannelFullMode.DropOldest"/>, so a full buffer drops its
    /// oldest entry to make room for the new one.
    /// </remarks>
    /// <param name="evt">The operator event to fan out.</param>
    public void Publish(OperatorEvent evt)
    {
        var operatorKey = evt.OperatorId.Value;

        // Copy the subscriber list while holding the lock, then write outside it so
        // that subscribe/unsubscribe calls are not blocked during the fan-out.
        Channel<OperatorEvent>[] targets;
        lock (_subLock)
        {
            if (!_subscriptions.TryGetValue(operatorKey, out var subscribers))
            {
                return;
            }

            targets = [.. subscribers];
        }

        foreach (var target in targets)
        {
            // The result is deliberately ignored: a channel that was completed by a
            // concurrent unsubscribe simply rejects the write.
            _ = target.Writer.TryWrite(evt);
        }
    }

    /// <summary>
    /// Opens a stream of events for a single operator.
    /// The returned sequence keeps yielding events until <paramref name="ct"/> is cancelled.
    /// </summary>
    /// <remarks>
    /// This is an async iterator, so the subscription is registered when the caller
    /// starts enumerating, not when this method is called. The subscription is removed
    /// and its channel completed when enumeration ends for any reason.
    /// </remarks>
    /// <param name="operatorId">Identifier of the operator whose events are wanted.</param>
    /// <param name="ct">Token that ends the subscription when cancelled.</param>
    /// <returns>An async sequence of events published for <paramref name="operatorId"/>.</returns>
    public async IAsyncEnumerable<OperatorEvent> SubscribeAsync(
        OperatorId operatorId,
        [EnumeratorCancellation] CancellationToken ct)
    {
        var options = new BoundedChannelOptions(SubscriberBufferCapacity)
        {
            FullMode = BoundedChannelFullMode.DropOldest,
            SingleReader = true,
            SingleWriter = false
        };
        var channel = Channel.CreateBounded<OperatorEvent>(options);
        var operatorKey = operatorId.Value;

        lock (_subLock)
        {
            if (_subscriptions.TryGetValue(operatorKey, out var subscribers))
            {
                subscribers.Add(channel);
            }
            else
            {
                _subscriptions[operatorKey] = [channel];
            }
        }

        try
        {
            await foreach (var received in channel.Reader.ReadAllAsync(ct))
            {
                yield return received;
            }
        }
        finally
        {
            // Runs on cancellation, on early disposal of the enumerator, and on normal completion.
            Unsubscribe(operatorKey, channel);
        }
    }

    /// <summary>
    /// Removes <paramref name="channel"/> from the operator's subscriber list, drops the
    /// operator's dictionary entry once its list is empty, and completes the channel's writer.
    /// </summary>
    /// <param name="operatorId">Key under which the channel was registered.</param>
    /// <param name="channel">The subscriber channel to remove and complete.</param>
    private void Unsubscribe(Guid operatorId, Channel<OperatorEvent> channel)
    {
        lock (_subLock)
        {
            if (_subscriptions.TryGetValue(operatorId, out var subscribers))
            {
                subscribers.Remove(channel);

                if (subscribers.Count == 0)
                {
                    _subscriptions.Remove(operatorId);
                }
            }
        }

        // Completed outside the lock; TryComplete returns false instead of throwing
        // if the writer is already completed.
        channel.Writer.TryComplete();
    }
}