@@ -74,7 +74,6 @@
 #include <grpc/support/time.h>
 
 #include "src/core/ext/filters/client_channel/client_channel.h"
-#include "src/core/ext/filters/client_channel/client_channel_factory.h"
 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h"
 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h"
 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h"
@@ -131,16 +130,6 @@ class GrpcLb : public LoadBalancingPolicy {
 
   void UpdateLocked(const grpc_channel_args& args,
                     grpc_json* lb_config) override;
-  bool PickLocked(PickState* pick, grpc_error** error) override;
-  void CancelPickLocked(PickState* pick, grpc_error* error) override;
-  void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
-                                 uint32_t initial_metadata_flags_eq,
-                                 grpc_error* error) override;
-  void NotifyOnStateChangeLocked(grpc_connectivity_state* state,
-                                 grpc_closure* closure) override;
-  grpc_connectivity_state CheckConnectivityLocked(
-      grpc_error** connectivity_error) override;
-  void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
   void ExitIdleLocked() override;
   void ResetBackoffLocked() override;
   void FillChildRefsForChannelz(
@@ -148,31 +137,6 @@ class GrpcLb : public LoadBalancingPolicy {
       channelz::ChildRefsList* child_channels) override;
 
  private:
-  /// Linked list of pending pick requests. It stores all information needed to
-  /// eventually call (Round Robin's) pick() on them. They mainly stay pending
-  /// waiting for the RR policy to be created.
-  ///
-  /// Note that when a pick is sent to the RR policy, we inject our own
-  /// on_complete callback, so that we can intercept the result before
-  /// invoking the original on_complete callback.  This allows us to set the
-  /// LB token metadata and add client_stats to the call context.
-  /// See \a pending_pick_complete() for details.
-  struct PendingPick {
-    // The grpclb instance that created the wrapping. This instance is not
-    // owned; reference counts are untouched. It's used only for logging
-    // purposes.
-    GrpcLb* grpclb_policy;
-    // The original pick.
-    PickState* pick;
-    // Our on_complete closure and the original one.
-    grpc_closure on_complete;
-    grpc_closure* original_on_complete;
-    // Stats for client-side load reporting.
-    RefCountedPtr<GrpcLbClientStats> client_stats;
-    // Next pending pick.
-    PendingPick* next = nullptr;
-  };
-
   /// Contains a call to the LB server and all the data related to the call.
   class BalancerCallState : public InternallyRefCounted<BalancerCallState> {
    public:
@@ -248,6 +212,80 @@ class GrpcLb : public LoadBalancingPolicy {
     grpc_closure client_load_report_closure_;
   };
 
+  class Serverlist : public RefCounted<Serverlist> {
+   public:
+    // Takes ownership of serverlist.
+    explicit Serverlist(grpc_grpclb_serverlist* serverlist)
+        : serverlist_(serverlist) {}
+
+    ~Serverlist() { grpc_grpclb_destroy_serverlist(serverlist_); }
+
+    bool operator==(const Serverlist& other) const;
+
+    const grpc_grpclb_serverlist* serverlist() const { return serverlist_; }
+
+    // Returns a text representation suitable for logging.
+    UniquePtr<char> AsText() const;
+
+    // Extracts all non-drop entries into a ServerAddressList.
+    ServerAddressList GetServerAddressList() const;
+
+    // Returns true if the serverlist contains at least one drop entry and
+    // no backend address entries.
+    bool ContainsAllDropEntries() const;
+
+    // Returns the LB token to use for a drop, or null if the call
+    // should not be dropped.
+    // Intended to be called from picker, so calls will be externally
+    // synchronized.
+    const char* ShouldDrop();
+
+   private:
+    grpc_grpclb_serverlist* serverlist_;
+    size_t drop_index_ = 0;
+  };
+
+  class Picker : public SubchannelPicker {
+   public:
+    Picker(GrpcLb* parent, RefCountedPtr<Serverlist> serverlist,
+           UniquePtr<SubchannelPicker> child_picker,
+           RefCountedPtr<GrpcLbClientStats> client_stats)
+        : parent_(parent),
+          serverlist_(std::move(serverlist)),
+          child_picker_(std::move(child_picker)),
+          client_stats_(std::move(client_stats)) {}
+
+    PickResult Pick(PickState* pick, grpc_error** error) override;
+
+   private:
+    // Storing the address for logging, but not holding a ref.
+    // DO NOT DEREFERENCE!
+    GrpcLb* parent_;
+
+    // Serverlist to be used for determining drops.
+    RefCountedPtr<Serverlist> serverlist_;
+
+    UniquePtr<SubchannelPicker> child_picker_;
+    RefCountedPtr<GrpcLbClientStats> client_stats_;
+  };
+
+  class Helper : public ChannelControlHelper {
+   public:
+    explicit Helper(RefCountedPtr<GrpcLb> parent)
+        : parent_(std::move(parent)) {}
+
+    Subchannel* CreateSubchannel(const grpc_channel_args& args) override;
+    grpc_channel* CreateChannel(const char* target,
+                                grpc_client_channel_type type,
+                                const grpc_channel_args& args) override;
+    void UpdateState(grpc_connectivity_state state, grpc_error* state_error,
+                     UniquePtr<SubchannelPicker> picker) override;
+    void RequestReresolution() override;
+
+   private:
+    RefCountedPtr<GrpcLb> parent_;
+  };
+
   ~GrpcLb();
 
   void ShutdownLocked() override;
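The three classes added above are the heart of this change: picks no longer queue inside grpclb itself; the policy publishes a Picker that wraps the child (round robin) picker and intercepts each pick. As a standalone illustration of that wrapping/delegation shape, here is a minimal sketch; all types below are simplified stand-ins invented for the example, not the real gRPC classes:

```cpp
#include <iostream>
#include <memory>

// Simplified stand-ins for the LoadBalancingPolicy picker types.
enum class PickResult { kComplete, kQueue, kDrop };

struct Pick {
  const char* lb_token = nullptr;  // filled in by the wrapping picker
};

class SubchannelPicker {
 public:
  virtual ~SubchannelPicker() = default;
  virtual PickResult Pick(Pick* pick) = 0;
};

// Child policy's picker: always succeeds in this sketch.
class ChildPicker : public SubchannelPicker {
 public:
  PickResult Pick(Pick* /*pick*/) override { return PickResult::kComplete; }
};

// Wrapping picker: applies its own logic (drops, LB tokens) first, then
// delegates to the child picker -- the same shape as GrpcLb::Picker.
class WrappingPicker : public SubchannelPicker {
 public:
  explicit WrappingPicker(std::unique_ptr<SubchannelPicker> child)
      : child_(std::move(child)) {}

  PickResult Pick(Pick* pick) override {
    if (ShouldDrop()) return PickResult::kDrop;  // intercept before delegating
    PickResult result = child_->Pick(pick);
    if (result == PickResult::kComplete) {
      pick->lb_token = "token-from-serverlist";  // annotate successful picks
    }
    return result;
  }

 private:
  bool ShouldDrop() { return (++count_ % 3) == 0; }  // drop every 3rd pick
  std::unique_ptr<SubchannelPicker> child_;
  int count_ = 0;
};

int main() {
  WrappingPicker picker(std::make_unique<ChildPicker>());
  for (int i = 0; i < 6; ++i) {
    Pick pick;
    std::cout << (picker.Pick(&pick) == PickResult::kDrop ? "drop" : "pick")
              << "\n";
  }
}
```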
@@ -264,24 +302,10 @@ class GrpcLb : public LoadBalancingPolicy {
   static void OnBalancerChannelConnectivityChangedLocked(void* arg,
                                                          grpc_error* error);
 
-  // Pending pick methods.
-  static void PendingPickSetMetadataAndContext(PendingPick* pp);
-  PendingPick* PendingPickCreate(PickState* pick);
-  void AddPendingPick(PendingPick* pp);
-  static void OnPendingPickComplete(void* arg, grpc_error* error);
-
   // Methods for dealing with the RR policy.
   void CreateOrUpdateRoundRobinPolicyLocked();
   grpc_channel_args* CreateRoundRobinPolicyArgsLocked();
   void CreateRoundRobinPolicyLocked(Args args);
-  bool PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp,
-                                      grpc_error** error);
-  void UpdateConnectivityStateFromRoundRobinPolicyLocked(
-      grpc_error* rr_state_error);
-  static void OnRoundRobinConnectivityChangedLocked(void* arg,
-                                                    grpc_error* error);
-  static void OnRoundRobinRequestReresolutionLocked(void* arg,
-                                                    grpc_error* error);
 
   // Who the client is trying to communicate with.
   const char* server_name_ = nullptr;
@@ -292,7 +316,6 @@ class GrpcLb : public LoadBalancingPolicy {
   // Internal state.
   bool started_picking_ = false;
   bool shutting_down_ = false;
-  grpc_connectivity_state_tracker state_tracker_;
 
   // The channel for communicating with the LB server.
   grpc_channel* lb_channel_ = nullptr;
@@ -321,11 +344,7 @@ class GrpcLb : public LoadBalancingPolicy {
 
   // The deserialized response from the balancer. May be nullptr until one
   // such response has arrived.
-  grpc_grpclb_serverlist* serverlist_ = nullptr;
-  // Index into serverlist for next pick.
-  // If the server at this index is a drop, we return a drop.
-  // Otherwise, we delegate to the RR policy.
-  size_t serverlist_index_ = 0;
+  RefCountedPtr<Serverlist> serverlist_;
 
   // Timeout in milliseconds before using fallback backend addresses.
   // 0 means not using fallback.
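The hunk above replaces the raw `grpc_grpclb_serverlist*` plus pick index with a ref-counted wrapper, so a picker can keep the serverlist it was built against alive even after the balancer sends a newer one. A rough analogy using `std::shared_ptr` in place of gRPC's `RefCountedPtr`; the types here are hypothetical stand-ins, not the real implementation:

```cpp
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-in: the policy swaps in new serverlists while pickers
// keep their own shared references to the one they were built with.
struct Serverlist {
  std::vector<std::string> servers;
};

struct Picker {
  std::shared_ptr<const Serverlist> serverlist;  // keeps its list alive
};

int main() {
  auto policy_list = std::make_shared<const Serverlist>(
      Serverlist{{"10.0.0.1:443", "10.0.0.2:443"}});
  Picker picker{policy_list};  // picker shares ownership
  policy_list = std::make_shared<const Serverlist>(  // balancer update
      Serverlist{{"10.0.0.3:443"}});
  // The picker still reads a valid (old) serverlist until it is replaced.
  return picker.serverlist->servers.size() == 2 ? 0 : 1;
}
```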
@@ -337,20 +356,65 @@ class GrpcLb : public LoadBalancingPolicy {
   grpc_timer lb_fallback_timer_;
   grpc_closure lb_on_fallback_;
 
-  // Pending picks that are waiting on the RR policy's connectivity.
-  PendingPick* pending_picks_ = nullptr;
-
   // The RR policy to use for the backends.
   OrphanablePtr<LoadBalancingPolicy> rr_policy_;
-  grpc_connectivity_state rr_connectivity_state_;
-  grpc_closure on_rr_connectivity_changed_;
-  grpc_closure on_rr_request_reresolution_;
 };
 
 //
-// serverlist parsing code
+// GrpcLb::Serverlist
 //
 
+bool GrpcLb::Serverlist::operator==(const Serverlist& other) const {
+  return grpc_grpclb_serverlist_equals(serverlist_, other.serverlist_);
+}
+
+void ParseServer(const grpc_grpclb_server* server,
+                 grpc_resolved_address* addr) {
+  memset(addr, 0, sizeof(*addr));
+  if (server->drop) return;
+  const uint16_t netorder_port = grpc_htons((uint16_t)server->port);
+  /* the addresses are given in binary format (a in(6)_addr struct) in
+   * server->ip_address.bytes. */
+  const grpc_grpclb_ip_address* ip = &server->ip_address;
+  if (ip->size == 4) {
+    addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in));
+    grpc_sockaddr_in* addr4 = reinterpret_cast<grpc_sockaddr_in*>(&addr->addr);
+    addr4->sin_family = GRPC_AF_INET;
+    memcpy(&addr4->sin_addr, ip->bytes, ip->size);
+    addr4->sin_port = netorder_port;
+  } else if (ip->size == 16) {
+    addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in6));
+    grpc_sockaddr_in6* addr6 = (grpc_sockaddr_in6*)&addr->addr;
+    addr6->sin6_family = GRPC_AF_INET6;
+    memcpy(&addr6->sin6_addr, ip->bytes, ip->size);
+    addr6->sin6_port = netorder_port;
+  }
+}
+
+UniquePtr<char> GrpcLb::Serverlist::AsText() const {
+  gpr_strvec entries;
+  gpr_strvec_init(&entries);
+  for (size_t i = 0; i < serverlist_->num_servers; ++i) {
+    const auto* server = serverlist_->servers[i];
+    char* ipport;
+    if (server->drop) {
+      ipport = gpr_strdup("(drop)");
+    } else {
+      grpc_resolved_address addr;
+      ParseServer(server, &addr);
+      grpc_sockaddr_to_string(&ipport, &addr, false);
+    }
+    char* entry;
+    gpr_asprintf(&entry, "  %" PRIuPTR ": %s token=%s\n", i, ipport,
+                 server->load_balance_token);
+    gpr_free(ipport);
+    gpr_strvec_add(&entries, entry);
+  }
+  UniquePtr<char> result(gpr_strvec_flatten(&entries, nullptr));
+  gpr_strvec_destroy(&entries);
+  return result;
+}
+
 // vtable for LB token channel arg.
 void* lb_token_copy(void* token) {
   return token == nullptr
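`ParseServer()` above branches on the raw address length the balancer sends: 4 bytes for IPv4, 16 for IPv6, with the port already converted to network order. The same branching in a self-contained POSIX sketch, with hypothetical input values; this uses standard `sockaddr` types rather than gRPC's portability wrappers:

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <cstdint>
#include <cstdio>
#include <cstring>

// Fill a sockaddr_storage from a raw 4-byte (IPv4) or 16-byte (IPv6)
// address blob plus a host-order port -- the same branching as ParseServer.
bool FillSockaddr(const unsigned char* bytes, size_t len, uint16_t port,
                  sockaddr_storage* out) {
  memset(out, 0, sizeof(*out));
  if (len == 4) {
    auto* a4 = reinterpret_cast<sockaddr_in*>(out);
    a4->sin_family = AF_INET;
    memcpy(&a4->sin_addr, bytes, len);
    a4->sin_port = htons(port);
    return true;
  }
  if (len == 16) {
    auto* a6 = reinterpret_cast<sockaddr_in6*>(out);
    a6->sin6_family = AF_INET6;
    memcpy(&a6->sin6_addr, bytes, len);
    a6->sin6_port = htons(port);
    return true;
  }
  return false;  // unsupported length; ParseServer silently skips these
}

int main() {
  const unsigned char ipv4[4] = {10, 0, 0, 1};  // hypothetical 10.0.0.1
  sockaddr_storage ss;
  if (FillSockaddr(ipv4, sizeof(ipv4), 443, &ss)) {
    char buf[INET_ADDRSTRLEN];
    inet_ntop(AF_INET, &reinterpret_cast<sockaddr_in*>(&ss)->sin_addr, buf,
              sizeof(buf));
    printf("%s:%u\n", buf, 443);
  }
}
```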
@@ -393,35 +457,12 @@ bool IsServerValid(const grpc_grpclb_server* server, size_t idx, bool log) {
   return true;
 }
 
-void ParseServer(const grpc_grpclb_server* server,
-                 grpc_resolved_address* addr) {
-  memset(addr, 0, sizeof(*addr));
-  if (server->drop) return;
-  const uint16_t netorder_port = grpc_htons((uint16_t)server->port);
-  /* the addresses are given in binary format (a in(6)_addr struct) in
-   * server->ip_address.bytes. */
-  const grpc_grpclb_ip_address* ip = &server->ip_address;
-  if (ip->size == 4) {
-    addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in));
-    grpc_sockaddr_in* addr4 = reinterpret_cast<grpc_sockaddr_in*>(&addr->addr);
-    addr4->sin_family = GRPC_AF_INET;
-    memcpy(&addr4->sin_addr, ip->bytes, ip->size);
-    addr4->sin_port = netorder_port;
-  } else if (ip->size == 16) {
-    addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in6));
-    grpc_sockaddr_in6* addr6 = (grpc_sockaddr_in6*)&addr->addr;
-    addr6->sin6_family = GRPC_AF_INET6;
-    memcpy(&addr6->sin6_addr, ip->bytes, ip->size);
-    addr6->sin6_port = netorder_port;
-  }
-}
-
-// Returns addresses extracted from \a serverlist.
-ServerAddressList ProcessServerlist(const grpc_grpclb_serverlist* serverlist) {
+// Returns addresses extracted from the serverlist.
+ServerAddressList GrpcLb::Serverlist::GetServerAddressList() const {
   ServerAddressList addresses;
-  for (size_t i = 0; i < serverlist->num_servers; ++i) {
-    const grpc_grpclb_server* server = serverlist->servers[i];
-    if (!IsServerValid(serverlist->servers[i], i, false)) continue;
+  for (size_t i = 0; i < serverlist_->num_servers; ++i) {
+    const grpc_grpclb_server* server = serverlist_->servers[i];
+    if (!IsServerValid(serverlist_->servers[i], i, false)) continue;
     // Address processing.
     grpc_resolved_address addr;
     ParseServer(server, &addr);
@@ -456,6 +497,176 @@ ServerAddressList ProcessServerlist(const grpc_grpclb_serverlist* serverlist) {
   return addresses;
 }
 
+bool GrpcLb::Serverlist::ContainsAllDropEntries() const {
+  if (serverlist_->num_servers == 0) return false;
+  for (size_t i = 0; i < serverlist_->num_servers; ++i) {
+    if (!serverlist_->servers[i]->drop) return false;
+  }
+  return true;
+}
+
+const char* GrpcLb::Serverlist::ShouldDrop() {
+  if (serverlist_->num_servers == 0) return nullptr;
+  grpc_grpclb_server* server = serverlist_->servers[drop_index_];
+  drop_index_ = (drop_index_ + 1) % serverlist_->num_servers;
+  return server->drop ? server->load_balance_token : nullptr;
+}
+
+//
+// GrpcLb::Picker
+//
+
+// Adds lb_token of selected subchannel (address) to the call's initial
+// metadata.
+grpc_error* AddLbTokenToInitialMetadata(
+    grpc_mdelem lb_token, grpc_linked_mdelem* lb_token_mdelem_storage,
+    grpc_metadata_batch* initial_metadata) {
+  GPR_ASSERT(lb_token_mdelem_storage != nullptr);
+  GPR_ASSERT(!GRPC_MDISNULL(lb_token));
+  return grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage,
+                                      lb_token);
+}
+
+// Destroy function used when embedding client stats in call context.
+void DestroyClientStats(void* arg) {
+  static_cast<GrpcLbClientStats*>(arg)->Unref();
+}
+
+GrpcLb::Picker::PickResult GrpcLb::Picker::Pick(PickState* pick,
+                                                grpc_error** error) {
+  // Check if we should drop the call.
+  const char* drop_token = serverlist_->ShouldDrop();
+  if (drop_token != nullptr) {
+    // Update client load reporting stats to indicate the number of
+    // dropped calls.  Note that we have to do this here instead of in
+    // the client_load_reporting filter, because we do not create a
+    // subchannel call (and therefore no client_load_reporting filter)
+    // for dropped calls.
+    if (client_stats_ != nullptr) {
+      client_stats_->AddCallDroppedLocked(drop_token);
+    }
+    return PICK_COMPLETE;
+  }
+  // Forward pick to child policy.
+  PickResult result = child_picker_->Pick(pick, error);
+  // If pick succeeded, add LB token to initial metadata.
+  if (result == PickResult::PICK_COMPLETE &&
+      pick->connected_subchannel != nullptr) {
+    const grpc_arg* arg = grpc_channel_args_find(
+        pick->connected_subchannel->args(), GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN);
+    if (arg == nullptr) {
+      gpr_log(GPR_ERROR,
+              "[grpclb %p picker %p] No LB token for connected subchannel "
+              "pick %p",
+              parent_, this, pick);
+      abort();
+    }
+    grpc_mdelem lb_token = {reinterpret_cast<uintptr_t>(arg->value.pointer.p)};
+    AddLbTokenToInitialMetadata(GRPC_MDELEM_REF(lb_token),
+                                &pick->lb_token_mdelem_storage,
+                                pick->initial_metadata);
+    // Pass on client stats via context. Passes ownership of the reference.
+    if (client_stats_ != nullptr) {
+      pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value =
+          client_stats_->Ref().release();
+      pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].destroy =
+          DestroyClientStats;
+    }
+  }
+  return result;
+}
+
+//
+// GrpcLb::Helper
+//
+
+Subchannel* GrpcLb::Helper::CreateSubchannel(const grpc_channel_args& args) {
+  if (parent_->shutting_down_) return nullptr;
+  return parent_->channel_control_helper()->CreateSubchannel(args);
+}
+
+grpc_channel* GrpcLb::Helper::CreateChannel(const char* target,
+                                            grpc_client_channel_type type,
+                                            const grpc_channel_args& args) {
+  if (parent_->shutting_down_) return nullptr;
+  return parent_->channel_control_helper()->CreateChannel(target, type, args);
+}
+
+void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
+                                 grpc_error* state_error,
+                                 UniquePtr<SubchannelPicker> picker) {
+  if (parent_->shutting_down_) {
+    GRPC_ERROR_UNREF(state_error);
+    return;
+  }
+  // There are three cases to consider here:
+  // 1. We're in fallback mode.  In this case, we're always going to use
+  //    RR's result, so we pass its picker through as-is.
+  // 2. The serverlist contains only drop entries.  In this case, we
+  //    want to use our own picker so that we can return the drops.
+  // 3. Not in fallback mode and serverlist is not all drops (i.e., it
+  //    may be empty or contain at least one backend address).  There are
+  //    two sub-cases:
+  //    a. RR is reporting state READY.  In this case, we wrap RR's
+  //       picker in our own, so that we can handle drops and LB token
+  //       metadata for each pick.
+  //    b. RR is reporting a state other than READY.  In this case, we
+  //       don't want to use our own picker, because we don't want to
+  //       process drops for picks that yield a QUEUE result; this would
+  //       result in dropping too many calls, since we will see the
+  //       queued picks multiple times, and we'd consider each one a
+  //       separate call for the drop calculation.
+  //
+  // Cases 1 and 3b: return picker from RR as-is.
+  if (parent_->serverlist_ == nullptr ||
+      (!parent_->serverlist_->ContainsAllDropEntries() &&
+       state != GRPC_CHANNEL_READY)) {
+    if (grpc_lb_glb_trace.enabled()) {
+      gpr_log(GPR_INFO,
+              "[grpclb %p helper %p] state=%s passing RR picker %p as-is",
+              parent_.get(), this, grpc_connectivity_state_name(state),
+              picker.get());
+    }
+    parent_->channel_control_helper()->UpdateState(state, state_error,
+                                                   std::move(picker));
+    return;
+  }
+  // Cases 2 and 3a: wrap picker from RR in our own picker.
+  if (grpc_lb_glb_trace.enabled()) {
+    gpr_log(GPR_INFO, "[grpclb %p helper %p] state=%s wrapping RR picker %p",
+            parent_.get(), this, grpc_connectivity_state_name(state),
+            picker.get());
+  }
+  RefCountedPtr<GrpcLbClientStats> client_stats;
+  if (parent_->lb_calld_ != nullptr &&
+      parent_->lb_calld_->client_stats() != nullptr) {
+    client_stats = parent_->lb_calld_->client_stats()->Ref();
+  }
+  parent_->channel_control_helper()->UpdateState(
+      state, state_error,
+      UniquePtr<SubchannelPicker>(
+          New<Picker>(parent_.get(), parent_->serverlist_, std::move(picker),
+                      std::move(client_stats))));
+}
+
+void GrpcLb::Helper::RequestReresolution() {
+  if (parent_->shutting_down_) return;
+  if (grpc_lb_glb_trace.enabled()) {
+    gpr_log(GPR_INFO,
+            "[grpclb %p] Re-resolution requested from the internal RR policy "
+            "(%p).",
+            parent_.get(), parent_->rr_policy_.get());
+  }
+  // If we are talking to a balancer, we expect to get updated addresses
+  // from the balancer, so we can ignore the re-resolution request from
+  // the RR policy. Otherwise, pass the re-resolution request up to the
+  // channel.
+  if (parent_->lb_calld_ == nullptr ||
+      !parent_->lb_calld_->seen_initial_response()) {
+    parent_->channel_control_helper()->RequestReresolution();
+  }
+}
+
 //
 // GrpcLb::BalancerCallState
 //
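`Serverlist::ShouldDrop()` above advances `drop_index_` modulo the list length on every pick, so the steady-state drop rate equals the fraction of drop entries the balancer put in the list, with no randomness needed. A minimal self-contained model of that logic:

```cpp
#include <cstdio>
#include <vector>

// Minimal model of Serverlist::ShouldDrop(): cycle through the list and
// drop exactly when the current entry is a drop entry.
struct DropPolicy {
  std::vector<bool> is_drop;  // true = drop entry from the balancer
  size_t index = 0;

  bool ShouldDrop() {
    if (is_drop.empty()) return false;
    bool drop = is_drop[index];
    index = (index + 1) % is_drop.size();
    return drop;
  }
};

int main() {
  // One drop entry out of four => 25% of calls dropped, deterministically.
  DropPolicy policy{{false, true, false, false}};
  int drops = 0;
  for (int i = 0; i < 1000; ++i) drops += policy.ShouldDrop();
  printf("dropped %d of 1000 calls\n", drops);  // prints 250
}
```

This is also why `Helper::UpdateState()` above only installs the wrapping picker in cases 2 and 3a: running this counter against picks that will merely be queued and retried would count the same call several times and overshoot the intended drop rate.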
@@ -754,27 +965,20 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
                   response_slice)) != nullptr) {
     // Have seen initial response, look for serverlist.
     GPR_ASSERT(lb_calld->lb_call_ != nullptr);
+    auto serverlist_wrapper = MakeRefCounted<Serverlist>(serverlist);
     if (grpc_lb_glb_trace.enabled()) {
+      UniquePtr<char> serverlist_text = serverlist_wrapper->AsText();
       gpr_log(GPR_INFO,
               "[grpclb %p] lb_calld=%p: Serverlist with %" PRIuPTR
-              " servers received",
-              grpclb_policy, lb_calld, serverlist->num_servers);
-      for (size_t i = 0; i < serverlist->num_servers; ++i) {
-        grpc_resolved_address addr;
-        ParseServer(serverlist->servers[i], &addr);
-        char* ipport;
-        grpc_sockaddr_to_string(&ipport, &addr, false);
-        gpr_log(GPR_INFO,
-                "[grpclb %p] lb_calld=%p: Serverlist[%" PRIuPTR "]: %s",
-                grpclb_policy, lb_calld, i, ipport);
-        gpr_free(ipport);
-      }
+              " servers received:\n%s",
+              grpclb_policy, lb_calld, serverlist->num_servers,
+              serverlist_text.get());
     }
     // Start sending client load report only after we start using the
     // serverlist returned from the current LB call.
     if (lb_calld->client_stats_report_interval_ > 0 &&
         lb_calld->client_stats_ == nullptr) {
-      lb_calld->client_stats_.reset(New<GrpcLbClientStats>());
+      lb_calld->client_stats_ = MakeRefCounted<GrpcLbClientStats>();
       // TODO(roth): We currently track this ref manually.  Once the
       // ClosureRef API is ready, we should pass the RefCountedPtr<> along
       // with the callback.
@@ -783,19 +987,16 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
       lb_calld->ScheduleNextClientLoadReportLocked();
     }
     // Check if the serverlist differs from the previous one.
-    if (grpc_grpclb_serverlist_equals(grpclb_policy->serverlist_, serverlist)) {
+    if (grpclb_policy->serverlist_ != nullptr &&
+        *grpclb_policy->serverlist_ == *serverlist_wrapper) {
       if (grpc_lb_glb_trace.enabled()) {
         gpr_log(GPR_INFO,
                 "[grpclb %p] lb_calld=%p: Incoming server list identical to "
                 "current, ignoring.",
                 grpclb_policy, lb_calld);
       }
-      grpc_grpclb_destroy_serverlist(serverlist);
     } else {  // New serverlist.
-      if (grpclb_policy->serverlist_ != nullptr) {
-        // Dispose of the old serverlist.
-        grpc_grpclb_destroy_serverlist(grpclb_policy->serverlist_);
-      } else {
+      if (grpclb_policy->serverlist_ == nullptr) {
         // Dispose of the fallback.
         grpclb_policy->fallback_backend_addresses_.reset();
         if (grpclb_policy->fallback_timer_callback_pending_) {
@@ -805,8 +1006,7 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
       // Update the serverlist in the GrpcLb instance. This serverlist
       // instance will be destroyed either upon the next update or when the
      // GrpcLb instance is destroyed.
-      grpclb_policy->serverlist_ = serverlist;
-      grpclb_policy->serverlist_index_ = 0;
+      grpclb_policy->serverlist_ = std::move(serverlist_wrapper);
       grpclb_policy->CreateOrUpdateRoundRobinPolicyLocked();
     }
   } else {
@@ -853,13 +1053,13 @@ void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
             lb_calld->lb_call_, grpc_error_string(error));
     gpr_free(status_details);
   }
-  grpclb_policy->TryReresolutionLocked(&grpc_lb_glb_trace, GRPC_ERROR_NONE);
   // If this lb_calld is still in use, this call ended because of a failure so
   // we want to retry connecting. Otherwise, we have deliberately ended this
   // call and no further action is required.
   if (lb_calld == grpclb_policy->lb_calld_.get()) {
     grpclb_policy->lb_calld_.reset();
     GPR_ASSERT(!grpclb_policy->shutting_down_);
+    grpclb_policy->channel_control_helper()->RequestReresolution();
     if (lb_calld->seen_initial_response_) {
       // If we lose connection to the LB server, reset the backoff and restart
       // the LB call immediately.
@@ -917,6 +1117,9 @@ grpc_channel_args* BuildBalancerChannelArgs(
       // LB policy name, since we want to use the default (pick_first) in
       // the LB channel.
       GRPC_ARG_LB_POLICY_NAME,
+      // Strip out the service config, since we don't want the LB policy
+      // config specified for the parent channel to affect the LB channel.
+      GRPC_ARG_SERVICE_CONFIG,
       // The channel arg for the server URI, since that will be different for
       // the LB channel than for the parent channel.  The client channel
       // factory will re-add this arg with the right value.
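The deny-list above (GRPC_ARG_LB_POLICY_NAME, the newly added GRPC_ARG_SERVICE_CONFIG, GRPC_ARG_SERVER_URI, ...) is applied when BuildBalancerChannelArgs copies the parent channel's args for the LB channel. A self-contained model of that copy-drop-add flow; the real code uses gRPC's channel-args helpers rather than a `std::map`, and only the string keys below are the actual public arg names:

```cpp
#include <cstdio>
#include <map>
#include <set>
#include <string>

// Model of BuildBalancerChannelArgs: copy the parent channel's args,
// drop a deny-list, then layer LB-channel-specific args on top.
using Args = std::map<std::string, std::string>;

Args BuildLbChannelArgs(const Args& parent) {
  static const std::set<std::string> kRemove = {
      "grpc.lb_policy_name",  // want the default (pick_first) in LB channel
      "grpc.service_config",  // parent's config must not leak (new here)
      "grpc.server_uri",      // differs for the LB channel
  };
  Args result;
  for (const auto& kv : parent) {
    if (kRemove.count(kv.first) == 0) result.insert(kv);
  }
  result["grpc.server_uri"] = "fake:///lb";  // illustrative replacement
  return result;
}

int main() {
  Args parent = {{"grpc.service_config", "{...}"},
                 {"grpc.keepalive_time_ms", "30000"}};
  for (const auto& kv : BuildLbChannelArgs(parent))
    printf("%s=%s\n", kv.first.c_str(), kv.second.c_str());
}
```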
@@ -928,7 +1131,7 @@ grpc_channel_args* BuildBalancerChannelArgs(
       // resolver will have is_balancer=false, whereas our own addresses have
       // is_balancer=true.  We need the LB channel to return addresses with
       // is_balancer=false so that it does not wind up recursively using the
-      // grpclb LB policy, as per the special case logic in client_channel.c.
+      // grpclb LB policy.
       GRPC_ARG_SERVER_ADDRESS_LIST,
       // The fake resolver response generator, because we are replacing it
       // with the one from the grpclb policy, used to propagate updates to
@@ -988,13 +1191,6 @@ GrpcLb::GrpcLb(LoadBalancingPolicy::Args args)
   GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_,
                     &GrpcLb::OnBalancerChannelConnectivityChangedLocked, this,
                     grpc_combiner_scheduler(args.combiner));
-  GRPC_CLOSURE_INIT(&on_rr_connectivity_changed_,
-                    &GrpcLb::OnRoundRobinConnectivityChangedLocked, this,
-                    grpc_combiner_scheduler(args.combiner));
-  GRPC_CLOSURE_INIT(&on_rr_request_reresolution_,
-                    &GrpcLb::OnRoundRobinRequestReresolutionLocked, this,
-                    grpc_combiner_scheduler(args.combiner));
-  grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE, "grpclb");
   // Record server name.
   const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI);
   const char* server_uri = grpc_channel_arg_get_string(arg);
@@ -1017,20 +1213,18 @@ GrpcLb::GrpcLb(LoadBalancingPolicy::Args args)
       arg, {GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX});
   // Process channel args.
   ProcessChannelArgsLocked(*args.args);
+  // Initialize channel with a picker that will start us connecting.
+  channel_control_helper()->UpdateState(
+      GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE,
+      UniquePtr<SubchannelPicker>(New<QueuePicker>(Ref())));
 }
 
 GrpcLb::~GrpcLb() {
-  GPR_ASSERT(pending_picks_ == nullptr);
   gpr_free((void*)server_name_);
   grpc_channel_args_destroy(args_);
-  grpc_connectivity_state_destroy(&state_tracker_);
-  if (serverlist_ != nullptr) {
-    grpc_grpclb_destroy_serverlist(serverlist_);
-  }
 }
 
 void GrpcLb::ShutdownLocked() {
-  grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
   shutting_down_ = true;
   lb_calld_.reset();
   if (retry_timer_callback_pending_) {
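The constructor now seeds the channel with a QueuePicker in IDLE state, so picks arriving before the policy has a serverlist are parked and retried rather than queued inside grpclb. A conceptual, self-contained model of that hand-off; this is only the idea behind a queueing picker, not gRPC's actual QueuePicker class:

```cpp
#include <cstdio>

// Until the policy has a real picker, every pick returns a queue result so
// the channel retries it after the next state update.
enum class PickResult { kComplete, kQueue };

struct Picker {
  virtual ~Picker() = default;
  virtual PickResult Pick() = 0;
};

struct QueuePicker : Picker {
  PickResult Pick() override { return PickResult::kQueue; }  // try later
};

struct ReadyPicker : Picker {
  PickResult Pick() override { return PickResult::kComplete; }
};

int main() {
  QueuePicker idle;
  ReadyPicker ready;
  Picker* current = &idle;  // state seeded in the GrpcLb constructor
  printf("%d\n", (int)current->Pick());  // 1 -> queued
  current = &ready;  // later: Helper::UpdateState installs the RR picker
  printf("%d\n", (int)current->Pick());  // 0 -> complete
}
```

With the channel holding queued picks itself, the destructor and ShutdownLocked() below no longer need the pending-pick bookkeeping or the policy-level connectivity tracker.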
@@ -1040,7 +1234,6 @@ void GrpcLb::ShutdownLocked() {
     grpc_timer_cancel(&lb_fallback_timer_);
   }
   rr_policy_.reset();
-  TryReresolutionLocked(&grpc_lb_glb_trace, GRPC_ERROR_CANCELLED);
   // We destroy the LB channel here instead of in our destructor because
   // destroying the channel triggers a last callback to
   // OnBalancerChannelConnectivityChangedLocked(), and we need to be
@@ -1050,109 +1243,12 @@ void GrpcLb::ShutdownLocked() {
     lb_channel_ = nullptr;
     gpr_atm_no_barrier_store(&lb_channel_uuid_, 0);
   }
-  grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN,
-                              GRPC_ERROR_REF(error), "grpclb_shutdown");
-  // Clear pending picks.
-  PendingPick* pp;
-  while ((pp = pending_picks_) != nullptr) {
-    pending_picks_ = pp->next;
-    pp->pick->connected_subchannel.reset();
-    // Note: pp is deleted in this callback.
-    GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_REF(error));
-  }
-  GRPC_ERROR_UNREF(error);
 }
 
 //
 // public methods
 //
 
-void GrpcLb::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
-  PendingPick* pp;
-  while ((pp = pending_picks_) != nullptr) {
-    pending_picks_ = pp->next;
-    pp->pick->on_complete = pp->original_on_complete;
-    grpc_error* error = GRPC_ERROR_NONE;
-    if (new_policy->PickLocked(pp->pick, &error)) {
-      // Synchronous return; schedule closure.
-      GRPC_CLOSURE_SCHED(pp->pick->on_complete, error);
-    }
-    Delete(pp);
-  }
-}
-
-// Cancel a specific pending pick.
-//
-// A grpclb pick progresses as follows:
-// - If there's a Round Robin policy (rr_policy_) available, it'll be
-//   handed over to the RR policy (in CreateRoundRobinPolicyLocked()). From
-//   that point onwards, it'll be RR's responsibility. For cancellations, that
-//   implies the pick needs also be cancelled by the RR instance.
-// - Otherwise, without an RR instance, picks stay pending at this policy's
-//   level (grpclb), inside the pending_picks_ list. To cancel these,
-//   we invoke the completion closure and set the pick's connected
-//   subchannel to nullptr right here.
-void GrpcLb::CancelPickLocked(PickState* pick, grpc_error* error) {
-  PendingPick* pp = pending_picks_;
-  pending_picks_ = nullptr;
-  while (pp != nullptr) {
-    PendingPick* next = pp->next;
-    if (pp->pick == pick) {
-      pick->connected_subchannel.reset();
-      // Note: pp is deleted in this callback.
-      GRPC_CLOSURE_SCHED(&pp->on_complete,
-                         GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
-                             "Pick Cancelled", &error, 1));
-    } else {
-      pp->next = pending_picks_;
-      pending_picks_ = pp;
-    }
-    pp = next;
-  }
-  if (rr_policy_ != nullptr) {
-    rr_policy_->CancelPickLocked(pick, GRPC_ERROR_REF(error));
-  }
-  GRPC_ERROR_UNREF(error);
-}
-
-// Cancel all pending picks.
-//
-// A grpclb pick progresses as follows:
-// - If there's a Round Robin policy (rr_policy_) available, it'll be
-//   handed over to the RR policy (in CreateRoundRobinPolicyLocked()). From
-//   that point onwards, it'll be RR's responsibility. For cancellations, that
-//   implies the pick needs also be cancelled by the RR instance.
-// - Otherwise, without an RR instance, picks stay pending at this policy's
-//   level (grpclb), inside the pending_picks_ list. To cancel these,
-//   we invoke the completion closure and set the pick's connected
-//   subchannel to nullptr right here.
-void GrpcLb::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
-                                       uint32_t initial_metadata_flags_eq,
-                                       grpc_error* error) {
-  PendingPick* pp = pending_picks_;
-  pending_picks_ = nullptr;
-  while (pp != nullptr) {
-    PendingPick* next = pp->next;
-    if ((*pp->pick->initial_metadata_flags & initial_metadata_flags_mask) ==
-        initial_metadata_flags_eq) {
-      // Note: pp is deleted in this callback.
-      GRPC_CLOSURE_SCHED(&pp->on_complete,
-                         GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
-                             "Pick Cancelled", &error, 1));
-    } else {
-      pp->next = pending_picks_;
-      pending_picks_ = pp;
-    }
-    pp = next;
-  }
-  if (rr_policy_ != nullptr) {
-    rr_policy_->CancelMatchingPicksLocked(initial_metadata_flags_mask,
-                                          initial_metadata_flags_eq,
-                                          GRPC_ERROR_REF(error));
-  }
-  GRPC_ERROR_UNREF(error);
-}
-
 void GrpcLb::ExitIdleLocked() {
   if (!started_picking_) {
     StartPickingLocked();
@@ -1168,37 +1264,6 @@ void GrpcLb::ResetBackoffLocked() {
   }
 }
 
-bool GrpcLb::PickLocked(PickState* pick, grpc_error** error) {
-  PendingPick* pp = PendingPickCreate(pick);
-  bool pick_done = false;
-  if (rr_policy_ != nullptr) {
-    if (grpc_lb_glb_trace.enabled()) {
-      gpr_log(GPR_INFO, "[grpclb %p] about to PICK from RR %p", this,
-              rr_policy_.get());
-    }
-    pick_done =
-        PickFromRoundRobinPolicyLocked(false /* force_async */, pp, error);
-  } else {  // rr_policy_ == NULL
-    if (pick->on_complete == nullptr) {
-      *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
-          "No pick result available but synchronous result required.");
-      pick_done = true;
-    } else {
-      if (grpc_lb_glb_trace.enabled()) {
-        gpr_log(GPR_INFO,
-                "[grpclb %p] No RR policy. Adding to grpclb's pending picks",
-                this);
-      }
-      AddPendingPick(pp);
-      if (!started_picking_) {
-        StartPickingLocked();
-      }
-      pick_done = false;
-    }
-  }
-  return pick_done;
-}
-
 void GrpcLb::FillChildRefsForChannelz(
     channelz::ChildRefsList* child_subchannels,
     channelz::ChildRefsList* child_channels) {
@@ -1212,17 +1277,6 @@ void GrpcLb::FillChildRefsForChannelz(
   }
 }
 
-grpc_connectivity_state GrpcLb::CheckConnectivityLocked(
-    grpc_error** connectivity_error) {
-  return grpc_connectivity_state_get(&state_tracker_, connectivity_error);
-}
-
-void GrpcLb::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
-                                       grpc_closure* notify) {
-  grpc_connectivity_state_notify_on_state_change(&state_tracker_, current,
-                                                 notify);
-}
-
 // Returns the backend addresses extracted from the given addresses.
 UniquePtr<ServerAddressList> ExtractBackendAddresses(
     const ServerAddressList& addresses) {
@@ -1268,9 +1322,8 @@ void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
   if (lb_channel_ == nullptr) {
     char* uri_str;
     gpr_asprintf(&uri_str, "fake:///%s", server_name_);
-    lb_channel_ = grpc_client_channel_factory_create_channel(
-        client_channel_factory(), uri_str,
-        GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, lb_channel_args);
+    lb_channel_ = channel_control_helper()->CreateChannel(
+        uri_str, GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, *lb_channel_args);
     GPR_ASSERT(lb_channel_ != nullptr);
     grpc_core::channelz::ChannelNode* channel_node =
         grpc_channel_get_channelz_node(lb_channel_);
				|  | @@ -1451,143 +1504,10 @@ void GrpcLb::OnBalancerChannelConnectivityChangedLocked(void* arg,
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -//
 | 
	
		
			
				|  |  | -// PendingPick
 | 
	
		
			
				|  |  | -//
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -// Adds lb_token of selected subchannel (address) to the call's initial
 | 
	
		
			
				|  |  | -// metadata.
 | 
	
		
			
				|  |  | -grpc_error* AddLbTokenToInitialMetadata(
 | 
	
		
			
				|  |  | -    grpc_mdelem lb_token, grpc_linked_mdelem* lb_token_mdelem_storage,
 | 
	
		
			
				|  |  | -    grpc_metadata_batch* initial_metadata) {
 | 
	
		
			
				|  |  | -  GPR_ASSERT(lb_token_mdelem_storage != nullptr);
 | 
	
		
			
				|  |  | -  GPR_ASSERT(!GRPC_MDISNULL(lb_token));
 | 
	
		
			
				|  |  | -  return grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage,
 | 
	
		
			
				|  |  | -                                      lb_token);
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -// Destroy function used when embedding client stats in call context.
 | 
	
		
			
				|  |  | -void DestroyClientStats(void* arg) {
 | 
	
		
			
				|  |  | -  static_cast<GrpcLbClientStats*>(arg)->Unref();
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -void GrpcLb::PendingPickSetMetadataAndContext(PendingPick* pp) {
 | 
	
		
			
				|  |  | -  // If connected_subchannel is nullptr, no pick has been made by the RR
 | 
	
		
			
				|  |  | -  // policy (e.g., all addresses failed to connect). There won't be any
 | 
	
		
			
				|  |  | -  // LB token available.
 | 
	
		
			
				|  |  | -  if (pp->pick->connected_subchannel != nullptr) {
 | 
	
		
			
				|  |  | -    const grpc_arg* arg =
 | 
	
		
			
				|  |  | -        grpc_channel_args_find(pp->pick->connected_subchannel->args(),
 | 
	
		
			
				|  |  | -                               GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN);
 | 
	
		
			
				|  |  | -    if (arg != nullptr) {
 | 
	
		
			
				|  |  | -      grpc_mdelem lb_token = {
 | 
	
		
			
				|  |  | -          reinterpret_cast<uintptr_t>(arg->value.pointer.p)};
 | 
	
		
			
				|  |  | -      AddLbTokenToInitialMetadata(GRPC_MDELEM_REF(lb_token),
 | 
	
		
			
				|  |  | -                                  &pp->pick->lb_token_mdelem_storage,
 | 
	
		
			
				|  |  | -                                  pp->pick->initial_metadata);
 | 
	
		
			
				|  |  | -    } else {
 | 
	
		
			
				|  |  | -      gpr_log(GPR_ERROR,
 | 
	
		
			
				|  |  | -              "[grpclb %p] No LB token for connected subchannel pick %p",
 | 
	
		
			
				|  |  | -              pp->grpclb_policy, pp->pick);
 | 
	
		
			
				|  |  | -      abort();
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | -    // Pass on client stats via context. Passes ownership of the reference.
 | 
	
		
			
				|  |  | -    if (pp->client_stats != nullptr) {
 | 
	
		
			
				|  |  | -      pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value =
 | 
	
		
			
				|  |  | -          pp->client_stats.release();
 | 
	
		
			
				|  |  | -      pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].destroy =
 | 
	
		
			
				|  |  | -          DestroyClientStats;
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | -  } else {
 | 
	
		
			
				|  |  | -    pp->client_stats.reset();
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -/* The \a on_complete closure passed as part of the pick requires keeping a
 | 
	
		
			
				|  |  | - * reference to its associated round robin instance. We wrap this closure in
 | 
	
		
			
				|  |  | - * order to unref the round robin instance upon its invocation */
 | 
	
		
			
				|  |  | -void GrpcLb::OnPendingPickComplete(void* arg, grpc_error* error) {
 | 
	
		
			
				|  |  | -  PendingPick* pp = static_cast<PendingPick*>(arg);
 | 
	
		
			
				|  |  | -  PendingPickSetMetadataAndContext(pp);
 | 
	
		
			
				|  |  | -  GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  | -  Delete(pp);
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -GrpcLb::PendingPick* GrpcLb::PendingPickCreate(PickState* pick) {
 | 
	
		
			
				|  |  | -  PendingPick* pp = New<PendingPick>();
 | 
	
		
			
				|  |  | -  pp->grpclb_policy = this;
 | 
	
		
			
				|  |  | -  pp->pick = pick;
 | 
	
		
			
				|  |  | -  GRPC_CLOSURE_INIT(&pp->on_complete, &GrpcLb::OnPendingPickComplete, pp,
 | 
	
		
			
				|  |  | -                    grpc_schedule_on_exec_ctx);
 | 
	
		
			
				|  |  | -  pp->original_on_complete = pick->on_complete;
 | 
	
		
			
				|  |  | -  pick->on_complete = &pp->on_complete;
 | 
	
		
			
				|  |  | -  return pp;
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -void GrpcLb::AddPendingPick(PendingPick* pp) {
 | 
	
		
			
				|  |  | -  pp->next = pending_picks_;
 | 
	
		
			
				|  |  | -  pending_picks_ = pp;
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
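The PendingPick machinery deleted above hinges on one recurring trick: replace a pending operation's on_complete with a wrapper, do bookkeeping (attach the LB token, hand off client stats) when the wrapper fires, then invoke the original callback. A self-contained sketch of that interception pattern, using std::function in place of grpc_closure (names hypothetical):

#include <functional>
#include <iostream>
#include <string>

// A pending operation with a completion callback, loosely modeled on
// PickState + on_complete above.
struct Pick {
  std::function<void(bool ok)> on_complete;
  std::string lb_token;  // filled in by the wrapper before completion
};

// Swap pick->on_complete for a wrapper that attaches the LB token before
// the caller's callback runs, mirroring OnPendingPickComplete().
void InterceptCompletion(Pick* pick, std::string token) {
  auto original = std::move(pick->on_complete);
  pick->on_complete = [pick, token = std::move(token),
                       original = std::move(original)](bool ok) {
    if (ok) pick->lb_token = token;  // bookkeeping step
    original(ok);                    // then hand off to the original callback
  };
}

int main() {
  Pick pick;
  pick.on_complete = [&pick](bool ok) {
    std::cout << "pick done, ok=" << ok << " token=" << pick.lb_token << "\n";
  };
  InterceptCompletion(&pick, "token-abc");
  pick.on_complete(true);  // prints the token attached by the wrapper
}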
 //
 // code for interacting with the RR policy
 //
 
-// Performs a pick over \a rr_policy_. Given that a pick can return
-// immediately (ignoring its completion callback), we need to perform the
-// cleanups this callback would otherwise be responsible for.
-// If \a force_async is true, then we will manually schedule the
-// completion callback even if the pick is available immediately.
-bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp,
-                                            grpc_error** error) {
-  // Check for drops if we are not using fallback backend addresses.
-  if (serverlist_ != nullptr && serverlist_->num_servers > 0) {
-    // Look at the index into the serverlist to see if we should drop this call.
-    grpc_grpclb_server* server = serverlist_->servers[serverlist_index_++];
-    if (serverlist_index_ == serverlist_->num_servers) {
-      serverlist_index_ = 0;  // Wrap-around.
-    }
-    if (server->drop) {
-      // Update client load reporting stats to indicate the number of
-      // dropped calls.  Note that we have to do this here instead of in
-      // the client_load_reporting filter, because we do not create a
-      // subchannel call (and therefore no client_load_reporting filter)
-      // for dropped calls.
-      if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) {
-        lb_calld_->client_stats()->AddCallDroppedLocked(
-            server->load_balance_token);
-      }
-      if (force_async) {
-        GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE);
-        Delete(pp);
-        return false;
-      }
-      Delete(pp);
-      return true;
-    }
-  }
-  // Set client_stats.
-  if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) {
-    pp->client_stats = lb_calld_->client_stats()->Ref();
-  }
-  // Pick via the RR policy.
-  bool pick_done = rr_policy_->PickLocked(pp->pick, error);
-  if (pick_done) {
-    PendingPickSetMetadataAndContext(pp);
-    if (force_async) {
-      GRPC_CLOSURE_SCHED(pp->original_on_complete, *error);
-      *error = GRPC_ERROR_NONE;
-      pick_done = false;
-    }
-    Delete(pp);
-  }
-  // else, the pending pick will be registered and taken care of by the
-  // pending pick list inside the RR policy.  Eventually,
-  // OnPendingPickComplete() will be called, which will (among other
-  // things) add the LB token to the call's initial metadata.
-  return pick_done;
-}
-
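PickFromRoundRobinPolicyLocked, removed above, walks the balancer-provided serverlist with a wrapping index and fails calls that land on entries marked as drops, counting each drop against its load-balance token, before the RR policy is ever consulted. A standalone sketch of that drop-and-wrap logic (types hypothetical; the real serverlist comes from the grpclb protocol):

#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

// One serverlist entry, loosely modeled on grpc_grpclb_server: either a
// real backend or a "drop" directive tagged with a load-balance token.
struct ServerEntry {
  std::string address;
  std::string load_balance_token;
  bool drop = false;
};

class DropAwarePicker {
 public:
  // Assumes a non-empty serverlist, as the caller checks in the real code.
  explicit DropAwarePicker(std::vector<ServerEntry> servers)
      : servers_(std::move(servers)) {}

  // Returns false if the call must be dropped (reporting which token was
  // responsible); otherwise fills in the backend address to use.
  bool Pick(std::string* address, std::string* dropped_token) {
    const ServerEntry& server = servers_[index_++];
    if (index_ == servers_.size()) index_ = 0;  // wrap-around
    if (server.drop) {
      *dropped_token = server.load_balance_token;  // count the drop per token
      return false;
    }
    *address = server.address;
    return true;
  }

 private:
  std::vector<ServerEntry> servers_;
  size_t index_ = 0;
};

int main() {
  DropAwarePicker picker({{"10.0.0.1:443", "tok-a", false},
                          {"", "tok-b", true},  // every 3rd pick is dropped
                          {"10.0.0.2:443", "tok-c", false}});
  for (int i = 0; i < 6; ++i) {
    std::string addr, token;
    if (picker.Pick(&addr, &token)) {
      std::cout << "pick -> " << addr << "\n";
    } else {
      std::cout << "drop (token " << token << ")\n";
    }
  }
}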
 void GrpcLb::CreateRoundRobinPolicyLocked(Args args) {
   GPR_ASSERT(rr_policy_ == nullptr);
   rr_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
@@ -1601,40 +1521,12 @@ void GrpcLb::CreateRoundRobinPolicyLocked(Args args) {
     gpr_log(GPR_INFO, "[grpclb %p] Created new RR policy %p", this,
             rr_policy_.get());
   }
-  // TODO(roth): We currently track this ref manually.  Once the new
-  // ClosureRef API is done, pass the RefCountedPtr<> along with the closure.
-  auto self = Ref(DEBUG_LOCATION, "on_rr_reresolution_requested");
-  self.release();
-  rr_policy_->SetReresolutionClosureLocked(&on_rr_request_reresolution_);
-  grpc_error* rr_state_error = nullptr;
-  rr_connectivity_state_ = rr_policy_->CheckConnectivityLocked(&rr_state_error);
-  // Connectivity state is a function of the RR policy updated/created.
-  UpdateConnectivityStateFromRoundRobinPolicyLocked(rr_state_error);
   // Add the gRPC LB's interested_parties pollset_set to that of the newly
   // created RR policy. This will make the RR policy progress upon activity on
   // gRPC LB, which in turn is tied to the application's call.
   grpc_pollset_set_add_pollset_set(rr_policy_->interested_parties(),
                                    interested_parties());
-  // Subscribe to changes to the connectivity of the new RR.
-  // TODO(roth): We currently track this ref manually.  Once the new
-  // ClosureRef API is done, pass the RefCountedPtr<> along with the closure.
-  self = Ref(DEBUG_LOCATION, "on_rr_connectivity_changed");
-  self.release();
-  rr_policy_->NotifyOnStateChangeLocked(&rr_connectivity_state_,
-                                        &on_rr_connectivity_changed_);
   rr_policy_->ExitIdleLocked();
-  // Send pending picks to RR policy.
-  PendingPick* pp;
-  while ((pp = pending_picks_)) {
-    pending_picks_ = pp->next;
-    if (grpc_lb_glb_trace.enabled()) {
-      gpr_log(GPR_INFO,
-              "[grpclb %p] Pending pick about to (async) PICK from RR %p", this,
-              rr_policy_.get());
-    }
-    grpc_error* error = GRPC_ERROR_NONE;
-    PickFromRoundRobinPolicyLocked(true /* force_async */, pp, &error);
-  }
 }
 
 grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() {
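The lines removed from CreateRoundRobinPolicyLocked used a manual liveness pattern: take a ref before registering an async closure (Ref(...) followed by release()) and drop it in the callback (Unref(...)). A simplified, single-threaded sketch of that idea with an explicit refcount and a toy closure queue (all names hypothetical):

#include <functional>
#include <iostream>
#include <vector>

// Simplified single-threaded stand-in for a refcounted policy object.
class Policy {
 public:
  void Ref() { ++refs_; }
  void Unref() {
    if (--refs_ == 0) { std::cout << "policy destroyed\n"; delete this; }
  }
  int refs() const { return refs_; }
 private:
  ~Policy() = default;  // only Unref() may destroy
  int refs_ = 1;        // the creator's initial ref
};

// Toy event queue standing in for the combiner that runs closures later.
std::vector<std::function<void()>> g_queue;

void SubscribeToStateChange(Policy* policy) {
  policy->Ref();  // keep the policy alive until the callback runs
  g_queue.push_back([policy] {
    std::cout << "state change delivered, refs=" << policy->refs() << "\n";
    policy->Unref();  // drop the ref taken at subscription time
  });
}

int main() {
  Policy* policy = new Policy;
  SubscribeToStateChange(policy);
  for (auto& cb : g_queue) cb();  // run pending closures
  policy->Unref();  // creator drops its ref; object is destroyed
}

The TODO(roth) comments in the deleted code point at the fragility here: the ref and the closure travel separately, which is exactly what passing a RefCountedPtr along with the closure would fix.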
@@ -1642,7 +1534,7 @@ grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() {
   ServerAddressList* addresses = &tmp_addresses;
   bool is_backend_from_grpclb_load_balancer = false;
   if (serverlist_ != nullptr) {
-    tmp_addresses = ProcessServerlist(serverlist_);
+    tmp_addresses = serverlist_->GetServerAddressList();
     is_backend_from_grpclb_load_balancer = true;
   } else {
     // If CreateOrUpdateRoundRobinPolicyLocked() is invoked when we haven't
@@ -1691,110 +1583,14 @@ void GrpcLb::CreateOrUpdateRoundRobinPolicyLocked() {
   } else {
     LoadBalancingPolicy::Args lb_policy_args;
     lb_policy_args.combiner = combiner();
-    lb_policy_args.client_channel_factory = client_channel_factory();
     lb_policy_args.args = args;
-    lb_policy_args.subchannel_pool = subchannel_pool()->Ref();
+    lb_policy_args.channel_control_helper =
+        UniquePtr<ChannelControlHelper>(New<Helper>(Ref()));
     CreateRoundRobinPolicyLocked(std::move(lb_policy_args));
   }
   grpc_channel_args_destroy(args);
 }
 
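Note the new channel_control_helper argument above: the child RR policy receives a Helper constructed with Ref(), so the helper keeps the parent grpclb policy alive for as long as the child may call back into it. A minimal sketch of that ownership pattern, substituting std::shared_ptr for gRPC's intrusive RefCountedPtr (names hypothetical):

#include <iostream>
#include <memory>

// Interface through which a child policy reports events upward.
class ChannelControlHelper {
 public:
  virtual ~ChannelControlHelper() = default;
  virtual void RequestReresolution() = 0;
};

class ParentPolicy : public std::enable_shared_from_this<ParentPolicy> {
 public:
  // The helper holds a strong ref to the parent, so the parent cannot be
  // destroyed while the child policy can still call back into it.
  class Helper : public ChannelControlHelper {
   public:
    explicit Helper(std::shared_ptr<ParentPolicy> parent)
        : parent_(std::move(parent)) {}
    void RequestReresolution() override { parent_->OnChildReresolution(); }
   private:
    std::shared_ptr<ParentPolicy> parent_;
  };

  std::unique_ptr<ChannelControlHelper> MakeHelper() {
    return std::make_unique<Helper>(shared_from_this());
  }

  void OnChildReresolution() { std::cout << "parent notified by child\n"; }
};

int main() {
  auto helper = std::make_shared<ParentPolicy>()->MakeHelper();
  // The temporary shared_ptr is gone, but the helper's ref keeps the
  // parent alive; the callback still works.
  helper->RequestReresolution();
}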
-void GrpcLb::OnRoundRobinRequestReresolutionLocked(void* arg,
-                                                   grpc_error* error) {
-  GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
-  if (grpclb_policy->shutting_down_ || error != GRPC_ERROR_NONE) {
-    grpclb_policy->Unref(DEBUG_LOCATION, "on_rr_reresolution_requested");
-    return;
-  }
-  if (grpc_lb_glb_trace.enabled()) {
-    gpr_log(
-        GPR_INFO,
-        "[grpclb %p] Re-resolution requested from the internal RR policy (%p).",
-        grpclb_policy, grpclb_policy->rr_policy_.get());
-  }
-  // If we are talking to a balancer, we expect to get updated addresses form
-  // the balancer, so we can ignore the re-resolution request from the RR
-  // policy. Otherwise, handle the re-resolution request using the
-  // grpclb policy's original re-resolution closure.
-  if (grpclb_policy->lb_calld_ == nullptr ||
-      !grpclb_policy->lb_calld_->seen_initial_response()) {
-    grpclb_policy->TryReresolutionLocked(&grpc_lb_glb_trace, GRPC_ERROR_NONE);
-  }
-  // Give back the wrapper closure to the RR policy.
-  grpclb_policy->rr_policy_->SetReresolutionClosureLocked(
-      &grpclb_policy->on_rr_request_reresolution_);
-}
-
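The deleted handler above encodes one policy decision: forward a child re-resolution request to the resolver only while no balancer serverlist has been seen, since balancer updates supersede resolver results. A compact sketch of that gating condition (names hypothetical):

#include <iostream>

// Minimal stand-ins for the state consulted by the real handler.
struct BalancerCall {
  bool seen_initial_response = false;
};

struct GrpclbState {
  bool shutting_down = false;
  BalancerCall* lb_call = nullptr;  // null until the balancer stream exists
};

// Mirrors the condition in OnRoundRobinRequestReresolutionLocked(): forward
// the child's request only if no balancer response has been seen yet.
bool ShouldForwardReresolution(const GrpclbState& state) {
  if (state.shutting_down) return false;
  return state.lb_call == nullptr || !state.lb_call->seen_initial_response;
}

int main() {
  GrpclbState state;
  std::cout << ShouldForwardReresolution(state) << "\n";  // 1: no balancer yet
  BalancerCall call;
  call.seen_initial_response = true;
  state.lb_call = &call;
  std::cout << ShouldForwardReresolution(state) << "\n";  // 0: balancer active
}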
-void GrpcLb::UpdateConnectivityStateFromRoundRobinPolicyLocked(
-    grpc_error* rr_state_error) {
-  const grpc_connectivity_state curr_glb_state =
-      grpc_connectivity_state_check(&state_tracker_);
-  /* The new connectivity status is a function of the previous one and the new
-   * input coming from the status of the RR policy.
-   *
-   *  current state (grpclb's)
-   *  |
-   *  v  || I  |  C  |  R  |  TF  |  SD  |  <- new state (RR's)
-   *  ===++====+=====+=====+======+======+
-   *   I || I  |  C  |  R  | [I]  | [I]  |
-   *  ---++----+-----+-----+------+------+
-   *   C || I  |  C  |  R  | [C]  | [C]  |
-   *  ---++----+-----+-----+------+------+
-   *   R || I  |  C  |  R  | [R]  | [R]  |
-   *  ---++----+-----+-----+------+------+
-   *  TF || I  |  C  |  R  | [TF] | [TF] |
-   *  ---++----+-----+-----+------+------+
-   *  SD || NA |  NA |  NA |  NA  |  NA  | (*)
-   *  ---++----+-----+-----+------+------+
-   *
-   * A [STATE] indicates that the old RR policy is kept. In those cases, STATE
-   * is the current state of grpclb, which is left untouched.
-   *
-   *  In summary, if the new state is TRANSIENT_FAILURE or SHUTDOWN, stick to
-   *  the previous RR instance.
-   *
-   *  Note that the status is never updated to SHUTDOWN as a result of calling
-   *  this function. Only glb_shutdown() has the power to set that state.
-   *
-   *  (*) This function mustn't be called during shutting down. */
-  GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN);
-  switch (rr_connectivity_state_) {
-    case GRPC_CHANNEL_TRANSIENT_FAILURE:
-    case GRPC_CHANNEL_SHUTDOWN:
-      GPR_ASSERT(rr_state_error != GRPC_ERROR_NONE);
-      break;
-    case GRPC_CHANNEL_IDLE:
-    case GRPC_CHANNEL_CONNECTING:
-    case GRPC_CHANNEL_READY:
-      GPR_ASSERT(rr_state_error == GRPC_ERROR_NONE);
-  }
-  if (grpc_lb_glb_trace.enabled()) {
-    gpr_log(
-        GPR_INFO,
-        "[grpclb %p] Setting grpclb's state to %s from new RR policy %p state.",
-        this, grpc_connectivity_state_name(rr_connectivity_state_),
-        rr_policy_.get());
-  }
-  grpc_connectivity_state_set(&state_tracker_, rr_connectivity_state_,
-                              rr_state_error,
-                              "update_lb_connectivity_status_locked");
-}
-
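The state table in the function deleted above reduces to a single rule: adopt the child's new state when it is IDLE, CONNECTING, or READY; keep the current state (and the existing RR instance) when the child reports TRANSIENT_FAILURE or SHUTDOWN. A sketch of that merge rule with a hypothetical enum mirroring grpc_connectivity_state:

#include <iostream>

// Hypothetical mirror of grpc_connectivity_state.
enum class Connectivity { kIdle, kConnecting, kReady, kTransientFailure,
                          kShutdown };

// Implements the table from the deleted comment: IDLE/CONNECTING/READY from
// the child are adopted; TRANSIENT_FAILURE/SHUTDOWN leave the parent's
// current state untouched (the bracketed [STATE] cells).
Connectivity MergeChildState(Connectivity current, Connectivity child) {
  switch (child) {
    case Connectivity::kIdle:
    case Connectivity::kConnecting:
    case Connectivity::kReady:
      return child;
    case Connectivity::kTransientFailure:
    case Connectivity::kShutdown:
      return current;
  }
  return current;  // unreachable; keeps compilers happy
}

int main() {
  // A READY child overrides; a failing child does not knock us out of READY.
  auto s = MergeChildState(Connectivity::kConnecting, Connectivity::kReady);
  std::cout << (s == Connectivity::kReady) << "\n";  // 1
  s = MergeChildState(s, Connectivity::kTransientFailure);
  std::cout << (s == Connectivity::kReady) << "\n";  // 1: state kept
}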
-void GrpcLb::OnRoundRobinConnectivityChangedLocked(void* arg,
-                                                   grpc_error* error) {
-  GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
-  if (grpclb_policy->shutting_down_) {
-    grpclb_policy->Unref(DEBUG_LOCATION, "on_rr_connectivity_changed");
-    return;
-  }
-  grpclb_policy->UpdateConnectivityStateFromRoundRobinPolicyLocked(
-      GRPC_ERROR_REF(error));
-  // Resubscribe. Reuse the "on_rr_connectivity_changed" ref.
-  grpclb_policy->rr_policy_->NotifyOnStateChangeLocked(
-      &grpclb_policy->rr_connectivity_state_,
-      &grpclb_policy->on_rr_connectivity_changed_);
-}
-
 //
 // factory
 //