Rename the eventlet.support.greenlet module to greenlets to avoid clashing with the pypi greenlet distribution; support the pypi greenlet distribution; port httpc_test to use wsgi instead of httpd; add channel back in because I couldn't get pool to work properly with queue yet

This commit is contained in:
donovan
2008-12-29 16:10:34 -08:00
parent 0aa029f44d
commit 01670ce072
19 changed files with 252 additions and 178 deletions

View File

@@ -6,3 +6,7 @@ dist
eventlet.egg-info eventlet.egg-info
build build
htmlreports htmlreports
*.esproj
.DS_Store
results.*.db

View File

@@ -30,7 +30,7 @@ import linecache
import inspect import inspect
import traceback import traceback
from eventlet.support import greenlet from eventlet.support import greenlets as greenlet
from eventlet import tls from eventlet import tls
__all__ = [ __all__ = [
@@ -40,6 +40,12 @@ __all__ = [
'unspew', 'use_hub', 'with_timeout', 'timeout'] 'unspew', 'use_hub', 'with_timeout', 'timeout']
def switch(coro, result=None, exc=None):
if exc is not None:
return coro.throw(exc)
return coro.switch(result)
class TimeoutError(Exception): class TimeoutError(Exception):
"""Exception raised if an asynchronous operation times out""" """Exception raised if an asynchronous operation times out"""
pass pass

102
eventlet/channel.py Normal file
View File

@@ -0,0 +1,102 @@
"""\
@file channel.py
@author Bob Ippolito
Copyright (c) 2005-2006, Bob Ippolito
Copyright (c) 2007, Linden Research, Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
import collections
from eventlet import api
from eventlet.support import greenlets as greenlet
__all__ = ['channel']
class channel(object):
"""A channel is a control flow primitive for co-routines. It is a
"thread-like" queue for controlling flow between two (or more) co-routines.
The state model is:
* If one co-routine calls send(), it is unscheduled until another
co-routine calls receive().
* If one co-rounte calls receive(), it is unscheduled until another
co-routine calls send().
* Once a paired send()/receive() have been called, both co-routeines
are rescheduled.
This is similar to: http://stackless.com/wiki/Channels
"""
balance = 0
def _tasklet_loop(self):
deque = self.deque = collections.deque()
hub = api.get_hub()
current = greenlet.getcurrent()
def switch(g, value=None, exc=None):
if exc is None:
return g.switch(value)
else:
return g.throw(exc)
direction, caller, args = switch(current.parent or current)
try:
while True:
if direction == -1:
# waiting to receive
if self.balance > 0:
sender, args = deque.popleft()
hub.schedule_call(0, switch, sender)
hub.schedule_call(0, switch, caller, *args)
else:
deque.append(caller)
else:
# waiting to send
if self.balance < 0:
receiver = deque.popleft()
hub.schedule_call(0, switch, receiver, *args)
hub.schedule_call(0, switch, caller)
else:
deque.append((caller, args))
self.balance += direction
direction, caller, args = hub.switch()
finally:
deque.clear()
del self.deque
self.balance = 0
def _send_tasklet(self, *args):
try:
t = self._tasklet
except AttributeError:
t = self._tasklet = greenlet.greenlet(self._tasklet_loop)
t.switch()
if args:
return t.switch((1, greenlet.getcurrent(), args))
else:
return t.switch((-1, greenlet.getcurrent(), args))
def receive(self):
return self._send_tasklet()
def send(self, value):
return self._send_tasklet(value)
def send_exception(self, exc):
return self._send_tasklet(None, exc)

View File

@@ -558,6 +558,7 @@ class Semaphore(object):
if self.counter<=0: if self.counter<=0:
self._waiters[api.getcurrent()] = None self._waiters[api.getcurrent()] = None
try: try:
print "hub switch"
api.get_hub().switch() api.get_hub().switch()
finally: finally:
self._waiters.pop(api.getcurrent(), None) self._waiters.pop(api.getcurrent(), None)
@@ -618,6 +619,7 @@ class BoundedSemaphore(object):
api.get_hub().schedule_call(0, self._do_unlock) api.get_hub().schedule_call(0, self._do_unlock)
self._acquire_waiters[api.getcurrent()] = None self._acquire_waiters[api.getcurrent()] = None
try: try:
print "HUB switch"
api.get_hub().switch() api.get_hub().switch()
finally: finally:
self._acquire_waiters.pop(api.getcurrent(), None) self._acquire_waiters.pop(api.getcurrent(), None)

View File

@@ -1,7 +1,7 @@
"""implements standard module 'thread' with greenlets""" """implements standard module 'thread' with greenlets"""
from __future__ import absolute_import from __future__ import absolute_import
import thread as thread_module import thread as thread_module
from eventlet.support import greenlet from eventlet.support import greenlets as greenlet
from eventlet.api import spawn from eventlet.api import spawn
from eventlet.coros import semaphore as LockType from eventlet.coros import semaphore as LockType

View File

@@ -25,7 +25,7 @@ THE SOFTWARE.
import sys import sys
import itertools import itertools
from eventlet.support import greenlet from eventlet.support import greenlets as greenlet
from eventlet import tls from eventlet import tls

View File

@@ -29,7 +29,7 @@ import errno
import traceback import traceback
import time import time
from eventlet.support import greenlet from eventlet.support import greenlets as greenlet
from eventlet.timer import Timer from eventlet.timer import Timer
_g_debug = True _g_debug = True

View File

@@ -32,7 +32,7 @@ import time
from eventlet.timer import Timer from eventlet.timer import Timer
from eventlet.hubs import hub from eventlet.hubs import hub
from eventlet.support import greenlet from eventlet.support import greenlets as greenlet
# XXX for debugging only # XXX for debugging only
#raise ImportError() #raise ImportError()

View File

@@ -33,7 +33,7 @@ from eventlet import greenlib
from eventlet.timer import Timer from eventlet.timer import Timer
from eventlet.hubs import hub from eventlet.hubs import hub
from eventlet.support import greenlet from eventlet.support import greenlets as greenlet
# XXX for debugging only # XXX for debugging only
#raise ImportError() #raise ImportError()

View File

@@ -29,7 +29,7 @@ import time
from eventlet.hubs import hub from eventlet.hubs import hub
from eventlet.support import greenlet from eventlet.support import greenlets as greenlet
class Hub(hub.BaseHub): class Hub(hub.BaseHub):
def wait(self, seconds=None): def wait(self, seconds=None):

View File

@@ -29,17 +29,6 @@ import socket
from eventlet import api from eventlet import api
from eventlet import channel from eventlet import channel
class FanFailed(RuntimeError):
pass
class SomeFailed(FanFailed):
pass
class AllFailed(FanFailed):
pass
class Pool(object): class Pool(object):
""" """
@@ -106,44 +95,6 @@ class Pool(object):
""" """
raise NotImplementedError("Implement in subclass") raise NotImplementedError("Implement in subclass")
def fan(self, block, input_list):
chan = channel.channel()
results = []
exceptional_results = 0
for index, input_item in enumerate(input_list):
pool_item = self.get()
## Fan out
api.spawn(
self._invoke, block, pool_item, input_item, index, chan)
## Fan back in
for i in range(len(input_list)):
## Wait for all guys to send to the queue
index, value = chan.receive()
if isinstance(value, Exception):
exceptional_results += 1
results.append((index, value))
results.sort()
results = [value for index, value in results]
if exceptional_results:
if exceptional_results == len(results):
raise AllFailed(results)
raise SomeFailed(results)
return results
def _invoke(self, block, pool_item, input_item, index, chan):
try:
result = block(pool_item, input_item)
except Exception, e:
self.put(pool_item)
chan.send((index, e))
return
self.put(pool_item)
chan.send((index, result))
class Token(object): class Token(object):
pass pass

View File

@@ -0,0 +1,22 @@
try:
import greenlet
getcurrent = greenlet.getcurrent
GreenletExit = greenlet.GreenletExit
greenlet = greenlet.greenlet
except ImportError, e:
print e
try:
from py.magic import greenlet
getcurrent = greenlet.getcurrent
GreenletExit = greenlet.GreenletExit
except ImportError:
try:
from stackless import greenlet
getcurrent = greenlet.getcurrent
GreenletExit = greenlet.GreenletExit
except ImportError:
try:
from support.stacklesss import greenlet, getcurrent, GreenletExit
except ImportError, e:
raise ImportError("Unable to find an implementation of greenlet.")

View File

@@ -5,7 +5,7 @@ yourself.
""" """
from eventlet.hubs.twistedr import BaseTwistedHub from eventlet.hubs.twistedr import BaseTwistedHub
from eventlet.api import use_hub, _threadlocal from eventlet.api import use_hub, _threadlocal
from eventlet.support import greenlet from eventlet.support import greenlets as greenlet
use_hub(BaseTwistedHub) use_hub(BaseTwistedHub)
assert not hasattr(_threadlocal, 'hub') assert not hasattr(_threadlocal, 'hub')

View File

@@ -48,7 +48,7 @@ except ImportError:
def g_log(*args): def g_log(*args):
import sys import sys
from eventlet.support import greenlet from eventlet.support import greenlets as greenlet
from eventlet.greenlib import greenlet_id from eventlet.greenlib import greenlet_id
g_id = greenlet_id() g_id = greenlet_id()
if g_id is None: if g_id is None:

View File

@@ -213,6 +213,7 @@ class TestCoroutinePool(tests.TestCase):
for x in range(6): for x in range(6):
pool.execute(lambda n: n, x) pool.execute(lambda n: n, x)
for y in range(6): for y in range(6):
print "wait", y
pool.wait() pool.wait()
def test_track_slow_event(self): def test_track_slow_event(self):

View File

@@ -22,11 +22,14 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE. THE SOFTWARE.
""" """
import cgi
from eventlet import api from eventlet import api
from eventlet import httpc from eventlet import httpc
from eventlet import httpd
from eventlet import processes from eventlet import processes
from eventlet import util from eventlet import util
from eventlet import wsgi
import time import time
try: try:
from cStringIO import StringIO from cStringIO import StringIO
@@ -41,64 +44,74 @@ class Site(object):
def __init__(self): def __init__(self):
self.stuff = {'hello': 'hello world'} self.stuff = {'hello': 'hello world'}
def adapt(self, obj, req): def __call__(self, env, start_response):
req.write(str(obj)) return getattr(self, 'handle_%s' % env['REQUEST_METHOD'].lower())(env, start_response)
def _get_query_pairs(env):
parsed = cgi.parse_qs(env['QUERY_STRING'])
for key, values in parsed.items():
for val in values:
yield key, val
def get_query_pairs(env):
return list(_get_query_pairs(env))
def handle_request(self, req):
return getattr(self, 'handle_%s' % req.method().lower())(req)
class BasicSite(Site): class BasicSite(Site):
def handle_get(self, req): def handle_get(self, env, start_response):
req.set_header('x-get', 'hello') headers = [('x-get', 'hello'), ('Content-type', 'text/plain')]
resp = StringIO() resp = StringIO()
path = req.path().lstrip('/') path = env['PATH_INFO'].lstrip('/')
try: try:
resp.write(self.stuff[path]) resp.write(self.stuff[path])
except KeyError: except KeyError:
req.response(404, body='Not found') start_response("404 Not Found", headers)
return return ["Not Found"]
for k,v in req.get_query_pairs(): for k,v in get_query_pairs(env):
resp.write(k + '=' + v + '\n') resp.write(k + '=' + v + '\n')
req.write(resp.getvalue()) start_response("200 OK", headers)
return [resp.getvalue()]
def handle_head(self, req): def handle_head(self, env, start_response):
req.set_header('x-head', 'hello') headers = [('x-head', 'hello'), ('Content-type', 'text/plain')]
path = req.path().lstrip('/') start_response("200 OK", headers)
try: return [""]
req.write('')
except KeyError:
req.response(404, body='Not found')
def handle_put(self, req): def handle_put(self, env, start_response):
req.set_header('x-put', 'hello') headers = [('x-put', 'hello'), ('Content-type', 'text/plain')]
path = req.path().lstrip('/') path = env['PATH_INFO'].lstrip('/')
if not path: if not path:
req.response(400, body='') start_response("400 Bad Request", headers)
return return [""]
if path in self.stuff: if path in self.stuff:
req.response(204) start_response("204 No Content", headers)
else: else:
req.response(201) start_response("201 Created", headers)
self.stuff[path] = req.read_body() self.stuff[path] = env['wsgi.input'].read(int(env.get('CONTENT_LENGTH', '0')))
req.write('') return [""]
def handle_delete(self, req): def handle_delete(self, env, start_response):
req.set_header('x-delete', 'hello') headers = [('x-delete', 'hello'), ('Content-type', 'text/plain')]
path = req.path().lstrip('/') path = env['PATH_INFO'].lstrip('/')
if not path: if not path:
req.response(400, body='') start_response("400 Bad Request", headers)
return return [""]
try: try:
del self.stuff[path] del self.stuff[path]
req.response(204) start_response("204 No Content", headers)
except KeyError: except KeyError:
req.response(404) start_response("404 Not Found", headers)
req.write('') return [""]
def handle_post(self, req): def handle_post(self, env, start_response):
req.set_header('x-post', 'hello') headers = [('x-post', 'hello'), ('Content-type', 'text/plain')]
req.write(req.read_body()) start_response("200 OK", headers)
return [env['wsgi.input'].read(int(env.get('CONTENT_LENGTH', '0')))]
class TestBase(object): class TestBase(object):
@@ -109,7 +122,7 @@ class TestBase(object):
def setUp(self): def setUp(self):
self.logfile = StringIO() self.logfile = StringIO()
self.victim = api.spawn(httpd.server, self.victim = api.spawn(wsgi.server,
api.tcp_listener(('0.0.0.0', 31337)), api.tcp_listener(('0.0.0.0', 31337)),
self.site_class(), self.site_class(),
log=self.logfile, log=self.logfile,
@@ -211,45 +224,44 @@ class TestHttpc(TestBase, tests.TestCase):
class RedirectSite(BasicSite): class RedirectSite(BasicSite):
response_code = 301 response_code = "301 Moved Permanently"
def __call__(self, env, start_response):
path = env['PATH_INFO']
if path.startswith('/redirect/'):
url = 'http://' + env['HTTP_HOST'] + path.replace('/redirect/', '/')
start_response(self.response_code, [("Location", url)])
return [""]
return super(RedirectSite, self).__call__(env, start_response)
def handle_request(self, req):
if req.path().startswith('/redirect/'):
url = ('http://' + req.get_header('host') +
req.uri().replace('/redirect/', '/'))
req.response(self.response_code, headers={'location': url},
body='')
return
return Site.handle_request(self, req)
class Site301(RedirectSite): class Site301(RedirectSite):
pass pass
class Site302(BasicSite): class Site302(BasicSite):
def handle_request(self, req): def __call__(self, env, start_response):
if req.path().startswith('/expired/'): path = env['PATH_INFO']
url = ('http://' + req.get_header('host') + if path.startswith('/expired/'):
req.uri().replace('/expired/', '/')) url = 'http://' + env['HTTP_HOST'] + path.replace('/expired/', '/')
headers = {'location': url, 'expires': '0'} headers = [('location', url), ('expires', '0')]
req.response(302, headers=headers, body='') start_response("302 Found", headers)
return return [""]
if req.path().startswith('/expires/'): if path.startswith('/expires/'):
url = ('http://' + req.get_header('host') + url = 'http://' + env['HTTP_HOST'] + path.replace('/expires/', '/')
req.uri().replace('/expires/', '/'))
expires = time.time() + (100 * 24 * 60 * 60) expires = time.time() + (100 * 24 * 60 * 60)
headers = {'location': url, 'expires': httpc.to_http_time(expires)} headers = [('location', url), ('expires', httpc.to_http_time(expires))]
req.response(302, headers=headers, body='') start_response("302 Found", headers)
return return [""]
return Site.handle_request(self, req) return super(Site302, self).__call__(env, start_response)
class Site303(RedirectSite): class Site303(RedirectSite):
response_code = 303 response_code = "303 See Other"
class Site307(RedirectSite): class Site307(RedirectSite):
response_code = 307 response_code = "307 Temporary Redirect"
class TestHttpc301(TestBase, tests.TestCase): class TestHttpc301(TestBase, tests.TestCase):
@@ -332,15 +344,9 @@ class TestHttpc307(TestBase, tests.TestCase):
class Site500(BasicSite): class Site500(BasicSite):
def handle_request(self, req): def __call__(self, env, start_response):
req.response(500, body="screw you world") start_response("500 Internal Server Error", [("Content-type", "text/plain")])
return return ["screw you world"]
class Site500(BasicSite):
def handle_request(self, req):
req.response(500, body="screw you world")
return
class TestHttpc500(TestBase, tests.TestCase): class TestHttpc500(TestBase, tests.TestCase):
@@ -361,8 +367,9 @@ class TestHttpc500(TestBase, tests.TestCase):
class Site504(BasicSite): class Site504(BasicSite):
def handle_request(self, req): def __call__(self, env, start_response):
req.response(504, body="screw you world") start_response("504 Gateway Timeout", [("Content-type", "text/plain")])
return ["screw you world"]
class TestHttpc504(TestBase, tests.TestCase): class TestHttpc504(TestBase, tests.TestCase):

View File

@@ -22,10 +22,11 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE. THE SOFTWARE.
""" """
import time
import unittest import unittest
from eventlet import api from eventlet import api
from eventlet import channel from eventlet import coros
from eventlet import pools from eventlet import pools
@@ -65,13 +66,19 @@ class TestIntPool(unittest.TestCase):
self.assertEquals(self.pool.free(), 4) self.assertEquals(self.pool.free(), 4)
def test_exhaustion(self): def test_exhaustion(self):
waiter = channel.channel() waiter = coros.event()
def consumer(): def consumer():
gotten = None gotten = None
cancel = api.exc_after(1, api.TimeoutError)
try: try:
print time.asctime(), "getting"
gotten = self.pool.get() gotten = self.pool.get()
print time.asctime(), "got"
finally: finally:
cancel.cancel()
print "waiter send"
waiter.send(gotten) waiter.send(gotten)
print "waiter sent"
api.spawn(consumer) api.spawn(consumer)
@@ -82,14 +89,17 @@ class TestIntPool(unittest.TestCase):
# Let consumer run; nothing will be in the pool, so he will wait # Let consumer run; nothing will be in the pool, so he will wait
api.sleep(0) api.sleep(0)
print "put in pool", one
# Wake consumer # Wake consumer
self.pool.put(one) self.pool.put(one)
print "done put"
# wait for the consumer # wait for the consumer
self.assertEquals(waiter.receive(), one) self.assertEquals(waiter.wait(), one)
print "done wait"
def test_blocks_on_pool(self): def test_blocks_on_pool(self):
waiter = channel.channel() waiter = coros.event()
def greedy(): def greedy():
self.pool.get() self.pool.get()
self.pool.get() self.pool.get()
@@ -98,7 +108,9 @@ class TestIntPool(unittest.TestCase):
# No one should be waiting yet. # No one should be waiting yet.
self.assertEquals(self.pool.waiting(), 0) self.assertEquals(self.pool.waiting(), 0)
# The call to the next get will unschedule this routine. # The call to the next get will unschedule this routine.
print "calling get"
self.pool.get() self.pool.get()
print "called get"
# So this send should never be called. # So this send should never be called.
waiter.send('Failed!') waiter.send('Failed!')
@@ -113,8 +125,8 @@ class TestIntPool(unittest.TestCase):
## Greedy should be blocking on the last get ## Greedy should be blocking on the last get
self.assertEquals(self.pool.waiting(), 1) self.assertEquals(self.pool.waiting(), 1)
## Send will never be called, so balance should be 0. ## Send will never be called, so the event should not be ready.
self.assertEquals(waiter.balance, 0) self.assertEquals(waiter.ready(), False)
api.kill(killable) api.kill(killable)
@@ -141,38 +153,6 @@ class TestIntPool2(unittest.TestCase):
self.assertEquals(gotten, 1) self.assertEquals(gotten, 1)
ALWAYS = RuntimeError('I always fail')
SOMETIMES = RuntimeError('I fail half the time')
class TestFan(unittest.TestCase):
mode = 'static'
def setUp(self):
self.pool = IntPool(max_size=2)
def test_with_list(self):
list_of_input = ['agent-one', 'agent-two', 'agent-three']
def my_callable(pool_item, next_thing):
## Do some "blocking" (yielding) thing
api.sleep(0.01)
return next_thing
output = self.pool.fan(my_callable, list_of_input)
self.assertEquals(list_of_input, output)
def test_all_fail(self):
def my_failure(pool_item, next_thing):
raise ALWAYS
self.assertRaises(pools.AllFailed, self.pool.fan, my_failure, range(4))
def test_some_fail(self):
def my_failing_callable(pool_item, next_thing):
if next_thing % 2:
raise SOMETIMES
return next_thing
self.assertRaises(pools.SomeFailed, self.pool.fan, my_failing_callable, range(4))
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View File

@@ -23,7 +23,6 @@ THE SOFTWARE.
""" """
from greentest import tests from greentest import tests
from eventlet import api from eventlet import api
from eventlet import channel
from eventlet import processes from eventlet import processes
class TestEchoPool(tests.TestCase): class TestEchoPool(tests.TestCase):

View File

@@ -225,7 +225,7 @@ class TestSaranwrap(unittest.TestCase):
tid = make_uuid() tid = make_uuid()
self.assertEqual(tid.get_version(), uuid.uuid4().get_version()) self.assertEqual(tid.get_version(), uuid.uuid4().get_version())
def make_list(): def make_list():
from eventlet import saranwrap_test from greentest import saranwrap_test
prox = saranwrap.wrap(saranwrap_test.list_maker) prox = saranwrap.wrap(saranwrap_test.list_maker)
# after this function returns, prox should fall out of scope # after this function returns, prox should fall out of scope
return prox() return prox()
@@ -270,7 +270,7 @@ sys_path = sys.path""")
sys.path.remove(temp_dir) sys.path.remove(temp_dir)
def test_contention(self): def test_contention(self):
from eventlet import saranwrap_test from greentest import saranwrap_test
prox = saranwrap.wrap(saranwrap_test) prox = saranwrap.wrap(saranwrap_test)
pool = coros.CoroutinePool(max_size=4) pool = coros.CoroutinePool(max_size=4)
@@ -296,7 +296,7 @@ sys_path = sys.path""")
def test_list_of_functions(self): def test_list_of_functions(self):
return # this test is known to fail, we can implement it sometime in the future if we wish return # this test is known to fail, we can implement it sometime in the future if we wish
from eventlet import saranwrap_test from greentest import saranwrap_test
prox = saranwrap.wrap([saranwrap_test.list_maker]) prox = saranwrap.wrap([saranwrap_test.list_maker])
self.assertEquals(list_maker(), prox[0]()) self.assertEquals(list_maker(), prox[0]())