/ dev / ssltest.py.bak
ssltest.py.bak
  1  import os
  2  import select
  3  import socket
  4  import ssl
  5  import sys
  6  import traceback
  7  
  8  HOST = "127.0.0.1"
  9  PORT = 8912
 10  
 11  
 12  def sslProtocolVersion():
 13      # sslProtocolVersion
 14      if sys.version_info >= (2, 7, 13):
 15          # this means TLSv1 or higher
 16          # in the future change to
 17          # ssl.PROTOCOL_TLS1.2
 18          return ssl.PROTOCOL_TLS
 19      elif sys.version_info >= (2, 7, 9):
 20          # this means any SSL/TLS. SSLv2 and 3 are excluded with an option after context is created
 21          return ssl.PROTOCOL_SSLv23
 22      else:
 23          # this means TLSv1, there is no way to set "TLSv1 or higher" or
 24          # "TLSv1.2" in < 2.7.9
 25          return ssl.PROTOCOL_TLSv1
 26  
 27  
 28  def sslProtocolCiphers():
 29      if ssl.OPENSSL_VERSION_NUMBER >= 0x10100000:
 30          return "AECDH-AES256-SHA@SECLEVEL=0"
 31      else:
 32          return "AECDH-AES256-SHA"
 33  
 34  
 35  def connect():
 36      sock = socket.create_connection((HOST, PORT))
 37      return sock
 38  
 39  
 40  def listen():
 41      sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 42      sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 43      sock.bind((HOST, PORT))
 44      sock.listen(0)
 45      return sock
 46  
 47  
 48  def sslHandshake(sock, server=False):
 49      if sys.version_info >= (2, 7, 9):
 50          context = ssl.SSLContext(sslProtocolVersion())
 51          context.set_ciphers(sslProtocolCiphers())
 52          context.set_ecdh_curve("secp256k1")
 53          context.check_hostname = False
 54          context.verify_mode = ssl.CERT_NONE
 55          context.options = ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3\
 56              | ssl.OP_SINGLE_ECDH_USE | ssl.OP_CIPHER_SERVER_PREFERENCE
 57          sslSock = context.wrap_socket(sock, server_side=server, do_handshake_on_connect=False)
 58      else:
 59          sslSock = ssl.wrap_socket(sock, keyfile=os.path.join('src', 'sslkeys', 'key.pem'),
 60                                    certfile=os.path.join('src', 'sslkeys', 'cert.pem'),
 61                                    server_side=server, ssl_version=sslProtocolVersion(),
 62                                    do_handshake_on_connect=False, ciphers='AECDH-AES256-SHA')
 63  
 64      while True:
 65          try:
 66              sslSock.do_handshake()
 67              break
 68          except ssl.SSLWantReadError:
 69              print("Waiting for SSL socket handhake read")
 70              select.select([sslSock], [], [], 10)
 71          except ssl.SSLWantWriteError:
 72              print("Waiting for SSL socket handhake write")
 73              select.select([], [sslSock], [], 10)
 74          except Exception:
 75              print("SSL socket handhake failed, shutting down connection")
 76              traceback.print_exc()
 77              return
 78      print("Success!")
 79      return sslSock
 80  
 81  
 82  if __name__ == "__main__":
 83      if len(sys.argv) != 2:
 84          print("Usage: ssltest.py client|server")
 85          sys.exit(0)
 86      elif sys.argv[1] == "server":
 87          serversock = listen()
 88          while True:
 89              print("Waiting for connection")
 90              sock, addr = serversock.accept()
 91              print("Got connection from %s:%i" % (addr[0], addr[1]))
 92              sslSock = sslHandshake(sock, True)
 93              if sslSock:
 94                  sslSock.shutdown(socket.SHUT_RDWR)
 95                  sslSock.close()
 96      elif sys.argv[1] == "client":
 97          sock = connect()
 98          sslSock = sslHandshake(sock, False)
 99          if sslSock:
100              sslSock.shutdown(socket.SHUT_RDWR)
101              sslSock.close()
102      else:
103          print("Usage: ssltest.py client|server")
104          sys.exit(0)