#!/usr/bin/env python2.7
# Copyright 2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

"""Run performance tests locally or remotely."""

from __future__ import print_function

import argparse
import collections
import itertools
import json
import multiprocessing
import os
import pipes
import re
import subprocess
import sys
import tempfile
import time
import traceback
import uuid

import performance.scenario_config as scenario_config
import python_utils.jobset as jobset
import python_utils.report_utils as report_utils

_ROOT = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), '../..'))
os.chdir(_ROOT)

_REMOTE_HOST_USERNAME = 'jenkins'


class QpsWorkerJob:
  """Encapsulates a qps worker server job."""

  def __init__(self, spec, language, host_and_port, perf_file_base_name=None):
    self._spec = spec
    self.language = language
    self.host_and_port = host_and_port
    self._job = None
    self.perf_file_base_name = perf_file_base_name

  def start(self):
    self._job = jobset.Job(self._spec, newline_on_success=True, travis=True,
                           add_env={})

  def is_running(self):
    """Polls a job and returns True if given job is still running."""
    return self._job and self._job.state() == jobset._RUNNING

  def kill(self):
    if self._job:
      self._job.kill()
      self._job = None
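

# Worker lifecycle, roughly (orientation comment; see the code below):
# create_qpsworkers() builds one QpsWorkerJob per language and worker slot,
# the main driver loop at the bottom of this file start()s them before each
# scenario, and finish_qps_workers() polls is_running() and eventually kill()s
# any worker that does not shut down after the driver's --quit job.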
def create_qpsworker_job(language, shortname=None, port=10000,
                         remote_host=None, perf_cmd=None):
  cmdline = (language.worker_cmdline() + ['--driver_port=%s' % port])

  if remote_host:
    host_and_port = '%s:%s' % (remote_host, port)
  else:
    host_and_port = 'localhost:%s' % port

  perf_file_base_name = None
  if perf_cmd:
    perf_file_base_name = '%s-%s' % (host_and_port, shortname)
    # specify -o output file so perf.data gets collected when worker stopped
    cmdline = perf_cmd + ['-o', '%s-perf.data' % perf_file_base_name] + cmdline

  worker_timeout = 3 * 60
  if remote_host:
    user_at_host = '%s@%s' % (_REMOTE_HOST_USERNAME, remote_host)
    ssh_cmd = ['ssh']
    cmdline = ['timeout', '%s' % (worker_timeout + 30)] + cmdline
    ssh_cmd.extend([str(user_at_host),
                    'cd ~/performance_workspace/grpc/ && %s' % ' '.join(cmdline)])
    cmdline = ssh_cmd

  jobspec = jobset.JobSpec(
      cmdline=cmdline,
      shortname=shortname,
      timeout_seconds=worker_timeout,  # workers get restarted after each scenario
      verbose_success=True)
  return QpsWorkerJob(jobspec, language, host_and_port, perf_file_base_name)


def create_scenario_jobspec(scenario_json, workers, remote_host=None,
                            bq_result_table=None, server_cpu_load=0):
  """Runs one scenario using QPS driver."""
  # Setting the QPS_WORKERS env variable here makes sure it works with SSH too.
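  # For example (illustrative values only), with two local workers the
  # assembled command resembles:
  #   QPS_WORKERS="localhost:10000,localhost:10010" \
  #     tools/run_tests/performance/run_qps_driver.sh \
  #     --scenarios_json='{"scenarios": [...]}' \
  #     --scenario_result_file=scenario_result.json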
  cmd = 'QPS_WORKERS="%s" ' % ','.join(workers)
  if bq_result_table:
    cmd += 'BQ_RESULT_TABLE="%s" ' % bq_result_table
  cmd += 'tools/run_tests/performance/run_qps_driver.sh '
  cmd += '--scenarios_json=%s ' % pipes.quote(json.dumps({'scenarios': [scenario_json]}))
  cmd += '--scenario_result_file=scenario_result.json '
  if server_cpu_load != 0:
    cmd += ('--search_param=offered_load --initial_search_value=1000 '
            '--targeted_cpu_load=%d --stride=500 '
            '--error_tolerance=0.01' % server_cpu_load)
  if remote_host:
    user_at_host = '%s@%s' % (_REMOTE_HOST_USERNAME, remote_host)
    cmd = 'ssh %s "cd ~/performance_workspace/grpc/ && "%s' % (
        user_at_host, pipes.quote(cmd))

  return jobset.JobSpec(
      cmdline=[cmd],
      shortname='qps_json_driver.%s' % scenario_json['name'],
      timeout_seconds=12 * 60,
      shell=True,
      verbose_success=True)


def create_quit_jobspec(workers, remote_host=None):
  """Runs quit using QPS driver."""
  # Setting the QPS_WORKERS env variable here makes sure it works with SSH too.
  cmd = 'QPS_WORKERS="%s" bins/opt/qps_json_driver --quit' % ','.join(
      w.host_and_port for w in workers)
  if remote_host:
    user_at_host = '%s@%s' % (_REMOTE_HOST_USERNAME, remote_host)
    cmd = 'ssh %s "cd ~/performance_workspace/grpc/ && "%s' % (
        user_at_host, pipes.quote(cmd))

  return jobset.JobSpec(
      cmdline=[cmd],
      shortname='qps_json_driver.quit',
      timeout_seconds=3 * 60,
      shell=True,
      verbose_success=True)


def create_netperf_jobspec(server_host='localhost', client_host=None,
                           bq_result_table=None):
  """Runs netperf benchmark."""
  cmd = 'NETPERF_SERVER_HOST="%s" ' % server_host
  if bq_result_table:
    cmd += 'BQ_RESULT_TABLE="%s" ' % bq_result_table
  if client_host:
    # If netperf is running remotely, the env variables populated by Jenkins
    # won't be available on the client, but we need them for uploading results
    # to BigQuery.
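    # For example (hypothetical values), the command gains
    # JOB_NAME="grpc-perf-nightly" BUILD_NUMBER="42" ahead of the script
    # invocation, so results uploaded from the remote client are still tagged
    # with this Jenkins build.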
    jenkins_job_name = os.getenv('JOB_NAME')
    if jenkins_job_name:
      cmd += 'JOB_NAME="%s" ' % jenkins_job_name
    jenkins_build_number = os.getenv('BUILD_NUMBER')
    if jenkins_build_number:
      cmd += 'BUILD_NUMBER="%s" ' % jenkins_build_number

  cmd += 'tools/run_tests/performance/run_netperf.sh'
  if client_host:
    user_at_host = '%s@%s' % (_REMOTE_HOST_USERNAME, client_host)
    cmd = 'ssh %s "cd ~/performance_workspace/grpc/ && "%s' % (
        user_at_host, pipes.quote(cmd))

  return jobset.JobSpec(
      cmdline=[cmd],
      shortname='netperf',
      timeout_seconds=60,
      shell=True,
      verbose_success=True)


def archive_repo(languages):
  """Archives local version of repo including submodules."""
  cmdline = ['tar', '-cf', '../grpc.tar', '../grpc/']
  if 'java' in languages:
    cmdline.append('../grpc-java')
  if 'go' in languages:
    cmdline.append('../grpc-go')

  archive_job = jobset.JobSpec(
      cmdline=cmdline,
      shortname='archive_repo',
      timeout_seconds=3 * 60)

  jobset.message('START', 'Archiving local repository.', do_newline=True)
  num_failures, _ = jobset.run(
      [archive_job], newline_on_success=True, maxjobs=1)
  if num_failures == 0:
    jobset.message('SUCCESS',
                   'Archive with local repository created successfully.',
                   do_newline=True)
  else:
    jobset.message('FAILED', 'Failed to archive local repository.',
                   do_newline=True)
    sys.exit(1)


def prepare_remote_hosts(hosts, prepare_local=False):
  """Prepares remote hosts (and maybe prepares localhost as well)."""
  prepare_timeout = 5 * 60
  prepare_jobs = []
  for host in hosts:
    user_at_host = '%s@%s' % (_REMOTE_HOST_USERNAME, host)
    prepare_jobs.append(
        jobset.JobSpec(
            cmdline=['tools/run_tests/performance/remote_host_prepare.sh'],
            shortname='remote_host_prepare.%s' % host,
            environ={'USER_AT_HOST': user_at_host},
            timeout_seconds=prepare_timeout))
  if prepare_local:
    # Prepare localhost as well
    prepare_jobs.append(
        jobset.JobSpec(
            cmdline=['tools/run_tests/performance/kill_workers.sh'],
            shortname='local_prepare',
            timeout_seconds=prepare_timeout))
  jobset.message('START', 'Preparing hosts.', do_newline=True)
  num_failures, _ = jobset.run(
      prepare_jobs, newline_on_success=True, maxjobs=10)
  if num_failures == 0:
    jobset.message('SUCCESS',
                   'Prepare step completed successfully.',
                   do_newline=True)
  else:
    jobset.message('FAILED', 'Failed to prepare remote hosts.',
                   do_newline=True)
    sys.exit(1)


def build_on_remote_hosts(hosts, languages=scenario_config.LANGUAGES.keys(),
                          build_local=False):
  """Builds performance worker on remote hosts (and maybe also locally)."""
  build_timeout = 15 * 60
  build_jobs = []
  for host in hosts:
    user_at_host = '%s@%s' % (_REMOTE_HOST_USERNAME, host)
    build_jobs.append(
        jobset.JobSpec(
            cmdline=['tools/run_tests/performance/remote_host_build.sh'] + languages,
            shortname='remote_host_build.%s' % host,
            environ={'USER_AT_HOST': user_at_host, 'CONFIG': 'lto'},
            timeout_seconds=build_timeout))
  if build_local:
    # Build locally as well
    build_jobs.append(
        jobset.JobSpec(
            cmdline=['tools/run_tests/performance/build_performance.sh'] + languages,
            shortname='local_build',
            environ={'CONFIG': 'lto'},
            timeout_seconds=build_timeout))
  jobset.message('START', 'Building.', do_newline=True)
  num_failures, _ = jobset.run(
      build_jobs, newline_on_success=True, maxjobs=10)
  if num_failures == 0:
    jobset.message('SUCCESS',
                   'Built successfully.',
                   do_newline=True)
  else:
    jobset.message('FAILED', 'Build failed.',
                   do_newline=True)
    sys.exit(1)
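

# For orientation, the overall flow driven by the entry-point code at the
# bottom of this file is roughly:
#   archive_repo() -> prepare_remote_hosts() -> build_on_remote_hosts()
#   -> create_qpsworkers() -> create_scenarios() -> per-scenario driver loop.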
def create_qpsworkers(languages, worker_hosts, perf_cmd=None):
  """Creates QPS workers (but does not start them)."""
  if not worker_hosts:
    # run two workers locally (for each language)
    workers = [(None, 10000), (None, 10010)]
  elif len(worker_hosts) == 1:
    # run two workers on the remote host (for each language)
    workers = [(worker_hosts[0], 10000), (worker_hosts[0], 10010)]
  else:
    # run one worker per remote host (for each language)
    workers = [(worker_host, 10000) for worker_host in worker_hosts]

  return [create_qpsworker_job(language,
                               shortname='qps_worker_%s_%s' % (language,
                                                               worker_idx),
                               port=worker[1] + language.worker_port_offset(),
                               remote_host=worker[0],
                               perf_cmd=perf_cmd)
          for language in languages
          for worker_idx, worker in enumerate(workers)]


def perf_report_processor_job(worker_host, perf_base_name, output_filename):
  # note: relies on the global 'args' parsed by argparse below; this function
  # is only called after argument parsing has happened.
  print('Creating perf report collection job for %s' % worker_host)
  cmd = ''
  if worker_host != 'localhost':
    user_at_host = '%s@%s' % (_REMOTE_HOST_USERNAME, worker_host)
    cmd = ('USER_AT_HOST=%s OUTPUT_FILENAME=%s OUTPUT_DIR=%s PERF_BASE_NAME=%s '
           'tools/run_tests/performance/process_remote_perf_flamegraphs.sh'
           % (user_at_host, output_filename, args.flame_graph_reports,
              perf_base_name))
  else:
    cmd = ('OUTPUT_FILENAME=%s OUTPUT_DIR=%s PERF_BASE_NAME=%s '
           'tools/run_tests/performance/process_local_perf_flamegraphs.sh'
           % (output_filename, args.flame_graph_reports, perf_base_name))

  return jobset.JobSpec(cmdline=cmd,
                        timeout_seconds=3 * 60,
                        shell=True,
                        verbose_success=True,
                        shortname='process perf report')


Scenario = collections.namedtuple('Scenario', 'jobspec workers name')


def create_scenarios(languages, workers_by_lang, remote_host=None, regex='.*',
                     category='all', bq_result_table=None,
                     netperf=False, netperf_hosts=[], server_cpu_load=0):
  """Create jobspecs for scenarios to run."""
  all_workers = [worker
                 for workers in workers_by_lang.values()
                 for worker in workers]
  scenarios = []
  _NO_WORKERS = []

  if netperf:
    if not netperf_hosts:
      netperf_server = 'localhost'
      netperf_client = None
    elif len(netperf_hosts) == 1:
      netperf_server = netperf_hosts[0]
      netperf_client = netperf_hosts[0]
    else:
      netperf_server = netperf_hosts[0]
      netperf_client = netperf_hosts[1]
    scenarios.append(Scenario(
        create_netperf_jobspec(server_host=netperf_server,
                               client_host=netperf_client,
                               bq_result_table=bq_result_table),
        _NO_WORKERS, 'netperf'))

  for language in languages:
    for scenario_json in language.scenarios():
      # use the 'regex' parameter rather than reaching for the global args
      if re.search(regex, scenario_json['name']):
        categories = scenario_json.get('CATEGORIES', ['scalable', 'smoketest'])
        if category in categories or category == 'all':
          workers = workers_by_lang[str(language)][:]
          # 'SERVER_LANGUAGE' is an indicator for this script to pick
          # a server in a different language.
          custom_server_lang = scenario_json.get('SERVER_LANGUAGE', None)
          custom_client_lang = scenario_json.get('CLIENT_LANGUAGE', None)
          scenario_json = scenario_config.remove_nonproto_fields(scenario_json)
          if custom_server_lang and custom_client_lang:
            raise Exception('Cannot set both custom CLIENT_LANGUAGE and '
                            'SERVER_LANGUAGE in the same scenario')
          if custom_server_lang:
            if not workers_by_lang.get(custom_server_lang, []):
              print('Warning: Skipping scenario %s as' % scenario_json['name'])
              print('SERVER_LANGUAGE is set to %s yet the language has '
                    'not been selected with -l' % custom_server_lang)
              continue
            for idx in range(0, scenario_json['num_servers']):
              # replace first X workers by workers of a different language
              workers[idx] = workers_by_lang[custom_server_lang][idx]
          if custom_client_lang:
            if not workers_by_lang.get(custom_client_lang, []):
              print('Warning: Skipping scenario %s as' % scenario_json['name'])
              print('CLIENT_LANGUAGE is set to %s yet the language has '
                    'not been selected with -l' % custom_client_lang)
              continue
            for idx in range(scenario_json['num_servers'], len(workers)):
              # replace all client workers by workers of a different language,
              # leave num_server workers as they are server workers.
              workers[idx] = workers_by_lang[custom_client_lang][idx]
          scenario = Scenario(
              create_scenario_jobspec(scenario_json,
                                      [w.host_and_port for w in workers],
                                      remote_host=remote_host,
                                      bq_result_table=bq_result_table,
                                      server_cpu_load=server_cpu_load),
              workers,
              scenario_json['name'])
          scenarios.append(scenario)

  return scenarios


def finish_qps_workers(jobs):
  """Waits for given jobs to finish and eventually kills them."""
  retries = 0
  num_killed = 0
  while any(job.is_running() for job in jobs):
    # iterate over the 'jobs' parameter, not the global qpsworker_jobs
    for job in jobs:
      if job.is_running():
        print('QPS worker "%s" is still running.' % job.host_and_port)
    if retries > 10:
      print('Killing all QPS workers.')
      for job in jobs:
        job.kill()
        num_killed += 1
    retries += 1
    time.sleep(3)
  print('All QPS workers finished.')
  return num_killed


profile_output_files = []


# Collect perf text reports and flamegraphs if perf_cmd was used.
# Note the base names of perf text reports are used when creating and
# processing perf data. The scenario name uniquifies the output name in the
# final perf reports directory.
# Also, the perf profiles need to be fetched and processed after each scenario
# in order to avoid clobbering the output files.
def run_collect_perf_profile_jobs(hosts_and_base_names, scenario_name):
  perf_report_jobs = []
  global profile_output_files
  for host_and_port in hosts_and_base_names:
    perf_base_name = hosts_and_base_names[host_and_port]
    output_filename = '%s-%s' % (scenario_name, perf_base_name)
    # from the base filename, create .svg output filename
    host = host_and_port.split(':')[0]
    profile_output_files.append('%s.svg' % output_filename)
    perf_report_jobs.append(
        perf_report_processor_job(host, perf_base_name, output_filename))

  jobset.message('START', 'Collecting perf reports from qps workers',
                 do_newline=True)
  failures, _ = jobset.run(perf_report_jobs, newline_on_success=True,
                           maxjobs=1)
  jobset.message('END', 'Collecting perf reports from qps workers',
                 do_newline=True)
  return failures
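

# Illustrative file naming (hypothetical worker shortname 'qps_worker_c++_0'
# at localhost:10000): its perf_file_base_name is
# 'localhost:10000-qps_worker_c++_0', so a scenario named 'foo' produces the
# flame graph 'foo-localhost:10000-qps_worker_c++_0.svg' in the directory
# given by --flame_graph_reports.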


argp = argparse.ArgumentParser(description='Run performance tests.')
argp.add_argument('-l', '--language',
                  choices=['all'] + sorted(scenario_config.LANGUAGES.keys()),
                  nargs='+',
                  required=True,
                  help='Languages to benchmark.')
argp.add_argument('--remote_driver_host',
                  default=None,
                  help='Run QPS driver on given host. By default, QPS driver is run locally.')
argp.add_argument('--remote_worker_host',
                  nargs='+',
                  default=[],
                  help='Worker hosts where to start QPS workers.')
argp.add_argument('--dry_run',
                  default=False,
                  action='store_const',
                  const=True,
                  help='Just list scenarios to be run, but don\'t run them.')
argp.add_argument('-r', '--regex', default='.*', type=str,
                  help='Regex to select scenarios to run.')
argp.add_argument('--bq_result_table', default=None, type=str,
                  help='Bigquery "dataset.table" to upload results to.')
argp.add_argument('--category',
                  choices=['smoketest', 'all', 'scalable', 'sweep'],
                  default='all',
                  help='Select a category of tests to run.')
argp.add_argument('--netperf',
                  default=False,
                  action='store_const',
                  const=True,
                  help='Run netperf benchmark as one of the scenarios.')
argp.add_argument('--server_cpu_load',
                  default=0, type=int,
                  help='Select a targeted server cpu load to run. 0 means ignore this flag.')
argp.add_argument('-x', '--xml_report', default='report.xml', type=str,
                  help='Name of XML report file to generate.')
argp.add_argument('--perf_args',
                  help=('Example usage: "--perf_args=record -F 99 -g". '
                        'Wrap QPS workers in a perf command '
                        'with the arguments to perf specified here. '
                        '".svg" flame graph profiles will be '
                        'created for each QPS worker on each scenario. '
                        'Files will be output to the "<repo_root>/<args.flame_graph_reports>" '
                        'directory. Output files from running the worker '
                        'under perf are saved in the repo root where it is run. '
                        'Note that the perf "-g" flag is necessary for '
                        'flame graph generation to work (assuming the binary '
                        'being profiled uses frame pointers; otherwise check out '
                        'the "--call-graph dwarf" option, which uses libunwind). '
                        'Also note that the entire "--perf_args=<arg(s)>" must '
                        'be wrapped in quotes as in the example usage. '
                        'If "--perf_args" is unspecified, "perf" will '
                        'not be used at all. '
                        'See http://www.brendangregg.com/perf.html '
                        'for more general perf examples.'))
argp.add_argument('--skip_generate_flamegraphs',
                  default=False,
                  action='store_const',
                  const=True,
                  help=('Turn flame graph generation off. '
                        'May be useful if "perf_args" arguments do not make sense for '
                        'generating flamegraphs (e.g., "--perf_args=stat ...").'))
argp.add_argument('-f', '--flame_graph_reports', default='perf_reports', type=str,
                  help='Name of directory to output flame graph profiles to, if any are created.')

args = argp.parse_args()

languages = set(scenario_config.LANGUAGES[l]
                for l in itertools.chain.from_iterable(
                    scenario_config.LANGUAGES.iterkeys() if x == 'all' else [x]
                    for x in args.language))

# Put together set of remote hosts where to run and build
remote_hosts = set()
if args.remote_worker_host:
  for host in args.remote_worker_host:
    remote_hosts.add(host)
if args.remote_driver_host:
  remote_hosts.add(args.remote_driver_host)

if not args.dry_run:
  if remote_hosts:
    archive_repo(languages=[str(l) for l in languages])
    prepare_remote_hosts(remote_hosts, prepare_local=True)
  else:
    prepare_remote_hosts([], prepare_local=True)

build_local = False
if not args.remote_driver_host:
  build_local = True
if not args.dry_run:
  build_on_remote_hosts(remote_hosts, languages=[str(l) for l in languages],
                        build_local=build_local)

perf_cmd = None
if args.perf_args:
  print('Running workers under perf profiler')
  # Expect /usr/bin/perf to be installed here, as is usual
  perf_cmd = ['/usr/bin/perf']
  perf_cmd.extend(re.split(r'\s+', args.perf_args))

qpsworker_jobs = create_qpsworkers(languages, args.remote_worker_host,
                                   perf_cmd=perf_cmd)

# get list of worker addresses for each language.
workers_by_lang = dict([(str(language), []) for language in languages])
for job in qpsworker_jobs:
  workers_by_lang[str(job.language)].append(job)

scenarios = create_scenarios(languages,
                             workers_by_lang=workers_by_lang,
                             remote_host=args.remote_driver_host,
                             regex=args.regex,
                             category=args.category,
                             bq_result_table=args.bq_result_table,
                             netperf=args.netperf,
                             netperf_hosts=args.remote_worker_host,
                             server_cpu_load=args.server_cpu_load)

if not scenarios:
  raise Exception('No scenarios to run')
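
# Main driver loop: for each scenario, start its workers, run the driver
# jobspec (plus a --quit jobspec to shut the workers down), merge the results,
# and, when profiling with perf, collect each scenario's reports before the
# next scenario clobbers the perf output files.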
total_scenario_failures = 0
qps_workers_killed = 0
merged_resultset = {}
perf_report_failures = 0

for scenario in scenarios:
  if args.dry_run:
    print(scenario.name)
  else:
    scenario_failures = 0
    try:
      for worker in scenario.workers:
        worker.start()
      jobs = [scenario.jobspec]
      if scenario.workers:
        jobs.append(create_quit_jobspec(scenario.workers,
                                        remote_host=args.remote_driver_host))
      scenario_failures, resultset = jobset.run(jobs, newline_on_success=True,
                                                maxjobs=1)
      total_scenario_failures += scenario_failures
      merged_resultset = dict(itertools.chain(merged_resultset.iteritems(),
                                              resultset.iteritems()))
    finally:
      # Consider qps workers that need to be killed as failures
      qps_workers_killed += finish_qps_workers(scenario.workers)

    if perf_cmd and scenario_failures == 0 and not args.skip_generate_flamegraphs:
      workers_and_base_names = {}
      for worker in scenario.workers:
        if not worker.perf_file_base_name:
          raise Exception('using perf but perf report filename is unspecified')
        workers_and_base_names[worker.host_and_port] = worker.perf_file_base_name
      perf_report_failures += run_collect_perf_profile_jobs(
          workers_and_base_names, scenario.name)

# Still write the index.html even if some scenarios failed.
# 'profile_output_files' will only have names for scenarios that passed.
if perf_cmd and not args.skip_generate_flamegraphs:
  # write the index file to the output dir, with all profiles from all
  # scenarios/workers
  report_utils.render_perf_profiling_results(
      '%s/index.html' % args.flame_graph_reports, profile_output_files)

report_utils.render_junit_xml_report(merged_resultset, args.xml_report,
                                     suite_name='benchmarks')

if total_scenario_failures > 0 or qps_workers_killed > 0:
  print('%s scenarios failed and %s qps worker jobs killed' %
        (total_scenario_failures, qps_workers_killed))
  sys.exit(1)

if perf_report_failures > 0:
  print('%s perf profile collection jobs failed' % perf_report_failures)
  sys.exit(1)