Просмотр исходного кода

PR feedback: Move rpc base classes out of __init__

Sergii Tkachenko 4 лет назад
Родитель
Сommit
060eb1acbc

+ 1 - 85
tools/run_tests/xds_test_driver/framework/rpc/__init__.py

@@ -11,88 +11,4 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import logging
-import re
-from typing import Optional, ClassVar, Dict
-
-# Workaround: `grpc` must be imported before `google.protobuf.json_format`,
-# to prevent "Segmentation fault". Ref https://github.com/grpc/grpc/issues/24897
-import grpc
-from google.protobuf import json_format
-import google.protobuf.message
-
-logger = logging.getLogger(__name__)
-
-# Type aliases
-Message = google.protobuf.message.Message
-
-
-class GrpcClientHelper:
-    channel: grpc.Channel
-    DEFAULT_CONNECTION_TIMEOUT_SEC = 60
-    DEFAULT_WAIT_FOR_READY_SEC = 60
-
-    def __init__(self, channel: grpc.Channel, stub_class: ClassVar):
-        self.channel = channel
-        self.stub = stub_class(channel)
-        # This is purely cosmetic to make RPC logs look like method calls.
-        self.log_service_name = re.sub('Stub$', '',
-                                       self.stub.__class__.__name__)
-
-    def call_unary_with_deadline(
-            self,
-            *,
-            rpc: str,
-            req: Message,
-            wait_for_ready_sec: Optional[int] = DEFAULT_WAIT_FOR_READY_SEC,
-            connection_timeout_sec: Optional[
-                int] = DEFAULT_CONNECTION_TIMEOUT_SEC) -> Message:
-        if wait_for_ready_sec is None:
-            wait_for_ready_sec = self.DEFAULT_WAIT_FOR_READY_SEC
-        if connection_timeout_sec is None:
-            connection_timeout_sec = self.DEFAULT_CONNECTION_TIMEOUT_SEC
-
-        timeout_sec = wait_for_ready_sec + connection_timeout_sec
-        rpc_callable: grpc.UnaryUnaryMultiCallable = getattr(self.stub, rpc)
-
-        call_kwargs = dict(wait_for_ready=True, timeout=timeout_sec)
-        self._log_debug(rpc, req, call_kwargs)
-        return rpc_callable(req, **call_kwargs)
-
-    def _log_debug(self, rpc, req, call_kwargs):
-        logger.debug('RPC %s.%s(request=%s(%r), %s)', self.log_service_name, rpc,
-                     req.__class__.__name__, json_format.MessageToDict(req),
-                     ', '.join({f'{k}={v}' for k, v in call_kwargs.items()}))
-
-
-class GrpcApp:
-    channels: Dict[int, grpc.Channel]
-
-    class NotFound(Exception):
-        """Requested resource not found"""
-
-    def __init__(self, rpc_host):
-        self.rpc_host = rpc_host
-        # Cache gRPC channels per port
-        self.channels = dict()
-
-    def _make_channel(self, port) -> grpc.Channel:
-        if port not in self.channels:
-            target = f'{self.rpc_host}:{port}'
-            self.channels[port] = grpc.insecure_channel(target)
-        return self.channels[port]
-
-    def close(self):
-        # Close all channels
-        for channel in self.channels.values():
-            channel.close()
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        self.close()
-        return False
-
-    def __del__(self):
-        self.close()
+from framework.rpc import grpc

+ 99 - 0
tools/run_tests/xds_test_driver/framework/rpc/grpc.py

@@ -0,0 +1,99 @@
+# Copyright 2020 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+import re
+from typing import Optional, ClassVar, Dict
+
+# Workaround: `grpc` must be imported before `google.protobuf.json_format`,
+# to prevent "Segmentation fault". Ref https://github.com/grpc/grpc/issues/24897
+import grpc
+from google.protobuf import json_format
+import google.protobuf.message
+
+logger = logging.getLogger(__name__)
+
+# Type aliases
+Message = google.protobuf.message.Message
+
+
+class GrpcClientHelper:
+    channel: grpc.Channel
+    DEFAULT_CONNECTION_TIMEOUT_SEC = 60
+    DEFAULT_WAIT_FOR_READY_SEC = 60
+
+    def __init__(self, channel: grpc.Channel, stub_class: ClassVar):
+        self.channel = channel
+        self.stub = stub_class(channel)
+        # This is purely cosmetic to make RPC logs look like method calls.
+        self.log_service_name = re.sub('Stub$', '',
+                                       self.stub.__class__.__name__)
+
+    def call_unary_with_deadline(
+            self,
+            *,
+            rpc: str,
+            req: Message,
+            wait_for_ready_sec: Optional[int] = DEFAULT_WAIT_FOR_READY_SEC,
+            connection_timeout_sec: Optional[
+                int] = DEFAULT_CONNECTION_TIMEOUT_SEC) -> Message:
+        if wait_for_ready_sec is None:
+            wait_for_ready_sec = self.DEFAULT_WAIT_FOR_READY_SEC
+        if connection_timeout_sec is None:
+            connection_timeout_sec = self.DEFAULT_CONNECTION_TIMEOUT_SEC
+
+        timeout_sec = wait_for_ready_sec + connection_timeout_sec
+        rpc_callable: grpc.UnaryUnaryMultiCallable = getattr(self.stub, rpc)
+
+        call_kwargs = dict(wait_for_ready=True, timeout=timeout_sec)
+        self._log_debug(rpc, req, call_kwargs)
+        return rpc_callable(req, **call_kwargs)
+
+    def _log_debug(self, rpc, req, call_kwargs):
+        logger.debug('RPC %s.%s(request=%s(%r), %s)',
+                     self.log_service_name, rpc, req.__class__.__name__,
+                     json_format.MessageToDict(req),
+                     ', '.join({f'{k}={v}' for k, v in call_kwargs.items()}))
+
+
+class GrpcApp:
+    channels: Dict[int, grpc.Channel]
+
+    class NotFound(Exception):
+        """Requested resource not found"""
+
+    def __init__(self, rpc_host):
+        self.rpc_host = rpc_host
+        # Cache gRPC channels per port
+        self.channels = dict()
+
+    def _make_channel(self, port) -> grpc.Channel:
+        if port not in self.channels:
+            target = f'{self.rpc_host}:{port}'
+            self.channels[port] = grpc.insecure_channel(target)
+        return self.channels[port]
+
+    def close(self):
+        # Close all channels
+        for channel in self.channels.values():
+            channel.close()
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
+        return False
+
+    def __del__(self):
+        self.close()

+ 1 - 1
tools/run_tests/xds_test_driver/framework/rpc/grpc_channelz.py

@@ -53,7 +53,7 @@ _GetServerSocketsRequest = channelz_pb2.GetServerSocketsRequest
 _GetServerSocketsResponse = channelz_pb2.GetServerSocketsResponse
 
 
-class ChannelzServiceClient(framework.rpc.GrpcClientHelper):
+class ChannelzServiceClient(framework.rpc.grpc.GrpcClientHelper):
     stub: channelz_pb2_grpc.ChannelzStub
 
     def __init__(self, channel: grpc.Channel):

+ 6 - 7
tools/run_tests/xds_test_driver/framework/rpc/grpc_testing.py

@@ -14,7 +14,6 @@
 """
 This contains helpers for gRPC services defined in
 https://github.com/grpc/grpc/blob/master/src/proto/grpc/testing/test.proto
-https://github.com/grpc/grpc/blob/master/src/proto/grpc/testing/test.proto
 """
 from typing import Optional
 
@@ -29,7 +28,7 @@ _LoadBalancerStatsRequest = messages_pb2.LoadBalancerStatsRequest
 _LoadBalancerStatsResponse = messages_pb2.LoadBalancerStatsResponse
 
 
-class LoadBalancerStatsServiceClient(framework.rpc.GrpcClientHelper):
+class LoadBalancerStatsServiceClient(framework.rpc.grpc.GrpcClientHelper):
     stub: test_pb2_grpc.LoadBalancerStatsServiceStub
     STATS_PARTIAL_RESULTS_TIMEOUT_SEC = 1200
 
@@ -45,8 +44,8 @@ class LoadBalancerStatsServiceClient(framework.rpc.GrpcClientHelper):
         if timeout_sec is None:
             timeout_sec = self.STATS_PARTIAL_RESULTS_TIMEOUT_SEC
 
-        return self.call_unary_with_deadline(
-            rpc='GetClientStats',
-            wait_for_ready_sec=timeout_sec,
-            req=_LoadBalancerStatsRequest(num_rpcs=num_rpcs,
-                                          timeout_sec=timeout_sec))
+        return self.call_unary_with_deadline(rpc='GetClientStats',
+                                             wait_for_ready_sec=timeout_sec,
+                                             req=_LoadBalancerStatsRequest(
+                                                 num_rpcs=num_rpcs,
+                                                 timeout_sec=timeout_sec))

+ 11 - 1
tools/run_tests/xds_test_driver/framework/test_app/client_app.py

@@ -11,6 +11,12 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+"""
+xDS Test Client.
+
+TODO(sergiitk): separate XdsTestClient and KubernetesClientRunner to individual
+modules.
+"""
 import functools
 import logging
 from typing import Optional, Iterator
@@ -31,7 +37,11 @@ _ChannelConnectivityState = grpc_channelz.ChannelConnectivityState
 _LoadBalancerStatsServiceClient = grpc_testing.LoadBalancerStatsServiceClient
 
 
-class XdsTestClient(framework.rpc.GrpcApp):
+class XdsTestClient(framework.rpc.grpc.GrpcApp):
+    """
+    Represents RPC services implemented in Client component of the xds test app.
+    https://github.com/grpc/grpc/blob/master/doc/xds-test-descriptions.md#client
+    """
 
     def __init__(self,
                  *,

+ 11 - 1
tools/run_tests/xds_test_driver/framework/test_app/server_app.py

@@ -11,6 +11,12 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+"""
+xDS Test Server.
+
+TODO(sergiitk): separate XdsTestServer and KubernetesServerRunner to individual
+modules.
+"""
 import functools
 import logging
 from typing import Optional
@@ -26,7 +32,11 @@ logger = logging.getLogger(__name__)
 _ChannelzServiceClient = grpc_channelz.ChannelzServiceClient
 
 
-class XdsTestServer(framework.rpc.GrpcApp):
+class XdsTestServer(framework.rpc.grpc.GrpcApp):
+    """
+    Represents RPC services implemented in Server component of the xDS test app.
+    https://github.com/grpc/grpc/blob/master/doc/xds-test-descriptions.md#server
+    """
 
     def __init__(self,
                  *,