packet.py
  1  # The MIT License (MIT)
  2  #
  3  # Copyright (c) 2019 Dan Halbert for Adafruit Industries
  4  #
  5  # Permission is hereby granted, free of charge, to any person obtaining a copy
  6  # of this software and associated documentation files (the "Software"), to deal
  7  # in the Software without restriction, including without limitation the rights
  8  # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  9  # copies of the Software, and to permit persons to whom the Software is
 10  # furnished to do so, subject to the following conditions:
 11  #
 12  # The above copyright notice and this permission notice shall be included in
 13  # all copies or substantial portions of the Software.
 14  #
 15  # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 16  # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 17  # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 18  # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 19  # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 20  # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 21  # THE SOFTWARE.
 22  """
 23  `adafruit_bluefruit_connect.packet`
 24  ====================================================
 25  
 26  Bluefruit Connect App packet superclass
 27  
 28  * Author(s): Dan Halbert for Adafruit Industries
 29  
 30  """
 31  
 32  import struct
 33  
 34  
 35  class Packet:
 36      """
 37      A Bluefruit app controller packet. A packet consists of these bytes, in order:
 38  
 39        - '!' - The first byte is always an exclamation point.
 40        - *type* - A single byte designating the type of packet: b'A', b'B', etc.
 41        - *data ...* - Multiple bytes of data, varying by packet type.
 42        - *checksum* - A single byte checksum, computed by adding up all the data bytes and
 43            inverting the sum.
 44  
 45      This is an abstract class.
 46      """
 47  
 48      # All concrete subclasses should define these class attributes. They're listed here
 49      # as a reminder and to make pylint happy.
 50      # _FMT_PARSE is the whole packet.
 51      _FMT_PARSE = None
 52      # In each class, set PACKET_LENGTH = struct.calcsize(_FMT_PARSE).
 53      PACKET_LENGTH = None
 54      # _FMT_CONSTRUCT does not include the trailing byte, which is the checksum.
 55      _FMT_CONSTRUCT = None
 56      # The first byte of the prefix is always b'!'. The second byte is the type code.
 57      _TYPE_HEADER = None
 58  
 59      _type_to_class = dict()
 60  
 61      @classmethod
 62      def register_packet_type(cls):
 63          """Register a new packet type, using this class and its ``cls._TYPE_HEADER``.
 64          The ``from_bytes()`` and ``from_stream()`` methods will then be able
 65          to recognize this type of packet.
 66          """
 67  
 68          Packet._type_to_class[cls._TYPE_HEADER] = cls
 69  
 70      @classmethod
 71      def from_bytes(cls, packet):
 72          """Create an appropriate object of the correct class for the given packet bytes.
 73          Validate packet type, length, and checksum.
 74          """
 75          if len(packet) < 3:
 76              raise ValueError("Packet too short")
 77          packet_class = cls._type_to_class.get(packet[0:2], None)
 78          if not packet_class:
 79              raise ValueError("Unregistered packet type {}".format(packet[0:2]))
 80  
 81          # In case this was called from a subclass, make sure the parsed
 82          # type matches up with the current class.
 83          if not issubclass(packet_class, cls):
 84              raise ValueError("Packet type is not a {}".format(cls.__name__))
 85  
 86          if len(packet) != packet_class.PACKET_LENGTH:
 87              raise ValueError("Wrong length packet")
 88  
 89          if cls.checksum(packet[0:-1]) != packet[-1]:
 90              raise ValueError("Bad checksum")
 91  
 92          # A packet class may do further validation of the data.
 93          return packet_class.parse_private(packet)
 94  
 95      @classmethod
 96      def from_stream(cls, stream):
 97          """Read the next packet from the incoming stream. Wait as long as the timeout
 98          set on stream, using its own preset timeout.
 99          Return None if there was no input, otherwise return an instance
100          of one of the packet classes registered with ``Packet``.
101          Raise an Error if the packet was not recognized or was malformed
102  
103          :param stream stream: an input stream that provides standard stream read operations,
104            such as ``ble.UARTServer`` or ``busio.UART``.
105          """
106          # Loop looking for a b'!' packet start. If the buffer has overflowed,
107          # or there's been some other problem, we may need to skip some characters
108          # to get to a packet start.
109          while True:
110              start = stream.read(1)
111              if not start:
112                  # Timeout: nothing read.
113                  return None
114              if start == b"!":
115                  # Found start of packet.
116                  packet_type = stream.read(1)
117                  if not packet_type:
118                      # Timeout: nothing more read.
119                      return None
120                  break
121              # Didn't find a packet start. Loop and try again.
122  
123          header = start + packet_type
124          packet_class = cls._type_to_class.get(header, None)
125          if not packet_class:
126              raise ValueError("Unregistered packet type {}".format(header))
127          packet = header + stream.read(packet_class.PACKET_LENGTH - 2)
128          return cls.from_bytes(packet)
129  
130      @classmethod
131      def parse_private(cls, packet):
132          """Default implementation for subclasses.
133          Assumes arguments to ``__init__()`` are exactly the values parsed using
134          ``cls._FMT_PARSE``. Subclasses may need to reimplement if that assumption
135          is not correct.
136  
137          Do not call this directly. It's called from ``cls.from_bytes()``.
138          pylint makes it difficult to call this method _parse(), hence the name.
139          """
140          return cls(*struct.unpack(cls._FMT_PARSE, packet))
141  
142      @staticmethod
143      def checksum(partial_packet):
144          """Compute checksum for bytes, not including the checksum byte itself."""
145          return ~sum(partial_packet) & 0xFF
146  
147      def add_checksum(self, partial_packet):
148          """Compute the checksum of partial_packet and return a new bytes
149          with the checksum appended.
150          """
151          return partial_packet + bytes((self.checksum(partial_packet),))