Explorar o código

Merge pull request #16713 from ncteisen/channelz

Implement Child Socket Support
Noah Eisen %!s(int64=7) %!d(string=hai) anos
pai
achega
9fe1c087df

+ 11 - 0
src/core/ext/filters/client_channel/client_channel_channelz.cc

@@ -166,6 +166,17 @@ grpc_json* SubchannelNode::RenderJson() {
   }
   }
   // ask CallCountingHelper to populate trace and call count data.
   // ask CallCountingHelper to populate trace and call count data.
   call_counter_.PopulateCallCounts(json);
   call_counter_.PopulateCallCounts(json);
+  json = top_level_json;
+  // populate the child socket.
+  intptr_t socket_uuid = grpc_subchannel_get_child_socket_uuid(subchannel_);
+  if (socket_uuid != 0) {
+    grpc_json* array_parent = grpc_json_create_child(
+        nullptr, json, "socketRef", nullptr, GRPC_JSON_ARRAY, false);
+    json_iterator = grpc_json_create_child(json_iterator, array_parent, nullptr,
+                                           nullptr, GRPC_JSON_OBJECT, false);
+    grpc_json_add_number_string_child(json_iterator, nullptr, "socketId",
+                                      socket_uuid);
+  }
   return top_level_json;
   return top_level_json;
 }
 }
 
 

+ 3 - 0
src/core/ext/filters/client_channel/connector.h

@@ -47,6 +47,9 @@ typedef struct {
 
 
   /** channel arguments (to be passed to the filters) */
   /** channel arguments (to be passed to the filters) */
   grpc_channel_args* channel_args;
   grpc_channel_args* channel_args;
+
+  /** socket uuid of the connected transport. 0 if not available */
+  intptr_t socket_uuid;
 } grpc_connect_out_args;
 } grpc_connect_out_args;
 
 
 struct grpc_connector_vtable {
 struct grpc_connector_vtable {

+ 13 - 3
src/core/ext/filters/client_channel/subchannel.cc

@@ -411,6 +411,14 @@ grpc_core::channelz::SubchannelNode* grpc_subchannel_get_channelz_node(
   return subchannel->channelz_subchannel.get();
   return subchannel->channelz_subchannel.get();
 }
 }
 
 
+intptr_t grpc_subchannel_get_child_socket_uuid(grpc_subchannel* subchannel) {
+  if (subchannel->connected_subchannel != nullptr) {
+    return subchannel->connected_subchannel->socket_uuid();
+  } else {
+    return 0;
+  }
+}
+
 static void continue_connect_locked(grpc_subchannel* c) {
 static void continue_connect_locked(grpc_subchannel* c) {
   grpc_connect_in_args args;
   grpc_connect_in_args args;
   args.interested_parties = c->pollset_set;
   args.interested_parties = c->pollset_set;
@@ -621,6 +629,7 @@ static bool publish_transport_locked(grpc_subchannel* c) {
     GRPC_ERROR_UNREF(error);
     GRPC_ERROR_UNREF(error);
     return false;
     return false;
   }
   }
+  intptr_t socket_uuid = c->connecting_result.socket_uuid;
   memset(&c->connecting_result, 0, sizeof(c->connecting_result));
   memset(&c->connecting_result, 0, sizeof(c->connecting_result));
 
 
   /* initialize state watcher */
   /* initialize state watcher */
@@ -641,7 +650,7 @@ static bool publish_transport_locked(grpc_subchannel* c) {
 
 
   /* publish */
   /* publish */
   c->connected_subchannel.reset(grpc_core::New<grpc_core::ConnectedSubchannel>(
   c->connected_subchannel.reset(grpc_core::New<grpc_core::ConnectedSubchannel>(
-      stk, c->channelz_subchannel.get()));
+      stk, c->channelz_subchannel.get(), socket_uuid));
   gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p",
   gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p",
           c->connected_subchannel.get(), c);
           c->connected_subchannel.get(), c);
 
 
@@ -811,10 +820,11 @@ namespace grpc_core {
 
 
 ConnectedSubchannel::ConnectedSubchannel(
 ConnectedSubchannel::ConnectedSubchannel(
     grpc_channel_stack* channel_stack,
     grpc_channel_stack* channel_stack,
-    channelz::SubchannelNode* channelz_subchannel)
+    channelz::SubchannelNode* channelz_subchannel, intptr_t socket_uuid)
     : RefCountedWithTracing<ConnectedSubchannel>(&grpc_trace_stream_refcount),
     : RefCountedWithTracing<ConnectedSubchannel>(&grpc_trace_stream_refcount),
       channel_stack_(channel_stack),
       channel_stack_(channel_stack),
-      channelz_subchannel_(channelz_subchannel) {}
+      channelz_subchannel_(channelz_subchannel),
+      socket_uuid_(socket_uuid) {}
 
 
 ConnectedSubchannel::~ConnectedSubchannel() {
 ConnectedSubchannel::~ConnectedSubchannel() {
   GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor");
   GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor");

+ 7 - 1
src/core/ext/filters/client_channel/subchannel.h

@@ -86,7 +86,8 @@ class ConnectedSubchannel : public RefCountedWithTracing<ConnectedSubchannel> {
   };
   };
 
 
   explicit ConnectedSubchannel(grpc_channel_stack* channel_stack,
   explicit ConnectedSubchannel(grpc_channel_stack* channel_stack,
-                               channelz::SubchannelNode* channelz_subchannel);
+                               channelz::SubchannelNode* channelz_subchannel,
+                               intptr_t socket_uuid);
   ~ConnectedSubchannel();
   ~ConnectedSubchannel();
 
 
   grpc_channel_stack* channel_stack() { return channel_stack_; }
   grpc_channel_stack* channel_stack() { return channel_stack_; }
@@ -98,12 +99,15 @@ class ConnectedSubchannel : public RefCountedWithTracing<ConnectedSubchannel> {
   channelz::SubchannelNode* channelz_subchannel() {
   channelz::SubchannelNode* channelz_subchannel() {
     return channelz_subchannel_;
     return channelz_subchannel_;
   }
   }
+  intptr_t socket_uuid() { return socket_uuid_; }
 
 
  private:
  private:
   grpc_channel_stack* channel_stack_;
   grpc_channel_stack* channel_stack_;
   // backpointer to the channelz node in this connected subchannel's
   // backpointer to the channelz node in this connected subchannel's
   // owning subchannel.
   // owning subchannel.
   channelz::SubchannelNode* channelz_subchannel_;
   channelz::SubchannelNode* channelz_subchannel_;
+  // uuid of this subchannel's socket. 0 if this subchannel is not connected.
+  const intptr_t socket_uuid_;
 };
 };
 
 
 }  // namespace grpc_core
 }  // namespace grpc_core
@@ -126,6 +130,8 @@ void grpc_subchannel_call_unref(
 grpc_core::channelz::SubchannelNode* grpc_subchannel_get_channelz_node(
 grpc_core::channelz::SubchannelNode* grpc_subchannel_get_channelz_node(
     grpc_subchannel* subchannel);
     grpc_subchannel* subchannel);
 
 
+intptr_t grpc_subchannel_get_child_socket_uuid(grpc_subchannel* subchannel);
+
 /** Returns a pointer to the parent data associated with \a subchannel_call.
 /** Returns a pointer to the parent data associated with \a subchannel_call.
     The data will be of the size specified in \a parent_data_size
     The data will be of the size specified in \a parent_data_size
     field of the args passed to \a grpc_connected_subchannel_create_call(). */
     field of the args passed to \a grpc_connected_subchannel_create_call(). */

+ 2 - 0
src/core/ext/transport/chttp2/client/chttp2_connector.cc

@@ -117,6 +117,8 @@ static void on_handshake_done(void* arg, grpc_error* error) {
                                           c->args.interested_parties);
                                           c->args.interested_parties);
     c->result->transport =
     c->result->transport =
         grpc_create_chttp2_transport(args->args, args->endpoint, true);
         grpc_create_chttp2_transport(args->args, args->endpoint, true);
+    c->result->socket_uuid =
+        grpc_chttp2_transport_get_socket_uuid(c->result->transport);
     GPR_ASSERT(c->result->transport);
     GPR_ASSERT(c->result->transport);
     // TODO(roth): We ideally want to wait until we receive HTTP/2
     // TODO(roth): We ideally want to wait until we receive HTTP/2
     // settings from the server before we consider the connection
     // settings from the server before we consider the connection

+ 10 - 0
src/core/ext/transport/chttp2/transport/chttp2_transport.cc

@@ -3170,6 +3170,16 @@ static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
 
 
 static const grpc_transport_vtable* get_vtable(void) { return &vtable; }
 static const grpc_transport_vtable* get_vtable(void) { return &vtable; }
 
 
+intptr_t grpc_chttp2_transport_get_socket_uuid(grpc_transport* transport) {
+  grpc_chttp2_transport* t =
+      reinterpret_cast<grpc_chttp2_transport*>(transport);
+  if (t->channelz_socket != nullptr) {
+    return t->channelz_socket->uuid();
+  } else {
+    return 0;
+  }
+}
+
 grpc_transport* grpc_create_chttp2_transport(
 grpc_transport* grpc_create_chttp2_transport(
     const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client) {
     const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client) {
   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(
   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(

+ 2 - 0
src/core/ext/transport/chttp2/transport/chttp2_transport.h

@@ -34,6 +34,8 @@ extern bool g_flow_control_enabled;
 grpc_transport* grpc_create_chttp2_transport(
 grpc_transport* grpc_create_chttp2_transport(
     const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client);
     const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client);
 
 
+intptr_t grpc_chttp2_transport_get_socket_uuid(grpc_transport* transport);
+
 /// Takes ownership of \a read_buffer, which (if non-NULL) contains
 /// Takes ownership of \a read_buffer, which (if non-NULL) contains
 /// leftover bytes previously read from the endpoint (e.g., by handshakers).
 /// leftover bytes previously read from the endpoint (e.g., by handshakers).
 /// If non-null, \a notify_on_receive_settings will be scheduled when
 /// If non-null, \a notify_on_receive_settings will be scheduled when