diff --git a/.hgignore b/.hgignore index 1c118cd..5ece4c6 100644 --- a/.hgignore +++ b/.hgignore @@ -6,3 +6,7 @@ dist eventlet.egg-info build htmlreports +*.esproj +.DS_Store +results.*.db + diff --git a/eventlet/api.py b/eventlet/api.py index 2bbd09c..4ae98fd 100644 --- a/eventlet/api.py +++ b/eventlet/api.py @@ -30,7 +30,7 @@ import linecache import inspect import traceback -from eventlet.support import greenlet +from eventlet.support import greenlets as greenlet from eventlet import tls __all__ = [ @@ -40,6 +40,12 @@ __all__ = [ 'unspew', 'use_hub', 'with_timeout', 'timeout'] +def switch(coro, result=None, exc=None): + if exc is not None: + return coro.throw(exc) + return coro.switch(result) + + class TimeoutError(Exception): """Exception raised if an asynchronous operation times out""" pass diff --git a/eventlet/channel.py b/eventlet/channel.py new file mode 100644 index 0000000..963088f --- /dev/null +++ b/eventlet/channel.py @@ -0,0 +1,102 @@ +"""\ +@file channel.py +@author Bob Ippolito + +Copyright (c) 2005-2006, Bob Ippolito +Copyright (c) 2007, Linden Research, Inc. +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +""" + +import collections + +from eventlet import api +from eventlet.support import greenlets as greenlet + +__all__ = ['channel'] + +class channel(object): + """A channel is a control flow primitive for co-routines. It is a + "thread-like" queue for controlling flow between two (or more) co-routines. + The state model is: + + * If one co-routine calls send(), it is unscheduled until another + co-routine calls receive(). + * If one co-rounte calls receive(), it is unscheduled until another + co-routine calls send(). + * Once a paired send()/receive() have been called, both co-routeines + are rescheduled. + + This is similar to: http://stackless.com/wiki/Channels + """ + balance = 0 + + def _tasklet_loop(self): + deque = self.deque = collections.deque() + hub = api.get_hub() + current = greenlet.getcurrent() + def switch(g, value=None, exc=None): + if exc is None: + return g.switch(value) + else: + return g.throw(exc) + direction, caller, args = switch(current.parent or current) + try: + while True: + if direction == -1: + # waiting to receive + if self.balance > 0: + sender, args = deque.popleft() + hub.schedule_call(0, switch, sender) + hub.schedule_call(0, switch, caller, *args) + else: + deque.append(caller) + else: + # waiting to send + if self.balance < 0: + receiver = deque.popleft() + hub.schedule_call(0, switch, receiver, *args) + hub.schedule_call(0, switch, caller) + else: + deque.append((caller, args)) + self.balance += direction + direction, caller, args = hub.switch() + finally: + deque.clear() + del self.deque + self.balance = 0 + + def _send_tasklet(self, *args): + try: + t = self._tasklet + except AttributeError: + t = self._tasklet = greenlet.greenlet(self._tasklet_loop) + t.switch() + if args: + return t.switch((1, greenlet.getcurrent(), args)) + else: + return t.switch((-1, greenlet.getcurrent(), args)) + + def receive(self): + return self._send_tasklet() + + def send(self, value): + return self._send_tasklet(value) + + def send_exception(self, exc): + return self._send_tasklet(None, exc) diff --git a/eventlet/coros.py b/eventlet/coros.py index b15f8fd..a8ba5b0 100644 --- a/eventlet/coros.py +++ b/eventlet/coros.py @@ -558,6 +558,7 @@ class Semaphore(object): if self.counter<=0: self._waiters[api.getcurrent()] = None try: + print "hub switch" api.get_hub().switch() finally: self._waiters.pop(api.getcurrent(), None) @@ -618,6 +619,7 @@ class BoundedSemaphore(object): api.get_hub().schedule_call(0, self._do_unlock) self._acquire_waiters[api.getcurrent()] = None try: + print "HUB switch" api.get_hub().switch() finally: self._acquire_waiters.pop(api.getcurrent(), None) diff --git a/eventlet/green/thread.py b/eventlet/green/thread.py index cd7176a..6ec7d03 100644 --- a/eventlet/green/thread.py +++ b/eventlet/green/thread.py @@ -1,7 +1,7 @@ """implements standard module 'thread' with greenlets""" from __future__ import absolute_import import thread as thread_module -from eventlet.support import greenlet +from eventlet.support import greenlets as greenlet from eventlet.api import spawn from eventlet.coros import semaphore as LockType diff --git a/eventlet/greenlib.py b/eventlet/greenlib.py index e293112..326521f 100644 --- a/eventlet/greenlib.py +++ b/eventlet/greenlib.py @@ -25,7 +25,7 @@ THE SOFTWARE. import sys import itertools -from eventlet.support import greenlet +from eventlet.support import greenlets as greenlet from eventlet import tls diff --git a/eventlet/hubs/hub.py b/eventlet/hubs/hub.py index d287554..f900b51 100644 --- a/eventlet/hubs/hub.py +++ b/eventlet/hubs/hub.py @@ -29,7 +29,7 @@ import errno import traceback import time -from eventlet.support import greenlet +from eventlet.support import greenlets as greenlet from eventlet.timer import Timer _g_debug = True diff --git a/eventlet/hubs/libev.py b/eventlet/hubs/libev.py index d839ecd..02a8b09 100644 --- a/eventlet/hubs/libev.py +++ b/eventlet/hubs/libev.py @@ -32,7 +32,7 @@ import time from eventlet.timer import Timer from eventlet.hubs import hub -from eventlet.support import greenlet +from eventlet.support import greenlets as greenlet # XXX for debugging only #raise ImportError() diff --git a/eventlet/hubs/libevent.py b/eventlet/hubs/libevent.py index 1cf5b23..fab7d03 100644 --- a/eventlet/hubs/libevent.py +++ b/eventlet/hubs/libevent.py @@ -33,7 +33,7 @@ from eventlet import greenlib from eventlet.timer import Timer from eventlet.hubs import hub -from eventlet.support import greenlet +from eventlet.support import greenlets as greenlet # XXX for debugging only #raise ImportError() diff --git a/eventlet/hubs/selects.py b/eventlet/hubs/selects.py index f8e2297..06e2b36 100644 --- a/eventlet/hubs/selects.py +++ b/eventlet/hubs/selects.py @@ -29,7 +29,7 @@ import time from eventlet.hubs import hub -from eventlet.support import greenlet +from eventlet.support import greenlets as greenlet class Hub(hub.BaseHub): def wait(self, seconds=None): diff --git a/eventlet/pools.py b/eventlet/pools.py index a05aae7..b226206 100644 --- a/eventlet/pools.py +++ b/eventlet/pools.py @@ -29,17 +29,6 @@ import socket from eventlet import api from eventlet import channel -class FanFailed(RuntimeError): - pass - - -class SomeFailed(FanFailed): - pass - - -class AllFailed(FanFailed): - pass - class Pool(object): """ @@ -106,44 +95,6 @@ class Pool(object): """ raise NotImplementedError("Implement in subclass") - def fan(self, block, input_list): - chan = channel.channel() - results = [] - exceptional_results = 0 - for index, input_item in enumerate(input_list): - pool_item = self.get() - - ## Fan out - api.spawn( - self._invoke, block, pool_item, input_item, index, chan) - - ## Fan back in - for i in range(len(input_list)): - ## Wait for all guys to send to the queue - index, value = chan.receive() - if isinstance(value, Exception): - exceptional_results += 1 - results.append((index, value)) - - results.sort() - results = [value for index, value in results] - - if exceptional_results: - if exceptional_results == len(results): - raise AllFailed(results) - raise SomeFailed(results) - return results - - def _invoke(self, block, pool_item, input_item, index, chan): - try: - result = block(pool_item, input_item) - except Exception, e: - self.put(pool_item) - chan.send((index, e)) - return - self.put(pool_item) - chan.send((index, result)) - class Token(object): pass diff --git a/eventlet/support/greenlets.py b/eventlet/support/greenlets.py new file mode 100644 index 0000000..0423542 --- /dev/null +++ b/eventlet/support/greenlets.py @@ -0,0 +1,22 @@ + +try: + import greenlet + getcurrent = greenlet.getcurrent + GreenletExit = greenlet.GreenletExit + greenlet = greenlet.greenlet +except ImportError, e: + print e + try: + from py.magic import greenlet + getcurrent = greenlet.getcurrent + GreenletExit = greenlet.GreenletExit + except ImportError: + try: + from stackless import greenlet + getcurrent = greenlet.getcurrent + GreenletExit = greenlet.GreenletExit + except ImportError: + try: + from support.stacklesss import greenlet, getcurrent, GreenletExit + except ImportError, e: + raise ImportError("Unable to find an implementation of greenlet.") diff --git a/eventlet/twistedutil/join_reactor.py b/eventlet/twistedutil/join_reactor.py index e658fa8..f1f9470 100644 --- a/eventlet/twistedutil/join_reactor.py +++ b/eventlet/twistedutil/join_reactor.py @@ -5,7 +5,7 @@ yourself. """ from eventlet.hubs.twistedr import BaseTwistedHub from eventlet.api import use_hub, _threadlocal -from eventlet.support import greenlet +from eventlet.support import greenlets as greenlet use_hub(BaseTwistedHub) assert not hasattr(_threadlocal, 'hub') diff --git a/eventlet/util.py b/eventlet/util.py index 9583e25..ed5be69 100644 --- a/eventlet/util.py +++ b/eventlet/util.py @@ -48,7 +48,7 @@ except ImportError: def g_log(*args): import sys - from eventlet.support import greenlet + from eventlet.support import greenlets as greenlet from eventlet.greenlib import greenlet_id g_id = greenlet_id() if g_id is None: diff --git a/greentest/coros_test.py b/greentest/coros_test.py index bf509d9..5f10cea 100644 --- a/greentest/coros_test.py +++ b/greentest/coros_test.py @@ -213,6 +213,7 @@ class TestCoroutinePool(tests.TestCase): for x in range(6): pool.execute(lambda n: n, x) for y in range(6): + print "wait", y pool.wait() def test_track_slow_event(self): diff --git a/greentest/httpc_test.py b/greentest/httpc_test.py index c43b3fa..fd88a74 100644 --- a/greentest/httpc_test.py +++ b/greentest/httpc_test.py @@ -22,11 +22,14 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ +import cgi + from eventlet import api from eventlet import httpc -from eventlet import httpd from eventlet import processes from eventlet import util +from eventlet import wsgi + import time try: from cStringIO import StringIO @@ -41,64 +44,74 @@ class Site(object): def __init__(self): self.stuff = {'hello': 'hello world'} - def adapt(self, obj, req): - req.write(str(obj)) + def __call__(self, env, start_response): + return getattr(self, 'handle_%s' % env['REQUEST_METHOD'].lower())(env, start_response) + + +def _get_query_pairs(env): + parsed = cgi.parse_qs(env['QUERY_STRING']) + for key, values in parsed.items(): + for val in values: + yield key, val + + +def get_query_pairs(env): + return list(_get_query_pairs(env)) + - def handle_request(self, req): - return getattr(self, 'handle_%s' % req.method().lower())(req) class BasicSite(Site): - def handle_get(self, req): - req.set_header('x-get', 'hello') + def handle_get(self, env, start_response): + headers = [('x-get', 'hello'), ('Content-type', 'text/plain')] resp = StringIO() - path = req.path().lstrip('/') + path = env['PATH_INFO'].lstrip('/') try: resp.write(self.stuff[path]) except KeyError: - req.response(404, body='Not found') - return - for k,v in req.get_query_pairs(): + start_response("404 Not Found", headers) + return ["Not Found"] + for k,v in get_query_pairs(env): resp.write(k + '=' + v + '\n') - req.write(resp.getvalue()) + start_response("200 OK", headers) + return [resp.getvalue()] - def handle_head(self, req): - req.set_header('x-head', 'hello') - path = req.path().lstrip('/') - try: - req.write('') - except KeyError: - req.response(404, body='Not found') + def handle_head(self, env, start_response): + headers = [('x-head', 'hello'), ('Content-type', 'text/plain')] + start_response("200 OK", headers) + return [""] - def handle_put(self, req): - req.set_header('x-put', 'hello') - path = req.path().lstrip('/') + def handle_put(self, env, start_response): + headers = [('x-put', 'hello'), ('Content-type', 'text/plain')] + path = env['PATH_INFO'].lstrip('/') if not path: - req.response(400, body='') - return - if path in self.stuff: - req.response(204) - else: - req.response(201) - self.stuff[path] = req.read_body() - req.write('') + start_response("400 Bad Request", headers) + return [""] - def handle_delete(self, req): - req.set_header('x-delete', 'hello') - path = req.path().lstrip('/') + if path in self.stuff: + start_response("204 No Content", headers) + else: + start_response("201 Created", headers) + self.stuff[path] = env['wsgi.input'].read(int(env.get('CONTENT_LENGTH', '0'))) + return [""] + + def handle_delete(self, env, start_response): + headers = [('x-delete', 'hello'), ('Content-type', 'text/plain')] + path = env['PATH_INFO'].lstrip('/') if not path: - req.response(400, body='') - return + start_response("400 Bad Request", headers) + return [""] try: del self.stuff[path] - req.response(204) + start_response("204 No Content", headers) except KeyError: - req.response(404) - req.write('') + start_response("404 Not Found", headers) + return [""] - def handle_post(self, req): - req.set_header('x-post', 'hello') - req.write(req.read_body()) + def handle_post(self, env, start_response): + headers = [('x-post', 'hello'), ('Content-type', 'text/plain')] + start_response("200 OK", headers) + return [env['wsgi.input'].read(int(env.get('CONTENT_LENGTH', '0')))] class TestBase(object): @@ -109,7 +122,7 @@ class TestBase(object): def setUp(self): self.logfile = StringIO() - self.victim = api.spawn(httpd.server, + self.victim = api.spawn(wsgi.server, api.tcp_listener(('0.0.0.0', 31337)), self.site_class(), log=self.logfile, @@ -211,45 +224,44 @@ class TestHttpc(TestBase, tests.TestCase): class RedirectSite(BasicSite): - response_code = 301 + response_code = "301 Moved Permanently" + + def __call__(self, env, start_response): + path = env['PATH_INFO'] + if path.startswith('/redirect/'): + url = 'http://' + env['HTTP_HOST'] + path.replace('/redirect/', '/') + start_response(self.response_code, [("Location", url)]) + return [""] + return super(RedirectSite, self).__call__(env, start_response) - def handle_request(self, req): - if req.path().startswith('/redirect/'): - url = ('http://' + req.get_header('host') + - req.uri().replace('/redirect/', '/')) - req.response(self.response_code, headers={'location': url}, - body='') - return - return Site.handle_request(self, req) class Site301(RedirectSite): pass class Site302(BasicSite): - def handle_request(self, req): - if req.path().startswith('/expired/'): - url = ('http://' + req.get_header('host') + - req.uri().replace('/expired/', '/')) - headers = {'location': url, 'expires': '0'} - req.response(302, headers=headers, body='') - return - if req.path().startswith('/expires/'): - url = ('http://' + req.get_header('host') + - req.uri().replace('/expires/', '/')) + def __call__(self, env, start_response): + path = env['PATH_INFO'] + if path.startswith('/expired/'): + url = 'http://' + env['HTTP_HOST'] + path.replace('/expired/', '/') + headers = [('location', url), ('expires', '0')] + start_response("302 Found", headers) + return [""] + if path.startswith('/expires/'): + url = 'http://' + env['HTTP_HOST'] + path.replace('/expires/', '/') expires = time.time() + (100 * 24 * 60 * 60) - headers = {'location': url, 'expires': httpc.to_http_time(expires)} - req.response(302, headers=headers, body='') - return - return Site.handle_request(self, req) + headers = [('location', url), ('expires', httpc.to_http_time(expires))] + start_response("302 Found", headers) + return [""] + return super(Site302, self).__call__(env, start_response) class Site303(RedirectSite): - response_code = 303 + response_code = "303 See Other" class Site307(RedirectSite): - response_code = 307 + response_code = "307 Temporary Redirect" class TestHttpc301(TestBase, tests.TestCase): @@ -332,15 +344,9 @@ class TestHttpc307(TestBase, tests.TestCase): class Site500(BasicSite): - def handle_request(self, req): - req.response(500, body="screw you world") - return - - -class Site500(BasicSite): - def handle_request(self, req): - req.response(500, body="screw you world") - return + def __call__(self, env, start_response): + start_response("500 Internal Server Error", [("Content-type", "text/plain")]) + return ["screw you world"] class TestHttpc500(TestBase, tests.TestCase): @@ -361,8 +367,9 @@ class TestHttpc500(TestBase, tests.TestCase): class Site504(BasicSite): - def handle_request(self, req): - req.response(504, body="screw you world") + def __call__(self, env, start_response): + start_response("504 Gateway Timeout", [("Content-type", "text/plain")]) + return ["screw you world"] class TestHttpc504(TestBase, tests.TestCase): diff --git a/greentest/pools_test.py b/greentest/pools_test.py index e604bc1..282067a 100644 --- a/greentest/pools_test.py +++ b/greentest/pools_test.py @@ -22,10 +22,11 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ +import time import unittest from eventlet import api -from eventlet import channel +from eventlet import coros from eventlet import pools @@ -65,13 +66,19 @@ class TestIntPool(unittest.TestCase): self.assertEquals(self.pool.free(), 4) def test_exhaustion(self): - waiter = channel.channel() + waiter = coros.event() def consumer(): gotten = None + cancel = api.exc_after(1, api.TimeoutError) try: + print time.asctime(), "getting" gotten = self.pool.get() + print time.asctime(), "got" finally: + cancel.cancel() + print "waiter send" waiter.send(gotten) + print "waiter sent" api.spawn(consumer) @@ -82,14 +89,17 @@ class TestIntPool(unittest.TestCase): # Let consumer run; nothing will be in the pool, so he will wait api.sleep(0) + print "put in pool", one # Wake consumer self.pool.put(one) + print "done put" # wait for the consumer - self.assertEquals(waiter.receive(), one) + self.assertEquals(waiter.wait(), one) + print "done wait" def test_blocks_on_pool(self): - waiter = channel.channel() + waiter = coros.event() def greedy(): self.pool.get() self.pool.get() @@ -98,7 +108,9 @@ class TestIntPool(unittest.TestCase): # No one should be waiting yet. self.assertEquals(self.pool.waiting(), 0) # The call to the next get will unschedule this routine. + print "calling get" self.pool.get() + print "called get" # So this send should never be called. waiter.send('Failed!') @@ -113,8 +125,8 @@ class TestIntPool(unittest.TestCase): ## Greedy should be blocking on the last get self.assertEquals(self.pool.waiting(), 1) - ## Send will never be called, so balance should be 0. - self.assertEquals(waiter.balance, 0) + ## Send will never be called, so the event should not be ready. + self.assertEquals(waiter.ready(), False) api.kill(killable) @@ -140,38 +152,6 @@ class TestIntPool2(unittest.TestCase): gotten = self.pool.get() self.assertEquals(gotten, 1) - -ALWAYS = RuntimeError('I always fail') -SOMETIMES = RuntimeError('I fail half the time') - - -class TestFan(unittest.TestCase): - mode = 'static' - def setUp(self): - self.pool = IntPool(max_size=2) - - def test_with_list(self): - list_of_input = ['agent-one', 'agent-two', 'agent-three'] - - def my_callable(pool_item, next_thing): - ## Do some "blocking" (yielding) thing - api.sleep(0.01) - return next_thing - - output = self.pool.fan(my_callable, list_of_input) - self.assertEquals(list_of_input, output) - - def test_all_fail(self): - def my_failure(pool_item, next_thing): - raise ALWAYS - self.assertRaises(pools.AllFailed, self.pool.fan, my_failure, range(4)) - - def test_some_fail(self): - def my_failing_callable(pool_item, next_thing): - if next_thing % 2: - raise SOMETIMES - return next_thing - self.assertRaises(pools.SomeFailed, self.pool.fan, my_failing_callable, range(4)) if __name__ == '__main__': diff --git a/greentest/processes_test.py b/greentest/processes_test.py index e2c1660..98ce505 100644 --- a/greentest/processes_test.py +++ b/greentest/processes_test.py @@ -23,7 +23,6 @@ THE SOFTWARE. """ from greentest import tests from eventlet import api -from eventlet import channel from eventlet import processes class TestEchoPool(tests.TestCase): diff --git a/greentest/saranwrap_test.py b/greentest/saranwrap_test.py index df0878d..32be606 100644 --- a/greentest/saranwrap_test.py +++ b/greentest/saranwrap_test.py @@ -225,7 +225,7 @@ class TestSaranwrap(unittest.TestCase): tid = make_uuid() self.assertEqual(tid.get_version(), uuid.uuid4().get_version()) def make_list(): - from eventlet import saranwrap_test + from greentest import saranwrap_test prox = saranwrap.wrap(saranwrap_test.list_maker) # after this function returns, prox should fall out of scope return prox() @@ -270,7 +270,7 @@ sys_path = sys.path""") sys.path.remove(temp_dir) def test_contention(self): - from eventlet import saranwrap_test + from greentest import saranwrap_test prox = saranwrap.wrap(saranwrap_test) pool = coros.CoroutinePool(max_size=4) @@ -296,7 +296,7 @@ sys_path = sys.path""") def test_list_of_functions(self): return # this test is known to fail, we can implement it sometime in the future if we wish - from eventlet import saranwrap_test + from greentest import saranwrap_test prox = saranwrap.wrap([saranwrap_test.list_maker]) self.assertEquals(list_maker(), prox[0]())