Przeglądaj źródła

Merge pull request #9626 from muxi/lazy-deframe

Implement lazy deframe
Muxi Yan 8 lat temu
rodzic
commit
f452cc5f9a

+ 218 - 120
src/core/ext/transport/chttp2/transport/chttp2_transport.c

@@ -44,6 +44,7 @@
 #include <grpc/support/string_util.h>
 #include <grpc/support/useful.h>
 
+#include "src/core/ext/transport/chttp2/transport/frame_data.h"
 #include "src/core/ext/transport/chttp2/transport/internal.h"
 #include "src/core/ext/transport/chttp2/transport/varint.h"
 #include "src/core/lib/channel/channel_args.h"
@@ -129,6 +130,11 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx,
 static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
                                                 void *byte_stream,
                                                 grpc_error *error_ignored);
+static void incoming_byte_stream_publish_error(
+    grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
+    grpc_error *error);
+static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx,
+                                       grpc_chttp2_incoming_byte_stream *bs);
 
 static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t,
                                     grpc_error *error);
@@ -174,6 +180,9 @@ static void finish_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
 static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg,
                                             grpc_error *error);
 
+static void reset_byte_stream(grpc_exec_ctx *exec_ctx, void *arg,
+                              grpc_error *error);
+
 /*******************************************************************************
  * CONSTRUCTION/DESTRUCTION/REFCOUNTING
  */
@@ -648,7 +657,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
   /* We reserve one 'active stream' that's dropped when the stream is
      read-closed. The others are for incoming_byte_streams that are actively
      reading */
-  gpr_ref_init(&s->active_streams, 1);
   GRPC_CHTTP2_STREAM_REF(s, "chttp2");
 
   grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0], arena);
@@ -658,6 +666,11 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
   s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
   grpc_closure_init(&s->complete_fetch_locked, complete_fetch_locked, s,
                     grpc_schedule_on_exec_ctx);
+  grpc_slice_buffer_init(&s->unprocessed_incoming_frames_buffer);
+  grpc_slice_buffer_init(&s->frame_storage);
+  s->pending_byte_stream = false;
+  grpc_closure_init(&s->reset_byte_stream, reset_byte_stream, s,
+                    grpc_combiner_scheduler(t->combiner, false));
 
   GRPC_CHTTP2_REF_TRANSPORT(t, "stream");
 
@@ -675,7 +688,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
 
 static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
                                   grpc_error *error) {
-  grpc_byte_stream *bs;
   grpc_chttp2_stream *s = sp;
   grpc_chttp2_transport *t = s->t;
 
@@ -686,9 +698,9 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
     GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, s->id) == NULL);
   }
 
-  while ((bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames))) {
-    incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
-  }
+  grpc_slice_buffer_destroy_internal(exec_ctx,
+                                     &s->unprocessed_incoming_frames_buffer);
+  grpc_slice_buffer_destroy_internal(exec_ctx, &s->frame_storage);
 
   grpc_chttp2_list_remove_stalled_by_transport(t, s);
   grpc_chttp2_list_remove_stalled_by_stream(t, s);
@@ -715,6 +727,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
   grpc_slice_buffer_destroy_internal(exec_ctx, &s->flow_controlled_buffer);
   GRPC_ERROR_UNREF(s->read_closed_error);
   GRPC_ERROR_UNREF(s->write_closed_error);
+  GRPC_ERROR_UNREF(s->byte_stream_error);
 
   if (s->incoming_window_delta > 0) {
     GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA(
@@ -1168,8 +1181,9 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx,
       s->fetching_send_message = NULL;
       return; /* early out */
     } else if (grpc_byte_stream_next(exec_ctx, s->fetching_send_message,
-                                     &s->fetching_slice, UINT32_MAX,
-                                     &s->complete_fetch_locked)) {
+                                     UINT32_MAX, &s->complete_fetch_locked)) {
+      grpc_byte_stream_pull(exec_ctx, s->fetching_send_message,
+                            &s->fetching_slice);
       add_fetched_slice_locked(exec_ctx, t, s);
     }
   }
@@ -1180,9 +1194,15 @@ static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs,
   grpc_chttp2_stream *s = gs;
   grpc_chttp2_transport *t = s->t;
   if (error == GRPC_ERROR_NONE) {
-    add_fetched_slice_locked(exec_ctx, t, s);
-    continue_fetching_send_locked(exec_ctx, t, s);
-  } else {
+    error = grpc_byte_stream_pull(exec_ctx, s->fetching_send_message,
+                                  &s->fetching_slice);
+    if (error == GRPC_ERROR_NONE) {
+      add_fetched_slice_locked(exec_ctx, t, s);
+      continue_fetching_send_locked(exec_ctx, t, s);
+    }
+  }
+
+  if (error != GRPC_ERROR_NONE) {
     /* TODO(ctiller): what to do here */
     abort();
   }
@@ -1417,8 +1437,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
     GPR_ASSERT(s->recv_message_ready == NULL);
     s->recv_message_ready = op_payload->recv_message.recv_message_ready;
     s->recv_message = op_payload->recv_message.recv_message;
-    if (s->id != 0 &&
-        (s->incoming_frames.head == NULL || s->incoming_frames.head->is_tail)) {
+    if (s->id != 0 && s->frame_storage.length == 0) {
       incoming_byte_stream_update_flow_control(exec_ctx, t, s, 5, 0);
     }
     grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
@@ -1607,13 +1626,13 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
 void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_exec_ctx *exec_ctx,
                                                       grpc_chttp2_transport *t,
                                                       grpc_chttp2_stream *s) {
-  grpc_byte_stream *bs;
   if (s->recv_initial_metadata_ready != NULL &&
       s->published_metadata[0] != GRPC_METADATA_NOT_PUBLISHED) {
     if (s->seen_error) {
-      while ((bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames)) !=
-             NULL) {
-        incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
+      grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage);
+      if (!s->pending_byte_stream) {
+        grpc_slice_buffer_reset_and_unref_internal(
+            exec_ctx, &s->unprocessed_incoming_frames_buffer);
       }
     }
     grpc_chttp2_incoming_metadata_buffer_publish(
@@ -1626,39 +1645,65 @@ void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_exec_ctx *exec_ctx,
 void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx,
                                              grpc_chttp2_transport *t,
                                              grpc_chttp2_stream *s) {
-  grpc_byte_stream *bs;
+  grpc_error *error = GRPC_ERROR_NONE;
   if (s->recv_message_ready != NULL) {
-    while (s->final_metadata_requested && s->seen_error &&
-           (bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames)) !=
-               NULL) {
-      incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
+    *s->recv_message = NULL;
+    if (s->final_metadata_requested && s->seen_error) {
+      grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage);
+      if (!s->pending_byte_stream) {
+        grpc_slice_buffer_reset_and_unref_internal(
+            exec_ctx, &s->unprocessed_incoming_frames_buffer);
+      }
     }
-    if (s->incoming_frames.head != NULL) {
-      *s->recv_message =
-          grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames);
-      GPR_ASSERT(*s->recv_message != NULL);
+    if (!s->pending_byte_stream) {
+      while (s->unprocessed_incoming_frames_buffer.length > 0 ||
+             s->frame_storage.length > 0) {
+        if (s->unprocessed_incoming_frames_buffer.length == 0) {
+          grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer,
+                                 &s->frame_storage);
+        }
+        error = deframe_unprocessed_incoming_frames(
+            exec_ctx, &s->data_parser, s,
+            &s->unprocessed_incoming_frames_buffer, NULL, s->recv_message);
+        if (error != GRPC_ERROR_NONE) {
+          s->seen_error = true;
+          grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
+                                                     &s->frame_storage);
+          grpc_slice_buffer_reset_and_unref_internal(
+              exec_ctx, &s->unprocessed_incoming_frames_buffer);
+          break;
+        } else if (*s->recv_message != NULL) {
+          break;
+        }
+      }
+    }
+    if (error == GRPC_ERROR_NONE && *s->recv_message != NULL) {
       null_then_run_closure(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE);
     } else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) {
       *s->recv_message = NULL;
       null_then_run_closure(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE);
     }
+    GRPC_ERROR_UNREF(error);
   }
 }
 
 void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
                                                        grpc_chttp2_transport *t,
                                                        grpc_chttp2_stream *s) {
-  grpc_byte_stream *bs;
   grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
   if (s->recv_trailing_metadata_finished != NULL && s->read_closed &&
       s->write_closed) {
     if (s->seen_error) {
-      while ((bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames)) !=
-             NULL) {
-        incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
+      grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage);
+      if (!s->pending_byte_stream) {
+        grpc_slice_buffer_reset_and_unref_internal(
+            exec_ctx, &s->unprocessed_incoming_frames_buffer);
       }
     }
-    if (s->all_incoming_byte_streams_finished &&
+    bool pending_data = s->pending_byte_stream ||
+                        s->unprocessed_incoming_frames_buffer.length > 0;
+    if (s->read_closed && s->frame_storage.length == 0 &&
+        (!pending_data || s->seen_error) &&
         s->recv_trailing_metadata_finished != NULL) {
       grpc_chttp2_incoming_metadata_buffer_publish(
           exec_ctx, &s->metadata_buffer[1], s->recv_trailing_metadata);
@@ -1669,14 +1714,6 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
   }
 }
 
-static void decrement_active_streams_locked(grpc_exec_ctx *exec_ctx,
-                                            grpc_chttp2_transport *t,
-                                            grpc_chttp2_stream *s) {
-  if ((s->all_incoming_byte_streams_finished = gpr_unref(&s->active_streams))) {
-    grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
-  }
-}
-
 static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
                           uint32_t id, grpc_error *error) {
   grpc_chttp2_stream *s = grpc_chttp2_stream_map_delete(&t->stream_map, id);
@@ -1685,10 +1722,19 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
     t->incoming_stream = NULL;
     grpc_chttp2_parsing_become_skip_parser(exec_ctx, t);
   }
-  if (s->data_parser.parsing_frame != NULL) {
-    grpc_chttp2_incoming_byte_stream_finished(
-        exec_ctx, s->data_parser.parsing_frame, GRPC_ERROR_REF(error));
-    s->data_parser.parsing_frame = NULL;
+  if (s->pending_byte_stream) {
+    if (s->on_next != NULL) {
+      grpc_chttp2_incoming_byte_stream *bs = s->data_parser.parsing_frame;
+      if (error == GRPC_ERROR_NONE) {
+        error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
+      }
+      incoming_byte_stream_publish_error(exec_ctx, bs, error);
+      incoming_byte_stream_unref(exec_ctx, bs);
+      s->data_parser.parsing_frame = NULL;
+    } else {
+      GRPC_ERROR_UNREF(s->byte_stream_error);
+      s->byte_stream_error = GRPC_ERROR_REF(error);
+    }
   }
 
   if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
@@ -1874,7 +1920,6 @@ void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx,
         s->published_metadata[i] = GPRC_METADATA_PUBLISHED_AT_CLOSE;
       }
     }
-    decrement_active_streams_locked(exec_ctx, t, s);
     grpc_chttp2_maybe_complete_recv_initial_metadata(exec_ctx, t, s);
     grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
   }
@@ -2412,12 +2457,28 @@ static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
  * BYTE STREAM
  */
 
+static void reset_byte_stream(grpc_exec_ctx *exec_ctx, void *arg,
+                              grpc_error *error) {
+  grpc_chttp2_stream *s = (grpc_chttp2_stream *)arg;
+
+  s->pending_byte_stream = false;
+  if (error == GRPC_ERROR_NONE) {
+    grpc_chttp2_maybe_complete_recv_message(exec_ctx, s->t, s);
+    grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, s->t, s);
+  } else {
+    GPR_ASSERT(error != GRPC_ERROR_NONE);
+    grpc_closure_sched(exec_ctx, s->on_next, GRPC_ERROR_REF(error));
+    s->on_next = NULL;
+    GRPC_ERROR_UNREF(s->byte_stream_error);
+    s->byte_stream_error = GRPC_ERROR_NONE;
+    grpc_chttp2_cancel_stream(exec_ctx, s->t, s, GRPC_ERROR_REF(error));
+    s->byte_stream_error = error;
+  }
+}
+
 static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx,
                                        grpc_chttp2_incoming_byte_stream *bs) {
   if (gpr_unref(&bs->refs)) {
-    GRPC_ERROR_UNREF(bs->error);
-    grpc_slice_buffer_destroy_internal(exec_ctx, &bs->slices);
-    gpr_mu_destroy(&bs->slice_mu);
     gpr_free(bs);
   }
 }
@@ -2477,47 +2538,90 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
   grpc_chttp2_transport *t = bs->transport;
   grpc_chttp2_stream *s = bs->stream;
 
-  if (bs->is_tail) {
-    gpr_mu_lock(&bs->slice_mu);
-    size_t cur_length = bs->slices.length;
-    gpr_mu_unlock(&bs->slice_mu);
-    incoming_byte_stream_update_flow_control(
-        exec_ctx, t, s, bs->next_action.max_size_hint, cur_length);
-  }
-  gpr_mu_lock(&bs->slice_mu);
-  if (bs->slices.count > 0) {
-    *bs->next_action.slice = grpc_slice_buffer_take_first(&bs->slices);
-    grpc_closure_run(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE);
-  } else if (bs->error != GRPC_ERROR_NONE) {
-    grpc_closure_run(exec_ctx, bs->next_action.on_complete,
-                     GRPC_ERROR_REF(bs->error));
+  size_t cur_length = s->frame_storage.length;
+  incoming_byte_stream_update_flow_control(
+      exec_ctx, t, s, bs->next_action.max_size_hint, cur_length);
+
+  GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0);
+  if (s->frame_storage.length > 0) {
+    grpc_slice_buffer_swap(&s->frame_storage,
+                           &s->unprocessed_incoming_frames_buffer);
+    grpc_closure_sched(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE);
+  } else if (s->byte_stream_error != GRPC_ERROR_NONE) {
+    grpc_closure_sched(exec_ctx, bs->next_action.on_complete,
+                       GRPC_ERROR_REF(s->byte_stream_error));
+    if (s->data_parser.parsing_frame != NULL) {
+      incoming_byte_stream_unref(exec_ctx, s->data_parser.parsing_frame);
+      s->data_parser.parsing_frame = NULL;
+    }
+  } else if (s->read_closed) {
+    if (bs->remaining_bytes != 0) {
+      s->byte_stream_error =
+          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
+      grpc_closure_sched(exec_ctx, bs->next_action.on_complete,
+                         GRPC_ERROR_REF(s->byte_stream_error));
+      if (s->data_parser.parsing_frame != NULL) {
+        incoming_byte_stream_unref(exec_ctx, s->data_parser.parsing_frame);
+        s->data_parser.parsing_frame = NULL;
+      }
+    } else {
+      /* Should never reach here. */
+      GPR_ASSERT(false);
+    }
   } else {
-    bs->on_next = bs->next_action.on_complete;
-    bs->next = bs->next_action.slice;
+    s->on_next = bs->next_action.on_complete;
   }
-  gpr_mu_unlock(&bs->slice_mu);
   incoming_byte_stream_unref(exec_ctx, bs);
 }
 
-static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
-                                     grpc_byte_stream *byte_stream,
-                                     grpc_slice *slice, size_t max_size_hint,
-                                     grpc_closure *on_complete) {
+static bool incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
+                                      grpc_byte_stream *byte_stream,
+                                      size_t max_size_hint,
+                                      grpc_closure *on_complete) {
   GPR_TIMER_BEGIN("incoming_byte_stream_next", 0);
   grpc_chttp2_incoming_byte_stream *bs =
       (grpc_chttp2_incoming_byte_stream *)byte_stream;
-  gpr_ref(&bs->refs);
-  bs->next_action.slice = slice;
-  bs->next_action.max_size_hint = max_size_hint;
-  bs->next_action.on_complete = on_complete;
-  grpc_closure_sched(
-      exec_ctx,
-      grpc_closure_init(
-          &bs->next_action.closure, incoming_byte_stream_next_locked, bs,
-          grpc_combiner_scheduler(bs->transport->combiner, false)),
-      GRPC_ERROR_NONE);
-  GPR_TIMER_END("incoming_byte_stream_next", 0);
-  return 0;
+  grpc_chttp2_stream *s = bs->stream;
+  if (s->unprocessed_incoming_frames_buffer.length > 0) {
+    return true;
+  } else {
+    gpr_ref(&bs->refs);
+    bs->next_action.max_size_hint = max_size_hint;
+    bs->next_action.on_complete = on_complete;
+    grpc_closure_sched(
+        exec_ctx,
+        grpc_closure_init(
+            &bs->next_action.closure, incoming_byte_stream_next_locked, bs,
+            grpc_combiner_scheduler(bs->transport->combiner, false)),
+        GRPC_ERROR_NONE);
+    GPR_TIMER_END("incoming_byte_stream_next", 0);
+    return false;
+  }
+}
+
+static grpc_error *incoming_byte_stream_pull(grpc_exec_ctx *exec_ctx,
+                                             grpc_byte_stream *byte_stream,
+                                             grpc_slice *slice) {
+  GPR_TIMER_BEGIN("incoming_byte_stream_pull", 0);
+  grpc_chttp2_incoming_byte_stream *bs =
+      (grpc_chttp2_incoming_byte_stream *)byte_stream;
+  grpc_chttp2_stream *s = bs->stream;
+
+  if (s->unprocessed_incoming_frames_buffer.length > 0) {
+    grpc_error *error = deframe_unprocessed_incoming_frames(
+        exec_ctx, &s->data_parser, s, &s->unprocessed_incoming_frames_buffer,
+        slice, NULL);
+    if (error != GRPC_ERROR_NONE) {
+      return error;
+    }
+  } else {
+    grpc_error *error =
+        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
+    grpc_closure_sched(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error));
+    return error;
+  }
+  GPR_TIMER_END("incoming_byte_stream_pull", 0);
+  return GRPC_ERROR_NONE;
 }
 
 static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
@@ -2527,9 +2631,14 @@ static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
                                                 void *byte_stream,
                                                 grpc_error *error_ignored) {
   grpc_chttp2_incoming_byte_stream *bs = byte_stream;
+  grpc_chttp2_stream *s = bs->stream;
+  grpc_chttp2_transport *t = s->t;
+
   GPR_ASSERT(bs->base.destroy == incoming_byte_stream_destroy);
-  decrement_active_streams_locked(exec_ctx, bs->transport, bs->stream);
   incoming_byte_stream_unref(exec_ctx, bs);
+  s->pending_byte_stream = false;
+  grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
+  grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
 }
 
 static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
@@ -2549,50 +2658,53 @@ static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
 static void incoming_byte_stream_publish_error(
     grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
     grpc_error *error) {
+  grpc_chttp2_stream *s = bs->stream;
+
   GPR_ASSERT(error != GRPC_ERROR_NONE);
-  grpc_closure_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error));
-  bs->on_next = NULL;
-  GRPC_ERROR_UNREF(bs->error);
+  grpc_closure_sched(exec_ctx, s->on_next, GRPC_ERROR_REF(error));
+  s->on_next = NULL;
+  GRPC_ERROR_UNREF(s->byte_stream_error);
+  s->byte_stream_error = GRPC_ERROR_REF(error);
   grpc_chttp2_cancel_stream(exec_ctx, bs->transport, bs->stream,
                             GRPC_ERROR_REF(error));
-  bs->error = error;
 }
 
-void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
-                                           grpc_chttp2_incoming_byte_stream *bs,
-                                           grpc_slice slice) {
-  gpr_mu_lock(&bs->slice_mu);
+grpc_error *grpc_chttp2_incoming_byte_stream_push(
+    grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
+    grpc_slice slice, grpc_slice *slice_out) {
+  grpc_chttp2_stream *s = bs->stream;
+
   if (bs->remaining_bytes < GRPC_SLICE_LENGTH(slice)) {
-    incoming_byte_stream_publish_error(
-        exec_ctx, bs,
-        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream"));
+    grpc_error *error =
+        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream");
+
+    grpc_closure_sched(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error));
+    grpc_slice_unref_internal(exec_ctx, slice);
+    return error;
   } else {
     bs->remaining_bytes -= (uint32_t)GRPC_SLICE_LENGTH(slice);
-    if (bs->on_next != NULL) {
-      *bs->next = slice;
-      grpc_closure_sched(exec_ctx, bs->on_next, GRPC_ERROR_NONE);
-      bs->on_next = NULL;
-    } else {
-      grpc_slice_buffer_add(&bs->slices, slice);
+    if (slice_out != NULL) {
+      *slice_out = slice;
     }
+    return GRPC_ERROR_NONE;
   }
-  gpr_mu_unlock(&bs->slice_mu);
 }
 
-void grpc_chttp2_incoming_byte_stream_finished(
+grpc_error *grpc_chttp2_incoming_byte_stream_finished(
     grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
-    grpc_error *error) {
+    grpc_error *error, bool reset_on_error) {
+  grpc_chttp2_stream *s = bs->stream;
+
   if (error == GRPC_ERROR_NONE) {
-    gpr_mu_lock(&bs->slice_mu);
     if (bs->remaining_bytes != 0) {
       error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
     }
-    gpr_mu_unlock(&bs->slice_mu);
   }
-  if (error != GRPC_ERROR_NONE) {
-    incoming_byte_stream_publish_error(exec_ctx, bs, error);
+  if (error != GRPC_ERROR_NONE && reset_on_error) {
+    grpc_closure_sched(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error));
   }
   incoming_byte_stream_unref(exec_ctx, bs);
+  return error;
 }
 
 grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
@@ -2604,26 +2716,12 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
   incoming_byte_stream->remaining_bytes = frame_size;
   incoming_byte_stream->base.flags = flags;
   incoming_byte_stream->base.next = incoming_byte_stream_next;
+  incoming_byte_stream->base.pull = incoming_byte_stream_pull;
   incoming_byte_stream->base.destroy = incoming_byte_stream_destroy;
-  gpr_mu_init(&incoming_byte_stream->slice_mu);
   gpr_ref_init(&incoming_byte_stream->refs, 2);
-  incoming_byte_stream->next_message = NULL;
   incoming_byte_stream->transport = t;
   incoming_byte_stream->stream = s;
-  gpr_ref(&incoming_byte_stream->stream->active_streams);
-  grpc_slice_buffer_init(&incoming_byte_stream->slices);
-  incoming_byte_stream->on_next = NULL;
-  incoming_byte_stream->is_tail = 1;
-  incoming_byte_stream->error = GRPC_ERROR_NONE;
-  grpc_chttp2_incoming_frame_queue *q = &s->incoming_frames;
-  if (q->head == NULL) {
-    q->head = incoming_byte_stream;
-  } else {
-    q->tail->is_tail = 0;
-    q->tail->next_message = incoming_byte_stream;
-  }
-  q->tail = incoming_byte_stream;
-  grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
+  s->byte_stream_error = GRPC_ERROR_NONE;
   return incoming_byte_stream;
 }
 

+ 203 - 162
src/core/ext/transport/chttp2/transport/frame_data.c

@@ -40,6 +40,7 @@
 #include <grpc/support/string_util.h>
 #include <grpc/support/useful.h>
 #include "src/core/ext/transport/chttp2/transport/internal.h"
+#include "src/core/lib/slice/slice_internal.h"
 #include "src/core/lib/slice/slice_string_helpers.h"
 #include "src/core/lib/support/string.h"
 #include "src/core/lib/transport/transport.h"
@@ -53,16 +54,17 @@ grpc_error *grpc_chttp2_data_parser_init(grpc_chttp2_data_parser *parser) {
 void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx,
                                      grpc_chttp2_data_parser *parser) {
   if (parser->parsing_frame != NULL) {
-    grpc_chttp2_incoming_byte_stream_finished(
+    GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished(
         exec_ctx, parser->parsing_frame,
-        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Parser destroyed"));
+        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Parser destroyed"), false));
   }
   GRPC_ERROR_UNREF(parser->error);
 }
 
 grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser,
                                                 uint8_t flags,
-                                                uint32_t stream_id) {
+                                                uint32_t stream_id,
+                                                grpc_chttp2_stream *s) {
   if (flags & ~GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
     char *msg;
     gpr_asprintf(&msg, "unsupported data flags: 0x%02x", flags);
@@ -74,47 +76,14 @@ grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser,
   }
 
   if (flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
-    parser->is_last_frame = 1;
+    s->received_last_frame = true;
   } else {
-    parser->is_last_frame = 0;
+    s->received_last_frame = false;
   }
 
   return GRPC_ERROR_NONE;
 }
 
-void grpc_chttp2_incoming_frame_queue_merge(
-    grpc_chttp2_incoming_frame_queue *head_dst,
-    grpc_chttp2_incoming_frame_queue *tail_src) {
-  if (tail_src->head == NULL) {
-    return;
-  }
-
-  if (head_dst->head == NULL) {
-    *head_dst = *tail_src;
-    memset(tail_src, 0, sizeof(*tail_src));
-    return;
-  }
-
-  head_dst->tail->next_message = tail_src->head;
-  head_dst->tail = tail_src->tail;
-  memset(tail_src, 0, sizeof(*tail_src));
-}
-
-grpc_byte_stream *grpc_chttp2_incoming_frame_queue_pop(
-    grpc_chttp2_incoming_frame_queue *q) {
-  grpc_byte_stream *out;
-  if (q->head == NULL) {
-    return NULL;
-  }
-  out = &q->head->base;
-  if (q->head == q->tail) {
-    memset(q, 0, sizeof(*q));
-  } else {
-    q->head = q->head->next_message;
-  }
-  return out;
-}
-
 void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer *inbuf,
                              uint32_t write_bytes, int is_eof,
                              grpc_transport_one_way_stats *stats,
@@ -143,145 +112,217 @@ void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer *inbuf,
   stats->data_bytes += write_bytes;
 }
 
-static grpc_error *parse_inner(grpc_exec_ctx *exec_ctx,
-                               grpc_chttp2_data_parser *p,
-                               grpc_chttp2_transport *t, grpc_chttp2_stream *s,
-                               grpc_slice slice) {
-  uint8_t *const beg = GRPC_SLICE_START_PTR(slice);
-  uint8_t *const end = GRPC_SLICE_END_PTR(slice);
-  uint8_t *cur = beg;
-  uint32_t message_flags;
-  grpc_chttp2_incoming_byte_stream *incoming_byte_stream;
-  char *msg;
+grpc_error *deframe_unprocessed_incoming_frames(grpc_exec_ctx *exec_ctx,
+                                                grpc_chttp2_data_parser *p,
+                                                grpc_chttp2_stream *s,
+                                                grpc_slice_buffer *slices,
+                                                grpc_slice *slice_out,
+                                                grpc_byte_stream **stream_out) {
+  grpc_error *error = GRPC_ERROR_NONE;
+  grpc_chttp2_transport *t = s->t;
 
-  if (cur == end) {
-    return GRPC_ERROR_NONE;
-  }
+  while (slices->count > 0) {
+    uint8_t *beg = NULL;
+    uint8_t *end = NULL;
+    uint8_t *cur = NULL;
 
-  switch (p->state) {
-    case GRPC_CHTTP2_DATA_ERROR:
-      p->state = GRPC_CHTTP2_DATA_ERROR;
-      return GRPC_ERROR_REF(p->error);
-    fh_0:
-    case GRPC_CHTTP2_DATA_FH_0:
-      s->stats.incoming.framing_bytes++;
-      p->frame_type = *cur;
-      switch (p->frame_type) {
-        case 0:
-          p->is_frame_compressed = 0; /* GPR_FALSE */
-          break;
-        case 1:
-          p->is_frame_compressed = 1; /* GPR_TRUE */
-          break;
-        default:
-          gpr_asprintf(&msg, "Bad GRPC frame type 0x%02x", p->frame_type);
-          p->error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
-          p->error = grpc_error_set_int(p->error, GRPC_ERROR_INT_STREAM_ID,
-                                        (intptr_t)s->id);
-          gpr_free(msg);
-          msg = grpc_dump_slice(slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
-          p->error = grpc_error_set_str(p->error, GRPC_ERROR_STR_RAW_BYTES,
-                                        grpc_slice_from_copied_string(msg));
-          gpr_free(msg);
-          p->error =
-              grpc_error_set_int(p->error, GRPC_ERROR_INT_OFFSET, cur - beg);
-          p->state = GRPC_CHTTP2_DATA_ERROR;
-          return GRPC_ERROR_REF(p->error);
-      }
-      if (++cur == end) {
-        p->state = GRPC_CHTTP2_DATA_FH_1;
-        return GRPC_ERROR_NONE;
-      }
-    /* fallthrough */
-    case GRPC_CHTTP2_DATA_FH_1:
-      s->stats.incoming.framing_bytes++;
-      p->frame_size = ((uint32_t)*cur) << 24;
-      if (++cur == end) {
-        p->state = GRPC_CHTTP2_DATA_FH_2;
-        return GRPC_ERROR_NONE;
-      }
-    /* fallthrough */
-    case GRPC_CHTTP2_DATA_FH_2:
-      s->stats.incoming.framing_bytes++;
-      p->frame_size |= ((uint32_t)*cur) << 16;
-      if (++cur == end) {
-        p->state = GRPC_CHTTP2_DATA_FH_3;
-        return GRPC_ERROR_NONE;
-      }
-    /* fallthrough */
-    case GRPC_CHTTP2_DATA_FH_3:
-      s->stats.incoming.framing_bytes++;
-      p->frame_size |= ((uint32_t)*cur) << 8;
-      if (++cur == end) {
-        p->state = GRPC_CHTTP2_DATA_FH_4;
-        return GRPC_ERROR_NONE;
-      }
-    /* fallthrough */
-    case GRPC_CHTTP2_DATA_FH_4:
-      s->stats.incoming.framing_bytes++;
-      p->frame_size |= ((uint32_t)*cur);
-      p->state = GRPC_CHTTP2_DATA_FRAME;
-      ++cur;
-      message_flags = 0;
-      if (p->is_frame_compressed) {
-        message_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
-      }
-      p->parsing_frame = incoming_byte_stream =
-          grpc_chttp2_incoming_byte_stream_create(exec_ctx, t, s, p->frame_size,
-                                                  message_flags);
-    /* fallthrough */
-    case GRPC_CHTTP2_DATA_FRAME:
-      if (cur == end) {
-        return GRPC_ERROR_NONE;
-      }
-      uint32_t remaining = (uint32_t)(end - cur);
-      if (remaining == p->frame_size) {
-        s->stats.incoming.data_bytes += p->frame_size;
-        grpc_chttp2_incoming_byte_stream_push(
-            exec_ctx, p->parsing_frame,
-            grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
-        grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame,
-                                                  GRPC_ERROR_NONE);
-        p->parsing_frame = NULL;
-        p->state = GRPC_CHTTP2_DATA_FH_0;
-        return GRPC_ERROR_NONE;
-      } else if (remaining > p->frame_size) {
-        s->stats.incoming.data_bytes += p->frame_size;
-        grpc_chttp2_incoming_byte_stream_push(
-            exec_ctx, p->parsing_frame,
-            grpc_slice_sub(slice, (size_t)(cur - beg),
-                           (size_t)(cur + p->frame_size - beg)));
-        grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame,
-                                                  GRPC_ERROR_NONE);
-        p->parsing_frame = NULL;
-        cur += p->frame_size;
-        goto fh_0; /* loop */
-      } else {
-        GPR_ASSERT(remaining <= p->frame_size);
-        grpc_chttp2_incoming_byte_stream_push(
-            exec_ctx, p->parsing_frame,
-            grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
-        p->frame_size -= remaining;
-        s->stats.incoming.data_bytes += remaining;
+    grpc_slice slice = grpc_slice_buffer_take_first(slices);
+
+    beg = GRPC_SLICE_START_PTR(slice);
+    end = GRPC_SLICE_END_PTR(slice);
+    cur = beg;
+    uint32_t message_flags;
+    char *msg;
+
+    if (cur == end) {
+      grpc_slice_unref_internal(exec_ctx, slice);
+      continue;
+    }
+
+    switch (p->state) {
+      case GRPC_CHTTP2_DATA_ERROR:
+        p->state = GRPC_CHTTP2_DATA_ERROR;
+        grpc_slice_unref_internal(exec_ctx, slice);
+        return GRPC_ERROR_REF(p->error);
+      case GRPC_CHTTP2_DATA_FH_0:
+        p->frame_type = *cur;
+        switch (p->frame_type) {
+          case 0:
+            p->is_frame_compressed = false; /* GPR_FALSE */
+            break;
+          case 1:
+            p->is_frame_compressed = true; /* GPR_TRUE */
+            break;
+          default:
+            gpr_asprintf(&msg, "Bad GRPC frame type 0x%02x", p->frame_type);
+            p->error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
+            p->error = grpc_error_set_int(p->error, GRPC_ERROR_INT_STREAM_ID,
+                                          (intptr_t)s->id);
+            gpr_free(msg);
+            msg = grpc_dump_slice(slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
+            p->error = grpc_error_set_str(p->error, GRPC_ERROR_STR_RAW_BYTES,
+                                          grpc_slice_from_copied_string(msg));
+            gpr_free(msg);
+            p->error =
+                grpc_error_set_int(p->error, GRPC_ERROR_INT_OFFSET, cur - beg);
+            p->state = GRPC_CHTTP2_DATA_ERROR;
+            grpc_slice_unref_internal(exec_ctx, slice);
+            return GRPC_ERROR_REF(p->error);
+        }
+        if (++cur == end) {
+          p->state = GRPC_CHTTP2_DATA_FH_1;
+          grpc_slice_unref_internal(exec_ctx, slice);
+          continue;
+        }
+      /* fallthrough */
+      case GRPC_CHTTP2_DATA_FH_1:
+        p->frame_size = ((uint32_t)*cur) << 24;
+        if (++cur == end) {
+          p->state = GRPC_CHTTP2_DATA_FH_2;
+          grpc_slice_unref_internal(exec_ctx, slice);
+          continue;
+        }
+      /* fallthrough */
+      case GRPC_CHTTP2_DATA_FH_2:
+        p->frame_size |= ((uint32_t)*cur) << 16;
+        if (++cur == end) {
+          p->state = GRPC_CHTTP2_DATA_FH_3;
+          grpc_slice_unref_internal(exec_ctx, slice);
+          continue;
+        }
+      /* fallthrough */
+      case GRPC_CHTTP2_DATA_FH_3:
+        p->frame_size |= ((uint32_t)*cur) << 8;
+        if (++cur == end) {
+          p->state = GRPC_CHTTP2_DATA_FH_4;
+          grpc_slice_unref_internal(exec_ctx, slice);
+          continue;
+        }
+      /* fallthrough */
+      case GRPC_CHTTP2_DATA_FH_4:
+        GPR_ASSERT(stream_out != NULL);
+        GPR_ASSERT(p->parsing_frame == NULL);
+        p->frame_size |= ((uint32_t)*cur);
+        p->state = GRPC_CHTTP2_DATA_FRAME;
+        ++cur;
+        message_flags = 0;
+        if (p->is_frame_compressed) {
+          message_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
+        }
+        p->parsing_frame = grpc_chttp2_incoming_byte_stream_create(
+            exec_ctx, t, s, p->frame_size, message_flags);
+        *stream_out = &p->parsing_frame->base;
+        if (p->parsing_frame->remaining_bytes == 0) {
+          GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished(
+              exec_ctx, p->parsing_frame, GRPC_ERROR_NONE, true));
+          p->parsing_frame = NULL;
+          p->state = GRPC_CHTTP2_DATA_FH_0;
+        }
+        s->pending_byte_stream = true;
+
+        if (cur != end) {
+          grpc_slice_buffer_undo_take_first(
+              &s->unprocessed_incoming_frames_buffer,
+              grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
+        }
+        grpc_slice_unref_internal(exec_ctx, slice);
         return GRPC_ERROR_NONE;
+      case GRPC_CHTTP2_DATA_FRAME: {
+        GPR_ASSERT(p->parsing_frame != NULL);
+        GPR_ASSERT(slice_out != NULL);
+        if (cur == end) {
+          grpc_slice_unref_internal(exec_ctx, slice);
+          continue;
+        }
+        uint32_t remaining = (uint32_t)(end - cur);
+        if (remaining == p->frame_size) {
+          if (GRPC_ERROR_NONE != (error = grpc_chttp2_incoming_byte_stream_push(
+                                      exec_ctx, p->parsing_frame,
+                                      grpc_slice_sub(slice, (size_t)(cur - beg),
+                                                     (size_t)(end - beg)),
+                                      slice_out))) {
+            grpc_slice_unref_internal(exec_ctx, slice);
+            return error;
+          }
+          if (GRPC_ERROR_NONE !=
+              (error = grpc_chttp2_incoming_byte_stream_finished(
+                   exec_ctx, p->parsing_frame, GRPC_ERROR_NONE, true))) {
+            grpc_slice_unref_internal(exec_ctx, slice);
+            return error;
+          }
+          p->parsing_frame = NULL;
+          p->state = GRPC_CHTTP2_DATA_FH_0;
+          grpc_slice_unref_internal(exec_ctx, slice);
+          return GRPC_ERROR_NONE;
+        } else if (remaining < p->frame_size) {
+          if (GRPC_ERROR_NONE != (error = grpc_chttp2_incoming_byte_stream_push(
+                                      exec_ctx, p->parsing_frame,
+                                      grpc_slice_sub(slice, (size_t)(cur - beg),
+                                                     (size_t)(end - beg)),
+                                      slice_out))) {
+            return error;
+          }
+          p->frame_size -= remaining;
+          grpc_slice_unref_internal(exec_ctx, slice);
+          return GRPC_ERROR_NONE;
+        } else {
+          GPR_ASSERT(remaining > p->frame_size);
+          if (GRPC_ERROR_NONE !=
+              (grpc_chttp2_incoming_byte_stream_push(
+                  exec_ctx, p->parsing_frame,
+                  grpc_slice_sub(slice, (size_t)(cur - beg),
+                                 (size_t)(cur + p->frame_size - beg)),
+                  slice_out))) {
+            grpc_slice_unref_internal(exec_ctx, slice);
+            return error;
+          }
+          if (GRPC_ERROR_NONE !=
+              (error = grpc_chttp2_incoming_byte_stream_finished(
+                   exec_ctx, p->parsing_frame, GRPC_ERROR_NONE, true))) {
+            grpc_slice_unref_internal(exec_ctx, slice);
+            return error;
+          }
+          p->parsing_frame = NULL;
+          p->state = GRPC_CHTTP2_DATA_FH_0;
+          cur += p->frame_size;
+          grpc_slice_buffer_undo_take_first(
+              &s->unprocessed_incoming_frames_buffer,
+              grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
+          grpc_slice_unref_internal(exec_ctx, slice);
+          return GRPC_ERROR_NONE;
+        }
       }
+    }
   }
 
-  GPR_UNREACHABLE_CODE(
-      return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Should never reach here"));
+  return GRPC_ERROR_NONE;
 }
 
 grpc_error *grpc_chttp2_data_parser_parse(grpc_exec_ctx *exec_ctx, void *parser,
                                           grpc_chttp2_transport *t,
                                           grpc_chttp2_stream *s,
                                           grpc_slice slice, int is_last) {
-  grpc_chttp2_data_parser *p = parser;
-  grpc_error *error = parse_inner(exec_ctx, p, t, s, slice);
+  /* grpc_error *error = parse_inner_buffer(exec_ctx, p, t, s, slice); */
+  s->stats.incoming.framing_bytes += GRPC_SLICE_LENGTH(slice);
+  if (!s->pending_byte_stream) {
+    grpc_slice_ref_internal(slice);
+    grpc_slice_buffer_add(&s->frame_storage, slice);
+    grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
+  } else if (s->on_next) {
+    GPR_ASSERT(s->frame_storage.length == 0);
+    grpc_slice_ref_internal(slice);
+    grpc_slice_buffer_add(&s->unprocessed_incoming_frames_buffer, slice);
+    grpc_closure_sched(exec_ctx, s->on_next, GRPC_ERROR_NONE);
+    s->on_next = NULL;
+  } else {
+    grpc_slice_ref_internal(slice);
+    grpc_slice_buffer_add(&s->frame_storage, slice);
+  }
 
-  if (is_last && p->is_last_frame) {
+  if (is_last && s->received_last_frame) {
     grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, false,
                                    GRPC_ERROR_NONE);
   }
 
-  return error;
+  return GRPC_ERROR_NONE;
 }

+ 10 - 14
src/core/ext/transport/chttp2/transport/frame_data.h

@@ -56,28 +56,16 @@ typedef enum {
 typedef struct grpc_chttp2_incoming_byte_stream
     grpc_chttp2_incoming_byte_stream;
 
-typedef struct grpc_chttp2_incoming_frame_queue {
-  grpc_chttp2_incoming_byte_stream *head;
-  grpc_chttp2_incoming_byte_stream *tail;
-} grpc_chttp2_incoming_frame_queue;
-
 typedef struct {
   grpc_chttp2_stream_state state;
-  uint8_t is_last_frame;
   uint8_t frame_type;
   uint32_t frame_size;
   grpc_error *error;
 
-  int is_frame_compressed;
+  bool is_frame_compressed;
   grpc_chttp2_incoming_byte_stream *parsing_frame;
 } grpc_chttp2_data_parser;
 
-void grpc_chttp2_incoming_frame_queue_merge(
-    grpc_chttp2_incoming_frame_queue *head_dst,
-    grpc_chttp2_incoming_frame_queue *tail_src);
-grpc_byte_stream *grpc_chttp2_incoming_frame_queue_pop(
-    grpc_chttp2_incoming_frame_queue *q);
-
 /* initialize per-stream state for data frame parsing */
 grpc_error *grpc_chttp2_data_parser_init(grpc_chttp2_data_parser *parser);
 
@@ -87,7 +75,8 @@ void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx,
 /* start processing a new data frame */
 grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser,
                                                 uint8_t flags,
-                                                uint32_t stream_id);
+                                                uint32_t stream_id,
+                                                grpc_chttp2_stream *s);
 
 /* handle a slice of a data frame - is_last indicates the last slice of a
    frame */
@@ -101,4 +90,11 @@ void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer *inbuf,
                              grpc_transport_one_way_stats *stats,
                              grpc_slice_buffer *outbuf);
 
+grpc_error *deframe_unprocessed_incoming_frames(grpc_exec_ctx *exec_ctx,
+                                                grpc_chttp2_data_parser *p,
+                                                grpc_chttp2_stream *s,
+                                                grpc_slice_buffer *slices,
+                                                grpc_slice *slice_out,
+                                                grpc_byte_stream **stream_out);
+
 #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_DATA_H */

+ 31 - 20
src/core/ext/transport/chttp2/transport/internal.h

@@ -195,22 +195,20 @@ typedef struct grpc_chttp2_write_cb {
 struct grpc_chttp2_incoming_byte_stream {
   grpc_byte_stream base;
   gpr_refcount refs;
-  struct grpc_chttp2_incoming_byte_stream *next_message;
-  grpc_error *error;
 
-  grpc_chttp2_transport *transport;
-  grpc_chttp2_stream *stream;
-  bool is_tail;
+  grpc_chttp2_transport *transport; /* immutable */
+  grpc_chttp2_stream *stream;       /* immutable */
 
-  gpr_mu slice_mu;  // protects slices, on_next
-  grpc_slice_buffer slices;
-  grpc_closure *on_next;
-  grpc_slice *next;
+  /* Accessed only by transport thread when stream->pending_byte_stream == false
+   * Accessed only by application thread when stream->pending_byte_stream ==
+   * true */
   uint32_t remaining_bytes;
 
+  /* Accessed only by transport thread when stream->pending_byte_stream == false
+   * Accessed only by application thread when stream->pending_byte_stream ==
+   * true */
   struct {
     grpc_closure closure;
-    grpc_slice *slice;
     size_t max_size_hint;
     grpc_closure *on_complete;
   } next_action;
@@ -445,8 +443,8 @@ struct grpc_chttp2_stream {
   uint32_t id;
 
   /** window available for us to send to peer, over or under the initial window
-    * size of the transport... ie:
-    * outgoing_window = outgoing_window_delta + transport.initial_window_size */
+   * size of the transport... ie:
+   * outgoing_window = outgoing_window_delta + transport.initial_window_size */
   int64_t outgoing_window_delta;
   /** things the upper layers would like to send */
   grpc_metadata_batch *send_initial_metadata;
@@ -473,9 +471,6 @@ struct grpc_chttp2_stream {
   grpc_transport_stream_stats *collecting_stats;
   grpc_transport_stream_stats stats;
 
-  /** number of streams that are currently being read */
-  gpr_refcount active_streams;
-
   /** Is this stream closed for writing. */
   bool write_closed;
   /** Is this stream reading half-closed. */
@@ -499,7 +494,17 @@ struct grpc_chttp2_stream {
 
   grpc_chttp2_incoming_metadata_buffer metadata_buffer[2];
 
-  grpc_chttp2_incoming_frame_queue incoming_frames;
+  grpc_slice_buffer frame_storage; /* protected by t combiner */
+
+  /* Accessed only by transport thread when stream->pending_byte_stream == false
+   * Accessed only by application thread when stream->pending_byte_stream ==
+   * true */
+  grpc_slice_buffer unprocessed_incoming_frames_buffer;
+  grpc_closure *on_next;    /* protected by t combiner */
+  bool pending_byte_stream; /* protected by t combiner */
+  grpc_closure reset_byte_stream;
+  grpc_error *byte_stream_error; /* protected by t combiner */
+  bool received_last_frame;      /* protected by t combiner */
 
   gpr_timespec deadline;
 
@@ -512,6 +517,9 @@ struct grpc_chttp2_stream {
    * incoming_window = incoming_window_delta + transport.initial_window_size */
   int64_t incoming_window_delta;
   /** parsing state for data frames */
+  /* Accessed only by transport thread when stream->pending_byte_stream == false
+   * Accessed only by application thread when stream->pending_byte_stream ==
+   * true */
   grpc_chttp2_data_parser data_parser;
   /** number of bytes received - reset at end of parse thread execution */
   int64_t received_bytes;
@@ -790,10 +798,13 @@ void grpc_chttp2_ref_transport(grpc_chttp2_transport *t);
 grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
     grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s,
     uint32_t frame_size, uint32_t flags);
-void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
-                                           grpc_chttp2_incoming_byte_stream *bs,
-                                           grpc_slice slice);
-void grpc_chttp2_incoming_byte_stream_finished(
+grpc_error *grpc_chttp2_incoming_byte_stream_push(
+    grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
+    grpc_slice slice, grpc_slice *slice_out);
+grpc_error *grpc_chttp2_incoming_byte_stream_finished(
+    grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
+    grpc_error *error, bool reset_on_error);
+void grpc_chttp2_incoming_byte_stream_notify(
     grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
     grpc_error *error);
 

+ 3 - 2
src/core/ext/transport/chttp2/transport/parsing.c

@@ -458,12 +458,13 @@ static grpc_error *init_data_frame_parser(grpc_exec_ctx *exec_ctx,
     return init_skip_frame_parser(exec_ctx, t, 0);
   }
   if (err == GRPC_ERROR_NONE) {
-    err = grpc_chttp2_data_parser_begin_frame(&s->data_parser,
-                                              t->incoming_frame_flags, s->id);
+    err = grpc_chttp2_data_parser_begin_frame(
+        &s->data_parser, t->incoming_frame_flags, s->id, s);
   }
 error_handler:
   if (err == GRPC_ERROR_NONE) {
     t->incoming_stream = s;
+    /* t->parser = grpc_chttp2_data_parser_parse;*/
     t->parser = grpc_chttp2_data_parser_parse;
     t->parser_data = &s->data_parser;
     return GRPC_ERROR_NONE;

+ 14 - 3
src/core/ext/transport/cronet/transport/cronet_transport.c

@@ -973,9 +973,20 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
       grpc_slice_buffer write_slice_buffer;
       grpc_slice slice;
       grpc_slice_buffer_init(&write_slice_buffer);
-      grpc_byte_stream_next(
-          NULL, stream_op->payload->send_message.send_message, &slice,
-          stream_op->payload->send_message.send_message->length, NULL);
+      if (1 != grpc_byte_stream_next(
+                   exec_ctx, stream_op->payload->send_message.send_message,
+                   stream_op->payload->send_message.send_message->length,
+                   NULL)) {
+        /* Should never reach here */
+        GPR_ASSERT(false);
+      }
+      if (GRPC_ERROR_NONE !=
+          grpc_byte_stream_pull(exec_ctx,
+                                stream_op->payload->send_message.send_message,
+                                &slice)) {
+        /* Should never reach here */
+        GPR_ASSERT(false);
+      }
       grpc_slice_buffer_add(&write_slice_buffer, slice);
       if (write_slice_buffer.count != 1) {
         /* Empty request not handled yet */

+ 12 - 2
src/core/lib/channel/compress_filter.c

@@ -221,6 +221,13 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
 static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
   grpc_call_element *elem = elemp;
   call_data *calld = elem->call_data;
+  if (GRPC_ERROR_NONE !=
+      grpc_byte_stream_pull(exec_ctx,
+                            calld->send_op->payload->send_message.send_message,
+                            &calld->incoming_slice)) {
+    /* Should never reach here */
+    abort();
+  }
   grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
   if (calld->send_length == calld->slices.length) {
     finish_send_message(exec_ctx, elem);
@@ -233,8 +240,11 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx,
                                   grpc_call_element *elem) {
   call_data *calld = elem->call_data;
   while (grpc_byte_stream_next(
-      exec_ctx, calld->send_op->payload->send_message.send_message,
-      &calld->incoming_slice, ~(size_t)0, &calld->got_slice)) {
+      exec_ctx, calld->send_op->payload->send_message.send_message, ~(size_t)0,
+      &calld->got_slice)) {
+    grpc_byte_stream_pull(exec_ctx,
+                          calld->send_op->payload->send_message.send_message,
+                          &calld->incoming_slice);
     grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
     if (calld->send_length == calld->slices.length) {
       finish_send_message(exec_ctx, elem);

+ 12 - 2
src/core/lib/channel/http_client_filter.c

@@ -220,8 +220,11 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx,
   call_data *calld = elem->call_data;
   uint8_t *wrptr = calld->payload_bytes;
   while (grpc_byte_stream_next(
-      exec_ctx, calld->send_op->payload->send_message.send_message,
-      &calld->incoming_slice, ~(size_t)0, &calld->got_slice)) {
+      exec_ctx, calld->send_op->payload->send_message.send_message, ~(size_t)0,
+      &calld->got_slice)) {
+    grpc_byte_stream_pull(exec_ctx,
+                          calld->send_op->payload->send_message.send_message,
+                          &calld->incoming_slice);
     memcpy(wrptr, GRPC_SLICE_START_PTR(calld->incoming_slice),
            GRPC_SLICE_LENGTH(calld->incoming_slice));
     wrptr += GRPC_SLICE_LENGTH(calld->incoming_slice);
@@ -237,6 +240,13 @@ static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
   grpc_call_element *elem = elemp;
   call_data *calld = elem->call_data;
   calld->send_message_blocked = false;
+  if (GRPC_ERROR_NONE !=
+      grpc_byte_stream_pull(exec_ctx,
+                            calld->send_op->payload->send_message.send_message,
+                            &calld->incoming_slice)) {
+    /* Should never reach here */
+    abort();
+  }
   grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
   if (calld->send_length == calld->slices.length) {
     /* Pass down the original send_message op that was blocked.*/

+ 36 - 8
src/core/lib/surface/call.c

@@ -1187,6 +1187,7 @@ static void finish_batch_step(grpc_exec_ctx *exec_ctx, batch_control *bctl) {
 
 static void continue_receiving_slices(grpc_exec_ctx *exec_ctx,
                                       batch_control *bctl) {
+  grpc_error *error;
   grpc_call *call = bctl->call;
   for (;;) {
     size_t remaining = call->receiving_stream->length -
@@ -1198,11 +1199,22 @@ static void continue_receiving_slices(grpc_exec_ctx *exec_ctx,
       finish_batch_step(exec_ctx, bctl);
       return;
     }
-    if (grpc_byte_stream_next(exec_ctx, call->receiving_stream,
-                              &call->receiving_slice, remaining,
+    if (grpc_byte_stream_next(exec_ctx, call->receiving_stream, remaining,
                               &call->receiving_slice_ready)) {
-      grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
-                            call->receiving_slice);
+      error = grpc_byte_stream_pull(exec_ctx, call->receiving_stream,
+                                    &call->receiving_slice);
+      if (error == GRPC_ERROR_NONE) {
+        grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
+                              call->receiving_slice);
+      } else {
+        grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
+        call->receiving_stream = NULL;
+        grpc_byte_buffer_destroy(*call->receiving_buffer);
+        *call->receiving_buffer = NULL;
+        call->receiving_message = 0;
+        finish_batch_step(exec_ctx, bctl);
+        return;
+      }
     } else {
       return;
     }
@@ -1213,12 +1225,24 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
                                   grpc_error *error) {
   batch_control *bctl = bctlp;
   grpc_call *call = bctl->call;
+  grpc_byte_stream *bs = call->receiving_stream;
+  bool release_error = false;
 
   if (error == GRPC_ERROR_NONE) {
-    grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
-                          call->receiving_slice);
-    continue_receiving_slices(exec_ctx, bctl);
-  } else {
+    grpc_slice slice;
+    error = grpc_byte_stream_pull(exec_ctx, bs, &slice);
+    if (error == GRPC_ERROR_NONE) {
+      grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
+                            slice);
+      continue_receiving_slices(exec_ctx, bctl);
+    } else {
+      /* Error returned by grpc_byte_stream_pull needs to be released manually
+       */
+      release_error = true;
+    }
+  }
+
+  if (error != GRPC_ERROR_NONE) {
     if (grpc_trace_operation_failures) {
       GRPC_LOG_IF_ERROR("receiving_slice_ready", GRPC_ERROR_REF(error));
     }
@@ -1226,7 +1250,11 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
     call->receiving_stream = NULL;
     grpc_byte_buffer_destroy(*call->receiving_buffer);
     *call->receiving_buffer = NULL;
+    call->receiving_message = 0;
     finish_batch_step(exec_ctx, bctl);
+    if (release_error) {
+      GRPC_ERROR_UNREF(error);
+    }
   }
 }
 

+ 23 - 9
src/core/lib/transport/byte_stream.c

@@ -40,10 +40,15 @@
 #include "src/core/lib/slice/slice_internal.h"
 
 int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx,
-                          grpc_byte_stream *byte_stream, grpc_slice *slice,
-                          size_t max_size_hint, grpc_closure *on_complete) {
-  return byte_stream->next(exec_ctx, byte_stream, slice, max_size_hint,
-                           on_complete);
+                          grpc_byte_stream *byte_stream, size_t max_size_hint,
+                          grpc_closure *on_complete) {
+  return byte_stream->next(exec_ctx, byte_stream, max_size_hint, on_complete);
+}
+
+grpc_error *grpc_byte_stream_pull(grpc_exec_ctx *exec_ctx,
+                                  grpc_byte_stream *byte_stream,
+                                  grpc_slice *slice) {
+  return byte_stream->pull(exec_ctx, byte_stream, slice);
 }
 
 void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
@@ -53,16 +58,24 @@ void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
 
 /* slice_buffer_stream */
 
-static int slice_buffer_stream_next(grpc_exec_ctx *exec_ctx,
-                                    grpc_byte_stream *byte_stream,
-                                    grpc_slice *slice, size_t max_size_hint,
-                                    grpc_closure *on_complete) {
+static bool slice_buffer_stream_next(grpc_exec_ctx *exec_ctx,
+                                     grpc_byte_stream *byte_stream,
+                                     size_t max_size_hint,
+                                     grpc_closure *on_complete) {
+  grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream;
+  GPR_ASSERT(stream->cursor < stream->backing_buffer->count);
+  return true;
+}
+
+static grpc_error *slice_buffer_stream_pull(grpc_exec_ctx *exec_ctx,
+                                            grpc_byte_stream *byte_stream,
+                                            grpc_slice *slice) {
   grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream;
   GPR_ASSERT(stream->cursor < stream->backing_buffer->count);
   *slice =
       grpc_slice_ref_internal(stream->backing_buffer->slices[stream->cursor]);
   stream->cursor++;
-  return 1;
+  return GRPC_ERROR_NONE;
 }
 
 static void slice_buffer_stream_destroy(grpc_exec_ctx *exec_ctx,
@@ -75,6 +88,7 @@ void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream,
   stream->base.length = (uint32_t)slice_buffer->length;
   stream->base.flags = flags;
   stream->base.next = slice_buffer_stream_next;
+  stream->base.pull = slice_buffer_stream_pull;
   stream->base.destroy = slice_buffer_stream_destroy;
   stream->backing_buffer = slice_buffer;
   stream->cursor = 0;

+ 15 - 6
src/core/lib/transport/byte_stream.h

@@ -49,9 +49,10 @@ typedef struct grpc_byte_stream grpc_byte_stream;
 struct grpc_byte_stream {
   uint32_t length;
   uint32_t flags;
-  int (*next)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream,
-              grpc_slice *slice, size_t max_size_hint,
-              grpc_closure *on_complete);
+  bool (*next)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream,
+               size_t max_size_hint, grpc_closure *on_complete);
+  grpc_error *(*pull)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream,
+                      grpc_slice *slice);
   void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream);
 };
 
@@ -61,12 +62,20 @@ struct grpc_byte_stream {
  *
  * max_size_hint can be set as a hint as to the maximum number
  * of bytes that would be acceptable to read.
+ */
+int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx,
+                          grpc_byte_stream *byte_stream, size_t max_size_hint,
+                          grpc_closure *on_complete);
+
+/* returns the next slice in the byte stream when it is ready (indicated by
+ * either grpc_byte_stream_next returning 1 or on_complete passed to
+ * grpc_byte_stream_next is called).
  *
  * once a slice is returned into *slice, it is owned by the caller.
  */
-int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx,
-                          grpc_byte_stream *byte_stream, grpc_slice *slice,
-                          size_t max_size_hint, grpc_closure *on_complete);
+grpc_error *grpc_byte_stream_pull(grpc_exec_ctx *exec_ctx,
+                                  grpc_byte_stream *byte_stream,
+                                  grpc_slice *slice);
 
 void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
                               grpc_byte_stream *byte_stream);

+ 7 - 2
test/cpp/microbenchmarks/bm_chttp2_transport.cc

@@ -569,12 +569,17 @@ static void BM_TransportStreamRecv(benchmark::State &state) {
         grpc_closure_sched(exec_ctx, c.get(), GRPC_ERROR_NONE);
         return;
       }
-    } while (grpc_byte_stream_next(exec_ctx, recv_stream, &recv_slice,
+    } while (grpc_byte_stream_next(exec_ctx, recv_stream,
                                    recv_stream->length - received,
-                                   drain_continue.get()));
+                                   drain_continue.get()) &&
+             GRPC_ERROR_NONE ==
+                 grpc_byte_stream_pull(exec_ctx, recv_stream, &recv_slice) &&
+             (received += GRPC_SLICE_LENGTH(recv_slice),
+              grpc_slice_unref_internal(exec_ctx, recv_slice), true));
   });
 
   drain_continue = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) {
+    grpc_byte_stream_pull(exec_ctx, recv_stream, &recv_slice);
     received += GRPC_SLICE_LENGTH(recv_slice);
     grpc_slice_unref_internal(exec_ctx, recv_slice);
     grpc_closure_run(exec_ctx, drain.get(), GRPC_ERROR_NONE);