/ src / docker_static_cluster / __init__.py
__init__.py
  1  #!/usr/bin/env python3
  2  
  3  # SPDX-FileCopyrightText: 2025 2025
  4  # SPDX-FileContributor: Nathan Fritzler
  5  #
  6  # SPDX-License-Identifier: MIT
  7  
  8  import os
  9  from typing import Dict, List, TextIO, Optional
 10  import json
 11  import subprocess
 12  import shlex
 13  import sys
 14  import traceback
 15  
 16  import click
 17  import yaml
 18  import docker
 19  import docker.errors
 20  
 21  from .schemas import (
 22      ConfigNode,
 23      ConfigNodeManagerStatus,
 24      ConfigNodeRMSpec,
 25      ConfigNodeSpec,
 26      ConfigNodeStatus,
 27      ConfigNodes,
 28      ConfigPlugins,
 29      ConfigServices,
 30      ConfigStack,
 31      ConfigSwarm,
 32      injest_config,
 33      Config,
 34  )
 35  from .cantgetno import satisfy_config
 36  
# Module-wide debug switch read by handle_ecxeption (the sys.excepthook set at
# the bottom of this module): when true, uncaught exceptions are reported with
# the full traceback and, for Docker API errors, a JSON dump of the error's
# attributes before the short message.
debug = True
 38  
 39  # TODO: https://click.palletsprojects.com/en/stable/shell-completion/
 40  # TODO: automatic swarm state backup
 41  
 42  
 43  def run_cmd(args: List[str], check=True, **kwargs) -> subprocess.CompletedProcess:
 44      click.echo(f"\n$ {shlex.join(args)}\n")
 45      try:
 46          return subprocess.run(args, check=check, **kwargs)
 47      except subprocess.CalledProcessError as e:
 48          sys.exit(e.returncode)
 49  
 50  
 51  @click.group()
 52  @click.version_option()
 53  def main():
 54      pass
 55  
 56  
 57  _infile_option = click.option(
 58      "-f",
 59      "--file",
 60      "--infile",
 61      "infile",
 62      # prompt=True,
 63      default="docker_static_cluster.toml",
 64      type=click.File("rb"),
 65  )
 66  
 67  _composefile_option = click.option(
 68      "-c",
 69      "--compose-file",
 70      "compose_file",
 71      # prompt=True,
 72      default=lambda: os.environ.get("COMPOSE_FILE", "compose.yaml").split(
 73          os.environ.get("COMPOSE_PATH_SEPARATOR", ":")
 74      )[0],
 75      type=click.File("w"),
 76  )
 77  
 78  _as_remote_node_option = click.option(
 79      "-r",
 80      "--as-remote-node",
 81      "as_remote_node",
 82      type=str,
 83  )
 84  
 85  
 86  @main.command()
 87  @click.argument("stack_name", type=str)
 88  @_infile_option
 89  @_composefile_option
 90  def generate_compose(
 91      stack_name: str, infile: TextIO, compose_file: TextIO
 92  ) -> tuple[Config, ConfigNodes, ConfigSwarm, ConfigPlugins, ConfigStack]:
 93      """Generate a compose file for use with `docker stack`"""
 94      config = injest_config(infile)
 95      config, nodes, swarm, plugins, stack = satisfy_config(config, stack_name)
 96      stack_d = stack.model_dump()
 97      yaml.dump(stack_d, compose_file)
 98      return config, nodes, swarm, plugins, stack
 99  
100  
@main.command()
@click.argument("output", type=click.File("w"))
def generate_compose_schema(output: TextIO):
    """Generate the schema file for the config"""
    # Despite the command name, this writes the JSON Schema of the `Config`
    # model (the input config file), not of the generated compose file.
    schema_json = json.dumps(Config.model_json_schema())
    output.write(schema_json)
106  
107  
def _docker_client_for_node(
    nodes_settings: ConfigNodes, node_name: str
) -> docker.DockerClient:
    """Return a Docker client built from *node_name*'s ``remote_docker_conf``.

    Fields of ``remote_docker_conf`` that are ``None`` are left out so that
    :class:`docker.DockerClient` keeps its own defaults for them.
    """
    node_settings = nodes_settings[node_name]
    assert node_settings, f"remote node {node_name} could not be found"
    assert node_settings.remote_docker_conf, (
        f"remote node {node_name} does not have a remote_docker_conf"
    )
    remote_docker_conf_d = node_settings.remote_docker_conf.model_dump()
    return docker.DockerClient(
        **{
            key: value
            for key, value in remote_docker_conf_d.items()
            if value is not None
        }
    )


def _apply_plugins(
    d_client: docker.DockerClient, plugins_settings: ConfigPlugins
) -> None:
    """Make the plugins on *d_client*'s daemon match *plugins_settings*.

    Plugins whose ``remove`` is set are removed when present (forcibly when
    ``remove == "force"``) and skipped when absent. All other plugins are
    installed from their ``image`` when missing, then configured.
    """
    for plugin_name, plugin_config in plugins_settings.items():
        try:
            d_plugin = d_client.plugins.get(plugin_name)
        except docker.errors.NotFound:
            if plugin_config.remove:
                # Already absent, nothing to remove. (This used to fall through
                # and *install* a plugin that was marked for removal.)
                continue
            d_plugin = d_client.plugins.install(
                remote_name=plugin_config.image,
                local_name=plugin_name,
            )
        else:
            if plugin_config.remove:
                d_plugin.remove(force=plugin_config.remove == "force")
                continue
        plugin_config_d = plugin_config.model_dump()
        plugin_config_d["name"] = plugin_name
        # Pass the dict built above. The model object itself is not a dict, so
        # the SDK would try to JSON-encode it as-is and fail.
        # NOTE(review): every key here (including "image", "remove" and the
        # "name" added above) is sent to the daemon as a plugin setting --
        # confirm the daemon accepts them, or filter to real plugin settings.
        d_plugin.configure(plugin_config_d)


def _config_infile_kwargs(infile) -> dict:
    """Return ``{"infile": infile}`` rewound to its start, for a sub-command.

    ``ctx.invoke`` fills any parameter that is not passed explicitly with its
    default, so sub-commands would otherwise re-open the *default* config
    path instead of the user's ``--infile``. The handle has presumably been
    consumed by ``injest_config`` already, so it is rewound first. If that
    is impossible (e.g. stdin, or the handle was closed), return ``{}`` so
    the sub-command falls back to the default path, as it did before.
    """
    try:
        infile.seek(0)
    except (OSError, ValueError):  # io.UnsupportedOperation subclasses both
        return {}
    return {"infile": infile}


@main.command()
@_infile_option
@_composefile_option
@_as_remote_node_option
@click.option("--detach", type=bool, default=None)
@click.option("--skip-swarm", is_flag=True)
@click.option("--skip-plugins", is_flag=True)
@click.option("--skip-nodes", is_flag=True)
@click.option("--skip-propagate-config", is_flag=True)
@click.option("--skip-stack-deploy", is_flag=True)
@click.option("--force-service-update", is_flag=True)
@click.argument("stack_name", type=str)
@click.pass_context
def deploy(
    ctx,
    infile: TextIO,
    compose_file: TextIO,
    as_remote_node: Optional[str],
    skip_swarm: bool,
    skip_plugins: bool,
    skip_nodes: bool,
    skip_propagate_config: bool,
    skip_stack_deploy: bool,
    force_service_update: bool,
    stack_name: str,
    detach: Optional[bool],
):
    """Deploy the config file."""
    # Steps, most of them skippable:
    #  1. write the compose file (generate_compose);
    #  2. apply plugins on the target daemon (local, or --as-remote-node);
    #  3. `swarm update`;
    #  4. `node update` for every configured node;
    #  5. apply plugins on every configured node's own daemon;
    #  6. `docker-sdp stack deploy`;
    #  7. optionally `docker service update --force` for every service.

    # Steps 6 and 7 shell out to the local docker CLI and cannot target a
    # remote node. Refuse up front instead of after plugins, swarm and nodes
    # have already been changed.
    if as_remote_node and not skip_stack_deploy:
        # TODO: support ssh
        raise NotImplementedError("Stack commands cannot be run on a remote node")
    if as_remote_node and force_service_update:
        # TODO: support ssh
        raise NotImplementedError(
            "forcing a service update cannot be run on a remote node"
        )

    config, nodes_settings, swarm_settings, plugins_settings, stack_settings = (
        ctx.invoke(
            generate_compose,
            stack_name=stack_name,
            infile=infile,
            compose_file=compose_file,
        )
    )
    assert isinstance(config, Config)
    assert isinstance(nodes_settings, ConfigNodes)
    assert isinstance(swarm_settings, ConfigSwarm)
    assert isinstance(plugins_settings, ConfigPlugins)

    # TODO: something was ignoring unsupported "restart" option

    if as_remote_node:
        d_client = _docker_client_for_node(nodes_settings, as_remote_node)
    else:
        d_client = docker.from_env()

    if not skip_plugins:
        _apply_plugins(d_client, plugins_settings)
        # TODO: prune option
    if not skip_swarm and swarm_settings:
        ctx.invoke(
            swarm_update, stack_name=stack_name, **_config_infile_kwargs(infile)
        )
    if not skip_nodes:
        for node_name in nodes_settings.keys():
            ctx.invoke(
                node_update,
                node=node_name,
                stack_name=stack_name,
                **_config_infile_kwargs(infile),
            )
        # TODO prune
    if (not skip_propagate_config) and (not skip_plugins):
        # Apply the same plugin config on every node's own daemon. This used to
        # recurse into `deploy`, which re-read the *default* config file and
        # rewrote the *default* compose file once per node.
        for node_name in nodes_settings.keys():
            remote_client = _docker_client_for_node(nodes_settings, node_name)
            try:
                _apply_plugins(remote_client, plugins_settings)
            finally:
                remote_client.close()
    if not skip_stack_deploy:
        # TODO prune

        # NOTE: plain `docker stack deploy` doesn't do things the right way;
        #  docker-sdp adds behavior expected from docker stack, alike that of
        #  docker compose.
        # TODO: this is python code, but they don't provide a python API
        cmd = ["docker-sdp", "stack", "deploy", stack_name]
        cmd.extend(["--compose-file", compose_file.name])
        if detach is not None:
            cmd.append(f"--detach={'true' if detach else 'false'}")

        # docker-sdp reads the compose file by *path*; make sure everything
        # written through `compose_file` has reached the file first.
        compose_file.flush()
        run_cmd(cmd)
    if force_service_update:
        services_settings: ConfigServices = stack_settings.services
        for service_name in services_settings.model_dump():
            run_cmd(
                [
                    "docker",
                    "service",
                    "update",
                    "--force",
                    f"{stack_name}_{service_name}",
                ]
            )
241  
242  
243  # TODO: make these into commands
244  #
245  # TIP: if you're looking for a way to force-restart stuff,
246  #
247  #     docker stack services -q $stack | xargs -rt -n1 -- docker service update --force
248  #
249  # from https://stackoverflow.com/a/71724439/6353323
250  #
251  # can recreate everything
252  #
253  #     docker service update --force $service
254  #
255  # from https://stackoverflow.com/a/44110795/6353323
256  #
# forces an update of just that one service, even if its spec is unchanged
258  
259  # TODO: join swarm command
260  
261  
@main.group()
def swarm():
    """
    wrapper for docker swarm

    Not needed:
    - join-token
    - leave
    - unlock
    - unlock-key
    """
    # Command group only: its subcommands ("init", "update") are attached with
    # @swarm.command(...); "join" is currently disabled.
    pass
274  
275  
# Swarm settings copied from the config's swarm section (when present in its
# model_dump()) and passed as keyword arguments to d_client.swarm.init() in
# swarm_init and d_client.swarm.update() in swarm_update.
_swarm_init_keys = (
    "task_history_retention_limit",
    "snapshot_interval",
    "keep_old_snapshots",
    "log_entries_for_slow_followers",
    "heartbeat_tick",
    "dispatcher_heartbeat_period",
    "signing_ca_cert",
    "signing_ca_key",
)
286  
287  
@swarm.command("init")
@_infile_option
@click.option("--force-new-cluster", is_flag=True)
@click.argument("node_name", type=str)
def swarm_init(
    infile: TextIO,
    force_new_cluster: bool,
    node_name: str,
):
    """wrapper for docker swarm init"""
    # Initialise a swarm on the local daemon from the raw config (no
    # satisfy_config step): spec settings come from the swarm section and the
    # addresses from node `node_name`'s entry.
    parsed = injest_config(infile)

    settings = parsed.swarm_settings
    configured_nodes = parsed.swarm_nodes
    assert configured_nodes, "There are no nodes defined!"
    target: ConfigNode = configured_nodes[node_name]

    client = docker.from_env()

    dumped = settings.model_dump()
    init_kwargs = {name: dumped[name] for name in _swarm_init_keys if name in dumped}

    status = target.Status
    if status and status.Addr:
        init_kwargs["advertise_addr"] = status.Addr
    manager_status = target.ManagerStatus
    if manager_status and manager_status.Addr:
        init_kwargs["listen_addr"] = manager_status.Addr
    if target.DataPathAddr:
        init_kwargs["data_path_addr"] = target.DataPathAddr

    init_result = client.swarm.init(force_new_cluster=force_new_cluster, **init_kwargs)
    click.echo(init_result)


# Also expose `swarm init` directly on the top-level group as `init`.
main.add_command(swarm_init)
325  
326  
# BUG: this command wasn't working, so I've disabled it.
# @swarm.command("join")
# NOTE: with the decorator above commented out, the click decorators below only
# attach parameter metadata; swarm_join is a plain function and is not
# registered as a command anywhere.
@click.argument("node", type=str)
@click.argument("stack_name", type=str)
@_infile_option
@click.option("--token", type=str)
def swarm_join(stack_name: str, infile: TextIO, node: str, token):
    """wrapper for docker swarm join"""
    # Join the local Docker daemon to the swarm as config node `node`, using the
    # other configured nodes' addresses as join targets.
    config = injest_config(infile)
    config, nodes, _, _, _ = satisfy_config(config, stack_name)

    d_client = docker.from_env()

    the_node = nodes[node]

    kwargs: Dict[str, object] = {}

    # NOTE(review): this collects Status.Addr of *every* other configured node;
    # there is no manager filter (Role / ManagerStatus), while Docker expects
    # manager addresses here. Possibly why the command misbehaved -- confirm
    # before re-enabling.
    kwargs["remote_addrs"] = [
        man_node.Status.Addr
        for node_name, man_node in nodes.items()
        if isinstance(man_node, ConfigNode)
        and node_name != node
        and man_node.Status
        and man_node.Status.Addr
    ]

    if the_node.Status and the_node.Status.Addr:
        kwargs["advertise_addr"] = the_node.Status.Addr
    if the_node.ManagerStatus and the_node.ManagerStatus.Addr:
        kwargs["listen_addr"] = the_node.ManagerStatus.Addr
    if the_node.DataPathAddr:
        kwargs["data_path_addr"] = the_node.DataPathAddr

    # Print the computed join arguments before joining.
    click.echo(json.dumps(kwargs))

    # Keep the call out of the assert: `python -O` strips assert statements,
    # which would silently skip the join itself.
    joined = d_client.swarm.join(join_token=token, **kwargs)
    assert joined
363  
364  
365  # BUG: I've disabled this command, see above
366  # main.add_command(swarm_join)
367  
368  
@swarm.command("update")
@click.argument("stack_name", type=str)
@_infile_option
@click.option("--rotate-worker-token", is_flag=True)
@click.option("--rotate-manager-token", is_flag=True)
@click.option("--rotate-manager-unlock-key", is_flag=True)
def swarm_update(
    stack_name: str,
    infile: TextIO,
    rotate_worker_token: bool,
    rotate_manager_token: bool,
    rotate_manager_unlock_key: bool,
):
    """wrapper for docker swarm update"""
    # Re-apply the resolved config's swarm settings (restricted to
    # _swarm_init_keys) to the swarm the local daemon belongs to, optionally
    # rotating join tokens / the unlock key.
    _, _, resolved_swarm, _, _ = satisfy_config(injest_config(infile), stack_name)

    client = docker.from_env()
    assert client.swarm.attrs, (
        "Not connected to a swarm! You need to either init or join!"
    )

    dumped = resolved_swarm.model_dump()
    spec_kwargs = {name: dumped[name] for name in _swarm_init_keys if name in dumped}

    client.swarm.update(
        rotate_worker_token=rotate_worker_token,
        rotate_manager_token=rotate_manager_token,
        rotate_manager_unlock_key=rotate_manager_unlock_key,
        **spec_kwargs,
    )
406  
407  
@main.group()
def node():
    """
    wrapper for docker node

    Not needed:
    - demote
    - inspect
    - ls
    - promote
    - ps
    - rm
    """
    # Command group only: `update` is attached with @node.command("update").
    # Note that node_update and swarm_join take a string parameter also named
    # `node`, which shadows this group inside those functions.
    pass
422  
423  
@node.command("update")
@click.argument("stack_name", type=str)
@_infile_option
@click.argument("node", type=str)
def node_update(stack_name: str, infile: TextIO, node):
    """wrapper for docker node update"""
    # Bring swarm node `node` in line with the resolved config:
    #  - absent from the config        -> remove it from the swarm;
    #  - Spec is a ConfigNodeRMSpec    -> set worker/drain (unless Role is
    #                                     "rm-force"), then remove it;
    #  - otherwise                     -> apply the configured Spec.
    config = injest_config(infile)
    config, nodes, _, _, _ = satisfy_config(config, stack_name)

    rm = node not in nodes
    rm_force = False

    d_client = docker.from_env()
    try:
        d_node = d_client.nodes.get(node)
    except docker.errors.APIError as e:
        if e.status_code != 404:
            raise
        if rm:
            click.echo(f"node {node} was already removed")
            return
        click.echo(f"node {node} needs to join the swarm")
        # TODO: do this automatically
        raise NotImplementedError(f"can't yet auto-join node {node}") from e

    if not rm:
        node_settings: ConfigNode = nodes[node]

        spec = node_settings.Spec

        if isinstance(spec, ConfigNodeRMSpec):
            rm = True
            rm_force = spec.Role == "rm-force"

            node_settings.Spec = ConfigNodeSpec(Role="worker", Availability="drain")

        if not rm_force:
            # TODO: may need to actually promote or demote
            # The call stays outside the assert: `python -O` strips assert
            # statements, which would silently skip the update itself.
            updated = d_node.update(node_settings.Spec.model_dump())
            assert updated, "failed to update node"
            d_node.reload()
    if rm:
        # NOTE(review): Docker normally refuses to remove a node that is still
        # up unless force=True; draining just above may not be enough -- confirm.
        removed = d_node.remove(force=rm_force)
        assert removed, "failed to remove node"
470  
471  
def handle_ecxeption(exc_type, exc_value, exc_traceback):
    """Report an uncaught exception; installed below as ``sys.excepthook``.

    Prints the exception type, then (when the module-level ``debug`` flag is
    true) the full traceback. Docker API errors additionally get, in debug
    mode, a JSON dump of their status/error attributes and their message,
    and always the daemon's ``explanation``. Any other ``Exception`` gets its
    message. All output goes to stderr.

    ``BaseException`` subclasses that are not ``Exception`` (for example
    ``KeyboardInterrupt``) are handed to the interpreter's default hook.

    The misspelled name is kept for compatibility; ``handle_exception`` is an
    alias.
    """
    if not issubclass(exc_type, Exception):
        # These used to be re-raised from inside the hook, which made Python
        # report "Error in sys.excepthook" on e.g. Ctrl-C.
        sys.__excepthook__(exc_type, exc_value, exc_traceback)
        return

    click.echo(f"{exc_type.__name__} {exc_type}", err=True)
    click.echo(err=True)

    if debug:
        click.echo(
            "".join(traceback.format_exception(exc_type, exc_value, exc_traceback)),
            err=True,
        )
        click.echo(err=True)

    if not isinstance(exc_value, docker.errors.APIError):
        click.echo(exc_value, err=True)
        return

    if debug:
        click.echo(
            json.dumps(
                {
                    "is_client_error": exc_value.is_client_error(),
                    "is_error": exc_value.is_error(),
                    "is_server_error": exc_value.is_server_error(),
                    "status_code": exc_value.status_code,
                    "strerror": exc_value.strerror,
                    "errno": exc_value.errno,
                    "filename": exc_value.filename,
                    "filename2": exc_value.filename2,
                }
            ),
            err=True,
        )
        click.echo(err=True)
        click.echo(exc_value, err=True)
        click.echo(err=True)
    click.echo(exc_value.explanation, err=True)


# Correctly spelled alias; the original name is kept for existing references.
handle_exception = handle_ecxeption

sys.excepthook = handle_ecxeption