/ letsencrypt / validator.py
validator.py
 1  """Validators to determine the current webserver configuration"""
 2  import logging
 3  import socket
 4  import requests
 5  import zope.interface
 6  
 7  from acme import crypto_util
 8  from acme import errors as acme_errors
 9  from letsencrypt import interfaces
10  
11  
12  logger = logging.getLogger(__name__)
13  
14  
class Validator(object):
    # pylint: disable=no-self-use
    """Collection of functions to test a live webserver's configuration.

    Every check talks to the server named by its arguments and reports the
    outcome as a boolean.
    """
    zope.interface.implements(interfaces.IValidator)

    # Minimum acceptable HSTS lifetime in seconds (two weeks).
    _MIN_HSTS_MAX_AGE = 2 * 7 * 24 * 3600

    def certificate(self, cert, name, alt_host=None, port=443):
        """Verify that the certificate presented at ``name`` is ``cert``.

        :param cert: Expected certificate; it is compared with the presented
            one through ``digest("sha256")``.
        :param name: Server name passed to the SNI probe. It is also
            resolved with ``socket.gethostbyname`` when ``alt_host`` is not
            given.
        :param alt_host: Host to probe instead of resolving ``name``.
        :param port: Port to probe.

        :returns: ``True`` when both SHA-256 digests are equal; ``False``
            when they differ or when the probe raises an ACME error (which
            is logged).
        :rtype: bool

        Name resolution happens outside the ``try`` block, so a resolver
        error propagates to the caller.

        """
        host = alt_host if alt_host else socket.gethostbyname(name)
        try:
            presented_cert = crypto_util.probe_sni(name, host, port)
        except acme_errors.Error as error:
            logger.exception(error)
            return False

        return presented_cert.digest("sha256") == cert.digest("sha256")

    def redirect(self, name, port=80, headers=None):
        """Test whether webserver redirects to secure connection.

        :param name: Host name to request over plain HTTP.
        :param port: Port to request.
        :param headers: Optional request headers; only passed on when truthy.

        :returns: ``True`` only for a 301 response whose ``location`` header
            starts with ``https://``.
        :rtype: bool

        """
        url = "http://{0}:{1}".format(name, port)

        # Redirects must not be followed: the first response is what is
        # being inspected.
        request_kwargs = {"allow_redirects": False}
        if headers:
            request_kwargs["headers"] = headers
        response = requests.get(url, **request_kwargs)

        if response.status_code not in (301, 303):
            return False

        redirect_location = response.headers.get("location", "")
        if not redirect_location.startswith("https://"):
            return False

        # A 303 to an https:// URL gets this far; it is reported explicitly
        # because only a permanent redirect is accepted.
        if response.status_code != 301:
            logger.error("Server did not redirect with permanent code")
            return False

        return True

    def hsts(self, name):
        """Test for HTTP Strict Transport Security header.

        :param name: Host name to request over HTTPS.

        :returns: ``True`` when the response carries a
            ``strict-transport-security`` header whose ``max-age`` is an
            integer of at least two weeks; ``False`` otherwise.
        :rtype: bool

        """
        headers = requests.get("https://" + name).headers
        hsts_header = headers.get("strict-transport-security")

        if not hsts_header:
            return False

        max_age_value = self._parse_hsts_max_age(hsts_header)
        if max_age_value is None:
            logger.error("Server responded with invalid HSTS header field")
            return False

        # Test whether HSTS does not expire for at least two weeks; a
        # lifetime of exactly two weeks is acceptable.
        if max_age_value < self._MIN_HSTS_MAX_AGE:
            logger.error("HSTS should not expire in less than two weeks")
            return False

        return True

    @staticmethod
    def _parse_hsts_max_age(hsts_header):
        """Extract the ``max-age`` value from an HSTS header field.

        Directives are split following RFC6797, section 6.1: they are
        separated by ``;``, may be surrounded by whitespace, their names are
        case-insensitive and their values may be quoted strings.

        :param hsts_header: Raw value of the header field.

        :returns: The value of the first ``max-age`` directive as an
            ``int``, or ``None`` when there is no such directive or its
            value is not an integer.

        """
        for directive in hsts_header.split(";"):
            directive_name, _, directive_value = directive.partition("=")
            if directive_name.strip().lower() != "max-age":
                continue

            directive_value = directive_value.strip()
            # Unwrap the quoted-string form, e.g. max-age="31536000".
            if (len(directive_value) >= 2 and directive_value[0] == '"'
                    and directive_value[-1] == '"'):
                directive_value = directive_value[1:-1]

            # Only the first max-age directive is considered.
            try:
                return int(directive_value)
            except ValueError:
                return None

        return None

    def ocsp_stapling(self, name):
        """Verify ocsp stapling for domain.

        Not implemented: always raises ``NotImplementedError``.

        """
        raise NotImplementedError()