_auth_context_test.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. # Copyright 2017 gRPC authors.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """Tests exposure of SSL auth context"""
  15. import pickle
  16. import unittest
  17. import grpc
  18. from grpc import _channel
  19. import six
  20. from tests.unit import test_common
  21. from tests.unit import resources
  22. _REQUEST = b'\x00\x00\x00'
  23. _RESPONSE = b'\x00\x00\x00'
  24. _UNARY_UNARY = '/test/UnaryUnary'
  25. _SERVER_HOST_OVERRIDE = 'foo.test.google.fr'
  26. _CLIENT_IDS = (b'*.test.google.fr', b'waterzooi.test.google.be',
  27. b'*.test.youtube.com', b'192.168.1.3',)
  28. _ID = 'id'
  29. _ID_KEY = 'id_key'
  30. _AUTH_CTX = 'auth_ctx'
  31. _PRIVATE_KEY = resources.private_key()
  32. _CERTIFICATE_CHAIN = resources.certificate_chain()
  33. _TEST_ROOT_CERTIFICATES = resources.test_root_certificates()
  34. _SERVER_CERTS = ((_PRIVATE_KEY, _CERTIFICATE_CHAIN),)
  35. _PROPERTY_OPTIONS = (('grpc.ssl_target_name_override', _SERVER_HOST_OVERRIDE,),)
  36. def handle_unary_unary(request, servicer_context):
  37. return pickle.dumps({
  38. _ID: servicer_context.peer_identities(),
  39. _ID_KEY: servicer_context.peer_identity_key(),
  40. _AUTH_CTX: servicer_context.auth_context()
  41. })
  42. class AuthContextTest(unittest.TestCase):
  43. def testInsecure(self):
  44. handler = grpc.method_handlers_generic_handler('test', {
  45. 'UnaryUnary':
  46. grpc.unary_unary_rpc_method_handler(handle_unary_unary)
  47. })
  48. server = test_common.test_server()
  49. server.add_generic_rpc_handlers((handler,))
  50. port = server.add_insecure_port('[::]:0')
  51. server.start()
  52. channel = grpc.insecure_channel('localhost:%d' % port)
  53. response = channel.unary_unary(_UNARY_UNARY)(_REQUEST)
  54. server.stop(None)
  55. auth_data = pickle.loads(response)
  56. self.assertIsNone(auth_data[_ID])
  57. self.assertIsNone(auth_data[_ID_KEY])
  58. self.assertDictEqual({}, auth_data[_AUTH_CTX])
  59. def testSecureNoCert(self):
  60. handler = grpc.method_handlers_generic_handler('test', {
  61. 'UnaryUnary':
  62. grpc.unary_unary_rpc_method_handler(handle_unary_unary)
  63. })
  64. server = test_common.test_server()
  65. server.add_generic_rpc_handlers((handler,))
  66. server_cred = grpc.ssl_server_credentials(_SERVER_CERTS)
  67. port = server.add_secure_port('[::]:0', server_cred)
  68. server.start()
  69. channel_creds = grpc.ssl_channel_credentials(
  70. root_certificates=_TEST_ROOT_CERTIFICATES)
  71. channel = grpc.secure_channel(
  72. 'localhost:{}'.format(port),
  73. channel_creds,
  74. options=_PROPERTY_OPTIONS)
  75. response = channel.unary_unary(_UNARY_UNARY)(_REQUEST)
  76. server.stop(None)
  77. auth_data = pickle.loads(response)
  78. self.assertIsNone(auth_data[_ID])
  79. self.assertIsNone(auth_data[_ID_KEY])
  80. self.assertDictEqual({
  81. 'transport_security_type': [b'ssl']
  82. }, auth_data[_AUTH_CTX])
  83. def testSecureClientCert(self):
  84. handler = grpc.method_handlers_generic_handler('test', {
  85. 'UnaryUnary':
  86. grpc.unary_unary_rpc_method_handler(handle_unary_unary)
  87. })
  88. server = test_common.test_server()
  89. server.add_generic_rpc_handlers((handler,))
  90. server_cred = grpc.ssl_server_credentials(
  91. _SERVER_CERTS,
  92. root_certificates=_TEST_ROOT_CERTIFICATES,
  93. require_client_auth=True)
  94. port = server.add_secure_port('[::]:0', server_cred)
  95. server.start()
  96. channel_creds = grpc.ssl_channel_credentials(
  97. root_certificates=_TEST_ROOT_CERTIFICATES,
  98. private_key=_PRIVATE_KEY,
  99. certificate_chain=_CERTIFICATE_CHAIN)
  100. channel = grpc.secure_channel(
  101. 'localhost:{}'.format(port),
  102. channel_creds,
  103. options=_PROPERTY_OPTIONS)
  104. response = channel.unary_unary(_UNARY_UNARY)(_REQUEST)
  105. server.stop(None)
  106. auth_data = pickle.loads(response)
  107. auth_ctx = auth_data[_AUTH_CTX]
  108. six.assertCountEqual(self, _CLIENT_IDS, auth_data[_ID])
  109. self.assertEqual('x509_subject_alternative_name', auth_data[_ID_KEY])
  110. self.assertSequenceEqual([b'ssl'], auth_ctx['transport_security_type'])
  111. self.assertSequenceEqual([b'*.test.google.com'],
  112. auth_ctx['x509_common_name'])
  113. if __name__ == '__main__':
  114. unittest.main(verbosity=2)