|
@@ -29,9 +29,11 @@ else:
|
|
|
|
|
|
|
|
|
def _create_channel(target: str, options: Sequence[Tuple[str, str]],
|
|
|
- channel_credentials: grpc.ChannelCredentials,
|
|
|
+ channel_credentials: Optional[grpc.ChannelCredentials],
|
|
|
compression: Optional[grpc.Compression]) -> grpc.Channel:
|
|
|
- if channel_credentials._credentials is grpc._insecure_channel_credentials:
|
|
|
+ channel_credentials = channel_credentials or grpc.local_channel_credentials(
|
|
|
+ )
|
|
|
+ if channel_credentials._credentials is grpc.experimental._insecure_channel_credentials:
|
|
|
_LOGGER.info(f"Creating insecure channel with options '{options}' " +
|
|
|
f"and compression '{compression}'")
|
|
|
return grpc.insecure_channel(target,
|
|
@@ -98,7 +100,7 @@ class ChannelCache:
|
|
|
ChannelCache._condition.wait(timeout=time_to_eviction)
|
|
|
|
|
|
def get_channel(self, target: str, options: Sequence[Tuple[str, str]],
|
|
|
- channel_credentials: grpc.ChannelCredentials,
|
|
|
+ channel_credentials: Optional[grpc.ChannelCredentials],
|
|
|
compression: Optional[grpc.Compression]) -> grpc.Channel:
|
|
|
key = (target, options, channel_credentials, compression)
|
|
|
with self._lock:
|
|
@@ -128,7 +130,6 @@ class ChannelCache:
|
|
|
RequestType = TypeVar('RequestType')
|
|
|
ResponseType = TypeVar('ResponseType')
|
|
|
|
|
|
-
|
|
|
# TODO(rbellevi): Consider a credential type that has the
|
|
|
# following functionality matrix:
|
|
|
#
|
|
@@ -141,6 +142,7 @@ ResponseType = TypeVar('ResponseType')
|
|
|
#
|
|
|
# Make this the default option.
|
|
|
|
|
|
+
|
|
|
# TODO: Make LocalChannelCredentials the default.
|
|
|
def unary_unary(
|
|
|
request: RequestType,
|
|
@@ -158,6 +160,8 @@ def unary_unary(
|
|
|
) -> ResponseType:
|
|
|
"""Invokes a unary-unary RPC without an explicitly specified channel.
|
|
|
|
|
|
+ THIS IS AN EXPERIMENTAL API.
|
|
|
+
|
|
|
This is backed by a per-process cache of channels. Channels are evicted
|
|
|
from the cache after a fixed period by a background. Channels will also be
|
|
|
evicted if more than a configured maximum accumulate.
|
|
@@ -199,7 +203,7 @@ def unary_unary(
|
|
|
Returns:
|
|
|
The response to the RPC.
|
|
|
"""
|
|
|
- channel_credentials = channel_credentials or grpc.local_channel_credentials()
|
|
|
+ grpc.experimental.warn_experimental("unary_unary")
|
|
|
channel = ChannelCache.get().get_channel(target, options,
|
|
|
channel_credentials, compression)
|
|
|
multicallable = channel.unary_unary(method, request_serializer,
|
|
@@ -227,6 +231,8 @@ def unary_stream(
|
|
|
) -> Iterator[ResponseType]:
|
|
|
"""Invokes a unary-stream RPC without an explicitly specified channel.
|
|
|
|
|
|
+ THIS IS AN EXPERIMENTAL API.
|
|
|
+
|
|
|
This is backed by a per-process cache of channels. Channels are evicted
|
|
|
from the cache after a fixed period by a background. Channels will also be
|
|
|
evicted if more than a configured maximum accumulate.
|
|
@@ -267,7 +273,7 @@ def unary_stream(
|
|
|
Returns:
|
|
|
An iterator of responses.
|
|
|
"""
|
|
|
- channel_credentials = channel_credentials or grpc.local_channel_credentials()
|
|
|
+ grpc.experimental.warn_experimental("unary_stream")
|
|
|
channel = ChannelCache.get().get_channel(target, options,
|
|
|
channel_credentials, compression)
|
|
|
multicallable = channel.unary_stream(method, request_serializer,
|
|
@@ -295,6 +301,8 @@ def stream_unary(
|
|
|
) -> ResponseType:
|
|
|
"""Invokes a stream-unary RPC without an explicitly specified channel.
|
|
|
|
|
|
+ THIS IS AN EXPERIMENTAL API.
|
|
|
+
|
|
|
This is backed by a per-process cache of channels. Channels are evicted
|
|
|
from the cache after a fixed period by a background. Channels will also be
|
|
|
evicted if more than a configured maximum accumulate.
|
|
@@ -335,7 +343,7 @@ def stream_unary(
|
|
|
Returns:
|
|
|
The response to the RPC.
|
|
|
"""
|
|
|
- channel_credentials = channel_credentials or grpc.local_channel_credentials()
|
|
|
+ grpc.experimental.warn_experimental("stream_unary")
|
|
|
channel = ChannelCache.get().get_channel(target, options,
|
|
|
channel_credentials, compression)
|
|
|
multicallable = channel.stream_unary(method, request_serializer,
|
|
@@ -363,6 +371,8 @@ def stream_stream(
|
|
|
) -> Iterator[ResponseType]:
|
|
|
"""Invokes a stream-stream RPC without an explicitly specified channel.
|
|
|
|
|
|
+ THIS IS AN EXPERIMENTAL API.
|
|
|
+
|
|
|
This is backed by a per-process cache of channels. Channels are evicted
|
|
|
from the cache after a fixed period by a background. Channels will also be
|
|
|
evicted if more than a configured maximum accumulate.
|
|
@@ -403,7 +413,7 @@ def stream_stream(
|
|
|
Returns:
|
|
|
An iterator of responses.
|
|
|
"""
|
|
|
- channel_credentials = channel_credentials or grpc.local_channel_credentials()
|
|
|
+ grpc.experimental.warn_experimental("stream_stream")
|
|
|
channel = ChannelCache.get().get_channel(target, options,
|
|
|
channel_credentials, compression)
|
|
|
multicallable = channel.stream_stream(method, request_serializer,
|