3 # Copyright 2014 Michal Sojka <sojkam1@fel.cvut.cz>
6 """Versatile WvTest protocol tool. It replaces wvtestrun script and
7 provides some other useful features. Namely:
8 - Summary mode (--summary)
9 - Test results aligned to the same column
10 - TODO: Conversion to HTML
11 - TODO: Variable timeout
12 - TODO: Checking of expected number of tests
14 Newest version can be found at https://github.com/wentasah/wvtest.
17 version = "v0-7-g409306f"
20 import subprocess as sp
27 # Regular expression that matches potential prefixes to wvtest protocol lines
35 underline = '\033[04m'
37 strikethrough = '\033[09m'
38 invisible = '\033[08m'
47 lightgrey = '\033[37m'
50 lightgreen = '\033[92m'
52 lightblue = '\033[94m'
54 lightcyan = '\033[96m'
63 lightgrey = '\033[47m'
65 progress_chars = '|/-\\'
68 if os.environ['TERM'] == 'dumb':
72 self.output = open('/dev/tty', 'w')
79 self.width = self._get_width()
81 self._progress_msg = ''
82 self._progress_idx = 0
84 def _raw_write(self, string):
85 '''Write raw data if output is enabled.'''
86 if self._enabled and self.output:
88 self.output.write(string)
95 import fcntl, termios, struct, os
96 s = struct.pack('HHHH', 0, 0, 0, 0)
97 x = fcntl.ioctl(self.output.fileno(), termios.TIOCGWINSZ, s)
98 return struct.unpack('HHHH', x)[1]
100 return int(getattr(os.environ, 'COLUMNS', 80))
102 def _clear_colors(self):
103 '''Sets all color and attribute members to empty strings'''
104 for cls in ('attr', 'fg', 'bg'):
105 c = getattr(self, cls)
def set_progress_msg(self, msg):
    '''Install msg as the current progress message and draw it.

    The progress-character index is reset to 0 before the message
    is drawn by update_progress_msg().
    '''
    self._progress_idx = 0
    self._progress_msg = msg
    self.update_progress_msg()
116 def update_progress_msg(self):
117 self._progress_idx += 1
118 if self._progress_idx >= len(self.progress_chars):
119 self._progress_idx = 0
121 self._raw_write(self._progress_msg[:self.width - 3] + " " + self.progress_chars[self._progress_idx] + "\r")
123 def clear_progress_msg(self):
125 self._raw_write(' '*(len(self._progress_msg[:self.width - 3]) + 2) + "\r")
def __init__(self, match):
    '''Copy every named group of the regex match onto this object.

    Each group name becomes an attribute holding the matched text
    (or None when the group did not participate in the match).
    '''
    groups = match.groupdict()
    for name in groups:
        setattr(self, name, groups[name])
139 class WvPlainLine(WvLine):
140 re = re.compile("(?P<line>.*)")
144 class WvTestingLine(WvLine):
145 re = re.compile('(?P<prefix>' + re_prefix + ')Testing "(?P<what>.*)" in (?P<where>.*):$')
146 def __init__(self, *args):
148 WvLine.__init__(self, args[0])
154 raise TypeError("WvTestingLine.__init__() takes at most 2 positional arguments")
156 return '{self.prefix}! Testing "{self.what}" in {self.where}:'.format(self=self)
158 print(term.attr.bold + str(self) + term.attr.reset)
def asWvCheckLine(self, result):
    """Build a WvCheckLine for this section carrying the given result.

    The text of the check is this line's 'where' followed by its 'what'.
    """
    text = '{self.where} {self.what}'.format(self=self)
    return WvCheckLine(text, result)
163 class WvCheckLine(WvLine):
164 re = re.compile('(?P<prefix>' + re_prefix + ')!\s*(?P<text>.*?)\s+(?P<result>\S+)$')
165 def __init__(self, *args):
167 WvLine.__init__(self, args[0])
171 self.result = args[1]
173 raise TypeError("WvCheckLine.__init__() takes at most 2 positional arguments")
176 # Result == None when printing progress message
177 return '{self.prefix}! {self.text} {result}'.format(self=self, result=(self.result or ''))
def is_success(self):
    """Return True only when the result of this check is exactly 'ok'."""
    outcome = self.result
    return outcome == 'ok'
183 text = '{self.prefix}! {self.text} '.format(self=self)
184 if self.is_success():
185 color = term.fg.lightgreen
187 color = term.fg.lightred
188 result = term.attr.bold + color + self.result + term.attr.reset
190 lines = math.ceil(len(text) / term.width)
191 if len(text) % term.width > term.width - 10:
194 text = format(text, '.<' + str(lines * term.width - 10))
195 print('{text} {result}'.format(text=text, result=result))
class WvTagLine(WvLine):
    """Line of the form "<prefix>wvtest: <tag>".

    The text after "wvtest:" (leading whitespace skipped) is captured
    in the 'tag' group; the optional prefix in the 'prefix' group.
    """
    # The part containing \s is a raw string so the backslash reaches the
    # regex engine as written instead of being an invalid string escape.
    re = re.compile('(?P<prefix>' + re_prefix + r')wvtest:\s*(?P<tag>.*)$')
200 class WvTestLog(list):
203 # Print one line for each "Testing" section. Passed tests are
204 # printed as "ok", failed tests as "FAILURE".
207 # Print one "ok" line for each passing "Testing" section.
208 # Failed "Testing" sections are printed verbosely.
211 # Print every line of the output, just
212 # reformat/syntax-highlight known lines.
215 def __init__(self, verbosity = Verbosity.NORMAL):
217 self.checkFailedCount = 0
219 self.testFailedCount = 0
221 self.implicitTestTitle = None
222 self.currentTest = None
223 self.currentTestFailedCount = 0
225 self.verbosity = verbosity
226 self.show_progress = False
def setImplicitTestTitle(self, testing):
    """Remember a fallback title for the upcoming test output.

    If the test does not supply its own title as the first line of
    its output, this title will be used instead.
    """
    self.implicitTestTitle = testing
237 def _finishCurrentTest(self):
238 if self.currentTestFailedCount > 0:
239 if self.verbosity >= self.Verbosity.NORMAL:
240 if self.show_progress:
241 term.clear_progress_msg()
244 self.currentTest.asWvCheckLine('FAILED').print()
245 self.testFailedCount += 1
247 if self.verbosity <= self.Verbosity.NORMAL:
248 self.currentTest.asWvCheckLine('ok').print()
255 def _newTest(self, testing : WvTestingLine):
257 self._finishCurrentTest()
260 if self.show_progress and self.verbosity < self.Verbosity.VERBOSE:
261 term.set_progress_msg(str(testing.asWvCheckLine(None)))
262 self.currentTest = testing
263 self.currentTestFailedCount = 0
265 def _newCheck(self, check : WvCheckLine):
267 if not check.is_success():
268 self.checkFailedCount += 1
269 self.currentTestFailedCount += 1
271 def append(self, logEntry : WvLine):
272 if self.implicitTestTitle:
273 if str(logEntry) == '':
275 elif type(logEntry) != WvTestingLine:
276 self._newTest(self.implicitTestTitle)
277 super().append(self.implicitTestTitle)
278 self.implicitTestTitle = None
280 self.implicitTestTitle = None
283 if type(logEntry) == WvTestingLine:
284 self._newTest(logEntry)
285 elif type(logEntry) == WvCheckLine:
286 self._newCheck(logEntry)
288 list.append(self, logEntry)
290 if self.verbosity == self.Verbosity.VERBOSE:
294 if self.show_progress:
295 term.update_progress_msg()
297 def addLine(self, line):
301 for lineClass in [ WvCheckLine, WvTestingLine, WvTagLine, WvPlainLine ]:
302 match = lineClass.re.match(line)
304 logEntry = lineClass(match)
307 raise Exception("Non-matched line: {}".format(line))
309 self.append(logEntry)
314 print("WvTest: {total} test{plt}, {fail} failure{plf}."
315 .format(total = self.testCount, plt = '' if self.testCount == 1 else 's',
316 fail = self.testFailedCount, plf = '' if self.testFailedCount == 1 else 's'))
def is_success(self):
    """Return True when the count of failed tests is zero."""
    failed = self.testFailedCount
    return failed == 0
320 def _run(command, log):
321 log.show_progress = True
324 def kill_child(sig = None, frame = None):
325 os.killpg(proc.pid, sig)
327 def alarm(sig = None, frame = None):
328 msg = "! {wvtool}: Alarm timed out! No test output for {timeout} seconds. FAILED"
329 log.addLine(msg.format(wvtool=sys.argv[0], timeout=timeout))
330 kill_child(signal.SIGTERM)
332 signal.signal(signal.SIGINT, kill_child)
333 signal.signal(signal.SIGTERM, kill_child)
334 signal.signal(signal.SIGALRM, alarm)
336 cmd = command if isinstance(command, str) else ' '.join(command)
337 log.setImplicitTestTitle(WvTestingLine("Executing "+cmd, "wvtool"))
339 # Popen does not seem to be able to call setpgrp(). Therefore, we
340 # use start_new_session, but this also creates a new session and
341 # detaches the process from a terminal. This might be a problem
342 # for programs that need a terminal to run.
343 with sp.Popen(command, stdout=sp.PIPE, stderr=sp.STDOUT,
344 universal_newlines=True, start_new_session=True) as proc:
345 signal.alarm(timeout)
346 for line in proc.stdout:
347 signal.alarm(timeout)
352 if proc.returncode != 0:
353 if proc.returncode > 0:
354 msg = "{wvtool}: Program '{cmd}' returned non-zero exit code {ec}"
356 msg = "{wvtool}: Program '{cmd}' terminated by signal {sig}"
358 text = msg.format(wvtool=sys.argv[0], cmd=cmd,
359 ec=proc.returncode, sig=-proc.returncode)
360 log.append(WvCheckLine(text, 'FAILED'))
def do_run(args, log):
    """Handler of the 'run' sub-command.

    Passes the command from the parsed arguments, together with the
    log, to _run().
    """
    command = args.command
    _run(command, log)
365 def do_runall(args, log):
366 for cmd in args.commands:
369 def do_format(args, log):
372 log.setImplicitTestTitle(WvTestingLine("Preamble", "stdin"))
373 for line in sys.stdin:
376 for fn in args.infiles:
377 log.setImplicitTestTitle(WvTestingLine("Preamble", fn))
378 for line in open(fn):
381 def do_wrap(args, log):
384 parser = argparse.ArgumentParser(description='Versatile wvtest tool')
386 parser.set_defaults(verbosity=WvTestLog.Verbosity.NORMAL)
387 parser.add_argument('-v', '--verbose', dest='verbosity', action='store_const',
388 const=WvTestLog.Verbosity.VERBOSE,
389 help='Do not hide output of successful tests')
390 parser.add_argument('-s', '--summary', dest='verbosity', action='store_const',
391 const=WvTestLog.Verbosity.SUMMARY,
392 help='''Hide output of all tests. Print just one line for each "Testing"
393 section and report "ok" or "FAILURE" of it.''')
394 parser.add_argument('--version', action='version', version='%(prog)s '+version)
396 subparsers = parser.add_subparsers(help='sub-command help')
398 parser_run = subparsers.add_parser('run', help='Run and supervise a command producing wvtest output')
399 parser_run.add_argument('command', nargs=argparse.REMAINDER, help='Command to run')
400 parser_run.set_defaults(func=do_run)
402 parser_runall = subparsers.add_parser('runall', help='Run multiple scripts/binaries mentioned on command line')
403 parser_runall.set_defaults(func=do_runall)
404 parser_runall.add_argument('commands', nargs='+', help='Scripts/binaries to run')
406 parser_format = subparsers.add_parser('format', help='Reformat/highlight/summarize WvTest protocol output')
407 parser_format.set_defaults(func=do_format)
408 parser_format.add_argument('infiles', nargs='*', help='Files with wvtest output')
410 # parser_wrap = subparsers.add_parser('wrap')
411 # parser_wrap.set_defaults(func=do_wrap)
413 args = parser.parse_args()
415 if not 'func' in args:
419 log = WvTestLog(args.verbosity)
422 sys.exit(0 if log.is_success() else 1)
425 # compile-command: "make wvtool"