| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217 | /* * * Copyright 2018 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */#include <memory>#include <grpcpp/channel.h>#include <grpcpp/client_context.h>#include <grpcpp/create_channel.h>#include <grpcpp/server.h>#include <grpcpp/server_builder.h>#include <grpcpp/server_context.h>#include "absl/memory/memory.h"#include "src/core/lib/gpr/tls.h"#include "src/core/lib/iomgr/port.h"#include "src/proto/grpc/testing/echo.grpc.pb.h"#include "test/core/util/port.h"#include "test/core/util/test_config.h"#ifdef GRPC_POSIX_SOCKET#include "src/core/lib/iomgr/ev_posix.h"#endif  // GRPC_POSIX_SOCKET#include <gtest/gtest.h>#ifdef GRPC_POSIX_SOCKET// Thread-local variable to so that only polls from this test assert// non-blocking (not polls from resolver, timer thread, etc), and only when the// thread is waiting on polls caused by CompletionQueue::AsyncNext (not for// picking a port or other reasons).GPR_TLS_DECL(g_is_nonblocking_poll);namespace {int maybe_assert_non_blocking_poll(struct pollfd* pfds, nfds_t nfds,                                   int timeout) {  // Only assert that this poll should have zero timeout if we're in the  // middle of a zero-timeout CQ Next.  if (gpr_tls_get(&g_is_nonblocking_poll)) {    GPR_ASSERT(timeout == 0);  }  return poll(pfds, nfds, timeout);}}  // namespacenamespace grpc {namespace testing {namespace {void* tag(int i) { return reinterpret_cast<void*>(static_cast<intptr_t>(i)); }int detag(void* p) { return static_cast<int>(reinterpret_cast<intptr_t>(p)); }class NonblockingTest : public ::testing::Test { protected:  NonblockingTest() {}  void SetUp() override {    port_ = grpc_pick_unused_port_or_die();    server_address_ << "localhost:" << port_;    // Setup server    BuildAndStartServer();  }  bool LoopForTag(void** tag, bool* ok) {    // Temporarily set the thread-local nonblocking poll flag so that the polls    // caused by this loop are indeed sent by the library with zero timeout.    intptr_t orig_val = gpr_tls_get(&g_is_nonblocking_poll);    gpr_tls_set(&g_is_nonblocking_poll, static_cast<intptr_t>(true));    for (;;) {      auto r = cq_->AsyncNext(tag, ok, gpr_time_0(GPR_CLOCK_REALTIME));      if (r == CompletionQueue::SHUTDOWN) {        gpr_tls_set(&g_is_nonblocking_poll, orig_val);        return false;      } else if (r == CompletionQueue::GOT_EVENT) {        gpr_tls_set(&g_is_nonblocking_poll, orig_val);        return true;      }    }  }  void TearDown() override {    server_->Shutdown();    void* ignored_tag;    bool ignored_ok;    cq_->Shutdown();    while (LoopForTag(&ignored_tag, &ignored_ok)) {    }    stub_.reset();    grpc_recycle_unused_port(port_);  }  void BuildAndStartServer() {    ServerBuilder builder;    builder.AddListeningPort(server_address_.str(),                             grpc::InsecureServerCredentials());    service_ =        absl::make_unique<grpc::testing::EchoTestService::AsyncService>();    builder.RegisterService(service_.get());    cq_ = builder.AddCompletionQueue();    server_ = builder.BuildAndStart();  }  void ResetStub() {    std::shared_ptr<Channel> channel = grpc::CreateChannel(        server_address_.str(), grpc::InsecureChannelCredentials());    stub_ = grpc::testing::EchoTestService::NewStub(channel);  }  void SendRpc(int num_rpcs) {    for (int i = 0; i < num_rpcs; i++) {      EchoRequest send_request;      EchoRequest recv_request;      EchoResponse send_response;      EchoResponse recv_response;      Status recv_status;      ClientContext cli_ctx;      ServerContext srv_ctx;      grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);      send_request.set_message("hello non-blocking world");      std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(          stub_->PrepareAsyncEcho(&cli_ctx, send_request, cq_.get()));      response_reader->StartCall();      response_reader->Finish(&recv_response, &recv_status, tag(4));      service_->RequestEcho(&srv_ctx, &recv_request, &response_writer,                            cq_.get(), cq_.get(), tag(2));      void* got_tag;      bool ok;      EXPECT_TRUE(LoopForTag(&got_tag, &ok));      EXPECT_TRUE(ok);      EXPECT_EQ(detag(got_tag), 2);      EXPECT_EQ(send_request.message(), recv_request.message());      send_response.set_message(recv_request.message());      response_writer.Finish(send_response, Status::OK, tag(3));      int tagsum = 0;      int tagprod = 1;      EXPECT_TRUE(LoopForTag(&got_tag, &ok));      EXPECT_TRUE(ok);      tagsum += detag(got_tag);      tagprod *= detag(got_tag);      EXPECT_TRUE(LoopForTag(&got_tag, &ok));      EXPECT_TRUE(ok);      tagsum += detag(got_tag);      tagprod *= detag(got_tag);      EXPECT_EQ(tagsum, 7);      EXPECT_EQ(tagprod, 12);      EXPECT_EQ(send_response.message(), recv_response.message());      EXPECT_TRUE(recv_status.ok());    }  }  std::unique_ptr<ServerCompletionQueue> cq_;  std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;  std::unique_ptr<Server> server_;  std::unique_ptr<grpc::testing::EchoTestService::AsyncService> service_;  std::ostringstream server_address_;  int port_;};TEST_F(NonblockingTest, SimpleRpc) {  ResetStub();  SendRpc(10);}}  // namespace}  // namespace testing}  // namespace grpc#endif  // GRPC_POSIX_SOCKETint main(int argc, char** argv) {#ifdef GRPC_POSIX_SOCKET  // Override the poll function before anything else can happen  grpc_poll_function = maybe_assert_non_blocking_poll;  grpc::testing::TestEnvironment env(argc, argv);  ::testing::InitGoogleTest(&argc, argv);  gpr_tls_init(&g_is_nonblocking_poll);  // Start the nonblocking poll thread-local variable as false because the  // thread that issues RPCs starts by picking a port (which has non-zero  // timeout).  gpr_tls_set(&g_is_nonblocking_poll, static_cast<intptr_t>(false));  int ret = RUN_ALL_TESTS();  gpr_tls_destroy(&g_is_nonblocking_poll);  return ret;#else   // GRPC_POSIX_SOCKET  return 0;#endif  // GRPC_POSIX_SOCKET}
 |