|
@@ -141,15 +141,25 @@ cdef class _ServicerContext:
|
|
|
object code,
|
|
|
str details='',
|
|
|
tuple trailing_metadata=_EMPTY_METADATA):
|
|
|
- await _perform_abort(
|
|
|
- self._rpc_state,
|
|
|
- code.value[0],
|
|
|
- details,
|
|
|
- trailing_metadata,
|
|
|
- self._loop
|
|
|
- )
|
|
|
+ if self._rpc_state.abort_exception is not None:
|
|
|
+ raise RuntimeError('Abort already called!')
|
|
|
+ else:
|
|
|
+ # Keeps track of the exception object. After abort happen, the RPC
|
|
|
+ # should stop execution. However, if users decided to suppress it, it
|
|
|
+ # could lead to undefined behavior.
|
|
|
+ self._rpc_state.abort_exception = AbortError('Locally aborted.')
|
|
|
+
|
|
|
+ self._rpc_state.status_sent = True
|
|
|
+ await _send_error_status_from_server(
|
|
|
+ self._rpc_state,
|
|
|
+ code.value[0],
|
|
|
+ details,
|
|
|
+ trailing_metadata,
|
|
|
+ self._rpc_state.metadata_sent,
|
|
|
+ self._loop
|
|
|
+ )
|
|
|
|
|
|
- raise self._rpc_state.abort_exception
|
|
|
+ raise self._rpc_state.abort_exception
|
|
|
|
|
|
|
|
|
cdef _find_method_handler(str method, list generic_handlers):
|
|
@@ -207,8 +217,8 @@ async def _handle_unary_unary_rpc(object method_handler,
|
|
|
SendInitialMetadataOperation(None, _EMPTY_FLAGS),
|
|
|
SendMessageOperation(response_raw, _EMPTY_FLAGS),
|
|
|
)
|
|
|
- await execute_batch(rpc_state, send_ops, loop)
|
|
|
rpc_state.status_sent = True
|
|
|
+ await execute_batch(rpc_state, send_ops, loop)
|
|
|
|
|
|
|
|
|
async def _handle_unary_stream_rpc(object method_handler,
|
|
@@ -270,8 +280,8 @@ async def _handle_unary_stream_rpc(object method_handler,
|
|
|
)
|
|
|
|
|
|
cdef tuple ops = (op,)
|
|
|
- await execute_batch(rpc_state, ops, loop)
|
|
|
rpc_state.status_sent = True
|
|
|
+ await execute_batch(rpc_state, ops, loop)
|
|
|
|
|
|
|
|
|
async def _handle_exceptions(RPCState rpc_state, object rpc_coro, object loop):
|
|
@@ -292,11 +302,12 @@ async def _handle_exceptions(RPCState rpc_state, object rpc_coro, object loop):
|
|
|
except Exception as e:
|
|
|
_LOGGER.exception(e)
|
|
|
if not rpc_state.status_sent and rpc_state.server._status != AIO_SERVER_STATUS_STOPPED:
|
|
|
- await _perform_abort(
|
|
|
+ await _send_error_status_from_server(
|
|
|
rpc_state,
|
|
|
StatusCode.unknown,
|
|
|
'%s: %s' % (type(e), e),
|
|
|
_EMPTY_METADATA,
|
|
|
+ rpc_state.metadata_sent,
|
|
|
loop
|
|
|
)
|
|
|
|
|
@@ -333,11 +344,13 @@ async def _handle_rpc(list generic_handlers, RPCState rpc_state, object loop):
|
|
|
generic_handlers,
|
|
|
)
|
|
|
if method_handler is None:
|
|
|
- await _perform_abort(
|
|
|
+ rpc_state.status_sent = True
|
|
|
+ await _send_error_status_from_server(
|
|
|
rpc_state,
|
|
|
StatusCode.unimplemented,
|
|
|
b'Method not found!',
|
|
|
_EMPTY_METADATA,
|
|
|
+ rpc_state.metadata_sent,
|
|
|
loop
|
|
|
)
|
|
|
return
|