__init__.py
#!/usr/bin/env python3

# SPDX-FileCopyrightText: 2025 2025
# SPDX-FileContributor: Nathan Fritzler
#
# SPDX-License-Identifier: MIT

import os
from typing import BinaryIO, Dict, List, TextIO, Optional
import json
import subprocess
import shlex
import sys
import traceback

import click
import yaml
import docker
import docker.errors

from .schemas import (
    ConfigNode,
    ConfigNodeManagerStatus,
    ConfigNodeRMSpec,
    ConfigNodeSpec,
    ConfigNodeStatus,
    ConfigNodes,
    ConfigPlugins,
    ConfigServices,
    ConfigStack,
    ConfigSwarm,
    injest_config,
    Config,
)
from .cantgetno import satisfy_config

# When true, the exception hook also prints the full traceback.
debug = True

# TODO: https://click.palletsprojects.com/en/stable/shell-completion/
# TODO: automatic swarm state backup


def run_cmd(args: List[str], check=True, **kwargs) -> subprocess.CompletedProcess:
    """Echo a command line, then run it with ``subprocess.run``.

    Args:
        args: The argument vector to execute (no shell is involved).
        check: Forwarded to ``subprocess.run``; when true, a non-zero exit
            status terminates this process with the same status.
        **kwargs: Forwarded verbatim to ``subprocess.run``.

    Returns:
        The ``CompletedProcess`` returned by ``subprocess.run``.
    """
    click.echo(f"\n$ {shlex.join(args)}\n")
    try:
        return subprocess.run(args, check=check, **kwargs)
    except subprocess.CalledProcessError as e:
        # Propagate the child's exit status instead of showing a traceback.
        sys.exit(e.returncode)


@click.group()
@click.version_option()
def main():
    pass


# Shared ``-f/--file/--infile`` option: the cluster config, opened as bytes.
_infile_option = click.option(
    "-f",
    "--file",
    "--infile",
    "infile",
    # prompt=True,
    default="docker_static_cluster.toml",
    type=click.File("rb"),
)

# Shared ``-c/--compose-file`` option. The default is the first entry of
# $COMPOSE_FILE (split on $COMPOSE_PATH_SEPARATOR, ":" if unset), falling
# back to "compose.yaml".
_composefile_option = click.option(
    "-c",
    "--compose-file",
    "compose_file",
    # prompt=True,
    default=lambda: os.environ.get("COMPOSE_FILE", "compose.yaml").split(
        os.environ.get("COMPOSE_PATH_SEPARATOR", ":")
    )[0],
    type=click.File("w"),
)

# Shared ``-r/--as-remote-node`` option: name of a node to act on remotely.
_as_remote_node_option = click.option(
    "-r",
    "--as-remote-node",
    "as_remote_node",
    type=str,
)


def _infile_kwargs(infile: BinaryIO) -> Dict[str, object]:
    """Build the ``infile`` keyword for a nested ``ctx.invoke`` call.

    When a command is invoked through ``ctx.invoke`` without ``infile``,
    Click fills in the option's default, so a user-supplied ``--file`` would
    be ignored by the nested command. This rewinds the already-open stream so
    the nested command can read it again.

    Returns:
        ``{"infile": infile}`` when the stream could be rewound, otherwise an
        empty dict (the nested command then falls back to the option default).
    """
    # NOTE(review): assumes injest_config reads the stream from its current
    # position and leaves it open -- confirm against .schemas.
    try:
        if infile.seekable():
            infile.seek(0)
            return {"infile": infile}
    except (AttributeError, OSError, ValueError):
        # Closed or otherwise unusable stream: use the fallback below.
        pass
    return {}


# The docstrings of the click commands below double as their ``--help`` text,
# so developer-facing notes are kept in ``#`` comments instead.


# Reads the config, resolves it for ``stack_name`` via satisfy_config, writes
# the resulting stack as YAML to ``compose_file`` and returns the resolved
# (config, nodes, swarm, plugins, stack) tuple so ``deploy`` can reuse it.
@main.command()
@click.argument("stack_name", type=str)
@_infile_option
@_composefile_option
def generate_compose(
    stack_name: str, infile: BinaryIO, compose_file: TextIO
) -> tuple[Config, ConfigNodes, ConfigSwarm, ConfigPlugins, ConfigStack]:
    """Generate a compose file for use with `docker stack`"""
    config = injest_config(infile)
    config, nodes, swarm, plugins, stack = satisfy_config(config, stack_name)
    stack_d = stack.model_dump()
    yaml.dump(stack_d, compose_file)
    # The handle is only closed when the click context that opened it ends;
    # flush now so a later `stack deploy` in the same run sees the whole file.
    compose_file.flush()
    return config, nodes, swarm, plugins, stack


# Dumps the JSON schema of the ``Config`` model to ``output``.
@main.command()
@click.argument("output", type=click.File("w"))
def generate_compose_schema(output: TextIO):
    """Generate the schema file for the config"""
    json.dump(Config.model_json_schema(), output)


# Steps, each of which can be skipped with its ``--skip-*`` flag:
#   1. generate the compose file (always),
#   2. install / configure / remove plugins,
#   3. update the swarm settings,
#   4. update every configured node,
#   5. re-run the plugin step against every node as a remote node,
#   6. run ``docker-sdp stack deploy``,
#   7. optionally force-update every service of the stack.
@main.command()
@_infile_option
@_composefile_option
@_as_remote_node_option
@click.option("--detach", type=bool, default=None)
@click.option("--skip-swarm", is_flag=True)
@click.option("--skip-plugins", is_flag=True)
@click.option("--skip-nodes", is_flag=True)
@click.option("--skip-propagate-config", is_flag=True)
@click.option("--skip-stack-deploy", is_flag=True)
@click.option("--force-service-update", is_flag=True)
@click.argument("stack_name", type=str)
@click.pass_context
def deploy(
    ctx,
    infile: BinaryIO,
    compose_file: TextIO,
    as_remote_node: Optional[str],
    skip_swarm: bool,
    skip_plugins: bool,
    skip_nodes: bool,
    skip_propagate_config: bool,
    skip_stack_deploy: bool,
    force_service_update: bool,
    stack_name: str,
    detach: Optional[bool],
):
    """Deploy the config file."""
    config, nodes_settings, swarm_settings, plugins_settings, stack_settings = (
        ctx.invoke(
            generate_compose,
            stack_name=stack_name,
            infile=infile,
            compose_file=compose_file,
        )
    )
    assert isinstance(config, Config)
    assert isinstance(nodes_settings, ConfigNodes)
    assert isinstance(swarm_settings, ConfigSwarm)
    assert isinstance(plugins_settings, ConfigPlugins)

    # TODO: something was ignoring unsupported "restart" option

    if as_remote_node:
        node_settings = nodes_settings[as_remote_node]
        assert node_settings, f"remote node {as_remote_node} could not be found"
        assert node_settings.remote_docker_conf, (
            f"remote node {as_remote_node} does not have a remote remote_docker_conf"
        )
        remote_docker_conf_d = node_settings.remote_docker_conf.model_dump()
        # Only pass the settings that were actually provided.
        d_client = docker.DockerClient(
            **{
                key: value
                for key, value in remote_docker_conf_d.items()
                if value is not None
            }
        )
    else:
        d_client = docker.from_env()

    if not skip_plugins:
        for plugin_name, plugin_config in plugins_settings.items():
            try:
                d_plugin = d_client.plugins.get(plugin_name)
            except docker.errors.NotFound:
                if plugin_config.remove:
                    # Marked for removal and already absent: do not install it.
                    continue
                d_plugin = d_client.plugins.install(
                    remote_name=plugin_config.image,
                    local_name=plugin_name,
                )
            else:
                if plugin_config.remove:
                    d_plugin.remove(force=plugin_config.remove == "force")
                    continue
            plugin_config_d = plugin_config.model_dump()
            plugin_config_d["name"] = plugin_name
            # Plugin.configure takes a key/value mapping, so pass the dumped
            # dict rather than the model object.
            # NOTE(review): confirm every dumped field is a valid plugin
            # setting; unknown keys may be rejected by the daemon.
            d_plugin.configure(plugin_config_d)
        # TODO: prune option
    if not skip_swarm and swarm_settings:
        ctx.invoke(swarm_update, stack_name=stack_name, **_infile_kwargs(infile))
    if not skip_nodes:
        for node_name in nodes_settings.keys():
            ctx.invoke(
                node_update,
                node=node_name,
                stack_name=stack_name,
                **_infile_kwargs(infile),
            )
        # TODO prune
    if (not skip_propagate_config) and (not skip_plugins):
        # Re-run only the plugin step once per node, acting as that node.
        # NOTE(review): compose_file is not forwarded, so each nested run
        # regenerates the compose file at the option's default path.
        for node_name in nodes_settings.keys():
            ctx.invoke(
                deploy,
                skip_plugins=False,
                skip_nodes=True,
                skip_swarm=True,
                skip_propagate_config=True,
                skip_stack_deploy=True,
                as_remote_node=node_name,
                stack_name=stack_name,
                **_infile_kwargs(infile),
            )
    if not skip_stack_deploy:
        # TODO prune

        # stack_settings = stacks_settings[stack_name]
        if as_remote_node:
            # TODO: support ssh
            raise NotImplementedError("Stack commands cannot be run on a remote node")
        # NOTE: below doesn't do things the right way
        #
        # cmd = ["docker"]
        #
        # this instead adds behavior expected from docker stack,
        # alike that of docker compose
        # TODO: this is python code, but they don't provide a python API
        cmd = ["docker-sdp"]

        cmd.extend(["stack", "deploy"])
        cmd.append(stack_name)
        cmd.extend(["--compose-file", compose_file.name])
        if detach is not None:
            cmd.append(f"--detach={'true' if detach else 'false'}")

        run_cmd(cmd)
    if force_service_update:
        if as_remote_node:
            # TODO: support ssh
            raise NotImplementedError(
                "forcing a service update cannot be run on a remote node"
            )

        services_settings: ConfigServices = stack_settings.services
        services_settings_d = services_settings.model_dump()

        for service_name in services_settings_d.keys():
            cmd = ["docker", "service", "update"]
            cmd.append("--force")
            cmd.append(f"{stack_name}_{service_name}")

            run_cmd(cmd)


# TODO: make these into commands
#
# TIP: if you're looking for a way to force-restart stuff,
#
#     docker stack services -q $stack | xargs -rt -n1 -- docker service update --force
#
# from https://stackoverflow.com/a/71724439/6353323
#
# can recreate everything
#
#     docker service update --force $service
#
# from https://stackoverflow.com/a/44110795/6353323
#
# will just update the unchanged

# TODO: join swarm command


@main.group()
def swarm():
    """
    wrapper for docker swarm

    Not needed:
    - join-token
    - leave
    - unlock
    - unlock-key
    """
    pass


# Swarm settings that are copied from the config into the keyword arguments
# of ``swarm.init`` / ``swarm.update``.
_swarm_init_keys = (
    "task_history_retention_limit",
    "snapshot_interval",
    "keep_old_snapshots",
    "log_entries_for_slow_followers",
    "heartbeat_tick",
    "dispatcher_heartbeat_period",
    "signing_ca_cert",
    "signing_ca_key",
)


def _swarm_spec_kwargs(swarm_settings) -> Dict[str, object]:
    """Pick the ``_swarm_init_keys`` entries out of the dumped swarm settings."""
    dumped = swarm_settings.model_dump()
    return {key: dumped[key] for key in _swarm_init_keys if key in dumped}


def _node_addr_kwargs(the_node) -> Dict[str, object]:
    """Map a node's configured addresses onto swarm init/join keywords.

    Only addresses that are set on the node produce an entry.
    """
    kwargs: Dict[str, object] = {}
    if the_node.Status and the_node.Status.Addr:
        kwargs["advertise_addr"] = the_node.Status.Addr
    if the_node.ManagerStatus and the_node.ManagerStatus.Addr:
        kwargs["listen_addr"] = the_node.ManagerStatus.Addr
    if the_node.DataPathAddr:
        kwargs["data_path_addr"] = the_node.DataPathAddr
    return kwargs


# Initialises a swarm on the local docker daemon using the swarm settings and
# the addresses configured for ``node_name``; echoes what ``swarm.init``
# returns.
@swarm.command("init")
@_infile_option
@click.option("--force-new-cluster", is_flag=True)
@click.argument("node_name", type=str)
def swarm_init(
    infile: BinaryIO,
    force_new_cluster: bool,
    node_name: str,
):
    """wrapper for docker swarm init"""
    config = injest_config(infile)

    swarm_settings = config.swarm_settings
    swarm_nodes = config.swarm_nodes
    assert swarm_nodes, "There are no nodes defined!"
    the_node: ConfigNode = swarm_nodes[node_name]

    d_client = docker.from_env()

    kwargs = _swarm_spec_kwargs(swarm_settings)
    kwargs.update(_node_addr_kwargs(the_node))

    click.echo(d_client.swarm.init(force_new_cluster=force_new_cluster, **kwargs))


# Also expose the command directly on the top-level group.
main.add_command(swarm_init)


# BUG: this command wasn't working, so I've disabled it.
# @swarm.command("join")
@click.argument("node", type=str)
@click.argument("stack_name", type=str)
@_infile_option
@click.option("--token", type=str)
def swarm_join(stack_name: str, infile: BinaryIO, node: str, token):
    """wrapper for docker swarm join"""
    config = injest_config(infile)
    config, nodes, _, _, _ = satisfy_config(config, stack_name)

    d_client = docker.from_env()

    the_node = nodes[node]

    # Every other configured node with a known address is a join candidate.
    kwargs: Dict[str, object] = {
        "remote_addrs": [
            man_node.Status.Addr
            for node_name, man_node in nodes.items()
            if isinstance(man_node, ConfigNode)
            and node_name != node
            and man_node.Status
            and man_node.Status.Addr
        ]
    }
    kwargs.update(_node_addr_kwargs(the_node))

    click.echo(json.dumps(kwargs))

    # Call outside the assert so the join still happens under ``python -O``.
    joined = d_client.swarm.join(join_token=token, **kwargs)
    assert joined


# BUG: I've disabled this command, see above
# main.add_command(swarm_join)


# Pushes the configured swarm settings to the swarm the local daemon belongs
# to, optionally rotating tokens / the unlock key.
@swarm.command("update")
@click.argument("stack_name", type=str)
@_infile_option
@click.option("--rotate-worker-token", is_flag=True)
@click.option("--rotate-manager-token", is_flag=True)
@click.option("--rotate-manager-unlock-key", is_flag=True)
def swarm_update(
    stack_name: str,
    infile: BinaryIO,
    rotate_worker_token,
    rotate_manager_token,
    rotate_manager_unlock_key,
):
    """wrapper for docker swarm update"""
    config = injest_config(infile)
    config, _, swarm_settings, _, _ = satisfy_config(config, stack_name)

    d_client = docker.from_env()

    assert d_client.swarm.attrs, (
        "Not connected to a swarm! You need to either init or join!"
    )

    d_client.swarm.update(
        rotate_worker_token=rotate_worker_token,
        rotate_manager_token=rotate_manager_token,
        rotate_manager_unlock_key=rotate_manager_unlock_key,
        **_swarm_spec_kwargs(swarm_settings),
    )


@main.group()
def node():
    """
    wrapper for docker node

    Not needed:
    - demote
    - inspect
    - ls
    - promote
    - ps
    - rm
    """
    pass


# Brings one swarm node in line with the config:
#   * node missing from the config      -> removed from the swarm,
#   * node whose Spec is a removal spec -> drained (unless "rm-force") and
#                                          then removed,
#   * otherwise                         -> its Spec is pushed to the swarm.
@node.command("update")
@click.argument("stack_name", type=str)
@_infile_option
@click.argument("node", type=str)
def node_update(stack_name: str, infile: BinaryIO, node):
    """wrapper for docker node update"""
    config = injest_config(infile)
    config, nodes, _, _, _ = satisfy_config(config, stack_name)

    rm = node not in nodes
    rm_force = False

    d_client = docker.from_env()
    try:
        d_node = d_client.nodes.get(node)
    except docker.errors.APIError as e:
        if e.status_code != 404:
            raise
        if rm:
            click.echo(f"node {node} was already removed")
            return
        click.echo(f"node {node} needs to join the swarm")
        # TODO: do this automatically
        raise NotImplementedError(f"can't yet auto-join node {node}") from e

    if not rm:
        node_settings: ConfigNode = nodes[node]

        spec = node_settings.Spec

        if isinstance(spec, ConfigNodeRMSpec):
            rm = True
            rm_force = spec.Role == "rm-force"

            # Drain the node as a plain worker before it gets removed.
            node_settings.Spec = ConfigNodeSpec(Role="worker", Availability="drain")

        if not rm_force:
            # TODO: may need to actually promote or demote
            # Call outside the assert so it still runs under ``python -O``.
            updated = d_node.update(node_settings.Spec.model_dump())
            assert updated, "failed to update node"
            d_node.reload()
    if rm:
        removed = d_node.remove(force=rm_force)
        assert removed, "failed to remove node"


def handle_exception(exc_type, exc_value, exc_traceback):
    """Report an uncaught exception on stderr (``sys.excepthook`` handler).

    Prints the exception type, the traceback when the module-level ``debug``
    flag is set, and the exception itself. Docker API errors additionally
    get a JSON summary of their status fields and the daemon's explanation.

    Args:
        exc_type: Class of the uncaught exception.
        exc_value: The exception instance.
        exc_traceback: Traceback object belonging to ``exc_value``.
    """

    def echo_err(message=None):
        click.echo(message, err=True)

    echo_err(f"{exc_type.__name__} {exc_type}")
    echo_err()
    if debug:
        echo_err(
            "".join(traceback.format_exception(exc_type, exc_value, exc_traceback))
        )
        echo_err()
    if isinstance(exc_value, docker.errors.APIError):
        echo_err(
            json.dumps(
                {
                    "is_client_error": exc_value.is_client_error(),
                    "is_error": exc_value.is_error(),
                    "is_server_error": exc_value.is_server_error(),
                    "status_code": exc_value.status_code,
                    "strerror": exc_value.strerror,
                    "errno": exc_value.errno,
                    "filename": exc_value.filename,
                    "filename2": exc_value.filename2,
                }
            )
        )
        echo_err()
        echo_err(exc_value)
        echo_err()
        echo_err(exc_value.explanation)
    else:
        echo_err(exc_value)


# Backward-compatible alias for the original (misspelled) name.
handle_ecxeption = handle_exception


sys.excepthook = handle_exception