|  | @@ -153,8 +153,6 @@ struct grpc_call {
 | 
	
		
			
				|  |  |    bool destroy_called;
 | 
	
		
			
				|  |  |    /** flag indicating that cancellation is inherited */
 | 
	
		
			
				|  |  |    bool cancellation_is_inherited;
 | 
	
		
			
				|  |  | -  /** bitmask of live batches */
 | 
	
		
			
				|  |  | -  uint8_t used_batches;
 | 
	
		
			
				|  |  |    /** which ops are in-flight */
 | 
	
		
			
				|  |  |    bool sent_initial_metadata;
 | 
	
		
			
				|  |  |    bool sending_message;
 | 
	
	
		
			
				|  | @@ -1012,25 +1010,48 @@ static bool are_initial_metadata_flags_valid(uint32_t flags, bool is_client) {
 | 
	
		
			
				|  |  |    return !(flags & invalid_positions);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static batch_control *allocate_batch_control(grpc_call *call) {
 | 
	
		
			
				|  |  | -  size_t i;
 | 
	
		
			
				|  |  | -  for (i = 0; i < MAX_CONCURRENT_BATCHES; i++) {
 | 
	
		
			
				|  |  | -    if ((call->used_batches & (1 << i)) == 0) {
 | 
	
		
			
				|  |  | -      call->used_batches = (uint8_t)(call->used_batches | (uint8_t)(1 << i));
 | 
	
		
			
				|  |  | -      return &call->active_batches[i];
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | +static int batch_slot_for_op(grpc_op_type type) {
 | 
	
		
			
				|  |  | +  switch (type) {
 | 
	
		
			
				|  |  | +    case GRPC_OP_SEND_INITIAL_METADATA:
 | 
	
		
			
				|  |  | +      return 0;
 | 
	
		
			
				|  |  | +    case GRPC_OP_SEND_MESSAGE:
 | 
	
		
			
				|  |  | +      return 1;
 | 
	
		
			
				|  |  | +    case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
 | 
	
		
			
				|  |  | +    case GRPC_OP_SEND_STATUS_FROM_SERVER:
 | 
	
		
			
				|  |  | +      return 2;
 | 
	
		
			
				|  |  | +    case GRPC_OP_RECV_INITIAL_METADATA:
 | 
	
		
			
				|  |  | +      return 3;
 | 
	
		
			
				|  |  | +    case GRPC_OP_RECV_MESSAGE:
 | 
	
		
			
				|  |  | +      return 4;
 | 
	
		
			
				|  |  | +    case GRPC_OP_RECV_CLOSE_ON_SERVER:
 | 
	
		
			
				|  |  | +    case GRPC_OP_RECV_STATUS_ON_CLIENT:
 | 
	
		
			
				|  |  | +      return 5;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  GPR_UNREACHABLE_CODE(return 123456789);
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +static batch_control *allocate_batch_control(grpc_call *call,
 | 
	
		
			
				|  |  | +                                             const grpc_op *ops,
 | 
	
		
			
				|  |  | +                                             size_t num_ops) {
 | 
	
		
			
				|  |  | +  int slot = batch_slot_for_op(ops[0].op);
 | 
	
		
			
				|  |  | +  for (size_t i = 1; i < num_ops; i++) {
 | 
	
		
			
				|  |  | +    int op_slot = batch_slot_for_op(ops[i].op);
 | 
	
		
			
				|  |  | +    slot = GPR_MIN(slot, op_slot);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  batch_control *bctl = &call->active_batches[slot];
 | 
	
		
			
				|  |  | +  if (bctl->call != NULL) {
 | 
	
		
			
				|  |  | +    return NULL;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  return NULL;
 | 
	
		
			
				|  |  | +  memset(bctl, 0, sizeof(*bctl));
 | 
	
		
			
				|  |  | +  bctl->call = call;
 | 
	
		
			
				|  |  | +  return bctl;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static void finish_batch_completion(grpc_exec_ctx *exec_ctx, void *user_data,
 | 
	
		
			
				|  |  |                                      grpc_cq_completion *storage) {
 | 
	
		
			
				|  |  |    batch_control *bctl = user_data;
 | 
	
		
			
				|  |  |    grpc_call *call = bctl->call;
 | 
	
		
			
				|  |  | -  gpr_mu_lock(&call->mu);
 | 
	
		
			
				|  |  | -  call->used_batches = (uint8_t)(
 | 
	
		
			
				|  |  | -      call->used_batches & ~(uint8_t)(1 << (bctl - call->active_batches)));
 | 
	
		
			
				|  |  | -  gpr_mu_unlock(&call->mu);
 | 
	
		
			
				|  |  | +  bctl->call = NULL;
 | 
	
		
			
				|  |  |    GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -1113,12 +1134,8 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    if (bctl->is_notify_tag_closure) {
 | 
	
		
			
				|  |  |      /* unrefs bctl->error */
 | 
	
		
			
				|  |  | +    bctl->call = NULL;
 | 
	
		
			
				|  |  |      grpc_closure_run(exec_ctx, bctl->notify_tag, error);
 | 
	
		
			
				|  |  | -    gpr_mu_lock(&call->mu);
 | 
	
		
			
				|  |  | -    bctl->call->used_batches =
 | 
	
		
			
				|  |  | -        (uint8_t)(bctl->call->used_batches &
 | 
	
		
			
				|  |  | -                  ~(uint8_t)(1 << (bctl - bctl->call->active_batches)));
 | 
	
		
			
				|  |  | -    gpr_mu_unlock(&call->mu);
 | 
	
		
			
				|  |  |      GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
 | 
	
		
			
				|  |  |    } else {
 | 
	
		
			
				|  |  |      /* unrefs bctl->error */
 | 
	
	
		
			
				|  | @@ -1330,6 +1347,11 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp,
 | 
	
		
			
				|  |  |    finish_batch_step(exec_ctx, bctl);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +static void free_no_op_completion(grpc_exec_ctx *exec_ctx, void *p,
 | 
	
		
			
				|  |  | +                                  grpc_cq_completion *completion) {
 | 
	
		
			
				|  |  | +  gpr_free(completion);
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |                                          grpc_call *call, const grpc_op *ops,
 | 
	
		
			
				|  |  |                                          size_t nops, void *notify_tag,
 | 
	
	
		
			
				|  | @@ -1344,32 +1366,34 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |    grpc_metadata compression_md;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    GPR_TIMER_BEGIN("grpc_call_start_batch", 0);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |    GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  /* TODO(ctiller): this feels like it could be made lock-free */
 | 
	
		
			
				|  |  | -  gpr_mu_lock(&call->mu);
 | 
	
		
			
				|  |  | -  bctl = allocate_batch_control(call);
 | 
	
		
			
				|  |  | -  memset(bctl, 0, sizeof(*bctl));
 | 
	
		
			
				|  |  | -  bctl->call = call;
 | 
	
		
			
				|  |  | -  bctl->notify_tag = notify_tag;
 | 
	
		
			
				|  |  | -  bctl->is_notify_tag_closure = (uint8_t)(is_notify_tag_closure != 0);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  grpc_transport_stream_op *stream_op = &bctl->op;
 | 
	
		
			
				|  |  | -  memset(stream_op, 0, sizeof(*stream_op));
 | 
	
		
			
				|  |  | -  stream_op->covered_by_poller = true;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |    if (nops == 0) {
 | 
	
		
			
				|  |  | -    GRPC_CALL_INTERNAL_REF(call, "completion");
 | 
	
		
			
				|  |  |      if (!is_notify_tag_closure) {
 | 
	
		
			
				|  |  |        grpc_cq_begin_op(call->cq, notify_tag);
 | 
	
		
			
				|  |  | +      grpc_cq_end_op(exec_ctx, call->cq, notify_tag, GRPC_ERROR_NONE,
 | 
	
		
			
				|  |  | +                     free_no_op_completion, NULL,
 | 
	
		
			
				|  |  | +                     gpr_malloc(sizeof(grpc_cq_completion)));
 | 
	
		
			
				|  |  | +    } else {
 | 
	
		
			
				|  |  | +      grpc_closure_sched(exec_ctx, notify_tag, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -    gpr_mu_unlock(&call->mu);
 | 
	
		
			
				|  |  | -    post_batch_completion(exec_ctx, bctl);
 | 
	
		
			
				|  |  |      error = GRPC_CALL_OK;
 | 
	
		
			
				|  |  |      goto done;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +  /* TODO(ctiller): this feels like it could be made lock-free */
 | 
	
		
			
				|  |  | +  bctl = allocate_batch_control(call, ops, nops);
 | 
	
		
			
				|  |  | +  if (bctl == NULL) {
 | 
	
		
			
				|  |  | +    return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  bctl->notify_tag = notify_tag;
 | 
	
		
			
				|  |  | +  bctl->is_notify_tag_closure = (uint8_t)(is_notify_tag_closure != 0);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  gpr_mu_lock(&call->mu);
 | 
	
		
			
				|  |  | +  grpc_transport_stream_op *stream_op = &bctl->op;
 | 
	
		
			
				|  |  | +  memset(stream_op, 0, sizeof(*stream_op));
 | 
	
		
			
				|  |  | +  stream_op->covered_by_poller = true;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    /* rewrite batch ops into a transport op */
 | 
	
		
			
				|  |  |    for (i = 0; i < nops; i++) {
 | 
	
		
			
				|  |  |      op = &ops[i];
 |