Browse Source

CHTTP2 executor

Craig Tiller 10 years ago
parent
commit
4b074d5274
2 changed files with 94 additions and 80 deletions
  1. 7 0
      src/core/transport/chttp2/internal.h
  2. 87 80
      src/core/transport/chttp2_transport.c

+ 7 - 0
src/core/transport/chttp2/internal.h

@@ -351,6 +351,13 @@ struct grpc_chttp2_transport {
   grpc_iomgr_closure writing_action;
   /** closure to start reading from the endpoint */
   grpc_iomgr_closure reading_action;
+  /** closure to actually do parsing */
+  grpc_iomgr_closure parsing_action;
+
+  struct {
+    size_t nslices;
+    gpr_slice *slices;
+  } executor_parsing;
 
   /** address to place a newly accepted stream - set and unset by
       grpc_chttp2_parsing_accept_stream; used by init_stream to

+ 87 - 80
src/core/transport/chttp2_transport.c

@@ -89,6 +89,7 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t);
 /* forward declarations of various callbacks that we'll build closures around */
 static void writing_action(void *t, int iomgr_success_ignored);
 static void reading_action(void *t, int iomgr_success_ignored);
+static void parsing_action(void *t, int iomgr_success_ignored);
 static void notify_closed(void *t, int iomgr_success_ignored);
 
 /** Set a transport level setting, and push it to our peer */
@@ -244,6 +245,7 @@ static void init_transport(grpc_chttp2_transport *t,
   grpc_chttp2_hpack_compressor_init(&t->writing.hpack_compressor, mdctx);
   grpc_iomgr_closure_init(&t->writing_action, writing_action, t);
   grpc_iomgr_closure_init(&t->reading_action, reading_action, t);
+  grpc_iomgr_closure_init(&t->parsing_action, parsing_action, t);
 
   gpr_slice_buffer_init(&t->parsing.qbuf);
   grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser);
@@ -427,24 +429,6 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
 
   int i;
 
-#if 0
-  gpr_mu_lock(&t->mu);
-
-  GPR_ASSERT(s->global.published_state == GRPC_STREAM_CLOSED ||
-             s->global.id == 0);
-  GPR_ASSERT(!s->global.in_stream_map);
-  grpc_chttp2_unregister_stream(t, s);
-  if (!t->parsing_active && s->global.id) {
-    GPR_ASSERT(grpc_chttp2_stream_map_find(&t->parsing_stream_map,
-                                           s->global.id) == NULL);
-  }
-
-  grpc_chttp2_list_remove_incoming_window_updated(&t->global, &s->global);
-  grpc_chttp2_list_remove_writable_window_update_stream(&t->global, &s->global);
-
-  gpr_mu_unlock(&t->mu);
-#endif
-
   for (i = 0; i < STREAM_LIST_COUNT; i++) {
     GPR_ASSERT(!s->included[i]);
   }
@@ -540,7 +524,6 @@ void grpc_chttp2_run_with_global_lock(grpc_chttp2_transport *t, grpc_chttp2_stre
   for (;;) {
     if (!t->executor.global_active) {
       t->executor.global_active = 1;
-      GPR_ASSERT(t->executor.pending_actions == NULL);
       gpr_mu_unlock(&t->executor.mu);
 
       action(t, optional_stream, arg);
@@ -591,12 +574,8 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
   }
 }
 
-#if 0
-void grpc_chttp2_terminate_writing(
-    grpc_chttp2_transport_writing *transport_writing, int success) {
-  grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing);
-
-  lock(t);
+static void terminate_writing_with_lock(grpc_chttp2_transport *t, grpc_chttp2_stream *s_ignored, void *a) {
+  int success = *(int*)a;
 
   if (!success) {
     drop_connection(t);
@@ -607,7 +586,7 @@ void grpc_chttp2_terminate_writing(
 
   /* leave the writing flag up on shutdown to prevent further writes in unlock()
      from starting */
-  t->writing_active = 0;
+  t->executor.writing_active = 0;
   if (t->ep && !t->endpoint_reading) {
     grpc_endpoint_destroy(t->ep);
     t->ep = NULL;
@@ -615,11 +594,14 @@ void grpc_chttp2_terminate_writing(
         t, "disconnect"); /* safe because we'll still have the ref for write */
   }
 
-  unlock(t);
-
   UNREF_TRANSPORT(t, "writing");
 }
-#endif
+
+void grpc_chttp2_terminate_writing(
+    grpc_chttp2_transport_writing *transport_writing, int success) {
+  grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing);
+  grpc_chttp2_run_with_global_lock(t, NULL, terminate_writing_with_lock, &success, sizeof(success));
+}
 
 static void writing_action(void *gt, int iomgr_success_ignored) {
   grpc_chttp2_transport *t = gt;
@@ -879,6 +861,12 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) {
     grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op(
         &stream_global->incoming_metadata, &stream_global->incoming_sopb,
         &stream_global->outstanding_metadata);
+    if (state == GRPC_STREAM_CLOSED) {
+      GPR_ASSERT(!stream_global->in_stream_map);
+      grpc_chttp2_unregister_stream(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global));
+      grpc_chttp2_list_remove_incoming_window_updated(transport_global, stream_global);
+      grpc_chttp2_list_remove_writable_window_update_stream(transport_global, stream_global);
+    }
     grpc_sopb_swap(stream_global->publish_sopb, &stream_global->incoming_sopb);
     stream_global->published_state = *stream_global->publish_state = state;
     grpc_chttp2_schedule_closure(transport_global,
@@ -922,66 +910,87 @@ static void drop_connection(grpc_chttp2_transport *t) {
   end_all_the_calls(t);
 }
 
+static void recv_data_error_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s, void *a) {
+  size_t i;
+
+  drop_connection(t);
+  t->endpoint_reading = 0;
+  if (!t->executor.writing_active && t->ep) {
+    grpc_endpoint_destroy(t->ep);
+    t->ep = NULL;
+    UNREF_TRANSPORT(
+        t, "disconnect"); /* safe as we still have a ref for read */
+  }
+  UNREF_TRANSPORT(t, "recv_data");
+  for (i = 0; i < t->executor_parsing.nslices; i++) gpr_slice_unref(t->executor_parsing.slices[i]);
+}
+
+static void finish_parsing_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s_ignored, void *a) {
+  size_t i = *(size_t *)a;
+
+  if (i != t->executor_parsing.nslices) {
+    drop_connection(t);
+  }
+  /* merge stream lists */
+  grpc_chttp2_stream_map_move_into(&t->new_stream_map,
+                                   &t->parsing_stream_map);
+  t->global.concurrent_stream_count = grpc_chttp2_stream_map_size(&t->parsing_stream_map);
+  /* handle higher level things */
+  grpc_chttp2_publish_reads(&t->global, &t->parsing);
+  t->executor.parsing_active = 0;
+
+  for (; i < t->executor_parsing.nslices; i++) gpr_slice_unref(t->executor_parsing.slices[i]);
+
+  if (i == t->executor_parsing.nslices) {
+    grpc_chttp2_schedule_closure(&t->global, &t->reading_action, 1);
+  }
+}
+
+static void parsing_action(void *pt, int iomgr_success_ignored) {
+  size_t i;
+  grpc_chttp2_transport *t = pt;
+  for (i = 0; i < t->executor_parsing.nslices && grpc_chttp2_perform_read(&t->parsing, t->executor_parsing.slices[i]);
+       i++) {
+    gpr_slice_unref(t->executor_parsing.slices[i]);
+  }
+  grpc_chttp2_run_with_global_lock(t, NULL, finish_parsing_locked, &i, sizeof(i));
+}
+
+static void recv_data_ok_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s, void *a) {
+  size_t i;
+  GPR_ASSERT(!t->executor.parsing_active);
+  if (t->global.error_state == GRPC_CHTTP2_ERROR_STATE_NONE) {
+    t->executor.parsing_active = 1;
+    /* merge stream lists */
+    grpc_chttp2_stream_map_move_into(&t->new_stream_map,
+                                     &t->parsing_stream_map);
+    grpc_chttp2_prepare_to_read(&t->global, &t->parsing);
+    /* schedule more work to do unlocked */
+    grpc_chttp2_schedule_closure(&t->global, &t->parsing_action, 1);
+  } else {
+    for (i = 0; i < t->executor_parsing.nslices; i++) gpr_slice_unref(t->executor_parsing.slices[i]);
+  }
+}
+
 /* tcp read callback */
-#if 0
 static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
                       grpc_endpoint_cb_status error) {
   grpc_chttp2_transport *t = tp;
-  size_t i;
+
+  t->executor_parsing.slices = slices;
+  t->executor_parsing.nslices = nslices;
 
   switch (error) {
     case GRPC_ENDPOINT_CB_SHUTDOWN:
     case GRPC_ENDPOINT_CB_EOF:
     case GRPC_ENDPOINT_CB_ERROR:
-      lock(t);
-      drop_connection(t);
-      t->endpoint_reading = 0;
-      if (!t->writing_active && t->ep) {
-        grpc_endpoint_destroy(t->ep);
-        t->ep = NULL;
-        UNREF_TRANSPORT(
-            t, "disconnect"); /* safe as we still have a ref for read */
-      }
-      unlock(t);
-      UNREF_TRANSPORT(t, "recv_data");
-      for (i = 0; i < nslices; i++) gpr_slice_unref(slices[i]);
+      grpc_chttp2_run_with_global_lock(t, NULL, recv_data_error_locked, NULL, 0);
       break;
     case GRPC_ENDPOINT_CB_OK:
-      lock(t);
-      i = 0;
-      GPR_ASSERT(!t->parsing_active);
-      if (t->global.error_state == GRPC_CHTTP2_ERROR_STATE_NONE) {
-        t->parsing_active = 1;
-        /* merge stream lists */
-        grpc_chttp2_stream_map_move_into(&t->new_stream_map,
-                                         &t->parsing_stream_map);
-        grpc_chttp2_prepare_to_read(&t->global, &t->parsing);
-        gpr_mu_unlock(&t->mu);
-        for (; i < nslices && grpc_chttp2_perform_read(&t->parsing, slices[i]);
-             i++) {
-          gpr_slice_unref(slices[i]);
-        }
-        gpr_mu_lock(&t->mu);
-        if (i != nslices) {
-          drop_connection(t);
-        }
-        /* merge stream lists */
-        grpc_chttp2_stream_map_move_into(&t->new_stream_map,
-                                         &t->parsing_stream_map);
-        t->global.concurrent_stream_count = grpc_chttp2_stream_map_size(&t->parsing_stream_map);
-        /* handle higher level things */
-        grpc_chttp2_publish_reads(&t->global, &t->parsing);
-        t->parsing_active = 0;
-      }
-      if (i == nslices) {
-        grpc_chttp2_schedule_closure(&t->global, &t->reading_action, 1);
-      }
-      unlock(t);
-      for (; i < nslices; i++) gpr_slice_unref(slices[i]);
+      grpc_chttp2_run_with_global_lock(t, NULL, recv_data_ok_locked, NULL, 0);
       break;
   }
 }
-#endif
 
 static void reading_action(void *pt, int iomgr_success_ignored) {
   grpc_chttp2_transport *t = pt;
@@ -1024,9 +1033,8 @@ static void notify_closed(void *gt, int iomgr_success_ignored) {
   UNREF_TRANSPORT(t, "notify_closed");
 }
 
-#if 0
 static void unlock_check_channel_callbacks(grpc_chttp2_transport *t) {
-  if (t->channel_callback.executing) {
+  if (t->executor.channel_callback_active) {
     return;
   }
   if (t->global.goaway_state != GRPC_CHTTP2_ERROR_STATE_NONE) {
@@ -1037,7 +1045,7 @@ static void unlock_check_channel_callbacks(grpc_chttp2_transport *t) {
       a->error = t->global.goaway_error;
       a->text = t->global.goaway_text;
       t->global.goaway_state = GRPC_CHTTP2_ERROR_STATE_NOTIFIED;
-      t->channel_callback.executing = 1;
+      t->executor.channel_callback_active = 1;
       grpc_iomgr_closure_init(&a->closure, notify_goaways, a);
       REF_TRANSPORT(t, "notify_goaways");
       grpc_chttp2_schedule_closure(&t->global, &a->closure, 1);
@@ -1048,13 +1056,12 @@ static void unlock_check_channel_callbacks(grpc_chttp2_transport *t) {
   }
   if (t->global.error_state == GRPC_CHTTP2_ERROR_STATE_SEEN) {
     t->global.error_state = GRPC_CHTTP2_ERROR_STATE_NOTIFIED;
-    t->channel_callback.executing = 1;
+    t->executor.channel_callback_active = 1;
     REF_TRANSPORT(t, "notify_closed");
     grpc_chttp2_schedule_closure(&t->global, &t->channel_callback.notify_closed,
                                  1);
   }
 }
-#endif
 
 void grpc_chttp2_schedule_closure(
     grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure,