|  | @@ -45,10 +45,8 @@
 | 
	
		
			
				|  |  |  #include <grpc/support/log.h>
 | 
	
		
			
				|  |  |  #include <grpc/support/useful.h>
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -enum descriptor_state {
 | 
	
		
			
				|  |  | -  NOT_READY = 0,
 | 
	
		
			
				|  |  | -  READY = 1
 | 
	
		
			
				|  |  | -}; /* or a pointer to a closure to call */
 | 
	
		
			
				|  |  | +#define CLOSURE_NOT_READY ((grpc_closure *)0)
 | 
	
		
			
				|  |  | +#define CLOSURE_READY ((grpc_closure *)1)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  /* We need to keep a freelist not because of any concerns of malloc performance
 | 
	
		
			
				|  |  |   * but instead so that implementations with multiple threads in (for example)
 | 
	
	
		
			
				|  | @@ -88,14 +86,13 @@ static grpc_fd *alloc_fd(int fd) {
 | 
	
		
			
				|  |  |    gpr_mu_unlock(&fd_freelist_mu);
 | 
	
		
			
				|  |  |    if (r == NULL) {
 | 
	
		
			
				|  |  |      r = gpr_malloc(sizeof(grpc_fd));
 | 
	
		
			
				|  |  | -    gpr_mu_init(&r->set_state_mu);
 | 
	
		
			
				|  |  | -    gpr_mu_init(&r->watcher_mu);
 | 
	
		
			
				|  |  | +    gpr_mu_init(&r->mu);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    gpr_atm_rel_store(&r->refst, 1);
 | 
	
		
			
				|  |  | -  gpr_atm_rel_store(&r->readst, NOT_READY);
 | 
	
		
			
				|  |  | -  gpr_atm_rel_store(&r->writest, NOT_READY);
 | 
	
		
			
				|  |  | -  gpr_atm_rel_store(&r->shutdown, 0);
 | 
	
		
			
				|  |  | +  r->shutdown = 0;
 | 
	
		
			
				|  |  | +  r->read_closure = CLOSURE_NOT_READY;
 | 
	
		
			
				|  |  | +  r->write_closure = CLOSURE_NOT_READY;
 | 
	
		
			
				|  |  |    r->fd = fd;
 | 
	
		
			
				|  |  |    r->inactive_watcher_root.next = r->inactive_watcher_root.prev =
 | 
	
		
			
				|  |  |        &r->inactive_watcher_root;
 | 
	
	
		
			
				|  | @@ -107,8 +104,7 @@ static grpc_fd *alloc_fd(int fd) {
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static void destroy(grpc_fd *fd) {
 | 
	
		
			
				|  |  | -  gpr_mu_destroy(&fd->set_state_mu);
 | 
	
		
			
				|  |  | -  gpr_mu_destroy(&fd->watcher_mu);
 | 
	
		
			
				|  |  | +  gpr_mu_destroy(&fd->mu);
 | 
	
		
			
				|  |  |    gpr_free(fd);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -173,39 +169,35 @@ int grpc_fd_is_orphaned(grpc_fd *fd) {
 | 
	
		
			
				|  |  |    return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void pollset_kick_locked(grpc_pollset *pollset) {
 | 
	
		
			
				|  |  | -  gpr_mu_lock(GRPC_POLLSET_MU(pollset));
 | 
	
		
			
				|  |  | -  grpc_pollset_kick(pollset, NULL);
 | 
	
		
			
				|  |  | -  gpr_mu_unlock(GRPC_POLLSET_MU(pollset));
 | 
	
		
			
				|  |  | +static void pollset_kick_locked(grpc_fd_watcher *watcher) {
 | 
	
		
			
				|  |  | +  gpr_mu_lock(GRPC_POLLSET_MU(watcher->pollset));
 | 
	
		
			
				|  |  | +  GPR_ASSERT(watcher->worker);
 | 
	
		
			
				|  |  | +  grpc_pollset_kick_ext(watcher->pollset, watcher->worker,
 | 
	
		
			
				|  |  | +                        GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
 | 
	
		
			
				|  |  | +  gpr_mu_unlock(GRPC_POLLSET_MU(watcher->pollset));
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static void maybe_wake_one_watcher_locked(grpc_fd *fd) {
 | 
	
		
			
				|  |  |    if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
 | 
	
		
			
				|  |  | -    pollset_kick_locked(fd->inactive_watcher_root.next->pollset);
 | 
	
		
			
				|  |  | +    pollset_kick_locked(fd->inactive_watcher_root.next);
 | 
	
		
			
				|  |  |    } else if (fd->read_watcher) {
 | 
	
		
			
				|  |  | -    pollset_kick_locked(fd->read_watcher->pollset);
 | 
	
		
			
				|  |  | +    pollset_kick_locked(fd->read_watcher);
 | 
	
		
			
				|  |  |    } else if (fd->write_watcher) {
 | 
	
		
			
				|  |  | -    pollset_kick_locked(fd->write_watcher->pollset);
 | 
	
		
			
				|  |  | +    pollset_kick_locked(fd->write_watcher);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void maybe_wake_one_watcher(grpc_fd *fd) {
 | 
	
		
			
				|  |  | -  gpr_mu_lock(&fd->watcher_mu);
 | 
	
		
			
				|  |  | -  maybe_wake_one_watcher_locked(fd);
 | 
	
		
			
				|  |  | -  gpr_mu_unlock(&fd->watcher_mu);
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |  static void wake_all_watchers_locked(grpc_fd *fd) {
 | 
	
		
			
				|  |  |    grpc_fd_watcher *watcher;
 | 
	
		
			
				|  |  |    for (watcher = fd->inactive_watcher_root.next;
 | 
	
		
			
				|  |  |         watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
 | 
	
		
			
				|  |  | -    pollset_kick_locked(watcher->pollset);
 | 
	
		
			
				|  |  | +    pollset_kick_locked(watcher);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    if (fd->read_watcher) {
 | 
	
		
			
				|  |  | -    pollset_kick_locked(fd->read_watcher->pollset);
 | 
	
		
			
				|  |  | +    pollset_kick_locked(fd->read_watcher);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
 | 
	
		
			
				|  |  | -    pollset_kick_locked(fd->write_watcher->pollset);
 | 
	
		
			
				|  |  | +    pollset_kick_locked(fd->write_watcher);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -218,7 +210,7 @@ void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
 | 
	
		
			
				|  |  |                      const char *reason) {
 | 
	
		
			
				|  |  |    fd->on_done_closure = on_done;
 | 
	
		
			
				|  |  |    shutdown(fd->fd, SHUT_RDWR);
 | 
	
		
			
				|  |  | -  gpr_mu_lock(&fd->watcher_mu);
 | 
	
		
			
				|  |  | +  gpr_mu_lock(&fd->mu);
 | 
	
		
			
				|  |  |    REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
 | 
	
		
			
				|  |  |    if (!has_watchers(fd)) {
 | 
	
		
			
				|  |  |      fd->closed = 1;
 | 
	
	
		
			
				|  | @@ -227,7 +219,7 @@ void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
 | 
	
		
			
				|  |  |    } else {
 | 
	
		
			
				|  |  |      wake_all_watchers_locked(fd);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  gpr_mu_unlock(&fd->watcher_mu);
 | 
	
		
			
				|  |  | +  gpr_mu_unlock(&fd->mu);
 | 
	
		
			
				|  |  |    UNREF_BY(fd, 2, reason); /* drop the reference */
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -247,136 +239,121 @@ void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
 | 
	
		
			
				|  |  |  void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
 | 
	
		
			
				|  |  |  #endif
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void notify_on(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *st,
 | 
	
		
			
				|  |  | -                      grpc_closure *closure) {
 | 
	
		
			
				|  |  | -  switch (gpr_atm_acq_load(st)) {
 | 
	
		
			
				|  |  | -    case NOT_READY:
 | 
	
		
			
				|  |  | -      /* There is no race if the descriptor is already ready, so we skip
 | 
	
		
			
				|  |  | -         the interlocked op in that case.  As long as the app doesn't
 | 
	
		
			
				|  |  | -         try to set the same upcall twice (which it shouldn't) then
 | 
	
		
			
				|  |  | -         oldval should never be anything other than READY or NOT_READY.  We
 | 
	
		
			
				|  |  | -         don't
 | 
	
		
			
				|  |  | -         check for user error on the fast path. */
 | 
	
		
			
				|  |  | -      if (gpr_atm_rel_cas(st, NOT_READY, (gpr_intptr)closure)) {
 | 
	
		
			
				|  |  | -        /* swap was successful -- the closure will run after the next
 | 
	
		
			
				|  |  | -           set_ready call.  NOTE: we don't have an ABA problem here,
 | 
	
		
			
				|  |  | -           since we should never have concurrent calls to the same
 | 
	
		
			
				|  |  | -           notify_on function. */
 | 
	
		
			
				|  |  | -        maybe_wake_one_watcher(fd);
 | 
	
		
			
				|  |  | -        return;
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | -    /* swap was unsuccessful due to an intervening set_ready call.
 | 
	
		
			
				|  |  | -       Fall through to the READY code below */
 | 
	
		
			
				|  |  | -    case READY:
 | 
	
		
			
				|  |  | -      GPR_ASSERT(gpr_atm_no_barrier_load(st) == READY);
 | 
	
		
			
				|  |  | -      gpr_atm_rel_store(st, NOT_READY);
 | 
	
		
			
				|  |  | -      grpc_exec_ctx_enqueue(exec_ctx, closure,
 | 
	
		
			
				|  |  | -                            !gpr_atm_acq_load(&fd->shutdown));
 | 
	
		
			
				|  |  | -      return;
 | 
	
		
			
				|  |  | -    default: /* WAITING */
 | 
	
		
			
				|  |  | -      /* upcallptr was set to a different closure.  This is an error! */
 | 
	
		
			
				|  |  | -      gpr_log(GPR_ERROR,
 | 
	
		
			
				|  |  | -              "User called a notify_on function with a previous callback still "
 | 
	
		
			
				|  |  | -              "pending");
 | 
	
		
			
				|  |  | -      abort();
 | 
	
		
			
				|  |  | +static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
 | 
	
		
			
				|  |  | +                             grpc_closure **st, grpc_closure *closure) {
 | 
	
		
			
				|  |  | +  if (*st == CLOSURE_NOT_READY) {
 | 
	
		
			
				|  |  | +    /* not ready ==> switch to a waiting state by setting the closure */
 | 
	
		
			
				|  |  | +    *st = closure;
 | 
	
		
			
				|  |  | +  } else if (*st == CLOSURE_READY) {
 | 
	
		
			
				|  |  | +    /* already ready ==> queue the closure to run immediately */
 | 
	
		
			
				|  |  | +    *st = CLOSURE_NOT_READY;
 | 
	
		
			
				|  |  | +    grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown);
 | 
	
		
			
				|  |  | +    maybe_wake_one_watcher_locked(fd);
 | 
	
		
			
				|  |  | +  } else {
 | 
	
		
			
				|  |  | +    /* upcallptr was set to a different closure.  This is an error! */
 | 
	
		
			
				|  |  | +    gpr_log(GPR_ERROR,
 | 
	
		
			
				|  |  | +            "User called a notify_on function with a previous callback still "
 | 
	
		
			
				|  |  | +            "pending");
 | 
	
		
			
				|  |  | +    abort();
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  gpr_log(GPR_ERROR, "Corrupt memory in &st->state");
 | 
	
		
			
				|  |  | -  abort();
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
 | 
	
		
			
				|  |  | -                             gpr_atm *st) {
 | 
	
		
			
				|  |  | -  gpr_intptr state = gpr_atm_acq_load(st);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  switch (state) {
 | 
	
		
			
				|  |  | -    case READY:
 | 
	
		
			
				|  |  | -      /* duplicate ready, ignore */
 | 
	
		
			
				|  |  | -      return;
 | 
	
		
			
				|  |  | -    case NOT_READY:
 | 
	
		
			
				|  |  | -      if (gpr_atm_rel_cas(st, NOT_READY, READY)) {
 | 
	
		
			
				|  |  | -        /* swap was successful -- the closure will run after the next
 | 
	
		
			
				|  |  | -           notify_on call. */
 | 
	
		
			
				|  |  | -        return;
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | -      /* swap was unsuccessful due to an intervening set_ready call.
 | 
	
		
			
				|  |  | -         Fall through to the WAITING code below */
 | 
	
		
			
				|  |  | -      state = gpr_atm_acq_load(st);
 | 
	
		
			
				|  |  | -    default: /* waiting */
 | 
	
		
			
				|  |  | -      GPR_ASSERT(gpr_atm_no_barrier_load(st) != READY &&
 | 
	
		
			
				|  |  | -                 gpr_atm_no_barrier_load(st) != NOT_READY);
 | 
	
		
			
				|  |  | -      grpc_exec_ctx_enqueue(exec_ctx, (grpc_closure *)state,
 | 
	
		
			
				|  |  | -                            !gpr_atm_acq_load(&fd->shutdown));
 | 
	
		
			
				|  |  | -      gpr_atm_rel_store(st, NOT_READY);
 | 
	
		
			
				|  |  | -      return;
 | 
	
		
			
				|  |  | +/* returns 1 if state becomes not ready */
 | 
	
		
			
				|  |  | +static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
 | 
	
		
			
				|  |  | +                            grpc_closure **st) {
 | 
	
		
			
				|  |  | +  if (*st == CLOSURE_READY) {
 | 
	
		
			
				|  |  | +    /* duplicate ready ==> ignore */
 | 
	
		
			
				|  |  | +    return 0;
 | 
	
		
			
				|  |  | +  } else if (*st == CLOSURE_NOT_READY) {
 | 
	
		
			
				|  |  | +    /* not ready, and not waiting ==> flag ready */
 | 
	
		
			
				|  |  | +    *st = CLOSURE_READY;
 | 
	
		
			
				|  |  | +    return 0;
 | 
	
		
			
				|  |  | +  } else {
 | 
	
		
			
				|  |  | +    /* waiting ==> queue closure */
 | 
	
		
			
				|  |  | +    grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown);
 | 
	
		
			
				|  |  | +    *st = CLOSURE_NOT_READY;
 | 
	
		
			
				|  |  | +    return 1;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *st) {
 | 
	
		
			
				|  |  | +static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st) {
 | 
	
		
			
				|  |  |    /* only one set_ready can be active at once (but there may be a racing
 | 
	
		
			
				|  |  |       notify_on) */
 | 
	
		
			
				|  |  | -  gpr_mu_lock(&fd->set_state_mu);
 | 
	
		
			
				|  |  | +  gpr_mu_lock(&fd->mu);
 | 
	
		
			
				|  |  |    set_ready_locked(exec_ctx, fd, st);
 | 
	
		
			
				|  |  | -  gpr_mu_unlock(&fd->set_state_mu);
 | 
	
		
			
				|  |  | +  gpr_mu_unlock(&fd->mu);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
 | 
	
		
			
				|  |  | -  gpr_mu_lock(&fd->set_state_mu);
 | 
	
		
			
				|  |  | +  gpr_mu_lock(&fd->mu);
 | 
	
		
			
				|  |  |    GPR_ASSERT(!gpr_atm_no_barrier_load(&fd->shutdown));
 | 
	
		
			
				|  |  | -  gpr_atm_rel_store(&fd->shutdown, 1);
 | 
	
		
			
				|  |  | -  set_ready_locked(exec_ctx, fd, &fd->readst);
 | 
	
		
			
				|  |  | -  set_ready_locked(exec_ctx, fd, &fd->writest);
 | 
	
		
			
				|  |  | -  gpr_mu_unlock(&fd->set_state_mu);
 | 
	
		
			
				|  |  | +  fd->shutdown = 1;
 | 
	
		
			
				|  |  | +  set_ready_locked(exec_ctx, fd, &fd->read_closure);
 | 
	
		
			
				|  |  | +  set_ready_locked(exec_ctx, fd, &fd->write_closure);
 | 
	
		
			
				|  |  | +  gpr_mu_unlock(&fd->mu);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  void grpc_fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
 | 
	
		
			
				|  |  |                              grpc_closure *closure) {
 | 
	
		
			
				|  |  | -  notify_on(exec_ctx, fd, &fd->readst, closure);
 | 
	
		
			
				|  |  | +  gpr_mu_lock(&fd->mu);
 | 
	
		
			
				|  |  | +  notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
 | 
	
		
			
				|  |  | +  gpr_mu_unlock(&fd->mu);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  void grpc_fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
 | 
	
		
			
				|  |  |                               grpc_closure *closure) {
 | 
	
		
			
				|  |  | -  notify_on(exec_ctx, fd, &fd->writest, closure);
 | 
	
		
			
				|  |  | +  gpr_mu_lock(&fd->mu);
 | 
	
		
			
				|  |  | +  notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
 | 
	
		
			
				|  |  | +  gpr_mu_unlock(&fd->mu);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
 | 
	
		
			
				|  |  | -                              gpr_uint32 read_mask, gpr_uint32 write_mask,
 | 
	
		
			
				|  |  | -                              grpc_fd_watcher *watcher) {
 | 
	
		
			
				|  |  | +                              grpc_pollset_worker *worker, gpr_uint32 read_mask,
 | 
	
		
			
				|  |  | +                              gpr_uint32 write_mask, grpc_fd_watcher *watcher) {
 | 
	
		
			
				|  |  |    gpr_uint32 mask = 0;
 | 
	
		
			
				|  |  | +  grpc_closure *cur;
 | 
	
		
			
				|  |  | +  int requested;
 | 
	
		
			
				|  |  |    /* keep track of pollers that have requested our events, in case they change
 | 
	
		
			
				|  |  |     */
 | 
	
		
			
				|  |  |    GRPC_FD_REF(fd, "poll");
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  gpr_mu_lock(&fd->watcher_mu);
 | 
	
		
			
				|  |  | +  gpr_mu_lock(&fd->mu);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    /* if we are shutdown, then don't add to the watcher set */
 | 
	
		
			
				|  |  |    if (gpr_atm_no_barrier_load(&fd->shutdown)) {
 | 
	
		
			
				|  |  |      watcher->fd = NULL;
 | 
	
		
			
				|  |  |      watcher->pollset = NULL;
 | 
	
		
			
				|  |  | -    gpr_mu_unlock(&fd->watcher_mu);
 | 
	
		
			
				|  |  | +    watcher->worker = NULL;
 | 
	
		
			
				|  |  | +    gpr_mu_unlock(&fd->mu);
 | 
	
		
			
				|  |  |      GRPC_FD_UNREF(fd, "poll");
 | 
	
		
			
				|  |  |      return 0;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    /* if there is nobody polling for read, but we need to, then start doing so */
 | 
	
		
			
				|  |  | -  if (read_mask && !fd->read_watcher &&
 | 
	
		
			
				|  |  | -      (gpr_uintptr)gpr_atm_acq_load(&fd->readst) > READY) {
 | 
	
		
			
				|  |  | +  cur = fd->read_closure;
 | 
	
		
			
				|  |  | +  requested = cur != CLOSURE_READY;
 | 
	
		
			
				|  |  | +  if (read_mask && fd->read_watcher == NULL && requested) {
 | 
	
		
			
				|  |  |      fd->read_watcher = watcher;
 | 
	
		
			
				|  |  |      mask |= read_mask;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    /* if there is nobody polling for write, but we need to, then start doing so
 | 
	
		
			
				|  |  |     */
 | 
	
		
			
				|  |  | -  if (write_mask && !fd->write_watcher &&
 | 
	
		
			
				|  |  | -      (gpr_uintptr)gpr_atm_acq_load(&fd->writest) > READY) {
 | 
	
		
			
				|  |  | +  cur = fd->write_closure;
 | 
	
		
			
				|  |  | +  requested = cur != CLOSURE_READY;
 | 
	
		
			
				|  |  | +  if (write_mask && fd->write_watcher == NULL && requested) {
 | 
	
		
			
				|  |  |      fd->write_watcher = watcher;
 | 
	
		
			
				|  |  |      mask |= write_mask;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    /* if not polling, remember this watcher in case we need someone to later */
 | 
	
		
			
				|  |  | -  if (mask == 0) {
 | 
	
		
			
				|  |  | +  if (mask == 0 && worker != NULL) {
 | 
	
		
			
				|  |  |      watcher->next = &fd->inactive_watcher_root;
 | 
	
		
			
				|  |  |      watcher->prev = watcher->next->prev;
 | 
	
		
			
				|  |  |      watcher->next->prev = watcher->prev->next = watcher;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    watcher->pollset = pollset;
 | 
	
		
			
				|  |  | +  watcher->worker = worker;
 | 
	
		
			
				|  |  |    watcher->fd = fd;
 | 
	
		
			
				|  |  | -  gpr_mu_unlock(&fd->watcher_mu);
 | 
	
		
			
				|  |  | +  gpr_mu_unlock(&fd->mu);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    return mask;
 | 
	
		
			
				|  |  |  }
 | 
	
	
		
			
				|  | @@ -391,24 +368,39 @@ void grpc_fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
 | 
	
		
			
				|  |  |      return;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  gpr_mu_lock(&fd->watcher_mu);
 | 
	
		
			
				|  |  | +  gpr_mu_lock(&fd->mu);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    if (watcher == fd->read_watcher) {
 | 
	
		
			
				|  |  |      /* remove read watcher, kick if we still need a read */
 | 
	
		
			
				|  |  |      was_polling = 1;
 | 
	
		
			
				|  |  | -    kick = kick || !got_read;
 | 
	
		
			
				|  |  | +    if (!got_read) {
 | 
	
		
			
				|  |  | +      kick = 1;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  |      fd->read_watcher = NULL;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    if (watcher == fd->write_watcher) {
 | 
	
		
			
				|  |  |      /* remove write watcher, kick if we still need a write */
 | 
	
		
			
				|  |  |      was_polling = 1;
 | 
	
		
			
				|  |  | -    kick = kick || !got_write;
 | 
	
		
			
				|  |  | +    if (!got_write) {
 | 
	
		
			
				|  |  | +      kick = 1;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  |      fd->write_watcher = NULL;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  if (!was_polling) {
 | 
	
		
			
				|  |  | +  if (!was_polling && watcher->worker != NULL) {
 | 
	
		
			
				|  |  |      /* remove from inactive list */
 | 
	
		
			
				|  |  |      watcher->next->prev = watcher->prev;
 | 
	
		
			
				|  |  |      watcher->prev->next = watcher->next;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | +  if (got_read) {
 | 
	
		
			
				|  |  | +    if (set_ready_locked(exec_ctx, fd, &fd->read_closure)) {
 | 
	
		
			
				|  |  | +      kick = 1;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  if (got_write) {
 | 
	
		
			
				|  |  | +    if (set_ready_locked(exec_ctx, fd, &fd->write_closure)) {
 | 
	
		
			
				|  |  | +      kick = 1;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  |    if (kick) {
 | 
	
		
			
				|  |  |      maybe_wake_one_watcher_locked(fd);
 | 
	
		
			
				|  |  |    }
 | 
	
	
		
			
				|  | @@ -417,17 +409,17 @@ void grpc_fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
 | 
	
		
			
				|  |  |      close(fd->fd);
 | 
	
		
			
				|  |  |      grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, 1);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  gpr_mu_unlock(&fd->watcher_mu);
 | 
	
		
			
				|  |  | +  gpr_mu_unlock(&fd->mu);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    GRPC_FD_UNREF(fd, "poll");
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  void grpc_fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
 | 
	
		
			
				|  |  | -  set_ready(exec_ctx, fd, &fd->readst);
 | 
	
		
			
				|  |  | +  set_ready(exec_ctx, fd, &fd->read_closure);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  void grpc_fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
 | 
	
		
			
				|  |  | -  set_ready(exec_ctx, fd, &fd->writest);
 | 
	
		
			
				|  |  | +  set_ready(exec_ctx, fd, &fd->write_closure);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  #endif
 |