/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
* */#include <forward_list>#include <functional>#include <list>#include <memory>#include <mutex>#include <sstream>#include <string>#include <thread>#include <utility>#include <vector>#include <grpc/grpc.h>#include <grpc/support/cpu.h>#include <grpc/support/log.h>#include <grpcpp/alarm.h>#include <grpcpp/channel.h>#include <grpcpp/client_context.h>#include <grpcpp/generic/generic_stub.h>#include "src/core/lib/surface/completion_queue.h"#include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"#include "test/cpp/qps/client.h"#include "test/cpp/qps/usage_timer.h"#include "test/cpp/util/create_test_channel.h"namespace grpc {namespace testing {class ClientRpcContext { public:  ClientRpcContext() {}  virtual ~ClientRpcContext() {}  // next state, return false if done. Collect stats when appropriate  virtual bool RunNextState(bool, HistogramEntry* entry) = 0;  virtual void StartNewClone(CompletionQueue* cq) = 0;  static void* tag(ClientRpcContext* c) { return static_cast<void*>(c); }  static ClientRpcContext* detag(void* t) {    return static_cast<ClientRpcContext*>(t);  }  virtual void Start(CompletionQueue* cq, const ClientConfig& config) = 0;  virtual void TryCancel() = 0;};template <class RequestType, class ResponseType>class ClientRpcContextUnaryImpl : public ClientRpcContext { public:  ClientRpcContextUnaryImpl(      BenchmarkService::Stub* stub, const RequestType& req,      std::function<gpr_timespec()> next_issue,      std::function<          std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(              BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,              CompletionQueue*)>          prepare_req,      std::function<void(grpc::Status, ResponseType*, HistogramEntry*)> on_done)      : context_(),        stub_(stub),        cq_(nullptr),        req_(req),        response_(),        next_state_(State::READY),        callback_(on_done),        next_issue_(std::move(next_issue)),        prepare_req_(prepare_req) {}  
~ClientRpcContextUnaryImpl() override {}  void Start(CompletionQueue* cq, const ClientConfig& config) override {    GPR_ASSERT(!config.use_coalesce_api());  // not supported.    StartInternal(cq);  }  bool RunNextState(bool /*ok*/, HistogramEntry* entry) override {    switch (next_state_) {      case State::READY:        start_ = UsageTimer::Now();        response_reader_ = prepare_req_(stub_, &context_, req_, cq_);        response_reader_->StartCall();        next_state_ = State::RESP_DONE;        response_reader_->Finish(&response_, &status_,                                 ClientRpcContext::tag(this));        return true;      case State::RESP_DONE:        if (status_.ok()) {          entry->set_value((UsageTimer::Now() - start_) * 1e9);        }        callback_(status_, &response_, entry);        next_state_ = State::INVALID;        return false;      default:        GPR_ASSERT(false);        return false;    }  }  void StartNewClone(CompletionQueue* cq) override {    auto* clone = new ClientRpcContextUnaryImpl(stub_, req_, next_issue_,                                                prepare_req_, callback_);    clone->StartInternal(cq);  }  void TryCancel() override { context_.TryCancel(); } private:  grpc::ClientContext context_;  BenchmarkService::Stub* stub_;  CompletionQueue* cq_;  std::unique_ptr<Alarm> alarm_;  const RequestType& req_;  ResponseType response_;  enum State { INVALID, READY, RESP_DONE };  State next_state_;  std::function<void(grpc::Status, ResponseType*, HistogramEntry*)> callback_;  std::function<gpr_timespec()> next_issue_;  std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(      BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,      CompletionQueue*)>      prepare_req_;  grpc::Status status_;  double start_;  std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>      response_reader_;  void StartInternal(CompletionQueue* cq) {    cq_ = cq;    if (!next_issue_) {  // ready to issue    
  RunNextState(true, nullptr);    } else {  // wait for the issue time      alarm_.reset(new Alarm);      alarm_->Set(cq_, next_issue_(), ClientRpcContext::tag(this));    }  }};template <class StubType, class RequestType>class AsyncClient : public ClientImpl<StubType, RequestType> {  // Specify which protected members we are using since there is no  // member name resolution until the template types are fully resolved public:  using Client::NextIssuer;  using Client::SetupLoadTest;  using Client::closed_loop_;  using ClientImpl<StubType, RequestType>::cores_;  using ClientImpl<StubType, RequestType>::channels_;  using ClientImpl<StubType, RequestType>::request_;  AsyncClient(const ClientConfig& config,              std::function<ClientRpcContext*(                  StubType*, std::function<gpr_timespec()> next_issue,                  const RequestType&)>                  setup_ctx,              std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>                  create_stub)      : ClientImpl<StubType, RequestType>(config, create_stub),        num_async_threads_(NumThreads(config)) {    SetupLoadTest(config, num_async_threads_);    int tpc = std::max(1, config.threads_per_cq());      // 1 if unspecified    int num_cqs = (num_async_threads_ + tpc - 1) / tpc;  // ceiling operator    for (int i = 0; i < num_cqs; i++) {      cli_cqs_.emplace_back(new CompletionQueue);    }    for (int i = 0; i < num_async_threads_; i++) {      cq_.emplace_back(i % cli_cqs_.size());      next_issuers_.emplace_back(NextIssuer(i));      shutdown_state_.emplace_back(new PerThreadShutdownState());    }    int t = 0;    for (int ch = 0; ch < config.client_channels(); ch++) {      for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {        auto* cq = cli_cqs_[t].get();        auto ctx =            setup_ctx(channels_[ch].get_stub(), next_issuers_[t], request_);        ctx->Start(cq, config);      }      t = (t + 1) % cli_cqs_.size();    }  }  virtual ~AsyncClient() 
{    for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {      void* got_tag;      bool ok;      while ((*cq)->Next(&got_tag, &ok)) {        delete ClientRpcContext::detag(got_tag);      }    }  }  int GetPollCount() override {    int count = 0;    for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {      count += grpc_get_cq_poll_num((*cq)->cq());    }    return count;  } protected:  const int num_async_threads_; private:  struct PerThreadShutdownState {    mutable std::mutex mutex;    bool shutdown;    PerThreadShutdownState() : shutdown(false) {}  };  int NumThreads(const ClientConfig& config) {    int num_threads = config.async_client_threads();    if (num_threads <= 0) {  // Use dynamic sizing      num_threads = cores_;      gpr_log(GPR_INFO, "Sizing async client to %d threads", num_threads);    }    return num_threads;  }  void DestroyMultithreading() override final {    for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {      std::lock_guard<std::mutex> lock((*ss)->mutex);      (*ss)->shutdown = true;    }    for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {      (*cq)->Shutdown();    }    this->EndThreads();  // this needed for resolution  }  ClientRpcContext* ProcessTag(size_t thread_idx, void* tag) {    ClientRpcContext* ctx = ClientRpcContext::detag(tag);    if (shutdown_state_[thread_idx]->shutdown) {      ctx->TryCancel();      delete ctx;      bool ok;      while (cli_cqs_[cq_[thread_idx]]->Next(&tag, &ok)) {        ctx = ClientRpcContext::detag(tag);        ctx->TryCancel();        delete ctx;      }      return nullptr;    }    return ctx;  }  void ThreadFunc(size_t thread_idx, Client::Thread* t) override final {    void* got_tag;    bool ok;    HistogramEntry entry;    HistogramEntry* entry_ptr = &entry;    if (!cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {      return;    }    std::mutex* shutdown_mu = &shutdown_state_[thread_idx]->mutex;    shutdown_mu->lock();    ClientRpcContext* ctx 
= ProcessTag(thread_idx, got_tag);    if (ctx == nullptr) {      shutdown_mu->unlock();      return;    }    while (cli_cqs_[cq_[thread_idx]]->DoThenAsyncNext(        [&, ctx, ok, entry_ptr, shutdown_mu]() {          if (!ctx->RunNextState(ok, entry_ptr)) {            // The RPC and callback are done, so clone the ctx            // and kickstart the new one            ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get());            delete ctx;          }          shutdown_mu->unlock();        },        &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME))) {      t->UpdateHistogram(entry_ptr);      entry = HistogramEntry();      shutdown_mu->lock();      ctx = ProcessTag(thread_idx, got_tag);      if (ctx == nullptr) {        shutdown_mu->unlock();        return;      }    }  }  std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;  std::vector<int> cq_;  std::vector<std::function<gpr_timespec()>> next_issuers_;  std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;};static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator(    const std::shared_ptr<Channel>& ch) {  return BenchmarkService::NewStub(ch);}class AsyncUnaryClient final    : public AsyncClient<BenchmarkService::Stub, SimpleRequest> { public:  explicit AsyncUnaryClient(const ClientConfig& config)      : AsyncClient<BenchmarkService::Stub, SimpleRequest>(            config, SetupCtx, BenchmarkStubCreator) {    StartThreads(num_async_threads_);  }  ~AsyncUnaryClient() override {} private:  static void CheckDone(const grpc::Status& s, SimpleResponse* /*response*/,                        HistogramEntry* entry) {    entry->set_status(s.error_code());  }  static std::unique_ptr<grpc::ClientAsyncResponseReader<SimpleResponse>>  PrepareReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx,             const SimpleRequest& request, CompletionQueue* cq) {    return stub->PrepareAsyncUnaryCall(ctx, request, cq);  };  static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,        
                            std::function<gpr_timespec()> next_issue,                                    const SimpleRequest& req) {    return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(        stub, req, std::move(next_issue), AsyncUnaryClient::PrepareReq,        AsyncUnaryClient::CheckDone);  }};template <class RequestType, class ResponseType>class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext { public:  ClientRpcContextStreamingPingPongImpl(      BenchmarkService::Stub* stub, const RequestType& req,      std::function<gpr_timespec()> next_issue,      std::function<std::unique_ptr<          grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(          BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*)>          prepare_req,      std::function<void(grpc::Status, ResponseType*)> on_done)      : context_(),        stub_(stub),        cq_(nullptr),        req_(req),        response_(),        next_state_(State::INVALID),        callback_(on_done),        next_issue_(std::move(next_issue)),        prepare_req_(prepare_req),        coalesce_(false) {}  ~ClientRpcContextStreamingPingPongImpl() override {}  void Start(CompletionQueue* cq, const ClientConfig& config) override {    StartInternal(cq, config.messages_per_stream(), config.use_coalesce_api());  }  bool RunNextState(bool ok, HistogramEntry* entry) override {    while (true) {      switch (next_state_) {        case State::STREAM_IDLE:          if (!next_issue_) {  // ready to issue            next_state_ = State::READY_TO_WRITE;          } else {            next_state_ = State::WAIT;          }          break;  // loop around, don't return        case State::WAIT:          next_state_ = State::READY_TO_WRITE;          alarm_.reset(new Alarm);          alarm_->Set(cq_, next_issue_(), ClientRpcContext::tag(this));          return true;        case State::READY_TO_WRITE:          if (!ok) {            return false;          }          start_ = 
UsageTimer::Now();          next_state_ = State::WRITE_DONE;          if (coalesce_ && messages_issued_ == messages_per_stream_ - 1) {            stream_->WriteLast(req_, WriteOptions(),                               ClientRpcContext::tag(this));          } else {            stream_->Write(req_, ClientRpcContext::tag(this));          }          return true;        case State::WRITE_DONE:          if (!ok) {            return false;          }          next_state_ = State::READ_DONE;          stream_->Read(&response_, ClientRpcContext::tag(this));          return true;          break;        case State::READ_DONE:          entry->set_value((UsageTimer::Now() - start_) * 1e9);          callback_(status_, &response_);          if ((messages_per_stream_ != 0) &&              (++messages_issued_ >= messages_per_stream_)) {            next_state_ = State::WRITES_DONE_DONE;            if (coalesce_) {              // WritesDone should have been called on the last Write.              // loop around to call Finish.              
break;            }            stream_->WritesDone(ClientRpcContext::tag(this));            return true;          }          next_state_ = State::STREAM_IDLE;          break;  // loop around        case State::WRITES_DONE_DONE:          next_state_ = State::FINISH_DONE;          stream_->Finish(&status_, ClientRpcContext::tag(this));          return true;        case State::FINISH_DONE:          next_state_ = State::INVALID;          return false;          break;        default:          GPR_ASSERT(false);          return false;      }    }  }  void StartNewClone(CompletionQueue* cq) override {    auto* clone = new ClientRpcContextStreamingPingPongImpl(        stub_, req_, next_issue_, prepare_req_, callback_);    clone->StartInternal(cq, messages_per_stream_, coalesce_);  }  void TryCancel() override { context_.TryCancel(); } private:  grpc::ClientContext context_;  BenchmarkService::Stub* stub_;  CompletionQueue* cq_;  std::unique_ptr<Alarm> alarm_;  const RequestType& req_;  ResponseType response_;  enum State {    INVALID,    STREAM_IDLE,    WAIT,    READY_TO_WRITE,    WRITE_DONE,    READ_DONE,    WRITES_DONE_DONE,    FINISH_DONE  };  State next_state_;  std::function<void(grpc::Status, ResponseType*)> callback_;  std::function<gpr_timespec()> next_issue_;  std::function<      std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(          BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*)>      prepare_req_;  grpc::Status status_;  double start_;  std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>      stream_;  // Allow a limit on number of messages in a stream  int messages_per_stream_;  int messages_issued_;  // Whether to use coalescing API.  
bool coalesce_;  void StartInternal(CompletionQueue* cq, int messages_per_stream,                     bool coalesce) {    cq_ = cq;    messages_per_stream_ = messages_per_stream;    messages_issued_ = 0;    coalesce_ = coalesce;    if (coalesce_) {      GPR_ASSERT(messages_per_stream_ != 0);      context_.set_initial_metadata_corked(true);    }    stream_ = prepare_req_(stub_, &context_, cq);    next_state_ = State::STREAM_IDLE;    stream_->StartCall(ClientRpcContext::tag(this));    if (coalesce_) {      // When the initial metadata is corked, the tag will not come back and we      // need to manually drive the state machine.      RunNextState(true, nullptr);    }  }};class AsyncStreamingPingPongClient final    : public AsyncClient<BenchmarkService::Stub, SimpleRequest> { public:  explicit AsyncStreamingPingPongClient(const ClientConfig& config)      : AsyncClient<BenchmarkService::Stub, SimpleRequest>(            config, SetupCtx, BenchmarkStubCreator) {    StartThreads(num_async_threads_);  }  ~AsyncStreamingPingPongClient() override {} private:  static void CheckDone(const grpc::Status& /*s*/,                        SimpleResponse* /*response*/) {}  static std::unique_ptr<      grpc::ClientAsyncReaderWriter<SimpleRequest, SimpleResponse>>  PrepareReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx,             CompletionQueue* cq) {    auto stream = stub->PrepareAsyncStreamingCall(ctx, cq);    return stream;  };  static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,                                    std::function<gpr_timespec()> next_issue,                                    const SimpleRequest& req) {    return new ClientRpcContextStreamingPingPongImpl<SimpleRequest,                                                     SimpleResponse>(        stub, req, std::move(next_issue),        AsyncStreamingPingPongClient::PrepareReq,        AsyncStreamingPingPongClient::CheckDone);  }};template <class RequestType, class ResponseType>class 
ClientRpcContextStreamingFromClientImpl : public ClientRpcContext { public:  ClientRpcContextStreamingFromClientImpl(      BenchmarkService::Stub* stub, const RequestType& req,      std::function<gpr_timespec()> next_issue,      std::function<std::unique_ptr<grpc::ClientAsyncWriter<RequestType>>(          BenchmarkService::Stub*, grpc::ClientContext*, ResponseType*,          CompletionQueue*)>          prepare_req,      std::function<void(grpc::Status, ResponseType*)> on_done)      : context_(),        stub_(stub),        cq_(nullptr),        req_(req),        response_(),        next_state_(State::INVALID),        callback_(on_done),        next_issue_(std::move(next_issue)),        prepare_req_(prepare_req) {}  ~ClientRpcContextStreamingFromClientImpl() override {}  void Start(CompletionQueue* cq, const ClientConfig& config) override {    GPR_ASSERT(!config.use_coalesce_api());  // not supported yet.    StartInternal(cq);  }  bool RunNextState(bool ok, HistogramEntry* entry) override {    while (true) {      switch (next_state_) {        case State::STREAM_IDLE:          if (!next_issue_) {  // ready to issue            next_state_ = State::READY_TO_WRITE;          } else {            next_state_ = State::WAIT;          }          break;  // loop around, don't return        case State::WAIT:          alarm_.reset(new Alarm);          alarm_->Set(cq_, next_issue_(), ClientRpcContext::tag(this));          next_state_ = State::READY_TO_WRITE;          return true;        case State::READY_TO_WRITE:          if (!ok) {            return false;          }          start_ = UsageTimer::Now();          next_state_ = State::WRITE_DONE;          stream_->Write(req_, ClientRpcContext::tag(this));          return true;        case State::WRITE_DONE:          if (!ok) {            return false;          }          entry->set_value((UsageTimer::Now() - start_) * 1e9);          next_state_ = State::STREAM_IDLE;          break;  // loop around        default:          
GPR_ASSERT(false);          return false;      }    }  }  void StartNewClone(CompletionQueue* cq) override {    auto* clone = new ClientRpcContextStreamingFromClientImpl(        stub_, req_, next_issue_, prepare_req_, callback_);    clone->StartInternal(cq);  }  void TryCancel() override { context_.TryCancel(); } private:  grpc::ClientContext context_;  BenchmarkService::Stub* stub_;  CompletionQueue* cq_;  std::unique_ptr<Alarm> alarm_;  const RequestType& req_;  ResponseType response_;  enum State {    INVALID,    STREAM_IDLE,    WAIT,    READY_TO_WRITE,    WRITE_DONE,  };  State next_state_;  std::function<void(grpc::Status, ResponseType*)> callback_;  std::function<gpr_timespec()> next_issue_;  std::function<std::unique_ptr<grpc::ClientAsyncWriter<RequestType>>(      BenchmarkService::Stub*, grpc::ClientContext*, ResponseType*,      CompletionQueue*)>      prepare_req_;  grpc::Status status_;  double start_;  std::unique_ptr<grpc::ClientAsyncWriter<RequestType>> stream_;  void StartInternal(CompletionQueue* cq) {    cq_ = cq;    stream_ = prepare_req_(stub_, &context_, &response_, cq);    next_state_ = State::STREAM_IDLE;    stream_->StartCall(ClientRpcContext::tag(this));  }};class AsyncStreamingFromClientClient final    : public AsyncClient<BenchmarkService::Stub, SimpleRequest> { public:  explicit AsyncStreamingFromClientClient(const ClientConfig& config)      : AsyncClient<BenchmarkService::Stub, SimpleRequest>(            config, SetupCtx, BenchmarkStubCreator) {    StartThreads(num_async_threads_);  }  ~AsyncStreamingFromClientClient() override {} private:  static void CheckDone(const grpc::Status& /*s*/,                        SimpleResponse* /*response*/) {}  static std::unique_ptr<grpc::ClientAsyncWriter<SimpleRequest>> PrepareReq(      BenchmarkService::Stub* stub, grpc::ClientContext* ctx,      SimpleResponse* resp, CompletionQueue* cq) {    auto stream = stub->PrepareAsyncStreamingFromClient(ctx, resp, cq);    return stream;  };  static 
ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,                                    std::function<gpr_timespec()> next_issue,                                    const SimpleRequest& req) {    return new ClientRpcContextStreamingFromClientImpl<SimpleRequest,                                                       SimpleResponse>(        stub, req, std::move(next_issue),        AsyncStreamingFromClientClient::PrepareReq,        AsyncStreamingFromClientClient::CheckDone);  }};template <class RequestType, class ResponseType>class ClientRpcContextStreamingFromServerImpl : public ClientRpcContext { public:  ClientRpcContextStreamingFromServerImpl(      BenchmarkService::Stub* stub, const RequestType& req,      std::function<gpr_timespec()> next_issue,      std::function<std::unique_ptr<grpc::ClientAsyncReader<ResponseType>>(          BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,          CompletionQueue*)>          prepare_req,      std::function<void(grpc::Status, ResponseType*)> on_done)      : context_(),        stub_(stub),        cq_(nullptr),        req_(req),        response_(),        next_state_(State::INVALID),        callback_(on_done),        next_issue_(std::move(next_issue)),        prepare_req_(prepare_req) {}  ~ClientRpcContextStreamingFromServerImpl() override {}  void Start(CompletionQueue* cq, const ClientConfig& config) override {    GPR_ASSERT(!config.use_coalesce_api());  // not supported    StartInternal(cq);  }  bool RunNextState(bool ok, HistogramEntry* entry) override {    while (true) {      switch (next_state_) {        case State::STREAM_IDLE:          if (!ok) {            return false;          }          start_ = UsageTimer::Now();          next_state_ = State::READ_DONE;          stream_->Read(&response_, ClientRpcContext::tag(this));          return true;        case State::READ_DONE:          if (!ok) {            return false;          }          entry->set_value((UsageTimer::Now() - start_) * 1e9);          
callback_(status_, &response_);          next_state_ = State::STREAM_IDLE;          break;  // loop around        default:          GPR_ASSERT(false);          return false;      }    }  }  void StartNewClone(CompletionQueue* cq) override {    auto* clone = new ClientRpcContextStreamingFromServerImpl(        stub_, req_, next_issue_, prepare_req_, callback_);    clone->StartInternal(cq);  }  void TryCancel() override { context_.TryCancel(); } private:  grpc::ClientContext context_;  BenchmarkService::Stub* stub_;  CompletionQueue* cq_;  std::unique_ptr<Alarm> alarm_;  const RequestType& req_;  ResponseType response_;  enum State { INVALID, STREAM_IDLE, READ_DONE };  State next_state_;  std::function<void(grpc::Status, ResponseType*)> callback_;  std::function<gpr_timespec()> next_issue_;  std::function<std::unique_ptr<grpc::ClientAsyncReader<ResponseType>>(      BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,      CompletionQueue*)>      prepare_req_;  grpc::Status status_;  double start_;  std::unique_ptr<grpc::ClientAsyncReader<ResponseType>> stream_;  void StartInternal(CompletionQueue* cq) {    // TODO(vjpai): Add support to rate-pace this    cq_ = cq;    stream_ = prepare_req_(stub_, &context_, req_, cq);    next_state_ = State::STREAM_IDLE;    stream_->StartCall(ClientRpcContext::tag(this));  }};class AsyncStreamingFromServerClient final    : public AsyncClient<BenchmarkService::Stub, SimpleRequest> { public:  explicit AsyncStreamingFromServerClient(const ClientConfig& config)      : AsyncClient<BenchmarkService::Stub, SimpleRequest>(            config, SetupCtx, BenchmarkStubCreator) {    StartThreads(num_async_threads_);  }  ~AsyncStreamingFromServerClient() override {} private:  static void CheckDone(const grpc::Status& /*s*/,                        SimpleResponse* /*response*/) {}  static std::unique_ptr<grpc::ClientAsyncReader<SimpleResponse>> PrepareReq(      BenchmarkService::Stub* stub, grpc::ClientContext* ctx,      const 
SimpleRequest& req, CompletionQueue* cq) {    auto stream = stub->PrepareAsyncStreamingFromServer(ctx, req, cq);    return stream;  };  static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,                                    std::function<gpr_timespec()> next_issue,                                    const SimpleRequest& req) {    return new ClientRpcContextStreamingFromServerImpl<SimpleRequest,                                                       SimpleResponse>(        stub, req, std::move(next_issue),        AsyncStreamingFromServerClient::PrepareReq,        AsyncStreamingFromServerClient::CheckDone);  }};class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { public:  ClientRpcContextGenericStreamingImpl(      grpc::GenericStub* stub, const ByteBuffer& req,      std::function<gpr_timespec()> next_issue,      std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>(          grpc::GenericStub*, grpc::ClientContext*,          const grpc::string& method_name, CompletionQueue*)>          prepare_req,      std::function<void(grpc::Status, ByteBuffer*)> on_done)      : context_(),        stub_(stub),        cq_(nullptr),        req_(req),        response_(),        next_state_(State::INVALID),        callback_(std::move(on_done)),        next_issue_(std::move(next_issue)),        prepare_req_(std::move(prepare_req)) {}  ~ClientRpcContextGenericStreamingImpl() override {}  void Start(CompletionQueue* cq, const ClientConfig& config) override {    GPR_ASSERT(!config.use_coalesce_api());  // not supported yet.    
StartInternal(cq, config.messages_per_stream());  }  bool RunNextState(bool ok, HistogramEntry* entry) override {    while (true) {      switch (next_state_) {        case State::STREAM_IDLE:          if (!next_issue_) {  // ready to issue            next_state_ = State::READY_TO_WRITE;          } else {            next_state_ = State::WAIT;          }          break;  // loop around, don't return        case State::WAIT:          next_state_ = State::READY_TO_WRITE;          alarm_.reset(new Alarm);          alarm_->Set(cq_, next_issue_(), ClientRpcContext::tag(this));          return true;        case State::READY_TO_WRITE:          if (!ok) {            return false;          }          start_ = UsageTimer::Now();          next_state_ = State::WRITE_DONE;          stream_->Write(req_, ClientRpcContext::tag(this));          return true;        case State::WRITE_DONE:          if (!ok) {            return false;          }          next_state_ = State::READ_DONE;          stream_->Read(&response_, ClientRpcContext::tag(this));          return true;          break;        case State::READ_DONE:          entry->set_value((UsageTimer::Now() - start_) * 1e9);          callback_(status_, &response_);          if ((messages_per_stream_ != 0) &&              (++messages_issued_ >= messages_per_stream_)) {            next_state_ = State::WRITES_DONE_DONE;            stream_->WritesDone(ClientRpcContext::tag(this));            return true;          }          next_state_ = State::STREAM_IDLE;          break;  // loop around        case State::WRITES_DONE_DONE:          next_state_ = State::FINISH_DONE;          stream_->Finish(&status_, ClientRpcContext::tag(this));          return true;        case State::FINISH_DONE:          next_state_ = State::INVALID;          return false;          break;        default:          GPR_ASSERT(false);          return false;      }    }  }  void StartNewClone(CompletionQueue* cq) override {    auto* clone = new 
ClientRpcContextGenericStreamingImpl(        stub_, req_, next_issue_, prepare_req_, callback_);    clone->StartInternal(cq, messages_per_stream_);  }  void TryCancel() override { context_.TryCancel(); } private:  grpc::ClientContext context_;  grpc::GenericStub* stub_;  CompletionQueue* cq_;  std::unique_ptr<Alarm> alarm_;  ByteBuffer req_;  ByteBuffer response_;  enum State {    INVALID,    STREAM_IDLE,    WAIT,    READY_TO_WRITE,    WRITE_DONE,    READ_DONE,    WRITES_DONE_DONE,    FINISH_DONE  };  State next_state_;  std::function<void(grpc::Status, ByteBuffer*)> callback_;  std::function<gpr_timespec()> next_issue_;  std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>(      grpc::GenericStub*, grpc::ClientContext*, const grpc::string&,      CompletionQueue*)>      prepare_req_;  grpc::Status status_;  double start_;  std::unique_ptr<grpc::GenericClientAsyncReaderWriter> stream_;  // Allow a limit on number of messages in a stream  int messages_per_stream_;  int messages_issued_;  void StartInternal(CompletionQueue* cq, int messages_per_stream) {    cq_ = cq;    const grpc::string kMethodName(        "/grpc.testing.BenchmarkService/StreamingCall");    messages_per_stream_ = messages_per_stream;    messages_issued_ = 0;    stream_ = prepare_req_(stub_, &context_, kMethodName, cq);    next_state_ = State::STREAM_IDLE;    stream_->StartCall(ClientRpcContext::tag(this));  }};static std::unique_ptr<grpc::GenericStub> GenericStubCreator(    const std::shared_ptr<Channel>& ch) {  return std::unique_ptr<grpc::GenericStub>(new grpc::GenericStub(ch));}class GenericAsyncStreamingClient final    : public AsyncClient<grpc::GenericStub, ByteBuffer> { public:  explicit GenericAsyncStreamingClient(const ClientConfig& config)      : AsyncClient<grpc::GenericStub, ByteBuffer>(config, SetupCtx,                                                   GenericStubCreator) {    StartThreads(num_async_threads_);  }  ~GenericAsyncStreamingClient() override {} private:  static 
void CheckDone(const grpc::Status& /*s*/, ByteBuffer* /*response*/) {}  static std::unique_ptr<grpc::GenericClientAsyncReaderWriter> PrepareReq(      grpc::GenericStub* stub, grpc::ClientContext* ctx,      const grpc::string& method_name, CompletionQueue* cq) {    auto stream = stub->PrepareCall(ctx, method_name, cq);    return stream;  };  static ClientRpcContext* SetupCtx(grpc::GenericStub* stub,                                    std::function<gpr_timespec()> next_issue,                                    const ByteBuffer& req) {    return new ClientRpcContextGenericStreamingImpl(        stub, req, std::move(next_issue),        GenericAsyncStreamingClient::PrepareReq,        GenericAsyncStreamingClient::CheckDone);  }};std::unique_ptr<Client> CreateAsyncClient(const ClientConfig& config) {  switch (config.rpc_type()) {    case UNARY:      return std::unique_ptr<Client>(new AsyncUnaryClient(config));    case STREAMING:      return std::unique_ptr<Client>(new AsyncStreamingPingPongClient(config));    case STREAMING_FROM_CLIENT:      return std::unique_ptr<Client>(          new AsyncStreamingFromClientClient(config));    case STREAMING_FROM_SERVER:      return std::unique_ptr<Client>(          new AsyncStreamingFromServerClient(config));    case STREAMING_BOTH_WAYS:      // TODO(vjpai): Implement this      assert(false);      return nullptr;    default:      assert(false);      return nullptr;  }}std::unique_ptr<Client> CreateGenericAsyncStreamingClient(    const ClientConfig& args) {  return std::unique_ptr<Client>(new GenericAsyncStreamingClient(args));}}  // namespace testing}  // namespace grpc