From a9f9486f896d2771c3356eea406f921030e7bcef Mon Sep 17 00:00:00 2001 From: donovan Date: Thu, 22 May 2008 07:35:09 -0700 Subject: [PATCH] merge up to svn branch r120 --- eventlet/coros.py | 35 ++++++++++++--------- eventlet/coros_test.py | 29 +++++++++++++++++ eventlet/httpc.py | 71 ++++++++++++++++++++++++++++++++++++++++-- eventlet/saranwrap.py | 6 +++- eventlet/tls.py | 13 +++++++- eventlet/util.py | 36 +++++++++++++++++++++ examples/echoserver.py | 4 +-- setup.py | 1 + 8 files changed, 174 insertions(+), 21 deletions(-) diff --git a/eventlet/coros.py b/eventlet/coros.py index 4276e09..89e69f6 100644 --- a/eventlet/coros.py +++ b/eventlet/coros.py @@ -95,6 +95,14 @@ class event(object): self.epoch = time.time() self._result = NOT_USED self._waiters = {} + + def ready(self): + """ Return true if the wait() call will return immediately. + Used to avoid waiting for things that might take a while to time out. + For example, you can put a bunch of events into a list, and then visit + them all repeatedly, calling ready() until one returns True, and then + you can wait() on that one.""" + return self._result is not NOT_USED def wait(self): """Wait until another coroutine calls send. @@ -124,14 +132,6 @@ class event(object): raise self._exc return self._result - def ready(self): - """ Return true if the wait() call will return immediately. - Used to avoid waiting for things that might take a while to time out. - For example, you can put a bunch of events into a list, and then visit - them all repeatedly, calling ready() until one returns True, and then - you can wait() on that one.""" - return self._result is not NOT_USED - def cancel(self, waiter): """Raise an exception into a coroutine which called wait() an this event instead of returning a value @@ -266,13 +266,18 @@ class CoroutinePool(pools.Pool): def _main_loop(self, sender): """ Private, infinite loop run by a pooled coroutine. """ - while True: - recvd = sender.wait() - sender = event() - (evt, func, args, kw) = recvd - self._safe_apply(evt, func, args, kw) - api.get_hub().cancel_timers(api.getcurrent()) - self.put(sender) + try: + while True: + recvd = sender.wait() + sender = event() + (evt, func, args, kw) = recvd + self._safe_apply(evt, func, args, kw) + api.get_hub().cancel_timers(api.getcurrent()) + self.put(sender) + finally: + # if we get here, something broke badly, and all we can really + # do is try to keep the pool from leaking items + self.put(self.create()) def _safe_apply(self, evt, func, args, kw): """ Private method that runs the function, catches exceptions, and diff --git a/eventlet/coros_test.py b/eventlet/coros_test.py index a72bc7a..ae83724 100644 --- a/eventlet/coros_test.py +++ b/eventlet/coros_test.py @@ -25,6 +25,8 @@ from eventlet import tests from eventlet import timer from eventlet import coros, api +import sys + class TestEvent(tests.TestCase): mode = 'static' def setUp(self): @@ -179,6 +181,33 @@ class TestCoroutinePool(tests.TestCase): pool.execute_async(reenter_async) evt.wait() + def test_horrible_main_loop_death(self): + # testing the case that causes the run_forever + # method to exit unwantedly + pool = coros.CoroutinePool(min_size=1, max_size=1) + def crash(*args, **kw): + raise RuntimeError("Whoa") + class FakeFile(object): + write = crash + + # we're going to do this by causing the traceback.print_exc in + # safe_apply to raise an exception and thus exit _main_loop + normal_err = sys.stderr + try: + sys.stderr = FakeFile() + waiter = pool.execute(crash) + self.assertRaises(RuntimeError, waiter.wait) + # the pool should have something free at this point since the + # waiter returned + self.assertEqual(pool.free(), 1) + # shouldn't block when trying to get + t = api.exc_after(0.1, api.TimeoutError) + self.assert_(pool.get()) + t.cancel() + finally: + sys.stderr = normal_err + + class IncrActor(coros.Actor): def received(self, evt): diff --git a/eventlet/httpc.py b/eventlet/httpc.py index 34497b4..23d94dd 100644 --- a/eventlet/httpc.py +++ b/eventlet/httpc.py @@ -23,6 +23,7 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ +import copy import datetime import httplib import os.path @@ -218,9 +219,20 @@ class _LocalParams(_Params): setattr(self, k, v) def __getattr__(self, key): + if key == '__setstate__': return return getattr(self._delegate, key) - + def __reduce__(self): + params = copy.copy(self._delegate) + kwargs = copy.copy(self.__dict__) + assert(kwargs.has_key('_delegate')) + del kwargs['_delegate'] + if hasattr(params,'aux'): del params.aux + return (_LocalParams,(params,),kwargs) + + def __setitem__(self, k, item): + setattr(self, k, item) + class ConnectionError(Exception): """Detailed exception class for reporting on http connection problems. @@ -321,6 +333,14 @@ class TemporaryRedirect(Retriable): class BadRequest(ConnectionError): """ 400 Bad Request """ pass + +class Unauthorized(ConnectionError): + """ 401 Unauthorized """ + pass + +class PaymentRequired(ConnectionError): + """ 402 Payment Required """ + pass class Forbidden(ConnectionError): @@ -332,11 +352,42 @@ class NotFound(ConnectionError): """ 404 Not Found """ pass +class RequestTimeout(ConnectionError): + """ 408 RequestTimeout """ + pass + class Gone(ConnectionError): """ 410 Gone """ pass +class LengthRequired(ConnectionError): + """ 411 Length Required """ + pass + +class RequestEntityTooLarge(ConnectionError): + """ 413 Request Entity Too Large """ + pass + +class RequestURITooLong(ConnectionError): + """ 414 Request-URI Too Long """ + pass + +class UnsupportedMediaType(ConnectionError): + """ 415 Unsupported Media Type """ + pass + +class RequestedRangeNotSatisfiable(ConnectionError): + """ 416 Requested Range Not Satisfiable """ + pass + +class ExpectationFailed(ConnectionError): + """ 417 Expectation Failed """ + pass + +class NotImplemented(ConnectionError): + """ 501 Not Implemented """ + pass class ServiceUnavailable(Retriable): """ 503 Service Unavailable """ @@ -349,6 +400,9 @@ class GatewayTimeout(Retriable): def url(self): return self.params._delegate.url +class HTTPVersionNotSupported(ConnectionError): + """ 505 HTTP Version Not Supported """ + pass class InternalServerError(ConnectionError): """ 500 Internal Server Error """ @@ -362,7 +416,9 @@ class InternalServerError(ConnectionError): traceback = llsd.parse(self.params.response_body) except: traceback = self.params.response_body - if isinstance(traceback, dict): + if(isinstance(traceback, dict) + and 'stack-trace' in traceback + and 'description' in traceback): body = traceback traceback = "Traceback (most recent call last):\n" for frame in body['stack-trace']: @@ -387,12 +443,23 @@ status_to_error_map = { 304: NotModified, 307: TemporaryRedirect, 400: BadRequest, + 401: Unauthorized, + 402: PaymentRequired, 403: Forbidden, 404: NotFound, + 408: RequestTimeout, 410: Gone, + 411: LengthRequired, + 413: RequestEntityTooLarge, + 414: RequestURITooLong, + 415: UnsupportedMediaType, + 416: RequestedRangeNotSatisfiable, + 417: ExpectationFailed, 500: InternalServerError, + 501: NotImplemented, 503: ServiceUnavailable, 504: GatewayTimeout, + 505: HTTPVersionNotSupported, } scheme_to_factory_map = { diff --git a/eventlet/saranwrap.py b/eventlet/saranwrap.py index 43e5fe3..cf52112 100644 --- a/eventlet/saranwrap.py +++ b/eventlet/saranwrap.py @@ -670,8 +670,12 @@ def main(): # *HACK: some modules may emit on stderr, which breaks everything. class NullSTDOut(object): - def write(a, b): + def noop(*args): pass + write = noop + read = noop + flush = noop + sys.stderr = NullSTDOut() sys.stdout = NullSTDOut() diff --git a/eventlet/tls.py b/eventlet/tls.py index 3cb3921..0016fbd 100644 --- a/eventlet/tls.py +++ b/eventlet/tls.py @@ -22,7 +22,18 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ -import threading +try: + import threading +except ImportError: + class Dummy(object): pass + + the_thread = Dummy() + + class threading(object): + def currentThread(): + return the_thread + currentThread = staticmethod(currentThread) + import weakref __all__ = ['local'] diff --git a/eventlet/util.py b/eventlet/util.py index 35dbf78..31bdb46 100644 --- a/eventlet/util.py +++ b/eventlet/util.py @@ -165,6 +165,42 @@ def wrap_pipes_with_coroutine_pipes(): os.fork = new_fork os.waitpid = new_waitpid +__original_select__ = select.select + + +def fake_select(r, w, e, timeout): + """This is to cooperate with people who are trying to do blocking + reads with a timeout. This only works if r, w, and e aren't + bigger than len 1, and if either r or w is populated. + + Install this with wrap_select_with_coroutine_select, + which makes the global select.select into fake_select. + """ + from eventlet import api + + assert len(r) <= 1 + assert len(w) <= 1 + assert len(e) <= 1 + + if w and r: + raise RuntimeError('fake_select doesn\'t know how to do that yet') + + try: + if r: + api.trampoline(r[0], read=True, timeout=timeout) + return r, [], [] + else: + api.trampoline(w[0], write=True, timeout=timeout) + return [], w, [] + except api.TimeoutError, e: + return [], [], [] + except: + return [], [], e + + +def wrap_select_with_coroutine_select(): + select.select = fake_select + def socket_bind_and_listen(descriptor, addr=('', 0), backlog=50): set_reuse_addr(descriptor) diff --git a/examples/echoserver.py b/examples/echoserver.py index 3fe0191..8c1813a 100644 --- a/examples/echoserver.py +++ b/examples/echoserver.py @@ -32,7 +32,7 @@ THE SOFTWARE. from eventlet import api -def handle_socket(client): +def handle_socket(reader, writer): print "client connected" while True: # pass through every non-eof line @@ -47,6 +47,6 @@ server = api.tcp_listener(('0.0.0.0', 6000)) while True: new_sock, address = server.accept() # handle every new connection with a new coroutine - api.spawn(handle_socket, new_sock) + api.spawn(handle_socket, new_sock.makefile('r'), new_sock.makefile('w')) server.close() diff --git a/setup.py b/setup.py index fdd7d7c..25f340d 100644 --- a/setup.py +++ b/setup.py @@ -27,3 +27,4 @@ setup( "Intended Audience :: Developers", "Development Status :: 4 - Beta"] ) +