/ adafruit_espatcontrol / adafruit_espatcontrol_socket.py
adafruit_espatcontrol_socket.py
1 """A 'socket' compatible interface thru the ESP AT command set""" 2 from micropython import const 3 4 _the_interface = None # pylint: disable=invalid-name 5 6 7 def set_interface(iface): 8 """Helper to set the global internet interface""" 9 global _the_interface # pylint: disable=global-statement, invalid-name 10 _the_interface = iface 11 12 13 SOCK_STREAM = const(1) 14 AF_INET = const(2) 15 16 # pylint: disable=too-many-arguments, unused-argument 17 def getaddrinfo(host, port, family=0, socktype=0, proto=0, flags=0): 18 """Given a hostname and a port name, return a 'socket.getaddrinfo' 19 compatible list of tuples. Honestly, we ignore anything but host & port""" 20 if not isinstance(port, int): 21 raise RuntimeError("port must be an integer") 22 ipaddr = _the_interface.nslookup(host) 23 return [(AF_INET, socktype, proto, "", (ipaddr, port))] 24 25 26 # pylint: enable=too-many-arguments, unused-argument 27 28 29 # pylint: disable=unused-argument, redefined-builtin, invalid-name 30 class socket: 31 """A simplified implementation of the Python 'socket' class, for connecting 32 through an interface to a remote device""" 33 34 def __init__(self, family=AF_INET, type=SOCK_STREAM, proto=0, fileno=None): 35 if family != AF_INET: 36 raise RuntimeError("Only AF_INET family supported") 37 if type != SOCK_STREAM: 38 raise RuntimeError("Only SOCK_STREAM type supported") 39 self._buffer = b"" 40 self.settimeout(0) 41 42 def connect(self, address, conntype=None): 43 """Connect the socket to the 'address' (which should be dotted quad IP). 'conntype' 44 is an extra that may indicate SSL or not, depending on the underlying interface""" 45 host, port = address 46 if not _the_interface.socket_connect( 47 conntype, host, port, keepalive=10, retries=3 48 ): 49 raise RuntimeError("Failed to connect to host", host) 50 self._buffer = b"" 51 52 def send(self, data): # pylint: disable=no-self-use 53 """Send some data to the socket""" 54 _the_interface.socket_send(data) 55 56 def readline(self): 57 """Attempt to return as many bytes as we can up to but not including '\r\n'""" 58 if b"\r\n" not in self._buffer: 59 # there's no line already in there, read some more 60 self._buffer = self._buffer + _the_interface.socket_receive(timeout=3) 61 # print(self._buffer) 62 firstline, self._buffer = self._buffer.split(b"\r\n", 1) 63 return firstline 64 65 def recv(self, num=0): 66 """Read up to 'num' bytes from the socket, this may be buffered internally! 67 If 'num' isnt specified, return everything in the buffer.""" 68 if num == 0: 69 # read as much as we can 70 ret = self._buffer + _the_interface.socket_receive(timeout=self._timeout) 71 self._buffer = b"" 72 else: 73 ret = self._buffer[:num] 74 self._buffer = self._buffer[num:] 75 return ret 76 77 def close(self): 78 """Close the socket, after reading whatever remains""" 79 # read whatever's left 80 self._buffer = self._buffer + _the_interface.socket_receive( 81 timeout=self._timeout 82 ) 83 _the_interface.socket_disconnect() 84 85 def settimeout(self, value): 86 """Set the read timeout for sockets, if value is 0 it will block""" 87 self._timeout = value 88 89 90 # pylint: enable=unused-argument, redefined-builtin, invalid-name