/ GUNRPG.Application / Distributed / OperatorUpdateHub.cs
OperatorUpdateHub.cs
  1  using System.Runtime.CompilerServices;
  2  using System.Threading.Channels;
  3  using GUNRPG.Core.Operators;
  4  
  5  namespace GUNRPG.Application.Distributed;
  6  
  7  /// <summary>
  8  /// In-process pub/sub hub for real-time operator state change notifications.
  9  /// <para>
 10  /// The server publishes a notification each time an operator event is appended
 11  /// (either locally via <see cref="Operators.OperatorExfilService"/> or via peer
 12  /// replication in <see cref="OperatorEventReplicator"/>).
 13  /// </para>
 14  /// <para>
 15  /// HTTP clients subscribe via the SSE endpoint and receive a notification
 16  /// whenever the operator's state changes, allowing them to re-fetch and display
 17  /// the updated state without polling or using libp2p.
 18  /// </para>
 19  /// </summary>
 20  public sealed class OperatorUpdateHub
 21  {
 22      private readonly Dictionary<Guid, List<Channel<OperatorEvent>>> _subscriptions = new();
 23      private readonly object _subLock = new();
 24  
 25      /// <summary>
 26      /// Publishes an operator event to all active subscribers for that operator.
 27      /// Non-blocking; if a subscriber's channel buffer is full, the oldest buffered
 28      /// message is dropped to make room (see <see cref="BoundedChannelFullMode.DropOldest"/>).
 29      /// </summary>
 30      public void Publish(OperatorEvent evt)
 31      {
 32          var id = evt.OperatorId.Value;
 33  
 34          List<Channel<OperatorEvent>> snapshot;
 35          lock (_subLock)
 36          {
 37              if (!_subscriptions.TryGetValue(id, out var list))
 38                  return;
 39              snapshot = [..list];
 40          }
 41  
 42          foreach (var channel in snapshot)
 43          {
 44              // TryWrite is non-blocking; oldest buffered event is dropped when the channel is full
 45              channel.Writer.TryWrite(evt);
 46          }
 47      }
 48  
 49      /// <summary>
 50      /// Subscribes to operator events for the given operator.
 51      /// The returned async enumerable yields events until <paramref name="ct"/> is cancelled.
 52      /// </summary>
 53      public async IAsyncEnumerable<OperatorEvent> SubscribeAsync(
 54          OperatorId operatorId,
 55          [EnumeratorCancellation] CancellationToken ct)
 56      {
 57          var channel = Channel.CreateBounded<OperatorEvent>(new BoundedChannelOptions(50)
 58          {
 59              FullMode = BoundedChannelFullMode.DropOldest,
 60              SingleReader = true,
 61              SingleWriter = false
 62          });
 63  
 64          var id = operatorId.Value;
 65          lock (_subLock)
 66          {
 67              if (!_subscriptions.TryGetValue(id, out var list))
 68              {
 69                  list = [];
 70                  _subscriptions[id] = list;
 71              }
 72              list.Add(channel);
 73          }
 74  
 75          try
 76          {
 77              await foreach (var evt in channel.Reader.ReadAllAsync(ct))
 78              {
 79                  yield return evt;
 80              }
 81          }
 82          finally
 83          {
 84              Unsubscribe(id, channel);
 85          }
 86      }
 87  
 88      private void Unsubscribe(Guid operatorId, Channel<OperatorEvent> channel)
 89      {
 90          lock (_subLock)
 91          {
 92              if (_subscriptions.TryGetValue(operatorId, out var list))
 93              {
 94                  list.Remove(channel);
 95                  if (list.Count == 0)
 96                      _subscriptions.Remove(operatorId);
 97              }
 98          }
 99          channel.Writer.TryComplete();
100      }
101  }