@@ -62,6 +62,11 @@ struct grpc_combiner {
   grpc_closure_scheduler uncovered_finally_scheduler;
   grpc_closure_scheduler covered_finally_scheduler;
   gpr_mpscq queue;
+  // either:
+  // a pointer to the initiating exec_ctx if that is the only exec_ctx that has
+  // ever queued to this combiner, or NULL. If this is non-NULL, it is not
+  // dereferenceable (since the initiating exec_ctx may have gone out of scope)
+  gpr_atm initiating_exec_ctx_or_null;
   // state is:
   // lower bit - zero if orphaned (STATE_UNORPHANED)
   // other bits - number of items queued on the lock (STATE_ELEM_COUNT_LOW_BIT)
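The new field stores a pointer purely as an identity tag: it is compared, never dereferenced, so it stays safe to test even after the initiating exec_ctx has been destroyed. A minimal standalone sketch of that pattern (an illustrative C11 model with hypothetical names, not gRPC code):

    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stdint.h>

    // An atomic "owner tag": a pointer kept only for identity comparison.
    // Because it is never dereferenced, comparing against it remains safe
    // after the object it once pointed to has gone out of scope.
    typedef struct {
      _Atomic uintptr_t owner_or_null;  // 0 == no single known owner
    } owner_tag;

    static void owner_tag_set(owner_tag *t, const void *owner) {
      atomic_store_explicit(&t->owner_or_null, (uintptr_t)owner,
                            memory_order_relaxed);
    }

    static bool owner_tag_matches(owner_tag *t, const void *owner) {
      return atomic_load_explicit(&t->owner_or_null, memory_order_relaxed) ==
             (uintptr_t)owner;
    }

gpr_atm with no_barrier loads and stores plays the same role in the hunks below; relaxed ordering suffices because the tag is only a scheduling hint.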
@@ -116,14 +121,23 @@ static error_data unpack_error_data(uintptr_t p) {
 }
 
 static bool is_covered_by_poller(grpc_combiner *lock) {
-  return lock->final_list_covered_by_poller ||
-         gpr_atm_acq_load(&lock->elements_covered_by_poller) > 0;
+  return (lock->final_list_covered_by_poller ||
+          gpr_atm_acq_load(&lock->elements_covered_by_poller) > 0);
 }
 
-#define IS_COVERED_BY_POLLER_FMT "(final=%d elems=%" PRIdPTR ")->%d"
-#define IS_COVERED_BY_POLLER_ARGS(lock) \
-  (lock)->final_list_covered_by_poller, \
-  gpr_atm_acq_load(&(lock)->elements_covered_by_poller), \
+static bool can_offload(grpc_combiner *lock) {
+  return lock->optional_workqueue != NULL &&
+         gpr_atm_no_barrier_load(&lock->initiating_exec_ctx_or_null) == 0 &&
+         is_covered_by_poller(lock);
+}
+
+#define CAN_OFFLOAD_POLLER_FMT \
+  "(wq=%p contended=%d final=%d elems=%" PRIdPTR ")->%d"
+#define CAN_OFFLOAD_POLLER_ARGS(lock) \
+  (lock)->optional_workqueue, \
+  gpr_atm_no_barrier_load(&(lock)->initiating_exec_ctx_or_null) != 0, \
+  (lock)->final_list_covered_by_poller, \
+  gpr_atm_acq_load(&(lock)->elements_covered_by_poller), \
   is_covered_by_poller((lock))
 
 grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) {
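can_offload() gates offloading on three facts: a workqueue exists, no single initiating exec_ctx is currently recorded (the tag reads 0), and a poller covers the combiner. For reference, this is the call shape the tracing hunk further down uses; a sketch, noting that GRPC_COMBINER_TRACE only fires when combiner tracing is enabled:

    // CAN_OFFLOAD_POLLER_ARGS supplies, in order: the workqueue pointer,
    // the tag's non-zero-ness, the two poller-coverage inputs, and the
    // is_covered_by_poller() result that the trailing "->%d" prints.
    GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG,
                                "C:%p can_offload=" CAN_OFFLOAD_POLLER_FMT,
                                lock, CAN_OFFLOAD_POLLER_ARGS(lock)));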
@@ -216,6 +230,21 @@ static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
   GRPC_COMBINER_TRACE(gpr_log(
       GPR_DEBUG, "C:%p grpc_combiner_execute c=%p cov=%d last=%" PRIdPTR, lock,
       cl, covered_by_poller, last));
+  if (last == 1) {
+    gpr_atm_no_barrier_store(&lock->initiating_exec_ctx_or_null,
+                             (gpr_atm)exec_ctx);
+    // first element on this list: add it to the list of combiner locks
+    // executing within this exec_ctx
+    push_last_on_exec_ctx(exec_ctx, lock);
+  } else {
+    // this load may race with the store in the branch above: if it does, we
+    // may delay offload for one or two actions, and that's fine
+    gpr_atm initiator =
+        gpr_atm_no_barrier_load(&lock->initiating_exec_ctx_or_null);
+    if (initiator != 0 && initiator != (gpr_atm)exec_ctx) {
+      gpr_atm_no_barrier_store(&lock->initiating_exec_ctx_or_null, 0);
+    }
+  }
   GPR_ASSERT(last & STATE_UNORPHANED);  // ensure lock has not been destroyed
   assert(cl->cb);
   cl->error_data.scratch =
@@ -224,11 +253,6 @@ static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
     gpr_atm_no_barrier_fetch_add(&lock->elements_covered_by_poller, 1);
   }
   gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next);
-  if (last == 1) {
-    // first element on this list: add it to the list of combiner locks
-    // executing within this exec_ctx
-    push_last_on_exec_ctx(exec_ctx, lock);
-  }
   GPR_TIMER_END("combiner.execute", 0);
 }
 
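The tag maintenance in the hunk above is deliberately tolerant of races, as its comment says. Isolated into a standalone C11 model (illustrative only; names are hypothetical):

    #include <stdatomic.h>
    #include <stdint.h>

    // The first enqueuer on an empty queue records itself; any later
    // enqueuer that is not the recorded one clears the tag to 0 ("no single
    // initiator"), which can_offload() reads as permission to offload.
    static void note_enqueue(_Atomic uintptr_t *tag, uintptr_t self,
                             int queue_was_empty) {
      if (queue_was_empty) {
        atomic_store_explicit(tag, self, memory_order_relaxed);
      } else {
        uintptr_t cur = atomic_load_explicit(tag, memory_order_relaxed);
        if (cur != 0 && cur != self) {
          // Another exec_ctx is queuing too: drop the single-owner claim.
          atomic_store_explicit(tag, 0, memory_order_relaxed);
        }
      }
    }

A lost update in either direction only shifts when the combiner next offloads; it never changes which callbacks run or in what order.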
@@ -278,18 +302,16 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
     return false;
   }
 
-  GRPC_COMBINER_TRACE(
-      gpr_log(GPR_DEBUG,
-              "C:%p grpc_combiner_continue_exec_ctx workqueue=%p "
-              "is_covered_by_poller=" IS_COVERED_BY_POLLER_FMT
-              " exec_ctx_ready_to_finish=%d "
-              "time_to_execute_final_list=%d",
-              lock, lock->optional_workqueue, IS_COVERED_BY_POLLER_ARGS(lock),
-              grpc_exec_ctx_ready_to_finish(exec_ctx),
-              lock->time_to_execute_final_list));
-
-  if (lock->optional_workqueue != NULL && is_covered_by_poller(lock) &&
-      grpc_exec_ctx_ready_to_finish(exec_ctx)) {
+  GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG,
+                              "C:%p grpc_combiner_continue_exec_ctx "
+                              "can_offload=" CAN_OFFLOAD_POLLER_FMT
+                              " exec_ctx_ready_to_finish=%d "
+                              "time_to_execute_final_list=%d",
+                              lock, CAN_OFFLOAD_POLLER_ARGS(lock),
+                              grpc_exec_ctx_ready_to_finish(exec_ctx),
+                              lock->time_to_execute_final_list));
+
+  if (can_offload(lock) && grpc_exec_ctx_ready_to_finish(exec_ctx)) {
     GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0);
     // this execution context wants to move on, and we have a workqueue (and
     // so can help the execution context out): schedule remaining work to be
@@ -310,7 +332,7 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
     // queue is in an inconsistent state: use this as a cue that we should
     // go off and do something else for a while (and come back later)
     GPR_TIMER_MARK("delay_busy", 0);
-    if (lock->optional_workqueue != NULL && is_covered_by_poller(lock)) {
+    if (can_offload(lock)) {
       queue_offload(exec_ctx, lock);
     }
     GPR_TIMER_END("combiner.continue_exec_ctx", 0);
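Taken together, the change is scheduling-only: callbacks queued from a single exec_ctx now drain inside that exec_ctx instead of bouncing to the workqueue, while contended or poller-covered combiners can still offload. A hedged usage sketch, assuming this era's closure-scheduler API (grpc_combiner_scheduler, grpc_closure_init, grpc_closure_sched); my_cb and my_arg are placeholders:

    grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
    grpc_closure c;
    // Queue work under the combiner; false == not covered by a poller.
    grpc_closure_init(&c, my_cb, my_arg, grpc_combiner_scheduler(lock, false));
    grpc_closure_sched(&exec_ctx, &c, GRPC_ERROR_NONE);
    // Finishing flushes the exec_ctx, which drives
    // grpc_combiner_continue_exec_ctx(); with the tag naming this exec_ctx,
    // can_offload() stays false and the callback runs right here rather
    // than on lock->optional_workqueue.
    grpc_exec_ctx_finish(&exec_ctx);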