# Source code for grpc.framework.interfaces.links.test_cases

# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

"""Tests of the links interface of RPC Framework."""

# unittest is referenced from specification in this module.
import abc
import unittest  # pylint: disable=unused-import

from grpc.framework.common import test_constants
from grpc.framework.interfaces.links import links
from grpc.framework.interfaces.links import test_utilities


def at_least_n_payloads_received_predicate(n):
    """Creates a predicate satisfied once n payloads have been seen.

    Args:
      n: The number of payload-bearing tickets required for the returned
        predicate to be satisfied.

    Returns:
      A function that takes a sequence of tickets and returns True if at
        least n of them have a payload attribute that is not None, and
        False otherwise. If n is zero or negative the predicate is
        satisfied by every sequence, including an empty one.
    """
    def predicate(ticket_sequence):
        # "At least zero payloads" holds vacuously, even when no ticket in
        # the sequence carries a payload.
        if n <= 0:
            return True
        payload_count = 0
        for ticket in ticket_sequence:
            if ticket.payload is not None:
                payload_count += 1
                # Stop scanning as soon as the threshold is reached.
                if n <= payload_count:
                    return True
        return False
    return predicate
def terminated(ticket_sequence):
    """Reports whether the last ticket of a sequence carries a termination.

    Args:
      ticket_sequence: A sequence of tickets, possibly empty.

    Returns:
      True if the sequence is non-empty and the termination attribute of its
        last ticket is not None; False otherwise.
    """
    # bool() guarantees a real boolean for empty input; a bare
    # "ticket_sequence and ..." would hand back the empty sequence itself.
    return bool(ticket_sequence) and ticket_sequence[-1].termination is not None
# Group and method names carried by the tickets that the transmission tests
# in this module send.
_TRANSMISSION_GROUP = 'test.Group'
_TRANSMISSION_METHOD = 'TestMethod'
class TransmissionTest(object):
    """Tests ticket transmission between two connected links.

    This class must be mixed into a unittest.TestCase that implements the
    abstract methods it provides.
    """

    # Python 2 metaclass declaration. Python 3 treats this as an ordinary
    # class attribute, so abstract-method enforcement applies only under
    # Python 2.
    __metaclass__ = abc.ABCMeta

    # This is a unittest.TestCase mix-in.
    # pylint: disable=invalid-name

    @abc.abstractmethod
    def create_transmitting_links(self):
        """Creates two connected links for use in this test.

        Returns:
          A pair of links. setUp uses the first as the invocation-side link
            and the second as the service-side link.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def destroy_transmitting_links(
            self, invocation_side_link, service_side_link):
        """Destroys the two connected links created for this test.

        Args:
          invocation_side_link: The first of the two links returned by
            create_transmitting_links.
          service_side_link: The second of the two links returned by
            create_transmitting_links.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def create_invocation_initial_metadata(self):
        """Creates a value for use as invocation-side initial metadata.

        Returns:
          A metadata value appropriate for use as invocation-side initial
            metadata or None if invocation-side initial metadata
            transmission is not supported by the links under test.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def create_invocation_terminal_metadata(self):
        """Creates a value for use as invocation-side terminal metadata.

        Returns:
          A metadata value appropriate for use as invocation-side terminal
            metadata or None if invocation-side terminal metadata
            transmission is not supported by the links under test.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def create_service_initial_metadata(self):
        """Creates a value for use as service-side initial metadata.

        Returns:
          A metadata value appropriate for use as service-side initial
            metadata or None if service-side initial metadata transmission
            is not supported by the links under test.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def create_service_terminal_metadata(self):
        """Creates a value for use as service-side terminal metadata.

        Returns:
          A metadata value appropriate for use as service-side terminal
            metadata or None if service-side terminal metadata transmission
            is not supported by the links under test.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def create_invocation_completion(self):
        """Creates values for use as invocation-side code and message.

        Returns:
          An invocation-side code value and an invocation-side message
            value. Either or both may be None if invocation-side code and/or
            invocation-side message transmission is not supported by the
            links under test.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def create_service_completion(self):
        """Creates values for use as service-side code and message.

        Returns:
          A service-side code value and a service-side message value. Either
            or both may be None if service-side code and/or service-side
            message transmission is not supported by the links under test.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def assertMetadataTransmitted(
            self, original_metadata, transmitted_metadata):
        """Asserts that transmitted_metadata contains original_metadata.

        Args:
          original_metadata: A metadata object used in this test.
          transmitted_metadata: A metadata object obtained after
            transmission through the system under test.

        Raises:
          AssertionError: if the transmitted_metadata object does not
            contain original_metadata.
        """
        raise NotImplementedError()

    def group_and_method(self):
        """Returns the group and method used in this test case.

        Returns:
          A pair of the group and method used in this test case.
        """
        return _TRANSMISSION_GROUP, _TRANSMISSION_METHOD

    def serialize_request(self, request):
        """Serializes a request value used in this test case.

        Args:
          request: A request value created by this test case.

        Returns:
          A bytestring that is the serialization of the given request.
        """
        return request

    def deserialize_request(self, serialized_request):
        """Deserializes a request value used in this test case.

        Args:
          serialized_request: A bytestring that is the serialization of some
            request used in this test case.

        Returns:
          The request value encoded by the given bytestring.
        """
        return serialized_request

    def serialize_response(self, response):
        """Serializes a response value used in this test case.

        Args:
          response: A response value created by this test case.

        Returns:
          A bytestring that is the serialization of the given response.
        """
        return response

    def deserialize_response(self, serialized_response):
        """Deserializes a response value used in this test case.

        Args:
          serialized_response: A bytestring that is the serialization of
            some response used in this test case.

        Returns:
          The response value encoded by the given bytestring.
        """
        return serialized_response

    def _assert_is_valid_metadata_payload_sequence(
            self, ticket_sequence, payloads, initial_metadata,
            terminal_metadata):
        """Asserts ordering and content of metadata and payloads.

        Checks that initial metadata appears at most once and before any
        payload or terminal metadata, that no payload or further terminal
        metadata follows terminal metadata, and that the payloads seen equal
        the expected payloads in order.

        Args:
          ticket_sequence: The sequence of tickets to check.
          payloads: The expected payloads, in order.
          initial_metadata: The original initial metadata, compared via
            assertMetadataTransmitted.
          terminal_metadata: The original terminal metadata, compared via
            assertMetadataTransmitted.
        """
        initial_metadata_seen = False
        seen_payloads = []
        terminal_metadata_seen = False

        for ticket in ticket_sequence:
            if ticket.initial_metadata is not None:
                # Initial metadata must come first and only once.
                self.assertFalse(initial_metadata_seen)
                self.assertFalse(seen_payloads)
                self.assertFalse(terminal_metadata_seen)
                self.assertMetadataTransmitted(
                    initial_metadata, ticket.initial_metadata)
                initial_metadata_seen = True

            if ticket.payload is not None:
                # No payload may follow terminal metadata.
                self.assertFalse(terminal_metadata_seen)
                seen_payloads.append(ticket.payload)

            if ticket.terminal_metadata is not None:
                # Terminal metadata may appear only once.
                self.assertFalse(terminal_metadata_seen)
                self.assertMetadataTransmitted(
                    terminal_metadata, ticket.terminal_metadata)
                terminal_metadata_seen = True
        self.assertSequenceEqual(payloads, seen_payloads)

    def _assert_is_valid_invocation_sequence(
            self, ticket_sequence, group, method, payloads, initial_metadata,
            terminal_metadata, termination):
        """Asserts that ticket_sequence is a valid invocation-side sequence.

        The sequence must be non-empty, its first ticket must carry the
        given group and method, its metadata and payloads must pass
        _assert_is_valid_metadata_payload_sequence, and its last ticket must
        carry the given termination.
        """
        self.assertLess(0, len(ticket_sequence))
        self.assertEqual(group, ticket_sequence[0].group)
        self.assertEqual(method, ticket_sequence[0].method)
        self._assert_is_valid_metadata_payload_sequence(
            ticket_sequence, payloads, initial_metadata, terminal_metadata)
        self.assertIs(termination, ticket_sequence[-1].termination)

    def _assert_is_valid_service_sequence(
            self, ticket_sequence, payloads, initial_metadata,
            terminal_metadata, code, message, termination):
        """Asserts that ticket_sequence is a valid service-side sequence.

        The sequence must be non-empty, its metadata and payloads must pass
        _assert_is_valid_metadata_payload_sequence, and its last ticket must
        carry the given code, message and termination.
        """
        self.assertLess(0, len(ticket_sequence))
        self._assert_is_valid_metadata_payload_sequence(
            ticket_sequence, payloads, initial_metadata, terminal_metadata)
        self.assertEqual(code, ticket_sequence[-1].code)
        self.assertEqual(message, ticket_sequence[-1].message)
        self.assertIs(termination, ticket_sequence[-1].termination)

    def setUp(self):
        """Creates the links under test and joins each to a RecordingLink."""
        self._invocation_link, self._service_link = (
            self.create_transmitting_links())
        self._invocation_mate = test_utilities.RecordingLink()
        self._service_mate = test_utilities.RecordingLink()
        self._invocation_link.join_link(self._invocation_mate)
        self._service_link.join_link(self._service_mate)

    def tearDown(self):
        """Destroys the links created in setUp."""
        self.destroy_transmitting_links(
            self._invocation_link, self._service_link)

    def testSimplestRoundTrip(self):
        """Tests transmission of one ticket in each direction."""
        invocation_operation_id = object()
        invocation_payload = b'\x07' * 1023
        timeout = test_constants.LONG_TIMEOUT
        invocation_initial_metadata = (
            self.create_invocation_initial_metadata())
        invocation_terminal_metadata = (
            self.create_invocation_terminal_metadata())
        invocation_code, invocation_message = (
            self.create_invocation_completion())
        service_payload = b'\x08' * 1025
        service_initial_metadata = self.create_service_initial_metadata()
        service_terminal_metadata = self.create_service_terminal_metadata()
        service_code, service_message = self.create_service_completion()

        original_invocation_ticket = links.Ticket(
            invocation_operation_id, 0, _TRANSMISSION_GROUP,
            _TRANSMISSION_METHOD, links.Ticket.Subscription.FULL, timeout, 0,
            invocation_initial_metadata, invocation_payload,
            invocation_terminal_metadata, invocation_code,
            invocation_message, links.Ticket.Termination.COMPLETION)
        self._invocation_link.accept_ticket(original_invocation_ticket)

        # TODO(nathaniel): This shouldn't be necessary. Detecting the end of
        # the invocation-side ticket sequence shouldn't require granting
        # allowance for another payload.
        self._service_mate.block_until_tickets_satisfy(
            at_least_n_payloads_received_predicate(1))
        service_operation_id = self._service_mate.tickets()[0].operation_id
        self._service_link.accept_ticket(
            links.Ticket(
                service_operation_id, 0, None, None,
                links.Ticket.Subscription.FULL, None, 1, None, None, None,
                None, None, None))
        self._service_mate.block_until_tickets_satisfy(terminated)
        self._assert_is_valid_invocation_sequence(
            self._service_mate.tickets(), _TRANSMISSION_GROUP,
            _TRANSMISSION_METHOD, (invocation_payload,),
            invocation_initial_metadata, invocation_terminal_metadata,
            links.Ticket.Termination.COMPLETION)

        original_service_ticket = links.Ticket(
            service_operation_id, 1, None, None,
            links.Ticket.Subscription.FULL, timeout, 0,
            service_initial_metadata, service_payload,
            service_terminal_metadata, service_code, service_message,
            links.Ticket.Termination.COMPLETION)
        self._service_link.accept_ticket(original_service_ticket)
        self._invocation_mate.block_until_tickets_satisfy(terminated)
        self._assert_is_valid_service_sequence(
            self._invocation_mate.tickets(), (service_payload,),
            service_initial_metadata, service_terminal_metadata,
            service_code, service_message,
            links.Ticket.Termination.COMPLETION)