/ letsencrypt / crypto_util.py
crypto_util.py
  1  """Let's Encrypt client crypto utility functions.
  2  
  3  .. todo:: Make the transition to use PSS rather than PKCS1_v1_5 when the server
  4      is capable of handling the signatures.
  5  
  6  """
  7  import logging
  8  import os
  9  
 10  import OpenSSL
 11  import pyrfc3339
 12  import zope.component
 13  
 14  from acme import crypto_util as acme_crypto_util
 15  from acme import jose
 16  
 17  from letsencrypt import errors
 18  from letsencrypt import interfaces
 19  from letsencrypt import le_util
 20  
 21  
 22  logger = logging.getLogger(__name__)
 23  
 24  
 25  # High level functions
 26  def init_save_key(key_size, key_dir, keyname="key-letsencrypt.pem"):
 27      """Initializes and saves a privkey.
 28  
 29      Inits key and saves it in PEM format on the filesystem.
 30  
 31      .. note:: keyname is the attempted filename, it may be different if a file
 32          already exists at the path.
 33  
 34      :param int key_size: RSA key size in bits
 35      :param str key_dir: Key save directory.
 36      :param str keyname: Filename of key
 37  
 38      :returns: Key
 39      :rtype: :class:`letsencrypt.le_util.Key`
 40  
 41      :raises ValueError: If unable to generate the key given key_size.
 42  
 43      """
 44      try:
 45          key_pem = make_key(key_size)
 46      except ValueError as err:
 47          logger.exception(err)
 48          raise err
 49  
 50      config = zope.component.getUtility(interfaces.IConfig)
 51      # Save file
 52      le_util.make_or_verify_dir(key_dir, 0o700, os.geteuid(),
 53                                 config.strict_permissions)
 54      key_f, key_path = le_util.unique_file(
 55          os.path.join(key_dir, keyname), 0o600)
 56      key_f.write(key_pem)
 57      key_f.close()
 58  
 59      logger.info("Generating key (%d bits): %s", key_size, key_path)
 60  
 61      return le_util.Key(key_path, key_pem)
 62  
 63  
 64  def init_save_csr(privkey, names, path, csrname="csr-letsencrypt.pem"):
 65      """Initialize a CSR with the given private key.
 66  
 67      :param privkey: Key to include in the CSR
 68      :type privkey: :class:`letsencrypt.le_util.Key`
 69  
 70      :param set names: `str` names to include in the CSR
 71  
 72      :param str path: Certificate save directory.
 73  
 74      :returns: CSR
 75      :rtype: :class:`letsencrypt.le_util.CSR`
 76  
 77      """
 78      csr_pem, csr_der = make_csr(privkey.pem, names)
 79  
 80      config = zope.component.getUtility(interfaces.IConfig)
 81      # Save CSR
 82      le_util.make_or_verify_dir(path, 0o755, os.geteuid(),
 83                                 config.strict_permissions)
 84      csr_f, csr_filename = le_util.unique_file(
 85          os.path.join(path, csrname), 0o644)
 86      csr_f.write(csr_pem)
 87      csr_f.close()
 88  
 89      logger.info("Creating CSR: %s", csr_filename)
 90  
 91      return le_util.CSR(csr_filename, csr_der, "der")
 92  
 93  
 94  # Lower level functions
 95  def make_csr(key_str, domains):
 96      """Generate a CSR.
 97  
 98      :param str key_str: PEM-encoded RSA key.
 99      :param list domains: Domains included in the certificate.
100  
101      .. todo:: Detect duplicates in `domains`? Using a set doesn't
102                preserve order...
103  
104      :returns: new CSR in PEM and DER form containing all domains
105      :rtype: tuple
106  
107      """
108      assert domains, "Must provide one or more hostnames for the CSR."
109      pkey = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, key_str)
110      req = OpenSSL.crypto.X509Req()
111      req.get_subject().CN = domains[0]
112      # TODO: what to put into req.get_subject()?
113      # TODO: put SAN if len(domains) > 1
114      req.add_extensions([
115          OpenSSL.crypto.X509Extension(
116              "subjectAltName",
117              critical=False,
118              value=", ".join("DNS:%s" % d for d in domains)
119          ),
120      ])
121      req.set_pubkey(pkey)
122      req.sign(pkey, "sha256")
123      return tuple(OpenSSL.crypto.dump_certificate_request(method, req)
124                   for method in (OpenSSL.crypto.FILETYPE_PEM,
125                                  OpenSSL.crypto.FILETYPE_ASN1))
126  
127  
128  # WARNING: the csr and private key file are possible attack vectors for TOCTOU
129  # We should either...
130  # A. Do more checks to verify that the CSR is trusted/valid
131  # B. Audit the parsing code for vulnerabilities
132  
133  def valid_csr(csr):
134      """Validate CSR.
135  
136      Check if `csr` is a valid CSR for the given domains.
137  
138      :param str csr: CSR in PEM.
139  
140      :returns: Validity of CSR.
141      :rtype: bool
142  
143      """
144      try:
145          req = OpenSSL.crypto.load_certificate_request(
146              OpenSSL.crypto.FILETYPE_PEM, csr)
147          return req.verify(req.get_pubkey())
148      except OpenSSL.crypto.Error as error:
149          logger.debug(error, exc_info=True)
150          return False
151  
152  
153  def csr_matches_pubkey(csr, privkey):
154      """Does private key correspond to the subject public key in the CSR?
155  
156      :param str csr: CSR in PEM.
157      :param str privkey: Private key file contents (PEM)
158  
159      :returns: Correspondence of private key to CSR subject public key.
160      :rtype: bool
161  
162      """
163      req = OpenSSL.crypto.load_certificate_request(
164          OpenSSL.crypto.FILETYPE_PEM, csr)
165      pkey = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, privkey)
166      try:
167          return req.verify(pkey)
168      except OpenSSL.crypto.Error as error:
169          logger.debug(error, exc_info=True)
170          return False
171  
172  
173  def make_key(bits):
174      """Generate PEM encoded RSA key.
175  
176      :param int bits: Number of bits, at least 1024.
177  
178      :returns: new RSA key in PEM form with specified number of bits
179      :rtype: str
180  
181      """
182      assert bits >= 1024  # XXX
183      key = OpenSSL.crypto.PKey()
184      key.generate_key(OpenSSL.crypto.TYPE_RSA, bits)
185      return OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key)
186  
187  
188  def valid_privkey(privkey):
189      """Is valid RSA private key?
190  
191      :param str privkey: Private key file contents in PEM
192  
193      :returns: Validity of private key.
194      :rtype: bool
195  
196      """
197      try:
198          return OpenSSL.crypto.load_privatekey(
199              OpenSSL.crypto.FILETYPE_PEM, privkey).check()
200      except (TypeError, OpenSSL.crypto.Error):
201          return False
202  
203  
204  def pyopenssl_load_certificate(data):
205      """Load PEM/DER certificate.
206  
207      :raises errors.Error:
208  
209      """
210  
211      openssl_errors = []
212  
213      for file_type in (OpenSSL.crypto.FILETYPE_PEM, OpenSSL.crypto.FILETYPE_ASN1):
214          try:
215              return OpenSSL.crypto.load_certificate(file_type, data), file_type
216          except OpenSSL.crypto.Error as error:  # TODO: other errors?
217              openssl_errors.append(error)
218      raise errors.Error("Unable to load: {0}".format(",".join(
219          str(error) for error in openssl_errors)))
220  
221  
222  def _get_sans_from_cert_or_req(cert_or_req_str, load_func,
223                                 typ=OpenSSL.crypto.FILETYPE_PEM):
224      try:
225          cert_or_req = load_func(typ, cert_or_req_str)
226      except OpenSSL.crypto.Error as error:
227          logger.exception(error)
228          raise
229      # pylint: disable=protected-access
230      return acme_crypto_util._pyopenssl_cert_or_req_san(cert_or_req)
231  
232  
233  def get_sans_from_cert(cert, typ=OpenSSL.crypto.FILETYPE_PEM):
234      """Get a list of Subject Alternative Names from a certificate.
235  
236      :param str cert: Certificate (encoded).
237      :param typ: `OpenSSL.crypto.FILETYPE_PEM` or `OpenSSL.crypto.FILETYPE_ASN1`
238  
239      :returns: A list of Subject Alternative Names.
240      :rtype: list
241  
242      """
243      return _get_sans_from_cert_or_req(
244          cert, OpenSSL.crypto.load_certificate, typ)
245  
246  
247  def get_sans_from_csr(csr, typ=OpenSSL.crypto.FILETYPE_PEM):
248      """Get a list of Subject Alternative Names from a CSR.
249  
250      :param str csr: CSR (encoded).
251      :param typ: `OpenSSL.crypto.FILETYPE_PEM` or `OpenSSL.crypto.FILETYPE_ASN1`
252  
253      :returns: A list of Subject Alternative Names.
254      :rtype: list
255  
256      """
257      return _get_sans_from_cert_or_req(
258          csr, OpenSSL.crypto.load_certificate_request, typ)
259  
260  
261  def dump_pyopenssl_chain(chain, filetype=OpenSSL.crypto.FILETYPE_PEM):
262      """Dump certificate chain into a bundle.
263  
264      :param list chain: List of `OpenSSL.crypto.X509` (or wrapped in
265          `acme.jose.ComparableX509`).
266  
267      """
268      # XXX: returns empty string when no chain is available, which
269      # shuts up RenewableCert, but might not be the best solution...
270  
271      def _dump_cert(cert):
272          if isinstance(cert, jose.ComparableX509):
273              # pylint: disable=protected-access
274              cert = cert._wrapped
275          return OpenSSL.crypto.dump_certificate(filetype, cert)
276  
277      # assumes that OpenSSL.crypto.dump_certificate includes ending
278      # newline character
279      return "".join(_dump_cert(cert) for cert in chain)
280  
281  
282  def notBefore(cert_path):
283      """When does the cert at cert_path start being valid?
284  
285      :param str cert_path: path to a cert in PEM format
286  
287      :returns: the notBefore value from the cert at cert_path
288      :rtype: :class:`datetime.datetime`
289  
290      """
291      return _notAfterBefore(cert_path, OpenSSL.crypto.X509.get_notBefore)
292  
293  
294  def notAfter(cert_path):
295      """When does the cert at cert_path stop being valid?
296  
297      :param str cert_path: path to a cert in PEM format
298  
299      :returns: the notAfter value from the cert at cert_path
300      :rtype: :class:`datetime.datetime`
301  
302      """
303      return _notAfterBefore(cert_path, OpenSSL.crypto.X509.get_notAfter)
304  
305  
306  def _notAfterBefore(cert_path, method):
307      """Internal helper function for finding notbefore/notafter.
308  
309      :param str cert_path: path to a cert in PEM format
310      :param function method: one of ``OpenSSL.crypto.X509.get_notBefore``
311          or ``OpenSSL.crypto.X509.get_notAfter``
312  
313      :returns: the notBefore or notAfter value from the cert at cert_path
314      :rtype: :class:`datetime.datetime`
315  
316      """
317      with open(cert_path) as f:
318          x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
319                                                 f.read())
320      timestamp = method(x509)
321      reformatted_timestamp = [timestamp[0:4], "-", timestamp[4:6], "-",
322                               timestamp[6:8], "T", timestamp[8:10], ":",
323                               timestamp[10:12], ":", timestamp[12:]]
324      return pyrfc3339.parse("".join(reformatted_timestamp))