#!/usr/bin/env python3

# Copyright 2014 Michal Sojka
# License: GPLv2+

"""Versatile wvtest tool.

Runs one or more commands that emit wvtest protocol lines, classifies
every output line, prints a condensed (or, with ``-v``, complete) report
and exits with a non-zero status when any check failed.
"""

import argparse
import subprocess as sp
import re
import sys
import os
import signal
import math

# Regular expression that matches potential prefixes to wvtest protocol lines
re_prefix = ''


class Term:
    """ANSI escape sequences plus the width of the output terminal.

    When constructed with ``use_colors`` false, every escape sequence
    (including those in the nested ``fg``/``bg`` classes, which are
    modified in place) is replaced by an empty string, so callers can
    concatenate them unconditionally.
    """

    reset = '\033[0m'
    bold = '\033[01m'
    disable = '\033[02m'
    underline = '\033[04m'
    reverse = '\033[07m'
    strikethrough = '\033[09m'
    invisible = '\033[08m'

    class fg:
        black = '\033[30m'
        red = '\033[31m'
        green = '\033[32m'
        orange = '\033[33m'
        blue = '\033[34m'
        purple = '\033[35m'
        cyan = '\033[36m'
        lightgrey = '\033[37m'
        darkgrey = '\033[90m'
        lightred = '\033[91m'
        lightgreen = '\033[92m'
        yellow = '\033[93m'
        lightblue = '\033[94m'
        pink = '\033[95m'
        lightcyan = '\033[96m'

    class bg:
        black = '\033[40m'
        red = '\033[41m'
        green = '\033[42m'
        orange = '\033[43m'
        blue = '\033[44m'
        purple = '\033[45m'
        cyan = '\033[46m'
        lightgrey = '\033[47m'

    def __init__(self, use_colors):
        """Set up colors and ``self.width``.

        Args:
            use_colors: true when output goes to a color-capable terminal.
                The width is then queried from the terminal; otherwise it
                is taken from the COLUMNS environment variable.
        """
        def clear_colors(obj):
            # Blank every public attribute; private helpers (leading
            # underscore) are left alone.
            for key in dir(obj):
                if key[0] == '_':
                    continue
                if key in ('fg', 'bg'):
                    clear_colors(getattr(obj, key))
                    continue
                setattr(obj, key, '')

        if not use_colors:
            clear_colors(self)

        if use_colors:
            self.width = self._tty_width()
        else:
            self.width = self._env_width()

    @staticmethod
    def _env_width(default=80):
        """Return the width given by COLUMNS, or *default* if unusable."""
        try:
            width = int(os.environ.get('COLUMNS', default))
        except ValueError:
            return default
        return width if width > 0 else default

    @staticmethod
    def _tty_width(default=80):
        """Return the terminal width, or *default* if it cannot be found.

        Standard output is tried first, then the controlling terminal.
        """
        def query(fd):
            try:
                return os.get_terminal_size(fd).columns
            except OSError:
                return 0

        width = query(1)
        if not width:
            try:
                fd = os.open(os.ctermid(), os.O_RDONLY)
            except OSError:
                fd = None
            if fd is not None:
                try:
                    width = query(fd)
                finally:
                    os.close(fd)
        return width if width > 0 else default


# TERM may be unset (e.g. under cron); treat that like any non-dumb terminal.
term = Term(sys.stdout.isatty() and os.environ.get('TERM') != 'dumb')


class WvLine:
    """Base class of all parsed output lines.

    Every named group of the regular-expression match becomes an
    attribute of the instance.
    """

    def __init__(self, match):
        for (key, val) in match.groupdict().items():
            setattr(self, key, val)

    def print(self):
        """Print the line to standard output."""
        print(str(self))


class WvPlainLine(WvLine):
    """Any line that is not part of the wvtest protocol."""

    re = re.compile("(?P<line>.*)")

    def __str__(self):
        return self.line


class WvTestingLine(WvLine):
    """Header line that starts a new test section."""

    re = re.compile('(?P<prefix>' + re_prefix +
                    ')Testing "(?P<what>.*)" in (?P<where>.*):$')

    def __str__(self):
        return '{self.prefix}! Testing "{self.what}" in {self.where}:'.format(self=self)

    def print(self):
        print(term.bold + str(self) + term.reset)

    def asWvCheckLine(self, result):
        """Return a check line summarizing this test with *result*."""
        return WvCheckLine('{self.where}  {self.what}'.format(self=self), result)


class WvCheckLine(WvLine):
    """Result of a single check: ``! <text> <result>``."""

    re = re.compile('(?P<prefix>' + re_prefix +
                    r')!\s*(?P<text>.*?)\s+(?P<result>\S+)$')

    def __init__(self, *args):
        """Create from a regex match, or from ``(text, result)``.

        Raises:
            TypeError: if called with neither one nor two arguments.
        """
        if len(args) == 1:
            WvLine.__init__(self, args[0])
        elif len(args) == 2:
            self.prefix = ''
            self.text = args[0]
            self.result = args[1]
        else:
            raise TypeError("WvCheckLine.__init__() takes at most 2 positional arguments")

    def __str__(self):
        return '{self.prefix}! {self.text} {self.result}'.format(self=self)

    def is_success(self):
        """Return True when the check result is exactly ``ok``."""
        return self.result == 'ok'

    def print(self):
        """Print the check with dot padding and a colored result."""
        text = '{self.prefix}! {self.text} '.format(self=self)
        if self.is_success():
            color = term.fg.lightgreen
        else:
            color = term.fg.lightred
        result = term.bold + color + self.result + term.reset

        # Pad with dots so that the result ends up in the last ten
        # columns of the (possibly wrapped) terminal line.
        lines = math.ceil(len(text) / term.width)
        if len(text) % term.width > term.width - 10:
            lines += 1
        # Clamp: on a terminal narrower than ten columns the pad width
        # would otherwise become negative.
        text = text.ljust(max(lines * term.width - 10, 0), '.')
        print('{text} {result}'.format(text=text, result=result))


class WvTagLine(WvLine):
    """Protocol line of the form ``wvtest: <tag>``."""

    re = re.compile('(?P<prefix>' + re_prefix + r')wvtest:\s*(?P<tag>.*)$')

    def __str__(self):
        return '{self.prefix}wvtest: {self.tag}'.format(self=self)


class WvTestLog(list):
    """Buffer of the lines of the current test plus overall statistics.

    In non-verbose mode the lines of a test are buffered and printed
    only when the test contains a failed check; passing tests are
    condensed to a single ``ok`` line.
    """

    def __init__(self, verbose = False):
        super().__init__()
        self.checkCount = 0
        self.checkFailedCount = 0
        self.testCount = 0
        self.testFailedCount = 0
        self.currentTest = None
        self.currentTestFailedCount = 0
        self.verbose = verbose

    def print(self):
        """Print every buffered line."""
        for entry in self:
            entry.print()

    def _newTest(self, testing):
        """Close the current test (if any) and start *testing*.

        *testing* is a WvTestingLine, or None when the input has ended.
        """
        if self.currentTestFailedCount > 0:
            # Show the buffered lines even when the failure happened
            # outside of any "Testing" section, so it is not lost.
            if not self.verbose:
                self.print()
            if self.currentTest:
                self.testFailedCount += 1
        elif self.currentTest and not self.verbose:
            self.currentTest.asWvCheckLine('ok').print()
        self.clear()
        if testing is not None:
            self.testCount += 1
        self.currentTest = testing
        self.currentTestFailedCount = 0

    def _newCheck(self, check):
        """Update the counters for one check line."""
        self.checkCount += 1
        if not check.is_success():
            self.checkFailedCount += 1
            self.currentTestFailedCount += 1

    def addLine(self, line):
        """Parse one raw output line and append it to the log.

        Raises:
            Exception: if no line class matches (WvPlainLine matches
                everything, so this is not expected to happen).
        """
        line = line.rstrip()
        logEntry = None
        for lineClass in [ WvCheckLine, WvTestingLine, WvTagLine, WvPlainLine ]:
            match = lineClass.re.match(line)
            if match:
                logEntry = lineClass(match)
                break
        if logEntry is None:
            raise Exception("Non-matched line: {}".format(line))
        self.append(logEntry)

    def append(self, logEntry):
        """Append an already parsed line, updating the statistics."""
        if self.verbose:
            logEntry.print()
        if isinstance(logEntry, WvTestingLine):
            self._newTest(logEntry)
        elif isinstance(logEntry, WvCheckLine):
            self._newCheck(logEntry)
        list.append(self, logEntry)

    def done(self):
        """Close the last test and print the summary line."""
        self._newTest(None)
        print("WvTest: {total} test{plt}, {fail} failure{plf}."
              .format(total = self.testCount, plt = '' if self.testCount == 1 else 's',
                      fail = self.testFailedCount, plf = '' if self.testFailedCount  == 1 else 's'))

    def is_success(self):
        """Return True when no test and no stand-alone check failed."""
        return self.testFailedCount == 0 and self.checkFailedCount == 0


def _run(command, log, timeout=100):
    """Run *command* and feed its combined stdout/stderr into *log*.

    Args:
        command: argument list passed to subprocess.Popen.
        log: WvTestLog that receives the output lines.
        timeout: seconds without any output after which the child's
            process group is terminated and a FAILED check is logged.

    A non-zero exit status (or death by signal) of the command is
    recorded as an additional FAILED check.
    """
    def kill_child(sig = signal.SIGTERM, frame = None):
        try:
            os.killpg(proc.pid, sig)
        except ProcessLookupError:
            # The process group is already gone.
            pass

    def alarm(sig = None, frame = None):
        msg = "! {wvtool}: Alarm timed out!  No test output for {timeout} seconds.  FAILED"
        log.addLine(msg.format(wvtool=sys.argv[0], timeout=timeout))
        kill_child(signal.SIGTERM)

    handlers = ((signal.SIGINT, kill_child),
                (signal.SIGTERM, kill_child),
                (signal.SIGALRM, alarm))
    saved = {}

    # Popen does not seem to be able to call setpgrp(). Therefore, we
    # use start_new_session, but this also creates a new session and
    # detaches the process from a terminal. This might be a problem
    # for programs that need a terminal to run.
    try:
        with sp.Popen(command, stdout=sp.PIPE, stderr=sp.STDOUT,
                      universal_newlines=True, start_new_session=True) as proc:
            # Install the handlers only now that there is a child to kill.
            for signum, handler in handlers:
                saved[signum] = signal.signal(signum, handler)
            signal.alarm(timeout)
            for line in proc.stdout:
                signal.alarm(timeout)
                log.addLine(line)
            # The alarm stays armed while the "with" block waits for the
            # child, so a child that closed stdout but hangs is killed too.
    finally:
        signal.alarm(0)
        for signum, handler in saved.items():
            # signal.signal() returns None for handlers not installed
            # from Python; those cannot be re-installed.
            if handler is not None:
                signal.signal(signum, handler)

    if proc.returncode != 0:
        if proc.returncode > 0:
            msg = "{wvtool}: Program '{cmd}' returned non-zero exit code {ec}"
        else:
            msg = "{wvtool}: Program '{cmd}' terminated by signal {sig}"
        text = msg.format(wvtool=sys.argv[0], cmd=' '.join(command),
                          ec=proc.returncode, sig=-proc.returncode)
        log.append(WvCheckLine(text, 'FAILED'))


def do_run(args):
    """Handle the ``run`` sub-command; exits with 0 on success, 1 on failure."""
    if not args.command:
        print("{}: run: no command given".format(sys.argv[0]), file=sys.stderr)
        sys.exit(2)
    log = WvTestLog(args.verbose)
    _run(args.command, log)
    log.done()
    sys.exit(0 if log.is_success() else 1)


def do_runall(args):
    """Handle the ``runall`` sub-command; every argument is run on its own."""
    log = WvTestLog(args.verbose)
    for cmd in args.commands:
        _run([cmd], log)
    log.done()
    sys.exit(0 if log.is_success() else 1)


def do_format(args):
    """Placeholder for the ``format`` sub-command (not implemented)."""
    pass


def do_wrap(args):
    """Placeholder for the ``wrap`` sub-command (not implemented)."""
    pass


def build_parser():
    """Return the command-line parser of the tool."""
    parser = argparse.ArgumentParser(description='Versatile wvtest tool')
    subparsers = parser.add_subparsers(help='sub-command help')

    parser_run = subparsers.add_parser('run', help='Run and supervise a command producing wvtest output')
    parser_run.add_argument('-v', '--verbose', action='store_true',
                            help='Print all output produced by the command')
    parser_run.add_argument('command', nargs=argparse.REMAINDER, help='Command to run')
    parser_run.set_defaults(func=do_run)

    parser_runall = subparsers.add_parser('runall', help='Run multiple scripts/binaries mentioned on command line')
    parser_runall.set_defaults(func=do_runall)
    parser_runall.add_argument('commands', nargs='+', help='Scripts/binaries to run')
    parser_runall.add_argument('-v', '--verbose', action='store_true',
                               help='Print all output produced by the commands')

    # parser_format = subparsers.add_parser('format')
    # parser_format.set_defaults(func=do_format)

    # parser_wrap = subparsers.add_parser('wrap')
    # parser_wrap.set_defaults(func=do_wrap)

    return parser


def main(argv=None):
    """Parse *argv* (default: sys.argv[1:]) and run the chosen sub-command."""
    parser = build_parser()
    args = parser.parse_args(argv)
    if not hasattr(args, 'func'):
        # No sub-command was given.
        parser.error('missing sub-command')
    args.func(args)


if __name__ == '__main__':
    main()