]> rtime.felk.cvut.cz Git - l4.git/blob - l4/pkg/python/contrib/Lib/ctypes/test/__init__.py
Inital import
[l4.git] / l4 / pkg / python / contrib / Lib / ctypes / test / __init__.py
1 import os, sys, unittest, getopt, time
2
3 use_resources = []
4
5 class ResourceDenied(Exception):
6     """Test skipped because it requested a disallowed resource.
7
8     This is raised when a test calls requires() for a resource that
9     has not be enabled.  Resources are defined by test modules.
10     """
11
12 def is_resource_enabled(resource):
13     """Test whether a resource is enabled.
14
15     If the caller's module is __main__ then automatically return True."""
16     if sys._getframe().f_back.f_globals.get("__name__") == "__main__":
17         return True
18     result = use_resources is not None and \
19            (resource in use_resources or "*" in use_resources)
20     if not result:
21         _unavail[resource] = None
22     return result
23
24 _unavail = {}
25 def requires(resource, msg=None):
26     """Raise ResourceDenied if the specified resource is not available.
27
28     If the caller's module is __main__ then automatically return True."""
29     # see if the caller's module is __main__ - if so, treat as if
30     # the resource was set
31     if sys._getframe().f_back.f_globals.get("__name__") == "__main__":
32         return
33     if not is_resource_enabled(resource):
34         if msg is None:
35             msg = "Use of the `%s' resource not enabled" % resource
36         raise ResourceDenied(msg)
37
38 def find_package_modules(package, mask):
39     import fnmatch
40     if (hasattr(package, "__loader__") and
41             hasattr(package.__loader__, '_files')):
42         path = package.__name__.replace(".", os.path.sep)
43         mask = os.path.join(path, mask)
44         for fnm in package.__loader__._files.iterkeys():
45             if fnmatch.fnmatchcase(fnm, mask):
46                 yield os.path.splitext(fnm)[0].replace(os.path.sep, ".")
47     else:
48         path = package.__path__[0]
49         for fnm in os.listdir(path):
50             if fnmatch.fnmatchcase(fnm, mask):
51                 yield "%s.%s" % (package.__name__, os.path.splitext(fnm)[0])
52
53 def get_tests(package, mask, verbosity, exclude=()):
54     """Return a list of skipped test modules, and a list of test cases."""
55     tests = []
56     skipped = []
57     for modname in find_package_modules(package, mask):
58         if modname.split(".")[-1] in exclude:
59             skipped.append(modname)
60             if verbosity > 1:
61                 print >> sys.stderr, "Skipped %s: excluded" % modname
62             continue
63         try:
64             mod = __import__(modname, globals(), locals(), ['*'])
65         except ResourceDenied, detail:
66             skipped.append(modname)
67             if verbosity > 1:
68                 print >> sys.stderr, "Skipped %s: %s" % (modname, detail)
69             continue
70         for name in dir(mod):
71             if name.startswith("_"):
72                 continue
73             o = getattr(mod, name)
74             if type(o) is type(unittest.TestCase) and issubclass(o, unittest.TestCase):
75                 tests.append(o)
76     return skipped, tests
77
78 def usage():
79     print __doc__
80     return 1
81
82 def test_with_refcounts(runner, verbosity, testcase):
83     """Run testcase several times, tracking reference counts."""
84     import gc
85     import ctypes
86     ptc = ctypes._pointer_type_cache.copy()
87     cfc = ctypes._c_functype_cache.copy()
88     wfc = ctypes._win_functype_cache.copy()
89
90     # when searching for refcount leaks, we have to manually reset any
91     # caches that ctypes has.
92     def cleanup():
93         ctypes._pointer_type_cache = ptc.copy()
94         ctypes._c_functype_cache = cfc.copy()
95         ctypes._win_functype_cache = wfc.copy()
96         gc.collect()
97
98     test = unittest.makeSuite(testcase)
99     for i in range(5):
100         rc = sys.gettotalrefcount()
101         runner.run(test)
102         cleanup()
103     COUNT = 5
104     refcounts = [None] * COUNT
105     for i in range(COUNT):
106         rc = sys.gettotalrefcount()
107         runner.run(test)
108         cleanup()
109         refcounts[i] = sys.gettotalrefcount() - rc
110     if filter(None, refcounts):
111         print "%s leaks:\n\t" % testcase, refcounts
112     elif verbosity:
113         print "%s: ok." % testcase
114
115 class TestRunner(unittest.TextTestRunner):
116     def run(self, test, skipped):
117         "Run the given test case or test suite."
118         # Same as unittest.TextTestRunner.run, except that it reports
119         # skipped tests.
120         result = self._makeResult()
121         startTime = time.time()
122         test(result)
123         stopTime = time.time()
124         timeTaken = stopTime - startTime
125         result.printErrors()
126         self.stream.writeln(result.separator2)
127         run = result.testsRun
128         if _unavail: #skipped:
129             requested = _unavail.keys()
130             requested.sort()
131             self.stream.writeln("Ran %d test%s in %.3fs (%s module%s skipped)" %
132                                 (run, run != 1 and "s" or "", timeTaken,
133                                  len(skipped),
134                                  len(skipped) != 1 and "s" or ""))
135             self.stream.writeln("Unavailable resources: %s" % ", ".join(requested))
136         else:
137             self.stream.writeln("Ran %d test%s in %.3fs" %
138                                 (run, run != 1 and "s" or "", timeTaken))
139         self.stream.writeln()
140         if not result.wasSuccessful():
141             self.stream.write("FAILED (")
142             failed, errored = map(len, (result.failures, result.errors))
143             if failed:
144                 self.stream.write("failures=%d" % failed)
145             if errored:
146                 if failed: self.stream.write(", ")
147                 self.stream.write("errors=%d" % errored)
148             self.stream.writeln(")")
149         else:
150             self.stream.writeln("OK")
151         return result
152
153
154 def main(*packages):
155     try:
156         opts, args = getopt.getopt(sys.argv[1:], "rqvu:x:")
157     except getopt.error:
158         return usage()
159
160     verbosity = 1
161     search_leaks = False
162     exclude = []
163     for flag, value in opts:
164         if flag == "-q":
165             verbosity -= 1
166         elif flag == "-v":
167             verbosity += 1
168         elif flag == "-r":
169             try:
170                 sys.gettotalrefcount
171             except AttributeError:
172                 print >> sys.stderr, "-r flag requires Python debug build"
173                 return -1
174             search_leaks = True
175         elif flag == "-u":
176             use_resources.extend(value.split(","))
177         elif flag == "-x":
178             exclude.extend(value.split(","))
179
180     mask = "test_*.py"
181     if args:
182         mask = args[0]
183
184     for package in packages:
185         run_tests(package, mask, verbosity, search_leaks, exclude)
186
187
188 def run_tests(package, mask, verbosity, search_leaks, exclude):
189     skipped, testcases = get_tests(package, mask, verbosity, exclude)
190     runner = TestRunner(verbosity=verbosity)
191
192     suites = [unittest.makeSuite(o) for o in testcases]
193     suite = unittest.TestSuite(suites)
194     result = runner.run(suite, skipped)
195
196     if search_leaks:
197         # hunt for refcount leaks
198         runner = BasicTestRunner()
199         for t in testcases:
200             test_with_refcounts(runner, verbosity, t)
201
202     return bool(result.errors)
203
204 class BasicTestRunner:
205     def run(self, test):
206         result = unittest.TestResult()
207         test(result)
208         return result