From ed9e9b61e01c6e3adbabd32d7f0085df68a47cbd Mon Sep 17 00:00:00 2001 From: "which.linden" Date: Mon, 9 Jun 2008 17:48:55 -0700 Subject: [PATCH 01/12] [svn r123] Fixed infinite redirect problem that Nat discovered. --- eventlet/httpc.py | 22 ++++++++++++++++++---- eventlet/httpc_test.py | 9 ++++++--- 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/eventlet/httpc.py b/eventlet/httpc.py index 7f68ffd..8e3d5ad 100644 --- a/eventlet/httpc.py +++ b/eventlet/httpc.py @@ -589,13 +589,27 @@ class HttpSuite(object): def head(self, *args, **kwargs): return self.head_(*args, **kwargs)[-1] - def get_(self, url, headers=None, use_proxy=False, ok=None, aux=None): + def get_(self, url, headers=None, use_proxy=False, ok=None, aux=None, max_retries=8): if headers is None: headers = {} headers['accept'] = self.fallback_content_type+';q=1,*/*;q=0' - return self.request_(_Params(url, 'GET', headers=headers, - loader=self.loader, dumper=self.dumper, - use_proxy=use_proxy, ok=ok, aux=aux)) + def req(): + return self.request_(_Params(url, 'GET', headers=headers, + loader=self.loader, dumper=self.dumper, + use_proxy=use_proxy, ok=ok, aux=aux)) + def retry_response(err): + def doit(): + return err.retry_() + return doit + retried = 0 + while retried <= max_retries: + try: + return req() + except (Found, TemporaryRedirect, MovedPermanently, SeeOther), e: + if retried >= max_retries: + raise + retried += 1 + req = retry_response(e) def get(self, *args, **kwargs): return self.get_(*args, **kwargs)[-1] diff --git a/eventlet/httpc_test.py b/eventlet/httpc_test.py index b0e14f0..66ee28d 100644 --- a/eventlet/httpc_test.py +++ b/eventlet/httpc_test.py @@ -263,11 +263,12 @@ class TestHttpc301(TestBase, tests.TestCase): def test_get(self): try: - httpc.get(self.base_url() + 'hello') + httpc.get(self.base_url() + 'hello', max_retries=0) self.assert_(False) except httpc.MovedPermanently, err: response = err.retry() self.assertEquals(response, 'hello world') + 
self.assertEquals(httpc.get(self.base_url() + 'hello', max_retries=1), 'hello world') def test_post(self): data = 'qunge' @@ -284,19 +285,21 @@ class TestHttpc302(TestBase, tests.TestCase): def test_get_expired(self): try: - httpc.get(self.base_url() + 'expired/hello') + httpc.get(self.base_url() + 'expired/hello', max_retries=0) self.assert_(False) except httpc.Found, err: response = err.retry() self.assertEquals(response, 'hello world') + self.assertEquals(httpc.get(self.base_url() + 'expired/hello', max_retries=1), 'hello world') def test_get_expires(self): try: - httpc.get(self.base_url() + 'expires/hello') + httpc.get(self.base_url() + 'expires/hello', max_retries=0) self.assert_(False) except httpc.Found, err: response = err.retry() self.assertEquals(response, 'hello world') + self.assertEquals(httpc.get(self.base_url() + 'expires/hello', max_retries=1), 'hello world') class TestHttpc303(TestBase, tests.TestCase): From 34e149a05f467b45870527dc610ff516efd42f22 Mon Sep 17 00:00:00 2001 From: "nat.linden" Date: Wed, 11 Jun 2008 14:20:11 -0700 Subject: [PATCH 02/12] [svn r124] Publish greenlib.greenlet_dict() as api.local_dict() to provide a consistent interface to "coroutine-local storage." --- eventlet/api.py | 1 + 1 file changed, 1 insertion(+) diff --git a/eventlet/api.py b/eventlet/api.py index e9877fa..267ec83 100644 --- a/eventlet/api.py +++ b/eventlet/api.py @@ -230,6 +230,7 @@ def sleep(timeout=0): switch = greenlib.switch +local_dict = greenlib.greenlet_dict getcurrent = greenlet.getcurrent GreenletExit = greenlet.GreenletExit From 697a98e57812d1abc49c16cc7cec4829067de646 Mon Sep 17 00:00:00 2001 From: "nat.linden" Date: Wed, 11 Jun 2008 14:53:12 -0700 Subject: [PATCH 03/12] [svn r125] Introduce new CoroutinePool methods: wait_all() to wait until all coroutines launched with either execute() or execute_async() have completed. launch_all() to execute_async() a given function with each of a sequence of sets of arguments. 
process_all(), shorthand for launch_all() followed by wait_all(). Introduce metaphore, a sort of inverse semaphore on which wait_all() is implemented. --- eventlet/coros.py | 146 +++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 144 insertions(+), 2 deletions(-) diff --git a/eventlet/coros.py b/eventlet/coros.py index cef72f9..045b1eb 100644 --- a/eventlet/coros.py +++ b/eventlet/coros.py @@ -209,6 +209,61 @@ class event(object): for waiter in self._waiters: hub.schedule_call(0, greenlib.switch, waiter, self._result) +class metaphore(object): + """This is sort of an inverse semaphore: a counter that starts at 0 and + waits only if nonzero. It's used to implement a "wait for all" scenario. + + >>> from eventlet import api, coros + >>> count = coros.metaphore() + >>> count.wait() + >>> def decrementer(count, id): + ... print "%s decrementing" % id + ... count.dec() + ... + >>> _ = api.spawn(decrementer, count, 'A') + >>> _ = api.spawn(decrementer, count, 'B') + >>> count.inc(2) + >>> count.wait() + A decrementing + B decrementing + """ + def __init__(self): + self.counter = 0 + self.event = event() + # send() right away, else we'd wait on the default 0 count! + self.event.send() + + def inc(self, by=1): + """Increment our counter. If this transitions the counter from zero to + nonzero, make any subsequent wait() call wait. + """ + assert by > 0 + self.counter += by + if self.counter == by: + # If we just incremented self.counter by 'by', and the new count + # equals 'by', then the old value of self.counter was 0. + # Transitioning from 0 to a nonzero value means wait() must + # actually wait. + self.event.reset() + + def dec(self, by=1): + """Decrement our counter. If this transitions the counter from nonzero + to zero, a current or subsequent wait() call need no longer wait. + """ + assert by > 0 + self.counter -= by + if self.counter <= 0: + # Don't leave self.counter < 0, that will screw things up in + # future calls. 
+ self.counter = 0 + # Transitioning from nonzero to 0 means wait() need no longer wait. + self.event.send() + + def wait(self): + """Suspend the caller only if our count is nonzero. In that case, + resume the caller once the count decrements to zero again. + """ + self.event.wait() def execute(func, *args, **kw): """ Executes an operation asynchronously in a new coroutine, returning @@ -262,8 +317,27 @@ class CoroutinePool(pools.Pool): def __init__(self, min_size=0, max_size=4): self._greenlets = set() + self.requested = metaphore() super(CoroutinePool, self).__init__(min_size, max_size) - + +## This doesn't yet pass its own doctest -- but I'm not even sure it's a +## wonderful idea. +## def __del__(self): +## """Experimental: try to prevent the calling script from exiting until +## all coroutines in this pool have run to completion. + +## >>> from eventlet import coros +## >>> pool = coros.CoroutinePool() +## >>> def saw(x): print "I saw %s!" +## ... +## >>> pool.launch_all(saw, "GHI") +## >>> del pool +## I saw G! +## I saw H! +## I saw I! +## """ +## self.wait_all() + def _main_loop(self, sender): """ Private, infinite loop run by a pooled coroutine. """ try: @@ -283,7 +357,12 @@ class CoroutinePool(pools.Pool): """ Private method that runs the function, catches exceptions, and passes back the return value in the event.""" try: - result = func(*args, **kw) + try: + result = func(*args, **kw) + finally: + # Be sure to decrement requested coroutines, no matter HOW we + # leave. (See _execute().) + self.requested.dec() if evt is not None: evt.send(result) except api.GreenletExit, e: @@ -302,6 +381,11 @@ class CoroutinePool(pools.Pool): def _execute(self, evt, func, args, kw): """ Private implementation of the execute methods. """ + # Track number of requested coroutines. Note the asymmetry: we + # increment when the coroutine is first REQUESTED, long before it + # actually starts up; we don't decrement until it completes. (See + # _safe_apply().) 
+ self.requested.inc() # if reentering an empty pool, don't try to wait on a coroutine freeing # itself -- instead, just execute in the current coroutine if self.free() == 0 and api.getcurrent() in self._greenlets: @@ -354,6 +438,64 @@ class CoroutinePool(pools.Pool): """ self._execute(None, func, args, kw) + def wait_all(self): + """Wait until all coroutines started either by execute() or + execute_async() have completed. If you kept the event objects returned + by execute(), you can then call their individual wait() methods to + retrieve results with no further actual waiting. + + >>> from eventlet import coros + >>> pool = coros.CoroutinePool() + >>> pool.wait_all() + >>> def hi(name): + ... print "Hello, %s!" % name + ... return name + ... + >>> evt = pool.execute(hi, "world") + >>> pool.execute_async(hi, "darkness, my old friend") + >>> pool.wait_all() + Hello, world! + Hello, darkness, my old friend! + >>> evt.wait() + 'world' + >>> pool.wait_all() + """ + self.requested.wait() + + def launch_all(self, function, iterable): + """For each tuple (sequence) in iterable, launch function(*tuple) in + its own coroutine -- like itertools.starmap(), but in parallel. + + >>> from eventlet import coros + >>> pool = coros.CoroutinePool() + >>> def saw(x): + ... print "I saw %s!" % x + ... + >>> pool.launch_all(saw, "ABC") + >>> pool.wait_all() + I saw A! + I saw B! + I saw C! + """ + for tup in iterable: + self.execute_async(function, *tup) + + def process_all(self, function, iterable): + """For each tuple (sequence) in iterable, launch function(*tuple) in + its own coroutine -- like itertools.starmap(), but in parallel. Don't + return until they've all completed. + + >>> from eventlet import coros + >>> pool = coros.CoroutinePool() + >>> def saw(x): print "I saw %s!" % x + ... + >>> pool.process_all(saw, "DEF") + I saw D! + I saw E! + I saw F! + """ + self.launch_all(function, iterable) + self.wait_all() class pipe(object): """ Implementation of pipe using events. 
Not tested! Not used, either.""" From 9222d5f2429d1c61aa97780975d5dc4d8af93e5e Mon Sep 17 00:00:00 2001 From: "nat.linden" Date: Thu, 12 Jun 2008 12:26:45 -0700 Subject: [PATCH 04/12] [svn r126] Add CoroutinePool.generate_results(function, iterable) method with signature like process_all(), but for use when you care about the value returned by each call to function(). generate_results() yields results more or less as soon as they become available, allowing the caller to decide how to process or aggregate them. Add cross-coroutine queue class, with an API like a subset of event. Add classic semaphore class implemented on an event and a counter. --- eventlet/coros.py | 325 +++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 323 insertions(+), 2 deletions(-) diff --git a/eventlet/coros.py b/eventlet/coros.py index 045b1eb..6e1ec63 100644 --- a/eventlet/coros.py +++ b/eventlet/coros.py @@ -209,6 +209,86 @@ class event(object): for waiter in self._waiters: hub.schedule_call(0, greenlib.switch, waiter, self._result) + +class semaphore(object): + """Classic semaphore implemented with a counter and an event. + Optionally initialize with a resource count, then acquire() and release() + resources as needed. Attempting to acquire() when count is zero suspends + the calling coroutine until count becomes nonzero again. + + >>> from eventlet import coros, api + >>> sem = coros.semaphore(2, limit=3) + >>> sem.acquire() + >>> sem.acquire() + >>> def releaser(sem): + ... print "releasing one" + ... sem.release() + ... + >>> _ = api.spawn(releaser, sem) + >>> sem.acquire() + releasing one + >>> sem.counter + 0 + >>> for x in xrange(3): + ... sem.release() + ... + >>> def acquirer(sem): + ... print "acquiring one" + ... sem.acquire() + ... 
+ >>> _ = api.spawn(acquirer, sem) + >>> sem.release() + acquiring one + >>> sem.counter + 3 + """ + def __init__(self, count=0, limit=None): + if limit is not None and count > limit: + # Prevent initializing with inconsistent values + count = limit + self.counter = count + self.limit = limit + self.acqevent = event() + self.relevent = event() + if self.counter > 0: + # If we initially have items, then don't block acquire()s. + self.acqevent.send() + if self.limit is None or self.counter < self.limit: + # If either there's no limit or we're below it, don't block on + # release()s. + self.relevent.send() + + def acquire(self): + # This logic handles the self.limit is None case because None != any integer. + while self.counter == 0: + # Loop until there are resources to acquire. We loop because we + # could be one of several coroutines waiting for a single item. If + # we all get notified, only one is going to claim it, and the rest + # of us must continue waiting. + self.acqevent.wait() + # claim the resource + self.counter -= 1 + if self.counter == 0: + # If we just transitioned from having a resource to having none, + # make anyone else's wait() actually wait. + self.acqevent.reset() + if self.counter + 1 == self.limit: + # If we just transitioned from being full to having room for one + # more resource, notify whoever was waiting to release one. + self.relevent.send() + + def release(self): + # This logic handles the self.limit is None case because None != any integer. + while self.counter == self.limit: + self.relevent.wait() + self.counter += 1 + if self.counter == self.limit: + self.relevent.reset() + if self.counter == 1: + # If self.counter was 0 before we incremented it, then wake up + # anybody who was waiting + self.acqevent.send() + class metaphore(object): """This is sort of an inverse semaphore: a counter that starts at 0 and waits only if nonzero. It's used to implement a "wait for all" scenario. 
@@ -465,6 +545,9 @@ class CoroutinePool(pools.Pool): def launch_all(self, function, iterable): """For each tuple (sequence) in iterable, launch function(*tuple) in its own coroutine -- like itertools.starmap(), but in parallel. + Discard values returned by function(). You should call wait_all() to + wait for all coroutines, newly-launched plus any previously-submitted + execute() or execute_async() calls, to complete. >>> from eventlet import coros >>> pool = coros.CoroutinePool() @@ -482,8 +565,10 @@ class CoroutinePool(pools.Pool): def process_all(self, function, iterable): """For each tuple (sequence) in iterable, launch function(*tuple) in - its own coroutine -- like itertools.starmap(), but in parallel. Don't - return until they've all completed. + its own coroutine -- like itertools.starmap(), but in parallel. + Discard values returned by function(). Don't return until all + coroutines, newly-launched plus any previously-submitted execute() or + execute_async() calls, have completed. >>> from eventlet import coros >>> pool = coros.CoroutinePool() @@ -497,6 +582,178 @@ class CoroutinePool(pools.Pool): self.launch_all(function, iterable) self.wait_all() + def generate_results(self, function, iterable, qsize=None): + """For each tuple (sequence) in iterable, launch function(*tuple) in + its own coroutine -- like itertools.starmap(), but in parallel. + Yield each of the values returned by function(), in the order they're + completed rather than the order the coroutines were launched. + + Iteration stops when we've yielded results for each arguments tuple in + iterable. Unlike wait_all() and process_all(), this function does not + wait for any previously-submitted execute() or execute_async() calls. + + Results are temporarily buffered in a queue. 
If you pass qsize=, this + value is used to limit the max size of the queue: an attempt to buffer + too many results will suspend the completed CoroutinePool coroutine + until the requesting coroutine (the caller of generate_results()) has + retrieved one or more results by calling this generator-iterator's + next(). + + If any coroutine raises an uncaught exception, that exception will + propagate to the requesting coroutine via the corresponding next() call. + + What I particularly want these tests to illustrate is that using this + generator function: + + for result in generate_results(function, iterable): + # ... do something with result ... + + executes coroutines at least as aggressively as the classic eventlet + idiom: + + events = [pool.execute(function, *args) for args in iterable] + for event in events: + result = event.wait() + # ... do something with result ... + + even without a distinct event object for every arg tuple in iterable, + and despite the funny flow control from interleaving launches of new + coroutines with yields of completed coroutines' results. + + (The use case that makes this function preferable to the classic idiom + above is when the iterable, which may itself be a generator, produces + millions of items.) + + >>> from eventlet import coros + >>> import string + >>> pool = coros.CoroutinePool(max_size=5) + >>> pausers = [coros.event() for x in xrange(2)] + >>> def longtask(evt, desc): + ... print "%s woke up with %s" % (desc, evt.wait()) + ... + >>> pool.launch_all(longtask, zip(pausers, "AB")) + >>> def quicktask(desc): + ... print "returning %s" % desc + ... return desc + ... 
+ + (Instead of using a for loop, step through generate_results() + items individually to illustrate timing) + + >>> step = iter(pool.generate_results(quicktask, string.ascii_lowercase)) + >>> print step.next() + returning a + returning b + returning c + a + >>> print step.next() + b + >>> print step.next() + c + >>> print step.next() + returning d + returning e + returning f + d + >>> pausers[0].send("A") + >>> print step.next() + e + >>> print step.next() + f + >>> print step.next() + A woke up with A + returning g + returning h + returning i + g + >>> print "".join([step.next() for x in xrange(3)]) + returning j + returning k + returning l + returning m + hij + >>> pausers[1].send("B") + >>> print "".join([step.next() for x in xrange(4)]) + B woke up with B + returning n + returning o + returning p + returning q + klmn + """ + # Get an iterator because of our funny nested loop below. Wrap the + # iterable in enumerate() so we count items that come through. + tuples = iter(enumerate(iterable)) + # If the iterable is empty, this whole function is a no-op, and we can + # save ourselves some grief by just quitting out. In particular, once + # we enter the outer loop below, we're going to wait on the queue -- + # but if we launched no coroutines with that queue as the destination, + # we could end up waiting a very long time. + try: + index, args = tuples.next() + except StopIteration: + return + # From this point forward, 'args' is the current arguments tuple and + # 'index+1' counts how many such tuples we've seen. + # This implementation relies on the fact that _execute() accepts an + # event-like object, and -- unless it's None -- the completed + # coroutine calls send(result). We slyly pass a queue rather than an + # event -- the same queue instance for all coroutines. This is why our + # queue interface intentionally resembles the event interface. + q = queue(max_size=qsize) + # How many results have we yielded so far? 
+ finished = 0 + # This first loop is only until we've launched all the coroutines. Its + # complexity is because if iterable contains more args tuples than the + # size of our pool, attempting to _execute() the (poolsize+1)th + # coroutine would suspend until something completes and send()s its + # result to our queue. But to keep down queue overhead and to maximize + # responsiveness to our caller, we'd rather suspend on reading the + # queue. So we stuff the pool as full as we can, then wait for + # something to finish, then stuff more coroutines into the pool. + try: + while True: + # Before each yield, start as many new coroutines as we can fit. + # (The self.free() test isn't 100% accurate: if we happen to be + # executing in one of the pool's coroutines, we could _execute() + # without waiting even if self.free() reports 0. See _execute().) + # The point is that we don't want to wait in the _execute() call, + # we want to wait in the q.wait() call. + # IMPORTANT: at start, and whenever we've caught up with all + # coroutines we've launched so far, we MUST iterate this inner + # loop at least once, regardless of self.free() -- otherwise the + # q.wait() call below will deadlock! + # Recall that index is the index of the NEXT args tuple that we + # haven't yet launched. Therefore it counts how many args tuples + # we've launched so far. + while self.free() > 0 or finished == index: + # Just like the implementation of execute_async(), save that + # we're passing our queue instead of None as the "event" to + # which to send() the result. + self._execute(q, function, args, {}) + # We've consumed that args tuple, advance to next. + index, args = tuples.next() + # Okay, we've filled up the pool again, yield a result -- which + # will probably wait for a coroutine to complete. Although we do + # have q.ready(), so we could iterate without waiting, we avoid + # that because every yield could involve considerable real time. 
+ # We don't know how long it takes to return from yield, so every + # time we do, take the opportunity to stuff more requests into the + # pool before yielding again. + yield q.wait() + # Be sure to count results so we know when to stop! + finished += 1 + except StopIteration: + pass + # Here we've exhausted the input iterable. index+1 is the total number + # of coroutines we've launched. We probably haven't yielded that many + # results yet. Wait for the rest of the results, yielding them as they + # arrive. + while finished < index + 1: + yield q.wait() + finished += 1 + + class pipe(object): """ Implementation of pipe using events. Not tested! Not used, either.""" def __init__(self): @@ -518,6 +775,70 @@ class pipe(object): return buf +class queue(object): + """Cross-coroutine queue, using semaphore to synchronize. + The API is like a generalization of event to be able to hold more than one + item at a time (without reset() or cancel()). + + >>> from eventlet import coros + >>> q = coros.queue(max_size=2) + >>> def putter(q): + ... q.send("first") + ... + >>> _ = api.spawn(putter, q) + >>> q.ready() + False + >>> q.wait() + 'first' + >>> q.ready() + False + >>> q.send("second") + >>> q.ready() + True + >>> q.send("third") + >>> def getter(q): + ... print q.wait() + ... + >>> _ = api.spawn(getter, q) + >>> q.send("fourth") + second + """ + def __init__(self, max_size=None): + """If you omit max_size, the queue will attempt to store an unlimited + number of items. + Specifying max_size means that when the queue already contains + max_size items, an attempt to send() one more item will suspend the + calling coroutine until someone else retrieves one. + """ + self.items = collections.deque() + self.sem = semaphore(count=0, limit=max_size) + + def send(self, result=None, exc=None): + """If you send(exc=SomeExceptionClass), the corresponding wait() call + will raise that exception. + Otherwise, the corresponding wait() will return result (default None). 
+ """ + self.items.append((result, exc)) + self.sem.release() + + def wait(self): + """Wait for an item sent by a send() call, in FIFO order. + If the corresponding send() specifies exc=SomeExceptionClass, this + wait() will raise that exception. + Otherwise, this wait() will return the corresponding send() call's + result= parameter. + """ + self.sem.acquire() + result, exc = self.items.popleft() + if exc is not None: + raise exc + return result + + def ready(self): + # could also base this on self.sem.counter... + return len(self.items) > 0 + + class Actor(object): """ A free-running coroutine that accepts and processes messages. From 85ffa7659849db167abf59a4f8eaa6af60f511ec Mon Sep 17 00:00:00 2001 From: "nat.linden" Date: Sat, 14 Jun 2008 04:25:53 -0700 Subject: [PATCH 05/12] [svn r127] Tidy up the implementation of CoroutinePool.wait_all() by overriding Pool.get() and Pool.put() to manage our metaphore, rather than inserting calls inline. --- eventlet/coros.py | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/eventlet/coros.py b/eventlet/coros.py index 6e1ec63..28810a3 100644 --- a/eventlet/coros.py +++ b/eventlet/coros.py @@ -437,12 +437,7 @@ class CoroutinePool(pools.Pool): """ Private method that runs the function, catches exceptions, and passes back the return value in the event.""" try: - try: - result = func(*args, **kw) - finally: - # Be sure to decrement requested coroutines, no matter HOW we - # leave. (See _execute().) - self.requested.dec() + result = func(*args, **kw) if evt is not None: evt.send(result) except api.GreenletExit, e: @@ -461,11 +456,6 @@ class CoroutinePool(pools.Pool): def _execute(self, evt, func, args, kw): """ Private implementation of the execute methods. """ - # Track number of requested coroutines. Note the asymmetry: we - # increment when the coroutine is first REQUESTED, long before it - # actually starts up; we don't decrement until it completes. (See - # _safe_apply().) 
- self.requested.inc() # if reentering an empty pool, don't try to wait on a coroutine freeing # itself -- instead, just execute in the current coroutine if self.free() == 0 and api.getcurrent() in self._greenlets: @@ -484,6 +474,20 @@ class CoroutinePool(pools.Pool): sender = event() self._greenlets.add(api.spawn(self._main_loop, sender)) return sender + + def get(self): + """Override of eventlet.pools.Pool interface""" + # Track the number of requested CoroutinePool coroutines + self.requested.inc() + # forward call to base class + return super(CoroutinePool, self).get() + + def put(self, item): + """Override of eventlet.pools.Pool interface""" + # forward call to base class + super(CoroutinePool, self).put(item) + # Track the number of outstanding CoroutinePool coroutines + self.requested.dec() def execute(self, func, *args, **kw): """Execute func in one of the coroutines maintained From a5015a6e7d47c7115a00839f1124ad1c8a0c6e1c Mon Sep 17 00:00:00 2001 From: "nat.linden" Date: Thu, 19 Jun 2008 16:53:22 -0700 Subject: [PATCH 06/12] [svn r128] With rdw's help, introduce an HttpStreamSuite subclass of HttpSuite whose get() method returns an open httplib.HTTPResponse object from which you can incrementally read the response body, get response headers etc. Only lightly tested. 
--- eventlet/httpc.py | 45 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/eventlet/httpc.py b/eventlet/httpc.py index 8e3d5ad..ad72e62 100644 --- a/eventlet/httpc.py +++ b/eventlet/httpc.py @@ -656,6 +656,51 @@ class HttpSuite(object): return self.post_(*args, **kwargs)[-1] +class HttpStreamSuite(HttpSuite): + def request_(self, params): + '''Make an http request to a url, for internal use mostly.''' + + params = _LocalParams(params, instance=self) + + (scheme, location, path, parameters, query, + fragment) = urlparse.urlparse(params.url) + + if params.use_proxy: + if scheme == 'file': + params.use_proxy = False + else: + params.headers['host'] = location + + if not params.use_proxy: + params.path = path + if query: + params.path += '?' + query + + params.orig_body = params.body + + if params.method in ('PUT', 'POST'): + if self.dumper is not None: + params.body = self.dumper(params.body) + # don't set content-length header because httplib does it + # for us in _send_request + else: + params.body = '' + + params.response = self._get_response_body(params) + response = params.response + + return response.status, response.msg, response + + def _get_response_body(self, params): + connection = connect(params.url, params.use_proxy) + connection.request(params.method, params.path, params.body, + params.headers) + params.response = connection.getresponse() + #connection.close() + self._check_status(params) + + return params.response + def make_suite(dumper, loader, fallback_content_type): """ Return a tuple of methods for making http requests with automatic bidirectional formatting with a particular content-type.""" suite = HttpSuite(dumper, loader, fallback_content_type) From c138b433d8429a943b47e4c388cd136b39f172e1 Mon Sep 17 00:00:00 2001 From: "which.linden" Date: Wed, 25 Jun 2008 11:53:37 -0700 Subject: [PATCH 07/12] [svn r131] Fix for some SwitchingToDeadGreenlet or TypeError: '_socketobject' object is not iterable 
exceptions: Passing both read=True and write=True to trampoline would cause the fd to select as both readable and writable in some error conditions. This would cause the greenlet to be spuriously resumed a second time. (commit comment stolen from donovan) --- eventlet/wrappedfd.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/eventlet/wrappedfd.py b/eventlet/wrappedfd.py index 9175fdd..0cdebac 100644 --- a/eventlet/wrappedfd.py +++ b/eventlet/wrappedfd.py @@ -154,13 +154,13 @@ class wrapped_fd(object): client, addr = res util.set_nonblocking(client) return type(self)(client), addr - trampoline(fd, read=True, write=True) + trampoline(fd, read=True) def connect(self, address): fd = self.fd connect = util.socket_connect while not connect(fd, address): - trampoline(fd, read=True, write=True) + trampoline(fd, write=True) recv = higher_order_recv(util.socket_recv) From 4a6a3c257d0d974fb9e797e93a2ed6574225d803 Mon Sep 17 00:00:00 2001 From: "which.linden" Date: Wed, 25 Jun 2008 11:55:29 -0700 Subject: [PATCH 08/12] [svn r132] Timeout calls previously didn't work because the number of arguments passed in to schedule_call didn't match the signature of the _do_timeout callback. 
--- eventlet/api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eventlet/api.py b/eventlet/api.py index 267ec83..82e8f7c 100644 --- a/eventlet/api.py +++ b/eventlet/api.py @@ -145,7 +145,7 @@ def trampoline(fd, read=None, write=None, timeout=None): hub.remove_descriptor(fileno) greenlib.switch(self, fd) if timeout is not None: - t = hub.schedule_call(timeout, _do_timeout) + t = hub.schedule_call(timeout, _do_timeout, fileno) hub.add_descriptor(fileno, read and cb, write and cb, _do_close) return hub.switch() From 757d1ff7a01a49746abea1104cc0a0c267cd7312 Mon Sep 17 00:00:00 2001 From: "which.linden" Date: Thu, 3 Jul 2008 19:41:39 -0700 Subject: [PATCH 09/12] [svn r134] Fixes for some interesting issues discovered only after our count of unit tests using database connections crested 100. --- eventlet/coros.py | 18 +++++++++++++++++- eventlet/db_pool.py | 16 ++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/eventlet/coros.py b/eventlet/coros.py index 28810a3..a918d11 100644 --- a/eventlet/coros.py +++ b/eventlet/coros.py @@ -423,14 +423,30 @@ class CoroutinePool(pools.Pool): try: while True: recvd = sender.wait() + # Delete the sender's result here because the very + # first event through the loop is referenced by + # spawn_startup, and therefore is not itself deleted. + # This means that we have to free up its argument + # because otherwise said argument persists in memory + # forever. This is generally only a problem in unit + # tests. + sender._result = NOT_USED + sender = event() (evt, func, args, kw) = recvd self._safe_apply(evt, func, args, kw) + # Likewise, delete these variables or else they will + # be referenced by this frame until replaced by the + # next recvd, which may or may not be a long time from + # now. 
+ del evt, func, args, kw, recvd + api.get_hub().runloop.cancel_timers(api.getcurrent()) self.put(sender) finally: # if we get here, something broke badly, and all we can really - # do is try to keep the pool from leaking items + # do is try to keep the pool from leaking items. + # Shouldn't even try to print the exception. self.put(self.create()) def _safe_apply(self, evt, func, args, kw): diff --git a/eventlet/db_pool.py b/eventlet/db_pool.py index 3c91793..aefe5f8 100644 --- a/eventlet/db_pool.py +++ b/eventlet/db_pool.py @@ -91,6 +91,8 @@ class BaseConnectionPool(Pool): # it's dead or None try: conn.rollback() + except KeyboardInterrupt: + raise except AttributeError, e: # this means it's already been destroyed, so we don't need to print anything conn = None @@ -113,6 +115,20 @@ class BaseConnectionPool(Pool): super(BaseConnectionPool, self).put(conn) else: self.current_size -= 1 + + def clear(self): + """ Close all connections that this pool still holds a reference to, leaving it empty.""" + for conn in self.free_items: + try: + conn.close() + except KeyboardInterrupt: + raise + except: + pass # even if stuff happens here, we still want to at least try to close all the other connections + self.free_items.clear() + + def __del__(self): + self.clear() class SaranwrappedConnectionPool(BaseConnectionPool): From 22891f0fd14601df4e857830748a02eb8708846a Mon Sep 17 00:00:00 2001 From: "which.linden" Date: Tue, 8 Jul 2008 12:37:04 -0700 Subject: [PATCH 10/12] [svn r135] Trampoline timeout works now. --- eventlet/api_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eventlet/api_test.py b/eventlet/api_test.py index b2c371a..40142f3 100644 --- a/eventlet/api_test.py +++ b/eventlet/api_test.py @@ -90,7 +90,7 @@ class TestApi(tests.TestCase): check_hub() - def dont_test_trampoline_timeout(self): + def test_trampoline_timeout(self): """This test is broken. 
Please change it's name to test_trampoline_timeout, and fix the bug (or fix the test) """ From b252a5d07df0a5266fba1b0bda1fa3cef85a5925 Mon Sep 17 00:00:00 2001 From: "which.linden" Date: Tue, 15 Jul 2008 23:56:52 -0700 Subject: [PATCH 11/12] [svn r141] Added broken test case for SSL sockets, with supporting test server certs (self-signed, natch). --- eventlet/api_test.py | 37 ++++++++++++++++++++++++++++++------- eventlet/test_server.crt | 15 +++++++++++++++ eventlet/test_server.key | 15 +++++++++++++++ 3 files changed, 60 insertions(+), 7 deletions(-) create mode 100644 eventlet/test_server.crt create mode 100644 eventlet/test_server.key diff --git a/eventlet/api_test.py b/eventlet/api_test.py index 40142f3..5ebf718 100644 --- a/eventlet/api_test.py +++ b/eventlet/api_test.py @@ -24,6 +24,8 @@ THE SOFTWARE. from eventlet import tests from eventlet import api, wrappedfd, util + +import os.path import socket @@ -40,6 +42,10 @@ def check_hub(): class TestApi(tests.TestCase): mode = 'static' + + certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt') + private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key') + def test_tcp_listener(self): socket = api.tcp_listener(('0.0.0.0', 0)) assert socket.getsockname()[0] == '0.0.0.0' @@ -47,10 +53,7 @@ class TestApi(tests.TestCase): check_hub() - def dont_test_connect_tcp(self): - """This test is broken. Please name it test_connect_tcp and fix - the bug (or the test) so it passes. 
- """ + def test_connect_tcp(self): def accept_once(listenfd): try: conn, addr = listenfd.accept() @@ -70,6 +73,29 @@ class TestApi(tests.TestCase): check_hub() + def dont_test_connect_ssl(self): + """ This test is broken, please fix it, remove the dont from the + name and remove this comment""" + def accept_once(listenfd): + try: + conn, addr = listenfd.accept() + conn.write('hello\n') + conn.close() + finally: + listenfd.close() + + server = api.ssl_listener(('0.0.0.0', 0), + self.certificate_file, + self.private_key_file) + api.spawn(accept_once, server) + + client = util.wrap_ssl( + api.connect_tcp(('127.0.0.1', server.getsockname()[1]))) + assert client.readline() == 'hello\n' + + assert client.read() == '' + client.close() + def test_server(self): server = api.tcp_listener(('0.0.0.0', 0)) bound_port = server.getsockname()[1] @@ -91,9 +117,6 @@ class TestApi(tests.TestCase): check_hub() def test_trampoline_timeout(self): - """This test is broken. Please change it's name to test_trampoline_timeout, - and fix the bug (or fix the test) - """ server = api.tcp_listener(('0.0.0.0', 0)) bound_port = server.getsockname()[1] diff --git a/eventlet/test_server.crt b/eventlet/test_server.crt new file mode 100644 index 0000000..1379e1d --- /dev/null +++ b/eventlet/test_server.crt @@ -0,0 +1,15 @@ +-----BEGIN CERTIFICATE----- +MIICYzCCAcwCCQD5jx1Aa0dytjANBgkqhkiG9w0BAQQFADB2MQswCQYDVQQGEwJU +UzENMAsGA1UECBMEVGVzdDENMAsGA1UEBxMEVGVzdDEWMBQGA1UEChMNVGVzdCBF +dmVudGxldDENMAsGA1UECxMEVGVzdDENMAsGA1UEAxMEVGVzdDETMBEGCSqGSIb3 +DQEJARYEVGVzdDAeFw0wODA3MDgyMTExNDJaFw0xMDAyMDgwODE1MTBaMHYxCzAJ +BgNVBAYTAlRTMQ0wCwYDVQQIEwRUZXN0MQ0wCwYDVQQHEwRUZXN0MRYwFAYDVQQK +Ew1UZXN0IEV2ZW50bGV0MQ0wCwYDVQQLEwRUZXN0MQ0wCwYDVQQDEwRUZXN0MRMw +EQYJKoZIhvcNAQkBFgRUZXN0MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDM +WcyeIiHQuEGQxgTIvu0aOW4iRFAyUEi8pLWNCxMEHglF8k6OxFVq7XWZMDnDFVnb +ZjmQh5Tc21Ae6cXzxXln578fROXHEzXo3Is8HUlq3ug1yYOGHjxw++Opjf1uoHwP +EBUKsz/flS7knuscgFM9FO05KSPn2wHnZeIDta4yTwIDAQABMA0GCSqGSIb3DQEB 
+BAUAA4GBAKM71aP0r26gEEEBzovfXm1IwKav6R9/xiWsJ4pFsUXVotcaIjcVBDG1 +Z7tz688hokb+GNxsTI2gNfqanqUnfP9wZxnKRmfTSOvb5aWHIiaiMXSgjiPlqBcm +6mnSeEbSMM9cw479wWhh1YqY8tf3gYJa+sxznVWLSfVLpsjRMphe +-----END CERTIFICATE----- diff --git a/eventlet/test_server.key b/eventlet/test_server.key new file mode 100644 index 0000000..24cd8e5 --- /dev/null +++ b/eventlet/test_server.key @@ -0,0 +1,15 @@ +-----BEGIN RSA PRIVATE KEY----- +MIICXgIBAAKBgQDMWcyeIiHQuEGQxgTIvu0aOW4iRFAyUEi8pLWNCxMEHglF8k6O +xFVq7XWZMDnDFVnbZjmQh5Tc21Ae6cXzxXln578fROXHEzXo3Is8HUlq3ug1yYOG +Hjxw++Opjf1uoHwPEBUKsz/flS7knuscgFM9FO05KSPn2wHnZeIDta4yTwIDAQAB +AoGBAKWfvq0IIvok7Ncm92ew/0D6/R1+2rT8xwdGQ/Nt31q98WwkqLEjxctlbKPd +J2PLIUomf0955BhhFH4JoSwjiHJQ6uishY7srjQQDX/Dxdi5wZAyxYCIVW/kAA9N +/u2s75hSD3s/rqAwOZ182DwAPIqJc4KQoYzvlKERSMDT1PJhAkEA5SUFsiSzBEMX +FyZ++ZMMs1vHrTu5oTK7WHznh9lk7dvsnp9BoUPqhiu8iJ7Q23zj0u5asz2czu11 +nnczXgU6XwJBAORM5Ib4I7nAsoUWn9wDiTwVQeE+D9P1ac9p7EHm7XXuf8o2irRZ +wYYfpXXsjk496YfyQFcQRMk0tU0gegCP7hECQFWRWqwoajUoPIInnPjjwbVki48U +I4CfqjgkBG3Fb5wnKRgezmpDK1vJD1FRRRsBay4EVhhi5KCdKfPv/V2ZxC8CQQCu +U5SxBytofJ8UhxkcTErvaR/8GYLGi//21GAGVop+YdaMlydE3cCrZODYcgCb+CSp +nS7KDG8p4KiMMz9VzJGxAkEAv85K6Sa3H8g9h7LwopBZ5tFNZUaFWo7lEP7DDMH0 +eckZTb1JVpyT/8zrDtsis4WlV9zVkVHxkIaad503BjqvEQ== +-----END RSA PRIVATE KEY----- From d2209a4b381a740202c377756c97cc207532812c Mon Sep 17 00:00:00 2001 From: "which.linden" Date: Wed, 16 Jul 2008 12:09:13 -0700 Subject: [PATCH 12/12] [svn r142] Fix for EVT-4: eventlet's https client is unable to communicate with it's https server. This turned out to be a combination of two factors: 1) the ssl server wasn't properly shutting down its socket before closing it, and 2) the client wasn't properly ignoring the error that this condition generates. 
--- eventlet/api_test.py | 5 +---- eventlet/util.py | 3 +-- eventlet/wrappedfd.py | 12 ++++++++++++ 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/eventlet/api_test.py b/eventlet/api_test.py index 5ebf718..7cd1138 100644 --- a/eventlet/api_test.py +++ b/eventlet/api_test.py @@ -73,9 +73,7 @@ class TestApi(tests.TestCase): check_hub() - def dont_test_connect_ssl(self): - """ This test is broken, please fix it, remove the dont from the - name and remove this comment""" + def test_connect_ssl(self): def accept_once(listenfd): try: conn, addr = listenfd.accept() @@ -102,7 +100,6 @@ class TestApi(tests.TestCase): connected = [] def accept_twice((conn, addr)): - print 'connected' connected.append(True) conn.close() if len(connected) == 2: diff --git a/eventlet/util.py b/eventlet/util.py index 5ce27df..e09e168 100644 --- a/eventlet/util.py +++ b/eventlet/util.py @@ -89,7 +89,6 @@ def wrap_ssl(sock, certificate=None, private_key=None): from OpenSSL import SSL from eventlet import wrappedfd, util context = SSL.Context(SSL.SSLv23_METHOD) - print certificate, private_key if certificate is not None: context.use_certificate_file(certificate) if private_key is not None: @@ -200,7 +199,7 @@ def socket_recv(descriptor, buflen): except SSL.SysCallError, e: (ssl_errno, ssl_errstr) = e if ssl_errno == -1 or ssl_errno > 0: - raise socket.error(errno.ECONNRESET, errno.errorcode[errno.ECONNRESET]) + return '' raise def file_recv(fd, buflen): diff --git a/eventlet/wrappedfd.py b/eventlet/wrappedfd.py index 0cdebac..e74a553 100644 --- a/eventlet/wrappedfd.py +++ b/eventlet/wrappedfd.py @@ -130,6 +130,13 @@ class wrapped_fd(object): fn = self.setblocking = self.fd.setblocking return fn(*args, **kw) + def shutdown(self, *args, **kw): + if self.is_secure: + fn = self.shutdown = self.fd.sock_shutdown + else: + fn = self.shutdown = self.fd.shutdown + return fn(*args, **kw) + def close(self, *args, **kw): if self._closed: return @@ -137,6 +144,11 @@ class wrapped_fd(object): if 
self._refcount.is_referenced(): return self._closed = True + if self.is_secure: + # *NOTE: This is not quite the correct SSL shutdown sequence. + # We should actually be checking the return value of shutdown. + # Note also that this is not the same as calling self.shutdown(). + self.fd.shutdown() fn = self.close = self.fd.close try: res = fn(*args, **kw)