Pārlūkot izejas kodu

Merge pull request #23938 from markdroth/xds_client_api_multiple_listeners_and_route_configs

Change XdsClient to support multiple LDS and RDS watchers
Mark D. Roth 5 gadi atpakaļ
vecāks
revīzija
f31a322e1c

+ 4 - 0
src/core/ext/filters/client_channel/client_channel.cc

@@ -1443,6 +1443,10 @@ ChannelData::ChannelConfigHelper::ApplyServiceConfig(
   // If resolver did not return a service config or returned an invalid service
   // config, we need a fallback service config.
   if (result.service_config_error != GRPC_ERROR_NONE) {
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+      gpr_log(GPR_INFO, "chand=%p: resolver returned service config error: %s",
+              chand_, grpc_error_string(result.service_config_error));
+    }
     // If the service config was invalid, then fallback to the saved service
     // config. If there is no saved config either, use the default service
     // config.

+ 3 - 2
src/core/ext/filters/client_channel/lb_policy/xds/cds.cc

@@ -288,7 +288,6 @@ CdsLb::~CdsLb() {
   if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
     gpr_log(GPR_INFO, "[cdslb %p] destroying cds LB policy", this);
   }
-  grpc_channel_args_destroy(args_);
 }
 
 void CdsLb::ShutdownLocked() {
@@ -305,8 +304,10 @@ void CdsLb::ShutdownLocked() {
       }
       xds_client_->CancelClusterDataWatch(config_->cluster(), cluster_watcher_);
     }
-    xds_client_.reset();
+    xds_client_.reset(DEBUG_LOCATION, "CdsLb");
   }
+  grpc_channel_args_destroy(args_);
+  args_ = nullptr;
 }
 
 void CdsLb::MaybeDestroyChildPolicyLocked() {

+ 11 - 5
src/core/ext/filters/client_channel/lb_policy/xds/eds.cc

@@ -400,7 +400,6 @@ EdsLb::~EdsLb() {
   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) {
     gpr_log(GPR_INFO, "[edslb %p] destroying xds LB policy", this);
   }
-  grpc_channel_args_destroy(args_);
 }
 
 void EdsLb::ShutdownLocked() {
@@ -426,9 +425,15 @@ void EdsLb::ShutdownLocked() {
       xds_client()->CancelEndpointDataWatch(GetEdsResourceName(),
                                             endpoint_watcher_);
     }
-    xds_client_from_channel_.reset();
+    xds_client_from_channel_.reset(DEBUG_LOCATION, "EdsLb");
+  }
+  if (xds_client_ != nullptr) {
+    grpc_pollset_set_del_pollset_set(xds_client_->interested_parties(),
+                                     interested_parties());
+    xds_client_.reset();
   }
-  xds_client_.reset();
+  grpc_channel_args_destroy(args_);
+  args_ = nullptr;
 }
 
 void EdsLb::MaybeDestroyChildPolicyLocked() {
@@ -456,11 +461,12 @@ void EdsLb::UpdateLocked(UpdateArgs args) {
     if (xds_client_from_channel_ == nullptr) {
       grpc_error* error = GRPC_ERROR_NONE;
       xds_client_ = MakeOrphanable<XdsClient>(
-          work_serializer(), interested_parties(), GetEdsResourceName(),
-          nullptr /* service config watcher */, *args_, &error);
+          work_serializer(), GetEdsResourceName(), *args_, &error);
       // TODO(roth): If we decide that we care about EDS-only mode, add
       // proper error handling here.
       GPR_ASSERT(error == GRPC_ERROR_NONE);
+      grpc_pollset_set_add_pollset_set(xds_client_->interested_parties(),
+                                       interested_parties());
       if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) {
         gpr_log(GPR_INFO, "[edslb %p] Created xds client %p", this,
                 xds_client_.get());

+ 137 - 45
src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc

@@ -66,19 +66,26 @@ class XdsResolver : public Resolver {
 
   void StartLocked() override;
 
-  void ShutdownLocked() override {
-    if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
-      gpr_log(GPR_INFO, "[xds_resolver %p] shutting down", this);
-    }
-    xds_client_.reset();
-  }
+  void ShutdownLocked() override;
 
  private:
   class ListenerWatcher : public XdsClient::ListenerWatcherInterface {
    public:
     explicit ListenerWatcher(RefCountedPtr<XdsResolver> resolver)
         : resolver_(std::move(resolver)) {}
-    void OnListenerChanged(std::vector<XdsApi::Route> routes) override;
+    void OnListenerChanged(XdsApi::LdsUpdate listener) override;
+    void OnError(grpc_error* error) override;
+    void OnResourceDoesNotExist() override;
+
+   private:
+    RefCountedPtr<XdsResolver> resolver_;
+  };
+
+  class RouteConfigWatcher : public XdsClient::RouteConfigWatcherInterface {
+   public:
+    explicit RouteConfigWatcher(RefCountedPtr<XdsResolver> resolver)
+        : resolver_(std::move(resolver)) {}
+    void OnRouteConfigChanged(XdsApi::RdsUpdate route_config) override;
     void OnError(grpc_error* error) override;
     void OnResourceDoesNotExist() override;
 
@@ -125,16 +132,21 @@ class XdsResolver : public Resolver {
     std::map<absl::string_view, RefCountedPtr<ClusterState>> clusters_;
   };
 
-  void OnListenerChanged(std::vector<XdsApi::Route> routes);
-  grpc_error* CreateServiceConfig(RefCountedPtr<ServiceConfig>* service_config);
+  void OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update);
   void OnError(grpc_error* error);
-  void PropagateUpdate();
+  void OnResourceDoesNotExist();
+
+  grpc_error* CreateServiceConfig(RefCountedPtr<ServiceConfig>* service_config);
+  void GenerateResult();
   void MaybeRemoveUnusedClusters();
 
   std::string server_name_;
   const grpc_channel_args* args_;
   grpc_pollset_set* interested_parties_;
   OrphanablePtr<XdsClient> xds_client_;
+  XdsClient::ListenerWatcherInterface* listener_watcher_ = nullptr;
+  std::string route_config_name_;
+  XdsClient::RouteConfigWatcherInterface* route_config_watcher_ = nullptr;
   ClusterState::ClusterStateMap cluster_state_map_;
   std::vector<XdsApi::Route> current_update_;
 };
@@ -144,34 +156,69 @@ class XdsResolver : public Resolver {
 //
 
 void XdsResolver::ListenerWatcher::OnListenerChanged(
-    std::vector<XdsApi::Route> routes) {
+    XdsApi::LdsUpdate listener) {
   if (resolver_->xds_client_ == nullptr) return;
   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
     gpr_log(GPR_INFO, "[xds_resolver %p] received updated listener data",
             resolver_.get());
   }
-  resolver_->OnListenerChanged(std::move(routes));
+  if (listener.route_config_name != resolver_->route_config_name_) {
+    if (resolver_->route_config_watcher_ != nullptr) {
+      resolver_->xds_client_->CancelRouteConfigDataWatch(
+          resolver_->route_config_name_, resolver_->route_config_watcher_,
+          /*delay_unsubscription=*/!listener.route_config_name.empty());
+      resolver_->route_config_watcher_ = nullptr;
+    }
+    resolver_->route_config_name_ = std::move(listener.route_config_name);
+    if (!resolver_->route_config_name_.empty()) {
+      auto watcher = absl::make_unique<RouteConfigWatcher>(resolver_->Ref());
+      resolver_->route_config_watcher_ = watcher.get();
+      resolver_->xds_client_->WatchRouteConfigData(
+          resolver_->route_config_name_, std::move(watcher));
+    }
+  }
+  if (resolver_->route_config_name_.empty()) {
+    GPR_ASSERT(listener.rds_update.has_value());
+    resolver_->OnRouteConfigUpdate(std::move(*listener.rds_update));
+  }
 }
 
 void XdsResolver::ListenerWatcher::OnError(grpc_error* error) {
   if (resolver_->xds_client_ == nullptr) return;
-  gpr_log(GPR_ERROR, "[xds_resolver %p] received error: %s", resolver_.get(),
-          grpc_error_string(error));
+  gpr_log(GPR_ERROR, "[xds_resolver %p] received listener error: %s",
+          resolver_.get(), grpc_error_string(error));
   resolver_->OnError(error);
 }
 
 void XdsResolver::ListenerWatcher::OnResourceDoesNotExist() {
   if (resolver_->xds_client_ == nullptr) return;
-  gpr_log(GPR_ERROR,
-          "[xds_resolver %p] LDS/RDS resource does not exist -- returning "
-          "empty service config",
-          resolver_.get());
-  Result result;
-  result.service_config =
-      ServiceConfig::Create("{}", &result.service_config_error);
-  GPR_ASSERT(result.service_config != nullptr);
-  result.args = grpc_channel_args_copy(resolver_->args_);
-  resolver_->result_handler()->ReturnResult(std::move(result));
+  resolver_->OnResourceDoesNotExist();
+}
+
+//
+// XdsResolver::RouteConfigWatcher
+//
+
+void XdsResolver::RouteConfigWatcher::OnRouteConfigChanged(
+    XdsApi::RdsUpdate route_config) {
+  if (resolver_->xds_client_ == nullptr) return;
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
+    gpr_log(GPR_INFO, "[xds_resolver %p] received updated route config data",
+            resolver_.get());
+  }
+  resolver_->OnRouteConfigUpdate(std::move(route_config));
+}
+
+void XdsResolver::RouteConfigWatcher::OnError(grpc_error* error) {
+  if (resolver_->xds_client_ == nullptr) return;
+  gpr_log(GPR_ERROR, "[xds_resolver %p] received route config error: %s",
+          resolver_.get(), grpc_error_string(error));
+  resolver_->OnError(error);
+}
+
+void XdsResolver::RouteConfigWatcher::OnResourceDoesNotExist() {
+  if (resolver_->xds_client_ == nullptr) return;
+  resolver_->OnResourceDoesNotExist();
 }
 
 //
@@ -420,24 +467,78 @@ ConfigSelector::CallConfig XdsResolver::XdsConfigSelector::GetCallConfig(
 
 void XdsResolver::StartLocked() {
   grpc_error* error = GRPC_ERROR_NONE;
-  xds_client_ = MakeOrphanable<XdsClient>(
-      work_serializer(), interested_parties_, server_name_,
-      absl::make_unique<ListenerWatcher>(Ref()), *args_, &error);
+  xds_client_ = MakeOrphanable<XdsClient>(work_serializer(), server_name_,
+                                          *args_, &error);
   if (error != GRPC_ERROR_NONE) {
     gpr_log(GPR_ERROR,
             "Failed to create xds client -- channel will remain in "
             "TRANSIENT_FAILURE: %s",
             grpc_error_string(error));
     result_handler()->ReturnError(error);
+    return;
+  }
+  grpc_pollset_set_add_pollset_set(xds_client_->interested_parties(),
+                                   interested_parties_);
+  auto watcher = absl::make_unique<ListenerWatcher>(Ref());
+  listener_watcher_ = watcher.get();
+  xds_client_->WatchListenerData(server_name_, std::move(watcher));
+}
+
+void XdsResolver::ShutdownLocked() {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
+    gpr_log(GPR_INFO, "[xds_resolver %p] shutting down", this);
+  }
+  if (xds_client_ != nullptr) {
+    if (listener_watcher_ != nullptr) {
+      xds_client_->CancelListenerDataWatch(server_name_, listener_watcher_,
+                                           /*delay_unsubscription=*/false);
+    }
+    if (route_config_watcher_ != nullptr) {
+      xds_client_->CancelRouteConfigDataWatch(
+          server_name_, route_config_watcher_, /*delay_unsubscription=*/false);
+    }
+    grpc_pollset_set_del_pollset_set(xds_client_->interested_parties(),
+                                     interested_parties_);
+    xds_client_.reset();
+  }
+}
+
+void XdsResolver::OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update) {
+  // Find the relevant VirtualHost from the RouteConfiguration.
+  XdsApi::RdsUpdate::VirtualHost* vhost =
+      rds_update.FindVirtualHostForDomain(server_name_);
+  if (vhost == nullptr) {
+    OnError(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
+        absl::StrCat("could not find VirtualHost for ", server_name_,
+                     " in RouteConfiguration")
+            .c_str()));
+    return;
   }
+  // Save the list of routes in the resolver.
+  current_update_ = std::move(vhost->routes);
+  // Send a new result to the channel.
+  GenerateResult();
+}
+
+void XdsResolver::OnError(grpc_error* error) {
+  grpc_arg xds_client_arg = xds_client_->MakeChannelArg();
+  Result result;
+  result.args = grpc_channel_args_copy_and_add(args_, &xds_client_arg, 1);
+  result.service_config_error = error;
+  result_handler()->ReturnResult(std::move(result));
 }
 
-void XdsResolver::OnListenerChanged(std::vector<XdsApi::Route> routes) {
-  // Save the update in the resolver.
-  current_update_ = std::move(routes);
-  // Propagate the update by creating XdsConfigSelector, CreateServiceConfig,
-  // and ReturnResult.
-  PropagateUpdate();
+void XdsResolver::OnResourceDoesNotExist() {
+  gpr_log(GPR_ERROR,
+          "[xds_resolver %p] LDS/RDS resource does not exist -- returning "
+          "empty service config",
+          this);
+  Result result;
+  result.service_config =
+      ServiceConfig::Create("{}", &result.service_config_error);
+  GPR_ASSERT(result.service_config != nullptr);
+  result.args = grpc_channel_args_copy(args_);
+  result_handler()->ReturnResult(std::move(result));
 }
 
 grpc_error* XdsResolver::CreateServiceConfig(
@@ -472,15 +573,7 @@ grpc_error* XdsResolver::CreateServiceConfig(
   return error;
 }
 
-void XdsResolver::OnError(grpc_error* error) {
-  grpc_arg xds_client_arg = xds_client_->MakeChannelArg();
-  Result result;
-  result.args = grpc_channel_args_copy_and_add(args_, &xds_client_arg, 1);
-  result.service_config_error = error;
-  result_handler()->ReturnResult(std::move(result));
-}
-
-void XdsResolver::PropagateUpdate() {
+void XdsResolver::GenerateResult() {
   // First create XdsConfigSelector, which may add new entries to the cluster
   // state map, and then CreateServiceConfig for LB policies.
   auto config_selector =
@@ -516,9 +609,8 @@ void XdsResolver::MaybeRemoveUnusedClusters() {
     }
   }
   if (update_needed && xds_client_ != nullptr) {
-    // Propagate the update by creating XdsConfigSelector, CreateServiceConfig,
-    // and ReturnResult.
-    PropagateUpdate();
+    // Send a new result to the channel.
+    GenerateResult();
   }
 }
 

+ 4 - 0
src/core/ext/filters/client_channel/resolving_lb_policy.cc

@@ -173,6 +173,10 @@ ResolvingLoadBalancingPolicy::~ResolvingLoadBalancingPolicy() {
 
 void ResolvingLoadBalancingPolicy::ShutdownLocked() {
   if (resolver_ != nullptr) {
+    if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
+      gpr_log(GPR_INFO, "resolving_lb=%p: shutting down resolver=%p", this,
+              resolver_.get());
+    }
     resolver_.reset();
     if (lb_policy_ != nullptr) {
       if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {

+ 49 - 34
src/core/ext/xds/xds_api.cc

@@ -228,8 +228,8 @@ std::string XdsApi::Route::Matchers::HeaderMatcher::ToString() const {
 std::string XdsApi::Route::Matchers::ToString() const {
   std::vector<std::string> contents;
   contents.push_back(path_matcher.ToString());
-  for (const auto& header_it : header_matchers) {
-    contents.push_back(header_it.ToString());
+  for (const HeaderMatcher& header_matcher : header_matchers) {
+    contents.push_back(header_matcher.ToString());
   }
   if (fraction_per_million.has_value()) {
     contents.push_back(absl::StrFormat("Fraction Per Million %d",
@@ -248,8 +248,8 @@ std::string XdsApi::Route::ToString() const {
   if (!cluster_name.empty()) {
     contents.push_back(absl::StrFormat("Cluster name: %s", cluster_name));
   }
-  for (const auto& weighted_it : weighted_clusters) {
-    contents.push_back(weighted_it.ToString());
+  for (const ClusterWeight& cluster_weight : weighted_clusters) {
+    contents.push_back(cluster_weight.ToString());
   }
   return absl::StrJoin(contents, "\n");
 }
@@ -333,8 +333,8 @@ MatchType DomainPatternMatchType(const std::string& domain_pattern) {
 
 }  // namespace
 
-const XdsApi::RdsUpdate::VirtualHost*
-XdsApi::RdsUpdate::FindVirtualHostForDomain(const std::string& domain) const {
+XdsApi::RdsUpdate::VirtualHost* XdsApi::RdsUpdate::FindVirtualHostForDomain(
+    const std::string& domain) {
   // Find the best matched virtual host.
   // The search order for 4 groups of domain patterns:
   //   1. Exact match.
@@ -344,12 +344,12 @@ XdsApi::RdsUpdate::FindVirtualHostForDomain(const std::string& domain) const {
   // Within each group, longest match wins.
   // If the same best matched domain pattern appears in multiple virtual hosts,
   // the first matched virtual host wins.
-  const VirtualHost* target_vhost = nullptr;
+  VirtualHost* target_vhost = nullptr;
   MatchType best_match_type = INVALID_MATCH;
   size_t longest_match = 0;
   // Check each domain pattern in each virtual host to determine the best
   // matched virtual host.
-  for (const VirtualHost& vhost : virtual_hosts) {
+  for (VirtualHost& vhost : virtual_hosts) {
     for (const std::string& domain_pattern : vhost.domains) {
       // Check the match type first. Skip the pattern if it's not better than
       // current match.
@@ -1568,7 +1568,9 @@ grpc_error* RouteConfigParse(
       std::string domain_pattern = UpbStringToStdString(domains[j]);
       const MatchType match_type = DomainPatternMatchType(domain_pattern);
       if (match_type == INVALID_MATCH) {
-        return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Invalid domain pattern.");
+        return GRPC_ERROR_CREATE_FROM_COPIED_STRING(
+            absl::StrCat("Invalid domain pattern \"", domain_pattern, "\".")
+                .c_str());
       }
       vhost.domains.emplace_back(std::move(domain_pattern));
     }
@@ -1624,8 +1626,8 @@ grpc_error* RouteConfigParse(
 grpc_error* LdsResponseParse(
     XdsClient* client, TraceFlag* tracer,
     const envoy_service_discovery_v3_DiscoveryResponse* response,
-    const std::string& expected_server_name,
-    absl::optional<XdsApi::LdsUpdate>* lds_update, upb_arena* arena) {
+    const std::set<absl::string_view>& expected_listener_names,
+    XdsApi::LdsUpdateMap* lds_update_map, upb_arena* arena) {
   // Get the resources from the response.
   size_t size;
   const google_protobuf_Any* const* resources =
@@ -1647,9 +1649,19 @@ grpc_error* LdsResponseParse(
       return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Can't decode listener.");
     }
     // Check listener name. Ignore unexpected listeners.
-    absl::string_view name =
-        UpbStringToAbsl(envoy_config_listener_v3_Listener_name(listener));
-    if (name != expected_server_name) continue;
+    std::string listener_name =
+        UpbStringToStdString(envoy_config_listener_v3_Listener_name(listener));
+    if (expected_listener_names.find(listener_name) ==
+        expected_listener_names.end()) {
+      continue;
+    }
+    // Fail if listener name is duplicated.
+    if (lds_update_map->find(listener_name) != lds_update_map->end()) {
+      return GRPC_ERROR_CREATE_FROM_COPIED_STRING(
+          absl::StrCat("duplicate listener name \"", listener_name, "\"")
+              .c_str());
+    }
+    XdsApi::LdsUpdate& lds_update = (*lds_update_map)[listener_name];
     // Get api_listener and decode it to http_connection_manager.
     const envoy_config_listener_v3_ApiListener* api_listener =
         envoy_config_listener_v3_Listener_api_listener(listener);
@@ -1677,9 +1689,8 @@ grpc_error* LdsResponseParse(
       grpc_error* error =
           RouteConfigParse(client, tracer, route_config, &rds_update);
       if (error != GRPC_ERROR_NONE) return error;
-      lds_update->emplace();
-      (*lds_update)->rds_update = std::move(rds_update);
-      return GRPC_ERROR_NONE;
+      lds_update.rds_update = std::move(rds_update);
+      continue;
     }
     // Validate that RDS must be used to get the route_config dynamically.
     if (!envoy_extensions_filters_network_http_connection_manager_v3_HttpConnectionManager_has_rds(
@@ -1703,11 +1714,9 @@ grpc_error* LdsResponseParse(
           "HttpConnectionManager ConfigSource for RDS does not specify ADS.");
     }
     // Get the route_config_name.
-    lds_update->emplace();
-    (*lds_update)->route_config_name = UpbStringToStdString(
+    lds_update.route_config_name = UpbStringToStdString(
         envoy_extensions_filters_network_http_connection_manager_v3_Rds_route_config_name(
             rds));
-    return GRPC_ERROR_NONE;
   }
   return GRPC_ERROR_NONE;
 }
@@ -1716,7 +1725,7 @@ grpc_error* RdsResponseParse(
     XdsClient* client, TraceFlag* tracer,
     const envoy_service_discovery_v3_DiscoveryResponse* response,
     const std::set<absl::string_view>& expected_route_configuration_names,
-    absl::optional<XdsApi::RdsUpdate>* rds_update, upb_arena* arena) {
+    XdsApi::RdsUpdateMap* rds_update_map, upb_arena* arena) {
   // Get the resources from the response.
   size_t size;
   const google_protobuf_Any* const* resources =
@@ -1738,19 +1747,25 @@ grpc_error* RdsResponseParse(
       return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Can't decode route_config.");
     }
     // Check route_config_name. Ignore unexpected route_config.
-    absl::string_view route_config_name = UpbStringToAbsl(
+    std::string route_config_name = UpbStringToStdString(
         envoy_config_route_v3_RouteConfiguration_name(route_config));
     if (expected_route_configuration_names.find(route_config_name) ==
         expected_route_configuration_names.end()) {
       continue;
     }
+    // Fail if route config name is duplicated.
+    if (rds_update_map->find(route_config_name) != rds_update_map->end()) {
+      return GRPC_ERROR_CREATE_FROM_COPIED_STRING(
+          absl::StrCat("duplicate route config name \"", route_config_name,
+                       "\"")
+              .c_str());
+    }
     // Parse the route_config.
-    XdsApi::RdsUpdate local_rds_update;
+    XdsApi::RdsUpdate& rds_update =
+        (*rds_update_map)[std::move(route_config_name)];
     grpc_error* error =
-        RouteConfigParse(client, tracer, route_config, &local_rds_update);
+        RouteConfigParse(client, tracer, route_config, &rds_update);
     if (error != GRPC_ERROR_NONE) return error;
-    rds_update->emplace(std::move(local_rds_update));
-    return GRPC_ERROR_NONE;
   }
   return GRPC_ERROR_NONE;
 }
@@ -1766,7 +1781,6 @@ grpc_error* CdsResponseParse(
       envoy_service_discovery_v3_DiscoveryResponse_resources(response, &size);
   // Parse all the resources in the CDS response.
   for (size_t i = 0; i < size; ++i) {
-    XdsApi::CdsUpdate cds_update;
     // Check the type_url of the resource.
     absl::string_view type_url =
         UpbStringToAbsl(google_protobuf_Any_type_url(resources[i]));
@@ -1795,6 +1809,7 @@ grpc_error* CdsResponseParse(
           absl::StrCat("duplicate resource name \"", cluster_name, "\"")
               .c_str());
     }
+    XdsApi::CdsUpdate& cds_update = (*cds_update_map)[std::move(cluster_name)];
     // Check the cluster_discovery_type.
     if (!envoy_config_cluster_v3_Cluster_has_type(cluster)) {
       return GRPC_ERROR_CREATE_FROM_STATIC_STRING("DiscoveryType not found.");
@@ -1836,7 +1851,6 @@ grpc_error* CdsResponseParse(
       }
       cds_update.lrs_load_reporting_server_name.emplace("");
     }
-    cds_update_map->emplace(std::move(cluster_name), std::move(cds_update));
   }
   return GRPC_ERROR_NONE;
 }
@@ -1962,7 +1976,6 @@ grpc_error* EdsResponseParse(
   const google_protobuf_Any* const* resources =
       envoy_service_discovery_v3_DiscoveryResponse_resources(response, &size);
   for (size_t i = 0; i < size; ++i) {
-    XdsApi::EdsUpdate eds_update;
     // Check the type_url of the resource.
     absl::string_view type_url =
         UpbStringToAbsl(google_protobuf_Any_type_url(resources[i]));
@@ -1995,6 +2008,8 @@ grpc_error* EdsResponseParse(
           absl::StrCat("duplicate resource name \"", eds_service_name, "\"")
               .c_str());
     }
+    XdsApi::EdsUpdate& eds_update =
+        (*eds_update_map)[std::move(eds_service_name)];
     // Get the endpoints.
     size_t locality_size;
     const envoy_config_endpoint_v3_LocalityLbEndpoints* const* endpoints =
@@ -2038,7 +2053,6 @@ grpc_error* EdsResponseParse(
         if (error != GRPC_ERROR_NONE) return error;
       }
     }
-    eds_update_map->emplace(std::move(eds_service_name), std::move(eds_update));
   }
   return GRPC_ERROR_NONE;
 }
@@ -2059,7 +2073,8 @@ std::string TypeUrlInternalToExternal(absl::string_view type_url) {
 }  // namespace
 
 XdsApi::AdsParseResult XdsApi::ParseAdsResponse(
-    const grpc_slice& encoded_response, const std::string& expected_server_name,
+    const grpc_slice& encoded_response,
+    const std::set<absl::string_view>& expected_listener_names,
     const std::set<absl::string_view>& expected_route_configuration_names,
     const std::set<absl::string_view>& expected_cluster_names,
     const std::set<absl::string_view>& expected_eds_service_names) {
@@ -2087,12 +2102,12 @@ XdsApi::AdsParseResult XdsApi::ParseAdsResponse(
   // Parse the response according to the resource type.
   if (IsLds(result.type_url)) {
     result.parse_error =
-        LdsResponseParse(client_, tracer_, response, expected_server_name,
-                         &result.lds_update, arena.ptr());
+        LdsResponseParse(client_, tracer_, response, expected_listener_names,
+                         &result.lds_update_map, arena.ptr());
   } else if (IsRds(result.type_url)) {
     result.parse_error = RdsResponseParse(client_, tracer_, response,
                                           expected_route_configuration_names,
-                                          &result.rds_update, arena.ptr());
+                                          &result.rds_update_map, arena.ptr());
   } else if (IsCds(result.type_url)) {
     result.parse_error =
         CdsResponseParse(client_, tracer_, response, expected_cluster_names,

+ 10 - 5
src/core/ext/xds/xds_api.h

@@ -147,8 +147,7 @@ class XdsApi {
       return virtual_hosts == other.virtual_hosts;
     }
     std::string ToString() const;
-    const VirtualHost* FindVirtualHostForDomain(
-        const std::string& domain) const;
+    VirtualHost* FindVirtualHostForDomain(const std::string& domain);
   };
 
   // TODO(roth): When we can use absl::variant<>, consider using that
@@ -179,6 +178,12 @@ class XdsApi {
     // If set to the empty string, will use the same server we obtained the CDS
     // data from.
     absl::optional<std::string> lrs_load_reporting_server_name;
+
+    bool operator==(const CdsUpdate& other) const {
+      return eds_service_name == other.eds_service_name &&
+             lrs_load_reporting_server_name ==
+                 other.lrs_load_reporting_server_name;
+    }
   };
 
   using CdsUpdateMap = std::map<std::string /*cluster_name*/, CdsUpdate>;
@@ -296,14 +301,14 @@ class XdsApi {
     std::string version;
     std::string nonce;
     std::string type_url;
-    absl::optional<LdsUpdate> lds_update;
-    absl::optional<RdsUpdate> rds_update;
+    LdsUpdateMap lds_update_map;
+    RdsUpdateMap rds_update_map;
     CdsUpdateMap cds_update_map;
     EdsUpdateMap eds_update_map;
   };
   AdsParseResult ParseAdsResponse(
       const grpc_slice& encoded_response,
-      const std::string& expected_server_name,
+      const std::set<absl::string_view>& expected_listener_names,
       const std::set<absl::string_view>& expected_route_configuration_names,
       const std::set<absl::string_view>& expected_cluster_names,
       const std::set<absl::string_view>& expected_eds_service_names);

+ 233 - 133
src/core/ext/xds/xds_client.cc

@@ -144,14 +144,14 @@ class XdsClient::ChannelState::AdsCallState
 
     void Orphan() override {
       Finish();
-      Unref();
+      Unref(DEBUG_LOCATION, "Orphan");
     }
 
     void Start(RefCountedPtr<AdsCallState> ads_calld) {
       if (sent_) return;
       sent_ = true;
       ads_calld_ = std::move(ads_calld);
-      Ref().release();
+      Ref(DEBUG_LOCATION, "timer").release();
       timer_pending_ = true;
       grpc_timer_init(
           &timer_,
@@ -186,27 +186,34 @@ class XdsClient::ChannelState::AdsCallState
           gpr_log(GPR_INFO, "[xds_client %p] %s", ads_calld_->xds_client(),
                   grpc_error_string(watcher_error));
         }
-        if (type_url_ == XdsApi::kLdsTypeUrl ||
-            type_url_ == XdsApi::kRdsTypeUrl) {
-          ads_calld_->xds_client()->listener_watcher_->OnError(watcher_error);
+        if (type_url_ == XdsApi::kLdsTypeUrl) {
+          ListenerState& state = ads_calld_->xds_client()->listener_map_[name_];
+          for (const auto& p : state.watchers) {
+            p.first->OnError(GRPC_ERROR_REF(watcher_error));
+          }
+        } else if (type_url_ == XdsApi::kRdsTypeUrl) {
+          RouteConfigState& state =
+              ads_calld_->xds_client()->route_config_map_[name_];
+          for (const auto& p : state.watchers) {
+            p.first->OnError(GRPC_ERROR_REF(watcher_error));
+          }
         } else if (type_url_ == XdsApi::kCdsTypeUrl) {
           ClusterState& state = ads_calld_->xds_client()->cluster_map_[name_];
           for (const auto& p : state.watchers) {
             p.first->OnError(GRPC_ERROR_REF(watcher_error));
           }
-          GRPC_ERROR_UNREF(watcher_error);
         } else if (type_url_ == XdsApi::kEdsTypeUrl) {
           EndpointState& state = ads_calld_->xds_client()->endpoint_map_[name_];
           for (const auto& p : state.watchers) {
             p.first->OnError(GRPC_ERROR_REF(watcher_error));
           }
-          GRPC_ERROR_UNREF(watcher_error);
         } else {
           GPR_UNREACHABLE_CODE(return );
         }
+        GRPC_ERROR_UNREF(watcher_error);
       }
       ads_calld_.reset();
-      Unref();
+      Unref(DEBUG_LOCATION, "timer");
       GRPC_ERROR_UNREF(error);
     }
 
@@ -235,8 +242,8 @@ class XdsClient::ChannelState::AdsCallState
 
   void SendMessageLocked(const std::string& type_url);
 
-  void AcceptLdsUpdate(absl::optional<XdsApi::LdsUpdate> lds_update);
-  void AcceptRdsUpdate(absl::optional<XdsApi::RdsUpdate> rds_update);
+  void AcceptLdsUpdate(XdsApi::LdsUpdateMap lds_update_map);
+  void AcceptRdsUpdate(XdsApi::RdsUpdateMap rds_update_map);
   void AcceptCdsUpdate(XdsApi::CdsUpdateMap cds_update_map);
   void AcceptEdsUpdate(XdsApi::EdsUpdateMap eds_update_map);
 
@@ -489,6 +496,7 @@ XdsClient::ChannelState::~ChannelState() {
             this);
   }
   grpc_channel_destroy(channel_);
+  xds_client_.reset(DEBUG_LOCATION, "ChannelState");
 }
 
 void XdsClient::ChannelState::Orphan() {
@@ -525,7 +533,7 @@ void XdsClient::ChannelState::StartConnectivityWatchLocked() {
   grpc_channel_element* client_channel_elem =
       grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_));
   GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
-  watcher_ = new StateWatcher(Ref());
+  watcher_ = new StateWatcher(Ref(DEBUG_LOCATION, "ChannelState+watch"));
   grpc_client_channel_start_connectivity_watch(
       client_channel_elem, GRPC_CHANNEL_IDLE,
       OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_));
@@ -560,8 +568,11 @@ void XdsClient::ChannelState::Unsubscribe(const std::string& type_url,
                                           const std::string& name,
                                           bool delay_unsubscription) {
   if (ads_calld_ != nullptr) {
-    ads_calld_->calld()->Unsubscribe(type_url, name, delay_unsubscription);
-    if (!ads_calld_->calld()->HasSubscribedResources()) ads_calld_.reset();
+    auto* calld = ads_calld_->calld();
+    if (calld != nullptr) {
+      calld->Unsubscribe(type_url, name, delay_unsubscription);
+      if (!calld->HasSubscribedResources()) ads_calld_.reset();
+    }
   }
 }
 
@@ -678,7 +689,6 @@ XdsClient::ChannelState::AdsCallState::AdsCallState(
   // activity in xds_client()->interested_parties_, which is comprised of
   // the polling entities from client_channel.
   GPR_ASSERT(xds_client() != nullptr);
-  GPR_ASSERT(!xds_client()->server_name_.empty());
   // Create a call with the specified method name.
   const auto& method =
       xds_client()->bootstrap_->server().ShouldUseV3()
@@ -717,13 +727,11 @@ XdsClient::ChannelState::AdsCallState::AdsCallState(
   // Op: send request message.
   GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
                     grpc_schedule_on_exec_ctx);
-  if (xds_client()->listener_watcher_ != nullptr) {
-    Subscribe(XdsApi::kLdsTypeUrl, xds_client()->server_name_);
-    if (xds_client()->lds_result_.has_value() &&
-        !xds_client()->lds_result_->route_config_name.empty()) {
-      Subscribe(XdsApi::kRdsTypeUrl,
-                xds_client()->lds_result_->route_config_name);
-    }
+  for (const auto& p : xds_client()->listener_map_) {
+    Subscribe(XdsApi::kLdsTypeUrl, std::string(p.first));
+  }
+  for (const auto& p : xds_client()->route_config_map_) {
+    Subscribe(XdsApi::kRdsTypeUrl, std::string(p.first));
   }
   for (const auto& p : xds_client()->cluster_map_) {
     Subscribe(XdsApi::kCdsTypeUrl, std::string(p.first));
@@ -867,113 +875,128 @@ bool XdsClient::ChannelState::AdsCallState::HasSubscribedResources() const {
 }
 
 void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
-    absl::optional<XdsApi::LdsUpdate> lds_update) {
-  if (!lds_update.has_value()) {
-    gpr_log(GPR_INFO,
-            "[xds_client %p] LDS update does not include requested resource",
-            xds_client());
-    if (xds_client()->lds_result_.has_value() &&
-        !xds_client()->lds_result_->route_config_name.empty()) {
-      Unsubscribe(XdsApi::kRdsTypeUrl,
-                  xds_client()->lds_result_->route_config_name,
-                  /*delay_unsubscription=*/false);
-      xds_client()->rds_result_.reset();
-    }
-    xds_client()->lds_result_.reset();
-    xds_client()->listener_watcher_->OnResourceDoesNotExist();
-    return;
-  }
+    XdsApi::LdsUpdateMap lds_update_map) {
   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
     gpr_log(GPR_INFO,
-            "[xds_client %p] LDS update received: route_config_name=%s",
-            xds_client(),
-            (!lds_update->route_config_name.empty()
-                 ? lds_update->route_config_name.c_str()
-                 : "<inlined>"));
-    if (lds_update->rds_update.has_value()) {
-      gpr_log(GPR_INFO, "RouteConfiguration: %s",
-              lds_update->rds_update->ToString().c_str());
-    }
+            "[xds_client %p] LDS update received containing %" PRIuPTR
+            " resources",
+            xds_client(), lds_update_map.size());
   }
   auto& lds_state = state_map_[XdsApi::kLdsTypeUrl];
-  auto& state = lds_state.subscribed_resources[xds_client()->server_name_];
-  if (state != nullptr) state->Finish();
-  // Ignore identical update.
-  if (xds_client()->lds_result_ == lds_update) {
+  std::set<std::string> rds_resource_names_seen;
+  for (auto& p : lds_update_map) {
+    const std::string& listener_name = p.first;
+    XdsApi::LdsUpdate& lds_update = p.second;
+    auto& state = lds_state.subscribed_resources[listener_name];
+    if (state != nullptr) state->Finish();
     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
-      gpr_log(GPR_INFO,
-              "[xds_client %p] LDS update identical to current, ignoring.",
-              xds_client());
+      gpr_log(GPR_INFO, "[xds_client %p] LDS resource %s: route_config_name=%s",
+              xds_client(), listener_name.c_str(),
+              (!lds_update.route_config_name.empty()
+                   ? lds_update.route_config_name.c_str()
+                   : "<inlined>"));
+      if (lds_update.rds_update.has_value()) {
+        gpr_log(GPR_INFO, "RouteConfiguration: %s",
+                lds_update.rds_update->ToString().c_str());
+      }
+    }
+    // Record the RDS resource names seen.
+    if (!lds_update.route_config_name.empty()) {
+      rds_resource_names_seen.insert(lds_update.route_config_name);
+    }
+    // Ignore identical update.
+    ListenerState& listener_state = xds_client()->listener_map_[listener_name];
+    if (listener_state.update.has_value() &&
+        *listener_state.update == lds_update) {
+      if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
+        gpr_log(GPR_INFO,
+                "[xds_client %p] LDS update for %s identical to current, "
+                "ignoring.",
+                xds_client(), listener_name.c_str());
+      }
+      continue;
+    }
+    // Update the listener state.
+    listener_state.update = std::move(lds_update);
+    // Notify watchers.
+    for (const auto& p : listener_state.watchers) {
+      p.first->OnListenerChanged(*listener_state.update);
     }
-    return;
   }
-  if (xds_client()->lds_result_.has_value() &&
-      !xds_client()->lds_result_->route_config_name.empty()) {
-    Unsubscribe(
-        XdsApi::kRdsTypeUrl, xds_client()->lds_result_->route_config_name,
-        /*delay_unsubscription=*/!lds_update->route_config_name.empty());
-    xds_client()->rds_result_.reset();
-  }
-  xds_client()->lds_result_ = std::move(lds_update);
-  if (xds_client()->lds_result_->rds_update.has_value()) {
-    // If the RouteConfiguration was found inlined in LDS response, notify
-    // the watcher immediately.
-    const XdsApi::RdsUpdate::VirtualHost* vhost =
-        xds_client()->lds_result_->rds_update->FindVirtualHostForDomain(
-            xds_client()->server_name_);
-    if (vhost == nullptr) {
-      xds_client()->listener_watcher_->OnError(
-          GRPC_ERROR_CREATE_FROM_STATIC_STRING(
-              "no VirtualHost found for domain"));
-    } else {
-      xds_client()->listener_watcher_->OnListenerChanged(vhost->routes);
+  // For any subscribed resource that is not present in the update,
+  // remove it from the cache and notify watchers that it does not exist.
+  for (const auto& p : lds_state.subscribed_resources) {
+    const std::string& listener_name = p.first;
+    if (lds_update_map.find(listener_name) == lds_update_map.end()) {
+      ListenerState& listener_state =
+          xds_client()->listener_map_[listener_name];
+      // If the resource was newly requested but has not yet been received,
+      // we don't want to generate an error for the watchers, because this LDS
+      // response may be in reaction to an earlier request that did not yet
+      // request the new resource, so its absence from the response does not
+      // necessarily indicate that the resource does not exist.
+      // For that case, we rely on the request timeout instead.
+      if (!listener_state.update.has_value()) continue;
+      listener_state.update.reset();
+      for (const auto& p : listener_state.watchers) {
+        p.first->OnResourceDoesNotExist();
+      }
+    }
+  }
+  // For any RDS resource that is no longer referred to by any LDS
+  // resources, remove it from the cache and notify watchers that it
+  // does not exist.
+  auto& rds_state = state_map_[XdsApi::kRdsTypeUrl];
+  for (const auto& p : rds_state.subscribed_resources) {
+    const std::string& rds_resource_name = p.first;
+    if (rds_resource_names_seen.find(rds_resource_name) ==
+        rds_resource_names_seen.end()) {
+      RouteConfigState& route_config_state =
+          xds_client()->route_config_map_[rds_resource_name];
+      route_config_state.update.reset();
+      for (const auto& p : route_config_state.watchers) {
+        p.first->OnResourceDoesNotExist();
+      }
     }
-  } else {
-    // Send RDS request for dynamic resolution.
-    Subscribe(XdsApi::kRdsTypeUrl,
-              xds_client()->lds_result_->route_config_name);
   }
 }
 
 void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate(
-    absl::optional<XdsApi::RdsUpdate> rds_update) {
-  if (!rds_update.has_value()) {
-    gpr_log(GPR_INFO,
-            "[xds_client %p] RDS update does not include requested resource",
-            xds_client());
-    xds_client()->rds_result_.reset();
-    xds_client()->listener_watcher_->OnResourceDoesNotExist();
-    return;
-  }
+    XdsApi::RdsUpdateMap rds_update_map) {
   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
-    gpr_log(GPR_INFO, "[xds_client %p] RDS update received:\n%s", xds_client(),
-            rds_update->ToString().c_str());
+    gpr_log(GPR_INFO,
+            "[xds_client %p] RDS update received containing %" PRIuPTR
+            " resources",
+            xds_client(), rds_update_map.size());
   }
-  auto& rds_state = state_map_[XdsApi::kRdsTypeUrl];
-  auto& state =
-      rds_state
-          .subscribed_resources[xds_client()->lds_result_->route_config_name];
-  if (state != nullptr) state->Finish();
-  // Ignore identical update.
-  if (xds_client()->rds_result_ == rds_update) {
+  auto& rds_state = state_map_[XdsApi::kLdsTypeUrl];
+  for (auto& p : rds_update_map) {
+    const std::string& route_config_name = p.first;
+    XdsApi::RdsUpdate& rds_update = p.second;
+    auto& state = rds_state.subscribed_resources[route_config_name];
+    if (state != nullptr) state->Finish();
     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
-      gpr_log(GPR_INFO,
-              "[xds_client %p] RDS update identical to current, ignoring.",
-              xds_client());
+      gpr_log(GPR_INFO, "[xds_client %p] RDS resource:\n%s", xds_client(),
+              rds_update.ToString().c_str());
+    }
+    RouteConfigState& route_config_state =
+        xds_client()->route_config_map_[route_config_name];
+    // Ignore identical update.
+    if (route_config_state.update.has_value() &&
+        *route_config_state.update == rds_update) {
+      if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
+        gpr_log(GPR_INFO,
+                "[xds_client %p] RDS resource identical to current, ignoring",
+                xds_client());
+      }
+      continue;
+    }
+    // Update the cache.
+    route_config_state.update = std::move(rds_update);
+    // Notify all watchers.
+    for (const auto& p : route_config_state.watchers) {
+      p.first->OnRouteConfigChanged(*route_config_state.update);
     }
-    return;
-  }
-  xds_client()->rds_result_ = std::move(rds_update);
-  // Notify the watcher.
-  const XdsApi::RdsUpdate::VirtualHost* vhost =
-      xds_client()->rds_result_->FindVirtualHostForDomain(
-          xds_client()->server_name_);
-  if (vhost == nullptr) {
-    xds_client()->listener_watcher_->OnError(
-        GRPC_ERROR_CREATE_FROM_STATIC_STRING(
-            "no VirtualHost found for domain"));
-  } else {
-    xds_client()->listener_watcher_->OnListenerChanged(vhost->routes);
   }
 }
 
@@ -1008,9 +1031,7 @@ void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate(
     // Ignore identical update.
     ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name];
     if (cluster_state.update.has_value() &&
-        cds_update.eds_service_name == cluster_state.update->eds_service_name &&
-        cds_update.lrs_load_reporting_server_name ==
-            cluster_state.update->lrs_load_reporting_server_name) {
+        *cluster_state.update == cds_update) {
       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
         gpr_log(GPR_INFO,
                 "[xds_client %p] CDS update identical to current, ignoring.",
@@ -1157,7 +1178,7 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() {
   recv_message_payload_ = nullptr;
   // Parse and validate the response.
   XdsApi::AdsParseResult result = xds_client()->api_.ParseAdsResponse(
-      response_slice, xds_client()->server_name_,
+      response_slice, ResourceNamesForRequest(XdsApi::kLdsTypeUrl),
       ResourceNamesForRequest(XdsApi::kRdsTypeUrl),
       ResourceNamesForRequest(XdsApi::kCdsTypeUrl),
       ResourceNamesForRequest(XdsApi::kEdsTypeUrl));
@@ -1187,9 +1208,9 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() {
       seen_response_ = true;
       // Accept the ADS response according to the type_url.
       if (result.type_url == XdsApi::kLdsTypeUrl) {
-        AcceptLdsUpdate(std::move(result.lds_update));
+        AcceptLdsUpdate(std::move(result.lds_update_map));
       } else if (result.type_url == XdsApi::kRdsTypeUrl) {
-        AcceptRdsUpdate(std::move(result.rds_update));
+        AcceptRdsUpdate(std::move(result.rds_update_map));
       } else if (result.type_url == XdsApi::kCdsTypeUrl) {
         AcceptCdsUpdate(std::move(result.cds_update_map));
       } else if (result.type_url == XdsApi::kEdsTypeUrl) {
@@ -1272,7 +1293,7 @@ XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest(
     for (auto& p : it->second.subscribed_resources) {
       resource_names.insert(p.first);
       OrphanablePtr<ResourceState>& state = p.second;
-      state->Start(Ref());
+      state->Start(Ref(DEBUG_LOCATION, "ResourceState"));
     }
   }
   return resource_names;
@@ -1746,19 +1767,16 @@ grpc_channel* CreateXdsChannel(const XdsBootstrap& bootstrap,
 }  // namespace
 
 XdsClient::XdsClient(std::shared_ptr<WorkSerializer> work_serializer,
-                     grpc_pollset_set* interested_parties,
                      absl::string_view server_name,
-                     std::unique_ptr<ListenerWatcherInterface> watcher,
                      const grpc_channel_args& channel_args, grpc_error** error)
     : InternallyRefCounted<XdsClient>(&grpc_xds_client_trace),
       request_timeout_(GetRequestTimeout(channel_args)),
       work_serializer_(std::move(work_serializer)),
-      interested_parties_(interested_parties),
+      interested_parties_(grpc_pollset_set_create()),
       bootstrap_(
           XdsBootstrap::ReadFromFile(this, &grpc_xds_client_trace, error)),
       api_(this, &grpc_xds_client_trace, bootstrap_.get()),
-      server_name_(server_name),
-      listener_watcher_(std::move(watcher)) {
+      server_name_(server_name) {
   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
     gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this);
   }
@@ -1781,15 +1799,13 @@ XdsClient::XdsClient(std::shared_ptr<WorkSerializer> work_serializer,
   }
   chand_ = MakeOrphanable<ChannelState>(
       Ref(DEBUG_LOCATION, "XdsClient+ChannelState"), channel);
-  if (listener_watcher_ != nullptr) {
-    chand_->Subscribe(XdsApi::kLdsTypeUrl, std::string(server_name));
-  }
 }
 
 XdsClient::~XdsClient() {
   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
     gpr_log(GPR_INFO, "[xds_client %p] destroying xds client", this);
   }
+  grpc_pollset_set_destroy(interested_parties_);
 }
 
 void XdsClient::Orphan() {
@@ -1804,13 +1820,88 @@ void XdsClient::Orphan() {
   // possible for ADS calls to be in progress. Unreffing the loadbalancing
   // policies before those calls are done would lead to issues such as
   // https://github.com/grpc/grpc/issues/20928.
-  if (listener_watcher_ != nullptr) {
+  if (!listener_map_.empty()) {
     cluster_map_.clear();
     endpoint_map_.clear();
   }
   Unref(DEBUG_LOCATION, "XdsClient::Orphan()");
 }
 
+void XdsClient::WatchListenerData(
+    absl::string_view listener_name,
+    std::unique_ptr<ListenerWatcherInterface> watcher) {
+  std::string listener_name_str = std::string(listener_name);
+  ListenerState& listener_state = listener_map_[listener_name_str];
+  ListenerWatcherInterface* w = watcher.get();
+  listener_state.watchers[w] = std::move(watcher);
+  // If we've already received an LDS update, notify the new watcher
+  // immediately.
+  if (listener_state.update.has_value()) {
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
+      gpr_log(GPR_INFO, "[xds_client %p] returning cached listener data for %s",
+              this, listener_name_str.c_str());
+    }
+    w->OnListenerChanged(*listener_state.update);
+  }
+  chand_->Subscribe(XdsApi::kLdsTypeUrl, listener_name_str);
+}
+
+void XdsClient::CancelListenerDataWatch(absl::string_view listener_name,
+                                        ListenerWatcherInterface* watcher,
+                                        bool delay_unsubscription) {
+  if (shutting_down_) return;
+  std::string listener_name_str = std::string(listener_name);
+  ListenerState& listener_state = listener_map_[listener_name_str];
+  auto it = listener_state.watchers.find(watcher);
+  if (it != listener_state.watchers.end()) {
+    listener_state.watchers.erase(it);
+    if (listener_state.watchers.empty()) {
+      listener_map_.erase(listener_name_str);
+      chand_->Unsubscribe(XdsApi::kLdsTypeUrl, listener_name_str,
+                          delay_unsubscription);
+    }
+  }
+}
+
+void XdsClient::WatchRouteConfigData(
+    absl::string_view route_config_name,
+    std::unique_ptr<RouteConfigWatcherInterface> watcher) {
+  std::string route_config_name_str = std::string(route_config_name);
+  RouteConfigState& route_config_state =
+      route_config_map_[route_config_name_str];
+  RouteConfigWatcherInterface* w = watcher.get();
+  route_config_state.watchers[w] = std::move(watcher);
+  // If we've already received an RDS update, notify the new watcher
+  // immediately.
+  if (route_config_state.update.has_value()) {
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
+      gpr_log(GPR_INFO,
+              "[xds_client %p] returning cached route config data for %s", this,
+              route_config_name_str.c_str());
+    }
+    w->OnRouteConfigChanged(*route_config_state.update);
+  }
+  chand_->Subscribe(XdsApi::kRdsTypeUrl, route_config_name_str);
+}
+
+void XdsClient::CancelRouteConfigDataWatch(absl::string_view route_config_name,
+                                           RouteConfigWatcherInterface* watcher,
+                                           bool delay_unsubscription) {
+  if (shutting_down_) return;
+  std::string route_config_name_str = std::string(route_config_name);
+  RouteConfigState& route_config_state =
+      route_config_map_[route_config_name_str];
+  auto it = route_config_state.watchers.find(watcher);
+  if (it != route_config_state.watchers.end()) {
+    route_config_state.watchers.erase(it);
+    if (route_config_state.watchers.empty()) {
+      route_config_map_.erase(route_config_name_str);
+      chand_->Unsubscribe(XdsApi::kRdsTypeUrl, route_config_name_str,
+                          delay_unsubscription);
+    }
+  }
+}
+
 void XdsClient::WatchClusterData(
     absl::string_view cluster_name,
     std::unique_ptr<ClusterWatcherInterface> watcher) {
@@ -1818,7 +1909,7 @@ void XdsClient::WatchClusterData(
   ClusterState& cluster_state = cluster_map_[cluster_name_str];
   ClusterWatcherInterface* w = watcher.get();
   cluster_state.watchers[w] = std::move(watcher);
-  // If we've already received an CDS update, notify the new watcher
+  // If we've already received a CDS update, notify the new watcher
   // immediately.
   if (cluster_state.update.has_value()) {
     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
@@ -2048,8 +2139,17 @@ XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshot(
 }
 
 void XdsClient::NotifyOnError(grpc_error* error) {
-  if (listener_watcher_ != nullptr) {
-    listener_watcher_->OnError(GRPC_ERROR_REF(error));
+  for (const auto& p : listener_map_) {
+    const ListenerState& listener_state = p.second;
+    for (const auto& p : listener_state.watchers) {
+      p.first->OnError(GRPC_ERROR_REF(error));
+    }
+  }
+  for (const auto& p : route_config_map_) {
+    const RouteConfigState& route_config_state = p.second;
+    for (const auto& p : route_config_state.watchers) {
+      p.first->OnError(GRPC_ERROR_REF(error));
+    }
   }
   for (const auto& p : cluster_map_) {
     const ClusterState& cluster_state = p.second;
@@ -2093,8 +2193,8 @@ RefCountedPtr<XdsClient> XdsClient::GetFromChannelArgs(
     const grpc_channel_args& args) {
   XdsClient* xds_client =
       grpc_channel_args_find_pointer<XdsClient>(&args, GRPC_ARG_XDS_CLIENT);
-  if (xds_client != nullptr) return xds_client->Ref();
-  return nullptr;
+  if (xds_client == nullptr) return nullptr;
+  return xds_client->Ref(DEBUG_LOCATION, "GetFromChannelArgs");
 }
 
 grpc_channel_args* XdsClient::RemoveFromChannelArgs(

+ 73 - 7
src/core/ext/xds/xds_client.h

@@ -20,6 +20,7 @@
 #include <grpc/support/port_platform.h>
 
 #include <set>
+#include <vector>
 
 #include "absl/strings/string_view.h"
 #include "absl/types/optional.h"
@@ -45,7 +46,19 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
    public:
     virtual ~ListenerWatcherInterface() = default;
 
-    virtual void OnListenerChanged(std::vector<XdsApi::Route> routes) = 0;
+    virtual void OnListenerChanged(XdsApi::LdsUpdate listener) = 0;
+
+    virtual void OnError(grpc_error* error) = 0;
+
+    virtual void OnResourceDoesNotExist() = 0;
+  };
+
+  // RouteConfiguration data watcher interface.  Implemented by callers.
+  class RouteConfigWatcherInterface {
+   public:
+    virtual ~RouteConfigWatcherInterface() = default;
+
+    virtual void OnRouteConfigChanged(XdsApi::RdsUpdate route_config) = 0;
 
     virtual void OnError(grpc_error* error) = 0;
 
@@ -78,14 +91,44 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
 
   // If *error is not GRPC_ERROR_NONE after construction, then there was
   // an error initializing the client.
+  // TODO(roth): Remove the server_name parameter as part of sharing the
+  // XdsClient instance between channels.
   XdsClient(std::shared_ptr<WorkSerializer> work_serializer,
-            grpc_pollset_set* interested_parties, absl::string_view server_name,
-            std::unique_ptr<ListenerWatcherInterface> watcher,
+            absl::string_view server_name,
             const grpc_channel_args& channel_args, grpc_error** error);
   ~XdsClient();
 
+  grpc_pollset_set* interested_parties() const { return interested_parties_; }
+
   void Orphan() override;
 
+  // Start and cancel listener data watch for a listener.
+  // The XdsClient takes ownership of the watcher, but the caller may
+  // keep a raw pointer to the watcher, which may be used only for
+  // cancellation.  (Because the caller does not own the watcher, the
+  // pointer must not be used for any other purpose.)
+  // If the caller is going to start a new watch after cancelling the
+  // old one, it should set delay_unsubscription to true.
+  void WatchListenerData(absl::string_view listener_name,
+                         std::unique_ptr<ListenerWatcherInterface> watcher);
+  void CancelListenerDataWatch(absl::string_view listener_name,
+                               ListenerWatcherInterface* watcher,
+                               bool delay_unsubscription = false);
+
+  // Start and cancel route config data watch for a listener.
+  // The XdsClient takes ownership of the watcher, but the caller may
+  // keep a raw pointer to the watcher, which may be used only for
+  // cancellation.  (Because the caller does not own the watcher, the
+  // pointer must not be used for any other purpose.)
+  // If the caller is going to start a new watch after cancelling the
+  // old one, it should set delay_unsubscription to true.
+  void WatchRouteConfigData(
+      absl::string_view route_config_name,
+      std::unique_ptr<RouteConfigWatcherInterface> watcher);
+  void CancelRouteConfigDataWatch(absl::string_view route_config_name,
+                                  RouteConfigWatcherInterface* watcher,
+                                  bool delay_unsubscription = false);
+
   // Start and cancel cluster data watch for a cluster.
   // The XdsClient takes ownership of the watcher, but the caller may
   // keep a raw pointer to the watcher, which may be used only for
@@ -200,6 +243,22 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
     OrphanablePtr<RetryableCall<LrsCallState>> lrs_calld_;
   };
 
+  struct ListenerState {
+    std::map<ListenerWatcherInterface*,
+             std::unique_ptr<ListenerWatcherInterface>>
+        watchers;
+    // The latest data seen from LDS.
+    absl::optional<XdsApi::LdsUpdate> update;
+  };
+
+  struct RouteConfigState {
+    std::map<RouteConfigWatcherInterface*,
+             std::unique_ptr<RouteConfigWatcherInterface>>
+        watchers;
+    // The latest data seen from RDS.
+    absl::optional<XdsApi::RdsUpdate> update;
+  };
+
   struct ClusterState {
     std::map<ClusterWatcherInterface*, std::unique_ptr<ClusterWatcherInterface>>
         watchers;
@@ -250,19 +309,26 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
   std::unique_ptr<XdsBootstrap> bootstrap_;
   XdsApi api_;
 
+  // TODO(roth): In order to share the XdsClient instance between
+  // channels and servers, we will need to remove this field.  In order
+  // to do that, we'll need to figure out if we can stop sending the
+  // server name as part of the node metadata in the LRS request.
   const std::string server_name_;
-  std::unique_ptr<ListenerWatcherInterface> listener_watcher_;
 
   // The channel for communicating with the xds server.
   OrphanablePtr<ChannelState> chand_;
 
-  absl::optional<XdsApi::LdsUpdate> lds_result_;
-  absl::optional<XdsApi::RdsUpdate> rds_result_;
-
+  // One entry for each watched LDS resource.
+  std::map<std::string /*listener_name*/, ListenerState> listener_map_;
+  // One entry for each watched RDS resource.
+  std::map<std::string /*route_config_name*/, RouteConfigState>
+      route_config_map_;
   // One entry for each watched CDS resource.
   std::map<std::string /*cluster_name*/, ClusterState> cluster_map_;
   // One entry for each watched EDS resource.
   std::map<std::string /*eds_service_name*/, EndpointState> endpoint_map_;
+
+  // Load report data.
   std::map<
       std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>,
       LoadReportState>