+#!/usr/bin/env python3
+
+# Copyright 2014 Michal Sojka <sojkam1@fel.cvut.cz>
+# License: GPLv2+
+
+"""Versatile WvTest protocol tool. It replaces wvtestrun script and
+provides some other useful features. Namely:
+- Summary mode (--summary)
+- Test results aligned to the same column
+- FIXME: No "progress" reporting
+- TODO: Conversion to HTML
+- TODO: Variable timeout
+- TODO: Checking of expected number of tests
+"""
+
+import argparse
+import subprocess as sp
+import re
+import sys
+import os
+import signal
+import math
+
+# Regular expression that matches potential prefixes to wvtest protocol lines.
+# It is spliced into the 'prefix' group of the line-matching patterns below;
+# the empty default means that no prefix text is matched.
+re_prefix = ''
+
class Term:
    """ANSI escape sequences for text styling plus the terminal width.

    The class attributes hold the escape sequences. Creating an instance
    with ``use_colors=False`` overrides all of them (including the ``fg``
    and ``bg`` palettes) with empty strings on that instance only, so that
    styled output degrades to plain text.

    Instance attributes:
        width: Number of columns used for aligning results (always > 0).
    """

    reset = '\033[0m'
    bold = '\033[01m'
    disable = '\033[02m'
    underline = '\033[04m'
    reverse = '\033[07m'
    strikethrough = '\033[09m'
    invisible = '\033[08m'

    class fg:
        black = '\033[30m'
        red = '\033[31m'
        green = '\033[32m'
        orange = '\033[33m'
        blue = '\033[34m'
        purple = '\033[35m'
        cyan = '\033[36m'
        lightgrey = '\033[37m'
        darkgrey = '\033[90m'
        lightred = '\033[91m'
        lightgreen = '\033[92m'
        yellow = '\033[93m'
        lightblue = '\033[94m'
        pink = '\033[95m'
        lightcyan = '\033[96m'

    class bg:
        black = '\033[40m'
        red = '\033[41m'
        green = '\033[42m'
        orange = '\033[43m'
        blue = '\033[44m'
        purple = '\033[45m'
        cyan = '\033[46m'
        lightgrey = '\033[47m'

    def __init__(self, use_colors):
        """Set up styling and detect the output width.

        Args:
            use_colors: When true, keep the escape sequences and query the
                terminal for its width. When false, blank every sequence
                and take the width from the COLUMNS environment variable
                (80 if it is unset or unusable).
        """
        if use_colors:
            self.width = self._detect_width()
        else:
            self._clear_colors()
            self.width = self._env_width()

    def _clear_colors(self):
        """Shadow every escape sequence with '' on this instance."""
        cls = type(self)
        for key in dir(cls):
            if key.startswith('_'):
                continue
            value = getattr(cls, key)
            if key in ('fg', 'bg'):
                # Build a blank copy instead of mutating the shared palette
                # class, so other Term instances keep their colors.
                blank = {name: '' for name in dir(value)
                         if not name.startswith('_')}
                setattr(self, key, type(key, (), blank))
            elif isinstance(value, str):
                setattr(self, key, '')

    @staticmethod
    def _env_width(default=80):
        """Return the width from $COLUMNS, or *default* if unusable."""
        try:
            width = int(os.environ.get('COLUMNS', default))
        except ValueError:
            return default
        return width if width > 0 else default

    @staticmethod
    def _detect_width():
        """Return the terminal width, falling back to $COLUMNS or 80.

        Standard output (fd 1) is queried first, then the controlling
        terminal.
        """
        columns = 0
        try:
            columns = os.get_terminal_size(1).columns
        except OSError:
            # stdout is not a terminal; try the controlling terminal.
            try:
                fd = os.open(os.ctermid(), os.O_RDONLY)
            except (OSError, AttributeError):
                fd = None
            if fd is not None:
                try:
                    columns = os.get_terminal_size(fd).columns
                except OSError:
                    pass
                finally:
                    os.close(fd)
        # A reported width of 0 would break the alignment arithmetic.
        return columns if columns > 0 else Term._env_width()
+
# Colors are used only when stdout is a terminal that is not "dumb". A missing
# TERM variable is treated like a dumb terminal instead of raising KeyError.
term = Term(sys.stdout.isatty() and os.environ.get('TERM', 'dumb') != 'dumb')
+
class WvLine:
    """Base class of all parsed output lines.

    Every named group of the regular expression match passed to the
    constructor becomes an attribute of the instance.
    """

    def __init__(self, match):
        fields = match.groupdict()
        for name in fields:
            setattr(self, name, fields[name])

    def print(self):
        """Write the textual form of the line to standard output."""
        rendered = str(self)
        print(rendered)
+
+
class WvPlainLine(WvLine):
    """Catch-all line: any text not recognized by a more specific class."""

    # Matches everything; the whole text is captured in the 'line' group.
    re = re.compile("(?P<line>.*)")

    def __str__(self):
        content = self.line
        return content
+
class WvTestingLine(WvLine):
    """Line that opens a test section: ``Testing "<what>" in <where>:``.

    Construct it either from a regular expression match (one argument) or
    directly from the ``what`` and ``where`` strings (two arguments).
    """

    re = re.compile('(?P<prefix>' + re_prefix + ')Testing "(?P<what>.*)" in (?P<where>.*):$')

    def __init__(self, *args):
        argc = len(args)
        if argc == 1:
            # Single argument: a match object providing prefix/what/where.
            WvLine.__init__(self, args[0])
            return
        if argc != 2:
            raise TypeError("WvTestingLine.__init__() takes at most 2 positional arguments")
        self.prefix = ''
        self.what, self.where = args

    def __str__(self):
        template = '{self.prefix}! Testing "{self.what}" in {self.where}:'
        return template.format(self=self)

    def print(self):
        """Print the section title in bold."""
        print(''.join((term.bold, str(self), term.reset)))

    def asWvCheckLine(self, result):
        """Return a check line summarizing this whole section with *result*."""
        summary = '{self.where} {self.what}'.format(self=self)
        return WvCheckLine(summary, result)
+
class WvCheckLine(WvLine):
    """Result of a single check: ``! <text> <result>``.

    Construct it either from a regular expression match (one argument) or
    directly from the ``text`` and ``result`` strings (two arguments).
    A check passed only if its result is exactly ``ok``.
    """

    # Raw string: '\s' and '\S' are invalid escapes in a normal literal.
    re = re.compile('(?P<prefix>' + re_prefix + r')!\s*(?P<text>.*?)\s+(?P<result>\S+)$')

    def __init__(self, *args):
        if len(args) == 1:
            WvLine.__init__(self, args[0])
        elif len(args) == 2:
            self.prefix = ''
            self.text = args[0]
            self.result = args[1]
        else:
            raise TypeError("WvCheckLine.__init__() takes at most 2 positional arguments")

    def __str__(self):
        return '{self.prefix}! {self.text} {self.result}'.format(self=self)

    def is_success(self):
        """Return True if the check passed (its result is 'ok')."""
        return self.result == 'ok'

    def print(self):
        """Print the check with the result colored and aligned to a column.

        The text is padded with dots so that the result ends up in the
        last 10 columns of a terminal line. Text that would overlap that
        area is padded to the end of the following line instead.
        """
        text = '{self.prefix}! {self.text} '.format(self=self)
        if self.is_success():
            color = term.fg.lightgreen
        else:
            color = term.fg.lightred
        result = term.bold + color + self.result + term.reset

        # Guard against a zero width, which would divide by zero below.
        width = term.width if term.width > 0 else 80
        lines = math.ceil(len(text) / width)
        if len(text) % width > width - 10:
            lines += 1

        # On very narrow terminals the pad width could become negative,
        # which is not a valid format specification.
        pad_to = max(lines * width - 10, 0)
        text = format(text, '.<' + str(pad_to))
        print('{text} {result}'.format(text=text, result=result))
+
class WvTagLine(WvLine):
    """Tag line of the form ``wvtest: <tag>``."""

    # Raw string: '\s' is an invalid escape in a normal literal.
    re = re.compile('(?P<prefix>' + re_prefix + r')wvtest:\s*(?P<tag>.*)$')

    def __str__(self):
        # Without this, print() would show the default object repr
        # instead of the line content.
        return '{self.prefix}wvtest: {self.tag}'.format(self=self)
+
class WvTestLog(list):
    """Collects parsed lines of the current test and prints them as needed.

    The list content is the buffered output of the test section that is
    currently in progress. It is printed or summarized, depending on the
    verbosity, when the section ends, and then cleared.
    """

    class Verbosity:
        # Print one line for each "Testing" section. Passed tests are
        # printed as "ok", failed tests as "FAILED".
        SUMMARY = 1

        # Print one "ok" line for each passing "Testing" section.
        # Failed "Testing" sections are printed verbosely.
        NORMAL = 2

        # Print every line of the output, just
        # reformat/syntax-highlight known lines.
        VERBOSE = 3

    def __init__(self, verbosity = Verbosity.NORMAL):
        self.checkCount = 0
        self.checkFailedCount = 0
        self.testCount = 0
        self.testFailedCount = 0

        self.implicitTestTitle = None
        self.currentTest = None
        self.currentTestFailedCount = 0

        self.verbosity = verbosity

    def setImplicitTestTitle (self, testing):
        """If the test does not supply its own title as the first line of
        its output, this title will be used instead."""
        self.implicitTestTitle = testing

    def print(self):
        """Print all buffered lines."""
        for entry in self:
            entry.print()

    def _finishCurrentTest(self):
        """Report the finished test section and drop its buffered lines."""
        if self.currentTestFailedCount > 0:
            if self.verbosity >= self.Verbosity.NORMAL:
                self.print()
            else:
                self.currentTest.asWvCheckLine('FAILED').print()
            self.testFailedCount += 1
        else:
            if self.verbosity <= self.Verbosity.NORMAL:
                self.currentTest.asWvCheckLine('ok').print()
        self.clear()

    def _newTest(self, testing):
        """Finish the current test, if any, and make *testing* current.

        Passing None only finishes the current test.
        """
        if self.currentTest:
            self._finishCurrentTest()
        if testing is not None:
            self.testCount += 1
        self.currentTest = testing
        self.currentTestFailedCount = 0

    def _newCheck(self, check):
        """Account for one check result in the counters."""
        self.checkCount += 1
        if not check.is_success():
            self.checkFailedCount += 1
            self.currentTestFailedCount += 1

    def append(self, logEntry):
        """Add a parsed line to the log and update the test bookkeeping."""
        if self.implicitTestTitle:
            if isinstance(logEntry, WvTestingLine):
                # The output supplies its own title, so the implicit one
                # must be dropped. Otherwise it would start a bogus extra
                # test at the next non-title line.
                self.implicitTestTitle = None
            else:
                title = self.implicitTestTitle
                self.implicitTestTitle = None
                self._newTest(title)
                super().append(title)

        if isinstance(logEntry, WvTestingLine):
            self._newTest(logEntry)
        elif isinstance(logEntry, WvCheckLine):
            self._newCheck(logEntry)

        list.append(self, logEntry)

        if self.verbosity == self.Verbosity.VERBOSE:
            # Nothing is hidden in verbose mode, so print immediately.
            self.print()
            self.clear()

    def addLine(self, line):
        """Parse one line of raw text and append the resulting entry.

        Raises:
            Exception: If no line class matches the text.
        """
        line = line.rstrip()
        logEntry = None

        # The order matters: the most specific patterns come first and
        # WvPlainLine is the fallback.
        for lineClass in [ WvCheckLine, WvTestingLine, WvTagLine, WvPlainLine ]:
            match = lineClass.re.match(line)
            if match:
                logEntry = lineClass(match)
                break
        if not logEntry:
            raise Exception("Non-matched line: {}".format(line))

        self.append(logEntry)

    def done(self):
        """Finish the last test and print the overall summary."""
        self._newTest(None)

        print("WvTest: {total} test{plt}, {fail} failure{plf}."
              .format(total = self.testCount, plt = '' if self.testCount == 1 else 's',
                      fail = self.testFailedCount, plf = '' if self.testFailedCount == 1 else 's'))

    def is_success(self):
        """Return True if no test section has failed."""
        return self.testFailedCount == 0
+
def _run(command, log, timeout=100):
    """Run *command*, feed its output to *log* and record abnormal exits.

    The command's stdout and stderr are read line by line. If no output
    arrives for *timeout* seconds, a failed check is logged and the
    command's process group receives SIGTERM. SIGINT and SIGTERM delivered
    to this process are forwarded to the command's process group. A
    non-zero exit status or termination by a signal is logged as a failed
    check.

    Args:
        command: Program to run; a string or a sequence of arguments, as
            accepted by subprocess.Popen without a shell.
        log: Log object providing setImplicitTestTitle(), addLine() and
            append().
        timeout: Maximum number of seconds without any output.
    """
    # Stays None until the child exists, so a signal arriving earlier finds
    # nothing to kill rather than an unbound variable.
    proc = None

    def kill_child(sig = signal.SIGTERM, frame = None):
        if proc is None:
            return
        try:
            os.killpg(proc.pid, sig)
        except ProcessLookupError:
            # The process group is already gone.
            pass

    def alarm(sig = None, frame = None):
        msg = "! {wvtool}: Alarm timed out! No test output for {timeout} seconds. FAILED"
        log.addLine(msg.format(wvtool=sys.argv[0], timeout=timeout))
        kill_child(signal.SIGTERM)

    handlers = {
        signal.SIGINT: kill_child,
        signal.SIGTERM: kill_child,
        signal.SIGALRM: alarm,
    }
    previous = {sig: signal.signal(sig, handler)
                for sig, handler in handlers.items()}

    try:
        cmd = command if isinstance(command, str) else ' '.join(command)
        log.setImplicitTestTitle(WvTestingLine("Executing "+cmd, "wvtool"))

        # Popen does not seem to be able to call setpgrp(). Therefore, we
        # use start_new_session, but this also creates a new session and
        # detaches the process from a terminal. This might be a problem
        # for programs that need a terminal to run.
        with sp.Popen(command, stdout=sp.PIPE, stderr=sp.STDOUT,
                      universal_newlines=True, start_new_session=True) as proc:
            signal.alarm(timeout)
            for line in proc.stdout:
                # Every line of output restarts the inactivity timer.
                signal.alarm(timeout)
                log.addLine(line)
    finally:
        signal.alarm(0)
        # Do not leave handlers behind that refer to a finished child.
        for sig, handler in previous.items():
            # None means the old handler was not installed from Python
            # and cannot be passed back to signal.signal().
            if handler is not None:
                signal.signal(sig, handler)

    if proc.returncode != 0:
        if proc.returncode > 0:
            msg = "{wvtool}: Program '{cmd}' returned non-zero exit code {ec}"
        else:
            # Popen reports termination by signal N as return code -N.
            msg = "{wvtool}: Program '{cmd}' terminated by signal {sig}"

        text = msg.format(wvtool=sys.argv[0], cmd=cmd,
                          ec=proc.returncode, sig=-proc.returncode)
        log.append(WvCheckLine(text, 'FAILED'))
+
def do_run(args, log):
    """Handler of the 'run' sub-command: run the single given command."""
    command = args.command
    _run(command, log)
+
def do_runall(args, log):
    """Handler of the 'runall' sub-command: run the given commands in order."""
    commands = args.commands
    for command in commands:
        _run(command, log)
+
def do_format(args, log):
    """Handler of the 'format' sub-command: reformat existing test output.

    Reads the files listed in ``args.infiles``, or standard input if there
    are none, and feeds every line to *log*. Lines preceding the first
    test title are reported under an implicit "Preamble" title.
    """
    files = args.infiles
    if not files:
        log.setImplicitTestTitle(WvTestingLine("Preamble", "stdin"))
        for line in sys.stdin:
            log.addLine(line)
    else:
        for fn in files:
            log.setImplicitTestTitle(WvTestingLine("Preamble", fn))
            # The context manager closes each file as soon as it is read.
            with open(fn) as infile:
                for line in infile:
                    log.addLine(line)
+
def do_wrap(args, log):
    """Placeholder handler of a 'wrap' sub-command; it does nothing."""
    return None
+
# Command line interface: global verbosity options plus one sub-command
# (run, runall or format), each bound to its handler through 'func'.
parser = argparse.ArgumentParser(description='Versatile wvtest tool')

parser.set_defaults(verbosity=WvTestLog.Verbosity.NORMAL)
parser.add_argument('-v', '--verbose', dest='verbosity', action='store_const',
                    const=WvTestLog.Verbosity.VERBOSE,
                    help='Do not hide output of successful tests')
parser.add_argument('-s', '--summary', dest='verbosity', action='store_const',
                    const=WvTestLog.Verbosity.SUMMARY,
                    help='''Hide output of all tests. Print just one line for each "Testing"
                    section and report "ok" or "FAILED" of it.''')

subparsers = parser.add_subparsers(help='sub-command help')

parser_run = subparsers.add_parser('run', help='Run and supervise a command producing wvtest output')
parser_run.add_argument('command', nargs=argparse.REMAINDER, help='Command to run')
parser_run.set_defaults(func=do_run)

parser_runall = subparsers.add_parser('runall', help='Run multiple scripts/binaries mentioned on command line')
parser_runall.set_defaults(func=do_runall)
parser_runall.add_argument('commands', nargs='+', help='Scripts/binaries to run')

parser_format = subparsers.add_parser('format', help='Reformat/highlight/summarize WvTest protocol output')
parser_format.set_defaults(func=do_format)
parser_format.add_argument('infiles', nargs='*', help='Files with wvtest output')

# parser_wrap = subparsers.add_parser('wrap')
# parser_wrap.set_defaults(func=do_wrap)

args = parser.parse_args()

# Sub-commands are optional in Python 3 argparse. Without one, no handler
# is set, so report a usage error instead of failing with AttributeError.
if not hasattr(args, 'func'):
    parser.error('a sub-command is required (run, runall or format)')

log = WvTestLog(args.verbosity)
args.func(args, log)
log.done()
# The exit status tells whether all tests passed.
sys.exit(0 if log.is_success() else 1)