/ adafruit_espatcontrol / adafruit_espatcontrol_socket.py
adafruit_espatcontrol_socket.py
 1  """A 'socket' compatible interface thru the ESP AT command set"""
 2  from micropython import const
 3  
 4  _the_interface = None  # pylint: disable=invalid-name
 5  
 6  
 7  def set_interface(iface):
 8      """Helper to set the global internet interface"""
 9      global _the_interface  # pylint: disable=global-statement, invalid-name
10      _the_interface = iface
11  
12  
# The only socket type and the only address family that socket.__init__
# accepts; any other value makes the constructor raise RuntimeError.
SOCK_STREAM = const(1)
AF_INET = const(2)
15  
16  # pylint: disable=too-many-arguments, unused-argument
def getaddrinfo(host, port, family=0, socktype=0, proto=0, flags=0):
    """Look 'host' up through the active interface and wrap the answer in a
    'socket.getaddrinfo'-shaped result: a list holding a single 5-tuple.

    Only 'host' and 'port' take part in the lookup. 'socktype' and 'proto' are
    echoed back unchanged in the tuple; 'family' and 'flags' are ignored.

    :param host: name passed to the interface's ``nslookup``
    :param port: port number, must be an ``int``
    :return: ``[(AF_INET, socktype, proto, "", (ipaddr, port))]``
    :raises RuntimeError: if 'port' is not an integer
    """
    if isinstance(port, int):
        sockaddr = (_the_interface.nslookup(host), port)
        return [(AF_INET, socktype, proto, "", sockaddr)]
    raise RuntimeError("port must be an integer")
24  
25  
26  # pylint: enable=too-many-arguments, unused-argument
27  
28  
29  # pylint: disable=unused-argument, redefined-builtin, invalid-name
class socket:
    """A simplified implementation of the Python 'socket' class, for connecting
    through an interface to a remote device.

    All traffic goes through the module-level interface object (the one stored
    by 'set_interface'); an instance only keeps a receive buffer and the read
    timeout it passes to that interface."""

    def __init__(self, family=AF_INET, type=SOCK_STREAM, proto=0, fileno=None):
        """Create an unconnected socket with an empty buffer and a timeout of 0.

        :param family: must be AF_INET
        :param type: must be SOCK_STREAM
        :param proto: accepted for signature compatibility, not used
        :param fileno: accepted for signature compatibility, not used
        :raises RuntimeError: for any other family or type
        """
        if family != AF_INET:
            raise RuntimeError("Only AF_INET family supported")
        if type != SOCK_STREAM:
            raise RuntimeError("Only SOCK_STREAM type supported")
        self._buffer = b""
        self.settimeout(0)

    def connect(self, address, conntype=None):
        """Connect the socket to the 'address' (which should be dotted quad IP). 'conntype'
        is an extra that may indicate SSL or not, depending on the underlying interface

        :param address: a ``(host, port)`` pair
        :param conntype: passed through unchanged to the interface's ``socket_connect``
        :raises RuntimeError: if the interface reports that the connection failed
        """
        host, port = address
        if not _the_interface.socket_connect(
            conntype, host, port, keepalive=10, retries=3
        ):
            raise RuntimeError("Failed to connect to host", host)
        # start the new connection with a clean buffer
        self._buffer = b""

    def send(self, data):  # pylint: disable=no-self-use
        """Send some data to the socket"""
        _the_interface.socket_send(data)

    def readline(self):
        """Attempt to return as many bytes as we can up to but not including '\\r\\n'.

        If the buffer holds no complete line, one receive (3 second timeout) is
        attempted first. If there is still no '\\r\\n' after that, everything
        buffered so far is returned and the buffer is emptied, rather than
        failing while trying to split on a terminator that is not there.

        :return: the line without its terminator (may be ``b""``)
        """
        if b"\r\n" not in self._buffer:
            # there's no line already in there, read some more
            self._buffer = self._buffer + _the_interface.socket_receive(timeout=3)
        pieces = self._buffer.split(b"\r\n", 1)
        if len(pieces) == 2:
            firstline, self._buffer = pieces
        else:
            # No terminator arrived: hand back the partial data instead of
            # raising ValueError on the two-name unpack.
            firstline, self._buffer = pieces[0], b""
        return firstline

    def recv(self, num=0):
        """Read up to 'num' bytes from the socket, this may be buffered internally!
        If 'num' isnt specified, return everything in the buffer plus whatever
        one receive from the interface yields.

        :param num: maximum number of bytes to return; 0 means "no limit"
        :return: the bytes read, possibly fewer than 'num'
        """
        if num == 0:
            # read as much as we can
            ret = self._buffer + _the_interface.socket_receive(timeout=self._timeout)
            self._buffer = b""
            return ret
        if len(self._buffer) < num:
            # Not enough buffered to satisfy the request: try one receive so a
            # caller is not handed a short (or empty) result while data waits.
            self._buffer = self._buffer + _the_interface.socket_receive(
                timeout=self._timeout
            )
        ret = self._buffer[:num]
        self._buffer = self._buffer[num:]
        return ret

    def close(self):
        """Close the socket, after reading whatever remains into the buffer.

        The disconnect is issued even if that final read raises, so a failed
        drain cannot leave the connection open; the exception still propagates.
        """
        try:
            # read whatever's left
            self._buffer = self._buffer + _the_interface.socket_receive(
                timeout=self._timeout
            )
        finally:
            _the_interface.socket_disconnect()

    def settimeout(self, value):
        """Set the read timeout for sockets, if value is 0 it will block

        The value is stored and passed as ``timeout`` to the interface's
        ``socket_receive`` by 'recv' and 'close'; 'readline' uses its own fixed
        timeout. NOTE(review): what a timeout of 0 actually does is decided by
        the interface's ``socket_receive`` - confirm the "will block" claim there.
        """
        self._timeout = value
88  
89  
90  # pylint: enable=unused-argument, redefined-builtin, invalid-name