|  | @@ -26,23 +26,43 @@
 | 
	
		
			
				|  |  |  #include "src/core/lib/debug/stats.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/profiling/timers.h"
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -grpc_core::TraceFlag grpc_call_combiner_trace(false, "call_combiner");
 | 
	
		
			
				|  |  | +namespace grpc_core {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static grpc_error* decode_cancel_state_error(gpr_atm cancel_state) {
 | 
	
		
			
				|  |  | +TraceFlag grpc_call_combiner_trace(false, "call_combiner");
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +namespace {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +grpc_error* DecodeCancelStateError(gpr_atm cancel_state) {
 | 
	
		
			
				|  |  |    if (cancel_state & 1) {
 | 
	
		
			
				|  |  |      return (grpc_error*)(cancel_state & ~static_cast<gpr_atm>(1));
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    return GRPC_ERROR_NONE;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static gpr_atm encode_cancel_state_error(grpc_error* error) {
 | 
	
		
			
				|  |  | +gpr_atm EncodeCancelStateError(grpc_error* error) {
 | 
	
		
			
				|  |  |    return static_cast<gpr_atm>(1) | (gpr_atm)error;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +}  // namespace
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +CallCombiner::CallCombiner() {
 | 
	
		
			
				|  |  | +  gpr_atm_no_barrier_store(&cancel_state_, 0);
 | 
	
		
			
				|  |  | +  gpr_atm_no_barrier_store(&size_, 0);
 | 
	
		
			
				|  |  | +  gpr_mpscq_init(&queue_);
 | 
	
		
			
				|  |  | +#ifdef GRPC_TSAN_ENABLED
 | 
	
		
			
				|  |  | +  GRPC_CLOSURE_INIT(&tsan_closure_, TsanClosure, this,
 | 
	
		
			
				|  |  | +                    grpc_schedule_on_exec_ctx);
 | 
	
		
			
				|  |  | +#endif
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +CallCombiner::~CallCombiner() {
 | 
	
		
			
				|  |  | +  gpr_mpscq_destroy(&queue_);
 | 
	
		
			
				|  |  | +  GRPC_ERROR_UNREF(DecodeCancelStateError(cancel_state_));
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  #ifdef GRPC_TSAN_ENABLED
 | 
	
		
			
				|  |  | -static void tsan_closure(void* user_data, grpc_error* error) {
 | 
	
		
			
				|  |  | -  grpc_call_combiner* call_combiner =
 | 
	
		
			
				|  |  | -      static_cast<grpc_call_combiner*>(user_data);
 | 
	
		
			
				|  |  | +void CallCombiner::TsanClosure(void* arg, grpc_error* error) {
 | 
	
		
			
				|  |  | +  CallCombiner* self = static_cast<CallCombiner*>(arg);
 | 
	
		
			
				|  |  |    // We ref-count the lock, and check if it's already taken.
 | 
	
		
			
				|  |  |    // If it was taken, we should do nothing. Otherwise, we will mark it as
 | 
	
		
			
				|  |  |    // locked. Note that if two different threads try to do this, only one of
 | 
	
	
		
			
				|  | @@ -51,18 +71,18 @@ static void tsan_closure(void* user_data, grpc_error* error) {
 | 
	
		
			
				|  |  |    // TSAN will correctly produce an error.
 | 
	
		
			
				|  |  |    //
 | 
	
		
			
				|  |  |    // TODO(soheil): This only covers the callbacks scheduled by
 | 
	
		
			
				|  |  | -  //               grpc_call_combiner_(start|finish). If in the future, a
 | 
	
		
			
				|  |  | -  //               callback gets scheduled using other mechanisms, we will need
 | 
	
		
			
				|  |  | -  //               to add APIs to externally lock call combiners.
 | 
	
		
			
				|  |  | -  grpc_core::RefCountedPtr<grpc_call_combiner::TsanLock> lock =
 | 
	
		
			
				|  |  | -      call_combiner->tsan_lock;
 | 
	
		
			
				|  |  | +  //               CallCombiner::Start() and CallCombiner::Stop().
 | 
	
		
			
				|  |  | +  //               If in the future, a callback gets scheduled using other
 | 
	
		
			
				|  |  | +  //               mechanisms, we will need to add APIs to externally lock
 | 
	
		
			
				|  |  | +  //               call combiners.
 | 
	
		
			
				|  |  | +  RefCountedPtr<TsanLock> lock = self->tsan_lock_;
 | 
	
		
			
				|  |  |    bool prev = false;
 | 
	
		
			
				|  |  |    if (lock->taken.compare_exchange_strong(prev, true)) {
 | 
	
		
			
				|  |  |      TSAN_ANNOTATE_RWLOCK_ACQUIRED(&lock->taken, true);
 | 
	
		
			
				|  |  |    } else {
 | 
	
		
			
				|  |  |      lock.reset();
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  GRPC_CLOSURE_RUN(call_combiner->original_closure, GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  | +  GRPC_CLOSURE_RUN(self->original_closure_, GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  |    if (lock != nullptr) {
 | 
	
		
			
				|  |  |      TSAN_ANNOTATE_RWLOCK_RELEASED(&lock->taken, true);
 | 
	
		
			
				|  |  |      bool prev = true;
 | 
	
	
		
			
				|  | @@ -71,34 +91,17 @@ static void tsan_closure(void* user_data, grpc_error* error) {
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  #endif
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void call_combiner_sched_closure(grpc_call_combiner* call_combiner,
 | 
	
		
			
				|  |  | -                                        grpc_closure* closure,
 | 
	
		
			
				|  |  | -                                        grpc_error* error) {
 | 
	
		
			
				|  |  | +void CallCombiner::ScheduleClosure(grpc_closure* closure, grpc_error* error) {
 | 
	
		
			
				|  |  |  #ifdef GRPC_TSAN_ENABLED
 | 
	
		
			
				|  |  | -  call_combiner->original_closure = closure;
 | 
	
		
			
				|  |  | -  GRPC_CLOSURE_SCHED(&call_combiner->tsan_closure, error);
 | 
	
		
			
				|  |  | +  original_closure_ = closure;
 | 
	
		
			
				|  |  | +  GRPC_CLOSURE_SCHED(&tsan_closure_, error);
 | 
	
		
			
				|  |  |  #else
 | 
	
		
			
				|  |  |    GRPC_CLOSURE_SCHED(closure, error);
 | 
	
		
			
				|  |  |  #endif
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -void grpc_call_combiner_init(grpc_call_combiner* call_combiner) {
 | 
	
		
			
				|  |  | -  gpr_atm_no_barrier_store(&call_combiner->cancel_state, 0);
 | 
	
		
			
				|  |  | -  gpr_atm_no_barrier_store(&call_combiner->size, 0);
 | 
	
		
			
				|  |  | -  gpr_mpscq_init(&call_combiner->queue);
 | 
	
		
			
				|  |  | -#ifdef GRPC_TSAN_ENABLED
 | 
	
		
			
				|  |  | -  GRPC_CLOSURE_INIT(&call_combiner->tsan_closure, tsan_closure, call_combiner,
 | 
	
		
			
				|  |  | -                    grpc_schedule_on_exec_ctx);
 | 
	
		
			
				|  |  | -#endif
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -void grpc_call_combiner_destroy(grpc_call_combiner* call_combiner) {
 | 
	
		
			
				|  |  | -  gpr_mpscq_destroy(&call_combiner->queue);
 | 
	
		
			
				|  |  | -  GRPC_ERROR_UNREF(decode_cancel_state_error(call_combiner->cancel_state));
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |  #ifndef NDEBUG
 | 
	
		
			
				|  |  | -#define DEBUG_ARGS , const char *file, int line
 | 
	
		
			
				|  |  | +#define DEBUG_ARGS const char *file, int line,
 | 
	
		
			
				|  |  |  #define DEBUG_FMT_STR "%s:%d: "
 | 
	
		
			
				|  |  |  #define DEBUG_FMT_ARGS , file, line
 | 
	
		
			
				|  |  |  #else
 | 
	
	
		
			
				|  | @@ -107,20 +110,17 @@ void grpc_call_combiner_destroy(grpc_call_combiner* call_combiner) {
 | 
	
		
			
				|  |  |  #define DEBUG_FMT_ARGS
 | 
	
		
			
				|  |  |  #endif
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -void grpc_call_combiner_start(grpc_call_combiner* call_combiner,
 | 
	
		
			
				|  |  | -                              grpc_closure* closure,
 | 
	
		
			
				|  |  | -                              grpc_error* error DEBUG_ARGS,
 | 
	
		
			
				|  |  | -                              const char* reason) {
 | 
	
		
			
				|  |  | -  GPR_TIMER_SCOPE("call_combiner_start", 0);
 | 
	
		
			
				|  |  | +void CallCombiner::Start(grpc_closure* closure, grpc_error* error,
 | 
	
		
			
				|  |  | +                         DEBUG_ARGS const char* reason) {
 | 
	
		
			
				|  |  | +  GPR_TIMER_SCOPE("CallCombiner::Start", 0);
 | 
	
		
			
				|  |  |    if (grpc_call_combiner_trace.enabled()) {
 | 
	
		
			
				|  |  |      gpr_log(GPR_INFO,
 | 
	
		
			
				|  |  | -            "==> grpc_call_combiner_start() [%p] closure=%p [" DEBUG_FMT_STR
 | 
	
		
			
				|  |  | +            "==> CallCombiner::Start() [%p] closure=%p [" DEBUG_FMT_STR
 | 
	
		
			
				|  |  |              "%s] error=%s",
 | 
	
		
			
				|  |  | -            call_combiner, closure DEBUG_FMT_ARGS, reason,
 | 
	
		
			
				|  |  | -            grpc_error_string(error));
 | 
	
		
			
				|  |  | +            this, closure DEBUG_FMT_ARGS, reason, grpc_error_string(error));
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  size_t prev_size = static_cast<size_t>(
 | 
	
		
			
				|  |  | -      gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)1));
 | 
	
		
			
				|  |  | +  size_t prev_size =
 | 
	
		
			
				|  |  | +      static_cast<size_t>(gpr_atm_full_fetch_add(&size_, (gpr_atm)1));
 | 
	
		
			
				|  |  |    if (grpc_call_combiner_trace.enabled()) {
 | 
	
		
			
				|  |  |      gpr_log(GPR_INFO, "  size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
 | 
	
		
			
				|  |  |              prev_size + 1);
 | 
	
	
		
			
				|  | @@ -128,34 +128,30 @@ void grpc_call_combiner_start(grpc_call_combiner* call_combiner,
 | 
	
		
			
				|  |  |    GRPC_STATS_INC_CALL_COMBINER_LOCKS_SCHEDULED_ITEMS();
 | 
	
		
			
				|  |  |    if (prev_size == 0) {
 | 
	
		
			
				|  |  |      GRPC_STATS_INC_CALL_COMBINER_LOCKS_INITIATED();
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |      GPR_TIMER_MARK("call_combiner_initiate", 0);
 | 
	
		
			
				|  |  |      if (grpc_call_combiner_trace.enabled()) {
 | 
	
		
			
				|  |  |        gpr_log(GPR_INFO, "  EXECUTING IMMEDIATELY");
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |      // Queue was empty, so execute this closure immediately.
 | 
	
		
			
				|  |  | -    call_combiner_sched_closure(call_combiner, closure, error);
 | 
	
		
			
				|  |  | +    ScheduleClosure(closure, error);
 | 
	
		
			
				|  |  |    } else {
 | 
	
		
			
				|  |  |      if (grpc_call_combiner_trace.enabled()) {
 | 
	
		
			
				|  |  |        gpr_log(GPR_INFO, "  QUEUING");
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |      // Queue was not empty, so add closure to queue.
 | 
	
		
			
				|  |  |      closure->error_data.error = error;
 | 
	
		
			
				|  |  | -    gpr_mpscq_push(&call_combiner->queue,
 | 
	
		
			
				|  |  | -                   reinterpret_cast<gpr_mpscq_node*>(closure));
 | 
	
		
			
				|  |  | +    gpr_mpscq_push(&queue_, reinterpret_cast<gpr_mpscq_node*>(closure));
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -void grpc_call_combiner_stop(grpc_call_combiner* call_combiner DEBUG_ARGS,
 | 
	
		
			
				|  |  | -                             const char* reason) {
 | 
	
		
			
				|  |  | -  GPR_TIMER_SCOPE("call_combiner_stop", 0);
 | 
	
		
			
				|  |  | +void CallCombiner::Stop(DEBUG_ARGS const char* reason) {
 | 
	
		
			
				|  |  | +  GPR_TIMER_SCOPE("CallCombiner::Stop", 0);
 | 
	
		
			
				|  |  |    if (grpc_call_combiner_trace.enabled()) {
 | 
	
		
			
				|  |  | -    gpr_log(GPR_INFO,
 | 
	
		
			
				|  |  | -            "==> grpc_call_combiner_stop() [%p] [" DEBUG_FMT_STR "%s]",
 | 
	
		
			
				|  |  | -            call_combiner DEBUG_FMT_ARGS, reason);
 | 
	
		
			
				|  |  | +    gpr_log(GPR_INFO, "==> CallCombiner::Stop() [%p] [" DEBUG_FMT_STR "%s]",
 | 
	
		
			
				|  |  | +            this DEBUG_FMT_ARGS, reason);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  size_t prev_size = static_cast<size_t>(
 | 
	
		
			
				|  |  | -      gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)-1));
 | 
	
		
			
				|  |  | +  size_t prev_size =
 | 
	
		
			
				|  |  | +      static_cast<size_t>(gpr_atm_full_fetch_add(&size_, (gpr_atm)-1));
 | 
	
		
			
				|  |  |    if (grpc_call_combiner_trace.enabled()) {
 | 
	
		
			
				|  |  |      gpr_log(GPR_INFO, "  size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
 | 
	
		
			
				|  |  |              prev_size - 1);
 | 
	
	
		
			
				|  | @@ -168,10 +164,10 @@ void grpc_call_combiner_stop(grpc_call_combiner* call_combiner DEBUG_ARGS,
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  |        bool empty;
 | 
	
		
			
				|  |  |        grpc_closure* closure = reinterpret_cast<grpc_closure*>(
 | 
	
		
			
				|  |  | -          gpr_mpscq_pop_and_check_end(&call_combiner->queue, &empty));
 | 
	
		
			
				|  |  | +          gpr_mpscq_pop_and_check_end(&queue_, &empty));
 | 
	
		
			
				|  |  |        if (closure == nullptr) {
 | 
	
		
			
				|  |  |          // This can happen either due to a race condition within the mpscq
 | 
	
		
			
				|  |  | -        // code or because of a race with grpc_call_combiner_start().
 | 
	
		
			
				|  |  | +        // code or because of a race with Start().
 | 
	
		
			
				|  |  |          if (grpc_call_combiner_trace.enabled()) {
 | 
	
		
			
				|  |  |            gpr_log(GPR_INFO, "  queue returned no result; checking again");
 | 
	
		
			
				|  |  |          }
 | 
	
	
		
			
				|  | @@ -181,8 +177,7 @@ void grpc_call_combiner_stop(grpc_call_combiner* call_combiner DEBUG_ARGS,
 | 
	
		
			
				|  |  |          gpr_log(GPR_INFO, "  EXECUTING FROM QUEUE: closure=%p error=%s",
 | 
	
		
			
				|  |  |                  closure, grpc_error_string(closure->error_data.error));
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  | -      call_combiner_sched_closure(call_combiner, closure,
 | 
	
		
			
				|  |  | -                                  closure->error_data.error);
 | 
	
		
			
				|  |  | +      ScheduleClosure(closure, closure->error_data.error);
 | 
	
		
			
				|  |  |        break;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |    } else if (grpc_call_combiner_trace.enabled()) {
 | 
	
	
		
			
				|  | @@ -190,13 +185,12 @@ void grpc_call_combiner_stop(grpc_call_combiner* call_combiner DEBUG_ARGS,
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -void grpc_call_combiner_set_notify_on_cancel(grpc_call_combiner* call_combiner,
 | 
	
		
			
				|  |  | -                                             grpc_closure* closure) {
 | 
	
		
			
				|  |  | +void CallCombiner::SetNotifyOnCancel(grpc_closure* closure) {
 | 
	
		
			
				|  |  |    GRPC_STATS_INC_CALL_COMBINER_SET_NOTIFY_ON_CANCEL();
 | 
	
		
			
				|  |  |    while (true) {
 | 
	
		
			
				|  |  |      // Decode original state.
 | 
	
		
			
				|  |  | -    gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state);
 | 
	
		
			
				|  |  | -    grpc_error* original_error = decode_cancel_state_error(original_state);
 | 
	
		
			
				|  |  | +    gpr_atm original_state = gpr_atm_acq_load(&cancel_state_);
 | 
	
		
			
				|  |  | +    grpc_error* original_error = DecodeCancelStateError(original_state);
 | 
	
		
			
				|  |  |      // If error is set, invoke the cancellation closure immediately.
 | 
	
		
			
				|  |  |      // Otherwise, store the new closure.
 | 
	
		
			
				|  |  |      if (original_error != GRPC_ERROR_NONE) {
 | 
	
	
		
			
				|  | @@ -204,16 +198,15 @@ void grpc_call_combiner_set_notify_on_cancel(grpc_call_combiner* call_combiner,
 | 
	
		
			
				|  |  |          gpr_log(GPR_INFO,
 | 
	
		
			
				|  |  |                  "call_combiner=%p: scheduling notify_on_cancel callback=%p "
 | 
	
		
			
				|  |  |                  "for pre-existing cancellation",
 | 
	
		
			
				|  |  | -                call_combiner, closure);
 | 
	
		
			
				|  |  | +                this, closure);
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  |        GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_REF(original_error));
 | 
	
		
			
				|  |  |        break;
 | 
	
		
			
				|  |  |      } else {
 | 
	
		
			
				|  |  | -      if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state,
 | 
	
		
			
				|  |  | -                           (gpr_atm)closure)) {
 | 
	
		
			
				|  |  | +      if (gpr_atm_full_cas(&cancel_state_, original_state, (gpr_atm)closure)) {
 | 
	
		
			
				|  |  |          if (grpc_call_combiner_trace.enabled()) {
 | 
	
		
			
				|  |  |            gpr_log(GPR_INFO, "call_combiner=%p: setting notify_on_cancel=%p",
 | 
	
		
			
				|  |  | -                  call_combiner, closure);
 | 
	
		
			
				|  |  | +                  this, closure);
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |          // If we replaced an earlier closure, invoke the original
 | 
	
		
			
				|  |  |          // closure with GRPC_ERROR_NONE.  This allows callers to clean
 | 
	
	
		
			
				|  | @@ -222,8 +215,8 @@ void grpc_call_combiner_set_notify_on_cancel(grpc_call_combiner* call_combiner,
 | 
	
		
			
				|  |  |            closure = (grpc_closure*)original_state;
 | 
	
		
			
				|  |  |            if (grpc_call_combiner_trace.enabled()) {
 | 
	
		
			
				|  |  |              gpr_log(GPR_INFO,
 | 
	
		
			
				|  |  | -                    "call_combiner=%p: scheduling old cancel callback=%p",
 | 
	
		
			
				|  |  | -                    call_combiner, closure);
 | 
	
		
			
				|  |  | +                    "call_combiner=%p: scheduling old cancel callback=%p", this,
 | 
	
		
			
				|  |  | +                    closure);
 | 
	
		
			
				|  |  |            }
 | 
	
		
			
				|  |  |            GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  |          }
 | 
	
	
		
			
				|  | @@ -234,24 +227,23 @@ void grpc_call_combiner_set_notify_on_cancel(grpc_call_combiner* call_combiner,
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -void grpc_call_combiner_cancel(grpc_call_combiner* call_combiner,
 | 
	
		
			
				|  |  | -                               grpc_error* error) {
 | 
	
		
			
				|  |  | +void CallCombiner::Cancel(grpc_error* error) {
 | 
	
		
			
				|  |  |    GRPC_STATS_INC_CALL_COMBINER_CANCELLED();
 | 
	
		
			
				|  |  |    while (true) {
 | 
	
		
			
				|  |  | -    gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state);
 | 
	
		
			
				|  |  | -    grpc_error* original_error = decode_cancel_state_error(original_state);
 | 
	
		
			
				|  |  | +    gpr_atm original_state = gpr_atm_acq_load(&cancel_state_);
 | 
	
		
			
				|  |  | +    grpc_error* original_error = DecodeCancelStateError(original_state);
 | 
	
		
			
				|  |  |      if (original_error != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  |        GRPC_ERROR_UNREF(error);
 | 
	
		
			
				|  |  |        break;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -    if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state,
 | 
	
		
			
				|  |  | -                         encode_cancel_state_error(error))) {
 | 
	
		
			
				|  |  | +    if (gpr_atm_full_cas(&cancel_state_, original_state,
 | 
	
		
			
				|  |  | +                         EncodeCancelStateError(error))) {
 | 
	
		
			
				|  |  |        if (original_state != 0) {
 | 
	
		
			
				|  |  |          grpc_closure* notify_on_cancel = (grpc_closure*)original_state;
 | 
	
		
			
				|  |  |          if (grpc_call_combiner_trace.enabled()) {
 | 
	
		
			
				|  |  |            gpr_log(GPR_INFO,
 | 
	
		
			
				|  |  |                    "call_combiner=%p: scheduling notify_on_cancel callback=%p",
 | 
	
		
			
				|  |  | -                  call_combiner, notify_on_cancel);
 | 
	
		
			
				|  |  | +                  this, notify_on_cancel);
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |          GRPC_CLOSURE_SCHED(notify_on_cancel, GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  |        }
 | 
	
	
		
			
				|  | @@ -260,3 +252,5 @@ void grpc_call_combiner_cancel(grpc_call_combiner* call_combiner,
 | 
	
		
			
				|  |  |      // cas failed, try again.
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +}  // namespace grpc_core
 |