xds_client.cc 84 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246
  1. /*
  2. *
  3. * Copyright 2018 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. #include <grpc/support/port_platform.h>
  19. #include <inttypes.h>
  20. #include <limits.h>
  21. #include <string.h>
  22. #include "absl/container/inlined_vector.h"
  23. #include "absl/strings/str_format.h"
  24. #include "absl/strings/str_join.h"
  25. #include "absl/strings/string_view.h"
  26. #include <grpc/byte_buffer_reader.h>
  27. #include <grpc/grpc.h>
  28. #include <grpc/support/alloc.h>
  29. #include <grpc/support/time.h>
  30. #include "src/core/ext/filters/client_channel/client_channel.h"
  31. #include "src/core/ext/filters/client_channel/service_config.h"
  32. #include "src/core/ext/xds/xds_api.h"
  33. #include "src/core/ext/xds/xds_channel_args.h"
  34. #include "src/core/ext/xds/xds_client.h"
  35. #include "src/core/ext/xds/xds_client_stats.h"
  36. #include "src/core/ext/xds/xds_http_filters.h"
  37. #include "src/core/lib/backoff/backoff.h"
  38. #include "src/core/lib/channel/channel_args.h"
  39. #include "src/core/lib/channel/channel_stack.h"
  40. #include "src/core/lib/gpr/string.h"
  41. #include "src/core/lib/gprpp/memory.h"
  42. #include "src/core/lib/gprpp/orphanable.h"
  43. #include "src/core/lib/gprpp/ref_counted_ptr.h"
  44. #include "src/core/lib/gprpp/sync.h"
  45. #include "src/core/lib/iomgr/sockaddr.h"
  46. #include "src/core/lib/iomgr/sockaddr_utils.h"
  47. #include "src/core/lib/iomgr/timer.h"
  48. #include "src/core/lib/slice/slice_internal.h"
  49. #include "src/core/lib/slice/slice_string_helpers.h"
  50. #include "src/core/lib/surface/call.h"
  51. #include "src/core/lib/surface/channel.h"
  52. #include "src/core/lib/surface/channel_init.h"
  53. #include "src/core/lib/transport/static_metadata.h"
  // Exponential-backoff parameters for restarting failed xds calls; they are
  // consumed by the RetryableCall<> constructor. The *_SECONDS values are
  // multiplied by 1000 there to obtain milliseconds.
  #define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1
  #define GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER 1.6
  #define GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS 120
  #define GRPC_XDS_RECONNECT_JITTER 0.2
  // NOTE(review): not used in this section; presumably the lower bound (in ms)
  // applied to the load-reporting interval requested by the LRS server —
  // confirm against its use site.
  #define GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS 1000
  59. namespace grpc_core {
  // Enables verbose XdsClient logging; checked throughout via
  // GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace).
  TraceFlag grpc_xds_client_trace(false, "xds_client");
  // When enabled, ChannelState and AdsCallState pass a trace name to their
  // InternallyRefCounted base so refcount changes are traced.
  TraceFlag grpc_xds_client_refcount_trace(false, "xds_client_refcount");
  namespace {
  // File-local process-wide state. How these are populated and synchronized is
  // not visible in this section; presumably g_mu guards the others — confirm.
  Mutex* g_mu = nullptr;
  // Base channel args for xds channels; CreateXdsChannel copies them and adds
  // its own args on top.
  const grpc_channel_args* g_channel_args = nullptr;
  XdsClient* g_xds_client = nullptr;
  char* g_fallback_bootstrap_config = nullptr;
  } // namespace
  68. //
  69. // Internal class declarations
  70. //
  // An xds call wrapper that can restart a call upon failure. Holds a ref to
  // the xds channel. The template parameter is the kind of wrapped xds call.
  //
  // When the wrapped call finishes (OnCallFinishedLocked), a new call is started
  // right away if the old call had seen a response; otherwise a retry timer is
  // armed using exponential backoff. Methods with a "Locked" suffix presumably
  // expect the XdsClient mutex to be held (OnRetryTimer acquires it before
  // calling OnRetryTimerLocked) — confirm for the other call sites.
  template <typename T>
  class XdsClient::ChannelState::RetryableCall
      : public InternallyRefCounted<RetryableCall<T>> {
   public:
    // Configures the backoff and starts the first call.
    explicit RetryableCall(RefCountedPtr<ChannelState> chand);
    // Marks this object as shutting down, drops the current call, and cancels
    // a pending retry timer.
    void Orphan() override;
    // Called when the wrapped call ends: restarts it now (after resetting the
    // backoff) if it had seen a response, otherwise after a backoff delay.
    void OnCallFinishedLocked();
    // The current wrapped call; null while waiting in retry backoff.
    T* calld() const { return calld_.get(); }
    ChannelState* chand() const { return chand_.get(); }
    bool IsCurrentCallOnChannel() const;

   private:
    // Creates a new T unless shutting down.
    void StartNewCallLocked();
    // Arms retry_timer_ for the next backoff attempt unless shutting down.
    void StartRetryTimerLocked();
    // Timer callback trampoline; takes the lock and releases the timer's ref.
    static void OnRetryTimer(void* arg, grpc_error* error);
    void OnRetryTimerLocked(grpc_error* error);
    // The wrapped xds call that talks to the xds server. It's instantiated
    // every time we start a new call. It's null during call retry backoff.
    OrphanablePtr<T> calld_;
    // The owning xds channel.
    RefCountedPtr<ChannelState> chand_;
    // Retry state.
    BackOff backoff_;
    grpc_timer retry_timer_;
    grpc_closure on_retry_timer_;
    // True between arming retry_timer_ and its callback running; Orphan()
    // cancels the timer only while this is set.
    bool retry_timer_callback_pending_ = false;
    // Set by Orphan(); prevents new calls and retries from being started.
    bool shutting_down_ = false;
  };
  // Contains an ADS call to the xds server.
  //
  // For each resource type it tracks the subscribed resource names together
  // with the last nonce/error, and it buffers the resource types for which a
  // request still needs to be sent. It holds a ref to its owning RetryableCall.
  class XdsClient::ChannelState::AdsCallState
      : public InternallyRefCounted<AdsCallState> {
   public:
    // The ctor and dtor should not be used directly.
    explicit AdsCallState(RefCountedPtr<RetryableCall<AdsCallState>> parent);
    ~AdsCallState() override;
    void Orphan() override;
    RetryableCall<AdsCallState>* parent() const { return parent_.get(); }
    ChannelState* chand() const { return parent_->chand(); }
    XdsClient* xds_client() const { return chand()->xds_client(); }
    // Read by RetryableCall::OnCallFinishedLocked() to choose between an
    // immediate restart and a backoff-delayed retry.
    bool seen_response() const { return seen_response_; }
    // Adds a subscription to resource `name` of type `type_url`.
    void Subscribe(const std::string& type_url, const std::string& name);
    // Removes a subscription. NOTE(review): exact effect of
    // delay_unsubscription is implemented outside this view.
    void Unsubscribe(const std::string& type_url, const std::string& name,
                     bool delay_unsubscription);
    // ChannelState::Unsubscribe tears down the ADS call once this is false.
    bool HasSubscribedResources() const;

   private:
    // Per-resource state. Owns a request-timeout timer: when the timer fires
    // before Finish() cancels it, every watcher of the resource is sent a
    // "timeout obtaining resource" error.
    class ResourceState : public InternallyRefCounted<ResourceState> {
     public:
      ResourceState(const std::string& type_url, const std::string& name,
                    bool sent_initial_request)
          : type_url_(type_url),
            name_(name),
            sent_initial_request_(sent_initial_request) {
        GRPC_CLOSURE_INIT(&timer_callback_, OnTimer, this,
                          grpc_schedule_on_exec_ctx);
      }
      // Cancels the timer (if pending) and drops the owning ref.
      void Orphan() override {
        Finish();
        Unref(DEBUG_LOCATION, "Orphan");
      }
      // Starts the request timeout the first time a request is sent for this
      // resource; a no-op if the initial request was already sent. Takes a
      // ref on itself (and keeps ads_calld) until OnTimer() runs.
      void Start(RefCountedPtr<AdsCallState> ads_calld) {
        if (sent_initial_request_) return;
        sent_initial_request_ = true;
        ads_calld_ = std::move(ads_calld);
        Ref(DEBUG_LOCATION, "timer").release();
        timer_pending_ = true;
        grpc_timer_init(
            &timer_,
            ExecCtx::Get()->Now() + ads_calld_->xds_client()->request_timeout_,
            &timer_callback_);
      }
      // Cancels the pending timeout, if any. Clearing timer_pending_ also makes
      // OnTimerLocked() ignore a callback that is already in flight.
      void Finish() {
        if (timer_pending_) {
          grpc_timer_cancel(&timer_);
          timer_pending_ = false;
        }
      }

     private:
      // Timer trampoline: runs OnTimerLocked() under the XdsClient mutex, then
      // releases ads_calld_ and the "timer" ref taken in Start().
      static void OnTimer(void* arg, grpc_error* error) {
        ResourceState* self = static_cast<ResourceState*>(arg);
        {
          MutexLock lock(&self->ads_calld_->xds_client()->mu_);
          self->OnTimerLocked(GRPC_ERROR_REF(error));
        }
        self->ads_calld_.reset();
        self->Unref(DEBUG_LOCATION, "timer");
      }
      // Takes ownership of `error`. Notifies watchers only when the timer fired
      // normally and was not cancelled via Finish().
      // NOTE(review): the map lookups below use operator[], which inserts an
      // empty entry if the resource is absent; find() would avoid that.
      void OnTimerLocked(grpc_error* error) {
        if (error == GRPC_ERROR_NONE && timer_pending_) {
          timer_pending_ = false;
          grpc_error* watcher_error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
              absl::StrFormat(
                  "timeout obtaining resource {type=%s name=%s} from xds server",
                  type_url_, name_)
                  .c_str());
          if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
            gpr_log(GPR_INFO, "[xds_client %p] %s", ads_calld_->xds_client(),
                    grpc_error_string(watcher_error));
          }
          if (type_url_ == XdsApi::kLdsTypeUrl) {
            ListenerState& state = ads_calld_->xds_client()->listener_map_[name_];
            for (const auto& p : state.watchers) {
              p.first->OnError(GRPC_ERROR_REF(watcher_error));
            }
          } else if (type_url_ == XdsApi::kRdsTypeUrl) {
            RouteConfigState& state =
                ads_calld_->xds_client()->route_config_map_[name_];
            for (const auto& p : state.watchers) {
              p.first->OnError(GRPC_ERROR_REF(watcher_error));
            }
          } else if (type_url_ == XdsApi::kCdsTypeUrl) {
            ClusterState& state = ads_calld_->xds_client()->cluster_map_[name_];
            for (const auto& p : state.watchers) {
              p.first->OnError(GRPC_ERROR_REF(watcher_error));
            }
          } else if (type_url_ == XdsApi::kEdsTypeUrl) {
            EndpointState& state = ads_calld_->xds_client()->endpoint_map_[name_];
            for (const auto& p : state.watchers) {
              p.first->OnError(GRPC_ERROR_REF(watcher_error));
            }
          } else {
            GPR_UNREACHABLE_CODE(return );
          }
          GRPC_ERROR_UNREF(watcher_error);
        }
        GRPC_ERROR_UNREF(error);
      }
      const std::string type_url_;
      const std::string name_;
      // Set by Start(); released by OnTimer().
      RefCountedPtr<AdsCallState> ads_calld_;
      bool sent_initial_request_;
      bool timer_pending_ = false;
      grpc_timer timer_;
      grpc_closure timer_callback_;
    };
    struct ResourceTypeState {
      ~ResourceTypeState() { GRPC_ERROR_UNREF(error); }
      // Nonce and error for this resource type.
      std::string nonce;
      grpc_error* error = GRPC_ERROR_NONE;
      // Subscribed resources of this type.
      std::map<std::string /* name */, OrphanablePtr<ResourceState>>
          subscribed_resources;
    };
    void SendMessageLocked(const std::string& type_url);
    void AcceptLdsUpdate(XdsApi::LdsUpdateMap lds_update_map);
    void AcceptRdsUpdate(XdsApi::RdsUpdateMap rds_update_map);
    void AcceptCdsUpdate(XdsApi::CdsUpdateMap cds_update_map);
    void AcceptEdsUpdate(XdsApi::EdsUpdateMap eds_update_map);
    static void OnRequestSent(void* arg, grpc_error* error);
    void OnRequestSentLocked(grpc_error* error);
    static void OnResponseReceived(void* arg, grpc_error* error);
    bool OnResponseReceivedLocked();
    static void OnStatusReceived(void* arg, grpc_error* error);
    void OnStatusReceivedLocked(grpc_error* error);
    bool IsCurrentCallOnChannel() const;
    std::set<absl::string_view> ResourceNamesForRequest(
        const std::string& type_url);
    // The owning RetryableCall<>.
    RefCountedPtr<RetryableCall<AdsCallState>> parent_;
    bool sent_initial_message_ = false;
    bool seen_response_ = false;
    // Always non-NULL.
    grpc_call* call_;
    // recv_initial_metadata
    grpc_metadata_array initial_metadata_recv_;
    // send_message
    grpc_byte_buffer* send_message_payload_ = nullptr;
    grpc_closure on_request_sent_;
    // recv_message
    grpc_byte_buffer* recv_message_payload_ = nullptr;
    grpc_closure on_response_received_;
    // recv_trailing_metadata
    grpc_metadata_array trailing_metadata_recv_;
    grpc_status_code status_code_;
    grpc_slice status_details_;
    grpc_closure on_status_received_;
    // Resource types for which requests need to be sent.
    std::set<std::string /*type_url*/> buffered_requests_;
    // State for each resource type.
    std::map<std::string /*type_url*/, ResourceTypeState> state_map_;
  };
  // Contains an LRS call to the xds server.
  //
  // Besides the call itself, it stores the load-reporting configuration
  // received from the LRS server (which clusters to report, and the interval)
  // and the Reporter that sends the load reports.
  class XdsClient::ChannelState::LrsCallState
      : public InternallyRefCounted<LrsCallState> {
   public:
    // The ctor and dtor should not be used directly.
    explicit LrsCallState(RefCountedPtr<RetryableCall<LrsCallState>> parent);
    ~LrsCallState() override;
    void Orphan() override;
    // NOTE(review): implemented outside this view; presumably creates
    // reporter_ once reporting can begin — confirm.
    void MaybeStartReportingLocked();
    RetryableCall<LrsCallState>* parent() { return parent_.get(); }
    ChannelState* chand() const { return parent_->chand(); }
    XdsClient* xds_client() const { return chand()->xds_client(); }
    // Read by RetryableCall::OnCallFinishedLocked() to pick restart vs backoff.
    bool seen_response() const { return seen_response_; }

   private:
    // Reports client-side load stats according to a fixed interval.
    class Reporter : public InternallyRefCounted<Reporter> {
     public:
      // Schedules the first report from the constructor by calling
      // ScheduleNextReportLocked(), so it is presumably constructed with the
      // XdsClient mutex held — confirm at the call site.
      Reporter(RefCountedPtr<LrsCallState> parent, grpc_millis report_interval)
          : parent_(std::move(parent)), report_interval_(report_interval) {
        GRPC_CLOSURE_INIT(&on_next_report_timer_, OnNextReportTimer, this,
                          grpc_schedule_on_exec_ctx);
        GRPC_CLOSURE_INIT(&on_report_done_, OnReportDone, this,
                          grpc_schedule_on_exec_ctx);
        ScheduleNextReportLocked();
      }
      void Orphan() override;

     private:
      void ScheduleNextReportLocked();
      static void OnNextReportTimer(void* arg, grpc_error* error);
      bool OnNextReportTimerLocked(grpc_error* error);
      bool SendReportLocked();
      static void OnReportDone(void* arg, grpc_error* error);
      bool OnReportDoneLocked(grpc_error* error);
      // True iff this object is the reporter currently installed on its LRS
      // call (presumably used to detect a replaced reporter in callbacks).
      bool IsCurrentReporterOnCall() const {
        return this == parent_->reporter_.get();
      }
      XdsClient* xds_client() const { return parent_->xds_client(); }
      // The owning LRS call.
      RefCountedPtr<LrsCallState> parent_;
      // The load reporting state.
      const grpc_millis report_interval_;
      bool last_report_counters_were_zero_ = false;
      bool next_report_timer_callback_pending_ = false;
      grpc_timer next_report_timer_;
      grpc_closure on_next_report_timer_;
      grpc_closure on_report_done_;
    };
    static void OnInitialRequestSent(void* arg, grpc_error* error);
    void OnInitialRequestSentLocked();
    static void OnResponseReceived(void* arg, grpc_error* error);
    bool OnResponseReceivedLocked();
    static void OnStatusReceived(void* arg, grpc_error* error);
    void OnStatusReceivedLocked(grpc_error* error);
    bool IsCurrentCallOnChannel() const;
    // The owning RetryableCall<>.
    RefCountedPtr<RetryableCall<LrsCallState>> parent_;
    bool seen_response_ = false;
    // Always non-NULL.
    grpc_call* call_;
    // recv_initial_metadata
    grpc_metadata_array initial_metadata_recv_;
    // send_message
    grpc_byte_buffer* send_message_payload_ = nullptr;
    grpc_closure on_initial_request_sent_;
    // recv_message
    grpc_byte_buffer* recv_message_payload_ = nullptr;
    grpc_closure on_response_received_;
    // recv_trailing_metadata
    grpc_metadata_array trailing_metadata_recv_;
    grpc_status_code status_code_;
    grpc_slice status_details_;
    grpc_closure on_status_received_;
    // Load reporting state.
    bool send_all_clusters_ = false;
    std::set<std::string> cluster_names_; // Asked for by the LRS server.
    grpc_millis load_reporting_interval_ = 0;
    OrphanablePtr<Reporter> reporter_;
  };
  331. //
  332. // XdsClient::ChannelState::StateWatcher
  333. //
  334. class XdsClient::ChannelState::StateWatcher
  335. : public AsyncConnectivityStateWatcherInterface {
  336. public:
  337. explicit StateWatcher(RefCountedPtr<ChannelState> parent)
  338. : parent_(std::move(parent)) {}
  339. private:
  340. void OnConnectivityStateChange(grpc_connectivity_state new_state,
  341. const absl::Status& status) override {
  342. MutexLock lock(&parent_->xds_client_->mu_);
  343. if (!parent_->shutting_down_ &&
  344. new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
  345. // In TRANSIENT_FAILURE. Notify all watchers of error.
  346. gpr_log(GPR_INFO,
  347. "[xds_client %p] xds channel in state:TRANSIENT_FAILURE "
  348. "status_message:(%s)",
  349. parent_->xds_client(), status.ToString().c_str());
  350. parent_->xds_client()->NotifyOnErrorLocked(
  351. GRPC_ERROR_CREATE_FROM_STATIC_STRING(
  352. "xds channel in TRANSIENT_FAILURE"));
  353. }
  354. }
  355. RefCountedPtr<ChannelState> parent_;
  356. };
  357. //
  358. // XdsClient::ChannelState
  359. //
  360. namespace {
  361. grpc_channel* CreateXdsChannel(const XdsBootstrap::XdsServer& server) {
  362. // Build channel args.
  363. absl::InlinedVector<grpc_arg, 2> args_to_add = {
  364. grpc_channel_arg_integer_create(
  365. const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS),
  366. 5 * 60 * GPR_MS_PER_SEC),
  367. grpc_channel_arg_integer_create(
  368. const_cast<char*>(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL), 1),
  369. };
  370. grpc_channel_args* new_args = grpc_channel_args_copy_and_add(
  371. g_channel_args, args_to_add.data(), args_to_add.size());
  372. // Create channel creds.
  373. RefCountedPtr<grpc_channel_credentials> channel_creds =
  374. XdsChannelCredsRegistry::MakeChannelCreds(server.channel_creds_type,
  375. server.channel_creds_config);
  376. // Create channel.
  377. grpc_channel* channel = grpc_secure_channel_create(
  378. channel_creds.get(), server.server_uri.c_str(), new_args, nullptr);
  379. grpc_channel_args_destroy(new_args);
  380. return channel;
  381. }
  382. } // namespace
  383. XdsClient::ChannelState::ChannelState(WeakRefCountedPtr<XdsClient> xds_client,
  384. const XdsBootstrap::XdsServer& server)
  385. : InternallyRefCounted<ChannelState>(
  386. GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace)
  387. ? "ChannelState"
  388. : nullptr),
  389. xds_client_(std::move(xds_client)),
  390. server_(server) {
  391. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
  392. gpr_log(GPR_INFO, "[xds_client %p] creating channel to %s",
  393. xds_client_.get(), server.server_uri.c_str());
  394. }
  395. channel_ = CreateXdsChannel(server);
  396. GPR_ASSERT(channel_ != nullptr);
  397. StartConnectivityWatchLocked();
  398. }
  399. XdsClient::ChannelState::~ChannelState() {
  400. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
  401. gpr_log(GPR_INFO, "[xds_client %p] Destroying xds channel %p", xds_client(),
  402. this);
  403. }
  404. grpc_channel_destroy(channel_);
  405. xds_client_.reset(DEBUG_LOCATION, "ChannelState");
  406. }
  407. void XdsClient::ChannelState::Orphan() {
  408. shutting_down_ = true;
  409. CancelConnectivityWatchLocked();
  410. ads_calld_.reset();
  411. lrs_calld_.reset();
  412. Unref(DEBUG_LOCATION, "ChannelState+orphaned");
  413. }
  414. XdsClient::ChannelState::AdsCallState* XdsClient::ChannelState::ads_calld()
  415. const {
  416. return ads_calld_->calld();
  417. }
  418. XdsClient::ChannelState::LrsCallState* XdsClient::ChannelState::lrs_calld()
  419. const {
  420. return lrs_calld_->calld();
  421. }
  422. bool XdsClient::ChannelState::HasActiveAdsCall() const {
  423. return ads_calld_->calld() != nullptr;
  424. }
  425. void XdsClient::ChannelState::MaybeStartLrsCall() {
  426. if (lrs_calld_ != nullptr) return;
  427. lrs_calld_.reset(
  428. new RetryableCall<LrsCallState>(Ref(DEBUG_LOCATION, "ChannelState+lrs")));
  429. }
  430. void XdsClient::ChannelState::StopLrsCall() { lrs_calld_.reset(); }
  431. void XdsClient::ChannelState::StartConnectivityWatchLocked() {
  432. grpc_channel_element* client_channel_elem =
  433. grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_));
  434. GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
  435. watcher_ = new StateWatcher(Ref(DEBUG_LOCATION, "ChannelState+watch"));
  436. grpc_client_channel_start_connectivity_watch(
  437. client_channel_elem, GRPC_CHANNEL_IDLE,
  438. OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_));
  439. }
  440. void XdsClient::ChannelState::CancelConnectivityWatchLocked() {
  441. grpc_channel_element* client_channel_elem =
  442. grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_));
  443. GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
  444. grpc_client_channel_stop_connectivity_watch(client_channel_elem, watcher_);
  445. }
  446. void XdsClient::ChannelState::Subscribe(const std::string& type_url,
  447. const std::string& name) {
  448. if (ads_calld_ == nullptr) {
  449. // Start the ADS call if this is the first request.
  450. ads_calld_.reset(new RetryableCall<AdsCallState>(
  451. Ref(DEBUG_LOCATION, "ChannelState+ads")));
  452. // Note: AdsCallState's ctor will automatically subscribe to all
  453. // resources that the XdsClient already has watchers for, so we can
  454. // return here.
  455. return;
  456. }
  457. // If the ADS call is in backoff state, we don't need to do anything now
  458. // because when the call is restarted it will resend all necessary requests.
  459. if (ads_calld() == nullptr) return;
  460. // Subscribe to this resource if the ADS call is active.
  461. ads_calld()->Subscribe(type_url, name);
  462. }
  463. void XdsClient::ChannelState::Unsubscribe(const std::string& type_url,
  464. const std::string& name,
  465. bool delay_unsubscription) {
  466. if (ads_calld_ != nullptr) {
  467. auto* calld = ads_calld_->calld();
  468. if (calld != nullptr) {
  469. calld->Unsubscribe(type_url, name, delay_unsubscription);
  470. if (!calld->HasSubscribedResources()) ads_calld_.reset();
  471. }
  472. }
  473. }
  474. //
  475. // XdsClient::ChannelState::RetryableCall<>
  476. //
  477. template <typename T>
  478. XdsClient::ChannelState::RetryableCall<T>::RetryableCall(
  479. RefCountedPtr<ChannelState> chand)
  480. : chand_(std::move(chand)),
  481. backoff_(
  482. BackOff::Options()
  483. .set_initial_backoff(GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS *
  484. 1000)
  485. .set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER)
  486. .set_jitter(GRPC_XDS_RECONNECT_JITTER)
  487. .set_max_backoff(GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS * 1000)) {
  488. // Closure Initialization
  489. GRPC_CLOSURE_INIT(&on_retry_timer_, OnRetryTimer, this,
  490. grpc_schedule_on_exec_ctx);
  491. StartNewCallLocked();
  492. }
  493. template <typename T>
  494. void XdsClient::ChannelState::RetryableCall<T>::Orphan() {
  495. shutting_down_ = true;
  496. calld_.reset();
  497. if (retry_timer_callback_pending_) grpc_timer_cancel(&retry_timer_);
  498. this->Unref(DEBUG_LOCATION, "RetryableCall+orphaned");
  499. }
  500. template <typename T>
  501. void XdsClient::ChannelState::RetryableCall<T>::OnCallFinishedLocked() {
  502. const bool seen_response = calld_->seen_response();
  503. calld_.reset();
  504. if (seen_response) {
  505. // If we lost connection to the xds server, reset backoff and restart the
  506. // call immediately.
  507. backoff_.Reset();
  508. StartNewCallLocked();
  509. } else {
  510. // If we failed to connect to the xds server, retry later.
  511. StartRetryTimerLocked();
  512. }
  513. }
  514. template <typename T>
  515. void XdsClient::ChannelState::RetryableCall<T>::StartNewCallLocked() {
  516. if (shutting_down_) return;
  517. GPR_ASSERT(chand_->channel_ != nullptr);
  518. GPR_ASSERT(calld_ == nullptr);
  519. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
  520. gpr_log(GPR_INFO,
  521. "[xds_client %p] Start new call from retryable call (chand: %p, "
  522. "retryable call: %p)",
  523. chand()->xds_client(), chand(), this);
  524. }
  525. calld_ = MakeOrphanable<T>(
  526. this->Ref(DEBUG_LOCATION, "RetryableCall+start_new_call"));
  527. }
  528. template <typename T>
  529. void XdsClient::ChannelState::RetryableCall<T>::StartRetryTimerLocked() {
  530. if (shutting_down_) return;
  531. const grpc_millis next_attempt_time = backoff_.NextAttemptTime();
  532. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
  533. grpc_millis timeout = GPR_MAX(next_attempt_time - ExecCtx::Get()->Now(), 0);
  534. gpr_log(GPR_INFO,
  535. "[xds_client %p] Failed to connect to xds server (chand: %p) "
  536. "retry timer will fire in %" PRId64 "ms.",
  537. chand()->xds_client(), chand(), timeout);
  538. }
  539. this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start").release();
  540. grpc_timer_init(&retry_timer_, next_attempt_time, &on_retry_timer_);
  541. retry_timer_callback_pending_ = true;
  542. }
  543. template <typename T>
  544. void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimer(
  545. void* arg, grpc_error* error) {
  546. RetryableCall* calld = static_cast<RetryableCall*>(arg);
  547. {
  548. MutexLock lock(&calld->chand_->xds_client()->mu_);
  549. calld->OnRetryTimerLocked(GRPC_ERROR_REF(error));
  550. }
  551. calld->Unref(DEBUG_LOCATION, "RetryableCall+retry_timer_done");
  552. }
  553. template <typename T>
  554. void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimerLocked(
  555. grpc_error* error) {
  556. retry_timer_callback_pending_ = false;
  557. if (!shutting_down_ && error == GRPC_ERROR_NONE) {
  558. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
  559. gpr_log(
  560. GPR_INFO,
  561. "[xds_client %p] Retry timer fires (chand: %p, retryable call: %p)",
  562. chand()->xds_client(), chand(), this);
  563. }
  564. StartNewCallLocked();
  565. }
  566. GRPC_ERROR_UNREF(error);
  567. }
  568. //
  569. // XdsClient::ChannelState::AdsCallState
  570. //
  571. XdsClient::ChannelState::AdsCallState::AdsCallState(
  572. RefCountedPtr<RetryableCall<AdsCallState>> parent)
  573. : InternallyRefCounted<AdsCallState>(
  574. GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace)
  575. ? "AdsCallState"
  576. : nullptr),
  577. parent_(std::move(parent)) {
  578. // Init the ADS call. Note that the call will progress every time there's
  579. // activity in xds_client()->interested_parties_, which is comprised of
  580. // the polling entities from client_channel.
  581. GPR_ASSERT(xds_client() != nullptr);
  582. // Create a call with the specified method name.
  583. const auto& method =
  584. chand()->server_.ShouldUseV3()
  585. ? GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V3_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES
  586. : GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V2_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES;
  587. call_ = grpc_channel_create_pollset_set_call(
  588. chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
  589. xds_client()->interested_parties_, method, nullptr,
  590. GRPC_MILLIS_INF_FUTURE, nullptr);
  591. GPR_ASSERT(call_ != nullptr);
  592. // Init data associated with the call.
  593. grpc_metadata_array_init(&initial_metadata_recv_);
  594. grpc_metadata_array_init(&trailing_metadata_recv_);
  595. // Start the call.
  596. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
  597. gpr_log(GPR_INFO,
  598. "[xds_client %p] Starting ADS call (chand: %p, calld: %p, "
  599. "call: %p)",
  600. xds_client(), chand(), this, call_);
  601. }
  602. // Create the ops.
  603. grpc_call_error call_error;
  604. grpc_op ops[3];
  605. memset(ops, 0, sizeof(ops));
  606. // Op: send initial metadata.
  607. grpc_op* op = ops;
  608. op->op = GRPC_OP_SEND_INITIAL_METADATA;
  609. op->data.send_initial_metadata.count = 0;
  610. op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
  611. GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
  612. op->reserved = nullptr;
  613. op++;
  614. call_error = grpc_call_start_batch_and_execute(
  615. call_, ops, static_cast<size_t>(op - ops), nullptr);
  616. GPR_ASSERT(GRPC_CALL_OK == call_error);
  617. // Op: send request message.
  618. GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
  619. grpc_schedule_on_exec_ctx);
  620. for (const auto& p : xds_client()->listener_map_) {
  621. Subscribe(XdsApi::kLdsTypeUrl, std::string(p.first));
  622. }
  623. for (const auto& p : xds_client()->route_config_map_) {
  624. Subscribe(XdsApi::kRdsTypeUrl, std::string(p.first));
  625. }
  626. for (const auto& p : xds_client()->cluster_map_) {
  627. Subscribe(XdsApi::kCdsTypeUrl, std::string(p.first));
  628. }
  629. for (const auto& p : xds_client()->endpoint_map_) {
  630. Subscribe(XdsApi::kEdsTypeUrl, std::string(p.first));
  631. }
  632. // Op: recv initial metadata.
  633. op = ops;
  634. op->op = GRPC_OP_RECV_INITIAL_METADATA;
  635. op->data.recv_initial_metadata.recv_initial_metadata =
  636. &initial_metadata_recv_;
  637. op->flags = 0;
  638. op->reserved = nullptr;
  639. op++;
  640. // Op: recv response.
  641. op->op = GRPC_OP_RECV_MESSAGE;
  642. op->data.recv_message.recv_message = &recv_message_payload_;
  643. op->flags = 0;
  644. op->reserved = nullptr;
  645. op++;
  646. Ref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked").release();
  647. GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this,
  648. grpc_schedule_on_exec_ctx);
  649. call_error = grpc_call_start_batch_and_execute(
  650. call_, ops, static_cast<size_t>(op - ops), &on_response_received_);
  651. GPR_ASSERT(GRPC_CALL_OK == call_error);
  652. // Op: recv server status.
  653. op = ops;
  654. op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
  655. op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
  656. op->data.recv_status_on_client.status = &status_code_;
  657. op->data.recv_status_on_client.status_details = &status_details_;
  658. op->flags = 0;
  659. op->reserved = nullptr;
  660. op++;
  661. // This callback signals the end of the call, so it relies on the initial
  662. // ref instead of a new ref. When it's invoked, it's the initial ref that is
  663. // unreffed.
  664. GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this,
  665. grpc_schedule_on_exec_ctx);
  666. call_error = grpc_call_start_batch_and_execute(
  667. call_, ops, static_cast<size_t>(op - ops), &on_status_received_);
  668. GPR_ASSERT(GRPC_CALL_OK == call_error);
  669. }
  670. XdsClient::ChannelState::AdsCallState::~AdsCallState() {
  671. grpc_metadata_array_destroy(&initial_metadata_recv_);
  672. grpc_metadata_array_destroy(&trailing_metadata_recv_);
  673. grpc_byte_buffer_destroy(send_message_payload_);
  674. grpc_byte_buffer_destroy(recv_message_payload_);
  675. grpc_slice_unref_internal(status_details_);
  676. GPR_ASSERT(call_ != nullptr);
  677. grpc_call_unref(call_);
  678. }
  679. void XdsClient::ChannelState::AdsCallState::Orphan() {
  680. GPR_ASSERT(call_ != nullptr);
  681. // If we are here because xds_client wants to cancel the call,
  682. // on_status_received_ will complete the cancellation and clean up. Otherwise,
  683. // we are here because xds_client has to orphan a failed call, then the
  684. // following cancellation will be a no-op.
  685. grpc_call_cancel_internal(call_);
  686. state_map_.clear();
  687. // Note that the initial ref is hold by on_status_received_. So the
  688. // corresponding unref happens in on_status_received_ instead of here.
  689. }
  690. void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
  691. const std::string& type_url) {
  692. // Buffer message sending if an existing message is in flight.
  693. if (send_message_payload_ != nullptr) {
  694. buffered_requests_.insert(type_url);
  695. return;
  696. }
  697. auto& state = state_map_[type_url];
  698. grpc_slice request_payload_slice;
  699. std::set<absl::string_view> resource_names =
  700. ResourceNamesForRequest(type_url);
  701. request_payload_slice = xds_client()->api_.CreateAdsRequest(
  702. chand()->server_, type_url, resource_names,
  703. xds_client()->resource_version_map_[type_url], state.nonce,
  704. GRPC_ERROR_REF(state.error), !sent_initial_message_);
  705. if (type_url != XdsApi::kLdsTypeUrl && type_url != XdsApi::kRdsTypeUrl &&
  706. type_url != XdsApi::kCdsTypeUrl && type_url != XdsApi::kEdsTypeUrl) {
  707. state_map_.erase(type_url);
  708. }
  709. sent_initial_message_ = true;
  710. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
  711. gpr_log(GPR_INFO,
  712. "[xds_client %p] sending ADS request: type=%s version=%s nonce=%s "
  713. "error=%s resources=%s",
  714. xds_client(), type_url.c_str(),
  715. xds_client()->resource_version_map_[type_url].c_str(),
  716. state.nonce.c_str(), grpc_error_string(state.error),
  717. absl::StrJoin(resource_names, " ").c_str());
  718. }
  719. GRPC_ERROR_UNREF(state.error);
  720. state.error = GRPC_ERROR_NONE;
  721. // Create message payload.
  722. send_message_payload_ =
  723. grpc_raw_byte_buffer_create(&request_payload_slice, 1);
  724. grpc_slice_unref_internal(request_payload_slice);
  725. // Send the message.
  726. grpc_op op;
  727. memset(&op, 0, sizeof(op));
  728. op.op = GRPC_OP_SEND_MESSAGE;
  729. op.data.send_message.send_message = send_message_payload_;
  730. Ref(DEBUG_LOCATION, "ADS+OnRequestSentLocked").release();
  731. GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
  732. grpc_schedule_on_exec_ctx);
  733. grpc_call_error call_error =
  734. grpc_call_start_batch_and_execute(call_, &op, 1, &on_request_sent_);
  735. if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
  736. gpr_log(GPR_ERROR,
  737. "[xds_client %p] calld=%p call_error=%d sending ADS message",
  738. xds_client(), this, call_error);
  739. GPR_ASSERT(GRPC_CALL_OK == call_error);
  740. }
  741. }
  742. void XdsClient::ChannelState::AdsCallState::Subscribe(
  743. const std::string& type_url, const std::string& name) {
  744. auto& state = state_map_[type_url].subscribed_resources[name];
  745. if (state == nullptr) {
  746. state = MakeOrphanable<ResourceState>(
  747. type_url, name, !xds_client()->resource_version_map_[type_url].empty());
  748. SendMessageLocked(type_url);
  749. }
  750. }
  751. void XdsClient::ChannelState::AdsCallState::Unsubscribe(
  752. const std::string& type_url, const std::string& name,
  753. bool delay_unsubscription) {
  754. state_map_[type_url].subscribed_resources.erase(name);
  755. if (!delay_unsubscription) SendMessageLocked(type_url);
  756. }
  757. bool XdsClient::ChannelState::AdsCallState::HasSubscribedResources() const {
  758. for (const auto& p : state_map_) {
  759. if (!p.second.subscribed_resources.empty()) return true;
  760. }
  761. return false;
  762. }
  763. void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
  764. XdsApi::LdsUpdateMap lds_update_map) {
  765. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
  766. gpr_log(GPR_INFO,
  767. "[xds_client %p] LDS update received containing %" PRIuPTR
  768. " resources",
  769. xds_client(), lds_update_map.size());
  770. }
  771. auto& lds_state = state_map_[XdsApi::kLdsTypeUrl];
  772. std::set<std::string> rds_resource_names_seen;
  773. for (auto& p : lds_update_map) {
  774. const std::string& listener_name = p.first;
  775. XdsApi::LdsUpdate& lds_update = p.second;
  776. auto& state = lds_state.subscribed_resources[listener_name];
  777. if (state != nullptr) state->Finish();
  778. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
  779. gpr_log(GPR_INFO, "[xds_client %p] LDS resource %s: %s", xds_client(),
  780. listener_name.c_str(), lds_update.ToString().c_str());
  781. }
  782. // Record the RDS resource names seen.
  783. if (!lds_update.http_connection_manager.route_config_name.empty()) {
  784. rds_resource_names_seen.insert(
  785. lds_update.http_connection_manager.route_config_name);
  786. }
  787. // Ignore identical update.
  788. ListenerState& listener_state = xds_client()->listener_map_[listener_name];
  789. if (listener_state.update.has_value() &&
  790. *listener_state.update == lds_update) {
  791. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
  792. gpr_log(GPR_INFO,
  793. "[xds_client %p] LDS update for %s identical to current, "
  794. "ignoring.",
  795. xds_client(), listener_name.c_str());
  796. }
  797. continue;
  798. }
  799. // Update the listener state.
  800. listener_state.update = std::move(lds_update);
  801. // Notify watchers.
  802. for (const auto& p : listener_state.watchers) {
  803. p.first->OnListenerChanged(*listener_state.update);
  804. }
  805. }
  806. // For any subscribed resource that is not present in the update,
  807. // remove it from the cache and notify watchers that it does not exist.
  808. for (const auto& p : lds_state.subscribed_resources) {
  809. const std::string& listener_name = p.first;
  810. if (lds_update_map.find(listener_name) == lds_update_map.end()) {
  811. ListenerState& listener_state =
  812. xds_client()->listener_map_[listener_name];
  813. // If the resource was newly requested but has not yet been received,
  814. // we don't want to generate an error for the watchers, because this LDS
  815. // response may be in reaction to an earlier request that did not yet
  816. // request the new resource, so its absence from the response does not
  817. // necessarily indicate that the resource does not exist.
  818. // For that case, we rely on the request timeout instead.
  819. if (!listener_state.update.has_value()) continue;
  820. listener_state.update.reset();
  821. for (const auto& p : listener_state.watchers) {
  822. p.first->OnResourceDoesNotExist();
  823. }
  824. }
  825. }
  826. // For any RDS resource that is no longer referred to by any LDS
  827. // resources, remove it from the cache and notify watchers that it
  828. // does not exist.
  829. auto& rds_state = state_map_[XdsApi::kRdsTypeUrl];
  830. for (const auto& p : rds_state.subscribed_resources) {
  831. const std::string& rds_resource_name = p.first;
  832. if (rds_resource_names_seen.find(rds_resource_name) ==
  833. rds_resource_names_seen.end()) {
  834. RouteConfigState& route_config_state =
  835. xds_client()->route_config_map_[rds_resource_name];
  836. route_config_state.update.reset();
  837. for (const auto& p : route_config_state.watchers) {
  838. p.first->OnResourceDoesNotExist();
  839. }
  840. }
  841. }
  842. }
  843. void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate(
  844. XdsApi::RdsUpdateMap rds_update_map) {
  845. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
  846. gpr_log(GPR_INFO,
  847. "[xds_client %p] RDS update received containing %" PRIuPTR
  848. " resources",
  849. xds_client(), rds_update_map.size());
  850. }
  851. auto& rds_state = state_map_[XdsApi::kRdsTypeUrl];
  852. for (auto& p : rds_update_map) {
  853. const std::string& route_config_name = p.first;
  854. XdsApi::RdsUpdate& rds_update = p.second;
  855. auto& state = rds_state.subscribed_resources[route_config_name];
  856. if (state != nullptr) state->Finish();
  857. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
  858. gpr_log(GPR_INFO, "[xds_client %p] RDS resource:\n%s", xds_client(),
  859. rds_update.ToString().c_str());
  860. }
  861. RouteConfigState& route_config_state =
  862. xds_client()->route_config_map_[route_config_name];
  863. // Ignore identical update.
  864. if (route_config_state.update.has_value() &&
  865. *route_config_state.update == rds_update) {
  866. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
  867. gpr_log(GPR_INFO,
  868. "[xds_client %p] RDS resource identical to current, ignoring",
  869. xds_client());
  870. }
  871. continue;
  872. }
  873. // Update the cache.
  874. route_config_state.update = std::move(rds_update);
  875. // Notify all watchers.
  876. for (const auto& p : route_config_state.watchers) {
  877. p.first->OnRouteConfigChanged(*route_config_state.update);
  878. }
  879. }
  880. }
  881. void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate(
  882. XdsApi::CdsUpdateMap cds_update_map) {
  883. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
  884. gpr_log(GPR_INFO,
  885. "[xds_client %p] CDS update received containing %" PRIuPTR
  886. " resources",
  887. xds_client(), cds_update_map.size());
  888. }
  889. auto& cds_state = state_map_[XdsApi::kCdsTypeUrl];
  890. std::set<std::string> eds_resource_names_seen;
  891. for (auto& p : cds_update_map) {
  892. const char* cluster_name = p.first.c_str();
  893. XdsApi::CdsUpdate& cds_update = p.second;
  894. auto& state = cds_state.subscribed_resources[cluster_name];
  895. if (state != nullptr) state->Finish();
  896. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
  897. gpr_log(GPR_INFO, "[xds_client %p] cluster=%s: %s", xds_client(),
  898. cluster_name, cds_update.ToString().c_str());
  899. }
  900. // Record the EDS resource names seen.
  901. eds_resource_names_seen.insert(cds_update.eds_service_name.empty()
  902. ? cluster_name
  903. : cds_update.eds_service_name);
  904. // Ignore identical update.
  905. ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name];
  906. if (cluster_state.update.has_value() &&
  907. *cluster_state.update == cds_update) {
  908. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
  909. gpr_log(GPR_INFO,
  910. "[xds_client %p] CDS update identical to current, ignoring.",
  911. xds_client());
  912. }
  913. continue;
  914. }
  915. // Update the cluster state.
  916. cluster_state.update = std::move(cds_update);
  917. // Notify all watchers.
  918. for (const auto& p : cluster_state.watchers) {
  919. p.first->OnClusterChanged(cluster_state.update.value());
  920. }
  921. }
  922. // For any subscribed resource that is not present in the update,
  923. // remove it from the cache and notify watchers that it does not exist.
  924. for (const auto& p : cds_state.subscribed_resources) {
  925. const std::string& cluster_name = p.first;
  926. if (cds_update_map.find(cluster_name) == cds_update_map.end()) {
  927. ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name];
  928. // If the resource was newly requested but has not yet been received,
  929. // we don't want to generate an error for the watchers, because this CDS
  930. // response may be in reaction to an earlier request that did not yet
  931. // request the new resource, so its absence from the response does not
  932. // necessarily indicate that the resource does not exist.
  933. // For that case, we rely on the request timeout instead.
  934. if (!cluster_state.update.has_value()) continue;
  935. cluster_state.update.reset();
  936. for (const auto& p : cluster_state.watchers) {
  937. p.first->OnResourceDoesNotExist();
  938. }
  939. }
  940. }
  941. // For any EDS resource that is no longer referred to by any CDS
  942. // resources, remove it from the cache and notify watchers that it
  943. // does not exist.
  944. auto& eds_state = state_map_[XdsApi::kEdsTypeUrl];
  945. for (const auto& p : eds_state.subscribed_resources) {
  946. const std::string& eds_resource_name = p.first;
  947. if (eds_resource_names_seen.find(eds_resource_name) ==
  948. eds_resource_names_seen.end()) {
  949. EndpointState& endpoint_state =
  950. xds_client()->endpoint_map_[eds_resource_name];
  951. endpoint_state.update.reset();
  952. for (const auto& p : endpoint_state.watchers) {
  953. p.first->OnResourceDoesNotExist();
  954. }
  955. }
  956. }
  957. }
  958. void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdate(
  959. XdsApi::EdsUpdateMap eds_update_map) {
  960. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
  961. gpr_log(GPR_INFO,
  962. "[xds_client %p] EDS update received containing %" PRIuPTR
  963. " resources",
  964. xds_client(), eds_update_map.size());
  965. }
  966. auto& eds_state = state_map_[XdsApi::kEdsTypeUrl];
  967. for (auto& p : eds_update_map) {
  968. const char* eds_service_name = p.first.c_str();
  969. XdsApi::EdsUpdate& eds_update = p.second;
  970. auto& state = eds_state.subscribed_resources[eds_service_name];
  971. if (state != nullptr) state->Finish();
  972. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
  973. gpr_log(GPR_INFO, "[xds_client %p] EDS resource %s: %s", xds_client(),
  974. eds_service_name, eds_update.ToString().c_str());
  975. }
  976. EndpointState& endpoint_state =
  977. xds_client()->endpoint_map_[eds_service_name];
  978. // Ignore identical update.
  979. if (endpoint_state.update.has_value() &&
  980. *endpoint_state.update == eds_update) {
  981. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
  982. gpr_log(GPR_INFO,
  983. "[xds_client %p] EDS update identical to current, ignoring.",
  984. xds_client());
  985. }
  986. continue;
  987. }
  988. // Update the cluster state.
  989. endpoint_state.update = std::move(eds_update);
  990. // Notify all watchers.
  991. for (const auto& p : endpoint_state.watchers) {
  992. p.first->OnEndpointChanged(endpoint_state.update.value());
  993. }
  994. }
  995. }
  996. void XdsClient::ChannelState::AdsCallState::OnRequestSent(void* arg,
  997. grpc_error* error) {
  998. AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
  999. {
  1000. MutexLock lock(&ads_calld->xds_client()->mu_);
  1001. ads_calld->OnRequestSentLocked(GRPC_ERROR_REF(error));
  1002. }
  1003. ads_calld->Unref(DEBUG_LOCATION, "ADS+OnRequestSentLocked");
  1004. }
  1005. void XdsClient::ChannelState::AdsCallState::OnRequestSentLocked(
  1006. grpc_error* error) {
  1007. if (IsCurrentCallOnChannel() && error == GRPC_ERROR_NONE) {
  1008. // Clean up the sent message.
  1009. grpc_byte_buffer_destroy(send_message_payload_);
  1010. send_message_payload_ = nullptr;
  1011. // Continue to send another pending message if any.
  1012. // TODO(roth): The current code to handle buffered messages has the
  1013. // advantage of sending only the most recent list of resource names for
  1014. // each resource type (no matter how many times that resource type has
  1015. // been requested to send while the current message sending is still
  1016. // pending). But its disadvantage is that we send the requests in fixed
  1017. // order of resource types. We need to fix this if we are seeing some
  1018. // resource type(s) starved due to frequent requests of other resource
  1019. // type(s).
  1020. auto it = buffered_requests_.begin();
  1021. if (it != buffered_requests_.end()) {
  1022. SendMessageLocked(*it);
  1023. buffered_requests_.erase(it);
  1024. }
  1025. }
  1026. GRPC_ERROR_UNREF(error);
  1027. }
  1028. void XdsClient::ChannelState::AdsCallState::OnResponseReceived(
  1029. void* arg, grpc_error* /* error */) {
  1030. AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
  1031. bool done;
  1032. {
  1033. MutexLock lock(&ads_calld->xds_client()->mu_);
  1034. done = ads_calld->OnResponseReceivedLocked();
  1035. }
  1036. if (done) ads_calld->Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked");
  1037. }
  1038. bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() {
  1039. // Empty payload means the call was cancelled.
  1040. if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) {
  1041. return true;
  1042. }
  1043. // Read the response.
  1044. grpc_byte_buffer_reader bbr;
  1045. grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
  1046. grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
  1047. grpc_byte_buffer_reader_destroy(&bbr);
  1048. grpc_byte_buffer_destroy(recv_message_payload_);
  1049. recv_message_payload_ = nullptr;
  1050. // Parse and validate the response.
  1051. XdsApi::AdsParseResult result = xds_client()->api_.ParseAdsResponse(
  1052. chand()->server_, response_slice,
  1053. ResourceNamesForRequest(XdsApi::kLdsTypeUrl),
  1054. ResourceNamesForRequest(XdsApi::kRdsTypeUrl),
  1055. ResourceNamesForRequest(XdsApi::kCdsTypeUrl),
  1056. ResourceNamesForRequest(XdsApi::kEdsTypeUrl));
  1057. grpc_slice_unref_internal(response_slice);
  1058. if (result.type_url.empty()) {
  1059. // Ignore unparsable response.
  1060. gpr_log(GPR_ERROR,
  1061. "[xds_client %p] Error parsing ADS response (%s) -- ignoring",
  1062. xds_client(), grpc_error_string(result.parse_error));
  1063. GRPC_ERROR_UNREF(result.parse_error);
  1064. } else {
  1065. // Update nonce.
  1066. auto& state = state_map_[result.type_url];
  1067. state.nonce = std::move(result.nonce);
  1068. // NACK or ACK the response.
  1069. if (result.parse_error != GRPC_ERROR_NONE) {
  1070. GRPC_ERROR_UNREF(state.error);
  1071. state.error = result.parse_error;
  1072. // NACK unacceptable update.
  1073. gpr_log(GPR_ERROR,
  1074. "[xds_client %p] ADS response invalid for resource type %s "
  1075. "version %s, will NACK: nonce=%s error=%s",
  1076. xds_client(), result.type_url.c_str(), result.version.c_str(),
  1077. state.nonce.c_str(), grpc_error_string(result.parse_error));
  1078. SendMessageLocked(result.type_url);
  1079. } else {
  1080. seen_response_ = true;
  1081. // Accept the ADS response according to the type_url.
  1082. if (result.type_url == XdsApi::kLdsTypeUrl) {
  1083. AcceptLdsUpdate(std::move(result.lds_update_map));
  1084. } else if (result.type_url == XdsApi::kRdsTypeUrl) {
  1085. AcceptRdsUpdate(std::move(result.rds_update_map));
  1086. } else if (result.type_url == XdsApi::kCdsTypeUrl) {
  1087. AcceptCdsUpdate(std::move(result.cds_update_map));
  1088. } else if (result.type_url == XdsApi::kEdsTypeUrl) {
  1089. AcceptEdsUpdate(std::move(result.eds_update_map));
  1090. }
  1091. xds_client()->resource_version_map_[result.type_url] =
  1092. std::move(result.version);
  1093. // ACK the update.
  1094. SendMessageLocked(result.type_url);
  1095. // Start load reporting if needed.
  1096. auto& lrs_call = chand()->lrs_calld_;
  1097. if (lrs_call != nullptr) {
  1098. LrsCallState* lrs_calld = lrs_call->calld();
  1099. if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked();
  1100. }
  1101. }
  1102. }
  1103. if (xds_client()->shutting_down_) return true;
  1104. // Keep listening for updates.
  1105. grpc_op op;
  1106. memset(&op, 0, sizeof(op));
  1107. op.op = GRPC_OP_RECV_MESSAGE;
  1108. op.data.recv_message.recv_message = &recv_message_payload_;
  1109. op.flags = 0;
  1110. op.reserved = nullptr;
  1111. GPR_ASSERT(call_ != nullptr);
  1112. // Reuse the "ADS+OnResponseReceivedLocked" ref taken in ctor.
  1113. const grpc_call_error call_error =
  1114. grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
  1115. GPR_ASSERT(GRPC_CALL_OK == call_error);
  1116. return false;
  1117. }
  1118. void XdsClient::ChannelState::AdsCallState::OnStatusReceived(
  1119. void* arg, grpc_error* error) {
  1120. AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
  1121. {
  1122. MutexLock lock(&ads_calld->xds_client()->mu_);
  1123. ads_calld->OnStatusReceivedLocked(GRPC_ERROR_REF(error));
  1124. }
  1125. ads_calld->Unref(DEBUG_LOCATION, "ADS+OnStatusReceivedLocked");
  1126. }
  1127. void XdsClient::ChannelState::AdsCallState::OnStatusReceivedLocked(
  1128. grpc_error* error) {
  1129. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
  1130. char* status_details = grpc_slice_to_c_string(status_details_);
  1131. gpr_log(GPR_INFO,
  1132. "[xds_client %p] ADS call status received. Status = %d, details "
  1133. "= '%s', (chand: %p, ads_calld: %p, call: %p), error '%s'",
  1134. xds_client(), status_code_, status_details, chand(), this, call_,
  1135. grpc_error_string(error));
  1136. gpr_free(status_details);
  1137. }
  1138. // Ignore status from a stale call.
  1139. if (IsCurrentCallOnChannel()) {
  1140. // Try to restart the call.
  1141. parent_->OnCallFinishedLocked();
  1142. // Send error to all watchers.
  1143. xds_client()->NotifyOnErrorLocked(
  1144. GRPC_ERROR_CREATE_FROM_STATIC_STRING("xds call failed"));
  1145. }
  1146. GRPC_ERROR_UNREF(error);
  1147. }
  1148. bool XdsClient::ChannelState::AdsCallState::IsCurrentCallOnChannel() const {
  1149. // If the retryable ADS call is null (which only happens when the xds channel
  1150. // is shutting down), all the ADS calls are stale.
  1151. if (chand()->ads_calld_ == nullptr) return false;
  1152. return this == chand()->ads_calld_->calld();
  1153. }
  1154. std::set<absl::string_view>
  1155. XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest(
  1156. const std::string& type_url) {
  1157. std::set<absl::string_view> resource_names;
  1158. auto it = state_map_.find(type_url);
  1159. if (it != state_map_.end()) {
  1160. for (auto& p : it->second.subscribed_resources) {
  1161. resource_names.insert(p.first);
  1162. OrphanablePtr<ResourceState>& state = p.second;
  1163. state->Start(Ref(DEBUG_LOCATION, "ResourceState"));
  1164. }
  1165. }
  1166. return resource_names;
  1167. }
  1168. //
  1169. // XdsClient::ChannelState::LrsCallState::Reporter
  1170. //
  1171. void XdsClient::ChannelState::LrsCallState::Reporter::Orphan() {
  1172. if (next_report_timer_callback_pending_) {
  1173. grpc_timer_cancel(&next_report_timer_);
  1174. }
  1175. }
  1176. void XdsClient::ChannelState::LrsCallState::Reporter::
  1177. ScheduleNextReportLocked() {
  1178. const grpc_millis next_report_time = ExecCtx::Get()->Now() + report_interval_;
  1179. grpc_timer_init(&next_report_timer_, next_report_time,
  1180. &on_next_report_timer_);
  1181. next_report_timer_callback_pending_ = true;
  1182. }
  1183. void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimer(
  1184. void* arg, grpc_error* error) {
  1185. Reporter* self = static_cast<Reporter*>(arg);
  1186. bool done;
  1187. {
  1188. MutexLock lock(&self->xds_client()->mu_);
  1189. done = self->OnNextReportTimerLocked(GRPC_ERROR_REF(error));
  1190. }
  1191. if (done) self->Unref(DEBUG_LOCATION, "Reporter+timer");
  1192. }
  1193. bool XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimerLocked(
  1194. grpc_error* error) {
  1195. next_report_timer_callback_pending_ = false;
  1196. if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) {
  1197. GRPC_ERROR_UNREF(error);
  1198. return true;
  1199. }
  1200. return SendReportLocked();
  1201. }
  1202. namespace {
  1203. bool LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap& snapshot) {
  1204. for (const auto& p : snapshot) {
  1205. const XdsApi::ClusterLoadReport& cluster_snapshot = p.second;
  1206. if (!cluster_snapshot.dropped_requests.IsZero()) return false;
  1207. for (const auto& q : cluster_snapshot.locality_stats) {
  1208. const XdsClusterLocalityStats::Snapshot& locality_snapshot = q.second;
  1209. if (!locality_snapshot.IsZero()) return false;
  1210. }
  1211. }
  1212. return true;
  1213. }
  1214. } // namespace
  1215. bool XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
  1216. // Construct snapshot from all reported stats.
  1217. XdsApi::ClusterLoadReportMap snapshot =
  1218. xds_client()->BuildLoadReportSnapshotLocked(parent_->send_all_clusters_,
  1219. parent_->cluster_names_);
  1220. // Skip client load report if the counters were all zero in the last
  1221. // report and they are still zero in this one.
  1222. const bool old_val = last_report_counters_were_zero_;
  1223. last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot);
  1224. if (old_val && last_report_counters_were_zero_) {
  1225. if (xds_client()->load_report_map_.empty()) {
  1226. parent_->chand()->StopLrsCall();
  1227. return true;
  1228. }
  1229. ScheduleNextReportLocked();
  1230. return false;
  1231. }
  1232. // Create a request that contains the snapshot.
  1233. grpc_slice request_payload_slice =
  1234. xds_client()->api_.CreateLrsRequest(std::move(snapshot));
  1235. parent_->send_message_payload_ =
  1236. grpc_raw_byte_buffer_create(&request_payload_slice, 1);
  1237. grpc_slice_unref_internal(request_payload_slice);
  1238. // Send the report.
  1239. grpc_op op;
  1240. memset(&op, 0, sizeof(op));
  1241. op.op = GRPC_OP_SEND_MESSAGE;
  1242. op.data.send_message.send_message = parent_->send_message_payload_;
  1243. grpc_call_error call_error = grpc_call_start_batch_and_execute(
  1244. parent_->call_, &op, 1, &on_report_done_);
  1245. if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
  1246. gpr_log(GPR_ERROR,
  1247. "[xds_client %p] calld=%p call_error=%d sending client load report",
  1248. xds_client(), this, call_error);
  1249. GPR_ASSERT(GRPC_CALL_OK == call_error);
  1250. }
  1251. return false;
  1252. }
  1253. void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDone(
  1254. void* arg, grpc_error* error) {
  1255. Reporter* self = static_cast<Reporter*>(arg);
  1256. bool done;
  1257. {
  1258. MutexLock lock(&self->xds_client()->mu_);
  1259. done = self->OnReportDoneLocked(GRPC_ERROR_REF(error));
  1260. }
  1261. if (done) self->Unref(DEBUG_LOCATION, "Reporter+report_done");
  1262. }
  1263. bool XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked(
  1264. grpc_error* error) {
  1265. grpc_byte_buffer_destroy(parent_->send_message_payload_);
  1266. parent_->send_message_payload_ = nullptr;
  1267. // If there are no more registered stats to report, cancel the call.
  1268. if (xds_client()->load_report_map_.empty()) {
  1269. parent_->chand()->StopLrsCall();
  1270. GRPC_ERROR_UNREF(error);
  1271. return true;
  1272. }
  1273. if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) {
  1274. GRPC_ERROR_UNREF(error);
  1275. // If this reporter is no longer the current one on the call, the reason
  1276. // might be that it was orphaned for a new one due to config update.
  1277. if (!IsCurrentReporterOnCall()) {
  1278. parent_->MaybeStartReportingLocked();
  1279. }
  1280. return true;
  1281. }
  1282. ScheduleNextReportLocked();
  1283. return false;
  1284. }
  1285. //
  1286. // XdsClient::ChannelState::LrsCallState
  1287. //
  1288. XdsClient::ChannelState::LrsCallState::LrsCallState(
  1289. RefCountedPtr<RetryableCall<LrsCallState>> parent)
  1290. : InternallyRefCounted<LrsCallState>(
  1291. GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace)
  1292. ? "LrsCallState"
  1293. : nullptr),
  1294. parent_(std::move(parent)) {
  1295. // Init the LRS call. Note that the call will progress every time there's
  1296. // activity in xds_client()->interested_parties_, which is comprised of
  1297. // the polling entities from client_channel.
  1298. GPR_ASSERT(xds_client() != nullptr);
  1299. const auto& method =
  1300. chand()->server_.ShouldUseV3()
  1301. ? GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V3_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS
  1302. : GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V2_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS;
  1303. call_ = grpc_channel_create_pollset_set_call(
  1304. chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
  1305. xds_client()->interested_parties_, method, nullptr,
  1306. GRPC_MILLIS_INF_FUTURE, nullptr);
  1307. GPR_ASSERT(call_ != nullptr);
  1308. // Init the request payload.
  1309. grpc_slice request_payload_slice =
  1310. xds_client()->api_.CreateLrsInitialRequest(chand()->server_);
  1311. send_message_payload_ =
  1312. grpc_raw_byte_buffer_create(&request_payload_slice, 1);
  1313. grpc_slice_unref_internal(request_payload_slice);
  1314. // Init other data associated with the LRS call.
  1315. grpc_metadata_array_init(&initial_metadata_recv_);
  1316. grpc_metadata_array_init(&trailing_metadata_recv_);
  1317. // Start the call.
  1318. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
  1319. gpr_log(GPR_INFO,
  1320. "[xds_client %p] Starting LRS call (chand: %p, calld: %p, "
  1321. "call: %p)",
  1322. xds_client(), chand(), this, call_);
  1323. }
  1324. // Create the ops.
  1325. grpc_call_error call_error;
  1326. grpc_op ops[3];
  1327. memset(ops, 0, sizeof(ops));
  1328. // Op: send initial metadata.
  1329. grpc_op* op = ops;
  1330. op->op = GRPC_OP_SEND_INITIAL_METADATA;
  1331. op->data.send_initial_metadata.count = 0;
  1332. op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
  1333. GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
  1334. op->reserved = nullptr;
  1335. op++;
  1336. // Op: send request message.
  1337. GPR_ASSERT(send_message_payload_ != nullptr);
  1338. op->op = GRPC_OP_SEND_MESSAGE;
  1339. op->data.send_message.send_message = send_message_payload_;
  1340. op->flags = 0;
  1341. op->reserved = nullptr;
  1342. op++;
  1343. Ref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked").release();
  1344. GRPC_CLOSURE_INIT(&on_initial_request_sent_, OnInitialRequestSent, this,
  1345. grpc_schedule_on_exec_ctx);
  1346. call_error = grpc_call_start_batch_and_execute(
  1347. call_, ops, static_cast<size_t>(op - ops), &on_initial_request_sent_);
  1348. GPR_ASSERT(GRPC_CALL_OK == call_error);
  1349. // Op: recv initial metadata.
  1350. op = ops;
  1351. op->op = GRPC_OP_RECV_INITIAL_METADATA;
  1352. op->data.recv_initial_metadata.recv_initial_metadata =
  1353. &initial_metadata_recv_;
  1354. op->flags = 0;
  1355. op->reserved = nullptr;
  1356. op++;
  1357. // Op: recv response.
  1358. op->op = GRPC_OP_RECV_MESSAGE;
  1359. op->data.recv_message.recv_message = &recv_message_payload_;
  1360. op->flags = 0;
  1361. op->reserved = nullptr;
  1362. op++;
  1363. Ref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked").release();
  1364. GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this,
  1365. grpc_schedule_on_exec_ctx);
  1366. call_error = grpc_call_start_batch_and_execute(
  1367. call_, ops, static_cast<size_t>(op - ops), &on_response_received_);
  1368. GPR_ASSERT(GRPC_CALL_OK == call_error);
  1369. // Op: recv server status.
  1370. op = ops;
  1371. op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
  1372. op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
  1373. op->data.recv_status_on_client.status = &status_code_;
  1374. op->data.recv_status_on_client.status_details = &status_details_;
  1375. op->flags = 0;
  1376. op->reserved = nullptr;
  1377. op++;
  1378. // This callback signals the end of the call, so it relies on the initial
  1379. // ref instead of a new ref. When it's invoked, it's the initial ref that is
  1380. // unreffed.
  1381. GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this,
  1382. grpc_schedule_on_exec_ctx);
  1383. call_error = grpc_call_start_batch_and_execute(
  1384. call_, ops, static_cast<size_t>(op - ops), &on_status_received_);
  1385. GPR_ASSERT(GRPC_CALL_OK == call_error);
  1386. }
  1387. XdsClient::ChannelState::LrsCallState::~LrsCallState() {
  1388. grpc_metadata_array_destroy(&initial_metadata_recv_);
  1389. grpc_metadata_array_destroy(&trailing_metadata_recv_);
  1390. grpc_byte_buffer_destroy(send_message_payload_);
  1391. grpc_byte_buffer_destroy(recv_message_payload_);
  1392. grpc_slice_unref_internal(status_details_);
  1393. GPR_ASSERT(call_ != nullptr);
  1394. grpc_call_unref(call_);
  1395. }
  1396. void XdsClient::ChannelState::LrsCallState::Orphan() {
  1397. reporter_.reset();
  1398. GPR_ASSERT(call_ != nullptr);
  1399. // If we are here because xds_client wants to cancel the call,
  1400. // on_status_received_ will complete the cancellation and clean up. Otherwise,
  1401. // we are here because xds_client has to orphan a failed call, then the
  1402. // following cancellation will be a no-op.
  1403. grpc_call_cancel_internal(call_);
  1404. // Note that the initial ref is hold by on_status_received_. So the
  1405. // corresponding unref happens in on_status_received_ instead of here.
  1406. }
  1407. void XdsClient::ChannelState::LrsCallState::MaybeStartReportingLocked() {
  1408. // Don't start again if already started.
  1409. if (reporter_ != nullptr) return;
  1410. // Don't start if the previous send_message op (of the initial request or the
  1411. // last report of the previous reporter) hasn't completed.
  1412. if (send_message_payload_ != nullptr) return;
  1413. // Don't start if no LRS response has arrived.
  1414. if (!seen_response()) return;
  1415. // Don't start if the ADS call hasn't received any valid response. Note that
  1416. // this must be the first channel because it is the current channel but its
  1417. // ADS call hasn't seen any response.
  1418. if (chand()->ads_calld_ == nullptr ||
  1419. chand()->ads_calld_->calld() == nullptr ||
  1420. !chand()->ads_calld_->calld()->seen_response()) {
  1421. return;
  1422. }
  1423. // Start reporting.
  1424. reporter_ = MakeOrphanable<Reporter>(
  1425. Ref(DEBUG_LOCATION, "LRS+load_report+start"), load_reporting_interval_);
  1426. }
  1427. void XdsClient::ChannelState::LrsCallState::OnInitialRequestSent(
  1428. void* arg, grpc_error* /*error*/) {
  1429. LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
  1430. {
  1431. MutexLock lock(&lrs_calld->xds_client()->mu_);
  1432. lrs_calld->OnInitialRequestSentLocked();
  1433. }
  1434. lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked");
  1435. }
  1436. void XdsClient::ChannelState::LrsCallState::OnInitialRequestSentLocked() {
  1437. // Clear the send_message_payload_.
  1438. grpc_byte_buffer_destroy(send_message_payload_);
  1439. send_message_payload_ = nullptr;
  1440. MaybeStartReportingLocked();
  1441. }
  1442. void XdsClient::ChannelState::LrsCallState::OnResponseReceived(
  1443. void* arg, grpc_error* /*error*/) {
  1444. LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
  1445. bool done;
  1446. {
  1447. MutexLock lock(&lrs_calld->xds_client()->mu_);
  1448. done = lrs_calld->OnResponseReceivedLocked();
  1449. }
  1450. if (done) lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked");
  1451. }
  1452. bool XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked() {
  1453. // Empty payload means the call was cancelled.
  1454. if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) {
  1455. return true;
  1456. }
  1457. // Read the response.
  1458. grpc_byte_buffer_reader bbr;
  1459. grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
  1460. grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
  1461. grpc_byte_buffer_reader_destroy(&bbr);
  1462. grpc_byte_buffer_destroy(recv_message_payload_);
  1463. recv_message_payload_ = nullptr;
  1464. // This anonymous lambda is a hack to avoid the usage of goto.
  1465. [&]() {
  1466. // Parse the response.
  1467. bool send_all_clusters = false;
  1468. std::set<std::string> new_cluster_names;
  1469. grpc_millis new_load_reporting_interval;
  1470. grpc_error* parse_error = xds_client()->api_.ParseLrsResponse(
  1471. response_slice, &send_all_clusters, &new_cluster_names,
  1472. &new_load_reporting_interval);
  1473. if (parse_error != GRPC_ERROR_NONE) {
  1474. gpr_log(GPR_ERROR,
  1475. "[xds_client %p] LRS response parsing failed. error=%s",
  1476. xds_client(), grpc_error_string(parse_error));
  1477. GRPC_ERROR_UNREF(parse_error);
  1478. return;
  1479. }
  1480. seen_response_ = true;
  1481. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
  1482. gpr_log(
  1483. GPR_INFO,
  1484. "[xds_client %p] LRS response received, %" PRIuPTR
  1485. " cluster names, send_all_clusters=%d, load_report_interval=%" PRId64
  1486. "ms",
  1487. xds_client(), new_cluster_names.size(), send_all_clusters,
  1488. new_load_reporting_interval);
  1489. size_t i = 0;
  1490. for (const auto& name : new_cluster_names) {
  1491. gpr_log(GPR_INFO, "[xds_client %p] cluster_name %" PRIuPTR ": %s",
  1492. xds_client(), i++, name.c_str());
  1493. }
  1494. }
  1495. if (new_load_reporting_interval <
  1496. GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS) {
  1497. new_load_reporting_interval =
  1498. GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS;
  1499. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
  1500. gpr_log(GPR_INFO,
  1501. "[xds_client %p] Increased load_report_interval to minimum "
  1502. "value %dms",
  1503. xds_client(), GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS);
  1504. }
  1505. }
  1506. // Ignore identical update.
  1507. if (send_all_clusters == send_all_clusters_ &&
  1508. cluster_names_ == new_cluster_names &&
  1509. load_reporting_interval_ == new_load_reporting_interval) {
  1510. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
  1511. gpr_log(GPR_INFO,
  1512. "[xds_client %p] Incoming LRS response identical to current, "
  1513. "ignoring.",
  1514. xds_client());
  1515. }
  1516. return;
  1517. }
  1518. // Stop current load reporting (if any) to adopt the new config.
  1519. reporter_.reset();
  1520. // Record the new config.
  1521. send_all_clusters_ = send_all_clusters;
  1522. cluster_names_ = std::move(new_cluster_names);
  1523. load_reporting_interval_ = new_load_reporting_interval;
  1524. // Try starting sending load report.
  1525. MaybeStartReportingLocked();
  1526. }();
  1527. grpc_slice_unref_internal(response_slice);
  1528. if (xds_client()->shutting_down_) return true;
  1529. // Keep listening for LRS config updates.
  1530. grpc_op op;
  1531. memset(&op, 0, sizeof(op));
  1532. op.op = GRPC_OP_RECV_MESSAGE;
  1533. op.data.recv_message.recv_message = &recv_message_payload_;
  1534. op.flags = 0;
  1535. op.reserved = nullptr;
  1536. GPR_ASSERT(call_ != nullptr);
  1537. // Reuse the "OnResponseReceivedLocked" ref taken in ctor.
  1538. const grpc_call_error call_error =
  1539. grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
  1540. GPR_ASSERT(GRPC_CALL_OK == call_error);
  1541. return false;
  1542. }
  1543. void XdsClient::ChannelState::LrsCallState::OnStatusReceived(
  1544. void* arg, grpc_error* error) {
  1545. LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
  1546. {
  1547. MutexLock lock(&lrs_calld->xds_client()->mu_);
  1548. lrs_calld->OnStatusReceivedLocked(GRPC_ERROR_REF(error));
  1549. }
  1550. lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnStatusReceivedLocked");
  1551. }
  1552. void XdsClient::ChannelState::LrsCallState::OnStatusReceivedLocked(
  1553. grpc_error* error) {
  1554. GPR_ASSERT(call_ != nullptr);
  1555. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
  1556. char* status_details = grpc_slice_to_c_string(status_details_);
  1557. gpr_log(GPR_INFO,
  1558. "[xds_client %p] LRS call status received. Status = %d, details "
  1559. "= '%s', (chand: %p, calld: %p, call: %p), error '%s'",
  1560. xds_client(), status_code_, status_details, chand(), this, call_,
  1561. grpc_error_string(error));
  1562. gpr_free(status_details);
  1563. }
  1564. // Ignore status from a stale call.
  1565. if (IsCurrentCallOnChannel()) {
  1566. GPR_ASSERT(!xds_client()->shutting_down_);
  1567. // Try to restart the call.
  1568. parent_->OnCallFinishedLocked();
  1569. }
  1570. GRPC_ERROR_UNREF(error);
  1571. }
  1572. bool XdsClient::ChannelState::LrsCallState::IsCurrentCallOnChannel() const {
  1573. // If the retryable LRS call is null (which only happens when the xds channel
  1574. // is shutting down), all the LRS calls are stale.
  1575. if (chand()->lrs_calld_ == nullptr) return false;
  1576. return this == chand()->lrs_calld_->calld();
  1577. }
  1578. //
  1579. // XdsClient
  1580. //
  1581. namespace {
  1582. grpc_millis GetRequestTimeout() {
  1583. return grpc_channel_args_find_integer(
  1584. g_channel_args, GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS,
  1585. {15000, 0, INT_MAX});
  1586. }
  1587. } // namespace
  1588. XdsClient::XdsClient(grpc_error** error)
  1589. : DualRefCounted<XdsClient>(
  1590. GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace) ? "XdsClient"
  1591. : nullptr),
  1592. request_timeout_(GetRequestTimeout()),
  1593. interested_parties_(grpc_pollset_set_create()),
  1594. bootstrap_(XdsBootstrap::Create(this, &grpc_xds_client_trace,
  1595. g_fallback_bootstrap_config, error)),
  1596. certificate_provider_store_(MakeOrphanable<CertificateProviderStore>(
  1597. bootstrap_ == nullptr
  1598. ? CertificateProviderStore::PluginDefinitionMap()
  1599. : bootstrap_->certificate_providers())),
  1600. api_(this, &grpc_xds_client_trace,
  1601. bootstrap_ == nullptr ? nullptr : bootstrap_->node()) {
  1602. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
  1603. gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this);
  1604. }
  1605. if (*error != GRPC_ERROR_NONE) {
  1606. gpr_log(GPR_ERROR, "[xds_client %p] failed to read bootstrap file: %s",
  1607. this, grpc_error_string(*error));
  1608. return;
  1609. }
  1610. // Create ChannelState object.
  1611. chand_ = MakeOrphanable<ChannelState>(
  1612. WeakRef(DEBUG_LOCATION, "XdsClient+ChannelState"), bootstrap_->server());
  1613. }
  1614. XdsClient::~XdsClient() {
  1615. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
  1616. gpr_log(GPR_INFO, "[xds_client %p] destroying xds client", this);
  1617. }
  1618. grpc_pollset_set_destroy(interested_parties_);
  1619. }
  1620. void XdsClient::AddChannelzLinkage(
  1621. channelz::ChannelNode* parent_channelz_node) {
  1622. channelz::ChannelNode* xds_channelz_node =
  1623. grpc_channel_get_channelz_node(chand_->channel());
  1624. if (xds_channelz_node != nullptr) {
  1625. parent_channelz_node->AddChildChannel(xds_channelz_node->uuid());
  1626. }
  1627. }
  1628. void XdsClient::RemoveChannelzLinkage(
  1629. channelz::ChannelNode* parent_channelz_node) {
  1630. channelz::ChannelNode* xds_channelz_node =
  1631. grpc_channel_get_channelz_node(chand_->channel());
  1632. if (xds_channelz_node != nullptr) {
  1633. parent_channelz_node->RemoveChildChannel(xds_channelz_node->uuid());
  1634. }
  1635. }
  1636. void XdsClient::Orphan() {
  1637. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
  1638. gpr_log(GPR_INFO, "[xds_client %p] shutting down xds client", this);
  1639. }
  1640. {
  1641. MutexLock lock(g_mu);
  1642. if (g_xds_client == this) g_xds_client = nullptr;
  1643. }
  1644. {
  1645. MutexLock lock(&mu_);
  1646. shutting_down_ = true;
  1647. // Orphan ChannelState object.
  1648. chand_.reset();
  1649. // We do not clear cluster_map_ and endpoint_map_ if the xds client was
  1650. // created by the XdsResolver because the maps contain refs for watchers
  1651. // which in turn hold refs to the loadbalancing policies. At this point, it
  1652. // is possible for ADS calls to be in progress. Unreffing the loadbalancing
  1653. // policies before those calls are done would lead to issues such as
  1654. // https://github.com/grpc/grpc/issues/20928.
  1655. if (!listener_map_.empty()) {
  1656. cluster_map_.clear();
  1657. endpoint_map_.clear();
  1658. }
  1659. }
  1660. }
  1661. void XdsClient::WatchListenerData(
  1662. absl::string_view listener_name,
  1663. std::unique_ptr<ListenerWatcherInterface> watcher) {
  1664. std::string listener_name_str = std::string(listener_name);
  1665. MutexLock lock(&mu_);
  1666. ListenerState& listener_state = listener_map_[listener_name_str];
  1667. ListenerWatcherInterface* w = watcher.get();
  1668. listener_state.watchers[w] = std::move(watcher);
  1669. // If we've already received an LDS update, notify the new watcher
  1670. // immediately.
  1671. if (listener_state.update.has_value()) {
  1672. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
  1673. gpr_log(GPR_INFO, "[xds_client %p] returning cached listener data for %s",
  1674. this, listener_name_str.c_str());
  1675. }
  1676. w->OnListenerChanged(*listener_state.update);
  1677. }
  1678. chand_->Subscribe(XdsApi::kLdsTypeUrl, listener_name_str);
  1679. }
  1680. void XdsClient::CancelListenerDataWatch(absl::string_view listener_name,
  1681. ListenerWatcherInterface* watcher,
  1682. bool delay_unsubscription) {
  1683. MutexLock lock(&mu_);
  1684. if (shutting_down_) return;
  1685. std::string listener_name_str = std::string(listener_name);
  1686. ListenerState& listener_state = listener_map_[listener_name_str];
  1687. auto it = listener_state.watchers.find(watcher);
  1688. if (it != listener_state.watchers.end()) {
  1689. listener_state.watchers.erase(it);
  1690. if (listener_state.watchers.empty()) {
  1691. listener_map_.erase(listener_name_str);
  1692. chand_->Unsubscribe(XdsApi::kLdsTypeUrl, listener_name_str,
  1693. delay_unsubscription);
  1694. }
  1695. }
  1696. }
  1697. void XdsClient::WatchRouteConfigData(
  1698. absl::string_view route_config_name,
  1699. std::unique_ptr<RouteConfigWatcherInterface> watcher) {
  1700. std::string route_config_name_str = std::string(route_config_name);
  1701. MutexLock lock(&mu_);
  1702. RouteConfigState& route_config_state =
  1703. route_config_map_[route_config_name_str];
  1704. RouteConfigWatcherInterface* w = watcher.get();
  1705. route_config_state.watchers[w] = std::move(watcher);
  1706. // If we've already received an RDS update, notify the new watcher
  1707. // immediately.
  1708. if (route_config_state.update.has_value()) {
  1709. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
  1710. gpr_log(GPR_INFO,
  1711. "[xds_client %p] returning cached route config data for %s", this,
  1712. route_config_name_str.c_str());
  1713. }
  1714. w->OnRouteConfigChanged(*route_config_state.update);
  1715. }
  1716. chand_->Subscribe(XdsApi::kRdsTypeUrl, route_config_name_str);
  1717. }
  1718. void XdsClient::CancelRouteConfigDataWatch(absl::string_view route_config_name,
  1719. RouteConfigWatcherInterface* watcher,
  1720. bool delay_unsubscription) {
  1721. MutexLock lock(&mu_);
  1722. if (shutting_down_) return;
  1723. std::string route_config_name_str = std::string(route_config_name);
  1724. RouteConfigState& route_config_state =
  1725. route_config_map_[route_config_name_str];
  1726. auto it = route_config_state.watchers.find(watcher);
  1727. if (it != route_config_state.watchers.end()) {
  1728. route_config_state.watchers.erase(it);
  1729. if (route_config_state.watchers.empty()) {
  1730. route_config_map_.erase(route_config_name_str);
  1731. chand_->Unsubscribe(XdsApi::kRdsTypeUrl, route_config_name_str,
  1732. delay_unsubscription);
  1733. }
  1734. }
  1735. }
  1736. void XdsClient::WatchClusterData(
  1737. absl::string_view cluster_name,
  1738. std::unique_ptr<ClusterWatcherInterface> watcher) {
  1739. std::string cluster_name_str = std::string(cluster_name);
  1740. MutexLock lock(&mu_);
  1741. ClusterState& cluster_state = cluster_map_[cluster_name_str];
  1742. ClusterWatcherInterface* w = watcher.get();
  1743. cluster_state.watchers[w] = std::move(watcher);
  1744. // If we've already received a CDS update, notify the new watcher
  1745. // immediately.
  1746. if (cluster_state.update.has_value()) {
  1747. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
  1748. gpr_log(GPR_INFO, "[xds_client %p] returning cached cluster data for %s",
  1749. this, cluster_name_str.c_str());
  1750. }
  1751. w->OnClusterChanged(cluster_state.update.value());
  1752. }
  1753. chand_->Subscribe(XdsApi::kCdsTypeUrl, cluster_name_str);
  1754. }
  1755. void XdsClient::CancelClusterDataWatch(absl::string_view cluster_name,
  1756. ClusterWatcherInterface* watcher,
  1757. bool delay_unsubscription) {
  1758. MutexLock lock(&mu_);
  1759. if (shutting_down_) return;
  1760. std::string cluster_name_str = std::string(cluster_name);
  1761. ClusterState& cluster_state = cluster_map_[cluster_name_str];
  1762. auto it = cluster_state.watchers.find(watcher);
  1763. if (it != cluster_state.watchers.end()) {
  1764. cluster_state.watchers.erase(it);
  1765. if (cluster_state.watchers.empty()) {
  1766. cluster_map_.erase(cluster_name_str);
  1767. chand_->Unsubscribe(XdsApi::kCdsTypeUrl, cluster_name_str,
  1768. delay_unsubscription);
  1769. }
  1770. }
  1771. }
  1772. void XdsClient::WatchEndpointData(
  1773. absl::string_view eds_service_name,
  1774. std::unique_ptr<EndpointWatcherInterface> watcher) {
  1775. std::string eds_service_name_str = std::string(eds_service_name);
  1776. MutexLock lock(&mu_);
  1777. EndpointState& endpoint_state = endpoint_map_[eds_service_name_str];
  1778. EndpointWatcherInterface* w = watcher.get();
  1779. endpoint_state.watchers[w] = std::move(watcher);
  1780. // If we've already received an EDS update, notify the new watcher
  1781. // immediately.
  1782. if (endpoint_state.update.has_value()) {
  1783. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
  1784. gpr_log(GPR_INFO, "[xds_client %p] returning cached endpoint data for %s",
  1785. this, eds_service_name_str.c_str());
  1786. }
  1787. w->OnEndpointChanged(endpoint_state.update.value());
  1788. }
  1789. chand_->Subscribe(XdsApi::kEdsTypeUrl, eds_service_name_str);
  1790. }
  1791. void XdsClient::CancelEndpointDataWatch(absl::string_view eds_service_name,
  1792. EndpointWatcherInterface* watcher,
  1793. bool delay_unsubscription) {
  1794. MutexLock lock(&mu_);
  1795. if (shutting_down_) return;
  1796. std::string eds_service_name_str = std::string(eds_service_name);
  1797. EndpointState& endpoint_state = endpoint_map_[eds_service_name_str];
  1798. auto it = endpoint_state.watchers.find(watcher);
  1799. if (it != endpoint_state.watchers.end()) {
  1800. endpoint_state.watchers.erase(it);
  1801. if (endpoint_state.watchers.empty()) {
  1802. endpoint_map_.erase(eds_service_name_str);
  1803. chand_->Unsubscribe(XdsApi::kEdsTypeUrl, eds_service_name_str,
  1804. delay_unsubscription);
  1805. }
  1806. }
  1807. }
  1808. RefCountedPtr<XdsClusterDropStats> XdsClient::AddClusterDropStats(
  1809. absl::string_view lrs_server, absl::string_view cluster_name,
  1810. absl::string_view eds_service_name) {
  1811. // TODO(roth): When we add support for direct federation, use the
  1812. // server name specified in lrs_server.
  1813. auto key =
  1814. std::make_pair(std::string(cluster_name), std::string(eds_service_name));
  1815. MutexLock lock(&mu_);
  1816. // We jump through some hoops here to make sure that the absl::string_views
  1817. // stored in the XdsClusterDropStats object point to the strings
  1818. // in the load_report_map_ key, so that they have the same lifetime.
  1819. auto it = load_report_map_
  1820. .emplace(std::make_pair(std::move(key), LoadReportState()))
  1821. .first;
  1822. LoadReportState& load_report_state = it->second;
  1823. RefCountedPtr<XdsClusterDropStats> cluster_drop_stats;
  1824. if (load_report_state.drop_stats != nullptr) {
  1825. cluster_drop_stats = load_report_state.drop_stats->RefIfNonZero();
  1826. }
  1827. if (cluster_drop_stats == nullptr) {
  1828. if (load_report_state.drop_stats != nullptr) {
  1829. load_report_state.deleted_drop_stats +=
  1830. load_report_state.drop_stats->GetSnapshotAndReset();
  1831. }
  1832. cluster_drop_stats = MakeRefCounted<XdsClusterDropStats>(
  1833. Ref(DEBUG_LOCATION, "DropStats"), lrs_server,
  1834. it->first.first /*cluster_name*/,
  1835. it->first.second /*eds_service_name*/);
  1836. load_report_state.drop_stats = cluster_drop_stats.get();
  1837. }
  1838. chand_->MaybeStartLrsCall();
  1839. return cluster_drop_stats;
  1840. }
  1841. void XdsClient::RemoveClusterDropStats(
  1842. absl::string_view /*lrs_server*/, absl::string_view cluster_name,
  1843. absl::string_view eds_service_name,
  1844. XdsClusterDropStats* cluster_drop_stats) {
  1845. MutexLock lock(&mu_);
  1846. // TODO(roth): When we add support for direct federation, use the
  1847. // server name specified in lrs_server.
  1848. auto it = load_report_map_.find(
  1849. std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
  1850. if (it == load_report_map_.end()) return;
  1851. LoadReportState& load_report_state = it->second;
  1852. if (load_report_state.drop_stats == cluster_drop_stats) {
  1853. // Record final snapshot in deleted_drop_stats, which will be
  1854. // added to the next load report.
  1855. load_report_state.deleted_drop_stats +=
  1856. load_report_state.drop_stats->GetSnapshotAndReset();
  1857. load_report_state.drop_stats = nullptr;
  1858. }
  1859. }
  1860. RefCountedPtr<XdsClusterLocalityStats> XdsClient::AddClusterLocalityStats(
  1861. absl::string_view lrs_server, absl::string_view cluster_name,
  1862. absl::string_view eds_service_name,
  1863. RefCountedPtr<XdsLocalityName> locality) {
  1864. // TODO(roth): When we add support for direct federation, use the
  1865. // server name specified in lrs_server.
  1866. auto key =
  1867. std::make_pair(std::string(cluster_name), std::string(eds_service_name));
  1868. MutexLock lock(&mu_);
  1869. // We jump through some hoops here to make sure that the absl::string_views
  1870. // stored in the XdsClusterLocalityStats object point to the strings
  1871. // in the load_report_map_ key, so that they have the same lifetime.
  1872. auto it = load_report_map_
  1873. .emplace(std::make_pair(std::move(key), LoadReportState()))
  1874. .first;
  1875. LoadReportState& load_report_state = it->second;
  1876. LoadReportState::LocalityState& locality_state =
  1877. load_report_state.locality_stats[locality];
  1878. RefCountedPtr<XdsClusterLocalityStats> cluster_locality_stats;
  1879. if (locality_state.locality_stats != nullptr) {
  1880. cluster_locality_stats = locality_state.locality_stats->RefIfNonZero();
  1881. }
  1882. if (cluster_locality_stats == nullptr) {
  1883. if (locality_state.locality_stats != nullptr) {
  1884. locality_state.deleted_locality_stats +=
  1885. locality_state.locality_stats->GetSnapshotAndReset();
  1886. }
  1887. cluster_locality_stats = MakeRefCounted<XdsClusterLocalityStats>(
  1888. Ref(DEBUG_LOCATION, "LocalityStats"), lrs_server,
  1889. it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/,
  1890. std::move(locality));
  1891. locality_state.locality_stats = cluster_locality_stats.get();
  1892. }
  1893. chand_->MaybeStartLrsCall();
  1894. return cluster_locality_stats;
  1895. }
  1896. void XdsClient::RemoveClusterLocalityStats(
  1897. absl::string_view /*lrs_server*/, absl::string_view cluster_name,
  1898. absl::string_view eds_service_name,
  1899. const RefCountedPtr<XdsLocalityName>& locality,
  1900. XdsClusterLocalityStats* cluster_locality_stats) {
  1901. MutexLock lock(&mu_);
  1902. // TODO(roth): When we add support for direct federation, use the
  1903. // server name specified in lrs_server.
  1904. auto it = load_report_map_.find(
  1905. std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
  1906. if (it == load_report_map_.end()) return;
  1907. LoadReportState& load_report_state = it->second;
  1908. auto locality_it = load_report_state.locality_stats.find(locality);
  1909. if (locality_it == load_report_state.locality_stats.end()) return;
  1910. LoadReportState::LocalityState& locality_state = locality_it->second;
  1911. if (locality_state.locality_stats == cluster_locality_stats) {
  1912. // Record final snapshot in deleted_locality_stats, which will be
  1913. // added to the next load report.
  1914. locality_state.deleted_locality_stats +=
  1915. locality_state.locality_stats->GetSnapshotAndReset();
  1916. locality_state.locality_stats = nullptr;
  1917. }
  1918. }
  1919. void XdsClient::ResetBackoff() {
  1920. MutexLock lock(&mu_);
  1921. if (chand_ != nullptr) {
  1922. grpc_channel_reset_connect_backoff(chand_->channel());
  1923. }
  1924. }
  1925. void XdsClient::NotifyOnErrorLocked(grpc_error* error) {
  1926. for (const auto& p : listener_map_) {
  1927. const ListenerState& listener_state = p.second;
  1928. for (const auto& p : listener_state.watchers) {
  1929. p.first->OnError(GRPC_ERROR_REF(error));
  1930. }
  1931. }
  1932. for (const auto& p : route_config_map_) {
  1933. const RouteConfigState& route_config_state = p.second;
  1934. for (const auto& p : route_config_state.watchers) {
  1935. p.first->OnError(GRPC_ERROR_REF(error));
  1936. }
  1937. }
  1938. for (const auto& p : cluster_map_) {
  1939. const ClusterState& cluster_state = p.second;
  1940. for (const auto& p : cluster_state.watchers) {
  1941. p.first->OnError(GRPC_ERROR_REF(error));
  1942. }
  1943. }
  1944. for (const auto& p : endpoint_map_) {
  1945. const EndpointState& endpoint_state = p.second;
  1946. for (const auto& p : endpoint_state.watchers) {
  1947. p.first->OnError(GRPC_ERROR_REF(error));
  1948. }
  1949. }
  1950. GRPC_ERROR_UNREF(error);
  1951. }
  1952. XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked(
  1953. bool send_all_clusters, const std::set<std::string>& clusters) {
  1954. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
  1955. gpr_log(GPR_INFO, "[xds_client %p] start building load report", this);
  1956. }
  1957. XdsApi::ClusterLoadReportMap snapshot_map;
  1958. for (auto load_report_it = load_report_map_.begin();
  1959. load_report_it != load_report_map_.end();) {
  1960. // Cluster key is cluster and EDS service name.
  1961. const auto& cluster_key = load_report_it->first;
  1962. LoadReportState& load_report = load_report_it->second;
  1963. // If the CDS response for a cluster indicates to use LRS but the
  1964. // LRS server does not say that it wants reports for this cluster,
  1965. // then we'll have stats objects here whose data we're not going to
  1966. // include in the load report. However, we still need to clear out
  1967. // the data from the stats objects, so that if the LRS server starts
  1968. // asking for the data in the future, we don't incorrectly include
  1969. // data from previous reporting intervals in that future report.
  1970. const bool record_stats =
  1971. send_all_clusters || clusters.find(cluster_key.first) != clusters.end();
  1972. XdsApi::ClusterLoadReport snapshot;
  1973. // Aggregate drop stats.
  1974. snapshot.dropped_requests = std::move(load_report.deleted_drop_stats);
  1975. if (load_report.drop_stats != nullptr) {
  1976. snapshot.dropped_requests +=
  1977. load_report.drop_stats->GetSnapshotAndReset();
  1978. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
  1979. gpr_log(GPR_INFO,
  1980. "[xds_client %p] cluster=%s eds_service_name=%s drop_stats=%p",
  1981. this, cluster_key.first.c_str(), cluster_key.second.c_str(),
  1982. load_report.drop_stats);
  1983. }
  1984. }
  1985. // Aggregate locality stats.
  1986. for (auto it = load_report.locality_stats.begin();
  1987. it != load_report.locality_stats.end();) {
  1988. const RefCountedPtr<XdsLocalityName>& locality_name = it->first;
  1989. auto& locality_state = it->second;
  1990. XdsClusterLocalityStats::Snapshot& locality_snapshot =
  1991. snapshot.locality_stats[locality_name];
  1992. locality_snapshot = std::move(locality_state.deleted_locality_stats);
  1993. if (locality_state.locality_stats != nullptr) {
  1994. locality_snapshot +=
  1995. locality_state.locality_stats->GetSnapshotAndReset();
  1996. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
  1997. gpr_log(GPR_INFO,
  1998. "[xds_client %p] cluster=%s eds_service_name=%s "
  1999. "locality=%s locality_stats=%p",
  2000. this, cluster_key.first.c_str(), cluster_key.second.c_str(),
  2001. locality_name->AsHumanReadableString().c_str(),
  2002. locality_state.locality_stats);
  2003. }
  2004. }
  2005. // If the only thing left in this entry was final snapshots from
  2006. // deleted locality stats objects, remove the entry.
  2007. if (locality_state.locality_stats == nullptr) {
  2008. it = load_report.locality_stats.erase(it);
  2009. } else {
  2010. ++it;
  2011. }
  2012. }
  2013. // Compute load report interval.
  2014. const grpc_millis now = ExecCtx::Get()->Now();
  2015. snapshot.load_report_interval = now - load_report.last_report_time;
  2016. load_report.last_report_time = now;
  2017. // Record snapshot.
  2018. if (record_stats) {
  2019. snapshot_map[cluster_key] = std::move(snapshot);
  2020. }
  2021. // If the only thing left in this entry was final snapshots from
  2022. // deleted stats objects, remove the entry.
  2023. if (load_report.locality_stats.empty() &&
  2024. load_report.drop_stats == nullptr) {
  2025. load_report_it = load_report_map_.erase(load_report_it);
  2026. } else {
  2027. ++load_report_it;
  2028. }
  2029. }
  2030. return snapshot_map;
  2031. }
  2032. //
  2033. // accessors for global state
  2034. //
  2035. void XdsClientGlobalInit() {
  2036. g_mu = new Mutex;
  2037. XdsHttpFilterRegistry::Init();
  2038. }
  2039. void XdsClientGlobalShutdown() {
  2040. delete g_mu;
  2041. g_mu = nullptr;
  2042. gpr_free(g_fallback_bootstrap_config);
  2043. g_fallback_bootstrap_config = nullptr;
  2044. XdsHttpFilterRegistry::Shutdown();
  2045. }
  2046. RefCountedPtr<XdsClient> XdsClient::GetOrCreate(grpc_error** error) {
  2047. MutexLock lock(g_mu);
  2048. if (g_xds_client != nullptr) {
  2049. auto xds_client = g_xds_client->RefIfNonZero();
  2050. if (xds_client != nullptr) return xds_client;
  2051. }
  2052. auto xds_client = MakeRefCounted<XdsClient>(error);
  2053. g_xds_client = xds_client.get();
  2054. return xds_client;
  2055. }
  2056. namespace internal {
  2057. void SetXdsChannelArgsForTest(grpc_channel_args* args) {
  2058. MutexLock lock(g_mu);
  2059. g_channel_args = args;
  2060. }
  2061. void UnsetGlobalXdsClientForTest() {
  2062. MutexLock lock(g_mu);
  2063. g_xds_client = nullptr;
  2064. }
  2065. void SetXdsFallbackBootstrapConfig(const char* config) {
  2066. MutexLock lock(g_mu);
  2067. gpr_free(g_fallback_bootstrap_config);
  2068. g_fallback_bootstrap_config = gpr_strdup(config);
  2069. }
  2070. } // namespace internal
  2071. } // namespace grpc_core