/ subplot / subplot.py
subplot.py
  1  import logging
  2  import os
  3  import yaml
  4  
  5  
  6  def fixme(ctx, **kwargs):
  7      assert 0
  8  
  9  
 10  def create_vm(ctx):
 11      QemuSystem = globals()["QemuSystem"]
 12      srcdir = globals()["srcdir"]
 13  
 14      MiB = 1024 ** 2
 15      GiB = 1024 ** 3
 16  
 17      cfg = yaml.safe_load(open(os.path.join(srcdir, "test.cfg")))
 18      name = cfg["name"]
 19      base_image = cfg["base_image"]
 20      username = cfg["username"]
 21      cpus = cfg["cpus"]
 22      memory = cfg["memory"] * MiB
 23  
 24      # We use a hard-coded test key that we have in the source tree.
 25      pubkey = open(os.path.join(srcdir, "ssh", "id.pub")).read().strip()
 26  
 27      # We hard code the disk image size, since we don't expect scenarios to have
 28      # any specific size needs, and also, the qcow2 image format is only as big
 29      # as the data put into the disk, so we can choose a size that fits all.
 30      disk_size = 10 * GiB
 31  
 32      logging.info("starting a VM using qemu-system")
 33      logging.info(f"  name    : {name}")
 34      logging.info(f"  image   : {base_image}")
 35      logging.info(f"  disk    : {disk_size}")
 36      logging.info(f"  pubkey  : {pubkey}")
 37      logging.info(f"  memory  : {memory}")
 38      logging.info(f"  cpus    : {cpus}")
 39      logging.info(f"  username: {username}")
 40  
 41      qemu = QemuSystem(name, base_image, disk_size, pubkey)
 42      qemu.set_memory(memory)
 43      qemu.set_vcpus(cpus)
 44      qemu.set_username(username)
 45      qemu.start()
 46  
 47      logging.debug("waiting for SSH to be ready")
 48      if qemu.wait_for_ssh():
 49          logging.debug("SSH is ready")
 50      else:
 51          logging.error("SSH did not get ready")
 52          assert 0
 53      logging.info("a qemu-system VM is up and running and accessible over SSH")
 54      ctx["qemu"] = qemu
 55  
 56  
 57  def destroy_vm(ctx):
 58      logging.debug(f"destroying qemu running")
 59      qemu = ctx["qemu"]
 60      qemu.stop()
 61  
 62  
 63  def run_true_on_host(ctx):
 64      qemu = ctx["qemu"]
 65      qemu.ssh(["/bin/true"])
 66  
 67  
 68  def use_role_in_playbook(ctx, role=None):
 69      empty_playbook = {
 70          "hosts": "test-host",
 71          "remote_user": "debian",  # FIXME: don't hardcode this
 72          "become": True,
 73          "roles": [],
 74      }
 75      playbook = ctx.get("playbook", dict(empty_playbook))
 76      playbook["roles"].append(role)
 77      ctx["playbook"] = playbook
 78  
 79  
 80  def set_vars_file(ctx, filename=None):
 81      get_file = globals()["get_file"]
 82      data = get_file(filename)
 83      with open("vars.yml", "wb") as f:
 84          f.write(data)
 85  
 86  
 87  def try_playbook(ctx):
 88      runcmd_run = globals()["runcmd_run"]
 89      assert_ne = globals()["assert_ne"]
 90      srcdir = globals()["srcdir"]
 91  
 92      qemu = ctx["qemu"]
 93  
 94      with open("hosts", "w") as f:
 95          f.write("test-host\n")
 96  
 97      if not os.path.exists("vars.yml"):
 98          with open("vars.yml", "w") as f:
 99              yaml.safe_dump({}, stream=f)
100  
101      playbook = [ctx["playbook"]]
102      assert_ne(playbook, None)
103      with open("playbook.yml", "w") as f:
104          yaml.safe_dump(playbook, stream=f)
105  
106      ssh_opts = [
107          "-ouserknownhostsfile=/dev/null",
108          "-ostricthostkeychecking=accept-new",
109          "-i",
110          os.path.join(srcdir, "ssh", "id"),
111          f"-p{qemu.get_port()}",
112      ]
113  
114      env = dict(os.environ)
115      env["ANSIBLE_SSH_ARGS"] = " ".join(ssh_opts)
116      env["ANSIBLE_LOG"] = "ansible.log"
117      env["ANSIBLE_ROLES_PATH"] = os.path.join(srcdir, "roles")
118  
119      argv = [
120          "ansible-playbook",
121          "-i",
122          "hosts",
123          f"-e@vars.yml",
124          "-eansible_ssh_host=localhost",
125          f"-eansible_ssh_port={qemu.get_port()}",
126          "playbook.yml",
127      ]
128  
129      runcmd_run(ctx, argv, env=env)
130  
131  
132  def command_fails(ctx):
133      runcmd_exit_code_is_nonzero = globals()["runcmd_exit_code_is_nonzero"]
134      runcmd_exit_code_is_nonzero(ctx)
135  
136  
137  def run_playbook(ctx):
138      runcmd_exit_code_is_zero = globals()["runcmd_exit_code_is_zero"]
139      try_playbook(ctx)
140      runcmd_exit_code_is_zero(ctx)
141  
142  
143  def xstdout_contains(ctx, text=None):
144      runcmd_stdout_contains = globals()["runcmd_stdout_contains"]
145      runcmd_stdout_contains(ctx, text=text)