|  | @@ -201,56 +201,66 @@ static const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
 | 
	
		
			
				|  |  |       .destroy = non_polling_poller_destroy},
 | 
	
		
			
				|  |  |  };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -/* Queue that holds the cq_completion_events. This internally uses gpr_mpscq
 | 
	
		
			
				|  |  | - * queue (a lockfree multiproducer single consumer queue). However this queue
 | 
	
		
			
				|  |  | - * supports multiple consumers too. As such, it uses the queue_mu to serialize
 | 
	
		
			
				|  |  | - * consumer access (but no locks for producer access).
 | 
	
		
			
				|  |  | - *
 | 
	
		
			
				|  |  | - * Currently this is only used in completion queues whose completion_type is
 | 
	
		
			
				|  |  | - * GRPC_CQ_NEXT */
 | 
	
		
			
				|  |  | +typedef struct cq_vtable {
 | 
	
		
			
				|  |  | +  grpc_cq_completion_type cq_completion_type;
 | 
	
		
			
				|  |  | +  size_t (*size)();
 | 
	
		
			
				|  |  | +  void (*begin_op)(grpc_completion_queue *cc, void *tag);
 | 
	
		
			
				|  |  | +  void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, void *tag,
 | 
	
		
			
				|  |  | +                 grpc_error *error,
 | 
	
		
			
				|  |  | +                 void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,
 | 
	
		
			
				|  |  | +                              grpc_cq_completion *storage),
 | 
	
		
			
				|  |  | +                 void *done_arg, grpc_cq_completion *storage);
 | 
	
		
			
				|  |  | +  grpc_event (*next)(grpc_completion_queue *cc, gpr_timespec deadline,
 | 
	
		
			
				|  |  | +                     void *reserved);
 | 
	
		
			
				|  |  | +  grpc_event (*pluck)(grpc_completion_queue *cc, void *tag,
 | 
	
		
			
				|  |  | +                      gpr_timespec deadline, void *reserved);
 | 
	
		
			
				|  |  | +} cq_vtable;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +/* Queue that holds the cq_completion_events. Internally uses gpr_mpscq queue
 | 
	
		
			
				|  |  | + * (a lockfree multiproducer single consumer queue). It uses a queue_lock
 | 
	
		
			
				|  |  | + * to support multiple consumers.
 | 
	
		
			
				|  |  | + * Only used in completion queues whose completion_type is GRPC_CQ_NEXT */
 | 
	
		
			
				|  |  |  typedef struct grpc_cq_event_queue {
 | 
	
		
			
				|  |  | -  /* spinlock to serialize consumers i.e pop() operations */
 | 
	
		
			
				|  |  | +  /* Spinlock to serialize consumers i.e pop() operations */
 | 
	
		
			
				|  |  |    gpr_spinlock queue_lock;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    gpr_mpscq queue;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  /* A lazy counter indicating the number of items in the queue. This is NOT
 | 
	
		
			
				|  |  | -     atomically incremented/decrements along with push/pop operations and hence
 | 
	
		
			
				|  |  | -     only eventually consistent */
 | 
	
		
			
				|  |  | +  /* A lazy counter of number of items in the queue. This is NOT atomically
 | 
	
		
			
				|  |  | +     incremented/decremented along with push/pop operations and hence is only
 | 
	
		
			
				|  |  | +     eventually consistent */
 | 
	
		
			
				|  |  |    gpr_atm num_queue_items;
 | 
	
		
			
				|  |  |  } grpc_cq_event_queue;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -/* Completion queue structure */
 | 
	
		
			
				|  |  | -struct grpc_completion_queue {
 | 
	
		
			
				|  |  | -  /** Owned by pollset */
 | 
	
		
			
				|  |  | +/* TODO: sreek Refactor this based on the completion_type. Put completion-type
 | 
	
		
			
				|  |  | + * specific data in a different structure (and co-allocate memory for it along
 | 
	
		
			
				|  |  | + * with completion queue + pollset )*/
 | 
	
		
			
				|  |  | +typedef struct cq_data {
 | 
	
		
			
				|  |  |    gpr_mu *mu;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  grpc_cq_completion_type completion_type;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  const cq_poller_vtable *poller_vtable;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  /** completed events */
 | 
	
		
			
				|  |  | +  /** Completed events for completion-queues of type GRPC_CQ_PLUCK */
 | 
	
		
			
				|  |  |    grpc_cq_completion completed_head;
 | 
	
		
			
				|  |  |    grpc_cq_completion *completed_tail;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  /** Completed events for completion-queues of type GRPC_CQ_NEXT are stored in
 | 
	
		
			
				|  |  | -   * this queue */
 | 
	
		
			
				|  |  | +  /** Completed events for completion-queues of type GRPC_CQ_NEXT */
 | 
	
		
			
				|  |  |    grpc_cq_event_queue queue;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    /** Number of pending events (+1 if we're not shutdown) */
 | 
	
		
			
				|  |  |    gpr_refcount pending_events;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    /** Once owning_refs drops to zero, we will destroy the cq */
 | 
	
		
			
				|  |  |    gpr_refcount owning_refs;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    /** Counter of how many things have ever been queued on this completion queue
 | 
	
		
			
				|  |  |        useful for avoiding locks to check the queue */
 | 
	
		
			
				|  |  |    gpr_atm things_queued_ever;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    /** 0 initially, 1 once we've begun shutting down */
 | 
	
		
			
				|  |  |    gpr_atm shutdown;
 | 
	
		
			
				|  |  |    int shutdown_called;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    int is_server_cq;
 | 
	
		
			
				|  |  | -  /** Can the server cq accept incoming channels */
 | 
	
		
			
				|  |  | -  /* TODO: sreek - This will no longer be needed. Use polling_type set */
 | 
	
		
			
				|  |  | -  int is_non_listening_server_cq;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    int num_pluckers;
 | 
	
		
			
				|  |  |    plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
 | 
	
		
			
				|  |  |    grpc_closure pollset_shutdown_done;
 | 
	
	
		
			
				|  | @@ -260,8 +270,31 @@ struct grpc_completion_queue {
 | 
	
		
			
				|  |  |    size_t outstanding_tag_count;
 | 
	
		
			
				|  |  |    size_t outstanding_tag_capacity;
 | 
	
		
			
				|  |  |  #endif
 | 
	
		
			
				|  |  | +} cq_data;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  grpc_completion_queue *next_free;
 | 
	
		
			
				|  |  | +/* Completion queue structure */
 | 
	
		
			
				|  |  | +struct grpc_completion_queue {
 | 
	
		
			
				|  |  | +  cq_data data;
 | 
	
		
			
				|  |  | +  const cq_vtable *vtable;
 | 
	
		
			
				|  |  | +  const cq_poller_vtable *poller_vtable;
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +/* Completion queue vtables based on the completion-type */
 | 
	
		
			
				|  |  | +static const cq_vtable g_cq_vtable[] = {
 | 
	
		
			
				|  |  | +    /* GRPC_CQ_NEXT */
 | 
	
		
			
				|  |  | +    {.cq_completion_type = GRPC_CQ_NEXT,
 | 
	
		
			
				|  |  | +     .size = grpc_cq_size,
 | 
	
		
			
				|  |  | +     .begin_op = grpc_cq_begin_op,
 | 
	
		
			
				|  |  | +     .end_op = grpc_cq_end_op_for_next,
 | 
	
		
			
				|  |  | +     .next = grpc_completion_queue_next,
 | 
	
		
			
				|  |  | +     .pluck = NULL},
 | 
	
		
			
				|  |  | +    /* GRPC_CQ_PLUCK */
 | 
	
		
			
				|  |  | +    {.cq_completion_type = GRPC_CQ_PLUCK,
 | 
	
		
			
				|  |  | +     .size = grpc_cq_size,
 | 
	
		
			
				|  |  | +     .begin_op = grpc_cq_begin_op,
 | 
	
		
			
				|  |  | +     .end_op = grpc_cq_end_op_for_pluck,
 | 
	
		
			
				|  |  | +     .next = NULL,
 | 
	
		
			
				|  |  | +     .pluck = grpc_completion_queue_pluck},
 | 
	
		
			
				|  |  |  };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  #define POLLSET_FROM_CQ(cq) ((grpc_pollset *)(cq + 1))
 | 
	
	
		
			
				|  | @@ -316,6 +349,12 @@ static long cq_event_queue_num_items(grpc_cq_event_queue *q) {
 | 
	
		
			
				|  |  |    return (long)gpr_atm_no_barrier_load(&q->num_queue_items);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +size_t grpc_cq_size(grpc_completion_queue *cc) {
 | 
	
		
			
				|  |  | +  /* Size of the completion queue and the size of the pollset whose memory is
 | 
	
		
			
				|  |  | +     allocated right after that of completion queue */
 | 
	
		
			
				|  |  | +  return sizeof(grpc_completion_queue) + cc->poller_vtable->size();
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  grpc_completion_queue *grpc_completion_queue_create_internal(
 | 
	
		
			
				|  |  |      grpc_cq_completion_type completion_type,
 | 
	
		
			
				|  |  |      grpc_cq_polling_type polling_type) {
 | 
	
	
		
			
				|  | @@ -328,36 +367,39 @@ grpc_completion_queue *grpc_completion_queue_create_internal(
 | 
	
		
			
				|  |  |        "polling_type=%d)",
 | 
	
		
			
				|  |  |        2, (completion_type, polling_type));
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +  const cq_vtable *vtable = &g_cq_vtable[completion_type];
 | 
	
		
			
				|  |  |    const cq_poller_vtable *poller_vtable =
 | 
	
		
			
				|  |  |        &g_poller_vtable_by_poller_type[polling_type];
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    cc = gpr_zalloc(sizeof(grpc_completion_queue) + poller_vtable->size());
 | 
	
		
			
				|  |  | -  poller_vtable->init(POLLSET_FROM_CQ(cc), &cc->mu);
 | 
	
		
			
				|  |  | -#ifndef NDEBUG
 | 
	
		
			
				|  |  | -  cc->outstanding_tags = NULL;
 | 
	
		
			
				|  |  | -  cc->outstanding_tag_capacity = 0;
 | 
	
		
			
				|  |  | -#endif
 | 
	
		
			
				|  |  | +  cq_data *cqd = &cc->data;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  cc->completion_type = completion_type;
 | 
	
		
			
				|  |  | +  cc->vtable = vtable;
 | 
	
		
			
				|  |  |    cc->poller_vtable = poller_vtable;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +  poller_vtable->init(POLLSET_FROM_CQ(cc), &cc->data.mu);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +#ifndef NDEBUG
 | 
	
		
			
				|  |  | +  cqd->outstanding_tags = NULL;
 | 
	
		
			
				|  |  | +  cqd->outstanding_tag_capacity = 0;
 | 
	
		
			
				|  |  | +#endif
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    /* Initial ref is dropped by grpc_completion_queue_shutdown */
 | 
	
		
			
				|  |  | -  gpr_ref_init(&cc->pending_events, 1);
 | 
	
		
			
				|  |  | +  gpr_ref_init(&cqd->pending_events, 1);
 | 
	
		
			
				|  |  |    /* One for destroy(), one for pollset_shutdown */
 | 
	
		
			
				|  |  | -  gpr_ref_init(&cc->owning_refs, 2);
 | 
	
		
			
				|  |  | -  cc->completed_tail = &cc->completed_head;
 | 
	
		
			
				|  |  | -  cc->completed_head.next = (uintptr_t)cc->completed_tail;
 | 
	
		
			
				|  |  | -  gpr_atm_no_barrier_store(&cc->shutdown, 0);
 | 
	
		
			
				|  |  | -  cc->shutdown_called = 0;
 | 
	
		
			
				|  |  | -  cc->is_server_cq = 0;
 | 
	
		
			
				|  |  | -  cc->is_non_listening_server_cq = 0;
 | 
	
		
			
				|  |  | -  cc->num_pluckers = 0;
 | 
	
		
			
				|  |  | -  gpr_atm_no_barrier_store(&cc->things_queued_ever, 0);
 | 
	
		
			
				|  |  | +  gpr_ref_init(&cqd->owning_refs, 2);
 | 
	
		
			
				|  |  | +  cqd->completed_tail = &cqd->completed_head;
 | 
	
		
			
				|  |  | +  cqd->completed_head.next = (uintptr_t)cqd->completed_tail;
 | 
	
		
			
				|  |  | +  gpr_atm_no_barrier_store(&cqd->shutdown, 0);
 | 
	
		
			
				|  |  | +  cqd->shutdown_called = 0;
 | 
	
		
			
				|  |  | +  cqd->is_server_cq = 0;
 | 
	
		
			
				|  |  | +  cqd->num_pluckers = 0;
 | 
	
		
			
				|  |  | +  gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
 | 
	
		
			
				|  |  |  #ifndef NDEBUG
 | 
	
		
			
				|  |  | -  cc->outstanding_tag_count = 0;
 | 
	
		
			
				|  |  | +  cqd->outstanding_tag_count = 0;
 | 
	
		
			
				|  |  |  #endif
 | 
	
		
			
				|  |  | -  cq_event_queue_init(&cc->queue);
 | 
	
		
			
				|  |  | -  grpc_closure_init(&cc->pollset_shutdown_done, on_pollset_shutdown_done, cc,
 | 
	
		
			
				|  |  | +  cq_event_queue_init(&cqd->queue);
 | 
	
		
			
				|  |  | +  grpc_closure_init(&cqd->pollset_shutdown_done, on_pollset_shutdown_done, cc,
 | 
	
		
			
				|  |  |                      grpc_schedule_on_exec_ctx);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    GPR_TIMER_END("grpc_completion_queue_create_internal", 0);
 | 
	
	
		
			
				|  | @@ -366,18 +408,19 @@ grpc_completion_queue *grpc_completion_queue_create_internal(
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc) {
 | 
	
		
			
				|  |  | -  return cc->completion_type;
 | 
	
		
			
				|  |  | +  return cc->vtable->cq_completion_type;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  #ifdef GRPC_CQ_REF_COUNT_DEBUG
 | 
	
		
			
				|  |  |  void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
 | 
	
		
			
				|  |  |                            const char *file, int line) {
 | 
	
		
			
				|  |  |    gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p   ref %d -> %d %s", cc,
 | 
	
		
			
				|  |  | -          (int)cc->owning_refs.count, (int)cc->owning_refs.count + 1, reason);
 | 
	
		
			
				|  |  | +          (int)cqd->owning_refs.count, (int)cqd->owning_refs.count + 1, reason);
 | 
	
		
			
				|  |  |  #else
 | 
	
		
			
				|  |  |  void grpc_cq_internal_ref(grpc_completion_queue *cc) {
 | 
	
		
			
				|  |  | +  cq_data *cqd = &cc->data;
 | 
	
		
			
				|  |  |  #endif
 | 
	
		
			
				|  |  | -  gpr_ref(&cc->owning_refs);
 | 
	
		
			
				|  |  | +  gpr_ref(&cqd->owning_refs);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg,
 | 
	
	
		
			
				|  | @@ -389,57 +432,62 @@ static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg,
 | 
	
		
			
				|  |  |  #ifdef GRPC_CQ_REF_COUNT_DEBUG
 | 
	
		
			
				|  |  |  void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason,
 | 
	
		
			
				|  |  |                              const char *file, int line) {
 | 
	
		
			
				|  |  | +  cq_data *cqd = &cc->data;
 | 
	
		
			
				|  |  |    gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p unref %d -> %d %s", cc,
 | 
	
		
			
				|  |  | -          (int)cc->owning_refs.count, (int)cc->owning_refs.count - 1, reason);
 | 
	
		
			
				|  |  | +          (int)cqd->owning_refs.count, (int)cqd->owning_refs.count - 1, reason);
 | 
	
		
			
				|  |  |  #else
 | 
	
		
			
				|  |  |  void grpc_cq_internal_unref(grpc_completion_queue *cc) {
 | 
	
		
			
				|  |  | +  cq_data *cqd = &cc->data;
 | 
	
		
			
				|  |  |  #endif
 | 
	
		
			
				|  |  | -  if (gpr_unref(&cc->owning_refs)) {
 | 
	
		
			
				|  |  | -    GPR_ASSERT(cc->completed_head.next == (uintptr_t)&cc->completed_head);
 | 
	
		
			
				|  |  | +  if (gpr_unref(&cqd->owning_refs)) {
 | 
	
		
			
				|  |  | +    GPR_ASSERT(cqd->completed_head.next == (uintptr_t)&cqd->completed_head);
 | 
	
		
			
				|  |  |      cc->poller_vtable->destroy(POLLSET_FROM_CQ(cc));
 | 
	
		
			
				|  |  | -    cq_event_queue_destroy(&cc->queue);
 | 
	
		
			
				|  |  | +    cq_event_queue_destroy(&cqd->queue);
 | 
	
		
			
				|  |  |  #ifndef NDEBUG
 | 
	
		
			
				|  |  | -    gpr_free(cc->outstanding_tags);
 | 
	
		
			
				|  |  | +    gpr_free(cqd->outstanding_tags);
 | 
	
		
			
				|  |  |  #endif
 | 
	
		
			
				|  |  |      gpr_free(cc);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) {
 | 
	
		
			
				|  |  | +  cq_data *cqd = &cc->data;
 | 
	
		
			
				|  |  |  #ifndef NDEBUG
 | 
	
		
			
				|  |  | -  gpr_mu_lock(cc->mu);
 | 
	
		
			
				|  |  | -  GPR_ASSERT(!cc->shutdown_called);
 | 
	
		
			
				|  |  | -  if (cc->outstanding_tag_count == cc->outstanding_tag_capacity) {
 | 
	
		
			
				|  |  | -    cc->outstanding_tag_capacity = GPR_MAX(4, 2 * cc->outstanding_tag_capacity);
 | 
	
		
			
				|  |  | -    cc->outstanding_tags =
 | 
	
		
			
				|  |  | -        gpr_realloc(cc->outstanding_tags, sizeof(*cc->outstanding_tags) *
 | 
	
		
			
				|  |  | -                                              cc->outstanding_tag_capacity);
 | 
	
		
			
				|  |  | +  gpr_mu_lock(cqd->mu);
 | 
	
		
			
				|  |  | +  GPR_ASSERT(!cqd->shutdown_called);
 | 
	
		
			
				|  |  | +  if (cqd->outstanding_tag_count == cqd->outstanding_tag_capacity) {
 | 
	
		
			
				|  |  | +    cqd->outstanding_tag_capacity =
 | 
	
		
			
				|  |  | +        GPR_MAX(4, 2 * cqd->outstanding_tag_capacity);
 | 
	
		
			
				|  |  | +    cqd->outstanding_tags =
 | 
	
		
			
				|  |  | +        gpr_realloc(cqd->outstanding_tags, sizeof(*cqd->outstanding_tags) *
 | 
	
		
			
				|  |  | +                                               cqd->outstanding_tag_capacity);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  cc->outstanding_tags[cc->outstanding_tag_count++] = tag;
 | 
	
		
			
				|  |  | -  gpr_mu_unlock(cc->mu);
 | 
	
		
			
				|  |  | +  cqd->outstanding_tags[cqd->outstanding_tag_count++] = tag;
 | 
	
		
			
				|  |  | +  gpr_mu_unlock(cqd->mu);
 | 
	
		
			
				|  |  |  #endif
 | 
	
		
			
				|  |  | -  gpr_ref(&cc->pending_events);
 | 
	
		
			
				|  |  | +  gpr_ref(&cqd->pending_events);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  #ifndef NDEBUG
 | 
	
		
			
				|  |  |  void check_tag_in_cq(grpc_completion_queue *cc, void *tag, bool lock_cq) {
 | 
	
		
			
				|  |  | +  cq_data *cqd = &cc->data;
 | 
	
		
			
				|  |  |    int found = 0;
 | 
	
		
			
				|  |  |    if (lock_cq) {
 | 
	
		
			
				|  |  | -    gpr_mu_lock(cc->mu);
 | 
	
		
			
				|  |  | +    gpr_mu_lock(cqd->mu);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  for (int i = 0; i < (int)cc->outstanding_tag_count; i++) {
 | 
	
		
			
				|  |  | -    if (cc->outstanding_tags[i] == tag) {
 | 
	
		
			
				|  |  | -      cc->outstanding_tag_count--;
 | 
	
		
			
				|  |  | -      GPR_SWAP(void *, cc->outstanding_tags[i],
 | 
	
		
			
				|  |  | -               cc->outstanding_tags[cc->outstanding_tag_count]);
 | 
	
		
			
				|  |  | +  for (int i = 0; i < (int)cqd->outstanding_tag_count; i++) {
 | 
	
		
			
				|  |  | +    if (cqd->outstanding_tags[i] == tag) {
 | 
	
		
			
				|  |  | +      cqd->outstanding_tag_count--;
 | 
	
		
			
				|  |  | +      GPR_SWAP(void *, cqd->outstanding_tags[i],
 | 
	
		
			
				|  |  | +               cqd->outstanding_tags[cqd->outstanding_tag_count]);
 | 
	
		
			
				|  |  |        found = 1;
 | 
	
		
			
				|  |  |        break;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    if (lock_cq) {
 | 
	
		
			
				|  |  | -    gpr_mu_unlock(cc->mu);
 | 
	
		
			
				|  |  | +    gpr_mu_unlock(cqd->mu);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    GPR_ASSERT(found);
 | 
	
	
		
			
				|  | @@ -451,11 +499,28 @@ void check_tag_in_cq(grpc_completion_queue *cc, void *tag, bool lock_cq) {}
 | 
	
		
			
				|  |  |  /* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a completion
 | 
	
		
			
				|  |  |   * type of GRPC_CQ_NEXT) */
 | 
	
		
			
				|  |  |  void grpc_cq_end_op_for_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
 | 
	
		
			
				|  |  | -                             void *tag, int is_success,
 | 
	
		
			
				|  |  | +                             void *tag, grpc_error *error,
 | 
	
		
			
				|  |  |                               void (*done)(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |                                            void *done_arg,
 | 
	
		
			
				|  |  |                                            grpc_cq_completion *storage),
 | 
	
		
			
				|  |  |                               void *done_arg, grpc_cq_completion *storage) {
 | 
	
		
			
				|  |  | +  GPR_TIMER_BEGIN("grpc_cq_end_op_for_next", 0);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  if (grpc_api_trace ||
 | 
	
		
			
				|  |  | +      (grpc_trace_operation_failures && error != GRPC_ERROR_NONE)) {
 | 
	
		
			
				|  |  | +    const char *errmsg = grpc_error_string(error);
 | 
	
		
			
				|  |  | +    GRPC_API_TRACE(
 | 
	
		
			
				|  |  | +        "grpc_cq_end_op_for_mext(exec_ctx=%p, cc=%p, tag=%p, error=%s, "
 | 
	
		
			
				|  |  | +        "done=%p, done_arg=%p, storage=%p)",
 | 
	
		
			
				|  |  | +        7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage));
 | 
	
		
			
				|  |  | +    if (grpc_trace_operation_failures && error != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | +      gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  cq_data *cqd = &cc->data;
 | 
	
		
			
				|  |  | +  int is_success = (error == GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    storage->tag = tag;
 | 
	
		
			
				|  |  |    storage->done = done;
 | 
	
		
			
				|  |  |    storage->done_arg = done_arg;
 | 
	
	
		
			
				|  | @@ -464,15 +529,15 @@ void grpc_cq_end_op_for_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
 | 
	
		
			
				|  |  |    check_tag_in_cq(cc, tag, true); /* Used in debug builds only */
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    /* Add the completion to the queue */
 | 
	
		
			
				|  |  | -  cq_event_queue_push(&cc->queue, storage);
 | 
	
		
			
				|  |  | -  gpr_atm_no_barrier_fetch_add(&cc->things_queued_ever, 1);
 | 
	
		
			
				|  |  | +  cq_event_queue_push(&cqd->queue, storage);
 | 
	
		
			
				|  |  | +  gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  int shutdown = gpr_unref(&cc->pending_events);
 | 
	
		
			
				|  |  | +  int shutdown = gpr_unref(&cqd->pending_events);
 | 
	
		
			
				|  |  |    if (!shutdown) {
 | 
	
		
			
				|  |  | -    gpr_mu_lock(cc->mu);
 | 
	
		
			
				|  |  | +    gpr_mu_lock(cqd->mu);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      grpc_error *kick_error = cc->poller_vtable->kick(POLLSET_FROM_CQ(cc), NULL);
 | 
	
		
			
				|  |  | -    gpr_mu_unlock(cc->mu);
 | 
	
		
			
				|  |  | +    gpr_mu_unlock(cqd->mu);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      if (kick_error != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  |        const char *msg = grpc_error_string(kick_error);
 | 
	
	
		
			
				|  | @@ -481,47 +546,68 @@ void grpc_cq_end_op_for_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
 | 
	
		
			
				|  |  |        GRPC_ERROR_UNREF(kick_error);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |    } else {
 | 
	
		
			
				|  |  | -    GPR_ASSERT(!gpr_atm_no_barrier_load(&cc->shutdown));
 | 
	
		
			
				|  |  | -    GPR_ASSERT(cc->shutdown_called);
 | 
	
		
			
				|  |  | +    GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown));
 | 
	
		
			
				|  |  | +    GPR_ASSERT(cqd->shutdown_called);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    gpr_atm_no_barrier_store(&cc->shutdown, 1);
 | 
	
		
			
				|  |  | +    gpr_atm_no_barrier_store(&cqd->shutdown, 1);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    gpr_mu_lock(cc->mu);
 | 
	
		
			
				|  |  | +    gpr_mu_lock(cqd->mu);
 | 
	
		
			
				|  |  |      cc->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cc),
 | 
	
		
			
				|  |  | -                                &cc->pollset_shutdown_done);
 | 
	
		
			
				|  |  | -    gpr_mu_unlock(cc->mu);
 | 
	
		
			
				|  |  | +                                &cqd->pollset_shutdown_done);
 | 
	
		
			
				|  |  | +    gpr_mu_unlock(cqd->mu);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  GPR_TIMER_END("grpc_cq_end_op_for_next", 0);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  GRPC_ERROR_UNREF(error);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  /* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a completion
 | 
	
		
			
				|  |  |   * type of GRPC_CQ_PLUCK) */
 | 
	
		
			
				|  |  |  void grpc_cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |                                grpc_completion_queue *cc, void *tag,
 | 
	
		
			
				|  |  | -                              int is_success,
 | 
	
		
			
				|  |  | +                              grpc_error *error,
 | 
	
		
			
				|  |  |                                void (*done)(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |                                             void *done_arg,
 | 
	
		
			
				|  |  |                                             grpc_cq_completion *storage),
 | 
	
		
			
				|  |  |                                void *done_arg, grpc_cq_completion *storage) {
 | 
	
		
			
				|  |  | +  cq_data *cqd = &cc->data;
 | 
	
		
			
				|  |  | +  int is_success = (error == GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  GPR_TIMER_BEGIN("grpc_cq_end_op_for_pluck", 0);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  if (grpc_api_trace ||
 | 
	
		
			
				|  |  | +      (grpc_trace_operation_failures && error != GRPC_ERROR_NONE)) {
 | 
	
		
			
				|  |  | +    const char *errmsg = grpc_error_string(error);
 | 
	
		
			
				|  |  | +    GRPC_API_TRACE(
 | 
	
		
			
				|  |  | +        "grpc_cq_end_op_for_pluck(exec_ctx=%p, cc=%p, tag=%p, error=%s, "
 | 
	
		
			
				|  |  | +        "done=%p, done_arg=%p, storage=%p)",
 | 
	
		
			
				|  |  | +        7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage));
 | 
	
		
			
				|  |  | +    if (grpc_trace_operation_failures && error != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | +      gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    storage->tag = tag;
 | 
	
		
			
				|  |  |    storage->done = done;
 | 
	
		
			
				|  |  |    storage->done_arg = done_arg;
 | 
	
		
			
				|  |  | -  storage->next = ((uintptr_t)&cc->completed_head) | ((uintptr_t)(is_success));
 | 
	
		
			
				|  |  | +  storage->next = ((uintptr_t)&cqd->completed_head) | ((uintptr_t)(is_success));
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  gpr_mu_lock(cc->mu);
 | 
	
		
			
				|  |  | +  gpr_mu_lock(cqd->mu);
 | 
	
		
			
				|  |  |    check_tag_in_cq(cc, tag, false); /* Used in debug builds only */
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    /* Add to the list of completions */
 | 
	
		
			
				|  |  | -  gpr_atm_no_barrier_fetch_add(&cc->things_queued_ever, 1);
 | 
	
		
			
				|  |  | -  cc->completed_tail->next =
 | 
	
		
			
				|  |  | -      ((uintptr_t)storage) | (1u & (uintptr_t)cc->completed_tail->next);
 | 
	
		
			
				|  |  | -  cc->completed_tail = storage;
 | 
	
		
			
				|  |  | +  gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
 | 
	
		
			
				|  |  | +  cqd->completed_tail->next =
 | 
	
		
			
				|  |  | +      ((uintptr_t)storage) | (1u & (uintptr_t)cqd->completed_tail->next);
 | 
	
		
			
				|  |  | +  cqd->completed_tail = storage;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  int shutdown = gpr_unref(&cc->pending_events);
 | 
	
		
			
				|  |  | +  int shutdown = gpr_unref(&cqd->pending_events);
 | 
	
		
			
				|  |  |    if (!shutdown) {
 | 
	
		
			
				|  |  |      grpc_pollset_worker *pluck_worker = NULL;
 | 
	
		
			
				|  |  | -    for (int i = 0; i < cc->num_pluckers; i++) {
 | 
	
		
			
				|  |  | -      if (cc->pluckers[i].tag == tag) {
 | 
	
		
			
				|  |  | -        pluck_worker = *cc->pluckers[i].worker;
 | 
	
		
			
				|  |  | +    for (int i = 0; i < cqd->num_pluckers; i++) {
 | 
	
		
			
				|  |  | +      if (cqd->pluckers[i].tag == tag) {
 | 
	
		
			
				|  |  | +        pluck_worker = *cqd->pluckers[i].worker;
 | 
	
		
			
				|  |  |          break;
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  |      }
 | 
	
	
		
			
				|  | @@ -529,7 +615,7 @@ void grpc_cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |      grpc_error *kick_error =
 | 
	
		
			
				|  |  |          cc->poller_vtable->kick(POLLSET_FROM_CQ(cc), pluck_worker);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    gpr_mu_unlock(cc->mu);
 | 
	
		
			
				|  |  | +    gpr_mu_unlock(cqd->mu);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      if (kick_error != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  |        const char *msg = grpc_error_string(kick_error);
 | 
	
	
		
			
				|  | @@ -538,54 +624,25 @@ void grpc_cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |        GRPC_ERROR_UNREF(kick_error);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |    } else {
 | 
	
		
			
				|  |  | -    GPR_ASSERT(!gpr_atm_no_barrier_load(&cc->shutdown));
 | 
	
		
			
				|  |  | -    GPR_ASSERT(cc->shutdown_called);
 | 
	
		
			
				|  |  | -    gpr_atm_no_barrier_store(&cc->shutdown, 1);
 | 
	
		
			
				|  |  | +    GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown));
 | 
	
		
			
				|  |  | +    GPR_ASSERT(cqd->shutdown_called);
 | 
	
		
			
				|  |  | +    gpr_atm_no_barrier_store(&cqd->shutdown, 1);
 | 
	
		
			
				|  |  |      cc->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cc),
 | 
	
		
			
				|  |  | -                                &cc->pollset_shutdown_done);
 | 
	
		
			
				|  |  | -    gpr_mu_unlock(cc->mu);
 | 
	
		
			
				|  |  | +                                &cqd->pollset_shutdown_done);
 | 
	
		
			
				|  |  | +    gpr_mu_unlock(cqd->mu);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  GPR_TIMER_END("grpc_cq_end_op_for_pluck", 0);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  GRPC_ERROR_UNREF(error);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -/* Signal the end of an operation - if this is the last waiting-to-be-queued
 | 
	
		
			
				|  |  | -   event, then enter shutdown mode */
 | 
	
		
			
				|  |  | -/* Queue a GRPC_OP_COMPLETED operation */
 | 
	
		
			
				|  |  |  void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
 | 
	
		
			
				|  |  |                      void *tag, grpc_error *error,
 | 
	
		
			
				|  |  |                      void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,
 | 
	
		
			
				|  |  |                                   grpc_cq_completion *storage),
 | 
	
		
			
				|  |  |                      void *done_arg, grpc_cq_completion *storage) {
 | 
	
		
			
				|  |  | -  GPR_TIMER_BEGIN("grpc_cq_end_op", 0);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  if (grpc_api_trace ||
 | 
	
		
			
				|  |  | -      (grpc_trace_operation_failures && error != GRPC_ERROR_NONE)) {
 | 
	
		
			
				|  |  | -    const char *errmsg = grpc_error_string(error);
 | 
	
		
			
				|  |  | -    GRPC_API_TRACE(
 | 
	
		
			
				|  |  | -        "grpc_cq_end_op(exec_ctx=%p, cc=%p, tag=%p, error=%s, done=%p, "
 | 
	
		
			
				|  |  | -        "done_arg=%p, storage=%p)",
 | 
	
		
			
				|  |  | -        7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage));
 | 
	
		
			
				|  |  | -    if (grpc_trace_operation_failures && error != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | -      gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  /* Call the appropriate function to queue the completion based on the
 | 
	
		
			
				|  |  | -     completion queue type */
 | 
	
		
			
				|  |  | -  int is_success = (error == GRPC_ERROR_NONE);
 | 
	
		
			
				|  |  | -  if (cc->completion_type == GRPC_CQ_NEXT) {
 | 
	
		
			
				|  |  | -    grpc_cq_end_op_for_next(exec_ctx, cc, tag, is_success, done, done_arg,
 | 
	
		
			
				|  |  | -                            storage);
 | 
	
		
			
				|  |  | -  } else if (cc->completion_type == GRPC_CQ_PLUCK) {
 | 
	
		
			
				|  |  | -    grpc_cq_end_op_for_pluck(exec_ctx, cc, tag, is_success, done, done_arg,
 | 
	
		
			
				|  |  | -                             storage);
 | 
	
		
			
				|  |  | -  } else {
 | 
	
		
			
				|  |  | -    gpr_log(GPR_ERROR, "Unexpected completion type %d", cc->completion_type);
 | 
	
		
			
				|  |  | -    abort();
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  GPR_TIMER_END("grpc_cq_end_op", 0);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  GRPC_ERROR_UNREF(error);
 | 
	
		
			
				|  |  | +  cc->vtable->end_op(exec_ctx, cc, tag, error, done, done_arg, storage);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  typedef struct {
 | 
	
	
		
			
				|  | @@ -600,20 +657,21 @@ typedef struct {
 | 
	
		
			
				|  |  |  static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
 | 
	
		
			
				|  |  |    cq_is_finished_arg *a = arg;
 | 
	
		
			
				|  |  |    grpc_completion_queue *cq = a->cq;
 | 
	
		
			
				|  |  | +  cq_data *cqd = &cq->data;
 | 
	
		
			
				|  |  |    GPR_ASSERT(a->stolen_completion == NULL);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    gpr_atm current_last_seen_things_queued_ever =
 | 
	
		
			
				|  |  | -      gpr_atm_no_barrier_load(&cq->things_queued_ever);
 | 
	
		
			
				|  |  | +      gpr_atm_no_barrier_load(&cqd->things_queued_ever);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) {
 | 
	
		
			
				|  |  |      a->last_seen_things_queued_ever =
 | 
	
		
			
				|  |  | -        gpr_atm_no_barrier_load(&cq->things_queued_ever);
 | 
	
		
			
				|  |  | +        gpr_atm_no_barrier_load(&cqd->things_queued_ever);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      /* Pop a cq_completion from the queue. Returns NULL if the queue is empty
 | 
	
		
			
				|  |  |       * might return NULL in some cases even if the queue is not empty; but that
 | 
	
		
			
				|  |  |       * is ok and doesn't affect correctness. Might effect the tail latencies a
 | 
	
		
			
				|  |  |       * bit) */
 | 
	
		
			
				|  |  | -    a->stolen_completion = cq_event_queue_pop(&cq->queue);
 | 
	
		
			
				|  |  | +    a->stolen_completion = cq_event_queue_pop(&cqd->queue);
 | 
	
		
			
				|  |  |      if (a->stolen_completion != NULL) {
 | 
	
		
			
				|  |  |        return true;
 | 
	
		
			
				|  |  |      }
 | 
	
	
		
			
				|  | @@ -626,16 +684,18 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
 | 
	
		
			
				|  |  |  static void dump_pending_tags(grpc_completion_queue *cc) {
 | 
	
		
			
				|  |  |    if (!grpc_trace_pending_tags) return;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +  cq_data *cqd = &cc->data;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    gpr_strvec v;
 | 
	
		
			
				|  |  |    gpr_strvec_init(&v);
 | 
	
		
			
				|  |  |    gpr_strvec_add(&v, gpr_strdup("PENDING TAGS:"));
 | 
	
		
			
				|  |  | -  gpr_mu_lock(cc->mu);
 | 
	
		
			
				|  |  | -  for (size_t i = 0; i < cc->outstanding_tag_count; i++) {
 | 
	
		
			
				|  |  | +  gpr_mu_lock(cqd->mu);
 | 
	
		
			
				|  |  | +  for (size_t i = 0; i < cqd->outstanding_tag_count; i++) {
 | 
	
		
			
				|  |  |      char *s;
 | 
	
		
			
				|  |  | -    gpr_asprintf(&s, " %p", cc->outstanding_tags[i]);
 | 
	
		
			
				|  |  | +    gpr_asprintf(&s, " %p", cqd->outstanding_tags[i]);
 | 
	
		
			
				|  |  |      gpr_strvec_add(&v, s);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  gpr_mu_unlock(cc->mu);
 | 
	
		
			
				|  |  | +  gpr_mu_unlock(cqd->mu);
 | 
	
		
			
				|  |  |    char *out = gpr_strvec_flatten(&v, NULL);
 | 
	
		
			
				|  |  |    gpr_strvec_destroy(&v);
 | 
	
		
			
				|  |  |    gpr_log(GPR_DEBUG, "%s", out);
 | 
	
	
		
			
				|  | @@ -649,13 +709,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
 | 
	
		
			
				|  |  |                                        gpr_timespec deadline, void *reserved) {
 | 
	
		
			
				|  |  |    grpc_event ret;
 | 
	
		
			
				|  |  |    gpr_timespec now;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  if (cc->completion_type != GRPC_CQ_NEXT) {
 | 
	
		
			
				|  |  | -    gpr_log(GPR_ERROR,
 | 
	
		
			
				|  |  | -            "grpc_completion_queue_next() cannot be called on this completion "
 | 
	
		
			
				|  |  | -            "queue since its completion type is not GRPC_CQ_NEXT");
 | 
	
		
			
				|  |  | -    abort();
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | +  cq_data *cqd = &cc->data;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    GPR_TIMER_BEGIN("grpc_completion_queue_next", 0);
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -677,7 +731,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    cq_is_finished_arg is_finished_arg = {
 | 
	
		
			
				|  |  |        .last_seen_things_queued_ever =
 | 
	
		
			
				|  |  | -          gpr_atm_no_barrier_load(&cc->things_queued_ever),
 | 
	
		
			
				|  |  | +          gpr_atm_no_barrier_load(&cqd->things_queued_ever),
 | 
	
		
			
				|  |  |        .cq = cc,
 | 
	
		
			
				|  |  |        .deadline = deadline,
 | 
	
		
			
				|  |  |        .stolen_completion = NULL,
 | 
	
	
		
			
				|  | @@ -699,7 +753,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
 | 
	
		
			
				|  |  |        break;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    grpc_cq_completion *c = cq_event_queue_pop(&cc->queue);
 | 
	
		
			
				|  |  | +    grpc_cq_completion *c = cq_event_queue_pop(&cqd->queue);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      if (c != NULL) {
 | 
	
		
			
				|  |  |        ret.type = GRPC_OP_COMPLETE;
 | 
	
	
		
			
				|  | @@ -713,16 +767,16 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
 | 
	
		
			
				|  |  |           so that the thread comes back quickly from poll to make a second
 | 
	
		
			
				|  |  |           attempt at popping. Not doing this can potentially deadlock this thread
 | 
	
		
			
				|  |  |           forever (if the deadline is infinity) */
 | 
	
		
			
				|  |  | -      if (cq_event_queue_num_items(&cc->queue) > 0) {
 | 
	
		
			
				|  |  | +      if (cq_event_queue_num_items(&cqd->queue) > 0) {
 | 
	
		
			
				|  |  |          iteration_deadline = gpr_time_0(GPR_CLOCK_MONOTONIC);
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    if (gpr_atm_no_barrier_load(&cc->shutdown)) {
 | 
	
		
			
				|  |  | +    if (gpr_atm_no_barrier_load(&cqd->shutdown)) {
 | 
	
		
			
				|  |  |        /* Before returning, check if the queue has any items left over (since
 | 
	
		
			
				|  |  |           gpr_mpscq_pop() can sometimes return NULL even if the queue is not
 | 
	
		
			
				|  |  |           empty. If so, keep retrying but do not return GRPC_QUEUE_SHUTDOWN */
 | 
	
		
			
				|  |  | -      if (cq_event_queue_num_items(&cc->queue) > 0) {
 | 
	
		
			
				|  |  | +      if (cq_event_queue_num_items(&cqd->queue) > 0) {
 | 
	
		
			
				|  |  |          /* Go to the beginning of the loop. No point doing a poll because
 | 
	
		
			
				|  |  |             (cc->shutdown == true) is only possible when there is no pending work
 | 
	
		
			
				|  |  |             (i.e cc->pending_events == 0) and any outstanding grpc_cq_completion
 | 
	
	
		
			
				|  | @@ -752,10 +806,10 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      /* The main polling work happens in grpc_pollset_work */
 | 
	
		
			
				|  |  | -    gpr_mu_lock(cc->mu);
 | 
	
		
			
				|  |  | +    gpr_mu_lock(cqd->mu);
 | 
	
		
			
				|  |  |      grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc),
 | 
	
		
			
				|  |  |                                                NULL, now, iteration_deadline);
 | 
	
		
			
				|  |  | -    gpr_mu_unlock(cc->mu);
 | 
	
		
			
				|  |  | +    gpr_mu_unlock(cqd->mu);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      if (err != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  |        const char *msg = grpc_error_string(err);
 | 
	
	
		
			
				|  | @@ -782,22 +836,23 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static int add_plucker(grpc_completion_queue *cc, void *tag,
 | 
	
		
			
				|  |  |                         grpc_pollset_worker **worker) {
 | 
	
		
			
				|  |  | -  if (cc->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) {
 | 
	
		
			
				|  |  | +  cq_data *cqd = &cc->data;
 | 
	
		
			
				|  |  | +  if (cqd->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) {
 | 
	
		
			
				|  |  |      return 0;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  cc->pluckers[cc->num_pluckers].tag = tag;
 | 
	
		
			
				|  |  | -  cc->pluckers[cc->num_pluckers].worker = worker;
 | 
	
		
			
				|  |  | -  cc->num_pluckers++;
 | 
	
		
			
				|  |  | +  cqd->pluckers[cqd->num_pluckers].tag = tag;
 | 
	
		
			
				|  |  | +  cqd->pluckers[cqd->num_pluckers].worker = worker;
 | 
	
		
			
				|  |  | +  cqd->num_pluckers++;
 | 
	
		
			
				|  |  |    return 1;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  static void del_plucker(grpc_completion_queue *cc, void *tag,
 | 
	
		
			
				|  |  |                          grpc_pollset_worker **worker) {
 | 
	
		
			
				|  |  | -  int i;
 | 
	
		
			
				|  |  | -  for (i = 0; i < cc->num_pluckers; i++) {
 | 
	
		
			
				|  |  | -    if (cc->pluckers[i].tag == tag && cc->pluckers[i].worker == worker) {
 | 
	
		
			
				|  |  | -      cc->num_pluckers--;
 | 
	
		
			
				|  |  | -      GPR_SWAP(plucker, cc->pluckers[i], cc->pluckers[cc->num_pluckers]);
 | 
	
		
			
				|  |  | +  cq_data *cqd = &cc->data;
 | 
	
		
			
				|  |  | +  for (int i = 0; i < cqd->num_pluckers; i++) {
 | 
	
		
			
				|  |  | +    if (cqd->pluckers[i].tag == tag && cqd->pluckers[i].worker == worker) {
 | 
	
		
			
				|  |  | +      cqd->num_pluckers--;
 | 
	
		
			
				|  |  | +      GPR_SWAP(plucker, cqd->pluckers[i], cqd->pluckers[cqd->num_pluckers]);
 | 
	
		
			
				|  |  |        return;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |    }
 | 
	
	
		
			
				|  | @@ -807,29 +862,31 @@ static void del_plucker(grpc_completion_queue *cc, void *tag,
 | 
	
		
			
				|  |  |  static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) {
 | 
	
		
			
				|  |  |    cq_is_finished_arg *a = arg;
 | 
	
		
			
				|  |  |    grpc_completion_queue *cq = a->cq;
 | 
	
		
			
				|  |  | +  cq_data *cqd = &cq->data;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    GPR_ASSERT(a->stolen_completion == NULL);
 | 
	
		
			
				|  |  |    gpr_atm current_last_seen_things_queued_ever =
 | 
	
		
			
				|  |  | -      gpr_atm_no_barrier_load(&cq->things_queued_ever);
 | 
	
		
			
				|  |  | +      gpr_atm_no_barrier_load(&cqd->things_queued_ever);
 | 
	
		
			
				|  |  |    if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) {
 | 
	
		
			
				|  |  | -    gpr_mu_lock(cq->mu);
 | 
	
		
			
				|  |  | +    gpr_mu_lock(cqd->mu);
 | 
	
		
			
				|  |  |      a->last_seen_things_queued_ever =
 | 
	
		
			
				|  |  | -        gpr_atm_no_barrier_load(&cq->things_queued_ever);
 | 
	
		
			
				|  |  | +        gpr_atm_no_barrier_load(&cqd->things_queued_ever);
 | 
	
		
			
				|  |  |      grpc_cq_completion *c;
 | 
	
		
			
				|  |  | -    grpc_cq_completion *prev = &cq->completed_head;
 | 
	
		
			
				|  |  | +    grpc_cq_completion *prev = &cqd->completed_head;
 | 
	
		
			
				|  |  |      while ((c = (grpc_cq_completion *)(prev->next & ~(uintptr_t)1)) !=
 | 
	
		
			
				|  |  | -           &cq->completed_head) {
 | 
	
		
			
				|  |  | +           &cqd->completed_head) {
 | 
	
		
			
				|  |  |        if (c->tag == a->tag) {
 | 
	
		
			
				|  |  |          prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1);
 | 
	
		
			
				|  |  | -        if (c == cq->completed_tail) {
 | 
	
		
			
				|  |  | -          cq->completed_tail = prev;
 | 
	
		
			
				|  |  | +        if (c == cqd->completed_tail) {
 | 
	
		
			
				|  |  | +          cqd->completed_tail = prev;
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  | -        gpr_mu_unlock(cq->mu);
 | 
	
		
			
				|  |  | +        gpr_mu_unlock(cqd->mu);
 | 
	
		
			
				|  |  |          a->stolen_completion = c;
 | 
	
		
			
				|  |  |          return true;
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  |        prev = c;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -    gpr_mu_unlock(cq->mu);
 | 
	
		
			
				|  |  | +    gpr_mu_unlock(cqd->mu);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    return !a->first_loop &&
 | 
	
		
			
				|  |  |           gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0;
 | 
	
	
		
			
				|  | @@ -842,16 +899,10 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
 | 
	
		
			
				|  |  |    grpc_cq_completion *prev;
 | 
	
		
			
				|  |  |    grpc_pollset_worker *worker = NULL;
 | 
	
		
			
				|  |  |    gpr_timespec now;
 | 
	
		
			
				|  |  | +  cq_data *cqd = &cc->data;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  if (cc->completion_type != GRPC_CQ_PLUCK) {
 | 
	
		
			
				|  |  | -    gpr_log(GPR_ERROR,
 | 
	
		
			
				|  |  | -            "grpc_completion_queue_pluck() cannot be called on this completion "
 | 
	
		
			
				|  |  | -            "queue since its completion type is not GRPC_CQ_PLUCK");
 | 
	
		
			
				|  |  | -    abort();
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |    if (grpc_cq_pluck_trace) {
 | 
	
		
			
				|  |  |      GRPC_API_TRACE(
 | 
	
		
			
				|  |  |          "grpc_completion_queue_pluck("
 | 
	
	
		
			
				|  | @@ -869,10 +920,10 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
 | 
	
		
			
				|  |  |    deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    GRPC_CQ_INTERNAL_REF(cc, "pluck");
 | 
	
		
			
				|  |  | -  gpr_mu_lock(cc->mu);
 | 
	
		
			
				|  |  | +  gpr_mu_lock(cqd->mu);
 | 
	
		
			
				|  |  |    cq_is_finished_arg is_finished_arg = {
 | 
	
		
			
				|  |  |        .last_seen_things_queued_ever =
 | 
	
		
			
				|  |  | -          gpr_atm_no_barrier_load(&cc->things_queued_ever),
 | 
	
		
			
				|  |  | +          gpr_atm_no_barrier_load(&cqd->things_queued_ever),
 | 
	
		
			
				|  |  |        .cq = cc,
 | 
	
		
			
				|  |  |        .deadline = deadline,
 | 
	
		
			
				|  |  |        .stolen_completion = NULL,
 | 
	
	
		
			
				|  | @@ -882,7 +933,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
 | 
	
		
			
				|  |  |        GRPC_EXEC_CTX_INITIALIZER(0, cq_is_pluck_finished, &is_finished_arg);
 | 
	
		
			
				|  |  |    for (;;) {
 | 
	
		
			
				|  |  |      if (is_finished_arg.stolen_completion != NULL) {
 | 
	
		
			
				|  |  | -      gpr_mu_unlock(cc->mu);
 | 
	
		
			
				|  |  | +      gpr_mu_unlock(cqd->mu);
 | 
	
		
			
				|  |  |        c = is_finished_arg.stolen_completion;
 | 
	
		
			
				|  |  |        is_finished_arg.stolen_completion = NULL;
 | 
	
		
			
				|  |  |        ret.type = GRPC_OP_COMPLETE;
 | 
	
	
		
			
				|  | @@ -891,15 +942,15 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
 | 
	
		
			
				|  |  |        c->done(&exec_ctx, c->done_arg, c);
 | 
	
		
			
				|  |  |        break;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -    prev = &cc->completed_head;
 | 
	
		
			
				|  |  | +    prev = &cqd->completed_head;
 | 
	
		
			
				|  |  |      while ((c = (grpc_cq_completion *)(prev->next & ~(uintptr_t)1)) !=
 | 
	
		
			
				|  |  | -           &cc->completed_head) {
 | 
	
		
			
				|  |  | +           &cqd->completed_head) {
 | 
	
		
			
				|  |  |        if (c->tag == tag) {
 | 
	
		
			
				|  |  |          prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1);
 | 
	
		
			
				|  |  | -        if (c == cc->completed_tail) {
 | 
	
		
			
				|  |  | -          cc->completed_tail = prev;
 | 
	
		
			
				|  |  | +        if (c == cqd->completed_tail) {
 | 
	
		
			
				|  |  | +          cqd->completed_tail = prev;
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  | -        gpr_mu_unlock(cc->mu);
 | 
	
		
			
				|  |  | +        gpr_mu_unlock(cqd->mu);
 | 
	
		
			
				|  |  |          ret.type = GRPC_OP_COMPLETE;
 | 
	
		
			
				|  |  |          ret.success = c->next & 1u;
 | 
	
		
			
				|  |  |          ret.tag = c->tag;
 | 
	
	
		
			
				|  | @@ -908,8 +959,8 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  |        prev = c;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -    if (cc->shutdown) {
 | 
	
		
			
				|  |  | -      gpr_mu_unlock(cc->mu);
 | 
	
		
			
				|  |  | +    if (cqd->shutdown) {
 | 
	
		
			
				|  |  | +      gpr_mu_unlock(cqd->mu);
 | 
	
		
			
				|  |  |        memset(&ret, 0, sizeof(ret));
 | 
	
		
			
				|  |  |        ret.type = GRPC_QUEUE_SHUTDOWN;
 | 
	
		
			
				|  |  |        break;
 | 
	
	
		
			
				|  | @@ -919,7 +970,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
 | 
	
		
			
				|  |  |                "Too many outstanding grpc_completion_queue_pluck calls: maximum "
 | 
	
		
			
				|  |  |                "is %d",
 | 
	
		
			
				|  |  |                GRPC_MAX_COMPLETION_QUEUE_PLUCKERS);
 | 
	
		
			
				|  |  | -      gpr_mu_unlock(cc->mu);
 | 
	
		
			
				|  |  | +      gpr_mu_unlock(cqd->mu);
 | 
	
		
			
				|  |  |        memset(&ret, 0, sizeof(ret));
 | 
	
		
			
				|  |  |        /* TODO(ctiller): should we use a different result here */
 | 
	
		
			
				|  |  |        ret.type = GRPC_QUEUE_TIMEOUT;
 | 
	
	
		
			
				|  | @@ -929,7 +980,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
 | 
	
		
			
				|  |  |      now = gpr_now(GPR_CLOCK_MONOTONIC);
 | 
	
		
			
				|  |  |      if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) {
 | 
	
		
			
				|  |  |        del_plucker(cc, tag, &worker);
 | 
	
		
			
				|  |  | -      gpr_mu_unlock(cc->mu);
 | 
	
		
			
				|  |  | +      gpr_mu_unlock(cqd->mu);
 | 
	
		
			
				|  |  |        memset(&ret, 0, sizeof(ret));
 | 
	
		
			
				|  |  |        ret.type = GRPC_QUEUE_TIMEOUT;
 | 
	
		
			
				|  |  |        dump_pending_tags(cc);
 | 
	
	
		
			
				|  | @@ -942,15 +993,15 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
 | 
	
		
			
				|  |  |      gpr_timespec iteration_deadline = deadline;
 | 
	
		
			
				|  |  |      if (grpc_timer_check(&exec_ctx, now, &iteration_deadline)) {
 | 
	
		
			
				|  |  |        GPR_TIMER_MARK("alarm_triggered", 0);
 | 
	
		
			
				|  |  | -      gpr_mu_unlock(cc->mu);
 | 
	
		
			
				|  |  | +      gpr_mu_unlock(cqd->mu);
 | 
	
		
			
				|  |  |        grpc_exec_ctx_flush(&exec_ctx);
 | 
	
		
			
				|  |  | -      gpr_mu_lock(cc->mu);
 | 
	
		
			
				|  |  | +      gpr_mu_lock(cqd->mu);
 | 
	
		
			
				|  |  |      } else {
 | 
	
		
			
				|  |  |        grpc_error *err = cc->poller_vtable->work(
 | 
	
		
			
				|  |  |            &exec_ctx, POLLSET_FROM_CQ(cc), &worker, now, iteration_deadline);
 | 
	
		
			
				|  |  |        if (err != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  |          del_plucker(cc, tag, &worker);
 | 
	
		
			
				|  |  | -        gpr_mu_unlock(cc->mu);
 | 
	
		
			
				|  |  | +        gpr_mu_unlock(cqd->mu);
 | 
	
		
			
				|  |  |          const char *msg = grpc_error_string(err);
 | 
	
		
			
				|  |  |          gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg);
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -981,20 +1032,22 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
 | 
	
		
			
				|  |  |    grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
 | 
	
		
			
				|  |  |    GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0);
 | 
	
		
			
				|  |  |    GRPC_API_TRACE("grpc_completion_queue_shutdown(cc=%p)", 1, (cc));
 | 
	
		
			
				|  |  | -  gpr_mu_lock(cc->mu);
 | 
	
		
			
				|  |  | -  if (cc->shutdown_called) {
 | 
	
		
			
				|  |  | -    gpr_mu_unlock(cc->mu);
 | 
	
		
			
				|  |  | +  cq_data *cqd = &cc->data;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  gpr_mu_lock(cqd->mu);
 | 
	
		
			
				|  |  | +  if (cqd->shutdown_called) {
 | 
	
		
			
				|  |  | +    gpr_mu_unlock(cqd->mu);
 | 
	
		
			
				|  |  |      GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
 | 
	
		
			
				|  |  |      return;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  cc->shutdown_called = 1;
 | 
	
		
			
				|  |  | -  if (gpr_unref(&cc->pending_events)) {
 | 
	
		
			
				|  |  | -    GPR_ASSERT(!cc->shutdown);
 | 
	
		
			
				|  |  | -    cc->shutdown = 1;
 | 
	
		
			
				|  |  | +  cqd->shutdown_called = 1;
 | 
	
		
			
				|  |  | +  if (gpr_unref(&cqd->pending_events)) {
 | 
	
		
			
				|  |  | +    GPR_ASSERT(!cqd->shutdown);
 | 
	
		
			
				|  |  | +    cqd->shutdown = 1;
 | 
	
		
			
				|  |  |      cc->poller_vtable->shutdown(&exec_ctx, POLLSET_FROM_CQ(cc),
 | 
	
		
			
				|  |  | -                                &cc->pollset_shutdown_done);
 | 
	
		
			
				|  |  | +                                &cqd->pollset_shutdown_done);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  gpr_mu_unlock(cc->mu);
 | 
	
		
			
				|  |  | +  gpr_mu_unlock(cqd->mu);
 | 
	
		
			
				|  |  |    grpc_exec_ctx_finish(&exec_ctx);
 | 
	
		
			
				|  |  |    GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
 | 
	
		
			
				|  |  |  }
 | 
	
	
		
			
				|  | @@ -1004,8 +1057,10 @@ void grpc_completion_queue_destroy(grpc_completion_queue *cc) {
 | 
	
		
			
				|  |  |    GPR_TIMER_BEGIN("grpc_completion_queue_destroy", 0);
 | 
	
		
			
				|  |  |    grpc_completion_queue_shutdown(cc);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  if (cc->completion_type == GRPC_CQ_NEXT) {
 | 
	
		
			
				|  |  | -    GPR_ASSERT(cq_event_queue_num_items(&cc->queue) == 0);
 | 
	
		
			
				|  |  | +  /* TODO (sreek): This should not ideally be here. Refactor it into the
 | 
	
		
			
				|  |  | +   * cq_vtable (perhaps have a create/destroy methods in the cq vtable) */
 | 
	
		
			
				|  |  | +  if (cc->vtable->cq_completion_type == GRPC_CQ_NEXT) {
 | 
	
		
			
				|  |  | +    GPR_ASSERT(cq_event_queue_num_items(&cc->data.queue) == 0);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    GRPC_CQ_INTERNAL_UNREF(cc, "destroy");
 | 
	
	
		
			
				|  | @@ -1020,22 +1075,12 @@ grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps) {
 | 
	
		
			
				|  |  |    return CQ_FROM_POLLSET(ps);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -void grpc_cq_mark_non_listening_server_cq(grpc_completion_queue *cc) {
 | 
	
		
			
				|  |  | -  /* TODO: sreek - use cc->polling_type field here and add a validation check
 | 
	
		
			
				|  |  | -     (i.e grpc_cq_mark_non_listening_server_cq can only be called on a cc whose
 | 
	
		
			
				|  |  | -     polling_type is set to GRPC_CQ_NON_LISTENING */
 | 
	
		
			
				|  |  | -  cc->is_non_listening_server_cq = 1;
 | 
	
		
			
				|  |  | +void grpc_cq_mark_server_cq(grpc_completion_queue *cc) {
 | 
	
		
			
				|  |  | +  cc->data.is_server_cq = 1;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc) {
 | 
	
		
			
				|  |  | -  /* TODO (sreek) - return (cc->polling_type == GRPC_CQ_NON_LISTENING) */
 | 
	
		
			
				|  |  | -  return (cc->is_non_listening_server_cq == 1);
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; }
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |  bool grpc_cq_is_server_cq(grpc_completion_queue *cc) {
 | 
	
		
			
				|  |  | -  return cc->is_server_cq;
 | 
	
		
			
				|  |  | +  return cc->data.is_server_cq;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  bool grpc_cq_can_listen(grpc_completion_queue *cc) {
 |