瀏覽代碼

Merge branch 'master' of github.com:grpc/grpc into grpclb_reworked_conn_state

David Garcia Quintas 9 年之前
父節點
當前提交
c2e6254984
共有 2 個文件被更改,包括 45 次插入28 次删除
  1. 18 10
      src/core/ext/lb_policy/grpclb/grpclb.c
  2. 27 18
      test/cpp/grpclb/grpclb_test.cc

+ 18 - 10
src/core/ext/lb_policy/grpclb/grpclb.c

@@ -186,14 +186,20 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
      * addresses failed to connect). There won't be any user_data/token
      * available */
     if (wc_arg->target != NULL) {
-      GPR_ASSERT(wc_arg->lb_token != NULL);
-      initial_metadata_add_lb_token(wc_arg->initial_metadata,
-                                    wc_arg->lb_token_mdelem_storage,
-                                    GRPC_MDELEM_REF(wc_arg->lb_token));
+      if (wc_arg->lb_token != NULL) {
+        initial_metadata_add_lb_token(wc_arg->initial_metadata,
+                                      wc_arg->lb_token_mdelem_storage,
+                                      GRPC_MDELEM_REF(wc_arg->lb_token));
+      } else {
+        gpr_log(GPR_ERROR,
+                "No LB token for connected subchannel pick %p (from RR "
+                "instance %p).",
+                (void *)*wc_arg->target, (void *)wc_arg->rr_policy);
+        abort();
+      }
     }
     if (grpc_lb_glb_trace) {
-      gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
-              (intptr_t)wc_arg->rr_policy);
+      gpr_log(GPR_INFO, "Unreffing RR %p", (void *)wc_arg->rr_policy);
     }
     GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "wrapped_rr_closure");
   }
@@ -411,7 +417,7 @@ static void parse_server(const grpc_grpclb_server *server,
 }
 
 /* Returns addresses extracted from \a serverlist. */
-static grpc_lb_addresses *process_serverlist(
+static grpc_lb_addresses *process_serverlist_locked(
     const grpc_grpclb_serverlist *serverlist) {
   size_t num_valid = 0;
   /* first pass: count how many are valid in order to allocate the necessary
@@ -451,10 +457,12 @@ static grpc_lb_addresses *process_serverlist(
       user_data = grpc_mdelem_from_metadata_strings(GRPC_MDSTR_LB_TOKEN,
                                                     lb_token_mdstr);
     } else {
-      gpr_log(GPR_ERROR,
+      char *uri = grpc_sockaddr_to_uri(&addr);
+      gpr_log(GPR_INFO,
               "Missing LB token for backend address '%s'. The empty token will "
               "be used instead",
-              grpc_sockaddr_to_uri(&addr));
+              uri);
+      gpr_free(uri);
       user_data = GRPC_MDELEM_LB_TOKEN_EMPTY;
     }
 
@@ -567,7 +575,7 @@ static grpc_lb_policy *create_rr_locked(
   grpc_lb_policy_args args;
   memset(&args, 0, sizeof(args));
   args.client_channel_factory = glb_policy->cc_factory;
-  grpc_lb_addresses *addresses = process_serverlist(serverlist);
+  grpc_lb_addresses *addresses = process_serverlist_locked(serverlist);
 
   // Replace the LB addresses in the channel args that we pass down to
   // the subchannel.

+ 27 - 18
test/cpp/grpclb/grpclb_test.cc

@@ -111,6 +111,7 @@ typedef struct server_fixture {
   grpc_completion_queue *cq;
   char *servers_hostport;
   int port;
+  const char *lb_token_prefix;
   gpr_thd_id tid;
   int num_calls_serviced;
 } server_fixture;
@@ -126,7 +127,8 @@ static void *tag(intptr_t t) { return (void *)t; }
 
 static grpc_slice build_response_payload_slice(
     const char *host, int *ports, size_t nports,
-    int64_t expiration_interval_secs, int32_t expiration_interval_nanos) {
+    int64_t expiration_interval_secs, int32_t expiration_interval_nanos,
+    const char *token_prefix) {
   // server_list {
   //   servers {
   //     ip_address: <in_addr/6 bytes of an IP>
@@ -153,15 +155,15 @@ static grpc_slice build_response_payload_slice(
     struct in_addr ip4;
     GPR_ASSERT(inet_pton(AF_INET, host, &ip4) == 1);
     server->set_ip_address(
-        grpc::string(reinterpret_cast<const char *>(&ip4), sizeof(ip4)));
+        string(reinterpret_cast<const char *>(&ip4), sizeof(ip4)));
     server->set_port(ports[i]);
-    // The following long long int cast is meant to work around the
-    // disfunctional implementation of std::to_string in gcc 4.4, which doesn't
-    // have a version for int but does have one for long long int.
-    string token_data = "token" + std::to_string((long long int)ports[i]);
-    server->set_load_balance_token(token_data);
+    // Missing tokens are acceptable. Test that path.
+    if (strlen(token_prefix) > 0) {
+      string token_data = token_prefix + std::to_string(ports[i]);
+      server->set_load_balance_token(token_data);
+    }
   }
-  const grpc::string &enc_resp = response.SerializeAsString();
+  const string &enc_resp = response.SerializeAsString();
   return grpc_slice_from_copied_buffer(enc_resp.data(), enc_resp.size());
 }
 
@@ -253,14 +255,14 @@ static void start_lb_server(server_fixture *sf, int *ports, size_t nports,
   for (int i = 0; i < 2; i++) {
     if (i == 0) {
       // First half of the ports.
-      response_payload_slice =
-          build_response_payload_slice("127.0.0.1", ports, nports / 2, -1, -1);
+      response_payload_slice = build_response_payload_slice(
+          "127.0.0.1", ports, nports / 2, -1, -1, sf->lb_token_prefix);
     } else {
       // Second half of the ports.
       sleep_ms(update_delay_ms);
-      response_payload_slice =
-          build_response_payload_slice("127.0.0.1", ports + (nports / 2),
-                                       (nports + 1) / 2 /* ceil */, -1, -1);
+      response_payload_slice = build_response_payload_slice(
+          "127.0.0.1", ports + (nports / 2), (nports + 1) / 2 /* ceil */, -1,
+          -1, "" /* this half doesn't get to receive an LB token */);
     }
 
     response_payload = grpc_raw_byte_buffer_create(&response_payload_slice, 1);
@@ -342,11 +344,10 @@ static void start_backend_server(server_fixture *sf) {
       return;
     }
     GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
-
-    // The following long long int cast is meant to work around the
-    // disfunctional implementation of std::to_string in gcc 4.4, which doesn't
-    // have a version for int but does have one for long long int.
-    string expected_token = "token" + std::to_string((long long int)sf->port);
+    const string expected_token =
+        strlen(sf->lb_token_prefix) == 0
+            ? ""
+            : sf->lb_token_prefix + std::to_string(sf->port);
     GPR_ASSERT(contains_metadata(&request_metadata_recv, "lb-token",
                                  expected_token.c_str()));
 
@@ -631,6 +632,7 @@ static void fork_lb_server(void *arg) {
                   tf->lb_server_update_delay_ms);
 }
 
+#define LB_TOKEN_PREFIX "token"
 static test_fixture setup_test_fixture(int lb_server_update_delay_ms) {
   test_fixture tf;
   memset(&tf, 0, sizeof(tf));
@@ -640,11 +642,18 @@ static test_fixture setup_test_fixture(int lb_server_update_delay_ms) {
   gpr_thd_options_set_joinable(&options);
 
   for (int i = 0; i < NUM_BACKENDS; ++i) {
+    // Only the first half of the servers expect an LB token.
+    if (i < NUM_BACKENDS / 2) {
+      tf.lb_backends[i].lb_token_prefix = LB_TOKEN_PREFIX;
+    } else {
+      tf.lb_backends[i].lb_token_prefix = "";
+    }
     setup_server("127.0.0.1", &tf.lb_backends[i]);
     gpr_thd_new(&tf.lb_backends[i].tid, fork_backend_server, &tf.lb_backends[i],
                 &options);
   }
 
+  tf.lb_server.lb_token_prefix = LB_TOKEN_PREFIX;
   setup_server("127.0.0.1", &tf.lb_server);
   gpr_thd_new(&tf.lb_server.tid, fork_lb_server, &tf.lb_server, &options);