@@ -258,6 +258,8 @@ typedef struct {
   grpc_endpoint base;
   grpc_fd *em_fd;
   int fd;
+  int iov_size;            /* Number of slices to allocate per read attempt */
+  int finished_edge;
   size_t slice_size;
   gpr_refcount refcount;
 
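The two fields added above persist read state across callbacks: iov_size is how many slice_size blocks the next recvmsg attempt will be offered, and finished_edge records whether the previous edge-triggered readable notification was read down to EAGAIN. A minimal, illustrative sketch of the sizing policy the hunks below implement (not part of the patch; next_iov_size and its contract are invented for illustration):

```c
#define MAX_READ_IOVEC 4 /* restated from the patch below */

/* Shrink on EAGAIN, grow on successful reads, clamp to [1, MAX_READ_IOVEC]. */
static int next_iov_size(int iov_size, long read_bytes, int got_eagain) {
  if (got_eagain) {
    /* We offered more buffers than the kernel could fill: back off. */
    return iov_size > 1 ? iov_size / 2 : 1;
  }
  if (read_bytes > 0 && iov_size < MAX_READ_IOVEC) {
    /* Data arrived: offer one more slice on the next attempt. */
    return iov_size + 1;
  }
  return iov_size; /* EOF or hard error: the endpoint is torn down anyway. */
}
```
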
@@ -315,9 +317,7 @@ static void call_read_cb(grpc_tcp *tcp, gpr_slice *slices, size_t nslices,
 
 #define INLINE_SLICE_BUFFER_SIZE 8
 #define MAX_READ_IOVEC 4
-static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) {
-  grpc_tcp *tcp = (grpc_tcp *)arg;
-  int iov_size = 1;
+static void grpc_tcp_continue_read(grpc_tcp *tcp) {
   gpr_slice static_read_slices[INLINE_SLICE_BUFFER_SIZE];
   struct msghdr msg;
   struct iovec iov[MAX_READ_IOVEC];
@@ -327,88 +327,103 @@ static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) {
   gpr_slice *final_slices;
   size_t final_nslices;
 
+  GPR_ASSERT(!tcp->finished_edge);
   GRPC_TIMER_BEGIN(GRPC_PTAG_HANDLE_READ, 0);
   slice_state_init(&read_state, static_read_slices, INLINE_SLICE_BUFFER_SIZE,
                    0);
 
-  if (!success) {
-    call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN);
-    grpc_tcp_unref(tcp);
-    return;
+  allocated_bytes = slice_state_append_blocks_into_iovec(
+      &read_state, iov, tcp->iov_size, tcp->slice_size);
+
+  msg.msg_name = NULL;
+  msg.msg_namelen = 0;
+  msg.msg_iov = iov;
+  msg.msg_iovlen = tcp->iov_size;
+  msg.msg_control = NULL;
+  msg.msg_controllen = 0;
+  msg.msg_flags = 0;
+
+  GRPC_TIMER_MARK(RECVMSG_BEGIN, 0);
+  do {
+    read_bytes = recvmsg(tcp->fd, &msg, 0);
+  } while (read_bytes < 0 && errno == EINTR);
+  GRPC_TIMER_MARK(RECVMSG_END, 0);
+
+  if (read_bytes < allocated_bytes) {
+    /* TODO(klempner): Consider a second read first, in hopes of getting a
+     * quick EAGAIN and saving a bunch of allocations. */
+    slice_state_remove_last(&read_state, read_bytes < 0
+                                             ? allocated_bytes
+                                             : allocated_bytes - read_bytes);
   }
 
-  /* TODO(klempner): Limit the amount we read at once. */
-  for (;;) {
-    allocated_bytes = slice_state_append_blocks_into_iovec(
-        &read_state, iov, iov_size, tcp->slice_size);
-
-    msg.msg_name = NULL;
-    msg.msg_namelen = 0;
-    msg.msg_iov = iov;
-    msg.msg_iovlen = iov_size;
-    msg.msg_control = NULL;
-    msg.msg_controllen = 0;
-    msg.msg_flags = 0;
-
-    GRPC_TIMER_BEGIN(GRPC_PTAG_RECVMSG, 0);
-    do {
-      read_bytes = recvmsg(tcp->fd, &msg, 0);
-    } while (read_bytes < 0 && errno == EINTR);
-    GRPC_TIMER_END(GRPC_PTAG_RECVMSG, 0);
-
-    if (read_bytes < allocated_bytes) {
-      /* TODO(klempner): Consider a second read first, in hopes of getting a
-       * quick EAGAIN and saving a bunch of allocations. */
-      slice_state_remove_last(&read_state, read_bytes < 0
-                                               ? allocated_bytes
-                                               : allocated_bytes - read_bytes);
-    }
-
-    if (read_bytes < 0) {
-      /* NB: After calling the user_cb a parallel call of the read handler may
-       * be running. */
-      if (errno == EAGAIN) {
-        if (slice_state_has_available(&read_state)) {
-          /* TODO(klempner): We should probably do the call into the application
-             without all this junk on the stack */
-          /* FIXME(klempner): Refcount properly */
-          slice_state_transfer_ownership(&read_state, &final_slices,
-                                         &final_nslices);
-          call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK);
-          slice_state_destroy(&read_state);
-          grpc_tcp_unref(tcp);
-        } else {
-          /* Spurious read event, consume it here */
-          slice_state_destroy(&read_state);
-          grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
-        }
-      } else {
-        /* TODO(klempner): Log interesting errors */
-        call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_ERROR);
-        slice_state_destroy(&read_state);
-        grpc_tcp_unref(tcp);
+  if (read_bytes < 0) {
+    /* NB: After calling the user_cb a parallel call of the read handler may
+     * be running. */
+    if (errno == EAGAIN) {
+      if (tcp->iov_size > 1) {
+        tcp->iov_size /= 2;
       }
-      return;
-    } else if (read_bytes == 0) {
-      /* 0 read size ==> end of stream */
       if (slice_state_has_available(&read_state)) {
-        /* there were bytes already read: pass them up to the application */
+        /* TODO(klempner): We should probably do the call into the application
+           without all this junk on the stack */
+        /* FIXME(klempner): Refcount properly */
         slice_state_transfer_ownership(&read_state, &final_slices,
                                        &final_nslices);
-        call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_EOF);
+        tcp->finished_edge = 1;
+        call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK);
+        slice_state_destroy(&read_state);
+        grpc_tcp_unref(tcp);
      } else {
-        call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_EOF);
+        /* We've consumed the edge, request a new one */
+        slice_state_destroy(&read_state);
+        grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
      }
+    } else {
+      /* TODO(klempner): Log interesting errors */
+      call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_ERROR);
      slice_state_destroy(&read_state);
      grpc_tcp_unref(tcp);
-      return;
-    } else if (iov_size < MAX_READ_IOVEC) {
-      ++iov_size;
    }
+  } else if (read_bytes == 0) {
+    /* 0 read size ==> end of stream */
+    if (slice_state_has_available(&read_state)) {
+      /* there were bytes already read: pass them up to the application */
+      slice_state_transfer_ownership(&read_state, &final_slices,
+                                     &final_nslices);
+      call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_EOF);
+    } else {
+      call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_EOF);
+    }
+    slice_state_destroy(&read_state);
+    grpc_tcp_unref(tcp);
+  } else {
+    if (tcp->iov_size < MAX_READ_IOVEC) {
+      ++tcp->iov_size;
+    }
+    GPR_ASSERT(slice_state_has_available(&read_state));
+    slice_state_transfer_ownership(&read_state, &final_slices,
+                                   &final_nslices);
+    call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK);
+    slice_state_destroy(&read_state);
+    grpc_tcp_unref(tcp);
  }
+
   GRPC_TIMER_END(GRPC_PTAG_HANDLE_READ, 0);
 }
 
+static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) {
+  grpc_tcp *tcp = (grpc_tcp *)arg;
+  GPR_ASSERT(!tcp->finished_edge);
+
+  if (!success) {
+    call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN);
+    grpc_tcp_unref(tcp);
+  } else {
+    grpc_tcp_continue_read(tcp);
+  }
+}
+
 static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
                                     void *user_data) {
   grpc_tcp *tcp = (grpc_tcp *)ep;
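The rework in this hunk replaces the old read loop, which called recvmsg repeatedly with a function-local iov_size until it hit EAGAIN, EOF, or an error, with grpc_tcp_continue_read, which performs exactly one recvmsg per invocation, trims the unused tail of the allocated slices, and then either delivers what it read, reports EOF or an error, or re-arms the poller; the persistent tcp->iov_size and tcp->finished_edge fields let the next invocation resume where this one stopped. A self-contained sketch of the single-shot scatter read at the core of this path (recvmsg, struct msghdr, and struct iovec are the real POSIX pieces; the helper name and contract are assumptions for illustration):

```c
#include <errno.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>

/* One scatter read into iov_count buffers of slice_size bytes each, retried
 * on EINTR. *unused_bytes reports how much of the offered space stayed empty
 * (all of it if the read failed), mirroring the slice_state_remove_last call
 * in the patch above. */
static ssize_t scatter_read_once(int fd, struct iovec *iov, int iov_count,
                                 size_t slice_size, size_t *unused_bytes) {
  struct msghdr msg;
  ssize_t read_bytes;
  size_t allocated_bytes = (size_t)iov_count * slice_size;

  memset(&msg, 0, sizeof(msg));
  msg.msg_iov = iov;
  msg.msg_iovlen = iov_count;

  do {
    read_bytes = recvmsg(fd, &msg, 0);
  } while (read_bytes < 0 && errno == EINTR);

  *unused_bytes = read_bytes < 0 ? allocated_bytes
                                 : allocated_bytes - (size_t)read_bytes;
  return read_bytes;
}
```
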
@@ -416,7 +431,12 @@ static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
   tcp->read_cb = cb;
   tcp->read_user_data = user_data;
   gpr_ref(&tcp->refcount);
-  grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
+  if (tcp->finished_edge) {
+    tcp->finished_edge = 0;
+    grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
+  } else {
+    grpc_iomgr_add_callback(grpc_tcp_handle_read, tcp);
+  }
 }
 
 #define MAX_WRITE_IOVEC 16
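Because grpc_tcp_continue_read stops after a single recvmsg, a successful read returns to the application with tcp->finished_edge still 0: the socket may hold more buffered data, yet no new poll event will arrive for it. The change above therefore branches in grpc_tcp_notify_on_read: when the previous edge was drained (finished_edge set), clear the flag and re-arm the poller; otherwise schedule grpc_tcp_handle_read directly on the iomgr callback queue so reading resumes without waiting for an edge that may never come. A minimal stand-alone sketch of that gate (illustrative only; the two function pointers stand in for grpc_fd_notify_on_read and grpc_iomgr_add_callback):

```c
typedef struct {
  int finished_edge; /* 1 iff the last readable edge was read down to EAGAIN */
} read_gate;

static void request_read(read_gate *g, void (*rearm_poller)(void),
                         void (*run_read_now)(void)) {
  if (g->finished_edge) {
    /* Nothing left over from the previous edge: wait for the poller to
     * report a new one. */
    g->finished_edge = 0;
    rearm_poller();
  } else {
    /* The previous read stopped with kernel data possibly still buffered:
     * continue reading immediately instead of waiting for the poller. */
    run_read_now();
  }
}
```
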
@@ -554,6 +574,8 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) {
   tcp->read_user_data = NULL;
   tcp->write_user_data = NULL;
   tcp->slice_size = slice_size;
+  tcp->iov_size = 1;
+  tcp->finished_edge = 1;
   slice_state_init(&tcp->write_state, NULL, 0, 0);
   /* paired with unref in grpc_tcp_destroy */
   gpr_ref_init(&tcp->refcount, 1);