/ Framework / KubernetesWorkflow / ContainerCrashWatcher.cs
ContainerCrashWatcher.cs
 1  using k8s;
 2  using Logging;
 3  
 4  namespace KubernetesWorkflow
 5  {
 6      public class ContainerCrashWatcher
 7      {
 8          private readonly ILog log;
 9          private readonly KubernetesClientConfiguration config;
10          private readonly string containerName;
11          private readonly string podName;
12          private readonly string recipeName;
13          private readonly string k8sNamespace;
14          private CancellationTokenSource cts;
15          private Task? worker;
16          private Exception? workerException;
17          private bool hasCrashed = false;
18  
19          public ContainerCrashWatcher(ILog log, KubernetesClientConfiguration config, string containerName, string podName, string recipeName, string k8sNamespace)
20          {
21              this.log = log;
22              this.config = config;
23              this.containerName = containerName;
24              this.podName = podName;
25              this.recipeName = recipeName;
26              this.k8sNamespace = k8sNamespace;
27              cts = new CancellationTokenSource();
28          }
29  
30          public void Start()
31          {
32              if (worker != null) throw new InvalidOperationException();
33  
34              cts = new CancellationTokenSource();
35              worker = Task.Run(Worker);
36          }
37  
38          public void Stop()
39          {
40              if (worker == null) throw new InvalidOperationException();
41  
42              cts.Cancel();
43              worker.Wait();
44              worker = null;
45  
46              if (workerException != null) throw new Exception("Exception occurred in CrashWatcher worker thread.", workerException);
47          }
48  
49          public bool HasCrashed()
50          {
51              return hasCrashed;
52          }
53  
54          private void Worker()
55          {
56              try
57              {
58                  MonitorContainer(cts.Token);
59              }
60              catch (Exception ex)
61              {
62                  workerException = ex;
63              }
64          }
65  
66          private void MonitorContainer(CancellationToken token)
67          {
68              using var client = new Kubernetes(config);
69              while (!token.IsCancellationRequested)
70              {
71                  if (HasContainerBeenRestarted(client))
72                  {
73                      hasCrashed = true;
74                      cts.Cancel();
75  
76                      DownloadCrashedContainerLogs(client);
77                      return;
78                  }
79  
80                  token.WaitHandle.WaitOne(TimeSpan.FromSeconds(10));
81              }
82          }
83  
84          private bool HasContainerBeenRestarted(Kubernetes client)
85          {
86              var podInfo = client.ReadNamespacedPod(podName, k8sNamespace);
87              var result = podInfo.Status.ContainerStatuses.Any(c => c.RestartCount > 0);
88              if (result) log.Log("Pod crash detected for " + containerName);
89              return result;
90          }
91  
92          private void DownloadCrashedContainerLogs(Kubernetes client)
93          {
94              using var stream = client.ReadNamespacedPodLog(podName, k8sNamespace, recipeName, previous: true);
95              var handler = new WriteToFileLogHandler(log, "Crash detected for " + containerName, containerName);
96              handler.Log(stream);
97          }
98      }
99  }