| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574 | 
							- # Copyright 2015 gRPC authors.
 
- #
 
- # Licensed under the Apache License, Version 2.0 (the "License");
 
- # you may not use this file except in compliance with the License.
 
- # You may obtain a copy of the License at
 
- #
 
- #     http://www.apache.org/licenses/LICENSE-2.0
 
- #
 
- # Unless required by applicable law or agreed to in writing, software
 
- # distributed under the License is distributed on an "AS IS" BASIS,
 
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
- # See the License for the specific language governing permissions and
 
- # limitations under the License.
 
- """Run a group of subprocesses and then finish."""
 
- from __future__ import print_function
 
- import logging
 
- import multiprocessing
 
- import os
 
- import platform
 
- import re
 
- import signal
 
- import subprocess
 
- import sys
 
- import tempfile
 
- import time
 
- import errno
 
# Module-level switch for CPU cost measurement: when True, Job.start wraps
# command lines in `time -p` and the timing output is parsed on success
# (presumably toggled by callers of this module).
measure_cpu_costs = False

# Default for both the summed-cpu_cost budget and the running-job-count cap
# used by run(): 16 per CPU reported by multiprocessing.cpu_count().
_DEFAULT_MAX_JOBS = multiprocessing.cpu_count() * 16

# Maximum number of bytes of a job's output stored in its result; when the
# actual output is longer, only the last _MAX_RESULT_SIZE bytes are kept.
_MAX_RESULT_SIZE = 1024 * 64
 
# NOTE: If you change this, please make sure to test reviewing a
# GitHub PR with http://reviewable.io, which is known to add UTF-8
# characters to the PR description; those leak into the environment here
# and cause failures.
def strip_non_ascii_chars(s):
    """Return `s` with every character whose code point is 128 or above removed."""
    return ''.join([ch for ch in s if ord(ch) <= 127])


def sanitized_environment(env):
    """Return a new dict copy of `env` with non-ASCII characters stripped
    from every key and every value.

    If two keys become identical after stripping, the one iterated later wins.
    """
    return dict((strip_non_ascii_chars(name), strip_non_ascii_chars(val))
                for name, val in env.items())
 
def platform_string():
    """Classify the host OS as 'windows', 'mac', 'linux' or 'posix'.

    MSYS shells (platform.system() beginning with 'MSYS_NT') are treated as
    Windows; any system not recognized falls back to 'posix'.
    """
    system_name = platform.system()
    if system_name == 'Windows' or system_name.startswith('MSYS_NT'):
        return 'windows'
    return {'Darwin': 'mac', 'Linux': 'linux'}.get(system_name, 'posix')
 
# Install no-op handlers for SIGCHLD and SIGALRM so that signal.pause()
# (used by Jobset.reap while waiting) returns when a child finishes or the
# alarm fires. Futures and threading are deliberately not used, to avoid a
# dependency on subprocess32. Windows is skipped; reap() sleeps there instead.
if platform_string() != 'windows':

    def alarm_handler(unused_signum, unused_frame):
        """No-op SIGALRM handler; it exists only so signal.pause() wakes up."""

    signal.signal(signal.SIGCHLD, lambda unused_signum, unused_frame: None)
    signal.signal(signal.SIGALRM, alarm_handler)
 
# Opaque sentinels for a Job's lifecycle state; only ever compared to each other.
_SUCCESS, _FAILURE, _RUNNING, _KILLED = object(), object(), object(), object()

# Color name -> [ANSI foreground color code, ANSI attribute]; message() emits
# them as "\x1b[<attribute>;<color>m".
_COLORS = dict(
    red=[31, 0],
    green=[32, 0],
    yellow=[33, 0],
    lightgray=[37, 0],
    gray=[30, 1],
    purple=[35, 0],
    cyan=[36, 0],
)

# Move the cursor to the start of the line, then erase the whole line.
_BEGINNING_OF_LINE = '\x1b[0G'
_CLEAR_LINE = '\x1b[2K'

# Status tag -> key of _COLORS used when printing it to a terminal.
_TAG_COLOR = dict(
    FAILED='red',
    FLAKE='purple',
    TIMEOUT_FLAKE='purple',
    WARNING='yellow',
    TIMEOUT='red',
    PASSED='green',
    START='gray',
    WAITING='yellow',
    SUCCESS='green',
    IDLE='gray',
    SKIPPED='cyan',
)
 
# Log line layout used whenever message() falls back to the logging module
# (on Windows or when stdout is not a TTY).
_FORMAT = '%(asctime)-15s %(message)s'
logging.basicConfig(format=_FORMAT, level=logging.INFO)
 
def eintr_be_gone(fn):
    """Run fn until it doesn't stop because of EINTR.

    A call that fails with an IOError/OSError whose errno is EINTR (the
    system call was interrupted by a signal, such as the SIGCHLD/SIGALRM
    handlers this module installs) is simply retried. Any other exception
    propagates unchanged.

    Args:
      fn: zero-argument callable; it may be invoked more than once.

    Returns:
      Whatever the first uninterrupted call to fn() returns.
    """
    while True:
        try:
            return fn()
        except (IOError, OSError) as e:
            # Python 2 reports EINTR as IOError from file operations and as
            # OSError from os-level calls; on Python 3 both names are OSError.
            if e.errno != errno.EINTR:
                raise
 
def message(tag, msg, explanatory_text=None, do_newline=False):
    """Report a status line consisting of `tag` and `msg`.

    On Windows, or when stdout is not a TTY, the text goes through
    logging.info. Otherwise a colored status line is drawn in place,
    first moving to the start of the current terminal line and clearing it.

    A call repeating the previous call's tag and msg, without
    explanatory_text, prints nothing.

    Args:
      tag: status tag; on a TTY it must be a key of _TAG_COLOR.
      msg: the message text.
      explanatory_text: optional extra text (e.g. captured job output)
        emitted before the status line.
      do_newline: on a TTY, terminate the status line with a newline rather
        than leaving the cursor on it to be overwritten.
    """
    if message.old_tag == tag and message.old_msg == msg and not explanatory_text:
        return
    message.old_tag = tag
    message.old_msg = msg
    while True:
        try:
            if platform_string() == 'windows' or not sys.stdout.isatty():
                if explanatory_text:
                    logging.info(explanatory_text)
                logging.info('%s: %s', tag, msg)
            else:
                color_code, attribute = _COLORS[_TAG_COLOR[tag]]
                prefix = ('\n%s' % explanatory_text
                          if explanatory_text is not None else '')
                suffix = ('\n'
                          if do_newline or explanatory_text is not None else '')
                sys.stdout.write('%s%s%s\x1b[%d;%dm%s\x1b[0m: %s%s' %
                                 (_BEGINNING_OF_LINE, _CLEAR_LINE, prefix,
                                  attribute, color_code, tag, msg, suffix))
            sys.stdout.flush()
            return
        except (IOError, OSError) as e:
            # Retry output interrupted by a signal (EINTR); re-raise the rest.
            if e.errno != errno.EINTR:
                raise


# Tag and msg of the last call, used to suppress consecutive duplicates.
message.old_tag = ''
message.old_msg = ''
 
def which(filename):
    """Locate `filename` on the PATH, similar to the shell's `which`.

    A name that already contains a path separator is returned unchanged.
    Otherwise each PATH entry is tried in order, and the first existing
    os.path.join(entry, filename) is returned. Only existence is checked,
    not executability. When PATH is unset, os.defpath is searched instead.

    Raises:
      Exception: if `filename` is not found in any search directory.
    """
    separators = [os.sep] + ([os.altsep] if os.altsep else [])
    if any(sep in filename for sep in separators):
        return filename
    for directory in os.environ.get('PATH', os.defpath).split(os.pathsep):
        candidate = os.path.join(directory, filename)
        if os.path.exists(candidate):
            return candidate
    raise Exception('%s not found' % filename)
 
class JobSpec(object):
    """Specifies what to run for a job."""

    def __init__(self,
                 cmdline,
                 shortname=None,
                 environ=None,
                 cwd=None,
                 shell=False,
                 timeout_seconds=5 * 60,
                 flake_retries=0,
                 timeout_retries=0,
                 kill_handler=None,
                 cpu_cost=1.0,
                 verbose_success=False):
        """
        Arguments:
          cmdline: a list of arguments to pass as the command line
          shortname: display name of the job; defaults to cmdline[0]
          environ: a dictionary of environment variables to set in the child
            process (defaults to an empty dict)
          cwd: working directory for the child process (None inherits ours)
          shell: passed through as subprocess.Popen(shell=...)
          timeout_seconds: seconds after which the job times out; None
            disables the timeout
          flake_retries: how many times a job exiting non-zero is restarted
          timeout_retries: how many times a timed-out job is restarted
          kill_handler: a handler called with the job whenever job.kill() is
            invoked, and before a timed-out job is restarted
          cpu_cost: estimated number of CPU cores the job uses; Jobset sums
            these against its budget
          verbose_success: include the job's output in its PASSED message
        """
        if environ is None:
            environ = {}
        self.cmdline = cmdline
        self.environ = environ
        self.shortname = cmdline[0] if shortname is None else shortname
        self.cwd = cwd
        self.shell = shell
        self.timeout_seconds = timeout_seconds
        self.flake_retries = flake_retries
        self.timeout_retries = timeout_retries
        self.kill_handler = kill_handler
        self.cpu_cost = cpu_cost
        self.verbose_success = verbose_success

    def identity(self):
        """Return a string identifying the job by command line and environment."""
        return '%r %r' % (self.cmdline, self.environ)

    def __hash__(self):
        return hash(self.identity())

    def __eq__(self, other):
        # Equality must agree with __hash__: both are based on identity().
        if not isinstance(other, JobSpec):
            return NotImplemented
        return self.identity() == other.identity()

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__.
        equal = self.__eq__(other)
        if equal is NotImplemented:
            return equal
        return not equal

    def __cmp__(self, other):
        # Python 2 only (ignored by Python 3). It must return a negative,
        # zero or positive number; returning a bool would report equal specs
        # as "greater" and different specs as "equal".
        mine, theirs = self.identity(), other.identity()
        return (mine > theirs) - (mine < theirs)

    def __repr__(self):
        return 'JobSpec(shortname=%s, cmdline=%s)' % (self.shortname,
                                                      self.cmdline)

    def __str__(self):
        return '%s: %s %s' % (self.shortname, ' '.join(
            '%s=%s' % kv for kv in self.environ.items()),
                              ' '.join(self.cmdline))
 
class JobResult(object):
    """Outcome record for one job, filled in by Job as the job progresses.

    Attributes:
      state: 'UNKNOWN' until the job ends; then e.g. 'PASSED', 'FAILED',
        'TIMEOUT' or 'SKIPPED'.
      returncode: the process exit code; only set when the job ends FAILED,
        otherwise it stays -1.
      elapsed_time: wall-clock seconds taken by the most recent attempt.
      num_failures: count of failed or timed-out attempts, retried or not.
      retries: total number of restarts (flake plus timeout retries).
      message: tail of the job's captured output, once it has been read.
      cpu_estimated, cpu_measured: declared vs. measured CPU cost, when CPU
        measurement is enabled.
    """

    def __init__(self):
        self.state = 'UNKNOWN'
        self.returncode = -1
        self.elapsed_time = 0
        self.num_failures = self.retries = 0
        self.message = ''
        self.cpu_estimated = self.cpu_measured = 1
 
def read_from_start(f):
    """Rewind file object `f` to its beginning and return everything in it."""
    f.seek(0, os.SEEK_SET)
    contents = f.read()
    return contents
 
class Job(object):
    """Manages one job: launches its process, polls it, and retries it.

    The process's stdout and stderr are captured together in a temporary
    file. Each (re)start of the process gets a fresh file.
    """

    def __init__(self,
                 spec,
                 newline_on_success,
                 travis,
                 add_env,
                 quiet_success=False):
        """Create the job and start its process immediately.

        Args:
          spec: the JobSpec to run.
          newline_on_success: end the PASSED status line with a newline.
          travis: also end START/PASSED status lines with a newline.
          add_env: extra environment variables (copied), applied on top of
            os.environ and spec.environ.
          quiet_success: suppress the START and PASSED messages.
        """
        self._spec = spec
        self._newline_on_success = newline_on_success
        self._travis = travis
        self._add_env = add_env.copy()
        self._retries = 0
        self._timeout_retries = 0
        self._suppress_failure_message = False
        self._quiet_success = quiet_success
        # Output file of the current attempt; replaced by every start().
        self._tempfile = None
        # Whether the current attempt's command line was wrapped in `time -p`.
        self._measuring_cpu = False
        if not self._quiet_success:
            message('START', spec.shortname, do_newline=self._travis)
        self.result = JobResult()
        self.start()

    def GetSpec(self):
        """Return the JobSpec this job runs."""
        return self._spec

    def start(self):
        """(Re)launch the job's process with a fresh output file.

        Popen is retried on OSError up to 4 times (sleeping 0.3s, 0.6s,
        1.2s, 2.4s). If a final attempt also fails, its OSError propagates.
        """
        if self._tempfile is not None:
            # Release the previous attempt's output file. state() has already
            # read and reported its contents before restarting the job.
            self._tempfile.close()
        self._tempfile = tempfile.TemporaryFile()
        env = dict(os.environ)
        env.update(self._spec.environ)
        env.update(self._add_env)
        env = sanitized_environment(env)
        self._start = time.time()
        cmdline = self._spec.cmdline
        # The Unix time command is finicky when used with MSBuild, so we don't
        # use it with jobs that run MSBuild. Otherwise the module-level flag is
        # cleared, so later jobs are not measured either.
        global measure_cpu_costs
        if measure_cpu_costs and 'vsprojects\\build' not in cmdline[0]:
            cmdline = ['time', '-p'] + cmdline
            self._measuring_cpu = True
        else:
            measure_cpu_costs = False
            self._measuring_cpu = False

        def try_start():
            return subprocess.Popen(args=cmdline,
                                    stderr=subprocess.STDOUT,
                                    stdout=self._tempfile,
                                    cwd=self._spec.cwd,
                                    shell=self._spec.shell,
                                    env=env)

        delay = 0.3
        for _ in range(4):
            try:
                self._process = try_start()
                break
            except OSError:
                message('WARNING',
                        'Failed to start %s, retrying in %f seconds' %
                        (self._spec.shortname, delay))
                time.sleep(delay)
                delay *= 2
        else:
            # Final attempt: let any OSError propagate to the caller.
            self._process = try_start()
        self._state = _RUNNING

    def state(self):
        """Poll current state of the job. Prints messages at completion.

        Returns one of the _RUNNING/_SUCCESS/_FAILURE/_KILLED sentinels. A job
        that exits non-zero or times out while it still has retries left is
        restarted in place and keeps reporting _RUNNING.
        """

        def stdout(self=self):
            # Full captured output; its tail is also stored on the result.
            output = read_from_start(self._tempfile)
            self.result.message = output[-_MAX_RESULT_SIZE:]
            return output

        if self._state == _RUNNING and self._process.poll() is not None:
            elapsed = time.time() - self._start
            self.result.elapsed_time = elapsed
            if self._process.returncode != 0:
                if self._retries < self._spec.flake_retries:
                    message(
                        'FLAKE',
                        '%s [ret=%d, pid=%d]' %
                        (self._spec.shortname, self._process.returncode,
                         self._process.pid),
                        stdout(),
                        do_newline=True)
                    self._retries += 1
                    self.result.num_failures += 1
                    self.result.retries = self._timeout_retries + self._retries
                    # NOTE: job is restarted regardless of jobset's max_time setting
                    self.start()
                else:
                    self._state = _FAILURE
                    if not self._suppress_failure_message:
                        message(
                            'FAILED',
                            '%s [ret=%d, pid=%d, time=%.1fsec]' %
                            (self._spec.shortname, self._process.returncode,
                             self._process.pid, elapsed),
                            stdout(),
                            do_newline=True)
                    self.result.state = 'FAILED'
                    self.result.num_failures += 1
                    self.result.returncode = self._process.returncode
            else:
                self._state = _SUCCESS
                measurement = ''
                # Parse timing only if this attempt really ran under `time -p`.
                if self._measuring_cpu:
                    m = re.search(
                        r'real\s+([0-9.]+)\nuser\s+([0-9.]+)\nsys\s+([0-9.]+)',
                        stdout())
                    # The timing report can be missing or garbled; skip the
                    # measurement rather than crash on a failed match.
                    if m is not None:
                        real = float(m.group(1))
                        user = float(m.group(2))
                        system = float(m.group(3))
                        if real > 0.5:
                            cores = (user + system) / real
                            self.result.cpu_measured = float('%.01f' % cores)
                            self.result.cpu_estimated = float(
                                '%.01f' % self._spec.cpu_cost)
                            measurement = '; cpu_cost=%.01f; estimated=%.01f' % (
                                self.result.cpu_measured,
                                self.result.cpu_estimated)
                if not self._quiet_success:
                    message(
                        'PASSED',
                        '%s [time=%.1fsec, retries=%d:%d%s]' %
                        (self._spec.shortname, elapsed, self._retries,
                         self._timeout_retries, measurement),
                        stdout() if self._spec.verbose_success else None,
                        do_newline=self._newline_on_success or self._travis)
                self.result.state = 'PASSED'
        elif (self._state == _RUNNING and
              self._spec.timeout_seconds is not None and
              time.time() - self._start > self._spec.timeout_seconds):
            elapsed = time.time() - self._start
            self.result.elapsed_time = elapsed
            if self._timeout_retries < self._spec.timeout_retries:
                message(
                    'TIMEOUT_FLAKE',
                    '%s [pid=%d]' % (self._spec.shortname, self._process.pid),
                    stdout(),
                    do_newline=True)
                self._timeout_retries += 1
                self.result.num_failures += 1
                self.result.retries = self._timeout_retries + self._retries
                if self._spec.kill_handler:
                    self._spec.kill_handler(self)
                self._process.terminate()
                # NOTE: job is restarted regardless of jobset's max_time setting
                self.start()
            else:
                message(
                    'TIMEOUT',
                    '%s [pid=%d, time=%.1fsec]' % (self._spec.shortname,
                                                   self._process.pid, elapsed),
                    stdout(),
                    do_newline=True)
                self.kill()
                self.result.state = 'TIMEOUT'
                self.result.num_failures += 1
        return self._state

    def kill(self):
        """Mark a still-running job as _KILLED and terminate its process.

        The spec's kill_handler, if any, is called with this job before the
        process is terminated. Jobs that are no longer running are untouched.
        """
        if self._state == _RUNNING:
            self._state = _KILLED
            if self._spec.kill_handler:
                self._spec.kill_handler(self)
            self._process.terminate()

    def suppress_failure_message(self):
        """Do not print the FAILED message when this job finally fails."""
        self._suppress_failure_message = True
 
class Jobset(object):
    """Manages one run of jobs."""

    def __init__(self, check_cancelled, maxjobs, maxjobs_cpu_agnostic,
                 newline_on_success, travis, stop_on_failure, add_env,
                 quiet_success, max_time):
        """
        Args:
          check_cancelled: zero-argument callable; a true result cancels the
            run and kills all running jobs.
          maxjobs: budget for the summed cpu_cost of running jobs.
          maxjobs_cpu_agnostic: cap on the number of running jobs.
          newline_on_success, travis, add_env, quiet_success: forwarded to
            every Job.
          stop_on_failure: after the first failed or killed job, kill all
            running jobs and cancel the run.
          max_time: seconds after construction past which start() skips new
            jobs; a value <= 0 disables the limit.
        """
        self._running = set()
        self._check_cancelled = check_cancelled
        self._cancelled = False
        self._failures = 0
        self._completed = 0
        self._maxjobs = maxjobs
        self._maxjobs_cpu_agnostic = maxjobs_cpu_agnostic
        self._newline_on_success = newline_on_success
        self._travis = travis
        self._stop_on_failure = stop_on_failure
        self._add_env = add_env
        self._quiet_success = quiet_success
        self._max_time = max_time
        # shortname -> list of JobResult for that job.
        self.resultset = {}
        # Number of jobs still queued, if known (used for the ETA display).
        self._remaining = None
        self._start_time = time.time()

    def set_remaining(self, remaining):
        """Record how many jobs are still queued (for the WAITING/ETA line)."""
        self._remaining = remaining

    def get_num_failures(self):
        """Return the number of jobs that failed or were killed so far."""
        return self._failures

    def cpu_cost(self):
        """Return the summed cpu_cost of all currently running jobs."""
        return sum(job._spec.cpu_cost for job in self._running)

    def start(self, spec):
        """Start a job, waiting (via reap) until there is capacity for it.

        A job is admitted when nothing is running, or when it fits within
        both the cpu-cost budget and the job-count cap. Once max_time has
        elapsed the job is recorded as SKIPPED instead of being run.

        Returns:
          False if the run was cancelled, True otherwise (including skips).
        """
        while True:
            if self._max_time > 0 and time.time(
            ) - self._start_time > self._max_time:
                skipped_job_result = JobResult()
                skipped_job_result.state = 'SKIPPED'
                message('SKIPPED', spec.shortname, do_newline=True)
                self.resultset[spec.shortname] = [skipped_job_result]
                return True
            if self.cancelled():
                return False
            current_cpu_cost = self.cpu_cost()
            if current_cpu_cost == 0:
                break
            if current_cpu_cost + spec.cpu_cost <= self._maxjobs:
                if len(self._running) < self._maxjobs_cpu_agnostic:
                    break
            self.reap(spec.shortname, spec.cpu_cost)
        if self.cancelled():
            return False
        job = Job(spec, self._newline_on_success, self._travis, self._add_env,
                  self._quiet_success)
        self._running.add(job)
        if job.GetSpec().shortname not in self.resultset:
            self.resultset[job.GetSpec().shortname] = []
        return True

    def reap(self, waiting_for=None, waiting_for_cost=None):
        """Collect the dead jobs.

        Blocks until some running job has finished, records its result and
        removes it from the running set (at most one job per call). Returns
        immediately when nothing is running. While waiting it shows a WAITING
        status line (not in travis mode or on Windows) and sleeps until a
        signal arrives (a 10s SIGALRM bounds the wait), or 0.1s on Windows.

        Args:
          waiting_for: shortname of the job waiting to start, for display.
          waiting_for_cost: its cpu_cost, for display.
        """
        while self._running:
            dead = set()
            for job in self._running:
                st = eintr_be_gone(job.state)
                if st == _RUNNING:
                    continue
                if st == _FAILURE or st == _KILLED:
                    self._failures += 1
                    if self._stop_on_failure:
                        self._cancelled = True
                        # Distinct loop variable: reusing `job` here would make
                        # dead.add() below collect the wrong job.
                        for other in self._running:
                            other.kill()
                dead.add(job)
                break
            for job in dead:
                self._completed += 1
                if not self._quiet_success or job.result.state != 'PASSED':
                    self.resultset[job.GetSpec().shortname].append(job.result)
                self._running.remove(job)
            if dead:
                return
            if not self._travis and platform_string() != 'windows':
                rstr = '' if self._remaining is None else '%d queued, ' % self._remaining
                if self._remaining is not None and self._completed > 0:
                    now = time.time()
                    sofar = now - self._start_time
                    remaining = sofar / self._completed * (
                        self._remaining + len(self._running))
                    rstr = 'ETA %.1f sec; %s' % (remaining, rstr)
                if waiting_for is not None:
                    wstr = ' next: %s @ %.2f cpu' % (waiting_for,
                                                     waiting_for_cost)
                else:
                    wstr = ''
                message(
                    'WAITING',
                    '%s%d jobs running, %d complete, %d failed (load %.2f)%s' %
                    (rstr, len(self._running), self._completed, self._failures,
                     self.cpu_cost(), wstr))
            if platform_string() == 'windows':
                time.sleep(0.1)
            else:
                signal.alarm(10)
                signal.pause()

    def cancelled(self):
        """Poll for cancellation.

        On the first true result from check_cancelled, kills every running
        job and latches the cancelled state.
        """
        if self._cancelled:
            return True
        if not self._check_cancelled():
            return False
        for job in self._running:
            job.kill()
        self._cancelled = True
        return True

    def finish(self):
        """Wait for all running jobs to end, polling cancellation meanwhile.

        Returns:
          True if the run was not cancelled and no job failed.
        """
        while self._running:
            self.cancelled()  # called for its side effect: poll cancellation
            self.reap()
        if platform_string() != 'windows':
            signal.alarm(0)
        return not self.cancelled() and self._failures == 0
 
def _never_cancelled():
    """Default cancellation check for run(): always reports "not cancelled"."""
    cancelled = False
    return cancelled
 
def tag_remaining(xs):
    """Yield (item, remaining) pairs for the items of iterable `xs`, in order.

    Up to 5000 items are buffered ahead of the one being yielded. Items that
    leave the buffer while `xs` is still being consumed get remaining=None,
    because the total is not known yet. Once `xs` is exhausted, each buffered
    item gets the exact number of items that follow it.
    """
    import collections  # deque gives O(1) removal from the left end

    staging = collections.deque()
    for x in xs:
        staging.append(x)
        if len(staging) > 5000:
            yield (staging.popleft(), None)
    n = len(staging)
    for i, x in enumerate(staging):
        yield (x, n - i - 1)
 
def run(cmdlines,
        check_cancelled=_never_cancelled,
        maxjobs=None,
        maxjobs_cpu_agnostic=None,
        newline_on_success=False,
        travis=False,
        infinite_runs=False,
        stop_on_failure=False,
        add_env=None,
        skip_jobs=False,
        quiet_success=False,
        max_time=-1):
    """Run every JobSpec in `cmdlines` and wait for all of them to finish.

    Args:
      cmdlines: iterable of JobSpec objects.
      check_cancelled: zero-argument callable polled for cancellation.
      maxjobs: budget for the summed cpu_cost of running jobs
        (default _DEFAULT_MAX_JOBS).
      maxjobs_cpu_agnostic: cap on the number of running jobs
        (default _DEFAULT_MAX_JOBS).
      newline_on_success, travis, quiet_success: output options forwarded
        to each Job.
      infinite_runs: accepted for compatibility; not used by this function.
      stop_on_failure: kill everything and stop after the first failure.
      add_env: extra environment variables for every job (None means none).
      skip_jobs: run nothing and report every job as SKIPPED.
      max_time: seconds after which remaining jobs are skipped; <= 0
        disables the limit.

    Returns:
      (number of failed jobs, resultset), where resultset maps each job's
      shortname to a list of JobResult.
    """
    if add_env is None:
        add_env = {}
    if skip_jobs:
        resultset = {}
        for job in cmdlines:
            message('SKIPPED', job.shortname, do_newline=True)
            # One result object per job, so results never alias each other.
            skipped_job_result = JobResult()
            skipped_job_result.state = 'SKIPPED'
            resultset[job.shortname] = [skipped_job_result]
        return 0, resultset
    js = Jobset(check_cancelled, maxjobs if maxjobs is not None else
                _DEFAULT_MAX_JOBS, maxjobs_cpu_agnostic
                if maxjobs_cpu_agnostic is not None else _DEFAULT_MAX_JOBS,
                newline_on_success, travis, stop_on_failure, add_env,
                quiet_success, max_time)
    for cmdline, remaining in tag_remaining(cmdlines):
        if not js.start(cmdline):
            break
        if remaining is not None:
            js.set_remaining(remaining)
    js.finish()
    return js.get_num_failures(), js.resultset
 
 
  |