Fleshed out tpool tests, including a benchmark that evaluates how much overhead execute() adds. Fixed up tpool itself to pass the tests.
This commit is contained in:
@@ -26,6 +26,13 @@ from Queue import Empty, Queue
|
||||
from eventlet import api, coros, greenio
|
||||
from eventlet.api import trampoline, get_hub
|
||||
|
||||
try:
|
||||
set
|
||||
except NameError: # python 2.3 compatibility
|
||||
from sets import Set as set
|
||||
|
||||
QUIET=False
|
||||
|
||||
_rpipe, _wpipe = os.pipe()
|
||||
_rfile = os.fdopen(_rpipe,"r",0)
|
||||
## Work whether or not wrap_pipe_with_coroutine_pipe was called
|
||||
@@ -81,8 +88,9 @@ def erecv(e):
|
||||
if isinstance(rv,tuple) and len(rv) == 4 and isinstance(rv[0],Exception):
|
||||
import sys, traceback
|
||||
(e,a,b,tb) = rv
|
||||
traceback.print_exception(Exception,e,tb)
|
||||
traceback.print_stack()
|
||||
if not QUIET:
|
||||
traceback.print_exception(Exception,e,tb)
|
||||
traceback.print_stack()
|
||||
raise e
|
||||
return rv
|
||||
|
||||
@@ -100,6 +108,21 @@ erpc = execute
|
||||
|
||||
|
||||
|
||||
def proxy_call(autowrap, f, *args, **kwargs):
|
||||
""" Call a function *f* and returns the value. If the type of the
|
||||
return value is in the *autowrap* collection, then it is wrapped in
|
||||
a Proxy object before return. Normally *f* will be called
|
||||
nonblocking with the execute method; if the keyword argument
|
||||
"nonblocking" is set to true, it will simply be executed directly."""
|
||||
if kwargs.pop('nonblocking',False):
|
||||
rv = f(*args, **kwargs)
|
||||
else:
|
||||
rv = execute(f,*args,**kwargs)
|
||||
if type(rv) in autowrap:
|
||||
return Proxy(rv, autowrap)
|
||||
else:
|
||||
return rv
|
||||
|
||||
class Proxy(object):
|
||||
""" a simple proxy-wrapper of any object that comes with a methods-only interface,
|
||||
in order to forward every method invocation onto a thread in the native-thread pool.
|
||||
@@ -109,7 +132,7 @@ class Proxy(object):
|
||||
def __init__(self, obj,autowrap=()):
|
||||
self._obj = obj
|
||||
if isinstance(autowrap, (list, tuple)):
|
||||
autowrap = dict([(x, True) for x in autowrap])
|
||||
autowrap = set(autowrap)
|
||||
self._autowrap = autowrap
|
||||
|
||||
def __getattr__(self,attr_name):
|
||||
@@ -117,16 +140,35 @@ class Proxy(object):
|
||||
if not callable(f):
|
||||
return f
|
||||
def doit(*args, **kwargs):
|
||||
if kwargs.pop('nonblocking',False):
|
||||
rv = f(*args, **kwargs)
|
||||
else:
|
||||
rv = execute(f,*args,**kwargs)
|
||||
if type(rv) in self._autowrap:
|
||||
return Proxy(rv, self._autowrap)
|
||||
else:
|
||||
return rv
|
||||
return proxy_call(self._autowrap, f, *args, **kwargs)
|
||||
return doit
|
||||
|
||||
# the following are a buncha methods that the python interpeter
|
||||
# doesn't use getattr to retrieve and therefore have to be defined
|
||||
# explicitly
|
||||
def __getitem__(self, key):
|
||||
return proxy_call(self._autowrap, self._obj.__getitem__, key)
|
||||
def __setitem__(self, key, value):
|
||||
return proxy_call(self._autowrap, self._obj.__setitem__, key, value)
|
||||
def __deepcopy__(self, memo=None):
|
||||
return proxy_call(self._autowrap, self._obj.__deepcopy__, memo)
|
||||
def __copy__(self, memo=None):
|
||||
return proxy_call(self._autowrap, self._obj.__copy__, memo)
|
||||
# these don't go through a proxy call, because they're likely to
|
||||
# be called often, and are unlikely to be implemented on the
|
||||
# wrapped object in such a way that they would block
|
||||
def __eq__(self, rhs):
|
||||
return self._obj.__eq__(rhs)
|
||||
def __repr__(self):
|
||||
return self._obj.__repr__()
|
||||
def __str__(self):
|
||||
return self._obj.__str__()
|
||||
def __len__(self):
|
||||
return len(self._obj)
|
||||
def __nonzero__(self):
|
||||
return bool(self._obj)
|
||||
|
||||
|
||||
|
||||
_nthreads = int(os.environ.get('EVENTLET_THREADPOOL_SIZE', 20))
|
||||
_threads = {}
|
||||
|
Reference in New Issue
Block a user