/ Framework / KubernetesWorkflow / StartupWorkflow.cs
StartupWorkflow.cs
  1  using KubernetesWorkflow.Recipe;
  2  using KubernetesWorkflow.Types;
  3  using Logging;
  4  using Newtonsoft.Json;
  5  using Utils;
  6  
  7  namespace KubernetesWorkflow
  8  {
  9      public interface IStartupWorkflow
 10      {
 11          IKnownLocations GetAvailableLocations();
 12          FutureContainers Start(int numberOfContainers, ContainerRecipeFactory recipeFactory, StartupConfig startupConfig);
 13          FutureContainers Start(int numberOfContainers, ILocation location, ContainerRecipeFactory recipeFactory, StartupConfig startupConfig);
 14          PodInfo GetPodInfo(RunningContainer container);
 15          PodInfo GetPodInfo(RunningPod pod);
 16          ContainerCrashWatcher CreateCrashWatcher(RunningContainer container);
 17          void Stop(RunningPod pod, bool waitTillStopped);
 18          void DownloadContainerLog(RunningContainer container, ILogHandler logHandler, int? tailLines = null, bool? previous = null);
 19          IDownloadedLog DownloadContainerLog(RunningContainer container, int? tailLines = null, bool? previous = null);
 20          string ExecuteCommand(RunningContainer container, string command, params string[] args);
 21          void DeleteNamespace(bool wait);
 22          void DeleteNamespacesStartingWith(string namespacePrefix, bool wait);
 23      }
 24  
 25      public class StartupWorkflow : IStartupWorkflow
 26      {
 27          private readonly ILog log;
 28          private readonly WorkflowNumberSource numberSource;
 29          private readonly K8sCluster cluster;
 30          private readonly string k8sNamespace;
 31          private readonly RecipeComponentFactory componentFactory = new RecipeComponentFactory();
 32          private readonly LocationProvider locationProvider;
 33  
 34          internal StartupWorkflow(ILog log, WorkflowNumberSource numberSource, K8sCluster cluster, string k8sNamespace)
 35          {
 36              this.log = log;
 37              this.numberSource = numberSource;
 38              this.cluster = cluster;
 39              this.k8sNamespace = k8sNamespace;
 40  
 41              locationProvider = new LocationProvider(log, K8s);
 42          }
 43  
 44          public IKnownLocations GetAvailableLocations()
 45          {
 46              return locationProvider.GetAvailableLocations();
 47          }
 48  
 49          public FutureContainers Start(int numberOfContainers, ContainerRecipeFactory recipeFactory, StartupConfig startupConfig)
 50          {
 51              return Start(numberOfContainers, KnownLocations.UnspecifiedLocation, recipeFactory, startupConfig);
 52          }
 53  
 54          public FutureContainers Start(int numberOfContainers, ILocation location, ContainerRecipeFactory recipeFactory, StartupConfig startupConfig)
 55          {
 56              return K8s(controller =>
 57              {
 58                  componentFactory.Update(controller);
 59  
 60                  var recipes = CreateRecipes(numberOfContainers, recipeFactory, startupConfig);
 61                  var startResult = controller.BringOnline(recipes, location);
 62                  var containers = CreateContainers(startResult, recipes, startupConfig);
 63  
 64                  var rc = new RunningPod(Guid.NewGuid().ToString(), startupConfig, startResult, containers);
 65                  cluster.Configuration.Hooks.OnContainersStarted(rc);
 66  
 67                  if (startResult.ExternalService != null)
 68                  {
 69                      componentFactory.Update(controller);
 70                  }
 71                  return new FutureContainers(rc, this);
 72              });
 73          }
 74  
 75          public PodInfo WaitUntilOnline(RunningPod rc)
 76          {
 77              K8s(controller =>
 78              {
 79                  foreach (var c in rc.Containers)
 80                  {
 81                      controller.WaitUntilOnline(c);
 82                  }
 83              });
 84  
 85              return GetPodInfo(rc.StartResult.Deployment);
 86          }
 87  
 88          public PodInfo GetPodInfo(RunningDeployment deployment)
 89          {
 90              return K8s(c => c.GetPodInfo(deployment));
 91          }
 92  
 93          public PodInfo GetPodInfo(RunningContainer container)
 94          {
 95              return GetPodInfo(container.RunningPod.StartResult.Deployment);
 96          }
 97  
 98          public PodInfo GetPodInfo(RunningPod pod)
 99          {
100              return K8s(c => c.GetPodInfo(pod.StartResult.Deployment));
101          }
102  
103          public ContainerCrashWatcher CreateCrashWatcher(RunningContainer container)
104          {
105              return K8s(c => c.CreateCrashWatcher(container));
106          }
107  
108          public void Stop(RunningPod runningPod, bool waitTillStopped)
109          {
110              if (runningPod.IsStopped) return;
111              foreach (var c in runningPod.Containers)
112              {
113                  c.StopLog = DownloadContainerLog(c);
114              }
115              runningPod.IsStopped = true;
116  
117              K8s(controller =>
118              {
119                  controller.Stop(runningPod.StartResult, waitTillStopped);
120              });
121  
122              cluster.Configuration.Hooks.OnContainersStopped(runningPod);
123          }
124  
125          public void DownloadContainerLog(RunningContainer container, ILogHandler logHandler, int? tailLines = null, bool? previous = null)
126          {
127              K8s(controller =>
128              {
129                  controller.DownloadPodLog(container, logHandler, tailLines, previous);
130              });
131          }
132  
133          public IDownloadedLog DownloadContainerLog(RunningContainer container, int? tailLines = null, bool? previous = null)
134          {
135              var msg = $"Downloading container log for '{container.Name}'";
136              log.Log(msg);
137              var logHandler = new WriteToFileLogHandler(log, msg, container.Name);
138  
139              K8s(controller =>
140              {
141                  controller.DownloadPodLog(container, logHandler, tailLines, previous);
142              });
143  
144              return new DownloadedLog(logHandler.LogFile, container.Name);
145          }
146  
147          public string ExecuteCommand(RunningContainer container, string command, params string[] args)
148          {
149              return K8s(controller =>
150              {
151                  return controller.ExecuteCommand(container, command, args);
152              });
153          }
154  
155          public void DeleteNamespace(bool wait)
156          {
157              K8s(controller =>
158              {
159                  controller.DeleteNamespace(wait);
160              });
161          }
162  
163          public void DeleteNamespacesStartingWith(string namespacePrefix, bool wait)
164          {
165              K8s(controller =>
166              {
167                  controller.DeleteAllNamespacesStartingWith(namespacePrefix, wait);
168              });
169          }
170  
171          private RunningContainer[] CreateContainers(StartResult startResult, ContainerRecipe[] recipes, StartupConfig startupConfig)
172          {
173              log.Debug();
174              return recipes.Select(r =>
175              {
176                  var name = GetContainerName(r, startupConfig);
177                  var addresses = CreateContainerAddresses(startResult, r);
178                  log.Debug($"{r}={name} -> container addresses: {string.Join(Environment.NewLine, addresses.Select(a => a.ToString()))}");
179  
180                  return new RunningContainer(Guid.NewGuid().ToString(), name, r, addresses);
181  
182              }).ToArray();
183          }
184  
185          private string GetContainerName(ContainerRecipe recipe, StartupConfig startupConfig)
186          {
187              if (startupConfig == null) return "";
188              if (!string.IsNullOrEmpty(startupConfig.NameOverride))
189              {
190                  return $"<{startupConfig.NameOverride}{recipe.Number}>";
191              }
192              else
193              {
194                  return $"<{recipe.Name}>";
195              }
196          }
197  
198          private ContainerAddress[] CreateContainerAddresses(StartResult startResult, ContainerRecipe recipe)
199          {
200              var result = new List<ContainerAddress>();
201              foreach (var exposedPort in recipe.ExposedPorts)
202              {
203                  result.Add(new ContainerAddress(exposedPort.Tag, GetContainerExternalAddress(startResult, recipe, exposedPort.Tag), false));
204                  result.Add(new ContainerAddress(exposedPort.Tag, GetContainerInternalAddress(startResult, recipe, exposedPort.Tag), true));
205              }
206              foreach (var internalPort in recipe.InternalPorts)
207              {
208                  result.Add(new ContainerAddress(internalPort.Tag, GetContainerInternalAddress(startResult, recipe, internalPort.Tag), true));
209              }
210  
211              return result.ToArray();
212          }
213  
214          private static Address GetContainerExternalAddress(StartResult startResult, ContainerRecipe recipe, string tag)
215          {
216              var port = startResult.GetExternalServicePorts(recipe, tag);
217  
218              return new Address(
219                  logName: $"{recipe.Name}:{tag}",
220                  startResult.Cluster.HostAddress,
221                  port.Number);
222          }
223  
224          private Address GetContainerInternalAddress(StartResult startResult, ContainerRecipe recipe, string tag)
225          {
226              var namespaceName = startResult.Cluster.Configuration.KubernetesNamespace;
227              var serviceName = startResult.InternalService!.Name;
228              var port = startResult.GetInternalServicePorts(recipe, tag);
229  
230              return new Address(
231                  logName: $"{serviceName}:{tag}",
232                  $"http://{serviceName}.{namespaceName}.svc.cluster.local",
233                  port.Number);
234          }
235          
236          private ContainerRecipe[] CreateRecipes(int numberOfContainers, ContainerRecipeFactory recipeFactory, StartupConfig startupConfig)
237          {
238              log.Debug();
239              var result = new List<ContainerRecipe>();
240              for (var i = 0; i < numberOfContainers; i++)
241              {
242                  var recipe = recipeFactory.CreateRecipe(i, numberSource.GetContainerNumber(), componentFactory, startupConfig);
243                  CheckPorts(recipe);
244  
245                  if (cluster.Configuration.AddAppPodLabel) recipe.PodLabels.Add("app", recipeFactory.AppName);
246                  cluster.Configuration.Hooks.OnContainerRecipeCreated(recipe);
247                  result.Add(recipe);
248              }
249  
250              return result.ToArray();
251          }
252  
253          private void CheckPorts(ContainerRecipe recipe)
254          {
255              var allTags =
256                  recipe.ExposedPorts.Concat(recipe.InternalPorts)
257                  .Select(p => K8sNameUtils.Format(p.Tag)).ToArray();
258  
259              if (allTags.Length != allTags.Distinct().Count())
260              {
261                  throw new Exception("Duplicate port tags found in recipe for " + recipe.Name);
262              }
263          }
264  
265          private void K8s(Action<K8sController> action)
266          {
267              try
268              {
269                  var controller = new K8sController(log, cluster, numberSource, k8sNamespace);
270                  action(controller);
271                  controller.Dispose();
272              }
273              catch (k8s.Autorest.HttpOperationException ex)
274              {
275                  log.Error(JsonConvert.SerializeObject(ex.Response));
276                  throw;
277              }
278          }
279  
280          private T K8s<T>(Func<K8sController, T> action)
281          {
282              try
283              {
284                  var controller = new K8sController(log, cluster, numberSource, k8sNamespace);
285                  var result = action(controller);
286                  controller.Dispose();
287                  return result;
288              }
289              catch (k8s.Autorest.HttpOperationException ex)
290              {
291                  log.Error(JsonConvert.SerializeObject(ex.Response));
292                  throw;
293              }
294          }
295      }
296  }