|  | @@ -62,96 +62,22 @@ typedef struct inproc_transport {
 | 
	
		
			
				|  |  |    struct inproc_stream *stream_list;
 | 
	
		
			
				|  |  |  } inproc_transport;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -typedef struct sb_list_entry {
 | 
	
		
			
				|  |  | -  grpc_slice_buffer sb;
 | 
	
		
			
				|  |  | -  struct sb_list_entry *next;
 | 
	
		
			
				|  |  | -} sb_list_entry;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -// Specialize grpc_byte_stream for our use case
 | 
	
		
			
				|  |  | -typedef struct {
 | 
	
		
			
				|  |  | -  grpc_byte_stream base;
 | 
	
		
			
				|  |  | -  sb_list_entry *le;
 | 
	
		
			
				|  |  | -  grpc_error *shutdown_error;
 | 
	
		
			
				|  |  | -} inproc_slice_byte_stream;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -typedef struct {
 | 
	
		
			
				|  |  | -  // TODO (vjpai): Add some inlined elements to avoid alloc in simple cases
 | 
	
		
			
				|  |  | -  sb_list_entry *head;
 | 
	
		
			
				|  |  | -  sb_list_entry *tail;
 | 
	
		
			
				|  |  | -} slice_buffer_list;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -static void slice_buffer_list_init(slice_buffer_list *l) {
 | 
	
		
			
				|  |  | -  l->head = NULL;
 | 
	
		
			
				|  |  | -  l->tail = NULL;
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -static void sb_list_entry_destroy(grpc_exec_ctx *exec_ctx, sb_list_entry *le) {
 | 
	
		
			
				|  |  | -  grpc_slice_buffer_destroy_internal(exec_ctx, &le->sb);
 | 
	
		
			
				|  |  | -  gpr_free(le);
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -static void slice_buffer_list_destroy(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  | -                                      slice_buffer_list *l) {
 | 
	
		
			
				|  |  | -  sb_list_entry *curr = l->head;
 | 
	
		
			
				|  |  | -  while (curr != NULL) {
 | 
	
		
			
				|  |  | -    sb_list_entry *le = curr;
 | 
	
		
			
				|  |  | -    curr = curr->next;
 | 
	
		
			
				|  |  | -    sb_list_entry_destroy(exec_ctx, le);
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  l->head = NULL;
 | 
	
		
			
				|  |  | -  l->tail = NULL;
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -static bool slice_buffer_list_empty(slice_buffer_list *l) {
 | 
	
		
			
				|  |  | -  return l->head == NULL;
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -static void slice_buffer_list_append_entry(slice_buffer_list *l,
 | 
	
		
			
				|  |  | -                                           sb_list_entry *next) {
 | 
	
		
			
				|  |  | -  next->next = NULL;
 | 
	
		
			
				|  |  | -  if (l->tail) {
 | 
	
		
			
				|  |  | -    l->tail->next = next;
 | 
	
		
			
				|  |  | -    l->tail = next;
 | 
	
		
			
				|  |  | -  } else {
 | 
	
		
			
				|  |  | -    l->head = next;
 | 
	
		
			
				|  |  | -    l->tail = next;
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -static grpc_slice_buffer *slice_buffer_list_append(slice_buffer_list *l) {
 | 
	
		
			
				|  |  | -  sb_list_entry *next = (sb_list_entry *)gpr_malloc(sizeof(*next));
 | 
	
		
			
				|  |  | -  grpc_slice_buffer_init(&next->sb);
 | 
	
		
			
				|  |  | -  slice_buffer_list_append_entry(l, next);
 | 
	
		
			
				|  |  | -  return &next->sb;
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -static sb_list_entry *slice_buffer_list_pophead(slice_buffer_list *l) {
 | 
	
		
			
				|  |  | -  sb_list_entry *ret = l->head;
 | 
	
		
			
				|  |  | -  l->head = l->head->next;
 | 
	
		
			
				|  |  | -  if (l->head == NULL) {
 | 
	
		
			
				|  |  | -    l->tail = NULL;
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  return ret;
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |  typedef struct inproc_stream {
 | 
	
		
			
				|  |  |    inproc_transport *t;
 | 
	
		
			
				|  |  |    grpc_metadata_batch to_read_initial_md;
 | 
	
		
			
				|  |  |    uint32_t to_read_initial_md_flags;
 | 
	
		
			
				|  |  |    bool to_read_initial_md_filled;
 | 
	
		
			
				|  |  | -  slice_buffer_list to_read_message;
 | 
	
		
			
				|  |  |    grpc_metadata_batch to_read_trailing_md;
 | 
	
		
			
				|  |  |    bool to_read_trailing_md_filled;
 | 
	
		
			
				|  |  | -  bool reads_needed;
 | 
	
		
			
				|  |  | -  bool read_closure_scheduled;
 | 
	
		
			
				|  |  | -  grpc_closure read_closure;
 | 
	
		
			
				|  |  | +  bool ops_needed;
 | 
	
		
			
				|  |  | +  bool op_closure_scheduled;
 | 
	
		
			
				|  |  | +  grpc_closure op_closure;
 | 
	
		
			
				|  |  |    // Write buffer used only during gap at init time when client-side
 | 
	
		
			
				|  |  |    // stream is set up but server side stream is not yet set up
 | 
	
		
			
				|  |  |    grpc_metadata_batch write_buffer_initial_md;
 | 
	
		
			
				|  |  |    bool write_buffer_initial_md_filled;
 | 
	
		
			
				|  |  |    uint32_t write_buffer_initial_md_flags;
 | 
	
		
			
				|  |  |    grpc_millis write_buffer_deadline;
 | 
	
		
			
				|  |  | -  slice_buffer_list write_buffer_message;
 | 
	
		
			
				|  |  |    grpc_metadata_batch write_buffer_trailing_md;
 | 
	
		
			
				|  |  |    bool write_buffer_trailing_md_filled;
 | 
	
		
			
				|  |  |    grpc_error *write_buffer_cancel_error;
 | 
	
	
		
			
				|  | @@ -164,11 +90,15 @@ typedef struct inproc_stream {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    gpr_arena *arena;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +  grpc_transport_stream_op_batch *send_message_op;
 | 
	
		
			
				|  |  | +  grpc_transport_stream_op_batch *send_trailing_md_op;
 | 
	
		
			
				|  |  |    grpc_transport_stream_op_batch *recv_initial_md_op;
 | 
	
		
			
				|  |  |    grpc_transport_stream_op_batch *recv_message_op;
 | 
	
		
			
				|  |  |    grpc_transport_stream_op_batch *recv_trailing_md_op;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  inproc_slice_byte_stream recv_message_stream;
 | 
	
		
			
				|  |  | +  grpc_slice_buffer recv_message;
 | 
	
		
			
				|  |  | +  grpc_slice_buffer_stream recv_stream;
 | 
	
		
			
				|  |  | +  bool recv_inited;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    bool initial_md_sent;
 | 
	
		
			
				|  |  |    bool trailing_md_sent;
 | 
	
	
		
			
				|  | @@ -187,54 +117,11 @@ typedef struct inproc_stream {
 | 
	
		
			
				|  |  |    struct inproc_stream *stream_list_next;
 | 
	
		
			
				|  |  |  } inproc_stream;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static bool inproc_slice_byte_stream_next(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  | -                                          grpc_byte_stream *bs, size_t max,
 | 
	
		
			
				|  |  | -                                          grpc_closure *on_complete) {
 | 
	
		
			
				|  |  | -  // Because inproc transport always provides the entire message atomically,
 | 
	
		
			
				|  |  | -  // the byte stream always has data available when this function is called.
 | 
	
		
			
				|  |  | -  // Thus, this function always returns true (unlike other transports) and
 | 
	
		
			
				|  |  | -  // there is never any need to schedule a closure
 | 
	
		
			
				|  |  | -  return true;
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -static grpc_error *inproc_slice_byte_stream_pull(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  | -                                                 grpc_byte_stream *bs,
 | 
	
		
			
				|  |  | -                                                 grpc_slice *slice) {
 | 
	
		
			
				|  |  | -  inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs;
 | 
	
		
			
				|  |  | -  if (stream->shutdown_error != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | -    return GRPC_ERROR_REF(stream->shutdown_error);
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  *slice = grpc_slice_buffer_take_first(&stream->le->sb);
 | 
	
		
			
				|  |  | -  return GRPC_ERROR_NONE;
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -static void inproc_slice_byte_stream_shutdown(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  | -                                              grpc_byte_stream *bs,
 | 
	
		
			
				|  |  | -                                              grpc_error *error) {
 | 
	
		
			
				|  |  | -  inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs;
 | 
	
		
			
				|  |  | -  GRPC_ERROR_UNREF(stream->shutdown_error);
 | 
	
		
			
				|  |  | -  stream->shutdown_error = error;
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -static void inproc_slice_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  | -                                             grpc_byte_stream *bs) {
 | 
	
		
			
				|  |  | -  inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs;
 | 
	
		
			
				|  |  | -  sb_list_entry_destroy(exec_ctx, stream->le);
 | 
	
		
			
				|  |  | -  GRPC_ERROR_UNREF(stream->shutdown_error);
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -static const grpc_byte_stream_vtable inproc_slice_byte_stream_vtable = {
 | 
	
		
			
				|  |  | -    inproc_slice_byte_stream_next, inproc_slice_byte_stream_pull,
 | 
	
		
			
				|  |  | -    inproc_slice_byte_stream_shutdown, inproc_slice_byte_stream_destroy};
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -void inproc_slice_byte_stream_init(inproc_slice_byte_stream *s,
 | 
	
		
			
				|  |  | -                                   sb_list_entry *le) {
 | 
	
		
			
				|  |  | -  s->base.length = (uint32_t)le->sb.length;
 | 
	
		
			
				|  |  | -  s->base.flags = 0;
 | 
	
		
			
				|  |  | -  s->base.vtable = &inproc_slice_byte_stream_vtable;
 | 
	
		
			
				|  |  | -  s->le = le;
 | 
	
		
			
				|  |  | -  s->shutdown_error = GRPC_ERROR_NONE;
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | +static grpc_closure do_nothing_closure;
 | 
	
		
			
				|  |  | +static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
 | 
	
		
			
				|  |  | +                                 grpc_error *error);
 | 
	
		
			
				|  |  | +static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
 | 
	
		
			
				|  |  | +                             grpc_error *error);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static void ref_transport(inproc_transport *t) {
 | 
	
		
			
				|  |  |    INPROC_LOG(GPR_DEBUG, "ref_transport %p", t);
 | 
	
	
		
			
				|  | @@ -280,12 +167,14 @@ static void unref_stream(grpc_exec_ctx *exec_ctx, inproc_stream *s,
 | 
	
		
			
				|  |  |  static void really_destroy_stream(grpc_exec_ctx *exec_ctx, inproc_stream *s) {
 | 
	
		
			
				|  |  |    INPROC_LOG(GPR_DEBUG, "really_destroy_stream %p", s);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  slice_buffer_list_destroy(exec_ctx, &s->to_read_message);
 | 
	
		
			
				|  |  | -  slice_buffer_list_destroy(exec_ctx, &s->write_buffer_message);
 | 
	
		
			
				|  |  |    GRPC_ERROR_UNREF(s->write_buffer_cancel_error);
 | 
	
		
			
				|  |  |    GRPC_ERROR_UNREF(s->cancel_self_error);
 | 
	
		
			
				|  |  |    GRPC_ERROR_UNREF(s->cancel_other_error);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +  if (s->recv_inited) {
 | 
	
		
			
				|  |  | +    grpc_slice_buffer_destroy_internal(exec_ctx, &s->recv_message);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    unref_transport(exec_ctx, s->t);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    if (s->closure_at_destroy) {
 | 
	
	
		
			
				|  | @@ -293,9 +182,6 @@ static void really_destroy_stream(grpc_exec_ctx *exec_ctx, inproc_stream *s) {
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
 | 
	
		
			
				|  |  | -                               grpc_error *error);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |  static void log_metadata(const grpc_metadata_batch *md_batch, bool is_client,
 | 
	
		
			
				|  |  |                           bool is_initial) {
 | 
	
		
			
				|  |  |    for (grpc_linked_mdelem *md = md_batch->list.head; md != NULL;
 | 
	
	
		
			
				|  | @@ -359,11 +245,9 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
 | 
	
		
			
				|  |  |    s->write_buffer_initial_md_filled = false;
 | 
	
		
			
				|  |  |    grpc_metadata_batch_init(&s->write_buffer_trailing_md);
 | 
	
		
			
				|  |  |    s->write_buffer_trailing_md_filled = false;
 | 
	
		
			
				|  |  | -  slice_buffer_list_init(&s->to_read_message);
 | 
	
		
			
				|  |  | -  slice_buffer_list_init(&s->write_buffer_message);
 | 
	
		
			
				|  |  | -  s->reads_needed = false;
 | 
	
		
			
				|  |  | -  s->read_closure_scheduled = false;
 | 
	
		
			
				|  |  | -  GRPC_CLOSURE_INIT(&s->read_closure, read_state_machine, s,
 | 
	
		
			
				|  |  | +  s->ops_needed = false;
 | 
	
		
			
				|  |  | +  s->op_closure_scheduled = false;
 | 
	
		
			
				|  |  | +  GRPC_CLOSURE_INIT(&s->op_closure, op_state_machine, s,
 | 
	
		
			
				|  |  |                      grpc_schedule_on_exec_ctx);
 | 
	
		
			
				|  |  |    s->t = t;
 | 
	
		
			
				|  |  |    s->closure_at_destroy = NULL;
 | 
	
	
		
			
				|  | @@ -425,11 +309,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
 | 
	
		
			
				|  |  |        grpc_metadata_batch_clear(exec_ctx, &cs->write_buffer_initial_md);
 | 
	
		
			
				|  |  |        cs->write_buffer_initial_md_filled = false;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -    while (!slice_buffer_list_empty(&cs->write_buffer_message)) {
 | 
	
		
			
				|  |  | -      slice_buffer_list_append_entry(
 | 
	
		
			
				|  |  | -          &s->to_read_message,
 | 
	
		
			
				|  |  | -          slice_buffer_list_pophead(&cs->write_buffer_message));
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  |      if (cs->write_buffer_trailing_md_filled) {
 | 
	
		
			
				|  |  |        fill_in_metadata(exec_ctx, s, &cs->write_buffer_trailing_md, 0,
 | 
	
		
			
				|  |  |                         &s->to_read_trailing_md, NULL,
 | 
	
	
		
			
				|  | @@ -488,9 +367,39 @@ static void close_other_side_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +// Call the on_complete closure associated with this stream_op_batch if
 | 
	
		
			
				|  |  | +// this stream_op_batch is only one of the pending operations for this
 | 
	
		
			
				|  |  | +// stream. This is called when one of the pending operations for the stream
 | 
	
		
			
				|  |  | +// is done and about to be NULLed out
 | 
	
		
			
				|  |  | +static void complete_if_batch_end_locked(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  | +                                         inproc_stream *s, grpc_error *error,
 | 
	
		
			
				|  |  | +                                         grpc_transport_stream_op_batch *op,
 | 
	
		
			
				|  |  | +                                         const char *msg) {
 | 
	
		
			
				|  |  | +  int is_sm = (int)(op == s->send_message_op);
 | 
	
		
			
				|  |  | +  int is_stm = (int)(op == s->send_trailing_md_op);
 | 
	
		
			
				|  |  | +  int is_rim = (int)(op == s->recv_initial_md_op);
 | 
	
		
			
				|  |  | +  int is_rm = (int)(op == s->recv_message_op);
 | 
	
		
			
				|  |  | +  int is_rtm = (int)(op == s->recv_trailing_md_op);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  if ((is_sm + is_stm + is_rim + is_rm + is_rtm) == 1) {
 | 
	
		
			
				|  |  | +    INPROC_LOG(GPR_DEBUG, "%s %p %p %p", msg, s, op, error);
 | 
	
		
			
				|  |  | +    GRPC_CLOSURE_SCHED(exec_ctx, op->on_complete, GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +static void maybe_schedule_op_closure_locked(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  | +                                             inproc_stream *s,
 | 
	
		
			
				|  |  | +                                             grpc_error *error) {
 | 
	
		
			
				|  |  | +  if (s && s->ops_needed && !s->op_closure_scheduled) {
 | 
	
		
			
				|  |  | +    GRPC_CLOSURE_SCHED(exec_ctx, &s->op_closure, GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  | +    s->op_closure_scheduled = true;
 | 
	
		
			
				|  |  | +    s->ops_needed = false;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
 | 
	
		
			
				|  |  |                                 grpc_error *error) {
 | 
	
		
			
				|  |  | -  INPROC_LOG(GPR_DEBUG, "read_state_machine %p fail_helper", s);
 | 
	
		
			
				|  |  | +  INPROC_LOG(GPR_DEBUG, "op_state_machine %p fail_helper", s);
 | 
	
		
			
				|  |  |    // If we're failing this side, we need to make sure that
 | 
	
		
			
				|  |  |    // we also send or have already sent trailing metadata
 | 
	
		
			
				|  |  |    if (!s->trailing_md_sent) {
 | 
	
	
		
			
				|  | @@ -512,14 +421,7 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
 | 
	
		
			
				|  |  |        if (other->cancel_other_error == GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  |          other->cancel_other_error = GRPC_ERROR_REF(error);
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  | -      if (other->reads_needed) {
 | 
	
		
			
				|  |  | -        if (!other->read_closure_scheduled) {
 | 
	
		
			
				|  |  | -          GRPC_CLOSURE_SCHED(exec_ctx, &other->read_closure,
 | 
	
		
			
				|  |  | -                             GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  | -          other->read_closure_scheduled = true;
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | -        other->reads_needed = false;
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | +      maybe_schedule_op_closure_locked(exec_ctx, other, error);
 | 
	
		
			
				|  |  |      } else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  |        s->write_buffer_cancel_error = GRPC_ERROR_REF(error);
 | 
	
		
			
				|  |  |      }
 | 
	
	
		
			
				|  | @@ -564,14 +466,9 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
 | 
	
		
			
				|  |  |                         err);
 | 
	
		
			
				|  |  |      // Last use of err so no need to REF and then UNREF it
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    if ((s->recv_initial_md_op != s->recv_message_op) &&
 | 
	
		
			
				|  |  | -        (s->recv_initial_md_op != s->recv_trailing_md_op)) {
 | 
	
		
			
				|  |  | -      INPROC_LOG(GPR_DEBUG,
 | 
	
		
			
				|  |  | -                 "fail_helper %p scheduling initial-metadata-on-complete %p",
 | 
	
		
			
				|  |  | -                 error, s);
 | 
	
		
			
				|  |  | -      GRPC_CLOSURE_SCHED(exec_ctx, s->recv_initial_md_op->on_complete,
 | 
	
		
			
				|  |  | -                         GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | +    complete_if_batch_end_locked(
 | 
	
		
			
				|  |  | +        exec_ctx, s, error, s->recv_initial_md_op,
 | 
	
		
			
				|  |  | +        "fail_helper scheduling recv-initial-metadata-on-complete");
 | 
	
		
			
				|  |  |      s->recv_initial_md_op = NULL;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    if (s->recv_message_op) {
 | 
	
	
		
			
				|  | @@ -580,20 +477,30 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
 | 
	
		
			
				|  |  |      GRPC_CLOSURE_SCHED(
 | 
	
		
			
				|  |  |          exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready,
 | 
	
		
			
				|  |  |          GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  | -    if (s->recv_message_op != s->recv_trailing_md_op) {
 | 
	
		
			
				|  |  | -      INPROC_LOG(GPR_DEBUG, "fail_helper %p scheduling message-on-complete %p",
 | 
	
		
			
				|  |  | -                 s, error);
 | 
	
		
			
				|  |  | -      GRPC_CLOSURE_SCHED(exec_ctx, s->recv_message_op->on_complete,
 | 
	
		
			
				|  |  | -                         GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | +    complete_if_batch_end_locked(
 | 
	
		
			
				|  |  | +        exec_ctx, s, error, s->recv_message_op,
 | 
	
		
			
				|  |  | +        "fail_helper scheduling recv-message-on-complete");
 | 
	
		
			
				|  |  |      s->recv_message_op = NULL;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | +  if (s->send_message_op) {
 | 
	
		
			
				|  |  | +    complete_if_batch_end_locked(
 | 
	
		
			
				|  |  | +        exec_ctx, s, error, s->send_message_op,
 | 
	
		
			
				|  |  | +        "fail_helper scheduling send-message-on-complete");
 | 
	
		
			
				|  |  | +    s->send_message_op = NULL;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  if (s->send_trailing_md_op) {
 | 
	
		
			
				|  |  | +    complete_if_batch_end_locked(
 | 
	
		
			
				|  |  | +        exec_ctx, s, error, s->send_trailing_md_op,
 | 
	
		
			
				|  |  | +        "fail_helper scheduling send-trailng-md-on-complete");
 | 
	
		
			
				|  |  | +    s->send_trailing_md_op = NULL;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  |    if (s->recv_trailing_md_op) {
 | 
	
		
			
				|  |  |      INPROC_LOG(GPR_DEBUG,
 | 
	
		
			
				|  |  |                 "fail_helper %p scheduling trailing-md-on-complete %p", s,
 | 
	
		
			
				|  |  |                 error);
 | 
	
		
			
				|  |  | -    GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete,
 | 
	
		
			
				|  |  | -                       GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  | +    complete_if_batch_end_locked(
 | 
	
		
			
				|  |  | +        exec_ctx, s, error, s->recv_trailing_md_op,
 | 
	
		
			
				|  |  | +        "fail_helper scheduling recv-trailing-metadata-on-complete");
 | 
	
		
			
				|  |  |      s->recv_trailing_md_op = NULL;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    close_other_side_locked(exec_ctx, s, "fail_helper:other_side");
 | 
	
	
		
			
				|  | @@ -602,12 +509,61 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
 | 
	
		
			
				|  |  |    GRPC_ERROR_UNREF(error);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
 | 
	
		
			
				|  |  | -                               grpc_error *error) {
 | 
	
		
			
				|  |  | +static void message_transfer_locked(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  | +                                    inproc_stream *sender,
 | 
	
		
			
				|  |  | +                                    inproc_stream *receiver) {
 | 
	
		
			
				|  |  | +  size_t remaining =
 | 
	
		
			
				|  |  | +      sender->send_message_op->payload->send_message.send_message->length;
 | 
	
		
			
				|  |  | +  if (receiver->recv_inited) {
 | 
	
		
			
				|  |  | +    grpc_slice_buffer_destroy_internal(exec_ctx, &receiver->recv_message);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  grpc_slice_buffer_init(&receiver->recv_message);
 | 
	
		
			
				|  |  | +  receiver->recv_inited = true;
 | 
	
		
			
				|  |  | +  do {
 | 
	
		
			
				|  |  | +    grpc_slice message_slice;
 | 
	
		
			
				|  |  | +    grpc_closure unused;
 | 
	
		
			
				|  |  | +    GPR_ASSERT(grpc_byte_stream_next(
 | 
	
		
			
				|  |  | +        exec_ctx, sender->send_message_op->payload->send_message.send_message,
 | 
	
		
			
				|  |  | +        SIZE_MAX, &unused));
 | 
	
		
			
				|  |  | +    grpc_error *error = grpc_byte_stream_pull(
 | 
	
		
			
				|  |  | +        exec_ctx, sender->send_message_op->payload->send_message.send_message,
 | 
	
		
			
				|  |  | +        &message_slice);
 | 
	
		
			
				|  |  | +    if (error != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | +      cancel_stream_locked(exec_ctx, sender, GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  | +      break;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    GPR_ASSERT(error == GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | +    remaining -= GRPC_SLICE_LENGTH(message_slice);
 | 
	
		
			
				|  |  | +    grpc_slice_buffer_add(&receiver->recv_message, message_slice);
 | 
	
		
			
				|  |  | +  } while (remaining > 0);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  grpc_slice_buffer_stream_init(&receiver->recv_stream, &receiver->recv_message,
 | 
	
		
			
				|  |  | +                                0);
 | 
	
		
			
				|  |  | +  *receiver->recv_message_op->payload->recv_message.recv_message =
 | 
	
		
			
				|  |  | +      &receiver->recv_stream.base;
 | 
	
		
			
				|  |  | +  INPROC_LOG(GPR_DEBUG, "message_transfer_locked %p scheduling message-ready",
 | 
	
		
			
				|  |  | +             receiver);
 | 
	
		
			
				|  |  | +  GRPC_CLOSURE_SCHED(
 | 
	
		
			
				|  |  | +      exec_ctx,
 | 
	
		
			
				|  |  | +      receiver->recv_message_op->payload->recv_message.recv_message_ready,
 | 
	
		
			
				|  |  | +      GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | +  complete_if_batch_end_locked(
 | 
	
		
			
				|  |  | +      exec_ctx, sender, GRPC_ERROR_NONE, sender->send_message_op,
 | 
	
		
			
				|  |  | +      "message_transfer scheduling sender on_complete");
 | 
	
		
			
				|  |  | +  complete_if_batch_end_locked(
 | 
	
		
			
				|  |  | +      exec_ctx, receiver, GRPC_ERROR_NONE, receiver->recv_message_op,
 | 
	
		
			
				|  |  | +      "message_transfer scheduling receiver on_complete");
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  receiver->recv_message_op = NULL;
 | 
	
		
			
				|  |  | +  sender->send_message_op = NULL;
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
 | 
	
		
			
				|  |  | +                             grpc_error *error) {
 | 
	
		
			
				|  |  |    // This function gets called when we have contents in the unprocessed reads
 | 
	
		
			
				|  |  |    // Get what we want based on our ops wanted
 | 
	
		
			
				|  |  |    // Schedule our appropriate closures
 | 
	
		
			
				|  |  | -  // and then return to reads_needed state if still needed
 | 
	
		
			
				|  |  | +  // and then return to ops_needed state if still needed
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    // Since this is a closure directly invoked by the combiner, it should not
 | 
	
		
			
				|  |  |    // unref the error parameter explicitly; the combiner will do that implicitly
 | 
	
	
		
			
				|  | @@ -615,12 +571,14 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    bool needs_close = false;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  INPROC_LOG(GPR_DEBUG, "read_state_machine %p", arg);
 | 
	
		
			
				|  |  | +  INPROC_LOG(GPR_DEBUG, "op_state_machine %p", arg);
 | 
	
		
			
				|  |  |    inproc_stream *s = (inproc_stream *)arg;
 | 
	
		
			
				|  |  |    gpr_mu *mu = &s->t->mu->mu;  // keep aside in case s gets closed
 | 
	
		
			
				|  |  |    gpr_mu_lock(mu);
 | 
	
		
			
				|  |  | -  s->read_closure_scheduled = false;
 | 
	
		
			
				|  |  | +  s->op_closure_scheduled = false;
 | 
	
		
			
				|  |  |    // cancellation takes precedence
 | 
	
		
			
				|  |  | +  inproc_stream *other = s->other_side;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    if (s->cancel_self_error != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  |      fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(s->cancel_self_error));
 | 
	
		
			
				|  |  |      goto done;
 | 
	
	
		
			
				|  | @@ -632,89 +590,116 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
 | 
	
		
			
				|  |  |      goto done;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  if (s->recv_initial_md_op) {
 | 
	
		
			
				|  |  | -    if (!s->to_read_initial_md_filled) {
 | 
	
		
			
				|  |  | -      // We entered the state machine on some other kind of read even though
 | 
	
		
			
				|  |  | -      // we still haven't satisfied initial md . That's an error.
 | 
	
		
			
				|  |  | -      new_err =
 | 
	
		
			
				|  |  | -          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unexpected frame sequencing");
 | 
	
		
			
				|  |  | -      INPROC_LOG(GPR_DEBUG,
 | 
	
		
			
				|  |  | -                 "read_state_machine %p scheduling on_complete errors for no "
 | 
	
		
			
				|  |  | -                 "initial md %p",
 | 
	
		
			
				|  |  | -                 s, new_err);
 | 
	
		
			
				|  |  | +  if (s->send_message_op && other) {
 | 
	
		
			
				|  |  | +    if (other->recv_message_op) {
 | 
	
		
			
				|  |  | +      message_transfer_locked(exec_ctx, s, other);
 | 
	
		
			
				|  |  | +      maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | +    } else if (!s->t->is_client &&
 | 
	
		
			
				|  |  | +               (s->trailing_md_sent || other->recv_trailing_md_op)) {
 | 
	
		
			
				|  |  | +      // A server send will never be matched if the client is waiting
 | 
	
		
			
				|  |  | +      // for trailing metadata already
 | 
	
		
			
				|  |  | +      complete_if_batch_end_locked(
 | 
	
		
			
				|  |  | +          exec_ctx, s, GRPC_ERROR_NONE, s->send_message_op,
 | 
	
		
			
				|  |  | +          "op_state_machine scheduling send-message-on-complete");
 | 
	
		
			
				|  |  | +      s->send_message_op = NULL;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  // Pause a send trailing metadata if there is still an outstanding
 | 
	
		
			
				|  |  | +  // send message unless we know that the send message will never get
 | 
	
		
			
				|  |  | +  // matched to a receive. This happens on the client if the server has
 | 
	
		
			
				|  |  | +  // already sent status.
 | 
	
		
			
				|  |  | +  if (s->send_trailing_md_op &&
 | 
	
		
			
				|  |  | +      (!s->send_message_op ||
 | 
	
		
			
				|  |  | +       (s->t->is_client &&
 | 
	
		
			
				|  |  | +        (s->trailing_md_recvd || s->to_read_trailing_md_filled)))) {
 | 
	
		
			
				|  |  | +    grpc_metadata_batch *dest = (other == NULL) ? &s->write_buffer_trailing_md
 | 
	
		
			
				|  |  | +                                                : &other->to_read_trailing_md;
 | 
	
		
			
				|  |  | +    bool *destfilled = (other == NULL) ? &s->write_buffer_trailing_md_filled
 | 
	
		
			
				|  |  | +                                       : &other->to_read_trailing_md_filled;
 | 
	
		
			
				|  |  | +    if (*destfilled || s->trailing_md_sent) {
 | 
	
		
			
				|  |  | +      // The buffer is already in use; that's an error!
 | 
	
		
			
				|  |  | +      INPROC_LOG(GPR_DEBUG, "Extra trailing metadata %p", s);
 | 
	
		
			
				|  |  | +      new_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra trailing metadata");
 | 
	
		
			
				|  |  |        fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err));
 | 
	
		
			
				|  |  |        goto done;
 | 
	
		
			
				|  |  | -    } else if (s->initial_md_recvd) {
 | 
	
		
			
				|  |  | +    } else {
 | 
	
		
			
				|  |  | +      if (other && !other->closed) {
 | 
	
		
			
				|  |  | +        fill_in_metadata(exec_ctx, s,
 | 
	
		
			
				|  |  | +                         s->send_trailing_md_op->payload->send_trailing_metadata
 | 
	
		
			
				|  |  | +                             .send_trailing_metadata,
 | 
	
		
			
				|  |  | +                         0, dest, NULL, destfilled);
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +      s->trailing_md_sent = true;
 | 
	
		
			
				|  |  | +      if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
 | 
	
		
			
				|  |  | +        INPROC_LOG(GPR_DEBUG,
 | 
	
		
			
				|  |  | +                   "op_state_machine %p scheduling trailing-md-on-complete", s);
 | 
	
		
			
				|  |  | +        GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete,
 | 
	
		
			
				|  |  | +                           GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | +        s->recv_trailing_md_op = NULL;
 | 
	
		
			
				|  |  | +        needs_close = true;
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | +    complete_if_batch_end_locked(
 | 
	
		
			
				|  |  | +        exec_ctx, s, GRPC_ERROR_NONE, s->send_trailing_md_op,
 | 
	
		
			
				|  |  | +        "op_state_machine scheduling send-trailing-metadata-on-complete");
 | 
	
		
			
				|  |  | +    s->send_trailing_md_op = NULL;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  if (s->recv_initial_md_op) {
 | 
	
		
			
				|  |  | +    if (s->initial_md_recvd) {
 | 
	
		
			
				|  |  |        new_err =
 | 
	
		
			
				|  |  |            GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd initial md");
 | 
	
		
			
				|  |  |        INPROC_LOG(
 | 
	
		
			
				|  |  |            GPR_DEBUG,
 | 
	
		
			
				|  |  | -          "read_state_machine %p scheduling on_complete errors for already "
 | 
	
		
			
				|  |  | +          "op_state_machine %p scheduling on_complete errors for already "
 | 
	
		
			
				|  |  |            "recvd initial md %p",
 | 
	
		
			
				|  |  |            s, new_err);
 | 
	
		
			
				|  |  |        fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err));
 | 
	
		
			
				|  |  |        goto done;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    s->initial_md_recvd = true;
 | 
	
		
			
				|  |  | -    new_err = fill_in_metadata(
 | 
	
		
			
				|  |  | -        exec_ctx, s, &s->to_read_initial_md, s->to_read_initial_md_flags,
 | 
	
		
			
				|  |  | -        s->recv_initial_md_op->payload->recv_initial_metadata
 | 
	
		
			
				|  |  | -            .recv_initial_metadata,
 | 
	
		
			
				|  |  | -        s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags, NULL);
 | 
	
		
			
				|  |  | -    s->recv_initial_md_op->payload->recv_initial_metadata.recv_initial_metadata
 | 
	
		
			
				|  |  | -        ->deadline = s->deadline;
 | 
	
		
			
				|  |  | -    grpc_metadata_batch_clear(exec_ctx, &s->to_read_initial_md);
 | 
	
		
			
				|  |  | -    s->to_read_initial_md_filled = false;
 | 
	
		
			
				|  |  | -    INPROC_LOG(GPR_DEBUG,
 | 
	
		
			
				|  |  | -               "read_state_machine %p scheduling initial-metadata-ready %p", s,
 | 
	
		
			
				|  |  | -               new_err);
 | 
	
		
			
				|  |  | -    GRPC_CLOSURE_SCHED(exec_ctx,
 | 
	
		
			
				|  |  | -                       s->recv_initial_md_op->payload->recv_initial_metadata
 | 
	
		
			
				|  |  | -                           .recv_initial_metadata_ready,
 | 
	
		
			
				|  |  | -                       GRPC_ERROR_REF(new_err));
 | 
	
		
			
				|  |  | -    if ((s->recv_initial_md_op != s->recv_message_op) &&
 | 
	
		
			
				|  |  | -        (s->recv_initial_md_op != s->recv_trailing_md_op)) {
 | 
	
		
			
				|  |  | -      INPROC_LOG(
 | 
	
		
			
				|  |  | -          GPR_DEBUG,
 | 
	
		
			
				|  |  | -          "read_state_machine %p scheduling initial-metadata-on-complete %p", s,
 | 
	
		
			
				|  |  | -          new_err);
 | 
	
		
			
				|  |  | -      GRPC_CLOSURE_SCHED(exec_ctx, s->recv_initial_md_op->on_complete,
 | 
	
		
			
				|  |  | -                         GRPC_ERROR_REF(new_err));
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | -    s->recv_initial_md_op = NULL;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -    if (new_err != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | +    if (s->to_read_initial_md_filled) {
 | 
	
		
			
				|  |  | +      s->initial_md_recvd = true;
 | 
	
		
			
				|  |  | +      new_err = fill_in_metadata(
 | 
	
		
			
				|  |  | +          exec_ctx, s, &s->to_read_initial_md, s->to_read_initial_md_flags,
 | 
	
		
			
				|  |  | +          s->recv_initial_md_op->payload->recv_initial_metadata
 | 
	
		
			
				|  |  | +              .recv_initial_metadata,
 | 
	
		
			
				|  |  | +          s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags,
 | 
	
		
			
				|  |  | +          NULL);
 | 
	
		
			
				|  |  | +      s->recv_initial_md_op->payload->recv_initial_metadata
 | 
	
		
			
				|  |  | +          .recv_initial_metadata->deadline = s->deadline;
 | 
	
		
			
				|  |  | +      grpc_metadata_batch_clear(exec_ctx, &s->to_read_initial_md);
 | 
	
		
			
				|  |  | +      s->to_read_initial_md_filled = false;
 | 
	
		
			
				|  |  |        INPROC_LOG(GPR_DEBUG,
 | 
	
		
			
				|  |  | -                 "read_state_machine %p scheduling on_complete errors2 %p", s,
 | 
	
		
			
				|  |  | +                 "op_state_machine %p scheduling initial-metadata-ready %p", s,
 | 
	
		
			
				|  |  |                   new_err);
 | 
	
		
			
				|  |  | -      fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err));
 | 
	
		
			
				|  |  | -      goto done;
 | 
	
		
			
				|  |  | +      GRPC_CLOSURE_SCHED(exec_ctx,
 | 
	
		
			
				|  |  | +                         s->recv_initial_md_op->payload->recv_initial_metadata
 | 
	
		
			
				|  |  | +                             .recv_initial_metadata_ready,
 | 
	
		
			
				|  |  | +                         GRPC_ERROR_REF(new_err));
 | 
	
		
			
				|  |  | +      complete_if_batch_end_locked(
 | 
	
		
			
				|  |  | +          exec_ctx, s, new_err, s->recv_initial_md_op,
 | 
	
		
			
				|  |  | +          "op_state_machine scheduling recv-initial-metadata-on-complete");
 | 
	
		
			
				|  |  | +      s->recv_initial_md_op = NULL;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +      if (new_err != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | +        INPROC_LOG(GPR_DEBUG,
 | 
	
		
			
				|  |  | +                   "op_state_machine %p scheduling on_complete errors2 %p", s,
 | 
	
		
			
				|  |  | +                   new_err);
 | 
	
		
			
				|  |  | +        fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err));
 | 
	
		
			
				|  |  | +        goto done;
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  if (s->to_read_initial_md_filled) {
 | 
	
		
			
				|  |  | -    new_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unexpected recv frame");
 | 
	
		
			
				|  |  | -    fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err));
 | 
	
		
			
				|  |  | -    goto done;
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  if (!slice_buffer_list_empty(&s->to_read_message) && s->recv_message_op) {
 | 
	
		
			
				|  |  | -    inproc_slice_byte_stream_init(
 | 
	
		
			
				|  |  | -        &s->recv_message_stream,
 | 
	
		
			
				|  |  | -        slice_buffer_list_pophead(&s->to_read_message));
 | 
	
		
			
				|  |  | -    *s->recv_message_op->payload->recv_message.recv_message =
 | 
	
		
			
				|  |  | -        &s->recv_message_stream.base;
 | 
	
		
			
				|  |  | -    INPROC_LOG(GPR_DEBUG, "read_state_machine %p scheduling message-ready", s);
 | 
	
		
			
				|  |  | -    GRPC_CLOSURE_SCHED(
 | 
	
		
			
				|  |  | -        exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready,
 | 
	
		
			
				|  |  | -        GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | -    if (s->recv_message_op != s->recv_trailing_md_op) {
 | 
	
		
			
				|  |  | -      INPROC_LOG(GPR_DEBUG,
 | 
	
		
			
				|  |  | -                 "read_state_machine %p scheduling message-on-complete %p", s,
 | 
	
		
			
				|  |  | -                 new_err);
 | 
	
		
			
				|  |  | -      GRPC_CLOSURE_SCHED(exec_ctx, s->recv_message_op->on_complete,
 | 
	
		
			
				|  |  | -                         GRPC_ERROR_REF(new_err));
 | 
	
		
			
				|  |  | +  if (s->recv_message_op) {
 | 
	
		
			
				|  |  | +    if (other && other->send_message_op) {
 | 
	
		
			
				|  |  | +      message_transfer_locked(exec_ctx, other, s);
 | 
	
		
			
				|  |  | +      maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -    s->recv_message_op = NULL;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  if (s->recv_trailing_md_op && s->t->is_client && other &&
 | 
	
		
			
				|  |  | +      other->send_message_op) {
 | 
	
		
			
				|  |  | +    maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    if (s->to_read_trailing_md_filled) {
 | 
	
		
			
				|  |  |      if (s->trailing_md_recvd) {
 | 
	
	
		
			
				|  | @@ -722,7 +707,7 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
 | 
	
		
			
				|  |  |            GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd trailing md");
 | 
	
		
			
				|  |  |        INPROC_LOG(
 | 
	
		
			
				|  |  |            GPR_DEBUG,
 | 
	
		
			
				|  |  | -          "read_state_machine %p scheduling on_complete errors for already "
 | 
	
		
			
				|  |  | +          "op_state_machine %p scheduling on_complete errors for already "
 | 
	
		
			
				|  |  |            "recvd trailing md %p",
 | 
	
		
			
				|  |  |            s, new_err);
 | 
	
		
			
				|  |  |        fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err));
 | 
	
	
		
			
				|  | @@ -731,21 +716,24 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
 | 
	
		
			
				|  |  |      if (s->recv_message_op != NULL) {
 | 
	
		
			
				|  |  |        // This message needs to be wrapped up because it will never be
 | 
	
		
			
				|  |  |        // satisfied
 | 
	
		
			
				|  |  | -      INPROC_LOG(GPR_DEBUG, "read_state_machine %p scheduling message-ready",
 | 
	
		
			
				|  |  | -                 s);
 | 
	
		
			
				|  |  | +      INPROC_LOG(GPR_DEBUG, "op_state_machine %p scheduling message-ready", s);
 | 
	
		
			
				|  |  |        GRPC_CLOSURE_SCHED(
 | 
	
		
			
				|  |  |            exec_ctx,
 | 
	
		
			
				|  |  |            s->recv_message_op->payload->recv_message.recv_message_ready,
 | 
	
		
			
				|  |  |            GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | -      if (s->recv_message_op != s->recv_trailing_md_op) {
 | 
	
		
			
				|  |  | -        INPROC_LOG(GPR_DEBUG,
 | 
	
		
			
				|  |  | -                   "read_state_machine %p scheduling message-on-complete %p", s,
 | 
	
		
			
				|  |  | -                   new_err);
 | 
	
		
			
				|  |  | -        GRPC_CLOSURE_SCHED(exec_ctx, s->recv_message_op->on_complete,
 | 
	
		
			
				|  |  | -                           GRPC_ERROR_REF(new_err));
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | +      complete_if_batch_end_locked(
 | 
	
		
			
				|  |  | +          exec_ctx, s, new_err, s->recv_message_op,
 | 
	
		
			
				|  |  | +          "op_state_machine scheduling recv-message-on-complete");
 | 
	
		
			
				|  |  |        s->recv_message_op = NULL;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | +    if ((s->trailing_md_sent || s->t->is_client) && s->send_message_op) {
 | 
	
		
			
				|  |  | +      // Nothing further will try to receive from this stream, so finish off
 | 
	
		
			
				|  |  | +      // any outstanding send_message op
 | 
	
		
			
				|  |  | +      complete_if_batch_end_locked(
 | 
	
		
			
				|  |  | +          exec_ctx, s, new_err, s->send_message_op,
 | 
	
		
			
				|  |  | +          "op_state_machine scheduling send-message-on-complete");
 | 
	
		
			
				|  |  | +      s->send_message_op = NULL;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  |      if (s->recv_trailing_md_op != NULL) {
 | 
	
		
			
				|  |  |        // We wanted trailing metadata and we got it
 | 
	
		
			
				|  |  |        s->trailing_md_recvd = true;
 | 
	
	
		
			
				|  | @@ -763,61 +751,65 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
 | 
	
		
			
				|  |  |        //    (If the server hasn't already sent its trailing md, it doesn't have
 | 
	
		
			
				|  |  |        //     a final status, so don't mark this op complete)
 | 
	
		
			
				|  |  |        if (s->t->is_client || s->trailing_md_sent) {
 | 
	
		
			
				|  |  | -        INPROC_LOG(
 | 
	
		
			
				|  |  | -            GPR_DEBUG,
 | 
	
		
			
				|  |  | -            "read_state_machine %p scheduling trailing-md-on-complete %p", s,
 | 
	
		
			
				|  |  | -            new_err);
 | 
	
		
			
				|  |  | +        INPROC_LOG(GPR_DEBUG,
 | 
	
		
			
				|  |  | +                   "op_state_machine %p scheduling trailing-md-on-complete %p",
 | 
	
		
			
				|  |  | +                   s, new_err);
 | 
	
		
			
				|  |  |          GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete,
 | 
	
		
			
				|  |  |                             GRPC_ERROR_REF(new_err));
 | 
	
		
			
				|  |  |          s->recv_trailing_md_op = NULL;
 | 
	
		
			
				|  |  |          needs_close = true;
 | 
	
		
			
				|  |  |        } else {
 | 
	
		
			
				|  |  |          INPROC_LOG(GPR_DEBUG,
 | 
	
		
			
				|  |  | -                   "read_state_machine %p server needs to delay handling "
 | 
	
		
			
				|  |  | +                   "op_state_machine %p server needs to delay handling "
 | 
	
		
			
				|  |  |                     "trailing-md-on-complete %p",
 | 
	
		
			
				|  |  |                     s, new_err);
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  |      } else {
 | 
	
		
			
				|  |  |        INPROC_LOG(
 | 
	
		
			
				|  |  |            GPR_DEBUG,
 | 
	
		
			
				|  |  | -          "read_state_machine %p has trailing md but not yet waiting for it",
 | 
	
		
			
				|  |  | -          s);
 | 
	
		
			
				|  |  | +          "op_state_machine %p has trailing md but not yet waiting for it", s);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    if (s->trailing_md_recvd && s->recv_message_op) {
 | 
	
		
			
				|  |  |      // No further message will come on this stream, so finish off the
 | 
	
		
			
				|  |  |      // recv_message_op
 | 
	
		
			
				|  |  | -    INPROC_LOG(GPR_DEBUG, "read_state_machine %p scheduling message-ready", s);
 | 
	
		
			
				|  |  | +    INPROC_LOG(GPR_DEBUG, "op_state_machine %p scheduling message-ready", s);
 | 
	
		
			
				|  |  |      GRPC_CLOSURE_SCHED(
 | 
	
		
			
				|  |  |          exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready,
 | 
	
		
			
				|  |  |          GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | -    if (s->recv_message_op != s->recv_trailing_md_op) {
 | 
	
		
			
				|  |  | -      INPROC_LOG(GPR_DEBUG,
 | 
	
		
			
				|  |  | -                 "read_state_machine %p scheduling message-on-complete %p", s,
 | 
	
		
			
				|  |  | -                 new_err);
 | 
	
		
			
				|  |  | -      GRPC_CLOSURE_SCHED(exec_ctx, s->recv_message_op->on_complete,
 | 
	
		
			
				|  |  | -                         GRPC_ERROR_REF(new_err));
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | +    complete_if_batch_end_locked(
 | 
	
		
			
				|  |  | +        exec_ctx, s, new_err, s->recv_message_op,
 | 
	
		
			
				|  |  | +        "op_state_machine scheduling recv-message-on-complete");
 | 
	
		
			
				|  |  |      s->recv_message_op = NULL;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  if (s->recv_message_op || s->recv_trailing_md_op) {
 | 
	
		
			
				|  |  | +  if (s->trailing_md_recvd && (s->trailing_md_sent || s->t->is_client) &&
 | 
	
		
			
				|  |  | +      s->send_message_op) {
 | 
	
		
			
				|  |  | +    // Nothing further will try to receive from this stream, so finish off
 | 
	
		
			
				|  |  | +    // any outstanding send_message op
 | 
	
		
			
				|  |  | +    complete_if_batch_end_locked(
 | 
	
		
			
				|  |  | +        exec_ctx, s, new_err, s->send_message_op,
 | 
	
		
			
				|  |  | +        "op_state_machine scheduling send-message-on-complete");
 | 
	
		
			
				|  |  | +    s->send_message_op = NULL;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  if (s->send_message_op || s->send_trailing_md_op || s->recv_initial_md_op ||
 | 
	
		
			
				|  |  | +      s->recv_message_op || s->recv_trailing_md_op) {
 | 
	
		
			
				|  |  |      // Didn't get the item we wanted so we still need to get
 | 
	
		
			
				|  |  |      // rescheduled
 | 
	
		
			
				|  |  | -    INPROC_LOG(GPR_DEBUG, "read_state_machine %p still needs closure %p %p", s,
 | 
	
		
			
				|  |  | -               s->recv_message_op, s->recv_trailing_md_op);
 | 
	
		
			
				|  |  | -    s->reads_needed = true;
 | 
	
		
			
				|  |  | +    INPROC_LOG(
 | 
	
		
			
				|  |  | +        GPR_DEBUG, "op_state_machine %p still needs closure %p %p %p %p %p", s,
 | 
	
		
			
				|  |  | +        s->send_message_op, s->send_trailing_md_op, s->recv_initial_md_op,
 | 
	
		
			
				|  |  | +        s->recv_message_op, s->recv_trailing_md_op);
 | 
	
		
			
				|  |  | +    s->ops_needed = true;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  done:
 | 
	
		
			
				|  |  |    if (needs_close) {
 | 
	
		
			
				|  |  | -    close_other_side_locked(exec_ctx, s, "read_state_machine");
 | 
	
		
			
				|  |  | +    close_other_side_locked(exec_ctx, s, "op_state_machine");
 | 
	
		
			
				|  |  |      close_stream_locked(exec_ctx, s);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    gpr_mu_unlock(mu);
 | 
	
		
			
				|  |  |    GRPC_ERROR_UNREF(new_err);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static grpc_closure do_nothing_closure;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |  static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
 | 
	
		
			
				|  |  |                                   grpc_error *error) {
 | 
	
		
			
				|  |  |    bool ret = false;  // was the cancel accepted
 | 
	
	
		
			
				|  | @@ -826,14 +818,7 @@ static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
 | 
	
		
			
				|  |  |    if (s->cancel_self_error == GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  |      ret = true;
 | 
	
		
			
				|  |  |      s->cancel_self_error = GRPC_ERROR_REF(error);
 | 
	
		
			
				|  |  | -    if (s->reads_needed) {
 | 
	
		
			
				|  |  | -      if (!s->read_closure_scheduled) {
 | 
	
		
			
				|  |  | -        GRPC_CLOSURE_SCHED(exec_ctx, &s->read_closure,
 | 
	
		
			
				|  |  | -                           GRPC_ERROR_REF(s->cancel_self_error));
 | 
	
		
			
				|  |  | -        s->read_closure_scheduled = true;
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | -      s->reads_needed = false;
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | +    maybe_schedule_op_closure_locked(exec_ctx, s, s->cancel_self_error);
 | 
	
		
			
				|  |  |      // Send trailing md to the other side indicating cancellation, even if we
 | 
	
		
			
				|  |  |      // already have
 | 
	
		
			
				|  |  |      s->trailing_md_sent = true;
 | 
	
	
		
			
				|  | @@ -853,14 +838,8 @@ static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
 | 
	
		
			
				|  |  |        if (other->cancel_other_error == GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  |          other->cancel_other_error = GRPC_ERROR_REF(s->cancel_self_error);
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  | -      if (other->reads_needed) {
 | 
	
		
			
				|  |  | -        if (!other->read_closure_scheduled) {
 | 
	
		
			
				|  |  | -          GRPC_CLOSURE_SCHED(exec_ctx, &other->read_closure,
 | 
	
		
			
				|  |  | -                             GRPC_ERROR_REF(other->cancel_other_error));
 | 
	
		
			
				|  |  | -          other->read_closure_scheduled = true;
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | -        other->reads_needed = false;
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | +      maybe_schedule_op_closure_locked(exec_ctx, other,
 | 
	
		
			
				|  |  | +                                       other->cancel_other_error);
 | 
	
		
			
				|  |  |      } else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  |        s->write_buffer_cancel_error = GRPC_ERROR_REF(s->cancel_self_error);
 | 
	
		
			
				|  |  |      }
 | 
	
	
		
			
				|  | @@ -869,11 +848,9 @@ static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
 | 
	
		
			
				|  |  |      // couldn't complete that because we hadn't yet sent out trailing
 | 
	
		
			
				|  |  |      // md, now's the chance
 | 
	
		
			
				|  |  |      if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
 | 
	
		
			
				|  |  | -      INPROC_LOG(GPR_DEBUG,
 | 
	
		
			
				|  |  | -                 "cancel_stream %p scheduling trailing-md-on-complete %p", s,
 | 
	
		
			
				|  |  | -                 s->cancel_self_error);
 | 
	
		
			
				|  |  | -      GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete,
 | 
	
		
			
				|  |  | -                         GRPC_ERROR_REF(s->cancel_self_error));
 | 
	
		
			
				|  |  | +      complete_if_batch_end_locked(
 | 
	
		
			
				|  |  | +          exec_ctx, s, s->cancel_self_error, s->recv_trailing_md_op,
 | 
	
		
			
				|  |  | +          "cancel_stream scheduling trailing-md-on-complete");
 | 
	
		
			
				|  |  |        s->recv_trailing_md_op = NULL;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |    }
 | 
	
	
		
			
				|  | @@ -918,7 +895,8 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
 | 
	
		
			
				|  |  |      // already self-canceled so still give it an error
 | 
	
		
			
				|  |  |      error = GRPC_ERROR_REF(s->cancel_self_error);
 | 
	
		
			
				|  |  |    } else {
 | 
	
		
			
				|  |  | -    INPROC_LOG(GPR_DEBUG, "perform_stream_op %p%s%s%s%s%s%s", s,
 | 
	
		
			
				|  |  | +    INPROC_LOG(GPR_DEBUG, "perform_stream_op %p %s%s%s%s%s%s%s", s,
 | 
	
		
			
				|  |  | +               s->t->is_client ? "client" : "server",
 | 
	
		
			
				|  |  |                 op->send_initial_metadata ? " send_initial_metadata" : "",
 | 
	
		
			
				|  |  |                 op->send_message ? " send_message" : "",
 | 
	
		
			
				|  |  |                 op->send_trailing_metadata ? " send_trailing_metadata" : "",
 | 
	
	
		
			
				|  | @@ -929,10 +907,9 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    bool needs_close = false;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +  inproc_stream *other = s->other_side;
 | 
	
		
			
				|  |  |    if (error == GRPC_ERROR_NONE &&
 | 
	
		
			
				|  |  | -      (op->send_initial_metadata || op->send_message ||
 | 
	
		
			
				|  |  | -       op->send_trailing_metadata)) {
 | 
	
		
			
				|  |  | -    inproc_stream *other = s->other_side;
 | 
	
		
			
				|  |  | +      (op->send_initial_metadata || op->send_trailing_metadata)) {
 | 
	
		
			
				|  |  |      if (s->t->is_closed) {
 | 
	
		
			
				|  |  |        error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown");
 | 
	
		
			
				|  |  |      }
 | 
	
	
		
			
				|  | @@ -963,72 +940,21 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
 | 
	
		
			
				|  |  |            s->initial_md_sent = true;
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | -    if (error == GRPC_ERROR_NONE && op->send_message) {
 | 
	
		
			
				|  |  | -      size_t remaining = op->payload->send_message.send_message->length;
 | 
	
		
			
				|  |  | -      grpc_slice_buffer *dest = slice_buffer_list_append(
 | 
	
		
			
				|  |  | -          (other == NULL) ? &s->write_buffer_message : &other->to_read_message);
 | 
	
		
			
				|  |  | -      do {
 | 
	
		
			
				|  |  | -        grpc_slice message_slice;
 | 
	
		
			
				|  |  | -        grpc_closure unused;
 | 
	
		
			
				|  |  | -        GPR_ASSERT(grpc_byte_stream_next(exec_ctx,
 | 
	
		
			
				|  |  | -                                         op->payload->send_message.send_message,
 | 
	
		
			
				|  |  | -                                         SIZE_MAX, &unused));
 | 
	
		
			
				|  |  | -        error = grpc_byte_stream_pull(
 | 
	
		
			
				|  |  | -            exec_ctx, op->payload->send_message.send_message, &message_slice);
 | 
	
		
			
				|  |  | -        if (error != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | -          cancel_stream_locked(exec_ctx, s, GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  | -          break;
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | -        GPR_ASSERT(error == GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | -        remaining -= GRPC_SLICE_LENGTH(message_slice);
 | 
	
		
			
				|  |  | -        grpc_slice_buffer_add(dest, message_slice);
 | 
	
		
			
				|  |  | -      } while (remaining != 0);
 | 
	
		
			
				|  |  | -      grpc_byte_stream_destroy(exec_ctx,
 | 
	
		
			
				|  |  | -                               op->payload->send_message.send_message);
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | -    if (error == GRPC_ERROR_NONE && op->send_trailing_metadata) {
 | 
	
		
			
				|  |  | -      grpc_metadata_batch *dest = (other == NULL) ? &s->write_buffer_trailing_md
 | 
	
		
			
				|  |  | -                                                  : &other->to_read_trailing_md;
 | 
	
		
			
				|  |  | -      bool *destfilled = (other == NULL) ? &s->write_buffer_trailing_md_filled
 | 
	
		
			
				|  |  | -                                         : &other->to_read_trailing_md_filled;
 | 
	
		
			
				|  |  | -      if (*destfilled || s->trailing_md_sent) {
 | 
	
		
			
				|  |  | -        // The buffer is already in use; that's an error!
 | 
	
		
			
				|  |  | -        INPROC_LOG(GPR_DEBUG, "Extra trailing metadata %p", s);
 | 
	
		
			
				|  |  | -        error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra trailing metadata");
 | 
	
		
			
				|  |  | -      } else {
 | 
	
		
			
				|  |  | -        if (!other->closed) {
 | 
	
		
			
				|  |  | -          fill_in_metadata(
 | 
	
		
			
				|  |  | -              exec_ctx, s,
 | 
	
		
			
				|  |  | -              op->payload->send_trailing_metadata.send_trailing_metadata, 0,
 | 
	
		
			
				|  |  | -              dest, NULL, destfilled);
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | -        s->trailing_md_sent = true;
 | 
	
		
			
				|  |  | -        if (!s->t->is_client && s->trailing_md_recvd &&
 | 
	
		
			
				|  |  | -            s->recv_trailing_md_op) {
 | 
	
		
			
				|  |  | -          INPROC_LOG(GPR_DEBUG,
 | 
	
		
			
				|  |  | -                     "perform_stream_op %p scheduling trailing-md-on-complete",
 | 
	
		
			
				|  |  | -                     s);
 | 
	
		
			
				|  |  | -          GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete,
 | 
	
		
			
				|  |  | -                             GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | -          s->recv_trailing_md_op = NULL;
 | 
	
		
			
				|  |  | -          needs_close = true;
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | -    if (other != NULL && other->reads_needed) {
 | 
	
		
			
				|  |  | -      if (!other->read_closure_scheduled) {
 | 
	
		
			
				|  |  | -        GRPC_CLOSURE_SCHED(exec_ctx, &other->read_closure, error);
 | 
	
		
			
				|  |  | -        other->read_closure_scheduled = true;
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | -      other->reads_needed = false;
 | 
	
		
			
				|  |  | +      maybe_schedule_op_closure_locked(exec_ctx, other, error);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    if (error == GRPC_ERROR_NONE &&
 | 
	
		
			
				|  |  | -      (op->recv_initial_metadata || op->recv_message ||
 | 
	
		
			
				|  |  | +      (op->send_message || op->send_trailing_metadata ||
 | 
	
		
			
				|  |  | +       op->recv_initial_metadata || op->recv_message ||
 | 
	
		
			
				|  |  |         op->recv_trailing_metadata)) {
 | 
	
		
			
				|  |  | -    // If there are any reads, mark it so that the read closure will react to
 | 
	
		
			
				|  |  | -    // them
 | 
	
		
			
				|  |  | +    // Mark ops that need to be processed by the closure
 | 
	
		
			
				|  |  | +    if (op->send_message) {
 | 
	
		
			
				|  |  | +      s->send_message_op = op;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    if (op->send_trailing_metadata) {
 | 
	
		
			
				|  |  | +      s->send_trailing_md_op = op;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  |      if (op->recv_initial_metadata) {
 | 
	
		
			
				|  |  |        s->recv_initial_md_op = op;
 | 
	
		
			
				|  |  |      }
 | 
	
	
		
			
				|  | @@ -1040,25 +966,28 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      // We want to initiate the closure if:
 | 
	
		
			
				|  |  | -    // 1. There is initial metadata and something ready to take that
 | 
	
		
			
				|  |  | -    // 2. There is a message and something ready to take it
 | 
	
		
			
				|  |  | -    // 3. There is trailing metadata, even if nothing specifically wants
 | 
	
		
			
				|  |  | -    //    that because that can shut down the message as well
 | 
	
		
			
				|  |  | -    if ((s->to_read_initial_md_filled && op->recv_initial_metadata) ||
 | 
	
		
			
				|  |  | -        ((!slice_buffer_list_empty(&s->to_read_message) ||
 | 
	
		
			
				|  |  | -          s->trailing_md_recvd) &&
 | 
	
		
			
				|  |  | -         op->recv_message) ||
 | 
	
		
			
				|  |  | -        (s->to_read_trailing_md_filled)) {
 | 
	
		
			
				|  |  | -      if (!s->read_closure_scheduled) {
 | 
	
		
			
				|  |  | -        GRPC_CLOSURE_SCHED(exec_ctx, &s->read_closure, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | -        s->read_closure_scheduled = true;
 | 
	
		
			
				|  |  | +    // 1. We want to send a message and the other side wants to receive or end
 | 
	
		
			
				|  |  | +    // 2. We want to send trailing metadata and there isn't an unmatched send
 | 
	
		
			
				|  |  | +    // 3. We want initial metadata and the other side has sent it
 | 
	
		
			
				|  |  | +    // 4. We want to receive a message and there is a message ready
 | 
	
		
			
				|  |  | +    // 5. There is trailing metadata, even if nothing specifically wants
 | 
	
		
			
				|  |  | +    //    that because that can shut down the receive message as well
 | 
	
		
			
				|  |  | +    if ((op->send_message && other && ((other->recv_message_op != NULL) ||
 | 
	
		
			
				|  |  | +                                       (other->recv_trailing_md_op != NULL))) ||
 | 
	
		
			
				|  |  | +        (op->send_trailing_metadata && !op->send_message) ||
 | 
	
		
			
				|  |  | +        (op->recv_initial_metadata && s->to_read_initial_md_filled) ||
 | 
	
		
			
				|  |  | +        (op->recv_message && (other && other->send_message_op != NULL)) ||
 | 
	
		
			
				|  |  | +        (s->to_read_trailing_md_filled || s->trailing_md_recvd)) {
 | 
	
		
			
				|  |  | +      if (!s->op_closure_scheduled) {
 | 
	
		
			
				|  |  | +        GRPC_CLOSURE_SCHED(exec_ctx, &s->op_closure, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | +        s->op_closure_scheduled = true;
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  |      } else {
 | 
	
		
			
				|  |  | -      s->reads_needed = true;
 | 
	
		
			
				|  |  | +      s->ops_needed = true;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |    } else {
 | 
	
		
			
				|  |  |      if (error != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | -      // Schedule op's read closures that we didn't push to read state machine
 | 
	
		
			
				|  |  | +      // Schedule op's closures that we didn't push to op state machine
 | 
	
		
			
				|  |  |        if (op->recv_initial_metadata) {
 | 
	
		
			
				|  |  |          INPROC_LOG(
 | 
	
		
			
				|  |  |              GPR_DEBUG,
 |