|  | @@ -1,6 +1,6 @@
 | 
	
		
			
				|  |  |  /*
 | 
	
		
			
				|  |  |   *
 | 
	
		
			
				|  |  | - * Copyright 2015, Google Inc.
 | 
	
		
			
				|  |  | + * Copyright 2015-2016, Google Inc.
 | 
	
		
			
				|  |  |   * All rights reserved.
 | 
	
		
			
				|  |  |   *
 | 
	
		
			
				|  |  |   * Redistribution and use in source and binary forms, with or without
 | 
	
	
		
			
				|  | @@ -55,12 +55,11 @@ typedef struct {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    grpc_closure connectivity_changed;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +  /** the selected channel (a grpc_connected_subchannel) */
 | 
	
		
			
				|  |  | +  gpr_atm selected;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    /** mutex protecting remaining members */
 | 
	
		
			
				|  |  |    gpr_mu mu;
 | 
	
		
			
				|  |  | -  /** the selected channel
 | 
	
		
			
				|  |  | -      TODO(ctiller): this should be atomically set so we don't
 | 
	
		
			
				|  |  | -                     need to take a mutex in the common case */
 | 
	
		
			
				|  |  | -  grpc_connected_subchannel *selected;
 | 
	
		
			
				|  |  |    /** have we started picking? */
 | 
	
		
			
				|  |  |    int started_picking;
 | 
	
		
			
				|  |  |    /** are we shut down? */
 | 
	
	
		
			
				|  | @@ -76,15 +75,19 @@ typedef struct {
 | 
	
		
			
				|  |  |    grpc_connectivity_state_tracker state_tracker;
 | 
	
		
			
				|  |  |  } pick_first_lb_policy;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +#define GET_SELECTED(p) \
 | 
	
		
			
				|  |  | +  ((grpc_connected_subchannel *)gpr_atm_no_barrier_load(&(p)->selected))
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
 | 
	
		
			
				|  |  |    pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
 | 
	
		
			
				|  |  | +  grpc_connected_subchannel *selected = GET_SELECTED(p);
 | 
	
		
			
				|  |  |    size_t i;
 | 
	
		
			
				|  |  |    GPR_ASSERT(p->pending_picks == NULL);
 | 
	
		
			
				|  |  |    for (i = 0; i < p->num_subchannels; i++) {
 | 
	
		
			
				|  |  |      GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first");
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  if (p->selected) {
 | 
	
		
			
				|  |  | -    GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected, "picked_first");
 | 
	
		
			
				|  |  | +  if (selected != NULL) {
 | 
	
		
			
				|  |  | +    GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, selected, "picked_first");
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
 | 
	
		
			
				|  |  |    gpr_free(p->subchannels);
 | 
	
	
		
			
				|  | @@ -95,16 +98,18 @@ void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
 | 
	
		
			
				|  |  |  void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
 | 
	
		
			
				|  |  |    pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
 | 
	
		
			
				|  |  |    pending_pick *pp;
 | 
	
		
			
				|  |  | +  grpc_connected_subchannel *selected;
 | 
	
		
			
				|  |  |    gpr_mu_lock(&p->mu);
 | 
	
		
			
				|  |  | +  selected = GET_SELECTED(p);
 | 
	
		
			
				|  |  |    p->shutdown = 1;
 | 
	
		
			
				|  |  |    pp = p->pending_picks;
 | 
	
		
			
				|  |  |    p->pending_picks = NULL;
 | 
	
		
			
				|  |  |    grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
 | 
	
		
			
				|  |  |                                GRPC_CHANNEL_FATAL_FAILURE, "shutdown");
 | 
	
		
			
				|  |  |    /* cancel subscription */
 | 
	
		
			
				|  |  | -  if (p->selected != NULL) {
 | 
	
		
			
				|  |  | +  if (selected != NULL) {
 | 
	
		
			
				|  |  |      grpc_connected_subchannel_notify_on_state_change(
 | 
	
		
			
				|  |  | -        exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed);
 | 
	
		
			
				|  |  | +        exec_ctx, selected, NULL, NULL, &p->connectivity_changed);
 | 
	
		
			
				|  |  |    } else {
 | 
	
		
			
				|  |  |      grpc_subchannel_notify_on_state_change(
 | 
	
		
			
				|  |  |          exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL,
 | 
	
	
		
			
				|  | @@ -171,10 +176,20 @@ int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
 | 
	
		
			
				|  |  |              grpc_connected_subchannel **target, grpc_closure *on_complete) {
 | 
	
		
			
				|  |  |    pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
 | 
	
		
			
				|  |  |    pending_pick *pp;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  /* Check atomically for a selected channel */
 | 
	
		
			
				|  |  | +  grpc_connected_subchannel *selected = GET_SELECTED(p);
 | 
	
		
			
				|  |  | +  if (selected != NULL) {
 | 
	
		
			
				|  |  | +    *target = selected;
 | 
	
		
			
				|  |  | +    return 1;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  /* No subchannel selected yet, so acquire lock and then attempt again */
 | 
	
		
			
				|  |  |    gpr_mu_lock(&p->mu);
 | 
	
		
			
				|  |  | -  if (p->selected) {
 | 
	
		
			
				|  |  | +  selected = GET_SELECTED(p);
 | 
	
		
			
				|  |  | +  if (selected) {
 | 
	
		
			
				|  |  |      gpr_mu_unlock(&p->mu);
 | 
	
		
			
				|  |  | -    *target = p->selected;
 | 
	
		
			
				|  |  | +    *target = selected;
 | 
	
		
			
				|  |  |      return 1;
 | 
	
		
			
				|  |  |    } else {
 | 
	
		
			
				|  |  |      if (!p->started_picking) {
 | 
	
	
		
			
				|  | @@ -219,14 +234,17 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
 | 
	
		
			
				|  |  |    pick_first_lb_policy *p = arg;
 | 
	
		
			
				|  |  |    grpc_subchannel *selected_subchannel;
 | 
	
		
			
				|  |  |    pending_pick *pp;
 | 
	
		
			
				|  |  | +  grpc_connected_subchannel *selected;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    gpr_mu_lock(&p->mu);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +  selected = GET_SELECTED(p);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    if (p->shutdown) {
 | 
	
		
			
				|  |  |      gpr_mu_unlock(&p->mu);
 | 
	
		
			
				|  |  |      GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
 | 
	
		
			
				|  |  |      return;
 | 
	
		
			
				|  |  | -  } else if (p->selected != NULL) {
 | 
	
		
			
				|  |  | +  } else if (selected != NULL) {
 | 
	
		
			
				|  |  |      if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
 | 
	
		
			
				|  |  |        /* if the selected channel goes bad, we're done */
 | 
	
		
			
				|  |  |        p->checking_connectivity = GRPC_CHANNEL_FATAL_FAILURE;
 | 
	
	
		
			
				|  | @@ -235,7 +253,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
 | 
	
		
			
				|  |  |                                  p->checking_connectivity, "selected_changed");
 | 
	
		
			
				|  |  |      if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) {
 | 
	
		
			
				|  |  |        grpc_connected_subchannel_notify_on_state_change(
 | 
	
		
			
				|  |  | -          exec_ctx, p->selected, &p->base.interested_parties,
 | 
	
		
			
				|  |  | +          exec_ctx, selected, &p->base.interested_parties,
 | 
	
		
			
				|  |  |            &p->checking_connectivity, &p->connectivity_changed);
 | 
	
		
			
				|  |  |      } else {
 | 
	
		
			
				|  |  |        GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
 | 
	
	
		
			
				|  | @@ -247,10 +265,11 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
 | 
	
		
			
				|  |  |          grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
 | 
	
		
			
				|  |  |                                      GRPC_CHANNEL_READY, "connecting_ready");
 | 
	
		
			
				|  |  |          selected_subchannel = p->subchannels[p->checking_subchannel];
 | 
	
		
			
				|  |  | -        p->selected =
 | 
	
		
			
				|  |  | +        selected =
 | 
	
		
			
				|  |  |              grpc_subchannel_get_connected_subchannel(selected_subchannel);
 | 
	
		
			
				|  |  | -        GPR_ASSERT(p->selected);
 | 
	
		
			
				|  |  | -        GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked_first");
 | 
	
		
			
				|  |  | +        GPR_ASSERT(selected != NULL);
 | 
	
		
			
				|  |  | +        gpr_atm_no_barrier_store(&p->selected, (gpr_atm)selected);
 | 
	
		
			
				|  |  | +        GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked_first");
 | 
	
		
			
				|  |  |          /* drop the pick list: we are connected now */
 | 
	
		
			
				|  |  |          GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels");
 | 
	
		
			
				|  |  |          grpc_exec_ctx_enqueue(exec_ctx,
 | 
	
	
		
			
				|  | @@ -258,14 +277,14 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
 | 
	
		
			
				|  |  |          /* update any calls that were waiting for a pick */
 | 
	
		
			
				|  |  |          while ((pp = p->pending_picks)) {
 | 
	
		
			
				|  |  |            p->pending_picks = pp->next;
 | 
	
		
			
				|  |  | -          *pp->target = p->selected;
 | 
	
		
			
				|  |  | +          *pp->target = selected;
 | 
	
		
			
				|  |  |            grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
 | 
	
		
			
				|  |  |                                         pp->pollset);
 | 
	
		
			
				|  |  |            grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
 | 
	
		
			
				|  |  |            gpr_free(pp);
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |          grpc_connected_subchannel_notify_on_state_change(
 | 
	
		
			
				|  |  | -            exec_ctx, p->selected, &p->base.interested_parties,
 | 
	
		
			
				|  |  | +            exec_ctx, selected, &p->base.interested_parties,
 | 
	
		
			
				|  |  |              &p->checking_connectivity, &p->connectivity_changed);
 | 
	
		
			
				|  |  |          break;
 | 
	
		
			
				|  |  |        case GRPC_CHANNEL_TRANSIENT_FAILURE:
 | 
	
	
		
			
				|  | @@ -351,13 +370,12 @@ void pf_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
 | 
	
		
			
				|  |  |  void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
 | 
	
		
			
				|  |  |                   grpc_closure *closure) {
 | 
	
		
			
				|  |  |    pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
 | 
	
		
			
				|  |  | -  gpr_mu_lock(&p->mu);
 | 
	
		
			
				|  |  | -  if (p->selected) {
 | 
	
		
			
				|  |  | -    grpc_connected_subchannel_ping(exec_ctx, p->selected, closure);
 | 
	
		
			
				|  |  | +  grpc_connected_subchannel *selected = GET_SELECTED(p);
 | 
	
		
			
				|  |  | +  if (selected) {
 | 
	
		
			
				|  |  | +    grpc_connected_subchannel_ping(exec_ctx, selected, closure);
 | 
	
		
			
				|  |  |    } else {
 | 
	
		
			
				|  |  |      grpc_exec_ctx_enqueue(exec_ctx, closure, 0);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  gpr_mu_unlock(&p->mu);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
 |