浏览代码

Merge github.com:grpc/grpc into c++api

Conflicts:
	src/core/surface/server.c
Craig Tiller 10 年之前
父节点
当前提交
7bd5ab1055

+ 7 - 3
include/grpc/grpc.h

@@ -590,15 +590,19 @@ void grpc_server_start(grpc_server *server);
 
 /* Begin shutting down a server.
    After completion, no new calls or connections will be admitted.
-   Existing calls will be allowed to complete. */
+   Existing calls will be allowed to complete.
+   Shutdown is idempotent. */
 void grpc_server_shutdown(grpc_server *server);
 
 /* As per grpc_server_shutdown, but send a GRPC_SERVER_SHUTDOWN event when
-   there are no more calls being serviced. */
+   there are no more calls being serviced.
+   Shutdown is idempotent, and all tags will be notified at once if multiple
+   grpc_server_shutdown_and_notify calls are made. */
 void grpc_server_shutdown_and_notify(grpc_server *server, void *tag);
 
 /* Destroy a server.
-   Forcefully cancels all existing calls. */
+   Forcefully cancels all existing calls.
+   Implies grpc_server_shutdown() if one was not previously performed. */
 void grpc_server_destroy(grpc_server *server);
 
 #ifdef __cplusplus

+ 13 - 7
src/core/surface/channel.c

@@ -36,6 +36,7 @@
 #include <stdlib.h>
 #include <string.h>
 
+#include "src/core/iomgr/iomgr.h"
 #include "src/core/surface/call.h"
 #include "src/core/surface/client.h"
 #include <grpc/support/alloc.h>
@@ -138,15 +139,20 @@ void grpc_channel_internal_ref(grpc_channel *channel) {
   gpr_ref(&channel->refs);
 }
 
+static void destroy_channel(void *p, int ok) {
+  grpc_channel *channel = p;
+  grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CHANNEL(channel));
+  grpc_mdstr_unref(channel->grpc_status_string);
+  grpc_mdstr_unref(channel->grpc_message_string);
+  grpc_mdstr_unref(channel->path_string);
+  grpc_mdstr_unref(channel->authority_string);
+  grpc_mdctx_orphan(channel->metadata_context);
+  gpr_free(channel);
+}
+
 void grpc_channel_internal_unref(grpc_channel *channel) {
   if (gpr_unref(&channel->refs)) {
-    grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CHANNEL(channel));
-    grpc_mdstr_unref(channel->grpc_status_string);
-    grpc_mdstr_unref(channel->grpc_message_string);
-    grpc_mdstr_unref(channel->path_string);
-    grpc_mdstr_unref(channel->authority_string);
-    grpc_mdctx_orphan(channel->metadata_context);
-    gpr_free(channel);
+    grpc_iomgr_add_callback(destroy_channel, channel);
   }
 }
 

+ 34 - 19
src/core/surface/server.c

@@ -141,8 +141,8 @@ struct grpc_server {
   requested_call_array requested_calls;
 
   gpr_uint8 shutdown;
-  gpr_uint8 have_shutdown_tag;
-  void *shutdown_tag;
+  size_t num_shutdown_tags;
+  void **shutdown_tags;
 
   call_data *lists[CALL_LIST_COUNT];
   channel_data root_channel_data;
@@ -273,6 +273,7 @@ static void server_unref(grpc_server *server) {
     }
     gpr_free(server->cqs);
     gpr_free(server->pollsets);
+    gpr_free(server->shutdown_tags);
     gpr_free(server);
   }
 }
@@ -514,17 +515,18 @@ static void init_call_elem(grpc_call_element *elem,
 static void destroy_call_elem(grpc_call_element *elem) {
   channel_data *chand = elem->channel_data;
   call_data *calld = elem->call_data;
-  size_t i;
+  size_t i, j;
 
   gpr_mu_lock(&chand->server->mu);
   for (i = 0; i < CALL_LIST_COUNT; i++) {
     call_list_remove(elem->call_data, i);
   }
-  if (chand->server->shutdown && chand->server->have_shutdown_tag &&
-      chand->server->lists[ALL_CALLS] == NULL) {
-    for (i = 0; i < chand->server->cq_count; i++) {
-      grpc_cq_end_server_shutdown(chand->server->cqs[i],
-                                  chand->server->shutdown_tag);
+  if (chand->server->shutdown && chand->server->lists[ALL_CALLS] == NULL) {
+    for (i = 0; i < chand->server->num_shutdown_tags; i++) {
+      for (j = 0; j < chand->server->cq_count; j++) {
+        grpc_cq_end_server_shutdown(chand->server->cqs[j],
+                                    chand->server->shutdown_tags[i]);
+      }
     }
   }
   gpr_mu_unlock(&chand->server->mu);
@@ -586,8 +588,9 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
 }
 
 static const grpc_channel_filter server_surface_filter = {
-    call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
-    sizeof(channel_data), init_channel_elem, destroy_channel_elem, "server",
+    call_op,           channel_op,           sizeof(call_data),
+    init_call_elem,    destroy_call_elem,    sizeof(channel_data),
+    init_channel_elem, destroy_channel_elem, "server",
 };
 
 static void addcq(grpc_server *server, grpc_completion_queue *cq) {
@@ -762,7 +765,7 @@ grpc_transport_setup_result grpc_server_setup_transport(
 
   result = grpc_connected_channel_bind_transport(
       grpc_channel_get_channel_stack(channel), transport);
-  
+
   gpr_mu_lock(&s->mu);
   chand->next = &s->root_channel_data;
   chand->prev = chand->next->prev;
@@ -781,13 +784,22 @@ static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
   channel_data **channels;
   channel_data *c;
   size_t nchannels;
-  size_t i;
+  size_t i, j;
   grpc_channel_op op;
   grpc_channel_element *elem;
   registered_method *rm;
 
   /* lock, and gather up some stuff to do */
   gpr_mu_lock(&server->mu);
+  if (have_shutdown_tag) {
+    for (i = 0; i < server->cq_count; i++) {
+      grpc_cq_begin_op(server->cqs[i], NULL, GRPC_SERVER_SHUTDOWN);
+    }
+    server->shutdown_tags =
+        gpr_realloc(server->shutdown_tags,
+                    sizeof(void *) * (server->num_shutdown_tags + 1));
+    server->shutdown_tags[server->num_shutdown_tags++] = shutdown_tag;
+  }
   if (server->shutdown) {
     gpr_mu_unlock(&server->mu);
     return;
@@ -828,13 +840,10 @@ static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
   }
 
   server->shutdown = 1;
-  server->have_shutdown_tag = have_shutdown_tag;
-  server->shutdown_tag = shutdown_tag;
-  if (have_shutdown_tag) {
-    for (i = 0; i < server->cq_count; i++) {
-      grpc_cq_begin_op(server->cqs[i], NULL, GRPC_SERVER_SHUTDOWN);
-      if (server->lists[ALL_CALLS] == NULL) {
-        grpc_cq_end_server_shutdown(server->cqs[i], shutdown_tag);
+  if (server->lists[ALL_CALLS] == NULL) {
+    for (i = 0; i < server->num_shutdown_tags; i++) {
+      for (j = 0; j < server->cq_count; j++) {
+        grpc_cq_end_server_shutdown(server->cqs[j], server->shutdown_tags[i]);
       }
     }
   }
@@ -883,6 +892,12 @@ void grpc_server_shutdown_and_notify(grpc_server *server, void *tag) {
 void grpc_server_destroy(grpc_server *server) {
   channel_data *c;
   gpr_mu_lock(&server->mu);
+  if (!server->shutdown) {
+    gpr_mu_unlock(&server->mu);
+    grpc_server_shutdown(server);
+    gpr_mu_lock(&server->mu);
+  }
+
   for (c = server->root_channel_data.next; c != &server->root_channel_data;
        c = c->next) {
     shutdown_channel(c);

+ 1 - 1
test/core/end2end/tests/early_server_shutdown_finishes_tags.c

@@ -79,7 +79,7 @@ static void drain_cq(grpc_completion_queue *cq) {
 
 static void shutdown_server(grpc_end2end_test_fixture *f) {
   if (!f->server) return;
-  grpc_server_shutdown(f->server);
+  /* don't shutdown, just destroy, to tickle this code edge */
   grpc_server_destroy(f->server);
   f->server = NULL;
 }