|
@@ -26,6 +26,7 @@
|
|
#include <stdio.h>
|
|
#include <stdio.h>
|
|
#include <string.h>
|
|
#include <string.h>
|
|
|
|
|
|
|
|
+#include <map>
|
|
#include <set>
|
|
#include <set>
|
|
|
|
|
|
#include <grpc/support/alloc.h>
|
|
#include <grpc/support/alloc.h>
|
|
@@ -33,6 +34,9 @@
|
|
#include <grpc/support/string_util.h>
|
|
#include <grpc/support/string_util.h>
|
|
#include <grpc/support/sync.h>
|
|
#include <grpc/support/sync.h>
|
|
|
|
|
|
|
|
+#include "absl/container/inlined_vector.h"
|
|
|
|
+#include "absl/types/optional.h"
|
|
|
|
+
|
|
#include "src/core/ext/filters/client_channel/backend_metric.h"
|
|
#include "src/core/ext/filters/client_channel/backend_metric.h"
|
|
#include "src/core/ext/filters/client_channel/backup_poller.h"
|
|
#include "src/core/ext/filters/client_channel/backup_poller.h"
|
|
#include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
|
|
#include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
|
|
@@ -52,9 +56,7 @@
|
|
#include "src/core/lib/channel/connected_channel.h"
|
|
#include "src/core/lib/channel/connected_channel.h"
|
|
#include "src/core/lib/channel/status_util.h"
|
|
#include "src/core/lib/channel/status_util.h"
|
|
#include "src/core/lib/gpr/string.h"
|
|
#include "src/core/lib/gpr/string.h"
|
|
-#include "src/core/lib/gprpp/inlined_vector.h"
|
|
|
|
#include "src/core/lib/gprpp/manual_constructor.h"
|
|
#include "src/core/lib/gprpp/manual_constructor.h"
|
|
-#include "src/core/lib/gprpp/map.h"
|
|
|
|
#include "src/core/lib/gprpp/sync.h"
|
|
#include "src/core/lib/gprpp/sync.h"
|
|
#include "src/core/lib/iomgr/iomgr.h"
|
|
#include "src/core/lib/iomgr/iomgr.h"
|
|
#include "src/core/lib/iomgr/polling_entity.h"
|
|
#include "src/core/lib/iomgr/polling_entity.h"
|
|
@@ -293,7 +295,7 @@ class ChannelData {
|
|
RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
|
|
RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
|
|
OrphanablePtr<ResolvingLoadBalancingPolicy> resolving_lb_policy_;
|
|
OrphanablePtr<ResolvingLoadBalancingPolicy> resolving_lb_policy_;
|
|
ConnectivityStateTracker state_tracker_;
|
|
ConnectivityStateTracker state_tracker_;
|
|
- grpc_core::UniquePtr<char> health_check_service_name_;
|
|
|
|
|
|
+ std::string health_check_service_name_;
|
|
RefCountedPtr<ServiceConfig> saved_service_config_;
|
|
RefCountedPtr<ServiceConfig> saved_service_config_;
|
|
bool received_first_resolver_result_ = false;
|
|
bool received_first_resolver_result_ = false;
|
|
// The number of SubchannelWrapper instances referencing a given Subchannel.
|
|
// The number of SubchannelWrapper instances referencing a given Subchannel.
|
|
@@ -305,8 +307,7 @@ class ChannelData {
|
|
// Pending ConnectedSubchannel updates for each SubchannelWrapper.
|
|
// Pending ConnectedSubchannel updates for each SubchannelWrapper.
|
|
// Updates are queued here in the control plane work_serializer and then
|
|
// Updates are queued here in the control plane work_serializer and then
|
|
// applied in the data plane mutex when the picker is updated.
|
|
// applied in the data plane mutex when the picker is updated.
|
|
- std::map<RefCountedPtr<SubchannelWrapper>, RefCountedPtr<ConnectedSubchannel>,
|
|
|
|
- RefCountedPtrLess<SubchannelWrapper>>
|
|
|
|
|
|
+ std::map<RefCountedPtr<SubchannelWrapper>, RefCountedPtr<ConnectedSubchannel>>
|
|
pending_subchannel_updates_;
|
|
pending_subchannel_updates_;
|
|
|
|
|
|
//
|
|
//
|
|
@@ -827,7 +828,7 @@ class CallData {
|
|
// Note: We inline the cache for the first 3 send_message ops and use
|
|
// Note: We inline the cache for the first 3 send_message ops and use
|
|
// dynamic allocation after that. This number was essentially picked
|
|
// dynamic allocation after that. This number was essentially picked
|
|
// at random; it could be changed in the future to tune performance.
|
|
// at random; it could be changed in the future to tune performance.
|
|
- InlinedVector<ByteStreamCache*, 3> send_messages_;
|
|
|
|
|
|
+ absl::InlinedVector<ByteStreamCache*, 3> send_messages_;
|
|
// send_trailing_metadata
|
|
// send_trailing_metadata
|
|
bool seen_send_trailing_metadata_ = false;
|
|
bool seen_send_trailing_metadata_ = false;
|
|
grpc_linked_mdelem* send_trailing_metadata_storage_ = nullptr;
|
|
grpc_linked_mdelem* send_trailing_metadata_storage_ = nullptr;
|
|
@@ -849,7 +850,7 @@ class CallData {
|
|
class ChannelData::SubchannelWrapper : public SubchannelInterface {
|
|
class ChannelData::SubchannelWrapper : public SubchannelInterface {
|
|
public:
|
|
public:
|
|
SubchannelWrapper(ChannelData* chand, Subchannel* subchannel,
|
|
SubchannelWrapper(ChannelData* chand, Subchannel* subchannel,
|
|
- grpc_core::UniquePtr<char> health_check_service_name)
|
|
|
|
|
|
+ std::string health_check_service_name)
|
|
: SubchannelInterface(&grpc_client_channel_routing_trace),
|
|
: SubchannelInterface(&grpc_client_channel_routing_trace),
|
|
chand_(chand),
|
|
chand_(chand),
|
|
subchannel_(subchannel),
|
|
subchannel_(subchannel),
|
|
@@ -896,7 +897,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
|
|
grpc_connectivity_state CheckConnectivityState() override {
|
|
grpc_connectivity_state CheckConnectivityState() override {
|
|
RefCountedPtr<ConnectedSubchannel> connected_subchannel;
|
|
RefCountedPtr<ConnectedSubchannel> connected_subchannel;
|
|
grpc_connectivity_state connectivity_state =
|
|
grpc_connectivity_state connectivity_state =
|
|
- subchannel_->CheckConnectivityState(health_check_service_name_.get(),
|
|
|
|
|
|
+ subchannel_->CheckConnectivityState(health_check_service_name_,
|
|
&connected_subchannel);
|
|
&connected_subchannel);
|
|
MaybeUpdateConnectedSubchannel(std::move(connected_subchannel));
|
|
MaybeUpdateConnectedSubchannel(std::move(connected_subchannel));
|
|
return connectivity_state;
|
|
return connectivity_state;
|
|
@@ -911,9 +912,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
|
|
Ref(DEBUG_LOCATION, "WatcherWrapper"),
|
|
Ref(DEBUG_LOCATION, "WatcherWrapper"),
|
|
initial_state);
|
|
initial_state);
|
|
subchannel_->WatchConnectivityState(
|
|
subchannel_->WatchConnectivityState(
|
|
- initial_state,
|
|
|
|
- grpc_core::UniquePtr<char>(
|
|
|
|
- gpr_strdup(health_check_service_name_.get())),
|
|
|
|
|
|
+ initial_state, health_check_service_name_,
|
|
RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface>(
|
|
RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface>(
|
|
watcher_wrapper));
|
|
watcher_wrapper));
|
|
}
|
|
}
|
|
@@ -922,7 +921,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
|
|
ConnectivityStateWatcherInterface* watcher) override {
|
|
ConnectivityStateWatcherInterface* watcher) override {
|
|
auto it = watcher_map_.find(watcher);
|
|
auto it = watcher_map_.find(watcher);
|
|
GPR_ASSERT(it != watcher_map_.end());
|
|
GPR_ASSERT(it != watcher_map_.end());
|
|
- subchannel_->CancelConnectivityStateWatch(health_check_service_name_.get(),
|
|
|
|
|
|
+ subchannel_->CancelConnectivityStateWatch(health_check_service_name_,
|
|
it->second);
|
|
it->second);
|
|
watcher_map_.erase(it);
|
|
watcher_map_.erase(it);
|
|
}
|
|
}
|
|
@@ -935,14 +934,13 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
|
|
return subchannel_->channel_args();
|
|
return subchannel_->channel_args();
|
|
}
|
|
}
|
|
|
|
|
|
- void UpdateHealthCheckServiceName(
|
|
|
|
- grpc_core::UniquePtr<char> health_check_service_name) {
|
|
|
|
|
|
+ void UpdateHealthCheckServiceName(std::string health_check_service_name) {
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
|
|
gpr_log(GPR_INFO,
|
|
gpr_log(GPR_INFO,
|
|
"chand=%p: subchannel wrapper %p: updating health check service "
|
|
"chand=%p: subchannel wrapper %p: updating health check service "
|
|
"name from \"%s\" to \"%s\"",
|
|
"name from \"%s\" to \"%s\"",
|
|
- chand_, this, health_check_service_name_.get(),
|
|
|
|
- health_check_service_name.get());
|
|
|
|
|
|
+ chand_, this, health_check_service_name_.c_str(),
|
|
|
|
+ health_check_service_name.c_str());
|
|
}
|
|
}
|
|
for (auto& p : watcher_map_) {
|
|
for (auto& p : watcher_map_) {
|
|
WatcherWrapper*& watcher_wrapper = p.second;
|
|
WatcherWrapper*& watcher_wrapper = p.second;
|
|
@@ -957,13 +955,11 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
|
|
// problem, we may be able to handle it by waiting for the new
|
|
// problem, we may be able to handle it by waiting for the new
|
|
// watcher to report READY before we use it to replace the old one.
|
|
// watcher to report READY before we use it to replace the old one.
|
|
WatcherWrapper* replacement = watcher_wrapper->MakeReplacement();
|
|
WatcherWrapper* replacement = watcher_wrapper->MakeReplacement();
|
|
- subchannel_->CancelConnectivityStateWatch(
|
|
|
|
- health_check_service_name_.get(), watcher_wrapper);
|
|
|
|
|
|
+ subchannel_->CancelConnectivityStateWatch(health_check_service_name_,
|
|
|
|
+ watcher_wrapper);
|
|
watcher_wrapper = replacement;
|
|
watcher_wrapper = replacement;
|
|
subchannel_->WatchConnectivityState(
|
|
subchannel_->WatchConnectivityState(
|
|
- replacement->last_seen_state(),
|
|
|
|
- grpc_core::UniquePtr<char>(
|
|
|
|
- gpr_strdup(health_check_service_name.get())),
|
|
|
|
|
|
+ replacement->last_seen_state(), health_check_service_name,
|
|
RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface>(
|
|
RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface>(
|
|
replacement));
|
|
replacement));
|
|
}
|
|
}
|
|
@@ -1101,7 +1097,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
|
|
|
|
|
|
ChannelData* chand_;
|
|
ChannelData* chand_;
|
|
Subchannel* subchannel_;
|
|
Subchannel* subchannel_;
|
|
- grpc_core::UniquePtr<char> health_check_service_name_;
|
|
|
|
|
|
+ std::string health_check_service_name_;
|
|
// Maps from the address of the watcher passed to us by the LB policy
|
|
// Maps from the address of the watcher passed to us by the LB policy
|
|
// to the address of the WrapperWatcher that we passed to the underlying
|
|
// to the address of the WrapperWatcher that we passed to the underlying
|
|
// subchannel. This is needed so that when the LB policy calls
|
|
// subchannel. This is needed so that when the LB policy calls
|
|
@@ -1264,10 +1260,9 @@ class ChannelData::ClientChannelControlHelper
|
|
const grpc_channel_args& args) override {
|
|
const grpc_channel_args& args) override {
|
|
bool inhibit_health_checking = grpc_channel_arg_get_bool(
|
|
bool inhibit_health_checking = grpc_channel_arg_get_bool(
|
|
grpc_channel_args_find(&args, GRPC_ARG_INHIBIT_HEALTH_CHECKING), false);
|
|
grpc_channel_args_find(&args, GRPC_ARG_INHIBIT_HEALTH_CHECKING), false);
|
|
- grpc_core::UniquePtr<char> health_check_service_name;
|
|
|
|
|
|
+ std::string health_check_service_name;
|
|
if (!inhibit_health_checking) {
|
|
if (!inhibit_health_checking) {
|
|
- health_check_service_name.reset(
|
|
|
|
- gpr_strdup(chand_->health_check_service_name_.get()));
|
|
|
|
|
|
+ health_check_service_name = chand_->health_check_service_name_;
|
|
}
|
|
}
|
|
static const char* args_to_remove[] = {
|
|
static const char* args_to_remove[] = {
|
|
GRPC_ARG_INHIBIT_HEALTH_CHECKING,
|
|
GRPC_ARG_INHIBIT_HEALTH_CHECKING,
|
|
@@ -1462,7 +1457,7 @@ void ChannelData::UpdateStateAndPickerLocked(
|
|
std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker) {
|
|
std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker) {
|
|
// Clean the control plane when entering IDLE.
|
|
// Clean the control plane when entering IDLE.
|
|
if (picker_ == nullptr) {
|
|
if (picker_ == nullptr) {
|
|
- health_check_service_name_.reset();
|
|
|
|
|
|
+ health_check_service_name_.clear();
|
|
saved_service_config_.reset();
|
|
saved_service_config_.reset();
|
|
received_first_resolver_result_ = false;
|
|
received_first_resolver_result_ = false;
|
|
}
|
|
}
|
|
@@ -1705,16 +1700,15 @@ bool ChannelData::ProcessResolverResultLocked(
|
|
}
|
|
}
|
|
// Save health check service name.
|
|
// Save health check service name.
|
|
if (service_config != nullptr) {
|
|
if (service_config != nullptr) {
|
|
- chand->health_check_service_name_.reset(
|
|
|
|
- gpr_strdup(parsed_service_config->health_check_service_name()));
|
|
|
|
|
|
+ chand->health_check_service_name_ =
|
|
|
|
+ parsed_service_config->health_check_service_name();
|
|
} else {
|
|
} else {
|
|
- chand->health_check_service_name_.reset();
|
|
|
|
|
|
+ chand->health_check_service_name_.clear();
|
|
}
|
|
}
|
|
// Update health check service name used by existing subchannel wrappers.
|
|
// Update health check service name used by existing subchannel wrappers.
|
|
for (auto* subchannel_wrapper : chand->subchannel_wrappers_) {
|
|
for (auto* subchannel_wrapper : chand->subchannel_wrappers_) {
|
|
subchannel_wrapper->UpdateHealthCheckServiceName(
|
|
subchannel_wrapper->UpdateHealthCheckServiceName(
|
|
- grpc_core::UniquePtr<char>(
|
|
|
|
- gpr_strdup(chand->health_check_service_name_.get())));
|
|
|
|
|
|
+ chand->health_check_service_name_);
|
|
}
|
|
}
|
|
// Save service config.
|
|
// Save service config.
|
|
chand->saved_service_config_ = std::move(service_config);
|
|
chand->saved_service_config_ = std::move(service_config);
|
|
@@ -1726,7 +1720,7 @@ bool ChannelData::ProcessResolverResultLocked(
|
|
chand->received_first_resolver_result_ = true;
|
|
chand->received_first_resolver_result_ = true;
|
|
RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
|
|
RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
|
|
if (parsed_service_config != nullptr) {
|
|
if (parsed_service_config != nullptr) {
|
|
- Optional<internal::ClientChannelGlobalParsedConfig::RetryThrottling>
|
|
|
|
|
|
+ absl::optional<internal::ClientChannelGlobalParsedConfig::RetryThrottling>
|
|
retry_throttle_config = parsed_service_config->retry_throttling();
|
|
retry_throttle_config = parsed_service_config->retry_throttling();
|
|
if (retry_throttle_config.has_value()) {
|
|
if (retry_throttle_config.has_value()) {
|
|
retry_throttle_data =
|
|
retry_throttle_data =
|