|  | @@ -33,11 +33,13 @@
 | 
	
		
			
				|  |  |  from __future__ import print_function
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  import argparse
 | 
	
		
			
				|  |  | +import collections
 | 
	
		
			
				|  |  |  import itertools
 | 
	
		
			
				|  |  |  import jobset
 | 
	
		
			
				|  |  |  import json
 | 
	
		
			
				|  |  |  import multiprocessing
 | 
	
		
			
				|  |  |  import os
 | 
	
		
			
				|  |  | +import performance.scenario_config as scenario_config
 | 
	
		
			
				|  |  |  import pipes
 | 
	
		
			
				|  |  |  import re
 | 
	
		
			
				|  |  |  import subprocess
 | 
	
	
		
			
				|  | @@ -46,7 +48,6 @@ import tempfile
 | 
	
		
			
				|  |  |  import time
 | 
	
		
			
				|  |  |  import traceback
 | 
	
		
			
				|  |  |  import uuid
 | 
	
		
			
				|  |  | -import performance.scenario_config as scenario_config
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  _ROOT = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), '../..'))
 | 
	
	
		
			
				|  | @@ -63,14 +64,19 @@ class QpsWorkerJob:
 | 
	
		
			
				|  |  |      self._spec = spec
 | 
	
		
			
				|  |  |      self.language = language
 | 
	
		
			
				|  |  |      self.host_and_port = host_and_port
 | 
	
		
			
				|  |  | -    self._job = jobset.Job(spec, newline_on_success=True, travis=True, add_env={})
 | 
	
		
			
				|  |  | +    self._job = None
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  def start(self):
 | 
	
		
			
				|  |  | +    self._job = jobset.Job(self._spec, newline_on_success=True, travis=True, add_env={})
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    def is_running(self):
 | 
	
		
			
				|  |  |      """Polls a job and returns True if given job is still running."""
 | 
	
		
			
				|  |  | -    return self._job.state() == jobset._RUNNING
 | 
	
		
			
				|  |  | +    return self._job and self._job.state() == jobset._RUNNING
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    def kill(self):
 | 
	
		
			
				|  |  | -    return self._job.kill()
 | 
	
		
			
				|  |  | +    if self._job:
 | 
	
		
			
				|  |  | +      self._job.kill()
 | 
	
		
			
				|  |  | +      self._job = None
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  def create_qpsworker_job(language, shortname=None,
 | 
	
	
		
			
				|  | @@ -119,7 +125,7 @@ def create_scenario_jobspec(scenario_json, workers, remote_host=None,
 | 
	
		
			
				|  |  |  def create_quit_jobspec(workers, remote_host=None):
 | 
	
		
			
				|  |  |    """Runs quit using QPS driver."""
 | 
	
		
			
				|  |  |    # setting QPS_WORKERS env variable here makes sure it works with SSH too.
 | 
	
		
			
				|  |  | -  cmd = 'QPS_WORKERS="%s" bins/opt/qps_json_driver --quit' % ','.join(workers)
 | 
	
		
			
				|  |  | +  cmd = 'QPS_WORKERS="%s" bins/opt/qps_json_driver --quit' % ','.join(w.host_and_port for w in workers)
 | 
	
		
			
				|  |  |    if remote_host:
 | 
	
		
			
				|  |  |      user_at_host = '%s@%s' % (_REMOTE_HOST_USERNAME, remote_host)
 | 
	
		
			
				|  |  |      cmd = 'ssh %s "cd ~/performance_workspace/grpc/ && "%s' % (user_at_host, pipes.quote(cmd))
 | 
	
	
		
			
				|  | @@ -253,8 +259,8 @@ def build_on_remote_hosts(hosts, languages=scenario_config.LANGUAGES.keys(), bui
 | 
	
		
			
				|  |  |      sys.exit(1)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -def start_qpsworkers(languages, worker_hosts):
 | 
	
		
			
				|  |  | -  """Starts QPS workers as background jobs."""
 | 
	
		
			
				|  |  | +def create_qpsworkers(languages, worker_hosts):
 | 
	
		
			
				|  |  | +  """Creates QPS workers (but does not start them)."""
 | 
	
		
			
				|  |  |    if not worker_hosts:
 | 
	
		
			
				|  |  |      # run two workers locally (for each language)
 | 
	
		
			
				|  |  |      workers=[(None, 10000), (None, 10010)]
 | 
	
	
		
			
				|  | @@ -274,6 +280,9 @@ def start_qpsworkers(languages, worker_hosts):
 | 
	
		
			
				|  |  |            for worker_idx, worker in enumerate(workers)]
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +Scenario = collections.namedtuple('Scenario', 'jobspec workers')
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  def create_scenarios(languages, workers_by_lang, remote_host=None, regex='.*',
 | 
	
		
			
				|  |  |                       category='all', bq_result_table=None,
 | 
	
		
			
				|  |  |                       netperf=False, netperf_hosts=[]):
 | 
	
	
		
			
				|  | @@ -282,6 +291,7 @@ def create_scenarios(languages, workers_by_lang, remote_host=None, regex='.*',
 | 
	
		
			
				|  |  |                   for workers in workers_by_lang.values()
 | 
	
		
			
				|  |  |                   for worker in workers]
 | 
	
		
			
				|  |  |    scenarios = []
 | 
	
		
			
				|  |  | +  _NO_WORKERS = []
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    if netperf:
 | 
	
		
			
				|  |  |      if not netperf_hosts:
 | 
	
	
		
			
				|  | @@ -293,16 +303,18 @@ def create_scenarios(languages, workers_by_lang, remote_host=None, regex='.*',
 | 
	
		
			
				|  |  |      else:
 | 
	
		
			
				|  |  |        netperf_server=netperf_hosts[0]
 | 
	
		
			
				|  |  |        netperf_client=netperf_hosts[1]
 | 
	
		
			
				|  |  | -    scenarios.append(create_netperf_jobspec(server_host=netperf_server,
 | 
	
		
			
				|  |  | -                                            client_host=netperf_client,
 | 
	
		
			
				|  |  | -                                            bq_result_table=bq_result_table))
 | 
	
		
			
				|  |  | +    scenarios.append(Scenario(
 | 
	
		
			
				|  |  | +        create_netperf_jobspec(server_host=netperf_server,
 | 
	
		
			
				|  |  | +                               client_host=netperf_client,
 | 
	
		
			
				|  |  | +                               bq_result_table=bq_result_table),
 | 
	
		
			
				|  |  | +        _NO_WORKERS))
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    for language in languages:
 | 
	
		
			
				|  |  |      for scenario_json in language.scenarios():
 | 
	
		
			
				|  |  |        if re.search(args.regex, scenario_json['name']):
 | 
	
		
			
				|  |  |          categories = scenario_json.get('CATEGORIES', [])
 | 
	
		
			
				|  |  |          if category in categories or (category == 'all' and categories != ['sweep']):
 | 
	
		
			
				|  |  | -          workers = workers_by_lang[str(language)]
 | 
	
		
			
				|  |  | +          workers = workers_by_lang[str(language)][:]
 | 
	
		
			
				|  |  |            # 'SERVER_LANGUAGE' is an indicator for this script to pick
 | 
	
		
			
				|  |  |            # a server in different language.
 | 
	
		
			
				|  |  |            custom_server_lang = scenario_json.get('SERVER_LANGUAGE', None)
 | 
	
	
		
			
				|  | @@ -330,14 +342,14 @@ def create_scenarios(languages, workers_by_lang, remote_host=None, regex='.*',
 | 
	
		
			
				|  |  |                # replace all client workers by workers of a different language,
 | 
	
		
			
				|  |  |                # leave num_server workers as they are server workers.
 | 
	
		
			
				|  |  |                workers[idx] = workers_by_lang[custom_client_lang][idx]
 | 
	
		
			
				|  |  | -          scenario = create_scenario_jobspec(scenario_json,
 | 
	
		
			
				|  |  | -                                             workers,
 | 
	
		
			
				|  |  | -                                             remote_host=remote_host,
 | 
	
		
			
				|  |  | -                                             bq_result_table=bq_result_table)
 | 
	
		
			
				|  |  | +          scenario = Scenario(
 | 
	
		
			
				|  |  | +              create_scenario_jobspec(scenario_json,
 | 
	
		
			
				|  |  | +                                      [w.host_and_port for w in workers],
 | 
	
		
			
				|  |  | +                                      remote_host=remote_host,
 | 
	
		
			
				|  |  | +                                      bq_result_table=bq_result_table),
 | 
	
		
			
				|  |  | +              workers)
 | 
	
		
			
				|  |  |            scenarios.append(scenario)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  # the very last scenario requests shutting down the workers.
 | 
	
		
			
				|  |  | -  scenarios.append(create_quit_jobspec(all_workers, remote_host=remote_host))
 | 
	
		
			
				|  |  |    return scenarios
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -411,42 +423,31 @@ if not args.remote_driver_host:
 | 
	
		
			
				|  |  |    build_local = True
 | 
	
		
			
				|  |  |  build_on_remote_hosts(remote_hosts, languages=[str(l) for l in languages], build_local=build_local)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -qpsworker_jobs = start_qpsworkers(languages, args.remote_worker_host)
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -# TODO(jtattermusch): see https://github.com/grpc/grpc/issues/6174
 | 
	
		
			
				|  |  | -time.sleep(5)
 | 
	
		
			
				|  |  | +qpsworker_jobs = create_qpsworkers(languages, args.remote_worker_host)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  # get list of worker addresses for each language.
 | 
	
		
			
				|  |  | -worker_addresses = dict([(str(language), []) for language in languages])
 | 
	
		
			
				|  |  | +workers_by_lang = dict([(str(language), []) for language in languages])
 | 
	
		
			
				|  |  |  for job in qpsworker_jobs:
 | 
	
		
			
				|  |  | -  worker_addresses[str(job.language)].append(job.host_and_port)
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -try:
 | 
	
		
			
				|  |  | -  scenarios = create_scenarios(languages,
 | 
	
		
			
				|  |  | -                               workers_by_lang=worker_addresses,
 | 
	
		
			
				|  |  | -                               remote_host=args.remote_driver_host,
 | 
	
		
			
				|  |  | -                               regex=args.regex,
 | 
	
		
			
				|  |  | -                               category=args.category,
 | 
	
		
			
				|  |  | -                               bq_result_table=args.bq_result_table,
 | 
	
		
			
				|  |  | -                               netperf=args.netperf,
 | 
	
		
			
				|  |  | -                               netperf_hosts=args.remote_worker_host)
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  if not scenarios:
 | 
	
		
			
				|  |  | -    raise Exception('No scenarios to run')
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  jobset.message('START', 'Running scenarios.', do_newline=True)
 | 
	
		
			
				|  |  | -  num_failures, _ = jobset.run(
 | 
	
		
			
				|  |  | -      scenarios, newline_on_success=True, maxjobs=1)
 | 
	
		
			
				|  |  | -  if num_failures == 0:
 | 
	
		
			
				|  |  | -    jobset.message('SUCCESS',
 | 
	
		
			
				|  |  | -                   'All scenarios finished successfully.',
 | 
	
		
			
				|  |  | -                   do_newline=True)
 | 
	
		
			
				|  |  | -  else:
 | 
	
		
			
				|  |  | -    jobset.message('FAILED', 'Some of the scenarios failed.',
 | 
	
		
			
				|  |  | -                   do_newline=True)
 | 
	
		
			
				|  |  | -    sys.exit(1)
 | 
	
		
			
				|  |  | -except:
 | 
	
		
			
				|  |  | -  traceback.print_exc()
 | 
	
		
			
				|  |  | -  raise
 | 
	
		
			
				|  |  | -finally:
 | 
	
		
			
				|  |  | -  finish_qps_workers(qpsworker_jobs)
 | 
	
		
			
				|  |  | +  workers_by_lang[str(job.language)].append(job)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +scenarios = create_scenarios(languages,
 | 
	
		
			
				|  |  | +                           workers_by_lang=workers_by_lang,
 | 
	
		
			
				|  |  | +                           remote_host=args.remote_driver_host,
 | 
	
		
			
				|  |  | +                           regex=args.regex,
 | 
	
		
			
				|  |  | +                           category=args.category,
 | 
	
		
			
				|  |  | +                           bq_result_table=args.bq_result_table,
 | 
	
		
			
				|  |  | +                           netperf=args.netperf,
 | 
	
		
			
				|  |  | +                           netperf_hosts=args.remote_worker_host)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +if not scenarios:
 | 
	
		
			
				|  |  | +  raise Exception('No scenarios to run')
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +for scenario in scenarios:
 | 
	
		
			
				|  |  | +  try:
 | 
	
		
			
				|  |  | +    for worker in scenario.workers:
 | 
	
		
			
				|  |  | +      worker.start()
 | 
	
		
			
				|  |  | +    jobset.run([scenario.jobspec,
 | 
	
		
			
				|  |  | +                create_quit_jobspec(scenario.workers, remote_host=args.remote_driver_host)],
 | 
	
		
			
				|  |  | +               newline_on_success=True, maxjobs=1)
 | 
	
		
			
				|  |  | +  finally:
 | 
	
		
			
				|  |  | +    finish_qps_workers(scenario.workers)
 |