|  | @@ -51,7 +51,8 @@ grpc_slice g_fake_auth_value;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  struct inproc_stream;
 | 
	
		
			
				|  |  |  bool cancel_stream_locked(inproc_stream* s, grpc_error* error);
 | 
	
		
			
				|  |  | -void op_state_machine(void* arg, grpc_error* error);
 | 
	
		
			
				|  |  | +void maybe_process_ops_locked(inproc_stream* s, grpc_error* error);
 | 
	
		
			
				|  |  | +void op_state_machine_locked(inproc_stream* s, grpc_error* error);
 | 
	
		
			
				|  |  |  void log_metadata(const grpc_metadata_batch* md_batch, bool is_client,
 | 
	
		
			
				|  |  |                    bool is_initial);
 | 
	
		
			
				|  |  |  grpc_error* fill_in_metadata(inproc_stream* s,
 | 
	
	
		
			
				|  | @@ -130,8 +131,6 @@ struct inproc_stream {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      grpc_metadata_batch_init(&to_read_initial_md);
 | 
	
		
			
				|  |  |      grpc_metadata_batch_init(&to_read_trailing_md);
 | 
	
		
			
				|  |  | -    GRPC_CLOSURE_INIT(&op_closure, op_state_machine, this,
 | 
	
		
			
				|  |  | -                      grpc_schedule_on_exec_ctx);
 | 
	
		
			
				|  |  |      grpc_metadata_batch_init(&write_buffer_initial_md);
 | 
	
		
			
				|  |  |      grpc_metadata_batch_init(&write_buffer_trailing_md);
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -186,6 +185,7 @@ struct inproc_stream {
 | 
	
		
			
				|  |  |        if (cs->write_buffer_cancel_error != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  |          cancel_other_error = cs->write_buffer_cancel_error;
 | 
	
		
			
				|  |  |          cs->write_buffer_cancel_error = GRPC_ERROR_NONE;
 | 
	
		
			
				|  |  | +        maybe_process_ops_locked(this, cancel_other_error);
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |        gpr_mu_unlock(&t->mu->mu);
 | 
	
	
		
			
				|  | @@ -235,8 +235,6 @@ struct inproc_stream {
 | 
	
		
			
				|  |  |    grpc_metadata_batch to_read_trailing_md;
 | 
	
		
			
				|  |  |    bool to_read_trailing_md_filled = false;
 | 
	
		
			
				|  |  |    bool ops_needed = false;
 | 
	
		
			
				|  |  | -  bool op_closure_scheduled = false;
 | 
	
		
			
				|  |  | -  grpc_closure op_closure;
 | 
	
		
			
				|  |  |    // Write buffer used only during gap at init time when client-side
 | 
	
		
			
				|  |  |    // stream is set up but server side stream is not yet set up
 | 
	
		
			
				|  |  |    grpc_metadata_batch write_buffer_initial_md;
 | 
	
	
		
			
				|  | @@ -396,12 +394,10 @@ void complete_if_batch_end_locked(inproc_stream* s, grpc_error* error,
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -void maybe_schedule_op_closure_locked(inproc_stream* s, grpc_error* error) {
 | 
	
		
			
				|  |  | -  if (s && s->ops_needed && !s->op_closure_scheduled) {
 | 
	
		
			
				|  |  | -    grpc_core::ExecCtx::Run(DEBUG_LOCATION, &s->op_closure,
 | 
	
		
			
				|  |  | -                            GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  | -    s->op_closure_scheduled = true;
 | 
	
		
			
				|  |  | +void maybe_process_ops_locked(inproc_stream* s, grpc_error* error) {
 | 
	
		
			
				|  |  | +  if (s && (error != GRPC_ERROR_NONE || s->ops_needed)) {
 | 
	
		
			
				|  |  |      s->ops_needed = false;
 | 
	
		
			
				|  |  | +    op_state_machine_locked(s, error);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -429,7 +425,7 @@ void fail_helper_locked(inproc_stream* s, grpc_error* error) {
 | 
	
		
			
				|  |  |        if (other->cancel_other_error == GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  |          other->cancel_other_error = GRPC_ERROR_REF(error);
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  | -      maybe_schedule_op_closure_locked(other, error);
 | 
	
		
			
				|  |  | +      maybe_process_ops_locked(other, error);
 | 
	
		
			
				|  |  |      } else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  |        s->write_buffer_cancel_error = GRPC_ERROR_REF(error);
 | 
	
		
			
				|  |  |      }
 | 
	
	
		
			
				|  | @@ -587,23 +583,17 @@ void message_transfer_locked(inproc_stream* sender, inproc_stream* receiver) {
 | 
	
		
			
				|  |  |    sender->send_message_op = nullptr;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -void op_state_machine(void* arg, grpc_error* error) {
 | 
	
		
			
				|  |  | +void op_state_machine_locked(inproc_stream* s, grpc_error* error) {
 | 
	
		
			
				|  |  |    // This function gets called when we have contents in the unprocessed reads
 | 
	
		
			
				|  |  |    // Get what we want based on our ops wanted
 | 
	
		
			
				|  |  |    // Schedule our appropriate closures
 | 
	
		
			
				|  |  |    // and then return to ops_needed state if still needed
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  // Since this is a closure directly invoked by the combiner, it should not
 | 
	
		
			
				|  |  | -  // unref the error parameter explicitly; the combiner will do that implicitly
 | 
	
		
			
				|  |  |    grpc_error* new_err = GRPC_ERROR_NONE;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    bool needs_close = false;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  INPROC_LOG(GPR_INFO, "op_state_machine %p", arg);
 | 
	
		
			
				|  |  | -  inproc_stream* s = static_cast<inproc_stream*>(arg);
 | 
	
		
			
				|  |  | -  gpr_mu* mu = &s->t->mu->mu;  // keep aside in case s gets closed
 | 
	
		
			
				|  |  | -  gpr_mu_lock(mu);
 | 
	
		
			
				|  |  | -  s->op_closure_scheduled = false;
 | 
	
		
			
				|  |  | +  INPROC_LOG(GPR_INFO, "op_state_machine %p", s);
 | 
	
		
			
				|  |  |    // cancellation takes precedence
 | 
	
		
			
				|  |  |    inproc_stream* other = s->other_side;
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -621,7 +611,7 @@ void op_state_machine(void* arg, grpc_error* error) {
 | 
	
		
			
				|  |  |    if (s->send_message_op && other) {
 | 
	
		
			
				|  |  |      if (other->recv_message_op) {
 | 
	
		
			
				|  |  |        message_transfer_locked(s, other);
 | 
	
		
			
				|  |  | -      maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | +      maybe_process_ops_locked(other, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  |      } else if (!s->t->is_client && s->trailing_md_sent) {
 | 
	
		
			
				|  |  |        // A server send will never be matched if the server already sent status
 | 
	
		
			
				|  |  |        s->send_message_op->payload->send_message.send_message.reset();
 | 
	
	
		
			
				|  | @@ -679,7 +669,7 @@ void op_state_machine(void* arg, grpc_error* error) {
 | 
	
		
			
				|  |  |          needs_close = true;
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -    maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | +    maybe_process_ops_locked(other, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  |      complete_if_batch_end_locked(
 | 
	
		
			
				|  |  |          s, GRPC_ERROR_NONE, s->send_trailing_md_op,
 | 
	
		
			
				|  |  |          "op_state_machine scheduling send-trailing-metadata-on-complete");
 | 
	
	
		
			
				|  | @@ -741,7 +731,7 @@ void op_state_machine(void* arg, grpc_error* error) {
 | 
	
		
			
				|  |  |    if (s->recv_message_op) {
 | 
	
		
			
				|  |  |      if (other && other->send_message_op) {
 | 
	
		
			
				|  |  |        message_transfer_locked(other, s);
 | 
	
		
			
				|  |  | -      maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | +      maybe_process_ops_locked(other, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    if (s->to_read_trailing_md_filled) {
 | 
	
	
		
			
				|  | @@ -808,7 +798,7 @@ void op_state_machine(void* arg, grpc_error* error) {
 | 
	
		
			
				|  |  |                                  s->recv_trailing_md_op->on_complete,
 | 
	
		
			
				|  |  |                                  GRPC_ERROR_REF(new_err));
 | 
	
		
			
				|  |  |          s->recv_trailing_md_op = nullptr;
 | 
	
		
			
				|  |  | -        needs_close = true;
 | 
	
		
			
				|  |  | +        needs_close = s->trailing_md_sent;
 | 
	
		
			
				|  |  |        } else {
 | 
	
		
			
				|  |  |          INPROC_LOG(GPR_INFO,
 | 
	
		
			
				|  |  |                     "op_state_machine %p server needs to delay handling "
 | 
	
	
		
			
				|  | @@ -860,7 +850,6 @@ done:
 | 
	
		
			
				|  |  |      close_other_side_locked(s, "op_state_machine");
 | 
	
		
			
				|  |  |      close_stream_locked(s);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  gpr_mu_unlock(mu);
 | 
	
		
			
				|  |  |    GRPC_ERROR_UNREF(new_err);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -870,7 +859,9 @@ bool cancel_stream_locked(inproc_stream* s, grpc_error* error) {
 | 
	
		
			
				|  |  |    if (s->cancel_self_error == GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  |      ret = true;
 | 
	
		
			
				|  |  |      s->cancel_self_error = GRPC_ERROR_REF(error);
 | 
	
		
			
				|  |  | -    maybe_schedule_op_closure_locked(s, s->cancel_self_error);
 | 
	
		
			
				|  |  | +    // Catch current value of other before it gets closed off
 | 
	
		
			
				|  |  | +    inproc_stream* other = s->other_side;
 | 
	
		
			
				|  |  | +    maybe_process_ops_locked(s, s->cancel_self_error);
 | 
	
		
			
				|  |  |      // Send trailing md to the other side indicating cancellation, even if we
 | 
	
		
			
				|  |  |      // already have
 | 
	
		
			
				|  |  |      s->trailing_md_sent = true;
 | 
	
	
		
			
				|  | @@ -878,7 +869,6 @@ bool cancel_stream_locked(inproc_stream* s, grpc_error* error) {
 | 
	
		
			
				|  |  |      grpc_metadata_batch cancel_md;
 | 
	
		
			
				|  |  |      grpc_metadata_batch_init(&cancel_md);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    inproc_stream* other = s->other_side;
 | 
	
		
			
				|  |  |      grpc_metadata_batch* dest = (other == nullptr)
 | 
	
		
			
				|  |  |                                      ? &s->write_buffer_trailing_md
 | 
	
		
			
				|  |  |                                      : &other->to_read_trailing_md;
 | 
	
	
		
			
				|  | @@ -891,7 +881,7 @@ bool cancel_stream_locked(inproc_stream* s, grpc_error* error) {
 | 
	
		
			
				|  |  |        if (other->cancel_other_error == GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  |          other->cancel_other_error = GRPC_ERROR_REF(s->cancel_self_error);
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  | -      maybe_schedule_op_closure_locked(other, other->cancel_other_error);
 | 
	
		
			
				|  |  | +      maybe_process_ops_locked(other, other->cancel_other_error);
 | 
	
		
			
				|  |  |      } else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  |        s->write_buffer_cancel_error = GRPC_ERROR_REF(s->cancel_self_error);
 | 
	
		
			
				|  |  |      }
 | 
	
	
		
			
				|  | @@ -969,8 +959,6 @@ void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
 | 
	
		
			
				|  |  |                 op->recv_trailing_metadata ? " recv_trailing_metadata" : "");
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  bool needs_close = false;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |    inproc_stream* other = s->other_side;
 | 
	
		
			
				|  |  |    if (error == GRPC_ERROR_NONE &&
 | 
	
		
			
				|  |  |        (op->send_initial_metadata || op->send_trailing_metadata)) {
 | 
	
	
		
			
				|  | @@ -991,7 +979,7 @@ void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
 | 
	
		
			
				|  |  |          INPROC_LOG(GPR_INFO, "Extra initial metadata %p", s);
 | 
	
		
			
				|  |  |          error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra initial metadata");
 | 
	
		
			
				|  |  |        } else {
 | 
	
		
			
				|  |  | -        if (!other || !other->closed) {
 | 
	
		
			
				|  |  | +        if (!s->other_side_closed) {
 | 
	
		
			
				|  |  |            fill_in_metadata(
 | 
	
		
			
				|  |  |                s, op->payload->send_initial_metadata.send_initial_metadata,
 | 
	
		
			
				|  |  |                op->payload->send_initial_metadata.send_initial_metadata_flags,
 | 
	
	
		
			
				|  | @@ -1005,7 +993,7 @@ void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
 | 
	
		
			
				|  |  |            s->initial_md_sent = true;
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  | -      maybe_schedule_op_closure_locked(other, error);
 | 
	
		
			
				|  |  | +      maybe_process_ops_locked(other, error);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -1013,7 +1001,7 @@ void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
 | 
	
		
			
				|  |  |        (op->send_message || op->send_trailing_metadata ||
 | 
	
		
			
				|  |  |         op->recv_initial_metadata || op->recv_message ||
 | 
	
		
			
				|  |  |         op->recv_trailing_metadata)) {
 | 
	
		
			
				|  |  | -    // Mark ops that need to be processed by the closure
 | 
	
		
			
				|  |  | +    // Mark ops that need to be processed by the state machine
 | 
	
		
			
				|  |  |      if (op->send_message) {
 | 
	
		
			
				|  |  |        s->send_message_op = op;
 | 
	
		
			
				|  |  |      }
 | 
	
	
		
			
				|  | @@ -1030,7 +1018,7 @@ void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
 | 
	
		
			
				|  |  |        s->recv_trailing_md_op = op;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    // We want to initiate the closure if:
 | 
	
		
			
				|  |  | +    // We want to initiate the state machine if:
 | 
	
		
			
				|  |  |      // 1. We want to send a message and the other side wants to receive
 | 
	
		
			
				|  |  |      // 2. We want to send trailing metadata and there isn't an unmatched send
 | 
	
		
			
				|  |  |      //    or the other side wants trailing metadata
 | 
	
	
		
			
				|  | @@ -1044,11 +1032,7 @@ void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
 | 
	
		
			
				|  |  |          (op->recv_initial_metadata && s->to_read_initial_md_filled) ||
 | 
	
		
			
				|  |  |          (op->recv_message && other && other->send_message_op != nullptr) ||
 | 
	
		
			
				|  |  |          (s->to_read_trailing_md_filled || s->trailing_md_recvd)) {
 | 
	
		
			
				|  |  | -      if (!s->op_closure_scheduled) {
 | 
	
		
			
				|  |  | -        grpc_core::ExecCtx::Run(DEBUG_LOCATION, &s->op_closure,
 | 
	
		
			
				|  |  | -                                GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | -        s->op_closure_scheduled = true;
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | +      op_state_machine_locked(s, error);
 | 
	
		
			
				|  |  |      } else {
 | 
	
		
			
				|  |  |        s->ops_needed = true;
 | 
	
		
			
				|  |  |      }
 | 
	
	
		
			
				|  | @@ -1103,10 +1087,6 @@ void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
 | 
	
		
			
				|  |  |                 error);
 | 
	
		
			
				|  |  |      grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_complete, GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  if (needs_close) {
 | 
	
		
			
				|  |  | -    close_other_side_locked(s, "perform_stream_op:other_side");
 | 
	
		
			
				|  |  | -    close_stream_locked(s);
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  |    gpr_mu_unlock(mu);
 | 
	
		
			
				|  |  |    GRPC_ERROR_UNREF(error);
 | 
	
		
			
				|  |  |  }
 |