|  | @@ -40,6 +40,7 @@
 | 
	
		
			
				|  |  |  #include <grpc/grpc_security.h>
 | 
	
		
			
				|  |  |  #include <grpc/support/alloc.h>
 | 
	
		
			
				|  |  |  #include <grpc/support/log.h>
 | 
	
		
			
				|  |  | +#include <grpc/support/time.h>
 | 
	
		
			
				|  |  |  #include "rb_grpc.h"
 | 
	
		
			
				|  |  |  #include "rb_call.h"
 | 
	
		
			
				|  |  |  #include "rb_channel_args.h"
 | 
	
	
		
			
				|  | @@ -71,6 +72,7 @@ typedef struct grpc_rb_channel {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    /* The actual channel */
 | 
	
		
			
				|  |  |    grpc_channel *wrapped;
 | 
	
		
			
				|  |  | +  grpc_completion_queue *queue;
 | 
	
		
			
				|  |  |  } grpc_rb_channel;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  /* Destroys Channel instances. */
 | 
	
	
		
			
				|  | @@ -83,6 +85,7 @@ static void grpc_rb_channel_free(void *p) {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    if (ch->wrapped != NULL) {
 | 
	
		
			
				|  |  |      grpc_channel_destroy(ch->wrapped);
 | 
	
		
			
				|  |  | +    grpc_rb_completion_queue_destroy(ch->queue);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    xfree(p);
 | 
	
	
		
			
				|  | @@ -165,6 +168,7 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    rb_ivar_set(self, id_target, target);
 | 
	
		
			
				|  |  |    wrapper->wrapped = ch;
 | 
	
		
			
				|  |  | +  wrapper->queue = grpc_completion_queue_create(NULL);
 | 
	
		
			
				|  |  |    return self;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -203,16 +207,18 @@ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv,
 | 
	
		
			
				|  |  |     the completion queue with success=0 */
 | 
	
		
			
				|  |  |  static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
 | 
	
		
			
				|  |  |                                                        VALUE last_state,
 | 
	
		
			
				|  |  | -                                                      VALUE cqueue,
 | 
	
		
			
				|  |  | -                                                      VALUE deadline,
 | 
	
		
			
				|  |  | -                                                      VALUE tag) {
 | 
	
		
			
				|  |  | +                                                      VALUE deadline) {
 | 
	
		
			
				|  |  |    grpc_rb_channel *wrapper = NULL;
 | 
	
		
			
				|  |  |    grpc_channel *ch = NULL;
 | 
	
		
			
				|  |  |    grpc_completion_queue *cq = NULL;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  cq = grpc_rb_get_wrapped_completion_queue(cqueue);
 | 
	
		
			
				|  |  | +  void *tag = wrapper;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  grpc_event event;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
 | 
	
		
			
				|  |  |    ch = wrapper->wrapped;
 | 
	
		
			
				|  |  | +  cq = wrapper->queue;
 | 
	
		
			
				|  |  |    if (ch == NULL) {
 | 
	
		
			
				|  |  |      rb_raise(rb_eRuntimeError, "closed!");
 | 
	
		
			
				|  |  |      return Qnil;
 | 
	
	
		
			
				|  | @@ -222,9 +228,16 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
 | 
	
		
			
				|  |  |        (grpc_connectivity_state)NUM2LONG(last_state),
 | 
	
		
			
				|  |  |        grpc_rb_time_timeval(deadline, /* absolute time */ 0),
 | 
	
		
			
				|  |  |        cq,
 | 
	
		
			
				|  |  | -      ROBJECT(tag));
 | 
	
		
			
				|  |  | +      tag);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  return Qnil;
 | 
	
		
			
				|  |  | +  event = rb_completion_queue_pluck(cq, tag,
 | 
	
		
			
				|  |  | +                                    gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  if (event.success) {
 | 
	
		
			
				|  |  | +    return Qtrue;
 | 
	
		
			
				|  |  | +  } else {
 | 
	
		
			
				|  |  | +    return Qfalse;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  /* Create a call given a grpc_channel, in order to call method. The request
 |