test_driver.py
  1  """Tests Let's Encrypt plugins against different server configurations."""
  2  import argparse
  3  import filecmp
  4  import functools
  5  import logging
  6  import os
  7  import shutil
  8  import tempfile
  9  import time
 10  
 11  import OpenSSL
 12  
 13  from acme import challenges
 14  from acme import crypto_util
 15  from acme import messages
 16  from letsencrypt import achallenges
 17  from letsencrypt import errors as le_errors
 18  from letsencrypt import validator
 19  from letsencrypt.tests import acme_util
 20  
 21  from letsencrypt_compatibility_test import errors
 22  from letsencrypt_compatibility_test import util
 23  from letsencrypt_compatibility_test.configurators.apache import apache24
 24  
 25  
 26  DESCRIPTION = """
 27  Tests Let's Encrypt plugins against different server configuratons. It is
 28  assumed that Docker is already installed. If no test types is specified, all
 29  tests that the plugin supports are performed.
 30  
 31  """
 32  
 33  PLUGINS = {"apache": apache24.Proxy}
 34  
 35  
 36  logger = logging.getLogger(__name__)
 37  
 38  
 39  def test_authenticator(plugin, config, temp_dir):
 40      """Tests authenticator, returning True if the tests are successful"""
 41      backup = _create_backup(config, temp_dir)
 42  
 43      achalls = _create_achalls(plugin)
 44      if not achalls:
 45          logger.error("The plugin and this program support no common "
 46                       "challenge types")
 47          return False
 48  
 49      try:
 50          responses = plugin.perform(achalls)
 51      except le_errors.Error as error:
 52          logger.error("Performing challenges on %s caused an error:", config)
 53          logger.exception(error)
 54          return False
 55  
 56      success = True
 57      for i in xrange(len(responses)):
 58          if not responses[i]:
 59              logger.error(
 60                  "Plugin failed to complete %s for %s in %s",
 61                  type(achalls[i]), achalls[i].domain, config)
 62              success = False
 63          elif isinstance(responses[i], challenges.DVSNIResponse):
 64              verify = functools.partial(responses[i].simple_verify, achalls[i],
 65                                         achalls[i].domain,
 66                                         util.JWK.public_key(),
 67                                         host="127.0.0.1",
 68                                         port=plugin.https_port)
 69              if _try_until_true(verify):
 70                  logger.info(
 71                      "DVSNI verification for %s succeeded", achalls[i].domain)
 72              else:
 73                  logger.error(
 74                      "DVSNI verification for %s in %s failed",
 75                      achalls[i].domain, config)
 76                  success = False
 77  
 78      if success:
 79          try:
 80              plugin.cleanup(achalls)
 81          except le_errors.Error as error:
 82              logger.error("Challenge cleanup for %s caused an error:", config)
 83              logger.exception(error)
 84              success = False
 85  
 86          if _dirs_are_unequal(config, backup):
 87              logger.error("Challenge cleanup failed for %s", config)
 88              return False
 89          else:
 90              logger.info("Challenge cleanup succeeded")
 91  
 92      return success
 93  
 94  
 95  def _create_achalls(plugin):
 96      """Returns a list of annotated challenges to test on plugin"""
 97      achalls = list()
 98      names = plugin.get_testable_domain_names()
 99      for domain in names:
100          prefs = plugin.get_chall_pref(domain)
101          for chall_type in prefs:
102              if chall_type == challenges.DVSNI:
103                  chall = challenges.DVSNI(
104                      token=os.urandom(challenges.DVSNI.TOKEN_SIZE))
105                  challb = acme_util.chall_to_challb(
106                      chall, messages.STATUS_PENDING)
107                  achall = achallenges.DVSNI(
108                      challb=challb, domain=domain, account_key=util.JWK)
109                  achalls.append(achall)
110  
111      return achalls
112  
113  
114  def test_installer(args, plugin, config, temp_dir):
115      """Tests plugin as an installer"""
116      backup = _create_backup(config, temp_dir)
117  
118      names_match = plugin.get_all_names() == plugin.get_all_names_answer()
119      if names_match:
120          logger.info("get_all_names test succeeded")
121      else:
122          logger.error("get_all_names test failed for config %s", config)
123  
124      domains = list(plugin.get_testable_domain_names())
125      success = test_deploy_cert(plugin, temp_dir, domains)
126  
127      if success and args.enhance:
128          success = test_enhancements(plugin, domains)
129  
130      good_rollback = test_rollback(plugin, config, backup)
131      return names_match and success and good_rollback
132  
133  
134  def test_deploy_cert(plugin, temp_dir, domains):
135      """Tests deploy_cert returning True if the tests are successful"""
136      cert = crypto_util.gen_ss_cert(util.KEY, domains)
137      cert_path = os.path.join(temp_dir, "cert.pem")
138      with open(cert_path, "w") as f:
139          f.write(OpenSSL.crypto.dump_certificate(
140              OpenSSL.crypto.FILETYPE_PEM, cert))
141  
142      for domain in domains:
143          try:
144              plugin.deploy_cert(domain, cert_path, util.KEY_PATH)
145          except le_errors.Error as error:
146              logger.error("Plugin failed to deploy ceritificate for %s:", domain)
147              logger.exception(error)
148              return False
149  
150      if not _save_and_restart(plugin, "deployed"):
151          return False
152  
153      success = True
154      for domain in domains:
155          verify = functools.partial(validator.Validator().certificate, cert,
156                                     domain, "127.0.0.1", plugin.https_port)
157          if not _try_until_true(verify):
158              logger.error("Could not verify certificate for domain %s", domain)
159              success = False
160  
161      if success:
162          logger.info("HTTPS validation succeeded")
163  
164      return success
165  
166  
167  def test_enhancements(plugin, domains):
168      """Tests supported enhancements returning True if successful"""
169      supported = plugin.supported_enhancements()
170  
171      if "redirect" not in supported:
172          logger.error("The plugin and this program support no common "
173                       "enhancements")
174          return False
175  
176      for domain in domains:
177          try:
178              plugin.enhance(domain, "redirect")
179          except le_errors.PluginError as error:
180              # Don't immediately fail because a redirect may already be enabled
181              logger.warning("Plugin failed to enable redirect for %s:", domain)
182              logger.warning("%s", error)
183          except le_errors.Error as error:
184              logger.error("An error occurred while enabling redirect for %s:",
185                           domain)
186              logger.exception(error)
187  
188      if not _save_and_restart(plugin, "enhanced"):
189          return False
190  
191      success = True
192      for domain in domains:
193          verify = functools.partial(validator.Validator().redirect, "localhost",
194                                     plugin.http_port, headers={"Host": domain})
195          if not _try_until_true(verify):
196              logger.error("Improper redirect for domain %s", domain)
197              success = False
198  
199      if success:
200          logger.info("Enhancments test succeeded")
201  
202      return success
203  
204  
205  def _try_until_true(func, max_tries=5, sleep_time=0.5):
206      """Calls func up to max_tries times until it returns True"""
207      for _ in xrange(0, max_tries):
208          if func():
209              return True
210          else:
211              time.sleep(sleep_time)
212  
213      return False
214  
215  
216  def _save_and_restart(plugin, title=None):
217      """Saves and restart the plugin, returning True if no errors occurred"""
218      try:
219          plugin.save(title)
220          plugin.restart()
221          return True
222      except le_errors.Error as error:
223          logger.error("Plugin failed to save and restart server:")
224          logger.exception(error)
225          return False
226  
227  
228  def test_rollback(plugin, config, backup):
229      """Tests the rollback checkpoints function"""
230      try:
231          plugin.rollback_checkpoints(1337)
232      except le_errors.Error as error:
233          logger.error("Plugin raised an exception during rollback:")
234          logger.exception(error)
235          return False
236  
237      if _dirs_are_unequal(config, backup):
238          logger.error("Rollback failed for config `%s`", config)
239          return False
240      else:
241          logger.info("Rollback succeeded")
242          return True
243  
244  
245  def _create_backup(config, temp_dir):
246      """Creates a backup of config in temp_dir"""
247      backup = os.path.join(temp_dir, "backup")
248      shutil.rmtree(backup, ignore_errors=True)
249      shutil.copytree(config, backup, symlinks=True)
250  
251      return backup
252  
253  
254  def _dirs_are_unequal(dir1, dir2):
255      """Returns True if dir1 and dir2 are unequal"""
256      dircmps = [filecmp.dircmp(dir1, dir2)]
257      while len(dircmps):
258          dircmp = dircmps.pop()
259          if dircmp.left_only or dircmp.right_only:
260              logger.error("The following files and directories are only "
261                           "present in one directory")
262              if dircmp.left_only:
263                  logger.error(dircmp.left_only)
264              else:
265                  logger.error(dircmp.right_only)
266              return True
267          elif dircmp.common_funny or dircmp.funny_files:
268              logger.error("The following files and directories could not be "
269                           "compared:")
270              if dircmp.common_funny:
271                  logger.error(dircmp.common_funny)
272              else:
273                  logger.error(dircmp.funny_files)
274              return True
275          elif dircmp.diff_files:
276              logger.error("The following files differ:")
277              logger.error(dircmp.diff_files)
278              return True
279  
280          for subdir in dircmp.subdirs.itervalues():
281              dircmps.append(subdir)
282  
283      return False
284  
285  
286  def get_args():
287      """Returns parsed command line arguments."""
288      parser = argparse.ArgumentParser(
289          description=DESCRIPTION,
290          formatter_class=argparse.ArgumentDefaultsHelpFormatter)
291  
292      group = parser.add_argument_group("general")
293      group.add_argument(
294          "-c", "--configs", default="configs.tar.gz",
295          help="a directory or tarball containing server configurations")
296      group.add_argument(
297          "-p", "--plugin", default="apache", help="the plugin to be tested")
298      group.add_argument(
299          "-v", "--verbose", dest="verbose_count", action="count",
300          default=0, help="you know how to use this")
301      group.add_argument(
302          "-a", "--auth", action="store_true",
303          help="tests the challenges the plugin supports")
304      group.add_argument(
305          "-i", "--install", action="store_true",
306          help="tests the plugin as an installer")
307      group.add_argument(
308          "-e", "--enhance", action="store_true", help="tests the enhancements "
309          "the plugin supports (implicitly includes installer tests)")
310  
311      for plugin in PLUGINS.itervalues():
312          plugin.add_parser_arguments(parser)
313  
314      args = parser.parse_args()
315      if args.enhance:
316          args.install = True
317      elif not (args.auth or args.install):
318          args.auth = args.install = args.enhance = True
319  
320      return args
321  
322  
323  def setup_logging(args):
324      """Prepares logging for the program"""
325      handler = logging.StreamHandler()
326  
327      root_logger = logging.getLogger()
328      root_logger.setLevel(logging.ERROR - args.verbose_count * 10)
329      root_logger.addHandler(handler)
330  
331  
332  def main():
333      """Main test script execution."""
334      args = get_args()
335      setup_logging(args)
336  
337      if args.plugin not in PLUGINS:
338          raise errors.Error("Unknown plugin {0}".format(args.plugin))
339  
340      temp_dir = tempfile.mkdtemp()
341      plugin = PLUGINS[args.plugin](args)
342      try:
343          plugin.execute_in_docker("mkdir -p /var/log/apache2")
344          while plugin.has_more_configs():
345              success = True
346  
347              try:
348                  config = plugin.load_config()
349                  logger.info("Loaded configuration: %s", config)
350                  if args.auth:
351                      success = test_authenticator(plugin, config, temp_dir)
352                  if success and args.install:
353                      success = test_installer(args, plugin, config, temp_dir)
354              except errors.Error as error:
355                  logger.error("Tests on %s raised:", config)
356                  logger.exception(error)
357                  success = False
358  
359              if success:
360                  logger.info("All tests on %s succeeded", config)
361              else:
362                  logger.error("Tests on %s failed", config)
363      finally:
364          plugin.cleanup_from_tests()
365  
366  
367  if __name__ == "__main__":
368      main()