1 import os, sys, unittest, getopt, time
class ResourceDenied(Exception):
    """Test skipped because it requested a disallowed resource.

    This is raised when a test calls requires() for a resource that
    has not been enabled.  Resources are defined by test modules.
    """
def is_resource_enabled(resource):
    """Report whether *resource* may be used by the caller.

    A caller whose module is ``__main__`` is always answered with True.
    Otherwise the resource is enabled when ``use_resources`` is not None
    and contains either the resource itself or the wildcard ``"*"``.
    A resource found not to be enabled is recorded as a key of the
    module-level ``_unavail`` mapping.
    """
    # Frame 1 is the immediate caller of this function.
    caller_module = sys._getframe(1).f_globals.get("__name__")
    if caller_module == "__main__":
        return True

    if use_resources is None:
        enabled = False
    else:
        enabled = resource in use_resources or "*" in use_resources

    if not enabled:
        _unavail[resource] = None
    return enabled
def requires(resource, msg=None):
    """Raise ResourceDenied unless *resource* is available.

    When the caller's module is ``__main__`` the resource is treated as
    if it had been set, and the function returns without checking.
    *msg*, when given, replaces the default text of the exception.
    """
    # Frame 1 is the immediate caller; a module run as a script is always
    # allowed to use the resource.
    if sys._getframe(1).f_globals.get("__name__") == "__main__":
        return
    if is_resource_enabled(resource):
        return
    if msg is None:
        msg = "Use of the `%s' resource not enabled" % resource
    raise ResourceDenied(msg)
def find_package_modules(package, mask):
    """Yield the dotted names of modules in *package* whose file matches *mask*.

    *package* is an imported package object and *mask* is a
    case-sensitive fnmatch pattern such as ``"test_*.py"``.

    Two layouts are handled:

    * the package's ``__loader__`` exposes a ``_files`` mapping
      (presumably a zipimport-style loader -- confirm): its keys are
      matched against ``<package path>/<mask>`` and converted to dotted
      names by dropping the extension and replacing path separators;
    * otherwise the first entry of ``package.__path__`` is listed and
      every matching file name is yielded as ``<package>.<stem>``.
    """
    # fnmatch is not among the file-level imports, so bring it in locally.
    import fnmatch

    loader = getattr(package, "__loader__", None)
    if hasattr(loader, "_files"):
        prefix = package.__name__.replace(".", os.path.sep)
        pattern = os.path.join(prefix, mask)
        # Iterate the mapping directly rather than via .iterkeys(): same
        # keys, no dependence on a Python-2-only method.
        for fnm in loader._files:
            if fnmatch.fnmatchcase(fnm, pattern):
                yield os.path.splitext(fnm)[0].replace(os.path.sep, ".")
    else:
        directory = package.__path__[0]
        for fnm in os.listdir(directory):
            if fnmatch.fnmatchcase(fnm, mask):
                yield "%s.%s" % (package.__name__, os.path.splitext(fnm)[0])
# get_tests(package, mask, verbosity, exclude=()):
#   Walks the module names produced by find_package_modules(package, mask),
#   imports each one, and looks through its public attributes for
#   unittest.TestCase subclasses.  Module names that are excluded, or whose
#   import raises ResourceDenied, are appended to `skipped` and a
#   "Skipped ..." line is written to sys.stderr.
# NOTE(review): the leading integers look like line numbers left over from an
# earlier listing, and several lines of this function are not visible here
# (the initialisation of `skipped`, the `try:` line, the loop that binds
# `name`, whatever is done with a matching class, and the return) -- confirm
# against the complete file before changing any logic.
53 def get_tests(package, mask, verbosity, exclude=()):
54 """Return a list of skipped test modules, and a list of test cases."""
57 for modname in find_package_modules(package, mask):
# Exclusion is decided on the last dotted component of the module name.
58 if modname.split(".")[-1] in exclude:
59 skipped.append(modname)
61 print >> sys.stderr, "Skipped %s: excluded" % modname
# A non-empty fromlist makes __import__ return the named submodule itself
# rather than the top-level package.
64 mod = __import__(modname, globals(), locals(), ['*'])
65 except ResourceDenied, detail:
66 skipped.append(modname)
68 print >> sys.stderr, "Skipped %s: %s" % (modname, detail)
# Names starting with an underscore are tested for here (the action taken
# for them is on a line not visible in this extract).
71 if name.startswith("_"):
73 o = getattr(mod, name)
# Matches only objects whose type is exactly TestCase's own metaclass and
# that derive from unittest.TestCase.
74 if type(o) is type(unittest.TestCase) and issubclass(o, unittest.TestCase):
# test_with_refcounts(runner, verbosity, testcase):
#   Builds a suite from `testcase`, samples sys.gettotalrefcount() around
#   COUNT iterations, stores the per-iteration difference in `refcounts`,
#   and prints either a "leaks" line with the differences or an "ok." line.
# NOTE(review): the lines that bind `ctypes` and `COUNT`, the calls that
# actually run the suite, and the condition guarding the "ok." message are
# not visible in this extract -- confirm against the complete file.
82 def test_with_refcounts(runner, verbosity, testcase):
83 """Run testcase several times, tracking reference counts."""
# Snapshot three ctypes caches; fresh copies of these snapshots are assigned
# back to ctypes further down.
86 ptc = ctypes._pointer_type_cache.copy()
87 cfc = ctypes._c_functype_cache.copy()
88 wfc = ctypes._win_functype_cache.copy()
90 # when searching for refcount leaks, we have to manually reset any
91 # caches that ctypes has.
93 ctypes._pointer_type_cache = ptc.copy()
94 ctypes._c_functype_cache = cfc.copy()
95 ctypes._win_functype_cache = wfc.copy()
98 test = unittest.makeSuite(testcase)
# NOTE(review): sys.gettotalrefcount presumably exists only on debug builds
# (the option parser elsewhere in this file prints "-r flag requires Python
# debug build") -- confirm.
100 rc = sys.gettotalrefcount()
104 refcounts = [None] * COUNT
105 for i in range(COUNT):
106 rc = sys.gettotalrefcount()
# Difference in the interpreter-wide reference total since `rc` was sampled.
109 refcounts[i] = sys.gettotalrefcount() - rc
# filter(None, ...) keeps only the non-zero differences.
110 if filter(None, refcounts):
111 print "%s leaks:\n\t" % testcase, refcounts
113 print "%s: ok." % testcase
# TestRunner: a unittest.TextTestRunner whose run() takes an additional
# `skipped` argument and uses it, together with the module-level `_unavail`
# mapping, when writing the summary line to self.stream.
# NOTE(review): several statements of run() are not visible in this extract
# (the call that executes `test`, the `else:` branches, the value printed for
# the "%s module%s skipped" slot, and the return) -- confirm against the
# complete file before changing any logic.
115 class TestRunner(unittest.TextTestRunner):
116 def run(self, test, skipped):
117 "Run the given test case or test suite."
118 # Same as unittest.TextTestRunner.run, except that it reports
120 result = self._makeResult()
# Wall-clock timing; the elapsed time is printed in the "Ran ..." line.
121 startTime = time.time()
123 stopTime = time.time()
124 timeTaken = stopTime - startTime
126 self.stream.writeln(result.separator2)
127 run = result.testsRun
# The extended summary is chosen when `_unavail` is non-empty; the trailing
# "#skipped:" comment suggests the test was once on `skipped` instead.
128 if _unavail: #skipped:
129 requested = _unavail.keys()
131 self.stream.writeln("Ran %d test%s in %.3fs (%s module%s skipped)" %
132 (run, run != 1 and "s" or "", timeTaken,
134 len(skipped) != 1 and "s" or ""))
135 self.stream.writeln("Unavailable resources: %s" % ", ".join(requested))
137 self.stream.writeln("Ran %d test%s in %.3fs" %
138 (run, run != 1 and "s" or "", timeTaken))
139 self.stream.writeln()
# Final verdict: "FAILED (...)" with failure/error counts, or "OK".
140 if not result.wasSuccessful():
141 self.stream.write("FAILED (")
142 failed, errored = map(len, (result.failures, result.errors))
144 self.stream.write("failures=%d" % failed)
146 if failed: self.stream.write(", ")
147 self.stream.write("errors=%d" % errored)
148 self.stream.writeln(")")
150 self.stream.writeln("OK")
# Command-line driver: parses options from sys.argv and then calls
# run_tests() once for every entry in `packages`.
# NOTE(review): the `def` line that owns these statements, the defaults for
# `mask`, `verbosity`, `search_leaks` and `exclude`, and the per-flag
# branches are not visible in this extract -- confirm against the complete
# file.
# Option string: -r, -q and -v take no argument; -u and -x each take one.
156 opts, args = getopt.getopt(sys.argv[1:], "rqvu:x:")
163 for flag, value in opts:
171 except AttributeError:
172 print >> sys.stderr, "-r flag requires Python debug build"
# Both values are comma-separated lists; presumably -u feeds use_resources
# and -x feeds exclude (the flag tests themselves are not visible here).
176 use_resources.extend(value.split(","))
178 exclude.extend(value.split(","))
184 for package in packages:
185 run_tests(package, mask, verbosity, search_leaks, exclude)
def run_tests(package, mask, verbosity, search_leaks, exclude):
    """Run every test case found in *package* and report whether it went wrong.

    Test cases are collected with get_tests(package, mask, verbosity,
    exclude), combined into one suite and run with TestRunner, which also
    receives the list of skipped modules for its summary.  When
    *search_leaks* is true, each test case is additionally passed to
    test_with_refcounts() using a BasicTestRunner.

    Returns True when the main run recorded at least one error or failure,
    False otherwise.
    """
    skipped, testcases = get_tests(package, mask, verbosity, exclude)
    runner = TestRunner(verbosity=verbosity)

    suite = unittest.TestSuite([unittest.makeSuite(case) for case in testcases])
    result = runner.run(suite, skipped)

    if search_leaks:
        # hunt for refcount leaks
        leak_runner = BasicTestRunner()
        for case in testcases:
            test_with_refcounts(leak_runner, verbosity, case)

    # Assertion failures count as an unsuccessful run too; looking at
    # result.errors alone would report a run with failed tests as clean.
    return bool(result.errors or result.failures)
204 class BasicTestRunner:
206 result = unittest.TestResult()