/ Tools / TraceContract / ElasticSearchLogDownloader.cs
ElasticSearchLogDownloader.cs
  1  using System.Text;
  2  using Core;
  3  using Logging;
  4  using Utils;
  5  using WebUtils;
  6  
  7  namespace TraceContract
  8  {
  9      public class ElasticSearchLogDownloader
 10      {
 11          private readonly ILog log;
 12          private readonly IPluginTools tools;
 13          private readonly Config config;
 14  
 15          public ElasticSearchLogDownloader(ILog log, IPluginTools tools, Config config)
 16          {
 17              this.log = log;
 18              this.tools = tools;
 19              this.config = config;
 20          }
 21  
 22          public void Download(LogFile targetFile, string podName, DateTime startUtc, DateTime endUtc)
 23          {
 24              try
 25              {
 26                  DownloadLog(targetFile, podName, startUtc, endUtc);
 27              }
 28              catch (Exception ex)
 29              {
 30                  log.Error("Failed to download log: " + ex);
 31              }
 32          }
 33  
 34          private void DownloadLog(LogFile targetFile, string podName, DateTime startUtc, DateTime endUtc)
 35          {
 36              log.Log($"Downloading log (from ElasticSearch) for pod '{podName}' within time range: " +
 37                  $"{startUtc.ToString("o")} - {endUtc.ToString("o")}");
 38  
 39              var endpoint = CreateElasticSearchEndpoint();
 40              var queryTemplate = CreateQueryTemplate(podName, startUtc, endUtc);
 41  
 42              targetFile.Write($"Downloading '{podName}' to '{targetFile.Filename}'.");
 43              var reconstructor = new LogReconstructor(log, targetFile, endpoint, queryTemplate);
 44              reconstructor.DownloadFullLog();
 45  
 46              log.Log("Log download finished.");
 47          }
 48  
 49          private string CreateQueryTemplate(string podName, DateTime startUtc, DateTime endUtc)
 50          {
 51              var start = startUtc.ToString("o");
 52              var end = endUtc.ToString("o");
 53  
 54              //container_name : codex3-5 - deploymentName as stored in pod
 55              // pod_namespace : codex - continuous - nolimits - tests - 1
 56  
 57              //var source = "{ \"sort\": [ { \"@timestamp\": { \"order\": \"asc\" } } ], \"fields\": [ { \"field\": \"@timestamp\", \"format\": \"strict_date_optional_time\" }, { \"field\": \"pod_name\" }, { \"field\": \"message\" } ], \"size\": <SIZE>, <SEARCHAFTER> \"_source\": false, \"query\": { \"bool\": { \"must\": [], \"filter\": [ { \"range\": { \"@timestamp\": { \"format\": \"strict_date_optional_time\", \"gte\": \"<STARTTIME>\", \"lte\": \"<ENDTIME>\" } } }, { \"match_phrase\": { \"pod_name\": \"<PODNAME>\" } } ] } } }";
 58              var source = "{ \"sort\": [ { \"@timestamp\": { \"order\": \"asc\" } } ], \"fields\": [ { \"field\": \"@timestamp\", \"format\": \"strict_date_optional_time\" }, { \"field\": \"message\" } ], \"size\": <SIZE>, <SEARCHAFTER> \"_source\": false, \"query\": { \"bool\": { \"must\": [], \"filter\": [ { \"range\": { \"@timestamp\": { \"format\": \"strict_date_optional_time\", \"gte\": \"<STARTTIME>\", \"lte\": \"<ENDTIME>\" } } }, { \"match_phrase\": { \"pod_name\": \"<PODNAME>\" } } ] } } }";
 59              return source
 60                  .Replace("<STARTTIME>", start)
 61                  .Replace("<ENDTIME>", end)
 62                  .Replace("<PODNAME>", podName);
 63                  //.Replace("<NAMESPACENAME>", config.StorageNodesKubernetesNamespace);
 64          }
 65  
 66          private IEndpoint CreateElasticSearchEndpoint()
 67          {
 68              //var serviceName = "elasticsearch";
 69              //var k8sNamespace = "monitoring";
 70              //var address = new Address("ElasticSearchEndpoint", $"http://{serviceName}.{k8sNamespace}.svc.cluster.local", 9200);
 71  
 72              var address = new Address("TestnetElasticSearchEndpoint", config.ElasticSearchUrl, 443);
 73              var baseUrl = "";
 74  
 75              var username = config.GetElasticSearchUsername();
 76              var password = config.GetElasticSearchPassword();
 77  
 78              var base64Creds = Convert.ToBase64String(
 79                  Encoding.ASCII.GetBytes($"{username}:{password}")
 80              );
 81  
 82              var http = tools.CreateHttp(address.ToString(), client =>
 83              {
 84                  client.DefaultRequestHeaders.Add("kbn-xsrf", "reporting");
 85                  client.DefaultRequestHeaders.Add("Authorization", "Basic " + base64Creds);
 86              });
 87  
 88              return http.CreateEndpoint(address, baseUrl);
 89          }
 90  
 91          public class LogReconstructor
 92          {
 93              private readonly List<LogQueueEntry> queue = new List<LogQueueEntry>();
 94              private readonly ILog log;
 95              private readonly LogFile targetFile;
 96              private readonly IEndpoint endpoint;
 97              private readonly string queryTemplate;
 98              private const int sizeOfPage = 2000;
 99              private string searchAfter = "";
100              private int lastHits = 1;
101              private ulong? lastLogLine;
102              private uint linesWritten = 0;
103  
104              public LogReconstructor(ILog log, LogFile targetFile, IEndpoint endpoint, string queryTemplate)
105              {
106                  this.log = log;
107                  this.targetFile = targetFile;
108                  this.endpoint = endpoint;
109                  this.queryTemplate = queryTemplate;
110              }
111  
112              public void DownloadFullLog()
113              {
114                  while (lastHits > 0)
115                  {
116                      QueryElasticSearch();
117                      ProcessQueue();
118                  }
119  
120                  log.Log($"{linesWritten} lines written.");
121              }
122  
123              private void QueryElasticSearch()
124              {
125                  var query = queryTemplate
126                                  .Replace("<SIZE>", sizeOfPage.ToString())
127                                  .Replace("<SEARCHAFTER>", searchAfter);
128  
129                  var response = endpoint.HttpPostString<SearchResponse>("/_search", query);
130  
131                  lastHits = response.hits.hits.Length;
132                  log.Log($"pageSize: {sizeOfPage} after: {searchAfter} -> {lastHits} hits");
133  
134                  if (lastHits > 0)
135                  {
136                      UpdateSearchAfter(response);
137                      foreach (var hit in response.hits.hits)
138                      {
139                          AddHitToQueue(hit);
140                      }
141                  }
142              }
143  
144              private void AddHitToQueue(SearchHitEntry hit)
145              {
146                  var message = hit.fields.message.Single();
147                  var number = ParseCountNumber(message);
148                  if (number != null)
149                  {
150                      queue.Add(new LogQueueEntry(message, number.Value));
151                  }
152              }
153  
154              private ulong? ParseCountNumber(string message)
155              {
156                  if (string.IsNullOrEmpty(message)) return null;
157                  var tokens = message.Split(' ', StringSplitOptions.RemoveEmptyEntries);
158                  if (!tokens.Any()) return null;
159                  var countToken = tokens.SingleOrDefault(t => t.StartsWith("count="));
160                  if (countToken == null) return null;
161                  var number = countToken.Substring(6);
162                  if (ulong.TryParse(number, out ulong value))
163                  {
164                      return value;
165                  }
166                  return null;
167              }
168  
169              private void UpdateSearchAfter(SearchResponse response)
170              {
171                  var uniqueSearchNumbers = response.hits.hits.Select(h => h.sort.Single()).Distinct().ToList();
172                  uniqueSearchNumbers.Reverse();
173  
174                  var searchNumber = GetSearchNumber(uniqueSearchNumbers);
175                  searchAfter = $"\"search_after\": [{searchNumber}],";
176              }
177  
178              private long GetSearchNumber(List<long> uniqueSearchNumbers)
179              {
180                  if (uniqueSearchNumbers.Count == 1) return uniqueSearchNumbers.First();
181                  return uniqueSearchNumbers.Skip(1).First();
182              }
183  
184              private void ProcessQueue()
185              {
186                  if (lastLogLine == null && queue.Any())
187                  {
188                      lastLogLine = queue.Min(q => q.Number) - 1;
189                  }
190  
191                  while (queue.Any())
192                  {
193                      ulong wantedNumber = lastLogLine.Value + 1;
194  
195                      DeleteOldEntries(wantedNumber);
196  
197                      var currentEntry = queue.FirstOrDefault(e => e.Number == wantedNumber);
198  
199                      if (currentEntry != null)
200                      {
201                          WriteEntryToFile(currentEntry);
202                          queue.Remove(currentEntry);
203                          lastLogLine = currentEntry.Number;
204                      }
205                      else
206                      {
207                          // The line number we want is not in the queue.
208                          // It will be returned by the elastic search query, some time in the future.
209                          // Stop processing the queue for now.
210                          return;
211                      }
212                  }
213              }
214  
215              private void WriteEntryToFile(LogQueueEntry currentEntry)
216              {
217                  targetFile.Write(currentEntry.Message);
218                  linesWritten++;
219              }
220  
221              private void DeleteOldEntries(ulong wantedNumber)
222              {
223                  queue.RemoveAll(e => e.Number < wantedNumber);
224              }
225  
226              public class LogQueueEntry
227              {
228                  public LogQueueEntry(string message, ulong number)
229                  {
230                      Message = message;
231                      Number = number;
232                  }
233  
234                  public string Message { get; }
235                  public ulong Number { get; }
236              }
237  
238              public class SearchResponse
239              {
240                  public SearchHits hits { get; set; } = new SearchHits();
241              }
242  
243              public class SearchHits
244              {
245                  public SearchHitEntry[] hits { get; set; } = Array.Empty<SearchHitEntry>();
246              }
247  
248              public class SearchHitEntry
249              {
250                  public SearchHitFields fields { get; set; } = new SearchHitFields();
251                  public long[] sort { get; set; } = Array.Empty<long>();
252              }
253  
254              public class SearchHitFields
255              {
256                  public string[] @timestamp { get; set; } = Array.Empty<string>();
257                  public string[] message { get; set; } = Array.Empty<string>();
258              }
259          }
260      }
261  }