|
@@ -44,6 +44,11 @@ _SUPPORTED_METHODS = (
|
|
|
"EmptyCall",
|
|
|
)
|
|
|
|
|
|
+_METHOD_CAMEL_TO_CAPS_SNAKE = {
|
|
|
+ "UnaryCall": "UNARY_CALL",
|
|
|
+ "EmptyCall": "EMPTY_CALL",
|
|
|
+}
|
|
|
+
|
|
|
_METHOD_STR_TO_ENUM = {
|
|
|
"UnaryCall": messages_pb2.ClientConfigureRequest.UNARY_CALL,
|
|
|
"EmptyCall": messages_pb2.ClientConfigureRequest.EMPTY_CALL,
|
|
@@ -147,10 +152,11 @@ class _LoadBalancerStatsServicer(test_pb2_grpc.LoadBalancerStatsServiceServicer
|
|
|
response = messages_pb2.LoadBalancerAccumulatedStatsResponse()
|
|
|
with _global_lock:
|
|
|
for method in _SUPPORTED_METHODS:
|
|
|
- response.num_rpcs_started_by_method[method] = _global_rpcs_started.get[method]
|
|
|
- response.num_rpcs_succeeded_by_method[method] = _global_rpcs_succeeded.get[method]
|
|
|
- response.num_rpcs_failed_by_method[method] = _global_rpcs_succeeded.get[method]
|
|
|
- logger.info("Returning cumulative stats request.")
|
|
|
+ caps_method = _METHOD_CAMEL_TO_CAPS_SNAKE[method]
|
|
|
+ response.num_rpcs_started_by_method[caps_method] = _global_rpcs_started[method]
|
|
|
+ response.num_rpcs_succeeded_by_method[caps_method] = _global_rpcs_succeeded[method]
|
|
|
+ response.num_rpcs_failed_by_method[caps_method] = _global_rpcs_failed[method]
|
|
|
+ logger.info("Returning cumulative stats response.")
|
|
|
return response
|
|
|
|
|
|
|
|
@@ -184,8 +190,6 @@ def _on_rpc_done(rpc_id: int, future: grpc.Future, method: str,
|
|
|
else:
|
|
|
logger.error(exception)
|
|
|
else:
|
|
|
- with _global_lock:
|
|
|
- _global_rpcs_succeeded[method] += 1
|
|
|
response = future.result()
|
|
|
hostname = None
|
|
|
for metadatum in future.initial_metadata():
|
|
@@ -194,6 +198,12 @@ def _on_rpc_done(rpc_id: int, future: grpc.Future, method: str,
|
|
|
break
|
|
|
else:
|
|
|
hostname = response.hostname
|
|
|
+ if future.code() == grpc.StatusCode.OK:
|
|
|
+ with _global_lock:
|
|
|
+ _global_rpcs_succeeded[method] += 1
|
|
|
+ else:
|
|
|
+ with _global_lock:
|
|
|
+ _global_rpcs_failed[method] += 1
|
|
|
if print_response:
|
|
|
if future.code() == grpc.StatusCode.OK:
|
|
|
logger.info("Successful response.")
|
|
@@ -284,6 +294,7 @@ class _XdsUpdateClientConfigureServicer(test_pb2_grpc.XdsUpdateClientConfigureSe
|
|
|
|
|
|
def Configure(self, request: messages_pb2.ClientConfigureRequest,
|
|
|
context: grpc.ServicerContext) -> messages_pb2.ClientConfigureResponse:
|
|
|
+ logging.info("Received Configure RPC: {}".format(request))
|
|
|
method_strs = (_METHOD_ENUM_TO_STR[t] for t in request.types)
|
|
|
for method in _SUPPORTED_METHODS:
|
|
|
method_enum = _METHOD_STR_TO_ENUM[method]
|
|
@@ -293,10 +304,10 @@ class _XdsUpdateClientConfigureServicer(test_pb2_grpc.XdsUpdateClientConfigureSe
|
|
|
else:
|
|
|
qps = 0
|
|
|
metadata = ()
|
|
|
- channel_config = self._per_method_config[method]
|
|
|
+ channel_config = self._per_method_configs[method]
|
|
|
with channel_config.condition:
|
|
|
channel_config.qps = qps
|
|
|
- channel_config.metadata = metadata
|
|
|
+ channel_config.metadata = list(metadata)
|
|
|
channel_config.condition.notify_all()
|
|
|
# TODO: Wait for all channels to respond until responding to RPC?
|
|
|
return messages_pb2.ClientConfigureResponse()
|