@@ -66,13 +66,37 @@ class CallbackClient
             config, BenchmarkStubCreator) {
     num_threads_ = NumThreads(config);
     rpcs_done_ = 0;
-    SetupLoadTest(config, num_threads_);
+
+    // Don't divide the fixed load among threads, as the user threads
+    // only bootstrap the RPCs
+    SetupLoadTest(config, 1);
     total_outstanding_rpcs_ =
         config.client_channels() * config.outstanding_rpcs_per_channel();
   }
 
   virtual ~CallbackClient() {}
 
+  /**
+   * The main thread of the benchmark will be waiting on DestroyMultithreading.
+   * Increment the rpcs_done_ variable to signify that a callback RPC that
+   * runs past thread completion is done. When the last outstanding RPC
+   * increments the counter it should signal the main thread's condition variable.
+   */
+  void NotifyMainThreadOfThreadCompletion() {
+    std::lock_guard<std::mutex> l(shutdown_mu_);
+    rpcs_done_++;
+    if (rpcs_done_ == total_outstanding_rpcs_) {
+      shutdown_cv_.notify_one();
+    }
+  }
+
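+  // All threads draw issue times from a single shared sequence (index 0),
+  // matching the undivided load configured in SetupLoadTest above.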
+  gpr_timespec NextRPCIssueTime() {
+    std::lock_guard<std::mutex> l(next_issue_time_mu_);
+    return Client::NextIssueTime(0);
+  }
+
  protected:
   size_t num_threads_;
   size_t total_outstanding_rpcs_;
@@ -93,24 +115,9 @@ class CallbackClient
     ThreadFuncImpl(t, thread_idx);
   }
 
-  virtual void ScheduleRpc(Thread* t, size_t thread_idx,
-                           size_t ctx_vector_idx) = 0;
-
-  /**
-   * The main thread of the benchmark will be waiting on DestroyMultithreading.
-   * Increment the rpcs_done_ variable to signify that the Callback RPC
-   * after thread completion is done. When the last outstanding rpc increments
-   * the counter it should also signal the main thread's conditional variable.
-   */
-  void NotifyMainThreadOfThreadCompletion() {
-    std::lock_guard<std::mutex> l(shutdown_mu_);
-    rpcs_done_++;
-    if (rpcs_done_ == total_outstanding_rpcs_) {
-      shutdown_cv_.notify_one();
-    }
-  }
-
  private:
+  std::mutex next_issue_time_mu_;  // Serializes calls to NextRPCIssueTime()
+
   int NumThreads(const ClientConfig& config) {
     int num_threads = config.async_client_threads();
     if (num_threads <= 0) {  // Use dynamic sizing
@@ -149,7 +156,7 @@ class CallbackUnaryClient final : public CallbackClient {
   bool ThreadFuncImpl(Thread* t, size_t thread_idx) override {
     for (size_t vector_idx = thread_idx; vector_idx < total_outstanding_rpcs_;
          vector_idx += num_threads_) {
-      ScheduleRpc(t, thread_idx, vector_idx);
+      ScheduleRpc(t, vector_idx);
     }
     return true;
   }
@@ -157,26 +164,26 @@ class CallbackUnaryClient final : public CallbackClient {
   void InitThreadFuncImpl(size_t thread_idx) override { return; }
 
  private:
-  void ScheduleRpc(Thread* t, size_t thread_idx, size_t vector_idx) override {
+  void ScheduleRpc(Thread* t, size_t vector_idx) {
     if (!closed_loop_) {
-      gpr_timespec next_issue_time = NextIssueTime(thread_idx);
+      gpr_timespec next_issue_time = NextRPCIssueTime();
       // Start an alarm callback to run the internal callback after
       // next_issue_time
       ctx_[vector_idx]->alarm_.experimental().Set(
-          next_issue_time, [this, t, thread_idx, vector_idx](bool ok) {
-            IssueUnaryCallbackRpc(t, thread_idx, vector_idx);
+          next_issue_time, [this, t, vector_idx](bool ok) {
+            IssueUnaryCallbackRpc(t, vector_idx);
           });
     } else {
-      IssueUnaryCallbackRpc(t, thread_idx, vector_idx);
+      IssueUnaryCallbackRpc(t, vector_idx);
     }
   }
 
-  void IssueUnaryCallbackRpc(Thread* t, size_t thread_idx, size_t vector_idx) {
+  void IssueUnaryCallbackRpc(Thread* t, size_t vector_idx) {
     GPR_TIMER_SCOPE("CallbackUnaryClient::ThreadFunc", 0);
     double start = UsageTimer::Now();
     ctx_[vector_idx]->stub_->experimental_async()->UnaryCall(
         (&ctx_[vector_idx]->context_), &request_, &ctx_[vector_idx]->response_,
-        [this, t, thread_idx, start, vector_idx](grpc::Status s) {
+        [this, t, start, vector_idx](grpc::Status s) {
           // Update Histogram with data from the callback run
           HistogramEntry entry;
           if (s.ok()) {
@@ -193,17 +200,164 @@ class CallbackUnaryClient final : public CallbackClient {
             ctx_[vector_idx].reset(
                 new CallbackClientRpcContext(ctx_[vector_idx]->stub_));
             // Schedule a new RPC
-            ScheduleRpc(t, thread_idx, vector_idx);
+            ScheduleRpc(t, vector_idx);
           }
         });
   }
 };
 
+class CallbackStreamingClient : public CallbackClient {
+ public:
+  CallbackStreamingClient(const ClientConfig& config)
+      : CallbackClient(config),
+        messages_per_stream_(config.messages_per_stream()) {
+    for (int ch = 0; ch < config.client_channels(); ch++) {
+      for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
+        ctx_.emplace_back(
+            new CallbackClientRpcContext(channels_[ch].get_stub()));
+      }
+    }
+    StartThreads(num_threads_);
+  }
+  ~CallbackStreamingClient() {}
+
+  void AddHistogramEntry(double start, bool ok, Thread* thread_ptr) {
+    // Update Histogram with data from the callback run
+    HistogramEntry entry;
+    if (ok) {
+      entry.set_value((UsageTimer::Now() - start) * 1e9);
+    }
+    thread_ptr->UpdateHistogram(&entry);
+  }
+
+  int messages_per_stream() { return messages_per_stream_; }
+
+ protected:
+  const int messages_per_stream_;
+};
+
+class CallbackStreamingPingPongClient : public CallbackStreamingClient {
+ public:
+  CallbackStreamingPingPongClient(const ClientConfig& config)
+      : CallbackStreamingClient(config) {}
+  ~CallbackStreamingPingPongClient() {}
+};
+
+class CallbackStreamingPingPongReactor final
+    : public grpc::experimental::ClientBidiReactor<SimpleRequest,
+                                                   SimpleResponse> {
+ public:
+  CallbackStreamingPingPongReactor(
+      CallbackStreamingPingPongClient* client,
+      std::unique_ptr<CallbackClientRpcContext> ctx)
+      : client_(client), ctx_(std::move(ctx)), messages_issued_(0) {}
+
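+  // Each reactor drives one bidirectional stream: StartNewRpc() opens the
+  // call and issues the first write, then OnWriteDone()/OnReadDone()
+  // alternate writes and reads until the stream is wound down via OnDone().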
+  void StartNewRpc() {
+    if (client_->ThreadCompleted()) return;
+    start_ = UsageTimer::Now();
+    ctx_->stub_->experimental_async()->StreamingCall(&(ctx_->context_), this);
+    StartWrite(client_->request());
+    StartCall();
+  }
+
+  void OnWriteDone(bool ok) override {
+    if (!ok || client_->ThreadCompleted()) {
+      if (!ok) gpr_log(GPR_ERROR, "Error writing RPC");
+      StartWritesDone();
+      return;
+    }
+    StartRead(&ctx_->response_);
+  }
+
+  void OnReadDone(bool ok) override {
+    client_->AddHistogramEntry(start_, ok, thread_ptr_);
+
+    if (client_->ThreadCompleted() || !ok ||
+        (client_->messages_per_stream() != 0 &&
+         ++messages_issued_ >= client_->messages_per_stream())) {
+      if (!ok) {
+        gpr_log(GPR_ERROR, "Error reading RPC");
+      }
+      StartWritesDone();
+      return;
+    }
+    StartWrite(client_->request());
+  }
+
+  void OnDone(const Status& s) override {
+    if (client_->ThreadCompleted() || !s.ok()) {
+      client_->NotifyMainThreadOfThreadCompletion();
+      return;
+    }
+    ctx_.reset(new CallbackClientRpcContext(ctx_->stub_));
+    ScheduleRpc();
+  }
+
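+  // In open-loop mode, an alarm defers StartNewRpc() until the next
+  // scheduled issue time; in closed-loop mode, it starts immediately.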
+  void ScheduleRpc() {
+    if (client_->ThreadCompleted()) return;
+
+    if (!client_->IsClosedLoop()) {
+      gpr_timespec next_issue_time = client_->NextRPCIssueTime();
+      // Start an alarm callback to run the internal callback after
+      // next_issue_time
+      ctx_->alarm_.experimental().Set(next_issue_time,
+                                      [this](bool ok) { StartNewRpc(); });
+    } else {
+      StartNewRpc();
+    }
+  }
+
+  void set_thread_ptr(Client::Thread* ptr) { thread_ptr_ = ptr; }
+
+  CallbackStreamingPingPongClient* client_;
+  std::unique_ptr<CallbackClientRpcContext> ctx_;
+  Client::Thread* thread_ptr_ = nullptr;  // Needed to update histogram entries
+  double start_;                // Track message start time
+  int messages_issued_;         // Messages issued by this stream
+};
+
+class CallbackStreamingPingPongClientImpl final
+    : public CallbackStreamingPingPongClient {
+ public:
+  CallbackStreamingPingPongClientImpl(const ClientConfig& config)
+      : CallbackStreamingPingPongClient(config) {
+    for (size_t i = 0; i < total_outstanding_rpcs_; i++)
+      reactor_.emplace_back(
+          new CallbackStreamingPingPongReactor(this, std::move(ctx_[i])));
+  }
+  ~CallbackStreamingPingPongClientImpl() {}
+
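+  // Hand the outstanding streams out round-robin across the worker threads;
+  // each thread only bootstraps its share of the reactors.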
+  bool ThreadFuncImpl(Client::Thread* t, size_t thread_idx) override {
+    for (size_t vector_idx = thread_idx; vector_idx < total_outstanding_rpcs_;
+         vector_idx += num_threads_) {
+      reactor_[vector_idx]->set_thread_ptr(t);
+      reactor_[vector_idx]->ScheduleRpc();
+    }
+    return true;
+  }
+
+  void InitThreadFuncImpl(size_t thread_idx) override {}
+
+ private:
+  std::vector<std::unique_ptr<CallbackStreamingPingPongReactor>> reactor_;
+};
+
+// TODO(mhaidry): Implement streaming from client, server and both ways
+
 std::unique_ptr<Client> CreateCallbackClient(const ClientConfig& config) {
   switch (config.rpc_type()) {
     case UNARY:
       return std::unique_ptr<Client>(new CallbackUnaryClient(config));
     case STREAMING:
+      return std::unique_ptr<Client>(
+          new CallbackStreamingPingPongClientImpl(config));
     case STREAMING_FROM_CLIENT:
     case STREAMING_FROM_SERVER:
     case STREAMING_BOTH_WAYS: