3 # Copyright 2014, 2015, 2017 Michal Sojka <sojkam1@fel.cvut.cz>
6 """Versatile WvTest protocol tool. It replaces wvtestrun script and
7 provides some other useful features. Namely:
8 - Summary mode (--summary)
9 - Test results aligned to the same column
10 - TODO: Conversion to HTML
11 - TODO: Variable timeout
12 - TODO: Checking of expected number of tests
14 Newest version can be found at https://github.com/wentasah/wvtest.
17 version = "v0-28-g299b716"
20 import subprocess as sp
30 # Regular expression that matches potential prefixes to wvtest protocol lines
38 underline = '\033[04m'
40 strikethrough = '\033[09m'
41 invisible = '\033[08m'
50 lightgrey = '\033[37m'
53 lightgreen = '\033[92m'
55 lightblue = '\033[94m'
57 lightcyan = '\033[96m'
66 lightgrey = '\033[47m'
68 progress_chars = '|/-\\'
71 if os.environ['TERM'] == 'dumb':
75 self.output = open('/dev/tty', 'w')
82 self.width = self._get_width()
84 self._progress_msg = ''
85 self._progress_idx = 0
87 def _raw_write(self, string):
88 '''Write raw data if output is enabled.'''
89 if self._enabled and self.output:
91 self.output.write(string)
98 import fcntl, termios, struct, os
99 s = struct.pack('HHHH', 0, 0, 0, 0)
100 x = fcntl.ioctl(self.output.fileno(), termios.TIOCGWINSZ, s)
101 return struct.unpack('HHHH', x)[1]
103 return int(getattr(os.environ, 'COLUMNS', 80))
105 def _clear_colors(self):
106 '''Sets all color and attribute members to empty strings'''
107 for cls in ('attr', 'fg', 'bg'):
108 c = getattr(self, cls)
def set_progress_msg(self, msg):
    """Install *msg* as the current progress message and redraw it.

    The spinner index is reset to zero before update_progress_msg()
    is asked to redraw the progress line.
    """
    self._progress_idx = 0
    self._progress_msg = msg
    self.update_progress_msg()
119 def update_progress_msg(self):
120 self._progress_idx += 1
121 if self._progress_idx >= len(self.progress_chars):
122 self._progress_idx = 0
124 self._raw_write(self._progress_msg[:self.width - 3] + " " + self.progress_chars[self._progress_idx] + "\r")
126 def clear_progress_msg(self):
128 self._raw_write(' '*(len(self._progress_msg[:self.width - 3]) + 2) + "\r")
def __init__(self, match):
    """Copy every named group of *match* onto the instance as an attribute."""
    named_groups = match.groupdict()
    for name in named_groups:
        setattr(self, name, named_groups[name])
def print(self, file=sys.stdout):
    "Print the line (terminal is expected on output)"
    rendered = str(self)
    print(rendered, file=file)
142 def log(self, file=sys.stdout):
143 "Print the line (without terminal escape sequences)"
147 class WvPlainLine(WvLine):
148 re = re.compile("(?P<line>.*)")
153 class WvTestingLine(WvLine):
154 re = re.compile('(?P<prefix>' + re_prefix + ')Testing "(?P<what>.*)" in (?P<where>.*):$')
156 def __init__(self, *args):
158 WvLine.__init__(self, args[0])
164 raise TypeError("WvTestingLine.__init__() takes at most 2 positional arguments")
167 return '{self.prefix}! Testing "{self.what}" in {self.where}:'.format(self=self)
def print(self, file=sys.stdout):
    "Print the line wrapped in the terminal's bold/reset attributes"
    highlighted = ''.join((term.attr.bold, str(self), term.attr.reset))
    print(highlighted, file=file)
173 print(str(self), file=file)
def asWvCheckLine(self, result):
    """Return a WvCheckLine describing this section with the given *result*.

    The check text is this line's ``where`` and ``what`` separated by a
    single space.
    """
    text = '{self.where} {self.what}'.format(self=self)
    return WvCheckLine(text, result)
178 class WvCheckLine(WvLine):
179 re = re.compile('(?P<prefix>' + re_prefix + ')!\s*(?P<text>.*?)\s+(?P<result>\S+)$')
181 def __init__(self, *args):
183 WvLine.__init__(self, args[0])
187 self.result = args[1]
189 raise TypeError("WvCheckLine.__init__() takes at most 2 positional arguments")
192 # Result == None when printing progress message
193 return '{self.prefix}! {self.text} {result}'.format(self=self, result=(self.result or ''))
def is_success(self):
    """Return True when the check's result is exactly the string 'ok'."""
    outcome = self.result
    return outcome == 'ok'
199 text = '{self.prefix}! {self.text} '.format(self=self)
200 if self.is_success():
201 color = term.fg.lightgreen
203 color = term.fg.lightred
204 result = term.attr.bold + color + self.result + term.attr.reset
206 lines = math.ceil(len(text) / term.width)
207 if len(text) % term.width > term.width - 10:
210 text = format(text, '.<' + str(lines * term.width - 10))
211 return '{text} {result}'.format(text=text, result=result)
def print(self, file=sys.stdout):
    "Print the check line as rendered by formated() (terminal is expected on output)"
    rendered = self.formated()
    print(rendered, file=file)
def log(self, file=sys.stdout):
    """Write the check line to *file* without terminal escape sequences.

    The "<prefix>! <text> " part is padded with dots to 80 columns and is
    followed by a space and the result.
    """
    label = '{self.prefix}! {self.text} '.format(self=self)
    plain = '{text:.<80} {result}'.format(text=label, result=self.result)
    print(plain, file=file)
class WvTagLine(WvLine):
    """Protocol line of the form "<prefix>wvtest: <tag>".

    Everything after "wvtest:" (with leading whitespace skipped) is
    captured in the ``tag`` group; the part before it is captured in the
    ``prefix`` group.
    """
    # Raw string literals are used because "\s" in a normal literal is an
    # invalid escape sequence and triggers a warning on recent Python
    # versions. The resulting pattern text is unchanged.
    re = re.compile(r'(?P<prefix>' + re_prefix + r')wvtest:\s*(?P<tag>.*)$')
224 class WvTestLog(list):
227 # Print one line for each "Testing" section. Passed tests are
228 # printed as "ok", failed tests as "FAILURE".
231 # Print one "ok" line for each passing "Testing" section.
232 # Failed "Testing" sections are printed verbosely.
235 # Print every line of the output, just
236 # reformat/syntax-highlight known lines.
239 def __init__(self, verbosity = Verbosity.NORMAL, junit_xml : io.IOBase = None,
242 self.checkFailedCount = 0
244 self.testFailedCount = 0
246 self.implicitTestTitle = None
247 self.currentTest = None
248 self.currentTestFailedCount = 0
250 self.verbosity = verbosity
251 self.show_progress = False
253 self.junit_xml = junit_xml
258 self.junitTestcases = []
262 if logdir and not os.path.isdir(logdir):
def setImplicitTestTitle(self, testing):
    """Remember *testing* as the implicit title of the upcoming test.

    If the test does not supply its own title as the first line of its
    output, this title will be used instead.
    """
    self.implicitTestTitle = testing
270 def print(self, file=sys.stdout):
272 entry.print(file=file)
277 if 'formated' in dir(entry):
285 return "\n".join([str(entry) for entry in self]) + "\n"
287 def _rememberJUnitTestcase(self):
288 if not self.junit_xml:
292 if self.currentTestFailedCount > 0:
293 failure = wvjunit.Failure(text=self.plainText())
295 tc = wvjunit.Testcase(classname = self.currentTest.where,
296 name = self.currentTest.what,
297 time = time.time() - self.testStartTime,
299 self.junitTestcases.append(tc)
301 def _generateJUnitXML(self):
302 if not self.junit_xml:
304 ts = wvjunit.Testsuite(tests = self.testCount,
305 failures = self.testFailedCount,
309 hostname="localhost",
310 timestamp = datetime.datetime.now(),
311 testcases = self.junitTestcases)
312 ts.print(file = self.junit_xml)
314 def _finishCurrentTest(self):
315 self._rememberJUnitTestcase()
316 if self.currentTestFailedCount > 0:
317 if self.show_progress and self.verbosity < self.Verbosity.VERBOSE:
318 term.clear_progress_msg()
319 if self.verbosity == self.Verbosity.NORMAL:
321 elif self.verbosity < self.Verbosity.NORMAL:
322 self.currentTest.asWvCheckLine('FAILED').print()
323 self.testFailedCount += 1
325 if self.verbosity <= self.Verbosity.NORMAL:
326 self.currentTest.asWvCheckLine('ok').print()
335 def _newTest(self, testing : WvTestingLine):
337 self._finishCurrentTest()
340 if self.show_progress and self.verbosity < self.Verbosity.VERBOSE:
341 term.set_progress_msg(str(testing.asWvCheckLine(None)))
344 self.log = open(os.path.join(self.logdir, "%s-%s.log" %
345 (testing.where, testing.what.lower().replace(' ', '_'))),
347 self.testStartTime = time.time()
348 self.currentTest = testing
349 self.currentTestFailedCount = 0
351 def _newCheck(self, check : WvCheckLine):
353 if not check.is_success():
354 self.checkFailedCount += 1
355 self.currentTestFailedCount += 1
357 def append(self, logEntry : WvLine):
358 if self.implicitTestTitle:
359 if str(logEntry) == '':
361 elif type(logEntry) != WvTestingLine:
362 self._newTest(self.implicitTestTitle)
363 super().append(self.implicitTestTitle)
364 self.implicitTestTitle = None
366 self.implicitTestTitle = None
369 if type(logEntry) == WvTestingLine:
370 self._newTest(logEntry)
371 elif type(logEntry) == WvCheckLine:
372 self._newCheck(logEntry)
374 list.append(self, logEntry)
376 if self.verbosity == self.Verbosity.VERBOSE:
379 if self.show_progress:
380 term.update_progress_msg()
383 logEntry.log(self.log)
385 def addLine(self, line):
389 for lineClass in [ WvCheckLine, WvTestingLine, WvTagLine, WvPlainLine ]:
390 match = lineClass.re.match(line)
392 logEntry = lineClass(match)
395 raise Exception("Non-matched line: {}".format(line))
397 self.append(logEntry)
402 self._generateJUnitXML()
404 print("WvTest: {total} test{plt}, {fail} failure{plf}."
405 .format(total = self.testCount, plt = '' if self.testCount == 1 else 's',
406 fail = self.testFailedCount, plf = '' if self.testFailedCount == 1 else 's'))
def is_success(self):
    """Return True when no test has been counted as failed."""
    failed_tests = self.testFailedCount
    return failed_tests == 0
410 def _run(command, log):
411 log.show_progress = True
414 def kill_child(sig = None, frame = None):
415 os.killpg(proc.pid, sig)
417 def alarm(sig = None, frame = None):
418 msg = "! {wvtool}: Alarm timed out! No test output for {timeout} seconds. FAILED"
419 log.addLine(msg.format(wvtool=sys.argv[0], timeout=timeout))
420 kill_child(signal.SIGTERM)
422 signal.signal(signal.SIGINT, kill_child)
423 signal.signal(signal.SIGTERM, kill_child)
424 signal.signal(signal.SIGALRM, alarm)
426 cmd = command if isinstance(command, str) else ' '.join(command)
427 log.setImplicitTestTitle(WvTestingLine("Executing "+cmd, "wvtool"))
429 # Popen does not seem to be able to call setpgrp(). Therefore, we
430 # use start_new_session, but this also creates a new session and
431 # detaches the process from a terminal. This might be a problem
432 # for programs that need a terminal to run.
433 with sp.Popen(command, stdin=None, stdout=sp.PIPE, stderr=sp.STDOUT,
434 universal_newlines=False, start_new_session=True) as proc:
435 signal.alarm(timeout)
436 stdout = io.TextIOWrapper(proc.stdout, errors='replace')
438 signal.alarm(timeout)
443 if proc.returncode != 0:
444 if proc.returncode > 0:
445 msg = "{wvtool}: Program '{cmd}' returned non-zero exit code {ec}"
447 msg = "{wvtool}: Program '{cmd}' terminated by signal {sig}"
449 text = msg.format(wvtool=sys.argv[0], cmd=cmd,
450 ec=proc.returncode, sig=-proc.returncode)
451 log.append(WvCheckLine(text, 'FAILED'))
def do_run(args, log):
    """Handler of the 'run' sub-command: run ``args.command`` via _run(), recording into *log*."""
    command = args.command
    _run(command, log)
456 def do_runall(args, log):
457 for cmd in args.commands:
460 def do_format(args, log):
463 log.setImplicitTestTitle(WvTestingLine("Preamble", "stdin"))
464 for line in sys.stdin:
467 for fn in args.infiles:
468 log.setImplicitTestTitle(WvTestingLine("Preamble", fn))
469 for line in open(fn):
472 def do_wrap(args, log):
475 parser = argparse.ArgumentParser(description='Versatile wvtest tool')
477 parser.set_defaults(verbosity=WvTestLog.Verbosity.NORMAL)
478 parser.add_argument('-v', '--verbose', dest='verbosity', action='store_const',
479 const=WvTestLog.Verbosity.VERBOSE,
480 help='Do not hide output of successful tests')
481 parser.add_argument('-s', '--summary', dest='verbosity', action='store_const',
482 const=WvTestLog.Verbosity.SUMMARY,
483 help='''Hide output of all tests. Print just one line for each "Testing"
484 section and report "ok" or "FAILURE" of it.''')
485 parser.add_argument('--junit-xml', type=argparse.FileType('w'),
486 help='''Convert output to JUnit compatible XML file''')
487 parser.add_argument('--logdir',
488 help='''Store test logs in the given directory''')
489 parser.add_argument('--version', action='version', version='%(prog)s '+version)
491 subparsers = parser.add_subparsers(help='sub-command help')
493 parser_run = subparsers.add_parser('run', help='Run and supervise a command producing wvtest output')
494 parser_run.add_argument('command', nargs=argparse.REMAINDER, help='Command to run')
495 parser_run.set_defaults(func=do_run)
497 parser_runall = subparsers.add_parser('runall', help='Run multiple scripts/binaries mentioned on command line')
498 parser_runall.set_defaults(func=do_runall)
499 parser_runall.add_argument('commands', nargs='+', help='Scripts/binaries to run')
501 parser_format = subparsers.add_parser('format', help='Reformat/highlight/summarize WvTest protocol output')
502 parser_format.set_defaults(func=do_format)
503 parser_format.add_argument('infiles', nargs='*', help='Files with wvtest output')
505 # parser_wrap = subparsers.add_parser('wrap')
506 # parser_wrap.set_defaults(func=do_wrap)
508 args = parser.parse_args()
510 if not 'func' in args:
514 log = WvTestLog(args.verbosity, junit_xml = args.junit_xml, logdir=args.logdir)
517 sys.exit(0 if log.is_success() else 1)