/ ampy / pyboard.py
pyboard.py
  1  #!/usr/bin/env python
  2  
  3  """
  4  pyboard interface
  5  
  6  This module provides the Pyboard class, used to communicate with and
  7  control the pyboard over a serial USB connection.
  8  
  9  Example usage:
 10  
 11      import pyboard
 12      pyb = pyboard.Pyboard('/dev/ttyACM0')
 13  
 14  Or:
 15  
 16      pyb = pyboard.Pyboard('192.168.1.1')
 17  
 18  Then:
 19  
 20      pyb.enter_raw_repl()
 21      pyb.exec('pyb.LED(1).on()')
 22      pyb.exit_raw_repl()
 23  
 24  Note: if using Python2 then pyb.exec must be written as pyb.exec_.
 25  To run a script from the local machine on the board and print out the results:
 26  
 27      import pyboard
 28      pyboard.execfile('test.py', device='/dev/ttyACM0')
 29  
 30  This script can also be run directly.  To execute a local script, use:
 31  
 32      ./pyboard.py test.py
 33  
 34  Or:
 35  
 36      python pyboard.py test.py
 37  
 38  """
 39  
 40  import sys
 41  import time
 42  
 43  _rawdelay = None
 44  
 45  try:
 46      stdout = sys.stdout.buffer
 47  except AttributeError:
 48      # Python2 doesn't have buffer attr
 49      stdout = sys.stdout
 50  
 51  def stdout_write_bytes(b):
 52      b = b.replace(b"\x04", b"")
 53      stdout.write(b)
 54      stdout.flush()
 55  
 56  class PyboardError(BaseException):
 57      pass
 58  
 59  class TelnetToSerial:
 60      def __init__(self, ip, user, password, read_timeout=None):
 61          import telnetlib
 62          self.tn = telnetlib.Telnet(ip, timeout=15)
 63          self.read_timeout = read_timeout
 64          if b'Login as:' in self.tn.read_until(b'Login as:', timeout=read_timeout):
 65              self.tn.write(bytes(user, 'ascii') + b"\r\n")
 66  
 67              if b'Password:' in self.tn.read_until(b'Password:', timeout=read_timeout):
 68                  # needed because of internal implementation details of the telnet server
 69                  time.sleep(0.2)
 70                  self.tn.write(bytes(password, 'ascii') + b"\r\n")
 71  
 72                  if b'for more information.' in self.tn.read_until(b'Type "help()" for more information.', timeout=read_timeout):
 73                      # login succesful
 74                      from collections import deque
 75                      self.fifo = deque()
 76                      return
 77  
 78          raise PyboardError('Failed to establish a telnet connection with the board')
 79  
 80      def __del__(self):
 81          self.close()
 82  
 83      def close(self):
 84          try:
 85              self.tn.close()
 86          except:
 87              # the telnet object might not exist yet, so ignore this one
 88              pass
 89  
 90      def read(self, size=1):
 91          while len(self.fifo) < size:
 92              timeout_count = 0
 93              data = self.tn.read_eager()
 94              if len(data):
 95                  self.fifo.extend(data)
 96                  timeout_count = 0
 97              else:
 98                  time.sleep(0.25)
 99                  if self.read_timeout is not None and timeout_count > 4 * self.read_timeout:
100                      break
101                  timeout_count += 1
102  
103          data = b''
104          while len(data) < size and len(self.fifo) > 0:
105              data += bytes([self.fifo.popleft()])
106          return data
107  
108      def write(self, data):
109          self.tn.write(data)
110          return len(data)
111  
112      def inWaiting(self):
113          n_waiting = len(self.fifo)
114          if not n_waiting:
115              data = self.tn.read_eager()
116              self.fifo.extend(data)
117              return len(data)
118          else:
119              return n_waiting
120  
121  class Pyboard:
122      def __init__(self, device, baudrate=115200, user='micro', password='python', wait=0, rawdelay=0):
123          global _rawdelay
124          _rawdelay = rawdelay
125          if device and device[0].isdigit() and device[-1].isdigit() and device.count('.') == 3:
126              # device looks like an IP address
127              self.serial = TelnetToSerial(device, user, password, read_timeout=10)
128          else:
129              import serial
130              delayed = False
131              for attempt in range(wait + 1):
132                  try:
133                      self.serial = serial.Serial(device, baudrate=baudrate, interCharTimeout=1)
134                      break
135                  except (OSError, IOError): # Py2 and Py3 have different errors
136                      if wait == 0:
137                          continue
138                      if attempt == 0:
139                          sys.stdout.write('Waiting {} seconds for pyboard '.format(wait))
140                          delayed = True
141                  time.sleep(1)
142                  sys.stdout.write('.')
143                  sys.stdout.flush()
144              else:
145                  if delayed:
146                      print('')
147                  raise PyboardError('failed to access ' + device)
148              if delayed:
149                  print('')
150  
151      def close(self):
152          self.serial.close()
153  
154      def read_until(self, min_num_bytes, ending, timeout=10, data_consumer=None):
155          data = self.serial.read(min_num_bytes)
156          if data_consumer:
157              data_consumer(data)
158          timeout_count = 0
159          while True:
160              if data.endswith(ending):
161                  break
162              elif self.serial.inWaiting() > 0:
163                  new_data = self.serial.read(1)
164                  data = data + new_data
165                  if data_consumer:
166                      data_consumer(new_data)
167                  timeout_count = 0
168              else:
169                  timeout_count += 1
170                  if timeout is not None and timeout_count >= 100 * timeout:
171                      break
172                  time.sleep(0.01)
173          return data
174  
175      def enter_raw_repl(self):
176          # Brief delay before sending RAW MODE char if requests
177          if _rawdelay > 0:
178              time.sleep(_rawdelay)
179  
180          self.serial.write(b'\r\x03\x03') # ctrl-C twice: interrupt any running program
181  
182          # flush input (without relying on serial.flushInput())
183          n = self.serial.inWaiting()
184          while n > 0:
185              self.serial.read(n)
186              n = self.serial.inWaiting()
187  
188          self.serial.write(b'\r\x01') # ctrl-A: enter raw REPL
189          data = self.read_until(1, b'raw REPL; CTRL-B to exit\r\n>')
190          if not data.endswith(b'raw REPL; CTRL-B to exit\r\n>'):
191              print(data)
192              raise PyboardError('could not enter raw repl')
193  
194          self.serial.write(b'\x04') # ctrl-D: soft reset
195          data = self.read_until(1, b'soft reboot\r\n')
196          if not data.endswith(b'soft reboot\r\n'):
197              print(data)
198              raise PyboardError('could not enter raw repl')
199          # By splitting this into 2 reads, it allows boot.py to print stuff,
200          # which will show up after the soft reboot and before the raw REPL.
201          # Modification from original pyboard.py below:
202          #   Add a small delay and send Ctrl-C twice after soft reboot to ensure
203          #   any main program loop in main.py is interrupted.
204          time.sleep(0.5)
205          self.serial.write(b'\x03\x03')
206          # End modification above.
207          data = self.read_until(1, b'raw REPL; CTRL-B to exit\r\n')
208          if not data.endswith(b'raw REPL; CTRL-B to exit\r\n'):
209              print(data)
210              raise PyboardError('could not enter raw repl')
211  
212      def exit_raw_repl(self):
213          self.serial.write(b'\r\x02') # ctrl-B: enter friendly REPL
214  
215      def follow(self, timeout, data_consumer=None):
216          # wait for normal output
217          data = self.read_until(1, b'\x04', timeout=timeout, data_consumer=data_consumer)
218          if not data.endswith(b'\x04'):
219              raise PyboardError('timeout waiting for first EOF reception')
220          data = data[:-1]
221  
222          # wait for error output
223          data_err = self.read_until(1, b'\x04', timeout=timeout)
224          if not data_err.endswith(b'\x04'):
225              raise PyboardError('timeout waiting for second EOF reception')
226          data_err = data_err[:-1]
227  
228          # return normal and error output
229          return data, data_err
230  
231      def exec_raw_no_follow(self, command):
232          if isinstance(command, bytes):
233              command_bytes = command
234          else:
235              command_bytes = bytes(command, encoding='utf8')
236  
237          # check we have a prompt
238          data = self.read_until(1, b'>')
239          if not data.endswith(b'>'):
240              raise PyboardError('could not enter raw repl')
241  
242          # write command
243          for i in range(0, len(command_bytes), 256):
244              self.serial.write(command_bytes[i:min(i + 256, len(command_bytes))])
245              time.sleep(0.01)
246          self.serial.write(b'\x04')
247  
248          # check if we could exec command
249          data = self.serial.read(2)
250          if data != b'OK':
251              raise PyboardError('could not exec command')
252  
253      def exec_raw(self, command, timeout=10, data_consumer=None):
254          self.exec_raw_no_follow(command);
255          return self.follow(timeout, data_consumer)
256  
257      def eval(self, expression):
258          ret = self.exec_('print({})'.format(expression))
259          ret = ret.strip()
260          return ret
261  
262      def exec_(self, command):
263          ret, ret_err = self.exec_raw(command)
264          if ret_err:
265              raise PyboardError('exception', ret, ret_err)
266          return ret
267  
268      def execfile(self, filename):
269          with open(filename, 'rb') as f:
270              pyfile = f.read()
271          return self.exec_(pyfile)
272  
273      def get_time(self):
274          t = str(self.eval('pyb.RTC().datetime()'), encoding='utf8')[1:-1].split(', ')
275          return int(t[4]) * 3600 + int(t[5]) * 60 + int(t[6])
276  
277  # in Python2 exec is a keyword so one must use "exec_"
278  # but for Python3 we want to provide the nicer version "exec"
279  setattr(Pyboard, "exec", Pyboard.exec_)
280  
281  def execfile(filename, device='/dev/ttyACM0', baudrate=115200, user='micro', password='python'):
282      pyb = Pyboard(device, baudrate, user, password)
283      pyb.enter_raw_repl()
284      output = pyb.execfile(filename)
285      stdout_write_bytes(output)
286      pyb.exit_raw_repl()
287      pyb.close()
288  
289  def main():
290      import argparse
291      cmd_parser = argparse.ArgumentParser(description='Run scripts on the pyboard.')
292      cmd_parser.add_argument('--device', default='/dev/ttyACM0', help='the serial device or the IP address of the pyboard')
293      cmd_parser.add_argument('-b', '--baudrate', default=115200, help='the baud rate of the serial device')
294      cmd_parser.add_argument('-u', '--user', default='micro', help='the telnet login username')
295      cmd_parser.add_argument('-p', '--password', default='python', help='the telnet login password')
296      cmd_parser.add_argument('-c', '--command', help='program passed in as string')
297      cmd_parser.add_argument('-w', '--wait', default=0, type=int, help='seconds to wait for USB connected board to become available')
298      cmd_parser.add_argument('--follow', action='store_true', help='follow the output after running the scripts [default if no scripts given]')
299      cmd_parser.add_argument('files', nargs='*', help='input files')
300      args = cmd_parser.parse_args()
301  
302      def execbuffer(buf):
303          try:
304              pyb = Pyboard(args.device, args.baudrate, args.user, args.password, args.wait)
305              pyb.enter_raw_repl()
306              ret, ret_err = pyb.exec_raw(buf, timeout=None, data_consumer=stdout_write_bytes)
307              pyb.exit_raw_repl()
308              pyb.close()
309          except PyboardError as er:
310              print(er)
311              sys.exit(1)
312          except KeyboardInterrupt:
313              sys.exit(1)
314          if ret_err:
315              stdout_write_bytes(ret_err)
316              sys.exit(1)
317  
318      if args.command is not None:
319          execbuffer(args.command.encode('utf-8'))
320  
321      for filename in args.files:
322          with open(filename, 'rb') as f:
323              pyfile = f.read()
324              execbuffer(pyfile)
325  
326      if args.follow or (args.command is None and len(args.files) == 0):
327          try:
328              pyb = Pyboard(args.device, args.baudrate, args.user, args.password, args.wait)
329              ret, ret_err = pyb.follow(timeout=None, data_consumer=stdout_write_bytes)
330              pyb.close()
331          except PyboardError as er:
332              print(er)
333              sys.exit(1)
334          except KeyboardInterrupt:
335              sys.exit(1)
336          if ret_err:
337              stdout_write_bytes(ret_err)
338              sys.exit(1)
339  
340  if __name__ == "__main__":
341      main()