jobset.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467
  1. # Copyright 2015, Google Inc.
  2. # All rights reserved.
  3. #
  4. # Redistribution and use in source and binary forms, with or without
  5. # modification, are permitted provided that the following conditions are
  6. # met:
  7. #
  8. # * Redistributions of source code must retain the above copyright
  9. # notice, this list of conditions and the following disclaimer.
  10. # * Redistributions in binary form must reproduce the above
  11. # copyright notice, this list of conditions and the following disclaimer
  12. # in the documentation and/or other materials provided with the
  13. # distribution.
  14. # * Neither the name of Google Inc. nor the names of its
  15. # contributors may be used to endorse or promote products derived from
  16. # this software without specific prior written permission.
  17. #
  18. # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  19. # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  20. # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  21. # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  22. # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  23. # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  24. # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  25. # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  26. # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  27. # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  28. # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  29. """Run a group of subprocesses and then finish."""
  30. from __future__ import print_function
  31. import multiprocessing
  32. import os
  33. import platform
  34. import re
  35. import signal
  36. import subprocess
  37. import sys
  38. import tempfile
  39. import time
  40. # cpu cost measurement
  41. measure_cpu_costs = False
  42. _DEFAULT_MAX_JOBS = 16 * multiprocessing.cpu_count()
  43. _MAX_RESULT_SIZE = 8192
  44. def sanitized_environment(env):
  45. sanitized = {}
  46. for key, value in env.items():
  47. sanitized[str(key).encode()] = str(value).encode()
  48. return sanitized
  49. def platform_string():
  50. if platform.system() == 'Windows':
  51. return 'windows'
  52. elif platform.system()[:7] == 'MSYS_NT':
  53. return 'windows'
  54. elif platform.system() == 'Darwin':
  55. return 'mac'
  56. elif platform.system() == 'Linux':
  57. return 'linux'
  58. else:
  59. return 'posix'
  60. # setup a signal handler so that signal.pause registers 'something'
  61. # when a child finishes
  62. # not using futures and threading to avoid a dependency on subprocess32
  63. if platform_string() == 'windows':
  64. pass
  65. else:
  66. have_alarm = False
  67. def alarm_handler(unused_signum, unused_frame):
  68. global have_alarm
  69. have_alarm = False
  70. signal.signal(signal.SIGCHLD, lambda unused_signum, unused_frame: None)
  71. signal.signal(signal.SIGALRM, alarm_handler)
  72. _SUCCESS = object()
  73. _FAILURE = object()
  74. _RUNNING = object()
  75. _KILLED = object()
  76. _COLORS = {
  77. 'red': [ 31, 0 ],
  78. 'green': [ 32, 0 ],
  79. 'yellow': [ 33, 0 ],
  80. 'lightgray': [ 37, 0],
  81. 'gray': [ 30, 1 ],
  82. 'purple': [ 35, 0 ],
  83. }
  84. _BEGINNING_OF_LINE = '\x1b[0G'
  85. _CLEAR_LINE = '\x1b[2K'
  86. _TAG_COLOR = {
  87. 'FAILED': 'red',
  88. 'FLAKE': 'purple',
  89. 'TIMEOUT_FLAKE': 'purple',
  90. 'WARNING': 'yellow',
  91. 'TIMEOUT': 'red',
  92. 'PASSED': 'green',
  93. 'START': 'gray',
  94. 'WAITING': 'yellow',
  95. 'SUCCESS': 'green',
  96. 'IDLE': 'gray',
  97. }
  98. def message(tag, msg, explanatory_text=None, do_newline=False):
  99. if message.old_tag == tag and message.old_msg == msg and not explanatory_text:
  100. return
  101. message.old_tag = tag
  102. message.old_msg = msg
  103. try:
  104. if platform_string() == 'windows' or not sys.stdout.isatty():
  105. if explanatory_text:
  106. print(explanatory_text)
  107. print('%s: %s' % (tag, msg))
  108. return
  109. sys.stdout.write('%s%s%s\x1b[%d;%dm%s\x1b[0m: %s%s' % (
  110. _BEGINNING_OF_LINE,
  111. _CLEAR_LINE,
  112. '\n%s' % explanatory_text if explanatory_text is not None else '',
  113. _COLORS[_TAG_COLOR[tag]][1],
  114. _COLORS[_TAG_COLOR[tag]][0],
  115. tag,
  116. msg,
  117. '\n' if do_newline or explanatory_text is not None else ''))
  118. sys.stdout.flush()
  119. except:
  120. pass
  121. message.old_tag = ''
  122. message.old_msg = ''
  123. def which(filename):
  124. if '/' in filename:
  125. return filename
  126. for path in os.environ['PATH'].split(os.pathsep):
  127. if os.path.exists(os.path.join(path, filename)):
  128. return os.path.join(path, filename)
  129. raise Exception('%s not found' % filename)
  130. class JobSpec(object):
  131. """Specifies what to run for a job."""
  132. def __init__(self, cmdline, shortname=None, environ=None,
  133. cwd=None, shell=False, timeout_seconds=5*60, flake_retries=0,
  134. timeout_retries=0, kill_handler=None, cpu_cost=1.0,
  135. verbose_success=False):
  136. """
  137. Arguments:
  138. cmdline: a list of arguments to pass as the command line
  139. environ: a dictionary of environment variables to set in the child process
  140. kill_handler: a handler that will be called whenever job.kill() is invoked
  141. cpu_cost: number of cores per second this job needs
  142. """
  143. if environ is None:
  144. environ = {}
  145. self.cmdline = cmdline
  146. self.environ = environ
  147. self.shortname = cmdline[0] if shortname is None else shortname
  148. self.cwd = cwd
  149. self.shell = shell
  150. self.timeout_seconds = timeout_seconds
  151. self.flake_retries = flake_retries
  152. self.timeout_retries = timeout_retries
  153. self.kill_handler = kill_handler
  154. self.cpu_cost = cpu_cost
  155. self.verbose_success = verbose_success
  156. def identity(self):
  157. return '%r %r' % (self.cmdline, self.environ)
  158. def __hash__(self):
  159. return hash(self.identity())
  160. def __cmp__(self, other):
  161. return self.identity() == other.identity()
  162. def __repr__(self):
  163. return 'JobSpec(shortname=%s, cmdline=%s)' % (self.shortname, self.cmdline)
  164. class JobResult(object):
  165. def __init__(self):
  166. self.state = 'UNKNOWN'
  167. self.returncode = -1
  168. self.elapsed_time = 0
  169. self.num_failures = 0
  170. self.retries = 0
  171. self.message = ''
  172. class Job(object):
  173. """Manages one job."""
  174. def __init__(self, spec, newline_on_success, travis, add_env):
  175. self._spec = spec
  176. self._newline_on_success = newline_on_success
  177. self._travis = travis
  178. self._add_env = add_env.copy()
  179. self._retries = 0
  180. self._timeout_retries = 0
  181. self._suppress_failure_message = False
  182. message('START', spec.shortname, do_newline=self._travis)
  183. self.result = JobResult()
  184. self.start()
  185. def GetSpec(self):
  186. return self._spec
  187. def start(self):
  188. self._tempfile = tempfile.TemporaryFile()
  189. env = dict(os.environ)
  190. env.update(self._spec.environ)
  191. env.update(self._add_env)
  192. env = sanitized_environment(env)
  193. penv = {}
  194. penv.update(self._spec.environ)
  195. penv.update(self._add_env)
  196. self._start = time.time()
  197. cmdline = self._spec.cmdline
  198. if measure_cpu_costs:
  199. cmdline = ['time', '--portability'] + cmdline
  200. print('\n\ncmdline: %s\nenv: %s\n' % (cmdline, penv))
  201. try_start = lambda: subprocess.Popen(args=cmdline,
  202. stderr=subprocess.STDOUT,
  203. stdout=self._tempfile,
  204. cwd=self._spec.cwd,
  205. shell=self._spec.shell,
  206. env=env)
  207. delay = 0.3
  208. for i in range(0, 4):
  209. try:
  210. self._process = try_start()
  211. break
  212. except OSError:
  213. message('WARNING', 'Failed to start %s, retrying in %f seconds' % (self._spec.shortname, delay))
  214. time.sleep(delay)
  215. delay *= 2
  216. else:
  217. self._process = try_start()
  218. self._state = _RUNNING
  219. def state(self):
  220. """Poll current state of the job. Prints messages at completion."""
  221. def stdout(self=self):
  222. self._tempfile.seek(0)
  223. stdout = self._tempfile.read()
  224. self.result.message = stdout[-_MAX_RESULT_SIZE:]
  225. return stdout
  226. if self._state == _RUNNING and self._process.poll() is not None:
  227. elapsed = time.time() - self._start
  228. self.result.elapsed_time = elapsed
  229. if self._process.returncode != 0:
  230. if self._retries < self._spec.flake_retries:
  231. message('FLAKE', '%s [ret=%d, pid=%d]' % (
  232. self._spec.shortname, self._process.returncode, self._process.pid),
  233. stdout(), do_newline=True)
  234. self._retries += 1
  235. self.result.num_failures += 1
  236. self.result.retries = self._timeout_retries + self._retries
  237. self.start()
  238. else:
  239. self._state = _FAILURE
  240. if not self._suppress_failure_message:
  241. message('FAILED', '%s [ret=%d, pid=%d]' % (
  242. self._spec.shortname, self._process.returncode, self._process.pid),
  243. stdout(), do_newline=True)
  244. self.result.state = 'FAILED'
  245. self.result.num_failures += 1
  246. self.result.returncode = self._process.returncode
  247. else:
  248. self._state = _SUCCESS
  249. measurement = ''
  250. if measure_cpu_costs:
  251. m = re.search(r'real ([0-9.]+)\nuser ([0-9.]+)\nsys ([0-9.]+)', stdout())
  252. real = float(m.group(1))
  253. user = float(m.group(2))
  254. sys = float(m.group(3))
  255. if real > 0.5:
  256. cores = (user + sys) / real
  257. measurement = '; cpu_cost=%.01f; estimated=%.01f' % (cores, self._spec.cpu_cost)
  258. message('PASSED', '%s [time=%.1fsec; retries=%d:%d%s]' % (
  259. self._spec.shortname, elapsed, self._retries, self._timeout_retries, measurement),
  260. stdout() if self._spec.verbose_success else None,
  261. do_newline=self._newline_on_success or self._travis)
  262. self.result.state = 'PASSED'
  263. elif (self._state == _RUNNING and
  264. self._spec.timeout_seconds is not None and
  265. time.time() - self._start > self._spec.timeout_seconds):
  266. if self._timeout_retries < self._spec.timeout_retries:
  267. message('TIMEOUT_FLAKE', '%s [pid=%d]' % (self._spec.shortname, self._process.pid), stdout(), do_newline=True)
  268. self._timeout_retries += 1
  269. self.result.num_failures += 1
  270. self.result.retries = self._timeout_retries + self._retries
  271. if self._spec.kill_handler:
  272. self._spec.kill_handler(self)
  273. self._process.terminate()
  274. self.start()
  275. else:
  276. message('TIMEOUT', '%s [pid=%d]' % (self._spec.shortname, self._process.pid), stdout(), do_newline=True)
  277. self.kill()
  278. self.result.state = 'TIMEOUT'
  279. self.result.num_failures += 1
  280. return self._state
  281. def kill(self):
  282. if self._state == _RUNNING:
  283. self._state = _KILLED
  284. if self._spec.kill_handler:
  285. self._spec.kill_handler(self)
  286. self._process.terminate()
  287. def suppress_failure_message(self):
  288. self._suppress_failure_message = True
  289. class Jobset(object):
  290. """Manages one run of jobs."""
  291. def __init__(self, check_cancelled, maxjobs, newline_on_success, travis,
  292. stop_on_failure, add_env):
  293. self._running = set()
  294. self._check_cancelled = check_cancelled
  295. self._cancelled = False
  296. self._failures = 0
  297. self._completed = 0
  298. self._maxjobs = maxjobs
  299. self._newline_on_success = newline_on_success
  300. self._travis = travis
  301. self._stop_on_failure = stop_on_failure
  302. self._add_env = add_env
  303. self.resultset = {}
  304. self._remaining = None
  305. self._start_time = time.time()
  306. def set_remaining(self, remaining):
  307. self._remaining = remaining
  308. def get_num_failures(self):
  309. return self._failures
  310. def cpu_cost(self):
  311. c = 0
  312. for job in self._running:
  313. c += job._spec.cpu_cost
  314. return c
  315. def start(self, spec):
  316. """Start a job. Return True on success, False on failure."""
  317. while True:
  318. if self.cancelled(): return False
  319. current_cpu_cost = self.cpu_cost()
  320. if current_cpu_cost == 0: break
  321. if current_cpu_cost + spec.cpu_cost <= self._maxjobs: break
  322. self.reap()
  323. if self.cancelled(): return False
  324. job = Job(spec,
  325. self._newline_on_success,
  326. self._travis,
  327. self._add_env)
  328. self._running.add(job)
  329. if job.GetSpec().shortname not in self.resultset:
  330. self.resultset[job.GetSpec().shortname] = []
  331. return True
  332. def reap(self):
  333. """Collect the dead jobs."""
  334. while self._running:
  335. dead = set()
  336. for job in self._running:
  337. st = job.state()
  338. if st == _RUNNING: continue
  339. if st == _FAILURE or st == _KILLED:
  340. self._failures += 1
  341. if self._stop_on_failure:
  342. self._cancelled = True
  343. for job in self._running:
  344. job.kill()
  345. dead.add(job)
  346. break
  347. for job in dead:
  348. self._completed += 1
  349. self.resultset[job.GetSpec().shortname].append(job.result)
  350. self._running.remove(job)
  351. if dead: return
  352. if (not self._travis):
  353. rstr = '' if self._remaining is None else '%d queued, ' % self._remaining
  354. if self._remaining is not None and self._completed > 0:
  355. now = time.time()
  356. sofar = now - self._start_time
  357. remaining = sofar / self._completed * (self._remaining + len(self._running))
  358. rstr = 'ETA %.1f sec; %s' % (remaining, rstr)
  359. message('WAITING', '%s%d jobs running, %d complete, %d failed' % (
  360. rstr, len(self._running), self._completed, self._failures))
  361. if platform_string() == 'windows':
  362. time.sleep(0.1)
  363. else:
  364. global have_alarm
  365. if not have_alarm:
  366. have_alarm = True
  367. signal.alarm(10)
  368. signal.pause()
  369. def cancelled(self):
  370. """Poll for cancellation."""
  371. if self._cancelled: return True
  372. if not self._check_cancelled(): return False
  373. for job in self._running:
  374. job.kill()
  375. self._cancelled = True
  376. return True
  377. def finish(self):
  378. while self._running:
  379. if self.cancelled(): pass # poll cancellation
  380. self.reap()
  381. return not self.cancelled() and self._failures == 0
  382. def _never_cancelled():
  383. return False
  384. def tag_remaining(xs):
  385. staging = []
  386. for x in xs:
  387. staging.append(x)
  388. if len(staging) > 5000:
  389. yield (staging.pop(0), None)
  390. n = len(staging)
  391. for i, x in enumerate(staging):
  392. yield (x, n - i - 1)
  393. def run(cmdlines,
  394. check_cancelled=_never_cancelled,
  395. maxjobs=None,
  396. newline_on_success=False,
  397. travis=False,
  398. infinite_runs=False,
  399. stop_on_failure=False,
  400. add_env={}):
  401. js = Jobset(check_cancelled,
  402. maxjobs if maxjobs is not None else _DEFAULT_MAX_JOBS,
  403. newline_on_success, travis, stop_on_failure, add_env)
  404. for cmdline, remaining in tag_remaining(cmdlines):
  405. if not js.start(cmdline):
  406. break
  407. if remaining is not None:
  408. js.set_remaining(remaining)
  409. js.finish()
  410. return js.get_num_failures(), js.resultset