mpmcqueue.cc 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. /*
  2. *
  3. * Copyright 2019 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. #include <grpc/support/port_platform.h>
  19. #include "src/core/lib/iomgr/executor/mpmcqueue.h"
  20. #include <grpc/support/alloc.h>
  21. #include <grpc/support/cpu.h>
  22. #include <grpc/support/log.h>
  23. #include <grpc/support/sync.h>
  24. #include <grpc/support/time.h>
  25. #include <inttypes.h>
  26. #include <string.h>
  27. #include "src/core/lib/debug/stats.h"
  28. #include "src/core/lib/gprpp/sync.h"
  29. namespace grpc_core {
  30. DebugOnlyTraceFlag thread_pool_trace(false, "thread_pool_trace");
  31. inline void* MPMCQueue::PopFront() {
  32. void* result = queue_head_->content;
  33. Node* head_to_remove = queue_head_;
  34. queue_head_ = queue_head_->next;
  35. count_.Store(count_.Load(MemoryOrder::RELAXED) - 1, MemoryOrder::RELAXED);
  36. gpr_timespec wait_time =
  37. gpr_time_sub(gpr_now(GPR_CLOCK_PRECISE), head_to_remove->insert_time);
  38. gpr_free(head_to_remove);
  39. // Update Stats info
  40. stats_.num_completed++;
  41. stats_.total_queue_cycles =
  42. gpr_time_add(stats_.total_queue_cycles, wait_time);
  43. stats_.max_queue_cycles = gpr_time_max(
  44. gpr_convert_clock_type(stats_.max_queue_cycles, GPR_TIMESPAN), wait_time);
  45. if (count_.Load(MemoryOrder::RELAXED) == 0) {
  46. stats_.busy_time_cycles =
  47. gpr_time_add(stats_.busy_time_cycles,
  48. gpr_time_sub(gpr_now(GPR_CLOCK_PRECISE), busy_time));
  49. }
  50. if (GRPC_TRACE_FLAG_ENABLED(thread_pool_trace)) {
  51. PrintStats();
  52. }
  53. // Singal waiting thread
  54. if (count_.Load(MemoryOrder::RELAXED) > 0 && num_waiters_ > 0) {
  55. wait_nonempty_.Signal();
  56. }
  57. return result;
  58. }
  59. MPMCQueue::MPMCQueue()
  60. : num_waiters_(0), queue_head_(nullptr), queue_tail_(nullptr) {
  61. count_.Store(0, MemoryOrder::RELAXED);
  62. }
  63. MPMCQueue::~MPMCQueue() {
  64. GPR_ASSERT(count_.Load(MemoryOrder::RELAXED) == 0);
  65. MutexLock l(&mu_);
  66. GPR_ASSERT(num_waiters_ == 0);
  67. }
  68. void MPMCQueue::Put(void* elem) {
  69. MutexLock l(&mu_);
  70. Node* new_node = static_cast<Node*>(gpr_malloc(sizeof(Node)));
  71. new_node->next = nullptr;
  72. new_node->content = elem;
  73. new_node->insert_time = gpr_now(GPR_CLOCK_PRECISE);
  74. if (count_.Load(MemoryOrder::RELAXED) == 0) {
  75. busy_time = gpr_now(GPR_CLOCK_PRECISE);
  76. queue_head_ = queue_tail_ = new_node;
  77. } else {
  78. queue_tail_->next = new_node;
  79. queue_tail_ = queue_tail_->next;
  80. }
  81. count_.Store(count_.Load(MemoryOrder::RELAXED) + 1, MemoryOrder::RELAXED);
  82. // Update Stats info
  83. stats_.num_started++;
  84. if (GRPC_TRACE_FLAG_ENABLED(thread_pool_trace)) {
  85. PrintStats();
  86. }
  87. if (num_waiters_ > 0) {
  88. wait_nonempty_.Signal();
  89. }
  90. }
  91. void* MPMCQueue::Get() {
  92. MutexLock l(&mu_);
  93. if (count_.Load(MemoryOrder::RELAXED) == 0) {
  94. num_waiters_++;
  95. do {
  96. wait_nonempty_.Wait(&mu_);
  97. } while (count_.Load(MemoryOrder::RELAXED) == 0);
  98. num_waiters_--;
  99. }
  100. GPR_ASSERT(count_.Load(MemoryOrder::RELAXED) > 0);
  101. return PopFront();
  102. }
  103. void MPMCQueue::PrintStats() {
  104. gpr_log(GPR_INFO, "STATS INFO:");
  105. gpr_log(GPR_INFO, "num_started: %" PRIu64, stats_.num_started);
  106. gpr_log(GPR_INFO, "num_completed: %" PRIu64, stats_.num_completed);
  107. gpr_log(GPR_INFO, "total_queue_cycles: %" PRId32,
  108. gpr_time_to_millis(stats_.total_queue_cycles));
  109. gpr_log(GPR_INFO, "max_queue_cycles: %" PRId32,
  110. gpr_time_to_millis(stats_.max_queue_cycles));
  111. gpr_log(GPR_INFO, "busy_time_cycles: %" PRId32,
  112. gpr_time_to_millis(stats_.busy_time_cycles));
  113. }
  114. } // namespace grpc_core