esp_prov.py
#!/usr/bin/env python
#
# Copyright 2018 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import print_function
from builtins import input
import argparse
import textwrap
import time
import os
import sys
import json
from getpass import getpass

try:
    import security
    import transport
    import prov

except ImportError:
    # The helper modules are not importable from the current path, so fall
    # back to the locations inside the ESP-IDF tree pointed to by IDF_PATH
    idf_path = os.environ['IDF_PATH']
    sys.path.insert(0, idf_path + "/components/protocomm/python")
    sys.path.insert(1, idf_path + "/tools/esp_prov")

    import security
    import transport
    import prov

# Set this to true to allow exceptions to be thrown
config_throw_except = False


def on_except(err):
    """
    Report an error according to `config_throw_except`

    Raises RuntimeError(err) when `config_throw_except` is set,
    otherwise just prints `err`
    """
    if config_throw_except:
        raise RuntimeError(err)
    else:
        print(err)


def get_security(secver, pop=None, verbose=False):
    """
    Create the security object for the given security version

    Returns security.Security1(pop, verbose) for `secver` 1,
    security.Security0(verbose) for `secver` 0 and None for
    any other value
    """
    if secver == 1:
        return security.Security1(pop, verbose)
    elif secver == 0:
        return security.Security0(verbose)
    return None


def get_transport(sel_transport, service_name):
    """
    Create the transport object for the selected mode of transport

    `sel_transport` is expected to be one of 'softap', 'ble' or 'console'.
    For 'softap', `service_name` defaults to '192.168.4.1:80' when None.
    For 'ble', `service_name` is mandatory.

    Returns the transport object, or None if `sel_transport` is not
    recognised or if a RuntimeError was reported through on_except()
    """
    try:
        tp = None
        if (sel_transport == 'softap'):
            if service_name is None:
                service_name = '192.168.4.1:80'
            tp = transport.Transport_HTTP(service_name)
        elif (sel_transport == 'ble'):
            if service_name is None:
                raise RuntimeError('"--service_name" must be specified for ble transport')
            # BLE client is now capable of automatically figuring out
            # the primary service from the advertisement data and the
            # characteristics corresponding to each endpoint.
            # Below, the service_uuid field and 16bit UUIDs in the nu_lookup
            # table are provided only to support devices running older firmware,
            # in which case, the automated discovery will fail and the client
            # will fallback to using the provided UUIDs instead
            nu_lookup = {'prov-session': 'ff51', 'prov-config': 'ff52', 'proto-ver': 'ff53'}
            tp = transport.Transport_BLE(devname=service_name,
                                         service_uuid='021a9004-0382-4aea-bff4-6b3f1c5adfb4',
                                         nu_lookup=nu_lookup)
        elif (sel_transport == 'console'):
            tp = transport.Transport_Console()
        return tp
    except RuntimeError as e:
        on_except(e)
        return None


def version_match(tp, protover, verbose=False):
    """
    Check that the protocol version reported by the device equals `protover`

    The 'proto-ver' endpoint response is first compared as a plain version
    string (case-insensitively) and then, failing that, parsed as JSON and
    compared against its ['prov']['ver'] field.

    Returns True on match, False on mismatch (or if the response is not
    valid JSON) and None if any exception was reported through on_except()
    """
    try:
        response = tp.send_data('proto-ver', protover)

        if verbose:
            print("proto-ver response : ", response)

        # First assume this to be a simple version string
        if response.lower() == protover.lower():
            return True

        try:
            # Else interpret this as JSON structure containing
            # information with versions and capabilities of both
            # provisioning service and application
            info = json.loads(response)
            if info['prov']['ver'].lower() == protover.lower():
                return True
            # Valid JSON, but the version differs : report an explicit
            # mismatch rather than falling off the end of the function
            return False

        except ValueError:
            # If decoding as JSON fails, it means that capabilities
            # are not supported
            return False

    except Exception as e:
        on_except(e)
        return None


def has_capability(tp, capability='none', verbose=False):
    """
    Check whether the provisioning service advertises `capability`

    With the default `capability` of 'none' this only checks that the
    capabilities feature itself is present in the 'proto-ver' response.

    Returns True if the capability (or the feature) is present, False
    otherwise, including when the response is not JSON, has no
    ['prov']['cap'] entry, or a RuntimeError was reported through on_except()
    """
    # Note : default value of `capability` argument cannot be empty string
    # because protocomm_httpd expects non zero content lengths
    try:
        response = tp.send_data('proto-ver', capability)

        if verbose:
            print("proto-ver response : ", response)

        try:
            # Interpret this as JSON structure containing
            # information with versions and capabilities of both
            # provisioning service and application
            info = json.loads(response)
            supported_capabilities = info['prov']['cap']
            if capability.lower() == 'none':
                # No specific capability to check, but capabilities
                # feature is present so return True
                return True
            elif capability in supported_capabilities:
                return True
            return False

        except (ValueError, KeyError, TypeError):
            # If decoding as JSON fails, or the decoded structure does not
            # carry a capabilities list, it means that capabilities are
            # not supported
            return False

    except RuntimeError as e:
        on_except(e)

    return False


def get_version(tp):
    """
    Fetch the raw response of the 'proto-ver' endpoint

    Returns the response, or an empty string if a RuntimeError
    was reported through on_except()
    """
    response = None
    try:
        response = tp.send_data('proto-ver', '---')
    except RuntimeError as e:
        on_except(e)
        response = ''
    return response


def establish_session(tp, sec):
    """
    Run the security handshake over the 'prov-session' endpoint

    Requests produced by sec.security_session() are sent until it
    returns None, each response being fed into the next call.

    Returns True when the handshake completes, False if the device
    returned no response and None if a RuntimeError was reported
    through on_except()
    """
    try:
        response = None
        while True:
            request = sec.security_session(response)
            if request is None:
                break
            response = tp.send_data('prov-session', request)
            if (response is None):
                return False
        return True
    except RuntimeError as e:
        on_except(e)
        return None


def custom_config(tp, sec, custom_info, custom_ver):
    """
    Send custom config info and version to the 'custom-config' endpoint

    Returns True if the decoded response status is 0, False otherwise
    and None if a RuntimeError was reported through on_except()
    """
    try:
        message = prov.custom_config_request(sec, custom_info, custom_ver)
        response = tp.send_data('custom-config', message)
        return (prov.custom_config_response(sec, response) == 0)
    except RuntimeError as e:
        on_except(e)
        return None


def custom_data(tp, sec, custom_data):
    """
    Send custom data to the 'custom-data' endpoint

    Returns True if the decoded response status is 0, False otherwise
    and None if a RuntimeError was reported through on_except()
    """
    try:
        message = prov.custom_data_request(sec, custom_data)
        response = tp.send_data('custom-data', message)
        return (prov.custom_data_response(sec, response) == 0)
    except RuntimeError as e:
        on_except(e)
        return None


def scan_wifi_APs(sel_transport, tp, sec):
    """
    Make the device scan for Wi-Fi access points and fetch the results

    A blocking scan is started over the 'prov-scan' endpoint, then the
    number of results is queried and the entries are read in batches
    whose size depends on `sel_transport`.

    Returns the list of AP entries (possibly empty), or None if a
    RuntimeError was reported through on_except()
    """
    APs = []
    group_channels = 0
    readlen = 100
    if sel_transport == 'softap':
        # In case of softAP we must perform the scan on individual channels, one by one,
        # so that the Wi-Fi controller gets ample time to send out beacons (necessary to
        # maintain connectivity with authenticated stations). As scanning one channel at a
        # time will be slow, we can group more than one channels to be scanned in quick
        # succession, hence speeding up the scan process. Though if too many channels are
        # present in a group, the controller may again miss out on sending beacons. Hence,
        # the application should use an optimum value. The following value usually
        # works out in most cases
        group_channels = 5
    elif sel_transport == 'ble':
        # Read at most 4 entries at a time. This is because if we are using BLE transport
        # then the response packet size should not exceed the present limit of 256 bytes of
        # characteristic value imposed by protocomm_ble. This limit may be removed in the
        # future
        readlen = 4
    try:
        message = prov.scan_start_request(sec, blocking=True, group_channels=group_channels)
        start_time = time.time()
        response = tp.send_data('prov-scan', message)
        stop_time = time.time()
        print("++++ Scan process executed in " + str(stop_time - start_time) + " sec")
        prov.scan_start_response(sec, response)

        message = prov.scan_status_request(sec)
        response = tp.send_data('prov-scan', message)
        result = prov.scan_status_response(sec, response)
        print("++++ Scan results : " + str(result["count"]))
        if result["count"] != 0:
            index = 0
            remaining = result["count"]
            while remaining:
                # Never request more entries than allowed per read
                count = min(remaining, readlen)
                message = prov.scan_result_request(sec, index, count)
                response = tp.send_data('prov-scan', message)
                APs += prov.scan_result_response(sec, response)
                remaining -= count
                index += count

    except RuntimeError as e:
        on_except(e)
        return None

    return APs


def send_wifi_config(tp, sec, ssid, passphrase):
    """
    Send the Wi-Fi SSID and passphrase to the 'prov-config' endpoint

    Returns True if the decoded response status is 0, False otherwise
    and None if a RuntimeError was reported through on_except()
    """
    try:
        message = prov.config_set_config_request(sec, ssid, passphrase)
        response = tp.send_data('prov-config', message)
        return (prov.config_set_config_response(sec, response) == 0)
    except RuntimeError as e:
        on_except(e)
        return None


def apply_wifi_config(tp, sec):
    """
    Ask the device to apply the previously sent Wi-Fi configuration

    Returns True if the decoded response status is 0, False otherwise
    and None if a RuntimeError was reported through on_except()
    """
    try:
        message = prov.config_apply_config_request(sec)
        response = tp.send_data('prov-config', message)
        return (prov.config_apply_config_response(sec, response) == 0)
    except RuntimeError as e:
        on_except(e)
        return None


def get_wifi_config(tp, sec):
    """
    Query the Wi-Fi status over the 'prov-config' endpoint

    Returns whatever prov.config_get_status_response() decodes
    (wait_wifi_connected() compares it against "connecting" and
    "connected"), or None if a RuntimeError was reported through
    on_except()
    """
    try:
        message = prov.config_get_status_request(sec)
        response = tp.send_data('prov-config', message)
        return prov.config_get_status_response(sec, response)
    except RuntimeError as e:
        on_except(e)
        return None


def wait_wifi_connected(tp, sec):
    """
    Wait for provisioning to report Wi-Fi is connected

    Returns True if Wi-Fi connection succeeded, False if connection consistently failed
    """
    TIME_PER_POLL = 5
    retry = 3

    while True:
        time.sleep(TIME_PER_POLL)
        print("\n==== Wi-Fi connection state ====")
        ret = get_wifi_config(tp, sec)
        if ret == "connecting":
            # Still in progress : keep polling without using up a retry
            continue
        elif ret == "connected":
            print("==== Provisioning was successful ====")
            return True
        elif retry > 0:
            retry -= 1
            print("Waiting to poll status again (status %s, %d tries left)..." % (ret, retry))
        else:
            print("---- Provisioning failed ----")
            return False


def desc_format(*args):
    """
    Build a help/description string from the given paragraphs

    Each argument is wrapped with textwrap.fill() (keeping its own
    whitespace characters) and terminated with a newline
    """
    return ''.join(textwrap.fill(replace_whitespace=False, text=arg) + "\n"
                   for arg in args)


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description=desc_format(
                                     'ESP Provisioning tool for configuring devices '
                                     'running protocomm based provisioning service.',
                                     'See esp-idf/examples/provisioning for sample applications'),
                                     formatter_class=argparse.RawTextHelpFormatter)

    parser.add_argument("--transport", required=True, dest='mode', type=str,
                        help=desc_format(
                            'Mode of transport over which provisioning is to be performed.',
                            'This should be one of "softap", "ble" or "console"'))

    parser.add_argument("--service_name", dest='name', type=str,
                        help=desc_format(
                            'This specifies the name of the provisioning service to connect to, '
                            'depending upon the mode of transport :',
                            '\t- transport "ble" : The BLE Device Name',
                            '\t- transport "softap" : HTTP Server hostname or IP',
                            '\t (default "192.168.4.1:80")'))

    parser.add_argument("--proto_ver", dest='version', type=str, default='',
                        help=desc_format(
                            'This checks the protocol version of the provisioning service running '
                            'on the device before initiating Wi-Fi configuration'))

    parser.add_argument("--sec_ver", dest='secver', type=int, default=None,
                        help=desc_format(
                            'Protocomm security scheme used by the provisioning service for secure '
                            'session establishment. Accepted values are :',
                            '\t- 0 : No security',
                            '\t- 1 : X25519 key exchange + AES-CTR encryption',
                            '\t + Authentication using Proof of Possession (PoP)',
                            'In case device side application uses IDF\'s provisioning manager, '
                            'the compatible security version is automatically determined from '
                            'capabilities retrieved via the version endpoint'))

    parser.add_argument("--pop", dest='pop', type=str, default='',
                        help=desc_format(
                            'This specifies the Proof of possession (PoP) when security scheme 1 '
                            'is used'))

    parser.add_argument("--ssid", dest='ssid', type=str, default='',
                        help=desc_format(
                            'This configures the device to use SSID of the Wi-Fi network to which '
                            'we would like it to connect to permanently, once provisioning is complete. '
                            'If Wi-Fi scanning is supported by the provisioning service, this need not '
                            'be specified'))

    parser.add_argument("--passphrase", dest='passphrase', type=str, default='',
                        help=desc_format(
                            'This configures the device to use Passphrase for the Wi-Fi network to which '
                            'we would like it to connect to permanently, once provisioning is complete. '
                            'If Wi-Fi scanning is supported by the provisioning service, this need not '
                            'be specified'))

    parser.add_argument("--custom_data", dest='custom_data', type=str, default='',
                        help=desc_format(
                            'This is an optional parameter, only intended for use with '
                            '"examples/provisioning/wifi_prov_mgr_custom_data"'))

    parser.add_argument("--custom_config", action="store_true",
                        help=desc_format(
                            'This is an optional parameter, only intended for use with '
                            '"examples/provisioning/custom_config"'))
    parser.add_argument("--custom_info", dest='custom_info', type=str, default='<some custom info string>',
                        help=desc_format(
                            'Custom Config Info String. "--custom_config" must be specified for using this'))
    parser.add_argument("--custom_ver", dest='custom_ver', type=int, default=2,
                        help=desc_format(
                            'Custom Config Version Number. "--custom_config" must be specified for using this'))

    parser.add_argument("-v", "--verbose", help="Increase output verbosity", action="store_true")

    args = parser.parse_args()

    # sys.exit() is used throughout instead of the exit() helper, as the
    # latter is only injected by the `site` module and may not be present

    obj_transport = get_transport(args.mode.lower(), args.name)
    if obj_transport is None:
        print("---- Failed to establish connection ----")
        sys.exit(1)

    # If security version not specified check in capabilities
    if args.secver is None:
        # First check if capabilities are supported or not
        if not has_capability(obj_transport):
            print('Security capabilities could not be determined. Please specify "--sec_ver" explicitly')
            print("---- Invalid Security Version ----")
            sys.exit(2)

        # When no_sec is present, use security 0, else security 1
        args.secver = int(not has_capability(obj_transport, 'no_sec'))
        print("Security scheme determined to be :", args.secver)

    if (args.secver != 0) and not has_capability(obj_transport, 'no_pop'):
        if not args.pop:
            print("---- Proof of Possession argument not provided ----")
            sys.exit(2)
    elif args.pop:
        print("---- Proof of Possession will be ignored ----")
        args.pop = ''

    obj_security = get_security(args.secver, args.pop, args.verbose)
    if obj_security is None:
        print("---- Invalid Security Version ----")
        sys.exit(2)

    if args.version != '':
        print("\n==== Verifying protocol version ====")
        if not version_match(obj_transport, args.version, args.verbose):
            print("---- Error in protocol version matching ----")
            sys.exit(3)
        print("==== Verified protocol version successfully ====")

    print("\n==== Starting Session ====")
    if not establish_session(obj_transport, obj_security):
        print("Failed to establish session. Ensure that security scheme and proof of possession are correct")
        print("---- Error in establishing session ----")
        sys.exit(4)
    print("==== Session Established ====")

    if args.custom_config:
        print("\n==== Sending Custom config to esp32 ====")
        if not custom_config(obj_transport, obj_security, args.custom_info, args.custom_ver):
            print("---- Error in custom config ----")
            sys.exit(5)
        print("==== Custom config sent successfully ====")

    if args.custom_data != '':
        print("\n==== Sending Custom data to esp32 ====")
        if not custom_data(obj_transport, obj_security, args.custom_data):
            print("---- Error in custom data ----")
            sys.exit(5)
        print("==== Custom data sent successfully ====")

    if args.ssid == '':
        # No SSID given on the command line : let the user pick one from
        # the list of APs scanned by the device
        if not has_capability(obj_transport, 'wifi_scan'):
            print("---- Wi-Fi Scan List is not supported by provisioning service ----")
            print("---- Rerun esp_prov with SSID and Passphrase as argument ----")
            sys.exit(3)

        while True:
            print("\n==== Scanning Wi-Fi APs ====")
            start_time = time.time()
            APs = scan_wifi_APs(args.mode.lower(), obj_transport, obj_security)
            end_time = time.time()
            print("\n++++ Scan finished in " + str(end_time - start_time) + " sec")
            if APs is None:
                print("---- Error in scanning Wi-Fi APs ----")
                sys.exit(8)

            if len(APs) == 0:
                print("No APs found!")
                sys.exit(9)

            print("==== Wi-Fi Scan results ====")
            print("{0: >4} {1: <33} {2: <12} {3: >4} {4: <4} {5: <16}".format(
                "S.N.", "SSID", "BSSID", "CHN", "RSSI", "AUTH"))
            for serial, ap in enumerate(APs, 1):
                print("[{0: >2}] {1: <33} {2: <12} {3: >4} {4: <4} {5: <16}".format(
                    serial, ap["ssid"], ap["bssid"], ap["channel"], ap["rssi"], ap["auth"]))

            while True:
                try:
                    select = int(input("Select AP by number (0 to rescan) : "))
                    if select < 0 or select > len(APs):
                        raise ValueError
                    break
                except ValueError:
                    print("Invalid input! Retry")

            # Selecting 0 triggers a rescan, anything else is a valid choice
            if select != 0:
                break

        args.ssid = APs[select - 1]["ssid"]
        prompt_str = "Enter passphrase for {0} : ".format(args.ssid)
        args.passphrase = getpass(prompt_str)

    print("\n==== Sending Wi-Fi credential to esp32 ====")
    if not send_wifi_config(obj_transport, obj_security, args.ssid, args.passphrase):
        print("---- Error in send Wi-Fi config ----")
        sys.exit(6)
    print("==== Wi-Fi Credentials sent successfully ====")

    print("\n==== Applying config to esp32 ====")
    if not apply_wifi_config(obj_transport, obj_security):
        print("---- Error in apply Wi-Fi config ----")
        sys.exit(7)
    print("==== Apply config sent successfully ====")

    # NOTE(review): the result of wait_wifi_connected() is ignored, so the
    # script exits with status 0 even when provisioning is reported as failed.
    # Left unchanged to keep the exit status backward compatible - confirm
    # whether a non-zero status is wanted here
    wait_wifi_connected(obj_transport, obj_security)