]> rtime.felk.cvut.cz Git - novaboot.git/blob - tests/wvtool
server: Unify installation target to PREFIX
[novaboot.git] / tests / wvtool
1 #!/usr/bin/env python3
2
3 # Copyright 2014, 2015, 2017 Michal Sojka <sojkam1@fel.cvut.cz>
4 # License: GPLv2+
5
6 """Versatile WvTest protocol tool. It replaces wvtestrun script and
7 provides some other useful features. Namely:
8 - Summary mode (--summary)
9 - Test results aligned to the same column
10 - TODO: Conversion to HTML
11 - TODO: Variable timeout
12 - TODO: Checking of expected number of tests
13
14 Newest version can be found at https://github.com/wentasah/wvtest.
15 """
16
17 version = "v0-28-g299b716"
18
19 import argparse
20 import subprocess as sp
21 import re
22 import sys
23 import os
24 import signal
25 import math
26 import io
27 import datetime
28 import time
29
30 # Regulr expression that matches potential prefixes to wvtest protocol lines
31 re_prefix = ''
32
33 class Term:
34     class attr:
35         reset         = '\033[0m'
36         bold          = '\033[01m'
37         disable       = '\033[02m'
38         underline     = '\033[04m'
39         reverse       = '\033[07m'
40         strikethrough = '\033[09m'
41         invisible     = '\033[08m'
42     class fg:
43         black      = '\033[30m'
44         red        = '\033[31m'
45         green      = '\033[32m'
46         orange     = '\033[33m'
47         blue       = '\033[34m'
48         purple     = '\033[35m'
49         cyan       = '\033[36m'
50         lightgrey  = '\033[37m'
51         darkgrey   = '\033[90m'
52         lightred   = '\033[91m'
53         lightgreen = '\033[92m'
54         yellow     = '\033[93m'
55         lightblue  = '\033[94m'
56         pink       = '\033[95m'
57         lightcyan  = '\033[96m'
58     class bg:
59         black     = '\033[40m'
60         red       = '\033[41m'
61         green     = '\033[42m'
62         orange    = '\033[43m'
63         blue      = '\033[44m'
64         purple    = '\033[45m'
65         cyan      = '\033[46m'
66         lightgrey = '\033[47m'
67
68     progress_chars = '|/-\\'
69
70     def __init__(self):
71         if os.environ['TERM'] == 'dumb':
72             self.output = None
73         else:
74             try:
75                 self.output = open('/dev/tty', 'w')
76             except IOError:
77                 self.output = None
78
79         if not self.output:
80             self._clear_colors()
81
82         self.width = self._get_width()
83         self._enabled = True
84         self._progress_msg = ''
85         self._progress_idx = 0
86
87     def _raw_write(self, string):
88         '''Write raw data if output is enabled.'''
89         if self._enabled and self.output:
90             try:
91                 self.output.write(string)
92                 self.output.flush()
93             except IOError:
94                 self._enabled = False
95
96     def _get_width(self):
97         try:
98             import fcntl, termios, struct, os
99             s = struct.pack('HHHH', 0, 0, 0, 0)
100             x = fcntl.ioctl(self.output.fileno(), termios.TIOCGWINSZ, s)
101             return struct.unpack('HHHH', x)[1]
102         except:
103             return int(getattr(os.environ, 'COLUMNS', 80))
104
105     def _clear_colors(self):
106         '''Sets all color and attribute memebers to empty strings'''
107         for cls in ('attr', 'fg', 'bg'):
108             c = getattr(self, cls)
109             for key in dir(c):
110                 if key[0] == '_':
111                     continue
112                 setattr(c, key, '')
113
114     def set_progress_msg(self, msg):
115         self._progress_msg = msg
116         self._progress_idx = 0
117         self.update_progress_msg()
118
119     def update_progress_msg(self):
120         self._progress_idx += 1
121         if self._progress_idx >= len(self.progress_chars):
122             self._progress_idx = 0
123         if self.output:
124             self._raw_write(self._progress_msg[:self.width - 3] + " " + self.progress_chars[self._progress_idx] + "\r")
125
126     def clear_progress_msg(self):
127         if self.output:
128             self._raw_write(' '*(len(self._progress_msg[:self.width - 3]) + 2) + "\r")
129
130
131 term = Term()
132
133 class WvLine:
134     def __init__(self, match):
135         for (key, val) in match.groupdict().items():
136             setattr(self, key, val)
137
138     def print(self, file=sys.stdout):
139         "Print the line (terminal is expected on output)"
140         print(str(self), file=file)
141
142     def log(self, file=sys.stdout):
143         "Print the line (without terminal escape sequences)"
144         self.print(file)
145
146
147 class WvPlainLine(WvLine):
148     re = re.compile("(?P<line>.*)")
149
150     def __str__(self):
151         return self.line
152
153 class WvTestingLine(WvLine):
154     re = re.compile('(?P<prefix>' + re_prefix + ')Testing "(?P<what>.*)" in (?P<where>.*):$')
155
156     def __init__(self, *args):
157         if len(args) == 1:
158             WvLine.__init__(self, args[0])
159         elif len(args) == 2:
160             self.prefix = ''
161             self.what = args[0]
162             self.where = args[1]
163         else:
164             raise TypeError("WvTestingLine.__init__() takes at most 2 positional arguments")
165
166     def __str__(self):
167         return '{self.prefix}! Testing "{self.what}" in {self.where}:'.format(self=self)
168
169     def print(self, file=sys.stdout):
170         print(term.attr.bold + str(self) + term.attr.reset, file=file)
171
172     def log(self, file):
173         print(str(self), file=file)
174
175     def asWvCheckLine(self, result):
176         return WvCheckLine('{self.where}  {self.what}'.format(self=self), result)
177
178 class WvCheckLine(WvLine):
179     re = re.compile('(?P<prefix>' + re_prefix + ')!\s*(?P<text>.*?)\s+(?P<result>\S+)$')
180
181     def __init__(self, *args):
182         if len(args) == 1:
183             WvLine.__init__(self, args[0])
184         elif len(args) == 2:
185             self.prefix = ''
186             self.text = args[0]
187             self.result = args[1]
188         else:
189             raise TypeError("WvCheckLine.__init__() takes at most 2 positional arguments")
190
191     def __str__(self):
192         # Result == None when printing progress message
193         return '{self.prefix}! {self.text} {result}'.format(self=self, result=(self.result or ''))
194
195     def is_success(self):
196         return self.result == 'ok'
197
198     def formated(self):
199         text = '{self.prefix}! {self.text} '.format(self=self)
200         if self.is_success():
201             color = term.fg.lightgreen
202         else:
203             color = term.fg.lightred
204         result = term.attr.bold + color + self.result + term.attr.reset
205
206         lines = math.ceil(len(text) / term.width)
207         if len(text) % term.width > term.width - 10:
208             lines += 1
209
210         text = format(text, '.<' + str(lines * term.width - 10))
211         return '{text} {result}'.format(text=text, result=result)
212
213     def print(self, file=sys.stdout):
214         print(self.formated(), file=file)
215
216     def log(self, file=sys.stdout):
217         text = '{self.prefix}! {self.text} '.format(self=self)
218         print('{text:.<80} {result}'.format(text=text, result=self.result), file=file)
219
220
221 class WvTagLine(WvLine):
222     re  = re.compile('(?P<prefix>' + re_prefix + ')wvtest:\s*(?P<tag>.*)$')
223
224 class WvTestLog(list):
225
226     class Verbosity:
227         # Print one line for each "Testing" section. Passed tests are
228         # printed as "ok", failed tests as "FAILURE".
229         SUMMARY = 1
230
231         # Print one "ok" line for each passing "Testing" section.
232         # Failed "Testing" sections are printed verbosely.
233         NORMAL  = 2
234
235         # Print every line of the output, just
236         # reformat/syntax-highlight known lines.
237         VERBOSE = 3
238
239     def __init__(self, verbosity = Verbosity.NORMAL, junit_xml : io.IOBase = None,
240                  logdir = None):
241         self.checkCount = 0
242         self.checkFailedCount = 0
243         self.testCount = 0
244         self.testFailedCount = 0
245
246         self.implicitTestTitle = None
247         self.currentTest = None
248         self.currentTestFailedCount = 0
249
250         self.verbosity = verbosity
251         self.show_progress = False
252
253         self.junit_xml = junit_xml
254
255         if junit_xml:
256             global wvjunit
257             import wvjunit
258             self.junitTestcases = []
259
260         self.logdir = logdir
261         self.log = None
262         if logdir and not os.path.isdir(logdir):
263             os.mkdir(logdir)
264
265     def setImplicitTestTitle (self, testing):
266         """If the test does not supply its own title as a first line of test
267         output, it this title will be used instead."""
268         self.implicitTestTitle = testing
269
270     def print(self, file=sys.stdout):
271         for entry in self:
272             entry.print(file=file)
273
274     def __str__(self):
275         s = ''
276         for entry in self:
277             if 'formated' in dir(entry):
278                 e = entry.formated()
279             else:
280                 e = str(entry)
281             s += e + "\n"
282         return s
283
284     def plainText(self):
285         return "\n".join([str(entry) for entry in self]) + "\n"
286
287     def _rememberJUnitTestcase(self):
288         if not self.junit_xml:
289             return
290
291         failure = None
292         if self.currentTestFailedCount > 0:
293             failure = wvjunit.Failure(text=self.plainText())
294
295         tc = wvjunit.Testcase(classname = self.currentTest.where,
296                               name = self.currentTest.what,
297                               time = time.time() - self.testStartTime,
298                               failure = failure)
299         self.junitTestcases.append(tc)
300
301     def _generateJUnitXML(self):
302         if not self.junit_xml:
303             return
304         ts = wvjunit.Testsuite(tests = self.testCount,
305                                failures = self.testFailedCount,
306                                errors = 0,
307                                name = 'N/A',
308                                time = 0,
309                                hostname="localhost",
310                                timestamp = datetime.datetime.now(),
311                                testcases = self.junitTestcases)
312         ts.print(file = self.junit_xml)
313
314     def _finishCurrentTest(self):
315         self._rememberJUnitTestcase()
316         if self.currentTestFailedCount > 0:
317             if self.show_progress and self.verbosity < self.Verbosity.VERBOSE:
318                 term.clear_progress_msg()
319             if self.verbosity == self.Verbosity.NORMAL:
320                 self.print()
321             elif self.verbosity < self.Verbosity.NORMAL:
322                 self.currentTest.asWvCheckLine('FAILED').print()
323             self.testFailedCount += 1
324         else:
325             if self.verbosity <= self.Verbosity.NORMAL:
326                 self.currentTest.asWvCheckLine('ok').print()
327         sys.stdout.flush()
328         self.clear()
329         if self.log:
330             self.log.close()
331
332     def clear(self):
333         del self[:]
334
335     def _newTest(self, testing : WvTestingLine):
336         if self.currentTest:
337             self._finishCurrentTest()
338         if testing != None:
339             self.testCount += 1
340             if self.show_progress and self.verbosity < self.Verbosity.VERBOSE:
341                 term.set_progress_msg(str(testing.asWvCheckLine(None)))
342
343             if self.logdir:
344                 self.log = open(os.path.join(self.logdir, "%s-%s.log" %
345                                              (testing.where, testing.what.lower().replace(' ', '_'))),
346                                 'w')
347             self.testStartTime = time.time()
348         self.currentTest = testing
349         self.currentTestFailedCount = 0
350
351     def _newCheck(self, check : WvCheckLine):
352         self.checkCount += 1
353         if not check.is_success():
354             self.checkFailedCount += 1
355             self.currentTestFailedCount += 1
356
357     def append(self, logEntry : WvLine):
358         if self.implicitTestTitle:
359             if str(logEntry) == '':
360                 pass
361             elif type(logEntry) != WvTestingLine:
362                 self._newTest(self.implicitTestTitle)
363                 super().append(self.implicitTestTitle)
364                 self.implicitTestTitle = None
365             else:
366                 self.implicitTestTitle = None
367
368
369         if type(logEntry) == WvTestingLine:
370             self._newTest(logEntry)
371         elif type(logEntry) == WvCheckLine:
372             self._newCheck(logEntry)
373
374         list.append(self, logEntry)
375
376         if self.verbosity == self.Verbosity.VERBOSE:
377             logEntry.print()
378         else:
379             if self.show_progress:
380                 term.update_progress_msg()
381
382         if self.log:
383             logEntry.log(self.log)
384
385     def addLine(self, line):
386         line = line.rstrip()
387         logEntry = None
388
389         for lineClass in [ WvCheckLine, WvTestingLine, WvTagLine, WvPlainLine ]:
390             match = lineClass.re.match(line)
391             if match:
392                 logEntry = lineClass(match)
393                 break
394         if not logEntry:
395             raise Exception("Non-matched line: {}".format(line))
396
397         self.append(logEntry)
398
399     def done(self):
400         self._newTest(None)
401
402         self._generateJUnitXML()
403
404         print("WvTest: {total} test{plt}, {fail} failure{plf}."
405               .format(total = self.testCount, plt = '' if self.testCount == 1 else 's',
406                       fail = self.testFailedCount, plf = '' if self.testFailedCount  == 1 else 's'))
407     def is_success(self):
408         return self.testFailedCount == 0
409
410 def _run(command, log):
411     log.show_progress = True
412     timeout = 100
413
414     def kill_child(sig = None, frame = None):
415         os.killpg(proc.pid, sig)
416
417     def alarm(sig = None, frame = None):
418         msg = "! {wvtool}: Alarm timed out!  No test output for {timeout} seconds.  FAILED"
419         log.addLine(msg.format(wvtool=sys.argv[0], timeout=timeout))
420         kill_child(signal.SIGTERM)
421
422     signal.signal(signal.SIGINT, kill_child)
423     signal.signal(signal.SIGTERM, kill_child)
424     signal.signal(signal.SIGALRM, alarm)
425
426     cmd = command if isinstance(command, str) else ' '.join(command)
427     log.setImplicitTestTitle(WvTestingLine("Executing "+cmd, "wvtool"))
428
429     # Popen does not seem to be able to call setpgrp(). Therefore, we
430     # use start_new_session, but this also create a new session and
431     # detaches the process from a terminal. This might be a problem
432     # for programs that need a terminal to run.
433     with sp.Popen(command, stdin=None, stdout=sp.PIPE, stderr=sp.STDOUT,
434                   universal_newlines=False, start_new_session=True) as proc:
435         signal.alarm(timeout)
436         stdout = io.TextIOWrapper(proc.stdout, errors='replace')
437         for line in stdout:
438             signal.alarm(timeout)
439             log.addLine(line)
440
441     signal.alarm(0)
442
443     if proc.returncode != 0:
444         if proc.returncode > 0:
445             msg = "{wvtool}: Program '{cmd}' returned non-zero exit code {ec}"
446         else:
447             msg = "{wvtool}: Program '{cmd}' terminated by signal {sig}"
448
449         text = msg.format(wvtool=sys.argv[0], cmd=cmd,
450                           ec=proc.returncode, sig=-proc.returncode)
451         log.append(WvCheckLine(text, 'FAILED'))
452
453 def do_run(args, log):
454     _run(args.command, log)
455
456 def do_runall(args, log):
457     for cmd in args.commands:
458         _run(cmd, log)
459
460 def do_format(args, log):
461     files = args.infiles
462     if len(files) == 0:
463         log.setImplicitTestTitle(WvTestingLine("Preamble", "stdin"))
464         for line in sys.stdin:
465             log.addLine(line)
466     else:
467         for fn in args.infiles:
468             log.setImplicitTestTitle(WvTestingLine("Preamble", fn))
469             for line in open(fn):
470                 log.addLine(line)
471
472 def do_wrap(args, log):
473     pass
474
475 parser = argparse.ArgumentParser(description='Versatile wvtest tool')
476
477 parser.set_defaults(verbosity=WvTestLog.Verbosity.NORMAL)
478 parser.add_argument('-v', '--verbose', dest='verbosity', action='store_const',
479                     const=WvTestLog.Verbosity.VERBOSE,
480                     help='Do not hide output of successful tests')
481 parser.add_argument('-s', '--summary', dest='verbosity', action='store_const',
482                     const=WvTestLog.Verbosity.SUMMARY,
483                     help='''Hide output of all tests. Print just one line for each "Testing"
484                     section and report "ok" or "FAILURE" of it.''')
485 parser.add_argument('--junit-xml', type=argparse.FileType('w'),
486                     help='''Convert output to JUnit compatible XML file''')
487 parser.add_argument('--logdir',
488                     help='''Store test logs in the given directory''')
489 parser.add_argument('--version', action='version', version='%(prog)s '+version)
490
491 subparsers = parser.add_subparsers(help='sub-command help')
492
493 parser_run = subparsers.add_parser('run', help='Run and supervise a command producing wvtest output')
494 parser_run.add_argument('command', nargs=argparse.REMAINDER, help='Command to run')
495 parser_run.set_defaults(func=do_run)
496
497 parser_runall = subparsers.add_parser('runall', help='Run multiple scripts/binaries mentioned on command line')
498 parser_runall.set_defaults(func=do_runall)
499 parser_runall.add_argument('commands', nargs='+', help='Scripts/binaries to run')
500
501 parser_format = subparsers.add_parser('format', help='Reformat/highlight/summarize WvTest protocol output')
502 parser_format.set_defaults(func=do_format)
503 parser_format.add_argument('infiles', nargs='*', help='Files with wvtest output')
504
505 # parser_wrap = subparsers.add_parser('wrap')
506 # parser_wrap.set_defaults(func=do_wrap)
507
508 args = parser.parse_args()
509
510 if not 'func' in args:
511     parser.print_help()
512     sys.exit(1)
513
514 log = WvTestLog(args.verbosity, junit_xml = args.junit_xml, logdir=args.logdir)
515 args.func(args, log)
516 log.done()
517 sys.exit(0 if log.is_success() else 1)
518
519 # Local Variables:
520 # End: