|  | @@ -34,13 +34,15 @@
 | 
	
		
			
				|  |  |  #include "src/core/channel/client_channel.h"
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  #include <stdio.h>
 | 
	
		
			
				|  |  | +#include <string.h>
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  #include "src/core/channel/channel_args.h"
 | 
	
		
			
				|  |  | -#include "src/core/channel/child_channel.h"
 | 
	
		
			
				|  |  |  #include "src/core/channel/connected_channel.h"
 | 
	
		
			
				|  |  | +#include "src/core/surface/channel.h"
 | 
	
		
			
				|  |  |  #include "src/core/iomgr/iomgr.h"
 | 
	
		
			
				|  |  |  #include "src/core/iomgr/pollset_set.h"
 | 
	
		
			
				|  |  |  #include "src/core/support/string.h"
 | 
	
		
			
				|  |  | +#include "src/core/transport/connectivity_state.h"
 | 
	
		
			
				|  |  |  #include <grpc/support/alloc.h>
 | 
	
		
			
				|  |  |  #include <grpc/support/log.h>
 | 
	
		
			
				|  |  |  #include <grpc/support/sync.h>
 | 
	
	
		
			
				|  | @@ -51,31 +53,38 @@
 | 
	
		
			
				|  |  |  typedef struct call_data call_data;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  typedef struct {
 | 
	
		
			
				|  |  | -  /* protects children, child_count, child_capacity, active_child,
 | 
	
		
			
				|  |  | -     transport_setup_initiated
 | 
	
		
			
				|  |  | -     does not protect channel stacks held by children
 | 
	
		
			
				|  |  | -     transport_setup is assumed to be set once during construction */
 | 
	
		
			
				|  |  | -  gpr_mu mu;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  /* the sending child (may be null) */
 | 
	
		
			
				|  |  | -  grpc_child_channel *active_child;
 | 
	
		
			
				|  |  | +  /** metadata context for this channel */
 | 
	
		
			
				|  |  |    grpc_mdctx *mdctx;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  /* calls waiting for a channel to be ready */
 | 
	
		
			
				|  |  | -  call_data **waiting_children;
 | 
	
		
			
				|  |  | -  size_t waiting_child_count;
 | 
	
		
			
				|  |  | -  size_t waiting_child_capacity;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  /* transport setup for this channel */
 | 
	
		
			
				|  |  | -  grpc_transport_setup *transport_setup;
 | 
	
		
			
				|  |  | -  int transport_setup_initiated;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  grpc_channel_args *args;
 | 
	
		
			
				|  |  | +  /** resolver for this channel */
 | 
	
		
			
				|  |  | +  grpc_resolver *resolver;
 | 
	
		
			
				|  |  | +  /** master channel - the grpc_channel instance that ultimately owns
 | 
	
		
			
				|  |  | +      this channel_data via its channel stack.
 | 
	
		
			
				|  |  | +      We occasionally use this to bump the refcount on the master channel
 | 
	
		
			
				|  |  | +      to keep ourselves alive through an asynchronous operation. */
 | 
	
		
			
				|  |  | +  grpc_channel *master;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  /** mutex protecting client configuration, including all
 | 
	
		
			
				|  |  | +      variables below in this data structure */
 | 
	
		
			
				|  |  | +  gpr_mu mu_config;
 | 
	
		
			
				|  |  | +  /** currently active load balancer - guarded by mu_config */
 | 
	
		
			
				|  |  | +  grpc_lb_policy *lb_policy;
 | 
	
		
			
				|  |  | +  /** incoming configuration - set by resolver.next
 | 
	
		
			
				|  |  | +      guarded by mu_config */
 | 
	
		
			
				|  |  | +  grpc_client_config *incoming_configuration;
 | 
	
		
			
				|  |  | +  /** a list of closures that are all waiting for config to come in */
 | 
	
		
			
				|  |  | +  grpc_iomgr_closure *waiting_for_config_closures;
 | 
	
		
			
				|  |  | +  /** resolver callback */
 | 
	
		
			
				|  |  | +  grpc_iomgr_closure on_config_changed;
 | 
	
		
			
				|  |  | +  /** connectivity state being tracked */
 | 
	
		
			
				|  |  | +  grpc_connectivity_state_tracker state_tracker;
 | 
	
		
			
				|  |  |  } channel_data;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  typedef enum {
 | 
	
		
			
				|  |  |    CALL_CREATED,
 | 
	
		
			
				|  |  | -  CALL_WAITING,
 | 
	
		
			
				|  |  | +  CALL_WAITING_FOR_SEND,
 | 
	
		
			
				|  |  | +  CALL_WAITING_FOR_CONFIG,
 | 
	
		
			
				|  |  | +  CALL_WAITING_FOR_PICK,
 | 
	
		
			
				|  |  | +  CALL_WAITING_FOR_CALL,
 | 
	
		
			
				|  |  |    CALL_ACTIVE,
 | 
	
		
			
				|  |  |    CALL_CANCELLED
 | 
	
		
			
				|  |  |  } call_state;
 | 
	
	
		
			
				|  | @@ -84,75 +93,25 @@ struct call_data {
 | 
	
		
			
				|  |  |    /* owning element */
 | 
	
		
			
				|  |  |    grpc_call_element *elem;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +  gpr_mu mu_state;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    call_state state;
 | 
	
		
			
				|  |  |    gpr_timespec deadline;
 | 
	
		
			
				|  |  | -  union {
 | 
	
		
			
				|  |  | -    struct {
 | 
	
		
			
				|  |  | -      /* our child call stack */
 | 
	
		
			
				|  |  | -      grpc_child_call *child_call;
 | 
	
		
			
				|  |  | -    } active;
 | 
	
		
			
				|  |  | -    grpc_transport_op waiting_op;
 | 
	
		
			
				|  |  | -    struct {
 | 
	
		
			
				|  |  | -      grpc_linked_mdelem status;
 | 
	
		
			
				|  |  | -      grpc_linked_mdelem details;
 | 
	
		
			
				|  |  | -    } cancelled;
 | 
	
		
			
				|  |  | -  } s;
 | 
	
		
			
				|  |  | +  grpc_subchannel *picked_channel;
 | 
	
		
			
				|  |  | +  grpc_iomgr_closure async_setup_task;
 | 
	
		
			
				|  |  | +  grpc_transport_stream_op waiting_op;
 | 
	
		
			
				|  |  | +  /* our child call stack */
 | 
	
		
			
				|  |  | +  grpc_subchannel_call *subchannel_call;
 | 
	
		
			
				|  |  | +  grpc_linked_mdelem status;
 | 
	
		
			
				|  |  | +  grpc_linked_mdelem details;
 | 
	
		
			
				|  |  |  };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static int prepare_activate(grpc_call_element *elem,
 | 
	
		
			
				|  |  | -                            grpc_child_channel *on_child) {
 | 
	
		
			
				|  |  | -  call_data *calld = elem->call_data;
 | 
	
		
			
				|  |  | -  channel_data *chand = elem->channel_data;
 | 
	
		
			
				|  |  | -  if (calld->state == CALL_CANCELLED) return 0;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  /* no more access to calld->s.waiting allowed */
 | 
	
		
			
				|  |  | -  GPR_ASSERT(calld->state == CALL_WAITING);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  if (calld->s.waiting_op.bind_pollset) {
 | 
	
		
			
				|  |  | -    grpc_transport_setup_del_interested_party(chand->transport_setup,
 | 
	
		
			
				|  |  | -                                              calld->s.waiting_op.bind_pollset);
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  calld->state = CALL_ACTIVE;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  /* create a child call */
 | 
	
		
			
				|  |  | -  /* TODO(ctiller): pass the waiting op down here */
 | 
	
		
			
				|  |  | -  calld->s.active.child_call =
 | 
	
		
			
				|  |  | -      grpc_child_channel_create_call(on_child, elem, NULL);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  return 1;
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -static void complete_activate(grpc_call_element *elem, grpc_transport_op *op) {
 | 
	
		
			
				|  |  | -  call_data *calld = elem->call_data;
 | 
	
		
			
				|  |  | -  grpc_call_element *child_elem =
 | 
	
		
			
				|  |  | -      grpc_child_call_get_top_element(calld->s.active.child_call);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  GPR_ASSERT(calld->state == CALL_ACTIVE);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  /* continue the start call down the stack, this nees to happen after metadata
 | 
	
		
			
				|  |  | -     are flushed*/
 | 
	
		
			
				|  |  | -  child_elem->filter->start_transport_op(child_elem, op);
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -static void remove_waiting_child(channel_data *chand, call_data *calld) {
 | 
	
		
			
				|  |  | -  size_t new_count;
 | 
	
		
			
				|  |  | -  size_t i;
 | 
	
		
			
				|  |  | -  for (i = 0, new_count = 0; i < chand->waiting_child_count; i++) {
 | 
	
		
			
				|  |  | -    if (chand->waiting_children[i] == calld) {
 | 
	
		
			
				|  |  | -      grpc_transport_setup_del_interested_party(
 | 
	
		
			
				|  |  | -          chand->transport_setup, calld->s.waiting_op.bind_pollset);
 | 
	
		
			
				|  |  | -      continue;
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | -    chand->waiting_children[new_count++] = chand->waiting_children[i];
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  GPR_ASSERT(new_count == chand->waiting_child_count - 1 ||
 | 
	
		
			
				|  |  | -             new_count == chand->waiting_child_count);
 | 
	
		
			
				|  |  | -  chand->waiting_child_count = new_count;
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | +static grpc_iomgr_closure *merge_into_waiting_op(
 | 
	
		
			
				|  |  | +    grpc_call_element *elem,
 | 
	
		
			
				|  |  | +    grpc_transport_stream_op *new_op) GRPC_MUST_USE_RESULT;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static void handle_op_after_cancellation(grpc_call_element *elem,
 | 
	
		
			
				|  |  | -                                         grpc_transport_op *op) {
 | 
	
		
			
				|  |  | +                                         grpc_transport_stream_op *op) {
 | 
	
		
			
				|  |  |    call_data *calld = elem->call_data;
 | 
	
		
			
				|  |  |    channel_data *chand = elem->channel_data;
 | 
	
		
			
				|  |  |    if (op->send_ops) {
 | 
	
	
		
			
				|  | @@ -163,15 +122,15 @@ static void handle_op_after_cancellation(grpc_call_element *elem,
 | 
	
		
			
				|  |  |      char status[GPR_LTOA_MIN_BUFSIZE];
 | 
	
		
			
				|  |  |      grpc_metadata_batch mdb;
 | 
	
		
			
				|  |  |      gpr_ltoa(GRPC_STATUS_CANCELLED, status);
 | 
	
		
			
				|  |  | -    calld->s.cancelled.status.md =
 | 
	
		
			
				|  |  | +    calld->status.md =
 | 
	
		
			
				|  |  |          grpc_mdelem_from_strings(chand->mdctx, "grpc-status", status);
 | 
	
		
			
				|  |  | -    calld->s.cancelled.details.md =
 | 
	
		
			
				|  |  | +    calld->details.md =
 | 
	
		
			
				|  |  |          grpc_mdelem_from_strings(chand->mdctx, "grpc-message", "Cancelled");
 | 
	
		
			
				|  |  | -    calld->s.cancelled.status.prev = calld->s.cancelled.details.next = NULL;
 | 
	
		
			
				|  |  | -    calld->s.cancelled.status.next = &calld->s.cancelled.details;
 | 
	
		
			
				|  |  | -    calld->s.cancelled.details.prev = &calld->s.cancelled.status;
 | 
	
		
			
				|  |  | -    mdb.list.head = &calld->s.cancelled.status;
 | 
	
		
			
				|  |  | -    mdb.list.tail = &calld->s.cancelled.details;
 | 
	
		
			
				|  |  | +    calld->status.prev = calld->details.next = NULL;
 | 
	
		
			
				|  |  | +    calld->status.next = &calld->details;
 | 
	
		
			
				|  |  | +    calld->details.prev = &calld->status;
 | 
	
		
			
				|  |  | +    mdb.list.head = &calld->status;
 | 
	
		
			
				|  |  | +    mdb.list.tail = &calld->details;
 | 
	
		
			
				|  |  |      mdb.garbage.head = mdb.garbage.tail = NULL;
 | 
	
		
			
				|  |  |      mdb.deadline = gpr_inf_future;
 | 
	
		
			
				|  |  |      grpc_sopb_add_metadata(op->recv_ops, mdb);
 | 
	
	
		
			
				|  | @@ -183,192 +142,372 @@ static void handle_op_after_cancellation(grpc_call_element *elem,
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void cc_start_transport_op(grpc_call_element *elem,
 | 
	
		
			
				|  |  | -                                  grpc_transport_op *op) {
 | 
	
		
			
				|  |  | +typedef struct {
 | 
	
		
			
				|  |  | +  grpc_iomgr_closure closure;
 | 
	
		
			
				|  |  | +  grpc_call_element *elem;
 | 
	
		
			
				|  |  | +} waiting_call;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +static void perform_transport_stream_op(grpc_call_element *elem,
 | 
	
		
			
				|  |  | +                                        grpc_transport_stream_op *op,
 | 
	
		
			
				|  |  | +                                        int continuation);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +static void continue_with_pick(void *arg, int iomgr_success) {
 | 
	
		
			
				|  |  | +  waiting_call *wc = arg;
 | 
	
		
			
				|  |  | +  call_data *calld = wc->elem->call_data;
 | 
	
		
			
				|  |  | +  perform_transport_stream_op(wc->elem, &calld->waiting_op, 1);
 | 
	
		
			
				|  |  | +  gpr_free(wc);
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +static void add_to_lb_policy_wait_queue_locked_state_config(
 | 
	
		
			
				|  |  | +    grpc_call_element *elem) {
 | 
	
		
			
				|  |  | +  channel_data *chand = elem->channel_data;
 | 
	
		
			
				|  |  | +  waiting_call *wc = gpr_malloc(sizeof(*wc));
 | 
	
		
			
				|  |  | +  grpc_iomgr_closure_init(&wc->closure, continue_with_pick, wc);
 | 
	
		
			
				|  |  | +  wc->elem = elem;
 | 
	
		
			
				|  |  | +  wc->closure.next = chand->waiting_for_config_closures;
 | 
	
		
			
				|  |  | +  chand->waiting_for_config_closures = &wc->closure;
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +static int is_empty(void *p, int len) {
 | 
	
		
			
				|  |  | +  char *ptr = p;
 | 
	
		
			
				|  |  | +  int i;
 | 
	
		
			
				|  |  | +  for (i = 0; i < len; i++) {
 | 
	
		
			
				|  |  | +    if (ptr[i] != 0) return 0;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  return 1;
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +static void started_call(void *arg, int iomgr_success) {
 | 
	
		
			
				|  |  | +  call_data *calld = arg;
 | 
	
		
			
				|  |  | +  grpc_transport_stream_op op;
 | 
	
		
			
				|  |  | +  int have_waiting;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  gpr_mu_lock(&calld->mu_state);
 | 
	
		
			
				|  |  | +  if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) {
 | 
	
		
			
				|  |  | +    memset(&op, 0, sizeof(op));
 | 
	
		
			
				|  |  | +    op.cancel_with_status = GRPC_STATUS_CANCELLED;
 | 
	
		
			
				|  |  | +    gpr_mu_unlock(&calld->mu_state);
 | 
	
		
			
				|  |  | +    grpc_subchannel_call_process_op(calld->subchannel_call, &op);
 | 
	
		
			
				|  |  | +  } else if (calld->state == CALL_WAITING_FOR_CALL) {
 | 
	
		
			
				|  |  | +    have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op));
 | 
	
		
			
				|  |  | +    if (calld->subchannel_call != NULL) {
 | 
	
		
			
				|  |  | +      calld->state = CALL_ACTIVE;
 | 
	
		
			
				|  |  | +      gpr_mu_unlock(&calld->mu_state);
 | 
	
		
			
				|  |  | +      if (have_waiting) {
 | 
	
		
			
				|  |  | +        grpc_subchannel_call_process_op(calld->subchannel_call,
 | 
	
		
			
				|  |  | +                                        &calld->waiting_op);
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +    } else {
 | 
	
		
			
				|  |  | +      calld->state = CALL_CANCELLED;
 | 
	
		
			
				|  |  | +      gpr_mu_unlock(&calld->mu_state);
 | 
	
		
			
				|  |  | +      if (have_waiting) {
 | 
	
		
			
				|  |  | +        handle_op_after_cancellation(calld->elem, &calld->waiting_op);
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +  } else {
 | 
	
		
			
				|  |  | +    GPR_ASSERT(calld->state == CALL_CANCELLED);
 | 
	
		
			
				|  |  | +    gpr_mu_unlock(&calld->mu_state);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +static void picked_target(void *arg, int iomgr_success) {
 | 
	
		
			
				|  |  | +  call_data *calld = arg;
 | 
	
		
			
				|  |  | +  grpc_pollset *pollset;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  if (calld->picked_channel == NULL) {
 | 
	
		
			
				|  |  | +    /* treat this like a cancellation */
 | 
	
		
			
				|  |  | +    calld->waiting_op.cancel_with_status = GRPC_STATUS_UNAVAILABLE;
 | 
	
		
			
				|  |  | +    perform_transport_stream_op(calld->elem, &calld->waiting_op, 1);
 | 
	
		
			
				|  |  | +  } else {
 | 
	
		
			
				|  |  | +    gpr_mu_lock(&calld->mu_state);
 | 
	
		
			
				|  |  | +    if (calld->state == CALL_CANCELLED) {
 | 
	
		
			
				|  |  | +      gpr_mu_unlock(&calld->mu_state);
 | 
	
		
			
				|  |  | +      handle_op_after_cancellation(calld->elem, &calld->waiting_op);
 | 
	
		
			
				|  |  | +    } else {
 | 
	
		
			
				|  |  | +      GPR_ASSERT(calld->state == CALL_WAITING_FOR_PICK);
 | 
	
		
			
				|  |  | +      calld->state = CALL_WAITING_FOR_CALL;
 | 
	
		
			
				|  |  | +      pollset = calld->waiting_op.bind_pollset;
 | 
	
		
			
				|  |  | +      gpr_mu_unlock(&calld->mu_state);
 | 
	
		
			
				|  |  | +      grpc_iomgr_closure_init(&calld->async_setup_task, started_call, calld);
 | 
	
		
			
				|  |  | +      grpc_subchannel_create_call(calld->picked_channel, pollset,
 | 
	
		
			
				|  |  | +                                  &calld->subchannel_call,
 | 
	
		
			
				|  |  | +                                  &calld->async_setup_task);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +static void pick_target(grpc_lb_policy *lb_policy, call_data *calld) {
 | 
	
		
			
				|  |  | +  grpc_metadata_batch *initial_metadata;
 | 
	
		
			
				|  |  | +  grpc_transport_stream_op *op = &calld->waiting_op;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  GPR_ASSERT(op->bind_pollset);
 | 
	
		
			
				|  |  | +  GPR_ASSERT(op->send_ops);
 | 
	
		
			
				|  |  | +  GPR_ASSERT(op->send_ops->nops >= 1);
 | 
	
		
			
				|  |  | +  GPR_ASSERT(op->send_ops->ops[0].type == GRPC_OP_METADATA);
 | 
	
		
			
				|  |  | +  initial_metadata = &op->send_ops->ops[0].data.metadata;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  grpc_iomgr_closure_init(&calld->async_setup_task, picked_target, calld);
 | 
	
		
			
				|  |  | +  grpc_lb_policy_pick(lb_policy, op->bind_pollset, initial_metadata,
 | 
	
		
			
				|  |  | +                      &calld->picked_channel, &calld->async_setup_task);
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +static grpc_iomgr_closure *merge_into_waiting_op(
 | 
	
		
			
				|  |  | +    grpc_call_element *elem, grpc_transport_stream_op *new_op) {
 | 
	
		
			
				|  |  | +  call_data *calld = elem->call_data;
 | 
	
		
			
				|  |  | +  grpc_iomgr_closure *consumed_op = NULL;
 | 
	
		
			
				|  |  | +  grpc_transport_stream_op *waiting_op = &calld->waiting_op;
 | 
	
		
			
				|  |  | +  GPR_ASSERT((waiting_op->send_ops != NULL) + (new_op->send_ops != NULL) <= 1);
 | 
	
		
			
				|  |  | +  GPR_ASSERT((waiting_op->recv_ops != NULL) + (new_op->recv_ops != NULL) <= 1);
 | 
	
		
			
				|  |  | +  if (new_op->send_ops != NULL) {
 | 
	
		
			
				|  |  | +    waiting_op->send_ops = new_op->send_ops;
 | 
	
		
			
				|  |  | +    waiting_op->is_last_send = new_op->is_last_send;
 | 
	
		
			
				|  |  | +    waiting_op->on_done_send = new_op->on_done_send;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  if (new_op->recv_ops != NULL) {
 | 
	
		
			
				|  |  | +    waiting_op->recv_ops = new_op->recv_ops;
 | 
	
		
			
				|  |  | +    waiting_op->recv_state = new_op->recv_state;
 | 
	
		
			
				|  |  | +    waiting_op->on_done_recv = new_op->on_done_recv;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  if (new_op->on_consumed != NULL) {
 | 
	
		
			
				|  |  | +    if (waiting_op->on_consumed != NULL) {
 | 
	
		
			
				|  |  | +      consumed_op = waiting_op->on_consumed;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    waiting_op->on_consumed = new_op->on_consumed;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  if (new_op->cancel_with_status != GRPC_STATUS_OK) {
 | 
	
		
			
				|  |  | +    waiting_op->cancel_with_status = new_op->cancel_with_status;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  return consumed_op;
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +static void perform_transport_stream_op(grpc_call_element *elem,
 | 
	
		
			
				|  |  | +                                        grpc_transport_stream_op *op,
 | 
	
		
			
				|  |  | +                                        int continuation) {
 | 
	
		
			
				|  |  |    call_data *calld = elem->call_data;
 | 
	
		
			
				|  |  |    channel_data *chand = elem->channel_data;
 | 
	
		
			
				|  |  | -  grpc_call_element *child_elem;
 | 
	
		
			
				|  |  | -  grpc_transport_op waiting_op;
 | 
	
		
			
				|  |  | +  grpc_subchannel_call *subchannel_call;
 | 
	
		
			
				|  |  | +  grpc_lb_policy *lb_policy;
 | 
	
		
			
				|  |  | +  grpc_transport_stream_op op2;
 | 
	
		
			
				|  |  | +  grpc_iomgr_closure *consumed_op = NULL;
 | 
	
		
			
				|  |  |    GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
 | 
	
		
			
				|  |  |    GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  gpr_mu_lock(&chand->mu);
 | 
	
		
			
				|  |  | +  gpr_mu_lock(&calld->mu_state);
 | 
	
		
			
				|  |  |    switch (calld->state) {
 | 
	
		
			
				|  |  |      case CALL_ACTIVE:
 | 
	
		
			
				|  |  | -      child_elem = grpc_child_call_get_top_element(calld->s.active.child_call);
 | 
	
		
			
				|  |  | -      gpr_mu_unlock(&chand->mu);
 | 
	
		
			
				|  |  | -      child_elem->filter->start_transport_op(child_elem, op);
 | 
	
		
			
				|  |  | +      GPR_ASSERT(!continuation);
 | 
	
		
			
				|  |  | +      subchannel_call = calld->subchannel_call;
 | 
	
		
			
				|  |  | +      gpr_mu_unlock(&calld->mu_state);
 | 
	
		
			
				|  |  | +      grpc_subchannel_call_process_op(subchannel_call, op);
 | 
	
		
			
				|  |  |        break;
 | 
	
		
			
				|  |  | -    case CALL_CREATED:
 | 
	
		
			
				|  |  | -      if (op->cancel_with_status != GRPC_STATUS_OK) {
 | 
	
		
			
				|  |  | -        calld->state = CALL_CANCELLED;
 | 
	
		
			
				|  |  | -        gpr_mu_unlock(&chand->mu);
 | 
	
		
			
				|  |  | -        handle_op_after_cancellation(elem, op);
 | 
	
		
			
				|  |  | -      } else {
 | 
	
		
			
				|  |  | -        calld->state = CALL_WAITING;
 | 
	
		
			
				|  |  | -        calld->s.waiting_op.bind_pollset = NULL;
 | 
	
		
			
				|  |  | -        if (chand->active_child) {
 | 
	
		
			
				|  |  | -          /* channel is connected - use the connected stack */
 | 
	
		
			
				|  |  | -          if (prepare_activate(elem, chand->active_child)) {
 | 
	
		
			
				|  |  | -            gpr_mu_unlock(&chand->mu);
 | 
	
		
			
				|  |  | -            /* activate the request (pass it down) outside the lock */
 | 
	
		
			
				|  |  | -            complete_activate(elem, op);
 | 
	
		
			
				|  |  | -          } else {
 | 
	
		
			
				|  |  | -            gpr_mu_unlock(&chand->mu);
 | 
	
		
			
				|  |  | +    case CALL_CANCELLED:
 | 
	
		
			
				|  |  | +      gpr_mu_unlock(&calld->mu_state);
 | 
	
		
			
				|  |  | +      handle_op_after_cancellation(elem, op);
 | 
	
		
			
				|  |  | +      break;
 | 
	
		
			
				|  |  | +    case CALL_WAITING_FOR_SEND:
 | 
	
		
			
				|  |  | +      GPR_ASSERT(!continuation);
 | 
	
		
			
				|  |  | +      consumed_op = merge_into_waiting_op(elem, op);
 | 
	
		
			
				|  |  | +      if (!calld->waiting_op.send_ops &&
 | 
	
		
			
				|  |  | +          calld->waiting_op.cancel_with_status == GRPC_STATUS_OK) {
 | 
	
		
			
				|  |  | +        gpr_mu_unlock(&calld->mu_state);
 | 
	
		
			
				|  |  | +        break;
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +      *op = calld->waiting_op;
 | 
	
		
			
				|  |  | +      memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
 | 
	
		
			
				|  |  | +      continuation = 1;
 | 
	
		
			
				|  |  | +    /* fall through */
 | 
	
		
			
				|  |  | +    case CALL_WAITING_FOR_CONFIG:
 | 
	
		
			
				|  |  | +    case CALL_WAITING_FOR_PICK:
 | 
	
		
			
				|  |  | +    case CALL_WAITING_FOR_CALL:
 | 
	
		
			
				|  |  | +      if (!continuation) {
 | 
	
		
			
				|  |  | +        if (op->cancel_with_status != GRPC_STATUS_OK) {
 | 
	
		
			
				|  |  | +          calld->state = CALL_CANCELLED;
 | 
	
		
			
				|  |  | +          op2 = calld->waiting_op;
 | 
	
		
			
				|  |  | +          memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
 | 
	
		
			
				|  |  | +          if (op->on_consumed) {
 | 
	
		
			
				|  |  | +            calld->waiting_op.on_consumed = op->on_consumed;
 | 
	
		
			
				|  |  | +            op->on_consumed = NULL;
 | 
	
		
			
				|  |  | +          } else if (op2.on_consumed) {
 | 
	
		
			
				|  |  | +            calld->waiting_op.on_consumed = op2.on_consumed;
 | 
	
		
			
				|  |  | +            op2.on_consumed = NULL;
 | 
	
		
			
				|  |  |            }
 | 
	
		
			
				|  |  | +          gpr_mu_unlock(&calld->mu_state);
 | 
	
		
			
				|  |  | +          handle_op_after_cancellation(elem, op);
 | 
	
		
			
				|  |  | +          handle_op_after_cancellation(elem, &op2);
 | 
	
		
			
				|  |  |          } else {
 | 
	
		
			
				|  |  | -          /* check to see if we should initiate a connection (if we're not
 | 
	
		
			
				|  |  | -             already),
 | 
	
		
			
				|  |  | -             but don't do so until outside the lock to avoid re-entrancy
 | 
	
		
			
				|  |  | -             problems if
 | 
	
		
			
				|  |  | -             the callback is immediate */
 | 
	
		
			
				|  |  | -          int initiate_transport_setup = 0;
 | 
	
		
			
				|  |  | -          if (!chand->transport_setup_initiated) {
 | 
	
		
			
				|  |  | -            chand->transport_setup_initiated = 1;
 | 
	
		
			
				|  |  | -            initiate_transport_setup = 1;
 | 
	
		
			
				|  |  | -          }
 | 
	
		
			
				|  |  | -          /* add this call to the waiting set to be resumed once we have a child
 | 
	
		
			
				|  |  | -             channel stack, growing the waiting set if needed */
 | 
	
		
			
				|  |  | -          if (chand->waiting_child_count == chand->waiting_child_capacity) {
 | 
	
		
			
				|  |  | -            chand->waiting_child_capacity =
 | 
	
		
			
				|  |  | -                GPR_MAX(chand->waiting_child_capacity * 2, 8);
 | 
	
		
			
				|  |  | -            chand->waiting_children = gpr_realloc(
 | 
	
		
			
				|  |  | -                chand->waiting_children,
 | 
	
		
			
				|  |  | -                chand->waiting_child_capacity * sizeof(call_data *));
 | 
	
		
			
				|  |  | -          }
 | 
	
		
			
				|  |  | -          calld->s.waiting_op = *op;
 | 
	
		
			
				|  |  | -          chand->waiting_children[chand->waiting_child_count++] = calld;
 | 
	
		
			
				|  |  | -          grpc_transport_setup_add_interested_party(chand->transport_setup,
 | 
	
		
			
				|  |  | -                                                    op->bind_pollset);
 | 
	
		
			
				|  |  | -          gpr_mu_unlock(&chand->mu);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -          /* finally initiate transport setup if needed */
 | 
	
		
			
				|  |  | -          if (initiate_transport_setup) {
 | 
	
		
			
				|  |  | -            grpc_transport_setup_initiate(chand->transport_setup);
 | 
	
		
			
				|  |  | -          }
 | 
	
		
			
				|  |  | +          consumed_op = merge_into_waiting_op(elem, op);
 | 
	
		
			
				|  |  | +          gpr_mu_unlock(&calld->mu_state);
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  | +        break;
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  | -      break;
 | 
	
		
			
				|  |  | -    case CALL_WAITING:
 | 
	
		
			
				|  |  | +    /* fall through */
 | 
	
		
			
				|  |  | +    case CALL_CREATED:
 | 
	
		
			
				|  |  |        if (op->cancel_with_status != GRPC_STATUS_OK) {
 | 
	
		
			
				|  |  | -        waiting_op = calld->s.waiting_op;
 | 
	
		
			
				|  |  | -        remove_waiting_child(chand, calld);
 | 
	
		
			
				|  |  |          calld->state = CALL_CANCELLED;
 | 
	
		
			
				|  |  | -        gpr_mu_unlock(&chand->mu);
 | 
	
		
			
				|  |  | -        handle_op_after_cancellation(elem, &waiting_op);
 | 
	
		
			
				|  |  | +        gpr_mu_unlock(&calld->mu_state);
 | 
	
		
			
				|  |  |          handle_op_after_cancellation(elem, op);
 | 
	
		
			
				|  |  |        } else {
 | 
	
		
			
				|  |  | -        GPR_ASSERT((calld->s.waiting_op.send_ops == NULL) !=
 | 
	
		
			
				|  |  | -                   (op->send_ops == NULL));
 | 
	
		
			
				|  |  | -        GPR_ASSERT((calld->s.waiting_op.recv_ops == NULL) !=
 | 
	
		
			
				|  |  | -                   (op->recv_ops == NULL));
 | 
	
		
			
				|  |  | -        if (op->send_ops) {
 | 
	
		
			
				|  |  | -          calld->s.waiting_op.send_ops = op->send_ops;
 | 
	
		
			
				|  |  | -          calld->s.waiting_op.is_last_send = op->is_last_send;
 | 
	
		
			
				|  |  | -          calld->s.waiting_op.on_done_send = op->on_done_send;
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | -        if (op->recv_ops) {
 | 
	
		
			
				|  |  | -          calld->s.waiting_op.recv_ops = op->recv_ops;
 | 
	
		
			
				|  |  | -          calld->s.waiting_op.recv_state = op->recv_state;
 | 
	
		
			
				|  |  | -          calld->s.waiting_op.on_done_recv = op->on_done_recv;
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | -        gpr_mu_unlock(&chand->mu);
 | 
	
		
			
				|  |  | -        if (op->on_consumed) {
 | 
	
		
			
				|  |  | -          op->on_consumed->cb(op->on_consumed->cb_arg, 0);
 | 
	
		
			
				|  |  | +        calld->waiting_op = *op;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        if (op->send_ops == NULL) {
 | 
	
		
			
				|  |  | +          /* need to have some send ops before we can select the
 | 
	
		
			
				|  |  | +             lb target */
 | 
	
		
			
				|  |  | +          calld->state = CALL_WAITING_FOR_SEND;
 | 
	
		
			
				|  |  | +          gpr_mu_unlock(&calld->mu_state);
 | 
	
		
			
				|  |  | +        } else {
 | 
	
		
			
				|  |  | +          gpr_mu_lock(&chand->mu_config);
 | 
	
		
			
				|  |  | +          lb_policy = chand->lb_policy;
 | 
	
		
			
				|  |  | +          if (lb_policy) {
 | 
	
		
			
				|  |  | +            GRPC_LB_POLICY_REF(lb_policy, "pick");
 | 
	
		
			
				|  |  | +            gpr_mu_unlock(&chand->mu_config);
 | 
	
		
			
				|  |  | +            calld->state = CALL_WAITING_FOR_PICK;
 | 
	
		
			
				|  |  | +            gpr_mu_unlock(&calld->mu_state);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            pick_target(lb_policy, calld);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            GRPC_LB_POLICY_UNREF(lb_policy, "pick");
 | 
	
		
			
				|  |  | +          } else if (chand->resolver != NULL) {
 | 
	
		
			
				|  |  | +            calld->state = CALL_WAITING_FOR_CONFIG;
 | 
	
		
			
				|  |  | +            add_to_lb_policy_wait_queue_locked_state_config(elem);
 | 
	
		
			
				|  |  | +            gpr_mu_unlock(&chand->mu_config);
 | 
	
		
			
				|  |  | +            gpr_mu_unlock(&calld->mu_state);
 | 
	
		
			
				|  |  | +          } else {
 | 
	
		
			
				|  |  | +            calld->state = CALL_CANCELLED;
 | 
	
		
			
				|  |  | +            gpr_mu_unlock(&chand->mu_config);
 | 
	
		
			
				|  |  | +            gpr_mu_unlock(&calld->mu_state);
 | 
	
		
			
				|  |  | +            handle_op_after_cancellation(elem, op);
 | 
	
		
			
				|  |  | +          }
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  |        break;
 | 
	
		
			
				|  |  | -    case CALL_CANCELLED:
 | 
	
		
			
				|  |  | -      gpr_mu_unlock(&chand->mu);
 | 
	
		
			
				|  |  | -      handle_op_after_cancellation(elem, op);
 | 
	
		
			
				|  |  | -      break;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  if (consumed_op != NULL) {
 | 
	
		
			
				|  |  | +    consumed_op->cb(consumed_op->cb_arg, 1);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +static void cc_start_transport_stream_op(grpc_call_element *elem,
 | 
	
		
			
				|  |  | +                                         grpc_transport_stream_op *op) {
 | 
	
		
			
				|  |  | +  perform_transport_stream_op(elem, op, 0);
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +static void cc_on_config_changed(void *arg, int iomgr_success) {
 | 
	
		
			
				|  |  | +  channel_data *chand = arg;
 | 
	
		
			
				|  |  | +  grpc_lb_policy *lb_policy = NULL;
 | 
	
		
			
				|  |  | +  grpc_lb_policy *old_lb_policy;
 | 
	
		
			
				|  |  | +  grpc_resolver *old_resolver;
 | 
	
		
			
				|  |  | +  grpc_iomgr_closure *wakeup_closures = NULL;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  if (chand->incoming_configuration != NULL) {
 | 
	
		
			
				|  |  | +    lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration);
 | 
	
		
			
				|  |  | +    GRPC_LB_POLICY_REF(lb_policy, "channel");
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    grpc_client_config_unref(chand->incoming_configuration);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  chand->incoming_configuration = NULL;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  gpr_mu_lock(&chand->mu_config);
 | 
	
		
			
				|  |  | +  old_lb_policy = chand->lb_policy;
 | 
	
		
			
				|  |  | +  chand->lb_policy = lb_policy;
 | 
	
		
			
				|  |  | +  if (lb_policy != NULL || chand->resolver == NULL /* disconnected */) {
 | 
	
		
			
				|  |  | +    wakeup_closures = chand->waiting_for_config_closures;
 | 
	
		
			
				|  |  | +    chand->waiting_for_config_closures = NULL;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  gpr_mu_unlock(&chand->mu_config);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  if (old_lb_policy) {
 | 
	
		
			
				|  |  | +    GRPC_LB_POLICY_UNREF(old_lb_policy, "channel");
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  gpr_mu_lock(&chand->mu_config);
 | 
	
		
			
				|  |  | +  if (iomgr_success && chand->resolver) {
 | 
	
		
			
				|  |  | +    grpc_resolver *resolver = chand->resolver;
 | 
	
		
			
				|  |  | +    GRPC_RESOLVER_REF(resolver, "channel-next");
 | 
	
		
			
				|  |  | +    gpr_mu_unlock(&chand->mu_config);
 | 
	
		
			
				|  |  | +    GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
 | 
	
		
			
				|  |  | +    grpc_resolver_next(chand->resolver, &chand->incoming_configuration,
 | 
	
		
			
				|  |  | +                       &chand->on_config_changed);
 | 
	
		
			
				|  |  | +    GRPC_RESOLVER_UNREF(resolver, "channel-next");
 | 
	
		
			
				|  |  | +  } else {
 | 
	
		
			
				|  |  | +    old_resolver = chand->resolver;
 | 
	
		
			
				|  |  | +    chand->resolver = NULL;
 | 
	
		
			
				|  |  | +    grpc_connectivity_state_set(&chand->state_tracker,
 | 
	
		
			
				|  |  | +                                GRPC_CHANNEL_FATAL_FAILURE);
 | 
	
		
			
				|  |  | +    gpr_mu_unlock(&chand->mu_config);
 | 
	
		
			
				|  |  | +    if (old_resolver != NULL) {
 | 
	
		
			
				|  |  | +      grpc_resolver_shutdown(old_resolver);
 | 
	
		
			
				|  |  | +      GRPC_RESOLVER_UNREF(old_resolver, "channel");
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  while (wakeup_closures) {
 | 
	
		
			
				|  |  | +    grpc_iomgr_closure *next = wakeup_closures->next;
 | 
	
		
			
				|  |  | +    grpc_iomgr_add_callback(wakeup_closures);
 | 
	
		
			
				|  |  | +    wakeup_closures = next;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  GRPC_CHANNEL_INTERNAL_UNREF(chand->master, "resolver");
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void channel_op(grpc_channel_element *elem,
 | 
	
		
			
				|  |  | -                       grpc_channel_element *from_elem, grpc_channel_op *op) {
 | 
	
		
			
				|  |  | +static void cc_start_transport_op(grpc_channel_element *elem,
 | 
	
		
			
				|  |  | +                                  grpc_transport_op *op) {
 | 
	
		
			
				|  |  | +  grpc_lb_policy *lb_policy = NULL;
 | 
	
		
			
				|  |  |    channel_data *chand = elem->channel_data;
 | 
	
		
			
				|  |  | -  grpc_child_channel *child_channel;
 | 
	
		
			
				|  |  | -  grpc_channel_op rop;
 | 
	
		
			
				|  |  | -  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
 | 
	
		
			
				|  |  | +  grpc_resolver *destroy_resolver = NULL;
 | 
	
		
			
				|  |  | +  grpc_iomgr_closure *on_consumed = op->on_consumed;
 | 
	
		
			
				|  |  | +  op->on_consumed = NULL;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  GPR_ASSERT(op->set_accept_stream == NULL);
 | 
	
		
			
				|  |  | +  GPR_ASSERT(op->bind_pollset == NULL);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  gpr_mu_lock(&chand->mu_config);
 | 
	
		
			
				|  |  | +  if (op->on_connectivity_state_change != NULL) {
 | 
	
		
			
				|  |  | +    grpc_connectivity_state_notify_on_state_change(
 | 
	
		
			
				|  |  | +        &chand->state_tracker, op->connectivity_state,
 | 
	
		
			
				|  |  | +        op->on_connectivity_state_change);
 | 
	
		
			
				|  |  | +    op->on_connectivity_state_change = NULL;
 | 
	
		
			
				|  |  | +    op->connectivity_state = NULL;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  switch (op->type) {
 | 
	
		
			
				|  |  | -    case GRPC_CHANNEL_GOAWAY:
 | 
	
		
			
				|  |  | -      /* sending goaway: clear out the active child on the way through */
 | 
	
		
			
				|  |  | -      gpr_mu_lock(&chand->mu);
 | 
	
		
			
				|  |  | -      child_channel = chand->active_child;
 | 
	
		
			
				|  |  | -      chand->active_child = NULL;
 | 
	
		
			
				|  |  | -      gpr_mu_unlock(&chand->mu);
 | 
	
		
			
				|  |  | -      if (child_channel) {
 | 
	
		
			
				|  |  | -        grpc_child_channel_handle_op(child_channel, op);
 | 
	
		
			
				|  |  | -        grpc_child_channel_destroy(child_channel, 1);
 | 
	
		
			
				|  |  | -      } else {
 | 
	
		
			
				|  |  | -        gpr_slice_unref(op->data.goaway.message);
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | -      break;
 | 
	
		
			
				|  |  | -    case GRPC_CHANNEL_DISCONNECT:
 | 
	
		
			
				|  |  | -      /* sending disconnect: clear out the active child on the way through */
 | 
	
		
			
				|  |  | -      gpr_mu_lock(&chand->mu);
 | 
	
		
			
				|  |  | -      child_channel = chand->active_child;
 | 
	
		
			
				|  |  | -      chand->active_child = NULL;
 | 
	
		
			
				|  |  | -      gpr_mu_unlock(&chand->mu);
 | 
	
		
			
				|  |  | -      if (child_channel) {
 | 
	
		
			
				|  |  | -        grpc_child_channel_destroy(child_channel, 1);
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | -      /* fake a transport closed to satisfy the refcounting in client */
 | 
	
		
			
				|  |  | -      rop.type = GRPC_TRANSPORT_CLOSED;
 | 
	
		
			
				|  |  | -      rop.dir = GRPC_CALL_UP;
 | 
	
		
			
				|  |  | -      grpc_channel_next_op(elem, &rop);
 | 
	
		
			
				|  |  | -      break;
 | 
	
		
			
				|  |  | -    case GRPC_TRANSPORT_GOAWAY:
 | 
	
		
			
				|  |  | -      /* receiving goaway: if it's from our active child, drop the active child;
 | 
	
		
			
				|  |  | -         in all cases consume the event here */
 | 
	
		
			
				|  |  | -      gpr_mu_lock(&chand->mu);
 | 
	
		
			
				|  |  | -      child_channel = grpc_channel_stack_from_top_element(from_elem);
 | 
	
		
			
				|  |  | -      if (child_channel == chand->active_child) {
 | 
	
		
			
				|  |  | -        chand->active_child = NULL;
 | 
	
		
			
				|  |  | -      } else {
 | 
	
		
			
				|  |  | -        child_channel = NULL;
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | -      gpr_mu_unlock(&chand->mu);
 | 
	
		
			
				|  |  | -      if (child_channel) {
 | 
	
		
			
				|  |  | -        grpc_child_channel_destroy(child_channel, 0);
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | -      gpr_slice_unref(op->data.goaway.message);
 | 
	
		
			
				|  |  | -      break;
 | 
	
		
			
				|  |  | -    case GRPC_TRANSPORT_CLOSED:
 | 
	
		
			
				|  |  | -      /* receiving disconnect: if it's from our active child, drop the active
 | 
	
		
			
				|  |  | -         child; in all cases consume the event here */
 | 
	
		
			
				|  |  | -      gpr_mu_lock(&chand->mu);
 | 
	
		
			
				|  |  | -      child_channel = grpc_channel_stack_from_top_element(from_elem);
 | 
	
		
			
				|  |  | -      if (child_channel == chand->active_child) {
 | 
	
		
			
				|  |  | -        chand->active_child = NULL;
 | 
	
		
			
				|  |  | -      } else {
 | 
	
		
			
				|  |  | -        child_channel = NULL;
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | -      gpr_mu_unlock(&chand->mu);
 | 
	
		
			
				|  |  | -      if (child_channel) {
 | 
	
		
			
				|  |  | -        grpc_child_channel_destroy(child_channel, 0);
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | -      break;
 | 
	
		
			
				|  |  | -    default:
 | 
	
		
			
				|  |  | -      switch (op->dir) {
 | 
	
		
			
				|  |  | -        case GRPC_CALL_UP:
 | 
	
		
			
				|  |  | -          grpc_channel_next_op(elem, op);
 | 
	
		
			
				|  |  | -          break;
 | 
	
		
			
				|  |  | -        case GRPC_CALL_DOWN:
 | 
	
		
			
				|  |  | -          gpr_log(GPR_ERROR, "unhandled channel op: %d", op->type);
 | 
	
		
			
				|  |  | -          abort();
 | 
	
		
			
				|  |  | -          break;
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | -      break;
 | 
	
		
			
				|  |  | +  if (op->disconnect && chand->resolver != NULL) {
 | 
	
		
			
				|  |  | +    grpc_connectivity_state_set(&chand->state_tracker,
 | 
	
		
			
				|  |  | +                                GRPC_CHANNEL_FATAL_FAILURE);
 | 
	
		
			
				|  |  | +    destroy_resolver = chand->resolver;
 | 
	
		
			
				|  |  | +    chand->resolver = NULL;
 | 
	
		
			
				|  |  | +    if (chand->lb_policy != NULL) {
 | 
	
		
			
				|  |  | +      grpc_lb_policy_shutdown(chand->lb_policy);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  if (!is_empty(op, sizeof(*op))) {
 | 
	
		
			
				|  |  | +    lb_policy = chand->lb_policy;
 | 
	
		
			
				|  |  | +    if (lb_policy) {
 | 
	
		
			
				|  |  | +      GRPC_LB_POLICY_REF(lb_policy, "broadcast");
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  gpr_mu_unlock(&chand->mu_config);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  if (destroy_resolver) {
 | 
	
		
			
				|  |  | +    grpc_resolver_shutdown(destroy_resolver);
 | 
	
		
			
				|  |  | +    GRPC_RESOLVER_UNREF(destroy_resolver, "channel");
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  if (lb_policy) {
 | 
	
		
			
				|  |  | +    grpc_lb_policy_broadcast(lb_policy, op);
 | 
	
		
			
				|  |  | +    GRPC_LB_POLICY_UNREF(lb_policy, "broadcast");
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  if (on_consumed) {
 | 
	
		
			
				|  |  | +    grpc_iomgr_add_callback(on_consumed);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  /* Constructor for call_data */
 | 
	
		
			
				|  |  |  static void init_call_elem(grpc_call_element *elem,
 | 
	
		
			
				|  |  |                             const void *server_transport_data,
 | 
	
		
			
				|  |  | -                           grpc_transport_op *initial_op) {
 | 
	
		
			
				|  |  | +                           grpc_transport_stream_op *initial_op) {
 | 
	
		
			
				|  |  |    call_data *calld = elem->call_data;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    /* TODO(ctiller): is there something useful we can do here? */
 | 
	
	
		
			
				|  | @@ -376,6 +515,7 @@ static void init_call_elem(grpc_call_element *elem,
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
 | 
	
		
			
				|  |  |    GPR_ASSERT(server_transport_data == NULL);
 | 
	
		
			
				|  |  | +  gpr_mu_init(&calld->mu_state);
 | 
	
		
			
				|  |  |    calld->elem = elem;
 | 
	
		
			
				|  |  |    calld->state = CALL_CREATED;
 | 
	
		
			
				|  |  |    calld->deadline = gpr_inf_future;
 | 
	
	
		
			
				|  | @@ -384,161 +524,88 @@ static void init_call_elem(grpc_call_element *elem,
 | 
	
		
			
				|  |  |  /* Destructor for call_data */
 | 
	
		
			
				|  |  |  static void destroy_call_elem(grpc_call_element *elem) {
 | 
	
		
			
				|  |  |    call_data *calld = elem->call_data;
 | 
	
		
			
				|  |  | -  channel_data *chand = elem->channel_data;
 | 
	
		
			
				|  |  | +  grpc_subchannel_call *subchannel_call;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    /* if the call got activated, we need to destroy the child stack also, and
 | 
	
		
			
				|  |  |       remove it from the in-flight requests tracked by the child_entry we
 | 
	
		
			
				|  |  |       picked */
 | 
	
		
			
				|  |  | -  gpr_mu_lock(&chand->mu);
 | 
	
		
			
				|  |  | +  gpr_mu_lock(&calld->mu_state);
 | 
	
		
			
				|  |  |    switch (calld->state) {
 | 
	
		
			
				|  |  |      case CALL_ACTIVE:
 | 
	
		
			
				|  |  | -      gpr_mu_unlock(&chand->mu);
 | 
	
		
			
				|  |  | -      grpc_child_call_destroy(calld->s.active.child_call);
 | 
	
		
			
				|  |  | +      subchannel_call = calld->subchannel_call;
 | 
	
		
			
				|  |  | +      gpr_mu_unlock(&calld->mu_state);
 | 
	
		
			
				|  |  | +      GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "client_channel");
 | 
	
		
			
				|  |  |        break;
 | 
	
		
			
				|  |  | -    case CALL_WAITING:
 | 
	
		
			
				|  |  | -      remove_waiting_child(chand, calld);
 | 
	
		
			
				|  |  | -      gpr_mu_unlock(&chand->mu);
 | 
	
		
			
				|  |  | +    case CALL_CREATED:
 | 
	
		
			
				|  |  | +    case CALL_CANCELLED:
 | 
	
		
			
				|  |  | +      gpr_mu_unlock(&calld->mu_state);
 | 
	
		
			
				|  |  |        break;
 | 
	
		
			
				|  |  | -    default:
 | 
	
		
			
				|  |  | -      gpr_mu_unlock(&chand->mu);
 | 
	
		
			
				|  |  | +    case CALL_WAITING_FOR_PICK:
 | 
	
		
			
				|  |  | +    case CALL_WAITING_FOR_CONFIG:
 | 
	
		
			
				|  |  | +    case CALL_WAITING_FOR_CALL:
 | 
	
		
			
				|  |  | +    case CALL_WAITING_FOR_SEND:
 | 
	
		
			
				|  |  | +      gpr_log(GPR_ERROR, "should never reach here");
 | 
	
		
			
				|  |  | +      abort();
 | 
	
		
			
				|  |  |        break;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  GPR_ASSERT(calld->state != CALL_WAITING);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  /* Constructor for channel_data */
 | 
	
		
			
				|  |  | -static void init_channel_elem(grpc_channel_element *elem,
 | 
	
		
			
				|  |  | +static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
 | 
	
		
			
				|  |  |                                const grpc_channel_args *args,
 | 
	
		
			
				|  |  |                                grpc_mdctx *metadata_context, int is_first,
 | 
	
		
			
				|  |  |                                int is_last) {
 | 
	
		
			
				|  |  |    channel_data *chand = elem->channel_data;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  GPR_ASSERT(!is_first);
 | 
	
		
			
				|  |  | +  memset(chand, 0, sizeof(*chand));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    GPR_ASSERT(is_last);
 | 
	
		
			
				|  |  |    GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  gpr_mu_init(&chand->mu);
 | 
	
		
			
				|  |  | -  chand->active_child = NULL;
 | 
	
		
			
				|  |  | -  chand->waiting_children = NULL;
 | 
	
		
			
				|  |  | -  chand->waiting_child_count = 0;
 | 
	
		
			
				|  |  | -  chand->waiting_child_capacity = 0;
 | 
	
		
			
				|  |  | -  chand->transport_setup = NULL;
 | 
	
		
			
				|  |  | -  chand->transport_setup_initiated = 0;
 | 
	
		
			
				|  |  | -  chand->args = grpc_channel_args_copy(args);
 | 
	
		
			
				|  |  | +  gpr_mu_init(&chand->mu_config);
 | 
	
		
			
				|  |  |    chand->mdctx = metadata_context;
 | 
	
		
			
				|  |  | +  chand->master = master;
 | 
	
		
			
				|  |  | +  grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed,
 | 
	
		
			
				|  |  | +                          chand);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  /* Destructor for channel_data */
 | 
	
		
			
				|  |  |  static void destroy_channel_elem(grpc_channel_element *elem) {
 | 
	
		
			
				|  |  |    channel_data *chand = elem->channel_data;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  grpc_transport_setup_cancel(chand->transport_setup);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  if (chand->active_child) {
 | 
	
		
			
				|  |  | -    grpc_child_channel_destroy(chand->active_child, 1);
 | 
	
		
			
				|  |  | -    chand->active_child = NULL;
 | 
	
		
			
				|  |  | +  if (chand->resolver != NULL) {
 | 
	
		
			
				|  |  | +    grpc_resolver_shutdown(chand->resolver);
 | 
	
		
			
				|  |  | +    GRPC_RESOLVER_UNREF(chand->resolver, "channel");
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  grpc_channel_args_destroy(chand->args);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  gpr_mu_destroy(&chand->mu);
 | 
	
		
			
				|  |  | -  GPR_ASSERT(chand->waiting_child_count == 0);
 | 
	
		
			
				|  |  | -  gpr_free(chand->waiting_children);
 | 
	
		
			
				|  |  | +  if (chand->lb_policy != NULL) {
 | 
	
		
			
				|  |  | +    GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel");
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  gpr_mu_destroy(&chand->mu_config);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  const grpc_channel_filter grpc_client_channel_filter = {
 | 
	
		
			
				|  |  | -    cc_start_transport_op, channel_op,           sizeof(call_data),
 | 
	
		
			
				|  |  | -    init_call_elem,        destroy_call_elem,    sizeof(channel_data),
 | 
	
		
			
				|  |  | -    init_channel_elem,     destroy_channel_elem, "client-channel",
 | 
	
		
			
				|  |  | +    cc_start_transport_stream_op,
 | 
	
		
			
				|  |  | +    cc_start_transport_op,
 | 
	
		
			
				|  |  | +    sizeof(call_data),
 | 
	
		
			
				|  |  | +    init_call_elem,
 | 
	
		
			
				|  |  | +    destroy_call_elem,
 | 
	
		
			
				|  |  | +    sizeof(channel_data),
 | 
	
		
			
				|  |  | +    init_channel_elem,
 | 
	
		
			
				|  |  | +    destroy_channel_elem,
 | 
	
		
			
				|  |  | +    "client-channel",
 | 
	
		
			
				|  |  |  };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
 | 
	
		
			
				|  |  | -    grpc_channel_stack *channel_stack, grpc_transport *transport,
 | 
	
		
			
				|  |  | -    grpc_channel_filter const **channel_filters, size_t num_channel_filters,
 | 
	
		
			
				|  |  | -    grpc_mdctx *mdctx) {
 | 
	
		
			
				|  |  | -  /* we just got a new transport: lets create a child channel stack for it */
 | 
	
		
			
				|  |  | -  grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
 | 
	
		
			
				|  |  | -  channel_data *chand = elem->channel_data;
 | 
	
		
			
				|  |  | -  size_t num_child_filters = 2 + num_channel_filters;
 | 
	
		
			
				|  |  | -  grpc_channel_filter const **child_filters;
 | 
	
		
			
				|  |  | -  grpc_transport_setup_result result;
 | 
	
		
			
				|  |  | -  grpc_child_channel *old_active = NULL;
 | 
	
		
			
				|  |  | -  call_data **waiting_children;
 | 
	
		
			
				|  |  | -  size_t waiting_child_count;
 | 
	
		
			
				|  |  | -  size_t i;
 | 
	
		
			
				|  |  | -  grpc_transport_op *call_ops;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  /* build the child filter stack */
 | 
	
		
			
				|  |  | -  child_filters = gpr_malloc(sizeof(grpc_channel_filter *) * num_child_filters);
 | 
	
		
			
				|  |  | -  /* we always need a link back filter to get back to the connected channel */
 | 
	
		
			
				|  |  | -  child_filters[0] = &grpc_child_channel_top_filter;
 | 
	
		
			
				|  |  | -  for (i = 0; i < num_channel_filters; i++) {
 | 
	
		
			
				|  |  | -    child_filters[i + 1] = channel_filters[i];
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  /* and we always need a connected channel to talk to the transport */
 | 
	
		
			
				|  |  | -  child_filters[num_child_filters - 1] = &grpc_connected_channel_filter;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  /* BEGIN LOCKING CHANNEL */
 | 
	
		
			
				|  |  | -  gpr_mu_lock(&chand->mu);
 | 
	
		
			
				|  |  | -  chand->transport_setup_initiated = 0;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  if (chand->active_child) {
 | 
	
		
			
				|  |  | -    old_active = chand->active_child;
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  chand->active_child = grpc_child_channel_create(
 | 
	
		
			
				|  |  | -      elem, child_filters, num_child_filters, chand->args, mdctx);
 | 
	
		
			
				|  |  | -  result =
 | 
	
		
			
				|  |  | -      grpc_connected_channel_bind_transport(chand->active_child, transport);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  /* capture the waiting children - we'll activate them outside the lock
 | 
	
		
			
				|  |  | -     to avoid re-entrancy problems */
 | 
	
		
			
				|  |  | -  waiting_children = chand->waiting_children;
 | 
	
		
			
				|  |  | -  waiting_child_count = chand->waiting_child_count;
 | 
	
		
			
				|  |  | -  /* bumping up inflight_requests here avoids taking a lock per rpc below */
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  chand->waiting_children = NULL;
 | 
	
		
			
				|  |  | -  chand->waiting_child_count = 0;
 | 
	
		
			
				|  |  | -  chand->waiting_child_capacity = 0;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  call_ops = gpr_malloc(sizeof(*call_ops) * waiting_child_count);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  for (i = 0; i < waiting_child_count; i++) {
 | 
	
		
			
				|  |  | -    call_ops[i] = waiting_children[i]->s.waiting_op;
 | 
	
		
			
				|  |  | -    if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) {
 | 
	
		
			
				|  |  | -      waiting_children[i] = NULL;
 | 
	
		
			
				|  |  | -      grpc_transport_op_finish_with_failure(&call_ops[i]);
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  /* END LOCKING CHANNEL */
 | 
	
		
			
				|  |  | -  gpr_mu_unlock(&chand->mu);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  /* activate any pending operations - this is safe to do as we guarantee one
 | 
	
		
			
				|  |  | -     and only one write operation per request at the surface api - if we lose
 | 
	
		
			
				|  |  | -     that guarantee we need to do some curly locking here */
 | 
	
		
			
				|  |  | -  for (i = 0; i < waiting_child_count; i++) {
 | 
	
		
			
				|  |  | -    if (waiting_children[i]) {
 | 
	
		
			
				|  |  | -      complete_activate(waiting_children[i]->elem, &call_ops[i]);
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  gpr_free(waiting_children);
 | 
	
		
			
				|  |  | -  gpr_free(call_ops);
 | 
	
		
			
				|  |  | -  gpr_free(child_filters);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  if (old_active) {
 | 
	
		
			
				|  |  | -    grpc_child_channel_destroy(old_active, 1);
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  return result;
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -void grpc_client_channel_set_transport_setup(grpc_channel_stack *channel_stack,
 | 
	
		
			
				|  |  | -                                             grpc_transport_setup *setup) {
 | 
	
		
			
				|  |  | +void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
 | 
	
		
			
				|  |  | +                                      grpc_resolver *resolver) {
 | 
	
		
			
				|  |  |    /* post construction initialization: set the transport setup pointer */
 | 
	
		
			
				|  |  |    grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
 | 
	
		
			
				|  |  |    channel_data *chand = elem->channel_data;
 | 
	
		
			
				|  |  | -  GPR_ASSERT(!chand->transport_setup);
 | 
	
		
			
				|  |  | -  chand->transport_setup = setup;
 | 
	
		
			
				|  |  | +  GPR_ASSERT(!chand->resolver);
 | 
	
		
			
				|  |  | +  chand->resolver = resolver;
 | 
	
		
			
				|  |  | +  GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
 | 
	
		
			
				|  |  | +  GRPC_RESOLVER_REF(resolver, "channel");
 | 
	
		
			
				|  |  | +  grpc_resolver_next(resolver, &chand->incoming_configuration,
 | 
	
		
			
				|  |  | +                     &chand->on_config_changed);
 | 
	
		
			
				|  |  |  }
 |