浏览代码

Add shutdown process for completion queue

Lidi Zheng 6 年之前
父节点
当前提交
3c43e6330f

+ 15 - 0
src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi

@@ -25,6 +25,18 @@ cdef class RPCState:
     cdef bytes method(self)
 
 
+cdef class CallbackWrapper:
+    cdef CallbackContext context
+    cdef object _reference
+
+    @staticmethod
+    cdef void functor_run(
+            grpc_experimental_completion_queue_functor* functor,
+            int succeed)
+
+    cdef grpc_experimental_completion_queue_functor *c_functor(self)
+
+
 cdef enum AioServerStatus:
     AIO_SERVER_STATUS_UNKNOWN
     AIO_SERVER_STATUS_READY
@@ -36,6 +48,9 @@ cdef enum AioServerStatus:
 cdef class _CallbackCompletionQueue:
     cdef grpc_completion_queue *_cq
     cdef grpc_completion_queue* c_ptr(self)
+    cdef object _shutdown_completed
+    cdef CallbackWrapper _wrapper
+    cdef object _loop
 
 
 cdef class AioServer:

+ 18 - 9
src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi

@@ -26,8 +26,6 @@ class _ServicerContextPlaceHolder(object): pass
 # TODO(https://github.com/grpc/grpc/issues/20669)
 # Apply this to the client-side
 cdef class CallbackWrapper:
-    cdef CallbackContext context
-    cdef object _reference
 
     def __cinit__(self, object future):
         self.context.functor.functor_run = self.functor_run
@@ -213,14 +211,22 @@ async def _server_start(object loop,
 
 cdef class _CallbackCompletionQueue:
 
-    def __cinit__(self):
+    def __cinit__(self, object loop):
+        self._loop = loop
+        self._shutdown_completed = loop.create_future()
+        self._wrapper = CallbackWrapper(self._shutdown_completed)
         self._cq = grpc_completion_queue_create_for_callback(
-            NULL,
+            self._wrapper.c_functor(),
             NULL
         )
 
     cdef grpc_completion_queue* c_ptr(self):
         return self._cq
+    
+    async def shutdown(self):
+        grpc_completion_queue_shutdown(self._cq)
+        await self._shutdown_completed
+        grpc_completion_queue_destroy(self._cq)
 
 
 cdef class AioServer:
@@ -228,13 +234,16 @@ cdef class AioServer:
     def __init__(self, loop, thread_pool, generic_handlers, interceptors,
                  options, maximum_concurrent_rpcs, compression):
         self._loop = loop
+
+        # C-Core objects won't be deallocated automatically.
         self._server = Server(options)
-        self._cq = _CallbackCompletionQueue()
+        self._cq = _CallbackCompletionQueue(loop)
         grpc_server_register_completion_queue(
             self._server.c_server,
             self._cq.c_ptr(),
             NULL
         )
+
         self._status = AIO_SERVER_STATUS_READY
         self._generic_handlers = []
         self.add_generic_rpc_handlers(generic_handlers)
@@ -319,9 +328,9 @@ cdef class AioServer:
         self._server.is_shutdown = True
         self._status = AIO_SERVER_STATUS_STOPPED
 
+        # Shuts down the completion queue
+        await self._cq.shutdown()
+
     def __dealloc__(self):
-        if self._status == AIO_SERVER_STATUS_STOPPED:
-            grpc_completion_queue_shutdown(self._cq._cq)
-            grpc_completion_queue_destroy(self._cq._cq)
-        else:
+        if self._status != AIO_SERVER_STATUS_STOPPED:
             _LOGGER.error('Server is not stopped while deallocation: %d', self._status)