# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Tests of the links interface of RPC Framework."""
# unittest is referenced from specification in this module.
import abc
import unittest # pylint: disable=unused-import
from grpc.framework.common import test_constants
from grpc.framework.interfaces.links import links
from grpc.framework.interfaces.links import test_utilities
def at_least_n_payloads_received_predicate(n):
  """Creates a predicate that is satisfied once n payloads have been seen.

  Args:
    n: The number of payload-bearing tickets required for the returned
      predicate to be satisfied.

  Returns:
    A function that takes a sequence of tickets and returns True if at least
      n of them have a payload attribute that is not None, and False
      otherwise.
  """
  def predicate(ticket_sequence):
    # "At least zero (or fewer) payloads" holds for every sequence, including
    # an empty one or one in which no ticket carries a payload.
    if n <= 0:
      return True
    payload_count = 0
    for ticket in ticket_sequence:
      if ticket.payload is not None:
        payload_count += 1
        # Stop at the first moment the threshold is met rather than scanning
        # the remainder of the sequence.
        if n <= payload_count:
          return True
    return False
  return predicate
def terminated(ticket_sequence):
  """Indicates whether the last ticket of a sequence carries a termination.

  Args:
    ticket_sequence: A sequence of tickets, possibly empty.

  Returns:
    True if the sequence is non-empty and the termination attribute of its
      last ticket is not None; False otherwise.
  """
  # bool() keeps the result a strict boolean; a bare "and" would hand the
  # empty sequence itself back to the caller when there are no tickets.
  return bool(ticket_sequence) and ticket_sequence[-1].termination is not None
# Group and method names placed on the invocation-side ticket built by
# TransmissionTest and returned by TransmissionTest.group_and_method.
_TRANSMISSION_GROUP = 'test.Group'
_TRANSMISSION_METHOD = 'TestMethod'
class TransmissionTest(object):
  """Tests ticket transmission between two connected links.

  This class must be mixed into a unittest.TestCase that implements the abstract
  methods it provides.
  """
  __metaclass__ = abc.ABCMeta

  # This is a unittest.TestCase mix-in.
  # pylint: disable=invalid-name

  @abc.abstractmethod
  def create_transmitting_links(self):
    """Creates two connected links for use in this test.

    Returns:
      Two links.Links, the first of which will be used on the invocation side
        of RPCs and the second of which will be used on the service side of
        RPCs.
    """
    raise NotImplementedError()

  @abc.abstractmethod
  def destroy_transmitting_links(self, invocation_side_link, service_side_link):
    """Destroys the two connected links created for this test.

    Args:
      invocation_side_link: The link used on the invocation side of RPCs in
        this test.
      service_side_link: The link used on the service side of RPCs in this
        test.
    """
    raise NotImplementedError()

  @abc.abstractmethod
  def create_invocation_initial_metadata(self):
    """Creates a value for use as invocation-side initial metadata.

    NOTE(review): this declaration is reconstructed from its call site in
    testSimplestRoundTrip; confirm the contract against implementations.

    Returns:
      A value placed in the initial-metadata position of the invocation-side
        ticket and later passed to assertMetadataTransmitted as the original
        metadata.
    """
    raise NotImplementedError()

  @abc.abstractmethod
  def create_invocation_terminal_metadata(self):
    """Creates a value for use as invocation-side terminal metadata.

    NOTE(review): this declaration is reconstructed from its call site in
    testSimplestRoundTrip; confirm the contract against implementations.

    Returns:
      A value placed in the terminal-metadata position of the invocation-side
        ticket and later passed to assertMetadataTransmitted as the original
        metadata.
    """
    raise NotImplementedError()

  @abc.abstractmethod
  def create_service_initial_metadata(self):
    """Creates a value for use as service-side initial metadata.

    NOTE(review): this declaration is reconstructed from its call site in
    testSimplestRoundTrip; confirm the contract against implementations.

    Returns:
      A value placed in the initial-metadata position of the service-side
        ticket and later passed to assertMetadataTransmitted as the original
        metadata.
    """
    raise NotImplementedError()

  @abc.abstractmethod
  def create_service_terminal_metadata(self):
    """Creates a value for use as service-side terminal metadata.

    NOTE(review): this declaration is reconstructed from its call site in
    testSimplestRoundTrip; confirm the contract against implementations.

    Returns:
      A value placed in the terminal-metadata position of the service-side
        ticket and later passed to assertMetadataTransmitted as the original
        metadata.
    """
    raise NotImplementedError()

  @abc.abstractmethod
  def create_invocation_completion(self):
    """Creates values for use as invocation-side code and message.

    Returns:
      An invocation-side code value and an invocation-side message value.
        Either or both may be None if invocation-side code and/or
        invocation-side message transmission is not supported by the links
        under test.
    """
    raise NotImplementedError()

  @abc.abstractmethod
  def create_service_completion(self):
    """Creates values for use as service-side code and message.

    Returns:
      A service-side code value and a service-side message value. Either or
        both may be None if service-side code and/or service-side message
        transmission is not supported by the links under test.
    """
    raise NotImplementedError()

  @abc.abstractmethod
  def assertMetadataTransmitted(self, original_metadata, transmitted_metadata):
    """Asserts that the transmitted metadata matches the original metadata.

    NOTE(review): this declaration is reconstructed from its call sites, which
    pass two positional arguments; the parameter names are assumed and what
    counts as a valid transmission is left to the implementation.

    Args:
      original_metadata: The metadata value that was handed to a link.
      transmitted_metadata: The metadata value observed on a received ticket.
    """
    raise NotImplementedError()

  # NOTE(review): not marked abstract because, like the serialization hooks
  # below, it supplies a default implementation - confirm.
  def group_and_method(self):
    """Returns the group and method used in this test case.

    Returns:
      A pair of the group and method used in this test case.
    """
    return _TRANSMISSION_GROUP, _TRANSMISSION_METHOD

  def serialize_request(self, request):
    """Serializes a request value used in this test case.

    Args:
      request: A request value created by this test case.

    Returns:
      A bytestring that is the serialization of the given request.
    """
    return request

  def deserialize_request(self, serialized_request):
    """Deserializes a request value used in this test case.

    Args:
      serialized_request: A bytestring that is the serialization of some request
        used in this test case.

    Returns:
      The request value encoded by the given bytestring.
    """
    return serialized_request

  def serialize_response(self, response):
    """Serializes a response value used in this test case.

    Args:
      response: A response value created by this test case.

    Returns:
      A bytestring that is the serialization of the given response.
    """
    return response

  def deserialize_response(self, serialized_response):
    """Deserializes a response value used in this test case.

    Args:
      serialized_response: A bytestring that is the serialization of some
        response used in this test case.

    Returns:
      The response value encoded by the given bytestring.
    """
    return serialized_response

  def _assert_is_valid_metadata_payload_sequence(
      self, ticket_sequence, payloads, initial_metadata, terminal_metadata):
    """Asserts the ordering and content of metadata and payloads in tickets.

    Args:
      ticket_sequence: The received tickets, in order of receipt.
      payloads: The sequence of payloads expected across all tickets.
      initial_metadata: The original initial metadata to compare against.
      terminal_metadata: The original terminal metadata to compare against.
    """
    initial_metadata_seen = False
    seen_payloads = []
    terminal_metadata_seen = False
    for ticket in ticket_sequence:
      if ticket.initial_metadata is not None:
        # Initial metadata may appear at most once and must precede every
        # payload and the terminal metadata.
        self.assertFalse(initial_metadata_seen)
        self.assertFalse(seen_payloads)
        self.assertFalse(terminal_metadata_seen)
        self.assertMetadataTransmitted(
            initial_metadata, ticket.initial_metadata)
        initial_metadata_seen = True
      if ticket.payload is not None:
        # No payload may arrive after the terminal metadata.
        self.assertFalse(terminal_metadata_seen)
        seen_payloads.append(ticket.payload)
      if ticket.terminal_metadata is not None:
        # Terminal metadata may appear at most once.
        self.assertFalse(terminal_metadata_seen)
        self.assertMetadataTransmitted(
            terminal_metadata, ticket.terminal_metadata)
        terminal_metadata_seen = True
    self.assertSequenceEqual(payloads, seen_payloads)

  def _assert_is_valid_invocation_sequence(
      self, ticket_sequence, group, method, payloads, initial_metadata,
      terminal_metadata, termination):
    """Asserts that tickets received on the service side form a valid RPC.

    Args:
      ticket_sequence: The tickets received by the service-side mate.
      group: The group expected on the first ticket.
      method: The method expected on the first ticket.
      payloads: The sequence of payloads expected across all tickets.
      initial_metadata: The original invocation-side initial metadata.
      terminal_metadata: The original invocation-side terminal metadata.
      termination: The termination expected (by identity) on the last ticket.
    """
    self.assertLess(0, len(ticket_sequence))
    self.assertEqual(group, ticket_sequence[0].group)
    self.assertEqual(method, ticket_sequence[0].method)
    self._assert_is_valid_metadata_payload_sequence(
        ticket_sequence, payloads, initial_metadata, terminal_metadata)
    self.assertIs(termination, ticket_sequence[-1].termination)

  def _assert_is_valid_service_sequence(
      self, ticket_sequence, payloads, initial_metadata, terminal_metadata,
      code, message, termination):
    """Asserts that tickets received on the invocation side form a valid RPC.

    Args:
      ticket_sequence: The tickets received by the invocation-side mate.
      payloads: The sequence of payloads expected across all tickets.
      initial_metadata: The original service-side initial metadata.
      terminal_metadata: The original service-side terminal metadata.
      code: The code expected on the last ticket.
      message: The message expected on the last ticket.
      termination: The termination expected (by identity) on the last ticket.
    """
    self.assertLess(0, len(ticket_sequence))
    self._assert_is_valid_metadata_payload_sequence(
        ticket_sequence, payloads, initial_metadata, terminal_metadata)
    self.assertEqual(code, ticket_sequence[-1].code)
    self.assertEqual(message, ticket_sequence[-1].message)
    self.assertIs(termination, ticket_sequence[-1].termination)

  def setUp(self):
    """Creates the links under test and joins each to a recording mate."""
    self._invocation_link, self._service_link = self.create_transmitting_links()
    self._invocation_mate = test_utilities.RecordingLink()
    self._service_mate = test_utilities.RecordingLink()
    self._invocation_link.join_link(self._invocation_mate)
    self._service_link.join_link(self._service_mate)

  def tearDown(self):
    """Destroys the links created in setUp."""
    self.destroy_transmitting_links(self._invocation_link, self._service_link)

  def testSimplestRoundTrip(self):
    """Tests transmission of one ticket in each direction."""
    invocation_operation_id = object()
    invocation_payload = b'\x07' * 1023
    timeout = test_constants.LONG_TIMEOUT
    invocation_initial_metadata = self.create_invocation_initial_metadata()
    invocation_terminal_metadata = self.create_invocation_terminal_metadata()
    invocation_code, invocation_message = self.create_invocation_completion()
    service_payload = b'\x08' * 1025
    service_initial_metadata = self.create_service_initial_metadata()
    service_terminal_metadata = self.create_service_terminal_metadata()
    service_code, service_message = self.create_service_completion()

    original_invocation_ticket = links.Ticket(
        invocation_operation_id, 0, _TRANSMISSION_GROUP, _TRANSMISSION_METHOD,
        links.Ticket.Subscription.FULL, timeout, 0, invocation_initial_metadata,
        invocation_payload, invocation_terminal_metadata, invocation_code,
        invocation_message, links.Ticket.Termination.COMPLETION)
    self._invocation_link.accept_ticket(original_invocation_ticket)

    # TODO(nathaniel): This shouldn't be necessary. Detecting the end of the
    # invocation-side ticket sequence shouldn't require granting allowance for
    # another payload.
    self._service_mate.block_until_tickets_satisfy(
        at_least_n_payloads_received_predicate(1))
    service_operation_id = self._service_mate.tickets()[0].operation_id
    self._service_link.accept_ticket(
        links.Ticket(
            service_operation_id, 0, None, None, links.Ticket.Subscription.FULL,
            None, 1, None, None, None, None, None, None))

    self._service_mate.block_until_tickets_satisfy(terminated)
    self._assert_is_valid_invocation_sequence(
        self._service_mate.tickets(), _TRANSMISSION_GROUP, _TRANSMISSION_METHOD,
        (invocation_payload,), invocation_initial_metadata,
        invocation_terminal_metadata, links.Ticket.Termination.COMPLETION)

    original_service_ticket = links.Ticket(
        service_operation_id, 1, None, None, links.Ticket.Subscription.FULL,
        timeout, 0, service_initial_metadata, service_payload,
        service_terminal_metadata, service_code, service_message,
        links.Ticket.Termination.COMPLETION)
    self._service_link.accept_ticket(original_service_ticket)
    self._invocation_mate.block_until_tickets_satisfy(terminated)
    self._assert_is_valid_service_sequence(
        self._invocation_mate.tickets(), (service_payload,),
        service_initial_metadata, service_terminal_metadata, service_code,
        service_message, links.Ticket.Termination.COMPLETION)