|
|
@@ -59,6 +59,11 @@ struct grpc_combiner {
|
|
|
grpc_closure_scheduler scheduler;
|
|
|
grpc_closure_scheduler finally_scheduler;
|
|
|
gpr_mpscq queue;
|
|
|
+ // either:
|
|
|
+ // a pointer to the initiating exec ctx if that is the only exec_ctx that has
|
|
|
+ // ever queued to this combiner, or NULL. If this is non-null, it's not
|
|
|
+ // dereferenceable (since the initiating exec_ctx may have gone out of scope)
|
|
|
+ gpr_atm initiating_exec_ctx_or_null;
|
|
|
// state is:
|
|
|
// lower bit - zero if orphaned (STATE_UNORPHANED)
|
|
|
// other bits - number of items queued on the lock (STATE_ELEM_COUNT_LOW_BIT)
|
|
|
@@ -168,15 +173,25 @@ static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_closure *cl,
|
|
|
GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG,
|
|
|
"C:%p grpc_combiner_execute c=%p last=%" PRIdPTR,
|
|
|
lock, cl, last));
|
|
|
- GPR_ASSERT(last & STATE_UNORPHANED); // ensure lock has not been destroyed
|
|
|
- assert(cl->cb);
|
|
|
- cl->error_data.error = error;
|
|
|
- gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next);
|
|
|
if (last == 1) {
|
|
|
+ gpr_atm_no_barrier_store(&lock->initiating_exec_ctx_or_null,
|
|
|
+ (gpr_atm)exec_ctx);
|
|
|
// first element on this list: add it to the list of combiner locks
|
|
|
// executing within this exec_ctx
|
|
|
push_last_on_exec_ctx(exec_ctx, lock);
|
|
|
+ } else {
|
|
|
+ // there may be a race with setting here: if that happens, we may delay
|
|
|
+ // offload for one or two actions, and that's fine
|
|
|
+ gpr_atm initiator =
|
|
|
+ gpr_atm_no_barrier_load(&lock->initiating_exec_ctx_or_null);
|
|
|
+ if (initiator != 0 && initiator != (gpr_atm)exec_ctx) {
|
|
|
+ gpr_atm_no_barrier_store(&lock->initiating_exec_ctx_or_null, 0);
|
|
|
+ }
|
|
|
}
|
|
|
+ GPR_ASSERT(last & STATE_UNORPHANED); // ensure lock has not been destroyed
|
|
|
+ assert(cl->cb);
|
|
|
+ cl->error_data.error = error;
|
|
|
+ gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next);
|
|
|
GPR_TIMER_END("combiner.execute", 0);
|
|
|
}
|
|
|
|