/ deployment.py
deployment.py
  1  # Python Imports
  2  import argparse
  3  import asyncio
  4  import logging
  5  import random
  6  from datetime import datetime
  7  from pathlib import Path
  8  from typing import Optional
  9  
 10  from kubernetes import config
 11  from kubernetes.client import ApiClient
 12  from ruamel import yaml
 13  
 14  # Project Imports
 15  from src.deployments.core.kube_utils import init_logger, set_config_file
 16  from src.deployments.registry import registry as experiment_registry
 17  
# Module-level logger named after this module; handlers and level are configured in main().
logger = logging.getLogger(__name__)
 19  
 20  
 21  async def run_experiment(
 22      name: str,
 23      args: argparse.Namespace,
 24      values_path: Optional[str],
 25      kube_config: Path,
 26  ):
 27      logger.debug(f"params: {args}")
 28      config.load_kube_config(config_file=kube_config)
 29      set_config_file(kube_config)
 30      api_client = ApiClient()
 31  
 32      try:
 33          with open(values_path, "r") as values:
 34              values_yaml = yaml.safe_load(values.read())  # todo: change to get_YAML().load()...
 35      except TypeError:
 36          # values_path is None.
 37          values_yaml = None
 38  
 39      info = experiment_registry[name]
 40      experiment = info.cls()
 41      logger.info(f"Running experiment. name `{info.name}` file: `{info.metadata['module_path']}`")
 42      await experiment.run(api_client, args, values_yaml)
 43  
 44  
 45  def setup_output_folder(args: argparse.Namespace) -> Path:
 46      base_out_dir = Path(__file__).parent / "out"
 47      if args.out_folder is not None:
 48          out_dir = (
 49              args.out_folder if args.out_folder.is_absolute() else base_out_dir / args.out_folder
 50          )
 51      else:
 52          # Adding a random number helps distinguish experiments.
 53          random_number = random.randint(1000, 9999)
 54          datetime_str = datetime.now().strftime("%Y.%m.%d_%H.%M.%f")[:-3]
 55          out_dir = base_out_dir / f"{datetime_str}_{random_number}"
 56  
 57      out_dir.mkdir(parents=True, exist_ok=False)
 58      return out_dir
 59  
 60  
 61  async def main():
 62      parser = argparse.ArgumentParser(
 63          description="A tool to run experiments. Generates deployment yaml with helm and deploys using the given kubeconfig. Dependencies: helm must be installed and in $PATH."
 64      )
 65  
 66      subparsers = parser.add_subparsers(dest="experiment", required=True)
 67  
 68      parser.add_argument("--values", default=None, help="Path to values.yaml", dest="values_path")
 69      parser.add_argument(
 70          "--config",
 71          required=False,
 72          help="Config passed to --kubeconfig in kubernetes commands.",
 73          default="~/.kube/config",
 74          dest="kube_config",
 75      )
 76      parser.add_argument(
 77          "-v",
 78          "--verbose",
 79          action="count",
 80          dest="verbosity",
 81          default=0,
 82          help="Set the log level: -v (warnings), -vv (info), -vvv (debug) -vvvv (most verbose)",
 83      )
 84      parser.add_argument(
 85          "-l",
 86          "--out-folder",
 87          type=Path,
 88          dest="out_folder",
 89          default=None,
 90          required=False,
 91          help="Output folder to contain all experiment output. "
 92          "Path is relative to `__file__/out/` unless given as absolute path. "
 93          "Default path uses `__file__/out/{{datetime}}`.",
 94      )
 95  
 96      # Scan for experiments.
 97      experiment_registry.scan(Path(__file__) / ".." / "src" / "deployments", mode="skip")
 98  
 99      # Add subparsers for all experiments.
100      for info in experiment_registry.items():
101          try:
102              info.cls.add_parser(subparsers)
103          except AttributeError as e:
104              raise AttributeError(f"{info}") from e
105  
106      args = parser.parse_args()
107      out_folder = setup_output_folder(args)
108      args.output_folder = out_folder
109      verbosity = args.verbosity or 2
110      init_logger(logging.getLogger(), verbosity, out_folder / "out.log")
111      try:
112          await run_experiment(
113              name=args.experiment,
114              args=args,
115              values_path=args.values_path,
116              kube_config=args.kube_config,
117          )
118      except KeyboardInterrupt:
119          pass
120  
121  
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # On Python 3.11+ asyncio.run() handles Ctrl+C by cancelling the main task and then
        # raising KeyboardInterrupt from asyncio.run() itself, so it has to be caught here
        # (not inside the coroutine) to exit without a traceback.
        logger.info("Interrupted by user.")