|  | @@ -107,6 +107,7 @@
 | 
	
		
			
				|  |  |  #include <grpc/support/string_util.h>
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  #include "src/core/ext/client_config/client_channel_factory.h"
 | 
	
		
			
				|  |  | +#include "src/core/ext/client_config/lb_policy_factory.h"
 | 
	
		
			
				|  |  |  #include "src/core/ext/client_config/lb_policy_registry.h"
 | 
	
		
			
				|  |  |  #include "src/core/ext/client_config/parse_address.h"
 | 
	
		
			
				|  |  |  #include "src/core/ext/lb_policy/grpclb/grpclb.h"
 | 
	
	
		
			
				|  | @@ -120,18 +121,6 @@
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  int grpc_lb_glb_trace = 0;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void lb_addrs_destroy(grpc_lb_address *lb_addresses,
 | 
	
		
			
				|  |  | -                             size_t num_addresses) {
 | 
	
		
			
				|  |  | -  /* free "resolved" addresses memblock */
 | 
	
		
			
				|  |  | -  gpr_free(lb_addresses->resolved_address);
 | 
	
		
			
				|  |  | -  for (size_t i = 0; i < num_addresses; ++i) {
 | 
	
		
			
				|  |  | -    if (lb_addresses[i].user_data != NULL) {
 | 
	
		
			
				|  |  | -      GRPC_MDELEM_UNREF(lb_addresses[i].user_data);
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  gpr_free(lb_addresses);
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |  /* add lb_token of selected subchannel (address) to the call's initial
 | 
	
		
			
				|  |  |   * metadata */
 | 
	
		
			
				|  |  |  static void initial_metadata_add_lb_token(
 | 
	
	
		
			
				|  | @@ -312,11 +301,8 @@ typedef struct glb_lb_policy {
 | 
	
		
			
				|  |  |     * response has arrived. */
 | 
	
		
			
				|  |  |    grpc_grpclb_serverlist *serverlist;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  /** total number of valid addresses received in \a serverlist */
 | 
	
		
			
				|  |  | -  size_t num_ok_serverlist_addresses;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  /** LB addresses from \a serverlist, \a num_ok_serverlist_addresses of them */
 | 
	
		
			
				|  |  | -  grpc_lb_address *lb_addresses;
 | 
	
		
			
				|  |  | +  /** addresses from \a serverlist */
 | 
	
		
			
				|  |  | +  grpc_lb_addresses *addresses;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    /** list of picks that are waiting on RR's policy connectivity */
 | 
	
		
			
				|  |  |    pending_pick *pending_picks;
 | 
	
	
		
			
				|  | @@ -369,26 +355,18 @@ static bool is_server_valid(const grpc_grpclb_server *server, size_t idx,
 | 
	
		
			
				|  |  |    return true;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -/* populate \a addresses according to \a serverlist. Returns the number of
 | 
	
		
			
				|  |  | - * addresses successfully parsed and added to \a addresses */
 | 
	
		
			
				|  |  | -static size_t process_serverlist(const grpc_grpclb_serverlist *serverlist,
 | 
	
		
			
				|  |  | -                                 grpc_lb_address **lb_addresses) {
 | 
	
		
			
				|  |  | +/* Returns addresses extracted from \a serverlist. */
 | 
	
		
			
				|  |  | +static grpc_lb_addresses *process_serverlist(
 | 
	
		
			
				|  |  | +    const grpc_grpclb_serverlist *serverlist) {
 | 
	
		
			
				|  |  |    size_t num_valid = 0;
 | 
	
		
			
				|  |  |    /* first pass: count how many are valid in order to allocate the necessary
 | 
	
		
			
				|  |  |     * memory in a single block */
 | 
	
		
			
				|  |  |    for (size_t i = 0; i < serverlist->num_servers; ++i) {
 | 
	
		
			
				|  |  |      if (is_server_valid(serverlist->servers[i], i, true)) ++num_valid;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  if (num_valid == 0) {
 | 
	
		
			
				|  |  | -    return 0;
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | +  if (num_valid == 0) return NULL;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  /* allocate the memory block for the "resolved" addresses. */
 | 
	
		
			
				|  |  | -  grpc_resolved_address *r_addrs_memblock =
 | 
	
		
			
				|  |  | -      gpr_malloc(sizeof(grpc_resolved_address) * num_valid);
 | 
	
		
			
				|  |  | -  memset(r_addrs_memblock, 0, sizeof(grpc_resolved_address) * num_valid);
 | 
	
		
			
				|  |  | -  grpc_lb_address *lb_addrs = gpr_malloc(sizeof(grpc_lb_address) * num_valid);
 | 
	
		
			
				|  |  | -  memset(lb_addrs, 0, sizeof(grpc_lb_address) * num_valid);
 | 
	
		
			
				|  |  | +  grpc_lb_addresses *lb_addresses = grpc_lb_addresses_create(num_valid);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    /* second pass: actually populate the addresses and LB tokens (aka user data
 | 
	
		
			
				|  |  |     * to the outside world) to be read by the RR policy during its creation.
 | 
	
	
		
			
				|  | @@ -400,56 +378,58 @@ static size_t process_serverlist(const grpc_grpclb_serverlist *serverlist,
 | 
	
		
			
				|  |  |      GPR_ASSERT(addr_idx < num_valid);
 | 
	
		
			
				|  |  |      const grpc_grpclb_server *server = serverlist->servers[sl_idx];
 | 
	
		
			
				|  |  |      if (!is_server_valid(serverlist->servers[sl_idx], sl_idx, false)) continue;
 | 
	
		
			
				|  |  | -    grpc_lb_address *const lb_addr = &lb_addrs[addr_idx];
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      /* address processing */
 | 
	
		
			
				|  |  |      const uint16_t netorder_port = htons((uint16_t)server->port);
 | 
	
		
			
				|  |  |      /* the addresses are given in binary format (a in(6)_addr struct) in
 | 
	
		
			
				|  |  |       * server->ip_address.bytes. */
 | 
	
		
			
				|  |  |      const grpc_grpclb_ip_address *ip = &server->ip_address;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -    lb_addr->resolved_address = &r_addrs_memblock[addr_idx];
 | 
	
		
			
				|  |  | -    struct sockaddr_storage *sa =
 | 
	
		
			
				|  |  | -        (struct sockaddr_storage *)lb_addr->resolved_address->addr;
 | 
	
		
			
				|  |  | -    size_t *sa_len = &lb_addr->resolved_address->len;
 | 
	
		
			
				|  |  | -    *sa_len = 0;
 | 
	
		
			
				|  |  | +    grpc_resolved_address addr;
 | 
	
		
			
				|  |  | +    memset(&addr, 0, sizeof(addr));
 | 
	
		
			
				|  |  |      if (ip->size == 4) {
 | 
	
		
			
				|  |  | -      struct sockaddr_in *addr4 = (struct sockaddr_in *)sa;
 | 
	
		
			
				|  |  | -      *sa_len = sizeof(struct sockaddr_in);
 | 
	
		
			
				|  |  | -      memset(addr4, 0, *sa_len);
 | 
	
		
			
				|  |  | +      addr.len = sizeof(struct sockaddr_in);
 | 
	
		
			
				|  |  | +      struct sockaddr_in *addr4 = (struct sockaddr_in *)&addr.addr;
 | 
	
		
			
				|  |  |        addr4->sin_family = AF_INET;
 | 
	
		
			
				|  |  |        memcpy(&addr4->sin_addr, ip->bytes, ip->size);
 | 
	
		
			
				|  |  |        addr4->sin_port = netorder_port;
 | 
	
		
			
				|  |  |      } else if (ip->size == 16) {
 | 
	
		
			
				|  |  | -      struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)sa;
 | 
	
		
			
				|  |  | -      *sa_len = sizeof(struct sockaddr_in6);
 | 
	
		
			
				|  |  | -      memset(addr6, 0, *sa_len);
 | 
	
		
			
				|  |  | +      addr.len = sizeof(struct sockaddr_in6);
 | 
	
		
			
				|  |  | +      struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)&addr.addr;
 | 
	
		
			
				|  |  |        addr6->sin6_family = AF_INET;
 | 
	
		
			
				|  |  |        memcpy(&addr6->sin6_addr, ip->bytes, ip->size);
 | 
	
		
			
				|  |  |        addr6->sin6_port = netorder_port;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -    GPR_ASSERT(*sa_len > 0);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      /* lb token processing */
 | 
	
		
			
				|  |  | +    void *user_data;
 | 
	
		
			
				|  |  |      if (server->has_load_balance_token) {
 | 
	
		
			
				|  |  |        const size_t lb_token_size =
 | 
	
		
			
				|  |  |            GPR_ARRAY_SIZE(server->load_balance_token) - 1;
 | 
	
		
			
				|  |  |        grpc_mdstr *lb_token_mdstr = grpc_mdstr_from_buffer(
 | 
	
		
			
				|  |  |            (uint8_t *)server->load_balance_token, lb_token_size);
 | 
	
		
			
				|  |  | -      lb_addr->user_data = grpc_mdelem_from_metadata_strings(
 | 
	
		
			
				|  |  | +      user_data = grpc_mdelem_from_metadata_strings(
 | 
	
		
			
				|  |  |            GRPC_MDSTR_LOAD_REPORTING_INITIAL, lb_token_mdstr);
 | 
	
		
			
				|  |  |      } else {
 | 
	
		
			
				|  |  |        gpr_log(GPR_ERROR,
 | 
	
		
			
				|  |  |                "Missing LB token for backend address '%s'. The empty token will "
 | 
	
		
			
				|  |  |                "be used instead",
 | 
	
		
			
				|  |  | -              grpc_sockaddr_to_uri((struct sockaddr *)sa));
 | 
	
		
			
				|  |  | -      lb_addr->user_data = GRPC_MDELEM_LOAD_REPORTING_INITIAL_EMPTY;
 | 
	
		
			
				|  |  | +              grpc_sockaddr_to_uri((struct sockaddr *)&addr.addr));
 | 
	
		
			
				|  |  | +      user_data = GRPC_MDELEM_LOAD_REPORTING_INITIAL_EMPTY;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len,
 | 
	
		
			
				|  |  | +                                  false /* is_balancer */,
 | 
	
		
			
				|  |  | +                                  NULL /* balancer_name */, user_data);
 | 
	
		
			
				|  |  |      ++addr_idx;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    GPR_ASSERT(addr_idx == num_valid);
 | 
	
		
			
				|  |  | -  *lb_addresses = lb_addrs;
 | 
	
		
			
				|  |  | -  return num_valid;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  return lb_addresses;
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +/* A plugin for grpc_lb_addresses_destroy that unrefs the LB token metadata. */
 | 
	
		
			
				|  |  | +static void lb_token_destroy(void *token) {
 | 
	
		
			
				|  |  | +  if (token != NULL) GRPC_MDELEM_UNREF(token);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
 | 
	
	
		
			
				|  | @@ -459,21 +439,17 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    grpc_lb_policy_args args;
 | 
	
		
			
				|  |  |    memset(&args, 0, sizeof(args));
 | 
	
		
			
				|  |  | -  args.client_channel_factory = glb_policy->cc_factory;
 | 
	
		
			
				|  |  |    args.server_name = glb_policy->server_name;
 | 
	
		
			
				|  |  | -  const size_t num_ok_addresses =
 | 
	
		
			
				|  |  | -      process_serverlist(serverlist, &args.addresses);
 | 
	
		
			
				|  |  | -  args.num_addresses = num_ok_addresses;
 | 
	
		
			
				|  |  | +  args.client_channel_factory = glb_policy->cc_factory;
 | 
	
		
			
				|  |  | +  args.addresses = process_serverlist(serverlist);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  if (glb_policy->lb_addresses != NULL) {
 | 
	
		
			
				|  |  | +  if (glb_policy->addresses != NULL) {
 | 
	
		
			
				|  |  |      /* dispose of the previous version */
 | 
	
		
			
				|  |  | -    lb_addrs_destroy(glb_policy->lb_addresses,
 | 
	
		
			
				|  |  | -                     glb_policy->num_ok_serverlist_addresses);
 | 
	
		
			
				|  |  | +    grpc_lb_addresses_destroy(glb_policy->addresses, lb_token_destroy);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  glb_policy->num_ok_serverlist_addresses = num_ok_addresses;
 | 
	
		
			
				|  |  | -  glb_policy->lb_addresses = args.addresses;
 | 
	
		
			
				|  |  | +  glb_policy->addresses = args.addresses;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    return rr;
 | 
	
		
			
				|  |  |  }
 | 
	
	
		
			
				|  | @@ -567,6 +543,19 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
 | 
	
		
			
				|  |  |  static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |                                    grpc_lb_policy_factory *factory,
 | 
	
		
			
				|  |  |                                    grpc_lb_policy_args *args) {
 | 
	
		
			
				|  |  | +  /* Count the number of gRPC-LB addresses. There must be at least one.
 | 
	
		
			
				|  |  | +   * TODO(roth): For now, we ignore non-balancer addresses, but in the
 | 
	
		
			
				|  |  | +   * future, we may change the behavior such that we fall back to using
 | 
	
		
			
				|  |  | +   * the non-balancer addresses if we cannot reach any balancers. At that
 | 
	
		
			
				|  |  | +   * time, this should be changed to allow a list with no balancer addresses,
 | 
	
		
			
				|  |  | +   * since the resolver might fail to return a balancer address even when
 | 
	
		
			
				|  |  | +   * this is the right LB policy to use. */
 | 
	
		
			
				|  |  | +  size_t num_grpclb_addrs = 0;
 | 
	
		
			
				|  |  | +  for (size_t i = 0; i < args->addresses->num_addresses; ++i) {
 | 
	
		
			
				|  |  | +    if (args->addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  if (num_grpclb_addrs == 0) return NULL;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    glb_lb_policy *glb_policy = gpr_malloc(sizeof(*glb_policy));
 | 
	
		
			
				|  |  |    memset(glb_policy, 0, sizeof(*glb_policy));
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -579,36 +568,34 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |    glb_policy->server_name = args->server_name;
 | 
	
		
			
				|  |  |    glb_policy->cc_factory = args->client_channel_factory;
 | 
	
		
			
				|  |  |    GPR_ASSERT(glb_policy->cc_factory != NULL);
 | 
	
		
			
				|  |  | -  if (args->num_addresses == 0) {
 | 
	
		
			
				|  |  | -    return NULL;
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  if (args->addresses[0].user_data != NULL) {
 | 
	
		
			
				|  |  | -    gpr_log(GPR_ERROR,
 | 
	
		
			
				|  |  | -            "This LB policy doesn't support user data. It will be ignored");
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    /* construct a target from the addresses in args, given in the form
 | 
	
		
			
				|  |  |     * ipvX://ip1:port1,ip2:port2,...
 | 
	
		
			
				|  |  |     * TODO(dgq): support mixed ip version */
 | 
	
		
			
				|  |  | -  char **addr_strs = gpr_malloc(sizeof(char *) * args->num_addresses);
 | 
	
		
			
				|  |  | -  addr_strs[0] = grpc_sockaddr_to_uri(
 | 
	
		
			
				|  |  | -      (const struct sockaddr *)&args->addresses[0].resolved_address->addr);
 | 
	
		
			
				|  |  | -  for (size_t i = 1; i < args->num_addresses; i++) {
 | 
	
		
			
				|  |  | -    if (args->addresses[i].user_data != NULL) {
 | 
	
		
			
				|  |  | +  char **addr_strs = gpr_malloc(sizeof(char *) * num_grpclb_addrs);
 | 
	
		
			
				|  |  | +  size_t addr_index = 0;
 | 
	
		
			
				|  |  | +  for (size_t i = 0; i < args->addresses->num_addresses; i++) {
 | 
	
		
			
				|  |  | +    if (args->addresses->addresses[i].user_data != NULL) {
 | 
	
		
			
				|  |  |        gpr_log(GPR_ERROR,
 | 
	
		
			
				|  |  |                "This LB policy doesn't support user data. It will be ignored");
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -    GPR_ASSERT(
 | 
	
		
			
				|  |  | -        grpc_sockaddr_to_string(
 | 
	
		
			
				|  |  | -            &addr_strs[i],
 | 
	
		
			
				|  |  | -            (const struct sockaddr *)&args->addresses[i].resolved_address->addr,
 | 
	
		
			
				|  |  | -            true) == 0);
 | 
	
		
			
				|  |  | +    if (args->addresses->addresses[i].is_balancer) {
 | 
	
		
			
				|  |  | +      if (addr_index == 0) {
 | 
	
		
			
				|  |  | +        addr_strs[addr_index++] = grpc_sockaddr_to_uri(
 | 
	
		
			
				|  |  | +            (const struct sockaddr *)&args->addresses->addresses[i]
 | 
	
		
			
				|  |  | +                .address.addr);
 | 
	
		
			
				|  |  | +      } else {
 | 
	
		
			
				|  |  | +        GPR_ASSERT(grpc_sockaddr_to_string(
 | 
	
		
			
				|  |  | +                       &addr_strs[addr_index++],
 | 
	
		
			
				|  |  | +                       (const struct sockaddr *)&args->addresses->addresses[i]
 | 
	
		
			
				|  |  | +                           .address.addr,
 | 
	
		
			
				|  |  | +                       true) == 0);
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    size_t uri_path_len;
 | 
	
		
			
				|  |  | -  char *target_uri_str = gpr_strjoin_sep(
 | 
	
		
			
				|  |  | -      (const char **)addr_strs, args->num_addresses, ",", &uri_path_len);
 | 
	
		
			
				|  |  | +  char *target_uri_str = gpr_strjoin_sep((const char **)addr_strs,
 | 
	
		
			
				|  |  | +                                         num_grpclb_addrs, ",", &uri_path_len);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    /* will pick using pick_first */
 | 
	
		
			
				|  |  |    glb_policy->lb_channel = grpc_client_channel_factory_create_channel(
 | 
	
	
		
			
				|  | @@ -616,7 +603,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |        GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, NULL);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    gpr_free(target_uri_str);
 | 
	
		
			
				|  |  | -  for (size_t i = 0; i < args->num_addresses; i++) {
 | 
	
		
			
				|  |  | +  for (size_t i = 0; i < num_grpclb_addrs; i++) {
 | 
	
		
			
				|  |  |      gpr_free(addr_strs[i]);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    gpr_free(addr_strs);
 | 
	
	
		
			
				|  | @@ -652,9 +639,7 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
 | 
	
		
			
				|  |  |      grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    gpr_mu_destroy(&glb_policy->mu);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  lb_addrs_destroy(glb_policy->lb_addresses,
 | 
	
		
			
				|  |  | -                   glb_policy->num_ok_serverlist_addresses);
 | 
	
		
			
				|  |  | +  grpc_lb_addresses_destroy(glb_policy->addresses, lb_token_destroy);
 | 
	
		
			
				|  |  |    gpr_free(glb_policy);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 |