python3 compat fixes

https://github.com/eventlet/eventlet/pull/59
This commit is contained in:
Victor Sergeyev
2013-11-25 18:21:40 +02:00
committed by Sergey Shepelev
parent cfdd9a922c
commit 2b2f0a96b4
35 changed files with 327 additions and 279 deletions

View File

@@ -35,6 +35,8 @@ Contributors
* Floris Bruynooghe * Floris Bruynooghe
* Paul Oppenheim * Paul Oppenheim
* Jakub Stasiak * Jakub Stasiak
* Aldona Majorek
* Victor Sergeyev
Linden Lab Contributors Linden Lab Contributors
----------------------- -----------------------

View File

@@ -9,7 +9,7 @@ def measure_best(repeat, iters,
funcs = list(funcs) funcs = list(funcs)
results = dict([(f,[]) for f in funcs]) results = dict([(f,[]) for f in funcs])
for i in xrange(repeat): for i in six.moves.range(repeat):
random.shuffle(funcs) random.shuffle(funcs)
for func in funcs: for func in funcs:
gc.collect() gc.collect()

View File

@@ -7,7 +7,10 @@ import sys
import eventlet import eventlet
import random import random
import time import time
from eventlet.hubs import timer, get_hub from eventlet.hubs import timer, get_hub
from eventlet.support import six
timer_count = 100000 timer_count = 100000
@@ -19,7 +22,7 @@ l = []
def work(n): def work(n):
l.append(n) l.append(n)
timeouts = [random.uniform(0, 10) for x in xrange(timer_count)] timeouts = [random.uniform(0, 10) for x in six.moves.range(timer_count)]
hub = get_hub() hub = get_hub()

View File

@@ -4,6 +4,7 @@ from __future__ import print_function
import time import time
import benchmarks import benchmarks
from eventlet.support import six
BYTES=1000 BYTES=1000
@@ -30,13 +31,13 @@ def writer(addr, socket_impl):
def green_accepter(server_sock, pool): def green_accepter(server_sock, pool):
for i in xrange(CONCURRENCY): for i in six.moves.range(CONCURRENCY):
sock, addr = server_sock.accept() sock, addr = server_sock.accept()
pool.spawn_n(reader, sock) pool.spawn_n(reader, sock)
def heavy_accepter(server_sock, pool): def heavy_accepter(server_sock, pool):
for i in xrange(CONCURRENCY): for i in six.moves.range(CONCURRENCY):
sock, addr = server_sock.accept() sock, addr = server_sock.accept()
t = threading.Thread(None, reader, "reader thread", (sock,)) t = threading.Thread(None, reader, "reader thread", (sock,))
t.start() t.start()
@@ -57,7 +58,7 @@ def launch_green_threads():
server_sock.listen(50) server_sock.listen(50)
addr = ('localhost', server_sock.getsockname()[1]) addr = ('localhost', server_sock.getsockname()[1])
pool.spawn_n(green_accepter, server_sock, pool) pool.spawn_n(green_accepter, server_sock, pool)
for i in xrange(CONCURRENCY): for i in six.moves.range(CONCURRENCY):
pool.spawn_n(writer, addr, eventlet.green.socket.socket) pool.spawn_n(writer, addr, eventlet.green.socket.socket)
pool.waitall() pool.waitall()
@@ -75,7 +76,7 @@ def launch_heavy_threads():
accepter_thread = threading.Thread(None, heavy_accepter, "accepter thread", (server_sock, threads)) accepter_thread = threading.Thread(None, heavy_accepter, "accepter thread", (server_sock, threads))
accepter_thread.start() accepter_thread.start()
threads.append(accepter_thread) threads.append(accepter_thread)
for i in xrange(CONCURRENCY): for i in six.moves.range(CONCURRENCY):
client_thread = threading.Thread(None, writer, "writer thread", (addr, socket.socket)) client_thread = threading.Thread(None, writer, "writer thread", (addr, socket.socket))
client_thread.start() client_thread.start()
threads.append(client_thread) threads.append(client_thread)

View File

@@ -54,7 +54,7 @@
timeout = Timeout(1) timeout = Timeout(1)
try: try:
... ...
except Timeout, t: except Timeout as t:
if t is not timeout: if t is not timeout:
raise # not my timeout raise # not my timeout

View File

@@ -95,12 +95,13 @@ def backdoor_server(sock, locals=None):
sock.close() sock.close()
def backdoor((conn, addr), locals=None): def backdoor(conn_info, locals=None):
"""Sets up an interactive console on a socket with a single connected """Sets up an interactive console on a socket with a single connected
client. This does not block the caller, as it spawns a new greenlet to client. This does not block the caller, as it spawns a new greenlet to
handle the console. This is meant to be called from within an accept loop handle the console. This is meant to be called from within an accept loop
(such as backdoor_server). (such as backdoor_server).
""" """
conn, addr = conn_info
host, port = addr host, port = addr
print("backdoor to %s:%s" % (host, port)) print("backdoor to %s:%s" % (host, port))
fl = conn.makefile("rw") fl = conn.makefile("rw")

View File

@@ -2,9 +2,12 @@ __select = __import__('select')
error = __select.error error = __select.error
from eventlet.greenthread import getcurrent from eventlet.greenthread import getcurrent
from eventlet.hubs import get_hub from eventlet.hubs import get_hub
from eventlet.support import six
__patched__ = ['select'] __patched__ = ['select']
def get_fileno(obj): def get_fileno(obj):
# The purpose of this function is to exactly replicate # The purpose of this function is to exactly replicate
# the behavior of the select module when confronted with # the behavior of the select module when confronted with
@@ -13,15 +16,16 @@ def get_fileno(obj):
try: try:
f = obj.fileno f = obj.fileno
except AttributeError: except AttributeError:
if not isinstance(obj, (int, long)): if not isinstance(obj, six.integer_types):
raise TypeError("Expected int or long, got " + type(obj)) raise TypeError("Expected int or long, got " + type(obj))
return obj return obj
else: else:
rv = f() rv = f()
if not isinstance(rv, (int, long)): if not isinstance(rv, six.integer_types):
raise TypeError("Expected int or long, got " + type(rv)) raise TypeError("Expected int or long, got " + type(rv))
return rv return rv
def select(read_list, write_list, error_list, timeout=None): def select(read_list, write_list, error_list, timeout=None):
# error checking like this is required by the stdlib unit tests # error checking like this is required by the stdlib unit tests
if timeout is not None: if timeout is not None:

View File

@@ -1,4 +1,4 @@
from eventlet.support import get_errno from eventlet.support import get_errno, six
from eventlet.hubs import trampoline from eventlet.hubs import trampoline
BUFFER_SIZE = 4096 BUFFER_SIZE = 4096
@@ -18,13 +18,11 @@ CONNECT_SUCCESS = set((0, errno.EISCONN))
if sys.platform[:3] == "win": if sys.platform[:3] == "win":
CONNECT_ERR.add(errno.WSAEINVAL) # Bug 67 CONNECT_ERR.add(errno.WSAEINVAL) # Bug 67
# Emulate _fileobject class in 3.x implementation if six.PY3:
# Eventually this internal socket structure could be replaced with makefile calls. from io import IOBase as file
try: _fileobject = socket.SocketIO
elif six.PY2:
_fileobject = socket._fileobject _fileobject = socket._fileobject
except AttributeError:
def _fileobject(sock, *args, **kwargs):
return _original_socket.makefile(sock, *args, **kwargs)
def socket_connect(descriptor, address): def socket_connect(descriptor, address):
@@ -123,7 +121,7 @@ class GreenSocket(object):
""" """
def __init__(self, family_or_realsock=socket.AF_INET, *args, **kwargs): def __init__(self, family_or_realsock=socket.AF_INET, *args, **kwargs):
should_set_nonblocking = kwargs.pop('set_nonblocking', True) should_set_nonblocking = kwargs.pop('set_nonblocking', True)
if isinstance(family_or_realsock, (int, long)): if isinstance(family_or_realsock, six.integer_types):
fd = _original_socket(family_or_realsock, *args, **kwargs) fd = _original_socket(family_or_realsock, *args, **kwargs)
else: else:
fd = family_or_realsock fd = family_or_realsock
@@ -427,10 +425,10 @@ class GreenPipe(_fileobject):
- file argument can be descriptor, file name or file object. - file argument can be descriptor, file name or file object.
""" """
def __init__(self, f, mode='r', bufsize=-1): def __init__(self, f, mode='r', bufsize=-1):
if not isinstance(f, (basestring, int, file)): if not isinstance(f, six.string_types + (int, file)):
raise TypeError('f(ile) should be int, str, unicode or file, not %r' % f) raise TypeError('f(ile) should be int, str, unicode or file, not %r' % f)
if isinstance(f, basestring): if isinstance(f, six.string_types):
f = open(f, mode, 0) f = open(f, mode, 0)
if isinstance(f, int): if isinstance(f, int):
@@ -467,12 +465,11 @@ class GreenPipe(_fileobject):
'write', 'xreadlines', '__iter__', 'writelines']: 'write', 'xreadlines', '__iter__', 'writelines']:
setattr(self, method, _operationOnClosedFile) setattr(self, method, _operationOnClosedFile)
if getattr(file, '__enter__', None): def __enter__(self):
def __enter__(self): return self
return self
def __exit__(self, *args): def __exit__(self, *args):
self.close() self.close()
def readinto(self, buf): def readinto(self, buf):
data = self.read(len(buf)) # FIXME could it be done without allocating intermediate? data = self.read(len(buf)) # FIXME could it be done without allocating intermediate?

View File

@@ -5,10 +5,10 @@ from eventlet import event
from eventlet import hubs from eventlet import hubs
from eventlet import timeout from eventlet import timeout
from eventlet.hubs import timer from eventlet.hubs import timer
from eventlet.support import greenlets as greenlet from eventlet.support import greenlets as greenlet, six
import warnings import warnings
__all__ = ['getcurrent', 'sleep', 'spawn', 'spawn_n', 'spawn_after', 'spawn_after_local', 'GreenThread'] __all__ = ['getcurrent', 'sleep', 'spawn', 'spawn_n', 'spawn_after', 'spawn_after_local', 'GreenThread']
getcurrent = greenlet.getcurrent getcurrent = greenlet.getcurrent
@@ -31,74 +31,74 @@ def sleep(seconds=0):
hub.switch() hub.switch()
finally: finally:
timer.cancel() timer.cancel()
def spawn(func, *args, **kwargs): def spawn(func, *args, **kwargs):
"""Create a greenthread to run ``func(*args, **kwargs)``. Returns a """Create a greenthread to run ``func(*args, **kwargs)``. Returns a
:class:`GreenThread` object which you can use to get the results of the :class:`GreenThread` object which you can use to get the results of the
call. call.
Execution control returns immediately to the caller; the created greenthread Execution control returns immediately to the caller; the created greenthread
is merely scheduled to be run at the next available opportunity. is merely scheduled to be run at the next available opportunity.
Use :func:`spawn_after` to arrange for greenthreads to be spawned Use :func:`spawn_after` to arrange for greenthreads to be spawned
after a finite delay. after a finite delay.
""" """
hub = hubs.get_hub() hub = hubs.get_hub()
g = GreenThread(hub.greenlet) g = GreenThread(hub.greenlet)
hub.schedule_call_global(0, g.switch, func, args, kwargs) hub.schedule_call_global(0, g.switch, func, args, kwargs)
return g return g
def spawn_n(func, *args, **kwargs): def spawn_n(func, *args, **kwargs):
"""Same as :func:`spawn`, but returns a ``greenlet`` object from """Same as :func:`spawn`, but returns a ``greenlet`` object from
which it is not possible to retrieve either a return value or which it is not possible to retrieve either a return value or
whether it raised any exceptions. This is faster than whether it raised any exceptions. This is faster than
:func:`spawn`; it is fastest if there are no keyword arguments. :func:`spawn`; it is fastest if there are no keyword arguments.
If an exception is raised in the function, spawn_n prints a stack If an exception is raised in the function, spawn_n prints a stack
trace; the print can be disabled by calling trace; the print can be disabled by calling
:func:`eventlet.debug.hub_exceptions` with False. :func:`eventlet.debug.hub_exceptions` with False.
""" """
return _spawn_n(0, func, args, kwargs)[1] return _spawn_n(0, func, args, kwargs)[1]
def spawn_after(seconds, func, *args, **kwargs): def spawn_after(seconds, func, *args, **kwargs):
"""Spawns *func* after *seconds* have elapsed. It runs as scheduled even if """Spawns *func* after *seconds* have elapsed. It runs as scheduled even if
the current greenthread has completed. the current greenthread has completed.
*seconds* may be specified as an integer, or a float if fractional seconds *seconds* may be specified as an integer, or a float if fractional seconds
are desired. The *func* will be called with the given *args* and are desired. The *func* will be called with the given *args* and
keyword arguments *kwargs*, and will be executed within its own greenthread. keyword arguments *kwargs*, and will be executed within its own greenthread.
The return value of :func:`spawn_after` is a :class:`GreenThread` object, The return value of :func:`spawn_after` is a :class:`GreenThread` object,
which can be used to retrieve the results of the call. which can be used to retrieve the results of the call.
To cancel the spawn and prevent *func* from being called, To cancel the spawn and prevent *func* from being called,
call :meth:`GreenThread.cancel` on the return value of :func:`spawn_after`. call :meth:`GreenThread.cancel` on the return value of :func:`spawn_after`.
This will not abort the function if it's already started running, which is This will not abort the function if it's already started running, which is
generally the desired behavior. If terminating *func* regardless of whether generally the desired behavior. If terminating *func* regardless of whether
it's started or not is the desired behavior, call :meth:`GreenThread.kill`. it's started or not is the desired behavior, call :meth:`GreenThread.kill`.
""" """
hub = hubs.get_hub() hub = hubs.get_hub()
g = GreenThread(hub.greenlet) g = GreenThread(hub.greenlet)
hub.schedule_call_global(seconds, g.switch, func, args, kwargs) hub.schedule_call_global(seconds, g.switch, func, args, kwargs)
return g return g
def spawn_after_local(seconds, func, *args, **kwargs): def spawn_after_local(seconds, func, *args, **kwargs):
"""Spawns *func* after *seconds* have elapsed. The function will NOT be """Spawns *func* after *seconds* have elapsed. The function will NOT be
called if the current greenthread has exited. called if the current greenthread has exited.
*seconds* may be specified as an integer, or a float if fractional seconds *seconds* may be specified as an integer, or a float if fractional seconds
are desired. The *func* will be called with the given *args* and are desired. The *func* will be called with the given *args* and
keyword arguments *kwargs*, and will be executed within its own greenthread. keyword arguments *kwargs*, and will be executed within its own greenthread.
The return value of :func:`spawn_after` is a :class:`GreenThread` object, The return value of :func:`spawn_after` is a :class:`GreenThread` object,
which can be used to retrieve the results of the call. which can be used to retrieve the results of the call.
To cancel the spawn and prevent *func* from being called, To cancel the spawn and prevent *func* from being called,
call :meth:`GreenThread.cancel` on the return value. This will not abort the call :meth:`GreenThread.cancel` on the return value. This will not abort the
function if it's already started running. If terminating *func* regardless function if it's already started running. If terminating *func* regardless
of whether it's started or not is the desired behavior, call of whether it's started or not is the desired behavior, call
:meth:`GreenThread.kill`. :meth:`GreenThread.kill`.
""" """
@@ -106,7 +106,7 @@ def spawn_after_local(seconds, func, *args, **kwargs):
g = GreenThread(hub.greenlet) g = GreenThread(hub.greenlet)
hub.schedule_call_local(seconds, g.switch, func, args, kwargs) hub.schedule_call_local(seconds, g.switch, func, args, kwargs)
return g return g
def call_after_global(seconds, func, *args, **kwargs): def call_after_global(seconds, func, *args, **kwargs):
warnings.warn("call_after_global is renamed to spawn_after, which" warnings.warn("call_after_global is renamed to spawn_after, which"
@@ -114,7 +114,7 @@ def call_after_global(seconds, func, *args, **kwargs):
" quick search-and-replace on your codebase, thanks!", " quick search-and-replace on your codebase, thanks!",
DeprecationWarning, stacklevel=2) DeprecationWarning, stacklevel=2)
return _spawn_n(seconds, func, args, kwargs)[0] return _spawn_n(seconds, func, args, kwargs)[0]
def call_after_local(seconds, function, *args, **kwargs): def call_after_local(seconds, function, *args, **kwargs):
warnings.warn("call_after_local is renamed to spawn_after_local, which" warnings.warn("call_after_local is renamed to spawn_after_local, which"
@@ -151,7 +151,7 @@ def _spawn_n(seconds, func, args, kwargs):
class GreenThread(greenlet.greenlet): class GreenThread(greenlet.greenlet):
"""The GreenThread class is a type of Greenlet which has the additional """The GreenThread class is a type of Greenlet which has the additional
property of being able to retrieve the return value of the main function. property of being able to retrieve the return value of the main function.
Do not construct GreenThread objects directly; call :func:`spawn` to get one. Do not construct GreenThread objects directly; call :func:`spawn` to get one.
""" """
def __init__(self, parent): def __init__(self, parent):
@@ -160,35 +160,35 @@ class GreenThread(greenlet.greenlet):
self._resolving_links = False self._resolving_links = False
def wait(self): def wait(self):
""" Returns the result of the main function of this GreenThread. If the """ Returns the result of the main function of this GreenThread. If the
result is a normal return value, :meth:`wait` returns it. If it raised result is a normal return value, :meth:`wait` returns it. If it raised
an exception, :meth:`wait` will raise the same exception (though the an exception, :meth:`wait` will raise the same exception (though the
stack trace will unavoidably contain some frames from within the stack trace will unavoidably contain some frames from within the
greenthread module).""" greenthread module)."""
return self._exit_event.wait() return self._exit_event.wait()
def link(self, func, *curried_args, **curried_kwargs): def link(self, func, *curried_args, **curried_kwargs):
""" Set up a function to be called with the results of the GreenThread. """ Set up a function to be called with the results of the GreenThread.
The function must have the following signature:: The function must have the following signature::
def func(gt, [curried args/kwargs]): def func(gt, [curried args/kwargs]):
When the GreenThread finishes its run, it calls *func* with itself When the GreenThread finishes its run, it calls *func* with itself
and with the `curried arguments <http://en.wikipedia.org/wiki/Currying>`_ supplied at link-time. If the function wants and with the `curried arguments <http://en.wikipedia.org/wiki/Currying>`_ supplied at link-time. If the function wants
to retrieve the result of the GreenThread, it should call wait() to retrieve the result of the GreenThread, it should call wait()
on its first argument. on its first argument.
Note that *func* is called within execution context of Note that *func* is called within execution context of
the GreenThread, so it is possible to interfere with other linked the GreenThread, so it is possible to interfere with other linked
functions by doing things like switching explicitly to another functions by doing things like switching explicitly to another
greenthread. greenthread.
""" """
self._exit_funcs = getattr(self, '_exit_funcs', deque()) self._exit_funcs = getattr(self, '_exit_funcs', deque())
self._exit_funcs.append((func, curried_args, curried_kwargs)) self._exit_funcs.append((func, curried_args, curried_kwargs))
if self._exit_event.ready(): if self._exit_event.ready():
self._resolve_links() self._resolve_links()
def main(self, function, args, kwargs): def main(self, function, args, kwargs):
try: try:
result = function(*args, **kwargs) result = function(*args, **kwargs)
@@ -199,7 +199,7 @@ class GreenThread(greenlet.greenlet):
else: else:
self._exit_event.send(result) self._exit_event.send(result)
self._resolve_links() self._resolve_links()
def _resolve_links(self): def _resolve_links(self):
# ca and ckw are the curried function arguments # ca and ckw are the curried function arguments
if self._resolving_links: if self._resolving_links:
@@ -212,23 +212,23 @@ class GreenThread(greenlet.greenlet):
f(self, *ca, **ckw) f(self, *ca, **ckw)
finally: finally:
self._resolving_links = False self._resolving_links = False
def kill(self, *throw_args): def kill(self, *throw_args):
"""Kills the greenthread using :func:`kill`. After being killed """Kills the greenthread using :func:`kill`. After being killed
all calls to :meth:`wait` will raise *throw_args* (which default all calls to :meth:`wait` will raise *throw_args* (which default
to :class:`greenlet.GreenletExit`).""" to :class:`greenlet.GreenletExit`)."""
return kill(self, *throw_args) return kill(self, *throw_args)
def cancel(self, *throw_args): def cancel(self, *throw_args):
"""Kills the greenthread using :func:`kill`, but only if it hasn't """Kills the greenthread using :func:`kill`, but only if it hasn't
already started running. After being canceled, already started running. After being canceled,
all calls to :meth:`wait` will raise *throw_args* (which default all calls to :meth:`wait` will raise *throw_args* (which default
to :class:`greenlet.GreenletExit`).""" to :class:`greenlet.GreenletExit`)."""
return cancel(self, *throw_args) return cancel(self, *throw_args)
def cancel(g, *throw_args): def cancel(g, *throw_args):
"""Like :func:`kill`, but only terminates the greenthread if it hasn't """Like :func:`kill`, but only terminates the greenthread if it hasn't
already started execution. If the grenthread has already started already started execution. If the grenthread has already started
execution, :func:`cancel` has no effect.""" execution, :func:`cancel` has no effect."""
if not g: if not g:
kill(g, *throw_args) kill(g, *throw_args)
@@ -237,11 +237,11 @@ def kill(g, *throw_args):
"""Terminates the target greenthread by raising an exception into it. """Terminates the target greenthread by raising an exception into it.
Whatever that greenthread might be doing; be it waiting for I/O or another Whatever that greenthread might be doing; be it waiting for I/O or another
primitive, it sees an exception right away. primitive, it sees an exception right away.
By default, this exception is GreenletExit, but a specific exception By default, this exception is GreenletExit, but a specific exception
may be specified. *throw_args* should be the same as the arguments to may be specified. *throw_args* should be the same as the arguments to
raise; either an exception instance or an exc_info tuple. raise; either an exception instance or an exc_info tuple.
Calling :func:`kill` causes the calling greenthread to cooperatively yield. Calling :func:`kill` causes the calling greenthread to cooperatively yield.
""" """
if g.dead: if g.dead:
@@ -253,7 +253,7 @@ def kill(g, *throw_args):
# method never got called # method never got called
def just_raise(*a, **kw): def just_raise(*a, **kw):
if throw_args: if throw_args:
raise throw_args[0], throw_args[1], throw_args[2] six.reraise(throw_args[0], throw_args[1], throw_args[2])
else: else:
raise greenlet.GreenletExit() raise greenlet.GreenletExit()
g.run = just_raise g.run = just_raise

View File

@@ -3,7 +3,7 @@ import traceback
import event import event
import types import types
from eventlet.support import greenlets as greenlet from eventlet.support import greenlets as greenlet, six
from eventlet.hubs.hub import BaseHub, FdListener, READ, WRITE from eventlet.hubs.hub import BaseHub, FdListener, READ, WRITE
@@ -63,7 +63,7 @@ class Hub(BaseHub):
t = getattr(event, '__event_exc') t = getattr(event, '__event_exc')
setattr(event, '__event_exc', None) setattr(event, '__event_exc', None)
assert getattr(event, '__event_exc') is None assert getattr(event, '__event_exc') is None
raise t[0], t[1], t[2] six.reraise(t[0], t[1], t[2])
if result != 0: if result != 0:
return result return result

View File

@@ -1,10 +1,14 @@
import sys import sys
import imp import imp
from eventlet.support import six
__all__ = ['inject', 'import_patched', 'monkey_patch', 'is_monkey_patched'] __all__ = ['inject', 'import_patched', 'monkey_patch', 'is_monkey_patched']
__exclude = set(('__builtins__', '__file__', '__name__')) __exclude = set(('__builtins__', '__file__', '__name__'))
class SysModulesSaver(object): class SysModulesSaver(object):
"""Class that captures some subset of the current state of """Class that captures some subset of the current state of
sys.modules. Pass in an iterator of module names to the sys.modules. Pass in an iterator of module names to the
@@ -24,7 +28,7 @@ class SysModulesSaver(object):
sys.modules. sys.modules.
""" """
try: try:
for modname, mod in self._saved.iteritems(): for modname, mod in six.iteritems(self._saved):
if mod is not None: if mod is not None:
sys.modules[modname] = mod sys.modules[modname] = mod
else: else:
@@ -34,7 +38,7 @@ class SysModulesSaver(object):
pass pass
finally: finally:
imp.release_lock() imp.release_lock()
def inject(module_name, new_globals, *additional_modules): def inject(module_name, new_globals, *additional_modules):
"""Base method for "injecting" greened modules into an imported module. It """Base method for "injecting" greened modules into an imported module. It
@@ -64,9 +68,9 @@ def inject(module_name, new_globals, *additional_modules):
_green_select_modules() + _green_select_modules() +
_green_socket_modules() + _green_socket_modules() +
_green_thread_modules() + _green_thread_modules() +
_green_time_modules()) _green_time_modules())
#_green_MySQLdb()) # enable this after a short baking-in period #_green_MySQLdb()) # enable this after a short baking-in period
# after this we are gonna screw with sys.modules, so capture the # after this we are gonna screw with sys.modules, so capture the
# state of all the modules we're going to mess with, and lock # state of all the modules we're going to mess with, and lock
saver = SysModulesSaver([name for name, m in additional_modules]) saver = SysModulesSaver([name for name, m in additional_modules])
@@ -122,7 +126,7 @@ def patch_function(func, *additional_modules):
_green_os_modules() + _green_os_modules() +
_green_select_modules() + _green_select_modules() +
_green_socket_modules() + _green_socket_modules() +
_green_thread_modules() + _green_thread_modules() +
_green_time_modules()) _green_time_modules())
def patched(*args, **kw): def patched(*args, **kw):
@@ -155,7 +159,7 @@ def _original_patch_function(func, *module_names):
def original(modname): def original(modname):
""" This returns an unpatched version of a module; this is useful for """ This returns an unpatched version of a module; this is useful for
Eventlet itself (i.e. tpool).""" Eventlet itself (i.e. tpool)."""
# note that it's not necessary to temporarily install unpatched # note that it's not necessary to temporarily install unpatched
# versions of all patchable modules during the import of the # versions of all patchable modules during the import of the
@@ -172,19 +176,22 @@ def original(modname):
# some rudimentary dependency checking -- fortunately the modules # some rudimentary dependency checking -- fortunately the modules
# we're working on don't have many dependencies so we can just do # we're working on don't have many dependencies so we can just do
# some special-casing here # some special-casing here
deps = {'threading':'thread', 'Queue':'threading'} if six.PY2:
deps = {'threading': 'thread', 'Queue': 'threading'}
if six.PY3:
deps = {'threading': '_thread', 'queue': 'threading'}
if modname in deps: if modname in deps:
dependency = deps[modname] dependency = deps[modname]
saver.save(dependency) saver.save(dependency)
sys.modules[dependency] = original(dependency) sys.modules[dependency] = original(dependency)
try: try:
real_mod = __import__(modname, {}, {}, modname.split('.')[:-1]) real_mod = __import__(modname, {}, {}, modname.split('.')[:-1])
if modname == 'Queue' and not hasattr(real_mod, '_threading'): if modname in ('Queue', 'queue') and not hasattr(real_mod, '_threading'):
# tricky hack: Queue's constructor in <2.7 imports # tricky hack: Queue's constructor in <2.7 imports
# threading on every instantiation; therefore we wrap # threading on every instantiation; therefore we wrap
# it so that it always gets the original threading # it so that it always gets the original threading
real_mod.Queue.__init__ = _original_patch_function( real_mod.Queue.__init__ = _original_patch_function(
real_mod.Queue.__init__, real_mod.Queue.__init__,
'threading') 'threading')
# save a reference to the unpatched module so it doesn't get lost # save a reference to the unpatched module so it doesn't get lost
sys.modules[original_name] = real_mod sys.modules[original_name] = real_mod
@@ -200,14 +207,14 @@ def monkey_patch(**on):
The keyword arguments afford some control over which modules are patched. The keyword arguments afford some control over which modules are patched.
If no keyword arguments are supplied, all possible modules are patched. If no keyword arguments are supplied, all possible modules are patched.
If keywords are set to True, only the specified modules are patched. E.g., If keywords are set to True, only the specified modules are patched. E.g.,
``monkey_patch(socket=True, select=True)`` patches only the select and ``monkey_patch(socket=True, select=True)`` patches only the select and
socket modules. Most arguments patch the single module of the same name socket modules. Most arguments patch the single module of the same name
(os, time, select). The exceptions are socket, which also patches the ssl (os, time, select). The exceptions are socket, which also patches the ssl
module if present; and thread, which patches thread, threading, and Queue. module if present; and thread, which patches thread, threading, and Queue.
It's safe to call monkey_patch multiple times. It's safe to call monkey_patch multiple times.
""" """
accepted_args = set(('os', 'select', 'socket', accepted_args = set(('os', 'select', 'socket',
'thread', 'time', 'psycopg', 'MySQLdb')) 'thread', 'time', 'psycopg', 'MySQLdb'))
default_on = on.pop("all",None) default_on = on.pop("all",None)
for k in on.iterkeys(): for k in on.iterkeys():
@@ -221,7 +228,7 @@ def monkey_patch(**on):
# MySQLdb is only on when explicitly patched for the moment # MySQLdb is only on when explicitly patched for the moment
on.setdefault(modname, False) on.setdefault(modname, False)
on.setdefault(modname, default_on) on.setdefault(modname, default_on)
modules_to_patch = [] modules_to_patch = []
if on['os'] and not already_patched.get('os'): if on['os'] and not already_patched.get('os'):
modules_to_patch += _green_os_modules() modules_to_patch += _green_os_modules()
@@ -298,7 +305,10 @@ def _green_thread_modules():
from eventlet.green import Queue from eventlet.green import Queue
from eventlet.green import thread from eventlet.green import thread
from eventlet.green import threading from eventlet.green import threading
return [('Queue', Queue), ('thread', thread), ('threading', threading)] if six.PY2:
return [('Queue', Queue), ('thread', thread), ('threading', threading)]
if six.PY3:
return [('queue', Queue), ('_thread', thread), ('threading', threading)]
def _green_time_modules(): def _green_time_modules():
from eventlet.green import time from eventlet.green import time
@@ -315,9 +325,9 @@ def _green_MySQLdb():
def slurp_properties(source, destination, ignore=[], srckeys=None): def slurp_properties(source, destination, ignore=[], srckeys=None):
"""Copy properties from *source* (assumed to be a module) to """Copy properties from *source* (assumed to be a module) to
*destination* (assumed to be a dict). *destination* (assumed to be a dict).
*ignore* lists properties that should not be thusly copied. *ignore* lists properties that should not be thusly copied.
*srckeys* is a list of keys to copy, if the source's __all__ is *srckeys* is a list of keys to copy, if the source's __all__ is
untrustworthy. untrustworthy.
""" """
if srckeys is None: if srckeys is None:
@@ -325,7 +335,7 @@ def slurp_properties(source, destination, ignore=[], srckeys=None):
destination.update(dict([(name, getattr(source, name)) destination.update(dict([(name, getattr(source, name))
for name in srckeys for name in srckeys
if not ( if not (
name.startswith('__') or name.startswith('__') or
name in ignore) name in ignore)
])) ]))

View File

@@ -190,7 +190,7 @@ class Pool(object):
>>> from eventlet import coros >>> from eventlet import coros
>>> import string >>> import string
>>> pool = coros.CoroutinePool(max_size=5) >>> pool = coros.CoroutinePool(max_size=5)
>>> pausers = [coros.Event() for x in xrange(2)] >>> pausers = [coros.Event() for x in range(2)]
>>> def longtask(evt, desc): >>> def longtask(evt, desc):
... print("%s woke up with %s" % (desc, evt.wait())) ... print("%s woke up with %s" % (desc, evt.wait()))
... ...
@@ -229,14 +229,14 @@ class Pool(object):
returning h returning h
returning i returning i
g g
>>> print("".join([step.next() for x in xrange(3)])) >>> print("".join([step.next() for x in range(3)]))
returning j returning j
returning k returning k
returning l returning l
returning m returning m
hij hij
>>> pausers[1].send("B") >>> pausers[1].send("B")
>>> print("".join([step.next() for x in xrange(4)])) >>> print("".join([step.next() for x in range(4)]))
B woke up with B B woke up with B
returning n returning n
returning o returning o

View File

@@ -71,7 +71,7 @@ class Pool(object):
if create is not None: if create is not None:
self.create = create self.create = create
for x in xrange(min_size): for x in range(min_size):
self.current_size += 1 self.current_size += 1
self.free_items.append(self.create()) self.free_items.append(self.create())

View File

@@ -1,5 +1,5 @@
import warnings import warnings
warnings.warn("The proc module is deprecated! Please use the greenthread " warnings.warn("The proc module is deprecated! Please use the greenthread "
"module, or any of the many other Eventlet cross-coroutine " "module, or any of the many other Eventlet cross-coroutine "
"primitives, instead.", "primitives, instead.",
DeprecationWarning, stacklevel=2) DeprecationWarning, stacklevel=2)
@@ -630,7 +630,7 @@ class wrap_errors(object):
def func1(*args, **kwargs): def func1(*args, **kwargs):
try: try:
return func(*args, **kwargs) return func(*args, **kwargs)
except (A, B, C), ex: except (A, B, C) as ex:
return ex return ex
wrap_errors provides a shortcut to write that in one line: wrap_errors provides a shortcut to write that in one line:

View File

@@ -45,17 +45,21 @@ import sys
import heapq import heapq
import collections import collections
import traceback import traceback
from Queue import Full, Empty
_NONE = object()
from eventlet.hubs import get_hub
from eventlet.greenthread import getcurrent
from eventlet.event import Event from eventlet.event import Event
from eventlet.greenthread import getcurrent
from eventlet.hubs import get_hub
from eventlet.support import six
from eventlet.timeout import Timeout from eventlet.timeout import Timeout
__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'LightQueue', 'Full', 'Empty'] __all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'LightQueue', 'Full', 'Empty']
_NONE = object()
Full = six.moves.queue.Full
Empty = six.moves.queue.Empty
class Waiter(object): class Waiter(object):
"""A low level synchronization class. """A low level synchronization class.
@@ -451,5 +455,3 @@ class LifoQueue(Queue):
def _get(self): def _get(self):
return self.queue.pop() return self.queue.pop()

View File

@@ -28,7 +28,7 @@ __all__ = ['Timeout',
_NONE = object() _NONE = object()
# deriving from BaseException so that "except Exception, e" doesn't catch # deriving from BaseException so that "except Exception as e" doesn't catch
# Timeout exceptions. # Timeout exceptions.
class Timeout(BaseException): class Timeout(BaseException):
"""Raises *exception* in the current greenthread after *timeout* seconds. """Raises *exception* in the current greenthread after *timeout* seconds.

View File

@@ -17,13 +17,16 @@ import imp
import os import os
import sys import sys
from eventlet import event from eventlet import event, greenio, greenthread, patcher, timeout
from eventlet import greenio from eventlet.support import six
from eventlet import greenthread
from eventlet import patcher
from eventlet import timeout
threading = patcher.original('threading') threading = patcher.original('threading')
Queue_module = patcher.original('Queue') if six.PY2:
Queue_module = patcher.original('Queue')
if six.PY3:
Queue_module = patcher.original('queue')
Queue = Queue_module.Queue Queue = Queue_module.Queue
Empty = Queue_module.Empty Empty = Queue_module.Empty
@@ -117,7 +120,7 @@ def execute(meth,*args, **kwargs):
if not QUIET: if not QUIET:
traceback.print_exception(c,e,tb) traceback.print_exception(c,e,tb)
traceback.print_stack() traceback.print_stack()
raise c,e,tb six.reraise(c, e, tb)
return rv return rv
@@ -263,7 +266,7 @@ def setup():
warnings.warn("Zero threads in tpool. All tpool.execute calls will\ warnings.warn("Zero threads in tpool. All tpool.execute calls will\
execute in main thread. Check the value of the environment \ execute in main thread. Check the value of the environment \
variable EVENTLET_THREADPOOL_SIZE.", RuntimeWarning) variable EVENTLET_THREADPOOL_SIZE.", RuntimeWarning)
for i in xrange(_nthreads): for i in six.moves.range(_nthreads):
t = threading.Thread(target=tworker, t = threading.Thread(target=tworker,
name="tpool_thread_%s" % i) name="tpool_thread_%s" % i)
t.setDaemon(True) t.setDaemon(True)

View File

@@ -113,7 +113,7 @@ class WebSocketWSGI(object):
ws = self._handle_legacy_request(environ) ws = self._handle_legacy_request(environ)
else: else:
raise BadRequest() raise BadRequest()
except BadRequest, e: except BadRequest as e:
status = e.status status = e.status
body = e.body or '' body = e.body or ''
headers = e.headers or [] headers = e.headers or []
@@ -123,7 +123,7 @@ class WebSocketWSGI(object):
try: try:
self.handler(ws) self.handler(ws)
except socket.error, e: except socket.error as e:
if get_errno(e) not in ACCEPTABLE_CLIENT_ERRORS: if get_errno(e) not in ACCEPTABLE_CLIENT_ERRORS:
raise raise
# Make sure we send the closing frame # Make sure we send the closing frame
@@ -186,7 +186,7 @@ class WebSocketWSGI(object):
location, location,
response)) response))
else: #pragma NO COVER else: #pragma NO COVER
raise ValueError("Unknown WebSocket protocol version.") raise ValueError("Unknown WebSocket protocol version.")
sock.sendall(handshake_reply) sock.sendall(handshake_reply)
return WebSocket(sock, environ, self.protocol_version) return WebSocket(sock, environ, self.protocol_version)

View File

@@ -11,7 +11,8 @@ from eventlet.green import socket
from eventlet.green import BaseHTTPServer from eventlet.green import BaseHTTPServer
from eventlet import greenpool from eventlet import greenpool
from eventlet import greenio from eventlet import greenio
from eventlet.support import get_errno from eventlet.support import get_errno, six
DEFAULT_MAX_SIMULTANEOUS_REQUESTS = 1024 DEFAULT_MAX_SIMULTANEOUS_REQUESTS = 1024
DEFAULT_MAX_HTTP_VERSION = 'HTTP/1.1' DEFAULT_MAX_HTTP_VERSION = 'HTTP/1.1'
@@ -372,7 +373,7 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler):
try: try:
if headers_sent: if headers_sent:
# Re-raise original exception if headers sent # Re-raise original exception if headers sent
raise exc_info[0], exc_info[1], exc_info[2] six.reraise(exc_info[0], exc_info[1], exc_info[2])
finally: finally:
# Avoid dangling circular ref # Avoid dangling circular ref
exc_info = None exc_info = None
@@ -583,10 +584,11 @@ class Server(BaseHTTPServer.HTTPServer):
d.update(self.environ) d.update(self.environ)
return d return d
def process_request(self, (sock, address)): def process_request(self, sock_params):
# The actual request handling takes place in __init__, so we need to # The actual request handling takes place in __init__, so we need to
# set minimum_chunk_size before __init__ executes and we don't want to modify # set minimum_chunk_size before __init__ executes and we don't want to modify
# class variable # class variable
sock, address = sock_params
proto = types.InstanceType(self.protocol) proto = types.InstanceType(self.protocol)
if self.minimum_chunk_size is not None: if self.minimum_chunk_size is not None:
proto.minimum_chunk_size = self.minimum_chunk_size proto.minimum_chunk_size = self.minimum_chunk_size

View File

@@ -1,14 +1,16 @@
import eventlet import eventlet
from eventlet import wsgi from eventlet import wsgi
from eventlet import websocket from eventlet import websocket
from eventlet.support import six
# demo app # demo app
import os import os
import random import random
@websocket.WebSocketWSGI @websocket.WebSocketWSGI
def handle(ws): def handle(ws):
""" This is the websocket handler function. Note that we """ This is the websocket handler function. Note that we
can dispatch based on path in here, too.""" can dispatch based on path in here, too."""
if ws.path == '/echo': if ws.path == '/echo':
while True: while True:
@@ -16,12 +18,12 @@ def handle(ws):
if m is None: if m is None:
break break
ws.send(m) ws.send(m)
elif ws.path == '/data': elif ws.path == '/data':
for i in xrange(10000): for i in six.moves.range(10000):
ws.send("0 %s %s\n" % (i, random.random())) ws.send("0 %s %s\n" % (i, random.random()))
eventlet.sleep(0.1) eventlet.sleep(0.1)
def dispatch(environ, start_response): def dispatch(environ, start_response):
""" This resolves to the web page or the websocket depending on """ This resolves to the web page or the websocket depending on
the path.""" the path."""
@@ -30,11 +32,11 @@ def dispatch(environ, start_response):
else: else:
start_response('200 OK', [('content-type', 'text/html')]) start_response('200 OK', [('content-type', 'text/html')])
return [open(os.path.join( return [open(os.path.join(
os.path.dirname(__file__), os.path.dirname(__file__),
'websocket.html')).read()] 'websocket.html')).read()]
if __name__ == "__main__": if __name__ == "__main__":
# run an example app from the command line # run an example app from the command line
listener = eventlet.listen(('127.0.0.1', 7000)) listener = eventlet.listen(('127.0.0.1', 7000))
print("\nVisit http://localhost:7000/ in your websocket-capable browser.\n") print("\nVisit http://localhost:7000/ in your websocket-capable browser.\n")
wsgi.server(listener, dispatch) wsgi.server(listener, dispatch)

View File

@@ -3,27 +3,30 @@ import os
import eventlet import eventlet
from eventlet import event from eventlet import event
from eventlet.green import socket from eventlet.green import socket
from eventlet.support import six
from tests import LimitedTestCase, s2b, skip_if_no_ssl from tests import LimitedTestCase, s2b, skip_if_no_ssl
certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt') certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt')
private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key') private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key')
class TestServe(LimitedTestCase): class TestServe(LimitedTestCase):
def setUp(self): def setUp(self):
super(TestServe, self).setUp() super(TestServe, self).setUp()
from eventlet import debug from eventlet import debug
debug.hub_exceptions(False) debug.hub_exceptions(False)
def tearDown(self): def tearDown(self):
super(TestServe, self).tearDown() super(TestServe, self).tearDown()
from eventlet import debug from eventlet import debug
debug.hub_exceptions(True) debug.hub_exceptions(True)
def test_exiting_server(self): def test_exiting_server(self):
# tests that the server closes the client sock on handle() exit # tests that the server closes the client sock on handle() exit
def closer(sock,addr): def closer(sock,addr):
pass pass
l = eventlet.listen(('localhost', 0)) l = eventlet.listen(('localhost', 0))
gt = eventlet.spawn(eventlet.serve, l, closer) gt = eventlet.spawn(eventlet.serve, l, closer)
client = eventlet.connect(('localhost', l.getsockname()[1])) client = eventlet.connect(('localhost', l.getsockname()[1]))
@@ -37,7 +40,7 @@ class TestServe(LimitedTestCase):
def crasher(sock,addr): def crasher(sock,addr):
sock.recv(1024) sock.recv(1024)
0//0 0//0
l = eventlet.listen(('localhost', 0)) l = eventlet.listen(('localhost', 0))
gt = eventlet.spawn(eventlet.serve, l, crasher) gt = eventlet.spawn(eventlet.serve, l, crasher)
client = eventlet.connect(('localhost', l.getsockname()[1])) client = eventlet.connect(('localhost', l.getsockname()[1]))
@@ -51,7 +54,7 @@ class TestServe(LimitedTestCase):
sock.recv(1024) sock.recv(1024)
sock.close() sock.close()
0//0 0//0
l = eventlet.listen(('localhost', 0)) l = eventlet.listen(('localhost', 0))
gt = eventlet.spawn(eventlet.serve, l, crasher) gt = eventlet.spawn(eventlet.serve, l, crasher)
client = eventlet.connect(('localhost', l.getsockname()[1])) client = eventlet.connect(('localhost', l.getsockname()[1]))
@@ -65,16 +68,16 @@ class TestServe(LimitedTestCase):
hits[0]+=1 hits[0]+=1
l = eventlet.listen(('localhost', 0)) l = eventlet.listen(('localhost', 0))
gt = eventlet.spawn(eventlet.serve, l, counter) gt = eventlet.spawn(eventlet.serve, l, counter)
for i in xrange(100): for i in six.moves.range(100):
client = eventlet.connect(('localhost', l.getsockname()[1])) client = eventlet.connect(('localhost', l.getsockname()[1]))
self.assertFalse(client.recv(100)) self.assertFalse(client.recv(100))
gt.kill() gt.kill()
self.assertEqual(100, hits[0]) self.assertEqual(100, hits[0])
def test_blocking(self): def test_blocking(self):
l = eventlet.listen(('localhost', 0)) l = eventlet.listen(('localhost', 0))
x = eventlet.with_timeout(0.01, x = eventlet.with_timeout(0.01,
eventlet.serve, l, lambda c,a: None, eventlet.serve, l, lambda c,a: None,
timeout_value="timeout") timeout_value="timeout")
self.assertEqual(x, "timeout") self.assertEqual(x, "timeout")
@@ -83,7 +86,7 @@ class TestServe(LimitedTestCase):
raise eventlet.StopServe() raise eventlet.StopServe()
l = eventlet.listen(('localhost', 0)) l = eventlet.listen(('localhost', 0))
# connect to trigger a call to stopit # connect to trigger a call to stopit
gt = eventlet.spawn(eventlet.connect, gt = eventlet.spawn(eventlet.connect,
('localhost', l.getsockname()[1])) ('localhost', l.getsockname()[1]))
eventlet.serve(l, stopit) eventlet.serve(l, stopit)
gt.wait() gt.wait()
@@ -100,7 +103,7 @@ class TestServe(LimitedTestCase):
# verify the client is connected by getting data # verify the client is connected by getting data
self.assertEquals(s2b('hi'), c.recv(2)) self.assertEquals(s2b('hi'), c.recv(2))
return c return c
clients = [test_client() for i in xrange(5)] clients = [test_client() for i in range(5)]
# very next client should not get anything # very next client should not get anything
x = eventlet.with_timeout(0.01, x = eventlet.with_timeout(0.01,
test_client, test_client,
@@ -120,7 +123,7 @@ class TestServe(LimitedTestCase):
client = eventlet.wrap_ssl(eventlet.connect(('localhost', port))) client = eventlet.wrap_ssl(eventlet.connect(('localhost', port)))
client.sendall("echo") client.sendall("echo")
self.assertEquals("echo", client.recv(1024)) self.assertEquals("echo", client.recv(1024))
def test_socket_reuse(self): def test_socket_reuse(self):
lsock1 = eventlet.listen(('localhost',0)) lsock1 = eventlet.listen(('localhost',0))
port = lsock1.getsockname()[1] port = lsock1.getsockname()[1]

View File

@@ -10,6 +10,7 @@ from unittest import TestCase, main
from tests import skipped, skip_unless, skip_with_pyevent, get_database_auth from tests import skipped, skip_unless, skip_with_pyevent, get_database_auth
from eventlet import event from eventlet import event
from eventlet import db_pool from eventlet import db_pool
from eventlet.support import six
import eventlet import eventlet
@@ -133,7 +134,7 @@ class DBConnectionPool(DBTester):
def fill_up_table(self, conn): def fill_up_table(self, conn):
curs = conn.cursor() curs = conn.cursor()
for i in range(1000): for i in six.moves.range(1000):
curs.execute('insert into test_table (value_int) values (%s)' % i) curs.execute('insert into test_table (value_int) values (%s)' % i)
conn.commit() conn.commit()
@@ -419,12 +420,12 @@ class DBConnectionPool(DBTester):
c = self.connection.cursor() c = self.connection.cursor()
self.connection.commit() self.connection.commit()
def bench(c): def bench(c):
for i in xrange(iterations): for i in six.moves.range(iterations):
c.execute('select 1') c.execute('select 1')
bench(c) # warm-up bench(c) # warm-up
results = [] results = []
for i in xrange(3): for i in range(3):
start = time.time() start = time.time()
bench(c) bench(c)
end = time.time() end = time.time()

View File

@@ -35,7 +35,7 @@ def count():
expected = %s expected = %s
normal = %s normal = %s
p = eventlet.GreenPool() p = eventlet.GreenPool()
for i in xrange(expected*2): for i in range(expected*2):
p.spawn(tpool.execute, count) p.spawn(tpool.execute, count)
p.waitall() p.waitall()
assert highwater[0] > 20, "Highwater %%s <= %%s" %% (highwater[0], normal) assert highwater[0] > 20, "Highwater %%s <= %%s" %% (highwater[0], normal)

View File

@@ -626,7 +626,7 @@ class TestGreenPipe(LimitedTestCase):
one_line = "12345\n" one_line = "12345\n"
eventlet.spawn(sender, wf, one_line * 5) eventlet.spawn(sender, wf, one_line * 5)
for i in xrange(5): for i in range(5):
line = rf.readline() line = rf.readline()
eventlet.sleep(0.01) eventlet.sleep(0.01)
self.assertEquals(line, one_line) self.assertEquals(line, one_line)
@@ -668,7 +668,7 @@ class TestGreenPipe(LimitedTestCase):
r = greenio.GreenPipe(r) r = greenio.GreenPipe(r)
w = greenio.GreenPipe(w, 'w') w = greenio.GreenPipe(w, 'w')
large_message = "".join([1024 * chr(i) for i in xrange(65)]) large_message = "".join([1024 * chr(i) for i in range(65)])
def writer(): def writer():
w.write(large_message) w.write(large_message)
@@ -676,7 +676,7 @@ class TestGreenPipe(LimitedTestCase):
gt = eventlet.spawn(writer) gt = eventlet.spawn(writer)
for i in xrange(65): for i in range(65):
buf = r.read(1024) buf = r.read(1024)
expected = 1024 * chr(i) expected = 1024 * chr(i)
self.assertEquals(buf, expected, self.assertEquals(buf, expected,
@@ -790,7 +790,7 @@ class TestGreenIoStarvation(LimitedTestCase):
recvsize = 2 * min_buf_size() recvsize = 2 * min_buf_size()
sendsize = 10000 * recvsize sendsize = 10000 * recvsize
results = [[] for i in xrange(5)] results = [[] for i in range(5)]
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

View File

@@ -1,33 +1,37 @@
import gc import gc
import itertools import itertools
import os import os
import random import random
import eventlet import eventlet
from eventlet import debug from eventlet import debug
from eventlet import hubs, greenpool, coros, event from eventlet import hubs, greenpool, coros, event
from eventlet.support import greenlets as greenlet from eventlet.support import greenlets as greenlet, six
import tests import tests
def passthru(a): def passthru(a):
eventlet.sleep(0.01) eventlet.sleep(0.01)
return a return a
def passthru2(a, b): def passthru2(a, b):
eventlet.sleep(0.01) eventlet.sleep(0.01)
return a,b return a,b
def raiser(exc): def raiser(exc):
raise exc raise exc
class GreenPool(tests.LimitedTestCase): class GreenPool(tests.LimitedTestCase):
def test_spawn(self): def test_spawn(self):
p = greenpool.GreenPool(4) p = greenpool.GreenPool(4)
waiters = [] waiters = []
for i in xrange(10): for i in range(10):
waiters.append(p.spawn(passthru, i)) waiters.append(p.spawn(passthru, i))
results = [waiter.wait() for waiter in waiters] results = [waiter.wait() for waiter in waiters]
self.assertEquals(results, list(xrange(10))) self.assertEquals(results, list(range(10)))
def test_spawn_n(self): def test_spawn_n(self):
p = greenpool.GreenPool(4) p = greenpool.GreenPool(4)
@@ -35,11 +39,11 @@ class GreenPool(tests.LimitedTestCase):
def do_something(a): def do_something(a):
eventlet.sleep(0.01) eventlet.sleep(0.01)
results_closure.append(a) results_closure.append(a)
for i in xrange(10): for i in range(10):
p.spawn(do_something, i) p.spawn(do_something, i)
p.waitall() p.waitall()
self.assertEquals(results_closure, range(10)) self.assertEquals(results_closure, list(range(10)))
def test_waiting(self): def test_waiting(self):
pool = greenpool.GreenPool(1) pool = greenpool.GreenPool(1)
done = event.Event() done = event.Event()
@@ -48,7 +52,7 @@ class GreenPool(tests.LimitedTestCase):
def waiter(pool): def waiter(pool):
gt = pool.spawn(consume) gt = pool.spawn(consume)
gt.wait() gt.wait()
waiters = [] waiters = []
self.assertEqual(pool.running(), 0) self.assertEqual(pool.running(), 0)
waiters.append(eventlet.spawn(waiter, pool)) waiters.append(eventlet.spawn(waiter, pool))
@@ -66,7 +70,7 @@ class GreenPool(tests.LimitedTestCase):
w.wait() w.wait()
self.assertEqual(pool.waiting(), 0) self.assertEqual(pool.waiting(), 0)
self.assertEqual(pool.running(), 0) self.assertEqual(pool.running(), 0)
def test_multiple_coros(self): def test_multiple_coros(self):
evt = event.Event() evt = event.Event()
results = [] results = []
@@ -85,7 +89,7 @@ class GreenPool(tests.LimitedTestCase):
self.assertEquals(['cons1', 'prod', 'cons2'], results) self.assertEquals(['cons1', 'prod', 'cons2'], results)
def test_timer_cancel(self): def test_timer_cancel(self):
# this test verifies that local timers are not fired # this test verifies that local timers are not fired
# outside of the context of the spawn # outside of the context of the spawn
timer_fired = [] timer_fired = []
def fire_timer(): def fire_timer():
@@ -98,7 +102,7 @@ class GreenPool(tests.LimitedTestCase):
eventlet.sleep(0) eventlet.sleep(0)
eventlet.sleep(0) eventlet.sleep(0)
self.assertEquals(timer_fired, []) self.assertEquals(timer_fired, [])
def test_reentrant(self): def test_reentrant(self):
pool = greenpool.GreenPool(1) pool = greenpool.GreenPool(1)
def reenter(): def reenter():
@@ -115,7 +119,7 @@ class GreenPool(tests.LimitedTestCase):
pool.spawn_n(reenter_async) pool.spawn_n(reenter_async)
self.assertEquals('done', evt.wait()) self.assertEquals('done', evt.wait())
def assert_pool_has_free(self, pool, num_free): def assert_pool_has_free(self, pool, num_free):
self.assertEquals(pool.free(), num_free) self.assertEquals(pool.free(), num_free)
def wait_long_time(e): def wait_long_time(e):
@@ -123,7 +127,7 @@ class GreenPool(tests.LimitedTestCase):
timer = eventlet.Timeout(1) timer = eventlet.Timeout(1)
try: try:
evt = event.Event() evt = event.Event()
for x in xrange(num_free): for x in six.moves.range(num_free):
pool.spawn(wait_long_time, evt) pool.spawn(wait_long_time, evt)
# if the pool has fewer free than we expect, # if the pool has fewer free than we expect,
# then we'll hit the timeout error # then we'll hit the timeout error
@@ -142,7 +146,7 @@ class GreenPool(tests.LimitedTestCase):
evt.send(None) evt.send(None)
eventlet.sleep(0) eventlet.sleep(0)
eventlet.sleep(0) eventlet.sleep(0)
def test_resize(self): def test_resize(self):
pool = greenpool.GreenPool(2) pool = greenpool.GreenPool(2)
evt = event.Event() evt = event.Event()
@@ -156,13 +160,13 @@ class GreenPool(tests.LimitedTestCase):
# verify that the pool discards excess items put into it # verify that the pool discards excess items put into it
pool.resize(1) pool.resize(1)
# cause the wait_long_time functions to return, which will # cause the wait_long_time functions to return, which will
# trigger puts to the pool # trigger puts to the pool
evt.send(None) evt.send(None)
eventlet.sleep(0) eventlet.sleep(0)
eventlet.sleep(0) eventlet.sleep(0)
self.assertEquals(pool.free(), 1) self.assertEquals(pool.free(), 1)
self.assertEquals(pool.running(), 0) self.assertEquals(pool.running(), 0)
self.assert_pool_has_free(pool, 1) self.assert_pool_has_free(pool, 1)
@@ -172,7 +176,7 @@ class GreenPool(tests.LimitedTestCase):
self.assertEquals(pool.free(), 2) self.assertEquals(pool.free(), 2)
self.assertEquals(pool.running(), 0) self.assertEquals(pool.running(), 0)
self.assert_pool_has_free(pool, 2) self.assert_pool_has_free(pool, 2)
def test_pool_smash(self): def test_pool_smash(self):
# The premise is that a coroutine in a Pool tries to get a token out # The premise is that a coroutine in a Pool tries to get a token out
# of a token pool but times out before getting the token. We verify # of a token pool but times out before getting the token. We verify
@@ -211,7 +215,7 @@ class GreenPool(tests.LimitedTestCase):
# we should be able to get out the thing we put in there, too # we should be able to get out the thing we put in there, too
self.assertEquals(tp.get(), 'wakeup') self.assertEquals(tp.get(), 'wakeup')
gt.wait() gt.wait()
def test_spawn_n_2(self): def test_spawn_n_2(self):
p = greenpool.GreenPool(2) p = greenpool.GreenPool(2)
self.assertEqual(p.free(), 2) self.assertEqual(p.free(), 2)
@@ -238,7 +242,7 @@ class GreenPool(tests.LimitedTestCase):
self.assertEqual(set(r), set([1,2,3])) self.assertEqual(set(r), set([1,2,3]))
eventlet.sleep(0) eventlet.sleep(0)
self.assertEqual(set(r), set([1,2,3,4])) self.assertEqual(set(r), set([1,2,3,4]))
def test_exceptions(self): def test_exceptions(self):
p = greenpool.GreenPool(2) p = greenpool.GreenPool(2)
for m in (p.spawn, p.spawn_n): for m in (p.spawn, p.spawn_n):
@@ -254,23 +258,23 @@ class GreenPool(tests.LimitedTestCase):
def test_imap(self): def test_imap(self):
p = greenpool.GreenPool(4) p = greenpool.GreenPool(4)
result_list = list(p.imap(passthru, xrange(10))) result_list = list(p.imap(passthru, range(10)))
self.assertEquals(result_list, list(xrange(10))) self.assertEquals(result_list, list(range(10)))
def test_empty_imap(self): def test_empty_imap(self):
p = greenpool.GreenPool(4) p = greenpool.GreenPool(4)
result_iter = p.imap(passthru, []) result_iter = p.imap(passthru, [])
self.assertRaises(StopIteration, result_iter.next) self.assertRaises(StopIteration, result_iter.next)
def test_imap_nonefunc(self): def test_imap_nonefunc(self):
p = greenpool.GreenPool(4) p = greenpool.GreenPool(4)
result_list = list(p.imap(None, xrange(10))) result_list = list(p.imap(None, range(10)))
self.assertEquals(result_list, [(x,) for x in xrange(10)]) self.assertEquals(result_list, [(x,) for x in range(10)])
def test_imap_multi_args(self): def test_imap_multi_args(self):
p = greenpool.GreenPool(4) p = greenpool.GreenPool(4)
result_list = list(p.imap(passthru2, xrange(10), xrange(10, 20))) result_list = list(p.imap(passthru2, range(10), range(10, 20)))
self.assertEquals(result_list, list(itertools.izip(xrange(10), xrange(10,20)))) self.assertEquals(result_list, list(itertools.izip(range(10), range(10,20))))
def test_imap_raises(self): def test_imap_raises(self):
# testing the case where the function raises an exception; # testing the case where the function raises an exception;
@@ -282,7 +286,7 @@ class GreenPool(tests.LimitedTestCase):
raise RuntimeError("intentional error") raise RuntimeError("intentional error")
else: else:
return item return item
it = p.imap(raiser, xrange(10)) it = p.imap(raiser, range(10))
results = [] results = []
while True: while True:
try: try:
@@ -292,55 +296,55 @@ class GreenPool(tests.LimitedTestCase):
except StopIteration: except StopIteration:
break break
self.assertEquals(results, [0,'r',2,3,4,5,6,'r',8,9]) self.assertEquals(results, [0,'r',2,3,4,5,6,'r',8,9])
def test_starmap(self): def test_starmap(self):
p = greenpool.GreenPool(4) p = greenpool.GreenPool(4)
result_list = list(p.starmap(passthru, [(x,) for x in xrange(10)])) result_list = list(p.starmap(passthru, [(x,) for x in range(10)]))
self.assertEquals(result_list, range(10)) self.assertEquals(result_list, list(range(10)))
def test_waitall_on_nothing(self): def test_waitall_on_nothing(self):
p = greenpool.GreenPool() p = greenpool.GreenPool()
p.waitall() p.waitall()
def test_recursive_waitall(self): def test_recursive_waitall(self):
p = greenpool.GreenPool() p = greenpool.GreenPool()
gt = p.spawn(p.waitall) gt = p.spawn(p.waitall)
self.assertRaises(AssertionError, gt.wait) self.assertRaises(AssertionError, gt.wait)
class GreenPile(tests.LimitedTestCase): class GreenPile(tests.LimitedTestCase):
def test_pile(self): def test_pile(self):
p = greenpool.GreenPile(4) p = greenpool.GreenPile(4)
for i in xrange(10): for i in range(10):
p.spawn(passthru, i) p.spawn(passthru, i)
result_list = list(p) result_list = list(p)
self.assertEquals(result_list, list(xrange(10))) self.assertEquals(result_list, list(range(10)))
def test_pile_spawn_times_out(self): def test_pile_spawn_times_out(self):
p = greenpool.GreenPile(4) p = greenpool.GreenPile(4)
for i in xrange(4): for i in range(4):
p.spawn(passthru, i) p.spawn(passthru, i)
# now it should be full and this should time out # now it should be full and this should time out
eventlet.Timeout(0) eventlet.Timeout(0)
self.assertRaises(eventlet.Timeout, p.spawn, passthru, "time out") self.assertRaises(eventlet.Timeout, p.spawn, passthru, "time out")
# verify that the spawn breakage didn't interrupt the sequence # verify that the spawn breakage didn't interrupt the sequence
# and terminates properly # and terminates properly
for i in xrange(4,10): for i in range(4,10):
p.spawn(passthru, i) p.spawn(passthru, i)
self.assertEquals(list(p), list(xrange(10))) self.assertEquals(list(p), list(range(10)))
def test_constructing_from_pool(self): def test_constructing_from_pool(self):
pool = greenpool.GreenPool(2) pool = greenpool.GreenPool(2)
pile1 = greenpool.GreenPile(pool) pile1 = greenpool.GreenPile(pool)
pile2 = greenpool.GreenPile(pool) pile2 = greenpool.GreenPile(pool)
def bunch_of_work(pile, unique): def bunch_of_work(pile, unique):
for i in xrange(10): for i in range(10):
pile.spawn(passthru, i + unique) pile.spawn(passthru, i + unique)
eventlet.spawn(bunch_of_work, pile1, 0) eventlet.spawn(bunch_of_work, pile1, 0)
eventlet.spawn(bunch_of_work, pile2, 100) eventlet.spawn(bunch_of_work, pile2, 100)
eventlet.sleep(0) eventlet.sleep(0)
self.assertEquals(list(pile2), list(xrange(100,110))) self.assertEquals(list(pile2), list(range(100,110)))
self.assertEquals(list(pile1), list(xrange(10))) self.assertEquals(list(pile1), list(range(10)))
class StressException(Exception): class StressException(Exception):
@@ -359,7 +363,7 @@ def passthru(arg):
while r.random() < 0.5: while r.random() < 0.5:
eventlet.sleep(r.random() * 0.001) eventlet.sleep(r.random() * 0.001)
return arg return arg
class Stress(tests.LimitedTestCase): class Stress(tests.LimitedTestCase):
# tests will take extra-long # tests will take extra-long
TEST_TIMEOUT=60 TEST_TIMEOUT=60
@@ -367,11 +371,11 @@ class Stress(tests.LimitedTestCase):
def spawn_order_check(self, concurrency): def spawn_order_check(self, concurrency):
# checks that piles are strictly ordered # checks that piles are strictly ordered
p = greenpool.GreenPile(concurrency) p = greenpool.GreenPile(concurrency)
def makework(count, unique): def makework(count, unique):
for i in xrange(count): for i in six.moves.range(count):
token = (unique, i) token = (unique, i)
p.spawn(pressure, token) p.spawn(pressure, token)
iters = 1000 iters = 1000
eventlet.spawn(makework, iters, 1) eventlet.spawn(makework, iters, 1)
eventlet.spawn(makework, iters, 2) eventlet.spawn(makework, iters, 2)
@@ -387,7 +391,7 @@ class Stress(tests.LimitedTestCase):
i = exc.args[0] i = exc.args[0]
except StopIteration: except StopIteration:
break break
received += 1 received += 1
if received % 5 == 0: if received % 5 == 0:
eventlet.sleep(0.0001) eventlet.sleep(0.0001)
unique, order = i unique, order = i
@@ -399,17 +403,17 @@ class Stress(tests.LimitedTestCase):
@tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES') @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
def test_ordering_5(self): def test_ordering_5(self):
self.spawn_order_check(5) self.spawn_order_check(5)
@tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES') @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
def test_ordering_50(self): def test_ordering_50(self):
self.spawn_order_check(50) self.spawn_order_check(50)
def imap_memory_check(self, concurrency): def imap_memory_check(self, concurrency):
# checks that imap is strictly # checks that imap is strictly
# ordered and consumes a constant amount of memory # ordered and consumes a constant amount of memory
p = greenpool.GreenPool(concurrency) p = greenpool.GreenPool(concurrency)
count = 1000 count = 1000
it = p.imap(passthru, xrange(count)) it = p.imap(passthru, six.moves.range(count))
latest = -1 latest = -1
while True: while True:
try: try:
@@ -430,11 +434,11 @@ class Stress(tests.LimitedTestCase):
self.assert_(objs_created < 25 * concurrency, objs_created) self.assert_(objs_created < 25 * concurrency, objs_created)
# make sure we got to the end # make sure we got to the end
self.assertEquals(latest, count - 1) self.assertEquals(latest, count - 1)
@tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES') @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
def test_imap_50(self): def test_imap_50(self):
self.imap_memory_check(50) self.imap_memory_check(50)
@tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES') @tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
def test_imap_500(self): def test_imap_500(self):
self.imap_memory_check(500) self.imap_memory_check(500)
@@ -446,20 +450,20 @@ class Stress(tests.LimitedTestCase):
def create(self): def create(self):
self.current_integer = getattr(self, 'current_integer', 0) + 1 self.current_integer = getattr(self, 'current_integer', 0) + 1
return self.current_integer return self.current_integer
def subtest(intpool_size, pool_size, num_executes): def subtest(intpool_size, pool_size, num_executes):
def run(int_pool): def run(int_pool):
token = int_pool.get() token = int_pool.get()
eventlet.sleep(0.0001) eventlet.sleep(0.0001)
int_pool.put(token) int_pool.put(token)
return token return token
int_pool = IntPool(max_size=intpool_size) int_pool = IntPool(max_size=intpool_size)
pool = greenpool.GreenPool(pool_size) pool = greenpool.GreenPool(pool_size)
for ix in xrange(num_executes): for ix in six.moves.range(num_executes):
pool.spawn(run, int_pool) pool.spawn(run, int_pool)
pool.waitall() pool.waitall()
subtest(4, 7, 7) subtest(4, 7, 7)
subtest(50, 75, 100) subtest(50, 75, 100)
for isize in (10, 20, 30, 40, 50): for isize in (10, 20, 30, 40, 50):

View File

@@ -9,7 +9,8 @@ from eventlet import hubs
from eventlet.green import socket from eventlet.green import socket
from eventlet.event import Event from eventlet.event import Event
from eventlet.semaphore import Semaphore from eventlet.semaphore import Semaphore
from eventlet.support import greenlets from eventlet.support import greenlets, six
DELAY = 0.001 DELAY = 0.001
@@ -26,7 +27,7 @@ class TestTimerCleanup(LimitedTestCase):
hub = hubs.get_hub() hub = hubs.get_hub()
stimers = hub.get_timers_count() stimers = hub.get_timers_count()
scanceled = hub.timers_canceled scanceled = hub.timers_canceled
for i in xrange(2000): for i in six.moves.range(2000):
t = hubs.get_hub().schedule_call_global(60, noop) t = hubs.get_hub().schedule_call_global(60, noop)
t.cancel() t.cancel()
self.assert_less_than_equal(hub.timers_canceled, self.assert_less_than_equal(hub.timers_canceled,
@@ -40,7 +41,7 @@ class TestTimerCleanup(LimitedTestCase):
hub = hubs.get_hub() hub = hubs.get_hub()
stimers = hub.get_timers_count() stimers = hub.get_timers_count()
scanceled = hub.timers_canceled scanceled = hub.timers_canceled
for i in xrange(2000): for i in six.moves.range(2000):
t = hubs.get_hub().schedule_call_global(60, noop) t = hubs.get_hub().schedule_call_global(60, noop)
eventlet.sleep() eventlet.sleep()
self.assert_less_than_equal(hub.timers_canceled, self.assert_less_than_equal(hub.timers_canceled,
@@ -60,7 +61,7 @@ class TestTimerCleanup(LimitedTestCase):
uncanceled_timers = [] uncanceled_timers = []
stimers = hub.get_timers_count() stimers = hub.get_timers_count()
scanceled = hub.timers_canceled scanceled = hub.timers_canceled
for i in xrange(1000): for i in six.moves.range(1000):
# 2/3rds of new timers are uncanceled # 2/3rds of new timers are uncanceled
t = hubs.get_hub().schedule_call_global(60, noop) t = hubs.get_hub().schedule_call_global(60, noop)
t2 = hubs.get_hub().schedule_call_global(60, noop) t2 = hubs.get_hub().schedule_call_global(60, noop)
@@ -304,7 +305,7 @@ server = eventlet.listen(('localhost', 12345))
t = eventlet.Timeout(0.01) t = eventlet.Timeout(0.01)
try: try:
new_sock, address = server.accept() new_sock, address = server.accept()
except eventlet.Timeout, t: except eventlet.Timeout as t:
pass pass
pid = os.fork() pid = os.fork()
@@ -312,7 +313,7 @@ if not pid:
t = eventlet.Timeout(0.1) t = eventlet.Timeout(0.1)
try: try:
new_sock, address = server.accept() new_sock, address = server.accept()
except eventlet.Timeout, t: except eventlet.Timeout as t:
print "accept blocked" print "accept blocked"
else: else:

View File

@@ -16,7 +16,7 @@ if not patcher.is_monkey_patched('psycopg'):
count = [0] count = [0]
def tick(totalseconds, persecond): def tick(totalseconds, persecond):
for i in xrange(totalseconds*persecond): for i in range(totalseconds*persecond):
count[0] += 1 count[0] += 1
eventlet.sleep(1.0/persecond) eventlet.sleep(1.0/persecond)

View File

@@ -216,7 +216,8 @@ test_monkey_patch_threading = """
def test_monkey_patch_threading(): def test_monkey_patch_threading():
tickcount = [0] tickcount = [0]
def tick(): def tick():
for i in xrange(1000): from eventlet.support import six
for i in six.moves.range(1000):
tickcount[0] += 1 tickcount[0] += 1
eventlet.sleep() eventlet.sleep()
@@ -447,11 +448,11 @@ t2.join()
""" + self.epilogue) """ + self.epilogue)
output, lines = self.launch_subprocess('newmod') output, lines = self.launch_subprocess('newmod')
self.assertEqual(len(lines), 10, "\n".join(lines)) self.assertEqual(len(lines), 10, "\n".join(lines))
for i in xrange(0, 3): for i in range(0, 3):
self.assertEqual(lines[i], "GreenThread-1", lines[i]) self.assertEqual(lines[i], "GreenThread-1", lines[i])
for i in xrange(3, 6): for i in range(3, 6):
self.assertEqual(lines[i], "foo", lines[i]) self.assertEqual(lines[i], "foo", lines[i])
for i in xrange(6, 9): for i in range(6, 9):
self.assertEqual(lines[i], "bar", lines[i]) self.assertEqual(lines[i], "bar", lines[i])
def test_ident(self): def test_ident(self):

View File

@@ -3,6 +3,8 @@ from unittest import TestCase, main
import eventlet import eventlet
from eventlet import Queue from eventlet import Queue
from eventlet import pools from eventlet import pools
from eventlet.support import six
class IntPool(pools.Pool): class IntPool(pools.Pool):
def create(self): def create(self):
@@ -111,14 +113,14 @@ class TestIntPool(TestCase):
def just_put(pool_item, index): def just_put(pool_item, index):
self.pool.put(pool_item) self.pool.put(pool_item)
queue.put(index) queue.put(index)
for index in xrange(size + 1): for index in six.moves.range(size + 1):
pool_item = self.pool.get() pool_item = self.pool.get()
eventlet.spawn(just_put, pool_item, index) eventlet.spawn(just_put, pool_item, index)
for _ in range(size+1): for _ in six.moves.range(size+1):
x = queue.get() x = queue.get()
results.append(x) results.append(x)
self.assertEqual(sorted(results), range(size + 1)) self.assertEqual(sorted(results), list(six.moves.range(size + 1)))
finally: finally:
timer.cancel() timer.cancel()
@@ -137,14 +139,14 @@ class TestIntPool(TestCase):
# resize larger and assert that there are more free items # resize larger and assert that there are more free items
pool.resize(2) pool.resize(2)
self.assertEquals(pool.free(), 2) self.assertEquals(pool.free(), 2)
def test_create_contention(self): def test_create_contention(self):
creates = [0] creates = [0]
def sleep_create(): def sleep_create():
creates[0] += 1 creates[0] += 1
eventlet.sleep() eventlet.sleep()
return "slept" return "slept"
p = pools.Pool(max_size=4, create=sleep_create) p = pools.Pool(max_size=4, create=sleep_create)
def do_get(): def do_get():
@@ -153,7 +155,7 @@ class TestIntPool(TestCase):
p.put(x) p.put(x)
gp = eventlet.GreenPool() gp = eventlet.GreenPool()
for i in xrange(100): for i in six.moves.range(100):
gp.spawn_n(do_get) gp.spawn_n(do_get)
gp.waitall() gp.waitall()
self.assertEquals(creates[0], 4) self.assertEquals(creates[0], 4)

View File

@@ -90,10 +90,10 @@ class TestQueue(LimitedTestCase):
for i in range(5): for i in range(5):
q.put(i) q.put(i)
self.assertEquals(list(q.queue), range(5)) self.assertEquals(list(q.queue), list(range(5)))
q.resize(1) q.resize(1)
eventlet.sleep(0) eventlet.sleep(0)
self.assertEquals(list(q.queue), range(5)) self.assertEquals(list(q.queue), list(range(5)))
def test_resize_to_Unlimited(self): def test_resize_to_Unlimited(self):
q = eventlet.Queue(0) q = eventlet.Queue(0)

View File

@@ -4,9 +4,11 @@ warnings.simplefilter('ignore', DeprecationWarning)
from eventlet import pool, coros, api, hubs, timeout from eventlet import pool, coros, api, hubs, timeout
warnings.simplefilter('default', DeprecationWarning) warnings.simplefilter('default', DeprecationWarning)
from eventlet import event as _event from eventlet import event as _event
from eventlet.support import six
from tests import LimitedTestCase from tests import LimitedTestCase
from unittest import main from unittest import main
class TestCoroutinePool(LimitedTestCase): class TestCoroutinePool(LimitedTestCase):
klass = pool.Pool klass = pool.Pool
@@ -25,7 +27,7 @@ class TestCoroutinePool(LimitedTestCase):
pool = self.klass(0, 2) pool = self.klass(0, 2)
worker = pool.execute(some_work) worker = pool.execute(some_work)
self.assertEqual(value, worker.wait()) self.assertEqual(value, worker.wait())
def test_waiting(self): def test_waiting(self):
pool = self.klass(0,1) pool = self.klass(0,1)
done = _event.Event() done = _event.Event()
@@ -34,7 +36,7 @@ class TestCoroutinePool(LimitedTestCase):
def waiter(pool): def waiter(pool):
evt = pool.execute(consume) evt = pool.execute(consume)
evt.wait() evt.wait()
waiters = [] waiters = []
waiters.append(eventlet.spawn(waiter, pool)) waiters.append(eventlet.spawn(waiter, pool))
api.sleep(0) api.sleep(0)
@@ -69,7 +71,7 @@ class TestCoroutinePool(LimitedTestCase):
self.assertEquals(['cons1', 'prod', 'cons2'], results) self.assertEquals(['cons1', 'prod', 'cons2'], results)
def test_timer_cancel(self): def test_timer_cancel(self):
# this test verifies that local timers are not fired # this test verifies that local timers are not fired
# outside of the context of the execute method # outside of the context of the execute method
timer_fired = [] timer_fired = []
def fire_timer(): def fire_timer():
@@ -105,7 +107,7 @@ class TestCoroutinePool(LimitedTestCase):
timer = timeout.Timeout(1, api.TimeoutError) timer = timeout.Timeout(1, api.TimeoutError)
try: try:
evt = _event.Event() evt = _event.Event()
for x in xrange(num_free): for x in six.moves.range(num_free):
pool.execute(wait_long_time, evt) pool.execute(wait_long_time, evt)
# if the pool has fewer free than we expect, # if the pool has fewer free than we expect,
# then we'll hit the timeout error # then we'll hit the timeout error
@@ -134,13 +136,13 @@ class TestCoroutinePool(LimitedTestCase):
# verify that the pool discards excess items put into it # verify that the pool discards excess items put into it
pool.resize(1) pool.resize(1)
# cause the wait_long_time functions to return, which will # cause the wait_long_time functions to return, which will
# trigger puts to the pool # trigger puts to the pool
evt.send(None) evt.send(None)
api.sleep(0) api.sleep(0)
api.sleep(0) api.sleep(0)
self.assertEquals(pool.free(), 1) self.assertEquals(pool.free(), 1)
self.assert_pool_has_free(pool, 1) self.assert_pool_has_free(pool, 1)
@@ -148,10 +150,10 @@ class TestCoroutinePool(LimitedTestCase):
pool.resize(2) pool.resize(2)
self.assertEquals(pool.free(), 2) self.assertEquals(pool.free(), 2)
self.assert_pool_has_free(pool, 2) self.assert_pool_has_free(pool, 2)
def test_stderr_raising(self): def test_stderr_raising(self):
# testing that really egregious errors in the error handling code # testing that really egregious errors in the error handling code
# (that prints tracebacks to stderr) don't cause the pool to lose # (that prints tracebacks to stderr) don't cause the pool to lose
# any members # any members
import sys import sys
pool = self.klass(min_size=1, max_size=1) pool = self.klass(min_size=1, max_size=1)
@@ -169,7 +171,7 @@ class TestCoroutinePool(LimitedTestCase):
self.assertRaises(RuntimeError, waiter.wait) self.assertRaises(RuntimeError, waiter.wait)
# the pool should have something free at this point since the # the pool should have something free at this point since the
# waiter returned # waiter returned
# pool.Pool change: if an exception is raised during execution of a link, # pool.Pool change: if an exception is raised during execution of a link,
# the rest of the links are scheduled to be executed on the next hub iteration # the rest of the links are scheduled to be executed on the next hub iteration
# this introduces a delay in updating pool.sem which makes pool.free() report 0 # this introduces a delay in updating pool.sem which makes pool.free() report 0
# therefore, sleep: # therefore, sleep:
@@ -198,7 +200,7 @@ class TestCoroutinePool(LimitedTestCase):
return 'ok' return 'ok'
pool.execute(slow) pool.execute(slow)
self.assertEquals(pool.wait(), 'ok') self.assertEquals(pool.wait(), 'ok')
def test_pool_smash(self): def test_pool_smash(self):
# The premise is that a coroutine in a Pool tries to get a token out # The premise is that a coroutine in a Pool tries to get a token out
# of a token pool but times out before getting the token. We verify # of a token pool but times out before getting the token. We verify
@@ -271,27 +273,27 @@ class PoolBasicTests(LimitedTestCase):
p = self.klass() p = self.klass()
evt = p.execute(lambda a: ('foo', a), 1) evt = p.execute(lambda a: ('foo', a), 1)
self.assertEqual(evt.wait(), ('foo', 1)) self.assertEqual(evt.wait(), ('foo', 1))
def test_with_intpool(self): def test_with_intpool(self):
from eventlet import pools from eventlet import pools
class IntPool(pools.Pool): class IntPool(pools.Pool):
def create(self): def create(self):
self.current_integer = getattr(self, 'current_integer', 0) + 1 self.current_integer = getattr(self, 'current_integer', 0) + 1
return self.current_integer return self.current_integer
def subtest(intpool_size, pool_size, num_executes): def subtest(intpool_size, pool_size, num_executes):
def run(int_pool): def run(int_pool):
token = int_pool.get() token = int_pool.get()
api.sleep(0.0001) api.sleep(0.0001)
int_pool.put(token) int_pool.put(token)
return token return token
int_pool = IntPool(max_size=intpool_size) int_pool = IntPool(max_size=intpool_size)
pool = self.klass(max_size=pool_size) pool = self.klass(max_size=pool_size)
for ix in xrange(num_executes): for ix in six.moves.range(num_executes):
pool.execute(run, int_pool) pool.execute(run, int_pool)
pool.waitall() pool.waitall()
subtest(4, 7, 7) subtest(4, 7, 7)
subtest(50, 75, 100) subtest(50, 75, 100)
for isize in (20, 30, 40, 50): for isize in (20, 30, 40, 50):

View File

@@ -72,7 +72,7 @@ class TestProc(LimitedTestCase):
p.link(event) p.link(event)
self.assertEqual(event.wait(), 100) self.assertEqual(event.wait(), 100)
for i in xrange(3): for i in range(3):
event2 = _event.Event() event2 = _event.Event()
p.link(event2) p.link(event2)
self.assertEqual(event2.wait(), 100) self.assertEqual(event2.wait(), 100)
@@ -156,7 +156,7 @@ class TestReturn_link(TestCase):
p = self.p = proc.spawn(return25) p = self.p = proc.spawn(return25)
self._test_return(p, True, 25, proc.LinkedCompleted, lambda : sleep(0)) self._test_return(p, True, 25, proc.LinkedCompleted, lambda : sleep(0))
# repeating the same with dead process # repeating the same with dead process
for _ in xrange(3): for _ in range(3):
self._test_return(p, False, 25, proc.LinkedCompleted, lambda : sleep(0)) self._test_return(p, False, 25, proc.LinkedCompleted, lambda : sleep(0))
def _test_return(self, p, first_time, result, kill_exc_type, action): def _test_return(self, p, first_time, result, kill_exc_type, action):
@@ -221,7 +221,7 @@ class TestRaise_link(TestCase):
p = self.p = proc.spawn(lambda : getcurrent().throw(ExpectedError('test_raise'))) p = self.p = proc.spawn(lambda : getcurrent().throw(ExpectedError('test_raise')))
self._test_raise(p, True, proc.LinkedFailed) self._test_raise(p, True, proc.LinkedFailed)
# repeating the same with dead process # repeating the same with dead process
for _ in xrange(3): for _ in range(3):
self._test_raise(p, False, proc.LinkedFailed) self._test_raise(p, False, proc.LinkedFailed)
def _test_kill(self, p, first_time, kill_exc_type): def _test_kill(self, p, first_time, kill_exc_type):
@@ -254,7 +254,7 @@ class TestRaise_link(TestCase):
p = self.p = proc.spawn(sleep, DELAY) p = self.p = proc.spawn(sleep, DELAY)
self._test_kill(p, True, proc.LinkedKilled) self._test_kill(p, True, proc.LinkedKilled)
# repeating the same with dead process # repeating the same with dead process
for _ in xrange(3): for _ in range(3):
self._test_kill(p, False, proc.LinkedKilled) self._test_kill(p, False, proc.LinkedKilled)
class TestRaise_link_exception(TestRaise_link): class TestRaise_link_exception(TestRaise_link):

View File

@@ -1,12 +1,15 @@
import weakref import weakref
from eventlet.green import thread from eventlet.green import thread
from eventlet import greenthread from eventlet import greenthread
from eventlet import event from eventlet import event
import eventlet import eventlet
from eventlet import corolocal from eventlet import corolocal
from eventlet.support import six
from tests import LimitedTestCase, skipped from tests import LimitedTestCase, skipped
class Locals(LimitedTestCase): class Locals(LimitedTestCase):
def passthru(self, *args, **kw): def passthru(self, *args, **kw):
self.results.append((args, kw)) self.results.append((args, kw))
@@ -54,43 +57,43 @@ class Locals(LimitedTestCase):
pass pass
eventlet.spawn(do_something).wait() eventlet.spawn(do_something).wait()
self.assertEqual(my_local.a, 1) self.assertEqual(my_local.a, 1)
def test_calls_init(self): def test_calls_init(self):
init_args = [] init_args = []
class Init(corolocal.local): class Init(corolocal.local):
def __init__(self, *args): def __init__(self, *args):
init_args.append((args, eventlet.getcurrent())) init_args.append((args, eventlet.getcurrent()))
my_local = Init(1,2,3) my_local = Init(1,2,3)
self.assertEqual(init_args[0][0], (1,2,3)) self.assertEqual(init_args[0][0], (1,2,3))
self.assertEqual(init_args[0][1], eventlet.getcurrent()) self.assertEqual(init_args[0][1], eventlet.getcurrent())
def do_something(): def do_something():
my_local.foo = 'bar' my_local.foo = 'bar'
self.assertEqual(len(init_args), 2, init_args) self.assertEqual(len(init_args), 2, init_args)
self.assertEqual(init_args[1][0], (1,2,3)) self.assertEqual(init_args[1][0], (1,2,3))
self.assertEqual(init_args[1][1], eventlet.getcurrent()) self.assertEqual(init_args[1][1], eventlet.getcurrent())
eventlet.spawn(do_something).wait() eventlet.spawn(do_something).wait()
def test_calling_methods(self): def test_calling_methods(self):
class Caller(corolocal.local): class Caller(corolocal.local):
def callme(self): def callme(self):
return self.foo return self.foo
my_local = Caller() my_local = Caller()
my_local.foo = "foo1" my_local.foo = "foo1"
self.assertEquals("foo1", my_local.callme()) self.assertEquals("foo1", my_local.callme())
def do_something(): def do_something():
my_local.foo = "foo2" my_local.foo = "foo2"
self.assertEquals("foo2", my_local.callme()) self.assertEquals("foo2", my_local.callme())
eventlet.spawn(do_something).wait() eventlet.spawn(do_something).wait()
my_local.foo = "foo3" my_local.foo = "foo3"
self.assertEquals("foo3", my_local.callme()) self.assertEquals("foo3", my_local.callme())
def test_no_leaking(self): def test_no_leaking(self):
refs = weakref.WeakKeyDictionary() refs = weakref.WeakKeyDictionary()
my_local = corolocal.local() my_local = corolocal.local()
@@ -100,12 +103,11 @@ class Locals(LimitedTestCase):
o = X() o = X()
refs[o] = True refs[o] = True
my_local.foo = o my_local.foo = o
p = eventlet.GreenPool() p = eventlet.GreenPool()
for i in xrange(100): for i in six.moves.range(100):
p.spawn(do_something, i) p.spawn(do_something, i)
p.waitall() p.waitall()
del p del p
# at this point all our coros have terminated # at this point all our coros have terminated
self.assertEqual(len(refs), 1) self.assertEqual(len(refs), 1)

View File

@@ -23,19 +23,24 @@ import gc
from tests import skipped, skip_with_pyevent, LimitedTestCase, main from tests import skipped, skip_with_pyevent, LimitedTestCase, main
from eventlet import tpool, debug from eventlet import tpool, debug
from eventlet.support import six
import eventlet import eventlet
one = 1 one = 1
two = 2 two = 2
three = 3 three = 3
none = None none = None
def noop(): def noop():
pass pass
def raise_exception(): def raise_exception():
raise RuntimeError("hi") raise RuntimeError("hi")
class TestTpool(LimitedTestCase): class TestTpool(LimitedTestCase):
def setUp(self): def setUp(self):
super(TestTpool, self).setUp() super(TestTpool, self).setUp()
@@ -144,24 +149,24 @@ class TestTpool(LimitedTestCase):
@skip_with_pyevent @skip_with_pyevent
def test_wrap_iterator(self): def test_wrap_iterator(self):
self.reset_timeout(2) self.reset_timeout(2)
prox = tpool.Proxy(xrange(10)) prox = tpool.Proxy(range(10))
result = [] result = []
for i in prox: for i in prox:
result.append(i) result.append(i)
self.assertEquals(range(10), result) self.assertEquals(list(range(10)), result)
@skip_with_pyevent @skip_with_pyevent
def test_wrap_iterator2(self): def test_wrap_iterator2(self):
self.reset_timeout(5) # might take a while due to imprecise sleeping self.reset_timeout(5) # might take a while due to imprecise sleeping
def foo(): def foo():
import time import time
for x in xrange(2): for x in range(2):
yield x yield x
time.sleep(0.001) time.sleep(0.001)
counter = [0] counter = [0]
def tick(): def tick():
for i in xrange(20000): for i in six.moves.range(20000):
counter[0]+=1 counter[0]+=1
if counter[0] % 20 == 0: if counter[0] % 20 == 0:
eventlet.sleep(0.0001) eventlet.sleep(0.0001)
@@ -296,7 +301,7 @@ class TpoolLongTests(LimitedTestCase):
def sender_loop(loopnum): def sender_loop(loopnum):
obj = tpool.Proxy(Dummy()) obj = tpool.Proxy(Dummy())
count = 100 count = 100
for n in xrange(count): for n in six.moves.range(count):
eventlet.sleep(random.random()/200.0) eventlet.sleep(random.random()/200.0)
now = time.time() now = time.time()
token = loopnum * count + n token = loopnum * count + n
@@ -306,7 +311,7 @@ class TpoolLongTests(LimitedTestCase):
cnt = 10 cnt = 10
pile = eventlet.GreenPile(cnt) pile = eventlet.GreenPile(cnt)
for i in xrange(cnt): for i in six.moves.range(cnt):
pile.spawn(sender_loop,i) pile.spawn(sender_loop,i)
results = list(pile) results = list(pile)
self.assertEquals(len(results), cnt) self.assertEquals(len(results), cnt)
@@ -339,14 +344,14 @@ from eventlet.tpool import execute
tpool.execute(noop) # get it started tpool.execute(noop) # get it started
gc.collect() gc.collect()
initial_objs = len(gc.get_objects()) initial_objs = len(gc.get_objects())
for i in xrange(10): for i in range(10):
self.assertRaises(RuntimeError, tpool.execute, raise_exception) self.assertRaises(RuntimeError, tpool.execute, raise_exception)
gc.collect() gc.collect()
middle_objs = len(gc.get_objects()) middle_objs = len(gc.get_objects())
# some objects will inevitably be created by the previous loop # some objects will inevitably be created by the previous loop
# now we test to ensure that running the loop an order of # now we test to ensure that running the loop an order of
# magnitude more doesn't generate additional objects # magnitude more doesn't generate additional objects
for i in xrange(100): for i in six.moves.range(100):
self.assertRaises(RuntimeError, tpool.execute, raise_exception) self.assertRaises(RuntimeError, tpool.execute, raise_exception)
first_created = middle_objs - initial_objs first_created = middle_objs - initial_objs
gc.collect() gc.collect()

View File

@@ -23,7 +23,7 @@ def handle(ws):
break break
ws.send(m) ws.send(m)
elif ws.path == '/range': elif ws.path == '/range':
for i in xrange(10): for i in range(10):
ws.send("msg %d" % i) ws.send("msg %d" % i)
eventlet.sleep(0.01) eventlet.sleep(0.01)
elif ws.path == '/error': elif ws.path == '/error':