Ryan Williams
2010-06-24 13:16:52 -07:00
37 changed files with 1268 additions and 213 deletions

10
AUTHORS

@@ -1,3 +1,7 @@
Maintainer (i.e., Who To Hassle If You Find Bugs)
-------------------------------------------------
Ryan Williams, rdw on Freenode, breath@alum.mit.edu
Original Authors
----------------
* Bob Ippolito
@@ -12,6 +16,7 @@ Contributors
* Mike Barton
* Patrick Carlisle
* Ben Ford
* Andrew Godwin
* Brantley Harris
* Gregory Holt
* Joe Malicki
@@ -23,6 +28,7 @@ Contributors
* Sergey Shepelev
* Chuck Thier
* Daniele Varrazzo
* Ryan Williams

Linden Lab Contributors
-----------------------
@@ -50,4 +56,6 @@ Thanks To
* Slant, better iterator implementation in tpool
* Ambroff, nice pygtk hub example
* Michael Carter, and Marcin Bachry, nice repro of a bug and good diagnosis leading to the fix
* David Ziegler, reporting issue #53
* Favo Yang, twisted hub patch
* Schmir, patch that fixes readline method with chunked encoding in wsgi.py

17
NEWS

@@ -1,3 +1,20 @@
0.9.9
=====
* A fix for monkeypatching on systems with psycopg version 2.0.14.
* Improved support for chunked transfers in wsgi, plus a bunch of tests from schmir (ported from gevent by redbo)
* A fix for the twisted hub from Favo Yang
0.9.8
=====
* Support for psycopg2's asynchronous mode, from Daniele Varrazzo
* websocket module is now part of core Eventlet with 100% unit test coverage thanks to Ben Ford. See its documentation at http://eventlet.net/doc/modules/websocket.html
* Added wrap_ssl convenience method, meaning that we truly no longer need api or util modules.
* Multiple-reader detection code protects against the common mistake of having multiple greenthreads read from the same socket at the same time, which can be overridden if you know what you're doing.
* Cleaner monkey_patch API: the "all" keyword is no longer necessary.
* Pool objects have a more convenient constructor -- no more need to subclass
* amajorek's reimplementation of GreenPipe
* Many bug fixes, major and minor.
0.9.7
=====
* GreenPipe is now a context manager (thanks, quad)


@@ -74,6 +74,8 @@ Network Convenience Functions
.. autofunction:: eventlet.listen
.. autofunction:: eventlet.wrap_ssl
.. autofunction:: eventlet.serve
.. autofunction:: eventlet.StopServe

26
doc/environment.rst Normal file

@@ -0,0 +1,26 @@
Environment Variables
======================

Eventlet's behavior can be controlled by a few environment variables.
These are only for the advanced user.

EVENTLET_HUB

    Used to force Eventlet to use the specified hub instead of the
    optimal one.  See :ref:`understanding_hubs` for the list of
    acceptable hubs and what they mean (note that picking a hub not on
    the list will silently fail).  Equivalent to calling
    :meth:`eventlet.hubs.use_hub` at the beginning of the program.

EVENTLET_THREADPOOL_SIZE

    The size of the threadpool in :mod:`~eventlet.tpool`.  This is an
    environment variable because tpool constructs its pool on first
    use, so any control of the pool size needs to happen before then.

EVENTLET_TPOOL_DNS

    If set to 'yes', uses :func:`eventlet.tpool.execute` to call
    :func:`~socket.gethostbyname` and :func:`~socket.getaddrinfo`,
    making them appear non-blocking.  This environment variable is
    ignored on OS X.
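Because tpool sizes its pool at first use, `EVENTLET_THREADPOOL_SIZE` must be set before any tpool call. A minimal stand-alone sketch of this read-once pattern; the helper name and the default of 20 are illustrative assumptions, not eventlet's API:

```python
import os

# Hypothetical helper mirroring how an env-driven pool size is read
# once at first use; the default of 20 is an assumption for this sketch.
def pool_size(default=20):
    return int(os.environ.get("EVENTLET_THREADPOOL_SIZE", default))

os.environ["EVENTLET_THREADPOOL_SIZE"] = "5"
print(pool_size())  # -> 5
```

Setting the variable after the pool is constructed would have no effect, which is why it has to be an environment variable rather than a function argument.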


@@ -64,12 +64,33 @@ Port Forwarder
.. literalinclude:: ../examples/forwarder.py

.. _recursive_crawler_example:

Recursive Web Crawler
-----------------------------------------
``examples/recursive_crawler.py``

This is an example recursive web crawler that fetches linked pages from a seed url.

.. literalinclude:: ../examples/recursive_crawler.py

.. _producer_consumer_example:

-Producer Consumer/Recursive Web Crawler
+Producer Consumer Web Crawler
-----------------------------------------
``examples/producer_consumer.py``

-This is an example implementation of the producer/consumer pattern as well as a functional recursive web crawler.
+This is an example implementation of the producer/consumer pattern as well as being identical in functionality to the recursive web crawler.

.. literalinclude:: ../examples/producer_consumer.py
.. _websocket_example:
Websocket Server Example
--------------------------
``examples/websocket.py``
This exercises some of the features of the websocket server
implementation.
.. literalinclude:: ../examples/websocket.py


@@ -31,6 +31,7 @@ Contents
   threading
   hubs
   testing
   environment
   modules


@@ -15,4 +15,5 @@ Module Reference
   modules/queue
   modules/semaphore
   modules/timeout
   modules/websocket
   modules/wsgi

30
doc/modules/websocket.rst Normal file

@@ -0,0 +1,30 @@
:mod:`websocket` -- Websocket Server
=====================================

This module provides a simple way to create a `websocket
<http://dev.w3.org/html5/websockets/>`_ server.  It works with a few
tweaks in the :mod:`~eventlet.wsgi` module that allow websockets to
coexist with other WSGI applications.

To create a websocket server, simply decorate a handler method with
:class:`WebSocketWSGI` and use it as a wsgi application::

    from eventlet import wsgi, websocket
    import eventlet

    @websocket.WebSocketWSGI
    def hello_world(ws):
        ws.send("hello world")

    wsgi.server(eventlet.listen(('', 8090)), hello_world)

You can find a slightly more elaborate version of this code in the file
``examples/websocket.py``.

**Note** that the web socket spec is still under development, and it
will be necessary to change the way that this module works in response
to spec changes.

.. automodule:: eventlet.websocket
    :members:


@@ -41,7 +41,7 @@ easy_install eventlet
<p>Alternately, you can download the source tarball:
<ul>
-<li><a href="http://pypi.python.org/packages/source/e/eventlet/eventlet-0.9.7.tar.gz">eventlet-0.9.7.tar.gz</a></li>
+<li><a href="http://pypi.python.org/packages/source/e/eventlet/eventlet-0.9.9.tar.gz">eventlet-0.9.9.tar.gz</a></li>
</ul>
</p>


@@ -23,6 +23,8 @@ That's it! The output from running nose is the same as unittest's output, if th
Many tests are skipped based on environmental factors; for example, it makes no sense to test Twisted-specific functionality when Twisted is not installed.  These are printed as S's during execution, and in the summary printed after the tests run it will tell you how many were skipped.

.. note:: If running Python version 2.4, use this command instead: ``python tests/nosewrapper.py``.  There are several tests which make use of the `with` statement and therefore will cause nose grief when it tries to import them; nosewrapper.py excludes these tests so they are skipped.

Doctests
--------


@@ -1,4 +1,4 @@
-version_info = (0, 9, 7, "dev1")
+version_info = (0, 9, 10, "dev1")
__version__ = ".".join(map(str, version_info))

try:
@@ -31,6 +31,7 @@ try:
    listen = convenience.listen
    serve = convenience.serve
    StopServe = convenience.StopServe
    wrap_ssl = convenience.wrap_ssl
    getcurrent = greenlet.greenlet.getcurrent


@@ -68,6 +68,8 @@ def ssl_listener(address, certificate, private_key):
    Returns a socket object on which one should call ``accept()`` to
    accept a connection on the newly bound socket.
    """
    warnings.warn("""eventlet.api.ssl_listener is deprecated. Please use eventlet.wrap_ssl(eventlet.listen()) instead.""",
        DeprecationWarning, stacklevel=2)
    from eventlet import util
    import socket


@@ -97,11 +97,14 @@ def serve(sock, handle, concurrency=1000):
def wrap_ssl(sock, keyfile=None, certfile=None, server_side=False,
-             cert_reqs=None, ssl_version=None, ca_certs=None,
+             cert_reqs=0, ssl_version=2, ca_certs=None,
             do_handshake_on_connect=True, suppress_ragged_eofs=True):
-    """Convenience function for converting a regular socket into an SSL
-    socket.  Has the same interface as :func:`ssl.wrap_socket`, but
-    works on 2.5 or earlier, using PyOpenSSL.
+    """Convenience function for converting a regular socket into an
+    SSL socket.  Has the same interface as :func:`ssl.wrap_socket`,
+    but works on 2.5 or earlier, using PyOpenSSL (though note that it
+    ignores the *cert_reqs*, *ssl_version*, *ca_certs*,
+    *do_handshake_on_connect*, and *suppress_ragged_eofs* arguments
+    when using PyOpenSSL).

    The preferred idiom is to call wrap_ssl directly on the creation
    method, e.g., ``wrap_ssl(connect(addr))`` or
@@ -111,4 +114,41 @@ def wrap_ssl(sock, keyfile=None, certfile=None, server_side=False,
    :return Green SSL object.
    """
-    pass
+    return wrap_ssl_impl(sock, keyfile=keyfile, certfile=certfile,
+                         server_side=server_side,
+                         cert_reqs=cert_reqs,
+                         ssl_version=ssl_version,
+                         ca_certs=ca_certs,
+                         do_handshake_on_connect=do_handshake_on_connect,
+                         suppress_ragged_eofs=suppress_ragged_eofs)

try:
    from eventlet.green import ssl
    wrap_ssl_impl = ssl.wrap_socket
except ImportError:
    # < 2.6, trying PyOpenSSL
    from eventlet.green.OpenSSL import SSL
    try:
        def wrap_ssl_impl(sock, keyfile=None, certfile=None, server_side=False,
                          cert_reqs=None, ssl_version=None, ca_certs=None,
                          do_handshake_on_connect=True, suppress_ragged_eofs=True):
            # theoretically the ssl_version could be respected in this
            # next line
            context = SSL.Context(SSL.SSLv23_METHOD)
            if certfile is not None:
                context.use_certificate_file(certfile)
            if keyfile is not None:
                context.use_privatekey_file(keyfile)
            context.set_verify(SSL.VERIFY_NONE, lambda *x: True)
            connection = SSL.Connection(context, sock)
            if server_side:
                connection.set_accept_state()
            else:
                connection.set_connect_state()
            return connection
    except ImportError:
        def wrap_ssl_impl(*a, **kw):
            raise ImportError("To use SSL with Eventlet, "
                              "you must install PyOpenSSL or use Python 2.6 or later.")
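The cascade above is a standard import-fallback pattern: prefer the stdlib implementation, fall back to an alternative package, and otherwise install a stub that raises a helpful error. A self-contained sketch of the same pattern; the function name and return values are illustrative, not eventlet's:

```python
# Pick an implementation at import time, mirroring how wrap_ssl_impl
# is chosen above.  Return values are markers for illustration only.
try:
    import ssl as _ssl  # stdlib implementation (Python 2.6+)
    def wrap_impl(sock, **kw):
        return "stdlib-ssl"
except ImportError:
    try:
        import OpenSSL  # PyOpenSSL fallback
        def wrap_impl(sock, **kw):
            return "pyopenssl"
    except ImportError:
        # last resort: a stub that fails loudly when actually used
        def wrap_impl(*a, **kw):
            raise ImportError("install PyOpenSSL or use Python 2.6 or later")

print(wrap_impl(None))
```

The key design point is that the error is deferred: importing the module always succeeds, and the ImportError only fires if a caller actually tries to use SSL without any backend available.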


@@ -7,10 +7,11 @@ import linecache
import re
import inspect

-__all__ = ['spew', 'unspew', 'format_hub_listeners', 'hub_listener_stacks',
-           'hub_exceptions', 'tpool_exceptions']
+__all__ = ['spew', 'unspew', 'format_hub_listeners', 'format_hub_timers',
+           'hub_listener_stacks', 'hub_exceptions', 'tpool_exceptions',
+           'hub_prevent_multiple_readers', 'hub_timer_stacks']

-_token_spliter = re.compile('\W+')
+_token_splitter = re.compile('\W+')

class Spew(object):
    """
@@ -42,7 +43,7 @@ class Spew(object):
        if not self.show_values:
            return self
        details = []
-        tokens = _token_spliter.split(line)
+        tokens = _token_splitter.split(line)
        for tok in tokens:
            if tok in frame.f_globals:
                details.append('%s=%r' % (tok, frame.f_globals[tok]))


@@ -12,8 +12,8 @@ os = __import__('os')
import sys
import warnings

-__patched__ = ['fromfd', 'socketpair', 'gethostbyname', 'create_connection',
-               'ssl', 'socket']
+__patched__ = ['fromfd', 'socketpair', 'gethostbyname', 'getaddrinfo',
+               'create_connection', 'ssl', 'socket']

try:
    __original_fromfd__ = __socket.fromfd
@@ -31,20 +31,11 @@ except AttributeError:
    pass

__original_gethostbyname__ = __socket.gethostbyname
-def gethostbyname(name):
-    can_use_tpool = os.environ.get("EVENTLET_TPOOL_GETHOSTBYNAME",
-                                   '').lower() == "yes"
-    if getattr(get_hub(), 'uses_twisted_reactor', None):
-        globals()['gethostbyname'] = _gethostbyname_twisted
-    elif sys.platform.startswith('darwin') or not can_use_tpool:
-        # the thread primitives on Darwin have some bugs that make
-        # it undesirable to use tpool for hostname lookups
-        globals()['gethostbyname'] = __original_gethostbyname__
-    else:
-        globals()['gethostbyname'] = _gethostbyname_tpool
-    return globals()['gethostbyname'](name)
+# the thread primitives on Darwin have some bugs that make
+# it undesirable to use tpool for hostname lookups
+_can_use_tpool = (
+    os.environ.get("EVENTLET_TPOOL_DNS",'').lower() == "yes"
+    and not sys.platform.startswith('darwin'))

def _gethostbyname_twisted(name):
    from twisted.internet import reactor
    from eventlet.twistedutil import block_on as _block_on
@@ -55,12 +46,25 @@ def _gethostbyname_tpool(name):
    return tpool.execute(
        __original_gethostbyname__, name)

-# def getaddrinfo(*args, **kw):
-#     return tpool.execute(
-#         __socket.getaddrinfo, *args, **kw)
-#
-# XXX there're few more blocking functions in socket
-# XXX having a hub-independent way to access thread pool would be nice
+if getattr(get_hub(), 'uses_twisted_reactor', None):
+    gethostbyname = _gethostbyname_twisted
+elif _can_use_tpool:
+    gethostbyname = _gethostbyname_tpool
+else:
+    gethostbyname = __original_gethostbyname__
+
+__original_getaddrinfo__ = __socket.getaddrinfo
+def _getaddrinfo_tpool(*args, **kw):
+    from eventlet import tpool
+    return tpool.execute(
+        __original_getaddrinfo__, *args, **kw)
+
+if _can_use_tpool:
+    getaddrinfo = _getaddrinfo_tpool
+else:
+    getaddrinfo = __original_getaddrinfo__

def create_connection(address, timeout=_GLOBAL_DEFAULT_TIMEOUT):
    """Connect to *address* and return the socket object.

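The socket hunk above makes getaddrinfo appear non-blocking by offloading the real call to a thread pool via `tpool.execute`. A rough stand-alone sketch of the same idea using only the standard library; the resolver here is a stand-in function, not a real DNS call:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_lookup(name):
    # stand-in for a blocking resolver call such as getaddrinfo
    time.sleep(0.01)
    return (name, "127.0.0.1")

def lookup_offloaded(name):
    # run the blocking call on a helper thread so the caller stays
    # responsive; this mirrors the tpool.execute idea
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(blocking_lookup, name).result()

print(lookup_offloaded("localhost"))
```

In eventlet the caller is a greenthread and `tpool.execute` suspends only that greenthread while the OS thread does the blocking work, which is why the lookups "appear non-blocking" to the rest of the program.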

@@ -1,4 +1,4 @@
-"""implements standard module 'thread' with greenlets"""
+"""Implements the standard thread module, using greenthreads."""
__thread = __import__('thread')
from eventlet.support import greenlets as greenlet
from eventlet import greenthread
@@ -49,4 +49,4 @@ if hasattr(__thread, 'stack_size'):
        pass
        # not going to decrease stack_size, because otherwise other greenlets in this thread will suffer

from eventlet.corolocal import local as _local


@@ -11,13 +11,3 @@ patcher.inject('threading',
               ('time', time))

del patcher
def _patch_main_thread(mod):
# this is some gnarly patching for the threading module;
# if threading is imported before we patch (it nearly always is),
# then the main thread will have the wrong key in therading._active,
# so, we try and replace that key with the correct one here
# this works best if there are no other threads besides the main one
curthread = mod._active.pop(mod._get_ident(), None)
if curthread:
mod._active[thread.get_ident()] = curthread


@@ -77,7 +77,7 @@ class Hub(BaseHub):
            return
        try:
            presult = self.do_poll(seconds)
-        except select.error, e:
+        except (IOError, select.error), e:
            if get_errno(e) == errno.EINTR:
                return
            raise


@@ -5,6 +5,33 @@ __all__ = ['inject', 'import_patched', 'monkey_patch', 'is_monkey_patched']
__exclude = set(('__builtins__', '__file__', '__name__'))
class SysModulesSaver(object):
"""Class that captures some subset of the current state of
sys.modules. Pass in an iterator of module names to the
constructor."""
def __init__(self, module_names=()):
self._saved = {}
self.save(*module_names)
def save(self, *module_names):
"""Saves the named modules to the object."""
for modname in module_names:
self._saved[modname] = sys.modules.get(modname, None)
def restore(self):
"""Restores the modules that the saver knows about into
sys.modules.
"""
for modname, mod in self._saved.iteritems():
if mod is not None:
sys.modules[modname] = mod
else:
try:
del sys.modules[modname]
except KeyError:
pass
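The SysModulesSaver class above is the core of the patching cleanup: snapshot some entries of sys.modules, do destructive work, then put everything back. A simplified stand-alone re-creation (modern-Python spelling, illustrative class name):

```python
import sys

class ModulesSnapshot(object):
    """Simplified re-creation of the SysModulesSaver idea: remember
    what sys.modules held for some names, then restore it later."""
    def __init__(self, names=()):
        self._saved = {}
        self.save(*names)

    def save(self, *names):
        for name in names:
            # None records "this module was absent"
            self._saved[name] = sys.modules.get(name)

    def restore(self):
        for name, mod in self._saved.items():
            if mod is not None:
                sys.modules[name] = mod
            else:
                sys.modules.pop(name, None)

import json
snap = ModulesSnapshot(['json'])
sys.modules['json'] = object()   # simulate covering the module
snap.restore()
import json                      # the real module is back
print(json.dumps([1, 2]))
```

Recording absent modules as None matters: restore() must delete a name that did not exist before, not leave the patched module behind.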
def inject(module_name, new_globals, *additional_modules):
    """Base method for "injecting" greened modules into an imported module.  It
    imports the module specified in *module_name*, arranging things so
@@ -34,16 +61,20 @@ def inject(module_name, new_globals, *additional_modules):
            _green_socket_modules() +
            _green_thread_modules() +
            _green_time_modules())

    # after this we are gonna screw with sys.modules, so capture the
    # state of all the modules we're going to mess with
    saver = SysModulesSaver([name for name, m in additional_modules])
    saver.save(module_name)

-    ## Put the specified modules in sys.modules for the duration of the import
-    saved = {}
+    # Cover the target modules so that when you import the module it
+    # sees only the patched versions
    for name, mod in additional_modules:
-        saved[name] = sys.modules.get(name, None)
        sys.modules[name] = mod

    ## Remove the old module from sys.modules and reimport it while
    ## the specified modules are in place
-    old_module = sys.modules.pop(module_name, None)
+    sys.modules.pop(module_name, None)
    try:
        module = __import__(module_name, {}, {}, module_name.split('.')[:-1])
@@ -56,18 +87,7 @@ def inject(module_name, new_globals, *additional_modules):
        ## Keep a reference to the new module to prevent it from dying
        sys.modules[patched_name] = module
    finally:
-        ## Put the original module back
-        if old_module is not None:
-            sys.modules[module_name] = old_module
-        elif module_name in sys.modules:
-            del sys.modules[module_name]
-        ## Put all the saved modules back
-        for name, mod in additional_modules:
-            if saved[name] is not None:
-                sys.modules[name] = saved[name]
-            else:
-                del sys.modules[name]
+        saver.restore()  ## Put the original modules back
    return module
@@ -86,8 +106,11 @@ def import_patched(module_name, *additional_modules, **kw_additional_modules):
def patch_function(func, *additional_modules):
-    """Huge hack here -- patches the specified modules for the
-    duration of the function call."""
+    """Decorator that returns a version of the function that patches
+    some modules for the duration of the function call.  This is
+    deeply gross and should only be used for functions that import
+    network libraries within their function bodies that there is no
+    way of getting around."""
    if not additional_modules:
        # supply some defaults
        additional_modules = (
@@ -98,35 +121,67 @@ def patch_function(func, *additional_modules):
            _green_time_modules())

    def patched(*args, **kw):
-        saved = {}
+        saver = SysModulesSaver()
        for name, mod in additional_modules:
-            saved[name] = sys.modules.get(name, None)
+            saver.save(name)
            sys.modules[name] = mod
        try:
            return func(*args, **kw)
        finally:
-            ## Put all the saved modules back
-            for name, mod in additional_modules:
-                if saved[name] is not None:
-                    sys.modules[name] = saved[name]
-                else:
-                    del sys.modules[name]
+            saver.restore()
    return patched
-_originals = {}
-def original(modname):
-    mod = _originals.get(modname)
-    if mod is None:
-        # re-import the "pure" module and store it in the global _originals
-        # dict; be sure to restore whatever module had that name already
-        current_mod = sys.modules.pop(modname, None)
-        try:
-            real_mod = __import__(modname, {}, {}, modname.split('.')[:-1])
-            _originals[modname] = real_mod
-        finally:
-            if current_mod is not None:
-                sys.modules[modname] = current_mod
-    return _originals.get(modname)

def _original_patch_function(func, *module_names):
    """Kind of the contrapositive of patch_function: decorates a
    function such that when it's called, sys.modules is populated only
    with the unpatched versions of the specified modules.  Unlike
    patch_function, only the names of the modules need be supplied,
    and there are no defaults.  This is a gross hack; tell your kids not
    to import inside function bodies!"""
    def patched(*args, **kw):
        saver = SysModulesSaver(module_names)
        for name in module_names:
            sys.modules[name] = original(name)
        try:
            return func(*args, **kw)
        finally:
            saver.restore()
    return patched

def original(modname):
    """This returns an unpatched version of a module; this is useful for
    Eventlet itself (i.e. tpool)."""
    # note that it's not necessary to temporarily install unpatched
    # versions of all patchable modules during the import of the
    # module; this is because none of them import each other, except
    # for threading which imports thread
    original_name = '__original_module_' + modname
    if original_name in sys.modules:
        return sys.modules.get(original_name)
    # re-import the "pure" module and store it in the global _originals
    # dict; be sure to restore whatever module had that name already
    saver = SysModulesSaver((modname,))
    sys.modules.pop(modname, None)
    # install original thread module if we're getting the original
    # threading module
    if modname == 'threading':
        saver.save('thread')
        sys.modules['thread'] = original('thread')
    try:
        real_mod = __import__(modname, {}, {}, modname.split('.')[:-1])
        # hacky hack: Queue's constructor imports threading; therefore
        # we wrap it with something that ensures it always gets the
        # original threading
        if modname == 'Queue':
            real_mod.Queue.__init__ = _original_patch_function(
                real_mod.Queue.__init__, 'threading')
        # save a reference to the unpatched module so it doesn't get lost
        sys.modules[original_name] = real_mod
    finally:
        saver.restore()
    return sys.modules[original_name]
already_patched = {}

def monkey_patch(**on):
@@ -154,6 +209,7 @@ def monkey_patch(**on):
        on.setdefault(modname, default_on)

    modules_to_patch = []
    patched_thread = False
    if on['os'] and not already_patched.get('os'):
        modules_to_patch += _green_os_modules()
        already_patched['os'] = True
@@ -164,10 +220,7 @@ def monkey_patch(**on):
        modules_to_patch += _green_socket_modules()
        already_patched['socket'] = True
    if on['thread'] and not already_patched.get('thread'):
-        # hacks ahead
-        threading = original('threading')
-        import eventlet.green.threading as greenthreading
-        greenthreading._patch_main_thread(threading)
+        patched_thread = True
        modules_to_patch += _green_thread_modules()
        already_patched['thread'] = True
    if on['time'] and not already_patched.get('time'):
@@ -188,11 +241,32 @@ def monkey_patch(**on):
    for name, mod in modules_to_patch:
        orig_mod = sys.modules.get(name)
        if orig_mod is None:
            orig_mod = __import__(name)
        for attr_name in mod.__patched__:
            patched_attr = getattr(mod, attr_name, None)
            if patched_attr is not None:
                setattr(orig_mod, attr_name, patched_attr)
    # hacks ahead; this is necessary to prevent a KeyError on program exit
    if patched_thread:
        _patch_main_thread(sys.modules['threading'])

def _patch_main_thread(mod):
    """This is some gnarly patching specific to the threading module;
    threading will always be initialized prior to monkeypatching, and
    its _active dict will have the wrong key (it uses the real thread
    id but once it's patched it will use the greenlet ids); so what we
    do is rekey the _active dict so that the main thread's entry uses
    the greenthread key.  Other threads' keys are ignored."""
    thread = original('thread')
    curthread = mod._active.pop(thread.get_ident(), None)
    if curthread:
        import eventlet.green.thread
        mod._active[eventlet.green.thread.get_ident()] = curthread

def is_monkey_patched(module):
    """Returns True if the given module is monkeypatched currently, False if
    not.  *module* can be either the module itself or its name.


@@ -57,7 +57,7 @@ class Timeout(BaseException):
                '%r is already started; to restart it, cancel it first' % self
        if self.seconds is None:  # "fake" timeout (never expires)
            self.timer = None
-        elif self.exception is None or self.exception is False:  # timeout that raises self
+        elif self.exception is None or isinstance(self.exception, bool):  # timeout that raises self
            self.timer = get_hub().schedule_call_global(
                self.seconds, greenlet.getcurrent().throw, self)
        else:  # regular timeout with user-provided exception
@@ -112,7 +112,7 @@ class Timeout(BaseException):
            suffix = ''
        else:
            suffix = 's'
-        if self.exception is None:
+        if self.exception is None or self.exception is True:
            return '%s second%s' % (self.seconds, suffix)
        elif self.exception is False:
            return '%s second%s (silent)' % (self.seconds, suffix)
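The two hunks above make `exception=True` behave like `exception=None` (the timeout raises itself), while `exception=False` remains the silent case. A stand-alone sketch of the corrected string-formatting branches; the function name is illustrative:

```python
def timeout_summary(seconds, exception=None):
    """Sketch of the corrected branch logic: True now behaves like
    None (the timeout raises itself), False stays the silent case."""
    suffix = '' if seconds == 1 else 's'
    if exception is None or exception is True:
        return '%s second%s' % (seconds, suffix)
    elif exception is False:
        return '%s second%s (silent)' % (seconds, suffix)
    # any other value is a user-provided exception to raise
    return '%s second%s: %r' % (seconds, suffix, exception)

print(timeout_summary(5, True))   # -> 5 seconds
print(timeout_summary(1, False))  # -> 1 second (silent)
```

Note the matching fix in `__init__` uses `isinstance(self.exception, bool)` so that both True and False schedule the timer that throws the Timeout instance itself.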


@@ -16,13 +16,14 @@
import os
import sys
-from Queue import Empty, Queue

from eventlet import event
from eventlet import greenio
from eventlet import greenthread
from eventlet import patcher
threading = patcher.original('threading')
Queue_module = patcher.original('Queue')
Queue = Queue_module.Queue
Empty = Queue_module.Empty

__all__ = ['execute', 'Proxy', 'killall']
@@ -67,7 +68,10 @@ SYS_EXCS = (KeyboardInterrupt, SystemExit)
def tworker():
    global _reqq, _rspq
    while(True):
-        msg = _reqq.get()
+        try:
+            msg = _reqq.get()
+        except AttributeError:
+            return # can't get anything off of a dud queue
        if msg is None:
            return
        (e,meth,args,kwargs) = msg
@@ -192,11 +196,16 @@ class Proxy(object):
        return proxy_call(self._autowrap, self._obj.__deepcopy__, memo)
    def __copy__(self, memo=None):
        return proxy_call(self._autowrap, self._obj.__copy__, memo)
    def __call__(self, *a, **kw):
        if '__call__' in self._autowrap_names:
            return Proxy(proxy_call(self._autowrap, self._obj, *a, **kw))
        else:
            return proxy_call(self._autowrap, self._obj, *a, **kw)
    # these don't go through a proxy call, because they're likely to
    # be called often, and are unlikely to be implemented on the
    # wrapped object in such a way that they would block
    def __eq__(self, rhs):
-        return self._obj.__eq__(rhs)
+        return self._obj == rhs
    def __hash__(self):
        return self._obj.__hash__()
    def __repr__(self):
@@ -208,10 +217,11 @@ class Proxy(object):
    def __nonzero__(self):
        return bool(self._obj)
    def __iter__(self):
-        if iter(self._obj) == self._obj:
+        it = iter(self._obj)
+        if it == self._obj:
            return self
        else:
-            return Proxy(iter(self._obj))
+            return Proxy(it)
    def next(self):
        return proxy_call(self._autowrap, self._obj.next)
@@ -231,7 +241,7 @@ def setup():
        _rpipe, _wpipe = os.pipe()
        _wfile = greenio.GreenPipe(_wpipe, 'wb', 0)
        _rfile = greenio.GreenPipe(_rpipe, 'rb', 0)
-    except ImportError:
+    except (ImportError, NotImplementedError):
        # This is Windows compatibility -- use a socket instead of a pipe because
        # pipes don't really exist on Windows.
        import socket
@@ -248,7 +258,7 @@ def setup():
    _reqq = Queue(maxsize=-1)
    _rspq = Queue(maxsize=-1)
    for i in range(0,_nthreads):
-        t = threading.Thread(target=tworker)
+        t = threading.Thread(target=tworker, name="tpool_thread_%s" % i)
        t.setDaemon(True)
        t.start()
        _threads.add(t)
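The setup changes above give each worker thread a name and mark it as a daemon so it cannot keep the process alive. A minimal stand-alone sketch of the same worker-queue arrangement, using the modern `queue` module in place of Python 2's `Queue`:

```python
import threading
from queue import Queue  # the "Queue" module on the Python 2 of this diff

reqq = Queue(maxsize=-1)
rspq = Queue(maxsize=-1)

def tworker():
    # mirrors the worker loop above: block on the request queue,
    # run the job, and exit when a None sentinel arrives
    while True:
        msg = reqq.get()
        if msg is None:
            return
        func, args = msg
        rspq.put(func(*args))

threads = []
for i in range(2):
    t = threading.Thread(target=tworker, name="tpool_thread_%s" % i)
    t.daemon = True
    t.start()
    threads.append(t)

reqq.put((pow, (2, 10)))
print(rspq.get())  # -> 1024
for t in threads:
    reqq.put(None)  # one sentinel per worker shuts the pool down
```

In eventlet the response side is a pipe that wakes the event hub rather than a plain queue, but the request side works essentially like this.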


@@ -4,9 +4,8 @@ You generally don't have to use it unless you need to call reactor.run()
yourself.
"""
from eventlet.hubs.twistedr import BaseTwistedHub
-from eventlet import use_hub
from eventlet.support import greenlets as greenlet
-from eventlet.hubs import _threadlocal
+from eventlet.hubs import _threadlocal, use_hub

use_hub(BaseTwistedHub)
assert not hasattr(_threadlocal, 'hub')


@@ -1,5 +1,13 @@
import collections
import errno
import string
import struct
from socket import error as SocketError
try:
    from hashlib import md5
except ImportError: #pragma NO COVER
    from md5 import md5

import eventlet
from eventlet import semaphore
@@ -9,11 +17,26 @@ from eventlet.support import get_errno
 ACCEPTABLE_CLIENT_ERRORS = set((errno.ECONNRESET, errno.EPIPE))
+__all__ = ["WebSocketWSGI", "WebSocket"]
 class WebSocketWSGI(object):
-    """This is a WSGI application that serves up websocket connections.
+    """Wraps a websocket handler function in a WSGI application.
+
+    Use it like this::
+
+      @websocket.WebSocketWSGI
+      def my_handler(ws):
+          from_browser = ws.wait()
+          ws.send("from server")
+
+    The single argument to the function will be an instance of
+    :class:`WebSocket`.  To close the socket, simply return from the
+    function.  Note that the server will log the websocket request at
+    the time of closure.
     """
     def __init__(self, handler):
         self.handler = handler
+        self.protocol_version = None
     def __call__(self, environ, start_response):
         if not (environ.get('HTTP_CONNECTION') == 'Upgrade' and
@@ -21,61 +44,123 @@ class WebSocketWSGI(object):
             # need to check a few more things here for true compliance
             start_response('400 Bad Request', [('Connection','close')])
             return []
+
+        # See if they sent the new-format headers
+        if 'HTTP_SEC_WEBSOCKET_KEY1' in environ:
+            self.protocol_version = 76
+            if 'HTTP_SEC_WEBSOCKET_KEY2' not in environ:
+                # That's bad.
+                start_response('400 Bad Request', [('Connection','close')])
+                return []
+        else:
+            self.protocol_version = 75
+
+        # Get the underlying socket and wrap a WebSocket class around it
         sock = environ['eventlet.input'].get_socket()
-        ws = WebSocket(sock, environ)
-        handshake_reply = ("HTTP/1.1 101 Web Socket Protocol Handshake\r\n"
-                           "Upgrade: WebSocket\r\n"
-                           "Connection: Upgrade\r\n"
-                           "WebSocket-Origin: %s\r\n"
-                           "WebSocket-Location: ws://%s%s\r\n\r\n" % (
-                                environ.get('HTTP_ORIGIN'),
-                                environ.get('HTTP_HOST'),
-                                environ.get('PATH_INFO')))
+        ws = WebSocket(sock, environ, self.protocol_version)
+
+        # If it's new-version, we need to work out our challenge response
+        if self.protocol_version == 76:
+            key1 = self._extract_number(environ['HTTP_SEC_WEBSOCKET_KEY1'])
+            key2 = self._extract_number(environ['HTTP_SEC_WEBSOCKET_KEY2'])
+            # There's no content-length header in the request, but it has 8
+            # bytes of data.
+            environ['wsgi.input'].content_length = 8
+            key3 = environ['wsgi.input'].read(8)
+            key = struct.pack(">II", key1, key2) + key3
+            response = md5(key).digest()
+
+        # Start building the response
+        if self.protocol_version == 75:
+            handshake_reply = ("HTTP/1.1 101 Web Socket Protocol Handshake\r\n"
+                               "Upgrade: WebSocket\r\n"
+                               "Connection: Upgrade\r\n"
+                               "WebSocket-Origin: %s\r\n"
+                               "WebSocket-Location: ws://%s%s\r\n\r\n" % (
+                                    environ.get('HTTP_ORIGIN'),
+                                    environ.get('HTTP_HOST'),
+                                    environ.get('PATH_INFO')))
+        elif self.protocol_version == 76:
+            handshake_reply = ("HTTP/1.1 101 Web Socket Protocol Handshake\r\n"
+                               "Upgrade: WebSocket\r\n"
+                               "Connection: Upgrade\r\n"
+                               "Sec-WebSocket-Origin: %s\r\n"
+                               "Sec-WebSocket-Protocol: %s\r\n"
+                               "Sec-WebSocket-Location: ws://%s%s\r\n"
+                               "\r\n%s" % (
+                                    environ.get('HTTP_ORIGIN'),
+                                    environ.get('HTTP_SEC_WEBSOCKET_PROTOCOL', 'default'),
+                                    environ.get('HTTP_HOST'),
+                                    environ.get('PATH_INFO'),
+                                    response))
+        else: #pragma NO COVER
+            raise ValueError("Unknown WebSocket protocol version.")
+
         sock.sendall(handshake_reply)
         try:
             self.handler(ws)
         except socket.error, e:
             if get_errno(e) not in ACCEPTABLE_CLIENT_ERRORS:
                 raise
+        # Make sure we send the closing frame
+        ws._send_closing_frame(True)
         # use this undocumented feature of eventlet.wsgi to ensure that it
         # doesn't barf on the fact that we didn't call start_response
         return wsgi.ALREADY_HANDLED
+
+    def _extract_number(self, value):
+        """
+        Utility function which, given a string like 'g98sd 5[]221@1', will
+        return 9852211. Used to parse the Sec-WebSocket-Key headers.
+        """
+        out = ""
+        spaces = 0
+        for char in value:
+            if char in string.digits:
+                out += char
+            elif char == " ":
+                spaces += 1
+        return int(out) / spaces
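`_extract_number` and the `struct`/`md5` lines above together implement the draft-76 (hixie-76) challenge: keep the digits of each `Sec-WebSocket-Key` header, integer-divide by the key's space count, pack both numbers big-endian alongside the 8 raw body bytes, and md5 the result. A hedged standalone sketch of that arithmetic (function names are illustrative, and `//` stands in for Python 2's integer `/`):

```python
import string
import struct
from hashlib import md5

def extract_number(value):
    """Keep only the digits of a Sec-WebSocket-Key header, then
    integer-divide by the number of spaces, per draft-76."""
    out = ""
    spaces = 0
    for char in value:
        if char in string.digits:
            out += char
        elif char == " ":
            spaces += 1
    return int(out) // spaces

def challenge_response(key1, key2, key3):
    """Compute the 16-byte md5 challenge response; key3 is the 8 raw
    bytes that follow the request headers."""
    packed = struct.pack(">II", extract_number(key1),
                         extract_number(key2))
    return md5(packed + key3).digest()
```

The response bytes are appended verbatim after the blank line that ends the `101` handshake reply.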
 class WebSocket(object):
-    """The object representing the server side of a websocket.
+    """A websocket object that handles the details of
+    serialization/deserialization to the socket.
-    The primary way to interact with a WebSocket object is to call
-    :meth:`send` and :meth:`wait` in order to pass messages back and
-    forth with the client.  Also available are the following properties:
+    The primary way to interact with a :class:`WebSocket` object is to
+    call :meth:`send` and :meth:`wait` in order to pass messages back
+    and forth with the browser.  Also available are the following
+    properties:
     path
-        The path value of the request.  This is the same as the WSGI PATH_INFO variable.
+        The path value of the request.  This is the same as the WSGI PATH_INFO variable, but more convenient.
     protocol
         The value of the Websocket-Protocol header.
     origin
         The value of the 'Origin' header.
     environ
         The full WSGI environment for this request.
     """
-    def __init__(self, sock, environ):
+    def __init__(self, sock, environ, version=76):
         """
         :param socket: The eventlet socket
         :type socket: :class:`eventlet.greenio.GreenSocket`
         :param environ: The wsgi environment
+        :param version: The WebSocket spec version to follow (default is 76)
         """
         self.socket = sock
         self.origin = environ.get('HTTP_ORIGIN')
         self.protocol = environ.get('HTTP_WEBSOCKET_PROTOCOL')
         self.path = environ.get('PATH_INFO')
         self.environ = environ
+        self.version = version
+        self.websocket_closed = False
         self._buf = ""
         self._msgs = collections.deque()
         self._sendlock = semaphore.Semaphore()
     @staticmethod
-    def pack_message(message):
+    def _pack_message(message):
         """Pack the message inside ``00`` and ``FF``
         As per the dataframing section (5.3) for the websocket spec
@@ -87,11 +172,10 @@ class WebSocket(object):
         packed = "\x00%s\xFF" % message
         return packed
-    def parse_messages(self):
+    def _parse_messages(self):
         """ Parses for messages in the buffer *buf*.  It is assumed that
         the buffer contains the start character for a message, but that it
-        may contain only part of the rest of the message. NOTE: only understands
-        lengthless messages for now.
+        may contain only part of the rest of the message.
         Returns an array of messages, and the buffer remainder that
         didn't contain any full messages."""
@@ -99,20 +183,29 @@ class WebSocket(object):
         end_idx = 0
         buf = self._buf
         while buf:
-            assert ord(buf[0]) == 0, "Don't understand how to parse this type of message: %r" % buf
-            end_idx = buf.find("\xFF")
-            if end_idx == -1: #pragma NO COVER
-                break
-            msgs.append(buf[1:end_idx].decode('utf-8', 'replace'))
-            buf = buf[end_idx+1:]
+            frame_type = ord(buf[0])
+            if frame_type == 0:
+                # Normal message.
+                end_idx = buf.find("\xFF")
+                if end_idx == -1: #pragma NO COVER
+                    break
+                msgs.append(buf[1:end_idx].decode('utf-8', 'replace'))
+                buf = buf[end_idx+1:]
+            elif frame_type == 255:
+                # Closing handshake.
+                assert ord(buf[1]) == 0, "Unexpected closing handshake: %r" % buf
+                self.websocket_closed = True
+                break
+            else:
+                raise ValueError("Don't understand how to parse this type of message: %r" % buf)
         self._buf = buf
         return msgs
     def send(self, message):
-        """Send a message to the client.  *message* should be
+        """Send a message to the browser.  *message* should be
         convertible to a string; unicode objects should be encodable
         as utf-8."""
-        packed = self.pack_message(message)
+        packed = self._pack_message(message)
         # if two greenthreads are trying to send at the same time
         # on the same socket, sendlock prevents interleaving and corruption
         self._sendlock.acquire()
@@ -125,18 +218,34 @@ class WebSocket(object):
         """Waits for and deserializes messages. Returns a single
         message; the oldest not yet processed."""
         while not self._msgs:
+            # Websocket might be closed already.
+            if self.websocket_closed:
+                return None
             # no parsed messages, must mean buf needs more data
             delta = self.socket.recv(8096)
             if delta == '':
                 return None
             self._buf += delta
-            msgs = self.parse_messages()
+            msgs = self._parse_messages()
             self._msgs.extend(msgs)
         return self._msgs.popleft()
+
+    def _send_closing_frame(self, ignore_send_errors=False):
+        """Sends the closing frame to the client, if required."""
+        if self.version == 76 and not self.websocket_closed:
+            try:
+                self.socket.sendall("\xff\x00")
+            except SocketError:
+                # Sometimes, like when the remote side cuts off the connection,
+                # we don't care about this.
+                if not ignore_send_errors: #pragma NO COVER
+                    raise
+            self.websocket_closed = True
     def close(self):
         """Forcibly close the websocket; generally it is preferable to
         return from the handler method."""
+        self._send_closing_frame()
         self.socket.shutdown(True)
         self.socket.close()
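The framing these methods deal in is simple: a text frame is the UTF-8 payload wrapped between a 0x00 and a 0xFF byte, and draft-76 adds the 0xFF 0x00 closing handshake that `_send_closing_frame` emits. A simplified, socket-free sketch of the same packing and parsing (the function names and returned tuple are illustrative, not eventlet's API):

```python
def pack_message(message):
    """Frame a text message as 0x00 <payload> 0xFF."""
    return "\x00%s\xff" % message

def parse_messages(buf):
    """Return (messages, remainder, closed).  The remainder is any
    trailing partial frame; closed is True once the draft-76
    closing handshake (0xFF 0x00) is seen."""
    msgs = []
    closed = False
    while buf:
        frame_type = ord(buf[0])
        if frame_type == 0:
            end = buf.find("\xff")
            if end == -1:
                break                    # partial frame; need more data
            msgs.append(buf[1:end])
            buf = buf[end + 1:]
        elif frame_type == 255:
            closed = True                # closing handshake; drop the rest
            buf = ""
        else:
            raise ValueError("unknown frame type %r" % frame_type)
    return msgs, buf, closed
```

Note that nothing in this framing carries a length, which is why the parser must scan for the 0xFF terminator and buffer partial frames.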


@@ -90,36 +90,54 @@ class Input(object):
             self.position += len(read)
         return read
-    def _chunked_read(self, rfile, length=None):
+    def _chunked_read(self, rfile, length=None, use_readline=False):
         if self.wfile is not None:
             ## 100 Continue
             self.wfile.write(self.wfile_line)
             self.wfile = None
             self.wfile_line = None
-        response = []
         try:
-            if length is None:
-                if self.chunk_length > self.position:
-                    response.append(rfile.read(self.chunk_length - self.position))
-                while self.chunk_length != 0:
-                    self.chunk_length = int(rfile.readline(), 16)
-                    response.append(rfile.read(self.chunk_length))
-                    rfile.readline()
+            if length == 0:
+                return ""
+
+            if length < 0:
+                length = None
+
+            if use_readline:
+                reader = self.rfile.readline
             else:
-                while length > 0 and self.chunk_length != 0:
-                    if self.chunk_length > self.position:
-                        response.append(rfile.read(
-                                min(self.chunk_length - self.position, length)))
-                        length -= len(response[-1])
-                        self.position += len(response[-1])
-                        if self.chunk_length == self.position:
-                            rfile.readline()
-                    else:
-                        self.chunk_length = int(rfile.readline(), 16)
-                        self.position = 0
-                        if not self.chunk_length:
-                            rfile.readline()
+                reader = self.rfile.read
+
+            response = []
+            while self.chunk_length != 0:
+                maxreadlen = self.chunk_length - self.position
+                if length is not None and length < maxreadlen:
+                    maxreadlen = length
+
+                if maxreadlen > 0:
+                    data = reader(maxreadlen)
+                    if not data:
+                        self.chunk_length = 0
+                        raise IOError("unexpected end of file while parsing chunked data")
+
+                    datalen = len(data)
+                    response.append(data)
+
+                    self.position += datalen
+                    if self.chunk_length == self.position:
+                        rfile.readline()
+
+                    if length is not None:
+                        length -= datalen
+                        if length == 0:
+                            break
+                    if use_readline and data[-1] == "\n":
+                        break
+                else:
+                    self.chunk_length = int(rfile.readline().split(";", 1)[0], 16)
+                    self.position = 0
+                    if self.chunk_length == 0:
+                        rfile.readline()
         except greenio.SSL.ZeroReturnError:
             pass
         return ''.join(response)
@@ -130,7 +148,10 @@ class Input(object):
         return self._do_read(self.rfile.read, length)
     def readline(self, size=None):
-        return self._do_read(self.rfile.readline)
+        if self.chunked_input:
+            return self._chunked_read(self.rfile, size, True)
+        else:
+            return self._do_read(self.rfile.readline, size)
     def readlines(self, hint=None):
         return self._do_read(self.rfile.readlines, hint)
@@ -312,7 +333,8 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler):
         try:
             try:
                 result = self.application(self.environ, start_response)
-                if isinstance(result, _AlreadyHandled):
+                if (isinstance(result, _AlreadyHandled)
+                    or isinstance(getattr(result, '_obj', None), _AlreadyHandled)):
                     self.close_connection = 1
                     return
                 if not headers_sent and hasattr(result, '__len__') and \
@@ -345,8 +367,9 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler):
         finally:
             if hasattr(result, 'close'):
                 result.close()
-            if (self.environ['eventlet.input'].position
-                < self.environ.get('CONTENT_LENGTH', 0)):
+            if (self.environ['eventlet.input'].chunked_input or
+                    self.environ['eventlet.input'].position \
+                    < self.environ['eventlet.input'].content_length):
                 ## Read and discard body if there was no pending 100-continue
                 if not self.environ['eventlet.input'].wfile:
                     while self.environ['eventlet.input'].read(MINIMUM_CHUNK_SIZE):
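The rewritten `_chunked_read` walks `<hex-size>[;extensions]CRLF <data> CRLF` records until it reaches the zero-length chunk that terminates the body, which is also why the chunk-size line is now split on `;` before the hex parse. A minimal decoder for a fully buffered body, ignoring the socket and readline handling (`read_chunked` is an illustrative sketch, not eventlet's API):

```python
def read_chunked(body):
    """Decode an HTTP/1.1 chunked body held entirely in a string."""
    pos = 0
    out = []
    while True:
        eol = body.index("\r\n", pos)
        # chunk-size line may carry ";name=value" extensions; drop them
        size = int(body[pos:eol].split(";", 1)[0], 16)
        pos = eol + 2
        if size == 0:
            break                     # terminating zero-size chunk
        out.append(body[pos:pos + size])
        pos += size + 2               # skip the data and its trailing CRLF
    return "".join(out)
```

The real code additionally has to stop early when a caller-supplied `length` or a newline (for `readline`) is satisfied mid-chunk, which is what the `position`/`chunk_length` bookkeeping above tracks across calls.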


@@ -2,7 +2,7 @@
 it doesn't respect robots.txt and it is pretty brutal about how quickly it
 fetches pages.
-This is a kind of "producer/consumer" example; the producer function produces
+This is a kind of "producer/consumer" example; the fetch function produces
 jobs, and the GreenPool itself is the consumer, farming out work concurrently.
 It's easier to write it this way rather than writing a standard consumer loop;
 GreenPool handles any exceptions raised and arranges so that there's a set
@@ -43,10 +43,10 @@ def producer(start_url):
         # limit requests to eventlet.net so we don't crash all over the internet
         if url not in seen and 'eventlet.net' in url:
             seen.add(url)
-            pool.spawn(fetch, url, q)
+            pool.spawn_n(fetch, url, q)
     return seen
 seen = producer("http://eventlet.net")
 print "I saw these urls:"
 print "\n".join(seen)


@@ -0,0 +1,49 @@
"""This is a recursive web crawler. Don't go pointing this at random sites;
it doesn't respect robots.txt and it is pretty brutal about how quickly it
fetches pages.
The code for this is very short; this is perhaps a good indication
that it is making the most effective use of the primitives at hand.
The fetch function does all the work of making http requests,
searching for new urls, and dispatching new fetches. The GreenPool
acts as sort of a job coordinator (and concurrency controller of
course).
"""
from __future__ import with_statement
from eventlet.green import urllib2
import eventlet
import re
# http://daringfireball.net/2009/11/liberal_regex_for_matching_urls
url_regex = re.compile(r'\b(([\w-]+://?|www[.])[^\s()<>]+(?:\([\w\d]+\)|([^[:punct:]\s]|/)))')
def fetch(url, seen, pool):
"""Fetch a url, stick any found urls into the seen set, and
dispatch any new ones to the pool."""
print "fetching", url
data = ''
with eventlet.Timeout(5, False):
data = urllib2.urlopen(url).read()
for url_match in url_regex.finditer(data):
new_url = url_match.group(0)
# only send requests to eventlet.net so as not to destroy the internet
if new_url not in seen and 'eventlet.net' in new_url:
seen.add(new_url)
# while this seems stack-recursive, it's actually not:
# spawned greenthreads start their own stacks
pool.spawn_n(fetch, new_url, seen, pool)
def crawl(start_url):
"""Recursively crawl starting from *start_url*. Returns a set of
urls that were found."""
pool = eventlet.GreenPool()
seen = set()
fetch(start_url, seen, pool)
pool.waitall()
return seen
seen = crawl("http://eventlet.net")
print "I saw these urls:"
print "\n".join(seen)


@@ -5,6 +5,8 @@ from eventlet import websocket
 # demo app
 import os
 import random
+
+@websocket.WebSocketWSGI
 def handle(ws):
     """  This is the websocket handler function.  Note that we
     can dispatch based on path in here, too."""
@@ -20,12 +22,11 @@ def handle(ws):
             ws.send("0 %s %s\n" % (i, random.random()))
             eventlet.sleep(0.1)
-wsapp = websocket.WebSocketWSGI(handle)
 def dispatch(environ, start_response):
     """ This resolves to the web page or the websocket depending on
     the path."""
     if environ['PATH_INFO'] == '/data':
-        return wsapp(environ, start_response)
+        return handle(environ, start_response)
     else:
         start_response('200 OK', [('content-type', 'text/html')])
         return [open(os.path.join(


@@ -191,6 +191,7 @@ def get_database_auth():
     try:
         import simplejson
     except ImportError:
+        print "No simplejson, using baked-in db credentials."
         return retval
     if 'EVENTLET_DB_TEST_AUTH' in os.environ:


@@ -1,7 +1,12 @@
+import os
 import eventlet
 from eventlet import event
 from tests import LimitedTestCase, s2b
+certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt')
+private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key')
 class TestServe(LimitedTestCase):
     def setUp(self):
         super(TestServe, self).setUp()
@@ -101,3 +106,17 @@ class TestServe(LimitedTestCase):
                            timeout_value="timed out")
         self.assertEquals(x, "timed out")
+
+    def test_wrap_ssl(self):
+        server = eventlet.wrap_ssl(eventlet.listen(('localhost', 0)),
+                                   certfile=certificate_file,
+                                   keyfile=private_key_file, server_side=True)
+        port = server.getsockname()[1]
+        def handle(sock,addr):
+            sock.sendall(sock.recv(1024))
+            raise eventlet.StopServe()
+        eventlet.spawn(eventlet.serve, server, handle)
+        client = eventlet.wrap_ssl(eventlet.connect(('localhost', port)))
+        client.sendall("echo")
+        self.assertEquals("echo", client.recv(1024))

tests/env_test.py (new file)

@@ -0,0 +1,59 @@
import os
from tests.patcher_test import ProcessBase
from tests import skip_with_pyevent
class Socket(ProcessBase):
def test_patched_thread(self):
new_mod = """from eventlet.green import socket
socket.gethostbyname('localhost')
socket.getaddrinfo('localhost', 80)
"""
os.environ['EVENTLET_TPOOL_DNS'] = 'yes'
try:
self.write_to_tempfile("newmod", new_mod)
output, lines = self.launch_subprocess('newmod.py')
self.assertEqual(len(lines), 1, lines)
finally:
del os.environ['EVENTLET_TPOOL_DNS']
class Tpool(ProcessBase):
@skip_with_pyevent
def test_tpool_size(self):
new_mod = """from eventlet import tpool
import eventlet
import time
current = [0]
highwater = [0]
def count():
current[0] += 1
time.sleep(0.04)
if current[0] > highwater[0]:
highwater[0] = current[0]
current[0] -= 1
expected = 40
p = eventlet.GreenPool()
for i in xrange(expected):
p.spawn(tpool.execute,count)
p.waitall()
assert highwater[0] == expected, "%s != %s" % (highwater[0], expected)"""
os.environ['EVENTLET_THREADPOOL_SIZE'] = "40"
try:
self.write_to_tempfile("newmod", new_mod)
output, lines = self.launch_subprocess('newmod.py')
self.assertEqual(len(lines), 1, lines)
finally:
del os.environ['EVENTLET_THREADPOOL_SIZE']
class Hub(ProcessBase):
def test_eventlet_hub(self):
new_mod = """from eventlet import hubs
print hubs.get_hub()
"""
os.environ['EVENTLET_HUB'] = 'selects'
try:
self.write_to_tempfile("newmod", new_mod)
output, lines = self.launch_subprocess('newmod.py')
self.assertEqual(len(lines), 2, "\n".join(lines))
self.assert_("selects" in lines[0])
finally:
del os.environ['EVENTLET_HUB']


@@ -154,6 +154,42 @@ class TestHubBlockingDetector(LimitedTestCase):
         self.assertRaises(RuntimeError, gt.wait)
         debug.hub_blocking_detection(False)
class TestSuspend(LimitedTestCase):
TEST_TIMEOUT=3
def test_suspend_doesnt_crash(self):
import errno
import os
import shutil
import signal
import subprocess
import sys
import tempfile
self.tempdir = tempfile.mkdtemp('test_suspend')
filename = os.path.join(self.tempdir, 'test_suspend.py')
fd = open(filename, "w")
fd.write("""import eventlet
eventlet.Timeout(0.5)
try:
eventlet.listen(("127.0.0.1", 0)).accept()
except eventlet.Timeout:
print "exited correctly"
""")
fd.close()
python_path = os.pathsep.join(sys.path + [self.tempdir])
new_env = os.environ.copy()
new_env['PYTHONPATH'] = python_path
p = subprocess.Popen([sys.executable,
os.path.join(self.tempdir, filename)],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=new_env)
eventlet.sleep(0.4) # wait for process to hit accept
os.kill(p.pid, signal.SIGSTOP) # suspend and resume to generate EINTR
os.kill(p.pid, signal.SIGCONT)
output, _ = p.communicate()
lines = [l for l in output.split("\n") if l]
self.assert_("exited correctly" in lines[-1])
shutil.rmtree(self.tempdir)
class Foo(object):
    pass


@@ -30,11 +30,11 @@ def fetch(num, secs):
 f = eventlet.spawn(fetch, 2, 1)
 t = eventlet.spawn(tick, 2, 100)
 f.wait()
-assert count[0] > 150
+assert count[0] > 100, count[0]
 print "done"
 """
-class PatchingPsycopg(patcher_test.Patcher):
+class PatchingPsycopg(patcher_test.ProcessBase):
     def test_psycopg_pached(self):
         if 'PSYCOPG_TEST_DSN' not in os.environ:
             # construct a non-json dsn for the subprocess
@@ -50,5 +50,5 @@ class PatchingPsycopg(patcher_test.Patcher):
             print "Can't test psycopg2 patching; it's not installed."
             return
         # if there's anything wrong with the test program it'll have a stack trace
-        self.assert_(lines[0].startswith('done'), repr(output))
+        self.assert_(lines[0].startswith('done'), output)


@@ -4,7 +4,7 @@ import subprocess
 import sys
 import tempfile
-from tests import LimitedTestCase, main
+from tests import LimitedTestCase, main, skip_with_pyevent
 base_module_contents = """
 import socket
@@ -27,7 +27,7 @@ import socket
 print "importing", patching, socket, patching.socket, patching.urllib
 """
-class Patcher(LimitedTestCase):
+class ProcessBase(LimitedTestCase):
     TEST_TIMEOUT=3 # starting processes is time-consuming
     def setUp(self):
         self._saved_syspath = sys.path
@@ -55,7 +55,7 @@ class Patcher(LimitedTestCase):
         return output, lines
-class ImportPatched(Patcher):
+class ImportPatched(ProcessBase):
     def test_patch_a_module(self):
         self.write_to_tempfile("base", base_module_contents)
         self.write_to_tempfile("patching", patching_module_contents)
@@ -85,7 +85,7 @@ print "newmod", base, base.socket, base.urllib.socket.socket
         self.assert_('GreenSocket' in lines[1], repr(output))
-class MonkeyPatch(Patcher):
+class MonkeyPatch(ProcessBase):
     def test_patched_modules(self):
         new_mod = """
 from eventlet import patcher
@@ -126,22 +126,6 @@ print "newmod"
         self.assertEqual(len(lines), 2, repr(output))
         self.assert_(lines[0].startswith('newmod'), repr(output))
-    def test_tpool(self):
-        new_mod = """
-import eventlet
-from eventlet import patcher
-patcher.monkey_patch()
-from eventlet import tpool
-print "newmod", tpool.execute(len, "hi")
-print "newmod", tpool.execute(len, "hi2")
-"""
-        self.write_to_tempfile("newmod", new_mod)
-        output, lines = self.launch_subprocess('newmod.py')
-        self.assertEqual(len(lines), 3, repr(output))
-        self.assert_(lines[0].startswith('newmod'), repr(output))
-        self.assert_('2' in lines[0], repr(output))
-        self.assert_('3' in lines[1], repr(output))
     def test_typeerror(self):
         new_mod = """
@@ -172,7 +156,7 @@ print "already_patched", ",".join(sorted(patcher.already_patched.keys()))
         self.assert_(lines[0].startswith(ap), repr(output))
         patched_modules = lines[0][len(ap):].strip()
         # psycopg might or might not be patched based on installed modules
-        patched_modules.replace("psycopg,", "")
+        patched_modules = patched_modules.replace("psycopg,", "")
         self.assertEqual(patched_modules, expected,
                          "Logic:%s\nExpected: %s != %s" %(call, expected,
                                                          patched_modules))
@@ -217,5 +201,73 @@ print "already_patched", ",".join(sorted(patcher.already_patched.keys()))
                          "select=True)",
                          'select')
test_monkey_patch_threading = """
def test_monkey_patch_threading():
tickcount = [0]
def tick():
for i in xrange(1000):
tickcount[0] += 1
eventlet.sleep()
def do_sleep():
tpool.execute(time.sleep, 0.5)
eventlet.spawn(tick)
w1 = eventlet.spawn(do_sleep)
w1.wait()
print tickcount[0]
assert tickcount[0] > 900
tpool.killall()
"""
class Tpool(ProcessBase):
TEST_TIMEOUT=3
@skip_with_pyevent
def test_simple(self):
new_mod = """
import eventlet
from eventlet import patcher
patcher.monkey_patch()
from eventlet import tpool
print "newmod", tpool.execute(len, "hi")
print "newmod", tpool.execute(len, "hi2")
tpool.killall()
"""
self.write_to_tempfile("newmod", new_mod)
output, lines = self.launch_subprocess('newmod.py')
self.assertEqual(len(lines), 3, output)
self.assert_(lines[0].startswith('newmod'), repr(output))
self.assert_('2' in lines[0], repr(output))
self.assert_('3' in lines[1], repr(output))
@skip_with_pyevent
def test_unpatched_thread(self):
new_mod = """import eventlet
eventlet.monkey_patch(time=False, thread=False)
from eventlet import tpool
import time
"""
new_mod += test_monkey_patch_threading
new_mod += "\ntest_monkey_patch_threading()\n"
self.write_to_tempfile("newmod", new_mod)
output, lines = self.launch_subprocess('newmod.py')
self.assertEqual(len(lines), 2, lines)
@skip_with_pyevent
def test_patched_thread(self):
new_mod = """import eventlet
eventlet.monkey_patch(time=False, thread=True)
from eventlet import tpool
import time
"""
new_mod += test_monkey_patch_threading
new_mod += "\ntest_monkey_patch_threading()\n"
self.write_to_tempfile("newmod", new_mod)
output, lines = self.launch_subprocess('newmod.py')
self.assertEqual(len(lines), 2, "\n".join(lines))
if __name__ == '__main__':
    main()


@@ -15,7 +15,7 @@ class Error(Exception):
pass pass
class Test(LimitedTestCase): class Test(LimitedTestCase):
def test_api(self): def test_cancellation(self):
# Nothing happens if with-block finishes before the timeout expires # Nothing happens if with-block finishes before the timeout expires
t = Timeout(DELAY*2) t = Timeout(DELAY*2)
sleep(0) # make it pending sleep(0) # make it pending
@@ -27,6 +27,7 @@ class Test(LimitedTestCase):
assert not t.pending, repr(t) assert not t.pending, repr(t)
sleep(DELAY*2) sleep(DELAY*2)
def test_raising_self(self):
# An exception will be raised if it's not # An exception will be raised if it's not
try: try:
with Timeout(DELAY) as t: with Timeout(DELAY) as t:
@@ -36,6 +37,17 @@ class Test(LimitedTestCase):
else: else:
raise AssertionError('must raise Timeout') raise AssertionError('must raise Timeout')
def test_raising_self_true(self):
# specifying True as the exception raises self as well
try:
with Timeout(DELAY, True) as t:
sleep(DELAY*2)
except Timeout, ex:
assert ex is t, (ex, t)
else:
raise AssertionError('must raise Timeout')
def test_raising_custom_exception(self):
# You can customize the exception raised: # You can customize the exception raised:
try: try:
with Timeout(DELAY, IOError("Operation takes way too long")): with Timeout(DELAY, IOError("Operation takes way too long")):
@@ -43,6 +55,7 @@ class Test(LimitedTestCase):
except IOError, ex: except IOError, ex:
assert str(ex)=="Operation takes way too long", repr(ex) assert str(ex)=="Operation takes way too long", repr(ex)
def test_raising_exception_class(self):
# Providing classes instead of values should be possible too: # Providing classes instead of values should be possible too:
try: try:
with Timeout(DELAY, ValueError): with Timeout(DELAY, ValueError):
@@ -50,6 +63,7 @@ class Test(LimitedTestCase):
except ValueError: except ValueError:
pass pass
def test_raising_exc_tuple(self):
try: try:
1//0 1//0
except: except:
@@ -63,12 +77,16 @@ class Test(LimitedTestCase):
else: else:
raise AssertionError('should not get there') raise AssertionError('should not get there')
def test_cancel_timer_inside_block(self):
# It's possible to cancel the timer inside the block: # It's possible to cancel the timer inside the block:
with Timeout(DELAY) as timer: with Timeout(DELAY) as timer:
timer.cancel() timer.cancel()
sleep(DELAY*2) sleep(DELAY*2)
# To silent the exception before exiting the block, pass False as second parameter.
def test_silent_block(self):
# To silence the exception before exiting the block, pass
# False as second parameter.
XDELAY=0.1
start = time.time()
with Timeout(XDELAY, False):
@@ -76,6 +94,8 @@ class Test(LimitedTestCase):
delta = (time.time()-start)
assert delta<XDELAY*2, delta
def test_dummy_timer(self):
# passing None as seconds disables the timer
with Timeout(None):
sleep(DELAY)


@@ -95,6 +95,11 @@ class TestTpool(LimitedTestCase):
exp3 = prox.compile('/')
self.assert_(exp1 != exp3)
@skip_with_pyevent
def test_wrap_ints(self):
p = tpool.Proxy(4)
self.assert_(p == 4)
@skip_with_pyevent
def test_wrap_hash(self):
prox1 = tpool.Proxy(''+'A')
@@ -243,6 +248,28 @@ class TestTpool(LimitedTestCase):
# violating the abstraction to check that we didn't double-wrap
self.assert_(not isinstance(x._obj, tpool.Proxy))
@skip_with_pyevent
def test_callable(self):
def wrapped(arg):
return arg
x = tpool.Proxy(wrapped)
self.assertEquals(4, x(4))
# verify that it wraps return values if specified
x = tpool.Proxy(wrapped, autowrap_names=('__call__',))
self.assert_(isinstance(x(4), tpool.Proxy))
self.assertEquals("4", str(x(4)))
@skip_with_pyevent
def test_callable_iterator(self):
def wrapped(arg):
yield arg
yield arg
yield arg
x = tpool.Proxy(wrapped, autowrap_names=('__call__',))
for r in x(3):
self.assertEquals(3, r)
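The callable-proxy tests above exercise tpool.Proxy dispatching calls through eventlet's worker thread pool. As a rough, hypothetical analogue of the idea (not eventlet's implementation; `MiniProxy` and the pool size are my own invention), a proxy can forward attribute access and calls to a worker thread and block for the result:

```python
import concurrent.futures

_pool = concurrent.futures.ThreadPoolExecutor(max_workers=2)

class MiniProxy(object):
    """Toy stand-in for tpool.Proxy: run calls on a worker thread,
    blocking the caller until the result is ready."""
    def __init__(self, obj):
        self._obj = obj

    def __getattr__(self, name):
        # Forward attribute access; wrap callables so they execute
        # on the pool instead of the calling thread.
        attr = getattr(self._obj, name)
        if callable(attr):
            return lambda *a, **kw: _pool.submit(attr, *a, **kw).result()
        return attr

    def __call__(self, *a, **kw):
        # Rough counterpart of autowrapping __call__: the wrapped
        # object itself may be callable.
        return _pool.submit(self._obj, *a, **kw).result()
```

`MiniProxy(lambda x: x)(4)` returns 4, much as the test expects from `tpool.Proxy(wrapped)`.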
class TpoolLongTests(LimitedTestCase):
TEST_TIMEOUT=60
@skip_with_pyevent
@@ -294,5 +321,6 @@ from eventlet.tpool import execute
iterations, tpool_overhead, best_normal, best_tpool)
tpool.killall()
if __name__ == '__main__':
main()


@@ -47,7 +47,7 @@ class TestWebSocket(_TestBase):
raise
self.assertRaises(urllib2.HTTPError, raiser)
def test_incomplete_headers_75(self):
headers = dict(kv.split(': ') for kv in [
"Upgrade: WebSocket",
# NOTE: intentionally no connection header
@@ -63,7 +63,42 @@ class TestWebSocket(_TestBase):
self.assertEqual(resp.getheader('connection'), 'close')
self.assertEqual(resp.read(), '')
def test_incomplete_headers_76(self):
# First test: Missing Connection:
headers = dict(kv.split(': ') for kv in [
"Upgrade: WebSocket",
# NOTE: intentionally no connection header
"Host: localhost:%s" % self.port,
"Origin: http://localhost:%s" % self.port,
"Sec-WebSocket-Protocol: ws",
])
http = httplib.HTTPConnection('localhost', self.port)
http.request("GET", "/echo", headers=headers)
resp = http.getresponse()
self.assertEqual(resp.status, 400)
self.assertEqual(resp.getheader('connection'), 'close')
self.assertEqual(resp.read(), '')
# Now, miss off key2
headers = dict(kv.split(': ') for kv in [
"Upgrade: WebSocket",
"Connection: Upgrade",
"Host: localhost:%s" % self.port,
"Origin: http://localhost:%s" % self.port,
"Sec-WebSocket-Protocol: ws",
"Sec-WebSocket-Key1: 4 @1 46546xW%0l 1 5",
# NOTE: Intentionally no Key2 header
])
http = httplib.HTTPConnection('localhost', self.port)
http.request("GET", "/echo", headers=headers)
resp = http.getresponse()
self.assertEqual(resp.status, 400)
self.assertEqual(resp.getheader('connection'), 'close')
self.assertEqual(resp.read(), '')
def test_correct_upgrade_request_75(self):
connect = [
"GET /echo HTTP/1.1",
"Upgrade: WebSocket",
@@ -85,7 +120,32 @@ class TestWebSocket(_TestBase):
'WebSocket-Origin: http://localhost:%s' % self.port,
'WebSocket-Location: ws://localhost:%s/echo\r\n\r\n' % self.port]))
def test_correct_upgrade_request_76(self):
connect = [
"GET /echo HTTP/1.1",
"Upgrade: WebSocket",
"Connection: Upgrade",
"Host: localhost:%s" % self.port,
"Origin: http://localhost:%s" % self.port,
"Sec-WebSocket-Protocol: ws",
"Sec-WebSocket-Key1: 4 @1 46546xW%0l 1 5",
"Sec-WebSocket-Key2: 12998 5 Y3 1 .P00",
]
sock = eventlet.connect(
('localhost', self.port))
sock.sendall('\r\n'.join(connect) + '\r\n\r\n^n:ds[4U')
result = sock.recv(1024)
## The server responds with the correct WebSocket handshake
self.assertEqual(result,
'\r\n'.join(['HTTP/1.1 101 Web Socket Protocol Handshake',
'Upgrade: WebSocket',
'Connection: Upgrade',
'Sec-WebSocket-Origin: http://localhost:%s' % self.port,
'Sec-WebSocket-Protocol: ws',
'Sec-WebSocket-Location: ws://localhost:%s/echo\r\n\r\n8jKS\'y:G*Co,Wxa-' % self.port]))
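The Sec-WebSocket-Key1/Key2 values, the trailing "^n:ds[4U" bytes, and the expected "8jKS'y:G*Co,Wxa-" reply used in this test appear to be the worked handshake example from the hixie draft-76 specification. A sketch of the challenge computation (function and variable names are my own, not eventlet's):

```python
import hashlib
import re
import struct

def ws76_response(key1, key2, challenge):
    """draft-76 handshake: for each key, concatenate its digits and
    divide by the number of spaces; pack both quotients big-endian
    and MD5 them together with the 8-byte challenge."""
    def key_number(key):
        digits = int("".join(re.findall(r"[0-9]", key)))
        spaces = key.count(" ")
        return digits // spaces  # the spec guarantees an even division
    packed = struct.pack(">II", key_number(key1), key_number(key2))
    return hashlib.md5(packed + challenge).digest()
```

The 16-byte digest is sent verbatim after the "101 Web Socket Protocol Handshake" headers, which is why the expected response in the test ends with raw bytes after the blank line.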
def test_sending_messages_to_websocket_75(self):
connect = [
"GET /echo HTTP/1.1",
"Upgrade: WebSocket",
@@ -111,7 +171,35 @@ class TestWebSocket(_TestBase):
sock.close()
eventlet.sleep(0.01)
def test_sending_messages_to_websocket_76(self):
connect = [
"GET /echo HTTP/1.1",
"Upgrade: WebSocket",
"Connection: Upgrade",
"Host: localhost:%s" % self.port,
"Origin: http://localhost:%s" % self.port,
"Sec-WebSocket-Protocol: ws",
"Sec-WebSocket-Key1: 4 @1 46546xW%0l 1 5",
"Sec-WebSocket-Key2: 12998 5 Y3 1 .P00",
]
sock = eventlet.connect(
('localhost', self.port))
sock.sendall('\r\n'.join(connect) + '\r\n\r\n^n:ds[4U')
first_resp = sock.recv(1024)
sock.sendall('\x00hello\xFF')
result = sock.recv(1024)
self.assertEqual(result, '\x00hello\xff')
sock.sendall('\x00start')
eventlet.sleep(0.001)
sock.sendall(' end\xff')
result = sock.recv(1024)
self.assertEqual(result, '\x00start end\xff')
sock.shutdown(socket.SHUT_RDWR)
sock.close()
eventlet.sleep(0.01)
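The '\x00hello\xFF' bytes exchanged above are hixie-era text frames: a 0x00 byte, the UTF-8 payload, then 0xFF. A minimal framer/deframer sketch (illustrative only, not eventlet's websocket module):

```python
def frame(msg):
    # One hixie-75/76 text frame: 0x00 <utf-8 payload> 0xFF.
    return b"\x00" + msg.encode("utf-8") + b"\xff"

def deframe(data):
    # Split a byte stream back into the framed messages.
    msgs = []
    while data:
        if data[0:1] != b"\x00":
            raise ValueError("expected frame to start with 0x00")
        end = data.index(b"\xff")
        msgs.append(data[1:end].decode("utf-8"))
        data = data[end + 1:]
    return msgs
```

This is why the echo test can concatenate '\x00start', a pause, and ' end\xff' and still read back a single '\x00start end\xff' message: the frame boundary is the 0xFF byte, not the TCP packet boundary.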
def test_getting_messages_from_websocket_75(self):
connect = [
"GET /range HTTP/1.1",
"Upgrade: WebSocket",
@@ -134,7 +222,32 @@ class TestWebSocket(_TestBase):
# Last item in msgs is an empty string
self.assertEqual(msgs[:-1], ['msg %d' % i for i in range(10)])
def test_getting_messages_from_websocket_76(self):
connect = [
"GET /range HTTP/1.1",
"Upgrade: WebSocket",
"Connection: Upgrade",
"Host: localhost:%s" % self.port,
"Origin: http://localhost:%s" % self.port,
"Sec-WebSocket-Protocol: ws",
"Sec-WebSocket-Key1: 4 @1 46546xW%0l 1 5",
"Sec-WebSocket-Key2: 12998 5 Y3 1 .P00",
]
sock = eventlet.connect(
('localhost', self.port))
sock.sendall('\r\n'.join(connect) + '\r\n\r\n^n:ds[4U')
resp = sock.recv(1024)
headers, result = resp.split('\r\n\r\n')
msgs = [result[16:].strip('\x00\xff')]
cnt = 10
while cnt:
msgs.append(sock.recv(20).strip('\x00\xff'))
cnt -= 1
# Last item in msgs is an empty string
self.assertEqual(msgs[:-1], ['msg %d' % i for i in range(10)])
def test_breaking_the_connection_75(self):
error_detected = [False]
done_with_request = event.Event()
site = self.site
@@ -165,7 +278,126 @@ class TestWebSocket(_TestBase):
done_with_request.wait()
self.assert_(not error_detected[0])
def test_breaking_the_connection_76(self):
error_detected = [False]
done_with_request = event.Event()
site = self.site
def error_detector(environ, start_response):
try:
try:
return site(environ, start_response)
except:
error_detected[0] = True
raise
finally:
done_with_request.send(True)
self.site = error_detector
self.spawn_server()
connect = [
"GET /range HTTP/1.1",
"Upgrade: WebSocket",
"Connection: Upgrade",
"Host: localhost:%s" % self.port,
"Origin: http://localhost:%s" % self.port,
"Sec-WebSocket-Protocol: ws",
"Sec-WebSocket-Key1: 4 @1 46546xW%0l 1 5",
"Sec-WebSocket-Key2: 12998 5 Y3 1 .P00",
]
sock = eventlet.connect(
('localhost', self.port))
sock.sendall('\r\n'.join(connect) + '\r\n\r\n^n:ds[4U')
resp = sock.recv(1024) # get the headers
sock.close() # close while the app is running
done_with_request.wait()
self.assert_(not error_detected[0])
def test_client_closing_connection_76(self):
error_detected = [False]
done_with_request = event.Event()
site = self.site
def error_detector(environ, start_response):
try:
try:
return site(environ, start_response)
except:
error_detected[0] = True
raise
finally:
done_with_request.send(True)
self.site = error_detector
self.spawn_server()
connect = [
"GET /echo HTTP/1.1",
"Upgrade: WebSocket",
"Connection: Upgrade",
"Host: localhost:%s" % self.port,
"Origin: http://localhost:%s" % self.port,
"Sec-WebSocket-Protocol: ws",
"Sec-WebSocket-Key1: 4 @1 46546xW%0l 1 5",
"Sec-WebSocket-Key2: 12998 5 Y3 1 .P00",
]
sock = eventlet.connect(
('localhost', self.port))
sock.sendall('\r\n'.join(connect) + '\r\n\r\n^n:ds[4U')
resp = sock.recv(1024) # get the headers
sock.sendall('\xff\x00') # "Close the connection" packet.
done_with_request.wait()
self.assert_(not error_detected[0])
def test_client_invalid_packet_76(self):
error_detected = [False]
done_with_request = event.Event()
site = self.site
def error_detector(environ, start_response):
try:
try:
return site(environ, start_response)
except:
error_detected[0] = True
raise
finally:
done_with_request.send(True)
self.site = error_detector
self.spawn_server()
connect = [
"GET /echo HTTP/1.1",
"Upgrade: WebSocket",
"Connection: Upgrade",
"Host: localhost:%s" % self.port,
"Origin: http://localhost:%s" % self.port,
"Sec-WebSocket-Protocol: ws",
"Sec-WebSocket-Key1: 4 @1 46546xW%0l 1 5",
"Sec-WebSocket-Key2: 12998 5 Y3 1 .P00",
]
sock = eventlet.connect(
('localhost', self.port))
sock.sendall('\r\n'.join(connect) + '\r\n\r\n^n:ds[4U')
resp = sock.recv(1024) # get the headers
sock.sendall('\xef\x00') # Weird packet.
done_with_request.wait()
self.assert_(error_detected[0])
def test_server_closing_connect_76(self):
connect = [
"GET / HTTP/1.1",
"Upgrade: WebSocket",
"Connection: Upgrade",
"Host: localhost:%s" % self.port,
"Origin: http://localhost:%s" % self.port,
"Sec-WebSocket-Protocol: ws",
"Sec-WebSocket-Key1: 4 @1 46546xW%0l 1 5",
"Sec-WebSocket-Key2: 12998 5 Y3 1 .P00",
]
sock = eventlet.connect(
('localhost', self.port))
sock.sendall('\r\n'.join(connect) + '\r\n\r\n^n:ds[4U')
resp = sock.recv(1024)
headers, result = resp.split('\r\n\r\n')
# The remote server should have immediately closed the connection.
self.assertEqual(result[16:], '\xff\x00')
def test_app_socket_errors_75(self):
error_detected = [False]
done_with_request = event.Event()
site = self.site
@@ -195,6 +427,38 @@ class TestWebSocket(_TestBase):
done_with_request.wait()
self.assert_(error_detected[0])
def test_app_socket_errors_76(self):
error_detected = [False]
done_with_request = event.Event()
site = self.site
def error_detector(environ, start_response):
try:
try:
return site(environ, start_response)
except:
error_detected[0] = True
raise
finally:
done_with_request.send(True)
self.site = error_detector
self.spawn_server()
connect = [
"GET /error HTTP/1.1",
"Upgrade: WebSocket",
"Connection: Upgrade",
"Host: localhost:%s" % self.port,
"Origin: http://localhost:%s" % self.port,
"Sec-WebSocket-Protocol: ws",
"Sec-WebSocket-Key1: 4 @1 46546xW%0l 1 5",
"Sec-WebSocket-Key2: 12998 5 Y3 1 .P00",
]
sock = eventlet.connect(
('localhost', self.port))
sock.sendall('\r\n'.join(connect) + '\r\n\r\n^n:ds[4U')
resp = sock.recv(1024)
done_with_request.wait()
self.assert_(error_detected[0])
class TestWebSocketObject(LimitedTestCase):


@@ -5,12 +5,11 @@ import errno
import os
import socket
import sys
from tests import skipped, LimitedTestCase, skip_with_pyevent
from unittest import main
from eventlet import api
from eventlet import util
from eventlet import greenio
from eventlet import event
from eventlet.green import socket as greensocket
from eventlet import wsgi
from eventlet.support import get_errno
@@ -94,13 +93,13 @@ class IterableApp(object):
return self.return_val
class IterableSite(Site):
def __call__(self, env, start_response):
it = self.application(env, start_response)
for i in it:
yield i
CONTENT_LENGTH = 'content-length'
@@ -146,7 +145,6 @@ def read_http(sock):
if CONTENT_LENGTH in headers:
num = int(headers[CONTENT_LENGTH])
body = fd.read(num)
#print body
else:
# read until EOF
body = fd.read()
@@ -381,11 +379,14 @@ class TestHttpd(_TestBase):
certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt')
private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key')
server_sock = eventlet.wrap_ssl(eventlet.listen(('localhost', 0)),
certfile=certificate_file,
keyfile=private_key_file,
server_side=True)
self.spawn_server(sock=server_sock, site=wsgi_app)
sock = eventlet.connect(('localhost', self.port))
sock = eventlet.wrap_ssl(sock)
sock.write('POST /foo HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\nContent-length:3\r\n\r\nabc')
result = sock.read(8192)
self.assertEquals(result[-3:], 'abc')
@@ -397,11 +398,14 @@ class TestHttpd(_TestBase):
certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt')
private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key')
server_sock = eventlet.wrap_ssl(eventlet.listen(('localhost', 0)),
certfile=certificate_file,
keyfile=private_key_file,
server_side=True)
self.spawn_server(sock=server_sock, site=wsgi_app)
sock = eventlet.connect(('localhost', server_sock.getsockname()[1]))
sock = eventlet.wrap_ssl(sock)
sock.write('GET /foo HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n')
result = sock.read(8192)
self.assertEquals(result[-4:], '\r\n\r\n')
@@ -504,12 +508,14 @@ class TestHttpd(_TestBase):
certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt')
private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key')
sock = eventlet.wrap_ssl(eventlet.listen(('localhost', 0)),
certfile=certificate_file,
keyfile=private_key_file,
server_side=True)
server_coro = eventlet.spawn(server, sock, wsgi_app, self.logfile)
client = eventlet.connect(('localhost', sock.getsockname()[1]))
client = eventlet.wrap_ssl(client)
client.write('X') # non-empty payload so that SSL handshake occurs
greenio.shutdown_safe(client)
client.close()
@@ -787,7 +793,10 @@ class TestHttpd(_TestBase):
except Exception, e:
errored[0] = 'SSL handshake error raised exception %s.' % e
for data in ('', 'GET /non-ssl-request HTTP/1.0\r\n\r\n'):
srv_sock = eventlet.wrap_ssl(eventlet.listen(('localhost', 0)),
certfile=certificate_file,
keyfile=private_key_file,
server_side=True)
port = srv_sock.getsockname()[1]
g = eventlet.spawn_n(server, srv_sock)
client = eventlet.connect(('localhost', port))
@@ -833,6 +842,34 @@ class TestHttpd(_TestBase):
# (one terminates the chunk, one terminates the body)
self.assertEqual(response, ['0', '', ''])
def test_aborted_chunked_post(self):
read_content = event.Event()
blew_up = [False]
def chunk_reader(env, start_response):
try:
content = env['wsgi.input'].read(1024)
except IOError:
blew_up[0] = True
content = 'ok'
read_content.send(content)
start_response('200 OK', [('Content-Type', 'text/plain')])
return [content]
self.site.application = chunk_reader
expected_body = 'a bunch of stuff'
data = "\r\n".join(['PUT /somefile HTTP/1.0',
'Transfer-Encoding: chunked',
'',
'def',
expected_body])
# start PUT-ing some chunked data but close prematurely
sock = eventlet.connect(('127.0.0.1', self.port))
sock.sendall(data)
sock.close()
# the test passes if we successfully get here, and read all the data
# in spite of the early close
self.assertEqual(read_content.wait(), 'ok')
self.assert_(blew_up[0])
def read_headers(sock):
fd = sock.makefile()
try:
@@ -862,12 +899,14 @@ def read_headers(sock):
return response_line, headers
class IterableAlreadyHandledTest(_TestBase):
def set_site(self):
self.site = IterableSite()
def get_app(self):
return IterableApp(True)
def test_iterable_app_keeps_socket_open_unless_connection_close_sent(self):
self.site.application = self.get_app()
sock = eventlet.connect(
('localhost', self.port))
@@ -876,7 +915,6 @@ class IterableAlreadyHandledTest(_TestBase):
fd.flush()
response_line, headers = read_headers(sock)
print headers
self.assertEqual(response_line, 'HTTP/1.1 200 OK\r\n')
self.assert_('connection' not in headers)
fd.write('GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n')
@@ -886,6 +924,133 @@ class IterableAlreadyHandledTest(_TestBase):
self.assertEqual(headers.get('transfer-encoding'), 'chunked')
self.assertEqual(body, '0\r\n\r\n') # Still coming back chunked
class ProxiedIterableAlreadyHandledTest(IterableAlreadyHandledTest):
# same thing as the previous test but ensuring that it works with tpooled
# results as well as regular ones
@skip_with_pyevent
def get_app(self):
from eventlet import tpool
return tpool.Proxy(super(ProxiedIterableAlreadyHandledTest, self).get_app())
class TestChunkedInput(_TestBase):
dirt = ""
validator = None
def application(self, env, start_response):
input = env['wsgi.input']
response = []
pi = env["PATH_INFO"]
if pi=="/short-read":
d=input.read(10)
response = [d]
elif pi=="/lines":
for x in input:
response.append(x)
elif pi=="/ping":
input.read()
response.append("pong")
else:
raise RuntimeError("bad path")
start_response('200 OK', [('Content-Type', 'text/plain')])
return response
def connect(self):
return eventlet.connect(('localhost', self.port))
def set_site(self):
self.site = Site()
self.site.application = self.application
def chunk_encode(self, chunks, dirt=None):
if dirt is None:
dirt = self.dirt
b = ""
for c in chunks:
b += "%x%s\r\n%s\r\n" % (len(c), dirt, c)
return b
def body(self, dirt=None):
return self.chunk_encode(["this", " is ", "chunked", "\nline", " 2", "\n", "line3", ""], dirt=dirt)
def ping(self, fd):
fd.sendall("GET /ping HTTP/1.1\r\n\r\n")
self.assertEquals(read_http(fd)[-1], "pong")
def test_short_read_with_content_length(self):
body = self.body()
req = "POST /short-read HTTP/1.1\r\ntransfer-encoding: Chunked\r\nContent-Length:1000\r\n\r\n" + body
fd = self.connect()
fd.sendall(req)
self.assertEquals(read_http(fd)[-1], "this is ch")
self.ping(fd)
def test_short_read_with_zero_content_length(self):
body = self.body()
req = "POST /short-read HTTP/1.1\r\ntransfer-encoding: Chunked\r\nContent-Length:0\r\n\r\n" + body
fd = self.connect()
fd.sendall(req)
self.assertEquals(read_http(fd)[-1], "this is ch")
self.ping(fd)
def test_short_read(self):
body = self.body()
req = "POST /short-read HTTP/1.1\r\ntransfer-encoding: Chunked\r\n\r\n" + body
fd = self.connect()
fd.sendall(req)
self.assertEquals(read_http(fd)[-1], "this is ch")
self.ping(fd)
def test_dirt(self):
body = self.body(dirt="; here is dirt\0bla")
req = "POST /ping HTTP/1.1\r\ntransfer-encoding: Chunked\r\n\r\n" + body
fd = self.connect()
fd.sendall(req)
self.assertEquals(read_http(fd)[-1], "pong")
self.ping(fd)
def test_chunked_readline(self):
body = self.body()
req = "POST /lines HTTP/1.1\r\nContent-Length: %s\r\ntransfer-encoding: Chunked\r\n\r\n%s" % (len(body), body)
fd = self.connect()
fd.sendall(req)
self.assertEquals(read_http(fd)[-1], 'this is chunked\nline 2\nline3')
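The bodies built by chunk_encode above follow HTTP/1.1 chunked transfer encoding: each chunk is its hex length (plus an optional chunk extension, the "dirt"), CRLF, the data, CRLF, with a zero-length chunk terminating the body. A plain-Python round-trip sketch, independent of eventlet (the decoder is my own illustration, not wsgi.py's reader):

```python
import re

def chunk_encode(chunks, dirt=""):
    # Mirrors the test helper: a trailing "" chunk emits the
    # terminating "0\r\n\r\n".
    body = ""
    for c in chunks:
        body += "%x%s\r\n%s\r\n" % (len(c), dirt, c)
    return body

def chunk_decode(body):
    # Parse chunks until the zero-length terminator; tolerate
    # extension "dirt" after the hex size, as the tests do.
    data, pos = [], 0
    while True:
        eol = body.index("\r\n", pos)
        size = int(re.match(r"[0-9a-fA-F]+", body[pos:eol]).group(), 16)
        pos = eol + 2
        if size == 0:
            return "".join(data)
        data.append(body[pos:pos + size])
        pos += size + 2  # skip the chunk data and its trailing CRLF
```

This also shows why test_close_before_finished is interesting: a body cut off mid-chunk ('4\r\nthi') never reaches the zero-length terminator, so a naive reader could loop forever.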
def test_close_before_finished(self):
import signal
got_signal = []
def handler(*args):
got_signal.append(1)
raise KeyboardInterrupt()
signal.signal(signal.SIGALRM, handler)
signal.alarm(1)
try:
body = '4\r\nthi'
req = "POST /short-read HTTP/1.1\r\ntransfer-encoding: Chunked\r\n\r\n" + body
fd = self.connect()
fd.sendall(req)
fd.close()
eventlet.sleep(0.0)
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, signal.SIG_DFL)
assert not got_signal, "caught alarm signal. infinite loop detected."
if __name__ == '__main__':