/ GUNRPG.Application / Sessions / CombatSessionUpdateHub.cs
CombatSessionUpdateHub.cs
 1  using System.Runtime.CompilerServices;
 2  using System.Threading.Channels;
 3  
 4  namespace GUNRPG.Application.Sessions;
 5  
 6  /// <summary>
 7  /// In-process pub/sub hub for real-time combat session state change notifications.
 8  /// <para>
 9  /// The server publishes a notification each time a combat session is mutated
10  /// (intent submission, advance, pet action), allowing connected clients to
11  /// re-fetch the latest session state without polling.
12  /// </para>
13  /// <para>
14  /// HTTP clients subscribe via the SSE endpoint and receive a notification
15  /// whenever the session state changes, enabling cross-client real-time
16  /// synchronisation without requiring libp2p on the client.
17  /// </para>
18  /// </summary>
19  public sealed class CombatSessionUpdateHub
20  {
21      private readonly Dictionary<Guid, List<Channel<Guid>>> _subscriptions = new();
22      private readonly object _subLock = new();
23  
24      /// <summary>
25      /// Publishes a notification to all active subscribers for the given session.
26      /// Non-blocking; if a subscriber's channel buffer is full, the oldest buffered
27      /// message is dropped to make room (see <see cref="BoundedChannelFullMode.DropOldest"/>).
28      /// </summary>
29      public void Publish(Guid sessionId)
30      {
31          List<Channel<Guid>> snapshot;
32          lock (_subLock)
33          {
34              if (!_subscriptions.TryGetValue(sessionId, out var list))
35                  return;
36              snapshot = [..list];
37          }
38  
39          foreach (var channel in snapshot)
40          {
41              channel.Writer.TryWrite(sessionId);
42          }
43      }
44  
45      /// <summary>
46      /// Subscribes to state-change notifications for the given combat session.
47      /// The returned async enumerable yields the session ID on each change until
48      /// <paramref name="ct"/> is cancelled.
49      /// </summary>
50      public async IAsyncEnumerable<Guid> SubscribeAsync(
51          Guid sessionId,
52          [EnumeratorCancellation] CancellationToken ct)
53      {
54          var channel = Channel.CreateBounded<Guid>(new BoundedChannelOptions(50)
55          {
56              FullMode = BoundedChannelFullMode.DropOldest,
57              SingleReader = true,
58              SingleWriter = false
59          });
60  
61          lock (_subLock)
62          {
63              if (!_subscriptions.TryGetValue(sessionId, out var list))
64              {
65                  list = [];
66                  _subscriptions[sessionId] = list;
67              }
68              list.Add(channel);
69          }
70  
71          try
72          {
73              await foreach (var id in channel.Reader.ReadAllAsync(ct))
74              {
75                  yield return id;
76              }
77          }
78          finally
79          {
80              Unsubscribe(sessionId, channel);
81          }
82      }
83  
84      private void Unsubscribe(Guid sessionId, Channel<Guid> channel)
85      {
86          lock (_subLock)
87          {
88              if (_subscriptions.TryGetValue(sessionId, out var list))
89              {
90                  list.Remove(channel);
91                  if (list.Count == 0)
92                      _subscriptions.Remove(sessionId);
93              }
94          }
95          channel.Writer.TryComplete();
96      }
97  }