|  | @@ -100,49 +100,52 @@ struct QueuedPick {
 | 
	
		
			
				|  |  |  };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  typedef struct client_channel_channel_data {
 | 
	
		
			
				|  |  | +  //
 | 
	
		
			
				|  |  | +  // Fields set at construction and never modified.
 | 
	
		
			
				|  |  | +  //
 | 
	
		
			
				|  |  |    bool deadline_checking_enabled;
 | 
	
		
			
				|  |  |    bool enable_retries;
 | 
	
		
			
				|  |  |    size_t per_rpc_retry_buffer_size;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  /** combiner protecting all variables below in this data structure */
 | 
	
		
			
				|  |  | -  grpc_combiner* combiner;
 | 
	
		
			
				|  |  | -  /** owning stack */
 | 
	
		
			
				|  |  |    grpc_channel_stack* owning_stack;
 | 
	
		
			
				|  |  | -  /** interested parties (owned) */
 | 
	
		
			
				|  |  | -  grpc_pollset_set* interested_parties;
 | 
	
		
			
				|  |  | -  // Client channel factory.
 | 
	
		
			
				|  |  |    grpc_core::ClientChannelFactory* client_channel_factory;
 | 
	
		
			
				|  |  | -  // Subchannel pool.
 | 
	
		
			
				|  |  | -  grpc_core::RefCountedPtr<grpc_core::SubchannelPoolInterface> subchannel_pool;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    grpc_core::channelz::ClientChannelNode* channelz_node;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  // Resolving LB policy.
 | 
	
		
			
				|  |  | -  grpc_core::OrphanablePtr<LoadBalancingPolicy> resolving_lb_policy;
 | 
	
		
			
				|  |  | -  // Subchannel picker from LB policy.
 | 
	
		
			
				|  |  | +  //
 | 
	
		
			
				|  |  | +  // Fields used in the data plane.  Protected by data_plane_combiner.
 | 
	
		
			
				|  |  | +  //
 | 
	
		
			
				|  |  | +  grpc_combiner* data_plane_combiner;
 | 
	
		
			
				|  |  |    grpc_core::UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker;
 | 
	
		
			
				|  |  | -  // Linked list of queued picks.
 | 
	
		
			
				|  |  | -  QueuedPick* queued_picks;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  bool have_service_config;
 | 
	
		
			
				|  |  | -  /** retry throttle data from service config */
 | 
	
		
			
				|  |  | +  QueuedPick* queued_picks;  // Linked list of queued picks.
 | 
	
		
			
				|  |  | +  // Data from service config.
 | 
	
		
			
				|  |  | +  bool received_service_config_data;
 | 
	
		
			
				|  |  |    grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
 | 
	
		
			
				|  |  | -  /** per-method service config data */
 | 
	
		
			
				|  |  |    grpc_core::RefCountedPtr<ClientChannelMethodParamsTable> method_params_table;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  /* the following properties are guarded by a mutex since APIs require them
 | 
	
		
			
				|  |  | -     to be instantaneously available */
 | 
	
		
			
				|  |  | -  gpr_mu info_mu;
 | 
	
		
			
				|  |  | -  grpc_core::UniquePtr<char> info_lb_policy_name;
 | 
	
		
			
				|  |  | -  grpc_core::UniquePtr<char> info_service_config_json;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | +  //
 | 
	
		
			
				|  |  | +  // Fields used in the control plane.  Protected by combiner.
 | 
	
		
			
				|  |  | +  //
 | 
	
		
			
				|  |  | +  grpc_combiner* combiner;
 | 
	
		
			
				|  |  | +  grpc_pollset_set* interested_parties;
 | 
	
		
			
				|  |  | +  grpc_core::RefCountedPtr<grpc_core::SubchannelPoolInterface> subchannel_pool;
 | 
	
		
			
				|  |  | +  grpc_core::OrphanablePtr<LoadBalancingPolicy> resolving_lb_policy;
 | 
	
		
			
				|  |  |    grpc_connectivity_state_tracker state_tracker;
 | 
	
		
			
				|  |  | -  grpc_error* disconnect_error;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  /* external_connectivity_watcher_list head is guarded by its own mutex, since
 | 
	
		
			
				|  |  | -   * counts need to be grabbed immediately without polling on a cq */
 | 
	
		
			
				|  |  | +  //
 | 
	
		
			
				|  |  | +  // Fields accessed from both data plane and control plane combiners.
 | 
	
		
			
				|  |  | +  //
 | 
	
		
			
				|  |  | +  grpc_core::Atomic<grpc_error*> disconnect_error;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  // external_connectivity_watcher_list head is guarded by its own mutex, since
 | 
	
		
			
				|  |  | +  // counts need to be grabbed immediately without polling on a CQ.
 | 
	
		
			
				|  |  |    gpr_mu external_connectivity_watcher_list_mu;
 | 
	
		
			
				|  |  |    struct external_connectivity_watcher* external_connectivity_watcher_list_head;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  // The following properties are guarded by a mutex since APIs require them
 | 
	
		
			
				|  |  | +  // to be instantaneously available.
 | 
	
		
			
				|  |  | +  gpr_mu info_mu;
 | 
	
		
			
				|  |  | +  grpc_core::UniquePtr<char> info_lb_policy_name;
 | 
	
		
			
				|  |  | +  grpc_core::UniquePtr<char> info_service_config_json;
 | 
	
		
			
				|  |  |  } channel_data;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  // Forward declarations.
 | 
	
	
		
			
				|  | @@ -166,30 +169,98 @@ static const char* get_channel_connectivity_state_change_string(
 | 
	
		
			
				|  |  |    GPR_UNREACHABLE_CODE(return "UNKNOWN");
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void set_connectivity_state_and_picker_locked(
 | 
	
		
			
				|  |  | -    channel_data* chand, grpc_connectivity_state state, grpc_error* state_error,
 | 
	
		
			
				|  |  | -    const char* reason,
 | 
	
		
			
				|  |  | -    grpc_core::UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) {
 | 
	
		
			
				|  |  | -  // Update connectivity state.
 | 
	
		
			
				|  |  | -  grpc_connectivity_state_set(&chand->state_tracker, state, state_error,
 | 
	
		
			
				|  |  | -                              reason);
 | 
	
		
			
				|  |  | -  if (chand->channelz_node != nullptr) {
 | 
	
		
			
				|  |  | -    chand->channelz_node->AddTraceEvent(
 | 
	
		
			
				|  |  | -        grpc_core::channelz::ChannelTrace::Severity::Info,
 | 
	
		
			
				|  |  | -        grpc_slice_from_static_string(
 | 
	
		
			
				|  |  | -            get_channel_connectivity_state_change_string(state)));
 | 
	
		
			
				|  |  | +namespace grpc_core {
 | 
	
		
			
				|  |  | +namespace {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +// A fire-and-forget class that sets the channel's connectivity state
 | 
	
		
			
				|  |  | +// and then hops into the data plane combiner to update the picker.
 | 
	
		
			
				|  |  | +// Must be instantiated while holding the control plane combiner.
 | 
	
		
			
				|  |  | +// Deletes itself when done.
 | 
	
		
			
				|  |  | +class ConnectivityStateAndPickerSetter {
 | 
	
		
			
				|  |  | + public:
 | 
	
		
			
				|  |  | +  ConnectivityStateAndPickerSetter(
 | 
	
		
			
				|  |  | +      channel_data* chand, grpc_connectivity_state state,
 | 
	
		
			
				|  |  | +      grpc_error* state_error, const char* reason,
 | 
	
		
			
				|  |  | +      UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker)
 | 
	
		
			
				|  |  | +      : chand_(chand), picker_(std::move(picker)) {
 | 
	
		
			
				|  |  | +    // Update connectivity state here, while holding control plane combiner.
 | 
	
		
			
				|  |  | +    grpc_connectivity_state_set(&chand->state_tracker, state, state_error,
 | 
	
		
			
				|  |  | +                                reason);
 | 
	
		
			
				|  |  | +    if (chand->channelz_node != nullptr) {
 | 
	
		
			
				|  |  | +      chand->channelz_node->AddTraceEvent(
 | 
	
		
			
				|  |  | +          channelz::ChannelTrace::Severity::Info,
 | 
	
		
			
				|  |  | +          grpc_slice_from_static_string(
 | 
	
		
			
				|  |  | +              get_channel_connectivity_state_change_string(state)));
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    // Bounce into the data plane combiner to reset the picker.
 | 
	
		
			
				|  |  | +    GRPC_CHANNEL_STACK_REF(chand->owning_stack,
 | 
	
		
			
				|  |  | +                           "ConnectivityStateAndPickerSetter");
 | 
	
		
			
				|  |  | +    GRPC_CLOSURE_INIT(&closure_, SetPicker, this,
 | 
	
		
			
				|  |  | +                      grpc_combiner_scheduler(chand->data_plane_combiner));
 | 
	
		
			
				|  |  | +    GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  // Update picker.
 | 
	
		
			
				|  |  | -  chand->picker = std::move(picker);
 | 
	
		
			
				|  |  | -  // Re-process queued picks.
 | 
	
		
			
				|  |  | -  for (QueuedPick* pick = chand->queued_picks; pick != nullptr;
 | 
	
		
			
				|  |  | -       pick = pick->next) {
 | 
	
		
			
				|  |  | -    start_pick_locked(pick->elem, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | + private:
 | 
	
		
			
				|  |  | +  static void SetPicker(void* arg, grpc_error* ignored) {
 | 
	
		
			
				|  |  | +    auto* self = static_cast<ConnectivityStateAndPickerSetter*>(arg);
 | 
	
		
			
				|  |  | +    // Update picker.
 | 
	
		
			
				|  |  | +    self->chand_->picker = std::move(self->picker_);
 | 
	
		
			
				|  |  | +    // Re-process queued picks.
 | 
	
		
			
				|  |  | +    for (QueuedPick* pick = self->chand_->queued_picks; pick != nullptr;
 | 
	
		
			
				|  |  | +         pick = pick->next) {
 | 
	
		
			
				|  |  | +      start_pick_locked(pick->elem, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    // Clean up.
 | 
	
		
			
				|  |  | +    GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack,
 | 
	
		
			
				|  |  | +                             "ConnectivityStateAndPickerSetter");
 | 
	
		
			
				|  |  | +    Delete(self);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -namespace grpc_core {
 | 
	
		
			
				|  |  | -namespace {
 | 
	
		
			
				|  |  | +  channel_data* chand_;
 | 
	
		
			
				|  |  | +  UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker_;
 | 
	
		
			
				|  |  | +  grpc_closure closure_;
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +// A fire-and-forget class that sets the channel's service config data
 | 
	
		
			
				|  |  | +// in the data plane combiner.  Deletes itself when done.
 | 
	
		
			
				|  |  | +class ServiceConfigSetter {
 | 
	
		
			
				|  |  | + public:
 | 
	
		
			
				|  |  | +  ServiceConfigSetter(
 | 
	
		
			
				|  |  | +      channel_data* chand,
 | 
	
		
			
				|  |  | +      RefCountedPtr<ServerRetryThrottleData> retry_throttle_data,
 | 
	
		
			
				|  |  | +      RefCountedPtr<ClientChannelMethodParamsTable> method_params_table)
 | 
	
		
			
				|  |  | +      : chand_(chand),
 | 
	
		
			
				|  |  | +        retry_throttle_data_(std::move(retry_throttle_data)),
 | 
	
		
			
				|  |  | +        method_params_table_(std::move(method_params_table)) {
 | 
	
		
			
				|  |  | +    GRPC_CHANNEL_STACK_REF(chand->owning_stack, "ServiceConfigSetter");
 | 
	
		
			
				|  |  | +    GRPC_CLOSURE_INIT(&closure_, SetServiceConfigData, this,
 | 
	
		
			
				|  |  | +                      grpc_combiner_scheduler(chand->data_plane_combiner));
 | 
	
		
			
				|  |  | +    GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | + private:
 | 
	
		
			
				|  |  | +  static void SetServiceConfigData(void* arg, grpc_error* ignored) {
 | 
	
		
			
				|  |  | +    ServiceConfigSetter* self = static_cast<ServiceConfigSetter*>(arg);
 | 
	
		
			
				|  |  | +    channel_data* chand = self->chand_;
 | 
	
		
			
				|  |  | +    // Update channel state.
 | 
	
		
			
				|  |  | +    chand->received_service_config_data = true;
 | 
	
		
			
				|  |  | +    chand->retry_throttle_data = std::move(self->retry_throttle_data_);
 | 
	
		
			
				|  |  | +    chand->method_params_table = std::move(self->method_params_table_);
 | 
	
		
			
				|  |  | +    // Apply service config to queued picks.
 | 
	
		
			
				|  |  | +    for (QueuedPick* pick = chand->queued_picks; pick != nullptr;
 | 
	
		
			
				|  |  | +         pick = pick->next) {
 | 
	
		
			
				|  |  | +      maybe_apply_service_config_to_call_locked(pick->elem);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    // Clean up.
 | 
	
		
			
				|  |  | +    GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack, "ServiceConfigSetter");
 | 
	
		
			
				|  |  | +    Delete(self);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  channel_data* chand_;
 | 
	
		
			
				|  |  | +  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
 | 
	
		
			
				|  |  | +  RefCountedPtr<ClientChannelMethodParamsTable> method_params_table_;
 | 
	
		
			
				|  |  | +  grpc_closure closure_;
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  class ClientChannelControlHelper
 | 
	
		
			
				|  |  |      : public LoadBalancingPolicy::ChannelControlHelper {
 | 
	
	
		
			
				|  | @@ -222,8 +293,10 @@ class ClientChannelControlHelper
 | 
	
		
			
				|  |  |    void UpdateState(
 | 
	
		
			
				|  |  |        grpc_connectivity_state state, grpc_error* state_error,
 | 
	
		
			
				|  |  |        UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) override {
 | 
	
		
			
				|  |  | +    grpc_error* disconnect_error =
 | 
	
		
			
				|  |  | +        chand_->disconnect_error.Load(grpc_core::MemoryOrder::ACQUIRE);
 | 
	
		
			
				|  |  |      if (grpc_client_channel_routing_trace.enabled()) {
 | 
	
		
			
				|  |  | -      const char* extra = chand_->disconnect_error == GRPC_ERROR_NONE
 | 
	
		
			
				|  |  | +      const char* extra = disconnect_error == GRPC_ERROR_NONE
 | 
	
		
			
				|  |  |                                ? ""
 | 
	
		
			
				|  |  |                                : " (ignoring -- channel shutting down)";
 | 
	
		
			
				|  |  |        gpr_log(GPR_INFO, "chand=%p: update: state=%s error=%s picker=%p%s",
 | 
	
	
		
			
				|  | @@ -231,9 +304,10 @@ class ClientChannelControlHelper
 | 
	
		
			
				|  |  |                grpc_error_string(state_error), picker.get(), extra);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |      // Do update only if not shutting down.
 | 
	
		
			
				|  |  | -    if (chand_->disconnect_error == GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | -      set_connectivity_state_and_picker_locked(chand_, state, state_error,
 | 
	
		
			
				|  |  | -                                               "helper", std::move(picker));
 | 
	
		
			
				|  |  | +    if (disconnect_error == GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | +      // Will delete itself.
 | 
	
		
			
				|  |  | +      New<ConnectivityStateAndPickerSetter>(chand_, state, state_error,
 | 
	
		
			
				|  |  | +                                            "helper", std::move(picker));
 | 
	
		
			
				|  |  |      } else {
 | 
	
		
			
				|  |  |        GRPC_ERROR_UNREF(state_error);
 | 
	
		
			
				|  |  |      }
 | 
	
	
		
			
				|  | @@ -255,7 +329,6 @@ static bool process_resolver_result_locked(
 | 
	
		
			
				|  |  |      void* arg, grpc_core::Resolver::Result* result, const char** lb_policy_name,
 | 
	
		
			
				|  |  |      grpc_core::RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config) {
 | 
	
		
			
				|  |  |    channel_data* chand = static_cast<channel_data*>(arg);
 | 
	
		
			
				|  |  | -  chand->have_service_config = true;
 | 
	
		
			
				|  |  |    ProcessedResolverResult resolver_result(result, chand->enable_retries);
 | 
	
		
			
				|  |  |    grpc_core::UniquePtr<char> service_config_json =
 | 
	
		
			
				|  |  |        resolver_result.service_config_json();
 | 
	
	
		
			
				|  | @@ -263,9 +336,11 @@ static bool process_resolver_result_locked(
 | 
	
		
			
				|  |  |      gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"",
 | 
	
		
			
				|  |  |              chand, service_config_json.get());
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  // Update channel state.
 | 
	
		
			
				|  |  | -  chand->retry_throttle_data = resolver_result.retry_throttle_data();
 | 
	
		
			
				|  |  | -  chand->method_params_table = resolver_result.method_params_table();
 | 
	
		
			
				|  |  | +  // Create service config setter to update channel state in the data
 | 
	
		
			
				|  |  | +  // plane combiner.  Destroys itself when done.
 | 
	
		
			
				|  |  | +  grpc_core::New<grpc_core::ServiceConfigSetter>(
 | 
	
		
			
				|  |  | +      chand, resolver_result.retry_throttle_data(),
 | 
	
		
			
				|  |  | +      resolver_result.method_params_table());
 | 
	
		
			
				|  |  |    // Swap out the data used by cc_get_channel_info().
 | 
	
		
			
				|  |  |    gpr_mu_lock(&chand->info_mu);
 | 
	
		
			
				|  |  |    chand->info_lb_policy_name = resolver_result.lb_policy_name();
 | 
	
	
		
			
				|  | @@ -280,11 +355,6 @@ static bool process_resolver_result_locked(
 | 
	
		
			
				|  |  |    // Return results.
 | 
	
		
			
				|  |  |    *lb_policy_name = chand->info_lb_policy_name.get();
 | 
	
		
			
				|  |  |    *lb_policy_config = resolver_result.lb_policy_config();
 | 
	
		
			
				|  |  | -  // Apply service config to queued picks.
 | 
	
		
			
				|  |  | -  for (QueuedPick* pick = chand->queued_picks; pick != nullptr;
 | 
	
		
			
				|  |  | -       pick = pick->next) {
 | 
	
		
			
				|  |  | -    maybe_apply_service_config_to_call_locked(pick->elem);
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  |    return service_config_changed;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -342,12 +412,16 @@ static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    if (op->disconnect_with_error != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | -    chand->disconnect_error = op->disconnect_with_error;
 | 
	
		
			
				|  |  | +    grpc_error* error = GRPC_ERROR_NONE;
 | 
	
		
			
				|  |  | +    GPR_ASSERT(chand->disconnect_error.CompareExchangeStrong(
 | 
	
		
			
				|  |  | +        &error, op->disconnect_with_error, grpc_core::MemoryOrder::ACQ_REL,
 | 
	
		
			
				|  |  | +        grpc_core::MemoryOrder::ACQUIRE));
 | 
	
		
			
				|  |  |      grpc_pollset_set_del_pollset_set(
 | 
	
		
			
				|  |  |          chand->resolving_lb_policy->interested_parties(),
 | 
	
		
			
				|  |  |          chand->interested_parties);
 | 
	
		
			
				|  |  |      chand->resolving_lb_policy.reset();
 | 
	
		
			
				|  |  | -    set_connectivity_state_and_picker_locked(
 | 
	
		
			
				|  |  | +    // Will delete itself.
 | 
	
		
			
				|  |  | +    grpc_core::New<grpc_core::ConnectivityStateAndPickerSetter>(
 | 
	
		
			
				|  |  |          chand, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(op->disconnect_with_error),
 | 
	
		
			
				|  |  |          "shutdown from API",
 | 
	
		
			
				|  |  |          grpc_core::UniquePtr<LoadBalancingPolicy::SubchannelPicker>(
 | 
	
	
		
			
				|  | @@ -397,10 +471,12 @@ static grpc_error* cc_init_channel_elem(grpc_channel_element* elem,
 | 
	
		
			
				|  |  |    GPR_ASSERT(args->is_last);
 | 
	
		
			
				|  |  |    GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
 | 
	
		
			
				|  |  |    // Initialize data members.
 | 
	
		
			
				|  |  | +  chand->data_plane_combiner = grpc_combiner_create();
 | 
	
		
			
				|  |  |    chand->combiner = grpc_combiner_create();
 | 
	
		
			
				|  |  |    grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
 | 
	
		
			
				|  |  |                                 "client_channel");
 | 
	
		
			
				|  |  | -  chand->disconnect_error = GRPC_ERROR_NONE;
 | 
	
		
			
				|  |  | +  chand->disconnect_error.Store(GRPC_ERROR_NONE,
 | 
	
		
			
				|  |  | +                                grpc_core::MemoryOrder::RELAXED);
 | 
	
		
			
				|  |  |    gpr_mu_init(&chand->info_mu);
 | 
	
		
			
				|  |  |    gpr_mu_init(&chand->external_connectivity_watcher_list_mu);
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -511,8 +587,10 @@ static void cc_destroy_channel_elem(grpc_channel_element* elem) {
 | 
	
		
			
				|  |  |    chand->method_params_table.reset();
 | 
	
		
			
				|  |  |    grpc_client_channel_stop_backup_polling(chand->interested_parties);
 | 
	
		
			
				|  |  |    grpc_pollset_set_destroy(chand->interested_parties);
 | 
	
		
			
				|  |  | +  GRPC_COMBINER_UNREF(chand->data_plane_combiner, "client_channel");
 | 
	
		
			
				|  |  |    GRPC_COMBINER_UNREF(chand->combiner, "client_channel");
 | 
	
		
			
				|  |  | -  GRPC_ERROR_UNREF(chand->disconnect_error);
 | 
	
		
			
				|  |  | +  GRPC_ERROR_UNREF(
 | 
	
		
			
				|  |  | +      chand->disconnect_error.Load(grpc_core::MemoryOrder::RELAXED));
 | 
	
		
			
				|  |  |    grpc_connectivity_state_destroy(&chand->state_tracker);
 | 
	
		
			
				|  |  |    gpr_mu_destroy(&chand->info_mu);
 | 
	
		
			
				|  |  |    gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu);
 | 
	
	
		
			
				|  | @@ -1261,7 +1339,7 @@ static void do_retry(grpc_call_element* elem,
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    // Schedule retry after computed delay.
 | 
	
		
			
				|  |  |    GRPC_CLOSURE_INIT(&calld->pick_closure, start_pick_locked, elem,
 | 
	
		
			
				|  |  | -                    grpc_combiner_scheduler(chand->combiner));
 | 
	
		
			
				|  |  | +                    grpc_combiner_scheduler(chand->data_plane_combiner));
 | 
	
		
			
				|  |  |    grpc_timer_init(&calld->retry_timer, next_attempt_time, &calld->pick_closure);
 | 
	
		
			
				|  |  |    // Update bookkeeping.
 | 
	
		
			
				|  |  |    if (retry_state != nullptr) retry_state->retry_dispatched = true;
 | 
	
	
		
			
				|  | @@ -2488,7 +2566,7 @@ class QueuedPickCanceller {
 | 
	
		
			
				|  |  |      auto* chand = static_cast<channel_data*>(elem->channel_data);
 | 
	
		
			
				|  |  |      GRPC_CALL_STACK_REF(calld->owning_call, "QueuedPickCanceller");
 | 
	
		
			
				|  |  |      GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this,
 | 
	
		
			
				|  |  | -                      grpc_combiner_scheduler(chand->combiner));
 | 
	
		
			
				|  |  | +                      grpc_combiner_scheduler(chand->data_plane_combiner));
 | 
	
		
			
				|  |  |      grpc_call_combiner_set_notify_on_cancel(calld->call_combiner, &closure_);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -2628,7 +2706,7 @@ static void maybe_apply_service_config_to_call_locked(grpc_call_element* elem) {
 | 
	
		
			
				|  |  |    call_data* calld = static_cast<call_data*>(elem->call_data);
 | 
	
		
			
				|  |  |    // Apply service config data to the call only once, and only if the
 | 
	
		
			
				|  |  |    // channel has the data available.
 | 
	
		
			
				|  |  | -  if (GPR_LIKELY(chand->have_service_config &&
 | 
	
		
			
				|  |  | +  if (GPR_LIKELY(chand->received_service_config_data &&
 | 
	
		
			
				|  |  |                   !calld->service_config_applied)) {
 | 
	
		
			
				|  |  |      calld->service_config_applied = true;
 | 
	
		
			
				|  |  |      apply_service_config_to_call_locked(elem);
 | 
	
	
		
			
				|  | @@ -2676,7 +2754,7 @@ static void start_pick_locked(void* arg, grpc_error* error) {
 | 
	
		
			
				|  |  |                   .send_initial_metadata_flags;
 | 
	
		
			
				|  |  |    // Apply service config to call if needed.
 | 
	
		
			
				|  |  |    maybe_apply_service_config_to_call_locked(elem);
 | 
	
		
			
				|  |  | -  // When done, we schedule this closure to leave the channel combiner.
 | 
	
		
			
				|  |  | +  // When done, we schedule this closure to leave the data plane combiner.
 | 
	
		
			
				|  |  |    GRPC_CLOSURE_INIT(&calld->pick_closure, pick_done, elem,
 | 
	
		
			
				|  |  |                      grpc_schedule_on_exec_ctx);
 | 
	
		
			
				|  |  |    // Attempt pick.
 | 
	
	
		
			
				|  | @@ -2691,12 +2769,14 @@ static void start_pick_locked(void* arg, grpc_error* error) {
 | 
	
		
			
				|  |  |              grpc_error_string(error));
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    switch (pick_result) {
 | 
	
		
			
				|  |  | -    case LoadBalancingPolicy::PICK_TRANSIENT_FAILURE:
 | 
	
		
			
				|  |  | +    case LoadBalancingPolicy::PICK_TRANSIENT_FAILURE: {
 | 
	
		
			
				|  |  |        // If we're shutting down, fail all RPCs.
 | 
	
		
			
				|  |  | -      if (chand->disconnect_error != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | +      grpc_error* disconnect_error =
 | 
	
		
			
				|  |  | +          chand->disconnect_error.Load(grpc_core::MemoryOrder::ACQUIRE);
 | 
	
		
			
				|  |  | +      if (disconnect_error != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  |          GRPC_ERROR_UNREF(error);
 | 
	
		
			
				|  |  |          GRPC_CLOSURE_SCHED(&calld->pick_closure,
 | 
	
		
			
				|  |  | -                           GRPC_ERROR_REF(chand->disconnect_error));
 | 
	
		
			
				|  |  | +                           GRPC_ERROR_REF(disconnect_error));
 | 
	
		
			
				|  |  |          break;
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  |        // If wait_for_ready is false, then the error indicates the RPC
 | 
	
	
		
			
				|  | @@ -2722,7 +2802,8 @@ static void start_pick_locked(void* arg, grpc_error* error) {
 | 
	
		
			
				|  |  |        // If wait_for_ready is true, then queue to retry when we get a new
 | 
	
		
			
				|  |  |        // picker.
 | 
	
		
			
				|  |  |        GRPC_ERROR_UNREF(error);
 | 
	
		
			
				|  |  | -      // Fallthrough
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    // Fallthrough
 | 
	
		
			
				|  |  |      case LoadBalancingPolicy::PICK_QUEUE:
 | 
	
		
			
				|  |  |        if (!calld->pick_queued) add_call_to_queued_picks_locked(elem);
 | 
	
		
			
				|  |  |        break;
 | 
	
	
		
			
				|  | @@ -2816,7 +2897,8 @@ static void cc_start_transport_stream_op_batch(
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |      GRPC_CLOSURE_SCHED(
 | 
	
		
			
				|  |  |          GRPC_CLOSURE_INIT(&batch->handler_private.closure, start_pick_locked,
 | 
	
		
			
				|  |  | -                          elem, grpc_combiner_scheduler(chand->combiner)),
 | 
	
		
			
				|  |  | +                          elem,
 | 
	
		
			
				|  |  | +                          grpc_combiner_scheduler(chand->data_plane_combiner)),
 | 
	
		
			
				|  |  |          GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  |    } else {
 | 
	
		
			
				|  |  |      // For all other batches, release the call combiner.
 |