methods.py 15 KB


  1. # Copyright 2015, Google Inc.
  2. # All rights reserved.
  3. #
  4. # Redistribution and use in source and binary forms, with or without
  5. # modification, are permitted provided that the following conditions are
  6. # met:
  7. #
  8. # * Redistributions of source code must retain the above copyright
  9. # notice, this list of conditions and the following disclaimer.
  10. # * Redistributions in binary form must reproduce the above
  11. # copyright notice, this list of conditions and the following disclaimer
  12. # in the documentation and/or other materials provided with the
  13. # distribution.
  14. # * Neither the name of Google Inc. nor the names of its
  15. # contributors may be used to endorse or promote products derived from
  16. # this software without specific prior written permission.
  17. #
  18. # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  19. # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  20. # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  21. # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  22. # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  23. # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  24. # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  25. # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  26. # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  27. # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  28. # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  29. """Implementations of interoperability test methods."""
  30. import enum
  31. import json
  32. import os
  33. import threading
  34. import time
  35. from oauth2client import client as oauth2client_client
  36. from grpc.framework.alpha import utilities
  37. from grpc.framework.alpha import exceptions
  38. from grpc_interop import empty_pb2
  39. from grpc_interop import messages_pb2
  40. _TIMEOUT = 7
  41. def _empty_call(request, unused_context):
  42. return empty_pb2.Empty()
  43. _CLIENT_EMPTY_CALL = utilities.unary_unary_invocation_description(
  44. empty_pb2.Empty.SerializeToString, empty_pb2.Empty.FromString)
  45. _SERVER_EMPTY_CALL = utilities.unary_unary_service_description(
  46. _empty_call, empty_pb2.Empty.FromString,
  47. empty_pb2.Empty.SerializeToString)
  48. def _unary_call(request, unused_context):
  49. return messages_pb2.SimpleResponse(
  50. payload=messages_pb2.Payload(
  51. type=messages_pb2.COMPRESSABLE,
  52. body=b'\x00' * request.response_size))
  53. _CLIENT_UNARY_CALL = utilities.unary_unary_invocation_description(
  54. messages_pb2.SimpleRequest.SerializeToString,
  55. messages_pb2.SimpleResponse.FromString)
  56. _SERVER_UNARY_CALL = utilities.unary_unary_service_description(
  57. _unary_call, messages_pb2.SimpleRequest.FromString,
  58. messages_pb2.SimpleResponse.SerializeToString)
  59. def _streaming_output_call(request, unused_context):
  60. for response_parameters in request.response_parameters:
  61. yield messages_pb2.StreamingOutputCallResponse(
  62. payload=messages_pb2.Payload(
  63. type=request.response_type,
  64. body=b'\x00' * response_parameters.size))
  65. _CLIENT_STREAMING_OUTPUT_CALL = utilities.unary_stream_invocation_description(
  66. messages_pb2.StreamingOutputCallRequest.SerializeToString,
  67. messages_pb2.StreamingOutputCallResponse.FromString)
  68. _SERVER_STREAMING_OUTPUT_CALL = utilities.unary_stream_service_description(
  69. _streaming_output_call,
  70. messages_pb2.StreamingOutputCallRequest.FromString,
  71. messages_pb2.StreamingOutputCallResponse.SerializeToString)
  72. def _streaming_input_call(request_iterator, unused_context):
  73. aggregate_size = 0
  74. for request in request_iterator:
  75. if request.payload and request.payload.body:
  76. aggregate_size += len(request.payload.body)
  77. return messages_pb2.StreamingInputCallResponse(
  78. aggregated_payload_size=aggregate_size)
  79. _CLIENT_STREAMING_INPUT_CALL = utilities.stream_unary_invocation_description(
  80. messages_pb2.StreamingInputCallRequest.SerializeToString,
  81. messages_pb2.StreamingInputCallResponse.FromString)
  82. _SERVER_STREAMING_INPUT_CALL = utilities.stream_unary_service_description(
  83. _streaming_input_call,
  84. messages_pb2.StreamingInputCallRequest.FromString,
  85. messages_pb2.StreamingInputCallResponse.SerializeToString)
  86. def _full_duplex_call(request_iterator, unused_context):
  87. for request in request_iterator:
  88. yield messages_pb2.StreamingOutputCallResponse(
  89. payload=messages_pb2.Payload(
  90. type=request.payload.type,
  91. body=b'\x00' * request.response_parameters[0].size))
  92. _CLIENT_FULL_DUPLEX_CALL = utilities.stream_stream_invocation_description(
  93. messages_pb2.StreamingOutputCallRequest.SerializeToString,
  94. messages_pb2.StreamingOutputCallResponse.FromString)
  95. _SERVER_FULL_DUPLEX_CALL = utilities.stream_stream_service_description(
  96. _full_duplex_call,
  97. messages_pb2.StreamingOutputCallRequest.FromString,
  98. messages_pb2.StreamingOutputCallResponse.SerializeToString)
  99. # NOTE(nathaniel): Apparently this is the same as the full-duplex call?
  100. _CLIENT_HALF_DUPLEX_CALL = utilities.stream_stream_invocation_description(
  101. messages_pb2.StreamingOutputCallRequest.SerializeToString,
  102. messages_pb2.StreamingOutputCallResponse.FromString)
  103. _SERVER_HALF_DUPLEX_CALL = utilities.stream_stream_service_description(
  104. _full_duplex_call,
  105. messages_pb2.StreamingOutputCallRequest.FromString,
  106. messages_pb2.StreamingOutputCallResponse.SerializeToString)
  107. SERVICE_NAME = 'grpc.testing.TestService'
  108. _EMPTY_CALL_METHOD_NAME = 'EmptyCall'
  109. _UNARY_CALL_METHOD_NAME = 'UnaryCall'
  110. _STREAMING_OUTPUT_CALL_METHOD_NAME = 'StreamingOutputCall'
  111. _STREAMING_INPUT_CALL_METHOD_NAME = 'StreamingInputCall'
  112. _FULL_DUPLEX_CALL_METHOD_NAME = 'FullDuplexCall'
  113. _HALF_DUPLEX_CALL_METHOD_NAME = 'HalfDuplexCall'
  114. CLIENT_METHODS = {
  115. _EMPTY_CALL_METHOD_NAME: _CLIENT_EMPTY_CALL,
  116. _UNARY_CALL_METHOD_NAME: _CLIENT_UNARY_CALL,
  117. _STREAMING_OUTPUT_CALL_METHOD_NAME: _CLIENT_STREAMING_OUTPUT_CALL,
  118. _STREAMING_INPUT_CALL_METHOD_NAME: _CLIENT_STREAMING_INPUT_CALL,
  119. _FULL_DUPLEX_CALL_METHOD_NAME: _CLIENT_FULL_DUPLEX_CALL,
  120. _HALF_DUPLEX_CALL_METHOD_NAME: _CLIENT_HALF_DUPLEX_CALL,
  121. }
  122. SERVER_METHODS = {
  123. _EMPTY_CALL_METHOD_NAME: _SERVER_EMPTY_CALL,
  124. _UNARY_CALL_METHOD_NAME: _SERVER_UNARY_CALL,
  125. _STREAMING_OUTPUT_CALL_METHOD_NAME: _SERVER_STREAMING_OUTPUT_CALL,
  126. _STREAMING_INPUT_CALL_METHOD_NAME: _SERVER_STREAMING_INPUT_CALL,
  127. _FULL_DUPLEX_CALL_METHOD_NAME: _SERVER_FULL_DUPLEX_CALL,
  128. _HALF_DUPLEX_CALL_METHOD_NAME: _SERVER_HALF_DUPLEX_CALL,
  129. }
  130. def _large_unary_common_behavior(stub, fill_username, fill_oauth_scope):
  131. with stub:
  132. request = messages_pb2.SimpleRequest(
  133. response_type=messages_pb2.COMPRESSABLE, response_size=314159,
  134. payload=messages_pb2.Payload(body=b'\x00' * 271828),
  135. fill_username=fill_username, fill_oauth_scope=fill_oauth_scope)
  136. response_future = stub.UnaryCall.async(request, _TIMEOUT)
  137. response = response_future.result()
  138. if response.payload.type is not messages_pb2.COMPRESSABLE:
  139. raise ValueError(
  140. 'response payload type is "%s"!' % type(response.payload.type))
  141. if len(response.payload.body) != 314159:
  142. raise ValueError(
  143. 'response body of incorrect size %d!' % len(response.payload.body))
  144. return response
  145. def _empty_unary(stub):
  146. with stub:
  147. response = stub.EmptyCall(empty_pb2.Empty(), _TIMEOUT)
  148. if not isinstance(response, empty_pb2.Empty):
  149. raise TypeError(
  150. 'response is of type "%s", not empty_pb2.Empty!', type(response))
  151. def _large_unary(stub):
  152. _large_unary_common_behavior(stub, False, False)
  153. def _client_streaming(stub):
  154. with stub:
  155. payload_body_sizes = (27182, 8, 1828, 45904)
  156. payloads = (
  157. messages_pb2.Payload(body=b'\x00' * size)
  158. for size in payload_body_sizes)
  159. requests = (
  160. messages_pb2.StreamingInputCallRequest(payload=payload)
  161. for payload in payloads)
  162. response = stub.StreamingInputCall(requests, _TIMEOUT)
  163. if response.aggregated_payload_size != 74922:
  164. raise ValueError(
  165. 'incorrect size %d!' % response.aggregated_payload_size)
  166. def _server_streaming(stub):
  167. sizes = (31415, 9, 2653, 58979)
  168. with stub:
  169. request = messages_pb2.StreamingOutputCallRequest(
  170. response_type=messages_pb2.COMPRESSABLE,
  171. response_parameters=(
  172. messages_pb2.ResponseParameters(size=sizes[0]),
  173. messages_pb2.ResponseParameters(size=sizes[1]),
  174. messages_pb2.ResponseParameters(size=sizes[2]),
  175. messages_pb2.ResponseParameters(size=sizes[3]),
  176. ))
  177. response_iterator = stub.StreamingOutputCall(request, _TIMEOUT)
  178. for index, response in enumerate(response_iterator):
  179. if response.payload.type != messages_pb2.COMPRESSABLE:
  180. raise ValueError(
  181. 'response body of invalid type %s!' % response.payload.type)
  182. if len(response.payload.body) != sizes[index]:
  183. raise ValueError(
  184. 'response body of invalid size %d!' % len(response.payload.body))
  185. def _cancel_after_begin(stub):
  186. with stub:
  187. sizes = (27182, 8, 1828, 45904)
  188. payloads = [messages_pb2.Payload(body=b'\x00' * size) for size in sizes]
  189. requests = [messages_pb2.StreamingInputCallRequest(payload=payload)
  190. for payload in payloads]
  191. responses = stub.StreamingInputCall.async(requests, _TIMEOUT)
  192. responses.cancel()
  193. if not responses.cancelled():
  194. raise ValueError('expected call to be cancelled')
  195. class _Pipe(object):
  196. def __init__(self):
  197. self._condition = threading.Condition()
  198. self._values = []
  199. self._open = True
  200. def __iter__(self):
  201. return self
  202. def next(self):
  203. with self._condition:
  204. while not self._values and self._open:
  205. self._condition.wait()
  206. if self._values:
  207. return self._values.pop(0)
  208. else:
  209. raise StopIteration()
  210. def add(self, value):
  211. with self._condition:
  212. self._values.append(value)
  213. self._condition.notify()
  214. def close(self):
  215. with self._condition:
  216. self._open = False
  217. self._condition.notify()
  218. def __enter__(self):
  219. return self
  220. def __exit__(self, type, value, traceback):
  221. self.close()
  222. def _ping_pong(stub):
  223. request_response_sizes = (31415, 9, 2653, 58979)
  224. request_payload_sizes = (27182, 8, 1828, 45904)
  225. with stub, _Pipe() as pipe:
  226. response_iterator = stub.FullDuplexCall(pipe, _TIMEOUT)
  227. print 'Starting ping-pong with response iterator %s' % response_iterator
  228. for response_size, payload_size in zip(
  229. request_response_sizes, request_payload_sizes):
  230. request = messages_pb2.StreamingOutputCallRequest(
  231. response_type=messages_pb2.COMPRESSABLE,
  232. response_parameters=(messages_pb2.ResponseParameters(
  233. size=response_size),),
  234. payload=messages_pb2.Payload(body=b'\x00' * payload_size))
  235. pipe.add(request)
  236. response = next(response_iterator)
  237. if response.payload.type != messages_pb2.COMPRESSABLE:
  238. raise ValueError(
  239. 'response body of invalid type %s!' % response.payload.type)
  240. if len(response.payload.body) != response_size:
  241. raise ValueError(
  242. 'response body of invalid size %d!' % len(response.payload.body))
  243. def _cancel_after_first_response(stub):
  244. request_response_sizes = (31415, 9, 2653, 58979)
  245. request_payload_sizes = (27182, 8, 1828, 45904)
  246. with stub, _Pipe() as pipe:
  247. response_iterator = stub.FullDuplexCall(pipe, _TIMEOUT)
  248. response_size = request_response_sizes[0]
  249. payload_size = request_payload_sizes[0]
  250. request = messages_pb2.StreamingOutputCallRequest(
  251. response_type=messages_pb2.COMPRESSABLE,
  252. response_parameters=(messages_pb2.ResponseParameters(
  253. size=response_size),),
  254. payload=messages_pb2.Payload(body=b'\x00' * payload_size))
  255. pipe.add(request)
  256. response = next(response_iterator)
  257. # We test the contents of `response` in the Ping Pong test - don't check
  258. # them here.
  259. response_iterator.cancel()
  260. try:
  261. next(response_iterator)
  262. except Exception:
  263. pass
  264. else:
  265. raise ValueError('expected call to be cancelled')
  266. def _timeout_on_sleeping_server(stub):
  267. request_payload_size = 27182
  268. with stub, _Pipe() as pipe:
  269. response_iterator = stub.FullDuplexCall(pipe, 0.001)
  270. request = messages_pb2.StreamingOutputCallRequest(
  271. response_type=messages_pb2.COMPRESSABLE,
  272. payload=messages_pb2.Payload(body=b'\x00' * request_payload_size))
  273. pipe.add(request)
  274. time.sleep(0.1)
  275. try:
  276. next(response_iterator)
  277. except exceptions.ExpirationError:
  278. pass
  279. else:
  280. raise ValueError('expected call to exceed deadline')
  281. def _compute_engine_creds(stub, args):
  282. response = _large_unary_common_behavior(stub, True, True)
  283. if args.default_service_account != response.username:
  284. raise ValueError(
  285. 'expected username %s, got %s' % (args.default_service_account,
  286. response.username))
  287. def _service_account_creds(stub, args):
  288. json_key_filename = os.environ[
  289. oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS]
  290. wanted_email = json.load(open(json_key_filename, 'rb'))['client_email']
  291. response = _large_unary_common_behavior(stub, True, True)
  292. if wanted_email != response.username:
  293. raise ValueError(
  294. 'expected username %s, got %s' % (wanted_email, response.username))
  295. if args.oauth_scope.find(response.oauth_scope) == -1:
  296. raise ValueError(
  297. 'expected to find oauth scope "%s" in received "%s"' %
  298. (response.oauth_scope, args.oauth_scope))
  299. @enum.unique
  300. class TestCase(enum.Enum):
  301. EMPTY_UNARY = 'empty_unary'
  302. LARGE_UNARY = 'large_unary'
  303. SERVER_STREAMING = 'server_streaming'
  304. CLIENT_STREAMING = 'client_streaming'
  305. PING_PONG = 'ping_pong'
  306. CANCEL_AFTER_BEGIN = 'cancel_after_begin'
  307. CANCEL_AFTER_FIRST_RESPONSE = 'cancel_after_first_response'
  308. COMPUTE_ENGINE_CREDS = 'compute_engine_creds'
  309. SERVICE_ACCOUNT_CREDS = 'service_account_creds'
  310. TIMEOUT_ON_SLEEPING_SERVER = 'timeout_on_sleeping_server'
  311. def test_interoperability(self, stub, args):
  312. if self is TestCase.EMPTY_UNARY:
  313. _empty_unary(stub)
  314. elif self is TestCase.LARGE_UNARY:
  315. _large_unary(stub)
  316. elif self is TestCase.SERVER_STREAMING:
  317. _server_streaming(stub)
  318. elif self is TestCase.CLIENT_STREAMING:
  319. _client_streaming(stub)
  320. elif self is TestCase.PING_PONG:
  321. _ping_pong(stub)
  322. elif self is TestCase.CANCEL_AFTER_BEGIN:
  323. _cancel_after_begin(stub)
  324. elif self is TestCase.CANCEL_AFTER_FIRST_RESPONSE:
  325. _cancel_after_first_response(stub)
  326. elif self is TestCase.TIMEOUT_ON_SLEEPING_SERVER:
  327. _timeout_on_sleeping_server(stub)
  328. elif self is TestCase.COMPUTE_ENGINE_CREDS:
  329. _compute_engine_creds(stub, args)
  330. elif self is TestCase.SERVICE_ACCOUNT_CREDS:
  331. _service_account_creds(stub, args)
  332. else:
  333. raise NotImplementedError('Test case "%s" not implemented!' % self.name)