ev_epoll1_linux.cc 43 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275
  1. /*
  2. *
  3. * Copyright 2017 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. #include <grpc/support/port_platform.h>
  19. #include "src/core/lib/iomgr/port.h"
  20. #include <grpc/support/log.h>
  /* This polling engine is only relevant on Linux kernels supporting epoll
     (epoll_create() or epoll_create1()). */
  23. #ifdef GRPC_LINUX_EPOLL
  24. #include "src/core/lib/iomgr/ev_epoll1_linux.h"
  25. #include <assert.h>
  26. #include <errno.h>
  27. #include <fcntl.h>
  28. #include <limits.h>
  29. #include <poll.h>
  30. #include <pthread.h>
  31. #include <string.h>
  32. #include <sys/epoll.h>
  33. #include <sys/socket.h>
  34. #include <unistd.h>
  35. #include <grpc/support/alloc.h>
  36. #include <grpc/support/cpu.h>
  37. #include <grpc/support/string_util.h>
  38. #include "src/core/lib/debug/stats.h"
  39. #include "src/core/lib/gpr/string.h"
  40. #include "src/core/lib/gpr/tls.h"
  41. #include "src/core/lib/gpr/useful.h"
  42. #include "src/core/lib/gprpp/manual_constructor.h"
  43. #include "src/core/lib/iomgr/block_annotate.h"
  44. #include "src/core/lib/iomgr/ev_posix.h"
  45. #include "src/core/lib/iomgr/iomgr_internal.h"
  46. #include "src/core/lib/iomgr/lockfree_event.h"
  47. #include "src/core/lib/iomgr/wakeup_fd_posix.h"
  48. #include "src/core/lib/profiling/timers.h"
  /* Process-wide wakeup fd. pollset_global_init() adds its read end to the
   * global epoll set (EPOLLIN | EPOLLET); pollset_kick_all() writes to it to
   * wake a worker in the DESIGNATED_POLLER state, and process_epoll_events()
   * consumes the wakeup when epoll reports it. pollset_global_init() sets
   * read_fd to -1 before creating it, and pollset_global_shutdown() skips
   * destruction while read_fd is still -1. */
  static grpc_wakeup_fd global_wakeup_fd;
  50. /*******************************************************************************
  51. * Singleton epoll set related fields
  52. */
  /* Capacity of g_epoll_set.events: the most events a single epoll_wait() call
   * can return. Also passed as the size hint to epoll_create() on the fallback
   * path of epoll_create_and_cloexec(). */
  #define MAX_EPOLL_EVENTS 100
  /* How many buffered events a single process_epoll_events() call handles
   * before returning. */
  #define MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION 1
  /* NOTE ON SYNCHRONIZATION:
   * - Fields in this struct are only modified by the designated poller. Hence
   *   there is no need for any locks to protect the struct.
   * - num_events and cursor fields have to be of atomic type to provide memory
   *   visibility guarantees only. i.e. In case of multiple pollers, the
   *   designated polling thread keeps changing; the thread that wrote these
   *   values may be different from the thread reading the values
   */
  typedef struct epoll_set {
    /* The epoll descriptor; negative if creation failed, set to -1 by
     * epoll_set_shutdown(). */
    int epfd;
    /* The epoll_events after the last call to epoll_wait() */
    struct epoll_event events[MAX_EPOLL_EVENTS];
    /* The number of epoll_events after the last call to epoll_wait() */
    gpr_atm num_events;
    /* Index of the first event in epoll_events that has to be processed. This
     * field is only valid if num_events > 0 */
    gpr_atm cursor;
  } epoll_set;
  /* The global singleton epoll set */
  static epoll_set g_epoll_set;
  75. static int epoll_create_and_cloexec() {
  76. #ifdef GRPC_LINUX_EPOLL_CREATE1
  77. int fd = epoll_create1(EPOLL_CLOEXEC);
  78. if (fd < 0) {
  79. gpr_log(GPR_ERROR, "epoll_create1 unavailable");
  80. }
  81. #else
  82. int fd = epoll_create(MAX_EPOLL_EVENTS);
  83. if (fd < 0) {
  84. gpr_log(GPR_ERROR, "epoll_create unavailable");
  85. } else if (fcntl(fd, F_SETFD, FD_CLOEXEC) != 0) {
  86. gpr_log(GPR_ERROR, "fcntl following epoll_create failed");
  87. return -1;
  88. }
  89. #endif
  90. return fd;
  91. }
  92. /* Must be called *only* once */
  93. static bool epoll_set_init() {
  94. g_epoll_set.epfd = epoll_create_and_cloexec();
  95. if (g_epoll_set.epfd < 0) {
  96. return false;
  97. }
  98. gpr_log(GPR_INFO, "grpc epoll fd: %d", g_epoll_set.epfd);
  99. gpr_atm_no_barrier_store(&g_epoll_set.num_events, 0);
  100. gpr_atm_no_barrier_store(&g_epoll_set.cursor, 0);
  101. return true;
  102. }
  103. /* epoll_set_init() MUST be called before calling this. */
  104. static void epoll_set_shutdown() {
  105. if (g_epoll_set.epfd >= 0) {
  106. close(g_epoll_set.epfd);
  107. g_epoll_set.epfd = -1;
  108. }
  109. }
  110. /*******************************************************************************
  111. * Fd Declarations
  112. */
  /* Per-descriptor state. Instances are recycled through fd_freelist rather than
   * freed: fd_orphan() pushes them onto the list and fd_create() pops them. */
  struct grpc_fd {
    /* The wrapped OS file descriptor. */
    int fd;
    /* Readiness/shutdown state for read, write and error notifications. The
     * ManualConstructor storage is Init()ed once, when fd_create() first
     * allocates the struct; InitEvent()/DestroyEvent() run on every reuse. */
    grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure;
    grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure;
    grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure;
    /* Next entry while this struct sits on fd_freelist. */
    struct grpc_fd* freelist_next;
    /* The pollset that last noticed that the fd is readable. The actual type
     * stored in this is (grpc_pollset *) */
    gpr_atm read_notifier_pollset;
    /* Registered via grpc_iomgr_register_object() in fd_create(). */
    grpc_iomgr_object iomgr_object;
  };
  static void fd_global_init(void);
  static void fd_global_shutdown(void);
  126. /*******************************************************************************
  127. * Pollset Declarations
  128. */
  129. typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state;
  130. static const char* kick_state_string(kick_state st) {
  131. switch (st) {
  132. case UNKICKED:
  133. return "UNKICKED";
  134. case KICKED:
  135. return "KICKED";
  136. case DESIGNATED_POLLER:
  137. return "DESIGNATED_POLLER";
  138. }
  139. GPR_UNREACHABLE_CODE(return "UNKNOWN");
  140. }
  /* A thread working on a pollset (set up by begin_worker()). The workers of a
   * pollset form a circular doubly-linked list through next/prev, rooted at
   * grpc_pollset::root_worker. */
  struct grpc_pollset_worker {
    /* Current kick state; updated via SET_KICK_STATE. */
    kick_state state;
    int kick_state_mutator; // which line of code last changed kick state
    /* True once `cv` has been initialized. begin_worker() only initializes it
     * when the worker has to wait, and pollset_kick_all() only signals it when
     * this is set. */
    bool initialized_cv;
    grpc_pollset_worker* next;
    grpc_pollset_worker* prev;
    /* Condition variable the worker waits on (with the pollset's mu) while it
     * is UNKICKED. */
    gpr_cv cv;
    /* Closures collected for this worker; reset in begin_worker(). Presumably
     * scheduled when the worker ends (that code is outside this view) - TODO
     * confirm. */
    grpc_closure_list schedule_on_end_work;
  };
  /* Sets `worker`'s kick state and records the source line responsible, which
   * makes kick-state transitions easier to debug. */
  #define SET_KICK_STATE(worker, kick_state) \
    do {                                     \
      (worker)->state = (kick_state);        \
      (worker)->kick_state_mutator = __LINE__; \
    } while (false)
  /* Upper bound on the number of neighborhoods. pollset_global_init() creates
   * one per core, clamped to [1, MAX_NEIGHBORHOODS]. */
  #define MAX_NEIGHBORHOODS 1024
  /* A group of active pollsets. Its active pollsets form a circular
   * doubly-linked list (grpc_pollset::next/prev) rooted at active_root; that
   * list is modified with `mu` held. */
  typedef struct pollset_neighborhood {
    gpr_mu mu;
    grpc_pollset* active_root;
    /* Padding; presumably keeps adjacent g_neighborhoods entries on separate
     * cache lines (avoiding false sharing) - TODO confirm. */
    char pad[GPR_CACHELINE_SIZE];
  } pollset_neighborhood;
  struct grpc_pollset {
    gpr_mu mu;
    /* Neighborhood this pollset belongs to (or will join). begin_worker() may
     * reassign it while the pollset is seen_inactive. */
    pollset_neighborhood* neighborhood;
    /* True while a begin_worker() call is choosing a new neighborhood. */
    bool reassigning_neighborhood;
    /* Head of the circular list of workers attached to this pollset. */
    grpc_pollset_worker* root_worker;
    /* When set, the next begin_worker() clears it and returns without
     * polling. */
    bool kicked_without_poller;
    /* Set to true if the pollset is observed to have no workers available to
       poll */
    bool seen_inactive;
    bool shutting_down; /* Is the pollset shutting down ? */
    grpc_closure* shutdown_closure; /* Called after shutdown is complete */
    /* Number of workers who are *about-to* attach themselves to the pollset
     * worker list */
    int begin_refs;
    /* Links in the owning neighborhood's active_root list. */
    grpc_pollset* next;
    grpc_pollset* prev;
  };
  178. /*******************************************************************************
  179. * Pollset-set Declarations
  180. */
  /* This engine keeps no per-pollset-set state; the struct only holds a
   * placeholder member. */
  struct grpc_pollset_set {
    char unused;
  };
  184. /*******************************************************************************
  185. * Common helpers
  186. */
  187. static bool append_error(grpc_error** composite, grpc_error* error,
  188. const char* desc) {
  189. if (error == GRPC_ERROR_NONE) return true;
  190. if (*composite == GRPC_ERROR_NONE) {
  191. *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
  192. }
  193. *composite = grpc_error_add_child(*composite, error);
  194. return false;
  195. }
  196. /*******************************************************************************
  197. * Fd Definitions
  198. */
  199. /* We need to keep a freelist not because of any concerns of malloc performance
  200. * but instead so that implementations with multiple threads in (for example)
  201. * epoll_wait deal with the race between pollset removal and incoming poll
  202. * notifications.
  203. *
  204. * The problem is that the poller ultimately holds a reference to this
  205. * object, so it is very difficult to know when is safe to free it, at least
  206. * without some expensive synchronization.
  207. *
  208. * If we keep the object freelisted, in the worst case losing this race just
  209. * becomes a spurious read notification on a reused fd.
  210. */
  /* global_wakeup_fd (declared near the top of this file) exists because the
   * alarm system sometimes needs to wake up 'some poller' (specifically when a
   * new alarm needs to be triggered earlier than the next alarm 'epoch'). That
   * wakeup fd gives the pollers something to alert on when such a case occurs. */
  /* Recycled grpc_fd structs, linked through freelist_next. fd_create() and
   * fd_orphan() access the list with fd_freelist_mu held. */
  static grpc_fd* fd_freelist = nullptr;
  static gpr_mu fd_freelist_mu;
  217. static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
  218. static void fd_global_shutdown(void) {
  219. gpr_mu_lock(&fd_freelist_mu);
  220. gpr_mu_unlock(&fd_freelist_mu);
  221. while (fd_freelist != nullptr) {
  222. grpc_fd* fd = fd_freelist;
  223. fd_freelist = fd_freelist->freelist_next;
  224. gpr_free(fd);
  225. }
  226. gpr_mu_destroy(&fd_freelist_mu);
  227. }
  228. static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
  229. grpc_fd* new_fd = nullptr;
  230. gpr_mu_lock(&fd_freelist_mu);
  231. if (fd_freelist != nullptr) {
  232. new_fd = fd_freelist;
  233. fd_freelist = fd_freelist->freelist_next;
  234. }
  235. gpr_mu_unlock(&fd_freelist_mu);
  236. if (new_fd == nullptr) {
  237. new_fd = static_cast<grpc_fd*>(gpr_malloc(sizeof(grpc_fd)));
  238. new_fd->read_closure.Init();
  239. new_fd->write_closure.Init();
  240. new_fd->error_closure.Init();
  241. }
  242. new_fd->fd = fd;
  243. new_fd->read_closure->InitEvent();
  244. new_fd->write_closure->InitEvent();
  245. new_fd->error_closure->InitEvent();
  246. gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
  247. new_fd->freelist_next = nullptr;
  248. char* fd_name;
  249. gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
  250. grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
  251. #ifndef NDEBUG
  252. if (grpc_trace_fd_refcount.enabled()) {
  253. gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, new_fd, fd_name);
  254. }
  255. #endif
  256. gpr_free(fd_name);
  257. struct epoll_event ev;
  258. ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLOUT | EPOLLET);
  259. /* Use the least significant bit of ev.data.ptr to store track_err. We expect
  260. * the addresses to be word aligned. We need to store track_err to avoid
  261. * synchronization issues when accessing it after receiving an event.
  262. * Accessing fd would be a data race there because the fd might have been
  263. * returned to the free list at that point. */
  264. ev.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(new_fd) |
  265. (track_err ? 1 : 0));
  266. if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
  267. gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
  268. }
  269. return new_fd;
  270. }
  271. static int fd_wrapped_fd(grpc_fd* fd) { return fd->fd; }
  272. /* if 'releasing_fd' is true, it means that we are going to detach the internal
  273. * fd from grpc_fd structure (i.e which means we should not be calling
  274. * shutdown() syscall on that fd) */
  275. static void fd_shutdown_internal(grpc_fd* fd, grpc_error* why,
  276. bool releasing_fd) {
  277. if (fd->read_closure->SetShutdown(GRPC_ERROR_REF(why))) {
  278. if (!releasing_fd) {
  279. shutdown(fd->fd, SHUT_RDWR);
  280. }
  281. fd->write_closure->SetShutdown(GRPC_ERROR_REF(why));
  282. fd->error_closure->SetShutdown(GRPC_ERROR_REF(why));
  283. }
  284. GRPC_ERROR_UNREF(why);
  285. }
  286. /* Might be called multiple times */
  287. static void fd_shutdown(grpc_fd* fd, grpc_error* why) {
  288. fd_shutdown_internal(fd, why, false);
  289. }
  290. static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
  291. const char* reason) {
  292. grpc_error* error = GRPC_ERROR_NONE;
  293. bool is_release_fd = (release_fd != nullptr);
  294. if (!fd->read_closure->IsShutdown()) {
  295. fd_shutdown_internal(fd, GRPC_ERROR_CREATE_FROM_COPIED_STRING(reason),
  296. is_release_fd);
  297. }
  298. /* If release_fd is not NULL, we should be relinquishing control of the file
  299. descriptor fd->fd (but we still own the grpc_fd structure). */
  300. if (is_release_fd) {
  301. *release_fd = fd->fd;
  302. } else {
  303. close(fd->fd);
  304. }
  305. GRPC_CLOSURE_SCHED(on_done, GRPC_ERROR_REF(error));
  306. grpc_iomgr_unregister_object(&fd->iomgr_object);
  307. fd->read_closure->DestroyEvent();
  308. fd->write_closure->DestroyEvent();
  309. fd->error_closure->DestroyEvent();
  310. gpr_mu_lock(&fd_freelist_mu);
  311. fd->freelist_next = fd_freelist;
  312. fd_freelist = fd;
  313. gpr_mu_unlock(&fd_freelist_mu);
  314. }
  315. static grpc_pollset* fd_get_read_notifier_pollset(grpc_fd* fd) {
  316. gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset);
  317. return (grpc_pollset*)notifier;
  318. }
  319. static bool fd_is_shutdown(grpc_fd* fd) {
  320. return fd->read_closure->IsShutdown();
  321. }
  322. static void fd_notify_on_read(grpc_fd* fd, grpc_closure* closure) {
  323. fd->read_closure->NotifyOn(closure);
  324. }
  325. static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
  326. fd->write_closure->NotifyOn(closure);
  327. }
  328. static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
  329. fd->error_closure->NotifyOn(closure);
  330. }
  331. static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) {
  332. fd->read_closure->SetReady();
  333. /* Use release store to match with acquire load in fd_get_read_notifier */
  334. gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
  335. }
  336. static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }
  337. static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); }
  338. /*******************************************************************************
  339. * Pollset Definitions
  340. */
  /* Thread-local slots for the current thread's pollset and worker. They are
   * initialized/destroyed in pollset_global_init()/pollset_global_shutdown();
   * presumably they are set by pollset_work() (outside this view) - TODO
   * confirm. */
  GPR_TLS_DECL(g_current_thread_pollset);
  GPR_TLS_DECL(g_current_thread_worker);
  /* The designated poller: the grpc_pollset_worker* (stored as a gpr_atm) that
   * currently owns the right to poll, or 0 when there is none. */
  static gpr_atm g_active_poller;
  /* Array of g_num_neighborhoods neighborhoods allocated by
   * pollset_global_init() (one per core, clamped to [1, MAX_NEIGHBORHOODS]). */
  static pollset_neighborhood* g_neighborhoods;
  static size_t g_num_neighborhoods;
  347. /* Return true if first in list */
  348. static bool worker_insert(grpc_pollset* pollset, grpc_pollset_worker* worker) {
  349. if (pollset->root_worker == nullptr) {
  350. pollset->root_worker = worker;
  351. worker->next = worker->prev = worker;
  352. return true;
  353. } else {
  354. worker->next = pollset->root_worker;
  355. worker->prev = worker->next->prev;
  356. worker->next->prev = worker;
  357. worker->prev->next = worker;
  358. return false;
  359. }
  360. }
  361. /* Return true if last in list */
  362. typedef enum { EMPTIED, NEW_ROOT, REMOVED } worker_remove_result;
  363. static worker_remove_result worker_remove(grpc_pollset* pollset,
  364. grpc_pollset_worker* worker) {
  365. if (worker == pollset->root_worker) {
  366. if (worker == worker->next) {
  367. pollset->root_worker = nullptr;
  368. return EMPTIED;
  369. } else {
  370. pollset->root_worker = worker->next;
  371. worker->prev->next = worker->next;
  372. worker->next->prev = worker->prev;
  373. return NEW_ROOT;
  374. }
  375. } else {
  376. worker->prev->next = worker->next;
  377. worker->next->prev = worker->prev;
  378. return REMOVED;
  379. }
  380. }
  381. static size_t choose_neighborhood(void) {
  382. return static_cast<size_t>(gpr_cpu_current_cpu()) % g_num_neighborhoods;
  383. }
  384. static grpc_error* pollset_global_init(void) {
  385. gpr_tls_init(&g_current_thread_pollset);
  386. gpr_tls_init(&g_current_thread_worker);
  387. gpr_atm_no_barrier_store(&g_active_poller, 0);
  388. global_wakeup_fd.read_fd = -1;
  389. grpc_error* err = grpc_wakeup_fd_init(&global_wakeup_fd);
  390. if (err != GRPC_ERROR_NONE) return err;
  391. struct epoll_event ev;
  392. ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLET);
  393. ev.data.ptr = &global_wakeup_fd;
  394. if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd,
  395. &ev) != 0) {
  396. return GRPC_OS_ERROR(errno, "epoll_ctl");
  397. }
  398. g_num_neighborhoods = GPR_CLAMP(gpr_cpu_num_cores(), 1, MAX_NEIGHBORHOODS);
  399. g_neighborhoods = static_cast<pollset_neighborhood*>(
  400. gpr_zalloc(sizeof(*g_neighborhoods) * g_num_neighborhoods));
  401. for (size_t i = 0; i < g_num_neighborhoods; i++) {
  402. gpr_mu_init(&g_neighborhoods[i].mu);
  403. }
  404. return GRPC_ERROR_NONE;
  405. }
  406. static void pollset_global_shutdown(void) {
  407. gpr_tls_destroy(&g_current_thread_pollset);
  408. gpr_tls_destroy(&g_current_thread_worker);
  409. if (global_wakeup_fd.read_fd != -1) grpc_wakeup_fd_destroy(&global_wakeup_fd);
  410. for (size_t i = 0; i < g_num_neighborhoods; i++) {
  411. gpr_mu_destroy(&g_neighborhoods[i].mu);
  412. }
  413. gpr_free(g_neighborhoods);
  414. }
  415. static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
  416. gpr_mu_init(&pollset->mu);
  417. *mu = &pollset->mu;
  418. pollset->neighborhood = &g_neighborhoods[choose_neighborhood()];
  419. pollset->reassigning_neighborhood = false;
  420. pollset->root_worker = nullptr;
  421. pollset->kicked_without_poller = false;
  422. pollset->seen_inactive = true;
  423. pollset->shutting_down = false;
  424. pollset->shutdown_closure = nullptr;
  425. pollset->begin_refs = 0;
  426. pollset->next = pollset->prev = nullptr;
  427. }
  428. static void pollset_destroy(grpc_pollset* pollset) {
  429. gpr_mu_lock(&pollset->mu);
  430. if (!pollset->seen_inactive) {
  431. pollset_neighborhood* neighborhood = pollset->neighborhood;
  432. gpr_mu_unlock(&pollset->mu);
  433. retry_lock_neighborhood:
  434. gpr_mu_lock(&neighborhood->mu);
  435. gpr_mu_lock(&pollset->mu);
  436. if (!pollset->seen_inactive) {
  437. if (pollset->neighborhood != neighborhood) {
  438. gpr_mu_unlock(&neighborhood->mu);
  439. neighborhood = pollset->neighborhood;
  440. gpr_mu_unlock(&pollset->mu);
  441. goto retry_lock_neighborhood;
  442. }
  443. pollset->prev->next = pollset->next;
  444. pollset->next->prev = pollset->prev;
  445. if (pollset == pollset->neighborhood->active_root) {
  446. pollset->neighborhood->active_root =
  447. pollset->next == pollset ? nullptr : pollset->next;
  448. }
  449. }
  450. gpr_mu_unlock(&pollset->neighborhood->mu);
  451. }
  452. gpr_mu_unlock(&pollset->mu);
  453. gpr_mu_destroy(&pollset->mu);
  454. }
  455. static grpc_error* pollset_kick_all(grpc_pollset* pollset) {
  456. GPR_TIMER_SCOPE("pollset_kick_all", 0);
  457. grpc_error* error = GRPC_ERROR_NONE;
  458. if (pollset->root_worker != nullptr) {
  459. grpc_pollset_worker* worker = pollset->root_worker;
  460. do {
  461. GRPC_STATS_INC_POLLSET_KICK();
  462. switch (worker->state) {
  463. case KICKED:
  464. GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
  465. break;
  466. case UNKICKED:
  467. SET_KICK_STATE(worker, KICKED);
  468. if (worker->initialized_cv) {
  469. GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
  470. gpr_cv_signal(&worker->cv);
  471. }
  472. break;
  473. case DESIGNATED_POLLER:
  474. GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
  475. SET_KICK_STATE(worker, KICKED);
  476. append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd),
  477. "pollset_kick_all");
  478. break;
  479. }
  480. worker = worker->next;
  481. } while (worker != pollset->root_worker);
  482. }
  483. // TODO: sreek. Check if we need to set 'kicked_without_poller' to true here
  484. // in the else case
  485. return error;
  486. }
  487. static void pollset_maybe_finish_shutdown(grpc_pollset* pollset) {
  488. if (pollset->shutdown_closure != nullptr && pollset->root_worker == nullptr &&
  489. pollset->begin_refs == 0) {
  490. GPR_TIMER_MARK("pollset_finish_shutdown", 0);
  491. GRPC_CLOSURE_SCHED(pollset->shutdown_closure, GRPC_ERROR_NONE);
  492. pollset->shutdown_closure = nullptr;
  493. }
  494. }
  495. static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
  496. GPR_TIMER_SCOPE("pollset_shutdown", 0);
  497. GPR_ASSERT(pollset->shutdown_closure == nullptr);
  498. GPR_ASSERT(!pollset->shutting_down);
  499. pollset->shutdown_closure = closure;
  500. pollset->shutting_down = true;
  501. GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset));
  502. pollset_maybe_finish_shutdown(pollset);
  503. }
  504. static int poll_deadline_to_millis_timeout(grpc_millis millis) {
  505. if (millis == GRPC_MILLIS_INF_FUTURE) return -1;
  506. grpc_millis delta = millis - grpc_core::ExecCtx::Get()->Now();
  507. if (delta > INT_MAX) {
  508. return INT_MAX;
  509. } else if (delta < 0) {
  510. return 0;
  511. } else {
  512. return static_cast<int>(delta);
  513. }
  514. }
  515. /* Process the epoll events found by do_epoll_wait() function.
  516. - g_epoll_set.cursor points to the index of the first event to be processed
  517. - This function then processes up-to MAX_EPOLL_EVENTS_PER_ITERATION and
  518. updates the g_epoll_set.cursor
  519. NOTE ON SYNCRHONIZATION: Similar to do_epoll_wait(), this function is only
  520. called by g_active_poller thread. So there is no need for synchronization
  521. when accessing fields in g_epoll_set */
  522. static grpc_error* process_epoll_events(grpc_pollset* pollset) {
  523. GPR_TIMER_SCOPE("process_epoll_events", 0);
  524. static const char* err_desc = "process_events";
  525. grpc_error* error = GRPC_ERROR_NONE;
  526. long num_events = gpr_atm_acq_load(&g_epoll_set.num_events);
  527. long cursor = gpr_atm_acq_load(&g_epoll_set.cursor);
  528. for (int idx = 0;
  529. (idx < MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION) && cursor != num_events;
  530. idx++) {
  531. long c = cursor++;
  532. struct epoll_event* ev = &g_epoll_set.events[c];
  533. void* data_ptr = ev->data.ptr;
  534. if (data_ptr == &global_wakeup_fd) {
  535. append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
  536. err_desc);
  537. } else {
  538. grpc_fd* fd = reinterpret_cast<grpc_fd*>(
  539. reinterpret_cast<intptr_t>(data_ptr) & ~static_cast<intptr_t>(1));
  540. bool track_err =
  541. reinterpret_cast<intptr_t>(data_ptr) & static_cast<intptr_t>(1);
  542. bool cancel = (ev->events & EPOLLHUP) != 0;
  543. bool error = (ev->events & EPOLLERR) != 0;
  544. bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
  545. bool write_ev = (ev->events & EPOLLOUT) != 0;
  546. bool err_fallback = error && !track_err;
  547. if (error && !err_fallback) {
  548. fd_has_errors(fd);
  549. }
  550. if (read_ev || cancel || err_fallback) {
  551. fd_become_readable(fd, pollset);
  552. }
  553. if (write_ev || cancel || err_fallback) {
  554. fd_become_writable(fd);
  555. }
  556. }
  557. }
  558. gpr_atm_rel_store(&g_epoll_set.cursor, cursor);
  559. return error;
  560. }
  561. /* Do epoll_wait and store the events in g_epoll_set.events field. This does not
  562. "process" any of the events yet; that is done in process_epoll_events().
  563. *See process_epoll_events() function for more details.
  564. NOTE ON SYNCHRONIZATION: At any point of time, only the g_active_poller
  565. (i.e the designated poller thread) will be calling this function. So there is
  566. no need for any synchronization when accesing fields in g_epoll_set */
  567. static grpc_error* do_epoll_wait(grpc_pollset* ps, grpc_millis deadline) {
  568. GPR_TIMER_SCOPE("do_epoll_wait", 0);
  569. int r;
  570. int timeout = poll_deadline_to_millis_timeout(deadline);
  571. if (timeout != 0) {
  572. GRPC_SCHEDULING_START_BLOCKING_REGION;
  573. }
  574. do {
  575. GRPC_STATS_INC_SYSCALL_POLL();
  576. r = epoll_wait(g_epoll_set.epfd, g_epoll_set.events, MAX_EPOLL_EVENTS,
  577. timeout);
  578. } while (r < 0 && errno == EINTR);
  579. if (timeout != 0) {
  580. GRPC_SCHEDULING_END_BLOCKING_REGION;
  581. }
  582. if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");
  583. GRPC_STATS_INC_POLL_EVENTS_RETURNED(r);
  584. if (grpc_polling_trace.enabled()) {
  585. gpr_log(GPR_INFO, "ps: %p poll got %d events", ps, r);
  586. }
  587. gpr_atm_rel_store(&g_epoll_set.num_events, r);
  588. gpr_atm_rel_store(&g_epoll_set.cursor, 0);
  589. return GRPC_ERROR_NONE;
  590. }
  /* Prepares `worker` to work on `pollset` and decides whether it may poll.
   *
   * Expects pollset->mu to be held by the caller. The lock is released and
   * re-acquired inside, and is held again when this returns. Outline:
   *  1. Reset the worker (UNKICKED, no cv, empty schedule_on_end_work) and bump
   *     pollset->begin_refs, so pollset_maybe_finish_shutdown() waits for it.
   *  2. If the pollset was seen inactive, re-link it into a neighborhood's
   *     active list. Locks are taken neighborhood->mu first, then pollset->mu,
   *     retrying if the pollset's neighborhood changed meanwhile. If the
   *     neighborhood had no active pollsets and there is no designated poller,
   *     this worker becomes DESIGNATED_POLLER.
   *  3. Insert the worker into the pollset's worker list. If it is still
   *     UNKICKED and no kicked_without_poller is pending, wait on the worker's
   *     cv until it is kicked, the pollset shuts down, or `deadline` passes (a
   *     timeout is treated as a kick).
   *
   * `worker_hdl`, if non-null, receives `worker`.
   * Returns false (and clears the flag) if kicked_without_poller is set;
   * otherwise returns true only if the worker is DESIGNATED_POLLER and the
   * pollset is not shutting down. */
  static bool begin_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
                           grpc_pollset_worker** worker_hdl,
                           grpc_millis deadline) {
    GPR_TIMER_SCOPE("begin_worker", 0);
    if (worker_hdl != nullptr) *worker_hdl = worker;
    worker->initialized_cv = false;
    SET_KICK_STATE(worker, UNKICKED);
    worker->schedule_on_end_work = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
    /* Counts this worker as "about to attach" until worker_insert() below. */
    pollset->begin_refs++;
    if (grpc_polling_trace.enabled()) {
      gpr_log(GPR_INFO, "PS:%p BEGIN_STARTS:%p", pollset, worker);
    }
    if (pollset->seen_inactive) {
      // pollset has been observed to be inactive, we need to move back to the
      // active list
      bool is_reassigning = false;
      /* Only the begin_worker() call that sets reassigning_neighborhood picks a
       * new neighborhood (from the current CPU); concurrent callers reuse the
       * one it chose. */
      if (!pollset->reassigning_neighborhood) {
        is_reassigning = true;
        pollset->reassigning_neighborhood = true;
        pollset->neighborhood = &g_neighborhoods[choose_neighborhood()];
      }
      pollset_neighborhood* neighborhood = pollset->neighborhood;
      gpr_mu_unlock(&pollset->mu);
      // pollset unlocked: state may change (even worker->kick_state)
    retry_lock_neighborhood:
      /* Lock order: neighborhood->mu before pollset->mu. */
      gpr_mu_lock(&neighborhood->mu);
      gpr_mu_lock(&pollset->mu);
      if (grpc_polling_trace.enabled()) {
        gpr_log(GPR_INFO, "PS:%p BEGIN_REORG:%p kick_state=%s is_reassigning=%d",
                pollset, worker, kick_state_string(worker->state),
                is_reassigning);
      }
      if (pollset->seen_inactive) {
        /* The neighborhood may have been reassigned while pollset->mu was
         * released; retry with the current one. */
        if (neighborhood != pollset->neighborhood) {
          gpr_mu_unlock(&neighborhood->mu);
          neighborhood = pollset->neighborhood;
          gpr_mu_unlock(&pollset->mu);
          goto retry_lock_neighborhood;
        }
        /* In the brief time we released the pollset locks above, the worker MAY
           have been kicked. In this case, the worker should get out of this
           pollset ASAP and hence this should neither add the pollset to
           neighborhood nor mark the pollset as active.
           On a side note, the only way a worker's kick state could have changed
           at this point is if it were "kicked specifically". Since the worker has
           not added itself to the pollset yet (by calling worker_insert()), it is
           not visible in the "kick any" path yet */
        if (worker->state == UNKICKED) {
          pollset->seen_inactive = false;
          if (neighborhood->active_root == nullptr) {
            /* First active pollset: it forms a one-element circular list. */
            neighborhood->active_root = pollset->next = pollset->prev = pollset;
            /* Make this the designated poller if there isn't one already */
            if (worker->state == UNKICKED &&
                gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) {
              SET_KICK_STATE(worker, DESIGNATED_POLLER);
            }
          } else {
            /* Insert just before active_root, i.e. at the tail of the list. */
            pollset->next = neighborhood->active_root;
            pollset->prev = pollset->next->prev;
            pollset->next->prev = pollset->prev->next = pollset;
          }
        }
      }
      if (is_reassigning) {
        GPR_ASSERT(pollset->reassigning_neighborhood);
        pollset->reassigning_neighborhood = false;
      }
      gpr_mu_unlock(&neighborhood->mu);
    }
    worker_insert(pollset, worker);
    pollset->begin_refs--;
    if (worker->state == UNKICKED && !pollset->kicked_without_poller) {
      /* A worker that has to wait must not be the designated poller. */
      GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
      worker->initialized_cv = true;
      gpr_cv_init(&worker->cv);
      while (worker->state == UNKICKED && !pollset->shutting_down) {
        if (grpc_polling_trace.enabled()) {
          gpr_log(GPR_INFO, "PS:%p BEGIN_WAIT:%p kick_state=%s shutdown=%d",
                  pollset, worker, kick_state_string(worker->state),
                  pollset->shutting_down);
        }
        if (gpr_cv_wait(&worker->cv, &pollset->mu,
                        grpc_millis_to_timespec(deadline, GPR_CLOCK_MONOTONIC)) &&
            worker->state == UNKICKED) {
          /* If gpr_cv_wait returns true (i.e a timeout), pretend that the worker
             received a kick */
          SET_KICK_STATE(worker, KICKED);
        }
      }
      /* Time passed while waiting; drop the cached "now". */
      grpc_core::ExecCtx::Get()->InvalidateNow();
    }
    if (grpc_polling_trace.enabled()) {
      gpr_log(GPR_INFO,
              "PS:%p BEGIN_DONE:%p kick_state=%s shutdown=%d "
              "kicked_without_poller: %d",
              pollset, worker, kick_state_string(worker->state),
              pollset->shutting_down, pollset->kicked_without_poller);
    }
    /* We release pollset lock in this function at a couple of places:
     * 1. Briefly when assigning pollset to a neighborhood
     * 2. When doing gpr_cv_wait()
     * It is possible that 'kicked_without_poller' was set to true during (1) and
     * 'shutting_down' is set to true during (1) or (2). If either of them is
     * true, this worker cannot do polling */
    /* TODO(sreek): Perhaps there is a better way to handle kicked_without_poller
     * case; especially when the worker is the DESIGNATED_POLLER */
    if (pollset->kicked_without_poller) {
      pollset->kicked_without_poller = false;
      return false;
    }
    return worker->state == DESIGNATED_POLLER && !pollset->shutting_down;
  }
  703. static bool check_neighborhood_for_available_poller(
  704. pollset_neighborhood* neighborhood) {
  705. GPR_TIMER_SCOPE("check_neighborhood_for_available_poller", 0);
  706. bool found_worker = false;
  707. do {
  708. grpc_pollset* inspect = neighborhood->active_root;
  709. if (inspect == nullptr) {
  710. break;
  711. }
  712. gpr_mu_lock(&inspect->mu);
  713. GPR_ASSERT(!inspect->seen_inactive);
  714. grpc_pollset_worker* inspect_worker = inspect->root_worker;
  715. if (inspect_worker != nullptr) {
  716. do {
  717. switch (inspect_worker->state) {
  718. case UNKICKED:
  719. if (gpr_atm_no_barrier_cas(&g_active_poller, 0,
  720. (gpr_atm)inspect_worker)) {
  721. if (grpc_polling_trace.enabled()) {
  722. gpr_log(GPR_INFO, " .. choose next poller to be %p",
  723. inspect_worker);
  724. }
  725. SET_KICK_STATE(inspect_worker, DESIGNATED_POLLER);
  726. if (inspect_worker->initialized_cv) {
  727. GPR_TIMER_MARK("signal worker", 0);
  728. GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
  729. gpr_cv_signal(&inspect_worker->cv);
  730. }
  731. } else {
  732. if (grpc_polling_trace.enabled()) {
  733. gpr_log(GPR_INFO, " .. beaten to choose next poller");
  734. }
  735. }
  736. // even if we didn't win the cas, there's a worker, we can stop
  737. found_worker = true;
  738. break;
  739. case KICKED:
  740. break;
  741. case DESIGNATED_POLLER:
  742. found_worker = true; // ok, so someone else found the worker, but
  743. // we'll accept that
  744. break;
  745. }
  746. inspect_worker = inspect_worker->next;
  747. } while (!found_worker && inspect_worker != inspect->root_worker);
  748. }
  749. if (!found_worker) {
  750. if (grpc_polling_trace.enabled()) {
  751. gpr_log(GPR_INFO, " .. mark pollset %p inactive", inspect);
  752. }
  753. inspect->seen_inactive = true;
  754. if (inspect == neighborhood->active_root) {
  755. neighborhood->active_root =
  756. inspect->next == inspect ? nullptr : inspect->next;
  757. }
  758. inspect->next->prev = inspect->prev;
  759. inspect->prev->next = inspect->next;
  760. inspect->next = inspect->prev = nullptr;
  761. }
  762. gpr_mu_unlock(&inspect->mu);
  763. } while (!found_worker);
  764. return found_worker;
  765. }
  766. static void end_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
  767. grpc_pollset_worker** worker_hdl) {
  768. GPR_TIMER_SCOPE("end_worker", 0);
  769. if (grpc_polling_trace.enabled()) {
  770. gpr_log(GPR_INFO, "PS:%p END_WORKER:%p", pollset, worker);
  771. }
  772. if (worker_hdl != nullptr) *worker_hdl = nullptr;
  773. /* Make sure we appear kicked */
  774. SET_KICK_STATE(worker, KICKED);
  775. grpc_closure_list_move(&worker->schedule_on_end_work,
  776. grpc_core::ExecCtx::Get()->closure_list());
  777. if (gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker) {
  778. if (worker->next != worker && worker->next->state == UNKICKED) {
  779. if (grpc_polling_trace.enabled()) {
  780. gpr_log(GPR_INFO, " .. choose next poller to be peer %p", worker);
  781. }
  782. GPR_ASSERT(worker->next->initialized_cv);
  783. gpr_atm_no_barrier_store(&g_active_poller, (gpr_atm)worker->next);
  784. SET_KICK_STATE(worker->next, DESIGNATED_POLLER);
  785. GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
  786. gpr_cv_signal(&worker->next->cv);
  787. if (grpc_core::ExecCtx::Get()->HasWork()) {
  788. gpr_mu_unlock(&pollset->mu);
  789. grpc_core::ExecCtx::Get()->Flush();
  790. gpr_mu_lock(&pollset->mu);
  791. }
  792. } else {
  793. gpr_atm_no_barrier_store(&g_active_poller, 0);
  794. size_t poller_neighborhood_idx =
  795. static_cast<size_t>(pollset->neighborhood - g_neighborhoods);
  796. gpr_mu_unlock(&pollset->mu);
  797. bool found_worker = false;
  798. bool scan_state[MAX_NEIGHBORHOODS];
  799. for (size_t i = 0; !found_worker && i < g_num_neighborhoods; i++) {
  800. pollset_neighborhood* neighborhood =
  801. &g_neighborhoods[(poller_neighborhood_idx + i) %
  802. g_num_neighborhoods];
  803. if (gpr_mu_trylock(&neighborhood->mu)) {
  804. found_worker = check_neighborhood_for_available_poller(neighborhood);
  805. gpr_mu_unlock(&neighborhood->mu);
  806. scan_state[i] = true;
  807. } else {
  808. scan_state[i] = false;
  809. }
  810. }
  811. for (size_t i = 0; !found_worker && i < g_num_neighborhoods; i++) {
  812. if (scan_state[i]) continue;
  813. pollset_neighborhood* neighborhood =
  814. &g_neighborhoods[(poller_neighborhood_idx + i) %
  815. g_num_neighborhoods];
  816. gpr_mu_lock(&neighborhood->mu);
  817. found_worker = check_neighborhood_for_available_poller(neighborhood);
  818. gpr_mu_unlock(&neighborhood->mu);
  819. }
  820. grpc_core::ExecCtx::Get()->Flush();
  821. gpr_mu_lock(&pollset->mu);
  822. }
  823. } else if (grpc_core::ExecCtx::Get()->HasWork()) {
  824. gpr_mu_unlock(&pollset->mu);
  825. grpc_core::ExecCtx::Get()->Flush();
  826. gpr_mu_lock(&pollset->mu);
  827. }
  828. if (worker->initialized_cv) {
  829. gpr_cv_destroy(&worker->cv);
  830. }
  831. if (grpc_polling_trace.enabled()) {
  832. gpr_log(GPR_INFO, " .. remove worker");
  833. }
  834. if (EMPTIED == worker_remove(pollset, worker)) {
  835. pollset_maybe_finish_shutdown(pollset);
  836. }
  837. GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
  838. }
  839. /* pollset->po.mu lock must be held by the caller before calling this.
  840. The function pollset_work() may temporarily release the lock (pollset->po.mu)
  841. during the course of its execution but it will always re-acquire the lock and
  842. ensure that it is held by the time the function returns */
  843. static grpc_error* pollset_work(grpc_pollset* ps,
  844. grpc_pollset_worker** worker_hdl,
  845. grpc_millis deadline) {
  846. GPR_TIMER_SCOPE("pollset_work", 0);
  847. grpc_pollset_worker worker;
  848. grpc_error* error = GRPC_ERROR_NONE;
  849. static const char* err_desc = "pollset_work";
  850. if (ps->kicked_without_poller) {
  851. ps->kicked_without_poller = false;
  852. return GRPC_ERROR_NONE;
  853. }
  854. if (begin_worker(ps, &worker, worker_hdl, deadline)) {
  855. gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps);
  856. gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
  857. GPR_ASSERT(!ps->shutting_down);
  858. GPR_ASSERT(!ps->seen_inactive);
  859. gpr_mu_unlock(&ps->mu); /* unlock */
  860. /* This is the designated polling thread at this point and should ideally do
  861. polling. However, if there are unprocessed events left from a previous
  862. call to do_epoll_wait(), skip calling epoll_wait() in this iteration and
  863. process the pending epoll events.
  864. The reason for decoupling do_epoll_wait and process_epoll_events is to
  865. better distrubute the work (i.e handling epoll events) across multiple
  866. threads
  867. process_epoll_events() returns very quickly: It just queues the work on
  868. exec_ctx but does not execute it (the actual exectution or more
  869. accurately grpc_core::ExecCtx::Get()->Flush() happens in end_worker()
  870. AFTER selecting a designated poller). So we are not waiting long periods
  871. without a designated poller */
  872. if (gpr_atm_acq_load(&g_epoll_set.cursor) ==
  873. gpr_atm_acq_load(&g_epoll_set.num_events)) {
  874. append_error(&error, do_epoll_wait(ps, deadline), err_desc);
  875. }
  876. append_error(&error, process_epoll_events(ps), err_desc);
  877. gpr_mu_lock(&ps->mu); /* lock */
  878. gpr_tls_set(&g_current_thread_worker, 0);
  879. } else {
  880. gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps);
  881. }
  882. end_worker(ps, &worker, worker_hdl);
  883. gpr_tls_set(&g_current_thread_pollset, 0);
  884. return error;
  885. }
  /* Kicks (wakes) a worker of 'pollset'.

     specific_worker == nullptr ("kick any"):
     - If the calling thread is itself working on 'pollset' (thread-local
       g_current_thread_pollset), nothing is done.
     - If the pollset has no worker, pollset->kicked_without_poller is set. The
       next pollset_work() / begin_worker() on this pollset then returns early
       instead of polling.
     - Otherwise the choice of whom to wake, and how, depends on the kick states
       of root_worker and root_worker->next (see the branches below).

     specific_worker != nullptr: that exact worker is marked KICKED and woken.
     It is woken through the global wakeup fd if it is g_active_poller, or
     through its cv if it has one.

     Returns the error from grpc_wakeup_fd_wakeup() when the wakeup fd was
     signalled, GRPC_ERROR_NONE otherwise.
     NOTE(review): worker and pollset fields are read and written here without
     taking pollset->mu; presumably the caller holds it. Confirm against the
     callers. */
  static grpc_error* pollset_kick(grpc_pollset* pollset,
                                  grpc_pollset_worker* specific_worker) {
    GPR_TIMER_SCOPE("pollset_kick", 0);
    GRPC_STATS_INC_POLLSET_KICK();
    grpc_error* ret_err = GRPC_ERROR_NONE;
    if (grpc_polling_trace.enabled()) {
      /* Assemble a single trace line: kick target, this thread's
         pollset/worker, and the root worker's (and its next peer's) state. */
      gpr_strvec log;
      gpr_strvec_init(&log);
      char* tmp;
      gpr_asprintf(&tmp, "PS:%p KICK:%p curps=%p curworker=%p root=%p", pollset,
                   specific_worker, (void*)gpr_tls_get(&g_current_thread_pollset),
                   (void*)gpr_tls_get(&g_current_thread_worker),
                   pollset->root_worker);
      gpr_strvec_add(&log, tmp);
      if (pollset->root_worker != nullptr) {
        gpr_asprintf(&tmp, " {kick_state=%s next=%p {kick_state=%s}}",
                     kick_state_string(pollset->root_worker->state),
                     pollset->root_worker->next,
                     kick_state_string(pollset->root_worker->next->state));
        gpr_strvec_add(&log, tmp);
      }
      if (specific_worker != nullptr) {
        gpr_asprintf(&tmp, " worker_kick_state=%s",
                     kick_state_string(specific_worker->state));
        gpr_strvec_add(&log, tmp);
      }
      tmp = gpr_strvec_flatten(&log, nullptr);
      gpr_strvec_destroy(&log);
      gpr_log(GPR_DEBUG, "%s", tmp);
      gpr_free(tmp);
    }
    if (specific_worker == nullptr) {
      /* "Kick any" path. */
      if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) {
        grpc_pollset_worker* root_worker = pollset->root_worker;
        if (root_worker == nullptr) {
          /* Nobody to wake: remember the kick for the next pollset_work(). */
          GRPC_STATS_INC_POLLSET_KICKED_WITHOUT_POLLER();
          pollset->kicked_without_poller = true;
          if (grpc_polling_trace.enabled()) {
            gpr_log(GPR_INFO, " .. kicked_without_poller");
          }
          goto done;
        }
        grpc_pollset_worker* next_worker = root_worker->next;
        if (root_worker->state == KICKED) {
          /* A kick is already pending on the root worker; nothing more to do. */
          GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
          if (grpc_polling_trace.enabled()) {
            gpr_log(GPR_INFO, " .. already kicked %p", root_worker);
          }
          SET_KICK_STATE(root_worker, KICKED);
          goto done;
        } else if (next_worker->state == KICKED) {
          /* A kick is already pending on the next worker; nothing more to do. */
          GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
          if (grpc_polling_trace.enabled()) {
            gpr_log(GPR_INFO, " .. already kicked %p", next_worker);
          }
          SET_KICK_STATE(next_worker, KICKED);
          goto done;
        } else if (root_worker ==
                       next_worker &&  // only try and wake up a poller if
                                       // there is no next worker
                   root_worker == (grpc_pollset_worker*)gpr_atm_no_barrier_load(
                                      &g_active_poller)) {
          /* The sole worker is the global designated poller: wake it through
             the global wakeup fd rather than a cv.
             NOTE(review): presumably because it is blocked in do_epoll_wait()
             rather than on its cv; confirm. */
          GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
          if (grpc_polling_trace.enabled()) {
            gpr_log(GPR_INFO, " .. kicked %p", root_worker);
          }
          SET_KICK_STATE(root_worker, KICKED);
          ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
          goto done;
        } else if (next_worker->state == UNKICKED) {
          /* Wake the next worker through its cv, which is asserted to be
             initialized. */
          GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
          if (grpc_polling_trace.enabled()) {
            gpr_log(GPR_INFO, " .. kicked %p", next_worker);
          }
          GPR_ASSERT(next_worker->initialized_cv);
          SET_KICK_STATE(next_worker, KICKED);
          gpr_cv_signal(&next_worker->cv);
          goto done;
        } else if (next_worker->state == DESIGNATED_POLLER) {
          if (root_worker->state != DESIGNATED_POLLER) {
            /* Leave the poller (next_worker) alone and kick the root worker
               instead. Its cv is signalled only if it was initialized. */
            if (grpc_polling_trace.enabled()) {
              gpr_log(
                  GPR_INFO,
                  " .. kicked root non-poller %p (initialized_cv=%d) (poller=%p)",
                  root_worker, root_worker->initialized_cv, next_worker);
            }
            SET_KICK_STATE(root_worker, KICKED);
            if (root_worker->initialized_cv) {
              GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
              gpr_cv_signal(&root_worker->cv);
            }
            goto done;
          } else {
            /* Root is in the DESIGNATED_POLLER state too: kick next_worker
               through the global wakeup fd. */
            GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
            if (grpc_polling_trace.enabled()) {
              gpr_log(GPR_INFO, " .. non-root poller %p (root=%p)", next_worker,
                      root_worker);
            }
            SET_KICK_STATE(next_worker, KICKED);
            ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
            goto done;
          }
        } else {
          /* Defensive: every known state of next_worker is handled above, so
             this branch is not expected to run (the assertion would fail). */
          GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
          GPR_ASSERT(next_worker->state == KICKED);
          SET_KICK_STATE(next_worker, KICKED);
          goto done;
        }
      } else {
        /* The calling thread is itself working on this pollset, so there is
           no other thread to wake. */
        GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD();
        if (grpc_polling_trace.enabled()) {
          gpr_log(GPR_INFO, " .. kicked while waking up");
        }
        goto done;
      }
      /* Every branch above jumps to 'done'. */
      GPR_UNREACHABLE_CODE(goto done);
    }
    /* Specific-worker path. */
    if (specific_worker->state == KICKED) {
      if (grpc_polling_trace.enabled()) {
        gpr_log(GPR_INFO, " .. specific worker already kicked");
      }
      goto done;
    } else if (gpr_tls_get(&g_current_thread_worker) ==
               (intptr_t)specific_worker) {
      /* The target is the calling thread's own worker: marking it is enough. */
      GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD();
      if (grpc_polling_trace.enabled()) {
        gpr_log(GPR_INFO, " .. mark %p kicked", specific_worker);
      }
      SET_KICK_STATE(specific_worker, KICKED);
      goto done;
    } else if (specific_worker ==
               (grpc_pollset_worker*)gpr_atm_no_barrier_load(&g_active_poller)) {
      /* The target is the global designated poller: use the wakeup fd. */
      GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
      if (grpc_polling_trace.enabled()) {
        gpr_log(GPR_INFO, " .. kick active poller");
      }
      SET_KICK_STATE(specific_worker, KICKED);
      ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
      goto done;
    } else if (specific_worker->initialized_cv) {
      /* The target has a cv: mark it KICKED and signal it. */
      GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
      if (grpc_polling_trace.enabled()) {
        gpr_log(GPR_INFO, " .. kick waiting worker");
      }
      SET_KICK_STATE(specific_worker, KICKED);
      gpr_cv_signal(&specific_worker->cv);
      goto done;
    } else {
      /* No cv to signal: marking the state KICKED is all that can be done. */
      GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
      if (grpc_polling_trace.enabled()) {
        gpr_log(GPR_INFO, " .. kick non-waiting worker");
      }
      SET_KICK_STATE(specific_worker, KICKED);
      goto done;
    }
  done:
    return ret_err;
  }
  1044. static void pollset_add_fd(grpc_pollset* pollset, grpc_fd* fd) {}
  1045. /*******************************************************************************
  1046. * Pollset-set Definitions
  1047. */
  1048. static grpc_pollset_set* pollset_set_create(void) {
  1049. return (grpc_pollset_set*)(static_cast<intptr_t>(0xdeafbeef));
  1050. }
  1051. static void pollset_set_destroy(grpc_pollset_set* pss) {}
  1052. static void pollset_set_add_fd(grpc_pollset_set* pss, grpc_fd* fd) {}
  1053. static void pollset_set_del_fd(grpc_pollset_set* pss, grpc_fd* fd) {}
  1054. static void pollset_set_add_pollset(grpc_pollset_set* pss, grpc_pollset* ps) {}
  1055. static void pollset_set_del_pollset(grpc_pollset_set* pss, grpc_pollset* ps) {}
  1056. static void pollset_set_add_pollset_set(grpc_pollset_set* bag,
  1057. grpc_pollset_set* item) {}
  1058. static void pollset_set_del_pollset_set(grpc_pollset_set* bag,
  1059. grpc_pollset_set* item) {}
  1060. /*******************************************************************************
  1061. * Event engine binding
  1062. */
  1063. static void shutdown_engine(void) {
  1064. fd_global_shutdown();
  1065. pollset_global_shutdown();
  1066. epoll_set_shutdown();
  1067. }
  1068. static const grpc_event_engine_vtable vtable = {
  1069. sizeof(grpc_pollset),
  1070. true,
  1071. fd_create,
  1072. fd_wrapped_fd,
  1073. fd_orphan,
  1074. fd_shutdown,
  1075. fd_notify_on_read,
  1076. fd_notify_on_write,
  1077. fd_notify_on_error,
  1078. fd_is_shutdown,
  1079. fd_get_read_notifier_pollset,
  1080. pollset_init,
  1081. pollset_shutdown,
  1082. pollset_destroy,
  1083. pollset_work,
  1084. pollset_kick,
  1085. pollset_add_fd,
  1086. pollset_set_create,
  1087. pollset_set_destroy,
  1088. pollset_set_add_pollset,
  1089. pollset_set_del_pollset,
  1090. pollset_set_add_pollset_set,
  1091. pollset_set_del_pollset_set,
  1092. pollset_set_add_fd,
  1093. pollset_set_del_fd,
  1094. shutdown_engine,
  1095. };
  1096. /* It is possible that GLIBC has epoll but the underlying kernel doesn't.
  1097. * Create epoll_fd (epoll_set_init() takes care of that) to make sure epoll
  1098. * support is available */
  1099. const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) {
  1100. if (!grpc_has_wakeup_fd()) {
  1101. gpr_log(GPR_ERROR, "Skipping epoll1 because of no wakeup fd.");
  1102. return nullptr;
  1103. }
  1104. if (!epoll_set_init()) {
  1105. return nullptr;
  1106. }
  1107. fd_global_init();
  1108. if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
  1109. fd_global_shutdown();
  1110. epoll_set_shutdown();
  1111. return nullptr;
  1112. }
  1113. return &vtable;
  1114. }
  1115. #else /* defined(GRPC_LINUX_EPOLL) */
  #if defined(GRPC_POSIX_SOCKET_EV_EPOLL1)
  #include "src/core/lib/iomgr/ev_epoll1_linux.h"
  /* Fallback used when GRPC_LINUX_EPOLL is not defined, i.e. epoll is not
     available on this platform. The epoll1 engine can never be selected, so it
     always reports itself unavailable by returning nullptr. */
  const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) {
    (void)explicit_request;
    return nullptr;
  }
  #endif /* defined(GRPC_POSIX_SOCKET_EV_EPOLL1) */
  1124. #endif /* !defined(GRPC_LINUX_EPOLL) */