|  | @@ -32,12 +32,14 @@
 | 
	
		
			
				|  |  |  #include <grpc/support/sync.h>
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  #include "src/core/ext/filters/client_channel/backup_poller.h"
 | 
	
		
			
				|  |  | +#include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
 | 
	
		
			
				|  |  |  #include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
 | 
	
		
			
				|  |  |  #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
 | 
	
		
			
				|  |  | +#include "src/core/ext/filters/client_channel/local_subchannel_pool.h"
 | 
	
		
			
				|  |  |  #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
 | 
	
		
			
				|  |  | -#include "src/core/ext/filters/client_channel/request_routing.h"
 | 
	
		
			
				|  |  |  #include "src/core/ext/filters/client_channel/resolver_registry.h"
 | 
	
		
			
				|  |  |  #include "src/core/ext/filters/client_channel/resolver_result_parsing.h"
 | 
	
		
			
				|  |  | +#include "src/core/ext/filters/client_channel/resolving_lb_policy.h"
 | 
	
		
			
				|  |  |  #include "src/core/ext/filters/client_channel/retry_throttle.h"
 | 
	
		
			
				|  |  |  #include "src/core/ext/filters/client_channel/subchannel.h"
 | 
	
		
			
				|  |  |  #include "src/core/ext/filters/deadline/deadline_filter.h"
 | 
	
	
		
			
				|  | @@ -68,6 +70,8 @@ using grpc_core::internal::ClientChannelMethodParamsTable;
 | 
	
		
			
				|  |  |  using grpc_core::internal::ProcessedResolverResult;
 | 
	
		
			
				|  |  |  using grpc_core::internal::ServerRetryThrottleData;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +using grpc_core::LoadBalancingPolicy;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  /* Client channel implementation */
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  // By default, we buffer 256 KiB per RPC for retries.
 | 
	
	
		
			
				|  | @@ -86,44 +90,170 @@ grpc_core::TraceFlag grpc_client_channel_trace(false, "client_channel");
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  struct external_connectivity_watcher;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -typedef struct client_channel_channel_data {
 | 
	
		
			
				|  |  | -  grpc_core::ManualConstructor<grpc_core::RequestRouter> request_router;
 | 
	
		
			
				|  |  | +struct QueuedPick {
 | 
	
		
			
				|  |  | +  LoadBalancingPolicy::PickState pick;
 | 
	
		
			
				|  |  | +  grpc_call_element* elem;
 | 
	
		
			
				|  |  | +  QueuedPick* next = nullptr;
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +typedef struct client_channel_channel_data {
 | 
	
		
			
				|  |  |    bool deadline_checking_enabled;
 | 
	
		
			
				|  |  |    bool enable_retries;
 | 
	
		
			
				|  |  |    size_t per_rpc_retry_buffer_size;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    /** combiner protecting all variables below in this data structure */
 | 
	
		
			
				|  |  |    grpc_combiner* combiner;
 | 
	
		
			
				|  |  | -  /** retry throttle data */
 | 
	
		
			
				|  |  | -  grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
 | 
	
		
			
				|  |  | -  /** maps method names to method_parameters structs */
 | 
	
		
			
				|  |  | -  grpc_core::RefCountedPtr<ClientChannelMethodParamsTable> method_params_table;
 | 
	
		
			
				|  |  |    /** owning stack */
 | 
	
		
			
				|  |  |    grpc_channel_stack* owning_stack;
 | 
	
		
			
				|  |  |    /** interested parties (owned) */
 | 
	
		
			
				|  |  |    grpc_pollset_set* interested_parties;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  /* external_connectivity_watcher_list head is guarded by its own mutex, since
 | 
	
		
			
				|  |  | -   * counts need to be grabbed immediately without polling on a cq */
 | 
	
		
			
				|  |  | -  gpr_mu external_connectivity_watcher_list_mu;
 | 
	
		
			
				|  |  | -  struct external_connectivity_watcher* external_connectivity_watcher_list_head;
 | 
	
		
			
				|  |  | +  // Client channel factory.  Holds a ref.
 | 
	
		
			
				|  |  | +  grpc_client_channel_factory* client_channel_factory;
 | 
	
		
			
				|  |  | +  // Subchannel pool.
 | 
	
		
			
				|  |  | +  grpc_core::RefCountedPtr<grpc_core::SubchannelPoolInterface> subchannel_pool;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  grpc_core::channelz::ClientChannelNode* channelz_node;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  // Resolving LB policy.
 | 
	
		
			
				|  |  | +  grpc_core::OrphanablePtr<LoadBalancingPolicy> resolving_lb_policy;
 | 
	
		
			
				|  |  | +  // Subchannel picker from LB policy.
 | 
	
		
			
				|  |  | +  grpc_core::UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker;
 | 
	
		
			
				|  |  | +  // Linked list of queued picks.
 | 
	
		
			
				|  |  | +  QueuedPick* queued_picks;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  bool have_service_config;
 | 
	
		
			
				|  |  | +  /** retry throttle data from service config */
 | 
	
		
			
				|  |  | +  grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
 | 
	
		
			
				|  |  | +  /** per-method service config data */
 | 
	
		
			
				|  |  | +  grpc_core::RefCountedPtr<ClientChannelMethodParamsTable> method_params_table;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    /* the following properties are guarded by a mutex since APIs require them
 | 
	
		
			
				|  |  |       to be instantaneously available */
 | 
	
		
			
				|  |  |    gpr_mu info_mu;
 | 
	
		
			
				|  |  |    grpc_core::UniquePtr<char> info_lb_policy_name;
 | 
	
		
			
				|  |  | -  /** service config in JSON form */
 | 
	
		
			
				|  |  |    grpc_core::UniquePtr<char> info_service_config_json;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  grpc_connectivity_state_tracker state_tracker;
 | 
	
		
			
				|  |  | +  grpc_error* disconnect_error;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  /* external_connectivity_watcher_list head is guarded by its own mutex, since
 | 
	
		
			
				|  |  | +   * counts need to be grabbed immediately without polling on a cq */
 | 
	
		
			
				|  |  | +  gpr_mu external_connectivity_watcher_list_mu;
 | 
	
		
			
				|  |  | +  struct external_connectivity_watcher* external_connectivity_watcher_list_head;
 | 
	
		
			
				|  |  |  } channel_data;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -// Synchronous callback from chand->request_router to process a resolver
 | 
	
		
			
				|  |  | +// Forward declarations.
 | 
	
		
			
				|  |  | +static void start_pick_locked(void* arg, grpc_error* ignored);
 | 
	
		
			
				|  |  | +static void maybe_apply_service_config_to_call_locked(grpc_call_element* elem);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +static const char* get_channel_connectivity_state_change_string(
 | 
	
		
			
				|  |  | +    grpc_connectivity_state state) {
 | 
	
		
			
				|  |  | +  switch (state) {
 | 
	
		
			
				|  |  | +    case GRPC_CHANNEL_IDLE:
 | 
	
		
			
				|  |  | +      return "Channel state change to IDLE";
 | 
	
		
			
				|  |  | +    case GRPC_CHANNEL_CONNECTING:
 | 
	
		
			
				|  |  | +      return "Channel state change to CONNECTING";
 | 
	
		
			
				|  |  | +    case GRPC_CHANNEL_READY:
 | 
	
		
			
				|  |  | +      return "Channel state change to READY";
 | 
	
		
			
				|  |  | +    case GRPC_CHANNEL_TRANSIENT_FAILURE:
 | 
	
		
			
				|  |  | +      return "Channel state change to TRANSIENT_FAILURE";
 | 
	
		
			
				|  |  | +    case GRPC_CHANNEL_SHUTDOWN:
 | 
	
		
			
				|  |  | +      return "Channel state change to SHUTDOWN";
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  GPR_UNREACHABLE_CODE(return "UNKNOWN");
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +static void set_connectivity_state_and_picker_locked(
 | 
	
		
			
				|  |  | +    channel_data* chand, grpc_connectivity_state state, grpc_error* state_error,
 | 
	
		
			
				|  |  | +    const char* reason,
 | 
	
		
			
				|  |  | +    grpc_core::UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) {
 | 
	
		
			
				|  |  | +  // Update connectivity state.
 | 
	
		
			
				|  |  | +  grpc_connectivity_state_set(&chand->state_tracker, state, state_error,
 | 
	
		
			
				|  |  | +                              reason);
 | 
	
		
			
				|  |  | +  if (chand->channelz_node != nullptr) {
 | 
	
		
			
				|  |  | +    chand->channelz_node->AddTraceEvent(
 | 
	
		
			
				|  |  | +        grpc_core::channelz::ChannelTrace::Severity::Info,
 | 
	
		
			
				|  |  | +        grpc_slice_from_static_string(
 | 
	
		
			
				|  |  | +            get_channel_connectivity_state_change_string(state)));
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  // Update picker.
 | 
	
		
			
				|  |  | +  chand->picker = std::move(picker);
 | 
	
		
			
				|  |  | +  // Re-process queued picks.
 | 
	
		
			
				|  |  | +  for (QueuedPick* pick = chand->queued_picks; pick != nullptr;
 | 
	
		
			
				|  |  | +       pick = pick->next) {
 | 
	
		
			
				|  |  | +    start_pick_locked(pick->elem, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +namespace grpc_core {
 | 
	
		
			
				|  |  | +namespace {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +class ClientChannelControlHelper
 | 
	
		
			
				|  |  | +    : public LoadBalancingPolicy::ChannelControlHelper {
 | 
	
		
			
				|  |  | + public:
 | 
	
		
			
				|  |  | +  explicit ClientChannelControlHelper(channel_data* chand) : chand_(chand) {
 | 
	
		
			
				|  |  | +    GRPC_CHANNEL_STACK_REF(chand_->owning_stack, "ClientChannelControlHelper");
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  ~ClientChannelControlHelper() override {
 | 
	
		
			
				|  |  | +    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack,
 | 
	
		
			
				|  |  | +                             "ClientChannelControlHelper");
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  Subchannel* CreateSubchannel(const grpc_channel_args& args) override {
 | 
	
		
			
				|  |  | +    grpc_arg arg = SubchannelPoolInterface::CreateChannelArg(
 | 
	
		
			
				|  |  | +        chand_->subchannel_pool.get());
 | 
	
		
			
				|  |  | +    grpc_channel_args* new_args =
 | 
	
		
			
				|  |  | +        grpc_channel_args_copy_and_add(&args, &arg, 1);
 | 
	
		
			
				|  |  | +    Subchannel* subchannel = grpc_client_channel_factory_create_subchannel(
 | 
	
		
			
				|  |  | +        chand_->client_channel_factory, new_args);
 | 
	
		
			
				|  |  | +    grpc_channel_args_destroy(new_args);
 | 
	
		
			
				|  |  | +    return subchannel;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  grpc_channel* CreateChannel(const char* target, grpc_client_channel_type type,
 | 
	
		
			
				|  |  | +                              const grpc_channel_args& args) override {
 | 
	
		
			
				|  |  | +    return grpc_client_channel_factory_create_channel(
 | 
	
		
			
				|  |  | +        chand_->client_channel_factory, target, type, &args);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  void UpdateState(
 | 
	
		
			
				|  |  | +      grpc_connectivity_state state, grpc_error* state_error,
 | 
	
		
			
				|  |  | +      UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) override {
 | 
	
		
			
				|  |  | +    if (grpc_client_channel_trace.enabled()) {
 | 
	
		
			
				|  |  | +      const char* extra = chand_->disconnect_error == GRPC_ERROR_NONE
 | 
	
		
			
				|  |  | +                              ? ""
 | 
	
		
			
				|  |  | +                              : " (ignoring -- channel shutting down)";
 | 
	
		
			
				|  |  | +      gpr_log(GPR_INFO, "chand=%p: update: state=%s error=%s picker=%p%s",
 | 
	
		
			
				|  |  | +              chand_, grpc_connectivity_state_name(state),
 | 
	
		
			
				|  |  | +              grpc_error_string(state_error), picker.get(), extra);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    // Do update only if not shutting down.
 | 
	
		
			
				|  |  | +    if (chand_->disconnect_error == GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | +      set_connectivity_state_and_picker_locked(chand_, state, state_error,
 | 
	
		
			
				|  |  | +                                               "helper", std::move(picker));
 | 
	
		
			
				|  |  | +    } else {
 | 
	
		
			
				|  |  | +      GRPC_ERROR_UNREF(state_error);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  // No-op -- we should never get this from ResolvingLoadBalancingPolicy.
 | 
	
		
			
				|  |  | +  void RequestReresolution() override {}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | + private:
 | 
	
		
			
				|  |  | +  channel_data* chand_;
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +}  // namespace
 | 
	
		
			
				|  |  | +}  // namespace grpc_core
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +// Synchronous callback from chand->resolving_lb_policy to process a resolver
 | 
	
		
			
				|  |  |  // result update.
 | 
	
		
			
				|  |  | -static bool process_resolver_result_locked(void* arg,
 | 
	
		
			
				|  |  | -                                           const grpc_channel_args& args,
 | 
	
		
			
				|  |  | -                                           const char** lb_policy_name,
 | 
	
		
			
				|  |  | -                                           grpc_json** lb_policy_config) {
 | 
	
		
			
				|  |  | +static bool process_resolver_result_locked(
 | 
	
		
			
				|  |  | +    void* arg, const grpc_channel_args& args, const char** lb_policy_name,
 | 
	
		
			
				|  |  | +    grpc_core::RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config) {
 | 
	
		
			
				|  |  |    channel_data* chand = static_cast<channel_data*>(arg);
 | 
	
		
			
				|  |  | +  chand->have_service_config = true;
 | 
	
		
			
				|  |  |    ProcessedResolverResult resolver_result(args, chand->enable_retries);
 | 
	
		
			
				|  |  |    grpc_core::UniquePtr<char> service_config_json =
 | 
	
		
			
				|  |  |        resolver_result.service_config_json();
 | 
	
	
		
			
				|  | @@ -148,9 +278,38 @@ static bool process_resolver_result_locked(void* arg,
 | 
	
		
			
				|  |  |    // Return results.
 | 
	
		
			
				|  |  |    *lb_policy_name = chand->info_lb_policy_name.get();
 | 
	
		
			
				|  |  |    *lb_policy_config = resolver_result.lb_policy_config();
 | 
	
		
			
				|  |  | +  // Apply service config to queued picks.
 | 
	
		
			
				|  |  | +  for (QueuedPick* pick = chand->queued_picks; pick != nullptr;
 | 
	
		
			
				|  |  | +       pick = pick->next) {
 | 
	
		
			
				|  |  | +    maybe_apply_service_config_to_call_locked(pick->elem);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  |    return service_config_changed;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +static grpc_error* do_ping_locked(channel_data* chand, grpc_transport_op* op) {
 | 
	
		
			
				|  |  | +  grpc_error* error = GRPC_ERROR_NONE;
 | 
	
		
			
				|  |  | +  grpc_connectivity_state state =
 | 
	
		
			
				|  |  | +      grpc_connectivity_state_get(&chand->state_tracker, &error);
 | 
	
		
			
				|  |  | +  if (state != GRPC_CHANNEL_READY) {
 | 
	
		
			
				|  |  | +    grpc_error* new_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
 | 
	
		
			
				|  |  | +        "channel not connected", &error, 1);
 | 
	
		
			
				|  |  | +    GRPC_ERROR_UNREF(error);
 | 
	
		
			
				|  |  | +    return new_error;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  LoadBalancingPolicy::PickState pick;
 | 
	
		
			
				|  |  | +  chand->picker->Pick(&pick, &error);
 | 
	
		
			
				|  |  | +  if (pick.connected_subchannel != nullptr) {
 | 
	
		
			
				|  |  | +    pick.connected_subchannel->Ping(op->send_ping.on_initiate,
 | 
	
		
			
				|  |  | +                                    op->send_ping.on_ack);
 | 
	
		
			
				|  |  | +  } else {
 | 
	
		
			
				|  |  | +    if (error == GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | +      error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
 | 
	
		
			
				|  |  | +          "LB policy dropped call on ping");
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  return error;
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
 | 
	
		
			
				|  |  |    grpc_transport_op* op = static_cast<grpc_transport_op*>(arg);
 | 
	
		
			
				|  |  |    grpc_channel_element* elem =
 | 
	
	
		
			
				|  | @@ -158,47 +317,40 @@ static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
 | 
	
		
			
				|  |  |    channel_data* chand = static_cast<channel_data*>(elem->channel_data);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    if (op->on_connectivity_state_change != nullptr) {
 | 
	
		
			
				|  |  | -    chand->request_router->NotifyOnConnectivityStateChange(
 | 
	
		
			
				|  |  | -        op->connectivity_state, op->on_connectivity_state_change);
 | 
	
		
			
				|  |  | +    grpc_connectivity_state_notify_on_state_change(
 | 
	
		
			
				|  |  | +        &chand->state_tracker, op->connectivity_state,
 | 
	
		
			
				|  |  | +        op->on_connectivity_state_change);
 | 
	
		
			
				|  |  |      op->on_connectivity_state_change = nullptr;
 | 
	
		
			
				|  |  |      op->connectivity_state = nullptr;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
 | 
	
		
			
				|  |  | -    if (chand->request_router->lb_policy() == nullptr) {
 | 
	
		
			
				|  |  | -      grpc_error* error =
 | 
	
		
			
				|  |  | -          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing");
 | 
	
		
			
				|  |  | +    grpc_error* error = do_ping_locked(chand, op);
 | 
	
		
			
				|  |  | +    if (error != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  |        GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  |        GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error);
 | 
	
		
			
				|  |  | -    } else {
 | 
	
		
			
				|  |  | -      grpc_error* error = GRPC_ERROR_NONE;
 | 
	
		
			
				|  |  | -      grpc_core::LoadBalancingPolicy::PickState pick_state;
 | 
	
		
			
				|  |  | -      // Pick must return synchronously, because pick_state.on_complete is null.
 | 
	
		
			
				|  |  | -      GPR_ASSERT(
 | 
	
		
			
				|  |  | -          chand->request_router->lb_policy()->PickLocked(&pick_state, &error));
 | 
	
		
			
				|  |  | -      if (pick_state.connected_subchannel != nullptr) {
 | 
	
		
			
				|  |  | -        pick_state.connected_subchannel->Ping(op->send_ping.on_initiate,
 | 
	
		
			
				|  |  | -                                              op->send_ping.on_ack);
 | 
	
		
			
				|  |  | -      } else {
 | 
	
		
			
				|  |  | -        if (error == GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | -          error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
 | 
	
		
			
				|  |  | -              "LB policy dropped call on ping");
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | -        GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  | -        GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error);
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | -      op->bind_pollset = nullptr;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | +    op->bind_pollset = nullptr;
 | 
	
		
			
				|  |  |      op->send_ping.on_initiate = nullptr;
 | 
	
		
			
				|  |  |      op->send_ping.on_ack = nullptr;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  if (op->disconnect_with_error != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | -    chand->request_router->ShutdownLocked(op->disconnect_with_error);
 | 
	
		
			
				|  |  | +  if (op->reset_connect_backoff) {
 | 
	
		
			
				|  |  | +    chand->resolving_lb_policy->ResetBackoffLocked();
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  if (op->reset_connect_backoff) {
 | 
	
		
			
				|  |  | -    chand->request_router->ResetConnectionBackoffLocked();
 | 
	
		
			
				|  |  | +  if (op->disconnect_with_error != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | +    chand->disconnect_error = op->disconnect_with_error;
 | 
	
		
			
				|  |  | +    grpc_pollset_set_del_pollset_set(
 | 
	
		
			
				|  |  | +        chand->resolving_lb_policy->interested_parties(),
 | 
	
		
			
				|  |  | +        chand->interested_parties);
 | 
	
		
			
				|  |  | +    chand->resolving_lb_policy.reset();
 | 
	
		
			
				|  |  | +    set_connectivity_state_and_picker_locked(
 | 
	
		
			
				|  |  | +        chand, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(op->disconnect_with_error),
 | 
	
		
			
				|  |  | +        "shutdown from API",
 | 
	
		
			
				|  |  | +        grpc_core::UniquePtr<LoadBalancingPolicy::SubchannelPicker>(
 | 
	
		
			
				|  |  | +            grpc_core::New<LoadBalancingPolicy::TransientFailurePicker>(
 | 
	
		
			
				|  |  | +                GRPC_ERROR_REF(op->disconnect_with_error))));
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "start_transport_op");
 | 
	
	
		
			
				|  | @@ -244,6 +396,9 @@ static grpc_error* cc_init_channel_elem(grpc_channel_element* elem,
 | 
	
		
			
				|  |  |    GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
 | 
	
		
			
				|  |  |    // Initialize data members.
 | 
	
		
			
				|  |  |    chand->combiner = grpc_combiner_create();
 | 
	
		
			
				|  |  | +  grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
 | 
	
		
			
				|  |  | +                               "client_channel");
 | 
	
		
			
				|  |  | +  chand->disconnect_error = GRPC_ERROR_NONE;
 | 
	
		
			
				|  |  |    gpr_mu_init(&chand->info_mu);
 | 
	
		
			
				|  |  |    gpr_mu_init(&chand->external_connectivity_watcher_list_mu);
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -275,8 +430,9 @@ static grpc_error* cc_init_channel_elem(grpc_channel_element* elem,
 | 
	
		
			
				|  |  |      return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
 | 
	
		
			
				|  |  |          "client channel factory arg must be a pointer");
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  grpc_client_channel_factory* client_channel_factory =
 | 
	
		
			
				|  |  | +  chand->client_channel_factory =
 | 
	
		
			
				|  |  |        static_cast<grpc_client_channel_factory*>(arg->value.pointer.p);
 | 
	
		
			
				|  |  | +  grpc_client_channel_factory_ref(chand->client_channel_factory);
 | 
	
		
			
				|  |  |    // Get server name to resolve, using proxy mapper if needed.
 | 
	
		
			
				|  |  |    arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI);
 | 
	
		
			
				|  |  |    if (arg == nullptr) {
 | 
	
	
		
			
				|  | @@ -291,26 +447,71 @@ static grpc_error* cc_init_channel_elem(grpc_channel_element* elem,
 | 
	
		
			
				|  |  |    grpc_channel_args* new_args = nullptr;
 | 
	
		
			
				|  |  |    grpc_proxy_mappers_map_name(arg->value.string, args->channel_args,
 | 
	
		
			
				|  |  |                                &proxy_name, &new_args);
 | 
	
		
			
				|  |  | -  // Instantiate request router.
 | 
	
		
			
				|  |  | -  grpc_client_channel_factory_ref(client_channel_factory);
 | 
	
		
			
				|  |  | +  grpc_core::UniquePtr<char> target_uri(
 | 
	
		
			
				|  |  | +      proxy_name != nullptr ? proxy_name : gpr_strdup(arg->value.string));
 | 
	
		
			
				|  |  | +  // Instantiate subchannel pool.
 | 
	
		
			
				|  |  | +  arg = grpc_channel_args_find(args->channel_args,
 | 
	
		
			
				|  |  | +                               GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL);
 | 
	
		
			
				|  |  | +  if (grpc_channel_arg_get_bool(arg, false)) {
 | 
	
		
			
				|  |  | +    chand->subchannel_pool =
 | 
	
		
			
				|  |  | +        grpc_core::MakeRefCounted<grpc_core::LocalSubchannelPool>();
 | 
	
		
			
				|  |  | +  } else {
 | 
	
		
			
				|  |  | +    chand->subchannel_pool = grpc_core::GlobalSubchannelPool::instance();
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  // Instantiate resolving LB policy.
 | 
	
		
			
				|  |  | +  LoadBalancingPolicy::Args lb_args;
 | 
	
		
			
				|  |  | +  lb_args.combiner = chand->combiner;
 | 
	
		
			
				|  |  | +  lb_args.channel_control_helper =
 | 
	
		
			
				|  |  | +      grpc_core::UniquePtr<LoadBalancingPolicy::ChannelControlHelper>(
 | 
	
		
			
				|  |  | +          grpc_core::New<grpc_core::ClientChannelControlHelper>(chand));
 | 
	
		
			
				|  |  | +  lb_args.args = new_args != nullptr ? new_args : args->channel_args;
 | 
	
		
			
				|  |  |    grpc_error* error = GRPC_ERROR_NONE;
 | 
	
		
			
				|  |  | -  chand->request_router.Init(
 | 
	
		
			
				|  |  | -      chand->owning_stack, chand->combiner, client_channel_factory,
 | 
	
		
			
				|  |  | -      chand->interested_parties, &grpc_client_channel_trace,
 | 
	
		
			
				|  |  | -      process_resolver_result_locked, chand,
 | 
	
		
			
				|  |  | -      proxy_name != nullptr ? proxy_name : arg->value.string /* target_uri */,
 | 
	
		
			
				|  |  | -      new_args != nullptr ? new_args : args->channel_args, &error);
 | 
	
		
			
				|  |  | -  gpr_free(proxy_name);
 | 
	
		
			
				|  |  | +  chand->resolving_lb_policy.reset(
 | 
	
		
			
				|  |  | +      grpc_core::New<grpc_core::ResolvingLoadBalancingPolicy>(
 | 
	
		
			
				|  |  | +          std::move(lb_args), &grpc_client_channel_trace, std::move(target_uri),
 | 
	
		
			
				|  |  | +          process_resolver_result_locked, chand, &error));
 | 
	
		
			
				|  |  |    grpc_channel_args_destroy(new_args);
 | 
	
		
			
				|  |  | +  if (error != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | +    // Orphan the resolving LB policy and flush the exec_ctx to ensure
 | 
	
		
			
				|  |  | +    // that it finishes shutting down.  This ensures that if we are
 | 
	
		
			
				|  |  | +    // failing, we destroy the ClientChannelControlHelper (and thus
 | 
	
		
			
				|  |  | +    // unref the channel stack) before we return.
 | 
	
		
			
				|  |  | +    // TODO(roth): This is not a complete solution, because it only
 | 
	
		
			
				|  |  | +    // catches the case where channel stack initialization fails in this
 | 
	
		
			
				|  |  | +    // particular filter.  If there is a failure in a different filter, we
 | 
	
		
			
				|  |  | +    // will leave a dangling ref here, which can cause a crash.  Fortunately,
 | 
	
		
			
				|  |  | +    // in practice, there are no other filters that can cause failures in
 | 
	
		
			
				|  |  | +    // channel stack initialization, so this works for now.
 | 
	
		
			
				|  |  | +    chand->resolving_lb_policy.reset();
 | 
	
		
			
				|  |  | +    grpc_core::ExecCtx::Get()->Flush();
 | 
	
		
			
				|  |  | +  } else {
 | 
	
		
			
				|  |  | +    grpc_pollset_set_add_pollset_set(
 | 
	
		
			
				|  |  | +        chand->resolving_lb_policy->interested_parties(),
 | 
	
		
			
				|  |  | +        chand->interested_parties);
 | 
	
		
			
				|  |  | +    if (grpc_client_channel_trace.enabled()) {
 | 
	
		
			
				|  |  | +      gpr_log(GPR_INFO, "chand=%p: created resolving_lb_policy=%p", chand,
 | 
	
		
			
				|  |  | +              chand->resolving_lb_policy.get());
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  |    return error;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  /* Destructor for channel_data */
 | 
	
		
			
				|  |  |  static void cc_destroy_channel_elem(grpc_channel_element* elem) {
 | 
	
		
			
				|  |  |    channel_data* chand = static_cast<channel_data*>(elem->channel_data);
 | 
	
		
			
				|  |  | -  chand->request_router.Destroy();
 | 
	
		
			
				|  |  | +  if (chand->resolving_lb_policy != nullptr) {
 | 
	
		
			
				|  |  | +    grpc_pollset_set_del_pollset_set(
 | 
	
		
			
				|  |  | +        chand->resolving_lb_policy->interested_parties(),
 | 
	
		
			
				|  |  | +        chand->interested_parties);
 | 
	
		
			
				|  |  | +    chand->resolving_lb_policy.reset();
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  |    // TODO(roth): Once we convert the filter API to C++, there will no
 | 
	
		
			
				|  |  |    // longer be any need to explicitly reset these smart pointer data members.
 | 
	
		
			
				|  |  | +  chand->picker.reset();
 | 
	
		
			
				|  |  | +  chand->subchannel_pool.reset();
 | 
	
		
			
				|  |  | +  if (chand->client_channel_factory != nullptr) {
 | 
	
		
			
				|  |  | +    grpc_client_channel_factory_unref(chand->client_channel_factory);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  |    chand->info_lb_policy_name.reset();
 | 
	
		
			
				|  |  |    chand->info_service_config_json.reset();
 | 
	
		
			
				|  |  |    chand->retry_throttle_data.reset();
 | 
	
	
		
			
				|  | @@ -318,6 +519,8 @@ static void cc_destroy_channel_elem(grpc_channel_element* elem) {
 | 
	
		
			
				|  |  |    grpc_client_channel_stop_backup_polling(chand->interested_parties);
 | 
	
		
			
				|  |  |    grpc_pollset_set_destroy(chand->interested_parties);
 | 
	
		
			
				|  |  |    GRPC_COMBINER_UNREF(chand->combiner, "client_channel");
 | 
	
		
			
				|  |  | +  GRPC_ERROR_UNREF(chand->disconnect_error);
 | 
	
		
			
				|  |  | +  grpc_connectivity_state_destroy(&chand->state_tracker);
 | 
	
		
			
				|  |  |    gpr_mu_destroy(&chand->info_mu);
 | 
	
		
			
				|  |  |    gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu);
 | 
	
		
			
				|  |  |  }
 | 
	
	
		
			
				|  | @@ -371,6 +574,12 @@ static void cc_destroy_channel_elem(grpc_channel_element* elem) {
 | 
	
		
			
				|  |  |  //   (census filter is on top of this one)
 | 
	
		
			
				|  |  |  // - add census stats for retries
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +namespace grpc_core {
 | 
	
		
			
				|  |  | +namespace {
 | 
	
		
			
				|  |  | +class QueuedPickCanceller;
 | 
	
		
			
				|  |  | +}  // namespace
 | 
	
		
			
				|  |  | +}  // namespace grpc_core
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  namespace {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  struct call_data;
 | 
	
	
		
			
				|  | @@ -509,8 +718,11 @@ struct call_data {
 | 
	
		
			
				|  |  |      for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches); ++i) {
 | 
	
		
			
				|  |  |        GPR_ASSERT(pending_batches[i].batch == nullptr);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -    if (have_request) {
 | 
	
		
			
				|  |  | -      request.Destroy();
 | 
	
		
			
				|  |  | +    for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
 | 
	
		
			
				|  |  | +      if (pick.pick.subchannel_call_context[i].destroy != nullptr) {
 | 
	
		
			
				|  |  | +        pick.pick.subchannel_call_context[i].destroy(
 | 
	
		
			
				|  |  | +            pick.pick.subchannel_call_context[i].value);
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -537,8 +749,10 @@ struct call_data {
 | 
	
		
			
				|  |  |    // Set when we get a cancel_stream op.
 | 
	
		
			
				|  |  |    grpc_error* cancel_error = GRPC_ERROR_NONE;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  grpc_core::ManualConstructor<grpc_core::RequestRouter::Request> request;
 | 
	
		
			
				|  |  | -  bool have_request = false;
 | 
	
		
			
				|  |  | +  QueuedPick pick;
 | 
	
		
			
				|  |  | +  bool pick_queued = false;
 | 
	
		
			
				|  |  | +  bool service_config_applied = false;
 | 
	
		
			
				|  |  | +  grpc_core::QueuedPickCanceller* pick_canceller = nullptr;
 | 
	
		
			
				|  |  |    grpc_closure pick_closure;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    grpc_polling_entity* pollent = nullptr;
 | 
	
	
		
			
				|  | @@ -600,7 +814,7 @@ static void retry_commit(grpc_call_element* elem,
 | 
	
		
			
				|  |  |  static void start_internal_recv_trailing_metadata(grpc_call_element* elem);
 | 
	
		
			
				|  |  |  static void on_complete(void* arg, grpc_error* error);
 | 
	
		
			
				|  |  |  static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored);
 | 
	
		
			
				|  |  | -static void start_pick_locked(void* arg, grpc_error* ignored);
 | 
	
		
			
				|  |  | +static void remove_call_from_queued_picks_locked(grpc_call_element* elem);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  //
 | 
	
		
			
				|  |  |  // send op data caching
 | 
	
	
		
			
				|  | @@ -728,7 +942,7 @@ static void free_cached_send_op_data_for_completed_batch(
 | 
	
		
			
				|  |  |  //
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  void maybe_inject_recv_trailing_metadata_ready_for_lb(
 | 
	
		
			
				|  |  | -    const grpc_core::LoadBalancingPolicy::PickState& pick,
 | 
	
		
			
				|  |  | +    const LoadBalancingPolicy::PickState& pick,
 | 
	
		
			
				|  |  |      grpc_transport_stream_op_batch* batch) {
 | 
	
		
			
				|  |  |    if (pick.recv_trailing_metadata_ready != nullptr) {
 | 
	
		
			
				|  |  |      *pick.original_recv_trailing_metadata_ready =
 | 
	
	
		
			
				|  | @@ -846,10 +1060,25 @@ static void fail_pending_batch_in_call_combiner(void* arg, grpc_error* error) {
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  // This is called via the call combiner, so access to calld is synchronized.
 | 
	
		
			
				|  |  | -// If yield_call_combiner is true, assumes responsibility for yielding
 | 
	
		
			
				|  |  | -// the call combiner.
 | 
	
		
			
				|  |  | -static void pending_batches_fail(grpc_call_element* elem, grpc_error* error,
 | 
	
		
			
				|  |  | -                                 bool yield_call_combiner) {
 | 
	
		
			
				|  |  | +// If yield_call_combiner_predicate returns true, assumes responsibility for
 | 
	
		
			
				|  |  | +// yielding the call combiner.
 | 
	
		
			
				|  |  | +typedef bool (*YieldCallCombinerPredicate)(
 | 
	
		
			
				|  |  | +    const grpc_core::CallCombinerClosureList& closures);
 | 
	
		
			
				|  |  | +static bool yield_call_combiner(
 | 
	
		
			
				|  |  | +    const grpc_core::CallCombinerClosureList& closures) {
 | 
	
		
			
				|  |  | +  return true;
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +static bool no_yield_call_combiner(
 | 
	
		
			
				|  |  | +    const grpc_core::CallCombinerClosureList& closures) {
 | 
	
		
			
				|  |  | +  return false;
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +static bool yield_call_combiner_if_pending_batches_found(
 | 
	
		
			
				|  |  | +    const grpc_core::CallCombinerClosureList& closures) {
 | 
	
		
			
				|  |  | +  return closures.size() > 0;
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +static void pending_batches_fail(
 | 
	
		
			
				|  |  | +    grpc_call_element* elem, grpc_error* error,
 | 
	
		
			
				|  |  | +    YieldCallCombinerPredicate yield_call_combiner_predicate) {
 | 
	
		
			
				|  |  |    GPR_ASSERT(error != GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  |    call_data* calld = static_cast<call_data*>(elem->call_data);
 | 
	
		
			
				|  |  |    if (grpc_client_channel_trace.enabled()) {
 | 
	
	
		
			
				|  | @@ -866,9 +1095,9 @@ static void pending_batches_fail(grpc_call_element* elem, grpc_error* error,
 | 
	
		
			
				|  |  |      pending_batch* pending = &calld->pending_batches[i];
 | 
	
		
			
				|  |  |      grpc_transport_stream_op_batch* batch = pending->batch;
 | 
	
		
			
				|  |  |      if (batch != nullptr) {
 | 
	
		
			
				|  |  | -      if (batch->recv_trailing_metadata && calld->have_request) {
 | 
	
		
			
				|  |  | -        maybe_inject_recv_trailing_metadata_ready_for_lb(
 | 
	
		
			
				|  |  | -            *calld->request->pick(), batch);
 | 
	
		
			
				|  |  | +      if (batch->recv_trailing_metadata) {
 | 
	
		
			
				|  |  | +        maybe_inject_recv_trailing_metadata_ready_for_lb(calld->pick.pick,
 | 
	
		
			
				|  |  | +                                                         batch);
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  |        batch->handler_private.extra_arg = calld;
 | 
	
		
			
				|  |  |        GRPC_CLOSURE_INIT(&batch->handler_private.closure,
 | 
	
	
		
			
				|  | @@ -879,7 +1108,7 @@ static void pending_batches_fail(grpc_call_element* elem, grpc_error* error,
 | 
	
		
			
				|  |  |        pending_batch_clear(calld, pending);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  if (yield_call_combiner) {
 | 
	
		
			
				|  |  | +  if (yield_call_combiner_predicate(closures)) {
 | 
	
		
			
				|  |  |      closures.RunClosures(calld->call_combiner);
 | 
	
		
			
				|  |  |    } else {
 | 
	
		
			
				|  |  |      closures.RunClosuresWithoutYielding(calld->call_combiner);
 | 
	
	
		
			
				|  | @@ -923,8 +1152,8 @@ static void pending_batches_resume(grpc_call_element* elem) {
 | 
	
		
			
				|  |  |      grpc_transport_stream_op_batch* batch = pending->batch;
 | 
	
		
			
				|  |  |      if (batch != nullptr) {
 | 
	
		
			
				|  |  |        if (batch->recv_trailing_metadata) {
 | 
	
		
			
				|  |  | -        maybe_inject_recv_trailing_metadata_ready_for_lb(
 | 
	
		
			
				|  |  | -            *calld->request->pick(), batch);
 | 
	
		
			
				|  |  | +        maybe_inject_recv_trailing_metadata_ready_for_lb(calld->pick.pick,
 | 
	
		
			
				|  |  | +                                                         batch);
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  |        batch->handler_private.extra_arg = calld->subchannel_call.get();
 | 
	
		
			
				|  |  |        GRPC_CLOSURE_INIT(&batch->handler_private.closure,
 | 
	
	
		
			
				|  | @@ -1015,11 +1244,9 @@ static void do_retry(grpc_call_element* elem,
 | 
	
		
			
				|  |  |    const ClientChannelMethodParams::RetryPolicy* retry_policy =
 | 
	
		
			
				|  |  |        calld->method_params->retry_policy();
 | 
	
		
			
				|  |  |    GPR_ASSERT(retry_policy != nullptr);
 | 
	
		
			
				|  |  | +  // Reset subchannel call and connected subchannel.
 | 
	
		
			
				|  |  |    calld->subchannel_call.reset();
 | 
	
		
			
				|  |  | -  if (calld->have_request) {
 | 
	
		
			
				|  |  | -    calld->have_request = false;
 | 
	
		
			
				|  |  | -    calld->request.Destroy();
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | +  calld->pick.pick.connected_subchannel.reset();
 | 
	
		
			
				|  |  |    // Compute backoff delay.
 | 
	
		
			
				|  |  |    grpc_millis next_attempt_time;
 | 
	
		
			
				|  |  |    if (server_pushback_ms >= 0) {
 | 
	
	
		
			
				|  | @@ -1938,7 +2165,7 @@ static void add_retriable_recv_trailing_metadata_op(
 | 
	
		
			
				|  |  |    batch_data->batch.payload->recv_trailing_metadata
 | 
	
		
			
				|  |  |        .recv_trailing_metadata_ready =
 | 
	
		
			
				|  |  |        &retry_state->recv_trailing_metadata_ready;
 | 
	
		
			
				|  |  | -  maybe_inject_recv_trailing_metadata_ready_for_lb(*calld->request->pick(),
 | 
	
		
			
				|  |  | +  maybe_inject_recv_trailing_metadata_ready_for_lb(calld->pick.pick,
 | 
	
		
			
				|  |  |                                                     &batch_data->batch);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -2207,41 +2434,38 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) {
 | 
	
		
			
				|  |  |  // LB pick
 | 
	
		
			
				|  |  |  //
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) {
 | 
	
		
			
				|  |  | +static void create_subchannel_call(grpc_call_element* elem) {
 | 
	
		
			
				|  |  |    channel_data* chand = static_cast<channel_data*>(elem->channel_data);
 | 
	
		
			
				|  |  |    call_data* calld = static_cast<call_data*>(elem->call_data);
 | 
	
		
			
				|  |  |    const size_t parent_data_size =
 | 
	
		
			
				|  |  |        calld->enable_retries ? sizeof(subchannel_call_retry_state) : 0;
 | 
	
		
			
				|  |  |    const grpc_core::ConnectedSubchannel::CallArgs call_args = {
 | 
	
		
			
				|  |  | -      calld->pollent,                                   // pollent
 | 
	
		
			
				|  |  | -      calld->path,                                      // path
 | 
	
		
			
				|  |  | -      calld->call_start_time,                           // start_time
 | 
	
		
			
				|  |  | -      calld->deadline,                                  // deadline
 | 
	
		
			
				|  |  | -      calld->arena,                                     // arena
 | 
	
		
			
				|  |  | -      calld->request->pick()->subchannel_call_context,  // context
 | 
	
		
			
				|  |  | -      calld->call_combiner,                             // call_combiner
 | 
	
		
			
				|  |  | -      parent_data_size                                  // parent_data_size
 | 
	
		
			
				|  |  | +      calld->pollent,                            // pollent
 | 
	
		
			
				|  |  | +      calld->path,                               // path
 | 
	
		
			
				|  |  | +      calld->call_start_time,                    // start_time
 | 
	
		
			
				|  |  | +      calld->deadline,                           // deadline
 | 
	
		
			
				|  |  | +      calld->arena,                              // arena
 | 
	
		
			
				|  |  | +      calld->pick.pick.subchannel_call_context,  // context
 | 
	
		
			
				|  |  | +      calld->call_combiner,                      // call_combiner
 | 
	
		
			
				|  |  | +      parent_data_size                           // parent_data_size
 | 
	
		
			
				|  |  |    };
 | 
	
		
			
				|  |  | -  grpc_error* new_error = GRPC_ERROR_NONE;
 | 
	
		
			
				|  |  | +  grpc_error* error = GRPC_ERROR_NONE;
 | 
	
		
			
				|  |  |    calld->subchannel_call =
 | 
	
		
			
				|  |  | -      calld->request->pick()->connected_subchannel->CreateCall(call_args,
 | 
	
		
			
				|  |  | -                                                               &new_error);
 | 
	
		
			
				|  |  | +      calld->pick.pick.connected_subchannel->CreateCall(call_args, &error);
 | 
	
		
			
				|  |  |    if (grpc_client_channel_trace.enabled()) {
 | 
	
		
			
				|  |  |      gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
 | 
	
		
			
				|  |  |              chand, calld, calld->subchannel_call.get(),
 | 
	
		
			
				|  |  | -            grpc_error_string(new_error));
 | 
	
		
			
				|  |  | +            grpc_error_string(error));
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  if (GPR_UNLIKELY(new_error != GRPC_ERROR_NONE)) {
 | 
	
		
			
				|  |  | -    new_error = grpc_error_add_child(new_error, error);
 | 
	
		
			
				|  |  | -    pending_batches_fail(elem, new_error, true /* yield_call_combiner */);
 | 
	
		
			
				|  |  | +  if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
 | 
	
		
			
				|  |  | +    pending_batches_fail(elem, error, yield_call_combiner);
 | 
	
		
			
				|  |  |    } else {
 | 
	
		
			
				|  |  |      if (parent_data_size > 0) {
 | 
	
		
			
				|  |  | -      new (calld->subchannel_call->GetParentData()) subchannel_call_retry_state(
 | 
	
		
			
				|  |  | -          calld->request->pick()->subchannel_call_context);
 | 
	
		
			
				|  |  | +      new (calld->subchannel_call->GetParentData())
 | 
	
		
			
				|  |  | +          subchannel_call_retry_state(calld->pick.pick.subchannel_call_context);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |      pending_batches_resume(elem);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  GRPC_ERROR_UNREF(error);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  // Invoked when a pick is completed, on both success or failure.
 | 
	
	
		
			
				|  | @@ -2249,54 +2473,106 @@ static void pick_done(void* arg, grpc_error* error) {
 | 
	
		
			
				|  |  |    grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
 | 
	
		
			
				|  |  |    channel_data* chand = static_cast<channel_data*>(elem->channel_data);
 | 
	
		
			
				|  |  |    call_data* calld = static_cast<call_data*>(elem->call_data);
 | 
	
		
			
				|  |  | -  if (GPR_UNLIKELY(calld->request->pick()->connected_subchannel == nullptr)) {
 | 
	
		
			
				|  |  | -    // Failed to create subchannel.
 | 
	
		
			
				|  |  | -    // If there was no error, this is an LB policy drop, in which case
 | 
	
		
			
				|  |  | -    // we return an error; otherwise, we may retry.
 | 
	
		
			
				|  |  | -    grpc_status_code status = GRPC_STATUS_OK;
 | 
	
		
			
				|  |  | -    grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr,
 | 
	
		
			
				|  |  | -                          nullptr);
 | 
	
		
			
				|  |  | -    if (error == GRPC_ERROR_NONE || !calld->enable_retries ||
 | 
	
		
			
				|  |  | -        !maybe_retry(elem, nullptr /* batch_data */, status,
 | 
	
		
			
				|  |  | -                     nullptr /* server_pushback_md */)) {
 | 
	
		
			
				|  |  | -      grpc_error* new_error =
 | 
	
		
			
				|  |  | -          error == GRPC_ERROR_NONE
 | 
	
		
			
				|  |  | -              ? GRPC_ERROR_CREATE_FROM_STATIC_STRING(
 | 
	
		
			
				|  |  | -                    "Call dropped by load balancing policy")
 | 
	
		
			
				|  |  | -              : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
 | 
	
		
			
				|  |  | -                    "Failed to create subchannel", &error, 1);
 | 
	
		
			
				|  |  | +  if (error != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | +    if (grpc_client_channel_trace.enabled()) {
 | 
	
		
			
				|  |  | +      gpr_log(GPR_INFO,
 | 
	
		
			
				|  |  | +              "chand=%p calld=%p: failed to pick subchannel: error=%s", chand,
 | 
	
		
			
				|  |  | +              calld, grpc_error_string(error));
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    pending_batches_fail(elem, GRPC_ERROR_REF(error), yield_call_combiner);
 | 
	
		
			
				|  |  | +    return;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  create_subchannel_call(elem);
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +namespace grpc_core {
 | 
	
		
			
				|  |  | +namespace {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +// A class to handle the call combiner cancellation callback for a
 | 
	
		
			
				|  |  | +// queued pick.
 | 
	
		
			
				|  |  | +class QueuedPickCanceller {
 | 
	
		
			
				|  |  | + public:
 | 
	
		
			
				|  |  | +  explicit QueuedPickCanceller(grpc_call_element* elem) : elem_(elem) {
 | 
	
		
			
				|  |  | +    auto* calld = static_cast<call_data*>(elem->call_data);
 | 
	
		
			
				|  |  | +    auto* chand = static_cast<channel_data*>(elem->channel_data);
 | 
	
		
			
				|  |  | +    GRPC_CALL_STACK_REF(calld->owning_call, "QueuedPickCanceller");
 | 
	
		
			
				|  |  | +    GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this,
 | 
	
		
			
				|  |  | +                      grpc_combiner_scheduler(chand->combiner));
 | 
	
		
			
				|  |  | +    grpc_call_combiner_set_notify_on_cancel(calld->call_combiner, &closure_);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | + private:
 | 
	
		
			
				|  |  | +  static void CancelLocked(void* arg, grpc_error* error) {
 | 
	
		
			
				|  |  | +    auto* self = static_cast<QueuedPickCanceller*>(arg);
 | 
	
		
			
				|  |  | +    auto* chand = static_cast<channel_data*>(self->elem_->channel_data);
 | 
	
		
			
				|  |  | +    auto* calld = static_cast<call_data*>(self->elem_->call_data);
 | 
	
		
			
				|  |  | +    if (grpc_client_channel_trace.enabled()) {
 | 
	
		
			
				|  |  | +      gpr_log(GPR_INFO,
 | 
	
		
			
				|  |  | +              "chand=%p calld=%p: cancelling queued pick: "
 | 
	
		
			
				|  |  | +              "error=%s self=%p calld->pick_canceller=%p",
 | 
	
		
			
				|  |  | +              chand, calld, grpc_error_string(error), self,
 | 
	
		
			
				|  |  | +              calld->pick_canceller);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    if (calld->pick_canceller == self && error != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | +      // Remove pick from list of queued picks.
 | 
	
		
			
				|  |  | +      remove_call_from_queued_picks_locked(self->elem_);
 | 
	
		
			
				|  |  | +      // Fail pending batches on the call.
 | 
	
		
			
				|  |  | +      pending_batches_fail(self->elem_, GRPC_ERROR_REF(error),
 | 
	
		
			
				|  |  | +                           yield_call_combiner_if_pending_batches_found);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    GRPC_CALL_STACK_UNREF(calld->owning_call, "QueuedPickCanceller");
 | 
	
		
			
				|  |  | +    Delete(self);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  grpc_call_element* elem_;
 | 
	
		
			
				|  |  | +  grpc_closure closure_;
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +}  // namespace
 | 
	
		
			
				|  |  | +}  // namespace grpc_core
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +// Removes the call from the channel's list of queued picks.
 | 
	
		
			
				|  |  | +static void remove_call_from_queued_picks_locked(grpc_call_element* elem) {
 | 
	
		
			
				|  |  | +  auto* chand = static_cast<channel_data*>(elem->channel_data);
 | 
	
		
			
				|  |  | +  auto* calld = static_cast<call_data*>(elem->call_data);
 | 
	
		
			
				|  |  | +  for (QueuedPick** pick = &chand->queued_picks; *pick != nullptr;
 | 
	
		
			
				|  |  | +       pick = &(*pick)->next) {
 | 
	
		
			
				|  |  | +    if (*pick == &calld->pick) {
 | 
	
		
			
				|  |  |        if (grpc_client_channel_trace.enabled()) {
 | 
	
		
			
				|  |  | -        gpr_log(GPR_INFO,
 | 
	
		
			
				|  |  | -                "chand=%p calld=%p: failed to create subchannel: error=%s",
 | 
	
		
			
				|  |  | -                chand, calld, grpc_error_string(new_error));
 | 
	
		
			
				|  |  | +        gpr_log(GPR_INFO, "chand=%p calld=%p: removing from queued picks list",
 | 
	
		
			
				|  |  | +                chand, calld);
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  | -      pending_batches_fail(elem, new_error, true /* yield_call_combiner */);
 | 
	
		
			
				|  |  | +      calld->pick_queued = false;
 | 
	
		
			
				|  |  | +      *pick = calld->pick.next;
 | 
	
		
			
				|  |  | +      // Remove call's pollent from channel's interested_parties.
 | 
	
		
			
				|  |  | +      grpc_polling_entity_del_from_pollset_set(calld->pollent,
 | 
	
		
			
				|  |  | +                                               chand->interested_parties);
 | 
	
		
			
				|  |  | +      // Lame the call combiner canceller.
 | 
	
		
			
				|  |  | +      calld->pick_canceller = nullptr;
 | 
	
		
			
				|  |  | +      break;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -  } else {
 | 
	
		
			
				|  |  | -    /* Create call on subchannel. */
 | 
	
		
			
				|  |  | -    create_subchannel_call(elem, GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -// If the channel is in TRANSIENT_FAILURE and the call is not
 | 
	
		
			
				|  |  | -// wait_for_ready=true, fails the call and returns true.
 | 
	
		
			
				|  |  | -static bool fail_call_if_in_transient_failure(grpc_call_element* elem) {
 | 
	
		
			
				|  |  | -  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
 | 
	
		
			
				|  |  | -  call_data* calld = static_cast<call_data*>(elem->call_data);
 | 
	
		
			
				|  |  | -  grpc_transport_stream_op_batch* batch = calld->pending_batches[0].batch;
 | 
	
		
			
				|  |  | -  if (chand->request_router->GetConnectivityState() ==
 | 
	
		
			
				|  |  | -          GRPC_CHANNEL_TRANSIENT_FAILURE &&
 | 
	
		
			
				|  |  | -      (batch->payload->send_initial_metadata.send_initial_metadata_flags &
 | 
	
		
			
				|  |  | -       GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) {
 | 
	
		
			
				|  |  | -    pending_batches_fail(
 | 
	
		
			
				|  |  | -        elem,
 | 
	
		
			
				|  |  | -        grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
 | 
	
		
			
				|  |  | -                               "channel is in state TRANSIENT_FAILURE"),
 | 
	
		
			
				|  |  | -                           GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
 | 
	
		
			
				|  |  | -        true /* yield_call_combiner */);
 | 
	
		
			
				|  |  | -    return true;
 | 
	
		
			
				|  |  | +// Adds the call to the channel's list of queued picks.
 | 
	
		
			
				|  |  | +static void add_call_to_queued_picks_locked(grpc_call_element* elem) {
 | 
	
		
			
				|  |  | +  auto* chand = static_cast<channel_data*>(elem->channel_data);
 | 
	
		
			
				|  |  | +  auto* calld = static_cast<call_data*>(elem->call_data);
 | 
	
		
			
				|  |  | +  if (grpc_client_channel_trace.enabled()) {
 | 
	
		
			
				|  |  | +    gpr_log(GPR_INFO, "chand=%p calld=%p: adding to queued picks list", chand,
 | 
	
		
			
				|  |  | +            calld);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  return false;
 | 
	
		
			
				|  |  | +  calld->pick_queued = true;
 | 
	
		
			
				|  |  | +  // Add call to queued picks list.
 | 
	
		
			
				|  |  | +  calld->pick.elem = elem;
 | 
	
		
			
				|  |  | +  calld->pick.next = chand->queued_picks;
 | 
	
		
			
				|  |  | +  chand->queued_picks = &calld->pick;
 | 
	
		
			
				|  |  | +  // Add call's pollent to channel's interested_parties, so that I/O
 | 
	
		
			
				|  |  | +  // can be done under the call's CQ.
 | 
	
		
			
				|  |  | +  grpc_polling_entity_add_to_pollset_set(calld->pollent,
 | 
	
		
			
				|  |  | +                                         chand->interested_parties);
 | 
	
		
			
				|  |  | +  // Register call combiner cancellation callback.
 | 
	
		
			
				|  |  | +  calld->pick_canceller = grpc_core::New<grpc_core::QueuedPickCanceller>(elem);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  // Applies service config to the call.  Must be invoked once we know
 | 
	
	
		
			
				|  | @@ -2356,36 +2632,37 @@ static void apply_service_config_to_call_locked(grpc_call_element* elem) {
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  // Invoked once resolver results are available.
 | 
	
		
			
				|  |  | -static bool maybe_apply_service_config_to_call_locked(void* arg) {
 | 
	
		
			
				|  |  | -  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
 | 
	
		
			
				|  |  | +static void maybe_apply_service_config_to_call_locked(grpc_call_element* elem) {
 | 
	
		
			
				|  |  | +  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
 | 
	
		
			
				|  |  |    call_data* calld = static_cast<call_data*>(elem->call_data);
 | 
	
		
			
				|  |  | -  // Only get service config data on the first attempt.
 | 
	
		
			
				|  |  | -  if (GPR_LIKELY(calld->num_attempts_completed == 0)) {
 | 
	
		
			
				|  |  | +  // Apply service config data to the call only once, and only if the
 | 
	
		
			
				|  |  | +  // channel has the data available.
 | 
	
		
			
				|  |  | +  if (GPR_LIKELY(chand->have_service_config &&
 | 
	
		
			
				|  |  | +                 !calld->service_config_applied)) {
 | 
	
		
			
				|  |  | +    calld->service_config_applied = true;
 | 
	
		
			
				|  |  |      apply_service_config_to_call_locked(elem);
 | 
	
		
			
				|  |  | -    // Check this after applying service config, since it may have
 | 
	
		
			
				|  |  | -    // affected the call's wait_for_ready value.
 | 
	
		
			
				|  |  | -    if (fail_call_if_in_transient_failure(elem)) return false;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  return true;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void start_pick_locked(void* arg, grpc_error* ignored) {
 | 
	
		
			
				|  |  | +static const char* pick_result_name(
 | 
	
		
			
				|  |  | +    LoadBalancingPolicy::SubchannelPicker::PickResult result) {
 | 
	
		
			
				|  |  | +  switch (result) {
 | 
	
		
			
				|  |  | +    case LoadBalancingPolicy::SubchannelPicker::PICK_COMPLETE:
 | 
	
		
			
				|  |  | +      return "COMPLETE";
 | 
	
		
			
				|  |  | +    case LoadBalancingPolicy::SubchannelPicker::PICK_QUEUE:
 | 
	
		
			
				|  |  | +      return "QUEUE";
 | 
	
		
			
				|  |  | +    case LoadBalancingPolicy::SubchannelPicker::PICK_TRANSIENT_FAILURE:
 | 
	
		
			
				|  |  | +      return "TRANSIENT_FAILURE";
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  GPR_UNREACHABLE_CODE(return "UNKNOWN");
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +static void start_pick_locked(void* arg, grpc_error* error) {
 | 
	
		
			
				|  |  |    grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
 | 
	
		
			
				|  |  |    call_data* calld = static_cast<call_data*>(elem->call_data);
 | 
	
		
			
				|  |  |    channel_data* chand = static_cast<channel_data*>(elem->channel_data);
 | 
	
		
			
				|  |  | -  GPR_ASSERT(!calld->have_request);
 | 
	
		
			
				|  |  | +  GPR_ASSERT(calld->pick.pick.connected_subchannel == nullptr);
 | 
	
		
			
				|  |  |    GPR_ASSERT(calld->subchannel_call == nullptr);
 | 
	
		
			
				|  |  | -  // Normally, we want to do this check until after we've processed the
 | 
	
		
			
				|  |  | -  // service config, so that we can honor the wait_for_ready setting in
 | 
	
		
			
				|  |  | -  // the service config.  However, if the channel is in TRANSIENT_FAILURE
 | 
	
		
			
				|  |  | -  // and we don't have an LB policy at this point, that means that the
 | 
	
		
			
				|  |  | -  // resolver has returned a failure, so we're not going to get a service
 | 
	
		
			
				|  |  | -  // config right away.  In that case, we fail the call now based on the
 | 
	
		
			
				|  |  | -  // wait_for_ready value passed in from the application.
 | 
	
		
			
				|  |  | -  if (chand->request_router->lb_policy() == nullptr &&
 | 
	
		
			
				|  |  | -      fail_call_if_in_transient_failure(elem)) {
 | 
	
		
			
				|  |  | -    return;
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  |    // If this is a retry, use the send_initial_metadata payload that
 | 
	
		
			
				|  |  |    // we've cached; otherwise, use the pending batch.  The
 | 
	
		
			
				|  |  |    // send_initial_metadata batch will be the first pending batch in the
 | 
	
	
		
			
				|  | @@ -2396,25 +2673,78 @@ static void start_pick_locked(void* arg, grpc_error* ignored) {
 | 
	
		
			
				|  |  |    // allocate the subchannel batch earlier so that we can give the
 | 
	
		
			
				|  |  |    // subchannel's copy of the metadata batch (which is copied for each
 | 
	
		
			
				|  |  |    // attempt) to the LB policy instead the one from the parent channel.
 | 
	
		
			
				|  |  | -  grpc_metadata_batch* initial_metadata =
 | 
	
		
			
				|  |  | +  calld->pick.pick.initial_metadata =
 | 
	
		
			
				|  |  |        calld->seen_send_initial_metadata
 | 
	
		
			
				|  |  |            ? &calld->send_initial_metadata
 | 
	
		
			
				|  |  |            : calld->pending_batches[0]
 | 
	
		
			
				|  |  |                  .batch->payload->send_initial_metadata.send_initial_metadata;
 | 
	
		
			
				|  |  | -  uint32_t* initial_metadata_flags =
 | 
	
		
			
				|  |  | +  uint32_t* send_initial_metadata_flags =
 | 
	
		
			
				|  |  |        calld->seen_send_initial_metadata
 | 
	
		
			
				|  |  |            ? &calld->send_initial_metadata_flags
 | 
	
		
			
				|  |  |            : &calld->pending_batches[0]
 | 
	
		
			
				|  |  |                   .batch->payload->send_initial_metadata
 | 
	
		
			
				|  |  |                   .send_initial_metadata_flags;
 | 
	
		
			
				|  |  | +  // Apply service config to call if needed.
 | 
	
		
			
				|  |  | +  maybe_apply_service_config_to_call_locked(elem);
 | 
	
		
			
				|  |  | +  // When done, we schedule this closure to leave the channel combiner.
 | 
	
		
			
				|  |  |    GRPC_CLOSURE_INIT(&calld->pick_closure, pick_done, elem,
 | 
	
		
			
				|  |  |                      grpc_schedule_on_exec_ctx);
 | 
	
		
			
				|  |  | -  calld->request.Init(calld->owning_call, calld->call_combiner, calld->pollent,
 | 
	
		
			
				|  |  | -                      initial_metadata, initial_metadata_flags,
 | 
	
		
			
				|  |  | -                      maybe_apply_service_config_to_call_locked, elem,
 | 
	
		
			
				|  |  | -                      &calld->pick_closure);
 | 
	
		
			
				|  |  | -  calld->have_request = true;
 | 
	
		
			
				|  |  | -  chand->request_router->RouteCallLocked(calld->request.get());
 | 
	
		
			
				|  |  | +  // Attempt pick.
 | 
	
		
			
				|  |  | +  error = GRPC_ERROR_NONE;
 | 
	
		
			
				|  |  | +  auto pick_result = chand->picker->Pick(&calld->pick.pick, &error);
 | 
	
		
			
				|  |  | +  if (grpc_client_channel_trace.enabled()) {
 | 
	
		
			
				|  |  | +    gpr_log(GPR_INFO,
 | 
	
		
			
				|  |  | +            "chand=%p calld=%p: LB pick returned %s (connected_subchannel=%p, "
 | 
	
		
			
				|  |  | +            "error=%s)",
 | 
	
		
			
				|  |  | +            chand, calld, pick_result_name(pick_result),
 | 
	
		
			
				|  |  | +            calld->pick.pick.connected_subchannel.get(),
 | 
	
		
			
				|  |  | +            grpc_error_string(error));
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  switch (pick_result) {
 | 
	
		
			
				|  |  | +    case LoadBalancingPolicy::SubchannelPicker::PICK_TRANSIENT_FAILURE:
 | 
	
		
			
				|  |  | +      // If we're shutting down, fail all RPCs.
 | 
	
		
			
				|  |  | +      if (chand->disconnect_error != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | +        GRPC_ERROR_UNREF(error);
 | 
	
		
			
				|  |  | +        GRPC_CLOSURE_SCHED(&calld->pick_closure,
 | 
	
		
			
				|  |  | +                           GRPC_ERROR_REF(chand->disconnect_error));
 | 
	
		
			
				|  |  | +        break;
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +      // If wait_for_ready is false, then the error indicates the RPC
 | 
	
		
			
				|  |  | +      // attempt's final status.
 | 
	
		
			
				|  |  | +      if ((*send_initial_metadata_flags &
 | 
	
		
			
				|  |  | +           GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) {
 | 
	
		
			
				|  |  | +        // Retry if appropriate; otherwise, fail.
 | 
	
		
			
				|  |  | +        grpc_status_code status = GRPC_STATUS_OK;
 | 
	
		
			
				|  |  | +        grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr,
 | 
	
		
			
				|  |  | +                              nullptr);
 | 
	
		
			
				|  |  | +        if (!calld->enable_retries ||
 | 
	
		
			
				|  |  | +            !maybe_retry(elem, nullptr /* batch_data */, status,
 | 
	
		
			
				|  |  | +                         nullptr /* server_pushback_md */)) {
 | 
	
		
			
				|  |  | +          grpc_error* new_error =
 | 
	
		
			
				|  |  | +              GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
 | 
	
		
			
				|  |  | +                  "Failed to create subchannel", &error, 1);
 | 
	
		
			
				|  |  | +          GRPC_ERROR_UNREF(error);
 | 
	
		
			
				|  |  | +          GRPC_CLOSURE_SCHED(&calld->pick_closure, new_error);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        if (calld->pick_queued) remove_call_from_queued_picks_locked(elem);
 | 
	
		
			
				|  |  | +        break;
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +      // If wait_for_ready is true, then queue to retry when we get a new
 | 
	
		
			
				|  |  | +      // picker.
 | 
	
		
			
				|  |  | +      GRPC_ERROR_UNREF(error);
 | 
	
		
			
				|  |  | +      // Fallthrough
 | 
	
		
			
				|  |  | +    case LoadBalancingPolicy::SubchannelPicker::PICK_QUEUE:
 | 
	
		
			
				|  |  | +      if (!calld->pick_queued) add_call_to_queued_picks_locked(elem);
 | 
	
		
			
				|  |  | +      break;
 | 
	
		
			
				|  |  | +    default:  // PICK_COMPLETE
 | 
	
		
			
				|  |  | +      // Handle drops.
 | 
	
		
			
				|  |  | +      if (GPR_UNLIKELY(calld->pick.pick.connected_subchannel == nullptr)) {
 | 
	
		
			
				|  |  | +        error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
 | 
	
		
			
				|  |  | +            "Call dropped by load balancing policy");
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +      GRPC_CLOSURE_SCHED(&calld->pick_closure, error);
 | 
	
		
			
				|  |  | +      if (calld->pick_queued) remove_call_from_queued_picks_locked(elem);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  //
 | 
	
	
		
			
				|  | @@ -2458,8 +2788,10 @@ static void cc_start_transport_stream_op_batch(
 | 
	
		
			
				|  |  |      // been started), fail all pending batches.  Otherwise, send the
 | 
	
		
			
				|  |  |      // cancellation down to the subchannel call.
 | 
	
		
			
				|  |  |      if (calld->subchannel_call == nullptr) {
 | 
	
		
			
				|  |  | +      // TODO(roth): If there is a pending retry callback, do we need to
 | 
	
		
			
				|  |  | +      // cancel it here?
 | 
	
		
			
				|  |  |        pending_batches_fail(elem, GRPC_ERROR_REF(calld->cancel_error),
 | 
	
		
			
				|  |  | -                           false /* yield_call_combiner */);
 | 
	
		
			
				|  |  | +                           no_yield_call_combiner);
 | 
	
		
			
				|  |  |        // Note: This will release the call combiner.
 | 
	
		
			
				|  |  |        grpc_transport_stream_op_batch_finish_with_failure(
 | 
	
		
			
				|  |  |            batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner);
 | 
	
	
		
			
				|  | @@ -2556,7 +2888,8 @@ const grpc_channel_filter grpc_client_channel_filter = {
 | 
	
		
			
				|  |  |  void grpc_client_channel_set_channelz_node(
 | 
	
		
			
				|  |  |      grpc_channel_element* elem, grpc_core::channelz::ClientChannelNode* node) {
 | 
	
		
			
				|  |  |    channel_data* chand = static_cast<channel_data*>(elem->channel_data);
 | 
	
		
			
				|  |  | -  chand->request_router->set_channelz_node(node);
 | 
	
		
			
				|  |  | +  chand->channelz_node = node;
 | 
	
		
			
				|  |  | +  chand->resolving_lb_policy->set_channelz_node(node->Ref());
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  void grpc_client_channel_populate_child_refs(
 | 
	
	
		
			
				|  | @@ -2564,22 +2897,23 @@ void grpc_client_channel_populate_child_refs(
 | 
	
		
			
				|  |  |      grpc_core::channelz::ChildRefsList* child_subchannels,
 | 
	
		
			
				|  |  |      grpc_core::channelz::ChildRefsList* child_channels) {
 | 
	
		
			
				|  |  |    channel_data* chand = static_cast<channel_data*>(elem->channel_data);
 | 
	
		
			
				|  |  | -  if (chand->request_router->lb_policy() != nullptr) {
 | 
	
		
			
				|  |  | -    chand->request_router->lb_policy()->FillChildRefsForChannelz(
 | 
	
		
			
				|  |  | -        child_subchannels, child_channels);
 | 
	
		
			
				|  |  | +  if (chand->resolving_lb_policy != nullptr) {
 | 
	
		
			
				|  |  | +    chand->resolving_lb_policy->FillChildRefsForChannelz(child_subchannels,
 | 
	
		
			
				|  |  | +                                                         child_channels);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static void try_to_connect_locked(void* arg, grpc_error* error_ignored) {
 | 
	
		
			
				|  |  |    channel_data* chand = static_cast<channel_data*>(arg);
 | 
	
		
			
				|  |  | -  chand->request_router->ExitIdleLocked();
 | 
	
		
			
				|  |  | +  chand->resolving_lb_policy->ExitIdleLocked();
 | 
	
		
			
				|  |  |    GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "try_to_connect");
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  grpc_connectivity_state grpc_client_channel_check_connectivity_state(
 | 
	
		
			
				|  |  |      grpc_channel_element* elem, int try_to_connect) {
 | 
	
		
			
				|  |  |    channel_data* chand = static_cast<channel_data*>(elem->channel_data);
 | 
	
		
			
				|  |  | -  grpc_connectivity_state out = chand->request_router->GetConnectivityState();
 | 
	
		
			
				|  |  | +  grpc_connectivity_state out =
 | 
	
		
			
				|  |  | +      grpc_connectivity_state_check(&chand->state_tracker);
 | 
	
		
			
				|  |  |    if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
 | 
	
		
			
				|  |  |      GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect");
 | 
	
		
			
				|  |  |      GRPC_CLOSURE_SCHED(
 | 
	
	
		
			
				|  | @@ -2688,15 +3022,15 @@ static void watch_connectivity_state_locked(void* arg,
 | 
	
		
			
				|  |  |      GRPC_CLOSURE_RUN(w->watcher_timer_init, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  |      GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete_locked, w,
 | 
	
		
			
				|  |  |                        grpc_combiner_scheduler(w->chand->combiner));
 | 
	
		
			
				|  |  | -    w->chand->request_router->NotifyOnConnectivityStateChange(w->state,
 | 
	
		
			
				|  |  | -                                                              &w->my_closure);
 | 
	
		
			
				|  |  | +    grpc_connectivity_state_notify_on_state_change(&w->chand->state_tracker,
 | 
	
		
			
				|  |  | +                                                   w->state, &w->my_closure);
 | 
	
		
			
				|  |  |    } else {
 | 
	
		
			
				|  |  |      GPR_ASSERT(w->watcher_timer_init == nullptr);
 | 
	
		
			
				|  |  |      found = lookup_external_connectivity_watcher(w->chand, w->on_complete);
 | 
	
		
			
				|  |  |      if (found) {
 | 
	
		
			
				|  |  |        GPR_ASSERT(found->on_complete == w->on_complete);
 | 
	
		
			
				|  |  | -      found->chand->request_router->NotifyOnConnectivityStateChange(
 | 
	
		
			
				|  |  | -          nullptr, &found->my_closure);
 | 
	
		
			
				|  |  | +      grpc_connectivity_state_notify_on_state_change(
 | 
	
		
			
				|  |  | +          &found->chand->state_tracker, nullptr, &found->my_closure);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |      grpc_polling_entity_del_from_pollset_set(&w->pollent,
 | 
	
		
			
				|  |  |                                               w->chand->interested_parties);
 |