Преглед на файлове

Add combiner_run to run closures immediately if we already have the combiner lock

Yash Tibrewal преди 7 години
родител
ревизия
eaddd597d7

+ 1 - 1
src/core/ext/filters/client_channel/subchannel.cc

@@ -408,7 +408,7 @@ static void on_external_state_watcher_done(void* arg, grpc_error* error) {
   gpr_mu_unlock(&w->subchannel->mu);
   GRPC_SUBCHANNEL_WEAK_UNREF(w->subchannel, "external_state_watcher");
   gpr_free(w);
-  GRPC_CLOSURE_RUN(follow_up, GRPC_ERROR_REF(error));
+  GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error));
 }
 
 static void on_alarm(void* arg, grpc_error* error) {

+ 7 - 7
src/core/ext/transport/chttp2/transport/chttp2_transport.cc

@@ -1665,8 +1665,8 @@ static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error) {
 static void send_ping_locked(grpc_chttp2_transport* t,
                              grpc_closure* on_initiate, grpc_closure* on_ack) {
   if (t->closed_with_error != GRPC_ERROR_NONE) {
-    GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_REF(t->closed_with_error));
-    GRPC_CLOSURE_SCHED(on_ack, GRPC_ERROR_REF(t->closed_with_error));
+    GRPC_CLOSURE_RUN(on_initiate, GRPC_ERROR_REF(t->closed_with_error));
+    GRPC_CLOSURE_RUN(on_ack, GRPC_ERROR_REF(t->closed_with_error));
     return;
   }
   grpc_chttp2_ping_queue* pq = &t->ping_queue;
@@ -1683,16 +1683,16 @@ static void send_ping_locked(grpc_chttp2_transport* t,
  */
 static void send_keepalive_ping_locked(grpc_chttp2_transport* t) {
   if (t->closed_with_error != GRPC_ERROR_NONE) {
-    GRPC_CLOSURE_SCHED(&t->start_keepalive_ping_locked,
-                       GRPC_ERROR_REF(t->closed_with_error));
-    GRPC_CLOSURE_SCHED(&t->finish_keepalive_ping_locked,
-                       GRPC_ERROR_REF(t->closed_with_error));
+    GRPC_CLOSURE_RUN(&t->start_keepalive_ping_locked,
+                     GRPC_ERROR_REF(t->closed_with_error));
+    GRPC_CLOSURE_RUN(&t->finish_keepalive_ping_locked,
+                     GRPC_ERROR_REF(t->closed_with_error));
     return;
   }
   grpc_chttp2_ping_queue* pq = &t->ping_queue;
   if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) {
     /* There is a ping in flight. Add yourself to the inflight closure list. */
-    GRPC_CLOSURE_SCHED(&t->start_keepalive_ping_locked, GRPC_ERROR_NONE);
+    GRPC_CLOSURE_RUN(&t->start_keepalive_ping_locked, GRPC_ERROR_NONE);
     grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT],
                              &t->finish_keepalive_ping_locked, GRPC_ERROR_NONE);
     return;

+ 37 - 2
src/core/lib/iomgr/combiner.cc

@@ -63,13 +63,15 @@ struct grpc_combiner {
   gpr_refcount refs;
 };
 
+static void combiner_run(grpc_closure* closure, grpc_error* error);
 static void combiner_exec(grpc_closure* closure, grpc_error* error);
+static void combiner_finally_run(grpc_closure* closure, grpc_error* error);
 static void combiner_finally_exec(grpc_closure* closure, grpc_error* error);
 
 static const grpc_closure_scheduler_vtable scheduler = {
-    combiner_exec, combiner_exec, "combiner:immediately"};
+    combiner_run, combiner_exec, "combiner:immediately"};
 static const grpc_closure_scheduler_vtable finally_scheduler = {
-    combiner_finally_exec, combiner_finally_exec, "combiner:finally"};
+    combiner_finally_run, combiner_finally_exec, "combiner:finally"};
 
 static void offload(void* arg, grpc_error* error);
 
@@ -343,6 +345,39 @@ static void combiner_finally_exec(grpc_closure* closure, grpc_error* error) {
   grpc_closure_list_append(&lock->final_list, closure, error);
 }
 
+static void combiner_run(grpc_closure* closure, grpc_error* error) {
+#ifndef NDEBUG
+  closure->scheduled = false;
+  grpc_combiner* lock = COMBINER_FROM_CLOSURE_SCHEDULER(closure, scheduler);
+  GRPC_COMBINER_TRACE(gpr_log(
+      GPR_DEBUG,
+      "Combiner:%p grpc_combiner_run closure:%p created [%s:%d] run [%s:%d]",
+      lock, closure, closure->file_created, closure->line_created,
+      closure->file_initiated, closure->line_initiated));
+  GPR_ASSERT(grpc_core::ExecCtx::Get()->combiner_data()->active_combiner ==
+             lock);
+#endif
+  closure->cb(closure->cb_arg, error);
+  GRPC_ERROR_UNREF(error);
+}
+
+static void combiner_finally_run(grpc_closure* closure, grpc_error* error) {
+#ifndef NDEBUG
+  closure->scheduled = false;
+  grpc_combiner* lock =
+      COMBINER_FROM_CLOSURE_SCHEDULER(closure, finally_scheduler);
+  GRPC_COMBINER_TRACE(gpr_log(
+      GPR_DEBUG,
+      "Combiner:%p grpc_combiner_run closure:%p created [%s:%d] run [%s:%d]",
+      lock, closure, closure->file_created, closure->line_created,
+      closure->file_initiated, closure->line_initiated));
+  GPR_ASSERT(grpc_core::ExecCtx::Get()->combiner_data()->active_combiner ==
+             lock);
+#endif
+  closure->cb(closure->cb_arg, error);
+  GRPC_ERROR_UNREF(error);
+}
+
 static void enqueue_finally(void* closure, grpc_error* error) {
   combiner_finally_exec(static_cast<grpc_closure*>(closure),
                         GRPC_ERROR_REF(error));

+ 2 - 2
src/core/lib/iomgr/tcp_posix.cc

@@ -366,7 +366,7 @@ static void call_read_cb(grpc_tcp* tcp, grpc_error* error) {
 
   tcp->read_cb = nullptr;
   tcp->incoming_buffer = nullptr;
-  GRPC_CLOSURE_RUN(cb, error);
+  GRPC_CLOSURE_SCHED(cb, error);
 }
 
 #define MAX_READ_IOVEC 4
@@ -629,7 +629,7 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) {
       gpr_log(GPR_INFO, "write: %s", str);
     }
 
-    GRPC_CLOSURE_RUN(cb, error);
+    GRPC_CLOSURE_SCHED(cb, error);
     TCP_UNREF(tcp, "write");
   }
 }