|  | @@ -55,22 +55,60 @@
 | 
	
		
			
				|  |  |  #include <grpc/support/useful.h>
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  GPR_TLS_DECL(g_current_thread_poller);
 | 
	
		
			
				|  |  | +GPR_TLS_DECL(g_current_thread_worker);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -void grpc_pollset_kick(grpc_pollset *p) {
 | 
	
		
			
				|  |  | -  if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p && p->counter) {
 | 
	
		
			
				|  |  | -    p->vtable->kick(p);
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | +static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
 | 
	
		
			
				|  |  | +  worker->prev->next = worker->next;
 | 
	
		
			
				|  |  | +  worker->next->prev = worker->prev;
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +int grpc_pollset_has_workers(grpc_pollset *p) {
 | 
	
		
			
				|  |  | +  return p->root_worker.next != &p->root_worker;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -void grpc_pollset_force_kick(grpc_pollset *p) {
 | 
	
		
			
				|  |  | -  if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p) {
 | 
	
		
			
				|  |  | -    grpc_pollset_kick_kick(&p->kick_state);
 | 
	
		
			
				|  |  | +static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
 | 
	
		
			
				|  |  | +  if (grpc_pollset_has_workers(p)) {
 | 
	
		
			
				|  |  | +    grpc_pollset_worker *w = p->root_worker.next;
 | 
	
		
			
				|  |  | +    remove_worker(p, w);
 | 
	
		
			
				|  |  | +    return w;
 | 
	
		
			
				|  |  | +  } else {
 | 
	
		
			
				|  |  | +    return NULL;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void kick_using_pollset_kick(grpc_pollset *p) {
 | 
	
		
			
				|  |  | -  if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p) {
 | 
	
		
			
				|  |  | -    grpc_pollset_kick_kick(&p->kick_state);
 | 
	
		
			
				|  |  | +static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
 | 
	
		
			
				|  |  | +  worker->next = &p->root_worker;
 | 
	
		
			
				|  |  | +  worker->prev = worker->next->prev;
 | 
	
		
			
				|  |  | +  worker->prev->next = worker->next->prev = worker;
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
 | 
	
		
			
				|  |  | +  worker->prev = &p->root_worker;
 | 
	
		
			
				|  |  | +  worker->next = worker->prev->next;
 | 
	
		
			
				|  |  | +  worker->prev->next = worker->next->prev = worker;
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
 | 
	
		
			
				|  |  | +  if (specific_worker != NULL) {
 | 
	
		
			
				|  |  | +    if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
 | 
	
		
			
				|  |  | +      for (specific_worker = p->root_worker.next;
 | 
	
		
			
				|  |  | +           specific_worker != &p->root_worker;
 | 
	
		
			
				|  |  | +           specific_worker = specific_worker->next) {
 | 
	
		
			
				|  |  | +        grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +      p->kicked_without_pollers = 1;
 | 
	
		
			
				|  |  | +    } else if (gpr_tls_get(&g_current_thread_worker) !=
 | 
	
		
			
				|  |  | +               (gpr_intptr)specific_worker) {
 | 
	
		
			
				|  |  | +      grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +  } else if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p) {
 | 
	
		
			
				|  |  | +    specific_worker = pop_front_worker(p);
 | 
	
		
			
				|  |  | +    if (specific_worker != NULL) {
 | 
	
		
			
				|  |  | +      push_back_worker(p, specific_worker);
 | 
	
		
			
				|  |  | +      grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
 | 
	
		
			
				|  |  | +    } else {
 | 
	
		
			
				|  |  | +      p->kicked_without_pollers = 1;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -78,16 +116,12 @@ static void kick_using_pollset_kick(grpc_pollset *p) {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  void grpc_pollset_global_init(void) {
 | 
	
		
			
				|  |  |    gpr_tls_init(&g_current_thread_poller);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  /* Initialize kick fd state */
 | 
	
		
			
				|  |  | -  grpc_pollset_kick_global_init();
 | 
	
		
			
				|  |  | +  grpc_wakeup_fd_global_init();
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  void grpc_pollset_global_shutdown(void) {
 | 
	
		
			
				|  |  | -  /* destroy the kick pipes */
 | 
	
		
			
				|  |  | -  grpc_pollset_kick_global_destroy();
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |    gpr_tls_destroy(&g_current_thread_poller);
 | 
	
		
			
				|  |  | +  grpc_wakeup_fd_global_destroy();
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  /* main interface */
 | 
	
	
		
			
				|  | @@ -96,7 +130,7 @@ static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  void grpc_pollset_init(grpc_pollset *pollset) {
 | 
	
		
			
				|  |  |    gpr_mu_init(&pollset->mu);
 | 
	
		
			
				|  |  | -  grpc_pollset_kick_init(&pollset->kick_state);
 | 
	
		
			
				|  |  | +  pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
 | 
	
		
			
				|  |  |    pollset->in_flight_cbs = 0;
 | 
	
		
			
				|  |  |    pollset->shutting_down = 0;
 | 
	
		
			
				|  |  |    pollset->called_shutdown = 0;
 | 
	
	
		
			
				|  | @@ -134,27 +168,44 @@ static void finish_shutdown(grpc_pollset *pollset) {
 | 
	
		
			
				|  |  |    pollset->shutdown_done_cb(pollset->shutdown_done_arg);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
 | 
	
		
			
				|  |  | +int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
 | 
	
		
			
				|  |  | +                      gpr_timespec deadline) {
 | 
	
		
			
				|  |  |    /* pollset->mu already held */
 | 
	
		
			
				|  |  |    gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
 | 
	
		
			
				|  |  | +  int added_worker = 0;
 | 
	
		
			
				|  |  |    if (gpr_time_cmp(now, deadline) > 0) {
 | 
	
		
			
				|  |  |      return 0;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | +  /* this must happen before we (potentially) drop pollset->mu */
 | 
	
		
			
				|  |  | +  worker->next = worker->prev = NULL;
 | 
	
		
			
				|  |  | +  /* TODO(ctiller): pool these */
 | 
	
		
			
				|  |  | +  grpc_wakeup_fd_init(&worker->wakeup_fd);
 | 
	
		
			
				|  |  |    if (grpc_maybe_call_delayed_callbacks(&pollset->mu, 1)) {
 | 
	
		
			
				|  |  | -    return 1;
 | 
	
		
			
				|  |  | +    goto done;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    if (grpc_alarm_check(&pollset->mu, now, &deadline)) {
 | 
	
		
			
				|  |  | -    return 1;
 | 
	
		
			
				|  |  | +    goto done;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    if (pollset->shutting_down) {
 | 
	
		
			
				|  |  | -    return 1;
 | 
	
		
			
				|  |  | +    goto done;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  if (!pollset->kicked_without_pollers) {
 | 
	
		
			
				|  |  | +    push_front_worker(pollset, worker);
 | 
	
		
			
				|  |  | +    added_worker = 1;
 | 
	
		
			
				|  |  | +    gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset);
 | 
	
		
			
				|  |  | +    pollset->vtable->maybe_work(pollset, worker, deadline, now, 1);
 | 
	
		
			
				|  |  | +    gpr_tls_set(&g_current_thread_poller, 0);
 | 
	
		
			
				|  |  | +  } else {
 | 
	
		
			
				|  |  | +    pollset->kicked_without_pollers = 0;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +done:
 | 
	
		
			
				|  |  | +  grpc_wakeup_fd_destroy(&worker->wakeup_fd);
 | 
	
		
			
				|  |  | +  if (added_worker) {
 | 
	
		
			
				|  |  | +    remove_worker(pollset, worker);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset);
 | 
	
		
			
				|  |  | -  pollset->vtable->maybe_work(pollset, deadline, now, 1);
 | 
	
		
			
				|  |  | -  gpr_tls_set(&g_current_thread_poller, 0);
 | 
	
		
			
				|  |  |    if (pollset->shutting_down) {
 | 
	
		
			
				|  |  | -    if (pollset->counter > 0) {
 | 
	
		
			
				|  |  | -      grpc_pollset_kick(pollset);
 | 
	
		
			
				|  |  | +    if (grpc_pollset_has_workers(pollset)) {
 | 
	
		
			
				|  |  | +      grpc_pollset_kick(pollset, NULL);
 | 
	
		
			
				|  |  |      } else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) {
 | 
	
		
			
				|  |  |        pollset->called_shutdown = 1;
 | 
	
		
			
				|  |  |        gpr_mu_unlock(&pollset->mu);
 | 
	
	
		
			
				|  | @@ -177,15 +228,13 @@ void grpc_pollset_shutdown(grpc_pollset *pollset,
 | 
	
		
			
				|  |  |    GPR_ASSERT(!pollset->shutting_down);
 | 
	
		
			
				|  |  |    pollset->shutting_down = 1;
 | 
	
		
			
				|  |  |    if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 &&
 | 
	
		
			
				|  |  | -      pollset->counter == 0) {
 | 
	
		
			
				|  |  | +      !grpc_pollset_has_workers(pollset)) {
 | 
	
		
			
				|  |  |      pollset->called_shutdown = 1;
 | 
	
		
			
				|  |  |      call_shutdown = 1;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    pollset->shutdown_done_cb = shutdown_done;
 | 
	
		
			
				|  |  |    pollset->shutdown_done_arg = shutdown_done_arg;
 | 
	
		
			
				|  |  | -  if (pollset->counter > 0) {
 | 
	
		
			
				|  |  | -    grpc_pollset_kick(pollset);
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | +  grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
 | 
	
		
			
				|  |  |    gpr_mu_unlock(&pollset->mu);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    if (call_shutdown) {
 | 
	
	
		
			
				|  | @@ -196,8 +245,8 @@ void grpc_pollset_shutdown(grpc_pollset *pollset,
 | 
	
		
			
				|  |  |  void grpc_pollset_destroy(grpc_pollset *pollset) {
 | 
	
		
			
				|  |  |    GPR_ASSERT(pollset->shutting_down);
 | 
	
		
			
				|  |  |    GPR_ASSERT(pollset->in_flight_cbs == 0);
 | 
	
		
			
				|  |  | +  GPR_ASSERT(!grpc_pollset_has_workers(pollset));
 | 
	
		
			
				|  |  |    pollset->vtable->destroy(pollset);
 | 
	
		
			
				|  |  | -  grpc_pollset_kick_destroy(&pollset->kick_state);
 | 
	
		
			
				|  |  |    gpr_mu_destroy(&pollset->mu);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -248,8 +297,8 @@ static void basic_do_promote(void *args, int success) {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    gpr_mu_lock(&pollset->mu);
 | 
	
		
			
				|  |  |    /* First we need to ensure that nobody is polling concurrently */
 | 
	
		
			
				|  |  | -  if (pollset->counter != 0) {
 | 
	
		
			
				|  |  | -    grpc_pollset_kick(pollset);
 | 
	
		
			
				|  |  | +  if (grpc_pollset_has_workers(pollset)) {
 | 
	
		
			
				|  |  | +    grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
 | 
	
		
			
				|  |  |      grpc_iomgr_add_callback(&up_args->promotion_closure);
 | 
	
		
			
				|  |  |      gpr_mu_unlock(&pollset->mu);
 | 
	
		
			
				|  |  |      return;
 | 
	
	
		
			
				|  | @@ -264,7 +313,8 @@ static void basic_do_promote(void *args, int success) {
 | 
	
		
			
				|  |  |    pollset->in_flight_cbs--;
 | 
	
		
			
				|  |  |    if (pollset->shutting_down) {
 | 
	
		
			
				|  |  |      /* We don't care about this pollset anymore. */
 | 
	
		
			
				|  |  | -    if (pollset->in_flight_cbs == 0 && pollset->counter == 0 && !pollset->called_shutdown) {
 | 
	
		
			
				|  |  | +    if (pollset->in_flight_cbs == 0 && !pollset->called_shutdown) {
 | 
	
		
			
				|  |  | +      GPR_ASSERT(!grpc_pollset_has_workers(pollset));
 | 
	
		
			
				|  |  |        pollset->called_shutdown = 1;
 | 
	
		
			
				|  |  |        do_shutdown_cb = 1;
 | 
	
		
			
				|  |  |      }
 | 
	
	
		
			
				|  | @@ -307,7 +357,7 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd,
 | 
	
		
			
				|  |  |    GPR_ASSERT(fd);
 | 
	
		
			
				|  |  |    if (fd == pollset->data.ptr) goto exit;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  if (!pollset->counter) {
 | 
	
		
			
				|  |  | +  if (!grpc_pollset_has_workers(pollset)) {
 | 
	
		
			
				|  |  |      /* Fast path -- no in flight cbs */
 | 
	
		
			
				|  |  |      /* TODO(klempner): Comment this out and fix any test failures or establish
 | 
	
		
			
				|  |  |       * they are due to timing issues */
 | 
	
	
		
			
				|  | @@ -343,7 +393,7 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd,
 | 
	
		
			
				|  |  |    up_args->promotion_closure.cb_arg = up_args;
 | 
	
		
			
				|  |  |    grpc_iomgr_add_callback(&up_args->promotion_closure);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  grpc_pollset_kick(pollset);
 | 
	
		
			
				|  |  | +  grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  exit:
 | 
	
		
			
				|  |  |    if (and_unlock_pollset) {
 | 
	
	
		
			
				|  | @@ -365,12 +415,12 @@ static void basic_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd,
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static void basic_pollset_maybe_work(grpc_pollset *pollset,
 | 
	
		
			
				|  |  | +                                     grpc_pollset_worker *worker,
 | 
	
		
			
				|  |  |                                       gpr_timespec deadline, gpr_timespec now,
 | 
	
		
			
				|  |  |                                       int allow_synchronous_callback) {
 | 
	
		
			
				|  |  |    struct pollfd pfd[2];
 | 
	
		
			
				|  |  |    grpc_fd *fd;
 | 
	
		
			
				|  |  |    grpc_fd_watcher fd_watcher;
 | 
	
		
			
				|  |  | -  grpc_kick_fd_info *kfd;
 | 
	
		
			
				|  |  |    int timeout;
 | 
	
		
			
				|  |  |    int r;
 | 
	
		
			
				|  |  |    int nfds;
 | 
	
	
		
			
				|  | @@ -387,16 +437,10 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset,
 | 
	
		
			
				|  |  |      fd = pollset->data.ptr = NULL;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    timeout = grpc_poll_deadline_to_millis_timeout(deadline, now);
 | 
	
		
			
				|  |  | -  kfd = grpc_pollset_kick_pre_poll(&pollset->kick_state);
 | 
	
		
			
				|  |  | -  if (kfd == NULL) {
 | 
	
		
			
				|  |  | -    /* Already kicked */
 | 
	
		
			
				|  |  | -    return;
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  pfd[0].fd = GRPC_POLLSET_KICK_GET_FD(kfd);
 | 
	
		
			
				|  |  | +  pfd[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd);
 | 
	
		
			
				|  |  |    pfd[0].events = POLLIN;
 | 
	
		
			
				|  |  |    pfd[0].revents = 0;
 | 
	
		
			
				|  |  |    nfds = 1;
 | 
	
		
			
				|  |  | -  pollset->counter++;
 | 
	
		
			
				|  |  |    if (fd) {
 | 
	
		
			
				|  |  |      pfd[1].fd = fd->fd;
 | 
	
		
			
				|  |  |      pfd[1].revents = 0;
 | 
	
	
		
			
				|  | @@ -428,7 +472,7 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset,
 | 
	
		
			
				|  |  |      /* do nothing */
 | 
	
		
			
				|  |  |    } else {
 | 
	
		
			
				|  |  |      if (pfd[0].revents & POLLIN) {
 | 
	
		
			
				|  |  | -      grpc_pollset_kick_consume(&pollset->kick_state, kfd);
 | 
	
		
			
				|  |  | +      grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |      if (nfds > 1) {
 | 
	
		
			
				|  |  |        if (pfd[1].revents & (POLLIN | POLLHUP | POLLERR)) {
 | 
	
	
		
			
				|  | @@ -440,14 +484,10 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset,
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  grpc_pollset_kick_post_poll(&pollset->kick_state, kfd);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |    gpr_mu_lock(&pollset->mu);
 | 
	
		
			
				|  |  | -  pollset->counter--;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static void basic_pollset_destroy(grpc_pollset *pollset) {
 | 
	
		
			
				|  |  | -  GPR_ASSERT(pollset->counter == 0);
 | 
	
		
			
				|  |  |    if (pollset->data.ptr != NULL) {
 | 
	
		
			
				|  |  |      GRPC_FD_UNREF(pollset->data.ptr, "basicpoll");
 | 
	
		
			
				|  |  |      pollset->data.ptr = NULL;
 | 
	
	
		
			
				|  | @@ -455,14 +495,13 @@ static void basic_pollset_destroy(grpc_pollset *pollset) {
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static const grpc_pollset_vtable basic_pollset = {
 | 
	
		
			
				|  |  | -    basic_pollset_add_fd,    basic_pollset_del_fd,  basic_pollset_maybe_work,
 | 
	
		
			
				|  |  | -    kick_using_pollset_kick, basic_pollset_destroy, basic_pollset_destroy};
 | 
	
		
			
				|  |  | +    basic_pollset_add_fd, basic_pollset_del_fd, basic_pollset_maybe_work,
 | 
	
		
			
				|  |  | +    basic_pollset_destroy, basic_pollset_destroy};
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null) {
 | 
	
		
			
				|  |  |    pollset->vtable = &basic_pollset;
 | 
	
		
			
				|  |  | -  pollset->counter = 0;
 | 
	
		
			
				|  |  |    pollset->data.ptr = fd_or_null;
 | 
	
		
			
				|  |  | -  if (fd_or_null) {
 | 
	
		
			
				|  |  | +  if (fd_or_null != NULL) {
 | 
	
		
			
				|  |  |      GRPC_FD_REF(fd_or_null, "basicpoll");
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  }
 |