#!/usr/bin/env python3 # Copyright 2014 Michal Sojka # License: GPLv2+ """Versatile WvTest protocol tool. It replaces wvtestrun script and provides some other useful features. Namely: - Summary mode (--summary) - Test results aligned to the same column - FIXME: No "progress" reporting - TODO: Conversion to HTML - TODO: Variable timeout - TODO: Checking of expected number of tests """ import argparse import subprocess as sp import re import sys import os import signal import math # Regulr expression that matches potential prefixes to wvtest protocol lines re_prefix = '' class Term: reset = '\033[0m' bold = '\033[01m' disable = '\033[02m' underline = '\033[04m' reverse = '\033[07m' strikethrough = '\033[09m' invisible = '\033[08m' class fg: black = '\033[30m' red = '\033[31m' green = '\033[32m' orange = '\033[33m' blue = '\033[34m' purple = '\033[35m' cyan = '\033[36m' lightgrey = '\033[37m' darkgrey = '\033[90m' lightred = '\033[91m' lightgreen = '\033[92m' yellow = '\033[93m' lightblue = '\033[94m' pink = '\033[95m' lightcyan = '\033[96m' class bg: black = '\033[40m' red = '\033[41m' green = '\033[42m' orange = '\033[43m' blue = '\033[44m' purple = '\033[45m' cyan = '\033[46m' lightgrey = '\033[47m' def __init__(self, use_colors): def clear_colors(obj): for key in dir(obj): if key[0] == '_': continue if key in ('fg', 'bg'): clear_colors(getattr(obj, key)) continue setattr(obj, key, '') if not use_colors: clear_colors(self) if use_colors: def ioctl_GWINSZ(fd): try: import fcntl, termios, struct, os cr = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234')) except: return return cr cr = ioctl_GWINSZ(1) if not cr: try: fd = os.open(os.ctermid(), os.O_RDONLY) cr = ioctl_GWINSZ(fd) os.close(fd) except: pass self.width = cr[1] else: self.width = int(getattr(os.environ, 'COLUMNS', 80)) term = Term(sys.stdout.isatty() and os.environ['TERM'] != 'dumb') class WvLine: def __init__(self, match): for (key, val) in match.groupdict().items(): setattr(self, key, val) def print(self): print(str(self)) class WvPlainLine(WvLine): re = re.compile("(?P.*)") def __str__(self): return self.line class WvTestingLine(WvLine): re = re.compile('(?P' + re_prefix + ')Testing "(?P.*)" in (?P.*):$') def __init__(self, *args): if len(args) == 1: WvLine.__init__(self, args[0]) elif len(args) == 2: self.prefix = '' self.what = args[0] self.where = args[1] else: raise TypeError("WvTestingLine.__init__() takes at most 2 positional arguments") def __str__(self): return '{self.prefix}! Testing "{self.what}" in {self.where}:'.format(self=self) def print(self): print(term.bold + str(self) + term.reset) def asWvCheckLine(self, result): return WvCheckLine('{self.where} {self.what}'.format(self=self), result) class WvCheckLine(WvLine): re = re.compile('(?P' + re_prefix + ')!\s*(?P.*?)\s+(?P\S+)$') def __init__(self, *args): if len(args) == 1: WvLine.__init__(self, args[0]) elif len(args) == 2: self.prefix = '' self.text = args[0] self.result = args[1] else: raise TypeError("WvCheckLine.__init__() takes at most 2 positional arguments") def __str__(self): return '{self.prefix}! {self.text} {self.result}'.format(self=self) def is_success(self): return self.result == 'ok' def print(self): text = '{self.prefix}! {self.text} '.format(self=self) if self.is_success(): color = term.fg.lightgreen else: color = term.fg.lightred result = term.bold + color + self.result + term.reset lines = math.ceil(len(text) / term.width) if len(text) % term.width > term.width - 10: lines += 1 text = format(text, '.<' + str(lines * term.width - 10)) print('{text} {result}'.format(text=text, result=result)) class WvTagLine(WvLine): re = re.compile('(?P' + re_prefix + ')wvtest:\s*(?P.*)$') class WvTestLog(list): class Verbosity: # Print one line for each "Testing" section. Passed tests are # printed as "ok", failed tests as "FAILURE". SUMMARY = 1 # Print one "ok" line for each passing "Testing" section. # Failed "Testing" sections are printed verbosely. NORMAL = 2 # Print every line of the output, just # reformat/syntax-highlight known lines. VERBOSE = 3 def __init__(self, verbosity = Verbosity.NORMAL): self.checkCount = 0 self.checkFailedCount = 0 self.testCount = 0 self.testFailedCount = 0 self.implicitTestTitle = None self.currentTest = None self.currentTestFailedCount = 0 self.verbosity = verbosity def setImplicitTestTitle (self, testing): """If the test does not supply its own title as a first line of test output, it this title will be used instead.""" self.implicitTestTitle = testing def print(self): for entry in self: entry.print() def _finishCurrentTest(self): if self.currentTestFailedCount > 0: if self.verbosity >= self.Verbosity.NORMAL: self.print() else: self.currentTest.asWvCheckLine('FAILED').print() self.testFailedCount += 1 else: if self.verbosity <= self.Verbosity.NORMAL: self.currentTest.asWvCheckLine('ok').print() self.clear() def _newTest(self, testing): if self.currentTest: self._finishCurrentTest() if testing != None: self.testCount += 1 self.currentTest = testing self.currentTestFailedCount = 0 def _newCheck(self, check): self.checkCount += 1 if not check.is_success(): self.checkFailedCount += 1 self.currentTestFailedCount += 1 def append(self, logEntry): if self.implicitTestTitle and type(logEntry) != WvTestingLine: self._newTest(self.implicitTestTitle) super().append(self.implicitTestTitle) self.implicitTestTitle = None if type(logEntry) == WvTestingLine: self._newTest(logEntry) elif type(logEntry) == WvCheckLine: self._newCheck(logEntry) list.append(self, logEntry) if self.verbosity == self.Verbosity.VERBOSE: self.print() self.clear() def addLine(self, line): line = line.rstrip() logEntry = None for lineClass in [ WvCheckLine, WvTestingLine, WvTagLine, WvPlainLine ]: match = lineClass.re.match(line) if match: logEntry = lineClass(match) break if not logEntry: raise Exception("Non-matched line: {}".format(line)) self.append(logEntry) def done(self): self._newTest(None) print("WvTest: {total} test{plt}, {fail} failure{plf}." .format(total = self.testCount, plt = '' if self.testCount == 1 else 's', fail = self.testFailedCount, plf = '' if self.testFailedCount == 1 else 's')) def is_success(self): return self.testFailedCount == 0 def _run(command, log): timeout = 100 def kill_child(sig = None, frame = None): os.killpg(proc.pid, sig) def alarm(sig = None, frame = None): msg = "! {wvtool}: Alarm timed out! No test output for {timeout} seconds. FAILED" log.addLine(msg.format(wvtool=sys.argv[0], timeout=timeout)) kill_child(signal.SIGTERM) signal.signal(signal.SIGINT, kill_child) signal.signal(signal.SIGTERM, kill_child) signal.signal(signal.SIGALRM, alarm) cmd = command if isinstance(command, str) else ' '.join(command) log.setImplicitTestTitle(WvTestingLine("Executing "+cmd, "wvtool")) # Popen does not seep to be able to call setpgrp(). Therefore, we # use start_new_session, but this also create a new session and # detaches the process from a terminal. This might be a problem # for programs that need a terminal to run. with sp.Popen(command, stdout=sp.PIPE, stderr=sp.STDOUT, universal_newlines=True, start_new_session=True) as proc: signal.alarm(timeout) for line in proc.stdout: signal.alarm(timeout) log.addLine(line) signal.alarm(0) if proc.returncode != 0: if proc.returncode > 0: msg = "{wvtool}: Program '{cmd}' returned non-zero exit code {ec}" else: msg = "{wvtool}: Program '{cmd}' terminated by signal {sig}" text = msg.format(wvtool=sys.argv[0], cmd=cmd, ec=proc.returncode, sig=-proc.returncode) log.append(WvCheckLine(text, 'FAILED')) def do_run(args, log): _run(args.command, log) def do_runall(args, log): for cmd in args.commands: _run(cmd, log) def do_format(args, log): files = args.infiles if len(files) == 0: log.setImplicitTestTitle(WvTestingLine("Preamble", "stdin")) for line in sys.stdin: log.addLine(line) else: for fn in args.infiles: log.setImplicitTestTitle(WvTestingLine("Preamble", fn)) for line in open(fn): log.addLine(line) def do_wrap(args, log): pass parser = argparse.ArgumentParser(description='Versatile wvtest tool') parser.set_defaults(verbosity=WvTestLog.Verbosity.NORMAL) parser.add_argument('-v', '--verbose', dest='verbosity', action='store_const', const=WvTestLog.Verbosity.VERBOSE, help='Do not hide output of successful tests') parser.add_argument('-s', '--summary', dest='verbosity', action='store_const', const=WvTestLog.Verbosity.SUMMARY, help='''Hide output of all tests. Print just one line for each "Testing" section and report "ok" or "FAILURE" of it.''') subparsers = parser.add_subparsers(help='sub-command help') parser_run = subparsers.add_parser('run', help='Run and supervise a command producing wvtest output') parser_run.add_argument('command', nargs=argparse.REMAINDER, help='Command to run') parser_run.set_defaults(func=do_run) parser_runall = subparsers.add_parser('runall', help='Run multiple scripts/binaries mentioned on command line') parser_runall.set_defaults(func=do_runall) parser_runall.add_argument('commands', nargs='+', help='Scripts/binaries to run') parser_format = subparsers.add_parser('format', help='Reformat/highlight/summarize WvTest protcol output') parser_format.set_defaults(func=do_format) parser_format.add_argument('infiles', nargs='*', help='Files with wvtest output') # parser_wrap = subparsers.add_parser('wrap') # parser_wrap.set_defaults(func=do_wrap) args = parser.parse_args() log = WvTestLog(args.verbosity) args.func(args, log) log.done() sys.exit(0 if log.is_success() else 1)