|  | @@ -46,617 +46,963 @@
 | 
	
		
			
				|  |  |  #include "src/core/lib/support/string.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/surface/channel.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/transport/metadata_batch.h"
 | 
	
		
			
				|  |  | +#include "src/core/lib/transport/static_metadata.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/transport/transport_impl.h"
 | 
	
		
			
				|  |  |  #include "third_party/objective_c/Cronet/cronet_c_for_grpc.h"
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  #define GRPC_HEADER_SIZE_IN_BYTES 5
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -// Global flag that gets set with GRPC_TRACE env variable
 | 
	
		
			
				|  |  | -int grpc_cronet_trace = 1;
 | 
	
		
			
				|  |  | +#define CRONET_LOG(...)                          \
 | 
	
		
			
				|  |  | +  do {                                           \
 | 
	
		
			
				|  |  | +    if (grpc_cronet_trace) gpr_log(__VA_ARGS__); \
 | 
	
		
			
				|  |  | +  } while (0)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -// Cronet transport object
 | 
	
		
			
				|  |  | +/* TODO (makdharma): Hook up into the wider tracing mechanism */
 | 
	
		
			
				|  |  | +int grpc_cronet_trace = 0;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +enum e_op_result {
 | 
	
		
			
				|  |  | +  ACTION_TAKEN_WITH_CALLBACK,
 | 
	
		
			
				|  |  | +  ACTION_TAKEN_NO_CALLBACK,
 | 
	
		
			
				|  |  | +  NO_ACTION_POSSIBLE
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +enum e_op_id {
 | 
	
		
			
				|  |  | +  OP_SEND_INITIAL_METADATA = 0,
 | 
	
		
			
				|  |  | +  OP_SEND_MESSAGE,
 | 
	
		
			
				|  |  | +  OP_SEND_TRAILING_METADATA,
 | 
	
		
			
				|  |  | +  OP_RECV_MESSAGE,
 | 
	
		
			
				|  |  | +  OP_RECV_INITIAL_METADATA,
 | 
	
		
			
				|  |  | +  OP_RECV_TRAILING_METADATA,
 | 
	
		
			
				|  |  | +  OP_CANCEL_ERROR,
 | 
	
		
			
				|  |  | +  OP_ON_COMPLETE,
 | 
	
		
			
				|  |  | +  OP_FAILED,
 | 
	
		
			
				|  |  | +  OP_SUCCEEDED,
 | 
	
		
			
				|  |  | +  OP_CANCELED,
 | 
	
		
			
				|  |  | +  OP_RECV_MESSAGE_AND_ON_COMPLETE,
 | 
	
		
			
				|  |  | +  OP_READ_REQ_MADE,
 | 
	
		
			
				|  |  | +  OP_NUM_OPS
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +/* Cronet callbacks. See cronet_c_for_grpc.h for documentation for each. */
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +static void on_request_headers_sent(cronet_bidirectional_stream *);
 | 
	
		
			
				|  |  | +static void on_response_headers_received(
 | 
	
		
			
				|  |  | +    cronet_bidirectional_stream *,
 | 
	
		
			
				|  |  | +    const cronet_bidirectional_stream_header_array *, const char *);
 | 
	
		
			
				|  |  | +static void on_write_completed(cronet_bidirectional_stream *, const char *);
 | 
	
		
			
				|  |  | +static void on_read_completed(cronet_bidirectional_stream *, char *, int);
 | 
	
		
			
				|  |  | +static void on_response_trailers_received(
 | 
	
		
			
				|  |  | +    cronet_bidirectional_stream *,
 | 
	
		
			
				|  |  | +    const cronet_bidirectional_stream_header_array *);
 | 
	
		
			
				|  |  | +static void on_succeeded(cronet_bidirectional_stream *);
 | 
	
		
			
				|  |  | +static void on_failed(cronet_bidirectional_stream *, int);
 | 
	
		
			
				|  |  | +static void on_canceled(cronet_bidirectional_stream *);
 | 
	
		
			
				|  |  | +static cronet_bidirectional_stream_callback cronet_callbacks = {
 | 
	
		
			
				|  |  | +    on_request_headers_sent,
 | 
	
		
			
				|  |  | +    on_response_headers_received,
 | 
	
		
			
				|  |  | +    on_read_completed,
 | 
	
		
			
				|  |  | +    on_write_completed,
 | 
	
		
			
				|  |  | +    on_response_trailers_received,
 | 
	
		
			
				|  |  | +    on_succeeded,
 | 
	
		
			
				|  |  | +    on_failed,
 | 
	
		
			
				|  |  | +    on_canceled};
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +/* Cronet transport object */
 | 
	
		
			
				|  |  |  struct grpc_cronet_transport {
 | 
	
		
			
				|  |  |    grpc_transport base; /* must be first element in this structure */
 | 
	
		
			
				|  |  |    cronet_engine *engine;
 | 
	
		
			
				|  |  |    char *host;
 | 
	
		
			
				|  |  |  };
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |  typedef struct grpc_cronet_transport grpc_cronet_transport;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -enum send_state {
 | 
	
		
			
				|  |  | -  CRONET_SEND_IDLE = 0,
 | 
	
		
			
				|  |  | -  CRONET_REQ_STARTED,
 | 
	
		
			
				|  |  | -  CRONET_SEND_HEADER,
 | 
	
		
			
				|  |  | -  CRONET_WRITE,
 | 
	
		
			
				|  |  | -  CRONET_WRITE_COMPLETED,
 | 
	
		
			
				|  |  | +/* TODO (makdharma): reorder structure for memory efficiency per
 | 
	
		
			
				|  |  | +   http://www.catb.org/esr/structure-packing/#_structure_reordering: */
 | 
	
		
			
				|  |  | +struct read_state {
 | 
	
		
			
				|  |  | +  /* vars to store data coming from server */
 | 
	
		
			
				|  |  | +  char *read_buffer;
 | 
	
		
			
				|  |  | +  bool length_field_received;
 | 
	
		
			
				|  |  | +  int received_bytes;
 | 
	
		
			
				|  |  | +  int remaining_bytes;
 | 
	
		
			
				|  |  | +  int length_field;
 | 
	
		
			
				|  |  | +  char grpc_header_bytes[GRPC_HEADER_SIZE_IN_BYTES];
 | 
	
		
			
				|  |  | +  char *payload_field;
 | 
	
		
			
				|  |  | +  bool read_stream_closed;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  /* vars for holding data destined for the application */
 | 
	
		
			
				|  |  | +  struct grpc_slice_buffer_stream sbs;
 | 
	
		
			
				|  |  | +  gpr_slice_buffer read_slice_buffer;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  /* vars for trailing metadata */
 | 
	
		
			
				|  |  | +  grpc_chttp2_incoming_metadata_buffer trailing_metadata;
 | 
	
		
			
				|  |  | +  bool trailing_metadata_valid;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  /* vars for initial metadata */
 | 
	
		
			
				|  |  | +  grpc_chttp2_incoming_metadata_buffer initial_metadata;
 | 
	
		
			
				|  |  |  };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -enum recv_state {
 | 
	
		
			
				|  |  | -  CRONET_RECV_IDLE = 0,
 | 
	
		
			
				|  |  | -  CRONET_RECV_READ_LENGTH,
 | 
	
		
			
				|  |  | -  CRONET_RECV_READ_DATA,
 | 
	
		
			
				|  |  | -  CRONET_RECV_CLOSED,
 | 
	
		
			
				|  |  | +struct write_state {
 | 
	
		
			
				|  |  | +  char *write_buffer;
 | 
	
		
			
				|  |  |  };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static const char *recv_state_name[] = {
 | 
	
		
			
				|  |  | -    "CRONET_RECV_IDLE", "CRONET_RECV_READ_LENGTH", "CRONET_RECV_READ_DATA,",
 | 
	
		
			
				|  |  | -    "CRONET_RECV_CLOSED"};
 | 
	
		
			
				|  |  | +/* track state of one stream op */
 | 
	
		
			
				|  |  | +struct op_state {
 | 
	
		
			
				|  |  | +  bool state_op_done[OP_NUM_OPS];
 | 
	
		
			
				|  |  | +  bool state_callback_received[OP_NUM_OPS];
 | 
	
		
			
				|  |  | +  /* data structure for storing data coming from server */
 | 
	
		
			
				|  |  | +  struct read_state rs;
 | 
	
		
			
				|  |  | +  /* data structure for storing data going to the server */
 | 
	
		
			
				|  |  | +  struct write_state ws;
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -// Enum that identifies calling function.
 | 
	
		
			
				|  |  | -enum e_caller {
 | 
	
		
			
				|  |  | -  PERFORM_STREAM_OP,
 | 
	
		
			
				|  |  | -  ON_READ_COMPLETE,
 | 
	
		
			
				|  |  | -  ON_RESPONSE_HEADERS_RECEIVED,
 | 
	
		
			
				|  |  | -  ON_RESPONSE_TRAILERS_RECEIVED
 | 
	
		
			
				|  |  | +struct op_and_state {
 | 
	
		
			
				|  |  | +  grpc_transport_stream_op op;
 | 
	
		
			
				|  |  | +  struct op_state state;
 | 
	
		
			
				|  |  | +  bool done;
 | 
	
		
			
				|  |  | +  struct stream_obj *s;      /* Pointer back to the stream object */
 | 
	
		
			
				|  |  | +  struct op_and_state *next; /* next op_and_state in the linked list */
 | 
	
		
			
				|  |  |  };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -enum callback_id {
 | 
	
		
			
				|  |  | -  CB_SEND_INITIAL_METADATA = 0,
 | 
	
		
			
				|  |  | -  CB_SEND_MESSAGE,
 | 
	
		
			
				|  |  | -  CB_SEND_TRAILING_METADATA,
 | 
	
		
			
				|  |  | -  CB_RECV_MESSAGE,
 | 
	
		
			
				|  |  | -  CB_RECV_INITIAL_METADATA,
 | 
	
		
			
				|  |  | -  CB_RECV_TRAILING_METADATA,
 | 
	
		
			
				|  |  | -  CB_NUM_CALLBACKS
 | 
	
		
			
				|  |  | +struct op_storage {
 | 
	
		
			
				|  |  | +  int num_pending_ops;
 | 
	
		
			
				|  |  | +  struct op_and_state *head;
 | 
	
		
			
				|  |  |  };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  struct stream_obj {
 | 
	
		
			
				|  |  | -  // we store received bytes here as they trickle in.
 | 
	
		
			
				|  |  | -  gpr_slice_buffer write_slice_buffer;
 | 
	
		
			
				|  |  | +  struct op_and_state *oas;
 | 
	
		
			
				|  |  | +  grpc_transport_stream_op *curr_op;
 | 
	
		
			
				|  |  | +  grpc_cronet_transport curr_ct;
 | 
	
		
			
				|  |  | +  grpc_stream *curr_gs;
 | 
	
		
			
				|  |  |    cronet_bidirectional_stream *cbs;
 | 
	
		
			
				|  |  | -  gpr_slice slice;
 | 
	
		
			
				|  |  | -  gpr_slice_buffer read_slice_buffer;
 | 
	
		
			
				|  |  | -  struct grpc_slice_buffer_stream sbs;
 | 
	
		
			
				|  |  | -  char *read_buffer;
 | 
	
		
			
				|  |  | -  int remaining_read_bytes;
 | 
	
		
			
				|  |  | -  int total_read_bytes;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  char *write_buffer;
 | 
	
		
			
				|  |  | -  size_t write_buffer_size;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  // Hold the URL
 | 
	
		
			
				|  |  | -  char *url;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  bool response_headers_received;
 | 
	
		
			
				|  |  | -  bool read_requested;
 | 
	
		
			
				|  |  | -  bool response_trailers_received;
 | 
	
		
			
				|  |  | -  bool read_closed;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  // Recv message stuff
 | 
	
		
			
				|  |  | -  grpc_byte_buffer **recv_message;
 | 
	
		
			
				|  |  | -  // Initial metadata stuff
 | 
	
		
			
				|  |  | -  grpc_metadata_batch *recv_initial_metadata;
 | 
	
		
			
				|  |  | -  // Trailing metadata stuff
 | 
	
		
			
				|  |  | -  grpc_metadata_batch *recv_trailing_metadata;
 | 
	
		
			
				|  |  | -  grpc_chttp2_incoming_metadata_buffer imb;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  // This mutex protects receive state machine execution
 | 
	
		
			
				|  |  | -  gpr_mu recv_mu;
 | 
	
		
			
				|  |  | -  // we can queue up up to 2 callbacks for each OP
 | 
	
		
			
				|  |  | -  grpc_closure *callback_list[CB_NUM_CALLBACKS][2];
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  // storage for header
 | 
	
		
			
				|  |  | -  cronet_bidirectional_stream_header *headers;
 | 
	
		
			
				|  |  | -  uint32_t num_headers;
 | 
	
		
			
				|  |  |    cronet_bidirectional_stream_header_array header_array;
 | 
	
		
			
				|  |  | -  // state tracking
 | 
	
		
			
				|  |  | -  enum recv_state cronet_recv_state;
 | 
	
		
			
				|  |  | -  enum send_state cronet_send_state;
 | 
	
		
			
				|  |  | -};
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -typedef struct stream_obj stream_obj;
 | 
	
		
			
				|  |  | +  /* Stream level state. Some state will be tracked both at stream and stream_op
 | 
	
		
			
				|  |  | +   * level */
 | 
	
		
			
				|  |  | +  struct op_state state;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void next_send_step(stream_obj *s);
 | 
	
		
			
				|  |  | -static void next_recv_step(stream_obj *s, enum e_caller caller);
 | 
	
		
			
				|  |  | +  /* OP storage */
 | 
	
		
			
				|  |  | +  struct op_storage storage;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void set_pollset_do_nothing(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
 | 
	
		
			
				|  |  | -                                   grpc_stream *gs, grpc_pollset *pollset) {}
 | 
	
		
			
				|  |  | +  /* Mutex to protect storage */
 | 
	
		
			
				|  |  | +  gpr_mu mu;
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  | +typedef struct stream_obj stream_obj;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void set_pollset_set_do_nothing(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  | -                                       grpc_transport *gt, grpc_stream *gs,
 | 
	
		
			
				|  |  | -                                       grpc_pollset_set *pollset_set) {}
 | 
	
		
			
				|  |  | +static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  | +                                          struct op_and_state *oas);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void enqueue_callbacks(grpc_closure *callback_list[]) {
 | 
	
		
			
				|  |  | -  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
 | 
	
		
			
				|  |  | -  if (callback_list[0]) {
 | 
	
		
			
				|  |  | -    grpc_exec_ctx_sched(&exec_ctx, callback_list[0], GRPC_ERROR_NONE, NULL);
 | 
	
		
			
				|  |  | -    callback_list[0] = NULL;
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  if (callback_list[1]) {
 | 
	
		
			
				|  |  | -    grpc_exec_ctx_sched(&exec_ctx, callback_list[1], GRPC_ERROR_NONE, NULL);
 | 
	
		
			
				|  |  | -    callback_list[1] = NULL;
 | 
	
		
			
				|  |  | +/*
 | 
	
		
			
				|  |  | +  Utility function to translate enum into string for printing
 | 
	
		
			
				|  |  | +*/
 | 
	
		
			
				|  |  | +static const char *op_result_string(enum e_op_result i) {
 | 
	
		
			
				|  |  | +  switch (i) {
 | 
	
		
			
				|  |  | +    case ACTION_TAKEN_WITH_CALLBACK:
 | 
	
		
			
				|  |  | +      return "ACTION_TAKEN_WITH_CALLBACK";
 | 
	
		
			
				|  |  | +    case ACTION_TAKEN_NO_CALLBACK:
 | 
	
		
			
				|  |  | +      return "ACTION_TAKEN_NO_CALLBACK";
 | 
	
		
			
				|  |  | +    case NO_ACTION_POSSIBLE:
 | 
	
		
			
				|  |  | +      return "NO_ACTION_POSSIBLE";
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  grpc_exec_ctx_finish(&exec_ctx);
 | 
	
		
			
				|  |  | +  GPR_UNREACHABLE_CODE(return "UNKNOWN");
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void on_canceled(cronet_bidirectional_stream *stream) {
 | 
	
		
			
				|  |  | -  if (grpc_cronet_trace) {
 | 
	
		
			
				|  |  | -    gpr_log(GPR_DEBUG, "on_canceled %p", stream);
 | 
	
		
			
				|  |  | +static const char *op_id_string(enum e_op_id i) {
 | 
	
		
			
				|  |  | +  switch (i) {
 | 
	
		
			
				|  |  | +    case OP_SEND_INITIAL_METADATA:
 | 
	
		
			
				|  |  | +      return "OP_SEND_INITIAL_METADATA";
 | 
	
		
			
				|  |  | +    case OP_SEND_MESSAGE:
 | 
	
		
			
				|  |  | +      return "OP_SEND_MESSAGE";
 | 
	
		
			
				|  |  | +    case OP_SEND_TRAILING_METADATA:
 | 
	
		
			
				|  |  | +      return "OP_SEND_TRAILING_METADATA";
 | 
	
		
			
				|  |  | +    case OP_RECV_MESSAGE:
 | 
	
		
			
				|  |  | +      return "OP_RECV_MESSAGE";
 | 
	
		
			
				|  |  | +    case OP_RECV_INITIAL_METADATA:
 | 
	
		
			
				|  |  | +      return "OP_RECV_INITIAL_METADATA";
 | 
	
		
			
				|  |  | +    case OP_RECV_TRAILING_METADATA:
 | 
	
		
			
				|  |  | +      return "OP_RECV_TRAILING_METADATA";
 | 
	
		
			
				|  |  | +    case OP_CANCEL_ERROR:
 | 
	
		
			
				|  |  | +      return "OP_CANCEL_ERROR";
 | 
	
		
			
				|  |  | +    case OP_ON_COMPLETE:
 | 
	
		
			
				|  |  | +      return "OP_ON_COMPLETE";
 | 
	
		
			
				|  |  | +    case OP_FAILED:
 | 
	
		
			
				|  |  | +      return "OP_FAILED";
 | 
	
		
			
				|  |  | +    case OP_SUCCEEDED:
 | 
	
		
			
				|  |  | +      return "OP_SUCCEEDED";
 | 
	
		
			
				|  |  | +    case OP_CANCELED:
 | 
	
		
			
				|  |  | +      return "OP_CANCELED";
 | 
	
		
			
				|  |  | +    case OP_RECV_MESSAGE_AND_ON_COMPLETE:
 | 
	
		
			
				|  |  | +      return "OP_RECV_MESSAGE_AND_ON_COMPLETE";
 | 
	
		
			
				|  |  | +    case OP_READ_REQ_MADE:
 | 
	
		
			
				|  |  | +      return "OP_READ_REQ_MADE";
 | 
	
		
			
				|  |  | +    case OP_NUM_OPS:
 | 
	
		
			
				|  |  | +      return "OP_NUM_OPS";
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void on_failed(cronet_bidirectional_stream *stream, int net_error) {
 | 
	
		
			
				|  |  | -  if (grpc_cronet_trace) {
 | 
	
		
			
				|  |  | -    gpr_log(GPR_DEBUG, "on_failed %p, error = %d", stream, net_error);
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | +/*
 | 
	
		
			
				|  |  | +  Add a new stream op to op storage.
 | 
	
		
			
				|  |  | +*/
 | 
	
		
			
				|  |  | +static void add_to_storage(struct stream_obj *s, grpc_transport_stream_op *op) {
 | 
	
		
			
				|  |  | +  struct op_storage *storage = &s->storage;
 | 
	
		
			
				|  |  | +  /* add new op at the beginning of the linked list. The memory is freed
 | 
	
		
			
				|  |  | +  in remove_from_storage */
 | 
	
		
			
				|  |  | +  struct op_and_state *new_op = gpr_malloc(sizeof(struct op_and_state));
 | 
	
		
			
				|  |  | +  memcpy(&new_op->op, op, sizeof(grpc_transport_stream_op));
 | 
	
		
			
				|  |  | +  memset(&new_op->state, 0, sizeof(new_op->state));
 | 
	
		
			
				|  |  | +  new_op->s = s;
 | 
	
		
			
				|  |  | +  new_op->done = false;
 | 
	
		
			
				|  |  | +  gpr_mu_lock(&s->mu);
 | 
	
		
			
				|  |  | +  new_op->next = storage->head;
 | 
	
		
			
				|  |  | +  storage->head = new_op;
 | 
	
		
			
				|  |  | +  storage->num_pending_ops++;
 | 
	
		
			
				|  |  | +  CRONET_LOG(GPR_DEBUG, "adding new op %p. %d in the queue.", new_op,
 | 
	
		
			
				|  |  | +             storage->num_pending_ops);
 | 
	
		
			
				|  |  | +  gpr_mu_unlock(&s->mu);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void on_succeeded(cronet_bidirectional_stream *stream) {
 | 
	
		
			
				|  |  | -  if (grpc_cronet_trace) {
 | 
	
		
			
				|  |  | -    gpr_log(GPR_DEBUG, "on_succeeded %p", stream);
 | 
	
		
			
				|  |  | +/*
 | 
	
		
			
				|  |  | +  Traverse the linked list and delete op and free memory
 | 
	
		
			
				|  |  | +*/
 | 
	
		
			
				|  |  | +static void remove_from_storage(struct stream_obj *s,
 | 
	
		
			
				|  |  | +                                struct op_and_state *oas) {
 | 
	
		
			
				|  |  | +  struct op_and_state *curr;
 | 
	
		
			
				|  |  | +  if (s->storage.head == NULL || oas == NULL) {
 | 
	
		
			
				|  |  | +    return;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -static void on_response_trailers_received(
 | 
	
		
			
				|  |  | -    cronet_bidirectional_stream *stream,
 | 
	
		
			
				|  |  | -    const cronet_bidirectional_stream_header_array *trailers) {
 | 
	
		
			
				|  |  | -  if (grpc_cronet_trace) {
 | 
	
		
			
				|  |  | -    gpr_log(GPR_DEBUG, "R: on_response_trailers_received");
 | 
	
		
			
				|  |  | +  if (s->storage.head == oas) {
 | 
	
		
			
				|  |  | +    s->storage.head = oas->next;
 | 
	
		
			
				|  |  | +    gpr_free(oas);
 | 
	
		
			
				|  |  | +    s->storage.num_pending_ops--;
 | 
	
		
			
				|  |  | +    CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
 | 
	
		
			
				|  |  | +               s->storage.num_pending_ops);
 | 
	
		
			
				|  |  | +  } else {
 | 
	
		
			
				|  |  | +    for (curr = s->storage.head; curr != NULL; curr = curr->next) {
 | 
	
		
			
				|  |  | +      if (curr->next == oas) {
 | 
	
		
			
				|  |  | +        curr->next = oas->next;
 | 
	
		
			
				|  |  | +        s->storage.num_pending_ops--;
 | 
	
		
			
				|  |  | +        CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
 | 
	
		
			
				|  |  | +                   s->storage.num_pending_ops);
 | 
	
		
			
				|  |  | +        gpr_free(oas);
 | 
	
		
			
				|  |  | +        break;
 | 
	
		
			
				|  |  | +      } else if (curr->next == NULL) {
 | 
	
		
			
				|  |  | +        CRONET_LOG(GPR_ERROR, "Reached end of LL and did not find op to free");
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  stream_obj *s = (stream_obj *)stream->annotation;
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  memset(&s->imb, 0, sizeof(s->imb));
 | 
	
		
			
				|  |  | -  grpc_chttp2_incoming_metadata_buffer_init(&s->imb);
 | 
	
		
			
				|  |  | -  unsigned int i = 0;
 | 
	
		
			
				|  |  | -  for (i = 0; i < trailers->count; i++) {
 | 
	
		
			
				|  |  | -    grpc_chttp2_incoming_metadata_buffer_add(
 | 
	
		
			
				|  |  | -        &s->imb, grpc_mdelem_from_metadata_strings(
 | 
	
		
			
				|  |  | -                     grpc_mdstr_from_string(trailers->headers[i].key),
 | 
	
		
			
				|  |  | -                     grpc_mdstr_from_string(trailers->headers[i].value)));
 | 
	
		
			
				|  |  | +/*
 | 
	
		
			
				|  |  | +  Cycle through ops and try to take next action. Break when either
 | 
	
		
			
				|  |  | +  an action with callback is taken, or no action is possible.
 | 
	
		
			
				|  |  | +  This can be executed from the Cronet network thread via cronet callback
 | 
	
		
			
				|  |  | +  or on the application supplied thread via the perform_stream_op function.
 | 
	
		
			
				|  |  | +*/
 | 
	
		
			
				|  |  | +static void execute_from_storage(stream_obj *s) {
 | 
	
		
			
				|  |  | +  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
 | 
	
		
			
				|  |  | +  gpr_mu_lock(&s->mu);
 | 
	
		
			
				|  |  | +  for (struct op_and_state *curr = s->storage.head; curr != NULL;) {
 | 
	
		
			
				|  |  | +    CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done);
 | 
	
		
			
				|  |  | +    GPR_ASSERT(curr->done == 0);
 | 
	
		
			
				|  |  | +    enum e_op_result result = execute_stream_op(&exec_ctx, curr);
 | 
	
		
			
				|  |  | +    CRONET_LOG(GPR_DEBUG, "execute_stream_op[%p] returns %s", curr,
 | 
	
		
			
				|  |  | +               op_result_string(result));
 | 
	
		
			
				|  |  | +    /* if this op is done, then remove it and free memory */
 | 
	
		
			
				|  |  | +    if (curr->done) {
 | 
	
		
			
				|  |  | +      struct op_and_state *next = curr->next;
 | 
	
		
			
				|  |  | +      remove_from_storage(s, curr);
 | 
	
		
			
				|  |  | +      curr = next;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    /* continue processing the same op if ACTION_TAKEN_WITHOUT_CALLBACK */
 | 
	
		
			
				|  |  | +    if (result == NO_ACTION_POSSIBLE) {
 | 
	
		
			
				|  |  | +      curr = curr->next;
 | 
	
		
			
				|  |  | +    } else if (result == ACTION_TAKEN_WITH_CALLBACK) {
 | 
	
		
			
				|  |  | +      break;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  s->response_trailers_received = true;
 | 
	
		
			
				|  |  | -  next_recv_step(s, ON_RESPONSE_TRAILERS_RECEIVED);
 | 
	
		
			
				|  |  | +  gpr_mu_unlock(&s->mu);
 | 
	
		
			
				|  |  | +  grpc_exec_ctx_finish(&exec_ctx);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void on_write_completed(cronet_bidirectional_stream *stream,
 | 
	
		
			
				|  |  | -                               const char *data) {
 | 
	
		
			
				|  |  | -  if (grpc_cronet_trace) {
 | 
	
		
			
				|  |  | -    gpr_log(GPR_DEBUG, "W: on_write_completed");
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | +/*
 | 
	
		
			
				|  |  | +  Cronet callback
 | 
	
		
			
				|  |  | +*/
 | 
	
		
			
				|  |  | +static void on_failed(cronet_bidirectional_stream *stream, int net_error) {
 | 
	
		
			
				|  |  | +  CRONET_LOG(GPR_DEBUG, "on_failed(%p, %d)", stream, net_error);
 | 
	
		
			
				|  |  |    stream_obj *s = (stream_obj *)stream->annotation;
 | 
	
		
			
				|  |  | -  enqueue_callbacks(s->callback_list[CB_SEND_MESSAGE]);
 | 
	
		
			
				|  |  | -  s->cronet_send_state = CRONET_WRITE_COMPLETED;
 | 
	
		
			
				|  |  | -  next_send_step(s);
 | 
	
		
			
				|  |  | +  cronet_bidirectional_stream_destroy(s->cbs);
 | 
	
		
			
				|  |  | +  s->state.state_callback_received[OP_FAILED] = true;
 | 
	
		
			
				|  |  | +  s->cbs = NULL;
 | 
	
		
			
				|  |  | +  if (s->header_array.headers) {
 | 
	
		
			
				|  |  | +    gpr_free(s->header_array.headers);
 | 
	
		
			
				|  |  | +    s->header_array.headers = NULL;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  if (s->state.ws.write_buffer) {
 | 
	
		
			
				|  |  | +    gpr_free(s->state.ws.write_buffer);
 | 
	
		
			
				|  |  | +    s->state.ws.write_buffer = NULL;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  execute_from_storage(s);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void process_recv_message(stream_obj *s, const uint8_t *recv_data) {
 | 
	
		
			
				|  |  | -  gpr_slice read_data_slice = gpr_slice_malloc((uint32_t)s->total_read_bytes);
 | 
	
		
			
				|  |  | -  uint8_t *dst_p = GPR_SLICE_START_PTR(read_data_slice);
 | 
	
		
			
				|  |  | -  if (s->total_read_bytes > 0) {
 | 
	
		
			
				|  |  | -    // Only copy if there is non-zero number of bytes
 | 
	
		
			
				|  |  | -    memcpy(dst_p, recv_data, (size_t)s->total_read_bytes);
 | 
	
		
			
				|  |  | -    gpr_slice_buffer_add(&s->read_slice_buffer, read_data_slice);
 | 
	
		
			
				|  |  | +/*
 | 
	
		
			
				|  |  | +  Cronet callback
 | 
	
		
			
				|  |  | +*/
 | 
	
		
			
				|  |  | +static void on_canceled(cronet_bidirectional_stream *stream) {
 | 
	
		
			
				|  |  | +  CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream);
 | 
	
		
			
				|  |  | +  stream_obj *s = (stream_obj *)stream->annotation;
 | 
	
		
			
				|  |  | +  cronet_bidirectional_stream_destroy(s->cbs);
 | 
	
		
			
				|  |  | +  s->state.state_callback_received[OP_CANCELED] = true;
 | 
	
		
			
				|  |  | +  s->cbs = NULL;
 | 
	
		
			
				|  |  | +  if (s->header_array.headers) {
 | 
	
		
			
				|  |  | +    gpr_free(s->header_array.headers);
 | 
	
		
			
				|  |  | +    s->header_array.headers = NULL;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  grpc_slice_buffer_stream_init(&s->sbs, &s->read_slice_buffer, 0);
 | 
	
		
			
				|  |  | -  *s->recv_message = (grpc_byte_buffer *)&s->sbs;
 | 
	
		
			
				|  |  | +  if (s->state.ws.write_buffer) {
 | 
	
		
			
				|  |  | +    gpr_free(s->state.ws.write_buffer);
 | 
	
		
			
				|  |  | +    s->state.ws.write_buffer = NULL;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  execute_from_storage(s);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static int parse_grpc_header(const uint8_t *data) {
 | 
	
		
			
				|  |  | -  const uint8_t *p = data + 1;
 | 
	
		
			
				|  |  | -  int length = 0;
 | 
	
		
			
				|  |  | -  length |= ((uint8_t)*p++) << 24;
 | 
	
		
			
				|  |  | -  length |= ((uint8_t)*p++) << 16;
 | 
	
		
			
				|  |  | -  length |= ((uint8_t)*p++) << 8;
 | 
	
		
			
				|  |  | -  length |= ((uint8_t)*p++);
 | 
	
		
			
				|  |  | -  return length;
 | 
	
		
			
				|  |  | +/*
 | 
	
		
			
				|  |  | +  Cronet callback
 | 
	
		
			
				|  |  | +*/
 | 
	
		
			
				|  |  | +static void on_succeeded(cronet_bidirectional_stream *stream) {
 | 
	
		
			
				|  |  | +  CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream);
 | 
	
		
			
				|  |  | +  stream_obj *s = (stream_obj *)stream->annotation;
 | 
	
		
			
				|  |  | +  cronet_bidirectional_stream_destroy(s->cbs);
 | 
	
		
			
				|  |  | +  s->state.state_callback_received[OP_SUCCEEDED] = true;
 | 
	
		
			
				|  |  | +  s->cbs = NULL;
 | 
	
		
			
				|  |  | +  execute_from_storage(s);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void on_read_completed(cronet_bidirectional_stream *stream, char *data,
 | 
	
		
			
				|  |  | -                              int count) {
 | 
	
		
			
				|  |  | +/*
 | 
	
		
			
				|  |  | +  Cronet callback
 | 
	
		
			
				|  |  | +*/
 | 
	
		
			
				|  |  | +static void on_request_headers_sent(cronet_bidirectional_stream *stream) {
 | 
	
		
			
				|  |  | +  CRONET_LOG(GPR_DEBUG, "W: on_request_headers_sent(%p)", stream);
 | 
	
		
			
				|  |  |    stream_obj *s = (stream_obj *)stream->annotation;
 | 
	
		
			
				|  |  | -  if (grpc_cronet_trace) {
 | 
	
		
			
				|  |  | -    gpr_log(GPR_DEBUG, "R: on_read_completed count=%d, total=%d, remaining=%d",
 | 
	
		
			
				|  |  | -            count, s->total_read_bytes, s->remaining_read_bytes);
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  if (count > 0) {
 | 
	
		
			
				|  |  | -    GPR_ASSERT(s->recv_message);
 | 
	
		
			
				|  |  | -    s->remaining_read_bytes -= count;
 | 
	
		
			
				|  |  | -    next_recv_step(s, ON_READ_COMPLETE);
 | 
	
		
			
				|  |  | -  } else {
 | 
	
		
			
				|  |  | -    s->read_closed = true;
 | 
	
		
			
				|  |  | -    next_recv_step(s, ON_READ_COMPLETE);
 | 
	
		
			
				|  |  | +  s->state.state_op_done[OP_SEND_INITIAL_METADATA] = true;
 | 
	
		
			
				|  |  | +  s->state.state_callback_received[OP_SEND_INITIAL_METADATA] = true;
 | 
	
		
			
				|  |  | +  /* Free the memory allocated for headers */
 | 
	
		
			
				|  |  | +  if (s->header_array.headers) {
 | 
	
		
			
				|  |  | +    gpr_free(s->header_array.headers);
 | 
	
		
			
				|  |  | +    s->header_array.headers = NULL;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | +  execute_from_storage(s);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +/*
 | 
	
		
			
				|  |  | +  Cronet callback
 | 
	
		
			
				|  |  | +*/
 | 
	
		
			
				|  |  |  static void on_response_headers_received(
 | 
	
		
			
				|  |  |      cronet_bidirectional_stream *stream,
 | 
	
		
			
				|  |  |      const cronet_bidirectional_stream_header_array *headers,
 | 
	
		
			
				|  |  |      const char *negotiated_protocol) {
 | 
	
		
			
				|  |  | -  if (grpc_cronet_trace) {
 | 
	
		
			
				|  |  | -    gpr_log(GPR_DEBUG, "R: on_response_headers_received");
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | +  CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream,
 | 
	
		
			
				|  |  | +             headers, negotiated_protocol);
 | 
	
		
			
				|  |  |    stream_obj *s = (stream_obj *)stream->annotation;
 | 
	
		
			
				|  |  | -  enqueue_callbacks(s->callback_list[CB_RECV_INITIAL_METADATA]);
 | 
	
		
			
				|  |  | -  s->response_headers_received = true;
 | 
	
		
			
				|  |  | -  next_recv_step(s, ON_RESPONSE_HEADERS_RECEIVED);
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -static void on_request_headers_sent(cronet_bidirectional_stream *stream) {
 | 
	
		
			
				|  |  | -  if (grpc_cronet_trace) {
 | 
	
		
			
				|  |  | -    gpr_log(GPR_DEBUG, "W: on_request_headers_sent");
 | 
	
		
			
				|  |  | +  memset(&s->state.rs.initial_metadata, 0,
 | 
	
		
			
				|  |  | +         sizeof(s->state.rs.initial_metadata));
 | 
	
		
			
				|  |  | +  grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata);
 | 
	
		
			
				|  |  | +  for (size_t i = 0; i < headers->count; i++) {
 | 
	
		
			
				|  |  | +    grpc_chttp2_incoming_metadata_buffer_add(
 | 
	
		
			
				|  |  | +        &s->state.rs.initial_metadata,
 | 
	
		
			
				|  |  | +        grpc_mdelem_from_metadata_strings(
 | 
	
		
			
				|  |  | +            grpc_mdstr_from_string(headers->headers[i].key),
 | 
	
		
			
				|  |  | +            grpc_mdstr_from_string(headers->headers[i].value)));
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  stream_obj *s = (stream_obj *)stream->annotation;
 | 
	
		
			
				|  |  | -  enqueue_callbacks(s->callback_list[CB_SEND_INITIAL_METADATA]);
 | 
	
		
			
				|  |  | -  s->cronet_send_state = CRONET_SEND_HEADER;
 | 
	
		
			
				|  |  | -  next_send_step(s);
 | 
	
		
			
				|  |  | +  s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true;
 | 
	
		
			
				|  |  | +  execute_from_storage(s);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -// Callback function pointers (invoked by cronet in response to events)
 | 
	
		
			
				|  |  | -static cronet_bidirectional_stream_callback callbacks = {
 | 
	
		
			
				|  |  | -    on_request_headers_sent,
 | 
	
		
			
				|  |  | -    on_response_headers_received,
 | 
	
		
			
				|  |  | -    on_read_completed,
 | 
	
		
			
				|  |  | -    on_write_completed,
 | 
	
		
			
				|  |  | -    on_response_trailers_received,
 | 
	
		
			
				|  |  | -    on_succeeded,
 | 
	
		
			
				|  |  | -    on_failed,
 | 
	
		
			
				|  |  | -    on_canceled};
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -static void invoke_closing_callback(stream_obj *s) {
 | 
	
		
			
				|  |  | -  grpc_chttp2_incoming_metadata_buffer_publish(&s->imb,
 | 
	
		
			
				|  |  | -                                               s->recv_trailing_metadata);
 | 
	
		
			
				|  |  | -  if (s->callback_list[CB_RECV_TRAILING_METADATA]) {
 | 
	
		
			
				|  |  | -    enqueue_callbacks(s->callback_list[CB_RECV_TRAILING_METADATA]);
 | 
	
		
			
				|  |  | +/*
 | 
	
		
			
				|  |  | +  Cronet callback
 | 
	
		
			
				|  |  | +*/
 | 
	
		
			
				|  |  | +static void on_write_completed(cronet_bidirectional_stream *stream,
 | 
	
		
			
				|  |  | +                               const char *data) {
 | 
	
		
			
				|  |  | +  stream_obj *s = (stream_obj *)stream->annotation;
 | 
	
		
			
				|  |  | +  CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data);
 | 
	
		
			
				|  |  | +  if (s->state.ws.write_buffer) {
 | 
	
		
			
				|  |  | +    gpr_free(s->state.ws.write_buffer);
 | 
	
		
			
				|  |  | +    s->state.ws.write_buffer = NULL;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | +  s->state.state_callback_received[OP_SEND_MESSAGE] = true;
 | 
	
		
			
				|  |  | +  execute_from_storage(s);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void set_recv_state(stream_obj *s, enum recv_state state) {
 | 
	
		
			
				|  |  | -  if (grpc_cronet_trace) {
 | 
	
		
			
				|  |  | -    gpr_log(GPR_DEBUG, "next_state = %s", recv_state_name[state]);
 | 
	
		
			
				|  |  | +/*
 | 
	
		
			
				|  |  | +  Cronet callback
 | 
	
		
			
				|  |  | +*/
 | 
	
		
			
				|  |  | +static void on_read_completed(cronet_bidirectional_stream *stream, char *data,
 | 
	
		
			
				|  |  | +                              int count) {
 | 
	
		
			
				|  |  | +  stream_obj *s = (stream_obj *)stream->annotation;
 | 
	
		
			
				|  |  | +  CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data,
 | 
	
		
			
				|  |  | +             count);
 | 
	
		
			
				|  |  | +  s->state.state_callback_received[OP_RECV_MESSAGE] = true;
 | 
	
		
			
				|  |  | +  if (count > 0) {
 | 
	
		
			
				|  |  | +    s->state.rs.received_bytes += count;
 | 
	
		
			
				|  |  | +    s->state.rs.remaining_bytes -= count;
 | 
	
		
			
				|  |  | +    if (s->state.rs.remaining_bytes > 0) {
 | 
	
		
			
				|  |  | +      CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs);
 | 
	
		
			
				|  |  | +      s->state.state_op_done[OP_READ_REQ_MADE] = true;
 | 
	
		
			
				|  |  | +      cronet_bidirectional_stream_read(
 | 
	
		
			
				|  |  | +          s->cbs, s->state.rs.read_buffer + s->state.rs.received_bytes,
 | 
	
		
			
				|  |  | +          s->state.rs.remaining_bytes);
 | 
	
		
			
				|  |  | +    } else {
 | 
	
		
			
				|  |  | +      execute_from_storage(s);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +  } else {
 | 
	
		
			
				|  |  | +    s->state.rs.read_stream_closed = true;
 | 
	
		
			
				|  |  | +    execute_from_storage(s);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  s->cronet_recv_state = state;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -// This is invoked from perform_stream_op, and all on_xxxx callbacks.
 | 
	
		
			
				|  |  | -static void next_recv_step(stream_obj *s, enum e_caller caller) {
 | 
	
		
			
				|  |  | -  gpr_mu_lock(&s->recv_mu);
 | 
	
		
			
				|  |  | -  switch (s->cronet_recv_state) {
 | 
	
		
			
				|  |  | -    case CRONET_RECV_IDLE:
 | 
	
		
			
				|  |  | -      if (grpc_cronet_trace) {
 | 
	
		
			
				|  |  | -        gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_IDLE");
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | -      if (caller == PERFORM_STREAM_OP ||
 | 
	
		
			
				|  |  | -          caller == ON_RESPONSE_HEADERS_RECEIVED) {
 | 
	
		
			
				|  |  | -        if (s->read_closed && s->response_trailers_received) {
 | 
	
		
			
				|  |  | -          invoke_closing_callback(s);
 | 
	
		
			
				|  |  | -          set_recv_state(s, CRONET_RECV_CLOSED);
 | 
	
		
			
				|  |  | -        } else if (s->response_headers_received == true &&
 | 
	
		
			
				|  |  | -                   s->read_requested == true) {
 | 
	
		
			
				|  |  | -          set_recv_state(s, CRONET_RECV_READ_LENGTH);
 | 
	
		
			
				|  |  | -          s->total_read_bytes = s->remaining_read_bytes =
 | 
	
		
			
				|  |  | -              GRPC_HEADER_SIZE_IN_BYTES;
 | 
	
		
			
				|  |  | -          GPR_ASSERT(s->read_buffer);
 | 
	
		
			
				|  |  | -          if (grpc_cronet_trace) {
 | 
	
		
			
				|  |  | -            gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()");
 | 
	
		
			
				|  |  | -          }
 | 
	
		
			
				|  |  | -          cronet_bidirectional_stream_read(s->cbs, s->read_buffer,
 | 
	
		
			
				|  |  | -                                           s->remaining_read_bytes);
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | -      break;
 | 
	
		
			
				|  |  | -    case CRONET_RECV_READ_LENGTH:
 | 
	
		
			
				|  |  | -      if (grpc_cronet_trace) {
 | 
	
		
			
				|  |  | -        gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_READ_LENGTH");
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | -      if (caller == ON_READ_COMPLETE) {
 | 
	
		
			
				|  |  | -        if (s->read_closed) {
 | 
	
		
			
				|  |  | -          invoke_closing_callback(s);
 | 
	
		
			
				|  |  | -          enqueue_callbacks(s->callback_list[CB_RECV_MESSAGE]);
 | 
	
		
			
				|  |  | -          set_recv_state(s, CRONET_RECV_CLOSED);
 | 
	
		
			
				|  |  | -        } else {
 | 
	
		
			
				|  |  | -          GPR_ASSERT(s->remaining_read_bytes == 0);
 | 
	
		
			
				|  |  | -          set_recv_state(s, CRONET_RECV_READ_DATA);
 | 
	
		
			
				|  |  | -          s->total_read_bytes = s->remaining_read_bytes =
 | 
	
		
			
				|  |  | -              parse_grpc_header((const uint8_t *)s->read_buffer);
 | 
	
		
			
				|  |  | -          s->read_buffer =
 | 
	
		
			
				|  |  | -              gpr_realloc(s->read_buffer, (uint32_t)s->remaining_read_bytes);
 | 
	
		
			
				|  |  | -          GPR_ASSERT(s->read_buffer);
 | 
	
		
			
				|  |  | -          if (grpc_cronet_trace) {
 | 
	
		
			
				|  |  | -            gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()");
 | 
	
		
			
				|  |  | -          }
 | 
	
		
			
				|  |  | -          if (s->remaining_read_bytes > 0) {
 | 
	
		
			
				|  |  | -            cronet_bidirectional_stream_read(s->cbs, (char *)s->read_buffer,
 | 
	
		
			
				|  |  | -                                             s->remaining_read_bytes);
 | 
	
		
			
				|  |  | -          } else {
 | 
	
		
			
				|  |  | -            // Calling the closing callback directly since this is a 0 byte read
 | 
	
		
			
				|  |  | -            // for an empty message.
 | 
	
		
			
				|  |  | -            process_recv_message(s, NULL);
 | 
	
		
			
				|  |  | -            enqueue_callbacks(s->callback_list[CB_RECV_MESSAGE]);
 | 
	
		
			
				|  |  | -            invoke_closing_callback(s);
 | 
	
		
			
				|  |  | -            set_recv_state(s, CRONET_RECV_CLOSED);
 | 
	
		
			
				|  |  | -          }
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | -      break;
 | 
	
		
			
				|  |  | -    case CRONET_RECV_READ_DATA:
 | 
	
		
			
				|  |  | -      if (grpc_cronet_trace) {
 | 
	
		
			
				|  |  | -        gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_READ_DATA");
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | -      if (caller == ON_READ_COMPLETE) {
 | 
	
		
			
				|  |  | -        if (s->remaining_read_bytes > 0) {
 | 
	
		
			
				|  |  | -          int offset = s->total_read_bytes - s->remaining_read_bytes;
 | 
	
		
			
				|  |  | -          GPR_ASSERT(s->read_buffer);
 | 
	
		
			
				|  |  | -          if (grpc_cronet_trace) {
 | 
	
		
			
				|  |  | -            gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()");
 | 
	
		
			
				|  |  | -          }
 | 
	
		
			
				|  |  | -          cronet_bidirectional_stream_read(
 | 
	
		
			
				|  |  | -              s->cbs, (char *)s->read_buffer + offset, s->remaining_read_bytes);
 | 
	
		
			
				|  |  | -        } else {
 | 
	
		
			
				|  |  | -          gpr_slice_buffer_init(&s->read_slice_buffer);
 | 
	
		
			
				|  |  | -          uint8_t *p = (uint8_t *)s->read_buffer;
 | 
	
		
			
				|  |  | -          process_recv_message(s, p);
 | 
	
		
			
				|  |  | -          set_recv_state(s, CRONET_RECV_IDLE);
 | 
	
		
			
				|  |  | -          enqueue_callbacks(s->callback_list[CB_RECV_MESSAGE]);
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | -      break;
 | 
	
		
			
				|  |  | -    case CRONET_RECV_CLOSED:
 | 
	
		
			
				|  |  | -      break;
 | 
	
		
			
				|  |  | -    default:
 | 
	
		
			
				|  |  | -      GPR_ASSERT(0);  // Should not reach here
 | 
	
		
			
				|  |  | -      break;
 | 
	
		
			
				|  |  | +/*
 | 
	
		
			
				|  |  | +  Cronet callback
 | 
	
		
			
				|  |  | +*/
 | 
	
		
			
				|  |  | +static void on_response_trailers_received(
 | 
	
		
			
				|  |  | +    cronet_bidirectional_stream *stream,
 | 
	
		
			
				|  |  | +    const cronet_bidirectional_stream_header_array *trailers) {
 | 
	
		
			
				|  |  | +  CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream,
 | 
	
		
			
				|  |  | +             trailers);
 | 
	
		
			
				|  |  | +  stream_obj *s = (stream_obj *)stream->annotation;
 | 
	
		
			
				|  |  | +  memset(&s->state.rs.trailing_metadata, 0,
 | 
	
		
			
				|  |  | +         sizeof(s->state.rs.trailing_metadata));
 | 
	
		
			
				|  |  | +  s->state.rs.trailing_metadata_valid = false;
 | 
	
		
			
				|  |  | +  grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.trailing_metadata);
 | 
	
		
			
				|  |  | +  for (size_t i = 0; i < trailers->count; i++) {
 | 
	
		
			
				|  |  | +    CRONET_LOG(GPR_DEBUG, "trailer key=%s, value=%s", trailers->headers[i].key,
 | 
	
		
			
				|  |  | +               trailers->headers[i].value);
 | 
	
		
			
				|  |  | +    grpc_chttp2_incoming_metadata_buffer_add(
 | 
	
		
			
				|  |  | +        &s->state.rs.trailing_metadata,
 | 
	
		
			
				|  |  | +        grpc_mdelem_from_metadata_strings(
 | 
	
		
			
				|  |  | +            grpc_mdstr_from_string(trailers->headers[i].key),
 | 
	
		
			
				|  |  | +            grpc_mdstr_from_string(trailers->headers[i].value)));
 | 
	
		
			
				|  |  | +    s->state.rs.trailing_metadata_valid = true;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  gpr_mu_unlock(&s->recv_mu);
 | 
	
		
			
				|  |  | +  s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true;
 | 
	
		
			
				|  |  | +  execute_from_storage(s);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -// This function takes the data from s->write_slice_buffer and assembles into
 | 
	
		
			
				|  |  | -// a contiguous byte stream with 5 byte gRPC header prepended.
 | 
	
		
			
				|  |  | -static void create_grpc_frame(stream_obj *s) {
 | 
	
		
			
				|  |  | -  gpr_slice slice = gpr_slice_buffer_take_first(&s->write_slice_buffer);
 | 
	
		
			
				|  |  | -  uint8_t *raw_data = GPR_SLICE_START_PTR(slice);
 | 
	
		
			
				|  |  | +/*
 | 
	
		
			
				|  |  | + Utility function that takes the data from s->write_slice_buffer and assembles
 | 
	
		
			
				|  |  | + into a contiguous byte stream with 5 byte gRPC header prepended.
 | 
	
		
			
				|  |  | +*/
 | 
	
		
			
				|  |  | +static void create_grpc_frame(gpr_slice_buffer *write_slice_buffer,
 | 
	
		
			
				|  |  | +                              char **pp_write_buffer,
 | 
	
		
			
				|  |  | +                              size_t *p_write_buffer_size) {
 | 
	
		
			
				|  |  | +  gpr_slice slice = gpr_slice_buffer_take_first(write_slice_buffer);
 | 
	
		
			
				|  |  |    size_t length = GPR_SLICE_LENGTH(slice);
 | 
	
		
			
				|  |  | -  s->write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES;
 | 
	
		
			
				|  |  | -  s->write_buffer = gpr_realloc(s->write_buffer, s->write_buffer_size);
 | 
	
		
			
				|  |  | -  uint8_t *p = (uint8_t *)s->write_buffer;
 | 
	
		
			
				|  |  | -  // Append 5 byte header
 | 
	
		
			
				|  |  | +  *p_write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES;
 | 
	
		
			
				|  |  | +  /* This is freed in the on_write_completed callback */
 | 
	
		
			
				|  |  | +  char *write_buffer = gpr_malloc(length + GRPC_HEADER_SIZE_IN_BYTES);
 | 
	
		
			
				|  |  | +  *pp_write_buffer = write_buffer;
 | 
	
		
			
				|  |  | +  uint8_t *p = (uint8_t *)write_buffer;
 | 
	
		
			
				|  |  | +  /* Append 5 byte header */
 | 
	
		
			
				|  |  |    *p++ = 0;
 | 
	
		
			
				|  |  |    *p++ = (uint8_t)(length >> 24);
 | 
	
		
			
				|  |  |    *p++ = (uint8_t)(length >> 16);
 | 
	
		
			
				|  |  |    *p++ = (uint8_t)(length >> 8);
 | 
	
		
			
				|  |  |    *p++ = (uint8_t)(length);
 | 
	
		
			
				|  |  | -  // append actual data
 | 
	
		
			
				|  |  | -  memcpy(p, raw_data, length);
 | 
	
		
			
				|  |  | +  /* append actual data */
 | 
	
		
			
				|  |  | +  memcpy(p, GPR_SLICE_START_PTR(slice), length);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void do_write(stream_obj *s) {
 | 
	
		
			
				|  |  | -  gpr_slice_buffer *sb = &s->write_slice_buffer;
 | 
	
		
			
				|  |  | -  GPR_ASSERT(sb->count <= 1);
 | 
	
		
			
				|  |  | -  if (sb->count > 0) {
 | 
	
		
			
				|  |  | -    create_grpc_frame(s);
 | 
	
		
			
				|  |  | -    if (grpc_cronet_trace) {
 | 
	
		
			
				|  |  | -      gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_write");
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | -    cronet_bidirectional_stream_write(s->cbs, s->write_buffer,
 | 
	
		
			
				|  |  | -                                      (int)s->write_buffer_size, false);
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -//
 | 
	
		
			
				|  |  | -static void next_send_step(stream_obj *s) {
 | 
	
		
			
				|  |  | -  switch (s->cronet_send_state) {
 | 
	
		
			
				|  |  | -    case CRONET_SEND_IDLE:
 | 
	
		
			
				|  |  | -      GPR_ASSERT(
 | 
	
		
			
				|  |  | -          s->cbs);  // cronet_bidirectional_stream is not initialized yet.
 | 
	
		
			
				|  |  | -      s->cronet_send_state = CRONET_REQ_STARTED;
 | 
	
		
			
				|  |  | -      if (grpc_cronet_trace) {
 | 
	
		
			
				|  |  | -        gpr_log(GPR_DEBUG, "cronet_bidirectional_stream_start to %s", s->url);
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | -      cronet_bidirectional_stream_start(s->cbs, s->url, 0, "POST",
 | 
	
		
			
				|  |  | -                                        &s->header_array, false);
 | 
	
		
			
				|  |  | -      // we no longer need the memory that was allocated earlier.
 | 
	
		
			
				|  |  | -      gpr_free(s->header_array.headers);
 | 
	
		
			
				|  |  | -      break;
 | 
	
		
			
				|  |  | -    case CRONET_SEND_HEADER:
 | 
	
		
			
				|  |  | -      do_write(s);
 | 
	
		
			
				|  |  | -      s->cronet_send_state = CRONET_WRITE;
 | 
	
		
			
				|  |  | -      break;
 | 
	
		
			
				|  |  | -    case CRONET_WRITE_COMPLETED:
 | 
	
		
			
				|  |  | -      do_write(s);
 | 
	
		
			
				|  |  | -      break;
 | 
	
		
			
				|  |  | -    default:
 | 
	
		
			
				|  |  | -      GPR_ASSERT(0);
 | 
	
		
			
				|  |  | -      break;
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -static void convert_metadata_to_cronet_headers(grpc_linked_mdelem *head,
 | 
	
		
			
				|  |  | -                                               const char *host,
 | 
	
		
			
				|  |  | -                                               stream_obj *s) {
 | 
	
		
			
				|  |  | +/*
 | 
	
		
			
				|  |  | + Convert metadata in a format that Cronet can consume
 | 
	
		
			
				|  |  | +*/
 | 
	
		
			
				|  |  | +static void convert_metadata_to_cronet_headers(
 | 
	
		
			
				|  |  | +    grpc_linked_mdelem *head, const char *host, char **pp_url,
 | 
	
		
			
				|  |  | +    cronet_bidirectional_stream_header **pp_headers, size_t *p_num_headers) {
 | 
	
		
			
				|  |  |    grpc_linked_mdelem *curr = head;
 | 
	
		
			
				|  |  | -  // Walk the linked list and get number of header fields
 | 
	
		
			
				|  |  | +  /* Walk the linked list and get number of header fields */
 | 
	
		
			
				|  |  |    uint32_t num_headers_available = 0;
 | 
	
		
			
				|  |  |    while (curr != NULL) {
 | 
	
		
			
				|  |  |      curr = curr->next;
 | 
	
		
			
				|  |  |      num_headers_available++;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  // Allocate enough memory
 | 
	
		
			
				|  |  | -  s->headers = (cronet_bidirectional_stream_header *)gpr_malloc(
 | 
	
		
			
				|  |  | -      sizeof(cronet_bidirectional_stream_header) * num_headers_available);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  // Walk the linked list again, this time copying the header fields.
 | 
	
		
			
				|  |  | -  // s->num_headers
 | 
	
		
			
				|  |  | -  // can be less than num_headers_available, as some headers are not used for
 | 
	
		
			
				|  |  | -  // cronet
 | 
	
		
			
				|  |  | +  /* Allocate enough memory. It is freed in the on_request_headers_sent callback
 | 
	
		
			
				|  |  | +   */
 | 
	
		
			
				|  |  | +  cronet_bidirectional_stream_header *headers =
 | 
	
		
			
				|  |  | +      (cronet_bidirectional_stream_header *)gpr_malloc(
 | 
	
		
			
				|  |  | +          sizeof(cronet_bidirectional_stream_header) * num_headers_available);
 | 
	
		
			
				|  |  | +  *pp_headers = headers;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  /* Walk the linked list again, this time copying the header fields.
 | 
	
		
			
				|  |  | +    s->num_headers can be less than num_headers_available, as some headers
 | 
	
		
			
				|  |  | +    are not used for cronet.
 | 
	
		
			
				|  |  | +    TODO (makdharma): Eliminate need to traverse the LL second time for perf.
 | 
	
		
			
				|  |  | +   */
 | 
	
		
			
				|  |  |    curr = head;
 | 
	
		
			
				|  |  | -  s->num_headers = 0;
 | 
	
		
			
				|  |  | -  while (s->num_headers < num_headers_available) {
 | 
	
		
			
				|  |  | +  int num_headers = 0;
 | 
	
		
			
				|  |  | +  while (num_headers < num_headers_available) {
 | 
	
		
			
				|  |  |      grpc_mdelem *mdelem = curr->md;
 | 
	
		
			
				|  |  |      curr = curr->next;
 | 
	
		
			
				|  |  |      const char *key = grpc_mdstr_as_c_string(mdelem->key);
 | 
	
		
			
				|  |  |      const char *value = grpc_mdstr_as_c_string(mdelem->value);
 | 
	
		
			
				|  |  | -    if (strcmp(key, ":scheme") == 0 || strcmp(key, ":method") == 0 ||
 | 
	
		
			
				|  |  | -        strcmp(key, ":authority") == 0) {
 | 
	
		
			
				|  |  | -      // Cronet populates these fields on its own.
 | 
	
		
			
				|  |  | +    if (mdelem->key == GRPC_MDSTR_METHOD || mdelem->key == GRPC_MDSTR_SCHEME ||
 | 
	
		
			
				|  |  | +        mdelem->key == GRPC_MDSTR_AUTHORITY) {
 | 
	
		
			
				|  |  | +      /* Cronet populates these fields on its own */
 | 
	
		
			
				|  |  |        continue;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -    if (strcmp(key, ":path") == 0) {
 | 
	
		
			
				|  |  | -      // Create URL by appending :path value to the hostname
 | 
	
		
			
				|  |  | -      gpr_asprintf(&s->url, "https://%s%s", host, value);
 | 
	
		
			
				|  |  | -      if (grpc_cronet_trace) {
 | 
	
		
			
				|  |  | -        gpr_log(GPR_DEBUG, "extracted URL = %s", s->url);
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | +    if (mdelem->key == GRPC_MDSTR_PATH) {
 | 
	
		
			
				|  |  | +      /* Create URL by appending :path value to the hostname */
 | 
	
		
			
				|  |  | +      gpr_asprintf(pp_url, "https://%s%s", host, value);
 | 
	
		
			
				|  |  |        continue;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -    s->headers[s->num_headers].key = key;
 | 
	
		
			
				|  |  | -    s->headers[s->num_headers].value = value;
 | 
	
		
			
				|  |  | -    s->num_headers++;
 | 
	
		
			
				|  |  | +    CRONET_LOG(GPR_DEBUG, "header %s = %s", key, value);
 | 
	
		
			
				|  |  | +    headers[num_headers].key = key;
 | 
	
		
			
				|  |  | +    headers[num_headers].value = value;
 | 
	
		
			
				|  |  | +    num_headers++;
 | 
	
		
			
				|  |  |      if (curr == NULL) {
 | 
	
		
			
				|  |  |        break;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | +  *p_num_headers = num_headers;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
 | 
	
		
			
				|  |  | -                              grpc_stream *gs, grpc_transport_stream_op *op) {
 | 
	
		
			
				|  |  | -  grpc_cronet_transport *ct = (grpc_cronet_transport *)gt;
 | 
	
		
			
				|  |  | -  GPR_ASSERT(ct->engine);
 | 
	
		
			
				|  |  | -  stream_obj *s = (stream_obj *)gs;
 | 
	
		
			
				|  |  | -  if (op->recv_trailing_metadata) {
 | 
	
		
			
				|  |  | -    if (grpc_cronet_trace) {
 | 
	
		
			
				|  |  | -      gpr_log(GPR_DEBUG,
 | 
	
		
			
				|  |  | -              "perform_stream_op - recv_trailing_metadata: on_complete=%p",
 | 
	
		
			
				|  |  | -              op->on_complete);
 | 
	
		
			
				|  |  | +static int parse_grpc_header(const uint8_t *data) {
 | 
	
		
			
				|  |  | +  const uint8_t *p = data + 1;
 | 
	
		
			
				|  |  | +  int length = 0;
 | 
	
		
			
				|  |  | +  length |= ((uint8_t)*p++) << 24;
 | 
	
		
			
				|  |  | +  length |= ((uint8_t)*p++) << 16;
 | 
	
		
			
				|  |  | +  length |= ((uint8_t)*p++) << 8;
 | 
	
		
			
				|  |  | +  length |= ((uint8_t)*p++);
 | 
	
		
			
				|  |  | +  return length;
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +/*
 | 
	
		
			
				|  |  | +  Op Execution: Decide if one of the actions contained in the stream op can be
 | 
	
		
			
				|  |  | +  executed. This is the heart of the state machine.
 | 
	
		
			
				|  |  | +*/
 | 
	
		
			
				|  |  | +static bool op_can_be_run(grpc_transport_stream_op *curr_op,
 | 
	
		
			
				|  |  | +                          struct op_state *stream_state,
 | 
	
		
			
				|  |  | +                          struct op_state *op_state, enum e_op_id op_id) {
 | 
	
		
			
				|  |  | +  bool result = true;
 | 
	
		
			
				|  |  | +  /* When call is canceled, every op can be run, except under following
 | 
	
		
			
				|  |  | +  conditions
 | 
	
		
			
				|  |  | +  */
 | 
	
		
			
				|  |  | +  bool is_canceled_of_failed = stream_state->state_op_done[OP_CANCEL_ERROR] ||
 | 
	
		
			
				|  |  | +                               stream_state->state_callback_received[OP_FAILED];
 | 
	
		
			
				|  |  | +  if (is_canceled_of_failed) {
 | 
	
		
			
				|  |  | +    if (op_id == OP_SEND_INITIAL_METADATA) result = false;
 | 
	
		
			
				|  |  | +    if (op_id == OP_SEND_MESSAGE) result = false;
 | 
	
		
			
				|  |  | +    if (op_id == OP_SEND_TRAILING_METADATA) result = false;
 | 
	
		
			
				|  |  | +    if (op_id == OP_CANCEL_ERROR) result = false;
 | 
	
		
			
				|  |  | +    /* already executed */
 | 
	
		
			
				|  |  | +    if (op_id == OP_RECV_INITIAL_METADATA &&
 | 
	
		
			
				|  |  | +        stream_state->state_op_done[OP_RECV_INITIAL_METADATA])
 | 
	
		
			
				|  |  | +      result = false;
 | 
	
		
			
				|  |  | +    if (op_id == OP_RECV_MESSAGE &&
 | 
	
		
			
				|  |  | +        stream_state->state_op_done[OP_RECV_MESSAGE])
 | 
	
		
			
				|  |  | +      result = false;
 | 
	
		
			
				|  |  | +    if (op_id == OP_RECV_TRAILING_METADATA &&
 | 
	
		
			
				|  |  | +        stream_state->state_op_done[OP_RECV_TRAILING_METADATA])
 | 
	
		
			
				|  |  | +      result = false;
 | 
	
		
			
				|  |  | +  } else if (op_id == OP_SEND_INITIAL_METADATA) {
 | 
	
		
			
				|  |  | +    /* already executed */
 | 
	
		
			
				|  |  | +    if (stream_state->state_op_done[OP_SEND_INITIAL_METADATA]) result = false;
 | 
	
		
			
				|  |  | +  } else if (op_id == OP_RECV_INITIAL_METADATA) {
 | 
	
		
			
				|  |  | +    /* already executed */
 | 
	
		
			
				|  |  | +    if (stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) result = false;
 | 
	
		
			
				|  |  | +    /* we haven't sent headers yet. */
 | 
	
		
			
				|  |  | +    else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
 | 
	
		
			
				|  |  | +      result = false;
 | 
	
		
			
				|  |  | +    /* we haven't received headers yet. */
 | 
	
		
			
				|  |  | +    else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA])
 | 
	
		
			
				|  |  | +      result = false;
 | 
	
		
			
				|  |  | +  } else if (op_id == OP_SEND_MESSAGE) {
 | 
	
		
			
				|  |  | +    /* already executed (note we're checking op specific state, not stream
 | 
	
		
			
				|  |  | +     state) */
 | 
	
		
			
				|  |  | +    if (op_state->state_op_done[OP_SEND_MESSAGE]) result = false;
 | 
	
		
			
				|  |  | +    /* we haven't sent headers yet. */
 | 
	
		
			
				|  |  | +    else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
 | 
	
		
			
				|  |  | +      result = false;
 | 
	
		
			
				|  |  | +  } else if (op_id == OP_RECV_MESSAGE) {
 | 
	
		
			
				|  |  | +    /* already executed */
 | 
	
		
			
				|  |  | +    if (op_state->state_op_done[OP_RECV_MESSAGE]) result = false;
 | 
	
		
			
				|  |  | +    /* we haven't received headers yet. */
 | 
	
		
			
				|  |  | +    else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA])
 | 
	
		
			
				|  |  | +      result = false;
 | 
	
		
			
				|  |  | +  } else if (op_id == OP_RECV_TRAILING_METADATA) {
 | 
	
		
			
				|  |  | +    /* already executed */
 | 
	
		
			
				|  |  | +    if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) result = false;
 | 
	
		
			
				|  |  | +    /* we have asked for but haven't received message yet. */
 | 
	
		
			
				|  |  | +    else if (stream_state->state_op_done[OP_READ_REQ_MADE] &&
 | 
	
		
			
				|  |  | +             !stream_state->state_op_done[OP_RECV_MESSAGE])
 | 
	
		
			
				|  |  | +      result = false;
 | 
	
		
			
				|  |  | +    /* we haven't received trailers  yet. */
 | 
	
		
			
				|  |  | +    else if (!stream_state->state_callback_received[OP_RECV_TRAILING_METADATA])
 | 
	
		
			
				|  |  | +      result = false;
 | 
	
		
			
				|  |  | +    /* we haven't received on_succeeded  yet. */
 | 
	
		
			
				|  |  | +    else if (!stream_state->state_callback_received[OP_SUCCEEDED])
 | 
	
		
			
				|  |  | +      result = false;
 | 
	
		
			
				|  |  | +  } else if (op_id == OP_SEND_TRAILING_METADATA) {
 | 
	
		
			
				|  |  | +    /* already executed */
 | 
	
		
			
				|  |  | +    if (stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) result = false;
 | 
	
		
			
				|  |  | +    /* we haven't sent initial metadata yet */
 | 
	
		
			
				|  |  | +    else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
 | 
	
		
			
				|  |  | +      result = false;
 | 
	
		
			
				|  |  | +    /* we haven't sent message yet */
 | 
	
		
			
				|  |  | +    else if (curr_op->send_message &&
 | 
	
		
			
				|  |  | +             !stream_state->state_op_done[OP_SEND_MESSAGE])
 | 
	
		
			
				|  |  | +      result = false;
 | 
	
		
			
				|  |  | +    /* we haven't got on_write_completed for the send yet */
 | 
	
		
			
				|  |  | +    else if (stream_state->state_op_done[OP_SEND_MESSAGE] &&
 | 
	
		
			
				|  |  | +             !stream_state->state_callback_received[OP_SEND_MESSAGE])
 | 
	
		
			
				|  |  | +      result = false;
 | 
	
		
			
				|  |  | +  } else if (op_id == OP_CANCEL_ERROR) {
 | 
	
		
			
				|  |  | +    /* already executed */
 | 
	
		
			
				|  |  | +    if (stream_state->state_op_done[OP_CANCEL_ERROR]) result = false;
 | 
	
		
			
				|  |  | +  } else if (op_id == OP_ON_COMPLETE) {
 | 
	
		
			
				|  |  | +    /* already executed (note we're checking op specific state, not stream
 | 
	
		
			
				|  |  | +    state) */
 | 
	
		
			
				|  |  | +    if (op_state->state_op_done[OP_ON_COMPLETE]) {
 | 
	
		
			
				|  |  | +      CRONET_LOG(GPR_DEBUG, "Because");
 | 
	
		
			
				|  |  | +      result = false;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -    s->recv_trailing_metadata = op->recv_trailing_metadata;
 | 
	
		
			
				|  |  | -    GPR_ASSERT(!s->callback_list[CB_RECV_TRAILING_METADATA][0]);
 | 
	
		
			
				|  |  | -    s->callback_list[CB_RECV_TRAILING_METADATA][0] = op->on_complete;
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  if (op->recv_message) {
 | 
	
		
			
				|  |  | -    if (grpc_cronet_trace) {
 | 
	
		
			
				|  |  | -      gpr_log(GPR_DEBUG, "perform_stream_op - recv_message: on_complete=%p",
 | 
	
		
			
				|  |  | -              op->on_complete);
 | 
	
		
			
				|  |  | +    /* Check if every op that was asked for is done. */
 | 
	
		
			
				|  |  | +    else if (curr_op->send_initial_metadata &&
 | 
	
		
			
				|  |  | +             !stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) {
 | 
	
		
			
				|  |  | +      CRONET_LOG(GPR_DEBUG, "Because");
 | 
	
		
			
				|  |  | +      result = false;
 | 
	
		
			
				|  |  | +    } else if (curr_op->send_message &&
 | 
	
		
			
				|  |  | +               !op_state->state_op_done[OP_SEND_MESSAGE]) {
 | 
	
		
			
				|  |  | +      CRONET_LOG(GPR_DEBUG, "Because");
 | 
	
		
			
				|  |  | +      result = false;
 | 
	
		
			
				|  |  | +    } else if (curr_op->send_message &&
 | 
	
		
			
				|  |  | +               !stream_state->state_callback_received[OP_SEND_MESSAGE]) {
 | 
	
		
			
				|  |  | +      CRONET_LOG(GPR_DEBUG, "Because");
 | 
	
		
			
				|  |  | +      result = false;
 | 
	
		
			
				|  |  | +    } else if (curr_op->send_trailing_metadata &&
 | 
	
		
			
				|  |  | +               !stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) {
 | 
	
		
			
				|  |  | +      CRONET_LOG(GPR_DEBUG, "Because");
 | 
	
		
			
				|  |  | +      result = false;
 | 
	
		
			
				|  |  | +    } else if (curr_op->recv_initial_metadata &&
 | 
	
		
			
				|  |  | +               !stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
 | 
	
		
			
				|  |  | +      CRONET_LOG(GPR_DEBUG, "Because");
 | 
	
		
			
				|  |  | +      result = false;
 | 
	
		
			
				|  |  | +    } else if (curr_op->recv_message &&
 | 
	
		
			
				|  |  | +               !stream_state->state_op_done[OP_RECV_MESSAGE]) {
 | 
	
		
			
				|  |  | +      CRONET_LOG(GPR_DEBUG, "Because");
 | 
	
		
			
				|  |  | +      result = false;
 | 
	
		
			
				|  |  | +    } else if (curr_op->recv_trailing_metadata) {
 | 
	
		
			
				|  |  | +      /* We aren't done with trailing metadata yet */
 | 
	
		
			
				|  |  | +      if (!stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
 | 
	
		
			
				|  |  | +        CRONET_LOG(GPR_DEBUG, "Because");
 | 
	
		
			
				|  |  | +        result = false;
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +      /* We've asked for actual message in an earlier op, and it hasn't been
 | 
	
		
			
				|  |  | +        delivered yet. */
 | 
	
		
			
				|  |  | +      else if (stream_state->state_op_done[OP_READ_REQ_MADE]) {
 | 
	
		
			
				|  |  | +        /* If this op is not the one asking for read, (which means some earlier
 | 
	
		
			
				|  |  | +          op has asked), and the read hasn't been delivered. */
 | 
	
		
			
				|  |  | +        if (!curr_op->recv_message &&
 | 
	
		
			
				|  |  | +            !stream_state->state_callback_received[OP_SUCCEEDED]) {
 | 
	
		
			
				|  |  | +          CRONET_LOG(GPR_DEBUG, "Because");
 | 
	
		
			
				|  |  | +          result = false;
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -    s->recv_message = (grpc_byte_buffer **)op->recv_message;
 | 
	
		
			
				|  |  | -    GPR_ASSERT(!s->callback_list[CB_RECV_MESSAGE][0]);
 | 
	
		
			
				|  |  | -    GPR_ASSERT(!s->callback_list[CB_RECV_MESSAGE][1]);
 | 
	
		
			
				|  |  | -    s->callback_list[CB_RECV_MESSAGE][0] = op->recv_message_ready;
 | 
	
		
			
				|  |  | -    s->callback_list[CB_RECV_MESSAGE][1] = op->on_complete;
 | 
	
		
			
				|  |  | -    s->read_requested = true;
 | 
	
		
			
				|  |  | -    next_recv_step(s, PERFORM_STREAM_OP);
 | 
	
		
			
				|  |  | +    /* We should see at least one on_write_completed for the trailers that we
 | 
	
		
			
				|  |  | +      sent */
 | 
	
		
			
				|  |  | +    else if (curr_op->send_trailing_metadata &&
 | 
	
		
			
				|  |  | +             !stream_state->state_callback_received[OP_SEND_MESSAGE])
 | 
	
		
			
				|  |  | +      result = false;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  if (op->recv_initial_metadata) {
 | 
	
		
			
				|  |  | -    if (grpc_cronet_trace) {
 | 
	
		
			
				|  |  | -      gpr_log(GPR_DEBUG, "perform_stream_op - recv_initial_metadata:=%p",
 | 
	
		
			
				|  |  | -              op->on_complete);
 | 
	
		
			
				|  |  | +  CRONET_LOG(GPR_DEBUG, "op_can_be_run %s : %s", op_id_string(op_id),
 | 
	
		
			
				|  |  | +             result ? "YES" : "NO");
 | 
	
		
			
				|  |  | +  return result;
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +/*
 | 
	
		
			
				|  |  | +  TODO (makdharma): Break down this function in smaller chunks for readability.
 | 
	
		
			
				|  |  | +*/
 | 
	
		
			
				|  |  | +static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  | +                                          struct op_and_state *oas) {
 | 
	
		
			
				|  |  | +  grpc_transport_stream_op *stream_op = &oas->op;
 | 
	
		
			
				|  |  | +  struct stream_obj *s = oas->s;
 | 
	
		
			
				|  |  | +  struct op_state *stream_state = &s->state;
 | 
	
		
			
				|  |  | +  enum e_op_result result = NO_ACTION_POSSIBLE;
 | 
	
		
			
				|  |  | +  if (stream_op->send_initial_metadata &&
 | 
	
		
			
				|  |  | +      op_can_be_run(stream_op, stream_state, &oas->state,
 | 
	
		
			
				|  |  | +                    OP_SEND_INITIAL_METADATA)) {
 | 
	
		
			
				|  |  | +    CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_INITIAL_METADATA", oas);
 | 
	
		
			
				|  |  | +    /* This OP is the beginning. Reset various states */
 | 
	
		
			
				|  |  | +    memset(&s->header_array, 0, sizeof(s->header_array));
 | 
	
		
			
				|  |  | +    memset(&stream_state->rs, 0, sizeof(stream_state->rs));
 | 
	
		
			
				|  |  | +    memset(&stream_state->ws, 0, sizeof(stream_state->ws));
 | 
	
		
			
				|  |  | +    memset(stream_state->state_op_done, 0, sizeof(stream_state->state_op_done));
 | 
	
		
			
				|  |  | +    memset(stream_state->state_callback_received, 0,
 | 
	
		
			
				|  |  | +           sizeof(stream_state->state_callback_received));
 | 
	
		
			
				|  |  | +    /* Start new cronet stream. It is destroyed in on_succeeded, on_canceled,
 | 
	
		
			
				|  |  | +     * on_failed */
 | 
	
		
			
				|  |  | +    GPR_ASSERT(s->cbs == NULL);
 | 
	
		
			
				|  |  | +    s->cbs = cronet_bidirectional_stream_create(s->curr_ct.engine, s->curr_gs,
 | 
	
		
			
				|  |  | +                                                &cronet_callbacks);
 | 
	
		
			
				|  |  | +    CRONET_LOG(GPR_DEBUG, "%p = cronet_bidirectional_stream_create()", s->cbs);
 | 
	
		
			
				|  |  | +    char *url;
 | 
	
		
			
				|  |  | +    s->header_array.headers = NULL;
 | 
	
		
			
				|  |  | +    convert_metadata_to_cronet_headers(
 | 
	
		
			
				|  |  | +        stream_op->send_initial_metadata->list.head, s->curr_ct.host, &url,
 | 
	
		
			
				|  |  | +        &s->header_array.headers, &s->header_array.count);
 | 
	
		
			
				|  |  | +    s->header_array.capacity = s->header_array.count;
 | 
	
		
			
				|  |  | +    CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_start(%p, %s)", s->cbs,
 | 
	
		
			
				|  |  | +               url);
 | 
	
		
			
				|  |  | +    cronet_bidirectional_stream_start(s->cbs, url, 0, "POST", &s->header_array,
 | 
	
		
			
				|  |  | +                                      false);
 | 
	
		
			
				|  |  | +    stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true;
 | 
	
		
			
				|  |  | +    result = ACTION_TAKEN_WITH_CALLBACK;
 | 
	
		
			
				|  |  | +  } else if (stream_op->recv_initial_metadata &&
 | 
	
		
			
				|  |  | +             op_can_be_run(stream_op, stream_state, &oas->state,
 | 
	
		
			
				|  |  | +                           OP_RECV_INITIAL_METADATA)) {
 | 
	
		
			
				|  |  | +    CRONET_LOG(GPR_DEBUG, "running: %p  OP_RECV_INITIAL_METADATA", oas);
 | 
	
		
			
				|  |  | +    if (!stream_state->state_op_done[OP_CANCEL_ERROR]) {
 | 
	
		
			
				|  |  | +      grpc_chttp2_incoming_metadata_buffer_publish(
 | 
	
		
			
				|  |  | +          &oas->s->state.rs.initial_metadata, stream_op->recv_initial_metadata);
 | 
	
		
			
				|  |  | +      grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
 | 
	
		
			
				|  |  | +                          GRPC_ERROR_NONE, NULL);
 | 
	
		
			
				|  |  | +    } else {
 | 
	
		
			
				|  |  | +      grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
 | 
	
		
			
				|  |  | +                          GRPC_ERROR_CANCELLED, NULL);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -    s->recv_initial_metadata = op->recv_initial_metadata;
 | 
	
		
			
				|  |  | -    GPR_ASSERT(!s->callback_list[CB_RECV_INITIAL_METADATA][0]);
 | 
	
		
			
				|  |  | -    GPR_ASSERT(!s->callback_list[CB_RECV_INITIAL_METADATA][1]);
 | 
	
		
			
				|  |  | -    s->callback_list[CB_RECV_INITIAL_METADATA][0] =
 | 
	
		
			
				|  |  | -        op->recv_initial_metadata_ready;
 | 
	
		
			
				|  |  | -    s->callback_list[CB_RECV_INITIAL_METADATA][1] = op->on_complete;
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  if (op->send_initial_metadata) {
 | 
	
		
			
				|  |  | -    if (grpc_cronet_trace) {
 | 
	
		
			
				|  |  | -      gpr_log(GPR_DEBUG,
 | 
	
		
			
				|  |  | -              "perform_stream_op - send_initial_metadata: on_complete=%p",
 | 
	
		
			
				|  |  | -              op->on_complete);
 | 
	
		
			
				|  |  | +    stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true;
 | 
	
		
			
				|  |  | +    result = ACTION_TAKEN_NO_CALLBACK;
 | 
	
		
			
				|  |  | +  } else if (stream_op->send_message &&
 | 
	
		
			
				|  |  | +             op_can_be_run(stream_op, stream_state, &oas->state,
 | 
	
		
			
				|  |  | +                           OP_SEND_MESSAGE)) {
 | 
	
		
			
				|  |  | +    CRONET_LOG(GPR_DEBUG, "running: %p  OP_SEND_MESSAGE", oas);
 | 
	
		
			
				|  |  | +    gpr_slice_buffer write_slice_buffer;
 | 
	
		
			
				|  |  | +    gpr_slice slice;
 | 
	
		
			
				|  |  | +    gpr_slice_buffer_init(&write_slice_buffer);
 | 
	
		
			
				|  |  | +    grpc_byte_stream_next(NULL, stream_op->send_message, &slice,
 | 
	
		
			
				|  |  | +                          stream_op->send_message->length, NULL);
 | 
	
		
			
				|  |  | +    /* Check that compression flag is OFF. We don't support compression yet. */
 | 
	
		
			
				|  |  | +    if (stream_op->send_message->flags != 0) {
 | 
	
		
			
				|  |  | +      gpr_log(GPR_ERROR, "Compression is not supported");
 | 
	
		
			
				|  |  | +      GPR_ASSERT(stream_op->send_message->flags == 0);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -    s->num_headers = 0;
 | 
	
		
			
				|  |  | -    convert_metadata_to_cronet_headers(op->send_initial_metadata->list.head,
 | 
	
		
			
				|  |  | -                                       ct->host, s);
 | 
	
		
			
				|  |  | -    s->header_array.count = s->num_headers;
 | 
	
		
			
				|  |  | -    s->header_array.capacity = s->num_headers;
 | 
	
		
			
				|  |  | -    s->header_array.headers = s->headers;
 | 
	
		
			
				|  |  | -    GPR_ASSERT(!s->callback_list[CB_SEND_INITIAL_METADATA][0]);
 | 
	
		
			
				|  |  | -    s->callback_list[CB_SEND_INITIAL_METADATA][0] = op->on_complete;
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  if (op->send_message) {
 | 
	
		
			
				|  |  | -    if (grpc_cronet_trace) {
 | 
	
		
			
				|  |  | -      gpr_log(GPR_DEBUG, "perform_stream_op - send_message: on_complete=%p",
 | 
	
		
			
				|  |  | -              op->on_complete);
 | 
	
		
			
				|  |  | +    gpr_slice_buffer_add(&write_slice_buffer, slice);
 | 
	
		
			
				|  |  | +    if (write_slice_buffer.count != 1) {
 | 
	
		
			
				|  |  | +      /* Empty request not handled yet */
 | 
	
		
			
				|  |  | +      gpr_log(GPR_ERROR, "Empty request is not supported");
 | 
	
		
			
				|  |  | +      GPR_ASSERT(write_slice_buffer.count == 1);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    if (write_slice_buffer.count > 0) {
 | 
	
		
			
				|  |  | +      size_t write_buffer_size;
 | 
	
		
			
				|  |  | +      create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer,
 | 
	
		
			
				|  |  | +                        &write_buffer_size);
 | 
	
		
			
				|  |  | +      CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, %p)",
 | 
	
		
			
				|  |  | +                 s->cbs, stream_state->ws.write_buffer);
 | 
	
		
			
				|  |  | +      stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
 | 
	
		
			
				|  |  | +      cronet_bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer,
 | 
	
		
			
				|  |  | +                                        (int)write_buffer_size, false);
 | 
	
		
			
				|  |  | +      result = ACTION_TAKEN_WITH_CALLBACK;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -    grpc_byte_stream_next(exec_ctx, op->send_message, &s->slice,
 | 
	
		
			
				|  |  | -                          op->send_message->length, NULL);
 | 
	
		
			
				|  |  | -    // Check that compression flag is not ON. We don't support compression yet.
 | 
	
		
			
				|  |  | -    // TODO (makdharma): add compression support
 | 
	
		
			
				|  |  | -    GPR_ASSERT(op->send_message->flags == 0);
 | 
	
		
			
				|  |  | -    gpr_slice_buffer_add(&s->write_slice_buffer, s->slice);
 | 
	
		
			
				|  |  | -    if (s->cbs == NULL) {
 | 
	
		
			
				|  |  | -      if (grpc_cronet_trace) {
 | 
	
		
			
				|  |  | -        gpr_log(GPR_DEBUG, "cronet_bidirectional_stream_create");
 | 
	
		
			
				|  |  | +    stream_state->state_op_done[OP_SEND_MESSAGE] = true;
 | 
	
		
			
				|  |  | +    oas->state.state_op_done[OP_SEND_MESSAGE] = true;
 | 
	
		
			
				|  |  | +  } else if (stream_op->recv_message &&
 | 
	
		
			
				|  |  | +             op_can_be_run(stream_op, stream_state, &oas->state,
 | 
	
		
			
				|  |  | +                           OP_RECV_MESSAGE)) {
 | 
	
		
			
				|  |  | +    CRONET_LOG(GPR_DEBUG, "running: %p  OP_RECV_MESSAGE", oas);
 | 
	
		
			
				|  |  | +    if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
 | 
	
		
			
				|  |  | +      grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready,
 | 
	
		
			
				|  |  | +                          GRPC_ERROR_CANCELLED, NULL);
 | 
	
		
			
				|  |  | +      stream_state->state_op_done[OP_RECV_MESSAGE] = true;
 | 
	
		
			
				|  |  | +    } else if (stream_state->rs.read_stream_closed == true) {
 | 
	
		
			
				|  |  | +      /* No more data will be received */
 | 
	
		
			
				|  |  | +      CRONET_LOG(GPR_DEBUG, "read stream closed");
 | 
	
		
			
				|  |  | +      grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready,
 | 
	
		
			
				|  |  | +                          GRPC_ERROR_NONE, NULL);
 | 
	
		
			
				|  |  | +      stream_state->state_op_done[OP_RECV_MESSAGE] = true;
 | 
	
		
			
				|  |  | +      oas->state.state_op_done[OP_RECV_MESSAGE] = true;
 | 
	
		
			
				|  |  | +    } else if (stream_state->rs.length_field_received == false) {
 | 
	
		
			
				|  |  | +      if (stream_state->rs.received_bytes == GRPC_HEADER_SIZE_IN_BYTES &&
 | 
	
		
			
				|  |  | +          stream_state->rs.remaining_bytes == 0) {
 | 
	
		
			
				|  |  | +        /* Start a read operation for data */
 | 
	
		
			
				|  |  | +        stream_state->rs.length_field_received = true;
 | 
	
		
			
				|  |  | +        stream_state->rs.length_field = stream_state->rs.remaining_bytes =
 | 
	
		
			
				|  |  | +            parse_grpc_header((const uint8_t *)stream_state->rs.read_buffer);
 | 
	
		
			
				|  |  | +        CRONET_LOG(GPR_DEBUG, "length field = %d",
 | 
	
		
			
				|  |  | +                   stream_state->rs.length_field);
 | 
	
		
			
				|  |  | +        if (stream_state->rs.length_field > 0) {
 | 
	
		
			
				|  |  | +          stream_state->rs.read_buffer =
 | 
	
		
			
				|  |  | +              gpr_malloc(stream_state->rs.length_field);
 | 
	
		
			
				|  |  | +          GPR_ASSERT(stream_state->rs.read_buffer);
 | 
	
		
			
				|  |  | +          stream_state->rs.remaining_bytes = stream_state->rs.length_field;
 | 
	
		
			
				|  |  | +          stream_state->rs.received_bytes = 0;
 | 
	
		
			
				|  |  | +          CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs);
 | 
	
		
			
				|  |  | +          stream_state->state_op_done[OP_READ_REQ_MADE] =
 | 
	
		
			
				|  |  | +              true; /* Indicates that at least one read request has been made */
 | 
	
		
			
				|  |  | +          cronet_bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
 | 
	
		
			
				|  |  | +                                           stream_state->rs.remaining_bytes);
 | 
	
		
			
				|  |  | +          result = ACTION_TAKEN_WITH_CALLBACK;
 | 
	
		
			
				|  |  | +        } else {
 | 
	
		
			
				|  |  | +          stream_state->rs.remaining_bytes = 0;
 | 
	
		
			
				|  |  | +          CRONET_LOG(GPR_DEBUG, "read operation complete. Empty response.");
 | 
	
		
			
				|  |  | +          gpr_slice_buffer_init(&stream_state->rs.read_slice_buffer);
 | 
	
		
			
				|  |  | +          grpc_slice_buffer_stream_init(&stream_state->rs.sbs,
 | 
	
		
			
				|  |  | +                                        &stream_state->rs.read_slice_buffer, 0);
 | 
	
		
			
				|  |  | +          *((grpc_byte_buffer **)stream_op->recv_message) =
 | 
	
		
			
				|  |  | +              (grpc_byte_buffer *)&stream_state->rs.sbs;
 | 
	
		
			
				|  |  | +          grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready,
 | 
	
		
			
				|  |  | +                              GRPC_ERROR_NONE, NULL);
 | 
	
		
			
				|  |  | +          stream_state->state_op_done[OP_RECV_MESSAGE] = true;
 | 
	
		
			
				|  |  | +          oas->state.state_op_done[OP_RECV_MESSAGE] = true;
 | 
	
		
			
				|  |  | +          result = ACTION_TAKEN_NO_CALLBACK;
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +      } else if (stream_state->rs.remaining_bytes == 0) {
 | 
	
		
			
				|  |  | +        /* Start a read operation for first 5 bytes (GRPC header) */
 | 
	
		
			
				|  |  | +        stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
 | 
	
		
			
				|  |  | +        stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
 | 
	
		
			
				|  |  | +        stream_state->rs.received_bytes = 0;
 | 
	
		
			
				|  |  | +        CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs);
 | 
	
		
			
				|  |  | +        stream_state->state_op_done[OP_READ_REQ_MADE] =
 | 
	
		
			
				|  |  | +            true; /* Indicates that at least one read request has been made */
 | 
	
		
			
				|  |  | +        cronet_bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
 | 
	
		
			
				|  |  | +                                         stream_state->rs.remaining_bytes);
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  | -      s->cbs = cronet_bidirectional_stream_create(ct->engine, s, &callbacks);
 | 
	
		
			
				|  |  | -      GPR_ASSERT(s->cbs);
 | 
	
		
			
				|  |  | -      s->read_closed = false;
 | 
	
		
			
				|  |  | -      s->response_trailers_received = false;
 | 
	
		
			
				|  |  | -      s->response_headers_received = false;
 | 
	
		
			
				|  |  | -      s->cronet_send_state = CRONET_SEND_IDLE;
 | 
	
		
			
				|  |  | -      s->cronet_recv_state = CRONET_RECV_IDLE;
 | 
	
		
			
				|  |  | +      result = ACTION_TAKEN_WITH_CALLBACK;
 | 
	
		
			
				|  |  | +    } else if (stream_state->rs.remaining_bytes == 0) {
 | 
	
		
			
				|  |  | +      CRONET_LOG(GPR_DEBUG, "read operation complete");
 | 
	
		
			
				|  |  | +      gpr_slice read_data_slice =
 | 
	
		
			
				|  |  | +          gpr_slice_malloc((uint32_t)stream_state->rs.length_field);
 | 
	
		
			
				|  |  | +      uint8_t *dst_p = GPR_SLICE_START_PTR(read_data_slice);
 | 
	
		
			
				|  |  | +      memcpy(dst_p, stream_state->rs.read_buffer,
 | 
	
		
			
				|  |  | +             (size_t)stream_state->rs.length_field);
 | 
	
		
			
				|  |  | +      gpr_slice_buffer_init(&stream_state->rs.read_slice_buffer);
 | 
	
		
			
				|  |  | +      gpr_slice_buffer_add(&stream_state->rs.read_slice_buffer,
 | 
	
		
			
				|  |  | +                           read_data_slice);
 | 
	
		
			
				|  |  | +      grpc_slice_buffer_stream_init(&stream_state->rs.sbs,
 | 
	
		
			
				|  |  | +                                    &stream_state->rs.read_slice_buffer, 0);
 | 
	
		
			
				|  |  | +      *((grpc_byte_buffer **)stream_op->recv_message) =
 | 
	
		
			
				|  |  | +          (grpc_byte_buffer *)&stream_state->rs.sbs;
 | 
	
		
			
				|  |  | +      grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready,
 | 
	
		
			
				|  |  | +                          GRPC_ERROR_NONE, NULL);
 | 
	
		
			
				|  |  | +      stream_state->state_op_done[OP_RECV_MESSAGE] = true;
 | 
	
		
			
				|  |  | +      oas->state.state_op_done[OP_RECV_MESSAGE] = true;
 | 
	
		
			
				|  |  | +      /* Clear read state of the stream, so next read op (if it were to come)
 | 
	
		
			
				|  |  | +       * will work */
 | 
	
		
			
				|  |  | +      stream_state->rs.received_bytes = stream_state->rs.remaining_bytes =
 | 
	
		
			
				|  |  | +          stream_state->rs.length_field_received = 0;
 | 
	
		
			
				|  |  | +      result = ACTION_TAKEN_NO_CALLBACK;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -    GPR_ASSERT(!s->callback_list[CB_SEND_MESSAGE][0]);
 | 
	
		
			
				|  |  | -    s->callback_list[CB_SEND_MESSAGE][0] = op->on_complete;
 | 
	
		
			
				|  |  | -    next_send_step(s);
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  if (op->send_trailing_metadata) {
 | 
	
		
			
				|  |  | -    if (grpc_cronet_trace) {
 | 
	
		
			
				|  |  | -      gpr_log(GPR_DEBUG,
 | 
	
		
			
				|  |  | -              "perform_stream_op - send_trailing_metadata: on_complete=%p",
 | 
	
		
			
				|  |  | -              op->on_complete);
 | 
	
		
			
				|  |  | +  } else if (stream_op->recv_trailing_metadata &&
 | 
	
		
			
				|  |  | +             op_can_be_run(stream_op, stream_state, &oas->state,
 | 
	
		
			
				|  |  | +                           OP_RECV_TRAILING_METADATA)) {
 | 
	
		
			
				|  |  | +    CRONET_LOG(GPR_DEBUG, "running: %p  OP_RECV_TRAILING_METADATA", oas);
 | 
	
		
			
				|  |  | +    if (oas->s->state.rs.trailing_metadata_valid) {
 | 
	
		
			
				|  |  | +      grpc_chttp2_incoming_metadata_buffer_publish(
 | 
	
		
			
				|  |  | +          &oas->s->state.rs.trailing_metadata,
 | 
	
		
			
				|  |  | +          stream_op->recv_trailing_metadata);
 | 
	
		
			
				|  |  | +      stream_state->rs.trailing_metadata_valid = false;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -    GPR_ASSERT(!s->callback_list[CB_SEND_TRAILING_METADATA][0]);
 | 
	
		
			
				|  |  | -    s->callback_list[CB_SEND_TRAILING_METADATA][0] = op->on_complete;
 | 
	
		
			
				|  |  | +    stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true;
 | 
	
		
			
				|  |  | +    result = ACTION_TAKEN_NO_CALLBACK;
 | 
	
		
			
				|  |  | +  } else if (stream_op->send_trailing_metadata &&
 | 
	
		
			
				|  |  | +             op_can_be_run(stream_op, stream_state, &oas->state,
 | 
	
		
			
				|  |  | +                           OP_SEND_TRAILING_METADATA)) {
 | 
	
		
			
				|  |  | +    CRONET_LOG(GPR_DEBUG, "running: %p  OP_SEND_TRAILING_METADATA", oas);
 | 
	
		
			
				|  |  | +    CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, 0)", s->cbs);
 | 
	
		
			
				|  |  | +    stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
 | 
	
		
			
				|  |  | +    cronet_bidirectional_stream_write(s->cbs, "", 0, true);
 | 
	
		
			
				|  |  | +    stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true;
 | 
	
		
			
				|  |  | +    result = ACTION_TAKEN_WITH_CALLBACK;
 | 
	
		
			
				|  |  | +  } else if (stream_op->cancel_error &&
 | 
	
		
			
				|  |  | +             op_can_be_run(stream_op, stream_state, &oas->state,
 | 
	
		
			
				|  |  | +                           OP_CANCEL_ERROR)) {
 | 
	
		
			
				|  |  | +    CRONET_LOG(GPR_DEBUG, "running: %p  OP_CANCEL_ERROR", oas);
 | 
	
		
			
				|  |  | +    CRONET_LOG(GPR_DEBUG, "W: cronet_bidirectional_stream_cancel(%p)", s->cbs);
 | 
	
		
			
				|  |  |      if (s->cbs) {
 | 
	
		
			
				|  |  | -      // Send an "empty" write to the far end to signal that we're done.
 | 
	
		
			
				|  |  | -      // This will induce the server to send down trailers.
 | 
	
		
			
				|  |  | -      if (grpc_cronet_trace) {
 | 
	
		
			
				|  |  | -        gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_write");
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | -      cronet_bidirectional_stream_write(s->cbs, "abc", 0, true);
 | 
	
		
			
				|  |  | -    } else {
 | 
	
		
			
				|  |  | -      // We never created a stream. This was probably an empty request.
 | 
	
		
			
				|  |  | -      invoke_closing_callback(s);
 | 
	
		
			
				|  |  | +      cronet_bidirectional_stream_cancel(s->cbs);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | +    stream_state->state_op_done[OP_CANCEL_ERROR] = true;
 | 
	
		
			
				|  |  | +    result = ACTION_TAKEN_WITH_CALLBACK;
 | 
	
		
			
				|  |  | +  } else if (stream_op->on_complete &&
 | 
	
		
			
				|  |  | +             op_can_be_run(stream_op, stream_state, &oas->state,
 | 
	
		
			
				|  |  | +                           OP_ON_COMPLETE)) {
 | 
	
		
			
				|  |  | +    /* All actions in this stream_op are complete. Call the on_complete callback
 | 
	
		
			
				|  |  | +     */
 | 
	
		
			
				|  |  | +    CRONET_LOG(GPR_DEBUG, "running: %p  OP_ON_COMPLETE", oas);
 | 
	
		
			
				|  |  | +    grpc_exec_ctx_sched(exec_ctx, stream_op->on_complete, GRPC_ERROR_NONE,
 | 
	
		
			
				|  |  | +                        NULL);
 | 
	
		
			
				|  |  | +    oas->state.state_op_done[OP_ON_COMPLETE] = true;
 | 
	
		
			
				|  |  | +    oas->done = true;
 | 
	
		
			
				|  |  | +    /* reset any send message state, only if this ON_COMPLETE is about a send.
 | 
	
		
			
				|  |  | +     */
 | 
	
		
			
				|  |  | +    if (stream_op->send_message) {
 | 
	
		
			
				|  |  | +      stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
 | 
	
		
			
				|  |  | +      stream_state->state_op_done[OP_SEND_MESSAGE] = false;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    result = ACTION_TAKEN_NO_CALLBACK;
 | 
	
		
			
				|  |  | +    /* If this is the on_complete callback being called for a received message -
 | 
	
		
			
				|  |  | +      make a note */
 | 
	
		
			
				|  |  | +    if (stream_op->recv_message)
 | 
	
		
			
				|  |  | +      stream_state->state_op_done[OP_RECV_MESSAGE_AND_ON_COMPLETE] = true;
 | 
	
		
			
				|  |  | +  } else {
 | 
	
		
			
				|  |  | +    result = NO_ACTION_POSSIBLE;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | +  return result;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +/*
 | 
	
		
			
				|  |  | +  Functions used by upper layers to access transport functionality.
 | 
	
		
			
				|  |  | +*/
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
 | 
	
		
			
				|  |  |                         grpc_stream *gs, grpc_stream_refcount *refcount,
 | 
	
		
			
				|  |  |                         const void *server_data) {
 | 
	
		
			
				|  |  |    stream_obj *s = (stream_obj *)gs;
 | 
	
		
			
				|  |  | -  memset(s->callback_list, 0, sizeof(s->callback_list));
 | 
	
		
			
				|  |  | +  memset(&s->storage, 0, sizeof(s->storage));
 | 
	
		
			
				|  |  | +  s->storage.head = NULL;
 | 
	
		
			
				|  |  | +  memset(&s->state, 0, sizeof(s->state));
 | 
	
		
			
				|  |  | +  s->curr_op = NULL;
 | 
	
		
			
				|  |  |    s->cbs = NULL;
 | 
	
		
			
				|  |  | -  gpr_mu_init(&s->recv_mu);
 | 
	
		
			
				|  |  | -  s->read_buffer = gpr_malloc(GRPC_HEADER_SIZE_IN_BYTES);
 | 
	
		
			
				|  |  | -  s->write_buffer = gpr_malloc(GRPC_HEADER_SIZE_IN_BYTES);
 | 
	
		
			
				|  |  | -  gpr_slice_buffer_init(&s->write_slice_buffer);
 | 
	
		
			
				|  |  | -  if (grpc_cronet_trace) {
 | 
	
		
			
				|  |  | -    gpr_log(GPR_DEBUG, "cronet_transport - init_stream");
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | +  memset(&s->header_array, 0, sizeof(s->header_array));
 | 
	
		
			
				|  |  | +  memset(&s->state.rs, 0, sizeof(s->state.rs));
 | 
	
		
			
				|  |  | +  memset(&s->state.ws, 0, sizeof(s->state.ws));
 | 
	
		
			
				|  |  | +  memset(s->state.state_op_done, 0, sizeof(s->state.state_op_done));
 | 
	
		
			
				|  |  | +  memset(s->state.state_callback_received, 0,
 | 
	
		
			
				|  |  | +         sizeof(s->state.state_callback_received));
 | 
	
		
			
				|  |  | +  gpr_mu_init(&s->mu);
 | 
	
		
			
				|  |  |    return 0;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
 | 
	
		
			
				|  |  | -                           grpc_stream *gs, void *and_free_memory) {
 | 
	
		
			
				|  |  | -  if (grpc_cronet_trace) {
 | 
	
		
			
				|  |  | -    gpr_log(GPR_DEBUG, "Destroy stream");
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | +static void set_pollset_do_nothing(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
 | 
	
		
			
				|  |  | +                                   grpc_stream *gs, grpc_pollset *pollset) {}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +static void set_pollset_set_do_nothing(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  | +                                       grpc_transport *gt, grpc_stream *gs,
 | 
	
		
			
				|  |  | +                                       grpc_pollset_set *pollset_set) {}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
 | 
	
		
			
				|  |  | +                              grpc_stream *gs, grpc_transport_stream_op *op) {
 | 
	
		
			
				|  |  | +  CRONET_LOG(GPR_DEBUG, "perform_stream_op");
 | 
	
		
			
				|  |  |    stream_obj *s = (stream_obj *)gs;
 | 
	
		
			
				|  |  | -  s->cbs = NULL;
 | 
	
		
			
				|  |  | -  gpr_free(s->read_buffer);
 | 
	
		
			
				|  |  | -  gpr_free(s->write_buffer);
 | 
	
		
			
				|  |  | -  gpr_free(s->url);
 | 
	
		
			
				|  |  | -  gpr_mu_destroy(&s->recv_mu);
 | 
	
		
			
				|  |  | -  if (and_free_memory) {
 | 
	
		
			
				|  |  | -    gpr_free(and_free_memory);
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | +  s->curr_gs = gs;
 | 
	
		
			
				|  |  | +  memcpy(&s->curr_ct, gt, sizeof(grpc_cronet_transport));
 | 
	
		
			
				|  |  | +  add_to_storage(s, op);
 | 
	
		
			
				|  |  | +  execute_from_storage(s);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
 | 
	
		
			
				|  |  | -  grpc_cronet_transport *ct = (grpc_cronet_transport *)gt;
 | 
	
		
			
				|  |  | -  gpr_free(ct->host);
 | 
	
		
			
				|  |  | -  if (grpc_cronet_trace) {
 | 
	
		
			
				|  |  | -    gpr_log(GPR_DEBUG, "Destroy transport");
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | +static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
 | 
	
		
			
				|  |  | +                           grpc_stream *gs, void *and_free_memory) {}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +static char *get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
 | 
	
		
			
				|  |  | +  return NULL;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +static void perform_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
 | 
	
		
			
				|  |  | +                       grpc_transport_op *op) {}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  const grpc_transport_vtable grpc_cronet_vtable = {sizeof(stream_obj),
 | 
	
		
			
				|  |  |                                                    "cronet_http",
 | 
	
		
			
				|  |  |                                                    init_stream,
 | 
	
		
			
				|  |  |                                                    set_pollset_do_nothing,
 | 
	
		
			
				|  |  |                                                    set_pollset_set_do_nothing,
 | 
	
		
			
				|  |  |                                                    perform_stream_op,
 | 
	
		
			
				|  |  | -                                                  NULL,
 | 
	
		
			
				|  |  | +                                                  perform_op,
 | 
	
		
			
				|  |  |                                                    destroy_stream,
 | 
	
		
			
				|  |  |                                                    destroy_transport,
 | 
	
		
			
				|  |  | -                                                  NULL};
 | 
	
		
			
				|  |  | +                                                  get_peer};
 |