|  | @@ -36,6 +36,7 @@
 | 
	
		
			
				|  |  |  #include "src/core/channel/metadata_buffer.h"
 | 
	
		
			
				|  |  |  #include "src/core/iomgr/alarm.h"
 | 
	
		
			
				|  |  |  #include "src/core/support/string.h"
 | 
	
		
			
				|  |  | +#include "src/core/surface/byte_buffer_queue.h"
 | 
	
		
			
				|  |  |  #include "src/core/surface/channel.h"
 | 
	
		
			
				|  |  |  #include "src/core/surface/completion_queue.h"
 | 
	
		
			
				|  |  |  #include <grpc/support/alloc.h>
 | 
	
	
		
			
				|  | @@ -59,7 +60,7 @@ typedef struct {
 | 
	
		
			
				|  |  |    grpc_metadata_array trailing_md_in;
 | 
	
		
			
				|  |  |    grpc_recv_status status_in;
 | 
	
		
			
				|  |  |    size_t msg_in_read_idx;
 | 
	
		
			
				|  |  | -  grpc_byte_buffer_array msg_in;
 | 
	
		
			
				|  |  | +  grpc_byte_buffer *msg_in;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    gpr_uint8 got_status;
 | 
	
		
			
				|  |  |    void *finished_tag;
 | 
	
	
		
			
				|  | @@ -72,6 +73,7 @@ typedef enum {
 | 
	
		
			
				|  |  |    SEND_INITIAL_METADATA,
 | 
	
		
			
				|  |  |    SEND_MESSAGE,
 | 
	
		
			
				|  |  |    SEND_TRAILING_METADATA,
 | 
	
		
			
				|  |  | +  SEND_STATUS,
 | 
	
		
			
				|  |  |    SEND_FINISH
 | 
	
		
			
				|  |  |  } send_action;
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -81,10 +83,24 @@ typedef struct {
 | 
	
		
			
				|  |  |    grpc_op_error status;
 | 
	
		
			
				|  |  |  } completed_request;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +/* See reqinfo.set below for a description */
 | 
	
		
			
				|  |  | +#define REQSET_EMPTY 255
 | 
	
		
			
				|  |  | +#define REQSET_DONE  254
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +/* The state of an ioreq */
 | 
	
		
			
				|  |  |  typedef struct reqinfo {
 | 
	
		
			
				|  |  | -  req_state state;
 | 
	
		
			
				|  |  | +  /* User supplied parameters */
 | 
	
		
			
				|  |  |    grpc_ioreq_data data;
 | 
	
		
			
				|  |  | -  struct reqinfo *master;
 | 
	
		
			
				|  |  | +  /* In which set is this ioreq?
 | 
	
		
			
				|  |  | +     This value could be:
 | 
	
		
			
				|  |  | +       - an element of grpc_ioreq_op enumeration, in which case
 | 
	
		
			
				|  |  | +         it designates the master ioreq in a set of requests
 | 
	
		
			
				|  |  | +       - REQSET_EMPTY, in which case this reqinfo type has no application
 | 
	
		
			
				|  |  | +         request against it
 | 
	
		
			
				|  |  | +       - REQSET_DONE, in which case this reqinfo has been satisfied for
 | 
	
		
			
				|  |  | +         all time for this call, and no further use will be made of it */
 | 
	
		
			
				|  |  | +  gpr_uint8 set;
 | 
	
		
			
				|  |  | +  grpc_op_error status;
 | 
	
		
			
				|  |  |    grpc_ioreq_completion_func on_complete;
 | 
	
		
			
				|  |  |    void *user_data;
 | 
	
		
			
				|  |  |    gpr_uint32 need_mask;
 | 
	
	
		
			
				|  | @@ -122,10 +138,9 @@ struct grpc_call {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    reqinfo requests[GRPC_IOREQ_OP_COUNT];
 | 
	
		
			
				|  |  |    completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
 | 
	
		
			
				|  |  | -  grpc_byte_buffer_array buffered_messages;
 | 
	
		
			
				|  |  | +  grpc_byte_buffer_queue incoming_queue;
 | 
	
		
			
				|  |  |    grpc_metadata_array buffered_initial_metadata;
 | 
	
		
			
				|  |  |    grpc_metadata_array buffered_trailing_metadata;
 | 
	
		
			
				|  |  | -  size_t write_index;
 | 
	
		
			
				|  |  |    grpc_mdelem **owned_metadata;
 | 
	
		
			
				|  |  |    size_t owned_metadata_count;
 | 
	
		
			
				|  |  |    size_t owned_metadata_capacity;
 | 
	
	
		
			
				|  | @@ -159,6 +174,7 @@ static void enact_send_action(grpc_call *call, send_action sa);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  grpc_call *grpc_call_create(grpc_channel *channel,
 | 
	
		
			
				|  |  |                              const void *server_transport_data) {
 | 
	
		
			
				|  |  | +  size_t i;
 | 
	
		
			
				|  |  |    grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel);
 | 
	
		
			
				|  |  |    grpc_call *call =
 | 
	
		
			
				|  |  |        gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size);
 | 
	
	
		
			
				|  | @@ -166,8 +182,11 @@ grpc_call *grpc_call_create(grpc_channel *channel,
 | 
	
		
			
				|  |  |    gpr_mu_init(&call->mu);
 | 
	
		
			
				|  |  |    call->channel = channel;
 | 
	
		
			
				|  |  |    call->is_client = server_transport_data == NULL;
 | 
	
		
			
				|  |  | +  for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
 | 
	
		
			
				|  |  | +    call->requests[i].set = REQSET_EMPTY;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  |    if (call->is_client) {
 | 
	
		
			
				|  |  | -    call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].state = REQ_DONE;
 | 
	
		
			
				|  |  | +    call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].set = REQSET_DONE;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    grpc_channel_internal_ref(channel);
 | 
	
		
			
				|  |  |    call->metadata_context = grpc_channel_get_metadata_context(channel);
 | 
	
	
		
			
				|  | @@ -189,15 +208,6 @@ legacy_state *get_legacy_state(grpc_call *call) {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  void grpc_call_internal_ref(grpc_call *c) { gpr_ref(&c->internal_refcount); }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void destroy_message_array(grpc_byte_buffer_array *array,
 | 
	
		
			
				|  |  | -                                  size_t start_idx) {
 | 
	
		
			
				|  |  | -  size_t i;
 | 
	
		
			
				|  |  | -  for (i = start_idx; i < array->count; i++) {
 | 
	
		
			
				|  |  | -    grpc_byte_buffer_destroy(array->buffers[i]);
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  gpr_free(array->buffers);
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |  static void destroy_call(void *call, int ignored_success) {
 | 
	
		
			
				|  |  |    size_t i, j;
 | 
	
		
			
				|  |  |    grpc_call *c = call;
 | 
	
	
		
			
				|  | @@ -213,7 +223,6 @@ static void destroy_call(void *call, int ignored_success) {
 | 
	
		
			
				|  |  |      grpc_mdelem_unref(c->owned_metadata[i]);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    gpr_free(c->owned_metadata);
 | 
	
		
			
				|  |  | -  destroy_message_array(&c->buffered_messages, 0);
 | 
	
		
			
				|  |  |    gpr_free(c->buffered_initial_metadata.metadata);
 | 
	
		
			
				|  |  |    gpr_free(c->buffered_trailing_metadata.metadata);
 | 
	
		
			
				|  |  |    if (c->legacy_state) {
 | 
	
	
		
			
				|  | @@ -226,8 +235,6 @@ static void destroy_call(void *call, int ignored_success) {
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |      gpr_free(c->legacy_state->initial_md_in.metadata);
 | 
	
		
			
				|  |  |      gpr_free(c->legacy_state->trailing_md_in.metadata);
 | 
	
		
			
				|  |  | -    destroy_message_array(&c->legacy_state->msg_in,
 | 
	
		
			
				|  |  | -                          c->legacy_state->msg_in_read_idx);
 | 
	
		
			
				|  |  |      gpr_free(c->legacy_state);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    gpr_free(c);
 | 
	
	
		
			
				|  | @@ -284,7 +291,7 @@ static void unlock(grpc_call *call) {
 | 
	
		
			
				|  |  |    int num_completed_requests = call->num_completed_requests;
 | 
	
		
			
				|  |  |    int need_more_data =
 | 
	
		
			
				|  |  |        call->need_more_data &&
 | 
	
		
			
				|  |  | -      call->requests[GRPC_IOREQ_SEND_INITIAL_METADATA].state == REQ_DONE;
 | 
	
		
			
				|  |  | +      call->requests[GRPC_IOREQ_SEND_INITIAL_METADATA].set == REQSET_DONE;
 | 
	
		
			
				|  |  |    int i;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    if (need_more_data) {
 | 
	
	
		
			
				|  | @@ -321,124 +328,131 @@ static void unlock(grpc_call *call) {
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void get_final_status(grpc_call *call, grpc_status_code *code,
 | 
	
		
			
				|  |  | -                             const char **details) {
 | 
	
		
			
				|  |  | +static void get_final_status(grpc_call *call, grpc_recv_status_args args) {
 | 
	
		
			
				|  |  |    int i;
 | 
	
		
			
				|  |  |    for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
 | 
	
		
			
				|  |  |      if (call->status[i].set) {
 | 
	
		
			
				|  |  | -      *code = call->status[i].code;
 | 
	
		
			
				|  |  | -      *details = call->status[i].details
 | 
	
		
			
				|  |  | -                     ? grpc_mdstr_as_c_string(call->status[i].details)
 | 
	
		
			
				|  |  | -                     : NULL;
 | 
	
		
			
				|  |  | +      *args.code = call->status[i].code;
 | 
	
		
			
				|  |  | +      if (call->status[i].details) {
 | 
	
		
			
				|  |  | +        gpr_slice details = call->status[i].details->slice;
 | 
	
		
			
				|  |  | +        size_t len = GPR_SLICE_LENGTH(details);
 | 
	
		
			
				|  |  | +        if (len + 1 > *args.details_capacity) {
 | 
	
		
			
				|  |  | +          *args.details_capacity = GPR_MAX(len + 1, *args.details_capacity * 3 / 2);
 | 
	
		
			
				|  |  | +          *args.details = gpr_realloc(*args.details, *args.details_capacity);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        memcpy(*args.details, GPR_SLICE_START_PTR(details), len);
 | 
	
		
			
				|  |  | +        (*args.details)[len] = 0;
 | 
	
		
			
				|  |  | +      } else {
 | 
	
		
			
				|  |  | +        goto no_details;
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  |        return;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  *code = GRPC_STATUS_UNKNOWN;
 | 
	
		
			
				|  |  | -  *details = NULL;
 | 
	
		
			
				|  |  | +  *args.code = GRPC_STATUS_UNKNOWN;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +no_details:
 | 
	
		
			
				|  |  | +  if (0 == *args.details_capacity) {
 | 
	
		
			
				|  |  | +    *args.details_capacity = 8;
 | 
	
		
			
				|  |  | +    *args.details = gpr_malloc(*args.details_capacity);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  **args.details = 0;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op,
 | 
	
		
			
				|  |  |                              grpc_op_error status) {
 | 
	
		
			
				|  |  | -  reqinfo *master = call->requests[op].master;
 | 
	
		
			
				|  |  |    completed_request *cr;
 | 
	
		
			
				|  |  |    size_t i;
 | 
	
		
			
				|  |  | -  switch (call->requests[op].state) {
 | 
	
		
			
				|  |  | -    case REQ_INITIAL: /* not started yet */
 | 
	
		
			
				|  |  | -      return;
 | 
	
		
			
				|  |  | -    case REQ_DONE: /* already finished */
 | 
	
		
			
				|  |  | -      return;
 | 
	
		
			
				|  |  | -    case REQ_READY:
 | 
	
		
			
				|  |  | -      master->complete_mask |= 1 << op;
 | 
	
		
			
				|  |  | -      call->requests[op].state =
 | 
	
		
			
				|  |  | -          (op == GRPC_IOREQ_SEND_MESSAGES || op == GRPC_IOREQ_RECV_MESSAGES)
 | 
	
		
			
				|  |  | -              ? REQ_INITIAL
 | 
	
		
			
				|  |  | -              : REQ_DONE;
 | 
	
		
			
				|  |  | -      if (master->complete_mask == master->need_mask ||
 | 
	
		
			
				|  |  | -          status == GRPC_OP_ERROR) {
 | 
	
		
			
				|  |  | -        if (OP_IN_MASK(GRPC_IOREQ_RECV_STATUS, master->need_mask)) {
 | 
	
		
			
				|  |  | -          get_final_status(
 | 
	
		
			
				|  |  | -              call,
 | 
	
		
			
				|  |  | -              &call->requests[GRPC_IOREQ_RECV_STATUS].data.recv_status->status,
 | 
	
		
			
				|  |  | -              &call->requests[GRPC_IOREQ_RECV_STATUS]
 | 
	
		
			
				|  |  | -                   .data.recv_status->details);
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | -        for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
 | 
	
		
			
				|  |  | -          if (call->requests[i].master == master) {
 | 
	
		
			
				|  |  | -            call->requests[i].master = NULL;
 | 
	
		
			
				|  |  | +  if (call->requests[op].set < GRPC_IOREQ_OP_COUNT) {
 | 
	
		
			
				|  |  | +    reqinfo *master = &call->requests[call->requests[op].set];
 | 
	
		
			
				|  |  | +    /* ioreq is live: we need to do something */
 | 
	
		
			
				|  |  | +    master->complete_mask |= 1 << op;
 | 
	
		
			
				|  |  | +    call->requests[op].set =
 | 
	
		
			
				|  |  | +        (op == GRPC_IOREQ_SEND_MESSAGE || op == GRPC_IOREQ_RECV_MESSAGE)
 | 
	
		
			
				|  |  | +            ? REQSET_EMPTY
 | 
	
		
			
				|  |  | +            : REQSET_DONE;
 | 
	
		
			
				|  |  | +    if (master->complete_mask == master->need_mask ||
 | 
	
		
			
				|  |  | +        status == GRPC_OP_ERROR) {
 | 
	
		
			
				|  |  | +      if (OP_IN_MASK(GRPC_IOREQ_RECV_STATUS, master->need_mask)) {
 | 
	
		
			
				|  |  | +        get_final_status(
 | 
	
		
			
				|  |  | +            call,
 | 
	
		
			
				|  |  | +            call->requests[GRPC_IOREQ_RECV_STATUS].data.recv_status);
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +      for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
 | 
	
		
			
				|  |  | +        if (call->requests[i].set == op) {
 | 
	
		
			
				|  |  | +          if (call->requests[i].status != GRPC_OP_OK) {
 | 
	
		
			
				|  |  | +            status = GRPC_OP_ERROR;
 | 
	
		
			
				|  |  |            }
 | 
	
		
			
				|  |  | +          call->requests[i].set = REQSET_EMPTY;
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  | -        cr = &call->completed_requests[call->num_completed_requests++];
 | 
	
		
			
				|  |  | -        cr->status = status;
 | 
	
		
			
				|  |  | -        cr->on_complete = master->on_complete;
 | 
	
		
			
				|  |  | -        cr->user_data = master->user_data;
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  | +      cr = &call->completed_requests[call->num_completed_requests++];
 | 
	
		
			
				|  |  | +      cr->status = status;
 | 
	
		
			
				|  |  | +      cr->on_complete = master->on_complete;
 | 
	
		
			
				|  |  | +      cr->user_data = master->user_data;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void finish_write_step(void *pc, grpc_op_error error) {
 | 
	
		
			
				|  |  | -  grpc_call *call = pc;
 | 
	
		
			
				|  |  | +static void finish_send_op(grpc_call *call, grpc_ioreq_op op, grpc_op_error error) {
 | 
	
		
			
				|  |  |    lock(call);
 | 
	
		
			
				|  |  | -  if (error == GRPC_OP_OK) {
 | 
	
		
			
				|  |  | -    if (call->write_index ==
 | 
	
		
			
				|  |  | -        call->requests[GRPC_IOREQ_SEND_MESSAGES].data.send_messages.count) {
 | 
	
		
			
				|  |  | -      finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGES, GRPC_OP_OK);
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | -  } else {
 | 
	
		
			
				|  |  | -    finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGES, GRPC_OP_ERROR);
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | +  finish_ioreq_op(call, op, error);
 | 
	
		
			
				|  |  |    call->sending = 0;
 | 
	
		
			
				|  |  |    unlock(call);
 | 
	
		
			
				|  |  |    grpc_call_internal_unref(call, 0);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +static void finish_write_step(void *pc, grpc_op_error error) {
 | 
	
		
			
				|  |  | +  finish_send_op(pc, GRPC_IOREQ_SEND_MESSAGE, error);
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  static void finish_finish_step(void *pc, grpc_op_error error) {
 | 
	
		
			
				|  |  | -  grpc_call *call = pc;
 | 
	
		
			
				|  |  | -  lock(call);
 | 
	
		
			
				|  |  | -  finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, error);
 | 
	
		
			
				|  |  | -  call->sending = 0;
 | 
	
		
			
				|  |  | -  unlock(call);
 | 
	
		
			
				|  |  | -  grpc_call_internal_unref(call, 0);
 | 
	
		
			
				|  |  | +  finish_send_op(pc, GRPC_IOREQ_SEND_CLOSE, error);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static void finish_start_step(void *pc, grpc_op_error error) {
 | 
	
		
			
				|  |  | -  grpc_call *call = pc;
 | 
	
		
			
				|  |  | -  lock(call);
 | 
	
		
			
				|  |  | -  finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, error);
 | 
	
		
			
				|  |  | -  call->sending = 0;
 | 
	
		
			
				|  |  | -  unlock(call);
 | 
	
		
			
				|  |  | -  grpc_call_internal_unref(call, 0);
 | 
	
		
			
				|  |  | +  finish_send_op(pc, GRPC_IOREQ_SEND_INITIAL_METADATA, error);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static send_action choose_send_action(grpc_call *call) {
 | 
	
		
			
				|  |  | -  switch (call->requests[GRPC_IOREQ_SEND_INITIAL_METADATA].state) {
 | 
	
		
			
				|  |  | -    case REQ_INITIAL:
 | 
	
		
			
				|  |  | +  switch (call->requests[GRPC_IOREQ_SEND_INITIAL_METADATA].set) {
 | 
	
		
			
				|  |  | +    case REQSET_EMPTY:
 | 
	
		
			
				|  |  |        return SEND_NOTHING;
 | 
	
		
			
				|  |  | -    case REQ_READY:
 | 
	
		
			
				|  |  | +    default:
 | 
	
		
			
				|  |  |        return SEND_INITIAL_METADATA;
 | 
	
		
			
				|  |  | -    case REQ_DONE:
 | 
	
		
			
				|  |  | +    case REQSET_DONE:
 | 
	
		
			
				|  |  |        break;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  switch (call->requests[GRPC_IOREQ_SEND_MESSAGES].state) {
 | 
	
		
			
				|  |  | -    case REQ_INITIAL:
 | 
	
		
			
				|  |  | +  switch (call->requests[GRPC_IOREQ_SEND_MESSAGE].set) {
 | 
	
		
			
				|  |  | +    case REQSET_EMPTY:
 | 
	
		
			
				|  |  |        return SEND_NOTHING;
 | 
	
		
			
				|  |  | -    case REQ_READY:
 | 
	
		
			
				|  |  | +    default:
 | 
	
		
			
				|  |  |        return SEND_MESSAGE;
 | 
	
		
			
				|  |  | -    case REQ_DONE:
 | 
	
		
			
				|  |  | +    case REQSET_DONE:
 | 
	
		
			
				|  |  |        break;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  switch (call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].state) {
 | 
	
		
			
				|  |  | -    case REQ_INITIAL:
 | 
	
		
			
				|  |  | +  switch (call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].set) {
 | 
	
		
			
				|  |  | +    case REQSET_EMPTY:
 | 
	
		
			
				|  |  |        return SEND_NOTHING;
 | 
	
		
			
				|  |  | -    case REQ_READY:
 | 
	
		
			
				|  |  | +    default:
 | 
	
		
			
				|  |  |        finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, GRPC_OP_OK);
 | 
	
		
			
				|  |  |        return SEND_TRAILING_METADATA;
 | 
	
		
			
				|  |  | -    case REQ_DONE:
 | 
	
		
			
				|  |  | +    case REQSET_DONE:
 | 
	
		
			
				|  |  |        break;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  switch (call->requests[GRPC_IOREQ_SEND_CLOSE].state) {
 | 
	
		
			
				|  |  | +  switch (call->requests[GRPC_IOREQ_SEND_STATUS].set) {
 | 
	
		
			
				|  |  | +    case REQSET_EMPTY:
 | 
	
		
			
				|  |  | +      return SEND_NOTHING;
 | 
	
		
			
				|  |  |      default:
 | 
	
		
			
				|  |  | +      finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, GRPC_OP_OK);
 | 
	
		
			
				|  |  | +      return SEND_STATUS;
 | 
	
		
			
				|  |  | +    case REQSET_DONE:
 | 
	
		
			
				|  |  | +      break;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  switch (call->requests[GRPC_IOREQ_SEND_CLOSE].set) {
 | 
	
		
			
				|  |  | +    case REQSET_EMPTY:
 | 
	
		
			
				|  |  | +    case REQSET_DONE:
 | 
	
		
			
				|  |  |        return SEND_NOTHING;
 | 
	
		
			
				|  |  | -    case REQ_READY:
 | 
	
		
			
				|  |  | +    default:
 | 
	
		
			
				|  |  |        return SEND_FINISH;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  }
 | 
	
	
		
			
				|  | @@ -458,6 +472,7 @@ static void enact_send_action(grpc_call *call, send_action sa) {
 | 
	
		
			
				|  |  |    grpc_ioreq_data data;
 | 
	
		
			
				|  |  |    grpc_call_op op;
 | 
	
		
			
				|  |  |    size_t i;
 | 
	
		
			
				|  |  | +  char status_str[GPR_LTOA_MIN_BUFSIZE];
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    switch (sa) {
 | 
	
		
			
				|  |  |      case SEND_NOTHING:
 | 
	
	
		
			
				|  | @@ -481,11 +496,11 @@ static void enact_send_action(grpc_call *call, send_action sa) {
 | 
	
		
			
				|  |  |        grpc_call_execute_op(call, &op);
 | 
	
		
			
				|  |  |        break;
 | 
	
		
			
				|  |  |      case SEND_MESSAGE:
 | 
	
		
			
				|  |  | -      data = call->requests[GRPC_IOREQ_SEND_MESSAGES].data;
 | 
	
		
			
				|  |  | +      data = call->requests[GRPC_IOREQ_SEND_MESSAGE].data;
 | 
	
		
			
				|  |  |        op.type = GRPC_SEND_MESSAGE;
 | 
	
		
			
				|  |  |        op.dir = GRPC_CALL_DOWN;
 | 
	
		
			
				|  |  |        op.flags = 0;
 | 
	
		
			
				|  |  | -      op.data.message = data.send_messages.messages[call->write_index++];
 | 
	
		
			
				|  |  | +      op.data.message = data.send_message;
 | 
	
		
			
				|  |  |        op.done_cb = finish_write_step;
 | 
	
		
			
				|  |  |        op.user_data = call;
 | 
	
		
			
				|  |  |        grpc_call_execute_op(call, &op);
 | 
	
	
		
			
				|  | @@ -504,28 +519,27 @@ static void enact_send_action(grpc_call *call, send_action sa) {
 | 
	
		
			
				|  |  |        unlock(call);
 | 
	
		
			
				|  |  |        grpc_call_internal_unref(call, 0);
 | 
	
		
			
				|  |  |        break;
 | 
	
		
			
				|  |  | -    case SEND_FINISH:
 | 
	
		
			
				|  |  | -      if (!call->is_client) {
 | 
	
		
			
				|  |  | -        /* TODO(ctiller): cache common status values */
 | 
	
		
			
				|  |  | -        char status_str[GPR_LTOA_MIN_BUFSIZE];
 | 
	
		
			
				|  |  | -        data = call->requests[GRPC_IOREQ_SEND_CLOSE].data;
 | 
	
		
			
				|  |  | -        gpr_ltoa(data.send_close.status, status_str);
 | 
	
		
			
				|  |  | -        send_metadata(
 | 
	
		
			
				|  |  | -            call,
 | 
	
		
			
				|  |  | -            grpc_mdelem_from_metadata_strings(
 | 
	
		
			
				|  |  | -                call->metadata_context,
 | 
	
		
			
				|  |  | -                grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)),
 | 
	
		
			
				|  |  | -                grpc_mdstr_from_string(call->metadata_context, status_str)));
 | 
	
		
			
				|  |  | -        if (data.send_close.details) {
 | 
	
		
			
				|  |  | -          send_metadata(call,
 | 
	
		
			
				|  |  | -                        grpc_mdelem_from_metadata_strings(
 | 
	
		
			
				|  |  | -                            call->metadata_context,
 | 
	
		
			
				|  |  | -                            grpc_mdstr_ref(
 | 
	
		
			
				|  |  | -                                grpc_channel_get_message_string(call->channel)),
 | 
	
		
			
				|  |  | -                            grpc_mdstr_from_string(call->metadata_context,
 | 
	
		
			
				|  |  | -                                                   data.send_close.details)));
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | +    case SEND_STATUS:
 | 
	
		
			
				|  |  | +      /* TODO(ctiller): cache common status values */
 | 
	
		
			
				|  |  | +      data = call->requests[GRPC_IOREQ_SEND_CLOSE].data;
 | 
	
		
			
				|  |  | +      gpr_ltoa(data.send_status.code, status_str);
 | 
	
		
			
				|  |  | +      send_metadata(
 | 
	
		
			
				|  |  | +          call,
 | 
	
		
			
				|  |  | +          grpc_mdelem_from_metadata_strings(
 | 
	
		
			
				|  |  | +              call->metadata_context,
 | 
	
		
			
				|  |  | +              grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)),
 | 
	
		
			
				|  |  | +              grpc_mdstr_from_string(call->metadata_context, status_str)));
 | 
	
		
			
				|  |  | +      if (data.send_status.details) {
 | 
	
		
			
				|  |  | +        send_metadata(call,
 | 
	
		
			
				|  |  | +                      grpc_mdelem_from_metadata_strings(
 | 
	
		
			
				|  |  | +                          call->metadata_context,
 | 
	
		
			
				|  |  | +                          grpc_mdstr_ref(
 | 
	
		
			
				|  |  | +                              grpc_channel_get_message_string(call->channel)),
 | 
	
		
			
				|  |  | +                          grpc_mdstr_from_string(call->metadata_context,
 | 
	
		
			
				|  |  | +                                                 data.send_status.details)));
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  | +      break;
 | 
	
		
			
				|  |  | +    case SEND_FINISH:
 | 
	
		
			
				|  |  |        op.type = GRPC_SEND_FINISH;
 | 
	
		
			
				|  |  |        op.dir = GRPC_CALL_DOWN;
 | 
	
		
			
				|  |  |        op.flags = 0;
 | 
	
	
		
			
				|  | @@ -542,7 +556,7 @@ static grpc_call_error start_ioreq_error(grpc_call *call,
 | 
	
		
			
				|  |  |    size_t i;
 | 
	
		
			
				|  |  |    for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
 | 
	
		
			
				|  |  |      if (mutated_ops & (1 << i)) {
 | 
	
		
			
				|  |  | -      call->requests[i].master = NULL;
 | 
	
		
			
				|  |  | +      call->requests[i].set = REQSET_EMPTY;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    return ret;
 | 
	
	
		
			
				|  | @@ -555,35 +569,32 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
 | 
	
		
			
				|  |  |    size_t i;
 | 
	
		
			
				|  |  |    gpr_uint32 have_ops = 0;
 | 
	
		
			
				|  |  |    grpc_ioreq_op op;
 | 
	
		
			
				|  |  | -  reqinfo *master = NULL;
 | 
	
		
			
				|  |  |    reqinfo *requests = call->requests;
 | 
	
		
			
				|  |  | +  reqinfo *master;
 | 
	
		
			
				|  |  |    grpc_ioreq_data data;
 | 
	
		
			
				|  |  | +  gpr_uint8 set;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  if (nreqs == 0) {
 | 
	
		
			
				|  |  | +    return GRPC_CALL_OK;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  set = reqs[0].op;
 | 
	
		
			
				|  |  | +  master = &requests[set];
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    for (i = 0; i < nreqs; i++) {
 | 
	
		
			
				|  |  |      op = reqs[i].op;
 | 
	
		
			
				|  |  | -    if (requests[op].master) {
 | 
	
		
			
				|  |  | +    if (requests[op].set < GRPC_IOREQ_OP_COUNT) {
 | 
	
		
			
				|  |  |        return start_ioreq_error(call, have_ops,
 | 
	
		
			
				|  |  |                                 GRPC_CALL_ERROR_TOO_MANY_OPERATIONS);
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | -    switch (requests[op].state) {
 | 
	
		
			
				|  |  | -      case REQ_INITIAL:
 | 
	
		
			
				|  |  | -        break;
 | 
	
		
			
				|  |  | -      case REQ_READY:
 | 
	
		
			
				|  |  | -        return start_ioreq_error(call, have_ops,
 | 
	
		
			
				|  |  | -                                 GRPC_CALL_ERROR_TOO_MANY_OPERATIONS);
 | 
	
		
			
				|  |  | -      case REQ_DONE:
 | 
	
		
			
				|  |  | -        return start_ioreq_error(call, have_ops,
 | 
	
		
			
				|  |  | -                                 GRPC_CALL_ERROR_ALREADY_INVOKED);
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | -    if (master == NULL) {
 | 
	
		
			
				|  |  | -      master = &requests[op];
 | 
	
		
			
				|  |  | +    } else if (requests[op].set == REQSET_DONE) {
 | 
	
		
			
				|  |  | +      return start_ioreq_error(call, have_ops,
 | 
	
		
			
				|  |  | +                               GRPC_CALL_ERROR_ALREADY_INVOKED);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |      have_ops |= 1 << op;
 | 
	
		
			
				|  |  |      data = reqs[i].data;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    requests[op].state = REQ_READY;
 | 
	
		
			
				|  |  |      requests[op].data = data;
 | 
	
		
			
				|  |  | -    requests[op].master = master;
 | 
	
		
			
				|  |  | +    requests[op].set = set;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    GPR_ASSERT(master != NULL);
 | 
	
	
		
			
				|  | @@ -598,12 +609,10 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
 | 
	
		
			
				|  |  |      switch (op) {
 | 
	
		
			
				|  |  |        default:
 | 
	
		
			
				|  |  |          break;
 | 
	
		
			
				|  |  | -      case GRPC_IOREQ_RECV_MESSAGES:
 | 
	
		
			
				|  |  | -        data.recv_messages->count = 0;
 | 
	
		
			
				|  |  | -        if (call->buffered_messages.count > 0 || call->read_closed) {
 | 
	
		
			
				|  |  | -          SWAP(grpc_byte_buffer_array, *data.recv_messages,
 | 
	
		
			
				|  |  | -               call->buffered_messages);
 | 
	
		
			
				|  |  | -          finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGES, GRPC_OP_OK);
 | 
	
		
			
				|  |  | +      case GRPC_IOREQ_RECV_MESSAGE:
 | 
	
		
			
				|  |  | +        *data.recv_message = grpc_bbq_pop(&call->incoming_queue);
 | 
	
		
			
				|  |  | +        if (*data.recv_message) {
 | 
	
		
			
				|  |  | +          finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
 | 
	
		
			
				|  |  |          } else {
 | 
	
		
			
				|  |  |            call->need_more_data = 1;
 | 
	
		
			
				|  |  |          }
 | 
	
	
		
			
				|  | @@ -612,19 +621,18 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |          break;
 | 
	
		
			
				|  |  |        case GRPC_IOREQ_RECV_STATUS:
 | 
	
		
			
				|  |  | -        if (call->stream_closed && call->buffered_messages.count == 0) {
 | 
	
		
			
				|  |  | +        if (call->stream_closed) {
 | 
	
		
			
				|  |  |            finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_OK);
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |          break;
 | 
	
		
			
				|  |  | -      case GRPC_IOREQ_SEND_MESSAGES:
 | 
	
		
			
				|  |  | +      case GRPC_IOREQ_SEND_MESSAGE:
 | 
	
		
			
				|  |  |          if (call->stream_closed) {
 | 
	
		
			
				|  |  | -          finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGES, GRPC_OP_ERROR);
 | 
	
		
			
				|  |  | +          finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, GRPC_OP_ERROR);
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  | -        call->write_index = 0;
 | 
	
		
			
				|  |  |          break;
 | 
	
		
			
				|  |  |        case GRPC_IOREQ_SEND_CLOSE:
 | 
	
		
			
				|  |  | -        if (requests[GRPC_IOREQ_SEND_MESSAGES].state == REQ_INITIAL) {
 | 
	
		
			
				|  |  | -          requests[GRPC_IOREQ_SEND_MESSAGES].state = REQ_DONE;
 | 
	
		
			
				|  |  | +        if (requests[GRPC_IOREQ_SEND_MESSAGE].set == REQSET_EMPTY) {
 | 
	
		
			
				|  |  | +          requests[GRPC_IOREQ_SEND_MESSAGE].set = REQSET_DONE;
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |          break;
 | 
	
		
			
				|  |  |        case GRPC_IOREQ_SEND_INITIAL_METADATA:
 | 
	
	
		
			
				|  | @@ -1088,8 +1096,13 @@ void grpc_call_recv_message(grpc_call_element *elem,
 | 
	
		
			
				|  |  |    grpc_call *call = CALL_FROM_TOP_ELEM(elem);
 | 
	
		
			
				|  |  |    grpc_byte_buffer_array *dest;
 | 
	
		
			
				|  |  |    lock(call);
 | 
	
		
			
				|  |  | -  if (call->requests[GRPC_IOREQ_RECV_MESSAGES].master != NULL) {
 | 
	
		
			
				|  |  | -    dest = call->requests[GRPC_IOREQ_RECV_MESSAGES].data.recv_messages;
 | 
	
		
			
				|  |  | +  if (call->requests[GRPC_IOREQ_RECV_MESSAGE].master != NULL) {
 | 
	
		
			
				|  |  | +    if (call->requests[GRPC_IOREQ_RECV_MESSAGE].state != REQ_READY) {
 | 
	
		
			
				|  |  | +      call->requests[GRPC_IOREQ_RECV_MESSAGE].status = GRPC_OP_ERROR;
 | 
	
		
			
				|  |  | +    } else {
 | 
	
		
			
				|  |  | +      *call->requests[GRPC_IOREQ_RECV_MESSAGE].data.recv_message = byte_buffer;
 | 
	
		
			
				|  |  | +      finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  |    } else {
 | 
	
		
			
				|  |  |      dest = &call->buffered_messages;
 | 
	
		
			
				|  |  |    }
 |