AsyncWorkQueue.cs
using System;
using System.Collections.Concurrent;
using System.Threading;

namespace Ryujinx.Common
{
    /// <summary>
    /// Processes work items on a dedicated background thread: every item added to the
    /// underlying <see cref="BlockingCollection{T}"/> is handed, one at a time, to a callback.
    /// </summary>
    /// <typeparam name="T">Type of the work items.</typeparam>
    public sealed class AsyncWorkQueue<T> : IDisposable
    {
        private readonly Thread _workerThread;
        private readonly CancellationTokenSource _cts;
        private readonly Action<T> _workerAction;
        private readonly BlockingCollection<T> _queue;

        // 0 while the queue is live, 1 once Dispose has started; makes Dispose safe to call repeatedly.
        private int _disposed;

        /// <summary>
        /// Whether cancellation has been requested on the queue's internal cancellation source.
        /// </summary>
        public bool IsCancellationRequested => _cts.IsCancellationRequested;

        /// <summary>
        /// Creates a work queue backed by a new default <see cref="BlockingCollection{T}"/>
        /// and starts its worker thread.
        /// </summary>
        /// <param name="callback">Action invoked on the worker thread for each work item.</param>
        /// <param name="name">Optional name assigned to the worker thread.</param>
        /// <exception cref="ArgumentNullException"><paramref name="callback"/> is null.</exception>
        public AsyncWorkQueue(Action<T> callback, string name = null) : this(callback, name, new BlockingCollection<T>())
        {
        }

        /// <summary>
        /// Creates a work queue backed by the supplied collection and starts its worker thread.
        /// </summary>
        /// <param name="callback">Action invoked on the worker thread for each work item.</param>
        /// <param name="name">Name assigned to the worker thread; may be null.</param>
        /// <param name="collection">
        /// Collection the worker consumes from. It is disposed together with this instance.
        /// </param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="callback"/> or <paramref name="collection"/> is null.
        /// </exception>
        public AsyncWorkQueue(Action<T> callback, string name, BlockingCollection<T> collection)
        {
            // Fail fast on the caller's thread. Without these checks a null would only surface
            // as an unhandled NullReferenceException on the worker thread.
            _workerAction = callback ?? throw new ArgumentNullException(nameof(callback));
            _queue = collection ?? throw new ArgumentNullException(nameof(collection));

            _cts = new CancellationTokenSource();
            _workerThread = new Thread(DoWork)
            {
                Name = name,
                IsBackground = true,
            };
            _workerThread.Start();
        }

        /// <summary>
        /// Worker thread body: consumes items until the collection is completed or the
        /// queue's cancellation token is signalled.
        /// </summary>
        private void DoWork()
        {
            try
            {
                foreach (var item in _queue.GetConsumingEnumerable(_cts.Token))
                {
                    _workerAction(item);
                }
            }
            catch (OperationCanceledException)
            {
                // Cancellation is the expected way to stop the worker; nothing to report.
            }
        }

        /// <summary>
        /// Requests cancellation of the worker loop. Items still waiting in the queue
        /// may be left unprocessed.
        /// </summary>
        public void Cancel()
        {
            _cts.Cancel();
        }

        /// <summary>
        /// Schedules cancellation of the worker loop after the given delay.
        /// </summary>
        /// <param name="millisecondsDelay">Delay, in milliseconds, before cancellation is requested.</param>
        public void CancelAfter(int millisecondsDelay)
        {
            _cts.CancelAfter(millisecondsDelay);
        }

        /// <summary>
        /// Schedules cancellation of the worker loop after the given delay.
        /// </summary>
        /// <param name="delay">Delay before cancellation is requested.</param>
        public void CancelAfter(TimeSpan delay)
        {
            _cts.CancelAfter(delay);
        }

        /// <summary>
        /// Adds a work item to the underlying collection.
        /// </summary>
        /// <param name="workItem">Item to enqueue.</param>
        public void Add(T workItem)
        {
            _queue.Add(workItem);
        }

        /// <summary>
        /// Adds a work item to the underlying collection, observing a cancellation token.
        /// </summary>
        /// <param name="workItem">Item to enqueue.</param>
        /// <param name="cancellationToken">Token passed through to the underlying add operation.</param>
        public void Add(T workItem, CancellationToken cancellationToken)
        {
            _queue.Add(workItem, cancellationToken);
        }

        /// <summary>
        /// Attempts to add a work item to the underlying collection.
        /// </summary>
        /// <param name="workItem">Item to enqueue.</param>
        /// <returns>The result of the underlying <c>TryAdd</c> call.</returns>
        public bool TryAdd(T workItem)
        {
            return _queue.TryAdd(workItem);
        }

        /// <summary>
        /// Attempts to add a work item to the underlying collection within a time limit.
        /// </summary>
        /// <param name="workItem">Item to enqueue.</param>
        /// <param name="millisecondsDelay">Timeout, in milliseconds, passed to the underlying <c>TryAdd</c>.</param>
        /// <returns>The result of the underlying <c>TryAdd</c> call.</returns>
        public bool TryAdd(T workItem, int millisecondsDelay)
        {
            return _queue.TryAdd(workItem, millisecondsDelay);
        }

        /// <summary>
        /// Attempts to add a work item to the underlying collection within a time limit,
        /// observing a cancellation token.
        /// </summary>
        /// <param name="workItem">Item to enqueue.</param>
        /// <param name="millisecondsDelay">Timeout, in milliseconds, passed to the underlying <c>TryAdd</c>.</param>
        /// <param name="cancellationToken">Token passed through to the underlying add operation.</param>
        /// <returns>The result of the underlying <c>TryAdd</c> call.</returns>
        public bool TryAdd(T workItem, int millisecondsDelay, CancellationToken cancellationToken)
        {
            return _queue.TryAdd(workItem, millisecondsDelay, cancellationToken);
        }

        /// <summary>
        /// Attempts to add a work item to the underlying collection within a time limit.
        /// </summary>
        /// <param name="workItem">Item to enqueue.</param>
        /// <param name="timeout">Timeout passed to the underlying <c>TryAdd</c>.</param>
        /// <returns>The result of the underlying <c>TryAdd</c> call.</returns>
        public bool TryAdd(T workItem, TimeSpan timeout)
        {
            return _queue.TryAdd(workItem, timeout);
        }

        /// <summary>
        /// Stops the worker and releases the queue's resources: marks the collection as
        /// complete for adding, requests cancellation, waits for the worker thread to exit,
        /// then disposes the collection and the cancellation source.
        /// Calls after the first one are no-ops.
        /// </summary>
        /// <remarks>
        /// This method joins the worker thread, so it must not be called from inside the
        /// work callback; the worker would end up waiting on itself.
        /// </remarks>
        public void Dispose()
        {
            // Only the first caller performs the teardown. A repeated call would otherwise
            // hit the already-disposed collection and throw ObjectDisposedException.
            if (Interlocked.Exchange(ref _disposed, 1) != 0)
            {
                return;
            }

            _queue.CompleteAdding();
            _cts.Cancel();
            _workerThread.Join();

            _queue.Dispose();
            _cts.Dispose();
        }
    }
}