/ letsencrypt / plugins / standalone_test.py
standalone_test.py
  1  """Tests for letsencrypt.plugins.standalone."""
  2  import argparse
  3  import socket
  4  import unittest
  5  
  6  import mock
  7  import six
  8  
  9  from acme import challenges
 10  from acme import jose
 11  from acme import standalone as acme_standalone
 12  
 13  from letsencrypt import achallenges
 14  from letsencrypt import errors
 15  from letsencrypt import interfaces
 16  
 17  from letsencrypt.tests import acme_util
 18  from letsencrypt.tests import test_util
 19  
 20  
 21  class ServerManagerTest(unittest.TestCase):
 22      """Tests for letsencrypt.plugins.standalone.ServerManager."""
 23  
 24      def setUp(self):
 25          from letsencrypt.plugins.standalone import ServerManager
 26          self.certs = {}
 27          self.simple_http_resources = {}
 28          self.mgr = ServerManager(self.certs, self.simple_http_resources)
 29  
 30      def test_init(self):
 31          self.assertTrue(self.mgr.certs is self.certs)
 32          self.assertTrue(
 33              self.mgr.simple_http_resources is self.simple_http_resources)
 34  
 35      def _test_run_stop(self, challenge_type):
 36          server = self.mgr.run(port=0, challenge_type=challenge_type)
 37          port = server.socket.getsockname()[1]  # pylint: disable=no-member
 38          self.assertEqual(self.mgr.running(), {port: server})
 39          self.mgr.stop(port=port)
 40          self.assertEqual(self.mgr.running(), {})
 41  
 42      def test_run_stop_dvsni(self):
 43          self._test_run_stop(challenges.DVSNI)
 44  
 45      def test_run_stop_simplehttp(self):
 46          self._test_run_stop(challenges.SimpleHTTP)
 47  
 48      def test_run_idempotent(self):
 49          server = self.mgr.run(port=0, challenge_type=challenges.SimpleHTTP)
 50          port = server.socket.getsockname()[1]  # pylint: disable=no-member
 51          server2 = self.mgr.run(port=port, challenge_type=challenges.SimpleHTTP)
 52          self.assertEqual(self.mgr.running(), {port: server})
 53          self.assertTrue(server is server2)
 54          self.mgr.stop(port)
 55          self.assertEqual(self.mgr.running(), {})
 56  
 57      def test_run_bind_error(self):
 58          some_server = socket.socket()
 59          some_server.bind(("", 0))
 60          port = some_server.getsockname()[1]
 61          self.assertRaises(
 62              errors.StandaloneBindError, self.mgr.run, port,
 63              challenge_type=challenges.SimpleHTTP)
 64          self.assertEqual(self.mgr.running(), {})
 65  
 66  
 67  class SupportedChallengesValidatorTest(unittest.TestCase):
 68      """Tests for plugins.standalone.supported_challenges_validator."""
 69  
 70      def _call(self, data):
 71          from letsencrypt.plugins.standalone import (
 72              supported_challenges_validator)
 73          return supported_challenges_validator(data)
 74  
 75      def test_correct(self):
 76          self.assertEqual("dvsni", self._call("dvsni"))
 77          self.assertEqual("simpleHttp", self._call("simpleHttp"))
 78          self.assertEqual("dvsni,simpleHttp", self._call("dvsni,simpleHttp"))
 79          self.assertEqual("simpleHttp,dvsni", self._call("simpleHttp,dvsni"))
 80  
 81      def test_unrecognized(self):
 82          assert "foo" not in challenges.Challenge.TYPES
 83          self.assertRaises(argparse.ArgumentTypeError, self._call, "foo")
 84  
 85      def test_not_subset(self):
 86          self.assertRaises(argparse.ArgumentTypeError, self._call, "dns")
 87  
 88  
 89  class AuthenticatorTest(unittest.TestCase):
 90      """Tests for letsencrypt.plugins.standalone.Authenticator."""
 91  
 92      def setUp(self):
 93          from letsencrypt.plugins.standalone import Authenticator
 94          self.config = mock.MagicMock(dvsni_port=1234, simple_http_port=4321,
 95                                       standalone_supported_challenges="dvsni,simpleHttp")
 96          self.auth = Authenticator(self.config, name="standalone")
 97  
 98      def test_supported_challenges(self):
 99          self.assertEqual(self.auth.supported_challenges,
100                           set([challenges.DVSNI, challenges.SimpleHTTP]))
101  
102      def test_more_info(self):
103          self.assertTrue(isinstance(self.auth.more_info(), six.string_types))
104  
105      def test_get_chall_pref(self):
106          self.assertEqual(set(self.auth.get_chall_pref(domain=None)),
107                           set([challenges.DVSNI, challenges.SimpleHTTP]))
108  
109      @mock.patch("letsencrypt.plugins.standalone.util")
110      def test_perform_misconfiguration(self, mock_util):
111          mock_util.already_listening.return_value = True
112          self.assertRaises(errors.MisconfigurationError, self.auth.perform, [])
113          mock_util.already_listening.assert_called_once_with(1234)
114  
115      @mock.patch("letsencrypt.plugins.standalone.zope.component.getUtility")
116      def test_perform(self, unused_mock_get_utility):
117          achalls = [1, 2, 3]
118          self.auth.perform2 = mock.Mock(return_value=mock.sentinel.responses)
119          self.assertEqual(mock.sentinel.responses, self.auth.perform(achalls))
120          self.auth.perform2.assert_called_once_with(achalls)
121  
122      @mock.patch("letsencrypt.plugins.standalone.zope.component.getUtility")
123      def _test_perform_bind_errors(self, errno, achalls, mock_get_utility):
124          def _perform2(unused_achalls):
125              raise errors.StandaloneBindError(mock.Mock(errno=errno), 1234)
126  
127          self.auth.perform2 = mock.MagicMock(side_effect=_perform2)
128          self.auth.perform(achalls)
129          mock_get_utility.assert_called_once_with(interfaces.IDisplay)
130          notification = mock_get_utility.return_value.notification
131          self.assertEqual(1, notification.call_count)
132          self.assertTrue("1234" in notification.call_args[0][0])
133  
134      def test_perform_eacces(self):
135          # pylint: disable=no-value-for-parameter
136          self._test_perform_bind_errors(socket.errno.EACCES, [])
137  
138      def test_perform_eaddrinuse(self):
139          # pylint: disable=no-value-for-parameter
140          self._test_perform_bind_errors(socket.errno.EADDRINUSE, [])
141  
142      def test_perfom_unknown_bind_error(self):
143          self.assertRaises(
144              errors.StandaloneBindError, self._test_perform_bind_errors,
145              socket.errno.ENOTCONN, [])
146  
147      def test_perform2(self):
148          domain = b'localhost'
149          key = jose.JWK.load(test_util.load_vector('rsa512_key.pem'))
150          simple_http = achallenges.SimpleHTTP(
151              challb=acme_util.SIMPLE_HTTP_P, domain=domain, account_key=key)
152          dvsni = achallenges.DVSNI(
153              challb=acme_util.DVSNI_P, domain=domain, account_key=key)
154  
155          self.auth.servers = mock.MagicMock()
156  
157          def _run(port, tls):  # pylint: disable=unused-argument
158              return "server{0}".format(port)
159  
160          self.auth.servers.run.side_effect = _run
161          responses = self.auth.perform2([simple_http, dvsni])
162  
163          self.assertTrue(isinstance(responses, list))
164          self.assertEqual(2, len(responses))
165          self.assertTrue(isinstance(responses[0], challenges.SimpleHTTPResponse))
166          self.assertTrue(isinstance(responses[1], challenges.DVSNIResponse))
167  
168          self.assertEqual(self.auth.servers.run.mock_calls, [
169              mock.call(4321, challenges.SimpleHTTP),
170              mock.call(1234, challenges.DVSNI),
171          ])
172          self.assertEqual(self.auth.served, {
173              "server1234": set([dvsni]),
174              "server4321": set([simple_http]),
175          })
176          self.assertEqual(1, len(self.auth.simple_http_resources))
177          self.assertEqual(2, len(self.auth.certs))
178          self.assertEqual(list(self.auth.simple_http_resources), [
179              acme_standalone.SimpleHTTPRequestHandler.SimpleHTTPResource(
180                  acme_util.SIMPLE_HTTP, responses[0], mock.ANY)])
181  
182      def test_cleanup(self):
183          self.auth.servers = mock.Mock()
184          self.auth.servers.running.return_value = {
185              1: "server1",
186              2: "server2",
187          }
188          self.auth.served["server1"].add("chall1")
189          self.auth.served["server2"].update(["chall2", "chall3"])
190  
191          self.auth.cleanup(["chall1"])
192          self.assertEqual(self.auth.served, {
193              "server1": set(), "server2": set(["chall2", "chall3"])})
194          self.auth.servers.stop.assert_called_once_with(1)
195  
196          self.auth.servers.running.return_value = {
197              2: "server2",
198          }
199          self.auth.cleanup(["chall2"])
200          self.assertEqual(self.auth.served, {
201              "server1": set(), "server2": set(["chall3"])})
202          self.assertEqual(1, self.auth.servers.stop.call_count)
203  
204          self.auth.cleanup(["chall3"])
205          self.assertEqual(self.auth.served, {
206              "server1": set(), "server2": set([])})
207          self.auth.servers.stop.assert_called_with(2)
208  
# Run the whole suite when this module is executed directly as a script.
if __name__ == "__main__":  # pragma: no cover
    unittest.main()