]> rtime.felk.cvut.cz Git - hercules2020/kcf.git/blob - wvtool
Remove debug printf
[hercules2020/kcf.git] / wvtool
1 #!/usr/bin/env python3
2
3 # Copyright 2014, 2015, 2017 Michal Sojka <sojkam1@fel.cvut.cz>
4 # License: GPLv2+
5
6 """Versatile WvTest protocol tool. It replaces wvtestrun script and
7 provides some other useful features. Namely:
8 - Summary mode (--summary)
9 - Test results aligned to the same column
10 - (Experimental) Export to JUnit XML
11 - TODO: Conversion to HTML
12 - TODO: Variable timeout
13 - TODO: Checking of expected number of tests
14
15 Newest version can be found at https://github.com/wentasah/wvtest.
16 """
17
18 version = "v0-53-g52e7819"
19
20 import argparse
21 import subprocess as sp
22 import re
23 import sys
24 import os
25 import signal
26 import math
27 import io
28 import datetime
29 import time
30 import socket
31
32 # Regulr expression that matches potential prefixes to wvtest protocol lines
33 re_prefix = ''
34
35 class Term:
36     class attr:
37         reset         = '\033[0m'
38         bold          = '\033[01m'
39         disable       = '\033[02m'
40         underline     = '\033[04m'
41         reverse       = '\033[07m'
42         strikethrough = '\033[09m'
43         invisible     = '\033[08m'
44     class fg:
45         black      = '\033[30m'
46         red        = '\033[31m'
47         green      = '\033[32m'
48         orange     = '\033[33m'
49         blue       = '\033[34m'
50         purple     = '\033[35m'
51         cyan       = '\033[36m'
52         lightgrey  = '\033[37m'
53         darkgrey   = '\033[1;30m'
54         lightred   = '\033[1;31m'
55         lightgreen = '\033[1;32m'
56         yellow     = '\033[1;33m'
57         lightblue  = '\033[1;34m'
58         pink       = '\033[1;35m'
59         lightcyan  = '\033[1;36m'
60     class bg:
61         black     = '\033[40m'
62         red       = '\033[41m'
63         green     = '\033[42m'
64         orange    = '\033[43m'
65         blue      = '\033[44m'
66         purple    = '\033[45m'
67         cyan      = '\033[46m'
68         lightgrey = '\033[47m'
69
70     progress_chars = '|/-\\'
71
72     def __init__(self, width=None):
73         if not 'TERM'  in os.environ or os.environ['TERM'] == 'dumb':
74             self.output = None
75         else:
76             try:
77                 self.output = open('/dev/tty', 'w')
78             except IOError:
79                 self.output = None
80
81         self.width = width or self._get_width()
82         self._enabled = True
83         self._progress_msg = ''
84         self._progress_idx = 0
85
86     def _raw_write(self, string):
87         '''Write raw data if output is enabled.'''
88         if self._enabled and self.output:
89             try:
90                 self.output.write(string)
91                 self.output.flush()
92             except IOError:
93                 self._enabled = False
94
95     def _get_width(self):
96         try:
97             import fcntl, termios, struct, os
98             s = struct.pack('HHHH', 0, 0, 0, 0)
99             x = fcntl.ioctl(self.output.fileno(), termios.TIOCGWINSZ, s)
100             width = struct.unpack('HHHH', x)[1]
101             if width <= 0:
102                 raise Exception
103             return width
104         except:
105             return int(getattr(os.environ, 'COLUMNS', 80))
106
107     def clear_colors(self):
108         '''Sets all color and attribute memebers to empty strings'''
109         for cls in ('attr', 'fg', 'bg'):
110             c = getattr(self, cls)
111             for key in dir(c):
112                 if key[0] == '_':
113                     continue
114                 setattr(c, key, '')
115
116     def set_progress_msg(self, msg):
117         self._progress_msg = msg
118         self._progress_idx = 0
119         self.update_progress_msg()
120
121     def update_progress_msg(self):
122         self._progress_idx += 1
123         if self._progress_idx >= len(self.progress_chars):
124             self._progress_idx = 0
125         if self.output:
126             self._raw_write(self._progress_msg[:self.width - 3] + " " + self.progress_chars[self._progress_idx] + "\r")
127
128     def clear_progress_msg(self):
129         if self.output:
130             self._raw_write(' '*(len(self._progress_msg[:self.width - 3]) + 2) + "\r")
131
132
133 class WvLine:
134     def __init__(self, match):
135         for (key, val) in match.groupdict().items():
136             setattr(self, key, val)
137
138     def print(self, file=sys.stdout):
139         "Print the line (terminal is expected on output)"
140         print(str(self), file=file)
141
142     def log(self, file=sys.stdout):
143         "Print the line (without terminal escape sequences)"
144         self.print(file)
145
146
147 class WvPlainLine(WvLine):
148     re = re.compile("(?P<line>.*)")
149
150     def __str__(self):
151         return self.line
152
153 class WvTestingLine(WvLine):
154     re = re.compile('(?P<prefix>' + re_prefix + ')Testing "(?P<what>.*)" in (?P<where>.*):$')
155
156     def __init__(self, *args):
157         if len(args) == 1:
158             WvLine.__init__(self, args[0])
159         elif len(args) == 2:
160             self.prefix = ''
161             self.what = args[0]
162             self.where = args[1]
163         else:
164             raise TypeError("WvTestingLine.__init__() takes at most 2 positional arguments")
165
166     def __str__(self):
167         return '{self.prefix}Testing "{self.what}" in {self.where}:'.format(self=self)
168
169     def print(self, file=sys.stdout):
170         print(term.attr.bold + str(self) + term.attr.reset, file=file)
171
172     def log(self, file):
173         print(str(self), file=file)
174
175     def asWvCheckLine(self, result):
176         return WvCheckLine('{self.where}  {self.what}'.format(self=self), result)
177
178 class WvCheckLine(WvLine):
179     re = re.compile('(?P<prefix>' + re_prefix + ')!\s*(?P<text>.*?)\s+(?P<result>\S+)$')
180
181     def __init__(self, *args):
182         if len(args) == 1:
183             WvLine.__init__(self, args[0])
184         elif len(args) == 2:
185             self.prefix = ''
186             self.text = args[0].rstrip(' .')
187             self.result = args[1]
188         else:
189             raise TypeError("WvCheckLine.__init__() takes at most 2 positional arguments")
190
191     def __str__(self):
192         # Result == None when printing progress message
193         return '{self.prefix}! {self.text} {result}'.format(self=self, result=(self.result or ''))
194
195     def is_success(self):
196         return self.result == 'ok'
197
198     def formated(self, highlight=True, include_newlines=False, result_space=10):
199         text = '{self.prefix}! {self.text} '.format(self=self)
200         if highlight:
201             if self.is_success():
202                 color = term.fg.lightgreen
203             else:
204                 color = term.fg.lightred
205
206             result = term.attr.bold + color + self.result + term.attr.reset
207             width = term.width
208         else:
209             result = self.result
210             width = 80
211
212         lines = math.ceil((len(text) + result_space) / width)
213         text = format(text, '.<' + str(lines * width - result_space))
214         if include_newlines:
215             for i in reversed(range(width, width*lines, width)):
216                 text = text[:i] + '\n' + text[i:]
217         return '{text} {result}'.format(text=text, result=result)
218
219     def print(self, file=sys.stdout):
220         print(self.formated(), file=file)
221
222     def log(self, file=sys.stdout):
223         text = '{self.prefix}! {self.text} '.format(self=self)
224         print('{text:.<80} {result}'.format(text=text, result=self.result), file=file)
225
226
227 class WvTagLine(WvLine):
228     re  = re.compile('(?P<prefix>' + re_prefix + ')wvtest:\s*(?P<tag>.*)$')
229
230 class WvTestProcessor(list):
231
232     class Verbosity:
233         # Print one line for each "Testing" section. Passed tests are
234         # printed as "ok", failed tests as "FAILURE".
235         SUMMARY = 1
236
237         # Print one "ok" line for each passing "Testing" section.
238         # Failed "Testing" sections are printed verbosely.
239         NORMAL  = 2
240
241         # Print every line of the output, just
242         # reformat/syntax-highlight known lines.
243         VERBOSE = 3
244
245     def __init__(self,
246                  verbosity = Verbosity.NORMAL,
247                  junit_xml: io.IOBase = None,
248                  junit_prefix: str = '',
249                  logdir = None):
250         self.checkCount = 0
251         self.checkFailedCount = 0
252         self.testCount = 0
253         self.testFailedCount = 0
254
255         self.implicitTestTitle = None
256         self.currentTest = None
257
258         self.verbosity = verbosity
259         self.show_progress = False
260
261         self.junit_xml = junit_xml
262         self.junit_prefix = junit_prefix
263
264         if junit_xml:
265             global wvjunit
266             import wvjunit
267             self.junitTestcases = []
268             self.junitTestsuites = []
269
270         self.logdir = logdir
271         self.log = None
272         if logdir and not os.path.isdir(logdir):
273             os.mkdir(logdir)
274
275     def setImplicitTestTitle (self, testing):
276         """If the test does not supply its own title as the first line of test
277         output, this title will be used instead."""
278         self.implicitTestTitle = testing
279
280     def print(self, file=sys.stdout):
281         for entry in self:
282             entry.print(file=file)
283
284     def __str__(self):
285         s = ''
286         for entry in self:
287             if 'formated' in dir(entry):
288                 e = entry.formated()
289             else:
290                 e = str(entry)
291             s += e + "\n"
292         return s
293
294     def plainText(self):
295         return "\n".join([str(entry) for entry in self]) + "\n"
296
297     def _rememberJUnitTestcase(self, check: WvCheckLine):
298         if not self.junit_xml:
299             return
300
301         t = time.time()
302         duration = t - (self.lastCheckTime or self.testStartTime)
303         self.lastCheckTime = t
304
305         if not check.is_success():
306             failure = wvjunit.Failure(type='WvTest check',
307                                       message=check.text)
308         else:
309             failure = None
310
311         self.junitTestcases.append(
312             wvjunit.Testcase(
313                 classname="{}{}.{}".format(
314                     self.junit_prefix,
315                     self.currentTest.where.replace('.', '_'),
316                     self.currentTest.what),
317                 name=check.text,
318                 time=duration,
319                 failure=failure))
320
321     def _rememberJUnitTestsuite(self):
322         if not self.junit_xml:
323             return
324
325         system_out = wvjunit.SystemOut(text=self.plainText())
326
327         ts = wvjunit.Testsuite(tests=self.checkCount,
328                                failures=self.checkFailedCount,
329                                errors=0,
330                                name="{}{}.{}".format(
331                                    self.junit_prefix,
332                                    self.currentTest.where.replace('.', '_'),
333                                    self.currentTest.what),
334                                time=time.time()-self.testStartTime,
335                                hostname=socket.getfqdn(),
336                                timestamp=datetime.datetime.now(),
337                                testcases=self.junitTestcases,
338                                system_out=system_out)
339         self.junitTestsuites.append(ts)
340         self.junitTestcases = []
341
342     def _generateJUnitXML(self):
343         if not self.junit_xml:
344             return
345         tss = wvjunit.Testsuites(testsuites=self.junitTestsuites)
346         tss.print(file=self.junit_xml)
347
348     def _finishCurrentTest(self):
349         self._rememberJUnitTestsuite()
350         if self.checkFailedCount > 0:
351             if self.show_progress and self.verbosity < self.Verbosity.VERBOSE:
352                 term.clear_progress_msg()
353             if self.verbosity == self.Verbosity.NORMAL:
354                 self.print()
355             elif self.verbosity < self.Verbosity.NORMAL:
356                 self.currentTest.asWvCheckLine('FAILED').print()
357             self.testFailedCount += 1
358         else:
359             if self.verbosity <= self.Verbosity.NORMAL:
360                 self.currentTest.asWvCheckLine('ok').print()
361         sys.stdout.flush()
362         self.clear()
363         if self.log:
364             self.log.close()
365
366     def clear(self):
367         del self[:]
368
369     def _newTest(self, testing : WvTestingLine):
370         if self.currentTest:
371             self._finishCurrentTest()
372         if testing != None:
373             self.testCount += 1
374             if self.show_progress and self.verbosity < self.Verbosity.VERBOSE:
375                 term.set_progress_msg(str(testing.asWvCheckLine(None)))
376
377             if self.logdir:
378                 trans = str.maketrans(' /', '__')
379                 self.log = open(os.path.join(self.logdir, "%04d-%s-%s.log" %
380                                              (self.testCount,
381                                               testing.where.translate(trans),
382                                               testing.what.lower().translate(trans))),
383                                 'w')
384             self.testStartTime = time.time()
385             self.lastCheckTime = None
386         self.currentTest = testing
387         self.checkCount = 0
388         self.checkFailedCount = 0
389
390     def _newCheck(self, check: WvCheckLine):
391         self.checkCount += 1
392         if not check.is_success():
393             self.checkFailedCount += 1
394         self._rememberJUnitTestcase(check)
395
396     def append(self, logEntry: WvLine):
397         if self.implicitTestTitle:
398             if str(logEntry) == '':
399                 pass
400             elif type(logEntry) != WvTestingLine:
401                 self._newTest(self.implicitTestTitle)
402                 super().append(self.implicitTestTitle)
403                 self.implicitTestTitle = None
404             else:
405                 self.implicitTestTitle = None
406
407
408         if type(logEntry) == WvTestingLine:
409             self._newTest(logEntry)
410         elif type(logEntry) == WvCheckLine:
411             self._newCheck(logEntry)
412
413         list.append(self, logEntry)
414
415         if self.verbosity == self.Verbosity.VERBOSE:
416             logEntry.print()
417         else:
418             if self.show_progress:
419                 term.update_progress_msg()
420
421         if self.log:
422             logEntry.log(self.log)
423
424     def processLine(self, line):
425         line = line.rstrip()
426         logEntry = None
427
428         for lineClass in [ WvCheckLine, WvTestingLine, WvTagLine, WvPlainLine ]:
429             match = lineClass.re.match(line)
430             if match:
431                 logEntry = lineClass(match)
432                 break
433         if not logEntry:
434             raise Exception("Non-matched line: {}".format(line))
435
436         self.append(logEntry)
437
438     def done(self):
439         self._newTest(None)
440
441         self._generateJUnitXML()
442
443         print("WvTest: {total} test{plt}, {fail} failure{plf}."
444               .format(total = self.testCount, plt = '' if self.testCount == 1 else 's',
445                       fail = self.testFailedCount, plf = '' if self.testFailedCount  == 1 else 's'))
446     def is_success(self):
447         return self.testFailedCount == 0
448
449 def _run(command, processor, timeout=100):
450     processor.show_progress = True
451
452
453     def kill_child(sig = None, frame = None):
454         os.killpg(proc.pid, sig)
455
456     def alarm(sig = None, frame = None):
457         msg = "! {wvtool}: Alarm timed out!  No test output for {timeout} seconds.  FAILED"
458         processor.processLine(msg.format(wvtool=sys.argv[0], timeout=timeout))
459         kill_child(signal.SIGTERM)
460
461     signal.signal(signal.SIGINT, kill_child)
462     signal.signal(signal.SIGTERM, kill_child)
463     signal.signal(signal.SIGALRM, alarm)
464
465     cmd = command if isinstance(command, str) else ' '.join(command)
466     processor.setImplicitTestTitle(WvTestingLine("Preamble of "+cmd, "wvtool"))
467
468     # Popen does not seem to be able to call setpgrp(). Therefore, we
469     # use start_new_session, but this also create a new session and
470     # detaches the process from a terminal. This might be a problem
471     # for programs that need a terminal to run.
472     with sp.Popen(command, stdin=None, stdout=sp.PIPE, stderr=sp.STDOUT,
473                   universal_newlines=False, start_new_session=True) as proc:
474         signal.alarm(timeout)
475         stdout = io.TextIOWrapper(proc.stdout, errors='replace')
476         for line in stdout:
477             signal.alarm(timeout)
478             processor.processLine(line)
479
480     signal.alarm(0)
481
482     if proc.returncode != 0:
483         if proc.returncode > 0:
484             msg = "{wvtool}: Program '{cmd}' returned non-zero exit code {ec}"
485         else:
486             msg = "{wvtool}: Program '{cmd}' terminated by signal {sig}"
487
488         text = msg.format(wvtool=sys.argv[0], cmd=cmd,
489                           ec=proc.returncode, sig=-proc.returncode)
490         processor.append(WvCheckLine(text, 'FAILED'))
491
492 def do_run(args, processor):
493     _run(args.command, processor, timeout=args.timeout)
494
495 def do_runall(args, processor):
496     for cmd in args.commands:
497         _run(cmd, processor)
498
499 def do_format(args, processor):
500     files = args.infiles
501     if len(files) == 0:
502         processor.setImplicitTestTitle(WvTestingLine("Preamble", "stdin"))
503         for line in io.TextIOWrapper(sys.stdin.buffer, errors='replace'):
504             processor.processLine(line)
505     else:
506         for fn in args.infiles:
507             processor.setImplicitTestTitle(WvTestingLine("Preamble", fn))
508             for line in open(fn, errors='replace'):
509                 processor.processLine(line)
510
511 def do_wrap(args, processor):
512     pass
513
514 parser = argparse.ArgumentParser(description='Versatile wvtest tool')
515
516
517 parser.set_defaults(verbosity=WvTestProcessor.Verbosity.NORMAL)
518 parser.add_argument('-v', '--verbose', dest='verbosity', action='store_const',
519                     const=WvTestProcessor.Verbosity.VERBOSE,
520                     help='Do not hide output of successful tests')
521 parser.add_argument('-s', '--summary', dest='verbosity', action='store_const',
522                     const=WvTestProcessor.Verbosity.SUMMARY,
523                     help='''Hide output of all tests. Print just one line for each "Testing"
524                     section and report "ok" or "FAILURE" of it.''')
525 parser.add_argument('-w', '--width', type=int,
526                     help='Override terminal width or COLUMNS environment wariable.')
527 parser.add_argument('--timeout', type=int, default=100, metavar='SEC',
528                     help='Timeout in seconds for any test output (default %(default)s)')
529 parser.add_argument('--junit-xml', type=argparse.FileType('w'), metavar='FILE',
530                     help='''Convert output to JUnit compatible XML file''')
531 parser.add_argument('--junit-prefix', metavar='STR',
532                     help='''Prefix to prepend to generated class names (useful when a test is
533                     run multiple times in different environments)''')
534 parser.add_argument('--logdir', metavar='DIR',
535                     help='''Store test logs in the given directory''')
536 parser.add_argument('--color', action='store_true', default=None,
537                     help='Force color output')
538 parser.add_argument('--no-color', action='store_false', dest='color',
539                     help='Disable color output')
540
541 parser.add_argument('--version', action='version', version='%(prog)s '+version)
542
543 subparsers = parser.add_subparsers(help='sub-command help')
544
545 parser_run = subparsers.add_parser('run', help='Run and supervise a command producing wvtest output')
546 parser_run.add_argument('command', nargs=argparse.REMAINDER, help='Command to run')
547 parser_run.set_defaults(func=do_run)
548
549 parser_runall = subparsers.add_parser('runall', help='Run multiple scripts/binaries mentioned on command line')
550 parser_runall.set_defaults(func=do_runall)
551 parser_runall.add_argument('commands', nargs='+', help='Scripts/binaries to run')
552
553 parser_format = subparsers.add_parser('format', help='Reformat/highlight/summarize WvTest protocol output')
554 parser_format.set_defaults(func=do_format)
555 parser_format.add_argument('infiles', nargs='*', help='Files with wvtest output')
556
557 # parser_wrap = subparsers.add_parser('wrap')
558 # parser_wrap.set_defaults(func=do_wrap)
559
560 args = parser.parse_args()
561 term = Term(args.width)
562 if args.color is None and not term.output or \
563    args.color is False:
564     term.clear_colors()
565
566 if not 'func' in args:
567     parser.print_help()
568     sys.exit(1)
569
570 processor = WvTestProcessor(
571     args.verbosity,
572     junit_xml = args.junit_xml,
573     junit_prefix = args.junit_prefix,
574     logdir=args.logdir)
575 args.func(args, processor)
576 processor.done()
577 sys.exit(0 if processor.is_success() else 1)
578
579 # Local Variables:
580 # End: