|  | @@ -44,6 +44,7 @@
 | 
	
		
			
				|  |  |  #include <grpc/support/string_util.h>
 | 
	
		
			
				|  |  |  #include <grpc/support/useful.h>
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +#include "src/core/ext/transport/chttp2/transport/frame_data.h"
 | 
	
		
			
				|  |  |  #include "src/core/ext/transport/chttp2/transport/internal.h"
 | 
	
		
			
				|  |  |  #include "src/core/ext/transport/chttp2/transport/varint.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/channel/channel_args.h"
 | 
	
	
		
			
				|  | @@ -129,6 +130,11 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |  static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |                                                  void *byte_stream,
 | 
	
		
			
				|  |  |                                                  grpc_error *error_ignored);
 | 
	
		
			
				|  |  | +static void incoming_byte_stream_publish_error(
 | 
	
		
			
				|  |  | +    grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
 | 
	
		
			
				|  |  | +    grpc_error *error);
 | 
	
		
			
				|  |  | +static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  | +                                       grpc_chttp2_incoming_byte_stream *bs);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t,
 | 
	
		
			
				|  |  |                                      grpc_error *error);
 | 
	
	
		
			
				|  | @@ -174,6 +180,9 @@ static void finish_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
 | 
	
		
			
				|  |  |  static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg,
 | 
	
		
			
				|  |  |                                              grpc_error *error);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +static void reset_byte_stream(grpc_exec_ctx *exec_ctx, void *arg,
 | 
	
		
			
				|  |  | +                              grpc_error *error);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  /*******************************************************************************
 | 
	
		
			
				|  |  |   * CONSTRUCTION/DESTRUCTION/REFCOUNTING
 | 
	
		
			
				|  |  |   */
 | 
	
	
		
			
				|  | @@ -656,7 +665,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
 | 
	
		
			
				|  |  |    /* We reserve one 'active stream' that's dropped when the stream is
 | 
	
		
			
				|  |  |       read-closed. The others are for incoming_byte_streams that are actively
 | 
	
		
			
				|  |  |       reading */
 | 
	
		
			
				|  |  | -  gpr_ref_init(&s->active_streams, 1);
 | 
	
		
			
				|  |  |    GRPC_CHTTP2_STREAM_REF(s, "chttp2");
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0], arena);
 | 
	
	
		
			
				|  | @@ -666,6 +674,11 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
 | 
	
		
			
				|  |  |    s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
 | 
	
		
			
				|  |  |    grpc_closure_init(&s->complete_fetch_locked, complete_fetch_locked, s,
 | 
	
		
			
				|  |  |                      grpc_schedule_on_exec_ctx);
 | 
	
		
			
				|  |  | +  grpc_slice_buffer_init(&s->unprocessed_incoming_frames_buffer);
 | 
	
		
			
				|  |  | +  grpc_slice_buffer_init(&s->frame_storage);
 | 
	
		
			
				|  |  | +  s->pending_byte_stream = false;
 | 
	
		
			
				|  |  | +  grpc_closure_init(&s->reset_byte_stream, reset_byte_stream, s,
 | 
	
		
			
				|  |  | +                    grpc_combiner_scheduler(t->combiner, false));
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    GRPC_CHTTP2_REF_TRANSPORT(t, "stream");
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -683,7 +696,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
 | 
	
		
			
				|  |  |                                    grpc_error *error) {
 | 
	
		
			
				|  |  | -  grpc_byte_stream *bs;
 | 
	
		
			
				|  |  |    grpc_chttp2_stream *s = sp;
 | 
	
		
			
				|  |  |    grpc_chttp2_transport *t = s->t;
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -694,9 +706,9 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
 | 
	
		
			
				|  |  |      GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, s->id) == NULL);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  while ((bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames))) {
 | 
	
		
			
				|  |  | -    incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | +  grpc_slice_buffer_destroy_internal(exec_ctx,
 | 
	
		
			
				|  |  | +                                     &s->unprocessed_incoming_frames_buffer);
 | 
	
		
			
				|  |  | +  grpc_slice_buffer_destroy_internal(exec_ctx, &s->frame_storage);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    grpc_chttp2_list_remove_stalled_by_transport(t, s);
 | 
	
		
			
				|  |  |    grpc_chttp2_list_remove_stalled_by_stream(t, s);
 | 
	
	
		
			
				|  | @@ -723,6 +735,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
 | 
	
		
			
				|  |  |    grpc_slice_buffer_destroy_internal(exec_ctx, &s->flow_controlled_buffer);
 | 
	
		
			
				|  |  |    GRPC_ERROR_UNREF(s->read_closed_error);
 | 
	
		
			
				|  |  |    GRPC_ERROR_UNREF(s->write_closed_error);
 | 
	
		
			
				|  |  | +  GRPC_ERROR_UNREF(s->byte_stream_error);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    if (s->incoming_window_delta > 0) {
 | 
	
		
			
				|  |  |      GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA(
 | 
	
	
		
			
				|  | @@ -1176,8 +1189,9 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |        s->fetching_send_message = NULL;
 | 
	
		
			
				|  |  |        return; /* early out */
 | 
	
		
			
				|  |  |      } else if (grpc_byte_stream_next(exec_ctx, s->fetching_send_message,
 | 
	
		
			
				|  |  | -                                     &s->fetching_slice, UINT32_MAX,
 | 
	
		
			
				|  |  | -                                     &s->complete_fetch_locked)) {
 | 
	
		
			
				|  |  | +                                     UINT32_MAX, &s->complete_fetch_locked)) {
 | 
	
		
			
				|  |  | +      grpc_byte_stream_pull(exec_ctx, s->fetching_send_message,
 | 
	
		
			
				|  |  | +                            &s->fetching_slice);
 | 
	
		
			
				|  |  |        add_fetched_slice_locked(exec_ctx, t, s);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |    }
 | 
	
	
		
			
				|  | @@ -1188,9 +1202,15 @@ static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs,
 | 
	
		
			
				|  |  |    grpc_chttp2_stream *s = gs;
 | 
	
		
			
				|  |  |    grpc_chttp2_transport *t = s->t;
 | 
	
		
			
				|  |  |    if (error == GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | -    add_fetched_slice_locked(exec_ctx, t, s);
 | 
	
		
			
				|  |  | -    continue_fetching_send_locked(exec_ctx, t, s);
 | 
	
		
			
				|  |  | -  } else {
 | 
	
		
			
				|  |  | +    error = grpc_byte_stream_pull(exec_ctx, s->fetching_send_message,
 | 
	
		
			
				|  |  | +                                  &s->fetching_slice);
 | 
	
		
			
				|  |  | +    if (error == GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | +      add_fetched_slice_locked(exec_ctx, t, s);
 | 
	
		
			
				|  |  | +      continue_fetching_send_locked(exec_ctx, t, s);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  if (error != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  |      /* TODO(ctiller): what to do here */
 | 
	
		
			
				|  |  |      abort();
 | 
	
		
			
				|  |  |    }
 | 
	
	
		
			
				|  | @@ -1422,12 +1442,20 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    if (op->recv_message) {
 | 
	
		
			
				|  |  | +    size_t already_received;
 | 
	
		
			
				|  |  |      GPR_ASSERT(s->recv_message_ready == NULL);
 | 
	
		
			
				|  |  | +    GPR_ASSERT(!s->pending_byte_stream);
 | 
	
		
			
				|  |  |      s->recv_message_ready = op_payload->recv_message.recv_message_ready;
 | 
	
		
			
				|  |  |      s->recv_message = op_payload->recv_message.recv_message;
 | 
	
		
			
				|  |  | -    if (s->id != 0 &&
 | 
	
		
			
				|  |  | -        (s->incoming_frames.head == NULL || s->incoming_frames.head->is_tail)) {
 | 
	
		
			
				|  |  | -      incoming_byte_stream_update_flow_control(exec_ctx, t, s, 5, 0);
 | 
	
		
			
				|  |  | +    if (s->id != 0) {
 | 
	
		
			
				|  |  | +      if (s->pending_byte_stream) {
 | 
	
		
			
				|  |  | +        already_received = s->frame_storage.length;
 | 
	
		
			
				|  |  | +      } else {
 | 
	
		
			
				|  |  | +        already_received = s->frame_storage.length +
 | 
	
		
			
				|  |  | +                           s->unprocessed_incoming_frames_buffer.length;
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +      incoming_byte_stream_update_flow_control(exec_ctx, t, s, 5,
 | 
	
		
			
				|  |  | +                                               already_received);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |      grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
 | 
	
		
			
				|  |  |    }
 | 
	
	
		
			
				|  | @@ -1615,13 +1643,13 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
 | 
	
		
			
				|  |  |  void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |                                                        grpc_chttp2_transport *t,
 | 
	
		
			
				|  |  |                                                        grpc_chttp2_stream *s) {
 | 
	
		
			
				|  |  | -  grpc_byte_stream *bs;
 | 
	
		
			
				|  |  |    if (s->recv_initial_metadata_ready != NULL &&
 | 
	
		
			
				|  |  |        s->published_metadata[0] != GRPC_METADATA_NOT_PUBLISHED) {
 | 
	
		
			
				|  |  |      if (s->seen_error) {
 | 
	
		
			
				|  |  | -      while ((bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames)) !=
 | 
	
		
			
				|  |  | -             NULL) {
 | 
	
		
			
				|  |  | -        incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | +      grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage);
 | 
	
		
			
				|  |  | +      if (!s->pending_byte_stream) {
 | 
	
		
			
				|  |  | +        grpc_slice_buffer_reset_and_unref_internal(
 | 
	
		
			
				|  |  | +            exec_ctx, &s->unprocessed_incoming_frames_buffer);
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |      grpc_chttp2_incoming_metadata_buffer_publish(
 | 
	
	
		
			
				|  | @@ -1634,39 +1662,65 @@ void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |  void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |                                               grpc_chttp2_transport *t,
 | 
	
		
			
				|  |  |                                               grpc_chttp2_stream *s) {
 | 
	
		
			
				|  |  | -  grpc_byte_stream *bs;
 | 
	
		
			
				|  |  | +  grpc_error *error = GRPC_ERROR_NONE;
 | 
	
		
			
				|  |  |    if (s->recv_message_ready != NULL) {
 | 
	
		
			
				|  |  | -    while (s->final_metadata_requested && s->seen_error &&
 | 
	
		
			
				|  |  | -           (bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames)) !=
 | 
	
		
			
				|  |  | -               NULL) {
 | 
	
		
			
				|  |  | -      incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | +    *s->recv_message = NULL;
 | 
	
		
			
				|  |  | +    if (s->final_metadata_requested && s->seen_error) {
 | 
	
		
			
				|  |  | +      grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage);
 | 
	
		
			
				|  |  | +      if (!s->pending_byte_stream) {
 | 
	
		
			
				|  |  | +        grpc_slice_buffer_reset_and_unref_internal(
 | 
	
		
			
				|  |  | +            exec_ctx, &s->unprocessed_incoming_frames_buffer);
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -    if (s->incoming_frames.head != NULL) {
 | 
	
		
			
				|  |  | -      *s->recv_message =
 | 
	
		
			
				|  |  | -          grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames);
 | 
	
		
			
				|  |  | -      GPR_ASSERT(*s->recv_message != NULL);
 | 
	
		
			
				|  |  | +    if (!s->pending_byte_stream) {
 | 
	
		
			
				|  |  | +      while (s->unprocessed_incoming_frames_buffer.length > 0 ||
 | 
	
		
			
				|  |  | +             s->frame_storage.length > 0) {
 | 
	
		
			
				|  |  | +        if (s->unprocessed_incoming_frames_buffer.length == 0) {
 | 
	
		
			
				|  |  | +          grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer,
 | 
	
		
			
				|  |  | +                                 &s->frame_storage);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        error = deframe_unprocessed_incoming_frames(
 | 
	
		
			
				|  |  | +            exec_ctx, &s->data_parser, s,
 | 
	
		
			
				|  |  | +            &s->unprocessed_incoming_frames_buffer, NULL, s->recv_message);
 | 
	
		
			
				|  |  | +        if (error != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | +          s->seen_error = true;
 | 
	
		
			
				|  |  | +          grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
 | 
	
		
			
				|  |  | +                                                     &s->frame_storage);
 | 
	
		
			
				|  |  | +          grpc_slice_buffer_reset_and_unref_internal(
 | 
	
		
			
				|  |  | +              exec_ctx, &s->unprocessed_incoming_frames_buffer);
 | 
	
		
			
				|  |  | +          break;
 | 
	
		
			
				|  |  | +        } else if (*s->recv_message != NULL) {
 | 
	
		
			
				|  |  | +          break;
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    if (error == GRPC_ERROR_NONE && *s->recv_message != NULL) {
 | 
	
		
			
				|  |  |        null_then_run_closure(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  |      } else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) {
 | 
	
		
			
				|  |  |        *s->recv_message = NULL;
 | 
	
		
			
				|  |  |        null_then_run_closure(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | +    GRPC_ERROR_UNREF(error);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |                                                         grpc_chttp2_transport *t,
 | 
	
		
			
				|  |  |                                                         grpc_chttp2_stream *s) {
 | 
	
		
			
				|  |  | -  grpc_byte_stream *bs;
 | 
	
		
			
				|  |  |    grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
 | 
	
		
			
				|  |  |    if (s->recv_trailing_metadata_finished != NULL && s->read_closed &&
 | 
	
		
			
				|  |  |        s->write_closed) {
 | 
	
		
			
				|  |  |      if (s->seen_error) {
 | 
	
		
			
				|  |  | -      while ((bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames)) !=
 | 
	
		
			
				|  |  | -             NULL) {
 | 
	
		
			
				|  |  | -        incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | +      grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage);
 | 
	
		
			
				|  |  | +      if (!s->pending_byte_stream) {
 | 
	
		
			
				|  |  | +        grpc_slice_buffer_reset_and_unref_internal(
 | 
	
		
			
				|  |  | +            exec_ctx, &s->unprocessed_incoming_frames_buffer);
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -    if (s->all_incoming_byte_streams_finished &&
 | 
	
		
			
				|  |  | +    bool pending_data = s->pending_byte_stream ||
 | 
	
		
			
				|  |  | +                        s->unprocessed_incoming_frames_buffer.length > 0;
 | 
	
		
			
				|  |  | +    if (s->read_closed && s->frame_storage.length == 0 &&
 | 
	
		
			
				|  |  | +        (!pending_data || s->seen_error) &&
 | 
	
		
			
				|  |  |          s->recv_trailing_metadata_finished != NULL) {
 | 
	
		
			
				|  |  |        grpc_chttp2_incoming_metadata_buffer_publish(
 | 
	
		
			
				|  |  |            exec_ctx, &s->metadata_buffer[1], s->recv_trailing_metadata);
 | 
	
	
		
			
				|  | @@ -1677,14 +1731,6 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void decrement_active_streams_locked(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  | -                                            grpc_chttp2_transport *t,
 | 
	
		
			
				|  |  | -                                            grpc_chttp2_stream *s) {
 | 
	
		
			
				|  |  | -  if ((s->all_incoming_byte_streams_finished = gpr_unref(&s->active_streams))) {
 | 
	
		
			
				|  |  | -    grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |  static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
 | 
	
		
			
				|  |  |                            uint32_t id, grpc_error *error) {
 | 
	
		
			
				|  |  |    grpc_chttp2_stream *s = grpc_chttp2_stream_map_delete(&t->stream_map, id);
 | 
	
	
		
			
				|  | @@ -1693,10 +1739,19 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
 | 
	
		
			
				|  |  |      t->incoming_stream = NULL;
 | 
	
		
			
				|  |  |      grpc_chttp2_parsing_become_skip_parser(exec_ctx, t);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  if (s->data_parser.parsing_frame != NULL) {
 | 
	
		
			
				|  |  | -    grpc_chttp2_incoming_byte_stream_finished(
 | 
	
		
			
				|  |  | -        exec_ctx, s->data_parser.parsing_frame, GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  | -    s->data_parser.parsing_frame = NULL;
 | 
	
		
			
				|  |  | +  if (s->pending_byte_stream) {
 | 
	
		
			
				|  |  | +    if (s->on_next != NULL) {
 | 
	
		
			
				|  |  | +      grpc_chttp2_incoming_byte_stream *bs = s->data_parser.parsing_frame;
 | 
	
		
			
				|  |  | +      if (error == GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | +        error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +      incoming_byte_stream_publish_error(exec_ctx, bs, error);
 | 
	
		
			
				|  |  | +      incoming_byte_stream_unref(exec_ctx, bs);
 | 
	
		
			
				|  |  | +      s->data_parser.parsing_frame = NULL;
 | 
	
		
			
				|  |  | +    } else {
 | 
	
		
			
				|  |  | +      GRPC_ERROR_UNREF(s->byte_stream_error);
 | 
	
		
			
				|  |  | +      s->byte_stream_error = GRPC_ERROR_REF(error);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
 | 
	
	
		
			
				|  | @@ -1882,7 +1937,6 @@ void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |          s->published_metadata[i] = GPRC_METADATA_PUBLISHED_AT_CLOSE;
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -    decrement_active_streams_locked(exec_ctx, t, s);
 | 
	
		
			
				|  |  |      grpc_chttp2_maybe_complete_recv_initial_metadata(exec_ctx, t, s);
 | 
	
		
			
				|  |  |      grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
 | 
	
		
			
				|  |  |    }
 | 
	
	
		
			
				|  | @@ -1933,7 +1987,7 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
 | 
	
		
			
				|  |  |      GPR_ASSERT(p == GRPC_SLICE_END_PTR(http_status_hdr));
 | 
	
		
			
				|  |  |      len += (uint32_t)GRPC_SLICE_LENGTH(http_status_hdr);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    content_type_hdr = grpc_slice_malloc(31);
 | 
	
		
			
				|  |  | +    content_type_hdr = GRPC_SLICE_MALLOC(31);
 | 
	
		
			
				|  |  |      p = GRPC_SLICE_START_PTR(content_type_hdr);
 | 
	
		
			
				|  |  |      *p++ = 0x00;
 | 
	
		
			
				|  |  |      *p++ = 12;
 | 
	
	
		
			
				|  | @@ -2422,12 +2476,28 @@ static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
 | 
	
		
			
				|  |  |   * BYTE STREAM
 | 
	
		
			
				|  |  |   */
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +static void reset_byte_stream(grpc_exec_ctx *exec_ctx, void *arg,
 | 
	
		
			
				|  |  | +                              grpc_error *error) {
 | 
	
		
			
				|  |  | +  grpc_chttp2_stream *s = (grpc_chttp2_stream *)arg;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  s->pending_byte_stream = false;
 | 
	
		
			
				|  |  | +  if (error == GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | +    grpc_chttp2_maybe_complete_recv_message(exec_ctx, s->t, s);
 | 
	
		
			
				|  |  | +    grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, s->t, s);
 | 
	
		
			
				|  |  | +  } else {
 | 
	
		
			
				|  |  | +    GPR_ASSERT(error != GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | +    grpc_closure_sched(exec_ctx, s->on_next, GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  | +    s->on_next = NULL;
 | 
	
		
			
				|  |  | +    GRPC_ERROR_UNREF(s->byte_stream_error);
 | 
	
		
			
				|  |  | +    s->byte_stream_error = GRPC_ERROR_NONE;
 | 
	
		
			
				|  |  | +    grpc_chttp2_cancel_stream(exec_ctx, s->t, s, GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  | +    s->byte_stream_error = error;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |                                         grpc_chttp2_incoming_byte_stream *bs) {
 | 
	
		
			
				|  |  |    if (gpr_unref(&bs->refs)) {
 | 
	
		
			
				|  |  | -    GRPC_ERROR_UNREF(bs->error);
 | 
	
		
			
				|  |  | -    grpc_slice_buffer_destroy_internal(exec_ctx, &bs->slices);
 | 
	
		
			
				|  |  | -    gpr_mu_destroy(&bs->slice_mu);
 | 
	
		
			
				|  |  |      gpr_free(bs);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  }
 | 
	
	
		
			
				|  | @@ -2487,47 +2557,90 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |    grpc_chttp2_transport *t = bs->transport;
 | 
	
		
			
				|  |  |    grpc_chttp2_stream *s = bs->stream;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  if (bs->is_tail) {
 | 
	
		
			
				|  |  | -    gpr_mu_lock(&bs->slice_mu);
 | 
	
		
			
				|  |  | -    size_t cur_length = bs->slices.length;
 | 
	
		
			
				|  |  | -    gpr_mu_unlock(&bs->slice_mu);
 | 
	
		
			
				|  |  | -    incoming_byte_stream_update_flow_control(
 | 
	
		
			
				|  |  | -        exec_ctx, t, s, bs->next_action.max_size_hint, cur_length);
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  gpr_mu_lock(&bs->slice_mu);
 | 
	
		
			
				|  |  | -  if (bs->slices.count > 0) {
 | 
	
		
			
				|  |  | -    *bs->next_action.slice = grpc_slice_buffer_take_first(&bs->slices);
 | 
	
		
			
				|  |  | -    grpc_closure_run(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | -  } else if (bs->error != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | -    grpc_closure_run(exec_ctx, bs->next_action.on_complete,
 | 
	
		
			
				|  |  | -                     GRPC_ERROR_REF(bs->error));
 | 
	
		
			
				|  |  | +  size_t cur_length = s->frame_storage.length;
 | 
	
		
			
				|  |  | +  incoming_byte_stream_update_flow_control(
 | 
	
		
			
				|  |  | +      exec_ctx, t, s, bs->next_action.max_size_hint, cur_length);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0);
 | 
	
		
			
				|  |  | +  if (s->frame_storage.length > 0) {
 | 
	
		
			
				|  |  | +    grpc_slice_buffer_swap(&s->frame_storage,
 | 
	
		
			
				|  |  | +                           &s->unprocessed_incoming_frames_buffer);
 | 
	
		
			
				|  |  | +    grpc_closure_sched(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | +  } else if (s->byte_stream_error != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | +    grpc_closure_sched(exec_ctx, bs->next_action.on_complete,
 | 
	
		
			
				|  |  | +                       GRPC_ERROR_REF(s->byte_stream_error));
 | 
	
		
			
				|  |  | +    if (s->data_parser.parsing_frame != NULL) {
 | 
	
		
			
				|  |  | +      incoming_byte_stream_unref(exec_ctx, s->data_parser.parsing_frame);
 | 
	
		
			
				|  |  | +      s->data_parser.parsing_frame = NULL;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +  } else if (s->read_closed) {
 | 
	
		
			
				|  |  | +    if (bs->remaining_bytes != 0) {
 | 
	
		
			
				|  |  | +      s->byte_stream_error =
 | 
	
		
			
				|  |  | +          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
 | 
	
		
			
				|  |  | +      grpc_closure_sched(exec_ctx, bs->next_action.on_complete,
 | 
	
		
			
				|  |  | +                         GRPC_ERROR_REF(s->byte_stream_error));
 | 
	
		
			
				|  |  | +      if (s->data_parser.parsing_frame != NULL) {
 | 
	
		
			
				|  |  | +        incoming_byte_stream_unref(exec_ctx, s->data_parser.parsing_frame);
 | 
	
		
			
				|  |  | +        s->data_parser.parsing_frame = NULL;
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +    } else {
 | 
	
		
			
				|  |  | +      /* Should never reach here. */
 | 
	
		
			
				|  |  | +      GPR_ASSERT(false);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  |    } else {
 | 
	
		
			
				|  |  | -    bs->on_next = bs->next_action.on_complete;
 | 
	
		
			
				|  |  | -    bs->next = bs->next_action.slice;
 | 
	
		
			
				|  |  | +    s->on_next = bs->next_action.on_complete;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  gpr_mu_unlock(&bs->slice_mu);
 | 
	
		
			
				|  |  |    incoming_byte_stream_unref(exec_ctx, bs);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  | -                                     grpc_byte_stream *byte_stream,
 | 
	
		
			
				|  |  | -                                     grpc_slice *slice, size_t max_size_hint,
 | 
	
		
			
				|  |  | -                                     grpc_closure *on_complete) {
 | 
	
		
			
				|  |  | +static bool incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  | +                                      grpc_byte_stream *byte_stream,
 | 
	
		
			
				|  |  | +                                      size_t max_size_hint,
 | 
	
		
			
				|  |  | +                                      grpc_closure *on_complete) {
 | 
	
		
			
				|  |  |    GPR_TIMER_BEGIN("incoming_byte_stream_next", 0);
 | 
	
		
			
				|  |  |    grpc_chttp2_incoming_byte_stream *bs =
 | 
	
		
			
				|  |  |        (grpc_chttp2_incoming_byte_stream *)byte_stream;
 | 
	
		
			
				|  |  | -  gpr_ref(&bs->refs);
 | 
	
		
			
				|  |  | -  bs->next_action.slice = slice;
 | 
	
		
			
				|  |  | -  bs->next_action.max_size_hint = max_size_hint;
 | 
	
		
			
				|  |  | -  bs->next_action.on_complete = on_complete;
 | 
	
		
			
				|  |  | -  grpc_closure_sched(
 | 
	
		
			
				|  |  | -      exec_ctx,
 | 
	
		
			
				|  |  | -      grpc_closure_init(
 | 
	
		
			
				|  |  | -          &bs->next_action.closure, incoming_byte_stream_next_locked, bs,
 | 
	
		
			
				|  |  | -          grpc_combiner_scheduler(bs->transport->combiner, false)),
 | 
	
		
			
				|  |  | -      GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | -  GPR_TIMER_END("incoming_byte_stream_next", 0);
 | 
	
		
			
				|  |  | -  return 0;
 | 
	
		
			
				|  |  | +  grpc_chttp2_stream *s = bs->stream;
 | 
	
		
			
				|  |  | +  if (s->unprocessed_incoming_frames_buffer.length > 0) {
 | 
	
		
			
				|  |  | +    return true;
 | 
	
		
			
				|  |  | +  } else {
 | 
	
		
			
				|  |  | +    gpr_ref(&bs->refs);
 | 
	
		
			
				|  |  | +    bs->next_action.max_size_hint = max_size_hint;
 | 
	
		
			
				|  |  | +    bs->next_action.on_complete = on_complete;
 | 
	
		
			
				|  |  | +    grpc_closure_sched(
 | 
	
		
			
				|  |  | +        exec_ctx,
 | 
	
		
			
				|  |  | +        grpc_closure_init(
 | 
	
		
			
				|  |  | +            &bs->next_action.closure, incoming_byte_stream_next_locked, bs,
 | 
	
		
			
				|  |  | +            grpc_combiner_scheduler(bs->transport->combiner, false)),
 | 
	
		
			
				|  |  | +        GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | +    GPR_TIMER_END("incoming_byte_stream_next", 0);
 | 
	
		
			
				|  |  | +    return false;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +static grpc_error *incoming_byte_stream_pull(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  | +                                             grpc_byte_stream *byte_stream,
 | 
	
		
			
				|  |  | +                                             grpc_slice *slice) {
 | 
	
		
			
				|  |  | +  GPR_TIMER_BEGIN("incoming_byte_stream_pull", 0);
 | 
	
		
			
				|  |  | +  grpc_chttp2_incoming_byte_stream *bs =
 | 
	
		
			
				|  |  | +      (grpc_chttp2_incoming_byte_stream *)byte_stream;
 | 
	
		
			
				|  |  | +  grpc_chttp2_stream *s = bs->stream;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  if (s->unprocessed_incoming_frames_buffer.length > 0) {
 | 
	
		
			
				|  |  | +    grpc_error *error = deframe_unprocessed_incoming_frames(
 | 
	
		
			
				|  |  | +        exec_ctx, &s->data_parser, s, &s->unprocessed_incoming_frames_buffer,
 | 
	
		
			
				|  |  | +        slice, NULL);
 | 
	
		
			
				|  |  | +    if (error != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | +      return error;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +  } else {
 | 
	
		
			
				|  |  | +    grpc_error *error =
 | 
	
		
			
				|  |  | +        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
 | 
	
		
			
				|  |  | +    grpc_closure_sched(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  | +    return error;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  GPR_TIMER_END("incoming_byte_stream_pull", 0);
 | 
	
		
			
				|  |  | +  return GRPC_ERROR_NONE;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
 | 
	
	
		
			
				|  | @@ -2537,9 +2650,14 @@ static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |                                                  void *byte_stream,
 | 
	
		
			
				|  |  |                                                  grpc_error *error_ignored) {
 | 
	
		
			
				|  |  |    grpc_chttp2_incoming_byte_stream *bs = byte_stream;
 | 
	
		
			
				|  |  | +  grpc_chttp2_stream *s = bs->stream;
 | 
	
		
			
				|  |  | +  grpc_chttp2_transport *t = s->t;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    GPR_ASSERT(bs->base.destroy == incoming_byte_stream_destroy);
 | 
	
		
			
				|  |  | -  decrement_active_streams_locked(exec_ctx, bs->transport, bs->stream);
 | 
	
		
			
				|  |  |    incoming_byte_stream_unref(exec_ctx, bs);
 | 
	
		
			
				|  |  | +  s->pending_byte_stream = false;
 | 
	
		
			
				|  |  | +  grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
 | 
	
		
			
				|  |  | +  grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
 | 
	
	
		
			
				|  | @@ -2559,50 +2677,53 @@ static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |  static void incoming_byte_stream_publish_error(
 | 
	
		
			
				|  |  |      grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
 | 
	
		
			
				|  |  |      grpc_error *error) {
 | 
	
		
			
				|  |  | +  grpc_chttp2_stream *s = bs->stream;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    GPR_ASSERT(error != GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | -  grpc_closure_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  | -  bs->on_next = NULL;
 | 
	
		
			
				|  |  | -  GRPC_ERROR_UNREF(bs->error);
 | 
	
		
			
				|  |  | +  grpc_closure_sched(exec_ctx, s->on_next, GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  | +  s->on_next = NULL;
 | 
	
		
			
				|  |  | +  GRPC_ERROR_UNREF(s->byte_stream_error);
 | 
	
		
			
				|  |  | +  s->byte_stream_error = GRPC_ERROR_REF(error);
 | 
	
		
			
				|  |  |    grpc_chttp2_cancel_stream(exec_ctx, bs->transport, bs->stream,
 | 
	
		
			
				|  |  |                              GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  | -  bs->error = error;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  | -                                           grpc_chttp2_incoming_byte_stream *bs,
 | 
	
		
			
				|  |  | -                                           grpc_slice slice) {
 | 
	
		
			
				|  |  | -  gpr_mu_lock(&bs->slice_mu);
 | 
	
		
			
				|  |  | +grpc_error *grpc_chttp2_incoming_byte_stream_push(
 | 
	
		
			
				|  |  | +    grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
 | 
	
		
			
				|  |  | +    grpc_slice slice, grpc_slice *slice_out) {
 | 
	
		
			
				|  |  | +  grpc_chttp2_stream *s = bs->stream;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    if (bs->remaining_bytes < GRPC_SLICE_LENGTH(slice)) {
 | 
	
		
			
				|  |  | -    incoming_byte_stream_publish_error(
 | 
	
		
			
				|  |  | -        exec_ctx, bs,
 | 
	
		
			
				|  |  | -        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream"));
 | 
	
		
			
				|  |  | +    grpc_error *error =
 | 
	
		
			
				|  |  | +        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream");
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    grpc_closure_sched(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  | +    grpc_slice_unref_internal(exec_ctx, slice);
 | 
	
		
			
				|  |  | +    return error;
 | 
	
		
			
				|  |  |    } else {
 | 
	
		
			
				|  |  |      bs->remaining_bytes -= (uint32_t)GRPC_SLICE_LENGTH(slice);
 | 
	
		
			
				|  |  | -    if (bs->on_next != NULL) {
 | 
	
		
			
				|  |  | -      *bs->next = slice;
 | 
	
		
			
				|  |  | -      grpc_closure_sched(exec_ctx, bs->on_next, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | -      bs->on_next = NULL;
 | 
	
		
			
				|  |  | -    } else {
 | 
	
		
			
				|  |  | -      grpc_slice_buffer_add(&bs->slices, slice);
 | 
	
		
			
				|  |  | +    if (slice_out != NULL) {
 | 
	
		
			
				|  |  | +      *slice_out = slice;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | +    return GRPC_ERROR_NONE;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  gpr_mu_unlock(&bs->slice_mu);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -void grpc_chttp2_incoming_byte_stream_finished(
 | 
	
		
			
				|  |  | +grpc_error *grpc_chttp2_incoming_byte_stream_finished(
 | 
	
		
			
				|  |  |      grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
 | 
	
		
			
				|  |  | -    grpc_error *error) {
 | 
	
		
			
				|  |  | +    grpc_error *error, bool reset_on_error) {
 | 
	
		
			
				|  |  | +  grpc_chttp2_stream *s = bs->stream;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    if (error == GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | -    gpr_mu_lock(&bs->slice_mu);
 | 
	
		
			
				|  |  |      if (bs->remaining_bytes != 0) {
 | 
	
		
			
				|  |  |        error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -    gpr_mu_unlock(&bs->slice_mu);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  if (error != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | -    incoming_byte_stream_publish_error(exec_ctx, bs, error);
 | 
	
		
			
				|  |  | +  if (error != GRPC_ERROR_NONE && reset_on_error) {
 | 
	
		
			
				|  |  | +    grpc_closure_sched(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    incoming_byte_stream_unref(exec_ctx, bs);
 | 
	
		
			
				|  |  | +  return error;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
 | 
	
	
		
			
				|  | @@ -2614,26 +2735,12 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
 | 
	
		
			
				|  |  |    incoming_byte_stream->remaining_bytes = frame_size;
 | 
	
		
			
				|  |  |    incoming_byte_stream->base.flags = flags;
 | 
	
		
			
				|  |  |    incoming_byte_stream->base.next = incoming_byte_stream_next;
 | 
	
		
			
				|  |  | +  incoming_byte_stream->base.pull = incoming_byte_stream_pull;
 | 
	
		
			
				|  |  |    incoming_byte_stream->base.destroy = incoming_byte_stream_destroy;
 | 
	
		
			
				|  |  | -  gpr_mu_init(&incoming_byte_stream->slice_mu);
 | 
	
		
			
				|  |  |    gpr_ref_init(&incoming_byte_stream->refs, 2);
 | 
	
		
			
				|  |  | -  incoming_byte_stream->next_message = NULL;
 | 
	
		
			
				|  |  |    incoming_byte_stream->transport = t;
 | 
	
		
			
				|  |  |    incoming_byte_stream->stream = s;
 | 
	
		
			
				|  |  | -  gpr_ref(&incoming_byte_stream->stream->active_streams);
 | 
	
		
			
				|  |  | -  grpc_slice_buffer_init(&incoming_byte_stream->slices);
 | 
	
		
			
				|  |  | -  incoming_byte_stream->on_next = NULL;
 | 
	
		
			
				|  |  | -  incoming_byte_stream->is_tail = 1;
 | 
	
		
			
				|  |  | -  incoming_byte_stream->error = GRPC_ERROR_NONE;
 | 
	
		
			
				|  |  | -  grpc_chttp2_incoming_frame_queue *q = &s->incoming_frames;
 | 
	
		
			
				|  |  | -  if (q->head == NULL) {
 | 
	
		
			
				|  |  | -    q->head = incoming_byte_stream;
 | 
	
		
			
				|  |  | -  } else {
 | 
	
		
			
				|  |  | -    q->tail->is_tail = 0;
 | 
	
		
			
				|  |  | -    q->tail->next_message = incoming_byte_stream;
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  q->tail = incoming_byte_stream;
 | 
	
		
			
				|  |  | -  grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
 | 
	
		
			
				|  |  | +  s->byte_stream_error = GRPC_ERROR_NONE;
 | 
	
		
			
				|  |  |    return incoming_byte_stream;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 |