diff --git a/eventlet/api.py b/eventlet/api.py index 35a4517..a6113e7 100644 --- a/eventlet/api.py +++ b/eventlet/api.py @@ -169,7 +169,7 @@ def trampoline(fd, read=False, write=False, timeout=None): hub.remove_descriptor(fileno) greenlib.switch(self, fd) if timeout is not None: - t = hub.schedule_call(timeout, _do_timeout) + t = hub.schedule_call(timeout, _do_timeout, fileno) hub.add_descriptor(fileno, read and cb, write and cb, _do_close) return hub.switch() @@ -365,6 +365,7 @@ def sleep(seconds=0): switch = greenlib.switch +local_dict = greenlib.greenlet_dict getcurrent = greenlet.getcurrent GreenletExit = greenlet.GreenletExit diff --git a/eventlet/api_test.py b/eventlet/api_test.py index 0e45f8f..a979d44 100644 --- a/eventlet/api_test.py +++ b/eventlet/api_test.py @@ -28,7 +28,10 @@ import socket from eventlet import api from eventlet import greenio from eventlet import tests -from eventlet import util +from eventlet import api, wrappedfd, util + +import os.path +import socket def check_hub(): @@ -47,10 +50,9 @@ def check_hub(): class TestApi(tests.TestCase): mode = 'static' - - certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt') + certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt') private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key') - + def test_tcp_listener(self): socket = api.tcp_listener(('0.0.0.0', 0)) assert socket.getsockname()[0] == '0.0.0.0' @@ -125,7 +127,7 @@ class TestApi(tests.TestCase): check_hub() - def test_001_trampoline_timeout(self): + def test_trampoline_timeout(self): server = api.tcp_listener(('0.0.0.0', 0)) bound_port = server.getsockname()[1] diff --git a/eventlet/coros.py b/eventlet/coros.py index 0d9e8bb..9bd5387 100644 --- a/eventlet/coros.py +++ b/eventlet/coros.py @@ -215,6 +215,141 @@ class event(object): hub.schedule_call(0, greenlib.switch, waiter, self._result) +class semaphore(object): + """Classic semaphore implemented with a counter and an event. + Optionally initialize with a resource count, then acquire() and release() + resources as needed. Attempting to acquire() when count is zero suspends + the calling coroutine until count becomes nonzero again. + + >>> from eventlet import coros, api + >>> sem = coros.semaphore(2, limit=3) + >>> sem.acquire() + >>> sem.acquire() + >>> def releaser(sem): + ... print "releasing one" + ... sem.release() + ... + >>> _ = api.spawn(releaser, sem) + >>> sem.acquire() + releasing one + >>> sem.counter + 0 + >>> for x in xrange(3): + ... sem.release() + ... + >>> def acquirer(sem): + ... print "acquiring one" + ... sem.acquire() + ... + >>> _ = api.spawn(acquirer, sem) + >>> sem.release() + acquiring one + >>> sem.counter + 3 + """ + def __init__(self, count=0, limit=None): + if limit is not None and count > limit: + # Prevent initializing with inconsistent values + count = limit + self.counter = count + self.limit = limit + self.acqevent = event() + self.relevent = event() + if self.counter > 0: + # If we initially have items, then don't block acquire()s. + self.acqevent.send() + if self.limit is None or self.counter < self.limit: + # If either there's no limit or we're below it, don't block on + # release()s. + self.relevent.send() + + def acquire(self): + # This logic handles the self.limit is None case because None != any integer. + while self.counter == 0: + # Loop until there are resources to acquire. We loop because we + # could be one of several coroutines waiting for a single item. If + # we all get notified, only one is going to claim it, and the rest + # of us must continue waiting. + self.acqevent.wait() + # claim the resource + self.counter -= 1 + if self.counter == 0: + # If we just transitioned from having a resource to having none, + # make anyone else's wait() actually wait. + self.acqevent.reset() + if self.counter + 1 == self.limit: + # If we just transitioned from being full to having room for one + # more resource, notify whoever was waiting to release one. + self.relevent.send() + + def release(self): + # This logic handles the self.limit is None case because None != any integer. + while self.counter == self.limit: + self.relevent.wait() + self.counter += 1 + if self.counter == self.limit: + self.relevent.reset() + if self.counter == 1: + # If self.counter was 0 before we incremented it, then wake up + # anybody who was waiting + self.acqevent.send() + +class metaphore(object): + """This is sort of an inverse semaphore: a counter that starts at 0 and + waits only if nonzero. It's used to implement a "wait for all" scenario. + + >>> from eventlet import api, coros + >>> count = coros.metaphore() + >>> count.wait() + >>> def decrementer(count, id): + ... print "%s decrementing" % id + ... count.dec() + ... + >>> _ = api.spawn(decrementer, count, 'A') + >>> _ = api.spawn(decrementer, count, 'B') + >>> count.inc(2) + >>> count.wait() + A decrementing + B decrementing + """ + def __init__(self): + self.counter = 0 + self.event = event() + # send() right away, else we'd wait on the default 0 count! + self.event.send() + + def inc(self, by=1): + """Increment our counter. If this transitions the counter from zero to + nonzero, make any subsequent wait() call wait. + """ + assert by > 0 + self.counter += by + if self.counter == by: + # If we just incremented self.counter by 'by', and the new count + # equals 'by', then the old value of self.counter was 0. + # Transitioning from 0 to a nonzero value means wait() must + # actually wait. + self.event.reset() + + def dec(self, by=1): + """Decrement our counter. If this transitions the counter from nonzero + to zero, a current or subsequent wait() call need no longer wait. + """ + assert by > 0 + self.counter -= by + if self.counter <= 0: + # Don't leave self.counter < 0, that will screw things up in + # future calls. + self.counter = 0 + # Transitioning from nonzero to 0 means wait() need no longer wait. + self.event.send() + + def wait(self): + """Suspend the caller only if our count is nonzero. In that case, + resume the caller once the count decrements to zero again. + """ + self.event.wait() + def execute(func, *args, **kw): """ Executes an operation asynchronously in a new coroutine, returning an event to retrieve the return value. @@ -272,21 +407,56 @@ class CoroutinePool(pools.Pool): self._next_event = None else: self._tracked_events = None + self.requested = metaphore() super(CoroutinePool, self).__init__(min_size, max_size) - + +## This doesn't yet pass its own doctest -- but I'm not even sure it's a +## wonderful idea. +## def __del__(self): +## """Experimental: try to prevent the calling script from exiting until +## all coroutines in this pool have run to completion. + +## >>> from eventlet import coros +## >>> pool = coros.CoroutinePool() +## >>> def saw(x): print "I saw %s!" +## ... +## >>> pool.launch_all(saw, "GHI") +## >>> del pool +## I saw G! +## I saw H! +## I saw I! +## """ +## self.wait_all() + def _main_loop(self, sender): """ Private, infinite loop run by a pooled coroutine. """ try: while True: recvd = sender.wait() + # Delete the sender's result here because the very + # first event through the loop is referenced by + # spawn_startup, and therefore is not itself deleted. + # This means that we have to free up its argument + # because otherwise said argument persists in memory + # forever. This is generally only a problem in unit + # tests. + sender._result = NOT_USED + sender = event() (evt, func, args, kw) = recvd self._safe_apply(evt, func, args, kw) api.get_hub().cancel_timers(api.getcurrent()) + # Likewise, delete these variables or else they will + # be referenced by this frame until replaced by the + # next recvd, which may or may not be a long time from + # now. + del evt, func, args, kw, recvd + self.put(sender) finally: # if we get here, something broke badly, and all we can really - # do is try to keep the pool from leaking items + # do is try to keep the pool from leaking items. + # Shouldn't even try to print the exception. self.put(self.create()) def _safe_apply(self, evt, func, args, kw): @@ -345,6 +515,20 @@ class CoroutinePool(pools.Pool): sender = event() self._greenlets.add(api.spawn(self._main_loop, sender)) return sender + + def get(self): + """Override of eventlet.pools.Pool interface""" + # Track the number of requested CoroutinePool coroutines + self.requested.inc() + # forward call to base class + return super(CoroutinePool, self).get() + + def put(self, item): + """Override of eventlet.pools.Pool interface""" + # forward call to base class + super(CoroutinePool, self).put(item) + # Track the number of outstanding CoroutinePool coroutines + self.requested.dec() def execute(self, func, *args, **kw): """Execute func in one of the coroutines maintained @@ -407,6 +591,241 @@ class CoroutinePool(pools.Pool): for g in self._greenlets: api.kill(g) + def wait_all(self): + """Wait until all coroutines started either by execute() or + execute_async() have completed. If you kept the event objects returned + by execute(), you can then call their individual wait() methods to + retrieve results with no further actual waiting. + + >>> from eventlet import coros + >>> pool = coros.CoroutinePool() + >>> pool.wait_all() + >>> def hi(name): + ... print "Hello, %s!" % name + ... return name + ... + >>> evt = pool.execute(hi, "world") + >>> pool.execute_async(hi, "darkness, my old friend") + >>> pool.wait_all() + Hello, world! + Hello, darkness, my old friend! + >>> evt.wait() + 'world' + >>> pool.wait_all() + """ + self.requested.wait() + + def launch_all(self, function, iterable): + """For each tuple (sequence) in iterable, launch function(*tuple) in + its own coroutine -- like itertools.starmap(), but in parallel. + Discard values returned by function(). You should call wait_all() to + wait for all coroutines, newly-launched plus any previously-submitted + execute() or execute_async() calls, to complete. + + >>> from eventlet import coros + >>> pool = coros.CoroutinePool() + >>> def saw(x): + ... print "I saw %s!" % x + ... + >>> pool.launch_all(saw, "ABC") + >>> pool.wait_all() + I saw A! + I saw B! + I saw C! + """ + for tup in iterable: + self.execute_async(function, *tup) + + def process_all(self, function, iterable): + """For each tuple (sequence) in iterable, launch function(*tuple) in + its own coroutine -- like itertools.starmap(), but in parallel. + Discard values returned by function(). Don't return until all + coroutines, newly-launched plus any previously-submitted execute() or + execute_async() calls, have completed. + + >>> from eventlet import coros + >>> pool = coros.CoroutinePool() + >>> def saw(x): print "I saw %s!" % x + ... + >>> pool.process_all(saw, "DEF") + I saw D! + I saw E! + I saw F! + """ + self.launch_all(function, iterable) + self.wait_all() + + def generate_results(self, function, iterable, qsize=None): + """For each tuple (sequence) in iterable, launch function(*tuple) in + its own coroutine -- like itertools.starmap(), but in parallel. + Yield each of the values returned by function(), in the order they're + completed rather than the order the coroutines were launched. + + Iteration stops when we've yielded results for each arguments tuple in + iterable. Unlike wait_all() and process_all(), this function does not + wait for any previously-submitted execute() or execute_async() calls. + + Results are temporarily buffered in a queue. If you pass qsize=, this + value is used to limit the max size of the queue: an attempt to buffer + too many results will suspend the completed CoroutinePool coroutine + until the requesting coroutine (the caller of generate_results()) has + retrieved one or more results by calling this generator-iterator's + next(). + + If any coroutine raises an uncaught exception, that exception will + propagate to the requesting coroutine via the corresponding next() call. + + What I particularly want these tests to illustrate is that using this + generator function: + + for result in generate_results(function, iterable): + # ... do something with result ... + + executes coroutines at least as aggressively as the classic eventlet + idiom: + + events = [pool.execute(function, *args) for args in iterable] + for event in events: + result = event.wait() + # ... do something with result ... + + even without a distinct event object for every arg tuple in iterable, + and despite the funny flow control from interleaving launches of new + coroutines with yields of completed coroutines' results. + + (The use case that makes this function preferable to the classic idiom + above is when the iterable, which may itself be a generator, produces + millions of items.) + + >>> from eventlet import coros + >>> import string + >>> pool = coros.CoroutinePool(max_size=5) + >>> pausers = [coros.event() for x in xrange(2)] + >>> def longtask(evt, desc): + ... print "%s woke up with %s" % (desc, evt.wait()) + ... + >>> pool.launch_all(longtask, zip(pausers, "AB")) + >>> def quicktask(desc): + ... print "returning %s" % desc + ... return desc + ... + + (Instead of using a for loop, step through generate_results() + items individually to illustrate timing) + + >>> step = iter(pool.generate_results(quicktask, string.ascii_lowercase)) + >>> print step.next() + returning a + returning b + returning c + a + >>> print step.next() + b + >>> print step.next() + c + >>> print step.next() + returning d + returning e + returning f + d + >>> pausers[0].send("A") + >>> print step.next() + e + >>> print step.next() + f + >>> print step.next() + A woke up with A + returning g + returning h + returning i + g + >>> print "".join([step.next() for x in xrange(3)]) + returning j + returning k + returning l + returning m + hij + >>> pausers[1].send("B") + >>> print "".join([step.next() for x in xrange(4)]) + B woke up with B + returning n + returning o + returning p + returning q + klmn + """ + # Get an iterator because of our funny nested loop below. Wrap the + # iterable in enumerate() so we count items that come through. + tuples = iter(enumerate(iterable)) + # If the iterable is empty, this whole function is a no-op, and we can + # save ourselves some grief by just quitting out. In particular, once + # we enter the outer loop below, we're going to wait on the queue -- + # but if we launched no coroutines with that queue as the destination, + # we could end up waiting a very long time. + try: + index, args = tuples.next() + except StopIteration: + return + # From this point forward, 'args' is the current arguments tuple and + # 'index+1' counts how many such tuples we've seen. + # This implementation relies on the fact that _execute() accepts an + # event-like object, and -- unless it's None -- the completed + # coroutine calls send(result). We slyly pass a queue rather than an + # event -- the same queue instance for all coroutines. This is why our + # queue interface intentionally resembles the event interface. + q = queue(max_size=qsize) + # How many results have we yielded so far? + finished = 0 + # This first loop is only until we've launched all the coroutines. Its + # complexity is because if iterable contains more args tuples than the + # size of our pool, attempting to _execute() the (poolsize+1)th + # coroutine would suspend until something completes and send()s its + # result to our queue. But to keep down queue overhead and to maximize + # responsiveness to our caller, we'd rather suspend on reading the + # queue. So we stuff the pool as full as we can, then wait for + # something to finish, then stuff more coroutines into the pool. + try: + while True: + # Before each yield, start as many new coroutines as we can fit. + # (The self.free() test isn't 100% accurate: if we happen to be + # executing in one of the pool's coroutines, we could _execute() + # without waiting even if self.free() reports 0. See _execute().) + # The point is that we don't want to wait in the _execute() call, + # we want to wait in the q.wait() call. + # IMPORTANT: at start, and whenever we've caught up with all + # coroutines we've launched so far, we MUST iterate this inner + # loop at least once, regardless of self.free() -- otherwise the + # q.wait() call below will deadlock! + # Recall that index is the index of the NEXT args tuple that we + # haven't yet launched. Therefore it counts how many args tuples + # we've launched so far. + while self.free() > 0 or finished == index: + # Just like the implementation of execute_async(), save that + # we're passing our queue instead of None as the "event" to + # which to send() the result. + self._execute(q, function, args, {}) + # We've consumed that args tuple, advance to next. + index, args = tuples.next() + # Okay, we've filled up the pool again, yield a result -- which + # will probably wait for a coroutine to complete. Although we do + # have q.ready(), so we could iterate without waiting, we avoid + # that because every yield could involve considerable real time. + # We don't know how long it takes to return from yield, so every + # time we do, take the opportunity to stuff more requests into the + # pool before yielding again. + yield q.wait() + # Be sure to count results so we know when to stop! + finished += 1 + except StopIteration: + pass + # Here we've exhausted the input iterable. index+1 is the total number + # of coroutines we've launched. We probably haven't yielded that many + # results yet. Wait for the rest of the results, yielding them as they + # arrive. + while finished < index + 1: + yield q.wait() + finished += 1 + class pipe(object): """ Implementation of pipe using events. Not tested! Not used, either.""" @@ -429,6 +848,70 @@ class pipe(object): return buf +class queue(object): + """Cross-coroutine queue, using semaphore to synchronize. + The API is like a generalization of event to be able to hold more than one + item at a time (without reset() or cancel()). + + >>> from eventlet import coros + >>> q = coros.queue(max_size=2) + >>> def putter(q): + ... q.send("first") + ... + >>> _ = api.spawn(putter, q) + >>> q.ready() + False + >>> q.wait() + 'first' + >>> q.ready() + False + >>> q.send("second") + >>> q.ready() + True + >>> q.send("third") + >>> def getter(q): + ... print q.wait() + ... + >>> _ = api.spawn(getter, q) + >>> q.send("fourth") + second + """ + def __init__(self, max_size=None): + """If you omit max_size, the queue will attempt to store an unlimited + number of items. + Specifying max_size means that when the queue already contains + max_size items, an attempt to send() one more item will suspend the + calling coroutine until someone else retrieves one. + """ + self.items = collections.deque() + self.sem = semaphore(count=0, limit=max_size) + + def send(self, result=None, exc=None): + """If you send(exc=SomeExceptionClass), the corresponding wait() call + will raise that exception. + Otherwise, the corresponding wait() will return result (default None). + """ + self.items.append((result, exc)) + self.sem.release() + + def wait(self): + """Wait for an item sent by a send() call, in FIFO order. + If the corresponding send() specifies exc=SomeExceptionClass, this + wait() will raise that exception. + Otherwise, this wait() will return the corresponding send() call's + result= parameter. + """ + self.sem.acquire() + result, exc = self.items.popleft() + if exc is not None: + raise exc + return result + + def ready(self): + # could also base this on self.sem.counter... + return len(self.items) > 0 + + class Actor(object): """ A free-running coroutine that accepts and processes messages. diff --git a/eventlet/db_pool.py b/eventlet/db_pool.py index 3c91793..aefe5f8 100644 --- a/eventlet/db_pool.py +++ b/eventlet/db_pool.py @@ -91,6 +91,8 @@ class BaseConnectionPool(Pool): # it's dead or None try: conn.rollback() + except KeyboardInterrupt: + raise except AttributeError, e: # this means it's already been destroyed, so we don't need to print anything conn = None @@ -113,6 +115,20 @@ class BaseConnectionPool(Pool): super(BaseConnectionPool, self).put(conn) else: self.current_size -= 1 + + def clear(self): + """ Close all connections that this pool still holds a reference to, leaving it empty.""" + for conn in self.free_items: + try: + conn.close() + except KeyboardInterrupt: + raise + except: + pass # even if stuff happens here, we still want to at least try to close all the other connections + self.free_items.clear() + + def __del__(self): + self.clear() class SaranwrappedConnectionPool(BaseConnectionPool): diff --git a/eventlet/httpc.py b/eventlet/httpc.py index afb1737..5da36cc 100644 --- a/eventlet/httpc.py +++ b/eventlet/httpc.py @@ -615,6 +615,7 @@ class HttpSuite(object): except (Found, TemporaryRedirect, MovedPermanently, SeeOther), e: if retried >= max_retries: raise + retried += 1 req = retry_response(e) def get(self, *args, **kwargs): @@ -672,6 +673,51 @@ class HttpSuite(object): return self.post_(*args, **kwargs)[-1] +class HttpStreamSuite(HttpSuite): + def request_(self, params): + '''Make an http request to a url, for internal use mostly.''' + + params = _LocalParams(params, instance=self) + + (scheme, location, path, parameters, query, + fragment) = urlparse.urlparse(params.url) + + if params.use_proxy: + if scheme == 'file': + params.use_proxy = False + else: + params.headers['host'] = location + + if not params.use_proxy: + params.path = path + if query: + params.path += '?' + query + + params.orig_body = params.body + + if params.method in ('PUT', 'POST'): + if self.dumper is not None: + params.body = self.dumper(params.body) + # don't set content-length header because httplib does it + # for us in _send_request + else: + params.body = '' + + params.response = self._get_response_body(params) + response = params.response + + return response.status, response.msg, response + + def _get_response_body(self, params): + connection = connect(params.url, params.use_proxy) + connection.request(params.method, params.path, params.body, + params.headers) + params.response = connection.getresponse() + #connection.close() + self._check_status(params) + + return params.response + def make_suite(dumper, loader, fallback_content_type): """ Return a tuple of methods for making http requests with automatic bidirectional formatting with a particular content-type.""" suite = HttpSuite(dumper, loader, fallback_content_type) diff --git a/eventlet/util.py b/eventlet/util.py index 9bad6d9..2140a38 100644 --- a/eventlet/util.py +++ b/eventlet/util.py @@ -81,7 +81,6 @@ def wrap_ssl(sock, certificate=None, private_key=None): from OpenSSL import SSL from eventlet import greenio, util context = SSL.Context(SSL.SSLv23_METHOD) - #print certificate, private_key if certificate is not None: context.use_certificate_file(certificate) if private_key is not None: