|  | @@ -68,7 +68,9 @@
 | 
	
		
			
				|  |  |  #include <grpc/support/string_util.h>
 | 
	
		
			
				|  |  |  #include <grpc/support/time.h>
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +#include "include/grpc/support/alloc.h"
 | 
	
		
			
				|  |  |  #include "src/core/ext/filters/client_channel/client_channel.h"
 | 
	
		
			
				|  |  | +#include "src/core/ext/filters/client_channel/lb_policy.h"
 | 
	
		
			
				|  |  |  #include "src/core/ext/filters/client_channel/lb_policy/xds/xds.h"
 | 
	
		
			
				|  |  |  #include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h"
 | 
	
		
			
				|  |  |  #include "src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h"
 | 
	
	
		
			
				|  | @@ -85,6 +87,7 @@
 | 
	
		
			
				|  |  |  #include "src/core/lib/gpr/host_port.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/gpr/string.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/gprpp/manual_constructor.h"
 | 
	
		
			
				|  |  | +#include "src/core/lib/gprpp/map.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/gprpp/memory.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/gprpp/mutex_lock.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/gprpp/orphanable.h"
 | 
	
	
		
			
				|  | @@ -114,6 +117,7 @@ TraceFlag grpc_lb_xds_trace(false, "xds");
 | 
	
		
			
				|  |  |  namespace {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  constexpr char kXds[] = "xds_experimental";
 | 
	
		
			
				|  |  | +constexpr char kDefaultLocalityName[] = "xds_default_locality";
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  class XdsLb : public LoadBalancingPolicy {
 | 
	
		
			
				|  |  |   public:
 | 
	
	
		
			
				|  | @@ -128,6 +132,9 @@ class XdsLb : public LoadBalancingPolicy {
 | 
	
		
			
				|  |  |        channelz::ChildRefsList* child_channels) override;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |   private:
 | 
	
		
			
				|  |  | +  struct LocalityServerlistEntry;
 | 
	
		
			
				|  |  | +  using LocalityList = InlinedVector<UniquePtr<LocalityServerlistEntry>, 1>;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    /// Contains a channel to the LB server and all the data related to the
 | 
	
		
			
				|  |  |    /// channel.
 | 
	
		
			
				|  |  |    class BalancerChannelState
 | 
	
	
		
			
				|  | @@ -266,25 +273,88 @@ class XdsLb : public LoadBalancingPolicy {
 | 
	
		
			
				|  |  |      RefCountedPtr<XdsLbClientStats> client_stats_;
 | 
	
		
			
				|  |  |    };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  class Helper : public ChannelControlHelper {
 | 
	
		
			
				|  |  | +  class LocalityMap {
 | 
	
		
			
				|  |  |     public:
 | 
	
		
			
				|  |  | -    explicit Helper(RefCountedPtr<XdsLb> parent) : parent_(std::move(parent)) {}
 | 
	
		
			
				|  |  | +    class LocalityEntry : public InternallyRefCounted<LocalityEntry> {
 | 
	
		
			
				|  |  | +     public:
 | 
	
		
			
				|  |  | +      explicit LocalityEntry(RefCountedPtr<XdsLb> parent)
 | 
	
		
			
				|  |  | +          : parent_(std::move(parent)) {
 | 
	
		
			
				|  |  | +        gpr_mu_init(&child_policy_mu_);
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +      ~LocalityEntry() { gpr_mu_destroy(&child_policy_mu_); }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +      void UpdateLocked(xds_grpclb_serverlist* serverlist,
 | 
	
		
			
				|  |  | +                        LoadBalancingPolicy::Config* child_policy_config,
 | 
	
		
			
				|  |  | +                        const grpc_channel_args* args);
 | 
	
		
			
				|  |  | +      void ShutdownLocked();
 | 
	
		
			
				|  |  | +      void ResetBackoffLocked();
 | 
	
		
			
				|  |  | +      void FillChildRefsForChannelz(channelz::ChildRefsList* child_subchannels,
 | 
	
		
			
				|  |  | +                                    channelz::ChildRefsList* child_channels);
 | 
	
		
			
				|  |  | +      void Orphan() override;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +     private:
 | 
	
		
			
				|  |  | +      class Helper : public ChannelControlHelper {
 | 
	
		
			
				|  |  | +       public:
 | 
	
		
			
				|  |  | +        explicit Helper(RefCountedPtr<LocalityEntry> entry)
 | 
	
		
			
				|  |  | +            : entry_(std::move(entry)) {}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        Subchannel* CreateSubchannel(const grpc_channel_args& args) override;
 | 
	
		
			
				|  |  | +        grpc_channel* CreateChannel(const char* target,
 | 
	
		
			
				|  |  | +                                    const grpc_channel_args& args) override;
 | 
	
		
			
				|  |  | +        void UpdateState(grpc_connectivity_state state, grpc_error* state_error,
 | 
	
		
			
				|  |  | +                         UniquePtr<SubchannelPicker> picker) override;
 | 
	
		
			
				|  |  | +        void RequestReresolution() override;
 | 
	
		
			
				|  |  | +        void set_child(LoadBalancingPolicy* child) { child_ = child; }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +       private:
 | 
	
		
			
				|  |  | +        bool CalledByPendingChild() const;
 | 
	
		
			
				|  |  | +        bool CalledByCurrentChild() const;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        RefCountedPtr<LocalityEntry> entry_;
 | 
	
		
			
				|  |  | +        LoadBalancingPolicy* child_ = nullptr;
 | 
	
		
			
				|  |  | +      };
 | 
	
		
			
				|  |  | +      // Methods for dealing with the child policy.
 | 
	
		
			
				|  |  | +      OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
 | 
	
		
			
				|  |  | +          const char* name, const grpc_channel_args* args);
 | 
	
		
			
				|  |  | +      grpc_channel_args* CreateChildPolicyArgsLocked(
 | 
	
		
			
				|  |  | +          const grpc_channel_args* args);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +      OrphanablePtr<LoadBalancingPolicy> child_policy_;
 | 
	
		
			
				|  |  | +      OrphanablePtr<LoadBalancingPolicy> pending_child_policy_;
 | 
	
		
			
				|  |  | +      // Lock held when modifying the value of child_policy_ or
 | 
	
		
			
				|  |  | +      // pending_child_policy_.
 | 
	
		
			
				|  |  | +      gpr_mu child_policy_mu_;
 | 
	
		
			
				|  |  | +      RefCountedPtr<XdsLb> parent_;
 | 
	
		
			
				|  |  | +    };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    Subchannel* CreateSubchannel(const grpc_channel_args& args) override;
 | 
	
		
			
				|  |  | -    grpc_channel* CreateChannel(const char* target,
 | 
	
		
			
				|  |  | -                                const grpc_channel_args& args) override;
 | 
	
		
			
				|  |  | -    void UpdateState(grpc_connectivity_state state, grpc_error* state_error,
 | 
	
		
			
				|  |  | -                     UniquePtr<SubchannelPicker> picker) override;
 | 
	
		
			
				|  |  | -    void RequestReresolution() override;
 | 
	
		
			
				|  |  | +    LocalityMap() { gpr_mu_init(&child_refs_mu_); }
 | 
	
		
			
				|  |  | +    ~LocalityMap() { gpr_mu_destroy(&child_refs_mu_); }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    void set_child(LoadBalancingPolicy* child) { child_ = child; }
 | 
	
		
			
				|  |  | +    void UpdateLocked(const LocalityList& locality_list,
 | 
	
		
			
				|  |  | +                      LoadBalancingPolicy::Config* child_policy_config,
 | 
	
		
			
				|  |  | +                      const grpc_channel_args* args, XdsLb* parent);
 | 
	
		
			
				|  |  | +    void ShutdownLocked();
 | 
	
		
			
				|  |  | +    void ResetBackoffLocked();
 | 
	
		
			
				|  |  | +    void FillChildRefsForChannelz(channelz::ChildRefsList* child_subchannels,
 | 
	
		
			
				|  |  | +                                  channelz::ChildRefsList* child_channels);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |     private:
 | 
	
		
			
				|  |  | -    bool CalledByPendingChild() const;
 | 
	
		
			
				|  |  | -    bool CalledByCurrentChild() const;
 | 
	
		
			
				|  |  | +    void PruneLocalities(const LocalityList& locality_list);
 | 
	
		
			
				|  |  | +    Map<UniquePtr<char>, OrphanablePtr<LocalityEntry>, StringLess> map_;
 | 
	
		
			
				|  |  | +    // Lock held while filling child refs for all localities
 | 
	
		
			
				|  |  | +    // inside the map
 | 
	
		
			
				|  |  | +    gpr_mu child_refs_mu_;
 | 
	
		
			
				|  |  | +  };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    RefCountedPtr<XdsLb> parent_;
 | 
	
		
			
				|  |  | -    LoadBalancingPolicy* child_ = nullptr;
 | 
	
		
			
				|  |  | +  struct LocalityServerlistEntry {
 | 
	
		
			
				|  |  | +    ~LocalityServerlistEntry() {
 | 
	
		
			
				|  |  | +      gpr_free(locality_name);
 | 
	
		
			
				|  |  | +      xds_grpclb_destroy_serverlist(serverlist);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    char* locality_name;
 | 
	
		
			
				|  |  | +    // The deserialized response from the balancer. May be nullptr until one
 | 
	
		
			
				|  |  | +    // such response has arrived.
 | 
	
		
			
				|  |  | +    xds_grpclb_serverlist* serverlist;
 | 
	
		
			
				|  |  |    };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    ~XdsLb();
 | 
	
	
		
			
				|  | @@ -309,12 +379,6 @@ class XdsLb : public LoadBalancingPolicy {
 | 
	
		
			
				|  |  |    // Callback to enter fallback mode.
 | 
	
		
			
				|  |  |    static void OnFallbackTimerLocked(void* arg, grpc_error* error);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  // Methods for dealing with the child policy.
 | 
	
		
			
				|  |  | -  void CreateOrUpdateChildPolicyLocked();
 | 
	
		
			
				|  |  | -  grpc_channel_args* CreateChildPolicyArgsLocked();
 | 
	
		
			
				|  |  | -  OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
 | 
	
		
			
				|  |  | -      const char* name, const grpc_channel_args* args);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |    // Who the client is trying to communicate with.
 | 
	
		
			
				|  |  |    const char* server_name_ = nullptr;
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -338,10 +402,6 @@ class XdsLb : public LoadBalancingPolicy {
 | 
	
		
			
				|  |  |    // Timeout in milliseconds for the LB call. 0 means no deadline.
 | 
	
		
			
				|  |  |    int lb_call_timeout_ms_ = 0;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  // The deserialized response from the balancer. May be nullptr until one
 | 
	
		
			
				|  |  | -  // such response has arrived.
 | 
	
		
			
				|  |  | -  xds_grpclb_serverlist* serverlist_ = nullptr;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |    // Timeout in milliseconds for before using fallback backend addresses.
 | 
	
		
			
				|  |  |    // 0 means not using fallback.
 | 
	
		
			
				|  |  |    RefCountedPtr<Config> fallback_policy_config_;
 | 
	
	
		
			
				|  | @@ -355,11 +415,12 @@ class XdsLb : public LoadBalancingPolicy {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    // The policy to use for the backends.
 | 
	
		
			
				|  |  |    RefCountedPtr<Config> child_policy_config_;
 | 
	
		
			
				|  |  | -  OrphanablePtr<LoadBalancingPolicy> child_policy_;
 | 
	
		
			
				|  |  | -  OrphanablePtr<LoadBalancingPolicy> pending_child_policy_;
 | 
	
		
			
				|  |  | -  // Lock held when modifying the value of child_policy_ or
 | 
	
		
			
				|  |  | -  // pending_child_policy_.
 | 
	
		
			
				|  |  | -  gpr_mu child_policy_mu_;
 | 
	
		
			
				|  |  | +  // Map of policies to use in the backend
 | 
	
		
			
				|  |  | +  LocalityMap locality_map_;
 | 
	
		
			
				|  |  | +  LocalityList locality_serverlist_;
 | 
	
		
			
				|  |  | +  // TODO(mhaidry) : Add a pending locality map that may be swapped with the
 | 
	
		
			
				|  |  | +  // the current one when new localities in the pending map are ready
 | 
	
		
			
				|  |  | +  // to accept connections
 | 
	
		
			
				|  |  |  };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  //
 | 
	
	
		
			
				|  | @@ -378,105 +439,6 @@ XdsLb::PickResult XdsLb::Picker::Pick(PickArgs* pick, grpc_error** error) {
 | 
	
		
			
				|  |  |    return result;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -//
 | 
	
		
			
				|  |  | -// XdsLb::Helper
 | 
	
		
			
				|  |  | -//
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -bool XdsLb::Helper::CalledByPendingChild() const {
 | 
	
		
			
				|  |  | -  GPR_ASSERT(child_ != nullptr);
 | 
	
		
			
				|  |  | -  return child_ == parent_->pending_child_policy_.get();
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -bool XdsLb::Helper::CalledByCurrentChild() const {
 | 
	
		
			
				|  |  | -  GPR_ASSERT(child_ != nullptr);
 | 
	
		
			
				|  |  | -  return child_ == parent_->child_policy_.get();
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -Subchannel* XdsLb::Helper::CreateSubchannel(const grpc_channel_args& args) {
 | 
	
		
			
				|  |  | -  if (parent_->shutting_down_ ||
 | 
	
		
			
				|  |  | -      (!CalledByPendingChild() && !CalledByCurrentChild())) {
 | 
	
		
			
				|  |  | -    return nullptr;
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  return parent_->channel_control_helper()->CreateSubchannel(args);
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -grpc_channel* XdsLb::Helper::CreateChannel(const char* target,
 | 
	
		
			
				|  |  | -                                           const grpc_channel_args& args) {
 | 
	
		
			
				|  |  | -  if (parent_->shutting_down_ ||
 | 
	
		
			
				|  |  | -      (!CalledByPendingChild() && !CalledByCurrentChild())) {
 | 
	
		
			
				|  |  | -    return nullptr;
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  return parent_->channel_control_helper()->CreateChannel(target, args);
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -void XdsLb::Helper::UpdateState(grpc_connectivity_state state,
 | 
	
		
			
				|  |  | -                                grpc_error* state_error,
 | 
	
		
			
				|  |  | -                                UniquePtr<SubchannelPicker> picker) {
 | 
	
		
			
				|  |  | -  if (parent_->shutting_down_) {
 | 
	
		
			
				|  |  | -    GRPC_ERROR_UNREF(state_error);
 | 
	
		
			
				|  |  | -    return;
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  // If this request is from the pending child policy, ignore it until
 | 
	
		
			
				|  |  | -  // it reports READY, at which point we swap it into place.
 | 
	
		
			
				|  |  | -  if (CalledByPendingChild()) {
 | 
	
		
			
				|  |  | -    if (grpc_lb_xds_trace.enabled()) {
 | 
	
		
			
				|  |  | -      gpr_log(GPR_INFO,
 | 
	
		
			
				|  |  | -              "[xdslb %p helper %p] pending child policy %p reports state=%s",
 | 
	
		
			
				|  |  | -              parent_.get(), this, parent_->pending_child_policy_.get(),
 | 
	
		
			
				|  |  | -              grpc_connectivity_state_name(state));
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | -    if (state != GRPC_CHANNEL_READY) {
 | 
	
		
			
				|  |  | -      GRPC_ERROR_UNREF(state_error);
 | 
	
		
			
				|  |  | -      return;
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | -    grpc_pollset_set_del_pollset_set(
 | 
	
		
			
				|  |  | -        parent_->child_policy_->interested_parties(),
 | 
	
		
			
				|  |  | -        parent_->interested_parties());
 | 
	
		
			
				|  |  | -    MutexLock lock(&parent_->child_policy_mu_);
 | 
	
		
			
				|  |  | -    parent_->child_policy_ = std::move(parent_->pending_child_policy_);
 | 
	
		
			
				|  |  | -  } else if (!CalledByCurrentChild()) {
 | 
	
		
			
				|  |  | -    // This request is from an outdated child, so ignore it.
 | 
	
		
			
				|  |  | -    GRPC_ERROR_UNREF(state_error);
 | 
	
		
			
				|  |  | -    return;
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  // TODO(juanlishen): When in fallback mode, pass the child picker
 | 
	
		
			
				|  |  | -  // through without wrapping it.  (Or maybe use a different helper for
 | 
	
		
			
				|  |  | -  // the fallback policy?)
 | 
	
		
			
				|  |  | -  GPR_ASSERT(parent_->lb_chand_ != nullptr);
 | 
	
		
			
				|  |  | -  RefCountedPtr<XdsLbClientStats> client_stats =
 | 
	
		
			
				|  |  | -      parent_->lb_chand_->lb_calld() == nullptr
 | 
	
		
			
				|  |  | -          ? nullptr
 | 
	
		
			
				|  |  | -          : parent_->lb_chand_->lb_calld()->client_stats();
 | 
	
		
			
				|  |  | -  parent_->channel_control_helper()->UpdateState(
 | 
	
		
			
				|  |  | -      state, state_error,
 | 
	
		
			
				|  |  | -      UniquePtr<SubchannelPicker>(
 | 
	
		
			
				|  |  | -          New<Picker>(std::move(picker), std::move(client_stats))));
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -void XdsLb::Helper::RequestReresolution() {
 | 
	
		
			
				|  |  | -  if (parent_->shutting_down_) return;
 | 
	
		
			
				|  |  | -  // If there is a pending child policy, ignore re-resolution requests
 | 
	
		
			
				|  |  | -  // from the current child policy (or any outdated child).
 | 
	
		
			
				|  |  | -  if (parent_->pending_child_policy_ != nullptr && !CalledByPendingChild()) {
 | 
	
		
			
				|  |  | -    return;
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  if (grpc_lb_xds_trace.enabled()) {
 | 
	
		
			
				|  |  | -    gpr_log(GPR_INFO,
 | 
	
		
			
				|  |  | -            "[xdslb %p] Re-resolution requested from the internal RR policy "
 | 
	
		
			
				|  |  | -            "(%p).",
 | 
	
		
			
				|  |  | -            parent_.get(), parent_->child_policy_.get());
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  GPR_ASSERT(parent_->lb_chand_ != nullptr);
 | 
	
		
			
				|  |  | -  // If we are talking to a balancer, we expect to get updated addresses
 | 
	
		
			
				|  |  | -  // from the balancer, so we can ignore the re-resolution request from
 | 
	
		
			
				|  |  | -  // the child policy. Otherwise, pass the re-resolution request up to the
 | 
	
		
			
				|  |  | -  // channel.
 | 
	
		
			
				|  |  | -  if (parent_->lb_chand_->lb_calld() == nullptr ||
 | 
	
		
			
				|  |  | -      !parent_->lb_chand_->lb_calld()->seen_initial_response()) {
 | 
	
		
			
				|  |  | -    parent_->channel_control_helper()->RequestReresolution();
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |  //
 | 
	
		
			
				|  |  |  // serverlist parsing code
 | 
	
		
			
				|  |  |  //
 | 
	
	
		
			
				|  | @@ -951,7 +913,9 @@ void XdsLb::BalancerChannelState::BalancerCallState::
 | 
	
		
			
				|  |  |          self.release();
 | 
	
		
			
				|  |  |          lb_calld->ScheduleNextClientLoadReportLocked();
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  | -      if (xds_grpclb_serverlist_equals(xdslb_policy->serverlist_, serverlist)) {
 | 
	
		
			
				|  |  | +      if (!xdslb_policy->locality_serverlist_.empty() &&
 | 
	
		
			
				|  |  | +          xds_grpclb_serverlist_equals(
 | 
	
		
			
				|  |  | +              xdslb_policy->locality_serverlist_[0]->serverlist, serverlist)) {
 | 
	
		
			
				|  |  |          if (grpc_lb_xds_trace.enabled()) {
 | 
	
		
			
				|  |  |            gpr_log(GPR_INFO,
 | 
	
		
			
				|  |  |                    "[xdslb %p] Incoming server list identical to current, "
 | 
	
	
		
			
				|  | @@ -960,21 +924,31 @@ void XdsLb::BalancerChannelState::BalancerCallState::
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |          xds_grpclb_destroy_serverlist(serverlist);
 | 
	
		
			
				|  |  |        } else { /* new serverlist */
 | 
	
		
			
				|  |  | -        if (xdslb_policy->serverlist_ != nullptr) {
 | 
	
		
			
				|  |  | +        if (!xdslb_policy->locality_serverlist_.empty()) {
 | 
	
		
			
				|  |  |            /* dispose of the old serverlist */
 | 
	
		
			
				|  |  | -          xds_grpclb_destroy_serverlist(xdslb_policy->serverlist_);
 | 
	
		
			
				|  |  | +          xds_grpclb_destroy_serverlist(
 | 
	
		
			
				|  |  | +              xdslb_policy->locality_serverlist_[0]->serverlist);
 | 
	
		
			
				|  |  |          } else {
 | 
	
		
			
				|  |  |            /* or dispose of the fallback */
 | 
	
		
			
				|  |  |            xdslb_policy->fallback_backend_addresses_.reset();
 | 
	
		
			
				|  |  |            if (xdslb_policy->fallback_timer_callback_pending_) {
 | 
	
		
			
				|  |  |              grpc_timer_cancel(&xdslb_policy->lb_fallback_timer_);
 | 
	
		
			
				|  |  |            }
 | 
	
		
			
				|  |  | +          /* Initialize locality serverlist, currently the list only handles
 | 
	
		
			
				|  |  | +           * one child */
 | 
	
		
			
				|  |  | +          xdslb_policy->locality_serverlist_.emplace_back(
 | 
	
		
			
				|  |  | +              MakeUnique<LocalityServerlistEntry>());
 | 
	
		
			
				|  |  | +          xdslb_policy->locality_serverlist_[0]->locality_name =
 | 
	
		
			
				|  |  | +              static_cast<char*>(gpr_strdup(kDefaultLocalityName));
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |          // and update the copy in the XdsLb instance. This
 | 
	
		
			
				|  |  |          // serverlist instance will be destroyed either upon the next
 | 
	
		
			
				|  |  |          // update or when the XdsLb instance is destroyed.
 | 
	
		
			
				|  |  | -        xdslb_policy->serverlist_ = serverlist;
 | 
	
		
			
				|  |  | -        xdslb_policy->CreateOrUpdateChildPolicyLocked();
 | 
	
		
			
				|  |  | +        xdslb_policy->locality_serverlist_[0]->serverlist = serverlist;
 | 
	
		
			
				|  |  | +        xdslb_policy->locality_map_.UpdateLocked(
 | 
	
		
			
				|  |  | +            xdslb_policy->locality_serverlist_,
 | 
	
		
			
				|  |  | +            xdslb_policy->child_policy_config_.get(), xdslb_policy->args_,
 | 
	
		
			
				|  |  | +            xdslb_policy);
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  |      } else {
 | 
	
		
			
				|  |  |        if (grpc_lb_xds_trace.enabled()) {
 | 
	
	
		
			
				|  | @@ -1112,9 +1086,11 @@ grpc_channel_args* BuildBalancerChannelArgs(const grpc_channel_args* args) {
 | 
	
		
			
				|  |  |  // ctor and dtor
 | 
	
		
			
				|  |  |  //
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -XdsLb::XdsLb(Args args) : LoadBalancingPolicy(std::move(args)) {
 | 
	
		
			
				|  |  | +XdsLb::XdsLb(Args args)
 | 
	
		
			
				|  |  | +    : LoadBalancingPolicy(std::move(args)),
 | 
	
		
			
				|  |  | +      locality_map_(),
 | 
	
		
			
				|  |  | +      locality_serverlist_() {
 | 
	
		
			
				|  |  |    gpr_mu_init(&lb_chand_mu_);
 | 
	
		
			
				|  |  | -  gpr_mu_init(&child_policy_mu_);
 | 
	
		
			
				|  |  |    // Record server name.
 | 
	
		
			
				|  |  |    const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI);
 | 
	
		
			
				|  |  |    const char* server_uri = grpc_channel_arg_get_string(arg);
 | 
	
	
		
			
				|  | @@ -1141,10 +1117,7 @@ XdsLb::~XdsLb() {
 | 
	
		
			
				|  |  |    gpr_mu_destroy(&lb_chand_mu_);
 | 
	
		
			
				|  |  |    gpr_free((void*)server_name_);
 | 
	
		
			
				|  |  |    grpc_channel_args_destroy(args_);
 | 
	
		
			
				|  |  | -  if (serverlist_ != nullptr) {
 | 
	
		
			
				|  |  | -    xds_grpclb_destroy_serverlist(serverlist_);
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  gpr_mu_destroy(&child_policy_mu_);
 | 
	
		
			
				|  |  | +  locality_serverlist_.clear();
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  void XdsLb::ShutdownLocked() {
 | 
	
	
		
			
				|  | @@ -1152,19 +1125,7 @@ void XdsLb::ShutdownLocked() {
 | 
	
		
			
				|  |  |    if (fallback_timer_callback_pending_) {
 | 
	
		
			
				|  |  |      grpc_timer_cancel(&lb_fallback_timer_);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  if (child_policy_ != nullptr) {
 | 
	
		
			
				|  |  | -    grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
 | 
	
		
			
				|  |  | -                                     interested_parties());
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  if (pending_child_policy_ != nullptr) {
 | 
	
		
			
				|  |  | -    grpc_pollset_set_del_pollset_set(
 | 
	
		
			
				|  |  | -        pending_child_policy_->interested_parties(), interested_parties());
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  {
 | 
	
		
			
				|  |  | -    MutexLock lock(&child_policy_mu_);
 | 
	
		
			
				|  |  | -    child_policy_.reset();
 | 
	
		
			
				|  |  | -    pending_child_policy_.reset();
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | +  locality_map_.ShutdownLocked();
 | 
	
		
			
				|  |  |    // We destroy the LB channel here instead of in our destructor because
 | 
	
		
			
				|  |  |    // destroying the channel triggers a last callback to
 | 
	
		
			
				|  |  |    // OnBalancerChannelConnectivityChangedLocked(), and we need to be
 | 
	
	
		
			
				|  | @@ -1187,30 +1148,13 @@ void XdsLb::ResetBackoffLocked() {
 | 
	
		
			
				|  |  |    if (pending_lb_chand_ != nullptr) {
 | 
	
		
			
				|  |  |      grpc_channel_reset_connect_backoff(pending_lb_chand_->channel());
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  if (child_policy_ != nullptr) {
 | 
	
		
			
				|  |  | -    child_policy_->ResetBackoffLocked();
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  if (pending_child_policy_ != nullptr) {
 | 
	
		
			
				|  |  | -    pending_child_policy_->ResetBackoffLocked();
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | +  locality_map_.ResetBackoffLocked();
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  void XdsLb::FillChildRefsForChannelz(channelz::ChildRefsList* child_subchannels,
 | 
	
		
			
				|  |  |                                       channelz::ChildRefsList* child_channels) {
 | 
	
		
			
				|  |  | -  {
 | 
	
		
			
				|  |  | -    // Delegate to the child_policy_ to fill the children subchannels.
 | 
	
		
			
				|  |  | -    // This must be done holding child_policy_mu_, since this method does not
 | 
	
		
			
				|  |  | -    // run in the combiner.
 | 
	
		
			
				|  |  | -    MutexLock lock(&child_policy_mu_);
 | 
	
		
			
				|  |  | -    if (child_policy_ != nullptr) {
 | 
	
		
			
				|  |  | -      child_policy_->FillChildRefsForChannelz(child_subchannels,
 | 
	
		
			
				|  |  | -                                              child_channels);
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | -    if (pending_child_policy_ != nullptr) {
 | 
	
		
			
				|  |  | -      pending_child_policy_->FillChildRefsForChannelz(child_subchannels,
 | 
	
		
			
				|  |  | -                                                      child_channels);
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | +  // Delegate to the child_policy_ to fill the children subchannels.
 | 
	
		
			
				|  |  | +  locality_map_.FillChildRefsForChannelz(child_subchannels, child_channels);
 | 
	
		
			
				|  |  |    MutexLock lock(&lb_chand_mu_);
 | 
	
		
			
				|  |  |    if (lb_chand_ != nullptr) {
 | 
	
		
			
				|  |  |      grpc_core::channelz::ChannelNode* channel_node =
 | 
	
	
		
			
				|  | @@ -1314,10 +1258,11 @@ void XdsLb::UpdateLocked(UpdateArgs args) {
 | 
	
		
			
				|  |  |    // have been created from a serverlist.
 | 
	
		
			
				|  |  |    // TODO(vpowar): Handle the fallback_address changes when we add support for
 | 
	
		
			
				|  |  |    // fallback in xDS.
 | 
	
		
			
				|  |  | -  if (child_policy_ != nullptr) CreateOrUpdateChildPolicyLocked();
 | 
	
		
			
				|  |  | +  locality_map_.UpdateLocked(locality_serverlist_, child_policy_config_.get(),
 | 
	
		
			
				|  |  | +                             args_, this);
 | 
	
		
			
				|  |  |    // If this is the initial update, start the fallback timer.
 | 
	
		
			
				|  |  |    if (is_initial_update) {
 | 
	
		
			
				|  |  | -    if (lb_fallback_timeout_ms_ > 0 && serverlist_ == nullptr &&
 | 
	
		
			
				|  |  | +    if (lb_fallback_timeout_ms_ > 0 && locality_serverlist_.empty() &&
 | 
	
		
			
				|  |  |          !fallback_timer_callback_pending_) {
 | 
	
		
			
				|  |  |        grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_;
 | 
	
		
			
				|  |  |        Ref(DEBUG_LOCATION, "on_fallback_timer").release();  // Held by closure
 | 
	
	
		
			
				|  | @@ -1341,8 +1286,8 @@ void XdsLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
 | 
	
		
			
				|  |  |    xdslb_policy->fallback_timer_callback_pending_ = false;
 | 
	
		
			
				|  |  |    // If we receive a serverlist after the timer fires but before this callback
 | 
	
		
			
				|  |  |    // actually runs, don't fall back.
 | 
	
		
			
				|  |  | -  if (xdslb_policy->serverlist_ == nullptr && !xdslb_policy->shutting_down_ &&
 | 
	
		
			
				|  |  | -      error == GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | +  if (xdslb_policy->locality_serverlist_.empty() &&
 | 
	
		
			
				|  |  | +      !xdslb_policy->shutting_down_ && error == GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  |      if (grpc_lb_xds_trace.enabled()) {
 | 
	
		
			
				|  |  |        gpr_log(GPR_INFO,
 | 
	
		
			
				|  |  |                "[xdslb %p] Fallback timer fired. Not using fallback backends",
 | 
	
	
		
			
				|  | @@ -1352,11 +1297,70 @@ void XdsLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
 | 
	
		
			
				|  |  |    xdslb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer");
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -//
 | 
	
		
			
				|  |  | -// code for interacting with the child policy
 | 
	
		
			
				|  |  | -//
 | 
	
		
			
				|  |  | +void XdsLb::LocalityMap::PruneLocalities(const LocalityList& locality_list) {
 | 
	
		
			
				|  |  | +  for (auto iter = map_.begin(); iter != map_.end();) {
 | 
	
		
			
				|  |  | +    bool found = false;
 | 
	
		
			
				|  |  | +    for (size_t i = 0; i < locality_list.size(); i++) {
 | 
	
		
			
				|  |  | +      if (!gpr_stricmp(locality_list[i]->locality_name, iter->first.get())) {
 | 
	
		
			
				|  |  | +        found = true;
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    if (!found) {  // Remove entries not present in the locality list
 | 
	
		
			
				|  |  | +      MutexLock lock(&child_refs_mu_);
 | 
	
		
			
				|  |  | +      iter = map_.erase(iter);
 | 
	
		
			
				|  |  | +    } else
 | 
	
		
			
				|  |  | +      iter++;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +void XdsLb::LocalityMap::UpdateLocked(
 | 
	
		
			
				|  |  | +    const LocalityList& locality_serverlist,
 | 
	
		
			
				|  |  | +    LoadBalancingPolicy::Config* child_policy_config,
 | 
	
		
			
				|  |  | +    const grpc_channel_args* args, XdsLb* parent) {
 | 
	
		
			
				|  |  | +  if (parent->shutting_down_) return;
 | 
	
		
			
				|  |  | +  for (size_t i = 0; i < locality_serverlist.size(); i++) {
 | 
	
		
			
				|  |  | +    UniquePtr<char> locality_name(
 | 
	
		
			
				|  |  | +        gpr_strdup(locality_serverlist[i]->locality_name));
 | 
	
		
			
				|  |  | +    auto iter = map_.find(locality_name);
 | 
	
		
			
				|  |  | +    if (iter == map_.end()) {
 | 
	
		
			
				|  |  | +      OrphanablePtr<LocalityEntry> new_entry =
 | 
	
		
			
				|  |  | +          MakeOrphanable<LocalityEntry>(parent->Ref());
 | 
	
		
			
				|  |  | +      MutexLock lock(&child_refs_mu_);
 | 
	
		
			
				|  |  | +      iter = map_.emplace(std::move(locality_name), std::move(new_entry)).first;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    // Don't create new child policies if not directed to
 | 
	
		
			
				|  |  | +    xds_grpclb_serverlist* serverlist =
 | 
	
		
			
				|  |  | +        parent->locality_serverlist_[i]->serverlist;
 | 
	
		
			
				|  |  | +    iter->second->UpdateLocked(serverlist, child_policy_config, args);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  PruneLocalities(locality_serverlist);
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +void grpc_core::XdsLb::LocalityMap::ShutdownLocked() {
 | 
	
		
			
				|  |  | +  MutexLock lock(&child_refs_mu_);
 | 
	
		
			
				|  |  | +  map_.clear();
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +void grpc_core::XdsLb::LocalityMap::ResetBackoffLocked() {
 | 
	
		
			
				|  |  | +  for (auto iter = map_.begin(); iter != map_.end(); iter++) {
 | 
	
		
			
				|  |  | +    iter->second->ResetBackoffLocked();
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +void grpc_core::XdsLb::LocalityMap::FillChildRefsForChannelz(
 | 
	
		
			
				|  |  | +    channelz::ChildRefsList* child_subchannels,
 | 
	
		
			
				|  |  | +    channelz::ChildRefsList* child_channels) {
 | 
	
		
			
				|  |  | +  MutexLock lock(&child_refs_mu_);
 | 
	
		
			
				|  |  | +  for (auto iter = map_.begin(); iter != map_.end(); iter++) {
 | 
	
		
			
				|  |  | +    iter->second->FillChildRefsForChannelz(child_subchannels, child_channels);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -grpc_channel_args* XdsLb::CreateChildPolicyArgsLocked() {
 | 
	
		
			
				|  |  | +// Locality Entry child policy methods
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +grpc_channel_args*
 | 
	
		
			
				|  |  | +XdsLb::LocalityMap::LocalityEntry::CreateChildPolicyArgsLocked(
 | 
	
		
			
				|  |  | +    const grpc_channel_args* args_in) {
 | 
	
		
			
				|  |  |    const grpc_arg args_to_add[] = {
 | 
	
		
			
				|  |  |        // A channel arg indicating if the target is a backend inferred from a
 | 
	
		
			
				|  |  |        // grpclb load balancer.
 | 
	
	
		
			
				|  | @@ -1368,15 +1372,16 @@ grpc_channel_args* XdsLb::CreateChildPolicyArgsLocked() {
 | 
	
		
			
				|  |  |        grpc_channel_arg_integer_create(
 | 
	
		
			
				|  |  |            const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1),
 | 
	
		
			
				|  |  |    };
 | 
	
		
			
				|  |  | -  return grpc_channel_args_copy_and_add(args_, args_to_add,
 | 
	
		
			
				|  |  | +  return grpc_channel_args_copy_and_add(args_in, args_to_add,
 | 
	
		
			
				|  |  |                                          GPR_ARRAY_SIZE(args_to_add));
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -OrphanablePtr<LoadBalancingPolicy> XdsLb::CreateChildPolicyLocked(
 | 
	
		
			
				|  |  | +OrphanablePtr<LoadBalancingPolicy>
 | 
	
		
			
				|  |  | +XdsLb::LocalityMap::LocalityEntry::CreateChildPolicyLocked(
 | 
	
		
			
				|  |  |      const char* name, const grpc_channel_args* args) {
 | 
	
		
			
				|  |  | -  Helper* helper = New<Helper>(Ref());
 | 
	
		
			
				|  |  | +  Helper* helper = New<Helper>(this->Ref());
 | 
	
		
			
				|  |  |    LoadBalancingPolicy::Args lb_policy_args;
 | 
	
		
			
				|  |  | -  lb_policy_args.combiner = combiner();
 | 
	
		
			
				|  |  | +  lb_policy_args.combiner = parent_->combiner();
 | 
	
		
			
				|  |  |    lb_policy_args.args = args;
 | 
	
		
			
				|  |  |    lb_policy_args.channel_control_helper =
 | 
	
		
			
				|  |  |        UniquePtr<ChannelControlHelper>(helper);
 | 
	
	
		
			
				|  | @@ -1397,22 +1402,27 @@ OrphanablePtr<LoadBalancingPolicy> XdsLb::CreateChildPolicyLocked(
 | 
	
		
			
				|  |  |    // child policy. This will make the child policy progress upon activity on xDS
 | 
	
		
			
				|  |  |    // LB, which in turn is tied to the application's call.
 | 
	
		
			
				|  |  |    grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
 | 
	
		
			
				|  |  | -                                   interested_parties());
 | 
	
		
			
				|  |  | +                                   parent_->interested_parties());
 | 
	
		
			
				|  |  |    return lb_policy;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -void XdsLb::CreateOrUpdateChildPolicyLocked() {
 | 
	
		
			
				|  |  | -  if (shutting_down_) return;
 | 
	
		
			
				|  |  | +void XdsLb::LocalityMap::LocalityEntry::UpdateLocked(
 | 
	
		
			
				|  |  | +    xds_grpclb_serverlist* serverlist,
 | 
	
		
			
				|  |  | +    LoadBalancingPolicy::Config* child_policy_config,
 | 
	
		
			
				|  |  | +    const grpc_channel_args* args_in) {
 | 
	
		
			
				|  |  | +  if (parent_->shutting_down_) return;
 | 
	
		
			
				|  |  |    // This should never be invoked if we do not have serverlist_, as fallback
 | 
	
		
			
				|  |  |    // mode is disabled for xDS plugin.
 | 
	
		
			
				|  |  |    // TODO(juanlishen): Change this as part of implementing fallback mode.
 | 
	
		
			
				|  |  | -  GPR_ASSERT(serverlist_ != nullptr);
 | 
	
		
			
				|  |  | -  GPR_ASSERT(serverlist_->num_servers > 0);
 | 
	
		
			
				|  |  | +  GPR_ASSERT(serverlist != nullptr);
 | 
	
		
			
				|  |  | +  GPR_ASSERT(serverlist->num_servers > 0);
 | 
	
		
			
				|  |  |    // Construct update args.
 | 
	
		
			
				|  |  |    UpdateArgs update_args;
 | 
	
		
			
				|  |  | -  update_args.addresses = ProcessServerlist(serverlist_);
 | 
	
		
			
				|  |  | -  update_args.config = child_policy_config_;
 | 
	
		
			
				|  |  | -  update_args.args = CreateChildPolicyArgsLocked();
 | 
	
		
			
				|  |  | +  update_args.addresses = ProcessServerlist(serverlist);
 | 
	
		
			
				|  |  | +  update_args.config =
 | 
	
		
			
				|  |  | +      child_policy_config == nullptr ? nullptr : child_policy_config->Ref();
 | 
	
		
			
				|  |  | +  update_args.args = CreateChildPolicyArgsLocked(args_in);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    // If the child policy name changes, we need to create a new child
 | 
	
		
			
				|  |  |    // policy.  When this happens, we leave child_policy_ as-is and store
 | 
	
		
			
				|  |  |    // the new child policy in pending_child_policy_.  Once the new child
 | 
	
	
		
			
				|  | @@ -1464,9 +1474,9 @@ void XdsLb::CreateOrUpdateChildPolicyLocked() {
 | 
	
		
			
				|  |  |    //       when the new child transitions into state READY.
 | 
	
		
			
				|  |  |    // TODO(juanlishen): If the child policy is not configured via service config,
 | 
	
		
			
				|  |  |    // use whatever algorithm is specified by the balancer.
 | 
	
		
			
				|  |  | -  const char* child_policy_name = child_policy_config_ == nullptr
 | 
	
		
			
				|  |  | +  const char* child_policy_name = child_policy_config == nullptr
 | 
	
		
			
				|  |  |                                        ? "round_robin"
 | 
	
		
			
				|  |  | -                                      : child_policy_config_->name();
 | 
	
		
			
				|  |  | +                                      : child_policy_config->name();
 | 
	
		
			
				|  |  |    const bool create_policy =
 | 
	
		
			
				|  |  |        // case 1
 | 
	
		
			
				|  |  |        child_policy_ == nullptr ||
 | 
	
	
		
			
				|  | @@ -1512,6 +1522,145 @@ void XdsLb::CreateOrUpdateChildPolicyLocked() {
 | 
	
		
			
				|  |  |    policy_to_update->UpdateLocked(std::move(update_args));
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +void XdsLb::LocalityMap::LocalityEntry::ShutdownLocked() {
 | 
	
		
			
				|  |  | +  // Remove the child policy's interested_parties pollset_set from the
 | 
	
		
			
				|  |  | +  // xDS policy.
 | 
	
		
			
				|  |  | +  grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
 | 
	
		
			
				|  |  | +                                   parent_->interested_parties());
 | 
	
		
			
				|  |  | +  if (pending_child_policy_ != nullptr) {
 | 
	
		
			
				|  |  | +    grpc_pollset_set_del_pollset_set(
 | 
	
		
			
				|  |  | +        pending_child_policy_->interested_parties(),
 | 
	
		
			
				|  |  | +        parent_->interested_parties());
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  {
 | 
	
		
			
				|  |  | +    MutexLock lock(&child_policy_mu_);
 | 
	
		
			
				|  |  | +    child_policy_.reset();
 | 
	
		
			
				|  |  | +    pending_child_policy_.reset();
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +void XdsLb::LocalityMap::LocalityEntry::ResetBackoffLocked() {
 | 
	
		
			
				|  |  | +  child_policy_->ResetBackoffLocked();
 | 
	
		
			
				|  |  | +  if (pending_child_policy_ != nullptr) {
 | 
	
		
			
				|  |  | +    pending_child_policy_->ResetBackoffLocked();
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +void XdsLb::LocalityMap::LocalityEntry::FillChildRefsForChannelz(
 | 
	
		
			
				|  |  | +    channelz::ChildRefsList* child_subchannels,
 | 
	
		
			
				|  |  | +    channelz::ChildRefsList* child_channels) {
 | 
	
		
			
				|  |  | +  MutexLock lock(&child_policy_mu_);
 | 
	
		
			
				|  |  | +  child_policy_->FillChildRefsForChannelz(child_subchannels, child_channels);
 | 
	
		
			
				|  |  | +  if (pending_child_policy_ != nullptr) {
 | 
	
		
			
				|  |  | +    pending_child_policy_->FillChildRefsForChannelz(child_subchannels,
 | 
	
		
			
				|  |  | +                                                    child_channels);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +void XdsLb::LocalityMap::LocalityEntry::Orphan() {
 | 
	
		
			
				|  |  | +  ShutdownLocked();
 | 
	
		
			
				|  |  | +  Unref();
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +//
 | 
	
		
			
				|  |  | +// LocalityEntry::Helper implementation
 | 
	
		
			
				|  |  | +//
 | 
	
		
			
				|  |  | +bool XdsLb::LocalityMap::LocalityEntry::Helper::CalledByPendingChild() const {
 | 
	
		
			
				|  |  | +  GPR_ASSERT(child_ != nullptr);
 | 
	
		
			
				|  |  | +  return child_ == entry_->pending_child_policy_.get();
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +bool XdsLb::LocalityMap::LocalityEntry::Helper::CalledByCurrentChild() const {
 | 
	
		
			
				|  |  | +  GPR_ASSERT(child_ != nullptr);
 | 
	
		
			
				|  |  | +  return child_ == entry_->child_policy_.get();
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +Subchannel* XdsLb::LocalityMap::LocalityEntry::Helper::CreateSubchannel(
 | 
	
		
			
				|  |  | +    const grpc_channel_args& args) {
 | 
	
		
			
				|  |  | +  if (entry_->parent_->shutting_down_ ||
 | 
	
		
			
				|  |  | +      (!CalledByPendingChild() && !CalledByCurrentChild())) {
 | 
	
		
			
				|  |  | +    return nullptr;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  return entry_->parent_->channel_control_helper()->CreateSubchannel(args);
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +grpc_channel* XdsLb::LocalityMap::LocalityEntry::Helper::CreateChannel(
 | 
	
		
			
				|  |  | +    const char* target, const grpc_channel_args& args) {
 | 
	
		
			
				|  |  | +  if (entry_->parent_->shutting_down_ ||
 | 
	
		
			
				|  |  | +      (!CalledByPendingChild() && !CalledByCurrentChild())) {
 | 
	
		
			
				|  |  | +    return nullptr;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  return entry_->parent_->channel_control_helper()->CreateChannel(target, args);
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +void XdsLb::LocalityMap::LocalityEntry::Helper::UpdateState(
 | 
	
		
			
				|  |  | +    grpc_connectivity_state state, grpc_error* state_error,
 | 
	
		
			
				|  |  | +    UniquePtr<SubchannelPicker> picker) {
 | 
	
		
			
				|  |  | +  if (entry_->parent_->shutting_down_) {
 | 
	
		
			
				|  |  | +    GRPC_ERROR_UNREF(state_error);
 | 
	
		
			
				|  |  | +    return;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  // If this request is from the pending child policy, ignore it until
 | 
	
		
			
				|  |  | +  // it reports READY, at which point we swap it into place.
 | 
	
		
			
				|  |  | +  if (CalledByPendingChild()) {
 | 
	
		
			
				|  |  | +    if (grpc_lb_xds_trace.enabled()) {
 | 
	
		
			
				|  |  | +      gpr_log(GPR_INFO,
 | 
	
		
			
				|  |  | +              "[xdslb %p helper %p] pending child policy %p reports state=%s",
 | 
	
		
			
				|  |  | +              entry_->parent_.get(), this, entry_->pending_child_policy_.get(),
 | 
	
		
			
				|  |  | +              grpc_connectivity_state_name(state));
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    if (state != GRPC_CHANNEL_READY) {
 | 
	
		
			
				|  |  | +      GRPC_ERROR_UNREF(state_error);
 | 
	
		
			
				|  |  | +      return;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    grpc_pollset_set_del_pollset_set(
 | 
	
		
			
				|  |  | +        entry_->child_policy_->interested_parties(),
 | 
	
		
			
				|  |  | +        entry_->parent_->interested_parties());
 | 
	
		
			
				|  |  | +    MutexLock lock(&entry_->child_policy_mu_);
 | 
	
		
			
				|  |  | +    entry_->child_policy_ = std::move(entry_->pending_child_policy_);
 | 
	
		
			
				|  |  | +  } else if (!CalledByCurrentChild()) {
 | 
	
		
			
				|  |  | +    // This request is from an outdated child, so ignore it.
 | 
	
		
			
				|  |  | +    GRPC_ERROR_UNREF(state_error);
 | 
	
		
			
				|  |  | +    return;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  // TODO(juanlishen): When in fallback mode, pass the child picker
 | 
	
		
			
				|  |  | +  // through without wrapping it.  (Or maybe use a different helper for
 | 
	
		
			
				|  |  | +  // the fallback policy?)
 | 
	
		
			
				|  |  | +  GPR_ASSERT(entry_->parent_->lb_chand_ != nullptr);
 | 
	
		
			
				|  |  | +  RefCountedPtr<XdsLbClientStats> client_stats =
 | 
	
		
			
				|  |  | +      entry_->parent_->lb_chand_->lb_calld() == nullptr
 | 
	
		
			
				|  |  | +          ? nullptr
 | 
	
		
			
				|  |  | +          : entry_->parent_->lb_chand_->lb_calld()->client_stats();
 | 
	
		
			
				|  |  | +  entry_->parent_->channel_control_helper()->UpdateState(
 | 
	
		
			
				|  |  | +      state, state_error,
 | 
	
		
			
				|  |  | +      UniquePtr<SubchannelPicker>(
 | 
	
		
			
				|  |  | +          New<Picker>(std::move(picker), std::move(client_stats))));
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +void XdsLb::LocalityMap::LocalityEntry::Helper::RequestReresolution() {
 | 
	
		
			
				|  |  | +  if (entry_->parent_->shutting_down_) return;
 | 
	
		
			
				|  |  | +  // If there is a pending child policy, ignore re-resolution requests
 | 
	
		
			
				|  |  | +  // from the current child policy (or any outdated child).
 | 
	
		
			
				|  |  | +  if (entry_->pending_child_policy_ != nullptr && !CalledByPendingChild()) {
 | 
	
		
			
				|  |  | +    return;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  if (grpc_lb_xds_trace.enabled()) {
 | 
	
		
			
				|  |  | +    gpr_log(GPR_INFO,
 | 
	
		
			
				|  |  | +            "[xdslb %p] Re-resolution requested from the internal RR policy "
 | 
	
		
			
				|  |  | +            "(%p).",
 | 
	
		
			
				|  |  | +            entry_->parent_.get(), entry_->child_policy_.get());
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  GPR_ASSERT(entry_->parent_->lb_chand_ != nullptr);
 | 
	
		
			
				|  |  | +  // If we are talking to a balancer, we expect to get updated addresses
 | 
	
		
			
				|  |  | +  // from the balancer, so we can ignore the re-resolution request from
 | 
	
		
			
				|  |  | +  // the child policy. Otherwise, pass the re-resolution request up to the
 | 
	
		
			
				|  |  | +  // channel.
 | 
	
		
			
				|  |  | +  if (entry_->parent_->lb_chand_->lb_calld() == nullptr ||
 | 
	
		
			
				|  |  | +      !entry_->parent_->lb_chand_->lb_calld()->seen_initial_response()) {
 | 
	
		
			
				|  |  | +    entry_->parent_->channel_control_helper()->RequestReresolution();
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  //
 | 
	
		
			
				|  |  |  // factory
 | 
	
		
			
				|  |  |  //
 |