From 08580011d635b97a22b3c00880d291b216349a44 Mon Sep 17 00:00:00 2001 From: Michal Sojka Date: Tue, 4 Nov 2014 15:54:45 +0100 Subject: [PATCH] Add wvtool --- wvtool | 306 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 306 insertions(+) create mode 100755 wvtool diff --git a/wvtool b/wvtool new file mode 100755 index 0000000..770bbd3 --- /dev/null +++ b/wvtool @@ -0,0 +1,306 @@ +#!/usr/bin/env python3 + +# Copyright 2014 Michal Sojka +# License: GPLv2+ + +import argparse +import subprocess as sp +import re +import sys +import os +import signal +import math + +# Regulr expression that matches potential prefixes to wvtest protocol lines +re_prefix = '' + +class Term: + reset = '\033[0m' + bold = '\033[01m' + disable = '\033[02m' + underline = '\033[04m' + reverse = '\033[07m' + strikethrough = '\033[09m' + invisible = '\033[08m' + class fg: + black = '\033[30m' + red = '\033[31m' + green = '\033[32m' + orange = '\033[33m' + blue = '\033[34m' + purple = '\033[35m' + cyan = '\033[36m' + lightgrey = '\033[37m' + darkgrey = '\033[90m' + lightred = '\033[91m' + lightgreen = '\033[92m' + yellow = '\033[93m' + lightblue = '\033[94m' + pink = '\033[95m' + lightcyan = '\033[96m' + class bg: + black = '\033[40m' + red = '\033[41m' + green = '\033[42m' + orange = '\033[43m' + blue = '\033[44m' + purple = '\033[45m' + cyan = '\033[46m' + lightgrey = '\033[47m' + + def __init__(self, use_colors): + def clear_colors(obj): + for key in dir(obj): + if key[0] == '_': + continue + if key in ('fg', 'bg'): + clear_colors(getattr(obj, key)) + continue + setattr(obj, key, '') + + if not use_colors: + clear_colors(self) + + if use_colors: + def ioctl_GWINSZ(fd): + try: + import fcntl, termios, struct, os + cr = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234')) + except: + return + return cr + cr = ioctl_GWINSZ(1) + if not cr: + try: + fd = os.open(os.ctermid(), os.O_RDONLY) + cr = ioctl_GWINSZ(fd) + os.close(fd) + except: + pass + self.width = cr[1] + else: + self.width = int(getattr(os.environ, 'COLUMNS', 80)) + +term = Term(sys.stdout.isatty() and os.environ['TERM'] != 'dumb') + +class WvLine: + def __init__(self, match): + for (key, val) in match.groupdict().items(): + setattr(self, key, val) + + def print(self): + print(str(self)) + + +class WvPlainLine(WvLine): + re = re.compile("(?P.*)") + def __str__(self): + return self.line + +class WvTestingLine(WvLine): + re = re.compile('(?P' + re_prefix + ')Testing "(?P.*)" in (?P.*):$') + def __str__(self): + return '{self.prefix}! Testing "{self.what}" in {self.where}:'.format(self=self) + def print(self): + print(term.bold + str(self) + term.reset) + + def asWvCheckLine(self, result): + return WvCheckLine('{self.where} {self.what}'.format(self=self), result) + +class WvCheckLine(WvLine): + re = re.compile('(?P' + re_prefix + ')!\s*(?P.*?)\s+(?P\S+)$') + def __init__(self, *args): + if len(args) == 1: + WvLine.__init__(self, args[0]) + elif len(args) == 2: + self.prefix = '' + self.text = args[0] + self.result = args[1] + else: + raise TypeError("WvCheckLine.__init__() takes at most 2 positional arguments") + + def __str__(self): + return '{self.prefix}! {self.text} {self.result}'.format(self=self) + + def is_success(self): + return self.result == 'ok' + + def print(self): + text = '{self.prefix}! {self.text} '.format(self=self) + if self.is_success(): + color = term.fg.lightgreen + else: + color = term.fg.lightred + result = term.bold + color + self.result + term.reset + + lines = math.ceil(len(text) / term.width) + if len(text) % term.width > term.width - 10: + lines += 1 + + text = format(text, '.<' + str(lines * term.width - 10)) + print('{text} {result}'.format(text=text, result=result)) + +class WvTagLine(WvLine): + re = re.compile('(?P' + re_prefix + ')wvtest:\s*(?P.*)$') + +class WvTestLog(list): + def __init__(self, verbose = False): + self.checkCount = 0 + self.checkFailedCount = 0 + self.testCount = 0 + self.testFailedCount = 0 + + self.currentTest = None + self.currentTestFailedCount = 0 + + self.verbose = verbose + + def print(self): + for entry in self: + entry.print() + + def _newTest(self, testing): + if self.currentTest: + if self.currentTestFailedCount > 0: + if not self.verbose: + self.print() + #self.currentTest.asWvCheckLine('FAILED').print() + self.testFailedCount += 1 + else: + if not self.verbose: + self.currentTest.asWvCheckLine('ok').print() + self.clear() + + if testing != None: + self.testCount += 1 + self.currentTest = testing + self.currentTestFailedCount = 0 + + def _newCheck(self, check): + self.checkCount += 1 + if not check.is_success(): + self.checkFailedCount += 1 + self.currentTestFailedCount += 1 + + def addLine(self, line): + line = line.rstrip() + logEntry = None + + for lineClass in [ WvCheckLine, WvTestingLine, WvTagLine, WvPlainLine ]: + match = lineClass.re.match(line) + if match: + logEntry = lineClass(match) + break + if not logEntry: + raise Exception("Non-matched line: {}".format(line)) + + self.append(logEntry) + + def append(self, logEntry): + if self.verbose: + logEntry.print() + + if type(logEntry) == WvTestingLine: + self._newTest(logEntry) + elif type(logEntry) == WvCheckLine: + self._newCheck(logEntry) + list.append(self, logEntry) + + def done(self): + self._newTest(None) + + print("WvTest: {total} test{plt}, {fail} failure{plf}." + .format(total = self.testCount, plt = '' if self.testCount == 1 else 's', + fail = self.testFailedCount, plf = '' if self.testFailedCount == 1 else 's')) + def is_success(self): + return self.testFailedCount == 0 + +def _run(command, log): + timeout = 100 + + def kill_child(sig = None, frame = None): + os.killpg(proc.pid, sig) + + def alarm(sig = None, frame = None): + msg = "! {wvtool}: Alarm timed out! No test output for {timeout} seconds. FAILED" + log.addLine(msg.format(wvtool=sys.argv[0], timeout=timeout)) + kill_child(signal.SIGTERM) + + signal.signal(signal.SIGINT, kill_child) + signal.signal(signal.SIGTERM, kill_child) + signal.signal(signal.SIGALRM, alarm) + + # Popen does not seep to be able to call setpgrp(). Therefore, we + # use start_new_session, but this also create a new session and + # detaches the process from a terminal. This might be a problem + # for programs that need a terminal to run. + with sp.Popen(command, stdout=sp.PIPE, stderr=sp.STDOUT, + universal_newlines=True, start_new_session=True) as proc: + signal.alarm(timeout) + for line in proc.stdout: + signal.alarm(timeout) + log.addLine(line) + + signal.alarm(0) + + if proc.returncode != 0: + if proc.returncode > 0: + msg = "{wvtool}: Program '{cmd}' returned non-zero exit code {ec}" + else: + msg = "{wvtool}: Program '{cmd}' terminated by signal {sig}" + + text = msg.format(wvtool=sys.argv[0], cmd=' '.join(command), + ec=proc.returncode, sig=-proc.returncode) + log.append(WvCheckLine(text, 'FAILED')) + +def do_run(args): + log = WvTestLog(args.verbose) + + _run(args.command, log) + + log.done() + sys.exit(0 if log.is_success() else 1) + +def do_runall(args): + log = WvTestLog(args.verbose) + + for cmd in args.commands: + _run([cmd], log) + + log.done() + sys.exit(0 if log.is_success() else 1) + +def do_format(args): + pass + +def do_wrap(args): + pass + +parser = argparse.ArgumentParser(description='Versatile wvtest tool') +subparsers = parser.add_subparsers(help='sub-command help') + +parser_run = subparsers.add_parser('run', + help='Run and supervise a command producing wvtest output') +parser_run.add_argument('-v', '--verbose', action='store_true', + help='Print all output produced by the command') +parser_run.add_argument('command', nargs=argparse.REMAINDER, + help='Command to run') +parser_run.set_defaults(func=do_run) + +parser_runall = subparsers.add_parser('runall', + help='Run multiple scripts/binaries mentioned on command line') +parser_runall.set_defaults(func=do_runall) +parser_runall.add_argument('commands', nargs='+', + help='Scripts/binaries to run') +parser_runall.add_argument('-v', '--verbose', action='store_true', + help='Print all output produced by the commands') + + +# parser_format = subparsers.add_parser('format') +# parser_format.set_defaults(func=do_format) + +# parser_wrap = subparsers.add_parser('wrap') +# parser_wrap.set_defaults(func=do_wrap) + +args = parser.parse_args() +args.func(args) -- 2.39.2