| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537 | # Copyright 2015, Google Inc.# All rights reserved.## Redistribution and use in source and binary forms, with or without# modification, are permitted provided that the following conditions are# met:##     * Redistributions of source code must retain the above copyright# notice, this list of conditions and the following disclaimer.#     * Redistributions in binary form must reproduce the above# copyright notice, this list of conditions and the following disclaimer# in the documentation and/or other materials provided with the# distribution.#     * Neither the name of Google Inc. nor the names of its# contributors may be used to endorse or promote products derived from# this software without specific prior written permission.## THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.import argparseimport contextlibimport errnoimport itertoolsimport osimport shutilimport subprocessimport sysimport tempfileimport threadingimport timeimport unittestfrom grpc.framework.alpha import exceptionsfrom grpc.framework.foundation import future# Identifiers of entities we expect to find in the generated module.SERVICER_IDENTIFIER = 'EarlyAdopterTestServiceServicer'SERVER_IDENTIFIER = 'EarlyAdopterTestServiceServer'STUB_IDENTIFIER = 'EarlyAdopterTestServiceStub'SERVER_FACTORY_IDENTIFIER = 'early_adopter_create_TestService_server'STUB_FACTORY_IDENTIFIER = 'early_adopter_create_TestService_stub'# The timeout used in tests of RPCs that are supposed to expire.SHORT_TIMEOUT = 2# The timeout used in tests of RPCs that are not supposed to expire. The# absurdly large value doesn't matter since no passing execution of this test# module will ever wait the duration.LONG_TIMEOUT = 600NO_DELAY = 0# Build mode environment variable set by tools/run_tests/run_tests.py._build_mode = os.environ['CONFIG']class _ServicerMethods(object):  def __init__(self, test_pb2, delay):    self._condition = threading.Condition()    self._delay = delay    self._paused = False    self._fail = False    self._test_pb2 = test_pb2  @contextlib.contextmanager  def pause(self):  # pylint: disable=invalid-name    with self._condition:      self._paused = True    yield    with self._condition:      self._paused = False      self._condition.notify_all()  @contextlib.contextmanager  def fail(self):  # pylint: disable=invalid-name    with self._condition:      self._fail = True    yield    with self._condition:      self._fail = False  def _control(self):  # pylint: disable=invalid-name    with self._condition:      if self._fail:        raise ValueError()      while self._paused:        self._condition.wait()    time.sleep(self._delay)  def UnaryCall(self, request, unused_rpc_context):    response = self._test_pb2.SimpleResponse()    response.payload.payload_type = self._test_pb2.COMPRESSABLE    response.payload.payload_compressable = 'a' * request.response_size    self._control()    return response  def StreamingOutputCall(self, request, unused_rpc_context):    for parameter in request.response_parameters:      response = self._test_pb2.StreamingOutputCallResponse()      response.payload.payload_type = self._test_pb2.COMPRESSABLE      response.payload.payload_compressable = 'a' * parameter.size      self._control()      yield response  def StreamingInputCall(self, request_iter, unused_rpc_context):    response = self._test_pb2.StreamingInputCallResponse()    aggregated_payload_size = 0    for request in request_iter:      aggregated_payload_size += len(request.payload.payload_compressable)    response.aggregated_payload_size = aggregated_payload_size    self._control()    return response  def FullDuplexCall(self, request_iter, unused_rpc_context):    for request in request_iter:      for parameter in request.response_parameters:        response = self._test_pb2.StreamingOutputCallResponse()        response.payload.payload_type = self._test_pb2.COMPRESSABLE        response.payload.payload_compressable = 'a' * parameter.size        self._control()        yield response  def HalfDuplexCall(self, request_iter, unused_rpc_context):    responses = []    for request in request_iter:      for parameter in request.response_parameters:        response = self._test_pb2.StreamingOutputCallResponse()        response.payload.payload_type = self._test_pb2.COMPRESSABLE        response.payload.payload_compressable = 'a' * parameter.size        self._control()        responses.append(response)    for response in responses:      yield response@contextlib.contextmanagerdef _CreateService(test_pb2, delay):  """Provides a servicer backend and a stub.  The servicer is just the implementation  of the actual servicer passed to the face player of the python RPC  implementation; the two are detached.  Non-zero delay puts a delay on each call to the servicer, representative of  communication latency. Timeout is the default timeout for the stub while  waiting for the service.  Args:    test_pb2: The test_pb2 module generated by this test.    delay: Delay in seconds per response from the servicer.  Yields:    A (servicer_methods, servicer, stub) three-tuple where servicer_methods is      the back-end of the service bound to the stub and the server and stub      are both activated and ready for use.  """  servicer_methods = _ServicerMethods(test_pb2, delay)  class Servicer(getattr(test_pb2, SERVICER_IDENTIFIER)):    def UnaryCall(self, request, context):      return servicer_methods.UnaryCall(request, context)    def StreamingOutputCall(self, request, context):      return servicer_methods.StreamingOutputCall(request, context)    def StreamingInputCall(self, request_iter, context):      return servicer_methods.StreamingInputCall(request_iter, context)    def FullDuplexCall(self, request_iter, context):      return servicer_methods.FullDuplexCall(request_iter, context)    def HalfDuplexCall(self, request_iter, context):      return servicer_methods.HalfDuplexCall(request_iter, context)  servicer = Servicer()  server = getattr(      test_pb2, SERVER_FACTORY_IDENTIFIER)(servicer, 0)  with server:    port = server.port()    stub = getattr(test_pb2, STUB_FACTORY_IDENTIFIER)('localhost', port)    with stub:      yield servicer_methods, stub, serverdef _streaming_input_request_iterator(test_pb2):  for _ in range(3):    request = test_pb2.StreamingInputCallRequest()    request.payload.payload_type = test_pb2.COMPRESSABLE    request.payload.payload_compressable = 'a'    yield requestdef _streaming_output_request(test_pb2):  request = test_pb2.StreamingOutputCallRequest()  sizes = [1, 2, 3]  request.response_parameters.add(size=sizes[0], interval_us=0)  request.response_parameters.add(size=sizes[1], interval_us=0)  request.response_parameters.add(size=sizes[2], interval_us=0)  return requestdef _full_duplex_request_iterator(test_pb2):  request = test_pb2.StreamingOutputCallRequest()  request.response_parameters.add(size=1, interval_us=0)  yield request  request = test_pb2.StreamingOutputCallRequest()  request.response_parameters.add(size=2, interval_us=0)  request.response_parameters.add(size=3, interval_us=0)  yield requestclass PythonPluginTest(unittest.TestCase):  """Test case for the gRPC Python protoc-plugin.  While reading these tests, remember that the futures API  (`stub.method.async()`) only gives futures for the *non-streaming* responses,  else it behaves like its blocking cousin.  """  def setUp(self):    protoc_command = '../../bins/%s/protobuf/protoc' % _build_mode    protoc_plugin_filename = '../../bins/%s/grpc_python_plugin' % _build_mode    test_proto_filename = './test.proto'    if not os.path.isfile(protoc_command):      # Assume that if we haven't built protoc that it's on the system.      protoc_command = 'protoc'    # Ensure that the output directory exists.    self.outdir = tempfile.mkdtemp()    # Invoke protoc with the plugin.    cmd = [        protoc_command,        '--plugin=protoc-gen-python-grpc=%s' % protoc_plugin_filename,        '-I %s' % os.path.dirname(test_proto_filename),        '--python_out=%s' % self.outdir,        '--python-grpc_out=%s' % self.outdir,        os.path.basename(test_proto_filename),    ]    subprocess.call(' '.join(cmd), shell=True)    sys.path.append(self.outdir)  def tearDown(self):    try:      shutil.rmtree(self.outdir)    except OSError as exc:      if exc.errno != errno.ENOENT:        raise  # TODO(atash): Figure out which of these tests is hanging flakily with small  # probability.  def testImportAttributes(self):    # check that we can access the generated module and its members.    import test_pb2  # pylint: disable=g-import-not-at-top    self.assertIsNotNone(getattr(test_pb2, SERVICER_IDENTIFIER, None))    self.assertIsNotNone(getattr(test_pb2, SERVER_IDENTIFIER, None))    self.assertIsNotNone(getattr(test_pb2, STUB_IDENTIFIER, None))    self.assertIsNotNone(getattr(test_pb2, SERVER_FACTORY_IDENTIFIER, None))    self.assertIsNotNone(getattr(test_pb2, STUB_FACTORY_IDENTIFIER, None))  def testUpDown(self):    import test_pb2    with _CreateService(        test_pb2, NO_DELAY) as (servicer, stub, unused_server):      request = test_pb2.SimpleRequest(response_size=13)  def testUnaryCall(self):    import test_pb2  # pylint: disable=g-import-not-at-top    with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):      timeout = 6  # TODO(issue 2039): LONG_TIMEOUT like the other methods.      request = test_pb2.SimpleRequest(response_size=13)      response = stub.UnaryCall(request, timeout)    expected_response = methods.UnaryCall(request, 'not a real RpcContext!')    self.assertEqual(expected_response, response)  def testUnaryCallAsync(self):    import test_pb2  # pylint: disable=g-import-not-at-top    request = test_pb2.SimpleRequest(response_size=13)    with _CreateService(test_pb2, NO_DELAY) as (        methods, stub, unused_server):      # Check that the call does not block waiting for the server to respond.      with methods.pause():        response_future = stub.UnaryCall.async(request, LONG_TIMEOUT)      response = response_future.result()    expected_response = methods.UnaryCall(request, 'not a real RpcContext!')    self.assertEqual(expected_response, response)  def testUnaryCallAsyncExpired(self):    import test_pb2  # pylint: disable=g-import-not-at-top    with _CreateService(test_pb2, NO_DELAY) as (        methods, stub, unused_server):      request = test_pb2.SimpleRequest(response_size=13)      with methods.pause():        response_future = stub.UnaryCall.async(request, SHORT_TIMEOUT)        with self.assertRaises(exceptions.ExpirationError):          response_future.result()  @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '                 'forever and fix.')  def testUnaryCallAsyncCancelled(self):    import test_pb2  # pylint: disable=g-import-not-at-top    request = test_pb2.SimpleRequest(response_size=13)    with _CreateService(test_pb2, NO_DELAY) as (        methods, stub, unused_server):      with methods.pause():        response_future = stub.UnaryCall.async(request, 1)        response_future.cancel()        self.assertTrue(response_future.cancelled())  def testUnaryCallAsyncFailed(self):    import test_pb2  # pylint: disable=g-import-not-at-top    request = test_pb2.SimpleRequest(response_size=13)    with _CreateService(test_pb2, NO_DELAY) as (        methods, stub, unused_server):      with methods.fail():        response_future = stub.UnaryCall.async(request, LONG_TIMEOUT)        self.assertIsNotNone(response_future.exception())  def testStreamingOutputCall(self):    import test_pb2  # pylint: disable=g-import-not-at-top    request = _streaming_output_request(test_pb2)    with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):      responses = stub.StreamingOutputCall(request, LONG_TIMEOUT)      expected_responses = methods.StreamingOutputCall(          request, 'not a real RpcContext!')      for expected_response, response in itertools.izip_longest(          expected_responses, responses):        self.assertEqual(expected_response, response)  @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '                 'forever and fix.')  def testStreamingOutputCallExpired(self):    import test_pb2  # pylint: disable=g-import-not-at-top    request = _streaming_output_request(test_pb2)    with _CreateService(test_pb2, NO_DELAY) as (        methods, stub, unused_server):      with methods.pause():        responses = stub.StreamingOutputCall(request, SHORT_TIMEOUT)        with self.assertRaises(exceptions.ExpirationError):          list(responses)  @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '                 'forever and fix.')  def testStreamingOutputCallCancelled(self):    import test_pb2  # pylint: disable=g-import-not-at-top    request = _streaming_output_request(test_pb2)    with _CreateService(test_pb2, NO_DELAY) as (        unused_methods, stub, unused_server):      responses = stub.StreamingOutputCall(request, SHORT_TIMEOUT)      next(responses)      responses.cancel()      with self.assertRaises(future.CancelledError):        next(responses)  @unittest.skip('TODO(atash,nathaniel): figure out why this times out '                 'instead of raising the proper error.')  def testStreamingOutputCallFailed(self):    import test_pb2  # pylint: disable=g-import-not-at-top    request = _streaming_output_request(test_pb2)    with _CreateService(test_pb2, NO_DELAY) as (        methods, stub, unused_server):      with methods.fail():        responses = stub.StreamingOutputCall(request, 1)        self.assertIsNotNone(responses)        with self.assertRaises(exceptions.ServicerError):          next(responses)  @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '                 'forever and fix.')  def testStreamingInputCall(self):    import test_pb2  # pylint: disable=g-import-not-at-top    with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):      response = stub.StreamingInputCall(          _streaming_input_request_iterator(test_pb2), LONG_TIMEOUT)    expected_response = methods.StreamingInputCall(        _streaming_input_request_iterator(test_pb2), 'not a real RpcContext!')    self.assertEqual(expected_response, response)  def testStreamingInputCallAsync(self):    import test_pb2  # pylint: disable=g-import-not-at-top    with _CreateService(test_pb2, NO_DELAY) as (        methods, stub, unused_server):      with methods.pause():        response_future = stub.StreamingInputCall.async(            _streaming_input_request_iterator(test_pb2), LONG_TIMEOUT)      response = response_future.result()    expected_response = methods.StreamingInputCall(        _streaming_input_request_iterator(test_pb2), 'not a real RpcContext!')    self.assertEqual(expected_response, response)  def testStreamingInputCallAsyncExpired(self):    import test_pb2  # pylint: disable=g-import-not-at-top    with _CreateService(test_pb2, NO_DELAY) as (        methods, stub, unused_server):      with methods.pause():        response_future = stub.StreamingInputCall.async(            _streaming_input_request_iterator(test_pb2), SHORT_TIMEOUT)        with self.assertRaises(exceptions.ExpirationError):          response_future.result()        self.assertIsInstance(            response_future.exception(), exceptions.ExpirationError)  def testStreamingInputCallAsyncCancelled(self):    import test_pb2  # pylint: disable=g-import-not-at-top    with _CreateService(test_pb2, NO_DELAY) as (        methods, stub, unused_server):      with methods.pause():        timeout = 6  # TODO(issue 2039): LONG_TIMEOUT like the other methods.        response_future = stub.StreamingInputCall.async(            _streaming_input_request_iterator(test_pb2), timeout)        response_future.cancel()        self.assertTrue(response_future.cancelled())      with self.assertRaises(future.CancelledError):        response_future.result()  def testStreamingInputCallAsyncFailed(self):    import test_pb2  # pylint: disable=g-import-not-at-top    with _CreateService(test_pb2, NO_DELAY) as (        methods, stub, unused_server):      with methods.fail():        response_future = stub.StreamingInputCall.async(            _streaming_input_request_iterator(test_pb2), SHORT_TIMEOUT)        self.assertIsNotNone(response_future.exception())  def testFullDuplexCall(self):    import test_pb2  # pylint: disable=g-import-not-at-top    with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):      responses = stub.FullDuplexCall(          _full_duplex_request_iterator(test_pb2), LONG_TIMEOUT)      expected_responses = methods.FullDuplexCall(          _full_duplex_request_iterator(test_pb2), 'not a real RpcContext!')      for expected_response, response in itertools.izip_longest(          expected_responses, responses):        self.assertEqual(expected_response, response)  @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '                 'forever and fix.')  def testFullDuplexCallExpired(self):    import test_pb2  # pylint: disable=g-import-not-at-top    request_iterator = _full_duplex_request_iterator(test_pb2)    with _CreateService(test_pb2, NO_DELAY) as (        methods, stub, unused_server):      with methods.pause():        responses = stub.FullDuplexCall(request_iterator, SHORT_TIMEOUT)        with self.assertRaises(exceptions.ExpirationError):          list(responses)  @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '                 'forever and fix.')  def testFullDuplexCallCancelled(self):    import test_pb2  # pylint: disable=g-import-not-at-top    with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):      request_iterator = _full_duplex_request_iterator(test_pb2)      responses = stub.FullDuplexCall(request_iterator, LONG_TIMEOUT)      next(responses)      responses.cancel()      with self.assertRaises(future.CancelledError):        next(responses)  @unittest.skip('TODO(atash,nathaniel): figure out why this hangs forever '                 'and fix.')  def testFullDuplexCallFailed(self):    import test_pb2  # pylint: disable=g-import-not-at-top    request_iterator = _full_duplex_request_iterator(test_pb2)    with _CreateService(test_pb2, NO_DELAY) as (        methods, stub, unused_server):      with methods.fail():        responses = stub.FullDuplexCall(request_iterator, LONG_TIMEOUT)        self.assertIsNotNone(responses)        with self.assertRaises(exceptions.ServicerError):          next(responses)  @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '                 'forever and fix.')  def testHalfDuplexCall(self):    import test_pb2  # pylint: disable=g-import-not-at-top    with _CreateService(test_pb2, NO_DELAY) as (        methods, stub, unused_server):      def half_duplex_request_iterator():        request = test_pb2.StreamingOutputCallRequest()        request.response_parameters.add(size=1, interval_us=0)        yield request        request = test_pb2.StreamingOutputCallRequest()        request.response_parameters.add(size=2, interval_us=0)        request.response_parameters.add(size=3, interval_us=0)        yield request      responses = stub.HalfDuplexCall(          half_duplex_request_iterator(), LONG_TIMEOUT)      expected_responses = methods.HalfDuplexCall(          half_duplex_request_iterator(), 'not a real RpcContext!')      for check in itertools.izip_longest(expected_responses, responses):        expected_response, response = check        self.assertEqual(expected_response, response)  def testHalfDuplexCallWedged(self):    import test_pb2  # pylint: disable=g-import-not-at-top    condition = threading.Condition()    wait_cell = [False]    @contextlib.contextmanager    def wait():  # pylint: disable=invalid-name      # Where's Python 3's 'nonlocal' statement when you need it?      with condition:        wait_cell[0] = True      yield      with condition:        wait_cell[0] = False        condition.notify_all()    def half_duplex_request_iterator():      request = test_pb2.StreamingOutputCallRequest()      request.response_parameters.add(size=1, interval_us=0)      yield request      with condition:        while wait_cell[0]:          condition.wait()    with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):      with wait():        responses = stub.HalfDuplexCall(            half_duplex_request_iterator(), SHORT_TIMEOUT)        # half-duplex waits for the client to send all info        with self.assertRaises(exceptions.ExpirationError):          next(responses)if __name__ == '__main__':  os.chdir(os.path.dirname(sys.argv[0]))  unittest.main(verbosity=2)
 |