3 # Copyright 2014 Michal Sojka <sojkam1@fel.cvut.cz>
6 """Versatile WvTest protocol tool. It replaces wvtestrun script and
7 provides some other useful features. Namely:
8 - Summary mode (--summary)
9 - Test results aligned to the same column
10 - TODO: Conversion to HTML
11 - TODO: Variable timeout
12 - TODO: Checking of expected number of tests
14 Newest version can be found at https://github.com/wentasah/wvtest.
17 version = "v0-6-gca4c750"
20 import subprocess as sp
# Regular expression that matches potential prefixes to wvtest protocol lines
35 underline = '\033[04m'
37 strikethrough = '\033[09m'
38 invisible = '\033[08m'
47 lightgrey = '\033[37m'
50 lightgreen = '\033[92m'
52 lightblue = '\033[94m'
54 lightcyan = '\033[96m'
63 lightgrey = '\033[47m'
65 progress_chars = '|/-\\'
68 if os.environ['TERM'] == 'dumb':
72 self.output = open('/dev/tty', 'w')
79 self.width = self._get_width()
82 def _raw_write(self, string):
83 '''Write raw data if output is enabled.'''
84 if self._enabled and self.output:
86 self.output.write(string)
def _get_width(self):
    '''Return the width of the output terminal in columns.

    The width is asked from the terminal behind self.output with the
    TIOCGWINSZ ioctl. When that does not work (no output file, output
    is not a terminal, fcntl/termios not available) or when the
    terminal reports a width of 0, the COLUMNS environment variable is
    used instead, and 80 when COLUMNS is unset or not a number.
    '''
    # NOTE(review): the "def", "try" and "except" lines of this method
    # were not visible in the reviewed excerpt; they are restored from
    # the call site (self.width = self._get_width()) and from the
    # fallback return statement. Confirm against the full file.
    try:
        import fcntl, termios, struct
        request = struct.pack('HHHH', 0, 0, 0, 0)
        reply = fcntl.ioctl(self.output.fileno(), termios.TIOCGWINSZ, request)
        # The second of the four unsigned shorts is taken as the column
        # count, as the original code did.
        columns = struct.unpack('HHHH', reply)[1]
        if columns > 0:
            # A zero width would later cause a division by zero when
            # check lines are aligned to the terminal width.
            return columns
    except Exception:
        # Best effort only: any failure falls through to COLUMNS.
        pass

    # Imported separately from fcntl/termios so that the fallback works
    # even when the imports above fail.
    import os
    try:
        # os.environ is a mapping; getattr() on it never finds COLUMNS.
        return int(os.environ.get('COLUMNS', 80))
    except ValueError:
        return 80
100 def _clear_colors(self):
'''Sets all color and attribute members to empty strings'''
102 for cls in ('attr', 'fg', 'bg'):
103 c = getattr(self, cls)
def set_progress_msg(self, msg):
    '''Remember msg as the current progress message and draw it.

    The spinner index is reset to 0 first; the drawing itself is done
    by update_progress_msg().
    '''
    self._progress_msg, self._progress_idx = msg, 0
    self.update_progress_msg()
114 def update_progress_msg(self):
115 self._progress_idx += 1
116 if self._progress_idx >= len(self.progress_chars):
117 self._progress_idx = 0
119 self._raw_write(self._progress_msg[:self.width - 3] + " " + self.progress_chars[self._progress_idx] + "\r")
121 def clear_progress_msg(self):
123 self._raw_write(' '*(len(self._progress_msg[:self.width - 3]) + 2) + "\r")
def __init__(self, match):
    """Copy the named groups of a regex match onto the instance.

    match -- a match object; every entry of match.groupdict() becomes
             an attribute of self named after the group.
    """
    named_groups = match.groupdict()
    for group_name in named_groups:
        setattr(self, group_name, named_groups[group_name])
137 class WvPlainLine(WvLine):
138 re = re.compile("(?P<line>.*)")
142 class WvTestingLine(WvLine):
143 re = re.compile('(?P<prefix>' + re_prefix + ')Testing "(?P<what>.*)" in (?P<where>.*):$')
144 def __init__(self, *args):
146 WvLine.__init__(self, args[0])
152 raise TypeError("WvTestingLine.__init__() takes at most 2 positional arguments")
154 return '{self.prefix}! Testing "{self.what}" in {self.where}:'.format(self=self)
156 print(term.attr.bold + str(self) + term.attr.reset)
def asWvCheckLine(self, result):
    """Return a WvCheckLine that stands for this whole "Testing" section.

    The text of the check is "<where> <what>" of this line and its
    result is the given result.
    """
    template = '{self.where} {self.what}'
    summary_text = template.format(self=self)
    return WvCheckLine(summary_text, result)
161 class WvCheckLine(WvLine):
162 re = re.compile('(?P<prefix>' + re_prefix + ')!\s*(?P<text>.*?)\s+(?P<result>\S+)$')
163 def __init__(self, *args):
165 WvLine.__init__(self, args[0])
169 self.result = args[1]
171 raise TypeError("WvCheckLine.__init__() takes at most 2 positional arguments")
174 # Result == None when printing progress message
175 return '{self.prefix}! {self.text} {result}'.format(self=self, result=(self.result or ''))
def is_success(self):
    """Return True only when the result of this check is exactly 'ok'."""
    outcome = self.result
    return outcome == 'ok'
181 text = '{self.prefix}! {self.text} '.format(self=self)
182 if self.is_success():
183 color = term.fg.lightgreen
185 color = term.fg.lightred
186 result = term.attr.bold + color + self.result + term.attr.reset
188 lines = math.ceil(len(text) / term.width)
189 if len(text) % term.width > term.width - 10:
192 text = format(text, '.<' + str(lines * term.width - 10))
193 print('{text} {result}'.format(text=text, result=result))
class WvTagLine(WvLine):
    """A "wvtest: <tag>" line of the WvTest protocol."""
    # Matches an optional prefix followed by "wvtest:"; the rest of the
    # line (leading whitespace skipped) is captured as the "tag" group.
    # The part containing "\s" is a raw string so that the backslash
    # reaches the regex engine as written instead of being an invalid
    # string escape, which newer Python versions warn about.
    re = re.compile('(?P<prefix>' + re_prefix + r')wvtest:\s*(?P<tag>.*)$')
198 class WvTestLog(list):
201 # Print one line for each "Testing" section. Passed tests are
202 # printed as "ok", failed tests as "FAILURE".
205 # Print one "ok" line for each passing "Testing" section.
206 # Failed "Testing" sections are printed verbosely.
209 # Print every line of the output, just
210 # reformat/syntax-highlight known lines.
213 def __init__(self, verbosity = Verbosity.NORMAL):
215 self.checkFailedCount = 0
217 self.testFailedCount = 0
219 self.implicitTestTitle = None
220 self.currentTest = None
221 self.currentTestFailedCount = 0
223 self.verbosity = verbosity
224 self.show_progress = False
def setImplicitTestTitle(self, testing):
    """Set the title to use when the test output has no title of its own.

    If the test does not supply its own title as the first line of its
    output, this title will be used instead. The value is only stored
    in self.implicitTestTitle here.
    """
    self.implicitTestTitle = testing
235 def _finishCurrentTest(self):
236 if self.currentTestFailedCount > 0:
237 if self.verbosity >= self.Verbosity.NORMAL:
240 self.currentTest.asWvCheckLine('FAILED').print()
241 self.testFailedCount += 1
243 if self.verbosity <= self.Verbosity.NORMAL:
244 self.currentTest.asWvCheckLine('ok').print()
251 def _newTest(self, testing : WvTestingLine):
253 self._finishCurrentTest()
256 if self.show_progress and self.verbosity < self.Verbosity.VERBOSE:
257 term.set_progress_msg(str(testing.asWvCheckLine(None)))
258 self.currentTest = testing
259 self.currentTestFailedCount = 0
261 def _newCheck(self, check : WvCheckLine):
263 if not check.is_success():
264 self.checkFailedCount += 1
265 self.currentTestFailedCount += 1
267 def append(self, logEntry : WvLine):
268 if self.implicitTestTitle:
269 if str(logEntry) == '':
271 elif type(logEntry) != WvTestingLine:
272 self._newTest(self.implicitTestTitle)
273 super().append(self.implicitTestTitle)
274 self.implicitTestTitle = None
276 self.implicitTestTitle = None
279 if type(logEntry) == WvTestingLine:
280 self._newTest(logEntry)
281 elif type(logEntry) == WvCheckLine:
282 self._newCheck(logEntry)
284 list.append(self, logEntry)
286 if self.verbosity == self.Verbosity.VERBOSE:
287 if self.show_progress:
288 term.clear_progress_msg()
292 if self.show_progress:
293 term.update_progress_msg()
295 def addLine(self, line):
299 for lineClass in [ WvCheckLine, WvTestingLine, WvTagLine, WvPlainLine ]:
300 match = lineClass.re.match(line)
302 logEntry = lineClass(match)
305 raise Exception("Non-matched line: {}".format(line))
307 self.append(logEntry)
312 print("WvTest: {total} test{plt}, {fail} failure{plf}."
313 .format(total = self.testCount, plt = '' if self.testCount == 1 else 's',
314 fail = self.testFailedCount, plf = '' if self.testFailedCount == 1 else 's'))
def is_success(self):
    """Return True when the count of failed tests is zero."""
    failed_tests = self.testFailedCount
    return failed_tests == 0
318 def _run(command, log):
319 log.show_progress = True
322 def kill_child(sig = None, frame = None):
323 os.killpg(proc.pid, sig)
325 def alarm(sig = None, frame = None):
326 msg = "! {wvtool}: Alarm timed out! No test output for {timeout} seconds. FAILED"
327 log.addLine(msg.format(wvtool=sys.argv[0], timeout=timeout))
328 kill_child(signal.SIGTERM)
330 signal.signal(signal.SIGINT, kill_child)
331 signal.signal(signal.SIGTERM, kill_child)
332 signal.signal(signal.SIGALRM, alarm)
334 cmd = command if isinstance(command, str) else ' '.join(command)
335 log.setImplicitTestTitle(WvTestingLine("Executing "+cmd, "wvtool"))
# Popen does not seem to be able to call setpgrp(). Therefore, we
# use start_new_session, but this also creates a new session and
# detaches the process from a terminal. This might be a problem
# for programs that need a terminal to run.
341 with sp.Popen(command, stdout=sp.PIPE, stderr=sp.STDOUT,
342 universal_newlines=True, start_new_session=True) as proc:
343 signal.alarm(timeout)
344 for line in proc.stdout:
345 signal.alarm(timeout)
350 if proc.returncode != 0:
351 if proc.returncode > 0:
352 msg = "{wvtool}: Program '{cmd}' returned non-zero exit code {ec}"
354 msg = "{wvtool}: Program '{cmd}' terminated by signal {sig}"
356 text = msg.format(wvtool=sys.argv[0], cmd=cmd,
357 ec=proc.returncode, sig=-proc.returncode)
358 log.append(WvCheckLine(text, 'FAILED'))
def do_run(args, log):
    """Handler of the "run" sub-command.

    Passes args.command and the log on to _run().
    """
    command = args.command
    _run(command, log)
363 def do_runall(args, log):
364 for cmd in args.commands:
367 def do_format(args, log):
370 log.setImplicitTestTitle(WvTestingLine("Preamble", "stdin"))
371 for line in sys.stdin:
374 for fn in args.infiles:
375 log.setImplicitTestTitle(WvTestingLine("Preamble", fn))
376 for line in open(fn):
379 def do_wrap(args, log):
382 parser = argparse.ArgumentParser(description='Versatile wvtest tool')
384 parser.set_defaults(verbosity=WvTestLog.Verbosity.NORMAL)
385 parser.add_argument('-v', '--verbose', dest='verbosity', action='store_const',
386 const=WvTestLog.Verbosity.VERBOSE,
387 help='Do not hide output of successful tests')
388 parser.add_argument('-s', '--summary', dest='verbosity', action='store_const',
389 const=WvTestLog.Verbosity.SUMMARY,
390 help='''Hide output of all tests. Print just one line for each "Testing"
391 section and report "ok" or "FAILURE" of it.''')
392 parser.add_argument('--version', action='version', version='%(prog)s '+version)
394 subparsers = parser.add_subparsers(help='sub-command help')
396 parser_run = subparsers.add_parser('run', help='Run and supervise a command producing wvtest output')
397 parser_run.add_argument('command', nargs=argparse.REMAINDER, help='Command to run')
398 parser_run.set_defaults(func=do_run)
400 parser_runall = subparsers.add_parser('runall', help='Run multiple scripts/binaries mentioned on command line')
401 parser_runall.set_defaults(func=do_runall)
402 parser_runall.add_argument('commands', nargs='+', help='Scripts/binaries to run')
404 parser_format = subparsers.add_parser('format', help='Reformat/highlight/summarize WvTest protocol output')
405 parser_format.set_defaults(func=do_format)
406 parser_format.add_argument('infiles', nargs='*', help='Files with wvtest output')
408 # parser_wrap = subparsers.add_parser('wrap')
409 # parser_wrap.set_defaults(func=do_wrap)
411 args = parser.parse_args()
413 if not 'func' in args:
417 log = WvTestLog(args.verbosity)
420 sys.exit(0 if log.is_success() else 1)
423 # compile-command: "make wvtool"