Jelajahi Sumber

Executor sketch

Craig Tiller 10 tahun lalu
induk
melakukan
1766a917e8
2 mengubah file dengan 193 tambahan dan 94 penghapusan
  1. 27 9
      src/core/transport/chttp2/internal.h
  2. 166 85
      src/core/transport/chttp2_transport.c

+ 27 - 9
src/core/transport/chttp2/internal.h

@@ -290,24 +290,39 @@ struct grpc_chttp2_transport_parsing {
   grpc_chttp2_outstanding_ping pings;
 };
 
+typedef struct grpc_chttp2_executor_action_header {
+  grpc_chttp2_stream *stream;
+  void (*action)(grpc_chttp2_transport *t, grpc_chttp2_stream *s, void *arg);
+  struct grpc_chttp2_executor_action_header *next;
+  void *arg;
+} grpc_chttp2_executor_action_header;
+
 struct grpc_chttp2_transport {
   grpc_transport base; /* must be first */
+  gpr_refcount refs;
   grpc_endpoint *ep;
   grpc_mdctx *metadata_context;
-  gpr_refcount refs;
 
-  gpr_mu mu;
+  struct {
+    gpr_mu mu;
+
+    /** is a thread currently in the global lock */
+    gpr_uint8 global_active;
+    /** is a thread currently writing */
+    gpr_uint8 writing_active;
+    /** is a thread currently parsing */
+    gpr_uint8 parsing_active;
+    /** is a thread currently executing channel callbacks */
+    gpr_uint8 channel_callback_active;
+
+    grpc_chttp2_executor_action_header *pending_actions;
+  } executor;
 
   /** is the transport destroying itself? */
   gpr_uint8 destroying;
   /** has the upper layer closed the transport? */
   gpr_uint8 closed;
 
-  /** is a thread currently writing */
-  gpr_uint8 writing_active;
-  /** is a thread currently parsing */
-  gpr_uint8 parsing_active;
-
   /** is there a read request to the endpoint outstanding? */
   gpr_uint8 endpoint_reading;
 
@@ -343,8 +358,6 @@ struct grpc_chttp2_transport {
   grpc_chttp2_stream **accepting_stream;
 
   struct {
-    /** is a thread currently performing channel callbacks */
-    gpr_uint8 executing;
     /** transport channel-level callback */
     const grpc_transport_callbacks *cb;
     /** user data for cb calls */
@@ -615,6 +628,11 @@ void grpc_chttp2_for_all_streams(
 void grpc_chttp2_parsing_become_skip_parser(
     grpc_chttp2_transport_parsing *transport_parsing);
 
+void grpc_chttp2_run_with_global_lock(
+  grpc_chttp2_transport *transport, grpc_chttp2_stream *optional_stream,
+  void (*action)(grpc_chttp2_transport *t, grpc_chttp2_stream *s, void *arg),
+  void *arg, size_t sizeof_arg);
+
 #define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
 #define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \
   (sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING) - 1)

+ 166 - 85
src/core/transport/chttp2_transport.c

@@ -78,8 +78,10 @@ int grpc_flowctl_trace = 0;
 
 static const grpc_transport_vtable vtable;
 
+#if 0
 static void lock(grpc_chttp2_transport *t);
 static void unlock(grpc_chttp2_transport *t);
+#endif
 
 static void unlock_check_channel_callbacks(grpc_chttp2_transport *t);
 static void unlock_check_read_write_state(grpc_chttp2_transport *t);
@@ -101,9 +103,9 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
 static void drop_connection(grpc_chttp2_transport *t);
 
 /** Perform a transport_op */
-static void perform_op_locked(grpc_chttp2_transport_global *transport_global,
-                              grpc_chttp2_stream_global *stream_global,
-                              grpc_transport_op *op);
+static void perform_op_locked(grpc_chttp2_transport *t,
+                              grpc_chttp2_stream *s,
+                              void *transport_op);
 
 /** Cancel a stream: coming from the transport API */
 static void cancel_from_api(grpc_chttp2_transport_global *transport_global,
@@ -112,12 +114,15 @@ static void cancel_from_api(grpc_chttp2_transport_global *transport_global,
 
 /** Add endpoint from this transport to pollset */
 static void add_to_pollset_locked(grpc_chttp2_transport *t,
-                                  grpc_pollset *pollset);
+  grpc_chttp2_stream *s_ignored,
+                                  void *pollset);
 
 /** Start new streams that have been created if we can */
 static void maybe_start_some_streams(
     grpc_chttp2_transport_global *transport_global);
 
+static void finish_global_actions(grpc_chttp2_transport *t);
+
 /*
  * CONSTRUCTION/DESTRUCTION/REFCOUNTING
  */
@@ -125,7 +130,7 @@ static void maybe_start_some_streams(
 static void destruct_transport(grpc_chttp2_transport *t) {
   size_t i;
 
-  gpr_mu_lock(&t->mu);
+  gpr_mu_lock(&t->executor.mu);
 
   GPR_ASSERT(t->ep == NULL);
 
@@ -151,8 +156,8 @@ static void destruct_transport(grpc_chttp2_transport *t) {
   grpc_chttp2_stream_map_destroy(&t->parsing_stream_map);
   grpc_chttp2_stream_map_destroy(&t->new_stream_map);
 
-  gpr_mu_unlock(&t->mu);
-  gpr_mu_destroy(&t->mu);
+  gpr_mu_unlock(&t->executor.mu);
+  gpr_mu_destroy(&t->executor.mu);
 
   /* callback remaining pings: they're not allowed to call into the transpot,
      and maybe they hold resources that need to be freed */
@@ -215,7 +220,7 @@ static void init_transport(grpc_chttp2_transport *t,
   t->ep = ep;
   /* one ref is for destroy, the other for when ep becomes NULL */
   gpr_ref_init(&t->refs, 2);
-  gpr_mu_init(&t->mu);
+  gpr_mu_init(&t->executor.mu);
   grpc_mdctx_ref(mdctx);
   t->metadata_context = mdctx;
   t->endpoint_reading = 1;
@@ -311,18 +316,19 @@ static void init_transport(grpc_chttp2_transport *t,
     }
   }
 
-  gpr_mu_lock(&t->mu);
-  t->channel_callback.executing = 1;
+  gpr_mu_lock(&t->executor.mu);
+  t->executor.channel_callback_active = 1;
+  t->executor.global_active = 1;
   REF_TRANSPORT(t, "init"); /* matches unref at end of this function */
-  gpr_mu_unlock(&t->mu);
+  gpr_mu_unlock(&t->executor.mu);
 
   sr = setup(arg, &t->base, t->metadata_context);
 
-  lock(t);
   t->channel_callback.cb = sr.callbacks;
   t->channel_callback.cb_user_data = sr.user_data;
-  t->channel_callback.executing = 0;
-  unlock(t);
+  t->executor.channel_callback_active = 0;
+
+  finish_global_actions(t);
 
   REF_TRANSPORT(t, "recv_data"); /* matches unref inside recv_data */
   recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK);
@@ -330,18 +336,18 @@ static void init_transport(grpc_chttp2_transport *t,
   UNREF_TRANSPORT(t, "init");
 }
 
-static void destroy_transport(grpc_transport *gt) {
-  grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
-
-  lock(t);
+static void destroy_transport_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s_ignored, void *arg_ignored) {
   t->destroying = 1;
   drop_connection(t);
-  unlock(t);
+}
 
+static void destroy_transport(grpc_transport *gt) {
+  grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
+  grpc_chttp2_run_with_global_lock(t, NULL, destroy_transport_locked, NULL, 0);
   UNREF_TRANSPORT(t, "destroy");
 }
 
-static void close_transport_locked(grpc_chttp2_transport *t) {
+static void close_transport_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s_ignored, void *arg_ignored) {
   if (!t->closed) {
     t->closed = 1;
     if (t->ep) {
@@ -352,19 +358,32 @@ static void close_transport_locked(grpc_chttp2_transport *t) {
 
 static void close_transport(grpc_transport *gt) {
   grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
-  gpr_mu_lock(&t->mu);
-  close_transport_locked(t);
-  gpr_mu_unlock(&t->mu);
+  grpc_chttp2_run_with_global_lock(t, NULL, close_transport_locked, NULL, 0);
+}
+
+typedef struct {
+  grpc_status_code status;
+  gpr_slice debug_data;
+} goaway_arg;
+
+static void goaway_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s_ignored, void *a) {
+  goaway_arg *arg = a;
+  grpc_chttp2_goaway_append(t->global.last_incoming_stream_id,
+                            grpc_chttp2_grpc_status_to_http2_error(arg->status),
+                            arg->debug_data, &t->global.qbuf);
 }
 
 static void goaway(grpc_transport *gt, grpc_status_code status,
                    gpr_slice debug_data) {
   grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
-  lock(t);
-  grpc_chttp2_goaway_append(t->global.last_incoming_stream_id,
-                            grpc_chttp2_grpc_status_to_http2_error(status),
-                            debug_data, &t->global.qbuf);
-  unlock(t);
+  goaway_arg arg;
+  arg.status = status;
+  arg.debug_data = debug_data;
+  grpc_chttp2_run_with_global_lock(t, NULL, goaway_locked, &arg, sizeof(arg));
+}
+
+static void finish_init_stream_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s, void *arg_ignored) {
+  grpc_chttp2_register_stream(t, s);
 }
 
 static int init_stream(grpc_transport *gt, grpc_stream *gs,
@@ -382,10 +401,8 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
 
   REF_TRANSPORT(t, "stream");
 
-  lock(t);
-  grpc_chttp2_register_stream(t, s);
   if (server_data) {
-    GPR_ASSERT(t->parsing_active);
+    GPR_ASSERT(t->executor.parsing_active);
     s->global.id = (gpr_uint32)(gpr_uintptr)server_data;
     s->global.outgoing_window =
         t->global
@@ -398,8 +415,8 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
     s->global.in_stream_map = 1;
   }
 
-  if (initial_op) perform_op_locked(&t->global, &s->global, initial_op);
-  unlock(t);
+  grpc_chttp2_run_with_global_lock(t, s, finish_init_stream_locked, NULL, 0);
+  if (initial_op) grpc_chttp2_run_with_global_lock(t, s, perform_op_locked, initial_op, sizeof(*initial_op));
 
   return 0;
 }
@@ -407,8 +424,10 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
 static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
   grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
   grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
+
   int i;
 
+#if 0
   gpr_mu_lock(&t->mu);
 
   GPR_ASSERT(s->global.published_state == GRPC_STREAM_CLOSED ||
@@ -424,6 +443,7 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
   grpc_chttp2_list_remove_writable_window_update_stream(&t->global, &s->global);
 
   gpr_mu_unlock(&t->mu);
+#endif
 
   for (i = 0; i < STREAM_LIST_COUNT; i++) {
     GPR_ASSERT(!s->included[i]);
@@ -474,36 +494,92 @@ static void remove_from_stream_map(grpc_chttp2_transport *t, grpc_chttp2_stream
  * LOCK MANAGEMENT
  */
 
-/* We take a grpc_chttp2_transport-global lock in response to calls coming in
-   from above,
-   and in response to data being received from below. New data to be written
-   is always queued, as are callbacks to process data. During unlock() we
-   check our todo lists and initiate callbacks and flush writes. */
+static void finish_global_actions(grpc_chttp2_transport *t) {
+  grpc_chttp2_executor_action_header *hdr;
+  grpc_chttp2_executor_action_header *next;
+  grpc_iomgr_closure *run_closures;
 
-static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); }
+  for (;;) {
+    unlock_check_read_write_state(t);
+    if (!t->executor.writing_active && t->global.error_state == GRPC_CHTTP2_ERROR_STATE_NONE &&
+        grpc_chttp2_unlocking_check_writes(&t->global, &t->writing)) {
+      t->executor.writing_active = 1;
+      REF_TRANSPORT(t, "writing");
+      grpc_chttp2_schedule_closure(&t->global, &t->writing_action, 1);
+    }
+    unlock_check_channel_callbacks(t);
 
-static void unlock(grpc_chttp2_transport *t) {
-  grpc_iomgr_closure *run_closures;
+    run_closures = t->global.pending_closures;
+    t->global.pending_closures = NULL;
+
+    gpr_mu_lock(&t->executor.mu);
+    t->executor.global_active = 0;
+    gpr_mu_unlock(&t->executor.mu);
 
-  unlock_check_read_write_state(t);
-  if (!t->writing_active && t->global.error_state == GRPC_CHTTP2_ERROR_STATE_NONE &&
-      grpc_chttp2_unlocking_check_writes(&t->global, &t->writing)) {
-    t->writing_active = 1;
-    REF_TRANSPORT(t, "writing");
-    grpc_chttp2_schedule_closure(&t->global, &t->writing_action, 1);
+    while (run_closures) {
+      grpc_iomgr_closure *next = run_closures->next;
+      run_closures->cb(run_closures->cb_arg, run_closures->success);
+      run_closures = next;
+    }
+
+    gpr_mu_lock(&t->executor.mu);
+    if (!t->executor.global_active && t->executor.pending_actions) {
+      t->executor.global_active = 1;
+      hdr = t->executor.pending_actions;
+      t->executor.pending_actions = NULL;
+      gpr_mu_unlock(&t->executor.mu);
+      while (hdr != NULL) {
+        hdr->action(t, hdr->stream, hdr->arg);
+        next = hdr->next;
+        gpr_free(hdr);
+        hdr = next;
+      }
+      continue;
+    }
+    gpr_mu_unlock(&t->executor.mu);
+    break;
   }
-  /* unlock_check_parser(t); */
-  unlock_check_channel_callbacks(t);
+}
 
-  run_closures = t->global.pending_closures;
-  t->global.pending_closures = NULL;
+void grpc_chttp2_run_with_global_lock(grpc_chttp2_transport *t, grpc_chttp2_stream *optional_stream,
+  void (*action)(grpc_chttp2_transport *t, grpc_chttp2_stream *s, void *arg),
+  void *arg, size_t sizeof_arg) {
+  grpc_chttp2_executor_action_header *hdr;
 
-  gpr_mu_unlock(&t->mu);
+  gpr_mu_lock(&t->executor.mu);
+
+  for (;;) {
+    if (!t->executor.global_active) {
+      t->executor.global_active = 1;
+      GPR_ASSERT(t->executor.pending_actions == NULL);
+      gpr_mu_unlock(&t->executor.mu);
+
+      action(t, optional_stream, arg);
+
+      finish_global_actions(t);
+    } else {
+      gpr_mu_unlock(&t->executor.mu);
+
+      hdr = gpr_malloc(sizeof(*hdr) + sizeof_arg);
+      hdr->stream = optional_stream;
+      hdr->action = action;
+      if (sizeof_arg == 0) {
+        hdr->arg = arg;
+      } else {
+        hdr->arg = hdr + 1;
+        memcpy(hdr->arg, arg, sizeof_arg);
+      }
 
-  while (run_closures) {
-    grpc_iomgr_closure *next = run_closures->next;
-    run_closures->cb(run_closures->cb_arg, run_closures->success);
-    run_closures = next;
+      gpr_mu_lock(&t->executor.mu);
+      if (!t->executor.global_active) {
+        gpr_free(hdr);
+        continue;
+      }
+      hdr->next = t->executor.pending_actions;
+      t->executor.pending_actions = hdr;
+      gpr_mu_unlock(&t->executor.mu);
+    }
+    break;
   }
 }
 
@@ -526,6 +602,7 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
   }
 }
 
+#if 0
 void grpc_chttp2_terminate_writing(
     grpc_chttp2_transport_writing *transport_writing, int success) {
   grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing);
@@ -553,6 +630,7 @@ void grpc_chttp2_terminate_writing(
 
   UNREF_TRANSPORT(t, "writing");
 }
+#endif
 
 static void writing_action(void *gt, int iomgr_success_ignored) {
   grpc_chttp2_transport *t = gt;
@@ -622,9 +700,13 @@ static void maybe_start_some_streams(
   }
 }
 
-static void perform_op_locked(grpc_chttp2_transport_global *transport_global,
-                              grpc_chttp2_stream_global *stream_global,
-                              grpc_transport_op *op) {
+static void perform_op_locked(grpc_chttp2_transport *t,
+                              grpc_chttp2_stream *s,
+                              void *transport_op) {
+  grpc_chttp2_transport_global *transport_global = &t->global;
+  grpc_chttp2_stream_global *stream_global = &s->global;
+  grpc_transport_op *op = transport_op;
+
   if (op->cancel_with_status != GRPC_STATUS_OK) {
     cancel_from_api(transport_global, stream_global, op->cancel_with_status);
   }
@@ -671,7 +753,7 @@ static void perform_op_locked(grpc_chttp2_transport_global *transport_global,
   }
 
   if (op->bind_pollset) {
-    add_to_pollset_locked(TRANSPORT_FROM_GLOBAL(transport_global),
+    add_to_pollset_locked(TRANSPORT_FROM_GLOBAL(transport_global), NULL,
                           op->bind_pollset);
   }
 
@@ -684,17 +766,11 @@ static void perform_op(grpc_transport *gt, grpc_stream *gs,
                        grpc_transport_op *op) {
   grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
   grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
-
-  lock(t);
-  perform_op_locked(&t->global, &s->global, op);
-  unlock(t);
+  grpc_chttp2_run_with_global_lock(t, s, perform_op_locked, op, sizeof(*op));
 }
 
-static void send_ping(grpc_transport *gt, grpc_iomgr_closure *on_recv) {
-  grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
+static void send_ping_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s_ignored, void *a) {
   grpc_chttp2_outstanding_ping *p = gpr_malloc(sizeof(*p));
-
-  lock(t);
   p->next = &t->global.pings;
   p->prev = p->next->prev;
   p->prev->next = p->next->prev = p;
@@ -706,9 +782,13 @@ static void send_ping(grpc_transport *gt, grpc_iomgr_closure *on_recv) {
   p->id[5] = (t->global.ping_counter >> 16) & 0xff;
   p->id[6] = (t->global.ping_counter >> 8) & 0xff;
   p->id[7] = t->global.ping_counter & 0xff;
-  p->on_recv = on_recv;
+  p->on_recv = *(grpc_iomgr_closure**)a;
   gpr_slice_buffer_add(&t->global.qbuf, grpc_chttp2_ping_create(0, p->id));
-  unlock(t);
+}
+
+static void send_ping(grpc_transport *gt, grpc_iomgr_closure *on_recv) {
+  grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
+  grpc_chttp2_run_with_global_lock(t, NULL, send_ping_locked, &on_recv, sizeof(on_recv));
 }
 
 /*
@@ -751,7 +831,7 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) {
   grpc_chttp2_stream_global *stream_global;
   grpc_stream_state state;
 
-  if (!t->parsing_active) {
+  if (!t->executor.parsing_active) {
     /* if a stream is in the stream map, and gets cancelled, we need to ensure
        we are not parsing before continuing the cancellation to keep things in
        a sane state */
@@ -784,7 +864,7 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) {
     }
     if (stream_global->write_state == WRITE_STATE_SENT_CLOSE &&
         stream_global->read_closed && stream_global->in_stream_map) {
-      if (t->parsing_active) {
+      if (t->executor.parsing_active) {
         grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global,
                                                         stream_global);
       } else {
@@ -934,7 +1014,7 @@ static void drop_connection(grpc_chttp2_transport *t) {
   if (t->global.error_state == GRPC_CHTTP2_ERROR_STATE_NONE) {
     t->global.error_state = GRPC_CHTTP2_ERROR_STATE_SEEN;
   }
-  close_transport_locked(t);
+  close_transport_locked(t, NULL, NULL);
   end_all_the_calls(t);
 }
 
@@ -968,6 +1048,7 @@ static grpc_chttp2_stream *lookup_stream(grpc_chttp2_transport *t, gpr_uint32 id
 #endif
 
 /* tcp read callback */
+#if 0
 static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
                       grpc_endpoint_cb_status error) {
   grpc_chttp2_transport *t = tp;
@@ -1025,6 +1106,7 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
       break;
   }
 }
+#endif
 
 static void reading_action(void *pt, int iomgr_success_ignored) {
   grpc_chttp2_transport *t = pt;
@@ -1042,6 +1124,10 @@ typedef struct {
   grpc_iomgr_closure closure;
 } notify_goaways_args;
 
+static void finished_channel_callbacks_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s_ignored, void *arg_ignored) {
+  t->executor.channel_callback_active = 0;
+}
+
 static void notify_goaways(void *p, int iomgr_success_ignored) {
   notify_goaways_args *a = p;
   grpc_chttp2_transport *t = a->t;
@@ -1051,10 +1137,7 @@ static void notify_goaways(void *p, int iomgr_success_ignored) {
 
   gpr_free(a);
 
-  lock(t);
-  t->channel_callback.executing = 0;
-  unlock(t);
-
+  grpc_chttp2_run_with_global_lock(t, NULL, finished_channel_callbacks_locked, NULL, 0);
   UNREF_TRANSPORT(t, "notify_goaways");
 }
 
@@ -1062,13 +1145,11 @@ static void notify_closed(void *gt, int iomgr_success_ignored) {
   grpc_chttp2_transport *t = gt;
   t->channel_callback.cb->closed(t->channel_callback.cb_user_data, &t->base);
 
-  lock(t);
-  t->channel_callback.executing = 0;
-  unlock(t);
-
+  grpc_chttp2_run_with_global_lock(t, NULL, finished_channel_callbacks_locked, NULL, 0);
   UNREF_TRANSPORT(t, "notify_closed");
 }
 
+#if 0
 static void unlock_check_channel_callbacks(grpc_chttp2_transport *t) {
   if (t->channel_callback.executing) {
     return;
@@ -1098,6 +1179,7 @@ static void unlock_check_channel_callbacks(grpc_chttp2_transport *t) {
                                  1);
   }
 }
+#endif
 
 void grpc_chttp2_schedule_closure(
     grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure,
@@ -1112,7 +1194,8 @@ void grpc_chttp2_schedule_closure(
  */
 
 static void add_to_pollset_locked(grpc_chttp2_transport *t,
-                                  grpc_pollset *pollset) {
+                                  grpc_chttp2_stream *s,
+                                  void *pollset) {
   if (t->ep) {
     grpc_endpoint_add_to_pollset(t->ep, pollset);
   }
@@ -1120,9 +1203,7 @@ static void add_to_pollset_locked(grpc_chttp2_transport *t,
 
 static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) {
   grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
-  lock(t);
-  add_to_pollset_locked(t, pollset);
-  unlock(t);
+  grpc_chttp2_run_with_global_lock(t, NULL, add_to_pollset_locked, pollset, 0);
 }
 
 /*