/ letsencrypt / crypto_util.py
crypto_util.py
1 """Let's Encrypt client crypto utility functions. 2 3 .. todo:: Make the transition to use PSS rather than PKCS1_v1_5 when the server 4 is capable of handling the signatures. 5 6 """ 7 import logging 8 import os 9 10 import OpenSSL 11 import pyrfc3339 12 import zope.component 13 14 from acme import crypto_util as acme_crypto_util 15 from acme import jose 16 17 from letsencrypt import errors 18 from letsencrypt import interfaces 19 from letsencrypt import le_util 20 21 22 logger = logging.getLogger(__name__) 23 24 25 # High level functions 26 def init_save_key(key_size, key_dir, keyname="key-letsencrypt.pem"): 27 """Initializes and saves a privkey. 28 29 Inits key and saves it in PEM format on the filesystem. 30 31 .. note:: keyname is the attempted filename, it may be different if a file 32 already exists at the path. 33 34 :param int key_size: RSA key size in bits 35 :param str key_dir: Key save directory. 36 :param str keyname: Filename of key 37 38 :returns: Key 39 :rtype: :class:`letsencrypt.le_util.Key` 40 41 :raises ValueError: If unable to generate the key given key_size. 42 43 """ 44 try: 45 key_pem = make_key(key_size) 46 except ValueError as err: 47 logger.exception(err) 48 raise err 49 50 config = zope.component.getUtility(interfaces.IConfig) 51 # Save file 52 le_util.make_or_verify_dir(key_dir, 0o700, os.geteuid(), 53 config.strict_permissions) 54 key_f, key_path = le_util.unique_file( 55 os.path.join(key_dir, keyname), 0o600) 56 key_f.write(key_pem) 57 key_f.close() 58 59 logger.info("Generating key (%d bits): %s", key_size, key_path) 60 61 return le_util.Key(key_path, key_pem) 62 63 64 def init_save_csr(privkey, names, path, csrname="csr-letsencrypt.pem"): 65 """Initialize a CSR with the given private key. 66 67 :param privkey: Key to include in the CSR 68 :type privkey: :class:`letsencrypt.le_util.Key` 69 70 :param set names: `str` names to include in the CSR 71 72 :param str path: Certificate save directory. 73 74 :returns: CSR 75 :rtype: :class:`letsencrypt.le_util.CSR` 76 77 """ 78 csr_pem, csr_der = make_csr(privkey.pem, names) 79 80 config = zope.component.getUtility(interfaces.IConfig) 81 # Save CSR 82 le_util.make_or_verify_dir(path, 0o755, os.geteuid(), 83 config.strict_permissions) 84 csr_f, csr_filename = le_util.unique_file( 85 os.path.join(path, csrname), 0o644) 86 csr_f.write(csr_pem) 87 csr_f.close() 88 89 logger.info("Creating CSR: %s", csr_filename) 90 91 return le_util.CSR(csr_filename, csr_der, "der") 92 93 94 # Lower level functions 95 def make_csr(key_str, domains): 96 """Generate a CSR. 97 98 :param str key_str: PEM-encoded RSA key. 99 :param list domains: Domains included in the certificate. 100 101 .. todo:: Detect duplicates in `domains`? Using a set doesn't 102 preserve order... 103 104 :returns: new CSR in PEM and DER form containing all domains 105 :rtype: tuple 106 107 """ 108 assert domains, "Must provide one or more hostnames for the CSR." 109 pkey = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, key_str) 110 req = OpenSSL.crypto.X509Req() 111 req.get_subject().CN = domains[0] 112 # TODO: what to put into req.get_subject()? 113 # TODO: put SAN if len(domains) > 1 114 req.add_extensions([ 115 OpenSSL.crypto.X509Extension( 116 "subjectAltName", 117 critical=False, 118 value=", ".join("DNS:%s" % d for d in domains) 119 ), 120 ]) 121 req.set_pubkey(pkey) 122 req.sign(pkey, "sha256") 123 return tuple(OpenSSL.crypto.dump_certificate_request(method, req) 124 for method in (OpenSSL.crypto.FILETYPE_PEM, 125 OpenSSL.crypto.FILETYPE_ASN1)) 126 127 128 # WARNING: the csr and private key file are possible attack vectors for TOCTOU 129 # We should either... 130 # A. Do more checks to verify that the CSR is trusted/valid 131 # B. Audit the parsing code for vulnerabilities 132 133 def valid_csr(csr): 134 """Validate CSR. 135 136 Check if `csr` is a valid CSR for the given domains. 137 138 :param str csr: CSR in PEM. 139 140 :returns: Validity of CSR. 141 :rtype: bool 142 143 """ 144 try: 145 req = OpenSSL.crypto.load_certificate_request( 146 OpenSSL.crypto.FILETYPE_PEM, csr) 147 return req.verify(req.get_pubkey()) 148 except OpenSSL.crypto.Error as error: 149 logger.debug(error, exc_info=True) 150 return False 151 152 153 def csr_matches_pubkey(csr, privkey): 154 """Does private key correspond to the subject public key in the CSR? 155 156 :param str csr: CSR in PEM. 157 :param str privkey: Private key file contents (PEM) 158 159 :returns: Correspondence of private key to CSR subject public key. 160 :rtype: bool 161 162 """ 163 req = OpenSSL.crypto.load_certificate_request( 164 OpenSSL.crypto.FILETYPE_PEM, csr) 165 pkey = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, privkey) 166 try: 167 return req.verify(pkey) 168 except OpenSSL.crypto.Error as error: 169 logger.debug(error, exc_info=True) 170 return False 171 172 173 def make_key(bits): 174 """Generate PEM encoded RSA key. 175 176 :param int bits: Number of bits, at least 1024. 177 178 :returns: new RSA key in PEM form with specified number of bits 179 :rtype: str 180 181 """ 182 assert bits >= 1024 # XXX 183 key = OpenSSL.crypto.PKey() 184 key.generate_key(OpenSSL.crypto.TYPE_RSA, bits) 185 return OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key) 186 187 188 def valid_privkey(privkey): 189 """Is valid RSA private key? 190 191 :param str privkey: Private key file contents in PEM 192 193 :returns: Validity of private key. 194 :rtype: bool 195 196 """ 197 try: 198 return OpenSSL.crypto.load_privatekey( 199 OpenSSL.crypto.FILETYPE_PEM, privkey).check() 200 except (TypeError, OpenSSL.crypto.Error): 201 return False 202 203 204 def pyopenssl_load_certificate(data): 205 """Load PEM/DER certificate. 206 207 :raises errors.Error: 208 209 """ 210 211 openssl_errors = [] 212 213 for file_type in (OpenSSL.crypto.FILETYPE_PEM, OpenSSL.crypto.FILETYPE_ASN1): 214 try: 215 return OpenSSL.crypto.load_certificate(file_type, data), file_type 216 except OpenSSL.crypto.Error as error: # TODO: other errors? 217 openssl_errors.append(error) 218 raise errors.Error("Unable to load: {0}".format(",".join( 219 str(error) for error in openssl_errors))) 220 221 222 def _get_sans_from_cert_or_req(cert_or_req_str, load_func, 223 typ=OpenSSL.crypto.FILETYPE_PEM): 224 try: 225 cert_or_req = load_func(typ, cert_or_req_str) 226 except OpenSSL.crypto.Error as error: 227 logger.exception(error) 228 raise 229 # pylint: disable=protected-access 230 return acme_crypto_util._pyopenssl_cert_or_req_san(cert_or_req) 231 232 233 def get_sans_from_cert(cert, typ=OpenSSL.crypto.FILETYPE_PEM): 234 """Get a list of Subject Alternative Names from a certificate. 235 236 :param str cert: Certificate (encoded). 237 :param typ: `OpenSSL.crypto.FILETYPE_PEM` or `OpenSSL.crypto.FILETYPE_ASN1` 238 239 :returns: A list of Subject Alternative Names. 240 :rtype: list 241 242 """ 243 return _get_sans_from_cert_or_req( 244 cert, OpenSSL.crypto.load_certificate, typ) 245 246 247 def get_sans_from_csr(csr, typ=OpenSSL.crypto.FILETYPE_PEM): 248 """Get a list of Subject Alternative Names from a CSR. 249 250 :param str csr: CSR (encoded). 251 :param typ: `OpenSSL.crypto.FILETYPE_PEM` or `OpenSSL.crypto.FILETYPE_ASN1` 252 253 :returns: A list of Subject Alternative Names. 254 :rtype: list 255 256 """ 257 return _get_sans_from_cert_or_req( 258 csr, OpenSSL.crypto.load_certificate_request, typ) 259 260 261 def dump_pyopenssl_chain(chain, filetype=OpenSSL.crypto.FILETYPE_PEM): 262 """Dump certificate chain into a bundle. 263 264 :param list chain: List of `OpenSSL.crypto.X509` (or wrapped in 265 `acme.jose.ComparableX509`). 266 267 """ 268 # XXX: returns empty string when no chain is available, which 269 # shuts up RenewableCert, but might not be the best solution... 270 271 def _dump_cert(cert): 272 if isinstance(cert, jose.ComparableX509): 273 # pylint: disable=protected-access 274 cert = cert._wrapped 275 return OpenSSL.crypto.dump_certificate(filetype, cert) 276 277 # assumes that OpenSSL.crypto.dump_certificate includes ending 278 # newline character 279 return "".join(_dump_cert(cert) for cert in chain) 280 281 282 def notBefore(cert_path): 283 """When does the cert at cert_path start being valid? 284 285 :param str cert_path: path to a cert in PEM format 286 287 :returns: the notBefore value from the cert at cert_path 288 :rtype: :class:`datetime.datetime` 289 290 """ 291 return _notAfterBefore(cert_path, OpenSSL.crypto.X509.get_notBefore) 292 293 294 def notAfter(cert_path): 295 """When does the cert at cert_path stop being valid? 296 297 :param str cert_path: path to a cert in PEM format 298 299 :returns: the notAfter value from the cert at cert_path 300 :rtype: :class:`datetime.datetime` 301 302 """ 303 return _notAfterBefore(cert_path, OpenSSL.crypto.X509.get_notAfter) 304 305 306 def _notAfterBefore(cert_path, method): 307 """Internal helper function for finding notbefore/notafter. 308 309 :param str cert_path: path to a cert in PEM format 310 :param function method: one of ``OpenSSL.crypto.X509.get_notBefore`` 311 or ``OpenSSL.crypto.X509.get_notAfter`` 312 313 :returns: the notBefore or notAfter value from the cert at cert_path 314 :rtype: :class:`datetime.datetime` 315 316 """ 317 with open(cert_path) as f: 318 x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, 319 f.read()) 320 timestamp = method(x509) 321 reformatted_timestamp = [timestamp[0:4], "-", timestamp[4:6], "-", 322 timestamp[6:8], "T", timestamp[8:10], ":", 323 timestamp[10:12], ":", timestamp[12:]] 324 return pyrfc3339.parse("".join(reformatted_timestamp))