3 # Copyright 2014 Michal Sojka <sojkam1@fel.cvut.cz>
6 """Versatile WvTest protocol tool. It replaces wvtestrun script and
7 provides some other useful features. Namely:
8 - Summary mode (--summary)
9 - Test results aligned to the same column
10 - FIXME: No "progress" reporting
11 - TODO: Conversion to HTML
12 - TODO: Variable timeout
13 - TODO: Checking of expected number of tests
17 import subprocess as sp
24 # Regulr expression that matches potential prefixes to wvtest protocol lines
31 underline = '\033[04m'
33 strikethrough = '\033[09m'
34 invisible = '\033[08m'
43 lightgrey = '\033[37m'
46 lightgreen = '\033[92m'
48 lightblue = '\033[94m'
50 lightcyan = '\033[96m'
59 lightgrey = '\033[47m'
61 def __init__(self, use_colors):
62 def clear_colors(obj):
66 if key in ('fg', 'bg'):
67 clear_colors(getattr(obj, key))
77 import fcntl, termios, struct, os
78 cr = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234'))
85 fd = os.open(os.ctermid(), os.O_RDONLY)
92 self.width = int(getattr(os.environ, 'COLUMNS', 80))
94 term = Term(sys.stdout.isatty() and os.environ['TERM'] != 'dumb')
97 def __init__(self, match):
98 for (key, val) in match.groupdict().items():
99 setattr(self, key, val)
105 class WvPlainLine(WvLine):
106 re = re.compile("(?P<line>.*)")
110 class WvTestingLine(WvLine):
111 re = re.compile('(?P<prefix>' + re_prefix + ')Testing "(?P<what>.*)" in (?P<where>.*):$')
112 def __init__(self, *args):
114 WvLine.__init__(self, args[0])
120 raise TypeError("WvTestingLine.__init__() takes at most 2 positional arguments")
122 return '{self.prefix}! Testing "{self.what}" in {self.where}:'.format(self=self)
124 print(term.bold + str(self) + term.reset)
126 def asWvCheckLine(self, result):
127 return WvCheckLine('{self.where} {self.what}'.format(self=self), result)
129 class WvCheckLine(WvLine):
130 re = re.compile('(?P<prefix>' + re_prefix + ')!\s*(?P<text>.*?)\s+(?P<result>\S+)$')
131 def __init__(self, *args):
133 WvLine.__init__(self, args[0])
137 self.result = args[1]
139 raise TypeError("WvCheckLine.__init__() takes at most 2 positional arguments")
142 return '{self.prefix}! {self.text} {self.result}'.format(self=self)
144 def is_success(self):
145 return self.result == 'ok'
148 text = '{self.prefix}! {self.text} '.format(self=self)
149 if self.is_success():
150 color = term.fg.lightgreen
152 color = term.fg.lightred
153 result = term.bold + color + self.result + term.reset
155 lines = math.ceil(len(text) / term.width)
156 if len(text) % term.width > term.width - 10:
159 text = format(text, '.<' + str(lines * term.width - 10))
160 print('{text} {result}'.format(text=text, result=result))
162 class WvTagLine(WvLine):
163 re = re.compile('(?P<prefix>' + re_prefix + ')wvtest:\s*(?P<tag>.*)$')
165 class WvTestLog(list):
168 # Print one line for each "Testing" section. Passed tests are
169 # printed as "ok", failed tests as "FAILURE".
172 # Print one "ok" line for each passing "Testing" section.
173 # Failed "Testing" sections are printed verbosely.
176 # Print every line of the output, just
177 # reformat/syntax-highlight known lines.
180 def __init__(self, verbosity = Verbosity.NORMAL):
182 self.checkFailedCount = 0
184 self.testFailedCount = 0
186 self.implicitTestTitle = None
187 self.currentTest = None
188 self.currentTestFailedCount = 0
190 self.verbosity = verbosity
192 def setImplicitTestTitle (self, testing):
193 """If the test does not supply its own title as a first line of test
194 output, it this title will be used instead."""
195 self.implicitTestTitle = testing
201 def _finishCurrentTest(self):
202 if self.currentTestFailedCount > 0:
203 if self.verbosity >= self.Verbosity.NORMAL:
206 self.currentTest.asWvCheckLine('FAILED').print()
207 self.testFailedCount += 1
209 if self.verbosity <= self.Verbosity.NORMAL:
210 self.currentTest.asWvCheckLine('ok').print()
213 def _newTest(self, testing):
215 self._finishCurrentTest()
218 self.currentTest = testing
219 self.currentTestFailedCount = 0
221 def _newCheck(self, check):
223 if not check.is_success():
224 self.checkFailedCount += 1
225 self.currentTestFailedCount += 1
227 def append(self, logEntry):
228 if self.implicitTestTitle and type(logEntry) != WvTestingLine:
229 self._newTest(self.implicitTestTitle)
230 super().append(self.implicitTestTitle)
231 self.implicitTestTitle = None
233 if type(logEntry) == WvTestingLine:
234 self._newTest(logEntry)
235 elif type(logEntry) == WvCheckLine:
236 self._newCheck(logEntry)
238 list.append(self, logEntry)
240 if self.verbosity == self.Verbosity.VERBOSE:
244 def addLine(self, line):
248 for lineClass in [ WvCheckLine, WvTestingLine, WvTagLine, WvPlainLine ]:
249 match = lineClass.re.match(line)
251 logEntry = lineClass(match)
254 raise Exception("Non-matched line: {}".format(line))
256 self.append(logEntry)
261 print("WvTest: {total} test{plt}, {fail} failure{plf}."
262 .format(total = self.testCount, plt = '' if self.testCount == 1 else 's',
263 fail = self.testFailedCount, plf = '' if self.testFailedCount == 1 else 's'))
264 def is_success(self):
265 return self.testFailedCount == 0
267 def _run(command, log):
270 def kill_child(sig = None, frame = None):
271 os.killpg(proc.pid, sig)
273 def alarm(sig = None, frame = None):
274 msg = "! {wvtool}: Alarm timed out! No test output for {timeout} seconds. FAILED"
275 log.addLine(msg.format(wvtool=sys.argv[0], timeout=timeout))
276 kill_child(signal.SIGTERM)
278 signal.signal(signal.SIGINT, kill_child)
279 signal.signal(signal.SIGTERM, kill_child)
280 signal.signal(signal.SIGALRM, alarm)
282 cmd = command if isinstance(command, str) else ' '.join(command)
283 log.setImplicitTestTitle(WvTestingLine("Executing "+cmd, "wvtool"))
285 # Popen does not seep to be able to call setpgrp(). Therefore, we
286 # use start_new_session, but this also create a new session and
287 # detaches the process from a terminal. This might be a problem
288 # for programs that need a terminal to run.
289 with sp.Popen(command, stdout=sp.PIPE, stderr=sp.STDOUT,
290 universal_newlines=True, start_new_session=True) as proc:
291 signal.alarm(timeout)
292 for line in proc.stdout:
293 signal.alarm(timeout)
298 if proc.returncode != 0:
299 if proc.returncode > 0:
300 msg = "{wvtool}: Program '{cmd}' returned non-zero exit code {ec}"
302 msg = "{wvtool}: Program '{cmd}' terminated by signal {sig}"
304 text = msg.format(wvtool=sys.argv[0], cmd=cmd,
305 ec=proc.returncode, sig=-proc.returncode)
306 log.append(WvCheckLine(text, 'FAILED'))
308 def do_run(args, log):
309 _run(args.command, log)
311 def do_runall(args, log):
312 for cmd in args.commands:
315 def do_format(args, log):
318 log.setImplicitTestTitle(WvTestingLine("Preamble", "stdin"))
319 for line in sys.stdin:
322 for fn in args.infiles:
323 log.setImplicitTestTitle(WvTestingLine("Preamble", fn))
324 for line in open(fn):
327 def do_wrap(args, log):
330 parser = argparse.ArgumentParser(description='Versatile wvtest tool')
332 parser.set_defaults(verbosity=WvTestLog.Verbosity.NORMAL)
333 parser.add_argument('-v', '--verbose', dest='verbosity', action='store_const',
334 const=WvTestLog.Verbosity.VERBOSE,
335 help='Do not hide output of successful tests')
336 parser.add_argument('-s', '--summary', dest='verbosity', action='store_const',
337 const=WvTestLog.Verbosity.SUMMARY,
338 help='''Hide output of all tests. Print just one line for each "Testing"
339 section and report "ok" or "FAILURE" of it.''')
341 subparsers = parser.add_subparsers(help='sub-command help')
343 parser_run = subparsers.add_parser('run', help='Run and supervise a command producing wvtest output')
344 parser_run.add_argument('command', nargs=argparse.REMAINDER, help='Command to run')
345 parser_run.set_defaults(func=do_run)
347 parser_runall = subparsers.add_parser('runall', help='Run multiple scripts/binaries mentioned on command line')
348 parser_runall.set_defaults(func=do_runall)
349 parser_runall.add_argument('commands', nargs='+', help='Scripts/binaries to run')
351 parser_format = subparsers.add_parser('format', help='Reformat/highlight/summarize WvTest protcol output')
352 parser_format.set_defaults(func=do_format)
353 parser_format.add_argument('infiles', nargs='*', help='Files with wvtest output')
355 # parser_wrap = subparsers.add_parser('wrap')
356 # parser_wrap.set_defaults(func=do_wrap)
358 args = parser.parse_args()
360 log = WvTestLog(args.verbosity)
363 sys.exit(0 if log.is_success() else 1)