Merge r122:142 from svn trunk.

This commit is contained in:
rdw
2008-07-21 19:04:53 -07:00
6 changed files with 556 additions and 9 deletions

View File

@@ -169,7 +169,7 @@ def trampoline(fd, read=False, write=False, timeout=None):
hub.remove_descriptor(fileno)
greenlib.switch(self, fd)
if timeout is not None:
t = hub.schedule_call(timeout, _do_timeout)
t = hub.schedule_call(timeout, _do_timeout, fileno)
hub.add_descriptor(fileno, read and cb, write and cb, _do_close)
return hub.switch()
@@ -365,6 +365,7 @@ def sleep(seconds=0):
switch = greenlib.switch
local_dict = greenlib.greenlet_dict
getcurrent = greenlet.getcurrent
GreenletExit = greenlet.GreenletExit

View File

@@ -28,7 +28,10 @@ import socket
from eventlet import api
from eventlet import greenio
from eventlet import tests
from eventlet import util
from eventlet import api, wrappedfd, util
import os.path
import socket
def check_hub():
@@ -47,10 +50,9 @@ def check_hub():
class TestApi(tests.TestCase):
mode = 'static'
certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt')
certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt')
private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key')
def test_tcp_listener(self):
socket = api.tcp_listener(('0.0.0.0', 0))
assert socket.getsockname()[0] == '0.0.0.0'
@@ -125,7 +127,7 @@ class TestApi(tests.TestCase):
check_hub()
def test_001_trampoline_timeout(self):
def test_trampoline_timeout(self):
server = api.tcp_listener(('0.0.0.0', 0))
bound_port = server.getsockname()[1]

View File

@@ -215,6 +215,141 @@ class event(object):
hub.schedule_call(0, greenlib.switch, waiter, self._result)
class semaphore(object):
"""Classic semaphore implemented with a counter and an event.
Optionally initialize with a resource count, then acquire() and release()
resources as needed. Attempting to acquire() when count is zero suspends
the calling coroutine until count becomes nonzero again.
>>> from eventlet import coros, api
>>> sem = coros.semaphore(2, limit=3)
>>> sem.acquire()
>>> sem.acquire()
>>> def releaser(sem):
... print "releasing one"
... sem.release()
...
>>> _ = api.spawn(releaser, sem)
>>> sem.acquire()
releasing one
>>> sem.counter
0
>>> for x in xrange(3):
... sem.release()
...
>>> def acquirer(sem):
... print "acquiring one"
... sem.acquire()
...
>>> _ = api.spawn(acquirer, sem)
>>> sem.release()
acquiring one
>>> sem.counter
3
"""
def __init__(self, count=0, limit=None):
if limit is not None and count > limit:
# Prevent initializing with inconsistent values
count = limit
self.counter = count
self.limit = limit
self.acqevent = event()
self.relevent = event()
if self.counter > 0:
# If we initially have items, then don't block acquire()s.
self.acqevent.send()
if self.limit is None or self.counter < self.limit:
# If either there's no limit or we're below it, don't block on
# release()s.
self.relevent.send()
def acquire(self):
# This logic handles the self.limit is None case because None != any integer.
while self.counter == 0:
# Loop until there are resources to acquire. We loop because we
# could be one of several coroutines waiting for a single item. If
# we all get notified, only one is going to claim it, and the rest
# of us must continue waiting.
self.acqevent.wait()
# claim the resource
self.counter -= 1
if self.counter == 0:
# If we just transitioned from having a resource to having none,
# make anyone else's wait() actually wait.
self.acqevent.reset()
if self.counter + 1 == self.limit:
# If we just transitioned from being full to having room for one
# more resource, notify whoever was waiting to release one.
self.relevent.send()
def release(self):
# This logic handles the self.limit is None case because None != any integer.
while self.counter == self.limit:
self.relevent.wait()
self.counter += 1
if self.counter == self.limit:
self.relevent.reset()
if self.counter == 1:
# If self.counter was 0 before we incremented it, then wake up
# anybody who was waiting
self.acqevent.send()
class metaphore(object):
"""This is sort of an inverse semaphore: a counter that starts at 0 and
waits only if nonzero. It's used to implement a "wait for all" scenario.
>>> from eventlet import api, coros
>>> count = coros.metaphore()
>>> count.wait()
>>> def decrementer(count, id):
... print "%s decrementing" % id
... count.dec()
...
>>> _ = api.spawn(decrementer, count, 'A')
>>> _ = api.spawn(decrementer, count, 'B')
>>> count.inc(2)
>>> count.wait()
A decrementing
B decrementing
"""
def __init__(self):
self.counter = 0
self.event = event()
# send() right away, else we'd wait on the default 0 count!
self.event.send()
def inc(self, by=1):
"""Increment our counter. If this transitions the counter from zero to
nonzero, make any subsequent wait() call wait.
"""
assert by > 0
self.counter += by
if self.counter == by:
# If we just incremented self.counter by 'by', and the new count
# equals 'by', then the old value of self.counter was 0.
# Transitioning from 0 to a nonzero value means wait() must
# actually wait.
self.event.reset()
def dec(self, by=1):
"""Decrement our counter. If this transitions the counter from nonzero
to zero, a current or subsequent wait() call need no longer wait.
"""
assert by > 0
self.counter -= by
if self.counter <= 0:
# Don't leave self.counter < 0, that will screw things up in
# future calls.
self.counter = 0
# Transitioning from nonzero to 0 means wait() need no longer wait.
self.event.send()
def wait(self):
"""Suspend the caller only if our count is nonzero. In that case,
resume the caller once the count decrements to zero again.
"""
self.event.wait()
def execute(func, *args, **kw):
""" Executes an operation asynchronously in a new coroutine, returning
an event to retrieve the return value.
@@ -272,21 +407,56 @@ class CoroutinePool(pools.Pool):
self._next_event = None
else:
self._tracked_events = None
self.requested = metaphore()
super(CoroutinePool, self).__init__(min_size, max_size)
## This doesn't yet pass its own doctest -- but I'm not even sure it's a
## wonderful idea.
## def __del__(self):
## """Experimental: try to prevent the calling script from exiting until
## all coroutines in this pool have run to completion.
## >>> from eventlet import coros
## >>> pool = coros.CoroutinePool()
## >>> def saw(x): print "I saw %s!"
## ...
## >>> pool.launch_all(saw, "GHI")
## >>> del pool
## I saw G!
## I saw H!
## I saw I!
## """
## self.wait_all()
def _main_loop(self, sender):
""" Private, infinite loop run by a pooled coroutine. """
try:
while True:
recvd = sender.wait()
# Delete the sender's result here because the very
# first event through the loop is referenced by
# spawn_startup, and therefore is not itself deleted.
# This means that we have to free up its argument
# because otherwise said argument persists in memory
# forever. This is generally only a problem in unit
# tests.
sender._result = NOT_USED
sender = event()
(evt, func, args, kw) = recvd
self._safe_apply(evt, func, args, kw)
api.get_hub().cancel_timers(api.getcurrent())
# Likewise, delete these variables or else they will
# be referenced by this frame until replaced by the
# next recvd, which may or may not be a long time from
# now.
del evt, func, args, kw, recvd
self.put(sender)
finally:
# if we get here, something broke badly, and all we can really
# do is try to keep the pool from leaking items
# do is try to keep the pool from leaking items.
# Shouldn't even try to print the exception.
self.put(self.create())
def _safe_apply(self, evt, func, args, kw):
@@ -345,6 +515,20 @@ class CoroutinePool(pools.Pool):
sender = event()
self._greenlets.add(api.spawn(self._main_loop, sender))
return sender
def get(self):
"""Override of eventlet.pools.Pool interface"""
# Track the number of requested CoroutinePool coroutines
self.requested.inc()
# forward call to base class
return super(CoroutinePool, self).get()
def put(self, item):
"""Override of eventlet.pools.Pool interface"""
# forward call to base class
super(CoroutinePool, self).put(item)
# Track the number of outstanding CoroutinePool coroutines
self.requested.dec()
def execute(self, func, *args, **kw):
"""Execute func in one of the coroutines maintained
@@ -407,6 +591,241 @@ class CoroutinePool(pools.Pool):
for g in self._greenlets:
api.kill(g)
def wait_all(self):
"""Wait until all coroutines started either by execute() or
execute_async() have completed. If you kept the event objects returned
by execute(), you can then call their individual wait() methods to
retrieve results with no further actual waiting.
>>> from eventlet import coros
>>> pool = coros.CoroutinePool()
>>> pool.wait_all()
>>> def hi(name):
... print "Hello, %s!" % name
... return name
...
>>> evt = pool.execute(hi, "world")
>>> pool.execute_async(hi, "darkness, my old friend")
>>> pool.wait_all()
Hello, world!
Hello, darkness, my old friend!
>>> evt.wait()
'world'
>>> pool.wait_all()
"""
self.requested.wait()
def launch_all(self, function, iterable):
"""For each tuple (sequence) in iterable, launch function(*tuple) in
its own coroutine -- like itertools.starmap(), but in parallel.
Discard values returned by function(). You should call wait_all() to
wait for all coroutines, newly-launched plus any previously-submitted
execute() or execute_async() calls, to complete.
>>> from eventlet import coros
>>> pool = coros.CoroutinePool()
>>> def saw(x):
... print "I saw %s!" % x
...
>>> pool.launch_all(saw, "ABC")
>>> pool.wait_all()
I saw A!
I saw B!
I saw C!
"""
for tup in iterable:
self.execute_async(function, *tup)
def process_all(self, function, iterable):
"""For each tuple (sequence) in iterable, launch function(*tuple) in
its own coroutine -- like itertools.starmap(), but in parallel.
Discard values returned by function(). Don't return until all
coroutines, newly-launched plus any previously-submitted execute() or
execute_async() calls, have completed.
>>> from eventlet import coros
>>> pool = coros.CoroutinePool()
>>> def saw(x): print "I saw %s!" % x
...
>>> pool.process_all(saw, "DEF")
I saw D!
I saw E!
I saw F!
"""
self.launch_all(function, iterable)
self.wait_all()
def generate_results(self, function, iterable, qsize=None):
"""For each tuple (sequence) in iterable, launch function(*tuple) in
its own coroutine -- like itertools.starmap(), but in parallel.
Yield each of the values returned by function(), in the order they're
completed rather than the order the coroutines were launched.
Iteration stops when we've yielded results for each arguments tuple in
iterable. Unlike wait_all() and process_all(), this function does not
wait for any previously-submitted execute() or execute_async() calls.
Results are temporarily buffered in a queue. If you pass qsize=, this
value is used to limit the max size of the queue: an attempt to buffer
too many results will suspend the completed CoroutinePool coroutine
until the requesting coroutine (the caller of generate_results()) has
retrieved one or more results by calling this generator-iterator's
next().
If any coroutine raises an uncaught exception, that exception will
propagate to the requesting coroutine via the corresponding next() call.
What I particularly want these tests to illustrate is that using this
generator function:
for result in generate_results(function, iterable):
# ... do something with result ...
executes coroutines at least as aggressively as the classic eventlet
idiom:
events = [pool.execute(function, *args) for args in iterable]
for event in events:
result = event.wait()
# ... do something with result ...
even without a distinct event object for every arg tuple in iterable,
and despite the funny flow control from interleaving launches of new
coroutines with yields of completed coroutines' results.
(The use case that makes this function preferable to the classic idiom
above is when the iterable, which may itself be a generator, produces
millions of items.)
>>> from eventlet import coros
>>> import string
>>> pool = coros.CoroutinePool(max_size=5)
>>> pausers = [coros.event() for x in xrange(2)]
>>> def longtask(evt, desc):
... print "%s woke up with %s" % (desc, evt.wait())
...
>>> pool.launch_all(longtask, zip(pausers, "AB"))
>>> def quicktask(desc):
... print "returning %s" % desc
... return desc
...
(Instead of using a for loop, step through generate_results()
items individually to illustrate timing)
>>> step = iter(pool.generate_results(quicktask, string.ascii_lowercase))
>>> print step.next()
returning a
returning b
returning c
a
>>> print step.next()
b
>>> print step.next()
c
>>> print step.next()
returning d
returning e
returning f
d
>>> pausers[0].send("A")
>>> print step.next()
e
>>> print step.next()
f
>>> print step.next()
A woke up with A
returning g
returning h
returning i
g
>>> print "".join([step.next() for x in xrange(3)])
returning j
returning k
returning l
returning m
hij
>>> pausers[1].send("B")
>>> print "".join([step.next() for x in xrange(4)])
B woke up with B
returning n
returning o
returning p
returning q
klmn
"""
# Get an iterator because of our funny nested loop below. Wrap the
# iterable in enumerate() so we count items that come through.
tuples = iter(enumerate(iterable))
# If the iterable is empty, this whole function is a no-op, and we can
# save ourselves some grief by just quitting out. In particular, once
# we enter the outer loop below, we're going to wait on the queue --
# but if we launched no coroutines with that queue as the destination,
# we could end up waiting a very long time.
try:
index, args = tuples.next()
except StopIteration:
return
# From this point forward, 'args' is the current arguments tuple and
# 'index+1' counts how many such tuples we've seen.
# This implementation relies on the fact that _execute() accepts an
# event-like object, and -- unless it's None -- the completed
# coroutine calls send(result). We slyly pass a queue rather than an
# event -- the same queue instance for all coroutines. This is why our
# queue interface intentionally resembles the event interface.
q = queue(max_size=qsize)
# How many results have we yielded so far?
finished = 0
# This first loop is only until we've launched all the coroutines. Its
# complexity is because if iterable contains more args tuples than the
# size of our pool, attempting to _execute() the (poolsize+1)th
# coroutine would suspend until something completes and send()s its
# result to our queue. But to keep down queue overhead and to maximize
# responsiveness to our caller, we'd rather suspend on reading the
# queue. So we stuff the pool as full as we can, then wait for
# something to finish, then stuff more coroutines into the pool.
try:
while True:
# Before each yield, start as many new coroutines as we can fit.
# (The self.free() test isn't 100% accurate: if we happen to be
# executing in one of the pool's coroutines, we could _execute()
# without waiting even if self.free() reports 0. See _execute().)
# The point is that we don't want to wait in the _execute() call,
# we want to wait in the q.wait() call.
# IMPORTANT: at start, and whenever we've caught up with all
# coroutines we've launched so far, we MUST iterate this inner
# loop at least once, regardless of self.free() -- otherwise the
# q.wait() call below will deadlock!
# Recall that index is the index of the NEXT args tuple that we
# haven't yet launched. Therefore it counts how many args tuples
# we've launched so far.
while self.free() > 0 or finished == index:
# Just like the implementation of execute_async(), save that
# we're passing our queue instead of None as the "event" to
# which to send() the result.
self._execute(q, function, args, {})
# We've consumed that args tuple, advance to next.
index, args = tuples.next()
# Okay, we've filled up the pool again, yield a result -- which
# will probably wait for a coroutine to complete. Although we do
# have q.ready(), so we could iterate without waiting, we avoid
# that because every yield could involve considerable real time.
# We don't know how long it takes to return from yield, so every
# time we do, take the opportunity to stuff more requests into the
# pool before yielding again.
yield q.wait()
# Be sure to count results so we know when to stop!
finished += 1
except StopIteration:
pass
# Here we've exhausted the input iterable. index+1 is the total number
# of coroutines we've launched. We probably haven't yielded that many
# results yet. Wait for the rest of the results, yielding them as they
# arrive.
while finished < index + 1:
yield q.wait()
finished += 1
class pipe(object):
""" Implementation of pipe using events. Not tested! Not used, either."""
@@ -429,6 +848,70 @@ class pipe(object):
return buf
class queue(object):
"""Cross-coroutine queue, using semaphore to synchronize.
The API is like a generalization of event to be able to hold more than one
item at a time (without reset() or cancel()).
>>> from eventlet import coros
>>> q = coros.queue(max_size=2)
>>> def putter(q):
... q.send("first")
...
>>> _ = api.spawn(putter, q)
>>> q.ready()
False
>>> q.wait()
'first'
>>> q.ready()
False
>>> q.send("second")
>>> q.ready()
True
>>> q.send("third")
>>> def getter(q):
... print q.wait()
...
>>> _ = api.spawn(getter, q)
>>> q.send("fourth")
second
"""
def __init__(self, max_size=None):
"""If you omit max_size, the queue will attempt to store an unlimited
number of items.
Specifying max_size means that when the queue already contains
max_size items, an attempt to send() one more item will suspend the
calling coroutine until someone else retrieves one.
"""
self.items = collections.deque()
self.sem = semaphore(count=0, limit=max_size)
def send(self, result=None, exc=None):
"""If you send(exc=SomeExceptionClass), the corresponding wait() call
will raise that exception.
Otherwise, the corresponding wait() will return result (default None).
"""
self.items.append((result, exc))
self.sem.release()
def wait(self):
"""Wait for an item sent by a send() call, in FIFO order.
If the corresponding send() specifies exc=SomeExceptionClass, this
wait() will raise that exception.
Otherwise, this wait() will return the corresponding send() call's
result= parameter.
"""
self.sem.acquire()
result, exc = self.items.popleft()
if exc is not None:
raise exc
return result
def ready(self):
# could also base this on self.sem.counter...
return len(self.items) > 0
class Actor(object):
""" A free-running coroutine that accepts and processes messages.

View File

@@ -91,6 +91,8 @@ class BaseConnectionPool(Pool):
# it's dead or None
try:
conn.rollback()
except KeyboardInterrupt:
raise
except AttributeError, e:
# this means it's already been destroyed, so we don't need to print anything
conn = None
@@ -113,6 +115,20 @@ class BaseConnectionPool(Pool):
super(BaseConnectionPool, self).put(conn)
else:
self.current_size -= 1
def clear(self):
""" Close all connections that this pool still holds a reference to, leaving it empty."""
for conn in self.free_items:
try:
conn.close()
except KeyboardInterrupt:
raise
except:
pass # even if stuff happens here, we still want to at least try to close all the other connections
self.free_items.clear()
def __del__(self):
self.clear()
class SaranwrappedConnectionPool(BaseConnectionPool):

View File

@@ -615,6 +615,7 @@ class HttpSuite(object):
except (Found, TemporaryRedirect, MovedPermanently, SeeOther), e:
if retried >= max_retries:
raise
retried += 1
req = retry_response(e)
def get(self, *args, **kwargs):
@@ -672,6 +673,51 @@ class HttpSuite(object):
return self.post_(*args, **kwargs)[-1]
class HttpStreamSuite(HttpSuite):
def request_(self, params):
'''Make an http request to a url, for internal use mostly.'''
params = _LocalParams(params, instance=self)
(scheme, location, path, parameters, query,
fragment) = urlparse.urlparse(params.url)
if params.use_proxy:
if scheme == 'file':
params.use_proxy = False
else:
params.headers['host'] = location
if not params.use_proxy:
params.path = path
if query:
params.path += '?' + query
params.orig_body = params.body
if params.method in ('PUT', 'POST'):
if self.dumper is not None:
params.body = self.dumper(params.body)
# don't set content-length header because httplib does it
# for us in _send_request
else:
params.body = ''
params.response = self._get_response_body(params)
response = params.response
return response.status, response.msg, response
def _get_response_body(self, params):
connection = connect(params.url, params.use_proxy)
connection.request(params.method, params.path, params.body,
params.headers)
params.response = connection.getresponse()
#connection.close()
self._check_status(params)
return params.response
def make_suite(dumper, loader, fallback_content_type):
""" Return a tuple of methods for making http requests with automatic bidirectional formatting with a particular content-type."""
suite = HttpSuite(dumper, loader, fallback_content_type)

View File

@@ -81,7 +81,6 @@ def wrap_ssl(sock, certificate=None, private_key=None):
from OpenSSL import SSL
from eventlet import greenio, util
context = SSL.Context(SSL.SSLv23_METHOD)
#print certificate, private_key
if certificate is not None:
context.use_certificate_file(certificate)
if private_key is not None: