#!/usr/bin/env python
# pyboard.py

"""
pyboard interface

This module provides the Pyboard class, used to communicate with and
control the pyboard over a serial USB connection.

Example usage:

    import pyboard
    pyb = pyboard.Pyboard('/dev/ttyACM0')

Or:

    pyb = pyboard.Pyboard('192.168.1.1')

Then:

    pyb.enter_raw_repl()
    pyb.exec('pyb.LED(1).on()')
    pyb.exit_raw_repl()

Note: if using Python2 then pyb.exec must be written as pyb.exec_.
To run a script from the local machine on the board and print out the results:

    import pyboard
    pyboard.execfile('test.py', device='/dev/ttyACM0')

This script can also be run directly.  To execute a local script, use:

    ./pyboard.py test.py

Or:

    python pyboard.py test.py

"""

import sys
import time

# Most recent ``rawdelay`` passed to a Pyboard constructor.  Kept for backward
# compatibility; Pyboard instances use their own ``rawdelay`` attribute.
_rawdelay = None

try:
    stdout = sys.stdout.buffer
except AttributeError:
    # Python2 doesn't have buffer attr
    stdout = sys.stdout


def stdout_write_bytes(b):
    """Write the bytes ``b`` to stdout, with EOF (0x04) markers removed, and flush."""
    b = b.replace(b"\x04", b"")
    stdout.write(b)
    stdout.flush()


class PyboardError(Exception):
    """Raised when communication with the board fails or the board reports an error."""
    pass


class TelnetToSerial:
    """Wrap a telnet connection so it offers the read/write/inWaiting/close
    subset of the serial port interface that Pyboard uses."""

    def __init__(self, ip, user, password, read_timeout=None):
        """Connect to ``ip`` and log in with ``user``/``password``.

        ``read_timeout`` (seconds, or None for no limit) is used while waiting
        for the login prompts and later by read().

        Raises PyboardError if the expected prompts are not seen.
        """
        import telnetlib
        self.tn = telnetlib.Telnet(ip, timeout=15)
        self.read_timeout = read_timeout
        if b'Login as:' in self.tn.read_until(b'Login as:', timeout=read_timeout):
            # str.encode works on both Python 2 and 3, unlike bytes(s, 'ascii')
            self.tn.write(user.encode('ascii') + b"\r\n")

            if b'Password:' in self.tn.read_until(b'Password:', timeout=read_timeout):
                # needed because of internal implementation details of the telnet server
                time.sleep(0.2)
                self.tn.write(password.encode('ascii') + b"\r\n")

                if b'for more information.' in self.tn.read_until(
                        b'Type "help()" for more information.', timeout=read_timeout):
                    # login successful
                    from collections import deque
                    self.fifo = deque()
                    return

        raise PyboardError('Failed to establish a telnet connection with the board')

    def __del__(self):
        self.close()

    def close(self):
        """Close the telnet connection; errors while closing are ignored."""
        try:
            self.tn.close()
        except Exception:
            # the telnet object might not exist yet, so ignore this one
            pass

    def read(self, size=1):
        """Return up to ``size`` bytes, polling the connection until enough
        data is buffered or the read timeout expires.

        May return fewer than ``size`` bytes (possibly none) on timeout.
        """
        # The counter must live outside the loop: it counts *consecutive*
        # empty polls and is only reset when data actually arrives.
        timeout_count = 0
        while len(self.fifo) < size:
            data = self.tn.read_eager()
            if len(data):
                self.fifo.extend(data)
                timeout_count = 0
            else:
                time.sleep(0.25)
                # four polls of 0.25s make up one second of read_timeout
                if self.read_timeout is not None and timeout_count > 4 * self.read_timeout:
                    break
                timeout_count += 1

        data = bytearray()
        while len(data) < size and len(self.fifo) > 0:
            data.append(self.fifo.popleft())
        return bytes(data)

    def write(self, data):
        """Send ``data`` over the connection and return its length."""
        self.tn.write(data)
        return len(data)

    def inWaiting(self):
        """Return the number of bytes that can be read without waiting."""
        n_waiting = len(self.fifo)
        if not n_waiting:
            data = self.tn.read_eager()
            self.fifo.extend(data)
            return len(data)
        else:
            return n_waiting


class Pyboard:
    """Connection to a board, over a serial port or (for IP addresses) telnet."""

    # Seconds to sleep before entering the raw REPL; set per instance in __init__.
    rawdelay = 0

    def __init__(self, device, baudrate=115200, user='micro', password='python', wait=0, rawdelay=0):
        """Open ``device``.

        A ``device`` that looks like a dotted IPv4 address is opened with
        TelnetToSerial using ``user``/``password``; anything else is opened as
        a serial port at ``baudrate``.  ``wait`` is the number of extra
        one-second-spaced attempts made if the serial port cannot be opened.
        ``rawdelay`` is the delay in seconds applied by enter_raw_repl().

        Raises PyboardError if the serial port cannot be opened.
        """
        global _rawdelay
        _rawdelay = rawdelay  # kept for code that still reads the module global
        self.rawdelay = rawdelay
        if device and device[0].isdigit() and device[-1].isdigit() and device.count('.') == 3:
            # device looks like an IP address
            self.serial = TelnetToSerial(device, user, password, read_timeout=10)
        else:
            import serial
            delayed = False
            for attempt in range(wait + 1):
                try:
                    self.serial = serial.Serial(device, baudrate=baudrate, interCharTimeout=1)
                    break
                except (OSError, IOError):  # Py2 and Py3 have different errors
                    if wait == 0:
                        continue
                    if attempt == 0:
                        sys.stdout.write('Waiting {} seconds for pyboard '.format(wait))
                        delayed = True
                time.sleep(1)
                sys.stdout.write('.')
                sys.stdout.flush()
            else:
                # every attempt failed
                if delayed:
                    print('')
                raise PyboardError('failed to access ' + device)
            if delayed:
                print('')

    def close(self):
        """Close the underlying connection."""
        self.serial.close()

    def read_until(self, min_num_bytes, ending, timeout=10, data_consumer=None):
        """Read until the received data ends with ``ending`` or ``timeout`` expires.

        ``min_num_bytes`` bytes are requested up front, then data is read one
        byte at a time.  ``timeout`` is in seconds of inactivity (None waits
        forever).  If given, ``data_consumer`` is called with each piece of
        data as it arrives.  Returns everything read; callers check whether it
        ends with ``ending`` to detect a timeout.
        """
        data = self.serial.read(min_num_bytes)
        if data_consumer:
            data_consumer(data)
        timeout_count = 0
        while True:
            if data.endswith(ending):
                break
            elif self.serial.inWaiting() > 0:
                new_data = self.serial.read(1)
                data = data + new_data
                if data_consumer:
                    data_consumer(new_data)
                timeout_count = 0
            else:
                timeout_count += 1
                # each idle pass sleeps 0.01s, so 100 passes make one second
                if timeout is not None and timeout_count >= 100 * timeout:
                    break
                time.sleep(0.01)
        return data

    def enter_raw_repl(self):
        """Interrupt the board, switch it to the raw REPL and soft-reset it.

        Raises PyboardError if any of the expected banners is not received.
        """
        # Brief delay before sending RAW MODE char if requested
        if self.rawdelay and self.rawdelay > 0:
            time.sleep(self.rawdelay)

        self.serial.write(b'\r\x03\x03')  # ctrl-C twice: interrupt any running program

        # flush input (without relying on serial.flushInput())
        n = self.serial.inWaiting()
        while n > 0:
            self.serial.read(n)
            n = self.serial.inWaiting()

        self.serial.write(b'\r\x01')  # ctrl-A: enter raw REPL
        data = self.read_until(1, b'raw REPL; CTRL-B to exit\r\n>')
        if not data.endswith(b'raw REPL; CTRL-B to exit\r\n>'):
            print(data)
            raise PyboardError('could not enter raw repl')

        self.serial.write(b'\x04')  # ctrl-D: soft reset
        data = self.read_until(1, b'soft reboot\r\n')
        if not data.endswith(b'soft reboot\r\n'):
            print(data)
            raise PyboardError('could not enter raw repl')
        # By splitting this into 2 reads, it allows boot.py to print stuff,
        # which will show up after the soft reboot and before the raw REPL.
        # Modification from original pyboard.py below:
        #   Add a small delay and send Ctrl-C twice after soft reboot to ensure
        #   any main program loop in main.py is interrupted.
        time.sleep(0.5)
        self.serial.write(b'\x03\x03')
        # End modification above.
        data = self.read_until(1, b'raw REPL; CTRL-B to exit\r\n')
        if not data.endswith(b'raw REPL; CTRL-B to exit\r\n'):
            print(data)
            raise PyboardError('could not enter raw repl')

    def exit_raw_repl(self):
        """Switch the board back to the friendly REPL."""
        self.serial.write(b'\r\x02')  # ctrl-B: enter friendly REPL

    def follow(self, timeout, data_consumer=None):
        """Collect the output of a running command.

        Reads two EOF-terminated (0x04) sections, normal output then error
        output, and returns them as ``(data, data_err)`` without the EOF
        bytes.  ``data_consumer`` only receives the normal output.

        Raises PyboardError if either section times out.
        """
        # wait for normal output
        data = self.read_until(1, b'\x04', timeout=timeout, data_consumer=data_consumer)
        if not data.endswith(b'\x04'):
            raise PyboardError('timeout waiting for first EOF reception')
        data = data[:-1]

        # wait for error output
        data_err = self.read_until(1, b'\x04', timeout=timeout)
        if not data_err.endswith(b'\x04'):
            raise PyboardError('timeout waiting for second EOF reception')
        data_err = data_err[:-1]

        # return normal and error output
        return data, data_err

    def exec_raw_no_follow(self, command):
        """Send ``command`` (bytes or str) to the raw REPL without reading its output.

        Raises PyboardError if no prompt is seen or the board does not
        acknowledge the command with ``OK``.
        """
        if isinstance(command, bytes):
            command_bytes = command
        else:
            # str.encode works on both Python 2 and 3
            command_bytes = command.encode('utf8')

        # check we have a prompt
        data = self.read_until(1, b'>')
        if not data.endswith(b'>'):
            raise PyboardError('could not enter raw repl')

        # write command in 256-byte chunks with a short pause between them
        for i in range(0, len(command_bytes), 256):
            self.serial.write(command_bytes[i:min(i + 256, len(command_bytes))])
            time.sleep(0.01)
        self.serial.write(b'\x04')

        # check if we could exec command
        data = self.serial.read(2)
        if data != b'OK':
            raise PyboardError('could not exec command')

    def exec_raw(self, command, timeout=10, data_consumer=None):
        """Run ``command`` and return ``(output, error_output)`` as bytes."""
        self.exec_raw_no_follow(command)
        return self.follow(timeout, data_consumer)

    def eval(self, expression):
        """Print ``expression`` on the board and return the stripped output bytes."""
        ret = self.exec_('print({})'.format(expression))
        ret = ret.strip()
        return ret

    def exec_(self, command):
        """Run ``command`` and return its output.

        Raises PyboardError('exception', output, error_output) if the board
        produced any error output.
        """
        ret, ret_err = self.exec_raw(command)
        if ret_err:
            raise PyboardError('exception', ret, ret_err)
        return ret

    def execfile(self, filename):
        """Run the contents of the local file ``filename`` on the board."""
        with open(filename, 'rb') as f:
            pyfile = f.read()
        return self.exec_(pyfile)

    def get_time(self):
        """Return the board's RTC time of day in seconds (hours, minutes, seconds)."""
        t = self.eval('pyb.RTC().datetime()').decode('utf8')[1:-1].split(', ')
        return int(t[4]) * 3600 + int(t[5]) * 60 + int(t[6])


# in Python2 exec is a keyword so one must use "exec_"
# but for Python3 we want to provide the nicer version "exec"
setattr(Pyboard, "exec", Pyboard.exec_)


def execfile(filename, device='/dev/ttyACM0', baudrate=115200, user='micro', password='python'):
    """Run the local script ``filename`` on the board and write its output to stdout."""
    pyb = Pyboard(device, baudrate, user, password)
    try:
        pyb.enter_raw_repl()
        output = pyb.execfile(filename)
        stdout_write_bytes(output)
        pyb.exit_raw_repl()
    finally:
        # release the port even if the script failed
        pyb.close()


def main():
    """Command-line entry point: run a command string and/or script files on the board."""
    import argparse
    cmd_parser = argparse.ArgumentParser(description='Run scripts on the pyboard.')
    cmd_parser.add_argument('--device', default='/dev/ttyACM0', help='the serial device or the IP address of the pyboard')
    cmd_parser.add_argument('-b', '--baudrate', default=115200, type=int, help='the baud rate of the serial device')
    cmd_parser.add_argument('-u', '--user', default='micro', help='the telnet login username')
    cmd_parser.add_argument('-p', '--password', default='python', help='the telnet login password')
    cmd_parser.add_argument('-c', '--command', help='program passed in as string')
    cmd_parser.add_argument('-w', '--wait', default=0, type=int, help='seconds to wait for USB connected board to become available')
    cmd_parser.add_argument('--follow', action='store_true', help='follow the output after running the scripts [default if no scripts given]')
    cmd_parser.add_argument('files', nargs='*', help='input files')
    args = cmd_parser.parse_args()

    def execbuffer(buf):
        # Run one buffer on the board, streaming its output to stdout.
        # Exits the process with status 1 on any failure.
        try:
            pyb = Pyboard(args.device, args.baudrate, args.user, args.password, args.wait)
            try:
                pyb.enter_raw_repl()
                ret, ret_err = pyb.exec_raw(buf, timeout=None, data_consumer=stdout_write_bytes)
                pyb.exit_raw_repl()
            finally:
                pyb.close()
        except PyboardError as er:
            print(er)
            sys.exit(1)
        except KeyboardInterrupt:
            sys.exit(1)
        if ret_err:
            stdout_write_bytes(ret_err)
            sys.exit(1)

    if args.command is not None:
        execbuffer(args.command.encode('utf-8'))

    for filename in args.files:
        with open(filename, 'rb') as f:
            pyfile = f.read()
        execbuffer(pyfile)

    if args.follow or (args.command is None and len(args.files) == 0):
        try:
            pyb = Pyboard(args.device, args.baudrate, args.user, args.password, args.wait)
            try:
                ret, ret_err = pyb.follow(timeout=None, data_consumer=stdout_write_bytes)
            finally:
                pyb.close()
        except PyboardError as er:
            print(er)
            sys.exit(1)
        except KeyboardInterrupt:
            sys.exit(1)
        if ret_err:
            stdout_write_bytes(ret_err)
            sys.exit(1)


if __name__ == "__main__":
    main()