#!/usr/bin/env python3 # Copyright 2014 Michal Sojka # License: GPLv2+ """Versatile WvTest protocol tool. It replaces wvtestrun script and provides some other useful features. Namely: - Summary mode (--summary) - Test results aligned to the same column - TODO: Conversion to HTML - TODO: Variable timeout - TODO: Checking of expected number of tests Newest version can be found at https://github.com/wentasah/wvtest. """ version = "v0-7-g409306f" import argparse import subprocess as sp import re import sys import os import signal import math # Regulr expression that matches potential prefixes to wvtest protocol lines re_prefix = '' class Term: class attr: reset = '\033[0m' bold = '\033[01m' disable = '\033[02m' underline = '\033[04m' reverse = '\033[07m' strikethrough = '\033[09m' invisible = '\033[08m' class fg: black = '\033[30m' red = '\033[31m' green = '\033[32m' orange = '\033[33m' blue = '\033[34m' purple = '\033[35m' cyan = '\033[36m' lightgrey = '\033[37m' darkgrey = '\033[90m' lightred = '\033[91m' lightgreen = '\033[92m' yellow = '\033[93m' lightblue = '\033[94m' pink = '\033[95m' lightcyan = '\033[96m' class bg: black = '\033[40m' red = '\033[41m' green = '\033[42m' orange = '\033[43m' blue = '\033[44m' purple = '\033[45m' cyan = '\033[46m' lightgrey = '\033[47m' progress_chars = '|/-\\' def __init__(self): if os.environ['TERM'] == 'dumb': self.output = None else: try: self.output = open('/dev/tty', 'w') except IOError: self.output = None if not self.output: self._clear_colors() self.width = self._get_width() self._enabled = True self._progress_msg = '' self._progress_idx = 0 def _raw_write(self, string): '''Write raw data if output is enabled.''' if self._enabled and self.output: try: self.output.write(string) self.output.flush() except IOError: self._enabled = False def _get_width(self): try: import fcntl, termios, struct, os s = struct.pack('HHHH', 0, 0, 0, 0) x = fcntl.ioctl(self.output.fileno(), termios.TIOCGWINSZ, s) return struct.unpack('HHHH', x)[1] except: return int(getattr(os.environ, 'COLUMNS', 80)) def _clear_colors(self): '''Sets all color and attribute memebers to empty strings''' for cls in ('attr', 'fg', 'bg'): c = getattr(self, cls) for key in dir(c): if key[0] == '_': continue setattr(c, key, '') def set_progress_msg(self, msg): self._progress_msg = msg self._progress_idx = 0 self.update_progress_msg() def update_progress_msg(self): self._progress_idx += 1 if self._progress_idx >= len(self.progress_chars): self._progress_idx = 0 if self.output: self._raw_write(self._progress_msg[:self.width - 3] + " " + self.progress_chars[self._progress_idx] + "\r") def clear_progress_msg(self): if self.output: self._raw_write(' '*(len(self._progress_msg[:self.width - 3]) + 2) + "\r") term = Term() class WvLine: def __init__(self, match): for (key, val) in match.groupdict().items(): setattr(self, key, val) def print(self): print(str(self)) class WvPlainLine(WvLine): re = re.compile("(?P.*)") def __str__(self): return self.line class WvTestingLine(WvLine): re = re.compile('(?P' + re_prefix + ')Testing "(?P.*)" in (?P.*):$') def __init__(self, *args): if len(args) == 1: WvLine.__init__(self, args[0]) elif len(args) == 2: self.prefix = '' self.what = args[0] self.where = args[1] else: raise TypeError("WvTestingLine.__init__() takes at most 2 positional arguments") def __str__(self): return '{self.prefix}! Testing "{self.what}" in {self.where}:'.format(self=self) def print(self): print(term.attr.bold + str(self) + term.attr.reset) def asWvCheckLine(self, result): return WvCheckLine('{self.where} {self.what}'.format(self=self), result) class WvCheckLine(WvLine): re = re.compile('(?P' + re_prefix + ')!\s*(?P.*?)\s+(?P\S+)$') def __init__(self, *args): if len(args) == 1: WvLine.__init__(self, args[0]) elif len(args) == 2: self.prefix = '' self.text = args[0] self.result = args[1] else: raise TypeError("WvCheckLine.__init__() takes at most 2 positional arguments") def __str__(self): # Result == None when printing progress message return '{self.prefix}! {self.text} {result}'.format(self=self, result=(self.result or '')) def is_success(self): return self.result == 'ok' def print(self): text = '{self.prefix}! {self.text} '.format(self=self) if self.is_success(): color = term.fg.lightgreen else: color = term.fg.lightred result = term.attr.bold + color + self.result + term.attr.reset lines = math.ceil(len(text) / term.width) if len(text) % term.width > term.width - 10: lines += 1 text = format(text, '.<' + str(lines * term.width - 10)) print('{text} {result}'.format(text=text, result=result)) class WvTagLine(WvLine): re = re.compile('(?P' + re_prefix + ')wvtest:\s*(?P.*)$') class WvTestLog(list): class Verbosity: # Print one line for each "Testing" section. Passed tests are # printed as "ok", failed tests as "FAILURE". SUMMARY = 1 # Print one "ok" line for each passing "Testing" section. # Failed "Testing" sections are printed verbosely. NORMAL = 2 # Print every line of the output, just # reformat/syntax-highlight known lines. VERBOSE = 3 def __init__(self, verbosity = Verbosity.NORMAL): self.checkCount = 0 self.checkFailedCount = 0 self.testCount = 0 self.testFailedCount = 0 self.implicitTestTitle = None self.currentTest = None self.currentTestFailedCount = 0 self.verbosity = verbosity self.show_progress = False def setImplicitTestTitle (self, testing): """If the test does not supply its own title as a first line of test output, it this title will be used instead.""" self.implicitTestTitle = testing def print(self): for entry in self: entry.print() def _finishCurrentTest(self): if self.currentTestFailedCount > 0: if self.verbosity >= self.Verbosity.NORMAL: if self.show_progress: term.clear_progress_msg() self.print() else: self.currentTest.asWvCheckLine('FAILED').print() self.testFailedCount += 1 else: if self.verbosity <= self.Verbosity.NORMAL: self.currentTest.asWvCheckLine('ok').print() sys.stdout.flush() self.clear() def clear(self): del self[:] def _newTest(self, testing : WvTestingLine): if self.currentTest: self._finishCurrentTest() if testing != None: self.testCount += 1 if self.show_progress and self.verbosity < self.Verbosity.VERBOSE: term.set_progress_msg(str(testing.asWvCheckLine(None))) self.currentTest = testing self.currentTestFailedCount = 0 def _newCheck(self, check : WvCheckLine): self.checkCount += 1 if not check.is_success(): self.checkFailedCount += 1 self.currentTestFailedCount += 1 def append(self, logEntry : WvLine): if self.implicitTestTitle: if str(logEntry) == '': pass elif type(logEntry) != WvTestingLine: self._newTest(self.implicitTestTitle) super().append(self.implicitTestTitle) self.implicitTestTitle = None else: self.implicitTestTitle = None if type(logEntry) == WvTestingLine: self._newTest(logEntry) elif type(logEntry) == WvCheckLine: self._newCheck(logEntry) list.append(self, logEntry) if self.verbosity == self.Verbosity.VERBOSE: self.print() self.clear() else: if self.show_progress: term.update_progress_msg() def addLine(self, line): line = line.rstrip() logEntry = None for lineClass in [ WvCheckLine, WvTestingLine, WvTagLine, WvPlainLine ]: match = lineClass.re.match(line) if match: logEntry = lineClass(match) break if not logEntry: raise Exception("Non-matched line: {}".format(line)) self.append(logEntry) def done(self): self._newTest(None) print("WvTest: {total} test{plt}, {fail} failure{plf}." .format(total = self.testCount, plt = '' if self.testCount == 1 else 's', fail = self.testFailedCount, plf = '' if self.testFailedCount == 1 else 's')) def is_success(self): return self.testFailedCount == 0 def _run(command, log): log.show_progress = True timeout = 100 def kill_child(sig = None, frame = None): os.killpg(proc.pid, sig) def alarm(sig = None, frame = None): msg = "! {wvtool}: Alarm timed out! No test output for {timeout} seconds. FAILED" log.addLine(msg.format(wvtool=sys.argv[0], timeout=timeout)) kill_child(signal.SIGTERM) signal.signal(signal.SIGINT, kill_child) signal.signal(signal.SIGTERM, kill_child) signal.signal(signal.SIGALRM, alarm) cmd = command if isinstance(command, str) else ' '.join(command) log.setImplicitTestTitle(WvTestingLine("Executing "+cmd, "wvtool")) # Popen does not seep to be able to call setpgrp(). Therefore, we # use start_new_session, but this also create a new session and # detaches the process from a terminal. This might be a problem # for programs that need a terminal to run. with sp.Popen(command, stdout=sp.PIPE, stderr=sp.STDOUT, universal_newlines=True, start_new_session=True) as proc: signal.alarm(timeout) for line in proc.stdout: signal.alarm(timeout) log.addLine(line) signal.alarm(0) if proc.returncode != 0: if proc.returncode > 0: msg = "{wvtool}: Program '{cmd}' returned non-zero exit code {ec}" else: msg = "{wvtool}: Program '{cmd}' terminated by signal {sig}" text = msg.format(wvtool=sys.argv[0], cmd=cmd, ec=proc.returncode, sig=-proc.returncode) log.append(WvCheckLine(text, 'FAILED')) def do_run(args, log): _run(args.command, log) def do_runall(args, log): for cmd in args.commands: _run(cmd, log) def do_format(args, log): files = args.infiles if len(files) == 0: log.setImplicitTestTitle(WvTestingLine("Preamble", "stdin")) for line in sys.stdin: log.addLine(line) else: for fn in args.infiles: log.setImplicitTestTitle(WvTestingLine("Preamble", fn)) for line in open(fn): log.addLine(line) def do_wrap(args, log): pass parser = argparse.ArgumentParser(description='Versatile wvtest tool') parser.set_defaults(verbosity=WvTestLog.Verbosity.NORMAL) parser.add_argument('-v', '--verbose', dest='verbosity', action='store_const', const=WvTestLog.Verbosity.VERBOSE, help='Do not hide output of successful tests') parser.add_argument('-s', '--summary', dest='verbosity', action='store_const', const=WvTestLog.Verbosity.SUMMARY, help='''Hide output of all tests. Print just one line for each "Testing" section and report "ok" or "FAILURE" of it.''') parser.add_argument('--version', action='version', version='%(prog)s '+version) subparsers = parser.add_subparsers(help='sub-command help') parser_run = subparsers.add_parser('run', help='Run and supervise a command producing wvtest output') parser_run.add_argument('command', nargs=argparse.REMAINDER, help='Command to run') parser_run.set_defaults(func=do_run) parser_runall = subparsers.add_parser('runall', help='Run multiple scripts/binaries mentioned on command line') parser_runall.set_defaults(func=do_runall) parser_runall.add_argument('commands', nargs='+', help='Scripts/binaries to run') parser_format = subparsers.add_parser('format', help='Reformat/highlight/summarize WvTest protocol output') parser_format.set_defaults(func=do_format) parser_format.add_argument('infiles', nargs='*', help='Files with wvtest output') # parser_wrap = subparsers.add_parser('wrap') # parser_wrap.set_defaults(func=do_wrap) args = parser.parse_args() if not 'func' in args: parser.print_help() sys.exit(1) log = WvTestLog(args.verbosity) args.func(args, log) log.done() sys.exit(0 if log.is_success() else 1) # Local Variables: # compile-command: "make wvtool" # End: