/ a2woz / a2rchery.py
a2rchery.py
  1  #!/usr/bin/env python3
  2  
  3  # SPDX-FileCopyrightText: 2019 4am
  4  #
  5  # SPDX-License-Identifier: MIT
  6  
  7  # (c) 2018 by 4am
  8  # MIT-licensed
  9  
 10  import argparse
 11  import collections
 12  import json
 13  import os
 14  import sys
 15  
 16  __version__ = "1.0"
 17  __date__ = "2018-09-08"
 18  __progname__ = "a2rchery"
 19  __displayname__ = __progname__ + " by 4am (" + __date__ + ")"
 20  
 21  # chunk IDs for .a2r files
 22  kA2R2 = b"A2R2"
 23  kINFO = b"INFO"
 24  kSTRM = b"STRM"
 25  kMETA = b"META"
 26  # other things defined in the .a2r specification
 27  kLanguages = ("English","Spanish","French","German","Chinese","Japanese","Italian","Dutch","Portuguese","Danish","Finnish","Norwegian","Swedish","Russian","Polish","Turkish","Arabic","Thai","Czech","Hungarian","Catalan","Croatian","Greek","Hebrew","Romanian","Slovak","Ukrainian","Indonesian","Malay","Vietnamese","Other")
 28  kRequiresRAM = ("16K","24K","32K","48K","64K","128K","256K","512K","768K","1M","1.25M","1.5M+","Unknown")
 29  kRequiresMachine = ("2","2+","2e","2c","2e+","2gs","2c+","3","3+")
 30  kCaptureTiming = 1
 31  kCaptureBits = 2
 32  kCaptureXTiming = 3
 33  
 34  # strings and things, for print routines and error messages
 35  sEOF = "Unexpected EOF"
 36  sBadChunkSize = "Bad chunk size"
 37  dNoYes = {False:"no",True:"yes"}
 38  tQuarters = (".00",".25",".50",".75")
 39  dTiming = {kCaptureTiming:"timing",kCaptureBits:"bits",kCaptureXTiming:"xtiming"}
 40  
 41  # errors that may be raised
 42  class A2RError(Exception): pass # base class
 43  class A2REOFError(A2RError): pass
 44  class A2RFormatError(A2RError): pass
 45  class A2RHeaderError(A2RError): pass
 46  class A2RHeaderError_NoA2R2(A2RHeaderError): pass
 47  class A2RHeaderError_NoFF(A2RHeaderError): pass
 48  class A2RHeaderError_NoLF(A2RHeaderError): pass
 49  class A2RINFOFormatError(A2RFormatError): pass
 50  class A2RINFOFormatError_BadVersion(A2RINFOFormatError): pass
 51  class A2RINFOFormatError_BadDiskType(A2RINFOFormatError): pass
 52  class A2RINFOFormatError_BadWriteProtected(A2RINFOFormatError): pass
 53  class A2RINFOFormatError_BadSynchronized(A2RINFOFormatError): pass
 54  class A2RINFOFormatError_BadCleaned(A2RINFOFormatError): pass
 55  class A2RINFOFormatError_BadCreator(A2RINFOFormatError): pass
 56  class A2RSTRMFormatError(A2RFormatError): pass
 57  class A2RMETAFormatError(A2RFormatError): pass
 58  class A2RMETAFormatError_DuplicateKey(A2RFormatError): pass
 59  class A2RMETAFormatError_BadValue(A2RFormatError): pass
 60  class A2RMETAFormatError_BadLanguage(A2RFormatError): pass
 61  class A2RMETAFormatError_BadRAM(A2RFormatError): pass
 62  class A2RMETAFormatError_BadMachine(A2RFormatError): pass
 63  
 64  class A2RParseError(A2RError):
 65      pass
 66  
 67  def from_uint32(b):
 68      return int.from_bytes(b, byteorder="little")
 69  from_uint16=from_uint32
 70  
 71  def to_uint32(b):
 72      return b.to_bytes(4, byteorder="little")
 73  
 74  def to_uint16(b):
 75      return b.to_bytes(2, byteorder="little")
 76  
 77  def to_uint8(b):
 78      return b.to_bytes(1, byteorder="little")
 79  
 80  def raise_if(cond, e, s=""):
 81      if cond: raise e(s)
 82  
 83  class DiskImage: # base class
 84      def __init__(self, filename=None, stream=None):
 85          raise_if(not filename and not stream, A2RError, "no input")
 86          self.filename = filename
 87          self.tracks = []
 88  
 89      def seek(self, track_num):
 90          """returns Track object for the given track, or None if the track is not part of this disk image. track_num can be 0..40 in 0.25 increments (0, 0.25, 0.5, 0.75, 1, &c.)"""
 91          return None
 92  
 93  class A2RValidator:
 94      def validate_info_version(self, version):
 95          raise_if(version != b'\x01', A2RINFOFormatError_BadVersion, "Unknown version (expected 1, found %s)" % version)
 96  
 97      def validate_info_disk_type(self, disk_type):
 98          raise_if(disk_type not in (b'\x01',b'\x02'), A2RINFOFormatError_BadDiskType, "Unknown disk type (expected 1 or 2, found %s)" % disk_type)
 99  
100      def validate_info_write_protected(self, write_protected):
101          raise_if(write_protected not in (b'\x00',b'\x01'), A2RINFOFormatError_BadWriteProtected, "Unknown write protected flag (expected 0 or 1, found %s)" % write_protected)
102  
103      def validate_info_synchronized(self, synchronized):
104          raise_if(synchronized not in (b'\x00',b'\x01'), A2RINFOFormatError_BadSynchronized, "Unknown synchronized flag (expected 0, or 1, found %s)" % synchronized)
105  
106      def validate_info_creator(self, creator_as_bytes):
107          raise_if(len(creator_as_bytes) > 32, A2RINFOFormatError_BadCreator, "Creator is longer than 32 bytes")
108          try:
109              creator_as_bytes.decode("UTF-8")
110          except:
111              raise_if(True, A2RINFOFormatError_BadCreator, "Creator is not valid UTF-8")
112  
113      def encode_info_creator(self, creator_as_string):
114          creator_as_bytes = creator_as_string.encode("UTF-8").ljust(32, b" ")
115          self.validate_info_creator(creator_as_bytes)
116          return creator_as_bytes
117  
118      def decode_info_creator(self, creator_as_bytes):
119          self.validate_info_creator(creator_as_bytes)
120          return creator_as_bytes.decode("UTF-8").strip()
121  
122      def validate_metadata(self, metadata_as_bytes):
123          try:
124              metadata = metadata_as_bytes.decode("UTF-8")
125          except:
126              raise A2RMETAFormatError("Metadata is not valid UTF-8")
127  
128      def decode_metadata(self, metadata_as_bytes):
129          self.validate_metadata(metadata_as_bytes)
130          return metadata_as_bytes.decode("UTF-8")
131  
132      def validate_metadata_value(self, value):
133          raise_if("\t" in value, A2RMETAFormatError_BadValue, "Invalid metadata value (contains tab character)")
134          raise_if("\n" in value, A2RMETAFormatError_BadValue, "Invalid metadata value (contains linefeed character)")
135          raise_if("|" in value, A2RMETAFormatError_BadValue, "Invalid metadata value (contains pipe character)")
136  
137      def validate_metadata_language(self, language):
138          raise_if(language and (language not in kLanguages), A2RMETAFormatError_BadLanguage, "Invalid metadata language")
139  
140      def validate_metadata_requires_ram(self, requires_ram):
141          raise_if(requires_ram and (requires_ram not in kRequiresRAM), A2RMETAFormatError_BadRAM, "Invalid metadata requires_ram")
142  
143      def validate_metadata_requires_machine(self, requires_machine):
144          raise_if(requires_machine and (requires_machine not in kRequiresMachine), A2RMETAFormatError_BadMachine, "Invalid metadata requires_machine")
145  
class A2RReader(DiskImage, A2RValidator):
    """Read and validate an .a2r file.

    After construction the parsed content is available as three ordered
    dictionaries:

    - info: "version" and "disk_type" (ints), "write_protected" and
      "synchronized" (booleans), "creator" (string)
    - meta: metadata key -> string, or a tuple of strings when the value
      held several "|"-separated entries
    - flux: location byte -> list of capture dictionaries with the keys
      "capture_type", "data_length", "tick_count" and "data"
    """

    def __init__(self, filename=None, stream=None):
        """Parse the given binary stream, or else the file named filename.

        A truthy stream takes precedence over filename.  It is used as a
        context manager, so its context is exited when parsing finishes
        (for an ordinary file object that closes it).  Chunks whose ID is
        not INFO, STRM or META are read and ignored.  Problems are reported
        through raise_if using the A2R*Error classes.
        """
        DiskImage.__init__(self, filename, stream)
        self.info = collections.OrderedDict()
        self.meta = collections.OrderedDict()
        self.flux = collections.OrderedDict()

        with stream or open(filename, "rb") as f:
            header_raw = f.read(8)
            raise_if(len(header_raw) != 8, A2REOFError, sEOF)
            self.__process_header(header_raw)
            while True:
                chunk_id = f.read(4)
                if not chunk_id: break  # clean end of file between chunks
                raise_if(len(chunk_id) != 4, A2REOFError, sEOF)
                chunk_size_raw = f.read(4)
                raise_if(len(chunk_size_raw) != 4, A2REOFError, sEOF)
                chunk_size = from_uint32(chunk_size_raw)
                data = f.read(chunk_size)
                raise_if(len(data) != chunk_size, A2REOFError, sEOF)
                if chunk_id == kINFO:
                    raise_if(chunk_size != 36, A2RFormatError, sBadChunkSize)
                    self.__process_info(data)
                elif chunk_id == kSTRM:
                    self.__process_strm(data)
                elif chunk_id == kMETA:
                    self.__process_meta(data)

    def __process_header(self, data):
        """Check the 8 magic bytes: 'A2R2', 0xFF, then 0x0A 0x0D 0x0A."""
        raise_if(data[:4] != kA2R2, A2RHeaderError_NoA2R2, "Magic string 'A2R2' not present at offset 0")
        raise_if(data[4] != 0xFF, A2RHeaderError_NoFF, "Magic byte 0xFF not present at offset 4")
        raise_if(data[5:8] != b"\x0A\x0D\x0A", A2RHeaderError_NoLF, "Magic bytes 0x0A0D0A not present at offset 5")

    def __process_info(self, data):
        """Validate the 36-byte INFO chunk and store its fields in self.info.

        Layout as read here: byte 0 version, bytes 1..32 creator, byte 33
        disk type, byte 34 write-protected flag, byte 35 synchronized flag.
        """
        version = data[0]
        self.validate_info_version(to_uint8(version))
        disk_type = data[33]
        self.validate_info_disk_type(to_uint8(disk_type))
        write_protected = data[34]
        self.validate_info_write_protected(to_uint8(write_protected))
        synchronized = data[35]
        self.validate_info_synchronized(to_uint8(synchronized))
        creator = self.decode_info_creator(data[1:33])
        self.info["version"] = version # int
        self.info["disk_type"] = disk_type # int
        self.info["write_protected"] = (write_protected == 1) # boolean
        self.info["synchronized"] = (synchronized == 1) # boolean
        self.info["creator"] = creator # string

    def __process_strm(self, data):
        """Split a STRM chunk into captures and append them to self.flux.

        Each capture is a 10-byte header (location byte, capture type byte,
        uint32 data length, uint32 tick count) followed by data-length bytes.
        The chunk must end with a 0xFF byte.
        """
        # "not data" is tested first so an empty chunk is reported as a
        # format error instead of failing with IndexError on data[-1].
        raise_if(not data or data[-1] != 0xFF, A2RSTRMFormatError, "Missing phase reset at end of STRM chunk")
        # NOTE(review): a capture whose declared length runs past the end of
        # the chunk is not rejected; its data is simply whatever bytes remain.
        i = 0
        while i < len(data) - 1:
            location = data[i]
            capture_type = data[i+1]
            data_length = from_uint32(data[i+2:i+6])
            tick_count = from_uint32(data[i+6:i+10])
            self.flux.setdefault(location, []).append(
                {"capture_type": capture_type,
                 "data_length": data_length,
                 "tick_count": tick_count,
                 "data": data[i+10:i+10+data_length]}
            )
            i = i + 10 + data_length

    def __process_meta(self, metadata_as_bytes):
        """Parse the META chunk (lines of "key<TAB>value") into self.meta.

        Blank lines are skipped.  A value containing "|" is stored as a
        tuple of its parts; otherwise it is stored as a plain string.
        Values of language, requires_ram and requires_machine are checked
        against the lists from the specification.
        """
        metadata = self.decode_metadata(metadata_as_bytes)
        validators = {
            "language": self.validate_metadata_language,
            "requires_ram": self.validate_metadata_requires_ram,
            "requires_machine": self.validate_metadata_requires_machine,
        }
        for line in metadata.split("\n"):
            if not line: continue
            columns_raw = line.split("\t")
            raise_if(len(columns_raw) != 2, A2RMETAFormatError, "Malformed metadata")
            key, value_raw = columns_raw
            raise_if(key in self.meta, A2RMETAFormatError_DuplicateKey, "Duplicate metadata key %s" % key)
            values = value_raw.split("|")
            validate = validators.get(key)
            if validate is not None:
                for value in values:
                    validate(value)
            # A conditional expression, not "and/or": an empty single value
            # must stay the string "" rather than become the tuple ("",).
            self.meta[key] = values[0] if len(values) == 1 else tuple(values)

    def to_json(self):
        """Return info and meta as an indented JSON document under the key "a2r".

        Flux data is not included.
        """
        j = {"a2r": {"info":self.info, "meta":self.meta}}
        return json.dumps(j, indent=2)
233  
class A2RWriter(A2RValidator):
    """Build an .a2r file from info, meta and flux dictionaries.

    The three attributes use the same layout A2RReader produces.  Callers
    fill them in (or replace them) and then call write().
    """

    def __init__(self, creator):
        """Start an empty image whose info records the given creator string.

        The creator is validated later, when build_info() encodes it.
        """
        self.info = collections.OrderedDict()
        # Previously the argument was silently dropped; keep it so that a
        # writer built from scratch carries the creator it was given.
        self.info["creator"] = creator
        self.meta = collections.OrderedDict()
        self.flux = collections.OrderedDict()

    def from_json(self, json_string):
        """Merge the "meta" object of a JSON document into self.meta.

        The object is looked up under the last top-level key of the
        document.  Only metadata is imported; info is left untouched.
        """
        j = json.loads(json_string)
        root = list(j)[-1]
        self.meta.update(j[root]["meta"])

    def build_head(self):
        """Return the 8-byte file header."""
        chunk = bytearray()
        chunk.extend(kA2R2) # magic bytes
        chunk.extend(b"\xFF\x0A\x0D\x0A") # more magic bytes
        return chunk

    def build_info(self):
        """Return the INFO chunk built from self.info.

        Every field is validated before anything is emitted.  A missing
        info key raises KeyError.
        """
        version_raw = to_uint8(self.info["version"])
        self.validate_info_version(version_raw)
        creator_raw = self.encode_info_creator(self.info["creator"])
        disk_type_raw = to_uint8(self.info["disk_type"])
        self.validate_info_disk_type(disk_type_raw)
        write_protected_raw = to_uint8(self.info["write_protected"])
        self.validate_info_write_protected(write_protected_raw)
        synchronized_raw = to_uint8(self.info["synchronized"])
        self.validate_info_synchronized(synchronized_raw)
        chunk = bytearray()
        chunk.extend(kINFO) # chunk ID
        chunk.extend(to_uint32(36)) # chunk size (constant: 1 + 32 + 1 + 1 + 1)
        chunk.extend(version_raw) # version (int, probably 1)
        chunk.extend(creator_raw) # creator
        chunk.extend(disk_type_raw) # disk type (1=5.25 inch, 2=3.5 inch)
        chunk.extend(write_protected_raw) # write-protected (0=no, 1=yes)
        chunk.extend(synchronized_raw) # tracks synchronized (0=no, 1=yes)
        return chunk

    def build_strm(self):
        """Return the STRM chunk holding every capture in self.flux.

        The stored "data_length" of a capture is ignored; the length written
        is always len(capture["data"]).
        """
        data_raw = bytearray()
        for location, captures in self.flux.items():
            for capture in captures:
                data_raw.extend(to_uint8(location)) # track where this capture happened
                data_raw.extend(to_uint8(capture["capture_type"])) # 1 = timing, 2 = bits, 3 = xtiming
                data_raw.extend(to_uint32(len(capture["data"]))) # data length in bytes
                data_raw.extend(to_uint32(capture["tick_count"])) # estimated loop point in ticks
                data_raw.extend(capture["data"])
        data_raw.extend(b"\xFF") # end-of-stream marker the reader insists on
        chunk = bytearray()
        chunk.extend(kSTRM) # chunk ID
        chunk.extend(to_uint32(len(data_raw))) # chunk size
        chunk.extend(data_raw) # all stream data
        return chunk

    def build_meta(self):
        """Return the META chunk, or b"" when there is no metadata.

        Each entry becomes a "key<TAB>value" line; multi-valued entries are
        joined with "|".  Values are validated before they are encoded.
        """
        if not self.meta: return b""
        validators = {
            "language": self.validate_metadata_language,
            "requires_ram": self.validate_metadata_requires_ram,
            "requires_machine": self.validate_metadata_requires_machine,
        }
        lines = []
        for key, value_raw in self.meta.items():
            # a single value is stored as a plain string, several as a sequence
            values = [value_raw] if isinstance(value_raw, str) else value_raw
            for value in values:
                self.validate_metadata_value(value)
            validate = validators.get(key)
            if validate is not None:
                for value in values:
                    validate(value)
            lines.append(key.encode("UTF-8") + b"\x09" + "|".join(values).encode("UTF-8"))
        data = b"\x0A".join(lines) + b"\x0A"
        chunk = bytearray()
        chunk.extend(kMETA) # chunk ID
        chunk.extend(to_uint32(len(data))) # chunk size
        chunk.extend(data)
        return chunk

    def write(self, stream):
        """Write header, INFO, STRM and META (in that order) to a binary stream."""
        stream.write(self.build_head())
        stream.write(self.build_info())
        stream.write(self.build_strm())
        stream.write(self.build_meta())
319  
320  #---------- command line interface ----------
321  
class BaseCommand:
    """Common behaviour of every subcommand: one positional file, opened with A2RReader."""

    def __init__(self, name):
        """Remember the subcommand name used on the command line."""
        self.name = name

    def setup(self, subparser, description=None, epilog=None, help=".a2r disk image", formatter_class=argparse.HelpFormatter):
        """Register this command on the given subparsers object.

        Creates the command's parser (kept as self.parser), adds the
        positional "file" argument and sets this object as the parsed
        namespace's "action" default.
        """
        parser_options = {
            "description": description,
            "epilog": epilog,
            "formatter_class": formatter_class,
        }
        self.parser = subparser.add_parser(self.name, **parser_options)
        command_parser = self.parser
        command_parser.add_argument("file", help=help)
        command_parser.set_defaults(action=self)

    def __call__(self, args):
        """Read args.file and keep the resulting reader as self.a2r_image."""
        reader = A2RReader(args.file)
        self.a2r_image = reader
333  
class CommandVerify(BaseCommand):
    """The "verify" subcommand: just reads the file, so any problem surfaces as an error."""

    def __init__(self):
        super().__init__("verify")

    def setup(self, subparser):
        """Register the "verify" subcommand with its description."""
        super().setup(
            subparser,
            description="Verify file structure and metadata of a .a2r disk image (produces no output unless a problem is found)",
        )
341  
class CommandDump(BaseCommand):
    """The "dump" subcommand: print flux captures, metadata and info."""

    kWidth = 30  # column width the labels are padded to

    def __init__(self):
        BaseCommand.__init__(self, "dump")

    def setup(self, subparser):
        """Register the "dump" subcommand with its description."""
        BaseCommand.setup(self, subparser,
                          description="Print all available information and metadata in a .a2r disk image")

    def __call__(self, args):
        """Read the file, then print the STRM, META and INFO sections in that order."""
        BaseCommand.__call__(self, args)
        self.print_flux()
        self.print_meta()
        self.print_info()

    def print_info(self):
        """Print one line per INFO field."""
        print("INFO:  Format version:".ljust(self.kWidth), "%d" % self.a2r_image.info["version"])
        print("INFO:  Disk type:".ljust(self.kWidth),           ("5.25-inch", "3.5-inch")[self.a2r_image.info["disk_type"]-1])
        print("INFO:  Write protected:".ljust(self.kWidth),     dNoYes[self.a2r_image.info["write_protected"]])
        print("INFO:  Track synchronized:".ljust(self.kWidth),  dNoYes[self.a2r_image.info["synchronized"]])
        print("INFO:  Creator:".ljust(self.kWidth),             self.a2r_image.info["creator"])

    def print_flux(self):
        """Print one line per capture: track, capture type and tick count.

        The location byte counts quarter tracks, so it is shown as the
        whole track number followed by .00/.25/.50/.75.
        """
        for location, flux_records in self.a2r_image.flux.items():
            # integer division: the whole-track part, same text "%d" produced before
            track_label = "STRM:  Track %d%s" % (location // 4, tQuarters[location % 4])
            for flux_record in flux_records:
                # The reader does not validate capture types, so fall back to
                # a placeholder rather than failing with KeyError.
                capture_name = dTiming.get(flux_record["capture_type"], "unknown")
                print(track_label.ljust(self.kWidth),
                      capture_name, "capture,",
                      flux_record["tick_count"], "ticks")

    def print_meta(self):
        """Print every metadata entry; extra values of an entry go on continuation lines."""
        if not self.a2r_image.meta: return
        for key, values in self.a2r_image.meta.items():
            if isinstance(values, str):
                values = [values]
            print(("META:  " + key + ":").ljust(self.kWidth), values[0])
            for value in values[1:]:
                print("META:  ".ljust(self.kWidth), value)
380  
class CommandExport(BaseCommand):
    """The "export" subcommand: print the image's info and metadata as JSON."""

    def __init__(self):
        super().__init__("export")

    def setup(self, subparser):
        """Register the "export" subcommand with its description."""
        super().setup(
            subparser,
            description="Export (as JSON) all information and metadata from a .a2r disk image",
        )

    def __call__(self, args):
        """Read the file and print its JSON representation to standard output."""
        super().__call__(args)
        exported = self.a2r_image.to_json()
        print(exported)
392  
class WriterBaseCommand(BaseCommand):
    """Base for subcommands that rewrite the image in place.

    Subclasses provide update(), which modifies self.output (an A2RWriter
    pre-filled with copies of the input's flux, info and meta).
    """

    def __call__(self, args):
        """Read args.file, apply update(), and replace the file with the result.

        The new image is written to "<file>.chery" first and only then
        moved over the original, so a failed write leaves the original
        untouched.
        """
        BaseCommand.__call__(self, args)
        self.args = args
        # maintain creator if there is one, otherwise use default
        self.output = A2RWriter(self.a2r_image.info.get("creator", __displayname__))
        self.output.flux = self.a2r_image.flux.copy()
        self.output.info = self.a2r_image.info.copy()
        self.output.meta = self.a2r_image.meta.copy()
        self.update()
        tmpfile = args.file + ".chery"
        try:
            with open(tmpfile, "wb") as f:
                self.output.write(f)
            # os.replace overwrites an existing target on every platform;
            # os.rename fails on Windows when the target exists.
            os.replace(tmpfile, args.file)
        except BaseException:
            # Also covers SystemExit raised by the command-line raise_if:
            # do not leave a half-written temporary file behind.
            try:
                os.remove(tmpfile)
            except OSError:
                pass
            raise
407  
class CommandEdit(WriterBaseCommand):
    """The "edit" subcommand: change info and metadata fields in place."""

    def __init__(self):
        WriterBaseCommand.__init__(self, "edit")

    def setup(self, subparser):
        """Register the "edit" subcommand with its -i/--info and -m/--meta options."""
        WriterBaseCommand.setup(self,
                                subparser,
                                description="Edit information and metadata in a .a2r disk image",
                                epilog="""Tips:

 - Use repeated flags to edit multiple fields at once.
 - Use "key:" with no value to delete a metadata field.
 - Keys are case-sensitive.
 - Some values have format restrictions; read the .a2r specification.""",
                                help=".a2r disk image (modified in place)",
                                formatter_class=argparse.RawDescriptionHelpFormatter)
        self.parser.add_argument("-i", "--info", type=str, action="append",
                                 help="""change information field.
INFO format is "key:value".
Acceptable keys are disk_type, write_protected, synchronized, creator, version.
Other keys are ignored.
For boolean fields, use "1" or "true" or "yes" for true, "0" or "false" or "no" for false.""")
        self.parser.add_argument("-m", "--meta", type=str, action="append",
                                 help="""change metadata field.
META format is "key:value".
Standard keys are title, subtitle, publisher, developer, copyright, version, language, requires_ram,
requires_machine, notes, side, side_name, contributor, image_date. Other keys are allowed.""")

    def update(self):
        """Apply the -i and -m options from self.args to self.output.

        Info values are converted to the types the writer expects: booleans
        for write_protected and synchronized, integers for version and
        disk_type; creator stays a string.  A metadata option with an empty
        value deletes that key.
        """
        # add all new info fields
        for i in self.args.info or ():
            k, v = i.split(":", 1)
            if k in ("write_protected","synchronized"):
                v = v.lower() in ("1", "true", "yes")
            elif k in ("version","disk_type"):
                # The writer encodes these with int.to_bytes, so the command
                # line text has to become an int first; left as a string it
                # crashed when the INFO chunk was built.
                try:
                    v = int(v)
                except ValueError:
                    raise_if(True, A2RINFOFormatError, "Invalid %s (expected an integer, found %s)" % (k, v))
            self.output.info[k] = v
        # add all new metadata fields, and delete empty ones
        for m in self.args.meta or ():
            k, v = m.split(":", 1)
            v = v.split("|")
            if len(v) == 1:
                v = v[0]
            if v:
                self.output.meta[k] = v
            else:
                # empty value: remove the key if present, otherwise nothing to do
                self.output.meta.pop(k, None)
453  
class CommandImport(WriterBaseCommand):
    """The "import" subcommand: update an image's metadata from JSON on standard input."""

    def __init__(self):
        super().__init__("import")

    def setup(self, subparser):
        """Register the "import" subcommand with its description."""
        super().setup(
            subparser,
            description="Import JSON file to update metadata in a .a2r disk image",
        )

    def update(self):
        """Read all of standard input and merge its metadata into self.output."""
        json_text = sys.stdin.read()
        self.output.from_json(json_text)
464  
if __name__ == "__main__":
    # When run as a script, rebind the module-level raise_if so that a failed
    # check ends the program with a one-line "<ErrorName>: <message>" (via
    # sys.exit) instead of an exception traceback.
    def raise_if(cond, e, s=""):
        if cond:
            sys.exit("%s: %s" % (e.__name__, s))

    cmds = [CommandDump(), CommandVerify(), CommandEdit(), CommandExport(), CommandImport()]
    parser = argparse.ArgumentParser(prog=__progname__,
                                     description="""A multi-purpose tool for manipulating .a2r disk images.

See '""" + __progname__ + """ <command> -h' for help on individual commands.""",
                                     formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument("-v", "--version", action="version", version=__displayname__)
    sp = parser.add_subparsers(dest="command", help="command")
    # Without this, running with no command left args.action undefined and
    # the program died with AttributeError; now argparse prints a usage error.
    sp.required = True
    for command in cmds:
        command.setup(sp)
    args = parser.parse_args()
    args.action(args)