Selaa lähdekoodia

Whitelist internal code path to use ApplicationExecCtx

Karthik Ravi Shankar 6 vuotta sitten
vanhempi
commit
172bb1b30f

+ 17 - 10
src/core/lib/surface/completion_queue.cc

@@ -201,7 +201,7 @@ struct cq_vtable {
   bool (*begin_op)(grpc_completion_queue* cq, void* tag);
   void (*end_op)(grpc_completion_queue* cq, void* tag, grpc_error* error,
                  void (*done)(void* done_arg, grpc_cq_completion* storage),
-                 void* done_arg, grpc_cq_completion* storage);
+                 void* done_arg, grpc_cq_completion* storage, bool internal);
   grpc_event (*next)(grpc_completion_queue* cq, gpr_timespec deadline,
                      void* reserved);
   grpc_event (*pluck)(grpc_completion_queue* cq, void* tag,
@@ -359,19 +359,19 @@ static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag,
                                grpc_error* error,
                                void (*done)(void* done_arg,
                                             grpc_cq_completion* storage),
-                               void* done_arg, grpc_cq_completion* storage);
+                               void* done_arg, grpc_cq_completion* storage, bool internal = false);
 
 static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag,
                                 grpc_error* error,
                                 void (*done)(void* done_arg,
                                              grpc_cq_completion* storage),
-                                void* done_arg, grpc_cq_completion* storage);
+                                void* done_arg, grpc_cq_completion* storage, bool internal = false);
 
 static void cq_end_op_for_callback(grpc_completion_queue* cq, void* tag,
                                    grpc_error* error,
                                    void (*done)(void* done_arg,
                                                 grpc_cq_completion* storage),
-                                   void* done_arg, grpc_cq_completion* storage);
+                                   void* done_arg, grpc_cq_completion* storage, bool internal = false);
 
 static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
                           void* reserved);
@@ -679,7 +679,8 @@ static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag,
                                grpc_error* error,
                                void (*done)(void* done_arg,
                                             grpc_cq_completion* storage),
-                               void* done_arg, grpc_cq_completion* storage) {
+                               void* done_arg, grpc_cq_completion* storage,
+                               bool internal) {
   GPR_TIMER_SCOPE("cq_end_op_for_next", 0);
 
   if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) ||
@@ -759,7 +760,8 @@ static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag,
                                 grpc_error* error,
                                 void (*done)(void* done_arg,
                                              grpc_cq_completion* storage),
-                                void* done_arg, grpc_cq_completion* storage) {
+                                void* done_arg, grpc_cq_completion* storage,
+                                bool internal) {
   GPR_TIMER_SCOPE("cq_end_op_for_pluck", 0);
 
   cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
@@ -831,7 +833,8 @@ static void functor_callback(void* arg, grpc_error* error) {
 static void cq_end_op_for_callback(
     grpc_completion_queue* cq, void* tag, grpc_error* error,
     void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
-    grpc_cq_completion* storage) {
+    grpc_cq_completion* storage,
+    bool internal) {
   GPR_TIMER_SCOPE("cq_end_op_for_callback", 0);
 
   cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
@@ -862,19 +865,23 @@ static void cq_end_op_for_callback(
   }
 
   auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(tag);
-  GRPC_CLOSURE_SCHED(
+  if (internal) {
+    grpc_core::ApplicationCallbackExecCtx::Enqueue(functor, (error == GRPC_ERROR_NONE));
+  } else {
+    GRPC_CLOSURE_SCHED(
       GRPC_CLOSURE_CREATE(
           functor_callback, functor,
           grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)),
       GRPC_ERROR_REF(error));
+  }
 
   GRPC_ERROR_UNREF(error);
 }
 
 void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error,
                     void (*done)(void* done_arg, grpc_cq_completion* storage),
-                    void* done_arg, grpc_cq_completion* storage) {
-  cq->vtable->end_op(cq, tag, error, done, done_arg, storage);
+                    void* done_arg, grpc_cq_completion* storage, bool internal) {
+  cq->vtable->end_op(cq, tag, error, done, done_arg, storage, internal);
 }
 
 typedef struct {

+ 1 - 1
src/core/lib/surface/completion_queue.h

@@ -77,7 +77,7 @@ bool grpc_cq_begin_op(grpc_completion_queue* cc, void* tag);
    grpc_cq_begin_op */
 void grpc_cq_end_op(grpc_completion_queue* cc, void* tag, grpc_error* error,
                     void (*done)(void* done_arg, grpc_cq_completion* storage),
-                    void* done_arg, grpc_cq_completion* storage);
+                    void* done_arg, grpc_cq_completion* storage, bool internal);
 
 grpc_pollset* grpc_cq_pollset(grpc_completion_queue* cc);
 

+ 1 - 1
src/core/lib/surface/server.cc

@@ -513,7 +513,7 @@ static void publish_call(grpc_server* server, call_data* calld, size_t cq_idx,
   }
 
   grpc_cq_end_op(calld->cq_new, rc->tag, GRPC_ERROR_NONE, done_request_event,
-                 rc, &rc->completion);
+                 rc, &rc->completion, true);
 }
 
 static void publish_new_rpc(void* arg, grpc_error* error) {