ElasticSearchLogDownloader.cs
using System.Globalization;
using System.Text;
using Core;
using Logging;
using Utils;
using WebUtils;

namespace TraceContract
{
    /// <summary>
    /// Downloads the log lines of one pod from ElasticSearch for a given time range and
    /// writes them to a <see cref="LogFile"/>, ordered by the <c>count=</c> number found
    /// in each log message.
    /// </summary>
    public class ElasticSearchLogDownloader
    {
        // Search request body. The <SIZE> and <SEARCHAFTER> placeholders are filled in per page by
        // LogReconstructor; <STARTTIME>, <ENDTIME> and <PODNAME> are filled in by CreateQueryTemplate.
        // An earlier variant of this query additionally requested the "pod_name" field.
        // Other fields mentioned by the original author:
        //   container_name : codex3-5 - deploymentName as stored in pod
        //   pod_namespace  : codex - continuous - nolimits - tests - 1
        private const string QueryTemplate =
            "{ \"sort\": [ { \"@timestamp\": { \"order\": \"asc\" } } ], " +
            "\"fields\": [ { \"field\": \"@timestamp\", \"format\": \"strict_date_optional_time\" }, { \"field\": \"message\" } ], " +
            "\"size\": <SIZE>, <SEARCHAFTER> \"_source\": false, " +
            "\"query\": { \"bool\": { \"must\": [], \"filter\": [ " +
            "{ \"range\": { \"@timestamp\": { \"format\": \"strict_date_optional_time\", \"gte\": \"<STARTTIME>\", \"lte\": \"<ENDTIME>\" } } }, " +
            "{ \"match_phrase\": { \"pod_name\": \"<PODNAME>\" } } ] } } }";

        private readonly ILog log;
        private readonly IPluginTools tools;
        private readonly Config config;

        /// <summary>Creates a downloader that uses the given log, plugin tools and configuration.</summary>
        /// <param name="log">Receives progress and error messages.</param>
        /// <param name="tools">Used to create the HTTP client for the ElasticSearch endpoint.</param>
        /// <param name="config">Supplies the ElasticSearch URL, username and password.</param>
        public ElasticSearchLogDownloader(ILog log, IPluginTools tools, Config config)
        {
            this.log = log;
            this.tools = tools;
            this.config = config;
        }

        /// <summary>
        /// Downloads the log of <paramref name="podName"/> between <paramref name="startUtc"/> and
        /// <paramref name="endUtc"/> into <paramref name="targetFile"/>.
        /// Any exception is reported through the log as an error and is not rethrown.
        /// </summary>
        /// <param name="targetFile">File the downloaded lines are written to.</param>
        /// <param name="podName">Value matched against the <c>pod_name</c> field.</param>
        /// <param name="startUtc">Inclusive lower bound of the <c>@timestamp</c> range.</param>
        /// <param name="endUtc">Inclusive upper bound of the <c>@timestamp</c> range.</param>
        public void Download(LogFile targetFile, string podName, DateTime startUtc, DateTime endUtc)
        {
            try
            {
                DownloadLog(targetFile, podName, startUtc, endUtc);
            }
            catch (Exception ex)
            {
                log.Error("Failed to download log: " + ex);
            }
        }

        /// <summary>Builds the endpoint and query, then lets a <see cref="LogReconstructor"/> fetch and write the log.</summary>
        private void DownloadLog(LogFile targetFile, string podName, DateTime startUtc, DateTime endUtc)
        {
            log.Log($"Downloading log (from ElasticSearch) for pod '{podName}' within time range: " +
                $"{startUtc:o} - {endUtc:o}");

            var endpoint = CreateElasticSearchEndpoint();
            var queryTemplate = CreateQueryTemplate(podName, startUtc, endUtc);

            targetFile.Write($"Downloading '{podName}' to '{targetFile.Filename}'.");
            var reconstructor = new LogReconstructor(log, targetFile, endpoint, queryTemplate);
            reconstructor.DownloadFullLog();

            log.Log("Log download finished.");
        }

        /// <summary>
        /// Fills the time range and pod name into the query template.
        /// The page-size and search-after placeholders are left in place for the reconstructor.
        /// </summary>
        private string CreateQueryTemplate(string podName, DateTime startUtc, DateTime endUtc)
        {
            // NOTE(review): podName is inserted into the JSON body without escaping — confirm callers
            // only pass names that contain no quotes or backslashes.
            return QueryTemplate
                .Replace("<STARTTIME>", startUtc.ToString("o"))
                .Replace("<ENDTIME>", endUtc.ToString("o"))
                .Replace("<PODNAME>", podName);
        }

        /// <summary>
        /// Creates an endpoint for the configured ElasticSearch URL on port 443, sending the
        /// configured username and password as an HTTP Basic <c>Authorization</c> header.
        /// </summary>
        private IEndpoint CreateElasticSearchEndpoint()
        {
            // An in-cluster alternative used previously was:
            //   new Address("ElasticSearchEndpoint", "http://elasticsearch.monitoring.svc.cluster.local", 9200)
            var address = new Address("TestnetElasticSearchEndpoint", config.ElasticSearchUrl, 443);

            var username = config.GetElasticSearchUsername();
            var password = config.GetElasticSearchPassword();

            // UTF-8 keeps non-ASCII characters intact; Encoding.ASCII would replace each of them with '?'.
            // For pure-ASCII credentials the resulting bytes are identical.
            var base64Creds = Convert.ToBase64String(
                Encoding.UTF8.GetBytes($"{username}:{password}")
            );

            var http = tools.CreateHttp(address.ToString(), client =>
            {
                client.DefaultRequestHeaders.Add("kbn-xsrf", "reporting");
                client.DefaultRequestHeaders.Add("Authorization", "Basic " + base64Creds);
            });

            return http.CreateEndpoint(address, "");
        }

        /// <summary>
        /// Pages through the search results and writes the messages to the target file in
        /// ascending order of the number found in each message's <c>count=</c> token.
        /// Messages without a parsable <c>count=</c> token are not written.
        /// </summary>
        public class LogReconstructor
        {
            private const int sizeOfPage = 2000;
            private const string countPrefix = "count=";

            private readonly List<LogQueueEntry> queue = new List<LogQueueEntry>();
            private readonly ILog log;
            private readonly LogFile targetFile;
            private readonly IEndpoint endpoint;
            private readonly string queryTemplate;

            // JSON fragment ("search_after": [...],) inserted into the next query; empty for the first page.
            private string searchAfter = "";
            // Number of hits in the most recent page; starts at 1 so the first query is issued.
            private int lastHits = 1;
            // Count number of the line written most recently; null until the first line is written.
            private ulong? lastLogLine;
            private uint linesWritten = 0;

            /// <summary>Creates a reconstructor that queries <paramref name="endpoint"/> and writes to <paramref name="targetFile"/>.</summary>
            /// <param name="log">Receives progress messages.</param>
            /// <param name="targetFile">File the ordered messages are written to.</param>
            /// <param name="endpoint">Endpoint the <c>/_search</c> requests are posted to.</param>
            /// <param name="queryTemplate">Query body still containing the <c>&lt;SIZE&gt;</c> and <c>&lt;SEARCHAFTER&gt;</c> placeholders.</param>
            public LogReconstructor(ILog log, LogFile targetFile, IEndpoint endpoint, string queryTemplate)
            {
                this.log = log;
                this.targetFile = targetFile;
                this.endpoint = endpoint;
                this.queryTemplate = queryTemplate;
            }

            /// <summary>
            /// Fetches pages until a query returns no hits, writing lines as soon as they are
            /// next in sequence, then writes any entries that are still queued.
            /// </summary>
            public void DownloadFullLog()
            {
                while (lastHits > 0)
                {
                    QueryElasticSearch();
                    ProcessQueue();
                }

                // Without this, every entry queued behind a missing line number would be dropped silently.
                FlushRemainingQueue();

                log.Log($"{linesWritten} lines written.");
            }

            /// <summary>Fetches one page of hits, queues them and advances the search-after cursor.</summary>
            private void QueryElasticSearch()
            {
                var query = queryTemplate
                    .Replace("<SIZE>", sizeOfPage.ToString(CultureInfo.InvariantCulture))
                    .Replace("<SEARCHAFTER>", searchAfter);

                var response = endpoint.HttpPostString<SearchResponse>("/_search", query);

                lastHits = response.hits.hits.Length;
                log.Log($"pageSize: {sizeOfPage} after: {searchAfter} -> {lastHits} hits");

                if (lastHits == 0)
                {
                    return;
                }

                UpdateSearchAfter(response);
                foreach (var hit in response.hits.hits)
                {
                    AddHitToQueue(hit);
                }
            }

            /// <summary>Queues the hit's message when it has exactly one message value with a parsable count number.</summary>
            private void AddHitToQueue(SearchHitEntry hit)
            {
                // A hit without exactly one message value is skipped, like a message without a count
                // number, instead of aborting the whole download with an exception.
                var messages = hit.fields?.message;
                if (messages == null || messages.Length != 1)
                {
                    return;
                }

                var message = messages[0];
                var number = ParseCountNumber(message);
                if (number != null)
                {
                    queue.Add(new LogQueueEntry(message, number.Value));
                }
            }

            /// <summary>
            /// Extracts the number from a space-separated <c>count=&lt;number&gt;</c> token.
            /// </summary>
            /// <returns>The parsed number, or null when there is no such token or it is not a valid number.</returns>
            private static ulong? ParseCountNumber(string message)
            {
                if (string.IsNullOrEmpty(message))
                {
                    return null;
                }

                var tokens = message.Split(' ', StringSplitOptions.RemoveEmptyEntries);

                // A message may contain more than one "count=" token; requiring exactly one would throw.
                // NOTE(review): the last token is used on the assumption that the sequence counter is
                // appended at the end of the line — confirm against the log format.
                var countToken = tokens.LastOrDefault(t => t.StartsWith(countPrefix, StringComparison.Ordinal));
                if (countToken == null)
                {
                    return null;
                }

                var digits = countToken.Substring(countPrefix.Length);
                if (ulong.TryParse(digits, NumberStyles.Integer, CultureInfo.InvariantCulture, out ulong value))
                {
                    return value;
                }

                return null;
            }

            /// <summary>
            /// Sets the search-after cursor from the sort values of the given (non-empty) page.
            /// </summary>
            private void UpdateSearchAfter(SearchResponse response)
            {
                var distinctSortValues = response.hits.hits
                    .Select(h => h.sort.Single())
                    .Distinct()
                    .ToList();

                // Resume after the second-to-last distinct sort value, so hits carrying the last value
                // are returned again by the next query — presumably so that hits sharing that value
                // across a page boundary are not missed. Already-written duplicates are discarded by
                // DeleteOldEntries. When the page has a single distinct value, resume after it.
                var resumeAfter = distinctSortValues.Count == 1
                    ? distinctSortValues[0]
                    : distinctSortValues[distinctSortValues.Count - 2];

                searchAfter = $"\"search_after\": [{resumeAfter.ToString(CultureInfo.InvariantCulture)}],";
            }

            /// <summary>
            /// Writes queued entries for as long as the next expected number is present in the queue.
            /// The very first line written is the one with the lowest number in the queue.
            /// </summary>
            private void ProcessQueue()
            {
                if (queue.Count == 0)
                {
                    return;
                }

                // Start from the lowest queued number on the first call; no "- 1" on an unsigned value needed.
                ulong wantedNumber = lastLogLine.HasValue
                    ? lastLogLine.Value + 1
                    : queue.Min(q => q.Number);

                while (queue.Count > 0)
                {
                    DeleteOldEntries(wantedNumber);

                    var currentEntry = queue.FirstOrDefault(e => e.Number == wantedNumber);
                    if (currentEntry == null)
                    {
                        // The line number we want is not in the queue.
                        // It is expected to be returned by a later ElasticSearch query.
                        // Stop processing the queue for now.
                        return;
                    }

                    WriteEntryToFile(currentEntry);
                    queue.Remove(currentEntry);
                    lastLogLine = currentEntry.Number;
                    wantedNumber = currentEntry.Number + 1;
                }
            }

            /// <summary>
            /// Writes whatever is still queued once paging has finished, in ascending number order
            /// and without duplicates. Entries remain queued only when an expected number never arrived.
            /// </summary>
            private void FlushRemainingQueue()
            {
                if (queue.Count == 0)
                {
                    return;
                }

                log.Log($"{queue.Count} queued entries were waiting for a line number that was never returned. " +
                    "Writing them in ascending line-number order.");

                foreach (var entry in queue.OrderBy(e => e.Number))
                {
                    // Hits re-fetched at a page boundary can appear twice; write each number once.
                    if (lastLogLine.HasValue && entry.Number <= lastLogLine.Value)
                    {
                        continue;
                    }

                    WriteEntryToFile(entry);
                    lastLogLine = entry.Number;
                }

                queue.Clear();
            }

            /// <summary>Writes the entry's message to the target file and counts it.</summary>
            private void WriteEntryToFile(LogQueueEntry currentEntry)
            {
                targetFile.Write(currentEntry.Message);
                linesWritten++;
            }

            /// <summary>Removes all queued entries whose number is lower than <paramref name="wantedNumber"/>.</summary>
            private void DeleteOldEntries(ulong wantedNumber)
            {
                queue.RemoveAll(e => e.Number < wantedNumber);
            }

            /// <summary>A log message together with the number parsed from its <c>count=</c> token.</summary>
            public class LogQueueEntry
            {
                public LogQueueEntry(string message, ulong number)
                {
                    Message = message;
                    Number = number;
                }

                public string Message { get; }
                public ulong Number { get; }
            }

            // The classes below are the deserialization targets for the search response; their
            // lower-case member names are part of that contract and must not be renamed.

            /// <summary>Top level of the search response.</summary>
            public class SearchResponse
            {
                public SearchHits hits { get; set; } = new SearchHits();
            }

            /// <summary>Container for the hits of one page.</summary>
            public class SearchHits
            {
                public SearchHitEntry[] hits { get; set; } = Array.Empty<SearchHitEntry>();
            }

            /// <summary>A single hit: the requested fields plus its sort values.</summary>
            public class SearchHitEntry
            {
                public SearchHitFields fields { get; set; } = new SearchHitFields();
                public long[] sort { get; set; } = Array.Empty<long>();
            }

            /// <summary>The fields requested by the query; each is returned as an array of values.</summary>
            public class SearchHitFields
            {
                public string[] @timestamp { get; set; } = Array.Empty<string>();
                public string[] message { get; set; } = Array.Empty<string>();
            }
        }
    }
}