merge up to svn branch r120

This commit is contained in:
donovan
2008-05-22 07:35:09 -07:00
parent 3991daef04
commit a9f9486f89
8 changed files with 174 additions and 21 deletions

View File

@@ -95,6 +95,14 @@ class event(object):
self.epoch = time.time() self.epoch = time.time()
self._result = NOT_USED self._result = NOT_USED
self._waiters = {} self._waiters = {}
def ready(self):
""" Return true if the wait() call will return immediately.
Used to avoid waiting for things that might take a while to time out.
For example, you can put a bunch of events into a list, and then visit
them all repeatedly, calling ready() until one returns True, and then
you can wait() on that one."""
return self._result is not NOT_USED
def wait(self): def wait(self):
"""Wait until another coroutine calls send. """Wait until another coroutine calls send.
@@ -124,14 +132,6 @@ class event(object):
raise self._exc raise self._exc
return self._result return self._result
def ready(self):
""" Return true if the wait() call will return immediately.
Used to avoid waiting for things that might take a while to time out.
For example, you can put a bunch of events into a list, and then visit
them all repeatedly, calling ready() until one returns True, and then
you can wait() on that one."""
return self._result is not NOT_USED
def cancel(self, waiter): def cancel(self, waiter):
"""Raise an exception into a coroutine which called """Raise an exception into a coroutine which called
wait() an this event instead of returning a value wait() an this event instead of returning a value
@@ -266,13 +266,18 @@ class CoroutinePool(pools.Pool):
def _main_loop(self, sender): def _main_loop(self, sender):
""" Private, infinite loop run by a pooled coroutine. """ """ Private, infinite loop run by a pooled coroutine. """
while True: try:
recvd = sender.wait() while True:
sender = event() recvd = sender.wait()
(evt, func, args, kw) = recvd sender = event()
self._safe_apply(evt, func, args, kw) (evt, func, args, kw) = recvd
api.get_hub().cancel_timers(api.getcurrent()) self._safe_apply(evt, func, args, kw)
self.put(sender) api.get_hub().cancel_timers(api.getcurrent())
self.put(sender)
finally:
# if we get here, something broke badly, and all we can really
# do is try to keep the pool from leaking items
self.put(self.create())
def _safe_apply(self, evt, func, args, kw): def _safe_apply(self, evt, func, args, kw):
""" Private method that runs the function, catches exceptions, and """ Private method that runs the function, catches exceptions, and

View File

@@ -25,6 +25,8 @@ from eventlet import tests
from eventlet import timer from eventlet import timer
from eventlet import coros, api from eventlet import coros, api
import sys
class TestEvent(tests.TestCase): class TestEvent(tests.TestCase):
mode = 'static' mode = 'static'
def setUp(self): def setUp(self):
@@ -179,6 +181,33 @@ class TestCoroutinePool(tests.TestCase):
pool.execute_async(reenter_async) pool.execute_async(reenter_async)
evt.wait() evt.wait()
def test_horrible_main_loop_death(self):
# testing the case that causes the run_forever
# method to exit unwantedly
pool = coros.CoroutinePool(min_size=1, max_size=1)
def crash(*args, **kw):
raise RuntimeError("Whoa")
class FakeFile(object):
write = crash
# we're going to do this by causing the traceback.print_exc in
# safe_apply to raise an exception and thus exit _main_loop
normal_err = sys.stderr
try:
sys.stderr = FakeFile()
waiter = pool.execute(crash)
self.assertRaises(RuntimeError, waiter.wait)
# the pool should have something free at this point since the
# waiter returned
self.assertEqual(pool.free(), 1)
# shouldn't block when trying to get
t = api.exc_after(0.1, api.TimeoutError)
self.assert_(pool.get())
t.cancel()
finally:
sys.stderr = normal_err
class IncrActor(coros.Actor): class IncrActor(coros.Actor):
def received(self, evt): def received(self, evt):

View File

@@ -23,6 +23,7 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE. THE SOFTWARE.
""" """
import copy
import datetime import datetime
import httplib import httplib
import os.path import os.path
@@ -218,9 +219,20 @@ class _LocalParams(_Params):
setattr(self, k, v) setattr(self, k, v)
def __getattr__(self, key): def __getattr__(self, key):
if key == '__setstate__': return
return getattr(self._delegate, key) return getattr(self._delegate, key)
def __reduce__(self):
params = copy.copy(self._delegate)
kwargs = copy.copy(self.__dict__)
assert(kwargs.has_key('_delegate'))
del kwargs['_delegate']
if hasattr(params,'aux'): del params.aux
return (_LocalParams,(params,),kwargs)
def __setitem__(self, k, item):
setattr(self, k, item)
class ConnectionError(Exception): class ConnectionError(Exception):
"""Detailed exception class for reporting on http connection problems. """Detailed exception class for reporting on http connection problems.
@@ -321,6 +333,14 @@ class TemporaryRedirect(Retriable):
class BadRequest(ConnectionError): class BadRequest(ConnectionError):
""" 400 Bad Request """ """ 400 Bad Request """
pass pass
class Unauthorized(ConnectionError):
""" 401 Unauthorized """
pass
class PaymentRequired(ConnectionError):
""" 402 Payment Required """
pass
class Forbidden(ConnectionError): class Forbidden(ConnectionError):
@@ -332,11 +352,42 @@ class NotFound(ConnectionError):
""" 404 Not Found """ """ 404 Not Found """
pass pass
class RequestTimeout(ConnectionError):
""" 408 RequestTimeout """
pass
class Gone(ConnectionError): class Gone(ConnectionError):
""" 410 Gone """ """ 410 Gone """
pass pass
class LengthRequired(ConnectionError):
""" 411 Length Required """
pass
class RequestEntityTooLarge(ConnectionError):
""" 413 Request Entity Too Large """
pass
class RequestURITooLong(ConnectionError):
""" 414 Request-URI Too Long """
pass
class UnsupportedMediaType(ConnectionError):
""" 415 Unsupported Media Type """
pass
class RequestedRangeNotSatisfiable(ConnectionError):
""" 416 Requested Range Not Satisfiable """
pass
class ExpectationFailed(ConnectionError):
""" 417 Expectation Failed """
pass
class NotImplemented(ConnectionError):
""" 501 Not Implemented """
pass
class ServiceUnavailable(Retriable): class ServiceUnavailable(Retriable):
""" 503 Service Unavailable """ """ 503 Service Unavailable """
@@ -349,6 +400,9 @@ class GatewayTimeout(Retriable):
def url(self): def url(self):
return self.params._delegate.url return self.params._delegate.url
class HTTPVersionNotSupported(ConnectionError):
""" 505 HTTP Version Not Supported """
pass
class InternalServerError(ConnectionError): class InternalServerError(ConnectionError):
""" 500 Internal Server Error """ """ 500 Internal Server Error """
@@ -362,7 +416,9 @@ class InternalServerError(ConnectionError):
traceback = llsd.parse(self.params.response_body) traceback = llsd.parse(self.params.response_body)
except: except:
traceback = self.params.response_body traceback = self.params.response_body
if isinstance(traceback, dict): if(isinstance(traceback, dict)
and 'stack-trace' in traceback
and 'description' in traceback):
body = traceback body = traceback
traceback = "Traceback (most recent call last):\n" traceback = "Traceback (most recent call last):\n"
for frame in body['stack-trace']: for frame in body['stack-trace']:
@@ -387,12 +443,23 @@ status_to_error_map = {
304: NotModified, 304: NotModified,
307: TemporaryRedirect, 307: TemporaryRedirect,
400: BadRequest, 400: BadRequest,
401: Unauthorized,
402: PaymentRequired,
403: Forbidden, 403: Forbidden,
404: NotFound, 404: NotFound,
408: RequestTimeout,
410: Gone, 410: Gone,
411: LengthRequired,
413: RequestEntityTooLarge,
414: RequestURITooLong,
415: UnsupportedMediaType,
416: RequestedRangeNotSatisfiable,
417: ExpectationFailed,
500: InternalServerError, 500: InternalServerError,
501: NotImplemented,
503: ServiceUnavailable, 503: ServiceUnavailable,
504: GatewayTimeout, 504: GatewayTimeout,
505: HTTPVersionNotSupported,
} }
scheme_to_factory_map = { scheme_to_factory_map = {

View File

@@ -670,8 +670,12 @@ def main():
# *HACK: some modules may emit on stderr, which breaks everything. # *HACK: some modules may emit on stderr, which breaks everything.
class NullSTDOut(object): class NullSTDOut(object):
def write(a, b): def noop(*args):
pass pass
write = noop
read = noop
flush = noop
sys.stderr = NullSTDOut() sys.stderr = NullSTDOut()
sys.stdout = NullSTDOut() sys.stdout = NullSTDOut()

View File

@@ -22,7 +22,18 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE. THE SOFTWARE.
""" """
import threading try:
import threading
except ImportError:
class Dummy(object): pass
the_thread = Dummy()
class threading(object):
def currentThread():
return the_thread
currentThread = staticmethod(currentThread)
import weakref import weakref
__all__ = ['local'] __all__ = ['local']

View File

@@ -165,6 +165,42 @@ def wrap_pipes_with_coroutine_pipes():
os.fork = new_fork os.fork = new_fork
os.waitpid = new_waitpid os.waitpid = new_waitpid
__original_select__ = select.select
def fake_select(r, w, e, timeout):
"""This is to cooperate with people who are trying to do blocking
reads with a timeout. This only works if r, w, and e aren't
bigger than len 1, and if either r or w is populated.
Install this with wrap_select_with_coroutine_select,
which makes the global select.select into fake_select.
"""
from eventlet import api
assert len(r) <= 1
assert len(w) <= 1
assert len(e) <= 1
if w and r:
raise RuntimeError('fake_select doesn\'t know how to do that yet')
try:
if r:
api.trampoline(r[0], read=True, timeout=timeout)
return r, [], []
else:
api.trampoline(w[0], write=True, timeout=timeout)
return [], w, []
except api.TimeoutError, e:
return [], [], []
except:
return [], [], e
def wrap_select_with_coroutine_select():
select.select = fake_select
def socket_bind_and_listen(descriptor, addr=('', 0), backlog=50): def socket_bind_and_listen(descriptor, addr=('', 0), backlog=50):
set_reuse_addr(descriptor) set_reuse_addr(descriptor)

View File

@@ -32,7 +32,7 @@ THE SOFTWARE.
from eventlet import api from eventlet import api
def handle_socket(client): def handle_socket(reader, writer):
print "client connected" print "client connected"
while True: while True:
# pass through every non-eof line # pass through every non-eof line
@@ -47,6 +47,6 @@ server = api.tcp_listener(('0.0.0.0', 6000))
while True: while True:
new_sock, address = server.accept() new_sock, address = server.accept()
# handle every new connection with a new coroutine # handle every new connection with a new coroutine
api.spawn(handle_socket, new_sock) api.spawn(handle_socket, new_sock.makefile('r'), new_sock.makefile('w'))
server.close() server.close()

View File

@@ -27,3 +27,4 @@ setup(
"Intended Audience :: Developers", "Intended Audience :: Developers",
"Development Status :: 4 - Beta"] "Development Status :: 4 - Beta"]
) )