|  | @@ -91,10 +91,9 @@ typedef enum {
 | 
	
		
			
				|  |  |    /* streams that are waiting to start because there are too many concurrent
 | 
	
		
			
				|  |  |       streams on the connection */
 | 
	
		
			
				|  |  |    WAITING_FOR_CONCURRENCY,
 | 
	
		
			
				|  |  | -  /* streams that want to callback the application */
 | 
	
		
			
				|  |  | -  PENDING_CALLBACKS,
 | 
	
		
			
				|  |  | -  /* streams that *ARE* calling back to the application */
 | 
	
		
			
				|  |  | -  EXECUTING_CALLBACKS,
 | 
	
		
			
				|  |  | +  /* streams that have finished reading: we wait until unlock to coalesce
 | 
	
		
			
				|  |  | +     all changes into one callback */
 | 
	
		
			
				|  |  | +  FINISHED_READ_OP,
 | 
	
		
			
				|  |  |    STREAM_LIST_COUNT /* must be last */
 | 
	
		
			
				|  |  |  } stream_list_id;
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -141,6 +140,12 @@ typedef enum {
 | 
	
		
			
				|  |  |    DTS_FRAME
 | 
	
		
			
				|  |  |  } deframe_transport_state;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +typedef enum {
 | 
	
		
			
				|  |  | +  WRITE_STATE_OPEN,
 | 
	
		
			
				|  |  | +  WRITE_STATE_QUEUED_CLOSE,
 | 
	
		
			
				|  |  | +  WRITE_STATE_SENT_CLOSE
 | 
	
		
			
				|  |  | +} WRITE_STATE;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  typedef struct {
 | 
	
		
			
				|  |  |    stream *head;
 | 
	
		
			
				|  |  |    stream *tail;
 | 
	
	
		
			
				|  | @@ -182,6 +187,18 @@ typedef struct {
 | 
	
		
			
				|  |  |    gpr_slice debug;
 | 
	
		
			
				|  |  |  } pending_goaway;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +typedef struct {
 | 
	
		
			
				|  |  | +  void (*cb)(void *user_data, int success);
 | 
	
		
			
				|  |  | +  void *user_data;
 | 
	
		
			
				|  |  | +  int success;
 | 
	
		
			
				|  |  | +} op_closure;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +typedef struct {
 | 
	
		
			
				|  |  | +  op_closure *callbacks;
 | 
	
		
			
				|  |  | +  size_t count;
 | 
	
		
			
				|  |  | +  size_t capacity;
 | 
	
		
			
				|  |  | +} op_closure_array;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  struct transport {
 | 
	
		
			
				|  |  |    grpc_transport base; /* must be first */
 | 
	
		
			
				|  |  |    const grpc_transport_callbacks *cb;
 | 
	
	
		
			
				|  | @@ -202,6 +219,10 @@ struct transport {
 | 
	
		
			
				|  |  |    gpr_uint8 closed;
 | 
	
		
			
				|  |  |    error_state error_state;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +  /* queued callbacks */
 | 
	
		
			
				|  |  | +  op_closure_array pending_callbacks;
 | 
	
		
			
				|  |  | +  op_closure_array executing_callbacks;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    /* stream indexing */
 | 
	
		
			
				|  |  |    gpr_uint32 next_stream_id;
 | 
	
		
			
				|  |  |    gpr_uint32 last_incoming_stream_id;
 | 
	
	
		
			
				|  | @@ -281,13 +302,13 @@ struct stream {
 | 
	
		
			
				|  |  |    /* when the application requests writes be closed, the write_closed is
 | 
	
		
			
				|  |  |       'queued'; when the close is flow controlled into the send path, we are
 | 
	
		
			
				|  |  |       'sending' it; when the write has been performed it is 'sent' */
 | 
	
		
			
				|  |  | -  gpr_uint8 queued_write_closed;
 | 
	
		
			
				|  |  | -  gpr_uint8 sending_write_closed;
 | 
	
		
			
				|  |  | -  gpr_uint8 sent_write_closed;
 | 
	
		
			
				|  |  | +  WRITE_STATE write_state;
 | 
	
		
			
				|  |  | +  gpr_uint8 send_closed;
 | 
	
		
			
				|  |  |    gpr_uint8 read_closed;
 | 
	
		
			
				|  |  |    gpr_uint8 cancelled;
 | 
	
		
			
				|  |  | -  gpr_uint8 allow_window_updates;
 | 
	
		
			
				|  |  | -  gpr_uint8 published_close;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  op_closure send_done_closure;
 | 
	
		
			
				|  |  | +  op_closure recv_done_closure;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    stream_link links[STREAM_LIST_COUNT];
 | 
	
		
			
				|  |  |    gpr_uint8 included[STREAM_LIST_COUNT];
 | 
	
	
		
			
				|  | @@ -296,10 +317,14 @@ struct stream {
 | 
	
		
			
				|  |  |    grpc_linked_mdelem *incoming_metadata;
 | 
	
		
			
				|  |  |    size_t incoming_metadata_count;
 | 
	
		
			
				|  |  |    size_t incoming_metadata_capacity;
 | 
	
		
			
				|  |  | +  grpc_linked_mdelem *old_incoming_metadata;
 | 
	
		
			
				|  |  |    gpr_timespec incoming_deadline;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    /* sops from application */
 | 
	
		
			
				|  |  | -  grpc_stream_op_buffer outgoing_sopb;
 | 
	
		
			
				|  |  | +  grpc_stream_op_buffer *outgoing_sopb;
 | 
	
		
			
				|  |  | +  grpc_stream_op_buffer *incoming_sopb;
 | 
	
		
			
				|  |  | +  grpc_stream_state *publish_state;
 | 
	
		
			
				|  |  | +  grpc_stream_state published_state;
 | 
	
		
			
				|  |  |    /* sops that have passed flow control to be written */
 | 
	
		
			
				|  |  |    grpc_stream_op_buffer writing_sopb;
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -337,7 +362,8 @@ static void cancel_stream_id(transport *t, gpr_uint32 id,
 | 
	
		
			
				|  |  |                               grpc_chttp2_error_code error_code, int send_rst);
 | 
	
		
			
				|  |  |  static void cancel_stream(transport *t, stream *s,
 | 
	
		
			
				|  |  |                            grpc_status_code local_status,
 | 
	
		
			
				|  |  | -                          grpc_chttp2_error_code error_code, int send_rst);
 | 
	
		
			
				|  |  | +                          grpc_chttp2_error_code error_code,
 | 
	
		
			
				|  |  | +                          grpc_mdstr *optional_message, int send_rst);
 | 
	
		
			
				|  |  |  static void finalize_cancellations(transport *t);
 | 
	
		
			
				|  |  |  static stream *lookup_stream(transport *t, gpr_uint32 id);
 | 
	
		
			
				|  |  |  static void remove_from_stream_map(transport *t, stream *s);
 | 
	
	
		
			
				|  | @@ -348,6 +374,14 @@ static void become_skip_parser(transport *t);
 | 
	
		
			
				|  |  |  static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
 | 
	
		
			
				|  |  |                        grpc_endpoint_cb_status error);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +static void schedule_cb(transport *t, op_closure closure, int success);
 | 
	
		
			
				|  |  | +static void maybe_finish_read(transport *t, stream *s);
 | 
	
		
			
				|  |  | +static void maybe_join_window_updates(transport *t, stream *s);
 | 
	
		
			
				|  |  | +static void finish_reads(transport *t);
 | 
	
		
			
				|  |  | +static void add_to_pollset_locked(transport *t, grpc_pollset *pollset);
 | 
	
		
			
				|  |  | +static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op);
 | 
	
		
			
				|  |  | +static void add_metadata_batch(transport *t, stream *s);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  /*
 | 
	
		
			
				|  |  |   * CONSTRUCTION/DESTRUCTION/REFCOUNTING
 | 
	
		
			
				|  |  |   */
 | 
	
	
		
			
				|  | @@ -387,6 +421,9 @@ static void destruct_transport(transport *t) {
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    gpr_free(t->pings);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +  gpr_free(t->pending_callbacks.callbacks);
 | 
	
		
			
				|  |  | +  gpr_free(t->executing_callbacks.callbacks);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    for (i = 0; i < t->num_pending_goaways; i++) {
 | 
	
		
			
				|  |  |      gpr_slice_unref(t->pending_goaways[i].debug);
 | 
	
		
			
				|  |  |    }
 | 
	
	
		
			
				|  | @@ -416,6 +453,8 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    GPR_ASSERT(strlen(CLIENT_CONNECT_STRING) == CLIENT_CONNECT_STRLEN);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +  memset(t, 0, sizeof(*t));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    t->base.vtable = &vtable;
 | 
	
		
			
				|  |  |    t->ep = ep;
 | 
	
		
			
				|  |  |    /* one ref is for destroy, the other for when ep becomes NULL */
 | 
	
	
		
			
				|  | @@ -427,27 +466,16 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
 | 
	
		
			
				|  |  |    t->str_grpc_timeout =
 | 
	
		
			
				|  |  |        grpc_mdstr_from_string(t->metadata_context, "grpc-timeout");
 | 
	
		
			
				|  |  |    t->reading = 1;
 | 
	
		
			
				|  |  | -  t->writing = 0;
 | 
	
		
			
				|  |  |    t->error_state = ERROR_STATE_NONE;
 | 
	
		
			
				|  |  |    t->next_stream_id = is_client ? 1 : 2;
 | 
	
		
			
				|  |  | -  t->last_incoming_stream_id = 0;
 | 
	
		
			
				|  |  | -  t->destroying = 0;
 | 
	
		
			
				|  |  | -  t->closed = 0;
 | 
	
		
			
				|  |  |    t->is_client = is_client;
 | 
	
		
			
				|  |  |    t->outgoing_window = DEFAULT_WINDOW;
 | 
	
		
			
				|  |  |    t->incoming_window = DEFAULT_WINDOW;
 | 
	
		
			
				|  |  |    t->connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET;
 | 
	
		
			
				|  |  |    t->deframe_state = is_client ? DTS_FH_0 : DTS_CLIENT_PREFIX_0;
 | 
	
		
			
				|  |  | -  t->expect_continuation_stream_id = 0;
 | 
	
		
			
				|  |  | -  t->pings = NULL;
 | 
	
		
			
				|  |  | -  t->ping_count = 0;
 | 
	
		
			
				|  |  | -  t->ping_capacity = 0;
 | 
	
		
			
				|  |  |    t->ping_counter = gpr_now().tv_nsec;
 | 
	
		
			
				|  |  |    grpc_chttp2_hpack_compressor_init(&t->hpack_compressor, mdctx);
 | 
	
		
			
				|  |  |    grpc_chttp2_goaway_parser_init(&t->goaway_parser);
 | 
	
		
			
				|  |  | -  t->pending_goaways = NULL;
 | 
	
		
			
				|  |  | -  t->num_pending_goaways = 0;
 | 
	
		
			
				|  |  | -  t->cap_pending_goaways = 0;
 | 
	
		
			
				|  |  |    gpr_slice_buffer_init(&t->outbuf);
 | 
	
		
			
				|  |  |    gpr_slice_buffer_init(&t->qbuf);
 | 
	
		
			
				|  |  |    grpc_sopb_init(&t->nuke_later_sopb);
 | 
	
	
		
			
				|  | @@ -462,7 +490,6 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
 | 
	
		
			
				|  |  |       needed.
 | 
	
		
			
				|  |  |       TODO(ctiller): tune this */
 | 
	
		
			
				|  |  |    grpc_chttp2_stream_map_init(&t->stream_map, 8);
 | 
	
		
			
				|  |  | -  memset(&t->lists, 0, sizeof(t->lists));
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    /* copy in initial settings to all setting sets */
 | 
	
		
			
				|  |  |    for (i = 0; i < NUM_SETTING_SETS; i++) {
 | 
	
	
		
			
				|  | @@ -503,7 +530,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    gpr_mu_lock(&t->mu);
 | 
	
		
			
				|  |  |    t->calling_back = 1;
 | 
	
		
			
				|  |  | -  ref_transport(t);
 | 
	
		
			
				|  |  | +  ref_transport(t); /* matches unref at end of this function */
 | 
	
		
			
				|  |  |    gpr_mu_unlock(&t->mu);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    sr = setup(arg, &t->base, t->metadata_context);
 | 
	
	
		
			
				|  | @@ -515,7 +542,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
 | 
	
		
			
				|  |  |    if (t->destroying) gpr_cv_signal(&t->cv);
 | 
	
		
			
				|  |  |    unlock(t);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  ref_transport(t);
 | 
	
		
			
				|  |  | +  ref_transport(t); /* matches unref inside recv_data */
 | 
	
		
			
				|  |  |    recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    unref_transport(t);
 | 
	
	
		
			
				|  | @@ -573,16 +600,19 @@ static void goaway(grpc_transport *gt, grpc_status_code status,
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static int init_stream(grpc_transport *gt, grpc_stream *gs,
 | 
	
		
			
				|  |  | -                       const void *server_data) {
 | 
	
		
			
				|  |  | +                       const void *server_data, grpc_transport_op *initial_op) {
 | 
	
		
			
				|  |  |    transport *t = (transport *)gt;
 | 
	
		
			
				|  |  |    stream *s = (stream *)gs;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +  memset(s, 0, sizeof(*s));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    ref_transport(t);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    if (!server_data) {
 | 
	
		
			
				|  |  |      lock(t);
 | 
	
		
			
				|  |  |      s->id = 0;
 | 
	
		
			
				|  |  |    } else {
 | 
	
		
			
				|  |  | +    /* already locked */
 | 
	
		
			
				|  |  |      s->id = (gpr_uint32)(gpr_uintptr)server_data;
 | 
	
		
			
				|  |  |      t->incoming_stream = s;
 | 
	
		
			
				|  |  |      grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
 | 
	
	
		
			
				|  | @@ -592,24 +622,13 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
 | 
	
		
			
				|  |  |        t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
 | 
	
		
			
				|  |  |    s->incoming_window =
 | 
	
		
			
				|  |  |        t->settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
 | 
	
		
			
				|  |  | -  s->queued_write_closed = 0;
 | 
	
		
			
				|  |  | -  s->sending_write_closed = 0;
 | 
	
		
			
				|  |  | -  s->sent_write_closed = 0;
 | 
	
		
			
				|  |  | -  s->read_closed = 0;
 | 
	
		
			
				|  |  | -  s->cancelled = 0;
 | 
	
		
			
				|  |  | -  s->allow_window_updates = 0;
 | 
	
		
			
				|  |  | -  s->published_close = 0;
 | 
	
		
			
				|  |  | -  s->incoming_metadata_count = 0;
 | 
	
		
			
				|  |  | -  s->incoming_metadata_capacity = 0;
 | 
	
		
			
				|  |  | -  s->incoming_metadata = NULL;
 | 
	
		
			
				|  |  |    s->incoming_deadline = gpr_inf_future;
 | 
	
		
			
				|  |  | -  memset(&s->links, 0, sizeof(s->links));
 | 
	
		
			
				|  |  | -  memset(&s->included, 0, sizeof(s->included));
 | 
	
		
			
				|  |  | -  grpc_sopb_init(&s->outgoing_sopb);
 | 
	
		
			
				|  |  |    grpc_sopb_init(&s->writing_sopb);
 | 
	
		
			
				|  |  |    grpc_sopb_init(&s->callback_sopb);
 | 
	
		
			
				|  |  |    grpc_chttp2_data_parser_init(&s->parser);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +  if (initial_op) perform_op_locked(t, s, initial_op);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    if (!server_data) {
 | 
	
		
			
				|  |  |      unlock(t);
 | 
	
		
			
				|  |  |    }
 | 
	
	
		
			
				|  | @@ -642,10 +661,16 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    gpr_mu_unlock(&t->mu);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  grpc_sopb_destroy(&s->outgoing_sopb);
 | 
	
		
			
				|  |  | +  GPR_ASSERT(s->outgoing_sopb == NULL);
 | 
	
		
			
				|  |  | +  GPR_ASSERT(s->incoming_sopb == NULL);
 | 
	
		
			
				|  |  |    grpc_sopb_destroy(&s->writing_sopb);
 | 
	
		
			
				|  |  |    grpc_sopb_destroy(&s->callback_sopb);
 | 
	
		
			
				|  |  |    grpc_chttp2_data_parser_destroy(&s->parser);
 | 
	
		
			
				|  |  | +  for (i = 0; i < s->incoming_metadata_count; i++) {
 | 
	
		
			
				|  |  | +    grpc_mdelem_unref(s->incoming_metadata[i].md);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  gpr_free(s->incoming_metadata);
 | 
	
		
			
				|  |  | +  gpr_free(s->old_incoming_metadata);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    unref_transport(t);
 | 
	
		
			
				|  |  |  }
 | 
	
	
		
			
				|  | @@ -708,8 +733,6 @@ static void stream_list_add_tail(transport *t, stream *s, stream_list_id id) {
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static void stream_list_join(transport *t, stream *s, stream_list_id id) {
 | 
	
		
			
				|  |  | -  if (id == PENDING_CALLBACKS)
 | 
	
		
			
				|  |  | -    GPR_ASSERT(t->cb != NULL || t->error_state == ERROR_STATE_NONE);
 | 
	
		
			
				|  |  |    if (s->included[id]) {
 | 
	
		
			
				|  |  |      return;
 | 
	
		
			
				|  |  |    }
 | 
	
	
		
			
				|  | @@ -718,6 +741,8 @@ static void stream_list_join(transport *t, stream *s, stream_list_id id) {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static void remove_from_stream_map(transport *t, stream *s) {
 | 
	
		
			
				|  |  |    if (s->id == 0) return;
 | 
	
		
			
				|  |  | +  IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Removing stream %d",
 | 
	
		
			
				|  |  | +                     t->is_client ? "CLI" : "SVR", s->id));
 | 
	
		
			
				|  |  |    if (grpc_chttp2_stream_map_delete(&t->stream_map, s->id)) {
 | 
	
		
			
				|  |  |      maybe_start_some_streams(t);
 | 
	
		
			
				|  |  |    }
 | 
	
	
		
			
				|  | @@ -762,6 +787,8 @@ static void unlock(transport *t) {
 | 
	
		
			
				|  |  |      finalize_cancellations(t);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +  finish_reads(t);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    /* gather any callbacks that need to be made */
 | 
	
		
			
				|  |  |    if (!t->calling_back && cb) {
 | 
	
		
			
				|  |  |      perform_callbacks = prepare_callbacks(t);
 | 
	
	
		
			
				|  | @@ -865,21 +892,24 @@ static int prepare_write(transport *t) {
 | 
	
		
			
				|  |  |    while (t->outgoing_window && (s = stream_list_remove_head(t, WRITABLE)) &&
 | 
	
		
			
				|  |  |           s->outgoing_window > 0) {
 | 
	
		
			
				|  |  |      window_delta = grpc_chttp2_preencode(
 | 
	
		
			
				|  |  | -        s->outgoing_sopb.ops, &s->outgoing_sopb.nops,
 | 
	
		
			
				|  |  | +        s->outgoing_sopb->ops, &s->outgoing_sopb->nops,
 | 
	
		
			
				|  |  |          GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing_sopb);
 | 
	
		
			
				|  |  |      t->outgoing_window -= window_delta;
 | 
	
		
			
				|  |  |      s->outgoing_window -= window_delta;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    s->sending_write_closed =
 | 
	
		
			
				|  |  | -        s->queued_write_closed && s->outgoing_sopb.nops == 0;
 | 
	
		
			
				|  |  | -    if (s->writing_sopb.nops > 0 || s->sending_write_closed) {
 | 
	
		
			
				|  |  | +    if (s->write_state == WRITE_STATE_QUEUED_CLOSE &&
 | 
	
		
			
				|  |  | +        s->outgoing_sopb->nops == 0) {
 | 
	
		
			
				|  |  | +      s->send_closed = 1;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    if (s->writing_sopb.nops > 0 || s->send_closed) {
 | 
	
		
			
				|  |  |        stream_list_join(t, s, WRITING);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    /* if there are still writes to do and the stream still has window
 | 
	
		
			
				|  |  | -       available, then schedule a further write */
 | 
	
		
			
				|  |  | -    if (s->outgoing_sopb.nops > 0 && s->outgoing_window > 0) {
 | 
	
		
			
				|  |  | -      GPR_ASSERT(!t->outgoing_window);
 | 
	
		
			
				|  |  | +    /* we should either exhaust window or have no ops left, but not both */
 | 
	
		
			
				|  |  | +    if (s->outgoing_sopb->nops == 0) {
 | 
	
		
			
				|  |  | +      s->outgoing_sopb = NULL;
 | 
	
		
			
				|  |  | +      schedule_cb(t, s->send_done_closure, 1);
 | 
	
		
			
				|  |  | +    } else if (s->outgoing_window) {
 | 
	
		
			
				|  |  |        stream_list_add_tail(t, s, WRITABLE);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |    }
 | 
	
	
		
			
				|  | @@ -912,10 +942,9 @@ static void finalize_outbuf(transport *t) {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    while ((s = stream_list_remove_head(t, WRITING))) {
 | 
	
		
			
				|  |  |      grpc_chttp2_encode(s->writing_sopb.ops, s->writing_sopb.nops,
 | 
	
		
			
				|  |  | -                       s->sending_write_closed, s->id, &t->hpack_compressor,
 | 
	
		
			
				|  |  | -                       &t->outbuf);
 | 
	
		
			
				|  |  | +                       s->send_closed, s->id, &t->hpack_compressor, &t->outbuf);
 | 
	
		
			
				|  |  |      s->writing_sopb.nops = 0;
 | 
	
		
			
				|  |  | -    if (s->sending_write_closed) {
 | 
	
		
			
				|  |  | +    if (s->send_closed) {
 | 
	
		
			
				|  |  |        stream_list_join(t, s, WRITTEN_CLOSED);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |    }
 | 
	
	
		
			
				|  | @@ -929,8 +958,10 @@ static void finish_write_common(transport *t, int success) {
 | 
	
		
			
				|  |  |      drop_connection(t);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    while ((s = stream_list_remove_head(t, WRITTEN_CLOSED))) {
 | 
	
		
			
				|  |  | -    s->sent_write_closed = 1;
 | 
	
		
			
				|  |  | -    if (!s->cancelled) stream_list_join(t, s, PENDING_CALLBACKS);
 | 
	
		
			
				|  |  | +    s->write_state = WRITE_STATE_SENT_CLOSE;
 | 
	
		
			
				|  |  | +    if (1||!s->cancelled) {
 | 
	
		
			
				|  |  | +      maybe_finish_read(t, s);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    t->outbuf.count = 0;
 | 
	
		
			
				|  |  |    t->outbuf.length = 0;
 | 
	
	
		
			
				|  | @@ -980,6 +1011,9 @@ static void maybe_start_some_streams(transport *t) {
 | 
	
		
			
				|  |  |      stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY);
 | 
	
		
			
				|  |  |      if (!s) break;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +    IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Allocating new stream %p to id %d",
 | 
	
		
			
				|  |  | +                       t->is_client ? "CLI" : "SVR", s, t->next_stream_id));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      GPR_ASSERT(s->id == 0);
 | 
	
		
			
				|  |  |      s->id = t->next_stream_id;
 | 
	
		
			
				|  |  |      t->next_stream_id += 2;
 | 
	
	
		
			
				|  | @@ -988,43 +1022,63 @@ static void maybe_start_some_streams(transport *t) {
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void send_batch(grpc_transport *gt, grpc_stream *gs, grpc_stream_op *ops,
 | 
	
		
			
				|  |  | -                       size_t ops_count, int is_last) {
 | 
	
		
			
				|  |  | -  transport *t = (transport *)gt;
 | 
	
		
			
				|  |  | -  stream *s = (stream *)gs;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  lock(t);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  if (is_last) {
 | 
	
		
			
				|  |  | -    s->queued_write_closed = 1;
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  if (!s->cancelled) {
 | 
	
		
			
				|  |  | -    grpc_sopb_append(&s->outgoing_sopb, ops, ops_count);
 | 
	
		
			
				|  |  | -    if (s->id == 0) {
 | 
	
		
			
				|  |  | -      stream_list_join(t, s, WAITING_FOR_CONCURRENCY);
 | 
	
		
			
				|  |  | -      maybe_start_some_streams(t);
 | 
	
		
			
				|  |  | +static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) {
 | 
	
		
			
				|  |  | +  if (op->cancel_with_status != GRPC_STATUS_OK) {
 | 
	
		
			
				|  |  | +    cancel_stream(
 | 
	
		
			
				|  |  | +        t, s, op->cancel_with_status,
 | 
	
		
			
				|  |  | +        grpc_chttp2_grpc_status_to_http2_error(op->cancel_with_status),
 | 
	
		
			
				|  |  | +        op->cancel_message, 1);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  if (op->send_ops) {
 | 
	
		
			
				|  |  | +    GPR_ASSERT(s->outgoing_sopb == NULL);
 | 
	
		
			
				|  |  | +    s->send_done_closure.cb = op->on_done_send;
 | 
	
		
			
				|  |  | +    s->send_done_closure.user_data = op->send_user_data;
 | 
	
		
			
				|  |  | +    if (!s->cancelled) {
 | 
	
		
			
				|  |  | +      s->outgoing_sopb = op->send_ops;
 | 
	
		
			
				|  |  | +      if (op->is_last_send && s->write_state == WRITE_STATE_OPEN) {
 | 
	
		
			
				|  |  | +        s->write_state = WRITE_STATE_QUEUED_CLOSE;
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +      if (s->id == 0) {
 | 
	
		
			
				|  |  | +        IF_TRACING(gpr_log(GPR_DEBUG,
 | 
	
		
			
				|  |  | +                           "HTTP:%s: New stream %p waiting for concurrency",
 | 
	
		
			
				|  |  | +                           t->is_client ? "CLI" : "SVR", s));
 | 
	
		
			
				|  |  | +        stream_list_join(t, s, WAITING_FOR_CONCURRENCY);
 | 
	
		
			
				|  |  | +        maybe_start_some_streams(t);
 | 
	
		
			
				|  |  | +      } else if (s->outgoing_window > 0) {
 | 
	
		
			
				|  |  | +        stream_list_join(t, s, WRITABLE);
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  |      } else {
 | 
	
		
			
				|  |  | -      stream_list_join(t, s, WRITABLE);
 | 
	
		
			
				|  |  | +      schedule_nuke_sopb(t, op->send_ops);
 | 
	
		
			
				|  |  | +      schedule_cb(t, s->send_done_closure, 0);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -  } else {
 | 
	
		
			
				|  |  | -    grpc_sopb_append(&t->nuke_later_sopb, ops, ops_count);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  if (is_last && s->outgoing_sopb.nops == 0 && s->read_closed &&
 | 
	
		
			
				|  |  | -      !s->published_close) {
 | 
	
		
			
				|  |  | -    stream_list_join(t, s, PENDING_CALLBACKS);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  if (op->recv_ops) {
 | 
	
		
			
				|  |  | +    GPR_ASSERT(s->incoming_sopb == NULL);
 | 
	
		
			
				|  |  | +    s->recv_done_closure.cb = op->on_done_recv;
 | 
	
		
			
				|  |  | +    s->recv_done_closure.user_data = op->recv_user_data;
 | 
	
		
			
				|  |  | +    s->incoming_sopb = op->recv_ops;
 | 
	
		
			
				|  |  | +    s->incoming_sopb->nops = 0;
 | 
	
		
			
				|  |  | +    s->publish_state = op->recv_state;
 | 
	
		
			
				|  |  | +    gpr_free(s->old_incoming_metadata);
 | 
	
		
			
				|  |  | +    s->old_incoming_metadata = NULL;
 | 
	
		
			
				|  |  | +    maybe_finish_read(t, s);
 | 
	
		
			
				|  |  | +    maybe_join_window_updates(t, s);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  unlock(t);
 | 
	
		
			
				|  |  | +  if (op->bind_pollset) {
 | 
	
		
			
				|  |  | +    add_to_pollset_locked(t, op->bind_pollset);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void abort_stream(grpc_transport *gt, grpc_stream *gs,
 | 
	
		
			
				|  |  | -                         grpc_status_code status) {
 | 
	
		
			
				|  |  | +static void perform_op(grpc_transport *gt, grpc_stream *gs,
 | 
	
		
			
				|  |  | +                       grpc_transport_op *op) {
 | 
	
		
			
				|  |  |    transport *t = (transport *)gt;
 | 
	
		
			
				|  |  |    stream *s = (stream *)gs;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    lock(t);
 | 
	
		
			
				|  |  | -  cancel_stream(t, s, status, grpc_chttp2_grpc_status_to_http2_error(status),
 | 
	
		
			
				|  |  | -                1);
 | 
	
		
			
				|  |  | +  perform_op_locked(t, s, op);
 | 
	
		
			
				|  |  |    unlock(t);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -1063,8 +1117,8 @@ static void finalize_cancellations(transport *t) {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    while ((s = stream_list_remove_head(t, CANCELLED))) {
 | 
	
		
			
				|  |  |      s->read_closed = 1;
 | 
	
		
			
				|  |  | -    s->sent_write_closed = 1;
 | 
	
		
			
				|  |  | -    stream_list_join(t, s, PENDING_CALLBACKS);
 | 
	
		
			
				|  |  | +    s->write_state = WRITE_STATE_SENT_CLOSE;
 | 
	
		
			
				|  |  | +    maybe_finish_read(t, s);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -1082,18 +1136,24 @@ static void add_incoming_metadata(transport *t, stream *s, grpc_mdelem *elem) {
 | 
	
		
			
				|  |  |  static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
 | 
	
		
			
				|  |  |                                  grpc_status_code local_status,
 | 
	
		
			
				|  |  |                                  grpc_chttp2_error_code error_code,
 | 
	
		
			
				|  |  | -                                int send_rst) {
 | 
	
		
			
				|  |  | +                                grpc_mdstr *optional_message, int send_rst) {
 | 
	
		
			
				|  |  |    int had_outgoing;
 | 
	
		
			
				|  |  |    char buffer[GPR_LTOA_MIN_BUFSIZE];
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    if (s) {
 | 
	
		
			
				|  |  |      /* clear out any unreported input & output: nobody cares anymore */
 | 
	
		
			
				|  |  | -    had_outgoing = s->outgoing_sopb.nops != 0;
 | 
	
		
			
				|  |  | +    had_outgoing = s->outgoing_sopb && s->outgoing_sopb->nops != 0;
 | 
	
		
			
				|  |  |      schedule_nuke_sopb(t, &s->parser.incoming_sopb);
 | 
	
		
			
				|  |  | -    schedule_nuke_sopb(t, &s->outgoing_sopb);
 | 
	
		
			
				|  |  | +    if (s->outgoing_sopb) {
 | 
	
		
			
				|  |  | +      schedule_nuke_sopb(t, s->outgoing_sopb);
 | 
	
		
			
				|  |  | +      s->outgoing_sopb = NULL;
 | 
	
		
			
				|  |  | +      stream_list_remove(t, s, WRITABLE);
 | 
	
		
			
				|  |  | +      schedule_cb(t, s->send_done_closure, 0);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  |      if (s->cancelled) {
 | 
	
		
			
				|  |  |        send_rst = 0;
 | 
	
		
			
				|  |  | -    } else if (!s->read_closed || !s->sent_write_closed || had_outgoing) {
 | 
	
		
			
				|  |  | +    } else if (!s->read_closed || s->write_state != WRITE_STATE_SENT_CLOSE ||
 | 
	
		
			
				|  |  | +               had_outgoing) {
 | 
	
		
			
				|  |  |        s->cancelled = 1;
 | 
	
		
			
				|  |  |        stream_list_join(t, s, CANCELLED);
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -1101,17 +1161,26 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
 | 
	
		
			
				|  |  |        add_incoming_metadata(
 | 
	
		
			
				|  |  |            t, s,
 | 
	
		
			
				|  |  |            grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer));
 | 
	
		
			
				|  |  | -      switch (local_status) {
 | 
	
		
			
				|  |  | -        case GRPC_STATUS_CANCELLED:
 | 
	
		
			
				|  |  | -          add_incoming_metadata(
 | 
	
		
			
				|  |  | -              t, s, grpc_mdelem_from_strings(t->metadata_context,
 | 
	
		
			
				|  |  | -                                             "grpc-message", "Cancelled"));
 | 
	
		
			
				|  |  | -          break;
 | 
	
		
			
				|  |  | -        default:
 | 
	
		
			
				|  |  | -          break;
 | 
	
		
			
				|  |  | +      if (!optional_message) {
 | 
	
		
			
				|  |  | +        switch (local_status) {
 | 
	
		
			
				|  |  | +          case GRPC_STATUS_CANCELLED:
 | 
	
		
			
				|  |  | +            add_incoming_metadata(
 | 
	
		
			
				|  |  | +                t, s, grpc_mdelem_from_strings(t->metadata_context,
 | 
	
		
			
				|  |  | +                                               "grpc-message", "Cancelled"));
 | 
	
		
			
				|  |  | +            break;
 | 
	
		
			
				|  |  | +          default:
 | 
	
		
			
				|  |  | +            break;
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +      } else {
 | 
	
		
			
				|  |  | +        add_incoming_metadata(
 | 
	
		
			
				|  |  | +            t, s,
 | 
	
		
			
				|  |  | +            grpc_mdelem_from_metadata_strings(
 | 
	
		
			
				|  |  | +                t->metadata_context,
 | 
	
		
			
				|  |  | +                grpc_mdstr_from_string(t->metadata_context, "grpc-message"),
 | 
	
		
			
				|  |  | +                grpc_mdstr_ref(optional_message)));
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -      stream_list_join(t, s, PENDING_CALLBACKS);
 | 
	
		
			
				|  |  | +      add_metadata_batch(t, s);
 | 
	
		
			
				|  |  | +      maybe_finish_read(t, s);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    if (!id) send_rst = 0;
 | 
	
	
		
			
				|  | @@ -1119,24 +1188,29 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
 | 
	
		
			
				|  |  |      gpr_slice_buffer_add(&t->qbuf,
 | 
	
		
			
				|  |  |                           grpc_chttp2_rst_stream_create(id, error_code));
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | +  if (optional_message) {
 | 
	
		
			
				|  |  | +    grpc_mdstr_unref(optional_message);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static void cancel_stream_id(transport *t, gpr_uint32 id,
 | 
	
		
			
				|  |  |                               grpc_status_code local_status,
 | 
	
		
			
				|  |  |                               grpc_chttp2_error_code error_code, int send_rst) {
 | 
	
		
			
				|  |  |    cancel_stream_inner(t, lookup_stream(t, id), id, local_status, error_code,
 | 
	
		
			
				|  |  | -                      send_rst);
 | 
	
		
			
				|  |  | +                      NULL, send_rst);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static void cancel_stream(transport *t, stream *s,
 | 
	
		
			
				|  |  |                            grpc_status_code local_status,
 | 
	
		
			
				|  |  | -                          grpc_chttp2_error_code error_code, int send_rst) {
 | 
	
		
			
				|  |  | -  cancel_stream_inner(t, s, s->id, local_status, error_code, send_rst);
 | 
	
		
			
				|  |  | +                          grpc_chttp2_error_code error_code,
 | 
	
		
			
				|  |  | +                          grpc_mdstr *optional_message, int send_rst) {
 | 
	
		
			
				|  |  | +  cancel_stream_inner(t, s, s->id, local_status, error_code, optional_message,
 | 
	
		
			
				|  |  | +                      send_rst);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static void cancel_stream_cb(void *user_data, gpr_uint32 id, void *stream) {
 | 
	
		
			
				|  |  |    cancel_stream(user_data, stream, GRPC_STATUS_UNAVAILABLE,
 | 
	
		
			
				|  |  | -                GRPC_CHTTP2_INTERNAL_ERROR, 0);
 | 
	
		
			
				|  |  | +                GRPC_CHTTP2_INTERNAL_ERROR, NULL, 0);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static void end_all_the_calls(transport *t) {
 | 
	
	
		
			
				|  | @@ -1150,8 +1224,14 @@ static void drop_connection(transport *t) {
 | 
	
		
			
				|  |  |    end_all_the_calls(t);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +static void maybe_finish_read(transport *t, stream *s) {
 | 
	
		
			
				|  |  | +  if (s->incoming_sopb) {
 | 
	
		
			
				|  |  | +    stream_list_join(t, s, FINISHED_READ_OP);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  static void maybe_join_window_updates(transport *t, stream *s) {
 | 
	
		
			
				|  |  | -  if (s->allow_window_updates &&
 | 
	
		
			
				|  |  | +  if (s->incoming_sopb != NULL &&
 | 
	
		
			
				|  |  |        s->incoming_window <
 | 
	
		
			
				|  |  |            t->settings[LOCAL_SETTINGS]
 | 
	
		
			
				|  |  |                       [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] *
 | 
	
	
		
			
				|  | @@ -1160,21 +1240,6 @@ static void maybe_join_window_updates(transport *t, stream *s) {
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void set_allow_window_updates(grpc_transport *tp, grpc_stream *sp,
 | 
	
		
			
				|  |  | -                                     int allow) {
 | 
	
		
			
				|  |  | -  transport *t = (transport *)tp;
 | 
	
		
			
				|  |  | -  stream *s = (stream *)sp;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  lock(t);
 | 
	
		
			
				|  |  | -  s->allow_window_updates = allow;
 | 
	
		
			
				|  |  | -  if (allow) {
 | 
	
		
			
				|  |  | -    maybe_join_window_updates(t, s);
 | 
	
		
			
				|  |  | -  } else {
 | 
	
		
			
				|  |  | -    stream_list_remove(t, s, WINDOW_UPDATE);
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  unlock(t);
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |  static grpc_chttp2_parse_error update_incoming_window(transport *t, stream *s) {
 | 
	
		
			
				|  |  |    if (t->incoming_frame_size > t->incoming_window) {
 | 
	
		
			
				|  |  |      gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d",
 | 
	
	
		
			
				|  | @@ -1248,7 +1313,7 @@ static int init_data_frame_parser(transport *t) {
 | 
	
		
			
				|  |  |      case GRPC_CHTTP2_STREAM_ERROR:
 | 
	
		
			
				|  |  |        cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status(
 | 
	
		
			
				|  |  |                                GRPC_CHTTP2_INTERNAL_ERROR),
 | 
	
		
			
				|  |  | -                    GRPC_CHTTP2_INTERNAL_ERROR, 1);
 | 
	
		
			
				|  |  | +                    GRPC_CHTTP2_INTERNAL_ERROR, NULL, 1);
 | 
	
		
			
				|  |  |        return init_skip_frame(t, 0);
 | 
	
		
			
				|  |  |      case GRPC_CHTTP2_CONNECTION_ERROR:
 | 
	
		
			
				|  |  |        drop_connection(t);
 | 
	
	
		
			
				|  | @@ -1267,11 +1332,10 @@ static void on_header(void *tp, grpc_mdelem *md) {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    GPR_ASSERT(s);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  IF_TRACING(gpr_log(GPR_INFO, "HTTP:%d:HDR: %s: %s", s->id,
 | 
	
		
			
				|  |  | -                     grpc_mdstr_as_c_string(md->key),
 | 
	
		
			
				|  |  | -                     grpc_mdstr_as_c_string(md->value)));
 | 
	
		
			
				|  |  | +  IF_TRACING(gpr_log(
 | 
	
		
			
				|  |  | +      GPR_INFO, "HTTP:%d:%s:HDR: %s: %s", s->id, t->is_client ? "CLI" : "SVR",
 | 
	
		
			
				|  |  | +      grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value)));
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  stream_list_join(t, s, PENDING_CALLBACKS);
 | 
	
		
			
				|  |  |    if (md->key == t->str_grpc_timeout) {
 | 
	
		
			
				|  |  |      gpr_timespec *cached_timeout = grpc_mdelem_get_user_data(md, free_timeout);
 | 
	
		
			
				|  |  |      if (!cached_timeout) {
 | 
	
	
		
			
				|  | @@ -1290,6 +1354,7 @@ static void on_header(void *tp, grpc_mdelem *md) {
 | 
	
		
			
				|  |  |    } else {
 | 
	
		
			
				|  |  |      add_incoming_metadata(t, s, md);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | +  maybe_finish_read(t, s);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static int init_header_frame_parser(transport *t, int is_continuation) {
 | 
	
	
		
			
				|  | @@ -1327,7 +1392,10 @@ static int init_header_frame_parser(transport *t, int is_continuation) {
 | 
	
		
			
				|  |  |        gpr_log(GPR_ERROR,
 | 
	
		
			
				|  |  |                "ignoring out of order new stream request on server; last stream "
 | 
	
		
			
				|  |  |                "id=%d, new stream id=%d",
 | 
	
		
			
				|  |  | -              t->last_incoming_stream_id, t->incoming_stream);
 | 
	
		
			
				|  |  | +              t->last_incoming_stream_id, t->incoming_stream_id);
 | 
	
		
			
				|  |  | +      return init_skip_frame(t, 1);
 | 
	
		
			
				|  |  | +    } else if ((t->incoming_stream_id & 1) == 0) {
 | 
	
		
			
				|  |  | +      gpr_log(GPR_ERROR, "ignoring stream with non-client generated index %d", t->incoming_stream_id);
 | 
	
		
			
				|  |  |        return init_skip_frame(t, 1);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |      t->incoming_stream = NULL;
 | 
	
	
		
			
				|  | @@ -1464,33 +1532,20 @@ static int is_window_update_legal(gpr_int64 window_update, gpr_int64 window) {
 | 
	
		
			
				|  |  |    return window + window_update < MAX_WINDOW;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void free_md(void *p, grpc_op_error result) { gpr_free(p); }
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |  static void add_metadata_batch(transport *t, stream *s) {
 | 
	
		
			
				|  |  |    grpc_metadata_batch b;
 | 
	
		
			
				|  |  | -  size_t i;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  b.list.head = &s->incoming_metadata[0];
 | 
	
		
			
				|  |  | -  b.list.tail = &s->incoming_metadata[s->incoming_metadata_count - 1];
 | 
	
		
			
				|  |  | +  b.list.head = NULL;
 | 
	
		
			
				|  |  | +  /* Store away the last element of the list, so that in patch_metadata_ops
 | 
	
		
			
				|  |  | +     we can reconstitute the list.
 | 
	
		
			
				|  |  | +     We can't do list building here as later incoming metadata may reallocate
 | 
	
		
			
				|  |  | +     the underlying array. */
 | 
	
		
			
				|  |  | +  b.list.tail = (void*)(gpr_intptr)s->incoming_metadata_count;
 | 
	
		
			
				|  |  |    b.garbage.head = b.garbage.tail = NULL;
 | 
	
		
			
				|  |  |    b.deadline = s->incoming_deadline;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  for (i = 1; i < s->incoming_metadata_count; i++) {
 | 
	
		
			
				|  |  | -    s->incoming_metadata[i].prev = &s->incoming_metadata[i - 1];
 | 
	
		
			
				|  |  | -    s->incoming_metadata[i - 1].next = &s->incoming_metadata[i];
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  s->incoming_metadata[0].prev = NULL;
 | 
	
		
			
				|  |  | -  s->incoming_metadata[s->incoming_metadata_count - 1].next = NULL;
 | 
	
		
			
				|  |  | +  s->incoming_deadline = gpr_inf_future;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    grpc_sopb_add_metadata(&s->parser.incoming_sopb, b);
 | 
	
		
			
				|  |  | -  grpc_sopb_add_flow_ctl_cb(&s->parser.incoming_sopb, free_md,
 | 
	
		
			
				|  |  | -                            s->incoming_metadata);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  /* reset */
 | 
	
		
			
				|  |  | -  s->incoming_deadline = gpr_inf_future;
 | 
	
		
			
				|  |  | -  s->incoming_metadata = NULL;
 | 
	
		
			
				|  |  | -  s->incoming_metadata_count = 0;
 | 
	
		
			
				|  |  | -  s->incoming_metadata_capacity = 0;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
 | 
	
	
		
			
				|  | @@ -1501,14 +1556,14 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
 | 
	
		
			
				|  |  |      case GRPC_CHTTP2_PARSE_OK:
 | 
	
		
			
				|  |  |        if (st.end_of_stream) {
 | 
	
		
			
				|  |  |          t->incoming_stream->read_closed = 1;
 | 
	
		
			
				|  |  | -        stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS);
 | 
	
		
			
				|  |  | +        maybe_finish_read(t, t->incoming_stream);
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  |        if (st.need_flush_reads) {
 | 
	
		
			
				|  |  | -        stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS);
 | 
	
		
			
				|  |  | +        maybe_finish_read(t, t->incoming_stream);
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  |        if (st.metadata_boundary) {
 | 
	
		
			
				|  |  |          add_metadata_batch(t, t->incoming_stream);
 | 
	
		
			
				|  |  | -        stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS);
 | 
	
		
			
				|  |  | +        maybe_finish_read(t, t->incoming_stream);
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  |        if (st.ack_settings) {
 | 
	
		
			
				|  |  |          gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_settings_ack_create());
 | 
	
	
		
			
				|  | @@ -1545,11 +1600,11 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  |        if (st.initial_window_update) {
 | 
	
		
			
				|  |  |          for (i = 0; i < t->stream_map.count; i++) {
 | 
	
		
			
				|  |  | -          stream *s = (stream*)(t->stream_map.values[i]);
 | 
	
		
			
				|  |  | +          stream *s = (stream *)(t->stream_map.values[i]);
 | 
	
		
			
				|  |  |            int was_window_empty = s->outgoing_window <= 0;
 | 
	
		
			
				|  |  |            s->outgoing_window += st.initial_window_update;
 | 
	
		
			
				|  |  | -          if (was_window_empty && s->outgoing_window > 0 &&
 | 
	
		
			
				|  |  | -              s->outgoing_sopb.nops > 0) {
 | 
	
		
			
				|  |  | +          if (was_window_empty && s->outgoing_window > 0 && s->outgoing_sopb &&
 | 
	
		
			
				|  |  | +              s->outgoing_sopb->nops > 0) {
 | 
	
		
			
				|  |  |              stream_list_join(t, s, WRITABLE);
 | 
	
		
			
				|  |  |            }
 | 
	
		
			
				|  |  |          }
 | 
	
	
		
			
				|  | @@ -1563,12 +1618,13 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
 | 
	
		
			
				|  |  |              if (!is_window_update_legal(st.window_update, s->outgoing_window)) {
 | 
	
		
			
				|  |  |                cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status(
 | 
	
		
			
				|  |  |                                        GRPC_CHTTP2_FLOW_CONTROL_ERROR),
 | 
	
		
			
				|  |  | -                            GRPC_CHTTP2_FLOW_CONTROL_ERROR, 1);
 | 
	
		
			
				|  |  | +                            GRPC_CHTTP2_FLOW_CONTROL_ERROR, NULL, 1);
 | 
	
		
			
				|  |  |              } else {
 | 
	
		
			
				|  |  |                s->outgoing_window += st.window_update;
 | 
	
		
			
				|  |  |                /* if this window update makes outgoing ops writable again,
 | 
	
		
			
				|  |  |                   flag that */
 | 
	
		
			
				|  |  | -              if (was_window_empty && s->outgoing_sopb.nops) {
 | 
	
		
			
				|  |  | +              if (was_window_empty && s->outgoing_sopb &&
 | 
	
		
			
				|  |  | +                  s->outgoing_sopb->nops > 0) {
 | 
	
		
			
				|  |  |                  stream_list_join(t, s, WRITABLE);
 | 
	
		
			
				|  |  |                }
 | 
	
		
			
				|  |  |              }
 | 
	
	
		
			
				|  | @@ -1830,53 +1886,135 @@ static grpc_stream_state compute_state(gpr_uint8 write_closed,
 | 
	
		
			
				|  |  |    return GRPC_STREAM_OPEN;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static int prepare_callbacks(transport *t) {
 | 
	
		
			
				|  |  | -  stream *s;
 | 
	
		
			
				|  |  | -  int n = 0;
 | 
	
		
			
				|  |  | -  while ((s = stream_list_remove_head(t, PENDING_CALLBACKS))) {
 | 
	
		
			
				|  |  | -    int execute = 1;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -    s->callback_state = compute_state(s->sent_write_closed, s->read_closed);
 | 
	
		
			
				|  |  | -    if (s->callback_state == GRPC_STREAM_CLOSED) {
 | 
	
		
			
				|  |  | -      remove_from_stream_map(t, s);
 | 
	
		
			
				|  |  | -      if (s->published_close) {
 | 
	
		
			
				|  |  | -        execute = 0;
 | 
	
		
			
				|  |  | -      } else if (s->incoming_metadata_count) {
 | 
	
		
			
				|  |  | -        add_metadata_batch(t, s);
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | -      s->published_close = 1;
 | 
	
		
			
				|  |  | +static void patch_metadata_ops(stream *s) {
 | 
	
		
			
				|  |  | +  grpc_stream_op *ops = s->incoming_sopb->ops;
 | 
	
		
			
				|  |  | +  size_t nops = s->incoming_sopb->nops;
 | 
	
		
			
				|  |  | +  size_t i;
 | 
	
		
			
				|  |  | +  size_t j;
 | 
	
		
			
				|  |  | +  size_t mdidx = 0;
 | 
	
		
			
				|  |  | +  size_t last_mdidx;
 | 
	
		
			
				|  |  | +  int found_metadata = 0;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  /* rework the array of metadata into a linked list, making use
 | 
	
		
			
				|  |  | +     of the breadcrumbs we left in metadata batches during 
 | 
	
		
			
				|  |  | +     add_metadata_batch */
 | 
	
		
			
				|  |  | +  for (i = 0; i < nops; i++) {
 | 
	
		
			
				|  |  | +    grpc_stream_op *op = &ops[i];
 | 
	
		
			
				|  |  | +    if (op->type != GRPC_OP_METADATA) continue;
 | 
	
		
			
				|  |  | +    found_metadata = 1;
 | 
	
		
			
				|  |  | +    /* we left a breadcrumb indicating where the end of this list is,
 | 
	
		
			
				|  |  | +       and since we add sequentially, we know from the end of the last
 | 
	
		
			
				|  |  | +       segment where this segment begins */
 | 
	
		
			
				|  |  | +    last_mdidx = (size_t)(gpr_intptr)(op->data.metadata.list.tail);
 | 
	
		
			
				|  |  | +    GPR_ASSERT(last_mdidx > mdidx);
 | 
	
		
			
				|  |  | +    GPR_ASSERT(last_mdidx <= s->incoming_metadata_count);
 | 
	
		
			
				|  |  | +    /* turn the array into a doubly linked list */
 | 
	
		
			
				|  |  | +    op->data.metadata.list.head = &s->incoming_metadata[mdidx];
 | 
	
		
			
				|  |  | +    op->data.metadata.list.tail = &s->incoming_metadata[last_mdidx - 1];
 | 
	
		
			
				|  |  | +    for (j = mdidx + 1; j < last_mdidx; j++) {
 | 
	
		
			
				|  |  | +      s->incoming_metadata[j].prev = &s->incoming_metadata[j-1];
 | 
	
		
			
				|  |  | +      s->incoming_metadata[j-1].next = &s->incoming_metadata[j];
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    s->incoming_metadata[mdidx].prev = NULL;
 | 
	
		
			
				|  |  | +    s->incoming_metadata[last_mdidx-1].next = NULL;
 | 
	
		
			
				|  |  | +    /* track where we're up to */
 | 
	
		
			
				|  |  | +    mdidx = last_mdidx;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  if (found_metadata) {
 | 
	
		
			
				|  |  | +    s->old_incoming_metadata = s->incoming_metadata;
 | 
	
		
			
				|  |  | +    if (mdidx != s->incoming_metadata_count) {
 | 
	
		
			
				|  |  | +      /* we have a partially read metadata batch still in incoming_metadata */
 | 
	
		
			
				|  |  | +      size_t new_count = s->incoming_metadata_count - mdidx;
 | 
	
		
			
				|  |  | +      size_t copy_bytes = sizeof(*s->incoming_metadata) * new_count;
 | 
	
		
			
				|  |  | +      GPR_ASSERT(mdidx < s->incoming_metadata_count);
 | 
	
		
			
				|  |  | +      s->incoming_metadata = gpr_malloc(copy_bytes);
 | 
	
		
			
				|  |  | +      memcpy(s->old_incoming_metadata + mdidx, s->incoming_metadata, copy_bytes);
 | 
	
		
			
				|  |  | +      s->incoming_metadata_count = s->incoming_metadata_capacity = new_count;
 | 
	
		
			
				|  |  | +    } else {
 | 
	
		
			
				|  |  | +      s->incoming_metadata = NULL;
 | 
	
		
			
				|  |  | +      s->incoming_metadata_count = 0;
 | 
	
		
			
				|  |  | +      s->incoming_metadata_capacity = 0;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    grpc_sopb_swap(&s->parser.incoming_sopb, &s->callback_sopb);
 | 
	
		
			
				|  |  | +static void finish_reads(transport *t) {
 | 
	
		
			
				|  |  | +  stream *s;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    if (execute) {
 | 
	
		
			
				|  |  | -      stream_list_add_tail(t, s, EXECUTING_CALLBACKS);
 | 
	
		
			
				|  |  | -      n = 1;
 | 
	
		
			
				|  |  | +  while ((s = stream_list_remove_head(t, FINISHED_READ_OP)) != NULL) {
 | 
	
		
			
				|  |  | +    int publish = 0;
 | 
	
		
			
				|  |  | +    GPR_ASSERT(s->incoming_sopb);
 | 
	
		
			
				|  |  | +    *s->publish_state =
 | 
	
		
			
				|  |  | +        compute_state(s->write_state == WRITE_STATE_SENT_CLOSE, s->read_closed);
 | 
	
		
			
				|  |  | +    if (*s->publish_state != s->published_state) {
 | 
	
		
			
				|  |  | +      s->published_state = *s->publish_state;
 | 
	
		
			
				|  |  | +      publish = 1;
 | 
	
		
			
				|  |  | +      if (s->published_state == GRPC_STREAM_CLOSED) {
 | 
	
		
			
				|  |  | +        remove_from_stream_map(t, s);
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    if (s->parser.incoming_sopb.nops > 0) {
 | 
	
		
			
				|  |  | +      grpc_sopb_swap(s->incoming_sopb, &s->parser.incoming_sopb);
 | 
	
		
			
				|  |  | +      publish = 1;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    if (publish) {
 | 
	
		
			
				|  |  | +      if (s->incoming_metadata_count > 0) {
 | 
	
		
			
				|  |  | +        patch_metadata_ops(s);
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +      s->incoming_sopb = NULL;
 | 
	
		
			
				|  |  | +      schedule_cb(t, s->recv_done_closure, 1);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  return n;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +static void schedule_cb(transport *t, op_closure closure, int success) {
 | 
	
		
			
				|  |  | +  if (t->pending_callbacks.capacity == t->pending_callbacks.count) {
 | 
	
		
			
				|  |  | +    t->pending_callbacks.capacity =
 | 
	
		
			
				|  |  | +        GPR_MAX(t->pending_callbacks.capacity * 2, 8);
 | 
	
		
			
				|  |  | +    t->pending_callbacks.callbacks =
 | 
	
		
			
				|  |  | +        gpr_realloc(t->pending_callbacks.callbacks,
 | 
	
		
			
				|  |  | +                    t->pending_callbacks.capacity *
 | 
	
		
			
				|  |  | +                        sizeof(*t->pending_callbacks.callbacks));
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  closure.success = success;
 | 
	
		
			
				|  |  | +  t->pending_callbacks.callbacks[t->pending_callbacks.count++] = closure;
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +static int prepare_callbacks(transport *t) {
 | 
	
		
			
				|  |  | +  op_closure_array temp = t->pending_callbacks;
 | 
	
		
			
				|  |  | +  t->pending_callbacks = t->executing_callbacks;
 | 
	
		
			
				|  |  | +  t->executing_callbacks = temp;
 | 
	
		
			
				|  |  | +  return t->executing_callbacks.count > 0;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static void run_callbacks(transport *t, const grpc_transport_callbacks *cb) {
 | 
	
		
			
				|  |  | -  stream *s;
 | 
	
		
			
				|  |  | -  while ((s = stream_list_remove_head(t, EXECUTING_CALLBACKS))) {
 | 
	
		
			
				|  |  | -    size_t nops = s->callback_sopb.nops;
 | 
	
		
			
				|  |  | -    s->callback_sopb.nops = 0;
 | 
	
		
			
				|  |  | -    cb->recv_batch(t->cb_user_data, &t->base, (grpc_stream *)s,
 | 
	
		
			
				|  |  | -                   s->callback_sopb.ops, nops, s->callback_state);
 | 
	
		
			
				|  |  | +  size_t i;
 | 
	
		
			
				|  |  | +  for (i = 0; i < t->executing_callbacks.count; i++) {
 | 
	
		
			
				|  |  | +    op_closure c = t->executing_callbacks.callbacks[i];
 | 
	
		
			
				|  |  | +    c.cb(c.user_data, c.success);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | +  t->executing_callbacks.count = 0;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb) {
 | 
	
		
			
				|  |  |    cb->closed(t->cb_user_data, &t->base);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) {
 | 
	
		
			
				|  |  | -  transport *t = (transport *)gt;
 | 
	
		
			
				|  |  | -  lock(t);
 | 
	
		
			
				|  |  | +/*
 | 
	
		
			
				|  |  | + * POLLSET STUFF
 | 
	
		
			
				|  |  | + */
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +static void add_to_pollset_locked(transport *t, grpc_pollset *pollset) {
 | 
	
		
			
				|  |  |    if (t->ep) {
 | 
	
		
			
				|  |  |      grpc_endpoint_add_to_pollset(t->ep, pollset);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) {
 | 
	
		
			
				|  |  | +  transport *t = (transport *)gt;
 | 
	
		
			
				|  |  | +  lock(t);
 | 
	
		
			
				|  |  | +  add_to_pollset_locked(t, pollset);
 | 
	
		
			
				|  |  |    unlock(t);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -1885,9 +2023,9 @@ static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) {
 | 
	
		
			
				|  |  |   */
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static const grpc_transport_vtable vtable = {
 | 
	
		
			
				|  |  | -    sizeof(stream), init_stream, send_batch, set_allow_window_updates,
 | 
	
		
			
				|  |  | -    add_to_pollset, destroy_stream, abort_stream, goaway, close_transport,
 | 
	
		
			
				|  |  | -    send_ping, destroy_transport};
 | 
	
		
			
				|  |  | +    sizeof(stream),  init_stream,    perform_op,
 | 
	
		
			
				|  |  | +    add_to_pollset,  destroy_stream, goaway,
 | 
	
		
			
				|  |  | +    close_transport, send_ping,      destroy_transport};
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  void grpc_create_chttp2_transport(grpc_transport_setup_callback setup,
 | 
	
		
			
				|  |  |                                    void *arg,
 |