|  | @@ -0,0 +1,520 @@
 | 
	
		
			
				|  |  | +# Copyright 2017 gRPC authors.
 | 
	
		
			
				|  |  | +#
 | 
	
		
			
				|  |  | +# Licensed under the Apache License, Version 2.0 (the "License");
 | 
	
		
			
				|  |  | +# you may not use this file except in compliance with the License.
 | 
	
		
			
				|  |  | +# You may obtain a copy of the License at
 | 
	
		
			
				|  |  | +#
 | 
	
		
			
				|  |  | +#     http://www.apache.org/licenses/LICENSE-2.0
 | 
	
		
			
				|  |  | +#
 | 
	
		
			
				|  |  | +# Unless required by applicable law or agreed to in writing, software
 | 
	
		
			
				|  |  | +# distributed under the License is distributed on an "AS IS" BASIS,
 | 
	
		
			
				|  |  | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
	
		
			
				|  |  | +# See the License for the specific language governing permissions and
 | 
	
		
			
				|  |  | +# limitations under the License.
 | 
	
		
			
				|  |  | +"""
 | 
	
		
			
				|  |  | +This tests server certificate rotation support.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +Here we test various aspects of gRPC Python, and in some cases C-core
 | 
	
		
			
				|  |  | +by extension, support for server certificate rotation.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +* ServerSSLCertReloadTestWithClientAuth: test ability to rotate
 | 
	
		
			
				|  |  | +  server's SSL cert for use in future channels with clients while not
 | 
	
		
			
				|  |  | +  affecting any existing channel. The server requires client
 | 
	
		
			
				|  |  | +  authentication.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +* ServerSSLCertReloadTestWithoutClientAuth: like
 | 
	
		
			
				|  |  | +  ServerSSLCertReloadTestWithClientAuth except that the server does
 | 
	
		
			
				|  |  | +  not authenticate the client.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +* ServerSSLCertReloadTestCertConfigReuse: tests gRPC Python's ability
 | 
	
		
			
				|  |  | +  to deal with user's reuse of ServerCertificateConfig instances.
 | 
	
		
			
				|  |  | +"""
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +import abc
 | 
	
		
			
				|  |  | +import collections
 | 
	
		
			
				|  |  | +import os
 | 
	
		
			
				|  |  | +import six
 | 
	
		
			
				|  |  | +import threading
 | 
	
		
			
				|  |  | +import unittest
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +from concurrent import futures
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +import grpc
 | 
	
		
			
				|  |  | +from tests.unit import resources
 | 
	
		
			
				|  |  | +from tests.testing import _application_common
 | 
	
		
			
				|  |  | +from tests.testing import _server_application
 | 
	
		
			
				|  |  | +from tests.testing.proto import services_pb2_grpc
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +CA_1_PEM = resources.cert_hier_1_root_ca_cert()
 | 
	
		
			
				|  |  | +CA_2_PEM = resources.cert_hier_2_root_ca_cert()
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +CLIENT_KEY_1_PEM = resources.cert_hier_1_client_1_key()
 | 
	
		
			
				|  |  | +CLIENT_CERT_CHAIN_1_PEM = (resources.cert_hier_1_client_1_cert() +
 | 
	
		
			
				|  |  | +                           resources.cert_hier_1_intermediate_ca_cert())
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +CLIENT_KEY_2_PEM = resources.cert_hier_2_client_1_key()
 | 
	
		
			
				|  |  | +CLIENT_CERT_CHAIN_2_PEM = (resources.cert_hier_2_client_1_cert() +
 | 
	
		
			
				|  |  | +                           resources.cert_hier_2_intermediate_ca_cert())
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +SERVER_KEY_1_PEM = resources.cert_hier_1_server_1_key()
 | 
	
		
			
				|  |  | +SERVER_CERT_CHAIN_1_PEM = (resources.cert_hier_1_server_1_cert() +
 | 
	
		
			
				|  |  | +                           resources.cert_hier_1_intermediate_ca_cert())
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +SERVER_KEY_2_PEM = resources.cert_hier_2_server_1_key()
 | 
	
		
			
				|  |  | +SERVER_CERT_CHAIN_2_PEM = (resources.cert_hier_2_server_1_cert() +
 | 
	
		
			
				|  |  | +                           resources.cert_hier_2_intermediate_ca_cert())
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +# for use with the CertConfigFetcher. Roughly a simple custom mock
 | 
	
		
			
				|  |  | +# implementation
 | 
	
		
			
				|  |  | +Call = collections.namedtuple('Call', ['did_raise', 'returned_cert_config'])
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +def _create_client_stub(
 | 
	
		
			
				|  |  | +        port,
 | 
	
		
			
				|  |  | +        expect_success,
 | 
	
		
			
				|  |  | +        root_certificates=None,
 | 
	
		
			
				|  |  | +        private_key=None,
 | 
	
		
			
				|  |  | +        certificate_chain=None,):
 | 
	
		
			
				|  |  | +    channel = grpc.secure_channel('localhost:{}'.format(port),
 | 
	
		
			
				|  |  | +                                  grpc.ssl_channel_credentials(
 | 
	
		
			
				|  |  | +                                      root_certificates=root_certificates,
 | 
	
		
			
				|  |  | +                                      private_key=private_key,
 | 
	
		
			
				|  |  | +                                      certificate_chain=certificate_chain))
 | 
	
		
			
				|  |  | +    if expect_success:
 | 
	
		
			
				|  |  | +        # per Nathaniel: there's some robustness issue if we start
 | 
	
		
			
				|  |  | +        # using a channel without waiting for it to be actually ready
 | 
	
		
			
				|  |  | +        grpc.channel_ready_future(channel).result(timeout=10)
 | 
	
		
			
				|  |  | +    return services_pb2_grpc.FirstServiceStub(channel)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +class CertConfigFetcher(object):
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def __init__(self):
 | 
	
		
			
				|  |  | +        self._lock = threading.Lock()
 | 
	
		
			
				|  |  | +        self._calls = []
 | 
	
		
			
				|  |  | +        self._should_raise = False
 | 
	
		
			
				|  |  | +        self._cert_config = None
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def reset(self):
 | 
	
		
			
				|  |  | +        with self._lock:
 | 
	
		
			
				|  |  | +            self._calls = []
 | 
	
		
			
				|  |  | +            self._should_raise = False
 | 
	
		
			
				|  |  | +            self._cert_config = None
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def configure(self, should_raise, cert_config):
 | 
	
		
			
				|  |  | +        assert not (should_raise and cert_config), (
 | 
	
		
			
				|  |  | +            "should not specify both should_raise and a cert_config at the same time"
 | 
	
		
			
				|  |  | +        )
 | 
	
		
			
				|  |  | +        with self._lock:
 | 
	
		
			
				|  |  | +            self._should_raise = should_raise
 | 
	
		
			
				|  |  | +            self._cert_config = cert_config
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def getCalls(self):
 | 
	
		
			
				|  |  | +        with self._lock:
 | 
	
		
			
				|  |  | +            return self._calls
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def __call__(self):
 | 
	
		
			
				|  |  | +        with self._lock:
 | 
	
		
			
				|  |  | +            if self._should_raise:
 | 
	
		
			
				|  |  | +                self._calls.append(Call(True, None))
 | 
	
		
			
				|  |  | +                raise ValueError('just for fun, should not affect the test')
 | 
	
		
			
				|  |  | +            else:
 | 
	
		
			
				|  |  | +                self._calls.append(Call(False, self._cert_config))
 | 
	
		
			
				|  |  | +                return self._cert_config
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +class _ServerSSLCertReloadTest(
 | 
	
		
			
				|  |  | +        six.with_metaclass(abc.ABCMeta, unittest.TestCase)):
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def __init__(self, *args, **kwargs):
 | 
	
		
			
				|  |  | +        super(_ServerSSLCertReloadTest, self).__init__(*args, **kwargs)
 | 
	
		
			
				|  |  | +        self.server = None
 | 
	
		
			
				|  |  | +        self.port = None
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    @abc.abstractmethod
 | 
	
		
			
				|  |  | +    def require_client_auth(self):
 | 
	
		
			
				|  |  | +        raise NotImplementedError()
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def setUp(self):
 | 
	
		
			
				|  |  | +        self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
 | 
	
		
			
				|  |  | +        services_pb2_grpc.add_FirstServiceServicer_to_server(
 | 
	
		
			
				|  |  | +            _server_application.FirstServiceServicer(), self.server)
 | 
	
		
			
				|  |  | +        switch_cert_on_client_num = 10
 | 
	
		
			
				|  |  | +        initial_cert_config = grpc.ssl_server_certificate_config(
 | 
	
		
			
				|  |  | +            [(SERVER_KEY_1_PEM, SERVER_CERT_CHAIN_1_PEM)],
 | 
	
		
			
				|  |  | +            root_certificates=CA_2_PEM)
 | 
	
		
			
				|  |  | +        self.cert_config_fetcher = CertConfigFetcher()
 | 
	
		
			
				|  |  | +        server_credentials = grpc.ssl_server_credentials_dynamic_cert_config(
 | 
	
		
			
				|  |  | +            initial_cert_config,
 | 
	
		
			
				|  |  | +            self.cert_config_fetcher,
 | 
	
		
			
				|  |  | +            require_client_auth=self.require_client_auth())
 | 
	
		
			
				|  |  | +        self.port = self.server.add_secure_port('[::]:0', server_credentials)
 | 
	
		
			
				|  |  | +        self.server.start()
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def tearDown(self):
 | 
	
		
			
				|  |  | +        if self.server:
 | 
	
		
			
				|  |  | +            self.server.stop(None)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def _perform_rpc(self, client_stub, expect_success):
 | 
	
		
			
				|  |  | +        # we don't care about the actual response of the rpc; only
 | 
	
		
			
				|  |  | +        # whether we can perform it or not, and if not, the status
 | 
	
		
			
				|  |  | +        # code must be UNAVAILABLE
 | 
	
		
			
				|  |  | +        request = _application_common.UNARY_UNARY_REQUEST
 | 
	
		
			
				|  |  | +        if expect_success:
 | 
	
		
			
				|  |  | +            response = client_stub.UnUn(request)
 | 
	
		
			
				|  |  | +            self.assertEqual(response, _application_common.UNARY_UNARY_RESPONSE)
 | 
	
		
			
				|  |  | +        else:
 | 
	
		
			
				|  |  | +            with self.assertRaises(grpc.RpcError) as exception_context:
 | 
	
		
			
				|  |  | +                client_stub.UnUn(request)
 | 
	
		
			
				|  |  | +            self.assertEqual(exception_context.exception.code(),
 | 
	
		
			
				|  |  | +                             grpc.StatusCode.UNAVAILABLE)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def _do_one_shot_client_rpc(self,
 | 
	
		
			
				|  |  | +                                expect_success,
 | 
	
		
			
				|  |  | +                                root_certificates=None,
 | 
	
		
			
				|  |  | +                                private_key=None,
 | 
	
		
			
				|  |  | +                                certificate_chain=None):
 | 
	
		
			
				|  |  | +        client_stub = _create_client_stub(
 | 
	
		
			
				|  |  | +            self.port,
 | 
	
		
			
				|  |  | +            expect_success,
 | 
	
		
			
				|  |  | +            root_certificates=root_certificates,
 | 
	
		
			
				|  |  | +            private_key=private_key,
 | 
	
		
			
				|  |  | +            certificate_chain=certificate_chain)
 | 
	
		
			
				|  |  | +        self._perform_rpc(client_stub, expect_success)
 | 
	
		
			
				|  |  | +        del client_stub
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def _test(self):
 | 
	
		
			
				|  |  | +        # things should work...
 | 
	
		
			
				|  |  | +        self.cert_config_fetcher.configure(False, None)
 | 
	
		
			
				|  |  | +        self._do_one_shot_client_rpc(
 | 
	
		
			
				|  |  | +            True,
 | 
	
		
			
				|  |  | +            root_certificates=CA_1_PEM,
 | 
	
		
			
				|  |  | +            private_key=CLIENT_KEY_2_PEM,
 | 
	
		
			
				|  |  | +            certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
 | 
	
		
			
				|  |  | +        actual_calls = self.cert_config_fetcher.getCalls()
 | 
	
		
			
				|  |  | +        self.assertEqual(len(actual_calls), 1)
 | 
	
		
			
				|  |  | +        self.assertFalse(actual_calls[0].did_raise)
 | 
	
		
			
				|  |  | +        self.assertIsNone(actual_calls[0].returned_cert_config)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        # client should reject server...
 | 
	
		
			
				|  |  | +        # fails because client trusts ca2 and so will reject server
 | 
	
		
			
				|  |  | +        self.cert_config_fetcher.reset()
 | 
	
		
			
				|  |  | +        self.cert_config_fetcher.configure(False, None)
 | 
	
		
			
				|  |  | +        self._do_one_shot_client_rpc(
 | 
	
		
			
				|  |  | +            False,
 | 
	
		
			
				|  |  | +            root_certificates=CA_2_PEM,
 | 
	
		
			
				|  |  | +            private_key=CLIENT_KEY_2_PEM,
 | 
	
		
			
				|  |  | +            certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
 | 
	
		
			
				|  |  | +        actual_calls = self.cert_config_fetcher.getCalls()
 | 
	
		
			
				|  |  | +        self.assertGreaterEqual(len(actual_calls), 1)
 | 
	
		
			
				|  |  | +        self.assertFalse(actual_calls[0].did_raise)
 | 
	
		
			
				|  |  | +        for i, call in enumerate(actual_calls):
 | 
	
		
			
				|  |  | +            self.assertFalse(call.did_raise, 'i= {}'.format(i))
 | 
	
		
			
				|  |  | +            self.assertIsNone(call.returned_cert_config, 'i= {}'.format(i))
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        # should work again...
 | 
	
		
			
				|  |  | +        self.cert_config_fetcher.reset()
 | 
	
		
			
				|  |  | +        self.cert_config_fetcher.configure(True, None)
 | 
	
		
			
				|  |  | +        self._do_one_shot_client_rpc(
 | 
	
		
			
				|  |  | +            True,
 | 
	
		
			
				|  |  | +            root_certificates=CA_1_PEM,
 | 
	
		
			
				|  |  | +            private_key=CLIENT_KEY_2_PEM,
 | 
	
		
			
				|  |  | +            certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
 | 
	
		
			
				|  |  | +        actual_calls = self.cert_config_fetcher.getCalls()
 | 
	
		
			
				|  |  | +        self.assertEqual(len(actual_calls), 1)
 | 
	
		
			
				|  |  | +        self.assertTrue(actual_calls[0].did_raise)
 | 
	
		
			
				|  |  | +        self.assertIsNone(actual_calls[0].returned_cert_config)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        # if with_client_auth, then client should be rejected by
 | 
	
		
			
				|  |  | +        # server because client uses key/cert1, but server trusts ca2,
 | 
	
		
			
				|  |  | +        # so server will reject
 | 
	
		
			
				|  |  | +        self.cert_config_fetcher.reset()
 | 
	
		
			
				|  |  | +        self.cert_config_fetcher.configure(False, None)
 | 
	
		
			
				|  |  | +        self._do_one_shot_client_rpc(
 | 
	
		
			
				|  |  | +            not self.require_client_auth(),
 | 
	
		
			
				|  |  | +            root_certificates=CA_1_PEM,
 | 
	
		
			
				|  |  | +            private_key=CLIENT_KEY_1_PEM,
 | 
	
		
			
				|  |  | +            certificate_chain=CLIENT_CERT_CHAIN_1_PEM)
 | 
	
		
			
				|  |  | +        actual_calls = self.cert_config_fetcher.getCalls()
 | 
	
		
			
				|  |  | +        self.assertGreaterEqual(len(actual_calls), 1)
 | 
	
		
			
				|  |  | +        for i, call in enumerate(actual_calls):
 | 
	
		
			
				|  |  | +            self.assertFalse(call.did_raise, 'i= {}'.format(i))
 | 
	
		
			
				|  |  | +            self.assertIsNone(call.returned_cert_config, 'i= {}'.format(i))
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        # should work again...
 | 
	
		
			
				|  |  | +        self.cert_config_fetcher.reset()
 | 
	
		
			
				|  |  | +        self.cert_config_fetcher.configure(False, None)
 | 
	
		
			
				|  |  | +        self._do_one_shot_client_rpc(
 | 
	
		
			
				|  |  | +            True,
 | 
	
		
			
				|  |  | +            root_certificates=CA_1_PEM,
 | 
	
		
			
				|  |  | +            private_key=CLIENT_KEY_2_PEM,
 | 
	
		
			
				|  |  | +            certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
 | 
	
		
			
				|  |  | +        actual_calls = self.cert_config_fetcher.getCalls()
 | 
	
		
			
				|  |  | +        self.assertEqual(len(actual_calls), 1)
 | 
	
		
			
				|  |  | +        self.assertFalse(actual_calls[0].did_raise)
 | 
	
		
			
				|  |  | +        self.assertIsNone(actual_calls[0].returned_cert_config)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        # now create the "persistent" clients
 | 
	
		
			
				|  |  | +        self.cert_config_fetcher.reset()
 | 
	
		
			
				|  |  | +        self.cert_config_fetcher.configure(False, None)
 | 
	
		
			
				|  |  | +        persistent_client_stub_A = _create_client_stub(
 | 
	
		
			
				|  |  | +            self.port,
 | 
	
		
			
				|  |  | +            True,
 | 
	
		
			
				|  |  | +            root_certificates=CA_1_PEM,
 | 
	
		
			
				|  |  | +            private_key=CLIENT_KEY_2_PEM,
 | 
	
		
			
				|  |  | +            certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
 | 
	
		
			
				|  |  | +        self._perform_rpc(persistent_client_stub_A, True)
 | 
	
		
			
				|  |  | +        actual_calls = self.cert_config_fetcher.getCalls()
 | 
	
		
			
				|  |  | +        self.assertEqual(len(actual_calls), 1)
 | 
	
		
			
				|  |  | +        self.assertFalse(actual_calls[0].did_raise)
 | 
	
		
			
				|  |  | +        self.assertIsNone(actual_calls[0].returned_cert_config)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        self.cert_config_fetcher.reset()
 | 
	
		
			
				|  |  | +        self.cert_config_fetcher.configure(False, None)
 | 
	
		
			
				|  |  | +        persistent_client_stub_B = _create_client_stub(
 | 
	
		
			
				|  |  | +            self.port,
 | 
	
		
			
				|  |  | +            True,
 | 
	
		
			
				|  |  | +            root_certificates=CA_1_PEM,
 | 
	
		
			
				|  |  | +            private_key=CLIENT_KEY_2_PEM,
 | 
	
		
			
				|  |  | +            certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
 | 
	
		
			
				|  |  | +        self._perform_rpc(persistent_client_stub_B, True)
 | 
	
		
			
				|  |  | +        actual_calls = self.cert_config_fetcher.getCalls()
 | 
	
		
			
				|  |  | +        self.assertEqual(len(actual_calls), 1)
 | 
	
		
			
				|  |  | +        self.assertFalse(actual_calls[0].did_raise)
 | 
	
		
			
				|  |  | +        self.assertIsNone(actual_calls[0].returned_cert_config)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        # moment of truth!! client should reject server because the
 | 
	
		
			
				|  |  | +        # server switch cert...
 | 
	
		
			
				|  |  | +        cert_config = grpc.ssl_server_certificate_config(
 | 
	
		
			
				|  |  | +            [(SERVER_KEY_2_PEM, SERVER_CERT_CHAIN_2_PEM)],
 | 
	
		
			
				|  |  | +            root_certificates=CA_1_PEM)
 | 
	
		
			
				|  |  | +        self.cert_config_fetcher.reset()
 | 
	
		
			
				|  |  | +        self.cert_config_fetcher.configure(False, cert_config)
 | 
	
		
			
				|  |  | +        self._do_one_shot_client_rpc(
 | 
	
		
			
				|  |  | +            False,
 | 
	
		
			
				|  |  | +            root_certificates=CA_1_PEM,
 | 
	
		
			
				|  |  | +            private_key=CLIENT_KEY_2_PEM,
 | 
	
		
			
				|  |  | +            certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
 | 
	
		
			
				|  |  | +        actual_calls = self.cert_config_fetcher.getCalls()
 | 
	
		
			
				|  |  | +        self.assertGreaterEqual(len(actual_calls), 1)
 | 
	
		
			
				|  |  | +        self.assertFalse(actual_calls[0].did_raise)
 | 
	
		
			
				|  |  | +        for i, call in enumerate(actual_calls):
 | 
	
		
			
				|  |  | +            self.assertFalse(call.did_raise, 'i= {}'.format(i))
 | 
	
		
			
				|  |  | +            self.assertEqual(call.returned_cert_config, cert_config,
 | 
	
		
			
				|  |  | +                             'i= {}'.format(i))
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        # now should work again...
 | 
	
		
			
				|  |  | +        self.cert_config_fetcher.reset()
 | 
	
		
			
				|  |  | +        self.cert_config_fetcher.configure(False, None)
 | 
	
		
			
				|  |  | +        self._do_one_shot_client_rpc(
 | 
	
		
			
				|  |  | +            True,
 | 
	
		
			
				|  |  | +            root_certificates=CA_2_PEM,
 | 
	
		
			
				|  |  | +            private_key=CLIENT_KEY_1_PEM,
 | 
	
		
			
				|  |  | +            certificate_chain=CLIENT_CERT_CHAIN_1_PEM)
 | 
	
		
			
				|  |  | +        actual_calls = self.cert_config_fetcher.getCalls()
 | 
	
		
			
				|  |  | +        self.assertEqual(len(actual_calls), 1)
 | 
	
		
			
				|  |  | +        self.assertFalse(actual_calls[0].did_raise)
 | 
	
		
			
				|  |  | +        self.assertIsNone(actual_calls[0].returned_cert_config)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        # client should be rejected by server if with_client_auth
 | 
	
		
			
				|  |  | +        self.cert_config_fetcher.reset()
 | 
	
		
			
				|  |  | +        self.cert_config_fetcher.configure(False, None)
 | 
	
		
			
				|  |  | +        self._do_one_shot_client_rpc(
 | 
	
		
			
				|  |  | +            not self.require_client_auth(),
 | 
	
		
			
				|  |  | +            root_certificates=CA_2_PEM,
 | 
	
		
			
				|  |  | +            private_key=CLIENT_KEY_2_PEM,
 | 
	
		
			
				|  |  | +            certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
 | 
	
		
			
				|  |  | +        actual_calls = self.cert_config_fetcher.getCalls()
 | 
	
		
			
				|  |  | +        self.assertGreaterEqual(len(actual_calls), 1)
 | 
	
		
			
				|  |  | +        for i, call in enumerate(actual_calls):
 | 
	
		
			
				|  |  | +            self.assertFalse(call.did_raise, 'i= {}'.format(i))
 | 
	
		
			
				|  |  | +            self.assertIsNone(call.returned_cert_config, 'i= {}'.format(i))
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        # here client should reject server...
 | 
	
		
			
				|  |  | +        self.cert_config_fetcher.reset()
 | 
	
		
			
				|  |  | +        self.cert_config_fetcher.configure(False, None)
 | 
	
		
			
				|  |  | +        self._do_one_shot_client_rpc(
 | 
	
		
			
				|  |  | +            False,
 | 
	
		
			
				|  |  | +            root_certificates=CA_1_PEM,
 | 
	
		
			
				|  |  | +            private_key=CLIENT_KEY_2_PEM,
 | 
	
		
			
				|  |  | +            certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
 | 
	
		
			
				|  |  | +        actual_calls = self.cert_config_fetcher.getCalls()
 | 
	
		
			
				|  |  | +        self.assertGreaterEqual(len(actual_calls), 1)
 | 
	
		
			
				|  |  | +        for i, call in enumerate(actual_calls):
 | 
	
		
			
				|  |  | +            self.assertFalse(call.did_raise, 'i= {}'.format(i))
 | 
	
		
			
				|  |  | +            self.assertIsNone(call.returned_cert_config, 'i= {}'.format(i))
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        # persistent clients should continue to work
 | 
	
		
			
				|  |  | +        self.cert_config_fetcher.reset()
 | 
	
		
			
				|  |  | +        self.cert_config_fetcher.configure(False, None)
 | 
	
		
			
				|  |  | +        self._perform_rpc(persistent_client_stub_A, True)
 | 
	
		
			
				|  |  | +        actual_calls = self.cert_config_fetcher.getCalls()
 | 
	
		
			
				|  |  | +        self.assertEqual(len(actual_calls), 0)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        self.cert_config_fetcher.reset()
 | 
	
		
			
				|  |  | +        self.cert_config_fetcher.configure(False, None)
 | 
	
		
			
				|  |  | +        self._perform_rpc(persistent_client_stub_B, True)
 | 
	
		
			
				|  |  | +        actual_calls = self.cert_config_fetcher.getCalls()
 | 
	
		
			
				|  |  | +        self.assertEqual(len(actual_calls), 0)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +class ServerSSLCertConfigFetcherParamsChecks(unittest.TestCase):
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def test_check_on_initial_config(self):
 | 
	
		
			
				|  |  | +        with self.assertRaises(TypeError):
 | 
	
		
			
				|  |  | +            grpc.ssl_server_credentials_dynamic_cert_config(None, str)
 | 
	
		
			
				|  |  | +        with self.assertRaises(TypeError):
 | 
	
		
			
				|  |  | +            grpc.ssl_server_credentials_dynamic_cert_config(1, str)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def test_check_on_config_fetcher(self):
 | 
	
		
			
				|  |  | +        cert_config = grpc.ssl_server_certificate_config(
 | 
	
		
			
				|  |  | +            [(SERVER_KEY_2_PEM, SERVER_CERT_CHAIN_2_PEM)],
 | 
	
		
			
				|  |  | +            root_certificates=CA_1_PEM)
 | 
	
		
			
				|  |  | +        with self.assertRaises(TypeError):
 | 
	
		
			
				|  |  | +            grpc.ssl_server_credentials_dynamic_cert_config(cert_config, None)
 | 
	
		
			
				|  |  | +        with self.assertRaises(TypeError):
 | 
	
		
			
				|  |  | +            grpc.ssl_server_credentials_dynamic_cert_config(cert_config, 1)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +class ServerSSLCertReloadTestWithClientAuth(_ServerSSLCertReloadTest):
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def require_client_auth(self):
 | 
	
		
			
				|  |  | +        return True
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    test = _ServerSSLCertReloadTest._test
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +class ServerSSLCertReloadTestWithoutClientAuth(_ServerSSLCertReloadTest):
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def require_client_auth(self):
 | 
	
		
			
				|  |  | +        return False
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    test = _ServerSSLCertReloadTest._test
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +class ServerSSLCertReloadTestCertConfigReuse(_ServerSSLCertReloadTest):
 | 
	
		
			
				|  |  | +    """Ensures that `ServerCertificateConfig` instances can be reused.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    Because C-core takes ownership of the
 | 
	
		
			
				|  |  | +    `grpc_ssl_server_certificate_config` encapsulated by
 | 
	
		
			
				|  |  | +    `ServerCertificateConfig`, this test reuses the same
 | 
	
		
			
				|  |  | +    `ServerCertificateConfig` instances multiple times to make sure
 | 
	
		
			
				|  |  | +    gRPC Python takes care of maintaining the validity of
 | 
	
		
			
				|  |  | +    `ServerCertificateConfig` instances, so that such instances can be
 | 
	
		
			
				|  |  | +    re-used by user application.
 | 
	
		
			
				|  |  | +    """
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def require_client_auth(self):
 | 
	
		
			
				|  |  | +        return True
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def setUp(self):
 | 
	
		
			
				|  |  | +        self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
 | 
	
		
			
				|  |  | +        services_pb2_grpc.add_FirstServiceServicer_to_server(
 | 
	
		
			
				|  |  | +            _server_application.FirstServiceServicer(), self.server)
 | 
	
		
			
				|  |  | +        self.cert_config_A = grpc.ssl_server_certificate_config(
 | 
	
		
			
				|  |  | +            [(SERVER_KEY_1_PEM, SERVER_CERT_CHAIN_1_PEM)],
 | 
	
		
			
				|  |  | +            root_certificates=CA_2_PEM)
 | 
	
		
			
				|  |  | +        self.cert_config_B = grpc.ssl_server_certificate_config(
 | 
	
		
			
				|  |  | +            [(SERVER_KEY_2_PEM, SERVER_CERT_CHAIN_2_PEM)],
 | 
	
		
			
				|  |  | +            root_certificates=CA_1_PEM)
 | 
	
		
			
				|  |  | +        self.cert_config_fetcher = CertConfigFetcher()
 | 
	
		
			
				|  |  | +        server_credentials = grpc.ssl_server_credentials_dynamic_cert_config(
 | 
	
		
			
				|  |  | +            self.cert_config_A,
 | 
	
		
			
				|  |  | +            self.cert_config_fetcher,
 | 
	
		
			
				|  |  | +            require_client_auth=True)
 | 
	
		
			
				|  |  | +        self.port = self.server.add_secure_port('[::]:0', server_credentials)
 | 
	
		
			
				|  |  | +        self.server.start()
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def test_cert_config_reuse(self):
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        # succeed with A
 | 
	
		
			
				|  |  | +        self.cert_config_fetcher.reset()
 | 
	
		
			
				|  |  | +        self.cert_config_fetcher.configure(False, self.cert_config_A)
 | 
	
		
			
				|  |  | +        self._do_one_shot_client_rpc(
 | 
	
		
			
				|  |  | +            True,
 | 
	
		
			
				|  |  | +            root_certificates=CA_1_PEM,
 | 
	
		
			
				|  |  | +            private_key=CLIENT_KEY_2_PEM,
 | 
	
		
			
				|  |  | +            certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
 | 
	
		
			
				|  |  | +        actual_calls = self.cert_config_fetcher.getCalls()
 | 
	
		
			
				|  |  | +        self.assertEqual(len(actual_calls), 1)
 | 
	
		
			
				|  |  | +        self.assertFalse(actual_calls[0].did_raise)
 | 
	
		
			
				|  |  | +        self.assertEqual(actual_calls[0].returned_cert_config,
 | 
	
		
			
				|  |  | +                         self.cert_config_A)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        # fail with A
 | 
	
		
			
				|  |  | +        self.cert_config_fetcher.reset()
 | 
	
		
			
				|  |  | +        self.cert_config_fetcher.configure(False, self.cert_config_A)
 | 
	
		
			
				|  |  | +        self._do_one_shot_client_rpc(
 | 
	
		
			
				|  |  | +            False,
 | 
	
		
			
				|  |  | +            root_certificates=CA_2_PEM,
 | 
	
		
			
				|  |  | +            private_key=CLIENT_KEY_1_PEM,
 | 
	
		
			
				|  |  | +            certificate_chain=CLIENT_CERT_CHAIN_1_PEM)
 | 
	
		
			
				|  |  | +        actual_calls = self.cert_config_fetcher.getCalls()
 | 
	
		
			
				|  |  | +        self.assertGreaterEqual(len(actual_calls), 1)
 | 
	
		
			
				|  |  | +        self.assertFalse(actual_calls[0].did_raise)
 | 
	
		
			
				|  |  | +        for i, call in enumerate(actual_calls):
 | 
	
		
			
				|  |  | +            self.assertFalse(call.did_raise, 'i= {}'.format(i))
 | 
	
		
			
				|  |  | +            self.assertEqual(call.returned_cert_config, self.cert_config_A,
 | 
	
		
			
				|  |  | +                             'i= {}'.format(i))
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        # succeed again with A
 | 
	
		
			
				|  |  | +        self.cert_config_fetcher.reset()
 | 
	
		
			
				|  |  | +        self.cert_config_fetcher.configure(False, self.cert_config_A)
 | 
	
		
			
				|  |  | +        self._do_one_shot_client_rpc(
 | 
	
		
			
				|  |  | +            True,
 | 
	
		
			
				|  |  | +            root_certificates=CA_1_PEM,
 | 
	
		
			
				|  |  | +            private_key=CLIENT_KEY_2_PEM,
 | 
	
		
			
				|  |  | +            certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
 | 
	
		
			
				|  |  | +        actual_calls = self.cert_config_fetcher.getCalls()
 | 
	
		
			
				|  |  | +        self.assertEqual(len(actual_calls), 1)
 | 
	
		
			
				|  |  | +        self.assertFalse(actual_calls[0].did_raise)
 | 
	
		
			
				|  |  | +        self.assertEqual(actual_calls[0].returned_cert_config,
 | 
	
		
			
				|  |  | +                         self.cert_config_A)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        # succeed with B
 | 
	
		
			
				|  |  | +        self.cert_config_fetcher.reset()
 | 
	
		
			
				|  |  | +        self.cert_config_fetcher.configure(False, self.cert_config_B)
 | 
	
		
			
				|  |  | +        self._do_one_shot_client_rpc(
 | 
	
		
			
				|  |  | +            True,
 | 
	
		
			
				|  |  | +            root_certificates=CA_2_PEM,
 | 
	
		
			
				|  |  | +            private_key=CLIENT_KEY_1_PEM,
 | 
	
		
			
				|  |  | +            certificate_chain=CLIENT_CERT_CHAIN_1_PEM)
 | 
	
		
			
				|  |  | +        actual_calls = self.cert_config_fetcher.getCalls()
 | 
	
		
			
				|  |  | +        self.assertEqual(len(actual_calls), 1)
 | 
	
		
			
				|  |  | +        self.assertFalse(actual_calls[0].did_raise)
 | 
	
		
			
				|  |  | +        self.assertEqual(actual_calls[0].returned_cert_config,
 | 
	
		
			
				|  |  | +                         self.cert_config_B)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        # fail with B
 | 
	
		
			
				|  |  | +        self.cert_config_fetcher.reset()
 | 
	
		
			
				|  |  | +        self.cert_config_fetcher.configure(False, self.cert_config_B)
 | 
	
		
			
				|  |  | +        self._do_one_shot_client_rpc(
 | 
	
		
			
				|  |  | +            False,
 | 
	
		
			
				|  |  | +            root_certificates=CA_1_PEM,
 | 
	
		
			
				|  |  | +            private_key=CLIENT_KEY_2_PEM,
 | 
	
		
			
				|  |  | +            certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
 | 
	
		
			
				|  |  | +        actual_calls = self.cert_config_fetcher.getCalls()
 | 
	
		
			
				|  |  | +        self.assertGreaterEqual(len(actual_calls), 1)
 | 
	
		
			
				|  |  | +        self.assertFalse(actual_calls[0].did_raise)
 | 
	
		
			
				|  |  | +        for i, call in enumerate(actual_calls):
 | 
	
		
			
				|  |  | +            self.assertFalse(call.did_raise, 'i= {}'.format(i))
 | 
	
		
			
				|  |  | +            self.assertEqual(call.returned_cert_config, self.cert_config_B,
 | 
	
		
			
				|  |  | +                             'i= {}'.format(i))
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        # succeed again with B
 | 
	
		
			
				|  |  | +        self.cert_config_fetcher.reset()
 | 
	
		
			
				|  |  | +        self.cert_config_fetcher.configure(False, self.cert_config_B)
 | 
	
		
			
				|  |  | +        self._do_one_shot_client_rpc(
 | 
	
		
			
				|  |  | +            True,
 | 
	
		
			
				|  |  | +            root_certificates=CA_2_PEM,
 | 
	
		
			
				|  |  | +            private_key=CLIENT_KEY_1_PEM,
 | 
	
		
			
				|  |  | +            certificate_chain=CLIENT_CERT_CHAIN_1_PEM)
 | 
	
		
			
				|  |  | +        actual_calls = self.cert_config_fetcher.getCalls()
 | 
	
		
			
				|  |  | +        self.assertEqual(len(actual_calls), 1)
 | 
	
		
			
				|  |  | +        self.assertFalse(actual_calls[0].did_raise)
 | 
	
		
			
				|  |  | +        self.assertEqual(actual_calls[0].returned_cert_config,
 | 
	
		
			
				|  |  | +                         self.cert_config_B)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +if __name__ == '__main__':
 | 
	
		
			
				|  |  | +    unittest.main(verbosity=2)
 |