diff --git a/AUTHORS b/AUTHORS index da0538a..d41dc71 100644 --- a/AUTHORS +++ b/AUTHORS @@ -1,3 +1,7 @@ +Maintainer (i.e., Who To Hassle If You Find Bugs) +------------------------------------------------- +Ryan Williams, rdw on Freenode, breath@alum.mit.edu + Original Authors ---------------- * Bob Ippolito @@ -12,6 +16,7 @@ Contributors * Mike Barton * Patrick Carlisle * Ben Ford +* Andrew Godwin * Brantley Harris * Gregory Holt * Joe Malicki @@ -23,6 +28,7 @@ Contributors * Sergey Shepelev * Chuck Thier * Daniele Varrazzo +* Ryan Williams Linden Lab Contributors ----------------------- @@ -50,4 +56,6 @@ Thanks To * Slant, better iterator implementation in tpool * Ambroff, nice pygtk hub example * Michael Carter, and Marcin Bachry, nice repro of a bug and good diagnosis leading to the fix -* David Ziegler, reporting issue #53 \ No newline at end of file +* David Ziegler, reporting issue #53 +* Favo Yang, twisted hub patch +* Schmir, patch that fixes readline method with chunked encoding in wsgi.py \ No newline at end of file diff --git a/NEWS b/NEWS index 61cab49..02bc6d4 100644 --- a/NEWS +++ b/NEWS @@ -1,3 +1,20 @@ +0.9.9 +===== +* A fix for monkeypatching on systems with psycopg version 2.0.14. +* Improved support for chunked transfers in wsgi, plus a bunch of tests from schmir (ported from gevent by redbo) +* A fix for the twisted hub from Favo Yang + +0.9.8 +===== +* Support for psycopg2's asynchronous mode, from Daniele Varrazzo +* websocket module is now part of core Eventlet with 100% unit test coverage thanks to Ben Ford. See its documentation at http://eventlet.net/doc/modules/websocket.html +* Added wrap_ssl convenience method, meaning that we truly no longer need api or util modules. +* Multiple-reader detection code protects against the common mistake of having multiple greenthreads read from the same socket at the same time, which can be overridden if you know what you're doing. +* Cleaner monkey_patch API: the "all" keyword is no longer necessary. 
+* Pool objects have a more convenient constructor -- no more need to subclass +* amajorek's reimplementation of GreenPipe +* Many bug fixes, major and minor. + 0.9.7 ===== * GreenPipe is now a context manager (thanks, quad) diff --git a/doc/basic_usage.rst b/doc/basic_usage.rst index f33a7aa..7b0aad3 100644 --- a/doc/basic_usage.rst +++ b/doc/basic_usage.rst @@ -74,6 +74,8 @@ Network Convenience Functions .. autofunction:: eventlet.listen +.. autofunction:: eventlet.wrap_ssl + .. autofunction:: eventlet.serve .. autofunction:: eventlet.StopServe diff --git a/doc/environment.rst b/doc/environment.rst new file mode 100644 index 0000000..29387e6 --- /dev/null +++ b/doc/environment.rst @@ -0,0 +1,26 @@ +Environment Variables +====================== + +Eventlet's behavior can be controlled by a few environment variables. +These are only for the advanced user. + +EVENTLET_HUB + + Used to force Eventlet to use the specified hub instead of the + optimal one. See :ref:`understanding_hubs` for the list of + acceptable hubs and what they mean (note that picking a hub not on + the list will silently fail). Equivalent to calling + :meth:`eventlet.hubs.use_hub` at the beginning of the program. + +EVENTLET_THREADPOOL_SIZE + + The size of the threadpool in :mod:`~eventlet.tpool`. This is an + environment variable because tpool constructs its pool on first + use, so any control of the pool size needs to happen before then. + +EVENTLET_TPOOL_DNS + + If set to 'yes', uses :func:`eventlet.tpool.execute` to call + :func:`~socket.gethostbyname` and :func:`~socket.getaddrinfo`, + making them appear non-blocking. This environment variable is + ignored on OS X. diff --git a/doc/examples.rst b/doc/examples.rst index a1f68e5..c16bf26 100644 --- a/doc/examples.rst +++ b/doc/examples.rst @@ -64,12 +64,33 @@ Port Forwarder .. literalinclude:: ../examples/forwarder.py +.. 
_recursive_crawler_example: + +Recursive Web Crawler +----------------------------------------- +``examples/recursive_crawler.py`` + +This is an example recursive web crawler that fetches linked pages from a seed url. + +.. literalinclude:: ../examples/recursive_crawler.py + .. _producer_consumer_example: -Producer Consumer/Recursive Web Crawler +Producer Consumer Web Crawler ----------------------------------------- ``examples/producer_consumer.py`` -This is an example implementation of the producer/consumer pattern as well as a functional recursive web crawler. +This is an example implementation of the producer/consumer pattern as well as being identical in functionality to the recursive web crawler. .. literalinclude:: ../examples/producer_consumer.py + +.. _websocket_example: + +Websocket Server Example +-------------------------- +``examples/websocket.py`` + +This exercises some of the features of the websocket server +implementation. + +.. literalinclude:: ../examples/websocket.py diff --git a/doc/index.rst b/doc/index.rst index 581e94b..2682edc 100644 --- a/doc/index.rst +++ b/doc/index.rst @@ -31,6 +31,7 @@ Contents threading hubs testing + environment modules diff --git a/doc/modules.rst b/doc/modules.rst index f33f4db..78561c3 100644 --- a/doc/modules.rst +++ b/doc/modules.rst @@ -15,4 +15,5 @@ Module Reference modules/queue modules/semaphore modules/timeout + modules/websocket modules/wsgi diff --git a/doc/modules/websocket.rst b/doc/modules/websocket.rst new file mode 100644 index 0000000..78660cd --- /dev/null +++ b/doc/modules/websocket.rst @@ -0,0 +1,30 @@ +:mod:`websocket` -- Websocket Server +===================================== + +This module provides a simple way to create a `websocket +` server. It works with a few +tweaks in the :mod:`~eventlet.wsgi` module that allow websockets to +coexist with other WSGI applications. 
+ +To create a websocket server, simply decorate a handler method with +:class:`WebSocketWSGI` and use it as a wsgi application:: + + from eventlet import wsgi, websocket + import eventlet + + @websocket.WebSocketWSGI + def hello_world(ws): + ws.send("hello world") + + wsgi.server(eventlet.listen(('', 8090)), hello_world) + + +You can find a slightly more elaborate version of this code in the file +``examples/websocket.py``. + +**Note** that the web socket spec is still under development, and it +will be necessary to change the way that this module works in response +to spec changes. + +.. automodule:: eventlet.websocket + :members: diff --git a/doc/real_index.html b/doc/real_index.html index 98d4927..aa467c6 100644 --- a/doc/real_index.html +++ b/doc/real_index.html @@ -41,7 +41,7 @@ easy_install eventlet

Alternately, you can download the source tarball:

diff --git a/doc/testing.rst b/doc/testing.rst index 38a94db..596bc5a 100644 --- a/doc/testing.rst +++ b/doc/testing.rst @@ -23,6 +23,8 @@ That's it! The output from running nose is the same as unittest's output, if th Many tests are skipped based on environmental factors; for example, it makes no sense to test Twisted-specific functionality when Twisted is not installed. These are printed as S's during execution, and in the summary printed after the tests run it will tell you how many were skipped. +.. note:: If running Python version 2.4, use this command instead: ``python tests/nosewrapper.py``. There are several tests which make use of the `with` statement and therefore will cause nose grief when it tries to import them; nosewrapper.py excludes these tests so they are skipped. + Doctests -------- diff --git a/eventlet/__init__.py b/eventlet/__init__.py index 7361da2..263e8d8 100644 --- a/eventlet/__init__.py +++ b/eventlet/__init__.py @@ -1,4 +1,4 @@ -version_info = (0, 9, 7, "dev1") +version_info = (0, 9, 10, "dev1") __version__ = ".".join(map(str, version_info)) try: @@ -31,6 +31,7 @@ try: listen = convenience.listen serve = convenience.serve StopServe = convenience.StopServe + wrap_ssl = convenience.wrap_ssl getcurrent = greenlet.greenlet.getcurrent diff --git a/eventlet/api.py b/eventlet/api.py index 3ee73cb..5b78e83 100644 --- a/eventlet/api.py +++ b/eventlet/api.py @@ -68,6 +68,8 @@ def ssl_listener(address, certificate, private_key): Returns a socket object on which one should call ``accept()`` to accept a connection on the newly bound socket. """ + warnings.warn("""eventlet.api.ssl_listener is deprecated. 
Please use eventlet.wrap_ssl(eventlet.listen()) instead.""", + DeprecationWarning, stacklevel=2) from eventlet import util import socket diff --git a/eventlet/convenience.py b/eventlet/convenience.py index b737242..887ca3b 100644 --- a/eventlet/convenience.py +++ b/eventlet/convenience.py @@ -97,11 +97,14 @@ def serve(sock, handle, concurrency=1000): def wrap_ssl(sock, keyfile=None, certfile=None, server_side=False, - cert_reqs=None, ssl_version=None, ca_certs=None, + cert_reqs=0, ssl_version=2, ca_certs=None, do_handshake_on_connect=True, suppress_ragged_eofs=True): - """Convenience function for converting a regular socket into an SSL - socket. Has the same interface as :func:`ssl.wrap_socket`, but - works on 2.5 or earlier, using PyOpenSSL. + """Convenience function for converting a regular socket into an + SSL socket. Has the same interface as :func:`ssl.wrap_socket`, + but works on 2.5 or earlier, using PyOpenSSL (though note that it + ignores the *cert_reqs*, *ssl_version*, *ca_certs*, + *do_handshake_on_connect*, and *suppress_ragged_eofs* arguments + when using PyOpenSSL). The preferred idiom is to call wrap_ssl directly on the creation method, e.g., ``wrap_ssl(connect(addr))`` or @@ -111,4 +114,41 @@ def wrap_ssl(sock, keyfile=None, certfile=None, server_side=False, :return Green SSL object. 
""" - pass + return wrap_ssl_impl(sock, keyfile=keyfile, certfile=certfile, + server_side=server_side, + cert_reqs=cert_reqs, + ssl_version=ssl_version, + ca_certs=ca_certs, + do_handshake_on_connect=do_handshake_on_connect, + suppress_ragged_eofs=suppress_ragged_eofs) + +try: + from eventlet.green import ssl + wrap_ssl_impl = ssl.wrap_socket +except ImportError: + # < 2.6, trying PyOpenSSL + from eventlet.green.OpenSSL import SSL + try: + def wrap_ssl_impl(sock, keyfile=None, certfile=None, server_side=False, + cert_reqs=None, ssl_version=None, ca_certs=None, + do_handshake_on_connect=True, suppress_ragged_eofs=True): + # theoretically the ssl_version could be respected in this + # next line + context = SSL.Context(SSL.SSLv23_METHOD) + if certfile is not None: + context.use_certificate_file(certfile) + if keyfile is not None: + context.use_privatekey_file(keyfile) + context.set_verify(SSL.VERIFY_NONE, lambda *x: True) + + connection = SSL.Connection(context, sock) + if server_side: + connection.set_accept_state() + else: + connection.set_connect_state() + return connection + except ImportError: + def wrap_ssl_impl(*a, **kw): + raise ImportError("To use SSL with Eventlet, " + "you must install PyOpenSSL or use Python 2.6 or later.") + diff --git a/eventlet/debug.py b/eventlet/debug.py index 2920fd9..70d0aca 100644 --- a/eventlet/debug.py +++ b/eventlet/debug.py @@ -7,10 +7,11 @@ import linecache import re import inspect -__all__ = ['spew', 'unspew', 'format_hub_listeners', 'hub_listener_stacks', -'hub_exceptions', 'tpool_exceptions'] +__all__ = ['spew', 'unspew', 'format_hub_listeners', 'format_hub_timers', + 'hub_listener_stacks', 'hub_exceptions', 'tpool_exceptions', + 'hub_prevent_multiple_readers', 'hub_timer_stacks'] -_token_spliter = re.compile('\W+') +_token_splitter = re.compile('\W+') class Spew(object): """ @@ -42,7 +43,7 @@ class Spew(object): if not self.show_values: return self details = [] - tokens = _token_spliter.split(line) + tokens = 
_token_splitter.split(line) for tok in tokens: if tok in frame.f_globals: details.append('%s=%r' % (tok, frame.f_globals[tok])) diff --git a/eventlet/green/socket.py b/eventlet/green/socket.py index 75fe442..866c5ea 100644 --- a/eventlet/green/socket.py +++ b/eventlet/green/socket.py @@ -12,8 +12,8 @@ os = __import__('os') import sys import warnings -__patched__ = ['fromfd', 'socketpair', 'gethostbyname', 'create_connection', - 'ssl', 'socket'] +__patched__ = ['fromfd', 'socketpair', 'gethostbyname', 'getaddrinfo', + 'create_connection', 'ssl', 'socket'] try: __original_fromfd__ = __socket.fromfd @@ -31,20 +31,11 @@ except AttributeError: pass __original_gethostbyname__ = __socket.gethostbyname -def gethostbyname(name): - can_use_tpool = os.environ.get("EVENTLET_TPOOL_GETHOSTBYNAME", - '').lower() == "yes" - if getattr(get_hub(), 'uses_twisted_reactor', None): - globals()['gethostbyname'] = _gethostbyname_twisted - elif sys.platform.startswith('darwin') or not can_use_tpool: - # the thread primitives on Darwin have some bugs that make - # it undesirable to use tpool for hostname lookups - globals()['gethostbyname'] = __original_gethostbyname__ - else: - globals()['gethostbyname'] = _gethostbyname_tpool - - return globals()['gethostbyname'](name) - +# the thread primitives on Darwin have some bugs that make +# it undesirable to use tpool for hostname lookups +_can_use_tpool = ( + os.environ.get("EVENTLET_TPOOL_DNS",'').lower() == "yes" + and not sys.platform.startswith('darwin')) def _gethostbyname_twisted(name): from twisted.internet import reactor from eventlet.twistedutil import block_on as _block_on @@ -55,12 +46,25 @@ def _gethostbyname_tpool(name): return tpool.execute( __original_gethostbyname__, name) -# def getaddrinfo(*args, **kw): -# return tpool.execute( -# __socket.getaddrinfo, *args, **kw) -# -# XXX there're few more blocking functions in socket -# XXX having a hub-independent way to access thread pool would be nice +if getattr(get_hub(), 
'uses_twisted_reactor', None): + gethostbyname = _gethostbyname_twisted +elif _can_use_tpool: + gethostbyname = _gethostbyname_tpool +else: + gethostbyname = __original_gethostbyname__ + + +__original_getaddrinfo__ = __socket.getaddrinfo +def _getaddrinfo_tpool(*args, **kw): + from eventlet import tpool + return tpool.execute( + __original_getaddrinfo__, *args, **kw) + +if _can_use_tpool: + getaddrinfo = _getaddrinfo_tpool +else: + getaddrinfo = __original_getaddrinfo__ + def create_connection(address, timeout=_GLOBAL_DEFAULT_TIMEOUT): """Connect to *address* and return the socket object. diff --git a/eventlet/green/thread.py b/eventlet/green/thread.py index 9c7ea5e..9e003f9 100644 --- a/eventlet/green/thread.py +++ b/eventlet/green/thread.py @@ -1,4 +1,4 @@ -"""implements standard module 'thread' with greenlets""" +"""Implements the standard thread module, using greenthreads.""" __thread = __import__('thread') from eventlet.support import greenlets as greenlet from eventlet import greenthread @@ -49,4 +49,4 @@ if hasattr(__thread, 'stack_size'): pass # not going to decrease stack_size, because otherwise other greenlets in this thread will suffer -from eventlet.corolocal import local as _local \ No newline at end of file +from eventlet.corolocal import local as _local diff --git a/eventlet/green/threading.py b/eventlet/green/threading.py index 659150e..7d61c58 100644 --- a/eventlet/green/threading.py +++ b/eventlet/green/threading.py @@ -11,13 +11,3 @@ patcher.inject('threading', ('time', time)) del patcher - -def _patch_main_thread(mod): - # this is some gnarly patching for the threading module; - # if threading is imported before we patch (it nearly always is), - # then the main thread will have the wrong key in therading._active, - # so, we try and replace that key with the correct one here - # this works best if there are no other threads besides the main one - curthread = mod._active.pop(mod._get_ident(), None) - if curthread: - mod._active[thread.get_ident()] 
= curthread diff --git a/eventlet/hubs/poll.py b/eventlet/hubs/poll.py index 76f5747..f8ba355 100644 --- a/eventlet/hubs/poll.py +++ b/eventlet/hubs/poll.py @@ -77,7 +77,7 @@ class Hub(BaseHub): return try: presult = self.do_poll(seconds) - except select.error, e: + except (IOError, select.error), e: if get_errno(e) == errno.EINTR: return raise diff --git a/eventlet/patcher.py b/eventlet/patcher.py index ad484f6..6cc6502 100644 --- a/eventlet/patcher.py +++ b/eventlet/patcher.py @@ -5,6 +5,33 @@ __all__ = ['inject', 'import_patched', 'monkey_patch', 'is_monkey_patched'] __exclude = set(('__builtins__', '__file__', '__name__')) +class SysModulesSaver(object): + """Class that captures some subset of the current state of + sys.modules. Pass in an iterator of module names to the + constructor.""" + def __init__(self, module_names=()): + self._saved = {} + self.save(*module_names) + + def save(self, *module_names): + """Saves the named modules to the object.""" + for modname in module_names: + self._saved[modname] = sys.modules.get(modname, None) + + def restore(self): + """Restores the modules that the saver knows about into + sys.modules. + """ + for modname, mod in self._saved.iteritems(): + if mod is not None: + sys.modules[modname] = mod + else: + try: + del sys.modules[modname] + except KeyError: + pass + + def inject(module_name, new_globals, *additional_modules): """Base method for "injecting" greened modules into an imported module. 
It imports the module specified in *module_name*, arranging things so @@ -34,16 +61,20 @@ def inject(module_name, new_globals, *additional_modules): _green_socket_modules() + _green_thread_modules() + _green_time_modules()) + + # after this we are gonna screw with sys.modules, so capture the + # state of all the modules we're going to mess with + saver = SysModulesSaver([name for name, m in additional_modules]) + saver.save(module_name) - ## Put the specified modules in sys.modules for the duration of the import - saved = {} + # Cover the target modules so that when you import the module it + # sees only the patched versions for name, mod in additional_modules: - saved[name] = sys.modules.get(name, None) sys.modules[name] = mod ## Remove the old module from sys.modules and reimport it while ## the specified modules are in place - old_module = sys.modules.pop(module_name, None) + sys.modules.pop(module_name, None) try: module = __import__(module_name, {}, {}, module_name.split('.')[:-1]) @@ -56,18 +87,7 @@ def inject(module_name, new_globals, *additional_modules): ## Keep a reference to the new module to prevent it from dying sys.modules[patched_name] = module finally: - ## Put the original module back - if old_module is not None: - sys.modules[module_name] = old_module - elif module_name in sys.modules: - del sys.modules[module_name] - - ## Put all the saved modules back - for name, mod in additional_modules: - if saved[name] is not None: - sys.modules[name] = saved[name] - else: - del sys.modules[name] + saver.restore() ## Put the original modules back return module @@ -86,8 +106,11 @@ def import_patched(module_name, *additional_modules, **kw_additional_modules): def patch_function(func, *additional_modules): - """Huge hack here -- patches the specified modules for the - duration of the function call.""" + """Decorator that returns a version of the function that patches + some modules for the duration of the function call. 
This is + deeply gross and should only be used for functions that import + network libraries within their function bodies that there is no + way of getting around.""" if not additional_modules: # supply some defaults additional_modules = ( @@ -98,35 +121,67 @@ def patch_function(func, *additional_modules): _green_time_modules()) def patched(*args, **kw): - saved = {} + saver = SysModulesSaver() for name, mod in additional_modules: - saved[name] = sys.modules.get(name, None) + saver.save(name) sys.modules[name] = mod try: return func(*args, **kw) finally: - ## Put all the saved modules back - for name, mod in additional_modules: - if saved[name] is not None: - sys.modules[name] = saved[name] - else: - del sys.modules[name] + saver.restore() return patched -_originals = {} -def original(modname): - mod = _originals.get(modname) - if mod is None: - # re-import the "pure" module and store it in the global _originals - # dict; be sure to restore whatever module had that name already - current_mod = sys.modules.pop(modname, None) +def _original_patch_function(func, *module_names): + """Kind of the contrapositive of patch_function: decorates a + function such that when it's called, sys.modules is populated only + with the unpatched versions of the specified modules. Unlike + patch_function, only the names of the modules need be supplied, + and there are no defaults. This is a gross hack; tell your kids not + to import inside function bodies!""" + def patched(*args, **kw): + saver = SysModulesSaver(module_names) + for name in module_names: + sys.modules[name] = original(name) try: - real_mod = __import__(modname, {}, {}, modname.split('.')[:-1]) - _originals[modname] = real_mod + return func(*args, **kw) finally: - if current_mod is not None: - sys.modules[modname] = current_mod - return _originals.get(modname) + saver.restore() + return patched + + +def original(modname): + """ This returns an unpatched version of a module; this is useful for + Eventlet itself (i.e. 
tpool).""" + # note that it's not necessary to temporarily install unpatched + # versions of all patchable modules during the import of the + # module; this is because none of them import each other, except + # for threading which imports thread + original_name = '__original_module_' + modname + if original_name in sys.modules: + return sys.modules.get(original_name) + + # re-import the "pure" module and store it in the global _originals + # dict; be sure to restore whatever module had that name already + saver = SysModulesSaver((modname,)) + sys.modules.pop(modname, None) + # install original thread module if we're getting the original + # threading module + if modname == 'threading': + saver.save('thread') + sys.modules['thread'] = original('thread') + try: + real_mod = __import__(modname, {}, {}, modname.split('.')[:-1]) + # hacky hack: Queue's constructor imports threading; therefore + # we wrap it with something that ensures it always gets the + # original threading + if modname == 'Queue': + real_mod.Queue.__init__ = _original_patch_function(real_mod.Queue.__init__, 'threading') + # save a reference to the unpatched module so it doesn't get lost + sys.modules[original_name] = real_mod + finally: + saver.restore() + + return sys.modules[original_name] already_patched = {} def monkey_patch(**on): @@ -154,6 +209,7 @@ def monkey_patch(**on): on.setdefault(modname, default_on) modules_to_patch = [] + patched_thread = False if on['os'] and not already_patched.get('os'): modules_to_patch += _green_os_modules() already_patched['os'] = True @@ -164,10 +220,7 @@ def monkey_patch(**on): modules_to_patch += _green_socket_modules() already_patched['socket'] = True if on['thread'] and not already_patched.get('thread'): - # hacks ahead - threading = original('threading') - import eventlet.green.threading as greenthreading - greenthreading._patch_main_thread(threading) + patched_thread = True modules_to_patch += _green_thread_modules() already_patched['thread'] = True if 
on['time'] and not already_patched.get('time'): @@ -188,11 +241,32 @@ def monkey_patch(**on): for name, mod in modules_to_patch: orig_mod = sys.modules.get(name) + if orig_mod is None: + orig_mod = __import__(name) for attr_name in mod.__patched__: patched_attr = getattr(mod, attr_name, None) if patched_attr is not None: setattr(orig_mod, attr_name, patched_attr) + # hacks ahead; this is necessary to prevent a KeyError on program exit + if patched_thread: + _patch_main_thread(sys.modules['threading']) + + +def _patch_main_thread(mod): + """This is some gnarly patching specific to the threading module; + threading will always be initialized prior to monkeypatching, and + its _active dict will have the wrong key (it uses the real thread + id but once it's patched it will use the greenlet ids); so what we + do is rekey the _active dict so that the main thread's entry uses + the greenthread key. Other threads' keys are ignored.""" + thread = original('thread') + curthread = mod._active.pop(thread.get_ident(), None) + if curthread: + import eventlet.green.thread + mod._active[eventlet.green.thread.get_ident()] = curthread + + def is_monkey_patched(module): """Returns True if the given module is monkeypatched currently, False if not. *module* can be either the module itself or its name. 
diff --git a/eventlet/timeout.py b/eventlet/timeout.py index 135545b..573f4ab 100644 --- a/eventlet/timeout.py +++ b/eventlet/timeout.py @@ -57,7 +57,7 @@ class Timeout(BaseException): '%r is already started; to restart it, cancel it first' % self if self.seconds is None: # "fake" timeout (never expires) self.timer = None - elif self.exception is None or self.exception is False: # timeout that raises self + elif self.exception is None or isinstance(self.exception, bool): # timeout that raises self self.timer = get_hub().schedule_call_global( self.seconds, greenlet.getcurrent().throw, self) else: # regular timeout with user-provided exception @@ -112,7 +112,7 @@ class Timeout(BaseException): suffix = '' else: suffix = 's' - if self.exception is None: + if self.exception is None or self.exception is True: return '%s second%s' % (self.seconds, suffix) elif self.exception is False: return '%s second%s (silent)' % (self.seconds, suffix) diff --git a/eventlet/tpool.py b/eventlet/tpool.py index 7e9dd45..551cd4f 100644 --- a/eventlet/tpool.py +++ b/eventlet/tpool.py @@ -16,13 +16,14 @@ import os import sys -from Queue import Empty, Queue - from eventlet import event from eventlet import greenio from eventlet import greenthread from eventlet import patcher threading = patcher.original('threading') +Queue_module = patcher.original('Queue') +Queue = Queue_module.Queue +Empty = Queue_module.Empty __all__ = ['execute', 'Proxy', 'killall'] @@ -67,7 +68,10 @@ SYS_EXCS = (KeyboardInterrupt, SystemExit) def tworker(): global _reqq, _rspq while(True): - msg = _reqq.get() + try: + msg = _reqq.get() + except AttributeError: + return # can't get anything off of a dud queue if msg is None: return (e,meth,args,kwargs) = msg @@ -192,11 +196,16 @@ class Proxy(object): return proxy_call(self._autowrap, self._obj.__deepcopy__, memo) def __copy__(self, memo=None): return proxy_call(self._autowrap, self._obj.__copy__, memo) + def __call__(self, *a, **kw): + if '__call__' in 
self._autowrap_names: + return Proxy(proxy_call(self._autowrap, self._obj, *a, **kw)) + else: + return proxy_call(self._autowrap, self._obj, *a, **kw) # these don't go through a proxy call, because they're likely to # be called often, and are unlikely to be implemented on the # wrapped object in such a way that they would block def __eq__(self, rhs): - return self._obj.__eq__(rhs) + return self._obj == rhs def __hash__(self): return self._obj.__hash__() def __repr__(self): @@ -208,10 +217,11 @@ class Proxy(object): def __nonzero__(self): return bool(self._obj) def __iter__(self): - if iter(self._obj) == self._obj: + it = iter(self._obj) + if it == self._obj: return self else: - return Proxy(iter(self._obj)) + return Proxy(it) def next(self): return proxy_call(self._autowrap, self._obj.next) @@ -231,7 +241,7 @@ def setup(): _rpipe, _wpipe = os.pipe() _wfile = greenio.GreenPipe(_wpipe, 'wb', 0) _rfile = greenio.GreenPipe(_rpipe, 'rb', 0) - except ImportError: + except (ImportError, NotImplementedError): # This is Windows compatibility -- use a socket instead of a pipe because # pipes don't really exist on Windows. import socket @@ -248,7 +258,7 @@ def setup(): _reqq = Queue(maxsize=-1) _rspq = Queue(maxsize=-1) for i in range(0,_nthreads): - t = threading.Thread(target=tworker) + t = threading.Thread(target=tworker, name="tpool_thread_%s" % i) t.setDaemon(True) t.start() _threads.add(t) diff --git a/eventlet/twistedutil/join_reactor.py b/eventlet/twistedutil/join_reactor.py index 4e9ccc2..5964cbf 100644 --- a/eventlet/twistedutil/join_reactor.py +++ b/eventlet/twistedutil/join_reactor.py @@ -4,9 +4,8 @@ You generally don't have to use it unless you need to call reactor.run() yourself. 
""" from eventlet.hubs.twistedr import BaseTwistedHub -from eventlet import use_hub from eventlet.support import greenlets as greenlet -from eventlet.hubs import _threadlocal +from eventlet.hubs import _threadlocal, use_hub use_hub(BaseTwistedHub) assert not hasattr(_threadlocal, 'hub') diff --git a/eventlet/websocket.py b/eventlet/websocket.py index 5cd2447..fd35b75 100644 --- a/eventlet/websocket.py +++ b/eventlet/websocket.py @@ -1,5 +1,13 @@ import collections import errno +import string +import struct +from socket import error as SocketError + +try: + from hashlib import md5 +except ImportError: #pragma NO COVER + from md5 import md5 import eventlet from eventlet import semaphore @@ -9,11 +17,26 @@ from eventlet.support import get_errno ACCEPTABLE_CLIENT_ERRORS = set((errno.ECONNRESET, errno.EPIPE)) +__all__ = ["WebSocketWSGI", "WebSocket"] + class WebSocketWSGI(object): - """This is a WSGI application that serves up websocket connections. + """Wraps a websocket handler function in a WSGI application. + + Use it like this:: + + @websocket.WebSocketWSGI + def my_handler(ws): + from_browser = ws.wait() + ws.send("from server") + + The single argument to the function will be an instance of + :class:`WebSocket`. To close the socket, simply return from the + function. Note that the server will log the websocket request at + the time of closure. """ def __init__(self, handler): self.handler = handler + self.protocol_version = None def __call__(self, environ, start_response): if not (environ.get('HTTP_CONNECTION') == 'Upgrade' and @@ -21,61 +44,123 @@ class WebSocketWSGI(object): # need to check a few more things here for true compliance start_response('400 Bad Request', [('Connection','close')]) return [] + + # See if they sent the new-format headers + if 'HTTP_SEC_WEBSOCKET_KEY1' in environ: + self.protocol_version = 76 + if 'HTTP_SEC_WEBSOCKET_KEY2' not in environ: + # That's bad. 
+ start_response('400 Bad Request', [('Connection','close')]) + return [] + else: + self.protocol_version = 75 + # Get the underlying socket and wrap a WebSocket class around it sock = environ['eventlet.input'].get_socket() - ws = WebSocket(sock, environ) - handshake_reply = ("HTTP/1.1 101 Web Socket Protocol Handshake\r\n" - "Upgrade: WebSocket\r\n" - "Connection: Upgrade\r\n" - "WebSocket-Origin: %s\r\n" - "WebSocket-Location: ws://%s%s\r\n\r\n" % ( - environ.get('HTTP_ORIGIN'), - environ.get('HTTP_HOST'), - environ.get('PATH_INFO'))) + ws = WebSocket(sock, environ, self.protocol_version) + + # If it's new-version, we need to work out our challenge response + if self.protocol_version == 76: + key1 = self._extract_number(environ['HTTP_SEC_WEBSOCKET_KEY1']) + key2 = self._extract_number(environ['HTTP_SEC_WEBSOCKET_KEY2']) + # There's no content-length header in the request, but it has 8 + # bytes of data. + environ['wsgi.input'].content_length = 8 + key3 = environ['wsgi.input'].read(8) + key = struct.pack(">II", key1, key2) + key3 + response = md5(key).digest() + + # Start building the response + if self.protocol_version == 75: + handshake_reply = ("HTTP/1.1 101 Web Socket Protocol Handshake\r\n" + "Upgrade: WebSocket\r\n" + "Connection: Upgrade\r\n" + "WebSocket-Origin: %s\r\n" + "WebSocket-Location: ws://%s%s\r\n\r\n" % ( + environ.get('HTTP_ORIGIN'), + environ.get('HTTP_HOST'), + environ.get('PATH_INFO'))) + elif self.protocol_version == 76: + handshake_reply = ("HTTP/1.1 101 Web Socket Protocol Handshake\r\n" + "Upgrade: WebSocket\r\n" + "Connection: Upgrade\r\n" + "Sec-WebSocket-Origin: %s\r\n" + "Sec-WebSocket-Protocol: %s\r\n" + "Sec-WebSocket-Location: ws://%s%s\r\n" + "\r\n%s"% ( + environ.get('HTTP_ORIGIN'), + environ.get('HTTP_SEC_WEBSOCKET_PROTOCOL', 'default'), + environ.get('HTTP_HOST'), + environ.get('PATH_INFO'), + response)) + else: #pragma NO COVER + raise ValueError("Unknown WebSocket protocol version.") + sock.sendall(handshake_reply) try: 
self.handler(ws) except socket.error, e: if get_errno(e) not in ACCEPTABLE_CLIENT_ERRORS: raise + # Make sure we send the closing frame + ws._send_closing_frame(True) # use this undocumented feature of eventlet.wsgi to ensure that it # doesn't barf on the fact that we didn't call start_response return wsgi.ALREADY_HANDLED + def _extract_number(self, value): + """ + Utility function which, given a string like 'g98sd 5[]221@1', will + return 9852211. Used to parse the Sec-WebSocket-Key headers. + """ + out = "" + spaces = 0 + for char in value: + if char in string.digits: + out += char + elif char == " ": + spaces += 1 + return int(out) / spaces class WebSocket(object): - """The object representing the server side of a websocket. + """A websocket object that handles the details of + serialization/deserialization to the socket. - The primary way to interact with a WebSocket object is to call - :meth:`send` and :meth:`wait` in order to pass messages back and - forth with the client. Also available are the following properties: + The primary way to interact with a :class:`WebSocket` object is to + call :meth:`send` and :meth:`wait` in order to pass messages back + and forth with the browser. Also available are the following + properties: path - The path value of the request. This is the same as the WSGI PATH_INFO variable. + The path value of the request. This is the same as the WSGI PATH_INFO variable, but more convenient. protocol The value of the Websocket-Protocol header. origin The value of the 'Origin' header. environ The full WSGI environment for this request. 
+ """ - def __init__(self, sock, environ): + def __init__(self, sock, environ, version=76): """ :param socket: The eventlet socket :type socket: :class:`eventlet.greenio.GreenSocket` :param environ: The wsgi environment + :param version: The WebSocket spec version to follow (default is 76) """ self.socket = sock self.origin = environ.get('HTTP_ORIGIN') self.protocol = environ.get('HTTP_WEBSOCKET_PROTOCOL') self.path = environ.get('PATH_INFO') self.environ = environ + self.version = version + self.websocket_closed = False self._buf = "" self._msgs = collections.deque() self._sendlock = semaphore.Semaphore() @staticmethod - def pack_message(message): + def _pack_message(message): """Pack the message inside ``00`` and ``FF`` As per the dataframing section (5.3) for the websocket spec @@ -87,11 +172,10 @@ class WebSocket(object): packed = "\x00%s\xFF" % message return packed - def parse_messages(self): + def _parse_messages(self): """ Parses for messages in the buffer *buf*. It is assumed that the buffer contains the start character for a message, but that it - may contain only part of the rest of the message. NOTE: only understands - lengthless messages for now. + may contain only part of the rest of the message. Returns an array of messages, and the buffer remainder that didn't contain any full messages.""" @@ -99,20 +183,29 @@ class WebSocket(object): end_idx = 0 buf = self._buf while buf: - assert ord(buf[0]) == 0, "Don't understand how to parse this type of message: %r" % buf - end_idx = buf.find("\xFF") - if end_idx == -1: #pragma NO COVER + frame_type = ord(buf[0]) + if frame_type == 0: + # Normal message. + end_idx = buf.find("\xFF") + if end_idx == -1: #pragma NO COVER + break + msgs.append(buf[1:end_idx].decode('utf-8', 'replace')) + buf = buf[end_idx+1:] + elif frame_type == 255: + # Closing handshake. 
+ assert ord(buf[1]) == 0, "Unexpected closing handshake: %r" % buf + self.websocket_closed = True break - msgs.append(buf[1:end_idx].decode('utf-8', 'replace')) - buf = buf[end_idx+1:] + else: + raise ValueError("Don't understand how to parse this type of message: %r" % buf) self._buf = buf return msgs def send(self, message): - """Send a message to the client. *message* should be + """Send a message to the browser. *message* should be convertable to a string; unicode objects should be encodable as utf-8.""" - packed = self.pack_message(message) + packed = self._pack_message(message) # if two greenthreads are trying to send at the same time # on the same socket, sendlock prevents interleaving and corruption self._sendlock.acquire() @@ -125,18 +218,34 @@ class WebSocket(object): """Waits for and deserializes messages. Returns a single message; the oldest not yet processed.""" while not self._msgs: + # Websocket might be closed already. + if self.websocket_closed: + return None # no parsed messages, must mean buf needs more data delta = self.socket.recv(8096) if delta == '': return None self._buf += delta - msgs = self.parse_messages() + msgs = self._parse_messages() self._msgs.extend(msgs) return self._msgs.popleft() + def _send_closing_frame(self, ignore_send_errors=False): + """Sends the closing frame to the client, if required.""" + if self.version == 76 and not self.websocket_closed: + try: + self.socket.sendall("\xff\x00") + except SocketError: + # Sometimes, like when the remote side cuts off the connection, + # we don't care about this. 
+ if not ignore_send_errors: #pragma NO COVER + raise + self.websocket_closed = True + def close(self): """Forcibly close the websocket; generally it is preferable to return from the handler method.""" + self._send_closing_frame() self.socket.shutdown(True) self.socket.close() diff --git a/eventlet/wsgi.py b/eventlet/wsgi.py index 1b46395..36ceee7 100644 --- a/eventlet/wsgi.py +++ b/eventlet/wsgi.py @@ -90,36 +90,54 @@ class Input(object): self.position += len(read) return read - def _chunked_read(self, rfile, length=None): + def _chunked_read(self, rfile, length=None, use_readline=False): if self.wfile is not None: ## 100 Continue self.wfile.write(self.wfile_line) self.wfile = None self.wfile_line = None - - response = [] try: - if length is None: - if self.chunk_length > self.position: - response.append(rfile.read(self.chunk_length - self.position)) - while self.chunk_length != 0: - self.chunk_length = int(rfile.readline(), 16) - response.append(rfile.read(self.chunk_length)) - rfile.readline() + if length == 0: + return "" + + if length < 0: + length = None + + if use_readline: + reader = self.rfile.readline else: - while length > 0 and self.chunk_length != 0: - if self.chunk_length > self.position: - response.append(rfile.read( - min(self.chunk_length - self.position, length))) - length -= len(response[-1]) - self.position += len(response[-1]) - if self.chunk_length == self.position: - rfile.readline() - else: - self.chunk_length = int(rfile.readline(), 16) - self.position = 0 - if not self.chunk_length: - rfile.readline() + reader = self.rfile.read + + response = [] + while self.chunk_length != 0: + maxreadlen = self.chunk_length - self.position + if length is not None and length < maxreadlen: + maxreadlen = length + + if maxreadlen > 0: + data = reader(maxreadlen) + if not data: + self.chunk_length = 0 + raise IOError("unexpected end of file while parsing chunked data") + + datalen = len(data) + response.append(data) + + self.position += datalen + if 
self.chunk_length == self.position: + rfile.readline() + + if length is not None: + length -= datalen + if length == 0: + break + if use_readline and data[-1] == "\n": + break + else: + self.chunk_length = int(rfile.readline().split(";", 1)[0], 16) + self.position = 0 + if self.chunk_length == 0: + rfile.readline() except greenio.SSL.ZeroReturnError: pass return ''.join(response) @@ -130,7 +148,10 @@ class Input(object): return self._do_read(self.rfile.read, length) def readline(self, size=None): - return self._do_read(self.rfile.readline) + if self.chunked_input: + return self._chunked_read(self.rfile, size, True) + else: + return self._do_read(self.rfile.readline, size) def readlines(self, hint=None): return self._do_read(self.rfile.readlines, hint) @@ -312,7 +333,8 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler): try: try: result = self.application(self.environ, start_response) - if isinstance(result, _AlreadyHandled): + if (isinstance(result, _AlreadyHandled) + or isinstance(getattr(result, '_obj', None), _AlreadyHandled)): self.close_connection = 1 return if not headers_sent and hasattr(result, '__len__') and \ @@ -345,8 +367,9 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler): finally: if hasattr(result, 'close'): result.close() - if (self.environ['eventlet.input'].position - < self.environ.get('CONTENT_LENGTH', 0)): + if (self.environ['eventlet.input'].chunked_input or + self.environ['eventlet.input'].position \ + < self.environ['eventlet.input'].content_length): ## Read and discard body if there was no pending 100-continue if not self.environ['eventlet.input'].wfile: while self.environ['eventlet.input'].read(MINIMUM_CHUNK_SIZE): diff --git a/examples/producer_consumer.py b/examples/producer_consumer.py index b335f7d..6f6c82f 100644 --- a/examples/producer_consumer.py +++ b/examples/producer_consumer.py @@ -2,7 +2,7 @@ it doesn't respect robots.txt and it is pretty brutal about how quickly it fetches pages. 
-This is a kind of "producer/consumer" example; the producer function produces +This is a kind of "producer/consumer" example; the fetch function produces jobs, and the GreenPool itself is the consumer, farming out work concurrently. It's easier to write it this way rather than writing a standard consumer loop; GreenPool handles any exceptions raised and arranges so that there's a set @@ -43,10 +43,10 @@ def producer(start_url): # limit requests to eventlet.net so we don't crash all over the internet if url not in seen and 'eventlet.net' in url: seen.add(url) - pool.spawn(fetch, url, q) + pool.spawn_n(fetch, url, q) return seen seen = producer("http://eventlet.net") print "I saw these urls:" -print "\n".join(seen) \ No newline at end of file +print "\n".join(seen) diff --git a/examples/recursive_crawler.py b/examples/recursive_crawler.py new file mode 100644 index 0000000..2e8701e --- /dev/null +++ b/examples/recursive_crawler.py @@ -0,0 +1,49 @@ +"""This is a recursive web crawler. Don't go pointing this at random sites; +it doesn't respect robots.txt and it is pretty brutal about how quickly it +fetches pages. + +The code for this is very short; this is perhaps a good indication +that this is making the most effective use of the primitives at hand. +The fetch function does all the work of making http requests, +searching for new urls, and dispatching new fetches. The GreenPool +acts as sort of a job coordinator (and concurrency controller of +course). 
+""" +from __future__ import with_statement + +from eventlet.green import urllib2 +import eventlet +import re + +# http://daringfireball.net/2009/11/liberal_regex_for_matching_urls +url_regex = re.compile(r'\b(([\w-]+://?|www[.])[^\s()<>]+(?:\([\w\d]+\)|([^[:punct:]\s]|/)))') + + +def fetch(url, seen, pool): + """Fetch a url, stick any found urls into the seen set, and + dispatch any new ones to the pool.""" + print "fetching", url + data = '' + with eventlet.Timeout(5, False): + data = urllib2.urlopen(url).read() + for url_match in url_regex.finditer(data): + new_url = url_match.group(0) + # only send requests to eventlet.net so as not to destroy the internet + if new_url not in seen and 'eventlet.net' in new_url: + seen.add(new_url) + # while this seems stack-recursive, it's actually not: + # spawned greenthreads start their own stacks + pool.spawn_n(fetch, new_url, seen, pool) + +def crawl(start_url): + """Recursively crawl starting from *start_url*. Returns a set of + urls that were found.""" + pool = eventlet.GreenPool() + seen = set() + fetch(start_url, seen, pool) + pool.waitall() + return seen + +seen = crawl("http://eventlet.net") +print "I saw these urls:" +print "\n".join(seen) diff --git a/examples/websocket.py b/examples/websocket.py index 9bac0e2..ae8181f 100644 --- a/examples/websocket.py +++ b/examples/websocket.py @@ -5,6 +5,8 @@ from eventlet import websocket # demo app import os import random + +@websocket.WebSocketWSGI def handle(ws): """ This is the websocket handler function. 
Note that we can dispatch based on path in here, too.""" @@ -20,12 +22,11 @@ def handle(ws): ws.send("0 %s %s\n" % (i, random.random())) eventlet.sleep(0.1) -wsapp = websocket.WebSocketWSGI(handle) def dispatch(environ, start_response): """ This resolves to the web page or the websocket depending on the path.""" if environ['PATH_INFO'] == '/data': - return wsapp(environ, start_response) + return handle(environ, start_response) else: start_response('200 OK', [('content-type', 'text/html')]) return [open(os.path.join( diff --git a/tests/__init__.py b/tests/__init__.py index 6349a96..5884f09 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -191,6 +191,7 @@ def get_database_auth(): try: import simplejson except ImportError: + print "No simplejson, using baked-in db credentials." return retval if 'EVENTLET_DB_TEST_AUTH' in os.environ: diff --git a/tests/convenience_test.py b/tests/convenience_test.py index 64aba0a..7b86861 100644 --- a/tests/convenience_test.py +++ b/tests/convenience_test.py @@ -1,7 +1,12 @@ +import os + import eventlet from eventlet import event from tests import LimitedTestCase, s2b +certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt') +private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key') + class TestServe(LimitedTestCase): def setUp(self): super(TestServe, self).setUp() @@ -101,3 +106,17 @@ class TestServe(LimitedTestCase): timeout_value="timed out") self.assertEquals(x, "timed out") + def test_wrap_ssl(self): + server = eventlet.wrap_ssl(eventlet.listen(('localhost', 0)), + certfile=certificate_file, + keyfile=private_key_file, server_side=True) + port = server.getsockname()[1] + def handle(sock,addr): + sock.sendall(sock.recv(1024)) + raise eventlet.StopServe() + eventlet.spawn(eventlet.serve, server, handle) + client = eventlet.wrap_ssl(eventlet.connect(('localhost', port))) + client.sendall("echo") + self.assertEquals("echo", client.recv(1024)) + + diff --git a/tests/env_test.py 
b/tests/env_test.py new file mode 100644 index 0000000..a24cde0 --- /dev/null +++ b/tests/env_test.py @@ -0,0 +1,59 @@ +import os +from tests.patcher_test import ProcessBase +from tests import skip_with_pyevent + +class Socket(ProcessBase): + def test_patched_thread(self): + new_mod = """from eventlet.green import socket +socket.gethostbyname('localhost') +socket.getaddrinfo('localhost', 80) +""" + os.environ['EVENTLET_TPOOL_DNS'] = 'yes' + try: + self.write_to_tempfile("newmod", new_mod) + output, lines = self.launch_subprocess('newmod.py') + self.assertEqual(len(lines), 1, lines) + finally: + del os.environ['EVENTLET_TPOOL_DNS'] + +class Tpool(ProcessBase): + @skip_with_pyevent + def test_tpool_size(self): + new_mod = """from eventlet import tpool +import eventlet +import time +current = [0] +highwater = [0] +def count(): + current[0] += 1 + time.sleep(0.04) + if current[0] > highwater[0]: + highwater[0] = current[0] + current[0] -= 1 +expected = 40 +p = eventlet.GreenPool() +for i in xrange(expected): + p.spawn(tpool.execute,count) +p.waitall() +assert highwater[0] == expected, "%s != %s" % (highwater[0], expected)""" + os.environ['EVENTLET_THREADPOOL_SIZE'] = "40" + try: + self.write_to_tempfile("newmod", new_mod) + output, lines = self.launch_subprocess('newmod.py') + self.assertEqual(len(lines), 1, lines) + finally: + del os.environ['EVENTLET_THREADPOOL_SIZE'] + +class Hub(ProcessBase): + def test_eventlet_hub(self): + new_mod = """from eventlet import hubs +print hubs.get_hub() +""" + os.environ['EVENTLET_HUB'] = 'selects' + try: + self.write_to_tempfile("newmod", new_mod) + output, lines = self.launch_subprocess('newmod.py') + self.assertEqual(len(lines), 2, "\n".join(lines)) + self.assert_("selects" in lines[0]) + finally: + del os.environ['EVENTLET_HUB'] diff --git a/tests/hub_test.py b/tests/hub_test.py index 8eac85d..d3f9561 100644 --- a/tests/hub_test.py +++ b/tests/hub_test.py @@ -154,6 +154,42 @@ class TestHubBlockingDetector(LimitedTestCase): 
self.assertRaises(RuntimeError, gt.wait) debug.hub_blocking_detection(False) +class TestSuspend(LimitedTestCase): + TEST_TIMEOUT=3 + def test_suspend_doesnt_crash(self): + import errno + import os + import shutil + import signal + import subprocess + import sys + import tempfile + self.tempdir = tempfile.mkdtemp('test_suspend') + filename = os.path.join(self.tempdir, 'test_suspend.py') + fd = open(filename, "w") + fd.write("""import eventlet +eventlet.Timeout(0.5) +try: + eventlet.listen(("127.0.0.1", 0)).accept() +except eventlet.Timeout: + print "exited correctly" +""") + fd.close() + python_path = os.pathsep.join(sys.path + [self.tempdir]) + new_env = os.environ.copy() + new_env['PYTHONPATH'] = python_path + p = subprocess.Popen([sys.executable, + os.path.join(self.tempdir, filename)], + stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=new_env) + eventlet.sleep(0.4) # wait for process to hit accept + os.kill(p.pid, signal.SIGSTOP) # suspend and resume to generate EINTR + os.kill(p.pid, signal.SIGCONT) + output, _ = p.communicate() + lines = [l for l in output.split("\n") if l] + self.assert_("exited correctly" in lines[-1]) + shutil.rmtree(self.tempdir) + + class Foo(object): pass diff --git a/tests/patcher_psycopg_test.py b/tests/patcher_psycopg_test.py index f63801e..863db43 100644 --- a/tests/patcher_psycopg_test.py +++ b/tests/patcher_psycopg_test.py @@ -30,11 +30,11 @@ def fetch(num, secs): f = eventlet.spawn(fetch, 2, 1) t = eventlet.spawn(tick, 2, 100) f.wait() -assert count[0] > 150 +assert count[0] > 100, count[0] print "done" """ -class PatchingPsycopg(patcher_test.Patcher): +class PatchingPsycopg(patcher_test.ProcessBase): def test_psycopg_pached(self): if 'PSYCOPG_TEST_DSN' not in os.environ: # construct a non-json dsn for the subprocess @@ -50,5 +50,5 @@ class PatchingPsycopg(patcher_test.Patcher): print "Can't test psycopg2 patching; it's not installed." 
return # if there's anything wrong with the test program it'll have a stack trace - self.assert_(lines[0].startswith('done'), repr(output)) + self.assert_(lines[0].startswith('done'), output) diff --git a/tests/patcher_test.py b/tests/patcher_test.py index 739cc31..f409e86 100644 --- a/tests/patcher_test.py +++ b/tests/patcher_test.py @@ -4,7 +4,7 @@ import subprocess import sys import tempfile -from tests import LimitedTestCase, main +from tests import LimitedTestCase, main, skip_with_pyevent base_module_contents = """ import socket @@ -27,7 +27,7 @@ import socket print "importing", patching, socket, patching.socket, patching.urllib """ -class Patcher(LimitedTestCase): +class ProcessBase(LimitedTestCase): TEST_TIMEOUT=3 # starting processes is time-consuming def setUp(self): self._saved_syspath = sys.path @@ -55,7 +55,7 @@ class Patcher(LimitedTestCase): return output, lines -class ImportPatched(Patcher): +class ImportPatched(ProcessBase): def test_patch_a_module(self): self.write_to_tempfile("base", base_module_contents) self.write_to_tempfile("patching", patching_module_contents) @@ -85,7 +85,7 @@ print "newmod", base, base.socket, base.urllib.socket.socket self.assert_('GreenSocket' in lines[1], repr(output)) -class MonkeyPatch(Patcher): +class MonkeyPatch(ProcessBase): def test_patched_modules(self): new_mod = """ from eventlet import patcher @@ -126,22 +126,6 @@ print "newmod" self.assertEqual(len(lines), 2, repr(output)) self.assert_(lines[0].startswith('newmod'), repr(output)) - def test_tpool(self): - new_mod = """ -import eventlet -from eventlet import patcher -patcher.monkey_patch() -from eventlet import tpool -print "newmod", tpool.execute(len, "hi") -print "newmod", tpool.execute(len, "hi2") -""" - self.write_to_tempfile("newmod", new_mod) - output, lines = self.launch_subprocess('newmod.py') - self.assertEqual(len(lines), 3, repr(output)) - self.assert_(lines[0].startswith('newmod'), repr(output)) - self.assert_('2' in lines[0], repr(output)) - 
self.assert_('3' in lines[1], repr(output)) - def test_typeerror(self): new_mod = """ @@ -172,7 +156,7 @@ print "already_patched", ",".join(sorted(patcher.already_patched.keys())) self.assert_(lines[0].startswith(ap), repr(output)) patched_modules = lines[0][len(ap):].strip() # psycopg might or might not be patched based on installed modules - patched_modules.replace("psycopg,", "") + patched_modules = patched_modules.replace("psycopg,", "") self.assertEqual(patched_modules, expected, "Logic:%s\nExpected: %s != %s" %(call, expected, patched_modules)) @@ -217,5 +201,73 @@ print "already_patched", ",".join(sorted(patcher.already_patched.keys())) "select=True)", 'select') + +test_monkey_patch_threading = """ +def test_monkey_patch_threading(): + tickcount = [0] + def tick(): + for i in xrange(1000): + tickcount[0] += 1 + eventlet.sleep() + + def do_sleep(): + tpool.execute(time.sleep, 0.5) + + eventlet.spawn(tick) + w1 = eventlet.spawn(do_sleep) + w1.wait() + print tickcount[0] + assert tickcount[0] > 900 + tpool.killall() +""" + +class Tpool(ProcessBase): + TEST_TIMEOUT=3 + + @skip_with_pyevent + def test_simple(self): + new_mod = """ +import eventlet +from eventlet import patcher +patcher.monkey_patch() +from eventlet import tpool +print "newmod", tpool.execute(len, "hi") +print "newmod", tpool.execute(len, "hi2") +tpool.killall() +""" + self.write_to_tempfile("newmod", new_mod) + output, lines = self.launch_subprocess('newmod.py') + self.assertEqual(len(lines), 3, output) + self.assert_(lines[0].startswith('newmod'), repr(output)) + self.assert_('2' in lines[0], repr(output)) + self.assert_('3' in lines[1], repr(output)) + + @skip_with_pyevent + def test_unpatched_thread(self): + new_mod = """import eventlet +eventlet.monkey_patch(time=False, thread=False) +from eventlet import tpool +import time +""" + new_mod += test_monkey_patch_threading + new_mod += "\ntest_monkey_patch_threading()\n" + self.write_to_tempfile("newmod", new_mod) + output, lines = 
self.launch_subprocess('newmod.py') + self.assertEqual(len(lines), 2, lines) + + @skip_with_pyevent + def test_patched_thread(self): + new_mod = """import eventlet +eventlet.monkey_patch(time=False, thread=True) +from eventlet import tpool +import time +""" + new_mod += test_monkey_patch_threading + new_mod += "\ntest_monkey_patch_threading()\n" + self.write_to_tempfile("newmod", new_mod) + output, lines = self.launch_subprocess('newmod.py') + self.assertEqual(len(lines), 2, "\n".join(lines)) + + if __name__ == '__main__': main() diff --git a/tests/timeout_test_with_statement.py b/tests/timeout_test_with_statement.py index 48fe0b5..21779ce 100644 --- a/tests/timeout_test_with_statement.py +++ b/tests/timeout_test_with_statement.py @@ -15,7 +15,7 @@ class Error(Exception): pass class Test(LimitedTestCase): - def test_api(self): + def test_cancellation(self): # Nothing happens if with-block finishes before the timeout expires t = Timeout(DELAY*2) sleep(0) # make it pending @@ -27,6 +27,7 @@ class Test(LimitedTestCase): assert not t.pending, repr(t) sleep(DELAY*2) + def test_raising_self(self): # An exception will be raised if it's not try: with Timeout(DELAY) as t: @@ -36,6 +37,17 @@ class Test(LimitedTestCase): else: raise AssertionError('must raise Timeout') + def test_raising_self_true(self): + # specifying True as the exception raises self as well + try: + with Timeout(DELAY, True) as t: + sleep(DELAY*2) + except Timeout, ex: + assert ex is t, (ex, t) + else: + raise AssertionError('must raise Timeout') + + def test_raising_custom_exception(self): # You can customize the exception raised: try: with Timeout(DELAY, IOError("Operation takes way too long")): @@ -43,6 +55,7 @@ class Test(LimitedTestCase): except IOError, ex: assert str(ex)=="Operation takes way too long", repr(ex) + def test_raising_exception_class(self): # Providing classes instead of values should be possible too: try: with Timeout(DELAY, ValueError): @@ -50,6 +63,7 @@ class Test(LimitedTestCase): 
except ValueError: pass + def test_raising_exc_tuple(self): try: 1//0 except: @@ -63,12 +77,16 @@ class Test(LimitedTestCase): else: raise AssertionError('should not get there') + def test_cancel_timer_inside_block(self): # It's possible to cancel the timer inside the block: with Timeout(DELAY) as timer: timer.cancel() sleep(DELAY*2) - # To silent the exception before exiting the block, pass False as second parameter. + + def test_silent_block(self): + # To silence the exception before exiting the block, pass + # False as second parameter. XDELAY=0.1 start = time.time() with Timeout(XDELAY, False): @@ -76,6 +94,8 @@ class Test(LimitedTestCase): delta = (time.time()-start) assert delta