Răsfoiți Sursa

Prevent polling island + workqueue reference loop

Craig Tiller 9 ani în urmă
părinte
comite
1500761b07
1 a modificat fișierele cu 33 adăugiri și 15 ștergeri
  1. 33 15
      src/core/lib/iomgr/ev_epoll_linux.c

+ 33 - 15
src/core/lib/iomgr/ev_epoll_linux.c

@@ -153,7 +153,7 @@ static void fd_global_shutdown(void);
  * Polling island Declarations
  * Polling island Declarations
  */
  */
 
 
-// #define GRPC_PI_REF_COUNT_DEBUG
+//#define GRPC_PI_REF_COUNT_DEBUG
 #ifdef GRPC_PI_REF_COUNT_DEBUG
 #ifdef GRPC_PI_REF_COUNT_DEBUG
 
 
 #define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
 #define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
@@ -174,7 +174,7 @@ typedef struct polling_island {
      Once the ref count becomes zero, this structure is destroyed which means
      Once the ref count becomes zero, this structure is destroyed which means
      we should ensure that there is never a scenario where a PI_ADD_REF() is
      we should ensure that there is never a scenario where a PI_ADD_REF() is
      racing with a PI_UNREF() that just made the ref_count zero. */
      racing with a PI_UNREF() that just made the ref_count zero. */
-  gpr_refcount ref_count;
+  gpr_atm ref_count;
 
 
   /* Pointer to the polling_island this merged into.
   /* Pointer to the polling_island this merged into.
    * merged_to value is only set once in polling_island's lifetime (and that too
    * merged_to value is only set once in polling_island's lifetime (and that too
@@ -296,7 +296,7 @@ static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi);
 
 
 static void pi_add_ref_dbg(polling_island *pi, char *reason, char *file,
 static void pi_add_ref_dbg(polling_island *pi, char *reason, char *file,
                            int line) {
                            int line) {
-  long old_cnt = gpr_atm_acq_load(&(pi->ref_count.count));
+  long old_cnt = gpr_atm_acq_load(&pi->ref_count);
   pi_add_ref(pi);
   pi_add_ref(pi);
   gpr_log(GPR_DEBUG, "Add ref pi: %p, old: %ld -> new:%ld (%s) - (%s, %d)",
   gpr_log(GPR_DEBUG, "Add ref pi: %p, old: %ld -> new:%ld (%s) - (%s, %d)",
           (void *)pi, old_cnt, old_cnt + 1, reason, file, line);
           (void *)pi, old_cnt, old_cnt + 1, reason, file, line);
@@ -304,17 +304,22 @@ static void pi_add_ref_dbg(polling_island *pi, char *reason, char *file,
 
 
 static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi,
 static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi,
                          char *reason, char *file, int line) {
                          char *reason, char *file, int line) {
-  long old_cnt = gpr_atm_acq_load(&(pi->ref_count.count));
+  long old_cnt = gpr_atm_acq_load(&pi->ref_count);
   pi_unref(exec_ctx, pi);
   pi_unref(exec_ctx, pi);
   gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
   gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
           (void *)pi, old_cnt, (old_cnt - 1), reason, file, line);
           (void *)pi, old_cnt, (old_cnt - 1), reason, file, line);
 }
 }
 #endif
 #endif
 
 
-static void pi_add_ref(polling_island *pi) { gpr_ref(&pi->ref_count); }
+static void pi_add_ref(polling_island *pi) {
+  gpr_atm_no_barrier_fetch_add(&pi->ref_count, 1);
+}
 
 
 static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi) {
 static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi) {
-  /* If ref count went to zero, delete the polling island.
+  /* If ref count went to one, we're back to just the workqueue owning a ref.
+     Unref the workqueue to break the loop.
+
+     If ref count went to zero, delete the polling island.
      Note that this deletion not be done under a lock. Once the ref count goes
      Note that this deletion not be done under a lock. Once the ref count goes
      to zero, we are guaranteed that no one else holds a reference to the
      to zero, we are guaranteed that no one else holds a reference to the
      polling island (and that there is no racing pi_add_ref() call either).
      polling island (and that there is no racing pi_add_ref() call either).
@@ -322,12 +327,20 @@ static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi) {
      Also, if we are deleting the polling island and the merged_to field is
      Also, if we are deleting the polling island and the merged_to field is
      non-empty, we should remove a ref to the merged_to polling island
      non-empty, we should remove a ref to the merged_to polling island
    */
    */
-  if (gpr_unref(&pi->ref_count)) {
-    polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
-    polling_island_delete(exec_ctx, pi);
-    if (next != NULL) {
-      PI_UNREF(exec_ctx, next, "pi_delete"); /* Recursive call */
+  switch (gpr_atm_full_fetch_add(&pi->ref_count, -1)) {
+    case 2: /* last external ref: the only one now owned is by the workqueue */
+      GRPC_WORKQUEUE_UNREF(exec_ctx, pi->workqueue, "polling_island");
+      break;
+    case 1: {
+      polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
+      polling_island_delete(exec_ctx, pi);
+      if (next != NULL) {
+        PI_UNREF(exec_ctx, next, "pi_delete"); /* Recursive call */
+      }
+      break;
     }
     }
+    case 0:
+      GPR_UNREACHABLE_CODE(return );
   }
   }
 }
 }
 
 
@@ -478,7 +491,7 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
   pi->epoll_fd = -1;
   pi->epoll_fd = -1;
   pi->workqueue = NULL;
   pi->workqueue = NULL;
 
 
-  gpr_ref_init(&pi->ref_count, 0);
+  gpr_atm_rel_store(&pi->ref_count, 0);
   gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);
   gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);
 
 
   pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
   pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
@@ -501,7 +514,7 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
                                   error);
                                   error);
     GPR_ASSERT(pi->workqueue->wakeup_read_fd->polling_island == NULL);
     GPR_ASSERT(pi->workqueue->wakeup_read_fd->polling_island == NULL);
     pi->workqueue->wakeup_read_fd->polling_island = pi;
     pi->workqueue->wakeup_read_fd->polling_island = pi;
-    PI_ADD_REF(pi, 1);
+    PI_ADD_REF(pi, "fd");
   }
   }
 
 
 done:
 done:
@@ -525,7 +538,6 @@ static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) {
   gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);
   gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);
 
 
   close(pi->epoll_fd);
   close(pi->epoll_fd);
-  GRPC_WORKQUEUE_UNREF(exec_ctx, pi->workqueue, "polling_island");
   gpr_mu_destroy(&pi->mu);
   gpr_mu_destroy(&pi->mu);
   gpr_free(pi->fds);
   gpr_free(pi->fds);
   gpr_free(pi);
   gpr_free(pi);
@@ -885,6 +897,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                       const char *reason) {
                       const char *reason) {
   bool is_fd_closed = false;
   bool is_fd_closed = false;
   grpc_error *error = GRPC_ERROR_NONE;
   grpc_error *error = GRPC_ERROR_NONE;
+  polling_island *unref_pi = NULL;
 
 
   gpr_mu_lock(&fd->mu);
   gpr_mu_lock(&fd->mu);
   fd->on_done_closure = on_done;
   fd->on_done_closure = on_done;
@@ -918,7 +931,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
     polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error);
     polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error);
     gpr_mu_unlock(&pi_latest->mu);
     gpr_mu_unlock(&pi_latest->mu);
 
 
-    PI_UNREF(exec_ctx, fd->polling_island, "fd_orphan");
+    unref_pi = fd->polling_island;
     fd->polling_island = NULL;
     fd->polling_island = NULL;
   }
   }
   gpr_mu_unlock(&fd->pi_mu);
   gpr_mu_unlock(&fd->pi_mu);
@@ -927,6 +940,9 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
 
 
   gpr_mu_unlock(&fd->mu);
   gpr_mu_unlock(&fd->mu);
   UNREF_BY(fd, 2, reason); /* Drop the reference */
   UNREF_BY(fd, 2, reason); /* Drop the reference */
+  if (unref_pi != NULL) {
+    PI_UNREF(exec_ctx, unref_pi, "fd_orphan");
+  }
   GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error));
   GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error));
 }
 }
 
 
@@ -1595,6 +1611,8 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
 
 
   gpr_mu_unlock(&fd->pi_mu);
   gpr_mu_unlock(&fd->pi_mu);
   gpr_mu_unlock(&pollset->mu);
   gpr_mu_unlock(&pollset->mu);
+
+  GRPC_LOG_IF_ERROR("pollset_add_fd", error);
 }
 }
 
 
 /*******************************************************************************
 /*******************************************************************************