BucketSet.cs
  1  using Logging;
  2  using System.Collections.Concurrent;
  3  
  4  namespace OverwatchTranscript
  5  {
  6      public class BucketSet
  7      {
  8          private const int numberOfActiveBuckets = 10;
  9          private readonly ILog log;
 10          private readonly string workingDir;
 11          private readonly object _bucketLock = new object();
 12          private readonly List<EventBucketWriter> fullBuckets = new List<EventBucketWriter>();
 13          private readonly List<EventBucketWriter> activeBuckets = new List<EventBucketWriter>();
 14          private readonly ActionQueue queue = new ActionQueue();
 15          private int activeBucketIndex = 0;
 16          private bool closed = false;
 17          private string internalErrors = string.Empty;
 18          
 19          public BucketSet(ILog log, string workingDir)
 20          {
 21              this.log = log;
 22              this.workingDir = workingDir;
 23  
 24              for (var i = 0; i < numberOfActiveBuckets;i++)
 25              {
 26                  AddNewBucket();
 27              }
 28  
 29              queue.Start();
 30          }
 31  
 32          public void Add(DateTime utc, object payload)
 33          {
 34              if (closed) throw new Exception("Buckets already closed!");
 35              queue.Add(() => AddInternal(utc, payload));
 36              
 37              if (queue.Count > 1000)
 38              {
 39                  Thread.Sleep(1);
 40              }
 41          }
 42  
 43          public IFinalizedBucket[] FinalizeBuckets()
 44          {
 45              closed = true;
 46              queue.StopAndJoin();
 47  
 48              if (IsEmpty()) throw new Exception("No entries have been added.");
 49              if (!string.IsNullOrEmpty(internalErrors)) throw new Exception(internalErrors);
 50  
 51              var buckets = fullBuckets.Concat(activeBuckets).ToArray();
 52              log.Debug($"Finalizing {buckets.Length} buckets...");
 53  
 54              var finalized = new ConcurrentBag<IFinalizedBucket>();
 55              var tasks = Parallel.ForEach(buckets, b => finalized.Add(b.FinalizeBucket()));
 56              if (!tasks.IsCompleted) throw new Exception("Failed to finalize buckets: " + tasks);
 57  
 58              return finalized.ToArray();
 59          }
 60  
 61          private bool IsEmpty()
 62          {
 63              return fullBuckets.All(b => b.Count == 0) && activeBuckets.All(b => b.Count == 0);
 64          }
 65  
 66          private void AddInternal(DateTime utc, object payload)
 67          {
 68              try
 69              {
 70                  lock (_bucketLock)
 71                  {
 72                      var current = activeBuckets[activeBucketIndex];
 73                      current.Add(utc, payload);
 74                      activeBucketIndex = (activeBucketIndex + 1) % numberOfActiveBuckets;
 75  
 76                      if (current.IsFull)
 77                      {
 78                          log.Debug("Bucket is full. New bucket...");
 79                          fullBuckets.Add(current);
 80                          activeBuckets.Remove(current);
 81                          AddNewBucket();
 82                      }
 83                  }
 84              }
 85              catch (Exception ex)
 86              {
 87                  internalErrors += ex.ToString();
 88                  log.Error(ex.ToString());
 89              }
 90          }
 91  
 92          private static int bucketSizeIndex = 0;
 93          private static int[] bucketSizes = new[]
 94          {
 95              10000,
 96              15000,
 97              20000,
 98          };
 99  
100          private void AddNewBucket()
101          {
102              lock (_bucketLock)
103              {
104                  var size = bucketSizes[bucketSizeIndex];
105                  bucketSizeIndex = (bucketSizeIndex + 1) % bucketSizes.Length;
106                  activeBuckets.Add(new EventBucketWriter(log, Path.Combine(workingDir, Guid.NewGuid().ToString()), size));
107              }
108          }
109      }
110  }