Эх сурвалжийг харах

Merge branch 'master' into minor-timer-fix

Sree Kuchibhotla 7 жил өмнө
parent
commit
a5dc80be7b
28 өөрчлөгдсөн 426 нэмэгдсэн , 69 устгасан
  1. 19 0
      doc/interop-test-descriptions.md
  2. 1 1
      examples/csharp/route_guide/RouteGuide/route_guide_db.json
  3. 2 0
      grpc.def
  4. 23 0
      include/grpc/grpc.h
  5. 1 1
      include/grpcpp/impl/codegen/completion_queue.h
  6. 3 2
      src/android/test/interop/app/src/main/cpp/grpc-interop.cc
  7. 2 2
      src/core/ext/filters/client_channel/client_channel_channelz.cc
  8. 24 2
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  9. 3 23
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  10. 57 2
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  11. 13 0
      src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
  12. 21 0
      src/core/lib/channel/channelz_registry.cc
  13. 5 1
      src/core/lib/iomgr/lockfree_event.cc
  14. 1 1
      src/csharp/doc/docfx.json
  15. 4 0
      src/ruby/ext/grpc/rb_grpc_imports.generated.c
  16. 6 0
      src/ruby/ext/grpc/rb_grpc_imports.generated.h
  17. 10 0
      test/core/channel/channelz_test.cc
  18. 2 0
      test/core/surface/public_headers_must_be_c89.c
  19. 14 2
      test/cpp/interop/client.cc
  20. 44 20
      test/cpp/interop/interop_client.cc
  21. 16 4
      test/cpp/interop/interop_client.h
  22. 3 3
      test/cpp/interop/stress_interop_client.cc
  23. 2 2
      test/cpp/interop/stress_interop_client.h
  24. 8 3
      test/cpp/interop/stress_test.cc
  25. 5 0
      test/cpp/util/channel_trace_proto_helper.cc
  26. 1 0
      test/cpp/util/channel_trace_proto_helper.h
  27. 26 0
      tools/internal_ci/linux/grpc_publish_packages.cfg
  28. 110 0
      tools/internal_ci/linux/grpc_publish_packages.sh

+ 19 - 0
doc/interop-test-descriptions.md

@@ -899,6 +899,25 @@ Status: TODO
 This test verifies that a client sending faster than a server can drain sees
 This test verifies that a client sending faster than a server can drain sees
 pushback (i.e., attempts to send succeed only after appropriate delays).
 pushback (i.e., attempts to send succeed only after appropriate delays).
 
 
+### Experimental Tests
+
+These tests are not yet standardized, and are not yet implemented in all
+languages. Therefore they are not part of our interop matrix.
+
+#### rpc_soak
+
+The client performs many large_unary RPCs in sequence over the same channel. 
+The number of RPCs is configured by the experimental flag, `soak_iterations`.
+
+#### channel_soak
+
+The client performs many large_unary RPCs in sequence. Before each RPC, it 
+tears down and rebuilds the channel. The number of RPCs is configured by 
+the experimental flag, `soak_iterations`.
+
+This tests puts stress on several gRPC components; the resolver, the load 
+balancer, and the RPC hotpath.
+
 ### TODO Tests
 ### TODO Tests
 
 
 #### High priority:
 #### High priority:

+ 1 - 1
examples/csharp/route_guide/RouteGuide/route_guide_db.json

@@ -1,4 +1,4 @@
-[{
+[{
     "location": {
     "location": {
         "latitude": 407838351,
         "latitude": 407838351,
         "longitude": -746143763
         "longitude": -746143763

+ 2 - 0
grpc.def

@@ -69,6 +69,8 @@ EXPORTS
     grpc_resource_quota_unref
     grpc_resource_quota_unref
     grpc_resource_quota_resize
     grpc_resource_quota_resize
     grpc_resource_quota_arg_vtable
     grpc_resource_quota_arg_vtable
+    grpc_channelz_get_top_channels
+    grpc_channelz_get_channel
     grpc_insecure_channel_create_from_fd
     grpc_insecure_channel_create_from_fd
     grpc_server_add_insecure_channel_from_fd
     grpc_server_add_insecure_channel_from_fd
     grpc_use_signal
     grpc_use_signal

+ 23 - 0
include/grpc/grpc.h

@@ -454,6 +454,29 @@ GRPCAPI void grpc_resource_quota_resize(grpc_resource_quota* resource_quota,
  */
  */
 GRPCAPI const grpc_arg_pointer_vtable* grpc_resource_quota_arg_vtable(void);
 GRPCAPI const grpc_arg_pointer_vtable* grpc_resource_quota_arg_vtable(void);
 
 
+/************* CHANNELZ API *************/
+/** Channelz is under active development. The following APIs will see some
+    churn as the feature is implemented. This comment will be removed once
+    channelz is officially supported, and these APIs become stable. For now
+    you may track the progress by following this github issue:
+    https://github.com/grpc/grpc/issues/15340
+
+    the following APIs return allocated JSON strings that match the response
+    objects from the channelz proto, found here:
+    https://github.com/grpc/grpc/blob/master/src/proto/grpc/channelz/channelz.proto.
+
+    For easy conversion to protobuf, The JSON is formatted according to:
+    https://developers.google.com/protocol-buffers/docs/proto3#json. */
+
+/* Gets all root channels (i.e. channels the application has directly
+   created). This does not include subchannels nor non-top level channels.
+   The returned string is allocated and must be freed by the application. */
+GRPCAPI char* grpc_channelz_get_top_channels(intptr_t start_channel_id);
+
+/* Returns a single Channel, or else a NOT_FOUND code. The returned string
+   is allocated and must be freed by the application. */
+GRPCAPI char* grpc_channelz_get_channel(intptr_t channel_id);
+
 #ifdef __cplusplus
 #ifdef __cplusplus
 }
 }
 #endif
 #endif

+ 1 - 1
include/grpcpp/impl/codegen/completion_queue.h

@@ -367,7 +367,7 @@ class ServerCompletionQueue : public CompletionQueue {
 
 
  protected:
  protected:
   /// Default constructor
   /// Default constructor
-  ServerCompletionQueue() {}
+  ServerCompletionQueue() : polling_type_(GRPC_CQ_DEFAULT_POLLING) {}
 
 
  private:
  private:
   /// \param is_frequently_polled Informs the GRPC library about whether the
   /// \param is_frequently_polled Informs the GRPC library about whether the

+ 3 - 2
src/android/test/interop/app/src/main/cpp/grpc-interop.cc

@@ -45,9 +45,10 @@ std::shared_ptr<grpc::testing::InteropClient> GetClient(const char* host,
     credentials = grpc::InsecureChannelCredentials();
     credentials = grpc::InsecureChannelCredentials();
   }
   }
 
 
+  grpc::testing::ChannelCreationFunc channel_creation_func = 
+      std::bind(grpc::CreateChannel, host_port, credentials);
   return std::shared_ptr<grpc::testing::InteropClient>(
   return std::shared_ptr<grpc::testing::InteropClient>(
-      new grpc::testing::InteropClient(
-          grpc::CreateChannel(host_port, credentials), true, false));
+      new grpc::testing::InteropClient(channel_creation_func, true, false));
 }
 }
 
 
 extern "C" JNIEXPORT jboolean JNICALL
 extern "C" JNIEXPORT jboolean JNICALL

+ 2 - 2
src/core/ext/filters/client_channel/client_channel_channelz.cc

@@ -85,12 +85,12 @@ void ClientChannelNode::PopulateChildRefs(grpc_json* json) {
     grpc_json* array_parent = grpc_json_create_child(
     grpc_json* array_parent = grpc_json_create_child(
         nullptr, json, "channelRef", nullptr, GRPC_JSON_ARRAY, false);
         nullptr, json, "channelRef", nullptr, GRPC_JSON_ARRAY, false);
     json_iterator = nullptr;
     json_iterator = nullptr;
-    for (size_t i = 0; i < child_subchannels.size(); ++i) {
+    for (size_t i = 0; i < child_channels.size(); ++i) {
       json_iterator =
       json_iterator =
           grpc_json_create_child(json_iterator, array_parent, nullptr, nullptr,
           grpc_json_create_child(json_iterator, array_parent, nullptr, nullptr,
                                  GRPC_JSON_OBJECT, false);
                                  GRPC_JSON_OBJECT, false);
       grpc_json_add_number_string_child(json_iterator, nullptr, "channelId",
       grpc_json_add_number_string_child(json_iterator, nullptr, "channelId",
-                                        child_subchannels[i]);
+                                        child_channels[i]);
     }
     }
   }
   }
 }
 }

+ 24 - 2
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc

@@ -135,9 +135,8 @@ class GrpcLb : public LoadBalancingPolicy {
   void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
   void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
   void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override;
   void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override;
   void ExitIdleLocked() override;
   void ExitIdleLocked() override;
-  // TODO(ncteisen): implement this in a follow up PR
   void FillChildRefsForChannelz(ChildRefsList* child_subchannels,
   void FillChildRefsForChannelz(ChildRefsList* child_subchannels,
-                                ChildRefsList* child_channels) override {}
+                                ChildRefsList* child_channels) override;
 
 
  private:
  private:
   /// Linked list of pending pick requests. It stores all information needed to
   /// Linked list of pending pick requests. It stores all information needed to
@@ -301,6 +300,9 @@ class GrpcLb : public LoadBalancingPolicy {
 
 
   // The channel for communicating with the LB server.
   // The channel for communicating with the LB server.
   grpc_channel* lb_channel_ = nullptr;
   grpc_channel* lb_channel_ = nullptr;
+  // Mutex to protect the channel to the LB server. This is used when
+  // processing a channelz request.
+  gpr_mu lb_channel_mu_;
   grpc_connectivity_state lb_channel_connectivity_;
   grpc_connectivity_state lb_channel_connectivity_;
   grpc_closure lb_channel_on_connectivity_changed_;
   grpc_closure lb_channel_on_connectivity_changed_;
   // Are we already watching the LB channel's connectivity?
   // Are we already watching the LB channel's connectivity?
@@ -1040,6 +1042,7 @@ GrpcLb::GrpcLb(const grpc_lb_addresses* addresses,
               .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS *
               .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS *
                                1000)) {
                                1000)) {
   // Initialization.
   // Initialization.
+  gpr_mu_init(&lb_channel_mu_);
   grpc_subchannel_index_ref();
   grpc_subchannel_index_ref();
   GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_,
   GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_,
                     &GrpcLb::OnBalancerChannelConnectivityChangedLocked, this,
                     &GrpcLb::OnBalancerChannelConnectivityChangedLocked, this,
@@ -1078,6 +1081,7 @@ GrpcLb::GrpcLb(const grpc_lb_addresses* addresses,
 GrpcLb::~GrpcLb() {
 GrpcLb::~GrpcLb() {
   GPR_ASSERT(pending_picks_ == nullptr);
   GPR_ASSERT(pending_picks_ == nullptr);
   GPR_ASSERT(pending_pings_ == nullptr);
   GPR_ASSERT(pending_pings_ == nullptr);
+  gpr_mu_destroy(&lb_channel_mu_);
   gpr_free((void*)server_name_);
   gpr_free((void*)server_name_);
   grpc_channel_args_destroy(args_);
   grpc_channel_args_destroy(args_);
   grpc_connectivity_state_destroy(&state_tracker_);
   grpc_connectivity_state_destroy(&state_tracker_);
@@ -1107,8 +1111,10 @@ void GrpcLb::ShutdownLocked() {
   // OnBalancerChannelConnectivityChangedLocked(), and we need to be
   // OnBalancerChannelConnectivityChangedLocked(), and we need to be
   // alive when that callback is invoked.
   // alive when that callback is invoked.
   if (lb_channel_ != nullptr) {
   if (lb_channel_ != nullptr) {
+    gpr_mu_lock(&lb_channel_mu_);
     grpc_channel_destroy(lb_channel_);
     grpc_channel_destroy(lb_channel_);
     lb_channel_ = nullptr;
     lb_channel_ = nullptr;
+    gpr_mu_unlock(&lb_channel_mu_);
   }
   }
   grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN,
   grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN,
                               GRPC_ERROR_REF(error), "grpclb_shutdown");
                               GRPC_ERROR_REF(error), "grpclb_shutdown");
@@ -1279,6 +1285,20 @@ void GrpcLb::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) {
   }
   }
 }
 }
 
 
+void GrpcLb::FillChildRefsForChannelz(ChildRefsList* child_subchannels,
+                                      ChildRefsList* child_channels) {
+  // delegate to the RoundRobin to fill the children subchannels.
+  rr_policy_->FillChildRefsForChannelz(child_subchannels, child_channels);
+  mu_guard guard(&lb_channel_mu_);
+  if (lb_channel_ != nullptr) {
+    grpc_core::channelz::ChannelNode* channel_node =
+        grpc_channel_get_channelz_node(lb_channel_);
+    if (channel_node != nullptr) {
+      child_channels->push_back(channel_node->channel_uuid());
+    }
+  }
+}
+
 grpc_connectivity_state GrpcLb::CheckConnectivityLocked(
 grpc_connectivity_state GrpcLb::CheckConnectivityLocked(
     grpc_error** connectivity_error) {
     grpc_error** connectivity_error) {
   return grpc_connectivity_state_get(&state_tracker_, connectivity_error);
   return grpc_connectivity_state_get(&state_tracker_, connectivity_error);
@@ -1322,9 +1342,11 @@ void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
   if (lb_channel_ == nullptr) {
   if (lb_channel_ == nullptr) {
     char* uri_str;
     char* uri_str;
     gpr_asprintf(&uri_str, "fake:///%s", server_name_);
     gpr_asprintf(&uri_str, "fake:///%s", server_name_);
+    gpr_mu_lock(&lb_channel_mu_);
     lb_channel_ = grpc_client_channel_factory_create_channel(
     lb_channel_ = grpc_client_channel_factory_create_channel(
         client_channel_factory(), uri_str,
         client_channel_factory(), uri_str,
         GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, lb_channel_args);
         GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, lb_channel_args);
+    gpr_mu_unlock(&lb_channel_mu_);
     GPR_ASSERT(lb_channel_ != nullptr);
     GPR_ASSERT(lb_channel_ != nullptr);
     gpr_free(uri_str);
     gpr_free(uri_str);
   }
   }

+ 3 - 23
src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc

@@ -181,7 +181,7 @@ void PickFirst::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
 }
 }
 
 
 void PickFirst::ShutdownLocked() {
 void PickFirst::ShutdownLocked() {
-  AutoChildRefsUpdater gaurd(this);
+  AutoChildRefsUpdater guard(this);
   grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
   grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
   if (grpc_lb_pick_first_trace.enabled()) {
   if (grpc_lb_pick_first_trace.enabled()) {
     gpr_log(GPR_INFO, "Pick First %p Shutting down", this);
     gpr_log(GPR_INFO, "Pick First %p Shutting down", this);
@@ -327,30 +327,10 @@ void PickFirst::FillChildRefsForChannelz(
 void PickFirst::UpdateChildRefsLocked() {
 void PickFirst::UpdateChildRefsLocked() {
   ChildRefsList cs;
   ChildRefsList cs;
   if (subchannel_list_ != nullptr) {
   if (subchannel_list_ != nullptr) {
-    for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) {
-      if (subchannel_list_->subchannel(i)->subchannel() != nullptr) {
-        grpc_core::channelz::SubchannelNode* subchannel_node =
-            grpc_subchannel_get_channelz_node(
-                subchannel_list_->subchannel(i)->subchannel());
-        if (subchannel_node != nullptr) {
-          cs.push_back(subchannel_node->subchannel_uuid());
-        }
-      }
-    }
+    subchannel_list_->PopulateChildRefsList(&cs);
   }
   }
   if (latest_pending_subchannel_list_ != nullptr) {
   if (latest_pending_subchannel_list_ != nullptr) {
-    for (size_t i = 0; i < latest_pending_subchannel_list_->num_subchannels();
-         ++i) {
-      if (latest_pending_subchannel_list_->subchannel(i)->subchannel() !=
-          nullptr) {
-        grpc_core::channelz::SubchannelNode* subchannel_node =
-            grpc_subchannel_get_channelz_node(
-                latest_pending_subchannel_list_->subchannel(i)->subchannel());
-        if (subchannel_node != nullptr) {
-          cs.push_back(subchannel_node->subchannel_uuid());
-        }
-      }
-    }
+    latest_pending_subchannel_list_->PopulateChildRefsList(&cs);
   }
   }
   // atomically update the data that channelz will actually be looking at.
   // atomically update the data that channelz will actually be looking at.
   mu_guard guard(&child_refs_mu_);
   mu_guard guard(&child_refs_mu_);

+ 57 - 2
src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc

@@ -69,9 +69,8 @@ class RoundRobin : public LoadBalancingPolicy {
   void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
   void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
   void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override;
   void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override;
   void ExitIdleLocked() override;
   void ExitIdleLocked() override;
-  // TODO(ncteisen): implement this in a follow up PR
   void FillChildRefsForChannelz(ChildRefsList* child_subchannels,
   void FillChildRefsForChannelz(ChildRefsList* child_subchannels,
-                                ChildRefsList* child_channels) override {}
+                                ChildRefsList* ignored) override;
 
 
  private:
  private:
   ~RoundRobin();
   ~RoundRobin();
@@ -183,11 +182,24 @@ class RoundRobin : public LoadBalancingPolicy {
     size_t last_ready_index_ = -1;  // Index into list of last pick.
     size_t last_ready_index_ = -1;  // Index into list of last pick.
   };
   };
 
 
+  // Helper class to ensure that any function that modifies the child refs
+  // data structures will update the channelz snapshot data structures before
+  // returning.
+  class AutoChildRefsUpdater {
+   public:
+    explicit AutoChildRefsUpdater(RoundRobin* rr) : rr_(rr) {}
+    ~AutoChildRefsUpdater() { rr_->UpdateChildRefsLocked(); }
+
+   private:
+    RoundRobin* rr_;
+  };
+
   void ShutdownLocked() override;
   void ShutdownLocked() override;
 
 
   void StartPickingLocked();
   void StartPickingLocked();
   bool DoPickLocked(PickState* pick);
   bool DoPickLocked(PickState* pick);
   void DrainPendingPicksLocked();
   void DrainPendingPicksLocked();
+  void UpdateChildRefsLocked();
 
 
   /** list of subchannels */
   /** list of subchannels */
   OrphanablePtr<RoundRobinSubchannelList> subchannel_list_;
   OrphanablePtr<RoundRobinSubchannelList> subchannel_list_;
@@ -205,10 +217,16 @@ class RoundRobin : public LoadBalancingPolicy {
   PickState* pending_picks_ = nullptr;
   PickState* pending_picks_ = nullptr;
   /** our connectivity state tracker */
   /** our connectivity state tracker */
   grpc_connectivity_state_tracker state_tracker_;
   grpc_connectivity_state_tracker state_tracker_;
+  /// Lock and data used to capture snapshots of this channel's child
+  /// channels and subchannels. This data is consumed by channelz.
+  gpr_mu child_refs_mu_;
+  ChildRefsList child_subchannels_;
+  ChildRefsList child_channels_;
 };
 };
 
 
 RoundRobin::RoundRobin(const Args& args) : LoadBalancingPolicy(args) {
 RoundRobin::RoundRobin(const Args& args) : LoadBalancingPolicy(args) {
   GPR_ASSERT(args.client_channel_factory != nullptr);
   GPR_ASSERT(args.client_channel_factory != nullptr);
+  gpr_mu_init(&child_refs_mu_);
   grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
   grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
                                "round_robin");
                                "round_robin");
   UpdateLocked(*args.args);
   UpdateLocked(*args.args);
@@ -223,6 +241,7 @@ RoundRobin::~RoundRobin() {
   if (grpc_lb_round_robin_trace.enabled()) {
   if (grpc_lb_round_robin_trace.enabled()) {
     gpr_log(GPR_INFO, "[RR %p] Destroying Round Robin policy", this);
     gpr_log(GPR_INFO, "[RR %p] Destroying Round Robin policy", this);
   }
   }
+  gpr_mu_destroy(&child_refs_mu_);
   GPR_ASSERT(subchannel_list_ == nullptr);
   GPR_ASSERT(subchannel_list_ == nullptr);
   GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
   GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
   GPR_ASSERT(pending_picks_ == nullptr);
   GPR_ASSERT(pending_picks_ == nullptr);
@@ -242,6 +261,7 @@ void RoundRobin::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
 }
 }
 
 
 void RoundRobin::ShutdownLocked() {
 void RoundRobin::ShutdownLocked() {
+  AutoChildRefsUpdater guard(this);
   grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
   grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
   if (grpc_lb_round_robin_trace.enabled()) {
   if (grpc_lb_round_robin_trace.enabled()) {
     gpr_log(GPR_INFO, "[RR %p] Shutting down", this);
     gpr_log(GPR_INFO, "[RR %p] Shutting down", this);
@@ -365,6 +385,39 @@ bool RoundRobin::PickLocked(PickState* pick) {
   return false;
   return false;
 }
 }
 
 
+void RoundRobin::FillChildRefsForChannelz(
+    ChildRefsList* child_subchannels_to_fill, ChildRefsList* ignored) {
+  mu_guard guard(&child_refs_mu_);
+  for (size_t i = 0; i < child_subchannels_.size(); ++i) {
+    // TODO(ncteisen): implement a de dup loop that is not O(n^2). Might
+    // have to implement lightweight set. For now, we don't care about
+    // performance when channelz requests are made.
+    bool found = false;
+    for (size_t j = 0; j < child_subchannels_to_fill->size(); ++j) {
+      if ((*child_subchannels_to_fill)[j] == child_subchannels_[i]) {
+        found = true;
+        break;
+      }
+    }
+    if (!found) {
+      child_subchannels_to_fill->push_back(child_subchannels_[i]);
+    }
+  }
+}
+
+void RoundRobin::UpdateChildRefsLocked() {
+  ChildRefsList cs;
+  if (subchannel_list_ != nullptr) {
+    subchannel_list_->PopulateChildRefsList(&cs);
+  }
+  if (latest_pending_subchannel_list_ != nullptr) {
+    latest_pending_subchannel_list_->PopulateChildRefsList(&cs);
+  }
+  // atomically update the data that channelz will actually be looking at.
+  mu_guard guard(&child_refs_mu_);
+  child_subchannels_ = std::move(cs);
+}
+
 void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() {
 void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() {
   if (num_subchannels() == 0) return;
   if (num_subchannels() == 0) return;
   // Check current state of each subchannel synchronously, since any
   // Check current state of each subchannel synchronously, since any
@@ -455,6 +508,7 @@ void RoundRobin::RoundRobinSubchannelList::
 void RoundRobin::RoundRobinSubchannelList::
 void RoundRobin::RoundRobinSubchannelList::
     UpdateRoundRobinStateFromSubchannelStateCountsLocked() {
     UpdateRoundRobinStateFromSubchannelStateCountsLocked() {
   RoundRobin* p = static_cast<RoundRobin*>(policy());
   RoundRobin* p = static_cast<RoundRobin*>(policy());
+  AutoChildRefsUpdater guard(p);
   if (num_ready_ > 0) {
   if (num_ready_ > 0) {
     if (p->subchannel_list_.get() != this) {
     if (p->subchannel_list_.get() != this) {
       // Promote this list to p->subchannel_list_.
       // Promote this list to p->subchannel_list_.
@@ -611,6 +665,7 @@ void RoundRobin::PingOneLocked(grpc_closure* on_initiate,
 
 
 void RoundRobin::UpdateLocked(const grpc_channel_args& args) {
 void RoundRobin::UpdateLocked(const grpc_channel_args& args) {
   const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
   const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
+  AutoChildRefsUpdater guard(this);
   if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) {
   if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) {
     gpr_log(GPR_ERROR, "[RR %p] update provided no addresses; ignoring", this);
     gpr_log(GPR_ERROR, "[RR %p] update provided no addresses; ignoring", this);
     // If we don't have a current subchannel list, go into TRANSIENT_FAILURE.
     // If we don't have a current subchannel list, go into TRANSIENT_FAILURE.

+ 13 - 0
src/core/ext/filters/client_channel/lb_policy/subchannel_list.h

@@ -189,6 +189,19 @@ class SubchannelList
   // Returns true if the subchannel list is shutting down.
   // Returns true if the subchannel list is shutting down.
   bool shutting_down() const { return shutting_down_; }
   bool shutting_down() const { return shutting_down_; }
 
 
+  // Populates refs_list with the uuids of this SubchannelLists's subchannels.
+  void PopulateChildRefsList(ChildRefsList* refs_list) {
+    for (size_t i = 0; i < subchannels_.size(); ++i) {
+      if (subchannels_[i].subchannel() != nullptr) {
+        grpc_core::channelz::SubchannelNode* subchannel_node =
+            grpc_subchannel_get_channelz_node(subchannels_[i].subchannel());
+        if (subchannel_node != nullptr) {
+          refs_list->push_back(subchannel_node->subchannel_uuid());
+        }
+      }
+    }
+  }
+
   // Accessors.
   // Accessors.
   LoadBalancingPolicy* policy() const { return policy_; }
   LoadBalancingPolicy* policy() const { return policy_; }
   TraceFlag* tracer() const { return tracer_; }
   TraceFlag* tracer() const { return tracer_; }

+ 21 - 0
src/core/lib/channel/channelz_registry.cc

@@ -121,3 +121,24 @@ char* ChannelzRegistry::InternalGetTopChannels(intptr_t start_channel_id) {
 
 
 }  // namespace channelz
 }  // namespace channelz
 }  // namespace grpc_core
 }  // namespace grpc_core
+
+char* grpc_channelz_get_top_channels(intptr_t start_channel_id) {
+  return grpc_core::channelz::ChannelzRegistry::GetTopChannels(
+      start_channel_id);
+}
+
+char* grpc_channelz_get_channel(intptr_t channel_id) {
+  grpc_core::channelz::ChannelNode* channel_node =
+      grpc_core::channelz::ChannelzRegistry::GetChannelNode(channel_id);
+  if (channel_node == nullptr) {
+    return nullptr;
+  }
+  grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT);
+  grpc_json* json = top_level_json;
+  grpc_json* channel_json = channel_node->RenderJson();
+  channel_json->key = "channel";
+  grpc_json_link_child(json, channel_json, nullptr);
+  char* json_str = grpc_json_dump_to_string(top_level_json, 0);
+  grpc_json_destroy(top_level_json);
+  return json_str;
+}

+ 5 - 1
src/core/lib/iomgr/lockfree_event.cc

@@ -89,7 +89,11 @@ void LockfreeEvent::DestroyEvent() {
 
 
 void LockfreeEvent::NotifyOn(grpc_closure* closure) {
 void LockfreeEvent::NotifyOn(grpc_closure* closure) {
   while (true) {
   while (true) {
-    gpr_atm curr = gpr_atm_no_barrier_load(&state_);
+    /* This load needs to be an acquire load because this can be a shutdown
+     * error that we might need to reference. Adding acquire semantics makes
+     * sure that the shutdown error has been initialized properly before us
+     * referencing it. */
+    gpr_atm curr = gpr_atm_acq_load(&state_);
     if (grpc_polling_trace.enabled()) {
     if (grpc_polling_trace.enabled()) {
       gpr_log(GPR_ERROR, "LockfreeEvent::NotifyOn: %p curr=%p closure=%p", this,
       gpr_log(GPR_ERROR, "LockfreeEvent::NotifyOn: %p curr=%p closure=%p", this,
               (void*)curr, closure);
               (void*)curr, closure);

+ 1 - 1
src/csharp/doc/docfx.json

@@ -24,7 +24,7 @@
         "dest": "api"
         "dest": "api"
       },
       },
       {
       {
-        "files": [ "toc.yml"],
+        "files": [ "toc.yml"]
       }
       }
     ],
     ],
     "globalMetadata": {
     "globalMetadata": {

+ 4 - 0
src/ruby/ext/grpc/rb_grpc_imports.generated.c

@@ -92,6 +92,8 @@ grpc_resource_quota_ref_type grpc_resource_quota_ref_import;
 grpc_resource_quota_unref_type grpc_resource_quota_unref_import;
 grpc_resource_quota_unref_type grpc_resource_quota_unref_import;
 grpc_resource_quota_resize_type grpc_resource_quota_resize_import;
 grpc_resource_quota_resize_type grpc_resource_quota_resize_import;
 grpc_resource_quota_arg_vtable_type grpc_resource_quota_arg_vtable_import;
 grpc_resource_quota_arg_vtable_type grpc_resource_quota_arg_vtable_import;
+grpc_channelz_get_top_channels_type grpc_channelz_get_top_channels_import;
+grpc_channelz_get_channel_type grpc_channelz_get_channel_import;
 grpc_insecure_channel_create_from_fd_type grpc_insecure_channel_create_from_fd_import;
 grpc_insecure_channel_create_from_fd_type grpc_insecure_channel_create_from_fd_import;
 grpc_server_add_insecure_channel_from_fd_type grpc_server_add_insecure_channel_from_fd_import;
 grpc_server_add_insecure_channel_from_fd_type grpc_server_add_insecure_channel_from_fd_import;
 grpc_use_signal_type grpc_use_signal_import;
 grpc_use_signal_type grpc_use_signal_import;
@@ -340,6 +342,8 @@ void grpc_rb_load_imports(HMODULE library) {
   grpc_resource_quota_unref_import = (grpc_resource_quota_unref_type) GetProcAddress(library, "grpc_resource_quota_unref");
   grpc_resource_quota_unref_import = (grpc_resource_quota_unref_type) GetProcAddress(library, "grpc_resource_quota_unref");
   grpc_resource_quota_resize_import = (grpc_resource_quota_resize_type) GetProcAddress(library, "grpc_resource_quota_resize");
   grpc_resource_quota_resize_import = (grpc_resource_quota_resize_type) GetProcAddress(library, "grpc_resource_quota_resize");
   grpc_resource_quota_arg_vtable_import = (grpc_resource_quota_arg_vtable_type) GetProcAddress(library, "grpc_resource_quota_arg_vtable");
   grpc_resource_quota_arg_vtable_import = (grpc_resource_quota_arg_vtable_type) GetProcAddress(library, "grpc_resource_quota_arg_vtable");
+  grpc_channelz_get_top_channels_import = (grpc_channelz_get_top_channels_type) GetProcAddress(library, "grpc_channelz_get_top_channels");
+  grpc_channelz_get_channel_import = (grpc_channelz_get_channel_type) GetProcAddress(library, "grpc_channelz_get_channel");
   grpc_insecure_channel_create_from_fd_import = (grpc_insecure_channel_create_from_fd_type) GetProcAddress(library, "grpc_insecure_channel_create_from_fd");
   grpc_insecure_channel_create_from_fd_import = (grpc_insecure_channel_create_from_fd_type) GetProcAddress(library, "grpc_insecure_channel_create_from_fd");
   grpc_server_add_insecure_channel_from_fd_import = (grpc_server_add_insecure_channel_from_fd_type) GetProcAddress(library, "grpc_server_add_insecure_channel_from_fd");
   grpc_server_add_insecure_channel_from_fd_import = (grpc_server_add_insecure_channel_from_fd_type) GetProcAddress(library, "grpc_server_add_insecure_channel_from_fd");
   grpc_use_signal_import = (grpc_use_signal_type) GetProcAddress(library, "grpc_use_signal");
   grpc_use_signal_import = (grpc_use_signal_type) GetProcAddress(library, "grpc_use_signal");

+ 6 - 0
src/ruby/ext/grpc/rb_grpc_imports.generated.h

@@ -251,6 +251,12 @@ extern grpc_resource_quota_resize_type grpc_resource_quota_resize_import;
 typedef const grpc_arg_pointer_vtable*(*grpc_resource_quota_arg_vtable_type)(void);
 typedef const grpc_arg_pointer_vtable*(*grpc_resource_quota_arg_vtable_type)(void);
 extern grpc_resource_quota_arg_vtable_type grpc_resource_quota_arg_vtable_import;
 extern grpc_resource_quota_arg_vtable_type grpc_resource_quota_arg_vtable_import;
 #define grpc_resource_quota_arg_vtable grpc_resource_quota_arg_vtable_import
 #define grpc_resource_quota_arg_vtable grpc_resource_quota_arg_vtable_import
+typedef char*(*grpc_channelz_get_top_channels_type)(intptr_t start_channel_id);
+extern grpc_channelz_get_top_channels_type grpc_channelz_get_top_channels_import;
+#define grpc_channelz_get_top_channels grpc_channelz_get_top_channels_import
+typedef char*(*grpc_channelz_get_channel_type)(intptr_t channel_id);
+extern grpc_channelz_get_channel_type grpc_channelz_get_channel_import;
+#define grpc_channelz_get_channel grpc_channelz_get_channel_import
 typedef grpc_channel*(*grpc_insecure_channel_create_from_fd_type)(const char* target, int fd, const grpc_channel_args* args);
 typedef grpc_channel*(*grpc_insecure_channel_create_from_fd_type)(const char* target, int fd, const grpc_channel_args* args);
 extern grpc_insecure_channel_create_from_fd_type grpc_insecure_channel_create_from_fd_import;
 extern grpc_insecure_channel_create_from_fd_type grpc_insecure_channel_create_from_fd_import;
 #define grpc_insecure_channel_create_from_fd grpc_insecure_channel_create_from_fd_import
 #define grpc_insecure_channel_create_from_fd grpc_insecure_channel_create_from_fd_import

+ 10 - 0
test/core/channel/channelz_test.cc

@@ -95,6 +95,11 @@ void ValidateGetTopChannels(size_t expected_channels) {
   EXPECT_EQ(end->type, GRPC_JSON_TRUE);
   EXPECT_EQ(end->type, GRPC_JSON_TRUE);
   grpc_json_destroy(parsed_json);
   grpc_json_destroy(parsed_json);
   gpr_free(json_str);
   gpr_free(json_str);
+  // also check that the core API formats this correctly
+  char* core_api_json_str = grpc_channelz_get_top_channels(0);
+  grpc::testing::ValidateGetTopChannelsResponseProtoJsonTranslation(
+      core_api_json_str);
+  gpr_free(core_api_json_str);
 }
 }
 
 
 class ChannelFixture {
 class ChannelFixture {
@@ -151,6 +156,11 @@ void ValidateChannel(ChannelNode* channel, validate_channel_data_args args) {
   grpc::testing::ValidateChannelProtoJsonTranslation(json_str);
   grpc::testing::ValidateChannelProtoJsonTranslation(json_str);
   ValidateCounters(json_str, args);
   ValidateCounters(json_str, args);
   gpr_free(json_str);
   gpr_free(json_str);
+  // also check that the core API formats this the correct way
+  char* core_api_json_str = grpc_channelz_get_channel(channel->channel_uuid());
+  grpc::testing::ValidateGetChannelResponseProtoJsonTranslation(
+      core_api_json_str);
+  gpr_free(core_api_json_str);
 }
 }
 
 
 grpc_millis GetLastCallStartedMillis(ChannelNode* channel) {
 grpc_millis GetLastCallStartedMillis(ChannelNode* channel) {

+ 2 - 0
test/core/surface/public_headers_must_be_c89.c

@@ -131,6 +131,8 @@ int main(int argc, char **argv) {
   printf("%lx", (unsigned long) grpc_resource_quota_unref);
   printf("%lx", (unsigned long) grpc_resource_quota_unref);
   printf("%lx", (unsigned long) grpc_resource_quota_resize);
   printf("%lx", (unsigned long) grpc_resource_quota_resize);
   printf("%lx", (unsigned long) grpc_resource_quota_arg_vtable);
   printf("%lx", (unsigned long) grpc_resource_quota_arg_vtable);
+  printf("%lx", (unsigned long) grpc_channelz_get_top_channels);
+  printf("%lx", (unsigned long) grpc_channelz_get_channel);
   printf("%lx", (unsigned long) grpc_auth_property_iterator_next);
   printf("%lx", (unsigned long) grpc_auth_property_iterator_next);
   printf("%lx", (unsigned long) grpc_auth_context_property_iterator);
   printf("%lx", (unsigned long) grpc_auth_context_property_iterator);
   printf("%lx", (unsigned long) grpc_auth_context_peer_identity);
   printf("%lx", (unsigned long) grpc_auth_context_peer_identity);

+ 14 - 2
test/cpp/interop/client.cc

@@ -46,6 +46,7 @@ DEFINE_string(
     "all : all test cases;\n"
     "all : all test cases;\n"
     "cancel_after_begin : cancel stream after starting it;\n"
     "cancel_after_begin : cancel stream after starting it;\n"
     "cancel_after_first_response: cancel on first response;\n"
     "cancel_after_first_response: cancel on first response;\n"
+    "channel_soak: sends 'soak_iterations' rpcs, rebuilds channel each time;\n"
     "client_compressed_streaming : compressed request streaming with "
     "client_compressed_streaming : compressed request streaming with "
     "client_compressed_unary : single compressed request;\n"
     "client_compressed_unary : single compressed request;\n"
     "client_streaming : request streaming with single response;\n"
     "client_streaming : request streaming with single response;\n"
@@ -60,6 +61,7 @@ DEFINE_string(
     "per_rpc_creds: raw oauth2 access token on a single rpc;\n"
     "per_rpc_creds: raw oauth2 access token on a single rpc;\n"
     "ping_pong : full-duplex streaming;\n"
     "ping_pong : full-duplex streaming;\n"
     "response streaming;\n"
     "response streaming;\n"
+    "rpc_soak: 'sends soak_iterations' large_unary rpcs;\n"
     "server_compressed_streaming : single request with compressed "
     "server_compressed_streaming : single request with compressed "
     "server_compressed_unary : single compressed response;\n"
     "server_compressed_unary : single compressed response;\n"
     "server_streaming : single request with response streaming;\n"
     "server_streaming : single request with response streaming;\n"
@@ -83,6 +85,10 @@ DEFINE_bool(do_not_abort_on_transient_failures, false,
             "test is retried in case of transient failures (and currently the "
             "test is retried in case of transient failures (and currently the "
             "interop tests are not retried even if this flag is set to true)");
             "interop tests are not retried even if this flag is set to true)");
 
 
+DEFINE_int32(soak_iterations, 1000,
+             "number of iterations to use for the two soak tests; rpc_soak and "
+             "channel_soak");
+
 using grpc::testing::CreateChannelForTestCase;
 using grpc::testing::CreateChannelForTestCase;
 using grpc::testing::GetServiceAccountJsonKey;
 using grpc::testing::GetServiceAccountJsonKey;
 using grpc::testing::UpdateActions;
 using grpc::testing::UpdateActions;
@@ -91,8 +97,9 @@ int main(int argc, char** argv) {
   grpc::testing::InitTest(&argc, &argv, true);
   grpc::testing::InitTest(&argc, &argv, true);
   gpr_log(GPR_INFO, "Testing these cases: %s", FLAGS_test_case.c_str());
   gpr_log(GPR_INFO, "Testing these cases: %s", FLAGS_test_case.c_str());
   int ret = 0;
   int ret = 0;
-  grpc::testing::InteropClient client(CreateChannelForTestCase(FLAGS_test_case),
-                                      true,
+  grpc::testing::ChannelCreationFunc channel_creation_func =
+      std::bind(&CreateChannelForTestCase, FLAGS_test_case);
+  grpc::testing::InteropClient client(channel_creation_func, true,
                                       FLAGS_do_not_abort_on_transient_failures);
                                       FLAGS_do_not_abort_on_transient_failures);
 
 
   std::unordered_map<grpc::string, std::function<bool()>> actions;
   std::unordered_map<grpc::string, std::function<bool()>> actions;
@@ -151,6 +158,11 @@ int main(int argc, char** argv) {
       std::bind(&grpc::testing::InteropClient::DoUnimplementedService, &client);
       std::bind(&grpc::testing::InteropClient::DoUnimplementedService, &client);
   actions["cacheable_unary"] =
   actions["cacheable_unary"] =
       std::bind(&grpc::testing::InteropClient::DoCacheableUnary, &client);
       std::bind(&grpc::testing::InteropClient::DoCacheableUnary, &client);
+  actions["channel_soak"] =
+      std::bind(&grpc::testing::InteropClient::DoChannelSoakTest, &client,
+                FLAGS_soak_iterations);
+  actions["rpc_soak"] = std::bind(&grpc::testing::InteropClient::DoRpcSoakTest,
+                                  &client, FLAGS_soak_iterations);
 
 
   UpdateActions(&actions);
   UpdateActions(&actions);
 
 

+ 44 - 20
test/cpp/interop/interop_client.cc

@@ -74,13 +74,15 @@ void UnaryCompressionChecks(const InteropClientContextInspector& inspector,
 }
 }
 }  // namespace
 }  // namespace
 
 
-InteropClient::ServiceStub::ServiceStub(const std::shared_ptr<Channel>& channel,
-                                        bool new_stub_every_call)
-    : channel_(channel), new_stub_every_call_(new_stub_every_call) {
+InteropClient::ServiceStub::ServiceStub(
+    ChannelCreationFunc channel_creation_func, bool new_stub_every_call)
+    : channel_creation_func_(channel_creation_func),
+      channel_(channel_creation_func_()),
+      new_stub_every_call_(new_stub_every_call) {
   // If new_stub_every_call is false, then this is our chance to initialize
   // If new_stub_every_call is false, then this is our chance to initialize
   // stub_. (see Get())
   // stub_. (see Get())
   if (!new_stub_every_call) {
   if (!new_stub_every_call) {
-    stub_ = TestService::NewStub(channel);
+    stub_ = TestService::NewStub(channel_);
   }
   }
 }
 }
 
 
@@ -100,27 +102,17 @@ InteropClient::ServiceStub::GetUnimplementedServiceStub() {
   return unimplemented_service_stub_.get();
   return unimplemented_service_stub_.get();
 }
 }
 
 
-void InteropClient::ServiceStub::Reset(
-    const std::shared_ptr<Channel>& channel) {
-  channel_ = channel;
-
-  // Update stub_ as well. Note: If new_stub_every_call_ is true, we can reset
-  // the stub_ since the next call to Get() will create a new stub
-  if (new_stub_every_call_) {
-    stub_.reset();
-  } else {
-    stub_ = TestService::NewStub(channel);
+void InteropClient::ServiceStub::ResetChannel() {
+  channel_ = channel_creation_func_();
+  if (!new_stub_every_call_) {
+    stub_ = TestService::NewStub(channel_);
   }
   }
 }
 }
 
 
-void InteropClient::Reset(const std::shared_ptr<Channel>& channel) {
-  serviceStub_.Reset(std::move(channel));
-}
-
-InteropClient::InteropClient(const std::shared_ptr<Channel>& channel,
+InteropClient::InteropClient(ChannelCreationFunc channel_creation_func,
                              bool new_stub_every_test_case,
                              bool new_stub_every_test_case,
                              bool do_not_abort_on_transient_failures)
                              bool do_not_abort_on_transient_failures)
-    : serviceStub_(std::move(channel), new_stub_every_test_case),
+    : serviceStub_(channel_creation_func, new_stub_every_test_case),
       do_not_abort_on_transient_failures_(do_not_abort_on_transient_failures) {}
       do_not_abort_on_transient_failures_(do_not_abort_on_transient_failures) {}
 
 
 bool InteropClient::AssertStatusOk(const Status& s,
 bool InteropClient::AssertStatusOk(const Status& s,
@@ -1028,6 +1020,38 @@ bool InteropClient::DoCustomMetadata() {
   return true;
   return true;
 }
 }
 
 
+bool InteropClient::DoRpcSoakTest(int32_t soak_iterations) {
+  gpr_log(GPR_DEBUG, "Sending %d RPCs...", soak_iterations);
+  GPR_ASSERT(soak_iterations > 0);
+  SimpleRequest request;
+  SimpleResponse response;
+  for (int i = 0; i < soak_iterations; ++i) {
+    if (!PerformLargeUnary(&request, &response)) {
+      gpr_log(GPR_ERROR, "rpc_soak test failed on iteration %d", i);
+      return false;
+    }
+  }
+  gpr_log(GPR_DEBUG, "rpc_soak test done.");
+  return true;
+}
+
+bool InteropClient::DoChannelSoakTest(int32_t soak_iterations) {
+  gpr_log(GPR_DEBUG, "Sending %d RPCs, tearing down the channel each time...",
+          soak_iterations);
+  GPR_ASSERT(soak_iterations > 0);
+  SimpleRequest request;
+  SimpleResponse response;
+  for (int i = 0; i < soak_iterations; ++i) {
+    serviceStub_.ResetChannel();
+    if (!PerformLargeUnary(&request, &response)) {
+      gpr_log(GPR_ERROR, "channel_soak test failed on iteration %d", i);
+      return false;
+    }
+  }
+  gpr_log(GPR_DEBUG, "channel_soak test done.");
+  return true;
+}
+
 bool InteropClient::DoUnimplementedService() {
 bool InteropClient::DoUnimplementedService() {
   gpr_log(GPR_DEBUG, "Sending a request for an unimplemented service...");
   gpr_log(GPR_DEBUG, "Sending a request for an unimplemented service...");
 
 

+ 16 - 4
test/cpp/interop/interop_client.h

@@ -34,13 +34,15 @@ typedef std::function<void(const InteropClientContextInspector&,
                            const SimpleRequest*, const SimpleResponse*)>
                            const SimpleRequest*, const SimpleResponse*)>
     CheckerFn;
     CheckerFn;
 
 
+typedef std::function<std::shared_ptr<Channel>(void)> ChannelCreationFunc;
+
 class InteropClient {
 class InteropClient {
  public:
  public:
   /// If new_stub_every_test_case is true, a new TestService::Stub object is
   /// If new_stub_every_test_case is true, a new TestService::Stub object is
   /// created for every test case
   /// created for every test case
   /// If do_not_abort_on_transient_failures is true, abort() is not called in
   /// If do_not_abort_on_transient_failures is true, abort() is not called in
   /// case of transient failures (like connection failures)
   /// case of transient failures (like connection failures)
-  explicit InteropClient(const std::shared_ptr<Channel>& channel,
+  explicit InteropClient(ChannelCreationFunc channel_creation_func,
                          bool new_stub_every_test_case,
                          bool new_stub_every_test_case,
                          bool do_not_abort_on_transient_failures);
                          bool do_not_abort_on_transient_failures);
   ~InteropClient() {}
   ~InteropClient() {}
@@ -67,6 +69,14 @@ class InteropClient {
   bool DoUnimplementedMethod();
   bool DoUnimplementedMethod();
   bool DoUnimplementedService();
   bool DoUnimplementedService();
   bool DoCacheableUnary();
   bool DoCacheableUnary();
+
+  // The following interop test are not yet part of the interop spec, and are
+  // not implemented cross-language. They are considered experimental for now,
+  // but at some point in the future, might be codified and implemented in all
+  // languages
+  bool DoChannelSoakTest(int32_t soak_iterations);
+  bool DoRpcSoakTest(int32_t soak_iterations);
+
   // Auth tests.
   // Auth tests.
   // username is a string containing the user email
   // username is a string containing the user email
   bool DoJwtTokenCreds(const grpc::string& username);
   bool DoJwtTokenCreds(const grpc::string& username);
@@ -83,15 +93,17 @@ class InteropClient {
    public:
    public:
     // If new_stub_every_call = true, pointer to a new instance of
     // If new_stub_every_call = true, pointer to a new instance of
     // TestServce::Stub is returned by Get() everytime it is called
     // TestServce::Stub is returned by Get() everytime it is called
-    ServiceStub(const std::shared_ptr<Channel>& channel,
+    ServiceStub(ChannelCreationFunc channel_creation_func,
                 bool new_stub_every_call);
                 bool new_stub_every_call);
 
 
     TestService::Stub* Get();
     TestService::Stub* Get();
     UnimplementedService::Stub* GetUnimplementedServiceStub();
     UnimplementedService::Stub* GetUnimplementedServiceStub();
 
 
-    void Reset(const std::shared_ptr<Channel>& channel);
+    // forces channel to be recreated.
+    void ResetChannel();
 
 
    private:
    private:
+    ChannelCreationFunc channel_creation_func_;
     std::unique_ptr<TestService::Stub> stub_;
     std::unique_ptr<TestService::Stub> stub_;
     std::unique_ptr<UnimplementedService::Stub> unimplemented_service_stub_;
     std::unique_ptr<UnimplementedService::Stub> unimplemented_service_stub_;
     std::shared_ptr<Channel> channel_;
     std::shared_ptr<Channel> channel_;
@@ -109,8 +121,8 @@ class InteropClient {
   bool AssertStatusCode(const Status& s, StatusCode expected_code,
   bool AssertStatusCode(const Status& s, StatusCode expected_code,
                         const grpc::string& optional_debug_string);
                         const grpc::string& optional_debug_string);
   bool TransientFailureOrAbort();
   bool TransientFailureOrAbort();
-  ServiceStub serviceStub_;
 
 
+  ServiceStub serviceStub_;
   /// If true, abort() is not called for transient failures
   /// If true, abort() is not called for transient failures
   bool do_not_abort_on_transient_failures_;
   bool do_not_abort_on_transient_failures_;
 };
 };

+ 3 - 3
test/cpp/interop/stress_interop_client.cc

@@ -68,13 +68,13 @@ TestCaseType WeightedRandomTestSelector::GetNextTest() const {
 
 
 StressTestInteropClient::StressTestInteropClient(
 StressTestInteropClient::StressTestInteropClient(
     int test_id, const grpc::string& server_address,
     int test_id, const grpc::string& server_address,
-    const std::shared_ptr<Channel>& channel,
+    ChannelCreationFunc channel_creation_func,
     const WeightedRandomTestSelector& test_selector, long test_duration_secs,
     const WeightedRandomTestSelector& test_selector, long test_duration_secs,
     long sleep_duration_ms, bool do_not_abort_on_transient_failures)
     long sleep_duration_ms, bool do_not_abort_on_transient_failures)
     : test_id_(test_id),
     : test_id_(test_id),
       server_address_(server_address),
       server_address_(server_address),
-      channel_(channel),
-      interop_client_(new InteropClient(channel, false,
+      channel_creation_func_(channel_creation_func),
+      interop_client_(new InteropClient(channel_creation_func_, false,
                                         do_not_abort_on_transient_failures)),
                                         do_not_abort_on_transient_failures)),
       test_selector_(test_selector),
       test_selector_(test_selector),
       test_duration_secs_(test_duration_secs),
       test_duration_secs_(test_duration_secs),

+ 2 - 2
test/cpp/interop/stress_interop_client.h

@@ -91,7 +91,7 @@ class WeightedRandomTestSelector {
 class StressTestInteropClient {
 class StressTestInteropClient {
  public:
  public:
   StressTestInteropClient(int test_id, const grpc::string& server_address,
   StressTestInteropClient(int test_id, const grpc::string& server_address,
-                          const std::shared_ptr<Channel>& channel,
+                          ChannelCreationFunc channel_creation_func,
                           const WeightedRandomTestSelector& test_selector,
                           const WeightedRandomTestSelector& test_selector,
                           long test_duration_secs, long sleep_duration_ms,
                           long test_duration_secs, long sleep_duration_ms,
                           bool do_not_abort_on_transient_failures);
                           bool do_not_abort_on_transient_failures);
@@ -105,7 +105,7 @@ class StressTestInteropClient {
 
 
   int test_id_;
   int test_id_;
   const grpc::string& server_address_;
   const grpc::string& server_address_;
-  std::shared_ptr<Channel> channel_;
+  ChannelCreationFunc channel_creation_func_;
   std::unique_ptr<InteropClient> interop_client_;
   std::unique_ptr<InteropClient> interop_client_;
   const WeightedRandomTestSelector& test_selector_;
   const WeightedRandomTestSelector& test_selector_;
   long test_duration_secs_;
   long test_duration_secs_;

+ 8 - 3
test/cpp/interop/stress_test.cc

@@ -283,15 +283,20 @@ int main(int argc, char** argv) {
          channel_idx++) {
          channel_idx++) {
       gpr_log(GPR_INFO, "Starting test with %s channel_idx=%d..", it->c_str(),
       gpr_log(GPR_INFO, "Starting test with %s channel_idx=%d..", it->c_str(),
               channel_idx);
               channel_idx);
-      std::shared_ptr<grpc::Channel> channel = grpc::CreateTestChannel(
+      grpc::testing::ChannelCreationFunc channel_creation_func = std::bind(
+          static_cast<std::shared_ptr<grpc::Channel> (*)(
+              const grpc::string&, const grpc::string&,
+              grpc::testing::transport_security, bool)>(
+              grpc::CreateTestChannel),
           *it, FLAGS_server_host_override, security_type, !FLAGS_use_test_ca);
           *it, FLAGS_server_host_override, security_type, !FLAGS_use_test_ca);
 
 
       // Create stub(s) for each channel
       // Create stub(s) for each channel
       for (int stub_idx = 0; stub_idx < FLAGS_num_stubs_per_channel;
       for (int stub_idx = 0; stub_idx < FLAGS_num_stubs_per_channel;
            stub_idx++) {
            stub_idx++) {
         clients.emplace_back(new StressTestInteropClient(
         clients.emplace_back(new StressTestInteropClient(
-            ++thread_idx, *it, channel, test_selector, FLAGS_test_duration_secs,
-            FLAGS_sleep_duration_ms, FLAGS_do_not_abort_on_transient_failures));
+            ++thread_idx, *it, channel_creation_func, test_selector,
+            FLAGS_test_duration_secs, FLAGS_sleep_duration_ms,
+            FLAGS_do_not_abort_on_transient_failures));
 
 
         bool is_already_created = false;
         bool is_already_created = false;
         // QpsGauge name
         // QpsGauge name

+ 5 - 0
test/cpp/util/channel_trace_proto_helper.cc

@@ -77,5 +77,10 @@ void ValidateGetTopChannelsResponseProtoJsonTranslation(char* json_c_str) {
       json_c_str);
       json_c_str);
 }
 }
 
 
+void ValidateGetChannelResponseProtoJsonTranslation(char* json_c_str) {
+  VaidateProtoJsonTranslation<grpc::channelz::v1::GetChannelResponse>(
+      json_c_str);
+}
+
 }  // namespace testing
 }  // namespace testing
 }  // namespace grpc
 }  // namespace grpc

+ 1 - 0
test/cpp/util/channel_trace_proto_helper.h

@@ -25,6 +25,7 @@ namespace testing {
 void ValidateChannelTraceProtoJsonTranslation(char* json_c_str);
 void ValidateChannelTraceProtoJsonTranslation(char* json_c_str);
 void ValidateChannelProtoJsonTranslation(char* json_c_str);
 void ValidateChannelProtoJsonTranslation(char* json_c_str);
 void ValidateGetTopChannelsResponseProtoJsonTranslation(char* json_c_str);
 void ValidateGetTopChannelsResponseProtoJsonTranslation(char* json_c_str);
+void ValidateGetChannelResponseProtoJsonTranslation(char* json_c_str);
 
 
 }  // namespace testing
 }  // namespace testing
 }  // namespace grpc
 }  // namespace grpc

+ 26 - 0
tools/internal_ci/linux/grpc_publish_packages.cfg

@@ -0,0 +1,26 @@
+# Copyright 2018 The gRPC Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Config file for the internal CI (in protobuf text format)
+
+# Location of the continuous shell script in repository.
+build_file: "grpc/tools/internal_ci/linux/grpc_publish_packages.sh"
+timeout_mins: 120
+action {
+  define_artifacts {
+    regex: "**/*sponge_log.xml"
+    regex: "github/grpc/reports/**"
+    regex: "github/grpc/artifacts/**"
+  }
+}

+ 110 - 0
tools/internal_ci/linux/grpc_publish_packages.sh

@@ -0,0 +1,110 @@
+#!/bin/bash
+# Copyright 2018 The gRPC Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -ex
+
+shopt -s nullglob
+
+export GOOGLE_APPLICATION_CREDENTIALS=${KOKORO_GFILE_DIR}/GrpcTesting-d0eeee2db331.json
+
+GCS_ROOT=gs://packages.grpc.io
+MANIFEST_FILE=index.xml
+ARCHIVE_UUID=${KOKORO_BUILD_ID:-$(uuidgen)}
+GIT_BRANCH_NAME=master #${KOKORO_GITHUB_COMMIT:-master}
+GIT_COMMIT=${KOKORO_GIT_COMMIT:-unknown}
+ARCHIVE_TIMESTAMP=$(date -Iseconds)
+TARGET_DIR=$(mktemp -d grpc_publish_packages.sh.XXXX)
+YEAR_MONTH_PREFIX=$(date "+%Y/%m")
+YEAR_PREFIX=${YEAR_MONTH_PREFIX%%/*}
+UPLOAD_ROOT=$TARGET_DIR/$YEAR_PREFIX
+RELATIVE_PATH=$YEAR_MONTH_PREFIX/$ARCHIVE_UUID
+BUILD_ROOT=$TARGET_DIR/$RELATIVE_PATH
+
+LINUX_PACKAGES=$KOKORO_GFILE_DIR/github/grpc/artifacts
+WINDOWS_PACKAGES=$KOKORO_GFILE_DIR/github/grpc/artifacts
+# TODO(mmx): enable linux_extra
+# LINUX_EXTRA_PACKAGES=$KOKORO_GFILE_DIR/github/grpc/artifacts
+
+PYTHON_PACKAGES=(
+  "$LINUX_PACKAGES"/grpcio-[0-9]*.whl
+  "$LINUX_PACKAGES"/grpcio-[0-9]*.tar.gz
+  "$LINUX_PACKAGES"/grpcio_tools-[0-9]*.whl
+  "$LINUX_PACKAGES"/grpcio-tools-[0-9]*.tar.gz
+  "$LINUX_PACKAGES"/grpcio-health-checking-[0-9]*.tar.gz
+  "$LINUX_PACKAGES"/grpcio-reflection-[0-9]*.tar.gz
+  "$LINUX_PACKAGES"/grpcio-testing-[0-9]*.tar.gz
+  #"$LINUX_EXTRA_PACKAGES"/grpcio-[0-9]*.whl
+  #"$LINUX_EXTRA_PACKAGES"/grpcio_tools-[0-9]*.whl
+)
+
+PHP_PACKAGES=(
+  "$LINUX_PACKAGES"/grpc-[0-9]*.tgz
+)
+
+RUBY_PACKAGES=(
+  "$LINUX_PACKAGES"/grpc-[0-9]*.gem
+  "$LINUX_PACKAGES"/grpc-tools-[0-9]*.gem
+)
+
+CSHARP_PACKAGES=(
+  "$WINDOWS_PACKAGES"/csharp_nugets_windows_dotnetcli.zip
+)
+
+function add_to_manifest() {
+  local xml_type=$1
+  local xml_name
+  xml_name=$(basename "$2")
+  local xml_sha256
+  xml_sha256=$(openssl sha256 -r "$2" | cut -d " " -f 1)
+  cp "$2" "$BUILD_ROOT"
+  echo "<artifact type='$xml_type' name='$xml_name' sha256='$xml_sha256' />"
+}
+
+mkdir -p "$BUILD_ROOT"
+
+{
+  cat <<EOF
+<?xml version="1.0"?>
+<?xml-stylesheet href="/web-assets/build.xsl" type="text/xsl"?>
+EOF
+  echo "<build id='$ARCHIVE_UUID' timestamp='$ARCHIVE_TIMESTAMP'>"
+  echo "<metadata>"
+  echo "<branch>$GIT_BRANCH_NAME</branch>"
+  echo "<commit>$GIT_COMMIT</commit>"
+  echo "</metadata><artifacts>"
+
+  for pkg in "${PYTHON_PACKAGES[@]}"; do add_to_manifest python "$pkg"; done
+  for pkg in "${CSHARP_PACKAGES[@]}"; do add_to_manifest csharp "$pkg"; done
+  for pkg in "${PHP_PACKAGES[@]}"; do add_to_manifest php "$pkg"; done
+  for pkg in "${RUBY_PACKAGES[@]}"; do add_to_manifest ruby "$pkg"; done
+
+  echo "</artifacts></build>"
+}> "$BUILD_ROOT/$MANIFEST_FILE"
+
+BUILD_XML_SHA=$(openssl sha256 -r "$BUILD_ROOT/$MANIFEST_FILE" | cut -d " " -f 1)
+
+PREV_HOME=$(mktemp old-XXXXX-$MANIFEST_FILE)
+NEW_HOME=$(mktemp new-XXXXX-$MANIFEST_FILE)
+gsutil cp "$GCS_ROOT/$MANIFEST_FILE" "$PREV_HOME"
+
+{
+  head --lines=4 "$PREV_HOME"
+  echo "<build id='$ARCHIVE_UUID' timestamp='$ARCHIVE_TIMESTAMP' branch='$GIT_BRANCH_NAME' commit='$GIT_COMMIT' manifest='archive/$RELATIVE_PATH/$MANIFEST_FILE' manifest-sha256='$BUILD_XML_SHA' />"
+  tail --lines=+5 "$PREV_HOME"
+}> "$NEW_HOME"
+
+gsutil -m cp -r "$UPLOAD_ROOT" "$GCS_ROOT/archive"
+gsutil -h "Content-Type:application/xml" cp "$NEW_HOME" "$GCS_ROOT/$MANIFEST_FILE"
+