Эх сурвалжийг харах

Merge branch 'epexinf' of github.com:ctiller/grpc into epexinf

Craig Tiller 8 жил өмнө
parent
commit
85ab449d72

+ 39 - 24
src/core/lib/iomgr/ev_epollex_linux.cc

@@ -201,7 +201,7 @@ struct grpc_pollset_set {
 
 
   size_t pollset_count;
   size_t pollset_count;
   size_t pollset_capacity;
   size_t pollset_capacity;
-  pollable **pollsets;
+  grpc_pollset **pollsets;
 
 
   size_t fd_count;
   size_t fd_count;
   size_t fd_capacity;
   size_t fd_capacity;
@@ -545,6 +545,13 @@ static void pollset_global_shutdown(void) {
 
 
 static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
 static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
                                           grpc_pollset *pollset) {
                                           grpc_pollset *pollset) {
+  if (GRPC_TRACER_ON(grpc_polling_trace)) {
+    gpr_log(GPR_DEBUG,
+            "PS:%p (pollable:%p) maybe_finish_shutdown sc=%p (target:!NULL) "
+            "rw=%p (target:NULL) cpsc=%d (target:0)",
+            pollset, pollset->active_pollable, pollset->shutdown_closure,
+            pollset->root_worker, pollset->containing_pollset_set_count);
+  }
   if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL &&
   if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL &&
       pollset->containing_pollset_set_count == 0) {
       pollset->containing_pollset_set_count == 0) {
     GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE);
     GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE);
@@ -1123,7 +1130,11 @@ static void pollset_set_unref(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss) {
   pollset_set_unref(exec_ctx, pss->parent);
   pollset_set_unref(exec_ctx, pss->parent);
   gpr_mu_destroy(&pss->mu);
   gpr_mu_destroy(&pss->mu);
   for (size_t i = 0; i < pss->pollset_count; i++) {
   for (size_t i = 0; i < pss->pollset_count; i++) {
-    POLLABLE_UNREF(pss->pollsets[i], "pollset_set");
+    gpr_mu_lock(&pss->pollsets[i]->mu);
+    if (0 == --pss->pollsets[i]->containing_pollset_set_count) {
+      pollset_maybe_finish_shutdown(exec_ctx, pss->pollsets[i]);
+    }
+    gpr_mu_unlock(&pss->pollsets[i]->mu);
   }
   }
   for (size_t i = 0; i < pss->fd_count; i++) {
   for (size_t i = 0; i < pss->fd_count; i++) {
     UNREF_BY(exec_ctx, pss->fds[i], 2, "pollset_set");
     UNREF_BY(exec_ctx, pss->fds[i], 2, "pollset_set");
@@ -1142,7 +1153,8 @@ static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
   static const char *err_desc = "pollset_set_add_fd";
   static const char *err_desc = "pollset_set_add_fd";
   pss = pss_lock_adam(pss);
   pss = pss_lock_adam(pss);
   for (size_t i = 0; i < pss->pollset_count; i++) {
   for (size_t i = 0; i < pss->pollset_count; i++) {
-    append_error(&error, pollable_add_fd(pss->pollsets[i], fd), err_desc);
+    append_error(&error, pollable_add_fd(pss->pollsets[i]->active_pollable, fd),
+                 err_desc);
   }
   }
   if (pss->fd_count == pss->fd_capacity) {
   if (pss->fd_count == pss->fd_capacity) {
     pss->fd_capacity = GPR_MAX(pss->fd_capacity * 2, 8);
     pss->fd_capacity = GPR_MAX(pss->fd_capacity * 2, 8);
@@ -1185,8 +1197,7 @@ static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
   pss = pss_lock_adam(pss);
   pss = pss_lock_adam(pss);
   size_t i;
   size_t i;
   for (i = 0; i < pss->pollset_count; i++) {
   for (i = 0; i < pss->pollset_count; i++) {
-    if (pss->pollsets[i] == ps->active_pollable) {
-      POLLABLE_UNREF(pss->pollsets[i], "pollset_set");
+    if (pss->pollsets[i] == ps) {
       break;
       break;
     }
     }
   }
   }
@@ -1204,11 +1215,12 @@ static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
 }
 }
 
 
 // add all fds to pollables, and output a new array of unorphaned out_fds
 // add all fds to pollables, and output a new array of unorphaned out_fds
-static grpc_error *add_fds_to_pollables(grpc_exec_ctx *exec_ctx, grpc_fd **fds,
-                                        size_t fd_count, pollable **pollables,
-                                        size_t pollable_count,
-                                        const char *err_desc, grpc_fd **out_fds,
-                                        size_t *out_fd_count) {
+// assumes pollsets are multipollable
+static grpc_error *add_fds_to_pollsets(grpc_exec_ctx *exec_ctx, grpc_fd **fds,
+                                       size_t fd_count, grpc_pollset **pollsets,
+                                       size_t pollset_count,
+                                       const char *err_desc, grpc_fd **out_fds,
+                                       size_t *out_fd_count) {
   grpc_error *error = GRPC_ERROR_NONE;
   grpc_error *error = GRPC_ERROR_NONE;
   for (size_t i = 0; i < fd_count; i++) {
   for (size_t i = 0; i < fd_count; i++) {
     gpr_mu_lock(&fds[i]->orphan_mu);
     gpr_mu_lock(&fds[i]->orphan_mu);
@@ -1216,8 +1228,10 @@ static grpc_error *add_fds_to_pollables(grpc_exec_ctx *exec_ctx, grpc_fd **fds,
       gpr_mu_unlock(&fds[i]->orphan_mu);
       gpr_mu_unlock(&fds[i]->orphan_mu);
       UNREF_BY(exec_ctx, fds[i], 2, "pollset_set");
       UNREF_BY(exec_ctx, fds[i], 2, "pollset_set");
     } else {
     } else {
-      for (size_t j = 0; j < pollable_count; j++) {
-        append_error(&error, pollable_add_fd(pollables[j], fds[i]), err_desc);
+      for (size_t j = 0; j < pollset_count; j++) {
+        append_error(&error,
+                     pollable_add_fd(pollsets[j]->active_pollable, fds[i]),
+                     err_desc);
       }
       }
       gpr_mu_unlock(&fds[i]->orphan_mu);
       gpr_mu_unlock(&fds[i]->orphan_mu);
       out_fds[(*out_fd_count)++] = fds[i];
       out_fds[(*out_fd_count)++] = fds[i];
@@ -1246,17 +1260,18 @@ static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
   pss = pss_lock_adam(pss);
   pss = pss_lock_adam(pss);
   size_t initial_fd_count = pss->fd_count;
   size_t initial_fd_count = pss->fd_count;
   pss->fd_count = 0;
   pss->fd_count = 0;
-  append_error(&error, add_fds_to_pollables(exec_ctx, pss->fds,
-                                            initial_fd_count, &pollable_obj, 1,
-                                            err_desc, pss->fds, &pss->fd_count),
+  append_error(&error,
+               add_fds_to_pollsets(exec_ctx, pss->fds, initial_fd_count, &ps, 1,
+                                   err_desc, pss->fds, &pss->fd_count),
                err_desc);
                err_desc);
   if (pss->pollset_count == pss->pollset_capacity) {
   if (pss->pollset_count == pss->pollset_capacity) {
     pss->pollset_capacity = GPR_MAX(pss->pollset_capacity * 2, 8);
     pss->pollset_capacity = GPR_MAX(pss->pollset_capacity * 2, 8);
-    pss->pollsets = (pollable **)gpr_realloc(
+    pss->pollsets = (grpc_pollset **)gpr_realloc(
         pss->pollsets, pss->pollset_capacity * sizeof(*pss->pollsets));
         pss->pollsets, pss->pollset_capacity * sizeof(*pss->pollsets));
   }
   }
-  pss->pollsets[pss->pollset_count++] = pollable_obj;
+  pss->pollsets[pss->pollset_count++] = ps;
   gpr_mu_unlock(&pss->mu);
   gpr_mu_unlock(&pss->mu);
+  POLLABLE_UNREF(pollable_obj, "pollset_set");
 
 
   GRPC_LOG_IF_ERROR(err_desc, error);
   GRPC_LOG_IF_ERROR(err_desc, error);
 }
 }
@@ -1309,18 +1324,18 @@ static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
   }
   }
   size_t initial_a_fd_count = a->fd_count;
   size_t initial_a_fd_count = a->fd_count;
   a->fd_count = 0;
   a->fd_count = 0;
-  append_error(&error, add_fds_to_pollables(
-                           exec_ctx, a->fds, initial_a_fd_count, b->pollsets,
-                           b->pollset_count, "merge_a2b", a->fds, &a->fd_count),
+  append_error(&error, add_fds_to_pollsets(exec_ctx, a->fds, initial_a_fd_count,
+                                           b->pollsets, b->pollset_count,
+                                           "merge_a2b", a->fds, &a->fd_count),
                err_desc);
                err_desc);
-  append_error(&error, add_fds_to_pollables(exec_ctx, b->fds, b->fd_count,
-                                            a->pollsets, a->pollset_count,
-                                            "merge_b2a", a->fds, &a->fd_count),
+  append_error(&error, add_fds_to_pollsets(exec_ctx, b->fds, b->fd_count,
+                                           a->pollsets, a->pollset_count,
+                                           "merge_b2a", a->fds, &a->fd_count),
                err_desc);
                err_desc);
   if (a->pollset_capacity < a->pollset_count + b->pollset_count) {
   if (a->pollset_capacity < a->pollset_count + b->pollset_count) {
     a->pollset_capacity =
     a->pollset_capacity =
         GPR_MAX(2 * a->pollset_capacity, a->pollset_count + b->pollset_count);
         GPR_MAX(2 * a->pollset_capacity, a->pollset_count + b->pollset_count);
-    a->pollsets = (pollable **)gpr_realloc(
+    a->pollsets = (grpc_pollset **)gpr_realloc(
         a->pollsets, a->pollset_capacity * sizeof(*a->pollsets));
         a->pollsets, a->pollset_capacity * sizeof(*a->pollsets));
   }
   }
   if (b->pollset_count > 0) {
   if (b->pollset_count > 0) {

+ 1 - 1
tools/run_tests/performance/scenario_config.py

@@ -253,7 +253,7 @@ class CXXLanguage:
        req_size=300, resp_size=50,
        req_size=300, resp_size=50,
        unconstrained_client='async', outstanding=30000, channels=300,
        unconstrained_client='async', outstanding=30000, channels=300,
        offered_load=37500, secure=False,
        offered_load=37500, secure=False,
-       async_server_threads=16, server_threads_per_cq=16,
+       async_server_threads=16, server_threads_per_cq=1,
        categories=[SMOKETEST] + [SCALABLE])
        categories=[SMOKETEST] + [SCALABLE])
 
 
     for secure in [True, False]:
     for secure in [True, False]: