Bladeren bron

Minimal AsyncIO Server for gRPC
* Extends AsyncIO IO manager to support server-side operations;
* Adds more logic to AsyncSocket class;
* Implements an AsyncIO server that can serve unary-unary handlers;
* Adds a server test with grpc.aio.Channel;
* Support both Bazel / setup.py build.

Lidi Zheng 6 jaren geleden
bovenliggende
commit
c6ae98d49a

+ 17 - 0
src/proto/grpc/testing/BUILD

@@ -125,6 +125,23 @@ grpc_proto_library(
     ],
 )
 
+proto_library(
+    name = "benchmark_service_descriptor",
+    srcs = ["benchmark_service.proto"],
+    deps = [":messages_proto_descriptor"],
+)
+
+py_proto_library(
+    name = "benchmark_service_py_pb2",
+    deps = [":benchmark_service_descriptor"],
+)
+
+py_grpc_library(
+    name = "benchmark_service_py_pb2_grpc",
+    srcs = [":benchmark_service_descriptor"],
+    deps = [":benchmark_service_py_pb2"],
+)
+
 grpc_proto_library(
     name = "report_qps_scenario_service_proto",
     srcs = ["report_qps_scenario_service.proto"],

+ 2 - 0
src/python/grpcio/grpc/_cython/BUILD.bazel

@@ -24,6 +24,8 @@ pyx_library(
         "_cygrpc/aio/iomgr/socket.pyx.pxi",
         "_cygrpc/aio/iomgr/timer.pxd.pxi",
         "_cygrpc/aio/iomgr/timer.pyx.pxi",
+        "_cygrpc/aio/server.pxd.pxi",
+        "_cygrpc/aio/server.pyx.pxi",
         "_cygrpc/arguments.pxd.pxi",
         "_cygrpc/arguments.pyx.pxi",
         "_cygrpc/call.pxd.pxi",

+ 52 - 7
src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi

@@ -14,9 +14,10 @@
 
 
 from cpython cimport Py_INCREF, Py_DECREF
-
 from libc cimport string
 
+import socket as native_socket
+
 cdef grpc_socket_vtable asyncio_socket_vtable
 cdef grpc_custom_resolver_vtable asyncio_resolver_vtable
 cdef grpc_custom_timer_vtable asyncio_timer_vtable
@@ -81,39 +82,83 @@ cdef grpc_error* asyncio_socket_getpeername(
         grpc_custom_socket* grpc_socket,
         const grpc_sockaddr* addr,
         int* length) with gil:
-    raise NotImplemented()
+    peer = (<_AsyncioSocket>grpc_socket.impl).peername()
+
+    cdef grpc_resolved_address c_addr
+    hostname = str_to_bytes(peer[0])
+    grpc_string_to_sockaddr(&c_addr, hostname, peer[1])
+    string.memcpy(<void*>addr, <void*>c_addr.addr, c_addr.len)
+    length[0] = c_addr.len
+    return grpc_error_none()
 
 
 cdef grpc_error* asyncio_socket_getsockname(
         grpc_custom_socket* grpc_socket,
         const grpc_sockaddr* addr,
         int* length) with gil:
-    raise NotImplemented()
+    """Supplies sock_addr in add_socket_to_server."""
+    cdef grpc_resolved_address c_addr
+    socket = (<_AsyncioSocket>grpc_socket.impl)
+    if socket is None:
+        peer = ('0.0.0.0', 0)
+    else:
+        peer = socket.sockname()
+    hostname = str_to_bytes(peer[0])
+    grpc_string_to_sockaddr(&c_addr, hostname, peer[1])
+    string.memcpy(<void*>addr, <void*>c_addr.addr, c_addr.len)
+    length[0] = c_addr.len
+    return grpc_error_none()
 
 
 cdef grpc_error* asyncio_socket_listen(grpc_custom_socket* grpc_socket) with gil:
-    raise NotImplemented()
+    (<_AsyncioSocket>grpc_socket.impl).listen()
+    return grpc_error_none()
+
+
+# TODO(lidiz) connects the so_reuse_port option to channel arguments
+def _asyncio_apply_socket_options(object s, so_reuse_port=False):
+  s.setsockopt(native_socket.SOL_SOCKET, native_socket.SO_REUSEADDR, 1)
+  if so_reuse_port:
+    s.setsockopt(native_socket.SOL_SOCKET, native_socket.SO_REUSEPORT, 1)
+  s.setsockopt(native_socket.IPPROTO_TCP, native_socket.TCP_NODELAY, True)
 
 
 cdef grpc_error* asyncio_socket_bind(
         grpc_custom_socket* grpc_socket,
         const grpc_sockaddr* addr,
         size_t len, int flags) with gil:
-    raise NotImplemented()
+    host, port = sockaddr_to_tuple(addr, len)
+    try:
+        try:
+            socket = native_socket.socket(family=native_socket.AF_INET6)
+            _asyncio_apply_socket_options(socket)
+            socket.bind((host, port))
+        except native_socket.gaierror:
+            socket = native_socket.socket(family=native_socket.AF_INET)
+            _asyncio_apply_socket_options(socket)
+            socket.bind((host, port))
+    except IOError as io_error:
+        return socket_error("bind", str(io_error))
+    else:
+        aio_socket = _AsyncioSocket.create_with_py_socket(grpc_socket, socket)
+        cpython.Py_INCREF(aio_socket)
+        grpc_socket.impl = <void*>aio_socket
+        return grpc_error_none()
 
 
 cdef void asyncio_socket_accept(
         grpc_custom_socket* grpc_socket,
         grpc_custom_socket* grpc_socket_client,
         grpc_custom_accept_callback accept_cb) with gil:
-    raise NotImplemented()
+    (<_AsyncioSocket>grpc_socket.impl).accept(grpc_socket_client, accept_cb)
 
 
 cdef grpc_error* asyncio_resolve(
         char* host,
         char* port,
         grpc_resolved_addresses** res) with gil:
-    raise NotImplemented()
+    result = native_socket.getaddrinfo(host, port)
+    res[0] = tuples_to_resolvaddr(result)
 
 
 cdef void asyncio_resolve_async(

+ 18 - 1
src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pxd.pxi

@@ -15,8 +15,8 @@
 
 cdef class _AsyncioSocket:
     cdef:
+        # Common attributes
         grpc_custom_socket * _grpc_socket
-        grpc_custom_connect_callback _grpc_connect_cb
         grpc_custom_read_callback _grpc_read_cb
         object _reader
         object _writer
@@ -24,11 +24,28 @@ cdef class _AsyncioSocket:
         object _task_connect
         char * _read_buffer
 
+        # Client-side attributes
+        grpc_custom_connect_callback _grpc_connect_cb
+        
+        # Server-side attributes
+        grpc_custom_accept_callback _grpc_accept_cb
+        grpc_custom_socket * _grpc_client_socket
+        object _server
+        object _py_socket
+        object _peername
+
     @staticmethod
     cdef _AsyncioSocket create(grpc_custom_socket * grpc_socket)
+    @staticmethod
+    cdef _AsyncioSocket create_with_py_socket(grpc_custom_socket * grpc_socket, object py_socket)
 
     cdef void connect(self, object host, object port, grpc_custom_connect_callback grpc_connect_cb)
     cdef void write(self, grpc_slice_buffer * g_slice_buffer, grpc_custom_write_callback grpc_write_cb)
     cdef void read(self, char * buffer_, size_t length, grpc_custom_read_callback grpc_read_cb)
     cdef bint is_connected(self)
     cdef void close(self)
+
+    cdef accept(self, grpc_custom_socket* grpc_socket_client, grpc_custom_accept_callback grpc_accept_cb)
+    cdef listen(self)
+    cdef tuple peername(self)
+    cdef tuple sockname(self)

+ 46 - 2
src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi

@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import socket
+import socket as native_socket
 
 from libc cimport string
 
@@ -26,6 +26,8 @@ cdef class _AsyncioSocket:
         self._task_connect = None
         self._task_read = None
         self._read_buffer = NULL
+        self._server = None
+        self._py_socket = None
 
     @staticmethod
     cdef _AsyncioSocket create(grpc_custom_socket * grpc_socket):
@@ -33,6 +35,13 @@ cdef class _AsyncioSocket:
         socket._grpc_socket = grpc_socket
         return socket
 
+    @staticmethod
+    cdef _AsyncioSocket create_with_py_socket(grpc_custom_socket * grpc_socket, object py_socket):
+        socket = _AsyncioSocket()
+        socket._grpc_socket = grpc_socket
+        socket._py_socket = py_socket
+        return socket
+
     def __repr__(self):
         class_name = self.__class__.__name__ 
         id_ = id(self)
@@ -52,7 +61,7 @@ cdef class _AsyncioSocket:
             # gRPC default posix implementation disables nagle
             # algorithm.
             sock = self._writer.transport.get_extra_info('socket')
-            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
+            sock.setsockopt(native_socket.IPPROTO_TCP, native_socket.TCP_NODELAY, True)
 
             self._grpc_connect_cb(
                 <grpc_custom_socket*>self._grpc_socket,
@@ -132,3 +141,38 @@ cdef class _AsyncioSocket:
     cdef void close(self):
         if self.is_connected():
             self._writer.close()
+
+    def _new_connection_callback(self, object reader, object writer):
+        client_socket = _AsyncioSocket.create(self._grpc_client_socket)
+        client_socket._reader = reader
+        client_socket._writer = writer
+        client_socket._peername = addr = writer.get_extra_info('peername')
+
+        self._grpc_client_socket.impl = <void*>client_socket
+        cpython.Py_INCREF(client_socket)
+        # Accept callback expects to be called with:
+        # * An grpc custom socket for server
+        # * An grpc custom socket for client (with new Socket instance)
+        # * An error object
+        self._grpc_accept_cb(self._grpc_socket, self._grpc_client_socket, grpc_error_none())
+
+    cdef listen(self):
+        async def create_asyncio_server():
+            self._server = await asyncio.start_server(
+                self._new_connection_callback,
+                sock=self._py_socket,
+            )
+
+        asyncio.get_event_loop().create_task(create_asyncio_server())
+
+    cdef accept(self,
+                grpc_custom_socket* grpc_socket_client,
+                grpc_custom_accept_callback grpc_accept_cb):
+        self._grpc_client_socket = grpc_socket_client
+        self._grpc_accept_cb = grpc_accept_cb
+
+    cdef tuple peername(self):
+        return self._peername
+
+    cdef tuple sockname(self):
+        return self._py_socket.getsockname()

+ 35 - 0
src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi

@@ -0,0 +1,35 @@
+# Copyright 2019 The gRPC Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cdef class _HandlerCallDetails:
+    cdef readonly str method
+    cdef readonly tuple invocation_metadata
+
+
+cdef class RPCState:
+    cdef grpc_call* call,
+    cdef grpc_call_details details
+    cdef grpc_metadata_array request_metadata
+
+    cdef bytes method(self)
+
+
+cdef class _AioServerState:
+    cdef Server server
+    cdef grpc_completion_queue *cq
+    cdef list generic_handlers
+
+
+cdef class AioServer:
+    cdef _AioServerState _state

+ 234 - 0
src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi

@@ -0,0 +1,234 @@
+# Copyright 2019 The gRPC Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cdef class _HandlerCallDetails:
+    def __cinit__(self, str method, tuple invocation_metadata):
+        self.method = method
+        self.invocation_metadata = invocation_metadata
+
+
+class _ServicerContextPlaceHolder(object): pass
+
+
+cdef class CallbackWrapper:
+    cdef CallbackContext context
+    cdef object future
+
+    def __cinit__(self, object future):
+        self.context.functor.functor_run = self.functor_run
+        self.context.waiter = <cpython.PyObject*>(future)
+        self.future = future
+
+    @staticmethod
+    cdef void functor_run(
+            grpc_experimental_completion_queue_functor* functor,
+            int succeed):
+        cdef CallbackContext *context = <CallbackContext *>functor
+        (<object>context.waiter).set_result(None)
+
+    cdef grpc_experimental_completion_queue_functor *c_functor(self):
+        return &self.context.functor
+
+
+cdef class RPCState:
+
+    def __cinit__(self):
+        grpc_metadata_array_init(&self.request_metadata)
+        grpc_call_details_init(&self.details)
+
+    cdef bytes method(self):
+      return _slice_bytes(self.details.method)
+
+    def __dealloc__(self):
+        """Cleans the Core objects."""
+        grpc_call_details_destroy(&self.details)
+        grpc_metadata_array_destroy(&self.request_metadata)
+        if self.call:
+            grpc_call_unref(self.call)
+
+
+cdef _find_method_handler(RPCState rpc_state, list generic_handlers):
+    # TODO(lidiz) connects Metadata to call details
+    cdef _HandlerCallDetails handler_call_details = _HandlerCallDetails(
+        rpc_state.method().decode(),
+        tuple()
+    )
+
+    for generic_handler in generic_handlers:
+        method_handler = generic_handler.service(handler_call_details)
+        if method_handler is not None:
+            return method_handler
+    return None
+
+
+async def callback_start_batch(RPCState rpc_state, tuple operations, object
+loop):
+    """The callback version of start batch operations."""
+    cdef _BatchOperationTag batch_operation_tag = _BatchOperationTag(None, operations, None)
+    batch_operation_tag.prepare()
+
+    cdef object future = loop.create_future()
+    cdef CallbackWrapper wrapper = CallbackWrapper(future)
+    # NOTE(lidiz) Without Py_INCREF, the wrapper object will be destructed
+    # when calling "await". This is an over-optimization by Cython.
+    cpython.Py_INCREF(wrapper)
+    cdef grpc_call_error error = grpc_call_start_batch(
+        rpc_state.call,
+        batch_operation_tag.c_ops,
+        batch_operation_tag.c_nops,
+        wrapper.c_functor(), NULL)
+
+    if error != GRPC_CALL_OK:
+        raise RuntimeError("Error with callback_start_batch {}".format(error))
+
+    await future
+    cpython.Py_DECREF(wrapper)
+    cdef grpc_event c_event
+    batch_operation_tag.event(c_event)
+
+
+async def _handle_rpc(_AioServerState server_state, RPCState rpc_state, object loop):
+    # Finds the method handler (application logic)
+    cdef object method_handler = _find_method_handler(
+        rpc_state,
+        server_state.generic_handlers
+    )
+    if method_handler.request_streaming or method_handler.response_streaming:
+        raise NotImplementedError()
+
+    # Receives request message
+    cdef tuple receive_ops = (
+        ReceiveMessageOperation(_EMPTY_FLAGS),
+    )
+    await callback_start_batch(rpc_state, receive_ops, loop)
+
+    # Parses the request
+    cdef bytes request_raw = receive_ops[0].message()
+    cdef object request_message
+    if method_handler.request_deserializer:
+        request_message = method_handler.request_deserializer(request_raw)
+    else:
+        request_message = request_raw
+
+    # Executes application logic & encodes response message
+    cdef object response_message = await method_handler.unary_unary(request_message, _ServicerContextPlaceHolder())
+    cdef bytes response_raw
+    if method_handler.response_serializer:
+        response_raw = method_handler.response_serializer(response_message)
+    else:
+        response_raw = response_message
+
+    # Sends response message
+    cdef tuple send_ops = (
+        SendStatusFromServerOperation(
+        tuple(), StatusCode.ok, b'', _EMPTY_FLAGS),
+        SendInitialMetadataOperation(tuple(), _EMPTY_FLAGS),
+        SendMessageOperation(response_raw, _EMPTY_FLAGS),
+    )
+    await callback_start_batch(rpc_state, send_ops, loop)
+
+
+async def _server_call_request_call(_AioServerState server_state, object loop):
+    cdef grpc_call_error error
+    cdef RPCState rpc_state = RPCState()
+    cdef object future = loop.create_future()
+    cdef CallbackWrapper wrapper = CallbackWrapper(future)
+    # NOTE(lidiz) Without Py_INCREF, the wrapper object will be destructed
+    # when calling "await". This is an over-optimization by Cython.
+    cpython.Py_INCREF(wrapper)
+    error = grpc_server_request_call(
+        server_state.server.c_server, &rpc_state.call, &rpc_state.details,
+        &rpc_state.request_metadata,
+        server_state.cq, server_state.cq,
+        wrapper.c_functor()
+    )
+    if error != GRPC_CALL_OK:
+        raise RuntimeError("Error in _server_call_request_call: %s" % error)
+
+    await future
+    cpython.Py_DECREF(wrapper)
+    return rpc_state
+
+
+async def _server_main_loop(_AioServerState server_state):
+    cdef object loop = asyncio.get_event_loop()
+    cdef RPCState rpc_state
+    cdef object waiter
+
+    while True:
+        rpc_state = await _server_call_request_call(
+            server_state,
+            loop)
+        # await waiter
+
+        loop.create_task(_handle_rpc(server_state, rpc_state, loop))
+        await asyncio.sleep(0)
+
+
+async def _server_start(_AioServerState server_state):
+    server_state.server.start()
+    await _server_main_loop(server_state)
+
+
+cdef class _AioServerState:
+    def __cinit__(self):
+        self.server = None
+        self.cq = NULL
+        self.generic_handlers = []
+
+
+cdef class AioServer:
+
+    def __init__(self, thread_pool, generic_handlers, interceptors, options,
+                 maximum_concurrent_rpcs, compression):
+        self._state = _AioServerState()
+        self._state.server = Server(options)
+        self._state.cq = grpc_completion_queue_create_for_callback(
+            NULL,
+            NULL
+        )
+        grpc_server_register_completion_queue(
+            self._state.server.c_server,
+            self._state.cq,
+            NULL
+        )
+        self.add_generic_rpc_handlers(generic_handlers)
+
+        if interceptors:
+            raise NotImplementedError()
+        if maximum_concurrent_rpcs:
+            raise NotImplementedError()
+        if compression:
+            raise NotImplementedError()
+        if thread_pool:
+            raise NotImplementedError()
+
+    def add_generic_rpc_handlers(self, generic_rpc_handlers):
+        for h in generic_rpc_handlers:
+            self._state.generic_handlers.append(h)
+
+    def add_insecure_port(self, address):
+        return self._state.server.add_http2_port(address)
+
+    def add_secure_port(self, address, server_credentials):
+        return self._state.server.add_http2_port(address,
+                                          server_credentials._credentials)
+
+    async def start(self):
+        loop = asyncio.get_event_loop()
+        loop.create_task(_server_start(self._state))
+        await asyncio.sleep(0)
+
+    def stop(self, unused_grace):
+        pass

+ 1 - 0
src/python/grpcio/grpc/_cython/cygrpc.pxd

@@ -47,3 +47,4 @@ include "_cygrpc/aio/grpc_aio.pxd.pxi"
 include "_cygrpc/aio/callbackcontext.pxd.pxi"
 include "_cygrpc/aio/call.pxd.pxi"
 include "_cygrpc/aio/channel.pxd.pxi"
+include "_cygrpc/aio/server.pxd.pxi"

+ 1 - 0
src/python/grpcio/grpc/_cython/cygrpc.pyx

@@ -64,6 +64,7 @@ include "_cygrpc/aio/grpc_aio.pyx.pxi"
 include "_cygrpc/aio/call.pyx.pxi"
 include "_cygrpc/aio/channel.pyx.pxi"
 include "_cygrpc/aio/rpc_error.pyx.pxi"
+include "_cygrpc/aio/server.pyx.pxi"
 
 
 #

+ 1 - 0
src/python/grpcio/grpc/experimental/BUILD.bazel

@@ -5,6 +5,7 @@ py_library(
     srcs = [
         "aio/__init__.py",
         "aio/_channel.py",
+        "aio/_server.py",
     ],
     deps = [
         "//src/python/grpcio/grpc/_cython:cygrpc",

+ 1 - 0
src/python/grpcio/grpc/experimental/aio/__init__.py

@@ -20,6 +20,7 @@ import six
 import grpc
 from grpc._cython import cygrpc
 from grpc._cython.cygrpc import init_grpc_aio
+from ._server import server
 
 
 class Channel(six.with_metaclass(abc.ABCMeta)):

+ 174 - 0
src/python/grpcio/grpc/experimental/aio/_server.py

@@ -0,0 +1,174 @@
+# Copyright 2019 The gRPC Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Server-side implementation of gRPC Asyncio Python."""
+
+from typing import Text, Optional
+import asyncio
+import grpc
+from grpc._cython import cygrpc
+
+class Server:
+    """Serves RPCs."""
+
+    def __init__(self, thread_pool, generic_handlers, interceptors, options,
+                 maximum_concurrent_rpcs, compression):
+        self._server = cygrpc.AioServer(thread_pool, generic_handlers,
+                                        interceptors, options,
+                                        maximum_concurrent_rpcs, compression)
+
+    def add_generic_rpc_handlers(
+            self,
+            generic_rpc_handlers,
+            # generic_rpc_handlers: Iterable[grpc.GenericRpcHandlers]
+    ) -> None:
+        """Registers GenericRpcHandlers with this Server.
+
+        This method is only safe to call before the server is started.
+
+        Args:
+          generic_rpc_handlers: An iterable of GenericRpcHandlers that will be
+          used to service RPCs.
+        """
+        self._server.add_generic_rpc_handlers(generic_rpc_handlers)
+
+    def add_insecure_port(self, address: Text) -> int:
+        """Opens an insecure port for accepting RPCs.
+
+        This method may only be called before starting the server.
+
+        Args:
+          address: The address for which to open a port. If the port is 0,
+            or not specified in the address, then gRPC runtime will choose a port.
+
+        Returns:
+          An integer port on which server will accept RPC requests.
+        """
+        return self._server.add_insecure_port(address)
+
+    def add_secure_port(self, address: Text,
+                        server_credentials: grpc.ServerCredentials) -> int:
+        """Opens a secure port for accepting RPCs.
+
+        This method may only be called before starting the server.
+
+        Args:
+          address: The address for which to open a port.
+            if the port is 0, or not specified in the address, then gRPC
+            runtime will choose a port.
+          server_credentials: A ServerCredentials object.
+
+        Returns:
+          An integer port on which server will accept RPC requests.
+        """
+        return self._server.add_secure_port(address, server_credentials)
+
+    async def start(self) -> None:
+        """Starts this Server.
+
+        This method may only be called once. (i.e. it is not idempotent).
+        """
+        await self._server.start()
+
+    def stop(self, grace: Optional[float]) -> asyncio.Event:
+        """Stops this Server.
+
+        This method immediately stop service of new RPCs in all cases.
+
+        If a grace period is specified, this method returns immediately
+        and all RPCs active at the end of the grace period are aborted.
+        If a grace period is not specified (by passing None for `grace`),
+        all existing RPCs are aborted immediately and this method
+        blocks until the last RPC handler terminates.
+
+        This method is idempotent and may be called at any time.
+        Passing a smaller grace value in a subsequent call will have
+        the effect of stopping the Server sooner (passing None will
+        have the effect of stopping the server immediately). Passing
+        a larger grace value in a subsequent call *will not* have the
+        effect of stopping the server later (i.e. the most restrictive
+        grace value is used).
+
+        Args:
+          grace: A duration of time in seconds or None.
+
+        Returns:
+          A threading.Event that will be set when this Server has completely
+          stopped, i.e. when running RPCs either complete or are aborted and
+          all handlers have terminated.
+        """
+        raise NotImplementedError()
+
+    async def wait_for_termination(self,
+                                   timeout: Optional[float] = None) -> bool:
+        """Block current thread until the server stops.
+
+        This is an EXPERIMENTAL API.
+
+        The wait will not consume computational resources during blocking, and
+        it will block until one of the two following conditions are met:
+
+        1) The server is stopped or terminated;
+        2) A timeout occurs if timeout is not `None`.
+
+        The timeout argument works in the same way as `threading.Event.wait()`.
+        https://docs.python.org/3/library/threading.html#threading.Event.wait
+
+        Args:
+          timeout: A floating point number specifying a timeout for the
+            operation in seconds.
+
+        Returns:
+          A bool indicates if the operation times out.
+        """
+        if timeout:
+            raise NotImplementedError()
+        # TODO(lidiz) replace this wait forever logic
+        future = asyncio.get_event_loop().create_future()
+        await future
+
+
+def server(thread_pool=None,
+           handlers=None,
+           interceptors=None,
+           options=None,
+           maximum_concurrent_rpcs=None,
+           compression=None):
+    """Creates a Server with which RPCs can be serviced.
+
+    Args:
+      thread_pool: A futures.ThreadPoolExecutor to be used by the Server
+        to execute RPC handlers.
+      handlers: An optional list of GenericRpcHandlers used for executing RPCs.
+        More handlers may be added by calling add_generic_rpc_handlers any time
+        before the server is started.
+      interceptors: An optional list of ServerInterceptor objects that observe
+        and optionally manipulate the incoming RPCs before handing them over to
+        handlers. The interceptors are given control in the order they are
+        specified. This is an EXPERIMENTAL API.
+      options: An optional list of key-value pairs (channel args in gRPC runtime)
+        to configure the channel.
+      maximum_concurrent_rpcs: The maximum number of concurrent RPCs this server
+        will service before returning RESOURCE_EXHAUSTED status, or None to
+        indicate no limit.
+      compression: An element of grpc.compression, e.g.
+        grpc.compression.Gzip. This compression algorithm will be used for the
+        lifetime of the server unless overridden. This is an EXPERIMENTAL option.
+
+    Returns:
+      A Server object.
+    """
+    return Server(thread_pool, () if handlers is None else handlers, ()
+                  if interceptors is None else interceptors, ()
+                  if options is None else options, maximum_concurrent_rpcs,
+                  compression)

+ 52 - 0
src/python/grpcio_tests/tests_aio/benchmark/server.py

@@ -0,0 +1,52 @@
+# Copyright 2019 The gRPC Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import asyncio
+import logging
+import unittest
+
+from grpc.experimental import aio
+from src.proto.grpc.testing import messages_pb2
+from src.proto.grpc.testing import benchmark_service_pb2_grpc
+
+
+class BenchmarkServer(benchmark_service_pb2_grpc.BenchmarkServiceServicer):
+
+    async def UnaryCall(self, request, context):
+        payload = messages_pb2.Payload(body=b'\0' * request.response_size)
+        return messages_pb2.SimpleResponse(payload=payload)
+
+
+async def _start_async_server():
+    server = aio.server()
+
+    port = server.add_insecure_port(('localhost:%s' % 50051).encode('ASCII'))
+    servicer = BenchmarkServer()
+    benchmark_service_pb2_grpc.add_BenchmarkServiceServicer_to_server(
+        servicer, server)
+
+    await server.start()
+    await server.wait_for_termination()
+
+
+def main():
+    aio.init_grpc_aio()
+    loop = asyncio.get_event_loop()
+    loop.create_task(_start_async_server())
+    loop.run_forever()
+
+
+if __name__ == '__main__':
+    logging.basicConfig()
+    main()

+ 42 - 0
src/python/grpcio_tests/tests_aio/unit/BUILD.bazel

@@ -0,0 +1,42 @@
+# Copyright 2019 The gRPC Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+package(
+    default_testonly = 1,
+    default_visibility = ["//visibility:public"],
+)
+
+GRPC_ASYNC_TESTS = [
+    "server_test.py",
+]
+
+[
+    py_test(
+        name=test_file_name[:-3],
+        size="small",
+        srcs=[test_file_name],
+        main=test_file_name,
+        deps=[
+            "//src/python/grpcio/grpc:grpcio",
+            "//src/proto/grpc/testing:py_messages_proto",
+            "//src/proto/grpc/testing:benchmark_service_py_pb2_grpc",
+            "//src/proto/grpc/testing:benchmark_service_py_pb2",
+            "//external:six"
+        ],
+        imports=["../../",],
+        data=[
+            "//src/python/grpcio_tests/tests/unit/credentials",
+        ],
+    ) for test_file_name in GRPC_ASYNC_TESTS
+]

+ 61 - 0
src/python/grpcio_tests/tests_aio/unit/server_test.py

@@ -0,0 +1,61 @@
+# Copyright 2019 The gRPC Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import asyncio
+import logging
+import unittest
+
+from grpc.experimental import aio
+from src.proto.grpc.testing import messages_pb2
+from src.proto.grpc.testing import benchmark_service_pb2_grpc
+
+
+class BenchmarkServer(benchmark_service_pb2_grpc.BenchmarkServiceServicer):
+
+    async def UnaryCall(self, request, context):
+        payload = messages_pb2.Payload(body=b'\0' * request.response_size)
+        return messages_pb2.SimpleResponse(payload=payload)
+
+
+class TestServer(unittest.TestCase):
+
+    def test_unary_unary(self):
+        loop = asyncio.get_event_loop()
+
+        async def test_unary_unary_body():
+            server = aio.server()
+            port = server.add_insecure_port(('[::]:0').encode('ASCII'))
+            benchmark_service_pb2_grpc.add_BenchmarkServiceServicer_to_server(
+                BenchmarkServer(), server)
+            await server.start()
+
+            async with aio.insecure_channel(f'localhost:{port}') as channel:
+                unary_call = channel.unary_unary(
+                    '/grpc.testing.BenchmarkService/UnaryCall',
+                    request_serializer=messages_pb2.SimpleRequest.
+                    SerializeToString,
+                    response_deserializer=messages_pb2.SimpleResponse.FromString
+                )
+                response = await unary_call(
+                    messages_pb2.SimpleRequest(response_size=1))
+                self.assertIsInstance(response, messages_pb2.SimpleResponse)
+                self.assertEqual(1, len(response.payload.body))
+
+        loop.run_until_complete(test_unary_unary_body())
+
+
+if __name__ == '__main__':
+    aio.init_grpc_aio()
+    logging.basicConfig()
+    unittest.main(verbosity=2)