/ tools / esp_prov / esp_prov.py
esp_prov.py
  1  #!/usr/bin/env python
  2  #
  3  # Copyright 2018 Espressif Systems (Shanghai) PTE LTD
  4  #
  5  # Licensed under the Apache License, Version 2.0 (the "License");
  6  # you may not use this file except in compliance with the License.
  7  # You may obtain a copy of the License at
  8  #
  9  #     http://www.apache.org/licenses/LICENSE-2.0
 10  #
 11  # Unless required by applicable law or agreed to in writing, software
 12  # distributed under the License is distributed on an "AS IS" BASIS,
 13  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14  # See the License for the specific language governing permissions and
 15  # limitations under the License.
 16  #
 17  
 18  from __future__ import print_function
 19  from builtins import input
 20  import argparse
 21  import textwrap
 22  import time
 23  import os
 24  import sys
 25  import json
 26  from getpass import getpass
 27  
 28  try:
 29      import security
 30      import transport
 31      import prov
 32  
 33  except ImportError:
 34      idf_path = os.environ['IDF_PATH']
 35      sys.path.insert(0, idf_path + "/components/protocomm/python")
 36      sys.path.insert(1, idf_path + "/tools/esp_prov")
 37  
 38      import security
 39      import transport
 40      import prov
 41  
 42  # Set this to true to allow exceptions to be thrown
 43  config_throw_except = False
 44  
 45  
 46  def on_except(err):
 47      if config_throw_except:
 48          raise RuntimeError(err)
 49      else:
 50          print(err)
 51  
 52  
 53  def get_security(secver, pop=None, verbose=False):
 54      if secver == 1:
 55          return security.Security1(pop, verbose)
 56      elif secver == 0:
 57          return security.Security0(verbose)
 58      return None
 59  
 60  
 61  def get_transport(sel_transport, service_name):
 62      try:
 63          tp = None
 64          if (sel_transport == 'softap'):
 65              if service_name is None:
 66                  service_name = '192.168.4.1:80'
 67              tp = transport.Transport_HTTP(service_name)
 68          elif (sel_transport == 'ble'):
 69              if service_name is None:
 70                  raise RuntimeError('"--service_name" must be specified for ble transport')
 71              # BLE client is now capable of automatically figuring out
 72              # the primary service from the advertisement data and the
 73              # characteristics corresponding to each endpoint.
 74              # Below, the service_uuid field and 16bit UUIDs in the nu_lookup
 75              # table are provided only to support devices running older firmware,
 76              # in which case, the automated discovery will fail and the client
 77              # will fallback to using the provided UUIDs instead
 78              nu_lookup = {'prov-session': 'ff51', 'prov-config': 'ff52', 'proto-ver': 'ff53'}
 79              tp = transport.Transport_BLE(devname=service_name,
 80                                           service_uuid='021a9004-0382-4aea-bff4-6b3f1c5adfb4',
 81                                           nu_lookup=nu_lookup)
 82          elif (sel_transport == 'console'):
 83              tp = transport.Transport_Console()
 84          return tp
 85      except RuntimeError as e:
 86          on_except(e)
 87          return None
 88  
 89  
 90  def version_match(tp, protover, verbose=False):
 91      try:
 92          response = tp.send_data('proto-ver', protover)
 93  
 94          if verbose:
 95              print("proto-ver response : ", response)
 96  
 97          # First assume this to be a simple version string
 98          if response.lower() == protover.lower():
 99              return True
100  
101          try:
102              # Else interpret this as JSON structure containing
103              # information with versions and capabilities of both
104              # provisioning service and application
105              info = json.loads(response)
106              if info['prov']['ver'].lower() == protover.lower():
107                  return True
108  
109          except ValueError:
110              # If decoding as JSON fails, it means that capabilities
111              # are not supported
112              return False
113  
114      except Exception as e:
115          on_except(e)
116          return None
117  
118  
def has_capability(tp, capability='none', verbose=False):
    """Query the 'proto-ver' endpoint and test for a capability.

    tp         -- transport object providing send_data(endpoint, data)
    capability -- capability name to look for in `info['prov']['cap']`;
                  the default 'none' only checks that the capabilities
                  list is present at all
    verbose    -- when True, print the raw response

    Returns True if the capability (or, for 'none', the capabilities list)
    is present, False otherwise. A reply that is not valid JSON, or that
    lacks the 'prov' / 'cap' entries, is treated as "capabilities not
    supported" and yields False. A RuntimeError from the transport is
    reported through on_except() and also yields False.
    """
    # Note : default value of `capability` argument cannot be empty string
    # because protocomm_httpd expects non zero content lengths
    try:
        response = tp.send_data('proto-ver', capability)

        if verbose:
            print("proto-ver response : ", response)

        try:
            # Interpret this as JSON structure containing
            # information with versions and capabilities of both
            # provisioning service and application
            info = json.loads(response)
            supported_capabilities = info['prov']['cap']
        except (ValueError, KeyError, TypeError):
            # ValueError : reply is not JSON at all
            # KeyError   : JSON object without the 'prov' / 'cap' entries
            # TypeError  : reply is not text, or the JSON is not an object
            # In all of these cases capabilities are not supported
            return False

        if capability.lower() == 'none':
            # No specific capability to check, but capabilities
            # feature is present so return True
            return True
        return capability in supported_capabilities

    except RuntimeError as e:
        on_except(e)

    return False
151  
152  
def get_version(tp):
    """Fetch the raw reply of the 'proto-ver' endpoint.

    The placeholder payload '---' is sent as the request data. Returns the
    reply from tp.send_data(), or an empty string if a RuntimeError was
    raised (the error is reported through on_except()).
    """
    try:
        return tp.send_data('proto-ver', '---')
    except RuntimeError as err:
        on_except(err)
        return ''
161  
162  
def establish_session(tp, sec):
    """Run the security handshake over the 'prov-session' endpoint.

    `sec.security_session()` is called first with None and afterwards with
    each reply from the device; every request it produces is sent with
    `tp.send_data()`. The exchange ends when it returns None.

    Returns True when the handshake ran to completion, False if the device
    returned no response to a request, and None if a RuntimeError was
    raised (the error is reported through on_except()).
    """
    try:
        request = sec.security_session(None)
        while request is not None:
            reply = tp.send_data('prov-session', request)
            if reply is None:
                return False
            request = sec.security_session(reply)
        return True
    except RuntimeError as err:
        on_except(err)
        return None
177  
178  
def custom_config(tp, sec, custom_info, custom_ver):
    """Send a custom configuration to the 'custom-config' endpoint.

    The request is built by prov.custom_config_request() and the reply is
    decoded by prov.custom_config_response().

    Returns True when the decoded reply equals 0 (presumably the success
    status), False otherwise, and None if a RuntimeError was raised (the
    error is reported through on_except()).
    """
    try:
        request = prov.custom_config_request(sec, custom_info, custom_ver)
        reply = tp.send_data('custom-config', request)
        status = prov.custom_config_response(sec, reply)
        return status == 0
    except RuntimeError as err:
        on_except(err)
        return None
187  
188  
def custom_data(tp, sec, custom_data):
    """Send a custom data string to the 'custom-data' endpoint.

    The request is built by prov.custom_data_request() and the reply is
    decoded by prov.custom_data_response().

    Returns True when the decoded reply equals 0 (presumably the success
    status), False otherwise, and None if a RuntimeError was raised (the
    error is reported through on_except()).
    """
    try:
        request = prov.custom_data_request(sec, custom_data)
        reply = tp.send_data('custom-data', request)
        status = prov.custom_data_response(sec, reply)
        return status == 0
    except RuntimeError as err:
        on_except(err)
        return None
197  
198  
def scan_wifi_APs(sel_transport, tp, sec):
    """Trigger a blocking Wi-Fi scan on the device and collect the results.

    sel_transport -- transport mode name; 'softap' and 'ble' adjust the
                     scan grouping and the read batch size respectively
    tp            -- transport object providing send_data(endpoint, data)
    sec           -- security object passed to the prov request/response
                     helpers

    Returns the accumulated list of scan result entries (empty when the
    device reports no results), or None if a RuntimeError was raised (the
    error is reported through on_except()).
    """
    found = []
    group_channels = 0
    batch_size = 100

    if sel_transport == 'softap':
        # With softAP the scan has to be performed on individual channels,
        # one by one, so that the Wi-Fi controller gets ample time to send
        # out beacons (necessary to maintain connectivity with authenticated
        # stations). As scanning one channel at a time is slow, several
        # channels can be grouped and scanned in quick succession, which
        # speeds up the scan. If a group holds too many channels, though,
        # the controller may again miss out on sending beacons, so the
        # application should use an optimum value. The following value
        # usually works out in most cases
        group_channels = 5
    elif sel_transport == 'ble':
        # Read at most 4 entries at a time. With the BLE transport the
        # response packet must not exceed the present limit of 256 bytes of
        # characteristic value imposed by protocomm_ble. This limit may be
        # removed in the future
        batch_size = 4

    try:
        request = prov.scan_start_request(sec, blocking=True, group_channels=group_channels)
        start_time = time.time()
        reply = tp.send_data('prov-scan', request)
        stop_time = time.time()
        print("++++ Scan process executed in " + str(stop_time - start_time) + " sec")
        prov.scan_start_response(sec, reply)

        request = prov.scan_status_request(sec)
        reply = tp.send_data('prov-scan', request)
        status = prov.scan_status_response(sec, reply)
        total = status["count"]
        print("++++ Scan results : " + str(total))

        # Fetch the entries in batches of at most `batch_size`
        offset = 0
        pending = total
        while pending:
            chunk = min(pending, batch_size)
            request = prov.scan_result_request(sec, offset, chunk)
            reply = tp.send_data('prov-scan', request)
            found.extend(prov.scan_result_response(sec, reply))
            pending -= chunk
            offset += chunk

    except RuntimeError as err:
        on_except(err)
        return None

    return found
247  
248  
def send_wifi_config(tp, sec, ssid, passphrase):
    """Send the Wi-Fi SSID and passphrase to the 'prov-config' endpoint.

    The request is built by prov.config_set_config_request() and the reply
    is decoded by prov.config_set_config_response().

    Returns True when the decoded reply equals 0 (presumably the success
    status), False otherwise, and None if a RuntimeError was raised (the
    error is reported through on_except()).
    """
    try:
        request = prov.config_set_config_request(sec, ssid, passphrase)
        reply = tp.send_data('prov-config', request)
        status = prov.config_set_config_response(sec, reply)
        return status == 0
    except RuntimeError as err:
        on_except(err)
        return None
257  
258  
def apply_wifi_config(tp, sec):
    """Send an apply-config request to the 'prov-config' endpoint.

    The request is built by prov.config_apply_config_request() and the
    reply is decoded by prov.config_apply_config_response().

    Returns True when the decoded reply equals 0 (presumably the success
    status), False otherwise, and None if a RuntimeError was raised (the
    error is reported through on_except()).
    """
    try:
        request = prov.config_apply_config_request(sec)
        reply = tp.send_data('prov-config', request)
        status = prov.config_apply_config_response(sec, reply)
        return status == 0
    except RuntimeError as err:
        on_except(err)
        return None
267  
268  
def get_wifi_config(tp, sec):
    """Query the Wi-Fi status through the 'prov-config' endpoint.

    The request is built by prov.config_get_status_request(); the value
    produced by prov.config_get_status_response() is returned unchanged.

    Returns None if a RuntimeError was raised (the error is reported
    through on_except()).
    """
    try:
        request = prov.config_get_status_request(sec)
        reply = tp.send_data('prov-config', request)
        state = prov.config_get_status_response(sec, reply)
        return state
    except RuntimeError as err:
        on_except(err)
        return None
277  
278  
def wait_wifi_connected(tp, sec):
    """
    Poll the device until provisioning reports that Wi-Fi is connected

    The status is read with get_wifi_config() once per polling interval.
    A "connecting" status is polled again without limit; any status other
    than "connecting" or "connected" consumes one of the retries.

    Returns True if Wi-Fi connection succeeded, False if connection consistently failed
    """
    poll_interval = 5
    retries_left = 3

    while True:
        time.sleep(poll_interval)
        print("\n==== Wi-Fi connection state  ====")
        state = get_wifi_config(tp, sec)

        if state == "connected":
            print("==== Provisioning was successful ====")
            return True

        if state == "connecting":
            # Still in progress, does not count against the retries
            continue

        if retries_left <= 0:
            print("---- Provisioning failed ----")
            return False

        retries_left -= 1
        print("Waiting to poll status again (status %s, %d tries left)..." % (state, retries_left))
303  
304  
def desc_format(*args):
    """Wrap each argument with textwrap.fill() and join them as lines.

    Every argument is filled separately (existing whitespace characters
    are not replaced) and terminated with a newline. With no arguments
    the result is an empty string.
    """
    return ''.join(
        textwrap.fill(text=paragraph, replace_whitespace=False) + "\n"
        for paragraph in args
    )
310  
311  
# Command line entry point. The flow is: parse arguments -> open transport ->
# determine / create security scheme -> optional protocol version check ->
# establish session -> optional custom config / custom data -> obtain Wi-Fi
# credentials (from arguments or an interactive scan) -> send and apply
# them -> wait for the connection result.
#
# Process exit codes used below:
#   1 transport could not be created      2 security version / PoP problem
#   3 protocol version mismatch, or Wi-Fi scan not supported
#   4 session could not be established    5 custom config / data failed
#   6 sending Wi-Fi config failed         7 applying Wi-Fi config failed
#   8 Wi-Fi scan failed                   9 Wi-Fi scan found no APs
#
# sys.exit() is used instead of the exit() helper because the latter is
# injected by the `site` module and is not guaranteed to exist (e.g. when
# running with `python -S` or from a frozen interpreter).
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description=desc_format(
                                     'ESP Provisioning tool for configuring devices '
                                     'running protocomm based provisioning service.',
                                     'See esp-idf/examples/provisioning for sample applications'),
                                     formatter_class=argparse.RawTextHelpFormatter)

    parser.add_argument("--transport", required=True, dest='mode', type=str,
                        help=desc_format(
                            'Mode of transport over which provisioning is to be performed.',
                            'This should be one of "softap", "ble" or "console"'))

    parser.add_argument("--service_name", dest='name', type=str,
                        help=desc_format(
                            'This specifies the name of the provisioning service to connect to, '
                            'depending upon the mode of transport :',
                            '\t- transport "ble"    : The BLE Device Name',
                            '\t- transport "softap" : HTTP Server hostname or IP',
                            '\t                       (default "192.168.4.1:80")'))

    parser.add_argument("--proto_ver", dest='version', type=str, default='',
                        help=desc_format(
                            'This checks the protocol version of the provisioning service running '
                            'on the device before initiating Wi-Fi configuration'))

    parser.add_argument("--sec_ver", dest='secver', type=int, default=None,
                        help=desc_format(
                            'Protocomm security scheme used by the provisioning service for secure '
                            'session establishment. Accepted values are :',
                            '\t- 0 : No security',
                            '\t- 1 : X25519 key exchange + AES-CTR encryption',
                            '\t      + Authentication using Proof of Possession (PoP)',
                            'In case device side application uses IDF\'s provisioning manager, '
                            'the compatible security version is automatically determined from '
                            'capabilities retrieved via the version endpoint'))

    parser.add_argument("--pop", dest='pop', type=str, default='',
                        help=desc_format(
                            'This specifies the Proof of possession (PoP) when security scheme 1 '
                            'is used'))

    parser.add_argument("--ssid", dest='ssid', type=str, default='',
                        help=desc_format(
                            'This configures the device to use SSID of the Wi-Fi network to which '
                            'we would like it to connect to permanently, once provisioning is complete. '
                            'If Wi-Fi scanning is supported by the provisioning service, this need not '
                            'be specified'))

    parser.add_argument("--passphrase", dest='passphrase', type=str, default='',
                        help=desc_format(
                            'This configures the device to use Passphrase for the Wi-Fi network to which '
                            'we would like it to connect to permanently, once provisioning is complete. '
                            'If Wi-Fi scanning is supported by the provisioning service, this need not '
                            'be specified'))

    parser.add_argument("--custom_data", dest='custom_data', type=str, default='',
                        help=desc_format(
                            'This is an optional parameter, only intended for use with '
                            '"examples/provisioning/wifi_prov_mgr_custom_data"'))

    parser.add_argument("--custom_config", action="store_true",
                        help=desc_format(
                            'This is an optional parameter, only intended for use with '
                            '"examples/provisioning/custom_config"'))
    parser.add_argument("--custom_info", dest='custom_info', type=str, default='<some custom info string>',
                        help=desc_format(
                            'Custom Config Info String. "--custom_config" must be specified for using this'))
    parser.add_argument("--custom_ver", dest='custom_ver', type=int, default=2,
                        help=desc_format(
                            'Custom Config Version Number. "--custom_config" must be specified for using this'))

    parser.add_argument("-v", "--verbose", help="Increase output verbosity", action="store_true")

    args = parser.parse_args()

    obj_transport = get_transport(args.mode.lower(), args.name)
    if obj_transport is None:
        print("---- Failed to establish connection ----")
        sys.exit(1)

    # If security version not specified check in capabilities
    if args.secver is None:
        # First check if capabilities are supported or not
        if not has_capability(obj_transport):
            print('Security capabilities could not be determined. Please specify "--sec_ver" explicitly')
            print("---- Invalid Security Version ----")
            sys.exit(2)

        # When no_sec is present, use security 0, else security 1
        args.secver = int(not has_capability(obj_transport, 'no_sec'))
        print("Security scheme determined to be :", args.secver)

        # PoP is mandatory only for a secured scheme on a device that does
        # not advertise 'no_pop'; in every other case a supplied PoP is
        # discarded
        if (args.secver != 0) and not has_capability(obj_transport, 'no_pop'):
            if len(args.pop) == 0:
                print("---- Proof of Possession argument not provided ----")
                sys.exit(2)
        elif len(args.pop) != 0:
            print("---- Proof of Possession will be ignored ----")
            args.pop = ''

    obj_security = get_security(args.secver, args.pop, args.verbose)
    if obj_security is None:
        print("---- Invalid Security Version ----")
        sys.exit(2)

    if args.version != '':
        print("\n==== Verifying protocol version ====")
        if not version_match(obj_transport, args.version, args.verbose):
            print("---- Error in protocol version matching ----")
            sys.exit(3)
        print("==== Verified protocol version successfully ====")

    print("\n==== Starting Session ====")
    if not establish_session(obj_transport, obj_security):
        print("Failed to establish session. Ensure that security scheme and proof of possession are correct")
        print("---- Error in establishing session ----")
        sys.exit(4)
    print("==== Session Established ====")

    if args.custom_config:
        print("\n==== Sending Custom config to esp32 ====")
        if not custom_config(obj_transport, obj_security, args.custom_info, args.custom_ver):
            print("---- Error in custom config ----")
            sys.exit(5)
        print("==== Custom config sent successfully ====")

    if args.custom_data != '':
        print("\n==== Sending Custom data to esp32 ====")
        if not custom_data(obj_transport, obj_security, args.custom_data):
            print("---- Error in custom data ----")
            sys.exit(5)
        print("==== Custom data sent successfully ====")

    # No SSID on the command line: let the user pick one from a scan
    if args.ssid == '':
        if not has_capability(obj_transport, 'wifi_scan'):
            print("---- Wi-Fi Scan List is not supported by provisioning service ----")
            print("---- Rerun esp_prov with SSID and Passphrase as argument ----")
            sys.exit(3)

        while True:
            print("\n==== Scanning Wi-Fi APs ====")
            start_time = time.time()
            APs = scan_wifi_APs(args.mode.lower(), obj_transport, obj_security)
            end_time = time.time()
            print("\n++++ Scan finished in " + str(end_time - start_time) + " sec")
            if APs is None:
                print("---- Error in scanning Wi-Fi APs ----")
                sys.exit(8)

            if len(APs) == 0:
                print("No APs found!")
                sys.exit(9)

            print("==== Wi-Fi Scan results ====")
            print("{0: >4} {1: <33} {2: <12} {3: >4} {4: <4} {5: <16}".format(
                "S.N.", "SSID", "BSSID", "CHN", "RSSI", "AUTH"))
            # Entries are numbered from 1 because 0 is reserved for "rescan"
            for serial, ap in enumerate(APs, 1):
                print("[{0: >2}] {1: <33} {2: <12} {3: >4} {4: <4} {5: <16}".format(
                    serial, ap["ssid"], ap["bssid"], ap["channel"], ap["rssi"], ap["auth"]))

            # Keep prompting until a number in the range 0..len(APs) is given
            while True:
                try:
                    select = int(input("Select AP by number (0 to rescan) : "))
                    if select < 0 or select > len(APs):
                        raise ValueError
                    break
                except ValueError:
                    print("Invalid input! Retry")

            if select != 0:
                break

        args.ssid = APs[select - 1]["ssid"]
        prompt_str = "Enter passphrase for {0} : ".format(args.ssid)
        args.passphrase = getpass(prompt_str)

    print("\n==== Sending Wi-Fi credential to esp32 ====")
    if not send_wifi_config(obj_transport, obj_security, args.ssid, args.passphrase):
        print("---- Error in send Wi-Fi config ----")
        sys.exit(6)
    print("==== Wi-Fi Credentials sent successfully ====")

    print("\n==== Applying config to esp32 ====")
    if not apply_wifi_config(obj_transport, obj_security):
        print("---- Error in apply Wi-Fi config ----")
        sys.exit(7)
    print("==== Apply config sent successfully ====")

    wait_wifi_connected(obj_transport, obj_security)