|  | @@ -125,31 +125,122 @@ class XdsClient::ChannelState::AdsCallState
 | 
	
		
			
				|  |  |    XdsClient* xds_client() const { return chand()->xds_client(); }
 | 
	
		
			
				|  |  |    bool seen_response() const { return seen_response_; }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  // If \a type_url is an unsupported type, \a nonce_for_unsupported_type and
 | 
	
		
			
				|  |  | -  // \a error_for_unsupported_type will be used in the request; otherwise, the
 | 
	
		
			
				|  |  | -  // nonce and error stored in each ADS call state will be used. Takes ownership
 | 
	
		
			
				|  |  | -  // of \a error_for_unsupported_type.
 | 
	
		
			
				|  |  | -  void SendMessageLocked(const std::string& type_url,
 | 
	
		
			
				|  |  | -                         const std::string& nonce_for_unsupported_type,
 | 
	
		
			
				|  |  | -                         grpc_error* error_for_unsupported_type,
 | 
	
		
			
				|  |  | -                         bool is_first_message);
 | 
	
		
			
				|  |  | +  void Subscribe(const std::string& type_url, const std::string& name);
 | 
	
		
			
				|  |  | +  void Unsubscribe(const std::string& type_url, const std::string& name);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  bool HasSubscribedResources() const;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |   private:
 | 
	
		
			
				|  |  | -  struct BufferedRequest {
 | 
	
		
			
				|  |  | -    std::string nonce;
 | 
	
		
			
				|  |  | -    grpc_error* error;
 | 
	
		
			
				|  |  | +  class ResourceState : public InternallyRefCounted<ResourceState> {
 | 
	
		
			
				|  |  | +   public:
 | 
	
		
			
				|  |  | +    ResourceState(const std::string& type_url, const std::string& name)
 | 
	
		
			
				|  |  | +        : type_url_(type_url), name_(name) {
 | 
	
		
			
				|  |  | +      GRPC_CLOSURE_INIT(&timer_callback_, OnTimer, this,
 | 
	
		
			
				|  |  | +                        grpc_schedule_on_exec_ctx);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    void Orphan() override {
 | 
	
		
			
				|  |  | +      Finish();
 | 
	
		
			
				|  |  | +      Unref();
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    void Start(RefCountedPtr<AdsCallState> ads_calld) {
 | 
	
		
			
				|  |  | +      if (sent_) return;
 | 
	
		
			
				|  |  | +      sent_ = true;
 | 
	
		
			
				|  |  | +      ads_calld_ = std::move(ads_calld);
 | 
	
		
			
				|  |  | +      Ref().release();
 | 
	
		
			
				|  |  | +      timer_pending_ = true;
 | 
	
		
			
				|  |  | +      grpc_timer_init(
 | 
	
		
			
				|  |  | +          &timer_,
 | 
	
		
			
				|  |  | +          ExecCtx::Get()->Now() + ads_calld_->xds_client()->request_timeout_,
 | 
	
		
			
				|  |  | +          &timer_callback_);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    void Finish() {
 | 
	
		
			
				|  |  | +      if (timer_pending_) {
 | 
	
		
			
				|  |  | +        grpc_timer_cancel(&timer_);
 | 
	
		
			
				|  |  | +        timer_pending_ = false;
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    // Takes ownership of \a error.
 | 
	
		
			
				|  |  | -    BufferedRequest(std::string nonce, grpc_error* error)
 | 
	
		
			
				|  |  | -        : nonce(std::move(nonce)), error(error) {}
 | 
	
		
			
				|  |  | +   private:
 | 
	
		
			
				|  |  | +    static void OnTimer(void* arg, grpc_error* error) {
 | 
	
		
			
				|  |  | +      ResourceState* self = static_cast<ResourceState*>(arg);
 | 
	
		
			
				|  |  | +      self->ads_calld_->xds_client()->combiner_->Run(
 | 
	
		
			
				|  |  | +          GRPC_CLOSURE_INIT(&self->timer_callback_, OnTimerLocked, self,
 | 
	
		
			
				|  |  | +                            nullptr),
 | 
	
		
			
				|  |  | +          GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    ~BufferedRequest() { GRPC_ERROR_UNREF(error); }
 | 
	
		
			
				|  |  | +    static void OnTimerLocked(void* arg, grpc_error* error) {
 | 
	
		
			
				|  |  | +      ResourceState* self = static_cast<ResourceState*>(arg);
 | 
	
		
			
				|  |  | +      if (error == GRPC_ERROR_NONE && self->timer_pending_) {
 | 
	
		
			
				|  |  | +        self->timer_pending_ = false;
 | 
	
		
			
				|  |  | +        char* msg;
 | 
	
		
			
				|  |  | +        gpr_asprintf(
 | 
	
		
			
				|  |  | +            &msg,
 | 
	
		
			
				|  |  | +            "timeout obtaining resource {type=%s name=%s} from xds server",
 | 
	
		
			
				|  |  | +            self->type_url_.c_str(), self->name_.c_str());
 | 
	
		
			
				|  |  | +        grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
 | 
	
		
			
				|  |  | +        gpr_free(msg);
 | 
	
		
			
				|  |  | +        if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
 | 
	
		
			
				|  |  | +          gpr_log(GPR_INFO, "[xds_client %p] %s",
 | 
	
		
			
				|  |  | +                  self->ads_calld_->xds_client(), grpc_error_string(error));
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        if (self->type_url_ == kLdsTypeUrl || self->type_url_ == kRdsTypeUrl) {
 | 
	
		
			
				|  |  | +          self->ads_calld_->xds_client()->service_config_watcher_->OnError(
 | 
	
		
			
				|  |  | +              error);
 | 
	
		
			
				|  |  | +        } else if (self->type_url_ == kCdsTypeUrl) {
 | 
	
		
			
				|  |  | +          ClusterState& state =
 | 
	
		
			
				|  |  | +              self->ads_calld_->xds_client()->cluster_map_[self->name_];
 | 
	
		
			
				|  |  | +          for (const auto& p : state.watchers) {
 | 
	
		
			
				|  |  | +            p.first->OnError(GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  | +          }
 | 
	
		
			
				|  |  | +          GRPC_ERROR_UNREF(error);
 | 
	
		
			
				|  |  | +        } else if (self->type_url_ == kEdsTypeUrl) {
 | 
	
		
			
				|  |  | +          EndpointState& state =
 | 
	
		
			
				|  |  | +              self->ads_calld_->xds_client()->endpoint_map_[self->name_];
 | 
	
		
			
				|  |  | +          for (const auto& p : state.watchers) {
 | 
	
		
			
				|  |  | +            p.first->OnError(GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  | +          }
 | 
	
		
			
				|  |  | +          GRPC_ERROR_UNREF(error);
 | 
	
		
			
				|  |  | +        } else {
 | 
	
		
			
				|  |  | +          GPR_UNREACHABLE_CODE(return );
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +      self->ads_calld_.reset();
 | 
	
		
			
				|  |  | +      self->Unref();
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    const std::string type_url_;
 | 
	
		
			
				|  |  | +    const std::string name_;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    RefCountedPtr<AdsCallState> ads_calld_;
 | 
	
		
			
				|  |  | +    bool sent_ = false;
 | 
	
		
			
				|  |  | +    bool timer_pending_ = false;
 | 
	
		
			
				|  |  | +    grpc_timer timer_;
 | 
	
		
			
				|  |  | +    grpc_closure timer_callback_;
 | 
	
		
			
				|  |  |    };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  void AcceptLdsUpdate(LdsUpdate lds_update, std::string new_version);
 | 
	
		
			
				|  |  | -  void AcceptRdsUpdate(RdsUpdate rds_update, std::string new_version);
 | 
	
		
			
				|  |  | -  void AcceptCdsUpdate(CdsUpdateMap cds_update_map, std::string new_version);
 | 
	
		
			
				|  |  | -  void AcceptEdsUpdate(EdsUpdateMap eds_update_map, std::string new_version);
 | 
	
		
			
				|  |  | +  struct ResourceTypeState {
 | 
	
		
			
				|  |  | +    ~ResourceTypeState() { GRPC_ERROR_UNREF(error); }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    // Version, nonce, and error for this resource type.
 | 
	
		
			
				|  |  | +    std::string version;
 | 
	
		
			
				|  |  | +    std::string nonce;
 | 
	
		
			
				|  |  | +    grpc_error* error = GRPC_ERROR_NONE;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    // Subscribed resources of this type.
 | 
	
		
			
				|  |  | +    std::map<std::string /* name */, OrphanablePtr<ResourceState>>
 | 
	
		
			
				|  |  | +        subscribed_resources;
 | 
	
		
			
				|  |  | +  };
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  void SendMessageLocked(const std::string& type_url);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  void AcceptLdsUpdate(LdsUpdate lds_update);
 | 
	
		
			
				|  |  | +  void AcceptRdsUpdate(RdsUpdate rds_update);
 | 
	
		
			
				|  |  | +  void AcceptCdsUpdate(CdsUpdateMap cds_update_map);
 | 
	
		
			
				|  |  | +  void AcceptEdsUpdate(EdsUpdateMap eds_update_map);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    static void OnRequestSent(void* arg, grpc_error* error);
 | 
	
		
			
				|  |  |    static void OnRequestSentLocked(void* arg, grpc_error* error);
 | 
	
	
		
			
				|  | @@ -160,8 +251,13 @@ class XdsClient::ChannelState::AdsCallState
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    bool IsCurrentCallOnChannel() const;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +  std::set<StringView> ClusterNamesForRequest();
 | 
	
		
			
				|  |  | +  std::set<StringView> EdsServiceNamesForRequest();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    // The owning RetryableCall<>.
 | 
	
		
			
				|  |  |    RefCountedPtr<RetryableCall<AdsCallState>> parent_;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  bool sent_initial_message_ = false;
 | 
	
		
			
				|  |  |    bool seen_response_ = false;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    // Always non-NULL.
 | 
	
	
		
			
				|  | @@ -184,15 +280,11 @@ class XdsClient::ChannelState::AdsCallState
 | 
	
		
			
				|  |  |    grpc_slice status_details_;
 | 
	
		
			
				|  |  |    grpc_closure on_status_received_;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  // Version state.
 | 
	
		
			
				|  |  | -  VersionState lds_version_;
 | 
	
		
			
				|  |  | -  VersionState rds_version_;
 | 
	
		
			
				|  |  | -  VersionState cds_version_;
 | 
	
		
			
				|  |  | -  VersionState eds_version_;
 | 
	
		
			
				|  |  | +  // Resource types for which requests need to be sent.
 | 
	
		
			
				|  |  | +  std::set<std::string /*type_url*/> buffered_requests_;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  // Buffered requests.
 | 
	
		
			
				|  |  | -  std::map<std::string /*type_url*/, std::unique_ptr<BufferedRequest>>
 | 
	
		
			
				|  |  | -      buffered_request_map_;
 | 
	
		
			
				|  |  | +  // State for each resource type.
 | 
	
		
			
				|  |  | +  std::map<std::string /*type_url*/, ResourceTypeState> state_map_;
 | 
	
		
			
				|  |  |  };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  // Contains an LRS call to the xds server.
 | 
	
	
		
			
				|  | @@ -445,31 +537,30 @@ void XdsClient::ChannelState::CancelConnectivityWatchLocked() {
 | 
	
		
			
				|  |  |    grpc_client_channel_stop_connectivity_watch(client_channel_elem, watcher_);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -void XdsClient::ChannelState::OnResourceNamesChanged(
 | 
	
		
			
				|  |  | -    const std::string& type_url) {
 | 
	
		
			
				|  |  | +void XdsClient::ChannelState::Subscribe(const std::string& type_url,
 | 
	
		
			
				|  |  | +                                        const std::string& name) {
 | 
	
		
			
				|  |  |    if (ads_calld_ == nullptr) {
 | 
	
		
			
				|  |  |      // Start the ADS call if this is the first request.
 | 
	
		
			
				|  |  |      ads_calld_.reset(new RetryableCall<AdsCallState>(
 | 
	
		
			
				|  |  |          Ref(DEBUG_LOCATION, "ChannelState+ads")));
 | 
	
		
			
				|  |  | -    // Note: AdsCallState's ctor will automatically send necessary messages, so
 | 
	
		
			
				|  |  | -    // we can return here.
 | 
	
		
			
				|  |  | +    // Note: AdsCallState's ctor will automatically subscribe to all
 | 
	
		
			
				|  |  | +    // resources that the XdsClient already has watchers for, so we can
 | 
	
		
			
				|  |  | +    // return here.
 | 
	
		
			
				|  |  |      return;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    // If the ADS call is in backoff state, we don't need to do anything now
 | 
	
		
			
				|  |  |    // because when the call is restarted it will resend all necessary requests.
 | 
	
		
			
				|  |  |    if (ads_calld() == nullptr) return;
 | 
	
		
			
				|  |  | -  // Send the message if the ADS call is active.
 | 
	
		
			
				|  |  | -  ads_calld()->SendMessageLocked(type_url, "", nullptr, false);
 | 
	
		
			
				|  |  | +  // Subscribe to this resource if the ADS call is active.
 | 
	
		
			
				|  |  | +  ads_calld()->Subscribe(type_url, name);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -void XdsClient::ChannelState::OnWatcherRemoved() {
 | 
	
		
			
				|  |  | -  // Keep the ADS call if there are watcher(s).
 | 
	
		
			
				|  |  | -  for (const auto& p : xds_client()->cluster_map_) {
 | 
	
		
			
				|  |  | -    const ClusterState& cluster_state = p.second;
 | 
	
		
			
				|  |  | -    if (!cluster_state.watchers.empty()) return;
 | 
	
		
			
				|  |  | +void XdsClient::ChannelState::Unsubscribe(const std::string& type_url,
 | 
	
		
			
				|  |  | +                                          const std::string& name) {
 | 
	
		
			
				|  |  | +  if (ads_calld_ != nullptr) {
 | 
	
		
			
				|  |  | +    ads_calld_->calld()->Unsubscribe(type_url, name);
 | 
	
		
			
				|  |  | +    if (!ads_calld_->calld()->HasSubscribedResources()) ads_calld_.reset();
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  if (!xds_client()->endpoint_map_.empty()) return;
 | 
	
		
			
				|  |  | -  ads_calld_.reset();
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  //
 | 
	
	
		
			
				|  | @@ -620,22 +711,14 @@ XdsClient::ChannelState::AdsCallState::AdsCallState(
 | 
	
		
			
				|  |  |    // Op: send request message.
 | 
	
		
			
				|  |  |    GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
 | 
	
		
			
				|  |  |                      grpc_schedule_on_exec_ctx);
 | 
	
		
			
				|  |  | -  bool initial_message = true;
 | 
	
		
			
				|  |  |    if (xds_client()->service_config_watcher_ != nullptr) {
 | 
	
		
			
				|  |  | -    if (xds_client()->route_config_name_.empty()) {
 | 
	
		
			
				|  |  | -      SendMessageLocked(kLdsTypeUrl, "", nullptr, initial_message);
 | 
	
		
			
				|  |  | -      initial_message = false;
 | 
	
		
			
				|  |  | -    } else if (xds_client()->cluster_name_.empty()) {
 | 
	
		
			
				|  |  | -      SendMessageLocked(kRdsTypeUrl, "", nullptr, initial_message);
 | 
	
		
			
				|  |  | -      initial_message = false;
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | +    Subscribe(kLdsTypeUrl, xds_client()->server_name_);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  if (!xds_client()->cluster_map_.empty()) {
 | 
	
		
			
				|  |  | -    SendMessageLocked(kCdsTypeUrl, "", nullptr, initial_message);
 | 
	
		
			
				|  |  | -    initial_message = false;
 | 
	
		
			
				|  |  | +  for (const auto& p : xds_client()->cluster_map_) {
 | 
	
		
			
				|  |  | +    Subscribe(kCdsTypeUrl, std::string(p.first));
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  if (!xds_client()->endpoint_map_.empty()) {
 | 
	
		
			
				|  |  | -    SendMessageLocked(kEdsTypeUrl, "", nullptr, initial_message);
 | 
	
		
			
				|  |  | +  for (const auto& p : xds_client()->endpoint_map_) {
 | 
	
		
			
				|  |  | +    Subscribe(kEdsTypeUrl, std::string(p.first));
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    // Op: recv initial metadata.
 | 
	
		
			
				|  |  |    op = ops;
 | 
	
	
		
			
				|  | @@ -693,51 +776,49 @@ void XdsClient::ChannelState::AdsCallState::Orphan() {
 | 
	
		
			
				|  |  |    // we are here because xds_client has to orphan a failed call, then the
 | 
	
		
			
				|  |  |    // following cancellation will be a no-op.
 | 
	
		
			
				|  |  |    grpc_call_cancel(call_, nullptr);
 | 
	
		
			
				|  |  | +  state_map_.clear();
 | 
	
		
			
				|  |  |    // Note that the initial ref is hold by on_status_received_. So the
 | 
	
		
			
				|  |  |    // corresponding unref happens in on_status_received_ instead of here.
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
 | 
	
		
			
				|  |  | -    const std::string& type_url, const std::string& nonce_for_unsupported_type,
 | 
	
		
			
				|  |  | -    grpc_error* error_for_unsupported_type, bool is_first_message) {
 | 
	
		
			
				|  |  | +    const std::string& type_url) {
 | 
	
		
			
				|  |  |    // Buffer message sending if an existing message is in flight.
 | 
	
		
			
				|  |  |    if (send_message_payload_ != nullptr) {
 | 
	
		
			
				|  |  | -    buffered_request_map_[type_url].reset(new BufferedRequest(
 | 
	
		
			
				|  |  | -        nonce_for_unsupported_type, error_for_unsupported_type));
 | 
	
		
			
				|  |  | +    buffered_requests_.insert(type_url);
 | 
	
		
			
				|  |  |      return;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  grpc_slice request_payload_slice;
 | 
	
		
			
				|  |  | +  auto& state = state_map_[type_url];
 | 
	
		
			
				|  |  | +  grpc_error* error = state.error;
 | 
	
		
			
				|  |  | +  state.error = GRPC_ERROR_NONE;
 | 
	
		
			
				|  |  |    const XdsBootstrap::Node* node =
 | 
	
		
			
				|  |  | -      is_first_message ? xds_client()->bootstrap_->node() : nullptr;
 | 
	
		
			
				|  |  | +      sent_initial_message_ ? nullptr : xds_client()->bootstrap_->node();
 | 
	
		
			
				|  |  |    const char* build_version =
 | 
	
		
			
				|  |  | -      is_first_message ? xds_client()->build_version_.get() : nullptr;
 | 
	
		
			
				|  |  | +      sent_initial_message_ ? nullptr : xds_client()->build_version_.get();
 | 
	
		
			
				|  |  | +  sent_initial_message_ = true;
 | 
	
		
			
				|  |  | +  grpc_slice request_payload_slice;
 | 
	
		
			
				|  |  |    if (type_url == kLdsTypeUrl) {
 | 
	
		
			
				|  |  |      request_payload_slice = XdsLdsRequestCreateAndEncode(
 | 
	
		
			
				|  |  | -        xds_client()->server_name_, node, build_version,
 | 
	
		
			
				|  |  | -        lds_version_.version_info, lds_version_.nonce, lds_version_.error);
 | 
	
		
			
				|  |  | -    lds_version_.error = GRPC_ERROR_NONE;
 | 
	
		
			
				|  |  | -    GRPC_ERROR_UNREF(error_for_unsupported_type);
 | 
	
		
			
				|  |  | +        xds_client()->server_name_, node, build_version, state.version,
 | 
	
		
			
				|  |  | +        state.nonce, error);
 | 
	
		
			
				|  |  | +    state.subscribed_resources[xds_client()->server_name_]->Start(Ref());
 | 
	
		
			
				|  |  |    } else if (type_url == kRdsTypeUrl) {
 | 
	
		
			
				|  |  |      request_payload_slice = XdsRdsRequestCreateAndEncode(
 | 
	
		
			
				|  |  | -        xds_client()->route_config_name_, node, build_version,
 | 
	
		
			
				|  |  | -        rds_version_.version_info, rds_version_.nonce, rds_version_.error);
 | 
	
		
			
				|  |  | -    rds_version_.error = GRPC_ERROR_NONE;
 | 
	
		
			
				|  |  | -    GRPC_ERROR_UNREF(error_for_unsupported_type);
 | 
	
		
			
				|  |  | +        xds_client()->route_config_name_, node, build_version, state.version,
 | 
	
		
			
				|  |  | +        state.nonce, error);
 | 
	
		
			
				|  |  | +    state.subscribed_resources[xds_client()->route_config_name_]->Start(Ref());
 | 
	
		
			
				|  |  |    } else if (type_url == kCdsTypeUrl) {
 | 
	
		
			
				|  |  |      request_payload_slice = XdsCdsRequestCreateAndEncode(
 | 
	
		
			
				|  |  | -        xds_client()->WatchedClusterNames(), node, build_version,
 | 
	
		
			
				|  |  | -        cds_version_.version_info, cds_version_.nonce, cds_version_.error);
 | 
	
		
			
				|  |  | -    cds_version_.error = GRPC_ERROR_NONE;
 | 
	
		
			
				|  |  | -    GRPC_ERROR_UNREF(error_for_unsupported_type);
 | 
	
		
			
				|  |  | +        ClusterNamesForRequest(), node, build_version, state.version,
 | 
	
		
			
				|  |  | +        state.nonce, error);
 | 
	
		
			
				|  |  |    } else if (type_url == kEdsTypeUrl) {
 | 
	
		
			
				|  |  |      request_payload_slice = XdsEdsRequestCreateAndEncode(
 | 
	
		
			
				|  |  | -        xds_client()->EdsServiceNames(), node, build_version,
 | 
	
		
			
				|  |  | -        eds_version_.version_info, eds_version_.nonce, eds_version_.error);
 | 
	
		
			
				|  |  | -    eds_version_.error = GRPC_ERROR_NONE;
 | 
	
		
			
				|  |  | -    GRPC_ERROR_UNREF(error_for_unsupported_type);
 | 
	
		
			
				|  |  | +        EdsServiceNamesForRequest(), node, build_version, state.version,
 | 
	
		
			
				|  |  | +        state.nonce, error);
 | 
	
		
			
				|  |  |    } else {
 | 
	
		
			
				|  |  |      request_payload_slice = XdsUnsupportedTypeNackRequestCreateAndEncode(
 | 
	
		
			
				|  |  | -        type_url, nonce_for_unsupported_type, error_for_unsupported_type);
 | 
	
		
			
				|  |  | +        type_url, state.nonce, state.error);
 | 
	
		
			
				|  |  | +    state_map_.erase(type_url);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    // Create message payload.
 | 
	
		
			
				|  |  |    send_message_payload_ =
 | 
	
	
		
			
				|  | @@ -761,8 +842,30 @@ void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +void XdsClient::ChannelState::AdsCallState::Subscribe(
 | 
	
		
			
				|  |  | +    const std::string& type_url, const std::string& name) {
 | 
	
		
			
				|  |  | +  auto& state = state_map_[type_url].subscribed_resources[name];
 | 
	
		
			
				|  |  | +  if (state == nullptr) {
 | 
	
		
			
				|  |  | +    state = MakeOrphanable<ResourceState>(type_url, name);
 | 
	
		
			
				|  |  | +    SendMessageLocked(type_url);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +void XdsClient::ChannelState::AdsCallState::Unsubscribe(
 | 
	
		
			
				|  |  | +    const std::string& type_url, const std::string& name) {
 | 
	
		
			
				|  |  | +  state_map_[type_url].subscribed_resources.erase(name);
 | 
	
		
			
				|  |  | +  SendMessageLocked(type_url);
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +bool XdsClient::ChannelState::AdsCallState::HasSubscribedResources() const {
 | 
	
		
			
				|  |  | +  for (const auto& p : state_map_) {
 | 
	
		
			
				|  |  | +    if (!p.second.subscribed_resources.empty()) return true;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  return false;
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
 | 
	
		
			
				|  |  | -    LdsUpdate lds_update, std::string new_version) {
 | 
	
		
			
				|  |  | +    LdsUpdate lds_update) {
 | 
	
		
			
				|  |  |    const std::string& cluster_name =
 | 
	
		
			
				|  |  |        lds_update.rds_update.has_value()
 | 
	
		
			
				|  |  |            ? lds_update.rds_update.value().cluster_name
 | 
	
	
		
			
				|  | @@ -775,6 +878,9 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
 | 
	
		
			
				|  |  |              xds_client(), lds_update.route_config_name.c_str(),
 | 
	
		
			
				|  |  |              cluster_name.c_str());
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | +  auto& lds_state = state_map_[kLdsTypeUrl];
 | 
	
		
			
				|  |  | +  auto& state = lds_state.subscribed_resources[xds_client()->server_name_];
 | 
	
		
			
				|  |  | +  if (state != nullptr) state->Finish();
 | 
	
		
			
				|  |  |    // Ignore identical update.
 | 
	
		
			
				|  |  |    if (xds_client()->route_config_name_ == lds_update.route_config_name &&
 | 
	
		
			
				|  |  |        xds_client()->cluster_name_ == cluster_name) {
 | 
	
	
		
			
				|  | @@ -802,19 +908,22 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |    } else {
 | 
	
		
			
				|  |  |      // Send RDS request for dynamic resolution.
 | 
	
		
			
				|  |  | -    SendMessageLocked(kRdsTypeUrl, "", nullptr, false);
 | 
	
		
			
				|  |  | +    Subscribe(kRdsTypeUrl, xds_client()->route_config_name_);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  lds_version_.version_info = std::move(new_version);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate(
 | 
	
		
			
				|  |  | -    RdsUpdate rds_update, std::string new_version) {
 | 
	
		
			
				|  |  | +    RdsUpdate rds_update) {
 | 
	
		
			
				|  |  |    if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
 | 
	
		
			
				|  |  |      gpr_log(GPR_INFO,
 | 
	
		
			
				|  |  |              "[xds_client %p] RDS update received: "
 | 
	
		
			
				|  |  |              "cluster_name=%s",
 | 
	
		
			
				|  |  |              xds_client(), rds_update.cluster_name.c_str());
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | +  auto& rds_state = state_map_[kRdsTypeUrl];
 | 
	
		
			
				|  |  | +  auto& state =
 | 
	
		
			
				|  |  | +      rds_state.subscribed_resources[xds_client()->route_config_name_];
 | 
	
		
			
				|  |  | +  if (state != nullptr) state->Finish();
 | 
	
		
			
				|  |  |    // Ignore identical update.
 | 
	
		
			
				|  |  |    if (xds_client()->cluster_name_ == rds_update.cluster_name) {
 | 
	
		
			
				|  |  |      if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
 | 
	
	
		
			
				|  | @@ -835,14 +944,16 @@ void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate(
 | 
	
		
			
				|  |  |    } else {
 | 
	
		
			
				|  |  |      xds_client()->service_config_watcher_->OnError(error);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  rds_version_.version_info = std::move(new_version);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate(
 | 
	
		
			
				|  |  | -    CdsUpdateMap cds_update_map, std::string new_version) {
 | 
	
		
			
				|  |  | +    CdsUpdateMap cds_update_map) {
 | 
	
		
			
				|  |  | +  auto& cds_state = state_map_[kCdsTypeUrl];
 | 
	
		
			
				|  |  |    for (auto& p : cds_update_map) {
 | 
	
		
			
				|  |  |      const char* cluster_name = p.first.c_str();
 | 
	
		
			
				|  |  |      CdsUpdate& cds_update = p.second;
 | 
	
		
			
				|  |  | +    auto& state = cds_state.subscribed_resources[cluster_name];
 | 
	
		
			
				|  |  | +    if (state != nullptr) state->Finish();
 | 
	
		
			
				|  |  |      if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
 | 
	
		
			
				|  |  |        gpr_log(GPR_INFO,
 | 
	
		
			
				|  |  |                "[xds_client %p] CDS update (cluster=%s) received: "
 | 
	
	
		
			
				|  | @@ -875,14 +986,16 @@ void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate(
 | 
	
		
			
				|  |  |        p.first->OnClusterChanged(cluster_state.update.value());
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  cds_version_.version_info = std::move(new_version);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdate(
 | 
	
		
			
				|  |  | -    EdsUpdateMap eds_update_map, std::string new_version) {
 | 
	
		
			
				|  |  | +    EdsUpdateMap eds_update_map) {
 | 
	
		
			
				|  |  | +  auto& eds_state = state_map_[kEdsTypeUrl];
 | 
	
		
			
				|  |  |    for (auto& p : eds_update_map) {
 | 
	
		
			
				|  |  |      const char* eds_service_name = p.first.c_str();
 | 
	
		
			
				|  |  |      EdsUpdate& eds_update = p.second;
 | 
	
		
			
				|  |  | +    auto& state = eds_state.subscribed_resources[eds_service_name];
 | 
	
		
			
				|  |  | +    if (state != nullptr) state->Finish();
 | 
	
		
			
				|  |  |      if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
 | 
	
		
			
				|  |  |        gpr_log(GPR_INFO,
 | 
	
		
			
				|  |  |                "[xds_client %p] EDS response with %" PRIuPTR
 | 
	
	
		
			
				|  | @@ -956,7 +1069,6 @@ void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdate(
 | 
	
		
			
				|  |  |        p.first->OnEndpointChanged(endpoint_state.update);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  eds_version_.version_info = std::move(new_version);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  void XdsClient::ChannelState::AdsCallState::OnRequestSent(void* arg,
 | 
	
	
		
			
				|  | @@ -977,22 +1089,17 @@ void XdsClient::ChannelState::AdsCallState::OnRequestSentLocked(
 | 
	
		
			
				|  |  |      self->send_message_payload_ = nullptr;
 | 
	
		
			
				|  |  |      // Continue to send another pending message if any.
 | 
	
		
			
				|  |  |      // TODO(roth): The current code to handle buffered messages has the
 | 
	
		
			
				|  |  | -    // advantage of sending only the most recent list of resource names for each
 | 
	
		
			
				|  |  | -    // resource type (no matter how many times that resource type has been
 | 
	
		
			
				|  |  | -    // requested to send while the current message sending is still pending).
 | 
	
		
			
				|  |  | -    // But its disadvantage is that we send the requests in fixed order of
 | 
	
		
			
				|  |  | -    // resource types. We need to fix this if we are seeing some resource
 | 
	
		
			
				|  |  | -    // type(s) starved due to frequent requests of other resource type(s).
 | 
	
		
			
				|  |  | -    for (auto& p : self->buffered_request_map_) {
 | 
	
		
			
				|  |  | -      const std::string& type_url = p.first;
 | 
	
		
			
				|  |  | -      std::unique_ptr<BufferedRequest>& buffered_request = p.second;
 | 
	
		
			
				|  |  | -      if (buffered_request != nullptr) {
 | 
	
		
			
				|  |  | -        self->SendMessageLocked(type_url, buffered_request->nonce,
 | 
	
		
			
				|  |  | -                                buffered_request->error, false);
 | 
	
		
			
				|  |  | -        buffered_request->error = GRPC_ERROR_NONE;
 | 
	
		
			
				|  |  | -        buffered_request.reset();
 | 
	
		
			
				|  |  | -        break;
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | +    // advantage of sending only the most recent list of resource names for
 | 
	
		
			
				|  |  | +    // each resource type (no matter how many times that resource type has
 | 
	
		
			
				|  |  | +    // been requested to send while the current message sending is still
 | 
	
		
			
				|  |  | +    // pending). But its disadvantage is that we send the requests in fixed
 | 
	
		
			
				|  |  | +    // order of resource types. We need to fix this if we are seeing some
 | 
	
		
			
				|  |  | +    // resource type(s) starved due to frequent requests of other resource
 | 
	
		
			
				|  |  | +    // type(s).
 | 
	
		
			
				|  |  | +    auto it = self->buffered_requests_.begin();
 | 
	
		
			
				|  |  | +    if (it != self->buffered_requests_.end()) {
 | 
	
		
			
				|  |  | +      self->SendMessageLocked(*it);
 | 
	
		
			
				|  |  | +      self->buffered_requests_.erase(it);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    self->Unref(DEBUG_LOCATION, "ADS+OnRequestSentLocked");
 | 
	
	
		
			
				|  | @@ -1043,8 +1150,8 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked(
 | 
	
		
			
				|  |  |    // Note that XdsAdsResponseDecodeAndParse() also validate the response.
 | 
	
		
			
				|  |  |    grpc_error* parse_error = XdsAdsResponseDecodeAndParse(
 | 
	
		
			
				|  |  |        response_slice, xds_client->server_name_, xds_client->route_config_name_,
 | 
	
		
			
				|  |  | -      xds_client->EdsServiceNames(), &lds_update, &rds_update, &cds_update_map,
 | 
	
		
			
				|  |  | -      &eds_update_map, &version, &nonce, &type_url);
 | 
	
		
			
				|  |  | +      ads_calld->EdsServiceNamesForRequest(), &lds_update, &rds_update,
 | 
	
		
			
				|  |  | +      &cds_update_map, &eds_update_map, &version, &nonce, &type_url);
 | 
	
		
			
				|  |  |    grpc_slice_unref_internal(response_slice);
 | 
	
		
			
				|  |  |    if (type_url.empty()) {
 | 
	
		
			
				|  |  |      // Ignore unparsable response.
 | 
	
	
		
			
				|  | @@ -1052,48 +1159,34 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked(
 | 
	
		
			
				|  |  |              xds_client, grpc_error_string(parse_error));
 | 
	
		
			
				|  |  |      GRPC_ERROR_UNREF(parse_error);
 | 
	
		
			
				|  |  |    } else {
 | 
	
		
			
				|  |  | -    // Update nonce and error.
 | 
	
		
			
				|  |  | -    if (type_url == kLdsTypeUrl) {
 | 
	
		
			
				|  |  | -      ads_calld->lds_version_.nonce = nonce;
 | 
	
		
			
				|  |  | -      GRPC_ERROR_UNREF(ads_calld->lds_version_.error);
 | 
	
		
			
				|  |  | -      ads_calld->lds_version_.error = GRPC_ERROR_REF(parse_error);
 | 
	
		
			
				|  |  | -    } else if (type_url == kRdsTypeUrl) {
 | 
	
		
			
				|  |  | -      ads_calld->rds_version_.nonce = nonce;
 | 
	
		
			
				|  |  | -      GRPC_ERROR_UNREF(ads_calld->rds_version_.error);
 | 
	
		
			
				|  |  | -      ads_calld->rds_version_.error = GRPC_ERROR_REF(parse_error);
 | 
	
		
			
				|  |  | -    } else if (type_url == kCdsTypeUrl) {
 | 
	
		
			
				|  |  | -      ads_calld->cds_version_.nonce = nonce;
 | 
	
		
			
				|  |  | -      GRPC_ERROR_UNREF(ads_calld->cds_version_.error);
 | 
	
		
			
				|  |  | -      ads_calld->cds_version_.error = GRPC_ERROR_REF(parse_error);
 | 
	
		
			
				|  |  | -    } else if (type_url == kEdsTypeUrl) {
 | 
	
		
			
				|  |  | -      ads_calld->eds_version_.nonce = nonce;
 | 
	
		
			
				|  |  | -      GRPC_ERROR_UNREF(ads_calld->eds_version_.error);
 | 
	
		
			
				|  |  | -      ads_calld->eds_version_.error = GRPC_ERROR_REF(parse_error);
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | +    // Update nonce.
 | 
	
		
			
				|  |  | +    auto& state = ads_calld->state_map_[type_url];
 | 
	
		
			
				|  |  | +    state.nonce = std::move(nonce);
 | 
	
		
			
				|  |  |      // NACK or ACK the response.
 | 
	
		
			
				|  |  |      if (parse_error != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | +      GRPC_ERROR_UNREF(state.error);
 | 
	
		
			
				|  |  | +      state.error = parse_error;
 | 
	
		
			
				|  |  |        // NACK unacceptable update.
 | 
	
		
			
				|  |  |        gpr_log(
 | 
	
		
			
				|  |  |            GPR_ERROR,
 | 
	
		
			
				|  |  |            "[xds_client %p] ADS response can't be accepted, NACKing. error=%s",
 | 
	
		
			
				|  |  |            xds_client, grpc_error_string(parse_error));
 | 
	
		
			
				|  |  | -      ads_calld->SendMessageLocked(type_url, nonce, parse_error, false);
 | 
	
		
			
				|  |  | +      ads_calld->SendMessageLocked(type_url);
 | 
	
		
			
				|  |  |      } else {
 | 
	
		
			
				|  |  |        ads_calld->seen_response_ = true;
 | 
	
		
			
				|  |  |        // Accept the ADS response according to the type_url.
 | 
	
		
			
				|  |  |        if (type_url == kLdsTypeUrl) {
 | 
	
		
			
				|  |  | -        ads_calld->AcceptLdsUpdate(std::move(lds_update), std::move(version));
 | 
	
		
			
				|  |  | +        ads_calld->AcceptLdsUpdate(std::move(lds_update));
 | 
	
		
			
				|  |  |        } else if (type_url == kRdsTypeUrl) {
 | 
	
		
			
				|  |  | -        ads_calld->AcceptRdsUpdate(std::move(rds_update), std::move(version));
 | 
	
		
			
				|  |  | +        ads_calld->AcceptRdsUpdate(std::move(rds_update));
 | 
	
		
			
				|  |  |        } else if (type_url == kCdsTypeUrl) {
 | 
	
		
			
				|  |  | -        ads_calld->AcceptCdsUpdate(std::move(cds_update_map),
 | 
	
		
			
				|  |  | -                                   std::move(version));
 | 
	
		
			
				|  |  | +        ads_calld->AcceptCdsUpdate(std::move(cds_update_map));
 | 
	
		
			
				|  |  |        } else if (type_url == kEdsTypeUrl) {
 | 
	
		
			
				|  |  | -        ads_calld->AcceptEdsUpdate(std::move(eds_update_map),
 | 
	
		
			
				|  |  | -                                   std::move(version));
 | 
	
		
			
				|  |  | +        ads_calld->AcceptEdsUpdate(std::move(eds_update_map));
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  | +      state.version = std::move(version);
 | 
	
		
			
				|  |  |        // ACK the update.
 | 
	
		
			
				|  |  | -      ads_calld->SendMessageLocked(type_url, nonce, nullptr, false);
 | 
	
		
			
				|  |  | +      ads_calld->SendMessageLocked(type_url);
 | 
	
		
			
				|  |  |        // Start load reporting if needed.
 | 
	
		
			
				|  |  |        auto& lrs_call = ads_calld->chand()->lrs_calld_;
 | 
	
		
			
				|  |  |        if (lrs_call != nullptr) {
 | 
	
	
		
			
				|  | @@ -1164,6 +1257,28 @@ bool XdsClient::ChannelState::AdsCallState::IsCurrentCallOnChannel() const {
 | 
	
		
			
				|  |  |    return this == chand()->ads_calld_->calld();
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +std::set<StringView>
 | 
	
		
			
				|  |  | +XdsClient::ChannelState::AdsCallState::ClusterNamesForRequest() {
 | 
	
		
			
				|  |  | +  std::set<StringView> cluster_names;
 | 
	
		
			
				|  |  | +  for (auto& p : state_map_[kCdsTypeUrl].subscribed_resources) {
 | 
	
		
			
				|  |  | +    cluster_names.insert(p.first);
 | 
	
		
			
				|  |  | +    OrphanablePtr<ResourceState>& state = p.second;
 | 
	
		
			
				|  |  | +    state->Start(Ref());
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  return cluster_names;
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +std::set<StringView>
 | 
	
		
			
				|  |  | +XdsClient::ChannelState::AdsCallState::EdsServiceNamesForRequest() {
 | 
	
		
			
				|  |  | +  std::set<StringView> eds_names;
 | 
	
		
			
				|  |  | +  for (auto& p : state_map_[kEdsTypeUrl].subscribed_resources) {
 | 
	
		
			
				|  |  | +    eds_names.insert(p.first);
 | 
	
		
			
				|  |  | +    OrphanablePtr<ResourceState>& state = p.second;
 | 
	
		
			
				|  |  | +    state->Start(Ref());
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  return eds_names;
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  //
 | 
	
		
			
				|  |  |  // XdsClient::ChannelState::LrsCallState::Reporter
 | 
	
		
			
				|  |  |  //
 | 
	
	
		
			
				|  | @@ -1584,6 +1699,12 @@ bool XdsClient::ChannelState::LrsCallState::IsCurrentCallOnChannel() const {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  namespace {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +grpc_millis GetRequestTimeout(const grpc_channel_args& args) {
 | 
	
		
			
				|  |  | +  return grpc_channel_args_find_integer(
 | 
	
		
			
				|  |  | +      &args, GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS,
 | 
	
		
			
				|  |  | +      {15000, 0, INT_MAX});
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  UniquePtr<char> GenerateBuildVersionString() {
 | 
	
		
			
				|  |  |    char* build_version_str;
 | 
	
		
			
				|  |  |    gpr_asprintf(&build_version_str, "gRPC C-core %s %s", grpc_version_string(),
 | 
	
	
		
			
				|  | @@ -1598,6 +1719,7 @@ XdsClient::XdsClient(Combiner* combiner, grpc_pollset_set* interested_parties,
 | 
	
		
			
				|  |  |                       std::unique_ptr<ServiceConfigWatcherInterface> watcher,
 | 
	
		
			
				|  |  |                       const grpc_channel_args& channel_args, grpc_error** error)
 | 
	
		
			
				|  |  |      : InternallyRefCounted<XdsClient>(&grpc_xds_client_trace),
 | 
	
		
			
				|  |  | +      request_timeout_(GetRequestTimeout(channel_args)),
 | 
	
		
			
				|  |  |        build_version_(GenerateBuildVersionString()),
 | 
	
		
			
				|  |  |        combiner_(GRPC_COMBINER_REF(combiner, "xds_client")),
 | 
	
		
			
				|  |  |        interested_parties_(interested_parties),
 | 
	
	
		
			
				|  | @@ -1618,7 +1740,7 @@ XdsClient::XdsClient(Combiner* combiner, grpc_pollset_set* interested_parties,
 | 
	
		
			
				|  |  |    chand_ = MakeOrphanable<ChannelState>(
 | 
	
		
			
				|  |  |        Ref(DEBUG_LOCATION, "XdsClient+ChannelState"), channel_args);
 | 
	
		
			
				|  |  |    if (service_config_watcher_ != nullptr) {
 | 
	
		
			
				|  |  | -    chand_->OnResourceNamesChanged(kLdsTypeUrl);
 | 
	
		
			
				|  |  | +    chand_->Subscribe(kLdsTypeUrl, std::string(server_name));
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -1634,8 +1756,8 @@ void XdsClient::Orphan() {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  void XdsClient::WatchClusterData(
 | 
	
		
			
				|  |  |      StringView cluster_name, std::unique_ptr<ClusterWatcherInterface> watcher) {
 | 
	
		
			
				|  |  | -  const bool new_name = cluster_map_.find(cluster_name) == cluster_map_.end();
 | 
	
		
			
				|  |  | -  ClusterState& cluster_state = cluster_map_[cluster_name];
 | 
	
		
			
				|  |  | +  std::string cluster_name_str = std::string(cluster_name);
 | 
	
		
			
				|  |  | +  ClusterState& cluster_state = cluster_map_[cluster_name_str];
 | 
	
		
			
				|  |  |    ClusterWatcherInterface* w = watcher.get();
 | 
	
		
			
				|  |  |    cluster_state.watchers[w] = std::move(watcher);
 | 
	
		
			
				|  |  |    // If we've already received an CDS update, notify the new watcher
 | 
	
	
		
			
				|  | @@ -1643,30 +1765,29 @@ void XdsClient::WatchClusterData(
 | 
	
		
			
				|  |  |    if (cluster_state.update.has_value()) {
 | 
	
		
			
				|  |  |      w->OnClusterChanged(cluster_state.update.value());
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  if (new_name) chand_->OnResourceNamesChanged(kCdsTypeUrl);
 | 
	
		
			
				|  |  | +  chand_->Subscribe(kCdsTypeUrl, cluster_name_str);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  void XdsClient::CancelClusterDataWatch(StringView cluster_name,
 | 
	
		
			
				|  |  |                                         ClusterWatcherInterface* watcher) {
 | 
	
		
			
				|  |  |    if (shutting_down_) return;
 | 
	
		
			
				|  |  | -  ClusterState& cluster_state = cluster_map_[cluster_name];
 | 
	
		
			
				|  |  | +  std::string cluster_name_str = std::string(cluster_name);
 | 
	
		
			
				|  |  | +  ClusterState& cluster_state = cluster_map_[cluster_name_str];
 | 
	
		
			
				|  |  |    auto it = cluster_state.watchers.find(watcher);
 | 
	
		
			
				|  |  |    if (it != cluster_state.watchers.end()) {
 | 
	
		
			
				|  |  |      cluster_state.watchers.erase(it);
 | 
	
		
			
				|  |  |      if (cluster_state.watchers.empty()) {
 | 
	
		
			
				|  |  | -      cluster_map_.erase(cluster_name);
 | 
	
		
			
				|  |  | -      chand_->OnResourceNamesChanged(kCdsTypeUrl);
 | 
	
		
			
				|  |  | +      cluster_map_.erase(cluster_name_str);
 | 
	
		
			
				|  |  | +      chand_->Unsubscribe(kCdsTypeUrl, cluster_name_str);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  chand_->OnWatcherRemoved();
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  void XdsClient::WatchEndpointData(
 | 
	
		
			
				|  |  |      StringView eds_service_name,
 | 
	
		
			
				|  |  |      std::unique_ptr<EndpointWatcherInterface> watcher) {
 | 
	
		
			
				|  |  | -  const bool new_name =
 | 
	
		
			
				|  |  | -      endpoint_map_.find(eds_service_name) == endpoint_map_.end();
 | 
	
		
			
				|  |  | -  EndpointState& endpoint_state = endpoint_map_[eds_service_name];
 | 
	
		
			
				|  |  | +  std::string eds_service_name_str = std::string(eds_service_name);
 | 
	
		
			
				|  |  | +  EndpointState& endpoint_state = endpoint_map_[eds_service_name_str];
 | 
	
		
			
				|  |  |    EndpointWatcherInterface* w = watcher.get();
 | 
	
		
			
				|  |  |    endpoint_state.watchers[w] = std::move(watcher);
 | 
	
		
			
				|  |  |    // If we've already received an EDS update, notify the new watcher
 | 
	
	
		
			
				|  | @@ -1674,28 +1795,28 @@ void XdsClient::WatchEndpointData(
 | 
	
		
			
				|  |  |    if (!endpoint_state.update.priority_list_update.empty()) {
 | 
	
		
			
				|  |  |      w->OnEndpointChanged(endpoint_state.update);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  if (new_name) chand_->OnResourceNamesChanged(kEdsTypeUrl);
 | 
	
		
			
				|  |  | +  chand_->Subscribe(kEdsTypeUrl, eds_service_name_str);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  void XdsClient::CancelEndpointDataWatch(StringView eds_service_name,
 | 
	
		
			
				|  |  |                                          EndpointWatcherInterface* watcher) {
 | 
	
		
			
				|  |  |    if (shutting_down_) return;
 | 
	
		
			
				|  |  | -  EndpointState& endpoint_state = endpoint_map_[eds_service_name];
 | 
	
		
			
				|  |  | +  std::string eds_service_name_str = std::string(eds_service_name);
 | 
	
		
			
				|  |  | +  EndpointState& endpoint_state = endpoint_map_[eds_service_name_str];
 | 
	
		
			
				|  |  |    auto it = endpoint_state.watchers.find(watcher);
 | 
	
		
			
				|  |  |    if (it != endpoint_state.watchers.end()) {
 | 
	
		
			
				|  |  |      endpoint_state.watchers.erase(it);
 | 
	
		
			
				|  |  |      if (endpoint_state.watchers.empty()) {
 | 
	
		
			
				|  |  | -      endpoint_map_.erase(eds_service_name);
 | 
	
		
			
				|  |  | -      chand_->OnResourceNamesChanged(kEdsTypeUrl);
 | 
	
		
			
				|  |  | +      endpoint_map_.erase(eds_service_name_str);
 | 
	
		
			
				|  |  | +      chand_->Unsubscribe(kEdsTypeUrl, eds_service_name_str);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  chand_->OnWatcherRemoved();
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  void XdsClient::AddClientStats(StringView /*lrs_server*/,
 | 
	
		
			
				|  |  |                                 StringView cluster_name,
 | 
	
		
			
				|  |  |                                 XdsClientStats* client_stats) {
 | 
	
		
			
				|  |  | -  EndpointState& endpoint_state = endpoint_map_[cluster_name];
 | 
	
		
			
				|  |  | +  EndpointState& endpoint_state = endpoint_map_[std::string(cluster_name)];
 | 
	
		
			
				|  |  |    // TODO(roth): When we add support for direct federation, use the
 | 
	
		
			
				|  |  |    // server name specified in lrs_server.
 | 
	
		
			
				|  |  |    endpoint_state.client_stats.insert(client_stats);
 | 
	
	
		
			
				|  | @@ -1705,7 +1826,7 @@ void XdsClient::AddClientStats(StringView /*lrs_server*/,
 | 
	
		
			
				|  |  |  void XdsClient::RemoveClientStats(StringView /*lrs_server*/,
 | 
	
		
			
				|  |  |                                    StringView cluster_name,
 | 
	
		
			
				|  |  |                                    XdsClientStats* client_stats) {
 | 
	
		
			
				|  |  | -  EndpointState& endpoint_state = endpoint_map_[cluster_name];
 | 
	
		
			
				|  |  | +  EndpointState& endpoint_state = endpoint_map_[std::string(cluster_name)];
 | 
	
		
			
				|  |  |    // TODO(roth): When we add support for direct federation, use the
 | 
	
		
			
				|  |  |    // server name specified in lrs_server.
 | 
	
		
			
				|  |  |    // TODO(roth): In principle, we should try to send a final load report
 | 
	
	
		
			
				|  | @@ -1745,32 +1866,11 @@ grpc_error* XdsClient::CreateServiceConfig(
 | 
	
		
			
				|  |  |    return error;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -std::set<StringView> XdsClient::WatchedClusterNames() const {
 | 
	
		
			
				|  |  | -  std::set<StringView> cluster_names;
 | 
	
		
			
				|  |  | -  for (const auto& p : cluster_map_) {
 | 
	
		
			
				|  |  | -    const StringView& cluster_name = p.first;
 | 
	
		
			
				|  |  | -    const ClusterState& cluster_state = p.second;
 | 
	
		
			
				|  |  | -    // Don't request for the clusters that are cached before watched.
 | 
	
		
			
				|  |  | -    if (cluster_state.watchers.empty()) continue;
 | 
	
		
			
				|  |  | -    cluster_names.emplace(cluster_name);
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  return cluster_names;
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -std::set<StringView> XdsClient::EdsServiceNames() const {
 | 
	
		
			
				|  |  | -  std::set<StringView> eds_service_names;
 | 
	
		
			
				|  |  | -  for (const auto& p : endpoint_map_) {
 | 
	
		
			
				|  |  | -    const StringView& eds_service_name = p.first;
 | 
	
		
			
				|  |  | -    eds_service_names.emplace(eds_service_name);
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  return eds_service_names;
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -std::map<StringView, std::set<XdsClientStats*>> XdsClient::ClientStatsMap()
 | 
	
		
			
				|  |  | -    const {
 | 
	
		
			
				|  |  | -  std::map<StringView, std::set<XdsClientStats*>> client_stats_map;
 | 
	
		
			
				|  |  | +std::map<StringView, std::set<XdsClientStats*>, StringLess>
 | 
	
		
			
				|  |  | +XdsClient::ClientStatsMap() const {
 | 
	
		
			
				|  |  | +  std::map<StringView, std::set<XdsClientStats*>, StringLess> client_stats_map;
 | 
	
		
			
				|  |  |    for (const auto& p : endpoint_map_) {
 | 
	
		
			
				|  |  | -    const StringView& cluster_name = p.first;
 | 
	
		
			
				|  |  | +    const StringView cluster_name = p.first;
 | 
	
		
			
				|  |  |      const auto& client_stats = p.second.client_stats;
 | 
	
		
			
				|  |  |      if (chand_->lrs_calld()->ShouldSendLoadReports(cluster_name)) {
 | 
	
		
			
				|  |  |        client_stats_map.emplace(cluster_name, client_stats);
 |