/ oci_reg_helper
oci_reg_helper
1 #!/usr/bin/env python3 2 3 """OCI Registry Helper 4 5 Usage: 6 ocirh <subcommand> [<args>...] 7 8 Subcommands: 9 repos Lists repositories in the registry. Repos correspond to images 10 pushed to the registry. 11 tags Lists tags of the given repository. 12 manifests Lists manifests of the given repository for the given tag. 13 rmi Removes a tag from an image. If given tag is the only tag, 14 removes the image. 15 gc Runs garbage collection on the registry. Requires SSH public key 16 access to registry server. 17 rmr Removes given repository from the registry. Requires SSH public 18 key access to registry server. 19 20 Examples: 21 Suppose we have an image called 'fedora-toolbox' tagged with 'latest'. 22 23 ocirh repos 24 ocirh tags fedora-toolbox 25 ocirh manifests fedora-toolbox latest 26 ocirh rmi fedora-toolbox latest 27 ocirh gc 28 ocirh rmr fedora-toolbox 29 """ 30 import http.client 31 import json 32 import logging 33 import math 34 import subprocess 35 36 from docopt import docopt 37 from rich import print 38 from rich.console import Group 39 from rich.logging import RichHandler 40 from rich.panel import Panel 41 from rich.table import Table 42 from rich.text import Text 43 from rich.traceback import install 44 from rich.tree import Tree 45 46 install(show_locals=True) 47 48 # Rich logging handler 49 FORMAT = "%(message)s" 50 logging.basicConfig( 51 level="NOTSET", 52 format=FORMAT, 53 datefmt="[%X]", 54 handlers=[RichHandler(rich_tracebacks=True)], 55 ) 56 log = logging.getLogger("rich") 57 58 59 # Taken from https://stackoverflow.com/a/14822210 60 # 61 # How this function works: 62 # If size_bytes == 0, returns 0 B. 63 # size_name is a tuple containing binary prefixes for bytes. 64 # 65 # math.log takes the logarithm of size_bytes to base 1024. 66 # math.floor rounds down the result of math.log to the nearest integer. 67 # int ensures the result of math.floor is of type int, and stores it in i. 68 # The value of i is used to determine which binary prefix to use from 69 # size_name. 70 # 71 # math.pow returns the value of 1024 raised to the power of i, stores it in p. 72 # 73 # round takes the value of size_bytes, divides it by p, and stores the result 74 # in s at precision of 2 decimal places. 75 # 76 # A formatted string with size s and binary prefix size_name[i] is returned. 77 def convert_size(size_bytes: int) -> str: 78 """ 79 Converts a decimal integer of bytes to its respective binary-prefixed size. 80 81 Parameters: 82 size_bytes (int): A decimal integer. 83 84 Returns: 85 (str): Binary-prefixed size of size_bytes formatted as a string. 86 """ 87 if size_bytes == 0: 88 return "0 B" 89 size_name = ("B", "KiB", "MiB", "GiB") 90 i = int(math.floor(math.log(size_bytes, 1024))) 91 p = math.pow(1024, i) 92 s = round(size_bytes / p, 2) 93 return "%s %s" % (s, size_name[i]) 94 95 96 REGISTRY_URL = "registry.hyperreal.coffee" 97 98 99 def get_auth() -> str: 100 """ 101 Get the base64 encoded password for registry autentication. 102 103 Returns: 104 auth (str): A string containing the base64 encoded password. 105 """ 106 try: 107 with open("/run/user/1000/containers/auth.json", "r") as authfile: 108 json_data = json.loads(authfile.read()) 109 except Exception as ex: 110 log.exception(ex) 111 112 auth = json_data["auths"][REGISTRY_URL]["auth"] 113 return auth 114 115 116 def get_headers() -> dict: 117 """ 118 Returns headers for HTTP request authentication to the registry server. 119 120 Returns: 121 headers (dict): A dict of HTTP headers 122 """ 123 return { 124 "Accept": "application/vnd.oci.image.manifest.v1+json", 125 "Authorization": "Basic " + get_auth(), 126 } 127 128 129 def get_json_response(request: str, url: str) -> dict: 130 """ 131 Connects to registry and returns response data as JSON. 132 133 Parameters: 134 request (str): A string like "GET" or "DELETE" 135 url (str) : A string containing the URL of the requested data 136 137 Returns: 138 json_data (dict): JSON data as a dict object 139 """ 140 conn = http.client.HTTPSConnection(REGISTRY_URL) 141 headers = get_headers() 142 try: 143 conn.request(request, url, "", headers) 144 res = conn.getresponse() 145 data = res.read() 146 json_data = json.loads(data.decode("utf-8")) 147 except Exception as ex: 148 log.exception(ex) 149 150 return json_data 151 152 153 def get_repositories(): 154 """ 155 Prints a Rich Tree that lists the repositories of the registry. 156 """ 157 158 json_data = get_json_response("GET", "/v2/_catalog") 159 repo_tree = Tree("[green]Repositories") 160 for repo in json_data["repositories"]: 161 repo_tree.add("[blue]%s" % repo) 162 163 print(repo_tree) 164 165 166 def get_tags(repo: str): 167 """ 168 Prints a Rich Tree that lists the tags for the given repository. 169 170 Parameters: 171 repo (str): A string containing the name of the repo 172 """ 173 json_data = get_json_response("GET", "/v2/" + repo + "/tags/list") 174 tags_tree = Tree("[green]%s tags" % repo) 175 for tag in json_data["tags"]: 176 tags_tree.add("[cyan]:%s" % tag) 177 178 print(tags_tree) 179 180 181 def get_manifests(repo: str, tag: str): 182 """ 183 Prints a Rich grid table that displays the manifests and metadata of the 184 image repository. 185 186 Parameters: 187 repo (str): A string containing the name of the repo 188 tag (str) : A string containing the tag of the desired image 189 """ 190 json_data = get_json_response("GET", "/v2/" + repo + "/manifests/" + tag) 191 192 grid_meta = Table.grid(expand=True) 193 grid_meta.add_column() 194 grid_meta.add_column() 195 meta_schema_version_key = Text("Schema version") 196 meta_schema_version_key.stylize("bold green", 0) 197 meta_schema_version_value = Text(str(json_data["schemaVersion"])) 198 meta_media_type_key = Text("Media type") 199 meta_media_type_key.stylize("bold green", 0) 200 meta_media_type_value = Text(json_data["mediaType"]) 201 grid_meta.add_row(meta_schema_version_key, meta_schema_version_value) 202 grid_meta.add_row(meta_media_type_key, meta_media_type_value) 203 204 grid_config = Table.grid(expand=True) 205 grid_config.add_column() 206 grid_config.add_column() 207 config_media_type_key = Text("Media type") 208 config_media_type_key.stylize("bold green", 0) 209 config_media_type_value = Text(json_data["config"]["mediaType"]) 210 config_digest_key = Text("Digest") 211 config_digest_key.stylize("bold green", 0) 212 config_digest_value = Text(json_data["config"]["digest"]) 213 config_size_key = Text("Size") 214 config_size_key.stylize("bold green", 0) 215 config_size_value = Text(convert_size(json_data["config"]["size"])) 216 grid_config.add_row(config_media_type_key, config_media_type_value) 217 grid_config.add_row(config_digest_key, config_digest_value) 218 grid_config.add_row(config_size_key, config_size_value) 219 220 grid_annotations = Table.grid(expand=True) 221 grid_annotations.add_column() 222 grid_annotations.add_column() 223 for item in json_data["annotations"].items(): 224 annotations_item_key = Text(item[0]) 225 annotations_item_key.stylize("bold green", 0) 226 annotations_item_value = Text(item[1]) 227 grid_annotations.add_row(annotations_item_key, annotations_item_value) 228 229 total_size = sum(layer.get("size") for layer in json_data["layers"]) 230 table_layers = Table(box=None, show_footer=True) 231 table_layers.add_column( 232 "Digest", justify="right", style="yellow", no_wrap=True, footer="Total size:" 233 ) 234 table_layers.add_column( 235 "Size", 236 justify="left", 237 style="cyan", 238 no_wrap=True, 239 footer=convert_size(total_size), 240 ) 241 for layer in json_data["layers"]: 242 table_layers.add_row(layer.get("digest"), convert_size(layer.get("size"))) 243 244 panel_group = Group( 245 Panel(grid_meta, title="[bold blue]Metadata"), 246 Panel(grid_config, title="[bold blue]Config"), 247 Panel(grid_annotations, title="Annotations"), 248 Panel( 249 table_layers, 250 title="[bold blue]Layers: %s" % json_data["layers"][0].get("mediaType"), 251 ), 252 ) 253 print(Panel(panel_group, title="[bold blue]%s:%s" % (repo, tag))) 254 255 256 def delete_image(repo: str, tag: str): 257 """ 258 Removes the given tag from the image. If the given tag is the only tag, 259 removes the image. 260 261 Parameters: 262 repo (str): A string containing the name of the repo 263 tag (str) : A string containing the tag to be removed 264 """ 265 try: 266 conn = http.client.HTTPSConnection(REGISTRY_URL) 267 headers = get_headers() 268 conn.request("GET", "/v2/" + repo + "/manifests/" + tag, "", headers) 269 res = conn.getresponse() 270 docker_content_digest = res.getheader("Docker-Content-Digest") 271 except Exception as ex: 272 log.exception(ex) 273 274 try: 275 conn.request( 276 "DELETE", "/v2/" + repo + "/manifests/" + docker_content_digest, "", headers 277 ) 278 except Exception as ex: 279 log.exception(ex) 280 281 print("Untagged %s:%s successfully" % (repo, tag)) 282 283 284 def garbage_collection(): 285 """ 286 Runs garbage collection command on the remote registry server. Requires 287 SSH public key access. 288 """ 289 command = "/usr/local/bin/registry-gc" 290 291 try: 292 ssh = subprocess.Popen( 293 ["ssh", "%s" % REGISTRY_URL, command], 294 shell=False, 295 stdout=subprocess.PIPE, 296 stderr=subprocess.PIPE, 297 text=True, 298 ) 299 result = ssh.stdout.readlines() 300 if result == []: 301 log.error(ssh.stderr.readlines()) 302 else: 303 print(result) 304 except Exception as ex: 305 log.exception(ex) 306 307 308 def remove_repo(repo: str): 309 """ 310 Runs command on remote registry server to remove the given repo. 311 312 Parameters: 313 repo (str): A string containing the name of the repo. 314 """ 315 command = "/usr/local/bin/registry-rm-repo " + repo 316 317 try: 318 ssh = subprocess.Popen( 319 ["ssh", "%s" % REGISTRY_URL, command], 320 shell=False, 321 stdout=subprocess.PIPE, 322 stderr=subprocess.PIPE, 323 text=True, 324 ) 325 result = ssh.stdout.readlines() 326 if result == []: 327 log.error(ssh.stderr.readlines()) 328 else: 329 print(result) 330 except Exception as ex: 331 log.exception(ex) 332 333 334 if __name__ == "__main__": 335 args = docopt(__doc__, options_first=True) 336 match args["<subcommand>"]: 337 case "repos": 338 get_repositories() 339 case "tags": 340 get_tags(args["<args>"][0]) 341 case "manifests": 342 get_manifests(args["<args>"][0], args["<args>"][1]) 343 case "rmi": 344 delete_image(args["<args>"][0], args["<args>"][1]) 345 case "gc": 346 garbage_collection() 347 case "rmr": 348 remove_repo(args["<args>"]) 349 case _: 350 if args["<subcommand>"] in ["help", None]: 351 exit(subprocess.call(["python3", "ocirh", "--help"])) 352 else: 353 exit( 354 "%r is not a ocirh subcommand. See 'ocirh --help." 355 % args["<subcommand>"] 356 )