فهرست منبع

Threading fixes

- single global mutex (simpler, easier to make correct for now)
- properly flag kick state for workers to avoid missing wakeups (if
signal is called before wait on the cv)
Craig Tiller 10 سال پیش
والد
کامیت
084c8089a4
2فایلهای تغییر یافته به همراه32 افزوده شده و 23 حذف شده
  1. 22 19
      src/core/iomgr/pollset_windows.c
  2. 10 4
      src/core/iomgr/pollset_windows.h

+ 22 - 19
src/core/iomgr/pollset_windows.c

@@ -43,12 +43,12 @@
 #include "src/core/iomgr/pollset.h"
 #include "src/core/iomgr/pollset_windows.h"
 
-static gpr_mu g_polling_mu;
+gpr_mu grpc_polling_mu;
 static grpc_pollset_worker *g_active_poller;
 static grpc_pollset_worker g_global_root_worker;
 
 void grpc_pollset_global_init() {
-  gpr_mu_init(&g_polling_mu);
+  gpr_mu_init(&grpc_polling_mu);
   g_active_poller = NULL;
   g_global_root_worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next =
     g_global_root_worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev =
@@ -56,7 +56,7 @@ void grpc_pollset_global_init() {
 }
 
 void grpc_pollset_global_shutdown() {
-  gpr_mu_destroy(&g_polling_mu);
+  gpr_mu_destroy(&grpc_polling_mu);
 }
 
 static void remove_worker(grpc_pollset_worker *worker, 
@@ -108,7 +108,6 @@ static void push_front_worker(grpc_pollset_worker *root,
 
 void grpc_pollset_init(grpc_pollset *pollset) {
   memset(pollset, 0, sizeof(*pollset));
-  gpr_mu_init(&pollset->mu);
   pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next = 
     pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev = 
     &pollset->root_worker;
@@ -116,7 +115,7 @@ void grpc_pollset_init(grpc_pollset *pollset) {
 
 void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                            grpc_closure *closure) {
-  gpr_mu_lock(&pollset->mu);
+  gpr_mu_lock(&grpc_polling_mu);
   pollset->shutting_down = 1;
   grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
   if (!pollset->is_iocp_worker) {
@@ -124,11 +123,10 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
   } else {
     pollset->on_shutdown = closure;
   }
-  gpr_mu_unlock(&pollset->mu);
+  gpr_mu_unlock(&grpc_polling_mu);
 }
 
 void grpc_pollset_destroy(grpc_pollset *pollset) {
-  gpr_mu_destroy(&pollset->mu);
 }
 
 void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
@@ -140,29 +138,31 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
     worker->links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next =
     worker->links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev =
     NULL;
+  worker->kicked = 0;
+  worker->pollset = pollset;
   gpr_cv_init(&worker->cv);
   if (grpc_alarm_check(exec_ctx, now, &deadline)) {
     goto done;
   }
   if (!pollset->kicked_without_pollers && !pollset->shutting_down) {
-    gpr_mu_lock(&g_polling_mu);
     if (g_active_poller == NULL) {
       grpc_pollset_worker *next_worker;
       /* become poller */
       pollset->is_iocp_worker = 1;
       g_active_poller = worker;
-      gpr_mu_unlock(&g_polling_mu);
-      gpr_mu_unlock(&pollset->mu);
+      gpr_mu_unlock(&grpc_polling_mu);
       grpc_iocp_work(exec_ctx, deadline);
-      gpr_mu_lock(&pollset->mu);
-      gpr_mu_lock(&g_polling_mu);
+      gpr_mu_lock(&grpc_polling_mu);
       pollset->is_iocp_worker = 0;
       g_active_poller = NULL;
+      /* try to get a worker from this pollsets worker list */
+      next_worker = pop_front_worker(&pollset->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET);
+      /* try to get a worker from the global list */
       next_worker = pop_front_worker(&g_global_root_worker, GRPC_POLLSET_WORKER_LINK_GLOBAL);
       if (next_worker != NULL) {
+        next_worker->kicked = 1;
         gpr_cv_signal(&next_worker->cv);
       }
-      gpr_mu_unlock(&g_polling_mu);
 
       if (pollset->shutting_down && pollset->on_shutdown != NULL) {
         grpc_exec_ctx_enqueue(exec_ctx, pollset->on_shutdown, 1);
@@ -171,18 +171,21 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
       goto done;
     }
     push_front_worker(&g_global_root_worker, GRPC_POLLSET_WORKER_LINK_GLOBAL, worker);
-    gpr_mu_unlock(&g_polling_mu);
     push_front_worker(&pollset->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET, worker);
     added_worker = 1;
-    gpr_cv_wait(&worker->cv, &pollset->mu, deadline);
+    while (!worker->kicked) {
+      if (gpr_cv_wait(&worker->cv, &grpc_polling_mu, deadline)) {
+        break;
+      }
+    }
   } else {
     pollset->kicked_without_pollers = 0;
   }
 done:
   if (!grpc_closure_list_empty(exec_ctx->closure_list)) {
-    gpr_mu_unlock(&pollset->mu);
+    gpr_mu_unlock(&grpc_polling_mu);
     grpc_exec_ctx_flush(exec_ctx);
-    gpr_mu_lock(&pollset->mu);
+    gpr_mu_lock(&grpc_polling_mu);
   }
   if (added_worker) {
     remove_worker(worker, GRPC_POLLSET_WORKER_LINK_GLOBAL);
@@ -197,6 +200,7 @@ void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
       for (specific_worker = p->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next;
            specific_worker != &p->root_worker;
            specific_worker = specific_worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].next) {
+        specific_worker->kicked = 1;
         gpr_cv_signal(&specific_worker->cv);
       }
       p->kicked_without_pollers = 1;
@@ -205,12 +209,11 @@ void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
       }
     } else {
       if (p->is_iocp_worker) {
-        gpr_mu_lock(&g_polling_mu);
         if (g_active_poller == specific_worker) {
           grpc_iocp_kick();
         }
-        gpr_mu_unlock(&g_polling_mu);
       } else {
+        specific_worker->kicked = 1;
         gpr_cv_signal(&specific_worker->cv);
       }
     }

+ 10 - 4
src/core/iomgr/pollset_windows.h

@@ -54,20 +54,26 @@ typedef struct grpc_pollset_worker_link {
   struct grpc_pollset_worker *prev;
 } grpc_pollset_worker_link;
 
+struct grpc_pollset;
+typedef struct grpc_pollset grpc_pollset;
+
 typedef struct grpc_pollset_worker {
   gpr_cv cv;
+  int kicked;
+  struct grpc_pollset *pollset;
   grpc_pollset_worker_link links[GRPC_POLLSET_WORKER_LINK_TYPES];
 } grpc_pollset_worker;
 
-typedef struct grpc_pollset {
-  gpr_mu mu;
+struct grpc_pollset {
   int shutting_down;
   int kicked_without_pollers;
   int is_iocp_worker;
   grpc_pollset_worker root_worker;
   grpc_closure *on_shutdown;
-} grpc_pollset;
+};
+
+extern gpr_mu grpc_polling_mu;
 
-#define GRPC_POLLSET_MU(pollset) (&(pollset)->mu)
+#define GRPC_POLLSET_MU(pollset) (&grpc_polling_mu)
 
 #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_WINDOWS_H */