| 
					
				 | 
			
			
				@@ -12,11 +12,21 @@ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 # See the License for the specific language governing permissions and 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 # limitations under the License. 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-from libc.stdio cimport printf 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import socket 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 cdef gpr_timespec _GPR_INF_FUTURE = gpr_inf_future(GPR_CLOCK_REALTIME) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+IF UNAME_SYSNAME == "Windows": 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    cdef void _unified_socket_write(int fd) nogil: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        win_socket_send(<WIN_SOCKET>fd, b"1", 1, 0) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ELSE: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    from posix cimport unistd 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    cdef void _unified_socket_write(int fd) nogil: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        unistd.write(fd, b"1", 1) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 def _handle_callback_wrapper(CallbackWrapper callback_wrapper, int success): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     CallbackWrapper.functor_run(callback_wrapper.c_functor(), success) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -30,41 +40,69 @@ cdef class BaseCompletionQueue: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 cdef class PollerCompletionQueue(BaseCompletionQueue): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     def __cinit__(self): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        self._loop = asyncio.get_event_loop() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         self._cq = grpc_completion_queue_create_for_next(NULL) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         self._shutdown = False 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         self._poller_thread = threading.Thread(target=self._poll_wrapper, daemon=True) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         self._poller_thread.start() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    cdef void _poll(self) except *: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        self._read_socket, self._write_socket = socket.socketpair() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        self._write_fd = self._write_socket.fileno() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        self._loop.add_reader(self._read_socket, self._handle_events) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        self._queue = cpp_event_queue() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    cdef void _poll(self) nogil: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         cdef grpc_event event 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         cdef CallbackContext *context 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         while not self._shutdown: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            with nogil: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                event = grpc_completion_queue_next(self._cq, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                                                   _GPR_INF_FUTURE, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                                                   NULL) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            event = grpc_completion_queue_next(self._cq, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                                _GPR_INF_FUTURE, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                                NULL) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             if event.type == GRPC_QUEUE_TIMEOUT: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                raise AssertionError("Core should not return GRPC_QUEUE_TIMEOUT!") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                with gil: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    raise AssertionError("Core should not return GRPC_QUEUE_TIMEOUT!") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             elif event.type == GRPC_QUEUE_SHUTDOWN: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                 self._shutdown = True 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             else: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                context = <CallbackContext *>event.tag 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                loop = <object>context.loop 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                loop.call_soon_threadsafe( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    _handle_callback_wrapper, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    <CallbackWrapper>context.callback_wrapper, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    event.success) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                self._queue.push(event) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                _unified_socket_write(self._write_fd) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     def _poll_wrapper(self): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        self._poll() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        with nogil: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            self._poll() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    cdef void shutdown(self) nogil: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    cdef shutdown(self): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         # TODO(https://github.com/grpc/grpc/issues/22365) perform graceful shutdown 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         grpc_completion_queue_shutdown(self._cq) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         grpc_completion_queue_destroy(self._cq) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    def _handle_events(self): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        cdef bytes data = self._read_socket.recv(1) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        cdef grpc_event event 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        cdef CallbackContext *context 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        while not self._queue.empty(): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            event = self._queue.front() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            self._queue.pop() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            context = <CallbackContext *>event.tag 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            loop = <object>context.loop 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            if loop is self._loop: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                # Executes callbacks: complete the future 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                CallbackWrapper.functor_run( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    <grpc_experimental_completion_queue_functor *>event.tag, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    event.success 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                ) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            else: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                loop.call_soon_threadsafe( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    _handle_callback_wrapper, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    <CallbackWrapper>context.callback_wrapper, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    event.success 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                ) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 cdef class CallbackCompletionQueue(BaseCompletionQueue): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 |