Explorar o código

Merge pull request #16788 from ncteisen/channelz-registry

Add Compaction to Channelz Registry
Noah Eisen %!s(int64=7) %!d(string=hai) anos
pai
achega
91d23ac8ea

+ 54 - 6
src/core/lib/channel/channelz_registry.cc

@@ -56,23 +56,71 @@ ChannelzRegistry::~ChannelzRegistry() { gpr_mu_destroy(&mu_); }
 intptr_t ChannelzRegistry::InternalRegister(BaseNode* node) {
   MutexLock lock(&mu_);
   entities_.push_back(node);
-  intptr_t uuid = entities_.size();
-  return uuid;
+  return ++uuid_generator_;
+}
+
+void ChannelzRegistry::MaybePerformCompactionLocked() {
+  constexpr double kEmptinessTheshold = 1 / 3;
+  double emptiness_ratio =
+      double(num_empty_slots_) / double(entities_.capacity());
+  if (emptiness_ratio > kEmptinessTheshold) {
+    int front = 0;
+    for (size_t i = 0; i < entities_.size(); ++i) {
+      if (entities_[i] != nullptr) {
+        entities_[front++] = entities_[i];
+      }
+    }
+    for (int i = 0; i < num_empty_slots_; ++i) {
+      entities_.pop_back();
+    }
+    num_empty_slots_ = 0;
+  }
+}
+
+int ChannelzRegistry::FindByUuidLocked(intptr_t target_uuid) {
+  size_t left = 0;
+  size_t right = entities_.size() - 1;
+  while (left <= right) {
+    size_t true_middle = left + (right - left) / 2;
+    size_t first_non_null = true_middle;
+    while (first_non_null < right && entities_[first_non_null] == nullptr) {
+      first_non_null++;
+    }
+    if (entities_[first_non_null] == nullptr) {
+      right = true_middle - 1;
+      continue;
+    }
+    intptr_t uuid = entities_[first_non_null]->uuid();
+    if (uuid == target_uuid) {
+      return int(first_non_null);
+    }
+    if (uuid < target_uuid) {
+      left = first_non_null + 1;
+    } else {
+      right = true_middle - 1;
+    }
+  }
+  return -1;
 }
 
 void ChannelzRegistry::InternalUnregister(intptr_t uuid) {
   GPR_ASSERT(uuid >= 1);
   MutexLock lock(&mu_);
-  GPR_ASSERT(static_cast<size_t>(uuid) <= entities_.size());
-  entities_[uuid - 1] = nullptr;
+  GPR_ASSERT(uuid <= uuid_generator_);
+  int idx = FindByUuidLocked(uuid);
+  GPR_ASSERT(idx >= 0);
+  entities_[idx] = nullptr;
+  num_empty_slots_++;
+  MaybePerformCompactionLocked();
 }
 
 BaseNode* ChannelzRegistry::InternalGet(intptr_t uuid) {
   MutexLock lock(&mu_);
-  if (uuid < 1 || uuid > static_cast<intptr_t>(entities_.size())) {
+  if (uuid < 1 || uuid > uuid_generator_) {
     return nullptr;
   }
-  return entities_[uuid - 1];
+  int idx = FindByUuidLocked(uuid);
+  return idx < 0 ? nullptr : entities_[idx];
 }
 
 char* ChannelzRegistry::InternalGetTopChannels(intptr_t start_channel_id) {

+ 15 - 1
src/core/lib/channel/channelz_registry.h

@@ -30,6 +30,10 @@
 namespace grpc_core {
 namespace channelz {
 
+namespace testing {
+class ChannelzRegistryPeer;
+}
+
 // singleton registry object to track all objects that are needed to support
 // channelz bookkeeping. All objects share globally distributed uuids.
 class ChannelzRegistry {
@@ -61,6 +65,7 @@ class ChannelzRegistry {
  private:
   GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW
   GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE
+  friend class testing::ChannelzRegistryPeer;
 
   ChannelzRegistry();
   ~ChannelzRegistry();
@@ -82,9 +87,18 @@ class ChannelzRegistry {
   char* InternalGetTopChannels(intptr_t start_channel_id);
   char* InternalGetServers(intptr_t start_server_id);
 
-  // protects entities_ and uuid_
+  // If entities_ has over a certain threshold of empty slots, it will
+  // compact the vector and move all used slots to the front.
+  void MaybePerformCompactionLocked();
+
+  // Performs binary search on entities_ to find the index with that uuid.
+  int FindByUuidLocked(intptr_t uuid);
+
+  // protects members
   gpr_mu mu_;
   InlinedVector<BaseNode*, 20> entities_;
+  intptr_t uuid_generator_ = 0;
+  int num_empty_slots_ = 0;
 };
 
 }  // namespace channelz

+ 8 - 0
src/core/lib/gprpp/inlined_vector.h

@@ -123,6 +123,14 @@ class InlinedVector {
 
   void push_back(T&& value) { emplace_back(std::move(value)); }
 
+  void pop_back() {
+    assert(!empty());
+    size_t s = size();
+    T& value = data()[s - 1];
+    value.~T();
+    size_--;
+  }
+
   void copy_from(const InlinedVector& v) {
     // if v is allocated, copy over the buffer.
     if (v.dynamic_ != nullptr) {

+ 126 - 33
test/core/channel/channelz_registry_test.cc

@@ -43,56 +43,151 @@ namespace grpc_core {
 namespace channelz {
 namespace testing {
 
-TEST(ChannelzRegistryTest, UuidStartsAboveZeroTest) {
-  BaseNode* channelz_channel = nullptr;
-  intptr_t uuid = ChannelzRegistry::Register(channelz_channel);
+class ChannelzRegistryPeer {
+ public:
+  const InlinedVector<BaseNode*, 20>* entities() {
+    return &ChannelzRegistry::Default()->entities_;
+  }
+  int num_empty_slots() {
+    return ChannelzRegistry::Default()->num_empty_slots_;
+  }
+};
+
+class ChannelzRegistryTest : public ::testing::Test {
+ protected:
+  // ensure we always have a fresh registry for tests.
+  void SetUp() override { ChannelzRegistry::Init(); }
+
+  void TearDown() override { ChannelzRegistry::Shutdown(); }
+};
+
+TEST_F(ChannelzRegistryTest, UuidStartsAboveZeroTest) {
+  UniquePtr<BaseNode> channelz_channel =
+      MakeUnique<BaseNode>(BaseNode::EntityType::kTopLevelChannel);
+  intptr_t uuid = channelz_channel->uuid();
   EXPECT_GT(uuid, 0) << "First uuid chose must be greater than zero. Zero if "
                         "reserved according to "
                         "https://github.com/grpc/proposal/blob/master/"
                         "A14-channelz.md";
-  ChannelzRegistry::Unregister(uuid);
 }
 
-TEST(ChannelzRegistryTest, UuidsAreIncreasing) {
-  BaseNode* channelz_channel = nullptr;
-  std::vector<intptr_t> uuids;
-  uuids.reserve(10);
+TEST_F(ChannelzRegistryTest, UuidsAreIncreasing) {
+  std::vector<UniquePtr<BaseNode>> channelz_channels;
+  channelz_channels.reserve(10);
   for (int i = 0; i < 10; ++i) {
-    // reregister the same object. It's ok since we are just testing uuids
-    uuids.push_back(ChannelzRegistry::Register(channelz_channel));
+    channelz_channels.push_back(
+        MakeUnique<BaseNode>(BaseNode::EntityType::kTopLevelChannel));
   }
-  for (size_t i = 1; i < uuids.size(); ++i) {
-    EXPECT_LT(uuids[i - 1], uuids[i]) << "Uuids must always be increasing";
+  for (size_t i = 1; i < channelz_channels.size(); ++i) {
+    EXPECT_LT(channelz_channels[i - 1]->uuid(), channelz_channels[i]->uuid())
+        << "Uuids must always be increasing";
   }
 }
 
-TEST(ChannelzRegistryTest, RegisterGetTest) {
-  // we hackily jam an intptr_t into this pointer to check for equality later
-  BaseNode* channelz_channel = (BaseNode*)42;
-  intptr_t uuid = ChannelzRegistry::Register(channelz_channel);
-  BaseNode* retrieved = ChannelzRegistry::Get(uuid);
-  EXPECT_EQ(channelz_channel, retrieved);
+TEST_F(ChannelzRegistryTest, RegisterGetTest) {
+  UniquePtr<BaseNode> channelz_channel =
+      MakeUnique<BaseNode>(BaseNode::EntityType::kTopLevelChannel);
+  BaseNode* retrieved = ChannelzRegistry::Get(channelz_channel->uuid());
+  EXPECT_EQ(channelz_channel.get(), retrieved);
 }
 
-TEST(ChannelzRegistryTest, RegisterManyItems) {
-  // we hackily jam an intptr_t into this pointer to check for equality later
-  BaseNode* channelz_channel = (BaseNode*)42;
+TEST_F(ChannelzRegistryTest, RegisterManyItems) {
+  std::vector<UniquePtr<BaseNode>> channelz_channels;
   for (int i = 0; i < 100; i++) {
-    intptr_t uuid = ChannelzRegistry::Register(channelz_channel);
-    BaseNode* retrieved = ChannelzRegistry::Get(uuid);
-    EXPECT_EQ(channelz_channel, retrieved);
+    channelz_channels.push_back(
+        MakeUnique<BaseNode>(BaseNode::EntityType::kTopLevelChannel));
+    BaseNode* retrieved = ChannelzRegistry::Get(channelz_channels[i]->uuid());
+    EXPECT_EQ(channelz_channels[i].get(), retrieved);
   }
 }
 
-TEST(ChannelzRegistryTest, NullIfNotPresentTest) {
-  // we hackily jam an intptr_t into this pointer to check for equality later
-  BaseNode* channelz_channel = (BaseNode*)42;
-  intptr_t uuid = ChannelzRegistry::Register(channelz_channel);
+TEST_F(ChannelzRegistryTest, NullIfNotPresentTest) {
+  UniquePtr<BaseNode> channelz_channel =
+      MakeUnique<BaseNode>(BaseNode::EntityType::kTopLevelChannel);
   // try to pull out a uuid that does not exist.
-  BaseNode* nonexistant = ChannelzRegistry::Get(uuid + 1);
+  BaseNode* nonexistant = ChannelzRegistry::Get(channelz_channel->uuid() + 1);
   EXPECT_EQ(nonexistant, nullptr);
-  BaseNode* retrieved = ChannelzRegistry::Get(uuid);
-  EXPECT_EQ(channelz_channel, retrieved);
+  BaseNode* retrieved = ChannelzRegistry::Get(channelz_channel->uuid());
+  EXPECT_EQ(channelz_channel.get(), retrieved);
+}
+
+TEST_F(ChannelzRegistryTest, TestCompaction) {
+  const int kLoopIterations = 100;
+  // These channels that will stay in the registry for the duration of the test.
+  std::vector<UniquePtr<BaseNode>> even_channels;
+  even_channels.reserve(kLoopIterations);
+  {
+    // The channels will unregister themselves at the end of the for block.
+    std::vector<UniquePtr<BaseNode>> odd_channels;
+    odd_channels.reserve(kLoopIterations);
+    for (int i = 0; i < kLoopIterations; i++) {
+      even_channels.push_back(
+          MakeUnique<BaseNode>(BaseNode::EntityType::kTopLevelChannel));
+      odd_channels.push_back(
+          MakeUnique<BaseNode>(BaseNode::EntityType::kTopLevelChannel));
+    }
+  }
+  // without compaction, there would be exactly kLoopIterations empty slots at
+  // this point. However, one of the unregisters should have triggered
+  // compaction.
+  ChannelzRegistryPeer peer;
+  EXPECT_LT(peer.num_empty_slots(), kLoopIterations);
+}
+
+TEST_F(ChannelzRegistryTest, TestGetAfterCompaction) {
+  const int kLoopIterations = 100;
+  // These channels that will stay in the registry for the duration of the test.
+  std::vector<UniquePtr<BaseNode>> even_channels;
+  even_channels.reserve(kLoopIterations);
+  std::vector<intptr_t> odd_uuids;
+  odd_uuids.reserve(kLoopIterations);
+  {
+    // The channels will unregister themselves at the end of the for block.
+    std::vector<UniquePtr<BaseNode>> odd_channels;
+    odd_channels.reserve(kLoopIterations);
+    for (int i = 0; i < kLoopIterations; i++) {
+      even_channels.push_back(
+          MakeUnique<BaseNode>(BaseNode::EntityType::kTopLevelChannel));
+      odd_channels.push_back(
+          MakeUnique<BaseNode>(BaseNode::EntityType::kTopLevelChannel));
+      odd_uuids.push_back(odd_channels[i]->uuid());
+    }
+  }
+  for (int i = 0; i < kLoopIterations; i++) {
+    BaseNode* retrieved = ChannelzRegistry::Get(even_channels[i]->uuid());
+    EXPECT_EQ(even_channels[i].get(), retrieved);
+    retrieved = ChannelzRegistry::Get(odd_uuids[i]);
+    EXPECT_EQ(retrieved, nullptr);
+  }
+}
+
+TEST_F(ChannelzRegistryTest, TestAddAfterCompaction) {
+  const int kLoopIterations = 100;
+  // These channels that will stay in the registry for the duration of the test.
+  std::vector<UniquePtr<BaseNode>> even_channels;
+  even_channels.reserve(kLoopIterations);
+  std::vector<intptr_t> odd_uuids;
+  odd_uuids.reserve(kLoopIterations);
+  {
+    // The channels will unregister themselves at the end of the for block.
+    std::vector<UniquePtr<BaseNode>> odd_channels;
+    odd_channels.reserve(kLoopIterations);
+    for (int i = 0; i < kLoopIterations; i++) {
+      even_channels.push_back(
+          MakeUnique<BaseNode>(BaseNode::EntityType::kTopLevelChannel));
+      odd_channels.push_back(
+          MakeUnique<BaseNode>(BaseNode::EntityType::kTopLevelChannel));
+      odd_uuids.push_back(odd_channels[i]->uuid());
+    }
+  }
+  std::vector<UniquePtr<BaseNode>> more_channels;
+  more_channels.reserve(kLoopIterations);
+  for (int i = 0; i < kLoopIterations; i++) {
+    more_channels.push_back(
+        MakeUnique<BaseNode>(BaseNode::EntityType::kTopLevelChannel));
+    BaseNode* retrieved = ChannelzRegistry::Get(more_channels[i]->uuid());
+    EXPECT_EQ(more_channels[i].get(), retrieved);
+  }
 }
 
 }  // namespace testing
@@ -101,9 +196,7 @@ TEST(ChannelzRegistryTest, NullIfNotPresentTest) {
 
 int main(int argc, char** argv) {
   grpc_test_init(argc, argv);
-  grpc_init();
   ::testing::InitGoogleTest(&argc, argv);
   int ret = RUN_ALL_TESTS();
-  grpc_shutdown();
   return ret;
 }

+ 26 - 0
test/core/gprpp/inlined_vector_test.cc

@@ -264,6 +264,32 @@ TEST(InlinedVectorTest, MoveAssignmentAllocatedAllocated) {
   EXPECT_EQ(move_assigned.data(), old_data);
 }
 
+TEST(InlinedVectorTest, PopBackInlined) {
+  InlinedVector<UniquePtr<int>, 2> v;
+  // Add two elements, pop one out
+  v.push_back(MakeUnique<int>(3));
+  EXPECT_EQ(1UL, v.size());
+  EXPECT_EQ(3, *v[0]);
+  v.push_back(MakeUnique<int>(5));
+  EXPECT_EQ(2UL, v.size());
+  EXPECT_EQ(5, *v[1]);
+  v.pop_back();
+  EXPECT_EQ(1UL, v.size());
+}
+
+TEST(InlinedVectorTest, PopBackAllocated) {
+  const int kInlinedSize = 2;
+  InlinedVector<UniquePtr<int>, kInlinedSize> v;
+  // Add elements to ensure allocated backing.
+  for (size_t i = 0; i < kInlinedSize + 1; ++i) {
+    v.push_back(MakeUnique<int>(3));
+    EXPECT_EQ(i + 1, v.size());
+  }
+  size_t sz = v.size();
+  v.pop_back();
+  EXPECT_EQ(sz - 1, v.size());
+}
+
 }  // namespace testing
 }  // namespace grpc_core