/ src / modules / cmdpal / Core / Microsoft.CmdPal.Core.Common / Helpers / SupersedingAsyncValueGate`1.cs
SupersedingAsyncValueGate`1.cs
  1  // Copyright (c) Microsoft Corporation
  2  // The Microsoft Corporation licenses this file to you under the MIT license.
  3  // See the LICENSE file in the project root for more information.
  4  
  5  namespace Microsoft.CmdPal.Core.Common.Helpers;
  6  
  7  /// <summary>
  8  /// An async gate that ensures only one value computation runs at a time.
  9  /// If ExecuteAsync is called while already executing, it cancels the current computation
 10  /// and starts the operation again (superseding behavior).
 11  /// Once a value is successfully computed, it is applied (via the provided <see cref="Action{T}"/>).
 12  /// The apply step uses its own lock so that long-running apply logic does not block the
 13  /// computation / superseding pipeline, while still remaining serialized with respect to
 14  /// other apply calls.
 15  /// </summary>
 16  /// <typeparam name="T">The type of the computed value.</typeparam>
 17  public sealed partial class SupersedingAsyncValueGate<T> : IDisposable
 18  {
 19      private readonly Func<CancellationToken, Task<T>> _valueFactory;
 20      private readonly Action<T> _apply;
 21      private readonly Lock _lock = new();              // Controls scheduling / superseding
 22      private readonly Lock _applyLock = new();         // Serializes application of results
 23      private int _callId;
 24      private TaskCompletionSource<T>? _currentTcs;
 25      private CancellationTokenSource? _currentCancellationSource;
 26      private Task? _executingTask;
 27  
 28      public SupersedingAsyncValueGate(
 29          Func<CancellationToken, Task<T>> valueFactory,
 30          Action<T> apply)
 31      {
 32          ArgumentNullException.ThrowIfNull(valueFactory);
 33          ArgumentNullException.ThrowIfNull(apply);
 34          _valueFactory = valueFactory;
 35          _apply = apply;
 36      }
 37  
 38      /// <summary>
 39      /// Executes the configured value computation. If another execution is running, this call will
 40      /// cancel the current execution and restart the computation. The returned task completes when
 41      /// (and only if) the computation associated with this invocation completes (or is canceled / superseded).
 42      /// </summary>
 43      /// <param name="cancellationToken">Optional external cancellation token.</param>
 44      /// <returns>The computed value for this invocation.</returns>
 45      public async Task<T> ExecuteAsync(CancellationToken cancellationToken = default)
 46      {
 47          TaskCompletionSource<T> tcs;
 48  
 49          lock (_lock)
 50          {
 51              // Supersede any in-flight computation.
 52              _currentCancellationSource?.Cancel();
 53              _currentTcs?.TrySetException(new OperationCanceledException("Superseded by newer call"));
 54  
 55              tcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
 56              _currentTcs = tcs;
 57              _callId++;
 58  
 59              if (_executingTask is null)
 60              {
 61                  _executingTask = Task.Run(ExecuteLoop, CancellationToken.None);
 62              }
 63          }
 64  
 65          using var ctr = cancellationToken.Register(state => ((TaskCompletionSource<T>)state!).TrySetCanceled(cancellationToken), tcs);
 66          return await tcs.Task.ConfigureAwait(false);
 67      }
 68  
 69      private async Task ExecuteLoop()
 70      {
 71          try
 72          {
 73              while (true)
 74              {
 75                  TaskCompletionSource<T>? currentTcs;
 76                  CancellationTokenSource? currentCts;
 77                  int currentCallId;
 78  
 79                  lock (_lock)
 80                  {
 81                      currentTcs = _currentTcs;
 82                      currentCallId = _callId;
 83  
 84                      if (currentTcs is null)
 85                      {
 86                          break; // Nothing pending.
 87                      }
 88  
 89                      _currentCancellationSource?.Dispose();
 90                      _currentCancellationSource = new();
 91                      currentCts = _currentCancellationSource;
 92                  }
 93  
 94                  try
 95                  {
 96                      var value = await _valueFactory(currentCts.Token).ConfigureAwait(false);
 97                      CompleteSuccessIfCurrent(currentTcs, currentCallId, value);
 98                  }
 99                  catch (OperationCanceledException)
100                  {
101                      CompleteIfCurrent(currentTcs, currentCallId, t => t.TrySetCanceled(currentCts.Token));
102                  }
103                  catch (Exception ex)
104                  {
105                      CompleteIfCurrent(currentTcs, currentCallId, t => t.TrySetException(ex));
106                  }
107              }
108          }
109          finally
110          {
111              lock (_lock)
112              {
113                  _currentTcs = null;
114                  _currentCancellationSource?.Dispose();
115                  _currentCancellationSource = null;
116                  _executingTask = null;
117              }
118          }
119      }
120  
121      private void CompleteSuccessIfCurrent(TaskCompletionSource<T> candidate, int id, T value)
122      {
123          var shouldApply = false;
124          lock (_lock)
125          {
126              if (_currentTcs == candidate && _callId == id)
127              {
128                  // Mark as consumed so a new computation can start immediately.
129                  _currentTcs = null;
130                  shouldApply = true;
131              }
132          }
133  
134          if (!shouldApply)
135          {
136              return; // Superseded meanwhile.
137          }
138  
139          Exception? applyException = null;
140          try
141          {
142              lock (_applyLock)
143              {
144                  _apply(value);
145              }
146          }
147          catch (Exception ex)
148          {
149              applyException = ex;
150          }
151  
152          if (applyException is null)
153          {
154              candidate.TrySetResult(value);
155          }
156          else
157          {
158              candidate.TrySetException(applyException);
159          }
160      }
161  
162      private void CompleteIfCurrent(
163          TaskCompletionSource<T> candidate,
164          int id,
165          Action<TaskCompletionSource<T>> complete)
166      {
167          lock (_lock)
168          {
169              if (_currentTcs == candidate && _callId == id)
170              {
171                  complete(candidate);
172                  _currentTcs = null;
173              }
174          }
175      }
176  
177      public void Dispose()
178      {
179          lock (_lock)
180          {
181              _currentCancellationSource?.Cancel();
182              _currentCancellationSource?.Dispose();
183              _currentTcs?.TrySetException(new ObjectDisposedException(nameof(SupersedingAsyncValueGate<T>)));
184              _currentTcs = null;
185          }
186  
187          GC.SuppressFinalize(this);
188      }
189  }