Quellcode durchsuchen

Fix state reported by pick_first when we receive a GOAWAY with a pending subchannel list.

Mark D. Roth vor 6 Jahren
Ursprung
Commit
c9421eeb85

+ 43 - 18
src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc

@@ -102,6 +102,14 @@ class PickFirst : public LoadBalancingPolicy {
       PickFirst* p = static_cast<PickFirst*>(policy());
       p->Unref(DEBUG_LOCATION, "subchannel_list");
     }
+
+    bool in_transient_failure() const { return in_transient_failure_; }
+    void set_in_transient_failure(bool in_transient_failure) {
+      in_transient_failure_ = in_transient_failure;
+    }
+
+   private:
+    bool in_transient_failure_ = false;
   };
 
   class Picker : public SubchannelPicker {
@@ -368,12 +376,21 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
       p->selected_ = nullptr;
       StopConnectivityWatchLocked();
       p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
-      grpc_error* new_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
-          "selected subchannel not ready; switching to pending update", &error,
-          1);
-      p->channel_control_helper()->UpdateState(
-          GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(new_error),
-          UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(new_error)));
+      // Set our state to that of the pending subchannel list.
+      if (p->subchannel_list_->in_transient_failure()) {
+        grpc_error* new_error =
+            GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+                "selected subchannel failed; switching to pending update",
+                &error, 1);
+        p->channel_control_helper()->UpdateState(
+            GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(new_error),
+            UniquePtr<SubchannelPicker>(
+                New<TransientFailurePicker>(new_error)));
+      } else {
+        p->channel_control_helper()->UpdateState(
+            GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE,
+            UniquePtr<SubchannelPicker>(New<QueuePicker>(p->Ref())));
+      }
     } else {
       if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
         // If the selected subchannel goes bad, request a re-resolution. We
@@ -382,7 +399,6 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
         // to connect to the re-resolved backends until we leave IDLE state.
         p->idle_ = true;
         p->channel_control_helper()->RequestReresolution();
-        // In transient failure. Rely on re-resolution to recover.
         p->selected_ = nullptr;
         StopConnectivityWatchLocked();
         p->channel_control_helper()->UpdateState(
@@ -418,6 +434,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
   //    for a subchannel in p->latest_pending_subchannel_list_.  The
   //    goal here is to find a subchannel from the update that we can
   //    select in place of the current one.
+  subchannel_list()->set_in_transient_failure(false);
   switch (connectivity_state) {
     case GRPC_CHANNEL_READY: {
       // Renew notification.
@@ -431,17 +448,25 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
       size_t next_index =
           (sd->Index() + 1) % subchannel_list()->num_subchannels();
       sd = subchannel_list()->subchannel(next_index);
-      // Case 1: Only set state to TRANSIENT_FAILURE if we've tried
-      // all subchannels.
-      if (sd->Index() == 0 && subchannel_list() == p->subchannel_list_.get()) {
-        p->channel_control_helper()->RequestReresolution();
-        grpc_error* new_error =
-            GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
-                "failed to connect to all addresses", &error, 1);
-        p->channel_control_helper()->UpdateState(
-            GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(new_error),
-            UniquePtr<SubchannelPicker>(
-                New<TransientFailurePicker>(new_error)));
+      // If we're tried all subchannels, set state to TRANSIENT_FAILURE.
+      if (sd->Index() == 0) {
+        // Re-resolve if this is the most recent subchannel list.
+        if (subchannel_list() == (p->latest_pending_subchannel_list_ != nullptr
+                                      ? p->latest_pending_subchannel_list_.get()
+                                      : p->subchannel_list_.get())) {
+          p->channel_control_helper()->RequestReresolution();
+        }
+        subchannel_list()->set_in_transient_failure(true);
+        // Only report new state in case 1.
+        if (subchannel_list() == p->subchannel_list_.get()) {
+          grpc_error* new_error =
+              GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+                  "failed to connect to all addresses", &error, 1);
+          p->channel_control_helper()->UpdateState(
+              GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(new_error),
+              UniquePtr<SubchannelPicker>(
+                  New<TransientFailurePicker>(new_error)));
+        }
       }
       sd->CheckConnectivityStateAndStartWatchingLocked();
       break;

+ 4 - 2
src/core/ext/transport/chttp2/transport/chttp2_transport.cc

@@ -1136,8 +1136,10 @@ void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
   }
   t->goaway_error = grpc_error_set_str(
       grpc_error_set_int(
-          GRPC_ERROR_CREATE_FROM_STATIC_STRING("GOAWAY received"),
-          GRPC_ERROR_INT_HTTP2_ERROR, static_cast<intptr_t>(goaway_error)),
+          grpc_error_set_int(
+              GRPC_ERROR_CREATE_FROM_STATIC_STRING("GOAWAY received"),
+              GRPC_ERROR_INT_HTTP2_ERROR, static_cast<intptr_t>(goaway_error)),
+          GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
       GRPC_ERROR_STR_RAW_BYTES, goaway_text);
 
   /* We want to log this irrespective of whether http tracing is enabled */

+ 1 - 0
test/cpp/end2end/BUILD

@@ -402,6 +402,7 @@ grpc_cc_test(
     name = "client_lb_end2end_test",
     srcs = ["client_lb_end2end_test.cc"],
     external_deps = [
+        "gmock",
         "gtest",
     ],
     deps = [

+ 52 - 2
test/cpp/end2end/client_lb_end2end_test.cc

@@ -56,6 +56,7 @@
 #include "test/core/util/test_lb_policies.h"
 #include "test/cpp/end2end/test_service_impl.h"
 
+#include <gmock/gmock.h>
 #include <gtest/gtest.h>
 
 using grpc::testing::EchoRequest;
@@ -221,9 +222,11 @@ class ClientLbEnd2endTest : public ::testing::Test {
     response_generator_->SetFailureOnReresolution();
   }
 
-  std::vector<int> GetServersPorts() {
+  std::vector<int> GetServersPorts(size_t start_index = 0) {
     std::vector<int> ports;
-    for (const auto& server : servers_) ports.push_back(server->port_);
+    for (size_t i = start_index; i < servers_.size(); ++i) {
+      ports.push_back(servers_[i]->port_);
+    }
     return ports;
   }
 
@@ -897,6 +900,53 @@ TEST_F(ClientLbEnd2endTest, PickFirstIdleOnDisconnect) {
   servers_.clear();
 }
 
+TEST_F(ClientLbEnd2endTest, PickFirstPendingUpdateAndSelectedSubchannelFails) {
+  auto channel = BuildChannel("");  // pick_first is the default.
+  auto stub = BuildStub(channel);
+  // Create a number of servers, but only start 1 of them.
+  CreateServers(10);
+  StartServer(0);
+  // Initially resolve to first server and make sure it connects.
+  gpr_log(GPR_INFO, "Phase 1: Connect to first server.");
+  SetNextResolution({servers_[0]->port_});
+  CheckRpcSendOk(stub, DEBUG_LOCATION, true /* wait_for_ready */);
+  EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
+  // Send a resolution update with the remaining servers, none of which are
+  // running yet, so the update will stay pending.  Note that it's important
+  // to have multiple servers here, or else the test will be flaky; with only
+  // one server, the pending subchannel list has already gone into
+  // TRANSIENT_FAILURE due to hitting the end of the list by the time we
+  // check the state.
+  gpr_log(GPR_INFO,
+          "Phase 2: Resolver update pointing to remaining "
+          "(not started) servers.");
+  SetNextResolution(GetServersPorts(1 /* start_index */));
+  // RPCs will continue to be sent to the first server.
+  CheckRpcSendOk(stub, DEBUG_LOCATION);
+  // Now stop the first server, so that the current subchannel list
+  // fails.  This should cause us to immediately swap over to the
+  // pending list, even though it's not yet connected.  The state should
+  // be set to CONNECTING, since that's what the pending subchannel list
+  // was doing when we swapped over.
+  gpr_log(GPR_INFO, "Phase 3: Stopping first server.");
+  servers_[0]->Shutdown();
+  WaitForChannelNotReady(channel.get());
+  // TODO(roth): This should always return CONNECTING, but it's flaky
+  // between that and TRANSIENT_FAILURE.  I suspect that this problem
+  // will go away once we move the backoff code out of the subchannel
+  // and into the LB policies.
+  EXPECT_THAT(channel->GetState(false),
+              ::testing::AnyOf(GRPC_CHANNEL_CONNECTING,
+                               GRPC_CHANNEL_TRANSIENT_FAILURE));
+  // Now start the second server.
+  gpr_log(GPR_INFO, "Phase 4: Starting second server.");
+  StartServer(1);
+  // The channel should go to READY state and RPCs should go to the
+  // second server.
+  WaitForChannelReady(channel.get());
+  WaitForServer(stub, 1, DEBUG_LOCATION, true /* ignore_failure */);
+}
+
 TEST_F(ClientLbEnd2endTest, RoundRobin) {
   // Start servers and send one RPC per server.
   const int kNumServers = 3;