Reimplemented corolocal so as to implement __init__, local function calling, and, amazing, also leak fixing. Fixes #41.
This commit is contained in:
@@ -1,30 +1,50 @@
|
|||||||
|
import weakref
|
||||||
|
|
||||||
from eventlet import greenthread
|
from eventlet import greenthread
|
||||||
|
|
||||||
|
__all__ = ['get_ident', 'local']
|
||||||
|
|
||||||
def get_ident():
|
def get_ident():
|
||||||
""" Returns ``id()`` of current greenlet. Useful for debugging."""
|
""" Returns ``id()`` of current greenlet. Useful for debugging."""
|
||||||
return id(greenthread.getcurrent())
|
return id(greenthread.getcurrent())
|
||||||
|
|
||||||
# TODO: The base threadlocal class wants to call __init__ on itself for every new thread that associates with it; our corolocal doesn't do this, but should for 100% compatibility. The implementation in _threading_local.py is so janky....
|
|
||||||
class local(object):
|
|
||||||
"""Coroutine equivalent of threading.local class."""
|
|
||||||
def __getattribute__(self, attr, g=get_ident):
|
|
||||||
try:
|
|
||||||
d = object.__getattribute__(self, '__dict__')
|
|
||||||
return d.setdefault('__objs', {})[g()][attr]
|
|
||||||
except KeyError:
|
|
||||||
raise AttributeError(
|
|
||||||
"No variable %s defined for the thread %s"
|
|
||||||
% (attr, g()))
|
|
||||||
|
|
||||||
def __setattr__(self, attr, value, g=get_ident):
|
# the entire purpose of this class is to store off the constructor
|
||||||
d = object.__getattribute__(self, '__dict__')
|
# arguments in a local variable without calling __init__ directly
|
||||||
d.setdefault('__objs', {}).setdefault(g(), {})[attr] = value
|
class _localbase(object):
|
||||||
|
__slots__ = '_local__args', '_local__greens'
|
||||||
|
def __new__(cls, *args, **kw):
|
||||||
|
self = object.__new__(cls)
|
||||||
|
object.__setattr__(self, '_local__args', (args, kw))
|
||||||
|
object.__setattr__(self, '_local__greens', weakref.WeakKeyDictionary())
|
||||||
|
if (args or kw) and (cls.__init__ is object.__init__):
|
||||||
|
raise TypeError("Initialization arguments are not supported")
|
||||||
|
return self
|
||||||
|
|
||||||
|
def _patch(thrl):
|
||||||
|
greens = object.__getattribute__(thrl, '_local__greens')
|
||||||
|
# until we can store the localdict on greenlets themselves,
|
||||||
|
# we store it in _local__greens on the local object
|
||||||
|
cur = greenthread.getcurrent()
|
||||||
|
if cur not in greens:
|
||||||
|
# must be the first time we've seen this greenlet, call __init__
|
||||||
|
greens[cur] = {}
|
||||||
|
cls = type(thrl)
|
||||||
|
if cls.__init__ is not object.__init__:
|
||||||
|
args, kw = object.__getattribute__(thrl, '_local__args')
|
||||||
|
thrl.__init__(*args, **kw)
|
||||||
|
object.__setattr__(thrl, '__dict__', greens[cur])
|
||||||
|
|
||||||
|
|
||||||
def __delattr__(self, attr, g=get_ident):
|
class local(_localbase):
|
||||||
try:
|
def __getattribute__(self, attr):
|
||||||
d = object.__getattribute__(self, '__dict__')
|
_patch(self)
|
||||||
del d.setdefault('__objs', {})[g()][attr]
|
return object.__getattribute__(self, attr)
|
||||||
except KeyError:
|
|
||||||
raise AttributeError(
|
def __setattr__(self, attr, value):
|
||||||
"No variable %s defined for thread %s"
|
_patch(self)
|
||||||
% (attr, g()))
|
return object.__setattr__(self, attr, value)
|
||||||
|
|
||||||
|
def __delattr__(self, attr):
|
||||||
|
_patch(self)
|
||||||
|
return object.__delattr__(self, attr)
|
||||||
|
@@ -1,9 +1,11 @@
|
|||||||
|
import weakref
|
||||||
from eventlet.green import thread
|
from eventlet.green import thread
|
||||||
from eventlet import greenthread
|
from eventlet import greenthread
|
||||||
from eventlet import event
|
from eventlet import event
|
||||||
import eventlet
|
import eventlet
|
||||||
|
from eventlet import corolocal
|
||||||
|
|
||||||
from tests import LimitedTestCase
|
from tests import LimitedTestCase, skipped
|
||||||
|
|
||||||
class Locals(LimitedTestCase):
|
class Locals(LimitedTestCase):
|
||||||
def passthru(self, *args, **kw):
|
def passthru(self, *args, **kw):
|
||||||
@@ -18,6 +20,7 @@ class Locals(LimitedTestCase):
|
|||||||
self.results = []
|
self.results = []
|
||||||
super(Locals, self).tearDown()
|
super(Locals, self).tearDown()
|
||||||
|
|
||||||
|
@skipped # cause it relies on internal details of corolocal that are no longer true
|
||||||
def test_simple(self):
|
def test_simple(self):
|
||||||
tls = thread._local()
|
tls = thread._local()
|
||||||
g_ids = []
|
g_ids = []
|
||||||
@@ -37,3 +40,72 @@ class Locals(LimitedTestCase):
|
|||||||
self.failUnlessRaises(AttributeError, lambda: tls.value)
|
self.failUnlessRaises(AttributeError, lambda: tls.value)
|
||||||
evt.send("done")
|
evt.send("done")
|
||||||
eventlet.sleep()
|
eventlet.sleep()
|
||||||
|
|
||||||
|
def test_assignment(self):
|
||||||
|
my_local = corolocal.local()
|
||||||
|
my_local.a = 1
|
||||||
|
def do_something():
|
||||||
|
my_local.b = 2
|
||||||
|
self.assertEqual(my_local.b, 2)
|
||||||
|
try:
|
||||||
|
my_local.a
|
||||||
|
self.fail()
|
||||||
|
except AttributeError:
|
||||||
|
pass
|
||||||
|
eventlet.spawn(do_something).wait()
|
||||||
|
self.assertEqual(my_local.a, 1)
|
||||||
|
|
||||||
|
def test_calls_init(self):
|
||||||
|
init_args = []
|
||||||
|
class Init(corolocal.local):
|
||||||
|
def __init__(self, *args):
|
||||||
|
init_args.append((args, eventlet.getcurrent()))
|
||||||
|
|
||||||
|
my_local = Init(1,2,3)
|
||||||
|
self.assertEqual(init_args[0][0], (1,2,3))
|
||||||
|
self.assertEqual(init_args[0][1], eventlet.getcurrent())
|
||||||
|
|
||||||
|
def do_something():
|
||||||
|
my_local.foo = 'bar'
|
||||||
|
self.assertEqual(len(init_args), 2, init_args)
|
||||||
|
self.assertEqual(init_args[1][0], (1,2,3))
|
||||||
|
self.assertEqual(init_args[1][1], eventlet.getcurrent())
|
||||||
|
|
||||||
|
eventlet.spawn(do_something).wait()
|
||||||
|
|
||||||
|
def test_calling_methods(self):
|
||||||
|
class Caller(corolocal.local):
|
||||||
|
def callme(self):
|
||||||
|
return self.foo
|
||||||
|
|
||||||
|
my_local = Caller()
|
||||||
|
my_local.foo = "foo1"
|
||||||
|
self.assertEquals("foo1", my_local.callme())
|
||||||
|
|
||||||
|
def do_something():
|
||||||
|
my_local.foo = "foo2"
|
||||||
|
self.assertEquals("foo2", my_local.callme())
|
||||||
|
|
||||||
|
eventlet.spawn(do_something).wait()
|
||||||
|
|
||||||
|
my_local.foo = "foo3"
|
||||||
|
self.assertEquals("foo3", my_local.callme())
|
||||||
|
|
||||||
|
def test_no_leaking(self):
|
||||||
|
refs = weakref.WeakKeyDictionary()
|
||||||
|
my_local = corolocal.local()
|
||||||
|
class X(object):
|
||||||
|
pass
|
||||||
|
def do_something(i):
|
||||||
|
o = X()
|
||||||
|
refs[o] = True
|
||||||
|
my_local.foo = o
|
||||||
|
|
||||||
|
p = eventlet.GreenPool()
|
||||||
|
for i in xrange(100):
|
||||||
|
p.spawn(do_something, i)
|
||||||
|
p.waitall()
|
||||||
|
del p
|
||||||
|
# at this point all our coros have terminated
|
||||||
|
self.assertEqual(len(refs), 1)
|
||||||
|
|
Reference in New Issue
Block a user