|  | @@ -39,7 +39,6 @@
 | 
	
		
			
				|  |  |  #include "src/core/lib/gpr/string.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/gpr/tls.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/gprpp/atomic.h"
 | 
	
		
			
				|  |  | -#include "src/core/lib/iomgr/closure.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/iomgr/executor.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/iomgr/pollset.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/iomgr/timer.h"
 | 
	
	
		
			
				|  | @@ -209,9 +208,6 @@ struct cq_vtable {
 | 
	
		
			
				|  |  |                       void* reserved);
 | 
	
		
			
				|  |  |    grpc_event (*pluck)(grpc_completion_queue* cq, void* tag,
 | 
	
		
			
				|  |  |                        gpr_timespec deadline, void* reserved);
 | 
	
		
			
				|  |  | -  // TODO(vjpai): Remove proxy_pollset once callback_alternative no longer
 | 
	
		
			
				|  |  | -  // needed.
 | 
	
		
			
				|  |  | -  grpc_pollset* (*proxy_pollset)(grpc_completion_queue* cq);
 | 
	
		
			
				|  |  |  };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  namespace {
 | 
	
	
		
			
				|  | @@ -313,7 +309,7 @@ struct cq_pluck_data {
 | 
	
		
			
				|  |  |  };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  struct cq_callback_data {
 | 
	
		
			
				|  |  | -  explicit cq_callback_data(
 | 
	
		
			
				|  |  | +  cq_callback_data(
 | 
	
		
			
				|  |  |        grpc_experimental_completion_queue_functor* shutdown_callback)
 | 
	
		
			
				|  |  |        : shutdown_callback(shutdown_callback) {}
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -338,81 +334,6 @@ struct cq_callback_data {
 | 
	
		
			
				|  |  |    grpc_experimental_completion_queue_functor* shutdown_callback;
 | 
	
		
			
				|  |  |  };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -// TODO(vjpai): Remove all callback_alternative variants when event manager is
 | 
	
		
			
				|  |  | -// the only supported poller.
 | 
	
		
			
				|  |  | -struct cq_callback_alternative_data {
 | 
	
		
			
				|  |  | -  explicit cq_callback_alternative_data(
 | 
	
		
			
				|  |  | -      grpc_experimental_completion_queue_functor* shutdown_callback)
 | 
	
		
			
				|  |  | -      : implementation(SharedNextableCQ()),
 | 
	
		
			
				|  |  | -        shutdown_callback(shutdown_callback) {}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  /* This just points to a single shared nextable CQ */
 | 
	
		
			
				|  |  | -  grpc_completion_queue* const implementation;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  /** Number of outstanding events (+1 if not shut down)
 | 
	
		
			
				|  |  | -      Initial count is dropped by grpc_completion_queue_shutdown */
 | 
	
		
			
				|  |  | -  grpc_core::Atomic<intptr_t> pending_events{1};
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  /** 0 initially. 1 once we initiated shutdown */
 | 
	
		
			
				|  |  | -  bool shutdown_called = false;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  /** A callback that gets invoked when the CQ completes shutdown */
 | 
	
		
			
				|  |  | -  grpc_experimental_completion_queue_functor* shutdown_callback;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  static grpc_completion_queue* SharedNextableCQ() {
 | 
	
		
			
				|  |  | -    grpc_core::MutexLock lock(&*shared_cq_next_mu);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -    if (shared_cq_next == nullptr) {
 | 
	
		
			
				|  |  | -      shared_cq_next = grpc_completion_queue_create_for_next(nullptr);
 | 
	
		
			
				|  |  | -      int num_nexting_threads = GPR_CLAMP(gpr_cpu_num_cores(), 1, 32);
 | 
	
		
			
				|  |  | -      threads_remaining.Store(num_nexting_threads,
 | 
	
		
			
				|  |  | -                              grpc_core::MemoryOrder::RELEASE);
 | 
	
		
			
				|  |  | -      for (int i = 0; i < num_nexting_threads; i++) {
 | 
	
		
			
				|  |  | -        grpc_core::Executor::Run(
 | 
	
		
			
				|  |  | -            GRPC_CLOSURE_CREATE(
 | 
	
		
			
				|  |  | -                [](void* arg, grpc_error* /*error*/) {
 | 
	
		
			
				|  |  | -                  grpc_completion_queue* cq =
 | 
	
		
			
				|  |  | -                      static_cast<grpc_completion_queue*>(arg);
 | 
	
		
			
				|  |  | -                  while (true) {
 | 
	
		
			
				|  |  | -                    grpc_event event = grpc_completion_queue_next(
 | 
	
		
			
				|  |  | -                        cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
 | 
	
		
			
				|  |  | -                    if (event.type == GRPC_QUEUE_SHUTDOWN) {
 | 
	
		
			
				|  |  | -                      break;
 | 
	
		
			
				|  |  | -                    }
 | 
	
		
			
				|  |  | -                    GPR_DEBUG_ASSERT(event.type == GRPC_OP_COMPLETE);
 | 
	
		
			
				|  |  | -                    // We can always execute the callback inline rather than
 | 
	
		
			
				|  |  | -                    // pushing it to another Executor thread because this
 | 
	
		
			
				|  |  | -                    // thread is definitely running on an executor, does not
 | 
	
		
			
				|  |  | -                    // hold any application locks before executing the callback,
 | 
	
		
			
				|  |  | -                    // and cannot be entered recursively.
 | 
	
		
			
				|  |  | -                    auto* functor = static_cast<
 | 
	
		
			
				|  |  | -                        grpc_experimental_completion_queue_functor*>(event.tag);
 | 
	
		
			
				|  |  | -                    functor->functor_run(functor, event.success);
 | 
	
		
			
				|  |  | -                  }
 | 
	
		
			
				|  |  | -                  if (threads_remaining.FetchSub(
 | 
	
		
			
				|  |  | -                          1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
 | 
	
		
			
				|  |  | -                    grpc_completion_queue_destroy(cq);
 | 
	
		
			
				|  |  | -                  }
 | 
	
		
			
				|  |  | -                },
 | 
	
		
			
				|  |  | -                shared_cq_next, nullptr),
 | 
	
		
			
				|  |  | -            GRPC_ERROR_NONE, grpc_core::ExecutorType::DEFAULT,
 | 
	
		
			
				|  |  | -            grpc_core::ExecutorJobType::LONG);
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | -    return shared_cq_next;
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  // Use manually-constructed Mutex to avoid static construction issues
 | 
	
		
			
				|  |  | -  static grpc_core::ManualConstructor<grpc_core::Mutex> shared_cq_next_mu;
 | 
	
		
			
				|  |  | -  static grpc_completion_queue*
 | 
	
		
			
				|  |  | -      shared_cq_next;  // GUARDED_BY(shared_cq_next_mu)
 | 
	
		
			
				|  |  | -  static grpc_core::Atomic<int> threads_remaining;
 | 
	
		
			
				|  |  | -};
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -grpc_core::ManualConstructor<grpc_core::Mutex>
 | 
	
		
			
				|  |  | -    cq_callback_alternative_data::shared_cq_next_mu;
 | 
	
		
			
				|  |  | -grpc_completion_queue* cq_callback_alternative_data::shared_cq_next = nullptr;
 | 
	
		
			
				|  |  | -grpc_core::Atomic<int> cq_callback_alternative_data::threads_remaining{0};
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |  }  // namespace
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  /* Completion queue structure */
 | 
	
	
		
			
				|  | @@ -425,12 +346,6 @@ struct grpc_completion_queue {
 | 
	
		
			
				|  |  |    const cq_vtable* vtable;
 | 
	
		
			
				|  |  |    const cq_poller_vtable* poller_vtable;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  // The pollset entry is allowed to enable proxy CQs like the
 | 
	
		
			
				|  |  | -  // callback_alternative.
 | 
	
		
			
				|  |  | -  // TODO(vjpai): Consider removing pollset and reverting to previous
 | 
	
		
			
				|  |  | -  // calculation of pollset once callback_alternative is no longer needed.
 | 
	
		
			
				|  |  | -  grpc_pollset* pollset;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |  #ifndef NDEBUG
 | 
	
		
			
				|  |  |    void** outstanding_tags;
 | 
	
		
			
				|  |  |    size_t outstanding_tag_count;
 | 
	
	
		
			
				|  | @@ -445,17 +360,13 @@ struct grpc_completion_queue {
 | 
	
		
			
				|  |  |  static void cq_finish_shutdown_next(grpc_completion_queue* cq);
 | 
	
		
			
				|  |  |  static void cq_finish_shutdown_pluck(grpc_completion_queue* cq);
 | 
	
		
			
				|  |  |  static void cq_finish_shutdown_callback(grpc_completion_queue* cq);
 | 
	
		
			
				|  |  | -static void cq_finish_shutdown_callback_alternative(grpc_completion_queue* cq);
 | 
	
		
			
				|  |  |  static void cq_shutdown_next(grpc_completion_queue* cq);
 | 
	
		
			
				|  |  |  static void cq_shutdown_pluck(grpc_completion_queue* cq);
 | 
	
		
			
				|  |  |  static void cq_shutdown_callback(grpc_completion_queue* cq);
 | 
	
		
			
				|  |  | -static void cq_shutdown_callback_alternative(grpc_completion_queue* cq);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag);
 | 
	
		
			
				|  |  |  static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag);
 | 
	
		
			
				|  |  |  static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag);
 | 
	
		
			
				|  |  | -static bool cq_begin_op_for_callback_alternative(grpc_completion_queue* cq,
 | 
	
		
			
				|  |  | -                                                 void* tag);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  // A cq_end_op function is called when an operation on a given CQ with
 | 
	
		
			
				|  |  |  // a given tag has completed. The storage argument is a reference to the
 | 
	
	
		
			
				|  | @@ -478,20 +389,12 @@ static void cq_end_op_for_callback(
 | 
	
		
			
				|  |  |      void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
 | 
	
		
			
				|  |  |      grpc_cq_completion* storage, bool internal);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void cq_end_op_for_callback_alternative(
 | 
	
		
			
				|  |  | -    grpc_completion_queue* cq, void* tag, grpc_error* error,
 | 
	
		
			
				|  |  | -    void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
 | 
	
		
			
				|  |  | -    grpc_cq_completion* storage, bool internal);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |  static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
 | 
	
		
			
				|  |  |                            void* reserved);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
 | 
	
		
			
				|  |  |                             gpr_timespec deadline, void* reserved);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static grpc_pollset* cq_proxy_pollset_for_callback_alternative(
 | 
	
		
			
				|  |  | -    grpc_completion_queue* cq);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |  // Note that cq_init_next and cq_init_pluck do not use the shutdown_callback
 | 
	
		
			
				|  |  |  static void cq_init_next(
 | 
	
		
			
				|  |  |      void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
 | 
	
	
		
			
				|  | @@ -499,39 +402,29 @@ static void cq_init_pluck(
 | 
	
		
			
				|  |  |      void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
 | 
	
		
			
				|  |  |  static void cq_init_callback(
 | 
	
		
			
				|  |  |      void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
 | 
	
		
			
				|  |  | -// poller becomes only option.
 | 
	
		
			
				|  |  | -static void cq_init_callback_alternative(
 | 
	
		
			
				|  |  | -    void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
 | 
	
		
			
				|  |  |  static void cq_destroy_next(void* data);
 | 
	
		
			
				|  |  |  static void cq_destroy_pluck(void* data);
 | 
	
		
			
				|  |  |  static void cq_destroy_callback(void* data);
 | 
	
		
			
				|  |  | -static void cq_destroy_callback_alternative(void* data);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  /* Completion queue vtables based on the completion-type */
 | 
	
		
			
				|  |  | -// TODO(vjpai): Make this const again once we stop needing callback_alternative
 | 
	
		
			
				|  |  | -static cq_vtable g_polling_cq_vtable[] = {
 | 
	
		
			
				|  |  | +static const cq_vtable g_cq_vtable[] = {
 | 
	
		
			
				|  |  |      /* GRPC_CQ_NEXT */
 | 
	
		
			
				|  |  |      {GRPC_CQ_NEXT, sizeof(cq_next_data), cq_init_next, cq_shutdown_next,
 | 
	
		
			
				|  |  |       cq_destroy_next, cq_begin_op_for_next, cq_end_op_for_next, cq_next,
 | 
	
		
			
				|  |  | -     nullptr, nullptr},
 | 
	
		
			
				|  |  | +     nullptr},
 | 
	
		
			
				|  |  |      /* GRPC_CQ_PLUCK */
 | 
	
		
			
				|  |  |      {GRPC_CQ_PLUCK, sizeof(cq_pluck_data), cq_init_pluck, cq_shutdown_pluck,
 | 
	
		
			
				|  |  |       cq_destroy_pluck, cq_begin_op_for_pluck, cq_end_op_for_pluck, nullptr,
 | 
	
		
			
				|  |  | -     cq_pluck, nullptr},
 | 
	
		
			
				|  |  | +     cq_pluck},
 | 
	
		
			
				|  |  |      /* GRPC_CQ_CALLBACK */
 | 
	
		
			
				|  |  |      {GRPC_CQ_CALLBACK, sizeof(cq_callback_data), cq_init_callback,
 | 
	
		
			
				|  |  |       cq_shutdown_callback, cq_destroy_callback, cq_begin_op_for_callback,
 | 
	
		
			
				|  |  | -     cq_end_op_for_callback, nullptr, nullptr, nullptr},
 | 
	
		
			
				|  |  | +     cq_end_op_for_callback, nullptr, nullptr},
 | 
	
		
			
				|  |  |  };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -// Separate vtable for non-polling cqs, assign at init
 | 
	
		
			
				|  |  | -static cq_vtable g_nonpolling_cq_vtable[sizeof(g_polling_cq_vtable) /
 | 
	
		
			
				|  |  | -                                        sizeof(g_polling_cq_vtable[0])];
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |  #define DATA_FROM_CQ(cq) ((void*)(cq + 1))
 | 
	
		
			
				|  |  | -#define INLINE_POLLSET_FROM_CQ(cq) \
 | 
	
		
			
				|  |  | +#define POLLSET_FROM_CQ(cq) \
 | 
	
		
			
				|  |  |    ((grpc_pollset*)(cq->vtable->data_size + (char*)DATA_FROM_CQ(cq)))
 | 
	
		
			
				|  |  | -#define POLLSET_FROM_CQ(cq) (cq->pollset)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  grpc_core::TraceFlag grpc_cq_pluck_trace(false, "queue_pluck");
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -550,46 +443,6 @@ static void on_pollset_shutdown_done(void* cq, grpc_error* error);
 | 
	
		
			
				|  |  |  void grpc_cq_global_init() {
 | 
	
		
			
				|  |  |    gpr_tls_init(&g_cached_event);
 | 
	
		
			
				|  |  |    gpr_tls_init(&g_cached_cq);
 | 
	
		
			
				|  |  | -  g_nonpolling_cq_vtable[GRPC_CQ_NEXT] = g_polling_cq_vtable[GRPC_CQ_NEXT];
 | 
	
		
			
				|  |  | -  g_nonpolling_cq_vtable[GRPC_CQ_PLUCK] = g_polling_cq_vtable[GRPC_CQ_PLUCK];
 | 
	
		
			
				|  |  | -  g_nonpolling_cq_vtable[GRPC_CQ_CALLBACK] =
 | 
	
		
			
				|  |  | -      g_polling_cq_vtable[GRPC_CQ_CALLBACK];
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -// TODO(vjpai): Remove when callback_alternative is no longer needed
 | 
	
		
			
				|  |  | -void grpc_cq_init() {
 | 
	
		
			
				|  |  | -  // If the iomgr runs in the background, we can use the preferred callback CQ.
 | 
	
		
			
				|  |  | -  // If the iomgr is non-polling, we cannot use the alternative callback CQ.
 | 
	
		
			
				|  |  | -  if (!grpc_iomgr_run_in_background() && !grpc_iomgr_non_polling()) {
 | 
	
		
			
				|  |  | -    cq_callback_alternative_data::shared_cq_next_mu.Init();
 | 
	
		
			
				|  |  | -    g_polling_cq_vtable[GRPC_CQ_CALLBACK] = {
 | 
	
		
			
				|  |  | -        GRPC_CQ_CALLBACK,
 | 
	
		
			
				|  |  | -        sizeof(cq_callback_alternative_data),
 | 
	
		
			
				|  |  | -        cq_init_callback_alternative,
 | 
	
		
			
				|  |  | -        cq_shutdown_callback_alternative,
 | 
	
		
			
				|  |  | -        cq_destroy_callback_alternative,
 | 
	
		
			
				|  |  | -        cq_begin_op_for_callback_alternative,
 | 
	
		
			
				|  |  | -        cq_end_op_for_callback_alternative,
 | 
	
		
			
				|  |  | -        nullptr,
 | 
	
		
			
				|  |  | -        nullptr,
 | 
	
		
			
				|  |  | -        cq_proxy_pollset_for_callback_alternative};
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -// TODO(vjpai): Remove when callback_alternative is no longer needed
 | 
	
		
			
				|  |  | -void grpc_cq_shutdown() {
 | 
	
		
			
				|  |  | -  if (!grpc_iomgr_run_in_background() && !grpc_iomgr_non_polling()) {
 | 
	
		
			
				|  |  | -    {
 | 
	
		
			
				|  |  | -      grpc_core::MutexLock lock(
 | 
	
		
			
				|  |  | -          &*cq_callback_alternative_data::shared_cq_next_mu);
 | 
	
		
			
				|  |  | -      if (cq_callback_alternative_data::shared_cq_next != nullptr) {
 | 
	
		
			
				|  |  | -        grpc_completion_queue_shutdown(
 | 
	
		
			
				|  |  | -            cq_callback_alternative_data::shared_cq_next);
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | -      cq_callback_alternative_data::shared_cq_next = nullptr;
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | -    cq_callback_alternative_data::shared_cq_next_mu.Destroy();
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  void grpc_completion_queue_thread_local_cache_init(grpc_completion_queue* cq) {
 | 
	
	
		
			
				|  | @@ -668,9 +521,7 @@ grpc_completion_queue* grpc_completion_queue_create_internal(
 | 
	
		
			
				|  |  |        "polling_type=%d)",
 | 
	
		
			
				|  |  |        2, (completion_type, polling_type));
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  const cq_vtable* vtable = (polling_type == GRPC_CQ_NON_POLLING)
 | 
	
		
			
				|  |  | -                                ? &g_nonpolling_cq_vtable[completion_type]
 | 
	
		
			
				|  |  | -                                : &g_polling_cq_vtable[completion_type];
 | 
	
		
			
				|  |  | +  const cq_vtable* vtable = &g_cq_vtable[completion_type];
 | 
	
		
			
				|  |  |    const cq_poller_vtable* poller_vtable =
 | 
	
		
			
				|  |  |        &g_poller_vtable_by_poller_type[polling_type];
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -687,18 +538,9 @@ grpc_completion_queue* grpc_completion_queue_create_internal(
 | 
	
		
			
				|  |  |    /* One for destroy(), one for pollset_shutdown */
 | 
	
		
			
				|  |  |    new (&cq->owning_refs) grpc_core::RefCount(2);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +  poller_vtable->init(POLLSET_FROM_CQ(cq), &cq->mu);
 | 
	
		
			
				|  |  |    vtable->init(DATA_FROM_CQ(cq), shutdown_callback);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  // TODO(vjpai): When callback_alternative is no longer needed, cq->pollset can
 | 
	
		
			
				|  |  | -  // be removed and the nullptr proxy_pollset value below can be the definition
 | 
	
		
			
				|  |  | -  // of POLLSET_FROM_CQ.
 | 
	
		
			
				|  |  | -  cq->pollset = cq->vtable->proxy_pollset == nullptr
 | 
	
		
			
				|  |  | -                    ? INLINE_POLLSET_FROM_CQ(cq)
 | 
	
		
			
				|  |  | -                    : cq->vtable->proxy_pollset(cq);
 | 
	
		
			
				|  |  | -  // Init the inline pollset. If a proxy CQ is used, the proxy pollset will be
 | 
	
		
			
				|  |  | -  // init'ed in its CQ init.
 | 
	
		
			
				|  |  | -  cq->poller_vtable->init(INLINE_POLLSET_FROM_CQ(cq), &cq->mu);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |    GRPC_CLOSURE_INIT(&cq->pollset_shutdown_done, on_pollset_shutdown_done, cq,
 | 
	
		
			
				|  |  |                      grpc_schedule_on_exec_ctx);
 | 
	
		
			
				|  |  |    return cq;
 | 
	
	
		
			
				|  | @@ -736,17 +578,6 @@ static void cq_destroy_callback(void* data) {
 | 
	
		
			
				|  |  |    cqd->~cq_callback_data();
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void cq_init_callback_alternative(
 | 
	
		
			
				|  |  | -    void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
 | 
	
		
			
				|  |  | -  new (data) cq_callback_alternative_data(shutdown_callback);
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -static void cq_destroy_callback_alternative(void* data) {
 | 
	
		
			
				|  |  | -  cq_callback_alternative_data* cqd =
 | 
	
		
			
				|  |  | -      static_cast<cq_callback_alternative_data*>(data);
 | 
	
		
			
				|  |  | -  cqd->~cq_callback_alternative_data();
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |  grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue* cq) {
 | 
	
		
			
				|  |  |    return cq->vtable->cq_completion_type;
 | 
	
		
			
				|  |  |  }
 | 
	
	
		
			
				|  | @@ -787,9 +618,7 @@ void grpc_cq_internal_unref(grpc_completion_queue* cq) {
 | 
	
		
			
				|  |  |  #endif
 | 
	
		
			
				|  |  |    if (GPR_UNLIKELY(cq->owning_refs.Unref(debug_location, reason))) {
 | 
	
		
			
				|  |  |      cq->vtable->destroy(DATA_FROM_CQ(cq));
 | 
	
		
			
				|  |  | -    // Only destroy the inlined pollset. If a proxy CQ is used, the proxy
 | 
	
		
			
				|  |  | -    // pollset will be destroyed by the proxy CQ.
 | 
	
		
			
				|  |  | -    cq->poller_vtable->destroy(INLINE_POLLSET_FROM_CQ(cq));
 | 
	
		
			
				|  |  | +    cq->poller_vtable->destroy(POLLSET_FROM_CQ(cq));
 | 
	
		
			
				|  |  |  #ifndef NDEBUG
 | 
	
		
			
				|  |  |      gpr_free(cq->outstanding_tags);
 | 
	
		
			
				|  |  |  #endif
 | 
	
	
		
			
				|  | @@ -840,14 +669,6 @@ static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* /*tag*/) {
 | 
	
		
			
				|  |  |    return cqd->pending_events.IncrementIfNonzero();
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static bool cq_begin_op_for_callback_alternative(grpc_completion_queue* cq,
 | 
	
		
			
				|  |  | -                                                 void* tag) {
 | 
	
		
			
				|  |  | -  cq_callback_alternative_data* cqd =
 | 
	
		
			
				|  |  | -      static_cast<cq_callback_alternative_data*> DATA_FROM_CQ(cq);
 | 
	
		
			
				|  |  | -  return grpc_cq_begin_op(cqd->implementation, tag) &&
 | 
	
		
			
				|  |  | -         cqd->pending_events.IncrementIfNonzero();
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |  bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) {
 | 
	
		
			
				|  |  |  #ifndef NDEBUG
 | 
	
		
			
				|  |  |    gpr_mu_lock(cq->mu);
 | 
	
	
		
			
				|  | @@ -1011,7 +832,7 @@ static void cq_end_op_for_pluck(
 | 
	
		
			
				|  |  |    GRPC_ERROR_UNREF(error);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -void functor_callback(void* arg, grpc_error* error) {
 | 
	
		
			
				|  |  | +static void functor_callback(void* arg, grpc_error* error) {
 | 
	
		
			
				|  |  |    auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(arg);
 | 
	
		
			
				|  |  |    functor->functor_run(functor, error == GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  |  }
 | 
	
	
		
			
				|  | @@ -1071,40 +892,6 @@ static void cq_end_op_for_callback(
 | 
	
		
			
				|  |  |        GRPC_CLOSURE_CREATE(functor_callback, functor, nullptr), error);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void cq_end_op_for_callback_alternative(
 | 
	
		
			
				|  |  | -    grpc_completion_queue* cq, void* tag, grpc_error* error,
 | 
	
		
			
				|  |  | -    void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
 | 
	
		
			
				|  |  | -    grpc_cq_completion* storage, bool internal) {
 | 
	
		
			
				|  |  | -  GPR_TIMER_SCOPE("cq_end_op_for_callback_alternative", 0);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  cq_callback_alternative_data* cqd =
 | 
	
		
			
				|  |  | -      static_cast<cq_callback_alternative_data*> DATA_FROM_CQ(cq);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) ||
 | 
	
		
			
				|  |  | -      (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
 | 
	
		
			
				|  |  | -       error != GRPC_ERROR_NONE)) {
 | 
	
		
			
				|  |  | -    const char* errmsg = grpc_error_string(error);
 | 
	
		
			
				|  |  | -    GRPC_API_TRACE(
 | 
	
		
			
				|  |  | -        "cq_end_op_for_callback_alternative(cq=%p, tag=%p, error=%s, "
 | 
	
		
			
				|  |  | -        "done=%p, done_arg=%p, storage=%p)",
 | 
	
		
			
				|  |  | -        6, (cq, tag, errmsg, done, done_arg, storage));
 | 
	
		
			
				|  |  | -    if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
 | 
	
		
			
				|  |  | -        error != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | -      gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  // Pass through the actual work to the internal nextable CQ
 | 
	
		
			
				|  |  | -  grpc_cq_end_op(cqd->implementation, tag, error, done, done_arg, storage,
 | 
	
		
			
				|  |  | -                 internal);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  cq_check_tag(cq, tag, true); /* Used in debug builds only */
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
 | 
	
		
			
				|  |  | -    cq_finish_shutdown_callback_alternative(cq);
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |  void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error,
 | 
	
		
			
				|  |  |                      void (*done)(void* done_arg, grpc_cq_completion* storage),
 | 
	
		
			
				|  |  |                      void* done_arg, grpc_cq_completion* storage,
 | 
	
	
		
			
				|  | @@ -1112,13 +899,6 @@ void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error,
 | 
	
		
			
				|  |  |    cq->vtable->end_op(cq, tag, error, done, done_arg, storage, internal);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static grpc_pollset* cq_proxy_pollset_for_callback_alternative(
 | 
	
		
			
				|  |  | -    grpc_completion_queue* cq) {
 | 
	
		
			
				|  |  | -  cq_callback_alternative_data* cqd =
 | 
	
		
			
				|  |  | -      static_cast<cq_callback_alternative_data*>(DATA_FROM_CQ(cq));
 | 
	
		
			
				|  |  | -  return POLLSET_FROM_CQ(cqd->implementation);
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |  struct cq_is_finished_arg {
 | 
	
		
			
				|  |  |    gpr_atm last_seen_things_queued_ever;
 | 
	
		
			
				|  |  |    grpc_completion_queue* cq;
 | 
	
	
		
			
				|  | @@ -1599,21 +1379,6 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) {
 | 
	
		
			
				|  |  |        GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void cq_finish_shutdown_callback_alternative(grpc_completion_queue* cq) {
 | 
	
		
			
				|  |  | -  cq_callback_alternative_data* cqd =
 | 
	
		
			
				|  |  | -      static_cast<cq_callback_alternative_data*> DATA_FROM_CQ(cq);
 | 
	
		
			
				|  |  | -  auto* callback = cqd->shutdown_callback;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  GPR_ASSERT(cqd->shutdown_called);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  // Shutdown the non-proxy pollset
 | 
	
		
			
				|  |  | -  cq->poller_vtable->shutdown(INLINE_POLLSET_FROM_CQ(cq),
 | 
	
		
			
				|  |  | -                              &cq->pollset_shutdown_done);
 | 
	
		
			
				|  |  | -  grpc_core::Executor::Run(
 | 
	
		
			
				|  |  | -      GRPC_CLOSURE_CREATE(functor_callback, callback, nullptr),
 | 
	
		
			
				|  |  | -      GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |  static void cq_shutdown_callback(grpc_completion_queue* cq) {
 | 
	
		
			
				|  |  |    cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -1640,33 +1405,6 @@ static void cq_shutdown_callback(grpc_completion_queue* cq) {
 | 
	
		
			
				|  |  |    GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void cq_shutdown_callback_alternative(grpc_completion_queue* cq) {
 | 
	
		
			
				|  |  | -  cq_callback_alternative_data* cqd =
 | 
	
		
			
				|  |  | -      static_cast<cq_callback_alternative_data*> DATA_FROM_CQ(cq);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  /* Need an extra ref for cq here because:
 | 
	
		
			
				|  |  | -   * We call cq_finish_shutdown_callback() below, which calls pollset shutdown.
 | 
	
		
			
				|  |  | -   * Pollset shutdown decrements the cq ref count which can potentially destroy
 | 
	
		
			
				|  |  | -   * the cq (if that happens to be the last ref).
 | 
	
		
			
				|  |  | -   * Creating an extra ref here prevents the cq from getting destroyed while
 | 
	
		
			
				|  |  | -   * this function is still active */
 | 
	
		
			
				|  |  | -  GRPC_CQ_INTERNAL_REF(cq, "shutting_down (callback cq)");
 | 
	
		
			
				|  |  | -  gpr_mu_lock(cq->mu);
 | 
	
		
			
				|  |  | -  if (cqd->shutdown_called) {
 | 
	
		
			
				|  |  | -    gpr_mu_unlock(cq->mu);
 | 
	
		
			
				|  |  | -    GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
 | 
	
		
			
				|  |  | -    return;
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  cqd->shutdown_called = true;
 | 
	
		
			
				|  |  | -  if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
 | 
	
		
			
				|  |  | -    gpr_mu_unlock(cq->mu);
 | 
	
		
			
				|  |  | -    cq_finish_shutdown_callback_alternative(cq);
 | 
	
		
			
				|  |  | -  } else {
 | 
	
		
			
				|  |  | -    gpr_mu_unlock(cq->mu);
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |  /* Shutdown simply drops a ref that we reserved at creation time; if we drop
 | 
	
		
			
				|  |  |     to zero here, then enter shutdown mode and wake up any waiters */
 | 
	
		
			
				|  |  |  void grpc_completion_queue_shutdown(grpc_completion_queue* cq) {
 |