|  | @@ -60,6 +60,13 @@
 | 
	
		
			
				|  |  |  #include "src/core/lib/profiling/timers.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/support/block_annotate.h"
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +/* TODO: sreek - Move this to init.c and initialize this like other tracers. */
 | 
	
		
			
				|  |  | +static int grpc_polling_trace = 0; /* Disabled by default */
 | 
	
		
			
				|  |  | +#define GRPC_POLLING_TRACE(fmt, ...)       \
 | 
	
		
			
				|  |  | +  if (grpc_polling_trace) {                \
 | 
	
		
			
				|  |  | +    gpr_log(GPR_INFO, (fmt), __VA_ARGS__); \
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  static int grpc_wakeup_signal = -1;
 | 
	
		
			
				|  |  |  static bool is_grpc_wakeup_signal_initialized = false;
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -195,7 +202,11 @@ typedef struct polling_island {
 | 
	
		
			
				|  |  |   * Pollset Declarations
 | 
	
		
			
				|  |  |   */
 | 
	
		
			
				|  |  |  struct grpc_pollset_worker {
 | 
	
		
			
				|  |  | -  pthread_t pt_id; /* Thread id of this worker */
 | 
	
		
			
				|  |  | +  /* Thread id of this worker */
 | 
	
		
			
				|  |  | +  pthread_t pt_id;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  /* Used to prevent a worker from getting kicked multiple times */
 | 
	
		
			
				|  |  | +  gpr_atm is_kicked;
 | 
	
		
			
				|  |  |    struct grpc_pollset_worker *next;
 | 
	
		
			
				|  |  |    struct grpc_pollset_worker *prev;
 | 
	
		
			
				|  |  |  };
 | 
	
	
		
			
				|  | @@ -1058,9 +1069,16 @@ static void pollset_global_shutdown(void) {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) {
 | 
	
		
			
				|  |  |    grpc_error *err = GRPC_ERROR_NONE;
 | 
	
		
			
				|  |  | -  int err_num = pthread_kill(worker->pt_id, grpc_wakeup_signal);
 | 
	
		
			
				|  |  | -  if (err_num != 0) {
 | 
	
		
			
				|  |  | -    err = GRPC_OS_ERROR(err_num, "pthread_kill");
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  /* Kick the worker only if it was not already kicked */
 | 
	
		
			
				|  |  | +  if (gpr_atm_no_barrier_cas(&worker->is_kicked, (gpr_atm)0, (gpr_atm)1)) {
 | 
	
		
			
				|  |  | +    GRPC_POLLING_TRACE(
 | 
	
		
			
				|  |  | +        "pollset_worker_kick: Kicking worker: %p (thread id: %ld)",
 | 
	
		
			
				|  |  | +        (void *)worker, worker->pt_id);
 | 
	
		
			
				|  |  | +    int err_num = pthread_kill(worker->pt_id, grpc_wakeup_signal);
 | 
	
		
			
				|  |  | +    if (err_num != 0) {
 | 
	
		
			
				|  |  | +      err = GRPC_OS_ERROR(err_num, "pthread_kill");
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    return err;
 | 
	
		
			
				|  |  |  }
 | 
	
	
		
			
				|  | @@ -1104,7 +1122,6 @@ static grpc_error *pollset_kick(grpc_pollset *p,
 | 
	
		
			
				|  |  |    GPR_TIMER_BEGIN("pollset_kick", 0);
 | 
	
		
			
				|  |  |    grpc_error *error = GRPC_ERROR_NONE;
 | 
	
		
			
				|  |  |    const char *err_desc = "Kick Failure";
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |    grpc_pollset_worker *worker = specific_worker;
 | 
	
		
			
				|  |  |    if (worker != NULL) {
 | 
	
		
			
				|  |  |      if (worker == GRPC_POLLSET_KICK_BROADCAST) {
 | 
	
	
		
			
				|  | @@ -1270,7 +1287,8 @@ static void pollset_reset(grpc_pollset *pollset) {
 | 
	
		
			
				|  |  |  #define GRPC_EPOLL_MAX_EVENTS 1000
 | 
	
		
			
				|  |  |  /* Note: sig_mask contains the signal mask to use *during* epoll_wait() */
 | 
	
		
			
				|  |  |  static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  | -                                    grpc_pollset *pollset, int timeout_ms,
 | 
	
		
			
				|  |  | +                                    grpc_pollset *pollset,
 | 
	
		
			
				|  |  | +                                    grpc_pollset_worker *worker, int timeout_ms,
 | 
	
		
			
				|  |  |                                      sigset_t *sig_mask, grpc_error **error) {
 | 
	
		
			
				|  |  |    struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
 | 
	
		
			
				|  |  |    int epoll_fd = -1;
 | 
	
	
		
			
				|  | @@ -1298,6 +1316,8 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      PI_ADD_REF(pollset->polling_island, "ps");
 | 
	
		
			
				|  |  | +    GRPC_POLLING_TRACE("pollset_work: pollset: %p created new pi: %p",
 | 
	
		
			
				|  |  | +                       (void *)pollset, (void *)pollset->polling_island);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    pi = polling_island_maybe_get_latest(pollset->polling_island);
 | 
	
	
		
			
				|  | @@ -1331,6 +1351,9 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |        } else {
 | 
	
		
			
				|  |  |          /* We were interrupted. Save an interation by doing a zero timeout
 | 
	
		
			
				|  |  |             epoll_wait to see if there are any other events of interest */
 | 
	
		
			
				|  |  | +        GRPC_POLLING_TRACE(
 | 
	
		
			
				|  |  | +            "pollset_work: pollset: %p, worker: %p received kick",
 | 
	
		
			
				|  |  | +            (void *)pollset, (void *)worker);
 | 
	
		
			
				|  |  |          ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  |      }
 | 
	
	
		
			
				|  | @@ -1347,6 +1370,10 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |                       grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd),
 | 
	
		
			
				|  |  |                       err_desc);
 | 
	
		
			
				|  |  |        } else if (data_ptr == &polling_island_wakeup_fd) {
 | 
	
		
			
				|  |  | +        GRPC_POLLING_TRACE(
 | 
	
		
			
				|  |  | +            "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
 | 
	
		
			
				|  |  | +            "%d) got merged",
 | 
	
		
			
				|  |  | +            (void *)pollset, (void *)worker, epoll_fd);
 | 
	
		
			
				|  |  |          /* This means that our polling island is merged with a different
 | 
	
		
			
				|  |  |             island. We do not have to do anything here since the subsequent call
 | 
	
		
			
				|  |  |             to the function pollset_work_and_unlock() will pick up the correct
 | 
	
	
		
			
				|  | @@ -1394,6 +1421,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
 | 
	
		
			
				|  |  |    grpc_pollset_worker worker;
 | 
	
		
			
				|  |  |    worker.next = worker.prev = NULL;
 | 
	
		
			
				|  |  |    worker.pt_id = pthread_self();
 | 
	
		
			
				|  |  | +  gpr_atm_no_barrier_store(&worker.is_kicked, (gpr_atm)0);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    *worker_hdl = &worker;
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -1409,18 +1437,20 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
 | 
	
		
			
				|  |  |      pollset->kicked_without_pollers = 0;
 | 
	
		
			
				|  |  |    } else if (!pollset->shutting_down) {
 | 
	
		
			
				|  |  |      /* We use the posix-signal with number 'grpc_wakeup_signal' for waking up
 | 
	
		
			
				|  |  | -       (i.e 'kicking') a worker in the pollset.
 | 
	
		
			
				|  |  | -       A 'kick' is a way to inform that worker that there is some pending work
 | 
	
		
			
				|  |  | -       that needs immediate attention (like an event on the completion queue,
 | 
	
		
			
				|  |  | -       or a polling island merge that results in a new epoll-fd to wait on) and
 | 
	
		
			
				|  |  | -       that the worker should not spend time waiting in epoll_pwait().
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -       A kick can come at anytime (i.e before/during or after the worker calls
 | 
	
		
			
				|  |  | -       epoll_pwait()) but in all cases we have to make sure that when a worker
 | 
	
		
			
				|  |  | -       gets a kick, it does not spend time in epoll_pwait(). In other words, one
 | 
	
		
			
				|  |  | -       kick should result in skipping/exiting of one epoll_pwait();
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -       To accomplish this, we mask 'grpc_wakeup_signal' on this worker at all
 | 
	
		
			
				|  |  | +       (i.e 'kicking') a worker in the pollset. A 'kick' is a way to inform the
 | 
	
		
			
				|  |  | +       worker that there is some pending work that needs immediate attention
 | 
	
		
			
				|  |  | +       (like an event on the completion queue, or a polling island merge that
 | 
	
		
			
				|  |  | +       results in a new epoll-fd to wait on) and that the worker should not
 | 
	
		
			
				|  |  | +       spend time waiting in epoll_pwait().
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +       A worker can be kicked anytime from the point it is added to the pollset
 | 
	
		
			
				|  |  | +       via push_front_worker() (or push_back_worker()) to the point it is
 | 
	
		
			
				|  |  | +       removed via remove_worker().
 | 
	
		
			
				|  |  | +       If the worker is kicked before/during it calls epoll_pwait(), it should
 | 
	
		
			
				|  |  | +       immediately exit from epoll_wait(). If the worker is kicked after it
 | 
	
		
			
				|  |  | +       returns from epoll_wait(), then nothing really needs to be done.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +       To accomplish this, we mask 'grpc_wakeup_signal' on this thread at all
 | 
	
		
			
				|  |  |         times *except* when it is in epoll_pwait(). This way, the worker never
 | 
	
		
			
				|  |  |         misses acting on a kick */
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -1442,11 +1472,14 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      push_front_worker(pollset, &worker); /* Add worker to pollset */
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    pollset_work_and_unlock(exec_ctx, pollset, timeout_ms, &g_orig_sigmask,
 | 
	
		
			
				|  |  | -                            &error);
 | 
	
		
			
				|  |  | +    pollset_work_and_unlock(exec_ctx, pollset, &worker, timeout_ms,
 | 
	
		
			
				|  |  | +                            &g_orig_sigmask, &error);
 | 
	
		
			
				|  |  |      grpc_exec_ctx_flush(exec_ctx);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      gpr_mu_lock(&pollset->mu);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    /* Note: There is no need to reset worker.is_kicked to 0 since we are no
 | 
	
		
			
				|  |  | +       longer going to use this worker */
 | 
	
		
			
				|  |  |      remove_worker(pollset, &worker);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -1506,17 +1539,38 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
 | 
	
		
			
				|  |  |      pi_new = fd->polling_island;
 | 
	
		
			
				|  |  |      if (pi_new == NULL) {
 | 
	
		
			
				|  |  |        pi_new = polling_island_create(fd, &error);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +      GRPC_POLLING_TRACE(
 | 
	
		
			
				|  |  | +          "pollset_add_fd: Created new polling island. pi_new: %p (fd: %d, "
 | 
	
		
			
				|  |  | +          "pollset: %p)",
 | 
	
		
			
				|  |  | +          (void *)pi_new, fd->fd, (void *)pollset);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |    } else if (fd->polling_island == NULL) {
 | 
	
		
			
				|  |  |      pi_new = polling_island_lock(pollset->polling_island);
 | 
	
		
			
				|  |  |      polling_island_add_fds_locked(pi_new, &fd, 1, true, &error);
 | 
	
		
			
				|  |  |      gpr_mu_unlock(&pi_new->mu);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    GRPC_POLLING_TRACE(
 | 
	
		
			
				|  |  | +        "pollset_add_fd: fd->pi was NULL. pi_new: %p (fd: %d, pollset: %p, "
 | 
	
		
			
				|  |  | +        "pollset->pi: %p)",
 | 
	
		
			
				|  |  | +        (void *)pi_new, fd->fd, (void *)pollset,
 | 
	
		
			
				|  |  | +        (void *)pollset->polling_island);
 | 
	
		
			
				|  |  |    } else if (pollset->polling_island == NULL) {
 | 
	
		
			
				|  |  |      pi_new = polling_island_lock(fd->polling_island);
 | 
	
		
			
				|  |  |      gpr_mu_unlock(&pi_new->mu);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    GRPC_POLLING_TRACE(
 | 
	
		
			
				|  |  | +        "pollset_add_fd: pollset->pi was NULL. pi_new: %p (fd: %d, pollset: "
 | 
	
		
			
				|  |  | +        "%p, fd->pi: %p",
 | 
	
		
			
				|  |  | +        (void *)pi_new, fd->fd, (void *)pollset, (void *)fd->polling_island);
 | 
	
		
			
				|  |  |    } else {
 | 
	
		
			
				|  |  |      pi_new = polling_island_merge(fd->polling_island, pollset->polling_island,
 | 
	
		
			
				|  |  |                                    &error);
 | 
	
		
			
				|  |  | +    GRPC_POLLING_TRACE(
 | 
	
		
			
				|  |  | +        "pollset_add_fd: polling islands merged. pi_new: %p (fd: %d, pollset: "
 | 
	
		
			
				|  |  | +        "%p, fd->pi: %p, pollset->pi: %p)",
 | 
	
		
			
				|  |  | +        (void *)pi_new, fd->fd, (void *)pollset, (void *)fd->polling_island,
 | 
	
		
			
				|  |  | +        (void *)pollset->polling_island);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    /* At this point, pi_new is the polling island that both fd->polling_island
 |