fixed various minor issues that pyflakes complained about, removed extraneous whitespace, wrapped long lines

This commit is contained in:
Tavis Rudd
2010-02-24 22:09:12 -08:00
parent a7eaa4b701
commit cc73169f5b
13 changed files with 248 additions and 238 deletions

View File

@@ -1,5 +1,4 @@
import collections
import time
import traceback
import warnings
@@ -16,34 +15,34 @@ class NOT_USED:
NOT_USED = NOT_USED()
def Event(*a, **kw):
warnings.warn("The Event class has been moved to the event module! "
"Please construct event.Event objects instead.",
warnings.warn("The Event class has been moved to the event module! "
"Please construct event.Event objects instead.",
DeprecationWarning, stacklevel=2)
return _event.Event(*a, **kw)
def event(*a, **kw):
warnings.warn("The event class has been capitalized and moved! Please "
"construct event.Event objects instead.",
"construct event.Event objects instead.",
DeprecationWarning, stacklevel=2)
return _event.Event(*a, **kw)
def Semaphore(count):
warnings.warn("The Semaphore class has moved! Please "
"use semaphore.Semaphore instead.",
"use semaphore.Semaphore instead.",
DeprecationWarning, stacklevel=2)
return semaphoremod.Semaphore(count)
def BoundedSemaphore(count):
warnings.warn("The BoundedSemaphore class has moved! Please "
"use semaphore.BoundedSemaphore instead.",
"use semaphore.BoundedSemaphore instead.",
DeprecationWarning, stacklevel=2)
return semaphoremod.BoundedSemaphore(count)
def semaphore(count=0, limit=None):
warnings.warn("coros.semaphore is deprecated. Please use either "
"semaphore.Semaphore or semaphore.BoundedSemaphore instead.",
"semaphore.Semaphore or semaphore.BoundedSemaphore instead.",
DeprecationWarning, stacklevel=2)
if limit is None:
return Semaphore(count)
@@ -136,7 +135,7 @@ class Queue(object):
def __init__(self):
warnings.warn("coros.Queue is deprecated. Please use "
"eventlet.queue.Queue instead.",
"eventlet.queue.Queue instead.",
DeprecationWarning, stacklevel=2)
self.items = collections.deque()
self._waiters = set()
@@ -148,7 +147,8 @@ class Queue(object):
return len(self.items)
def __repr__(self):
params = (self.__class__.__name__, hex(id(self)), len(self.items), len(self._waiters))
params = (self.__class__.__name__, hex(id(self)),
len(self.items), len(self._waiters))
return '<%s at %s items[%d] _waiters[%s]>' % params
def send(self, result=None, exc=None):
@@ -195,10 +195,10 @@ class Queue(object):
def waiting(self):
return len(self._waiters)
def __iter__(self):
return self
def next(self):
return self.wait()
@@ -207,7 +207,7 @@ class Channel(object):
def __init__(self, max_size=0):
warnings.warn("coros.Channel is deprecated. Please use "
"eventlet.queue.Queue(0) instead.",
"eventlet.queue.Queue(0) instead.",
DeprecationWarning, stacklevel=2)
self.max_size = max_size
self.items = collections.deque()
@@ -221,7 +221,9 @@ class Channel(object):
return len(self.items)
def __repr__(self):
params = (self.__class__.__name__, hex(id(self)), self.max_size, len(self.items), len(self._waiters), len(self._senders))
params = (self.__class__.__name__, hex(id(self)),
self.max_size, len(self.items),
len(self._waiters), len(self._senders))
return '<%s at %s max=%s items[%d] _w[%s] _s[%s]>' % params
def send(self, result=None, exc=None):
@@ -323,7 +325,7 @@ class Actor(object):
to process concurrently. If it is 1, the actor will process messages
serially.
"""
warnings.warn("We're phasing out the Actor class, so as to get rid of"
warnings.warn("We're phasing out the Actor class, so as to get rid of"
"the coros module. If you use Actor, please speak up on "
"eventletdev@lists.secondlife.com, and we'll come up with a "
"transition plan. If no one speaks up, we'll remove Actor "
@@ -397,4 +399,3 @@ class Actor(object):
>>> eventlet.kill(a._killer) # test cleanup
"""
raise NotImplementedError()

View File

@@ -11,13 +11,13 @@ import os
import sys
import warnings
__patched__ = ['fromfd', 'socketpair', 'gethostbyname', 'create_connection',
__patched__ = ['fromfd', 'socketpair', 'gethostbyname', 'create_connection',
'ssl', 'socket']
__original_fromfd__ = __socket.fromfd
def fromfd(*args):
return socket(__original_fromfd__(*args))
return socket(__original_fromfd__(*args))
__original_socketpair__ = __socket.socketpair
def socketpair(*args):
one, two = __original_socketpair__(*args)
@@ -35,7 +35,7 @@ def gethostbyname(name):
globals()['gethostbyname'] = __original_gethostbyname__
else:
globals()['gethostbyname'] = _gethostbyname_tpool
return globals()['gethostbyname'](name)
def _gethostbyname_twisted(name):
@@ -51,7 +51,7 @@ def _gethostbyname_tpool(name):
# def getaddrinfo(*args, **kw):
# return tpool.execute(
# __socket.getaddrinfo, *args, **kw)
#
#
# XXX there're few more blocking functions in socket
# XXX having a hub-independent way to access thread pool would be nice
@@ -88,10 +88,10 @@ def create_connection(address, timeout=_GLOBAL_DEFAULT_TIMEOUT):
def _convert_to_sslerror(ex):
""" Transliterates SSL.SysCallErrors to socket.sslerrors"""
return sslerror((ex[0], ex[1]))
class GreenSSLObject(object):
""" Wrapper object around the SSLObjects returned by socket.ssl, which have a
""" Wrapper object around the SSLObjects returned by socket.ssl, which have a
slightly different interface from SSL.Connection objects. """
def __init__(self, green_ssl_obj):
""" Should only be called by a 'green' socket.ssl """
@@ -106,7 +106,7 @@ class GreenSSLObject(object):
self.connection.do_handshake()
except _SSL.SysCallError, e:
raise _convert_to_sslerror(e)
def read(self, n=1024):
"""If n is provided, read n bytes from the SSL connection, otherwise read
until EOF. The return value is a string of the bytes read."""
@@ -116,9 +116,9 @@ class GreenSSLObject(object):
return ''
except _SSL.SysCallError, e:
raise _convert_to_sslerror(e)
def write(self, s):
"""Writes the string s to the on the object's SSL connection.
"""Writes the string s to the on the object's SSL connection.
The return value is the number of bytes written. """
try:
return self.connection.write(s)
@@ -130,13 +130,13 @@ class GreenSSLObject(object):
purposes; do not parse the content of this string because its format can't be
parsed unambiguously. """
return str(self.connection.get_peer_certificate().get_subject())
def issuer(self):
"""Returns a string describing the issuer of the server's certificate. Useful
for debugging purposes; do not parse the content of this string because its
for debugging purposes; do not parse the content of this string because its
format can't be parsed unambiguously."""
return str(self.connection.get_peer_certificate().get_issuer())
try:
try:

View File

@@ -2,7 +2,7 @@ from eventlet import patcher
from eventlet.green import thread
from eventlet.green import time
__patched__ = ['_start_new_thread', '_allocate_lock', '_get_ident', '_sleep',
__patched__ = ['_start_new_thread', '_allocate_lock', '_get_ident', '_sleep',
'local', 'stack_size']
patcher.inject('threading',
@@ -21,6 +21,3 @@ def _patch_main_thread(mod):
curthread = mod._active.pop(mod._get_ident(), None)
if curthread:
mod._active[thread.get_ident()] = curthread
if __name__ == '__main__':
_test()

View File

@@ -1,7 +1,4 @@
import eventlet
from eventlet.hubs import trampoline
from eventlet.hubs import get_hub
BUFFER_SIZE = 4096
import errno
@@ -12,10 +9,6 @@ import sys
import time
import warnings
from errno import EWOULDBLOCK, EAGAIN
__all__ = ['GreenSocket', 'GreenPipe', 'shutdown_safe']
CONNECT_ERR = set((errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK))
@@ -302,7 +295,6 @@ class GreenSocket(object):
return total_sent
def sendall(self, data, flags=0):
fd = self.fd
tail = self.send(data, flags)
len_data = len(data)
while tail < len_data:
@@ -375,7 +367,7 @@ class GreenPipe(object):
try:
return fd.read(buflen)
except IOError, e:
if e[0] != EAGAIN:
if e[0] != errno.EAGAIN:
return ''
except socket.error, e:
if e[0] == errno.EPIPE:
@@ -407,7 +399,7 @@ class GreenPipe(object):
fd.flush()
return len(data)
except IOError, e:
if e[0] != EAGAIN:
if e[0] != errno.EAGAIN:
raise
except ValueError, e:
# what's this for?

View File

@@ -10,7 +10,7 @@ from eventlet.support import greenlets as greenlet
__all__ = ['GreenPool', 'GreenPile']
DEBUG = False
try:
next
except NameError:
@@ -28,20 +28,20 @@ class GreenPool(object):
self.coroutines_running = set()
self.sem = semaphore.Semaphore(size)
self.no_coros_running = event.Event()
def resize(self, new_size):
""" Change the max number of greenthreads doing work at any given time.
If resize is called when there are more than *new_size* greenthreads
already working on tasks, they will be allowed to complete but no new
tasks will be allowed to get launched until enough greenthreads finish
their tasks to drop the overall quantity below *new_size*. Until
If resize is called when there are more than *new_size* greenthreads
already working on tasks, they will be allowed to complete but no new
tasks will be allowed to get launched until enough greenthreads finish
their tasks to drop the overall quantity below *new_size*. Until
then, the return value of free() will be negative.
"""
size_delta = new_size - self.size
size_delta = new_size - self.size
self.sem.counter += size_delta
self.size = new_size
def running(self):
""" Returns the number of greenthreads that are currently executing
functions in the Parallel's pool."""
@@ -49,20 +49,20 @@ class GreenPool(object):
def free(self):
""" Returns the number of greenthreads available for use.
If zero or less, the next call to :meth:`spawn` or :meth:`spawn_n` will
If zero or less, the next call to :meth:`spawn` or :meth:`spawn_n` will
block the calling greenthread until a slot becomes available."""
return self.sem.counter
def spawn(self, function, *args, **kwargs):
"""Run the *function* with its arguments in its own green thread.
Returns the :class:`GreenThread <eventlet.greenthread.GreenThread>`
Returns the :class:`GreenThread <eventlet.greenthread.GreenThread>`
object that is running the function, which can be used to retrieve the
results.
If the pool is currently at capacity, ``spawn`` will block until one of
the running greenthreads completes its task and frees up a slot.
This function is reentrant; *function* can call ``spawn`` on the same
pool without risk of deadlocking the whole thing.
"""
@@ -82,7 +82,7 @@ class GreenPool(object):
self.coroutines_running.add(gt)
gt.link(self._spawn_done)
return gt
def _spawn_n_impl(self, func, args, kwargs, coro):
try:
try:
@@ -98,10 +98,10 @@ class GreenPool(object):
else:
coro = greenthread.getcurrent()
self._spawn_done(coro)
def spawn_n(self, function, *args, **kwargs):
""" Create a greenthread to run the *function*, the same as
:meth:`spawn`. The difference is that :meth:`spawn_n` returns
""" Create a greenthread to run the *function*, the same as
:meth:`spawn`. The difference is that :meth:`spawn_n` returns
None; the results of *function* are not retrievable.
"""
# if reentering an empty pool, don't try to wait on a coroutine freeing
@@ -111,7 +111,7 @@ class GreenPool(object):
self._spawn_n_impl(function, args, kwargs, None)
else:
self.sem.acquire()
g = greenthread.spawn_n(self._spawn_n_impl,
g = greenthread.spawn_n(self._spawn_n_impl,
function, args, kwargs, True)
if not self.coroutines_running:
self.no_coros_running = event.Event()
@@ -121,7 +121,7 @@ class GreenPool(object):
"""Waits until all greenthreads in the pool are finished working."""
if self.running():
self.no_coros_running.wait()
def _spawn_done(self, coro):
self.sem.release()
if coro is not None:
@@ -130,25 +130,25 @@ class GreenPool(object):
# we can finish off any waitall() calls that might be pending
if self.sem.balance == self.size:
self.no_coros_running.send(None)
def waiting(self):
"""Return the number of greenthreads waiting to spawn.
"""
if self.sem.balance < 0:
return -self.sem.balance
else:
return 0
return 0
def _do_map(self, func, it, gi):
for args in it:
gi.spawn(func, *args)
gi.spawn(return_stop_iteration)
def starmap(self, function, iterable):
"""This is the same as :func:`itertools.starmap`, except that *func* is
executed in a separate green thread for each item, with the concurrency
limited by the pool's size. In operation, starmap consumes a constant
amount of memory, proportional to the size of the pool, and is thus
"""This is the same as :func:`itertools.starmap`, except that *func* is
executed in a separate green thread for each item, with the concurrency
limited by the pool's size. In operation, starmap consumes a constant
amount of memory, proportional to the size of the pool, and is thus
suited for iterating over extremely long input lists.
"""
if function is None:
@@ -163,22 +163,22 @@ class GreenPool(object):
"""
return self.starmap(function, itertools.izip(*iterables))
def return_stop_iteration():
return StopIteration()
class GreenPile(object):
"""GreenPile is an abstraction representing a bunch of I/O-related tasks.
Construct a GreenPile with an existing GreenPool object. The GreenPile will
then use that pool's concurrency as it processes its jobs. There can be
then use that pool's concurrency as it processes its jobs. There can be
many GreenPiles associated with a single GreenPool.
A GreenPile can also be constructed standalone, not associated with any
GreenPool. To do this, construct it with an integer size parameter instead
A GreenPile can also be constructed standalone, not associated with any
GreenPool. To do this, construct it with an integer size parameter instead
of a GreenPool.
It is not advisable to iterate over a GreenPile in a different greenthread
than the one which is calling spawn. The iterator will exit early in that
situation.
@@ -191,9 +191,9 @@ class GreenPile(object):
self.waiters = queue.LightQueue()
self.used = False
self.counter = 0
def spawn(self, func, *args, **kw):
"""Runs *func* in its own green thread, with the result available by
"""Runs *func* in its own green thread, with the result available by
iterating over the GreenPile object."""
self.used = True
self.counter += 1
@@ -203,10 +203,10 @@ class GreenPile(object):
except:
self.counter -= 1
raise
def __iter__(self):
return self
def next(self):
"""Wait for the next result, suspending the current greenthread until it
is available. Raises StopIteration when there are no more results."""
@@ -216,15 +216,15 @@ class GreenPile(object):
return self.waiters.get().wait()
finally:
self.counter -= 1
# this is identical to GreenPile but it blocks on spawn if the results
# this is identical to GreenPile but it blocks on spawn if the results
# aren't consumed, and it doesn't generate its own StopIteration exception,
# instead relying on the spawning process to send one in when it's done
class GreenMap(GreenPile):
def __init__(self, size_or_pool):
super(GreenMap, self).__init__(size_or_pool)
self.waiters = queue.LightQueue(maxsize=self.pool.size)
def next(self):
try:
val = self.waiters.get().wait()

View File

@@ -21,8 +21,8 @@ class FdListener(object):
def __repr__(self):
return "%s(%r, %r, %r)" % (type(self).__name__, self.evtype, self.fileno, self.cb)
__str__ = __repr__
# in debug mode, track the call site that created the listener
class DebugListener(FdListener):
def __init__(self, evtype, fileno, cb):
@@ -30,20 +30,21 @@ class DebugListener(FdListener):
self.greenlet = greenlet.getcurrent()
super(DebugListener, self).__init__(evtype, fileno, cb)
def __repr__(self):
return "DebugListener(%r, %r, %r, %r)\n%sEndDebugFdListener" % (self.evtype,
self.fileno,
self.cb,
self.greenlet,
''.join(self.where_called))
return "DebugListener(%r, %r, %r, %r)\n%sEndDebugFdListener" % (
self.evtype,
self.fileno,
self.cb,
self.greenlet,
''.join(self.where_called))
__str__ = __repr__
class BaseHub(object):
""" Base hub class for easing the implementation of subclasses that are
specific to a particular underlying event architecture. """
SYSTEM_EXCEPTIONS = (KeyboardInterrupt, SystemExit)
READ = READ
WRITE = WRITE
@@ -58,7 +59,7 @@ class BaseHub(object):
self.next_timers = []
self.lclass = FdListener
self.debug_exceptions = True
def add(self, evtype, fileno, cb):
""" Signals an intent to or write a particular file descriptor.
@@ -81,9 +82,9 @@ class BaseHub(object):
pass
if listener_list:
self.listeners[listener.evtype][listener.fileno] = listener_list
def remove_descriptor(self, fileno):
""" Completely remove all listeners for this fileno. For internal use
""" Completely remove all listeners for this fileno. For internal use
only."""
self.listeners[READ].pop(fileno, None)
self.listeners[WRITE].pop(fileno, None)
@@ -106,7 +107,7 @@ class BaseHub(object):
if self.greenlet.dead:
self.greenlet = greenlet.greenlet(self.run)
try:
if self.greenlet.parent is not cur:
if self.greenlet.parent is not cur:
cur.parent = self.greenlet
except ValueError:
pass # gets raised if there is a greenlet parent cycle
@@ -231,8 +232,6 @@ class BaseHub(object):
t = self.timers
heappop = heapq.heappop
i = 0
while t:
next = t[0]
@@ -265,12 +264,12 @@ class BaseHub(object):
def get_timers_count(hub):
return max(len(hub.timers), len(hub.next_timers))
def set_debug_listeners(self, value):
if value:
self.lclass = DebugListener
else:
self.lclass = FdListener
def set_timer_exceptions(self, value):
self.debug_exceptions = value

View File

@@ -7,17 +7,17 @@ __exclude = set(('__builtins__', '__file__', '__name__'))
def inject(module_name, new_globals, *additional_modules):
"""Base method for "injecting" greened modules into an imported module. It
imports the module specified in *module_name*, arranging things so
that the already-imported modules in *additional_modules* are used when
imports the module specified in *module_name*, arranging things so
that the already-imported modules in *additional_modules* are used when
*module_name* makes its imports.
*new_globals* is either None or a globals dictionary that gets populated
*new_globals* is either None or a globals dictionary that gets populated
with the contents of the *module_name* module. This is useful when creating
a "green" version of some other module.
*additional_modules* should be a collection of two-element tuples, of the
form (<name>, <module>). If it's not specified, a default selection of
name/module pairs is used, which should cover all use cases but may be
form (<name>, <module>). If it's not specified, a default selection of
name/module pairs is used, which should cover all use cases but may be
slower because there are inevitably redundant or unnecessary imports.
"""
if not additional_modules:
@@ -26,16 +26,17 @@ def inject(module_name, new_globals, *additional_modules):
_green_os_modules() +
_green_select_modules() +
_green_socket_modules() +
_green_thread_modules() +
_green_thread_modules() +
_green_time_modules())
## Put the specified modules in sys.modules for the duration of the import
saved = {}
for name, mod in additional_modules:
saved[name] = sys.modules.get(name, None)
sys.modules[name] = mod
## Remove the old module from sys.modules and reimport it while the specified modules are in place
## Remove the old module from sys.modules and reimport it while
## the specified modules are in place
old_module = sys.modules.pop(module_name, None)
try:
module = __import__(module_name, {}, {}, module_name.split('.')[:-1])
@@ -66,20 +67,20 @@ def inject(module_name, new_globals, *additional_modules):
def import_patched(module_name, *additional_modules, **kw_additional_modules):
"""Imports a module in a way that ensures that the module uses "green"
versions of the standard library modules, so that everything works
"""Imports a module in a way that ensures that the module uses "green"
versions of the standard library modules, so that everything works
nonblockingly.
The only required argument is the name of the module to be imported.
"""
return inject(
module_name,
None,
*additional_modules + tuple(kw_additional_modules.items()))
module_name,
None,
*additional_modules + tuple(kw_additional_modules.items()))
def patch_function(func, *additional_modules):
"""Huge hack here -- patches the specified modules for the
"""Huge hack here -- patches the specified modules for the
duration of the function call."""
def patched(*args, **kw):
saved = {}
@@ -96,7 +97,7 @@ def patch_function(func, *additional_modules):
else:
del sys.modules[name]
return patched
_originals = {}
def original(modname):
mod = _originals.get(modname)
@@ -113,18 +114,18 @@ def original(modname):
return _originals.get(modname)
already_patched = {}
def monkey_patch(all=True, os=False, select=False,
def monkey_patch(all=True, os=False, select=False,
socket=False, thread=False, time=False):
"""Globally patches certain system modules to be greenthread-friendly.
The keyword arguments afford some control over which modules are patched.
If *all* is True, then all modules are patched regardless of the other
If *all* is True, then all modules are patched regardless of the other
arguments. If it's False, then the rest of the keyword arguments control
patching of specific subsections of the standard library.
Most patch the single module of the same name (os, time,
select). The exceptions are socket, which also patches the ssl module if
present; and thread, which patches thread, threading, and Queue.
It's safe to call monkey_patch multiple times.
"""
modules_to_patch = []
@@ -135,7 +136,7 @@ def monkey_patch(all=True, os=False, select=False,
modules_to_patch += _green_select_modules()
already_patched['select'] = True
if all or socket and not already_patched.get('socket'):
modules_to_patch += _green_socket_modules()
modules_to_patch += _green_socket_modules()
already_patched['socket'] = True
if all or thread and not already_patched.get('thread'):
# hacks ahead
@@ -147,19 +148,20 @@ def monkey_patch(all=True, os=False, select=False,
if all or time and not already_patched.get('time'):
modules_to_patch += _green_time_modules()
already_patched['time'] = True
for name, mod in modules_to_patch:
orig_mod = sys.modules.get(name)
for attr in mod.__patched__:
orig_attr = getattr(orig_mod, attr, None)
patched_attr = getattr(mod, attr, None)
for attr_name in mod.__patched__:
#orig_attr = getattr(orig_mod, attr_name, None)
# @@tavis: line above wasn't used, not sure what author intended
patched_attr = getattr(mod, attr_name, None)
if patched_attr is not None:
setattr(orig_mod, attr, patched_attr)
setattr(orig_mod, attr_name, patched_attr)
def _green_os_modules():
from eventlet.green import os
return [('os', os)]
def _green_select_modules():
from eventlet.green import select
return [('select', select)]
@@ -177,7 +179,7 @@ def _green_thread_modules():
from eventlet.green import thread
from eventlet.green import threading
return [('Queue', Queue), ('thread', thread), ('threading', threading)]
def _green_time_modules():
from eventlet.green import time
return [('time', time)]

View File

@@ -10,7 +10,7 @@ try:
exec('''
@contextmanager
def item_impl(self):
""" Get an object out of the pool, for use with with statement.
""" Get an object out of the pool, for use with with statement.
>>> from eventlet import pools
>>> pool = pools.TokenPool(max_size=4)
@@ -35,20 +35,20 @@ except ImportError:
class Pool(object):
"""
Pool is a base class that implements resource limitation and construction.
It is meant to be subclassed. When subclassing, define only
It is meant to be subclassed. When subclassing, define only
the :meth:`create` method to implement the desired resource::
class MyPool(pools.Pool):
def create(self):
return MyObject()
If using 2.5 or greater, the :meth:`item` method acts as a context manager;
that's the best way to use it::
with mypool.item() as thing:
thing.dostuff()
If stuck on 2.4, the :meth:`get` and :meth:`put` methods are the preferred
If stuck on 2.4, the :meth:`get` and :meth:`put` methods are the preferred
nomenclature. Use a ``finally`` to ensure that nothing is leaked::
thing = self.pool.get()
@@ -59,12 +59,12 @@ class Pool(object):
The maximum size of the pool can be modified at runtime via
the :meth:`resize` method.
Specifying a non-zero *min-size* argument pre-populates the pool with
*min_size* items. *max-size* sets a hard limit to the size of the pool --
it cannot contain any more items than *max_size*, and if there are already
*max_size* items 'checked out' of the pool, the pool will cause any
greenthread calling :meth:`get` to cooperatively yield until an item
Specifying a non-zero *min-size* argument pre-populates the pool with
*min_size* items. *max-size* sets a hard limit to the size of the pool --
it cannot contain any more items than *max_size*, and if there are already
*max_size* items 'checked out' of the pool, the pool will cause any
greenthread calling :meth:`get` to cooperatively yield until an item
is :meth:`put` in.
"""
def __init__(self, min_size=0, max_size=4, order_as_stack=False):
@@ -96,7 +96,7 @@ class Pool(object):
self.current_size += 1
return created
return self.channel.get()
if item_impl is not None:
item = item_impl
@@ -118,11 +118,11 @@ class Pool(object):
def resize(self, new_size):
"""Resize the pool to *new_size*.
Adjusting this number does not affect existing items checked out of
the pool, nor on any greenthreads who are waiting for an item to free
Adjusting this number does not affect existing items checked out of
the pool, nor on any greenthreads who are waiting for an item to free
up. Some indeterminate number of :meth:`get`/:meth:`put`
cycles will be necessary before the new maximum size truly matches
cycles will be necessary before the new maximum size truly matches
the actual operation of the pool.
"""
self.max_size = new_size
@@ -137,18 +137,18 @@ class Pool(object):
"""Return the number of routines waiting for a pool item.
"""
return max(0, self.channel.getting() - self.channel.putting())
def create(self):
"""Generate a new pool item. This method must be overridden in order
for the pool to function. It accepts no arguments and returns a single
instance of whatever thing the pool is supposed to contain.
In general, :meth:`create` is called whenever the pool exceeds its
previous high-water mark of concurrently-checked-out-items. In other
words, in a new pool with *min_size* of 0, the very first call
to :meth:`get` will result in a call to :meth:`create`. If the first
caller calls :meth:`put` before some other caller calls :meth:`get`,
then the first item will be returned, and :meth:`create` will not be
In general, :meth:`create` is called whenever the pool exceeds its
previous high-water mark of concurrently-checked-out-items. In other
words, in a new pool with *min_size* of 0, the very first call
to :meth:`get` will result in a call to :meth:`create`. If the first
caller calls :meth:`put` before some other caller calls :meth:`get`,
then the first item will be returned, and :meth:`create` will not be
called a second time.
"""
raise NotImplementedError("Implement in subclass")
@@ -165,5 +165,3 @@ class TokenPool(Pool):
"""
def create(self):
return Token()

View File

@@ -1,5 +1,5 @@
import warnings
warnings.warn("eventlet.processes is deprecated in favor of "
warnings.warn("eventlet.processes is deprecated in favor of "
"eventlet.green.subprocess, which is API-compatible with the standard "
" library subprocess module.",
DeprecationWarning, stacklevel=2)
@@ -11,7 +11,6 @@ import popen2
import signal
from eventlet import api
from eventlet import coros
from eventlet import pools
from eventlet import greenio
@@ -69,7 +68,9 @@ class Process(object):
greenio.set_nonblocking(child_stdout_stderr)
greenio.set_nonblocking(child_stdin)
self.child_stdout_stderr = greenio.GreenPipe(child_stdout_stderr)
self.child_stdout_stderr.newlines = '\n' # the default is \r\n, which aren't sent over pipes
self.child_stdout_stderr.newlines = '\n' # the default is
# \r\n, which aren't sent over
# pipes
self.child_stdin = greenio.GreenPipe(child_stdin)
self.child_stdin.newlines = '\n'

View File

@@ -36,7 +36,10 @@ def wrap(obj, dead_callback = None):
return wrap_module(obj.__name__, dead_callback)
pythonpath_sync()
if _g_debug_mode:
p = Process(sys.executable, ["-W", "ignore", __file__, '--child', '--logfile', os.path.join(tempfile.gettempdir(), 'saranwrap.log')], dead_callback)
p = Process(sys.executable,
["-W", "ignore", __file__, '--child',
'--logfile', os.path.join(tempfile.gettempdir(), 'saranwrap.log')],
dead_callback)
else:
p = Process(sys.executable, ["-W", "ignore", __file__, '--child'], dead_callback)
prox = Proxy(ChildProcess(p, p))
@@ -53,9 +56,13 @@ def wrap_module(fqname, dead_callback = None):
pythonpath_sync()
global _g_debug_mode
if _g_debug_mode:
p = Process(sys.executable, ["-W", "ignore", __file__, '--module', fqname, '--logfile', os.path.join(tempfile.gettempdir(), 'saranwrap.log')], dead_callback)
p = Process(sys.executable,
["-W", "ignore", __file__, '--module', fqname,
'--logfile', os.path.join(tempfile.gettempdir(), 'saranwrap.log')],
dead_callback)
else:
p = Process(sys.executable, ["-W", "ignore", __file__, '--module', fqname,], dead_callback)
p = Process(sys.executable,
["-W", "ignore", __file__, '--module', fqname,], dead_callback)
prox = Proxy(ChildProcess(p,p))
return prox
@@ -140,7 +147,8 @@ def _write_request(param, output):
def _is_local(attribute):
"Return ``True`` if the attribute should be handled locally"
# return attribute in ('_in', '_out', '_id', '__getattribute__', '__setattr__', '__dict__')
# return attribute in ('_in', '_out', '_id', '__getattribute__',
# '__setattr__', '__dict__')
# good enough for now. :)
if '__local_dict' in attribute:
return True
@@ -266,7 +274,8 @@ class Proxy(object):
my_cp = self.__local_dict['_cp']
my_id = self.__local_dict['_id']
# Pass the set attribute across
request = Request('setattr', {'id':my_id, 'attribute':attribute, 'value':value})
request = Request('setattr',
{'id':my_id, 'attribute':attribute, 'value':value})
return my_cp.make_request(request, attribute=attribute)
class ObjectProxy(Proxy):
@@ -324,7 +333,8 @@ class ObjectProxy(Proxy):
return self.__str__()
def __nonzero__(self):
# bool(obj) is another method that skips __getattribute__. There's no good way to just pass
# bool(obj) is another method that skips __getattribute__.
# There's no good way to just pass
# the method on, so we use a special message.
my_cp = self.__local_dict['_cp']
my_id = self.__local_dict['_id']
@@ -395,7 +405,9 @@ class CallableProxy(object):
# having already checked if the method starts with '_' so we
# can safely pass this one to the remote object.
#_prnt("calling %s %s" % (self._object_id, self._name)
request = Request('call', {'id':self._object_id, 'name':self._name, 'args':args, 'kwargs':kwargs})
request = Request('call', {'id':self._object_id,
'name':self._name,
'args':args, 'kwargs':kwargs})
return self._cp.make_request(request, attribute=self._name)
class Server(object):
@@ -444,14 +456,15 @@ class Server(object):
def handle_setitem(self, obj, req):
obj[req['key']] = req['value']
return None # *TODO figure out what the actual return value of __setitem__ should be
return None # *TODO figure out what the actual return value
# of __setitem__ should be
def handle_eq(self, obj, req):
#_log("__eq__ %s %s" % (obj, req))
rhs = None
try:
rhs = self._objects[req['rhs']]
except KeyError, e:
except KeyError:
return False
return (obj == rhs)
@@ -565,7 +578,7 @@ class Server(object):
#_log("objects: %s" % self._objects)
s = Pickle.dumps(body)
_log(`s`)
str_ = _write_lp_hunk(self._out, s)
_write_lp_hunk(self._out, s)
def write_exception(self, e):
"""Helper method to respond with an exception."""
@@ -621,14 +634,16 @@ def named(name):
import_err_strings.append(err.__str__())
toimport = '.'.join(toimport.split('.')[:-1])
if obj is None:
raise ImportError('%s could not be imported. Import errors: %r' % (name, import_err_strings))
raise ImportError(
'%s could not be imported. Import errors: %r' % (name, import_err_strings))
for seg in name.split('.')[1:]:
try:
obj = getattr(obj, seg)
except AttributeError:
dirobj = dir(obj)
dirobj.sort()
raise AttributeError('attribute %r missing from %r (%r) %r. Import errors: %r' % (
raise AttributeError(
'attribute %r missing from %r (%r) %r. Import errors: %r' % (
seg, obj, dirobj, name, import_err_strings))
return obj

View File

@@ -4,10 +4,10 @@ You generally don't have to use it unless you need to call reactor.run()
yourself.
"""
from eventlet.hubs.twistedr import BaseTwistedHub
from eventlet.api import use_hub, _threadlocal
from eventlet import use_hub
from eventlet.support import greenlets as greenlet
from eventlet.hubs import _threadlocal
use_hub(BaseTwistedHub)
assert not hasattr(_threadlocal, 'hub')
hub = _threadlocal.hub = _threadlocal.Hub(greenlet.getcurrent())

View File

@@ -1,12 +1,11 @@
import os
import socket
import errno
import warnings
from eventlet import greenio
def g_log(*args):
warnings.warn("eventlet.util.g_log is deprecated because we're pretty sure no one uses it. Send mail to eventletdev@lists.secondlife.com if you are actually using it.",
warnings.warn("eventlet.util.g_log is deprecated because "
"we're pretty sure no one uses it. "
"Send mail to eventletdev@lists.secondlife.com "
"if you are actually using it.",
DeprecationWarning, stacklevel=2)
import sys
from eventlet.support import greenlets as greenlet
@@ -42,21 +41,22 @@ try:
server_side=server_side, cert_reqs=ssl.CERT_NONE,
ssl_version=ssl.PROTOCOL_SSLv23, ca_certs=None,
do_handshake_on_connect=True,
suppress_ragged_eofs=True)
suppress_ragged_eofs=True)
except ImportError:
# if ssl is not available, use PyOpenSSL
def wrap_ssl(sock, certificate=None, private_key=None, server_side=False):
try:
from eventlet.green.OpenSSL import SSL
except ImportError:
raise ImportError("To use SSL with Eventlet, you must install PyOpenSSL or use Python 2.6 or later.")
raise ImportError("To use SSL with Eventlet, "
"you must install PyOpenSSL or use Python 2.6 or later.")
context = SSL.Context(SSL.SSLv23_METHOD)
if certificate is not None:
context.use_certificate_file(certificate)
if private_key is not None:
context.use_privatekey_file(private_key)
context.set_verify(SSL.VERIFY_NONE, lambda *x: True)
connection = SSL.Connection(context, sock)
if server_side:
connection.set_accept_state()
@@ -65,22 +65,22 @@ except ImportError:
return connection
def wrap_socket_with_coroutine_socket(use_thread_pool=None):
warnings.warn("eventlet.util.wrap_socket_with_coroutine_socket() is now "
warnings.warn("eventlet.util.wrap_socket_with_coroutine_socket() is now "
"eventlet.patcher.monkey_patch(all=False, socket=True)",
DeprecationWarning, stacklevel=2)
from eventlet import patcher
patcher.monkey_patch(all=False, socket=True)
def wrap_pipes_with_coroutine_pipes():
warnings.warn("eventlet.util.wrap_pipes_with_coroutine_pipes() is now "
warnings.warn("eventlet.util.wrap_pipes_with_coroutine_pipes() is now "
"eventlet.patcher.monkey_patch(all=False, os=True)",
DeprecationWarning, stacklevel=2)
from eventlet import patcher
patcher.monkey_patch(all=False, os=True)
def wrap_select_with_coroutine_select():
warnings.warn("eventlet.util.wrap_select_with_coroutine_select() is now "
warnings.warn("eventlet.util.wrap_select_with_coroutine_select() is now "
"eventlet.patcher.monkey_patch(all=False, select=True)",
DeprecationWarning, stacklevel=2)
from eventlet import patcher
@@ -92,7 +92,7 @@ def wrap_threading_local_with_coro_local():
Since greenlets cannot cross threads, so this should be semantically
identical to ``threadlocal.local``
"""
warnings.warn("eventlet.util.wrap_threading_local_with_coro_local() is now "
warnings.warn("eventlet.util.wrap_threading_local_with_coro_local() is now "
"eventlet.patcher.monkey_patch(all=False, thread=True) -- though"
"note that more than just _local is patched now.",
DeprecationWarning, stacklevel=2)
@@ -126,4 +126,3 @@ def set_reuse_addr(descriptor):
descriptor.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) | 1)
except socket.error:
pass

View File

@@ -15,7 +15,8 @@ DEFAULT_MAX_SIMULTANEOUS_REQUESTS = 1024
DEFAULT_MAX_HTTP_VERSION = 'HTTP/1.1'
MAX_REQUEST_LINE = 8192
MINIMUM_CHUNK_SIZE = 4096
DEFAULT_LOG_FORMAT='%(client_ip)s - - [%(date_time)s] "%(request_line)s" %(status_code)s %(body_length)s %(wall_seconds).6f'
DEFAULT_LOG_FORMAT= ('%(client_ip)s - - [%(date_time)s] "%(request_line)s"'
' %(status_code)s %(body_length)s %(wall_seconds).6f')
__all__ = ['server', 'format_date_time']
@@ -32,7 +33,7 @@ def format_date_time(timestamp):
_weekdayname[wd], day, _monthname[month], year, hh, mm, ss
)
# Collections of error codes to compare against. Not all attributes are set
# Collections of error codes to compare against. Not all attributes are set
# on errno module on all platforms, so some are literals :(
BAD_SOCK = set((errno.EBADF, 10053))
BROKEN_SOCK = set((errno.EPIPE, errno.ECONNRESET))
@@ -41,8 +42,8 @@ BROKEN_SOCK = set((errno.EPIPE, errno.ECONNRESET))
ALREADY_HANDLED = object()
def get_errno(err):
""" Simple method to get the error code out of socket.error objects. It
compensates for some cases where the code is not in the expected
""" Simple method to get the error code out of socket.error objects. It
compensates for some cases where the code is not in the expected
location."""
try:
return err[0]
@@ -50,13 +51,13 @@ def get_errno(err):
return None
class Input(object):
def __init__(self,
rfile,
content_length,
wfile=None,
def __init__(self,
rfile,
content_length,
wfile=None,
wfile_line=None,
chunked_input=False):
self.rfile = rfile
if content_length is not None:
content_length = int(content_length)
@@ -136,7 +137,7 @@ class Input(object):
def __iter__(self):
return iter(self.read())
def get_socket(self):
return self.rfile._sock.dup()
@@ -144,7 +145,7 @@ class Input(object):
class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler):
protocol_version = 'HTTP/1.1'
minimum_chunk_size = MINIMUM_CHUNK_SIZE
def setup(self):
# overriding SocketServer.setup to correctly handle SSL.Connection objects
conn = self.connection = self.request
@@ -173,7 +174,8 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler):
self.raw_requestline = self.rfile.readline(MAX_REQUEST_LINE)
if len(self.raw_requestline) == MAX_REQUEST_LINE:
self.wfile.write(
"HTTP/1.0 414 Request URI Too Long\r\nConnection: close\r\nContent-length: 0\r\n\r\n")
"HTTP/1.0 414 Request URI Too Long\r\n"
"Connection: close\r\nContent-length: 0\r\n\r\n")
self.close_connection = 1
return
except greenio.SSL.ZeroReturnError:
@@ -264,7 +266,7 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler):
if self.close_connection:
towrite.append('Connection: close\r\n')
elif send_keep_alive:
towrite.append('Connection: keep-alive\r\n')
towrite.append('Connection: keep-alive\r\n')
towrite.append('\r\n')
# end of header writing
@@ -277,7 +279,8 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler):
_writelines(towrite)
length[0] = length[0] + sum(map(len, towrite))
except UnicodeEncodeError:
print "Encountered unicode while attempting to write wsgi response: ", [x for x in towrite if isinstance(x, unicode)]
print "Encountered unicode while attempting to write wsgi response: ", \
[x for x in towrite if isinstance(x, unicode)]
traceback.print_exc()
_writelines(
["HTTP/1.0 500 Internal Server Error\r\n",
@@ -285,7 +288,8 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler):
"Content-type: text/plain\r\n",
"Content-length: 98\r\n",
"\r\n",
"Internal Server Error: wsgi application passed a unicode object to the server instead of a string."])
("Internal Server Error: wsgi application passed "
"a unicode object to the server instead of a string.")])
def start_response(status, response_headers, exc_info=None):
status_code[0] = status.split()[0]
@@ -298,7 +302,8 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler):
# Avoid dangling circular ref
exc_info = None
capitalized_headers = [('-'.join([x.capitalize() for x in key.split('-')]), value)
capitalized_headers = [('-'.join([x.capitalize()
for x in key.split('-')]), value)
for key, value in response_headers]
headers_set[:] = [status, capitalized_headers]
@@ -329,17 +334,19 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler):
write(''.join(towrite))
if not headers_sent or (use_chunked[0] and just_written_size):
write('')
except Exception, e:
except Exception:
self.close_connection = 1
exc = traceback.format_exc()
print exc
if not headers_set:
start_response("500 Internal Server Error", [('Content-type', 'text/plain')])
start_response("500 Internal Server Error",
[('Content-type', 'text/plain')])
write(exc)
finally:
if hasattr(result, 'close'):
result.close()
if self.environ['eventlet.input'].position < self.environ.get('CONTENT_LENGTH', 0):
if (self.environ['eventlet.input'].position
< self.environ.get('CONTENT_LENGTH', 0)):
## Read and discard body if there was no pending 100-continue
if not self.environ['eventlet.input'].wfile:
while self.environ['eventlet.input'].read(MINIMUM_CHUNK_SIZE):
@@ -353,7 +360,7 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler):
status_code=status_code[0],
body_length=length[0],
wall_seconds=finish - start))
def get_client_ip(self):
client_ip = self.client_address[0]
if self.server.log_x_forwarded_for:
@@ -422,19 +429,19 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler):
class Server(BaseHTTPServer.HTTPServer):
def __init__(self,
socket,
address,
app,
log=None,
environ=None,
max_http_version=None,
protocol=HttpProtocol,
def __init__(self,
socket,
address,
app,
log=None,
environ=None,
max_http_version=None,
protocol=HttpProtocol,
minimum_chunk_size=None,
log_x_forwarded_for=True,
keepalive=True,
log_format=DEFAULT_LOG_FORMAT):
self.outstanding_requests = 0
self.socket = socket
self.address = address
@@ -454,7 +461,6 @@ class Server(BaseHTTPServer.HTTPServer):
self.log_format = log_format
def get_environ(self):
socket = self.socket
d = {
'wsgi.errors': sys.stderr,
'wsgi.version': (1, 0),
@@ -477,29 +483,29 @@ class Server(BaseHTTPServer.HTTPServer):
try:
import ssl
ACCEPT_EXCEPTIONS = (socket.error, ssl.SSLError)
ACCEPT_ERRNO = set((errno.EPIPE, errno.EBADF,
ACCEPT_ERRNO = set((errno.EPIPE, errno.EBADF,
ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_SSL))
except ImportError:
ACCEPT_EXCEPTIONS = (socket.error,)
ACCEPT_ERRNO = set((errno.EPIPE, errno.EBADF))
def server(sock, site,
log=None,
environ=None,
def server(sock, site,
log=None,
environ=None,
max_size=None,
max_http_version=DEFAULT_MAX_HTTP_VERSION,
max_http_version=DEFAULT_MAX_HTTP_VERSION,
protocol=HttpProtocol,
server_event=None,
server_event=None,
minimum_chunk_size=None,
log_x_forwarded_for=True,
custom_pool=None,
keepalive=True,
log_format=DEFAULT_LOG_FORMAT):
""" Start up a wsgi server handling requests from the supplied server
""" Start up a wsgi server handling requests from the supplied server
socket. This function loops forever. The *sock* object will be closed after server exits,
but the underlying file descriptor will remain open, so if you have a dup() of *sock*,
it will remain usable.
:param sock: Server socket, must be already bound to a port and listening.
:param site: WSGI application function.
:param log: File-like object that logs should be written to. If not specified, sys.stderr is used.
@@ -514,11 +520,11 @@ def server(sock, site,
:param keepalive: If set to False, disables keepalives on the server; all connections will be closed after serving one request.
:param log_format: A python format string that is used as the template to generate log lines. The following values can be formatted into it: client_ip, date_time, request_line, status_code, body_length, wall_seconds. Look the default for an example of how to use this.
"""
serv = Server(sock, sock.getsockname(),
site, log,
environ=None,
max_http_version=max_http_version,
protocol=protocol,
serv = Server(sock, sock.getsockname(),
site, log,
environ=None,
max_http_version=max_http_version,
protocol=protocol,
minimum_chunk_size=minimum_chunk_size,
log_x_forwarded_for=log_x_forwarded_for,
keepalive=keepalive,
@@ -543,12 +549,13 @@ def server(sock, site,
if port == ':80':
port = ''
serv.log.write("(%s) wsgi starting up on %s://%s%s/\n" % (os.getpid(), scheme, host, port))
serv.log.write("(%s) wsgi starting up on %s://%s%s/\n" % (
os.getpid(), scheme, host, port))
while True:
try:
client_socket = sock.accept()
try:
pool.spawn_n(serv.process_request, client_socket)
pool.spawn_n(serv.process_request, client_socket)
except AttributeError:
warnings.warn("wsgi's pool should be an instance of " \
"eventlet.greenpool.GreenPool, is %s. Please convert your"\
@@ -572,4 +579,3 @@ def server(sock, site,
except socket.error, e:
if get_errno(e) not in BROKEN_SOCK:
traceback.print_exc()