# test_driver.py
"""Tests Let's Encrypt plugins against different server configurations."""
import argparse
import filecmp
import functools
import logging
import os
import shutil
import tempfile
import time

import OpenSSL

from acme import challenges
from acme import crypto_util
from acme import messages
from letsencrypt import achallenges
from letsencrypt import errors as le_errors
from letsencrypt import validator
from letsencrypt.tests import acme_util

from letsencrypt_compatibility_test import errors
from letsencrypt_compatibility_test import util
from letsencrypt_compatibility_test.configurators.apache import apache24


# Help text shown by argparse (see get_args).
DESCRIPTION = """
Tests Let's Encrypt plugins against different server configuratons. It is
assumed that Docker is already installed. If no test types is specified, all
tests that the plugin supports are performed.

"""

# Maps the value accepted by --plugin to the class that is instantiated with
# the parsed arguments in main().
PLUGINS = {"apache": apache24.Proxy}


logger = logging.getLogger(__name__)


def test_authenticator(plugin, config, temp_dir):
    """Tests plugin as an authenticator.

    Backs up `config`, asks the plugin to perform the challenges built by
    `_create_achalls`, verifies every DVSNI response against 127.0.0.1 and,
    if all of that succeeded, checks that `plugin.cleanup` restores `config`
    to the state captured in the backup.

    :param plugin: plugin proxy under test
    :param config: path of the currently loaded server configuration
    :param temp_dir: scratch directory that receives the backup of `config`

    :returns: True if the tests are successful, False otherwise

    """
    backup = _create_backup(config, temp_dir)

    achalls = _create_achalls(plugin)
    if not achalls:
        logger.error("The plugin and this program support no common "
                     "challenge types")
        return False

    try:
        responses = plugin.perform(achalls)
    except le_errors.Error as error:
        logger.error("Performing challenges on %s caused an error:", config)
        logger.exception(error)
        return False

    success = True
    # Responses are matched to challenges by position.
    for achall, response in zip(achalls, responses):
        if not response:
            logger.error(
                "Plugin failed to complete %s for %s in %s",
                type(achall), achall.domain, config)
            success = False
        elif isinstance(response, challenges.DVSNIResponse):
            verify = functools.partial(response.simple_verify, achall,
                                       achall.domain,
                                       util.JWK.public_key(),
                                       host="127.0.0.1",
                                       port=plugin.https_port)
            # Retried because verification may not succeed on the first
            # attempt (presumably while the server is still reloading).
            if _try_until_true(verify):
                logger.info(
                    "DVSNI verification for %s succeeded", achall.domain)
            else:
                logger.error(
                    "DVSNI verification for %s in %s failed",
                    achall.domain, config)
                success = False

    # Cleanup is only exercised when every challenge was completed.
    if success:
        try:
            plugin.cleanup(achalls)
        except le_errors.Error as error:
            logger.error("Challenge cleanup for %s caused an error:", config)
            logger.exception(error)
            success = False

        if _dirs_are_unequal(config, backup):
            logger.error("Challenge cleanup failed for %s", config)
            return False
        else:
            logger.info("Challenge cleanup succeeded")

    return success


def _create_achalls(plugin):
    """Returns a list of annotated challenges to test on plugin.

    One DVSNI annotated challenge with a random token is created for every
    DVSNI entry in the plugin's challenge preferences for each testable
    domain. Other challenge types are ignored, so the result may be empty.

    """
    achalls = []
    for domain in plugin.get_testable_domain_names():
        for chall_type in plugin.get_chall_pref(domain):
            if chall_type == challenges.DVSNI:
                chall = challenges.DVSNI(
                    token=os.urandom(challenges.DVSNI.TOKEN_SIZE))
                challb = acme_util.chall_to_challb(
                    chall, messages.STATUS_PENDING)
                achalls.append(achallenges.DVSNI(
                    challb=challb, domain=domain, account_key=util.JWK))

    return achalls


def test_installer(args, plugin, config, temp_dir):
    """Tests plugin as an installer.

    Checks `get_all_names` against the expected answer, deploys a
    certificate, optionally tests enhancements (when `args.enhance` is set
    and deployment succeeded) and finally tests rollback. The rollback test
    always runs, even if an earlier step failed.

    :returns: True only if every executed step succeeded

    """
    backup = _create_backup(config, temp_dir)

    names_match = plugin.get_all_names() == plugin.get_all_names_answer()
    if names_match:
        logger.info("get_all_names test succeeded")
    else:
        logger.error("get_all_names test failed for config %s", config)

    domains = list(plugin.get_testable_domain_names())
    success = test_deploy_cert(plugin, temp_dir, domains)

    if success and args.enhance:
        success = test_enhancements(plugin, domains)

    good_rollback = test_rollback(plugin, config, backup)
    return names_match and success and good_rollback


def test_deploy_cert(plugin, temp_dir, domains):
    """Tests deploy_cert returning True if the tests are successful.

    A self-signed certificate covering `domains` is written to
    ``cert.pem`` inside `temp_dir`, deployed for every domain, and then
    validated over HTTPS on 127.0.0.1 after the plugin saved and restarted.

    """
    cert = crypto_util.gen_ss_cert(util.KEY, domains)
    cert_path = os.path.join(temp_dir, "cert.pem")
    # The PEM dump is a byte string, so the file is opened in binary mode.
    with open(cert_path, "wb") as f:
        f.write(OpenSSL.crypto.dump_certificate(
            OpenSSL.crypto.FILETYPE_PEM, cert))

    for domain in domains:
        try:
            plugin.deploy_cert(domain, cert_path, util.KEY_PATH)
        except le_errors.Error as error:
            logger.error("Plugin failed to deploy ceritificate for %s:", domain)
            logger.exception(error)
            return False

    if not _save_and_restart(plugin, "deployed"):
        return False

    success = True
    for domain in domains:
        verify = functools.partial(validator.Validator().certificate, cert,
                                   domain, "127.0.0.1", plugin.https_port)
        if not _try_until_true(verify):
            logger.error("Could not verify certificate for domain %s", domain)
            success = False

    if success:
        logger.info("HTTPS validation succeeded")

    return success


def test_enhancements(plugin, domains):
    """Tests supported enhancements returning True if successful.

    Only the "redirect" enhancement is tested; if the plugin does not list
    it in `supported_enhancements()` the test fails immediately.

    """
    supported = plugin.supported_enhancements()

    if "redirect" not in supported:
        logger.error("The plugin and this program support no common "
                     "enhancements")
        return False

    for domain in domains:
        try:
            plugin.enhance(domain, "redirect")
        except le_errors.PluginError as error:
            # Don't immediately fail because a redirect may already be enabled
            logger.warning("Plugin failed to enable redirect for %s:", domain)
            logger.warning("%s", error)
        except le_errors.Error as error:
            # Logged but not fatal here; the redirect check below decides
            # whether the test passes.
            logger.error("An error occurred while enabling redirect for %s:",
                         domain)
            logger.exception(error)

    if not _save_and_restart(plugin, "enhanced"):
        return False

    success = True
    for domain in domains:
        verify = functools.partial(validator.Validator().redirect, "localhost",
                                   plugin.http_port, headers={"Host": domain})
        if not _try_until_true(verify):
            logger.error("Improper redirect for domain %s", domain)
            success = False

    if success:
        logger.info("Enhancments test succeeded")

    return success


def _try_until_true(func, max_tries=5, sleep_time=0.5):
    """Calls func up to max_tries times until it returns a true value.

    :param func: callable taking no arguments
    :param max_tries: maximum number of calls to `func`
    :param sleep_time: seconds to sleep after each unsuccessful call

    :returns: True as soon as `func` returns a true value, False if it
        never does within `max_tries` calls

    """
    for _ in range(max_tries):
        if func():
            return True
        time.sleep(sleep_time)

    return False


def _save_and_restart(plugin, title=None):
    """Saves and restarts the plugin, returning True if no errors occurred.

    :param plugin: plugin proxy under test
    :param title: value passed through to `plugin.save`

    """
    try:
        plugin.save(title)
        plugin.restart()
        return True
    except le_errors.Error as error:
        logger.error("Plugin failed to save and restart server:")
        logger.exception(error)
        return False


def test_rollback(plugin, config, backup):
    """Tests the rollback checkpoints function.

    Requests a rollback of 1337 checkpoints (presumably more than were ever
    created, so that everything is undone -- confirm against the plugin)
    and then checks that `config` matches `backup` again.

    :returns: True if rollback raised no error and restored `config`

    """
    try:
        plugin.rollback_checkpoints(1337)
    except le_errors.Error as error:
        logger.error("Plugin raised an exception during rollback:")
        logger.exception(error)
        return False

    if _dirs_are_unequal(config, backup):
        logger.error("Rollback failed for config `%s`", config)
        return False
    else:
        logger.info("Rollback succeeded")
        return True


def _create_backup(config, temp_dir):
    """Creates a backup of config in temp_dir.

    Any previous backup at ``<temp_dir>/backup`` is removed first. Symbolic
    links are copied as links rather than followed.

    :returns: path of the backup directory

    """
    backup = os.path.join(temp_dir, "backup")
    shutil.rmtree(backup, ignore_errors=True)
    shutil.copytree(config, backup, symlinks=True)

    return backup


def _dirs_are_unequal(dir1, dir2):
    """Returns True if dir1 and dir2 are unequal.

    The trees are walked with `filecmp.dircmp`; the first directory level
    that shows a difference is logged and ends the comparison.

    """
    pending = [filecmp.dircmp(dir1, dir2)]
    while pending:
        dircmp = pending.pop()
        if dircmp.left_only or dircmp.right_only:
            logger.error("The following files and directories are only "
                         "present in one directory")
            # Report both sides when both have extra entries.
            if dircmp.left_only:
                logger.error(dircmp.left_only)
            if dircmp.right_only:
                logger.error(dircmp.right_only)
            return True
        elif dircmp.common_funny or dircmp.funny_files:
            logger.error("The following files and directories could not be "
                         "compared:")
            if dircmp.common_funny:
                logger.error(dircmp.common_funny)
            if dircmp.funny_files:
                logger.error(dircmp.funny_files)
            return True
        elif dircmp.diff_files:
            logger.error("The following files differ:")
            logger.error(dircmp.diff_files)
            return True

        # Queue the common subdirectories for the same checks.
        pending.extend(dircmp.subdirs.values())

    return False


def get_args():
    """Returns parsed command line arguments.

    Requesting enhancement tests implies installer tests. If none of the
    test type flags is given, all test types are enabled.

    """
    parser = argparse.ArgumentParser(
        description=DESCRIPTION,
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)

    group = parser.add_argument_group("general")
    group.add_argument(
        "-c", "--configs", default="configs.tar.gz",
        help="a directory or tarball containing server configurations")
    group.add_argument(
        "-p", "--plugin", default="apache", help="the plugin to be tested")
    group.add_argument(
        "-v", "--verbose", dest="verbose_count", action="count",
        default=0, help="you know how to use this")
    group.add_argument(
        "-a", "--auth", action="store_true",
        help="tests the challenges the plugin supports")
    group.add_argument(
        "-i", "--install", action="store_true",
        help="tests the plugin as an installer")
    group.add_argument(
        "-e", "--enhance", action="store_true", help="tests the enhancements "
        "the plugin supports (implicitly includes installer tests)")

    # Let every known plugin register its own command line options.
    for plugin in PLUGINS.values():
        plugin.add_parser_arguments(parser)

    args = parser.parse_args()
    if args.enhance:
        args.install = True
    elif not (args.auth or args.install):
        args.auth = args.install = args.enhance = True

    return args


def setup_logging(args):
    """Prepares logging for the program.

    The root logger starts at ERROR and is lowered by one standard level
    (10) for every -v given on the command line.

    """
    handler = logging.StreamHandler()

    root_logger = logging.getLogger()
    root_logger.setLevel(logging.ERROR - args.verbose_count * 10)
    root_logger.addHandler(handler)


def _test_config(args, plugin, temp_dir):
    """Loads the plugin's next configuration and runs the selected tests.

    :returns: True if all selected tests on the configuration succeeded

    """
    # Bound up front so the error handler can log it even when
    # load_config() itself is what raised.
    config = None
    success = True
    try:
        config = plugin.load_config()
        logger.info("Loaded configuration: %s", config)
        if args.auth:
            success = test_authenticator(plugin, config, temp_dir)
        if success and args.install:
            success = test_installer(args, plugin, config, temp_dir)
    except errors.Error as error:
        logger.error("Tests on %s raised:", config)
        logger.exception(error)
        success = False

    if success:
        logger.info("All tests on %s succeeded", config)
    else:
        logger.error("Tests on %s failed", config)

    return success


def main():
    """Main test script execution.

    :raises errors.Error: if the requested plugin is unknown

    """
    args = get_args()
    setup_logging(args)

    if args.plugin not in PLUGINS:
        raise errors.Error("Unknown plugin {0}".format(args.plugin))

    temp_dir = tempfile.mkdtemp()
    try:
        plugin = PLUGINS[args.plugin](args)
        try:
            plugin.execute_in_docker("mkdir -p /var/log/apache2")
            while plugin.has_more_configs():
                _test_config(args, plugin, temp_dir)
        finally:
            plugin.cleanup_from_tests()
    finally:
        # mkdtemp() leaves deletion to the caller.
        shutil.rmtree(temp_dir, ignore_errors=True)


if __name__ == "__main__":
    main()