|  | @@ -31,6 +31,7 @@
 | 
	
		
			
				|  |  |  """Run performance tests locally or remotely."""
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  import argparse
 | 
	
		
			
				|  |  | +import itertools
 | 
	
		
			
				|  |  |  import jobset
 | 
	
		
			
				|  |  |  import multiprocessing
 | 
	
		
			
				|  |  |  import os
 | 
	
	
		
			
				|  | @@ -53,6 +54,12 @@ class CXXLanguage:
 | 
	
		
			
				|  |  |    def __init__(self):
 | 
	
		
			
				|  |  |      self.safename = 'cxx'
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +  def worker_cmdline(self):
 | 
	
		
			
				|  |  | +    return ['bins/opt/qps_worker']
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  def worker_port_offset(self):
 | 
	
		
			
				|  |  | +    return 0
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    def scenarios(self):
 | 
	
		
			
				|  |  |      # TODO(jtattermusch): add more scenarios
 | 
	
		
			
				|  |  |      return {
 | 
	
	
		
			
				|  | @@ -96,6 +103,32 @@ class CSharpLanguage:
 | 
	
		
			
				|  |  |    def __init__(self):
 | 
	
		
			
				|  |  |      self.safename = str(self)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +  def worker_cmdline(self):
 | 
	
		
			
				|  |  | +    return ['tools/run_tests/performance/run_worker_csharp.sh']
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  def worker_port_offset(self):
 | 
	
		
			
				|  |  | +    return 100
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  def scenarios(self):
 | 
	
		
			
				|  |  | +    # TODO(jtattermusch): add more scenarios
 | 
	
		
			
				|  |  | +    return {
 | 
	
		
			
				|  |  | +            # Scenario 1: generic async streaming ping-pong (contentionless latency)
 | 
	
		
			
				|  |  | +            'csharp_async_generic_streaming_ping_pong': [
 | 
	
		
			
				|  |  | +                '--rpc_type=STREAMING',
 | 
	
		
			
				|  |  | +                '--client_type=ASYNC_CLIENT',
 | 
	
		
			
				|  |  | +                '--server_type=ASYNC_GENERIC_SERVER',
 | 
	
		
			
				|  |  | +                '--outstanding_rpcs_per_channel=1',
 | 
	
		
			
				|  |  | +                '--client_channels=1',
 | 
	
		
			
				|  |  | +                '--bbuf_req_size=0',
 | 
	
		
			
				|  |  | +                '--bbuf_resp_size=0',
 | 
	
		
			
				|  |  | +                '--async_client_threads=1',
 | 
	
		
			
				|  |  | +                '--async_server_threads=1',
 | 
	
		
			
				|  |  | +                '--secure_test=true',
 | 
	
		
			
				|  |  | +                '--num_servers=1',
 | 
	
		
			
				|  |  | +                '--num_clients=1',
 | 
	
		
			
				|  |  | +                '--server_core_limit=0',
 | 
	
		
			
				|  |  | +                '--client_core_limit=0']}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    def __str__(self):
 | 
	
		
			
				|  |  |      return 'csharp'
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -106,6 +139,29 @@ class NodeLanguage:
 | 
	
		
			
				|  |  |      pass
 | 
	
		
			
				|  |  |      self.safename = str(self)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +  def worker_cmdline(self):
 | 
	
		
			
				|  |  | +    return ['tools/run_tests/performance/run_worker_node.sh']
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  def worker_port_offset(self):
 | 
	
		
			
				|  |  | +    return 200
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  def scenarios(self):
 | 
	
		
			
				|  |  | +    # TODO(jtattermusch): add more scenarios
 | 
	
		
			
				|  |  | +    return {
 | 
	
		
			
				|  |  | +             'node_sync_unary_ping_pong_protobuf': [
 | 
	
		
			
				|  |  | +                '--rpc_type=UNARY',
 | 
	
		
			
				|  |  | +                '--client_type=ASYNC_CLIENT',
 | 
	
		
			
				|  |  | +                '--server_type=ASYNC_SERVER',
 | 
	
		
			
				|  |  | +                '--outstanding_rpcs_per_channel=1',
 | 
	
		
			
				|  |  | +                '--client_channels=1',
 | 
	
		
			
				|  |  | +                '--simple_req_size=0',
 | 
	
		
			
				|  |  | +                '--simple_resp_size=0',
 | 
	
		
			
				|  |  | +                '--secure_test=false',
 | 
	
		
			
				|  |  | +                '--num_servers=1',
 | 
	
		
			
				|  |  | +                '--num_clients=1',
 | 
	
		
			
				|  |  | +                '--server_core_limit=0',
 | 
	
		
			
				|  |  | +                '--client_core_limit=0']}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    def __str__(self):
 | 
	
		
			
				|  |  |      return 'node'
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -120,8 +176,9 @@ _LANGUAGES = {
 | 
	
		
			
				|  |  |  class QpsWorkerJob:
 | 
	
		
			
				|  |  |    """Encapsulates a qps worker server job."""
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  def __init__(self, spec, host_and_port):
 | 
	
		
			
				|  |  | +  def __init__(self, spec, language, host_and_port):
 | 
	
		
			
				|  |  |      self._spec = spec
 | 
	
		
			
				|  |  | +    self.language = language
 | 
	
		
			
				|  |  |      self.host_and_port = host_and_port
 | 
	
		
			
				|  |  |      self._job = jobset.Job(spec, bin_hash=None, newline_on_success=True, travis=True, add_env={})
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -133,22 +190,24 @@ class QpsWorkerJob:
 | 
	
		
			
				|  |  |      return self._job.kill()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -def create_qpsworker_job(language, port=10000, remote_host=None):
 | 
	
		
			
				|  |  | +def create_qpsworker_job(language, shortname=None,
 | 
	
		
			
				|  |  | +                         port=10000, remote_host=None):
 | 
	
		
			
				|  |  |    # TODO: support more languages
 | 
	
		
			
				|  |  | -  cmd = 'bins/opt/qps_worker --driver_port=%s' % port
 | 
	
		
			
				|  |  | +  cmdline = language.worker_cmdline() + ['--driver_port=%s' % port]
 | 
	
		
			
				|  |  |    if remote_host:
 | 
	
		
			
				|  |  |      user_at_host = '%s@%s' % (_REMOTE_HOST_USERNAME, remote_host)
 | 
	
		
			
				|  |  | -    cmd = 'ssh %s "cd ~/performance_workspace/grpc/ && %s"' % (user_at_host, cmd)
 | 
	
		
			
				|  |  | +    cmdline = ['ssh',
 | 
	
		
			
				|  |  | +               str(user_at_host),
 | 
	
		
			
				|  |  | +               'cd ~/performance_workspace/grpc/ && %s' % ' '.join(cmdline)]
 | 
	
		
			
				|  |  |      host_and_port='%s:%s' % (remote_host, port)
 | 
	
		
			
				|  |  |    else:
 | 
	
		
			
				|  |  |      host_and_port='localhost:%s' % port
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    jobspec = jobset.JobSpec(
 | 
	
		
			
				|  |  | -      cmdline=[cmd],
 | 
	
		
			
				|  |  | -      shortname='qps_worker',
 | 
	
		
			
				|  |  | -      timeout_seconds=15*60,
 | 
	
		
			
				|  |  | -      shell=True)
 | 
	
		
			
				|  |  | -  return QpsWorkerJob(jobspec, host_and_port)
 | 
	
		
			
				|  |  | +      cmdline=cmdline,
 | 
	
		
			
				|  |  | +      shortname=shortname,
 | 
	
		
			
				|  |  | +      timeout_seconds=15*60)
 | 
	
		
			
				|  |  | +  return QpsWorkerJob(jobspec, language, host_and_port)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  def create_scenario_jobspec(scenario_name, driver_args, workers, remote_host=None):
 | 
	
	
		
			
				|  | @@ -199,7 +258,7 @@ def prepare_remote_hosts(hosts):
 | 
	
		
			
				|  |  |              cmdline=['tools/run_tests/performance/remote_host_prepare.sh'],
 | 
	
		
			
				|  |  |              shortname='remote_host_prepare.%s' % host,
 | 
	
		
			
				|  |  |              environ = {'USER_AT_HOST': user_at_host},
 | 
	
		
			
				|  |  | -            timeout_seconds=3*60))
 | 
	
		
			
				|  |  | +            timeout_seconds=5*60))
 | 
	
		
			
				|  |  |    jobset.message('START', 'Preparing remote hosts.', do_newline=True)
 | 
	
		
			
				|  |  |    num_failures, _ = jobset.run(
 | 
	
		
			
				|  |  |        prepare_jobs, newline_on_success=True, maxjobs=10)
 | 
	
	
		
			
				|  | @@ -214,7 +273,7 @@ def prepare_remote_hosts(hosts):
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  def build_on_remote_hosts(hosts, build_local=False):
 | 
	
		
			
				|  |  | -  """Builds performance worker on remote hosts."""
 | 
	
		
			
				|  |  | +  """Builds performance worker on remote hosts (and maybe also locally)."""
 | 
	
		
			
				|  |  |    build_timeout = 15*60
 | 
	
		
			
				|  |  |    build_jobs = []
 | 
	
		
			
				|  |  |    for host in hosts:
 | 
	
	
		
			
				|  | @@ -233,52 +292,58 @@ def build_on_remote_hosts(hosts, build_local=False):
 | 
	
		
			
				|  |  |              shortname='local_build',
 | 
	
		
			
				|  |  |              environ = {'CONFIG': 'opt'},
 | 
	
		
			
				|  |  |              timeout_seconds=build_timeout))
 | 
	
		
			
				|  |  | -  jobset.message('START', 'Building on remote hosts.', do_newline=True)
 | 
	
		
			
				|  |  | +  jobset.message('START', 'Building.', do_newline=True)
 | 
	
		
			
				|  |  |    num_failures, _ = jobset.run(
 | 
	
		
			
				|  |  |        build_jobs, newline_on_success=True, maxjobs=10)
 | 
	
		
			
				|  |  |    if num_failures == 0:
 | 
	
		
			
				|  |  |      jobset.message('SUCCESS',
 | 
	
		
			
				|  |  | -                   'Build on remote hosts was successful.',
 | 
	
		
			
				|  |  | +                   'Built successfully.',
 | 
	
		
			
				|  |  |                     do_newline=True)
 | 
	
		
			
				|  |  |    else:
 | 
	
		
			
				|  |  | -    jobset.message('FAILED', 'Failed to build on remote hosts.',
 | 
	
		
			
				|  |  | +    jobset.message('FAILED', 'Build failed.',
 | 
	
		
			
				|  |  |                     do_newline=True)
 | 
	
		
			
				|  |  |      sys.exit(1)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -def start_qpsworkers(worker_hosts):
 | 
	
		
			
				|  |  | +def start_qpsworkers(languages, worker_hosts):
 | 
	
		
			
				|  |  |    """Starts QPS workers as background jobs."""
 | 
	
		
			
				|  |  |    if not worker_hosts:
 | 
	
		
			
				|  |  | -    # run two workers locally
 | 
	
		
			
				|  |  | +    # run two workers locally (for each language)
 | 
	
		
			
				|  |  |      workers=[(None, 10000), (None, 10010)]
 | 
	
		
			
				|  |  |    elif len(worker_hosts) == 1:
 | 
	
		
			
				|  |  | -    # run two workers on the remote host
 | 
	
		
			
				|  |  | +    # run two workers on the remote host (for each language)
 | 
	
		
			
				|  |  |      workers=[(worker_hosts[0], 10000), (worker_hosts[0], 10010)]
 | 
	
		
			
				|  |  |    else:
 | 
	
		
			
				|  |  | -    # run one worker per each remote host
 | 
	
		
			
				|  |  | +    # run one worker per each remote host (for each language)
 | 
	
		
			
				|  |  |      workers=[(worker_host, 10000) for worker_host in worker_hosts]
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  return [create_qpsworker_job(CXXLanguage(),
 | 
	
		
			
				|  |  | -                               port=worker[1],
 | 
	
		
			
				|  |  | +  return [create_qpsworker_job(language,
 | 
	
		
			
				|  |  | +                               shortname= 'qps_worker_%s_%s' % (language,
 | 
	
		
			
				|  |  | +                                                                worker_idx),
 | 
	
		
			
				|  |  | +                               port=worker[1] + language.worker_port_offset(),
 | 
	
		
			
				|  |  |                                 remote_host=worker[0])
 | 
	
		
			
				|  |  | -          for worker in workers]
 | 
	
		
			
				|  |  | +          for language in languages
 | 
	
		
			
				|  |  | +          for worker_idx, worker in enumerate(workers)]
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -def create_scenarios(languages, workers, remote_host=None):
 | 
	
		
			
				|  |  | +def create_scenarios(languages, workers_by_lang, remote_host=None):
 | 
	
		
			
				|  |  |    """Create jobspecs for scenarios to run."""
 | 
	
		
			
				|  |  |    scenarios = []
 | 
	
		
			
				|  |  |    for language in languages:
 | 
	
		
			
				|  |  |      for scenario_name, driver_args in language.scenarios().iteritems():
 | 
	
		
			
				|  |  |        scenario = create_scenario_jobspec(scenario_name,
 | 
	
		
			
				|  |  |                                           driver_args,
 | 
	
		
			
				|  |  | -                                         workers,
 | 
	
		
			
				|  |  | +                                         workers_by_lang[str(language)],
 | 
	
		
			
				|  |  |                                           remote_host=remote_host)
 | 
	
		
			
				|  |  |        scenarios.append(scenario)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    # the very last scenario requests shutting down the workers.
 | 
	
		
			
				|  |  | +  all_workers = [worker
 | 
	
		
			
				|  |  | +                 for workers in workers_by_lang.values()
 | 
	
		
			
				|  |  | +                 for worker in workers]
 | 
	
		
			
				|  |  |    scenarios.append(create_scenario_jobspec('quit_workers',
 | 
	
		
			
				|  |  |                                             ['--quit=true'],
 | 
	
		
			
				|  |  | -                                           workers,
 | 
	
		
			
				|  |  | +                                           all_workers,
 | 
	
		
			
				|  |  |                                             remote_host=remote_host))
 | 
	
		
			
				|  |  |    return scenarios
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -300,6 +365,11 @@ def finish_qps_workers(jobs):
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  argp = argparse.ArgumentParser(description='Run performance tests.')
 | 
	
		
			
				|  |  | +argp.add_argument('-l', '--language',
 | 
	
		
			
				|  |  | +                  choices=['all'] + sorted(_LANGUAGES.keys()),
 | 
	
		
			
				|  |  | +                  nargs='+',
 | 
	
		
			
				|  |  | +                  default=['all'],
 | 
	
		
			
				|  |  | +                  help='Languages to benchmark.')
 | 
	
		
			
				|  |  |  argp.add_argument('--remote_driver_host',
 | 
	
		
			
				|  |  |                    default=None,
 | 
	
		
			
				|  |  |                    help='Run QPS driver on given host. By default, QPS driver is run locally.')
 | 
	
	
		
			
				|  | @@ -310,6 +380,11 @@ argp.add_argument('--remote_worker_host',
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  args = argp.parse_args()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +languages = set(_LANGUAGES[l]
 | 
	
		
			
				|  |  | +                for l in itertools.chain.from_iterable(
 | 
	
		
			
				|  |  | +                      _LANGUAGES.iterkeys() if x == 'all' else [x]
 | 
	
		
			
				|  |  | +                      for x in args.language))
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  # Put together set of remote hosts where to run and build
 | 
	
		
			
				|  |  |  remote_hosts = set()
 | 
	
		
			
				|  |  |  if args.remote_worker_host:
 | 
	
	
		
			
				|  | @@ -327,13 +402,16 @@ if not args.remote_driver_host:
 | 
	
		
			
				|  |  |    build_local = True
 | 
	
		
			
				|  |  |  build_on_remote_hosts(remote_hosts, build_local=build_local)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -qpsworker_jobs = start_qpsworkers(args.remote_worker_host)
 | 
	
		
			
				|  |  | +qpsworker_jobs = start_qpsworkers(languages, args.remote_worker_host)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -worker_addresses = [job.host_and_port for job in qpsworker_jobs]
 | 
	
		
			
				|  |  | +# get list of worker addresses for each language.
 | 
	
		
			
				|  |  | +worker_addresses = dict([(str(language), []) for language in languages])
 | 
	
		
			
				|  |  | +for job in qpsworker_jobs:
 | 
	
		
			
				|  |  | +  worker_addresses[str(job.language)].append(job.host_and_port)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  try:
 | 
	
		
			
				|  |  | -  scenarios = create_scenarios(languages=[CXXLanguage()],
 | 
	
		
			
				|  |  | -                               workers=worker_addresses,
 | 
	
		
			
				|  |  | +  scenarios = create_scenarios(languages,
 | 
	
		
			
				|  |  | +                               workers_by_lang=worker_addresses,
 | 
	
		
			
				|  |  |                                 remote_host=args.remote_driver_host)
 | 
	
		
			
				|  |  |    if not scenarios:
 | 
	
		
			
				|  |  |      raise Exception('No scenarios to run')
 |