|  | @@ -76,9 +76,9 @@
 | 
	
		
			
				|  |  |   *    operations in progress over the old RR instance. This is done by
 | 
	
		
			
				|  |  |   *    decreasing the reference count on the old policy. The moment no more
 | 
	
		
			
				|  |  |   *    references are held on the old RR policy, it'll be destroyed and \a
 | 
	
		
			
				|  |  | - *    rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN state.
 | 
	
		
			
				|  |  | - *    At this point we can transition to a new RR instance safely, which is done
 | 
	
		
			
				|  |  | - *    once again via \a rr_handover().
 | 
	
		
			
				|  |  | + *    glb_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN
 | 
	
		
			
				|  |  | + *    state. At this point we can transition to a new RR instance safely, which
 | 
	
		
			
				|  |  | + *    is done once again via \a rr_handover().
 | 
	
		
			
				|  |  |   *
 | 
	
		
			
				|  |  |   *
 | 
	
		
			
				|  |  |   * Once a RR policy instance is in place (and getting updated as described),
 | 
	
	
		
			
				|  | @@ -96,6 +96,8 @@
 | 
	
		
			
				|  |  |   * - Implement LB service forwarding (point 2c. in the doc's diagram).
 | 
	
		
			
				|  |  |   */
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +#include <errno.h>
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  #include <string.h>
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  #include <grpc/byte_buffer_reader.h>
 | 
	
	
		
			
				|  | @@ -109,18 +111,57 @@
 | 
	
		
			
				|  |  |  #include "src/core/ext/client_config/parse_address.h"
 | 
	
		
			
				|  |  |  #include "src/core/ext/lb_policy/grpclb/grpclb.h"
 | 
	
		
			
				|  |  |  #include "src/core/ext/lb_policy/grpclb/load_balancer_api.h"
 | 
	
		
			
				|  |  | +#include "src/core/lib/iomgr/sockaddr.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/iomgr/sockaddr_utils.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/support/string.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/surface/call.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/surface/channel.h"
 | 
	
		
			
				|  |  | +#include "src/core/lib/transport/static_metadata.h"
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  int grpc_lb_glb_trace = 0;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +static void lb_addrs_destroy(grpc_lb_address *lb_addresses,
 | 
	
		
			
				|  |  | +                             size_t num_addresses) {
 | 
	
		
			
				|  |  | +  /* free "resolved" addresses memblock */
 | 
	
		
			
				|  |  | +  gpr_free(lb_addresses->resolved_address);
 | 
	
		
			
				|  |  | +  for (size_t i = 0; i < num_addresses; ++i) {
 | 
	
		
			
				|  |  | +    if (lb_addresses[i].user_data != NULL) {
 | 
	
		
			
				|  |  | +      GRPC_MDELEM_UNREF(lb_addresses[i].user_data);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  gpr_free(lb_addresses);
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +/* add lb_token of selected subchannel (address) to the call's initial
 | 
	
		
			
				|  |  | + * metadata */
 | 
	
		
			
				|  |  | +static void initial_metadata_add_lb_token(
 | 
	
		
			
				|  |  | +    grpc_metadata_batch *initial_metadata,
 | 
	
		
			
				|  |  | +    grpc_linked_mdelem *lb_token_mdelem_storage, grpc_mdelem *lb_token) {
 | 
	
		
			
				|  |  | +  GPR_ASSERT(lb_token_mdelem_storage != NULL);
 | 
	
		
			
				|  |  | +  GPR_ASSERT(lb_token != NULL);
 | 
	
		
			
				|  |  | +  grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage,
 | 
	
		
			
				|  |  | +                               lb_token);
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  typedef struct wrapped_rr_closure_arg {
 | 
	
		
			
				|  |  |    /* the original closure. Usually a on_complete/notify cb for pick() and ping()
 | 
	
		
			
				|  |  |     * calls against the internal RR instance, respectively. */
 | 
	
		
			
				|  |  |    grpc_closure *wrapped_closure;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +  /* the pick's initial metadata, kept in order to append the LB token for the
 | 
	
		
			
				|  |  | +   * pick */
 | 
	
		
			
				|  |  | +  grpc_metadata_batch *initial_metadata;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  /* the picked target, used to determine which LB token to add to the pick's
 | 
	
		
			
				|  |  | +   * initial metadata */
 | 
	
		
			
				|  |  | +  grpc_connected_subchannel **target;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  /* the LB token associated with the pick */
 | 
	
		
			
				|  |  | +  grpc_mdelem *lb_token;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  /* storage for the lb token initial metadata mdelem */
 | 
	
		
			
				|  |  | +  grpc_linked_mdelem *lb_token_mdelem_storage;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    /* The RR instance related to the closure */
 | 
	
		
			
				|  |  |    grpc_lb_policy *rr_policy;
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -141,9 +182,20 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
 | 
	
		
			
				|  |  |                (intptr_t)wc_arg->rr_policy);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |      GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "wrapped_rr_closure");
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    /* if target is NULL, no pick has been made by the RR policy (eg, all
 | 
	
		
			
				|  |  | +     * addresses failed to connect). There won't be any user_data/token
 | 
	
		
			
				|  |  | +     * available */
 | 
	
		
			
				|  |  | +    if (wc_arg->target != NULL) {
 | 
	
		
			
				|  |  | +      initial_metadata_add_lb_token(wc_arg->initial_metadata,
 | 
	
		
			
				|  |  | +                                    wc_arg->lb_token_mdelem_storage,
 | 
	
		
			
				|  |  | +                                    GRPC_MDELEM_REF(wc_arg->lb_token));
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    GPR_ASSERT(wc_arg->wrapped_closure != NULL);
 | 
	
		
			
				|  |  | -  grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, error, NULL);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error),
 | 
	
		
			
				|  |  | +                      NULL);
 | 
	
		
			
				|  |  |    gpr_free(wc_arg->owning_pending_node);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -164,6 +216,9 @@ typedef struct pending_pick {
 | 
	
		
			
				|  |  |    /* the initial metadata for the pick. See grpc_lb_policy_pick() */
 | 
	
		
			
				|  |  |    grpc_metadata_batch *initial_metadata;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +  /* storage for the lb token initial metadata mdelem */
 | 
	
		
			
				|  |  | +  grpc_linked_mdelem *lb_token_mdelem_storage;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    /* bitmask passed to pick() and used for selective cancelling. See
 | 
	
		
			
				|  |  |     * grpc_lb_policy_cancel_picks() */
 | 
	
		
			
				|  |  |    uint32_t initial_metadata_flags;
 | 
	
	
		
			
				|  | @@ -180,20 +235,24 @@ typedef struct pending_pick {
 | 
	
		
			
				|  |  |    wrapped_rr_closure_arg wrapped_on_complete_arg;
 | 
	
		
			
				|  |  |  } pending_pick;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void add_pending_pick(pending_pick **root, grpc_polling_entity *pollent,
 | 
	
		
			
				|  |  | -                             grpc_metadata_batch *initial_metadata,
 | 
	
		
			
				|  |  | -                             uint32_t initial_metadata_flags,
 | 
	
		
			
				|  |  | +static void add_pending_pick(pending_pick **root,
 | 
	
		
			
				|  |  | +                             const grpc_lb_policy_pick_args *pick_args,
 | 
	
		
			
				|  |  |                               grpc_connected_subchannel **target,
 | 
	
		
			
				|  |  |                               grpc_closure *on_complete) {
 | 
	
		
			
				|  |  |    pending_pick *pp = gpr_malloc(sizeof(*pp));
 | 
	
		
			
				|  |  |    memset(pp, 0, sizeof(pending_pick));
 | 
	
		
			
				|  |  |    memset(&pp->wrapped_on_complete_arg, 0, sizeof(wrapped_rr_closure_arg));
 | 
	
		
			
				|  |  |    pp->next = *root;
 | 
	
		
			
				|  |  | -  pp->pollent = pollent;
 | 
	
		
			
				|  |  | +  pp->pollent = pick_args->pollent;
 | 
	
		
			
				|  |  |    pp->target = target;
 | 
	
		
			
				|  |  | -  pp->initial_metadata = initial_metadata;
 | 
	
		
			
				|  |  | -  pp->initial_metadata_flags = initial_metadata_flags;
 | 
	
		
			
				|  |  | +  pp->initial_metadata = pick_args->initial_metadata;
 | 
	
		
			
				|  |  | +  pp->initial_metadata_flags = pick_args->initial_metadata_flags;
 | 
	
		
			
				|  |  | +  pp->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
 | 
	
		
			
				|  |  |    pp->wrapped_on_complete_arg.wrapped_closure = on_complete;
 | 
	
		
			
				|  |  | +  pp->wrapped_on_complete_arg.target = target;
 | 
	
		
			
				|  |  | +  pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata;
 | 
	
		
			
				|  |  | +  pp->wrapped_on_complete_arg.lb_token_mdelem_storage =
 | 
	
		
			
				|  |  | +      pick_args->lb_token_mdelem_storage;
 | 
	
		
			
				|  |  |    grpc_closure_init(&pp->wrapped_on_complete, wrapped_rr_closure,
 | 
	
		
			
				|  |  |                      &pp->wrapped_on_complete_arg);
 | 
	
		
			
				|  |  |    *root = pp;
 | 
	
	
		
			
				|  | @@ -253,6 +312,12 @@ typedef struct glb_lb_policy {
 | 
	
		
			
				|  |  |     * response has arrived. */
 | 
	
		
			
				|  |  |    grpc_grpclb_serverlist *serverlist;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +  /** total number of valid addresses received in \a serverlist */
 | 
	
		
			
				|  |  | +  size_t num_ok_serverlist_addresses;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  /** LB addresses from \a serverlist, \a num_ok_serverlist_addresses of them */
 | 
	
		
			
				|  |  | +  grpc_lb_address *lb_addresses;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    /** list of picks that are waiting on RR's policy connectivity */
 | 
	
		
			
				|  |  |    pending_pick *pending_picks;
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -280,59 +345,143 @@ struct rr_connectivity_data {
 | 
	
		
			
				|  |  |    glb_lb_policy *glb_policy;
 | 
	
		
			
				|  |  |  };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  | -                                 const grpc_grpclb_serverlist *serverlist,
 | 
	
		
			
				|  |  | -                                 glb_lb_policy *glb_policy) {
 | 
	
		
			
				|  |  | -  /* TODO(dgq): support mixed ip version */
 | 
	
		
			
				|  |  | -  GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0);
 | 
	
		
			
				|  |  | -  char **host_ports = gpr_malloc(sizeof(char *) * serverlist->num_servers);
 | 
	
		
			
				|  |  | -  for (size_t i = 0; i < serverlist->num_servers; ++i) {
 | 
	
		
			
				|  |  | -    gpr_join_host_port(&host_ports[i], serverlist->servers[i]->ip_address,
 | 
	
		
			
				|  |  | -                       serverlist->servers[i]->port);
 | 
	
		
			
				|  |  | +static bool is_server_valid(const grpc_grpclb_server *server, size_t idx,
 | 
	
		
			
				|  |  | +                            bool log) {
 | 
	
		
			
				|  |  | +  const grpc_grpclb_ip_address *ip = &server->ip_address;
 | 
	
		
			
				|  |  | +  if (server->port >> 16 != 0) {
 | 
	
		
			
				|  |  | +    if (log) {
 | 
	
		
			
				|  |  | +      gpr_log(GPR_ERROR,
 | 
	
		
			
				|  |  | +              "Invalid port '%d' at index %zu of serverlist. Ignoring.",
 | 
	
		
			
				|  |  | +              server->port, idx);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    return false;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  size_t uri_path_len;
 | 
	
		
			
				|  |  | -  char *concat_ipports = gpr_strjoin_sep(
 | 
	
		
			
				|  |  | -      (const char **)host_ports, serverlist->num_servers, ",", &uri_path_len);
 | 
	
		
			
				|  |  | +  if (ip->size != 4 && ip->size != 16) {
 | 
	
		
			
				|  |  | +    if (log) {
 | 
	
		
			
				|  |  | +      gpr_log(GPR_ERROR,
 | 
	
		
			
				|  |  | +              "Expected IP to be 4 or 16 bytes, got %d at index %zu of "
 | 
	
		
			
				|  |  | +              "serverlist. Ignoring",
 | 
	
		
			
				|  |  | +              ip->size, idx);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    return false;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  return true;
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  grpc_lb_policy_args args;
 | 
	
		
			
				|  |  | -  args.server_name = glb_policy->server_name;
 | 
	
		
			
				|  |  | -  args.client_channel_factory = glb_policy->cc_factory;
 | 
	
		
			
				|  |  | -  args.addresses = gpr_malloc(sizeof(grpc_resolved_addresses));
 | 
	
		
			
				|  |  | -  args.addresses->naddrs = serverlist->num_servers;
 | 
	
		
			
				|  |  | -  args.addresses->addrs =
 | 
	
		
			
				|  |  | -      gpr_malloc(sizeof(grpc_resolved_address) * args.addresses->naddrs);
 | 
	
		
			
				|  |  | -  size_t out_addrs_idx = 0;
 | 
	
		
			
				|  |  | +/* populate \a addresses according to \a serverlist. Returns the number of
 | 
	
		
			
				|  |  | + * addresses successfully parsed and added to \a addresses */
 | 
	
		
			
				|  |  | +static size_t process_serverlist(const grpc_grpclb_serverlist *serverlist,
 | 
	
		
			
				|  |  | +                                 grpc_lb_address **lb_addresses) {
 | 
	
		
			
				|  |  | +  size_t num_valid = 0;
 | 
	
		
			
				|  |  | +  /* first pass: count how many are valid in order to allocate the necessary
 | 
	
		
			
				|  |  | +   * memory in a single block */
 | 
	
		
			
				|  |  |    for (size_t i = 0; i < serverlist->num_servers; ++i) {
 | 
	
		
			
				|  |  | -    grpc_uri uri;
 | 
	
		
			
				|  |  | -    struct sockaddr_storage sa;
 | 
	
		
			
				|  |  | -    size_t sa_len;
 | 
	
		
			
				|  |  | -    uri.path = host_ports[i];
 | 
	
		
			
				|  |  | -    if (parse_ipv4(&uri, &sa, &sa_len)) { /* TODO(dgq): add support for ipv6 */
 | 
	
		
			
				|  |  | -      memcpy(args.addresses->addrs[out_addrs_idx].addr, &sa, sa_len);
 | 
	
		
			
				|  |  | -      args.addresses->addrs[out_addrs_idx].len = sa_len;
 | 
	
		
			
				|  |  | -      ++out_addrs_idx;
 | 
	
		
			
				|  |  | +    if (is_server_valid(serverlist->servers[i], i, true)) ++num_valid;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  if (num_valid == 0) {
 | 
	
		
			
				|  |  | +    return 0;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  /* allocate the memory block for the "resolved" addresses. */
 | 
	
		
			
				|  |  | +  grpc_resolved_address *r_addrs_memblock =
 | 
	
		
			
				|  |  | +      gpr_malloc(sizeof(grpc_resolved_address) * num_valid);
 | 
	
		
			
				|  |  | +  memset(r_addrs_memblock, 0, sizeof(grpc_resolved_address) * num_valid);
 | 
	
		
			
				|  |  | +  grpc_lb_address *lb_addrs = gpr_malloc(sizeof(grpc_lb_address) * num_valid);
 | 
	
		
			
				|  |  | +  memset(lb_addrs, 0, sizeof(grpc_lb_address) * num_valid);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  /* second pass: actually populate the addresses and LB tokens (aka user data
 | 
	
		
			
				|  |  | +   * to the outside world) to be read by the RR policy during its creation.
 | 
	
		
			
				|  |  | +   * Given that the validity tests are very cheap, they are performed again
 | 
	
		
			
				|  |  | +   * instead of marking the valid ones during the first pass, as this would
 | 
	
		
			
				|  |  | +   * incurr in an allocation due to the arbitrary number of server */
 | 
	
		
			
				|  |  | +  size_t addr_idx = 0;
 | 
	
		
			
				|  |  | +  for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) {
 | 
	
		
			
				|  |  | +    GPR_ASSERT(addr_idx < num_valid);
 | 
	
		
			
				|  |  | +    const grpc_grpclb_server *server = serverlist->servers[sl_idx];
 | 
	
		
			
				|  |  | +    if (!is_server_valid(serverlist->servers[sl_idx], sl_idx, false)) continue;
 | 
	
		
			
				|  |  | +    grpc_lb_address *const lb_addr = &lb_addrs[addr_idx];
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    /* address processing */
 | 
	
		
			
				|  |  | +    const uint16_t netorder_port = htons((uint16_t)server->port);
 | 
	
		
			
				|  |  | +    /* the addresses are given in binary format (a in(6)_addr struct) in
 | 
	
		
			
				|  |  | +     * server->ip_address.bytes. */
 | 
	
		
			
				|  |  | +    const grpc_grpclb_ip_address *ip = &server->ip_address;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    lb_addr->resolved_address = &r_addrs_memblock[addr_idx];
 | 
	
		
			
				|  |  | +    struct sockaddr_storage *sa =
 | 
	
		
			
				|  |  | +        (struct sockaddr_storage *)lb_addr->resolved_address->addr;
 | 
	
		
			
				|  |  | +    size_t *sa_len = &lb_addr->resolved_address->len;
 | 
	
		
			
				|  |  | +    *sa_len = 0;
 | 
	
		
			
				|  |  | +    if (ip->size == 4) {
 | 
	
		
			
				|  |  | +      struct sockaddr_in *addr4 = (struct sockaddr_in *)sa;
 | 
	
		
			
				|  |  | +      *sa_len = sizeof(struct sockaddr_in);
 | 
	
		
			
				|  |  | +      memset(addr4, 0, *sa_len);
 | 
	
		
			
				|  |  | +      addr4->sin_family = AF_INET;
 | 
	
		
			
				|  |  | +      memcpy(&addr4->sin_addr, ip->bytes, ip->size);
 | 
	
		
			
				|  |  | +      addr4->sin_port = netorder_port;
 | 
	
		
			
				|  |  | +    } else if (ip->size == 16) {
 | 
	
		
			
				|  |  | +      struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)sa;
 | 
	
		
			
				|  |  | +      *sa_len = sizeof(struct sockaddr_in6);
 | 
	
		
			
				|  |  | +      memset(addr6, 0, *sa_len);
 | 
	
		
			
				|  |  | +      addr6->sin6_family = AF_INET;
 | 
	
		
			
				|  |  | +      memcpy(&addr6->sin6_addr, ip->bytes, ip->size);
 | 
	
		
			
				|  |  | +      addr6->sin6_port = netorder_port;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    GPR_ASSERT(*sa_len > 0);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    /* lb token processing */
 | 
	
		
			
				|  |  | +    if (server->has_load_balance_token) {
 | 
	
		
			
				|  |  | +      const size_t lb_token_size =
 | 
	
		
			
				|  |  | +          GPR_ARRAY_SIZE(server->load_balance_token) - 1;
 | 
	
		
			
				|  |  | +      grpc_mdstr *lb_token_mdstr = grpc_mdstr_from_buffer(
 | 
	
		
			
				|  |  | +          (uint8_t *)server->load_balance_token, lb_token_size);
 | 
	
		
			
				|  |  | +      lb_addr->user_data = grpc_mdelem_from_metadata_strings(
 | 
	
		
			
				|  |  | +          GRPC_MDSTR_LOAD_REPORTING_INITIAL, lb_token_mdstr);
 | 
	
		
			
				|  |  |      } else {
 | 
	
		
			
				|  |  | -      gpr_log(GPR_ERROR, "Invalid LB service address '%s', ignoring.",
 | 
	
		
			
				|  |  | -              host_ports[i]);
 | 
	
		
			
				|  |  | +      gpr_log(GPR_ERROR,
 | 
	
		
			
				|  |  | +              "Missing LB token for backend address '%s'. The empty token will "
 | 
	
		
			
				|  |  | +              "be used instead",
 | 
	
		
			
				|  |  | +              grpc_sockaddr_to_uri((struct sockaddr *)sa));
 | 
	
		
			
				|  |  | +      lb_addr->user_data = GRPC_MDELEM_LOAD_REPORTING_INITIAL_EMPTY;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | +    ++addr_idx;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | +  GPR_ASSERT(addr_idx == num_valid);
 | 
	
		
			
				|  |  | +  *lb_addresses = lb_addrs;
 | 
	
		
			
				|  |  | +  return num_valid;
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  | +                                 const grpc_grpclb_serverlist *serverlist,
 | 
	
		
			
				|  |  | +                                 glb_lb_policy *glb_policy) {
 | 
	
		
			
				|  |  | +  GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  grpc_lb_policy_args args;
 | 
	
		
			
				|  |  | +  memset(&args, 0, sizeof(args));
 | 
	
		
			
				|  |  | +  args.client_channel_factory = glb_policy->cc_factory;
 | 
	
		
			
				|  |  | +  args.server_name = glb_policy->server_name;
 | 
	
		
			
				|  |  | +  const size_t num_ok_addresses =
 | 
	
		
			
				|  |  | +      process_serverlist(serverlist, &args.addresses);
 | 
	
		
			
				|  |  | +  args.num_addresses = num_ok_addresses;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  gpr_free(concat_ipports);
 | 
	
		
			
				|  |  | -  for (size_t i = 0; i < serverlist->num_servers; i++) {
 | 
	
		
			
				|  |  | -    gpr_free(host_ports[i]);
 | 
	
		
			
				|  |  | +  if (glb_policy->lb_addresses != NULL) {
 | 
	
		
			
				|  |  | +    /* dispose of the previous version */
 | 
	
		
			
				|  |  | +    lb_addrs_destroy(glb_policy->lb_addresses,
 | 
	
		
			
				|  |  | +                     glb_policy->num_ok_serverlist_addresses);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  gpr_free(host_ports);
 | 
	
		
			
				|  |  | -  gpr_free(args.addresses->addrs);
 | 
	
		
			
				|  |  | -  gpr_free(args.addresses);
 | 
	
		
			
				|  |  | +  glb_policy->num_ok_serverlist_addresses = num_ok_addresses;
 | 
	
		
			
				|  |  | +  glb_policy->lb_addresses = args.addresses;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    return rr;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
 | 
	
		
			
				|  |  |                          grpc_error *error) {
 | 
	
		
			
				|  |  | -  GRPC_ERROR_REF(error);
 | 
	
		
			
				|  |  | +  GPR_ASSERT(glb_policy->serverlist != NULL &&
 | 
	
		
			
				|  |  | +             glb_policy->serverlist->num_servers > 0);
 | 
	
		
			
				|  |  |    glb_policy->rr_policy =
 | 
	
		
			
				|  |  |        create_rr(exec_ctx, glb_policy->serverlist, glb_policy);
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -347,8 +496,8 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
 | 
	
		
			
				|  |  |        exec_ctx, glb_policy->rr_policy, &glb_policy->rr_connectivity->state,
 | 
	
		
			
				|  |  |        &glb_policy->rr_connectivity->on_change);
 | 
	
		
			
				|  |  |    grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
 | 
	
		
			
				|  |  | -                              glb_policy->rr_connectivity->state, error,
 | 
	
		
			
				|  |  | -                              "rr_handover");
 | 
	
		
			
				|  |  | +                              glb_policy->rr_connectivity->state,
 | 
	
		
			
				|  |  | +                              GRPC_ERROR_REF(error), "rr_handover");
 | 
	
		
			
				|  |  |    grpc_lb_policy_exit_idle(exec_ctx, glb_policy->rr_policy);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    /* flush pending ops */
 | 
	
	
		
			
				|  | @@ -361,9 +510,12 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
 | 
	
		
			
				|  |  |        gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "",
 | 
	
		
			
				|  |  |                (intptr_t)glb_policy->rr_policy);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -    grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pp->pollent,
 | 
	
		
			
				|  |  | -                        pp->initial_metadata, pp->initial_metadata_flags,
 | 
	
		
			
				|  |  | -                        pp->target, &pp->wrapped_on_complete);
 | 
	
		
			
				|  |  | +    const grpc_lb_policy_pick_args pick_args = {
 | 
	
		
			
				|  |  | +        pp->pollent, pp->initial_metadata, pp->initial_metadata_flags,
 | 
	
		
			
				|  |  | +        pp->lb_token_mdelem_storage};
 | 
	
		
			
				|  |  | +    grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, &pick_args, pp->target,
 | 
	
		
			
				|  |  | +                        (void **)&pp->wrapped_on_complete_arg.lb_token,
 | 
	
		
			
				|  |  | +                        &pp->wrapped_on_complete);
 | 
	
		
			
				|  |  |      pp->wrapped_on_complete_arg.owning_pending_node = pp;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -380,13 +532,13 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
 | 
	
		
			
				|  |  |                              &pping->wrapped_notify);
 | 
	
		
			
				|  |  |      pping->wrapped_notify_arg.owning_pending_node = pping;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  GRPC_ERROR_UNREF(error);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
 | 
	
		
			
				|  |  | -                                    grpc_error *error) {
 | 
	
		
			
				|  |  | +static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
 | 
	
		
			
				|  |  | +                                        grpc_error *error) {
 | 
	
		
			
				|  |  |    rr_connectivity_data *rr_conn_data = arg;
 | 
	
		
			
				|  |  |    glb_lb_policy *glb_policy = rr_conn_data->glb_policy;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    if (rr_conn_data->state == GRPC_CHANNEL_SHUTDOWN) {
 | 
	
		
			
				|  |  |      if (glb_policy->serverlist != NULL) {
 | 
	
		
			
				|  |  |        /* a RR policy is shutting down but there's a serverlist available ->
 | 
	
	
		
			
				|  | @@ -400,8 +552,8 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
 | 
	
		
			
				|  |  |      if (error == GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  |        /* RR not shutting down. Mimic the RR's policy state */
 | 
	
		
			
				|  |  |        grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
 | 
	
		
			
				|  |  | -                                  rr_conn_data->state, error,
 | 
	
		
			
				|  |  | -                                  "rr_connectivity_changed");
 | 
	
		
			
				|  |  | +                                  rr_conn_data->state, GRPC_ERROR_REF(error),
 | 
	
		
			
				|  |  | +                                  "glb_rr_connectivity_changed");
 | 
	
		
			
				|  |  |        /* resubscribe */
 | 
	
		
			
				|  |  |        grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy,
 | 
	
		
			
				|  |  |                                              &rr_conn_data->state,
 | 
	
	
		
			
				|  | @@ -410,7 +562,6 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
 | 
	
		
			
				|  |  |        gpr_free(rr_conn_data);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  GRPC_ERROR_UNREF(error);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
 | 
	
	
		
			
				|  | @@ -420,32 +571,44 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |    memset(glb_policy, 0, sizeof(*glb_policy));
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    /* All input addresses in args->addresses come from a resolver that claims
 | 
	
		
			
				|  |  | -   * they are LB services. It's the resolver's responsibility to make sure this
 | 
	
		
			
				|  |  | +   * they are LB services. It's the resolver's responsibility to make sure
 | 
	
		
			
				|  |  | +   * this
 | 
	
		
			
				|  |  |     * policy is only instantiated and used in that case.
 | 
	
		
			
				|  |  |     *
 | 
	
		
			
				|  |  |     * Create a client channel over them to communicate with a LB service */
 | 
	
		
			
				|  |  |    glb_policy->server_name = args->server_name;
 | 
	
		
			
				|  |  |    glb_policy->cc_factory = args->client_channel_factory;
 | 
	
		
			
				|  |  |    GPR_ASSERT(glb_policy->cc_factory != NULL);
 | 
	
		
			
				|  |  | -  if (args->addresses->naddrs == 0) {
 | 
	
		
			
				|  |  | +  if (args->num_addresses == 0) {
 | 
	
		
			
				|  |  |      return NULL;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  /* construct a target from the args->addresses, in the form
 | 
	
		
			
				|  |  | +  if (args->addresses[0].user_data != NULL) {
 | 
	
		
			
				|  |  | +    gpr_log(GPR_ERROR,
 | 
	
		
			
				|  |  | +            "This LB policy doesn't support user data. It will be ignored");
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  /* construct a target from the addresses in args, given in the form
 | 
	
		
			
				|  |  |     * ipvX://ip1:port1,ip2:port2,...
 | 
	
		
			
				|  |  |     * TODO(dgq): support mixed ip version */
 | 
	
		
			
				|  |  | -  char **addr_strs = gpr_malloc(sizeof(char *) * args->addresses->naddrs);
 | 
	
		
			
				|  |  | -  addr_strs[0] =
 | 
	
		
			
				|  |  | -      grpc_sockaddr_to_uri((const struct sockaddr *)&args->addresses->addrs[0]);
 | 
	
		
			
				|  |  | -  for (size_t i = 1; i < args->addresses->naddrs; i++) {
 | 
	
		
			
				|  |  | -    GPR_ASSERT(grpc_sockaddr_to_string(
 | 
	
		
			
				|  |  | -                   &addr_strs[i],
 | 
	
		
			
				|  |  | -                   (const struct sockaddr *)&args->addresses->addrs[i],
 | 
	
		
			
				|  |  | -                   true) == 0);
 | 
	
		
			
				|  |  | +  char **addr_strs = gpr_malloc(sizeof(char *) * args->num_addresses);
 | 
	
		
			
				|  |  | +  addr_strs[0] = grpc_sockaddr_to_uri(
 | 
	
		
			
				|  |  | +      (const struct sockaddr *)&args->addresses[0].resolved_address->addr);
 | 
	
		
			
				|  |  | +  for (size_t i = 1; i < args->num_addresses; i++) {
 | 
	
		
			
				|  |  | +    if (args->addresses[i].user_data != NULL) {
 | 
	
		
			
				|  |  | +      gpr_log(GPR_ERROR,
 | 
	
		
			
				|  |  | +              "This LB policy doesn't support user data. It will be ignored");
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    GPR_ASSERT(
 | 
	
		
			
				|  |  | +        grpc_sockaddr_to_string(
 | 
	
		
			
				|  |  | +            &addr_strs[i],
 | 
	
		
			
				|  |  | +            (const struct sockaddr *)&args->addresses[i].resolved_address->addr,
 | 
	
		
			
				|  |  | +            true) == 0);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    size_t uri_path_len;
 | 
	
		
			
				|  |  |    char *target_uri_str = gpr_strjoin_sep(
 | 
	
		
			
				|  |  | -      (const char **)addr_strs, args->addresses->naddrs, ",", &uri_path_len);
 | 
	
		
			
				|  |  | +      (const char **)addr_strs, args->num_addresses, ",", &uri_path_len);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    /* will pick using pick_first */
 | 
	
		
			
				|  |  |    glb_policy->lb_channel = grpc_client_channel_factory_create_channel(
 | 
	
	
		
			
				|  | @@ -453,7 +616,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |        GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, NULL);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    gpr_free(target_uri_str);
 | 
	
		
			
				|  |  | -  for (size_t i = 0; i < args->addresses->naddrs; i++) {
 | 
	
		
			
				|  |  | +  for (size_t i = 0; i < args->num_addresses; i++) {
 | 
	
		
			
				|  |  |      gpr_free(addr_strs[i]);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    gpr_free(addr_strs);
 | 
	
	
		
			
				|  | @@ -466,7 +629,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |    rr_connectivity_data *rr_connectivity =
 | 
	
		
			
				|  |  |        gpr_malloc(sizeof(rr_connectivity_data));
 | 
	
		
			
				|  |  |    memset(rr_connectivity, 0, sizeof(rr_connectivity_data));
 | 
	
		
			
				|  |  | -  grpc_closure_init(&rr_connectivity->on_change, rr_connectivity_changed,
 | 
	
		
			
				|  |  | +  grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed,
 | 
	
		
			
				|  |  |                      rr_connectivity);
 | 
	
		
			
				|  |  |    rr_connectivity->glb_policy = glb_policy;
 | 
	
		
			
				|  |  |    glb_policy->rr_connectivity = rr_connectivity;
 | 
	
	
		
			
				|  | @@ -489,6 +652,9 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
 | 
	
		
			
				|  |  |      grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    gpr_mu_destroy(&glb_policy->mu);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  lb_addrs_destroy(glb_policy->lb_addresses,
 | 
	
		
			
				|  |  | +                   glb_policy->num_ok_serverlist_addresses);
 | 
	
		
			
				|  |  |    gpr_free(glb_policy);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -549,7 +715,6 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
 | 
	
		
			
				|  |  |        *target = NULL;
 | 
	
		
			
				|  |  |        grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete,
 | 
	
		
			
				|  |  |                            GRPC_ERROR_CANCELLED, NULL);
 | 
	
		
			
				|  |  | -      gpr_free(pp);
 | 
	
		
			
				|  |  |      } else {
 | 
	
		
			
				|  |  |        pp->next = glb_policy->pending_picks;
 | 
	
		
			
				|  |  |        glb_policy->pending_picks = pp;
 | 
	
	
		
			
				|  | @@ -579,7 +744,6 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
 | 
	
		
			
				|  |  |            exec_ctx, pp->pollent, glb_policy->base.interested_parties);
 | 
	
		
			
				|  |  |        grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete,
 | 
	
		
			
				|  |  |                            GRPC_ERROR_CANCELLED, NULL);
 | 
	
		
			
				|  |  | -      gpr_free(pp);
 | 
	
		
			
				|  |  |      } else {
 | 
	
		
			
				|  |  |        pp->next = glb_policy->pending_picks;
 | 
	
		
			
				|  |  |        glb_policy->pending_picks = pp;
 | 
	
	
		
			
				|  | @@ -606,12 +770,21 @@ static void glb_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
 | 
	
		
			
				|  |  | -                    grpc_polling_entity *pollent,
 | 
	
		
			
				|  |  | -                    grpc_metadata_batch *initial_metadata,
 | 
	
		
			
				|  |  | -                    uint32_t initial_metadata_flags,
 | 
	
		
			
				|  |  | -                    grpc_connected_subchannel **target,
 | 
	
		
			
				|  |  | +                    const grpc_lb_policy_pick_args *pick_args,
 | 
	
		
			
				|  |  | +                    grpc_connected_subchannel **target, void **user_data,
 | 
	
		
			
				|  |  |                      grpc_closure *on_complete) {
 | 
	
		
			
				|  |  |    glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  if (pick_args->lb_token_mdelem_storage == NULL) {
 | 
	
		
			
				|  |  | +    *target = NULL;
 | 
	
		
			
				|  |  | +    grpc_exec_ctx_sched(
 | 
	
		
			
				|  |  | +        exec_ctx, on_complete,
 | 
	
		
			
				|  |  | +        GRPC_ERROR_CREATE("No mdelem storage for the LB token. Load reporting "
 | 
	
		
			
				|  |  | +                          "won't work without it. Failing"),
 | 
	
		
			
				|  |  | +        NULL);
 | 
	
		
			
				|  |  | +    return 1;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    gpr_mu_lock(&glb_policy->mu);
 | 
	
		
			
				|  |  |    int r;
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -623,29 +796,36 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
 | 
	
		
			
				|  |  |      GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick");
 | 
	
		
			
				|  |  |      memset(&glb_policy->wc_arg, 0, sizeof(wrapped_rr_closure_arg));
 | 
	
		
			
				|  |  |      glb_policy->wc_arg.rr_policy = glb_policy->rr_policy;
 | 
	
		
			
				|  |  | +    glb_policy->wc_arg.target = target;
 | 
	
		
			
				|  |  |      glb_policy->wc_arg.wrapped_closure = on_complete;
 | 
	
		
			
				|  |  | +    glb_policy->wc_arg.lb_token_mdelem_storage =
 | 
	
		
			
				|  |  | +        pick_args->lb_token_mdelem_storage;
 | 
	
		
			
				|  |  | +    glb_policy->wc_arg.initial_metadata = pick_args->initial_metadata;
 | 
	
		
			
				|  |  | +    glb_policy->wc_arg.owning_pending_node = NULL;
 | 
	
		
			
				|  |  |      grpc_closure_init(&glb_policy->wrapped_on_complete, wrapped_rr_closure,
 | 
	
		
			
				|  |  |                        &glb_policy->wc_arg);
 | 
	
		
			
				|  |  | -    r = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pollent,
 | 
	
		
			
				|  |  | -                            initial_metadata, initial_metadata_flags, target,
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    r = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pick_args, target,
 | 
	
		
			
				|  |  | +                            (void **)&glb_policy->wc_arg.lb_token,
 | 
	
		
			
				|  |  |                              &glb_policy->wrapped_on_complete);
 | 
	
		
			
				|  |  |      if (r != 0) {
 | 
	
		
			
				|  |  | -      /* the call to grpc_lb_policy_pick has been sychronous. Unreffing the RR
 | 
	
		
			
				|  |  | -       * policy and notify the original callback */
 | 
	
		
			
				|  |  | -      glb_policy->wc_arg.wrapped_closure = NULL;
 | 
	
		
			
				|  |  | +      /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
 | 
	
		
			
				|  |  |        if (grpc_lb_glb_trace) {
 | 
	
		
			
				|  |  |          gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
 | 
	
		
			
				|  |  |                  (intptr_t)glb_policy->wc_arg.rr_policy);
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  |        GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->wc_arg.rr_policy, "glb_pick");
 | 
	
		
			
				|  |  | -      grpc_exec_ctx_sched(exec_ctx, glb_policy->wc_arg.wrapped_closure,
 | 
	
		
			
				|  |  | -                          GRPC_ERROR_NONE, NULL);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +      /* add the load reporting initial metadata */
 | 
	
		
			
				|  |  | +      initial_metadata_add_lb_token(
 | 
	
		
			
				|  |  | +          pick_args->initial_metadata, pick_args->lb_token_mdelem_storage,
 | 
	
		
			
				|  |  | +          GRPC_MDELEM_REF(glb_policy->wc_arg.lb_token));
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |    } else {
 | 
	
		
			
				|  |  | -    grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent,
 | 
	
		
			
				|  |  | +    grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent,
 | 
	
		
			
				|  |  |                                             glb_policy->base.interested_parties);
 | 
	
		
			
				|  |  | -    add_pending_pick(&glb_policy->pending_picks, pollent, initial_metadata,
 | 
	
		
			
				|  |  | -                     initial_metadata_flags, target, on_complete);
 | 
	
		
			
				|  |  | +    add_pending_pick(&glb_policy->pending_picks, pick_args, target,
 | 
	
		
			
				|  |  | +                     on_complete);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      if (!glb_policy->started_picking) {
 | 
	
		
			
				|  |  |        start_picking(exec_ctx, glb_policy);
 | 
	
	
		
			
				|  | @@ -705,9 +885,6 @@ typedef struct lb_client_data {
 | 
	
		
			
				|  |  |    /* called once initial metadata's been sent */
 | 
	
		
			
				|  |  |    grpc_closure md_sent;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  /* called once initial metadata's been received */
 | 
	
		
			
				|  |  | -  grpc_closure md_rcvd;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |    /* called once the LoadBalanceRequest has been sent to the LB server. See
 | 
	
		
			
				|  |  |     * src/proto/grpc/.../load_balancer.proto */
 | 
	
		
			
				|  |  |    grpc_closure req_sent;
 | 
	
	
		
			
				|  | @@ -744,7 +921,6 @@ typedef struct lb_client_data {
 | 
	
		
			
				|  |  |  } lb_client_data;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
 | 
	
		
			
				|  |  | -static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
 | 
	
		
			
				|  |  |  static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
 | 
	
		
			
				|  |  |  static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
 | 
	
		
			
				|  |  |  static void close_sent_cb(grpc_exec_ctx *exec_ctx, void *arg,
 | 
	
	
		
			
				|  | @@ -759,7 +935,6 @@ static lb_client_data *lb_client_data_create(glb_lb_policy *glb_policy) {
 | 
	
		
			
				|  |  |    gpr_mu_init(&lb_client->mu);
 | 
	
		
			
				|  |  |    grpc_closure_init(&lb_client->md_sent, md_sent_cb, lb_client);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  grpc_closure_init(&lb_client->md_rcvd, md_recv_cb, lb_client);
 | 
	
		
			
				|  |  |    grpc_closure_init(&lb_client->req_sent, req_sent_cb, lb_client);
 | 
	
		
			
				|  |  |    grpc_closure_init(&lb_client->res_rcvd, res_recv_cb, lb_client);
 | 
	
		
			
				|  |  |    grpc_closure_init(&lb_client->close_sent, close_sent_cb, lb_client);
 | 
	
	
		
			
				|  | @@ -858,23 +1033,6 @@ static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
 | 
	
		
			
				|  |  |    grpc_op ops[1];
 | 
	
		
			
				|  |  |    memset(ops, 0, sizeof(ops));
 | 
	
		
			
				|  |  |    grpc_op *op = ops;
 | 
	
		
			
				|  |  | -  op->op = GRPC_OP_RECV_INITIAL_METADATA;
 | 
	
		
			
				|  |  | -  op->data.recv_initial_metadata = &lb_client->initial_metadata_recv;
 | 
	
		
			
				|  |  | -  op->flags = 0;
 | 
	
		
			
				|  |  | -  op->reserved = NULL;
 | 
	
		
			
				|  |  | -  op++;
 | 
	
		
			
				|  |  | -  grpc_call_error call_error = grpc_call_start_batch_and_execute(
 | 
	
		
			
				|  |  | -      exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops),
 | 
	
		
			
				|  |  | -      &lb_client->md_rcvd);
 | 
	
		
			
				|  |  | -  GPR_ASSERT(GRPC_CALL_OK == call_error);
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
 | 
	
		
			
				|  |  | -  lb_client_data *lb_client = arg;
 | 
	
		
			
				|  |  | -  GPR_ASSERT(lb_client->lb_call);
 | 
	
		
			
				|  |  | -  grpc_op ops[1];
 | 
	
		
			
				|  |  | -  memset(ops, 0, sizeof(ops));
 | 
	
		
			
				|  |  | -  grpc_op *op = ops;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    op->op = GRPC_OP_SEND_MESSAGE;
 | 
	
		
			
				|  |  |    op->data.send_message = lb_client->request_payload;
 | 
	
	
		
			
				|  | @@ -889,11 +1047,18 @@ static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
 | 
	
		
			
				|  |  |    lb_client_data *lb_client = arg;
 | 
	
		
			
				|  |  | +  GPR_ASSERT(lb_client->lb_call);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  grpc_op ops[1];
 | 
	
		
			
				|  |  | +  grpc_op ops[2];
 | 
	
		
			
				|  |  |    memset(ops, 0, sizeof(ops));
 | 
	
		
			
				|  |  |    grpc_op *op = ops;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +  op->op = GRPC_OP_RECV_INITIAL_METADATA;
 | 
	
		
			
				|  |  | +  op->data.recv_initial_metadata = &lb_client->initial_metadata_recv;
 | 
	
		
			
				|  |  | +  op->flags = 0;
 | 
	
		
			
				|  |  | +  op->reserved = NULL;
 | 
	
		
			
				|  |  | +  op++;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    op->op = GRPC_OP_RECV_MESSAGE;
 | 
	
		
			
				|  |  |    op->data.recv_message = &lb_client->response_payload;
 | 
	
		
			
				|  |  |    op->flags = 0;
 | 
	
	
		
			
				|  | @@ -912,8 +1077,7 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
 | 
	
		
			
				|  |  |    grpc_op *op = ops;
 | 
	
		
			
				|  |  |    if (lb_client->response_payload != NULL) {
 | 
	
		
			
				|  |  |      /* Received data from the LB server. Look inside
 | 
	
		
			
				|  |  | -     * lb_client->response_payload, for
 | 
	
		
			
				|  |  | -     * a serverlist. */
 | 
	
		
			
				|  |  | +     * lb_client->response_payload, for a serverlist. */
 | 
	
		
			
				|  |  |      grpc_byte_buffer_reader bbr;
 | 
	
		
			
				|  |  |      grpc_byte_buffer_reader_init(&bbr, lb_client->response_payload);
 | 
	
		
			
				|  |  |      gpr_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
 | 
	
	
		
			
				|  | @@ -950,7 +1114,7 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
 | 
	
		
			
				|  |  |          } else {
 | 
	
		
			
				|  |  |            /* unref the RR policy, eventually leading to its substitution with a
 | 
	
		
			
				|  |  |             * new one constructed from the received serverlist (see
 | 
	
		
			
				|  |  | -           * rr_connectivity_changed) */
 | 
	
		
			
				|  |  | +           * glb_rr_connectivity_changed) */
 | 
	
		
			
				|  |  |            GRPC_LB_POLICY_UNREF(exec_ctx, lb_client->glb_policy->rr_policy,
 | 
	
		
			
				|  |  |                                 "serverlist_received");
 | 
	
		
			
				|  |  |          }
 | 
	
	
		
			
				|  | @@ -1013,8 +1177,8 @@ static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg,
 | 
	
		
			
				|  |  |              lb_client->status, lb_client->status_details,
 | 
	
		
			
				|  |  |              lb_client->status_details_capacity);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  /* TODO(dgq): deal with stream termination properly (fire up another one? fail
 | 
	
		
			
				|  |  | -   * the original call?) */
 | 
	
		
			
				|  |  | +  /* TODO(dgq): deal with stream termination properly (fire up another one?
 | 
	
		
			
				|  |  | +   * fail the original call?) */
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  /* Code wiring the policy with the rest of the core */
 |