12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246 |
- /*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
- #include <grpc/support/port_platform.h>
- #include <inttypes.h>
- #include <limits.h>
- #include <string.h>
- #include "absl/container/inlined_vector.h"
- #include "absl/strings/str_format.h"
- #include "absl/strings/str_join.h"
- #include "absl/strings/string_view.h"
- #include <grpc/byte_buffer_reader.h>
- #include <grpc/grpc.h>
- #include <grpc/support/alloc.h>
- #include <grpc/support/time.h>
- #include "src/core/ext/filters/client_channel/client_channel.h"
- #include "src/core/ext/filters/client_channel/service_config.h"
- #include "src/core/ext/xds/xds_api.h"
- #include "src/core/ext/xds/xds_channel_args.h"
- #include "src/core/ext/xds/xds_client.h"
- #include "src/core/ext/xds/xds_client_stats.h"
- #include "src/core/ext/xds/xds_http_filters.h"
- #include "src/core/lib/backoff/backoff.h"
- #include "src/core/lib/channel/channel_args.h"
- #include "src/core/lib/channel/channel_stack.h"
- #include "src/core/lib/gpr/string.h"
- #include "src/core/lib/gprpp/memory.h"
- #include "src/core/lib/gprpp/orphanable.h"
- #include "src/core/lib/gprpp/ref_counted_ptr.h"
- #include "src/core/lib/gprpp/sync.h"
- #include "src/core/lib/iomgr/sockaddr.h"
- #include "src/core/lib/iomgr/sockaddr_utils.h"
- #include "src/core/lib/iomgr/timer.h"
- #include "src/core/lib/slice/slice_internal.h"
- #include "src/core/lib/slice/slice_string_helpers.h"
- #include "src/core/lib/surface/call.h"
- #include "src/core/lib/surface/channel.h"
- #include "src/core/lib/surface/channel_init.h"
- #include "src/core/lib/transport/static_metadata.h"
- #define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1
- #define GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER 1.6
- #define GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS 120
- #define GRPC_XDS_RECONNECT_JITTER 0.2
- #define GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS 1000
- namespace grpc_core {
- TraceFlag grpc_xds_client_trace(false, "xds_client");
- TraceFlag grpc_xds_client_refcount_trace(false, "xds_client_refcount");
- namespace {
- Mutex* g_mu = nullptr;
- const grpc_channel_args* g_channel_args = nullptr;
- XdsClient* g_xds_client = nullptr;
- char* g_fallback_bootstrap_config = nullptr;
- } // namespace
- //
- // Internal class declarations
- //
- // An xds call wrapper that can restart a call upon failure. Holds a ref to
- // the xds channel. The template parameter is the kind of wrapped xds call.
- template <typename T>
- class XdsClient::ChannelState::RetryableCall
- : public InternallyRefCounted<RetryableCall<T>> {
- public:
- explicit RetryableCall(RefCountedPtr<ChannelState> chand);
- void Orphan() override;
- void OnCallFinishedLocked();
- T* calld() const { return calld_.get(); }
- ChannelState* chand() const { return chand_.get(); }
- bool IsCurrentCallOnChannel() const;
- private:
- void StartNewCallLocked();
- void StartRetryTimerLocked();
- static void OnRetryTimer(void* arg, grpc_error* error);
- void OnRetryTimerLocked(grpc_error* error);
- // The wrapped xds call that talks to the xds server. It's instantiated
- // every time we start a new call. It's null during call retry backoff.
- OrphanablePtr<T> calld_;
- // The owning xds channel.
- RefCountedPtr<ChannelState> chand_;
- // Retry state.
- BackOff backoff_;
- grpc_timer retry_timer_;
- grpc_closure on_retry_timer_;
- bool retry_timer_callback_pending_ = false;
- bool shutting_down_ = false;
- };
- // Contains an ADS call to the xds server.
- class XdsClient::ChannelState::AdsCallState
- : public InternallyRefCounted<AdsCallState> {
- public:
- // The ctor and dtor should not be used directly.
- explicit AdsCallState(RefCountedPtr<RetryableCall<AdsCallState>> parent);
- ~AdsCallState() override;
- void Orphan() override;
- RetryableCall<AdsCallState>* parent() const { return parent_.get(); }
- ChannelState* chand() const { return parent_->chand(); }
- XdsClient* xds_client() const { return chand()->xds_client(); }
- bool seen_response() const { return seen_response_; }
- void Subscribe(const std::string& type_url, const std::string& name);
- void Unsubscribe(const std::string& type_url, const std::string& name,
- bool delay_unsubscription);
- bool HasSubscribedResources() const;
- private:
- class ResourceState : public InternallyRefCounted<ResourceState> {
- public:
- ResourceState(const std::string& type_url, const std::string& name,
- bool sent_initial_request)
- : type_url_(type_url),
- name_(name),
- sent_initial_request_(sent_initial_request) {
- GRPC_CLOSURE_INIT(&timer_callback_, OnTimer, this,
- grpc_schedule_on_exec_ctx);
- }
- void Orphan() override {
- Finish();
- Unref(DEBUG_LOCATION, "Orphan");
- }
- void Start(RefCountedPtr<AdsCallState> ads_calld) {
- if (sent_initial_request_) return;
- sent_initial_request_ = true;
- ads_calld_ = std::move(ads_calld);
- Ref(DEBUG_LOCATION, "timer").release();
- timer_pending_ = true;
- grpc_timer_init(
- &timer_,
- ExecCtx::Get()->Now() + ads_calld_->xds_client()->request_timeout_,
- &timer_callback_);
- }
- void Finish() {
- if (timer_pending_) {
- grpc_timer_cancel(&timer_);
- timer_pending_ = false;
- }
- }
- private:
- static void OnTimer(void* arg, grpc_error* error) {
- ResourceState* self = static_cast<ResourceState*>(arg);
- {
- MutexLock lock(&self->ads_calld_->xds_client()->mu_);
- self->OnTimerLocked(GRPC_ERROR_REF(error));
- }
- self->ads_calld_.reset();
- self->Unref(DEBUG_LOCATION, "timer");
- }
- void OnTimerLocked(grpc_error* error) {
- if (error == GRPC_ERROR_NONE && timer_pending_) {
- timer_pending_ = false;
- grpc_error* watcher_error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
- absl::StrFormat(
- "timeout obtaining resource {type=%s name=%s} from xds server",
- type_url_, name_)
- .c_str());
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
- gpr_log(GPR_INFO, "[xds_client %p] %s", ads_calld_->xds_client(),
- grpc_error_string(watcher_error));
- }
- if (type_url_ == XdsApi::kLdsTypeUrl) {
- ListenerState& state = ads_calld_->xds_client()->listener_map_[name_];
- for (const auto& p : state.watchers) {
- p.first->OnError(GRPC_ERROR_REF(watcher_error));
- }
- } else if (type_url_ == XdsApi::kRdsTypeUrl) {
- RouteConfigState& state =
- ads_calld_->xds_client()->route_config_map_[name_];
- for (const auto& p : state.watchers) {
- p.first->OnError(GRPC_ERROR_REF(watcher_error));
- }
- } else if (type_url_ == XdsApi::kCdsTypeUrl) {
- ClusterState& state = ads_calld_->xds_client()->cluster_map_[name_];
- for (const auto& p : state.watchers) {
- p.first->OnError(GRPC_ERROR_REF(watcher_error));
- }
- } else if (type_url_ == XdsApi::kEdsTypeUrl) {
- EndpointState& state = ads_calld_->xds_client()->endpoint_map_[name_];
- for (const auto& p : state.watchers) {
- p.first->OnError(GRPC_ERROR_REF(watcher_error));
- }
- } else {
- GPR_UNREACHABLE_CODE(return );
- }
- GRPC_ERROR_UNREF(watcher_error);
- }
- GRPC_ERROR_UNREF(error);
- }
- const std::string type_url_;
- const std::string name_;
- RefCountedPtr<AdsCallState> ads_calld_;
- bool sent_initial_request_;
- bool timer_pending_ = false;
- grpc_timer timer_;
- grpc_closure timer_callback_;
- };
- struct ResourceTypeState {
- ~ResourceTypeState() { GRPC_ERROR_UNREF(error); }
- // Nonce and error for this resource type.
- std::string nonce;
- grpc_error* error = GRPC_ERROR_NONE;
- // Subscribed resources of this type.
- std::map<std::string /* name */, OrphanablePtr<ResourceState>>
- subscribed_resources;
- };
- void SendMessageLocked(const std::string& type_url);
- void AcceptLdsUpdate(XdsApi::LdsUpdateMap lds_update_map);
- void AcceptRdsUpdate(XdsApi::RdsUpdateMap rds_update_map);
- void AcceptCdsUpdate(XdsApi::CdsUpdateMap cds_update_map);
- void AcceptEdsUpdate(XdsApi::EdsUpdateMap eds_update_map);
- static void OnRequestSent(void* arg, grpc_error* error);
- void OnRequestSentLocked(grpc_error* error);
- static void OnResponseReceived(void* arg, grpc_error* error);
- bool OnResponseReceivedLocked();
- static void OnStatusReceived(void* arg, grpc_error* error);
- void OnStatusReceivedLocked(grpc_error* error);
- bool IsCurrentCallOnChannel() const;
- std::set<absl::string_view> ResourceNamesForRequest(
- const std::string& type_url);
- // The owning RetryableCall<>.
- RefCountedPtr<RetryableCall<AdsCallState>> parent_;
- bool sent_initial_message_ = false;
- bool seen_response_ = false;
- // Always non-NULL.
- grpc_call* call_;
- // recv_initial_metadata
- grpc_metadata_array initial_metadata_recv_;
- // send_message
- grpc_byte_buffer* send_message_payload_ = nullptr;
- grpc_closure on_request_sent_;
- // recv_message
- grpc_byte_buffer* recv_message_payload_ = nullptr;
- grpc_closure on_response_received_;
- // recv_trailing_metadata
- grpc_metadata_array trailing_metadata_recv_;
- grpc_status_code status_code_;
- grpc_slice status_details_;
- grpc_closure on_status_received_;
- // Resource types for which requests need to be sent.
- std::set<std::string /*type_url*/> buffered_requests_;
- // State for each resource type.
- std::map<std::string /*type_url*/, ResourceTypeState> state_map_;
- };
- // Contains an LRS call to the xds server.
- class XdsClient::ChannelState::LrsCallState
- : public InternallyRefCounted<LrsCallState> {
- public:
- // The ctor and dtor should not be used directly.
- explicit LrsCallState(RefCountedPtr<RetryableCall<LrsCallState>> parent);
- ~LrsCallState() override;
- void Orphan() override;
- void MaybeStartReportingLocked();
- RetryableCall<LrsCallState>* parent() { return parent_.get(); }
- ChannelState* chand() const { return parent_->chand(); }
- XdsClient* xds_client() const { return chand()->xds_client(); }
- bool seen_response() const { return seen_response_; }
- private:
- // Reports client-side load stats according to a fixed interval.
- class Reporter : public InternallyRefCounted<Reporter> {
- public:
- Reporter(RefCountedPtr<LrsCallState> parent, grpc_millis report_interval)
- : parent_(std::move(parent)), report_interval_(report_interval) {
- GRPC_CLOSURE_INIT(&on_next_report_timer_, OnNextReportTimer, this,
- grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_INIT(&on_report_done_, OnReportDone, this,
- grpc_schedule_on_exec_ctx);
- ScheduleNextReportLocked();
- }
- void Orphan() override;
- private:
- void ScheduleNextReportLocked();
- static void OnNextReportTimer(void* arg, grpc_error* error);
- bool OnNextReportTimerLocked(grpc_error* error);
- bool SendReportLocked();
- static void OnReportDone(void* arg, grpc_error* error);
- bool OnReportDoneLocked(grpc_error* error);
- bool IsCurrentReporterOnCall() const {
- return this == parent_->reporter_.get();
- }
- XdsClient* xds_client() const { return parent_->xds_client(); }
- // The owning LRS call.
- RefCountedPtr<LrsCallState> parent_;
- // The load reporting state.
- const grpc_millis report_interval_;
- bool last_report_counters_were_zero_ = false;
- bool next_report_timer_callback_pending_ = false;
- grpc_timer next_report_timer_;
- grpc_closure on_next_report_timer_;
- grpc_closure on_report_done_;
- };
- static void OnInitialRequestSent(void* arg, grpc_error* error);
- void OnInitialRequestSentLocked();
- static void OnResponseReceived(void* arg, grpc_error* error);
- bool OnResponseReceivedLocked();
- static void OnStatusReceived(void* arg, grpc_error* error);
- void OnStatusReceivedLocked(grpc_error* error);
- bool IsCurrentCallOnChannel() const;
- // The owning RetryableCall<>.
- RefCountedPtr<RetryableCall<LrsCallState>> parent_;
- bool seen_response_ = false;
- // Always non-NULL.
- grpc_call* call_;
- // recv_initial_metadata
- grpc_metadata_array initial_metadata_recv_;
- // send_message
- grpc_byte_buffer* send_message_payload_ = nullptr;
- grpc_closure on_initial_request_sent_;
- // recv_message
- grpc_byte_buffer* recv_message_payload_ = nullptr;
- grpc_closure on_response_received_;
- // recv_trailing_metadata
- grpc_metadata_array trailing_metadata_recv_;
- grpc_status_code status_code_;
- grpc_slice status_details_;
- grpc_closure on_status_received_;
- // Load reporting state.
- bool send_all_clusters_ = false;
- std::set<std::string> cluster_names_; // Asked for by the LRS server.
- grpc_millis load_reporting_interval_ = 0;
- OrphanablePtr<Reporter> reporter_;
- };
- //
- // XdsClient::ChannelState::StateWatcher
- //
- class XdsClient::ChannelState::StateWatcher
- : public AsyncConnectivityStateWatcherInterface {
- public:
- explicit StateWatcher(RefCountedPtr<ChannelState> parent)
- : parent_(std::move(parent)) {}
- private:
- void OnConnectivityStateChange(grpc_connectivity_state new_state,
- const absl::Status& status) override {
- MutexLock lock(&parent_->xds_client_->mu_);
- if (!parent_->shutting_down_ &&
- new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- // In TRANSIENT_FAILURE. Notify all watchers of error.
- gpr_log(GPR_INFO,
- "[xds_client %p] xds channel in state:TRANSIENT_FAILURE "
- "status_message:(%s)",
- parent_->xds_client(), status.ToString().c_str());
- parent_->xds_client()->NotifyOnErrorLocked(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "xds channel in TRANSIENT_FAILURE"));
- }
- }
- RefCountedPtr<ChannelState> parent_;
- };
- //
- // XdsClient::ChannelState
- //
- namespace {
- grpc_channel* CreateXdsChannel(const XdsBootstrap::XdsServer& server) {
- // Build channel args.
- absl::InlinedVector<grpc_arg, 2> args_to_add = {
- grpc_channel_arg_integer_create(
- const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS),
- 5 * 60 * GPR_MS_PER_SEC),
- grpc_channel_arg_integer_create(
- const_cast<char*>(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL), 1),
- };
- grpc_channel_args* new_args = grpc_channel_args_copy_and_add(
- g_channel_args, args_to_add.data(), args_to_add.size());
- // Create channel creds.
- RefCountedPtr<grpc_channel_credentials> channel_creds =
- XdsChannelCredsRegistry::MakeChannelCreds(server.channel_creds_type,
- server.channel_creds_config);
- // Create channel.
- grpc_channel* channel = grpc_secure_channel_create(
- channel_creds.get(), server.server_uri.c_str(), new_args, nullptr);
- grpc_channel_args_destroy(new_args);
- return channel;
- }
- } // namespace
- XdsClient::ChannelState::ChannelState(WeakRefCountedPtr<XdsClient> xds_client,
- const XdsBootstrap::XdsServer& server)
- : InternallyRefCounted<ChannelState>(
- GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace)
- ? "ChannelState"
- : nullptr),
- xds_client_(std::move(xds_client)),
- server_(server) {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
- gpr_log(GPR_INFO, "[xds_client %p] creating channel to %s",
- xds_client_.get(), server.server_uri.c_str());
- }
- channel_ = CreateXdsChannel(server);
- GPR_ASSERT(channel_ != nullptr);
- StartConnectivityWatchLocked();
- }
- XdsClient::ChannelState::~ChannelState() {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
- gpr_log(GPR_INFO, "[xds_client %p] Destroying xds channel %p", xds_client(),
- this);
- }
- grpc_channel_destroy(channel_);
- xds_client_.reset(DEBUG_LOCATION, "ChannelState");
- }
- void XdsClient::ChannelState::Orphan() {
- shutting_down_ = true;
- CancelConnectivityWatchLocked();
- ads_calld_.reset();
- lrs_calld_.reset();
- Unref(DEBUG_LOCATION, "ChannelState+orphaned");
- }
- XdsClient::ChannelState::AdsCallState* XdsClient::ChannelState::ads_calld()
- const {
- return ads_calld_->calld();
- }
- XdsClient::ChannelState::LrsCallState* XdsClient::ChannelState::lrs_calld()
- const {
- return lrs_calld_->calld();
- }
- bool XdsClient::ChannelState::HasActiveAdsCall() const {
- return ads_calld_->calld() != nullptr;
- }
- void XdsClient::ChannelState::MaybeStartLrsCall() {
- if (lrs_calld_ != nullptr) return;
- lrs_calld_.reset(
- new RetryableCall<LrsCallState>(Ref(DEBUG_LOCATION, "ChannelState+lrs")));
- }
- void XdsClient::ChannelState::StopLrsCall() { lrs_calld_.reset(); }
- void XdsClient::ChannelState::StartConnectivityWatchLocked() {
- grpc_channel_element* client_channel_elem =
- grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_));
- GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
- watcher_ = new StateWatcher(Ref(DEBUG_LOCATION, "ChannelState+watch"));
- grpc_client_channel_start_connectivity_watch(
- client_channel_elem, GRPC_CHANNEL_IDLE,
- OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_));
- }
- void XdsClient::ChannelState::CancelConnectivityWatchLocked() {
- grpc_channel_element* client_channel_elem =
- grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_));
- GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
- grpc_client_channel_stop_connectivity_watch(client_channel_elem, watcher_);
- }
- void XdsClient::ChannelState::Subscribe(const std::string& type_url,
- const std::string& name) {
- if (ads_calld_ == nullptr) {
- // Start the ADS call if this is the first request.
- ads_calld_.reset(new RetryableCall<AdsCallState>(
- Ref(DEBUG_LOCATION, "ChannelState+ads")));
- // Note: AdsCallState's ctor will automatically subscribe to all
- // resources that the XdsClient already has watchers for, so we can
- // return here.
- return;
- }
- // If the ADS call is in backoff state, we don't need to do anything now
- // because when the call is restarted it will resend all necessary requests.
- if (ads_calld() == nullptr) return;
- // Subscribe to this resource if the ADS call is active.
- ads_calld()->Subscribe(type_url, name);
- }
- void XdsClient::ChannelState::Unsubscribe(const std::string& type_url,
- const std::string& name,
- bool delay_unsubscription) {
- if (ads_calld_ != nullptr) {
- auto* calld = ads_calld_->calld();
- if (calld != nullptr) {
- calld->Unsubscribe(type_url, name, delay_unsubscription);
- if (!calld->HasSubscribedResources()) ads_calld_.reset();
- }
- }
- }
- //
- // XdsClient::ChannelState::RetryableCall<>
- //
- template <typename T>
- XdsClient::ChannelState::RetryableCall<T>::RetryableCall(
- RefCountedPtr<ChannelState> chand)
- : chand_(std::move(chand)),
- backoff_(
- BackOff::Options()
- .set_initial_backoff(GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS *
- 1000)
- .set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER)
- .set_jitter(GRPC_XDS_RECONNECT_JITTER)
- .set_max_backoff(GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS * 1000)) {
- // Closure Initialization
- GRPC_CLOSURE_INIT(&on_retry_timer_, OnRetryTimer, this,
- grpc_schedule_on_exec_ctx);
- StartNewCallLocked();
- }
- template <typename T>
- void XdsClient::ChannelState::RetryableCall<T>::Orphan() {
- shutting_down_ = true;
- calld_.reset();
- if (retry_timer_callback_pending_) grpc_timer_cancel(&retry_timer_);
- this->Unref(DEBUG_LOCATION, "RetryableCall+orphaned");
- }
- template <typename T>
- void XdsClient::ChannelState::RetryableCall<T>::OnCallFinishedLocked() {
- const bool seen_response = calld_->seen_response();
- calld_.reset();
- if (seen_response) {
- // If we lost connection to the xds server, reset backoff and restart the
- // call immediately.
- backoff_.Reset();
- StartNewCallLocked();
- } else {
- // If we failed to connect to the xds server, retry later.
- StartRetryTimerLocked();
- }
- }
- template <typename T>
- void XdsClient::ChannelState::RetryableCall<T>::StartNewCallLocked() {
- if (shutting_down_) return;
- GPR_ASSERT(chand_->channel_ != nullptr);
- GPR_ASSERT(calld_ == nullptr);
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
- gpr_log(GPR_INFO,
- "[xds_client %p] Start new call from retryable call (chand: %p, "
- "retryable call: %p)",
- chand()->xds_client(), chand(), this);
- }
- calld_ = MakeOrphanable<T>(
- this->Ref(DEBUG_LOCATION, "RetryableCall+start_new_call"));
- }
- template <typename T>
- void XdsClient::ChannelState::RetryableCall<T>::StartRetryTimerLocked() {
- if (shutting_down_) return;
- const grpc_millis next_attempt_time = backoff_.NextAttemptTime();
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
- grpc_millis timeout = GPR_MAX(next_attempt_time - ExecCtx::Get()->Now(), 0);
- gpr_log(GPR_INFO,
- "[xds_client %p] Failed to connect to xds server (chand: %p) "
- "retry timer will fire in %" PRId64 "ms.",
- chand()->xds_client(), chand(), timeout);
- }
- this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start").release();
- grpc_timer_init(&retry_timer_, next_attempt_time, &on_retry_timer_);
- retry_timer_callback_pending_ = true;
- }
- template <typename T>
- void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimer(
- void* arg, grpc_error* error) {
- RetryableCall* calld = static_cast<RetryableCall*>(arg);
- {
- MutexLock lock(&calld->chand_->xds_client()->mu_);
- calld->OnRetryTimerLocked(GRPC_ERROR_REF(error));
- }
- calld->Unref(DEBUG_LOCATION, "RetryableCall+retry_timer_done");
- }
- template <typename T>
- void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimerLocked(
- grpc_error* error) {
- retry_timer_callback_pending_ = false;
- if (!shutting_down_ && error == GRPC_ERROR_NONE) {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
- gpr_log(
- GPR_INFO,
- "[xds_client %p] Retry timer fires (chand: %p, retryable call: %p)",
- chand()->xds_client(), chand(), this);
- }
- StartNewCallLocked();
- }
- GRPC_ERROR_UNREF(error);
- }
- //
- // XdsClient::ChannelState::AdsCallState
- //
- XdsClient::ChannelState::AdsCallState::AdsCallState(
- RefCountedPtr<RetryableCall<AdsCallState>> parent)
- : InternallyRefCounted<AdsCallState>(
- GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace)
- ? "AdsCallState"
- : nullptr),
- parent_(std::move(parent)) {
- // Init the ADS call. Note that the call will progress every time there's
- // activity in xds_client()->interested_parties_, which is comprised of
- // the polling entities from client_channel.
- GPR_ASSERT(xds_client() != nullptr);
- // Create a call with the specified method name.
- const auto& method =
- chand()->server_.ShouldUseV3()
- ? GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V3_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES
- : GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V2_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES;
- call_ = grpc_channel_create_pollset_set_call(
- chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
- xds_client()->interested_parties_, method, nullptr,
- GRPC_MILLIS_INF_FUTURE, nullptr);
- GPR_ASSERT(call_ != nullptr);
- // Init data associated with the call.
- grpc_metadata_array_init(&initial_metadata_recv_);
- grpc_metadata_array_init(&trailing_metadata_recv_);
- // Start the call.
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
- gpr_log(GPR_INFO,
- "[xds_client %p] Starting ADS call (chand: %p, calld: %p, "
- "call: %p)",
- xds_client(), chand(), this, call_);
- }
- // Create the ops.
- grpc_call_error call_error;
- grpc_op ops[3];
- memset(ops, 0, sizeof(ops));
- // Op: send initial metadata.
- grpc_op* op = ops;
- op->op = GRPC_OP_SEND_INITIAL_METADATA;
- op->data.send_initial_metadata.count = 0;
- op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
- GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
- op->reserved = nullptr;
- op++;
- call_error = grpc_call_start_batch_and_execute(
- call_, ops, static_cast<size_t>(op - ops), nullptr);
- GPR_ASSERT(GRPC_CALL_OK == call_error);
- // Op: send request message.
- GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
- grpc_schedule_on_exec_ctx);
- for (const auto& p : xds_client()->listener_map_) {
- Subscribe(XdsApi::kLdsTypeUrl, std::string(p.first));
- }
- for (const auto& p : xds_client()->route_config_map_) {
- Subscribe(XdsApi::kRdsTypeUrl, std::string(p.first));
- }
- for (const auto& p : xds_client()->cluster_map_) {
- Subscribe(XdsApi::kCdsTypeUrl, std::string(p.first));
- }
- for (const auto& p : xds_client()->endpoint_map_) {
- Subscribe(XdsApi::kEdsTypeUrl, std::string(p.first));
- }
- // Op: recv initial metadata.
- op = ops;
- op->op = GRPC_OP_RECV_INITIAL_METADATA;
- op->data.recv_initial_metadata.recv_initial_metadata =
- &initial_metadata_recv_;
- op->flags = 0;
- op->reserved = nullptr;
- op++;
- // Op: recv response.
- op->op = GRPC_OP_RECV_MESSAGE;
- op->data.recv_message.recv_message = &recv_message_payload_;
- op->flags = 0;
- op->reserved = nullptr;
- op++;
- Ref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked").release();
- GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this,
- grpc_schedule_on_exec_ctx);
- call_error = grpc_call_start_batch_and_execute(
- call_, ops, static_cast<size_t>(op - ops), &on_response_received_);
- GPR_ASSERT(GRPC_CALL_OK == call_error);
- // Op: recv server status.
- op = ops;
- op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
- op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
- op->data.recv_status_on_client.status = &status_code_;
- op->data.recv_status_on_client.status_details = &status_details_;
- op->flags = 0;
- op->reserved = nullptr;
- op++;
- // This callback signals the end of the call, so it relies on the initial
- // ref instead of a new ref. When it's invoked, it's the initial ref that is
- // unreffed.
- GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this,
- grpc_schedule_on_exec_ctx);
- call_error = grpc_call_start_batch_and_execute(
- call_, ops, static_cast<size_t>(op - ops), &on_status_received_);
- GPR_ASSERT(GRPC_CALL_OK == call_error);
- }
- XdsClient::ChannelState::AdsCallState::~AdsCallState() {
- grpc_metadata_array_destroy(&initial_metadata_recv_);
- grpc_metadata_array_destroy(&trailing_metadata_recv_);
- grpc_byte_buffer_destroy(send_message_payload_);
- grpc_byte_buffer_destroy(recv_message_payload_);
- grpc_slice_unref_internal(status_details_);
- GPR_ASSERT(call_ != nullptr);
- grpc_call_unref(call_);
- }
- void XdsClient::ChannelState::AdsCallState::Orphan() {
- GPR_ASSERT(call_ != nullptr);
- // If we are here because xds_client wants to cancel the call,
- // on_status_received_ will complete the cancellation and clean up. Otherwise,
- // we are here because xds_client has to orphan a failed call, then the
- // following cancellation will be a no-op.
- grpc_call_cancel_internal(call_);
- state_map_.clear();
- // Note that the initial ref is hold by on_status_received_. So the
- // corresponding unref happens in on_status_received_ instead of here.
- }
- void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
- const std::string& type_url) {
- // Buffer message sending if an existing message is in flight.
- if (send_message_payload_ != nullptr) {
- buffered_requests_.insert(type_url);
- return;
- }
- auto& state = state_map_[type_url];
- grpc_slice request_payload_slice;
- std::set<absl::string_view> resource_names =
- ResourceNamesForRequest(type_url);
- request_payload_slice = xds_client()->api_.CreateAdsRequest(
- chand()->server_, type_url, resource_names,
- xds_client()->resource_version_map_[type_url], state.nonce,
- GRPC_ERROR_REF(state.error), !sent_initial_message_);
- if (type_url != XdsApi::kLdsTypeUrl && type_url != XdsApi::kRdsTypeUrl &&
- type_url != XdsApi::kCdsTypeUrl && type_url != XdsApi::kEdsTypeUrl) {
- state_map_.erase(type_url);
- }
- sent_initial_message_ = true;
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
- gpr_log(GPR_INFO,
- "[xds_client %p] sending ADS request: type=%s version=%s nonce=%s "
- "error=%s resources=%s",
- xds_client(), type_url.c_str(),
- xds_client()->resource_version_map_[type_url].c_str(),
- state.nonce.c_str(), grpc_error_string(state.error),
- absl::StrJoin(resource_names, " ").c_str());
- }
- GRPC_ERROR_UNREF(state.error);
- state.error = GRPC_ERROR_NONE;
- // Create message payload.
- send_message_payload_ =
- grpc_raw_byte_buffer_create(&request_payload_slice, 1);
- grpc_slice_unref_internal(request_payload_slice);
- // Send the message.
- grpc_op op;
- memset(&op, 0, sizeof(op));
- op.op = GRPC_OP_SEND_MESSAGE;
- op.data.send_message.send_message = send_message_payload_;
- Ref(DEBUG_LOCATION, "ADS+OnRequestSentLocked").release();
- GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
- grpc_schedule_on_exec_ctx);
- grpc_call_error call_error =
- grpc_call_start_batch_and_execute(call_, &op, 1, &on_request_sent_);
- if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
- gpr_log(GPR_ERROR,
- "[xds_client %p] calld=%p call_error=%d sending ADS message",
- xds_client(), this, call_error);
- GPR_ASSERT(GRPC_CALL_OK == call_error);
- }
- }
- void XdsClient::ChannelState::AdsCallState::Subscribe(
- const std::string& type_url, const std::string& name) {
- auto& state = state_map_[type_url].subscribed_resources[name];
- if (state == nullptr) {
- state = MakeOrphanable<ResourceState>(
- type_url, name, !xds_client()->resource_version_map_[type_url].empty());
- SendMessageLocked(type_url);
- }
- }
- void XdsClient::ChannelState::AdsCallState::Unsubscribe(
- const std::string& type_url, const std::string& name,
- bool delay_unsubscription) {
- state_map_[type_url].subscribed_resources.erase(name);
- if (!delay_unsubscription) SendMessageLocked(type_url);
- }
- bool XdsClient::ChannelState::AdsCallState::HasSubscribedResources() const {
- for (const auto& p : state_map_) {
- if (!p.second.subscribed_resources.empty()) return true;
- }
- return false;
- }
- void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
- XdsApi::LdsUpdateMap lds_update_map) {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
- gpr_log(GPR_INFO,
- "[xds_client %p] LDS update received containing %" PRIuPTR
- " resources",
- xds_client(), lds_update_map.size());
- }
- auto& lds_state = state_map_[XdsApi::kLdsTypeUrl];
- std::set<std::string> rds_resource_names_seen;
- for (auto& p : lds_update_map) {
- const std::string& listener_name = p.first;
- XdsApi::LdsUpdate& lds_update = p.second;
- auto& state = lds_state.subscribed_resources[listener_name];
- if (state != nullptr) state->Finish();
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
- gpr_log(GPR_INFO, "[xds_client %p] LDS resource %s: %s", xds_client(),
- listener_name.c_str(), lds_update.ToString().c_str());
- }
- // Record the RDS resource names seen.
- if (!lds_update.http_connection_manager.route_config_name.empty()) {
- rds_resource_names_seen.insert(
- lds_update.http_connection_manager.route_config_name);
- }
- // Ignore identical update.
- ListenerState& listener_state = xds_client()->listener_map_[listener_name];
- if (listener_state.update.has_value() &&
- *listener_state.update == lds_update) {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
- gpr_log(GPR_INFO,
- "[xds_client %p] LDS update for %s identical to current, "
- "ignoring.",
- xds_client(), listener_name.c_str());
- }
- continue;
- }
- // Update the listener state.
- listener_state.update = std::move(lds_update);
- // Notify watchers.
- for (const auto& p : listener_state.watchers) {
- p.first->OnListenerChanged(*listener_state.update);
- }
- }
- // For any subscribed resource that is not present in the update,
- // remove it from the cache and notify watchers that it does not exist.
- for (const auto& p : lds_state.subscribed_resources) {
- const std::string& listener_name = p.first;
- if (lds_update_map.find(listener_name) == lds_update_map.end()) {
- ListenerState& listener_state =
- xds_client()->listener_map_[listener_name];
- // If the resource was newly requested but has not yet been received,
- // we don't want to generate an error for the watchers, because this LDS
- // response may be in reaction to an earlier request that did not yet
- // request the new resource, so its absence from the response does not
- // necessarily indicate that the resource does not exist.
- // For that case, we rely on the request timeout instead.
- if (!listener_state.update.has_value()) continue;
- listener_state.update.reset();
- for (const auto& p : listener_state.watchers) {
- p.first->OnResourceDoesNotExist();
- }
- }
- }
- // For any RDS resource that is no longer referred to by any LDS
- // resources, remove it from the cache and notify watchers that it
- // does not exist.
- auto& rds_state = state_map_[XdsApi::kRdsTypeUrl];
- for (const auto& p : rds_state.subscribed_resources) {
- const std::string& rds_resource_name = p.first;
- if (rds_resource_names_seen.find(rds_resource_name) ==
- rds_resource_names_seen.end()) {
- RouteConfigState& route_config_state =
- xds_client()->route_config_map_[rds_resource_name];
- route_config_state.update.reset();
- for (const auto& p : route_config_state.watchers) {
- p.first->OnResourceDoesNotExist();
- }
- }
- }
- }
- void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate(
- XdsApi::RdsUpdateMap rds_update_map) {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
- gpr_log(GPR_INFO,
- "[xds_client %p] RDS update received containing %" PRIuPTR
- " resources",
- xds_client(), rds_update_map.size());
- }
- auto& rds_state = state_map_[XdsApi::kRdsTypeUrl];
- for (auto& p : rds_update_map) {
- const std::string& route_config_name = p.first;
- XdsApi::RdsUpdate& rds_update = p.second;
- auto& state = rds_state.subscribed_resources[route_config_name];
- if (state != nullptr) state->Finish();
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
- gpr_log(GPR_INFO, "[xds_client %p] RDS resource:\n%s", xds_client(),
- rds_update.ToString().c_str());
- }
- RouteConfigState& route_config_state =
- xds_client()->route_config_map_[route_config_name];
- // Ignore identical update.
- if (route_config_state.update.has_value() &&
- *route_config_state.update == rds_update) {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
- gpr_log(GPR_INFO,
- "[xds_client %p] RDS resource identical to current, ignoring",
- xds_client());
- }
- continue;
- }
- // Update the cache.
- route_config_state.update = std::move(rds_update);
- // Notify all watchers.
- for (const auto& p : route_config_state.watchers) {
- p.first->OnRouteConfigChanged(*route_config_state.update);
- }
- }
- }
- void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate(
- XdsApi::CdsUpdateMap cds_update_map) {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
- gpr_log(GPR_INFO,
- "[xds_client %p] CDS update received containing %" PRIuPTR
- " resources",
- xds_client(), cds_update_map.size());
- }
- auto& cds_state = state_map_[XdsApi::kCdsTypeUrl];
- std::set<std::string> eds_resource_names_seen;
- for (auto& p : cds_update_map) {
- const char* cluster_name = p.first.c_str();
- XdsApi::CdsUpdate& cds_update = p.second;
- auto& state = cds_state.subscribed_resources[cluster_name];
- if (state != nullptr) state->Finish();
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
- gpr_log(GPR_INFO, "[xds_client %p] cluster=%s: %s", xds_client(),
- cluster_name, cds_update.ToString().c_str());
- }
- // Record the EDS resource names seen.
- eds_resource_names_seen.insert(cds_update.eds_service_name.empty()
- ? cluster_name
- : cds_update.eds_service_name);
- // Ignore identical update.
- ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name];
- if (cluster_state.update.has_value() &&
- *cluster_state.update == cds_update) {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
- gpr_log(GPR_INFO,
- "[xds_client %p] CDS update identical to current, ignoring.",
- xds_client());
- }
- continue;
- }
- // Update the cluster state.
- cluster_state.update = std::move(cds_update);
- // Notify all watchers.
- for (const auto& p : cluster_state.watchers) {
- p.first->OnClusterChanged(cluster_state.update.value());
- }
- }
- // For any subscribed resource that is not present in the update,
- // remove it from the cache and notify watchers that it does not exist.
- for (const auto& p : cds_state.subscribed_resources) {
- const std::string& cluster_name = p.first;
- if (cds_update_map.find(cluster_name) == cds_update_map.end()) {
- ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name];
- // If the resource was newly requested but has not yet been received,
- // we don't want to generate an error for the watchers, because this CDS
- // response may be in reaction to an earlier request that did not yet
- // request the new resource, so its absence from the response does not
- // necessarily indicate that the resource does not exist.
- // For that case, we rely on the request timeout instead.
- if (!cluster_state.update.has_value()) continue;
- cluster_state.update.reset();
- for (const auto& p : cluster_state.watchers) {
- p.first->OnResourceDoesNotExist();
- }
- }
- }
- // For any EDS resource that is no longer referred to by any CDS
- // resources, remove it from the cache and notify watchers that it
- // does not exist.
- auto& eds_state = state_map_[XdsApi::kEdsTypeUrl];
- for (const auto& p : eds_state.subscribed_resources) {
- const std::string& eds_resource_name = p.first;
- if (eds_resource_names_seen.find(eds_resource_name) ==
- eds_resource_names_seen.end()) {
- EndpointState& endpoint_state =
- xds_client()->endpoint_map_[eds_resource_name];
- endpoint_state.update.reset();
- for (const auto& p : endpoint_state.watchers) {
- p.first->OnResourceDoesNotExist();
- }
- }
- }
- }
- void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdate(
- XdsApi::EdsUpdateMap eds_update_map) {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
- gpr_log(GPR_INFO,
- "[xds_client %p] EDS update received containing %" PRIuPTR
- " resources",
- xds_client(), eds_update_map.size());
- }
- auto& eds_state = state_map_[XdsApi::kEdsTypeUrl];
- for (auto& p : eds_update_map) {
- const char* eds_service_name = p.first.c_str();
- XdsApi::EdsUpdate& eds_update = p.second;
- auto& state = eds_state.subscribed_resources[eds_service_name];
- if (state != nullptr) state->Finish();
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
- gpr_log(GPR_INFO, "[xds_client %p] EDS resource %s: %s", xds_client(),
- eds_service_name, eds_update.ToString().c_str());
- }
- EndpointState& endpoint_state =
- xds_client()->endpoint_map_[eds_service_name];
- // Ignore identical update.
- if (endpoint_state.update.has_value() &&
- *endpoint_state.update == eds_update) {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
- gpr_log(GPR_INFO,
- "[xds_client %p] EDS update identical to current, ignoring.",
- xds_client());
- }
- continue;
- }
- // Update the cluster state.
- endpoint_state.update = std::move(eds_update);
- // Notify all watchers.
- for (const auto& p : endpoint_state.watchers) {
- p.first->OnEndpointChanged(endpoint_state.update.value());
- }
- }
- }
- void XdsClient::ChannelState::AdsCallState::OnRequestSent(void* arg,
- grpc_error* error) {
- AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
- {
- MutexLock lock(&ads_calld->xds_client()->mu_);
- ads_calld->OnRequestSentLocked(GRPC_ERROR_REF(error));
- }
- ads_calld->Unref(DEBUG_LOCATION, "ADS+OnRequestSentLocked");
- }
- void XdsClient::ChannelState::AdsCallState::OnRequestSentLocked(
- grpc_error* error) {
- if (IsCurrentCallOnChannel() && error == GRPC_ERROR_NONE) {
- // Clean up the sent message.
- grpc_byte_buffer_destroy(send_message_payload_);
- send_message_payload_ = nullptr;
- // Continue to send another pending message if any.
- // TODO(roth): The current code to handle buffered messages has the
- // advantage of sending only the most recent list of resource names for
- // each resource type (no matter how many times that resource type has
- // been requested to send while the current message sending is still
- // pending). But its disadvantage is that we send the requests in fixed
- // order of resource types. We need to fix this if we are seeing some
- // resource type(s) starved due to frequent requests of other resource
- // type(s).
- auto it = buffered_requests_.begin();
- if (it != buffered_requests_.end()) {
- SendMessageLocked(*it);
- buffered_requests_.erase(it);
- }
- }
- GRPC_ERROR_UNREF(error);
- }
- void XdsClient::ChannelState::AdsCallState::OnResponseReceived(
- void* arg, grpc_error* /* error */) {
- AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
- bool done;
- {
- MutexLock lock(&ads_calld->xds_client()->mu_);
- done = ads_calld->OnResponseReceivedLocked();
- }
- if (done) ads_calld->Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked");
- }
- bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() {
- // Empty payload means the call was cancelled.
- if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) {
- return true;
- }
- // Read the response.
- grpc_byte_buffer_reader bbr;
- grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
- grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
- grpc_byte_buffer_reader_destroy(&bbr);
- grpc_byte_buffer_destroy(recv_message_payload_);
- recv_message_payload_ = nullptr;
- // Parse and validate the response.
- XdsApi::AdsParseResult result = xds_client()->api_.ParseAdsResponse(
- chand()->server_, response_slice,
- ResourceNamesForRequest(XdsApi::kLdsTypeUrl),
- ResourceNamesForRequest(XdsApi::kRdsTypeUrl),
- ResourceNamesForRequest(XdsApi::kCdsTypeUrl),
- ResourceNamesForRequest(XdsApi::kEdsTypeUrl));
- grpc_slice_unref_internal(response_slice);
- if (result.type_url.empty()) {
- // Ignore unparsable response.
- gpr_log(GPR_ERROR,
- "[xds_client %p] Error parsing ADS response (%s) -- ignoring",
- xds_client(), grpc_error_string(result.parse_error));
- GRPC_ERROR_UNREF(result.parse_error);
- } else {
- // Update nonce.
- auto& state = state_map_[result.type_url];
- state.nonce = std::move(result.nonce);
- // NACK or ACK the response.
- if (result.parse_error != GRPC_ERROR_NONE) {
- GRPC_ERROR_UNREF(state.error);
- state.error = result.parse_error;
- // NACK unacceptable update.
- gpr_log(GPR_ERROR,
- "[xds_client %p] ADS response invalid for resource type %s "
- "version %s, will NACK: nonce=%s error=%s",
- xds_client(), result.type_url.c_str(), result.version.c_str(),
- state.nonce.c_str(), grpc_error_string(result.parse_error));
- SendMessageLocked(result.type_url);
- } else {
- seen_response_ = true;
- // Accept the ADS response according to the type_url.
- if (result.type_url == XdsApi::kLdsTypeUrl) {
- AcceptLdsUpdate(std::move(result.lds_update_map));
- } else if (result.type_url == XdsApi::kRdsTypeUrl) {
- AcceptRdsUpdate(std::move(result.rds_update_map));
- } else if (result.type_url == XdsApi::kCdsTypeUrl) {
- AcceptCdsUpdate(std::move(result.cds_update_map));
- } else if (result.type_url == XdsApi::kEdsTypeUrl) {
- AcceptEdsUpdate(std::move(result.eds_update_map));
- }
- xds_client()->resource_version_map_[result.type_url] =
- std::move(result.version);
- // ACK the update.
- SendMessageLocked(result.type_url);
- // Start load reporting if needed.
- auto& lrs_call = chand()->lrs_calld_;
- if (lrs_call != nullptr) {
- LrsCallState* lrs_calld = lrs_call->calld();
- if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked();
- }
- }
- }
- if (xds_client()->shutting_down_) return true;
- // Keep listening for updates.
- grpc_op op;
- memset(&op, 0, sizeof(op));
- op.op = GRPC_OP_RECV_MESSAGE;
- op.data.recv_message.recv_message = &recv_message_payload_;
- op.flags = 0;
- op.reserved = nullptr;
- GPR_ASSERT(call_ != nullptr);
- // Reuse the "ADS+OnResponseReceivedLocked" ref taken in ctor.
- const grpc_call_error call_error =
- grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
- GPR_ASSERT(GRPC_CALL_OK == call_error);
- return false;
- }
- void XdsClient::ChannelState::AdsCallState::OnStatusReceived(
- void* arg, grpc_error* error) {
- AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
- {
- MutexLock lock(&ads_calld->xds_client()->mu_);
- ads_calld->OnStatusReceivedLocked(GRPC_ERROR_REF(error));
- }
- ads_calld->Unref(DEBUG_LOCATION, "ADS+OnStatusReceivedLocked");
- }
- void XdsClient::ChannelState::AdsCallState::OnStatusReceivedLocked(
- grpc_error* error) {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
- char* status_details = grpc_slice_to_c_string(status_details_);
- gpr_log(GPR_INFO,
- "[xds_client %p] ADS call status received. Status = %d, details "
- "= '%s', (chand: %p, ads_calld: %p, call: %p), error '%s'",
- xds_client(), status_code_, status_details, chand(), this, call_,
- grpc_error_string(error));
- gpr_free(status_details);
- }
- // Ignore status from a stale call.
- if (IsCurrentCallOnChannel()) {
- // Try to restart the call.
- parent_->OnCallFinishedLocked();
- // Send error to all watchers.
- xds_client()->NotifyOnErrorLocked(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("xds call failed"));
- }
- GRPC_ERROR_UNREF(error);
- }
- bool XdsClient::ChannelState::AdsCallState::IsCurrentCallOnChannel() const {
- // If the retryable ADS call is null (which only happens when the xds channel
- // is shutting down), all the ADS calls are stale.
- if (chand()->ads_calld_ == nullptr) return false;
- return this == chand()->ads_calld_->calld();
- }
- std::set<absl::string_view>
- XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest(
- const std::string& type_url) {
- std::set<absl::string_view> resource_names;
- auto it = state_map_.find(type_url);
- if (it != state_map_.end()) {
- for (auto& p : it->second.subscribed_resources) {
- resource_names.insert(p.first);
- OrphanablePtr<ResourceState>& state = p.second;
- state->Start(Ref(DEBUG_LOCATION, "ResourceState"));
- }
- }
- return resource_names;
- }
- //
- // XdsClient::ChannelState::LrsCallState::Reporter
- //
- void XdsClient::ChannelState::LrsCallState::Reporter::Orphan() {
- if (next_report_timer_callback_pending_) {
- grpc_timer_cancel(&next_report_timer_);
- }
- }
- void XdsClient::ChannelState::LrsCallState::Reporter::
- ScheduleNextReportLocked() {
- const grpc_millis next_report_time = ExecCtx::Get()->Now() + report_interval_;
- grpc_timer_init(&next_report_timer_, next_report_time,
- &on_next_report_timer_);
- next_report_timer_callback_pending_ = true;
- }
- void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimer(
- void* arg, grpc_error* error) {
- Reporter* self = static_cast<Reporter*>(arg);
- bool done;
- {
- MutexLock lock(&self->xds_client()->mu_);
- done = self->OnNextReportTimerLocked(GRPC_ERROR_REF(error));
- }
- if (done) self->Unref(DEBUG_LOCATION, "Reporter+timer");
- }
- bool XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimerLocked(
- grpc_error* error) {
- next_report_timer_callback_pending_ = false;
- if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) {
- GRPC_ERROR_UNREF(error);
- return true;
- }
- return SendReportLocked();
- }
- namespace {
- bool LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap& snapshot) {
- for (const auto& p : snapshot) {
- const XdsApi::ClusterLoadReport& cluster_snapshot = p.second;
- if (!cluster_snapshot.dropped_requests.IsZero()) return false;
- for (const auto& q : cluster_snapshot.locality_stats) {
- const XdsClusterLocalityStats::Snapshot& locality_snapshot = q.second;
- if (!locality_snapshot.IsZero()) return false;
- }
- }
- return true;
- }
- } // namespace
- bool XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
- // Construct snapshot from all reported stats.
- XdsApi::ClusterLoadReportMap snapshot =
- xds_client()->BuildLoadReportSnapshotLocked(parent_->send_all_clusters_,
- parent_->cluster_names_);
- // Skip client load report if the counters were all zero in the last
- // report and they are still zero in this one.
- const bool old_val = last_report_counters_were_zero_;
- last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot);
- if (old_val && last_report_counters_were_zero_) {
- if (xds_client()->load_report_map_.empty()) {
- parent_->chand()->StopLrsCall();
- return true;
- }
- ScheduleNextReportLocked();
- return false;
- }
- // Create a request that contains the snapshot.
- grpc_slice request_payload_slice =
- xds_client()->api_.CreateLrsRequest(std::move(snapshot));
- parent_->send_message_payload_ =
- grpc_raw_byte_buffer_create(&request_payload_slice, 1);
- grpc_slice_unref_internal(request_payload_slice);
- // Send the report.
- grpc_op op;
- memset(&op, 0, sizeof(op));
- op.op = GRPC_OP_SEND_MESSAGE;
- op.data.send_message.send_message = parent_->send_message_payload_;
- grpc_call_error call_error = grpc_call_start_batch_and_execute(
- parent_->call_, &op, 1, &on_report_done_);
- if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
- gpr_log(GPR_ERROR,
- "[xds_client %p] calld=%p call_error=%d sending client load report",
- xds_client(), this, call_error);
- GPR_ASSERT(GRPC_CALL_OK == call_error);
- }
- return false;
- }
- void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDone(
- void* arg, grpc_error* error) {
- Reporter* self = static_cast<Reporter*>(arg);
- bool done;
- {
- MutexLock lock(&self->xds_client()->mu_);
- done = self->OnReportDoneLocked(GRPC_ERROR_REF(error));
- }
- if (done) self->Unref(DEBUG_LOCATION, "Reporter+report_done");
- }
- bool XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked(
- grpc_error* error) {
- grpc_byte_buffer_destroy(parent_->send_message_payload_);
- parent_->send_message_payload_ = nullptr;
- // If there are no more registered stats to report, cancel the call.
- if (xds_client()->load_report_map_.empty()) {
- parent_->chand()->StopLrsCall();
- GRPC_ERROR_UNREF(error);
- return true;
- }
- if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) {
- GRPC_ERROR_UNREF(error);
- // If this reporter is no longer the current one on the call, the reason
- // might be that it was orphaned for a new one due to config update.
- if (!IsCurrentReporterOnCall()) {
- parent_->MaybeStartReportingLocked();
- }
- return true;
- }
- ScheduleNextReportLocked();
- return false;
- }
- //
- // XdsClient::ChannelState::LrsCallState
- //
- XdsClient::ChannelState::LrsCallState::LrsCallState(
- RefCountedPtr<RetryableCall<LrsCallState>> parent)
- : InternallyRefCounted<LrsCallState>(
- GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace)
- ? "LrsCallState"
- : nullptr),
- parent_(std::move(parent)) {
- // Init the LRS call. Note that the call will progress every time there's
- // activity in xds_client()->interested_parties_, which is comprised of
- // the polling entities from client_channel.
- GPR_ASSERT(xds_client() != nullptr);
- const auto& method =
- chand()->server_.ShouldUseV3()
- ? GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V3_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS
- : GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V2_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS;
- call_ = grpc_channel_create_pollset_set_call(
- chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
- xds_client()->interested_parties_, method, nullptr,
- GRPC_MILLIS_INF_FUTURE, nullptr);
- GPR_ASSERT(call_ != nullptr);
- // Init the request payload.
- grpc_slice request_payload_slice =
- xds_client()->api_.CreateLrsInitialRequest(chand()->server_);
- send_message_payload_ =
- grpc_raw_byte_buffer_create(&request_payload_slice, 1);
- grpc_slice_unref_internal(request_payload_slice);
- // Init other data associated with the LRS call.
- grpc_metadata_array_init(&initial_metadata_recv_);
- grpc_metadata_array_init(&trailing_metadata_recv_);
- // Start the call.
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
- gpr_log(GPR_INFO,
- "[xds_client %p] Starting LRS call (chand: %p, calld: %p, "
- "call: %p)",
- xds_client(), chand(), this, call_);
- }
- // Create the ops.
- grpc_call_error call_error;
- grpc_op ops[3];
- memset(ops, 0, sizeof(ops));
- // Op: send initial metadata.
- grpc_op* op = ops;
- op->op = GRPC_OP_SEND_INITIAL_METADATA;
- op->data.send_initial_metadata.count = 0;
- op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
- GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
- op->reserved = nullptr;
- op++;
- // Op: send request message.
- GPR_ASSERT(send_message_payload_ != nullptr);
- op->op = GRPC_OP_SEND_MESSAGE;
- op->data.send_message.send_message = send_message_payload_;
- op->flags = 0;
- op->reserved = nullptr;
- op++;
- Ref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked").release();
- GRPC_CLOSURE_INIT(&on_initial_request_sent_, OnInitialRequestSent, this,
- grpc_schedule_on_exec_ctx);
- call_error = grpc_call_start_batch_and_execute(
- call_, ops, static_cast<size_t>(op - ops), &on_initial_request_sent_);
- GPR_ASSERT(GRPC_CALL_OK == call_error);
- // Op: recv initial metadata.
- op = ops;
- op->op = GRPC_OP_RECV_INITIAL_METADATA;
- op->data.recv_initial_metadata.recv_initial_metadata =
- &initial_metadata_recv_;
- op->flags = 0;
- op->reserved = nullptr;
- op++;
- // Op: recv response.
- op->op = GRPC_OP_RECV_MESSAGE;
- op->data.recv_message.recv_message = &recv_message_payload_;
- op->flags = 0;
- op->reserved = nullptr;
- op++;
- Ref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked").release();
- GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this,
- grpc_schedule_on_exec_ctx);
- call_error = grpc_call_start_batch_and_execute(
- call_, ops, static_cast<size_t>(op - ops), &on_response_received_);
- GPR_ASSERT(GRPC_CALL_OK == call_error);
- // Op: recv server status.
- op = ops;
- op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
- op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
- op->data.recv_status_on_client.status = &status_code_;
- op->data.recv_status_on_client.status_details = &status_details_;
- op->flags = 0;
- op->reserved = nullptr;
- op++;
- // This callback signals the end of the call, so it relies on the initial
- // ref instead of a new ref. When it's invoked, it's the initial ref that is
- // unreffed.
- GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this,
- grpc_schedule_on_exec_ctx);
- call_error = grpc_call_start_batch_and_execute(
- call_, ops, static_cast<size_t>(op - ops), &on_status_received_);
- GPR_ASSERT(GRPC_CALL_OK == call_error);
- }
- XdsClient::ChannelState::LrsCallState::~LrsCallState() {
- grpc_metadata_array_destroy(&initial_metadata_recv_);
- grpc_metadata_array_destroy(&trailing_metadata_recv_);
- grpc_byte_buffer_destroy(send_message_payload_);
- grpc_byte_buffer_destroy(recv_message_payload_);
- grpc_slice_unref_internal(status_details_);
- GPR_ASSERT(call_ != nullptr);
- grpc_call_unref(call_);
- }
- void XdsClient::ChannelState::LrsCallState::Orphan() {
- reporter_.reset();
- GPR_ASSERT(call_ != nullptr);
- // If we are here because xds_client wants to cancel the call,
- // on_status_received_ will complete the cancellation and clean up. Otherwise,
- // we are here because xds_client has to orphan a failed call, then the
- // following cancellation will be a no-op.
- grpc_call_cancel_internal(call_);
- // Note that the initial ref is hold by on_status_received_. So the
- // corresponding unref happens in on_status_received_ instead of here.
- }
- void XdsClient::ChannelState::LrsCallState::MaybeStartReportingLocked() {
- // Don't start again if already started.
- if (reporter_ != nullptr) return;
- // Don't start if the previous send_message op (of the initial request or the
- // last report of the previous reporter) hasn't completed.
- if (send_message_payload_ != nullptr) return;
- // Don't start if no LRS response has arrived.
- if (!seen_response()) return;
- // Don't start if the ADS call hasn't received any valid response. Note that
- // this must be the first channel because it is the current channel but its
- // ADS call hasn't seen any response.
- if (chand()->ads_calld_ == nullptr ||
- chand()->ads_calld_->calld() == nullptr ||
- !chand()->ads_calld_->calld()->seen_response()) {
- return;
- }
- // Start reporting.
- reporter_ = MakeOrphanable<Reporter>(
- Ref(DEBUG_LOCATION, "LRS+load_report+start"), load_reporting_interval_);
- }
- void XdsClient::ChannelState::LrsCallState::OnInitialRequestSent(
- void* arg, grpc_error* /*error*/) {
- LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
- {
- MutexLock lock(&lrs_calld->xds_client()->mu_);
- lrs_calld->OnInitialRequestSentLocked();
- }
- lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked");
- }
- void XdsClient::ChannelState::LrsCallState::OnInitialRequestSentLocked() {
- // Clear the send_message_payload_.
- grpc_byte_buffer_destroy(send_message_payload_);
- send_message_payload_ = nullptr;
- MaybeStartReportingLocked();
- }
- void XdsClient::ChannelState::LrsCallState::OnResponseReceived(
- void* arg, grpc_error* /*error*/) {
- LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
- bool done;
- {
- MutexLock lock(&lrs_calld->xds_client()->mu_);
- done = lrs_calld->OnResponseReceivedLocked();
- }
- if (done) lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked");
- }
- bool XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked() {
- // Empty payload means the call was cancelled.
- if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) {
- return true;
- }
- // Read the response.
- grpc_byte_buffer_reader bbr;
- grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
- grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
- grpc_byte_buffer_reader_destroy(&bbr);
- grpc_byte_buffer_destroy(recv_message_payload_);
- recv_message_payload_ = nullptr;
- // This anonymous lambda is a hack to avoid the usage of goto.
- [&]() {
- // Parse the response.
- bool send_all_clusters = false;
- std::set<std::string> new_cluster_names;
- grpc_millis new_load_reporting_interval;
- grpc_error* parse_error = xds_client()->api_.ParseLrsResponse(
- response_slice, &send_all_clusters, &new_cluster_names,
- &new_load_reporting_interval);
- if (parse_error != GRPC_ERROR_NONE) {
- gpr_log(GPR_ERROR,
- "[xds_client %p] LRS response parsing failed. error=%s",
- xds_client(), grpc_error_string(parse_error));
- GRPC_ERROR_UNREF(parse_error);
- return;
- }
- seen_response_ = true;
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
- gpr_log(
- GPR_INFO,
- "[xds_client %p] LRS response received, %" PRIuPTR
- " cluster names, send_all_clusters=%d, load_report_interval=%" PRId64
- "ms",
- xds_client(), new_cluster_names.size(), send_all_clusters,
- new_load_reporting_interval);
- size_t i = 0;
- for (const auto& name : new_cluster_names) {
- gpr_log(GPR_INFO, "[xds_client %p] cluster_name %" PRIuPTR ": %s",
- xds_client(), i++, name.c_str());
- }
- }
- if (new_load_reporting_interval <
- GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS) {
- new_load_reporting_interval =
- GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS;
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
- gpr_log(GPR_INFO,
- "[xds_client %p] Increased load_report_interval to minimum "
- "value %dms",
- xds_client(), GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS);
- }
- }
- // Ignore identical update.
- if (send_all_clusters == send_all_clusters_ &&
- cluster_names_ == new_cluster_names &&
- load_reporting_interval_ == new_load_reporting_interval) {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
- gpr_log(GPR_INFO,
- "[xds_client %p] Incoming LRS response identical to current, "
- "ignoring.",
- xds_client());
- }
- return;
- }
- // Stop current load reporting (if any) to adopt the new config.
- reporter_.reset();
- // Record the new config.
- send_all_clusters_ = send_all_clusters;
- cluster_names_ = std::move(new_cluster_names);
- load_reporting_interval_ = new_load_reporting_interval;
- // Try starting sending load report.
- MaybeStartReportingLocked();
- }();
- grpc_slice_unref_internal(response_slice);
- if (xds_client()->shutting_down_) return true;
- // Keep listening for LRS config updates.
- grpc_op op;
- memset(&op, 0, sizeof(op));
- op.op = GRPC_OP_RECV_MESSAGE;
- op.data.recv_message.recv_message = &recv_message_payload_;
- op.flags = 0;
- op.reserved = nullptr;
- GPR_ASSERT(call_ != nullptr);
- // Reuse the "OnResponseReceivedLocked" ref taken in ctor.
- const grpc_call_error call_error =
- grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
- GPR_ASSERT(GRPC_CALL_OK == call_error);
- return false;
- }
- void XdsClient::ChannelState::LrsCallState::OnStatusReceived(
- void* arg, grpc_error* error) {
- LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
- {
- MutexLock lock(&lrs_calld->xds_client()->mu_);
- lrs_calld->OnStatusReceivedLocked(GRPC_ERROR_REF(error));
- }
- lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnStatusReceivedLocked");
- }
- void XdsClient::ChannelState::LrsCallState::OnStatusReceivedLocked(
- grpc_error* error) {
- GPR_ASSERT(call_ != nullptr);
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
- char* status_details = grpc_slice_to_c_string(status_details_);
- gpr_log(GPR_INFO,
- "[xds_client %p] LRS call status received. Status = %d, details "
- "= '%s', (chand: %p, calld: %p, call: %p), error '%s'",
- xds_client(), status_code_, status_details, chand(), this, call_,
- grpc_error_string(error));
- gpr_free(status_details);
- }
- // Ignore status from a stale call.
- if (IsCurrentCallOnChannel()) {
- GPR_ASSERT(!xds_client()->shutting_down_);
- // Try to restart the call.
- parent_->OnCallFinishedLocked();
- }
- GRPC_ERROR_UNREF(error);
- }
- bool XdsClient::ChannelState::LrsCallState::IsCurrentCallOnChannel() const {
- // If the retryable LRS call is null (which only happens when the xds channel
- // is shutting down), all the LRS calls are stale.
- if (chand()->lrs_calld_ == nullptr) return false;
- return this == chand()->lrs_calld_->calld();
- }
- //
- // XdsClient
- //
- namespace {
- grpc_millis GetRequestTimeout() {
- return grpc_channel_args_find_integer(
- g_channel_args, GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS,
- {15000, 0, INT_MAX});
- }
- } // namespace
- XdsClient::XdsClient(grpc_error** error)
- : DualRefCounted<XdsClient>(
- GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace) ? "XdsClient"
- : nullptr),
- request_timeout_(GetRequestTimeout()),
- interested_parties_(grpc_pollset_set_create()),
- bootstrap_(XdsBootstrap::Create(this, &grpc_xds_client_trace,
- g_fallback_bootstrap_config, error)),
- certificate_provider_store_(MakeOrphanable<CertificateProviderStore>(
- bootstrap_ == nullptr
- ? CertificateProviderStore::PluginDefinitionMap()
- : bootstrap_->certificate_providers())),
- api_(this, &grpc_xds_client_trace,
- bootstrap_ == nullptr ? nullptr : bootstrap_->node()) {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
- gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this);
- }
- if (*error != GRPC_ERROR_NONE) {
- gpr_log(GPR_ERROR, "[xds_client %p] failed to read bootstrap file: %s",
- this, grpc_error_string(*error));
- return;
- }
- // Create ChannelState object.
- chand_ = MakeOrphanable<ChannelState>(
- WeakRef(DEBUG_LOCATION, "XdsClient+ChannelState"), bootstrap_->server());
- }
- XdsClient::~XdsClient() {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
- gpr_log(GPR_INFO, "[xds_client %p] destroying xds client", this);
- }
- grpc_pollset_set_destroy(interested_parties_);
- }
- void XdsClient::AddChannelzLinkage(
- channelz::ChannelNode* parent_channelz_node) {
- channelz::ChannelNode* xds_channelz_node =
- grpc_channel_get_channelz_node(chand_->channel());
- if (xds_channelz_node != nullptr) {
- parent_channelz_node->AddChildChannel(xds_channelz_node->uuid());
- }
- }
- void XdsClient::RemoveChannelzLinkage(
- channelz::ChannelNode* parent_channelz_node) {
- channelz::ChannelNode* xds_channelz_node =
- grpc_channel_get_channelz_node(chand_->channel());
- if (xds_channelz_node != nullptr) {
- parent_channelz_node->RemoveChildChannel(xds_channelz_node->uuid());
- }
- }
- void XdsClient::Orphan() {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
- gpr_log(GPR_INFO, "[xds_client %p] shutting down xds client", this);
- }
- {
- MutexLock lock(g_mu);
- if (g_xds_client == this) g_xds_client = nullptr;
- }
- {
- MutexLock lock(&mu_);
- shutting_down_ = true;
- // Orphan ChannelState object.
- chand_.reset();
- // We do not clear cluster_map_ and endpoint_map_ if the xds client was
- // created by the XdsResolver because the maps contain refs for watchers
- // which in turn hold refs to the loadbalancing policies. At this point, it
- // is possible for ADS calls to be in progress. Unreffing the loadbalancing
- // policies before those calls are done would lead to issues such as
- // https://github.com/grpc/grpc/issues/20928.
- if (!listener_map_.empty()) {
- cluster_map_.clear();
- endpoint_map_.clear();
- }
- }
- }
- void XdsClient::WatchListenerData(
- absl::string_view listener_name,
- std::unique_ptr<ListenerWatcherInterface> watcher) {
- std::string listener_name_str = std::string(listener_name);
- MutexLock lock(&mu_);
- ListenerState& listener_state = listener_map_[listener_name_str];
- ListenerWatcherInterface* w = watcher.get();
- listener_state.watchers[w] = std::move(watcher);
- // If we've already received an LDS update, notify the new watcher
- // immediately.
- if (listener_state.update.has_value()) {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
- gpr_log(GPR_INFO, "[xds_client %p] returning cached listener data for %s",
- this, listener_name_str.c_str());
- }
- w->OnListenerChanged(*listener_state.update);
- }
- chand_->Subscribe(XdsApi::kLdsTypeUrl, listener_name_str);
- }
- void XdsClient::CancelListenerDataWatch(absl::string_view listener_name,
- ListenerWatcherInterface* watcher,
- bool delay_unsubscription) {
- MutexLock lock(&mu_);
- if (shutting_down_) return;
- std::string listener_name_str = std::string(listener_name);
- ListenerState& listener_state = listener_map_[listener_name_str];
- auto it = listener_state.watchers.find(watcher);
- if (it != listener_state.watchers.end()) {
- listener_state.watchers.erase(it);
- if (listener_state.watchers.empty()) {
- listener_map_.erase(listener_name_str);
- chand_->Unsubscribe(XdsApi::kLdsTypeUrl, listener_name_str,
- delay_unsubscription);
- }
- }
- }
- void XdsClient::WatchRouteConfigData(
- absl::string_view route_config_name,
- std::unique_ptr<RouteConfigWatcherInterface> watcher) {
- std::string route_config_name_str = std::string(route_config_name);
- MutexLock lock(&mu_);
- RouteConfigState& route_config_state =
- route_config_map_[route_config_name_str];
- RouteConfigWatcherInterface* w = watcher.get();
- route_config_state.watchers[w] = std::move(watcher);
- // If we've already received an RDS update, notify the new watcher
- // immediately.
- if (route_config_state.update.has_value()) {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
- gpr_log(GPR_INFO,
- "[xds_client %p] returning cached route config data for %s", this,
- route_config_name_str.c_str());
- }
- w->OnRouteConfigChanged(*route_config_state.update);
- }
- chand_->Subscribe(XdsApi::kRdsTypeUrl, route_config_name_str);
- }
- void XdsClient::CancelRouteConfigDataWatch(absl::string_view route_config_name,
- RouteConfigWatcherInterface* watcher,
- bool delay_unsubscription) {
- MutexLock lock(&mu_);
- if (shutting_down_) return;
- std::string route_config_name_str = std::string(route_config_name);
- RouteConfigState& route_config_state =
- route_config_map_[route_config_name_str];
- auto it = route_config_state.watchers.find(watcher);
- if (it != route_config_state.watchers.end()) {
- route_config_state.watchers.erase(it);
- if (route_config_state.watchers.empty()) {
- route_config_map_.erase(route_config_name_str);
- chand_->Unsubscribe(XdsApi::kRdsTypeUrl, route_config_name_str,
- delay_unsubscription);
- }
- }
- }
- void XdsClient::WatchClusterData(
- absl::string_view cluster_name,
- std::unique_ptr<ClusterWatcherInterface> watcher) {
- std::string cluster_name_str = std::string(cluster_name);
- MutexLock lock(&mu_);
- ClusterState& cluster_state = cluster_map_[cluster_name_str];
- ClusterWatcherInterface* w = watcher.get();
- cluster_state.watchers[w] = std::move(watcher);
- // If we've already received a CDS update, notify the new watcher
- // immediately.
- if (cluster_state.update.has_value()) {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
- gpr_log(GPR_INFO, "[xds_client %p] returning cached cluster data for %s",
- this, cluster_name_str.c_str());
- }
- w->OnClusterChanged(cluster_state.update.value());
- }
- chand_->Subscribe(XdsApi::kCdsTypeUrl, cluster_name_str);
- }
- void XdsClient::CancelClusterDataWatch(absl::string_view cluster_name,
- ClusterWatcherInterface* watcher,
- bool delay_unsubscription) {
- MutexLock lock(&mu_);
- if (shutting_down_) return;
- std::string cluster_name_str = std::string(cluster_name);
- ClusterState& cluster_state = cluster_map_[cluster_name_str];
- auto it = cluster_state.watchers.find(watcher);
- if (it != cluster_state.watchers.end()) {
- cluster_state.watchers.erase(it);
- if (cluster_state.watchers.empty()) {
- cluster_map_.erase(cluster_name_str);
- chand_->Unsubscribe(XdsApi::kCdsTypeUrl, cluster_name_str,
- delay_unsubscription);
- }
- }
- }
- void XdsClient::WatchEndpointData(
- absl::string_view eds_service_name,
- std::unique_ptr<EndpointWatcherInterface> watcher) {
- std::string eds_service_name_str = std::string(eds_service_name);
- MutexLock lock(&mu_);
- EndpointState& endpoint_state = endpoint_map_[eds_service_name_str];
- EndpointWatcherInterface* w = watcher.get();
- endpoint_state.watchers[w] = std::move(watcher);
- // If we've already received an EDS update, notify the new watcher
- // immediately.
- if (endpoint_state.update.has_value()) {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
- gpr_log(GPR_INFO, "[xds_client %p] returning cached endpoint data for %s",
- this, eds_service_name_str.c_str());
- }
- w->OnEndpointChanged(endpoint_state.update.value());
- }
- chand_->Subscribe(XdsApi::kEdsTypeUrl, eds_service_name_str);
- }
- void XdsClient::CancelEndpointDataWatch(absl::string_view eds_service_name,
- EndpointWatcherInterface* watcher,
- bool delay_unsubscription) {
- MutexLock lock(&mu_);
- if (shutting_down_) return;
- std::string eds_service_name_str = std::string(eds_service_name);
- EndpointState& endpoint_state = endpoint_map_[eds_service_name_str];
- auto it = endpoint_state.watchers.find(watcher);
- if (it != endpoint_state.watchers.end()) {
- endpoint_state.watchers.erase(it);
- if (endpoint_state.watchers.empty()) {
- endpoint_map_.erase(eds_service_name_str);
- chand_->Unsubscribe(XdsApi::kEdsTypeUrl, eds_service_name_str,
- delay_unsubscription);
- }
- }
- }
- RefCountedPtr<XdsClusterDropStats> XdsClient::AddClusterDropStats(
- absl::string_view lrs_server, absl::string_view cluster_name,
- absl::string_view eds_service_name) {
- // TODO(roth): When we add support for direct federation, use the
- // server name specified in lrs_server.
- auto key =
- std::make_pair(std::string(cluster_name), std::string(eds_service_name));
- MutexLock lock(&mu_);
- // We jump through some hoops here to make sure that the absl::string_views
- // stored in the XdsClusterDropStats object point to the strings
- // in the load_report_map_ key, so that they have the same lifetime.
- auto it = load_report_map_
- .emplace(std::make_pair(std::move(key), LoadReportState()))
- .first;
- LoadReportState& load_report_state = it->second;
- RefCountedPtr<XdsClusterDropStats> cluster_drop_stats;
- if (load_report_state.drop_stats != nullptr) {
- cluster_drop_stats = load_report_state.drop_stats->RefIfNonZero();
- }
- if (cluster_drop_stats == nullptr) {
- if (load_report_state.drop_stats != nullptr) {
- load_report_state.deleted_drop_stats +=
- load_report_state.drop_stats->GetSnapshotAndReset();
- }
- cluster_drop_stats = MakeRefCounted<XdsClusterDropStats>(
- Ref(DEBUG_LOCATION, "DropStats"), lrs_server,
- it->first.first /*cluster_name*/,
- it->first.second /*eds_service_name*/);
- load_report_state.drop_stats = cluster_drop_stats.get();
- }
- chand_->MaybeStartLrsCall();
- return cluster_drop_stats;
- }
- void XdsClient::RemoveClusterDropStats(
- absl::string_view /*lrs_server*/, absl::string_view cluster_name,
- absl::string_view eds_service_name,
- XdsClusterDropStats* cluster_drop_stats) {
- MutexLock lock(&mu_);
- // TODO(roth): When we add support for direct federation, use the
- // server name specified in lrs_server.
- auto it = load_report_map_.find(
- std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
- if (it == load_report_map_.end()) return;
- LoadReportState& load_report_state = it->second;
- if (load_report_state.drop_stats == cluster_drop_stats) {
- // Record final snapshot in deleted_drop_stats, which will be
- // added to the next load report.
- load_report_state.deleted_drop_stats +=
- load_report_state.drop_stats->GetSnapshotAndReset();
- load_report_state.drop_stats = nullptr;
- }
- }
- RefCountedPtr<XdsClusterLocalityStats> XdsClient::AddClusterLocalityStats(
- absl::string_view lrs_server, absl::string_view cluster_name,
- absl::string_view eds_service_name,
- RefCountedPtr<XdsLocalityName> locality) {
- // TODO(roth): When we add support for direct federation, use the
- // server name specified in lrs_server.
- auto key =
- std::make_pair(std::string(cluster_name), std::string(eds_service_name));
- MutexLock lock(&mu_);
- // We jump through some hoops here to make sure that the absl::string_views
- // stored in the XdsClusterLocalityStats object point to the strings
- // in the load_report_map_ key, so that they have the same lifetime.
- auto it = load_report_map_
- .emplace(std::make_pair(std::move(key), LoadReportState()))
- .first;
- LoadReportState& load_report_state = it->second;
- LoadReportState::LocalityState& locality_state =
- load_report_state.locality_stats[locality];
- RefCountedPtr<XdsClusterLocalityStats> cluster_locality_stats;
- if (locality_state.locality_stats != nullptr) {
- cluster_locality_stats = locality_state.locality_stats->RefIfNonZero();
- }
- if (cluster_locality_stats == nullptr) {
- if (locality_state.locality_stats != nullptr) {
- locality_state.deleted_locality_stats +=
- locality_state.locality_stats->GetSnapshotAndReset();
- }
- cluster_locality_stats = MakeRefCounted<XdsClusterLocalityStats>(
- Ref(DEBUG_LOCATION, "LocalityStats"), lrs_server,
- it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/,
- std::move(locality));
- locality_state.locality_stats = cluster_locality_stats.get();
- }
- chand_->MaybeStartLrsCall();
- return cluster_locality_stats;
- }
- void XdsClient::RemoveClusterLocalityStats(
- absl::string_view /*lrs_server*/, absl::string_view cluster_name,
- absl::string_view eds_service_name,
- const RefCountedPtr<XdsLocalityName>& locality,
- XdsClusterLocalityStats* cluster_locality_stats) {
- MutexLock lock(&mu_);
- // TODO(roth): When we add support for direct federation, use the
- // server name specified in lrs_server.
- auto it = load_report_map_.find(
- std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
- if (it == load_report_map_.end()) return;
- LoadReportState& load_report_state = it->second;
- auto locality_it = load_report_state.locality_stats.find(locality);
- if (locality_it == load_report_state.locality_stats.end()) return;
- LoadReportState::LocalityState& locality_state = locality_it->second;
- if (locality_state.locality_stats == cluster_locality_stats) {
- // Record final snapshot in deleted_locality_stats, which will be
- // added to the next load report.
- locality_state.deleted_locality_stats +=
- locality_state.locality_stats->GetSnapshotAndReset();
- locality_state.locality_stats = nullptr;
- }
- }
- void XdsClient::ResetBackoff() {
- MutexLock lock(&mu_);
- if (chand_ != nullptr) {
- grpc_channel_reset_connect_backoff(chand_->channel());
- }
- }
- void XdsClient::NotifyOnErrorLocked(grpc_error* error) {
- for (const auto& p : listener_map_) {
- const ListenerState& listener_state = p.second;
- for (const auto& p : listener_state.watchers) {
- p.first->OnError(GRPC_ERROR_REF(error));
- }
- }
- for (const auto& p : route_config_map_) {
- const RouteConfigState& route_config_state = p.second;
- for (const auto& p : route_config_state.watchers) {
- p.first->OnError(GRPC_ERROR_REF(error));
- }
- }
- for (const auto& p : cluster_map_) {
- const ClusterState& cluster_state = p.second;
- for (const auto& p : cluster_state.watchers) {
- p.first->OnError(GRPC_ERROR_REF(error));
- }
- }
- for (const auto& p : endpoint_map_) {
- const EndpointState& endpoint_state = p.second;
- for (const auto& p : endpoint_state.watchers) {
- p.first->OnError(GRPC_ERROR_REF(error));
- }
- }
- GRPC_ERROR_UNREF(error);
- }
- XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked(
- bool send_all_clusters, const std::set<std::string>& clusters) {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
- gpr_log(GPR_INFO, "[xds_client %p] start building load report", this);
- }
- XdsApi::ClusterLoadReportMap snapshot_map;
- for (auto load_report_it = load_report_map_.begin();
- load_report_it != load_report_map_.end();) {
- // Cluster key is cluster and EDS service name.
- const auto& cluster_key = load_report_it->first;
- LoadReportState& load_report = load_report_it->second;
- // If the CDS response for a cluster indicates to use LRS but the
- // LRS server does not say that it wants reports for this cluster,
- // then we'll have stats objects here whose data we're not going to
- // include in the load report. However, we still need to clear out
- // the data from the stats objects, so that if the LRS server starts
- // asking for the data in the future, we don't incorrectly include
- // data from previous reporting intervals in that future report.
- const bool record_stats =
- send_all_clusters || clusters.find(cluster_key.first) != clusters.end();
- XdsApi::ClusterLoadReport snapshot;
- // Aggregate drop stats.
- snapshot.dropped_requests = std::move(load_report.deleted_drop_stats);
- if (load_report.drop_stats != nullptr) {
- snapshot.dropped_requests +=
- load_report.drop_stats->GetSnapshotAndReset();
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
- gpr_log(GPR_INFO,
- "[xds_client %p] cluster=%s eds_service_name=%s drop_stats=%p",
- this, cluster_key.first.c_str(), cluster_key.second.c_str(),
- load_report.drop_stats);
- }
- }
- // Aggregate locality stats.
- for (auto it = load_report.locality_stats.begin();
- it != load_report.locality_stats.end();) {
- const RefCountedPtr<XdsLocalityName>& locality_name = it->first;
- auto& locality_state = it->second;
- XdsClusterLocalityStats::Snapshot& locality_snapshot =
- snapshot.locality_stats[locality_name];
- locality_snapshot = std::move(locality_state.deleted_locality_stats);
- if (locality_state.locality_stats != nullptr) {
- locality_snapshot +=
- locality_state.locality_stats->GetSnapshotAndReset();
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
- gpr_log(GPR_INFO,
- "[xds_client %p] cluster=%s eds_service_name=%s "
- "locality=%s locality_stats=%p",
- this, cluster_key.first.c_str(), cluster_key.second.c_str(),
- locality_name->AsHumanReadableString().c_str(),
- locality_state.locality_stats);
- }
- }
- // If the only thing left in this entry was final snapshots from
- // deleted locality stats objects, remove the entry.
- if (locality_state.locality_stats == nullptr) {
- it = load_report.locality_stats.erase(it);
- } else {
- ++it;
- }
- }
- // Compute load report interval.
- const grpc_millis now = ExecCtx::Get()->Now();
- snapshot.load_report_interval = now - load_report.last_report_time;
- load_report.last_report_time = now;
- // Record snapshot.
- if (record_stats) {
- snapshot_map[cluster_key] = std::move(snapshot);
- }
- // If the only thing left in this entry was final snapshots from
- // deleted stats objects, remove the entry.
- if (load_report.locality_stats.empty() &&
- load_report.drop_stats == nullptr) {
- load_report_it = load_report_map_.erase(load_report_it);
- } else {
- ++load_report_it;
- }
- }
- return snapshot_map;
- }
- //
- // accessors for global state
- //
- void XdsClientGlobalInit() {
- g_mu = new Mutex;
- XdsHttpFilterRegistry::Init();
- }
- void XdsClientGlobalShutdown() {
- delete g_mu;
- g_mu = nullptr;
- gpr_free(g_fallback_bootstrap_config);
- g_fallback_bootstrap_config = nullptr;
- XdsHttpFilterRegistry::Shutdown();
- }
- RefCountedPtr<XdsClient> XdsClient::GetOrCreate(grpc_error** error) {
- MutexLock lock(g_mu);
- if (g_xds_client != nullptr) {
- auto xds_client = g_xds_client->RefIfNonZero();
- if (xds_client != nullptr) return xds_client;
- }
- auto xds_client = MakeRefCounted<XdsClient>(error);
- g_xds_client = xds_client.get();
- return xds_client;
- }
- namespace internal {
- void SetXdsChannelArgsForTest(grpc_channel_args* args) {
- MutexLock lock(g_mu);
- g_channel_args = args;
- }
- void UnsetGlobalXdsClientForTest() {
- MutexLock lock(g_mu);
- g_xds_client = nullptr;
- }
- void SetXdsFallbackBootstrapConfig(const char* config) {
- MutexLock lock(g_mu);
- gpr_free(g_fallback_bootstrap_config);
- g_fallback_bootstrap_config = gpr_strdup(config);
- }
- } // namespace internal
- } // namespace grpc_core
|