a2rchery.py
#!/usr/bin/env python3

# SPDX-FileCopyrightText: 2019 4am
#
# SPDX-License-Identifier: MIT

# (c) 2018 by 4am
# MIT-licensed

import argparse
import collections
import json
import os
import sys

__version__ = "1.0"
__date__ = "2018-09-08"
__progname__ = "a2rchery"
__displayname__ = __progname__ + " by 4am (" + __date__ + ")"

# chunk IDs for .a2r files
kA2R2 = b"A2R2"
kINFO = b"INFO"
kSTRM = b"STRM"
kMETA = b"META"
# other things defined in the .a2r specification
kLanguages = ("English","Spanish","French","German","Chinese","Japanese","Italian","Dutch","Portuguese","Danish","Finnish","Norwegian","Swedish","Russian","Polish","Turkish","Arabic","Thai","Czech","Hungarian","Catalan","Croatian","Greek","Hebrew","Romanian","Slovak","Ukrainian","Indonesian","Malay","Vietnamese","Other")
kRequiresRAM = ("16K","24K","32K","48K","64K","128K","256K","512K","768K","1M","1.25M","1.5M+","Unknown")
kRequiresMachine = ("2","2+","2e","2c","2e+","2gs","2c+","3","3+")
kCaptureTiming = 1
kCaptureBits = 2
kCaptureXTiming = 3

# strings and things, for print routines and error messages
sEOF = "Unexpected EOF"
sBadChunkSize = "Bad chunk size"
dNoYes = {False:"no",True:"yes"}
tQuarters = (".00",".25",".50",".75")
dTiming = {kCaptureTiming:"timing",kCaptureBits:"bits",kCaptureXTiming:"xtiming"}

# errors that may be raised
class A2RError(Exception): pass # base class
class A2REOFError(A2RError): pass
class A2RFormatError(A2RError): pass
class A2RHeaderError(A2RError): pass
class A2RHeaderError_NoA2R2(A2RHeaderError): pass
class A2RHeaderError_NoFF(A2RHeaderError): pass
class A2RHeaderError_NoLF(A2RHeaderError): pass
class A2RINFOFormatError(A2RFormatError): pass
class A2RINFOFormatError_BadVersion(A2RINFOFormatError): pass
class A2RINFOFormatError_BadDiskType(A2RINFOFormatError): pass
class A2RINFOFormatError_BadWriteProtected(A2RINFOFormatError): pass
class A2RINFOFormatError_BadSynchronized(A2RINFOFormatError): pass
class A2RINFOFormatError_BadCleaned(A2RINFOFormatError): pass
class A2RINFOFormatError_BadCreator(A2RINFOFormatError): pass
class A2RSTRMFormatError(A2RFormatError): pass
class A2RMETAFormatError(A2RFormatError): pass
# The specific META errors derive from A2RMETAFormatError (itself an
# A2RFormatError) so that "except A2RMETAFormatError" catches all of them.
class A2RMETAFormatError_DuplicateKey(A2RMETAFormatError): pass
class A2RMETAFormatError_BadValue(A2RMETAFormatError): pass
class A2RMETAFormatError_BadLanguage(A2RMETAFormatError): pass
class A2RMETAFormatError_BadRAM(A2RMETAFormatError): pass
class A2RMETAFormatError_BadMachine(A2RMETAFormatError): pass

class A2RParseError(A2RError):
    pass

def from_uint32(b):
    """Decode a little-endian byte string (of any length) into an int."""
    return int.from_bytes(b, byteorder="little")

# int.from_bytes does not care about the length, so one decoder serves both widths
from_uint16 = from_uint32

def to_uint32(b):
    """Encode int `b` as 4 little-endian bytes (OverflowError if it does not fit)."""
    return b.to_bytes(4, byteorder="little")

def to_uint16(b):
    """Encode int `b` as 2 little-endian bytes (OverflowError if it does not fit)."""
    return b.to_bytes(2, byteorder="little")

def to_uint8(b):
    """Encode int (or bool) `b` as a single byte (OverflowError if it does not fit)."""
    return b.to_bytes(1, byteorder="little")

def raise_if(cond, e, s=""):
    """Raise exception class `e` with message `s` when `cond` is true.

    When this file is run as a script, this name is rebound to a version
    that exits with a one-line message instead of raising.
    """
    if cond:
        raise e(s)

class DiskImage: # base class
    def __init__(self, filename=None, stream=None):
        """Remember the filename; raises A2RError if neither filename nor stream is given."""
        raise_if(not filename and not stream, A2RError, "no input")
        self.filename = filename
        self.tracks = []

    def seek(self, track_num):
        """returns Track object for the given track, or None if the track is not part of this disk image. track_num can be 0..40 in 0.25 increments (0, 0.25, 0.5, 0.75, 1, &c.)"""
        return None

class A2RValidator:
    """Mixin with the validation and encode/decode helpers shared by reader and writer.

    The validate_info_* methods take the raw single-byte field (a bytes object of length 1).
    """

    def validate_info_version(self, version):
        raise_if(version != b'\x01', A2RINFOFormatError_BadVersion, "Unknown version (expected 1, found %s)" % version)

    def validate_info_disk_type(self, disk_type):
        raise_if(disk_type not in (b'\x01',b'\x02'), A2RINFOFormatError_BadDiskType, "Unknown disk type (expected 1 or 2, found %s)" % disk_type)

    def validate_info_write_protected(self, write_protected):
        raise_if(write_protected not in (b'\x00',b'\x01'), A2RINFOFormatError_BadWriteProtected, "Unknown write protected flag (expected 0 or 1, found %s)" % write_protected)

    def validate_info_synchronized(self, synchronized):
        raise_if(synchronized not in (b'\x00',b'\x01'), A2RINFOFormatError_BadSynchronized, "Unknown synchronized flag (expected 0, or 1, found %s)" % synchronized)

    def validate_info_creator(self, creator_as_bytes):
        """Raise A2RINFOFormatError_BadCreator unless the creator is at most 32 bytes of valid UTF-8."""
        raise_if(len(creator_as_bytes) > 32, A2RINFOFormatError_BadCreator, "Creator is longer than 32 bytes")
        try:
            creator_as_bytes.decode("UTF-8")
        except UnicodeDecodeError:
            raise_if(True, A2RINFOFormatError_BadCreator, "Creator is not valid UTF-8")

    def encode_info_creator(self, creator_as_string):
        """Return the creator encoded as UTF-8, space-padded to 32 bytes, and validated."""
        creator_as_bytes = creator_as_string.encode("UTF-8").ljust(32, b" ")
        self.validate_info_creator(creator_as_bytes)
        return creator_as_bytes

    def decode_info_creator(self, creator_as_bytes):
        """Validate the raw creator field and return it as a stripped string."""
        self.validate_info_creator(creator_as_bytes)
        return creator_as_bytes.decode("UTF-8").strip()

    def validate_metadata(self, metadata_as_bytes):
        """Raise A2RMETAFormatError unless the raw META payload is valid UTF-8."""
        try:
            metadata_as_bytes.decode("UTF-8")
        except UnicodeDecodeError:
            # go through raise_if so script mode reports this like every other format error
            raise_if(True, A2RMETAFormatError, "Metadata is not valid UTF-8")

    def decode_metadata(self, metadata_as_bytes):
        """Validate the raw META payload and return it as a string."""
        self.validate_metadata(metadata_as_bytes)
        return metadata_as_bytes.decode("UTF-8")

    def validate_metadata_value(self, value):
        """Reject values containing the characters the META chunk uses as separators."""
        raise_if("\t" in value, A2RMETAFormatError_BadValue, "Invalid metadata value (contains tab character)")
        raise_if("\n" in value, A2RMETAFormatError_BadValue, "Invalid metadata value (contains linefeed character)")
        raise_if("|" in value, A2RMETAFormatError_BadValue, "Invalid metadata value (contains pipe character)")

    def validate_metadata_language(self, language):
        # an empty value is accepted; anything else must be listed in kLanguages
        raise_if(language and (language not in kLanguages), A2RMETAFormatError_BadLanguage, "Invalid metadata language")

    def validate_metadata_requires_ram(self, requires_ram):
        # an empty value is accepted; anything else must be listed in kRequiresRAM
        raise_if(requires_ram and (requires_ram not in kRequiresRAM), A2RMETAFormatError_BadRAM, "Invalid metadata requires_ram")

    def validate_metadata_requires_machine(self, requires_machine):
        # an empty value is accepted; anything else must be listed in kRequiresMachine
        raise_if(requires_machine and (requires_machine not in kRequiresMachine), A2RMETAFormatError_BadMachine, "Invalid metadata requires_machine")

class A2RReader(DiskImage, A2RValidator):
    """Parse an .a2r image into the ordered dicts self.info, self.meta and self.flux."""

    def __init__(self, filename=None, stream=None):
        """Read the image from `stream` if given, otherwise from the file `filename`.

        The stream is used as a context manager, so it is closed once parsing
        is done. Chunks with an unrecognized ID are skipped.
        Raises A2REOFError on short reads and A2RHeaderError / A2RFormatError
        subclasses on malformed content.
        """
        DiskImage.__init__(self, filename, stream)
        self.info = collections.OrderedDict()
        self.meta = collections.OrderedDict()
        self.flux = collections.OrderedDict()

        with stream or open(filename, "rb") as f:
            header_raw = f.read(8)
            raise_if(len(header_raw) != 8, A2REOFError, sEOF)
            self.__process_header(header_raw)
            while True:
                chunk_id = f.read(4)
                if not chunk_id:
                    break # clean end of file
                raise_if(len(chunk_id) != 4, A2REOFError, sEOF)
                chunk_size_raw = f.read(4)
                raise_if(len(chunk_size_raw) != 4, A2REOFError, sEOF)
                chunk_size = from_uint32(chunk_size_raw)
                data = f.read(chunk_size)
                raise_if(len(data) != chunk_size, A2REOFError, sEOF)
                if chunk_id == kINFO:
                    raise_if(chunk_size != 36, A2RFormatError, sBadChunkSize)
                    self.__process_info(data)
                elif chunk_id == kSTRM:
                    self.__process_strm(data)
                elif chunk_id == kMETA:
                    self.__process_meta(data)

    def __process_header(self, data):
        """Check the 8 magic bytes at the start of the file."""
        raise_if(data[:4] != kA2R2, A2RHeaderError_NoA2R2, "Magic string 'A2R2' not present at offset 0")
        raise_if(data[4] != 0xFF, A2RHeaderError_NoFF, "Magic byte 0xFF not present at offset 4")
        raise_if(data[5:8] != b"\x0A\x0D\x0A", A2RHeaderError_NoLF, "Magic bytes 0x0A0D0A not present at offset 5")

    def __process_info(self, data):
        """Validate the 36-byte INFO chunk and store its fields in self.info."""
        version = data[0]
        self.validate_info_version(to_uint8(version))
        disk_type = data[33]
        self.validate_info_disk_type(to_uint8(disk_type))
        write_protected = data[34]
        self.validate_info_write_protected(to_uint8(write_protected))
        synchronized = data[35]
        self.validate_info_synchronized(to_uint8(synchronized))
        creator = self.decode_info_creator(data[1:33])
        self.info["version"] = version # int
        self.info["disk_type"] = disk_type # int
        self.info["write_protected"] = (write_protected == 1) # boolean
        self.info["synchronized"] = (synchronized == 1) # boolean
        self.info["creator"] = creator # string

    def __process_strm(self, data):
        """Split a STRM chunk into capture records, grouped by location in self.flux.

        Each record is a 10-byte header (location, capture type, data length,
        tick count) followed by `data length` bytes; the chunk must end with 0xFF.
        Raises A2RSTRMFormatError if the terminator is missing (including an
        empty chunk) or a record does not fit inside the chunk.
        """
        raise_if(not data or data[-1] != 0xFF, A2RSTRMFormatError, "Missing phase reset at end of STRM chunk")
        end = len(data) - 1 # offset of the trailing 0xFF
        i = 0
        while i < end:
            raise_if(i + 10 > end, A2RSTRMFormatError, "Truncated capture header in STRM chunk")
            location = data[i]
            capture_type = data[i+1]
            data_length = from_uint32(data[i+2:i+6])
            tick_count = from_uint32(data[i+6:i+10])
            start = i + 10
            # without this check a bad length would silently yield a short capture
            raise_if(start + data_length > end, A2RSTRMFormatError, "Capture data extends past end of STRM chunk")
            self.flux.setdefault(location, []).append(
                {"capture_type": capture_type,
                 "data_length": data_length,
                 "tick_count": tick_count,
                 "data": data[start:start+data_length]}
            )
            i = start + data_length

    def __process_meta(self, metadata_as_bytes):
        """Parse the META chunk (one "key<TAB>value" per line) into self.meta.

        A value containing "|" is stored as a tuple of its parts, otherwise as a plain string.
        """
        metadata = self.decode_metadata(metadata_as_bytes)
        for line in metadata.split("\n"):
            if not line:
                continue
            columns_raw = line.split("\t")
            raise_if(len(columns_raw) != 2, A2RMETAFormatError, "Malformed metadata")
            key, value_raw = columns_raw
            raise_if(key in self.meta, A2RMETAFormatError_DuplicateKey, "Duplicate metadata key %s" % key)
            values = value_raw.split("|")
            if key == "language":
                validate = self.validate_metadata_language
            elif key == "requires_ram":
                validate = self.validate_metadata_requires_ram
            elif key == "requires_machine":
                validate = self.validate_metadata_requires_machine
            else:
                validate = None
            if validate:
                for value in values:
                    validate(value)
            # a conditional expression (not "and/or") so an empty value stays "" instead of ("",)
            self.meta[key] = values[0] if len(values) == 1 else tuple(values)

    def to_json(self):
        """Return the info and meta dictionaries as an indented JSON string."""
        j = {"a2r": {"info":self.info, "meta":self.meta}}
        return json.dumps(j, indent=2)

class A2RWriter(A2RValidator):
    """Build the bytes of an .a2r image from self.info, self.meta and self.flux."""

    def __init__(self, creator):
        self.info = collections.OrderedDict()
        self.meta = collections.OrderedDict()
        self.flux = collections.OrderedDict()
        # the argument used to be ignored; record it as the default creator
        self.info["creator"] = creator

    def from_json(self, json_string):
        """Merge the "meta" section of a JSON document (as produced by to_json) into self.meta."""
        j = json.loads(json_string)
        root = list(j)[-1] # the top-level key; its name is not checked
        self.meta.update(j[root]["meta"])

    def build_head(self):
        """Return the 8-byte file header."""
        chunk = bytearray()
        chunk.extend(kA2R2) # magic bytes
        chunk.extend(b"\xFF\x0A\x0D\x0A") # more magic bytes
        return chunk

    def build_info(self):
        """Return the INFO chunk, validating every field of self.info on the way."""
        chunk = bytearray()
        chunk.extend(kINFO) # chunk ID
        chunk.extend(to_uint32(36)) # chunk size (constant)
        version_raw = to_uint8(self.info["version"])
        self.validate_info_version(version_raw)
        creator_raw = self.encode_info_creator(self.info["creator"])
        disk_type_raw = to_uint8(self.info["disk_type"])
        self.validate_info_disk_type(disk_type_raw)
        write_protected_raw = to_uint8(self.info["write_protected"])
        self.validate_info_write_protected(write_protected_raw)
        synchronized_raw = to_uint8(self.info["synchronized"])
        self.validate_info_synchronized(synchronized_raw)
        chunk.extend(version_raw) # version (int, probably 1)
        chunk.extend(creator_raw) # creator
        chunk.extend(disk_type_raw) # disk type (1=5.25 inch, 2=3.5 inch)
        chunk.extend(write_protected_raw) # write-protected (0=no, 1=yes)
        chunk.extend(synchronized_raw) # tracks synchronized (0=no, 1=yes)
        return chunk

    def build_strm(self):
        """Return the STRM chunk holding every capture in self.flux, ending with 0xFF."""
        data_raw = bytearray()
        for location, captures in self.flux.items():
            for capture in captures:
                data_raw.extend(to_uint8(location)) # track where this capture happened
                data_raw.extend(to_uint8(capture["capture_type"])) # 1 = timing, 2 = bits, 3 = xtiming
                data_raw.extend(to_uint32(len(capture["data"]))) # data length in bytes
                data_raw.extend(to_uint32(capture["tick_count"])) # estimated loop point in ticks
                data_raw.extend(capture["data"])
        data_raw.extend(b"\xFF")
        chunk = bytearray()
        chunk.extend(kSTRM) # chunk ID
        chunk.extend(to_uint32(len(data_raw))) # chunk size
        chunk.extend(data_raw) # all stream data
        return chunk

    def build_meta(self):
        """Return the META chunk, or b"" when there is no metadata.

        Every value is validated; a value may be a string or a sequence of strings.
        """
        if not self.meta:
            return b""
        meta_tmp = {}
        for key, value_raw in self.meta.items():
            values = [value_raw] if isinstance(value_raw, str) else value_raw
            meta_tmp[key] = values
            for value in values:
                self.validate_metadata_value(value)
                if key == "language":
                    self.validate_metadata_language(value)
                elif key == "requires_ram":
                    self.validate_metadata_requires_ram(value)
                elif key == "requires_machine":
                    self.validate_metadata_requires_machine(value)
        lines = [k.encode("UTF-8") + b"\x09" + "|".join(v).encode("UTF-8")
                 for k, v in meta_tmp.items()]
        data = b"\x0A".join(lines) + b"\x0A"
        chunk = bytearray()
        chunk.extend(kMETA) # chunk ID
        chunk.extend(to_uint32(len(data))) # chunk size
        chunk.extend(data)
        return chunk

    def write(self, stream):
        """Write the complete image (header, INFO, STRM, META) to a binary stream."""
        stream.write(self.build_head())
        stream.write(self.build_info())
        stream.write(self.build_strm())
        stream.write(self.build_meta())

#---------- command line interface ----------

class BaseCommand:
    """A sub-command: registers its own argument parser and reads the image when called."""

    def __init__(self, name):
        self.name = name

    def setup(self, subparser, description=None, epilog=None, help=".a2r disk image", formatter_class=argparse.HelpFormatter):
        self.parser = subparser.add_parser(self.name, description=description, epilog=epilog, formatter_class=formatter_class)
        self.parser.add_argument("file", help=help)
        # lets the entry point dispatch with args.action(args)
        self.parser.set_defaults(action=self)

    def __call__(self, args):
        self.a2r_image = A2RReader(args.file)

class CommandVerify(BaseCommand):
    def __init__(self):
        BaseCommand.__init__(self, "verify")

    def setup(self, subparser):
        BaseCommand.setup(self, subparser,
                          description="Verify file structure and metadata of a .a2r disk image (produces no output unless a problem is found)")

class CommandDump(BaseCommand):
    kWidth = 30 # width of the label column

    def __init__(self):
        BaseCommand.__init__(self, "dump")

    def setup(self, subparser):
        BaseCommand.setup(self, subparser,
                          description="Print all available information and metadata in a .a2r disk image")

    def __call__(self, args):
        BaseCommand.__call__(self, args)
        self.print_flux()
        self.print_meta()
        self.print_info()

    def print_info(self):
        info = self.a2r_image.info
        print("INFO: Format version:".ljust(self.kWidth), "%d" % info["version"])
        print("INFO: Disk type:".ljust(self.kWidth), ("5.25-inch", "3.5-inch")[info["disk_type"]-1])
        print("INFO: Write protected:".ljust(self.kWidth), dNoYes[info["write_protected"]])
        print("INFO: Track synchronized:".ljust(self.kWidth), dNoYes[info["synchronized"]])
        print("INFO: Creator:".ljust(self.kWidth), info["creator"])

    def print_flux(self):
        for location, flux_records in self.a2r_image.flux.items():
            # locations count quarter tracks
            label = ("STRM: Track %d%s" % (location // 4, tQuarters[location % 4])).ljust(self.kWidth)
            for flux_record in flux_records:
                # the reader does not validate capture types, so do not crash on unknown ones
                print(label,
                      dTiming.get(flux_record["capture_type"], "unknown"), "capture,",
                      flux_record["tick_count"], "ticks")

    def print_meta(self):
        if not self.a2r_image.meta:
            return
        for key, values in self.a2r_image.meta.items():
            if isinstance(values, str):
                values = [values]
            print(("META: " + key + ":").ljust(self.kWidth), values[0])
            for value in values[1:]:
                print("META: ".ljust(self.kWidth), value)

class CommandExport(BaseCommand):
    def __init__(self):
        BaseCommand.__init__(self, "export")

    def setup(self, subparser):
        BaseCommand.setup(self, subparser,
                          description="Export (as JSON) all information and metadata from a .a2r disk image")

    def __call__(self, args):
        BaseCommand.__call__(self, args)
        print(self.a2r_image.to_json())

class WriterBaseCommand(BaseCommand):
    """Base for commands that modify the image in place; subclasses implement update()."""

    def __call__(self, args):
        BaseCommand.__call__(self, args)
        self.args = args
        # maintain creator if there is one, otherwise use default
        self.output = A2RWriter(self.a2r_image.info.get("creator", __displayname__))
        self.output.flux = self.a2r_image.flux.copy()
        self.output.info.update(self.a2r_image.info)
        self.output.meta = self.a2r_image.meta.copy()
        self.update()
        tmpfile = args.file + ".chery"
        try:
            with open(tmpfile, "wb") as f:
                self.output.write(f)
            # os.replace overwrites an existing target on every platform (os.rename fails on Windows)
            os.replace(tmpfile, args.file)
        finally:
            # still present only if writing or replacing failed; do not leave it behind
            if os.path.exists(tmpfile):
                os.remove(tmpfile)

class CommandEdit(WriterBaseCommand):
    def __init__(self):
        WriterBaseCommand.__init__(self, "edit")

    def setup(self, subparser):
        WriterBaseCommand.setup(self,
                                subparser,
                                description="Edit information and metadata in a .a2r disk image",
                                epilog="""Tips:

  - Use repeated flags to edit multiple fields at once.
  - Use "key:" with no value to delete a metadata field.
  - Keys are case-sensitive.
  - Some values have format restrictions; read the .a2r specification.""",
                                help=".a2r disk image (modified in place)",
                                formatter_class=argparse.RawDescriptionHelpFormatter)
        self.parser.add_argument("-i", "--info", type=str, action="append",
                                 help="""change information field.
INFO format is "key:value".
Acceptable keys are disk_type, write_protected, synchronized, creator, version.
Other keys are ignored.
For boolean fields, use "1" or "true" or "yes" for true, "0" or "false" or "no" for false.""")
        self.parser.add_argument("-m", "--meta", type=str, action="append",
                                 help="""change metadata field.
META format is "key:value".
Standard keys are title, subtitle, publisher, developer, copyright, version, language, requires_ram,
requires_machine, notes, side, side_name, contributor, image_date. Other keys are allowed.""")

    def update(self):
        """Apply the --info and --meta arguments in self.args to self.output.

        Raises A2RParseError for an argument without a colon, or for a
        version / disk_type value that is not an integer in 0..255.
        """
        # add all new info fields
        for i in self.args.info or ():
            raise_if(":" not in i, A2RParseError, "Invalid info field (expected key:value): %s" % i)
            k, v = i.split(":", 1)
            if k in ("write_protected","synchronized"):
                v = v.lower() in ("1", "true", "yes")
            elif k in ("version","disk_type"):
                # build_info encodes these as a single byte, so they must be small ints
                try:
                    v = int(v)
                except ValueError:
                    raise_if(True, A2RParseError, "Invalid value for %s (expected an integer)" % k)
                raise_if(not 0 <= v <= 255, A2RParseError, "Invalid value for %s (expected 0..255)" % k)
            self.output.info[k] = v
        # add all new metadata fields, and delete empty ones
        for m in self.args.meta or ():
            raise_if(":" not in m, A2RParseError, "Invalid metadata field (expected key:value): %s" % m)
            k, v = m.split(":", 1)
            v = v.split("|")
            if len(v) == 1:
                v = v[0]
            if v:
                self.output.meta[k] = v
            else:
                self.output.meta.pop(k, None)

class CommandImport(WriterBaseCommand):
    def __init__(self):
        WriterBaseCommand.__init__(self, "import")

    def setup(self, subparser):
        WriterBaseCommand.setup(self, subparser,
                                description="Import JSON file to update metadata in a .a2r disk image")

    def update(self):
        # the JSON document is read from standard input
        self.output.from_json(sys.stdin.read())

if __name__ == "__main__":
    # On the command line, report a failed check as a one-line message and a
    # non-zero exit status instead of a traceback. This rebinds the
    # module-level raise_if used by everything above.
    def raise_if(cond, e, s=""):
        if cond:
            sys.exit("%s: %s" % (e.__name__, s))

    cmds = [CommandDump(), CommandVerify(), CommandEdit(), CommandExport(), CommandImport()]
    parser = argparse.ArgumentParser(prog=__progname__,
                                     description="""A multi-purpose tool for manipulating .a2r disk images.

See '""" + __progname__ + """ <command> -h' for help on individual commands.""",
                                     formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument("-v", "--version", action="version", version=__displayname__)
    sp = parser.add_subparsers(dest="command", help="command")
    for command in cmds:
        command.setup(sp)
    args = parser.parse_args()
    # sub-commands are optional in Python 3, so "action" is missing when none was given
    if not hasattr(args, "action"):
        parser.error("no command given")
    args.action(args)