# standalone_test.py
"""Tests for letsencrypt.plugins.standalone."""
import argparse
import errno
import socket
import unittest

import mock
import six

from acme import challenges
from acme import jose
from acme import standalone as acme_standalone

from letsencrypt import achallenges
from letsencrypt import errors
from letsencrypt import interfaces

from letsencrypt.tests import acme_util
from letsencrypt.tests import test_util


class ServerManagerTest(unittest.TestCase):
    """Tests for letsencrypt.plugins.standalone.ServerManager."""

    def setUp(self):
        # Imported lazily, following the pattern used by every test class
        # in this module.
        from letsencrypt.plugins.standalone import ServerManager
        self.certs = {}
        self.simple_http_resources = {}
        self.mgr = ServerManager(self.certs, self.simple_http_resources)

    def test_init(self):
        # The manager must keep references to the very same objects it was
        # given, not copies.
        self.assertTrue(self.mgr.certs is self.certs)
        self.assertTrue(
            self.mgr.simple_http_resources is self.simple_http_resources)

    def _test_run_stop(self, challenge_type):
        """Start a server for ``challenge_type``, then stop it again.

        Port 0 is requested, so the actual port is read back from the
        server's socket before checking ``running()``.

        """
        server = self.mgr.run(port=0, challenge_type=challenge_type)
        port = server.socket.getsockname()[1]  # pylint: disable=no-member
        self.assertEqual(self.mgr.running(), {port: server})
        self.mgr.stop(port=port)
        self.assertEqual(self.mgr.running(), {})

    def test_run_stop_dvsni(self):
        self._test_run_stop(challenges.DVSNI)

    def test_run_stop_simplehttp(self):
        self._test_run_stop(challenges.SimpleHTTP)

    def test_run_idempotent(self):
        # Calling run() a second time for a port that is already served
        # must hand back the existing server instead of creating a new one.
        server = self.mgr.run(port=0, challenge_type=challenges.SimpleHTTP)
        port = server.socket.getsockname()[1]  # pylint: disable=no-member
        server2 = self.mgr.run(port=port, challenge_type=challenges.SimpleHTTP)
        self.assertEqual(self.mgr.running(), {port: server})
        self.assertTrue(server is server2)
        self.mgr.stop(port)
        self.assertEqual(self.mgr.running(), {})

    def test_run_bind_error(self):
        # Occupy an OS-assigned port with a plain socket and then ask the
        # manager to run on that same port; StandaloneBindError is expected
        # and nothing may be left registered as running.
        some_server = socket.socket()
        try:
            some_server.bind(("", 0))
            port = some_server.getsockname()[1]
            self.assertRaises(
                errors.StandaloneBindError, self.mgr.run, port,
                challenge_type=challenges.SimpleHTTP)
            self.assertEqual(self.mgr.running(), {})
        finally:
            # Release the blocking socket even if an assertion above fails,
            # so the descriptor and the port are not leaked.
            some_server.close()


class SupportedChallengesValidatorTest(unittest.TestCase):
    """Tests for plugins.standalone.supported_challenges_validator."""

    def _call(self, data):
        """Run ``supported_challenges_validator`` on ``data``."""
        from letsencrypt.plugins.standalone import (
            supported_challenges_validator)
        return supported_challenges_validator(data)

    def test_correct(self):
        # Valid values are returned unchanged, in either order.
        self.assertEqual("dvsni", self._call("dvsni"))
        self.assertEqual("simpleHttp", self._call("simpleHttp"))
        self.assertEqual("dvsni,simpleHttp", self._call("dvsni,simpleHttp"))
        self.assertEqual("simpleHttp,dvsni", self._call("simpleHttp,dvsni"))

    def test_unrecognized(self):
        # Precondition: "foo" really is not a registered challenge type.
        # assertTrue (rather than a bare assert statement) keeps the check
        # alive when Python runs with -O.
        self.assertTrue("foo" not in challenges.Challenge.TYPES)
        self.assertRaises(argparse.ArgumentTypeError, self._call, "foo")

    def test_not_subset(self):
        # "dns" is presumably a known challenge type that the standalone
        # plugin does not support -- confirm against the validator.
        self.assertRaises(argparse.ArgumentTypeError, self._call, "dns")


class AuthenticatorTest(unittest.TestCase):
    """Tests for letsencrypt.plugins.standalone.Authenticator."""

    def setUp(self):
        from letsencrypt.plugins.standalone import Authenticator
        self.config = mock.MagicMock(
            dvsni_port=1234, simple_http_port=4321,
            standalone_supported_challenges="dvsni,simpleHttp")
        self.auth = Authenticator(self.config, name="standalone")

    def test_supported_challenges(self):
        self.assertEqual(self.auth.supported_challenges,
                         set([challenges.DVSNI, challenges.SimpleHTTP]))

    def test_more_info(self):
        self.assertTrue(isinstance(self.auth.more_info(), six.string_types))

    def test_get_chall_pref(self):
        self.assertEqual(set(self.auth.get_chall_pref(domain=None)),
                         set([challenges.DVSNI, challenges.SimpleHTTP]))

    @mock.patch("letsencrypt.plugins.standalone.util")
    def test_perform_misconfiguration(self, mock_util):
        # When the (mocked) util reports that something is already
        # listening, perform() must refuse with MisconfigurationError.
        mock_util.already_listening.return_value = True
        self.assertRaises(errors.MisconfigurationError, self.auth.perform, [])
        # 1234 matches the dvsni_port value configured in setUp.
        mock_util.already_listening.assert_called_once_with(1234)

    @mock.patch("letsencrypt.plugins.standalone.zope.component.getUtility")
    def test_perform(self, unused_mock_get_utility):
        # perform() is expected to delegate to perform2() and return its
        # result untouched.
        achalls = [1, 2, 3]
        self.auth.perform2 = mock.Mock(return_value=mock.sentinel.responses)
        self.assertEqual(mock.sentinel.responses, self.auth.perform(achalls))
        self.auth.perform2.assert_called_once_with(achalls)

    @mock.patch("letsencrypt.plugins.standalone.zope.component.getUtility")
    def _test_perform_bind_errors(self, bind_errno, achalls, mock_get_utility):
        """Make perform2() fail with a bind error and check the notification.

        :param int bind_errno: ``errno`` value carried by the socket error
            wrapped in the raised ``StandaloneBindError``.
        :param list achalls: Challenges passed to ``perform``.
        :param mock_get_utility: Injected by the ``mock.patch`` decorator;
            callers do not pass it.

        """
        def _perform2(unused_achalls):
            raise errors.StandaloneBindError(
                mock.Mock(errno=bind_errno), 1234)

        self.auth.perform2 = mock.MagicMock(side_effect=_perform2)
        self.auth.perform(achalls)
        mock_get_utility.assert_called_once_with(interfaces.IDisplay)
        notification = mock_get_utility.return_value.notification
        self.assertEqual(1, notification.call_count)
        # The port from the bind error must be mentioned in the message.
        self.assertTrue("1234" in notification.call_args[0][0])

    def test_perform_eacces(self):
        # pylint: disable=no-value-for-parameter
        self._test_perform_bind_errors(errno.EACCES, [])

    def test_perform_eaddrinuse(self):
        # pylint: disable=no-value-for-parameter
        self._test_perform_bind_errors(errno.EADDRINUSE, [])

    # NOTE: "perfom" is a typo for "perform"; the name is kept as-is so the
    # test id stays stable for anything selecting tests by name.
    def test_perfom_unknown_bind_error(self):
        # Errno values other than the ones exercised above are expected to
        # propagate out of perform() instead of being turned into a
        # notification.
        self.assertRaises(
            errors.StandaloneBindError, self._test_perform_bind_errors,
            errno.ENOTCONN, [])

    def test_perform2(self):
        domain = b'localhost'
        key = jose.JWK.load(test_util.load_vector('rsa512_key.pem'))
        simple_http = achallenges.SimpleHTTP(
            challb=acme_util.SIMPLE_HTTP_P, domain=domain, account_key=key)
        dvsni = achallenges.DVSNI(
            challb=acme_util.DVSNI_P, domain=domain, account_key=key)

        self.auth.servers = mock.MagicMock()

        # Stand-in for ServerManager.run: returns a string that encodes the
        # port so the "served" mapping can be asserted on below.
        def _run(port, challenge_type):  # pylint: disable=unused-argument
            return "server{0}".format(port)

        self.auth.servers.run.side_effect = _run
        responses = self.auth.perform2([simple_http, dvsni])

        # One response per challenge, in the order the challenges were given.
        self.assertTrue(isinstance(responses, list))
        self.assertEqual(2, len(responses))
        self.assertTrue(isinstance(responses[0], challenges.SimpleHTTPResponse))
        self.assertTrue(isinstance(responses[1], challenges.DVSNIResponse))

        self.assertEqual(self.auth.servers.run.mock_calls, [
            mock.call(4321, challenges.SimpleHTTP),
            mock.call(1234, challenges.DVSNI),
        ])
        self.assertEqual(self.auth.served, {
            "server1234": set([dvsni]),
            "server4321": set([simple_http]),
        })
        self.assertEqual(1, len(self.auth.simple_http_resources))
        self.assertEqual(2, len(self.auth.certs))
        self.assertEqual(list(self.auth.simple_http_resources), [
            acme_standalone.SimpleHTTPRequestHandler.SimpleHTTPResource(
                acme_util.SIMPLE_HTTP, responses[0], mock.ANY)])

    def test_cleanup(self):
        self.auth.servers = mock.Mock()
        self.auth.servers.running.return_value = {
            1: "server1",
            2: "server2",
        }
        self.auth.served["server1"].add("chall1")
        self.auth.served["server2"].update(["chall2", "chall3"])

        # Removing the only challenge of server1 must stop that server.
        self.auth.cleanup(["chall1"])
        self.assertEqual(self.auth.served, {
            "server1": set(), "server2": set(["chall2", "chall3"])})
        self.auth.servers.stop.assert_called_once_with(1)

        # server2 still serves chall3 afterwards, so no further stop() call.
        self.auth.servers.running.return_value = {
            2: "server2",
        }
        self.auth.cleanup(["chall2"])
        self.assertEqual(self.auth.served, {
            "server1": set(), "server2": set(["chall3"])})
        self.assertEqual(1, self.auth.servers.stop.call_count)

        # Last challenge gone: server2 is stopped as well.
        self.auth.cleanup(["chall3"])
        self.assertEqual(self.auth.served, {
            "server1": set(), "server2": set([])})
        self.auth.servers.stop.assert_called_with(2)


if __name__ == "__main__":
    unittest.main()  # pragma: no cover