| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609 | #!/usr/bin/env python# Copyright 2015 gRPC authors.## Licensed under the Apache License, Version 2.0 (the "License");# you may not use this file except in compliance with the License.# You may obtain a copy of the License at##     http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License."""Run interop (cross-language) tests in parallel."""from __future__ import print_functionimport argparseimport atexitimport itertoolsimport jsonimport multiprocessingimport osimport reimport subprocessimport sysimport tempfileimport timeimport uuidimport siximport tracebackimport python_utils.dockerjob as dockerjobimport python_utils.jobset as jobsetimport python_utils.report_utils as report_utils# Docker doesn't clean up after itself, so we do it on exit.atexit.register(lambda: subprocess.call(['stty', 'echo']))ROOT = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), '../..'))os.chdir(ROOT)_FALLBACK_SERVER_PORT = 443_BALANCER_SERVER_PORT = 12000_BACKEND_SERVER_PORT = 8080_TEST_TIMEOUT = 30_FAKE_SERVERS_SAFENAME = 'fake_servers'# Use a name that's verified by the test certs_SERVICE_NAME = 'server.test.google.fr'class CXXLanguage:    def __init__(self):        self.client_cwd = '/var/local/git/grpc'        self.safename = 'cxx'    def client_cmd(self, args):        return ['bins/opt/interop_client'] + args    def global_env(self):        # 1) Set c-ares as the resolver, to        #    enable grpclb.        # 2) Turn on verbose logging.        # 3) Set the ROOTS_PATH env variable        #    to the test CA in order for        #    GoogleDefaultCredentials to be        #    able to use the test CA.        return {            'GRPC_DNS_RESOLVER':            'ares',            'GRPC_VERBOSITY':            'DEBUG',            'GRPC_TRACE':            'client_channel,glb',            'GRPC_DEFAULT_SSL_ROOTS_FILE_PATH':            '/var/local/git/grpc/src/core/tsi/test_creds/ca.pem',        }    def __str__(self):        return 'c++'class JavaLanguage:    def __init__(self):        self.client_cwd = '/var/local/git/grpc-java'        self.safename = str(self)    def client_cmd(self, args):        # Take necessary steps to import our test CA into        # the set of test CA's that the Java runtime of the        # docker container will pick up, so that        # Java GoogleDefaultCreds can use it.        pem_to_der_cmd = ('openssl x509 -outform der '                          '-in /external_mount/src/core/tsi/test_creds/ca.pem '                          '-out /tmp/test_ca.der')        keystore_import_cmd = (            'keytool -import '            '-keystore /usr/lib/jvm/java-8-oracle/jre/lib/security/cacerts '            '-file /tmp/test_ca.der '            '-deststorepass changeit '            '-noprompt')        return [            'bash', '-c', ('{pem_to_der_cmd} && '                           '{keystore_import_cmd} && '                           './run-test-client.sh {java_client_args}').format(                               pem_to_der_cmd=pem_to_der_cmd,                               keystore_import_cmd=keystore_import_cmd,                               java_client_args=' '.join(args))        ]    def global_env(self):        # 1) Enable grpclb        # 2) Enable verbose logging        return {            'JAVA_OPTS':            ('-Dio.grpc.internal.DnsNameResolverProvider.enable_grpclb=true '             '-Djava.util.logging.config.file=/var/local/grpc_java_logging/logconf.txt'            )        }    def __str__(self):        return 'java'class GoLanguage:    def __init__(self):        self.client_cwd = '/go/src/google.golang.org/grpc/interop/client'        self.safename = str(self)    def client_cmd(self, args):        # Copy the test CA file into the path that        # the Go runtime in the docker container will use, so        # that Go's GoogleDefaultCredentials can use it.        # See https://golang.org/src/crypto/x509/root_linux.go.        return [            'bash', '-c', ('cp /external_mount/src/core/tsi/test_creds/ca.pem '                           '/etc/ssl/certs/ca-certificates.crt && '                           '/go/bin/client {go_client_args}'                          ).format(go_client_args=' '.join(args))        ]    def global_env(self):        return {            'GRPC_GO_LOG_VERBOSITY_LEVEL': '3',            'GRPC_GO_LOG_SEVERITY_LEVEL': 'INFO'        }    def __str__(self):        return 'go'_LANGUAGES = {    'c++': CXXLanguage(),    'go': GoLanguage(),    'java': JavaLanguage(),}def docker_run_cmdline(cmdline, image, docker_args, cwd, environ=None):    """Wraps given cmdline array to create 'docker run' cmdline from it."""    # turn environ into -e docker args    docker_cmdline = 'docker run -i --rm=true'.split()    if environ:        for k, v in environ.items():            docker_cmdline += ['-e', '%s=%s' % (k, v)]    return docker_cmdline + ['-w', cwd] + docker_args + [image] + cmdlinedef _job_kill_handler(job):    assert job._spec.container_name    dockerjob.docker_kill(job._spec.container_name)def transport_security_to_args(transport_security):    args = []    if transport_security == 'tls':        args += ['--use_tls=true']    elif transport_security == 'alts':        args += ['--use_tls=false', '--use_alts=true']    elif transport_security == 'insecure':        args += ['--use_tls=false']    elif transport_security == 'google_default_credentials':        args += ['--custom_credentials_type=google_default_credentials']    else:        print('Invalid transport security option.')        sys.exit(1)    return argsdef lb_client_interop_jobspec(language,                              dns_server_ip,                              docker_image,                              transport_security='tls'):    """Runs a gRPC client under test in a docker container"""    interop_only_options = [        '--server_host=%s' % _SERVICE_NAME,        '--server_port=%d' % _FALLBACK_SERVER_PORT    ] + transport_security_to_args(transport_security)    # Don't set the server host override in any client;    # Go and Java default to no override.    # We're using a DNS server so there's no need.    if language.safename == 'c++':        interop_only_options += ['--server_host_override=""']    # Don't set --use_test_ca; we're configuring    # clients to use test CA's via alternate means.    interop_only_options += ['--use_test_ca=false']    client_args = language.client_cmd(interop_only_options)    container_name = dockerjob.random_name(        'lb_interop_client_%s' % language.safename)    docker_cmdline = docker_run_cmdline(        client_args,        environ=language.global_env(),        image=docker_image,        cwd=language.client_cwd,        docker_args=[            '--dns=%s' % dns_server_ip,            '--net=host',            '--name=%s' % container_name,            '-v',            '{grpc_grpc_root_dir}:/external_mount:ro'.format(                grpc_grpc_root_dir=ROOT),        ])    jobset.message(        'IDLE',        'docker_cmdline:\b|%s|' % ' '.join(docker_cmdline),        do_newline=True)    test_job = jobset.JobSpec(        cmdline=docker_cmdline,        shortname=('lb_interop_client:%s' % language),        timeout_seconds=_TEST_TIMEOUT,        kill_handler=_job_kill_handler)    test_job.container_name = container_name    return test_jobdef fallback_server_jobspec(transport_security, shortname):    """Create jobspec for running a fallback server"""    cmdline = [        'bin/server',        '--port=%d' % _FALLBACK_SERVER_PORT,    ] + transport_security_to_args(transport_security)    return grpc_server_in_docker_jobspec(        server_cmdline=cmdline, shortname=shortname)def backend_server_jobspec(transport_security, shortname):    """Create jobspec for running a backend server"""    cmdline = [        'bin/server',        '--port=%d' % _BACKEND_SERVER_PORT,    ] + transport_security_to_args(transport_security)    return grpc_server_in_docker_jobspec(        server_cmdline=cmdline, shortname=shortname)def grpclb_jobspec(transport_security, short_stream, backend_addrs, shortname):    """Create jobspec for running a balancer server"""    cmdline = [        'bin/fake_grpclb',        '--backend_addrs=%s' % ','.join(backend_addrs),        '--port=%d' % _BALANCER_SERVER_PORT,        '--short_stream=%s' % short_stream,        '--service_name=%s' % _SERVICE_NAME,    ] + transport_security_to_args(transport_security)    return grpc_server_in_docker_jobspec(        server_cmdline=cmdline, shortname=shortname)def grpc_server_in_docker_jobspec(server_cmdline, shortname):    container_name = dockerjob.random_name(shortname)    environ = {        'GRPC_GO_LOG_VERBOSITY_LEVEL': '3',        'GRPC_GO_LOG_SEVERITY_LEVEL': 'INFO ',    }    docker_cmdline = docker_run_cmdline(        server_cmdline,        cwd='/go',        image=docker_images.get(_FAKE_SERVERS_SAFENAME),        environ=environ,        docker_args=['--name=%s' % container_name])    jobset.message(        'IDLE',        'docker_cmdline:\b|%s|' % ' '.join(docker_cmdline),        do_newline=True)    server_job = jobset.JobSpec(        cmdline=docker_cmdline, shortname=shortname, timeout_seconds=30 * 60)    server_job.container_name = container_name    return server_jobdef dns_server_in_docker_jobspec(grpclb_ips, fallback_ips, shortname,                                 cause_no_error_no_data_for_balancer_a_record):    container_name = dockerjob.random_name(shortname)    run_dns_server_cmdline = [        'python',        'test/cpp/naming/utils/run_dns_server_for_lb_interop_tests.py',        '--grpclb_ips=%s' % ','.join(grpclb_ips),        '--fallback_ips=%s' % ','.join(fallback_ips),    ]    if cause_no_error_no_data_for_balancer_a_record:        run_dns_server_cmdline.append(            '--cause_no_error_no_data_for_balancer_a_record')    docker_cmdline = docker_run_cmdline(        run_dns_server_cmdline,        cwd='/var/local/git/grpc',        image=docker_images.get(_FAKE_SERVERS_SAFENAME),        docker_args=['--name=%s' % container_name])    jobset.message(        'IDLE',        'docker_cmdline:\b|%s|' % ' '.join(docker_cmdline),        do_newline=True)    server_job = jobset.JobSpec(        cmdline=docker_cmdline, shortname=shortname, timeout_seconds=30 * 60)    server_job.container_name = container_name    return server_jobdef build_interop_image_jobspec(lang_safename, basename_prefix='grpc_interop'):    """Creates jobspec for building interop docker image for a language"""    tag = '%s_%s:%s' % (basename_prefix, lang_safename, uuid.uuid4())    env = {        'INTEROP_IMAGE': tag,        'BASE_NAME': '%s_%s' % (basename_prefix, lang_safename),    }    build_job = jobset.JobSpec(        cmdline=['tools/run_tests/dockerize/build_interop_image.sh'],        environ=env,        shortname='build_docker_%s' % lang_safename,        timeout_seconds=30 * 60)    build_job.tag = tag    return build_jobargp = argparse.ArgumentParser(description='Run interop tests.')argp.add_argument(    '-l',    '--language',    choices=['all'] + sorted(_LANGUAGES),    nargs='+',    default=['all'],    help='Clients to run.')argp.add_argument('-j', '--jobs', default=multiprocessing.cpu_count(), type=int)argp.add_argument(    '-s',    '--scenarios_file',    default=None,    type=str,    help='File containing test scenarios as JSON configs.')argp.add_argument(    '-n',    '--scenario_name',    default=None,    type=str,    help=(        'Useful for manual runs: specify the name of '        'the scenario to run from scenarios_file. Run all scenarios if unset.'))argp.add_argument(    '--cxx_image_tag',    default=None,    type=str,    help=('Setting this skips the clients docker image '          'build step and runs the client from the named '          'image. Only supports running a one client language.'))argp.add_argument(    '--go_image_tag',    default=None,    type=str,    help=('Setting this skips the clients docker image build '          'step and runs the client from the named image. Only '          'supports running a one client language.'))argp.add_argument(    '--java_image_tag',    default=None,    type=str,    help=('Setting this skips the clients docker image build '          'step and runs the client from the named image. Only '          'supports running a one client language.'))argp.add_argument(    '--servers_image_tag',    default=None,    type=str,    help=('Setting this skips the fake servers docker image '          'build step and runs the servers from the named image.'))argp.add_argument(    '--no_skips',    default=False,    type=bool,    nargs='?',    const=True,    help=('Useful for manual runs. Setting this overrides test '          '"skips" configured in test scenarios.'))argp.add_argument(    '--verbose',    default=False,    type=bool,    nargs='?',    const=True,    help='Increase logging.')args = argp.parse_args()docker_images = {}build_jobs = []if len(args.language) and args.language[0] == 'all':    languages = _LANGUAGES.keys()else:    languages = args.languagefor lang_name in languages:    l = _LANGUAGES[lang_name]    # First check if a pre-built image was supplied, and avoid    # rebuilding the particular docker image if so.    if lang_name == 'c++' and args.cxx_image_tag:        docker_images[str(l.safename)] = args.cxx_image_tag    elif lang_name == 'go' and args.go_image_tag:        docker_images[str(l.safename)] = args.go_image_tag    elif lang_name == 'java' and args.java_image_tag:        docker_images[str(l.safename)] = args.java_image_tag    else:        # Build the test client in docker and save the fully        # built image.        job = build_interop_image_jobspec(l.safename)        build_jobs.append(job)        docker_images[str(l.safename)] = job.tag# First check if a pre-built image was supplied.if args.servers_image_tag:    docker_images[_FAKE_SERVERS_SAFENAME] = args.servers_image_tagelse:    # Build the test servers in docker and save the fully    # built image.    job = build_interop_image_jobspec(        _FAKE_SERVERS_SAFENAME, basename_prefix='lb_interop')    build_jobs.append(job)    docker_images[_FAKE_SERVERS_SAFENAME] = job.tagif build_jobs:    jobset.message('START', 'Building interop docker images.', do_newline=True)    print('Jobs to run: \n%s\n' % '\n'.join(str(j) for j in build_jobs))    num_failures, _ = jobset.run(        build_jobs, newline_on_success=True, maxjobs=args.jobs)    if num_failures == 0:        jobset.message(            'SUCCESS', 'All docker images built successfully.', do_newline=True)    else:        jobset.message(            'FAILED', 'Failed to build interop docker images.', do_newline=True)        sys.exit(1)def wait_until_dns_server_is_up(dns_server_ip):    """Probes the DNS server until it's running and safe for tests."""    for i in range(0, 30):        print('Health check: attempt to connect to DNS server over TCP.')        tcp_connect_subprocess = subprocess.Popen([            os.path.join(os.getcwd(), 'test/cpp/naming/utils/tcp_connect.py'),            '--server_host', dns_server_ip, '--server_port',            str(53), '--timeout',            str(1)        ])        tcp_connect_subprocess.communicate()        if tcp_connect_subprocess.returncode == 0:            print(('Health check: attempt to make an A-record '                   'query to DNS server.'))            dns_resolver_subprocess = subprocess.Popen(                [                    os.path.join(os.getcwd(),                                 'test/cpp/naming/utils/dns_resolver.py'),                    '--qname', ('health-check-local-dns-server-is-alive.'                                'resolver-tests.grpctestingexp'),                    '--server_host', dns_server_ip, '--server_port',                    str(53)                ],                stdout=subprocess.PIPE)            dns_resolver_stdout, _ = dns_resolver_subprocess.communicate()            if dns_resolver_subprocess.returncode == 0:                if '123.123.123.123' in dns_resolver_stdout:                    print(('DNS server is up! '                           'Successfully reached it over UDP and TCP.'))                    return        time.sleep(0.1)    raise Exception(('Failed to reach DNS server over TCP and/or UDP. '                     'Exitting without running tests.'))def shortname(shortname_prefix, shortname, index):    return '%s_%s_%d' % (shortname_prefix, shortname, index)def run_one_scenario(scenario_config):    jobset.message('START', 'Run scenario: %s' % scenario_config['name'])    server_jobs = {}    server_addresses = {}    suppress_server_logs = True    try:        backend_addrs = []        fallback_ips = []        grpclb_ips = []        shortname_prefix = scenario_config['name']        # Start backends        for i in xrange(len(scenario_config['backend_configs'])):            backend_config = scenario_config['backend_configs'][i]            backend_shortname = shortname(shortname_prefix, 'backend_server', i)            backend_spec = backend_server_jobspec(                backend_config['transport_sec'], backend_shortname)            backend_job = dockerjob.DockerJob(backend_spec)            server_jobs[backend_shortname] = backend_job            backend_addrs.append('%s:%d' % (backend_job.ip_address(),                                            _BACKEND_SERVER_PORT))        # Start fallbacks        for i in xrange(len(scenario_config['fallback_configs'])):            fallback_config = scenario_config['fallback_configs'][i]            fallback_shortname = shortname(shortname_prefix, 'fallback_server',                                           i)            fallback_spec = fallback_server_jobspec(                fallback_config['transport_sec'], fallback_shortname)            fallback_job = dockerjob.DockerJob(fallback_spec)            server_jobs[fallback_shortname] = fallback_job            fallback_ips.append(fallback_job.ip_address())        # Start balancers        for i in xrange(len(scenario_config['balancer_configs'])):            balancer_config = scenario_config['balancer_configs'][i]            grpclb_shortname = shortname(shortname_prefix, 'grpclb_server', i)            grpclb_spec = grpclb_jobspec(balancer_config['transport_sec'],                                         balancer_config['short_stream'],                                         backend_addrs, grpclb_shortname)            grpclb_job = dockerjob.DockerJob(grpclb_spec)            server_jobs[grpclb_shortname] = grpclb_job            grpclb_ips.append(grpclb_job.ip_address())        # Start DNS server        dns_server_shortname = shortname(shortname_prefix, 'dns_server', 0)        dns_server_spec = dns_server_in_docker_jobspec(            grpclb_ips, fallback_ips, dns_server_shortname,            scenario_config['cause_no_error_no_data_for_balancer_a_record'])        dns_server_job = dockerjob.DockerJob(dns_server_spec)        server_jobs[dns_server_shortname] = dns_server_job        # Get the IP address of the docker container running the DNS server.        # The DNS server is running on port 53 of that IP address. Note we will        # point the DNS resolvers of grpc clients under test to our controlled        # DNS server by effectively modifying the /etc/resolve.conf "nameserver"        # lists of their docker containers.        dns_server_ip = dns_server_job.ip_address()        wait_until_dns_server_is_up(dns_server_ip)        # Run clients        jobs = []        for lang_name in languages:            # Skip languages that are known to not currently            # work for this test.            if not args.no_skips and lang_name in scenario_config.get(                    'skip_langs', []):                jobset.message('IDLE',                               'Skipping scenario: %s for language: %s\n' %                               (scenario_config['name'], lang_name))                continue            lang = _LANGUAGES[lang_name]            test_job = lb_client_interop_jobspec(                lang,                dns_server_ip,                docker_image=docker_images.get(lang.safename),                transport_security=scenario_config['transport_sec'])            jobs.append(test_job)        jobset.message('IDLE', 'Jobs to run: \n%s\n' % '\n'.join(            str(job) for job in jobs))        num_failures, resultset = jobset.run(            jobs, newline_on_success=True, maxjobs=args.jobs)        report_utils.render_junit_xml_report(resultset, 'sponge_log.xml')        if num_failures:            suppress_server_logs = False            jobset.message(                'FAILED',                'Scenario: %s. Some tests failed' % scenario_config['name'],                do_newline=True)        else:            jobset.message(                'SUCCESS',                'Scenario: %s. All tests passed' % scenario_config['name'],                do_newline=True)        return num_failures    finally:        # Check if servers are still running.        for server, job in server_jobs.items():            if not job.is_running():                print('Server "%s" has exited prematurely.' % server)        suppress_failure = suppress_server_logs and not args.verbose        dockerjob.finish_jobs(            [j for j in six.itervalues(server_jobs)],            suppress_failure=suppress_failure)num_failures = 0with open(args.scenarios_file, 'r') as scenarios_input:    all_scenarios = json.loads(scenarios_input.read())    for scenario in all_scenarios:        if args.scenario_name:            if args.scenario_name != scenario['name']:                jobset.message('IDLE',                               'Skipping scenario: %s' % scenario['name'])                continue        num_failures += run_one_scenario(scenario)if num_failures == 0:    sys.exit(0)else:    sys.exit(1)
 |