/ tools / mkdfu.py
mkdfu.py
  1  #!/usr/bin/env python
  2  #
  3  # Copyright 2020 Espressif Systems (Shanghai) PTE LTD
  4  #
  5  # Licensed under the Apache License, Version 2.0 (the "License");
  6  # you may not use this file except in compliance with the License.
  7  # You may obtain a copy of the License at
  8  #
  9  #     http://www.apache.org/licenses/LICENSE-2.0
 10  #
 11  # Unless required by applicable law or agreed to in writing, software
 12  # distributed under the License is distributed on an "AS IS" BASIS,
 13  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14  # See the License for the specific language governing permissions and
 15  # limitations under the License.
 16  #
 17  # This program creates archives compatible with ESP32-S* ROM DFU implementation.
 18  #
 19  # The archives are in CPIO format. Each file which needs to be flashed is added to the archive
 20  # as a separate file. In addition to that, a special index file, 'dfuinfo0.dat', is created.
 21  # This file must be the first one in the archive. It contains binary structures describing each
 22  # subsequent file (for example, where the file needs to be flashed/loaded).
 23  
 24  from collections import namedtuple
 25  from future.utils import iteritems
 26  import argparse
 27  import hashlib
 28  import json
 29  import os
 30  import struct
 31  import zlib
 32  
 33  try:
 34      import typing
 35  except ImportError:
 36      # Only used for type annotations
 37      pass
 38  
 39  try:
 40      from itertools import izip as zip
 41  except ImportError:
 42      # Python 3
 43      pass
 44  
 45  # CPIO ("new ASCII") format related things
 46  CPIO_MAGIC = b"070701"
 47  CPIO_STRUCT = b"=6s" + b"8s" * 13
 48  CPIOHeader = namedtuple(
 49      "CPIOHeader",
 50      [
 51          "magic",
 52          "ino",
 53          "mode",
 54          "uid",
 55          "gid",
 56          "nlink",
 57          "mtime",
 58          "filesize",
 59          "devmajor",
 60          "devminor",
 61          "rdevmajor",
 62          "rdevminor",
 63          "namesize",
 64          "check",
 65      ],
 66  )
 67  CPIO_TRAILER = "TRAILER!!!"
 68  
 69  
 70  def make_cpio_header(
 71      filename_len, file_len, is_trailer=False
 72  ):  # type: (int, int, bool) -> CPIOHeader
 73      """ Returns CPIOHeader for the given file name and file size """
 74  
 75      def as_hex(val):  # type: (int) -> bytes
 76          return "{:08x}".format(val).encode("ascii")
 77  
 78      hex_0 = as_hex(0)
 79      mode = hex_0 if is_trailer else as_hex(0o0100644)
 80      nlink = as_hex(1) if is_trailer else hex_0
 81      return CPIOHeader(
 82          magic=CPIO_MAGIC,
 83          ino=hex_0,
 84          mode=mode,
 85          uid=hex_0,
 86          gid=hex_0,
 87          nlink=nlink,
 88          mtime=hex_0,
 89          filesize=as_hex(file_len),
 90          devmajor=hex_0,
 91          devminor=hex_0,
 92          rdevmajor=hex_0,
 93          rdevminor=hex_0,
 94          namesize=as_hex(filename_len),
 95          check=hex_0,
 96      )
 97  
 98  
 99  # DFU format related things
100  # Structure of one entry in dfuinfo0.dat
101  DFUINFO_STRUCT = b"<I I 64s 16s"
102  DFUInfo = namedtuple("DFUInfo", ["address", "flags", "name", "md5"])
103  DFUINFO_FILE = "dfuinfo0.dat"
104  # Structure which gets added at the end of the entire DFU file
105  DFUSUFFIX_STRUCT = b"<H H H H 3s B"
106  DFUSuffix = namedtuple(
107      "DFUSuffix", ["bcd_device", "pid", "vid", "bcd_dfu", "sig", "len"]
108  )
109  ESPRESSIF_VID = 12346
110  # This CRC32 gets added after DFUSUFFIX_STRUCT
111  DFUCRC_STRUCT = b"<I"
112  
113  
114  def dfu_crc(data, crc=0):  # type: (bytes, int) -> int
115      """ Calculate CRC32/JAMCRC of data, with an optional initial value """
116      uint32_max = 0xFFFFFFFF
117      return uint32_max - (zlib.crc32(data, crc) & uint32_max)
118  
119  
120  def pad_bytes(b, multiple, padding=b"\x00"):  # type: (bytes, int, bytes) -> bytes
121      """ Pad 'b' to a length divisible by 'multiple' """
122      padded_len = (len(b) + multiple - 1) // multiple * multiple
123      return b + padding * (padded_len - len(b))
124  
125  
126  class EspDfuWriter(object):
127      def __init__(self, dest_file, pid):  # type: (typing.BinaryIO) -> None
128          self.dest = dest_file
129          self.pid = pid
130          self.entries = []  # type: typing.List[bytes]
131          self.index = []  # type: typing.List[DFUInfo]
132  
133      def add_file(self, flash_addr, path):  # type: (int, str) -> None
134          """ Add file to be written into flash at given address """
135          with open(path, "rb") as f:
136              self._add_cpio_flash_entry(os.path.basename(path), flash_addr, f.read())
137  
138      def finish(self):  # type: () -> None
139          """ Write DFU file """
140          # Prepare and add dfuinfo0.dat file
141          dfuinfo = b"".join([struct.pack(DFUINFO_STRUCT, *item) for item in self.index])
142          self._add_cpio_entry(DFUINFO_FILE, dfuinfo, first=True)
143  
144          # Add CPIO archive trailer
145          self._add_cpio_entry(CPIO_TRAILER, b"", trailer=True)
146  
147          # Combine all the entries and pad the file
148          out_data = b"".join(self.entries)
149          cpio_block_size = 10240
150          out_data = pad_bytes(out_data, cpio_block_size)
151  
152          # Add DFU suffix and CRC
153          dfu_suffix = DFUSuffix(0xFFFF, self.pid, ESPRESSIF_VID, 0x0100, b"UFD", 16)
154          out_data += struct.pack(DFUSUFFIX_STRUCT, *dfu_suffix)
155          out_data += struct.pack(DFUCRC_STRUCT, dfu_crc(out_data))
156  
157          # Finally write the entire binary
158          self.dest.write(out_data)
159  
160      def _add_cpio_flash_entry(
161          self, filename, flash_addr, data
162      ):  # type: (str, int, bytes) -> None
163          md5 = hashlib.md5()
164          md5.update(data)
165          self.index.append(
166              DFUInfo(
167                  address=flash_addr,
168                  flags=0,
169                  name=filename.encode("utf-8"),
170                  md5=md5.digest(),
171              )
172          )
173          self._add_cpio_entry(filename, data)
174  
175      def _add_cpio_entry(
176          self, filename, data, first=False, trailer=False
177      ):  # type: (str, bytes, bool, bool) -> None
178          filename_b = filename.encode("utf-8") + b"\x00"
179          cpio_header = make_cpio_header(len(filename_b), len(data), is_trailer=trailer)
180          entry = pad_bytes(
181              struct.pack(CPIO_STRUCT, *cpio_header) + filename_b, 4
182          ) + pad_bytes(data, 4)
183          if not first:
184              self.entries.append(entry)
185          else:
186              self.entries.insert(0, entry)
187  
188  
189  def action_write(args):
190      writer = EspDfuWriter(args['output_file'], args['pid'])
191      for addr, f in args['files']:
192          print('Adding {} at {:#x}'.format(f, addr))
193          writer.add_file(addr, f)
194      writer.finish()
195      print('"{}" has been written. You may proceed with DFU flashing.'.format(args['output_file'].name))
196  
197  
198  def main():
199      parser = argparse.ArgumentParser()
200  
201      # Provision to add "info" command
202      subparsers = parser.add_subparsers(dest="command")
203      write_parser = subparsers.add_parser("write")
204      write_parser.add_argument("-o", "--output-file",
205                                help='Filename for storing the output DFU image',
206                                required=True,
207                                type=argparse.FileType("wb"))
208      write_parser.add_argument("--pid",
209                                required=True,
210                                type=lambda h: int(h, 16),
211                                help='Hexa-decimal product indentificator')
212      write_parser.add_argument("--json",
213                                help='Optional file for loading "flash_files" dictionary with <address> <file> items')
214      write_parser.add_argument("files",
215                                metavar="<address> <file>", help='Add <file> at <address>',
216                                nargs="*")
217  
218      args = parser.parse_args()
219  
220      def check_file(file_name):
221          if not os.path.isfile(file_name):
222              raise RuntimeError('{} is not a regular file!'.format(file_name))
223          return file_name
224  
225      files = []
226      if args.files:
227          files += [(int(addr, 0), check_file(f_name)) for addr, f_name in zip(args.files[::2], args.files[1::2])]
228  
229      if args.json:
230          json_dir = os.path.dirname(os.path.abspath(args.json))
231  
232          def process_json_file(path):
233              '''
234              The input path is relative to json_dir. This function makes it relative to the current working
235              directory.
236              '''
237              return check_file(os.path.relpath(os.path.join(json_dir, path), start=os.curdir))
238  
239          with open(args.json) as f:
240              files += [(int(addr, 0),
241                         process_json_file(f_name)) for addr, f_name in iteritems(json.load(f)['flash_files'])]
242  
243      files = sorted([(addr, f_name) for addr, f_name in iteritems(dict(files))],
244                     key=lambda x: x[0])  # remove possible duplicates and sort based on the address
245  
246      cmd_args = {'output_file': args.output_file,
247                  'files': files,
248                  'pid': args.pid,
249                  }
250  
251      {'write': action_write
252       }[args.command](cmd_args)
253  
254  
255  if __name__ == "__main__":
256      main()