|  | @@ -38,11 +38,25 @@ cdef class BaseCompletionQueue:
 | 
	
		
			
				|  |  |          return self._cq
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +cdef class _EventLoopBound:
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def __cinit__(self, object loop, object read_socket, object handler):
 | 
	
		
			
				|  |  | +        self.loop = loop
 | 
	
		
			
				|  |  | +        self.read_socket = read_socket
 | 
	
		
			
				|  |  | +        reader_function = functools.partial(
 | 
	
		
			
				|  |  | +            handler,
 | 
	
		
			
				|  |  | +            loop
 | 
	
		
			
				|  |  | +        )
 | 
	
		
			
				|  |  | +        self.loop.add_reader(self.read_socket, reader_function)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def close(self):
 | 
	
		
			
				|  |  | +        if self.loop:
 | 
	
		
			
				|  |  | +            self.loop.remove_reader(self.read_socket)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  cdef class PollerCompletionQueue(BaseCompletionQueue):
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def __cinit__(self):
 | 
	
		
			
				|  |  | -        
 | 
	
		
			
				|  |  | -        self._loop = get_working_loop()
 | 
	
		
			
				|  |  |          self._cq = grpc_completion_queue_create_for_next(NULL)
 | 
	
		
			
				|  |  |          self._shutdown = False
 | 
	
		
			
				|  |  |          self._poller_thread = threading.Thread(target=self._poll_wrapper, daemon=True)
 | 
	
	
		
			
				|  | @@ -50,10 +64,21 @@ cdef class PollerCompletionQueue(BaseCompletionQueue):
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          self._read_socket, self._write_socket = socket.socketpair()
 | 
	
		
			
				|  |  |          self._write_fd = self._write_socket.fileno()
 | 
	
		
			
				|  |  | -        self._loop.add_reader(self._read_socket, self._handle_events)
 | 
	
		
			
				|  |  | +        self._loops = {}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        # The read socket might be read by multiple threads. But only one of them will
 | 
	
		
			
				|  |  | +        # read the 1 byte sent by the poller thread. This setting is essential to allow
 | 
	
		
			
				|  |  | +        # multiple loops in multiple threads bound to the same poller.
 | 
	
		
			
				|  |  | +        self._read_socket.setblocking(False)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          self._queue = cpp_event_queue()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +    def bound_loop(self, object loop):
 | 
	
		
			
				|  |  | +        if loop in self._loops:
 | 
	
		
			
				|  |  | +            return
 | 
	
		
			
				|  |  | +        else:
 | 
	
		
			
				|  |  | +            self._loops[loop] = _EventLoopBound(loop, self._read_socket, self._handle_events)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      cdef void _poll(self) nogil:
 | 
	
		
			
				|  |  |          cdef grpc_event event
 | 
	
		
			
				|  |  |          cdef CallbackContext *context
 | 
	
	
		
			
				|  | @@ -79,14 +104,21 @@ cdef class PollerCompletionQueue(BaseCompletionQueue):
 | 
	
		
			
				|  |  |              self._poll()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      cdef shutdown(self):
 | 
	
		
			
				|  |  | -        self._loop.remove_reader(self._read_socket)
 | 
	
		
			
				|  |  | +        # Removes the socket hook from loops
 | 
	
		
			
				|  |  | +        for loop in self._loops:
 | 
	
		
			
				|  |  | +            self._loops.get(loop).close()
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |          # TODO(https://github.com/grpc/grpc/issues/22365) perform graceful shutdown
 | 
	
		
			
				|  |  |          grpc_completion_queue_shutdown(self._cq)
 | 
	
		
			
				|  |  |          while not self._shutdown:
 | 
	
		
			
				|  |  |              self._poller_thread.join(timeout=_POLL_AWAKE_INTERVAL_S)
 | 
	
		
			
				|  |  |          grpc_completion_queue_destroy(self._cq)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    def _handle_events(self):
 | 
	
		
			
				|  |  | +        # Clean up socket resources
 | 
	
		
			
				|  |  | +        self._read_socket.close()
 | 
	
		
			
				|  |  | +        self._write_socket.close()
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def _handle_events(self, object context_loop):
 | 
	
		
			
				|  |  |          cdef bytes data = self._read_socket.recv(1)
 | 
	
		
			
				|  |  |          cdef grpc_event event
 | 
	
		
			
				|  |  |          cdef CallbackContext *context
 | 
	
	
		
			
				|  | @@ -103,7 +135,7 @@ cdef class PollerCompletionQueue(BaseCompletionQueue):
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |              context = <CallbackContext *>event.tag
 | 
	
		
			
				|  |  |              loop = <object>context.loop
 | 
	
		
			
				|  |  | -            if loop is self._loop:
 | 
	
		
			
				|  |  | +            if loop is context_loop:
 | 
	
		
			
				|  |  |                  # Executes callbacks: complete the future
 | 
	
		
			
				|  |  |                  CallbackWrapper.functor_run(
 | 
	
		
			
				|  |  |                      <grpc_experimental_completion_queue_functor *>event.tag,
 |