/* exec_ctx.h */
/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
#ifndef GRPC_CORE_LIB_IOMGR_EXEC_CTX_H
#define GRPC_CORE_LIB_IOMGR_EXEC_CTX_H

#include <grpc/support/port_platform.h>

#include <limits>

#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/atm.h>
#include <grpc/support/cpu.h>
#include <grpc/support/log.h>

#include "src/core/lib/gpr/time_precise.h"
#include "src/core/lib/gpr/tls.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/fork.h"
#include "src/core/lib/iomgr/closure.h"

/** Time/deadline type used throughout iomgr, expressed in milliseconds.
    Converted to/from gpr_timespec and cycle counters by the helpers declared
    below. */
typedef int64_t grpc_millis;

#define GRPC_MILLIS_INF_FUTURE INT64_MAX
#define GRPC_MILLIS_INF_PAST INT64_MIN

/** A combiner represents a list of work to be executed later.
    Forward declared here to avoid a circular dependency with combiner.h. */
typedef struct grpc_combiner grpc_combiner;

/* This exec_ctx is ready to return: either pre-populated, or cached as soon as
   the finish_check returns true */
#define GRPC_EXEC_CTX_FLAG_IS_FINISHED 1
/* The exec_ctx's thread is (potentially) owned by a call or channel: care
   should be given to not delete said call/channel from this exec_ctx */
#define GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP 2
/* This exec ctx was initialized by an internal thread, and should not
   be counted by fork handlers */
#define GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD 4

/* This application callback exec ctx was initialized by an internal thread,
   and should not be counted by fork handlers */
#define GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD 1

/* Conversions between grpc_millis, gpr_timespec, and raw cycle counters.
   The _round_down/_round_up suffixes name the direction in which
   sub-millisecond remainders are presumably rounded — confirm in exec_ctx.cc. */
gpr_timespec grpc_millis_to_timespec(grpc_millis millis, gpr_clock_type clock);
grpc_millis grpc_timespec_to_millis_round_down(gpr_timespec ts);
grpc_millis grpc_timespec_to_millis_round_up(gpr_timespec ts);
grpc_millis grpc_cycle_counter_to_millis_round_down(gpr_cycle_counter cycles);
grpc_millis grpc_cycle_counter_to_millis_round_up(gpr_cycle_counter cycles);
namespace grpc_core {

class Combiner;

/** Execution context.
 *  A bag of data that collects information along a callstack.
 *  It is created on the stack at core entry points (public API or iomgr), and
 *  stored internally as a thread-local variable.
 *
 *  Generally, to create an exec_ctx instance, add the following line at the
 *  top of the public API entry point or at the start of a thread's work
 *  function :
 *
 *  grpc_core::ExecCtx exec_ctx;
 *
 *  Access the created ExecCtx instance using :
 *  grpc_core::ExecCtx::Get()
 *
 *  Specific responsibilities (this may grow in the future):
 *  - track a list of core work that needs to be delayed until the base of the
 *    call stack (this provides a convenient mechanism to run callbacks
 *    without worrying about locking issues)
 *  - provide a decision maker (via IsReadyToFinish) that provides a
 *    signal as to whether a borrowed thread should continue to do work or
 *    should actively try to finish up and get this thread back to its owner
 *
 *  CONVENTIONS:
 *  - Instance of this must ALWAYS be constructed on the stack, never
 *    heap allocated.
 *  - Do not pass exec_ctx as a parameter to a function. Always access it using
 *    grpc_core::ExecCtx::Get().
 *  - NOTE: In the future, the convention is likely to change to allow only one
 *    ExecCtx on a thread's stack at the same time. The TODO below
 *    discusses this plan in more detail.
 *
 *  TODO(yashykt): Only allow one "active" ExecCtx on a thread at the same
 *  time.
 *    Stage 1: If a new one is created on the stack, it should just
 *             pass-through to the underlying ExecCtx deeper in the thread's
 *             stack.
 *    Stage 2: Assert if a 2nd one is ever created on the stack
 *             since that implies a core re-entry outside of application
 *             callbacks.
 */
class ExecCtx {
 public:
  /** Default Constructor: starts in the IS_FINISHED state and installs
   *  itself as the thread-local current ExecCtx. */
  ExecCtx() : flags_(GRPC_EXEC_CTX_FLAG_IS_FINISHED) {
    grpc_core::Fork::IncExecCtxCount();
    Set(this);
  }

  /** Parameterised Constructor */
  explicit ExecCtx(uintptr_t fl) : flags_(fl) {
    // Contexts owned by gRPC-internal threads are deliberately invisible to
    // fork handlers (see GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD).
    if (!(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags_)) {
      grpc_core::Fork::IncExecCtxCount();
    }
    Set(this);
  }

  /** Destructor: marks the context finished, flushes any queued work, then
   *  re-installs the ExecCtx (if any) that was current when this one was
   *  constructed. */
  virtual ~ExecCtx() {
    flags_ |= GRPC_EXEC_CTX_FLAG_IS_FINISHED;
    Flush();
    Set(last_exec_ctx_);
    if (!(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags_)) {
      grpc_core::Fork::DecExecCtxCount();
    }
  }

  /** Disallow copy and assignment operators */
  ExecCtx(const ExecCtx&) = delete;
  ExecCtx& operator=(const ExecCtx&) = delete;

  /** CPU this context first asked about; sampled lazily on first call. */
  unsigned starting_cpu() {
    if (starting_cpu_ == std::numeric_limits<unsigned>::max()) {
      starting_cpu_ = gpr_cpu_current_cpu();
    }
    return starting_cpu_;
  }

  struct CombinerData {
    /* currently active combiner: updated only via combiner.c */
    Combiner* active_combiner;
    /* last active combiner in the active combiner list */
    Combiner* last_combiner;
  };

  /** Only to be used by grpc-combiner code */
  CombinerData* combiner_data() { return &combiner_data_; }

  /** Return pointer to grpc_closure_list */
  grpc_closure_list* closure_list() { return &closure_list_; }

  /** Return flags */
  uintptr_t flags() { return flags_; }

  /** Checks if there is work to be done: either an active combiner or a
   *  non-empty closure list. */
  bool HasWork() {
    return combiner_data_.active_combiner != nullptr ||
           !grpc_closure_list_empty(closure_list_);
  }

  /** Flush any work that has been enqueued onto this grpc_exec_ctx.
   *  Caller must guarantee that no interfering locks are held.
   *  Returns true if work was performed, false otherwise.
   */
  bool Flush();

  /** Returns true if we'd like to leave this execution context as soon as
   *  possible: useful for deciding whether to do something more or not
   *  depending on outside context.
   */
  bool IsReadyToFinish() {
    // The IS_FINISHED flag is sticky: once CheckReadyToFinish() succeeds,
    // the result is cached and subsequent calls return true immediately.
    if ((flags_ & GRPC_EXEC_CTX_FLAG_IS_FINISHED) == 0) {
      if (CheckReadyToFinish()) {
        flags_ |= GRPC_EXEC_CTX_FLAG_IS_FINISHED;
        return true;
      }
      return false;
    } else {
      return true;
    }
  }

  /** Returns the stored current time relative to start if valid,
   *  otherwise refreshes the stored time, sets it valid and returns the new
   *  value.
   */
  grpc_millis Now();

  /** Invalidates the stored time value. A new time value will be set on
   *  calling Now().
   */
  void InvalidateNow() { now_is_valid_ = false; }

  /** To be used only by shutdown code in iomgr: pins Now() at infinity so
   *  deadline comparisons treat everything as expired-relative. */
  void SetNowIomgrShutdown() {
    now_ = GRPC_MILLIS_INF_FUTURE;
    now_is_valid_ = true;
  }

  /** To be used only for testing.
   *  Sets the now value.
   */
  void TestOnlySetNow(grpc_millis new_val) {
    now_ = new_val;
    now_is_valid_ = true;
  }

  static void TestOnlyGlobalInit(gpr_timespec new_val);

  /** Global initialization for ExecCtx. Called by iomgr. */
  static void GlobalInit(void);

  /** Global shutdown for ExecCtx. Called by iomgr. */
  static void GlobalShutdown(void) { gpr_tls_destroy(&exec_ctx_); }

  /** Gets pointer to current exec_ctx. */
  static ExecCtx* Get() {
    return reinterpret_cast<ExecCtx*>(gpr_tls_get(&exec_ctx_));
  }

  /** Sets the thread-local current exec_ctx to \a exec_ctx. */
  static void Set(ExecCtx* exec_ctx) {
    gpr_tls_set(&exec_ctx_, reinterpret_cast<intptr_t>(exec_ctx));
  }

  /* Defined in exec_ctx.cc; presumably schedules \a closure with \a error
     for execution — confirm semantics against the implementation. */
  static void Run(const DebugLocation& location, grpc_closure* closure,
                  grpc_error* error);

  static void RunList(const DebugLocation& location, grpc_closure_list* list);

 protected:
  /** Check if ready to finish. */
  virtual bool CheckReadyToFinish() { return false; }

  /** Disallow delete on ExecCtx (instances must live on the stack). */
  static void operator delete(void* /* p */) { abort(); }

 private:
  /** Core closures queued to run at the next Flush(). */
  grpc_closure_list closure_list_ = GRPC_CLOSURE_LIST_INIT;
  CombinerData combiner_data_ = {nullptr, nullptr};
  uintptr_t flags_;
  /** max() is the "not yet sampled" sentinel; see starting_cpu(). */
  unsigned starting_cpu_ = std::numeric_limits<unsigned>::max();
  /** Cached clock value; see Now() / InvalidateNow(). */
  bool now_is_valid_ = false;
  grpc_millis now_ = 0;

  /** Thread-local slot holding the current ExecCtx. */
  GPR_TLS_CLASS_DECL(exec_ctx_);
  /** ExecCtx that was current at construction; restored by the destructor. */
  ExecCtx* last_exec_ctx_ = Get();
};
/** Application-callback execution context.
 *  A bag of data that collects information along a callstack.
 *  It is created on the stack at core entry points, and stored internally
 *  as a thread-local variable.
 *
 *  There are three key differences between this structure and ExecCtx:
 *    1. ApplicationCallbackExecCtx builds a list of application-level
 *       callbacks, but ExecCtx builds a list of internal callbacks to invoke.
 *    2. ApplicationCallbackExecCtx invokes its callbacks only at destruction;
 *       there is no explicit Flush method.
 *    3. If more than one ApplicationCallbackExecCtx is created on the
 *       thread's stack, only the one closest to the base of the stack is
 *       actually active and this is the only one that enqueues application
 *       callbacks. (Unlike ExecCtx, it is not feasible to prevent multiple of
 *       these on the stack since the executing application callback may
 *       itself enter core. However, the new one created will just pass
 *       callbacks through to the base one and those will not be executed
 *       until the return to the destructor of the base one, preventing
 *       unlimited stack growth.)
 *
 *  This structure exists because application callbacks may themselves cause a
 *  core re-entry (e.g., through a public API call) and if that call in turn
 *  causes another application-callback, there could be arbitrarily growing
 *  stacks of core re-entries. Instead, any application callbacks instead
 *  should not be invoked until other core work is done and other application
 *  callbacks have completed. To accomplish this, any application callback
 *  should be enqueued using grpc_core::ApplicationCallbackExecCtx::Enqueue .
 *
 *  CONVENTIONS:
 *  - Instances of this must ALWAYS be constructed on the stack, never
 *    heap allocated.
 *  - Instances of this are generally constructed before ExecCtx when needed.
 *    The only exception is for ExecCtx's that are explicitly flushed and
 *    that survive beyond the scope of the function that can cause application
 *    callbacks to be invoked (e.g., in the timer thread).
 *
 *  Generally, core entry points that may trigger application-level callbacks
 *  will have the following declarations:
 *
 *  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
 *  grpc_core::ExecCtx exec_ctx;
 *
 *  This ordering is important to make sure that the
 *  ApplicationCallbackExecCtx is destroyed after the ExecCtx (to prevent the
 *  re-entry problem described above, as well as making sure that ExecCtx core
 *  callbacks are invoked first)
 *
 */
class ApplicationCallbackExecCtx {
 public:
  /** Default Constructor */
  ApplicationCallbackExecCtx() { Set(this, flags_); }

  /** Parameterised Constructor */
  explicit ApplicationCallbackExecCtx(uintptr_t fl) : flags_(fl) {
    Set(this, flags_);
  }

  /** Destructor: if this instance is the thread's active (base) context,
   *  drain and run every enqueued application callback, clear the
   *  thread-local slot, and balance the fork-handler count. A nested
   *  (non-active) instance must never have accumulated callbacks. */
  ~ApplicationCallbackExecCtx() {
    if (reinterpret_cast<ApplicationCallbackExecCtx*>(
            gpr_tls_get(&callback_exec_ctx_)) == this) {
      while (head_ != nullptr) {
        auto* f = head_;
        head_ = f->internal_next;
        if (f->internal_next == nullptr) {
          tail_ = nullptr;
        }
        // NOTE: a callback run here may itself Enqueue more callbacks; the
        // TLS slot still points at this instance, so they land on this same
        // list and are drained by this loop.
        (*f->functor_run)(f, f->internal_success);
      }
      gpr_tls_set(&callback_exec_ctx_, reinterpret_cast<intptr_t>(nullptr));
      if (!(GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags_)) {
        grpc_core::Fork::DecExecCtxCount();
      }
    } else {
      // This instance never became active (Set() was a no-op), so nothing
      // can have been queued on it.
      GPR_DEBUG_ASSERT(head_ == nullptr);
      GPR_DEBUG_ASSERT(tail_ == nullptr);
    }
  }

  uintptr_t Flags() { return flags_; }

  /** Gets pointer to the thread's active context, or nullptr if none. */
  static ApplicationCallbackExecCtx* Get() {
    return reinterpret_cast<ApplicationCallbackExecCtx*>(
        gpr_tls_get(&callback_exec_ctx_));
  }

  /** Installs \a exec_ctx as the thread's active context — but only if no
   *  context is installed yet; nested instances are left inactive so that
   *  callbacks pass through to the base one. */
  static void Set(ApplicationCallbackExecCtx* exec_ctx, uintptr_t flags) {
    if (Get() == nullptr) {
      if (!(GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags)) {
        grpc_core::Fork::IncExecCtxCount();
      }
      gpr_tls_set(&callback_exec_ctx_, reinterpret_cast<intptr_t>(exec_ctx));
    }
  }

  /** Appends \a functor to the active context's FIFO list; it runs when the
   *  base ApplicationCallbackExecCtx is destroyed. Requires an
   *  ApplicationCallbackExecCtx to be Set on this thread (Get() is
   *  dereferenced unchecked). */
  static void Enqueue(grpc_experimental_completion_queue_functor* functor,
                      int is_success) {
    functor->internal_success = is_success;
    functor->internal_next = nullptr;

    ApplicationCallbackExecCtx* ctx = Get();

    if (ctx->head_ == nullptr) {
      ctx->head_ = functor;
    }
    if (ctx->tail_ != nullptr) {
      ctx->tail_->internal_next = functor;
    }
    ctx->tail_ = functor;
  }

  /** Global initialization for ApplicationCallbackExecCtx. Called by init. */
  static void GlobalInit(void) { gpr_tls_init(&callback_exec_ctx_); }

  /** Global shutdown for ApplicationCallbackExecCtx. Called by init. */
  static void GlobalShutdown(void) { gpr_tls_destroy(&callback_exec_ctx_); }

  static bool Available() { return Get() != nullptr; }

 private:
  uintptr_t flags_{0u};
  /** Singly-linked FIFO of pending application callbacks. */
  grpc_experimental_completion_queue_functor* head_{nullptr};
  grpc_experimental_completion_queue_functor* tail_{nullptr};
  /** Thread-local slot holding the active (base) instance. */
  GPR_TLS_CLASS_DECL(callback_exec_ctx_);
};

}  // namespace grpc_core

#endif /* GRPC_CORE_LIB_IOMGR_EXEC_CTX_H */