|
|
@@ -162,11 +162,15 @@ class RoundRobin : public LoadBalancingPolicy {
|
|
|
// subchannels in each state.
|
|
|
void UpdateRoundRobinStateFromSubchannelStateCountsLocked();
|
|
|
|
|
|
+ size_t GetNextReadySubchannelIndexLocked();
|
|
|
+ void UpdateLastReadySubchannelIndexLocked(size_t last_ready_index);
|
|
|
+
|
|
|
private:
|
|
|
size_t num_ready_ = 0;
|
|
|
size_t num_connecting_ = 0;
|
|
|
size_t num_transient_failure_ = 0;
|
|
|
grpc_error* last_transient_failure_error_ = GRPC_ERROR_NONE;
|
|
|
+ size_t last_ready_index_ = -1; // Index into list of last pick.
|
|
|
};
|
|
|
|
|
|
void ShutdownLocked() override;
|
|
|
@@ -174,8 +178,6 @@ class RoundRobin : public LoadBalancingPolicy {
|
|
|
void StartPickingLocked();
|
|
|
bool DoPickLocked(PickState* pick);
|
|
|
void DrainPendingPicksLocked();
|
|
|
- size_t GetNextReadySubchannelIndexLocked();
|
|
|
- void UpdateLastReadySubchannelIndexLocked(size_t last_ready_index);
|
|
|
|
|
|
/** list of subchannels */
|
|
|
OrphanablePtr<RoundRobinSubchannelList> subchannel_list_;
|
|
|
@@ -193,8 +195,6 @@ class RoundRobin : public LoadBalancingPolicy {
|
|
|
PickState* pending_picks_ = nullptr;
|
|
|
/** our connectivity state tracker */
|
|
|
grpc_connectivity_state_tracker state_tracker_;
|
|
|
- /** Index into subchannel_list_ for last pick. */
|
|
|
- size_t last_ready_subchannel_index_ = -1;
|
|
|
};
|
|
|
|
|
|
RoundRobin::RoundRobin(const Args& args) : LoadBalancingPolicy(args) {
|
|
|
@@ -220,68 +220,6 @@ RoundRobin::~RoundRobin() {
|
|
|
grpc_subchannel_index_unref();
|
|
|
}
|
|
|
|
|
|
-/** Returns the index into p->subchannel_list->subchannels of the next
|
|
|
- * subchannel in READY state, or p->subchannel_list->num_subchannels if no
|
|
|
- * subchannel is READY.
|
|
|
- *
|
|
|
- * Note that this function does *not* update p->last_ready_subchannel_index.
|
|
|
- * The caller must do that if it returns a pick. */
|
|
|
-size_t RoundRobin::GetNextReadySubchannelIndexLocked() {
|
|
|
- GPR_ASSERT(subchannel_list_ != nullptr);
|
|
|
- if (grpc_lb_round_robin_trace.enabled()) {
|
|
|
- gpr_log(GPR_INFO,
|
|
|
- "[RR %p] getting next ready subchannel (out of %" PRIuPTR
|
|
|
- "), "
|
|
|
- "last_ready_subchannel_index=%" PRIuPTR,
|
|
|
- this, subchannel_list_->num_subchannels(),
|
|
|
- last_ready_subchannel_index_);
|
|
|
- }
|
|
|
- for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) {
|
|
|
- const size_t index = (i + last_ready_subchannel_index_ + 1) %
|
|
|
- subchannel_list_->num_subchannels();
|
|
|
- if (grpc_lb_round_robin_trace.enabled()) {
|
|
|
- gpr_log(
|
|
|
- GPR_INFO,
|
|
|
- "[RR %p] checking subchannel %p, subchannel_list %p, index %" PRIuPTR
|
|
|
- ": state=%s",
|
|
|
- this, subchannel_list_->subchannel(index)->subchannel(),
|
|
|
- subchannel_list_.get(), index,
|
|
|
- grpc_connectivity_state_name(
|
|
|
- subchannel_list_->subchannel(index)->connectivity_state()));
|
|
|
- }
|
|
|
- if (subchannel_list_->subchannel(index)->connectivity_state() ==
|
|
|
- GRPC_CHANNEL_READY) {
|
|
|
- if (grpc_lb_round_robin_trace.enabled()) {
|
|
|
- gpr_log(GPR_INFO,
|
|
|
- "[RR %p] found next ready subchannel (%p) at index %" PRIuPTR
|
|
|
- " of subchannel_list %p",
|
|
|
- this, subchannel_list_->subchannel(index)->subchannel(), index,
|
|
|
- subchannel_list_.get());
|
|
|
- }
|
|
|
- return index;
|
|
|
- }
|
|
|
- }
|
|
|
- if (grpc_lb_round_robin_trace.enabled()) {
|
|
|
- gpr_log(GPR_INFO, "[RR %p] no subchannels in ready state", this);
|
|
|
- }
|
|
|
- return subchannel_list_->num_subchannels();
|
|
|
-}
|
|
|
-
|
|
|
-// Sets last_ready_subchannel_index_ to last_ready_index.
|
|
|
-void RoundRobin::UpdateLastReadySubchannelIndexLocked(size_t last_ready_index) {
|
|
|
- GPR_ASSERT(last_ready_index < subchannel_list_->num_subchannels());
|
|
|
- last_ready_subchannel_index_ = last_ready_index;
|
|
|
- if (grpc_lb_round_robin_trace.enabled()) {
|
|
|
- gpr_log(GPR_INFO,
|
|
|
- "[RR %p] setting last_ready_subchannel_index=%" PRIuPTR
|
|
|
- " (SC %p, CSC %p)",
|
|
|
- this, last_ready_index,
|
|
|
- subchannel_list_->subchannel(last_ready_index)->subchannel(),
|
|
|
- subchannel_list_->subchannel(last_ready_index)
|
|
|
- ->connected_subchannel());
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
void RoundRobin::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
|
|
|
PickState* pick;
|
|
|
while ((pick = pending_picks_) != nullptr) {
|
|
|
@@ -366,7 +304,8 @@ void RoundRobin::ExitIdleLocked() {
|
|
|
}
|
|
|
|
|
|
bool RoundRobin::DoPickLocked(PickState* pick) {
|
|
|
- const size_t next_ready_index = GetNextReadySubchannelIndexLocked();
|
|
|
+ const size_t next_ready_index =
|
|
|
+ subchannel_list_->GetNextReadySubchannelIndexLocked();
|
|
|
if (next_ready_index < subchannel_list_->num_subchannels()) {
|
|
|
/* readily available, report right away */
|
|
|
RoundRobinSubchannelData* sd =
|
|
|
@@ -384,7 +323,7 @@ bool RoundRobin::DoPickLocked(PickState* pick) {
|
|
|
sd->subchannel_list(), next_ready_index);
|
|
|
}
|
|
|
/* only advance the last picked pointer if the selection was used */
|
|
|
- UpdateLastReadySubchannelIndexLocked(next_ready_index);
|
|
|
+ subchannel_list_->UpdateLastReadySubchannelIndexLocked(next_ready_index);
|
|
|
return true;
|
|
|
}
|
|
|
return false;
|
|
|
@@ -511,7 +450,7 @@ void RoundRobin::RoundRobinSubchannelList::
|
|
|
// Promote this list to p->subchannel_list_.
|
|
|
// This list must be p->latest_pending_subchannel_list_, because
|
|
|
// any previous update would have been shut down already and
|
|
|
- // therefore weeded out in ProcessConnectivityChangeLocked().
|
|
|
+ // therefore we would not be receiving a notification for them.
|
|
|
GPR_ASSERT(p->latest_pending_subchannel_list_.get() == this);
|
|
|
GPR_ASSERT(!shutting_down());
|
|
|
if (grpc_lb_round_robin_trace.enabled()) {
|
|
|
@@ -526,7 +465,6 @@ void RoundRobin::RoundRobinSubchannelList::
|
|
|
num_subchannels());
|
|
|
}
|
|
|
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
|
|
|
- p->last_ready_subchannel_index_ = -1;
|
|
|
}
|
|
|
// Drain pending picks.
|
|
|
p->DrainPendingPicksLocked();
|
|
|
@@ -580,6 +518,62 @@ void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
|
|
|
RenewConnectivityWatchLocked();
|
|
|
}
|
|
|
|
|
|
+/** Returns the index into p->subchannel_list->subchannels of the next
|
|
|
+ * subchannel in READY state, or p->subchannel_list->num_subchannels if no
|
|
|
+ * subchannel is READY.
|
|
|
+ *
|
|
|
+ * Note that this function does *not* update p->last_ready_subchannel_index.
|
|
|
+ * The caller must do that if it returns a pick. */
|
|
|
+size_t
|
|
|
+RoundRobin::RoundRobinSubchannelList::GetNextReadySubchannelIndexLocked() {
|
|
|
+ if (grpc_lb_round_robin_trace.enabled()) {
|
|
|
+ gpr_log(GPR_INFO,
|
|
|
+ "[RR %p] getting next ready subchannel (out of %" PRIuPTR
|
|
|
+ "), last_ready_index=%" PRIuPTR,
|
|
|
+ policy(), num_subchannels(), last_ready_index_);
|
|
|
+ }
|
|
|
+ for (size_t i = 0; i < num_subchannels(); ++i) {
|
|
|
+ const size_t index = (i + last_ready_index_ + 1) % num_subchannels();
|
|
|
+ if (grpc_lb_round_robin_trace.enabled()) {
|
|
|
+ gpr_log(
|
|
|
+ GPR_INFO,
|
|
|
+ "[RR %p] checking subchannel %p, subchannel_list %p, index %" PRIuPTR
|
|
|
+ ": state=%s",
|
|
|
+ policy(), subchannel(index)->subchannel(), this, index,
|
|
|
+ grpc_connectivity_state_name(
|
|
|
+ subchannel(index)->connectivity_state()));
|
|
|
+ }
|
|
|
+ if (subchannel(index)->connectivity_state() == GRPC_CHANNEL_READY) {
|
|
|
+ if (grpc_lb_round_robin_trace.enabled()) {
|
|
|
+ gpr_log(GPR_INFO,
|
|
|
+ "[RR %p] found next ready subchannel (%p) at index %" PRIuPTR
|
|
|
+ " of subchannel_list %p",
|
|
|
+ policy(), subchannel(index)->subchannel(), index, this);
|
|
|
+ }
|
|
|
+ return index;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (grpc_lb_round_robin_trace.enabled()) {
|
|
|
+ gpr_log(GPR_INFO, "[RR %p] no subchannels in ready state", this);
|
|
|
+ }
|
|
|
+ return num_subchannels();
|
|
|
+}
|
|
|
+
|
|
|
+// Sets last_ready_index_ to last_ready_index.
|
|
|
+void RoundRobin::RoundRobinSubchannelList::UpdateLastReadySubchannelIndexLocked(
|
|
|
+ size_t last_ready_index) {
|
|
|
+ GPR_ASSERT(last_ready_index < num_subchannels());
|
|
|
+ last_ready_index_ = last_ready_index;
|
|
|
+ if (grpc_lb_round_robin_trace.enabled()) {
|
|
|
+ gpr_log(GPR_INFO,
|
|
|
+ "[RR %p] setting last_ready_subchannel_index=%" PRIuPTR
|
|
|
+ " (SC %p, CSC %p)",
|
|
|
+ policy(), last_ready_index,
|
|
|
+ subchannel(last_ready_index)->subchannel(),
|
|
|
+ subchannel(last_ready_index)->connected_subchannel());
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
grpc_connectivity_state RoundRobin::CheckConnectivityLocked(
|
|
|
grpc_error** error) {
|
|
|
return grpc_connectivity_state_get(&state_tracker_, error);
|
|
|
@@ -593,7 +587,8 @@ void RoundRobin::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
|
|
|
|
|
|
void RoundRobin::PingOneLocked(grpc_closure* on_initiate,
|
|
|
grpc_closure* on_ack) {
|
|
|
- const size_t next_ready_index = GetNextReadySubchannelIndexLocked();
|
|
|
+ const size_t next_ready_index =
|
|
|
+ subchannel_list_->GetNextReadySubchannelIndexLocked();
|
|
|
if (next_ready_index < subchannel_list_->num_subchannels()) {
|
|
|
RoundRobinSubchannelData* selected =
|
|
|
subchannel_list_->subchannel(next_ready_index);
|
|
|
@@ -648,7 +643,6 @@ void RoundRobin::UpdateLocked(const grpc_channel_args& args) {
|
|
|
"rr_update_empty");
|
|
|
}
|
|
|
subchannel_list_ = std::move(latest_pending_subchannel_list_);
|
|
|
- last_ready_subchannel_index_ = -1;
|
|
|
} else {
|
|
|
// If we've started picking, start watching the new list.
|
|
|
latest_pending_subchannel_list_->StartWatchingLocked();
|