]> rtime.felk.cvut.cz Git - l4.git/blob - l4/pkg/python/contrib/Lib/multiprocessing/util.py
Inital import
[l4.git] / l4 / pkg / python / contrib / Lib / multiprocessing / util.py
1 #
2 # Module providing various facilities to other parts of the package
3 #
4 # multiprocessing/util.py
5 #
6 # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
7 #
8
9 import itertools
10 import weakref
11 import atexit
12 import threading        # we want threading to install it's
13                         # cleanup function before multiprocessing does
14
15 from multiprocessing.process import current_process, active_children
16
17 __all__ = [
18     'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
19     'log_to_stderr', 'get_temp_dir', 'register_after_fork',
20     'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal',
21     'SUBDEBUG', 'SUBWARNING',
22     ]
23
24 #
25 # Logging
26 #
27
28 NOTSET = 0
29 SUBDEBUG = 5
30 DEBUG = 10
31 INFO = 20
32 SUBWARNING = 25
33
34 LOGGER_NAME = 'multiprocessing'
35 DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'
36
37 _logger = None
38 _log_to_stderr = False
39
40 def sub_debug(msg, *args):
41     if _logger:
42         _logger.log(SUBDEBUG, msg, *args)
43
44 def debug(msg, *args):
45     if _logger:
46         _logger.log(DEBUG, msg, *args)
47
48 def info(msg, *args):
49     if _logger:
50         _logger.log(INFO, msg, *args)
51
52 def sub_warning(msg, *args):
53     if _logger:
54         _logger.log(SUBWARNING, msg, *args)
55
56 def get_logger():
57     '''
58     Returns logger used by multiprocessing
59     '''
60     global _logger
61     import logging, atexit
62
63     logging._acquireLock()
64     try:
65         if not _logger:
66
67             _logger = logging.getLogger(LOGGER_NAME)
68             _logger.propagate = 0
69             logging.addLevelName(SUBDEBUG, 'SUBDEBUG')
70             logging.addLevelName(SUBWARNING, 'SUBWARNING')
71
72             # XXX multiprocessing should cleanup before logging
73             if hasattr(atexit, 'unregister'):
74                 atexit.unregister(_exit_function)
75                 atexit.register(_exit_function)
76             else:
77                 atexit._exithandlers.remove((_exit_function, (), {}))
78                 atexit._exithandlers.append((_exit_function, (), {}))
79
80     finally:
81         logging._releaseLock()
82
83     return _logger
84
85 def log_to_stderr(level=None):
86     '''
87     Turn on logging and add a handler which prints to stderr
88     '''
89     global _log_to_stderr
90     import logging
91
92     logger = get_logger()
93     formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
94     handler = logging.StreamHandler()
95     handler.setFormatter(formatter)
96     logger.addHandler(handler)
97
98     if level:
99         logger.setLevel(level)
100     _log_to_stderr = True
101     return _logger
102
103 #
104 # Function returning a temp directory which will be removed on exit
105 #
106
107 def get_temp_dir():
108     # get name of a temp directory which will be automatically cleaned up
109     if current_process()._tempdir is None:
110         import shutil, tempfile
111         tempdir = tempfile.mkdtemp(prefix='pymp-')
112         info('created temp directory %s', tempdir)
113         Finalize(None, shutil.rmtree, args=[tempdir], exitpriority=-100)
114         current_process()._tempdir = tempdir
115     return current_process()._tempdir
116
117 #
118 # Support for reinitialization of objects when bootstrapping a child process
119 #
120
121 _afterfork_registry = weakref.WeakValueDictionary()
122 _afterfork_counter = itertools.count()
123
124 def _run_after_forkers():
125     items = list(_afterfork_registry.items())
126     items.sort()
127     for (index, ident, func), obj in items:
128         try:
129             func(obj)
130         except Exception, e:
131             info('after forker raised exception %s', e)
132
133 def register_after_fork(obj, func):
134     _afterfork_registry[(_afterfork_counter.next(), id(obj), func)] = obj
135
136 #
137 # Finalization using weakrefs
138 #
139
140 _finalizer_registry = {}
141 _finalizer_counter = itertools.count()
142
143
144 class Finalize(object):
145     '''
146     Class which supports object finalization using weakrefs
147     '''
148     def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
149         assert exitpriority is None or type(exitpriority) is int
150
151         if obj is not None:
152             self._weakref = weakref.ref(obj, self)
153         else:
154             assert exitpriority is not None
155
156         self._callback = callback
157         self._args = args
158         self._kwargs = kwargs or {}
159         self._key = (exitpriority, _finalizer_counter.next())
160
161         _finalizer_registry[self._key] = self
162
163     def __call__(self, wr=None):
164         '''
165         Run the callback unless it has already been called or cancelled
166         '''
167         try:
168             del _finalizer_registry[self._key]
169         except KeyError:
170             sub_debug('finalizer no longer registered')
171         else:
172             sub_debug('finalizer calling %s with args %s and kwargs %s',
173                      self._callback, self._args, self._kwargs)
174             res = self._callback(*self._args, **self._kwargs)
175             self._weakref = self._callback = self._args = \
176                             self._kwargs = self._key = None
177             return res
178
179     def cancel(self):
180         '''
181         Cancel finalization of the object
182         '''
183         try:
184             del _finalizer_registry[self._key]
185         except KeyError:
186             pass
187         else:
188             self._weakref = self._callback = self._args = \
189                             self._kwargs = self._key = None
190
191     def still_active(self):
192         '''
193         Return whether this finalizer is still waiting to invoke callback
194         '''
195         return self._key in _finalizer_registry
196
197     def __repr__(self):
198         try:
199             obj = self._weakref()
200         except (AttributeError, TypeError):
201             obj = None
202
203         if obj is None:
204             return '<Finalize object, dead>'
205
206         x = '<Finalize object, callback=%s' % \
207             getattr(self._callback, '__name__', self._callback)
208         if self._args:
209             x += ', args=' + str(self._args)
210         if self._kwargs:
211             x += ', kwargs=' + str(self._kwargs)
212         if self._key[0] is not None:
213             x += ', exitprority=' + str(self._key[0])
214         return x + '>'
215
216
217 def _run_finalizers(minpriority=None):
218     '''
219     Run all finalizers whose exit priority is not None and at least minpriority
220
221     Finalizers with highest priority are called first; finalizers with
222     the same priority will be called in reverse order of creation.
223     '''
224     if minpriority is None:
225         f = lambda p : p[0][0] is not None
226     else:
227         f = lambda p : p[0][0] is not None and p[0][0] >= minpriority
228
229     items = [x for x in _finalizer_registry.items() if f(x)]
230     items.sort(reverse=True)
231
232     for key, finalizer in items:
233         sub_debug('calling %s', finalizer)
234         try:
235             finalizer()
236         except Exception:
237             import traceback
238             traceback.print_exc()
239
240     if minpriority is None:
241         _finalizer_registry.clear()
242
243 #
244 # Clean up on exit
245 #
246
247 def is_exiting():
248     '''
249     Returns true if the process is shutting down
250     '''
251     return _exiting or _exiting is None
252
253 _exiting = False
254
255 def _exit_function():
256     global _exiting
257
258     info('process shutting down')
259     debug('running all "atexit" finalizers with priority >= 0')
260     _run_finalizers(0)
261
262     for p in active_children():
263         if p._daemonic:
264             info('calling terminate() for daemon %s', p.name)
265             p._popen.terminate()
266
267     for p in active_children():
268         info('calling join() for process %s', p.name)
269         p.join()
270
271     debug('running the remaining "atexit" finalizers')
272     _run_finalizers()
273
274 atexit.register(_exit_function)
275
276 #
277 # Some fork aware types
278 #
279
280 class ForkAwareThreadLock(object):
281     def __init__(self):
282         self._lock = threading.Lock()
283         self.acquire = self._lock.acquire
284         self.release = self._lock.release
285         register_after_fork(self, ForkAwareThreadLock.__init__)
286
287 class ForkAwareLocal(threading.local):
288     def __init__(self):
289         register_after_fork(self, lambda obj : obj.__dict__.clear())
290     def __reduce__(self):
291         return type(self), ()