]> rtime.felk.cvut.cz Git - omk.git/blob - tests/wvtool
fe9098b1a96ff9f1f09a3ffe093336e09577755a
[omk.git] / tests / wvtool
1 #!/usr/bin/env python3
2
3 # Copyright 2014 Michal Sojka <sojkam1@fel.cvut.cz>
4 # License: GPLv2+
5
6 """Versatile WvTest protocol tool. It replaces wvtestrun script and
7 provides some other useful features. Namely:
8 - Summary mode (--summary)
9 - Test results aligned to the same column
10 - TODO: Conversion to HTML
11 - TODO: Variable timeout
12 - TODO: Checking of expected number of tests
13
14 Newest version can be found at https://github.com/wentasah/wvtest.
15 """
16
17 version = "v0-6-gca4c750"
18
19 import argparse
20 import subprocess as sp
21 import re
22 import sys
23 import os
24 import signal
25 import math
26
27 # Regulr expression that matches potential prefixes to wvtest protocol lines
28 re_prefix = ''
29
30 class Term:
31     class attr:
32         reset         = '\033[0m'
33         bold          = '\033[01m'
34         disable       = '\033[02m'
35         underline     = '\033[04m'
36         reverse       = '\033[07m'
37         strikethrough = '\033[09m'
38         invisible     = '\033[08m'
39     class fg:
40         black      = '\033[30m'
41         red        = '\033[31m'
42         green      = '\033[32m'
43         orange     = '\033[33m'
44         blue       = '\033[34m'
45         purple     = '\033[35m'
46         cyan       = '\033[36m'
47         lightgrey  = '\033[37m'
48         darkgrey   = '\033[90m'
49         lightred   = '\033[91m'
50         lightgreen = '\033[92m'
51         yellow     = '\033[93m'
52         lightblue  = '\033[94m'
53         pink       = '\033[95m'
54         lightcyan  = '\033[96m'
55     class bg:
56         black     = '\033[40m'
57         red       = '\033[41m'
58         green     = '\033[42m'
59         orange    = '\033[43m'
60         blue      = '\033[44m'
61         purple    = '\033[45m'
62         cyan      = '\033[46m'
63         lightgrey = '\033[47m'
64
65     progress_chars = '|/-\\'
66
67     def __init__(self):
68         if os.environ['TERM'] == 'dumb':
69             self.output = None
70         else:
71             try:
72                 self.output = open('/dev/tty', 'w')
73             except IOError:
74                 self.output = None
75
76         if not self.output:
77             self._clear_colors()
78
79         self.width = self._get_width()
80         self._enabled = True
81
82     def _raw_write(self, string):
83         '''Write raw data if output is enabled.'''
84         if self._enabled and self.output:
85             try:
86                 self.output.write(string)
87                 self.output.flush()
88             except IOError:
89                 self._enabled = False
90
91     def _get_width(self):
92         try:
93             import fcntl, termios, struct, os
94             s = struct.pack('HHHH', 0, 0, 0, 0)
95             x = fcntl.ioctl(self.output.fileno(), termios.TIOCGWINSZ, s)
96             return struct.unpack('HHHH', x)[1]
97         except:
98             return int(getattr(os.environ, 'COLUMNS', 80))
99
100     def _clear_colors(self):
101         '''Sets all color and attribute memebers to empty strings'''
102         for cls in ('attr', 'fg', 'bg'):
103             c = getattr(self, cls)
104             for key in dir(c):
105                 if key[0] == '_':
106                     continue
107                 setattr(c, key, '')
108
109     def set_progress_msg(self, msg):
110         self._progress_msg = msg
111         self._progress_idx = 0
112         self.update_progress_msg()
113
114     def update_progress_msg(self):
115         self._progress_idx += 1
116         if self._progress_idx >= len(self.progress_chars):
117             self._progress_idx = 0
118         if self.output:
119             self._raw_write(self._progress_msg[:self.width - 3] + " " + self.progress_chars[self._progress_idx] + "\r")
120
121     def clear_progress_msg(self):
122         if self.output:
123             self._raw_write(' '*(len(self._progress_msg[:self.width - 3]) + 2) + "\r")
124
125
126 term = Term()
127
128 class WvLine:
129     def __init__(self, match):
130         for (key, val) in match.groupdict().items():
131             setattr(self, key, val)
132
133     def print(self):
134         print(str(self))
135
136
137 class WvPlainLine(WvLine):
138     re = re.compile("(?P<line>.*)")
139     def __str__(self):
140         return self.line
141
142 class WvTestingLine(WvLine):
143     re = re.compile('(?P<prefix>' + re_prefix + ')Testing "(?P<what>.*)" in (?P<where>.*):$')
144     def __init__(self, *args):
145         if len(args) == 1:
146             WvLine.__init__(self, args[0])
147         elif len(args) == 2:
148             self.prefix = ''
149             self.what = args[0]
150             self.where = args[1]
151         else:
152             raise TypeError("WvTestingLine.__init__() takes at most 2 positional arguments")
153     def __str__(self):
154         return '{self.prefix}! Testing "{self.what}" in {self.where}:'.format(self=self)
155     def print(self):
156         print(term.attr.bold + str(self) + term.attr.reset)
157
158     def asWvCheckLine(self, result):
159         return WvCheckLine('{self.where}  {self.what}'.format(self=self), result)
160
161 class WvCheckLine(WvLine):
162     re = re.compile('(?P<prefix>' + re_prefix + ')!\s*(?P<text>.*?)\s+(?P<result>\S+)$')
163     def __init__(self, *args):
164         if len(args) == 1:
165             WvLine.__init__(self, args[0])
166         elif len(args) == 2:
167             self.prefix = ''
168             self.text = args[0]
169             self.result = args[1]
170         else:
171             raise TypeError("WvCheckLine.__init__() takes at most 2 positional arguments")
172
173     def __str__(self):
174         # Result == None when printing progress message
175         return '{self.prefix}! {self.text} {result}'.format(self=self, result=(self.result or ''))
176
177     def is_success(self):
178         return self.result == 'ok'
179
180     def print(self):
181         text = '{self.prefix}! {self.text} '.format(self=self)
182         if self.is_success():
183             color = term.fg.lightgreen
184         else:
185             color = term.fg.lightred
186         result = term.attr.bold + color + self.result + term.attr.reset
187
188         lines = math.ceil(len(text) / term.width)
189         if len(text) % term.width > term.width - 10:
190             lines += 1
191
192         text = format(text, '.<' + str(lines * term.width - 10))
193         print('{text} {result}'.format(text=text, result=result))
194
195 class WvTagLine(WvLine):
196     re  = re.compile('(?P<prefix>' + re_prefix + ')wvtest:\s*(?P<tag>.*)$')
197
198 class WvTestLog(list):
199
200     class Verbosity:
201         # Print one line for each "Testing" section. Passed tests are
202         # printed as "ok", failed tests as "FAILURE".
203         SUMMARY = 1
204
205         # Print one "ok" line for each passing "Testing" section.
206         # Failed "Testing" sections are printed verbosely.
207         NORMAL  = 2
208
209         # Print every line of the output, just
210         # reformat/syntax-highlight known lines.
211         VERBOSE = 3
212
213     def __init__(self, verbosity = Verbosity.NORMAL):
214         self.checkCount = 0
215         self.checkFailedCount = 0
216         self.testCount = 0
217         self.testFailedCount = 0
218
219         self.implicitTestTitle = None
220         self.currentTest = None
221         self.currentTestFailedCount = 0
222
223         self.verbosity = verbosity
224         self.show_progress = False
225
226     def setImplicitTestTitle (self, testing):
227         """If the test does not supply its own title as a first line of test
228         output, it this title will be used instead."""
229         self.implicitTestTitle = testing
230
231     def print(self):
232         for entry in self:
233             entry.print()
234
235     def _finishCurrentTest(self):
236         if self.currentTestFailedCount > 0:
237             if self.verbosity >= self.Verbosity.NORMAL:
238                 self.print()
239             else:
240                 self.currentTest.asWvCheckLine('FAILED').print()
241             self.testFailedCount += 1
242         else:
243             if self.verbosity <= self.Verbosity.NORMAL:
244                 self.currentTest.asWvCheckLine('ok').print()
245         sys.stdout.flush()
246         self.clear()
247
248     def clear(self):
249         del self[:]
250
251     def _newTest(self, testing : WvTestingLine):
252         if self.currentTest:
253             self._finishCurrentTest()
254         if testing != None:
255             self.testCount += 1
256             if self.show_progress and self.verbosity < self.Verbosity.VERBOSE:
257                 term.set_progress_msg(str(testing.asWvCheckLine(None)))
258         self.currentTest = testing
259         self.currentTestFailedCount = 0
260
261     def _newCheck(self, check : WvCheckLine):
262         self.checkCount += 1
263         if not check.is_success():
264             self.checkFailedCount += 1
265             self.currentTestFailedCount += 1
266
267     def append(self, logEntry : WvLine):
268         if self.implicitTestTitle:
269             if str(logEntry) == '':
270                 pass
271             elif type(logEntry) != WvTestingLine:
272                 self._newTest(self.implicitTestTitle)
273                 super().append(self.implicitTestTitle)
274                 self.implicitTestTitle = None
275             else:
276                 self.implicitTestTitle = None
277
278
279         if type(logEntry) == WvTestingLine:
280             self._newTest(logEntry)
281         elif type(logEntry) == WvCheckLine:
282             self._newCheck(logEntry)
283
284         list.append(self, logEntry)
285
286         if self.verbosity == self.Verbosity.VERBOSE:
287             if self.show_progress:
288                 term.clear_progress_msg()
289             self.print()
290             self.clear()
291         else:
292             if self.show_progress:
293                 term.update_progress_msg()
294
295     def addLine(self, line):
296         line = line.rstrip()
297         logEntry = None
298
299         for lineClass in [ WvCheckLine, WvTestingLine, WvTagLine, WvPlainLine ]:
300             match = lineClass.re.match(line)
301             if match:
302                 logEntry = lineClass(match)
303                 break
304         if not logEntry:
305             raise Exception("Non-matched line: {}".format(line))
306
307         self.append(logEntry)
308
309     def done(self):
310         self._newTest(None)
311
312         print("WvTest: {total} test{plt}, {fail} failure{plf}."
313               .format(total = self.testCount, plt = '' if self.testCount == 1 else 's',
314                       fail = self.testFailedCount, plf = '' if self.testFailedCount  == 1 else 's'))
315     def is_success(self):
316         return self.testFailedCount == 0
317
318 def _run(command, log):
319     log.show_progress = True
320     timeout = 100
321
322     def kill_child(sig = None, frame = None):
323         os.killpg(proc.pid, sig)
324
325     def alarm(sig = None, frame = None):
326         msg = "! {wvtool}: Alarm timed out!  No test output for {timeout} seconds.  FAILED"
327         log.addLine(msg.format(wvtool=sys.argv[0], timeout=timeout))
328         kill_child(signal.SIGTERM)
329
330     signal.signal(signal.SIGINT, kill_child)
331     signal.signal(signal.SIGTERM, kill_child)
332     signal.signal(signal.SIGALRM, alarm)
333
334     cmd = command if isinstance(command, str) else ' '.join(command)
335     log.setImplicitTestTitle(WvTestingLine("Executing "+cmd, "wvtool"))
336
337     # Popen does not seep to be able to call setpgrp(). Therefore, we
338     # use start_new_session, but this also create a new session and
339     # detaches the process from a terminal. This might be a problem
340     # for programs that need a terminal to run.
341     with sp.Popen(command, stdout=sp.PIPE, stderr=sp.STDOUT,
342                   universal_newlines=True, start_new_session=True) as proc:
343         signal.alarm(timeout)
344         for line in proc.stdout:
345             signal.alarm(timeout)
346             log.addLine(line)
347
348     signal.alarm(0)
349
350     if proc.returncode != 0:
351         if proc.returncode > 0:
352             msg = "{wvtool}: Program '{cmd}' returned non-zero exit code {ec}"
353         else:
354             msg = "{wvtool}: Program '{cmd}' terminated by signal {sig}"
355
356         text = msg.format(wvtool=sys.argv[0], cmd=cmd,
357                           ec=proc.returncode, sig=-proc.returncode)
358         log.append(WvCheckLine(text, 'FAILED'))
359
360 def do_run(args, log):
361     _run(args.command, log)
362
363 def do_runall(args, log):
364     for cmd in args.commands:
365         _run(cmd, log)
366
367 def do_format(args, log):
368     files = args.infiles
369     if len(files) == 0:
370         log.setImplicitTestTitle(WvTestingLine("Preamble", "stdin"))
371         for line in sys.stdin:
372             log.addLine(line)
373     else:
374         for fn in args.infiles:
375             log.setImplicitTestTitle(WvTestingLine("Preamble", fn))
376             for line in open(fn):
377                 log.addLine(line)
378
379 def do_wrap(args, log):
380     pass
381
382 parser = argparse.ArgumentParser(description='Versatile wvtest tool')
383
384 parser.set_defaults(verbosity=WvTestLog.Verbosity.NORMAL)
385 parser.add_argument('-v', '--verbose', dest='verbosity', action='store_const',
386                     const=WvTestLog.Verbosity.VERBOSE,
387                     help='Do not hide output of successful tests')
388 parser.add_argument('-s', '--summary', dest='verbosity', action='store_const',
389                     const=WvTestLog.Verbosity.SUMMARY,
390                     help='''Hide output of all tests. Print just one line for each "Testing"
391                     section and report "ok" or "FAILURE" of it.''')
392 parser.add_argument('--version', action='version', version='%(prog)s '+version)
393
394 subparsers = parser.add_subparsers(help='sub-command help')
395
396 parser_run = subparsers.add_parser('run', help='Run and supervise a command producing wvtest output')
397 parser_run.add_argument('command', nargs=argparse.REMAINDER, help='Command to run')
398 parser_run.set_defaults(func=do_run)
399
400 parser_runall = subparsers.add_parser('runall', help='Run multiple scripts/binaries mentioned on command line')
401 parser_runall.set_defaults(func=do_runall)
402 parser_runall.add_argument('commands', nargs='+', help='Scripts/binaries to run')
403
404 parser_format = subparsers.add_parser('format', help='Reformat/highlight/summarize WvTest protocol output')
405 parser_format.set_defaults(func=do_format)
406 parser_format.add_argument('infiles', nargs='*', help='Files with wvtest output')
407
408 # parser_wrap = subparsers.add_parser('wrap')
409 # parser_wrap.set_defaults(func=do_wrap)
410
411 args = parser.parse_args()
412
413 if not 'func' in args:
414     parser.print_help()
415     sys.exit(1)
416
417 log = WvTestLog(args.verbosity)
418 args.func(args, log)
419 log.done()
420 sys.exit(0 if log.is_success() else 1)
421
422 # Local Variables:
423 # compile-command: "make wvtool"
424 # End: