Browse Source

Merge pull request #2477 from ctiller/sometimes-its-good-just-to-check-in-with-each-other

Client connectivity API
Nicolas Noble 10 years ago
parent
commit
e71314fe26
49 changed files with 1995 additions and 463 deletions
  1. 3 0
      BUILD
  2. 75 12
      Makefile
  3. 1 0
      build.json
  4. 1 0
      gRPC.podspec
  5. 18 1
      include/grpc/grpc.h
  6. 7 5
      src/compiler/csharp_generator.cc
  7. 132 17
      src/core/channel/client_channel.c
  8. 14 0
      src/core/channel/client_channel.h
  9. 128 64
      src/core/client_config/lb_policies/pick_first.c
  10. 15 0
      src/core/client_config/lb_policy.c
  11. 12 0
      src/core/client_config/lb_policy.h
  12. 46 22
      src/core/client_config/subchannel.c
  13. 3 1
      src/core/iomgr/alarm.c
  14. 4 0
      src/core/iomgr/endpoint.c
  15. 3 0
      src/core/iomgr/endpoint.h
  16. 4 2
      src/core/iomgr/endpoint_pair_windows.c
  17. 0 1
      src/core/iomgr/iomgr.c
  18. 9 3
      src/core/iomgr/pollset_set_posix.c
  19. 19 14
      src/core/iomgr/tcp_client_posix.c
  20. 9 2
      src/core/iomgr/tcp_posix.c
  21. 3 3
      src/core/iomgr/tcp_windows.c
  22. 10 2
      src/core/security/secure_endpoint.c
  23. 191 0
      src/core/surface/channel_connectivity.c
  24. 8 2
      src/core/surface/channel_create.c
  25. 2 0
      src/core/surface/init.c
  26. 8 2
      src/core/surface/secure_channel_create.c
  27. 31 10
      src/core/transport/chttp2_transport.c
  28. 40 4
      src/core/transport/connectivity_state.c
  29. 10 3
      src/core/transport/connectivity_state.h
  30. 2 0
      src/core/transport/transport.h
  31. 2 2
      src/cpp/client/create_channel.cc
  32. 10 14
      src/csharp/ext/grpc_csharp_ext.c
  33. 2 3
      src/node/test/surface_test.js
  34. 124 0
      test/core/end2end/fixtures/chttp2_fullstack_uds_posix_with_poll.c
  35. 33 21
      test/core/end2end/gen_build_json.py
  36. 123 0
      test/core/end2end/tests/channel_connectivity.c
  37. 2 3
      test/core/end2end/tests/disappearing_server.c
  38. 3 4
      test/core/end2end/tests/simple_delayed_request.c
  39. 1 1
      test/cpp/client/credentials_test.cc
  40. 1 1
      test/cpp/end2end/end2end_test.cc
  41. 1 0
      tools/doxygen/Doxyfile.core.internal
  42. 1 1
      tools/run_tests/run_tests.py
  43. 201 111
      tools/run_tests/sources_and_headers.json
  44. 673 132
      tools/run_tests/tests.json
  45. 0 0
      vsprojects/Grpc.mak
  46. 2 0
      vsprojects/grpc/grpc.vcxproj
  47. 3 0
      vsprojects/grpc/grpc.vcxproj.filters
  48. 2 0
      vsprojects/grpc_unsecure/grpc_unsecure.vcxproj
  49. 3 0
      vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters

+ 3 - 0
BUILD

@@ -343,6 +343,7 @@ cc_library(
     "src/core/surface/call_details.c",
     "src/core/surface/call_log_batch.c",
     "src/core/surface/channel.c",
+    "src/core/surface/channel_connectivity.c",
     "src/core/surface/channel_create.c",
     "src/core/surface/completion_queue.c",
     "src/core/surface/event_string.c",
@@ -578,6 +579,7 @@ cc_library(
     "src/core/surface/call_details.c",
     "src/core/surface/call_log_batch.c",
     "src/core/surface/channel.c",
+    "src/core/surface/channel_connectivity.c",
     "src/core/surface/channel_create.c",
     "src/core/surface/completion_queue.c",
     "src/core/surface/event_string.c",
@@ -1061,6 +1063,7 @@ objc_library(
     "src/core/surface/call_details.c",
     "src/core/surface/call_log_batch.c",
     "src/core/surface/channel.c",
+    "src/core/surface/channel_connectivity.c",
     "src/core/surface/channel_create.c",
     "src/core/surface/completion_queue.c",
     "src/core/surface/event_string.c",

File diff suppressed because it is too large
+ 75 - 12
Makefile


+ 1 - 0
build.json

@@ -284,6 +284,7 @@
         "src/core/surface/call_details.c",
         "src/core/surface/call_log_batch.c",
         "src/core/surface/channel.c",
+        "src/core/surface/channel_connectivity.c",
         "src/core/surface/channel_create.c",
         "src/core/surface/completion_queue.c",
         "src/core/surface/event_string.c",

+ 1 - 0
gRPC.podspec

@@ -352,6 +352,7 @@ Pod::Spec.new do |s|
                       'src/core/surface/call_details.c',
                       'src/core/surface/call_log_batch.c',
                       'src/core/surface/channel.c',
+                      'src/core/surface/channel_connectivity.c',
                       'src/core/surface/channel_create.c',
                       'src/core/surface/completion_queue.c',
                       'src/core/surface/event_string.c',

+ 18 - 1
include/grpc/grpc.h

@@ -402,6 +402,23 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cq);
     drained and no threads are executing grpc_completion_queue_next */
 void grpc_completion_queue_destroy(grpc_completion_queue *cq);
 
+/** Check the connectivity state of a channel. */
+grpc_connectivity_state grpc_channel_check_connectivity_state(
+    grpc_channel *channel, int try_to_connect);
+
+/** Watch for a change in connectivity state.
+    Once the channel connectivity state is different from last_observed_state,
+    tag will be enqueued on cq with success=1.
+    If deadline expires BEFORE the state is changed, tag will be enqueued on cq
+    with success=0.
+    If optional_new_state is non-NULL, it will be set to the newly observed
+    connectivity state of the channel at the same point as tag is enqueued onto 
+    the completion queue. */
+void grpc_channel_watch_connectivity_state(
+    grpc_channel *channel, grpc_connectivity_state last_observed_state,
+    grpc_connectivity_state *optional_new_state, gpr_timespec deadline,
+    grpc_completion_queue *cq, void *tag);
+
 /** Create a call given a grpc_channel, in order to call 'method'. All
     completions are sent to 'completion_queue'. 'method' and 'host' need only
     live through the invocation of this function. */
@@ -436,7 +453,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
 /** Returns a newly allocated string representing the endpoint to which this
     call is communicating with. The string is in the uri format accepted by
     grpc_channel_create.
-    The returned string should be disposed of with gpr_free(). 
+    The returned string should be disposed of with gpr_free().
 
     WARNING: this value is never authenticated or subject to any security
     related code. It must not be used for any authentication related

+ 7 - 5
src/compiler/csharp_generator.cc

@@ -298,11 +298,13 @@ void GenerateServerInterface(Printer* out, const ServiceDescriptor *service) {
   out->Indent();
   for (int i = 0; i < service->method_count(); i++) {
     const MethodDescriptor *method = service->method(i);
-    out->Print("$returntype$ $methodname$($request$$response_stream_maybe$, ServerCallContext context);\n",
-               "methodname", method->name(), "returntype",
-               GetMethodReturnTypeServer(method), "request",
-               GetMethodRequestParamServer(method), "response_stream_maybe",
-               GetMethodResponseStreamMaybe(method));
+    out->Print(
+        "$returntype$ $methodname$($request$$response_stream_maybe$, "
+        "ServerCallContext context);\n",
+        "methodname", method->name(), "returntype",
+        GetMethodReturnTypeServer(method), "request",
+        GetMethodRequestParamServer(method), "response_stream_maybe",
+        GetMethodResponseStreamMaybe(method));
   }
   out->Outdent();
   out->Print("}\n");

+ 132 - 17
src/core/channel/client_channel.c

@@ -40,7 +40,6 @@
 #include "src/core/channel/connected_channel.h"
 #include "src/core/surface/channel.h"
 #include "src/core/iomgr/iomgr.h"
-#include "src/core/iomgr/pollset_set.h"
 #include "src/core/support/string.h"
 #include "src/core/transport/connectivity_state.h"
 #include <grpc/support/alloc.h>
@@ -77,8 +76,22 @@ typedef struct {
   grpc_iomgr_closure on_config_changed;
   /** connectivity state being tracked */
   grpc_connectivity_state_tracker state_tracker;
+  /** when an lb_policy arrives, should we try to exit idle */
+  int exit_idle_when_lb_policy_arrives;
+  /** pollset_set of interested parties in a new connection */
+  grpc_pollset_set pollset_set;
 } channel_data;
 
+/** We create one watcher for each new lb_policy that is returned from a resolver,
+    to watch for state changes from the lb_policy. When a state change is seen, we
+    update the channel, and create a new watcher */
+typedef struct {
+  channel_data *chand;
+  grpc_iomgr_closure on_changed;
+  grpc_connectivity_state state;
+  grpc_lb_policy *lb_policy;
+} lb_policy_connectivity_watcher;
+
 typedef enum {
   CALL_CREATED,
   CALL_WAITING_FOR_SEND,
@@ -408,16 +421,53 @@ static void cc_start_transport_stream_op(grpc_call_element *elem,
   perform_transport_stream_op(elem, op, 0);
 }
 
+static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy, grpc_connectivity_state current_state);
+
+static void on_lb_policy_state_changed(void *arg, int iomgr_success) {
+  lb_policy_connectivity_watcher *w = arg;
+
+  gpr_mu_lock(&w->chand->mu_config);
+  /* check if the notification is for a stale policy */
+  if (w->lb_policy == w->chand->lb_policy) {
+    grpc_connectivity_state_set(&w->chand->state_tracker, w->state,
+                                "lb_changed");
+    if (w->state != GRPC_CHANNEL_FATAL_FAILURE) {
+      watch_lb_policy(w->chand, w->lb_policy, w->state);
+    }
+  }
+  gpr_mu_unlock(&w->chand->mu_config);
+
+  GRPC_CHANNEL_INTERNAL_UNREF(w->chand->master, "watch_lb_policy");
+  gpr_free(w);
+}
+
+static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy, grpc_connectivity_state current_state) {
+  lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
+  GRPC_CHANNEL_INTERNAL_REF(chand->master, "watch_lb_policy");
+
+  w->chand = chand;
+  grpc_iomgr_closure_init(&w->on_changed, on_lb_policy_state_changed, w);
+  w->state = current_state;
+  w->lb_policy = lb_policy;
+  grpc_lb_policy_notify_on_state_change(lb_policy, &w->state, &w->on_changed);
+}
+
 static void cc_on_config_changed(void *arg, int iomgr_success) {
   channel_data *chand = arg;
   grpc_lb_policy *lb_policy = NULL;
   grpc_lb_policy *old_lb_policy;
   grpc_resolver *old_resolver;
   grpc_iomgr_closure *wakeup_closures = NULL;
+  grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
+  int exit_idle = 0;
 
   if (chand->incoming_configuration != NULL) {
     lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration);
-    GRPC_LB_POLICY_REF(lb_policy, "channel");
+    if (lb_policy != NULL) {
+      GRPC_LB_POLICY_REF(lb_policy, "channel");
+      GRPC_LB_POLICY_REF(lb_policy, "config_change");
+      state = grpc_lb_policy_check_connectivity(lb_policy);
+    }
 
     grpc_client_config_unref(chand->incoming_configuration);
   }
@@ -431,13 +481,12 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
     wakeup_closures = chand->waiting_for_config_closures;
     chand->waiting_for_config_closures = NULL;
   }
-  gpr_mu_unlock(&chand->mu_config);
-
-  if (old_lb_policy) {
-    GRPC_LB_POLICY_UNREF(old_lb_policy, "channel");
+  if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) {
+    GRPC_LB_POLICY_REF(lb_policy, "exit_idle");
+    exit_idle = 1;
+    chand->exit_idle_when_lb_policy_arrives = 0;
   }
 
-  gpr_mu_lock(&chand->mu_config);
   if (iomgr_success && chand->resolver) {
     grpc_resolver *resolver = chand->resolver;
     GRPC_RESOLVER_REF(resolver, "channel-next");
@@ -446,11 +495,16 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
     grpc_resolver_next(resolver, &chand->incoming_configuration,
                        &chand->on_config_changed);
     GRPC_RESOLVER_UNREF(resolver, "channel-next");
+    grpc_connectivity_state_set(&chand->state_tracker, state,
+                                "new_lb+resolver");
+    if (lb_policy != NULL) {
+      watch_lb_policy(chand, lb_policy, state);
+    }
   } else {
     old_resolver = chand->resolver;
     chand->resolver = NULL;
     grpc_connectivity_state_set(&chand->state_tracker,
-                                GRPC_CHANNEL_FATAL_FAILURE);
+                                GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone");
     gpr_mu_unlock(&chand->mu_config);
     if (old_resolver != NULL) {
       grpc_resolver_shutdown(old_resolver);
@@ -458,12 +512,24 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
     }
   }
 
+  if (exit_idle) {
+    grpc_lb_policy_exit_idle(lb_policy);
+    GRPC_LB_POLICY_UNREF(lb_policy, "exit_idle");
+  }
+
+  if (old_lb_policy != NULL) {
+    GRPC_LB_POLICY_UNREF(old_lb_policy, "channel");
+  }
+
   while (wakeup_closures) {
     grpc_iomgr_closure *next = wakeup_closures->next;
     wakeup_closures->cb(wakeup_closures->cb_arg, 1);
     wakeup_closures = next;
   }
 
+  if (lb_policy != NULL) {
+    GRPC_LB_POLICY_UNREF(lb_policy, "config_change");
+  }
   GRPC_CHANNEL_INTERNAL_UNREF(chand->master, "resolver");
 }
 
@@ -487,20 +553,22 @@ static void cc_start_transport_op(grpc_channel_element *elem,
     op->connectivity_state = NULL;
   }
 
+  if (!is_empty(op, sizeof(*op))) {
+    lb_policy = chand->lb_policy;
+    if (lb_policy) {
+      GRPC_LB_POLICY_REF(lb_policy, "broadcast");
+    }
+  }
+
   if (op->disconnect && chand->resolver != NULL) {
     grpc_connectivity_state_set(&chand->state_tracker,
-                                GRPC_CHANNEL_FATAL_FAILURE);
+                                GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
     destroy_resolver = chand->resolver;
     chand->resolver = NULL;
     if (chand->lb_policy != NULL) {
       grpc_lb_policy_shutdown(chand->lb_policy);
-    }
-  }
-
-  if (!is_empty(op, sizeof(*op))) {
-    lb_policy = chand->lb_policy;
-    if (lb_policy) {
-      GRPC_LB_POLICY_REF(lb_policy, "broadcast");
+      GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel");
+      chand->lb_policy = NULL;
     }
   }
   gpr_mu_unlock(&chand->mu_config);
@@ -581,10 +649,11 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
   gpr_mu_init(&chand->mu_config);
   chand->mdctx = metadata_context;
   chand->master = master;
+  grpc_pollset_set_init(&chand->pollset_set);
   grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed,
                           chand);
 
-  grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE);
+  grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel");
 }
 
 /* Destructor for channel_data */
@@ -598,6 +667,8 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
   if (chand->lb_policy != NULL) {
     GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel");
   }
+  grpc_connectivity_state_destroy(&chand->state_tracker);
+  grpc_pollset_set_destroy(&chand->pollset_set);
   gpr_mu_destroy(&chand->mu_config);
 }
 
@@ -626,3 +697,47 @@ void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
   grpc_resolver_next(resolver, &chand->incoming_configuration,
                      &chand->on_config_changed);
 }
+
+grpc_connectivity_state grpc_client_channel_check_connectivity_state(
+    grpc_channel_element *elem, int try_to_connect) {
+  channel_data *chand = elem->channel_data;
+  grpc_connectivity_state out;
+  gpr_mu_lock(&chand->mu_config);
+  out = grpc_connectivity_state_check(&chand->state_tracker);
+  if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
+    if (chand->lb_policy != NULL) {
+      grpc_lb_policy_exit_idle(chand->lb_policy);
+    } else {
+      chand->exit_idle_when_lb_policy_arrives = 1;
+    }
+  }
+  gpr_mu_unlock(&chand->mu_config);
+  return out;
+}
+
+void grpc_client_channel_watch_connectivity_state(
+    grpc_channel_element *elem, grpc_connectivity_state *state,
+    grpc_iomgr_closure *on_complete) {
+  channel_data *chand = elem->channel_data;
+  gpr_mu_lock(&chand->mu_config);
+  grpc_connectivity_state_notify_on_state_change(&chand->state_tracker, state,
+                                                 on_complete);
+  gpr_mu_unlock(&chand->mu_config);
+}
+
+grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(grpc_channel_element *elem) {
+  channel_data *chand = elem->channel_data;
+  return &chand->pollset_set;
+}
+
+void grpc_client_channel_add_interested_party(grpc_channel_element *elem,
+                                          grpc_pollset *pollset) {
+  channel_data *chand = elem->channel_data;
+  grpc_pollset_set_add_pollset(&chand->pollset_set, pollset);
+}
+
+void grpc_client_channel_del_interested_party(grpc_channel_element *elem,
+                                          grpc_pollset *pollset) {
+  channel_data *chand = elem->channel_data;
+  grpc_pollset_set_del_pollset(&chand->pollset_set, pollset);
+}

+ 14 - 0
src/core/channel/client_channel.h

@@ -52,4 +52,18 @@ extern const grpc_channel_filter grpc_client_channel_filter;
 void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
                                       grpc_resolver *resolver);
 
+grpc_connectivity_state grpc_client_channel_check_connectivity_state(
+    grpc_channel_element *elem, int try_to_connect);
+
+void grpc_client_channel_watch_connectivity_state(
+    grpc_channel_element *elem, grpc_connectivity_state *state,
+    grpc_iomgr_closure *on_complete);
+
+grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(grpc_channel_element *elem);
+
+void grpc_client_channel_add_interested_party(grpc_channel_element *channel,
+                                          grpc_pollset *pollset);
+void grpc_client_channel_del_interested_party(grpc_channel_element *channel,
+                                          grpc_pollset *pollset);
+
 #endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_CHANNEL_H */

+ 128 - 64
src/core/client_config/lb_policies/pick_first.c

@@ -62,6 +62,8 @@ typedef struct {
   grpc_subchannel *selected;
   /** have we started picking? */
   int started_picking;
+  /** are we shut down? */
+  int shutdown;
   /** which subchannel are we watching? */
   size_t checking_subchannel;
   /** what is the connectivity of that channel? */
@@ -73,12 +75,30 @@ typedef struct {
   grpc_connectivity_state_tracker state_tracker;
 } pick_first_lb_policy;
 
+static void del_interested_parties_locked(pick_first_lb_policy *p) {
+  pending_pick *pp;
+  for (pp = p->pending_picks; pp; pp = pp->next) {
+    grpc_subchannel_del_interested_party(p->subchannels[p->checking_subchannel],
+                                         pp->pollset);
+  }
+}
+
+static void add_interested_parties_locked(pick_first_lb_policy *p) {
+  pending_pick *pp;
+  for (pp = p->pending_picks; pp; pp = pp->next) {
+    grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel],
+                                         pp->pollset);
+  }
+}
+
 void pf_destroy(grpc_lb_policy *pol) {
   pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
   size_t i;
+  del_interested_parties_locked(p);
   for (i = 0; i < p->num_subchannels; i++) {
     GRPC_SUBCHANNEL_UNREF(p->subchannels[i], "pick_first");
   }
+  grpc_connectivity_state_destroy(&p->state_tracker);
   gpr_free(p->subchannels);
   gpr_mu_destroy(&p->mu);
   gpr_free(p);
@@ -88,12 +108,35 @@ void pf_shutdown(grpc_lb_policy *pol) {
   pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
   pending_pick *pp;
   gpr_mu_lock(&p->mu);
+  del_interested_parties_locked(p);
+  p->shutdown = 1;
   while ((pp = p->pending_picks)) {
     p->pending_picks = pp->next;
     *pp->target = NULL;
     grpc_iomgr_add_delayed_callback(pp->on_complete, 0);
     gpr_free(pp);
   }
+  grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE,
+                              "shutdown");
+  gpr_mu_unlock(&p->mu);
+}
+
+static void start_picking(pick_first_lb_policy *p) {
+  p->started_picking = 1;
+  p->checking_subchannel = 0;
+  p->checking_connectivity = GRPC_CHANNEL_IDLE;
+  GRPC_LB_POLICY_REF(&p->base, "pick_first_connectivity");
+  grpc_subchannel_notify_on_state_change(p->subchannels[p->checking_subchannel],
+                                         &p->checking_connectivity,
+                                         &p->connectivity_changed);
+}
+
+void pf_exit_idle(grpc_lb_policy *pol) {
+  pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
+  gpr_mu_lock(&p->mu);
+  if (!p->started_picking) {
+    start_picking(p);
+  }
   gpr_mu_unlock(&p->mu);
 }
 
@@ -109,13 +152,7 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
     on_complete->cb(on_complete->cb_arg, 1);
   } else {
     if (!p->started_picking) {
-      p->started_picking = 1;
-      p->checking_subchannel = 0;
-      p->checking_connectivity = GRPC_CHANNEL_IDLE;
-      GRPC_LB_POLICY_REF(pol, "pick_first_connectivity");
-      grpc_subchannel_notify_on_state_change(
-          p->subchannels[p->checking_subchannel], &p->checking_connectivity,
-          &p->connectivity_changed);
+      start_picking(p);
     }
     grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel],
                                          pollset);
@@ -129,77 +166,97 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
   }
 }
 
-static void del_interested_parties_locked(pick_first_lb_policy *p) {
-  pending_pick *pp;
-  for (pp = p->pending_picks; pp; pp = pp->next) {
-    grpc_subchannel_del_interested_party(p->subchannels[p->checking_subchannel],
-                                         pp->pollset);
-  }
-}
-
-static void add_interested_parties_locked(pick_first_lb_policy *p) {
-  pending_pick *pp;
-  for (pp = p->pending_picks; pp; pp = pp->next) {
-    grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel],
-                                         pp->pollset);
-  }
-}
-
 static void pf_connectivity_changed(void *arg, int iomgr_success) {
   pick_first_lb_policy *p = arg;
   pending_pick *pp;
   int unref = 0;
 
   gpr_mu_lock(&p->mu);
-loop:
-  switch (p->checking_connectivity) {
-    case GRPC_CHANNEL_READY:
-      p->selected = p->subchannels[p->checking_subchannel];
-      while ((pp = p->pending_picks)) {
-        p->pending_picks = pp->next;
-        *pp->target = p->selected;
-        grpc_subchannel_del_interested_party(p->selected, pp->pollset);
-        grpc_iomgr_add_delayed_callback(pp->on_complete, 1);
-        gpr_free(pp);
-      }
-      unref = 1;
-      break;
-    case GRPC_CHANNEL_TRANSIENT_FAILURE:
-      del_interested_parties_locked(p);
-      p->checking_subchannel =
-          (p->checking_subchannel + 1) % p->num_subchannels;
-      p->checking_connectivity = grpc_subchannel_check_connectivity(
-          p->subchannels[p->checking_subchannel]);
-      add_interested_parties_locked(p);
-      goto loop;
-    case GRPC_CHANNEL_CONNECTING:
-    case GRPC_CHANNEL_IDLE:
+
+  if (p->shutdown) {
+    unref = 1;
+  } else if (p->selected != NULL) {
+    grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity,
+                                "selected_changed");
+    if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) {
       grpc_subchannel_notify_on_state_change(
-          p->subchannels[p->checking_subchannel], &p->checking_connectivity,
-          &p->connectivity_changed);
-      break;
-    case GRPC_CHANNEL_FATAL_FAILURE:
-      del_interested_parties_locked(p);
-      GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel],
-               p->subchannels[p->num_subchannels - 1]);
-      p->num_subchannels--;
-      GRPC_SUBCHANNEL_UNREF(p->subchannels[p->num_subchannels], "pick_first");
-      if (p->num_subchannels == 0) {
+          p->selected, &p->checking_connectivity, &p->connectivity_changed);
+    } else {
+      unref = 1;
+    }
+  } else {
+  loop:
+    switch (p->checking_connectivity) {
+      case GRPC_CHANNEL_READY:
+        grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY,
+                                    "connecting_ready");
+        p->selected = p->subchannels[p->checking_subchannel];
         while ((pp = p->pending_picks)) {
           p->pending_picks = pp->next;
-          *pp->target = NULL;
+          *pp->target = p->selected;
+          grpc_subchannel_del_interested_party(p->selected, pp->pollset);
           grpc_iomgr_add_delayed_callback(pp->on_complete, 1);
           gpr_free(pp);
         }
-        unref = 1;
-      } else {
-        p->checking_subchannel %= p->num_subchannels;
+        grpc_subchannel_notify_on_state_change(
+            p->selected, &p->checking_connectivity, &p->connectivity_changed);
+        break;
+      case GRPC_CHANNEL_TRANSIENT_FAILURE:
+        grpc_connectivity_state_set(&p->state_tracker,
+                                    GRPC_CHANNEL_TRANSIENT_FAILURE,
+                                    "connecting_transient_failure");
+        del_interested_parties_locked(p);
+        p->checking_subchannel =
+            (p->checking_subchannel + 1) % p->num_subchannels;
         p->checking_connectivity = grpc_subchannel_check_connectivity(
             p->subchannels[p->checking_subchannel]);
         add_interested_parties_locked(p);
-        goto loop;
-      }
+        if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+          grpc_subchannel_notify_on_state_change(
+              p->subchannels[p->checking_subchannel], &p->checking_connectivity,
+              &p->connectivity_changed);
+        } else {
+          goto loop;
+        }
+        break;
+      case GRPC_CHANNEL_CONNECTING:
+      case GRPC_CHANNEL_IDLE:
+        grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity,
+                                    "connecting_changed");
+        grpc_subchannel_notify_on_state_change(
+            p->subchannels[p->checking_subchannel], &p->checking_connectivity,
+            &p->connectivity_changed);
+        break;
+      case GRPC_CHANNEL_FATAL_FAILURE:
+        del_interested_parties_locked(p);
+        GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel],
+                 p->subchannels[p->num_subchannels - 1]);
+        p->num_subchannels--;
+        GRPC_SUBCHANNEL_UNREF(p->subchannels[p->num_subchannels], "pick_first");
+        if (p->num_subchannels == 0) {
+          grpc_connectivity_state_set(&p->state_tracker,
+                                      GRPC_CHANNEL_FATAL_FAILURE,
+                                      "no_more_channels");
+          while ((pp = p->pending_picks)) {
+            p->pending_picks = pp->next;
+            *pp->target = NULL;
+            grpc_iomgr_add_delayed_callback(pp->on_complete, 1);
+            gpr_free(pp);
+          }
+          unref = 1;
+        } else {
+          grpc_connectivity_state_set(&p->state_tracker,
+                                      GRPC_CHANNEL_TRANSIENT_FAILURE,
+                                      "subchannel_failed");
+          p->checking_subchannel %= p->num_subchannels;
+          p->checking_connectivity = grpc_subchannel_check_connectivity(
+              p->subchannels[p->checking_subchannel]);
+          add_interested_parties_locked(p);
+          goto loop;
+        }
+    }
   }
+
   gpr_mu_unlock(&p->mu);
 
   if (unref) {
@@ -249,8 +306,13 @@ static void pf_notify_on_state_change(grpc_lb_policy *pol,
 }
 
 static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
-    pf_destroy,   pf_shutdown,           pf_pick,
-    pf_broadcast, pf_check_connectivity, pf_notify_on_state_change};
+    pf_destroy,
+    pf_shutdown,
+    pf_pick,
+    pf_exit_idle,
+    pf_broadcast,
+    pf_check_connectivity,
+    pf_notify_on_state_change};
 
 grpc_lb_policy *grpc_create_pick_first_lb_policy(grpc_subchannel **subchannels,
                                                  size_t num_subchannels) {
@@ -260,6 +322,8 @@ grpc_lb_policy *grpc_create_pick_first_lb_policy(grpc_subchannel **subchannels,
   grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable);
   p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * num_subchannels);
   p->num_subchannels = num_subchannels;
+  grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
+                               "pick_first");
   memcpy(p->subchannels, subchannels,
          sizeof(grpc_subchannel *) * num_subchannels);
   grpc_iomgr_closure_init(&p->connectivity_changed, pf_connectivity_changed, p);

+ 15 - 0
src/core/client_config/lb_policy.c

@@ -77,3 +77,18 @@ void grpc_lb_policy_pick(grpc_lb_policy *policy, grpc_pollset *pollset,
 void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op) {
   policy->vtable->broadcast(policy, op);
 }
+
+void grpc_lb_policy_exit_idle(grpc_lb_policy *policy) {
+  policy->vtable->exit_idle(policy);
+}
+
+void grpc_lb_policy_notify_on_state_change(grpc_lb_policy *policy,
+                                           grpc_connectivity_state *state,
+                                           grpc_iomgr_closure *closure) {
+  policy->vtable->notify_on_state_change(policy, state, closure);
+}
+
+grpc_connectivity_state grpc_lb_policy_check_connectivity(
+    grpc_lb_policy *policy) {
+  return policy->vtable->check_connectivity(policy);
+}

+ 12 - 0
src/core/client_config/lb_policy.h

@@ -59,6 +59,9 @@ struct grpc_lb_policy_vtable {
                grpc_metadata_batch *initial_metadata, grpc_subchannel **target,
                grpc_iomgr_closure *on_complete);
 
+  /** try to enter a READY connectivity state */
+  void (*exit_idle)(grpc_lb_policy *policy);
+
   /** broadcast a transport op to all subchannels */
   void (*broadcast)(grpc_lb_policy *policy, grpc_transport_op *op);
 
@@ -106,4 +109,13 @@ void grpc_lb_policy_pick(grpc_lb_policy *policy, grpc_pollset *pollset,
 
 void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op);
 
+void grpc_lb_policy_exit_idle(grpc_lb_policy *policy);
+
+void grpc_lb_policy_notify_on_state_change(grpc_lb_policy *policy,
+                                           grpc_connectivity_state *state,
+                                           grpc_iomgr_closure *closure);
+
+grpc_connectivity_state grpc_lb_policy_check_connectivity(
+    grpc_lb_policy *policy);
+
 #endif /* GRPC_INTERNAL_CORE_CONFIG_LB_POLICY_H */

+ 46 - 22
src/core/client_config/subchannel.c

@@ -38,9 +38,11 @@
 #include <grpc/support/alloc.h>
 
 #include "src/core/channel/channel_args.h"
+#include "src/core/channel/client_channel.h"
 #include "src/core/channel/connected_channel.h"
 #include "src/core/iomgr/alarm.h"
 #include "src/core/transport/connectivity_state.h"
+#include "src/core/surface/channel.h"
 
 typedef struct {
   /* all fields protected by subchannel->mu */
@@ -94,8 +96,10 @@ struct grpc_subchannel {
   grpc_iomgr_closure connected;
 
   /** pollset_set tracking who's interested in a connection
-      being setup */
-  grpc_pollset_set pollset_set;
+      being setup - owned by the master channel (in particular the
+     client_channel
+      filter there-in) */
+  grpc_pollset_set *pollset_set;
 
   /** mutex protecting remaining elements */
   gpr_mu mu;
@@ -132,7 +136,8 @@ struct grpc_subchannel_call {
 #define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)((con) + 1))
 
 static grpc_subchannel_call *create_call(connection *con);
-static void connectivity_state_changed_locked(grpc_subchannel *c);
+static void connectivity_state_changed_locked(grpc_subchannel *c,
+                                              const char *reason);
 static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c);
 static gpr_timespec compute_connect_deadline(grpc_subchannel *c);
 static void subchannel_connected(void *subchannel, int iomgr_success);
@@ -244,7 +249,6 @@ static void subchannel_destroy(grpc_subchannel *c) {
   grpc_channel_args_destroy(c->args);
   gpr_free(c->addr);
   grpc_mdctx_unref(c->mdctx);
-  grpc_pollset_set_destroy(&c->pollset_set);
   grpc_connectivity_state_destroy(&c->state_tracker);
   grpc_connector_unref(c->connector);
   gpr_free(c);
@@ -252,17 +256,19 @@ static void subchannel_destroy(grpc_subchannel *c) {
 
 void grpc_subchannel_add_interested_party(grpc_subchannel *c,
                                           grpc_pollset *pollset) {
-  grpc_pollset_set_add_pollset(&c->pollset_set, pollset);
+  grpc_pollset_set_add_pollset(c->pollset_set, pollset);
 }
 
 void grpc_subchannel_del_interested_party(grpc_subchannel *c,
                                           grpc_pollset *pollset) {
-  grpc_pollset_set_del_pollset(&c->pollset_set, pollset);
+  grpc_pollset_set_del_pollset(c->pollset_set, pollset);
 }
 
 grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
                                         grpc_subchannel_args *args) {
   grpc_subchannel *c = gpr_malloc(sizeof(*c));
+  grpc_channel_element *parent_elem = grpc_channel_stack_last_element(
+      grpc_channel_get_channel_stack(args->master));
   memset(c, 0, sizeof(*c));
   c->refs = 1;
   c->connector = connector;
@@ -277,10 +283,11 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
   c->args = grpc_channel_args_copy(args->args);
   c->mdctx = args->mdctx;
   c->master = args->master;
+  c->pollset_set = grpc_client_channel_get_connecting_pollset_set(parent_elem);
   grpc_mdctx_ref(c->mdctx);
-  grpc_pollset_set_init(&c->pollset_set);
   grpc_iomgr_closure_init(&c->connected, subchannel_connected, c);
-  grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE);
+  grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE,
+                               "subchannel");
   gpr_mu_init(&c->mu);
   return c;
 }
@@ -288,7 +295,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
 static void continue_connect(grpc_subchannel *c) {
   grpc_connect_in_args args;
 
-  args.interested_parties = &c->pollset_set;
+  args.interested_parties = c->pollset_set;
   args.addr = c->addr;
   args.addr_len = c->addr_len;
   args.deadline = compute_connect_deadline(c);
@@ -309,6 +316,7 @@ static void start_connect(grpc_subchannel *c) {
 
 static void continue_creating_call(void *arg, int iomgr_success) {
   waiting_for_connect *w4c = arg;
+  grpc_subchannel_del_interested_party(w4c->subchannel, w4c->pollset);
   grpc_subchannel_create_call(w4c->subchannel, w4c->pollset, w4c->target,
                               w4c->notify);
   GRPC_SUBCHANNEL_UNREF(w4c->subchannel, "waiting_for_connect");
@@ -341,9 +349,10 @@ void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset,
     grpc_subchannel_add_interested_party(c, pollset);
     if (!c->connecting) {
       c->connecting = 1;
-      connectivity_state_changed_locked(c);
+      connectivity_state_changed_locked(c, "create_call");
       /* released by connection */
       SUBCHANNEL_REF_LOCKED(c, "connecting");
+      GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting");
       gpr_mu_unlock(&c->mu);
 
       start_connect(c);
@@ -372,7 +381,8 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
     c->connecting = 1;
     /* released by connection */
     SUBCHANNEL_REF_LOCKED(c, "connecting");
-    connectivity_state_changed_locked(c);
+    GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting");
+    connectivity_state_changed_locked(c, "state_change");
   }
   gpr_mu_unlock(&c->mu);
   if (do_connect) {
@@ -388,7 +398,7 @@ void grpc_subchannel_process_transport_op(grpc_subchannel *c,
   gpr_mu_lock(&c->mu);
   if (op->disconnect) {
     c->disconnected = 1;
-    connectivity_state_changed_locked(c);
+    connectivity_state_changed_locked(c, "disconnect");
     if (c->have_alarm) {
       cancel_alarm = 1;
     }
@@ -456,13 +466,15 @@ static void on_state_changed(void *p, int iomgr_success) {
         destroy_connection = sw->subchannel->active;
       }
       sw->subchannel->active = NULL;
-      grpc_connectivity_state_set(&c->state_tracker,
-                                  GRPC_CHANNEL_TRANSIENT_FAILURE);
+      grpc_connectivity_state_set(
+          &c->state_tracker, c->disconnected ? GRPC_CHANNEL_FATAL_FAILURE
+                                             : GRPC_CHANNEL_TRANSIENT_FAILURE,
+          "connection_failed");
       break;
   }
 
 done:
-  connectivity_state_changed_locked(c);
+  connectivity_state_changed_locked(c, "transport_state_changed");
   destroy = SUBCHANNEL_UNREF_LOCKED(c, "state_watcher");
   gpr_free(sw);
   gpr_mu_unlock(mu);
@@ -486,6 +498,8 @@ static void publish_transport(grpc_subchannel *c) {
   connection *destroy_connection = NULL;
   grpc_channel_element *elem;
 
+  gpr_log(GPR_DEBUG, "publish_transport: %p", c->master);
+
   /* build final filter list */
   num_filters = c->num_filters + c->connecting_result.num_filters + 1;
   filters = gpr_malloc(sizeof(*filters) * num_filters);
@@ -519,6 +533,8 @@ static void publish_transport(grpc_subchannel *c) {
     gpr_free(sw);
     gpr_free(filters);
     grpc_channel_stack_destroy(stk);
+    GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting");
+    GRPC_SUBCHANNEL_UNREF(c, "connecting");
     return;
   }
 
@@ -536,14 +552,16 @@ static void publish_transport(grpc_subchannel *c) {
   memset(&op, 0, sizeof(op));
   op.connectivity_state = &sw->connectivity_state;
   op.on_connectivity_state_change = &sw->closure;
+  op.bind_pollset_set = c->pollset_set;
   SUBCHANNEL_REF_LOCKED(c, "state_watcher");
+  GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting");
   GPR_ASSERT(!SUBCHANNEL_UNREF_LOCKED(c, "connecting"));
   elem =
       grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(c->active), 0);
   elem->filter->start_transport_op(elem, &op);
 
   /* signal completion */
-  connectivity_state_changed_locked(c);
+  connectivity_state_changed_locked(c, "connected");
   while ((w4c = c->waiting)) {
     c->waiting = w4c->next;
     grpc_iomgr_add_callback(&w4c->continuation);
@@ -565,11 +583,12 @@ static void on_alarm(void *arg, int iomgr_success) {
   if (c->disconnected) {
     iomgr_success = 0;
   }
-  connectivity_state_changed_locked(c);
+  connectivity_state_changed_locked(c, "alarm");
   gpr_mu_unlock(&c->mu);
   if (iomgr_success) {
     continue_connect(c);
   } else {
+    GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting");
     GRPC_SUBCHANNEL_UNREF(c, "connecting");
   }
 }
@@ -579,13 +598,17 @@ static void subchannel_connected(void *arg, int iomgr_success) {
   if (c->connecting_result.transport != NULL) {
     publish_transport(c);
   } else {
+    gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
     gpr_mu_lock(&c->mu);
-    connectivity_state_changed_locked(c);
     GPR_ASSERT(!c->have_alarm);
     c->have_alarm = 1;
+    connectivity_state_changed_locked(c, "connect_failed");
     c->next_attempt = gpr_time_add(c->next_attempt, c->backoff_delta);
-    c->backoff_delta = gpr_time_add(c->backoff_delta, c->backoff_delta);
-    grpc_alarm_init(&c->alarm, c->next_attempt, on_alarm, c, gpr_now(GPR_CLOCK_MONOTONIC));
+    if (gpr_time_cmp(c->backoff_delta,
+                     gpr_time_from_seconds(60, GPR_TIMESPAN)) < 0) {
+      c->backoff_delta = gpr_time_add(c->backoff_delta, c->backoff_delta);
+    }
+    grpc_alarm_init(&c->alarm, c->next_attempt, on_alarm, c, now);
     gpr_mu_unlock(&c->mu);
   }
 }
@@ -610,9 +633,10 @@ static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) {
   return GRPC_CHANNEL_IDLE;
 }
 
-static void connectivity_state_changed_locked(grpc_subchannel *c) {
+static void connectivity_state_changed_locked(grpc_subchannel *c,
+                                              const char *reason) {
   grpc_connectivity_state current = compute_connectivity_locked(c);
-  grpc_connectivity_state_set(&c->state_tracker, current);
+  grpc_connectivity_state_set(&c->state_tracker, current, reason);
 }
 
 /*

+ 3 - 1
src/core/iomgr/alarm.c

@@ -361,7 +361,9 @@ static int run_some_expired_alarms(gpr_mu *drop_mu, gpr_timespec now,
 
 int grpc_alarm_check(gpr_mu *drop_mu, gpr_timespec now, gpr_timespec *next) {
   GPR_ASSERT(now.clock_type == g_clock_type);
-  return run_some_expired_alarms(drop_mu, now, next, 1);
+  return run_some_expired_alarms(
+      drop_mu, now, next, 
+      gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0);
 }
 
 gpr_timespec grpc_alarm_list_next_timeout(void) {

+ 4 - 0
src/core/iomgr/endpoint.c

@@ -50,6 +50,10 @@ void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
   ep->vtable->add_to_pollset(ep, pollset);
 }
 
+void grpc_endpoint_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pollset_set) {
+  ep->vtable->add_to_pollset_set(ep, pollset_set);
+}
+
 void grpc_endpoint_shutdown(grpc_endpoint *ep) { ep->vtable->shutdown(ep); }
 
 void grpc_endpoint_destroy(grpc_endpoint *ep) { ep->vtable->destroy(ep); }

+ 3 - 0
src/core/iomgr/endpoint.h

@@ -35,6 +35,7 @@
 #define GRPC_INTERNAL_CORE_IOMGR_ENDPOINT_H
 
 #include "src/core/iomgr/pollset.h"
+#include "src/core/iomgr/pollset_set.h"
 #include <grpc/support/slice.h>
 #include <grpc/support/time.h>
 
@@ -70,6 +71,7 @@ struct grpc_endpoint_vtable {
                                       size_t nslices, grpc_endpoint_write_cb cb,
                                       void *user_data);
   void (*add_to_pollset)(grpc_endpoint *ep, grpc_pollset *pollset);
+  void (*add_to_pollset_set)(grpc_endpoint *ep, grpc_pollset_set *pollset);
   void (*shutdown)(grpc_endpoint *ep);
   void (*destroy)(grpc_endpoint *ep);
   char *(*get_peer)(grpc_endpoint *ep);
@@ -101,6 +103,7 @@ void grpc_endpoint_destroy(grpc_endpoint *ep);
 /* Add an endpoint to a pollset, so that when the pollset is polled, events from
    this endpoint are considered */
 void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset);
+void grpc_endpoint_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pollset_set);
 
 struct grpc_endpoint {
   const grpc_endpoint_vtable *vtable;

+ 4 - 2
src/core/iomgr/endpoint_pair_windows.c

@@ -81,8 +81,10 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, size_t read
   SOCKET sv[2];
   grpc_endpoint_pair p;
   create_sockets(sv);
-  p.client = grpc_tcp_create(grpc_winsocket_create(sv[1], "endpoint:client"), "endpoint:server");
-  p.server = grpc_tcp_create(grpc_winsocket_create(sv[0], "endpoint:server"), "endpoint:client");
+  p.client = grpc_tcp_create(grpc_winsocket_create(sv[1], "endpoint:client"),
+                             "endpoint:server");
+  p.server = grpc_tcp_create(grpc_winsocket_create(sv[0], "endpoint:server"),
+                             "endpoint:client");
   return p;
 }
 

+ 0 - 1
src/core/iomgr/iomgr.c

@@ -147,7 +147,6 @@ void grpc_iomgr_shutdown(void) {
       continue;
     }
     if (grpc_alarm_check(&g_mu, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL)) {
-      gpr_log(GPR_DEBUG, "got late alarm");
       continue;
     }
     if (g_root_object.next != &g_root_object) {

+ 9 - 3
src/core/iomgr/pollset_set_posix.c

@@ -60,7 +60,7 @@ void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) {
 
 void grpc_pollset_set_add_pollset(grpc_pollset_set *pollset_set,
                                   grpc_pollset *pollset) {
-  size_t i;
+  size_t i, j;
   gpr_mu_lock(&pollset_set->mu);
   if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
     pollset_set->pollset_capacity =
@@ -70,9 +70,15 @@ void grpc_pollset_set_add_pollset(grpc_pollset_set *pollset_set,
                                                sizeof(*pollset_set->pollsets));
   }
   pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
-  for (i = 0; i < pollset_set->fd_count; i++) {
-    grpc_pollset_add_fd(pollset, pollset_set->fds[i]);
+  for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
+    if (grpc_fd_is_orphaned(pollset_set->fds[i])) {
+      GRPC_FD_UNREF(pollset_set->fds[i], "pollset");
+    } else {
+      grpc_pollset_add_fd(pollset, pollset_set->fds[i]);
+      pollset_set->fds[j++] = pollset_set->fds[i];
+    }
   }
+  pollset_set->fd_count = j;
   gpr_mu_unlock(&pollset_set->mu);
 }
 

+ 19 - 14
src/core/iomgr/tcp_client_posix.c

@@ -89,11 +89,11 @@ error:
   return 0;
 }
 
-static void on_alarm(void *acp, int success) {
+static void tc_on_alarm(void *acp, int success) {
   int done;
   async_connect *ac = acp;
   gpr_mu_lock(&ac->mu);
-  if (ac->fd != NULL && success) {
+  if (ac->fd != NULL) {
     grpc_fd_shutdown(ac->fd);
   }
   done = (--ac->refs == 0);
@@ -110,11 +110,17 @@ static void on_writable(void *acp, int success) {
   int so_error = 0;
   socklen_t so_error_size;
   int err;
-  int fd = ac->fd->fd;
   int done;
   grpc_endpoint *ep = NULL;
   void (*cb)(void *arg, grpc_endpoint *tcp) = ac->cb;
   void *cb_arg = ac->cb_arg;
+  grpc_fd *fd;
+
+  gpr_mu_lock(&ac->mu);
+  GPR_ASSERT(ac->fd);
+  fd = ac->fd;
+  ac->fd = NULL;
+  gpr_mu_unlock(&ac->mu);
 
   grpc_alarm_cancel(&ac->alarm);
 
@@ -122,7 +128,7 @@ static void on_writable(void *acp, int success) {
   if (success) {
     do {
       so_error_size = sizeof(so_error);
-      err = getsockopt(fd, SOL_SOCKET, SO_ERROR, &so_error, &so_error_size);
+      err = getsockopt(fd->fd, SOL_SOCKET, SO_ERROR, &so_error, &so_error_size);
     } while (err < 0 && errno == EINTR);
     if (err < 0) {
       gpr_log(GPR_ERROR, "getsockopt(ERROR): %s", strerror(errno));
@@ -145,7 +151,7 @@ static void on_writable(void *acp, int success) {
            don't do that! */
         gpr_log(GPR_ERROR, "kernel out of buffers");
         gpr_mu_unlock(&ac->mu);
-        grpc_fd_notify_on_write(ac->fd, &ac->write_closure);
+        grpc_fd_notify_on_write(fd, &ac->write_closure);
         return;
       } else {
         switch (so_error) {
@@ -159,9 +165,9 @@ static void on_writable(void *acp, int success) {
         goto finish;
       }
     } else {
-      grpc_pollset_set_del_fd(ac->interested_parties, ac->fd);
-      ep = grpc_tcp_create(ac->fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE,
-                           ac->addr_str);
+      grpc_pollset_set_del_fd(ac->interested_parties, fd);
+      ep = grpc_tcp_create(fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, ac->addr_str);
+      fd = NULL;
       goto finish;
     }
   } else {
@@ -172,11 +178,10 @@ static void on_writable(void *acp, int success) {
   abort();
 
 finish:
-  if (ep == NULL) {
-    grpc_pollset_set_del_fd(ac->interested_parties, ac->fd);
-    grpc_fd_orphan(ac->fd, NULL, "tcp_client_orphan");
-  } else {
-    ac->fd = NULL;
+  if (fd != NULL) {
+    grpc_pollset_set_del_fd(ac->interested_parties, fd);
+    grpc_fd_orphan(fd, NULL, "tcp_client_orphan");
+    fd = NULL;
   }
   done = (--ac->refs == 0);
   gpr_mu_unlock(&ac->mu);
@@ -260,7 +265,7 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
 
   gpr_mu_lock(&ac->mu);
   grpc_alarm_init(&ac->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), 
-                  on_alarm, ac, gpr_now(GPR_CLOCK_MONOTONIC));
+                  tc_on_alarm, ac, gpr_now(GPR_CLOCK_MONOTONIC));
   grpc_fd_notify_on_write(ac->fd, &ac->write_closure);
   gpr_mu_unlock(&ac->mu);
 

+ 9 - 2
src/core/iomgr/tcp_posix.c

@@ -572,14 +572,21 @@ static void grpc_tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
   grpc_pollset_add_fd(pollset, tcp->em_fd);
 }
 
+static void grpc_tcp_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pollset_set) {
+  grpc_tcp *tcp = (grpc_tcp *)ep;
+  grpc_pollset_set_add_fd(pollset_set, tcp->em_fd);
+}
+
 static char *grpc_tcp_get_peer(grpc_endpoint *ep) {
   grpc_tcp *tcp = (grpc_tcp *)ep;
   return gpr_strdup(tcp->peer_string);
 }
 
 static const grpc_endpoint_vtable vtable = {
-    grpc_tcp_notify_on_read, grpc_tcp_write,   grpc_tcp_add_to_pollset,
-    grpc_tcp_shutdown,       grpc_tcp_destroy, grpc_tcp_get_peer};
+    grpc_tcp_notify_on_read, grpc_tcp_write,
+    grpc_tcp_add_to_pollset, grpc_tcp_add_to_pollset_set,
+    grpc_tcp_shutdown,       grpc_tcp_destroy,
+    grpc_tcp_get_peer};
 
 grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
                                const char *peer_string) {

+ 3 - 3
src/core/iomgr/tcp_windows.c

@@ -401,9 +401,9 @@ static char *win_get_peer(grpc_endpoint *ep) {
   return gpr_strdup(tcp->peer_string);
 }
 
-static grpc_endpoint_vtable vtable = {
-  win_notify_on_read, win_write, win_add_to_pollset, win_shutdown, win_destroy, win_get_peer
-};
+static grpc_endpoint_vtable vtable = {win_notify_on_read, win_write,
+                                      win_add_to_pollset, win_shutdown,
+                                      win_destroy,        win_get_peer};
 
 grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) {
   grpc_tcp *tcp = (grpc_tcp *) gpr_malloc(sizeof(grpc_tcp));

+ 10 - 2
src/core/security/secure_endpoint.c

@@ -331,14 +331,22 @@ static void endpoint_add_to_pollset(grpc_endpoint *secure_ep,
   grpc_endpoint_add_to_pollset(ep->wrapped_ep, pollset);
 }
 
+static void endpoint_add_to_pollset_set(grpc_endpoint *secure_ep,
+                                    grpc_pollset_set *pollset_set) {
+  secure_endpoint *ep = (secure_endpoint *)secure_ep;
+  grpc_endpoint_add_to_pollset_set(ep->wrapped_ep, pollset_set);
+}
+
 static char *endpoint_get_peer(grpc_endpoint *secure_ep) {
   secure_endpoint *ep = (secure_endpoint *)secure_ep;
   return grpc_endpoint_get_peer(ep->wrapped_ep);
 }
 
 static const grpc_endpoint_vtable vtable = {
-    endpoint_notify_on_read, endpoint_write, endpoint_add_to_pollset,
-    endpoint_shutdown,       endpoint_unref, endpoint_get_peer};
+    endpoint_notify_on_read, endpoint_write,
+    endpoint_add_to_pollset, endpoint_add_to_pollset_set,
+    endpoint_shutdown,       endpoint_unref,
+    endpoint_get_peer};
 
 grpc_endpoint *grpc_secure_endpoint_create(
     struct tsi_frame_protector *protector, grpc_endpoint *transport,

+ 191 - 0
src/core/surface/channel_connectivity.c

@@ -0,0 +1,191 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *     * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/surface/channel.h"
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+
+#include "src/core/channel/client_channel.h"
+#include "src/core/iomgr/alarm.h"
+#include "src/core/surface/completion_queue.h"
+
+grpc_connectivity_state grpc_channel_check_connectivity_state(
+    grpc_channel *channel, int try_to_connect) {
+  /* forward through to the underlying client channel */
+  grpc_channel_element *client_channel_elem =
+      grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
+  if (client_channel_elem->filter != &grpc_client_channel_filter) {
+    gpr_log(GPR_ERROR,
+            "grpc_channel_check_connectivity_state called on something that is "
+            "not a client channel, but '%s'",
+            client_channel_elem->filter->name);
+    return GRPC_CHANNEL_FATAL_FAILURE;
+  }
+  return grpc_client_channel_check_connectivity_state(client_channel_elem,
+                                                      try_to_connect);
+}
+
+typedef enum {
+  WAITING,
+  CALLING_BACK,
+  CALLING_BACK_AND_FINISHED,
+  CALLED_BACK
+} callback_phase;
+
+typedef struct {
+  gpr_mu mu;
+  callback_phase phase;
+  int success;
+  grpc_iomgr_closure on_complete;
+  grpc_alarm alarm;
+  grpc_connectivity_state state;
+  grpc_connectivity_state *optional_new_state;
+  grpc_completion_queue *cq;
+  grpc_cq_completion completion_storage;
+  grpc_channel *channel;
+  void *tag;
+} state_watcher;
+
+static void delete_state_watcher(state_watcher *w) {
+  grpc_channel_element *client_channel_elem =
+      grpc_channel_stack_last_element(grpc_channel_get_channel_stack(w->channel));
+  grpc_client_channel_del_interested_party(client_channel_elem, grpc_cq_pollset(w->cq));
+  GRPC_CHANNEL_INTERNAL_UNREF(w->channel, "watch_connectivity");
+  gpr_mu_destroy(&w->mu);
+  gpr_free(w);
+}
+
+static void finished_completion(void *pw, grpc_cq_completion *ignored) {
+  int delete = 0;
+  state_watcher *w = pw;
+  gpr_mu_lock(&w->mu);
+  switch (w->phase) {
+    case WAITING:
+    case CALLED_BACK:
+      gpr_log(GPR_ERROR, "should never reach here");
+      abort();
+      break;
+    case CALLING_BACK:
+      w->phase = CALLED_BACK;
+      break;
+    case CALLING_BACK_AND_FINISHED:
+      delete = 1;
+      break;
+  }
+  gpr_mu_unlock(&w->mu);
+
+  if (delete) {
+    delete_state_watcher(w);
+  }
+}
+
+static void partly_done(state_watcher *w, int due_to_completion) {
+  int delete = 0;
+
+  if (due_to_completion) {
+    gpr_mu_lock(&w->mu);
+    w->success = 1;
+    gpr_mu_unlock(&w->mu);
+    grpc_alarm_cancel(&w->alarm);
+  }
+
+  gpr_mu_lock(&w->mu);
+  switch (w->phase) {
+    case WAITING:
+      w->phase = CALLING_BACK;
+      if (w->optional_new_state) {
+        *w->optional_new_state = w->state;
+      }
+      grpc_cq_end_op(w->cq, w->tag, w->success, finished_completion, w,
+                     &w->completion_storage);
+      break;
+    case CALLING_BACK:
+      w->phase = CALLING_BACK_AND_FINISHED;
+      break;
+    case CALLING_BACK_AND_FINISHED:
+      gpr_log(GPR_ERROR, "should never reach here");
+      abort();
+      break;
+    case CALLED_BACK:
+      delete = 1;
+      break;
+  }
+  gpr_mu_unlock(&w->mu);
+
+  if (delete) {
+    delete_state_watcher(w);
+  }
+}
+
+static void watch_complete(void *pw, int success) { partly_done(pw, 1); }
+
+static void timeout_complete(void *pw, int success) { partly_done(pw, 0); }
+
+void grpc_channel_watch_connectivity_state(
+    grpc_channel *channel, grpc_connectivity_state last_observed_state,
+    grpc_connectivity_state *optional_new_state, gpr_timespec deadline,
+    grpc_completion_queue *cq, void *tag) {
+  grpc_channel_element *client_channel_elem =
+      grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
+  state_watcher *w = gpr_malloc(sizeof(*w));
+
+  grpc_cq_begin_op(cq);
+
+  gpr_mu_init(&w->mu);
+  grpc_iomgr_closure_init(&w->on_complete, watch_complete, w);
+  w->phase = WAITING;
+  w->state = last_observed_state;
+  w->success = 0;
+  w->optional_new_state = optional_new_state;
+  w->cq = cq;
+  w->tag = tag;
+  w->channel = channel;
+
+  grpc_alarm_init(
+      &w->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), 
+      timeout_complete, w, gpr_now(GPR_CLOCK_MONOTONIC));
+
+  if (client_channel_elem->filter != &grpc_client_channel_filter) {
+    gpr_log(GPR_ERROR,
+            "grpc_channel_watch_connectivity_state called on something that is "
+            "not a client channel, but '%s'",
+            client_channel_elem->filter->name);
+    grpc_iomgr_add_delayed_callback(&w->on_complete, 1);
+  } else {
+    GRPC_CHANNEL_INTERNAL_REF(channel, "watch_connectivity");
+    grpc_client_channel_add_interested_party(client_channel_elem, grpc_cq_pollset(cq));
+    grpc_client_channel_watch_connectivity_state(client_channel_elem, &w->state,
+                                                 &w->on_complete);
+  }
+}

+ 8 - 2
src/core/surface/channel_create.c

@@ -109,6 +109,7 @@ typedef struct {
   gpr_refcount refs;
   grpc_mdctx *mdctx;
   grpc_channel_args *merge_args;
+  grpc_channel *master;
 } subchannel_factory;
 
 static void subchannel_factory_ref(grpc_subchannel_factory *scf) {
@@ -119,6 +120,7 @@ static void subchannel_factory_ref(grpc_subchannel_factory *scf) {
 static void subchannel_factory_unref(grpc_subchannel_factory *scf) {
   subchannel_factory *f = (subchannel_factory *)scf;
   if (gpr_unref(&f->refs)) {
+    GRPC_CHANNEL_INTERNAL_UNREF(f->master, "subchannel_factory");
     grpc_channel_args_destroy(f->merge_args);
     grpc_mdctx_unref(f->mdctx);
     gpr_free(f);
@@ -137,6 +139,7 @@ static grpc_subchannel *subchannel_factory_create_subchannel(
   gpr_ref_init(&c->refs, 1);
   args->mdctx = f->mdctx;
   args->args = final_args;
+  args->master = f->master;
   s = grpc_subchannel_create(&c->base, args);
   grpc_connector_unref(&c->base);
   grpc_channel_args_destroy(final_args);
@@ -168,19 +171,22 @@ grpc_channel *grpc_insecure_channel_create(const char *target,
   filters[n++] = &grpc_client_channel_filter;
   GPR_ASSERT(n <= MAX_FILTERS);
 
+  channel =
+      grpc_channel_create_from_filters(target, filters, n, args, mdctx, 1);
+
   f = gpr_malloc(sizeof(*f));
   f->base.vtable = &subchannel_factory_vtable;
   gpr_ref_init(&f->refs, 1);
   grpc_mdctx_ref(mdctx);
   f->mdctx = mdctx;
   f->merge_args = grpc_channel_args_copy(args);
+  f->master = channel;
+  GRPC_CHANNEL_INTERNAL_REF(f->master, "subchannel_factory");
   resolver = grpc_resolver_create(target, &f->base);
   if (!resolver) {
     return NULL;
   }
 
-  channel =
-      grpc_channel_create_from_filters(target, filters, n, args, mdctx, 1);
   grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel),
                                    resolver);
   GRPC_RESOLVER_UNREF(resolver, "create");

+ 2 - 0
src/core/surface/init.c

@@ -47,6 +47,7 @@
 #include "src/core/surface/init.h"
 #include "src/core/surface/surface_trace.h"
 #include "src/core/transport/chttp2_transport.h"
+#include "src/core/transport/connectivity_state.h"
 
 static gpr_once g_basic_init = GPR_ONCE_INIT;
 static gpr_mu g_init_mu;
@@ -75,6 +76,7 @@ void grpc_init(void) {
     grpc_register_tracer("http", &grpc_http_trace);
     grpc_register_tracer("flowctl", &grpc_flowctl_trace);
     grpc_register_tracer("batch", &grpc_trace_batch);
+    grpc_register_tracer("connectivity_state", &grpc_connectivity_state_trace);
     grpc_security_pre_init();
     grpc_iomgr_init();
     grpc_tracer_init("GRPC_TRACE");

+ 8 - 2
src/core/surface/secure_channel_create.c

@@ -134,6 +134,7 @@ typedef struct {
   grpc_mdctx *mdctx;
   grpc_channel_args *merge_args;
   grpc_channel_security_connector *security_connector;
+  grpc_channel *master;
 } subchannel_factory;
 
 static void subchannel_factory_ref(grpc_subchannel_factory *scf) {
@@ -146,6 +147,7 @@ static void subchannel_factory_unref(grpc_subchannel_factory *scf) {
   if (gpr_unref(&f->refs)) {
     GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base,
                                   "subchannel_factory");
+    GRPC_CHANNEL_INTERNAL_UNREF(f->master, "subchannel_factory");
     grpc_channel_args_destroy(f->merge_args);
     grpc_mdctx_unref(f->mdctx);
     gpr_free(f);
@@ -165,6 +167,7 @@ static grpc_subchannel *subchannel_factory_create_subchannel(
   gpr_ref_init(&c->refs, 1);
   args->mdctx = f->mdctx;
   args->args = final_args;
+  args->master = f->master;
   s = grpc_subchannel_create(&c->base, args);
   grpc_connector_unref(&c->base);
   grpc_channel_args_destroy(final_args);
@@ -218,6 +221,9 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
   filters[n++] = &grpc_client_channel_filter;
   GPR_ASSERT(n <= MAX_FILTERS);
 
+  channel =
+      grpc_channel_create_from_filters(target, filters, n, args_copy, mdctx, 1);
+
   f = gpr_malloc(sizeof(*f));
   f->base.vtable = &subchannel_factory_vtable;
   gpr_ref_init(&f->refs, 1);
@@ -226,13 +232,13 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
   GRPC_SECURITY_CONNECTOR_REF(&connector->base, "subchannel_factory");
   f->security_connector = connector;
   f->merge_args = grpc_channel_args_copy(args_copy);
+  f->master = channel;
+  GRPC_CHANNEL_INTERNAL_REF(channel, "subchannel_factory");
   resolver = grpc_resolver_create(target, &f->base);
   if (!resolver) {
     return NULL;
   }
 
-  channel =
-      grpc_channel_create_from_filters(target, filters, n, args_copy, mdctx, 1);
   grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel),
                                    resolver);
   GRPC_RESOLVER_UNREF(resolver, "create");

+ 31 - 10
src/core/transport/chttp2_transport.c

@@ -110,6 +110,8 @@ static void cancel_from_api(grpc_chttp2_transport_global *transport_global,
 /** Add endpoint from this transport to pollset */
 static void add_to_pollset_locked(grpc_chttp2_transport *t,
                                   grpc_pollset *pollset);
+static void add_to_pollset_set_locked(grpc_chttp2_transport *t,
+                                  grpc_pollset_set *pollset_set);
 
 /** Start new streams that have been created if we can */
 static void maybe_start_some_streams(
@@ -117,7 +119,7 @@ static void maybe_start_some_streams(
 
 static void connectivity_state_set(
     grpc_chttp2_transport_global *transport_global,
-    grpc_connectivity_state state);
+    grpc_connectivity_state state, const char *reason);
 
 /*
  * CONSTRUCTION/DESTRUCTION/REFCOUNTING
@@ -235,7 +237,7 @@ static void init_transport(grpc_chttp2_transport *t,
       is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
   t->writing.is_client = is_client;
   grpc_connectivity_state_init(&t->channel_callback.state_tracker,
-                               GRPC_CHANNEL_READY);
+                               GRPC_CHANNEL_READY, "transport");
 
   gpr_slice_buffer_init(&t->global.qbuf);
 
@@ -329,7 +331,8 @@ static void destroy_transport(grpc_transport *gt) {
 static void close_transport_locked(grpc_chttp2_transport *t) {
   if (!t->closed) {
     t->closed = 1;
-    connectivity_state_set(&t->global, GRPC_CHANNEL_FATAL_FAILURE);
+    connectivity_state_set(&t->global, GRPC_CHANNEL_FATAL_FAILURE,
+                           "close_transport");
     if (t->ep) {
       grpc_endpoint_shutdown(t->ep);
     }
@@ -536,7 +539,8 @@ void grpc_chttp2_add_incoming_goaway(
   gpr_free(msg);
   gpr_slice_unref(goaway_text);
   transport_global->seen_goaway = 1;
-  connectivity_state_set(transport_global, GRPC_CHANNEL_FATAL_FAILURE);
+  connectivity_state_set(transport_global, GRPC_CHANNEL_FATAL_FAILURE,
+                         "got_goaway");
 }
 
 static void maybe_start_some_streams(
@@ -561,7 +565,8 @@ static void maybe_start_some_streams(
     transport_global->next_stream_id += 2;
 
     if (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID) {
-      connectivity_state_set(transport_global, GRPC_CHANNEL_TRANSIENT_FAILURE);
+      connectivity_state_set(transport_global, GRPC_CHANNEL_TRANSIENT_FAILURE,
+                             "no_more_stream_ids");
     }
 
     stream_global->outgoing_window =
@@ -689,6 +694,7 @@ static void send_ping_locked(grpc_chttp2_transport *t,
 
 static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) {
   grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
+  int close_transport = 0;
 
   lock(t);
 
@@ -708,9 +714,7 @@ static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) {
         t->global.last_incoming_stream_id,
         grpc_chttp2_grpc_status_to_http2_error(op->goaway_status),
         gpr_slice_ref(*op->goaway_message), &t->global.qbuf);
-    if (!grpc_chttp2_has_streams(t)) {
-      close_transport_locked(t);
-    }
+    close_transport = !grpc_chttp2_has_streams(t);
   }
 
   if (op->set_accept_stream != NULL) {
@@ -723,6 +727,10 @@ static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) {
     add_to_pollset_locked(t, op->bind_pollset);
   }
 
+  if (op->bind_pollset_set) {
+    add_to_pollset_set_locked(t, op->bind_pollset_set);
+  }
+
   if (op->send_ping) {
     send_ping_locked(t, op->send_ping);
   }
@@ -732,6 +740,12 @@ static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) {
   }
 
   unlock(t);
+
+  if (close_transport) {
+    lock(t);
+    close_transport_locked(t);
+    unlock(t);
+  }
 }
 
 /*
@@ -1008,12 +1022,12 @@ static void schedule_closure_for_connectivity(void *a,
 
 static void connectivity_state_set(
     grpc_chttp2_transport_global *transport_global,
-    grpc_connectivity_state state) {
+    grpc_connectivity_state state, const char *reason) {
   GRPC_CHTTP2_IF_TRACING(
       gpr_log(GPR_DEBUG, "set connectivity_state=%d", state));
   grpc_connectivity_state_set_with_scheduler(
       &TRANSPORT_FROM_GLOBAL(transport_global)->channel_callback.state_tracker,
-      state, schedule_closure_for_connectivity, transport_global);
+      state, schedule_closure_for_connectivity, transport_global, reason);
 }
 
 void grpc_chttp2_schedule_closure(
@@ -1041,6 +1055,13 @@ static void add_to_pollset_locked(grpc_chttp2_transport *t,
   }
 }
 
+static void add_to_pollset_set_locked(grpc_chttp2_transport *t,
+                                  grpc_pollset_set *pollset_set) {
+  if (t->ep) {
+    grpc_endpoint_add_to_pollset_set(t->ep, pollset_set);
+  }
+}
+
 /*
  * TRACING
  */

+ 40 - 4
src/core/transport/connectivity_state.c

@@ -34,11 +34,33 @@
 #include "src/core/transport/connectivity_state.h"
 #include <grpc/support/alloc.h>
 #include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+
+int grpc_connectivity_state_trace = 0;
+
+const char *grpc_connectivity_state_name(grpc_connectivity_state state) {
+  switch (state) {
+    case GRPC_CHANNEL_IDLE:
+      return "IDLE";
+    case GRPC_CHANNEL_CONNECTING:
+      return "CONNECTING";
+    case GRPC_CHANNEL_READY:
+      return "READY";
+    case GRPC_CHANNEL_TRANSIENT_FAILURE:
+      return "TRANSIENT_FAILURE";
+    case GRPC_CHANNEL_FATAL_FAILURE:
+      return "FATAL_FAILURE";
+  }
+  abort();
+  return "UNKNOWN";
+}
 
 void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker,
-                                  grpc_connectivity_state init_state) {
+                                  grpc_connectivity_state init_state,
+                                  const char *name) {
   tracker->current_state = init_state;
   tracker->watchers = NULL;
+  tracker->name = gpr_strdup(name);
 }
 
 void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker) {
@@ -54,6 +76,7 @@ void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker) {
     }
     gpr_free(w);
   }
+  gpr_free(tracker->name);
 }
 
 grpc_connectivity_state grpc_connectivity_state_check(
@@ -64,6 +87,11 @@ grpc_connectivity_state grpc_connectivity_state_check(
 int grpc_connectivity_state_notify_on_state_change(
     grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current,
     grpc_iomgr_closure *notify) {
+  if (grpc_connectivity_state_trace) {
+    gpr_log(GPR_DEBUG, "CONWATCH: %s: from %s [cur=%s]", tracker->name,
+            grpc_connectivity_state_name(*current),
+            grpc_connectivity_state_name(tracker->current_state));
+  }
   if (tracker->current_state != *current) {
     *current = tracker->current_state;
     grpc_iomgr_add_callback(notify);
@@ -79,12 +107,19 @@ int grpc_connectivity_state_notify_on_state_change(
 
 void grpc_connectivity_state_set_with_scheduler(
     grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state,
-    void (*scheduler)(void *arg, grpc_iomgr_closure *closure), void *arg) {
+    void (*scheduler)(void *arg, grpc_iomgr_closure *closure), void *arg,
+    const char *reason) {
   grpc_connectivity_state_watcher *new = NULL;
   grpc_connectivity_state_watcher *w;
+  if (grpc_connectivity_state_trace) {
+    gpr_log(GPR_DEBUG, "SET: %s: %s --> %s [%s]", tracker->name,
+            grpc_connectivity_state_name(tracker->current_state),
+            grpc_connectivity_state_name(state), reason);
+  }
   if (tracker->current_state == state) {
     return;
   }
+  GPR_ASSERT(tracker->current_state != GRPC_CHANNEL_FATAL_FAILURE);
   tracker->current_state = state;
   while ((w = tracker->watchers)) {
     tracker->watchers = w->next;
@@ -106,7 +141,8 @@ static void default_scheduler(void *ignored, grpc_iomgr_closure *closure) {
 }
 
 void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker,
-                                 grpc_connectivity_state state) {
+                                 grpc_connectivity_state state,
+                                 const char *reason) {
   grpc_connectivity_state_set_with_scheduler(tracker, state, default_scheduler,
-                                             NULL);
+                                             NULL, reason);
 }

+ 10 - 3
src/core/transport/connectivity_state.h

@@ -51,17 +51,24 @@ typedef struct {
   grpc_connectivity_state current_state;
   /** all our watchers */
   grpc_connectivity_state_watcher *watchers;
+  /** a name to help debugging */
+  char *name;
 } grpc_connectivity_state_tracker;
 
+extern int grpc_connectivity_state_trace;
+
 void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker,
-                                  grpc_connectivity_state init_state);
+                                  grpc_connectivity_state init_state,
+                                  const char *name);
 void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker);
 
 void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker,
-                                 grpc_connectivity_state state);
+                                 grpc_connectivity_state state,
+                                 const char *reason);
 void grpc_connectivity_state_set_with_scheduler(
     grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state,
-    void (*scheduler)(void *arg, grpc_iomgr_closure *closure), void *arg);
+    void (*scheduler)(void *arg, grpc_iomgr_closure *closure), void *arg,
+    const char *reason);
 
 grpc_connectivity_state grpc_connectivity_state_check(
     grpc_connectivity_state_tracker *tracker);

+ 2 - 0
src/core/transport/transport.h

@@ -109,6 +109,8 @@ typedef struct grpc_transport_op {
   void *set_accept_stream_user_data;
   /** add this transport to a pollset */
   grpc_pollset *bind_pollset;
+  /** add this transport to a pollset_set */
+  grpc_pollset_set *bind_pollset_set;
   /** send a ping, call this back if not NULL */
   grpc_iomgr_closure *send_ping;
 } grpc_transport_op;

+ 2 - 2
src/cpp/client/create_channel.cc

@@ -51,7 +51,7 @@ std::shared_ptr<ChannelInterface> CreateChannel(
   cp_args.SetString(GRPC_ARG_PRIMARY_USER_AGENT_STRING,
                     user_agent_prefix.str());
   return creds ? creds->CreateChannel(target, cp_args)
-               : std::shared_ptr<ChannelInterface>(
-                     new Channel(target, grpc_lame_client_channel_create(NULL)));
+               : std::shared_ptr<ChannelInterface>(new Channel(
+                     target, grpc_lame_client_channel_create(NULL)));
 }
 }  // namespace grpc

+ 10 - 14
src/csharp/ext/grpc_csharp_ext.c

@@ -169,7 +169,7 @@ grpcsharp_metadata_array_add(grpc_metadata_array *array, const char *key,
 
 GPR_EXPORT gpr_intptr GPR_CALLTYPE
 grpcsharp_metadata_array_count(grpc_metadata_array *array) {
-  return (gpr_intptr) array->count;
+  return (gpr_intptr)array->count;
 }
 
 GPR_EXPORT const char *GPR_CALLTYPE
@@ -184,10 +184,10 @@ grpcsharp_metadata_array_get_value(grpc_metadata_array *array, size_t index) {
   return array->metadata[index].value;
 }
 
-GPR_EXPORT gpr_intptr GPR_CALLTYPE
-grpcsharp_metadata_array_get_value_length(grpc_metadata_array *array, size_t index) {
+GPR_EXPORT gpr_intptr GPR_CALLTYPE grpcsharp_metadata_array_get_value_length(
+    grpc_metadata_array *array, size_t index) {
   GPR_ASSERT(index < array->count);
-  return (gpr_intptr) array->metadata[index].value_length;
+  return (gpr_intptr)array->metadata[index].value_length;
 }
 
 /* Move contents of metadata array */
@@ -306,8 +306,7 @@ grpcsharp_batch_context_server_rpc_new_method(
   return ctx->server_rpc_new.call_details.method;
 }
 
-GPR_EXPORT const char *GPR_CALLTYPE
-grpcsharp_batch_context_server_rpc_new_host(
+GPR_EXPORT const char *GPR_CALLTYPE grpcsharp_batch_context_server_rpc_new_host(
     const grpcsharp_batch_context *ctx) {
   return ctx->server_rpc_new.call_details.host;
 }
@@ -657,20 +656,17 @@ grpcsharp_call_send_close_from_client(grpc_call *call,
   return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
 }
 
-GPR_EXPORT grpc_call_error GPR_CALLTYPE
-grpcsharp_call_send_status_from_server(grpc_call *call,
-                                       grpcsharp_batch_context *ctx,
-                                       grpc_status_code status_code,
-                                       const char *status_details,
-                                       grpc_metadata_array *trailing_metadata) {
+GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server(
+    grpc_call *call, grpcsharp_batch_context *ctx, grpc_status_code status_code,
+    const char *status_details, grpc_metadata_array *trailing_metadata) {
   /* TODO: don't use magic number */
   grpc_op ops[1];
   ops[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER;
   ops[0].data.send_status_from_server.status = status_code;
   ops[0].data.send_status_from_server.status_details =
       gpr_strdup(status_details);
-  grpcsharp_metadata_array_move(&(ctx->send_status_from_server.trailing_metadata),
-                                  trailing_metadata);
+  grpcsharp_metadata_array_move(
+      &(ctx->send_status_from_server.trailing_metadata), trailing_metadata);
   ops[0].data.send_status_from_server.trailing_metadata_count =
       ctx->send_status_from_server.trailing_metadata.count;
   ops[0].data.send_status_from_server.trailing_metadata =

+ 2 - 3
src/node/test/surface_test.js

@@ -260,9 +260,8 @@ describe('Echo metadata', function() {
   });
   it('shows the correct user-agent string', function(done) {
     var version = require('../package.json').version;
-    var call = client.unary({}, function(err, data) {
-      assert.ifError(err);
-    }, {key: ['value']});
+    var call = client.unary({}, function(err, data) { assert.ifError(err); },
+                            {key: ['value']});
     call.on('metadata', function(metadata) {
       assert(_.startsWith(metadata['user-agent'], 'grpc-node/' + version));
       done();

+ 124 - 0
test/core/end2end/fixtures/chttp2_fullstack_uds_posix_with_poll.c

@@ -0,0 +1,124 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *     * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "test/core/end2end/end2end_tests.h"
+
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#include "src/core/channel/client_channel.h"
+#include "src/core/channel/connected_channel.h"
+#include "src/core/channel/http_server_filter.h"
+#include "src/core/support/string.h"
+#include "src/core/surface/channel.h"
+#include "src/core/surface/server.h"
+#include "src/core/transport/chttp2_transport.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/host_port.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/thd.h>
+#include <grpc/support/useful.h>
+#include "test/core/util/port.h"
+#include "test/core/util/test_config.h"
+
+typedef struct fullstack_fixture_data {
+  char *localaddr;
+} fullstack_fixture_data;
+
+static int unique = 1;
+
+static grpc_end2end_test_fixture chttp2_create_fixture_fullstack(
+    grpc_channel_args *client_args, grpc_channel_args *server_args) {
+  grpc_end2end_test_fixture f;
+  fullstack_fixture_data *ffd = gpr_malloc(sizeof(fullstack_fixture_data));
+  memset(&f, 0, sizeof(f));
+
+  gpr_asprintf(&ffd->localaddr, "unix:/tmp/grpc_fullstack_test.%d.%d", getpid(),
+               unique++);
+
+  f.fixture_data = ffd;
+  f.cq = grpc_completion_queue_create();
+
+  return f;
+}
+
+void chttp2_init_client_fullstack(grpc_end2end_test_fixture *f,
+                                  grpc_channel_args *client_args) {
+  fullstack_fixture_data *ffd = f->fixture_data;
+  f->client = grpc_insecure_channel_create(ffd->localaddr, client_args);
+}
+
+void chttp2_init_server_fullstack(grpc_end2end_test_fixture *f,
+                                  grpc_channel_args *server_args) {
+  fullstack_fixture_data *ffd = f->fixture_data;
+  if (f->server) {
+    grpc_server_destroy(f->server);
+  }
+  f->server = grpc_server_create(server_args);
+  grpc_server_register_completion_queue(f->server, f->cq);
+  GPR_ASSERT(grpc_server_add_http2_port(f->server, ffd->localaddr));
+  grpc_server_start(f->server);
+}
+
+void chttp2_tear_down_fullstack(grpc_end2end_test_fixture *f) {
+  fullstack_fixture_data *ffd = f->fixture_data;
+  gpr_free(ffd->localaddr);
+  gpr_free(ffd);
+}
+
+/* All test configurations */
+static grpc_end2end_test_config configs[] = {
+    {"chttp2/fullstack_uds", FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION,
+     chttp2_create_fixture_fullstack, chttp2_init_client_fullstack,
+     chttp2_init_server_fullstack, chttp2_tear_down_fullstack},
+};
+
+int main(int argc, char **argv) {
+  size_t i;
+
+  grpc_platform_become_multipoller = grpc_poll_become_multipoller;
+
+  grpc_test_init(argc, argv);
+  grpc_init();
+
+  for (i = 0; i < sizeof(configs) / sizeof(*configs); i++) {
+    grpc_end2end_tests(configs[i]);
+  }
+
+  grpc_shutdown();
+
+  return 0;
+}

+ 33 - 21
test/core/end2end/gen_build_json.py

@@ -36,38 +36,42 @@ import simplejson
 import collections
 
 
-FixtureOptions = collections.namedtuple('FixtureOptions', 'secure platforms')
-default_unsecure_fixture_options = FixtureOptions(False, ['windows', 'posix'])
-default_secure_fixture_options = FixtureOptions(True, ['windows', 'posix'])
+FixtureOptions = collections.namedtuple('FixtureOptions', 'fullstack secure platforms')
+default_unsecure_fixture_options = FixtureOptions(True, False, ['windows', 'posix'])
+socketpair_unsecure_fixture_options = FixtureOptions(False, False, ['windows', 'posix'])
+default_secure_fixture_options = FixtureOptions(True, True, ['windows', 'posix'])
 
 # maps fixture name to whether it requires the security library
 END2END_FIXTURES = {
     'chttp2_fake_security': default_secure_fixture_options,
-    'chttp2_fullstack': default_unsecure_fixture_options,
     'chttp2_fullstack_compression': default_unsecure_fixture_options,
-    'chttp2_fullstack_with_poll': FixtureOptions(False, ['posix']),
-    'chttp2_fullstack_uds_posix': FixtureOptions(False, ['posix']),
+    'chttp2_fullstack': default_unsecure_fixture_options,
+    'chttp2_fullstack_uds_posix': FixtureOptions(True, False, ['posix']),
+    'chttp2_fullstack_uds_posix_with_poll': FixtureOptions(True, False, ['posix']),
+    'chttp2_fullstack_with_poll': FixtureOptions(True, False, ['posix']),
     'chttp2_simple_ssl_fullstack': default_secure_fixture_options,
-    'chttp2_simple_ssl_fullstack_with_poll': FixtureOptions(True, ['posix']),
+    'chttp2_simple_ssl_fullstack_with_poll': FixtureOptions(True, True, ['posix']),
     'chttp2_simple_ssl_with_oauth2_fullstack': default_secure_fixture_options,
-    'chttp2_socket_pair': default_unsecure_fixture_options,
-    'chttp2_socket_pair_one_byte_at_a_time': default_unsecure_fixture_options,
-    'chttp2_socket_pair_with_grpc_trace': default_unsecure_fixture_options,
+    'chttp2_socket_pair_one_byte_at_a_time': socketpair_unsecure_fixture_options,
+    'chttp2_socket_pair': socketpair_unsecure_fixture_options,
+    'chttp2_socket_pair_with_grpc_trace': socketpair_unsecure_fixture_options,
 }
 
-TestOptions = collections.namedtuple('TestOptions', 'flaky secure')
-default_test_options = TestOptions(False, False)
+TestOptions = collections.namedtuple('TestOptions', 'needs_fullstack flaky secure')
+default_test_options = TestOptions(False, False, False)
+connectivity_test_options = TestOptions(True, False, False)
 
 # maps test names to options
 END2END_TESTS = {
     'bad_hostname': default_test_options,
-    'cancel_after_accept': default_test_options,
     'cancel_after_accept_and_writes_closed': default_test_options,
+    'cancel_after_accept': default_test_options,
     'cancel_after_invoke': default_test_options,
     'cancel_before_invoke': default_test_options,
     'cancel_in_a_vacuum': default_test_options,
     'census_simple_request': default_test_options,
-    'disappearing_server': default_test_options,
+    'channel_connectivity': connectivity_test_options,
+    'disappearing_server': connectivity_test_options,
     'early_server_shutdown_finishes_inflight_calls': default_test_options,
     'early_server_shutdown_finishes_tags': default_test_options,
     'empty_batch': default_test_options,
@@ -79,21 +83,28 @@ END2END_TESTS = {
     'ping_pong_streaming': default_test_options,
     'registered_call': default_test_options,
     'request_response_with_binary_metadata_and_payload': default_test_options,
-    'request_response_with_trailing_metadata_and_payload': default_test_options,
     'request_response_with_metadata_and_payload': default_test_options,
+    'request_response_with_payload_and_call_creds': TestOptions(needs_fullstack=False, flaky=False, secure=True),
     'request_response_with_payload': default_test_options,
-    'request_response_with_payload_and_call_creds': TestOptions(flaky=False, secure=True),
-    'request_with_large_metadata': default_test_options,
-    'request_with_payload': default_test_options,
+    'request_response_with_trailing_metadata_and_payload': default_test_options,
     'request_with_compressed_payload': default_test_options,
     'request_with_flags': default_test_options,
+    'request_with_large_metadata': default_test_options,
+    'request_with_payload': default_test_options,
     'server_finishes_request': default_test_options,
-    'simple_delayed_request': default_test_options,
+    'simple_delayed_request': connectivity_test_options,
     'simple_request': default_test_options,
     'simple_request_with_high_initial_sequence_number': default_test_options,
 }
 
 
+def compatible(f, t):
+  if END2END_TESTS[t].needs_fullstack:
+    if not END2END_FIXTURES[f].fullstack:
+      return False
+  return True
+
+
 def main():
   sec_deps = [
     'end2end_certs',
@@ -157,7 +168,8 @@ def main():
                   'end2end_test_%s' % t] + sec_deps
           }
       for f in sorted(END2END_FIXTURES.keys())
-      for t in sorted(END2END_TESTS.keys())] + [
+      for t in sorted(END2END_TESTS.keys())
+      if compatible(f, t)] + [
           {
               'name': '%s_%s_unsecure_test' % (f, t),
               'build': 'test',
@@ -171,7 +183,7 @@ def main():
                   'end2end_test_%s' % t] + unsec_deps
           }
       for f in sorted(END2END_FIXTURES.keys()) if not END2END_FIXTURES[f].secure
-      for t in sorted(END2END_TESTS.keys()) if not END2END_TESTS[t].secure]}
+      for t in sorted(END2END_TESTS.keys()) if compatible(f, t) and not END2END_TESTS[t].secure]}
   print simplejson.dumps(json, sort_keys=True, indent=2 * ' ')
 
 

+ 123 - 0
test/core/end2end/tests/channel_connectivity.c

@@ -0,0 +1,123 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *     * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "test/core/end2end/end2end_tests.h"
+
+#include <grpc/support/log.h>
+#include <grpc/support/time.h>
+
+#include "test/core/end2end/cq_verifier.h"
+
+static void *tag(gpr_intptr t) { return (void *)t; }
+
+static void test_connectivity(grpc_end2end_test_config config) {
+  grpc_end2end_test_fixture f = config.create_fixture(NULL, NULL);
+  grpc_connectivity_state state;
+  cq_verifier *cqv = cq_verifier_create(f.cq);
+
+  config.init_client(&f, NULL);
+
+  /* channels should start life in IDLE, and stay there */
+  GPR_ASSERT(grpc_channel_check_connectivity_state(f.client, 0) == GRPC_CHANNEL_IDLE);
+  gpr_sleep_until(GRPC_TIMEOUT_MILLIS_TO_DEADLINE(100));
+  GPR_ASSERT(grpc_channel_check_connectivity_state(f.client, 0) == GRPC_CHANNEL_IDLE);
+
+  /* start watching for a change */
+  grpc_channel_watch_connectivity_state(
+  	f.client, GRPC_CHANNEL_IDLE, &state, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(3), f.cq, tag(1));
+  /* nothing should happen */
+  cq_verify_empty(cqv);
+
+  /* check that we're still in idle, and start connecting */
+  GPR_ASSERT(grpc_channel_check_connectivity_state(f.client, 1) == GRPC_CHANNEL_IDLE);
+
+  /* and now the watch should trigger */
+  cq_expect_completion(cqv, tag(1), 1);
+  cq_verify(cqv);
+  GPR_ASSERT(state == GRPC_CHANNEL_CONNECTING);
+
+  /* quickly followed by a transition to TRANSIENT_FAILURE */
+  grpc_channel_watch_connectivity_state(
+  	f.client, GRPC_CHANNEL_CONNECTING, &state, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(3), f.cq, tag(2));
+  cq_expect_completion(cqv, tag(2), 1);
+  cq_verify(cqv);
+  GPR_ASSERT(state == GRPC_CHANNEL_TRANSIENT_FAILURE);
+
+  gpr_log(GPR_DEBUG, "*** STARTING SERVER ***");
+
+  /* now let's bring up a server to connect to */
+  config.init_server(&f, NULL);
+
+  gpr_log(GPR_DEBUG, "*** STARTED SERVER ***");
+
+  /* we'll go through some set of transitions (some might be missed), until
+     READY is reached */
+  while (state != GRPC_CHANNEL_READY) {
+  	grpc_channel_watch_connectivity_state(
+  		f.client, state, &state, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(3), f.cq, tag(3));
+  	cq_expect_completion(cqv, tag(3), 1);
+    cq_verify(cqv);
+  	GPR_ASSERT(state == GRPC_CHANNEL_READY || state == GRPC_CHANNEL_CONNECTING || state == GRPC_CHANNEL_TRANSIENT_FAILURE);
+  }
+
+  /* bring down the server again */
+  /* we should go immediately to TRANSIENT_FAILURE */
+  gpr_log(GPR_DEBUG, "*** SHUTTING DOWN SERVER ***");
+
+  grpc_channel_watch_connectivity_state(
+  	f.client, GRPC_CHANNEL_READY, &state, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(3), f.cq, tag(4));
+
+  grpc_server_shutdown_and_notify(f.server, f.cq, tag(0xdead));
+
+  cq_expect_completion(cqv, tag(4), 1);
+  cq_expect_completion(cqv, tag(0xdead), 1);
+  cq_verify(cqv);
+  GPR_ASSERT(state == GRPC_CHANNEL_TRANSIENT_FAILURE);
+
+  /* cleanup server */
+  grpc_server_destroy(f.server);
+
+  gpr_log(GPR_DEBUG, "*** SHUTDOWN SERVER ***");
+
+  grpc_channel_destroy(f.client);
+  grpc_completion_queue_shutdown(f.cq);
+  grpc_completion_queue_destroy(f.cq);
+  config.tear_down_data(&f);
+
+  cq_verifier_destroy(cqv);
+}
+
+void grpc_end2end_tests(grpc_end2end_test_config config) {
+  GPR_ASSERT(config.feature_mask & FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION);
+  test_connectivity(config);
+}

+ 2 - 3
test/core/end2end/tests/disappearing_server.c

@@ -199,7 +199,6 @@ static void disappearing_server_test(grpc_end2end_test_config config) {
 }
 
 void grpc_end2end_tests(grpc_end2end_test_config config) {
-  if (config.feature_mask & FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION) {
-    disappearing_server_test(config);
-  }
+  GPR_ASSERT(config.feature_mask & FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION);
+  disappearing_server_test(config);
 }

+ 3 - 4
test/core/end2end/tests/simple_delayed_request.c

@@ -207,8 +207,7 @@ static void test_simple_delayed_request_long(grpc_end2end_test_config config) {
 }
 
 void grpc_end2end_tests(grpc_end2end_test_config config) {
-  if (config.feature_mask & FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION) {
-    test_simple_delayed_request_short(config);
-    test_simple_delayed_request_long(config);
-  }
+  GPR_ASSERT(config.feature_mask & FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION);
+  test_simple_delayed_request_short(config);
+  test_simple_delayed_request_long(config);
 }

+ 1 - 1
test/cpp/client/credentials_test.cc

@@ -47,7 +47,7 @@ class CredentialsTest : public ::testing::Test {
 
 TEST_F(CredentialsTest, InvalidServiceAccountCreds) {
   std::shared_ptr<Credentials> bad1 = ServiceAccountCredentials("", "", 1);
-  EXPECT_EQ(static_cast<Credentials *>(nullptr), bad1.get());
+  EXPECT_EQ(static_cast<Credentials*>(nullptr), bad1.get());
 }
 
 }  // namespace testing

+ 1 - 1
test/cpp/end2end/end2end_test.cc

@@ -516,7 +516,7 @@ TEST_F(End2endTest, DiffPackageServices) {
 // rpc and stream should fail on bad credentials.
 TEST_F(End2endTest, BadCredentials) {
   std::shared_ptr<Credentials> bad_creds = ServiceAccountCredentials("", "", 1);
-  EXPECT_EQ(static_cast<Credentials *>(nullptr), bad_creds.get());
+  EXPECT_EQ(static_cast<Credentials*>(nullptr), bad_creds.get());
   std::shared_ptr<ChannelInterface> channel =
       CreateChannel(server_address_.str(), bad_creds, ChannelArguments());
   std::unique_ptr<grpc::cpp::test::util::TestService::Stub> stub(

+ 1 - 0
tools/doxygen/Doxyfile.core.internal

@@ -978,6 +978,7 @@ src/core/surface/call.c \
 src/core/surface/call_details.c \
 src/core/surface/call_log_batch.c \
 src/core/surface/channel.c \
+src/core/surface/channel_connectivity.c \
 src/core/surface/channel_create.c \
 src/core/surface/completion_queue.c \
 src/core/surface/event_string.c \

+ 1 - 1
tools/run_tests/run_tests.py

@@ -346,7 +346,7 @@ _CONFIGS = {
     'dbg': SimpleConfig('dbg'),
     'opt': SimpleConfig('opt'),
     'tsan': SimpleConfig('tsan', environ={
-        'TSAN_OPTIONS': 'suppressions=tools/tsan_suppressions.txt:halt_on_error=1'}),
+        'TSAN_OPTIONS': 'suppressions=tools/tsan_suppressions.txt:halt_on_error=1:second_deadlock_stack=1'}),
     'msan': SimpleConfig('msan'),
     'ubsan': SimpleConfig('ubsan'),
     'asan': SimpleConfig('asan', environ={

File diff suppressed because it is too large
+ 201 - 111
tools/run_tests/sources_and_headers.json


File diff suppressed because it is too large
+ 673 - 132
tools/run_tests/tests.json


File diff suppressed because it is too large
+ 0 - 0
vsprojects/Grpc.mak


+ 2 - 0
vsprojects/grpc/grpc.vcxproj

@@ -465,6 +465,8 @@
     </ClCompile>
     <ClCompile Include="..\..\src\core\surface\channel.c">
     </ClCompile>
+    <ClCompile Include="..\..\src\core\surface\channel_connectivity.c">
+    </ClCompile>
     <ClCompile Include="..\..\src\core\surface\channel_create.c">
     </ClCompile>
     <ClCompile Include="..\..\src\core\surface\completion_queue.c">

+ 3 - 0
vsprojects/grpc/grpc.vcxproj.filters

@@ -289,6 +289,9 @@
     <ClCompile Include="..\..\src\core\surface\channel.c">
       <Filter>src\core\surface</Filter>
     </ClCompile>
+    <ClCompile Include="..\..\src\core\surface\channel_connectivity.c">
+      <Filter>src\core\surface</Filter>
+    </ClCompile>
     <ClCompile Include="..\..\src\core\surface\channel_create.c">
       <Filter>src\core\surface</Filter>
     </ClCompile>

+ 2 - 0
vsprojects/grpc_unsecure/grpc_unsecure.vcxproj

@@ -400,6 +400,8 @@
     </ClCompile>
     <ClCompile Include="..\..\src\core\surface\channel.c">
     </ClCompile>
+    <ClCompile Include="..\..\src\core\surface\channel_connectivity.c">
+    </ClCompile>
     <ClCompile Include="..\..\src\core\surface\channel_create.c">
     </ClCompile>
     <ClCompile Include="..\..\src\core\surface\completion_queue.c">

+ 3 - 0
vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters

@@ -220,6 +220,9 @@
     <ClCompile Include="..\..\src\core\surface\channel.c">
       <Filter>src\core\surface</Filter>
     </ClCompile>
+    <ClCompile Include="..\..\src\core\surface\channel_connectivity.c">
+      <Filter>src\core\surface</Filter>
+    </ClCompile>
     <ClCompile Include="..\..\src\core\surface\channel_create.c">
       <Filter>src\core\surface</Filter>
     </ClCompile>

Some files were not shown because too many files changed in this diff