|  | @@ -26,12 +26,15 @@
 | 
	
		
			
				|  |  |  #include <thread>
 | 
	
		
			
				|  |  |  #include <vector>
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +#include "absl/strings/str_split.h"
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  #include <gflags/gflags.h>
 | 
	
		
			
				|  |  |  #include <grpcpp/grpcpp.h>
 | 
	
		
			
				|  |  |  #include <grpcpp/server.h>
 | 
	
		
			
				|  |  |  #include <grpcpp/server_builder.h>
 | 
	
		
			
				|  |  |  #include <grpcpp/server_context.h>
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +#include "src/core/lib/gpr/env.h"
 | 
	
		
			
				|  |  |  #include "src/proto/grpc/testing/empty.pb.h"
 | 
	
		
			
				|  |  |  #include "src/proto/grpc/testing/messages.pb.h"
 | 
	
		
			
				|  |  |  #include "src/proto/grpc/testing/test.grpc.pb.h"
 | 
	
	
		
			
				|  | @@ -46,6 +49,8 @@ DEFINE_int32(rpc_timeout_sec, 30, "Per RPC timeout seconds.");
 | 
	
		
			
				|  |  |  DEFINE_string(server, "localhost:50051", "Address of server.");
 | 
	
		
			
				|  |  |  DEFINE_int32(stats_port, 50052,
 | 
	
		
			
				|  |  |               "Port to expose peer distribution stats service.");
 | 
	
		
			
				|  |  | +DEFINE_string(rpc, "UnaryCall", "a comma separated list of rpc methods.");
 | 
	
		
			
				|  |  | +DEFINE_string(metadata, "", "metadata to send with the RPC.");
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  using grpc::Channel;
 | 
	
		
			
				|  |  |  using grpc::ClientAsyncResponseReader;
 | 
	
	
		
			
				|  | @@ -59,6 +64,7 @@ using grpc::ServerReader;
 | 
	
		
			
				|  |  |  using grpc::ServerReaderWriter;
 | 
	
		
			
				|  |  |  using grpc::ServerWriter;
 | 
	
		
			
				|  |  |  using grpc::Status;
 | 
	
		
			
				|  |  | +using grpc::testing::Empty;
 | 
	
		
			
				|  |  |  using grpc::testing::LoadBalancerStatsRequest;
 | 
	
		
			
				|  |  |  using grpc::testing::LoadBalancerStatsResponse;
 | 
	
		
			
				|  |  |  using grpc::testing::LoadBalancerStatsService;
 | 
	
	
		
			
				|  | @@ -81,7 +87,8 @@ class XdsStatsWatcher {
 | 
	
		
			
				|  |  |    XdsStatsWatcher(int start_id, int end_id)
 | 
	
		
			
				|  |  |        : start_id_(start_id), end_id_(end_id), rpcs_needed_(end_id - start_id) {}
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  void RpcCompleted(int request_id, const std::string& peer) {
 | 
	
		
			
				|  |  | +  void RpcCompleted(int request_id, const std::string& rpc_method,
 | 
	
		
			
				|  |  | +                    const std::string& peer) {
 | 
	
		
			
				|  |  |      if (start_id_ <= request_id && request_id < end_id_) {
 | 
	
		
			
				|  |  |        {
 | 
	
		
			
				|  |  |          std::lock_guard<std::mutex> lk(m_);
 | 
	
	
		
			
				|  | @@ -89,6 +96,7 @@ class XdsStatsWatcher {
 | 
	
		
			
				|  |  |            no_remote_peer_++;
 | 
	
		
			
				|  |  |          } else {
 | 
	
		
			
				|  |  |            rpcs_by_peer_[peer]++;
 | 
	
		
			
				|  |  | +          rpcs_by_method_[rpc_method][peer]++;
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |          rpcs_needed_--;
 | 
	
		
			
				|  |  |        }
 | 
	
	
		
			
				|  | @@ -104,6 +112,17 @@ class XdsStatsWatcher {
 | 
	
		
			
				|  |  |                     [this] { return rpcs_needed_ == 0; });
 | 
	
		
			
				|  |  |        response->mutable_rpcs_by_peer()->insert(rpcs_by_peer_.begin(),
 | 
	
		
			
				|  |  |                                                 rpcs_by_peer_.end());
 | 
	
		
			
				|  |  | +      auto& response_rpcs_by_method = *response->mutable_rpcs_by_method();
 | 
	
		
			
				|  |  | +      for (const auto& rpc_by_method : rpcs_by_method_) {
 | 
	
		
			
				|  |  | +        auto& response_rpc_by_method =
 | 
	
		
			
				|  |  | +            response_rpcs_by_method[rpc_by_method.first];
 | 
	
		
			
				|  |  | +        auto& response_rpcs_by_peer =
 | 
	
		
			
				|  |  | +            *response_rpc_by_method.mutable_rpcs_by_peer();
 | 
	
		
			
				|  |  | +        for (const auto& rpc_by_peer : rpc_by_method.second) {
 | 
	
		
			
				|  |  | +          auto& response_rpc_by_peer = response_rpcs_by_peer[rpc_by_peer.first];
 | 
	
		
			
				|  |  | +          response_rpc_by_peer = rpc_by_peer.second;
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  |        response->set_num_failures(no_remote_peer_ + rpcs_needed_);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |    }
 | 
	
	
		
			
				|  | @@ -113,7 +132,11 @@ class XdsStatsWatcher {
 | 
	
		
			
				|  |  |    int end_id_;
 | 
	
		
			
				|  |  |    int rpcs_needed_;
 | 
	
		
			
				|  |  |    int no_remote_peer_ = 0;
 | 
	
		
			
				|  |  | +  // A map of stats keyed by peer name.
 | 
	
		
			
				|  |  |    std::map<std::string, int> rpcs_by_peer_;
 | 
	
		
			
				|  |  | +  // A two-level map of stats keyed at top level by RPC method and second level
 | 
	
		
			
				|  |  | +  // by peer name.
 | 
	
		
			
				|  |  | +  std::map<std::string, std::map<std::string, int>> rpcs_by_method_;
 | 
	
		
			
				|  |  |    std::mutex m_;
 | 
	
		
			
				|  |  |    std::condition_variable cv_;
 | 
	
		
			
				|  |  |  };
 | 
	
	
		
			
				|  | @@ -123,7 +146,8 @@ class TestClient {
 | 
	
		
			
				|  |  |    TestClient(const std::shared_ptr<Channel>& channel)
 | 
	
		
			
				|  |  |        : stub_(TestService::NewStub(channel)) {}
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  void AsyncUnaryCall() {
 | 
	
		
			
				|  |  | +  void AsyncUnaryCall(
 | 
	
		
			
				|  |  | +      std::vector<std::pair<std::string, std::string>> metadata) {
 | 
	
		
			
				|  |  |      SimpleResponse response;
 | 
	
		
			
				|  |  |      int saved_request_id;
 | 
	
		
			
				|  |  |      {
 | 
	
	
		
			
				|  | @@ -135,11 +159,41 @@ class TestClient {
 | 
	
		
			
				|  |  |          std::chrono::seconds(FLAGS_rpc_timeout_sec);
 | 
	
		
			
				|  |  |      AsyncClientCall* call = new AsyncClientCall;
 | 
	
		
			
				|  |  |      call->context.set_deadline(deadline);
 | 
	
		
			
				|  |  | +    for (const auto& data : metadata) {
 | 
	
		
			
				|  |  | +      call->context.AddMetadata(data.first, data.second);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  |      call->saved_request_id = saved_request_id;
 | 
	
		
			
				|  |  | -    call->response_reader = stub_->PrepareAsyncUnaryCall(
 | 
	
		
			
				|  |  | +    call->rpc_method = "UnaryCall";
 | 
	
		
			
				|  |  | +    call->simple_response_reader = stub_->PrepareAsyncUnaryCall(
 | 
	
		
			
				|  |  |          &call->context, SimpleRequest::default_instance(), &cq_);
 | 
	
		
			
				|  |  | -    call->response_reader->StartCall();
 | 
	
		
			
				|  |  | -    call->response_reader->Finish(&call->response, &call->status, (void*)call);
 | 
	
		
			
				|  |  | +    call->simple_response_reader->StartCall();
 | 
	
		
			
				|  |  | +    call->simple_response_reader->Finish(&call->simple_response, &call->status,
 | 
	
		
			
				|  |  | +                                         (void*)call);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  void AsyncEmptyCall(
 | 
	
		
			
				|  |  | +      std::vector<std::pair<std::string, std::string>> metadata) {
 | 
	
		
			
				|  |  | +    Empty response;
 | 
	
		
			
				|  |  | +    int saved_request_id;
 | 
	
		
			
				|  |  | +    {
 | 
	
		
			
				|  |  | +      std::lock_guard<std::mutex> lk(mu);
 | 
	
		
			
				|  |  | +      saved_request_id = ++global_request_id;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    std::chrono::system_clock::time_point deadline =
 | 
	
		
			
				|  |  | +        std::chrono::system_clock::now() +
 | 
	
		
			
				|  |  | +        std::chrono::seconds(FLAGS_rpc_timeout_sec);
 | 
	
		
			
				|  |  | +    AsyncClientCall* call = new AsyncClientCall;
 | 
	
		
			
				|  |  | +    call->context.set_deadline(deadline);
 | 
	
		
			
				|  |  | +    for (const auto& data : metadata) {
 | 
	
		
			
				|  |  | +      call->context.AddMetadata(data.first, data.second);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    call->saved_request_id = saved_request_id;
 | 
	
		
			
				|  |  | +    call->rpc_method = "EmptyCall";
 | 
	
		
			
				|  |  | +    call->empty_response_reader = stub_->PrepareAsyncEmptyCall(
 | 
	
		
			
				|  |  | +        &call->context, Empty::default_instance(), &cq_);
 | 
	
		
			
				|  |  | +    call->empty_response_reader->StartCall();
 | 
	
		
			
				|  |  | +    call->empty_response_reader->Finish(&call->empty_response, &call->status,
 | 
	
		
			
				|  |  | +                                        (void*)call);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    void AsyncCompleteRpc() {
 | 
	
	
		
			
				|  | @@ -150,9 +204,17 @@ class TestClient {
 | 
	
		
			
				|  |  |        GPR_ASSERT(ok);
 | 
	
		
			
				|  |  |        {
 | 
	
		
			
				|  |  |          std::lock_guard<std::mutex> lk(mu);
 | 
	
		
			
				|  |  | +        auto server_initial_metadata = call->context.GetServerInitialMetadata();
 | 
	
		
			
				|  |  | +        auto metadata_hostname =
 | 
	
		
			
				|  |  | +            call->context.GetServerInitialMetadata().find("hostname");
 | 
	
		
			
				|  |  | +        std::string hostname =
 | 
	
		
			
				|  |  | +            metadata_hostname != call->context.GetServerInitialMetadata().end()
 | 
	
		
			
				|  |  | +                ? std::string(metadata_hostname->second.data(),
 | 
	
		
			
				|  |  | +                              metadata_hostname->second.length())
 | 
	
		
			
				|  |  | +                : call->simple_response.hostname();
 | 
	
		
			
				|  |  |          for (auto watcher : watchers) {
 | 
	
		
			
				|  |  | -          watcher->RpcCompleted(call->saved_request_id,
 | 
	
		
			
				|  |  | -                                call->response.hostname());
 | 
	
		
			
				|  |  | +          watcher->RpcCompleted(call->saved_request_id, call->rpc_method,
 | 
	
		
			
				|  |  | +                                std::move(hostname));
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -166,9 +228,16 @@ class TestClient {
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |        } else {
 | 
	
		
			
				|  |  |          if (FLAGS_print_response) {
 | 
	
		
			
				|  |  | -          std::cout << "Greeting: Hello world, this is "
 | 
	
		
			
				|  |  | -                    << call->response.hostname() << ", from "
 | 
	
		
			
				|  |  | -                    << call->context.peer() << std::endl;
 | 
	
		
			
				|  |  | +          auto metadata_hostname =
 | 
	
		
			
				|  |  | +              call->context.GetServerInitialMetadata().find("hostname");
 | 
	
		
			
				|  |  | +          std::string hostname =
 | 
	
		
			
				|  |  | +              metadata_hostname !=
 | 
	
		
			
				|  |  | +                      call->context.GetServerInitialMetadata().end()
 | 
	
		
			
				|  |  | +                  ? std::string(metadata_hostname->second.data(),
 | 
	
		
			
				|  |  | +                                metadata_hostname->second.length())
 | 
	
		
			
				|  |  | +                  : call->simple_response.hostname();
 | 
	
		
			
				|  |  | +          std::cout << "Greeting: Hello world, this is " << hostname
 | 
	
		
			
				|  |  | +                    << ", from " << call->context.peer() << std::endl;
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -178,11 +247,15 @@ class TestClient {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |   private:
 | 
	
		
			
				|  |  |    struct AsyncClientCall {
 | 
	
		
			
				|  |  | -    SimpleResponse response;
 | 
	
		
			
				|  |  | +    Empty empty_response;
 | 
	
		
			
				|  |  | +    SimpleResponse simple_response;
 | 
	
		
			
				|  |  |      ClientContext context;
 | 
	
		
			
				|  |  |      Status status;
 | 
	
		
			
				|  |  |      int saved_request_id;
 | 
	
		
			
				|  |  | -    std::unique_ptr<ClientAsyncResponseReader<SimpleResponse>> response_reader;
 | 
	
		
			
				|  |  | +    std::string rpc_method;
 | 
	
		
			
				|  |  | +    std::unique_ptr<ClientAsyncResponseReader<Empty>> empty_response_reader;
 | 
	
		
			
				|  |  | +    std::unique_ptr<ClientAsyncResponseReader<SimpleResponse>>
 | 
	
		
			
				|  |  | +        simple_response_reader;
 | 
	
		
			
				|  |  |    };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    std::unique_ptr<TestService::Stub> stub_;
 | 
	
	
		
			
				|  | @@ -214,10 +287,25 @@ class LoadBalancerStatsServiceImpl : public LoadBalancerStatsService::Service {
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -void RunTestLoop(const std::string& server,
 | 
	
		
			
				|  |  | -                 std::chrono::duration<double> duration_per_query) {
 | 
	
		
			
				|  |  | +void RunTestLoop(std::chrono::duration<double> duration_per_query) {
 | 
	
		
			
				|  |  | +  std::vector<absl::string_view> rpc_methods = absl::StrSplit(FLAGS_rpc, ',');
 | 
	
		
			
				|  |  | +  // Store Metadata like
 | 
	
		
			
				|  |  | +  // "EmptyCall:key1:value1,UnaryCall:key1:value1,UnaryCall:key2:value2" into a
 | 
	
		
			
				|  |  | +  // map where the key is the RPC method and value is a vector of key:value
 | 
	
		
			
				|  |  | +  // pairs. {EmptyCall, [{key1,value1}],
 | 
	
		
			
				|  |  | +  //  UnaryCall, [{key1,value1}, {key2,value2}]}
 | 
	
		
			
				|  |  | +  std::vector<absl::string_view> rpc_metadata =
 | 
	
		
			
				|  |  | +      absl::StrSplit(FLAGS_metadata, ',');
 | 
	
		
			
				|  |  | +  std::map<std::string, std::vector<std::pair<std::string, std::string>>>
 | 
	
		
			
				|  |  | +      metadata_map;
 | 
	
		
			
				|  |  | +  for (auto& data : rpc_metadata) {
 | 
	
		
			
				|  |  | +    std::vector<absl::string_view> metadata = absl::StrSplit(data, ':');
 | 
	
		
			
				|  |  | +    GPR_ASSERT(metadata.size() == 3);
 | 
	
		
			
				|  |  | +    metadata_map[std::string(metadata[0])].push_back(
 | 
	
		
			
				|  |  | +        {std::string(metadata[1]), std::string(metadata[2])});
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  |    TestClient client(
 | 
	
		
			
				|  |  | -      grpc::CreateChannel(server, grpc::InsecureChannelCredentials()));
 | 
	
		
			
				|  |  | +      grpc::CreateChannel(FLAGS_server, grpc::InsecureChannelCredentials()));
 | 
	
		
			
				|  |  |    std::chrono::time_point<std::chrono::system_clock> start =
 | 
	
		
			
				|  |  |        std::chrono::system_clock::now();
 | 
	
		
			
				|  |  |    std::chrono::duration<double> elapsed;
 | 
	
	
		
			
				|  | @@ -225,10 +313,23 @@ void RunTestLoop(const std::string& server,
 | 
	
		
			
				|  |  |    std::thread thread = std::thread(&TestClient::AsyncCompleteRpc, &client);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    while (true) {
 | 
	
		
			
				|  |  | -    elapsed = std::chrono::system_clock::now() - start;
 | 
	
		
			
				|  |  | -    if (elapsed > duration_per_query) {
 | 
	
		
			
				|  |  | -      start = std::chrono::system_clock::now();
 | 
	
		
			
				|  |  | -      client.AsyncUnaryCall();
 | 
	
		
			
				|  |  | +    for (const absl::string_view& rpc_method : rpc_methods) {
 | 
	
		
			
				|  |  | +      elapsed = std::chrono::system_clock::now() - start;
 | 
	
		
			
				|  |  | +      if (elapsed > duration_per_query) {
 | 
	
		
			
				|  |  | +        start = std::chrono::system_clock::now();
 | 
	
		
			
				|  |  | +        auto metadata_iter = metadata_map.find(std::string(rpc_method));
 | 
	
		
			
				|  |  | +        if (rpc_method == "EmptyCall") {
 | 
	
		
			
				|  |  | +          client.AsyncEmptyCall(
 | 
	
		
			
				|  |  | +              metadata_iter != metadata_map.end()
 | 
	
		
			
				|  |  | +                  ? metadata_iter->second
 | 
	
		
			
				|  |  | +                  : std::vector<std::pair<std::string, std::string>>());
 | 
	
		
			
				|  |  | +        } else {
 | 
	
		
			
				|  |  | +          client.AsyncUnaryCall(
 | 
	
		
			
				|  |  | +              metadata_iter != metadata_map.end()
 | 
	
		
			
				|  |  | +                  ? metadata_iter->second
 | 
	
		
			
				|  |  | +                  : std::vector<std::pair<std::string, std::string>>());
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    thread.join();
 | 
	
	
		
			
				|  | @@ -255,6 +356,7 @@ void RunServer(const int port) {
 | 
	
		
			
				|  |  |  int main(int argc, char** argv) {
 | 
	
		
			
				|  |  |    grpc::testing::TestEnvironment env(argc, argv);
 | 
	
		
			
				|  |  |    grpc::testing::InitTest(&argc, &argv, true);
 | 
	
		
			
				|  |  | +  gpr_setenv("GRPC_XDS_EXPERIMENTAL_ROUTING", "true");
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    std::chrono::duration<double> duration_per_query =
 | 
	
		
			
				|  |  |        std::chrono::nanoseconds(std::chrono::seconds(1)) / FLAGS_qps;
 | 
	
	
		
			
				|  | @@ -263,8 +365,7 @@ int main(int argc, char** argv) {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    test_threads.reserve(FLAGS_num_channels);
 | 
	
		
			
				|  |  |    for (int i = 0; i < FLAGS_num_channels; i++) {
 | 
	
		
			
				|  |  | -    test_threads.emplace_back(
 | 
	
		
			
				|  |  | -        std::thread(&RunTestLoop, FLAGS_server, duration_per_query));
 | 
	
		
			
				|  |  | +    test_threads.emplace_back(std::thread(&RunTestLoop, duration_per_query));
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    RunServer(FLAGS_stats_port);
 |