channel_test.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. # Copyright 2019 The gRPC Authors.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """Tests behavior of the grpc.aio.Channel class."""
  15. import logging
  16. import os
  17. import unittest
  18. import grpc
  19. from grpc.experimental import aio
  20. from src.proto.grpc.testing import messages_pb2, test_pb2_grpc
  21. from tests.unit.framework.common import test_constants
  22. from tests_aio.unit._constants import (UNARY_CALL_WITH_SLEEP_VALUE,
  23. UNREACHABLE_TARGET)
  24. from tests_aio.unit._test_base import AioTestBase
  25. from tests_aio.unit._test_server import start_test_server
  26. _UNARY_CALL_METHOD = '/grpc.testing.TestService/UnaryCall'
  27. _UNARY_CALL_METHOD_WITH_SLEEP = '/grpc.testing.TestService/UnaryCallWithSleep'
  28. _STREAMING_OUTPUT_CALL_METHOD = '/grpc.testing.TestService/StreamingOutputCall'
  29. _INVOCATION_METADATA = (
  30. ('x-grpc-test-echo-initial', 'initial-md-value'),
  31. ('x-grpc-test-echo-trailing-bin', b'\x00\x02'),
  32. )
  33. _NUM_STREAM_RESPONSES = 5
  34. _REQUEST_PAYLOAD_SIZE = 7
  35. _RESPONSE_PAYLOAD_SIZE = 42
  36. class TestChannel(AioTestBase):
  37. async def setUp(self):
  38. self._server_target, self._server = await start_test_server()
  39. async def tearDown(self):
  40. await self._server.stop(None)
  41. async def test_async_context(self):
  42. async with aio.insecure_channel(self._server_target) as channel:
  43. hi = channel.unary_unary(
  44. _UNARY_CALL_METHOD,
  45. request_serializer=messages_pb2.SimpleRequest.SerializeToString,
  46. response_deserializer=messages_pb2.SimpleResponse.FromString)
  47. await hi(messages_pb2.SimpleRequest())
  48. async def test_unary_unary(self):
  49. async with aio.insecure_channel(self._server_target) as channel:
  50. hi = channel.unary_unary(
  51. _UNARY_CALL_METHOD,
  52. request_serializer=messages_pb2.SimpleRequest.SerializeToString,
  53. response_deserializer=messages_pb2.SimpleResponse.FromString)
  54. response = await hi(messages_pb2.SimpleRequest())
  55. self.assertIsInstance(response, messages_pb2.SimpleResponse)
  56. async def test_unary_call_times_out(self):
  57. async with aio.insecure_channel(self._server_target) as channel:
  58. hi = channel.unary_unary(
  59. _UNARY_CALL_METHOD_WITH_SLEEP,
  60. request_serializer=messages_pb2.SimpleRequest.SerializeToString,
  61. response_deserializer=messages_pb2.SimpleResponse.FromString,
  62. )
  63. with self.assertRaises(grpc.RpcError) as exception_context:
  64. await hi(messages_pb2.SimpleRequest(),
  65. timeout=UNARY_CALL_WITH_SLEEP_VALUE / 2)
  66. _, details = grpc.StatusCode.DEADLINE_EXCEEDED.value # pylint: disable=unused-variable
  67. self.assertEqual(grpc.StatusCode.DEADLINE_EXCEEDED,
  68. exception_context.exception.code())
  69. self.assertEqual(details.title(),
  70. exception_context.exception.details())
  71. self.assertIsNotNone(exception_context.exception.initial_metadata())
  72. self.assertIsNotNone(
  73. exception_context.exception.trailing_metadata())
  74. @unittest.skipIf(os.name == 'nt',
  75. 'TODO: https://github.com/grpc/grpc/issues/21658')
  76. async def test_unary_call_does_not_times_out(self):
  77. async with aio.insecure_channel(self._server_target) as channel:
  78. hi = channel.unary_unary(
  79. _UNARY_CALL_METHOD_WITH_SLEEP,
  80. request_serializer=messages_pb2.SimpleRequest.SerializeToString,
  81. response_deserializer=messages_pb2.SimpleResponse.FromString,
  82. )
  83. call = hi(messages_pb2.SimpleRequest(),
  84. timeout=UNARY_CALL_WITH_SLEEP_VALUE * 5)
  85. self.assertEqual(await call.code(), grpc.StatusCode.OK)
  86. async def test_unary_stream(self):
  87. channel = aio.insecure_channel(self._server_target)
  88. stub = test_pb2_grpc.TestServiceStub(channel)
  89. # Prepares the request
  90. request = messages_pb2.StreamingOutputCallRequest()
  91. for _ in range(_NUM_STREAM_RESPONSES):
  92. request.response_parameters.append(
  93. messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE))
  94. # Invokes the actual RPC
  95. call = stub.StreamingOutputCall(request)
  96. # Validates the responses
  97. response_cnt = 0
  98. async for response in call:
  99. response_cnt += 1
  100. self.assertIs(type(response),
  101. messages_pb2.StreamingOutputCallResponse)
  102. self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body))
  103. self.assertEqual(_NUM_STREAM_RESPONSES, response_cnt)
  104. self.assertEqual(await call.code(), grpc.StatusCode.OK)
  105. await channel.close()
  106. async def test_stream_unary_using_write(self):
  107. channel = aio.insecure_channel(self._server_target)
  108. stub = test_pb2_grpc.TestServiceStub(channel)
  109. # Invokes the actual RPC
  110. call = stub.StreamingInputCall()
  111. # Prepares the request
  112. payload = messages_pb2.Payload(body=b'\0' * _REQUEST_PAYLOAD_SIZE)
  113. request = messages_pb2.StreamingInputCallRequest(payload=payload)
  114. # Sends out requests
  115. for _ in range(_NUM_STREAM_RESPONSES):
  116. await call.write(request)
  117. await call.done_writing()
  118. # Validates the responses
  119. response = await call
  120. self.assertIsInstance(response, messages_pb2.StreamingInputCallResponse)
  121. self.assertEqual(_NUM_STREAM_RESPONSES * _REQUEST_PAYLOAD_SIZE,
  122. response.aggregated_payload_size)
  123. self.assertEqual(await call.code(), grpc.StatusCode.OK)
  124. await channel.close()
  125. async def test_stream_unary_using_async_gen(self):
  126. channel = aio.insecure_channel(self._server_target)
  127. stub = test_pb2_grpc.TestServiceStub(channel)
  128. # Prepares the request
  129. payload = messages_pb2.Payload(body=b'\0' * _REQUEST_PAYLOAD_SIZE)
  130. request = messages_pb2.StreamingInputCallRequest(payload=payload)
  131. async def gen():
  132. for _ in range(_NUM_STREAM_RESPONSES):
  133. yield request
  134. # Invokes the actual RPC
  135. call = stub.StreamingInputCall(gen())
  136. # Validates the responses
  137. response = await call
  138. self.assertIsInstance(response, messages_pb2.StreamingInputCallResponse)
  139. self.assertEqual(_NUM_STREAM_RESPONSES * _REQUEST_PAYLOAD_SIZE,
  140. response.aggregated_payload_size)
  141. self.assertEqual(await call.code(), grpc.StatusCode.OK)
  142. await channel.close()
  143. async def test_stream_stream_using_read_write(self):
  144. channel = aio.insecure_channel(self._server_target)
  145. stub = test_pb2_grpc.TestServiceStub(channel)
  146. # Invokes the actual RPC
  147. call = stub.FullDuplexCall()
  148. # Prepares the request
  149. request = messages_pb2.StreamingOutputCallRequest()
  150. request.response_parameters.append(
  151. messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE))
  152. for _ in range(_NUM_STREAM_RESPONSES):
  153. await call.write(request)
  154. response = await call.read()
  155. self.assertIsInstance(response,
  156. messages_pb2.StreamingOutputCallResponse)
  157. self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body))
  158. await call.done_writing()
  159. self.assertEqual(grpc.StatusCode.OK, await call.code())
  160. await channel.close()
  161. async def test_stream_stream_using_async_gen(self):
  162. channel = aio.insecure_channel(self._server_target)
  163. stub = test_pb2_grpc.TestServiceStub(channel)
  164. # Prepares the request
  165. request = messages_pb2.StreamingOutputCallRequest()
  166. request.response_parameters.append(
  167. messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE))
  168. async def gen():
  169. for _ in range(_NUM_STREAM_RESPONSES):
  170. yield request
  171. # Invokes the actual RPC
  172. call = stub.FullDuplexCall(gen())
  173. async for response in call:
  174. self.assertIsInstance(response,
  175. messages_pb2.StreamingOutputCallResponse)
  176. self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body))
  177. self.assertEqual(grpc.StatusCode.OK, await call.code())
  178. await channel.close()
  179. if __name__ == '__main__':
  180. logging.basicConfig(level=logging.INFO)
  181. unittest.main(verbosity=2)