]> rtime.felk.cvut.cz Git - omk.git/blob - tests/wvtool
Update wvtool
[omk.git] / tests / wvtool
1 #!/usr/bin/env python3
2
3 # Copyright 2014 Michal Sojka <sojkam1@fel.cvut.cz>
4 # License: GPLv2+
5
6 """Versatile WvTest protocol tool. It replaces wvtestrun script and
7 provides some other useful features. Namely:
8 - Summary mode (--summary)
9 - Test results aligned to the same column
10 - TODO: Conversion to HTML
11 - TODO: Variable timeout
12 - TODO: Checking of expected number of tests
13
14 Newest version can be found at https://github.com/wentasah/wvtest.
15 """
16
17 version = "v0-7-g409306f"
18
19 import argparse
20 import subprocess as sp
21 import re
22 import sys
23 import os
24 import signal
25 import math
26
27 # Regulr expression that matches potential prefixes to wvtest protocol lines
28 re_prefix = ''
29
30 class Term:
31     class attr:
32         reset         = '\033[0m'
33         bold          = '\033[01m'
34         disable       = '\033[02m'
35         underline     = '\033[04m'
36         reverse       = '\033[07m'
37         strikethrough = '\033[09m'
38         invisible     = '\033[08m'
39     class fg:
40         black      = '\033[30m'
41         red        = '\033[31m'
42         green      = '\033[32m'
43         orange     = '\033[33m'
44         blue       = '\033[34m'
45         purple     = '\033[35m'
46         cyan       = '\033[36m'
47         lightgrey  = '\033[37m'
48         darkgrey   = '\033[90m'
49         lightred   = '\033[91m'
50         lightgreen = '\033[92m'
51         yellow     = '\033[93m'
52         lightblue  = '\033[94m'
53         pink       = '\033[95m'
54         lightcyan  = '\033[96m'
55     class bg:
56         black     = '\033[40m'
57         red       = '\033[41m'
58         green     = '\033[42m'
59         orange    = '\033[43m'
60         blue      = '\033[44m'
61         purple    = '\033[45m'
62         cyan      = '\033[46m'
63         lightgrey = '\033[47m'
64
65     progress_chars = '|/-\\'
66
67     def __init__(self):
68         if os.environ['TERM'] == 'dumb':
69             self.output = None
70         else:
71             try:
72                 self.output = open('/dev/tty', 'w')
73             except IOError:
74                 self.output = None
75
76         if not self.output:
77             self._clear_colors()
78
79         self.width = self._get_width()
80         self._enabled = True
81         self._progress_msg = ''
82         self._progress_idx = 0
83
84     def _raw_write(self, string):
85         '''Write raw data if output is enabled.'''
86         if self._enabled and self.output:
87             try:
88                 self.output.write(string)
89                 self.output.flush()
90             except IOError:
91                 self._enabled = False
92
93     def _get_width(self):
94         try:
95             import fcntl, termios, struct, os
96             s = struct.pack('HHHH', 0, 0, 0, 0)
97             x = fcntl.ioctl(self.output.fileno(), termios.TIOCGWINSZ, s)
98             return struct.unpack('HHHH', x)[1]
99         except:
100             return int(getattr(os.environ, 'COLUMNS', 80))
101
102     def _clear_colors(self):
103         '''Sets all color and attribute memebers to empty strings'''
104         for cls in ('attr', 'fg', 'bg'):
105             c = getattr(self, cls)
106             for key in dir(c):
107                 if key[0] == '_':
108                     continue
109                 setattr(c, key, '')
110
111     def set_progress_msg(self, msg):
112         self._progress_msg = msg
113         self._progress_idx = 0
114         self.update_progress_msg()
115
116     def update_progress_msg(self):
117         self._progress_idx += 1
118         if self._progress_idx >= len(self.progress_chars):
119             self._progress_idx = 0
120         if self.output:
121             self._raw_write(self._progress_msg[:self.width - 3] + " " + self.progress_chars[self._progress_idx] + "\r")
122
123     def clear_progress_msg(self):
124         if self.output:
125             self._raw_write(' '*(len(self._progress_msg[:self.width - 3]) + 2) + "\r")
126
127
128 term = Term()
129
130 class WvLine:
131     def __init__(self, match):
132         for (key, val) in match.groupdict().items():
133             setattr(self, key, val)
134
135     def print(self):
136         print(str(self))
137
138
139 class WvPlainLine(WvLine):
140     re = re.compile("(?P<line>.*)")
141     def __str__(self):
142         return self.line
143
144 class WvTestingLine(WvLine):
145     re = re.compile('(?P<prefix>' + re_prefix + ')Testing "(?P<what>.*)" in (?P<where>.*):$')
146     def __init__(self, *args):
147         if len(args) == 1:
148             WvLine.__init__(self, args[0])
149         elif len(args) == 2:
150             self.prefix = ''
151             self.what = args[0]
152             self.where = args[1]
153         else:
154             raise TypeError("WvTestingLine.__init__() takes at most 2 positional arguments")
155     def __str__(self):
156         return '{self.prefix}! Testing "{self.what}" in {self.where}:'.format(self=self)
157     def print(self):
158         print(term.attr.bold + str(self) + term.attr.reset)
159
160     def asWvCheckLine(self, result):
161         return WvCheckLine('{self.where}  {self.what}'.format(self=self), result)
162
163 class WvCheckLine(WvLine):
164     re = re.compile('(?P<prefix>' + re_prefix + ')!\s*(?P<text>.*?)\s+(?P<result>\S+)$')
165     def __init__(self, *args):
166         if len(args) == 1:
167             WvLine.__init__(self, args[0])
168         elif len(args) == 2:
169             self.prefix = ''
170             self.text = args[0]
171             self.result = args[1]
172         else:
173             raise TypeError("WvCheckLine.__init__() takes at most 2 positional arguments")
174
175     def __str__(self):
176         # Result == None when printing progress message
177         return '{self.prefix}! {self.text} {result}'.format(self=self, result=(self.result or ''))
178
179     def is_success(self):
180         return self.result == 'ok'
181
182     def print(self):
183         text = '{self.prefix}! {self.text} '.format(self=self)
184         if self.is_success():
185             color = term.fg.lightgreen
186         else:
187             color = term.fg.lightred
188         result = term.attr.bold + color + self.result + term.attr.reset
189
190         lines = math.ceil(len(text) / term.width)
191         if len(text) % term.width > term.width - 10:
192             lines += 1
193
194         text = format(text, '.<' + str(lines * term.width - 10))
195         print('{text} {result}'.format(text=text, result=result))
196
197 class WvTagLine(WvLine):
198     re  = re.compile('(?P<prefix>' + re_prefix + ')wvtest:\s*(?P<tag>.*)$')
199
200 class WvTestLog(list):
201
202     class Verbosity:
203         # Print one line for each "Testing" section. Passed tests are
204         # printed as "ok", failed tests as "FAILURE".
205         SUMMARY = 1
206
207         # Print one "ok" line for each passing "Testing" section.
208         # Failed "Testing" sections are printed verbosely.
209         NORMAL  = 2
210
211         # Print every line of the output, just
212         # reformat/syntax-highlight known lines.
213         VERBOSE = 3
214
215     def __init__(self, verbosity = Verbosity.NORMAL):
216         self.checkCount = 0
217         self.checkFailedCount = 0
218         self.testCount = 0
219         self.testFailedCount = 0
220
221         self.implicitTestTitle = None
222         self.currentTest = None
223         self.currentTestFailedCount = 0
224
225         self.verbosity = verbosity
226         self.show_progress = False
227
228     def setImplicitTestTitle (self, testing):
229         """If the test does not supply its own title as a first line of test
230         output, it this title will be used instead."""
231         self.implicitTestTitle = testing
232
233     def print(self):
234         for entry in self:
235             entry.print()
236
237     def _finishCurrentTest(self):
238         if self.currentTestFailedCount > 0:
239             if self.verbosity >= self.Verbosity.NORMAL:
240                 if self.show_progress:
241                     term.clear_progress_msg()
242                 self.print()
243             else:
244                 self.currentTest.asWvCheckLine('FAILED').print()
245             self.testFailedCount += 1
246         else:
247             if self.verbosity <= self.Verbosity.NORMAL:
248                 self.currentTest.asWvCheckLine('ok').print()
249         sys.stdout.flush()
250         self.clear()
251
252     def clear(self):
253         del self[:]
254
255     def _newTest(self, testing : WvTestingLine):
256         if self.currentTest:
257             self._finishCurrentTest()
258         if testing != None:
259             self.testCount += 1
260             if self.show_progress and self.verbosity < self.Verbosity.VERBOSE:
261                 term.set_progress_msg(str(testing.asWvCheckLine(None)))
262         self.currentTest = testing
263         self.currentTestFailedCount = 0
264
265     def _newCheck(self, check : WvCheckLine):
266         self.checkCount += 1
267         if not check.is_success():
268             self.checkFailedCount += 1
269             self.currentTestFailedCount += 1
270
271     def append(self, logEntry : WvLine):
272         if self.implicitTestTitle:
273             if str(logEntry) == '':
274                 pass
275             elif type(logEntry) != WvTestingLine:
276                 self._newTest(self.implicitTestTitle)
277                 super().append(self.implicitTestTitle)
278                 self.implicitTestTitle = None
279             else:
280                 self.implicitTestTitle = None
281
282
283         if type(logEntry) == WvTestingLine:
284             self._newTest(logEntry)
285         elif type(logEntry) == WvCheckLine:
286             self._newCheck(logEntry)
287
288         list.append(self, logEntry)
289
290         if self.verbosity == self.Verbosity.VERBOSE:
291             self.print()
292             self.clear()
293         else:
294             if self.show_progress:
295                 term.update_progress_msg()
296
297     def addLine(self, line):
298         line = line.rstrip()
299         logEntry = None
300
301         for lineClass in [ WvCheckLine, WvTestingLine, WvTagLine, WvPlainLine ]:
302             match = lineClass.re.match(line)
303             if match:
304                 logEntry = lineClass(match)
305                 break
306         if not logEntry:
307             raise Exception("Non-matched line: {}".format(line))
308
309         self.append(logEntry)
310
311     def done(self):
312         self._newTest(None)
313
314         print("WvTest: {total} test{plt}, {fail} failure{plf}."
315               .format(total = self.testCount, plt = '' if self.testCount == 1 else 's',
316                       fail = self.testFailedCount, plf = '' if self.testFailedCount  == 1 else 's'))
317     def is_success(self):
318         return self.testFailedCount == 0
319
320 def _run(command, log):
321     log.show_progress = True
322     timeout = 100
323
324     def kill_child(sig = None, frame = None):
325         os.killpg(proc.pid, sig)
326
327     def alarm(sig = None, frame = None):
328         msg = "! {wvtool}: Alarm timed out!  No test output for {timeout} seconds.  FAILED"
329         log.addLine(msg.format(wvtool=sys.argv[0], timeout=timeout))
330         kill_child(signal.SIGTERM)
331
332     signal.signal(signal.SIGINT, kill_child)
333     signal.signal(signal.SIGTERM, kill_child)
334     signal.signal(signal.SIGALRM, alarm)
335
336     cmd = command if isinstance(command, str) else ' '.join(command)
337     log.setImplicitTestTitle(WvTestingLine("Executing "+cmd, "wvtool"))
338
339     # Popen does not seep to be able to call setpgrp(). Therefore, we
340     # use start_new_session, but this also create a new session and
341     # detaches the process from a terminal. This might be a problem
342     # for programs that need a terminal to run.
343     with sp.Popen(command, stdout=sp.PIPE, stderr=sp.STDOUT,
344                   universal_newlines=True, start_new_session=True) as proc:
345         signal.alarm(timeout)
346         for line in proc.stdout:
347             signal.alarm(timeout)
348             log.addLine(line)
349
350     signal.alarm(0)
351
352     if proc.returncode != 0:
353         if proc.returncode > 0:
354             msg = "{wvtool}: Program '{cmd}' returned non-zero exit code {ec}"
355         else:
356             msg = "{wvtool}: Program '{cmd}' terminated by signal {sig}"
357
358         text = msg.format(wvtool=sys.argv[0], cmd=cmd,
359                           ec=proc.returncode, sig=-proc.returncode)
360         log.append(WvCheckLine(text, 'FAILED'))
361
362 def do_run(args, log):
363     _run(args.command, log)
364
365 def do_runall(args, log):
366     for cmd in args.commands:
367         _run(cmd, log)
368
369 def do_format(args, log):
370     files = args.infiles
371     if len(files) == 0:
372         log.setImplicitTestTitle(WvTestingLine("Preamble", "stdin"))
373         for line in sys.stdin:
374             log.addLine(line)
375     else:
376         for fn in args.infiles:
377             log.setImplicitTestTitle(WvTestingLine("Preamble", fn))
378             for line in open(fn):
379                 log.addLine(line)
380
381 def do_wrap(args, log):
382     pass
383
384 parser = argparse.ArgumentParser(description='Versatile wvtest tool')
385
386 parser.set_defaults(verbosity=WvTestLog.Verbosity.NORMAL)
387 parser.add_argument('-v', '--verbose', dest='verbosity', action='store_const',
388                     const=WvTestLog.Verbosity.VERBOSE,
389                     help='Do not hide output of successful tests')
390 parser.add_argument('-s', '--summary', dest='verbosity', action='store_const',
391                     const=WvTestLog.Verbosity.SUMMARY,
392                     help='''Hide output of all tests. Print just one line for each "Testing"
393                     section and report "ok" or "FAILURE" of it.''')
394 parser.add_argument('--version', action='version', version='%(prog)s '+version)
395
396 subparsers = parser.add_subparsers(help='sub-command help')
397
398 parser_run = subparsers.add_parser('run', help='Run and supervise a command producing wvtest output')
399 parser_run.add_argument('command', nargs=argparse.REMAINDER, help='Command to run')
400 parser_run.set_defaults(func=do_run)
401
402 parser_runall = subparsers.add_parser('runall', help='Run multiple scripts/binaries mentioned on command line')
403 parser_runall.set_defaults(func=do_runall)
404 parser_runall.add_argument('commands', nargs='+', help='Scripts/binaries to run')
405
406 parser_format = subparsers.add_parser('format', help='Reformat/highlight/summarize WvTest protocol output')
407 parser_format.set_defaults(func=do_format)
408 parser_format.add_argument('infiles', nargs='*', help='Files with wvtest output')
409
410 # parser_wrap = subparsers.add_parser('wrap')
411 # parser_wrap.set_defaults(func=do_wrap)
412
413 args = parser.parse_args()
414
415 if not 'func' in args:
416     parser.print_help()
417     sys.exit(1)
418
419 log = WvTestLog(args.verbosity)
420 args.func(args, log)
421 log.done()
422 sys.exit(0 if log.is_success() else 1)
423
424 # Local Variables:
425 # compile-command: "make wvtool"
426 # End: