/ oci_reg_helper
oci_reg_helper
  1  #!/usr/bin/env python3
  2  
  3  """OCI Registry Helper
  4  
  5  Usage:
  6      ocirh <subcommand> [<args>...]
  7  
  8  Subcommands:
  9      repos           Lists repositories in the registry. Repos correspond to images
 10                      pushed to the registry.
 11      tags            Lists tags of the given repository.
 12      manifests       Lists manifests of the given repository for the given tag.
 13      rmi             Removes a tag from an image. If given tag is the only tag,
 14                      removes the image.
 15      gc              Runs garbage collection on the registry. Requires SSH public key
 16                      access to registry server.
 17      rmr             Removes given repository from the registry. Requires SSH public
 18                      key access to registry server.
 19  
 20  Examples:
 21      Suppose we have an image called 'fedora-toolbox' tagged with 'latest'.
 22  
 23      ocirh repos
 24      ocirh tags fedora-toolbox
 25      ocirh manifests fedora-toolbox latest
 26      ocirh rmi fedora-toolbox latest
 27      ocirh gc
 28      ocirh rmr fedora-toolbox
 29  """
 30  import http.client
 31  import json
 32  import logging
 33  import math
 34  import subprocess
 35  
 36  from docopt import docopt
 37  from rich import print
 38  from rich.console import Group
 39  from rich.logging import RichHandler
 40  from rich.panel import Panel
 41  from rich.table import Table
 42  from rich.text import Text
 43  from rich.traceback import install
 44  from rich.tree import Tree
 45  
 46  install(show_locals=True)
 47  
 48  # Rich logging handler
 49  FORMAT = "%(message)s"
 50  logging.basicConfig(
 51      level="NOTSET",
 52      format=FORMAT,
 53      datefmt="[%X]",
 54      handlers=[RichHandler(rich_tracebacks=True)],
 55  )
 56  log = logging.getLogger("rich")
 57  
 58  
 59  # Taken from https://stackoverflow.com/a/14822210
 60  #
 61  # How this function works:
 62  #   If size_bytes == 0, returns 0 B.
 63  #   size_name is a tuple containing binary prefixes for bytes.
 64  #
 65  #   math.log takes the logarithm of size_bytes to base 1024.
 66  #   math.floor rounds down the result of math.log to the nearest integer.
 67  #   int ensures the result of math.floor is of type int, and stores it in i.
 68  #   The value of i is used to determine which binary prefix to use from
 69  #   size_name.
 70  #
 71  #   math.pow returns the value of 1024 raised to the power of i, stores it in p.
 72  #
 73  #   round takes the value of size_bytes, divides it by p, and stores the result
 74  #   in s at precision of 2 decimal places.
 75  #
 76  #   A formatted string with size s and binary prefix size_name[i] is returned.
 77  def convert_size(size_bytes: int) -> str:
 78      """
 79      Converts a decimal integer of bytes to its respective binary-prefixed size.
 80  
 81          Parameters:
 82              size_bytes (int): A decimal integer.
 83  
 84          Returns:
 85              (str): Binary-prefixed size of size_bytes formatted as a string.
 86      """
 87      if size_bytes == 0:
 88          return "0 B"
 89      size_name = ("B", "KiB", "MiB", "GiB")
 90      i = int(math.floor(math.log(size_bytes, 1024)))
 91      p = math.pow(1024, i)
 92      s = round(size_bytes / p, 2)
 93      return "%s %s" % (s, size_name[i])
 94  
 95  
 96  REGISTRY_URL = "registry.hyperreal.coffee"
 97  
 98  
 99  def get_auth() -> str:
100      """
101      Get the base64 encoded password for registry autentication.
102  
103          Returns:
104              auth (str): A string containing the base64 encoded password.
105      """
106      try:
107          with open("/run/user/1000/containers/auth.json", "r") as authfile:
108              json_data = json.loads(authfile.read())
109      except Exception as ex:
110          log.exception(ex)
111  
112      auth = json_data["auths"][REGISTRY_URL]["auth"]
113      return auth
114  
115  
116  def get_headers() -> dict:
117      """
118      Returns headers for HTTP request authentication to the registry server.
119  
120          Returns:
121              headers (dict): A dict of HTTP headers
122      """
123      return {
124          "Accept": "application/vnd.oci.image.manifest.v1+json",
125          "Authorization": "Basic " + get_auth(),
126      }
127  
128  
129  def get_json_response(request: str, url: str) -> dict:
130      """
131      Connects to registry and returns response data as JSON.
132  
133          Parameters:
134              request (str): A string like "GET" or "DELETE"
135              url (str)    : A string containing the URL of the requested data
136  
137          Returns:
138              json_data (dict): JSON data as a dict object
139      """
140      conn = http.client.HTTPSConnection(REGISTRY_URL)
141      headers = get_headers()
142      try:
143          conn.request(request, url, "", headers)
144          res = conn.getresponse()
145          data = res.read()
146          json_data = json.loads(data.decode("utf-8"))
147      except Exception as ex:
148          log.exception(ex)
149  
150      return json_data
151  
152  
153  def get_repositories():
154      """
155      Prints a Rich Tree that lists the repositories of the registry.
156      """
157  
158      json_data = get_json_response("GET", "/v2/_catalog")
159      repo_tree = Tree("[green]Repositories")
160      for repo in json_data["repositories"]:
161          repo_tree.add("[blue]%s" % repo)
162  
163      print(repo_tree)
164  
165  
166  def get_tags(repo: str):
167      """
168      Prints a Rich Tree that lists the tags for the given repository.
169  
170          Parameters:
171              repo (str): A string containing the name of the repo
172      """
173      json_data = get_json_response("GET", "/v2/" + repo + "/tags/list")
174      tags_tree = Tree("[green]%s tags" % repo)
175      for tag in json_data["tags"]:
176          tags_tree.add("[cyan]:%s" % tag)
177  
178      print(tags_tree)
179  
180  
181  def get_manifests(repo: str, tag: str):
182      """
183      Prints a Rich grid table that displays the manifests and metadata of the
184      image repository.
185  
186          Parameters:
187              repo (str): A string containing the name of the repo
188              tag (str) : A string containing the tag of the desired image
189      """
190      json_data = get_json_response("GET", "/v2/" + repo + "/manifests/" + tag)
191  
192      grid_meta = Table.grid(expand=True)
193      grid_meta.add_column()
194      grid_meta.add_column()
195      meta_schema_version_key = Text("Schema version")
196      meta_schema_version_key.stylize("bold green", 0)
197      meta_schema_version_value = Text(str(json_data["schemaVersion"]))
198      meta_media_type_key = Text("Media type")
199      meta_media_type_key.stylize("bold green", 0)
200      meta_media_type_value = Text(json_data["mediaType"])
201      grid_meta.add_row(meta_schema_version_key, meta_schema_version_value)
202      grid_meta.add_row(meta_media_type_key, meta_media_type_value)
203  
204      grid_config = Table.grid(expand=True)
205      grid_config.add_column()
206      grid_config.add_column()
207      config_media_type_key = Text("Media type")
208      config_media_type_key.stylize("bold green", 0)
209      config_media_type_value = Text(json_data["config"]["mediaType"])
210      config_digest_key = Text("Digest")
211      config_digest_key.stylize("bold green", 0)
212      config_digest_value = Text(json_data["config"]["digest"])
213      config_size_key = Text("Size")
214      config_size_key.stylize("bold green", 0)
215      config_size_value = Text(convert_size(json_data["config"]["size"]))
216      grid_config.add_row(config_media_type_key, config_media_type_value)
217      grid_config.add_row(config_digest_key, config_digest_value)
218      grid_config.add_row(config_size_key, config_size_value)
219  
220      grid_annotations = Table.grid(expand=True)
221      grid_annotations.add_column()
222      grid_annotations.add_column()
223      for item in json_data["annotations"].items():
224          annotations_item_key = Text(item[0])
225          annotations_item_key.stylize("bold green", 0)
226          annotations_item_value = Text(item[1])
227          grid_annotations.add_row(annotations_item_key, annotations_item_value)
228  
229      total_size = sum(layer.get("size") for layer in json_data["layers"])
230      table_layers = Table(box=None, show_footer=True)
231      table_layers.add_column(
232          "Digest", justify="right", style="yellow", no_wrap=True, footer="Total size:"
233      )
234      table_layers.add_column(
235          "Size",
236          justify="left",
237          style="cyan",
238          no_wrap=True,
239          footer=convert_size(total_size),
240      )
241      for layer in json_data["layers"]:
242          table_layers.add_row(layer.get("digest"), convert_size(layer.get("size")))
243  
244      panel_group = Group(
245          Panel(grid_meta, title="[bold blue]Metadata"),
246          Panel(grid_config, title="[bold blue]Config"),
247          Panel(grid_annotations, title="Annotations"),
248          Panel(
249              table_layers,
250              title="[bold blue]Layers: %s" % json_data["layers"][0].get("mediaType"),
251          ),
252      )
253      print(Panel(panel_group, title="[bold blue]%s:%s" % (repo, tag)))
254  
255  
256  def delete_image(repo: str, tag: str):
257      """
258      Removes the given tag from the image. If the given tag is the only tag,
259      removes the image.
260  
261          Parameters:
262              repo (str): A string containing the name of the repo
263              tag (str) : A string containing the tag to be removed
264      """
265      try:
266          conn = http.client.HTTPSConnection(REGISTRY_URL)
267          headers = get_headers()
268          conn.request("GET", "/v2/" + repo + "/manifests/" + tag, "", headers)
269          res = conn.getresponse()
270          docker_content_digest = res.getheader("Docker-Content-Digest")
271      except Exception as ex:
272          log.exception(ex)
273  
274      try:
275          conn.request(
276              "DELETE", "/v2/" + repo + "/manifests/" + docker_content_digest, "", headers
277          )
278      except Exception as ex:
279          log.exception(ex)
280  
281      print("Untagged %s:%s successfully" % (repo, tag))
282  
283  
284  def garbage_collection():
285      """
286      Runs garbage collection command on the remote registry server. Requires
287      SSH public key access.
288      """
289      command = "/usr/local/bin/registry-gc"
290  
291      try:
292          ssh = subprocess.Popen(
293              ["ssh", "%s" % REGISTRY_URL, command],
294              shell=False,
295              stdout=subprocess.PIPE,
296              stderr=subprocess.PIPE,
297              text=True,
298          )
299          result = ssh.stdout.readlines()
300          if result == []:
301              log.error(ssh.stderr.readlines())
302          else:
303              print(result)
304      except Exception as ex:
305          log.exception(ex)
306  
307  
308  def remove_repo(repo: str):
309      """
310      Runs command on remote registry server to remove the given repo.
311  
312          Parameters:
313              repo (str): A string containing the name of the repo.
314      """
315      command = "/usr/local/bin/registry-rm-repo " + repo
316  
317      try:
318          ssh = subprocess.Popen(
319              ["ssh", "%s" % REGISTRY_URL, command],
320              shell=False,
321              stdout=subprocess.PIPE,
322              stderr=subprocess.PIPE,
323              text=True,
324          )
325          result = ssh.stdout.readlines()
326          if result == []:
327              log.error(ssh.stderr.readlines())
328          else:
329              print(result)
330      except Exception as ex:
331          log.exception(ex)
332  
333  
334  if __name__ == "__main__":
335      args = docopt(__doc__, options_first=True)
336      match args["<subcommand>"]:
337          case "repos":
338              get_repositories()
339          case "tags":
340              get_tags(args["<args>"][0])
341          case "manifests":
342              get_manifests(args["<args>"][0], args["<args>"][1])
343          case "rmi":
344              delete_image(args["<args>"][0], args["<args>"][1])
345          case "gc":
346              garbage_collection()
347          case "rmr":
348              remove_repo(args["<args>"])
349          case _:
350              if args["<subcommand>"] in ["help", None]:
351                  exit(subprocess.call(["python3", "ocirh", "--help"]))
352              else:
353                  exit(
354                      "%r is not a ocirh subcommand. See 'ocirh --help."
355                      % args["<subcommand>"]
356                  )