BucketSet.cs
using Logging;
using System.Collections.Concurrent;

namespace OverwatchTranscript
{
    /// <summary>
    /// Distributes incoming (timestamp, payload) entries round-robin over a fixed number
    /// of active <see cref="EventBucketWriter"/> instances. A bucket that reports itself
    /// full is moved to the full list and replaced by a fresh one. Entries are not written
    /// on the caller's thread: they are handed to an internal <see cref="ActionQueue"/>.
    /// </summary>
    public class BucketSet
    {
        private const int numberOfActiveBuckets = 10;

        // The size rotation is shared by every BucketSet instance, so it is guarded by its
        // own static lock; the per-instance _bucketLock cannot protect static state.
        private static readonly object bucketSizeLock = new object();
        private static readonly int[] bucketSizes = new[]
        {
            10000,
            15000,
            20000,
        };
        private static int bucketSizeIndex = 0;

        private readonly ILog log;
        private readonly string workingDir;
        private readonly object _bucketLock = new object();
        private readonly List<EventBucketWriter> fullBuckets = new List<EventBucketWriter>();
        private readonly List<EventBucketWriter> activeBuckets = new List<EventBucketWriter>();
        private readonly ActionQueue queue = new ActionQueue();
        private int activeBucketIndex = 0;
        private bool closed = false;
        private string internalErrors = string.Empty;

        /// <summary>
        /// Creates the initial set of active buckets and starts the internal queue.
        /// </summary>
        /// <param name="log">Logger passed on to every bucket writer.</param>
        /// <param name="workingDir">Directory under which each bucket gets a GUID-named path.</param>
        public BucketSet(ILog log, string workingDir)
        {
            this.log = log;
            this.workingDir = workingDir;

            for (var i = 0; i < numberOfActiveBuckets; i++)
            {
                AddNewBucket();
            }

            queue.Start();
        }

        /// <summary>
        /// Queues an entry for insertion into one of the active buckets.
        /// </summary>
        /// <param name="utc">Timestamp of the entry (the parameter name suggests UTC; not validated here).</param>
        /// <param name="payload">Entry payload, passed through to the bucket writer unchanged.</param>
        /// <exception cref="Exception">Thrown when <see cref="FinalizeBuckets"/> was already called.</exception>
        public void Add(DateTime utc, object payload)
        {
            if (closed) throw new Exception("Buckets already closed!");
            queue.Add(() => AddInternal(utc, payload));

            // Crude back-pressure: slow the producer down while the queue backlog is large.
            if (queue.Count > 1000)
            {
                Thread.Sleep(1);
            }
        }

        /// <summary>
        /// Closes the set, stops the internal queue and finalizes every bucket in parallel.
        /// </summary>
        /// <returns>One finalized bucket per full or active bucket, in no particular order.</returns>
        /// <exception cref="Exception">
        /// Thrown when queued additions failed (the message holds the recorded exceptions),
        /// when no entries were added, or when the parallel loop did not run to completion.
        /// </exception>
        public IFinalizedBucket[] FinalizeBuckets()
        {
            closed = true;
            // NOTE(review): presumably drains pending actions before returning - confirm in ActionQueue.
            queue.StopAndJoin();

            // Report recorded failures first. If every addition failed the buckets are empty,
            // and "No entries have been added." would otherwise hide the real cause.
            if (!string.IsNullOrEmpty(internalErrors)) throw new Exception(internalErrors);
            if (IsEmpty()) throw new Exception("No entries have been added.");

            var buckets = fullBuckets.Concat(activeBuckets).ToArray();
            log.Debug($"Finalizing {buckets.Length} buckets...");

            var finalized = new ConcurrentBag<IFinalizedBucket>();
            var loopResult = Parallel.ForEach(buckets, b => finalized.Add(b.FinalizeBucket()));
            if (!loopResult.IsCompleted) throw new Exception("Failed to finalize buckets: " + loopResult);

            return finalized.ToArray();
        }

        /// <summary>
        /// Returns true when neither the full nor the active buckets hold any entry.
        /// </summary>
        private bool IsEmpty()
        {
            return fullBuckets.All(b => b.Count == 0) && activeBuckets.All(b => b.Count == 0);
        }

        /// <summary>
        /// Writes one entry to the next active bucket and replaces that bucket when it is full.
        /// Invoked through the internal queue. Exceptions are logged and recorded in
        /// <c>internalErrors</c> instead of propagating; <see cref="FinalizeBuckets"/> reports them.
        /// </summary>
        private void AddInternal(DateTime utc, object payload)
        {
            try
            {
                lock (_bucketLock)
                {
                    var current = activeBuckets[activeBucketIndex];
                    current.Add(utc, payload);
                    activeBucketIndex = (activeBucketIndex + 1) % numberOfActiveBuckets;

                    if (current.IsFull)
                    {
                        log.Debug("Bucket is full. New bucket...");
                        fullBuckets.Add(current);
                        activeBuckets.Remove(current);
                        // Restores the active list to numberOfActiveBuckets entries, which keeps
                        // activeBucketIndex within range for the next call.
                        AddNewBucket();
                    }
                }
            }
            catch (Exception ex)
            {
                // Separate successive errors so the combined message stays readable;
                // a single error produces exactly the same text as before.
                if (internalErrors.Length > 0)
                {
                    internalErrors += Environment.NewLine;
                }
                internalErrors += ex.ToString();
                log.Error(ex.ToString());
            }
        }

        /// <summary>
        /// Appends a new bucket writer to the active list. The capacity passed to the writer
        /// cycles through <c>bucketSizes</c>, and the backing path is a new GUID under the
        /// working directory.
        /// </summary>
        private void AddNewBucket()
        {
            lock (_bucketLock)
            {
                int size;
                // Lock order is always _bucketLock, then bucketSizeLock.
                lock (bucketSizeLock)
                {
                    size = bucketSizes[bucketSizeIndex];
                    bucketSizeIndex = (bucketSizeIndex + 1) % bucketSizes.Length;
                }

                activeBuckets.Add(new EventBucketWriter(log, Path.Combine(workingDir, Guid.NewGuid().ToString()), size));
            }
        }
    }
}