ソースを参照

Merge pull request #8650 from kpayson64/poll_cv_improvement

Bypass poll thread if wakeup fd is set
kpayson64 9 年 前
コミット
543d3a35b3
2 ファイル変更10 行追加12 行削除
  1. 7 8
      src/core/lib/iomgr/ev_poll_posix.c
  2. 3 4
      test/core/iomgr/wakeup_fd_cv_test.c

+ 7 - 8
src/core/lib/iomgr/ev_poll_posix.c

@@ -1343,6 +1343,7 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
   int res, idx;
   gpr_cv *pollcv;
   cv_node *cvn, *prev;
+  int skip_poll = 0;
   nfds_t nsockfds = 0;
   gpr_thd_id t_id;
   gpr_thd_options opt;
@@ -1358,17 +1359,17 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
       cvn->cv = pollcv;
       cvn->next = g_cvfds.cvfds[idx].cvs;
       g_cvfds.cvfds[idx].cvs = cvn;
-      // We should return immediately if there are pending events,
-      // but we still need to call poll() to check for socket events
+      // Don't bother polling if a wakeup fd is ready
       if (g_cvfds.cvfds[idx].is_set) {
-        timeout = 0;
+        skip_poll=1;
       }
     } else if (fds[i].fd >= 0) {
       nsockfds++;
     }
   }
 
-  if (nsockfds > 0) {
+  res = 0;
+  if (!skip_poll && nsockfds > 0) {
     pargs = gpr_malloc(sizeof(struct poll_args));
     // Both the main thread and calling thread get a reference
     gpr_ref_init(&pargs->refcount, 2);
@@ -1398,16 +1399,14 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
       res = pargs->retval;
       errno = pargs->err;
     } else {
-      res = 0;
       errno = 0;
       gpr_atm_no_barrier_store(&pargs->status, CANCELLED);
     }
-  } else {
+  } else if (!skip_poll) {
     gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME);
     deadline =
         gpr_time_add(deadline, gpr_time_from_millis(timeout, GPR_TIMESPAN));
     gpr_cv_wait(pollcv, &g_cvfds.mu, deadline);
-    res = 0;
   }
 
   idx = 0;
@@ -1431,7 +1430,7 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
         fds[i].revents = POLLIN;
         if (res >= 0) res++;
       }
-    } else if (fds[i].fd >= 0 &&
+    } else if (!skip_poll && fds[i].fd >= 0 &&
                gpr_atm_no_barrier_load(&pargs->status) == COMPLETED) {
       fds[i].revents = pargs->fds[idx].revents;
       idx++;

+ 3 - 4
test/core/iomgr/wakeup_fd_cv_test.c

@@ -195,16 +195,15 @@ void test_poll_cv_trigger(void) {
   GPR_ASSERT(pfds[4].revents == 0);
   GPR_ASSERT(pfds[5].revents == 0);
 
-  // Pollin on wakeup fd + socket fd
-  trigger_socket_event();
+  // Pollin on wakeupfd before poll()
   pargs.result = -2;
   gpr_thd_new(&t_id, &background_poll, &pargs, &opt);
   gpr_thd_join(t_id);
 
-  GPR_ASSERT(pargs.result == 2);
+  GPR_ASSERT(pargs.result == 1);
   GPR_ASSERT(pfds[0].revents == 0);
   GPR_ASSERT(pfds[1].revents == POLLIN);
-  GPR_ASSERT(pfds[2].revents == POLLIN);
+  GPR_ASSERT(pfds[2].revents == 0);
   GPR_ASSERT(pfds[3].revents == 0);
   GPR_ASSERT(pfds[4].revents == 0);
   GPR_ASSERT(pfds[5].revents == 0);