|  | @@ -34,6 +34,7 @@
 | 
	
		
			
				|  |  |  #include "src/core/lib/gpr/string.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/gpr/tls.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/gprpp/atomic.h"
 | 
	
		
			
				|  |  | +#include "src/core/lib/iomgr/executor.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/iomgr/pollset.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/iomgr/timer.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/profiling/timers.h"
 | 
	
	
		
			
				|  | @@ -200,7 +201,7 @@ struct cq_vtable {
 | 
	
		
			
				|  |  |    bool (*begin_op)(grpc_completion_queue* cq, void* tag);
 | 
	
		
			
				|  |  |    void (*end_op)(grpc_completion_queue* cq, void* tag, grpc_error* error,
 | 
	
		
			
				|  |  |                   void (*done)(void* done_arg, grpc_cq_completion* storage),
 | 
	
		
			
				|  |  | -                 void* done_arg, grpc_cq_completion* storage);
 | 
	
		
			
				|  |  | +                 void* done_arg, grpc_cq_completion* storage, bool internal);
 | 
	
		
			
				|  |  |    grpc_event (*next)(grpc_completion_queue* cq, gpr_timespec deadline,
 | 
	
		
			
				|  |  |                       void* reserved);
 | 
	
		
			
				|  |  |    grpc_event (*pluck)(grpc_completion_queue* cq, void* tag,
 | 
	
	
		
			
				|  | @@ -354,23 +355,20 @@ static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag);
 | 
	
		
			
				|  |  |  // queue. The done argument is a callback that will be invoked when it is
 | 
	
		
			
				|  |  |  // safe to free up that storage. The storage MUST NOT be freed until the
 | 
	
		
			
				|  |  |  // done callback is invoked.
 | 
	
		
			
				|  |  | -static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag,
 | 
	
		
			
				|  |  | -                               grpc_error* error,
 | 
	
		
			
				|  |  | -                               void (*done)(void* done_arg,
 | 
	
		
			
				|  |  | -                                            grpc_cq_completion* storage),
 | 
	
		
			
				|  |  | -                               void* done_arg, grpc_cq_completion* storage);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag,
 | 
	
		
			
				|  |  | -                                grpc_error* error,
 | 
	
		
			
				|  |  | -                                void (*done)(void* done_arg,
 | 
	
		
			
				|  |  | -                                             grpc_cq_completion* storage),
 | 
	
		
			
				|  |  | -                                void* done_arg, grpc_cq_completion* storage);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -static void cq_end_op_for_callback(grpc_completion_queue* cq, void* tag,
 | 
	
		
			
				|  |  | -                                   grpc_error* error,
 | 
	
		
			
				|  |  | -                                   void (*done)(void* done_arg,
 | 
	
		
			
				|  |  | -                                                grpc_cq_completion* storage),
 | 
	
		
			
				|  |  | -                                   void* done_arg, grpc_cq_completion* storage);
 | 
	
		
			
				|  |  | +static void cq_end_op_for_next(
 | 
	
		
			
				|  |  | +    grpc_completion_queue* cq, void* tag, grpc_error* error,
 | 
	
		
			
				|  |  | +    void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
 | 
	
		
			
				|  |  | +    grpc_cq_completion* storage, bool internal);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +static void cq_end_op_for_pluck(
 | 
	
		
			
				|  |  | +    grpc_completion_queue* cq, void* tag, grpc_error* error,
 | 
	
		
			
				|  |  | +    void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
 | 
	
		
			
				|  |  | +    grpc_cq_completion* storage, bool internal);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +static void cq_end_op_for_callback(
 | 
	
		
			
				|  |  | +    grpc_completion_queue* cq, void* tag, grpc_error* error,
 | 
	
		
			
				|  |  | +    void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
 | 
	
		
			
				|  |  | +    grpc_cq_completion* storage, bool internal);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
 | 
	
		
			
				|  |  |                            void* reserved);
 | 
	
	
		
			
				|  | @@ -674,11 +672,10 @@ bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) {
 | 
	
		
			
				|  |  |  /* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
 | 
	
		
			
				|  |  |   * completion
 | 
	
		
			
				|  |  |   * type of GRPC_CQ_NEXT) */
 | 
	
		
			
				|  |  | -static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag,
 | 
	
		
			
				|  |  | -                               grpc_error* error,
 | 
	
		
			
				|  |  | -                               void (*done)(void* done_arg,
 | 
	
		
			
				|  |  | -                                            grpc_cq_completion* storage),
 | 
	
		
			
				|  |  | -                               void* done_arg, grpc_cq_completion* storage) {
 | 
	
		
			
				|  |  | +static void cq_end_op_for_next(
 | 
	
		
			
				|  |  | +    grpc_completion_queue* cq, void* tag, grpc_error* error,
 | 
	
		
			
				|  |  | +    void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
 | 
	
		
			
				|  |  | +    grpc_cq_completion* storage, bool internal) {
 | 
	
		
			
				|  |  |    GPR_TIMER_SCOPE("cq_end_op_for_next", 0);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) ||
 | 
	
	
		
			
				|  | @@ -754,11 +751,10 @@ static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag,
 | 
	
		
			
				|  |  |  /* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
 | 
	
		
			
				|  |  |   * completion
 | 
	
		
			
				|  |  |   * type of GRPC_CQ_PLUCK) */
 | 
	
		
			
				|  |  | -static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag,
 | 
	
		
			
				|  |  | -                                grpc_error* error,
 | 
	
		
			
				|  |  | -                                void (*done)(void* done_arg,
 | 
	
		
			
				|  |  | -                                             grpc_cq_completion* storage),
 | 
	
		
			
				|  |  | -                                void* done_arg, grpc_cq_completion* storage) {
 | 
	
		
			
				|  |  | +static void cq_end_op_for_pluck(
 | 
	
		
			
				|  |  | +    grpc_completion_queue* cq, void* tag, grpc_error* error,
 | 
	
		
			
				|  |  | +    void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
 | 
	
		
			
				|  |  | +    grpc_cq_completion* storage, bool internal) {
 | 
	
		
			
				|  |  |    GPR_TIMER_SCOPE("cq_end_op_for_pluck", 0);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
 | 
	
	
		
			
				|  | @@ -821,15 +817,19 @@ static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag,
 | 
	
		
			
				|  |  |    GRPC_ERROR_UNREF(error);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +static void functor_callback(void* arg, grpc_error* error) {
 | 
	
		
			
				|  |  | +  auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(arg);
 | 
	
		
			
				|  |  | +  functor->functor_run(functor, error == GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  /* Complete an event on a completion queue of type GRPC_CQ_CALLBACK */
 | 
	
		
			
				|  |  |  static void cq_end_op_for_callback(
 | 
	
		
			
				|  |  |      grpc_completion_queue* cq, void* tag, grpc_error* error,
 | 
	
		
			
				|  |  |      void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
 | 
	
		
			
				|  |  | -    grpc_cq_completion* storage) {
 | 
	
		
			
				|  |  | +    grpc_cq_completion* storage, bool internal) {
 | 
	
		
			
				|  |  |    GPR_TIMER_SCOPE("cq_end_op_for_callback", 0);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
 | 
	
		
			
				|  |  | -  bool is_success = (error == GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) ||
 | 
	
		
			
				|  |  |        (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
 | 
	
	
		
			
				|  | @@ -856,16 +856,25 @@ static void cq_end_op_for_callback(
 | 
	
		
			
				|  |  |      cq_finish_shutdown_callback(cq);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  GRPC_ERROR_UNREF(error);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |    auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(tag);
 | 
	
		
			
				|  |  | -  grpc_core::ApplicationCallbackExecCtx::Enqueue(functor, is_success);
 | 
	
		
			
				|  |  | +  if (internal) {
 | 
	
		
			
				|  |  | +    grpc_core::ApplicationCallbackExecCtx::Enqueue(functor,
 | 
	
		
			
				|  |  | +                                                   (error == GRPC_ERROR_NONE));
 | 
	
		
			
				|  |  | +  } else {
 | 
	
		
			
				|  |  | +    GRPC_CLOSURE_SCHED(
 | 
	
		
			
				|  |  | +        GRPC_CLOSURE_CREATE(
 | 
	
		
			
				|  |  | +            functor_callback, functor,
 | 
	
		
			
				|  |  | +            grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)),
 | 
	
		
			
				|  |  | +        GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  GRPC_ERROR_UNREF(error);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error,
 | 
	
		
			
				|  |  |                      void (*done)(void* done_arg, grpc_cq_completion* storage),
 | 
	
		
			
				|  |  | -                    void* done_arg, grpc_cq_completion* storage) {
 | 
	
		
			
				|  |  | -  cq->vtable->end_op(cq, tag, error, done, done_arg, storage);
 | 
	
		
			
				|  |  | +                    void* done_arg, grpc_cq_completion* storage,
 | 
	
		
			
				|  |  | +                    bool internal) {
 | 
	
		
			
				|  |  | +  cq->vtable->end_op(cq, tag, error, done, done_arg, storage, internal);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  typedef struct {
 | 
	
	
		
			
				|  | @@ -1343,7 +1352,11 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) {
 | 
	
		
			
				|  |  |    GPR_ASSERT(cqd->shutdown_called);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
 | 
	
		
			
				|  |  | -  grpc_core::ApplicationCallbackExecCtx::Enqueue(callback, true);
 | 
	
		
			
				|  |  | +  GRPC_CLOSURE_SCHED(
 | 
	
		
			
				|  |  | +      GRPC_CLOSURE_CREATE(
 | 
	
		
			
				|  |  | +          functor_callback, callback,
 | 
	
		
			
				|  |  | +          grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)),
 | 
	
		
			
				|  |  | +      GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static void cq_shutdown_callback(grpc_completion_queue* cq) {
 |