|  | @@ -51,6 +51,7 @@
 | 
	
		
			
				|  |  |  #include "src/core/lib/gpr/string.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/gprpp/inlined_vector.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/gprpp/manual_constructor.h"
 | 
	
		
			
				|  |  | +#include "src/core/lib/gprpp/mutex_lock.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/iomgr/combiner.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/iomgr/iomgr.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/iomgr/polling_entity.h"
 | 
	
	
		
			
				|  | @@ -91,7 +92,55 @@ grpc_core::TraceFlag grpc_client_channel_routing_trace(
 | 
	
		
			
				|  |  |   * CHANNEL-WIDE FUNCTIONS
 | 
	
		
			
				|  |  |   */
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -struct external_connectivity_watcher;
 | 
	
		
			
				|  |  | +// Forward declaration.
 | 
	
		
			
				|  |  | +typedef struct client_channel_channel_data channel_data;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +namespace grpc_core {
 | 
	
		
			
				|  |  | +namespace {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +class ExternalConnectivityWatcher {
 | 
	
		
			
				|  |  | + public:
 | 
	
		
			
				|  |  | +  class WatcherList {
 | 
	
		
			
				|  |  | +   public:
 | 
	
		
			
				|  |  | +    WatcherList() { gpr_mu_init(&mu_); }
 | 
	
		
			
				|  |  | +    ~WatcherList() { gpr_mu_destroy(&mu_); }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    int size() const;
 | 
	
		
			
				|  |  | +    ExternalConnectivityWatcher* Lookup(grpc_closure* on_complete) const;
 | 
	
		
			
				|  |  | +    void Add(ExternalConnectivityWatcher* watcher);
 | 
	
		
			
				|  |  | +    void Remove(const ExternalConnectivityWatcher* watcher);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +   private:
 | 
	
		
			
				|  |  | +    // head_ is guarded by a mutex, since the size() method needs to
 | 
	
		
			
				|  |  | +    // iterate over the list, and it's called from the C-core API
 | 
	
		
			
				|  |  | +    // function grpc_channel_num_external_connectivity_watchers(), which
 | 
	
		
			
				|  |  | +    // is synchronous and therefore cannot run in the combiner.
 | 
	
		
			
				|  |  | +    mutable gpr_mu mu_;
 | 
	
		
			
				|  |  | +    ExternalConnectivityWatcher* head_ = nullptr;
 | 
	
		
			
				|  |  | +  };
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  ExternalConnectivityWatcher(channel_data* chand, grpc_polling_entity pollent,
 | 
	
		
			
				|  |  | +                              grpc_connectivity_state* state,
 | 
	
		
			
				|  |  | +                              grpc_closure* on_complete,
 | 
	
		
			
				|  |  | +                              grpc_closure* watcher_timer_init);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  ~ExternalConnectivityWatcher();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | + private:
 | 
	
		
			
				|  |  | +  static void OnWatchCompleteLocked(void* arg, grpc_error* error);
 | 
	
		
			
				|  |  | +  static void WatchConnectivityStateLocked(void* arg, grpc_error* ignored);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  channel_data* chand_;
 | 
	
		
			
				|  |  | +  grpc_polling_entity pollent_;
 | 
	
		
			
				|  |  | +  grpc_connectivity_state* state_;
 | 
	
		
			
				|  |  | +  grpc_closure* on_complete_;
 | 
	
		
			
				|  |  | +  grpc_closure* watcher_timer_init_;
 | 
	
		
			
				|  |  | +  grpc_closure my_closure_;
 | 
	
		
			
				|  |  | +  ExternalConnectivityWatcher* next_ = nullptr;
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +}  // namespace
 | 
	
		
			
				|  |  | +}  // namespace grpc_core
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  struct QueuedPick {
 | 
	
		
			
				|  |  |    LoadBalancingPolicy::PickArgs pick;
 | 
	
	
		
			
				|  | @@ -99,51 +148,53 @@ struct QueuedPick {
 | 
	
		
			
				|  |  |    QueuedPick* next = nullptr;
 | 
	
		
			
				|  |  |  };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -typedef struct client_channel_channel_data {
 | 
	
		
			
				|  |  | +struct client_channel_channel_data {
 | 
	
		
			
				|  |  | +  //
 | 
	
		
			
				|  |  | +  // Fields set at construction and never modified.
 | 
	
		
			
				|  |  | +  //
 | 
	
		
			
				|  |  |    bool deadline_checking_enabled;
 | 
	
		
			
				|  |  |    bool enable_retries;
 | 
	
		
			
				|  |  |    size_t per_rpc_retry_buffer_size;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  /** combiner protecting all variables below in this data structure */
 | 
	
		
			
				|  |  | -  grpc_combiner* combiner;
 | 
	
		
			
				|  |  | -  /** owning stack */
 | 
	
		
			
				|  |  |    grpc_channel_stack* owning_stack;
 | 
	
		
			
				|  |  | -  /** interested parties (owned) */
 | 
	
		
			
				|  |  | -  grpc_pollset_set* interested_parties;
 | 
	
		
			
				|  |  | -  // Client channel factory.
 | 
	
		
			
				|  |  |    grpc_core::ClientChannelFactory* client_channel_factory;
 | 
	
		
			
				|  |  | -  // Subchannel pool.
 | 
	
		
			
				|  |  | -  grpc_core::RefCountedPtr<grpc_core::SubchannelPoolInterface> subchannel_pool;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    grpc_core::channelz::ClientChannelNode* channelz_node;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  // Resolving LB policy.
 | 
	
		
			
				|  |  | -  grpc_core::OrphanablePtr<LoadBalancingPolicy> resolving_lb_policy;
 | 
	
		
			
				|  |  | -  // Subchannel picker from LB policy.
 | 
	
		
			
				|  |  | +  //
 | 
	
		
			
				|  |  | +  // Fields used in the data plane.  Protected by data_plane_combiner.
 | 
	
		
			
				|  |  | +  //
 | 
	
		
			
				|  |  | +  grpc_combiner* data_plane_combiner;
 | 
	
		
			
				|  |  |    grpc_core::UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker;
 | 
	
		
			
				|  |  | -  // Linked list of queued picks.
 | 
	
		
			
				|  |  | -  QueuedPick* queued_picks;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  bool have_service_config;
 | 
	
		
			
				|  |  | -  /** retry throttle data from service config */
 | 
	
		
			
				|  |  | +  QueuedPick* queued_picks;  // Linked list of queued picks.
 | 
	
		
			
				|  |  | +  // Data from service config.
 | 
	
		
			
				|  |  | +  bool received_service_config_data;
 | 
	
		
			
				|  |  |    grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
 | 
	
		
			
				|  |  | -  /** per-method service config data */
 | 
	
		
			
				|  |  |    grpc_core::RefCountedPtr<ClientChannelMethodParamsTable> method_params_table;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  /* the following properties are guarded by a mutex since APIs require them
 | 
	
		
			
				|  |  | -     to be instantaneously available */
 | 
	
		
			
				|  |  | +  //
 | 
	
		
			
				|  |  | +  // Fields used in the control plane.  Protected by combiner.
 | 
	
		
			
				|  |  | +  //
 | 
	
		
			
				|  |  | +  grpc_combiner* combiner;
 | 
	
		
			
				|  |  | +  grpc_pollset_set* interested_parties;
 | 
	
		
			
				|  |  | +  grpc_core::RefCountedPtr<grpc_core::SubchannelPoolInterface> subchannel_pool;
 | 
	
		
			
				|  |  | +  grpc_core::OrphanablePtr<LoadBalancingPolicy> resolving_lb_policy;
 | 
	
		
			
				|  |  | +  grpc_connectivity_state_tracker state_tracker;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  //
 | 
	
		
			
				|  |  | +  // Fields accessed from both data plane and control plane combiners.
 | 
	
		
			
				|  |  | +  //
 | 
	
		
			
				|  |  | +  grpc_core::Atomic<grpc_error*> disconnect_error;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  // The following properties are guarded by a mutex since APIs require them
 | 
	
		
			
				|  |  | +  // to be instantaneously available.
 | 
	
		
			
				|  |  |    gpr_mu info_mu;
 | 
	
		
			
				|  |  |    grpc_core::UniquePtr<char> info_lb_policy_name;
 | 
	
		
			
				|  |  |    grpc_core::UniquePtr<char> info_service_config_json;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  grpc_connectivity_state_tracker state_tracker;
 | 
	
		
			
				|  |  | -  grpc_error* disconnect_error;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  /* external_connectivity_watcher_list head is guarded by its own mutex, since
 | 
	
		
			
				|  |  | -   * counts need to be grabbed immediately without polling on a cq */
 | 
	
		
			
				|  |  | -  gpr_mu external_connectivity_watcher_list_mu;
 | 
	
		
			
				|  |  | -  struct external_connectivity_watcher* external_connectivity_watcher_list_head;
 | 
	
		
			
				|  |  | -} channel_data;
 | 
	
		
			
				|  |  | +  grpc_core::ManualConstructor<
 | 
	
		
			
				|  |  | +      grpc_core::ExternalConnectivityWatcher::WatcherList>
 | 
	
		
			
				|  |  | +      external_connectivity_watcher_list;
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  // Forward declarations.
 | 
	
		
			
				|  |  |  static void start_pick_locked(void* arg, grpc_error* ignored);
 | 
	
	
		
			
				|  | @@ -166,30 +217,215 @@ static const char* get_channel_connectivity_state_change_string(
 | 
	
		
			
				|  |  |    GPR_UNREACHABLE_CODE(return "UNKNOWN");
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void set_connectivity_state_and_picker_locked(
 | 
	
		
			
				|  |  | -    channel_data* chand, grpc_connectivity_state state, grpc_error* state_error,
 | 
	
		
			
				|  |  | -    const char* reason,
 | 
	
		
			
				|  |  | -    grpc_core::UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) {
 | 
	
		
			
				|  |  | -  // Update connectivity state.
 | 
	
		
			
				|  |  | -  grpc_connectivity_state_set(&chand->state_tracker, state, state_error,
 | 
	
		
			
				|  |  | -                              reason);
 | 
	
		
			
				|  |  | -  if (chand->channelz_node != nullptr) {
 | 
	
		
			
				|  |  | -    chand->channelz_node->AddTraceEvent(
 | 
	
		
			
				|  |  | -        grpc_core::channelz::ChannelTrace::Severity::Info,
 | 
	
		
			
				|  |  | -        grpc_slice_from_static_string(
 | 
	
		
			
				|  |  | -            get_channel_connectivity_state_change_string(state)));
 | 
	
		
			
				|  |  | +namespace grpc_core {
 | 
	
		
			
				|  |  | +namespace {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +// A fire-and-forget class that sets the channel's connectivity state
 | 
	
		
			
				|  |  | +// and then hops into the data plane combiner to update the picker.
 | 
	
		
			
				|  |  | +// Must be instantiated while holding the control plane combiner.
 | 
	
		
			
				|  |  | +// Deletes itself when done.
 | 
	
		
			
				|  |  | +class ConnectivityStateAndPickerSetter {
 | 
	
		
			
				|  |  | + public:
 | 
	
		
			
				|  |  | +  ConnectivityStateAndPickerSetter(
 | 
	
		
			
				|  |  | +      channel_data* chand, grpc_connectivity_state state,
 | 
	
		
			
				|  |  | +      grpc_error* state_error, const char* reason,
 | 
	
		
			
				|  |  | +      UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker)
 | 
	
		
			
				|  |  | +      : chand_(chand), picker_(std::move(picker)) {
 | 
	
		
			
				|  |  | +    // Update connectivity state here, while holding control plane combiner.
 | 
	
		
			
				|  |  | +    grpc_connectivity_state_set(&chand->state_tracker, state, state_error,
 | 
	
		
			
				|  |  | +                                reason);
 | 
	
		
			
				|  |  | +    if (chand->channelz_node != nullptr) {
 | 
	
		
			
				|  |  | +      chand->channelz_node->AddTraceEvent(
 | 
	
		
			
				|  |  | +          channelz::ChannelTrace::Severity::Info,
 | 
	
		
			
				|  |  | +          grpc_slice_from_static_string(
 | 
	
		
			
				|  |  | +              get_channel_connectivity_state_change_string(state)));
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    // Bounce into the data plane combiner to reset the picker.
 | 
	
		
			
				|  |  | +    GRPC_CHANNEL_STACK_REF(chand->owning_stack,
 | 
	
		
			
				|  |  | +                           "ConnectivityStateAndPickerSetter");
 | 
	
		
			
				|  |  | +    GRPC_CLOSURE_INIT(&closure_, SetPicker, this,
 | 
	
		
			
				|  |  | +                      grpc_combiner_scheduler(chand->data_plane_combiner));
 | 
	
		
			
				|  |  | +    GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | + private:
 | 
	
		
			
				|  |  | +  static void SetPicker(void* arg, grpc_error* ignored) {
 | 
	
		
			
				|  |  | +    auto* self = static_cast<ConnectivityStateAndPickerSetter*>(arg);
 | 
	
		
			
				|  |  | +    // Update picker.
 | 
	
		
			
				|  |  | +    self->chand_->picker = std::move(self->picker_);
 | 
	
		
			
				|  |  | +    // Re-process queued picks.
 | 
	
		
			
				|  |  | +    for (QueuedPick* pick = self->chand_->queued_picks; pick != nullptr;
 | 
	
		
			
				|  |  | +         pick = pick->next) {
 | 
	
		
			
				|  |  | +      start_pick_locked(pick->elem, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    // Clean up.
 | 
	
		
			
				|  |  | +    GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack,
 | 
	
		
			
				|  |  | +                             "ConnectivityStateAndPickerSetter");
 | 
	
		
			
				|  |  | +    Delete(self);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  channel_data* chand_;
 | 
	
		
			
				|  |  | +  UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker_;
 | 
	
		
			
				|  |  | +  grpc_closure closure_;
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +// A fire-and-forget class that sets the channel's service config data
 | 
	
		
			
				|  |  | +// in the data plane combiner.  Deletes itself when done.
 | 
	
		
			
				|  |  | +class ServiceConfigSetter {
 | 
	
		
			
				|  |  | + public:
 | 
	
		
			
				|  |  | +  ServiceConfigSetter(
 | 
	
		
			
				|  |  | +      channel_data* chand,
 | 
	
		
			
				|  |  | +      RefCountedPtr<ServerRetryThrottleData> retry_throttle_data,
 | 
	
		
			
				|  |  | +      RefCountedPtr<ClientChannelMethodParamsTable> method_params_table)
 | 
	
		
			
				|  |  | +      : chand_(chand),
 | 
	
		
			
				|  |  | +        retry_throttle_data_(std::move(retry_throttle_data)),
 | 
	
		
			
				|  |  | +        method_params_table_(std::move(method_params_table)) {
 | 
	
		
			
				|  |  | +    GRPC_CHANNEL_STACK_REF(chand->owning_stack, "ServiceConfigSetter");
 | 
	
		
			
				|  |  | +    GRPC_CLOSURE_INIT(&closure_, SetServiceConfigData, this,
 | 
	
		
			
				|  |  | +                      grpc_combiner_scheduler(chand->data_plane_combiner));
 | 
	
		
			
				|  |  | +    GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  // Update picker.
 | 
	
		
			
				|  |  | -  chand->picker = std::move(picker);
 | 
	
		
			
				|  |  | -  // Re-process queued picks.
 | 
	
		
			
				|  |  | -  for (QueuedPick* pick = chand->queued_picks; pick != nullptr;
 | 
	
		
			
				|  |  | -       pick = pick->next) {
 | 
	
		
			
				|  |  | -    start_pick_locked(pick->elem, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | + private:
 | 
	
		
			
				|  |  | +  static void SetServiceConfigData(void* arg, grpc_error* ignored) {
 | 
	
		
			
				|  |  | +    ServiceConfigSetter* self = static_cast<ServiceConfigSetter*>(arg);
 | 
	
		
			
				|  |  | +    channel_data* chand = self->chand_;
 | 
	
		
			
				|  |  | +    // Update channel state.
 | 
	
		
			
				|  |  | +    chand->received_service_config_data = true;
 | 
	
		
			
				|  |  | +    chand->retry_throttle_data = std::move(self->retry_throttle_data_);
 | 
	
		
			
				|  |  | +    chand->method_params_table = std::move(self->method_params_table_);
 | 
	
		
			
				|  |  | +    // Apply service config to queued picks.
 | 
	
		
			
				|  |  | +    for (QueuedPick* pick = chand->queued_picks; pick != nullptr;
 | 
	
		
			
				|  |  | +         pick = pick->next) {
 | 
	
		
			
				|  |  | +      maybe_apply_service_config_to_call_locked(pick->elem);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    // Clean up.
 | 
	
		
			
				|  |  | +    GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack, "ServiceConfigSetter");
 | 
	
		
			
				|  |  | +    Delete(self);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  channel_data* chand_;
 | 
	
		
			
				|  |  | +  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
 | 
	
		
			
				|  |  | +  RefCountedPtr<ClientChannelMethodParamsTable> method_params_table_;
 | 
	
		
			
				|  |  | +  grpc_closure closure_;
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +//
 | 
	
		
			
				|  |  | +// ExternalConnectivityWatcher::WatcherList
 | 
	
		
			
				|  |  | +//
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +int ExternalConnectivityWatcher::WatcherList::size() const {
 | 
	
		
			
				|  |  | +  MutexLock lock(&mu_);
 | 
	
		
			
				|  |  | +  int count = 0;
 | 
	
		
			
				|  |  | +  for (ExternalConnectivityWatcher* w = head_; w != nullptr; w = w->next_) {
 | 
	
		
			
				|  |  | +    ++count;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  return count;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -namespace grpc_core {
 | 
	
		
			
				|  |  | -namespace {
 | 
	
		
			
				|  |  | +ExternalConnectivityWatcher* ExternalConnectivityWatcher::WatcherList::Lookup(
 | 
	
		
			
				|  |  | +    grpc_closure* on_complete) const {
 | 
	
		
			
				|  |  | +  MutexLock lock(&mu_);
 | 
	
		
			
				|  |  | +  ExternalConnectivityWatcher* w = head_;
 | 
	
		
			
				|  |  | +  while (w != nullptr && w->on_complete_ != on_complete) {
 | 
	
		
			
				|  |  | +    w = w->next_;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  return w;
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +void ExternalConnectivityWatcher::WatcherList::Add(
 | 
	
		
			
				|  |  | +    ExternalConnectivityWatcher* watcher) {
 | 
	
		
			
				|  |  | +  GPR_ASSERT(Lookup(watcher->on_complete_) == nullptr);
 | 
	
		
			
				|  |  | +  MutexLock lock(&mu_);
 | 
	
		
			
				|  |  | +  GPR_ASSERT(watcher->next_ == nullptr);
 | 
	
		
			
				|  |  | +  watcher->next_ = head_;
 | 
	
		
			
				|  |  | +  head_ = watcher;
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +void ExternalConnectivityWatcher::WatcherList::Remove(
 | 
	
		
			
				|  |  | +    const ExternalConnectivityWatcher* watcher) {
 | 
	
		
			
				|  |  | +  MutexLock lock(&mu_);
 | 
	
		
			
				|  |  | +  if (watcher == head_) {
 | 
	
		
			
				|  |  | +    head_ = watcher->next_;
 | 
	
		
			
				|  |  | +    return;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  for (ExternalConnectivityWatcher* w = head_; w != nullptr; w = w->next_) {
 | 
	
		
			
				|  |  | +    if (w->next_ == watcher) {
 | 
	
		
			
				|  |  | +      w->next_ = w->next_->next_;
 | 
	
		
			
				|  |  | +      return;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  GPR_UNREACHABLE_CODE(return );
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +//
 | 
	
		
			
				|  |  | +// ExternalConnectivityWatcher
 | 
	
		
			
				|  |  | +//
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +ExternalConnectivityWatcher::ExternalConnectivityWatcher(
 | 
	
		
			
				|  |  | +    channel_data* chand, grpc_polling_entity pollent,
 | 
	
		
			
				|  |  | +    grpc_connectivity_state* state, grpc_closure* on_complete,
 | 
	
		
			
				|  |  | +    grpc_closure* watcher_timer_init)
 | 
	
		
			
				|  |  | +    : chand_(chand),
 | 
	
		
			
				|  |  | +      pollent_(pollent),
 | 
	
		
			
				|  |  | +      state_(state),
 | 
	
		
			
				|  |  | +      on_complete_(on_complete),
 | 
	
		
			
				|  |  | +      watcher_timer_init_(watcher_timer_init) {
 | 
	
		
			
				|  |  | +  grpc_polling_entity_add_to_pollset_set(&pollent_, chand_->interested_parties);
 | 
	
		
			
				|  |  | +  GRPC_CHANNEL_STACK_REF(chand_->owning_stack, "ExternalConnectivityWatcher");
 | 
	
		
			
				|  |  | +  GRPC_CLOSURE_SCHED(
 | 
	
		
			
				|  |  | +      GRPC_CLOSURE_INIT(&my_closure_, WatchConnectivityStateLocked, this,
 | 
	
		
			
				|  |  | +                        grpc_combiner_scheduler(chand_->combiner)),
 | 
	
		
			
				|  |  | +      GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +ExternalConnectivityWatcher::~ExternalConnectivityWatcher() {
 | 
	
		
			
				|  |  | +  grpc_polling_entity_del_from_pollset_set(&pollent_,
 | 
	
		
			
				|  |  | +                                           chand_->interested_parties);
 | 
	
		
			
				|  |  | +  GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack, "ExternalConnectivityWatcher");
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +void ExternalConnectivityWatcher::OnWatchCompleteLocked(void* arg,
 | 
	
		
			
				|  |  | +                                                        grpc_error* error) {
 | 
	
		
			
				|  |  | +  ExternalConnectivityWatcher* self =
 | 
	
		
			
				|  |  | +      static_cast<ExternalConnectivityWatcher*>(arg);
 | 
	
		
			
				|  |  | +  grpc_closure* on_complete = self->on_complete_;
 | 
	
		
			
				|  |  | +  self->chand_->external_connectivity_watcher_list->Remove(self);
 | 
	
		
			
				|  |  | +  Delete(self);
 | 
	
		
			
				|  |  | +  GRPC_CLOSURE_SCHED(on_complete, GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +void ExternalConnectivityWatcher::WatchConnectivityStateLocked(
 | 
	
		
			
				|  |  | +    void* arg, grpc_error* ignored) {
 | 
	
		
			
				|  |  | +  ExternalConnectivityWatcher* self =
 | 
	
		
			
				|  |  | +      static_cast<ExternalConnectivityWatcher*>(arg);
 | 
	
		
			
				|  |  | +  if (self->state_ == nullptr) {
 | 
	
		
			
				|  |  | +    // Handle cancellation.
 | 
	
		
			
				|  |  | +    GPR_ASSERT(self->watcher_timer_init_ == nullptr);
 | 
	
		
			
				|  |  | +    ExternalConnectivityWatcher* found =
 | 
	
		
			
				|  |  | +        self->chand_->external_connectivity_watcher_list->Lookup(
 | 
	
		
			
				|  |  | +            self->on_complete_);
 | 
	
		
			
				|  |  | +    if (found != nullptr) {
 | 
	
		
			
				|  |  | +      grpc_connectivity_state_notify_on_state_change(
 | 
	
		
			
				|  |  | +          &found->chand_->state_tracker, nullptr, &found->my_closure_);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    Delete(self);
 | 
	
		
			
				|  |  | +    return;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  // New watcher.
 | 
	
		
			
				|  |  | +  self->chand_->external_connectivity_watcher_list->Add(self);
 | 
	
		
			
				|  |  | +  // This assumes that the closure is scheduled on the ExecCtx scheduler
 | 
	
		
			
				|  |  | +  // and that GRPC_CLOSURE_RUN would run the closure immediately.
 | 
	
		
			
				|  |  | +  GRPC_CLOSURE_RUN(self->watcher_timer_init_, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | +  GRPC_CLOSURE_INIT(&self->my_closure_, OnWatchCompleteLocked, self,
 | 
	
		
			
				|  |  | +                    grpc_combiner_scheduler(self->chand_->combiner));
 | 
	
		
			
				|  |  | +  grpc_connectivity_state_notify_on_state_change(
 | 
	
		
			
				|  |  | +      &self->chand_->state_tracker, self->state_, &self->my_closure_);
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +//
 | 
	
		
			
				|  |  | +// ClientChannelControlHelper
 | 
	
		
			
				|  |  | +//
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  class ClientChannelControlHelper
 | 
	
		
			
				|  |  |      : public LoadBalancingPolicy::ChannelControlHelper {
 | 
	
	
		
			
				|  | @@ -222,8 +458,10 @@ class ClientChannelControlHelper
 | 
	
		
			
				|  |  |    void UpdateState(
 | 
	
		
			
				|  |  |        grpc_connectivity_state state, grpc_error* state_error,
 | 
	
		
			
				|  |  |        UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) override {
 | 
	
		
			
				|  |  | +    grpc_error* disconnect_error =
 | 
	
		
			
				|  |  | +        chand_->disconnect_error.Load(grpc_core::MemoryOrder::ACQUIRE);
 | 
	
		
			
				|  |  |      if (grpc_client_channel_routing_trace.enabled()) {
 | 
	
		
			
				|  |  | -      const char* extra = chand_->disconnect_error == GRPC_ERROR_NONE
 | 
	
		
			
				|  |  | +      const char* extra = disconnect_error == GRPC_ERROR_NONE
 | 
	
		
			
				|  |  |                                ? ""
 | 
	
		
			
				|  |  |                                : " (ignoring -- channel shutting down)";
 | 
	
		
			
				|  |  |        gpr_log(GPR_INFO, "chand=%p: update: state=%s error=%s picker=%p%s",
 | 
	
	
		
			
				|  | @@ -231,9 +469,10 @@ class ClientChannelControlHelper
 | 
	
		
			
				|  |  |                grpc_error_string(state_error), picker.get(), extra);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |      // Do update only if not shutting down.
 | 
	
		
			
				|  |  | -    if (chand_->disconnect_error == GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | -      set_connectivity_state_and_picker_locked(chand_, state, state_error,
 | 
	
		
			
				|  |  | -                                               "helper", std::move(picker));
 | 
	
		
			
				|  |  | +    if (disconnect_error == GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | +      // Will delete itself.
 | 
	
		
			
				|  |  | +      New<ConnectivityStateAndPickerSetter>(chand_, state, state_error,
 | 
	
		
			
				|  |  | +                                            "helper", std::move(picker));
 | 
	
		
			
				|  |  |      } else {
 | 
	
		
			
				|  |  |        GRPC_ERROR_UNREF(state_error);
 | 
	
		
			
				|  |  |      }
 | 
	
	
		
			
				|  | @@ -255,7 +494,6 @@ static bool process_resolver_result_locked(
 | 
	
		
			
				|  |  |      void* arg, grpc_core::Resolver::Result* result, const char** lb_policy_name,
 | 
	
		
			
				|  |  |      grpc_core::RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config) {
 | 
	
		
			
				|  |  |    channel_data* chand = static_cast<channel_data*>(arg);
 | 
	
		
			
				|  |  | -  chand->have_service_config = true;
 | 
	
		
			
				|  |  |    ProcessedResolverResult resolver_result(result, chand->enable_retries);
 | 
	
		
			
				|  |  |    grpc_core::UniquePtr<char> service_config_json =
 | 
	
		
			
				|  |  |        resolver_result.service_config_json();
 | 
	
	
		
			
				|  | @@ -263,9 +501,11 @@ static bool process_resolver_result_locked(
 | 
	
		
			
				|  |  |      gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"",
 | 
	
		
			
				|  |  |              chand, service_config_json.get());
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  // Update channel state.
 | 
	
		
			
				|  |  | -  chand->retry_throttle_data = resolver_result.retry_throttle_data();
 | 
	
		
			
				|  |  | -  chand->method_params_table = resolver_result.method_params_table();
 | 
	
		
			
				|  |  | +  // Create service config setter to update channel state in the data
 | 
	
		
			
				|  |  | +  // plane combiner.  Destroys itself when done.
 | 
	
		
			
				|  |  | +  grpc_core::New<grpc_core::ServiceConfigSetter>(
 | 
	
		
			
				|  |  | +      chand, resolver_result.retry_throttle_data(),
 | 
	
		
			
				|  |  | +      resolver_result.method_params_table());
 | 
	
		
			
				|  |  |    // Swap out the data used by cc_get_channel_info().
 | 
	
		
			
				|  |  |    gpr_mu_lock(&chand->info_mu);
 | 
	
		
			
				|  |  |    chand->info_lb_policy_name = resolver_result.lb_policy_name();
 | 
	
	
		
			
				|  | @@ -280,11 +520,6 @@ static bool process_resolver_result_locked(
 | 
	
		
			
				|  |  |    // Return results.
 | 
	
		
			
				|  |  |    *lb_policy_name = chand->info_lb_policy_name.get();
 | 
	
		
			
				|  |  |    *lb_policy_config = resolver_result.lb_policy_config();
 | 
	
		
			
				|  |  | -  // Apply service config to queued picks.
 | 
	
		
			
				|  |  | -  for (QueuedPick* pick = chand->queued_picks; pick != nullptr;
 | 
	
		
			
				|  |  | -       pick = pick->next) {
 | 
	
		
			
				|  |  | -    maybe_apply_service_config_to_call_locked(pick->elem);
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  |    return service_config_changed;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -342,12 +577,16 @@ static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    if (op->disconnect_with_error != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | -    chand->disconnect_error = op->disconnect_with_error;
 | 
	
		
			
				|  |  | +    grpc_error* error = GRPC_ERROR_NONE;
 | 
	
		
			
				|  |  | +    GPR_ASSERT(chand->disconnect_error.CompareExchangeStrong(
 | 
	
		
			
				|  |  | +        &error, op->disconnect_with_error, grpc_core::MemoryOrder::ACQ_REL,
 | 
	
		
			
				|  |  | +        grpc_core::MemoryOrder::ACQUIRE));
 | 
	
		
			
				|  |  |      grpc_pollset_set_del_pollset_set(
 | 
	
		
			
				|  |  |          chand->resolving_lb_policy->interested_parties(),
 | 
	
		
			
				|  |  |          chand->interested_parties);
 | 
	
		
			
				|  |  |      chand->resolving_lb_policy.reset();
 | 
	
		
			
				|  |  | -    set_connectivity_state_and_picker_locked(
 | 
	
		
			
				|  |  | +    // Will delete itself.
 | 
	
		
			
				|  |  | +    grpc_core::New<grpc_core::ConnectivityStateAndPickerSetter>(
 | 
	
		
			
				|  |  |          chand, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(op->disconnect_with_error),
 | 
	
		
			
				|  |  |          "shutdown from API",
 | 
	
		
			
				|  |  |          grpc_core::UniquePtr<LoadBalancingPolicy::SubchannelPicker>(
 | 
	
	
		
			
				|  | @@ -397,17 +636,14 @@ static grpc_error* cc_init_channel_elem(grpc_channel_element* elem,
 | 
	
		
			
				|  |  |    GPR_ASSERT(args->is_last);
 | 
	
		
			
				|  |  |    GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
 | 
	
		
			
				|  |  |    // Initialize data members.
 | 
	
		
			
				|  |  | +  chand->data_plane_combiner = grpc_combiner_create();
 | 
	
		
			
				|  |  |    chand->combiner = grpc_combiner_create();
 | 
	
		
			
				|  |  |    grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
 | 
	
		
			
				|  |  |                                 "client_channel");
 | 
	
		
			
				|  |  | -  chand->disconnect_error = GRPC_ERROR_NONE;
 | 
	
		
			
				|  |  | +  chand->disconnect_error.Store(GRPC_ERROR_NONE,
 | 
	
		
			
				|  |  | +                                grpc_core::MemoryOrder::RELAXED);
 | 
	
		
			
				|  |  |    gpr_mu_init(&chand->info_mu);
 | 
	
		
			
				|  |  | -  gpr_mu_init(&chand->external_connectivity_watcher_list_mu);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
 | 
	
		
			
				|  |  | -  chand->external_connectivity_watcher_list_head = nullptr;
 | 
	
		
			
				|  |  | -  gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | +  chand->external_connectivity_watcher_list.Init();
 | 
	
		
			
				|  |  |    chand->owning_stack = args->channel_stack;
 | 
	
		
			
				|  |  |    chand->deadline_checking_enabled =
 | 
	
		
			
				|  |  |        grpc_deadline_checking_enabled(args->channel_args);
 | 
	
	
		
			
				|  | @@ -511,11 +747,13 @@ static void cc_destroy_channel_elem(grpc_channel_element* elem) {
 | 
	
		
			
				|  |  |    chand->method_params_table.reset();
 | 
	
		
			
				|  |  |    grpc_client_channel_stop_backup_polling(chand->interested_parties);
 | 
	
		
			
				|  |  |    grpc_pollset_set_destroy(chand->interested_parties);
 | 
	
		
			
				|  |  | +  GRPC_COMBINER_UNREF(chand->data_plane_combiner, "client_channel");
 | 
	
		
			
				|  |  |    GRPC_COMBINER_UNREF(chand->combiner, "client_channel");
 | 
	
		
			
				|  |  | -  GRPC_ERROR_UNREF(chand->disconnect_error);
 | 
	
		
			
				|  |  | +  GRPC_ERROR_UNREF(
 | 
	
		
			
				|  |  | +      chand->disconnect_error.Load(grpc_core::MemoryOrder::RELAXED));
 | 
	
		
			
				|  |  |    grpc_connectivity_state_destroy(&chand->state_tracker);
 | 
	
		
			
				|  |  |    gpr_mu_destroy(&chand->info_mu);
 | 
	
		
			
				|  |  | -  gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu);
 | 
	
		
			
				|  |  | +  chand->external_connectivity_watcher_list.Destroy();
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  /*************************************************************************
 | 
	
	
		
			
				|  | @@ -1261,7 +1499,7 @@ static void do_retry(grpc_call_element* elem,
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    // Schedule retry after computed delay.
 | 
	
		
			
				|  |  |    GRPC_CLOSURE_INIT(&calld->pick_closure, start_pick_locked, elem,
 | 
	
		
			
				|  |  | -                    grpc_combiner_scheduler(chand->combiner));
 | 
	
		
			
				|  |  | +                    grpc_combiner_scheduler(chand->data_plane_combiner));
 | 
	
		
			
				|  |  |    grpc_timer_init(&calld->retry_timer, next_attempt_time, &calld->pick_closure);
 | 
	
		
			
				|  |  |    // Update bookkeeping.
 | 
	
		
			
				|  |  |    if (retry_state != nullptr) retry_state->retry_dispatched = true;
 | 
	
	
		
			
				|  | @@ -2488,7 +2726,7 @@ class QueuedPickCanceller {
 | 
	
		
			
				|  |  |      auto* chand = static_cast<channel_data*>(elem->channel_data);
 | 
	
		
			
				|  |  |      GRPC_CALL_STACK_REF(calld->owning_call, "QueuedPickCanceller");
 | 
	
		
			
				|  |  |      GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this,
 | 
	
		
			
				|  |  | -                      grpc_combiner_scheduler(chand->combiner));
 | 
	
		
			
				|  |  | +                      grpc_combiner_scheduler(chand->data_plane_combiner));
 | 
	
		
			
				|  |  |      grpc_call_combiner_set_notify_on_cancel(calld->call_combiner, &closure_);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -2628,7 +2866,7 @@ static void maybe_apply_service_config_to_call_locked(grpc_call_element* elem) {
 | 
	
		
			
				|  |  |    call_data* calld = static_cast<call_data*>(elem->call_data);
 | 
	
		
			
				|  |  |    // Apply service config data to the call only once, and only if the
 | 
	
		
			
				|  |  |    // channel has the data available.
 | 
	
		
			
				|  |  | -  if (GPR_LIKELY(chand->have_service_config &&
 | 
	
		
			
				|  |  | +  if (GPR_LIKELY(chand->received_service_config_data &&
 | 
	
		
			
				|  |  |                   !calld->service_config_applied)) {
 | 
	
		
			
				|  |  |      calld->service_config_applied = true;
 | 
	
		
			
				|  |  |      apply_service_config_to_call_locked(elem);
 | 
	
	
		
			
				|  | @@ -2676,7 +2914,7 @@ static void start_pick_locked(void* arg, grpc_error* error) {
 | 
	
		
			
				|  |  |                   .send_initial_metadata_flags;
 | 
	
		
			
				|  |  |    // Apply service config to call if needed.
 | 
	
		
			
				|  |  |    maybe_apply_service_config_to_call_locked(elem);
 | 
	
		
			
				|  |  | -  // When done, we schedule this closure to leave the channel combiner.
 | 
	
		
			
				|  |  | +  // When done, we schedule this closure to leave the data plane combiner.
 | 
	
		
			
				|  |  |    GRPC_CLOSURE_INIT(&calld->pick_closure, pick_done, elem,
 | 
	
		
			
				|  |  |                      grpc_schedule_on_exec_ctx);
 | 
	
		
			
				|  |  |    // Attempt pick.
 | 
	
	
		
			
				|  | @@ -2691,12 +2929,14 @@ static void start_pick_locked(void* arg, grpc_error* error) {
 | 
	
		
			
				|  |  |              grpc_error_string(error));
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    switch (pick_result) {
 | 
	
		
			
				|  |  | -    case LoadBalancingPolicy::PICK_TRANSIENT_FAILURE:
 | 
	
		
			
				|  |  | +    case LoadBalancingPolicy::PICK_TRANSIENT_FAILURE: {
 | 
	
		
			
				|  |  |        // If we're shutting down, fail all RPCs.
 | 
	
		
			
				|  |  | -      if (chand->disconnect_error != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | +      grpc_error* disconnect_error =
 | 
	
		
			
				|  |  | +          chand->disconnect_error.Load(grpc_core::MemoryOrder::ACQUIRE);
 | 
	
		
			
				|  |  | +      if (disconnect_error != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  |          GRPC_ERROR_UNREF(error);
 | 
	
		
			
				|  |  |          GRPC_CLOSURE_SCHED(&calld->pick_closure,
 | 
	
		
			
				|  |  | -                           GRPC_ERROR_REF(chand->disconnect_error));
 | 
	
		
			
				|  |  | +                           GRPC_ERROR_REF(disconnect_error));
 | 
	
		
			
				|  |  |          break;
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  |        // If wait_for_ready is false, then the error indicates the RPC
 | 
	
	
		
			
				|  | @@ -2722,7 +2962,8 @@ static void start_pick_locked(void* arg, grpc_error* error) {
 | 
	
		
			
				|  |  |        // If wait_for_ready is true, then queue to retry when we get a new
 | 
	
		
			
				|  |  |        // picker.
 | 
	
		
			
				|  |  |        GRPC_ERROR_UNREF(error);
 | 
	
		
			
				|  |  | -      // Fallthrough
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    // Fallthrough
 | 
	
		
			
				|  |  |      case LoadBalancingPolicy::PICK_QUEUE:
 | 
	
		
			
				|  |  |        if (!calld->pick_queued) add_call_to_queued_picks_locked(elem);
 | 
	
		
			
				|  |  |        break;
 | 
	
	
		
			
				|  | @@ -2816,7 +3057,8 @@ static void cc_start_transport_stream_op_batch(
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |      GRPC_CLOSURE_SCHED(
 | 
	
		
			
				|  |  |          GRPC_CLOSURE_INIT(&batch->handler_private.closure, start_pick_locked,
 | 
	
		
			
				|  |  | -                          elem, grpc_combiner_scheduler(chand->combiner)),
 | 
	
		
			
				|  |  | +                          elem,
 | 
	
		
			
				|  |  | +                          grpc_combiner_scheduler(chand->data_plane_combiner)),
 | 
	
		
			
				|  |  |          GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  |    } else {
 | 
	
		
			
				|  |  |      // For all other batches, release the call combiner.
 | 
	
	
		
			
				|  | @@ -2875,6 +3117,10 @@ const grpc_channel_filter grpc_client_channel_filter = {
 | 
	
		
			
				|  |  |      "client-channel",
 | 
	
		
			
				|  |  |  };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +//
 | 
	
		
			
				|  |  | +// functions exported to the rest of core
 | 
	
		
			
				|  |  | +//
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  void grpc_client_channel_set_channelz_node(
 | 
	
		
			
				|  |  |      grpc_channel_element* elem, grpc_core::channelz::ClientChannelNode* node) {
 | 
	
		
			
				|  |  |    channel_data* chand = static_cast<channel_data*>(elem->channel_data);
 | 
	
	
		
			
				|  | @@ -2914,120 +3160,10 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state(
 | 
	
		
			
				|  |  |    return out;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -typedef struct external_connectivity_watcher {
 | 
	
		
			
				|  |  | -  channel_data* chand;
 | 
	
		
			
				|  |  | -  grpc_polling_entity pollent;
 | 
	
		
			
				|  |  | -  grpc_closure* on_complete;
 | 
	
		
			
				|  |  | -  grpc_closure* watcher_timer_init;
 | 
	
		
			
				|  |  | -  grpc_connectivity_state* state;
 | 
	
		
			
				|  |  | -  grpc_closure my_closure;
 | 
	
		
			
				|  |  | -  struct external_connectivity_watcher* next;
 | 
	
		
			
				|  |  | -} external_connectivity_watcher;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -static external_connectivity_watcher* lookup_external_connectivity_watcher(
 | 
	
		
			
				|  |  | -    channel_data* chand, grpc_closure* on_complete) {
 | 
	
		
			
				|  |  | -  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
 | 
	
		
			
				|  |  | -  external_connectivity_watcher* w =
 | 
	
		
			
				|  |  | -      chand->external_connectivity_watcher_list_head;
 | 
	
		
			
				|  |  | -  while (w != nullptr && w->on_complete != on_complete) {
 | 
	
		
			
				|  |  | -    w = w->next;
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
 | 
	
		
			
				|  |  | -  return w;
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -static void external_connectivity_watcher_list_append(
 | 
	
		
			
				|  |  | -    channel_data* chand, external_connectivity_watcher* w) {
 | 
	
		
			
				|  |  | -  GPR_ASSERT(!lookup_external_connectivity_watcher(chand, w->on_complete));
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  gpr_mu_lock(&w->chand->external_connectivity_watcher_list_mu);
 | 
	
		
			
				|  |  | -  GPR_ASSERT(!w->next);
 | 
	
		
			
				|  |  | -  w->next = chand->external_connectivity_watcher_list_head;
 | 
	
		
			
				|  |  | -  chand->external_connectivity_watcher_list_head = w;
 | 
	
		
			
				|  |  | -  gpr_mu_unlock(&w->chand->external_connectivity_watcher_list_mu);
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -static void external_connectivity_watcher_list_remove(
 | 
	
		
			
				|  |  | -    channel_data* chand, external_connectivity_watcher* to_remove) {
 | 
	
		
			
				|  |  | -  GPR_ASSERT(
 | 
	
		
			
				|  |  | -      lookup_external_connectivity_watcher(chand, to_remove->on_complete));
 | 
	
		
			
				|  |  | -  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
 | 
	
		
			
				|  |  | -  if (to_remove == chand->external_connectivity_watcher_list_head) {
 | 
	
		
			
				|  |  | -    chand->external_connectivity_watcher_list_head = to_remove->next;
 | 
	
		
			
				|  |  | -    gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
 | 
	
		
			
				|  |  | -    return;
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  external_connectivity_watcher* w =
 | 
	
		
			
				|  |  | -      chand->external_connectivity_watcher_list_head;
 | 
	
		
			
				|  |  | -  while (w != nullptr) {
 | 
	
		
			
				|  |  | -    if (w->next == to_remove) {
 | 
	
		
			
				|  |  | -      w->next = w->next->next;
 | 
	
		
			
				|  |  | -      gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
 | 
	
		
			
				|  |  | -      return;
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | -    w = w->next;
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  GPR_UNREACHABLE_CODE(return );
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |  int grpc_client_channel_num_external_connectivity_watchers(
 | 
	
		
			
				|  |  |      grpc_channel_element* elem) {
 | 
	
		
			
				|  |  |    channel_data* chand = static_cast<channel_data*>(elem->channel_data);
 | 
	
		
			
				|  |  | -  int count = 0;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
 | 
	
		
			
				|  |  | -  external_connectivity_watcher* w =
 | 
	
		
			
				|  |  | -      chand->external_connectivity_watcher_list_head;
 | 
	
		
			
				|  |  | -  while (w != nullptr) {
 | 
	
		
			
				|  |  | -    count++;
 | 
	
		
			
				|  |  | -    w = w->next;
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  return count;
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -static void on_external_watch_complete_locked(void* arg, grpc_error* error) {
 | 
	
		
			
				|  |  | -  external_connectivity_watcher* w =
 | 
	
		
			
				|  |  | -      static_cast<external_connectivity_watcher*>(arg);
 | 
	
		
			
				|  |  | -  grpc_closure* follow_up = w->on_complete;
 | 
	
		
			
				|  |  | -  grpc_polling_entity_del_from_pollset_set(&w->pollent,
 | 
	
		
			
				|  |  | -                                           w->chand->interested_parties);
 | 
	
		
			
				|  |  | -  GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack,
 | 
	
		
			
				|  |  | -                           "external_connectivity_watcher");
 | 
	
		
			
				|  |  | -  external_connectivity_watcher_list_remove(w->chand, w);
 | 
	
		
			
				|  |  | -  gpr_free(w);
 | 
	
		
			
				|  |  | -  GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -static void watch_connectivity_state_locked(void* arg,
 | 
	
		
			
				|  |  | -                                            grpc_error* error_ignored) {
 | 
	
		
			
				|  |  | -  external_connectivity_watcher* w =
 | 
	
		
			
				|  |  | -      static_cast<external_connectivity_watcher*>(arg);
 | 
	
		
			
				|  |  | -  external_connectivity_watcher* found = nullptr;
 | 
	
		
			
				|  |  | -  if (w->state != nullptr) {
 | 
	
		
			
				|  |  | -    external_connectivity_watcher_list_append(w->chand, w);
 | 
	
		
			
				|  |  | -    // An assumption is being made that the closure is scheduled on the exec ctx
 | 
	
		
			
				|  |  | -    // scheduler and that GRPC_CLOSURE_RUN would run the closure immediately.
 | 
	
		
			
				|  |  | -    GRPC_CLOSURE_RUN(w->watcher_timer_init, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | -    GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete_locked, w,
 | 
	
		
			
				|  |  | -                      grpc_combiner_scheduler(w->chand->combiner));
 | 
	
		
			
				|  |  | -    grpc_connectivity_state_notify_on_state_change(&w->chand->state_tracker,
 | 
	
		
			
				|  |  | -                                                   w->state, &w->my_closure);
 | 
	
		
			
				|  |  | -  } else {
 | 
	
		
			
				|  |  | -    GPR_ASSERT(w->watcher_timer_init == nullptr);
 | 
	
		
			
				|  |  | -    found = lookup_external_connectivity_watcher(w->chand, w->on_complete);
 | 
	
		
			
				|  |  | -    if (found) {
 | 
	
		
			
				|  |  | -      GPR_ASSERT(found->on_complete == w->on_complete);
 | 
	
		
			
				|  |  | -      grpc_connectivity_state_notify_on_state_change(
 | 
	
		
			
				|  |  | -          &found->chand->state_tracker, nullptr, &found->my_closure);
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | -    grpc_polling_entity_del_from_pollset_set(&w->pollent,
 | 
	
		
			
				|  |  | -                                             w->chand->interested_parties);
 | 
	
		
			
				|  |  | -    GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack,
 | 
	
		
			
				|  |  | -                             "external_connectivity_watcher");
 | 
	
		
			
				|  |  | -    gpr_free(w);
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | +  return chand->external_connectivity_watcher_list->size();
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  void grpc_client_channel_watch_connectivity_state(
 | 
	
	
		
			
				|  | @@ -3035,21 +3171,8 @@ void grpc_client_channel_watch_connectivity_state(
 | 
	
		
			
				|  |  |      grpc_connectivity_state* state, grpc_closure* closure,
 | 
	
		
			
				|  |  |      grpc_closure* watcher_timer_init) {
 | 
	
		
			
				|  |  |    channel_data* chand = static_cast<channel_data*>(elem->channel_data);
 | 
	
		
			
				|  |  | -  external_connectivity_watcher* w =
 | 
	
		
			
				|  |  | -      static_cast<external_connectivity_watcher*>(gpr_zalloc(sizeof(*w)));
 | 
	
		
			
				|  |  | -  w->chand = chand;
 | 
	
		
			
				|  |  | -  w->pollent = pollent;
 | 
	
		
			
				|  |  | -  w->on_complete = closure;
 | 
	
		
			
				|  |  | -  w->state = state;
 | 
	
		
			
				|  |  | -  w->watcher_timer_init = watcher_timer_init;
 | 
	
		
			
				|  |  | -  grpc_polling_entity_add_to_pollset_set(&w->pollent,
 | 
	
		
			
				|  |  | -                                         chand->interested_parties);
 | 
	
		
			
				|  |  | -  GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
 | 
	
		
			
				|  |  | -                         "external_connectivity_watcher");
 | 
	
		
			
				|  |  | -  GRPC_CLOSURE_SCHED(
 | 
	
		
			
				|  |  | -      GRPC_CLOSURE_INIT(&w->my_closure, watch_connectivity_state_locked, w,
 | 
	
		
			
				|  |  | -                        grpc_combiner_scheduler(chand->combiner)),
 | 
	
		
			
				|  |  | -      GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | +  grpc_core::New<grpc_core::ExternalConnectivityWatcher>(
 | 
	
		
			
				|  |  | +      chand, pollent, state, closure, watcher_timer_init);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  grpc_core::RefCountedPtr<grpc_core::SubchannelCall>
 |