Browse Source

Split StartOrRenewConnectivityWatchLocked() into two methods.

Mark D. Roth 7 years ago
parent
commit
6d21c8bdc4

+ 8 - 8
src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc

@@ -211,7 +211,7 @@ void PickFirst::StartPickingLocked() {
   if (subchannel_list_ != nullptr) {
     for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) {
       if (subchannel_list_->subchannel(i)->subchannel() != nullptr) {
-        subchannel_list_->subchannel(i)->StartOrRenewConnectivityWatchLocked();
+        subchannel_list_->subchannel(i)->StartConnectivityWatchLocked();
         break;
       }
     }
@@ -315,7 +315,7 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
     // If we've started picking, start trying to connect to the first
     // subchannel in the new list.
     if (started_picking_) {
-      subchannel_list_->subchannel(0)->StartOrRenewConnectivityWatchLocked();
+      subchannel_list_->subchannel(0)->StartConnectivityWatchLocked();
     }
   } else {
     // We do have a selected subchannel.
@@ -337,7 +337,7 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
         selected_ = sd;
         subchannel_list_ = std::move(subchannel_list);
         DestroyUnselectedSubchannelsLocked();
-        sd->StartOrRenewConnectivityWatchLocked();
+        sd->StartConnectivityWatchLocked();
         // If there was a previously pending update (which may or may
         // not have contained the currently selected subchannel), drop
         // it, so that it doesn't override what we've done here.
@@ -363,7 +363,7 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
     // subchannel in the new list.
     if (started_picking_) {
       latest_pending_subchannel_list_->subchannel(0)
-          ->StartOrRenewConnectivityWatchLocked();
+          ->StartConnectivityWatchLocked();
     }
   }
 }
@@ -420,7 +420,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
         grpc_connectivity_state_set(&p->state_tracker_, connectivity_state(),
                                     GRPC_ERROR_REF(error), "selected_changed");
         // Renew notification.
-        StartOrRenewConnectivityWatchLocked();
+        RenewConnectivityWatchLocked();
       }
     }
     GRPC_ERROR_UNREF(error);
@@ -467,7 +467,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
         GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
       }
       // Renew notification.
-      StartOrRenewConnectivityWatchLocked();
+      RenewConnectivityWatchLocked();
       break;
     }
     case GRPC_CHANNEL_TRANSIENT_FAILURE: {
@@ -485,7 +485,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
             &p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
             GRPC_ERROR_REF(error), "connecting_transient_failure");
       }
-      sd->StartOrRenewConnectivityWatchLocked();
+      sd->StartConnectivityWatchLocked();
       break;
     }
     case GRPC_CHANNEL_CONNECTING:
@@ -497,7 +497,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
                                     "connecting_changed");
       }
       // Renew notification.
-      StartOrRenewConnectivityWatchLocked();
+      RenewConnectivityWatchLocked();
       break;
     }
     case GRPC_CHANNEL_SHUTDOWN:

+ 2 - 2
src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc

@@ -444,7 +444,7 @@ void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() {
   // Start connectivity watch for each subchannel.
   for (size_t i = 0; i < num_subchannels(); i++) {
     if (subchannel(i)->subchannel() != nullptr) {
-      subchannel(i)->StartOrRenewConnectivityWatchLocked();
+      subchannel(i)->StartConnectivityWatchLocked();
     }
   }
 }
@@ -584,7 +584,7 @@ void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
   // If we've started watching, update overall state and renew notification.
   if (subchannel_list()->started_watching()) {
     subchannel_list()->UpdateRoundRobinStateFromSubchannelStateCountsLocked();
-    StartOrRenewConnectivityWatchLocked();
+    RenewConnectivityWatchLocked();
   }
   GRPC_ERROR_UNREF(error);
 }

+ 46 - 17
src/core/ext/filters/client_channel/lb_policy/subchannel_list.h

@@ -111,8 +111,9 @@ class SubchannelData {
   // Synchronously checks the subchannel's connectivity state.  Calls
   // ProcessConnectivityChangeLocked() if the state has changed.
   // Must not be called while there is a connectivity notification
-  // pending (i.e., between calling StartOrRenewConnectivityWatchLocked()
-  // and the resulting invocation of ProcessConnectivityChangeLocked()).
+  // pending (i.e., between calling StartConnectivityWatchLocked() or
+  // RenewConnectivityWatchLocked() and the resulting invocation of
+  // ProcessConnectivityChangeLocked()).
   void CheckConnectivityStateLocked() {
     GPR_ASSERT(!connectivity_notification_pending_);
     grpc_error* error = GRPC_ERROR_NONE;
@@ -133,18 +134,22 @@ class SubchannelData {
   // being unreffed.
   virtual void UnrefSubchannelLocked(const char* reason);
 
-  // Starts or renewes watching the connectivity state of the subchannel.
+  // Starts watching the connectivity state of the subchannel.
   // ProcessConnectivityChangeLocked() will be called when the
   // connectivity state changes.
-  void StartOrRenewConnectivityWatchLocked();
+  void StartConnectivityWatchLocked();
+
+  // Renews watching the connectivity state of the subchannel.
+  void RenewConnectivityWatchLocked();
 
   // Stops watching the connectivity state of the subchannel.
   void StopConnectivityWatchLocked();
 
   // Cancels watching the connectivity state of the subchannel.
   // Must be called only while there is a connectivity notification
-  // pending (i.e., between calling StartOrRenewConnectivityWatchLocked()
-  // and the resulting invocation of ProcessConnectivityChangeLocked()).
+  // pending (i.e., between calling StartConnectivityWatchLocked() or
+  // RenewConnectivityWatchLocked() and the resulting invocation of
+  // ProcessConnectivityChangeLocked()).
   // From within ProcessConnectivityChangeLocked(), use
   // StopConnectivityWatchLocked() instead.
   void CancelConnectivityWatchLocked(const char* reason);
@@ -162,12 +167,13 @@ class SubchannelData {
 
   virtual ~SubchannelData();
 
-  // After StartOrRenewConnectivityWatchLocked() is called, this method
-  // will be invoked when the subchannel's connectivity state changes.
+  // After StartConnectivityWatchLocked() or RenewConnectivityWatchLocked()
+  // is called, this method will be invoked when the subchannel's connectivity
+  // state changes.
   // Implementations can use connectivity_state() to get the new
   // connectivity state.
-  // Implementations must invoke either StopConnectivityWatch() or again
-  // call StartOrRenewConnectivityWatch() before returning.
+  // Implementations must invoke either RenewConnectivityWatchLocked() or
+  // StopConnectivityWatchLocked() before returning.
   virtual void ProcessConnectivityChangeLocked(grpc_error* error) GRPC_ABSTRACT;
 
  private:
@@ -252,6 +258,8 @@ class SubchannelList
 
   TraceFlag* tracer_;
 
+  grpc_combiner* combiner_;
+
   // The list of subchannels.
   SubchannelVector subchannels_;
 
@@ -313,21 +321,39 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::
 
 template <typename SubchannelListType, typename SubchannelDataType>
 void SubchannelData<SubchannelListType,
-                    SubchannelDataType>::StartOrRenewConnectivityWatchLocked() {
+                    SubchannelDataType>::StartConnectivityWatchLocked() {
   if (subchannel_list_->tracer()->enabled()) {
     gpr_log(GPR_DEBUG,
             "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
-            " (subchannel %p): requesting connectivity change "
+            " (subchannel %p): starting watch: requesting connectivity change "
             "notification (from %s)",
             subchannel_list_->tracer()->name(), subchannel_list_->policy(),
             subchannel_list_, Index(), subchannel_list_->num_subchannels(),
             subchannel_,
             grpc_connectivity_state_name(pending_connectivity_state_unsafe_));
   }
-  if (!connectivity_notification_pending_) {
-    subchannel_list()->Ref(DEBUG_LOCATION, "connectivity_watch").release();
-    connectivity_notification_pending_ = true;
+  GPR_ASSERT(!connectivity_notification_pending_);
+  connectivity_notification_pending_ = true;
+  subchannel_list()->Ref(DEBUG_LOCATION, "connectivity_watch").release();
+  grpc_subchannel_notify_on_state_change(
+      subchannel_, subchannel_list_->policy()->interested_parties(),
+      &pending_connectivity_state_unsafe_, &connectivity_changed_closure_);
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelData<SubchannelListType,
+                    SubchannelDataType>::RenewConnectivityWatchLocked() {
+  if (subchannel_list_->tracer()->enabled()) {
+    gpr_log(GPR_DEBUG,
+            "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
+            " (subchannel %p): renewing watch: requesting connectivity change "
+            "notification (from %s)",
+            subchannel_list_->tracer()->name(), subchannel_list_->policy(),
+            subchannel_list_, Index(), subchannel_list_->num_subchannels(),
+            subchannel_,
+            grpc_connectivity_state_name(pending_connectivity_state_unsafe_));
   }
+  GPR_ASSERT(connectivity_notification_pending_);
   grpc_subchannel_notify_on_state_change(
       subchannel_, subchannel_list_->policy()->interested_parties(),
       &pending_connectivity_state_unsafe_, &connectivity_changed_closure_);
@@ -360,6 +386,7 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::
             subchannel_list_, Index(), subchannel_list_->num_subchannels(),
             subchannel_, reason);
   }
+  GPR_ASSERT(connectivity_notification_pending_);
   grpc_subchannel_notify_on_state_change(subchannel_, nullptr, nullptr,
                                          &connectivity_changed_closure_);
 }
@@ -427,7 +454,7 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::
   // Get or release ref to connected subchannel.
   if (!sd->UpdateConnectedSubchannelLocked()) {
     // We don't want to report this connectivity state, so renew the watch.
-    sd->StartOrRenewConnectivityWatchLocked();
+    sd->RenewConnectivityWatchLocked();
     return;
   }
   // Now that we're inside the combiner, copy the pending connectivity
@@ -462,7 +489,8 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
     const grpc_channel_args& args)
     : InternallyRefCountedWithTracing<SubchannelListType>(tracer),
       policy_(policy),
-      tracer_(tracer) {
+      tracer_(tracer),
+      combiner_(GRPC_COMBINER_REF(combiner, "subchannel_list")) {
   if (tracer_->enabled()) {
     gpr_log(GPR_DEBUG,
             "[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels",
@@ -523,6 +551,7 @@ SubchannelList<SubchannelListType, SubchannelDataType>::~SubchannelList() {
     gpr_log(GPR_DEBUG, "[%s %p] Destroying subchannel_list %p", tracer_->name(),
             policy_, this);
   }
+  GRPC_COMBINER_UNREF(combiner_, "subchannel_list");
 }
 
 template <typename SubchannelListType, typename SubchannelDataType>