Forráskód Böngészése

s/combiner/logical_thread

Yash Tibrewal 5 éve
szülő
commit
895bd10859
21 módosított fájl, 150 hozzáadás és 142 törlés
  1. 2 3
      src/core/ext/filters/client_channel/resolver.cc
  2. 9 7
      src/core/ext/filters/client_channel/resolver.h
  3. 5 5
      src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
  4. 10 10
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc
  5. 3 3
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h
  6. 9 8
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_libuv.cc
  7. 2 2
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
  8. 25 24
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc
  9. 17 16
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
  10. 1 1
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
  11. 2 2
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc
  12. 4 4
      src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
  13. 14 14
      src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
  14. 2 2
      src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc
  15. 2 2
      src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
  16. 2 2
      src/core/ext/filters/client_channel/resolver_factory.h
  17. 2 2
      src/core/ext/filters/client_channel/resolver_registry.cc
  18. 3 3
      src/core/ext/filters/client_channel/xds/xds_api.h
  19. 16 14
      src/core/ext/filters/client_channel/xds/xds_client.cc
  20. 3 3
      src/core/ext/filters/client_channel/xds/xds_client.h
  21. 17 15
      src/core/ext/filters/client_channel/xds/xds_client_stats.h

+ 2 - 3
src/core/ext/filters/client_channel/resolver.cc

@@ -19,7 +19,6 @@
 #include <grpc/support/port_platform.h>
 
 #include "src/core/ext/filters/client_channel/resolver.h"
-#include "src/core/lib/iomgr/combiner.h"
 
 grpc_core::DebugOnlyTraceFlag grpc_trace_resolver_refcount(false,
                                                            "resolver_refcount");
@@ -30,11 +29,11 @@ namespace grpc_core {
 // Resolver
 //
 
-Resolver::Resolver(RefCountedPtr<LogicalThread> combiner,
+Resolver::Resolver(RefCountedPtr<LogicalThread> logical_thread,
                    std::unique_ptr<ResultHandler> result_handler)
     : InternallyRefCounted(&grpc_trace_resolver_refcount),
       result_handler_(std::move(result_handler)),
-      combiner_(std::move(combiner)) {}
+      logical_thread_(std::move(logical_thread)) {}
 
 //
 // Resolver::Result

+ 9 - 7
src/core/ext/filters/client_channel/resolver.h

@@ -45,7 +45,7 @@ namespace grpc_core {
 /// DNS).
 ///
 /// Note: All methods with a "Locked" suffix must be called from the
-/// combiner passed to the constructor.
+/// logical_thread passed to the constructor.
 class Resolver : public InternallyRefCounted<Resolver> {
  public:
   /// Results returned by the resolver.
@@ -115,30 +115,32 @@ class Resolver : public InternallyRefCounted<Resolver> {
   /// implementations.  At that point, this method can go away.
   virtual void ResetBackoffLocked() {}
 
-  // Note: This must be invoked while holding the combiner.
+  // Note: This must be invoked while holding the logical_thread.
   void Orphan() override {
     ShutdownLocked();
     Unref();
   }
 
  protected:
-  /// Does NOT take ownership of the reference to \a combiner.
-  // TODO(roth): Once we have a C++-like interface for combiners, this
+  /// Does NOT take ownership of the reference to \a logical_thread.
+  // TODO(roth): Once we have a C++-like interface for logical threads, this
   // API should change to take a RefCountedPtr<>, so that we always take
   // ownership of a new ref.
-  explicit Resolver(RefCountedPtr<LogicalThread> combiner,
+  explicit Resolver(RefCountedPtr<LogicalThread> logical_thread,
                     std::unique_ptr<ResultHandler> result_handler);
 
   /// Shuts down the resolver.
   virtual void ShutdownLocked() = 0;
 
-  RefCountedPtr<LogicalThread> combiner() const { return combiner_; }
+  RefCountedPtr<LogicalThread> logical_thread() const {
+    return logical_thread_;
+  }
 
   ResultHandler* result_handler() const { return result_handler_.get(); }
 
  private:
   std::unique_ptr<ResultHandler> result_handler_;
-  RefCountedPtr<LogicalThread> combiner_;
+  RefCountedPtr<LogicalThread> logical_thread_;
 };
 
 }  // namespace grpc_core

+ 5 - 5
src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc

@@ -91,7 +91,7 @@ class AresDnsResolver : public Resolver {
   bool request_service_config_;
   /// pollset_set to drive the name resolution process
   grpc_pollset_set* interested_parties_;
-  /// closures used by the combiner
+  /// closures used by the logical_thread
   grpc_closure on_next_resolution_;
   grpc_closure on_resolved_;
   /// are we currently resolving?
@@ -120,7 +120,7 @@ class AresDnsResolver : public Resolver {
 };
 
 AresDnsResolver::AresDnsResolver(ResolverArgs args)
-    : Resolver(args.combiner, std::move(args.result_handler)),
+    : Resolver(args.logical_thread, std::move(args.result_handler)),
       backoff_(
           BackOff::Options()
               .set_initial_backoff(GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS *
@@ -201,7 +201,7 @@ void AresDnsResolver::ShutdownLocked() {
 
 void AresDnsResolver::OnNextResolution(void* arg, grpc_error* error) {
   AresDnsResolver* r = static_cast<AresDnsResolver*>(arg);
-  r->combiner()->Run(
+  r->logical_thread()->Run(
       Closure::ToFunction(GRPC_CLOSURE_INIT(&r->on_next_resolution_,
                                             OnNextResolutionLocked, r, nullptr),
                           GRPC_ERROR_REF(error)),
@@ -327,7 +327,7 @@ char* ChooseServiceConfig(char* service_config_choice_json,
 
 void AresDnsResolver::OnResolved(void* arg, grpc_error* error) {
   AresDnsResolver* r = static_cast<AresDnsResolver*>(arg);
-  r->combiner()->Run(
+  r->logical_thread()->Run(
       Closure::ToFunction(
           GRPC_CLOSURE_INIT(&r->on_resolved_, OnResolvedLocked, r, nullptr),
           GRPC_ERROR_REF(error)),
@@ -443,7 +443,7 @@ void AresDnsResolver::StartResolvingLocked() {
       dns_server_, name_to_resolve_, kDefaultPort, interested_parties_,
       &on_resolved_, &addresses_, enable_srv_queries_ /* check_grpclb */,
       request_service_config_ ? &service_config_json_ : nullptr,
-      query_timeout_ms_, combiner());
+      query_timeout_ms_, logical_thread());
   last_resolution_timestamp_ = grpc_core::ExecCtx::Get()->Now();
   GRPC_CARES_TRACE_LOG("resolver:%p Started resolving. pending_request_:%p",
                        this, pending_request_);

+ 10 - 10
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc

@@ -65,8 +65,8 @@ struct grpc_ares_ev_driver {
   /** refcount of the event driver */
   gpr_refcount refs;
 
-  /** combiner to synchronize c-ares and I/O callbacks on */
-  grpc_core::RefCountedPtr<grpc_core::LogicalThread> combiner;
+  /** logical_thread to synchronize c-ares and I/O callbacks on */
+  grpc_core::RefCountedPtr<grpc_core::LogicalThread> logical_thread;
   /** a list of grpc_fd that this event driver is currently using. */
   fd_node* fds;
   /** is this event driver currently working? */
@@ -144,7 +144,7 @@ void (*grpc_ares_test_only_inject_config)(ares_channel channel) =
 grpc_error* grpc_ares_ev_driver_create_locked(
     grpc_ares_ev_driver** ev_driver, grpc_pollset_set* pollset_set,
     int query_timeout_ms,
-    grpc_core::RefCountedPtr<grpc_core::LogicalThread> combiner,
+    grpc_core::RefCountedPtr<grpc_core::LogicalThread> logical_thread,
     grpc_ares_request* request) {
   *ev_driver = new grpc_ares_ev_driver();
   ares_options opts;
@@ -162,7 +162,7 @@ grpc_error* grpc_ares_ev_driver_create_locked(
     gpr_free(*ev_driver);
     return err;
   }
-  (*ev_driver)->combiner = std::move(combiner);
+  (*ev_driver)->logical_thread = std::move(logical_thread);
   gpr_ref_init(&(*ev_driver)->refs, 1);
   (*ev_driver)->pollset_set = pollset_set;
   (*ev_driver)->fds = nullptr;
@@ -170,7 +170,7 @@ grpc_error* grpc_ares_ev_driver_create_locked(
   (*ev_driver)->shutting_down = false;
   (*ev_driver)->request = request;
   (*ev_driver)->polled_fd_factory =
-      grpc_core::NewGrpcPolledFdFactory((*ev_driver)->combiner);
+      grpc_core::NewGrpcPolledFdFactory((*ev_driver)->logical_thread);
   (*ev_driver)
       ->polled_fd_factory->ConfigureAresChannelLocked((*ev_driver)->channel);
   (*ev_driver)->query_timeout_ms = query_timeout_ms;
@@ -232,7 +232,7 @@ static grpc_millis calculate_next_ares_backup_poll_alarm_ms(
 
 static void on_timeout(void* arg, grpc_error* error) {
   grpc_ares_ev_driver* driver = static_cast<grpc_ares_ev_driver*>(arg);
-  driver->combiner->Run(
+  driver->logical_thread->Run(
       grpc_core::Closure::ToFunction(
           GRPC_CLOSURE_INIT(&driver->on_timeout_locked, on_timeout_locked,
                             driver, nullptr),
@@ -254,7 +254,7 @@ static void on_timeout_locked(void* arg, grpc_error* error) {
 
 static void on_ares_backup_poll_alarm(void* arg, grpc_error* error) {
   grpc_ares_ev_driver* driver = static_cast<grpc_ares_ev_driver*>(arg);
-  driver->combiner->Run(
+  driver->logical_thread->Run(
       grpc_core::Closure::ToFunction(
           GRPC_CLOSURE_INIT(&driver->on_ares_backup_poll_alarm_locked,
                             on_ares_backup_poll_alarm_locked, driver, nullptr),
@@ -333,7 +333,7 @@ static void on_readable_locked(void* arg, grpc_error* error) {
 
 static void on_readable(void* arg, grpc_error* error) {
   fd_node* fdn = static_cast<fd_node*>(arg);
-  fdn->ev_driver->combiner->Run(
+  fdn->ev_driver->logical_thread->Run(
       grpc_core::Closure::ToFunction(
           GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable_locked, fdn,
                             nullptr),
@@ -366,7 +366,7 @@ static void on_writable_locked(void* arg, grpc_error* error) {
 
 static void on_writable(void* arg, grpc_error* error) {
   fd_node* fdn = static_cast<fd_node*>(arg);
-  fdn->ev_driver->combiner->Run(
+  fdn->ev_driver->logical_thread->Run(
       grpc_core::Closure::ToFunction(
           GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable_locked, fdn,
                             nullptr),
@@ -396,7 +396,7 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) {
           fdn = static_cast<fd_node*>(gpr_malloc(sizeof(fd_node)));
           fdn->grpc_polled_fd =
               ev_driver->polled_fd_factory->NewGrpcPolledFdLocked(
-                  socks[i], ev_driver->pollset_set, ev_driver->combiner);
+                  socks[i], ev_driver->pollset_set, ev_driver->logical_thread);
           GRPC_CARES_TRACE_LOG("request:%p new fd: %s", ev_driver->request,
                                fdn->grpc_polled_fd->GetName());
           fdn->ev_driver = ev_driver;

+ 3 - 3
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h

@@ -43,7 +43,7 @@ ares_channel* grpc_ares_ev_driver_get_channel_locked(
 grpc_error* grpc_ares_ev_driver_create_locked(
     grpc_ares_ev_driver** ev_driver, grpc_pollset_set* pollset_set,
     int query_timeout_ms,
-    grpc_core::RefCountedPtr<grpc_core::LogicalThread> combiner,
+    grpc_core::RefCountedPtr<grpc_core::LogicalThread> logical_thread,
     grpc_ares_request* request);
 
 /* Called back when all DNS lookups have completed. */
@@ -90,13 +90,13 @@ class GrpcPolledFdFactory {
   /* Creates a new wrapped fd for the current platform */
   virtual GrpcPolledFd* NewGrpcPolledFdLocked(
       ares_socket_t as, grpc_pollset_set* driver_pollset_set,
-      RefCountedPtr<LogicalThread> combiner) = 0;
+      RefCountedPtr<LogicalThread> logical_thread) = 0;
   /* Optionally configures the ares channel after creation */
   virtual void ConfigureAresChannelLocked(ares_channel channel) = 0;
 };
 
 std::unique_ptr<GrpcPolledFdFactory> NewGrpcPolledFdFactory(
-    RefCountedPtr<LogicalThread> combiner);
+    RefCountedPtr<LogicalThread> logical_thread);
 
 }  // namespace grpc_core
 

+ 9 - 8
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_libuv.cc

@@ -41,8 +41,9 @@ void ares_uv_poll_close_cb(uv_handle_t* handle) { delete handle; }
 
 class GrpcPolledFdLibuv : public GrpcPolledFd {
  public:
-  GrpcPolledFdLibuv(ares_socket_t as, RefCountedPtr<LogicalThread> combiner)
-      : as_(as), combiner_(std::move(combiner)) {
+  GrpcPolledFdLibuv(ares_socket_t as,
+                    RefCountedPtr<LogicalThread> logical_thread)
+      : as_(as), logical_thread_(std::move(logical_thread)) {
     gpr_asprintf(&name_, "c-ares socket: %" PRIdPTR, (intptr_t)as);
     handle_ = new uv_poll_t();
     uv_poll_init_socket(uv_default_loop(), handle_, as);
@@ -105,7 +106,7 @@ class GrpcPolledFdLibuv : public GrpcPolledFd {
   grpc_closure* read_closure_ = nullptr;
   grpc_closure* write_closure_ = nullptr;
   int poll_events_ = 0;
-  RefCountedPtr<LogicalThread> combiner_;
+  RefCountedPtr<LogicalThread> logical_thread_;
 };
 
 struct AresUvPollCbArg {
@@ -151,23 +152,23 @@ void ares_uv_poll_cb(uv_poll_t* handle, int status, int events) {
   GrpcPolledFdLibuv* polled_fd =
       reinterpret_cast<GrpcPolledFdLibuv*>(handle->data);
   AresUvPollCbArg* arg = new AresUvPollCbArg(handle, status, events);
-  polled_fd->combiner_->Run([arg]() { ares_uv_poll_cb_locked(arg); },
-                            DEBUG_LOCATION);
+  polled_fd->logical_thread_->Run([arg]() { ares_uv_poll_cb_locked(arg); },
+                                  DEBUG_LOCATION);
 }
 
 class GrpcPolledFdFactoryLibuv : public GrpcPolledFdFactory {
  public:
   GrpcPolledFd* NewGrpcPolledFdLocked(
       ares_socket_t as, grpc_pollset_set* driver_pollset_set,
-      RefCountedPtr<LogicalThread> combiner) override {
-    return new GrpcPolledFdLibuv(as, combiner);
+      RefCountedPtr<LogicalThread> logical_thread) override {
+    return new GrpcPolledFdLibuv(as, logical_thread);
   }
 
   void ConfigureAresChannelLocked(ares_channel channel) override {}
 };
 
 std::unique_ptr<GrpcPolledFdFactory> NewGrpcPolledFdFactory(
-    RefCountedPtr<LogicalThread> /*combiner*/) {
+    RefCountedPtr<LogicalThread> /*logical_thread*/) {
   return MakeUnique<GrpcPolledFdFactoryLibuv>();
 }
 

+ 2 - 2
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc

@@ -90,7 +90,7 @@ class GrpcPolledFdFactoryPosix : public GrpcPolledFdFactory {
  public:
   GrpcPolledFd* NewGrpcPolledFdLocked(
       ares_socket_t as, grpc_pollset_set* driver_pollset_set,
-      RefCountedPtr<LogicalThread> /*combiner*/) override {
+      RefCountedPtr<LogicalThread> /*logical_thread*/) override {
     return new GrpcPolledFdPosix(as, driver_pollset_set);
   }
 
@@ -98,7 +98,7 @@ class GrpcPolledFdFactoryPosix : public GrpcPolledFdFactory {
 };
 
 std::unique_ptr<GrpcPolledFdFactory> NewGrpcPolledFdFactory(
-    RefCountedPtr<LogicalThread> /*combiner*/) {
+    RefCountedPtr<LogicalThread> /*logical_thread*/) {
   return MakeUnique<GrpcPolledFdFactoryPosix>();
 }
 

+ 25 - 24
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc

@@ -30,8 +30,8 @@
 #include <string.h>
 #include "src/core/lib/gpr/string.h"
 #include "src/core/lib/gprpp/memory.h"
-#include "src/core/lib/iomgr/combiner.h"
 #include "src/core/lib/iomgr/iocp_windows.h"
+#include "src/core/lib/iomgr/logical_thread.h"
 #include "src/core/lib/iomgr/sockaddr_utils.h"
 #include "src/core/lib/iomgr/sockaddr_windows.h"
 #include "src/core/lib/iomgr/socket_windows.h"
@@ -97,7 +97,8 @@ class GrpcPolledFdWindows {
     WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY,
   };
 
-  GrpcPolledFdWindows(ares_socket_t as, RefCountedPtr<LogicalThread> combiner,
+  GrpcPolledFdWindows(ares_socket_t as,
+                      RefCountedPtr<LogicalThread> logical_thread,
                       int address_family, int socket_type)
       : read_buf_(grpc_empty_slice()),
         write_buf_(grpc_empty_slice()),
@@ -105,7 +106,7 @@ class GrpcPolledFdWindows {
         gotten_into_driver_list_(false),
         address_family_(address_family),
         socket_type_(socket_type),
-        combiner_(std::move(combiner)) {
+        logical_thread_(std::move(logical_thread)) {
     gpr_asprintf(&name_, "c-ares socket: %" PRIdPTR, as);
     winsocket_ = grpc_winsocket_create(as, name_);
   }
@@ -137,8 +138,8 @@ class GrpcPolledFdWindows {
     GPR_ASSERT(!read_buf_has_data_);
     read_buf_ = GRPC_SLICE_MALLOC(4192);
     if (connect_done_) {
-      combiner_->Run([this]() { ContinueRegisterForOnReadableLocked(); },
-                     DEBUG_LOCATION);
+      logical_thread_->Run([this]() { ContinueRegisterForOnReadableLocked(); },
+                           DEBUG_LOCATION);
     } else {
       GPR_ASSERT(pending_continue_register_for_on_readable_locked_ == false);
       pending_continue_register_for_on_readable_locked_ = true;
@@ -197,8 +198,8 @@ class GrpcPolledFdWindows {
     GPR_ASSERT(write_closure_ == nullptr);
     write_closure_ = write_closure;
     if (connect_done_) {
-      combiner_->Run([this]() { ContinueRegisterForOnWriteableLocked(); },
-                     DEBUG_LOCATION);
+      logical_thread_->Run([this]() { ContinueRegisterForOnWriteableLocked(); },
+                           DEBUG_LOCATION);
     } else {
       GPR_ASSERT(pending_continue_register_for_on_writeable_locked_ == false);
       pending_continue_register_for_on_writeable_locked_ = true;
@@ -416,7 +417,7 @@ class GrpcPolledFdWindows {
   static void OnTcpConnect(void* arg, grpc_error* error) {
     GrpcPolledFdWindows* grpc_polled_fd =
         static_cast<GrpcPolledFdWindows*>(arg);
-    grpc_polled_fd->combiner_->Run(
+    grpc_polled_fd->logical_thread_->Run(
         Closure::ToFunction(
             GRPC_CLOSURE_INIT(&grpc_polled_fd->on_tcp_connect_locked_,
                               &GrpcPolledFdWindows::OnTcpConnectLocked,
@@ -465,12 +466,12 @@ class GrpcPolledFdWindows {
       wsa_connect_error_ = WSA_OPERATION_ABORTED;
     }
     if (pending_continue_register_for_on_readable_locked_) {
-      combiner_->Run([this]() { ContinueRegisterForOnReadableLocked(); },
-                     DEBUG_LOCATION);
+      logical_thread_->Run([this]() { ContinueRegisterForOnReadableLocked(); },
+                           DEBUG_LOCATION);
     }
     if (pending_continue_register_for_on_writeable_locked_) {
-      combiner_->Run([this]() { ContinueRegisterForOnWriteableLocked(); },
-                     DEBUG_LOCATION);
+      logical_thread_->Run([this]() { ContinueRegisterForOnWriteableLocked(); },
+                           DEBUG_LOCATION);
     }
   }
 
@@ -580,7 +581,7 @@ class GrpcPolledFdWindows {
 
   static void OnIocpReadable(void* arg, grpc_error* error) {
     GrpcPolledFdWindows* polled_fd = static_cast<GrpcPolledFdWindows*>(arg);
-    polled_fd->combiner_->Run(
+    polled_fd->logical_thread_->Run(
         Closure::ToFunction(
             GRPC_CLOSURE_INIT(&polled_fd->outer_read_closure_,
                               &GrpcPolledFdWindows::OnIocpReadableLocked,
@@ -634,7 +635,7 @@ class GrpcPolledFdWindows {
 
   static void OnIocpWriteable(void* arg, grpc_error* error) {
     GrpcPolledFdWindows* polled_fd = static_cast<GrpcPolledFdWindows*>(arg);
-    polled_fd->combiner_->Run(
+    polled_fd->logical_thread_->Run(
         Closure::ToFunction(
             GRPC_CLOSURE_INIT(&polled_fd->outer_write_closure_,
                               &GrpcPolledFdWindows::OnIocpWriteableLocked,
@@ -680,7 +681,7 @@ class GrpcPolledFdWindows {
   bool gotten_into_driver_list() const { return gotten_into_driver_list_; }
   void set_gotten_into_driver_list() { gotten_into_driver_list_ = true; }
 
-  RefCountedPtr<LogicalThread> combiner_;
+  RefCountedPtr<LogicalThread> logical_thread_;
   char recv_from_source_addr_[200];
   ares_socklen_t recv_from_source_addr_len_;
   grpc_slice read_buf_;
@@ -722,8 +723,8 @@ struct SockToPolledFdEntry {
  * with a GrpcPolledFdWindows factory and event driver */
 class SockToPolledFdMap {
  public:
-  SockToPolledFdMap(RefCountedPtr<LogicalThread> combiner)
-      : combiner_(std::move(combiner)) {}
+  SockToPolledFdMap(RefCountedPtr<LogicalThread> logical_thread)
+      : logical_thread_(std::move(logical_thread)) {}
 
   ~SockToPolledFdMap() { GPR_ASSERT(head_ == nullptr); }
 
@@ -781,7 +782,7 @@ class SockToPolledFdMap {
     }
     grpc_tcp_set_non_block(s);
     GrpcPolledFdWindows* polled_fd =
-        new GrpcPolledFdWindows(s, map->combiner_, af, type);
+        new GrpcPolledFdWindows(s, map->logical_thread_, af, type);
     GRPC_CARES_TRACE_LOG(
         "fd:|%s| created with params af:%d type:%d protocol:%d",
         polled_fd->GetName(), af, type, protocol);
@@ -837,7 +838,7 @@ class SockToPolledFdMap {
 
  private:
   SockToPolledFdEntry* head_ = nullptr;
-  RefCountedPtr<LogicalThread> combiner_;
+  RefCountedPtr<LogicalThread> logical_thread_;
 };
 
 const struct ares_socket_functions custom_ares_sock_funcs = {
@@ -886,12 +887,12 @@ class GrpcPolledFdWindowsWrapper : public GrpcPolledFd {
 
 class GrpcPolledFdFactoryWindows : public GrpcPolledFdFactory {
  public:
-  GrpcPolledFdFactoryWindows(RefCountedPtr<LogicalThread> combiner)
-      : sock_to_polled_fd_map_(combiner) {}
+  GrpcPolledFdFactoryWindows(RefCountedPtr<LogicalThread> logical_thread)
+      : sock_to_polled_fd_map_(logical_thread) {}
 
   GrpcPolledFd* NewGrpcPolledFdLocked(
       ares_socket_t as, grpc_pollset_set* driver_pollset_set,
-      RefCountedPtr<LogicalThread> combiner) override {
+      RefCountedPtr<LogicalThread> logical_thread) override {
     GrpcPolledFdWindows* polled_fd = sock_to_polled_fd_map_.LookupPolledFd(as);
     // Set a flag so that the virtual socket "close" method knows it
     // doesn't need to call ShutdownLocked, since now the driver will.
@@ -909,8 +910,8 @@ class GrpcPolledFdFactoryWindows : public GrpcPolledFdFactory {
 };
 
 std::unique_ptr<GrpcPolledFdFactory> NewGrpcPolledFdFactory(
-    RefCountedPtr<LogicalThread> combiner) {
-  return MakeUnique<GrpcPolledFdFactoryWindows>(std::move(combiner));
+    RefCountedPtr<LogicalThread> logical_thread) {
+  return MakeUnique<GrpcPolledFdFactoryWindows>(std::move(logical_thread));
 }
 
 }  // namespace grpc_core

+ 17 - 16
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc

@@ -350,7 +350,7 @@ void grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked(
     grpc_ares_request* r, const char* dns_server, const char* name,
     const char* default_port, grpc_pollset_set* interested_parties,
     bool check_grpclb, int query_timeout_ms,
-    grpc_core::RefCountedPtr<grpc_core::LogicalThread> combiner) {
+    grpc_core::RefCountedPtr<grpc_core::LogicalThread> logical_thread) {
   grpc_error* error = GRPC_ERROR_NONE;
   grpc_ares_hostbyname_request* hr = nullptr;
   ares_channel* channel = nullptr;
@@ -372,8 +372,8 @@ void grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked(
     }
     port.reset(gpr_strdup(default_port));
   }
-  error = grpc_ares_ev_driver_create_locked(&r->ev_driver, interested_parties,
-                                            query_timeout_ms, combiner, r);
+  error = grpc_ares_ev_driver_create_locked(
+      &r->ev_driver, interested_parties, query_timeout_ms, logical_thread, r);
   if (error != GRPC_ERROR_NONE) goto error_cleanup;
   channel = grpc_ares_ev_driver_get_channel_locked(r->ev_driver);
   // If dns_server is specified, use it.
@@ -590,7 +590,7 @@ static grpc_ares_request* grpc_dns_lookup_ares_locked_impl(
     grpc_pollset_set* interested_parties, grpc_closure* on_done,
     std::unique_ptr<grpc_core::ServerAddressList>* addrs, bool check_grpclb,
     char** service_config_json, int query_timeout_ms,
-    grpc_core::RefCountedPtr<grpc_core::LogicalThread> combiner) {
+    grpc_core::RefCountedPtr<grpc_core::LogicalThread> logical_thread) {
   grpc_ares_request* r =
       static_cast<grpc_ares_request*>(gpr_zalloc(sizeof(grpc_ares_request)));
   r->ev_driver = nullptr;
@@ -624,7 +624,7 @@ static grpc_ares_request* grpc_dns_lookup_ares_locked_impl(
   // Look up name using c-ares lib.
   grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked(
       r, dns_server, name, default_port, interested_parties, check_grpclb,
-      query_timeout_ms, combiner);
+      query_timeout_ms, logical_thread);
   return r;
 }
 
@@ -633,7 +633,7 @@ grpc_ares_request* (*grpc_dns_lookup_ares_locked)(
     grpc_pollset_set* interested_parties, grpc_closure* on_done,
     std::unique_ptr<grpc_core::ServerAddressList>* addrs, bool check_grpclb,
     char** service_config_json, int query_timeout_ms,
-    grpc_core::RefCountedPtr<grpc_core::LogicalThread> combiner) =
+    grpc_core::RefCountedPtr<grpc_core::LogicalThread> logical_thread) =
     grpc_dns_lookup_ares_locked_impl;
 
 static void grpc_cancel_ares_request_locked_impl(grpc_ares_request* r) {
@@ -674,8 +674,8 @@ void grpc_ares_cleanup(void) {}
  */
 
 typedef struct grpc_resolve_address_ares_request {
-  /* combiner that queries and related callbacks run under */
-  grpc_core::RefCountedPtr<grpc_core::LogicalThread> combiner;
+  /* logical_thread that queries and related callbacks run under */
+  grpc_core::RefCountedPtr<grpc_core::LogicalThread> logical_thread;
   /** the pointer to receive the resolved addresses */
   grpc_resolved_addresses** addrs_out;
   /** currently resolving addresses */
@@ -723,11 +723,12 @@ static void on_dns_lookup_done_locked(void* arg, grpc_error* error) {
 static void on_dns_lookup_done(void* arg, grpc_error* error) {
   grpc_resolve_address_ares_request* r =
       static_cast<grpc_resolve_address_ares_request*>(arg);
-  r->combiner->Run(grpc_core::Closure::ToFunction(
-                       GRPC_CLOSURE_INIT(&r->on_dns_lookup_done_locked,
-                                         on_dns_lookup_done_locked, r, nullptr),
-                       GRPC_ERROR_REF(error)),
-                   DEBUG_LOCATION);
+  r->logical_thread->Run(
+      grpc_core::Closure::ToFunction(
+          GRPC_CLOSURE_INIT(&r->on_dns_lookup_done_locked,
+                            on_dns_lookup_done_locked, r, nullptr),
+          GRPC_ERROR_REF(error)),
+      DEBUG_LOCATION);
 }
 
 static void grpc_resolve_address_invoke_dns_lookup_ares_locked(void* arg) {
@@ -739,7 +740,7 @@ static void grpc_resolve_address_invoke_dns_lookup_ares_locked(void* arg) {
       nullptr /* dns_server */, r->name, r->default_port, r->interested_parties,
       &r->on_dns_lookup_done_locked, &r->addresses, false /* check_grpclb */,
       nullptr /* service_config_json */, GRPC_DNS_ARES_DEFAULT_QUERY_TIMEOUT_MS,
-      r->combiner);
+      r->logical_thread);
 }
 
 static void grpc_resolve_address_ares_impl(const char* name,
@@ -749,13 +750,13 @@ static void grpc_resolve_address_ares_impl(const char* name,
                                            grpc_resolved_addresses** addrs) {
   grpc_resolve_address_ares_request* r =
       new grpc_resolve_address_ares_request();
-  r->combiner = grpc_core::MakeRefCounted<grpc_core::LogicalThread>();
+  r->logical_thread = grpc_core::MakeRefCounted<grpc_core::LogicalThread>();
   r->addrs_out = addrs;
   r->on_resolve_address_done = on_done;
   r->name = name;
   r->default_port = default_port;
   r->interested_parties = interested_parties;
-  r->combiner->Run(
+  r->logical_thread->Run(
       [r]() { grpc_resolve_address_invoke_dns_lookup_ares_locked(r); },
       DEBUG_LOCATION);
 }

+ 1 - 1
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h

@@ -66,7 +66,7 @@ extern grpc_ares_request* (*grpc_dns_lookup_ares_locked)(
     grpc_pollset_set* interested_parties, grpc_closure* on_done,
     std::unique_ptr<grpc_core::ServerAddressList>* addresses, bool check_grpclb,
     char** service_config_json, int query_timeout_ms,
-    grpc_core::RefCountedPtr<grpc_core::LogicalThread> combiner);
+    grpc_core::RefCountedPtr<grpc_core::LogicalThread> logical_thread);
 
 /* Cancel the pending grpc_ares_request \a request */
 extern void (*grpc_cancel_ares_request_locked)(grpc_ares_request* request);

+ 2 - 2
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc

@@ -31,7 +31,7 @@ static grpc_ares_request* grpc_dns_lookup_ares_locked_impl(
     grpc_pollset_set* interested_parties, grpc_closure* on_done,
     std::unique_ptr<grpc_core::ServerAddressList>* addrs, bool check_grpclb,
     char** service_config_json, int query_timeout_ms,
-    grpc_core::RefCountedPtr<grpc_core::LogicalThread> combiner) {
+    grpc_core::RefCountedPtr<grpc_core::LogicalThread> logical_thread) {
   return NULL;
 }
 
@@ -40,7 +40,7 @@ grpc_ares_request* (*grpc_dns_lookup_ares_locked)(
     grpc_pollset_set* interested_parties, grpc_closure* on_done,
     std::unique_ptr<grpc_core::ServerAddressList>* addrs, bool check_grpclb,
     char** service_config_json, int query_timeout_ms,
-    grpc_core::RefCountedPtr<grpc_core::LogicalThread> combiner) =
+    grpc_core::RefCountedPtr<grpc_core::LogicalThread> logical_thread) =
     grpc_dns_lookup_ares_locked_impl;
 
 static void grpc_cancel_ares_request_locked_impl(grpc_ares_request* r) {}

+ 4 - 4
src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc

@@ -33,7 +33,7 @@
 #include "src/core/lib/channel/channel_args.h"
 #include "src/core/lib/gpr/string.h"
 #include "src/core/lib/gprpp/manual_constructor.h"
-#include "src/core/lib/iomgr/combiner.h"
+#include "src/core/lib/iomgr/logical_thread.h"
 #include "src/core/lib/iomgr/resolve_address.h"
 #include "src/core/lib/iomgr/timer.h"
 
@@ -97,7 +97,7 @@ class NativeDnsResolver : public Resolver {
 };
 
 NativeDnsResolver::NativeDnsResolver(ResolverArgs args)
-    : Resolver(args.combiner, std::move(args.result_handler)),
+    : Resolver(args.logical_thread, std::move(args.result_handler)),
       backoff_(
           BackOff::Options()
               .set_initial_backoff(GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS *
@@ -149,7 +149,7 @@ void NativeDnsResolver::ShutdownLocked() {
 
 void NativeDnsResolver::OnNextResolution(void* arg, grpc_error* error) {
   NativeDnsResolver* r = static_cast<NativeDnsResolver*>(arg);
-  r->combiner()->Run(
+  r->logical_thread()->Run(
       Closure::ToFunction(
           GRPC_CLOSURE_INIT(&r->on_next_resolution_,
                             NativeDnsResolver::OnNextResolutionLocked, r,
@@ -169,7 +169,7 @@ void NativeDnsResolver::OnNextResolutionLocked(void* arg, grpc_error* error) {
 
 void NativeDnsResolver::OnResolved(void* arg, grpc_error* error) {
   NativeDnsResolver* r = static_cast<NativeDnsResolver*>(arg);
-  r->combiner()->Run(
+  r->logical_thread()->Run(
       Closure::ToFunction(
           GRPC_CLOSURE_INIT(&r->on_resolved_,
                             NativeDnsResolver::OnResolvedLocked, r, nullptr),

+ 14 - 14
src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc

@@ -35,7 +35,7 @@
 #include "src/core/lib/gpr/string.h"
 #include "src/core/lib/gpr/useful.h"
 #include "src/core/lib/iomgr/closure.h"
-#include "src/core/lib/iomgr/combiner.h"
+#include "src/core/lib/iomgr/logical_thread.h"
 #include "src/core/lib/iomgr/resolve_address.h"
 #include "src/core/lib/iomgr/unix_sockets_posix.h"
 #include "src/core/lib/slice/slice_internal.h"
@@ -94,7 +94,7 @@ class FakeResolver : public Resolver {
 };
 
 FakeResolver::FakeResolver(ResolverArgs args)
-    : Resolver(args.combiner, std::move(args.result_handler)),
+    : Resolver(args.logical_thread, std::move(args.result_handler)),
       response_generator_(
           FakeResolverResponseGenerator::GetFromArgs(args.args)) {
   // Channels sharing the same subchannels may have different resolver response
@@ -126,8 +126,8 @@ void FakeResolver::RequestReresolutionLocked() {
     if (!reresolution_closure_pending_) {
       reresolution_closure_pending_ = true;
       Ref().release();  // ref held by closure
-      combiner()->Run([this]() { ReturnReresolutionResult(this); },
-                      DEBUG_LOCATION);
+      logical_thread()->Run([this]() { ReturnReresolutionResult(this); },
+                            DEBUG_LOCATION);
     }
   }
 }
@@ -196,8 +196,8 @@ void FakeResolverResponseGenerator::SetResponse(Resolver::Result result) {
   SetResponseArg* arg = new SetResponseArg();
   arg->resolver = std::move(resolver);
   arg->result = std::move(result);
-  arg->resolver->combiner()->Run([arg]() { SetResponseLocked(arg); },
-                                 DEBUG_LOCATION);
+  arg->resolver->logical_thread()->Run([arg]() { SetResponseLocked(arg); },
+                                       DEBUG_LOCATION);
 }
 
 void FakeResolverResponseGenerator::SetReresolutionResponseLocked(
@@ -222,7 +222,7 @@ void FakeResolverResponseGenerator::SetReresolutionResponse(
   arg->resolver = std::move(resolver);
   arg->result = std::move(result);
   arg->has_result = true;
-  arg->resolver->combiner()->Run(
+  arg->resolver->logical_thread()->Run(
       [arg]() { SetReresolutionResponseLocked(arg); }, DEBUG_LOCATION);
 }
 
@@ -235,7 +235,7 @@ void FakeResolverResponseGenerator::UnsetReresolutionResponse() {
   }
   SetResponseArg* arg = new SetResponseArg();
   arg->resolver = std::move(resolver);
-  arg->resolver->combiner()->Run(
+  arg->resolver->logical_thread()->Run(
       [arg]() { SetReresolutionResponseLocked(arg); }, DEBUG_LOCATION);
 }
 
@@ -257,8 +257,8 @@ void FakeResolverResponseGenerator::SetFailure() {
   }
   SetResponseArg* arg = new SetResponseArg();
   arg->resolver = std::move(resolver);
-  arg->resolver->combiner()->Run([arg]() { SetFailureLocked(arg); },
-                                 DEBUG_LOCATION);
+  arg->resolver->logical_thread()->Run([arg]() { SetFailureLocked(arg); },
+                                       DEBUG_LOCATION);
 }
 
 void FakeResolverResponseGenerator::SetFailureOnReresolution() {
@@ -271,8 +271,8 @@ void FakeResolverResponseGenerator::SetFailureOnReresolution() {
   SetResponseArg* arg = new SetResponseArg();
   arg->resolver = std::move(resolver);
   arg->immediate = false;
-  arg->resolver->combiner()->Run([arg]() { SetFailureLocked(arg); },
-                                 DEBUG_LOCATION);
+  arg->resolver->logical_thread()->Run([arg]() { SetFailureLocked(arg); },
+                                       DEBUG_LOCATION);
 }
 
 void FakeResolverResponseGenerator::SetFakeResolver(
@@ -284,8 +284,8 @@ void FakeResolverResponseGenerator::SetFakeResolver(
     SetResponseArg* arg = new SetResponseArg();
     arg->resolver = resolver_->Ref();
     arg->result = std::move(result_);
-    resolver_->combiner()->Run([arg]() { SetResponseLocked(arg); },
-                               DEBUG_LOCATION);
+    resolver_->logical_thread()->Run([arg]() { SetResponseLocked(arg); },
+                                     DEBUG_LOCATION);
     has_result_ = false;
   }
 }

+ 2 - 2
src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc

@@ -31,7 +31,7 @@
 #include "src/core/ext/filters/client_channel/server_address.h"
 #include "src/core/lib/channel/channel_args.h"
 #include "src/core/lib/gpr/string.h"
-#include "src/core/lib/iomgr/combiner.h"
+#include "src/core/lib/iomgr/logical_thread.h"
 #include "src/core/lib/iomgr/resolve_address.h"
 #include "src/core/lib/iomgr/unix_sockets_posix.h"
 #include "src/core/lib/slice/slice_internal.h"
@@ -57,7 +57,7 @@ class SockaddrResolver : public Resolver {
 
 SockaddrResolver::SockaddrResolver(ServerAddressList addresses,
                                    ResolverArgs args)
-    : Resolver(args.combiner, std::move(args.result_handler)),
+    : Resolver(args.logical_thread, std::move(args.result_handler)),
       addresses_(std::move(addresses)),
       channel_args_(grpc_channel_args_copy(args.args)) {}
 

+ 2 - 2
src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc

@@ -33,7 +33,7 @@ namespace {
 class XdsResolver : public Resolver {
  public:
   explicit XdsResolver(ResolverArgs args)
-      : Resolver(args.combiner, std::move(args.result_handler)),
+      : Resolver(args.logical_thread, std::move(args.result_handler)),
         args_(grpc_channel_args_copy(args.args)),
         interested_parties_(args.pollset_set) {
     char* path = args.uri->path;
@@ -88,7 +88,7 @@ void XdsResolver::ServiceConfigWatcher::OnError(grpc_error* error) {
 void XdsResolver::StartLocked() {
   grpc_error* error = GRPC_ERROR_NONE;
   xds_client_ = MakeOrphanable<XdsClient>(
-      combiner(), interested_parties_, StringView(server_name_.get()),
+      logical_thread(), interested_parties_, StringView(server_name_.get()),
       MakeUnique<ServiceConfigWatcher>(Ref()), *args_, &error);
   if (error != GRPC_ERROR_NONE) {
     gpr_log(GPR_ERROR,

+ 2 - 2
src/core/ext/filters/client_channel/resolver_factory.h

@@ -38,8 +38,8 @@ struct ResolverArgs {
   const grpc_channel_args* args = nullptr;
   /// Used to drive I/O in the name resolution process.
   grpc_pollset_set* pollset_set = nullptr;
-  /// The combiner under which all resolver calls will be run.
-  RefCountedPtr<LogicalThread> combiner;
+  /// The logical_thread under which all resolver calls will be run.
+  RefCountedPtr<LogicalThread> logical_thread;
   /// The result handler to be used by the resolver.
   std::unique_ptr<Resolver::ResultHandler> result_handler;
 };

+ 2 - 2
src/core/ext/filters/client_channel/resolver_registry.cc

@@ -145,7 +145,7 @@ bool ResolverRegistry::IsValidTarget(const char* target) {
 
 OrphanablePtr<Resolver> ResolverRegistry::CreateResolver(
     const char* target, const grpc_channel_args* args,
-    grpc_pollset_set* pollset_set, RefCountedPtr<LogicalThread> combiner,
+    grpc_pollset_set* pollset_set, RefCountedPtr<LogicalThread> logical_thread,
     std::unique_ptr<Resolver::ResultHandler> result_handler) {
   GPR_ASSERT(g_state != nullptr);
   grpc_uri* uri = nullptr;
@@ -156,7 +156,7 @@ OrphanablePtr<Resolver> ResolverRegistry::CreateResolver(
   resolver_args.uri = uri;
   resolver_args.args = args;
   resolver_args.pollset_set = pollset_set;
-  resolver_args.combiner = std::move(combiner);
+  resolver_args.logical_thread = std::move(logical_thread);
   resolver_args.result_handler = std::move(result_handler);
   OrphanablePtr<Resolver> resolver =
       factory == nullptr ? nullptr

+ 3 - 3
src/core/ext/filters/client_channel/xds/xds_api.h

@@ -90,8 +90,8 @@ class XdsPriorityListUpdate {
 };
 
 // There are two phases of accessing this class's content:
-// 1. to initialize in the control plane combiner;
-// 2. to use in the data plane combiner.
+// 1. to initialize in the control plane logical_thread;
+// 2. to use in the data plane logical_thread.
 // So no additional synchronization is needed.
 class XdsDropConfig : public RefCounted<XdsDropConfig> {
  public:
@@ -113,7 +113,7 @@ class XdsDropConfig : public RefCounted<XdsDropConfig> {
         DropCategory{std::move(name), parts_per_million});
   }
 
-  // The only method invoked from the data plane combiner.
+  // The only method invoked from the data plane logical_thread.
   bool ShouldDrop(const grpc_core::UniquePtr<char>** category_name) const;
 
   const DropCategoryList& drop_category_list() const {

+ 16 - 14
src/core/ext/filters/client_channel/xds/xds_client.cc

@@ -46,7 +46,7 @@
 #include "src/core/lib/gprpp/orphanable.h"
 #include "src/core/lib/gprpp/ref_counted_ptr.h"
 #include "src/core/lib/gprpp/sync.h"
-#include "src/core/lib/iomgr/combiner.h"
+#include "src/core/lib/iomgr/logical_thread.h"
 #include "src/core/lib/iomgr/sockaddr.h"
 #include "src/core/lib/iomgr/sockaddr_utils.h"
 #include "src/core/lib/iomgr/timer.h"
@@ -257,7 +257,8 @@ class XdsClient::ChannelState::StateWatcher
     : public AsyncConnectivityStateWatcherInterface {
  public:
   explicit StateWatcher(RefCountedPtr<ChannelState> parent)
-      : AsyncConnectivityStateWatcherInterface(parent->xds_client()->combiner_),
+      : AsyncConnectivityStateWatcherInterface(
+            parent->xds_client()->logical_thread_),
         parent_(std::move(parent)) {}
 
  private:
@@ -487,7 +488,7 @@ template <typename T>
 void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimer(
     void* arg, grpc_error* error) {
   RetryableCall* calld = static_cast<RetryableCall*>(arg);
-  calld->chand_->xds_client()->combiner_->Run(
+  calld->chand_->xds_client()->logical_thread_->Run(
       Closure::ToFunction(GRPC_CLOSURE_INIT(&calld->on_retry_timer_,
                                             OnRetryTimerLocked, calld, nullptr),
                           GRPC_ERROR_REF(error)),
@@ -633,7 +634,7 @@ void XdsClient::ChannelState::AdsCallState::Orphan() {
 void XdsClient::ChannelState::AdsCallState::OnResponseReceived(
     void* arg, grpc_error* error) {
   AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
-  ads_calld->xds_client()->combiner_->Run(
+  ads_calld->xds_client()->logical_thread_->Run(
       Closure::ToFunction(
           GRPC_CLOSURE_INIT(&ads_calld->on_response_received_,
                             OnResponseReceivedLocked, ads_calld, nullptr),
@@ -792,7 +793,7 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked(
 void XdsClient::ChannelState::AdsCallState::OnStatusReceived(
     void* arg, grpc_error* error) {
   AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
-  ads_calld->xds_client()->combiner_->Run(
+  ads_calld->xds_client()->logical_thread_->Run(
       Closure::ToFunction(
           GRPC_CLOSURE_INIT(&ads_calld->on_status_received_,
                             OnStatusReceivedLocked, ads_calld, nullptr),
@@ -855,7 +856,7 @@ void XdsClient::ChannelState::LrsCallState::Reporter::
 void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimer(
     void* arg, grpc_error* error) {
   Reporter* self = static_cast<Reporter*>(arg);
-  self->xds_client()->combiner_->Run(
+  self->xds_client()->logical_thread_->Run(
       Closure::ToFunction(
           GRPC_CLOSURE_INIT(&self->on_next_report_timer_,
                             OnNextReportTimerLocked, self, nullptr),
@@ -917,7 +918,7 @@ void XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
 void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDone(
     void* arg, grpc_error* error) {
   Reporter* self = static_cast<Reporter*>(arg);
-  self->xds_client()->combiner_->Run(
+  self->xds_client()->logical_thread_->Run(
       Closure::ToFunction(GRPC_CLOSURE_INIT(&self->on_report_done_,
                                             OnReportDoneLocked, self, nullptr),
                           GRPC_ERROR_REF(error)),
@@ -1087,7 +1088,7 @@ void XdsClient::ChannelState::LrsCallState::MaybeStartReportingLocked() {
 void XdsClient::ChannelState::LrsCallState::OnInitialRequestSent(
     void* arg, grpc_error* error) {
   LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
-  lrs_calld->xds_client()->combiner_->Run(
+  lrs_calld->xds_client()->logical_thread_->Run(
       Closure::ToFunction(
           GRPC_CLOSURE_INIT(&lrs_calld->on_initial_request_sent_,
                             OnInitialRequestSentLocked, lrs_calld, nullptr),
@@ -1108,7 +1109,7 @@ void XdsClient::ChannelState::LrsCallState::OnInitialRequestSentLocked(
 void XdsClient::ChannelState::LrsCallState::OnResponseReceived(
     void* arg, grpc_error* error) {
   LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
-  lrs_calld->xds_client()->combiner_->Run(
+  lrs_calld->xds_client()->logical_thread_->Run(
       Closure::ToFunction(
           GRPC_CLOSURE_INIT(&lrs_calld->on_response_received_,
                             OnResponseReceivedLocked, lrs_calld, nullptr),
@@ -1209,7 +1210,7 @@ void XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked(
 void XdsClient::ChannelState::LrsCallState::OnStatusReceived(
     void* arg, grpc_error* error) {
   LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
-  lrs_calld->xds_client()->combiner_->Run(
+  lrs_calld->xds_client()->logical_thread_->Run(
       Closure::ToFunction(
           GRPC_CLOSURE_INIT(&lrs_calld->on_status_received_,
                             OnStatusReceivedLocked, lrs_calld, nullptr),
@@ -1263,13 +1264,13 @@ grpc_core::UniquePtr<char> GenerateBuildVersionString() {
 
 }  // namespace
 
-XdsClient::XdsClient(RefCountedPtr<LogicalThread> combiner,
+XdsClient::XdsClient(RefCountedPtr<LogicalThread> logical_thread,
                      grpc_pollset_set* interested_parties,
                      StringView server_name,
                      std::unique_ptr<ServiceConfigWatcherInterface> watcher,
                      const grpc_channel_args& channel_args, grpc_error** error)
     : build_version_(GenerateBuildVersionString()),
-      combiner_(std::move(combiner)),
+      logical_thread_(std::move(logical_thread)),
       interested_parties_(interested_parties),
       bootstrap_(XdsBootstrap::ReadFromFile(error)),
       server_name_(StringViewToCString(server_name)),
@@ -1291,8 +1292,9 @@ XdsClient::XdsClient(RefCountedPtr<LogicalThread> combiner,
     // TODO(juanlishen): Start LDS call and do not return service config
     // until we get the first LDS response.
     XdsClient* self = Ref().release();
-    combiner_->Run([self]() { NotifyOnServiceConfig(self, GRPC_ERROR_NONE); },
-                   DEBUG_LOCATION);
+    logical_thread_->Run(
+        [self]() { NotifyOnServiceConfig(self, GRPC_ERROR_NONE); },
+        DEBUG_LOCATION);
   }
 }
 

+ 3 - 3
src/core/ext/filters/client_channel/xds/xds_client.h

@@ -31,7 +31,7 @@
 #include "src/core/lib/gprpp/ref_counted.h"
 #include "src/core/lib/gprpp/ref_counted_ptr.h"
 #include "src/core/lib/gprpp/string_view.h"
-#include "src/core/lib/iomgr/combiner.h"
+#include "src/core/lib/iomgr/logical_thread.h"
 
 namespace grpc_core {
 
@@ -72,7 +72,7 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
 
   // If *error is not GRPC_ERROR_NONE after construction, then there was
   // an error initializing the client.
-  XdsClient(RefCountedPtr<LogicalThread> combiner,
+  XdsClient(RefCountedPtr<LogicalThread> logical_thread,
             grpc_pollset_set* interested_parties, StringView server_name,
             std::unique_ptr<ServiceConfigWatcherInterface> watcher,
             const grpc_channel_args& channel_args, grpc_error** error);
@@ -197,7 +197,7 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
 
   UniquePtr<char> build_version_;
 
-  RefCountedPtr<LogicalThread> combiner_;
+  RefCountedPtr<LogicalThread> logical_thread_;
   grpc_pollset_set* interested_parties_;
 
   std::unique_ptr<XdsBootstrap> bootstrap_;

+ 17 - 15
src/core/ext/filters/client_channel/xds/xds_client_stats.h

@@ -137,21 +137,22 @@ class XdsClientStats {
     // If the refcount is 0, there won't be new calls recorded to the
     // LocalityStats, so the LocalityStats can be safely deleted when all the
     // in-progress calls have finished.
-    // Only be called from the control plane combiner.
+    // Only be called from the control plane logical_thread.
     void RefByPicker() { picker_refcount_.FetchAdd(1, MemoryOrder::ACQ_REL); }
-    // Might be called from the control plane combiner or the data plane
-    // combiner.
+    // Might be called from the control plane logical_thread or the data plane
+    // logical_thread.
     // TODO(juanlishen): Once https://github.com/grpc/grpc/pull/19390 is merged,
-    //  this method will also only be invoked in the control plane combiner.
-    //  We may then be able to simplify the LocalityStats' lifetime by making it
-    //  RefCounted<> and populating the protobuf in its dtor.
+    //  this method will also only be invoked in the control plane
+    //  logical_thread. We may then be able to simplify the LocalityStats'
+    //  lifetime by making it RefCounted<> and populating the protobuf in its
+    //  dtor.
     void UnrefByPicker() { picker_refcount_.FetchSub(1, MemoryOrder::ACQ_REL); }
-    // Only be called from the control plane combiner.
+    // Only be called from the control plane logical_thread.
     // The only place where the picker_refcount_ can be increased is
     // RefByPicker(), which also can only be called from the control plane
-    // combiner. Also, if the picker_refcount_ is 0, total_requests_in_progress_
-    // can't be increased from 0. So it's safe to delete the LocalityStats right
-    // after this method returns true.
+    // logical_thread. Also, if the picker_refcount_ is 0,
+    // total_requests_in_progress_ can't be increased from 0. So it's safe to
+    // delete the LocalityStats right after this method returns true.
     bool IsSafeToDelete() {
       return picker_refcount_.FetchAdd(0, MemoryOrder::ACQ_REL) == 0 &&
              total_requests_in_progress_.FetchAdd(0, MemoryOrder::ACQ_REL) == 0;
@@ -168,12 +169,12 @@ class XdsClientStats {
     Atomic<uint64_t> total_issued_requests_{0};
     // Protects load_metric_stats_. A mutex is necessary because the length of
     // load_metric_stats_ can be accessed by both the callback intercepting the
-    // call's recv_trailing_metadata (not from any combiner) and the load
-    // reporting thread (from the control plane combiner).
+    // call's recv_trailing_metadata (not from any logical_thread) and the load
+    // reporting thread (from the control plane logical_thread).
     Mutex load_metric_stats_mu_;
     LoadMetricMap load_metric_stats_;
-    // Can be accessed from either the control plane combiner or the data plane
-    // combiner.
+    // Can be accessed from either the control plane logical_thread or the data
+    // plane logical_thread.
     Atomic<uint8_t> picker_refcount_{0};
   };
 
@@ -219,7 +220,8 @@ class XdsClientStats {
   Atomic<uint64_t> total_dropped_requests_{0};
   // Protects dropped_requests_. A mutex is necessary because the length of
   // dropped_requests_ can be accessed by both the picker (from data plane
-  // combiner) and the load reporting thread (from the control plane combiner).
+  // logical_thread) and the load reporting thread (from the control plane
+  // logical_thread).
   Mutex dropped_requests_mu_;
   DroppedRequestsMap dropped_requests_;
   // The timestamp of last reporting. For the LB-policy-wide first report, the