|  | @@ -51,6 +51,8 @@
 | 
	
		
			
				|  |  |  #include <grpc/support/time.h>
 | 
	
		
			
				|  |  |  #include "test/core/util/test_config.h"
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +static grpc_pollset g_pollset;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  /* buffer size used to send and receive data.
 | 
	
		
			
				|  |  |     1024 is the minimal value to set TCP send and receive buffer. */
 | 
	
		
			
				|  |  |  #define BUF_SIZE 1024
 | 
	
	
		
			
				|  | @@ -94,16 +96,12 @@ void no_op_cb(void *arg, int success) {}
 | 
	
		
			
				|  |  |  typedef struct {
 | 
	
		
			
				|  |  |    grpc_fd *em_fd;           /* listening fd */
 | 
	
		
			
				|  |  |    ssize_t read_bytes_total; /* total number of received bytes */
 | 
	
		
			
				|  |  | -  gpr_mu mu;                /* protect done and done_cv */
 | 
	
		
			
				|  |  | -  gpr_cv done_cv;           /* signaled when a server finishes serving */
 | 
	
		
			
				|  |  |    int done;                 /* set to 1 when a server finishes serving */
 | 
	
		
			
				|  |  |    grpc_iomgr_closure listen_closure;
 | 
	
		
			
				|  |  |  } server;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static void server_init(server *sv) {
 | 
	
		
			
				|  |  |    sv->read_bytes_total = 0;
 | 
	
		
			
				|  |  | -  gpr_mu_init(&sv->mu);
 | 
	
		
			
				|  |  | -  gpr_cv_init(&sv->done_cv);
 | 
	
		
			
				|  |  |    sv->done = 0;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -179,10 +177,10 @@ static void listen_shutdown_cb(void *arg /*server*/, int success) {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    grpc_fd_orphan(sv->em_fd, NULL, NULL);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  gpr_mu_lock(&sv->mu);
 | 
	
		
			
				|  |  | +  gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
 | 
	
		
			
				|  |  |    sv->done = 1;
 | 
	
		
			
				|  |  | -  gpr_cv_signal(&sv->done_cv);
 | 
	
		
			
				|  |  | -  gpr_mu_unlock(&sv->mu);
 | 
	
		
			
				|  |  | +  grpc_pollset_kick(&g_pollset);
 | 
	
		
			
				|  |  | +  gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  /* Called when a new TCP connection request arrives in the listening port. */
 | 
	
	
		
			
				|  | @@ -209,6 +207,7 @@ static void listen_cb(void *arg, /*=sv_arg*/
 | 
	
		
			
				|  |  |    se = gpr_malloc(sizeof(*se));
 | 
	
		
			
				|  |  |    se->sv = sv;
 | 
	
		
			
				|  |  |    se->em_fd = grpc_fd_create(fd);
 | 
	
		
			
				|  |  | +  grpc_pollset_add_fd(&g_pollset, se->em_fd);
 | 
	
		
			
				|  |  |    se->session_read_closure.cb = session_read_cb;
 | 
	
		
			
				|  |  |    se->session_read_closure.cb_arg = se;
 | 
	
		
			
				|  |  |    grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure);
 | 
	
	
		
			
				|  | @@ -237,6 +236,7 @@ static int server_start(server *sv) {
 | 
	
		
			
				|  |  |    GPR_ASSERT(listen(fd, MAX_NUM_FD) == 0);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    sv->em_fd = grpc_fd_create(fd);
 | 
	
		
			
				|  |  | +  grpc_pollset_add_fd(&g_pollset, sv->em_fd);
 | 
	
		
			
				|  |  |    /* Register to be interested in reading from listen_fd. */
 | 
	
		
			
				|  |  |    sv->listen_closure.cb = listen_cb;
 | 
	
		
			
				|  |  |    sv->listen_closure.cb_arg = sv;
 | 
	
	
		
			
				|  | @@ -247,12 +247,11 @@ static int server_start(server *sv) {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  /* Wait and shutdown a sever. */
 | 
	
		
			
				|  |  |  static void server_wait_and_shutdown(server *sv) {
 | 
	
		
			
				|  |  | -  gpr_mu_lock(&sv->mu);
 | 
	
		
			
				|  |  | -  while (!sv->done) gpr_cv_wait(&sv->done_cv, &sv->mu, gpr_inf_future);
 | 
	
		
			
				|  |  | -  gpr_mu_unlock(&sv->mu);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  gpr_mu_destroy(&sv->mu);
 | 
	
		
			
				|  |  | -  gpr_cv_destroy(&sv->done_cv);
 | 
	
		
			
				|  |  | +  gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
 | 
	
		
			
				|  |  | +  while (!sv->done) {
 | 
	
		
			
				|  |  | +    grpc_pollset_work(&g_pollset, gpr_inf_future);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  /* ===An upload client to test notify_on_write=== */
 | 
	
	
		
			
				|  | @@ -271,8 +270,6 @@ typedef struct {
 | 
	
		
			
				|  |  |       notify_on_write to schedule another write. */
 | 
	
		
			
				|  |  |    int client_write_cnt;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  gpr_mu mu;      /* protect done and done_cv */
 | 
	
		
			
				|  |  | -  gpr_cv done_cv; /* signaled when a client finishes sending */
 | 
	
		
			
				|  |  |    int done;       /* set to 1 when a client finishes sending */
 | 
	
		
			
				|  |  |    grpc_iomgr_closure write_closure;
 | 
	
		
			
				|  |  |  } client;
 | 
	
	
		
			
				|  | @@ -281,8 +278,6 @@ static void client_init(client *cl) {
 | 
	
		
			
				|  |  |    memset(cl->write_buf, 0, sizeof(cl->write_buf));
 | 
	
		
			
				|  |  |    cl->write_bytes_total = 0;
 | 
	
		
			
				|  |  |    cl->client_write_cnt = 0;
 | 
	
		
			
				|  |  | -  gpr_mu_init(&cl->mu);
 | 
	
		
			
				|  |  | -  gpr_cv_init(&cl->done_cv);
 | 
	
		
			
				|  |  |    cl->done = 0;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -291,7 +286,7 @@ static void client_session_shutdown_cb(void *arg /*client*/, int success) {
 | 
	
		
			
				|  |  |    client *cl = arg;
 | 
	
		
			
				|  |  |    grpc_fd_orphan(cl->em_fd, NULL, NULL);
 | 
	
		
			
				|  |  |    cl->done = 1;
 | 
	
		
			
				|  |  | -  gpr_cv_signal(&cl->done_cv);
 | 
	
		
			
				|  |  | +  grpc_pollset_kick(&g_pollset);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  /* Write as much as possible, then register notify_on_write. */
 | 
	
	
		
			
				|  | @@ -302,9 +297,9 @@ static void client_session_write(void *arg, /*client*/
 | 
	
		
			
				|  |  |    ssize_t write_once = 0;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    if (!success) {
 | 
	
		
			
				|  |  | -    gpr_mu_lock(&cl->mu);
 | 
	
		
			
				|  |  | +    gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
 | 
	
		
			
				|  |  |      client_session_shutdown_cb(arg, 1);
 | 
	
		
			
				|  |  | -    gpr_mu_unlock(&cl->mu);
 | 
	
		
			
				|  |  | +    gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
 | 
	
		
			
				|  |  |      return;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -314,7 +309,7 @@ static void client_session_write(void *arg, /*client*/
 | 
	
		
			
				|  |  |    } while (write_once > 0);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    if (errno == EAGAIN) {
 | 
	
		
			
				|  |  | -    gpr_mu_lock(&cl->mu);
 | 
	
		
			
				|  |  | +    gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
 | 
	
		
			
				|  |  |      if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) {
 | 
	
		
			
				|  |  |        cl->write_closure.cb = client_session_write;
 | 
	
		
			
				|  |  |        cl->write_closure.cb_arg = cl;
 | 
	
	
		
			
				|  | @@ -323,7 +318,7 @@ static void client_session_write(void *arg, /*client*/
 | 
	
		
			
				|  |  |      } else {
 | 
	
		
			
				|  |  |        client_session_shutdown_cb(arg, 1);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -    gpr_mu_unlock(&cl->mu);
 | 
	
		
			
				|  |  | +    gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
 | 
	
		
			
				|  |  |    } else {
 | 
	
		
			
				|  |  |      gpr_log(GPR_ERROR, "unknown errno %s", strerror(errno));
 | 
	
		
			
				|  |  |      abort();
 | 
	
	
		
			
				|  | @@ -352,18 +347,18 @@ static void client_start(client *cl, int port) {
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    cl->em_fd = grpc_fd_create(fd);
 | 
	
		
			
				|  |  | +  grpc_pollset_add_fd(&g_pollset, cl->em_fd);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    client_session_write(cl, 1);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  /* Wait for the signal to shutdown a client. */
 | 
	
		
			
				|  |  |  static void client_wait_and_shutdown(client *cl) {
 | 
	
		
			
				|  |  | -  gpr_mu_lock(&cl->mu);
 | 
	
		
			
				|  |  | -  while (!cl->done) gpr_cv_wait(&cl->done_cv, &cl->mu, gpr_inf_future);
 | 
	
		
			
				|  |  | -  gpr_mu_unlock(&cl->mu);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  gpr_mu_destroy(&cl->mu);
 | 
	
		
			
				|  |  | -  gpr_cv_destroy(&cl->done_cv);
 | 
	
		
			
				|  |  | +  gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
 | 
	
		
			
				|  |  | +  while (!cl->done) {
 | 
	
		
			
				|  |  | +    grpc_pollset_work(&g_pollset, gpr_inf_future);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  /* Test grpc_fd. Start an upload server and client, upload a stream of
 | 
	
	
		
			
				|  | @@ -385,38 +380,32 @@ static void test_grpc_fd(void) {
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  typedef struct fd_change_data {
 | 
	
		
			
				|  |  | -  gpr_mu mu;
 | 
	
		
			
				|  |  | -  gpr_cv cv;
 | 
	
		
			
				|  |  |    void (*cb_that_ran)(void *, int success);
 | 
	
		
			
				|  |  |  } fd_change_data;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  void init_change_data(fd_change_data *fdc) {
 | 
	
		
			
				|  |  | -  gpr_mu_init(&fdc->mu);
 | 
	
		
			
				|  |  | -  gpr_cv_init(&fdc->cv);
 | 
	
		
			
				|  |  |    fdc->cb_that_ran = NULL;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  void destroy_change_data(fd_change_data *fdc) {
 | 
	
		
			
				|  |  | -  gpr_mu_destroy(&fdc->mu);
 | 
	
		
			
				|  |  | -  gpr_cv_destroy(&fdc->cv);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static void first_read_callback(void *arg /* fd_change_data */, int success) {
 | 
	
		
			
				|  |  |    fd_change_data *fdc = arg;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  gpr_mu_lock(&fdc->mu);
 | 
	
		
			
				|  |  | +  gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
 | 
	
		
			
				|  |  |    fdc->cb_that_ran = first_read_callback;
 | 
	
		
			
				|  |  | -  gpr_cv_signal(&fdc->cv);
 | 
	
		
			
				|  |  | -  gpr_mu_unlock(&fdc->mu);
 | 
	
		
			
				|  |  | +  grpc_pollset_kick(&g_pollset);
 | 
	
		
			
				|  |  | +  gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static void second_read_callback(void *arg /* fd_change_data */, int success) {
 | 
	
		
			
				|  |  |    fd_change_data *fdc = arg;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  gpr_mu_lock(&fdc->mu);
 | 
	
		
			
				|  |  | +  gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
 | 
	
		
			
				|  |  |    fdc->cb_that_ran = second_read_callback;
 | 
	
		
			
				|  |  | -  gpr_cv_signal(&fdc->cv);
 | 
	
		
			
				|  |  | -  gpr_mu_unlock(&fdc->mu);
 | 
	
		
			
				|  |  | +  grpc_pollset_kick(&g_pollset);
 | 
	
		
			
				|  |  | +  gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  /* Test that changing the callback we use for notify_on_read actually works.
 | 
	
	
		
			
				|  | @@ -448,6 +437,7 @@ static void test_grpc_fd_change(void) {
 | 
	
		
			
				|  |  |    GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    em_fd = grpc_fd_create(sv[0]);
 | 
	
		
			
				|  |  | +  grpc_pollset_add_fd(&g_pollset, em_fd);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    /* Register the first callback, then make its FD readable */
 | 
	
		
			
				|  |  |    grpc_fd_notify_on_read(em_fd, &first_closure);
 | 
	
	
		
			
				|  | @@ -456,12 +446,12 @@ static void test_grpc_fd_change(void) {
 | 
	
		
			
				|  |  |    GPR_ASSERT(result == 1);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    /* And now wait for it to run. */
 | 
	
		
			
				|  |  | -  gpr_mu_lock(&a.mu);
 | 
	
		
			
				|  |  | +  gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
 | 
	
		
			
				|  |  |    while (a.cb_that_ran == NULL) {
 | 
	
		
			
				|  |  | -    gpr_cv_wait(&a.cv, &a.mu, gpr_inf_future);
 | 
	
		
			
				|  |  | +    grpc_pollset_work(&g_pollset, gpr_inf_future);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    GPR_ASSERT(a.cb_that_ran == first_read_callback);
 | 
	
		
			
				|  |  | -  gpr_mu_unlock(&a.mu);
 | 
	
		
			
				|  |  | +  gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    /* And drain the socket so we can generate a new read edge */
 | 
	
		
			
				|  |  |    result = read(sv[0], &data, 1);
 | 
	
	
		
			
				|  | @@ -474,13 +464,13 @@ static void test_grpc_fd_change(void) {
 | 
	
		
			
				|  |  |    result = write(sv[1], &data, 1);
 | 
	
		
			
				|  |  |    GPR_ASSERT(result == 1);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  gpr_mu_lock(&b.mu);
 | 
	
		
			
				|  |  | +  gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
 | 
	
		
			
				|  |  |    while (b.cb_that_ran == NULL) {
 | 
	
		
			
				|  |  | -    gpr_cv_wait(&b.cv, &b.mu, gpr_inf_future);
 | 
	
		
			
				|  |  | +    grpc_pollset_work(&g_pollset, gpr_inf_future);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    /* Except now we verify that second_read_callback ran instead */
 | 
	
		
			
				|  |  |    GPR_ASSERT(b.cb_that_ran == second_read_callback);
 | 
	
		
			
				|  |  | -  gpr_mu_unlock(&b.mu);
 | 
	
		
			
				|  |  | +  gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    grpc_fd_orphan(em_fd, NULL, NULL);
 | 
	
		
			
				|  |  |    destroy_change_data(&a);
 | 
	
	
		
			
				|  | @@ -488,11 +478,17 @@ static void test_grpc_fd_change(void) {
 | 
	
		
			
				|  |  |    close(sv[1]);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +static void destroy_pollset(void *p) {
 | 
	
		
			
				|  |  | +  grpc_pollset_destroy(p);
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  int main(int argc, char **argv) {
 | 
	
		
			
				|  |  |    grpc_test_init(argc, argv);
 | 
	
		
			
				|  |  |    grpc_iomgr_init();
 | 
	
		
			
				|  |  | +  grpc_pollset_init(&g_pollset);
 | 
	
		
			
				|  |  |    test_grpc_fd();
 | 
	
		
			
				|  |  |    test_grpc_fd_change();
 | 
	
		
			
				|  |  | +  grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset);
 | 
	
		
			
				|  |  |    grpc_iomgr_shutdown();
 | 
	
		
			
				|  |  |    return 0;
 | 
	
		
			
				|  |  |  }
 |