StartupWorkflow.cs
using KubernetesWorkflow.Recipe;
using KubernetesWorkflow.Types;
using Logging;
using Newtonsoft.Json;
using Utils;

namespace KubernetesWorkflow
{
    /// <summary>
    /// Operations for starting, inspecting, and stopping containers, and for cleaning up
    /// the namespaces they run in.
    /// </summary>
    public interface IStartupWorkflow
    {
        IKnownLocations GetAvailableLocations();
        FutureContainers Start(int numberOfContainers, ContainerRecipeFactory recipeFactory, StartupConfig startupConfig);
        FutureContainers Start(int numberOfContainers, ILocation location, ContainerRecipeFactory recipeFactory, StartupConfig startupConfig);
        PodInfo GetPodInfo(RunningContainer container);
        PodInfo GetPodInfo(RunningPod pod);
        ContainerCrashWatcher CreateCrashWatcher(RunningContainer container);
        void Stop(RunningPod pod, bool waitTillStopped);
        void DownloadContainerLog(RunningContainer container, ILogHandler logHandler, int? tailLines = null, bool? previous = null);
        IDownloadedLog DownloadContainerLog(RunningContainer container, int? tailLines = null, bool? previous = null);
        string ExecuteCommand(RunningContainer container, string command, params string[] args);
        void DeleteNamespace(bool wait);
        void DeleteNamespacesStartingWith(string namespacePrefix, bool wait);
    }

    /// <summary>
    /// Default <see cref="IStartupWorkflow"/> implementation. Every cluster operation is
    /// performed through a <see cref="K8sController"/> that is created for that single call
    /// and disposed afterwards (see the private <c>K8s</c> helpers).
    /// </summary>
    public class StartupWorkflow : IStartupWorkflow
    {
        private readonly ILog log;
        private readonly WorkflowNumberSource numberSource;
        private readonly K8sCluster cluster;
        private readonly string k8sNamespace;
        private readonly RecipeComponentFactory componentFactory = new RecipeComponentFactory();
        private readonly LocationProvider locationProvider;

        internal StartupWorkflow(ILog log, WorkflowNumberSource numberSource, K8sCluster cluster, string k8sNamespace)
        {
            this.log = log;
            this.numberSource = numberSource;
            this.cluster = cluster;
            this.k8sNamespace = k8sNamespace;

            // The K8s helper is handed over as a method group; which overload binds depends on
            // the LocationProvider constructor signature, which is not visible in this file.
            locationProvider = new LocationProvider(log, K8s);
        }

        /// <summary>Returns the locations reported by the location provider.</summary>
        public IKnownLocations GetAvailableLocations()
        {
            return locationProvider.GetAvailableLocations();
        }

        /// <summary>
        /// Starts containers without a location preference by delegating to the location-aware
        /// overload with <c>KnownLocations.UnspecifiedLocation</c>.
        /// </summary>
        public FutureContainers Start(int numberOfContainers, ContainerRecipeFactory recipeFactory, StartupConfig startupConfig)
        {
            return Start(numberOfContainers, KnownLocations.UnspecifiedLocation, recipeFactory, startupConfig);
        }

        /// <summary>
        /// Creates <paramref name="numberOfContainers"/> recipes, brings them online at
        /// <paramref name="location"/>, and wraps the result in a <see cref="RunningPod"/>.
        /// The <c>OnContainersStarted</c> hook is invoked before returning.
        /// </summary>
        /// <returns>A <see cref="FutureContainers"/> bound to the new pod and this workflow.</returns>
        public FutureContainers Start(int numberOfContainers, ILocation location, ContainerRecipeFactory recipeFactory, StartupConfig startupConfig)
        {
            return K8s(controller =>
            {
                componentFactory.Update(controller);

                var recipes = CreateRecipes(numberOfContainers, recipeFactory, startupConfig);
                var startResult = controller.BringOnline(recipes, location);
                var containers = CreateContainers(startResult, recipes, startupConfig);

                var rc = new RunningPod(Guid.NewGuid().ToString(), startupConfig, startResult, containers);
                cluster.Configuration.Hooks.OnContainersStarted(rc);

                if (startResult.ExternalService != null)
                {
                    // NOTE(review): second Update when an external service exists; presumably to
                    // refresh factory state changed by BringOnline - confirm.
                    componentFactory.Update(controller);
                }
                return new FutureContainers(rc, this);
            });
        }

        /// <summary>
        /// Waits for every container of <paramref name="rc"/> to be online, then returns the
        /// pod info of its deployment.
        /// </summary>
        public PodInfo WaitUntilOnline(RunningPod rc)
        {
            K8s(controller =>
            {
                foreach (var c in rc.Containers)
                {
                    controller.WaitUntilOnline(c);
                }
            });

            return GetPodInfo(rc.StartResult.Deployment);
        }

        /// <summary>Fetches pod info for the given deployment.</summary>
        public PodInfo GetPodInfo(RunningDeployment deployment)
        {
            return K8s(c => c.GetPodInfo(deployment));
        }

        /// <summary>Fetches pod info for the deployment that owns <paramref name="container"/>.</summary>
        public PodInfo GetPodInfo(RunningContainer container)
        {
            return GetPodInfo(container.RunningPod.StartResult.Deployment);
        }

        /// <summary>Fetches pod info for the deployment of <paramref name="pod"/>.</summary>
        public PodInfo GetPodInfo(RunningPod pod)
        {
            return K8s(c => c.GetPodInfo(pod.StartResult.Deployment));
        }

        /// <summary>Creates a crash watcher for <paramref name="container"/> via the controller.</summary>
        public ContainerCrashWatcher CreateCrashWatcher(RunningContainer container)
        {
            return K8s(c => c.CreateCrashWatcher(container));
        }

        /// <summary>
        /// Stops <paramref name="runningPod"/>. Does nothing if the pod is already marked stopped.
        /// Each container's log is downloaded into its <c>StopLog</c> before the stop is issued,
        /// and the <c>OnContainersStopped</c> hook is invoked afterwards.
        /// </summary>
        public void Stop(RunningPod runningPod, bool waitTillStopped)
        {
            if (runningPod.IsStopped) return;

            // Logs are captured first, while the containers still exist.
            foreach (var c in runningPod.Containers)
            {
                c.StopLog = DownloadContainerLog(c);
            }
            runningPod.IsStopped = true;

            K8s(controller =>
            {
                controller.Stop(runningPod.StartResult, waitTillStopped);
            });

            cluster.Configuration.Hooks.OnContainersStopped(runningPod);
        }

        /// <summary>Streams the log of <paramref name="container"/> into <paramref name="logHandler"/>.</summary>
        public void DownloadContainerLog(RunningContainer container, ILogHandler logHandler, int? tailLines = null, bool? previous = null)
        {
            K8s(controller =>
            {
                controller.DownloadPodLog(container, logHandler, tailLines, previous);
            });
        }

        /// <summary>
        /// Downloads the log of <paramref name="container"/> through a
        /// <see cref="WriteToFileLogHandler"/> and returns it as a <see cref="DownloadedLog"/>.
        /// </summary>
        public IDownloadedLog DownloadContainerLog(RunningContainer container, int? tailLines = null, bool? previous = null)
        {
            var msg = $"Downloading container log for '{container.Name}'";
            log.Log(msg);
            var logHandler = new WriteToFileLogHandler(log, msg, container.Name);

            K8s(controller =>
            {
                controller.DownloadPodLog(container, logHandler, tailLines, previous);
            });

            return new DownloadedLog(logHandler.LogFile, container.Name);
        }

        /// <summary>Runs <paramref name="command"/> with <paramref name="args"/> in the container and returns the controller's result.</summary>
        public string ExecuteCommand(RunningContainer container, string command, params string[] args)
        {
            return K8s(controller =>
            {
                return controller.ExecuteCommand(container, command, args);
            });
        }

        /// <summary>Asks the controller to delete its namespace.</summary>
        public void DeleteNamespace(bool wait)
        {
            K8s(controller =>
            {
                controller.DeleteNamespace(wait);
            });
        }

        /// <summary>Asks the controller to delete all namespaces whose name starts with <paramref name="namespacePrefix"/>.</summary>
        public void DeleteNamespacesStartingWith(string namespacePrefix, bool wait)
        {
            K8s(controller =>
            {
                controller.DeleteAllNamespacesStartingWith(namespacePrefix, wait);
            });
        }

        /// <summary>Builds one <see cref="RunningContainer"/> (name + addresses) per recipe.</summary>
        private RunningContainer[] CreateContainers(StartResult startResult, ContainerRecipe[] recipes, StartupConfig startupConfig)
        {
            log.Debug();
            return recipes.Select(r =>
            {
                var name = GetContainerName(r, startupConfig);
                var addresses = CreateContainerAddresses(startResult, r);
                log.Debug($"{r}={name} -> container addresses: {string.Join(Environment.NewLine, addresses.Select(a => a.ToString()))}");

                return new RunningContainer(Guid.NewGuid().ToString(), name, r, addresses);

            }).ToArray();
        }

        /// <summary>
        /// Returns the display name for a container: empty when there is no startup config,
        /// <c>&lt;NameOverride + recipe number&gt;</c> when an override is set, otherwise
        /// <c>&lt;recipe name&gt;</c>.
        /// </summary>
        private string GetContainerName(ContainerRecipe recipe, StartupConfig startupConfig)
        {
            if (startupConfig == null) return "";
            if (!string.IsNullOrEmpty(startupConfig.NameOverride))
            {
                return $"<{startupConfig.NameOverride}{recipe.Number}>";
            }
            else
            {
                return $"<{recipe.Name}>";
            }
        }

        /// <summary>
        /// Collects the addresses of a recipe: exposed ports get both an external and an
        /// internal address; internal ports get an internal address only.
        /// </summary>
        private ContainerAddress[] CreateContainerAddresses(StartResult startResult, ContainerRecipe recipe)
        {
            var result = new List<ContainerAddress>();
            foreach (var exposedPort in recipe.ExposedPorts)
            {
                result.Add(new ContainerAddress(exposedPort.Tag, GetContainerExternalAddress(startResult, recipe, exposedPort.Tag), false));
                result.Add(new ContainerAddress(exposedPort.Tag, GetContainerInternalAddress(startResult, recipe, exposedPort.Tag), true));
            }
            foreach (var internalPort in recipe.InternalPorts)
            {
                result.Add(new ContainerAddress(internalPort.Tag, GetContainerInternalAddress(startResult, recipe, internalPort.Tag), true));
            }

            return result.ToArray();
        }

        /// <summary>Builds the address made of the cluster host address and the external service port for <paramref name="tag"/>.</summary>
        private static Address GetContainerExternalAddress(StartResult startResult, ContainerRecipe recipe, string tag)
        {
            var port = startResult.GetExternalServicePorts(recipe, tag);

            return new Address(
                logName: $"{recipe.Name}:{tag}",
                startResult.Cluster.HostAddress,
                port.Number);
        }

        /// <summary>
        /// Builds the <c>http://{service}.{namespace}.svc.cluster.local</c> address with the
        /// internal service port for <paramref name="tag"/>.
        /// </summary>
        private Address GetContainerInternalAddress(StartResult startResult, ContainerRecipe recipe, string tag)
        {
            var namespaceName = startResult.Cluster.Configuration.KubernetesNamespace;
            // NOTE(review): the null-forgiving operator assumes InternalService is always set
            // when this is called; a null value would throw here - confirm against BringOnline.
            var serviceName = startResult.InternalService!.Name;
            var port = startResult.GetInternalServicePorts(recipe, tag);

            return new Address(
                logName: $"{serviceName}:{tag}",
                $"http://{serviceName}.{namespaceName}.svc.cluster.local",
                port.Number);
        }

        /// <summary>
        /// Creates <paramref name="numberOfContainers"/> recipes from the factory, validates
        /// their port tags, optionally adds the "app" pod label, and fires the
        /// <c>OnContainerRecipeCreated</c> hook for each.
        /// </summary>
        private ContainerRecipe[] CreateRecipes(int numberOfContainers, ContainerRecipeFactory recipeFactory, StartupConfig startupConfig)
        {
            log.Debug();
            var result = new List<ContainerRecipe>();
            for (var i = 0; i < numberOfContainers; i++)
            {
                var recipe = recipeFactory.CreateRecipe(i, numberSource.GetContainerNumber(), componentFactory, startupConfig);
                CheckPorts(recipe);

                if (cluster.Configuration.AddAppPodLabel) recipe.PodLabels.Add("app", recipeFactory.AppName);
                cluster.Configuration.Hooks.OnContainerRecipeCreated(recipe);
                result.Add(recipe);
            }

            return result.ToArray();
        }

        /// <summary>
        /// Throws when two ports of the recipe (exposed or internal) share the same tag after
        /// <c>K8sNameUtils.Format</c> has been applied.
        /// </summary>
        /// <exception cref="Exception">Duplicate formatted port tags were found.</exception>
        private void CheckPorts(ContainerRecipe recipe)
        {
            var allTags =
                recipe.ExposedPorts.Concat(recipe.InternalPorts)
                .Select(p => K8sNameUtils.Format(p.Tag)).ToArray();

            if (allTags.Length != allTags.Distinct().Count())
            {
                throw new Exception("Duplicate port tags found in recipe for " + recipe.Name);
            }
        }

        /// <summary>
        /// Runs <paramref name="action"/> against a freshly created controller. The controller
        /// is disposed whether or not the action throws. An <c>HttpOperationException</c> has
        /// its response logged as JSON and is then rethrown.
        /// </summary>
        private void K8s(Action<K8sController> action)
        {
            try
            {
                var controller = new K8sController(log, cluster, numberSource, k8sNamespace);
                try
                {
                    action(controller);
                }
                finally
                {
                    // Dispose on the failure path too, so a throwing action does not leak the controller.
                    controller.Dispose();
                }
            }
            catch (k8s.Autorest.HttpOperationException ex)
            {
                log.Error(JsonConvert.SerializeObject(ex.Response));
                throw;
            }
        }

        /// <summary>
        /// Runs <paramref name="action"/> against a freshly created controller and returns its
        /// result. The controller is disposed whether or not the action throws. An
        /// <c>HttpOperationException</c> has its response logged as JSON and is then rethrown.
        /// </summary>
        private T K8s<T>(Func<K8sController, T> action)
        {
            try
            {
                var controller = new K8sController(log, cluster, numberSource, k8sNamespace);
                try
                {
                    return action(controller);
                }
                finally
                {
                    // Dispose on the failure path too, so a throwing action does not leak the controller.
                    controller.Dispose();
                }
            }
            catch (k8s.Autorest.HttpOperationException ex)
            {
                log.Error(JsonConvert.SerializeObject(ex.Response));
                throw;
            }
        }
    }
}