|  | @@ -44,6 +44,7 @@
 | 
	
		
			
				|  |  |  #include <grpc/support/string_util.h>
 | 
	
		
			
				|  |  |  #include <grpc/support/useful.h>
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +#include "src/core/ext/transport/chttp2/transport/frame_data.h"
 | 
	
		
			
				|  |  |  #include "src/core/ext/transport/chttp2/transport/internal.h"
 | 
	
		
			
				|  |  |  #include "src/core/ext/transport/chttp2/transport/varint.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/channel/channel_args.h"
 | 
	
	
		
			
				|  | @@ -129,6 +130,11 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |  static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |                                                  void *byte_stream,
 | 
	
		
			
				|  |  |                                                  grpc_error *error_ignored);
 | 
	
		
			
				|  |  | +static void incoming_byte_stream_publish_error(
 | 
	
		
			
				|  |  | +    grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
 | 
	
		
			
				|  |  | +    grpc_error *error);
 | 
	
		
			
				|  |  | +static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  | +                                       grpc_chttp2_incoming_byte_stream *bs);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t,
 | 
	
		
			
				|  |  |                                      grpc_error *error);
 | 
	
	
		
			
				|  | @@ -174,6 +180,9 @@ static void finish_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
 | 
	
		
			
				|  |  |  static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg,
 | 
	
		
			
				|  |  |                                              grpc_error *error);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +static void reset_byte_stream(grpc_exec_ctx *exec_ctx, void *arg,
 | 
	
		
			
				|  |  | +                              grpc_error *error);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  /*******************************************************************************
 | 
	
		
			
				|  |  |   * CONSTRUCTION/DESTRUCTION/REFCOUNTING
 | 
	
		
			
				|  |  |   */
 | 
	
	
		
			
				|  | @@ -356,6 +365,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
 | 
	
		
			
				|  |  |                 DEFAULT_WINDOW);
 | 
	
		
			
				|  |  |    push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
 | 
	
		
			
				|  |  |                 DEFAULT_MAX_HEADER_LIST_SIZE);
 | 
	
		
			
				|  |  | +  push_setting(exec_ctx, t,
 | 
	
		
			
				|  |  | +               GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, 1);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    t->ping_policy = (grpc_chttp2_repeated_ping_policy){
 | 
	
		
			
				|  |  |        .max_pings_without_data = DEFAULT_MAX_PINGS_BETWEEN_DATA,
 | 
	
	
		
			
				|  | @@ -486,26 +497,31 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
 | 
	
		
			
				|  |  |            grpc_chttp2_setting_id setting_id;
 | 
	
		
			
				|  |  |            grpc_integer_options integer_options;
 | 
	
		
			
				|  |  |            bool availability[2] /* server, client */;
 | 
	
		
			
				|  |  | -        } settings_map[] = {{GRPC_ARG_MAX_CONCURRENT_STREAMS,
 | 
	
		
			
				|  |  | -                             GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
 | 
	
		
			
				|  |  | -                             {-1, 0, INT32_MAX},
 | 
	
		
			
				|  |  | -                             {true, false}},
 | 
	
		
			
				|  |  | -                            {GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER,
 | 
	
		
			
				|  |  | -                             GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE,
 | 
	
		
			
				|  |  | -                             {-1, 0, INT32_MAX},
 | 
	
		
			
				|  |  | -                             {true, true}},
 | 
	
		
			
				|  |  | -                            {GRPC_ARG_MAX_METADATA_SIZE,
 | 
	
		
			
				|  |  | -                             GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
 | 
	
		
			
				|  |  | -                             {-1, 0, INT32_MAX},
 | 
	
		
			
				|  |  | -                             {true, true}},
 | 
	
		
			
				|  |  | -                            {GRPC_ARG_HTTP2_MAX_FRAME_SIZE,
 | 
	
		
			
				|  |  | -                             GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
 | 
	
		
			
				|  |  | -                             {-1, 16384, 16777215},
 | 
	
		
			
				|  |  | -                             {true, true}},
 | 
	
		
			
				|  |  | -                            {GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES,
 | 
	
		
			
				|  |  | -                             GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
 | 
	
		
			
				|  |  | -                             {-1, 5, INT32_MAX},
 | 
	
		
			
				|  |  | -                             {true, true}}};
 | 
	
		
			
				|  |  | +        } settings_map[] = {
 | 
	
		
			
				|  |  | +            {GRPC_ARG_MAX_CONCURRENT_STREAMS,
 | 
	
		
			
				|  |  | +             GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
 | 
	
		
			
				|  |  | +             {-1, 0, INT32_MAX},
 | 
	
		
			
				|  |  | +             {true, false}},
 | 
	
		
			
				|  |  | +            {GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER,
 | 
	
		
			
				|  |  | +             GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE,
 | 
	
		
			
				|  |  | +             {-1, 0, INT32_MAX},
 | 
	
		
			
				|  |  | +             {true, true}},
 | 
	
		
			
				|  |  | +            {GRPC_ARG_MAX_METADATA_SIZE,
 | 
	
		
			
				|  |  | +             GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
 | 
	
		
			
				|  |  | +             {-1, 0, INT32_MAX},
 | 
	
		
			
				|  |  | +             {true, true}},
 | 
	
		
			
				|  |  | +            {GRPC_ARG_HTTP2_MAX_FRAME_SIZE,
 | 
	
		
			
				|  |  | +             GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
 | 
	
		
			
				|  |  | +             {-1, 16384, 16777215},
 | 
	
		
			
				|  |  | +             {true, true}},
 | 
	
		
			
				|  |  | +            {GRPC_ARG_HTTP2_ENABLE_TRUE_BINARY,
 | 
	
		
			
				|  |  | +             GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA,
 | 
	
		
			
				|  |  | +             {1, 0, 1},
 | 
	
		
			
				|  |  | +             {true, true}},
 | 
	
		
			
				|  |  | +            {GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES,
 | 
	
		
			
				|  |  | +             GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
 | 
	
		
			
				|  |  | +             {-1, 5, INT32_MAX},
 | 
	
		
			
				|  |  | +             {true, true}}};
 | 
	
		
			
				|  |  |          for (j = 0; j < (int)GPR_ARRAY_SIZE(settings_map); j++) {
 | 
	
		
			
				|  |  |            if (0 == strcmp(channel_args->args[i].key,
 | 
	
		
			
				|  |  |                            settings_map[j].channel_arg_name)) {
 | 
	
	
		
			
				|  | @@ -543,6 +559,10 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
 | 
	
		
			
				|  |  |          exec_ctx, &t->keepalive_ping_timer,
 | 
	
		
			
				|  |  |          gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time),
 | 
	
		
			
				|  |  |          &t->init_keepalive_ping_locked, gpr_now(GPR_CLOCK_MONOTONIC));
 | 
	
		
			
				|  |  | +  } else {
 | 
	
		
			
				|  |  | +    /* Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no
 | 
	
		
			
				|  |  | +       inflight keeaplive timers */
 | 
	
		
			
				|  |  | +    t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    grpc_chttp2_initiate_write(exec_ctx, t, false, "init");
 | 
	
	
		
			
				|  | @@ -591,21 +611,18 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |      connectivity_state_set(exec_ctx, t, GRPC_CHANNEL_SHUTDOWN,
 | 
	
		
			
				|  |  |                             GRPC_ERROR_REF(error), "close_transport");
 | 
	
		
			
				|  |  |      grpc_endpoint_shutdown(exec_ctx, t->ep, GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  | -    if (t->is_client) {
 | 
	
		
			
				|  |  | -      switch (t->keepalive_state) {
 | 
	
		
			
				|  |  | -        case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING: {
 | 
	
		
			
				|  |  | -          grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer);
 | 
	
		
			
				|  |  | -          break;
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | -        case GRPC_CHTTP2_KEEPALIVE_STATE_PINGING: {
 | 
	
		
			
				|  |  | -          grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer);
 | 
	
		
			
				|  |  | -          grpc_timer_cancel(exec_ctx, &t->keepalive_watchdog_timer);
 | 
	
		
			
				|  |  | -          break;
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | -        case GRPC_CHTTP2_KEEPALIVE_STATE_DYING: {
 | 
	
		
			
				|  |  | -          break;
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | +    switch (t->keepalive_state) {
 | 
	
		
			
				|  |  | +      case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING:
 | 
	
		
			
				|  |  | +        grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer);
 | 
	
		
			
				|  |  | +        break;
 | 
	
		
			
				|  |  | +      case GRPC_CHTTP2_KEEPALIVE_STATE_PINGING:
 | 
	
		
			
				|  |  | +        grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer);
 | 
	
		
			
				|  |  | +        grpc_timer_cancel(exec_ctx, &t->keepalive_watchdog_timer);
 | 
	
		
			
				|  |  | +        break;
 | 
	
		
			
				|  |  | +      case GRPC_CHTTP2_KEEPALIVE_STATE_DYING:
 | 
	
		
			
				|  |  | +      case GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED:
 | 
	
		
			
				|  |  | +        /* keepalive timers are not set in these two states */
 | 
	
		
			
				|  |  | +        break;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      /* flush writable stream list to avoid dangling references */
 | 
	
	
		
			
				|  | @@ -648,7 +665,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
 | 
	
		
			
				|  |  |    /* We reserve one 'active stream' that's dropped when the stream is
 | 
	
		
			
				|  |  |       read-closed. The others are for incoming_byte_streams that are actively
 | 
	
		
			
				|  |  |       reading */
 | 
	
		
			
				|  |  | -  gpr_ref_init(&s->active_streams, 1);
 | 
	
		
			
				|  |  |    GRPC_CHTTP2_STREAM_REF(s, "chttp2");
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0], arena);
 | 
	
	
		
			
				|  | @@ -658,6 +674,11 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
 | 
	
		
			
				|  |  |    s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
 | 
	
		
			
				|  |  |    grpc_closure_init(&s->complete_fetch_locked, complete_fetch_locked, s,
 | 
	
		
			
				|  |  |                      grpc_schedule_on_exec_ctx);
 | 
	
		
			
				|  |  | +  grpc_slice_buffer_init(&s->unprocessed_incoming_frames_buffer);
 | 
	
		
			
				|  |  | +  grpc_slice_buffer_init(&s->frame_storage);
 | 
	
		
			
				|  |  | +  s->pending_byte_stream = false;
 | 
	
		
			
				|  |  | +  grpc_closure_init(&s->reset_byte_stream, reset_byte_stream, s,
 | 
	
		
			
				|  |  | +                    grpc_combiner_scheduler(t->combiner, false));
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    GRPC_CHTTP2_REF_TRANSPORT(t, "stream");
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -675,7 +696,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
 | 
	
		
			
				|  |  |                                    grpc_error *error) {
 | 
	
		
			
				|  |  | -  grpc_byte_stream *bs;
 | 
	
		
			
				|  |  |    grpc_chttp2_stream *s = sp;
 | 
	
		
			
				|  |  |    grpc_chttp2_transport *t = s->t;
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -686,9 +706,9 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
 | 
	
		
			
				|  |  |      GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, s->id) == NULL);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  while ((bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames))) {
 | 
	
		
			
				|  |  | -    incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | +  grpc_slice_buffer_destroy_internal(exec_ctx,
 | 
	
		
			
				|  |  | +                                     &s->unprocessed_incoming_frames_buffer);
 | 
	
		
			
				|  |  | +  grpc_slice_buffer_destroy_internal(exec_ctx, &s->frame_storage);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    grpc_chttp2_list_remove_stalled_by_transport(t, s);
 | 
	
		
			
				|  |  |    grpc_chttp2_list_remove_stalled_by_stream(t, s);
 | 
	
	
		
			
				|  | @@ -715,6 +735,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
 | 
	
		
			
				|  |  |    grpc_slice_buffer_destroy_internal(exec_ctx, &s->flow_controlled_buffer);
 | 
	
		
			
				|  |  |    GRPC_ERROR_UNREF(s->read_closed_error);
 | 
	
		
			
				|  |  |    GRPC_ERROR_UNREF(s->write_closed_error);
 | 
	
		
			
				|  |  | +  GRPC_ERROR_UNREF(s->byte_stream_error);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    if (s->incoming_window_delta > 0) {
 | 
	
		
			
				|  |  |      GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA(
 | 
	
	
		
			
				|  | @@ -1168,8 +1189,9 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |        s->fetching_send_message = NULL;
 | 
	
		
			
				|  |  |        return; /* early out */
 | 
	
		
			
				|  |  |      } else if (grpc_byte_stream_next(exec_ctx, s->fetching_send_message,
 | 
	
		
			
				|  |  | -                                     &s->fetching_slice, UINT32_MAX,
 | 
	
		
			
				|  |  | -                                     &s->complete_fetch_locked)) {
 | 
	
		
			
				|  |  | +                                     UINT32_MAX, &s->complete_fetch_locked)) {
 | 
	
		
			
				|  |  | +      grpc_byte_stream_pull(exec_ctx, s->fetching_send_message,
 | 
	
		
			
				|  |  | +                            &s->fetching_slice);
 | 
	
		
			
				|  |  |        add_fetched_slice_locked(exec_ctx, t, s);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |    }
 | 
	
	
		
			
				|  | @@ -1180,9 +1202,15 @@ static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs,
 | 
	
		
			
				|  |  |    grpc_chttp2_stream *s = gs;
 | 
	
		
			
				|  |  |    grpc_chttp2_transport *t = s->t;
 | 
	
		
			
				|  |  |    if (error == GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | -    add_fetched_slice_locked(exec_ctx, t, s);
 | 
	
		
			
				|  |  | -    continue_fetching_send_locked(exec_ctx, t, s);
 | 
	
		
			
				|  |  | -  } else {
 | 
	
		
			
				|  |  | +    error = grpc_byte_stream_pull(exec_ctx, s->fetching_send_message,
 | 
	
		
			
				|  |  | +                                  &s->fetching_slice);
 | 
	
		
			
				|  |  | +    if (error == GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | +      add_fetched_slice_locked(exec_ctx, t, s);
 | 
	
		
			
				|  |  | +      continue_fetching_send_locked(exec_ctx, t, s);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  if (error != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  |      /* TODO(ctiller): what to do here */
 | 
	
		
			
				|  |  |      abort();
 | 
	
		
			
				|  |  |    }
 | 
	
	
		
			
				|  | @@ -1414,12 +1442,20 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    if (op->recv_message) {
 | 
	
		
			
				|  |  | +    size_t already_received;
 | 
	
		
			
				|  |  |      GPR_ASSERT(s->recv_message_ready == NULL);
 | 
	
		
			
				|  |  | +    GPR_ASSERT(!s->pending_byte_stream);
 | 
	
		
			
				|  |  |      s->recv_message_ready = op_payload->recv_message.recv_message_ready;
 | 
	
		
			
				|  |  |      s->recv_message = op_payload->recv_message.recv_message;
 | 
	
		
			
				|  |  | -    if (s->id != 0 &&
 | 
	
		
			
				|  |  | -        (s->incoming_frames.head == NULL || s->incoming_frames.head->is_tail)) {
 | 
	
		
			
				|  |  | -      incoming_byte_stream_update_flow_control(exec_ctx, t, s, 5, 0);
 | 
	
		
			
				|  |  | +    if (s->id != 0) {
 | 
	
		
			
				|  |  | +      if (s->pending_byte_stream) {
 | 
	
		
			
				|  |  | +        already_received = s->frame_storage.length;
 | 
	
		
			
				|  |  | +      } else {
 | 
	
		
			
				|  |  | +        already_received = s->frame_storage.length +
 | 
	
		
			
				|  |  | +                           s->unprocessed_incoming_frames_buffer.length;
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +      incoming_byte_stream_update_flow_control(exec_ctx, t, s, 5,
 | 
	
		
			
				|  |  | +                                               already_received);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |      grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
 | 
	
		
			
				|  |  |    }
 | 
	
	
		
			
				|  | @@ -1607,13 +1643,13 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
 | 
	
		
			
				|  |  |  void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |                                                        grpc_chttp2_transport *t,
 | 
	
		
			
				|  |  |                                                        grpc_chttp2_stream *s) {
 | 
	
		
			
				|  |  | -  grpc_byte_stream *bs;
 | 
	
		
			
				|  |  |    if (s->recv_initial_metadata_ready != NULL &&
 | 
	
		
			
				|  |  |        s->published_metadata[0] != GRPC_METADATA_NOT_PUBLISHED) {
 | 
	
		
			
				|  |  |      if (s->seen_error) {
 | 
	
		
			
				|  |  | -      while ((bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames)) !=
 | 
	
		
			
				|  |  | -             NULL) {
 | 
	
		
			
				|  |  | -        incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | +      grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage);
 | 
	
		
			
				|  |  | +      if (!s->pending_byte_stream) {
 | 
	
		
			
				|  |  | +        grpc_slice_buffer_reset_and_unref_internal(
 | 
	
		
			
				|  |  | +            exec_ctx, &s->unprocessed_incoming_frames_buffer);
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |      grpc_chttp2_incoming_metadata_buffer_publish(
 | 
	
	
		
			
				|  | @@ -1626,39 +1662,65 @@ void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |  void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |                                               grpc_chttp2_transport *t,
 | 
	
		
			
				|  |  |                                               grpc_chttp2_stream *s) {
 | 
	
		
			
				|  |  | -  grpc_byte_stream *bs;
 | 
	
		
			
				|  |  | +  grpc_error *error = GRPC_ERROR_NONE;
 | 
	
		
			
				|  |  |    if (s->recv_message_ready != NULL) {
 | 
	
		
			
				|  |  | -    while (s->final_metadata_requested && s->seen_error &&
 | 
	
		
			
				|  |  | -           (bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames)) !=
 | 
	
		
			
				|  |  | -               NULL) {
 | 
	
		
			
				|  |  | -      incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | +    *s->recv_message = NULL;
 | 
	
		
			
				|  |  | +    if (s->final_metadata_requested && s->seen_error) {
 | 
	
		
			
				|  |  | +      grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage);
 | 
	
		
			
				|  |  | +      if (!s->pending_byte_stream) {
 | 
	
		
			
				|  |  | +        grpc_slice_buffer_reset_and_unref_internal(
 | 
	
		
			
				|  |  | +            exec_ctx, &s->unprocessed_incoming_frames_buffer);
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    if (!s->pending_byte_stream) {
 | 
	
		
			
				|  |  | +      while (s->unprocessed_incoming_frames_buffer.length > 0 ||
 | 
	
		
			
				|  |  | +             s->frame_storage.length > 0) {
 | 
	
		
			
				|  |  | +        if (s->unprocessed_incoming_frames_buffer.length == 0) {
 | 
	
		
			
				|  |  | +          grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer,
 | 
	
		
			
				|  |  | +                                 &s->frame_storage);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        error = grpc_deframe_unprocessed_incoming_frames(
 | 
	
		
			
				|  |  | +            exec_ctx, &s->data_parser, s,
 | 
	
		
			
				|  |  | +            &s->unprocessed_incoming_frames_buffer, NULL, s->recv_message);
 | 
	
		
			
				|  |  | +        if (error != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | +          s->seen_error = true;
 | 
	
		
			
				|  |  | +          grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
 | 
	
		
			
				|  |  | +                                                     &s->frame_storage);
 | 
	
		
			
				|  |  | +          grpc_slice_buffer_reset_and_unref_internal(
 | 
	
		
			
				|  |  | +              exec_ctx, &s->unprocessed_incoming_frames_buffer);
 | 
	
		
			
				|  |  | +          break;
 | 
	
		
			
				|  |  | +        } else if (*s->recv_message != NULL) {
 | 
	
		
			
				|  |  | +          break;
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -    if (s->incoming_frames.head != NULL) {
 | 
	
		
			
				|  |  | -      *s->recv_message =
 | 
	
		
			
				|  |  | -          grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames);
 | 
	
		
			
				|  |  | -      GPR_ASSERT(*s->recv_message != NULL);
 | 
	
		
			
				|  |  | +    if (error == GRPC_ERROR_NONE && *s->recv_message != NULL) {
 | 
	
		
			
				|  |  |        null_then_run_closure(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  |      } else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) {
 | 
	
		
			
				|  |  |        *s->recv_message = NULL;
 | 
	
		
			
				|  |  |        null_then_run_closure(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | +    GRPC_ERROR_UNREF(error);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |                                                         grpc_chttp2_transport *t,
 | 
	
		
			
				|  |  |                                                         grpc_chttp2_stream *s) {
 | 
	
		
			
				|  |  | -  grpc_byte_stream *bs;
 | 
	
		
			
				|  |  |    grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
 | 
	
		
			
				|  |  |    if (s->recv_trailing_metadata_finished != NULL && s->read_closed &&
 | 
	
		
			
				|  |  |        s->write_closed) {
 | 
	
		
			
				|  |  |      if (s->seen_error) {
 | 
	
		
			
				|  |  | -      while ((bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames)) !=
 | 
	
		
			
				|  |  | -             NULL) {
 | 
	
		
			
				|  |  | -        incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | +      grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage);
 | 
	
		
			
				|  |  | +      if (!s->pending_byte_stream) {
 | 
	
		
			
				|  |  | +        grpc_slice_buffer_reset_and_unref_internal(
 | 
	
		
			
				|  |  | +            exec_ctx, &s->unprocessed_incoming_frames_buffer);
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -    if (s->all_incoming_byte_streams_finished &&
 | 
	
		
			
				|  |  | +    bool pending_data = s->pending_byte_stream ||
 | 
	
		
			
				|  |  | +                        s->unprocessed_incoming_frames_buffer.length > 0;
 | 
	
		
			
				|  |  | +    if (s->read_closed && s->frame_storage.length == 0 &&
 | 
	
		
			
				|  |  | +        (!pending_data || s->seen_error) &&
 | 
	
		
			
				|  |  |          s->recv_trailing_metadata_finished != NULL) {
 | 
	
		
			
				|  |  |        grpc_chttp2_incoming_metadata_buffer_publish(
 | 
	
		
			
				|  |  |            exec_ctx, &s->metadata_buffer[1], s->recv_trailing_metadata);
 | 
	
	
		
			
				|  | @@ -1669,14 +1731,6 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void decrement_active_streams_locked(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  | -                                            grpc_chttp2_transport *t,
 | 
	
		
			
				|  |  | -                                            grpc_chttp2_stream *s) {
 | 
	
		
			
				|  |  | -  if ((s->all_incoming_byte_streams_finished = gpr_unref(&s->active_streams))) {
 | 
	
		
			
				|  |  | -    grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |  static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
 | 
	
		
			
				|  |  |                            uint32_t id, grpc_error *error) {
 | 
	
		
			
				|  |  |    grpc_chttp2_stream *s = grpc_chttp2_stream_map_delete(&t->stream_map, id);
 | 
	
	
		
			
				|  | @@ -1685,10 +1739,19 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
 | 
	
		
			
				|  |  |      t->incoming_stream = NULL;
 | 
	
		
			
				|  |  |      grpc_chttp2_parsing_become_skip_parser(exec_ctx, t);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  if (s->data_parser.parsing_frame != NULL) {
 | 
	
		
			
				|  |  | -    grpc_chttp2_incoming_byte_stream_finished(
 | 
	
		
			
				|  |  | -        exec_ctx, s->data_parser.parsing_frame, GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  | -    s->data_parser.parsing_frame = NULL;
 | 
	
		
			
				|  |  | +  if (s->pending_byte_stream) {
 | 
	
		
			
				|  |  | +    if (s->on_next != NULL) {
 | 
	
		
			
				|  |  | +      grpc_chttp2_incoming_byte_stream *bs = s->data_parser.parsing_frame;
 | 
	
		
			
				|  |  | +      if (error == GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | +        error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +      incoming_byte_stream_publish_error(exec_ctx, bs, error);
 | 
	
		
			
				|  |  | +      incoming_byte_stream_unref(exec_ctx, bs);
 | 
	
		
			
				|  |  | +      s->data_parser.parsing_frame = NULL;
 | 
	
		
			
				|  |  | +    } else {
 | 
	
		
			
				|  |  | +      GRPC_ERROR_UNREF(s->byte_stream_error);
 | 
	
		
			
				|  |  | +      s->byte_stream_error = GRPC_ERROR_REF(error);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
 | 
	
	
		
			
				|  | @@ -1874,7 +1937,6 @@ void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |          s->published_metadata[i] = GPRC_METADATA_PUBLISHED_AT_CLOSE;
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -    decrement_active_streams_locked(exec_ctx, t, s);
 | 
	
		
			
				|  |  |      grpc_chttp2_maybe_complete_recv_initial_metadata(exec_ctx, t, s);
 | 
	
		
			
				|  |  |      grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
 | 
	
		
			
				|  |  |    }
 | 
	
	
		
			
				|  | @@ -1890,6 +1952,7 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
 | 
	
		
			
				|  |  |    grpc_slice hdr;
 | 
	
		
			
				|  |  |    grpc_slice status_hdr;
 | 
	
		
			
				|  |  |    grpc_slice http_status_hdr;
 | 
	
		
			
				|  |  | +  grpc_slice content_type_hdr;
 | 
	
		
			
				|  |  |    grpc_slice message_pfx;
 | 
	
		
			
				|  |  |    uint8_t *p;
 | 
	
		
			
				|  |  |    uint32_t len = 0;
 | 
	
	
		
			
				|  | @@ -1923,6 +1986,42 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
 | 
	
		
			
				|  |  |      *p++ = '0';
 | 
	
		
			
				|  |  |      GPR_ASSERT(p == GRPC_SLICE_END_PTR(http_status_hdr));
 | 
	
		
			
				|  |  |      len += (uint32_t)GRPC_SLICE_LENGTH(http_status_hdr);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    content_type_hdr = grpc_slice_malloc(31);
 | 
	
		
			
				|  |  | +    p = GRPC_SLICE_START_PTR(content_type_hdr);
 | 
	
		
			
				|  |  | +    *p++ = 0x00;
 | 
	
		
			
				|  |  | +    *p++ = 12;
 | 
	
		
			
				|  |  | +    *p++ = 'c';
 | 
	
		
			
				|  |  | +    *p++ = 'o';
 | 
	
		
			
				|  |  | +    *p++ = 'n';
 | 
	
		
			
				|  |  | +    *p++ = 't';
 | 
	
		
			
				|  |  | +    *p++ = 'e';
 | 
	
		
			
				|  |  | +    *p++ = 'n';
 | 
	
		
			
				|  |  | +    *p++ = 't';
 | 
	
		
			
				|  |  | +    *p++ = '-';
 | 
	
		
			
				|  |  | +    *p++ = 't';
 | 
	
		
			
				|  |  | +    *p++ = 'y';
 | 
	
		
			
				|  |  | +    *p++ = 'p';
 | 
	
		
			
				|  |  | +    *p++ = 'e';
 | 
	
		
			
				|  |  | +    *p++ = 16;
 | 
	
		
			
				|  |  | +    *p++ = 'a';
 | 
	
		
			
				|  |  | +    *p++ = 'p';
 | 
	
		
			
				|  |  | +    *p++ = 'p';
 | 
	
		
			
				|  |  | +    *p++ = 'l';
 | 
	
		
			
				|  |  | +    *p++ = 'i';
 | 
	
		
			
				|  |  | +    *p++ = 'c';
 | 
	
		
			
				|  |  | +    *p++ = 'a';
 | 
	
		
			
				|  |  | +    *p++ = 't';
 | 
	
		
			
				|  |  | +    *p++ = 'i';
 | 
	
		
			
				|  |  | +    *p++ = 'o';
 | 
	
		
			
				|  |  | +    *p++ = 'n';
 | 
	
		
			
				|  |  | +    *p++ = '/';
 | 
	
		
			
				|  |  | +    *p++ = 'g';
 | 
	
		
			
				|  |  | +    *p++ = 'r';
 | 
	
		
			
				|  |  | +    *p++ = 'p';
 | 
	
		
			
				|  |  | +    *p++ = 'c';
 | 
	
		
			
				|  |  | +    GPR_ASSERT(p == GRPC_SLICE_END_PTR(content_type_hdr));
 | 
	
		
			
				|  |  | +    len += (uint32_t)GRPC_SLICE_LENGTH(content_type_hdr);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    status_hdr = grpc_slice_malloc(15 + (grpc_status >= 10));
 | 
	
	
		
			
				|  | @@ -1992,6 +2091,7 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
 | 
	
		
			
				|  |  |    grpc_slice_buffer_add(&t->qbuf, hdr);
 | 
	
		
			
				|  |  |    if (!s->sent_initial_metadata) {
 | 
	
		
			
				|  |  |      grpc_slice_buffer_add(&t->qbuf, http_status_hdr);
 | 
	
		
			
				|  |  | +    grpc_slice_buffer_add(&t->qbuf, content_type_hdr);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    grpc_slice_buffer_add(&t->qbuf, status_hdr);
 | 
	
		
			
				|  |  |    grpc_slice_buffer_add(&t->qbuf, message_pfx);
 | 
	
	
		
			
				|  | @@ -2050,6 +2150,7 @@ static void update_bdp(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
 | 
	
		
			
				|  |  |              (int)bdp);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, bdp);
 | 
	
		
			
				|  |  | +  push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, bdp);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx,
 | 
	
	
		
			
				|  | @@ -2267,7 +2368,9 @@ static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
 | 
	
		
			
				|  |  |                                         grpc_error *error) {
 | 
	
		
			
				|  |  |    grpc_chttp2_transport *t = arg;
 | 
	
		
			
				|  |  |    GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING);
 | 
	
		
			
				|  |  | -  if (error == GRPC_ERROR_NONE && !(t->destroying || t->closed)) {
 | 
	
		
			
				|  |  | +  if (t->destroying || t->closed) {
 | 
	
		
			
				|  |  | +    t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
 | 
	
		
			
				|  |  | +  } else if (error == GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  |      if (t->keepalive_permit_without_calls ||
 | 
	
		
			
				|  |  |          grpc_chttp2_stream_map_size(&t->stream_map) > 0) {
 | 
	
		
			
				|  |  |        t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING;
 | 
	
	
		
			
				|  | @@ -2282,7 +2385,7 @@ static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
 | 
	
		
			
				|  |  |            gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time),
 | 
	
		
			
				|  |  |            &t->init_keepalive_ping_locked, gpr_now(GPR_CLOCK_MONOTONIC));
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -  } else if (error == GRPC_ERROR_CANCELLED && !(t->destroying || t->closed)) {
 | 
	
		
			
				|  |  | +  } else if (error == GRPC_ERROR_CANCELLED) {
 | 
	
		
			
				|  |  |      /* The keepalive ping timer may be cancelled by bdp */
 | 
	
		
			
				|  |  |      GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
 | 
	
		
			
				|  |  |      grpc_timer_init(
 | 
	
	
		
			
				|  | @@ -2374,12 +2477,28 @@ static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
 | 
	
		
			
				|  |  |   * BYTE STREAM
 | 
	
		
			
				|  |  |   */
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +static void reset_byte_stream(grpc_exec_ctx *exec_ctx, void *arg,
 | 
	
		
			
				|  |  | +                              grpc_error *error) {
 | 
	
		
			
				|  |  | +  grpc_chttp2_stream *s = (grpc_chttp2_stream *)arg;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  s->pending_byte_stream = false;
 | 
	
		
			
				|  |  | +  if (error == GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | +    grpc_chttp2_maybe_complete_recv_message(exec_ctx, s->t, s);
 | 
	
		
			
				|  |  | +    grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, s->t, s);
 | 
	
		
			
				|  |  | +  } else {
 | 
	
		
			
				|  |  | +    GPR_ASSERT(error != GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | +    grpc_closure_sched(exec_ctx, s->on_next, GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  | +    s->on_next = NULL;
 | 
	
		
			
				|  |  | +    GRPC_ERROR_UNREF(s->byte_stream_error);
 | 
	
		
			
				|  |  | +    s->byte_stream_error = GRPC_ERROR_NONE;
 | 
	
		
			
				|  |  | +    grpc_chttp2_cancel_stream(exec_ctx, s->t, s, GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  | +    s->byte_stream_error = error;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |                                         grpc_chttp2_incoming_byte_stream *bs) {
 | 
	
		
			
				|  |  |    if (gpr_unref(&bs->refs)) {
 | 
	
		
			
				|  |  | -    GRPC_ERROR_UNREF(bs->error);
 | 
	
		
			
				|  |  | -    grpc_slice_buffer_destroy_internal(exec_ctx, &bs->slices);
 | 
	
		
			
				|  |  | -    gpr_mu_destroy(&bs->slice_mu);
 | 
	
		
			
				|  |  |      gpr_free(bs);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  }
 | 
	
	
		
			
				|  | @@ -2439,47 +2558,90 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |    grpc_chttp2_transport *t = bs->transport;
 | 
	
		
			
				|  |  |    grpc_chttp2_stream *s = bs->stream;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  if (bs->is_tail) {
 | 
	
		
			
				|  |  | -    gpr_mu_lock(&bs->slice_mu);
 | 
	
		
			
				|  |  | -    size_t cur_length = bs->slices.length;
 | 
	
		
			
				|  |  | -    gpr_mu_unlock(&bs->slice_mu);
 | 
	
		
			
				|  |  | -    incoming_byte_stream_update_flow_control(
 | 
	
		
			
				|  |  | -        exec_ctx, t, s, bs->next_action.max_size_hint, cur_length);
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  gpr_mu_lock(&bs->slice_mu);
 | 
	
		
			
				|  |  | -  if (bs->slices.count > 0) {
 | 
	
		
			
				|  |  | -    *bs->next_action.slice = grpc_slice_buffer_take_first(&bs->slices);
 | 
	
		
			
				|  |  | -    grpc_closure_run(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | -  } else if (bs->error != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | -    grpc_closure_run(exec_ctx, bs->next_action.on_complete,
 | 
	
		
			
				|  |  | -                     GRPC_ERROR_REF(bs->error));
 | 
	
		
			
				|  |  | +  size_t cur_length = s->frame_storage.length;
 | 
	
		
			
				|  |  | +  incoming_byte_stream_update_flow_control(
 | 
	
		
			
				|  |  | +      exec_ctx, t, s, bs->next_action.max_size_hint, cur_length);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0);
 | 
	
		
			
				|  |  | +  if (s->frame_storage.length > 0) {
 | 
	
		
			
				|  |  | +    grpc_slice_buffer_swap(&s->frame_storage,
 | 
	
		
			
				|  |  | +                           &s->unprocessed_incoming_frames_buffer);
 | 
	
		
			
				|  |  | +    grpc_closure_sched(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | +  } else if (s->byte_stream_error != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | +    grpc_closure_sched(exec_ctx, bs->next_action.on_complete,
 | 
	
		
			
				|  |  | +                       GRPC_ERROR_REF(s->byte_stream_error));
 | 
	
		
			
				|  |  | +    if (s->data_parser.parsing_frame != NULL) {
 | 
	
		
			
				|  |  | +      incoming_byte_stream_unref(exec_ctx, s->data_parser.parsing_frame);
 | 
	
		
			
				|  |  | +      s->data_parser.parsing_frame = NULL;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +  } else if (s->read_closed) {
 | 
	
		
			
				|  |  | +    if (bs->remaining_bytes != 0) {
 | 
	
		
			
				|  |  | +      s->byte_stream_error =
 | 
	
		
			
				|  |  | +          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
 | 
	
		
			
				|  |  | +      grpc_closure_sched(exec_ctx, bs->next_action.on_complete,
 | 
	
		
			
				|  |  | +                         GRPC_ERROR_REF(s->byte_stream_error));
 | 
	
		
			
				|  |  | +      if (s->data_parser.parsing_frame != NULL) {
 | 
	
		
			
				|  |  | +        incoming_byte_stream_unref(exec_ctx, s->data_parser.parsing_frame);
 | 
	
		
			
				|  |  | +        s->data_parser.parsing_frame = NULL;
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +    } else {
 | 
	
		
			
				|  |  | +      /* Should never reach here. */
 | 
	
		
			
				|  |  | +      GPR_ASSERT(false);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  |    } else {
 | 
	
		
			
				|  |  | -    bs->on_next = bs->next_action.on_complete;
 | 
	
		
			
				|  |  | -    bs->next = bs->next_action.slice;
 | 
	
		
			
				|  |  | +    s->on_next = bs->next_action.on_complete;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  gpr_mu_unlock(&bs->slice_mu);
 | 
	
		
			
				|  |  |    incoming_byte_stream_unref(exec_ctx, bs);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  | -                                     grpc_byte_stream *byte_stream,
 | 
	
		
			
				|  |  | -                                     grpc_slice *slice, size_t max_size_hint,
 | 
	
		
			
				|  |  | -                                     grpc_closure *on_complete) {
 | 
	
		
			
				|  |  | +static bool incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  | +                                      grpc_byte_stream *byte_stream,
 | 
	
		
			
				|  |  | +                                      size_t max_size_hint,
 | 
	
		
			
				|  |  | +                                      grpc_closure *on_complete) {
 | 
	
		
			
				|  |  |    GPR_TIMER_BEGIN("incoming_byte_stream_next", 0);
 | 
	
		
			
				|  |  |    grpc_chttp2_incoming_byte_stream *bs =
 | 
	
		
			
				|  |  |        (grpc_chttp2_incoming_byte_stream *)byte_stream;
 | 
	
		
			
				|  |  | -  gpr_ref(&bs->refs);
 | 
	
		
			
				|  |  | -  bs->next_action.slice = slice;
 | 
	
		
			
				|  |  | -  bs->next_action.max_size_hint = max_size_hint;
 | 
	
		
			
				|  |  | -  bs->next_action.on_complete = on_complete;
 | 
	
		
			
				|  |  | -  grpc_closure_sched(
 | 
	
		
			
				|  |  | -      exec_ctx,
 | 
	
		
			
				|  |  | -      grpc_closure_init(
 | 
	
		
			
				|  |  | -          &bs->next_action.closure, incoming_byte_stream_next_locked, bs,
 | 
	
		
			
				|  |  | -          grpc_combiner_scheduler(bs->transport->combiner, false)),
 | 
	
		
			
				|  |  | -      GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | -  GPR_TIMER_END("incoming_byte_stream_next", 0);
 | 
	
		
			
				|  |  | -  return 0;
 | 
	
		
			
				|  |  | +  grpc_chttp2_stream *s = bs->stream;
 | 
	
		
			
				|  |  | +  if (s->unprocessed_incoming_frames_buffer.length > 0) {
 | 
	
		
			
				|  |  | +    return true;
 | 
	
		
			
				|  |  | +  } else {
 | 
	
		
			
				|  |  | +    gpr_ref(&bs->refs);
 | 
	
		
			
				|  |  | +    bs->next_action.max_size_hint = max_size_hint;
 | 
	
		
			
				|  |  | +    bs->next_action.on_complete = on_complete;
 | 
	
		
			
				|  |  | +    grpc_closure_sched(
 | 
	
		
			
				|  |  | +        exec_ctx,
 | 
	
		
			
				|  |  | +        grpc_closure_init(
 | 
	
		
			
				|  |  | +            &bs->next_action.closure, incoming_byte_stream_next_locked, bs,
 | 
	
		
			
				|  |  | +            grpc_combiner_scheduler(bs->transport->combiner, false)),
 | 
	
		
			
				|  |  | +        GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | +    GPR_TIMER_END("incoming_byte_stream_next", 0);
 | 
	
		
			
				|  |  | +    return false;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +static grpc_error *incoming_byte_stream_pull(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  | +                                             grpc_byte_stream *byte_stream,
 | 
	
		
			
				|  |  | +                                             grpc_slice *slice) {
 | 
	
		
			
				|  |  | +  GPR_TIMER_BEGIN("incoming_byte_stream_pull", 0);
 | 
	
		
			
				|  |  | +  grpc_chttp2_incoming_byte_stream *bs =
 | 
	
		
			
				|  |  | +      (grpc_chttp2_incoming_byte_stream *)byte_stream;
 | 
	
		
			
				|  |  | +  grpc_chttp2_stream *s = bs->stream;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  if (s->unprocessed_incoming_frames_buffer.length > 0) {
 | 
	
		
			
				|  |  | +    grpc_error *error = grpc_deframe_unprocessed_incoming_frames(
 | 
	
		
			
				|  |  | +        exec_ctx, &s->data_parser, s, &s->unprocessed_incoming_frames_buffer,
 | 
	
		
			
				|  |  | +        slice, NULL);
 | 
	
		
			
				|  |  | +    if (error != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | +      return error;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +  } else {
 | 
	
		
			
				|  |  | +    grpc_error *error =
 | 
	
		
			
				|  |  | +        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
 | 
	
		
			
				|  |  | +    grpc_closure_sched(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  | +    return error;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  GPR_TIMER_END("incoming_byte_stream_pull", 0);
 | 
	
		
			
				|  |  | +  return GRPC_ERROR_NONE;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
 | 
	
	
		
			
				|  | @@ -2489,9 +2651,14 @@ static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |                                                  void *byte_stream,
 | 
	
		
			
				|  |  |                                                  grpc_error *error_ignored) {
 | 
	
		
			
				|  |  |    grpc_chttp2_incoming_byte_stream *bs = byte_stream;
 | 
	
		
			
				|  |  | +  grpc_chttp2_stream *s = bs->stream;
 | 
	
		
			
				|  |  | +  grpc_chttp2_transport *t = s->t;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    GPR_ASSERT(bs->base.destroy == incoming_byte_stream_destroy);
 | 
	
		
			
				|  |  | -  decrement_active_streams_locked(exec_ctx, bs->transport, bs->stream);
 | 
	
		
			
				|  |  |    incoming_byte_stream_unref(exec_ctx, bs);
 | 
	
		
			
				|  |  | +  s->pending_byte_stream = false;
 | 
	
		
			
				|  |  | +  grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
 | 
	
		
			
				|  |  | +  grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
 | 
	
	
		
			
				|  | @@ -2511,50 +2678,53 @@ static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |  static void incoming_byte_stream_publish_error(
 | 
	
		
			
				|  |  |      grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
 | 
	
		
			
				|  |  |      grpc_error *error) {
 | 
	
		
			
				|  |  | +  grpc_chttp2_stream *s = bs->stream;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    GPR_ASSERT(error != GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | -  grpc_closure_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  | -  bs->on_next = NULL;
 | 
	
		
			
				|  |  | -  GRPC_ERROR_UNREF(bs->error);
 | 
	
		
			
				|  |  | +  grpc_closure_sched(exec_ctx, s->on_next, GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  | +  s->on_next = NULL;
 | 
	
		
			
				|  |  | +  GRPC_ERROR_UNREF(s->byte_stream_error);
 | 
	
		
			
				|  |  | +  s->byte_stream_error = GRPC_ERROR_REF(error);
 | 
	
		
			
				|  |  |    grpc_chttp2_cancel_stream(exec_ctx, bs->transport, bs->stream,
 | 
	
		
			
				|  |  |                              GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  | -  bs->error = error;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  | -                                           grpc_chttp2_incoming_byte_stream *bs,
 | 
	
		
			
				|  |  | -                                           grpc_slice slice) {
 | 
	
		
			
				|  |  | -  gpr_mu_lock(&bs->slice_mu);
 | 
	
		
			
				|  |  | +grpc_error *grpc_chttp2_incoming_byte_stream_push(
 | 
	
		
			
				|  |  | +    grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
 | 
	
		
			
				|  |  | +    grpc_slice slice, grpc_slice *slice_out) {
 | 
	
		
			
				|  |  | +  grpc_chttp2_stream *s = bs->stream;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    if (bs->remaining_bytes < GRPC_SLICE_LENGTH(slice)) {
 | 
	
		
			
				|  |  | -    incoming_byte_stream_publish_error(
 | 
	
		
			
				|  |  | -        exec_ctx, bs,
 | 
	
		
			
				|  |  | -        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream"));
 | 
	
		
			
				|  |  | +    grpc_error *error =
 | 
	
		
			
				|  |  | +        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream");
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    grpc_closure_sched(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  | +    grpc_slice_unref_internal(exec_ctx, slice);
 | 
	
		
			
				|  |  | +    return error;
 | 
	
		
			
				|  |  |    } else {
 | 
	
		
			
				|  |  |      bs->remaining_bytes -= (uint32_t)GRPC_SLICE_LENGTH(slice);
 | 
	
		
			
				|  |  | -    if (bs->on_next != NULL) {
 | 
	
		
			
				|  |  | -      *bs->next = slice;
 | 
	
		
			
				|  |  | -      grpc_closure_sched(exec_ctx, bs->on_next, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | -      bs->on_next = NULL;
 | 
	
		
			
				|  |  | -    } else {
 | 
	
		
			
				|  |  | -      grpc_slice_buffer_add(&bs->slices, slice);
 | 
	
		
			
				|  |  | +    if (slice_out != NULL) {
 | 
	
		
			
				|  |  | +      *slice_out = slice;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | +    return GRPC_ERROR_NONE;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  gpr_mu_unlock(&bs->slice_mu);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -void grpc_chttp2_incoming_byte_stream_finished(
 | 
	
		
			
				|  |  | +grpc_error *grpc_chttp2_incoming_byte_stream_finished(
 | 
	
		
			
				|  |  |      grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
 | 
	
		
			
				|  |  | -    grpc_error *error) {
 | 
	
		
			
				|  |  | +    grpc_error *error, bool reset_on_error) {
 | 
	
		
			
				|  |  | +  grpc_chttp2_stream *s = bs->stream;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    if (error == GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | -    gpr_mu_lock(&bs->slice_mu);
 | 
	
		
			
				|  |  |      if (bs->remaining_bytes != 0) {
 | 
	
		
			
				|  |  |        error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -    gpr_mu_unlock(&bs->slice_mu);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  if (error != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | -    incoming_byte_stream_publish_error(exec_ctx, bs, error);
 | 
	
		
			
				|  |  | +  if (error != GRPC_ERROR_NONE && reset_on_error) {
 | 
	
		
			
				|  |  | +    grpc_closure_sched(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    incoming_byte_stream_unref(exec_ctx, bs);
 | 
	
		
			
				|  |  | +  return error;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
 | 
	
	
		
			
				|  | @@ -2566,26 +2736,12 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
 | 
	
		
			
				|  |  |    incoming_byte_stream->remaining_bytes = frame_size;
 | 
	
		
			
				|  |  |    incoming_byte_stream->base.flags = flags;
 | 
	
		
			
				|  |  |    incoming_byte_stream->base.next = incoming_byte_stream_next;
 | 
	
		
			
				|  |  | +  incoming_byte_stream->base.pull = incoming_byte_stream_pull;
 | 
	
		
			
				|  |  |    incoming_byte_stream->base.destroy = incoming_byte_stream_destroy;
 | 
	
		
			
				|  |  | -  gpr_mu_init(&incoming_byte_stream->slice_mu);
 | 
	
		
			
				|  |  |    gpr_ref_init(&incoming_byte_stream->refs, 2);
 | 
	
		
			
				|  |  | -  incoming_byte_stream->next_message = NULL;
 | 
	
		
			
				|  |  |    incoming_byte_stream->transport = t;
 | 
	
		
			
				|  |  |    incoming_byte_stream->stream = s;
 | 
	
		
			
				|  |  | -  gpr_ref(&incoming_byte_stream->stream->active_streams);
 | 
	
		
			
				|  |  | -  grpc_slice_buffer_init(&incoming_byte_stream->slices);
 | 
	
		
			
				|  |  | -  incoming_byte_stream->on_next = NULL;
 | 
	
		
			
				|  |  | -  incoming_byte_stream->is_tail = 1;
 | 
	
		
			
				|  |  | -  incoming_byte_stream->error = GRPC_ERROR_NONE;
 | 
	
		
			
				|  |  | -  grpc_chttp2_incoming_frame_queue *q = &s->incoming_frames;
 | 
	
		
			
				|  |  | -  if (q->head == NULL) {
 | 
	
		
			
				|  |  | -    q->head = incoming_byte_stream;
 | 
	
		
			
				|  |  | -  } else {
 | 
	
		
			
				|  |  | -    q->tail->is_tail = 0;
 | 
	
		
			
				|  |  | -    q->tail->next_message = incoming_byte_stream;
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  q->tail = incoming_byte_stream;
 | 
	
		
			
				|  |  | -  grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
 | 
	
		
			
				|  |  | +  s->byte_stream_error = GRPC_ERROR_NONE;
 | 
	
		
			
				|  |  |    return incoming_byte_stream;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 |