MetricsQuery.cs
using Core;
using IdentityModel;
using KubernetesWorkflow.Types;
using Logging;
using System.Globalization;
using Utils;
using WebUtils;

namespace MetricsPlugin
{
    /// <summary>
    /// Issues queries against the "/api/v1/" HTTP endpoint of a running container and maps
    /// the JSON responses onto <see cref="Metrics"/> objects.
    /// </summary>
    public class MetricsQuery
    {
        private readonly IEndpoint endpoint;
        private readonly ILog log;

        /// <summary>
        /// Creates a query object bound to the given container.
        /// </summary>
        /// <param name="tools">Supplies the log and the HTTP client used for all queries.</param>
        /// <param name="runningContainer">
        /// Container whose address for <c>PrometheusContainerRecipe.PortTag</c> is queried.
        /// </param>
        public MetricsQuery(IPluginTools tools, RunningContainer runningContainer)
        {
            RunningContainer = runningContainer;
            log = tools.GetLog();
            var address = RunningContainer.GetAddress(PrometheusContainerRecipe.PortTag);
            endpoint = tools
                .CreateHttp(address.ToString())
                .CreateEndpoint(address, "/api/v1/");
        }

        /// <summary>The container this query object talks to.</summary>
        public RunningContainer RunningContainer { get; }

        /// <summary>
        /// Queries <c>last_over_time</c> for <paramref name="metricName"/>, restricted to the
        /// instance derived from <paramref name="target"/>.
        /// </summary>
        /// <remarks>
        /// Only <see cref="MetricsSet.Instance"/> and <see cref="MetricsSet.Values"/> are
        /// populated on the returned sets; <see cref="MetricsSet.Name"/> keeps its default.
        /// </remarks>
        /// <param name="metricName">Name of the metric to query.</param>
        /// <param name="target">Address used to build the instance selector.</param>
        /// <returns>One set per result entry, each holding at most one value.</returns>
        /// <exception cref="Exception">The query did not report status "success".</exception>
        public Metrics GetMostRecent(string metricName, Address target)
        {
            var response = GetLastOverTime(metricName, GetInstanceStringForNode(target));
            if (response == null) throw new Exception($"Failed to get most recent metric: {metricName}");

            var result = new Metrics
            {
                Sets = response.data.result
                    .Select(r => new MetricsSet
                    {
                        Instance = r.metric.instance,
                        Values = MapSingleValue(r.value)
                    })
                    .ToArray()
            };

            Log(target, metricName, result);
            return result;
        }

        /// <summary>
        /// Queries all series of <paramref name="metricName"/> over the query time range.
        /// </summary>
        /// <param name="metricName">Name of the metric to query.</param>
        /// <returns>One set per result entry.</returns>
        /// <exception cref="Exception">The query did not report status "success".</exception>
        public Metrics GetMetrics(string metricName)
        {
            var response = GetAll(metricName);
            if (response == null) throw new Exception($"Failed to get metrics by name: {metricName}");
            var result = MapResponseToMetrics(response);
            Log(metricName, result);
            return result;
        }

        /// <summary>
        /// Queries every series whose instance matches <paramref name="target"/> over the
        /// query time range.
        /// </summary>
        /// <param name="target">Address used to build the instance selector.</param>
        /// <returns>One set per result entry.</returns>
        /// <exception cref="Exception">The query did not report status "success".</exception>
        public Metrics GetAllMetricsForNode(Address target)
        {
            var instanceString = GetInstanceStringForNode(target);
            var response = RunQuery($"{instanceString}{GetQueryTimeRange()}");
            if (response == null) throw new Exception($"Failed to get metrics for target: {instanceString}");
            var result = MapResponseToMetrics(response);
            Log(target, result);
            return result;
        }

        private PrometheusQueryResponse? GetLastOverTime(string metricName, string instanceString)
        {
            return RunQuery($"last_over_time({metricName}{instanceString}{GetQueryTimeRange()})");
        }

        private PrometheusQueryResponse? GetAll(string metricName)
        {
            return RunQuery($"{metricName}{GetQueryTimeRange()}");
        }

        /// <summary>
        /// Sends <paramref name="query"/> to the "query" route of the endpoint.
        /// </summary>
        /// <returns>
        /// The deserialized response, or <c>null</c> when no response object was produced or
        /// its status is not "success".
        /// </returns>
        private PrometheusQueryResponse? RunQuery(string query)
        {
            var response = endpoint.HttpGetJson<PrometheusQueryResponse>($"query?query={query}");
            if (response == null || response.status != "success") return null;
            return response;
        }

        private Metrics MapResponseToMetrics(PrometheusQueryResponse response)
        {
            return new Metrics
            {
                Sets = response.data.result.Select(CreateMetricsSet).ToArray()
            };
        }

        /// <summary>
        /// Maps one result entry to a <see cref="MetricsSet"/>. The async-profiler details are
        /// attached only when file, line and proc labels are all non-empty.
        /// </summary>
        private MetricsSet CreateMetricsSet(PrometheusQueryResponseDataResultEntry r)
        {
            var result = new MetricsSet
            {
                Name = r.metric.__name__,
                Instance = r.metric.instance,
                Values = MapMultipleValues(r.values)
            };

            if (!string.IsNullOrEmpty(r.metric.file) && !string.IsNullOrEmpty(r.metric.line) && !string.IsNullOrEmpty(r.metric.proc))
            {
                result.AsyncProfiler = new AsyncProfilerMetrics
                {
                    File = r.metric.file,
                    Line = r.metric.line,
                    Proc = r.metric.proc
                };
            }

            return result;
        }

        /// <summary>Maps a single [timestamp, value] pair; null or empty input yields no values.</summary>
        private MetricsSetValue[] MapSingleValue(object[] value)
        {
            if (value != null && value.Length > 0)
            {
                return new[]
                {
                    MapValue(value)
                };
            }
            return Array.Empty<MetricsSetValue>();
        }

        /// <summary>Maps a list of [timestamp, value] pairs; null or empty input yields no values.</summary>
        private MetricsSetValue[] MapMultipleValues(object[][] values)
        {
            if (values != null && values.Length > 0)
            {
                return values.Select(MapValue).ToArray();
            }
            return Array.Empty<MetricsSetValue>();
        }

        /// <summary>
        /// Converts a two-element array into a <see cref="MetricsSetValue"/>: element 0 is
        /// read as the timestamp, element 1 as the value.
        /// </summary>
        /// <exception cref="InvalidOperationException">The array does not hold exactly two elements.</exception>
        private MetricsSetValue MapValue(object[] value)
        {
            if (value.Length != 2) throw new InvalidOperationException("Expected value to be [double, string].");

            return new MetricsSetValue
            {
                Timestamp = ToTimestamp(value[0]),
                Value = ToValue(value[1])
            };
        }

        private string GetInstanceNameForNode(Address target)
        {
            return ScrapeTargetHelper.FormatTarget(target);
        }

        /// <summary>Builds the <c>{instance="..."}</c> label selector for the given address.</summary>
        private string GetInstanceStringForNode(Address target)
        {
            return "{instance=\"" + GetInstanceNameForNode(target) + "\"}";
        }

        /// <summary>Range suffix appended to every query.</summary>
        private string GetQueryTimeRange()
        {
            return "[12h]";
        }

        /// <summary>
        /// Converts <paramref name="v"/> to a double using the invariant culture.
        /// Conversion failures are deliberately swallowed and reported as <see cref="double.NaN"/>.
        /// </summary>
        private double ToValue(object v)
        {
            try
            {
                return Convert.ToDouble(v, CultureInfo.InvariantCulture);
            }
            catch
            {
                // Best effort: an unparsable sample becomes NaN instead of failing the whole query.
                return double.NaN;
            }
        }

        /// <summary>Interprets <paramref name="v"/> as seconds since 1970-01-01T00:00:00 UTC.</summary>
        private DateTime ToTimestamp(object v)
        {
            var unixSeconds = ToValue(v);
            return new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc).AddSeconds(unixSeconds);
        }

        private void Log(Address target, string metricName, Metrics result)
        {
            Log($"{target.LogName} '{metricName}' = {result}");
        }

        private void Log(string metricName, Metrics result)
        {
            Log($"'{metricName}' = {result}");
        }

        private void Log(Address target, Metrics result)
        {
            Log($"{target.LogName} => {result}");
        }

        private void Log(string msg)
        {
            log.Log(msg);
        }
    }

    /// <summary>A collection of metric sets returned by a query.</summary>
    public class Metrics
    {
        public MetricsSet[] Sets { get; set; } = Array.Empty<MetricsSet>();

        public override string ToString()
        {
            return "[" + string.Join(',', Sets.Select(s => s.ToString())) + "]";
        }

        /// <summary>
        /// Renders the sets as comma-separated text. The first line is a header with one
        /// column per distinct timestamp (newest first); each following line holds a set name
        /// and that set's value for every timestamp, or a single space where it has none.
        /// </summary>
        /// <remarks>
        /// When a set holds several samples with the same timestamp, the first one is used.
        /// </remarks>
        public string AsCsv()
        {
            var allTimestamps = Sets.SelectMany(s => s.Values.Select(v => v.Timestamp)).Distinct().OrderDescending().ToArray();

            var lines = new List<string>();
            MakeLine(lines, e =>
            {
                e.Add("Metrics");
                foreach (var ts in allTimestamps) e.Add(ts.ToEpochTime().ToString());
            });

            foreach (var set in Sets)
            {
                // Index once per set: avoids rescanning Values for every column and tolerates
                // duplicate timestamps (TryAdd keeps the first sample) instead of throwing.
                var samplesByTimestamp = new Dictionary<DateTime, MetricsSetValue>();
                foreach (var sample in set.Values) samplesByTimestamp.TryAdd(sample.Timestamp, sample);

                MakeLine(lines, e =>
                {
                    e.Add(set.Name);
                    foreach (var ts in allTimestamps)
                    {
                        if (samplesByTimestamp.TryGetValue(ts, out var match)) e.Add(match.Value.ToString(CultureInfo.InvariantCulture));
                        else e.Add(" ");
                    }
                });
            }

            return string.Join(Environment.NewLine, lines.ToArray());
        }

        /// <summary>Collects cells via <paramref name="values"/> and appends them as one comma-joined line.</summary>
        private void MakeLine(List<string> lines, Action<List<string>> values)
        {
            var list = new List<string>();
            values(list);
            lines.Add(string.Join(",", list));
        }
    }

    /// <summary>The samples of one series, with its name, instance and optional profiler labels.</summary>
    public class MetricsSet
    {
        public string Name { get; set; } = string.Empty;
        public string Instance { get; set; } = string.Empty;
        public AsyncProfilerMetrics? AsyncProfiler { get; set; } = null;
        public MetricsSetValue[] Values { get; set; } = Array.Empty<MetricsSetValue>();

        public override string ToString()
        {
            var prefix = "";
            if (AsyncProfiler != null)
            {
                // Trailing space keeps the profiler prefix from running into the metric name.
                prefix = $"proc: '{AsyncProfiler.Proc}' in '{AsyncProfiler.File}:{AsyncProfiler.Line}' ";
            }

            return $"{prefix}{Name} ({Instance}) : {{{string.Join(",", Values.Select(v => v.ToString()))}}}";
        }
    }

    /// <summary>File, line and proc labels attached to a series when all three are present.</summary>
    public class AsyncProfilerMetrics
    {
        public string File { get; set; } = string.Empty;
        public string Line { get; set; } = string.Empty;
        public string Proc { get; set; } = string.Empty;
    }

    /// <summary>A single sample: a timestamp and its numeric value.</summary>
    public class MetricsSetValue
    {
        public DateTime Timestamp { get; set; }
        public double Value { get; set; }

        public override string ToString()
        {
            // Invariant culture keeps the text identical across machines, matching AsCsv.
            return $"<{Timestamp.ToString("o")}={Value.ToString(CultureInfo.InvariantCulture)}>";
        }
    }

    // NOTE(review): the lowercase member names on the response types below presumably mirror
    // the JSON field names of the query response - confirm against the deserializer settings
    // before renaming any of them.

    /// <summary>Top-level shape of a query response.</summary>
    public class PrometheusQueryResponse
    {
        public string status { get; set; } = string.Empty;
        public PrometheusQueryResponseData data { get; set; } = new();
    }

    /// <summary>The "data" part of a query response.</summary>
    public class PrometheusQueryResponseData
    {
        public string resultType { get; set; } = string.Empty;
        public PrometheusQueryResponseDataResultEntry[] result { get; set; } = Array.Empty<PrometheusQueryResponseDataResultEntry>();
    }

    /// <summary>
    /// One result entry: its labels plus either a single sample (<c>value</c>) or a list of
    /// samples (<c>values</c>).
    /// </summary>
    public class PrometheusQueryResponseDataResultEntry
    {
        public ResultEntryMetric metric { get; set; } = new();
        public object[] value { get; set; } = Array.Empty<object>();
        public object[][] values { get; set; } = Array.Empty<object[]>();
    }

    /// <summary>Labels of a result entry.</summary>
    public class ResultEntryMetric
    {
        public string __name__ { get; set; } = string.Empty;
        public string instance { get; set; } = string.Empty;
        public string job { get; set; } = string.Empty;
        // Async profiler output.
        public string? file { get; set; } = null;
        public string? line { get; set; } = null;
        public string? proc { get; set; } = null;
    }

    /// <summary>Response shape carrying a status and a flat list of names.</summary>
    public class PrometheusAllNamesResponse
    {
        public string status { get; set; } = string.Empty;
        public string[] data { get; set; } = Array.Empty<string>();
    }
}