/ src / modules / cmdpal / Core / Microsoft.CmdPal.Core.Common / Helpers / SupersedingAsyncValueGate`1.cs
SupersedingAsyncValueGate`1.cs
// Copyright (c) Microsoft Corporation
// The Microsoft Corporation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

namespace Microsoft.CmdPal.Core.Common.Helpers;

/// <summary>
/// An async gate that ensures only one value computation runs at a time.
/// If ExecuteAsync is called while already executing, it cancels the current computation
/// and starts the operation again (superseding behavior).
/// Once a value is successfully computed, it is applied (via the provided <see cref="Action{T}"/>).
/// The apply step uses its own lock so that long-running apply logic does not block the
/// computation / superseding pipeline, while still remaining serialized with respect to
/// other apply calls.
/// </summary>
/// <typeparam name="T">The type of the computed value.</typeparam>
public sealed partial class SupersedingAsyncValueGate<T> : IDisposable
{
    private readonly Func<CancellationToken, Task<T>> _valueFactory;
    private readonly Action<T> _apply;
    private readonly Lock _lock = new(); // Controls scheduling / superseding
    private readonly Lock _applyLock = new(); // Serializes application of results

    // Incremented on every ExecuteAsync call; paired with _currentTcs to detect superseding.
    private int _callId;

    // Completion source of the newest, not yet completed invocation (null when nothing is pending).
    private TaskCompletionSource<T>? _currentTcs;

    // Cancellation source of the computation currently running in the loop. It is never left
    // in a disposed state while reachable through this field.
    private CancellationTokenSource? _currentCancellationSource;

    // Non-null while a background loop is alive and guaranteed to pick up _currentTcs.
    private Task? _executingTask;

    /// <summary>
    /// Initializes a new instance of the <see cref="SupersedingAsyncValueGate{T}"/> class.
    /// </summary>
    /// <param name="valueFactory">Computes the value; receives a token that is canceled when the computation is superseded or the gate is disposed.</param>
    /// <param name="apply">Invoked with the computed value when the computation was not superseded.</param>
    /// <exception cref="ArgumentNullException"><paramref name="valueFactory"/> or <paramref name="apply"/> is null.</exception>
    public SupersedingAsyncValueGate(
        Func<CancellationToken, Task<T>> valueFactory,
        Action<T> apply)
    {
        ArgumentNullException.ThrowIfNull(valueFactory);
        ArgumentNullException.ThrowIfNull(apply);
        _valueFactory = valueFactory;
        _apply = apply;
    }

    /// <summary>
    /// Executes the configured value computation. If another execution is running, this call will
    /// cancel the current execution and restart the computation. The returned task completes when
    /// (and only if) the computation associated with this invocation completes (or is canceled / superseded).
    /// </summary>
    /// <remarks>
    /// <paramref name="cancellationToken"/> only cancels the task returned to this caller; the token
    /// handed to the value factory is the gate's own and is canceled by superseding or disposal.
    /// </remarks>
    /// <param name="cancellationToken">Optional external cancellation token.</param>
    /// <returns>The computed value for this invocation.</returns>
    /// <exception cref="OperationCanceledException">The invocation was superseded by a newer call or canceled.</exception>
    /// <exception cref="ObjectDisposedException">The gate was disposed while this invocation was pending.</exception>
    public async Task<T> ExecuteAsync(CancellationToken cancellationToken = default)
    {
        TaskCompletionSource<T> tcs;

        lock (_lock)
        {
            // Supersede any in-flight computation.
            _currentCancellationSource?.Cancel();
            _currentTcs?.TrySetException(new OperationCanceledException("Superseded by newer call"));

            tcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
            _currentTcs = tcs;
            _callId++;

            // A non-null _executingTask means a loop is alive and will observe the new TCS on its
            // next pass; the loop clears the field in the same critical section in which it decides
            // to stop, so this check cannot race with the loop shutting down.
            if (_executingTask is null)
            {
                _executingTask = Task.Run(ExecuteLoop, CancellationToken.None);
            }
        }

        using var ctr = cancellationToken.Register(state => ((TaskCompletionSource<T>)state!).TrySetCanceled(cancellationToken), tcs);
        return await tcs.Task.ConfigureAwait(false);
    }

    /// <summary>
    /// Background loop: repeatedly takes the newest pending invocation, runs the value factory for
    /// it and completes it, until no invocation is pending.
    /// </summary>
    private async Task ExecuteLoop()
    {
        try
        {
            while (true)
            {
                TaskCompletionSource<T> pending;
                CancellationToken token;
                int callId;

                lock (_lock)
                {
                    if (_currentTcs is null)
                    {
                        // Nothing pending. Retire the loop inside the same critical section that
                        // observed the empty state; otherwise a caller arriving between "decide to
                        // stop" and "clear _executingTask" would register a TCS that no loop ever
                        // services, and its task would never complete.
                        RetireLoop();
                        return;
                    }

                    pending = _currentTcs;
                    callId = _callId;

                    var cts = new CancellationTokenSource();
                    _currentCancellationSource?.Dispose();
                    _currentCancellationSource = cts;

                    // Capture the token while the source is known to be alive; reading Token from a
                    // source that Dispose() has already disposed would throw.
                    token = cts.Token;
                }

                try
                {
                    var value = await _valueFactory(token).ConfigureAwait(false);
                    CompleteSuccessIfCurrent(pending, callId, value);
                }
                catch (OperationCanceledException)
                {
                    CompleteIfCurrent(pending, callId, t => t.TrySetCanceled(token));
                }
                catch (Exception ex)
                {
                    CompleteIfCurrent(pending, callId, t => t.TrySetException(ex));
                }
            }
        }
        catch (Exception ex)
        {
            // Unexpected failure of the loop itself: do not strand a pending caller, and leave the
            // gate in a state from which the next ExecuteAsync can start a fresh loop.
            lock (_lock)
            {
                _currentTcs?.TrySetException(ex);
                _currentTcs = null;
                RetireLoop();
            }

            throw;
        }
    }

    /// <summary>
    /// Releases the loop's cancellation source and marks the loop as stopped.
    /// Must be called while holding <see cref="_lock"/>.
    /// </summary>
    private void RetireLoop()
    {
        _currentCancellationSource?.Dispose();
        _currentCancellationSource = null;
        _executingTask = null;
    }

    /// <summary>
    /// Applies <paramref name="value"/> and completes <paramref name="candidate"/> if it is still the
    /// current invocation; does nothing if it was superseded in the meantime.
    /// </summary>
    /// <param name="candidate">The completion source the value was computed for.</param>
    /// <param name="id">The call id captured together with <paramref name="candidate"/>.</param>
    /// <param name="value">The computed value.</param>
    private void CompleteSuccessIfCurrent(TaskCompletionSource<T> candidate, int id, T value)
    {
        var shouldApply = false;
        lock (_lock)
        {
            if (_currentTcs == candidate && _callId == id)
            {
                // Mark as consumed so a new computation can start immediately.
                _currentTcs = null;
                shouldApply = true;
            }
        }

        if (!shouldApply)
        {
            return; // Superseded meanwhile.
        }

        // Apply outside _lock so a slow apply does not block scheduling; _applyLock keeps
        // apply calls serialized with respect to each other.
        Exception? applyException = null;
        try
        {
            lock (_applyLock)
            {
                _apply(value);
            }
        }
        catch (Exception ex)
        {
            applyException = ex;
        }

        // A failing apply is reported to the caller instead of the value.
        if (applyException is null)
        {
            candidate.TrySetResult(value);
        }
        else
        {
            candidate.TrySetException(applyException);
        }
    }

    /// <summary>
    /// Runs <paramref name="complete"/> on <paramref name="candidate"/> and clears the pending slot,
    /// but only if the candidate is still the current invocation.
    /// </summary>
    /// <param name="candidate">The completion source to complete.</param>
    /// <param name="id">The call id captured together with <paramref name="candidate"/>.</param>
    /// <param name="complete">The completion action (cancel or fault).</param>
    private void CompleteIfCurrent(
        TaskCompletionSource<T> candidate,
        int id,
        Action<TaskCompletionSource<T>> complete)
    {
        lock (_lock)
        {
            if (_currentTcs == candidate && _callId == id)
            {
                complete(candidate);
                _currentTcs = null;
            }
        }
    }

    /// <summary>
    /// Cancels the running computation and faults the pending invocation (if any) with
    /// <see cref="ObjectDisposedException"/>. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        lock (_lock)
        {
            _currentCancellationSource?.Cancel();
            _currentCancellationSource?.Dispose();

            // Drop the reference: calling Cancel() on a disposed source throws, which would make a
            // second Dispose() (or a later ExecuteAsync()) fail with ObjectDisposedException.
            _currentCancellationSource = null;

            _currentTcs?.TrySetException(new ObjectDisposedException(nameof(SupersedingAsyncValueGate<T>)));
            _currentTcs = null;
        }

        GC.SuppressFinalize(this);
    }
}