]> rtime.felk.cvut.cz Git - l4.git/blob - l4/pkg/python/contrib/Lib/test/test_weakref.py
Inital import
[l4.git] / l4 / pkg / python / contrib / Lib / test / test_weakref.py
1 import gc
2 import sys
3 import unittest
4 import UserList
5 import weakref
6 import operator
7
8 from test import test_support
9
10 # Used in ReferencesTestCase.test_ref_created_during_del() .
11 ref_from_del = None
12
13 class C:
14     def method(self):
15         pass
16
17
18 class Callable:
19     bar = None
20
21     def __call__(self, x):
22         self.bar = x
23
24
25 def create_function():
26     def f(): pass
27     return f
28
29 def create_bound_method():
30     return C().method
31
32 def create_unbound_method():
33     return C.method
34
35
36 class TestBase(unittest.TestCase):
37
38     def setUp(self):
39         self.cbcalled = 0
40
41     def callback(self, ref):
42         self.cbcalled += 1
43
44
45 class ReferencesTestCase(TestBase):
46
47     def test_basic_ref(self):
48         self.check_basic_ref(C)
49         self.check_basic_ref(create_function)
50         self.check_basic_ref(create_bound_method)
51         self.check_basic_ref(create_unbound_method)
52
53         # Just make sure the tp_repr handler doesn't raise an exception.
54         # Live reference:
55         o = C()
56         wr = weakref.ref(o)
57         `wr`
58         # Dead reference:
59         del o
60         `wr`
61
62     def test_basic_callback(self):
63         self.check_basic_callback(C)
64         self.check_basic_callback(create_function)
65         self.check_basic_callback(create_bound_method)
66         self.check_basic_callback(create_unbound_method)
67
68     def test_multiple_callbacks(self):
69         o = C()
70         ref1 = weakref.ref(o, self.callback)
71         ref2 = weakref.ref(o, self.callback)
72         del o
73         self.assert_(ref1() is None,
74                      "expected reference to be invalidated")
75         self.assert_(ref2() is None,
76                      "expected reference to be invalidated")
77         self.assert_(self.cbcalled == 2,
78                      "callback not called the right number of times")
79
80     def test_multiple_selfref_callbacks(self):
81         # Make sure all references are invalidated before callbacks are called
82         #
83         # What's important here is that we're using the first
84         # reference in the callback invoked on the second reference
85         # (the most recently created ref is cleaned up first).  This
86         # tests that all references to the object are invalidated
87         # before any of the callbacks are invoked, so that we only
88         # have one invocation of _weakref.c:cleanup_helper() active
89         # for a particular object at a time.
90         #
91         def callback(object, self=self):
92             self.ref()
93         c = C()
94         self.ref = weakref.ref(c, callback)
95         ref1 = weakref.ref(c, callback)
96         del c
97
98     def test_proxy_ref(self):
99         o = C()
100         o.bar = 1
101         ref1 = weakref.proxy(o, self.callback)
102         ref2 = weakref.proxy(o, self.callback)
103         del o
104
105         def check(proxy):
106             proxy.bar
107
108         self.assertRaises(weakref.ReferenceError, check, ref1)
109         self.assertRaises(weakref.ReferenceError, check, ref2)
110         self.assertRaises(weakref.ReferenceError, bool, weakref.proxy(C()))
111         self.assert_(self.cbcalled == 2)
112
113     def check_basic_ref(self, factory):
114         o = factory()
115         ref = weakref.ref(o)
116         self.assert_(ref() is not None,
117                      "weak reference to live object should be live")
118         o2 = ref()
119         self.assert_(o is o2,
120                      "<ref>() should return original object if live")
121
122     def check_basic_callback(self, factory):
123         self.cbcalled = 0
124         o = factory()
125         ref = weakref.ref(o, self.callback)
126         del o
127         self.assert_(self.cbcalled == 1,
128                      "callback did not properly set 'cbcalled'")
129         self.assert_(ref() is None,
130                      "ref2 should be dead after deleting object reference")
131
132     def test_ref_reuse(self):
133         o = C()
134         ref1 = weakref.ref(o)
135         # create a proxy to make sure that there's an intervening creation
136         # between these two; it should make no difference
137         proxy = weakref.proxy(o)
138         ref2 = weakref.ref(o)
139         self.assert_(ref1 is ref2,
140                      "reference object w/out callback should be re-used")
141
142         o = C()
143         proxy = weakref.proxy(o)
144         ref1 = weakref.ref(o)
145         ref2 = weakref.ref(o)
146         self.assert_(ref1 is ref2,
147                      "reference object w/out callback should be re-used")
148         self.assert_(weakref.getweakrefcount(o) == 2,
149                      "wrong weak ref count for object")
150         del proxy
151         self.assert_(weakref.getweakrefcount(o) == 1,
152                      "wrong weak ref count for object after deleting proxy")
153
154     def test_proxy_reuse(self):
155         o = C()
156         proxy1 = weakref.proxy(o)
157         ref = weakref.ref(o)
158         proxy2 = weakref.proxy(o)
159         self.assert_(proxy1 is proxy2,
160                      "proxy object w/out callback should have been re-used")
161
162     def test_basic_proxy(self):
163         o = C()
164         self.check_proxy(o, weakref.proxy(o))
165
166         L = UserList.UserList()
167         p = weakref.proxy(L)
168         self.failIf(p, "proxy for empty UserList should be false")
169         p.append(12)
170         self.assertEqual(len(L), 1)
171         self.failUnless(p, "proxy for non-empty UserList should be true")
172         p[:] = [2, 3]
173         self.assertEqual(len(L), 2)
174         self.assertEqual(len(p), 2)
175         self.failUnless(3 in p,
176                         "proxy didn't support __contains__() properly")
177         p[1] = 5
178         self.assertEqual(L[1], 5)
179         self.assertEqual(p[1], 5)
180         L2 = UserList.UserList(L)
181         p2 = weakref.proxy(L2)
182         self.assertEqual(p, p2)
183         ## self.assertEqual(repr(L2), repr(p2))
184         L3 = UserList.UserList(range(10))
185         p3 = weakref.proxy(L3)
186         self.assertEqual(L3[:], p3[:])
187         self.assertEqual(L3[5:], p3[5:])
188         self.assertEqual(L3[:5], p3[:5])
189         self.assertEqual(L3[2:5], p3[2:5])
190
191     def test_proxy_index(self):
192         class C:
193             def __index__(self):
194                 return 10
195         o = C()
196         p = weakref.proxy(o)
197         self.assertEqual(operator.index(p), 10)
198
199     def test_proxy_div(self):
200         class C:
201             def __floordiv__(self, other):
202                 return 42
203             def __ifloordiv__(self, other):
204                 return 21
205         o = C()
206         p = weakref.proxy(o)
207         self.assertEqual(p // 5, 42)
208         p //= 5
209         self.assertEqual(p, 21)
210
211     # The PyWeakref_* C API is documented as allowing either NULL or
212     # None as the value for the callback, where either means "no
213     # callback".  The "no callback" ref and proxy objects are supposed
214     # to be shared so long as they exist by all callers so long as
215     # they are active.  In Python 2.3.3 and earlier, this guarantee
216     # was not honored, and was broken in different ways for
217     # PyWeakref_NewRef() and PyWeakref_NewProxy().  (Two tests.)
218
219     def test_shared_ref_without_callback(self):
220         self.check_shared_without_callback(weakref.ref)
221
222     def test_shared_proxy_without_callback(self):
223         self.check_shared_without_callback(weakref.proxy)
224
225     def check_shared_without_callback(self, makeref):
226         o = Object(1)
227         p1 = makeref(o, None)
228         p2 = makeref(o, None)
229         self.assert_(p1 is p2, "both callbacks were None in the C API")
230         del p1, p2
231         p1 = makeref(o)
232         p2 = makeref(o, None)
233         self.assert_(p1 is p2, "callbacks were NULL, None in the C API")
234         del p1, p2
235         p1 = makeref(o)
236         p2 = makeref(o)
237         self.assert_(p1 is p2, "both callbacks were NULL in the C API")
238         del p1, p2
239         p1 = makeref(o, None)
240         p2 = makeref(o)
241         self.assert_(p1 is p2, "callbacks were None, NULL in the C API")
242
243     def test_callable_proxy(self):
244         o = Callable()
245         ref1 = weakref.proxy(o)
246
247         self.check_proxy(o, ref1)
248
249         self.assert_(type(ref1) is weakref.CallableProxyType,
250                      "proxy is not of callable type")
251         ref1('twinkies!')
252         self.assert_(o.bar == 'twinkies!',
253                      "call through proxy not passed through to original")
254         ref1(x='Splat.')
255         self.assert_(o.bar == 'Splat.',
256                      "call through proxy not passed through to original")
257
258         # expect due to too few args
259         self.assertRaises(TypeError, ref1)
260
261         # expect due to too many args
262         self.assertRaises(TypeError, ref1, 1, 2, 3)
263
264     def check_proxy(self, o, proxy):
265         o.foo = 1
266         self.assert_(proxy.foo == 1,
267                      "proxy does not reflect attribute addition")
268         o.foo = 2
269         self.assert_(proxy.foo == 2,
270                      "proxy does not reflect attribute modification")
271         del o.foo
272         self.assert_(not hasattr(proxy, 'foo'),
273                      "proxy does not reflect attribute removal")
274
275         proxy.foo = 1
276         self.assert_(o.foo == 1,
277                      "object does not reflect attribute addition via proxy")
278         proxy.foo = 2
279         self.assert_(
280             o.foo == 2,
281             "object does not reflect attribute modification via proxy")
282         del proxy.foo
283         self.assert_(not hasattr(o, 'foo'),
284                      "object does not reflect attribute removal via proxy")
285
286     def test_proxy_deletion(self):
287         # Test clearing of SF bug #762891
288         class Foo:
289             result = None
290             def __delitem__(self, accessor):
291                 self.result = accessor
292         g = Foo()
293         f = weakref.proxy(g)
294         del f[0]
295         self.assertEqual(f.result, 0)
296
297     def test_proxy_bool(self):
298         # Test clearing of SF bug #1170766
299         class List(list): pass
300         lyst = List()
301         self.assertEqual(bool(weakref.proxy(lyst)), bool(lyst))
302
303     def test_getweakrefcount(self):
304         o = C()
305         ref1 = weakref.ref(o)
306         ref2 = weakref.ref(o, self.callback)
307         self.assert_(weakref.getweakrefcount(o) == 2,
308                      "got wrong number of weak reference objects")
309
310         proxy1 = weakref.proxy(o)
311         proxy2 = weakref.proxy(o, self.callback)
312         self.assert_(weakref.getweakrefcount(o) == 4,
313                      "got wrong number of weak reference objects")
314
315         del ref1, ref2, proxy1, proxy2
316         self.assert_(weakref.getweakrefcount(o) == 0,
317                      "weak reference objects not unlinked from"
318                      " referent when discarded.")
319
320         # assumes ints do not support weakrefs
321         self.assert_(weakref.getweakrefcount(1) == 0,
322                      "got wrong number of weak reference objects for int")
323
324     def test_getweakrefs(self):
325         o = C()
326         ref1 = weakref.ref(o, self.callback)
327         ref2 = weakref.ref(o, self.callback)
328         del ref1
329         self.assert_(weakref.getweakrefs(o) == [ref2],
330                      "list of refs does not match")
331
332         o = C()
333         ref1 = weakref.ref(o, self.callback)
334         ref2 = weakref.ref(o, self.callback)
335         del ref2
336         self.assert_(weakref.getweakrefs(o) == [ref1],
337                      "list of refs does not match")
338
339         del ref1
340         self.assert_(weakref.getweakrefs(o) == [],
341                      "list of refs not cleared")
342
343         # assumes ints do not support weakrefs
344         self.assert_(weakref.getweakrefs(1) == [],
345                      "list of refs does not match for int")
346
347     def test_newstyle_number_ops(self):
348         class F(float):
349             pass
350         f = F(2.0)
351         p = weakref.proxy(f)
352         self.assert_(p + 1.0 == 3.0)
353         self.assert_(1.0 + p == 3.0)  # this used to SEGV
354
355     def test_callbacks_protected(self):
356         # Callbacks protected from already-set exceptions?
357         # Regression test for SF bug #478534.
358         class BogusError(Exception):
359             pass
360         data = {}
361         def remove(k):
362             del data[k]
363         def encapsulate():
364             f = lambda : ()
365             data[weakref.ref(f, remove)] = None
366             raise BogusError
367         try:
368             encapsulate()
369         except BogusError:
370             pass
371         else:
372             self.fail("exception not properly restored")
373         try:
374             encapsulate()
375         except BogusError:
376             pass
377         else:
378             self.fail("exception not properly restored")
379
380     def test_sf_bug_840829(self):
381         # "weakref callbacks and gc corrupt memory"
382         # subtype_dealloc erroneously exposed a new-style instance
383         # already in the process of getting deallocated to gc,
384         # causing double-deallocation if the instance had a weakref
385         # callback that triggered gc.
386         # If the bug exists, there probably won't be an obvious symptom
387         # in a release build.  In a debug build, a segfault will occur
388         # when the second attempt to remove the instance from the "list
389         # of all objects" occurs.
390
391         import gc
392
393         class C(object):
394             pass
395
396         c = C()
397         wr = weakref.ref(c, lambda ignore: gc.collect())
398         del c
399
400         # There endeth the first part.  It gets worse.
401         del wr
402
403         c1 = C()
404         c1.i = C()
405         wr = weakref.ref(c1.i, lambda ignore: gc.collect())
406
407         c2 = C()
408         c2.c1 = c1
409         del c1  # still alive because c2 points to it
410
411         # Now when subtype_dealloc gets called on c2, it's not enough just
412         # that c2 is immune from gc while the weakref callbacks associated
413         # with c2 execute (there are none in this 2nd half of the test, btw).
414         # subtype_dealloc goes on to call the base classes' deallocs too,
415         # so any gc triggered by weakref callbacks associated with anything
416         # torn down by a base class dealloc can also trigger double
417         # deallocation of c2.
418         del c2
419
420     def test_callback_in_cycle_1(self):
421         import gc
422
423         class J(object):
424             pass
425
426         class II(object):
427             def acallback(self, ignore):
428                 self.J
429
430         I = II()
431         I.J = J
432         I.wr = weakref.ref(J, I.acallback)
433
434         # Now J and II are each in a self-cycle (as all new-style class
435         # objects are, since their __mro__ points back to them).  I holds
436         # both a weak reference (I.wr) and a strong reference (I.J) to class
437         # J.  I is also in a cycle (I.wr points to a weakref that references
438         # I.acallback).  When we del these three, they all become trash, but
439         # the cycles prevent any of them from getting cleaned up immediately.
440         # Instead they have to wait for cyclic gc to deduce that they're
441         # trash.
442         #
443         # gc used to call tp_clear on all of them, and the order in which
444         # it does that is pretty accidental.  The exact order in which we
445         # built up these things manages to provoke gc into running tp_clear
446         # in just the right order (I last).  Calling tp_clear on II leaves
447         # behind an insane class object (its __mro__ becomes NULL).  Calling
448         # tp_clear on J breaks its self-cycle, but J doesn't get deleted
449         # just then because of the strong reference from I.J.  Calling
450         # tp_clear on I starts to clear I's __dict__, and just happens to
451         # clear I.J first -- I.wr is still intact.  That removes the last
452         # reference to J, which triggers the weakref callback.  The callback
453         # tries to do "self.J", and instances of new-style classes look up
454         # attributes ("J") in the class dict first.  The class (II) wants to
455         # search II.__mro__, but that's NULL.   The result was a segfault in
456         # a release build, and an assert failure in a debug build.
457         del I, J, II
458         gc.collect()
459
460     def test_callback_in_cycle_2(self):
461         import gc
462
463         # This is just like test_callback_in_cycle_1, except that II is an
464         # old-style class.  The symptom is different then:  an instance of an
465         # old-style class looks in its own __dict__ first.  'J' happens to
466         # get cleared from I.__dict__ before 'wr', and 'J' was never in II's
467         # __dict__, so the attribute isn't found.  The difference is that
468         # the old-style II doesn't have a NULL __mro__ (it doesn't have any
469         # __mro__), so no segfault occurs.  Instead it got:
470         #    test_callback_in_cycle_2 (__main__.ReferencesTestCase) ...
471         #    Exception exceptions.AttributeError:
472         #   "II instance has no attribute 'J'" in <bound method II.acallback
473         #       of <?.II instance at 0x00B9B4B8>> ignored
474
475         class J(object):
476             pass
477
478         class II:
479             def acallback(self, ignore):
480                 self.J
481
482         I = II()
483         I.J = J
484         I.wr = weakref.ref(J, I.acallback)
485
486         del I, J, II
487         gc.collect()
488
489     def test_callback_in_cycle_3(self):
490         import gc
491
492         # This one broke the first patch that fixed the last two.  In this
493         # case, the objects reachable from the callback aren't also reachable
494         # from the object (c1) *triggering* the callback:  you can get to
495         # c1 from c2, but not vice-versa.  The result was that c2's __dict__
496         # got tp_clear'ed by the time the c2.cb callback got invoked.
497
498         class C:
499             def cb(self, ignore):
500                 self.me
501                 self.c1
502                 self.wr
503
504         c1, c2 = C(), C()
505
506         c2.me = c2
507         c2.c1 = c1
508         c2.wr = weakref.ref(c1, c2.cb)
509
510         del c1, c2
511         gc.collect()
512
513     def test_callback_in_cycle_4(self):
514         import gc
515
516         # Like test_callback_in_cycle_3, except c2 and c1 have different
517         # classes.  c2's class (C) isn't reachable from c1 then, so protecting
518         # objects reachable from the dying object (c1) isn't enough to stop
519         # c2's class (C) from getting tp_clear'ed before c2.cb is invoked.
520         # The result was a segfault (C.__mro__ was NULL when the callback
521         # tried to look up self.me).
522
523         class C(object):
524             def cb(self, ignore):
525                 self.me
526                 self.c1
527                 self.wr
528
529         class D:
530             pass
531
532         c1, c2 = D(), C()
533
534         c2.me = c2
535         c2.c1 = c1
536         c2.wr = weakref.ref(c1, c2.cb)
537
538         del c1, c2, C, D
539         gc.collect()
540
541     def test_callback_in_cycle_resurrection(self):
542         import gc
543
544         # Do something nasty in a weakref callback:  resurrect objects
545         # from dead cycles.  For this to be attempted, the weakref and
546         # its callback must also be part of the cyclic trash (else the
547         # objects reachable via the callback couldn't be in cyclic trash
548         # to begin with -- the callback would act like an external root).
549         # But gc clears trash weakrefs with callbacks early now, which
550         # disables the callbacks, so the callbacks shouldn't get called
551         # at all (and so nothing actually gets resurrected).
552
553         alist = []
554         class C(object):
555             def __init__(self, value):
556                 self.attribute = value
557
558             def acallback(self, ignore):
559                 alist.append(self.c)
560
561         c1, c2 = C(1), C(2)
562         c1.c = c2
563         c2.c = c1
564         c1.wr = weakref.ref(c2, c1.acallback)
565         c2.wr = weakref.ref(c1, c2.acallback)
566
567         def C_went_away(ignore):
568             alist.append("C went away")
569         wr = weakref.ref(C, C_went_away)
570
571         del c1, c2, C   # make them all trash
572         self.assertEqual(alist, [])  # del isn't enough to reclaim anything
573
574         gc.collect()
575         # c1.wr and c2.wr were part of the cyclic trash, so should have
576         # been cleared without their callbacks executing.  OTOH, the weakref
577         # to C is bound to a function local (wr), and wasn't trash, so that
578         # callback should have been invoked when C went away.
579         self.assertEqual(alist, ["C went away"])
580         # The remaining weakref should be dead now (its callback ran).
581         self.assertEqual(wr(), None)
582
583         del alist[:]
584         gc.collect()
585         self.assertEqual(alist, [])
586
587     def test_callbacks_on_callback(self):
588         import gc
589
590         # Set up weakref callbacks *on* weakref callbacks.
591         alist = []
592         def safe_callback(ignore):
593             alist.append("safe_callback called")
594
595         class C(object):
596             def cb(self, ignore):
597                 alist.append("cb called")
598
599         c, d = C(), C()
600         c.other = d
601         d.other = c
602         callback = c.cb
603         c.wr = weakref.ref(d, callback)     # this won't trigger
604         d.wr = weakref.ref(callback, d.cb)  # ditto
605         external_wr = weakref.ref(callback, safe_callback)  # but this will
606         self.assert_(external_wr() is callback)
607
608         # The weakrefs attached to c and d should get cleared, so that
609         # C.cb is never called.  But external_wr isn't part of the cyclic
610         # trash, and no cyclic trash is reachable from it, so safe_callback
611         # should get invoked when the bound method object callback (c.cb)
612         # -- which is itself a callback, and also part of the cyclic trash --
613         # gets reclaimed at the end of gc.
614
615         del callback, c, d, C
616         self.assertEqual(alist, [])  # del isn't enough to clean up cycles
617         gc.collect()
618         self.assertEqual(alist, ["safe_callback called"])
619         self.assertEqual(external_wr(), None)
620
621         del alist[:]
622         gc.collect()
623         self.assertEqual(alist, [])
624
625     def test_gc_during_ref_creation(self):
626         self.check_gc_during_creation(weakref.ref)
627
628     def test_gc_during_proxy_creation(self):
629         self.check_gc_during_creation(weakref.proxy)
630
631     def check_gc_during_creation(self, makeref):
632         thresholds = gc.get_threshold()
633         gc.set_threshold(1, 1, 1)
634         gc.collect()
635         class A:
636             pass
637
638         def callback(*args):
639             pass
640
641         referenced = A()
642
643         a = A()
644         a.a = a
645         a.wr = makeref(referenced)
646
647         try:
648             # now make sure the object and the ref get labeled as
649             # cyclic trash:
650             a = A()
651             weakref.ref(referenced, callback)
652
653         finally:
654             gc.set_threshold(*thresholds)
655
656     def test_ref_created_during_del(self):
657         # Bug #1377858
658         # A weakref created in an object's __del__() would crash the
659         # interpreter when the weakref was cleaned up since it would refer to
660         # non-existent memory.  This test should not segfault the interpreter.
661         class Target(object):
662             def __del__(self):
663                 global ref_from_del
664                 ref_from_del = weakref.ref(self)
665
666         w = Target()
667
668     def test_init(self):
669         # Issue 3634
670         # <weakref to class>.__init__() doesn't check errors correctly
671         r = weakref.ref(Exception)
672         self.assertRaises(TypeError, r.__init__, 0, 0, 0, 0, 0)
673         # No exception should be raised here
674         gc.collect()
675
676
677 class SubclassableWeakrefTestCase(TestBase):
678
679     def test_subclass_refs(self):
680         class MyRef(weakref.ref):
681             def __init__(self, ob, callback=None, value=42):
682                 self.value = value
683                 super(MyRef, self).__init__(ob, callback)
684             def __call__(self):
685                 self.called = True
686                 return super(MyRef, self).__call__()
687         o = Object("foo")
688         mr = MyRef(o, value=24)
689         self.assert_(mr() is o)
690         self.assert_(mr.called)
691         self.assertEqual(mr.value, 24)
692         del o
693         self.assert_(mr() is None)
694         self.assert_(mr.called)
695
696     def test_subclass_refs_dont_replace_standard_refs(self):
697         class MyRef(weakref.ref):
698             pass
699         o = Object(42)
700         r1 = MyRef(o)
701         r2 = weakref.ref(o)
702         self.assert_(r1 is not r2)
703         self.assertEqual(weakref.getweakrefs(o), [r2, r1])
704         self.assertEqual(weakref.getweakrefcount(o), 2)
705         r3 = MyRef(o)
706         self.assertEqual(weakref.getweakrefcount(o), 3)
707         refs = weakref.getweakrefs(o)
708         self.assertEqual(len(refs), 3)
709         self.assert_(r2 is refs[0])
710         self.assert_(r1 in refs[1:])
711         self.assert_(r3 in refs[1:])
712
713     def test_subclass_refs_dont_conflate_callbacks(self):
714         class MyRef(weakref.ref):
715             pass
716         o = Object(42)
717         r1 = MyRef(o, id)
718         r2 = MyRef(o, str)
719         self.assert_(r1 is not r2)
720         refs = weakref.getweakrefs(o)
721         self.assert_(r1 in refs)
722         self.assert_(r2 in refs)
723
724     def test_subclass_refs_with_slots(self):
725         class MyRef(weakref.ref):
726             __slots__ = "slot1", "slot2"
727             def __new__(type, ob, callback, slot1, slot2):
728                 return weakref.ref.__new__(type, ob, callback)
729             def __init__(self, ob, callback, slot1, slot2):
730                 self.slot1 = slot1
731                 self.slot2 = slot2
732             def meth(self):
733                 return self.slot1 + self.slot2
734         o = Object(42)
735         r = MyRef(o, None, "abc", "def")
736         self.assertEqual(r.slot1, "abc")
737         self.assertEqual(r.slot2, "def")
738         self.assertEqual(r.meth(), "abcdef")
739         self.failIf(hasattr(r, "__dict__"))
740
741     def test_subclass_refs_with_cycle(self):
742         # Bug #3110
743         # An instance of a weakref subclass can have attributes.
744         # If such a weakref holds the only strong reference to the object,
745         # deleting the weakref will delete the object. In this case,
746         # the callback must not be called, because the ref object is
747         # being deleted.
748         class MyRef(weakref.ref):
749             pass
750
751         # Use a local callback, for "regrtest -R::"
752         # to detect refcounting problems
753         def callback(w):
754             self.cbcalled += 1
755
756         o = C()
757         r1 = MyRef(o, callback)
758         r1.o = o
759         del o
760
761         del r1 # Used to crash here
762
763         self.assertEqual(self.cbcalled, 0)
764
765         # Same test, with two weakrefs to the same object
766         # (since code paths are different)
767         o = C()
768         r1 = MyRef(o, callback)
769         r2 = MyRef(o, callback)
770         r1.r = r2
771         r2.o = o
772         del o
773         del r2
774
775         del r1 # Used to crash here
776
777         self.assertEqual(self.cbcalled, 0)
778
779
780 class Object:
781     def __init__(self, arg):
782         self.arg = arg
783     def __repr__(self):
784         return "<Object %r>" % self.arg
785
786
787 class MappingTestCase(TestBase):
788
789     COUNT = 10
790
791     def test_weak_values(self):
792         #
793         #  This exercises d.copy(), d.items(), d[], del d[], len(d).
794         #
795         dict, objects = self.make_weak_valued_dict()
796         for o in objects:
797             self.assert_(weakref.getweakrefcount(o) == 1,
798                          "wrong number of weak references to %r!" % o)
799             self.assert_(o is dict[o.arg],
800                          "wrong object returned by weak dict!")
801         items1 = dict.items()
802         items2 = dict.copy().items()
803         items1.sort()
804         items2.sort()
805         self.assert_(items1 == items2,
806                      "cloning of weak-valued dictionary did not work!")
807         del items1, items2
808         self.assert_(len(dict) == self.COUNT)
809         del objects[0]
810         self.assert_(len(dict) == (self.COUNT - 1),
811                      "deleting object did not cause dictionary update")
812         del objects, o
813         self.assert_(len(dict) == 0,
814                      "deleting the values did not clear the dictionary")
815         # regression on SF bug #447152:
816         dict = weakref.WeakValueDictionary()
817         self.assertRaises(KeyError, dict.__getitem__, 1)
818         dict[2] = C()
819         self.assertRaises(KeyError, dict.__getitem__, 2)
820
821     def test_weak_keys(self):
822         #
823         #  This exercises d.copy(), d.items(), d[] = v, d[], del d[],
824         #  len(d), d.has_key().
825         #
826         dict, objects = self.make_weak_keyed_dict()
827         for o in objects:
828             self.assert_(weakref.getweakrefcount(o) == 1,
829                          "wrong number of weak references to %r!" % o)
830             self.assert_(o.arg is dict[o],
831                          "wrong object returned by weak dict!")
832         items1 = dict.items()
833         items2 = dict.copy().items()
834         self.assert_(set(items1) == set(items2),
835                      "cloning of weak-keyed dictionary did not work!")
836         del items1, items2
837         self.assert_(len(dict) == self.COUNT)
838         del objects[0]
839         self.assert_(len(dict) == (self.COUNT - 1),
840                      "deleting object did not cause dictionary update")
841         del objects, o
842         self.assert_(len(dict) == 0,
843                      "deleting the keys did not clear the dictionary")
844         o = Object(42)
845         dict[o] = "What is the meaning of the universe?"
846         self.assert_(dict.has_key(o))
847         self.assert_(not dict.has_key(34))
848
849     def test_weak_keyed_iters(self):
850         dict, objects = self.make_weak_keyed_dict()
851         self.check_iters(dict)
852
853         # Test keyrefs()
854         refs = dict.keyrefs()
855         self.assertEqual(len(refs), len(objects))
856         objects2 = list(objects)
857         for wr in refs:
858             ob = wr()
859             self.assert_(dict.has_key(ob))
860             self.assert_(ob in dict)
861             self.assertEqual(ob.arg, dict[ob])
862             objects2.remove(ob)
863         self.assertEqual(len(objects2), 0)
864
865         # Test iterkeyrefs()
866         objects2 = list(objects)
867         self.assertEqual(len(list(dict.iterkeyrefs())), len(objects))
868         for wr in dict.iterkeyrefs():
869             ob = wr()
870             self.assert_(dict.has_key(ob))
871             self.assert_(ob in dict)
872             self.assertEqual(ob.arg, dict[ob])
873             objects2.remove(ob)
874         self.assertEqual(len(objects2), 0)
875
876     def test_weak_valued_iters(self):
877         dict, objects = self.make_weak_valued_dict()
878         self.check_iters(dict)
879
880         # Test valuerefs()
881         refs = dict.valuerefs()
882         self.assertEqual(len(refs), len(objects))
883         objects2 = list(objects)
884         for wr in refs:
885             ob = wr()
886             self.assertEqual(ob, dict[ob.arg])
887             self.assertEqual(ob.arg, dict[ob.arg].arg)
888             objects2.remove(ob)
889         self.assertEqual(len(objects2), 0)
890
891         # Test itervaluerefs()
892         objects2 = list(objects)
893         self.assertEqual(len(list(dict.itervaluerefs())), len(objects))
894         for wr in dict.itervaluerefs():
895             ob = wr()
896             self.assertEqual(ob, dict[ob.arg])
897             self.assertEqual(ob.arg, dict[ob.arg].arg)
898             objects2.remove(ob)
899         self.assertEqual(len(objects2), 0)
900
901     def check_iters(self, dict):
902         # item iterator:
903         items = dict.items()
904         for item in dict.iteritems():
905             items.remove(item)
906         self.assert_(len(items) == 0, "iteritems() did not touch all items")
907
908         # key iterator, via __iter__():
909         keys = dict.keys()
910         for k in dict:
911             keys.remove(k)
912         self.assert_(len(keys) == 0, "__iter__() did not touch all keys")
913
914         # key iterator, via iterkeys():
915         keys = dict.keys()
916         for k in dict.iterkeys():
917             keys.remove(k)
918         self.assert_(len(keys) == 0, "iterkeys() did not touch all keys")
919
920         # value iterator:
921         values = dict.values()
922         for v in dict.itervalues():
923             values.remove(v)
924         self.assert_(len(values) == 0,
925                      "itervalues() did not touch all values")
926
927     def test_make_weak_keyed_dict_from_dict(self):
928         o = Object(3)
929         dict = weakref.WeakKeyDictionary({o:364})
930         self.assert_(dict[o] == 364)
931
932     def test_make_weak_keyed_dict_from_weak_keyed_dict(self):
933         o = Object(3)
934         dict = weakref.WeakKeyDictionary({o:364})
935         dict2 = weakref.WeakKeyDictionary(dict)
936         self.assert_(dict[o] == 364)
937
938     def make_weak_keyed_dict(self):
939         dict = weakref.WeakKeyDictionary()
940         objects = map(Object, range(self.COUNT))
941         for o in objects:
942             dict[o] = o.arg
943         return dict, objects
944
945     def make_weak_valued_dict(self):
946         dict = weakref.WeakValueDictionary()
947         objects = map(Object, range(self.COUNT))
948         for o in objects:
949             dict[o.arg] = o
950         return dict, objects
951
952     def check_popitem(self, klass, key1, value1, key2, value2):
953         weakdict = klass()
954         weakdict[key1] = value1
955         weakdict[key2] = value2
956         self.assert_(len(weakdict) == 2)
957         k, v = weakdict.popitem()
958         self.assert_(len(weakdict) == 1)
959         if k is key1:
960             self.assert_(v is value1)
961         else:
962             self.assert_(v is value2)
963         k, v = weakdict.popitem()
964         self.assert_(len(weakdict) == 0)
965         if k is key1:
966             self.assert_(v is value1)
967         else:
968             self.assert_(v is value2)
969
970     def test_weak_valued_dict_popitem(self):
971         self.check_popitem(weakref.WeakValueDictionary,
972                            "key1", C(), "key2", C())
973
974     def test_weak_keyed_dict_popitem(self):
975         self.check_popitem(weakref.WeakKeyDictionary,
976                            C(), "value 1", C(), "value 2")
977
978     def check_setdefault(self, klass, key, value1, value2):
979         self.assert_(value1 is not value2,
980                      "invalid test"
981                      " -- value parameters must be distinct objects")
982         weakdict = klass()
983         o = weakdict.setdefault(key, value1)
984         self.assert_(o is value1)
985         self.assert_(weakdict.has_key(key))
986         self.assert_(weakdict.get(key) is value1)
987         self.assert_(weakdict[key] is value1)
988
989         o = weakdict.setdefault(key, value2)
990         self.assert_(o is value1)
991         self.assert_(weakdict.has_key(key))
992         self.assert_(weakdict.get(key) is value1)
993         self.assert_(weakdict[key] is value1)
994
995     def test_weak_valued_dict_setdefault(self):
996         self.check_setdefault(weakref.WeakValueDictionary,
997                               "key", C(), C())
998
999     def test_weak_keyed_dict_setdefault(self):
1000         self.check_setdefault(weakref.WeakKeyDictionary,
1001                               C(), "value 1", "value 2")
1002
1003     def check_update(self, klass, dict):
1004         #
1005         #  This exercises d.update(), len(d), d.keys(), d.has_key(),
1006         #  d.get(), d[].
1007         #
1008         weakdict = klass()
1009         weakdict.update(dict)
1010         self.assert_(len(weakdict) == len(dict))
1011         for k in weakdict.keys():
1012             self.assert_(dict.has_key(k),
1013                          "mysterious new key appeared in weak dict")
1014             v = dict.get(k)
1015             self.assert_(v is weakdict[k])
1016             self.assert_(v is weakdict.get(k))
1017         for k in dict.keys():
1018             self.assert_(weakdict.has_key(k),
1019                          "original key disappeared in weak dict")
1020             v = dict[k]
1021             self.assert_(v is weakdict[k])
1022             self.assert_(v is weakdict.get(k))
1023
1024     def test_weak_valued_dict_update(self):
1025         self.check_update(weakref.WeakValueDictionary,
1026                           {1: C(), 'a': C(), C(): C()})
1027
1028     def test_weak_keyed_dict_update(self):
1029         self.check_update(weakref.WeakKeyDictionary,
1030                           {C(): 1, C(): 2, C(): 3})
1031
1032     def test_weak_keyed_delitem(self):
1033         d = weakref.WeakKeyDictionary()
1034         o1 = Object('1')
1035         o2 = Object('2')
1036         d[o1] = 'something'
1037         d[o2] = 'something'
1038         self.assert_(len(d) == 2)
1039         del d[o1]
1040         self.assert_(len(d) == 1)
1041         self.assert_(d.keys() == [o2])
1042
1043     def test_weak_valued_delitem(self):
1044         d = weakref.WeakValueDictionary()
1045         o1 = Object('1')
1046         o2 = Object('2')
1047         d['something'] = o1
1048         d['something else'] = o2
1049         self.assert_(len(d) == 2)
1050         del d['something']
1051         self.assert_(len(d) == 1)
1052         self.assert_(d.items() == [('something else', o2)])
1053
1054     def test_weak_keyed_bad_delitem(self):
1055         d = weakref.WeakKeyDictionary()
1056         o = Object('1')
1057         # An attempt to delete an object that isn't there should raise
1058         # KeyError.  It didn't before 2.3.
1059         self.assertRaises(KeyError, d.__delitem__, o)
1060         self.assertRaises(KeyError, d.__getitem__, o)
1061
1062         # If a key isn't of a weakly referencable type, __getitem__ and
1063         # __setitem__ raise TypeError.  __delitem__ should too.
1064         self.assertRaises(TypeError, d.__delitem__,  13)
1065         self.assertRaises(TypeError, d.__getitem__,  13)
1066         self.assertRaises(TypeError, d.__setitem__,  13, 13)
1067
1068     def test_weak_keyed_cascading_deletes(self):
1069         # SF bug 742860.  For some reason, before 2.3 __delitem__ iterated
1070         # over the keys via self.data.iterkeys().  If things vanished from
1071         # the dict during this (or got added), that caused a RuntimeError.
1072
1073         d = weakref.WeakKeyDictionary()
1074         mutate = False
1075
1076         class C(object):
1077             def __init__(self, i):
1078                 self.value = i
1079             def __hash__(self):
1080                 return hash(self.value)
1081             def __eq__(self, other):
1082                 if mutate:
1083                     # Side effect that mutates the dict, by removing the
1084                     # last strong reference to a key.
1085                     del objs[-1]
1086                 return self.value == other.value
1087
1088         objs = [C(i) for i in range(4)]
1089         for o in objs:
1090             d[o] = o.value
1091         del o   # now the only strong references to keys are in objs
1092         # Find the order in which iterkeys sees the keys.
1093         objs = d.keys()
1094         # Reverse it, so that the iteration implementation of __delitem__
1095         # has to keep looping to find the first object we delete.
1096         objs.reverse()
1097
1098         # Turn on mutation in C.__eq__.  The first time thru the loop,
1099         # under the iterkeys() business the first comparison will delete
1100         # the last item iterkeys() would see, and that causes a
1101         #     RuntimeError: dictionary changed size during iteration
1102         # when the iterkeys() loop goes around to try comparing the next
1103         # key.  After this was fixed, it just deletes the last object *our*
1104         # "for o in obj" loop would have gotten to.
1105         mutate = True
1106         count = 0
1107         for o in objs:
1108             count += 1
1109             del d[o]
1110         self.assertEqual(len(d), 0)
1111         self.assertEqual(count, 2)
1112
1113 from test import mapping_tests
1114
1115 class WeakValueDictionaryTestCase(mapping_tests.BasicTestMappingProtocol):
1116     """Check that WeakValueDictionary conforms to the mapping protocol"""
1117     __ref = {"key1":Object(1), "key2":Object(2), "key3":Object(3)}
1118     type2test = weakref.WeakValueDictionary
1119     def _reference(self):
1120         return self.__ref.copy()
1121
1122 class WeakKeyDictionaryTestCase(mapping_tests.BasicTestMappingProtocol):
1123     """Check that WeakKeyDictionary conforms to the mapping protocol"""
1124     __ref = {Object("key1"):1, Object("key2"):2, Object("key3"):3}
1125     type2test = weakref.WeakKeyDictionary
1126     def _reference(self):
1127         return self.__ref.copy()
1128
1129 libreftest = """ Doctest for examples in the library reference: weakref.rst
1130
1131 >>> import weakref
1132 >>> class Dict(dict):
1133 ...     pass
1134 ...
1135 >>> obj = Dict(red=1, green=2, blue=3)   # this object is weak referencable
1136 >>> r = weakref.ref(obj)
1137 >>> print r() is obj
1138 True
1139
1140 >>> import weakref
1141 >>> class Object:
1142 ...     pass
1143 ...
1144 >>> o = Object()
1145 >>> r = weakref.ref(o)
1146 >>> o2 = r()
1147 >>> o is o2
1148 True
1149 >>> del o, o2
1150 >>> print r()
1151 None
1152
1153 >>> import weakref
1154 >>> class ExtendedRef(weakref.ref):
1155 ...     def __init__(self, ob, callback=None, **annotations):
1156 ...         super(ExtendedRef, self).__init__(ob, callback)
1157 ...         self.__counter = 0
1158 ...         for k, v in annotations.iteritems():
1159 ...             setattr(self, k, v)
1160 ...     def __call__(self):
1161 ...         '''Return a pair containing the referent and the number of
1162 ...         times the reference has been called.
1163 ...         '''
1164 ...         ob = super(ExtendedRef, self).__call__()
1165 ...         if ob is not None:
1166 ...             self.__counter += 1
1167 ...             ob = (ob, self.__counter)
1168 ...         return ob
1169 ...
1170 >>> class A:   # not in docs from here, just testing the ExtendedRef
1171 ...     pass
1172 ...
1173 >>> a = A()
1174 >>> r = ExtendedRef(a, foo=1, bar="baz")
1175 >>> r.foo
1176 1
1177 >>> r.bar
1178 'baz'
1179 >>> r()[1]
1180 1
1181 >>> r()[1]
1182 2
1183 >>> r()[0] is a
1184 True
1185
1186
1187 >>> import weakref
1188 >>> _id2obj_dict = weakref.WeakValueDictionary()
1189 >>> def remember(obj):
1190 ...     oid = id(obj)
1191 ...     _id2obj_dict[oid] = obj
1192 ...     return oid
1193 ...
1194 >>> def id2obj(oid):
1195 ...     return _id2obj_dict[oid]
1196 ...
1197 >>> a = A()             # from here, just testing
1198 >>> a_id = remember(a)
1199 >>> id2obj(a_id) is a
1200 True
1201 >>> del a
1202 >>> try:
1203 ...     id2obj(a_id)
1204 ... except KeyError:
1205 ...     print 'OK'
1206 ... else:
1207 ...     print 'WeakValueDictionary error'
1208 OK
1209
1210 """
1211
1212 __test__ = {'libreftest' : libreftest}
1213
1214 def test_main():
1215     test_support.run_unittest(
1216         ReferencesTestCase,
1217         MappingTestCase,
1218         WeakValueDictionaryTestCase,
1219         WeakKeyDictionaryTestCase,
1220         SubclassableWeakrefTestCase,
1221         )
1222     test_support.run_doctest(sys.modules[__name__])
1223
1224
1225 if __name__ == "__main__":
1226     test_main()