|
|
@@ -59,12 +59,14 @@ GrpcRpcManager::GrpcRpcManager(int min_pollers, int max_pollers)
|
|
|
: shutdown_(false),
|
|
|
num_pollers_(0),
|
|
|
min_pollers_(min_pollers),
|
|
|
- max_pollers_(max_pollers == -1 ? INT_MAX: max_pollers),
|
|
|
+ max_pollers_(max_pollers == -1 ? INT_MAX : max_pollers),
|
|
|
num_threads_(0) {}
|
|
|
|
|
|
GrpcRpcManager::~GrpcRpcManager() {
|
|
|
- std::unique_lock<grpc::mutex> lock(mu_);
|
|
|
- GPR_ASSERT(num_threads_ == 0);
|
|
|
+ {
|
|
|
+ std::unique_lock<grpc::mutex> lock(mu_);
|
|
|
+ GPR_ASSERT(num_threads_ == 0);
|
|
|
+ }
|
|
|
|
|
|
CleanupCompletedThreads();
|
|
|
}
|
|
|
@@ -87,8 +89,16 @@ bool GrpcRpcManager::IsShutdown() {
|
|
|
}
|
|
|
|
|
|
void GrpcRpcManager::MarkAsCompleted(GrpcRpcManagerThread* thd) {
|
|
|
- std::unique_lock<grpc::mutex> lock(list_mu_);
|
|
|
- completed_threads_.push_back(thd);
|
|
|
+ {
|
|
|
+ std::unique_lock<grpc::mutex> list_lock(list_mu_);
|
|
|
+ completed_threads_.push_back(thd);
|
|
|
+ }
|
|
|
+
|
|
|
+ grpc::unique_lock<grpc::mutex> lock(mu_);
|
|
|
+ num_threads_--;
|
|
|
+ if (num_threads_ == 0) {
|
|
|
+ shutdown_cv_.notify_one();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
void GrpcRpcManager::CleanupCompletedThreads() {
|
|
|
@@ -169,17 +179,10 @@ void GrpcRpcManager::MainWorkLoop() {
|
|
|
}
|
|
|
} while (MaybeContinueAsPoller());
|
|
|
|
|
|
- // If we are here, either GrpcRpcManager is shutting down or it already has
|
|
|
- // enough threads. In both cases, current thread can be terminated
|
|
|
- {
|
|
|
- grpc::unique_lock<grpc::mutex> lock(mu_);
|
|
|
- num_threads_--;
|
|
|
- if (num_threads_ == 0) {
|
|
|
- shutdown_cv_.notify_one();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
CleanupCompletedThreads();
|
|
|
+
|
|
|
+ // If we are here, either GrpcRpcManager is shutting down or it already has
|
|
|
+ // enough threads.
|
|
|
}
|
|
|
|
|
|
} // namespace grpc
|