ソースを参照

Merge pull request #24362 from yashykt/fdinterceptor

Experimental support and tests for CreateCustomInsecureChannelWithInterceptorsFromFd
Yash Tibrewal 5 年 前
コミット
1edc2e78cc

+ 2 - 2
include/grpcpp/create_channel_posix.h

@@ -57,8 +57,8 @@ namespace experimental {
 std::shared_ptr<grpc::Channel>
 CreateCustomInsecureChannelWithInterceptorsFromFd(
     const std::string& target, int fd, const grpc::ChannelArguments& args,
-    std::unique_ptr<std::vector<
-        std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>>>
+    std::vector<
+        std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>>
         interceptor_creators);
 
 }  // namespace experimental

+ 81 - 19
test/cpp/end2end/client_interceptors_end2end_test.cc

@@ -19,17 +19,21 @@
 #include <memory>
 #include <vector>
 
+#include "absl/memory/memory.h"
+
 #include <grpcpp/channel.h>
 #include <grpcpp/client_context.h>
 #include <grpcpp/create_channel.h>
+#include <grpcpp/create_channel_posix.h>
 #include <grpcpp/generic/generic_stub.h>
 #include <grpcpp/impl/codegen/proto_utils.h>
 #include <grpcpp/server.h>
 #include <grpcpp/server_builder.h>
 #include <grpcpp/server_context.h>
+#include <grpcpp/server_posix.h>
 #include <grpcpp/support/client_interceptor.h>
 
-#include "absl/memory/memory.h"
+#include "src/core/lib/iomgr/port.h"
 #include "src/proto/grpc/testing/echo.grpc.pb.h"
 #include "test/core/util/port.h"
 #include "test/core/util/test_config.h"
@@ -38,6 +42,11 @@
 #include "test/cpp/util/byte_buffer_proto_helper.h"
 #include "test/cpp/util/string_ref_helper.h"
 
+#ifdef GRPC_POSIX_SOCKET
+#include <fcntl.h>
+#include "src/core/lib/iomgr/socket_utils_posix.h"
+#endif /* GRPC_POSIX_SOCKET */
+
 #include <gtest/gtest.h>
 
 namespace grpc {
@@ -55,6 +64,11 @@ enum class RPCType {
   kAsyncCQBidiStreaming,
 };
 
+enum class ChannelType {
+  kHttpChannel,
+  kFdChannel,
+};
+
 /* Hijacks Echo RPC and fills in the expected values */
 class HijackingInterceptor : public experimental::Interceptor {
  public:
@@ -686,22 +700,35 @@ class LoggingInterceptorFactory
 
 class TestScenario {
  public:
-  explicit TestScenario(const RPCType& type) : type_(type) {}
+  explicit TestScenario(const ChannelType& channel_type,
+                        const RPCType& rpc_type)
+      : channel_type_(channel_type), rpc_type_(rpc_type) {}
 
-  RPCType type() const { return type_; }
+  ChannelType channel_type() const { return channel_type_; }
+
+  RPCType rpc_type() const { return rpc_type_; }
 
  private:
-  RPCType type_;
+  const ChannelType channel_type_;
+  const RPCType rpc_type_;
 };
 
 std::vector<TestScenario> CreateTestScenarios() {
   std::vector<TestScenario> scenarios;
-  scenarios.emplace_back(RPCType::kSyncUnary);
-  scenarios.emplace_back(RPCType::kSyncClientStreaming);
-  scenarios.emplace_back(RPCType::kSyncServerStreaming);
-  scenarios.emplace_back(RPCType::kSyncBidiStreaming);
-  scenarios.emplace_back(RPCType::kAsyncCQUnary);
-  scenarios.emplace_back(RPCType::kAsyncCQServerStreaming);
+  std::vector<RPCType> rpc_types;
+  rpc_types.emplace_back(RPCType::kSyncUnary);
+  rpc_types.emplace_back(RPCType::kSyncClientStreaming);
+  rpc_types.emplace_back(RPCType::kSyncServerStreaming);
+  rpc_types.emplace_back(RPCType::kSyncBidiStreaming);
+  rpc_types.emplace_back(RPCType::kAsyncCQUnary);
+  rpc_types.emplace_back(RPCType::kAsyncCQServerStreaming);
+  for (const auto& rpc_type : rpc_types) {
+    scenarios.emplace_back(ChannelType::kHttpChannel, rpc_type);
+// TODO(yashykt): Maybe add support for non-posix sockets too
+#ifdef GRPC_POSIX_SOCKET
+    scenarios.emplace_back(ChannelType::kFdChannel, rpc_type);
+#endif /* GRPC_POSIX_SOCKET */
+  }
   return scenarios;
 }
 
@@ -709,21 +736,56 @@ class ParameterizedClientInterceptorsEnd2endTest
     : public ::testing::TestWithParam<TestScenario> {
  protected:
   ParameterizedClientInterceptorsEnd2endTest() {
-    int port = grpc_pick_unused_port_or_die();
-
     ServerBuilder builder;
-    server_address_ = "localhost:" + std::to_string(port);
-    builder.AddListeningPort(server_address_, InsecureServerCredentials());
     builder.RegisterService(&service_);
-    server_ = builder.BuildAndStart();
+    if (GetParam().channel_type() == ChannelType::kHttpChannel) {
+      int port = grpc_pick_unused_port_or_die();
+      server_address_ = "localhost:" + std::to_string(port);
+      builder.AddListeningPort(server_address_, InsecureServerCredentials());
+      server_ = builder.BuildAndStart();
+    }
+#ifdef GRPC_POSIX_SOCKET
+    else if (GetParam().channel_type() == ChannelType::kFdChannel) {
+      int flags;
+      GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv_) == 0);
+      flags = fcntl(sv_[0], F_GETFL, 0);
+      GPR_ASSERT(fcntl(sv_[0], F_SETFL, flags | O_NONBLOCK) == 0);
+      flags = fcntl(sv_[1], F_GETFL, 0);
+      GPR_ASSERT(fcntl(sv_[1], F_SETFL, flags | O_NONBLOCK) == 0);
+      GPR_ASSERT(grpc_set_socket_no_sigpipe_if_possible(sv_[0]) ==
+                 GRPC_ERROR_NONE);
+      GPR_ASSERT(grpc_set_socket_no_sigpipe_if_possible(sv_[1]) ==
+                 GRPC_ERROR_NONE);
+      server_ = builder.BuildAndStart();
+      AddInsecureChannelFromFd(server_.get(), sv_[1]);
+    }
+#endif /* GRPC_POSIX_SOCKET */
   }
 
   ~ParameterizedClientInterceptorsEnd2endTest() override {
     server_->Shutdown();
   }
 
+  std::shared_ptr<grpc::Channel> CreateClientChannel(
+      std::vector<std::unique_ptr<
+          grpc::experimental::ClientInterceptorFactoryInterface>>
+          creators) {
+    if (GetParam().channel_type() == ChannelType::kHttpChannel) {
+      return experimental::CreateCustomChannelWithInterceptors(
+          server_address_, InsecureChannelCredentials(), ChannelArguments(),
+          std::move(creators));
+    }
+#ifdef GRPC_POSIX_SOCKET
+    else if (GetParam().channel_type() == ChannelType::kFdChannel) {
+      return experimental::CreateCustomInsecureChannelWithInterceptorsFromFd(
+          "", sv_[0], ChannelArguments(), std::move(creators));
+    }
+#endif /* GRPC_POSIX_SOCKET */
+    return nullptr;
+  }
+
   void SendRPC(const std::shared_ptr<Channel>& channel) {
-    switch (GetParam().type()) {
+    switch (GetParam().rpc_type()) {
       case RPCType::kSyncUnary:
         MakeCall(channel);
         break;
@@ -752,6 +814,7 @@ class ParameterizedClientInterceptorsEnd2endTest
   }
 
   std::string server_address_;
+  int sv_[2];
   EchoTestServiceStreamingImpl service_;
   std::unique_ptr<Server> server_;
 };
@@ -767,10 +830,9 @@ TEST_P(ParameterizedClientInterceptorsEnd2endTest,
   for (auto i = 0; i < 20; i++) {
     creators.push_back(absl::make_unique<DummyInterceptorFactory>());
   }
-  auto channel = experimental::CreateCustomChannelWithInterceptors(
-      server_address_, InsecureChannelCredentials(), args, std::move(creators));
+  auto channel = CreateClientChannel(std::move(creators));
   SendRPC(channel);
-  LoggingInterceptor::VerifyCall(GetParam().type());
+  LoggingInterceptor::VerifyCall(GetParam().rpc_type());
   // Make sure all 20 dummy interceptors were run
   EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20);
 }