Jelajahi Sumber

Zookeeper resolver works

Hongwei Wang 10 tahun lalu
induk
melakukan
85fd2f7a32

+ 2 - 2
Makefile

@@ -318,8 +318,8 @@ CPPFLAGS_NO_ARCH += $(addprefix -I, $(INCLUDES)) $(addprefix -D, $(DEFINES))
 CPPFLAGS += $(CPPFLAGS_NO_ARCH) $(ARCH_FLAGS)
 CPPFLAGS += $(CPPFLAGS_NO_ARCH) $(ARCH_FLAGS)
 
 
 LDFLAGS += $(ARCH_FLAGS)
 LDFLAGS += $(ARCH_FLAGS)
-LDLIBS += $(addprefix -l, $(LIBS))
-LDLIBSXX += $(addprefix -l, $(LIBSXX))
+LDLIBS += $(addprefix -l, $(LIBS)) -lzookeeper_mt
+LDLIBSXX += $(addprefix -l, $(LIBSXX)) -lzookeeper_mt
 
 
 HOST_CPPFLAGS = $(CPPFLAGS)
 HOST_CPPFLAGS = $(CPPFLAGS)
 HOST_CFLAGS = $(CFLAGS)
 HOST_CFLAGS = $(CFLAGS)

+ 47 - 22
src/core/client_config/resolvers/zookeeper_resolver.c

@@ -80,6 +80,9 @@ typedef struct {
   zhandle_t *zookeeper_handle;
   zhandle_t *zookeeper_handle;
   /** zookeeper connection state */
   /** zookeeper connection state */
   int zookeeper_state;
   int zookeeper_state;
+  grpc_resolved_addresses * resolved_addrs;
+  int resolved_total;
+  int resolved_num;
 
 
 } zookeeper_resolver;
 } zookeeper_resolver;
 
 
@@ -173,45 +176,65 @@ static void zookeeper_on_resolved(void *arg, grpc_resolved_addresses *addresses)
   GRPC_RESOLVER_UNREF(&r->base, "zookeeper-resolving");
   GRPC_RESOLVER_UNREF(&r->base, "zookeeper-resolving");
 }
 }
 
 
+static void zookeeper_dns_resolved(void *arg, grpc_resolved_addresses *addresses) {
+  size_t i;
+  zookeeper_resolver *r = arg;
+  r->resolved_num++;
+  r->resolved_addrs->addrs = gpr_realloc(r->resolved_addrs->addrs, 
+                             sizeof(grpc_resolved_address) * (r->resolved_addrs->naddrs + addresses->naddrs));
+
+  for (i = 0; i < addresses->naddrs; i++) {
+    memcpy(r->resolved_addrs->addrs[i + r->resolved_addrs->naddrs].addr, addresses->addrs[i].addr, addresses->addrs[i].len);
+    r->resolved_addrs->addrs[i + r->resolved_addrs->naddrs].len = addresses->addrs[i].len;
+  }
+
+  r->resolved_addrs->naddrs += addresses->naddrs;
+  grpc_resolved_addresses_destroy(addresses);
+
+  if (r->resolved_num == r->resolved_total)
+    zookeeper_on_resolved(r, r->resolved_addrs);
+}
+
 /** Resolve address by zookeeper */
 /** Resolve address by zookeeper */
 static void zookeeper_resolve_address(zookeeper_resolver *r) {
 static void zookeeper_resolve_address(zookeeper_resolver *r) {
-  struct String_vector children;
-  grpc_resolved_addresses *addrs;
-  int i, k;
+  struct String_vector addresses;
+  int i;
   int status;
   int status;
 
 
+  char path[GRPC_ZOOKEEPER_MAX_SIZE];
   char buffer[GRPC_ZOOKEEPER_MAX_SIZE];
   char buffer[GRPC_ZOOKEEPER_MAX_SIZE];
   int buffer_len;
   int buffer_len;
   
   
-  addrs = NULL;
-  status = zoo_get_children(r->zookeeper_handle, r->name, GRPC_ZOOKEEPER_WATCH, &children);
+  r->resolved_addrs = NULL;
+  r->resolved_total = 0;
+  r->resolved_num = 0;
+  gpr_log(GPR_INFO, r->name);
+  status = zoo_get_children(r->zookeeper_handle, r->name, GRPC_ZOOKEEPER_WATCH, &addresses);
   if (!status) {
   if (!status) {
-    addrs = gpr_malloc(sizeof(grpc_resolved_addresses));
-    addrs->naddrs = 0;
-    addrs->addrs = gpr_malloc(sizeof(grpc_resolved_address) * children.count);
-    
-    k = 0;
-    for (i = 0; i < children.count; i++) {
-      char path[GRPC_ZOOKEEPER_MAX_SIZE];
+    /** Assume no children are deleted */
+    r->resolved_addrs = gpr_malloc(sizeof(grpc_resolved_addresses));
+    r->resolved_addrs->addrs = NULL;
+    r->resolved_addrs->naddrs = 0;
+    r->resolved_total = addresses.count;
+    for (i = 0; i < addresses.count; i++) {
+      memset(path, 0, GRPC_ZOOKEEPER_MAX_SIZE);
       strcat(path, r->name);
       strcat(path, r->name);
       strcat(path, "/");
       strcat(path, "/");
-      strcat(path, children.data[i]);
+      strcat(path, addresses.data[i]);
       
       
+      gpr_log(GPR_INFO, path);
+      memset(buffer, 0, GRPC_ZOOKEEPER_MAX_SIZE);
       status = zoo_get(r->zookeeper_handle, path, GRPC_ZOOKEEPER_WATCH, buffer, &buffer_len, NULL);
       status = zoo_get(r->zookeeper_handle, path, GRPC_ZOOKEEPER_WATCH, buffer, &buffer_len, NULL);
       if (!status) {
       if (!status) {
-        addrs->naddrs++;
-        memcpy(&addrs->addrs[k].addr, buffer, buffer_len);
-        addrs->addrs[k].len = buffer_len;
-        k++;
+        gpr_log(GPR_INFO, buffer);
+        grpc_resolve_address(buffer, NULL, zookeeper_dns_resolved, r);
       } else {
       } else {
-        gpr_log(GPR_ERROR, "cannot resolve zookeeper address");
+        gpr_log(GPR_ERROR, "Cannot resolve zookeeper address");
       }
       }
     }
     }
   } else {
   } else {
-    gpr_log(GPR_ERROR, "cannot resolve zookeeper address");
+    gpr_log(GPR_ERROR, "Cannot resolve zookeeper address");
   }
   }
-
-  zookeeper_on_resolved(r, addrs);
 }
 }
 
 
 static void zookeeper_start_resolving_locked(zookeeper_resolver *r) {
 static void zookeeper_start_resolving_locked(zookeeper_resolver *r) {
@@ -275,9 +298,11 @@ static grpc_resolver *zookeeper_create(
   grpc_subchannel_factory_ref(subchannel_factory);
   grpc_subchannel_factory_ref(subchannel_factory);
 
 
   /** Initialize zookeeper client */
   /** Initialize zookeeper client */
+  zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
   r->zookeeper_handle = zookeeper_init(uri->authority, zookeeper_watcher, GRPC_ZOOKEEPER_TIMEOUT, 0, 0, 0);
   r->zookeeper_handle = zookeeper_init(uri->authority, zookeeper_watcher, GRPC_ZOOKEEPER_TIMEOUT, 0, 0, 0);
+
   if (r->zookeeper_handle  == NULL) {
   if (r->zookeeper_handle  == NULL) {
-    gpr_log(GPR_ERROR, "cannot connect to zookeeper servers");
+    gpr_log(GPR_ERROR, "Cannot connect to zookeeper servers");
     return NULL;
     return NULL;
   }
   }
 
 

+ 3 - 0
src/core/surface/init.c

@@ -50,6 +50,8 @@
 #include "src/core/client_config/resolvers/unix_resolver_posix.h"
 #include "src/core/client_config/resolvers/unix_resolver_posix.h"
 #endif
 #endif
 
 
+#include "src/core/client_config/resolvers/zookeeper_resolver.h"
+
 static gpr_once g_basic_init = GPR_ONCE_INIT;
 static gpr_once g_basic_init = GPR_ONCE_INIT;
 static gpr_mu g_init_mu;
 static gpr_mu g_init_mu;
 static int g_initializations;
 static int g_initializations;
@@ -69,6 +71,7 @@ void grpc_init(void) {
 #ifdef GPR_POSIX_SOCKET
 #ifdef GPR_POSIX_SOCKET
     grpc_register_resolver_type("unix", grpc_unix_resolver_factory_create());
     grpc_register_resolver_type("unix", grpc_unix_resolver_factory_create());
 #endif
 #endif
+    grpc_register_resolver_type("zookeeper", grpc_zookeeper_resolver_factory_create());
     grpc_register_tracer("channel", &grpc_trace_channel);
     grpc_register_tracer("channel", &grpc_trace_channel);
     grpc_register_tracer("surface", &grpc_surface_trace);
     grpc_register_tracer("surface", &grpc_surface_trace);
     grpc_register_tracer("http", &grpc_http_trace);
     grpc_register_tracer("http", &grpc_http_trace);

+ 1 - 0
test/core/client_config/uri_parser_test.c

@@ -57,6 +57,7 @@ int main(int argc, char **argv) {
   grpc_test_init(argc, argv);
   grpc_test_init(argc, argv);
   test_succeeds("http://www.google.com", "http", "www.google.com", "");
   test_succeeds("http://www.google.com", "http", "www.google.com", "");
   test_succeeds("dns:///foo", "dns", "", "/foo");
   test_succeeds("dns:///foo", "dns", "", "/foo");
+  test_succeeds("zookeeper://127.0.0.1:2181/foo", "zookeeper", "127.0.0.1:2181", "/foo");
   test_succeeds("http://www.google.com:90", "http", "www.google.com:90", "");
   test_succeeds("http://www.google.com:90", "http", "www.google.com:90", "");
   test_succeeds("a192.4-df:foo.coom", "a192.4-df", "", "foo.coom");
   test_succeeds("a192.4-df:foo.coom", "a192.4-df", "", "foo.coom");
   test_succeeds("a+b:foo.coom", "a+b", "", "foo.coom");
   test_succeeds("a+b:foo.coom", "a+b", "", "foo.coom");