Jelajahi Sumber

Only allocate what we need in the last slice for proto serialization

yang-g 8 tahun lalu
induk
melakukan
272eebbbcd
2 mengubah file dengan 119 tambahan dan 30 penghapusan
  1. 21 18
      include/grpc++/impl/codegen/proto_utils.h
  2. 98 12
      test/cpp/codegen/proto_utils_test.cc

+ 21 - 18
include/grpc++/impl/codegen/proto_utils.h

@@ -41,8 +41,11 @@ const int kGrpcBufferWriterMaxBufferLength = 1024 * 1024;
 
 class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream {
  public:
-  explicit GrpcBufferWriter(grpc_byte_buffer** bp, int block_size)
-      : block_size_(block_size), byte_count_(0), have_backup_(false) {
+  GrpcBufferWriter(grpc_byte_buffer** bp, int block_size, int total_size)
+      : block_size_(block_size),
+        total_size_(total_size),
+        byte_count_(0),
+        have_backup_(false) {
     *bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(NULL, 0);
     slice_buffer_ = &(*bp)->data.raw.slice_buffer;
   }
@@ -54,11 +57,20 @@ class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream {
   }
 
   bool Next(void** data, int* size) override {
+    // Protobuf should not ask for more memory than total_size_.
+    GPR_CODEGEN_ASSERT(byte_count_ < total_size_);
     if (have_backup_) {
       slice_ = backup_slice_;
       have_backup_ = false;
     } else {
-      slice_ = g_core_codegen_interface->grpc_slice_malloc(block_size_);
+      // When less than a whole block is needed, only allocate that much.
+      // But make sure the allocated slice is not inlined.
+      size_t remain = total_size_ - byte_count_ > block_size_
+                          ? block_size_
+                          : total_size_ - byte_count_;
+      slice_ = g_core_codegen_interface->grpc_slice_malloc(
+          remain > GRPC_SLICE_INLINED_SIZE ? remain
+                                           : GRPC_SLICE_INLINED_SIZE + 1);
     }
     *data = GRPC_SLICE_START_PTR(slice_);
     // On win x64, int is only 32bit
@@ -70,7 +82,7 @@ class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream {
 
   void BackUp(int count) override {
     g_core_codegen_interface->grpc_slice_buffer_pop(slice_buffer_);
-    if (count == block_size_) {
+    if ((size_t)count == GRPC_SLICE_LENGTH(slice_)) {
       backup_slice_ = slice_;
     } else {
       backup_slice_ = g_core_codegen_interface->grpc_slice_split_tail(
@@ -90,6 +102,7 @@ class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream {
  protected:
   friend class GrpcBufferWriterPeer;
   const int block_size_;
+  const int total_size_;
   int64_t byte_count_;
   grpc_slice_buffer* slice_buffer_;
   bool have_backup_;
@@ -175,20 +188,10 @@ Status GenericSerialize(const grpc::protobuf::Message& msg,
       "BufferWriter must be a subclass of io::ZeroCopyOutputStream");
   *own_buffer = true;
   int byte_size = msg.ByteSize();
-  if (byte_size <= kGrpcBufferWriterMaxBufferLength) {
-    grpc_slice slice = g_core_codegen_interface->grpc_slice_malloc(byte_size);
-    GPR_CODEGEN_ASSERT(
-        GRPC_SLICE_END_PTR(slice) ==
-        msg.SerializeWithCachedSizesToArray(GRPC_SLICE_START_PTR(slice)));
-    *bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(&slice, 1);
-    g_core_codegen_interface->grpc_slice_unref(slice);
-    return g_core_codegen_interface->ok();
-  } else {
-    BufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength);
-    return msg.SerializeToZeroCopyStream(&writer)
-               ? g_core_codegen_interface->ok()
-               : Status(StatusCode::INTERNAL, "Failed to serialize message");
-  }
+  BufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength, byte_size);
+  return msg.SerializeToZeroCopyStream(&writer)
+             ? g_core_codegen_interface->ok()
+             : Status(StatusCode::INTERNAL, "Failed to serialize message");
 }
 
 // BufferReader must be a subclass of io::ZeroCopyInputStream.

+ 98 - 12
test/cpp/codegen/proto_utils_test.cc

@@ -16,15 +16,16 @@
  *
  */
 
+#include <grpc++/impl/codegen/grpc_library.h>
 #include <grpc++/impl/codegen/proto_utils.h>
 #include <grpc++/impl/grpc_library.h>
+#include <grpc/impl/codegen/byte_buffer.h>
+#include <grpc/slice.h>
 #include <gtest/gtest.h>
 
 namespace grpc {
 namespace internal {
 
-static GrpcLibraryInitializer g_gli_initializer;
-
 // Provide access to GrpcBufferWriter internals.
 class GrpcBufferWriterPeer {
  public:
@@ -44,35 +45,120 @@ class ProtoUtilsTest : public ::testing::Test {};
 // GrpcBufferWriter Next()/Backup() invocations could result in a dangling
 // pointer returned by Next() due to the interaction between grpc_slice inlining
 // and GRPC_SLICE_START_PTR.
-TEST_F(ProtoUtilsTest, BackupNext) {
-  // Ensure the GrpcBufferWriter internals are initialized.
-  g_gli_initializer.summon();
-
+TEST_F(ProtoUtilsTest, TinyBackupThenNext) {
   grpc_byte_buffer* bp;
-  GrpcBufferWriter writer(&bp, 8192);
+  const int block_size = 1024;
+  GrpcBufferWriter writer(&bp, block_size, 8192);
   GrpcBufferWriterPeer peer(&writer);
 
   void* data;
   int size;
   // Allocate a slice.
   ASSERT_TRUE(writer.Next(&data, &size));
-  EXPECT_EQ(8192, size);
-  // Return a single byte. Before the fix that this test acts as a regression
-  // for, this would have resulted in an inlined backup slice.
+  EXPECT_EQ(block_size, size);
+  // Return a single byte.
   writer.BackUp(1);
-  EXPECT_TRUE(!peer.have_backup());
-  // On the next allocation, the slice is non-inlined.
+  EXPECT_FALSE(peer.have_backup());
+  // On the next allocation, the returned slice is non-inlined.
   ASSERT_TRUE(writer.Next(&data, &size));
   EXPECT_TRUE(peer.slice().refcount != NULL);
+  EXPECT_EQ(block_size, size);
 
   // Cleanup.
   g_core_codegen_interface->grpc_byte_buffer_destroy(bp);
 }
 
+namespace {
+
+// Set backup_size to 0 to indicate no backup is needed.
+void BufferWriterTest(int block_size, int total_size, int backup_size) {
+  grpc_byte_buffer* bp;
+  GrpcBufferWriter writer(&bp, block_size, total_size);
+
+  int written_size = 0;
+  void* data;
+  int size = 0;
+  bool backed_up_entire_slice = false;
+
+  while (written_size < total_size) {
+    EXPECT_TRUE(writer.Next(&data, &size));
+    EXPECT_GT(size, 0);
+    EXPECT_TRUE(data);
+    int write_size = size;
+    bool should_backup = false;
+    if (backup_size > 0 && size > backup_size) {
+      write_size = size - backup_size;
+      should_backup = true;
+    } else if (size == backup_size && !backed_up_entire_slice) {
+      // only backup entire slice once.
+      backed_up_entire_slice = true;
+      should_backup = true;
+      write_size = 0;
+    }
+    // May need a last backup.
+    if (write_size + written_size > total_size) {
+      write_size = total_size - written_size;
+      should_backup = true;
+      backup_size = size - write_size;
+      ASSERT_GT(backup_size, 0);
+    }
+    for (int i = 0; i < write_size; i++) {
+      ((uint8_t*)data)[i] = written_size % 128;
+      written_size++;
+    }
+    if (should_backup) {
+      writer.BackUp(backup_size);
+    }
+  }
+  EXPECT_EQ(grpc_byte_buffer_length(bp), (size_t)total_size);
+
+  grpc_byte_buffer_reader reader;
+  grpc_byte_buffer_reader_init(&reader, bp);
+  int read_bytes = 0;
+  while (read_bytes < total_size) {
+    grpc_slice s;
+    EXPECT_TRUE(grpc_byte_buffer_reader_next(&reader, &s));
+    for (size_t i = 0; i < GRPC_SLICE_LENGTH(s); i++) {
+      EXPECT_EQ(GRPC_SLICE_START_PTR(s)[i], read_bytes % 128);
+      read_bytes++;
+    }
+    grpc_slice_unref(s);
+  }
+  EXPECT_EQ(read_bytes, total_size);
+  grpc_byte_buffer_reader_destroy(&reader);
+  grpc_byte_buffer_destroy(bp);
+}
+
+TEST(WriterTest, TinyBlockTinyBackup) {
+  for (int i = 2; i < (int)GRPC_SLICE_INLINED_SIZE; i++) {
+    BufferWriterTest(i, 256, 1);
+  }
+}
+
+TEST(WriterTest, SmallBlockTinyBackup) { BufferWriterTest(64, 256, 1); }
+
+TEST(WriterTest, SmallBlockNoBackup) { BufferWriterTest(64, 256, 0); }
+
+TEST(WriterTest, SmallBlockFullBackup) { BufferWriterTest(64, 256, 64); }
+
+TEST(WriterTest, LargeBlockTinyBackup) { BufferWriterTest(4096, 8192, 1); }
+
+TEST(WriterTest, LargeBlockNoBackup) { BufferWriterTest(4096, 8192, 0); }
+
+TEST(WriterTest, LargeBlockFullBackup) { BufferWriterTest(4096, 8192, 4096); }
+
+TEST(WriterTest, LargeBlockLargeBackup) { BufferWriterTest(4096, 8192, 4095); }
+
+}  // namespace
 }  // namespace internal
 }  // namespace grpc
 
 int main(int argc, char** argv) {
+  // Ensure the GrpcBufferWriter internals are initialized.
+  grpc::internal::GrpcLibraryInitializer init;
+  init.summon();
+  grpc::GrpcLibraryCodegen lib;
+
   ::testing::InitGoogleTest(&argc, argv);
   return RUN_ALL_TESTS();
 }