|  | @@ -54,86 +54,144 @@ auto& force_library_initialization = Library::get();
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -template <class Fixture>
 | 
	
		
			
				|  |  | -static void BM_PumpStreamClientToServer(benchmark::State& state) {
 | 
	
		
			
				|  |  | +// Repeatedly makes Streaming Bidi calls (exchanging a configurable number of
 | 
	
		
			
				|  |  | +// messages in each call) in a loop on a single channel
 | 
	
		
			
				|  |  | +//
 | 
	
		
			
				|  |  | +//  First parmeter (i.e state.range(0)):  Message size (in bytes) to use
 | 
	
		
			
				|  |  | +//  Second parameter (i.e state.range(1)): Number of ping pong messages.
 | 
	
		
			
				|  |  | +//      Note: One ping-pong means two messages (one from client to server and
 | 
	
		
			
				|  |  | +//      the other from server to client):
 | 
	
		
			
				|  |  | +template <class Fixture, class ClientContextMutator, class ServerContextMutator>
 | 
	
		
			
				|  |  | +static void BM_StreamingPingPong(benchmark::State& state) {
 | 
	
		
			
				|  |  | +  TrackCounters track_counters;
 | 
	
		
			
				|  |  | +  const int msg_size = state.range(0);
 | 
	
		
			
				|  |  | +  const int max_ping_pongs = state.range(1);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    EchoTestService::AsyncService service;
 | 
	
		
			
				|  |  |    std::unique_ptr<Fixture> fixture(new Fixture(&service));
 | 
	
		
			
				|  |  |    {
 | 
	
		
			
				|  |  | +    EchoResponse send_response;
 | 
	
		
			
				|  |  | +    EchoResponse recv_response;
 | 
	
		
			
				|  |  |      EchoRequest send_request;
 | 
	
		
			
				|  |  |      EchoRequest recv_request;
 | 
	
		
			
				|  |  | -    if (state.range(0) > 0) {
 | 
	
		
			
				|  |  | -      send_request.set_message(std::string(state.range(0), 'a'));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    if (msg_size > 0) {
 | 
	
		
			
				|  |  | +      send_request.set_message(std::string(msg_size, 'a'));
 | 
	
		
			
				|  |  | +      send_response.set_message(std::string(msg_size, 'b'));
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -    Status recv_status;
 | 
	
		
			
				|  |  | -    ServerContext svr_ctx;
 | 
	
		
			
				|  |  | -    ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
 | 
	
		
			
				|  |  | -    service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
 | 
	
		
			
				|  |  | -                              fixture->cq(), tag(0));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      std::unique_ptr<EchoTestService::Stub> stub(
 | 
	
		
			
				|  |  |          EchoTestService::NewStub(fixture->channel()));
 | 
	
		
			
				|  |  | -    ClientContext cli_ctx;
 | 
	
		
			
				|  |  | -    auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
 | 
	
		
			
				|  |  | -    int need_tags = (1 << 0) | (1 << 1);
 | 
	
		
			
				|  |  | -    void* t;
 | 
	
		
			
				|  |  | -    bool ok;
 | 
	
		
			
				|  |  | -    while (need_tags) {
 | 
	
		
			
				|  |  | -      GPR_ASSERT(fixture->cq()->Next(&t, &ok));
 | 
	
		
			
				|  |  | -      GPR_ASSERT(ok);
 | 
	
		
			
				|  |  | -      int i = (int)(intptr_t)t;
 | 
	
		
			
				|  |  | -      GPR_ASSERT(need_tags & (1 << i));
 | 
	
		
			
				|  |  | -      need_tags &= ~(1 << i);
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | -    response_rw.Read(&recv_request, tag(0));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      while (state.KeepRunning()) {
 | 
	
		
			
				|  |  | -      GPR_TIMER_SCOPE("BenchmarkCycle", 0);
 | 
	
		
			
				|  |  | -      request_rw->Write(send_request, tag(1));
 | 
	
		
			
				|  |  | -      while (true) {
 | 
	
		
			
				|  |  | +      ServerContext svr_ctx;
 | 
	
		
			
				|  |  | +      ServerContextMutator svr_ctx_mut(&svr_ctx);
 | 
	
		
			
				|  |  | +      ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
 | 
	
		
			
				|  |  | +      service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
 | 
	
		
			
				|  |  | +                                fixture->cq(), tag(0));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +      ClientContext cli_ctx;
 | 
	
		
			
				|  |  | +      ClientContextMutator cli_ctx_mut(&cli_ctx);
 | 
	
		
			
				|  |  | +      auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +      // Establish async stream between client side and server side
 | 
	
		
			
				|  |  | +      void* t;
 | 
	
		
			
				|  |  | +      bool ok;
 | 
	
		
			
				|  |  | +      int need_tags = (1 << 0) | (1 << 1);
 | 
	
		
			
				|  |  | +      while (need_tags) {
 | 
	
		
			
				|  |  |          GPR_ASSERT(fixture->cq()->Next(&t, &ok));
 | 
	
		
			
				|  |  | -        if (t == tag(0)) {
 | 
	
		
			
				|  |  | -          response_rw.Read(&recv_request, tag(0));
 | 
	
		
			
				|  |  | -        } else if (t == tag(1)) {
 | 
	
		
			
				|  |  | -          break;
 | 
	
		
			
				|  |  | -        } else {
 | 
	
		
			
				|  |  | -          GPR_ASSERT(false);
 | 
	
		
			
				|  |  | +        GPR_ASSERT(ok);
 | 
	
		
			
				|  |  | +        int i = (int)(intptr_t)t;
 | 
	
		
			
				|  |  | +        GPR_ASSERT(need_tags & (1 << i));
 | 
	
		
			
				|  |  | +        need_tags &= ~(1 << i);
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +      // Send 'max_ping_pongs' number of ping pong messages
 | 
	
		
			
				|  |  | +      int ping_pong_cnt = 0;
 | 
	
		
			
				|  |  | +      while (ping_pong_cnt < max_ping_pongs) {
 | 
	
		
			
				|  |  | +        request_rw->Write(send_request, tag(0));   // Start client send
 | 
	
		
			
				|  |  | +        response_rw.Read(&recv_request, tag(1));   // Start server recv
 | 
	
		
			
				|  |  | +        request_rw->Read(&recv_response, tag(2));  // Start client recv
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3);
 | 
	
		
			
				|  |  | +        while (need_tags) {
 | 
	
		
			
				|  |  | +          GPR_ASSERT(fixture->cq()->Next(&t, &ok));
 | 
	
		
			
				|  |  | +          GPR_ASSERT(ok);
 | 
	
		
			
				|  |  | +          int i = (int)(intptr_t)t;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +          // If server recv is complete, start the server send operation
 | 
	
		
			
				|  |  | +          if (i == 1) {
 | 
	
		
			
				|  |  | +            response_rw.Write(send_response, tag(3));
 | 
	
		
			
				|  |  | +          }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +          GPR_ASSERT(need_tags & (1 << i));
 | 
	
		
			
				|  |  | +          need_tags &= ~(1 << i);
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        ping_pong_cnt++;
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | -    request_rw->WritesDone(tag(1));
 | 
	
		
			
				|  |  | -    need_tags = (1 << 0) | (1 << 1);
 | 
	
		
			
				|  |  | -    while (need_tags) {
 | 
	
		
			
				|  |  | -      GPR_ASSERT(fixture->cq()->Next(&t, &ok));
 | 
	
		
			
				|  |  | -      int i = (int)(intptr_t)t;
 | 
	
		
			
				|  |  | -      GPR_ASSERT(need_tags & (1 << i));
 | 
	
		
			
				|  |  | -      need_tags &= ~(1 << i);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +      request_rw->WritesDone(tag(0));
 | 
	
		
			
				|  |  | +      response_rw.Finish(Status::OK, tag(1));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +      Status recv_status;
 | 
	
		
			
				|  |  | +      request_rw->Finish(&recv_status, tag(2));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +      need_tags = (1 << 0) | (1 << 1) | (1 << 2);
 | 
	
		
			
				|  |  | +      while (need_tags) {
 | 
	
		
			
				|  |  | +        GPR_ASSERT(fixture->cq()->Next(&t, &ok));
 | 
	
		
			
				|  |  | +        int i = (int)(intptr_t)t;
 | 
	
		
			
				|  |  | +        GPR_ASSERT(need_tags & (1 << i));
 | 
	
		
			
				|  |  | +        need_tags &= ~(1 << i);
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +      GPR_ASSERT(recv_status.ok());
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    fixture->Finish(state);
 | 
	
		
			
				|  |  |    fixture.reset();
 | 
	
		
			
				|  |  | -  state.SetBytesProcessed(state.range(0) * state.iterations());
 | 
	
		
			
				|  |  | +  state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2);
 | 
	
		
			
				|  |  | +  track_counters.Finish(state);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -template <class Fixture>
 | 
	
		
			
				|  |  | -static void BM_PumpStreamServerToClient(benchmark::State& state) {
 | 
	
		
			
				|  |  | +// Repeatedly sends ping pong messages in a single streaming Bidi call in a loop
 | 
	
		
			
				|  |  | +//     First parmeter (i.e state.range(0)):  Message size (in bytes) to use
 | 
	
		
			
				|  |  | +template <class Fixture, class ClientContextMutator, class ServerContextMutator>
 | 
	
		
			
				|  |  | +static void BM_StreamingPingPongMsgs(benchmark::State& state) {
 | 
	
		
			
				|  |  | +  TrackCounters track_counters;
 | 
	
		
			
				|  |  | +  const int msg_size = state.range(0);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    EchoTestService::AsyncService service;
 | 
	
		
			
				|  |  |    std::unique_ptr<Fixture> fixture(new Fixture(&service));
 | 
	
		
			
				|  |  |    {
 | 
	
		
			
				|  |  |      EchoResponse send_response;
 | 
	
		
			
				|  |  |      EchoResponse recv_response;
 | 
	
		
			
				|  |  | -    if (state.range(0) > 0) {
 | 
	
		
			
				|  |  | -      send_response.set_message(std::string(state.range(0), 'a'));
 | 
	
		
			
				|  |  | +    EchoRequest send_request;
 | 
	
		
			
				|  |  | +    EchoRequest recv_request;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    if (msg_size > 0) {
 | 
	
		
			
				|  |  | +      send_request.set_message(std::string(msg_size, 'a'));
 | 
	
		
			
				|  |  | +      send_response.set_message(std::string(msg_size, 'b'));
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -    Status recv_status;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    std::unique_ptr<EchoTestService::Stub> stub(
 | 
	
		
			
				|  |  | +        EchoTestService::NewStub(fixture->channel()));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      ServerContext svr_ctx;
 | 
	
		
			
				|  |  | +    ServerContextMutator svr_ctx_mut(&svr_ctx);
 | 
	
		
			
				|  |  |      ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
 | 
	
		
			
				|  |  |      service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
 | 
	
		
			
				|  |  |                                fixture->cq(), tag(0));
 | 
	
		
			
				|  |  | -    std::unique_ptr<EchoTestService::Stub> stub(
 | 
	
		
			
				|  |  | -        EchoTestService::NewStub(fixture->channel()));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      ClientContext cli_ctx;
 | 
	
		
			
				|  |  | +    ClientContextMutator cli_ctx_mut(&cli_ctx);
 | 
	
		
			
				|  |  |      auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
 | 
	
		
			
				|  |  | -    int need_tags = (1 << 0) | (1 << 1);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    // Establish async stream between client side and server side
 | 
	
		
			
				|  |  |      void* t;
 | 
	
		
			
				|  |  |      bool ok;
 | 
	
		
			
				|  |  | +    int need_tags = (1 << 0) | (1 << 1);
 | 
	
		
			
				|  |  |      while (need_tags) {
 | 
	
		
			
				|  |  |        GPR_ASSERT(fixture->cq()->Next(&t, &ok));
 | 
	
		
			
				|  |  |        GPR_ASSERT(ok);
 | 
	
	
		
			
				|  | @@ -141,54 +199,79 @@ static void BM_PumpStreamServerToClient(benchmark::State& state) {
 | 
	
		
			
				|  |  |        GPR_ASSERT(need_tags & (1 << i));
 | 
	
		
			
				|  |  |        need_tags &= ~(1 << i);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -    request_rw->Read(&recv_response, tag(0));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      while (state.KeepRunning()) {
 | 
	
		
			
				|  |  |        GPR_TIMER_SCOPE("BenchmarkCycle", 0);
 | 
	
		
			
				|  |  | -      response_rw.Write(send_response, tag(1));
 | 
	
		
			
				|  |  | -      while (true) {
 | 
	
		
			
				|  |  | +      request_rw->Write(send_request, tag(0));   // Start client send
 | 
	
		
			
				|  |  | +      response_rw.Read(&recv_request, tag(1));   // Start server recv
 | 
	
		
			
				|  |  | +      request_rw->Read(&recv_response, tag(2));  // Start client recv
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +      need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3);
 | 
	
		
			
				|  |  | +      while (need_tags) {
 | 
	
		
			
				|  |  |          GPR_ASSERT(fixture->cq()->Next(&t, &ok));
 | 
	
		
			
				|  |  | -        if (t == tag(0)) {
 | 
	
		
			
				|  |  | -          request_rw->Read(&recv_response, tag(0));
 | 
	
		
			
				|  |  | -        } else if (t == tag(1)) {
 | 
	
		
			
				|  |  | -          break;
 | 
	
		
			
				|  |  | -        } else {
 | 
	
		
			
				|  |  | -          GPR_ASSERT(false);
 | 
	
		
			
				|  |  | +        GPR_ASSERT(ok);
 | 
	
		
			
				|  |  | +        int i = (int)(intptr_t)t;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // If server recv is complete, start the server send operation
 | 
	
		
			
				|  |  | +        if (i == 1) {
 | 
	
		
			
				|  |  | +          response_rw.Write(send_response, tag(3));
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        GPR_ASSERT(need_tags & (1 << i));
 | 
	
		
			
				|  |  | +        need_tags &= ~(1 << i);
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    request_rw->WritesDone(tag(0));
 | 
	
		
			
				|  |  |      response_rw.Finish(Status::OK, tag(1));
 | 
	
		
			
				|  |  | -    need_tags = (1 << 0) | (1 << 1);
 | 
	
		
			
				|  |  | +    Status recv_status;
 | 
	
		
			
				|  |  | +    request_rw->Finish(&recv_status, tag(2));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    need_tags = (1 << 0) | (1 << 1) | (1 << 2);
 | 
	
		
			
				|  |  |      while (need_tags) {
 | 
	
		
			
				|  |  |        GPR_ASSERT(fixture->cq()->Next(&t, &ok));
 | 
	
		
			
				|  |  |        int i = (int)(intptr_t)t;
 | 
	
		
			
				|  |  |        GPR_ASSERT(need_tags & (1 << i));
 | 
	
		
			
				|  |  |        need_tags &= ~(1 << i);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    GPR_ASSERT(recv_status.ok());
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    fixture->Finish(state);
 | 
	
		
			
				|  |  |    fixture.reset();
 | 
	
		
			
				|  |  | -  state.SetBytesProcessed(state.range(0) * state.iterations());
 | 
	
		
			
				|  |  | +  state.SetBytesProcessed(msg_size * state.iterations() * 2);
 | 
	
		
			
				|  |  | +  track_counters.Finish(state);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  /*******************************************************************************
 | 
	
		
			
				|  |  |   * CONFIGURATIONS
 | 
	
		
			
				|  |  |   */
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, TCP)
 | 
	
		
			
				|  |  | -    ->Range(0, 128 * 1024 * 1024);
 | 
	
		
			
				|  |  | -BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, UDS)
 | 
	
		
			
				|  |  | -    ->Range(0, 128 * 1024 * 1024);
 | 
	
		
			
				|  |  | -BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, SockPair)
 | 
	
		
			
				|  |  | -    ->Range(0, 128 * 1024 * 1024);
 | 
	
		
			
				|  |  | -BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, InProcessCHTTP2)
 | 
	
		
			
				|  |  | -    ->Range(0, 128 * 1024 * 1024);
 | 
	
		
			
				|  |  | -BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, TCP)
 | 
	
		
			
				|  |  | -    ->Range(0, 128 * 1024 * 1024);
 | 
	
		
			
				|  |  | -BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, UDS)
 | 
	
		
			
				|  |  | -    ->Range(0, 128 * 1024 * 1024);
 | 
	
		
			
				|  |  | -BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, SockPair)
 | 
	
		
			
				|  |  | +// Generate Args for StreamingPingPong benchmarks. Currently generates args for
 | 
	
		
			
				|  |  | +// only "small streams" (i.e streams with 0, 1 or 2 messages)
 | 
	
		
			
				|  |  | +static void StreamingPingPongArgs(benchmark::internal::Benchmark* b) {
 | 
	
		
			
				|  |  | +  int msg_size = 0;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  b->Args({0, 0});  // spl case: 0 ping-pong msgs (msg_size doesn't matter here)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  for (msg_size = 0; msg_size <= 128 * 1024 * 1024;
 | 
	
		
			
				|  |  | +       msg_size == 0 ? msg_size++ : msg_size *= 8) {
 | 
	
		
			
				|  |  | +    b->Args({msg_size, 1});
 | 
	
		
			
				|  |  | +    b->Args({msg_size, 2});
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +BENCHMARK_TEMPLATE(BM_StreamingPingPong, InProcessCHTTP2, NoOpMutator,
 | 
	
		
			
				|  |  | +                   NoOpMutator)
 | 
	
		
			
				|  |  | +    ->Apply(StreamingPingPongArgs);
 | 
	
		
			
				|  |  | +BENCHMARK_TEMPLATE(BM_StreamingPingPong, TCP, NoOpMutator, NoOpMutator)
 | 
	
		
			
				|  |  | +    ->Apply(StreamingPingPongArgs);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +BENCHMARK_TEMPLATE(BM_StreamingPingPongMsgs, InProcessCHTTP2, NoOpMutator,
 | 
	
		
			
				|  |  | +                   NoOpMutator)
 | 
	
		
			
				|  |  |      ->Range(0, 128 * 1024 * 1024);
 | 
	
		
			
				|  |  | -BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, InProcessCHTTP2)
 | 
	
		
			
				|  |  | +BENCHMARK_TEMPLATE(BM_StreamingPingPongMsgs, TCP, NoOpMutator, NoOpMutator)
 | 
	
		
			
				|  |  |      ->Range(0, 128 * 1024 * 1024);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  }  // namespace testing
 |