# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The Python implementation of the GRPC interoperability test client."""

import argparse
import os

from google import auth as google_auth
from google.auth import jwt as google_auth_jwt
import grpc
from src.proto.grpc.testing import test_pb2_grpc

from tests.interop import methods
from tests.interop import resources


def _args():
    """Parse the interop client's command-line flags.

    Returns:
      The ``argparse.Namespace`` produced by parsing ``sys.argv``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--server_host',
        help='the host to which to connect',
        type=str,
        default="localhost")
    parser.add_argument(
        '--server_port', help='the port to which to connect', type=int)
    parser.add_argument(
        '--test_case',
        help='the test case to execute',
        type=str,
        default="large_unary")
    parser.add_argument(
        '--use_tls',
        help='require a secure connection',
        default=False,
        type=resources.parse_bool)
    parser.add_argument(
        '--use_test_ca',
        help='replace platform root CAs with ca.pem',
        default=False,
        type=resources.parse_bool)
    parser.add_argument(
        '--server_host_override',
        default="foo.test.google.fr",
        help='the server host to which to claim to connect',
        type=str)
    parser.add_argument(
        '--oauth_scope', help='scope for OAuth tokens', type=str)
    parser.add_argument(
        '--default_service_account',
        help='email address of the default service account',
        type=str)
    return parser.parse_args()


def _application_default_credentials():
    """Return the Application Default Credentials of the environment.

    The previous implementation referenced ``oauth2client_client``, a name
    this module never imports, so every call raised ``NameError``.  The
    credentials are now obtained through ``google.auth.default``, which the
    rest of this module already uses.

    NOTE(review): nothing in this module calls this helper; confirm whether
    any external caller expects an oauth2client credentials object.

    Returns:
      The credentials object returned by ``google.auth.default()``.
    """
    credentials, unused_project_id = google_auth.default()
    return credentials


def _call_credentials(args):
    """Build the per-call credentials required by the selected test case.

    Args:
      args: The parsed command-line namespace; ``test_case`` and
        ``oauth_scope`` are read.

    Returns:
      A ``grpc.CallCredentials`` for the auth-related test cases, or None
      for every other test case.
    """
    # The google.auth.transport submodules are imported explicitly, and only
    # in the branches that need them: importing ``google.auth`` alone does
    # not guarantee that ``transport.requests``/``transport.grpc`` are
    # loaded, and non-auth test cases should not require their dependencies.
    if args.test_case == 'oauth2_auth_token':
        from google.auth.transport import requests as auth_requests

        google_credentials, unused_project_id = google_auth.default(
            scopes=[args.oauth_scope])
        # Refresh eagerly so that ``token`` is populated before it is read.
        google_credentials.refresh(auth_requests.Request())
        return grpc.access_token_call_credentials(google_credentials.token)

    if args.test_case == 'compute_engine_creds':
        from google.auth.transport import grpc as auth_grpc
        from google.auth.transport import requests as auth_requests

        google_credentials, unused_project_id = google_auth.default(
            scopes=[args.oauth_scope])
        return grpc.metadata_call_credentials(
            auth_grpc.AuthMetadataPlugin(
                credentials=google_credentials,
                request=auth_requests.Request()))

    if args.test_case == 'jwt_token_creds':
        from google.auth.transport import grpc as auth_grpc

        # The service-account key file path comes from the environment
        # variable named by google.auth.environment_vars.CREDENTIALS.
        google_credentials = (
            google_auth_jwt.OnDemandCredentials.from_service_account_file(
                os.environ[google_auth.environment_vars.CREDENTIALS]))
        return grpc.metadata_call_credentials(
            auth_grpc.AuthMetadataPlugin(
                credentials=google_credentials, request=None))

    return None


def _stub(args):
    """Create the client stub for the test case selected by ``args``.

    Args:
      args: The parsed command-line namespace.

    Returns:
      An ``UnimplementedServiceStub`` when the test case is
      ``unimplemented_service``; a ``TestServiceStub`` otherwise.
    """
    target = '{}:{}'.format(args.server_host, args.server_port)
    call_credentials = _call_credentials(args)

    if args.use_tls:
        if args.use_test_ca:
            root_certificates = resources.test_root_certificates()
        else:
            root_certificates = None  # will load default roots.

        channel_credentials = grpc.ssl_channel_credentials(root_certificates)
        if call_credentials is not None:
            channel_credentials = grpc.composite_channel_credentials(
                channel_credentials, call_credentials)

        channel = grpc.secure_channel(target, channel_credentials, (
            ('grpc.ssl_target_name_override', args.server_host_override,),))
    else:
        # Call credentials are not attached to an insecure channel.
        channel = grpc.insecure_channel(target)

    if args.test_case == "unimplemented_service":
        return test_pb2_grpc.UnimplementedServiceStub(channel)
    return test_pb2_grpc.TestServiceStub(channel)


def _test_case_from_arg(test_case_arg):
    """Map a ``--test_case`` value to its ``methods.TestCase`` member.

    Args:
      test_case_arg: The test case name given on the command line.

    Returns:
      The ``methods.TestCase`` member whose ``value`` equals the argument.

    Raises:
      ValueError: If no member has that value.
    """
    for test_case in methods.TestCase:
        if test_case_arg == test_case.value:
            return test_case
    raise ValueError('No test case "%s"!' % test_case_arg)


def test_interoperability():
    """Parse the flags, build the stub and run the selected test case."""
    args = _args()
    # Resolve the test case first so that an unknown name fails before any
    # credentials are fetched or a channel is created.
    test_case = _test_case_from_arg(args.test_case)
    stub = _stub(args)
    test_case.test_interoperability(stub, args)


if __name__ == '__main__':
    test_interoperability()