From 8eb8e8567cdc872bdc64580f4dba0a3f92751fce Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Thu, 6 May 2010 23:54:57 -0700 Subject: [PATCH 01/54] Speling fix. --- eventlet/debug.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/eventlet/debug.py b/eventlet/debug.py index cca5862..503d7bb 100644 --- a/eventlet/debug.py +++ b/eventlet/debug.py @@ -10,7 +10,7 @@ import inspect __all__ = ['spew', 'unspew', 'format_hub_listeners', 'hub_listener_stacks', 'hub_exceptions', 'tpool_exceptions'] -_token_spliter = re.compile('\W+') +_token_splitter = re.compile('\W+') class Spew(object): """ @@ -42,7 +42,7 @@ class Spew(object): if not self.show_values: return self details = [] - tokens = _token_spliter.split(line) + tokens = _token_splitter.split(line) for tok in tokens: if tok in frame.f_globals: details.append('%s=%r' % (tok, frame.f_globals[tok])) From c8be5555573133674625d6d53f3d404b558f7588 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Fri, 7 May 2010 08:30:14 -0700 Subject: [PATCH 02/54] Fix to get restore tpool on Windows. --- eventlet/tpool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eventlet/tpool.py b/eventlet/tpool.py index 7e9dd45..0db26fd 100644 --- a/eventlet/tpool.py +++ b/eventlet/tpool.py @@ -231,7 +231,7 @@ def setup(): _rpipe, _wpipe = os.pipe() _wfile = greenio.GreenPipe(_wpipe, 'wb', 0) _rfile = greenio.GreenPipe(_rpipe, 'rb', 0) - except ImportError: + except (ImportError, NotImplementedError): # This is Windows compatibility -- use a socket instead of a pipe because # pipes don't really exist on Windows. import socket From a7039cee84a6944dd8a7e46ad47c5e368bf550df Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Thu, 13 May 2010 16:07:34 -0700 Subject: [PATCH 03/54] Fix for infinite loop in wsgi.py, thanks to redbo's repro. --- eventlet/wsgi.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/eventlet/wsgi.py b/eventlet/wsgi.py index 1b46395..e490e1a 100644 --- a/eventlet/wsgi.py +++ b/eventlet/wsgi.py @@ -111,8 +111,11 @@ class Input(object): if self.chunk_length > self.position: response.append(rfile.read( min(self.chunk_length - self.position, length))) - length -= len(response[-1]) - self.position += len(response[-1]) + last_read = len(response[-1]) + if last_read == 0: + break + length -= last_read + self.position += last_read if self.chunk_length == self.position: rfile.readline() else: From f874a9382704e5f36b5dfa7afdb55076d60d81e2 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Thu, 13 May 2010 16:23:54 -0700 Subject: [PATCH 04/54] Test for infinite loop in wsgi, based on redbo's repro. 
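The scenario the new test exercises is a client that starts a chunked upload and then disconnects before the advertised chunk is complete. With the guard added in the previous patch, a zero-length read now terminates the loop instead of spinning. Schematically, the pattern is (an illustrative sketch, not the wsgi.py code itself):

    import StringIO

    def read_body(rfile, remaining):
        # Read up to `remaining` bytes.  A peer that closed early makes
        # rfile.read() return '' forever, so the explicit break is what
        # keeps the loop from spinning without making progress.
        pieces = []
        while remaining > 0:
            data = rfile.read(remaining)
            if not data:
                break
            pieces.append(data)
            remaining -= len(data)
        return ''.join(pieces)

    print read_body(StringIO.StringIO("abc"), 10)   # -> 'abc' rather than hanging
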
--- tests/wsgi_test.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/tests/wsgi_test.py b/tests/wsgi_test.py index 5e79def..e4de67e 100644 --- a/tests/wsgi_test.py +++ b/tests/wsgi_test.py @@ -11,6 +11,7 @@ from unittest import main from eventlet import api from eventlet import util from eventlet import greenio +from eventlet import event from eventlet.green import socket as greensocket from eventlet import wsgi from eventlet.support import get_errno @@ -833,6 +834,28 @@ class TestHttpd(_TestBase): # (one terminates the chunk, one terminates the body) self.assertEqual(response, ['0', '', '']) + def test_aborted_chunked_post(self): + read_content = event.Event() + def chunk_reader(env, start_response): + content = env['wsgi.input'].read(1024) + read_content.send(content) + start_response('200 OK', [('Content-Type', 'text/plain')]) + return [content] + self.site.application = chunk_reader + expected_body = 'a bunch of stuff' + data = "\r\n".join(['PUT /somefile HTTP/1.0', + 'Transfer-Encoding: chunked', + '', + 'def', + expected_body]) + # start PUT-ing some chunked data but close prematurely + sock = eventlet.connect(('127.0.0.1', self.port)) + sock.sendall(data) + sock.close() + # the test passes if we successfully get here, and read all the data + # in spite of the early close + self.assertEqual(read_content.wait(), expected_body) + def read_headers(sock): fd = sock.makefile() try: From 5beb9925a52384c3211485a20a96d4db1d086b19 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Tue, 18 May 2010 13:37:08 -0700 Subject: [PATCH 05/54] Uhhh 0.9.8 derr --- eventlet/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eventlet/__init__.py b/eventlet/__init__.py index 7361da2..dda5923 100644 --- a/eventlet/__init__.py +++ b/eventlet/__init__.py @@ -1,4 +1,4 @@ -version_info = (0, 9, 7, "dev1") +version_info = (0, 9, 8, "dev1") __version__ = ".".join(map(str, version_info)) try: From 5e3fa7a4e72c8d3b2a4e1e041026b7059bff35f5 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Wed, 19 May 2010 20:54:30 -0700 Subject: [PATCH 06/54] From Daniele Varrazzo: 'If you are into evil speed hacks, the patch below shaves off some python lookups from the wait loop, not a bad thing. 
' --- eventlet/support/psycopg2_patcher.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/eventlet/support/psycopg2_patcher.py b/eventlet/support/psycopg2_patcher.py index 7d9c1d5..29af65d 100644 --- a/eventlet/support/psycopg2_patcher.py +++ b/eventlet/support/psycopg2_patcher.py @@ -38,15 +38,20 @@ def make_psycopg_green(): extensions.set_wait_callback(eventlet_wait_callback) -def eventlet_wait_callback(conn, timeout=-1): +def eventlet_wait_callback(conn, timeout=-1, + # access these objects with LOAD_FAST instead of LOAD_GLOBAL lookup + POLL_OK=extensions.POLL_OK, + POLL_READ=extensions.POLL_READ, + POLL_WRITE=extensions.POLL_WRITE, + trampoline=trampoline): """A wait callback useful to allow eventlet to work with Psycopg.""" while 1: state = conn.poll() - if state == extensions.POLL_OK: + if state == POLL_OK: break - elif state == extensions.POLL_READ: + elif state == POLL_READ: trampoline(conn.fileno(), read=True) - elif state == extensions.POLL_WRITE: + elif state == POLL_WRITE: trampoline(conn.fileno(), write=True) else: raise psycopg2.OperationalError( From 800883ee1bd3cc4eaaabbc81495b79610983fbc9 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Sat, 22 May 2010 14:19:30 -0700 Subject: [PATCH 07/54] Added documentation for websocket module and switched to using decorators in the examples. --- doc/examples.rst | 11 ++++++++++ doc/modules.rst | 1 + eventlet/websocket.py | 49 ++++++++++++++++++++++++++++--------------- examples/websocket.py | 5 +++-- 4 files changed, 47 insertions(+), 19 deletions(-) diff --git a/doc/examples.rst b/doc/examples.rst index a1f68e5..2f7b817 100644 --- a/doc/examples.rst +++ b/doc/examples.rst @@ -73,3 +73,14 @@ Producer Consumer/Recursive Web Crawler This is an example implementation of the producer/consumer pattern as well as a functional recursive web crawler. .. literalinclude:: ../examples/producer_consumer.py + +.. _websocket_example: + +Websocket Server Example +-------------------------- +``examples/websocket.py`` + +This exercises some of the features of the websocket server +implementation. + +.. literalinclude:: ../examples/websocket.py diff --git a/doc/modules.rst b/doc/modules.rst index f33f4db..78561c3 100644 --- a/doc/modules.rst +++ b/doc/modules.rst @@ -15,4 +15,5 @@ Module Reference modules/queue modules/semaphore modules/timeout + modules/websocket modules/wsgi diff --git a/eventlet/websocket.py b/eventlet/websocket.py index 5cd2447..5ef1d01 100644 --- a/eventlet/websocket.py +++ b/eventlet/websocket.py @@ -9,8 +9,22 @@ from eventlet.support import get_errno ACCEPTABLE_CLIENT_ERRORS = set((errno.ECONNRESET, errno.EPIPE)) +__all__ = ["WebSocketWSGI", "WebSocket"] + class WebSocketWSGI(object): - """This is a WSGI application that serves up websocket connections. + """Wraps a websocket handler function in a WSGI application. + + Use it like this:: + + @websocket.WebSocketWSGI + def my_handler(ws): + from_browser = ws.wait() + ws.send("from server") + + The single argument to the function will be an instance of + :class:`WebSocket`. To close the socket, simply return from the + function. Note that the server will log the websocket request at + the time of closure. 
""" def __init__(self, handler): self.handler = handler @@ -29,9 +43,9 @@ class WebSocketWSGI(object): "Connection: Upgrade\r\n" "WebSocket-Origin: %s\r\n" "WebSocket-Location: ws://%s%s\r\n\r\n" % ( - environ.get('HTTP_ORIGIN'), - environ.get('HTTP_HOST'), - environ.get('PATH_INFO'))) + environ.get('HTTP_ORIGIN'), + environ.get('HTTP_HOST'), + environ.get('PATH_INFO'))) sock.sendall(handshake_reply) try: self.handler(ws) @@ -42,22 +56,24 @@ class WebSocketWSGI(object): # doesn't barf on the fact that we didn't call start_response return wsgi.ALREADY_HANDLED - class WebSocket(object): - """The object representing the server side of a websocket. + """A websocket object that handles the details of + serialization/deserialization to the socket. - The primary way to interact with a WebSocket object is to call - :meth:`send` and :meth:`wait` in order to pass messages back and - forth with the client. Also available are the following properties: + The primary way to interact with a :class:`WebSocket` object is to + call :meth:`send` and :meth:`wait` in order to pass messages back + and forth with the browser. Also available are the following + properties: path - The path value of the request. This is the same as the WSGI PATH_INFO variable. + The path value of the request. This is the same as the WSGI PATH_INFO variable, but more convenient. protocol The value of the Websocket-Protocol header. origin The value of the 'Origin' header. environ The full WSGI environment for this request. + """ def __init__(self, sock, environ): """ @@ -75,7 +91,7 @@ class WebSocket(object): self._sendlock = semaphore.Semaphore() @staticmethod - def pack_message(message): + def _pack_message(message): """Pack the message inside ``00`` and ``FF`` As per the dataframing section (5.3) for the websocket spec @@ -87,11 +103,10 @@ class WebSocket(object): packed = "\x00%s\xFF" % message return packed - def parse_messages(self): + def _parse_messages(self): """ Parses for messages in the buffer *buf*. It is assumed that the buffer contains the start character for a message, but that it - may contain only part of the rest of the message. NOTE: only understands - lengthless messages for now. + may contain only part of the rest of the message. Returns an array of messages, and the buffer remainder that didn't contain any full messages.""" @@ -109,10 +124,10 @@ class WebSocket(object): return msgs def send(self, message): - """Send a message to the client. *message* should be + """Send a message to the browser. *message* should be convertable to a string; unicode objects should be encodable as utf-8.""" - packed = self.pack_message(message) + packed = self._pack_message(message) # if two greenthreads are trying to send at the same time # on the same socket, sendlock prevents interleaving and corruption self._sendlock.acquire() @@ -130,7 +145,7 @@ class WebSocket(object): if delta == '': return None self._buf += delta - msgs = self.parse_messages() + msgs = self._parse_messages() self._msgs.extend(msgs) return self._msgs.popleft() diff --git a/examples/websocket.py b/examples/websocket.py index 9bac0e2..ae8181f 100644 --- a/examples/websocket.py +++ b/examples/websocket.py @@ -5,6 +5,8 @@ from eventlet import websocket # demo app import os import random + +@websocket.WebSocketWSGI def handle(ws): """ This is the websocket handler function. 
Note that we can dispatch based on path in here, too.""" @@ -20,12 +22,11 @@ def handle(ws): ws.send("0 %s %s\n" % (i, random.random())) eventlet.sleep(0.1) -wsapp = websocket.WebSocketWSGI(handle) def dispatch(environ, start_response): """ This resolves to the web page or the websocket depending on the path.""" if environ['PATH_INFO'] == '/data': - return wsapp(environ, start_response) + return handle(environ, start_response) else: start_response('200 OK', [('content-type', 'text/html')]) return [open(os.path.join( From 7656a601f577f89d9e7aa1cf439c7e4d71c2cd4c Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Sat, 22 May 2010 16:07:05 -0700 Subject: [PATCH 08/54] Small fix to make the assert work as intended. --- tests/patcher_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/patcher_test.py b/tests/patcher_test.py index 739cc31..6ee9731 100644 --- a/tests/patcher_test.py +++ b/tests/patcher_test.py @@ -172,7 +172,7 @@ print "already_patched", ",".join(sorted(patcher.already_patched.keys())) self.assert_(lines[0].startswith(ap), repr(output)) patched_modules = lines[0][len(ap):].strip() # psycopg might or might not be patched based on installed modules - patched_modules.replace("psycopg,", "") + patched_modules = patched_modules.replace("psycopg,", "") self.assertEqual(patched_modules, expected, "Logic:%s\nExpected: %s != %s" %(call, expected, patched_modules)) From 065e8ee13dd281499b83275ac8c256a6fe1a3f2b Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Sat, 22 May 2010 20:28:37 -0700 Subject: [PATCH 09/54] Doc file for websocket module. --- doc/modules/websocket.rst | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 doc/modules/websocket.rst diff --git a/doc/modules/websocket.rst b/doc/modules/websocket.rst new file mode 100644 index 0000000..78660cd --- /dev/null +++ b/doc/modules/websocket.rst @@ -0,0 +1,30 @@ +:mod:`websocket` -- Websocket Server +===================================== + +This module provides a simple way to create a `websocket +` server. It works with a few +tweaks in the :mod:`~eventlet.wsgi` module that allow websockets to +coexist with other WSGI applications. + +To create a websocket server, simply decorate a handler method with +:class:`WebSocketWSGI` and use it as a wsgi application:: + + from eventlet import wsgi, websocket + import eventlet + + @websocket.WebSocketWSGI + def hello_world(ws): + ws.send("hello world") + + wsgi.server(eventlet.listen(('', 8090)), hello_world) + + +You can find a slightly more elaborate version of this code in the file +``examples/websocket.py``. + +**Note** that the web socket spec is still under development, and it +will be necessary to change the way that this module works in response +to spec changes. + +.. automodule:: eventlet.websocket + :members: From 2e311eda6328c4b672135f8c7a7f33df8cf67636 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Mon, 24 May 2010 19:05:14 -0700 Subject: [PATCH 10/54] Cleaned up error output a bit better so diagnosis of failing psycopg tests doesn't take quite so long to track down t configuration problems again. --- tests/__init__.py | 1 + tests/patcher_psycopg_test.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/__init__.py b/tests/__init__.py index 6349a96..5884f09 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -191,6 +191,7 @@ def get_database_auth(): try: import simplejson except ImportError: + print "No simplejson, using baked-in db credentials." 
return retval if 'EVENTLET_DB_TEST_AUTH' in os.environ: diff --git a/tests/patcher_psycopg_test.py b/tests/patcher_psycopg_test.py index f63801e..47a3044 100644 --- a/tests/patcher_psycopg_test.py +++ b/tests/patcher_psycopg_test.py @@ -50,5 +50,5 @@ class PatchingPsycopg(patcher_test.Patcher): print "Can't test psycopg2 patching; it's not installed." return # if there's anything wrong with the test program it'll have a stack trace - self.assert_(lines[0].startswith('done'), repr(output)) + self.assert_(lines[0].startswith('done'), output) From a04a15685798b3a17421fb6abda912e611d5e7f8 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Mon, 24 May 2010 20:12:26 -0700 Subject: [PATCH 11/54] Finally adding a wrap_ssl implementation and removing some more test references to api and util modules. --- eventlet/__init__.py | 1 + eventlet/api.py | 2 ++ eventlet/convenience.py | 50 +++++++++++++++++++++++++++++++++++---- tests/convenience_test.py | 19 +++++++++++++++ tests/wsgi_test.py | 29 +++++++++++++++-------- 5 files changed, 86 insertions(+), 15 deletions(-) diff --git a/eventlet/__init__.py b/eventlet/__init__.py index dda5923..62dc790 100644 --- a/eventlet/__init__.py +++ b/eventlet/__init__.py @@ -31,6 +31,7 @@ try: listen = convenience.listen serve = convenience.serve StopServe = convenience.StopServe + wrap_ssl = convenience.wrap_ssl getcurrent = greenlet.greenlet.getcurrent diff --git a/eventlet/api.py b/eventlet/api.py index 3ee73cb..5b78e83 100644 --- a/eventlet/api.py +++ b/eventlet/api.py @@ -68,6 +68,8 @@ def ssl_listener(address, certificate, private_key): Returns a socket object on which one should call ``accept()`` to accept a connection on the newly bound socket. """ + warnings.warn("""eventlet.api.ssl_listener is deprecated. Please use eventlet.wrap_ssl(eventlet.listen()) instead.""", + DeprecationWarning, stacklevel=2) from eventlet import util import socket diff --git a/eventlet/convenience.py b/eventlet/convenience.py index b737242..887ca3b 100644 --- a/eventlet/convenience.py +++ b/eventlet/convenience.py @@ -97,11 +97,14 @@ def serve(sock, handle, concurrency=1000): def wrap_ssl(sock, keyfile=None, certfile=None, server_side=False, - cert_reqs=None, ssl_version=None, ca_certs=None, + cert_reqs=0, ssl_version=2, ca_certs=None, do_handshake_on_connect=True, suppress_ragged_eofs=True): - """Convenience function for converting a regular socket into an SSL - socket. Has the same interface as :func:`ssl.wrap_socket`, but - works on 2.5 or earlier, using PyOpenSSL. + """Convenience function for converting a regular socket into an + SSL socket. Has the same interface as :func:`ssl.wrap_socket`, + but works on 2.5 or earlier, using PyOpenSSL (though note that it + ignores the *cert_reqs*, *ssl_version*, *ca_certs*, + *do_handshake_on_connect*, and *suppress_ragged_eofs* arguments + when using PyOpenSSL). The preferred idiom is to call wrap_ssl directly on the creation method, e.g., ``wrap_ssl(connect(addr))`` or @@ -111,4 +114,41 @@ def wrap_ssl(sock, keyfile=None, certfile=None, server_side=False, :return Green SSL object. 
""" - pass + return wrap_ssl_impl(sock, keyfile=keyfile, certfile=certfile, + server_side=server_side, + cert_reqs=cert_reqs, + ssl_version=ssl_version, + ca_certs=ca_certs, + do_handshake_on_connect=do_handshake_on_connect, + suppress_ragged_eofs=suppress_ragged_eofs) + +try: + from eventlet.green import ssl + wrap_ssl_impl = ssl.wrap_socket +except ImportError: + # < 2.6, trying PyOpenSSL + from eventlet.green.OpenSSL import SSL + try: + def wrap_ssl_impl(sock, keyfile=None, certfile=None, server_side=False, + cert_reqs=None, ssl_version=None, ca_certs=None, + do_handshake_on_connect=True, suppress_ragged_eofs=True): + # theoretically the ssl_version could be respected in this + # next line + context = SSL.Context(SSL.SSLv23_METHOD) + if certfile is not None: + context.use_certificate_file(certfile) + if keyfile is not None: + context.use_privatekey_file(keyfile) + context.set_verify(SSL.VERIFY_NONE, lambda *x: True) + + connection = SSL.Connection(context, sock) + if server_side: + connection.set_accept_state() + else: + connection.set_connect_state() + return connection + except ImportError: + def wrap_ssl_impl(*a, **kw): + raise ImportError("To use SSL with Eventlet, " + "you must install PyOpenSSL or use Python 2.6 or later.") + diff --git a/tests/convenience_test.py b/tests/convenience_test.py index 64aba0a..7b86861 100644 --- a/tests/convenience_test.py +++ b/tests/convenience_test.py @@ -1,7 +1,12 @@ +import os + import eventlet from eventlet import event from tests import LimitedTestCase, s2b +certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt') +private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key') + class TestServe(LimitedTestCase): def setUp(self): super(TestServe, self).setUp() @@ -101,3 +106,17 @@ class TestServe(LimitedTestCase): timeout_value="timed out") self.assertEquals(x, "timed out") + def test_wrap_ssl(self): + server = eventlet.wrap_ssl(eventlet.listen(('localhost', 0)), + certfile=certificate_file, + keyfile=private_key_file, server_side=True) + port = server.getsockname()[1] + def handle(sock,addr): + sock.sendall(sock.recv(1024)) + raise eventlet.StopServe() + eventlet.spawn(eventlet.serve, server, handle) + client = eventlet.wrap_ssl(eventlet.connect(('localhost', port))) + client.sendall("echo") + self.assertEquals("echo", client.recv(1024)) + + diff --git a/tests/wsgi_test.py b/tests/wsgi_test.py index e4de67e..2f5c05c 100644 --- a/tests/wsgi_test.py +++ b/tests/wsgi_test.py @@ -8,8 +8,6 @@ import sys from tests import skipped, LimitedTestCase from unittest import main -from eventlet import api -from eventlet import util from eventlet import greenio from eventlet import event from eventlet.green import socket as greensocket @@ -382,11 +380,14 @@ class TestHttpd(_TestBase): certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt') private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key') - server_sock = api.ssl_listener(('localhost', 0), certificate_file, private_key_file) + server_sock = eventlet.wrap_ssl(eventlet.listen(('localhost', 0)), + certfile=certificate_file, + keyfile=private_key_file, + server_side=True) self.spawn_server(sock=server_sock, site=wsgi_app) sock = eventlet.connect(('localhost', self.port)) - sock = util.wrap_ssl(sock) + sock = eventlet.wrap_ssl(sock) sock.write('POST /foo HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\nContent-length:3\r\n\r\nabc') result = sock.read(8192) self.assertEquals(result[-3:], 'abc') @@ -398,11 +399,14 @@ class 
TestHttpd(_TestBase): certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt') private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key') - server_sock = api.ssl_listener(('localhost', 0), certificate_file, private_key_file) + server_sock = eventlet.wrap_ssl(eventlet.listen(('localhost', 0)), + certfile=certificate_file, + keyfile=private_key_file, + server_side=True) self.spawn_server(sock=server_sock, site=wsgi_app) sock = eventlet.connect(('localhost', server_sock.getsockname()[1])) - sock = util.wrap_ssl(sock) + sock = eventlet.wrap_ssl(sock) sock.write('GET /foo HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n') result = sock.read(8192) self.assertEquals(result[-4:], '\r\n\r\n') @@ -505,12 +509,14 @@ class TestHttpd(_TestBase): certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt') private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key') - sock = api.ssl_listener(('localhost', 0), certificate_file, private_key_file) - + sock = eventlet.wrap_ssl(eventlet.listen(('localhost', 0)), + certfile=certificate_file, + keyfile=private_key_file, + server_side=True) server_coro = eventlet.spawn(server, sock, wsgi_app, self.logfile) client = eventlet.connect(('localhost', sock.getsockname()[1])) - client = util.wrap_ssl(client) + client = eventlet.wrap_ssl(client) client.write('X') # non-empty payload so that SSL handshake occurs greenio.shutdown_safe(client) client.close() @@ -788,7 +794,10 @@ class TestHttpd(_TestBase): except Exception, e: errored[0] = 'SSL handshake error raised exception %s.' % e for data in ('', 'GET /non-ssl-request HTTP/1.0\r\n\r\n'): - srv_sock = api.ssl_listener(('localhost', 0), certificate_file, private_key_file) + srv_sock = eventlet.wrap_ssl(eventlet.listen(('localhost', 0)), + certfile=certificate_file, + keyfile=private_key_file, + server_side=True) port = srv_sock.getsockname()[1] g = eventlet.spawn_n(server, srv_sock) client = eventlet.connect(('localhost', port)) From 4ad24aee814b04e4e037ed66daa46a40c62b6d2c Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Mon, 24 May 2010 20:20:47 -0700 Subject: [PATCH 12/54] Constant tweakage. --- tests/patcher_psycopg_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/patcher_psycopg_test.py b/tests/patcher_psycopg_test.py index 47a3044..41ca0d7 100644 --- a/tests/patcher_psycopg_test.py +++ b/tests/patcher_psycopg_test.py @@ -30,7 +30,7 @@ def fetch(num, secs): f = eventlet.spawn(fetch, 2, 1) t = eventlet.spawn(tick, 2, 100) f.wait() -assert count[0] > 150 +assert count[0] > 100, count[0] print "done" """ From 4353de6e857cd96616b2cdbebca7ff949c216bd3 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Tue, 25 May 2010 12:03:18 -0700 Subject: [PATCH 13/54] Fixed behavior on suspend, thanks to Michael Carter for identifying the problem. 
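Background: when the process is stopped and resumed (SIGSTOP followed by SIGCONT), the pending poll() call fails with EINTR, which can surface as IOError (e.g. from epoll) rather than select.error; the hub now catches both and carries on instead of dying. The general pattern, as a standalone sketch (the helper name here is made up for illustration):

    import errno
    import select

    def poll_retrying(poller, timeout_ms):
        # Treat "interrupted by a signal" as a request to try again rather
        # than a fatal error; anything else is re-raised.
        while True:
            try:
                return poller.poll(timeout_ms)
            except (IOError, select.error), e:
                if e.args[0] == errno.EINTR:
                    continue
                raise

The hub's own version simply returns to its main loop on EINTR rather than retrying in place, but the net effect is the same.
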
--- eventlet/hubs/poll.py | 2 +- tests/hub_test.py | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/eventlet/hubs/poll.py b/eventlet/hubs/poll.py index 890ff50..270ec0c 100644 --- a/eventlet/hubs/poll.py +++ b/eventlet/hubs/poll.py @@ -76,7 +76,7 @@ class Hub(BaseHub): return try: presult = self.do_poll(seconds) - except select.error, e: + except (IOError, select.error), e: if get_errno(e) == errno.EINTR: return raise diff --git a/tests/hub_test.py b/tests/hub_test.py index 22526f3..91fbbd8 100644 --- a/tests/hub_test.py +++ b/tests/hub_test.py @@ -143,6 +143,42 @@ class TestHubSelection(LimitedTestCase): +class TestSuspend(LimitedTestCase): + TEST_TIMEOUT=3 + def test_suspend_doesnt_crash(self): + import errno + import os + import shutil + import signal + import subprocess + import sys + import tempfile + self.tempdir = tempfile.mkdtemp('test_suspend') + filename = os.path.join(self.tempdir, 'test_suspend.py') + fd = open(filename, "w") + fd.write("""import eventlet +eventlet.Timeout(0.5) +try: + eventlet.listen(("127.0.0.1", 0)).accept() +except eventlet.Timeout: + print "exited correctly" +""") + fd.close() + python_path = os.pathsep.join(sys.path + [self.tempdir]) + new_env = os.environ.copy() + new_env['PYTHONPATH'] = python_path + p = subprocess.Popen([sys.executable, + os.path.join(self.tempdir, filename)], + stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=new_env) + eventlet.sleep(0.4) # wait for process to hit accept + p.send_signal(signal.SIGSTOP) # suspend and resume to generate EINTR + p.send_signal(signal.SIGCONT) + output, _ = p.communicate() + lines = [l for l in output.split("\n") if l] + self.assert_("exited correctly" in lines[-1]) + shutil.rmtree(self.tempdir) + + class Foo(object): pass From df9d2603080dadfdd78026aebc0f96602f3e1297 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Tue, 25 May 2010 12:23:13 -0700 Subject: [PATCH 14/54] Sending signals using more backwards-compatible methodology. --- tests/hub_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/hub_test.py b/tests/hub_test.py index 91fbbd8..1e5630d 100644 --- a/tests/hub_test.py +++ b/tests/hub_test.py @@ -171,8 +171,8 @@ except eventlet.Timeout: os.path.join(self.tempdir, filename)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=new_env) eventlet.sleep(0.4) # wait for process to hit accept - p.send_signal(signal.SIGSTOP) # suspend and resume to generate EINTR - p.send_signal(signal.SIGCONT) + os.kill(p.pid, signal.SIGSTOP) # suspend and resume to generate EINTR + os.kill(p.pid, signal.SIGCONT) output, _ = p.communicate() lines = [l for l in output.split("\n") if l] self.assert_("exited correctly" in lines[-1]) From 94dc142526e9ff11b71df2c9956d02542016504f Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Tue, 25 May 2010 15:13:07 -0700 Subject: [PATCH 15/54] Updated NEWS with changes, adding wrap_ssl to the docs. --- NEWS | 11 +++++++++++ doc/basic_usage.rst | 2 ++ 2 files changed, 13 insertions(+) diff --git a/NEWS b/NEWS index 61cab49..0fd1c7b 100644 --- a/NEWS +++ b/NEWS @@ -1,3 +1,14 @@ +0.9.8 +===== +* Support for psycopg2's asynchronous mode, from Daniele Varrazzo +* websocket module is now a core module with 100% unit test coverage thanks to Ben Ford. See its documentation at http://eventlet.net/doc/modules/websocket.html +* Added wrap_ssl convenience method, meaning that we truly no longer need api or util modules. 
+* Multiple-reader detection code protects against the common mistake of having multiple greenthreads read from the same socket at the same time, which can be overridden if you know what you're doing. +* Cleaner monkey_patch API: the "all" keyword is no longer necessary. +* Pool objects have a more convenient constructor -- no more need to subclass +* amajorek's reimplementation of GreenPipe +* Many bug fixes, major and minor. + 0.9.7 ===== * GreenPipe is now a context manager (thanks, quad) diff --git a/doc/basic_usage.rst b/doc/basic_usage.rst index f33a7aa..7b0aad3 100644 --- a/doc/basic_usage.rst +++ b/doc/basic_usage.rst @@ -74,6 +74,8 @@ Network Convenience Functions .. autofunction:: eventlet.listen +.. autofunction:: eventlet.wrap_ssl + .. autofunction:: eventlet.serve .. autofunction:: eventlet.StopServe From 6b0676a3ae795e6b6b9e52bc3260fd4dc6fd8808 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Tue, 25 May 2010 17:08:00 -0700 Subject: [PATCH 16/54] 0.9.8 branding --- doc/real_index.html | 2 +- eventlet/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/doc/real_index.html b/doc/real_index.html index 98d4927..ba1cd47 100644 --- a/doc/real_index.html +++ b/doc/real_index.html @@ -41,7 +41,7 @@ easy_install eventlet

Alternately, you can download the source tarball:

diff --git a/eventlet/__init__.py b/eventlet/__init__.py index 62dc790..4b0e868 100644 --- a/eventlet/__init__.py +++ b/eventlet/__init__.py @@ -1,4 +1,4 @@ -version_info = (0, 9, 8, "dev1") +version_info = (0, 9, 8) __version__ = ".".join(map(str, version_info)) try: From 85180e9e41c72cd6f5af7551e801787636efe55a Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Tue, 25 May 2010 17:49:41 -0700 Subject: [PATCH 18/54] Slight NEWS rewording. --- NEWS | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/NEWS b/NEWS index 0fd1c7b..aa29a2d 100644 --- a/NEWS +++ b/NEWS @@ -1,7 +1,7 @@ 0.9.8 ===== * Support for psycopg2's asynchronous mode, from Daniele Varrazzo -* websocket module is now a core module with 100% unit test coverage thanks to Ben Ford. See its documentation at http://eventlet.net/doc/modules/websocket.html +* websocket module is now part of core Eventlet with 100% unit test coverage thanks to Ben Ford. See its documentation at http://eventlet.net/doc/modules/websocket.html * Added wrap_ssl convenience method, meaning that we truly no longer need api or util modules. * Multiple-reader detection code protects against the common mistake of having multiple greenthreads read from the same socket at the same time, which can be overridden if you know what you're doing. * Cleaner monkey_patch API: the "all" keyword is no longer necessary. From 1917c87bd19f1e104d19706b44ecfd63c3c4b620 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Tue, 25 May 2010 17:50:08 -0700 Subject: [PATCH 19/54] Moving on to dev version number. --- eventlet/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eventlet/__init__.py b/eventlet/__init__.py index 4b0e868..d2c203f 100644 --- a/eventlet/__init__.py +++ b/eventlet/__init__.py @@ -1,4 +1,4 @@ -version_info = (0, 9, 8) +version_info = (0, 9, 9, 'dev1') __version__ = ".".join(map(str, version_info)) try: From f1cc6364dda4c089fe27a3a3fb11f93febb3a124 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Tue, 25 May 2010 18:34:20 -0700 Subject: [PATCH 20/54] Importing twisted patch from favoyang. Fixes #55. --- AUTHORS | 3 ++- eventlet/twistedutil/join_reactor.py | 3 +-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/AUTHORS b/AUTHORS index da0538a..f9e8817 100644 --- a/AUTHORS +++ b/AUTHORS @@ -50,4 +50,5 @@ Thanks To * Slant, better iterator implementation in tpool * Ambroff, nice pygtk hub example * Michael Carter, and Marcin Bachry, nice repro of a bug and good diagnosis leading to the fix -* David Ziegler, reporting issue #53 \ No newline at end of file +* David Ziegler, reporting issue #53 +* Favo Yang, twisted hub patch \ No newline at end of file diff --git a/eventlet/twistedutil/join_reactor.py b/eventlet/twistedutil/join_reactor.py index 4e9ccc2..5964cbf 100644 --- a/eventlet/twistedutil/join_reactor.py +++ b/eventlet/twistedutil/join_reactor.py @@ -4,9 +4,8 @@ You generally don't have to use it unless you need to call reactor.run() yourself. 
""" from eventlet.hubs.twistedr import BaseTwistedHub -from eventlet import use_hub from eventlet.support import greenlets as greenlet -from eventlet.hubs import _threadlocal +from eventlet.hubs import _threadlocal, use_hub use_hub(BaseTwistedHub) assert not hasattr(_threadlocal, 'hub') From 15629d15d816319ff4ed1badc4c3aa0dd0ce5991 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Thu, 27 May 2010 10:49:48 -0700 Subject: [PATCH 21/54] Backed out changeset 4e67bcf6e46b It busted up installations that didn't have psycopg 2.2 --- eventlet/support/psycopg2_patcher.py | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/eventlet/support/psycopg2_patcher.py b/eventlet/support/psycopg2_patcher.py index 29af65d..7d9c1d5 100644 --- a/eventlet/support/psycopg2_patcher.py +++ b/eventlet/support/psycopg2_patcher.py @@ -38,20 +38,15 @@ def make_psycopg_green(): extensions.set_wait_callback(eventlet_wait_callback) -def eventlet_wait_callback(conn, timeout=-1, - # access these objects with LOAD_FAST instead of LOAD_GLOBAL lookup - POLL_OK=extensions.POLL_OK, - POLL_READ=extensions.POLL_READ, - POLL_WRITE=extensions.POLL_WRITE, - trampoline=trampoline): +def eventlet_wait_callback(conn, timeout=-1): """A wait callback useful to allow eventlet to work with Psycopg.""" while 1: state = conn.poll() - if state == POLL_OK: + if state == extensions.POLL_OK: break - elif state == POLL_READ: + elif state == extensions.POLL_READ: trampoline(conn.fileno(), read=True) - elif state == POLL_WRITE: + elif state == extensions.POLL_WRITE: trampoline(conn.fileno(), write=True) else: raise psycopg2.OperationalError( From 12fc3a40ff26f1c1b37689a50bbd171992605346 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Thu, 27 May 2010 23:04:10 -0700 Subject: [PATCH 22/54] Chunked readline fixes by schmir, ported from gevent by redbo. 
--- eventlet/wsgi.py | 75 ++++++++++++++++---------- tests/wsgi_test.py | 131 +++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 174 insertions(+), 32 deletions(-) diff --git a/eventlet/wsgi.py b/eventlet/wsgi.py index e490e1a..4b6a93d 100644 --- a/eventlet/wsgi.py +++ b/eventlet/wsgi.py @@ -90,39 +90,54 @@ class Input(object): self.position += len(read) return read - def _chunked_read(self, rfile, length=None): + def _chunked_read(self, rfile, length=None, use_readline=False): if self.wfile is not None: ## 100 Continue self.wfile.write(self.wfile_line) self.wfile = None self.wfile_line = None - - response = [] try: - if length is None: - if self.chunk_length > self.position: - response.append(rfile.read(self.chunk_length - self.position)) - while self.chunk_length != 0: - self.chunk_length = int(rfile.readline(), 16) - response.append(rfile.read(self.chunk_length)) - rfile.readline() + if length == 0: + return "" + + if length < 0: + length = None + + if use_readline: + reader = self.rfile.readline else: - while length > 0 and self.chunk_length != 0: - if self.chunk_length > self.position: - response.append(rfile.read( - min(self.chunk_length - self.position, length))) - last_read = len(response[-1]) - if last_read == 0: + reader = self.rfile.read + + response = [] + while self.chunk_length != 0: + maxreadlen = self.chunk_length - self.position + if length is not None and length < maxreadlen: + maxreadlen = length + + if maxreadlen > 0: + data = reader(maxreadlen) + if not data: + self.chunk_length = 0 + raise IOError("unexpected end of file while parsing chunked data") + + datalen = len(data) + response.append(data) + + self.position += datalen + if self.chunk_length == self.position: + rfile.readline() + + if length is not None: + length -= datalen + if length == 0: break - length -= last_read - self.position += last_read - if self.chunk_length == self.position: - rfile.readline() - else: - self.chunk_length = int(rfile.readline(), 16) - self.position = 0 - if not self.chunk_length: - rfile.readline() + if use_readline and data[-1] == "\n": + break + else: + self.chunk_length = int(rfile.readline().split(";", 1)[0], 16) + self.position = 0 + if self.chunk_length == 0: + rfile.readline() except greenio.SSL.ZeroReturnError: pass return ''.join(response) @@ -133,7 +148,10 @@ class Input(object): return self._do_read(self.rfile.read, length) def readline(self, size=None): - return self._do_read(self.rfile.readline) + if self.chunked_input: + return self._chunked_read(self.rfile, size, True) + else: + return self._do_read(self.rfile.readline, size) def readlines(self, hint=None): return self._do_read(self.rfile.readlines, hint) @@ -348,8 +366,9 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler): finally: if hasattr(result, 'close'): result.close() - if (self.environ['eventlet.input'].position - < self.environ.get('CONTENT_LENGTH', 0)): + if (self.environ['eventlet.input'].chunked_input or + self.environ['eventlet.input'].position \ + < self.environ['eventlet.input'].content_length): ## Read and discard body if there was no pending 100-continue if not self.environ['eventlet.input'].wfile: while self.environ['eventlet.input'].read(MINIMUM_CHUNK_SIZE): diff --git a/tests/wsgi_test.py b/tests/wsgi_test.py index 2f5c05c..e9281ec 100644 --- a/tests/wsgi_test.py +++ b/tests/wsgi_test.py @@ -145,7 +145,6 @@ def read_http(sock): if CONTENT_LENGTH in headers: num = int(headers[CONTENT_LENGTH]) body = fd.read(num) - #print body else: # read until EOF body = fd.read() @@ -845,8 
+844,13 @@ class TestHttpd(_TestBase): def test_aborted_chunked_post(self): read_content = event.Event() + blew_up = [False] def chunk_reader(env, start_response): - content = env['wsgi.input'].read(1024) + try: + content = env['wsgi.input'].read(1024) + except IOError: + blew_up[0] = True + content = 'ok' read_content.send(content) start_response('200 OK', [('Content-Type', 'text/plain')]) return [content] @@ -863,7 +867,8 @@ class TestHttpd(_TestBase): sock.close() # the test passes if we successfully get here, and read all the data # in spite of the early close - self.assertEqual(read_content.wait(), expected_body) + self.assertEqual(read_content.wait(), 'ok') + self.assert_(blew_up[0]) def read_headers(sock): fd = sock.makefile() @@ -908,7 +913,6 @@ class IterableAlreadyHandledTest(_TestBase): fd.flush() response_line, headers = read_headers(sock) - print headers self.assertEqual(response_line, 'HTTP/1.1 200 OK\r\n') self.assert_('connection' not in headers) fd.write('GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n') @@ -918,6 +922,125 @@ class IterableAlreadyHandledTest(_TestBase): self.assertEqual(headers.get('transfer-encoding'), 'chunked') self.assertEqual(body, '0\r\n\r\n') # Still coming back chunked +class TestChunkedInput(_TestBase): + dirt = "" + validator = None + def application(self, env, start_response): + input = env['wsgi.input'] + response = [] + + pi = env["PATH_INFO"] + + if pi=="/short-read": + d=input.read(10) + response = [d] + elif pi=="/lines": + for x in input: + response.append(x) + elif pi=="/ping": + input.read() + response.append("pong") + else: + raise RuntimeError("bad path") + + start_response('200 OK', [('Content-Type', 'text/plain')]) + return response + + def connect(self): + return eventlet.connect(('localhost', self.port)) + + def set_site(self): + self.site = Site() + self.site.application = self.application + + def chunk_encode(self, chunks, dirt=None): + if dirt is None: + dirt = self.dirt + + b = "" + for c in chunks: + b += "%x%s\r\n%s\r\n" % (len(c), dirt, c) + return b + + def body(self, dirt=None): + return self.chunk_encode(["this", " is ", "chunked", "\nline", " 2", "\n", "line3", ""], dirt=dirt) + + def ping(self, fd): + fd.sendall("GET /ping HTTP/1.1\r\n\r\n") + self.assertEquals(read_http(fd)[-1], "pong") + + def test_short_read_with_content_length(self): + body = self.body() + req = "POST /short-read HTTP/1.1\r\ntransfer-encoding: Chunked\r\nContent-Length:1000\r\n\r\n" + body + + fd = self.connect() + fd.sendall(req) + self.assertEquals(read_http(fd)[-1], "this is ch") + + self.ping(fd) + + def test_short_read_with_zero_content_length(self): + body = self.body() + req = "POST /short-read HTTP/1.1\r\ntransfer-encoding: Chunked\r\nContent-Length:0\r\n\r\n" + body + fd = self.connect() + fd.sendall(req) + self.assertEquals(read_http(fd)[-1], "this is ch") + + self.ping(fd) + + def test_short_read(self): + body = self.body() + req = "POST /short-read HTTP/1.1\r\ntransfer-encoding: Chunked\r\n\r\n" + body + + fd = self.connect() + fd.sendall(req) + self.assertEquals(read_http(fd)[-1], "this is ch") + + self.ping(fd) + + def test_dirt(self): + body = self.body(dirt="; here is dirt\0bla") + req = "POST /ping HTTP/1.1\r\ntransfer-encoding: Chunked\r\n\r\n" + body + + fd = self.connect() + fd.sendall(req) + self.assertEquals(read_http(fd)[-1], "pong") + + self.ping(fd) + + def test_chunked_readline(self): + body = self.body() + req = "POST /lines HTTP/1.1\r\nContent-Length: %s\r\ntransfer-encoding: Chunked\r\n\r\n%s" % 
(len(body), body) + + fd = self.connect() + fd.sendall(req) + self.assertEquals(read_http(fd)[-1], 'this is chunked\nline 2\nline3') + + def test_close_before_finished(self): + import signal + + got_signal = [] + def handler(*args): + got_signal.append(1) + raise KeyboardInterrupt() + + signal.signal(signal.SIGALRM, handler) + signal.alarm(1) + + try: + body = '4\r\nthi' + req = "POST /short-read HTTP/1.1\r\ntransfer-encoding: Chunked\r\n\r\n" + body + + fd = self.connect() + fd.sendall(req) + fd.close() + eventlet.sleep(0.0) + finally: + signal.alarm(0) + signal.signal(signal.SIGALRM, signal.SIG_DFL) + + assert not got_signal, "caught alarm signal. infinite loop detected." + if __name__ == '__main__': From 18a7ca47dab5a58fa100a3dcf1b673fc7003e057 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Thu, 27 May 2010 23:05:22 -0700 Subject: [PATCH 23/54] Credit to schmir, also sticking my own name in a few more places cuz I no longer work for Linden Lab. --- AUTHORS | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/AUTHORS b/AUTHORS index f9e8817..6b3829c 100644 --- a/AUTHORS +++ b/AUTHORS @@ -1,3 +1,7 @@ +Maintainer (i.e., Who To Hassle If You Find Bugs) +------------------------------------------------- +Ryan Williams, rdw on Freenode, breath@alum.mit.edu + Original Authors ---------------- * Bob Ippolito @@ -23,6 +27,7 @@ Contributors * Sergey Shepelev * Chuck Thier * Daniele Varrazzo +* Ryan Williams Linden Lab Contributors ----------------------- @@ -51,4 +56,5 @@ Thanks To * Ambroff, nice pygtk hub example * Michael Carter, and Marcin Bachry, nice repro of a bug and good diagnosis leading to the fix * David Ziegler, reporting issue #53 -* Favo Yang, twisted hub patch \ No newline at end of file +* Favo Yang, twisted hub patch +* Schmir, patch that fixes readline method with chunked encoding in wsgi.py \ No newline at end of file From f94cd88c84ec35f311892a0211423dbe76c8a8db Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Fri, 28 May 2010 01:02:35 -0700 Subject: [PATCH 24/54] News and branding for 0.9.9 --- NEWS | 6 ++++++ doc/real_index.html | 2 +- eventlet/__init__.py | 2 +- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/NEWS b/NEWS index aa29a2d..02bc6d4 100644 --- a/NEWS +++ b/NEWS @@ -1,3 +1,9 @@ +0.9.9 +===== +* A fix for monkeypatching on systems with psycopg version 2.0.14. +* Improved support for chunked transfers in wsgi, plus a bunch of tests from schmir (ported from gevent by redbo) +* A fix for the twisted hub from Favo Yang + 0.9.8 ===== * Support for psycopg2's asynchronous mode, from Daniele Varrazzo diff --git a/doc/real_index.html b/doc/real_index.html index ba1cd47..aa467c6 100644 --- a/doc/real_index.html +++ b/doc/real_index.html @@ -41,7 +41,7 @@ easy_install eventlet

Alternately, you can download the source tarball:

diff --git a/eventlet/__init__.py b/eventlet/__init__.py index d2c203f..6b87a32 100644 --- a/eventlet/__init__.py +++ b/eventlet/__init__.py @@ -1,4 +1,4 @@ -version_info = (0, 9, 9, 'dev1') +version_info = (0, 9, 9) __version__ = ".".join(map(str, version_info)) try: From 2aa135f9eb1f7d6479144c05e42c91928a68c8c1 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Wed, 2 Jun 2010 16:14:52 -0700 Subject: [PATCH 26/54] patcher.original no longer leaves the modules it imports in sys.modules to get clobbered later. --- eventlet/patcher.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/eventlet/patcher.py b/eventlet/patcher.py index ad484f6..058aa3c 100644 --- a/eventlet/patcher.py +++ b/eventlet/patcher.py @@ -126,6 +126,8 @@ def original(modname): finally: if current_mod is not None: sys.modules[modname] = current_mod + else: + del sys.modules[modname] return _originals.get(modname) already_patched = {} From 6d0cab525142f5d00bb348c117a490ec6a998f11 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Wed, 2 Jun 2010 16:32:52 -0700 Subject: [PATCH 27/54] Filled out __all__ in debug.py --- eventlet/debug.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/eventlet/debug.py b/eventlet/debug.py index 503d7bb..ef70fab 100644 --- a/eventlet/debug.py +++ b/eventlet/debug.py @@ -7,8 +7,9 @@ import linecache import re import inspect -__all__ = ['spew', 'unspew', 'format_hub_listeners', 'hub_listener_stacks', -'hub_exceptions', 'tpool_exceptions'] +__all__ = ['spew', 'unspew', 'format_hub_listeners', 'format_hub_timers', + 'hub_listener_stacks', 'hub_exceptions', 'tpool_exceptions', + 'hub_prevent_multiple_readers', 'hub_timer_stacks'] _token_splitter = re.compile('\W+') From f82309208291839ef97829af46571c694d72b9af Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Wed, 2 Jun 2010 16:34:18 -0700 Subject: [PATCH 28/54] Tpool should use original thread-specific Queue instead of possibly-patched one. --- eventlet/tpool.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/eventlet/tpool.py b/eventlet/tpool.py index 0db26fd..13a0035 100644 --- a/eventlet/tpool.py +++ b/eventlet/tpool.py @@ -16,13 +16,14 @@ import os import sys -from Queue import Empty, Queue - from eventlet import event from eventlet import greenio from eventlet import greenthread from eventlet import patcher threading = patcher.original('threading') +Queue_module = patcher.original('Queue') +Queue = Queue_module.Queue +Empty = Queue_module.Empty __all__ = ['execute', 'Proxy', 'killall'] From 68eb7a8d9f38700f85cba78406ff8c3f3a35d782 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Wed, 2 Jun 2010 17:40:58 -0700 Subject: [PATCH 29/54] Added _original_patch_function to fix up the case where tpool is trying to import an unpatched Queue, but we've already monkeypatched threading. Two tests added for that case, thanks to traceback0. Refactored original to store the original modules in sys.modules rather than a module global; it's easier to handle them that way. 
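The underlying problem is that Queue.Queue.__init__ imports threading lazily, at call time, so even an "original" Queue module could end up building its locks from the green threading module once monkey_patch() has run. Inside eventlet the usage stays the same as before (this mirrors what tpool.py already does):

    from eventlet import patcher

    # These keep using real OS threads and real locks even after
    # eventlet.monkey_patch() has replaced the importable versions.
    threading = patcher.original('threading')
    Queue_module = patcher.original('Queue')
    Queue = Queue_module.Queue
    Empty = Queue_module.Empty

The new _original_patch_function wrapper is what guarantees that when that Queue's constructor does its lazy import, it sees the original threading module rather than the green one.
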
--- eventlet/green/thread.py | 4 +-- eventlet/green/threading.py | 2 +- eventlet/patcher.py | 55 +++++++++++++++++++++++++++---------- tests/patcher_test.py | 47 +++++++++++++++++++++++++++++++ 4 files changed, 91 insertions(+), 17 deletions(-) diff --git a/eventlet/green/thread.py b/eventlet/green/thread.py index 9c7ea5e..9e003f9 100644 --- a/eventlet/green/thread.py +++ b/eventlet/green/thread.py @@ -1,4 +1,4 @@ -"""implements standard module 'thread' with greenlets""" +"""Implements the standard thread module, using greenthreads.""" __thread = __import__('thread') from eventlet.support import greenlets as greenlet from eventlet import greenthread @@ -49,4 +49,4 @@ if hasattr(__thread, 'stack_size'): pass # not going to decrease stack_size, because otherwise other greenlets in this thread will suffer -from eventlet.corolocal import local as _local \ No newline at end of file +from eventlet.corolocal import local as _local diff --git a/eventlet/green/threading.py b/eventlet/green/threading.py index 659150e..0c0a03c 100644 --- a/eventlet/green/threading.py +++ b/eventlet/green/threading.py @@ -15,7 +15,7 @@ del patcher def _patch_main_thread(mod): # this is some gnarly patching for the threading module; # if threading is imported before we patch (it nearly always is), - # then the main thread will have the wrong key in therading._active, + # then the main thread will have the wrong key in threading._active, # so, we try and replace that key with the correct one here # this works best if there are no other threads besides the main one curthread = mod._active.pop(mod._get_ident(), None) diff --git a/eventlet/patcher.py b/eventlet/patcher.py index 058aa3c..7425cb2 100644 --- a/eventlet/patcher.py +++ b/eventlet/patcher.py @@ -113,22 +113,49 @@ def patch_function(func, *additional_modules): del sys.modules[name] return patched -_originals = {} -def original(modname): - mod = _originals.get(modname) - if mod is None: - # re-import the "pure" module and store it in the global _originals - # dict; be sure to restore whatever module had that name already - current_mod = sys.modules.pop(modname, None) +def _original_patch_function(func, *module_names): + """Kind of the opposite of patch_function; wraps a function such + that sys.modules is populated only with the unpatched versions of + the specified modules. 
Also a gross hack; tell your kids not to + import inside function bodies!""" + def patched(*args, **kw): + saved = {} + for name in module_names: + saved[name] = sys.modules.get(name, None) + sys.modules[name] = original(name) try: - real_mod = __import__(modname, {}, {}, modname.split('.')[:-1]) - _originals[modname] = real_mod + return func(*args, **kw) finally: - if current_mod is not None: - sys.modules[modname] = current_mod - else: - del sys.modules[modname] - return _originals.get(modname) + for name in module_names: + if saved[name] is not None: + sys.modules[name] = saved[name] + else: + del sys.modules[name] + return patched + + +def original(modname): + original_name = '__original_module_' + modname + if original_name in sys.modules: + return sys.modules.get(original_name) + + # re-import the "pure" module and store it in the global _originals + # dict; be sure to restore whatever module had that name already + current_mod = sys.modules.pop(modname, None) + try: + real_mod = __import__(modname, {}, {}, modname.split('.')[:-1]) + # hacky hack: Queue's constructor imports threading; therefore + # we wrap it with something that ensures it always gets the + # original threading + if modname == 'Queue': + real_mod.Queue.__init__ = _original_patch_function(real_mod.Queue.__init__, 'threading') + sys.modules[original_name] = real_mod + finally: + if current_mod is not None: + sys.modules[modname] = current_mod + else: + del sys.modules[modname] + return sys.modules[original_name] already_patched = {} def monkey_patch(**on): diff --git a/tests/patcher_test.py b/tests/patcher_test.py index 6ee9731..9b77ab8 100644 --- a/tests/patcher_test.py +++ b/tests/patcher_test.py @@ -217,5 +217,52 @@ print "already_patched", ",".join(sorted(patcher.already_patched.keys())) "select=True)", 'select') + +test_monkey_patch_threading = """ +def test_monkey_patch_threading(): + tickcount = [0] + def tick(): + for i in xrange(1000): + tickcount[0] += 1 + eventlet.sleep() + + def do_sleep(): + tpool.execute(time.sleep, 0.5) + + eventlet.spawn(tick) + w1 = eventlet.spawn(do_sleep) + w1.wait() + print tickcount[0] + assert tickcount[0] > 900 +""" + +class Tpool(Patcher): + TEST_TIMEOUT=3 + + def test_unpatched_thread(self): + new_mod = """import eventlet +eventlet.monkey_patch(time=False, thread=False) +from eventlet import tpool +import time +""" + new_mod += test_monkey_patch_threading + new_mod += "\ntest_monkey_patch_threading()\n" + self.write_to_tempfile("newmod", new_mod) + output, lines = self.launch_subprocess('newmod.py') + self.assertEqual(len(lines), 2, lines) + + def test_patched_thread(self): + new_mod = """import eventlet +eventlet.monkey_patch(time=False, thread=True) +from eventlet import tpool +import time +""" + new_mod += test_monkey_patch_threading + new_mod += "\ntest_monkey_patch_threading()\n" + self.write_to_tempfile("newmod", new_mod) + output, lines = self.launch_subprocess('newmod.py') + self.assertEqual(len(lines), 2, lines) + + if __name__ == '__main__': main() From 88bf9d024053557e1d71740c4666726f802ec094 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Wed, 2 Jun 2010 17:54:27 -0700 Subject: [PATCH 30/54] Named tpool threads, fixed rare race condition where _reqq would be freed before the thread got to it, causing log spam. Consolidated all patcher/tpool tests in one place. 
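For context, tpool hands blocking calls to a small pool of real OS threads (now individually named, which makes them easier to pick out in thread listings) while the calling greenthread yields. A minimal usage sketch, with arbitrary timings:

    import time

    import eventlet
    from eventlet import tpool

    def ticker():
        for i in xrange(5):
            print "tick", i
            eventlet.sleep(0.1)

    eventlet.spawn(ticker)
    # time.sleep blocks a pool thread, not the hub, so the ticker
    # greenthread above keeps running while we wait for the result.
    tpool.execute(time.sleep, 0.5)
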
--- eventlet/tpool.py | 7 +++++-- tests/patcher_test.py | 32 ++++++++++++++++---------------- 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/eventlet/tpool.py b/eventlet/tpool.py index 13a0035..ed6cab9 100644 --- a/eventlet/tpool.py +++ b/eventlet/tpool.py @@ -68,7 +68,10 @@ SYS_EXCS = (KeyboardInterrupt, SystemExit) def tworker(): global _reqq, _rspq while(True): - msg = _reqq.get() + try: + msg = _reqq.get() + except AttributeError: + return # can't get anything off of a dud queue if msg is None: return (e,meth,args,kwargs) = msg @@ -249,7 +252,7 @@ def setup(): _reqq = Queue(maxsize=-1) _rspq = Queue(maxsize=-1) for i in range(0,_nthreads): - t = threading.Thread(target=tworker) + t = threading.Thread(target=tworker, name="tpool_thread_%s" % i) t.setDaemon(True) t.start() _threads.add(t) diff --git a/tests/patcher_test.py b/tests/patcher_test.py index 9b77ab8..78bd29f 100644 --- a/tests/patcher_test.py +++ b/tests/patcher_test.py @@ -126,22 +126,6 @@ print "newmod" self.assertEqual(len(lines), 2, repr(output)) self.assert_(lines[0].startswith('newmod'), repr(output)) - def test_tpool(self): - new_mod = """ -import eventlet -from eventlet import patcher -patcher.monkey_patch() -from eventlet import tpool -print "newmod", tpool.execute(len, "hi") -print "newmod", tpool.execute(len, "hi2") -""" - self.write_to_tempfile("newmod", new_mod) - output, lines = self.launch_subprocess('newmod.py') - self.assertEqual(len(lines), 3, repr(output)) - self.assert_(lines[0].startswith('newmod'), repr(output)) - self.assert_('2' in lines[0], repr(output)) - self.assert_('3' in lines[1], repr(output)) - def test_typeerror(self): new_mod = """ @@ -239,6 +223,22 @@ def test_monkey_patch_threading(): class Tpool(Patcher): TEST_TIMEOUT=3 + def test_simple(self): + new_mod = """ +import eventlet +from eventlet import patcher +patcher.monkey_patch() +from eventlet import tpool +print "newmod", tpool.execute(len, "hi") +print "newmod", tpool.execute(len, "hi2") +""" + self.write_to_tempfile("newmod", new_mod) + output, lines = self.launch_subprocess('newmod.py') + self.assertEqual(len(lines), 3, output) + self.assert_(lines[0].startswith('newmod'), repr(output)) + self.assert_('2' in lines[0], repr(output)) + self.assert_('3' in lines[1], repr(output)) + def test_unpatched_thread(self): new_mod = """import eventlet eventlet.monkey_patch(time=False, thread=False) From 0793b503ebe03ca82d1329d76d092329e9c29208 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Fri, 4 Jun 2010 13:33:14 -0700 Subject: [PATCH 31/54] Fixed main thread patching and moved the code into patcher.py. Implemented a sys.modules state saver that simplifies much of the code in patcher.py. Improved comments and docs in there too. 
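The saver is nothing more than a snapshot-and-restore of selected sys.modules entries; the bookkeeping it replaces throughout patcher.py looks roughly like this (standalone illustration, the module names are arbitrary):

    import sys

    names = ('socket', 'select')
    saved = dict((name, sys.modules.get(name)) for name in names)
    try:
        pass  # ... temporarily install replacements and re-import against them ...
    finally:
        for name, mod in saved.items():
            if mod is not None:
                sys.modules[name] = mod      # put the previous module back
            else:
                sys.modules.pop(name, None)  # the name wasn't imported before

SysModulesSaver wraps exactly this save/restore so that inject(), patch_function() and original() can share it instead of each carrying its own copy.
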
--- eventlet/green/threading.py | 10 --- eventlet/patcher.py | 125 +++++++++++++++++++++++------------- 2 files changed, 79 insertions(+), 56 deletions(-) diff --git a/eventlet/green/threading.py b/eventlet/green/threading.py index 0c0a03c..7d61c58 100644 --- a/eventlet/green/threading.py +++ b/eventlet/green/threading.py @@ -11,13 +11,3 @@ patcher.inject('threading', ('time', time)) del patcher - -def _patch_main_thread(mod): - # this is some gnarly patching for the threading module; - # if threading is imported before we patch (it nearly always is), - # then the main thread will have the wrong key in threading._active, - # so, we try and replace that key with the correct one here - # this works best if there are no other threads besides the main one - curthread = mod._active.pop(mod._get_ident(), None) - if curthread: - mod._active[thread.get_ident()] = curthread diff --git a/eventlet/patcher.py b/eventlet/patcher.py index 7425cb2..77d8e42 100644 --- a/eventlet/patcher.py +++ b/eventlet/patcher.py @@ -5,6 +5,33 @@ __all__ = ['inject', 'import_patched', 'monkey_patch', 'is_monkey_patched'] __exclude = set(('__builtins__', '__file__', '__name__')) +class SysModulesSaver(object): + """Class that captures some subset of the current state of + sys.modules. Pass in an iterator of module names to the + constructor.""" + def __init__(self, module_names=()): + self._saved = {} + self.save(*module_names) + + def save(self, *module_names): + """Saves the named modules to the object.""" + for modname in module_names: + self._saved[modname] = sys.modules.get(modname, None) + + def restore(self): + """Restores the modules that the saver knows about into + sys.modules. + """ + for modname, mod in self._saved.iteritems(): + if mod is not None: + sys.modules[modname] = mod + else: + try: + del sys.modules[modname] + except KeyError: + pass + + def inject(module_name, new_globals, *additional_modules): """Base method for "injecting" greened modules into an imported module. 
It imports the module specified in *module_name*, arranging things so @@ -34,16 +61,20 @@ def inject(module_name, new_globals, *additional_modules): _green_socket_modules() + _green_thread_modules() + _green_time_modules()) + + # after this we are gonna screw with sys.modules, so capture the + # state of all the modules we're going to mess with + saver = SysModulesSaver([name for name, m in additional_modules]) + saver.save(module_name) - ## Put the specified modules in sys.modules for the duration of the import - saved = {} + # Cover the target modules so that when you import the module it + # sees only the patched versions for name, mod in additional_modules: - saved[name] = sys.modules.get(name, None) sys.modules[name] = mod ## Remove the old module from sys.modules and reimport it while ## the specified modules are in place - old_module = sys.modules.pop(module_name, None) + sys.modules.pop(module_name, None) try: module = __import__(module_name, {}, {}, module_name.split('.')[:-1]) @@ -56,18 +87,7 @@ def inject(module_name, new_globals, *additional_modules): ## Keep a reference to the new module to prevent it from dying sys.modules[patched_name] = module finally: - ## Put the original module back - if old_module is not None: - sys.modules[module_name] = old_module - elif module_name in sys.modules: - del sys.modules[module_name] - - ## Put all the saved modules back - for name, mod in additional_modules: - if saved[name] is not None: - sys.modules[name] = saved[name] - else: - del sys.modules[name] + saver.restore() ## Put the original modules back return module @@ -86,8 +106,11 @@ def import_patched(module_name, *additional_modules, **kw_additional_modules): def patch_function(func, *additional_modules): - """Huge hack here -- patches the specified modules for the - duration of the function call.""" + """Decorator that returns a version of the function that patches + some modules for the duration of the function call. This is + deeply gross and should only be used for functions that import + network libraries within their function bodies that there is no + way of getting around.""" if not additional_modules: # supply some defaults additional_modules = ( @@ -98,50 +121,44 @@ def patch_function(func, *additional_modules): _green_time_modules()) def patched(*args, **kw): - saved = {} + saver = SysModulesSaver(additional_modules.keys()) for name, mod in additional_modules: - saved[name] = sys.modules.get(name, None) sys.modules[name] = mod try: return func(*args, **kw) finally: - ## Put all the saved modules back - for name, mod in additional_modules: - if saved[name] is not None: - sys.modules[name] = saved[name] - else: - del sys.modules[name] + saver.restore() return patched def _original_patch_function(func, *module_names): - """Kind of the opposite of patch_function; wraps a function such - that sys.modules is populated only with the unpatched versions of - the specified modules. Also a gross hack; tell your kids not to - import inside function bodies!""" + """Kind of the contrapositive of patch_function: decorates a + function such that when it's called, sys.modules is populated only + with the unpatched versions of the specified modules. Unlike + patch_function, only the names of the modules need be supplied, + and there are no defaults. 
This is a gross hack; tell your kids not + to import inside function bodies!""" def patched(*args, **kw): - saved = {} + saver = SysModulesSaver(module_names) for name in module_names: - saved[name] = sys.modules.get(name, None) sys.modules[name] = original(name) try: return func(*args, **kw) finally: - for name in module_names: - if saved[name] is not None: - sys.modules[name] = saved[name] - else: - del sys.modules[name] + saver.restore() return patched def original(modname): + """ This returns an unpatched version of a module; this is useful for + Eventlet itself (i.e. tpool).""" original_name = '__original_module_' + modname if original_name in sys.modules: return sys.modules.get(original_name) # re-import the "pure" module and store it in the global _originals # dict; be sure to restore whatever module had that name already - current_mod = sys.modules.pop(modname, None) + saver = SysModulesSaver((modname,)) + sys.modules.pop(modname, None) try: real_mod = __import__(modname, {}, {}, modname.split('.')[:-1]) # hacky hack: Queue's constructor imports threading; therefore @@ -149,12 +166,11 @@ def original(modname): # original threading if modname == 'Queue': real_mod.Queue.__init__ = _original_patch_function(real_mod.Queue.__init__, 'threading') + # save a reference to the unpatched module so it doesn't get lost sys.modules[original_name] = real_mod finally: - if current_mod is not None: - sys.modules[modname] = current_mod - else: - del sys.modules[modname] + saver.restore() + return sys.modules[original_name] already_patched = {} @@ -183,6 +199,7 @@ def monkey_patch(**on): on.setdefault(modname, default_on) modules_to_patch = [] + patched_thread = False if on['os'] and not already_patched.get('os'): modules_to_patch += _green_os_modules() already_patched['os'] = True @@ -193,10 +210,7 @@ def monkey_patch(**on): modules_to_patch += _green_socket_modules() already_patched['socket'] = True if on['thread'] and not already_patched.get('thread'): - # hacks ahead - threading = original('threading') - import eventlet.green.threading as greenthreading - greenthreading._patch_main_thread(threading) + patched_thread = True modules_to_patch += _green_thread_modules() already_patched['thread'] = True if on['time'] and not already_patched.get('time'): @@ -222,6 +236,25 @@ def monkey_patch(**on): if patched_attr is not None: setattr(orig_mod, attr_name, patched_attr) + # hacks ahead; this is necessary to prevent a KeyError on program exit + if patched_thread: + _patch_main_thread(sys.modules['threading']) + + +def _patch_main_thread(mod): + """This is some gnarly patching specific to the threading module; + threading will always be initialized prior to monkeypatching, and + its _active dict will have the wrong key (it uses the real thread + id but once it's patched it will use the greenlet ids); so what we + do is rekey the _active dict so that the main thread's entry uses + the greenthread key. Other threads' keys are ignored.""" + thread = original('thread') + curthread = mod._active.pop(thread.get_ident(), None) + if curthread: + import eventlet.green.thread + mod._active[eventlet.green.thread.get_ident()] = curthread + + def is_monkey_patched(module): """Returns True if the given module is monkeypatched currently, False if not. *module* can be either the module itself or its name. 
From 605936f15c447d436e33fb59913f128eb70a62fd Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Fri, 4 Jun 2010 14:15:10 -0700 Subject: [PATCH 32/54] Defending against one possible flaw in original() which isn't currently a problem but may become so. --- eventlet/patcher.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/eventlet/patcher.py b/eventlet/patcher.py index 77d8e42..f7fc87e 100644 --- a/eventlet/patcher.py +++ b/eventlet/patcher.py @@ -151,6 +151,10 @@ def _original_patch_function(func, *module_names): def original(modname): """ This returns an unpatched version of a module; this is useful for Eventlet itself (i.e. tpool).""" + # note that it's not necessary to temporarily install unpatched + # versions of all patchable modules during the import of the + # module; this is because none of them import each other, except + # for threading which imports thread original_name = '__original_module_' + modname if original_name in sys.modules: return sys.modules.get(original_name) @@ -159,6 +163,11 @@ def original(modname): # dict; be sure to restore whatever module had that name already saver = SysModulesSaver((modname,)) sys.modules.pop(modname, None) + # install original thread module if we're getting the original + # threading module + if modname == 'threading': + saver.save('thread') + sys.modules['thread'] = original('thread') try: real_mod = __import__(modname, {}, {}, modname.split('.')[:-1]) # hacky hack: Queue's constructor imports threading; therefore From eb5066a11e37ef54ea545a036be052c7c6fd6c75 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Mon, 7 Jun 2010 12:15:41 -0700 Subject: [PATCH 33/54] Added note about running tests under 2.4 --- doc/testing.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/doc/testing.rst b/doc/testing.rst index 38a94db..596bc5a 100644 --- a/doc/testing.rst +++ b/doc/testing.rst @@ -23,6 +23,8 @@ That's it! The output from running nose is the same as unittest's output, if th Many tests are skipped based on environmental factors; for example, it makes no sense to test Twisted-specific functionality when Twisted is not installed. These are printed as S's during execution, and in the summary printed after the tests run it will tell you how many were skipped. +.. note:: If running Python version 2.4, use this command instead: ``python tests/nosewrapper.py``. There are several tests which make use of the `with` statement and therefore will cause nose grief when it tries to import them; nosewrapper.py excludes these tests so they are skipped. + Doctests -------- From d4034daa04e9677baff3dff1d6547358746223a1 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Mon, 7 Jun 2010 12:42:33 -0700 Subject: [PATCH 34/54] Skip tpool tests with pyevent hub. 
--- tests/patcher_test.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/patcher_test.py b/tests/patcher_test.py index 78bd29f..3bc49db 100644 --- a/tests/patcher_test.py +++ b/tests/patcher_test.py @@ -4,7 +4,7 @@ import subprocess import sys import tempfile -from tests import LimitedTestCase, main +from tests import LimitedTestCase, main, skip_with_pyevent base_module_contents = """ import socket @@ -223,6 +223,7 @@ def test_monkey_patch_threading(): class Tpool(Patcher): TEST_TIMEOUT=3 + @skip_with_pyevent def test_simple(self): new_mod = """ import eventlet @@ -239,6 +240,7 @@ print "newmod", tpool.execute(len, "hi2") self.assert_('2' in lines[0], repr(output)) self.assert_('3' in lines[1], repr(output)) + @skip_with_pyevent def test_unpatched_thread(self): new_mod = """import eventlet eventlet.monkey_patch(time=False, thread=False) @@ -251,6 +253,7 @@ import time output, lines = self.launch_subprocess('newmod.py') self.assertEqual(len(lines), 2, lines) + @skip_with_pyevent def test_patched_thread(self): new_mod = """import eventlet eventlet.monkey_patch(time=False, thread=True) From 5cd7c650e3b2fe92c9388199e66bc7f85e66d544 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Mon, 7 Jun 2010 12:52:26 -0700 Subject: [PATCH 35/54] Fix argument parsing in patched function. --- eventlet/patcher.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/eventlet/patcher.py b/eventlet/patcher.py index f7fc87e..0474580 100644 --- a/eventlet/patcher.py +++ b/eventlet/patcher.py @@ -121,8 +121,9 @@ def patch_function(func, *additional_modules): _green_time_modules()) def patched(*args, **kw): - saver = SysModulesSaver(additional_modules.keys()) + saver = SysModulesSaver() for name, mod in additional_modules: + saver.save(name) sys.modules[name] = mod try: return func(*args, **kw) From 355326dee2ebe8b67d827ab1bc95cf15bcf2e5ab Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Thu, 10 Jun 2010 14:49:33 +0100 Subject: [PATCH 36/54] Implement opening-handshake support for newer websockets (draft-hixie-thewebsocketprotocol-76). 
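[Editor's note: a standalone sketch of the draft-76 challenge computation that the diff below implements. The key and body values are the ones the protocol draft and the tests added later in this series use, so the digest can be checked against the expected handshake reply; the helper name extract_number is the editor's.]

    import string
    import struct
    try:
        from hashlib import md5
    except ImportError:
        from md5 import md5

    def extract_number(value):
        # keep the digits, count the spaces, divide (per draft-76)
        digits = "".join(c for c in value if c in string.digits)
        return int(digits) // value.count(" ")

    key1 = "4 @1 46546xW%0l 1 5"   # Sec-WebSocket-Key1 header
    key2 = "12998 5 Y3 1 .P00"     # Sec-WebSocket-Key2 header
    key3 = "^n:ds[4U"              # the 8 bytes that follow the request headers

    challenge = struct.pack(">II", extract_number(key1), extract_number(key2)) + key3
    response = md5(challenge).digest()
    # the 16-byte digest is appended to the "101 Web Socket Protocol Handshake"
    # reply; for these keys it comes out as "8jKS'y:G*Co,Wxa-"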
--- eventlet/websocket.py | 75 ++++++++++++++++++++++++++++++++++++++----- 1 file changed, 67 insertions(+), 8 deletions(-) diff --git a/eventlet/websocket.py b/eventlet/websocket.py index 5ef1d01..63c3fac 100644 --- a/eventlet/websocket.py +++ b/eventlet/websocket.py @@ -1,5 +1,12 @@ import collections import errno +import string +import struct + +try: + from hashlib import md5 +except ImportError: + from md5 import md5 import eventlet from eventlet import semaphore @@ -28,6 +35,7 @@ class WebSocketWSGI(object): """ def __init__(self, handler): self.handler = handler + self.protocol_version = None def __call__(self, environ, start_response): if not (environ.get('HTTP_CONNECTION') == 'Upgrade' and @@ -35,17 +43,54 @@ class WebSocketWSGI(object): # need to check a few more things here for true compliance start_response('400 Bad Request', [('Connection','close')]) return [] + + # See if they sent the new-format headers + if 'HTTP_SEC_WEBSOCKET_KEY1' in environ: + self.protocol_version = 76 + else: + self.protocol_version = 75 + # Get the underlying socket and wrap a WebSocket class around it sock = environ['eventlet.input'].get_socket() ws = WebSocket(sock, environ) - handshake_reply = ("HTTP/1.1 101 Web Socket Protocol Handshake\r\n" - "Upgrade: WebSocket\r\n" - "Connection: Upgrade\r\n" - "WebSocket-Origin: %s\r\n" - "WebSocket-Location: ws://%s%s\r\n\r\n" % ( - environ.get('HTTP_ORIGIN'), - environ.get('HTTP_HOST'), - environ.get('PATH_INFO'))) + + # If it's new-version, we need to work out our challenge response + if self.protocol_version == 76: + key1 = self._extract_number(environ['HTTP_SEC_WEBSOCKET_KEY1']) + key2 = self._extract_number(environ['HTTP_SEC_WEBSOCKET_KEY2']) + # There's no content-length header in the request, but it has 8 + # bytes of data. + environ['wsgi.input'].content_length = 8 + key3 = environ['wsgi.input'].read(8) + key = struct.pack(">II", key1, key2) + key3 + response = md5(key).digest() + + # Start building the response + if self.protocol_version == 75: + handshake_reply = ("HTTP/1.1 101 Web Socket Protocol Handshake\r\n" + "Upgrade: WebSocket\r\n" + "Connection: Upgrade\r\n" + "WebSocket-Origin: %s\r\n" + "WebSocket-Location: ws://%s%s\r\n\r\n" % ( + environ.get('HTTP_ORIGIN'), + environ.get('HTTP_HOST'), + environ.get('PATH_INFO'))) + elif self.protocol_version == 76: + handshake_reply = ("HTTP/1.1 101 Web Socket Protocol Handshake\r\n" + "Upgrade: WebSocket\r\n" + "Connection: Upgrade\r\n" + "Sec-WebSocket-Origin: %s\r\n" + "Sec-WebSocket-Protocol: %s\r\n" + "Sec-WebSocket-Location: ws://%s%s\r\n" + "\r\n%s"% ( + environ.get('HTTP_ORIGIN'), + environ.get('HTTP_SEC_WEBSOCKET_PROTOCOL'), + environ.get('HTTP_HOST'), + environ.get('PATH_INFO'), + response)) + else: + raise ValueError("Unknown WebSocket protocol version.") + sock.sendall(handshake_reply) try: self.handler(ws) @@ -56,6 +101,20 @@ class WebSocketWSGI(object): # doesn't barf on the fact that we didn't call start_response return wsgi.ALREADY_HANDLED + def _extract_number(self, value): + """ + Utility function which, given a string like 'g98sd 5[]221@1', will + return 9852211. Used to parse the Sec-WebSocket-Key headers. + """ + out = "" + spaces = 0 + for char in value: + if char in string.digits: + out += char + elif char == " ": + spaces += 1 + return int(out) / spaces + class WebSocket(object): """A websocket object that handles the details of serialization/deserialization to the socket. 
From 62b16a9900fb0137b42a2d5d502d45e2aa8e41e3 Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Thu, 10 Jun 2010 15:03:29 +0100 Subject: [PATCH 37/54] Implement closing frame detection/sending for new WebSockets. --- eventlet/websocket.py | 37 +++++++++++++++++++++++++++++-------- 1 file changed, 29 insertions(+), 8 deletions(-) diff --git a/eventlet/websocket.py b/eventlet/websocket.py index 63c3fac..456e5ca 100644 --- a/eventlet/websocket.py +++ b/eventlet/websocket.py @@ -97,6 +97,8 @@ class WebSocketWSGI(object): except socket.error, e: if get_errno(e) not in ACCEPTABLE_CLIENT_ERRORS: raise + # Make sure we send the closing frame + ws._send_closing_frame() # use this undocumented feature of eventlet.wsgi to ensure that it # doesn't barf on the fact that we didn't call start_response return wsgi.ALREADY_HANDLED @@ -134,17 +136,20 @@ class WebSocket(object): The full WSGI environment for this request. """ - def __init__(self, sock, environ): + def __init__(self, sock, environ, version=76): """ :param socket: The eventlet socket :type socket: :class:`eventlet.greenio.GreenSocket` :param environ: The wsgi environment + :param version: The WebSocket spec version to follow (default is 76) """ self.socket = sock self.origin = environ.get('HTTP_ORIGIN') self.protocol = environ.get('HTTP_WEBSOCKET_PROTOCOL') self.path = environ.get('PATH_INFO') self.environ = environ + self.version = version + self.websocket_closed = False self._buf = "" self._msgs = collections.deque() self._sendlock = semaphore.Semaphore() @@ -173,12 +178,21 @@ class WebSocket(object): end_idx = 0 buf = self._buf while buf: - assert ord(buf[0]) == 0, "Don't understand how to parse this type of message: %r" % buf - end_idx = buf.find("\xFF") - if end_idx == -1: #pragma NO COVER + frame_type = ord(buf[0]) + if frame_type == 0: + # Normal message. + end_idx = buf.find("\xFF") + if end_idx == -1: #pragma NO COVER + break + msgs.append(buf[1:end_idx].decode('utf-8', 'replace')) + buf = buf[end_idx+1:] + elif frame_type == 255: + # Closing handshake. + assert ord(buf[1]) == 0, "Unexpected closing handshake: %r" % buf + self.websocket_closed = True break - msgs.append(buf[1:end_idx].decode('utf-8', 'replace')) - buf = buf[end_idx+1:] + else: + raise ValueError("Don't understand how to parse this type of message: %r" % buf) self._buf = buf return msgs @@ -199,18 +213,25 @@ class WebSocket(object): """Waits for and deserializes messages. 
Returns a single message; the oldest not yet processed.""" while not self._msgs: - # no parsed messages, must mean buf needs more data + # no parsed messages, must mean buf needs more data (or it's closed) delta = self.socket.recv(8096) - if delta == '': + if delta == '' or self.websocket_closed: return None self._buf += delta msgs = self._parse_messages() self._msgs.extend(msgs) return self._msgs.popleft() + def _send_closing_frame(self): + """Sends the closing frame to the client, if required.""" + if self.version == 76 and not self.websocket_closed: + self.socket.sendall("\xff\x00") + self.websocket_closed = True + def close(self): """Forcibly close the websocket; generally it is preferable to return from the handler method.""" + self._send_closing_frame() self.socket.shutdown(True) self.socket.close() From 4c7a9f74e66fa9d6f8316b0ac882a46fc2ab4b92 Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Thu, 10 Jun 2010 15:20:13 +0100 Subject: [PATCH 38/54] Don't break old WebSocket closing --- eventlet/websocket.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eventlet/websocket.py b/eventlet/websocket.py index 456e5ca..95ef237 100644 --- a/eventlet/websocket.py +++ b/eventlet/websocket.py @@ -52,7 +52,7 @@ class WebSocketWSGI(object): # Get the underlying socket and wrap a WebSocket class around it sock = environ['eventlet.input'].get_socket() - ws = WebSocket(sock, environ) + ws = WebSocket(sock, environ, self.protocol_version) # If it's new-version, we need to work out our challenge response if self.protocol_version == 76: From 9ed0ea9fc5bf816ef369468e88a7c65d2a40f208 Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Thu, 10 Jun 2010 15:45:14 +0100 Subject: [PATCH 39/54] Tests for WebSocket-76, and renaming the old ones to *_75 --- eventlet/websocket.py | 19 +++- tests/websocket_test.py | 224 ++++++++++++++++++++++++++++++++++++++-- 2 files changed, 233 insertions(+), 10 deletions(-) diff --git a/eventlet/websocket.py b/eventlet/websocket.py index 95ef237..d6b6450 100644 --- a/eventlet/websocket.py +++ b/eventlet/websocket.py @@ -2,6 +2,7 @@ import collections import errno import string import struct +from socket import error as SocketError try: from hashlib import md5 @@ -47,6 +48,10 @@ class WebSocketWSGI(object): # See if they sent the new-format headers if 'HTTP_SEC_WEBSOCKET_KEY1' in environ: self.protocol_version = 76 + if 'HTTP_SEC_WEBSOCKET_KEY2' not in environ: + # That's bad. 
+ start_response('400 Bad Request', [('Connection','close')]) + return [] else: self.protocol_version = 75 @@ -84,7 +89,7 @@ class WebSocketWSGI(object): "Sec-WebSocket-Location: ws://%s%s\r\n" "\r\n%s"% ( environ.get('HTTP_ORIGIN'), - environ.get('HTTP_SEC_WEBSOCKET_PROTOCOL'), + environ.get('HTTP_SEC_WEBSOCKET_PROTOCOL', 'default'), environ.get('HTTP_HOST'), environ.get('PATH_INFO'), response)) @@ -98,7 +103,7 @@ class WebSocketWSGI(object): if get_errno(e) not in ACCEPTABLE_CLIENT_ERRORS: raise # Make sure we send the closing frame - ws._send_closing_frame() + ws._send_closing_frame(True) # use this undocumented feature of eventlet.wsgi to ensure that it # doesn't barf on the fact that we didn't call start_response return wsgi.ALREADY_HANDLED @@ -222,10 +227,16 @@ class WebSocket(object): self._msgs.extend(msgs) return self._msgs.popleft() - def _send_closing_frame(self): + def _send_closing_frame(self, ignore_send_errors=False): """Sends the closing frame to the client, if required.""" if self.version == 76 and not self.websocket_closed: - self.socket.sendall("\xff\x00") + try: + self.socket.sendall("\xff\x00") + except SocketError: + # Sometimes, like when the remote side cuts off the connection, + # we don't care about this. + if not ignore_send_errors: + raise self.websocket_closed = True def close(self): diff --git a/tests/websocket_test.py b/tests/websocket_test.py index 3fff51d..19624d8 100644 --- a/tests/websocket_test.py +++ b/tests/websocket_test.py @@ -47,7 +47,7 @@ class TestWebSocket(_TestBase): raise self.assertRaises(urllib2.HTTPError, raiser) - def test_incomplete_headers(self): + def test_incomplete_headers_75(self): headers = dict(kv.split(': ') for kv in [ "Upgrade: WebSocket", # NOTE: intentionally no connection header @@ -63,7 +63,23 @@ class TestWebSocket(_TestBase): self.assertEqual(resp.getheader('connection'), 'close') self.assertEqual(resp.read(), '') - def test_correct_upgrade_request(self): + def test_incomplete_headers_76(self): + headers = dict(kv.split(': ') for kv in [ + "Upgrade: WebSocket", + # NOTE: intentionally no connection header + "Host: localhost:%s" % self.port, + "Origin: http://localhost:%s" % self.port, + "Sec-WebSocket-Protocol: ws", + ]) + http = httplib.HTTPConnection('localhost', self.port) + http.request("GET", "/echo", headers=headers) + resp = http.getresponse() + + self.assertEqual(resp.status, 400) + self.assertEqual(resp.getheader('connection'), 'close') + self.assertEqual(resp.read(), '') + + def test_correct_upgrade_request_75(self): connect = [ "GET /echo HTTP/1.1", "Upgrade: WebSocket", @@ -85,7 +101,32 @@ class TestWebSocket(_TestBase): 'WebSocket-Origin: http://localhost:%s' % self.port, 'WebSocket-Location: ws://localhost:%s/echo\r\n\r\n' % self.port])) - def test_sending_messages_to_websocket(self): + def test_correct_upgrade_request_76(self): + connect = [ + "GET /echo HTTP/1.1", + "Upgrade: WebSocket", + "Connection: Upgrade", + "Host: localhost:%s" % self.port, + "Origin: http://localhost:%s" % self.port, + "Sec-WebSocket-Protocol: ws", + "Sec-WebSocket-Key1: 4 @1 46546xW%0l 1 5", + "Sec-WebSocket-Key2: 12998 5 Y3 1 .P00", + ] + sock = eventlet.connect( + ('localhost', self.port)) + + sock.sendall('\r\n'.join(connect) + '\r\n\r\n^n:ds[4U') + result = sock.recv(1024) + ## The server responds the correct Websocket handshake + self.assertEqual(result, + '\r\n'.join(['HTTP/1.1 101 Web Socket Protocol Handshake', + 'Upgrade: WebSocket', + 'Connection: Upgrade', + 'Sec-WebSocket-Origin: http://localhost:%s' % self.port, + 
'Sec-WebSocket-Protocol: ws', + 'Sec-WebSocket-Location: ws://localhost:%s/echo\r\n\r\n8jKS\'y:G*Co,Wxa-' % self.port])) + + def test_sending_messages_to_websocket_75(self): connect = [ "GET /echo HTTP/1.1", "Upgrade: WebSocket", @@ -111,7 +152,35 @@ class TestWebSocket(_TestBase): sock.close() eventlet.sleep(0.01) - def test_getting_messages_from_websocket(self): + def test_sending_messages_to_websocket_76(self): + connect = [ + "GET /echo HTTP/1.1", + "Upgrade: WebSocket", + "Connection: Upgrade", + "Host: localhost:%s" % self.port, + "Origin: http://localhost:%s" % self.port, + "Sec-WebSocket-Protocol: ws", + "Sec-WebSocket-Key1: 4 @1 46546xW%0l 1 5", + "Sec-WebSocket-Key2: 12998 5 Y3 1 .P00", + ] + sock = eventlet.connect( + ('localhost', self.port)) + + sock.sendall('\r\n'.join(connect) + '\r\n\r\n^n:ds[4U') + first_resp = sock.recv(1024) + sock.sendall('\x00hello\xFF') + result = sock.recv(1024) + self.assertEqual(result, '\x00hello\xff') + sock.sendall('\x00start') + eventlet.sleep(0.001) + sock.sendall(' end\xff') + result = sock.recv(1024) + self.assertEqual(result, '\x00start end\xff') + sock.shutdown(socket.SHUT_RDWR) + sock.close() + eventlet.sleep(0.01) + + def test_getting_messages_from_websocket_75(self): connect = [ "GET /range HTTP/1.1", "Upgrade: WebSocket", @@ -134,7 +203,32 @@ class TestWebSocket(_TestBase): # Last item in msgs is an empty string self.assertEqual(msgs[:-1], ['msg %d' % i for i in range(10)]) - def test_breaking_the_connection(self): + def test_getting_messages_from_websocket_76(self): + connect = [ + "GET /range HTTP/1.1", + "Upgrade: WebSocket", + "Connection: Upgrade", + "Host: localhost:%s" % self.port, + "Origin: http://localhost:%s" % self.port, + "Sec-WebSocket-Protocol: ws", + "Sec-WebSocket-Key1: 4 @1 46546xW%0l 1 5", + "Sec-WebSocket-Key2: 12998 5 Y3 1 .P00", + ] + sock = eventlet.connect( + ('localhost', self.port)) + + sock.sendall('\r\n'.join(connect) + '\r\n\r\n^n:ds[4U') + resp = sock.recv(1024) + headers, result = resp.split('\r\n\r\n') + msgs = [result[16:].strip('\x00\xff')] + cnt = 10 + while cnt: + msgs.append(sock.recv(20).strip('\x00\xff')) + cnt -= 1 + # Last item in msgs is an empty string + self.assertEqual(msgs[:-1], ['msg %d' % i for i in range(10)]) + + def test_breaking_the_connection_75(self): error_detected = [False] done_with_request = event.Event() site = self.site @@ -165,7 +259,93 @@ class TestWebSocket(_TestBase): done_with_request.wait() self.assert_(not error_detected[0]) - def test_app_socket_errors(self): + def test_breaking_the_connection_76(self): + error_detected = [False] + done_with_request = event.Event() + site = self.site + def error_detector(environ, start_response): + try: + try: + return site(environ, start_response) + except: + error_detected[0] = True + raise + finally: + done_with_request.send(True) + self.site = error_detector + self.spawn_server() + connect = [ + "GET /range HTTP/1.1", + "Upgrade: WebSocket", + "Connection: Upgrade", + "Host: localhost:%s" % self.port, + "Origin: http://localhost:%s" % self.port, + "Sec-WebSocket-Protocol: ws", + "Sec-WebSocket-Key1: 4 @1 46546xW%0l 1 5", + "Sec-WebSocket-Key2: 12998 5 Y3 1 .P00", + ] + sock = eventlet.connect( + ('localhost', self.port)) + sock.sendall('\r\n'.join(connect) + '\r\n\r\n^n:ds[4U') + resp = sock.recv(1024) # get the headers + sock.close() # close while the app is running + done_with_request.wait() + self.assert_(not error_detected[0]) + + def test_client_closing_connection_76(self): + error_detected = [False] + done_with_request = 
event.Event() + site = self.site + def error_detector(environ, start_response): + try: + try: + return site(environ, start_response) + except: + error_detected[0] = True + raise + finally: + done_with_request.send(True) + self.site = error_detector + self.spawn_server() + connect = [ + "GET /range HTTP/1.1", + "Upgrade: WebSocket", + "Connection: Upgrade", + "Host: localhost:%s" % self.port, + "Origin: http://localhost:%s" % self.port, + "Sec-WebSocket-Protocol: ws", + "Sec-WebSocket-Key1: 4 @1 46546xW%0l 1 5", + "Sec-WebSocket-Key2: 12998 5 Y3 1 .P00", + ] + sock = eventlet.connect( + ('localhost', self.port)) + sock.sendall('\r\n'.join(connect) + '\r\n\r\n^n:ds[4U') + resp = sock.recv(1024) # get the headers + sock.sendall('\xff\x00') # "Close the connection" packet. + done_with_request.wait() + self.assert_(not error_detected[0]) + + def test_server_closing_connect_76(self): + connect = [ + "GET / HTTP/1.1", + "Upgrade: WebSocket", + "Connection: Upgrade", + "Host: localhost:%s" % self.port, + "Origin: http://localhost:%s" % self.port, + "Sec-WebSocket-Protocol: ws", + "Sec-WebSocket-Key1: 4 @1 46546xW%0l 1 5", + "Sec-WebSocket-Key2: 12998 5 Y3 1 .P00", + ] + sock = eventlet.connect( + ('localhost', self.port)) + + sock.sendall('\r\n'.join(connect) + '\r\n\r\n^n:ds[4U') + resp = sock.recv(1024) + headers, result = resp.split('\r\n\r\n') + # The remote server should have immediately closed the connection. + self.assertEqual(result[16:], '\xff\x00') + + def test_app_socket_errors_75(self): error_detected = [False] done_with_request = event.Event() site = self.site @@ -195,6 +375,38 @@ class TestWebSocket(_TestBase): done_with_request.wait() self.assert_(error_detected[0]) + def test_app_socket_errors_76(self): + error_detected = [False] + done_with_request = event.Event() + site = self.site + def error_detector(environ, start_response): + try: + try: + return site(environ, start_response) + except: + error_detected[0] = True + raise + finally: + done_with_request.send(True) + self.site = error_detector + self.spawn_server() + connect = [ + "GET /error HTTP/1.1", + "Upgrade: WebSocket", + "Connection: Upgrade", + "Host: localhost:%s" % self.port, + "Origin: http://localhost:%s" % self.port, + "Sec-WebSocket-Protocol: ws", + "Sec-WebSocket-Key1: 4 @1 46546xW%0l 1 5", + "Sec-WebSocket-Key2: 12998 5 Y3 1 .P00", + ] + sock = eventlet.connect( + ('localhost', self.port)) + sock.sendall('\r\n'.join(connect) + '\r\n\r\n^n:ds[4U') + resp = sock.recv(1024) + done_with_request.wait() + self.assert_(error_detected[0]) + class TestWebSocketObject(LimitedTestCase): From a7d8ca4be7b258082178c3903bf883119aca084c Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Thu, 10 Jun 2010 20:24:24 -0700 Subject: [PATCH 40/54] Documented the environment variables (thereby fixing #15). Improved the implementation of tpooled dns in eventlet.green.socket. --- doc/Makefile | 2 +- doc/environment.rst | 26 +++++++++++++++++++ doc/index.rst | 1 + eventlet/green/socket.py | 48 +++++++++++++++++++---------------- tests/env_test.py | 55 ++++++++++++++++++++++++++++++++++++++++ tests/patcher_test.py | 8 +++--- 6 files changed, 113 insertions(+), 27 deletions(-) create mode 100644 doc/environment.rst create mode 100644 tests/env_test.py diff --git a/doc/Makefile b/doc/Makefile index 076db3a..ae5567e 100644 --- a/doc/Makefile +++ b/doc/Makefile @@ -3,7 +3,7 @@ # You can set these variables from the command line. 
SPHINXOPTS = -SPHINXBUILD = PYTHONPATH=../:$(PYTHONPATH) sphinx-build +SPHINXBUILD = PYTHONPATH=../:$(PYTHONPATH) ~/p/bin/sphinx-build PAPER = # Internal variables. diff --git a/doc/environment.rst b/doc/environment.rst new file mode 100644 index 0000000..29387e6 --- /dev/null +++ b/doc/environment.rst @@ -0,0 +1,26 @@ +Environment Variables +====================== + +Eventlet's behavior can be controlled by a few environment variables. +These are only for the advanced user. + +EVENTLET_HUB + + Used to force Eventlet to use the specified hub instead of the + optimal one. See :ref:`understanding_hubs` for the list of + acceptable hubs and what they mean (note that picking a hub not on + the list will silently fail). Equivalent to calling + :meth:`eventlet.hubs.use_hub` at the beginning of the program. + +EVENTLET_THREADPOOL_SIZE + + The size of the threadpool in :mod:`~eventlet.tpool`. This is an + environment variable because tpool constructs its pool on first + use, so any control of the pool size needs to happen before then. + +EVENTLET_TPOOL_DNS + + If set to 'yes', uses :func:`eventlet.tpool.execute` to call + :func:`~socket.gethostbyname` and :func:`~socket.getaddrinfo`, + making them appear non-blocking. This environment variable is + ignored on OS X. diff --git a/doc/index.rst b/doc/index.rst index 581e94b..2682edc 100644 --- a/doc/index.rst +++ b/doc/index.rst @@ -31,6 +31,7 @@ Contents threading hubs testing + environment modules diff --git a/eventlet/green/socket.py b/eventlet/green/socket.py index 75fe442..866c5ea 100644 --- a/eventlet/green/socket.py +++ b/eventlet/green/socket.py @@ -12,8 +12,8 @@ os = __import__('os') import sys import warnings -__patched__ = ['fromfd', 'socketpair', 'gethostbyname', 'create_connection', - 'ssl', 'socket'] +__patched__ = ['fromfd', 'socketpair', 'gethostbyname', 'getaddrinfo', + 'create_connection', 'ssl', 'socket'] try: __original_fromfd__ = __socket.fromfd @@ -31,20 +31,11 @@ except AttributeError: pass __original_gethostbyname__ = __socket.gethostbyname -def gethostbyname(name): - can_use_tpool = os.environ.get("EVENTLET_TPOOL_GETHOSTBYNAME", - '').lower() == "yes" - if getattr(get_hub(), 'uses_twisted_reactor', None): - globals()['gethostbyname'] = _gethostbyname_twisted - elif sys.platform.startswith('darwin') or not can_use_tpool: - # the thread primitives on Darwin have some bugs that make - # it undesirable to use tpool for hostname lookups - globals()['gethostbyname'] = __original_gethostbyname__ - else: - globals()['gethostbyname'] = _gethostbyname_tpool - - return globals()['gethostbyname'](name) - +# the thread primitives on Darwin have some bugs that make +# it undesirable to use tpool for hostname lookups +_can_use_tpool = ( + os.environ.get("EVENTLET_TPOOL_DNS",'').lower() == "yes" + and not sys.platform.startswith('darwin')) def _gethostbyname_twisted(name): from twisted.internet import reactor from eventlet.twistedutil import block_on as _block_on @@ -55,12 +46,25 @@ def _gethostbyname_tpool(name): return tpool.execute( __original_gethostbyname__, name) -# def getaddrinfo(*args, **kw): -# return tpool.execute( -# __socket.getaddrinfo, *args, **kw) -# -# XXX there're few more blocking functions in socket -# XXX having a hub-independent way to access thread pool would be nice +if getattr(get_hub(), 'uses_twisted_reactor', None): + gethostbyname = _gethostbyname_twisted +elif _can_use_tpool: + gethostbyname = _gethostbyname_tpool +else: + gethostbyname = __original_gethostbyname__ + + +__original_getaddrinfo__ = 
__socket.getaddrinfo +def _getaddrinfo_tpool(*args, **kw): + from eventlet import tpool + return tpool.execute( + __original_getaddrinfo__, *args, **kw) + +if _can_use_tpool: + getaddrinfo = _getaddrinfo_tpool +else: + getaddrinfo = __original_getaddrinfo__ + def create_connection(address, timeout=_GLOBAL_DEFAULT_TIMEOUT): """Connect to *address* and return the socket object. diff --git a/tests/env_test.py b/tests/env_test.py new file mode 100644 index 0000000..fdd0b2d --- /dev/null +++ b/tests/env_test.py @@ -0,0 +1,55 @@ +import os +from tests.patcher_test import ProcessBase + +class Socket(ProcessBase): + def test_patched_thread(self): + new_mod = """from eventlet.green import socket +socket.gethostbyname('localhost') +socket.getaddrinfo('localhost', 80) +""" + os.environ['EVENTLET_TPOOL_DNS'] = 'yes' + try: + self.write_to_tempfile("newmod", new_mod) + output, lines = self.launch_subprocess('newmod.py') + self.assertEqual(len(lines), 1, lines) + finally: + del os.environ['EVENTLET_TPOOL_DNS'] + + def test_tpool_size(self): + new_mod = """from eventlet import tpool +import eventlet +import time +current = [0] +highwater = [0] +def count(): + current[0] += 1 + time.sleep(0.01) + if current[0] > highwater[0]: + highwater[0] = current[0] + current[0] -= 1 +expected = 40 +p = eventlet.GreenPool() +for i in xrange(expected): + p.spawn(tpool.execute,count) +p.waitall() +assert highwater[0] == expected, "%s != %s" % (highwater[0], expected)""" + os.environ['EVENTLET_THREADPOOL_SIZE'] = "40" + try: + self.write_to_tempfile("newmod", new_mod) + output, lines = self.launch_subprocess('newmod.py') + self.assertEqual(len(lines), 1, lines) + finally: + del os.environ['EVENTLET_THREADPOOL_SIZE'] + + def test_eventlet_hub(self): + new_mod = """from eventlet import hubs +print hubs.get_hub() +""" + os.environ['EVENTLET_HUB'] = 'selects' + try: + self.write_to_tempfile("newmod", new_mod) + output, lines = self.launch_subprocess('newmod.py') + self.assertEqual(len(lines), 2, "\n".join(lines)) + self.assert_("selects" in lines[0]) + finally: + del os.environ['EVENTLET_HUB'] diff --git a/tests/patcher_test.py b/tests/patcher_test.py index 3bc49db..500762d 100644 --- a/tests/patcher_test.py +++ b/tests/patcher_test.py @@ -27,7 +27,7 @@ import socket print "importing", patching, socket, patching.socket, patching.urllib """ -class Patcher(LimitedTestCase): +class ProcessBase(LimitedTestCase): TEST_TIMEOUT=3 # starting processes is time-consuming def setUp(self): self._saved_syspath = sys.path @@ -55,7 +55,7 @@ class Patcher(LimitedTestCase): return output, lines -class ImportPatched(Patcher): +class ImportPatched(ProcessBase): def test_patch_a_module(self): self.write_to_tempfile("base", base_module_contents) self.write_to_tempfile("patching", patching_module_contents) @@ -85,7 +85,7 @@ print "newmod", base, base.socket, base.urllib.socket.socket self.assert_('GreenSocket' in lines[1], repr(output)) -class MonkeyPatch(Patcher): +class MonkeyPatch(ProcessBase): def test_patched_modules(self): new_mod = """ from eventlet import patcher @@ -220,7 +220,7 @@ def test_monkey_patch_threading(): assert tickcount[0] > 900 """ -class Tpool(Patcher): +class Tpool(ProcessBase): TEST_TIMEOUT=3 @skip_with_pyevent From 6c8207a6b79c98500940aeaafc196739ad36de3e Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Thu, 10 Jun 2010 20:31:59 -0700 Subject: [PATCH 41/54] Neglected to change this instance. 
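[Editor's note on PATCH 40 above, before the next diff: an illustrative sketch of how the new environment variables are meant to be used. They are read when the relevant parts of eventlet are first imported or first used, so they must be set, or exported in the shell, before that point; EVENTLET_TPOOL_DNS is ignored on OS X.]

    import os
    os.environ['EVENTLET_HUB'] = 'selects'           # force the select-based hub
    os.environ['EVENTLET_THREADPOOL_SIZE'] = '40'    # tpool sizes its pool on first use
    os.environ['EVENTLET_TPOOL_DNS'] = 'yes'         # route DNS lookups through tpool

    from eventlet import hubs
    from eventlet.green import socket

    print hubs.get_hub()                     # a hubs.selects hub
    print socket.gethostbyname('localhost')  # served via tpool.execute where supported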
--- tests/patcher_psycopg_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/patcher_psycopg_test.py b/tests/patcher_psycopg_test.py index 41ca0d7..863db43 100644 --- a/tests/patcher_psycopg_test.py +++ b/tests/patcher_psycopg_test.py @@ -34,7 +34,7 @@ assert count[0] > 100, count[0] print "done" """ -class PatchingPsycopg(patcher_test.Patcher): +class PatchingPsycopg(patcher_test.ProcessBase): def test_psycopg_pached(self): if 'PSYCOPG_TEST_DSN' not in os.environ: # construct a non-json dsn for the subprocess From a57bdba2d8ceda5dddfae7259403e041b63acfde Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Thu, 10 Jun 2010 20:32:47 -0700 Subject: [PATCH 42/54] Credit --- AUTHORS | 1 + 1 file changed, 1 insertion(+) diff --git a/AUTHORS b/AUTHORS index 6b3829c..d41dc71 100644 --- a/AUTHORS +++ b/AUTHORS @@ -16,6 +16,7 @@ Contributors * Mike Barton * Patrick Carlisle * Ben Ford +* Andrew Godwin * Brantley Harris * Gregory Holt * Joe Malicki From 96a5b2dd57bb1209073c0bf95abeab2a95fde4e7 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Thu, 10 Jun 2010 21:20:51 -0700 Subject: [PATCH 43/54] Race condition fix in patcher, added a bit more sleep to the env test so slow machines have a better chance of succeeding. --- eventlet/patcher.py | 2 ++ tests/env_test.py | 6 ++++-- tests/patcher_test.py | 2 +- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/eventlet/patcher.py b/eventlet/patcher.py index 0474580..6cc6502 100644 --- a/eventlet/patcher.py +++ b/eventlet/patcher.py @@ -241,6 +241,8 @@ def monkey_patch(**on): for name, mod in modules_to_patch: orig_mod = sys.modules.get(name) + if orig_mod is None: + orig_mod = __import__(name) for attr_name in mod.__patched__: patched_attr = getattr(mod, attr_name, None) if patched_attr is not None: diff --git a/tests/env_test.py b/tests/env_test.py index fdd0b2d..d09ada9 100644 --- a/tests/env_test.py +++ b/tests/env_test.py @@ -14,7 +14,8 @@ socket.getaddrinfo('localhost', 80) self.assertEqual(len(lines), 1, lines) finally: del os.environ['EVENTLET_TPOOL_DNS'] - + +class Tpool(ProcessBase): def test_tpool_size(self): new_mod = """from eventlet import tpool import eventlet @@ -23,7 +24,7 @@ current = [0] highwater = [0] def count(): current[0] += 1 - time.sleep(0.01) + time.sleep(0.02) if current[0] > highwater[0]: highwater[0] = current[0] current[0] -= 1 @@ -41,6 +42,7 @@ assert highwater[0] == expected, "%s != %s" % (highwater[0], expected)""" finally: del os.environ['EVENTLET_THREADPOOL_SIZE'] +class Hub(ProcessBase): def test_eventlet_hub(self): new_mod = """from eventlet import hubs print hubs.get_hub() diff --git a/tests/patcher_test.py b/tests/patcher_test.py index 500762d..90c7903 100644 --- a/tests/patcher_test.py +++ b/tests/patcher_test.py @@ -264,7 +264,7 @@ import time new_mod += "\ntest_monkey_patch_threading()\n" self.write_to_tempfile("newmod", new_mod) output, lines = self.launch_subprocess('newmod.py') - self.assertEqual(len(lines), 2, lines) + self.assertEqual(len(lines), 2, "\n".join(lines)) if __name__ == '__main__': From a5519afba4a7b2c928ea87e49de971a074e66899 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Thu, 10 Jun 2010 21:34:22 -0700 Subject: [PATCH 44/54] Skip skip skip pyevent. 
--- tests/env_test.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/env_test.py b/tests/env_test.py index d09ada9..f5587eb 100644 --- a/tests/env_test.py +++ b/tests/env_test.py @@ -1,5 +1,6 @@ import os from tests.patcher_test import ProcessBase +from tests import skip_with_pyevent class Socket(ProcessBase): def test_patched_thread(self): @@ -15,7 +16,8 @@ socket.getaddrinfo('localhost', 80) finally: del os.environ['EVENTLET_TPOOL_DNS'] -class Tpool(ProcessBase): +class Tpool(ProcessBase): + @skip_with_pyevent def test_tpool_size(self): new_mod = """from eventlet import tpool import eventlet From debcde8a96d25b20d31df962df4c3ff440a16778 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Thu, 10 Jun 2010 21:55:35 -0700 Subject: [PATCH 45/54] Your machine is so slow, your mom could count up to 0.04 before it could. --- tests/env_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/env_test.py b/tests/env_test.py index f5587eb..a24cde0 100644 --- a/tests/env_test.py +++ b/tests/env_test.py @@ -26,7 +26,7 @@ current = [0] highwater = [0] def count(): current[0] += 1 - time.sleep(0.02) + time.sleep(0.04) if current[0] > highwater[0]: highwater[0] = current[0] current[0] -= 1 From de6f8a778c84145a2e9132f0d31f7717c6326eeb Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Fri, 11 Jun 2010 09:14:41 +0100 Subject: [PATCH 46/54] Improve test coverage for websocket up to 100%, and fix a bug that doing so caught. We love you, coverage. --- eventlet/websocket.py | 15 +++++++----- tests/websocket_test.py | 54 ++++++++++++++++++++++++++++++++++++++++- 2 files changed, 62 insertions(+), 7 deletions(-) diff --git a/eventlet/websocket.py b/eventlet/websocket.py index d6b6450..fd35b75 100644 --- a/eventlet/websocket.py +++ b/eventlet/websocket.py @@ -6,7 +6,7 @@ from socket import error as SocketError try: from hashlib import md5 -except ImportError: +except ImportError: #pragma NO COVER from md5 import md5 import eventlet @@ -93,8 +93,8 @@ class WebSocketWSGI(object): environ.get('HTTP_HOST'), environ.get('PATH_INFO'), response)) - else: - raise ValueError("Unknown WebSocket protocol version.") + else: #pragma NO COVER + raise ValueError("Unknown WebSocket protocol version.") sock.sendall(handshake_reply) try: @@ -218,9 +218,12 @@ class WebSocket(object): """Waits for and deserializes messages. Returns a single message; the oldest not yet processed.""" while not self._msgs: - # no parsed messages, must mean buf needs more data (or it's closed) + # Websocket might be closed already. + if self.websocket_closed: + return None + # no parsed messages, must mean buf needs more data delta = self.socket.recv(8096) - if delta == '' or self.websocket_closed: + if delta == '': return None self._buf += delta msgs = self._parse_messages() @@ -235,7 +238,7 @@ class WebSocket(object): except SocketError: # Sometimes, like when the remote side cuts off the connection, # we don't care about this. 
- if not ignore_send_errors: + if not ignore_send_errors: #pragma NO COVER raise self.websocket_closed = True diff --git a/tests/websocket_test.py b/tests/websocket_test.py index 19624d8..2e0fa25 100644 --- a/tests/websocket_test.py +++ b/tests/websocket_test.py @@ -64,6 +64,7 @@ class TestWebSocket(_TestBase): self.assertEqual(resp.read(), '') def test_incomplete_headers_76(self): + # First test: Missing Connection: headers = dict(kv.split(': ') for kv in [ "Upgrade: WebSocket", # NOTE: intentionally no connection header @@ -78,6 +79,24 @@ class TestWebSocket(_TestBase): self.assertEqual(resp.status, 400) self.assertEqual(resp.getheader('connection'), 'close') self.assertEqual(resp.read(), '') + + # Now, miss off key2 + headers = dict(kv.split(': ') for kv in [ + "Upgrade: WebSocket", + "Connection: Upgrade", + "Host: localhost:%s" % self.port, + "Origin: http://localhost:%s" % self.port, + "Sec-WebSocket-Protocol: ws", + "Sec-WebSocket-Key1: 4 @1 46546xW%0l 1 5", + # NOTE: Intentionally no Key2 header + ]) + http = httplib.HTTPConnection('localhost', self.port) + http.request("GET", "/echo", headers=headers) + resp = http.getresponse() + + self.assertEqual(resp.status, 400) + self.assertEqual(resp.getheader('connection'), 'close') + self.assertEqual(resp.read(), '') def test_correct_upgrade_request_75(self): connect = [ @@ -308,7 +327,7 @@ class TestWebSocket(_TestBase): self.site = error_detector self.spawn_server() connect = [ - "GET /range HTTP/1.1", + "GET /echo HTTP/1.1", "Upgrade: WebSocket", "Connection: Upgrade", "Host: localhost:%s" % self.port, @@ -325,6 +344,39 @@ class TestWebSocket(_TestBase): done_with_request.wait() self.assert_(not error_detected[0]) + def test_client_invalid_packet_76(self): + error_detected = [False] + done_with_request = event.Event() + site = self.site + def error_detector(environ, start_response): + try: + try: + return site(environ, start_response) + except: + error_detected[0] = True + raise + finally: + done_with_request.send(True) + self.site = error_detector + self.spawn_server() + connect = [ + "GET /echo HTTP/1.1", + "Upgrade: WebSocket", + "Connection: Upgrade", + "Host: localhost:%s" % self.port, + "Origin: http://localhost:%s" % self.port, + "Sec-WebSocket-Protocol: ws", + "Sec-WebSocket-Key1: 4 @1 46546xW%0l 1 5", + "Sec-WebSocket-Key2: 12998 5 Y3 1 .P00", + ] + sock = eventlet.connect( + ('localhost', self.port)) + sock.sendall('\r\n'.join(connect) + '\r\n\r\n^n:ds[4U') + resp = sock.recv(1024) # get the headers + sock.sendall('\xef\x00') # Weird packet. + done_with_request.wait() + self.assert_(error_detected[0]) + def test_server_closing_connect_76(self): connect = [ "GET / HTTP/1.1", From d1ad7f9fc26b81d1356e1f71a9b3964dd092758b Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Fri, 11 Jun 2010 08:19:55 -0700 Subject: [PATCH 47/54] The pernicious influence of virtualenv. --- doc/Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/Makefile b/doc/Makefile index ae5567e..076db3a 100644 --- a/doc/Makefile +++ b/doc/Makefile @@ -3,7 +3,7 @@ # You can set these variables from the command line. SPHINXOPTS = -SPHINXBUILD = PYTHONPATH=../:$(PYTHONPATH) ~/p/bin/sphinx-build +SPHINXBUILD = PYTHONPATH=../:$(PYTHONPATH) sphinx-build PAPER = # Internal variables. From 392682b8fa2fd2bdd9c5bd5b6ec84cae5ef5526f Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Fri, 11 Jun 2010 09:10:44 -0700 Subject: [PATCH 48/54] Kill the tpool threads so that they don't whine on interpreter exit. 
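[Editor's note: the idiom the diff below adds to the subprocess test scripts, shown standalone. Once tpool has been used, its worker threads should be joined explicitly so the interpreter can exit without complaints.]

    from eventlet import tpool

    # the first call lazily spawns the worker threads and the request queue
    print "len is", tpool.execute(len, "hi")
    # join the workers before the interpreter starts tearing modules down
    tpool.killall()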
--- tests/patcher_test.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/patcher_test.py b/tests/patcher_test.py index 90c7903..f409e86 100644 --- a/tests/patcher_test.py +++ b/tests/patcher_test.py @@ -218,6 +218,7 @@ def test_monkey_patch_threading(): w1.wait() print tickcount[0] assert tickcount[0] > 900 + tpool.killall() """ class Tpool(ProcessBase): @@ -232,6 +233,7 @@ patcher.monkey_patch() from eventlet import tpool print "newmod", tpool.execute(len, "hi") print "newmod", tpool.execute(len, "hi2") +tpool.killall() """ self.write_to_tempfile("newmod", new_mod) output, lines = self.launch_subprocess('newmod.py') From bb07aed654bbbb159c2eaa89653922432ec0c662 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Mon, 14 Jun 2010 09:35:59 -0700 Subject: [PATCH 49/54] Bumping version number --- eventlet/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eventlet/__init__.py b/eventlet/__init__.py index 6b87a32..263e8d8 100644 --- a/eventlet/__init__.py +++ b/eventlet/__init__.py @@ -1,4 +1,4 @@ -version_info = (0, 9, 9) +version_info = (0, 9, 10, "dev1") __version__ = ".".join(map(str, version_info)) try: From bf3944b0e19185cc38ab76e14da4bb89868c82be Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Sat, 19 Jun 2010 18:42:30 -0700 Subject: [PATCH 50/54] Tweaked the implementation of Timeout so that passing True as the exception class is the same as passing None, because it seemed inconsistent that one boolean value would have special nice behavior and the other would cause weird exceptions. --- eventlet/timeout.py | 4 ++-- examples/producer_consumer.py | 6 +++--- tests/timeout_test_with_statement.py | 24 ++++++++++++++++++++++-- 3 files changed, 27 insertions(+), 7 deletions(-) diff --git a/eventlet/timeout.py b/eventlet/timeout.py index 135545b..573f4ab 100644 --- a/eventlet/timeout.py +++ b/eventlet/timeout.py @@ -57,7 +57,7 @@ class Timeout(BaseException): '%r is already started; to restart it, cancel it first' % self if self.seconds is None: # "fake" timeout (never expires) self.timer = None - elif self.exception is None or self.exception is False: # timeout that raises self + elif self.exception is None or isinstance(self.exception, bool): # timeout that raises self self.timer = get_hub().schedule_call_global( self.seconds, greenlet.getcurrent().throw, self) else: # regular timeout with user-provided exception @@ -112,7 +112,7 @@ class Timeout(BaseException): suffix = '' else: suffix = 's' - if self.exception is None: + if self.exception is None or self.exception is True: return '%s second%s' % (self.seconds, suffix) elif self.exception is False: return '%s second%s (silent)' % (self.seconds, suffix) diff --git a/examples/producer_consumer.py b/examples/producer_consumer.py index b335f7d..6f6c82f 100644 --- a/examples/producer_consumer.py +++ b/examples/producer_consumer.py @@ -2,7 +2,7 @@ it doesn't respect robots.txt and it is pretty brutal about how quickly it fetches pages. -This is a kind of "producer/consumer" example; the producer function produces +This is a kind of "producer/consumer" example; the fetch function produces jobs, and the GreenPool itself is the consumer, farming out work concurrently. 
It's easier to write it this way rather than writing a standard consumer loop; GreenPool handles any exceptions raised and arranges so that there's a set @@ -43,10 +43,10 @@ def producer(start_url): # limit requests to eventlet.net so we don't crash all over the internet if url not in seen and 'eventlet.net' in url: seen.add(url) - pool.spawn(fetch, url, q) + pool.spawn_n(fetch, url, q) return seen seen = producer("http://eventlet.net") print "I saw these urls:" -print "\n".join(seen) \ No newline at end of file +print "\n".join(seen) diff --git a/tests/timeout_test_with_statement.py b/tests/timeout_test_with_statement.py index 48fe0b5..21779ce 100644 --- a/tests/timeout_test_with_statement.py +++ b/tests/timeout_test_with_statement.py @@ -15,7 +15,7 @@ class Error(Exception): pass class Test(LimitedTestCase): - def test_api(self): + def test_cancellation(self): # Nothing happens if with-block finishes before the timeout expires t = Timeout(DELAY*2) sleep(0) # make it pending @@ -27,6 +27,7 @@ class Test(LimitedTestCase): assert not t.pending, repr(t) sleep(DELAY*2) + def test_raising_self(self): # An exception will be raised if it's not try: with Timeout(DELAY) as t: @@ -36,6 +37,17 @@ class Test(LimitedTestCase): else: raise AssertionError('must raise Timeout') + def test_raising_self_true(self): + # specifying True as the exception raises self as well + try: + with Timeout(DELAY, True) as t: + sleep(DELAY*2) + except Timeout, ex: + assert ex is t, (ex, t) + else: + raise AssertionError('must raise Timeout') + + def test_raising_custom_exception(self): # You can customize the exception raised: try: with Timeout(DELAY, IOError("Operation takes way too long")): @@ -43,6 +55,7 @@ class Test(LimitedTestCase): except IOError, ex: assert str(ex)=="Operation takes way too long", repr(ex) + def test_raising_exception_class(self): # Providing classes instead of values should be possible too: try: with Timeout(DELAY, ValueError): @@ -50,6 +63,7 @@ class Test(LimitedTestCase): except ValueError: pass + def test_raising_exc_tuple(self): try: 1//0 except: @@ -63,12 +77,16 @@ class Test(LimitedTestCase): else: raise AssertionError('should not get there') + def test_cancel_timer_inside_block(self): # It's possible to cancel the timer inside the block: with Timeout(DELAY) as timer: timer.cancel() sleep(DELAY*2) - # To silent the exception before exiting the block, pass False as second parameter. + + def test_silent_block(self): + # To silence the exception before exiting the block, pass + # False as second parameter. XDELAY=0.1 start = time.time() with Timeout(XDELAY, False): @@ -76,6 +94,8 @@ class Test(LimitedTestCase): delta = (time.time()-start) assert delta Date: Mon, 21 Jun 2010 17:01:13 -0700 Subject: [PATCH 51/54] tpool.Proxy now wraps functions. Improved impolementations of equality and iteration in tpool as well. 
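[Editor's note: a short sketch of the two calling modes the diff below adds. The autowrap_names argument controls whether the return value of the call is itself wrapped in a Proxy; the function name work is the editor's.]

    from eventlet import tpool

    def work(seq):
        # stands in for something that genuinely blocks a native thread
        return len(seq)

    plain = tpool.Proxy(work)
    assert plain("hello") == 5       # the call runs via tpool.execute, result unwrapped

    wrapping = tpool.Proxy(work, autowrap_names=('__call__',))
    result = wrapping("hello")       # the result is itself a Proxy around the int 5
    assert isinstance(result, tpool.Proxy) and str(result) == "5"

    tpool.killall()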
--- eventlet/tpool.py | 12 +++++++++--- tests/tpool_test.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 3 deletions(-) diff --git a/eventlet/tpool.py b/eventlet/tpool.py index ed6cab9..551cd4f 100644 --- a/eventlet/tpool.py +++ b/eventlet/tpool.py @@ -196,11 +196,16 @@ class Proxy(object): return proxy_call(self._autowrap, self._obj.__deepcopy__, memo) def __copy__(self, memo=None): return proxy_call(self._autowrap, self._obj.__copy__, memo) + def __call__(self, *a, **kw): + if '__call__' in self._autowrap_names: + return Proxy(proxy_call(self._autowrap, self._obj, *a, **kw)) + else: + return proxy_call(self._autowrap, self._obj, *a, **kw) # these don't go through a proxy call, because they're likely to # be called often, and are unlikely to be implemented on the # wrapped object in such a way that they would block def __eq__(self, rhs): - return self._obj.__eq__(rhs) + return self._obj == rhs def __hash__(self): return self._obj.__hash__() def __repr__(self): @@ -212,10 +217,11 @@ class Proxy(object): def __nonzero__(self): return bool(self._obj) def __iter__(self): - if iter(self._obj) == self._obj: + it = iter(self._obj) + if it == self._obj: return self else: - return Proxy(iter(self._obj)) + return Proxy(it) def next(self): return proxy_call(self._autowrap, self._obj.next) diff --git a/tests/tpool_test.py b/tests/tpool_test.py index 93dee41..9635e6d 100644 --- a/tests/tpool_test.py +++ b/tests/tpool_test.py @@ -95,6 +95,11 @@ class TestTpool(LimitedTestCase): exp3 = prox.compile('/') self.assert_(exp1 != exp3) + @skip_with_pyevent + def test_wrap_ints(self): + p = tpool.Proxy(4) + self.assert_(p == 4) + @skip_with_pyevent def test_wrap_hash(self): prox1 = tpool.Proxy(''+'A') @@ -243,6 +248,28 @@ class TestTpool(LimitedTestCase): # violating the abstraction to check that we didn't double-wrap self.assert_(not isinstance(x._obj, tpool.Proxy)) + @skip_with_pyevent + def test_callable(self): + def wrapped(arg): + return arg + x = tpool.Proxy(wrapped) + self.assertEquals(4, x(4)) + # verify that it wraps return values if specified + x = tpool.Proxy(wrapped, autowrap_names=('__call__',)) + self.assert_(isinstance(x(4), tpool.Proxy)) + self.assertEquals("4", str(x(4))) + + @skip_with_pyevent + def test_callable_iterator(self): + def wrapped(arg): + yield arg + yield arg + yield arg + + x = tpool.Proxy(wrapped, autowrap_names=('__call__',)) + for r in x(3): + self.assertEquals(3, r) + class TpoolLongTests(LimitedTestCase): TEST_TIMEOUT=60 @skip_with_pyevent @@ -294,5 +321,6 @@ from eventlet.tpool import execute iterations, tpool_overhead, best_normal, best_tpool) tpool.killall() + if __name__ == '__main__': main() From 38b96d167dbf4c78ab95dacfb5c0bc2dee9f8692 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Mon, 21 Jun 2010 17:22:32 -0700 Subject: [PATCH 52/54] Upgrading wsgi to also ignore tpooled AlreadyHandled results (because isinstance delibarately doesn't work on Proxy objects). 
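[Editor's note: the shape of the check the diff below adds, shown standalone. A tpool.Proxy keeps the wrapped object on its _obj attribute and its own class is always Proxy, so isinstance() has to look through the proxy by hand. The AlreadyHandled class and helper name here are the editor's stand-ins, not eventlet's.]

    from eventlet import tpool

    class AlreadyHandled(object):
        """Stand-in for wsgi's already-handled sentinel iterable."""
        def __iter__(self):
            return iter(())

    def is_already_handled(result):
        # a direct instance, or a tpool.Proxy wrapped around one
        return (isinstance(result, AlreadyHandled)
                or isinstance(getattr(result, '_obj', None), AlreadyHandled))

    sentinel = AlreadyHandled()
    assert is_already_handled(sentinel)
    assert is_already_handled(tpool.Proxy(sentinel))
    assert not is_already_handled(["a", "regular", "response"])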
--- eventlet/wsgi.py | 3 ++- tests/wsgi_test.py | 19 ++++++++++++++----- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/eventlet/wsgi.py b/eventlet/wsgi.py index 4b6a93d..36ceee7 100644 --- a/eventlet/wsgi.py +++ b/eventlet/wsgi.py @@ -333,7 +333,8 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler): try: try: result = self.application(self.environ, start_response) - if isinstance(result, _AlreadyHandled): + if (isinstance(result, _AlreadyHandled) + or isinstance(getattr(result, '_obj', None), _AlreadyHandled)): self.close_connection = 1 return if not headers_sent and hasattr(result, '__len__') and \ diff --git a/tests/wsgi_test.py b/tests/wsgi_test.py index e9281ec..8de1bf5 100644 --- a/tests/wsgi_test.py +++ b/tests/wsgi_test.py @@ -93,13 +93,13 @@ class IterableApp(object): return self.return_val class IterableSite(Site): - def __call__(self, env, start_response): - iter = self.application(env, start_response) - for i in iter: + it = self.application(env, start_response) + for i in it: yield i + CONTENT_LENGTH = 'content-length' @@ -899,12 +899,14 @@ def read_headers(sock): return response_line, headers class IterableAlreadyHandledTest(_TestBase): - def set_site(self): self.site = IterableSite() + def get_app(self): + return IterableApp(True) + def test_iterable_app_keeps_socket_open_unless_connection_close_sent(self): - self.site.application = IterableApp(True) + self.site.application = self.get_app() sock = eventlet.connect( ('localhost', self.port)) @@ -922,6 +924,13 @@ class IterableAlreadyHandledTest(_TestBase): self.assertEqual(headers.get('transfer-encoding'), 'chunked') self.assertEqual(body, '0\r\n\r\n') # Still coming back chunked +class ProxiedIterableAlreadyHandledTest(IterableAlreadyHandledTest): + # same thing as the previous test but ensuring that it works with tpooled + # results as well as regular ones + def get_app(self): + from eventlet import tpool + return tpool.Proxy(super(ProxiedIterableAlreadyHandledTest, self).get_app()) + class TestChunkedInput(_TestBase): dirt = "" validator = None From 0512896657acdf08a9854053f713d9194018057a Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Mon, 21 Jun 2010 21:03:40 -0700 Subject: [PATCH 53/54] Skip with pyevent. --- tests/wsgi_test.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/wsgi_test.py b/tests/wsgi_test.py index 8de1bf5..beb066e 100644 --- a/tests/wsgi_test.py +++ b/tests/wsgi_test.py @@ -5,7 +5,7 @@ import errno import os import socket import sys -from tests import skipped, LimitedTestCase +from tests import skipped, LimitedTestCase, skip_with_pyevent from unittest import main from eventlet import greenio @@ -927,6 +927,7 @@ class IterableAlreadyHandledTest(_TestBase): class ProxiedIterableAlreadyHandledTest(IterableAlreadyHandledTest): # same thing as the previous test but ensuring that it works with tpooled # results as well as regular ones + @skip_with_pyevent def get_app(self): from eventlet import tpool return tpool.Proxy(super(ProxiedIterableAlreadyHandledTest, self).get_app()) From 7befa471d5ad5d8d7985a2489723848e7e35898d Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Mon, 21 Jun 2010 21:44:44 -0700 Subject: [PATCH 54/54] Added cleaner recursive web crawler example. 
--- doc/examples.rst | 14 ++++++++-- examples/recursive_crawler.py | 49 +++++++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 2 deletions(-) create mode 100644 examples/recursive_crawler.py diff --git a/doc/examples.rst b/doc/examples.rst index 2f7b817..c16bf26 100644 --- a/doc/examples.rst +++ b/doc/examples.rst @@ -64,13 +64,23 @@ Port Forwarder .. literalinclude:: ../examples/forwarder.py +.. _recursive_crawler_example: + +Recursive Web Crawler +----------------------------------------- +``examples/recursive_crawler.py`` + +This is an example recursive web crawler that fetches linked pages from a seed url. + +.. literalinclude:: ../examples/recursive_crawler.py + .. _producer_consumer_example: -Producer Consumer/Recursive Web Crawler +Producer Consumer Web Crawler ----------------------------------------- ``examples/producer_consumer.py`` -This is an example implementation of the producer/consumer pattern as well as a functional recursive web crawler. +This is an example implementation of the producer/consumer pattern as well as being identical in functionality to the recursive web crawler. .. literalinclude:: ../examples/producer_consumer.py diff --git a/examples/recursive_crawler.py b/examples/recursive_crawler.py new file mode 100644 index 0000000..2e8701e --- /dev/null +++ b/examples/recursive_crawler.py @@ -0,0 +1,49 @@ +"""This is a recursive web crawler. Don't go pointing this at random sites; +it doesn't respect robots.txt and it is pretty brutal about how quickly it +fetches pages. + +The code for this is very short; this is perhaps a good indication +that this is making the most effective use of the primitves at hand. +The fetch function does all the work of making http requests, +searching for new urls, and dispatching new fetches. The GreenPool +acts as sort of a job coordinator (and concurrency controller of +course). +""" +from __future__ import with_statement + +from eventlet.green import urllib2 +import eventlet +import re + +# http://daringfireball.net/2009/11/liberal_regex_for_matching_urls +url_regex = re.compile(r'\b(([\w-]+://?|www[.])[^\s()<>]+(?:\([\w\d]+\)|([^[:punct:]\s]|/)))') + + +def fetch(url, seen, pool): + """Fetch a url, stick any found urls into the seen set, and + dispatch any new ones to the pool.""" + print "fetching", url + data = '' + with eventlet.Timeout(5, False): + data = urllib2.urlopen(url).read() + for url_match in url_regex.finditer(data): + new_url = url_match.group(0) + # only send requests to eventlet.net so as not to destroy the internet + if new_url not in seen and 'eventlet.net' in new_url: + seen.add(new_url) + # while this seems stack-recursive, it's actually not: + # spawned greenthreads start their own stacks + pool.spawn_n(fetch, new_url, seen, pool) + +def crawl(start_url): + """Recursively crawl starting from *start_url*. Returns a set of + urls that were found.""" + pool = eventlet.GreenPool() + seen = set() + fetch(start_url, seen, pool) + pool.waitall() + return seen + +seen = crawl("http://eventlet.net") +print "I saw these urls:" +print "\n".join(seen)
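[Editor's note, closing out the series: a self-contained sketch of the GreenPool idiom both crawler examples lean on. spawn_n() queues work without keeping a handle on the result, workers may spawn follow-up jobs into the same pool, and waitall() returns only once everything, including work queued from inside workers, has finished.]

    import eventlet

    def worker(n, pool, results):
        results.append(n * n)
        if n < 3:
            # workers can enqueue more jobs; waitall() still accounts for them
            pool.spawn_n(worker, n + 1, pool, results)

    pool = eventlet.GreenPool()
    results = []
    pool.spawn_n(worker, 1, pool, results)
    pool.waitall()
    assert sorted(results) == [1, 4, 9]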