Kaynağa Gözat

Fix random segfaults when gRPC call resources are released

Due to the GC work for breaking reference cycles the internal
attributes of the _AioCall object were cleared and initialized
with None in aims of releasing refcounts for breaking direct or
indirect references.

Once the clear was done, the deallocation of the _AioCall would
fail since the pointer to gRPC call was cast to an invalid object,
no longer was a GrcpWrapper object but a None object, returning a
NULL as a value for the attribute call.
Pau Freixes 5 yıl önce
ebeveyn
işleme
bb2c94f0e2

+ 1 - 3
src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pxd.pxi

@@ -13,11 +13,10 @@
 # limitations under the License.
 
 
-cdef class _AioCall:
+cdef class _AioCall(GrpcCallWrapper):
     cdef:
         AioChannel _channel
         list _references
-        GrpcCallWrapper _grpc_call_wrapper
         # Caches the picked event loop, so we can avoid the 30ns overhead each
         # time we need access to the event loop.
         object _loop
@@ -30,4 +29,3 @@ cdef class _AioCall:
         bint _is_locally_cancelled
 
     cdef grpc_call* _create_grpc_call(self, object timeout, bytes method) except *
-    cdef void _destroy_grpc_call(self)

+ 11 - 14
src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi

@@ -29,15 +29,16 @@ cdef class _AioCall:
                   AioChannel channel,
                   object deadline,
                   bytes method):
+        self.call = NULL
         self._channel = channel
         self._references = []
-        self._grpc_call_wrapper = GrpcCallWrapper()
         self._loop = asyncio.get_event_loop()
         self._create_grpc_call(deadline, method)
         self._is_locally_cancelled = False
 
     def __dealloc__(self):
-        self._destroy_grpc_call()
+        if self.call:
+            grpc_call_unref(self.call)
 
     def __repr__(self):
         class_name = self.__class__.__name__
@@ -62,7 +63,7 @@ cdef class _AioCall:
             <const char *> method,
             <size_t> len(method)
         )
-        self._grpc_call_wrapper.call = grpc_channel_create_call(
+        self.call = grpc_channel_create_call(
             self._channel.channel,
             NULL,
             _EMPTY_MASK,
@@ -74,10 +75,6 @@ cdef class _AioCall:
         )
         grpc_slice_unref(method_slice)
 
-    cdef void _destroy_grpc_call(self):
-        """Destroys the corresponding Core object for this RPC."""
-        grpc_call_unref(self._grpc_call_wrapper.call)
-
     def cancel(self, AioRpcStatus status):
         """Cancels the RPC in Core with given RPC status.
         
@@ -98,7 +95,7 @@ cdef class _AioCall:
             c_details = <char *>details
             # By implementation, grpc_call_cancel_with_status always return OK
             error = grpc_call_cancel_with_status(
-                self._grpc_call_wrapper.call,
+                self.call,
                 status.c_code(),
                 c_details,
                 NULL,
@@ -106,7 +103,7 @@ cdef class _AioCall:
             assert error == GRPC_CALL_OK
         else:
             # By implementation, grpc_call_cancel always return OK
-            error = grpc_call_cancel(self._grpc_call_wrapper.call, NULL)
+            error = grpc_call_cancel(self.call, NULL)
             assert error == GRPC_CALL_OK
 
     async def unary_unary(self,
@@ -141,7 +138,7 @@ cdef class _AioCall:
 
         # Executes all operations in one batch.
         # Might raise CancelledError, handling it in Python UnaryUnaryCall.
-        await execute_batch(self._grpc_call_wrapper,
+        await execute_batch(self,
                             ops,
                             self._loop)
 
@@ -164,7 +161,7 @@ cdef class _AioCall:
         """Handles the status sent by peer once received."""
         cdef ReceiveStatusOnClientOperation op = ReceiveStatusOnClientOperation(_EMPTY_FLAGS)
         cdef tuple ops = (op,)
-        await execute_batch(self._grpc_call_wrapper, ops, self._loop)
+        await execute_batch(self, ops, self._loop)
 
         # Halts if the RPC is locally cancelled
         if self._is_locally_cancelled:
@@ -187,7 +184,7 @@ cdef class _AioCall:
         # * The client application cancels;
         # * The server sends final status.
         received_message = await _receive_message(
-            self._grpc_call_wrapper,
+            self,
             self._loop
         )
         return received_message
@@ -218,12 +215,12 @@ cdef class _AioCall:
         )
 
         # Sends out the request message.
-        await execute_batch(self._grpc_call_wrapper,
+        await execute_batch(self,
                             outbound_ops,
                             self._loop)
 
         # Receives initial metadata.
         initial_metadata_observer(
-            await _receive_initial_metadata(self._grpc_call_wrapper,
+            await _receive_initial_metadata(self,
                                             self._loop),
         )

+ 1 - 0
src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi

@@ -30,6 +30,7 @@ cdef class _HandlerCallDetails:
 cdef class RPCState:
 
     def __cinit__(self, AioServer server):
+        self.call = NULL
         self.server = server
         grpc_metadata_array_init(&self.request_metadata)
         grpc_call_details_init(&self.details)