LoadBalancer.cs
using Logging;

namespace AutoClient
{
    /// <summary>
    /// Hands actions to a fixed set of <see cref="CodexWrapper"/> instances.
    /// Each instance has its own bounded queue and a background worker that
    /// runs the queued actions one at a time, in the order they were queued.
    /// </summary>
    public class LoadBalancer
    {
        // Ordered list of instances; the front element is the next round-robin target.
        private readonly List<Cdx> instances;
        // Guards every read and reorder of 'instances'.
        private readonly object instanceLock = new object();

        /// <summary>
        /// One <see cref="CodexWrapper"/> together with its pending-action queue
        /// and the worker task that drains that queue.
        /// </summary>
        private class Cdx
        {
            // Producers block once this many actions are pending.
            private const int MaxQueued = 3;
            // Interval at which blocked producers and the idle worker re-check state.
            private static readonly TimeSpan PollInterval = TimeSpan.FromSeconds(5.0);

            private readonly ILog log;
            private readonly CodexWrapper instance;
            private readonly List<Action<CodexWrapper>> queue = new List<Action<CodexWrapper>>();
            // Guards every access to 'queue' (List<T> is not thread-safe).
            private readonly object queueLock = new object();
            // Written by Stop() on one thread and read by the worker on another,
            // so it must be volatile for the write to be observed.
            private volatile bool running = true;
            private Task worker = Task.CompletedTask;

            public Cdx(App app, CodexWrapper instance)
            {
                Id = instance.Node.GetName();
                log = new LogPrefixer(app.Log, $"[Queue-{Id}]");
                this.instance = instance;
            }

            /// <summary>Name of the wrapped instance's node, captured at construction.</summary>
            public string Id { get; }

            /// <summary>Starts the background worker that drains the queue.</summary>
            public void Start()
            {
                worker = Task.Run(Worker);
            }

            /// <summary>
            /// Asks the worker to exit and blocks until it has done so. Actions
            /// still pending in the queue are not executed. If the worker
            /// faulted, waiting on it rethrows its failure.
            /// </summary>
            public void Stop()
            {
                running = false;
                worker.Wait();
            }

            /// <summary>Rethrows the worker's failure if the worker task has faulted.</summary>
            public void CheckErrors()
            {
                if (worker.IsFaulted) throw worker.Exception;
            }

            /// <summary>
            /// Appends <paramref name="action"/> to the queue, blocking while the
            /// queue already holds <see cref="MaxQueued"/> actions.
            /// </summary>
            /// <param name="action">Action the worker will invoke with the wrapped instance.</param>
            /// <exception cref="AggregateException">
            /// The queue is full and the worker has faulted, so space will never
            /// become available.
            /// </exception>
            public void Queue(Action<CodexWrapper> action)
            {
                var reportedFull = false;
                while (true)
                {
                    // Capacity check and insert happen under one lock so that
                    // concurrent producers cannot overshoot the bound.
                    lock (queueLock)
                    {
                        if (queue.Count < MaxQueued)
                        {
                            queue.Add(action);
                            return;
                        }
                    }

                    // A faulted worker never drains the queue; surface its
                    // failure instead of waiting forever.
                    CheckErrors();

                    if (!reportedFull)
                    {
                        log.Log("Queue full. Waiting...");
                        reportedFull = true;
                    }

                    Thread.Sleep(PollInterval);
                }
            }

            /// <summary>
            /// Worker loop: runs queued actions one at a time until
            /// <see cref="Stop"/> is called. Any exception thrown by an action is
            /// logged and rethrown, which faults the worker task.
            /// </summary>
            private void Worker()
            {
                try
                {
                    while (running)
                    {
                        Action<CodexWrapper> action = w => { };
                        var hasWork = false;
                        lock (queueLock)
                        {
                            if (queue.Count > 0)
                            {
                                action = queue[0];
                                queue.RemoveAt(0);
                                hasWork = true;
                            }
                        }

                        if (!hasWork)
                        {
                            // Idle: sleep, then loop back so 'running' is
                            // re-checked and Stop() can finish on an empty queue.
                            Thread.Sleep(PollInterval);
                            continue;
                        }

                        action(instance);
                    }
                }
                catch (Exception ex)
                {
                    log.Error("Exception in worker: " + ex);
                    throw;
                }
            }
        }

        /// <summary>Creates one queue and worker per entry of <paramref name="instances"/>.</summary>
        public LoadBalancer(App app, CodexWrapper[] instances)
        {
            this.instances = instances.Select(i => new Cdx(app, i)).ToList();
        }

        /// <summary>Starts the worker of every instance.</summary>
        public void Start()
        {
            foreach (var i in Snapshot()) i.Start();
        }

        /// <summary>Stops the worker of every instance, waiting for each one in turn.</summary>
        public void Stop()
        {
            foreach (var i in Snapshot()) i.Stop();
        }

        /// <summary>
        /// Queues <paramref name="action"/> on the instance at the front of the
        /// list and moves that instance to the back (round-robin). Blocks while
        /// the chosen instance's queue is full.
        /// </summary>
        public void DispatchOnCodex(Action<CodexWrapper> action)
        {
            Cdx target;
            lock (instanceLock)
            {
                target = instances.First();
                instances.RemoveAt(0);
                instances.Add(target);
            }

            // Enqueue outside instanceLock: Queue may block on back-pressure and
            // must not stall other dispatchers or CheckErrors while it waits.
            target.Queue(action);
        }

        /// <summary>
        /// Queues <paramref name="action"/> on the instance whose id equals
        /// <paramref name="id"/> and moves that instance to the back of the
        /// round-robin order. Blocks while that instance's queue is full.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// No instance, or more than one instance, has the given id.
        /// </exception>
        public void DispatchOnSpecificCodex(Action<CodexWrapper> action, string id)
        {
            Cdx target;
            lock (instanceLock)
            {
                target = instances.Single(a => a.Id == id);
                instances.Remove(target);
                instances.Add(target);
            }

            // See DispatchOnCodex: never wait for queue space while holding instanceLock.
            target.Queue(action);
        }

        /// <summary>Rethrows the failure of the first instance whose worker has faulted.</summary>
        public void CheckErrors()
        {
            lock (instanceLock)
            {
                foreach (var i in instances) i.CheckErrors();
            }
        }

        // Copies the instance list under the lock so callers can iterate it
        // while dispatch methods keep reordering the live list.
        private Cdx[] Snapshot()
        {
            lock (instanceLock)
            {
                return instances.ToArray();
            }
        }
    }
}