Преглед изворни кода

Reviewer feedback: no API change

ncteisen пре 8 година
родитељ
комит
311fa5f818
3 измењених фајлова са 23 додато и 38 уклоњено
  1. 0 4
      include/grpc/impl/codegen/slice.h
  2. 22 17
      src/core/lib/iomgr/tcp_posix.cc
  3. 1 17
      src/core/lib/slice/slice_buffer.cc

+ 0 - 4
include/grpc/impl/codegen/slice.h

@@ -108,10 +108,6 @@ typedef struct {
   /** the number of slices allocated in the array. External users (i.e any code
   /** the number of slices allocated in the array. External users (i.e any code
    * outside grpc core) MUST NOT use this field */
    * outside grpc core) MUST NOT use this field */
   size_t capacity;
   size_t capacity;
-  /** the index of the first slice who's memory is still owned by this buffer.
-   * This is only to be used when partially unreffing this slice buffer in
-   * grpc_slice_buffer_partial_reset_and_unref_internal. */
-  size_t idx_of_first_valid_slice;
   /** the combined length of all slices in the array */
   /** the combined length of all slices in the array */
   size_t length;
   size_t length;
   /** inlined elements to avoid allocations */
   /** inlined elements to avoid allocations */

+ 22 - 17
src/core/lib/iomgr/tcp_posix.cc

@@ -81,9 +81,7 @@ typedef struct {
 
 
   grpc_slice_buffer* incoming_buffer;
   grpc_slice_buffer* incoming_buffer;
   grpc_slice_buffer* outgoing_buffer;
   grpc_slice_buffer* outgoing_buffer;
-  /** slice within outgoing_buffer to write next */
-  size_t outgoing_slice_idx;
-  /** byte within outgoing_buffer->slices[outgoing_slice_idx] to write next */
+  /** byte within outgoing_buffer->slices[0] to write next */
   size_t outgoing_byte_idx;
   size_t outgoing_byte_idx;
 
 
   grpc_closure* read_cb;
   grpc_closure* read_cb;
@@ -532,23 +530,26 @@ static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp,
   size_t unwind_slice_idx;
   size_t unwind_slice_idx;
   size_t unwind_byte_idx;
   size_t unwind_byte_idx;
 
 
+  // We always start at zero, because we eagerly unref and trim the slice
+  // buffer as we write
+  size_t outgoing_slice_idx = 0;
+
   for (;;) {
   for (;;) {
     sending_length = 0;
     sending_length = 0;
-    unwind_slice_idx = tcp->outgoing_slice_idx;
+    unwind_slice_idx = outgoing_slice_idx;
     unwind_byte_idx = tcp->outgoing_byte_idx;
     unwind_byte_idx = tcp->outgoing_byte_idx;
-    for (iov_size = 0; tcp->outgoing_slice_idx != tcp->outgoing_buffer->count &&
+    for (iov_size = 0; outgoing_slice_idx != tcp->outgoing_buffer->count &&
                        iov_size != MAX_WRITE_IOVEC;
                        iov_size != MAX_WRITE_IOVEC;
          iov_size++) {
          iov_size++) {
       iov[iov_size].iov_base =
       iov[iov_size].iov_base =
           GRPC_SLICE_START_PTR(
           GRPC_SLICE_START_PTR(
-              tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) +
+              tcp->outgoing_buffer->slices[outgoing_slice_idx]) +
           tcp->outgoing_byte_idx;
           tcp->outgoing_byte_idx;
       iov[iov_size].iov_len =
       iov[iov_size].iov_len =
-          GRPC_SLICE_LENGTH(
-              tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) -
+          GRPC_SLICE_LENGTH(tcp->outgoing_buffer->slices[outgoing_slice_idx]) -
           tcp->outgoing_byte_idx;
           tcp->outgoing_byte_idx;
       sending_length += iov[iov_size].iov_len;
       sending_length += iov[iov_size].iov_len;
-      tcp->outgoing_slice_idx++;
+      outgoing_slice_idx++;
       tcp->outgoing_byte_idx = 0;
       tcp->outgoing_byte_idx = 0;
     }
     }
     GPR_ASSERT(iov_size > 0);
     GPR_ASSERT(iov_size > 0);
@@ -574,10 +575,15 @@ static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp,
 
 
     if (sent_length < 0) {
     if (sent_length < 0) {
       if (errno == EAGAIN) {
       if (errno == EAGAIN) {
-        tcp->outgoing_slice_idx = unwind_slice_idx;
         tcp->outgoing_byte_idx = unwind_byte_idx;
         tcp->outgoing_byte_idx = unwind_byte_idx;
-        grpc_slice_buffer_partial_unref_internal(exec_ctx, tcp->outgoing_buffer,
-                                                 unwind_slice_idx);
+        // unref all and forget about all slices that have been written to this
+        // point
+        for (size_t idx = 0; idx < unwind_slice_idx; ++idx) {
+          grpc_slice_unref_internal(exec_ctx,
+                                    tcp->outgoing_buffer->slices[idx]);
+          tcp->outgoing_buffer->count--;
+        }
+        tcp->outgoing_buffer->slices += unwind_slice_idx;
         return false;
         return false;
       } else if (errno == EPIPE) {
       } else if (errno == EPIPE) {
         *error = grpc_error_set_int(GRPC_OS_ERROR(errno, "sendmsg"),
         *error = grpc_error_set_int(GRPC_OS_ERROR(errno, "sendmsg"),
@@ -595,9 +601,9 @@ static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp,
     while (trailing > 0) {
     while (trailing > 0) {
       size_t slice_length;
       size_t slice_length;
 
 
-      tcp->outgoing_slice_idx--;
-      slice_length = GRPC_SLICE_LENGTH(
-          tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]);
+      outgoing_slice_idx--;
+      slice_length =
+          GRPC_SLICE_LENGTH(tcp->outgoing_buffer->slices[outgoing_slice_idx]);
       if (slice_length > trailing) {
       if (slice_length > trailing) {
         tcp->outgoing_byte_idx = slice_length - trailing;
         tcp->outgoing_byte_idx = slice_length - trailing;
         break;
         break;
@@ -606,7 +612,7 @@ static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp,
       }
       }
     }
     }
 
 
-    if (tcp->outgoing_slice_idx == tcp->outgoing_buffer->count) {
+    if (outgoing_slice_idx == tcp->outgoing_buffer->count) {
       *error = GRPC_ERROR_NONE;
       *error = GRPC_ERROR_NONE;
       grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
       grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
                                                  tcp->outgoing_buffer);
                                                  tcp->outgoing_buffer);
@@ -676,7 +682,6 @@ static void tcp_write(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
     return;
     return;
   }
   }
   tcp->outgoing_buffer = buf;
   tcp->outgoing_buffer = buf;
-  tcp->outgoing_slice_idx = 0;
   tcp->outgoing_byte_idx = 0;
   tcp->outgoing_byte_idx = 0;
 
 
   if (!tcp_flush(exec_ctx, tcp, &error)) {
   if (!tcp_flush(exec_ctx, tcp, &error)) {

+ 1 - 17
src/core/lib/slice/slice_buffer.cc

@@ -62,7 +62,6 @@ void grpc_slice_buffer_init(grpc_slice_buffer* sb) {
   sb->count = 0;
   sb->count = 0;
   sb->length = 0;
   sb->length = 0;
   sb->capacity = GRPC_SLICE_BUFFER_INLINE_ELEMENTS;
   sb->capacity = GRPC_SLICE_BUFFER_INLINE_ELEMENTS;
-  sb->idx_of_first_valid_slice = 0;
   sb->base_slices = sb->slices = sb->inlined;
   sb->base_slices = sb->slices = sb->inlined;
 }
 }
 
 
@@ -167,27 +166,12 @@ void grpc_slice_buffer_pop(grpc_slice_buffer* sb) {
 void grpc_slice_buffer_reset_and_unref_internal(grpc_exec_ctx* exec_ctx,
 void grpc_slice_buffer_reset_and_unref_internal(grpc_exec_ctx* exec_ctx,
                                                 grpc_slice_buffer* sb) {
                                                 grpc_slice_buffer* sb) {
   size_t i;
   size_t i;
-  for (i = sb->idx_of_first_valid_slice; i < sb->count; i++) {
+  for (i = 0; i < sb->count; i++) {
     grpc_slice_unref_internal(exec_ctx, sb->slices[i]);
     grpc_slice_unref_internal(exec_ctx, sb->slices[i]);
   }
   }
 
 
   sb->count = 0;
   sb->count = 0;
   sb->length = 0;
   sb->length = 0;
-  sb->idx_of_first_valid_slice = 0;
-}
-
-void grpc_slice_buffer_partial_unref_internal(grpc_exec_ctx* exec_ctx,
-                                              grpc_slice_buffer* sb,
-                                              size_t idx) {
-  GPR_ASSERT(idx < sb->count);  //  if idx == count, then partial is not needed
-  GPR_ASSERT(sb->idx_of_first_valid_slice <= idx);
-
-  size_t i;
-  for (i = sb->idx_of_first_valid_slice; i < idx; i++) {
-    grpc_slice_unref_internal(exec_ctx, sb->slices[i]);
-  }
-
-  sb->idx_of_first_valid_slice = idx;
 }
 }
 
 
 void grpc_slice_buffer_reset_and_unref(grpc_slice_buffer* sb) {
 void grpc_slice_buffer_reset_and_unref(grpc_slice_buffer* sb) {