3 # Copyright 2014, 2015, 2017 Michal Sojka <sojkam1@fel.cvut.cz>
6 """Versatile WvTest protocol tool. It replaces wvtestrun script and
7 provides some other useful features. Namely:
8 - Summary mode (--summary)
9 - Test results aligned to the same column
10 - (Experimental) Export to JUnit XML
11 - TODO: Conversion to HTML
12 - TODO: Variable timeout
13 - TODO: Checking of expected number of tests
15 Newest version can be found at https://github.com/wentasah/wvtest.
18 version = "v0-53-g52e7819"
21 import subprocess as sp
# Regular expression that matches potential prefixes to wvtest protocol lines
40 underline = '\033[04m'
42 strikethrough = '\033[09m'
43 invisible = '\033[08m'
52 lightgrey = '\033[37m'
53 darkgrey = '\033[1;30m'
54 lightred = '\033[1;31m'
55 lightgreen = '\033[1;32m'
57 lightblue = '\033[1;34m'
59 lightcyan = '\033[1;36m'
68 lightgrey = '\033[47m'
70 progress_chars = '|/-\\'
72 def __init__(self, width=None):
73 if not 'TERM' in os.environ or os.environ['TERM'] == 'dumb':
77 self.output = open('/dev/tty', 'w')
81 self.width = width or self._get_width()
83 self._progress_msg = ''
84 self._progress_idx = 0
86 def _raw_write(self, string):
87 '''Write raw data if output is enabled.'''
88 if self._enabled and self.output:
90 self.output.write(string)
97 import fcntl, termios, struct, os
98 s = struct.pack('HHHH', 0, 0, 0, 0)
99 x = fcntl.ioctl(self.output.fileno(), termios.TIOCGWINSZ, s)
100 width = struct.unpack('HHHH', x)[1]
105 return int(getattr(os.environ, 'COLUMNS', 80))
107 def clear_colors(self):
'''Sets all color and attribute members to empty strings'''
109 for cls in ('attr', 'fg', 'bg'):
110 c = getattr(self, cls)
def set_progress_msg(self, msg):
    """Install ``msg`` as the current progress message and render it.

    The progress index is reset to zero and ``update_progress_msg()``
    is then called to draw the new message.
    """
    self._progress_idx = 0
    self._progress_msg = msg
    self.update_progress_msg()
121 def update_progress_msg(self):
122 self._progress_idx += 1
123 if self._progress_idx >= len(self.progress_chars):
124 self._progress_idx = 0
126 self._raw_write(self._progress_msg[:self.width - 3] + " " + self.progress_chars[self._progress_idx] + "\r")
128 def clear_progress_msg(self):
130 self._raw_write(' '*(len(self._progress_msg[:self.width - 3]) + 2) + "\r")
def __init__(self, match):
    """Copy every named group of ``match`` onto this object as an attribute.

    ``match`` must provide ``groupdict()``; groups that did not
    participate in the match are stored with whatever value
    ``groupdict()`` reports for them.
    """
    groups = match.groupdict()
    for name in groups:
        setattr(self, name, groups[name])
def print(self, file=sys.stdout):
    """Write the string form of this line to ``file`` (a terminal is expected)."""
    rendered = str(self)
    print(rendered, file=file)
142 def log(self, file=sys.stdout):
143 "Print the line (without terminal escape sequences)"
147 class WvPlainLine(WvLine):
148 re = re.compile("(?P<line>.*)")
153 class WvTestingLine(WvLine):
154 re = re.compile('(?P<prefix>' + re_prefix + ')Testing "(?P<what>.*)" in (?P<where>.*):$')
156 def __init__(self, *args):
158 WvLine.__init__(self, args[0])
164 raise TypeError("WvTestingLine.__init__() takes at most 2 positional arguments")
167 return '{self.prefix}Testing "{self.what}" in {self.where}:'.format(self=self)
169 def print(self, file=sys.stdout):
170 print(term.attr.bold + str(self) + term.attr.reset, file=file)
173 print(str(self), file=file)
def asWvCheckLine(self, result):
    """Build a WvCheckLine with text "<where> <what>" and the given ``result``."""
    summary = '{self.where} {self.what}'.format(self=self)
    return WvCheckLine(summary, result)
178 class WvCheckLine(WvLine):
179 re = re.compile('(?P<prefix>' + re_prefix + ')!\s*(?P<text>.*?)\s+(?P<result>\S+)$')
181 def __init__(self, *args):
183 WvLine.__init__(self, args[0])
186 self.text = args[0].rstrip(' .')
187 self.result = args[1]
189 raise TypeError("WvCheckLine.__init__() takes at most 2 positional arguments")
192 # Result == None when printing progress message
193 return '{self.prefix}! {self.text} {result}'.format(self=self, result=(self.result or ''))
def is_success(self):
    """Tell whether this check passed, i.e. its result is exactly 'ok'."""
    outcome = self.result
    return outcome == 'ok'
198 def formated(self, highlight=True, include_newlines=False, result_space=10):
199 text = '{self.prefix}! {self.text} '.format(self=self)
201 if self.is_success():
202 color = term.fg.lightgreen
204 color = term.fg.lightred
206 result = term.attr.bold + color + self.result + term.attr.reset
212 lines = math.ceil((len(text) + result_space) / width)
213 text = format(text, '.<' + str(lines * width - result_space))
215 for i in reversed(range(width, width*lines, width)):
216 text = text[:i] + '\n' + text[i:]
217 return '{text} {result}'.format(text=text, result=result)
def print(self, file=sys.stdout):
    """Write the output of ``self.formated()`` for this check line to ``file``."""
    rendered = self.formated()
    print(rendered, file=file)
def log(self, file=sys.stdout):
    """Write this check line to ``file`` in plain form.

    The "<prefix>! <text> " part is padded with dots to at least 80
    characters and is followed by a space and the result. Longer text
    is not truncated.
    """
    head = '{self.prefix}! {self.text} '.format(self=self)
    entry = '{text:.<80} {result}'.format(text=head, result=self.result)
    print(entry, file=file)
class WvTagLine(WvLine):
    """Protocol line of the form "<prefix>wvtest: <tag>".

    The named groups ``prefix`` and ``tag`` of the class-level pattern
    are what a match against such a line captures.
    """
    # Raw string literals: '\s' inside a normal string literal is an invalid
    # escape sequence (DeprecationWarning, SyntaxWarning since Python 3.12).
    # The resulting pattern text is unchanged.
    re = re.compile(r'(?P<prefix>' + re_prefix + r')wvtest:\s*(?P<tag>.*)$')
230 class WvTestProcessor(list):
233 # Print one line for each "Testing" section. Passed tests are
234 # printed as "ok", failed tests as "FAILURE".
237 # Print one "ok" line for each passing "Testing" section.
238 # Failed "Testing" sections are printed verbosely.
241 # Print every line of the output, just
242 # reformat/syntax-highlight known lines.
246 verbosity = Verbosity.NORMAL,
247 junit_xml: io.IOBase = None,
248 junit_prefix: str = '',
251 self.checkFailedCount = 0
253 self.testFailedCount = 0
255 self.implicitTestTitle = None
256 self.currentTest = None
258 self.verbosity = verbosity
259 self.show_progress = False
261 self.junit_xml = junit_xml
262 self.junit_prefix = junit_prefix
267 self.junitTestcases = []
268 self.junitTestsuites = []
272 if logdir and not os.path.isdir(logdir):
def setImplicitTestTitle(self, testing):
    """Remember ``testing`` as the fallback title for the test output.

    The stored title is meant to be used when the test output does not
    start with a title line of its own.
    """
    self.implicitTestTitle = testing
280 def print(self, file=sys.stdout):
282 entry.print(file=file)
287 if 'formated' in dir(entry):
295 return "\n".join([str(entry) for entry in self]) + "\n"
297 def _rememberJUnitTestcase(self, check: WvCheckLine):
298 if not self.junit_xml:
302 duration = t - (self.lastCheckTime or self.testStartTime)
303 self.lastCheckTime = t
305 if not check.is_success():
306 failure = wvjunit.Failure(type='WvTest check',
311 self.junitTestcases.append(
313 classname="{}{}.{}".format(
315 self.currentTest.where.replace('.', '_'),
316 self.currentTest.what),
321 def _rememberJUnitTestsuite(self):
322 if not self.junit_xml:
325 system_out = wvjunit.SystemOut(text=self.plainText())
327 ts = wvjunit.Testsuite(tests=self.checkCount,
328 failures=self.checkFailedCount,
330 name="{}{}.{}".format(
332 self.currentTest.where.replace('.', '_'),
333 self.currentTest.what),
334 time=time.time()-self.testStartTime,
335 hostname=socket.getfqdn(),
336 timestamp=datetime.datetime.now(),
337 testcases=self.junitTestcases,
338 system_out=system_out)
339 self.junitTestsuites.append(ts)
340 self.junitTestcases = []
342 def _generateJUnitXML(self):
343 if not self.junit_xml:
345 tss = wvjunit.Testsuites(testsuites=self.junitTestsuites)
346 tss.print(file=self.junit_xml)
348 def _finishCurrentTest(self):
349 self._rememberJUnitTestsuite()
350 if self.checkFailedCount > 0:
351 if self.show_progress and self.verbosity < self.Verbosity.VERBOSE:
352 term.clear_progress_msg()
353 if self.verbosity == self.Verbosity.NORMAL:
355 elif self.verbosity < self.Verbosity.NORMAL:
356 self.currentTest.asWvCheckLine('FAILED').print()
357 self.testFailedCount += 1
359 if self.verbosity <= self.Verbosity.NORMAL:
360 self.currentTest.asWvCheckLine('ok').print()
369 def _newTest(self, testing : WvTestingLine):
371 self._finishCurrentTest()
374 if self.show_progress and self.verbosity < self.Verbosity.VERBOSE:
375 term.set_progress_msg(str(testing.asWvCheckLine(None)))
378 trans = str.maketrans(' /', '__')
379 self.log = open(os.path.join(self.logdir, "%04d-%s-%s.log" %
381 testing.where.translate(trans),
382 testing.what.lower().translate(trans))),
384 self.testStartTime = time.time()
385 self.lastCheckTime = None
386 self.currentTest = testing
388 self.checkFailedCount = 0
390 def _newCheck(self, check: WvCheckLine):
392 if not check.is_success():
393 self.checkFailedCount += 1
394 self._rememberJUnitTestcase(check)
396 def append(self, logEntry: WvLine):
397 if self.implicitTestTitle:
398 if str(logEntry) == '':
400 elif type(logEntry) != WvTestingLine:
401 self._newTest(self.implicitTestTitle)
402 super().append(self.implicitTestTitle)
403 self.implicitTestTitle = None
405 self.implicitTestTitle = None
408 if type(logEntry) == WvTestingLine:
409 self._newTest(logEntry)
410 elif type(logEntry) == WvCheckLine:
411 self._newCheck(logEntry)
413 list.append(self, logEntry)
415 if self.verbosity == self.Verbosity.VERBOSE:
418 if self.show_progress:
419 term.update_progress_msg()
422 logEntry.log(self.log)
424 def processLine(self, line):
428 for lineClass in [ WvCheckLine, WvTestingLine, WvTagLine, WvPlainLine ]:
429 match = lineClass.re.match(line)
431 logEntry = lineClass(match)
434 raise Exception("Non-matched line: {}".format(line))
436 self.append(logEntry)
441 self._generateJUnitXML()
443 print("WvTest: {total} test{plt}, {fail} failure{plf}."
444 .format(total = self.testCount, plt = '' if self.testCount == 1 else 's',
445 fail = self.testFailedCount, plf = '' if self.testFailedCount == 1 else 's'))
def is_success(self):
    """Report whether the failed-test counter is still zero."""
    failed_tests = self.testFailedCount
    return failed_tests == 0
449 def _run(command, processor, timeout=100):
450 processor.show_progress = True
453 def kill_child(sig = None, frame = None):
454 os.killpg(proc.pid, sig)
456 def alarm(sig = None, frame = None):
457 msg = "! {wvtool}: Alarm timed out! No test output for {timeout} seconds. FAILED"
458 processor.processLine(msg.format(wvtool=sys.argv[0], timeout=timeout))
459 kill_child(signal.SIGTERM)
461 signal.signal(signal.SIGINT, kill_child)
462 signal.signal(signal.SIGTERM, kill_child)
463 signal.signal(signal.SIGALRM, alarm)
465 cmd = command if isinstance(command, str) else ' '.join(command)
466 processor.setImplicitTestTitle(WvTestingLine("Preamble of "+cmd, "wvtool"))
468 # Popen does not seem to be able to call setpgrp(). Therefore, we
# use start_new_session, but this also creates a new session and
470 # detaches the process from a terminal. This might be a problem
471 # for programs that need a terminal to run.
472 with sp.Popen(command, stdin=None, stdout=sp.PIPE, stderr=sp.STDOUT,
473 universal_newlines=False, start_new_session=True) as proc:
474 signal.alarm(timeout)
475 stdout = io.TextIOWrapper(proc.stdout, errors='replace')
477 signal.alarm(timeout)
478 processor.processLine(line)
482 if proc.returncode != 0:
483 if proc.returncode > 0:
484 msg = "{wvtool}: Program '{cmd}' returned non-zero exit code {ec}"
486 msg = "{wvtool}: Program '{cmd}' terminated by signal {sig}"
488 text = msg.format(wvtool=sys.argv[0], cmd=cmd,
489 ec=proc.returncode, sig=-proc.returncode)
490 processor.append(WvCheckLine(text, 'FAILED'))
def do_run(args, processor):
    """Run ``args.command`` through ``_run`` with ``args.timeout`` as the timeout.

    ``processor`` is handed to ``_run`` unchanged.
    """
    command, limit = args.command, args.timeout
    _run(command, processor, timeout=limit)
495 def do_runall(args, processor):
496 for cmd in args.commands:
499 def do_format(args, processor):
502 processor.setImplicitTestTitle(WvTestingLine("Preamble", "stdin"))
503 for line in io.TextIOWrapper(sys.stdin.buffer, errors='replace'):
504 processor.processLine(line)
506 for fn in args.infiles:
507 processor.setImplicitTestTitle(WvTestingLine("Preamble", fn))
508 for line in open(fn, errors='replace'):
509 processor.processLine(line)
511 def do_wrap(args, processor):
514 parser = argparse.ArgumentParser(description='Versatile wvtest tool')
517 parser.set_defaults(verbosity=WvTestProcessor.Verbosity.NORMAL)
518 parser.add_argument('-v', '--verbose', dest='verbosity', action='store_const',
519 const=WvTestProcessor.Verbosity.VERBOSE,
520 help='Do not hide output of successful tests')
521 parser.add_argument('-s', '--summary', dest='verbosity', action='store_const',
522 const=WvTestProcessor.Verbosity.SUMMARY,
523 help='''Hide output of all tests. Print just one line for each "Testing"
524 section and report "ok" or "FAILURE" of it.''')
525 parser.add_argument('-w', '--width', type=int,
526 help='Override terminal width or COLUMNS environment wariable.')
527 parser.add_argument('--timeout', type=int, default=100, metavar='SEC',
528 help='Timeout in seconds for any test output (default %(default)s)')
529 parser.add_argument('--junit-xml', type=argparse.FileType('w'), metavar='FILE',
530 help='''Convert output to JUnit compatible XML file''')
531 parser.add_argument('--junit-prefix', metavar='STR',
532 help='''Prefix to prepend to generated class names (useful when a test is
533 run multiple times in different environments)''')
534 parser.add_argument('--logdir', metavar='DIR',
535 help='''Store test logs in the given directory''')
536 parser.add_argument('--color', action='store_true', default=None,
537 help='Force color output')
538 parser.add_argument('--no-color', action='store_false', dest='color',
539 help='Disable color output')
541 parser.add_argument('--version', action='version', version='%(prog)s '+version)
543 subparsers = parser.add_subparsers(help='sub-command help')
545 parser_run = subparsers.add_parser('run', help='Run and supervise a command producing wvtest output')
546 parser_run.add_argument('command', nargs=argparse.REMAINDER, help='Command to run')
547 parser_run.set_defaults(func=do_run)
549 parser_runall = subparsers.add_parser('runall', help='Run multiple scripts/binaries mentioned on command line')
550 parser_runall.set_defaults(func=do_runall)
551 parser_runall.add_argument('commands', nargs='+', help='Scripts/binaries to run')
553 parser_format = subparsers.add_parser('format', help='Reformat/highlight/summarize WvTest protocol output')
554 parser_format.set_defaults(func=do_format)
555 parser_format.add_argument('infiles', nargs='*', help='Files with wvtest output')
557 # parser_wrap = subparsers.add_parser('wrap')
558 # parser_wrap.set_defaults(func=do_wrap)
560 args = parser.parse_args()
561 term = Term(args.width)
562 if args.color is None and not term.output or \
566 if not 'func' in args:
570 processor = WvTestProcessor(
572 junit_xml = args.junit_xml,
573 junit_prefix = args.junit_prefix,
575 args.func(args, processor)
577 sys.exit(0 if processor.is_success() else 1)