Browse Source

Merge remote-tracking branch 'origin/master' into peers_method

Richard Belleville 5 years ago
parent
commit
651f850076
30 changed files with 582 additions and 208 deletions
  1. 1 2
      .github/ISSUE_TEMPLATE/bug_report.md
  2. 2 1
      src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
  3. 37 8
      src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
  4. 24 33
      src/core/ext/filters/client_channel/xds/xds_api.cc
  5. 5 3
      src/core/ext/filters/client_channel/xds/xds_api.h
  6. 115 54
      src/core/ext/filters/client_channel/xds/xds_client.cc
  7. 14 7
      src/core/ext/filters/client_channel/xds/xds_client.h
  8. 1 0
      src/core/tsi/transport_security_interface.h
  9. 1 0
      src/objective-c/tests/UnitTests/APIv2Tests.m
  10. 0 8
      src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pxd.pxi
  11. 0 21
      src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi
  12. 1 1
      src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pxd.pxi
  13. 1 1
      src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi
  14. 30 0
      src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi
  15. 96 0
      src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi
  16. 75 24
      src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi
  17. 1 0
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi
  18. 4 0
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pxd.pxi
  19. 13 1
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi
  20. 1 1
      src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi
  21. 2 2
      src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi
  22. 1 0
      src/python/grpcio/grpc/_cython/cygrpc.pxd
  23. 2 0
      src/python/grpcio/grpc/_cython/cygrpc.pyx
  24. 1 1
      src/python/grpcio/grpc/experimental/aio/_server.py
  25. 5 9
      src/python/grpcio_tests/tests_aio/unit/server_test.py
  26. 135 25
      test/cpp/end2end/xds_end2end_test.cc
  27. 2 0
      tools/bazel.rc
  28. 1 0
      tools/internal_ci/linux/grpc_python_bazel_test_in_docker.sh
  29. 3 2
      tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh
  30. 8 4
      tools/run_tests/run_xds_tests.py

+ 1 - 2
.github/ISSUE_TEMPLATE/bug_report.md

@@ -7,7 +7,6 @@ assignees: karthikravis
 ---
 ---
 
 
 <!--
 <!--
-
 This form is for bug reports and feature requests ONLY!
 This form is for bug reports and feature requests ONLY!
 For general questions and troubleshooting, please ask/look for answers here:
 For general questions and troubleshooting, please ask/look for answers here:
 - grpc.io mailing list: https://groups.google.com/forum/#!forum/grpc-io
 - grpc.io mailing list: https://groups.google.com/forum/#!forum/grpc-io
@@ -26,7 +25,7 @@ Issues specific to *grpc-java*, *grpc-go*, *grpc-node*, *grpc-dart*, *grpc-web*
 
 
 
 
 ### What did you do?
 ### What did you do?
-If possible, provide a recipe for reproducing the error. Try being specific and include code snippets if helpful.
+Please provide either 1) A unit test for reproducing the bug or 2) Specific steps for us to follow to reproduce the bug. If there’s not enough information to debug the problem, gRPC team may close the issue at their discretion. You’re welcome to re-open the issue once you have a reproduction.
 
 
 ### What did you expect to see?
 ### What did you expect to see?
 
 

+ 2 - 1
src/core/ext/filters/client_channel/lb_policy/xds/cds.cc

@@ -295,7 +295,8 @@ void CdsLb::UpdateLocked(UpdateArgs args) {
                 old_config->cluster().c_str());
                 old_config->cluster().c_str());
       }
       }
       xds_client_->CancelClusterDataWatch(
       xds_client_->CancelClusterDataWatch(
-          StringView(old_config->cluster().c_str()), cluster_watcher_);
+          StringView(old_config->cluster().c_str()), cluster_watcher_,
+          /*delay_unsubscription=*/true);
     }
     }
     if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
     if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
       gpr_log(GPR_INFO, "[cdslb %p] starting watch for cluster %s", this,
       gpr_log(GPR_INFO, "[cdslb %p] starting watch for cluster %s", this,

+ 37 - 8
src/core/ext/filters/client_channel/lb_policy/xds/xds.cc

@@ -298,7 +298,7 @@ class XdsLb : public LoadBalancingPolicy {
     ~LocalityMap() { xds_policy_.reset(DEBUG_LOCATION, "LocalityMap"); }
     ~LocalityMap() { xds_policy_.reset(DEBUG_LOCATION, "LocalityMap"); }
 
 
     void UpdateLocked(
     void UpdateLocked(
-        const XdsApi::PriorityListUpdate::LocalityMap& locality_map_update,
+        const XdsApi::PriorityListUpdate::LocalityMap& priority_update,
         bool update_locality_stats);
         bool update_locality_stats);
     void ResetBackoffLocked();
     void ResetBackoffLocked();
     void UpdateXdsPickerLocked();
     void UpdateXdsPickerLocked();
@@ -792,7 +792,8 @@ void XdsLb::UpdateLocked(UpdateArgs args) {
                 old_eds_service_name);
                 old_eds_service_name);
       }
       }
       xds_client()->CancelEndpointDataWatch(StringView(old_eds_service_name),
       xds_client()->CancelEndpointDataWatch(StringView(old_eds_service_name),
-                                            endpoint_watcher_);
+                                            endpoint_watcher_,
+                                            /*delay_unsubscription=*/true);
     }
     }
     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
       gpr_log(GPR_INFO, "[xdslb %p] starting watch for %s", this,
       gpr_log(GPR_INFO, "[xdslb %p] starting watch for %s", this,
@@ -1032,7 +1033,7 @@ XdsLb::LocalityMap::LocalityMap(RefCountedPtr<XdsLb> xds_policy,
 }
 }
 
 
 void XdsLb::LocalityMap::UpdateLocked(
 void XdsLb::LocalityMap::UpdateLocked(
-    const XdsApi::PriorityListUpdate::LocalityMap& locality_map_update,
+    const XdsApi::PriorityListUpdate::LocalityMap& priority_update,
     bool update_locality_stats) {
     bool update_locality_stats) {
   if (xds_policy_->shutting_down_) return;
   if (xds_policy_->shutting_down_) return;
   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
@@ -1042,11 +1043,11 @@ void XdsLb::LocalityMap::UpdateLocked(
   // Maybe reactivate the locality map in case all the active locality maps have
   // Maybe reactivate the locality map in case all the active locality maps have
   // failed.
   // failed.
   MaybeReactivateLocked();
   MaybeReactivateLocked();
-  // Remove (later) the localities not in locality_map_update.
+  // Remove (later) the localities not in priority_update.
   for (auto iter = localities_.begin(); iter != localities_.end();) {
   for (auto iter = localities_.begin(); iter != localities_.end();) {
     const auto& name = iter->first;
     const auto& name = iter->first;
     Locality* locality = iter->second.get();
     Locality* locality = iter->second.get();
-    if (locality_map_update.Contains(name)) {
+    if (priority_update.Contains(name)) {
       ++iter;
       ++iter;
       continue;
       continue;
     }
     }
@@ -1057,8 +1058,8 @@ void XdsLb::LocalityMap::UpdateLocked(
       ++iter;
       ++iter;
     }
     }
   }
   }
-  // Add or update the localities in locality_map_update.
-  for (const auto& p : locality_map_update.localities) {
+  // Add or update the localities in priority_update.
+  for (const auto& p : priority_update.localities) {
     const auto& name = p.first;
     const auto& name = p.first;
     const auto& locality_update = p.second;
     const auto& locality_update = p.second;
     OrphanablePtr<Locality>& locality = localities_[name];
     OrphanablePtr<Locality>& locality = localities_[name];
@@ -1078,6 +1079,32 @@ void XdsLb::LocalityMap::UpdateLocked(
     locality->UpdateLocked(locality_update.lb_weight,
     locality->UpdateLocked(locality_update.lb_weight,
                            locality_update.serverlist, update_locality_stats);
                            locality_update.serverlist, update_locality_stats);
   }
   }
+  // If this is the current priority and we removed all of the READY
+  // localities, go into state CONNECTING.
+  // TODO(roth): Ideally, we should model this as a graceful policy
+  // switch: we should keep using the old localities for a short period
+  // of time, long enough to give the new localities a chance to get
+  // connected.  As part of refactoring this policy, we should try to
+  // fix that.
+  if (priority_ == xds_policy()->current_priority_) {
+    bool found_ready = false;
+    for (auto& p : localities_) {
+      const auto& locality_name = p.first;
+      Locality* locality = p.second.get();
+      if (!locality_map_update()->Contains(locality_name)) continue;
+      if (locality->connectivity_state() == GRPC_CHANNEL_READY) {
+        found_ready = true;
+        break;
+      }
+    }
+    if (!found_ready) {
+      xds_policy_->channel_control_helper()->UpdateState(
+          GRPC_CHANNEL_CONNECTING,
+          absl::make_unique<QueuePicker>(
+              xds_policy_->Ref(DEBUG_LOCATION, "QueuePicker")));
+      xds_policy_->current_priority_ = UINT32_MAX;
+    }
+  }
 }
 }
 
 
 void XdsLb::LocalityMap::ResetBackoffLocked() {
 void XdsLb::LocalityMap::ResetBackoffLocked() {
@@ -1098,7 +1125,9 @@ void XdsLb::LocalityMap::UpdateXdsPickerLocked() {
     const auto& locality_name = p.first;
     const auto& locality_name = p.first;
     Locality* locality = p.second.get();
     Locality* locality = p.second.get();
     // Skip the localities that are not in the latest locality map update.
     // Skip the localities that are not in the latest locality map update.
-    if (!locality_map_update()->Contains(locality_name)) continue;
+    const auto* locality_update = locality_map_update();
+    if (locality_update == nullptr) continue;
+    if (!locality_update->Contains(locality_name)) continue;
     if (locality->connectivity_state() != GRPC_CHANNEL_READY) continue;
     if (locality->connectivity_state() != GRPC_CHANNEL_READY) continue;
     end += locality->weight();
     end += locality->weight();
     picker_list.push_back(
     picker_list.push_back(

+ 24 - 33
src/core/ext/filters/client_channel/xds/xds_api.cc

@@ -1045,15 +1045,12 @@ grpc_error* RouteConfigParse(
 grpc_error* LdsResponseParse(XdsClient* client, TraceFlag* tracer,
 grpc_error* LdsResponseParse(XdsClient* client, TraceFlag* tracer,
                              const envoy_api_v2_DiscoveryResponse* response,
                              const envoy_api_v2_DiscoveryResponse* response,
                              const std::string& expected_server_name,
                              const std::string& expected_server_name,
-                             XdsApi::LdsUpdate* lds_update, upb_arena* arena) {
+                             absl::optional<XdsApi::LdsUpdate>* lds_update,
+                             upb_arena* arena) {
   // Get the resources from the response.
   // Get the resources from the response.
   size_t size;
   size_t size;
   const google_protobuf_Any* const* resources =
   const google_protobuf_Any* const* resources =
       envoy_api_v2_DiscoveryResponse_resources(response, &size);
       envoy_api_v2_DiscoveryResponse_resources(response, &size);
-  if (size < 1) {
-    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
-        "LDS response contains 0 resource.");
-  }
   for (size_t i = 0; i < size; ++i) {
   for (size_t i = 0; i < size; ++i) {
     // Check the type_url of the resource.
     // Check the type_url of the resource.
     const upb_strview type_url = google_protobuf_Any_type_url(resources[i]);
     const upb_strview type_url = google_protobuf_Any_type_url(resources[i]);
@@ -1096,11 +1093,8 @@ grpc_error* LdsResponseParse(XdsClient* client, TraceFlag* tracer,
       grpc_error* error = RouteConfigParse(client, tracer, route_config,
       grpc_error* error = RouteConfigParse(client, tracer, route_config,
                                            expected_server_name, &rds_update);
                                            expected_server_name, &rds_update);
       if (error != GRPC_ERROR_NONE) return error;
       if (error != GRPC_ERROR_NONE) return error;
-      lds_update->rds_update.emplace(std::move(rds_update));
-      const upb_strview route_config_name =
-          envoy_api_v2_RouteConfiguration_name(route_config);
-      lds_update->route_config_name =
-          std::string(route_config_name.data, route_config_name.size);
+      lds_update->emplace();
+      (*lds_update)->rds_update.emplace(std::move(rds_update));
       return GRPC_ERROR_NONE;
       return GRPC_ERROR_NONE;
     }
     }
     // Validate that RDS must be used to get the route_config dynamically.
     // Validate that RDS must be used to get the route_config dynamically.
@@ -1116,27 +1110,24 @@ grpc_error* LdsResponseParse(XdsClient* client, TraceFlag* tracer,
     const upb_strview route_config_name =
     const upb_strview route_config_name =
         envoy_config_filter_network_http_connection_manager_v2_Rds_route_config_name(
         envoy_config_filter_network_http_connection_manager_v2_Rds_route_config_name(
             rds);
             rds);
-    lds_update->route_config_name =
+    lds_update->emplace();
+    (*lds_update)->route_config_name =
         std::string(route_config_name.data, route_config_name.size);
         std::string(route_config_name.data, route_config_name.size);
     return GRPC_ERROR_NONE;
     return GRPC_ERROR_NONE;
   }
   }
-  return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
-      "No listener found for expected server name.");
+  return GRPC_ERROR_NONE;
 }
 }
 
 
 grpc_error* RdsResponseParse(XdsClient* client, TraceFlag* tracer,
 grpc_error* RdsResponseParse(XdsClient* client, TraceFlag* tracer,
                              const envoy_api_v2_DiscoveryResponse* response,
                              const envoy_api_v2_DiscoveryResponse* response,
                              const std::string& expected_server_name,
                              const std::string& expected_server_name,
                              const std::string& expected_route_config_name,
                              const std::string& expected_route_config_name,
-                             XdsApi::RdsUpdate* rds_update, upb_arena* arena) {
+                             absl::optional<XdsApi::RdsUpdate>* rds_update,
+                             upb_arena* arena) {
   // Get the resources from the response.
   // Get the resources from the response.
   size_t size;
   size_t size;
   const google_protobuf_Any* const* resources =
   const google_protobuf_Any* const* resources =
       envoy_api_v2_DiscoveryResponse_resources(response, &size);
       envoy_api_v2_DiscoveryResponse_resources(response, &size);
-  if (size < 1) {
-    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
-        "RDS response contains 0 resource.");
-  }
   for (size_t i = 0; i < size; ++i) {
   for (size_t i = 0; i < size; ++i) {
     // Check the type_url of the resource.
     // Check the type_url of the resource.
     const upb_strview type_url = google_protobuf_Any_type_url(resources[i]);
     const upb_strview type_url = google_protobuf_Any_type_url(resources[i]);
@@ -1162,25 +1153,21 @@ grpc_error* RdsResponseParse(XdsClient* client, TraceFlag* tracer,
     grpc_error* error = RouteConfigParse(
     grpc_error* error = RouteConfigParse(
         client, tracer, route_config, expected_server_name, &local_rds_update);
         client, tracer, route_config, expected_server_name, &local_rds_update);
     if (error != GRPC_ERROR_NONE) return error;
     if (error != GRPC_ERROR_NONE) return error;
-    *rds_update = std::move(local_rds_update);
+    rds_update->emplace(std::move(local_rds_update));
     return GRPC_ERROR_NONE;
     return GRPC_ERROR_NONE;
   }
   }
-  return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
-      "No route config found for expected name.");
+  return GRPC_ERROR_NONE;
 }
 }
 
 
 grpc_error* CdsResponseParse(XdsClient* client, TraceFlag* tracer,
 grpc_error* CdsResponseParse(XdsClient* client, TraceFlag* tracer,
                              const envoy_api_v2_DiscoveryResponse* response,
                              const envoy_api_v2_DiscoveryResponse* response,
+                             const std::set<StringView>& expected_cluster_names,
                              XdsApi::CdsUpdateMap* cds_update_map,
                              XdsApi::CdsUpdateMap* cds_update_map,
                              upb_arena* arena) {
                              upb_arena* arena) {
   // Get the resources from the response.
   // Get the resources from the response.
   size_t size;
   size_t size;
   const google_protobuf_Any* const* resources =
   const google_protobuf_Any* const* resources =
       envoy_api_v2_DiscoveryResponse_resources(response, &size);
       envoy_api_v2_DiscoveryResponse_resources(response, &size);
-  if (size < 1) {
-    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
-        "CDS response contains 0 resource.");
-  }
   // Parse all the resources in the CDS response.
   // Parse all the resources in the CDS response.
   for (size_t i = 0; i < size; ++i) {
   for (size_t i = 0; i < size; ++i) {
     XdsApi::CdsUpdate cds_update;
     XdsApi::CdsUpdate cds_update;
@@ -1197,6 +1184,13 @@ grpc_error* CdsResponseParse(XdsClient* client, TraceFlag* tracer,
       return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Can't decode cluster.");
       return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Can't decode cluster.");
     }
     }
     MaybeLogCluster(client, tracer, cluster);
     MaybeLogCluster(client, tracer, cluster);
+    // Ignore unexpected cluster names.
+    upb_strview cluster_name = envoy_api_v2_Cluster_name(cluster);
+    StringView cluster_name_strview(cluster_name.data, cluster_name.size);
+    if (expected_cluster_names.find(cluster_name_strview) ==
+        expected_cluster_names.end()) {
+      continue;
+    }
     // Check the cluster_discovery_type.
     // Check the cluster_discovery_type.
     if (!envoy_api_v2_Cluster_has_type(cluster)) {
     if (!envoy_api_v2_Cluster_has_type(cluster)) {
       return GRPC_ERROR_CREATE_FROM_STATIC_STRING("DiscoveryType not found.");
       return GRPC_ERROR_CREATE_FROM_STATIC_STRING("DiscoveryType not found.");
@@ -1235,7 +1229,6 @@ grpc_error* CdsResponseParse(XdsClient* client, TraceFlag* tracer,
       }
       }
       cds_update.lrs_load_reporting_server_name.emplace("");
       cds_update.lrs_load_reporting_server_name.emplace("");
     }
     }
-    upb_strview cluster_name = envoy_api_v2_Cluster_name(cluster);
     cds_update_map->emplace(std::string(cluster_name.data, cluster_name.size),
     cds_update_map->emplace(std::string(cluster_name.data, cluster_name.size),
                             std::move(cds_update));
                             std::move(cds_update));
   }
   }
@@ -1363,10 +1356,6 @@ grpc_error* EdsResponsedParse(
   size_t size;
   size_t size;
   const google_protobuf_Any* const* resources =
   const google_protobuf_Any* const* resources =
       envoy_api_v2_DiscoveryResponse_resources(response, &size);
       envoy_api_v2_DiscoveryResponse_resources(response, &size);
-  if (size < 1) {
-    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
-        "EDS response contains 0 resource.");
-  }
   for (size_t i = 0; i < size; ++i) {
   for (size_t i = 0; i < size; ++i) {
     XdsApi::EdsUpdate eds_update;
     XdsApi::EdsUpdate eds_update;
     // Check the type_url of the resource.
     // Check the type_url of the resource.
@@ -1442,8 +1431,10 @@ grpc_error* EdsResponsedParse(
 grpc_error* XdsApi::ParseAdsResponse(
 grpc_error* XdsApi::ParseAdsResponse(
     const grpc_slice& encoded_response, const std::string& expected_server_name,
     const grpc_slice& encoded_response, const std::string& expected_server_name,
     const std::string& expected_route_config_name,
     const std::string& expected_route_config_name,
+    const std::set<StringView>& expected_cluster_names,
     const std::set<StringView>& expected_eds_service_names,
     const std::set<StringView>& expected_eds_service_names,
-    LdsUpdate* lds_update, RdsUpdate* rds_update, CdsUpdateMap* cds_update_map,
+    absl::optional<LdsUpdate>* lds_update,
+    absl::optional<RdsUpdate>* rds_update, CdsUpdateMap* cds_update_map,
     EdsUpdateMap* eds_update_map, std::string* version, std::string* nonce,
     EdsUpdateMap* eds_update_map, std::string* version, std::string* nonce,
     std::string* type_url) {
     std::string* type_url) {
   upb::Arena arena;
   upb::Arena arena;
@@ -1477,8 +1468,8 @@ grpc_error* XdsApi::ParseAdsResponse(
                             expected_route_config_name, rds_update,
                             expected_route_config_name, rds_update,
                             arena.ptr());
                             arena.ptr());
   } else if (*type_url == kCdsTypeUrl) {
   } else if (*type_url == kCdsTypeUrl) {
-    return CdsResponseParse(client_, tracer_, response, cds_update_map,
-                            arena.ptr());
+    return CdsResponseParse(client_, tracer_, response, expected_cluster_names,
+                            cds_update_map, arena.ptr());
   } else if (*type_url == kEdsTypeUrl) {
   } else if (*type_url == kEdsTypeUrl) {
     return EdsResponsedParse(client_, tracer_, response,
     return EdsResponsedParse(client_, tracer_, response,
                              expected_eds_service_names, eds_update_map,
                              expected_eds_service_names, eds_update_map,

+ 5 - 3
src/core/ext/filters/client_channel/xds/xds_api.h

@@ -232,10 +232,12 @@ class XdsApi {
       const grpc_slice& encoded_response,
       const grpc_slice& encoded_response,
       const std::string& expected_server_name,
       const std::string& expected_server_name,
       const std::string& expected_route_config_name,
       const std::string& expected_route_config_name,
+      const std::set<StringView>& expected_cluster_names,
       const std::set<StringView>& expected_eds_service_names,
       const std::set<StringView>& expected_eds_service_names,
-      LdsUpdate* lds_update, RdsUpdate* rds_update,
-      CdsUpdateMap* cds_update_map, EdsUpdateMap* eds_update_map,
-      std::string* version, std::string* nonce, std::string* type_url);
+      absl::optional<LdsUpdate>* lds_update,
+      absl::optional<RdsUpdate>* rds_update, CdsUpdateMap* cds_update_map,
+      EdsUpdateMap* eds_update_map, std::string* version, std::string* nonce,
+      std::string* type_url);
 
 
   // Creates an LRS request querying \a server_name.
   // Creates an LRS request querying \a server_name.
   grpc_slice CreateLrsInitialRequest(const std::string& server_name);
   grpc_slice CreateLrsInitialRequest(const std::string& server_name);

+ 115 - 54
src/core/ext/filters/client_channel/xds/xds_client.cc

@@ -128,7 +128,8 @@ class XdsClient::ChannelState::AdsCallState
   bool seen_response() const { return seen_response_; }
   bool seen_response() const { return seen_response_; }
 
 
   void Subscribe(const std::string& type_url, const std::string& name);
   void Subscribe(const std::string& type_url, const std::string& name);
-  void Unsubscribe(const std::string& type_url, const std::string& name);
+  void Unsubscribe(const std::string& type_url, const std::string& name,
+                   bool delay_unsubscription);
 
 
   bool HasSubscribedResources() const;
   bool HasSubscribedResources() const;
 
 
@@ -240,8 +241,8 @@ class XdsClient::ChannelState::AdsCallState
 
 
   void SendMessageLocked(const std::string& type_url);
   void SendMessageLocked(const std::string& type_url);
 
 
-  void AcceptLdsUpdate(XdsApi::LdsUpdate lds_update);
-  void AcceptRdsUpdate(XdsApi::RdsUpdate rds_update);
+  void AcceptLdsUpdate(absl::optional<XdsApi::LdsUpdate> lds_update);
+  void AcceptRdsUpdate(absl::optional<XdsApi::RdsUpdate> rds_update);
   void AcceptCdsUpdate(XdsApi::CdsUpdateMap cds_update_map);
   void AcceptCdsUpdate(XdsApi::CdsUpdateMap cds_update_map);
   void AcceptEdsUpdate(XdsApi::EdsUpdateMap eds_update_map);
   void AcceptEdsUpdate(XdsApi::EdsUpdateMap eds_update_map);
 
 
@@ -557,9 +558,10 @@ void XdsClient::ChannelState::Subscribe(const std::string& type_url,
 }
 }
 
 
 void XdsClient::ChannelState::Unsubscribe(const std::string& type_url,
 void XdsClient::ChannelState::Unsubscribe(const std::string& type_url,
-                                          const std::string& name) {
+                                          const std::string& name,
+                                          bool delay_unsubscription) {
   if (ads_calld_ != nullptr) {
   if (ads_calld_ != nullptr) {
-    ads_calld_->calld()->Unsubscribe(type_url, name);
+    ads_calld_->calld()->Unsubscribe(type_url, name, delay_unsubscription);
     if (!ads_calld_->calld()->HasSubscribedResources()) ads_calld_.reset();
     if (!ads_calld_->calld()->HasSubscribedResources()) ads_calld_.reset();
   }
   }
 }
 }
@@ -862,9 +864,10 @@ void XdsClient::ChannelState::AdsCallState::Subscribe(
 }
 }
 
 
 void XdsClient::ChannelState::AdsCallState::Unsubscribe(
 void XdsClient::ChannelState::AdsCallState::Unsubscribe(
-    const std::string& type_url, const std::string& name) {
+    const std::string& type_url, const std::string& name,
+    bool delay_unsubscription) {
   state_map_[type_url].subscribed_resources.erase(name);
   state_map_[type_url].subscribed_resources.erase(name);
-  SendMessageLocked(type_url);
+  if (!delay_unsubscription) SendMessageLocked(type_url);
 }
 }
 
 
 bool XdsClient::ChannelState::AdsCallState::HasSubscribedResources() const {
 bool XdsClient::ChannelState::AdsCallState::HasSubscribedResources() const {
@@ -875,24 +878,32 @@ bool XdsClient::ChannelState::AdsCallState::HasSubscribedResources() const {
 }
 }
 
 
 void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
 void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
-    XdsApi::LdsUpdate lds_update) {
+    absl::optional<XdsApi::LdsUpdate> lds_update) {
+  if (!lds_update.has_value()) {
+    gpr_log(GPR_INFO,
+            "[xds_client %p] LDS update does not include requested resource",
+            xds_client());
+    xds_client()->service_config_watcher_->OnError(
+        GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+            "LDS update does not include requested resource"));
+    return;
+  }
   const std::string& cluster_name =
   const std::string& cluster_name =
-      lds_update.rds_update.has_value()
-          ? lds_update.rds_update.value().cluster_name
+      lds_update->rds_update.has_value()
+          ? lds_update->rds_update.value().cluster_name
           : "";
           : "";
   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
     gpr_log(GPR_INFO,
     gpr_log(GPR_INFO,
-            "[xds_client %p] LDS update received: "
-            "route_config_name=%s, "
+            "[xds_client %p] LDS update received: route_config_name=%s, "
             "cluster_name=%s (empty if RDS is needed to obtain it)",
             "cluster_name=%s (empty if RDS is needed to obtain it)",
-            xds_client(), lds_update.route_config_name.c_str(),
+            xds_client(), lds_update->route_config_name.c_str(),
             cluster_name.c_str());
             cluster_name.c_str());
   }
   }
   auto& lds_state = state_map_[XdsApi::kLdsTypeUrl];
   auto& lds_state = state_map_[XdsApi::kLdsTypeUrl];
   auto& state = lds_state.subscribed_resources[xds_client()->server_name_];
   auto& state = lds_state.subscribed_resources[xds_client()->server_name_];
   if (state != nullptr) state->Finish();
   if (state != nullptr) state->Finish();
   // Ignore identical update.
   // Ignore identical update.
-  if (xds_client()->route_config_name_ == lds_update.route_config_name &&
+  if (xds_client()->route_config_name_ == lds_update->route_config_name &&
       xds_client()->cluster_name_ == cluster_name) {
       xds_client()->cluster_name_ == cluster_name) {
     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
       gpr_log(GPR_INFO,
       gpr_log(GPR_INFO,
@@ -901,12 +912,17 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
     }
     }
     return;
     return;
   }
   }
-  xds_client()->route_config_name_ = std::move(lds_update.route_config_name);
-  if (lds_update.rds_update.has_value()) {
+  if (!xds_client()->route_config_name_.empty()) {
+    Unsubscribe(
+        XdsApi::kRdsTypeUrl, xds_client()->route_config_name_,
+        /*delay_unsubscription=*/!lds_update->route_config_name.empty());
+  }
+  xds_client()->route_config_name_ = std::move(lds_update->route_config_name);
+  if (lds_update->rds_update.has_value()) {
     // If cluster_name was found inlined in LDS response, notify the watcher
     // If cluster_name was found inlined in LDS response, notify the watcher
     // immediately.
     // immediately.
     xds_client()->cluster_name_ =
     xds_client()->cluster_name_ =
-        std::move(lds_update.rds_update.value().cluster_name);
+        std::move(lds_update->rds_update.value().cluster_name);
     RefCountedPtr<ServiceConfig> service_config;
     RefCountedPtr<ServiceConfig> service_config;
     grpc_error* error = xds_client()->CreateServiceConfig(
     grpc_error* error = xds_client()->CreateServiceConfig(
         xds_client()->cluster_name_, &service_config);
         xds_client()->cluster_name_, &service_config);
@@ -923,19 +939,26 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
 }
 }
 
 
 void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate(
 void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate(
-    XdsApi::RdsUpdate rds_update) {
-  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
+    absl::optional<XdsApi::RdsUpdate> rds_update) {
+  if (!rds_update.has_value()) {
     gpr_log(GPR_INFO,
     gpr_log(GPR_INFO,
-            "[xds_client %p] RDS update received: "
-            "cluster_name=%s",
-            xds_client(), rds_update.cluster_name.c_str());
+            "[xds_client %p] RDS update does not include requested resource",
+            xds_client());
+    xds_client()->service_config_watcher_->OnError(
+        GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+            "RDS update does not include requested resource"));
+    return;
+  }
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
+    gpr_log(GPR_INFO, "[xds_client %p] RDS update received: cluster_name=%s",
+            xds_client(), rds_update->cluster_name.c_str());
   }
   }
   auto& rds_state = state_map_[XdsApi::kRdsTypeUrl];
   auto& rds_state = state_map_[XdsApi::kRdsTypeUrl];
   auto& state =
   auto& state =
       rds_state.subscribed_resources[xds_client()->route_config_name_];
       rds_state.subscribed_resources[xds_client()->route_config_name_];
   if (state != nullptr) state->Finish();
   if (state != nullptr) state->Finish();
   // Ignore identical update.
   // Ignore identical update.
-  if (xds_client()->cluster_name_ == rds_update.cluster_name) {
+  if (xds_client()->cluster_name_ == rds_update->cluster_name) {
     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
       gpr_log(GPR_INFO,
       gpr_log(GPR_INFO,
               "[xds_client %p] RDS update identical to current, ignoring.",
               "[xds_client %p] RDS update identical to current, ignoring.",
@@ -943,7 +966,7 @@ void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate(
     }
     }
     return;
     return;
   }
   }
-  xds_client()->cluster_name_ = std::move(rds_update.cluster_name);
+  xds_client()->cluster_name_ = std::move(rds_update->cluster_name);
   // Notify the watcher.
   // Notify the watcher.
   RefCountedPtr<ServiceConfig> service_config;
   RefCountedPtr<ServiceConfig> service_config;
   grpc_error* error = xds_client()->CreateServiceConfig(
   grpc_error* error = xds_client()->CreateServiceConfig(
@@ -959,6 +982,7 @@ void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate(
 void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate(
 void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate(
     XdsApi::CdsUpdateMap cds_update_map) {
     XdsApi::CdsUpdateMap cds_update_map) {
   auto& cds_state = state_map_[XdsApi::kCdsTypeUrl];
   auto& cds_state = state_map_[XdsApi::kCdsTypeUrl];
+  std::set<std::string> eds_resource_names_seen;
   for (auto& p : cds_update_map) {
   for (auto& p : cds_update_map) {
     const char* cluster_name = p.first.c_str();
     const char* cluster_name = p.first.c_str();
     XdsApi::CdsUpdate& cds_update = p.second;
     XdsApi::CdsUpdate& cds_update = p.second;
@@ -967,21 +991,22 @@ void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate(
     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
       gpr_log(GPR_INFO,
       gpr_log(GPR_INFO,
               "[xds_client %p] CDS update (cluster=%s) received: "
               "[xds_client %p] CDS update (cluster=%s) received: "
-              "eds_service_name=%s, "
-              "lrs_load_reporting_server_name=%s",
+              "eds_service_name=%s, lrs_load_reporting_server_name=%s",
               xds_client(), cluster_name, cds_update.eds_service_name.c_str(),
               xds_client(), cluster_name, cds_update.eds_service_name.c_str(),
               cds_update.lrs_load_reporting_server_name.has_value()
               cds_update.lrs_load_reporting_server_name.has_value()
                   ? cds_update.lrs_load_reporting_server_name.value().c_str()
                   ? cds_update.lrs_load_reporting_server_name.value().c_str()
                   : "(N/A)");
                   : "(N/A)");
     }
     }
-    ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name];
+    // Record the EDS resource names seen.
+    eds_resource_names_seen.insert(cds_update.eds_service_name.empty()
+                                       ? cluster_name
+                                       : cds_update.eds_service_name);
     // Ignore identical update.
     // Ignore identical update.
+    ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name];
     if (cluster_state.update.has_value() &&
     if (cluster_state.update.has_value() &&
-        cds_update.eds_service_name ==
-            cluster_state.update.value().eds_service_name &&
-        cds_update.lrs_load_reporting_server_name.value() ==
-            cluster_state.update.value()
-                .lrs_load_reporting_server_name.value()) {
+        cds_update.eds_service_name == cluster_state.update->eds_service_name &&
+        cds_update.lrs_load_reporting_server_name ==
+            cluster_state.update->lrs_load_reporting_server_name) {
       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
         gpr_log(GPR_INFO,
         gpr_log(GPR_INFO,
                 "[xds_client %p] CDS update identical to current, ignoring.",
                 "[xds_client %p] CDS update identical to current, ignoring.",
@@ -990,12 +1015,41 @@ void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate(
       continue;
       continue;
     }
     }
     // Update the cluster state.
     // Update the cluster state.
-    cluster_state.update.emplace(std::move(cds_update));
+    cluster_state.update = std::move(cds_update);
     // Notify all watchers.
     // Notify all watchers.
     for (const auto& p : cluster_state.watchers) {
     for (const auto& p : cluster_state.watchers) {
       p.first->OnClusterChanged(cluster_state.update.value());
       p.first->OnClusterChanged(cluster_state.update.value());
     }
     }
   }
   }
+  // For any subscribed resource that is not present in the update,
+  // remove it from the cache and notify watchers of the error.
+  for (const auto& p : cds_state.subscribed_resources) {
+    const std::string& cluster_name = p.first;
+    if (cds_update_map.find(cluster_name) == cds_update_map.end()) {
+      ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name];
+      cluster_state.update.reset();
+      for (const auto& p : cluster_state.watchers) {
+        p.first->OnError(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+            "Cluster not present in CDS update"));
+      }
+    }
+  }
+  // Also remove any EDS resources that are no longer referred to by any CDS
+  // resources.
+  auto& eds_state = state_map_[XdsApi::kEdsTypeUrl];
+  for (const auto& p : eds_state.subscribed_resources) {
+    const std::string& eds_resource_name = p.first;
+    if (eds_resource_names_seen.find(eds_resource_name) ==
+        eds_resource_names_seen.end()) {
+      EndpointState& endpoint_state =
+          xds_client()->endpoint_map_[eds_resource_name];
+      endpoint_state.update.reset();
+      for (const auto& p : endpoint_state.watchers) {
+        p.first->OnError(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+            "ClusterLoadAssignment resource removed due to CDS update"));
+      }
+    }
+  }
 }
 }
 
 
 void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdate(
 void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdate(
@@ -1058,25 +1112,27 @@ void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdate(
     EndpointState& endpoint_state =
     EndpointState& endpoint_state =
         xds_client()->endpoint_map_[eds_service_name];
         xds_client()->endpoint_map_[eds_service_name];
     // Ignore identical update.
     // Ignore identical update.
-    const XdsApi::EdsUpdate& prev_update = endpoint_state.update;
-    const bool priority_list_changed =
-        prev_update.priority_list_update != eds_update.priority_list_update;
-    const bool drop_config_changed =
-        prev_update.drop_config == nullptr ||
-        *prev_update.drop_config != *eds_update.drop_config;
-    if (!priority_list_changed && !drop_config_changed) {
-      if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
-        gpr_log(GPR_INFO,
-                "[xds_client %p] EDS update identical to current, ignoring.",
-                xds_client());
+    if (endpoint_state.update.has_value()) {
+      const XdsApi::EdsUpdate& prev_update = endpoint_state.update.value();
+      const bool priority_list_changed =
+          prev_update.priority_list_update != eds_update.priority_list_update;
+      const bool drop_config_changed =
+          prev_update.drop_config == nullptr ||
+          *prev_update.drop_config != *eds_update.drop_config;
+      if (!priority_list_changed && !drop_config_changed) {
+        if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
+          gpr_log(GPR_INFO,
+                  "[xds_client %p] EDS update identical to current, ignoring.",
+                  xds_client());
+        }
+        continue;
       }
       }
-      continue;
     }
     }
     // Update the cluster state.
     // Update the cluster state.
     endpoint_state.update = std::move(eds_update);
     endpoint_state.update = std::move(eds_update);
     // Notify all watchers.
     // Notify all watchers.
     for (const auto& p : endpoint_state.watchers) {
     for (const auto& p : endpoint_state.watchers) {
-      p.first->OnEndpointChanged(endpoint_state.update);
+      p.first->OnEndpointChanged(endpoint_state.update.value());
     }
     }
   }
   }
 }
 }
@@ -1150,8 +1206,8 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked(
   // mode. We will also need to cancel the timer when we receive a serverlist
   // mode. We will also need to cancel the timer when we receive a serverlist
   // from the balancer.
   // from the balancer.
   // Parse the response.
   // Parse the response.
-  XdsApi::LdsUpdate lds_update;
-  XdsApi::RdsUpdate rds_update;
+  absl::optional<XdsApi::LdsUpdate> lds_update;
+  absl::optional<XdsApi::RdsUpdate> rds_update;
   XdsApi::CdsUpdateMap cds_update_map;
   XdsApi::CdsUpdateMap cds_update_map;
   XdsApi::EdsUpdateMap eds_update_map;
   XdsApi::EdsUpdateMap eds_update_map;
   std::string version;
   std::string version;
@@ -1160,6 +1216,7 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked(
   // Note that ParseAdsResponse() also validates the response.
   // Note that ParseAdsResponse() also validates the response.
   grpc_error* parse_error = xds_client->api_.ParseAdsResponse(
   grpc_error* parse_error = xds_client->api_.ParseAdsResponse(
       response_slice, xds_client->server_name_, xds_client->route_config_name_,
       response_slice, xds_client->server_name_, xds_client->route_config_name_,
+      ads_calld->ClusterNamesForRequest(),
       ads_calld->EdsServiceNamesForRequest(), &lds_update, &rds_update,
       ads_calld->EdsServiceNamesForRequest(), &lds_update, &rds_update,
       &cds_update_map, &eds_update_map, &version, &nonce, &type_url);
       &cds_update_map, &eds_update_map, &version, &nonce, &type_url);
   grpc_slice_unref_internal(response_slice);
   grpc_slice_unref_internal(response_slice);
@@ -1822,7 +1879,8 @@ void XdsClient::WatchClusterData(
 }
 }
 
 
 void XdsClient::CancelClusterDataWatch(StringView cluster_name,
 void XdsClient::CancelClusterDataWatch(StringView cluster_name,
-                                       ClusterWatcherInterface* watcher) {
+                                       ClusterWatcherInterface* watcher,
+                                       bool delay_unsubscription) {
   if (shutting_down_) return;
   if (shutting_down_) return;
   std::string cluster_name_str = std::string(cluster_name);
   std::string cluster_name_str = std::string(cluster_name);
   ClusterState& cluster_state = cluster_map_[cluster_name_str];
   ClusterState& cluster_state = cluster_map_[cluster_name_str];
@@ -1831,7 +1889,8 @@ void XdsClient::CancelClusterDataWatch(StringView cluster_name,
     cluster_state.watchers.erase(it);
     cluster_state.watchers.erase(it);
     if (cluster_state.watchers.empty()) {
     if (cluster_state.watchers.empty()) {
       cluster_map_.erase(cluster_name_str);
       cluster_map_.erase(cluster_name_str);
-      chand_->Unsubscribe(XdsApi::kCdsTypeUrl, cluster_name_str);
+      chand_->Unsubscribe(XdsApi::kCdsTypeUrl, cluster_name_str,
+                          delay_unsubscription);
     }
     }
   }
   }
 }
 }
@@ -1845,18 +1904,19 @@ void XdsClient::WatchEndpointData(
   endpoint_state.watchers[w] = std::move(watcher);
   endpoint_state.watchers[w] = std::move(watcher);
   // If we've already received an EDS update, notify the new watcher
   // If we've already received an EDS update, notify the new watcher
   // immediately.
   // immediately.
-  if (!endpoint_state.update.priority_list_update.empty()) {
+  if (endpoint_state.update.has_value()) {
     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
       gpr_log(GPR_INFO, "[xds_client %p] returning cached endpoint data for %s",
       gpr_log(GPR_INFO, "[xds_client %p] returning cached endpoint data for %s",
               this, StringViewToCString(eds_service_name).get());
               this, StringViewToCString(eds_service_name).get());
     }
     }
-    w->OnEndpointChanged(endpoint_state.update);
+    w->OnEndpointChanged(endpoint_state.update.value());
   }
   }
   chand_->Subscribe(XdsApi::kEdsTypeUrl, eds_service_name_str);
   chand_->Subscribe(XdsApi::kEdsTypeUrl, eds_service_name_str);
 }
 }
 
 
 void XdsClient::CancelEndpointDataWatch(StringView eds_service_name,
 void XdsClient::CancelEndpointDataWatch(StringView eds_service_name,
-                                        EndpointWatcherInterface* watcher) {
+                                        EndpointWatcherInterface* watcher,
+                                        bool delay_unsubscription) {
   if (shutting_down_) return;
   if (shutting_down_) return;
   std::string eds_service_name_str = std::string(eds_service_name);
   std::string eds_service_name_str = std::string(eds_service_name);
   EndpointState& endpoint_state = endpoint_map_[eds_service_name_str];
   EndpointState& endpoint_state = endpoint_map_[eds_service_name_str];
@@ -1865,7 +1925,8 @@ void XdsClient::CancelEndpointDataWatch(StringView eds_service_name,
     endpoint_state.watchers.erase(it);
     endpoint_state.watchers.erase(it);
     if (endpoint_state.watchers.empty()) {
     if (endpoint_state.watchers.empty()) {
       endpoint_map_.erase(eds_service_name_str);
       endpoint_map_.erase(eds_service_name_str);
-      chand_->Unsubscribe(XdsApi::kEdsTypeUrl, eds_service_name_str);
+      chand_->Unsubscribe(XdsApi::kEdsTypeUrl, eds_service_name_str,
+                          delay_unsubscription);
     }
     }
   }
   }
 }
 }

+ 14 - 7
src/core/ext/filters/client_channel/xds/xds_client.h

@@ -86,20 +86,26 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
   // keep a raw pointer to the watcher, which may be used only for
   // keep a raw pointer to the watcher, which may be used only for
   // cancellation.  (Because the caller does not own the watcher, the
   // cancellation.  (Because the caller does not own the watcher, the
   // pointer must not be used for any other purpose.)
   // pointer must not be used for any other purpose.)
+  // If the caller is going to start a new watch after cancelling the
+  // old one, it should set delay_unsubscription to true.
   void WatchClusterData(StringView cluster_name,
   void WatchClusterData(StringView cluster_name,
                         std::unique_ptr<ClusterWatcherInterface> watcher);
                         std::unique_ptr<ClusterWatcherInterface> watcher);
   void CancelClusterDataWatch(StringView cluster_name,
   void CancelClusterDataWatch(StringView cluster_name,
-                              ClusterWatcherInterface* watcher);
+                              ClusterWatcherInterface* watcher,
+                              bool delay_unsubscription = false);
 
 
   // Start and cancel endpoint data watch for a cluster.
   // Start and cancel endpoint data watch for a cluster.
   // The XdsClient takes ownership of the watcher, but the caller may
   // The XdsClient takes ownership of the watcher, but the caller may
   // keep a raw pointer to the watcher, which may be used only for
   // keep a raw pointer to the watcher, which may be used only for
   // cancellation.  (Because the caller does not own the watcher, the
   // cancellation.  (Because the caller does not own the watcher, the
   // pointer must not be used for any other purpose.)
   // pointer must not be used for any other purpose.)
+  // If the caller is going to start a new watch after cancelling the
+  // old one, it should set delay_unsubscription to true.
   void WatchEndpointData(StringView eds_service_name,
   void WatchEndpointData(StringView eds_service_name,
                          std::unique_ptr<EndpointWatcherInterface> watcher);
                          std::unique_ptr<EndpointWatcherInterface> watcher);
   void CancelEndpointDataWatch(StringView eds_service_name,
   void CancelEndpointDataWatch(StringView eds_service_name,
-                               EndpointWatcherInterface* watcher);
+                               EndpointWatcherInterface* watcher,
+                               bool delay_unsubscription = false);
 
 
   // Adds and removes drop stats for cluster_name and eds_service_name.
   // Adds and removes drop stats for cluster_name and eds_service_name.
   RefCountedPtr<XdsClusterDropStats> AddClusterDropStats(
   RefCountedPtr<XdsClusterDropStats> AddClusterDropStats(
@@ -167,7 +173,8 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
     void CancelConnectivityWatchLocked();
     void CancelConnectivityWatchLocked();
 
 
     void Subscribe(const std::string& type_url, const std::string& name);
     void Subscribe(const std::string& type_url, const std::string& name);
-    void Unsubscribe(const std::string& type_url, const std::string& name);
+    void Unsubscribe(const std::string& type_url, const std::string& name,
+                     bool delay_unsubscription);
 
 
    private:
    private:
     class StateWatcher;
     class StateWatcher;
@@ -189,7 +196,7 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
     std::map<ClusterWatcherInterface*, std::unique_ptr<ClusterWatcherInterface>>
     std::map<ClusterWatcherInterface*, std::unique_ptr<ClusterWatcherInterface>>
         watchers;
         watchers;
     // The latest data seen from CDS.
     // The latest data seen from CDS.
-    Optional<XdsApi::CdsUpdate> update;
+    absl::optional<XdsApi::CdsUpdate> update;
   };
   };
 
 
   struct EndpointState {
   struct EndpointState {
@@ -197,7 +204,7 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
              std::unique_ptr<EndpointWatcherInterface>>
              std::unique_ptr<EndpointWatcherInterface>>
         watchers;
         watchers;
     // The latest data seen from EDS.
     // The latest data seen from EDS.
-    XdsApi::EdsUpdate update;
+    absl::optional<XdsApi::EdsUpdate> update;
   };
   };
 
 
   struct LoadReportState {
   struct LoadReportState {
@@ -241,9 +248,9 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
 
 
   std::string route_config_name_;
   std::string route_config_name_;
   std::string cluster_name_;
   std::string cluster_name_;
-  // All the received clusters are cached, no matter they are watched or not.
+  // One entry for each watched CDS resource.
   std::map<std::string /*cluster_name*/, ClusterState> cluster_map_;
   std::map<std::string /*cluster_name*/, ClusterState> cluster_map_;
-  // Only the watched EDS service names are stored.
+  // One entry for each watched EDS resource.
   std::map<std::string /*eds_service_name*/, EndpointState> endpoint_map_;
   std::map<std::string /*eds_service_name*/, EndpointState> endpoint_map_;
   std::map<
   std::map<
       std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>,
       std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>,

+ 1 - 0
src/core/tsi/transport_security_interface.h

@@ -44,6 +44,7 @@ typedef enum {
   TSI_OUT_OF_RESOURCES = 12,
   TSI_OUT_OF_RESOURCES = 12,
   TSI_ASYNC = 13,
   TSI_ASYNC = 13,
   TSI_HANDSHAKE_SHUTDOWN = 14,
   TSI_HANDSHAKE_SHUTDOWN = 14,
+  TSI_CLOSE_NOTIFY = 15,  // Indicates that the connection should be closed.
 } tsi_result;
 } tsi_result;
 
 
 typedef enum {
 typedef enum {

+ 1 - 0
src/objective-c/tests/UnitTests/APIv2Tests.m

@@ -401,6 +401,7 @@ static const NSTimeInterval kInvertedTimeout = 2;
 
 
   GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
   GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
   options.timeout = 0.001;
   options.timeout = 0.001;
+  options.transportType = GRPCTransportTypeInsecure;
   GRPCRequestOptions *requestOptions =
   GRPCRequestOptions *requestOptions =
       [[GRPCRequestOptions alloc] initWithHost:kHostAddress
       [[GRPCRequestOptions alloc] initWithHost:kHostAddress
                                           path:kFullDuplexCallMethod.HTTPPath
                                           path:kFullDuplexCallMethod.HTTPPath

+ 0 - 8
src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pxd.pxi

@@ -52,13 +52,5 @@ cdef class CallbackWrapper:
     cdef grpc_experimental_completion_queue_functor *c_functor(self)
     cdef grpc_experimental_completion_queue_functor *c_functor(self)
 
 
 
 
-cdef class CallbackCompletionQueue:
-    cdef grpc_completion_queue *_cq
-    cdef object _shutdown_completed  # asyncio.Future
-    cdef CallbackWrapper _wrapper
-
-    cdef grpc_completion_queue* c_ptr(self)
-
-
 cdef class GrpcCallWrapper:
 cdef class GrpcCallWrapper:
     cdef grpc_call* call
     cdef grpc_call* call

+ 0 - 21
src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi

@@ -69,27 +69,6 @@ cdef CallbackFailureHandler CQ_SHUTDOWN_FAILURE_HANDLER = CallbackFailureHandler
     InternalError)
     InternalError)
 
 
 
 
-cdef class CallbackCompletionQueue:
-
-    def __cinit__(self):
-        self._shutdown_completed = grpc_aio_loop().create_future()
-        self._wrapper = CallbackWrapper(
-            self._shutdown_completed,
-            CQ_SHUTDOWN_FAILURE_HANDLER)
-        self._cq = grpc_completion_queue_create_for_callback(
-            self._wrapper.c_functor(),
-            NULL
-        )
-
-    cdef grpc_completion_queue* c_ptr(self):
-        return self._cq
-
-    async def shutdown(self):
-        grpc_completion_queue_shutdown(self._cq)
-        await self._shutdown_completed
-        grpc_completion_queue_destroy(self._cq)
-
-
 class ExecuteBatchError(Exception): pass
 class ExecuteBatchError(Exception): pass
 
 
 
 

+ 1 - 1
src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pxd.pxi

@@ -21,7 +21,7 @@ cdef enum AioChannelStatus:
 cdef class AioChannel:
 cdef class AioChannel:
     cdef:
     cdef:
         grpc_channel * channel
         grpc_channel * channel
-        CallbackCompletionQueue cq
+        BaseCompletionQueue cq
         object loop
         object loop
         bytes _target
         bytes _target
         AioChannelStatus _status
         AioChannelStatus _status

+ 1 - 1
src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi

@@ -31,7 +31,7 @@ cdef class AioChannel:
             options = ()
             options = ()
         cdef _ChannelArgs channel_args = _ChannelArgs(options)
         cdef _ChannelArgs channel_args = _ChannelArgs(options)
         self._target = target
         self._target = target
-        self.cq = CallbackCompletionQueue()
+        self.cq = create_completion_queue()
         self.loop = loop
         self.loop = loop
         self._status = AIO_CHANNEL_STATUS_READY
         self._status = AIO_CHANNEL_STATUS_READY
 
 

+ 30 - 0
src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi

@@ -0,0 +1,30 @@
+# Copyright 2020 The gRPC Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cdef class BaseCompletionQueue:
+    cdef grpc_completion_queue *_cq
+
+    cdef grpc_completion_queue* c_ptr(self)
+
+cdef class PollerCompletionQueue(BaseCompletionQueue):
+    cdef bint _shutdown
+    cdef object _shutdown_completed
+    cdef object _poller_thread
+
+    cdef void _poll(self) except *
+
+
+cdef class CallbackCompletionQueue(BaseCompletionQueue):
+    cdef object _shutdown_completed  # asyncio.Future
+    cdef CallbackWrapper _wrapper

+ 96 - 0
src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi

@@ -0,0 +1,96 @@
+# Copyright 2020 The gRPC Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cdef gpr_timespec _GPR_INF_FUTURE = gpr_inf_future(GPR_CLOCK_REALTIME)
+
+
+def _handle_callback_wrapper(CallbackWrapper callback_wrapper, int success):
+    CallbackWrapper.functor_run(callback_wrapper.c_functor(), success)
+
+
+cdef class BaseCompletionQueue:
+
+    async def shutdown(self):
+        raise NotImplementedError()
+
+    cdef grpc_completion_queue* c_ptr(self):
+        return self._cq
+
+
+cdef class PollerCompletionQueue(BaseCompletionQueue):
+
+    def __cinit__(self):
+        self._cq = grpc_completion_queue_create_for_next(NULL)
+        self._shutdown = False
+        self._shutdown_completed = asyncio.get_event_loop().create_future()
+        self._poller_thread = threading.Thread(target=self._poll_wrapper, daemon=True)
+        self._poller_thread.start()
+
+    cdef void _poll(self) except *:
+        cdef grpc_event event
+        cdef CallbackContext *context
+
+        while not self._shutdown:
+            with nogil:
+                event = grpc_completion_queue_next(self._cq,
+                                                _GPR_INF_FUTURE,
+                                                NULL)
+
+            if event.type == GRPC_QUEUE_TIMEOUT:
+                raise AssertionError("Core should not return timeout error!")
+            elif event.type == GRPC_QUEUE_SHUTDOWN:
+                self._shutdown = True
+                aio_loop_call_soon_threadsafe(self._shutdown_completed.set_result, None)
+            else:
+                context = <CallbackContext *>event.tag
+                aio_loop_call_soon_threadsafe(
+                    _handle_callback_wrapper,
+                    <CallbackWrapper>context.callback_wrapper,
+                    event.success)
+
+    def _poll_wrapper(self):
+        self._poll()
+
+    async def shutdown(self):
+        grpc_completion_queue_shutdown(self._cq)
+        await self._shutdown_completed
+        grpc_completion_queue_destroy(self._cq)
+        self._poller_thread.join()
+
+
+cdef class CallbackCompletionQueue(BaseCompletionQueue):
+
+    def __cinit__(self):
+        self._shutdown_completed = grpc_aio_loop().create_future()
+        self._wrapper = CallbackWrapper(
+            self._shutdown_completed,
+            CQ_SHUTDOWN_FAILURE_HANDLER)
+        self._cq = grpc_completion_queue_create_for_callback(
+            self._wrapper.c_functor(),
+            NULL
+        )
+
+    async def shutdown(self):
+        grpc_completion_queue_shutdown(self._cq)
+        await self._shutdown_completed
+        grpc_completion_queue_destroy(self._cq)
+
+
+cdef BaseCompletionQueue create_completion_queue():
+    if grpc_aio_engine is AsyncIOEngine.CUSTOM_IO_MANAGER:
+        return CallbackCompletionQueue()
+    elif grpc_aio_engine is AsyncIOEngine.POLLER:
+        return PollerCompletionQueue()
+    else:
+        raise ValueError('Unsupported engine type [%s]' % grpc_aio_engine)

+ 75 - 24
src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi

@@ -15,44 +15,95 @@
 
 
 cdef bint _grpc_aio_initialized = False
 cdef bint _grpc_aio_initialized = False
 # NOTE(lidiz) Theoretically, applications can run in multiple event loops as
 # NOTE(lidiz) Theoretically, applications can run in multiple event loops as
-# long as they are in the same thread with same magic. However, I don't think
-# we should support this use case. So, the gRPC Python Async Stack should use
-# a single event loop picked by "init_grpc_aio".
-cdef object _grpc_aio_loop
+# long as they are in the same thread with same magic. This is not a supported
+# use case. So, the gRPC Python Async Stack should use a single event loop
+# picked by "init_grpc_aio".
+cdef object _grpc_aio_loop  # asyncio.AbstractEventLoop
+cdef int64_t _event_loop_thread_ident
+cdef str _GRPC_ASYNCIO_ENGINE = os.environ.get('GRPC_ASYNCIO_ENGINE', 'default').lower()
+grpc_aio_engine = None
+cdef object _grpc_initialization_lock = threading.Lock()
+
+
+class AsyncIOEngine(enum.Enum):
+    DEFAULT = 'default'
+    CUSTOM_IO_MANAGER = 'custom'
+    POLLER = 'poller'
 
 
 
 
 def init_grpc_aio():
 def init_grpc_aio():
     global _grpc_aio_initialized
     global _grpc_aio_initialized
     global _grpc_aio_loop
     global _grpc_aio_loop
+    global _event_loop_thread_ident
+    global grpc_aio_engine
 
 
-    if _grpc_aio_initialized:
-        return
-    else:
-        _grpc_aio_initialized = True
+    with _grpc_initialization_lock:
+        # Marks this function as called
+        if _grpc_aio_initialized:
+            return
+        else:
+            _grpc_aio_initialized = True
 
 
-    # Anchors the event loop that the gRPC library going to use.
-    _grpc_aio_loop = asyncio.get_event_loop()
+        # Picks the engine for gRPC AsyncIO Stack
+        for engine_type in AsyncIOEngine:
+            if engine_type.value == _GRPC_ASYNCIO_ENGINE:
+                grpc_aio_engine = engine_type
+                break
+        if grpc_aio_engine is None or grpc_aio_engine is AsyncIOEngine.DEFAULT:
+            grpc_aio_engine = AsyncIOEngine.CUSTOM_IO_MANAGER
 
 
-    # Activates asyncio IO manager
-    install_asyncio_iomgr()
+        # Anchors the event loop that the gRPC library going to use.
+        _grpc_aio_loop = asyncio.get_event_loop()
+        _event_loop_thread_ident = threading.current_thread().ident
 
 
-    # TODO(https://github.com/grpc/grpc/issues/22244) we need a the
-    # grpc_shutdown_blocking() counterpart for this call. Otherwise, the gRPC
-    # library won't shutdown cleanly.
-    grpc_init()
+        if grpc_aio_engine is AsyncIOEngine.CUSTOM_IO_MANAGER:
+            # Activates asyncio IO manager.
+            # NOTE(lidiz) Custom IO manager must be activated before the first
+            # `grpc_init()`. Otherwise, some special configurations in Core won't
+            # pick up the change, and resulted in SEGFAULT or ABORT.
+            install_asyncio_iomgr()
 
 
-    # Timers are triggered by the Asyncio loop. We disable
-    # the background thread that is being used by the native
-    # gRPC iomgr.
-    grpc_timer_manager_set_threading(False)
+            # TODO(https://github.com/grpc/grpc/issues/22244) we need a the
+            # grpc_shutdown_blocking() counterpart for this call. Otherwise, the gRPC
+            # library won't shutdown cleanly.
+            grpc_init()
 
 
-    # gRPC callbaks are executed within the same thread used by the Asyncio
-    # event loop, as it is being done by the other Asyncio callbacks.
-    Executor.SetThreadingAll(False)
+            # Timers are triggered by the Asyncio loop. We disable
+            # the background thread that is being used by the native
+            # gRPC iomgr.
+            grpc_timer_manager_set_threading(False)
 
 
-    _grpc_aio_initialized = False
+            # gRPC callbaks are executed within the same thread used by the Asyncio
+            # event loop, as it is being done by the other Asyncio callbacks.
+            Executor.SetThreadingAll(False)
+        else:
+            # TODO(https://github.com/grpc/grpc/issues/22244) we need a the
+            # grpc_shutdown_blocking() counterpart for this call. Otherwise, the gRPC
+            # library won't shutdown cleanly.
+            grpc_init()
 
 
 
 
 def grpc_aio_loop():
 def grpc_aio_loop():
     """Returns the one-and-only gRPC Aio event loop."""
     """Returns the one-and-only gRPC Aio event loop."""
     return _grpc_aio_loop
     return _grpc_aio_loop
+
+
+def aio_loop_schedule_coroutine(object coro):
+    """Thread-safely schedules coroutine to gRPC Aio event loop.
+
+    If invoked within the same thread as the event loop, return an
+    Asyncio.Task. Otherwise, return a concurrent.futures.Future (the sync
+    Future). For non-asyncio threads, sync Future objects are probably easier
+    to handle (without worrying other thread-safety stuff).
+    """
+    if _event_loop_thread_ident != threading.current_thread().ident:
+        return asyncio.run_coroutine_threadsafe(coro, _grpc_aio_loop)
+    else:
+        return _grpc_aio_loop.create_task(coro)
+
+
+def aio_loop_call_soon_threadsafe(object func, *args):
+    # TODO(lidiz) After we are confident, we can drop this assert. Otherwsie,
+    # we should limit this function to non-grpc-event-loop thread.
+    assert _event_loop_thread_ident != threading.current_thread().ident
+    return _grpc_aio_loop.call_soon_threadsafe(func, *args)

+ 1 - 0
src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi

@@ -188,6 +188,7 @@ cdef void asyncio_timer_start(grpc_custom_timer* grpc_timer) with gil:
 
 
 
 
 cdef void asyncio_timer_stop(grpc_custom_timer* grpc_timer) with gil:
 cdef void asyncio_timer_stop(grpc_custom_timer* grpc_timer) with gil:
+    # TODO(https://github.com/grpc/grpc/issues/22278) remove this if condition
     if grpc_timer.timer == NULL:
     if grpc_timer.timer == NULL:
         return
         return
     else:
     else:

+ 4 - 0
src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pxd.pxi

@@ -24,10 +24,14 @@ cdef class _AsyncioSocket:
         object _task_read
         object _task_read
         object _task_write
         object _task_write
         object _task_connect
         object _task_connect
+        object _task_listen
         char * _read_buffer
         char * _read_buffer
         # Caches the picked event loop, so we can avoid the 30ns overhead each
         # Caches the picked event loop, so we can avoid the 30ns overhead each
         # time we need access to the event loop.
         # time we need access to the event loop.
         object _loop
         object _loop
+        # TODO(lidiz) Drop after 3.6 deprecation. Python 3.7 introduces methods
+        # like `is_closing()` to help graceful shutdown.
+        bint _closed
 
 
         # Client-side attributes
         # Client-side attributes
         grpc_custom_connect_callback _grpc_connect_cb
         grpc_custom_connect_callback _grpc_connect_cb

+ 13 - 1
src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi

@@ -31,10 +31,12 @@ cdef class _AsyncioSocket:
         self._task_connect = None
         self._task_connect = None
         self._task_read = None
         self._task_read = None
         self._task_write = None
         self._task_write = None
+        self._task_listen = None
         self._read_buffer = NULL
         self._read_buffer = NULL
         self._server = None
         self._server = None
         self._py_socket = None
         self._py_socket = None
         self._peername = None
         self._peername = None
+        self._closed = False
 
 
     @staticmethod
     @staticmethod
     cdef _AsyncioSocket create(grpc_custom_socket * grpc_socket,
     cdef _AsyncioSocket create(grpc_custom_socket * grpc_socket,
@@ -159,8 +161,14 @@ cdef class _AsyncioSocket:
         return self._reader and not self._reader._transport.is_closing()
         return self._reader and not self._reader._transport.is_closing()
 
 
     cdef void close(self):
     cdef void close(self):
+        if self._closed:
+            return
+        else:
+            self._closed = True
         if self.is_connected():
         if self.is_connected():
             self._writer.close()
             self._writer.close()
+        if self._task_listen and not self._task_listen.done():
+            self._task_listen.close()
         if self._server:
         if self._server:
             self._server.close()
             self._server.close()
         # NOTE(lidiz) If the asyncio.Server is created from a Python socket,
         # NOTE(lidiz) If the asyncio.Server is created from a Python socket,
@@ -170,6 +178,10 @@ cdef class _AsyncioSocket:
             self._py_socket.close()
             self._py_socket.close()
 
 
     def _new_connection_callback(self, object reader, object writer):
     def _new_connection_callback(self, object reader, object writer):
+        # If the socket is closed, stop.
+        if self._closed:
+            return
+
         # Close the connection if server is not started yet.
         # Close the connection if server is not started yet.
         if self._grpc_accept_cb == NULL:
         if self._grpc_accept_cb == NULL:
             writer.close()
             writer.close()
@@ -197,7 +209,7 @@ cdef class _AsyncioSocket:
                 sock=self._py_socket,
                 sock=self._py_socket,
             )
             )
 
 
-        grpc_aio_loop().create_task(create_asyncio_server())
+        self._task_listen = grpc_aio_loop().create_task(create_asyncio_server())
 
 
     cdef accept(self,
     cdef accept(self,
                 grpc_custom_socket* grpc_socket_client,
                 grpc_custom_socket* grpc_socket_client,

+ 1 - 1
src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi

@@ -51,7 +51,7 @@ cdef enum AioServerStatus:
 
 
 cdef class AioServer:
 cdef class AioServer:
     cdef Server _server
     cdef Server _server
-    cdef CallbackCompletionQueue _cq
+    cdef BaseCompletionQueue _cq
     cdef list _generic_handlers
     cdef list _generic_handlers
     cdef AioServerStatus _status
     cdef AioServerStatus _status
     cdef object _loop  # asyncio.EventLoop
     cdef object _loop  # asyncio.EventLoop

+ 2 - 2
src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi

@@ -613,7 +613,7 @@ cdef class AioServer:
         # NOTE(lidiz) Core objects won't be deallocated automatically.
         # NOTE(lidiz) Core objects won't be deallocated automatically.
         # If AioServer.shutdown is not called, those objects will leak.
         # If AioServer.shutdown is not called, those objects will leak.
         self._server = Server(options)
         self._server = Server(options)
-        self._cq = CallbackCompletionQueue()
+        self._cq = create_completion_queue()
         grpc_server_register_completion_queue(
         grpc_server_register_completion_queue(
             self._server.c_server,
             self._server.c_server,
             self._cq.c_ptr(),
             self._cq.c_ptr(),
@@ -736,7 +736,7 @@ cdef class AioServer:
         # The shutdown callback won't be called until there is no live RPC.
         # The shutdown callback won't be called until there is no live RPC.
         grpc_server_shutdown_and_notify(
         grpc_server_shutdown_and_notify(
             self._server.c_server,
             self._server.c_server,
-            self._cq._cq,
+            self._cq.c_ptr(),
             self._shutdown_callback_wrapper.c_functor())
             self._shutdown_callback_wrapper.c_functor())
 
 
         # Ensures the serving task (coroutine) exits.
         # Ensures the serving task (coroutine) exits.

+ 1 - 0
src/python/grpcio/grpc/_cython/cygrpc.pxd

@@ -45,6 +45,7 @@ IF UNAME_SYSNAME != "Windows":
 include "_cygrpc/aio/iomgr/socket.pxd.pxi"
 include "_cygrpc/aio/iomgr/socket.pxd.pxi"
 include "_cygrpc/aio/iomgr/timer.pxd.pxi"
 include "_cygrpc/aio/iomgr/timer.pxd.pxi"
 include "_cygrpc/aio/iomgr/resolver.pxd.pxi"
 include "_cygrpc/aio/iomgr/resolver.pxd.pxi"
+include "_cygrpc/aio/completion_queue.pxd.pxi"
 include "_cygrpc/aio/rpc_status.pxd.pxi"
 include "_cygrpc/aio/rpc_status.pxd.pxi"
 include "_cygrpc/aio/grpc_aio.pxd.pxi"
 include "_cygrpc/aio/grpc_aio.pxd.pxi"
 include "_cygrpc/aio/callback_common.pxd.pxi"
 include "_cygrpc/aio/callback_common.pxd.pxi"

+ 2 - 0
src/python/grpcio/grpc/_cython/cygrpc.pyx

@@ -20,6 +20,7 @@ import os
 import sys
 import sys
 import threading
 import threading
 import time
 import time
+import enum
 
 
 import grpc
 import grpc
 
 
@@ -71,6 +72,7 @@ include "_cygrpc/aio/iomgr/timer.pyx.pxi"
 include "_cygrpc/aio/iomgr/resolver.pyx.pxi"
 include "_cygrpc/aio/iomgr/resolver.pyx.pxi"
 include "_cygrpc/aio/common.pyx.pxi"
 include "_cygrpc/aio/common.pyx.pxi"
 include "_cygrpc/aio/rpc_status.pyx.pxi"
 include "_cygrpc/aio/rpc_status.pyx.pxi"
+include "_cygrpc/aio/completion_queue.pyx.pxi"
 include "_cygrpc/aio/callback_common.pyx.pxi"
 include "_cygrpc/aio/callback_common.pyx.pxi"
 include "_cygrpc/aio/grpc_aio.pyx.pxi"
 include "_cygrpc/aio/grpc_aio.pyx.pxi"
 include "_cygrpc/aio/call.pyx.pxi"
 include "_cygrpc/aio/call.pyx.pxi"

+ 1 - 1
src/python/grpcio/grpc/experimental/aio/_server.py

@@ -162,7 +162,7 @@ class Server(_base_server.Server):
         be safe to slightly extend the underlying Cython object's life span.
         be safe to slightly extend the underlying Cython object's life span.
         """
         """
         if hasattr(self, '_server'):
         if hasattr(self, '_server'):
-            self._loop.create_task(self._server.shutdown(None))
+            cygrpc.aio_loop_schedule_coroutine(self._server.shutdown(None))
 
 
 
 
 def server(migration_thread_pool: Optional[Executor] = None,
 def server(migration_thread_pool: Optional[Executor] = None,

+ 5 - 9
src/python/grpcio_tests/tests_aio/unit/server_test.py

@@ -348,11 +348,10 @@ class TestServer(AioTestBase):
 
 
         await self._server.stop(test_constants.SHORT_TIMEOUT)
         await self._server.stop(test_constants.SHORT_TIMEOUT)
 
 
-        with self.assertRaises(grpc.RpcError) as exception_context:
+        with self.assertRaises(aio.AioRpcError) as exception_context:
             await call
             await call
         self.assertEqual(grpc.StatusCode.UNAVAILABLE,
         self.assertEqual(grpc.StatusCode.UNAVAILABLE,
                          exception_context.exception.code())
                          exception_context.exception.code())
-        self.assertIn('GOAWAY', exception_context.exception.details())
 
 
     async def test_concurrent_graceful_shutdown(self):
     async def test_concurrent_graceful_shutdown(self):
         call = self._channel.unary_unary(_BLOCK_BRIEFLY)(_REQUEST)
         call = self._channel.unary_unary(_BLOCK_BRIEFLY)(_REQUEST)
@@ -384,21 +383,18 @@ class TestServer(AioTestBase):
             self._server.stop(test_constants.LONG_TIMEOUT),
             self._server.stop(test_constants.LONG_TIMEOUT),
         )
         )
 
 
-        with self.assertRaises(grpc.RpcError) as exception_context:
+        with self.assertRaises(aio.AioRpcError) as exception_context:
             await call
             await call
         self.assertEqual(grpc.StatusCode.UNAVAILABLE,
         self.assertEqual(grpc.StatusCode.UNAVAILABLE,
                          exception_context.exception.code())
                          exception_context.exception.code())
-        self.assertIn('GOAWAY', exception_context.exception.details())
 
 
-    @unittest.skip('https://github.com/grpc/grpc/issues/20818')
     async def test_shutdown_before_call(self):
     async def test_shutdown_before_call(self):
-        server_target, server, _ = _start_test_server()
-        await server.stop(None)
+        await self._server.stop(None)
 
 
         # Ensures the server is cleaned up at this point.
         # Ensures the server is cleaned up at this point.
         # Some proper exception should be raised.
         # Some proper exception should be raised.
-        async with aio.insecure_channel('localhost:%d' % port) as channel:
-            await channel.unary_unary(_SIMPLE_UNARY_UNARY)(_REQUEST)
+        with self.assertRaises(aio.AioRpcError):
+            await self._channel.unary_unary(_SIMPLE_UNARY_UNARY)(_REQUEST)
 
 
     async def test_unimplemented(self):
     async def test_unimplemented(self):
         call = self._channel.unary_unary(_UNIMPLEMENTED_METHOD)
         call = self._channel.unary_unary(_UNIMPLEMENTED_METHOD)

+ 135 - 25
test/cpp/end2end/xds_end2end_test.cc

@@ -351,7 +351,6 @@ class ClientStats {
   std::map<grpc::string, uint64_t> dropped_requests_;
   std::map<grpc::string, uint64_t> dropped_requests_;
 };
 };
 
 
-// TODO(roth): Change this service to a real fake.
 class AdsServiceImpl : public AggregatedDiscoveryService::Service,
 class AdsServiceImpl : public AggregatedDiscoveryService::Service,
                        public std::enable_shared_from_this<AdsServiceImpl> {
                        public std::enable_shared_from_this<AdsServiceImpl> {
  public:
  public:
@@ -496,6 +495,7 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
                       SubscriptionState* subscription_state,
                       SubscriptionState* subscription_state,
                       ResourceState* resource_state,
                       ResourceState* resource_state,
                       UpdateQueue* update_queue) {
                       UpdateQueue* update_queue) {
+    // The update_queue will be null if we were not previously subscribed.
     if (subscription_state->update_queue != nullptr) return;
     if (subscription_state->update_queue != nullptr) return;
     subscription_state->update_queue = update_queue;
     subscription_state->update_queue = update_queue;
     resource_state->subscriptions.emplace(subscription_state);
     resource_state->subscriptions.emplace(subscription_state);
@@ -594,7 +594,7 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
       // Main loop to look for requests and updates.
       // Main loop to look for requests and updates.
       while (true) {
       while (true) {
         // Look for new requests and and decide what to handle.
         // Look for new requests and and decide what to handle.
-        DiscoveryResponse response;
+        absl::optional<DiscoveryResponse> response;
         // Boolean to keep track if the loop received any work to do: a request
         // Boolean to keep track if the loop received any work to do: a request
         // or an update; regardless whether a response was actually sent out.
         // or an update; regardless whether a response was actually sent out.
         bool did_work = false;
         bool did_work = false;
@@ -647,8 +647,9 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
                       this, request.type_url().c_str(), resource_name.c_str(),
                       this, request.type_url().c_str(), resource_name.c_str(),
                       resource_state.version);
                       resource_state.version);
                   resources_added_to_response.emplace(resource_name);
                   resources_added_to_response.emplace(resource_name);
+                  if (!response.has_value()) response.emplace();
                   if (resource_state.resource.has_value()) {
                   if (resource_state.resource.has_value()) {
-                    response.add_resources()->CopyFrom(
+                    response->add_resources()->CopyFrom(
                         resource_state.resource.value());
                         resource_state.resource.value());
                   }
                   }
                 }
                 }
@@ -664,17 +665,17 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
                     request.type_url(),
                     request.type_url(),
                     ++resource_type_version[request.type_url()],
                     ++resource_type_version[request.type_url()],
                     subscription_name_map, resources_added_to_response,
                     subscription_name_map, resources_added_to_response,
-                    &response);
+                    &response.value());
               }
               }
             }
             }
           }
           }
         }
         }
-        if (!response.resources().empty()) {
+        if (response.has_value()) {
           gpr_log(GPR_INFO, "ADS[%p]: Sending response: %s", this,
           gpr_log(GPR_INFO, "ADS[%p]: Sending response: %s", this,
-                  response.DebugString().c_str());
-          stream->Write(response);
+                  response->DebugString().c_str());
+          stream->Write(response.value());
         }
         }
-        response.Clear();
+        response.reset();
         // Look for updates and decide what to handle.
         // Look for updates and decide what to handle.
         {
         {
           grpc_core::MutexLock lock(&ads_mu_);
           grpc_core::MutexLock lock(&ads_mu_);
@@ -700,21 +701,22 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
                     "ADS[%p]: Sending update for type=%s name=%s version=%d",
                     "ADS[%p]: Sending update for type=%s name=%s version=%d",
                     this, resource_type.c_str(), resource_name.c_str(),
                     this, resource_type.c_str(), resource_name.c_str(),
                     resource_state.version);
                     resource_state.version);
+                response.emplace();
                 if (resource_state.resource.has_value()) {
                 if (resource_state.resource.has_value()) {
-                  response.add_resources()->CopyFrom(
+                  response->add_resources()->CopyFrom(
                       resource_state.resource.value());
                       resource_state.resource.value());
-                  CompleteBuildingDiscoveryResponse(
-                      resource_type, ++resource_type_version[resource_type],
-                      subscription_name_map, {resource_name}, &response);
                 }
                 }
+                CompleteBuildingDiscoveryResponse(
+                    resource_type, ++resource_type_version[resource_type],
+                    subscription_name_map, {resource_name}, &response.value());
               }
               }
             }
             }
           }
           }
         }
         }
-        if (!response.resources().empty()) {
+        if (response.has_value()) {
           gpr_log(GPR_INFO, "ADS[%p]: Sending update response: %s", this,
           gpr_log(GPR_INFO, "ADS[%p]: Sending update response: %s", this,
-                  response.DebugString().c_str());
-          stream->Write(response);
+                  response->DebugString().c_str());
+          stream->Write(response.value());
         }
         }
         // If we didn't find anything to do, delay before the next loop
         // If we didn't find anything to do, delay before the next loop
         // iteration; otherwise, check whether we should exit and then
         // iteration; otherwise, check whether we should exit and then
@@ -765,6 +767,18 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
     resource_types_to_ignore_.emplace(type_url);
     resource_types_to_ignore_.emplace(type_url);
   }
   }
 
 
+  void UnsetResource(const std::string& type_url, const std::string& name) {
+    grpc_core::MutexLock lock(&ads_mu_);
+    ResourceState& state = resource_map_[type_url][name];
+    ++state.version;
+    state.resource.reset();
+    gpr_log(GPR_INFO, "ADS[%p]: Unsetting %s resource %s to version %u", this,
+            type_url.c_str(), name.c_str(), state.version);
+    for (SubscriptionState* subscription : state.subscriptions) {
+      subscription->update_queue->emplace_back(type_url, name);
+    }
+  }
+
   void SetResource(google::protobuf::Any resource, const std::string& type_url,
   void SetResource(google::protobuf::Any resource, const std::string& type_url,
                    const std::string& name) {
                    const std::string& name) {
     grpc_core::MutexLock lock(&ads_mu_);
     grpc_core::MutexLock lock(&ads_mu_);
@@ -1195,11 +1209,16 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
     return std::make_tuple(num_ok, num_failure, num_drops);
     return std::make_tuple(num_ok, num_failure, num_drops);
   }
   }
 
 
-  void WaitForBackend(size_t backend_idx, bool reset_counters = true) {
+  void WaitForBackend(size_t backend_idx, bool reset_counters = true,
+                      bool require_success = false) {
     gpr_log(GPR_INFO, "========= WAITING FOR BACKEND %lu ==========",
     gpr_log(GPR_INFO, "========= WAITING FOR BACKEND %lu ==========",
             static_cast<unsigned long>(backend_idx));
             static_cast<unsigned long>(backend_idx));
     do {
     do {
-      (void)SendRpc();
+      Status status = SendRpc();
+      if (require_success) {
+        EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
+                                 << " message=" << status.error_message();
+      }
     } while (backends_[backend_idx]->backend_service()->request_count() == 0);
     } while (backends_[backend_idx]->backend_service()->request_count() == 0);
     if (reset_counters) ResetBackendCounters();
     if (reset_counters) ResetBackendCounters();
     gpr_log(GPR_INFO, "========= BACKEND %lu READY ==========",
     gpr_log(GPR_INFO, "========= BACKEND %lu READY ==========",
@@ -1639,6 +1658,68 @@ TEST_P(BasicTest, BackendsRestart) {
                  true /* wait_for_ready */);
                  true /* wait_for_ready */);
 }
 }
 
 
+using XdsResolverOnlyTest = BasicTest;
+
+// Tests switching over from one cluster to another.
+TEST_P(XdsResolverOnlyTest, ChangeClusters) {
+  const char* kNewClusterName = "new_cluster_name";
+  SetNextResolution({});
+  SetNextResolutionForLbChannelAllBalancers();
+  AdsServiceImpl::EdsResourceArgs args({
+      {"locality0", GetBackendPorts(0, 2)},
+  });
+  balancers_[0]->ads_service()->SetEdsResource(
+      AdsServiceImpl::BuildEdsResource(args), kDefaultResourceName);
+  // We need to wait for all backends to come online.
+  WaitForAllBackends(0, 2);
+  // Populate new EDS resource.
+  AdsServiceImpl::EdsResourceArgs args2({
+      {"locality0", GetBackendPorts(2, 4)},
+  });
+  balancers_[0]->ads_service()->SetEdsResource(
+      AdsServiceImpl::BuildEdsResource(args2, kNewClusterName),
+      kNewClusterName);
+  // Populate new CDS resource.
+  Cluster new_cluster = balancers_[0]->ads_service()->default_cluster();
+  new_cluster.set_name(kNewClusterName);
+  balancers_[0]->ads_service()->SetCdsResource(new_cluster, kNewClusterName);
+  // Change RDS resource to point to new cluster.
+  RouteConfiguration new_route_config =
+      balancers_[0]->ads_service()->default_route_config();
+  new_route_config.mutable_virtual_hosts(0)
+      ->mutable_routes(0)
+      ->mutable_route()
+      ->set_cluster(kNewClusterName);
+  Listener listener =
+      balancers_[0]->ads_service()->BuildListener(new_route_config);
+  balancers_[0]->ads_service()->SetLdsResource(listener, kDefaultResourceName);
+  // Wait for all new backends to be used.
+  std::tuple<int, int, int> counts = WaitForAllBackends(2, 4);
+  // Make sure no RPCs failed in the transition.
+  EXPECT_EQ(0, std::get<1>(counts));
+}
+
+// Tests that things keep workng if the cluster resource disappears.
+TEST_P(XdsResolverOnlyTest, ClusterRemoved) {
+  SetNextResolution({});
+  SetNextResolutionForLbChannelAllBalancers();
+  AdsServiceImpl::EdsResourceArgs args({
+      {"locality0", GetBackendPorts()},
+  });
+  balancers_[0]->ads_service()->SetEdsResource(
+      AdsServiceImpl::BuildEdsResource(args), kDefaultResourceName);
+  // We need to wait for all backends to come online.
+  WaitForAllBackends();
+  // Unset CDS resource.
+  balancers_[0]->ads_service()->UnsetResource(kCdsTypeUrl,
+                                              kDefaultResourceName);
+  // Make sure RPCs are still succeeding.
+  CheckRpcSendOk(100 * num_backends_);
+  // Make sure we ACK'ed the update.
+  EXPECT_EQ(balancers_[0]->ads_service()->cds_response_state(),
+            AdsServiceImpl::ACKED);
+}
+
 using SecureNamingTest = BasicTest;
 using SecureNamingTest = BasicTest;
 
 
 // Tests that secure naming check passes if target name is expected.
 // Tests that secure naming check passes if target name is expected.
@@ -2169,14 +2250,6 @@ TEST_P(LocalityMapTest, UpdateMap) {
   });
   });
   balancers_[0]->ads_service()->SetEdsResource(
   balancers_[0]->ads_service()->SetEdsResource(
       AdsServiceImpl::BuildEdsResource(args), kDefaultResourceName);
       AdsServiceImpl::BuildEdsResource(args), kDefaultResourceName);
-  args = AdsServiceImpl::EdsResourceArgs({
-      {"locality1", GetBackendPorts(1, 2), 3},
-      {"locality2", GetBackendPorts(2, 3), 2},
-      {"locality3", GetBackendPorts(3, 4), 6},
-  });
-  std::thread delayed_resource_setter(std::bind(
-      &BasicTest::SetEdsResourceWithDelay, this, 0,
-      AdsServiceImpl::BuildEdsResource(args), 5000, kDefaultResourceName));
   // Wait for the first 3 backends to be ready.
   // Wait for the first 3 backends to be ready.
   WaitForAllBackends(0, 3);
   WaitForAllBackends(0, 3);
   gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
   gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
@@ -2199,6 +2272,13 @@ TEST_P(LocalityMapTest, UpdateMap) {
             ::testing::Ge(locality_weight_rate_0[i] * (1 - kErrorTolerance)),
             ::testing::Ge(locality_weight_rate_0[i] * (1 - kErrorTolerance)),
             ::testing::Le(locality_weight_rate_0[i] * (1 + kErrorTolerance))));
             ::testing::Le(locality_weight_rate_0[i] * (1 + kErrorTolerance))));
   }
   }
+  args = AdsServiceImpl::EdsResourceArgs({
+      {"locality1", GetBackendPorts(1, 2), 3},
+      {"locality2", GetBackendPorts(2, 3), 2},
+      {"locality3", GetBackendPorts(3, 4), 6},
+  });
+  balancers_[0]->ads_service()->SetEdsResource(
+      AdsServiceImpl::BuildEdsResource(args), kDefaultResourceName);
   // Backend 3 hasn't received any request.
   // Backend 3 hasn't received any request.
   EXPECT_EQ(0U, backends_[3]->backend_service()->request_count());
   EXPECT_EQ(0U, backends_[3]->backend_service()->request_count());
   // Wait until the locality update has been processed, as signaled by backend 3
   // Wait until the locality update has been processed, as signaled by backend 3
@@ -2225,6 +2305,30 @@ TEST_P(LocalityMapTest, UpdateMap) {
             ::testing::Ge(locality_weight_rate_1[i] * (1 - kErrorTolerance)),
             ::testing::Ge(locality_weight_rate_1[i] * (1 - kErrorTolerance)),
             ::testing::Le(locality_weight_rate_1[i] * (1 + kErrorTolerance))));
             ::testing::Le(locality_weight_rate_1[i] * (1 + kErrorTolerance))));
   }
   }
+}
+
+// Tests that we don't fail RPCs when replacing all of the localities in
+// a given priority.
+TEST_P(LocalityMapTest, ReplaceAllLocalitiesInPriority) {
+  SetNextResolution({});
+  SetNextResolutionForLbChannelAllBalancers();
+  AdsServiceImpl::EdsResourceArgs args({
+      {"locality0", GetBackendPorts(0, 1)},
+  });
+  balancers_[0]->ads_service()->SetEdsResource(
+      AdsServiceImpl::BuildEdsResource(args), kDefaultResourceName);
+  args = AdsServiceImpl::EdsResourceArgs({
+      {"locality1", GetBackendPorts(1, 2)},
+  });
+  std::thread delayed_resource_setter(std::bind(
+      &BasicTest::SetEdsResourceWithDelay, this, 0,
+      AdsServiceImpl::BuildEdsResource(args), 5000, kDefaultResourceName));
+  // Wait for the first backend to be ready.
+  WaitForBackend(0);
+  // Keep sending RPCs until we switch over to backend 1, which tells us
+  // that we received the update.  No RPCs should fail during this
+  // transition.
+  WaitForBackend(1, /*reset_counters=*/true, /*require_success=*/true);
   delayed_resource_setter.join();
   delayed_resource_setter.join();
 }
 }
 
 
@@ -3307,6 +3411,12 @@ INSTANTIATE_TEST_SUITE_P(XdsTest, EdsTest,
                                            TestType(true, true)),
                                            TestType(true, true)),
                          &TestTypeName);
                          &TestTypeName);
 
 
+// XdsResolverOnlyTest depends on XdsResolver.
+INSTANTIATE_TEST_SUITE_P(XdsTest, XdsResolverOnlyTest,
+                         ::testing::Values(TestType(true, false),
+                                           TestType(true, true)),
+                         &TestTypeName);
+
 INSTANTIATE_TEST_SUITE_P(XdsTest, LocalityMapTest,
 INSTANTIATE_TEST_SUITE_P(XdsTest, LocalityMapTest,
                          ::testing::Values(TestType(false, true),
                          ::testing::Values(TestType(false, true),
                                            TestType(false, false),
                                            TestType(false, false),

+ 2 - 0
tools/bazel.rc

@@ -89,3 +89,5 @@ build:basicprof --copt=-DGRPC_BASIC_PROFILER
 build:basicprof --copt=-DGRPC_TIMERS_RDTSC
 build:basicprof --copt=-DGRPC_TIMERS_RDTSC
 
 
 build:python_single_threaded_unary_stream --test_env="GRPC_SINGLE_THREADED_UNARY_STREAM=true"
 build:python_single_threaded_unary_stream --test_env="GRPC_SINGLE_THREADED_UNARY_STREAM=true"
+
+build:python_poller_engine --test_env="GRPC_ASYNCIO_ENGINE=poller"

+ 1 - 0
tools/internal_ci/linux/grpc_python_bazel_test_in_docker.sh

@@ -28,6 +28,7 @@ TEST_TARGETS="//src/python/... //examples/python/..."
 BAZEL_FLAGS="--spawn_strategy=standalone --genrule_strategy=standalone --test_output=errors"
 BAZEL_FLAGS="--spawn_strategy=standalone --genrule_strategy=standalone --test_output=errors"
 bazel test ${BAZEL_FLAGS} ${TEST_TARGETS}
 bazel test ${BAZEL_FLAGS} ${TEST_TARGETS}
 bazel test --config=python_single_threaded_unary_stream ${BAZEL_FLAGS} ${TEST_TARGETS}
 bazel test --config=python_single_threaded_unary_stream ${BAZEL_FLAGS} ${TEST_TARGETS}
+bazel test --config=python_poller_engine ${BAZEL_FLAGS} ${TEST_TARGETS}
 
 
 
 
 # TODO(https://github.com/grpc/grpc/issues/19854): Move this to a new Kokoro
 # TODO(https://github.com/grpc/grpc/issues/19854): Move this to a new Kokoro

+ 3 - 2
tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh

@@ -47,9 +47,10 @@ touch "$TOOLS_DIR"/src/proto/grpc/testing/__init__.py
 
 
 bazel build test/cpp/interop:xds_interop_client
 bazel build test/cpp/interop:xds_interop_client
 
 
-"$PYTHON" tools/run_tests/run_xds_tests.py \
+GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,cds_lb,xds_lb "$PYTHON" \
+  tools/run_tests/run_xds_tests.py \
     --test_case=all \
     --test_case=all \
     --project_id=grpc-testing \
     --project_id=grpc-testing \
     --gcp_suffix=$(date '+%s') \
     --gcp_suffix=$(date '+%s') \
     --verbose \
     --verbose \
-    --client_cmd='GRPC_VERBOSITY=debug GRPC_TRACE=xds,xds_client bazel-bin/test/cpp/interop/xds_interop_client --server=xds-experimental:///{server_uri} --stats_port={stats_port} --qps={qps}'
+    --client_cmd='bazel-bin/test/cpp/interop/xds_interop_client --server=xds-experimental:///{server_uri} --stats_port={stats_port} --qps={qps}'

+ 8 - 4
tools/run_tests/run_xds_tests.py

@@ -79,10 +79,11 @@ argp.add_argument('--zone', default='us-central1-a')
 argp.add_argument('--secondary_zone',
 argp.add_argument('--secondary_zone',
                   default='us-west1-b',
                   default='us-west1-b',
                   help='Zone to use for secondary TD locality tests')
                   help='Zone to use for secondary TD locality tests')
-argp.add_argument('--qps', default=10, help='Client QPS')
+argp.add_argument('--qps', default=10, type=int, help='Client QPS')
 argp.add_argument(
 argp.add_argument(
     '--wait_for_backend_sec',
     '--wait_for_backend_sec',
     default=600,
     default=600,
+    type=int,
     help='Time limit for waiting for created backend services to report '
     help='Time limit for waiting for created backend services to report '
     'healthy when launching or updated GCP resources')
     'healthy when launching or updated GCP resources')
 argp.add_argument(
 argp.add_argument(
@@ -124,6 +125,9 @@ argp.add_argument('--xds_server',
 argp.add_argument('--source_image',
 argp.add_argument('--source_image',
                   default='projects/debian-cloud/global/images/family/debian-9',
                   default='projects/debian-cloud/global/images/family/debian-9',
                   help='Source image for VMs created during the test')
                   help='Source image for VMs created during the test')
+argp.add_argument('--machine_type',
+                  default='e2-standard-2',
+                  help='Machine type for VMs created during the test')
 argp.add_argument(
 argp.add_argument(
     '--tolerate_gcp_errors',
     '--tolerate_gcp_errors',
     default=False,
     default=False,
@@ -429,14 +433,14 @@ def test_secondary_locality_gets_requests_on_primary_failure(
         patch_backend_instances(gcp, backend_service, [primary_instance_group])
         patch_backend_instances(gcp, backend_service, [primary_instance_group])
 
 
 
 
-def create_instance_template(gcp, name, network, source_image):
+def create_instance_template(gcp, name, network, source_image, machine_type):
     config = {
     config = {
         'name': name,
         'name': name,
         'properties': {
         'properties': {
             'tags': {
             'tags': {
                 'items': ['allow-health-checks']
                 'items': ['allow-health-checks']
             },
             },
-            'machineType': 'e2-standard-2',
+            'machineType': machine_type,
             'serviceAccounts': [{
             'serviceAccounts': [{
                 'email': 'default',
                 'email': 'default',
                 'scopes': ['https://www.googleapis.com/auth/cloud-platform',]
                 'scopes': ['https://www.googleapis.com/auth/cloud-platform',]
@@ -941,7 +945,7 @@ try:
             raise Exception(
             raise Exception(
                 'Failed to find a valid ip:port for the forwarding rule')
                 'Failed to find a valid ip:port for the forwarding rule')
         create_instance_template(gcp, template_name, args.network,
         create_instance_template(gcp, template_name, args.network,
-                                 args.source_image)
+                                 args.source_image, args.machine_type)
         instance_group = add_instance_group(gcp, args.zone, instance_group_name,
         instance_group = add_instance_group(gcp, args.zone, instance_group_name,
                                             _INSTANCE_GROUP_SIZE)
                                             _INSTANCE_GROUP_SIZE)
         patch_backend_instances(gcp, backend_service, [instance_group])
         patch_backend_instances(gcp, backend_service, [instance_group])