|  | @@ -18,7 +18,7 @@ import traceback
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  cdef int _EMPTY_FLAG = 0
 | 
	
		
			
				|  |  | -# TODO(lidiz) Use a designated value other than None.
 | 
	
		
			
				|  |  | +cdef str _RPC_FINISHED_DETAILS = 'RPC already finished.'
 | 
	
		
			
				|  |  |  cdef str _SERVER_STOPPED_DETAILS = 'Server already stopped.'
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  cdef class _HandlerCallDetails:
 | 
	
	
		
			
				|  | @@ -27,6 +27,10 @@ cdef class _HandlerCallDetails:
 | 
	
		
			
				|  |  |          self.invocation_metadata = invocation_metadata
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +class _ServerStoppedError(RuntimeError):
 | 
	
		
			
				|  |  | +    """Raised if the server is stopped."""
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  cdef class RPCState:
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def __cinit__(self, AioServer server):
 | 
	
	
		
			
				|  | @@ -46,6 +50,23 @@ cdef class RPCState:
 | 
	
		
			
				|  |  |      cdef tuple invocation_metadata(self):
 | 
	
		
			
				|  |  |          return _metadata(&self.request_metadata)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +    cdef void raise_for_termination(self) except *:
 | 
	
		
			
				|  |  | +        """Raise exceptions if RPC is not running.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        Server method handlers may suppress the abort exception. We need to halt
 | 
	
		
			
				|  |  | +        the RPC execution in that case. This function needs to be called after
 | 
	
		
			
				|  |  | +        running application code.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        Also, the server may stop unexpected. We need to check before calling
 | 
	
		
			
				|  |  | +        into Core functions, otherwise, segfault.
 | 
	
		
			
				|  |  | +        """
 | 
	
		
			
				|  |  | +        if self.abort_exception is not None:
 | 
	
		
			
				|  |  | +            raise self.abort_exception
 | 
	
		
			
				|  |  | +        if self.status_sent:
 | 
	
		
			
				|  |  | +            raise RuntimeError(_RPC_FINISHED_DETAILS)
 | 
	
		
			
				|  |  | +        if self.server._status == AIO_SERVER_STATUS_STOPPED:
 | 
	
		
			
				|  |  | +            raise _ServerStoppedError(_SERVER_STOPPED_DETAILS)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      def __dealloc__(self):
 | 
	
		
			
				|  |  |          """Cleans the Core objects."""
 | 
	
		
			
				|  |  |          grpc_call_details_destroy(&self.details)
 | 
	
	
		
			
				|  | @@ -59,17 +80,6 @@ cdef class RPCState:
 | 
	
		
			
				|  |  |  class AbortError(Exception): pass
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -def _raise_if_aborted(RPCState rpc_state):
 | 
	
		
			
				|  |  | -    """Raise AbortError if RPC is aborted.
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -    Server method handlers may suppress the abort exception. We need to halt
 | 
	
		
			
				|  |  | -    the RPC execution in that case. This function needs to be called after
 | 
	
		
			
				|  |  | -    running application code.
 | 
	
		
			
				|  |  | -    """
 | 
	
		
			
				|  |  | -    if rpc_state.abort_exception is not None:
 | 
	
		
			
				|  |  | -        raise rpc_state.abort_exception
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |  cdef class _ServicerContext:
 | 
	
		
			
				|  |  |      cdef RPCState _rpc_state
 | 
	
		
			
				|  |  |      cdef object _loop
 | 
	
	
		
			
				|  | @@ -88,10 +98,8 @@ cdef class _ServicerContext:
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      async def read(self):
 | 
	
		
			
				|  |  |          cdef bytes raw_message
 | 
	
		
			
				|  |  | -        if self._rpc_state.server._status == AIO_SERVER_STATUS_STOPPED:
 | 
	
		
			
				|  |  | -            raise RuntimeError(_SERVER_STOPPED_DETAILS)
 | 
	
		
			
				|  |  | -        if self._rpc_state.status_sent:
 | 
	
		
			
				|  |  | -            raise RuntimeError('RPC already finished.')
 | 
	
		
			
				|  |  | +        self._rpc_state.raise_for_termination()
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |          if self._rpc_state.client_closed:
 | 
	
		
			
				|  |  |              return EOF
 | 
	
		
			
				|  |  |          raw_message = await _receive_message(self._rpc_state, self._loop)
 | 
	
	
		
			
				|  | @@ -102,10 +110,8 @@ cdef class _ServicerContext:
 | 
	
		
			
				|  |  |                              raw_message)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      async def write(self, object message):
 | 
	
		
			
				|  |  | -        if self._rpc_state.server._status == AIO_SERVER_STATUS_STOPPED:
 | 
	
		
			
				|  |  | -            raise RuntimeError(_SERVER_STOPPED_DETAILS)
 | 
	
		
			
				|  |  | -        if self._rpc_state.status_sent:
 | 
	
		
			
				|  |  | -            raise RuntimeError('RPC already finished.')
 | 
	
		
			
				|  |  | +        self._rpc_state.raise_for_termination()
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |          await _send_message(self._rpc_state,
 | 
	
		
			
				|  |  |                              serialize(self._response_serializer, message),
 | 
	
		
			
				|  |  |                              self._rpc_state.metadata_sent,
 | 
	
	
		
			
				|  | @@ -114,11 +120,9 @@ cdef class _ServicerContext:
 | 
	
		
			
				|  |  |              self._rpc_state.metadata_sent = True
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      async def send_initial_metadata(self, tuple metadata):
 | 
	
		
			
				|  |  | -        if self._rpc_state.status_sent:
 | 
	
		
			
				|  |  | -            raise RuntimeError('RPC already finished.')
 | 
	
		
			
				|  |  | -        elif self._rpc_state.server._status == AIO_SERVER_STATUS_STOPPED:
 | 
	
		
			
				|  |  | -            raise RuntimeError(_SERVER_STOPPED_DETAILS)
 | 
	
		
			
				|  |  | -        elif self._rpc_state.metadata_sent:
 | 
	
		
			
				|  |  | +        self._rpc_state.raise_for_termination()
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        if self._rpc_state.metadata_sent:
 | 
	
		
			
				|  |  |              raise RuntimeError('Send initial metadata failed: already sent')
 | 
	
		
			
				|  |  |          else:
 | 
	
		
			
				|  |  |              await _send_initial_metadata(self._rpc_state, metadata, self._loop)
 | 
	
	
		
			
				|  | @@ -189,7 +193,7 @@ async def _finish_handler_with_unary_response(RPCState rpc_state,
 | 
	
		
			
				|  |  |      )
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      # Raises exception if aborted
 | 
	
		
			
				|  |  | -    _raise_if_aborted(rpc_state)
 | 
	
		
			
				|  |  | +    rpc_state.raise_for_termination()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      # Serializes the response message
 | 
	
		
			
				|  |  |      cdef bytes response_raw = serialize(
 | 
	
	
		
			
				|  | @@ -236,9 +240,6 @@ async def _finish_handler_with_stream_responses(RPCState rpc_state,
 | 
	
		
			
				|  |  |              request,
 | 
	
		
			
				|  |  |              servicer_context,
 | 
	
		
			
				|  |  |          )
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -        # Raises exception if aborted
 | 
	
		
			
				|  |  | -        _raise_if_aborted(rpc_state)
 | 
	
		
			
				|  |  |      else:
 | 
	
		
			
				|  |  |          # The handler uses async generator API
 | 
	
		
			
				|  |  |          async_response_generator = stream_handler(
 | 
	
	
		
			
				|  | @@ -249,15 +250,12 @@ async def _finish_handler_with_stream_responses(RPCState rpc_state,
 | 
	
		
			
				|  |  |          # Consumes messages from the generator
 | 
	
		
			
				|  |  |          async for response_message in async_response_generator:
 | 
	
		
			
				|  |  |              # Raises exception if aborted
 | 
	
		
			
				|  |  | -            _raise_if_aborted(rpc_state)
 | 
	
		
			
				|  |  | +            rpc_state.raise_for_termination()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -            if rpc_state.server._status == AIO_SERVER_STATUS_STOPPED:
 | 
	
		
			
				|  |  | -                # The async generator might yield much much later after the
 | 
	
		
			
				|  |  | -                # server is destroied. If we proceed, Core will crash badly.
 | 
	
		
			
				|  |  | -                _LOGGER.info('Aborting RPC due to server stop.')
 | 
	
		
			
				|  |  | -                return
 | 
	
		
			
				|  |  | -            else:
 | 
	
		
			
				|  |  | -                await servicer_context.write(response_message)
 | 
	
		
			
				|  |  | +            await servicer_context.write(response_message)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    # Raises exception if aborted
 | 
	
		
			
				|  |  | +    rpc_state.raise_for_termination()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      # Sends the final status of this RPC
 | 
	
		
			
				|  |  |      cdef SendStatusFromServerOperation op = SendStatusFromServerOperation(
 | 
	
	
		
			
				|  | @@ -416,6 +414,8 @@ async def _handle_exceptions(RPCState rpc_state, object rpc_coro, object loop):
 | 
	
		
			
				|  |  |                  )
 | 
	
		
			
				|  |  |      except (KeyboardInterrupt, SystemExit):
 | 
	
		
			
				|  |  |          raise
 | 
	
		
			
				|  |  | +    except _ServerStoppedError:
 | 
	
		
			
				|  |  | +        _LOGGER.info('Aborting RPC due to server stop.')
 | 
	
		
			
				|  |  |      except Exception as e:
 | 
	
		
			
				|  |  |          _LOGGER.exception(e)
 | 
	
		
			
				|  |  |          if not rpc_state.status_sent and rpc_state.server._status != AIO_SERVER_STATUS_STOPPED:
 |